XuqmGroup-RNSDK/packages/im/src/ImClient.ts

332 行
9.7 KiB
TypeScript

import { _getToken, getConfig, getUserId } from '@xuqm/rn-common'
import type { ImEventListener, ImMessage, SendMessageParams } from './types'
interface StompFrame {
command: string
headers: Record<string, string>
body: string
}
export class ImClient {
private ws: WebSocket | null = null
private listeners: ImEventListener[] = []
private reconnectTimer: ReturnType<typeof setTimeout> | null = null
private reconnectDelay = 3000
private shouldReconnect = true
private readonly subscriptionId = 'sub-user-queue'
private groupSubscriptions = new Set<string>()
private activeWsUrl: string | null = null
private activeToken: string | null = null
2026-05-07 19:39:41 +08:00
private activeAppKey: string | null = null
private connectedResolvers: Array<() => void> = []
constructor(
private readonly wsUrl?: string,
private readonly token?: string,
2026-05-07 19:39:41 +08:00
private readonly appKey?: string,
) {}
async connect(): Promise<void> {
this.shouldReconnect = true
this.activeWsUrl = this.wsUrl ?? getConfig().imWsUrl
this.activeToken = this.token ?? null
2026-05-07 19:39:41 +08:00
this.activeAppKey = this.appKey ?? getConfig().appKey
if (!this.activeToken) {
this.activeToken = await _getToken()
}
if (!this.activeWsUrl) {
this.listeners.forEach(listener => listener.onError?.('IM websocket URL not configured'))
return
}
if (!this.activeToken) {
this.listeners.forEach(listener => listener.onError?.('IM token not configured'))
return
}
this.openSocket()
// 等待 STOMP CONNECTED 帧
await new Promise<void>(resolve => {
this.connectedResolvers.push(resolve)
})
}
sendMessage(
toId: string,
chatType: SendMessageParams['chatType'],
msgType: SendMessageParams['msgType'],
content: string,
mentionedUserIds?: string,
): ImMessage {
return this.send({
toId,
chatType,
msgType,
content,
mentionedUserIds,
})
}
send(params: SendMessageParams): ImMessage {
2026-05-07 19:39:41 +08:00
if (!this.activeAppKey) {
throw new Error('IM appKey not configured')
}
const outgoing = this.buildOutgoingMessage(params)
if (this.ws?.readyState !== WebSocket.OPEN) {
return { ...outgoing, status: 'FAILED' }
}
this.sendFrame(
'SEND',
{
destination: '/app/chat.send',
'content-type': 'application/json',
},
JSON.stringify({
2026-05-07 19:39:41 +08:00
appKey: this.activeAppKey,
messageId: outgoing.id,
toId: params.toId,
chatType: params.chatType,
msgType: params.msgType,
content: params.content,
mentionedUserIds: params.mentionedUserIds ?? '',
}),
)
return outgoing
}
revoke(messageId: string) {
if (this.ws?.readyState !== WebSocket.OPEN) {
throw new Error('IM not connected')
}
2026-05-07 19:39:41 +08:00
if (!this.activeAppKey) {
throw new Error('IM appKey not configured')
}
this.sendFrame(
'SEND',
{
destination: '/app/chat.revoke',
'content-type': 'application/json',
},
JSON.stringify({
2026-05-07 19:39:41 +08:00
appKey: this.activeAppKey,
messageId,
}),
)
}
sync() {
if (this.ws?.readyState !== WebSocket.OPEN) return
2026-05-07 19:39:41 +08:00
if (!this.activeAppKey) return
this.sendFrame(
'SEND',
{
destination: '/app/chat.sync',
'content-type': 'application/json',
},
2026-05-07 19:39:41 +08:00
JSON.stringify({ appKey: this.activeAppKey }),
)
}
private sendSync() {
this.sync()
}
subscribeGroup(groupId: string) {
const alreadySubscribed = this.groupSubscriptions.has(groupId)
this.groupSubscriptions.add(groupId)
if (this.ws?.readyState === WebSocket.OPEN) {
if (alreadySubscribed) return
this.subscribe(`/topic/group/${groupId}`, `group-${groupId}`)
}
}
unsubscribeGroup(groupId: string) {
this.groupSubscriptions.delete(groupId)
if (this.ws?.readyState === WebSocket.OPEN) {
this.sendFrame('UNSUBSCRIBE', { id: `group-${groupId}` })
}
}
addListener(listener: ImEventListener) {
this.listeners.push(listener)
}
removeListener(listener: ImEventListener) {
this.listeners = this.listeners.filter(item => item !== listener)
}
disconnect() {
this.shouldReconnect = false
if (this.reconnectTimer) clearTimeout(this.reconnectTimer)
if (this.ws?.readyState === WebSocket.OPEN) {
this.sendFrame('DISCONNECT')
}
this.ws?.close(1000, 'User disconnect')
this.ws = null
}
isConnected(): boolean {
return this.ws?.readyState === WebSocket.OPEN
}
private openSocket() {
if (!this.activeWsUrl) return
this.ws = new WebSocket(this.activeWsUrl)
this.ws.onopen = () => {
if (!this.activeToken) {
void _getToken().then((token: string | null) => {
this.activeToken = token
if (token) {
this.sendFrame('CONNECT', {
'accept-version': '1.2',
Authorization: `Bearer ${token}`,
'heart-beat': '10000,10000',
})
}
})
return
}
this.sendFrame('CONNECT', {
'accept-version': '1.2',
Authorization: `Bearer ${this.activeToken}`,
'heart-beat': '10000,10000',
})
}
this.ws.onmessage = event => {
try {
const frames = this.parseFrames(String(event.data))
frames.forEach(frame => this.handleFrame(frame))
} catch {
this.listeners.forEach(listener => listener.onError?.('Parse error'))
}
}
this.ws.onclose = event => {
this.listeners.forEach(listener => listener.onDisconnected?.(event.reason))
if (this.shouldReconnect) {
this.reconnectTimer = setTimeout(() => {
this.reconnectDelay = Math.min(this.reconnectDelay * 2, 30000)
this.openSocket()
}, this.reconnectDelay)
}
}
this.ws.onerror = () => {
this.listeners.forEach(listener => listener.onError?.('WebSocket error'))
}
}
private handleFrame(frame: StompFrame) {
if (frame.command === 'CONNECTED') {
this.reconnectDelay = 3000
this.subscribe('/user/queue/messages', this.subscriptionId)
this.groupSubscriptions.forEach(groupId => {
this.subscribe(`/topic/group/${groupId}`, `group-${groupId}`)
})
this.sendSync()
this.connectedResolvers.forEach(resolve => resolve())
this.connectedResolvers = []
this.listeners.forEach(listener => listener.onConnected?.())
return
}
if (frame.command === 'MESSAGE') {
const message: ImMessage = this.normalizeMessage(JSON.parse(frame.body))
if (message.status === 'READ') {
this.listeners.forEach(listener => listener.onRead?.(message))
}
if (message.revoked || message.status === 'REVOKED' || message.msgType === 'REVOKED') {
this.listeners.forEach(listener =>
listener.onRevoke?.({ msgId: message.id, operatorId: message.fromId ?? message.fromUserId }),
)
}
if (message.chatType === 'GROUP') {
this.listeners.forEach(listener => listener.onGroupMessage?.(message))
} else {
this.listeners.forEach(listener => listener.onMessage?.(message))
}
if (message.msgType === 'NOTIFY') {
this.listeners.forEach(listener => listener.onSystemMessage?.(message))
}
return
}
if (frame.command === 'ERROR') {
this.listeners.forEach(listener => listener.onError?.(frame.body || 'WebSocket error'))
}
}
private subscribe(destination: string, id: string) {
this.sendFrame('SUBSCRIBE', { destination, id })
}
private sendFrame(command: string, headers: Record<string, string> = {}, body = '') {
if (!this.ws) return
const headerLines = Object.entries(headers)
.map(([key, value]) => `${key}:${value}`)
.join('\n')
const frame = `${command}\n${headerLines}\n\n${body}\u0000`
this.ws.send(frame)
}
private parseFrames(raw: string): StompFrame[] {
return raw
.split('\u0000')
.map(frame => frame.replace(/^\n+/, '').trim())
.filter(Boolean)
.map(frame => {
const separatorIndex = frame.indexOf('\n\n')
const headerBlock = separatorIndex >= 0 ? frame.slice(0, separatorIndex) : frame
const body = separatorIndex >= 0 ? frame.slice(separatorIndex + 2) : ''
const [command, ...headerLines] = headerBlock.split('\n').filter(Boolean)
const headers = Object.fromEntries(
headerLines
.filter(line => line.includes(':'))
.map(line => {
const index = line.indexOf(':')
return [line.slice(0, index), line.slice(index + 1)]
}),
)
return { command, headers, body }
})
}
private buildOutgoingMessage(params: SendMessageParams): ImMessage {
const userId = getUserId() ?? ''
const messageId = params.messageId ?? this.generateMessageId()
return {
id: messageId,
2026-05-07 19:39:41 +08:00
appKey: this.activeAppKey ?? getConfig().appKey,
fromUserId: userId,
fromId: userId,
toId: params.toId,
chatType: params.chatType,
msgType: params.msgType,
content: params.content,
status: 'SENDING',
mentionedUserIds: params.mentionedUserIds,
groupReadCount: 0,
revoked: false,
createdAt: Date.now(),
}
}
private generateMessageId(): string {
const cryptoApi = globalThis as typeof globalThis & { crypto?: { randomUUID?: () => string } }
const cryptoId = cryptoApi.crypto?.randomUUID?.()
return cryptoId ?? `msg_${Date.now()}_${Math.random().toString(16).slice(2)}`
}
private normalizeMessage(message: ImMessage): ImMessage {
return {
...message,
fromId: message.fromId ?? message.fromUserId,
revoked: message.revoked ?? message.status === 'REVOKED',
2026-05-07 19:39:41 +08:00
appKey: message.appKey ?? (this.activeAppKey ?? getConfig().appKey),
}
}
}