import { Database, Q } from '@nozbe/watermelondb'
import SQLiteAdapter from '@nozbe/watermelondb/adapters/sqlite'
import { imDbSchema } from './schema'
import { ConversationModel } from './ConversationModel'
import { MessageModel } from './MessageModel'
import type { ImMessage } from '../types'

/** Active database handle; `null` until `ImDatabase.init()` has been called. */
let _db: Database | null = null
/** Name the active database was opened with; used to make `init()` idempotent. */
let _dbName: string | null = null
/** In-memory draft cache keyed by `draftKey()`; consulted before the database. */
const draftStore = new Map<string, string>()

/**
 * `Q.take` value used when a caller paginates with `offset` but no `limit`.
 * WatermelonDB is expected to reject `Q.skip` without an accompanying `Q.take`
 * (see its Query API documentation), so an effectively unbounded take is supplied.
 */
const UNBOUNDED_TAKE = 2147483647

/**
 * Returns the active database.
 *
 * @throws Error when `ImDatabase.init()` has not been called yet.
 */
function getDb(): Database {
  if (!_db) throw new Error('[ImDatabase] Not initialized — call ImDatabase.init() first.')
  return _db
}

/**
 * Builds the value stored in the `conversation_id` column of a message.
 *
 * Group chats are keyed by the group id. Every other chat type is keyed by the
 * two participant ids in sorted order, so both directions of a 1:1 chat map to
 * the same id.
 */
function conversationId(appId: string, userId: string, targetId: string, chatType: string): string {
  if (chatType === 'GROUP') return `${appId}:G:${targetId}`
  const [a, b] = [userId, targetId].sort()
  return `${appId}:S:${a}:${b}`
}

/** Builds the key used by the in-memory draft cache. */
function draftKey(appId: string, targetId: string, chatType: string): string {
  return `${appId}:${chatType}:${targetId}`
}

/**
 * Resolves the id of the "other side" of the conversation a message belongs to.
 *
 * For group messages and for messages sent by the current user this is `toId`.
 * For an incoming non-group message it is the sender: using `toId` there would
 * yield the current user's own id whenever the message is addressed to them,
 * which never matches the peer id callers pass to `getMessages`.
 */
function peerIdOf(msg: ImMessage, currentUserId: string): string {
  if (msg.chatType === 'GROUP' || msg.fromUserId === currentUserId) return msg.toId
  return msg.fromUserId
}

/**
 * Applies `mutate` to the first conversation matching `(appId, targetId)`.
 *
 * The lookup runs inside the writer so the record cannot be destroyed between
 * the read and the update. Does nothing when no conversation matches.
 */
async function updateConversation(
  appId: string,
  targetId: string,
  mutate: (c: ConversationModel) => void,
): Promise<void> {
  const db = getDb()
  await db.write(async () => {
    const [conversation] = await db
      .get<ConversationModel>('im_conversations')
      .query(Q.where('app_id', appId), Q.where('target_id', targetId))
      .fetch()
    if (conversation) await conversation.update(mutate)
  })
}

/** Optional filters accepted by `ImDatabase.searchMessages`. */
export interface MessageSearchParams {
  /** Substring matched against the message content (LIKE `%keyword%`). */
  keyword?: string
  /** Matched against the stored `to_id` column. */
  toId?: string
  /** Matched against the stored `chat_type` column. */
  chatType?: string
  /** Inclusive lower bound on `server_created_at`. */
  startTime?: number
  /** Inclusive upper bound on `server_created_at`. */
  endTime?: number
  /** When non-empty, only messages whose `msg_type` is in this list are returned. */
  msgTypes?: string[]
  /** Maximum number of rows to return. */
  limit?: number
  /** Number of rows to skip; may be used with or without `limit`. */
  offset?: number
}

