diff --git a/im-service/src/main/java/com/xuqm/im/cluster/ClusterMessage.java b/im-service/src/main/java/com/xuqm/im/cluster/ClusterMessage.java new file mode 100644 index 0000000..94b98aa --- /dev/null +++ b/im-service/src/main/java/com/xuqm/im/cluster/ClusterMessage.java @@ -0,0 +1,5 @@ +package com.xuqm.im.cluster; + +import com.xuqm.im.entity.ImMessageEntity; + +public record ClusterMessage(String destination, ImMessageEntity message) {} diff --git a/im-service/src/main/java/com/xuqm/im/cluster/ImClusterListener.java b/im-service/src/main/java/com/xuqm/im/cluster/ImClusterListener.java new file mode 100644 index 0000000..cf22bae --- /dev/null +++ b/im-service/src/main/java/com/xuqm/im/cluster/ImClusterListener.java @@ -0,0 +1,33 @@ +package com.xuqm.im.cluster; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.messaging.simp.SimpMessagingTemplate; +import org.springframework.stereotype.Component; + +@Component +public class ImClusterListener implements MessageListener { + + private static final Logger log = LoggerFactory.getLogger(ImClusterListener.class); + + private final SimpMessagingTemplate messagingTemplate; + private final ObjectMapper objectMapper; + + public ImClusterListener(SimpMessagingTemplate messagingTemplate, ObjectMapper objectMapper) { + this.messagingTemplate = messagingTemplate; + this.objectMapper = objectMapper; + } + + @Override + public void onMessage(Message message, byte[] pattern) { + try { + ClusterMessage envelope = objectMapper.readValue(message.getBody(), ClusterMessage.class); + messagingTemplate.convertAndSend(envelope.destination(), envelope.message()); + } catch (Exception e) { + log.error("Failed to process cluster message from Redis", e); + } + } +} diff --git a/im-service/src/main/java/com/xuqm/im/cluster/ImClusterPublisher.java b/im-service/src/main/java/com/xuqm/im/cluster/ImClusterPublisher.java new file mode 100644 index 0000000..6b4c546 --- /dev/null +++ b/im-service/src/main/java/com/xuqm/im/cluster/ImClusterPublisher.java @@ -0,0 +1,32 @@ +package com.xuqm.im.cluster; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.xuqm.im.entity.ImMessageEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Service; + +@Service +public class ImClusterPublisher { + + private static final String CHANNEL = "im:broadcast"; + private static final Logger log = LoggerFactory.getLogger(ImClusterPublisher.class); + + private final StringRedisTemplate redisTemplate; + private final ObjectMapper objectMapper; + + public ImClusterPublisher(StringRedisTemplate redisTemplate, ObjectMapper objectMapper) { + this.redisTemplate = redisTemplate; + this.objectMapper = objectMapper; + } + + public void publish(String destination, ImMessageEntity message) { + try { + String json = objectMapper.writeValueAsString(new ClusterMessage(destination, message)); + redisTemplate.convertAndSend(CHANNEL, json); + } catch (Exception e) { + log.error("Failed to publish cluster message to Redis", e); + } + } +} diff --git a/im-service/src/main/java/com/xuqm/im/config/RedisClusterConfig.java b/im-service/src/main/java/com/xuqm/im/config/RedisClusterConfig.java new file mode 100644 index 0000000..11d3463 --- /dev/null +++ b/im-service/src/main/java/com/xuqm/im/config/RedisClusterConfig.java @@ -0,0 +1,24 @@ +package com.xuqm.im.config; + +import com.xuqm.im.cluster.ImClusterListener; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.listener.ChannelTopic; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; + +@Configuration +public class RedisClusterConfig { + + private static final String CHANNEL = "im:broadcast"; + + @Bean + public RedisMessageListenerContainer imClusterListenerContainer( + RedisConnectionFactory connectionFactory, + ImClusterListener listener) { + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(connectionFactory); + container.addMessageListener(listener, new ChannelTopic(CHANNEL)); + return container; + } +} diff --git a/im-service/src/main/java/com/xuqm/im/service/MessageService.java b/im-service/src/main/java/com/xuqm/im/service/MessageService.java index f4c3d88..9222254 100644 --- a/im-service/src/main/java/com/xuqm/im/service/MessageService.java +++ b/im-service/src/main/java/com/xuqm/im/service/MessageService.java @@ -2,6 +2,7 @@ package com.xuqm.im.service; import com.fasterxml.jackson.databind.ObjectMapper; import com.xuqm.common.exception.BusinessException; +import com.xuqm.im.cluster.ImClusterPublisher; import com.xuqm.im.entity.ImMessageEntity; import com.xuqm.im.entity.WebhookConfigEntity; import com.xuqm.im.model.SendMessageRequest; @@ -10,7 +11,6 @@ import com.xuqm.im.repository.WebhookConfigRepository; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; -import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; @@ -28,7 +28,7 @@ public class MessageService { private final ImMessageRepository messageRepository; private final WebhookConfigRepository webhookRepository; private final KeywordFilterService keywordFilterService; - private final SimpMessagingTemplate messagingTemplate; + private final ImClusterPublisher clusterPublisher; private final ObjectMapper objectMapper; @Value("${im.webhook-timeout-ms:3000}") @@ -37,12 +37,12 @@ public class MessageService { public MessageService(ImMessageRepository messageRepository, WebhookConfigRepository webhookRepository, KeywordFilterService keywordFilterService, - SimpMessagingTemplate messagingTemplate, + ImClusterPublisher clusterPublisher, ObjectMapper objectMapper) { this.messageRepository = messageRepository; this.webhookRepository = webhookRepository; this.keywordFilterService = keywordFilterService; - this.messagingTemplate = messagingTemplate; + this.clusterPublisher = clusterPublisher; this.objectMapper = objectMapper; } @@ -71,13 +71,12 @@ public class MessageService { String destination = req.chatType() == ImMessageEntity.ChatType.SINGLE ? "/user/" + req.toId() + "/queue/messages" : "/topic/group/" + req.toId(); - messagingTemplate.convertAndSend(destination, message); + clusterPublisher.publish(destination, message); if (req.chatType() == ImMessageEntity.ChatType.SINGLE && !fromUserId.equals(req.toId())) { - messagingTemplate.convertAndSend("/user/" + fromUserId + "/queue/messages", message); + clusterPublisher.publish("/user/" + fromUserId + "/queue/messages", message); } dispatchWebhooks(appId, message); - return message; } @@ -93,13 +92,14 @@ public class MessageService { message.setStatus(ImMessageEntity.MsgStatus.REVOKED); message.setMsgType(ImMessageEntity.MsgType.REVOKED); ImMessageEntity saved = messageRepository.save(message); + if (saved.getChatType() == ImMessageEntity.ChatType.SINGLE) { - messagingTemplate.convertAndSend("/user/" + saved.getToId() + "/queue/messages", saved); + clusterPublisher.publish("/user/" + saved.getToId() + "/queue/messages", saved); if (!saved.getFromUserId().equals(saved.getToId())) { - messagingTemplate.convertAndSend("/user/" + saved.getFromUserId() + "/queue/messages", saved); + clusterPublisher.publish("/user/" + saved.getFromUserId() + "/queue/messages", saved); } } else { - messagingTemplate.convertAndSend("/topic/group/" + saved.getToId(), saved); + clusterPublisher.publish("/topic/group/" + saved.getToId(), saved); } return saved; }