package com.xuqm.im.service; import com.xuqm.im.cluster.ImClusterPublisher; import com.xuqm.im.entity.ImMessageEntity; import com.xuqm.im.entity.ImOfflineMessageEntity; import com.xuqm.im.repository.ImMessageRepository; import com.xuqm.im.repository.ImOfflineMessageRepository; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; import java.util.UUID; @Service public class OfflineMessageSyncService { private static final Logger log = LoggerFactory.getLogger(OfflineMessageSyncService.class); private final ImOfflineMessageRepository offlineMessageRepository; private final ImMessageRepository messageRepository; private final ImClusterPublisher clusterPublisher; public OfflineMessageSyncService(ImOfflineMessageRepository offlineMessageRepository, ImMessageRepository messageRepository, ImClusterPublisher clusterPublisher) { this.offlineMessageRepository = offlineMessageRepository; this.messageRepository = messageRepository; this.clusterPublisher = clusterPublisher; } @Transactional public void storeOfflineMessage(String appId, String userId, String messageId) { ImOfflineMessageEntity entity = new ImOfflineMessageEntity(); entity.setId(UUID.randomUUID().toString()); entity.setAppId(appId); entity.setUserId(userId); entity.setMessageId(messageId); entity.setDelivered(false); entity.setCreatedAt(LocalDateTime.now()); offlineMessageRepository.save(entity); log.debug("Stored offline message appId={} userId={} messageId={}", appId, userId, messageId); } @Transactional public void syncAndDeliver(String appId, String userId) { List offlineMessages = offlineMessageRepository .findByAppIdAndUserIdAndDeliveredFalse(appId, userId); if (offlineMessages.isEmpty()) { return; } List deliveredIds = new ArrayList<>(); for (ImOfflineMessageEntity offline : offlineMessages) { ImMessageEntity message = messageRepository.findById(offline.getMessageId()).orElse(null); if (message != null) { clusterPublisher.publish("/user/" + userId + "/queue/messages", message); deliveredIds.add(offline.getId()); log.debug("Delivered offline message appId={} userId={} messageId={}", appId, userId, message.getId()); } } if (!deliveredIds.isEmpty()) { offlineMessageRepository.markDeliveredByIds(deliveredIds); log.info("Synced {} offline messages for appId={} userId={}", deliveredIds.size(), appId, userId); } offlineMessageRepository.deleteByAppIdAndUserIdAndDeliveredTrue(appId, userId); } public long countUndelivered(String appId, String userId) { return offlineMessageRepository.countUndeliveredByAppIdAndUserId(appId, userId); } @Transactional public List syncAndReturn(String appId, String userId) { List offlineMessages = offlineMessageRepository .findByAppIdAndUserIdAndDeliveredFalse(appId, userId); List result = new ArrayList<>(); List deliveredIds = new ArrayList<>(); for (ImOfflineMessageEntity offline : offlineMessages) { ImMessageEntity message = messageRepository.findById(offline.getMessageId()).orElse(null); if (message != null) { result.add(message); deliveredIds.add(offline.getId()); } } if (!deliveredIds.isEmpty()) { offlineMessageRepository.markDeliveredByIds(deliveredIds); } offlineMessageRepository.deleteByAppIdAndUserIdAndDeliveredTrue(appId, userId); return result; } }