package com.xuqm.im.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.xuqm.common.exception.BusinessException;
import com.xuqm.im.cluster.ImClusterPublisher;
import com.xuqm.im.entity.ImGroupEntity;
import com.xuqm.im.entity.ImMessageEntity;
import com.xuqm.im.entity.WebhookConfigEntity;
import com.xuqm.im.model.ConversationView;
import com.xuqm.im.model.SendMessageRequest;
import com.xuqm.im.repository.ImFriendRepository;
import com.xuqm.im.repository.ImMessageRepository;
import com.xuqm.im.repository.WebhookConfigRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;

/**
 * Application service for instant messages: sending, revoking, history queries, read receipts,
 * conversation summaries and webhook fan-out.
 *
 * <p>Messages are persisted through {@link ImMessageRepository} and pushed to subscribers through
 * {@link ImClusterPublisher} on {@code /user/{id}/queue/messages} (single chat) or
 * {@code /topic/group/{id}} (group chat).
 */
@Service
public class MessageService {

    private static final Logger log = LoggerFactory.getLogger(MessageService.class);

    private final ImMessageRepository messageRepository;
    private final WebhookConfigRepository webhookRepository;
    private final KeywordFilterService keywordFilterService;
    private final ImClusterPublisher clusterPublisher;
    private final ImGroupService groupService;
    private final BlacklistService blacklistService;
    private final ConversationStateService conversationStateService;
    private final ImPushBridgeClient pushBridgeClient;
    private final ImFeatureConfigClient featureConfigClient;
    private final ImFriendRepository friendRepository;
    private final ObjectMapper objectMapper;

    /** Shared client for webhook delivery; reused instead of building a new client per message. */
    private final HttpClient webhookHttpClient = HttpClient.newHttpClient();

    /** Upper bound, in milliseconds, for a single webhook request. */
    @Value("${im.webhook-timeout-ms:3000}")
    private int webhookTimeoutMs;

    public MessageService(ImMessageRepository messageRepository,
                          WebhookConfigRepository webhookRepository,
                          KeywordFilterService keywordFilterService,
                          ImClusterPublisher clusterPublisher,
                          ImGroupService groupService,
                          BlacklistService blacklistService,
                          ConversationStateService conversationStateService,
                          ImPushBridgeClient pushBridgeClient,
                          ImFeatureConfigClient featureConfigClient,
                          ImFriendRepository friendRepository,
                          ObjectMapper objectMapper) {
        this.messageRepository = messageRepository;
        this.webhookRepository = webhookRepository;
        this.keywordFilterService = keywordFilterService;
        this.clusterPublisher = clusterPublisher;
        this.groupService = groupService;
        this.blacklistService = blacklistService;
        this.conversationStateService = conversationStateService;
        this.pushBridgeClient = pushBridgeClient;
        this.featureConfigClient = featureConfigClient;
        this.friendRepository = friendRepository;
        this.objectMapper = objectMapper;
    }

