XuqmGroup-RNSDK/packages/im/src/db/ImDatabase.ts
2026-05-07 19:39:41 +08:00

349 行
11 KiB
TypeScript

import { Database, Q } from '@nozbe/watermelondb'
import SQLiteAdapter from '@nozbe/watermelondb/adapters/sqlite'
import { imDbSchema } from './schema'
import { ConversationModel } from './ConversationModel'
import { MessageModel } from './MessageModel'
import type { ImMessage } from '../types'
let _db: Database | null = null
let _dbName: string | null = null
const draftStore = new Map<string, string>()
function getDb(): Database {
if (!_db) throw new Error('[ImDatabase] Not initialized — call ImDatabase.init() first.')
return _db
}
function conversationId(appKey: string, userId: string, targetId: string, chatType: string): string {
if (chatType === 'GROUP') return `${appKey}:G:${targetId}`
const [a, b] = [userId, targetId].sort()
return `${appKey}:S:${a}:${b}`
}
function draftKey(appKey: string, targetId: string, chatType: string): string {
return `${appKey}:${chatType}:${targetId}`
}
export interface MessageSearchParams {
keyword?: string
toId?: string
chatType?: string
startTime?: number
endTime?: number
msgTypes?: string[]
limit?: number
offset?: number
}
export const ImDatabase = {
init(dbName = 'xuqm_im') {
if (_db && _dbName === dbName) return
const adapter = new SQLiteAdapter({
schema: imDbSchema,
dbName,
jsi: true,
onSetUpError: (err: unknown) => console.error('[ImDatabase] setup error', err),
})
_db = new Database({
adapter,
modelClasses: [ConversationModel, MessageModel],
})
_dbName = dbName
},
async saveMessage(msg: ImMessage, currentUserId: string): Promise<void> {
const db = getDb()
const convId = conversationId(msg.appKey, currentUserId, msg.toId, msg.chatType)
const now = Date.now()
await db.write(async () => {
// upsert message
const existing = await db
.get<MessageModel>('im_messages')
.query(Q.where('server_id', msg.id))
.fetch()
if (existing.length === 0) {
await db.get<MessageModel>('im_messages').create((m: MessageModel) => {
m.serverId = msg.id
m.appKey = msg.appKey
m.conversationId = convId
m.fromUserId = msg.fromUserId
m.toId = msg.toId
m.chatType = msg.chatType
m.msgType = msg.msgType
m.content = msg.content
m.status = msg.status
m.mentionedUserIds = msg.mentionedUserIds ?? null
m.serverCreatedAt = new Date(msg.createdAt).getTime()
m.editedAt = msg.editedAt ?? null
m.syncedAt = now
})
} else {
await existing[0].update((m: MessageModel) => {
m.status = msg.status
m.content = msg.content
m.editedAt = msg.editedAt ?? m.editedAt
m.syncedAt = now
})
}
// upsert conversation — preserve isMuted/isPinned on update
const convs = await db
.get<ConversationModel>('im_conversations')
.query(Q.where('app_id', msg.appKey), Q.where('target_id', msg.toId))
.fetch()
const msgTime = new Date(msg.createdAt).getTime()
if (convs.length === 0) {
await db.get<ConversationModel>('im_conversations').create((c: ConversationModel) => {
c.appKey = msg.appKey
c.targetId = msg.toId
c.chatType = msg.chatType
c.lastMsgId = msg.id
c.lastMsgContent = msg.content
c.lastMsgType = msg.msgType
c.lastMsgTime = msgTime
c.unreadCount = msg.fromUserId !== currentUserId ? 1 : 0
c.isMuted = false
c.isPinned = false
c.draft = null
c.updatedAt = new Date()
})
} else {
await convs[0].update((c: ConversationModel) => {
if (msgTime >= c.lastMsgTime) {
c.lastMsgId = msg.id
c.lastMsgContent = msg.content
c.lastMsgType = msg.msgType
c.lastMsgTime = msgTime
}
if (msg.fromUserId !== currentUserId) {
c.unreadCount = c.unreadCount + 1
}
// isMuted and isPinned are intentionally preserved (not overwritten)
c.updatedAt = new Date()
})
}
})
},
async revokeMessage(appKey: string, messageId: string): Promise<void> {
const db = getDb()
const messages = await db
.get<MessageModel>('im_messages')
.query(Q.where('app_id', appKey), Q.where('server_id', messageId))
.fetch()
if (messages.length === 0) return
const message = messages[0]
const now = Date.now()
const revokedContent = '消息已撤回'
await db.write(async () => {
await message.update((m: MessageModel) => {
m.status = 'REVOKED'
m.content = revokedContent
m.syncedAt = now
})
const conversations = await db
.get<ConversationModel>('im_conversations')
.query(Q.where('app_id', appKey), Q.where('target_id', message.toId))
.fetch()
if (conversations.length > 0) {
await conversations[0].update((c: ConversationModel) => {
if (c.lastMsgId === messageId) {
c.lastMsgContent = revokedContent
c.lastMsgType = 'REVOKED'
c.updatedAt = new Date()
}
})
}
})
},
async getMessages(appKey: string, targetId: string, chatType: string, currentUserId: string, limit = 50): Promise<MessageModel[]> {
const db = getDb()
const convId = conversationId(appKey, currentUserId, targetId, chatType)
return db
.get<MessageModel>('im_messages')
.query(
Q.where('conversation_id', convId),
Q.sortBy('server_created_at', Q.desc),
Q.take(limit),
)
.fetch()
},
async getConversations(appKey: string): Promise<ConversationModel[]> {
const db = getDb()
return db
.get<ConversationModel>('im_conversations')
.query(
Q.where('app_id', appKey),
Q.sortBy('last_msg_time', Q.desc),
)
.fetch()
},
async markRead(appKey: string, targetId: string): Promise<void> {
const db = getDb()
const convs = await db
.get<ConversationModel>('im_conversations')
.query(Q.where('app_id', appKey), Q.where('target_id', targetId))
.fetch()
if (convs.length > 0) {
await db.write(async () => {
await convs[0].update((c: ConversationModel) => { c.unreadCount = 0 })
})
}
},
async setDraft(appKey: string, targetId: string, chatType: string, draft: string): Promise<void> {
draftStore.set(draftKey(appKey, targetId, chatType), draft)
const db = getDb()
const convs = await db
.get<ConversationModel>('im_conversations')
.query(Q.where('app_id', appKey), Q.where('target_id', targetId))
.fetch()
if (convs.length > 0) {
await db.write(async () => {
await convs[0].update((c: ConversationModel) => {
c.draft = draft
c.updatedAt = new Date()
})
})
}
},
async bulkSave(messages: ImMessage[], currentUserId: string): Promise<void> {
for (const msg of messages) {
await this.saveMessage(msg, currentUserId)
}
},
async searchMessages(appKey: string, params: MessageSearchParams): Promise<MessageModel[]> {
const db = getDb()
const conditions: any[] = [
Q.where('app_id', appKey),
]
if (params.toId) {
conditions.push(Q.where('to_id', params.toId))
}
if (params.chatType) {
conditions.push(Q.where('chat_type', params.chatType))
}
if (params.keyword) {
conditions.push(Q.where('content', Q.like(`%${Q.sanitizeLikeString(params.keyword)}%`)))
}
if (params.startTime !== undefined) {
conditions.push(Q.where('server_created_at', Q.gte(params.startTime)))
}
if (params.endTime !== undefined) {
conditions.push(Q.where('server_created_at', Q.lte(params.endTime)))
}
if (params.msgTypes && params.msgTypes.length > 0) {
conditions.push(Q.where('msg_type', Q.oneOf(params.msgTypes)))
}
let query = db
.get<MessageModel>('im_messages')
.query(...conditions)
if (params.limit !== undefined) {
query = query.extend(Q.take(params.limit) as any)
}
if (params.offset !== undefined) {
query = query.extend(Q.skip(params.offset) as any)
}
return query.fetch()
},
subscribeConversations(
appKey: string,
callback: (conversations: ConversationModel[]) => void,
): () => void {
const db = getDb()
const query = db
.get<ConversationModel>('im_conversations')
.query(
Q.where('app_id', appKey),
Q.sortBy('is_pinned', Q.desc),
Q.sortBy('last_msg_time', Q.desc),
)
const subscription = query.observe().subscribe(callback)
return () => subscription.unsubscribe()
},
async setConversationMuted(appKey: string, targetId: string, muted: boolean): Promise<void> {
const db = getDb()
const convs = await db
.get<ConversationModel>('im_conversations')
.query(Q.where('app_id', appKey), Q.where('target_id', targetId))
.fetch()
if (convs.length > 0) {
await db.write(async () => {
await convs[0].update((c: ConversationModel) => { c.isMuted = muted })
})
}
},
async setConversationPinned(appKey: string, targetId: string, pinned: boolean): Promise<void> {
const db = getDb()
const convs = await db
.get<ConversationModel>('im_conversations')
.query(Q.where('app_id', appKey), Q.where('target_id', targetId))
.fetch()
if (convs.length > 0) {
await db.write(async () => {
await convs[0].update((c: ConversationModel) => { c.isPinned = pinned })
})
}
},
async getConversationDraft(appKey: string, targetId: string, chatType: string): Promise<string | null> {
const memoryDraft = draftStore.get(draftKey(appKey, targetId, chatType))
if (memoryDraft !== undefined) {
return memoryDraft
}
const db = getDb()
const convs = await db
.get<ConversationModel>('im_conversations')
.query(Q.where('app_id', appKey), Q.where('target_id', targetId))
.fetch()
if (convs.length === 0) return null
return convs[0].draft
},
async deleteConversation(appKey: string, targetId: string, chatType: string, currentUserId: string): Promise<void> {
const db = getDb()
const convId = conversationId(appKey, currentUserId, targetId, chatType)
const convs = await db
.get<ConversationModel>('im_conversations')
.query(Q.where('app_id', appKey), Q.where('target_id', targetId))
.fetch()
const messages = await db
.get<MessageModel>('im_messages')
.query(Q.where('conversation_id', convId))
.fetch()
await db.write(async () => {
for (const message of messages) {
await message.destroyPermanently()
}
for (const conversation of convs) {
await conversation.destroyPermanently()
}
})
},
isInitialized(): boolean {
return _db !== null
},
}