package com.xuqm.im.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.xuqm.common.exception.BusinessException;
import com.xuqm.im.cluster.ImClusterPublisher;
import com.xuqm.im.entity.ImMessageEntity;
import com.xuqm.im.entity.WebhookConfigEntity;
import com.xuqm.im.model.SendMessageRequest;
import com.xuqm.im.repository.ImMessageRepository;
import com.xuqm.im.repository.WebhookConfigRepository;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.UUID;

/**
 * Persists chat messages, publishes them through the cluster publisher and
 * notifies the webhooks that are enabled for the owning application.
 */
@Service
public class MessageService {

    private static final System.Logger LOG = System.getLogger(MessageService.class.getName());

    private final ImMessageRepository messageRepository;
    private final WebhookConfigRepository webhookRepository;
    private final KeywordFilterService keywordFilterService;
    private final ImClusterPublisher clusterPublisher;
    private final ObjectMapper objectMapper;

    /** Shared client: {@link HttpClient} is thread-safe and meant to be reused across requests. */
    private final HttpClient httpClient = HttpClient.newHttpClient();

    /** Per-request webhook timeout in milliseconds; values &lt;= 0 disable the timeout. */
    @Value("${im.webhook-timeout-ms:3000}")
    private int webhookTimeoutMs;

    public MessageService(ImMessageRepository messageRepository,
                          WebhookConfigRepository webhookRepository,
                          KeywordFilterService keywordFilterService,
                          ImClusterPublisher clusterPublisher,
                          ObjectMapper objectMapper) {
        this.messageRepository = messageRepository;
        this.webhookRepository = webhookRepository;
        this.keywordFilterService = keywordFilterService;
        this.clusterPublisher = clusterPublisher;
        this.objectMapper = objectMapper;
    }

    /**
     * Stores a new message, publishes it to its destination and triggers the webhooks.
     *
     * <p>TEXT messages are passed through the keyword filter first; the filtered text is
     * what gets stored. For SINGLE chats the message is also published to the sender's own
     * queue unless the sender and the recipient are the same user.
     *
     * @param appId      application the message belongs to
     * @param fromUserId id of the sending user
     * @param req        message payload (target id, chat type, message type, content, mentions)
     * @return the message entity that was saved and published
     * @throws BusinessException if the keyword filter returns {@code null} for a TEXT message
     */
    public ImMessageEntity send(String appId, String fromUserId, SendMessageRequest req) {
        String content = req.content();
        if (req.msgType() == ImMessageEntity.MsgType.TEXT) {
            content = keywordFilterService.filter(appId, content);
            if (content == null) {
                throw new BusinessException("消息包含违禁内容");
            }
        }

        ImMessageEntity message = new ImMessageEntity();
        message.setId(UUID.randomUUID().toString());
        message.setAppId(appId);
        message.setFromUserId(fromUserId);
        message.setToId(req.toId());
        message.setChatType(req.chatType());
        message.setMsgType(req.msgType());
        message.setContent(content);
        message.setStatus(ImMessageEntity.MsgStatus.SENT);
        message.setMentionedUserIds(req.mentionedUserIds());
        message.setCreatedAt(LocalDateTime.now());
        messageRepository.save(message);

        boolean single = req.chatType() == ImMessageEntity.ChatType.SINGLE;
        String destination = single ? userQueue(req.toId()) : "/topic/group/" + req.toId();
        clusterPublisher.publish(destination, message);
        if (single && !fromUserId.equals(req.toId())) {
            // Echo to the sender's own queue as well.
            clusterPublisher.publish(userQueue(fromUserId), message);
        }

        dispatchWebhooks(appId, message);
        return message;
    }