    /**
     * Validates, stores and delivers a message.
     *
     * <p>Text content is passed through the keyword filter first. Group messages require the sender
     * to be an un-muted member of the group; single-chat messages are rejected when either side has
     * blocked the other, or when the two users are not friends and the app does not allow stranger
     * messages. After saving, the message is published to the recipient destination, echoed back to
     * the sender for single chats, hidden conversation flags are cleared, a push notification is
     * requested and webhooks are dispatched.
     *
     * @param appId      application the message belongs to
     * @param fromUserId sender id
     * @param req        message payload; a blank or missing {@code messageId} is replaced by a random UUID
     * @return the persisted message (with {@code groupReadCount} populated for group chats)
     * @throws BusinessException when the content is rejected by the filter or the sender is not
     *                           allowed to message the target
     */
    public ImMessageEntity send(String appId, String fromUserId, SendMessageRequest req) {
        String content = req.content();
        if (req.msgType() == ImMessageEntity.MsgType.TEXT) {
            content = keywordFilterService.filter(appId, content);
            if (content == null) {
                throw new BusinessException("消息包含违禁内容");
            }
        }

        ImGroupEntity group = null;
        if (req.chatType() == ImMessageEntity.ChatType.GROUP) {
            group = groupService.get(req.toId());
            if (!groupService.memberIds(group).contains(fromUserId)) {
                throw new BusinessException(403, "不在群内");
            }
            if (groupService.isMemberMuted(req.toId(), fromUserId)) {
                throw new BusinessException(403, "当前用户已被禁言");
            }
        } else if (blacklistService.isEitherBlocked(appId, fromUserId, req.toId())) {
            throw new BusinessException(403, "已被拉黑,无法发送消息");
        } else if (!isFriend(appId, fromUserId, req.toId())
                && !featureConfigClient.allowStrangerMessage(appId)) {
            throw new BusinessException(403, "仅允许好友之间发送消息");
        }

        ImMessageEntity message = new ImMessageEntity();
        message.setId(req.messageId() != null && !req.messageId().isBlank()
                ? req.messageId()
                : UUID.randomUUID().toString());
        message.setAppId(appId);
        message.setFromUserId(fromUserId);
        message.setToId(req.toId());
        message.setChatType(req.chatType());
        message.setMsgType(req.msgType());
        message.setContent(content);
        message.setStatus(ImMessageEntity.MsgStatus.SENT);
        message.setMentionedUserIds(req.mentionedUserIds());
        message.setCreatedAt(LocalDateTime.now());
        ImMessageEntity saved = messageRepository.save(message);

        if (req.chatType() == ImMessageEntity.ChatType.GROUP) {
            saved.setGroupReadCount(
                    groupReadCount(appId, req.toId(), saved.getCreatedAt(), saved.getFromUserId()));
        }

        String destination = req.chatType() == ImMessageEntity.ChatType.SINGLE
                ? "/user/" + req.toId() + "/queue/messages"
                : "/topic/group/" + req.toId();
        log.debug("send message appId={} from={} to={} chatType={} msgType={} destination={}",
                appId, fromUserId, req.toId(), req.chatType(), req.msgType(), destination);
        clusterPublisher.publish(destination, saved);

        if (req.chatType() == ImMessageEntity.ChatType.SINGLE && !fromUserId.equals(req.toId())) {
            // The sender is not subscribed to the recipient's queue, so echo the message back.
            log.debug("echo message back to sender appId={} from={} to={}", appId, fromUserId, req.toId());
            clusterPublisher.publish("/user/" + fromUserId + "/queue/messages", saved);
            conversationStateService.clearHiddenForUsers(
                    appId, req.toId(), req.chatType().name(), List.of(fromUserId, req.toId()));
            pushBridgeClient.notifyUsers(
                    appId,
                    List.of(req.toId()),
                    "新消息",
                    saved.getContent(),
                    buildPushPayload(saved)
            );
        } else if (req.chatType() == ImMessageEntity.ChatType.GROUP) {
            List<String> memberIds = groupService.memberIds(group);
            conversationStateService.clearHiddenForUsers(appId, req.toId(), req.chatType().name(), memberIds);
            pushBridgeClient.notifyUsers(
                    appId,
                    memberIds.stream()
                            .filter(memberId -> !memberId.equals(fromUserId))
                            .toList(),
                    "群聊消息",
                    saved.getContent(),
                    buildPushPayload(saved)
            );
        }

        dispatchWebhooks(appId, saved);
        return saved;
    }

    /** Returns {@code true} when a friend record exists in either direction between the two users. */
    private boolean isFriend(String appId, String userId, String friendId) {
        return friendRepository.existsByAppIdAndUserIdAndFriendId(appId, userId, friendId)
                || friendRepository.existsByAppIdAndUserIdAndFriendId(appId, friendId, userId);
    }

    /**
     * Revokes a message on behalf of its sender and broadcasts the revoked message.
     *
     * @throws BusinessException 404 when the message does not exist, 403 when it belongs to another
     *                           app or was not sent by {@code requestUserId}
     */
    public ImMessageEntity revoke(String appId, String messageId, String requestUserId) {
        ImMessageEntity message = findMessageInApp(appId, messageId);
        if (!message.getFromUserId().equals(requestUserId)) {
            throw new BusinessException(403, "只能撤回自己发送的消息");
        }
        return markRevokedAndBroadcast(message, "revoke");
    }

