XuqmGroup-Server/im-service/src/main/java/com/xuqm/im/service/MessageService.java
XuqmGroup d2dea0c332 feat(android-sdk): 添加完整的IM客户端SDK实现
- 实现了Android SDK的完整IM功能接口,包括消息、群组、好友、会话等核心功能
- 添加了消息收发、历史记录、撤回编辑等完整的消息操作能力
- 实现了群组管理功能,包括创建、成员管理、权限设置等操作
- 添加了好友关系链管理,支持添加、删除、分组等操作
- 实现了会话管理功能,包括置顶、免打扰、已读状态等
- 添加了黑名单、资料管理、搜索等辅助功能
- 补齐了批量操作接口,提升客户端操作效率
- 实现了WebSocket连接管理和事件监听机制
- 添加了离线消息同步和状态管理功能
2026-05-02 22:57:55 +08:00

675 行
31 KiB
Java

package com.xuqm.im.service;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.xuqm.common.exception.BusinessException;
import com.xuqm.im.cluster.ImClusterPublisher;
import com.xuqm.im.entity.ImGroupEntity;
import com.xuqm.im.entity.ImMessageEntity;
import com.xuqm.im.model.ConversationView;
import com.xuqm.im.model.EditMessageRequest;
import com.xuqm.im.model.MessageReadCallbackPayload;
import com.xuqm.im.model.SendMessageRequest;
import com.xuqm.im.repository.ImFriendRepository;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import com.xuqm.im.repository.ImMessageRepository;
import java.util.ArrayList;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
/**
 * Application service for instant-messaging messages.
 *
 * <p>Groups the operations defined in this class: sending, revoking and editing
 * messages, paged history queries, read-receipt synchronisation, conversation
 * listing, and the administrative send / mark-read / import operations.
 */
