feat(im): Redis Pub/Sub 集群广播,支持水平扩展
架构变更: - 新增 cluster/ClusterMessage.java:消息信封 record - 新增 cluster/ImClusterPublisher.java:发布到 Redis im:broadcast 频道 - 新增 cluster/ImClusterListener.java:订阅 Redis,转发给本地 WS 会话 - 新增 config/RedisClusterConfig.java:注册 Redis 监听容器 - 修改 MessageService:send/revoke 改用 ImClusterPublisher 替代直接推送 集群路由原理:任意节点收到消息 → 写 DB → 发布 Redis → 所有节点监听并尝试推送 → 只有持有目标 session 的节点实际送达 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
这个提交包含在:
父节点
94aa4001d3
当前提交
37f34876be
@ -0,0 +1,5 @@
|
|||||||
|
package com.xuqm.im.cluster;
|
||||||
|
|
||||||
|
import com.xuqm.im.entity.ImMessageEntity;
|
||||||
|
|
||||||
|
public record ClusterMessage(String destination, ImMessageEntity message) {}
|
||||||
@ -0,0 +1,33 @@
|
|||||||
|
package com.xuqm.im.cluster;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.data.redis.connection.Message;
|
||||||
|
import org.springframework.data.redis.connection.MessageListener;
|
||||||
|
import org.springframework.messaging.simp.SimpMessagingTemplate;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
@Component
|
||||||
|
public class ImClusterListener implements MessageListener {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(ImClusterListener.class);
|
||||||
|
|
||||||
|
private final SimpMessagingTemplate messagingTemplate;
|
||||||
|
private final ObjectMapper objectMapper;
|
||||||
|
|
||||||
|
public ImClusterListener(SimpMessagingTemplate messagingTemplate, ObjectMapper objectMapper) {
|
||||||
|
this.messagingTemplate = messagingTemplate;
|
||||||
|
this.objectMapper = objectMapper;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onMessage(Message message, byte[] pattern) {
|
||||||
|
try {
|
||||||
|
ClusterMessage envelope = objectMapper.readValue(message.getBody(), ClusterMessage.class);
|
||||||
|
messagingTemplate.convertAndSend(envelope.destination(), envelope.message());
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Failed to process cluster message from Redis", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,32 @@
|
|||||||
|
package com.xuqm.im.cluster;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import com.xuqm.im.entity.ImMessageEntity;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
public class ImClusterPublisher {
|
||||||
|
|
||||||
|
private static final String CHANNEL = "im:broadcast";
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(ImClusterPublisher.class);
|
||||||
|
|
||||||
|
private final StringRedisTemplate redisTemplate;
|
||||||
|
private final ObjectMapper objectMapper;
|
||||||
|
|
||||||
|
public ImClusterPublisher(StringRedisTemplate redisTemplate, ObjectMapper objectMapper) {
|
||||||
|
this.redisTemplate = redisTemplate;
|
||||||
|
this.objectMapper = objectMapper;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void publish(String destination, ImMessageEntity message) {
|
||||||
|
try {
|
||||||
|
String json = objectMapper.writeValueAsString(new ClusterMessage(destination, message));
|
||||||
|
redisTemplate.convertAndSend(CHANNEL, json);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Failed to publish cluster message to Redis", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -0,0 +1,24 @@
|
|||||||
|
package com.xuqm.im.config;
|
||||||
|
|
||||||
|
import com.xuqm.im.cluster.ImClusterListener;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||||
|
import org.springframework.data.redis.listener.ChannelTopic;
|
||||||
|
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
public class RedisClusterConfig {
|
||||||
|
|
||||||
|
private static final String CHANNEL = "im:broadcast";
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public RedisMessageListenerContainer imClusterListenerContainer(
|
||||||
|
RedisConnectionFactory connectionFactory,
|
||||||
|
ImClusterListener listener) {
|
||||||
|
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
|
||||||
|
container.setConnectionFactory(connectionFactory);
|
||||||
|
container.addMessageListener(listener, new ChannelTopic(CHANNEL));
|
||||||
|
return container;
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -2,6 +2,7 @@ package com.xuqm.im.service;
|
|||||||
|
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.xuqm.common.exception.BusinessException;
|
import com.xuqm.common.exception.BusinessException;
|
||||||
|
import com.xuqm.im.cluster.ImClusterPublisher;
|
||||||
import com.xuqm.im.entity.ImMessageEntity;
|
import com.xuqm.im.entity.ImMessageEntity;
|
||||||
import com.xuqm.im.entity.WebhookConfigEntity;
|
import com.xuqm.im.entity.WebhookConfigEntity;
|
||||||
import com.xuqm.im.model.SendMessageRequest;
|
import com.xuqm.im.model.SendMessageRequest;
|
||||||
@ -10,7 +11,6 @@ import com.xuqm.im.repository.WebhookConfigRepository;
|
|||||||
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.data.domain.Page;
|
import org.springframework.data.domain.Page;
|
||||||
import org.springframework.data.domain.PageRequest;
|
import org.springframework.data.domain.PageRequest;
|
||||||
import org.springframework.messaging.simp.SimpMessagingTemplate;
|
|
||||||
import org.springframework.scheduling.annotation.Async;
|
import org.springframework.scheduling.annotation.Async;
|
||||||
import org.springframework.stereotype.Service;
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
@ -28,7 +28,7 @@ public class MessageService {
|
|||||||
private final ImMessageRepository messageRepository;
|
private final ImMessageRepository messageRepository;
|
||||||
private final WebhookConfigRepository webhookRepository;
|
private final WebhookConfigRepository webhookRepository;
|
||||||
private final KeywordFilterService keywordFilterService;
|
private final KeywordFilterService keywordFilterService;
|
||||||
private final SimpMessagingTemplate messagingTemplate;
|
private final ImClusterPublisher clusterPublisher;
|
||||||
private final ObjectMapper objectMapper;
|
private final ObjectMapper objectMapper;
|
||||||
|
|
||||||
@Value("${im.webhook-timeout-ms:3000}")
|
@Value("${im.webhook-timeout-ms:3000}")
|
||||||
@ -37,12 +37,12 @@ public class MessageService {
|
|||||||
public MessageService(ImMessageRepository messageRepository,
|
public MessageService(ImMessageRepository messageRepository,
|
||||||
WebhookConfigRepository webhookRepository,
|
WebhookConfigRepository webhookRepository,
|
||||||
KeywordFilterService keywordFilterService,
|
KeywordFilterService keywordFilterService,
|
||||||
SimpMessagingTemplate messagingTemplate,
|
ImClusterPublisher clusterPublisher,
|
||||||
ObjectMapper objectMapper) {
|
ObjectMapper objectMapper) {
|
||||||
this.messageRepository = messageRepository;
|
this.messageRepository = messageRepository;
|
||||||
this.webhookRepository = webhookRepository;
|
this.webhookRepository = webhookRepository;
|
||||||
this.keywordFilterService = keywordFilterService;
|
this.keywordFilterService = keywordFilterService;
|
||||||
this.messagingTemplate = messagingTemplate;
|
this.clusterPublisher = clusterPublisher;
|
||||||
this.objectMapper = objectMapper;
|
this.objectMapper = objectMapper;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -71,13 +71,12 @@ public class MessageService {
|
|||||||
String destination = req.chatType() == ImMessageEntity.ChatType.SINGLE
|
String destination = req.chatType() == ImMessageEntity.ChatType.SINGLE
|
||||||
? "/user/" + req.toId() + "/queue/messages"
|
? "/user/" + req.toId() + "/queue/messages"
|
||||||
: "/topic/group/" + req.toId();
|
: "/topic/group/" + req.toId();
|
||||||
messagingTemplate.convertAndSend(destination, message);
|
clusterPublisher.publish(destination, message);
|
||||||
if (req.chatType() == ImMessageEntity.ChatType.SINGLE && !fromUserId.equals(req.toId())) {
|
if (req.chatType() == ImMessageEntity.ChatType.SINGLE && !fromUserId.equals(req.toId())) {
|
||||||
messagingTemplate.convertAndSend("/user/" + fromUserId + "/queue/messages", message);
|
clusterPublisher.publish("/user/" + fromUserId + "/queue/messages", message);
|
||||||
}
|
}
|
||||||
|
|
||||||
dispatchWebhooks(appId, message);
|
dispatchWebhooks(appId, message);
|
||||||
|
|
||||||
return message;
|
return message;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -93,13 +92,14 @@ public class MessageService {
|
|||||||
message.setStatus(ImMessageEntity.MsgStatus.REVOKED);
|
message.setStatus(ImMessageEntity.MsgStatus.REVOKED);
|
||||||
message.setMsgType(ImMessageEntity.MsgType.REVOKED);
|
message.setMsgType(ImMessageEntity.MsgType.REVOKED);
|
||||||
ImMessageEntity saved = messageRepository.save(message);
|
ImMessageEntity saved = messageRepository.save(message);
|
||||||
|
|
||||||
if (saved.getChatType() == ImMessageEntity.ChatType.SINGLE) {
|
if (saved.getChatType() == ImMessageEntity.ChatType.SINGLE) {
|
||||||
messagingTemplate.convertAndSend("/user/" + saved.getToId() + "/queue/messages", saved);
|
clusterPublisher.publish("/user/" + saved.getToId() + "/queue/messages", saved);
|
||||||
if (!saved.getFromUserId().equals(saved.getToId())) {
|
if (!saved.getFromUserId().equals(saved.getToId())) {
|
||||||
messagingTemplate.convertAndSend("/user/" + saved.getFromUserId() + "/queue/messages", saved);
|
clusterPublisher.publish("/user/" + saved.getFromUserId() + "/queue/messages", saved);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
messagingTemplate.convertAndSend("/topic/group/" + saved.getToId(), saved);
|
clusterPublisher.publish("/topic/group/" + saved.getToId(), saved);
|
||||||
}
|
}
|
||||||
return saved;
|
return saved;
|
||||||
}
|
}
|
||||||
|
|||||||
正在加载...
在新工单中引用
屏蔽一个用户