XuqmGroup-Server/im-service/src/main/java/com/xuqm/im/service/OfflineMessageSyncService.java

98 行
4.1 KiB
Java

package com.xuqm.im.service;
import com.xuqm.im.cluster.ImClusterPublisher;
import com.xuqm.im.entity.ImMessageEntity;
import com.xuqm.im.entity.ImOfflineMessageEntity;
import com.xuqm.im.repository.ImMessageRepository;
import com.xuqm.im.repository.ImOfflineMessageRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@Service
public class OfflineMessageSyncService {
private static final Logger log = LoggerFactory.getLogger(OfflineMessageSyncService.class);
private final ImOfflineMessageRepository offlineMessageRepository;
private final ImMessageRepository messageRepository;
private final ImClusterPublisher clusterPublisher;
public OfflineMessageSyncService(ImOfflineMessageRepository offlineMessageRepository,
ImMessageRepository messageRepository,
ImClusterPublisher clusterPublisher) {
this.offlineMessageRepository = offlineMessageRepository;
this.messageRepository = messageRepository;
this.clusterPublisher = clusterPublisher;
}
@Transactional
2026-05-07 19:39:42 +08:00
public void storeOfflineMessage(String appKey, String userId, String messageId) {
ImOfflineMessageEntity entity = new ImOfflineMessageEntity();
entity.setId(UUID.randomUUID().toString());
2026-05-07 19:39:42 +08:00
entity.setAppId(appKey);
entity.setUserId(userId);
entity.setMessageId(messageId);
entity.setDelivered(false);
entity.setCreatedAt(LocalDateTime.now());
offlineMessageRepository.save(entity);
2026-05-07 19:39:42 +08:00
log.debug("Stored offline message appKey={} userId={} messageId={}", appKey, userId, messageId);
}
@Transactional
2026-05-07 19:39:42 +08:00
public void syncAndDeliver(String appKey, String userId) {
List<ImOfflineMessageEntity> offlineMessages = offlineMessageRepository
2026-05-07 19:39:42 +08:00
.findByAppIdAndUserIdAndDeliveredFalse(appKey, userId);
if (offlineMessages.isEmpty()) {
return;
}
List<String> deliveredIds = new ArrayList<>();
for (ImOfflineMessageEntity offline : offlineMessages) {
ImMessageEntity message = messageRepository.findById(offline.getMessageId()).orElse(null);
if (message != null) {
clusterPublisher.publish("/user/" + userId + "/queue/messages", message);
deliveredIds.add(offline.getId());
2026-05-07 19:39:42 +08:00
log.debug("Delivered offline message appKey={} userId={} messageId={}", appKey, userId, message.getId());
}
}
if (!deliveredIds.isEmpty()) {
offlineMessageRepository.markDeliveredByIds(deliveredIds);
2026-05-07 19:39:42 +08:00
log.info("Synced {} offline messages for appKey={} userId={}", deliveredIds.size(), appKey, userId);
}
2026-05-07 19:39:42 +08:00
offlineMessageRepository.deleteByAppIdAndUserIdAndDeliveredTrue(appKey, userId);
}
2026-05-07 19:39:42 +08:00
public long countUndelivered(String appKey, String userId) {
return offlineMessageRepository.countUndeliveredByAppIdAndUserId(appKey, userId);
}
@Transactional
2026-05-07 19:39:42 +08:00
public List<ImMessageEntity> syncAndReturn(String appKey, String userId) {
List<ImOfflineMessageEntity> offlineMessages = offlineMessageRepository
2026-05-07 19:39:42 +08:00
.findByAppIdAndUserIdAndDeliveredFalse(appKey, userId);
List<ImMessageEntity> result = new ArrayList<>();
List<String> deliveredIds = new ArrayList<>();
for (ImOfflineMessageEntity offline : offlineMessages) {
ImMessageEntity message = messageRepository.findById(offline.getMessageId()).orElse(null);
if (message != null) {
result.add(message);
deliveredIds.add(offline.getId());
}
}
if (!deliveredIds.isEmpty()) {
offlineMessageRepository.markDeliveredByIds(deliveredIds);
}
2026-05-07 19:39:42 +08:00
offlineMessageRepository.deleteByAppIdAndUserIdAndDeliveredTrue(appKey, userId);
return result;
}
}