feat(rn-im): add WatermelonDB local message database

- Add schema for im_conversations and im_messages tables
- ConversationModel/MessageModel: WatermelonDB ORM models
- ImDatabase: init/saveMessage/getMessages/getConversations/markRead/bulkSave
- ImSDK: fetchHistory/fetchGroupHistory read local DB first, cache server results
- ImSDK: incoming WebSocket messages auto-saved to local DB
- ImSDK: new listConversations() and markRead() public APIs
- @nozbe/watermelondb >=0.27.0 added as peer dependency

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
这个提交包含在:
XuqmGroup 2026-04-24 20:54:12 +08:00
父节点 febefc8d69
当前提交 4e821b280b
共有 7 个文件被更改,包括 337 次插入6 次删除

查看文件

@ -12,6 +12,7 @@
"scripts": { "typecheck": "tsc --noEmit" }, "scripts": { "typecheck": "tsc --noEmit" },
"peerDependencies": { "peerDependencies": {
"@xuqm/rn-common": ">=0.2.0", "@xuqm/rn-common": ">=0.2.0",
"@nozbe/watermelondb": ">=0.27.0",
"react-native": ">=0.76.0" "react-native": ">=0.76.0"
}, },
"devDependencies": { "devDependencies": {

查看文件

@ -1,14 +1,17 @@
import { apiRequest, _getToken, _saveToken, getConfig } from '@xuqm/rn-common' import { apiRequest, _getToken, _saveToken, getConfig } from '@xuqm/rn-common'
import { ImClient } from './ImClient' import { ImClient } from './ImClient'
import { ImDatabase } from './db/ImDatabase'
import type { ChatType, ImEventListener, ImGroup, ImMessage, MsgType } from './types' import type { ChatType, ImEventListener, ImGroup, ImMessage, MsgType } from './types'
let client: ImClient | null = null let client: ImClient | null = null
let _currentUserId: string | null = null
export const ImSDK = { export const ImSDK = {
/** /**
* Login to IM service. Fetches a token internally and opens the WebSocket connection. * Login to IM service. Fetches a token internally and opens the WebSocket connection.
* Pass dbName to enable local SQLite message caching (requires @nozbe/watermelondb).
*/ */
async login(userId: string, nickname?: string, avatar?: string): Promise<void> { async login(userId: string, nickname?: string, avatar?: string, dbName?: string): Promise<void> {
const config = getConfig() const config = getConfig()
const res = await apiRequest<{ token: string }>('/api/im/auth/login', { const res = await apiRequest<{ token: string }>('/api/im/auth/login', {
method: 'POST', method: 'POST',
@ -21,6 +24,12 @@ export const ImSDK = {
}, },
}) })
await _saveToken(res.token) await _saveToken(res.token)
_currentUserId = userId
if (dbName !== undefined || ImDatabase.isInitialized()) {
ImDatabase.init(dbName ?? 'xuqm_im')
}
client = new ImClient(config.imWsUrl, res.token, config.appId) client = new ImClient(config.imWsUrl, res.token, config.appId)
client.connect() client.connect()
}, },
@ -33,13 +42,42 @@ export const ImSDK = {
client.connect() client.connect()
}, },
/**
* Fetch message history. Reads from local DB first; falls back to server if DB is empty
* or not initialized, then caches results locally.
*/
async fetchHistory(toId: string, page = 0, size = 20): Promise<ImMessage[]> { async fetchHistory(toId: string, page = 0, size = 20): Promise<ImMessage[]> {
const config = getConfig() const config = getConfig()
if (ImDatabase.isInitialized() && page === 0 && _currentUserId) {
const local = await ImDatabase.getMessages(config.appId, toId, 'SINGLE', _currentUserId, size)
if (local.length > 0) {
return local.map(m => ({
id: m.serverId,
appId: m.appId,
fromUserId: m.fromUserId,
toId: m.toId,
chatType: m.chatType as ChatType,
msgType: m.msgType as MsgType,
content: m.content,
status: m.status as ImMessage['status'],
mentionedUserIds: m.mentionedUserIds ?? undefined,
createdAt: new Date(m.serverCreatedAt).toISOString(),
}))
}
}
const res = await apiRequest<{ content?: ImMessage[] } | ImMessage[]>( const res = await apiRequest<{ content?: ImMessage[] } | ImMessage[]>(
`/api/im/messages/history/${encodeURIComponent(toId)}`, `/api/im/messages/history/${encodeURIComponent(toId)}`,
{ params: { appId: config.appId, page: String(page), size: String(size) } }, { params: { appId: config.appId, page: String(page), size: String(size) } },
) )
return Array.isArray(res) ? res : (res.content ?? []) const messages = Array.isArray(res) ? res : (res.content ?? [])
if (ImDatabase.isInitialized() && _currentUserId) {
await ImDatabase.bulkSave(messages, _currentUserId)
}
return messages
}, },
async sendMessage( async sendMessage(
@ -50,19 +88,27 @@ export const ImSDK = {
mentionedUserIds?: string, mentionedUserIds?: string,
): Promise<ImMessage> { ): Promise<ImMessage> {
const config = getConfig() const config = getConfig()
return apiRequest<ImMessage>('/api/im/messages/send', { const msg = await apiRequest<ImMessage>('/api/im/messages/send', {
method: 'POST', method: 'POST',
params: { appId: config.appId }, params: { appId: config.appId },
body: { toId, chatType, msgType, content, mentionedUserIds: mentionedUserIds ?? '' }, body: { toId, chatType, msgType, content, mentionedUserIds: mentionedUserIds ?? '' },
}) })
if (ImDatabase.isInitialized() && _currentUserId) {
await ImDatabase.saveMessage(msg, _currentUserId)
}
return msg
}, },
async revokeMessage(messageId: string): Promise<ImMessage> { async revokeMessage(messageId: string): Promise<ImMessage> {
const config = getConfig() const config = getConfig()
return apiRequest<ImMessage>(`/api/im/messages/${encodeURIComponent(messageId)}/revoke`, { const msg = await apiRequest<ImMessage>(`/api/im/messages/${encodeURIComponent(messageId)}/revoke`, {
method: 'POST', method: 'POST',
params: { appId: config.appId }, params: { appId: config.appId },
}) })
if (ImDatabase.isInitialized() && _currentUserId) {
await ImDatabase.saveMessage(msg, _currentUserId)
}
return msg
}, },
async createGroup(name: string, memberIds: string[]): Promise<ImGroup> { async createGroup(name: string, memberIds: string[]): Promise<ImGroup> {
@ -84,14 +130,70 @@ export const ImSDK = {
async fetchGroupHistory(groupId: string, page = 0, size = 50): Promise<ImMessage[]> { async fetchGroupHistory(groupId: string, page = 0, size = 50): Promise<ImMessage[]> {
const config = getConfig() const config = getConfig()
if (ImDatabase.isInitialized() && page === 0 && _currentUserId) {
const local = await ImDatabase.getMessages(config.appId, groupId, 'GROUP', _currentUserId, size)
if (local.length > 0) {
return local.map(m => ({
id: m.serverId,
appId: m.appId,
fromUserId: m.fromUserId,
toId: m.toId,
chatType: m.chatType as ChatType,
msgType: m.msgType as MsgType,
content: m.content,
status: m.status as ImMessage['status'],
mentionedUserIds: m.mentionedUserIds ?? undefined,
createdAt: new Date(m.serverCreatedAt).toISOString(),
}))
}
}
const res = await apiRequest<{ content?: ImMessage[] } | ImMessage[]>( const res = await apiRequest<{ content?: ImMessage[] } | ImMessage[]>(
`/api/im/messages/history/${encodeURIComponent(groupId)}`, `/api/im/messages/history/${encodeURIComponent(groupId)}`,
{ params: { appId: config.appId, page: String(page), size: String(size) } }, { params: { appId: config.appId, page: String(page), size: String(size) } },
) )
return Array.isArray(res) ? res : (res.content ?? []) const messages = Array.isArray(res) ? res : (res.content ?? [])
if (ImDatabase.isInitialized() && _currentUserId) {
await ImDatabase.bulkSave(messages, _currentUserId)
}
return messages
},
/** List all conversations from local DB sorted by last message time. */
async listConversations() {
if (!ImDatabase.isInitialized()) return []
const config = getConfig()
return ImDatabase.getConversations(config.appId)
},
/** Mark a conversation as read (clears unread count). */
async markRead(targetId: string): Promise<void> {
if (!ImDatabase.isInitialized()) return
const config = getConfig()
await ImDatabase.markRead(config.appId, targetId)
},
addListener(listener: ImEventListener): void {
client?.addListener({
...listener,
onMessage: async (msg) => {
if (ImDatabase.isInitialized() && _currentUserId) {
await ImDatabase.saveMessage(msg, _currentUserId)
}
listener.onMessage?.(msg)
},
onGroupMessage: async (msg) => {
if (ImDatabase.isInitialized() && _currentUserId) {
await ImDatabase.saveMessage(msg, _currentUserId)
}
listener.onGroupMessage?.(msg)
},
})
}, },
addListener(listener: ImEventListener): void { client?.addListener(listener) },
removeListener(listener: ImEventListener): void { client?.removeListener(listener) }, removeListener(listener: ImEventListener): void { client?.removeListener(listener) },
subscribeGroup(groupId: string): void { client?.subscribeGroup(groupId) }, subscribeGroup(groupId: string): void { client?.subscribeGroup(groupId) },
isConnected(): boolean { return client?.isConnected() ?? false }, isConnected(): boolean { return client?.isConnected() ?? false },
@ -99,5 +201,6 @@ export const ImSDK = {
disconnect(): void { disconnect(): void {
client?.disconnect() client?.disconnect()
client = null client = null
_currentUserId = null
}, },
} }

查看文件

@ -0,0 +1,16 @@
import { Model } from '@nozbe/watermelondb'
import { field, date } from '@nozbe/watermelondb/decorators'
export class ConversationModel extends Model {
static table = 'im_conversations'
@field('app_id') appId!: string
@field('target_id') targetId!: string
@field('chat_type') chatType!: string
@field('last_msg_id') lastMsgId!: string | null
@field('last_msg_content') lastMsgContent!: string | null
@field('last_msg_type') lastMsgType!: string | null
@field('last_msg_time') lastMsgTime!: number
@field('unread_count') unreadCount!: number
@date('updated_at') updatedAt!: Date
}

查看文件

@ -0,0 +1,153 @@
import { Database, Q } from '@nozbe/watermelondb'
import SQLiteAdapter from '@nozbe/watermelondb/adapters/sqlite'
import { imDbSchema } from './schema'
import { ConversationModel } from './ConversationModel'
import { MessageModel } from './MessageModel'
import type { ImMessage } from '../types'
let _db: Database | null = null
function getDb(): Database {
if (!_db) throw new Error('[ImDatabase] Not initialized — call ImDatabase.init() first.')
return _db
}
function conversationId(appId: string, userId: string, targetId: string, chatType: string): string {
if (chatType === 'GROUP') return `${appId}:G:${targetId}`
const [a, b] = [userId, targetId].sort()
return `${appId}:S:${a}:${b}`
}
export const ImDatabase = {
init(dbName = 'xuqm_im') {
if (_db) return
const adapter = new SQLiteAdapter({
schema: imDbSchema,
dbName,
jsi: true,
onSetUpError: (err) => console.error('[ImDatabase] setup error', err),
})
_db = new Database({
adapter,
modelClasses: [ConversationModel, MessageModel],
})
},
async saveMessage(msg: ImMessage, currentUserId: string): Promise<void> {
const db = getDb()
const convId = conversationId(msg.appId, currentUserId, msg.toId, msg.chatType)
const now = Date.now()
await db.write(async () => {
// upsert message
const existing = await db
.get<MessageModel>('im_messages')
.query(Q.where('server_id', msg.id))
.fetch()
if (existing.length === 0) {
await db.get<MessageModel>('im_messages').create(m => {
m.serverId = msg.id
m.appId = msg.appId
m.conversationId = convId
m.fromUserId = msg.fromUserId
m.toId = msg.toId
m.chatType = msg.chatType
m.msgType = msg.msgType
m.content = msg.content
m.status = msg.status
m.mentionedUserIds = msg.mentionedUserIds ?? null
m.serverCreatedAt = new Date(msg.createdAt).getTime()
m.syncedAt = now
})
} else {
await existing[0].update(m => {
m.status = msg.status
m.content = msg.content
m.syncedAt = now
})
}
// upsert conversation
const convs = await db
.get<ConversationModel>('im_conversations')
.query(Q.where('app_id', msg.appId), Q.where('target_id', msg.toId))
.fetch()
const msgTime = new Date(msg.createdAt).getTime()
if (convs.length === 0) {
await db.get<ConversationModel>('im_conversations').create(c => {
c.appId = msg.appId
c.targetId = msg.toId
c.chatType = msg.chatType
c.lastMsgId = msg.id
c.lastMsgContent = msg.content
c.lastMsgType = msg.msgType
c.lastMsgTime = msgTime
c.unreadCount = msg.fromUserId !== currentUserId ? 1 : 0
c.updatedAt = new Date()
})
} else {
await convs[0].update(c => {
if (msgTime >= c.lastMsgTime) {
c.lastMsgId = msg.id
c.lastMsgContent = msg.content
c.lastMsgType = msg.msgType
c.lastMsgTime = msgTime
}
if (msg.fromUserId !== currentUserId) {
c.unreadCount = c.unreadCount + 1
}
c.updatedAt = new Date()
})
}
})
},
async getMessages(appId: string, targetId: string, chatType: string, currentUserId: string, limit = 50): Promise<MessageModel[]> {
const db = getDb()
const convId = conversationId(appId, currentUserId, targetId, chatType)
return db
.get<MessageModel>('im_messages')
.query(
Q.where('conversation_id', convId),
Q.sortBy('server_created_at', Q.desc),
Q.take(limit),
)
.fetch()
},
async getConversations(appId: string): Promise<ConversationModel[]> {
const db = getDb()
return db
.get<ConversationModel>('im_conversations')
.query(
Q.where('app_id', appId),
Q.sortBy('last_msg_time', Q.desc),
)
.fetch()
},
async markRead(appId: string, targetId: string): Promise<void> {
const db = getDb()
const convs = await db
.get<ConversationModel>('im_conversations')
.query(Q.where('app_id', appId), Q.where('target_id', targetId))
.fetch()
if (convs.length > 0) {
await db.write(async () => {
await convs[0].update(c => { c.unreadCount = 0 })
})
}
},
async bulkSave(messages: ImMessage[], currentUserId: string): Promise<void> {
for (const msg of messages) {
await this.saveMessage(msg, currentUserId)
}
},
isInitialized(): boolean {
return _db !== null
},
}

查看文件

@ -0,0 +1,19 @@
import { Model } from '@nozbe/watermelondb'
import { field } from '@nozbe/watermelondb/decorators'
export class MessageModel extends Model {
static table = 'im_messages'
@field('server_id') serverId!: string
@field('app_id') appId!: string
@field('conversation_id') conversationId!: string
@field('from_user_id') fromUserId!: string
@field('to_id') toId!: string
@field('chat_type') chatType!: string
@field('msg_type') msgType!: string
@field('content') content!: string
@field('status') status!: string
@field('mentioned_user_ids') mentionedUserIds!: string | null
@field('server_created_at') serverCreatedAt!: number
@field('synced_at') syncedAt!: number
}

查看文件

@ -0,0 +1,38 @@
import { appSchema, tableSchema } from '@nozbe/watermelondb'
export const imDbSchema = appSchema({
version: 1,
tables: [
tableSchema({
name: 'im_conversations',
columns: [
{ name: 'app_id', type: 'string', isIndexed: true },
{ name: 'target_id', type: 'string', isIndexed: true },
{ name: 'chat_type', type: 'string' },
{ name: 'last_msg_id', type: 'string', isOptional: true },
{ name: 'last_msg_content', type: 'string', isOptional: true },
{ name: 'last_msg_type', type: 'string', isOptional: true },
{ name: 'last_msg_time', type: 'number' },
{ name: 'unread_count', type: 'number' },
{ name: 'updated_at', type: 'number' },
],
}),
tableSchema({
name: 'im_messages',
columns: [
{ name: 'server_id', type: 'string', isIndexed: true },
{ name: 'app_id', type: 'string', isIndexed: true },
{ name: 'conversation_id', type: 'string', isIndexed: true },
{ name: 'from_user_id', type: 'string' },
{ name: 'to_id', type: 'string' },
{ name: 'chat_type', type: 'string' },
{ name: 'msg_type', type: 'string' },
{ name: 'content', type: 'string' },
{ name: 'status', type: 'string' },
{ name: 'mentioned_user_ids', type: 'string', isOptional: true },
{ name: 'server_created_at', type: 'number' },
{ name: 'synced_at', type: 'number' },
],
}),
],
})

查看文件

@ -1,5 +1,6 @@
export { ImSDK } from './ImSDK' export { ImSDK } from './ImSDK'
export { ImClient } from './ImClient' export { ImClient } from './ImClient'
export { ImDatabase } from './db/ImDatabase'
export type { export type {
ImMessage, ImGroup, ChatType, MsgType, MsgStatus, ImMessage, ImGroup, ChatType, MsgType, MsgStatus,
ImEventListener, SendMessageParams, ImEventListener, SendMessageParams,