    /**
     * Revokes any message of the given app (no sender check) and broadcasts the revoked message.
     *
     * @throws BusinessException 404 when the message does not exist, 403 when it belongs to another app
     */
    public ImMessageEntity adminRevoke(String appId, String messageId) {
        return markRevokedAndBroadcast(findMessageInApp(appId, messageId), "admin revoke");
    }

    /** Loads a message by id and verifies that it belongs to {@code appId}. */
    private ImMessageEntity findMessageInApp(String appId, String messageId) {
        ImMessageEntity message = messageRepository.findById(messageId)
                .orElseThrow(() -> new BusinessException(404, "消息不存在"));
        if (!message.getAppId().equals(appId)) {
            throw new BusinessException(403, "无权操作");
        }
        return message;
    }

    /** Marks the message as revoked, saves it and publishes it to every party of the conversation. */
    private ImMessageEntity markRevokedAndBroadcast(ImMessageEntity message, String logLabel) {
        message.setStatus(ImMessageEntity.MsgStatus.REVOKED);
        message.setMsgType(ImMessageEntity.MsgType.REVOKED);
        ImMessageEntity saved = messageRepository.save(message);

        if (saved.getChatType() == ImMessageEntity.ChatType.SINGLE) {
            log.debug("{} single messageId={} destinationTo={} destinationFrom={}",
                    logLabel, saved.getId(), saved.getToId(), saved.getFromUserId());
            clusterPublisher.publish("/user/" + saved.getToId() + "/queue/messages", saved);
            // Avoid a duplicate publish when a user revoked a message sent to themselves.
            if (!saved.getFromUserId().equals(saved.getToId())) {
                clusterPublisher.publish("/user/" + saved.getFromUserId() + "/queue/messages", saved);
            }
        } else {
            log.debug("{} group messageId={} groupId={}", logLabel, saved.getId(), saved.getToId());
            clusterPublisher.publish("/topic/group/" + saved.getToId(), saved);
        }
        return saved;
    }

    /** Returns one page of the filtered single-chat history between {@code userId} and {@code toId}. */
    public Page<ImMessageEntity> history(
            String appId,
            String userId,
            String toId,
            ImMessageEntity.MsgType msgType,
            String keyword,
            LocalDateTime startTime,
            LocalDateTime endTime,
            int page,
            int size) {
        return messageRepository.findSingleConversationFiltered(
                appId, userId, toId, msgType, keyword, startTime, endTime, PageRequest.of(page, size));
    }

    /**
     * Returns one page of the filtered history of a group, with {@code groupReadCount} populated
     * on every message.
     *
     * @throws BusinessException 403 when {@code userId} is not a member of the group
     */
    public Page<ImMessageEntity> groupHistory(
            String appId,
            String groupId,
            String userId,
            ImMessageEntity.MsgType msgType,
            String keyword,
            LocalDateTime startTime,
            LocalDateTime endTime,
            int page,
            int size) {
        ImGroupEntity group = groupService.get(groupId);
        List<String> memberIds = groupService.memberIds(group);
        if (!memberIds.contains(userId)) {
            throw new BusinessException(403, "不在群内");
        }
        Page<ImMessageEntity> pageResult = messageRepository.findGroupHistoryFiltered(
                appId, groupId, msgType, keyword, startTime, endTime, PageRequest.of(page, size));
        if (pageResult.hasContent()) {
            applyGroupReadCounts(appId, groupId, memberIds, pageResult);
        }
        return pageResult;
    }

    /**
     * Marks every message sent by {@code peerId} to {@code readerId} up to {@code readAt} as read
     * and publishes each updated message to the peer. Ignored for non-single chats and for
     * self-conversations.
     */
    public void syncReadReceipt(String appId, String readerId, String peerId, String chatType, LocalDateTime readAt) {
        if (!ImMessageEntity.ChatType.SINGLE.name().equals(chatType) || readerId.equals(peerId)) {
            return;
        }
        List<ImMessageEntity> messages = messageRepository
                .findByAppIdAndFromUserIdAndToIdAndCreatedAtLessThanEqualOrderByCreatedAtAsc(
                        appId, peerId, readerId, readAt);
        for (ImMessageEntity message : messages) {
            if (message.getStatus() == ImMessageEntity.MsgStatus.READ) {
                continue;
            }
            message.setStatus(ImMessageEntity.MsgStatus.READ);
            ImMessageEntity saved = messageRepository.save(message);
            clusterPublisher.publish("/user/" + peerId + "/queue/messages", saved);
        }
    }