/** Local persistence facade for IM messages and conversations. */
export const ImDatabase = {
  /**
   * Opens the database. Calling it again with the same name is a no-op;
   * calling it with a different name replaces the active handle.
   */
  init(dbName = 'xuqm_im') {
    if (_db && _dbName === dbName) return
    const adapter = new SQLiteAdapter({
      schema: imDbSchema,
      dbName,
      jsi: true,
      onSetUpError: (err: unknown) => console.error('[ImDatabase] setup error', err),
    })
    _db = new Database({
      adapter,
      modelClasses: [ConversationModel, MessageModel],
    })
    _dbName = dbName
  },

  /**
   * Inserts or updates a message and keeps its conversation summary in sync.
   *
   * - The message is matched by `server_id`; an existing row only has its
   *   status, content, edit time and sync time refreshed.
   * - The conversation's last-message fields are replaced when this message is
   *   at least as recent as the stored one.
   * - The unread counter is incremented only for a newly inserted incoming
   *   message, so re-syncing or editing a stored message does not inflate it.
   * - `isMuted` / `isPinned` of an existing conversation are left untouched.
   *
   * @throws Error when the database is not initialized.
   */
  async saveMessage(msg: ImMessage, currentUserId: string): Promise<void> {
    const db = getDb()
    const peerId = peerIdOf(msg, currentUserId)
    const convId = conversationId(msg.appId, currentUserId, peerId, msg.chatType)
    const now = Date.now()
    const msgTime = new Date(msg.createdAt).getTime()
    const isIncoming = msg.fromUserId !== currentUserId

    await db.write(async () => {
      // Upsert the message.
      const messages = db.get<MessageModel>('im_messages')
      const [existing] = await messages.query(Q.where('server_id', msg.id)).fetch()
      const isNewMessage = !existing

      if (existing) {
        await existing.update((m: MessageModel) => {
          m.status = msg.status
          m.content = msg.content
          m.editedAt = msg.editedAt ?? m.editedAt
          m.syncedAt = now
        })
      } else {
        await messages.create((m: MessageModel) => {
          m.serverId = msg.id
          m.appId = msg.appId
          m.conversationId = convId
          m.fromUserId = msg.fromUserId
          m.toId = msg.toId
          m.chatType = msg.chatType
          m.msgType = msg.msgType
          m.content = msg.content
          m.status = msg.status
          m.mentionedUserIds = msg.mentionedUserIds ?? null
          m.serverCreatedAt = msgTime
          m.editedAt = msg.editedAt ?? null
          m.syncedAt = now
        })
      }

      // Upsert the conversation summary.
      const countsAsUnread = isIncoming && isNewMessage
      const conversations = db.get<ConversationModel>('im_conversations')
      const [conversation] = await conversations
        .query(Q.where('app_id', msg.appId), Q.where('target_id', peerId))
        .fetch()

      if (conversation) {
        await conversation.update((c: ConversationModel) => {
          // `>=` lets an edit of the current last message refresh its preview.
          if (msgTime >= c.lastMsgTime) {
            c.lastMsgId = msg.id
            c.lastMsgContent = msg.content
            c.lastMsgType = msg.msgType
            c.lastMsgTime = msgTime
          }
          if (countsAsUnread) {
            c.unreadCount = c.unreadCount + 1
          }
          // isMuted and isPinned are intentionally preserved (not overwritten).
          c.updatedAt = new Date()
        })
      } else {
        await conversations.create((c: ConversationModel) => {
          c.appId = msg.appId
          c.targetId = peerId
          c.chatType = msg.chatType
          c.lastMsgId = msg.id
          c.lastMsgContent = msg.content
          c.lastMsgType = msg.msgType
          c.lastMsgTime = msgTime
          c.unreadCount = countsAsUnread ? 1 : 0
          c.isMuted = false
          c.isPinned = false
          c.draft = null
          c.updatedAt = new Date()
        })
      }
    })
  },

  /**
   * Marks a stored message as revoked and, when it is the last message of a
   * conversation, replaces that conversation's preview as well.
   * Does nothing when the message is not stored locally.
   *
   * @throws Error when the database is not initialized.
   */
  async revokeMessage(appId: string, messageId: string): Promise<void> {
    const db = getDb()
    const now = Date.now()
    // Runtime placeholder text shown in place of the content ("message has been recalled").
    const revokedContent = '消息已撤回'

    await db.write(async () => {
      const [message] = await db
        .get<MessageModel>('im_messages')
        .query(Q.where('app_id', appId), Q.where('server_id', messageId))
        .fetch()
      if (!message) return

      await message.update((m: MessageModel) => {
        m.status = 'REVOKED'
        m.content = revokedContent
        m.syncedAt = now
      })

      // The conversation is keyed by the peer, which is either end of the
      // message depending on its direction; the lastMsgId check below makes
      // sure only the conversation actually showing this message is touched.
      const candidates = await db
        .get<ConversationModel>('im_conversations')
        .query(
          Q.where('app_id', appId),
          Q.where('target_id', Q.oneOf([message.toId, message.fromUserId])),
        )
        .fetch()

      for (const conversation of candidates) {
        if (conversation.lastMsgId !== messageId) continue
        await conversation.update((c: ConversationModel) => {
          c.lastMsgContent = revokedContent
          c.lastMsgType = 'REVOKED'
          c.updatedAt = new Date()
        })
      }
    })
  },

  /**
   * Returns up to `limit` messages of a conversation, newest first.
   *
   * @throws Error when the database is not initialized.
   */
  async getMessages(
    appId: string,
    targetId: string,
    chatType: string,
    currentUserId: string,
    limit = 50,
  ): Promise<MessageModel[]> {
    const db = getDb()
    const convId = conversationId(appId, currentUserId, targetId, chatType)
    return db
      .get<MessageModel>('im_messages')
      .query(
        Q.where('conversation_id', convId),
        Q.sortBy('server_created_at', Q.desc),
        Q.take(limit),
      )
      .fetch()
  },

  /**
   * Returns all conversations of an app, most recently active first.
   *
   * @throws Error when the database is not initialized.
   */
  async getConversations(appId: string): Promise<ConversationModel[]> {
    const db = getDb()
    return db
      .get<ConversationModel>('im_conversations')
      .query(Q.where('app_id', appId), Q.sortBy('last_msg_time', Q.desc))
      .fetch()
  },

  /** Resets the unread counter of a conversation; no-op when it does not exist. */
  async markRead(appId: string, targetId: string): Promise<void> {
    await updateConversation(appId, targetId, (c) => {
      c.unreadCount = 0
    })
  },

  /**
   * Stores a draft in the in-memory cache and, when the conversation exists,
   * on the conversation row. The cache is written first, so it is updated even
   * if the database call subsequently throws.
   */
  async setDraft(appId: string, targetId: string, chatType: string, draft: string): Promise<void> {
    draftStore.set(draftKey(appId, targetId, chatType), draft)
    await updateConversation(appId, targetId, (c) => {
      c.draft = draft
      c.updatedAt = new Date()
    })
  },

  /**
   * Saves messages one at a time, in the given order.
   *
   * Kept sequential because each save opens its own writer and the unread /
   * last-message bookkeeping depends on the order in which messages are applied.
   */
  async bulkSave(messages: ImMessage[], currentUserId: string): Promise<void> {
    for (const msg of messages) {
      // Referenced via the object rather than `this` so a destructured
      // `bulkSave` keeps working.
      await ImDatabase.saveMessage(msg, currentUserId)
    }
  },

  /**
   * Searches stored messages of an app, newest first.
   *
   * Every provided filter is AND-ed. `offset` may be used without `limit`.
   *
   * @throws Error when the database is not initialized.
   */
  async searchMessages(appId: string, params: MessageSearchParams): Promise<MessageModel[]> {
    const db = getDb()
    // Loosely typed on purpose: the clause typings for sortBy/take/skip are not
    // assumed to share a type with `Q.where` here.
    // TODO(review): tighten to the library's clause type once confirmed against the installed typings.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const clauses: any[] = [Q.where('app_id', appId)]

    if (params.toId) {
      clauses.push(Q.where('to_id', params.toId))
    }
    if (params.chatType) {
      clauses.push(Q.where('chat_type', params.chatType))
    }
    if (params.keyword) {
      clauses.push(Q.where('content', Q.like(`%${Q.sanitizeLikeString(params.keyword)}%`)))
    }
    if (params.startTime !== undefined) {
      clauses.push(Q.where('server_created_at', Q.gte(params.startTime)))
    }
    if (params.endTime !== undefined) {
      clauses.push(Q.where('server_created_at', Q.lte(params.endTime)))
    }
    if (params.msgTypes && params.msgTypes.length > 0) {
      clauses.push(Q.where('msg_type', Q.oneOf(params.msgTypes)))
    }

    // A fixed order keeps limit/offset pagination stable between calls.
    clauses.push(Q.sortBy('server_created_at', Q.desc))

    const { limit, offset } = params
    if (limit !== undefined) {
      clauses.push(Q.take(limit))
    } else if (offset !== undefined) {
      // Q.skip is expected to require Q.take, so supply an effectively unbounded one.
      clauses.push(Q.take(UNBOUNDED_TAKE))
    }
    if (offset !== undefined) {
      clauses.push(Q.skip(offset))
    }

    return db
      .get<MessageModel>('im_messages')
      .query(...clauses)
      .fetch()
  },

  /**
   * Observes the conversation list of an app (pinned first, then most recently
   * active) and invokes `callback` with each emitted result set.
   *
   * @returns A function that cancels the subscription.
   * @throws Error when the database is not initialized.
   */
  subscribeConversations(
    appId: string,
    callback: (conversations: ConversationModel[]) => void,
  ): () => void {
    const db = getDb()
    const subscription = db
      .get<ConversationModel>('im_conversations')
      .query(
        Q.where('app_id', appId),
        Q.sortBy('is_pinned', Q.desc),
        Q.sortBy('last_msg_time', Q.desc),
      )
      .observe()
      .subscribe(callback)
    return () => subscription.unsubscribe()
  },

  /** Sets the muted flag of a conversation; no-op when it does not exist. */
  async setConversationMuted(appId: string, targetId: string, muted: boolean): Promise<void> {
    await updateConversation(appId, targetId, (c) => {
      c.isMuted = muted
    })
  },

  /** Sets the pinned flag of a conversation; no-op when it does not exist. */
  async setConversationPinned(appId: string, targetId: string, pinned: boolean): Promise<void> {
    await updateConversation(appId, targetId, (c) => {
      c.isPinned = pinned
    })
  },

  /**
   * Returns the draft of a conversation, preferring the in-memory cache and
   * falling back to the conversation row. Returns `null` when the conversation
   * does not exist and nothing is cached.
   */
  async getConversationDraft(
    appId: string,
    targetId: string,
    chatType: string,
  ): Promise<string | null> {
    const memoryDraft = draftStore.get(draftKey(appId, targetId, chatType))
    if (memoryDraft !== undefined) {
      return memoryDraft
    }
    const db = getDb()
    const [conversation] = await db
      .get<ConversationModel>('im_conversations')
      .query(Q.where('app_id', appId), Q.where('target_id', targetId))
      .fetch()
    return conversation ? conversation.draft : null
  },

  /**
   * Permanently deletes a conversation, all of its messages and its cached draft.
   *
   * @throws Error when the database is not initialized.
   */
  async deleteConversation(
    appId: string,
    targetId: string,
    chatType: string,
    currentUserId: string,
  ): Promise<void> {
    const db = getDb()
    const convId = conversationId(appId, currentUserId, targetId, chatType)

    await db.write(async () => {
      const messages = await db
        .get<MessageModel>('im_messages')
        .query(Q.where('conversation_id', convId))
        .fetch()
      const conversations = await db
        .get<ConversationModel>('im_conversations')
        .query(Q.where('app_id', appId), Q.where('target_id', targetId))
        .fetch()

      for (const message of messages) {
        await message.destroyPermanently()
      }
      for (const conversation of conversations) {
        await conversation.destroyPermanently()
      }
    })

    // Otherwise getConversationDraft would keep returning the stale cached draft.
    draftStore.delete(draftKey(appId, targetId, chatType))
  },

  /** Reports whether `init()` has been called. */
  isInitialized(): boolean {
    return _db !== null
  },
}