From 37f34876be545c852b63cd58d3216388c424c878 Mon Sep 17 00:00:00 2001 From: XuqmGroup Date: Fri, 24 Apr 2026 15:08:54 +0800 Subject: [PATCH] =?UTF-8?q?feat(im):=20Redis=20Pub/Sub=20=E9=9B=86?= =?UTF-8?q?=E7=BE=A4=E5=B9=BF=E6=92=AD=EF=BC=8C=E6=94=AF=E6=8C=81=E6=B0=B4?= =?UTF-8?q?=E5=B9=B3=E6=89=A9=E5=B1=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 架构变更: - 新增 cluster/ClusterMessage.java:消息信封 record - 新增 cluster/ImClusterPublisher.java:发布到 Redis im:broadcast 频道 - 新增 cluster/ImClusterListener.java:订阅 Redis,转发给本地 WS 会话 - 新增 config/RedisClusterConfig.java:注册 Redis 监听容器 - 修改 MessageService:send/revoke 改用 ImClusterPublisher 替代直接推送 集群路由原理:任意节点收到消息 → 写 DB → 发布 Redis → 所有节点监听并尝试推送 → 只有持有目标 session 的节点实际送达 Co-Authored-By: Claude Sonnet 4.6 --- .../com/xuqm/im/cluster/ClusterMessage.java | 5 +++ .../xuqm/im/cluster/ImClusterListener.java | 33 +++++++++++++++++++ .../xuqm/im/cluster/ImClusterPublisher.java | 32 ++++++++++++++++++ .../xuqm/im/config/RedisClusterConfig.java | 24 ++++++++++++++ .../com/xuqm/im/service/MessageService.java | 20 +++++------ 5 files changed, 104 insertions(+), 10 deletions(-) create mode 100644 im-service/src/main/java/com/xuqm/im/cluster/ClusterMessage.java create mode 100644 im-service/src/main/java/com/xuqm/im/cluster/ImClusterListener.java create mode 100644 im-service/src/main/java/com/xuqm/im/cluster/ImClusterPublisher.java create mode 100644 im-service/src/main/java/com/xuqm/im/config/RedisClusterConfig.java diff --git a/im-service/src/main/java/com/xuqm/im/cluster/ClusterMessage.java b/im-service/src/main/java/com/xuqm/im/cluster/ClusterMessage.java new file mode 100644 index 0000000..94b98aa --- /dev/null +++ b/im-service/src/main/java/com/xuqm/im/cluster/ClusterMessage.java @@ -0,0 +1,5 @@ +package com.xuqm.im.cluster; + +import com.xuqm.im.entity.ImMessageEntity; + +public record ClusterMessage(String destination, ImMessageEntity message) {} diff --git a/im-service/src/main/java/com/xuqm/im/cluster/ImClusterListener.java b/im-service/src/main/java/com/xuqm/im/cluster/ImClusterListener.java new file mode 100644 index 0000000..cf22bae --- /dev/null +++ b/im-service/src/main/java/com/xuqm/im/cluster/ImClusterListener.java @@ -0,0 +1,33 @@ +package com.xuqm.im.cluster; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.messaging.simp.SimpMessagingTemplate; +import org.springframework.stereotype.Component; + +@Component +public class ImClusterListener implements MessageListener { + + private static final Logger log = LoggerFactory.getLogger(ImClusterListener.class); + + private final SimpMessagingTemplate messagingTemplate; + private final ObjectMapper objectMapper; + + public ImClusterListener(SimpMessagingTemplate messagingTemplate, ObjectMapper objectMapper) { + this.messagingTemplate = messagingTemplate; + this.objectMapper = objectMapper; + } + + @Override + public void onMessage(Message message, byte[] pattern) { + try { + ClusterMessage envelope = objectMapper.readValue(message.getBody(), ClusterMessage.class); + messagingTemplate.convertAndSend(envelope.destination(), envelope.message()); + } catch (Exception e) { + log.error("Failed to process cluster message from Redis", e); + } + } +} diff --git a/im-service/src/main/java/com/xuqm/im/cluster/ImClusterPublisher.java b/im-service/src/main/java/com/xuqm/im/cluster/ImClusterPublisher.java new file mode 100644 index 0000000..6b4c546 --- /dev/null +++ b/im-service/src/main/java/com/xuqm/im/cluster/ImClusterPublisher.java @@ -0,0 +1,32 @@ +package com.xuqm.im.cluster; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.xuqm.im.entity.ImMessageEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.stereotype.Service; + +@Service +public class ImClusterPublisher { + + private static final String CHANNEL = "im:broadcast"; + private static final Logger log = LoggerFactory.getLogger(ImClusterPublisher.class); + + private final StringRedisTemplate redisTemplate; + private final ObjectMapper objectMapper; + + public ImClusterPublisher(StringRedisTemplate redisTemplate, ObjectMapper objectMapper) { + this.redisTemplate = redisTemplate; + this.objectMapper = objectMapper; + } + + public void publish(String destination, ImMessageEntity message) { + try { + String json = objectMapper.writeValueAsString(new ClusterMessage(destination, message)); + redisTemplate.convertAndSend(CHANNEL, json); + } catch (Exception e) { + log.error("Failed to publish cluster message to Redis", e); + } + } +} diff --git a/im-service/src/main/java/com/xuqm/im/config/RedisClusterConfig.java b/im-service/src/main/java/com/xuqm/im/config/RedisClusterConfig.java new file mode 100644 index 0000000..11d3463 --- /dev/null +++ b/im-service/src/main/java/com/xuqm/im/config/RedisClusterConfig.java @@ -0,0 +1,24 @@ +package com.xuqm.im.config; + +import com.xuqm.im.cluster.ImClusterListener; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.listener.ChannelTopic; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; + +@Configuration +public class RedisClusterConfig { + + private static final String CHANNEL = "im:broadcast"; + + @Bean + public RedisMessageListenerContainer imClusterListenerContainer( + RedisConnectionFactory connectionFactory, + ImClusterListener listener) { + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(connectionFactory); + container.addMessageListener(listener, new ChannelTopic(CHANNEL)); + return container; + } +} diff --git a/im-service/src/main/java/com/xuqm/im/service/MessageService.java b/im-service/src/main/java/com/xuqm/im/service/MessageService.java index f4c3d88..9222254 100644 --- a/im-service/src/main/java/com/xuqm/im/service/MessageService.java +++ b/im-service/src/main/java/com/xuqm/im/service/MessageService.java @@ -2,6 +2,7 @@ package com.xuqm.im.service; import com.fasterxml.jackson.databind.ObjectMapper; import com.xuqm.common.exception.BusinessException; +import com.xuqm.im.cluster.ImClusterPublisher; import com.xuqm.im.entity.ImMessageEntity; import com.xuqm.im.entity.WebhookConfigEntity; import com.xuqm.im.model.SendMessageRequest; @@ -10,7 +11,6 @@ import com.xuqm.im.repository.WebhookConfigRepository; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; -import org.springframework.messaging.simp.SimpMessagingTemplate; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; @@ -28,7 +28,7 @@ public class MessageService { private final ImMessageRepository messageRepository; private final WebhookConfigRepository webhookRepository; private final KeywordFilterService keywordFilterService; - private final SimpMessagingTemplate messagingTemplate; + private final ImClusterPublisher clusterPublisher; private final ObjectMapper objectMapper; @Value("${im.webhook-timeout-ms:3000}") @@ -37,12 +37,12 @@ public class MessageService { public MessageService(ImMessageRepository messageRepository, WebhookConfigRepository webhookRepository, KeywordFilterService keywordFilterService, - SimpMessagingTemplate messagingTemplate, + ImClusterPublisher clusterPublisher, ObjectMapper objectMapper) { this.messageRepository = messageRepository; this.webhookRepository = webhookRepository; this.keywordFilterService = keywordFilterService; - this.messagingTemplate = messagingTemplate; + this.clusterPublisher = clusterPublisher; this.objectMapper = objectMapper; } @@ -71,13 +71,12 @@ public class MessageService { String destination = req.chatType() == ImMessageEntity.ChatType.SINGLE ? "/user/" + req.toId() + "/queue/messages" : "/topic/group/" + req.toId(); - messagingTemplate.convertAndSend(destination, message); + clusterPublisher.publish(destination, message); if (req.chatType() == ImMessageEntity.ChatType.SINGLE && !fromUserId.equals(req.toId())) { - messagingTemplate.convertAndSend("/user/" + fromUserId + "/queue/messages", message); + clusterPublisher.publish("/user/" + fromUserId + "/queue/messages", message); } dispatchWebhooks(appId, message); - return message; } @@ -93,13 +92,14 @@ public class MessageService { message.setStatus(ImMessageEntity.MsgStatus.REVOKED); message.setMsgType(ImMessageEntity.MsgType.REVOKED); ImMessageEntity saved = messageRepository.save(message); + if (saved.getChatType() == ImMessageEntity.ChatType.SINGLE) { - messagingTemplate.convertAndSend("/user/" + saved.getToId() + "/queue/messages", saved); + clusterPublisher.publish("/user/" + saved.getToId() + "/queue/messages", saved); if (!saved.getFromUserId().equals(saved.getToId())) { - messagingTemplate.convertAndSend("/user/" + saved.getFromUserId() + "/queue/messages", saved); + clusterPublisher.publish("/user/" + saved.getFromUserId() + "/queue/messages", saved); } } else { - messagingTemplate.convertAndSend("/topic/group/" + saved.getToId(), saved); + clusterPublisher.publish("/topic/group/" + saved.getToId(), saved); } return saved; }