@Service
public class MessageService {
private static final Logger log = LoggerFactory.getLogger(MessageService.class);
// Message persistence: lookups, saves, history and conversation queries.
private final ImMessageRepository messageRepository;
// Filters TEXT content; this class treats a null result as "content rejected".
private final KeywordFilterService keywordFilterService;
// Consulted at the start of send() to reject messages while an app-wide mute is enabled.
private final GlobalMuteService globalMuteService;
// Publishes payloads to "/user/{id}/queue/messages" and "/topic/group/{id}" destinations.
private final ImClusterPublisher clusterPublisher;
// Group lookup, member-id listing and per-member mute checks.
private final ImGroupService groupService;
// Answers whether one user has blocked another within an app.
private final BlacklistService blacklistService;
// Per-user conversation state (hidden / muted / pinned / last-read time).
private final ConversationStateService conversationStateService;
// Sends push notifications via sendOfflinePushToUsers(...).
private final ImPushBridge imPushBridge;
// Per-app feature settings (stranger messages, blacklist behaviour, recall window,
// history retention, conversation pull limit).
private final ImFeatureConfigClient featureConfigClient;
// Used to check whether a friend relation exists in either direction.
private final ImFriendRepository friendRepository;
// Dispatches "message" webhooks for the events raised by this class.
private final WebhookDispatchService webhookDispatchService;
// Stores message ids for receivers that are offline at send time.
private final OfflineMessageSyncService offlineMessageSyncService;
// Answers whether a user is currently online.
private final UserPresenceService userPresenceService;
// JSON serialisation of push payloads and parsing of message content fields.
private final ObjectMapper objectMapper;
/**
 * Creates the service from its collaborators; every argument is stored as-is in the
 * field of the same name.
 */
public MessageService(ImMessageRepository messageRepository,
        KeywordFilterService keywordFilterService,
        GlobalMuteService globalMuteService,
        ImClusterPublisher clusterPublisher,
        ImGroupService groupService,
        BlacklistService blacklistService,
        ConversationStateService conversationStateService,
        ImPushBridge imPushBridge,
        ImFeatureConfigClient featureConfigClient,
        ImFriendRepository friendRepository,
        WebhookDispatchService webhookDispatchService,
        OfflineMessageSyncService offlineMessageSyncService,
        UserPresenceService userPresenceService,
        ObjectMapper objectMapper) {
    // Repositories
    this.messageRepository = messageRepository;
    this.friendRepository = friendRepository;

    // Policy and configuration
    this.featureConfigClient = featureConfigClient;
    this.globalMuteService = globalMuteService;
    this.keywordFilterService = keywordFilterService;
    this.blacklistService = blacklistService;

    // Domain services
    this.groupService = groupService;
    this.conversationStateService = conversationStateService;
    this.userPresenceService = userPresenceService;
    this.offlineMessageSyncService = offlineMessageSyncService;

    // Outbound delivery
    this.clusterPublisher = clusterPublisher;
    this.imPushBridge = imPushBridge;
    this.webhookDispatchService = webhookDispatchService;

    // Serialisation
    this.objectMapper = objectMapper;
}
/**
 * Validates, persists and fans out a message sent by {@code fromUserId}.
 *
 * <p>Checks run in this order: app-wide mute, keyword filtering (TEXT only), then
 * either group membership / member mute (GROUP) or the friend / stranger policy and
 * blacklist rules (every other chat type). The message is then saved and delivered:
 * single chats between two different users are echoed to the sender and, unless the
 * receiver has blocked the sender, delivered to the receiver (published when online,
 * stored as an offline message otherwise) followed by a push call; group chats are
 * published to the group topic followed by a push call for all members except the
 * sender. A {@code message.sent} webhook is dispatched last.
 *
 * @param appId      application the message belongs to
 * @param fromUserId id of the sending user
 * @param req        message request (target id, chat type, message type, content,
 *                   optional message id and mentioned user ids)
 * @return the saved message; for GROUP messages its group read count is populated
 * @throws BusinessException with code 403 when the app is globally muted, the sender
 *         is not in the group or is muted in it, stranger messages are not allowed,
 *         or the blacklist rules reject the send; also thrown (without an explicit
 *         code) when the keyword filter returns null for TEXT content
 */
public ImMessageEntity send(String appId, String fromUserId, SendMessageRequest req) {
// App-wide mute rejects every send before any other work is done.
if (globalMuteService.isEnabled(appId)) {
throw new BusinessException(403, "当前应用已开启全局禁言");
}
String content = req.content();
// Only TEXT content goes through the keyword filter; a null result means rejected.
if (req.msgType() == ImMessageEntity.MsgType.TEXT) {
content = keywordFilterService.filter(appId, content);
if (content == null) {
throw new BusinessException("消息包含违禁内容");
}
}
ImGroupEntity group = null;
// Stays false for GROUP messages; only the non-group branch below may set it.
boolean receiverBlocksSender = false;
if (req.chatType() == ImMessageEntity.ChatType.GROUP) {
group = groupService.get(req.toId());
if (!groupService.memberIds(group).contains(fromUserId)) {
throw new BusinessException(403, "不在群内");
}
if (groupService.isMemberMuted(req.toId(), fromUserId)) {
throw new BusinessException(403, "当前用户已被禁言");
}
} else if (!isFriend(appId, fromUserId, req.toId())
&& !featureConfigClient.allowStrangerMessage(appId)) {
throw new BusinessException(403, "仅允许好友之间发送消息");
} else {
boolean senderBlocksReceiver = blacklistService.isBlocked(appId, fromUserId, req.toId());
receiverBlocksSender = blacklistService.isBlocked(appId, req.toId(), fromUserId);
boolean blacklistSendSuccess = featureConfigClient.blacklistSendSuccess(appId);
// Mutual block is always rejected.
if (senderBlocksReceiver && receiverBlocksSender) {
throw new BusinessException(403, "已被拉黑,无法发送消息");
}
// When only the receiver blocks the sender, the send is rejected unless the app is
// configured to report success; in that case the message is saved and echoed to the
// sender below but never delivered to the receiver.
if (receiverBlocksSender && !blacklistSendSuccess) {
throw new BusinessException(403, "已被拉黑,无法发送消息");
}
}
ImMessageEntity message = new ImMessageEntity();
// A non-blank client-supplied id is used as-is; otherwise a random UUID is generated.
// NOTE(review): nothing here checks whether a message with the supplied id already
// exists before save() — confirm how the repository handles an id collision.
message.setId(req.messageId() != null && !req.messageId().isBlank()
? req.messageId()
: UUID.randomUUID().toString());
message.setAppId(appId);
message.setFromUserId(fromUserId);
message.setToId(req.toId());
message.setChatType(req.chatType());
message.setMsgType(req.msgType());
message.setContent(content);
message.setStatus(ImMessageEntity.MsgStatus.SENT);
message.setMentionedUserIds(req.mentionedUserIds());
message.setCreatedAt(LocalDateTime.now());
ImMessageEntity saved = messageRepository.save(message);
// The read count is set on the returned entity after the save.
if (req.chatType() == ImMessageEntity.ChatType.GROUP) {
saved.setGroupReadCount(groupReadCount(appId, req.toId(), saved.getCreatedAt(), saved.getFromUserId()));
}
if (req.chatType() == ImMessageEntity.ChatType.SINGLE && !fromUserId.equals(req.toId())) {
// Single chat between two different users: always echo to the sender first.
log.debug("echo message back to sender appId={} from={} to={}",
appId, fromUserId, req.toId());
clusterPublisher.publish("/user/" + fromUserId + "/queue/messages", saved);
if (!receiverBlocksSender) {
log.debug("deliver message to receiver appId={} from={} to={}",
appId, fromUserId, req.toId());
// Online receivers get the message published; offline receivers get it queued.
boolean receiverOnline = userPresenceService.isOnline(req.toId());
if (receiverOnline) {
clusterPublisher.publish("/user/" + req.toId() + "/queue/messages", saved);
} else {
offlineMessageSyncService.storeOfflineMessage(appId, req.toId(), saved.getId());
}
conversationStateService.clearHiddenForUsers(appId, req.toId(), req.chatType().name(), List.of(fromUserId, req.toId()));
// The push call is made regardless of the presence result above.
imPushBridge.sendOfflinePushToUsers(
appId,
List.of(req.toId()),
"新消息",
saved.getContent(),
buildPushPayload(saved)
);
} else {
// Receiver blocks sender: only the sender's side of the conversation is un-hidden.
conversationStateService.clearHiddenForUsers(appId, req.toId(), req.chatType().name(), List.of(fromUserId));
}
} else if (req.chatType() == ImMessageEntity.ChatType.GROUP) {
String destination = "/topic/group/" + req.toId();
log.debug("send message appId={} from={} to={} chatType={} msgType={} destination={}",
appId, fromUserId, req.toId(), req.chatType(), req.msgType(), destination);
clusterPublisher.publish(destination, saved);
List<String> memberIds = groupService.memberIds(group);
conversationStateService.clearHiddenForUsers(appId, req.toId(), req.chatType().name(), memberIds);
// Push goes to every member except the sender.
imPushBridge.sendOfflinePushToUsers(
appId,
memberIds.stream()
.filter(memberId -> !memberId.equals(fromUserId))
.toList(),
"群聊消息",
saved.getContent(),
buildPushPayload(saved)
);
} else {
// Remaining case: a non-group message that is not a SINGLE chat between two
// different users (e.g. a SINGLE message addressed to the sender). It is published
// once, with no echo, no push and no conversation-state update.
String destination = "/user/" + req.toId() + "/queue/messages";
log.debug("send message appId={} from={} to={} chatType={} msgType={} destination={}",
appId, fromUserId, req.toId(), req.chatType(), req.msgType(), destination);
if (!receiverBlocksSender) {
clusterPublisher.publish(destination, saved);
}
}
dispatchWebhooks(appId, "message.sent", saved);
return saved;
}
/**
 * Reports whether a friend record links the two users in either direction.
 *
 * @return {@code true} when a record exists for (userId, friendId) or, failing that,
 *         for (friendId, userId)
 */
private boolean isFriend(String appId, String userId, String friendId) {
    if (friendRepository.existsByAppIdAndUserIdAndFriendId(appId, userId, friendId)) {
        return true;
    }
    // Fall back to the reverse direction only when the forward lookup found nothing.
    return friendRepository.existsByAppIdAndUserIdAndFriendId(appId, friendId, userId);
}
/**
 * Revokes a message on behalf of its sender and notifies the participants.
 *
 * <p>The message must belong to {@code appId} and have been sent by
 * {@code requestUserId}. When the configured recall window is positive, the message
 * must still be inside it. Both the status and the message type are set to REVOKED,
 * the updated message is published to the conversation, and a
 * {@code message.revoked} webhook is dispatched.
 *
 * @return the saved, revoked message
 * @throws BusinessException 404 when the message does not exist; 403 when it belongs
 *         to another app, was sent by someone else, or the recall window has passed
 */
public ImMessageEntity revoke(String appId, String messageId, String requestUserId) {
    ImMessageEntity target = messageRepository.findById(messageId)
            .orElseThrow(() -> new BusinessException(404, "消息不存在"));
    if (!target.getAppId().equals(appId)) {
        throw new BusinessException(403, "无权操作");
    }
    if (!target.getFromUserId().equals(requestUserId)) {
        throw new BusinessException(403, "只能撤回自己发送的消息");
    }

    // A non-positive window disables the time limit entirely.
    final int recallWindowMinutes = featureConfigClient.messageRecallMinutes(appId);
    if (recallWindowMinutes > 0) {
        LocalDateTime recallDeadline = target.getCreatedAt().plusMinutes(recallWindowMinutes);
        if (recallDeadline.isBefore(LocalDateTime.now())) {
            throw new BusinessException(403, "已超过可撤回时长");
        }
    }

    target.setStatus(ImMessageEntity.MsgStatus.REVOKED);
    target.setMsgType(ImMessageEntity.MsgType.REVOKED);
    ImMessageEntity revoked = messageRepository.save(target);

    if (revoked.getChatType() != ImMessageEntity.ChatType.SINGLE) {
        log.debug("revoke group messageId={} groupId={}", revoked.getId(), revoked.getToId());
        clusterPublisher.publish("/topic/group/" + revoked.getToId(), revoked);
    } else {
        final String receiverId = revoked.getToId();
        final String senderId = revoked.getFromUserId();
        log.debug("revoke single messageId={} destinationTo={} destinationFrom={}",
                revoked.getId(), receiverId, senderId);
        clusterPublisher.publish("/user/" + receiverId + "/queue/messages", revoked);
        // Skip the second publish when the message was addressed to the sender.
        if (!senderId.equals(receiverId)) {
            clusterPublisher.publish("/user/" + senderId + "/queue/messages", revoked);
        }
    }

    dispatchWebhooks(appId, "message.revoked", revoked);
    return revoked;
}
/**
 * Replaces the content of a TEXT message on behalf of its sender.
 *
 * <p>The message must belong to {@code appId}, have been sent by
 * {@code requestUserId}, not be revoked, and be of type TEXT. The new content goes
 * through the keyword filter. After saving, the message is published to the
 * conversation, a push call is made for the other participant(s), and a
 * {@code message.edited} webhook is dispatched.
 *
 * @return the saved message with the new content and edit time
 * @throws BusinessException 404 when the message does not exist; 403 when it belongs
 *         to another app or another sender; 400 when it is revoked or not TEXT; also
 *         thrown (without an explicit code) when the keyword filter returns null
 */
public ImMessageEntity edit(String appId, String messageId, String requestUserId, EditMessageRequest req) {
    ImMessageEntity target = messageRepository.findById(messageId)
            .orElseThrow(() -> new BusinessException(404, "消息不存在"));
    if (!target.getAppId().equals(appId)) {
        throw new BusinessException(403, "无权操作");
    }
    if (!target.getFromUserId().equals(requestUserId)) {
        throw new BusinessException(403, "只能编辑自己发送的消息");
    }

    // Either marker is enough to treat the message as revoked.
    boolean revoked = target.getStatus() == ImMessageEntity.MsgStatus.REVOKED
            || target.getMsgType() == ImMessageEntity.MsgType.REVOKED;
    if (revoked) {
        throw new BusinessException(400, "已撤回消息不能编辑");
    }
    if (target.getMsgType() != ImMessageEntity.MsgType.TEXT) {
        throw new BusinessException(400, "仅支持编辑文本消息");
    }

    String filtered = keywordFilterService.filter(appId, req.content());
    if (filtered == null) {
        throw new BusinessException("消息包含违禁内容");
    }

    target.setContent(filtered);
    target.setEditedAt(LocalDateTime.now());
    ImMessageEntity saved = messageRepository.save(target);

    final String senderId = saved.getFromUserId();
    final String targetId = saved.getToId();
    if (saved.getChatType() != ImMessageEntity.ChatType.SINGLE) {
        clusterPublisher.publish("/topic/group/" + targetId, saved);
        ImGroupEntity group = groupService.get(targetId);
        // Everyone in the group except the author receives the push call.
        List<String> recipients = groupService.memberIds(group).stream()
                .filter(memberId -> !memberId.equals(senderId))
                .toList();
        imPushBridge.sendOfflinePushToUsers(
                appId, recipients, "群消息已编辑", saved.getContent(), buildPushPayload(saved));
    } else {
        clusterPublisher.publish("/user/" + targetId + "/queue/messages", saved);
        // Skip the second publish when the message was addressed to the sender.
        if (!senderId.equals(targetId)) {
            clusterPublisher.publish("/user/" + senderId + "/queue/messages", saved);
        }
        imPushBridge.sendOfflinePushToUsers(
                appId, List.of(targetId), "消息已编辑", saved.getContent(), buildPushPayload(saved));
    }

    dispatchWebhooks(appId, "message.edited", saved);
    return saved;
}
/**
 * Revokes a message as an administrator: no sender check and no recall-window check.
 *
 * <p>The message must belong to {@code appId}. Status and message type are both set
 * to REVOKED, the updated message is published to the conversation, and a
 * {@code message.revoked} webhook is dispatched.
 *
 * @return the saved, revoked message
 * @throws BusinessException 404 when the message does not exist; 403 when it belongs
 *         to another app
 */
public ImMessageEntity adminRevoke(String appId, String messageId) {
    ImMessageEntity target = messageRepository.findById(messageId)
            .orElseThrow(() -> new BusinessException(404, "消息不存在"));
    if (!target.getAppId().equals(appId)) {
        throw new BusinessException(403, "无权操作");
    }

    target.setStatus(ImMessageEntity.MsgStatus.REVOKED);
    target.setMsgType(ImMessageEntity.MsgType.REVOKED);
    ImMessageEntity revoked = messageRepository.save(target);

    if (revoked.getChatType() != ImMessageEntity.ChatType.SINGLE) {
        log.debug("admin revoke group messageId={} groupId={}", revoked.getId(), revoked.getToId());
        clusterPublisher.publish("/topic/group/" + revoked.getToId(), revoked);
    } else {
        final String receiverId = revoked.getToId();
        final String senderId = revoked.getFromUserId();
        log.debug("admin revoke single messageId={} destinationTo={} destinationFrom={}",
                revoked.getId(), receiverId, senderId);
        clusterPublisher.publish("/user/" + receiverId + "/queue/messages", revoked);
        // Skip the second publish when the message was addressed to the sender.
        if (!senderId.equals(receiverId)) {
            clusterPublisher.publish("/user/" + senderId + "/queue/messages", revoked);
        }
    }

    dispatchWebhooks(appId, "message.revoked", revoked);
    return revoked;
}
/**
 * Returns one page of the single-chat history between {@code userId} and {@code toId}.
 *
 * <p>The requested start time is first clamped by {@link #applyHistoryRetention}; the
 * remaining filters are passed to the repository unchanged.
 *
 * @param page zero-based page index handed to {@code PageRequest.of}
 * @param size page size handed to {@code PageRequest.of}
 */
public Page<ImMessageEntity> history(
        String appId,
        String userId,
        String toId,
        ImMessageEntity.MsgType msgType,
        String keyword,
        LocalDateTime startTime,
        LocalDateTime endTime,
        int page,
        int size) {
    final LocalDateTime windowStart = applyHistoryRetention(appId, startTime);
    final PageRequest pageable = PageRequest.of(page, size);
    return messageRepository.findSingleConversationFiltered(
            appId, userId, toId, msgType, keyword, windowStart, endTime, pageable);
}
/**
 * Returns one page of a group's history for a member of that group.
 *
 * <p>The start time is clamped by {@link #applyHistoryRetention}, membership of
 * {@code userId} is verified, and every returned message has its group read count
 * populated.
 *
 * @throws BusinessException 403 when {@code userId} is not a member of the group
 */
public Page<ImMessageEntity> groupHistory(
        String appId,
        String groupId,
        String userId,
        ImMessageEntity.MsgType msgType,
        String keyword,
        LocalDateTime startTime,
        LocalDateTime endTime,
        int page,
        int size) {
    final LocalDateTime windowStart = applyHistoryRetention(appId, startTime);

    final ImGroupEntity group = groupService.get(groupId);
    final boolean isMember = groupService.memberIds(group).contains(userId);
    if (!isMember) {
        throw new BusinessException(403, "不在群内");
    }

    final PageRequest pageable = PageRequest.of(page, size);
    Page<ImMessageEntity> messages = messageRepository.findGroupHistoryFiltered(
            appId, groupId, msgType, keyword, windowStart, endTime, pageable);
    for (ImMessageEntity message : messages) {
        int readers = groupReadCount(appId, groupId, message.getCreatedAt(), message.getFromUserId());
        message.setGroupReadCount(readers);
    }
    return messages;
}
/**
 * Applies a single-chat read receipt: marks the messages that {@code peerId} sent to
 * {@code readerId} up to {@code readAt} as READ and notifies the sender.
 *
 * <p>Does nothing unless {@code chatType} is SINGLE and the reader differs from the
 * peer, or when no message falls inside the range. Messages that are already READ are
 * left untouched. Revoked messages are also left untouched: REVOKED is treated as a
 * terminal status, so a later read receipt must not overwrite it with READ (and must
 * not re-publish the revoked message as "read"). Every message in range, including
 * the untouched ones, is listed in the {@code message.read} webhook payload.
 *
 * @param appId    application the conversation belongs to
 * @param readerId user who read the messages (the receiver of those messages)
 * @param peerId   the other participant (the sender of those messages)
 * @param chatType chat type name; only {@code SINGLE} is processed here
 * @param readAt   upper bound (inclusive) on message creation time
 */
public void syncReadReceipt(String appId, String readerId, String peerId, String chatType, LocalDateTime readAt) {
    if (!ImMessageEntity.ChatType.SINGLE.name().equals(chatType) || readerId.equals(peerId)) {
        return;
    }
    List<ImMessageEntity> messages = messageRepository
            .findByAppIdAndFromUserIdAndToIdAndCreatedAtLessThanEqualOrderByCreatedAtAsc(
                    appId, peerId, readerId, readAt);
    if (messages.isEmpty()) {
        return;
    }
    List<String> messageIds = new ArrayList<>(messages.size());
    for (ImMessageEntity message : messages) {
        ImMessageEntity.MsgStatus status = message.getStatus();
        // Already read, or revoked: keep the stored status and skip save/publish.
        if (status == ImMessageEntity.MsgStatus.READ || status == ImMessageEntity.MsgStatus.REVOKED) {
            messageIds.add(message.getId());
            continue;
        }
        message.setStatus(ImMessageEntity.MsgStatus.READ);
        ImMessageEntity saved = messageRepository.save(message);
        // Tell the original sender that this message has now been read.
        clusterPublisher.publish("/user/" + peerId + "/queue/messages", saved);
        messageIds.add(saved.getId());
    }
    dispatchWebhooks(appId, "message.read", new MessageReadCallbackPayload(
            appId,
            readerId,
            peerId,
            null,
            chatType,
            toEpochMillis(readAt),
            messageIds
    ));
}
/**
 * Applies a group read receipt: recomputes the read count of every group message
 * created up to {@code readAt}, re-publishes each one to the group topic, and
 * dispatches a {@code message.read} webhook listing their ids.
 *
 * <p>Does nothing when {@code readerId} is not a member of the group or when no
 * message falls inside the range. No message status is changed here.
 *
 * <p>NOTE(review): {@code groupReadCount} resolves the group, its members and each
 * member's conversation state again for every message, and every message up to
 * {@code readAt} is re-published on each call — confirm this cost is acceptable for
 * large groups or long histories.
 */
public void syncGroupReadReceipt(String appId, String readerId, String groupId, LocalDateTime readAt) {
    ImGroupEntity group = groupService.get(groupId);
    boolean readerIsMember = groupService.memberIds(group).contains(readerId);
    if (!readerIsMember) {
        return;
    }

    List<ImMessageEntity> covered = messageRepository
            .findByAppIdAndToIdAndChatTypeAndCreatedAtLessThanEqualOrderByCreatedAtAsc(
                    appId, groupId, ImMessageEntity.ChatType.GROUP, readAt);
    if (covered.isEmpty()) {
        return;
    }

    final String groupTopic = "/topic/group/" + groupId;
    List<String> coveredIds = new ArrayList<>();
    for (ImMessageEntity message : covered) {
        int readers = groupReadCount(appId, groupId, message.getCreatedAt(), message.getFromUserId());
        message.setGroupReadCount(readers);
        clusterPublisher.publish(groupTopic, message);
        coveredIds.add(message.getId());
    }

    MessageReadCallbackPayload callback = new MessageReadCallbackPayload(
            appId,
            readerId,
            null,
            groupId,
            ImMessageEntity.ChatType.GROUP.name(),
            toEpochMillis(readAt),
            coveredIds);
    dispatchWebhooks(appId, "message.read", callback);
}
/**
 * Administrative variant of the single-chat history query between two users.
 *
 * <p>The start time is clamped by {@link #applyHistoryRetention}; all other filters
 * are passed to the repository unchanged.
 */
public Page<ImMessageEntity> adminHistory(
        String appId,
        String userA,
        String userB,
        ImMessageEntity.MsgType msgType,
        String keyword,
        LocalDateTime startTime,
        LocalDateTime endTime,
        int page,
        int size) {
    final LocalDateTime windowStart = applyHistoryRetention(appId, startTime);
    final PageRequest pageable = PageRequest.of(page, size);
    return messageRepository.findSingleConversationFiltered(
            appId, userA, userB, msgType, keyword, windowStart, endTime, pageable);
}
/**
 * Administrative variant of the group history query; no membership check is made.
 *
 * <p>The start time is clamped by {@link #applyHistoryRetention} and every returned
 * message has its group read count populated.
 */
public Page<ImMessageEntity> adminGroupHistory(
        String appId,
        String groupId,
        ImMessageEntity.MsgType msgType,
        String keyword,
        LocalDateTime startTime,
        LocalDateTime endTime,
        int page,
        int size) {
    final LocalDateTime windowStart = applyHistoryRetention(appId, startTime);
    final PageRequest pageable = PageRequest.of(page, size);
    Page<ImMessageEntity> messages = messageRepository.findGroupHistoryFiltered(
            appId, groupId, msgType, keyword, windowStart, endTime, pageable);
    for (ImMessageEntity message : messages) {
        int readers = groupReadCount(appId, groupId, message.getCreatedAt(), message.getFromUserId());
        message.setGroupReadCount(readers);
    }
    return messages;
}
/**
 * Lists raw conversation summaries for a user.
 *
 * @param size requested number of conversations; normalised by
 *             {@link #normalizeConversationSize} before the query runs
 */
public List<ImMessageRepository.ConversationSummary> conversations(String appId, String userId, int size) {
    final int effectiveSize = normalizeConversationSize(appId, size);
    return messageRepository.findConversations(appId, userId, effectiveSize);
}
/**
 * Lists conversations for a user as display-ready views, leaving out hidden ones.
 *
 * <p>Up to three times the normalised size is fetched so that enough candidates
 * remain once conversations that map to {@code null} (hidden) are dropped; the result
 * is then cut back to the normalised size.
 *
 * @param size requested number of conversations; normalised by
 *             {@link #normalizeConversationSize}
 */
public List<ConversationView> conversationViews(String appId, String userId, int size) {
    final int maxResults = normalizeConversationSize(appId, size);
    // Math.max keeps the fetch size at least maxResults even if the multiplication overflows.
    final int candidateCount = Math.max(maxResults * 3, maxResults);
    List<ImMessageRepository.ConversationSummary> candidates =
            messageRepository.findConversations(appId, userId, candidateCount);
    return candidates.stream()
            .map(candidate -> toConversationView(appId, userId, candidate))
            .filter(view -> view != null)
            .limit(maxResults)
            .toList();
}
/**
 * Clamps a requested history start time to the app's retention window.
 *
 * <p>NOTE(review): a configured value of 0 makes the window start "now", and a
 * negative value puts it in the future — confirm the config client never returns a
 * non-positive number, or that this is the intended meaning.
 *
 * @param requestedStart caller-supplied lower bound, may be {@code null}
 * @return {@code requestedStart} when it is non-null and not before the start of the
 *         retention window; otherwise the start of the retention window
 */
private LocalDateTime applyHistoryRetention(String appId, LocalDateTime requestedStart) {
    final int retentionDays = featureConfigClient.historyRetentionDays(appId);
    final LocalDateTime oldestAllowed = LocalDateTime.now().minusDays(retentionDays);
    final boolean insideWindow = requestedStart != null && !requestedStart.isBefore(oldestAllowed);
    return insideWindow ? requestedStart : oldestAllowed;
}
/**
 * Bounds a requested conversation count: raised to at least 1, then capped at the
 * app's configured pull limit.
 *
 * @return the smaller of {@code max(requestedSize, 1)} and the configured limit
 */
private int normalizeConversationSize(String appId, int requestedSize) {
    final int configuredMax = featureConfigClient.conversationPullLimit(appId);
    final int atLeastOne = requestedSize < 1 ? 1 : requestedSize;
    return atLeastOne < configuredMax ? atLeastOne : configuredMax;
}
/**
 * Builds the display view of one conversation for {@code userId}.
 *
 * <p>Looks up the user's conversation state, the most recent message and the unread
 * count. When no last message is found, the preview and message type are
 * {@code null} and the summary's last time is used as the timestamp.
 *
 * @return the view, or {@code null} when the user's state marks the conversation hidden
 */
private ConversationView toConversationView(
        String appId,
        String userId,
        ImMessageRepository.ConversationSummary summary
) {
    final String targetId = summary.getTargetId();
    final String chatType = summary.getChatType();

    var state = conversationStateService.find(appId, userId, targetId, chatType);
    if (state != null && state.isHidden()) {
        return null;
    }

    final boolean isGroup = chatType.equals("GROUP");
    final PageRequest latestOnly = PageRequest.of(0, 1);
    Page<ImMessageEntity> latestPage;
    if (isGroup) {
        latestPage = messageRepository.findGroupHistory(appId, targetId, latestOnly);
    } else {
        latestPage = messageRepository.findSingleConversation(appId, userId, targetId, latestOnly);
    }
    ImMessageEntity lastMessage = latestPage.getContent().stream().findFirst().orElse(null);

    // A missing state means the user has no recorded read position.
    LocalDateTime lastReadAt = state == null ? null : state.getLastReadAt();
    long unreadCount;
    if (isGroup) {
        unreadCount = messageRepository.countUnreadGroupConversation(appId, userId, targetId, lastReadAt);
    } else {
        unreadCount = messageRepository.countUnreadSingleConversation(appId, userId, targetId, lastReadAt);
    }

    final String preview;
    final String lastMsgType;
    final LocalDateTime lastActivityAt;
    if (lastMessage == null) {
        preview = null;
        lastMsgType = null;
        lastActivityAt = summary.getLastTime();
    } else {
        preview = conversationPreview(lastMessage);
        lastMsgType = lastMessage.getMsgType().name();
        lastActivityAt = lastMessage.getCreatedAt();
    }

    return new ConversationView(
            targetId,
            chatType,
            preview,
            lastMsgType,
            toEpochMillis(lastActivityAt),
            (int) unreadCount,
            state != null && state.isMuted(),
            state != null && state.isPinned(),
            state == null ? null : state.getConversationGroup()
    );
}
/**
 * Summarises read / unread member counts for the given messages of a group.
 *
 * <p>Ids that do not resolve to a GROUP message of this app and this group are
 * silently left out of the result.
 *
 * @param messageIds ids to summarise; {@code null} is treated as an empty list
 * @return one summary per matching message
 * @throws BusinessException 403 when the group belongs to another app
 */
public List<GroupReadReceiptSummary> groupReadReceipts(String appId, String groupId, List<String> messageIds) {
    ImGroupEntity group = groupService.get(groupId);
    if (!group.getAppId().equals(appId)) {
        throw new BusinessException(403, "无权操作");
    }

    final int memberCount = groupService.memberIds(group).size();
    final List<String> requestedIds = messageIds == null ? List.of() : messageIds;

    return messageRepository.findAllById(requestedIds).stream()
            .filter(candidate -> appId.equals(candidate.getAppId())
                    && groupId.equals(candidate.getToId())
                    && candidate.getChatType() == ImMessageEntity.ChatType.GROUP)
            .map(candidate -> {
                int readers = groupReadCount(
                        appId, groupId, candidate.getCreatedAt(), candidate.getFromUserId());
                // Never report a negative unread count.
                int unread = Math.max(memberCount - readers, 0);
                return new GroupReadReceiptSummary(
                        candidate.getId(), groupId, memberCount, readers, unread);
            })
            .toList();
}
/**
 * Converts a local date-time to epoch milliseconds, reading it as a UTC wall-clock time.
 *
 * <p>NOTE(review): timestamps in this class come from {@code LocalDateTime.now()},
 * which uses the JVM default zone — confirm the service runs with a UTC default zone,
 * otherwise the resulting value is shifted by the zone offset.
 *
 * @param time value to convert, may be {@code null}
 * @return epoch milliseconds, or {@code 0} when {@code time} is {@code null}
 */
private long toEpochMillis(LocalDateTime time) {
    if (time == null) {
        return 0L;
    }
    return time.toInstant(ZoneOffset.UTC).toEpochMilli();
}
/**
 * Serialises the routing metadata of a message into the JSON string attached to push
 * notifications.
 *
 * <p>The payload carries {@code messageId}, {@code appId}, {@code fromUserId},
 * {@code toId}, {@code chatType}, {@code msgType} and, for edited messages,
 * {@code editedAt} in epoch milliseconds. Building the payload is best-effort: any
 * failure is logged and an empty JSON object is returned so that the push itself is
 * still attempted.
 *
 * @param message message to describe
 * @return the JSON payload, or {@code "{}"} when it could not be built
 */
private String buildPushPayload(ImMessageEntity message) {
    try {
        Map<String, Object> payload = new java.util.LinkedHashMap<>();
        payload.put("messageId", message.getId());
        payload.put("appId", message.getAppId());
        payload.put("fromUserId", message.getFromUserId());
        payload.put("toId", message.getToId());
        payload.put("chatType", message.getChatType().name());
        payload.put("msgType", message.getMsgType().name());
        if (message.getEditedAt() != null) {
            // Same conversion as every other timestamp emitted by this class.
            payload.put("editedAt", toEpochMillis(message.getEditedAt()));
        }
        return objectMapper.writeValueAsString(payload);
    } catch (Exception e) {
        // Keep the fallback, but leave a trace instead of hiding the failure.
        log.warn("failed to build push payload messageId={}",
                message == null ? null : message.getId(), e);
        return "{}";
    }
}
/**
 * Produces the one-line preview shown for a conversation's last message.
 *
 * <p>TEXT, QUOTE, MERGE and NOTIFY messages try to read a field from the JSON content
 * ({@code text}, {@code text}, {@code title} and {@code content} respectively) and
 * fall back to the raw content (TEXT) or a fixed label. FILE messages append the
 * {@code name} field to the label when it is present. All other known types map to a
 * fixed label, and any remaining type yields the raw content.
 */
private String conversationPreview(ImMessageEntity message) {
    final String raw = message.getContent();
    switch (message.getMsgType()) {
        case TEXT:
            return extractJsonField(raw, "text").orElse(raw);
        case QUOTE:
            return extractJsonField(raw, "text").orElse("[引用]");
        case MERGE:
            return extractJsonField(raw, "title").orElse("[合并转发]");
        case NOTIFY:
            return extractJsonField(raw, "content").orElse("[通知]");
        case FILE: {
            String nameSuffix = extractJsonField(raw, "name").map(name -> " " + name).orElse("");
            return "[文件]" + nameSuffix;
        }
        case IMAGE:
            return "[图片]";
        case AUDIO:
            return "[语音]";
        case VIDEO:
            return "[视频]";
        case LOCATION:
            return "[位置]";
        case CUSTOM:
            return "[自定义]";
        case RICH_TEXT:
            return "[富文本]";
        case CALL_AUDIO:
            return "[语音通话]";
        case CALL_VIDEO:
            return "[视频通话]";
        case FORWARD:
            return "[转发]";
        case REVOKED:
            return "[消息已撤回]";
        default:
            return raw;
    }
}
/**
 * Counts the current group members considered to have read a message.
 *
 * <p>The sender always counts as a reader when still a member. Any other member
 * counts when their GROUP conversation state has a last-read time that is not before
 * {@code createdAt}.
 *
 * @param createdAt creation time of the message
 * @param senderId  id of the message's sender
 * @return the number of readers, never less than 1
 */
private int groupReadCount(String appId, String groupId, LocalDateTime createdAt, String senderId) {
    final ImGroupEntity group = groupService.get(groupId);
    final String groupChatType = ImMessageEntity.ChatType.GROUP.name();

    int readers = 0;
    for (String memberId : groupService.memberIds(group)) {
        boolean hasRead;
        if (memberId.equals(senderId)) {
            hasRead = true;
        } else {
            var state = conversationStateService.find(appId, memberId, groupId, groupChatType);
            hasRead = state != null
                    && state.getLastReadAt() != null
                    && !state.getLastReadAt().isBefore(createdAt);
        }
        if (hasRead) {
            readers++;
        }
    }
    // Floor of one, even when nobody in the member list qualifies.
    return readers < 1 ? 1 : readers;
}
/**
 * Reads a non-blank field from message content that is expected to be JSON.
 *
 * @param content raw message content
 * @param field   name of the field to read
 * @return the field's text value, or empty when the content cannot be parsed, the
 *         field is missing or null, or its text is blank
 */
private java.util.Optional<String> extractJsonField(String content, String field) {
    try {
        var root = objectMapper.readTree(content);
        if (!root.hasNonNull(field)) {
            return java.util.Optional.empty();
        }
        String text = root.get(field).asText();
        return text.isBlank() ? java.util.Optional.empty() : java.util.Optional.of(text);
    } catch (Exception ignored) {
        // Content that is not valid JSON simply has no extractable field.
        return java.util.Optional.empty();
    }
}
/**
 * Dispatches a webhook in the {@code "message"} category carrying a message entity.
 *
 * @param appId         application whose webhooks are dispatched
 * @param callbackEvent event name passed to the dispatcher, e.g. {@code "message.sent"}
 * @param message       message handed to the dispatcher as the payload
 */
protected void dispatchWebhooks(String appId, String callbackEvent, ImMessageEntity message) {
webhookDispatchService.dispatch(appId, "message", callbackEvent, message);
}
/**
 * Dispatches a webhook in the {@code "message"} category carrying an arbitrary payload.
 *
 * @param appId         application whose webhooks are dispatched
 * @param callbackEvent event name passed to the dispatcher, e.g. {@code "message.read"}
 * @param payload       object handed to the dispatcher as the payload
 */
protected void dispatchWebhooks(String appId, String callbackEvent, Object payload) {
webhookDispatchService.dispatch(appId, "message", callbackEvent, payload);
}
/**
 * Sends a single-chat message as an administrator.
 *
 * <p>None of the checks made by {@code send} (mute, keyword filter, friendship,
 * blacklist, presence) are applied. The message is saved with a generated id,
 * published to both the sender's and the receiver's queues, the conversation is
 * un-hidden for both users, a push call is made for the receiver, and a
 * {@code message.sent} webhook is dispatched.
 *
 * @return the saved message
 */
public ImMessageEntity adminSend(String appId, String fromUserId, String toId, ImMessageEntity.MsgType msgType, String content) {
    final ImMessageEntity.ChatType singleChat = ImMessageEntity.ChatType.SINGLE;

    ImMessageEntity draft = new ImMessageEntity();
    draft.setId(UUID.randomUUID().toString());
    draft.setAppId(appId);
    draft.setFromUserId(fromUserId);
    draft.setToId(toId);
    draft.setChatType(singleChat);
    draft.setMsgType(msgType);
    draft.setContent(content);
    draft.setStatus(ImMessageEntity.MsgStatus.SENT);
    draft.setCreatedAt(LocalDateTime.now());
    ImMessageEntity saved = messageRepository.save(draft);

    // Sender first, then receiver.
    for (String recipientId : new String[] {fromUserId, toId}) {
        clusterPublisher.publish("/user/" + recipientId + "/queue/messages", saved);
    }

    conversationStateService.clearHiddenForUsers(
            appId, toId, singleChat.name(), List.of(fromUserId, toId));
    imPushBridge.sendOfflinePushToUsers(
            appId, List.of(toId), "新消息", saved.getContent(), buildPushPayload(saved));

    dispatchWebhooks(appId, "message.sent", saved);
    return saved;
}
/**
 * Marks every message returned by the repository's unread query for {@code userId}
 * as READ, saving them one by one. Nothing is published and no webhook is dispatched.
 */
public void adminSetMsgRead(String appId, String userId) {
    for (ImMessageEntity unread : messageRepository.findUnreadByAppIdAndToId(appId, userId)) {
        unread.setStatus(ImMessageEntity.MsgStatus.READ);
        messageRepository.save(unread);
    }
}
/**
 * Imports messages in bulk, saving each one without publishing or dispatching anything.
 *
 * <p>Entries that are {@code null} or have a blank sender or target id are skipped.
 * Missing fields fall back to defaults: a generated id, chat type SINGLE, message
 * type TEXT, empty content, status READ and the current time.
 *
 * @param requests messages to import; {@code null} is treated as an empty list
 * @return the saved messages, in input order
 */
public List<ImMessageEntity> importMessages(String appId, List<ImportMessageRequest> requests) {
    List<ImMessageEntity> imported = new ArrayList<>();
    if (requests == null) {
        return imported;
    }
    for (ImportMessageRequest req : requests) {
        if (req == null) {
            continue;
        }
        String senderId = req.fromUserId();
        String targetId = req.toId();
        boolean senderMissing = senderId == null || senderId.isBlank();
        boolean targetMissing = targetId == null || targetId.isBlank();
        if (senderMissing || targetMissing) {
            continue;
        }

        String suppliedId = req.messageId();
        boolean hasSuppliedId = suppliedId != null && !suppliedId.isBlank();

        ImMessageEntity entity = new ImMessageEntity();
        entity.setId(hasSuppliedId ? suppliedId : UUID.randomUUID().toString());
        entity.setAppId(appId);
        entity.setFromUserId(senderId);
        entity.setToId(targetId);
        entity.setChatType(Objects.requireNonNullElse(req.chatType(), ImMessageEntity.ChatType.SINGLE));
        entity.setMsgType(Objects.requireNonNullElse(req.msgType(), ImMessageEntity.MsgType.TEXT));
        entity.setContent(Objects.requireNonNullElse(req.content(), ""));
        entity.setStatus(Objects.requireNonNullElse(req.status(), ImMessageEntity.MsgStatus.READ));
        entity.setCreatedAt(Objects.requireNonNullElseGet(req.createdAt(), LocalDateTime::now));
        imported.add(messageRepository.save(entity));
    }
    return imported;
}
/**
 * One message to import through {@code importMessages}.
 *
 * @param messageId  id to store; a random UUID is generated when null or blank
 * @param fromUserId sender id; entries with a null or blank value are skipped
 * @param toId       target id; entries with a null or blank value are skipped
 * @param chatType   chat type; SINGLE is used when null
 * @param msgType    message type; TEXT is used when null
 * @param content    message content; an empty string is used when null
 * @param status     message status; READ is used when null
 * @param createdAt  creation time; the current time is used when null
 */
public record ImportMessageRequest(
String messageId,
String fromUserId,
String toId,
ImMessageEntity.ChatType chatType,
ImMessageEntity.MsgType msgType,
String content,
ImMessageEntity.MsgStatus status,
LocalDateTime createdAt) {}
/**
 * Read-receipt totals for one group message, as produced by {@code groupReadReceipts}.
 *
 * @param messageId   id of the message
 * @param groupId     id of the group the message was sent to
 * @param memberCount number of group members at query time
 * @param readCount   number of members counted as having read the message
 * @param unreadCount {@code memberCount - readCount}, floored at 0
 */
public record GroupReadReceiptSummary(
String messageId,
String groupId,
int memberCount,
int readCount,
int unreadCount
) {}
}