    /**
     * Recomputes {@code groupReadCount} for every group message created up to {@code readAt} and
     * republishes each one on the group topic. Ignored when {@code readerId} is not a group member.
     */
    public void syncGroupReadReceipt(String appId, String readerId, String groupId, LocalDateTime readAt) {
        ImGroupEntity group = groupService.get(groupId);
        List<String> memberIds = groupService.memberIds(group);
        if (!memberIds.contains(readerId)) {
            return;
        }
        List<ImMessageEntity> messages = messageRepository
                .findByAppIdAndToIdAndChatTypeAndCreatedAtLessThanEqualOrderByCreatedAtAsc(
                        appId, groupId, ImMessageEntity.ChatType.GROUP, readAt);
        if (messages.isEmpty()) {
            return;
        }
        // Member read markers are loaded once for the whole batch instead of once per message.
        Map<String, LocalDateTime> readMarks = loadGroupReadMarks(appId, groupId, memberIds);
        for (ImMessageEntity message : messages) {
            message.setGroupReadCount(
                    countReaders(readMarks, message.getCreatedAt(), message.getFromUserId()));
            clusterPublisher.publish("/topic/group/" + groupId, message);
        }
    }

    /** Admin variant of {@link #history}: filtered single-chat history between two arbitrary users. */
    public Page<ImMessageEntity> adminHistory(
            String appId,
            String userA,
            String userB,
            ImMessageEntity.MsgType msgType,
            String keyword,
            LocalDateTime startTime,
            LocalDateTime endTime,
            int page,
            int size) {
        return messageRepository.findSingleConversationFiltered(
                appId, userA, userB, msgType, keyword, startTime, endTime, PageRequest.of(page, size));
    }

    /** Admin variant of {@link #groupHistory}: no membership check is performed. */
    public Page<ImMessageEntity> adminGroupHistory(
            String appId,
            String groupId,
            ImMessageEntity.MsgType msgType,
            String keyword,
            LocalDateTime startTime,
            LocalDateTime endTime,
            int page,
            int size) {
        Page<ImMessageEntity> pageResult = messageRepository.findGroupHistoryFiltered(
                appId, groupId, msgType, keyword, startTime, endTime, PageRequest.of(page, size));
        // The group is only looked up when there is something to annotate, so an empty page never
        // triggers a group lookup (same as before).
        if (pageResult.hasContent()) {
            ImGroupEntity group = groupService.get(groupId);
            applyGroupReadCounts(appId, groupId, groupService.memberIds(group), pageResult);
        }
        return pageResult;
    }

    /** Returns up to {@code size} raw conversation summaries for the user. */
    public List<ImMessageRepository.ConversationSummary> conversations(String appId, String userId, int size) {
        return messageRepository.findConversations(appId, userId, size);
    }

    /**
     * Returns up to {@code size} conversation views for the user, skipping conversations the user
     * has hidden.
     */
    public List<ConversationView> conversationViews(String appId, String userId, int size) {
        // Over-fetch because hidden conversations are dropped below; Math.max keeps the original
        // size if the multiplication overflows.
        int fetchSize = Math.max(size * 3, size);
        return messageRepository.findConversations(appId, userId, fetchSize).stream()
                .map(summary -> toConversationView(appId, userId, summary))
                .filter(Objects::nonNull)
                .limit(size)
                .toList();
    }

