package com.xuqm.im.service; import com.fasterxml.jackson.databind.ObjectMapper; import com.xuqm.common.exception.BusinessException; import com.xuqm.im.cluster.ImClusterPublisher; import com.xuqm.im.entity.ImGroupEntity; import com.xuqm.im.entity.ImMessageEntity; import com.xuqm.im.entity.WebhookConfigEntity; import com.xuqm.im.model.ConversationView; import com.xuqm.im.model.SendMessageRequest; import com.xuqm.im.repository.WebhookConfigRepository; import org.springframework.beans.factory.annotation.Value; import org.springframework.data.domain.Page; import org.springframework.data.domain.PageRequest; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import java.net.URI; import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.time.LocalDateTime; import java.time.ZoneOffset; import com.xuqm.im.repository.ImMessageRepository; import java.util.List; import java.util.Objects; import java.util.UUID; @Service public class MessageService { private final ImMessageRepository messageRepository; private final WebhookConfigRepository webhookRepository; private final KeywordFilterService keywordFilterService; private final ImClusterPublisher clusterPublisher; private final ImGroupService groupService; private final BlacklistService blacklistService; private final ConversationStateService conversationStateService; private final ObjectMapper objectMapper; @Value("${im.webhook-timeout-ms:3000}") private int webhookTimeoutMs; public MessageService(ImMessageRepository messageRepository, WebhookConfigRepository webhookRepository, KeywordFilterService keywordFilterService, ImClusterPublisher clusterPublisher, ImGroupService groupService, BlacklistService blacklistService, ConversationStateService conversationStateService, ObjectMapper objectMapper) { this.messageRepository = messageRepository; this.webhookRepository = webhookRepository; this.keywordFilterService = keywordFilterService; this.clusterPublisher = clusterPublisher; this.groupService = groupService; this.blacklistService = blacklistService; this.conversationStateService = conversationStateService; this.objectMapper = objectMapper; } public ImMessageEntity send(String appId, String fromUserId, SendMessageRequest req) { String content = req.content(); if (req.msgType() == ImMessageEntity.MsgType.TEXT) { content = keywordFilterService.filter(appId, content); if (content == null) { throw new BusinessException("消息包含违禁内容"); } } ImGroupEntity group = null; if (req.chatType() == ImMessageEntity.ChatType.GROUP) { group = groupService.get(req.toId()); if (!groupService.memberIds(group).contains(fromUserId)) { throw new BusinessException(403, "不在群内"); } if (groupService.isMemberMuted(req.toId(), fromUserId)) { throw new BusinessException(403, "当前用户已被禁言"); } } else if (blacklistService.isEitherBlocked(appId, fromUserId, req.toId())) { throw new BusinessException(403, "已被拉黑,无法发送消息"); } ImMessageEntity message = new ImMessageEntity(); message.setId(UUID.randomUUID().toString()); message.setAppId(appId); message.setFromUserId(fromUserId); message.setToId(req.toId()); message.setChatType(req.chatType()); message.setMsgType(req.msgType()); message.setContent(content); message.setStatus(ImMessageEntity.MsgStatus.SENT); message.setMentionedUserIds(req.mentionedUserIds()); message.setCreatedAt(LocalDateTime.now()); messageRepository.save(message); String destination = req.chatType() == ImMessageEntity.ChatType.SINGLE ? "/user/" + req.toId() + "/queue/messages" : "/topic/group/" + req.toId(); clusterPublisher.publish(destination, message); if (req.chatType() == ImMessageEntity.ChatType.SINGLE && !fromUserId.equals(req.toId())) { clusterPublisher.publish("/user/" + fromUserId + "/queue/messages", message); conversationStateService.clearHiddenForUsers(appId, req.toId(), req.chatType().name(), List.of(fromUserId, req.toId())); } else if (req.chatType() == ImMessageEntity.ChatType.GROUP) { conversationStateService.clearHiddenForUsers(appId, req.toId(), req.chatType().name(), groupService.memberIds(group)); } dispatchWebhooks(appId, message); return message; } public ImMessageEntity revoke(String appId, String messageId, String requestUserId) { ImMessageEntity message = messageRepository.findById(messageId) .orElseThrow(() -> new BusinessException(404, "消息不存在")); if (!message.getAppId().equals(appId)) { throw new BusinessException(403, "无权操作"); } if (!message.getFromUserId().equals(requestUserId)) { throw new BusinessException(403, "只能撤回自己发送的消息"); } message.setStatus(ImMessageEntity.MsgStatus.REVOKED); message.setMsgType(ImMessageEntity.MsgType.REVOKED); ImMessageEntity saved = messageRepository.save(message); if (saved.getChatType() == ImMessageEntity.ChatType.SINGLE) { clusterPublisher.publish("/user/" + saved.getToId() + "/queue/messages", saved); if (!saved.getFromUserId().equals(saved.getToId())) { clusterPublisher.publish("/user/" + saved.getFromUserId() + "/queue/messages", saved); } } else { clusterPublisher.publish("/topic/group/" + saved.getToId(), saved); } return saved; } public Page history(String appId, String userId, String toId, int page, int size) { return messageRepository.findSingleConversation( appId, userId, toId, PageRequest.of(page, size)); } public Page groupHistory(String appId, String groupId, String userId, int page, int size) { ImGroupEntity group = groupService.get(groupId); if (!groupService.memberIds(group).contains(userId)) { throw new BusinessException(403, "不在群内"); } return messageRepository.findGroupHistory(appId, groupId, PageRequest.of(page, size)); } public List conversations(String appId, String userId, int size) { return messageRepository.findConversations(appId, userId, size); } public List conversationViews(String appId, String userId, int size) { int fetchSize = Math.max(size * 3, size); return messageRepository.findConversations(appId, userId, fetchSize).stream() .map(summary -> toConversationView(appId, userId, summary)) .filter(Objects::nonNull) .limit(size) .toList(); } private ConversationView toConversationView( String appId, String userId, ImMessageRepository.ConversationSummary summary ) { String targetId = summary.getTargetId(); String chatType = summary.getChatType(); var state = conversationStateService.find(appId, userId, targetId, chatType); if (state != null && state.isHidden()) { return null; } Page page = chatType.equals("GROUP") ? messageRepository.findGroupHistory(appId, targetId, PageRequest.of(0, 1)) : messageRepository.findSingleConversation(appId, userId, targetId, PageRequest.of(0, 1)); ImMessageEntity lastMessage = page.getContent().stream().findFirst().orElse(null); LocalDateTime lastReadAt = state == null ? null : state.getLastReadAt(); long unreadCount = chatType.equals("GROUP") ? messageRepository.countUnreadGroupConversation(appId, userId, targetId, lastReadAt) : messageRepository.countUnreadSingleConversation(appId, userId, targetId, lastReadAt); return new ConversationView( targetId, chatType, lastMessage != null ? lastMessage.getContent() : null, lastMessage != null ? lastMessage.getMsgType().name() : null, toEpochMillis(lastMessage != null ? lastMessage.getCreatedAt() : summary.getLastTime()), (int) unreadCount, state != null && state.isMuted(), state != null && state.isPinned() ); } private long toEpochMillis(LocalDateTime time) { return time == null ? 0L : time.toInstant(ZoneOffset.UTC).toEpochMilli(); } @Async protected void dispatchWebhooks(String appId, ImMessageEntity message) { List webhooks = webhookRepository.findByAppIdAndEnabledTrue(appId); if (webhooks.isEmpty()) return; try { String body = objectMapper.writeValueAsString(message); HttpClient client = HttpClient.newHttpClient(); for (WebhookConfigEntity webhook : webhooks) { try { HttpRequest request = HttpRequest.newBuilder() .uri(URI.create(webhook.getUrl())) .header("Content-Type", "application/json") .POST(HttpRequest.BodyPublishers.ofString(body)) .build(); client.send(request, HttpResponse.BodyHandlers.ofString()); } catch (Exception ignored) { } } } catch (Exception ignored) { } } }