From 4e821b280bd5682eae4f0a5b8271bcda3db4e1de Mon Sep 17 00:00:00 2001 From: XuqmGroup Date: Fri, 24 Apr 2026 20:54:12 +0800 Subject: [PATCH] feat(rn-im): add WatermelonDB local message database - Add schema for im_conversations and im_messages tables - ConversationModel/MessageModel: WatermelonDB ORM models - ImDatabase: init/saveMessage/getMessages/getConversations/markRead/bulkSave - ImSDK: fetchHistory/fetchGroupHistory read local DB first, cache server results - ImSDK: incoming WebSocket messages auto-saved to local DB - ImSDK: new listConversations() and markRead() public APIs - @nozbe/watermelondb >=0.27.0 added as peer dependency Co-Authored-By: Claude Sonnet 4.6 --- packages/im/package.json | 1 + packages/im/src/ImSDK.ts | 115 +++++++++++++++++- packages/im/src/db/ConversationModel.ts | 16 +++ packages/im/src/db/ImDatabase.ts | 153 ++++++++++++++++++++++++ packages/im/src/db/MessageModel.ts | 19 +++ packages/im/src/db/schema.ts | 38 ++++++ packages/im/src/index.ts | 1 + 7 files changed, 337 insertions(+), 6 deletions(-) create mode 100644 packages/im/src/db/ConversationModel.ts create mode 100644 packages/im/src/db/ImDatabase.ts create mode 100644 packages/im/src/db/MessageModel.ts create mode 100644 packages/im/src/db/schema.ts diff --git a/packages/im/package.json b/packages/im/package.json index b947bb6..4cbbffa 100644 --- a/packages/im/package.json +++ b/packages/im/package.json @@ -12,6 +12,7 @@ "scripts": { "typecheck": "tsc --noEmit" }, "peerDependencies": { "@xuqm/rn-common": ">=0.2.0", + "@nozbe/watermelondb": ">=0.27.0", "react-native": ">=0.76.0" }, "devDependencies": { diff --git a/packages/im/src/ImSDK.ts b/packages/im/src/ImSDK.ts index beab32e..15c50ff 100644 --- a/packages/im/src/ImSDK.ts +++ b/packages/im/src/ImSDK.ts @@ -1,14 +1,17 @@ import { apiRequest, _getToken, _saveToken, getConfig } from '@xuqm/rn-common' import { ImClient } from './ImClient' +import { ImDatabase } from './db/ImDatabase' import type { ChatType, ImEventListener, ImGroup, ImMessage, MsgType } from './types' let client: ImClient | null = null +let _currentUserId: string | null = null export const ImSDK = { /** * Login to IM service. Fetches a token internally and opens the WebSocket connection. + * Pass dbName to enable local SQLite message caching (requires @nozbe/watermelondb). */ - async login(userId: string, nickname?: string, avatar?: string): Promise { + async login(userId: string, nickname?: string, avatar?: string, dbName?: string): Promise { const config = getConfig() const res = await apiRequest<{ token: string }>('/api/im/auth/login', { method: 'POST', @@ -21,6 +24,12 @@ export const ImSDK = { }, }) await _saveToken(res.token) + _currentUserId = userId + + if (dbName !== undefined || ImDatabase.isInitialized()) { + ImDatabase.init(dbName ?? 'xuqm_im') + } + client = new ImClient(config.imWsUrl, res.token, config.appId) client.connect() }, @@ -33,13 +42,42 @@ export const ImSDK = { client.connect() }, + /** + * Fetch message history. Reads from local DB first; falls back to server if DB is empty + * or not initialized, then caches results locally. + */ async fetchHistory(toId: string, page = 0, size = 20): Promise { const config = getConfig() + + if (ImDatabase.isInitialized() && page === 0 && _currentUserId) { + const local = await ImDatabase.getMessages(config.appId, toId, 'SINGLE', _currentUserId, size) + if (local.length > 0) { + return local.map(m => ({ + id: m.serverId, + appId: m.appId, + fromUserId: m.fromUserId, + toId: m.toId, + chatType: m.chatType as ChatType, + msgType: m.msgType as MsgType, + content: m.content, + status: m.status as ImMessage['status'], + mentionedUserIds: m.mentionedUserIds ?? undefined, + createdAt: new Date(m.serverCreatedAt).toISOString(), + })) + } + } + const res = await apiRequest<{ content?: ImMessage[] } | ImMessage[]>( `/api/im/messages/history/${encodeURIComponent(toId)}`, { params: { appId: config.appId, page: String(page), size: String(size) } }, ) - return Array.isArray(res) ? res : (res.content ?? []) + const messages = Array.isArray(res) ? res : (res.content ?? []) + + if (ImDatabase.isInitialized() && _currentUserId) { + await ImDatabase.bulkSave(messages, _currentUserId) + } + + return messages }, async sendMessage( @@ -50,19 +88,27 @@ export const ImSDK = { mentionedUserIds?: string, ): Promise { const config = getConfig() - return apiRequest('/api/im/messages/send', { + const msg = await apiRequest('/api/im/messages/send', { method: 'POST', params: { appId: config.appId }, body: { toId, chatType, msgType, content, mentionedUserIds: mentionedUserIds ?? '' }, }) + if (ImDatabase.isInitialized() && _currentUserId) { + await ImDatabase.saveMessage(msg, _currentUserId) + } + return msg }, async revokeMessage(messageId: string): Promise { const config = getConfig() - return apiRequest(`/api/im/messages/${encodeURIComponent(messageId)}/revoke`, { + const msg = await apiRequest(`/api/im/messages/${encodeURIComponent(messageId)}/revoke`, { method: 'POST', params: { appId: config.appId }, }) + if (ImDatabase.isInitialized() && _currentUserId) { + await ImDatabase.saveMessage(msg, _currentUserId) + } + return msg }, async createGroup(name: string, memberIds: string[]): Promise { @@ -84,14 +130,70 @@ export const ImSDK = { async fetchGroupHistory(groupId: string, page = 0, size = 50): Promise { const config = getConfig() + + if (ImDatabase.isInitialized() && page === 0 && _currentUserId) { + const local = await ImDatabase.getMessages(config.appId, groupId, 'GROUP', _currentUserId, size) + if (local.length > 0) { + return local.map(m => ({ + id: m.serverId, + appId: m.appId, + fromUserId: m.fromUserId, + toId: m.toId, + chatType: m.chatType as ChatType, + msgType: m.msgType as MsgType, + content: m.content, + status: m.status as ImMessage['status'], + mentionedUserIds: m.mentionedUserIds ?? undefined, + createdAt: new Date(m.serverCreatedAt).toISOString(), + })) + } + } + const res = await apiRequest<{ content?: ImMessage[] } | ImMessage[]>( `/api/im/messages/history/${encodeURIComponent(groupId)}`, { params: { appId: config.appId, page: String(page), size: String(size) } }, ) - return Array.isArray(res) ? res : (res.content ?? []) + const messages = Array.isArray(res) ? res : (res.content ?? []) + + if (ImDatabase.isInitialized() && _currentUserId) { + await ImDatabase.bulkSave(messages, _currentUserId) + } + + return messages + }, + + /** List all conversations from local DB sorted by last message time. */ + async listConversations() { + if (!ImDatabase.isInitialized()) return [] + const config = getConfig() + return ImDatabase.getConversations(config.appId) + }, + + /** Mark a conversation as read (clears unread count). */ + async markRead(targetId: string): Promise { + if (!ImDatabase.isInitialized()) return + const config = getConfig() + await ImDatabase.markRead(config.appId, targetId) + }, + + addListener(listener: ImEventListener): void { + client?.addListener({ + ...listener, + onMessage: async (msg) => { + if (ImDatabase.isInitialized() && _currentUserId) { + await ImDatabase.saveMessage(msg, _currentUserId) + } + listener.onMessage?.(msg) + }, + onGroupMessage: async (msg) => { + if (ImDatabase.isInitialized() && _currentUserId) { + await ImDatabase.saveMessage(msg, _currentUserId) + } + listener.onGroupMessage?.(msg) + }, + }) }, - addListener(listener: ImEventListener): void { client?.addListener(listener) }, removeListener(listener: ImEventListener): void { client?.removeListener(listener) }, subscribeGroup(groupId: string): void { client?.subscribeGroup(groupId) }, isConnected(): boolean { return client?.isConnected() ?? false }, @@ -99,5 +201,6 @@ export const ImSDK = { disconnect(): void { client?.disconnect() client = null + _currentUserId = null }, } diff --git a/packages/im/src/db/ConversationModel.ts b/packages/im/src/db/ConversationModel.ts new file mode 100644 index 0000000..e0fa120 --- /dev/null +++ b/packages/im/src/db/ConversationModel.ts @@ -0,0 +1,16 @@ +import { Model } from '@nozbe/watermelondb' +import { field, date } from '@nozbe/watermelondb/decorators' + +export class ConversationModel extends Model { + static table = 'im_conversations' + + @field('app_id') appId!: string + @field('target_id') targetId!: string + @field('chat_type') chatType!: string + @field('last_msg_id') lastMsgId!: string | null + @field('last_msg_content') lastMsgContent!: string | null + @field('last_msg_type') lastMsgType!: string | null + @field('last_msg_time') lastMsgTime!: number + @field('unread_count') unreadCount!: number + @date('updated_at') updatedAt!: Date +} diff --git a/packages/im/src/db/ImDatabase.ts b/packages/im/src/db/ImDatabase.ts new file mode 100644 index 0000000..5714ba7 --- /dev/null +++ b/packages/im/src/db/ImDatabase.ts @@ -0,0 +1,153 @@ +import { Database, Q } from '@nozbe/watermelondb' +import SQLiteAdapter from '@nozbe/watermelondb/adapters/sqlite' +import { imDbSchema } from './schema' +import { ConversationModel } from './ConversationModel' +import { MessageModel } from './MessageModel' +import type { ImMessage } from '../types' + +let _db: Database | null = null + +function getDb(): Database { + if (!_db) throw new Error('[ImDatabase] Not initialized — call ImDatabase.init() first.') + return _db +} + +function conversationId(appId: string, userId: string, targetId: string, chatType: string): string { + if (chatType === 'GROUP') return `${appId}:G:${targetId}` + const [a, b] = [userId, targetId].sort() + return `${appId}:S:${a}:${b}` +} + +export const ImDatabase = { + init(dbName = 'xuqm_im') { + if (_db) return + const adapter = new SQLiteAdapter({ + schema: imDbSchema, + dbName, + jsi: true, + onSetUpError: (err) => console.error('[ImDatabase] setup error', err), + }) + _db = new Database({ + adapter, + modelClasses: [ConversationModel, MessageModel], + }) + }, + + async saveMessage(msg: ImMessage, currentUserId: string): Promise { + const db = getDb() + const convId = conversationId(msg.appId, currentUserId, msg.toId, msg.chatType) + const now = Date.now() + + await db.write(async () => { + // upsert message + const existing = await db + .get('im_messages') + .query(Q.where('server_id', msg.id)) + .fetch() + + if (existing.length === 0) { + await db.get('im_messages').create(m => { + m.serverId = msg.id + m.appId = msg.appId + m.conversationId = convId + m.fromUserId = msg.fromUserId + m.toId = msg.toId + m.chatType = msg.chatType + m.msgType = msg.msgType + m.content = msg.content + m.status = msg.status + m.mentionedUserIds = msg.mentionedUserIds ?? null + m.serverCreatedAt = new Date(msg.createdAt).getTime() + m.syncedAt = now + }) + } else { + await existing[0].update(m => { + m.status = msg.status + m.content = msg.content + m.syncedAt = now + }) + } + + // upsert conversation + const convs = await db + .get('im_conversations') + .query(Q.where('app_id', msg.appId), Q.where('target_id', msg.toId)) + .fetch() + + const msgTime = new Date(msg.createdAt).getTime() + if (convs.length === 0) { + await db.get('im_conversations').create(c => { + c.appId = msg.appId + c.targetId = msg.toId + c.chatType = msg.chatType + c.lastMsgId = msg.id + c.lastMsgContent = msg.content + c.lastMsgType = msg.msgType + c.lastMsgTime = msgTime + c.unreadCount = msg.fromUserId !== currentUserId ? 1 : 0 + c.updatedAt = new Date() + }) + } else { + await convs[0].update(c => { + if (msgTime >= c.lastMsgTime) { + c.lastMsgId = msg.id + c.lastMsgContent = msg.content + c.lastMsgType = msg.msgType + c.lastMsgTime = msgTime + } + if (msg.fromUserId !== currentUserId) { + c.unreadCount = c.unreadCount + 1 + } + c.updatedAt = new Date() + }) + } + }) + }, + + async getMessages(appId: string, targetId: string, chatType: string, currentUserId: string, limit = 50): Promise { + const db = getDb() + const convId = conversationId(appId, currentUserId, targetId, chatType) + return db + .get('im_messages') + .query( + Q.where('conversation_id', convId), + Q.sortBy('server_created_at', Q.desc), + Q.take(limit), + ) + .fetch() + }, + + async getConversations(appId: string): Promise { + const db = getDb() + return db + .get('im_conversations') + .query( + Q.where('app_id', appId), + Q.sortBy('last_msg_time', Q.desc), + ) + .fetch() + }, + + async markRead(appId: string, targetId: string): Promise { + const db = getDb() + const convs = await db + .get('im_conversations') + .query(Q.where('app_id', appId), Q.where('target_id', targetId)) + .fetch() + if (convs.length > 0) { + await db.write(async () => { + await convs[0].update(c => { c.unreadCount = 0 }) + }) + } + }, + + async bulkSave(messages: ImMessage[], currentUserId: string): Promise { + for (const msg of messages) { + await this.saveMessage(msg, currentUserId) + } + }, + + isInitialized(): boolean { + return _db !== null + }, +} diff --git a/packages/im/src/db/MessageModel.ts b/packages/im/src/db/MessageModel.ts new file mode 100644 index 0000000..1900506 --- /dev/null +++ b/packages/im/src/db/MessageModel.ts @@ -0,0 +1,19 @@ +import { Model } from '@nozbe/watermelondb' +import { field } from '@nozbe/watermelondb/decorators' + +export class MessageModel extends Model { + static table = 'im_messages' + + @field('server_id') serverId!: string + @field('app_id') appId!: string + @field('conversation_id') conversationId!: string + @field('from_user_id') fromUserId!: string + @field('to_id') toId!: string + @field('chat_type') chatType!: string + @field('msg_type') msgType!: string + @field('content') content!: string + @field('status') status!: string + @field('mentioned_user_ids') mentionedUserIds!: string | null + @field('server_created_at') serverCreatedAt!: number + @field('synced_at') syncedAt!: number +} diff --git a/packages/im/src/db/schema.ts b/packages/im/src/db/schema.ts new file mode 100644 index 0000000..40f76fe --- /dev/null +++ b/packages/im/src/db/schema.ts @@ -0,0 +1,38 @@ +import { appSchema, tableSchema } from '@nozbe/watermelondb' + +export const imDbSchema = appSchema({ + version: 1, + tables: [ + tableSchema({ + name: 'im_conversations', + columns: [ + { name: 'app_id', type: 'string', isIndexed: true }, + { name: 'target_id', type: 'string', isIndexed: true }, + { name: 'chat_type', type: 'string' }, + { name: 'last_msg_id', type: 'string', isOptional: true }, + { name: 'last_msg_content', type: 'string', isOptional: true }, + { name: 'last_msg_type', type: 'string', isOptional: true }, + { name: 'last_msg_time', type: 'number' }, + { name: 'unread_count', type: 'number' }, + { name: 'updated_at', type: 'number' }, + ], + }), + tableSchema({ + name: 'im_messages', + columns: [ + { name: 'server_id', type: 'string', isIndexed: true }, + { name: 'app_id', type: 'string', isIndexed: true }, + { name: 'conversation_id', type: 'string', isIndexed: true }, + { name: 'from_user_id', type: 'string' }, + { name: 'to_id', type: 'string' }, + { name: 'chat_type', type: 'string' }, + { name: 'msg_type', type: 'string' }, + { name: 'content', type: 'string' }, + { name: 'status', type: 'string' }, + { name: 'mentioned_user_ids', type: 'string', isOptional: true }, + { name: 'server_created_at', type: 'number' }, + { name: 'synced_at', type: 'number' }, + ], + }), + ], +}) diff --git a/packages/im/src/index.ts b/packages/im/src/index.ts index dce34e3..e27b23d 100644 --- a/packages/im/src/index.ts +++ b/packages/im/src/index.ts @@ -1,5 +1,6 @@ export { ImSDK } from './ImSDK' export { ImClient } from './ImClient' +export { ImDatabase } from './db/ImDatabase' export type { ImMessage, ImGroup, ChatType, MsgType, MsgStatus, ImEventListener, SendMessageParams,