    /**
     * Marks a message as revoked and republishes it to the same destinations used by
     * {@link #send}.
     *
     * @param appId         application the caller is acting for
     * @param messageId     id of the message to revoke
     * @param requestUserId id of the user requesting the revoke
     * @return the saved, revoked message
     * @throws BusinessException 404 if the message does not exist; 403 if it belongs to a
     *                           different application or was sent by a different user
     */
    public ImMessageEntity revoke(String appId, String messageId, String requestUserId) {
        ImMessageEntity message = messageRepository.findById(messageId)
                .orElseThrow(() -> new BusinessException(404, "消息不存在"));
        if (!message.getAppId().equals(appId)) {
            throw new BusinessException(403, "无权操作");
        }
        if (!message.getFromUserId().equals(requestUserId)) {
            throw new BusinessException(403, "只能撤回自己发送的消息");
        }

        message.setStatus(ImMessageEntity.MsgStatus.REVOKED);
        message.setMsgType(ImMessageEntity.MsgType.REVOKED);
        ImMessageEntity saved = messageRepository.save(message);

        if (saved.getChatType() == ImMessageEntity.ChatType.SINGLE) {
            clusterPublisher.publish(userQueue(saved.getToId()), saved);
            if (!saved.getFromUserId().equals(saved.getToId())) {
                clusterPublisher.publish(userQueue(saved.getFromUserId()), saved);
            }
        } else {
            clusterPublisher.publish("/topic/group/" + saved.getToId(), saved);
        }
        return saved;
    }

    /**
     * Returns one page of the repository's {@code findSingleConversation} query for the
     * given pair of ids.
     *
     * @param page zero-based page index (passed to {@link PageRequest#of(int, int)})
     * @param size page size
     */
    // NOTE(review): raw Page kept as found; the element type argument looks like it was
    // lost — confirm against ImMessageRepository.findSingleConversation before adding it.
    public Page history(String appId, String userId, String toId, int page, int size) {
        return messageRepository.findSingleConversation(
                appId, userId, toId, PageRequest.of(page, size));
    }

    /**
     * Returns the result of the repository's {@code findConversations} query for the user.
     *
     * @param size forwarded unchanged to the repository query
     */
    // NOTE(review): raw List kept as found; confirm the element type against
    // ImMessageRepository.findConversations before parameterizing.
    public List conversations(String appId, String userId, int size) {
        return messageRepository.findConversations(appId, userId, size);
    }

    /**
     * Posts the message as JSON to every enabled webhook of the application.
     *
     * <p>Delivery is best-effort: failures are logged and never propagated, so a broken
     * webhook cannot fail {@link #send}. Requests are issued with
     * {@link HttpClient#sendAsync}, so the caller's thread is not blocked on the network
     * even when this method is invoked directly on {@code this} (a self-invocation does
     * not go through the Spring proxy, so {@code @Async} alone would not help there).
     * Webhooks are therefore contacted concurrently rather than one after another.
     *
     * @param appId   application whose enabled webhooks are looked up
     * @param message message serialized as the request body
     */
    @Async
    protected void dispatchWebhooks(String appId, ImMessageEntity message) {
        try {
            List<WebhookConfigEntity> webhooks = webhookRepository.findByAppIdAndEnabledTrue(appId);
            if (webhooks.isEmpty()) {
                return;
            }
            String body = objectMapper.writeValueAsString(message);
            for (WebhookConfigEntity webhook : webhooks) {
                postWebhook(webhook.getUrl(), body);
            }
        } catch (Exception e) {
            // Boundary catch: webhook lookup/serialization problems must not break sending.
            LOG.log(System.Logger.Level.WARNING, "Webhook dispatch failed for app " + appId, e);
        }
    }

    /** Sends one webhook POST asynchronously and logs any failure or non-2xx response. */
    private void postWebhook(String url, String body) {
        try {
            HttpRequest.Builder builder = HttpRequest.newBuilder()
                    .uri(URI.create(url))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString(body));
            if (webhookTimeoutMs > 0) {
                // Builder.timeout rejects zero/negative durations, hence the guard.
                builder.timeout(Duration.ofMillis(webhookTimeoutMs));
            }
            httpClient.sendAsync(builder.build(), HttpResponse.BodyHandlers.discarding())
                    .whenComplete((response, error) -> {
                        if (error != null) {
                            LOG.log(System.Logger.Level.WARNING, "Webhook call failed: " + url, error);
                        } else if (response.statusCode() < 200 || response.statusCode() >= 300) {
                            LOG.log(System.Logger.Level.WARNING,
                                    "Webhook " + url + " responded with HTTP " + response.statusCode());
                        }
                    });
        } catch (RuntimeException e) {
            // e.g. a null or malformed URL in the webhook configuration
            LOG.log(System.Logger.Level.WARNING, "Invalid webhook request for " + url, e);
        }
    }

    /** Builds the per-user queue destination used for SINGLE chats. */
    private static String userQueue(String userId) {
        return "/user/" + userId + "/queue/messages";
    }
}