    /**
     * Builds the view for one conversation summary.
     *
     * @return the view, or {@code null} when the conversation is hidden for this user
     */
    private ConversationView toConversationView(
            String appId,
            String userId,
            ImMessageRepository.ConversationSummary summary
    ) {
        String targetId = summary.getTargetId();
        String chatType = summary.getChatType();
        var state = conversationStateService.find(appId, userId, targetId, chatType);
        if (state != null && state.isHidden()) {
            return null;
        }

        // Constant-first comparison: a null chat type falls through to the single-chat branch
        // instead of throwing.
        boolean groupChat = ImMessageEntity.ChatType.GROUP.name().equals(chatType);

        Page<ImMessageEntity> page = groupChat
                ? messageRepository.findGroupHistory(appId, targetId, PageRequest.of(0, 1))
                : messageRepository.findSingleConversation(appId, userId, targetId, PageRequest.of(0, 1));
        ImMessageEntity lastMessage = page.getContent().stream().findFirst().orElse(null);

        LocalDateTime lastReadAt = state == null ? null : state.getLastReadAt();
        long unreadCount = groupChat
                ? messageRepository.countUnreadGroupConversation(appId, userId, targetId, lastReadAt)
                : messageRepository.countUnreadSingleConversation(appId, userId, targetId, lastReadAt);

        return new ConversationView(
                targetId,
                chatType,
                lastMessage != null ? conversationPreview(lastMessage) : null,
                lastMessage != null ? lastMessage.getMsgType().name() : null,
                toEpochMillis(lastMessage != null ? lastMessage.getCreatedAt() : summary.getLastTime()),
                // Clamp rather than wrap around if the count ever exceeds the int range.
                (int) Math.min(unreadCount, Integer.MAX_VALUE),
                state != null && state.isMuted(),
                state != null && state.isPinned()
        );
    }

