package com.xuqm.im.service; import com.fasterxml.jackson.databind.ObjectMapper; import com.xuqm.common.exception.BusinessException; import com.xuqm.im.cluster.ImClusterPublisher; import com.xuqm.im.entity.ImGroupEntity; import com.xuqm.im.entity.ImMessageEntity; import com.xuqm.im.model.ConversationView; import com.xuqm.im.model.EditMessageRequest; import com.xuqm.im.model.MessageReadCallbackPayload; import com.xuqm.im.model.SendMessageRequest; import com.xuqm.im.repository.ImFriendRepository; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; import org.springframework.stereotype.Service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.LocalDateTime; import java.time.ZoneOffset; import com.xuqm.im.repository.ImMessageRepository; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.UUID; @Service public class MessageService { private static final Logger log = LoggerFactory.getLogger(MessageService.class); private final ImMessageRepository messageRepository; private final KeywordFilterService keywordFilterService; private final GlobalMuteService globalMuteService; private final ImClusterPublisher clusterPublisher; private final ImGroupService groupService; private final BlacklistService blacklistService; private final ConversationStateService conversationStateService; private final ImPushBridgeClient pushBridgeClient; private final ImFeatureConfigClient featureConfigClient; private final ImFriendRepository friendRepository; private final WebhookDispatchService webhookDispatchService; private final ObjectMapper objectMapper; public MessageService(ImMessageRepository messageRepository, KeywordFilterService keywordFilterService, GlobalMuteService globalMuteService, ImClusterPublisher clusterPublisher, ImGroupService groupService, BlacklistService blacklistService, ConversationStateService conversationStateService, ImPushBridgeClient pushBridgeClient, ImFeatureConfigClient featureConfigClient, ImFriendRepository friendRepository, WebhookDispatchService webhookDispatchService, ObjectMapper objectMapper) { this.messageRepository = messageRepository; this.keywordFilterService = keywordFilterService; this.globalMuteService = globalMuteService; this.clusterPublisher = clusterPublisher; this.groupService = groupService; this.blacklistService = blacklistService; this.conversationStateService = conversationStateService; this.pushBridgeClient = pushBridgeClient; this.featureConfigClient = featureConfigClient; this.friendRepository = friendRepository; this.webhookDispatchService = webhookDispatchService; this.objectMapper = objectMapper; } public ImMessageEntity send(String appId, String fromUserId, SendMessageRequest req) { if (globalMuteService.isEnabled(appId)) { throw new BusinessException(403, "当前应用已开启全局禁言"); } String content = req.content(); if (req.msgType() == ImMessageEntity.MsgType.TEXT) { content = keywordFilterService.filter(appId, content); if (content == null) { throw new BusinessException("消息包含违禁内容"); } } ImGroupEntity group = null; boolean receiverBlocksSender = false; if (req.chatType() == ImMessageEntity.ChatType.GROUP) { group = groupService.get(req.toId()); if (!groupService.memberIds(group).contains(fromUserId)) { throw new BusinessException(403, "不在群内"); } if (groupService.isMemberMuted(req.toId(), fromUserId)) { throw new BusinessException(403, "当前用户已被禁言"); } } else if (!isFriend(appId, fromUserId, req.toId()) && !featureConfigClient.allowStrangerMessage(appId)) { throw new BusinessException(403, "仅允许好友之间发送消息"); } else { boolean senderBlocksReceiver = blacklistService.isBlocked(appId, fromUserId, req.toId()); receiverBlocksSender = blacklistService.isBlocked(appId, req.toId(), fromUserId); boolean blacklistSendSuccess = featureConfigClient.blacklistSendSuccess(appId); if (senderBlocksReceiver && receiverBlocksSender) { throw new BusinessException(403, "已被拉黑,无法发送消息"); } if (receiverBlocksSender && !blacklistSendSuccess) { throw new BusinessException(403, "已被拉黑,无法发送消息"); } } ImMessageEntity message = new ImMessageEntity(); message.setId(req.messageId() != null && !req.messageId().isBlank() ? req.messageId() : UUID.randomUUID().toString()); message.setAppId(appId); message.setFromUserId(fromUserId); message.setToId(req.toId()); message.setChatType(req.chatType()); message.setMsgType(req.msgType()); message.setContent(content); message.setStatus(ImMessageEntity.MsgStatus.SENT); message.setMentionedUserIds(req.mentionedUserIds()); message.setCreatedAt(LocalDateTime.now()); ImMessageEntity saved = messageRepository.save(message); if (req.chatType() == ImMessageEntity.ChatType.GROUP) { saved.setGroupReadCount(groupReadCount(appId, req.toId(), saved.getCreatedAt(), saved.getFromUserId())); } if (req.chatType() == ImMessageEntity.ChatType.SINGLE && !fromUserId.equals(req.toId())) { log.debug("echo message back to sender appId={} from={} to={}", appId, fromUserId, req.toId()); clusterPublisher.publish("/user/" + fromUserId + "/queue/messages", saved); if (!receiverBlocksSender) { log.debug("deliver message to receiver appId={} from={} to={}", appId, fromUserId, req.toId()); clusterPublisher.publish("/user/" + req.toId() + "/queue/messages", saved); conversationStateService.clearHiddenForUsers(appId, req.toId(), req.chatType().name(), List.of(fromUserId, req.toId())); pushBridgeClient.notifyUsers( appId, List.of(req.toId()), "新消息", saved.getContent(), buildPushPayload(saved) ); } else { conversationStateService.clearHiddenForUsers(appId, req.toId(), req.chatType().name(), List.of(fromUserId)); } } else if (req.chatType() == ImMessageEntity.ChatType.GROUP) { String destination = "/topic/group/" + req.toId(); log.debug("send message appId={} from={} to={} chatType={} msgType={} destination={}", appId, fromUserId, req.toId(), req.chatType(), req.msgType(), destination); clusterPublisher.publish(destination, saved); List memberIds = groupService.memberIds(group); conversationStateService.clearHiddenForUsers(appId, req.toId(), req.chatType().name(), memberIds); pushBridgeClient.notifyUsers( appId, memberIds.stream() .filter(memberId -> !memberId.equals(fromUserId)) .toList(), "群聊消息", saved.getContent(), buildPushPayload(saved) ); } else { String destination = "/user/" + req.toId() + "/queue/messages"; log.debug("send message appId={} from={} to={} chatType={} msgType={} destination={}", appId, fromUserId, req.toId(), req.chatType(), req.msgType(), destination); if (!receiverBlocksSender) { clusterPublisher.publish(destination, saved); } } dispatchWebhooks(appId, "message.sent", saved); return saved; } private boolean isFriend(String appId, String userId, String friendId) { return friendRepository.existsByAppIdAndUserIdAndFriendId(appId, userId, friendId) || friendRepository.existsByAppIdAndUserIdAndFriendId(appId, friendId, userId); } public ImMessageEntity revoke(String appId, String messageId, String requestUserId) { ImMessageEntity message = messageRepository.findById(messageId) .orElseThrow(() -> new BusinessException(404, "消息不存在")); if (!message.getAppId().equals(appId)) { throw new BusinessException(403, "无权操作"); } if (!message.getFromUserId().equals(requestUserId)) { throw new BusinessException(403, "只能撤回自己发送的消息"); } int recallMinutes = featureConfigClient.messageRecallMinutes(appId); if (recallMinutes > 0 && message.getCreatedAt().plusMinutes(recallMinutes).isBefore(LocalDateTime.now())) { throw new BusinessException(403, "已超过可撤回时长"); } message.setStatus(ImMessageEntity.MsgStatus.REVOKED); message.setMsgType(ImMessageEntity.MsgType.REVOKED); ImMessageEntity saved = messageRepository.save(message); if (saved.getChatType() == ImMessageEntity.ChatType.SINGLE) { log.debug("revoke single messageId={} destinationTo={} destinationFrom={}", saved.getId(), saved.getToId(), saved.getFromUserId()); clusterPublisher.publish("/user/" + saved.getToId() + "/queue/messages", saved); if (!saved.getFromUserId().equals(saved.getToId())) { clusterPublisher.publish("/user/" + saved.getFromUserId() + "/queue/messages", saved); } } else { log.debug("revoke group messageId={} groupId={}", saved.getId(), saved.getToId()); clusterPublisher.publish("/topic/group/" + saved.getToId(), saved); } dispatchWebhooks(appId, "message.revoked", saved); return saved; } public ImMessageEntity edit(String appId, String messageId, String requestUserId, EditMessageRequest req) { ImMessageEntity message = messageRepository.findById(messageId) .orElseThrow(() -> new BusinessException(404, "消息不存在")); if (!message.getAppId().equals(appId)) { throw new BusinessException(403, "无权操作"); } if (!message.getFromUserId().equals(requestUserId)) { throw new BusinessException(403, "只能编辑自己发送的消息"); } if (message.getStatus() == ImMessageEntity.MsgStatus.REVOKED || message.getMsgType() == ImMessageEntity.MsgType.REVOKED) { throw new BusinessException(400, "已撤回消息不能编辑"); } if (message.getMsgType() != ImMessageEntity.MsgType.TEXT) { throw new BusinessException(400, "仅支持编辑文本消息"); } String content = keywordFilterService.filter(appId, req.content()); if (content == null) { throw new BusinessException("消息包含违禁内容"); } message.setContent(content); message.setEditedAt(LocalDateTime.now()); ImMessageEntity saved = messageRepository.save(message); if (saved.getChatType() == ImMessageEntity.ChatType.SINGLE) { clusterPublisher.publish("/user/" + saved.getToId() + "/queue/messages", saved); if (!saved.getFromUserId().equals(saved.getToId())) { clusterPublisher.publish("/user/" + saved.getFromUserId() + "/queue/messages", saved); } pushBridgeClient.notifyUsers( appId, List.of(saved.getToId()), "消息已编辑", saved.getContent(), buildPushPayload(saved) ); } else { clusterPublisher.publish("/topic/group/" + saved.getToId(), saved); List memberIds = groupService.memberIds(groupService.get(saved.getToId())); pushBridgeClient.notifyUsers( appId, memberIds.stream() .filter(memberId -> !memberId.equals(saved.getFromUserId())) .toList(), "群消息已编辑", saved.getContent(), buildPushPayload(saved) ); } dispatchWebhooks(appId, "message.edited", saved); return saved; } public ImMessageEntity adminRevoke(String appId, String messageId) { ImMessageEntity message = messageRepository.findById(messageId) .orElseThrow(() -> new BusinessException(404, "消息不存在")); if (!message.getAppId().equals(appId)) { throw new BusinessException(403, "无权操作"); } message.setStatus(ImMessageEntity.MsgStatus.REVOKED); message.setMsgType(ImMessageEntity.MsgType.REVOKED); ImMessageEntity saved = messageRepository.save(message); if (saved.getChatType() == ImMessageEntity.ChatType.SINGLE) { log.debug("admin revoke single messageId={} destinationTo={} destinationFrom={}", saved.getId(), saved.getToId(), saved.getFromUserId()); clusterPublisher.publish("/user/" + saved.getToId() + "/queue/messages", saved); if (!saved.getFromUserId().equals(saved.getToId())) { clusterPublisher.publish("/user/" + saved.getFromUserId() + "/queue/messages", saved); } } else { log.debug("admin revoke group messageId={} groupId={}", saved.getId(), saved.getToId()); clusterPublisher.publish("/topic/group/" + saved.getToId(), saved); } dispatchWebhooks(appId, "message.revoked", saved); return saved; } public Page history( String appId, String userId, String toId, ImMessageEntity.MsgType msgType, String keyword, LocalDateTime startTime, LocalDateTime endTime, int page, int size) { LocalDateTime effectiveStart = applyHistoryRetention(appId, startTime); return messageRepository.findSingleConversationFiltered( appId, userId, toId, msgType, keyword, effectiveStart, endTime, PageRequest.of(page, size)); } public Page groupHistory( String appId, String groupId, String userId, ImMessageEntity.MsgType msgType, String keyword, LocalDateTime startTime, LocalDateTime endTime, int page, int size) { LocalDateTime effectiveStart = applyHistoryRetention(appId, startTime); ImGroupEntity group = groupService.get(groupId); if (!groupService.memberIds(group).contains(userId)) { throw new BusinessException(403, "不在群内"); } Page pageResult = messageRepository.findGroupHistoryFiltered( appId, groupId, msgType, keyword, effectiveStart, endTime, PageRequest.of(page, size)); pageResult.forEach(message -> message.setGroupReadCount( groupReadCount(appId, groupId, message.getCreatedAt(), message.getFromUserId()))); return pageResult; } public void syncReadReceipt(String appId, String readerId, String peerId, String chatType, LocalDateTime readAt) { if (!ImMessageEntity.ChatType.SINGLE.name().equals(chatType) || readerId.equals(peerId)) { return; } List messages = messageRepository .findByAppIdAndFromUserIdAndToIdAndCreatedAtLessThanEqualOrderByCreatedAtAsc( appId, peerId, readerId, readAt); if (messages.isEmpty()) { return; } List messageIds = new java.util.ArrayList<>(); for (ImMessageEntity message : messages) { if (message.getStatus() == ImMessageEntity.MsgStatus.READ) { messageIds.add(message.getId()); continue; } message.setStatus(ImMessageEntity.MsgStatus.READ); ImMessageEntity saved = messageRepository.save(message); clusterPublisher.publish("/user/" + peerId + "/queue/messages", saved); messageIds.add(saved.getId()); } dispatchWebhooks(appId, "message.read", new MessageReadCallbackPayload( appId, readerId, peerId, null, chatType, toEpochMillis(readAt), messageIds )); } public void syncGroupReadReceipt(String appId, String readerId, String groupId, LocalDateTime readAt) { ImGroupEntity group = groupService.get(groupId); if (!groupService.memberIds(group).contains(readerId)) { return; } List messages = messageRepository .findByAppIdAndToIdAndChatTypeAndCreatedAtLessThanEqualOrderByCreatedAtAsc( appId, groupId, ImMessageEntity.ChatType.GROUP, readAt); if (messages.isEmpty()) { return; } List messageIds = new java.util.ArrayList<>(); for (ImMessageEntity message : messages) { message.setGroupReadCount(groupReadCount(appId, groupId, message.getCreatedAt(), message.getFromUserId())); clusterPublisher.publish("/topic/group/" + groupId, message); messageIds.add(message.getId()); } dispatchWebhooks(appId, "message.read", new MessageReadCallbackPayload( appId, readerId, null, groupId, ImMessageEntity.ChatType.GROUP.name(), toEpochMillis(readAt), messageIds )); } public Page adminHistory( String appId, String userA, String userB, ImMessageEntity.MsgType msgType, String keyword, LocalDateTime startTime, LocalDateTime endTime, int page, int size) { LocalDateTime effectiveStart = applyHistoryRetention(appId, startTime); return messageRepository.findSingleConversationFiltered( appId, userA, userB, msgType, keyword, effectiveStart, endTime, PageRequest.of(page, size)); } public Page adminGroupHistory( String appId, String groupId, ImMessageEntity.MsgType msgType, String keyword, LocalDateTime startTime, LocalDateTime endTime, int page, int size) { LocalDateTime effectiveStart = applyHistoryRetention(appId, startTime); Page pageResult = messageRepository.findGroupHistoryFiltered( appId, groupId, msgType, keyword, effectiveStart, endTime, PageRequest.of(page, size)); pageResult.forEach(message -> message.setGroupReadCount( groupReadCount(appId, groupId, message.getCreatedAt(), message.getFromUserId()))); return pageResult; } public List conversations(String appId, String userId, int size) { return messageRepository.findConversations(appId, userId, normalizeConversationSize(appId, size)); } public List conversationViews(String appId, String userId, int size) { int cappedSize = normalizeConversationSize(appId, size); int fetchSize = Math.max(cappedSize * 3, cappedSize); return messageRepository.findConversations(appId, userId, fetchSize).stream() .map(summary -> toConversationView(appId, userId, summary)) .filter(Objects::nonNull) .limit(cappedSize) .toList(); } private LocalDateTime applyHistoryRetention(String appId, LocalDateTime requestedStart) { int retentionDays = featureConfigClient.historyRetentionDays(appId); LocalDateTime retentionStart = LocalDateTime.now().minusDays(retentionDays); if (requestedStart == null || requestedStart.isBefore(retentionStart)) { return retentionStart; } return requestedStart; } private int normalizeConversationSize(String appId, int requestedSize) { int limit = featureConfigClient.conversationPullLimit(appId); int safeRequested = Math.max(requestedSize, 1); return Math.min(safeRequested, limit); } private ConversationView toConversationView( String appId, String userId, ImMessageRepository.ConversationSummary summary ) { String targetId = summary.getTargetId(); String chatType = summary.getChatType(); var state = conversationStateService.find(appId, userId, targetId, chatType); if (state != null && state.isHidden()) { return null; } Page page = chatType.equals("GROUP") ? messageRepository.findGroupHistory(appId, targetId, PageRequest.of(0, 1)) : messageRepository.findSingleConversation(appId, userId, targetId, PageRequest.of(0, 1)); ImMessageEntity lastMessage = page.getContent().stream().findFirst().orElse(null); LocalDateTime lastReadAt = state == null ? null : state.getLastReadAt(); long unreadCount = chatType.equals("GROUP") ? messageRepository.countUnreadGroupConversation(appId, userId, targetId, lastReadAt) : messageRepository.countUnreadSingleConversation(appId, userId, targetId, lastReadAt); return new ConversationView( targetId, chatType, lastMessage != null ? conversationPreview(lastMessage) : null, lastMessage != null ? lastMessage.getMsgType().name() : null, toEpochMillis(lastMessage != null ? lastMessage.getCreatedAt() : summary.getLastTime()), (int) unreadCount, state != null && state.isMuted(), state != null && state.isPinned() ); } private long toEpochMillis(LocalDateTime time) { return time == null ? 0L : time.toInstant(ZoneOffset.UTC).toEpochMilli(); } private String buildPushPayload(ImMessageEntity message) { try { Map payload = new java.util.LinkedHashMap<>(); payload.put("messageId", message.getId()); payload.put("appId", message.getAppId()); payload.put("fromUserId", message.getFromUserId()); payload.put("toId", message.getToId()); payload.put("chatType", message.getChatType().name()); payload.put("msgType", message.getMsgType().name()); if (message.getEditedAt() != null) { payload.put("editedAt", message.getEditedAt().toInstant(ZoneOffset.UTC).toEpochMilli()); } return objectMapper.writeValueAsString(payload); } catch (Exception e) { return "{}"; } } private String conversationPreview(ImMessageEntity message) { String content = message.getContent(); return switch (message.getMsgType()) { case TEXT -> extractJsonField(content, "text").orElse(content); case IMAGE -> "[图片]"; case AUDIO -> "[语音]"; case VIDEO -> "[视频]"; case FILE -> "[文件]" + extractJsonField(content, "name").map(name -> " " + name).orElse(""); case LOCATION -> "[位置]"; case CUSTOM -> "[自定义]"; case RICH_TEXT -> "[富文本]"; case CALL_AUDIO -> "[语音通话]"; case CALL_VIDEO -> "[视频通话]"; case FORWARD -> "[转发]"; case QUOTE -> extractJsonField(content, "text").orElse("[引用]"); case MERGE -> extractJsonField(content, "title").orElse("[合并转发]"); case REVOKED -> "[消息已撤回]"; case NOTIFY -> extractJsonField(content, "content").orElse("[通知]"); default -> content; }; } private int groupReadCount(String appId, String groupId, LocalDateTime createdAt, String senderId) { ImGroupEntity group = groupService.get(groupId); int count = 0; for (String memberId : groupService.memberIds(group)) { if (memberId.equals(senderId)) { count += 1; continue; } var state = conversationStateService.find(appId, memberId, groupId, ImMessageEntity.ChatType.GROUP.name()); if (state != null && state.getLastReadAt() != null && !state.getLastReadAt().isBefore(createdAt)) { count += 1; } } return Math.max(count, 1); } private java.util.Optional extractJsonField(String content, String field) { try { var node = objectMapper.readTree(content); if (node.hasNonNull(field)) { String value = node.get(field).asText(); if (!value.isBlank()) { return java.util.Optional.of(value); } } } catch (Exception ignored) { } return java.util.Optional.empty(); } protected void dispatchWebhooks(String appId, String callbackEvent, ImMessageEntity message) { webhookDispatchService.dispatch(appId, "message", callbackEvent, message); } protected void dispatchWebhooks(String appId, String callbackEvent, Object payload) { webhookDispatchService.dispatch(appId, "message", callbackEvent, payload); } }