    /**
     * Converts a timestamp to epoch milliseconds, treating it as UTC; {@code null} maps to 0.
     *
     * <p>NOTE(review): {@link #send} stamps messages with {@code LocalDateTime.now()}, which uses
     * the JVM default zone. The two only agree when the server runs in UTC — confirm.
     */
    private long toEpochMillis(LocalDateTime time) {
        return time == null ? 0L : time.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    /** Serializes the routing fields of a message as JSON; falls back to {@code "{}"} on failure. */
    private String buildPushPayload(ImMessageEntity message) {
        try {
            return objectMapper.writeValueAsString(Map.of(
                    "messageId", message.getId(),
                    "appId", message.getAppId(),
                    "fromUserId", message.getFromUserId(),
                    "toId", message.getToId(),
                    "chatType", message.getChatType().name(),
                    "msgType", message.getMsgType().name()
            ));
        } catch (Exception e) {
            // Map.of rejects null values, so an incomplete message also ends up here.
            log.warn("failed to build push payload messageId={}", message.getId(), e);
            return "{}";
        }
    }

    /** Produces the short text shown for a message in the conversation list. */
    private String conversationPreview(ImMessageEntity message) {
        String content = message.getContent();
        return switch (message.getMsgType()) {
            case TEXT -> extractJsonField(content, "text").orElse(content);
            case IMAGE -> "[图片]";
            case AUDIO -> "[语音]";
            case VIDEO -> "[视频]";
            case FILE -> "[文件]" + extractJsonField(content, "name").map(name -> " " + name).orElse("");
            case LOCATION -> "[位置]";
            case CUSTOM -> "[自定义]";
            case RICH_TEXT -> "[富文本]";
            case CALL_AUDIO -> "[语音通话]";
            case CALL_VIDEO -> "[视频通话]";
            case FORWARD -> "[转发]";
            case QUOTE -> extractJsonField(content, "text").orElse("[引用]");
            case MERGE -> extractJsonField(content, "title").orElse("[合并转发]");
            case REVOKED -> "[消息已撤回]";
            case NOTIFY -> extractJsonField(content, "content").orElse("[通知]");
            default -> content;
        };
    }

    /**
     * Counts how many group members have read a message: the sender always counts, every other
     * member counts when their last-read time is not before {@code createdAt}. Never less than 1.
     */
    private int groupReadCount(String appId, String groupId, LocalDateTime createdAt, String senderId) {
        ImGroupEntity group = groupService.get(groupId);
        Map<String, LocalDateTime> readMarks = loadGroupReadMarks(appId, groupId, groupService.memberIds(group));
        return countReaders(readMarks, createdAt, senderId);
    }

    /** Sets {@code groupReadCount} on each message using a single snapshot of member read markers. */
    private void applyGroupReadCounts(
            String appId, String groupId, List<String> memberIds, Iterable<ImMessageEntity> messages) {
        Map<String, LocalDateTime> readMarks = loadGroupReadMarks(appId, groupId, memberIds);
        for (ImMessageEntity message : messages) {
            message.setGroupReadCount(
                    countReaders(readMarks, message.getCreatedAt(), message.getFromUserId()));
        }
    }

    /** Loads each member's last-read time for the group; members without one map to {@code null}. */
    private Map<String, LocalDateTime> loadGroupReadMarks(String appId, String groupId, List<String> memberIds) {
        Map<String, LocalDateTime> readMarks = new HashMap<>();
        for (String memberId : memberIds) {
            var state = conversationStateService.find(
                    appId, memberId, groupId, ImMessageEntity.ChatType.GROUP.name());
            readMarks.put(memberId, state == null ? null : state.getLastReadAt());
        }
        return readMarks;
    }

    /** Applies the read-count rule of {@link #groupReadCount} to a preloaded marker snapshot. */
    private int countReaders(Map<String, LocalDateTime> readMarks, LocalDateTime createdAt, String senderId) {
        int count = 0;
        for (Map.Entry<String, LocalDateTime> entry : readMarks.entrySet()) {
            if (entry.getKey().equals(senderId)) {
                count += 1;
                continue;
            }
            LocalDateTime lastReadAt = entry.getValue();
            if (lastReadAt != null && !lastReadAt.isBefore(createdAt)) {
                count += 1;
            }
        }
        return Math.max(count, 1);
    }

    /**
     * Reads a non-blank top-level field from JSON content.
     *
     * @return the field text, or empty when the content is not JSON or the field is missing/blank
     */
    private Optional<String> extractJsonField(String content, String field) {
        try {
            var node = objectMapper.readTree(content);
            if (node.hasNonNull(field)) {
                String value = node.get(field).asText();
                if (!value.isBlank()) {
                    return Optional.of(value);
                }
            }
        } catch (Exception ignored) {
            // Content is frequently plain text rather than JSON; callers fall back to a default.
        }
        return Optional.empty();
    }

    /**
     * Posts the message as JSON to every enabled webhook of the app. Delivery is best-effort:
     * a failing webhook is logged and does not affect the others or the caller.
     *
     * <p>NOTE(review): this method is invoked from {@link #send} on {@code this}; with Spring's
     * default proxy-based {@code @Async} such self-invocations are not intercepted, so this likely
     * runs on the caller's thread — confirm, and consider moving it to a separate bean.
     */
    @Async
    protected void dispatchWebhooks(String appId, ImMessageEntity message) {
        List<WebhookConfigEntity> webhooks = webhookRepository.findByAppIdAndEnabledTrue(appId);
        if (webhooks.isEmpty()) {
            return;
        }

        final String body;
        try {
            body = objectMapper.writeValueAsString(message);
        } catch (Exception e) {
            log.warn("failed to serialize message for webhooks appId={} messageId={}",
                    appId, message.getId(), e);
            return;
        }

        // Duration must be positive; guard against a zero/negative configuration value.
        Duration timeout = Duration.ofMillis(Math.max(webhookTimeoutMs, 1));
        for (WebhookConfigEntity webhook : webhooks) {
            try {
                HttpRequest request = HttpRequest.newBuilder()
                        .uri(URI.create(webhook.getUrl()))
                        .timeout(timeout)
                        .header("Content-Type", "application/json")
                        .POST(HttpRequest.BodyPublishers.ofString(body))
                        .build();
                HttpResponse<Void> response =
                        webhookHttpClient.send(request, HttpResponse.BodyHandlers.discarding());
                if (response.statusCode() >= 400) {
                    log.warn("webhook rejected message appId={} url={} messageId={} status={}",
                            appId, webhook.getUrl(), message.getId(), response.statusCode());
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt for the owning thread and stop delivering.
                Thread.currentThread().interrupt();
                log.warn("webhook dispatch interrupted appId={} messageId={}", appId, message.getId());
                return;
            } catch (Exception e) {
                log.warn("webhook delivery failed appId={} url={} messageId={}",
                        appId, webhook.getUrl(), message.getId(), e);
            }
        }
    }
}