import { _getToken, getConfig } from '@xuqm/rn-common' import type { ImEventListener, ImMessage, SendMessageParams } from './types' interface StompFrame { command: string headers: Record body: string } export class ImClient { private ws: WebSocket | null = null private listeners: ImEventListener[] = [] private reconnectTimer: ReturnType | null = null private reconnectDelay = 3000 private shouldReconnect = true private readonly subscriptionId = 'sub-user-queue' private groupSubscriptions = new Set() private activeWsUrl: string | null = null private activeToken: string | null = null private activeAppId: string | null = null constructor( private readonly wsUrl?: string, private readonly token?: string, private readonly appId?: string, ) {} async connect() { this.shouldReconnect = true this.activeWsUrl = this.wsUrl ?? getConfig().imWsUrl this.activeToken = this.token ?? null this.activeAppId = this.appId ?? getConfig().appId if (!this.activeToken) { this.activeToken = await _getToken() } if (!this.activeWsUrl) { this.listeners.forEach(listener => listener.onError?.('IM websocket URL not configured')) return } if (!this.activeToken) { this.listeners.forEach(listener => listener.onError?.('IM token not configured')) return } this.openSocket() } sendMessage( toId: string, chatType: SendMessageParams['chatType'], msgType: SendMessageParams['msgType'], content: string, mentionedUserIds?: string, ) { this.send({ toId, chatType, msgType, content, mentionedUserIds, }) } send(params: SendMessageParams) { if (this.ws?.readyState !== WebSocket.OPEN) { throw new Error('IM not connected') } if (!this.activeAppId) { throw new Error('IM appId not configured') } this.sendFrame( 'SEND', { destination: '/app/chat.send', 'content-type': 'application/json', }, JSON.stringify({ appId: this.activeAppId, toId: params.toId, chatType: params.chatType, msgType: params.msgType, content: params.content, mentionedUserIds: params.mentionedUserIds ?? '', }), ) } revoke(messageId: string) { if (this.ws?.readyState !== WebSocket.OPEN) { throw new Error('IM not connected') } if (!this.activeAppId) { throw new Error('IM appId not configured') } this.sendFrame( 'SEND', { destination: '/app/chat.revoke', 'content-type': 'application/json', }, JSON.stringify({ appId: this.activeAppId, messageId, }), ) } subscribeGroup(groupId: string) { const alreadySubscribed = this.groupSubscriptions.has(groupId) this.groupSubscriptions.add(groupId) if (this.ws?.readyState === WebSocket.OPEN) { if (alreadySubscribed) return this.subscribe(`/topic/group/${groupId}`, `group-${groupId}`) } } unsubscribeGroup(groupId: string) { this.groupSubscriptions.delete(groupId) if (this.ws?.readyState === WebSocket.OPEN) { this.sendFrame('UNSUBSCRIBE', { id: `group-${groupId}` }) } } addListener(listener: ImEventListener) { this.listeners.push(listener) } removeListener(listener: ImEventListener) { this.listeners = this.listeners.filter(item => item !== listener) } disconnect() { this.shouldReconnect = false if (this.reconnectTimer) clearTimeout(this.reconnectTimer) if (this.ws?.readyState === WebSocket.OPEN) { this.sendFrame('DISCONNECT') } this.ws?.close(1000, 'User disconnect') this.ws = null } isConnected(): boolean { return this.ws?.readyState === WebSocket.OPEN } private openSocket() { if (!this.activeWsUrl) return this.ws = new WebSocket(this.activeWsUrl) this.ws.onopen = () => { if (!this.activeToken) { void _getToken().then(token => { this.activeToken = token if (token) { this.sendFrame('CONNECT', { 'accept-version': '1.2', Authorization: `Bearer ${token}`, 'heart-beat': '10000,10000', }) } }) return } this.sendFrame('CONNECT', { 'accept-version': '1.2', Authorization: `Bearer ${this.activeToken}`, 'heart-beat': '10000,10000', }) } this.ws.onmessage = event => { try { const frames = this.parseFrames(String(event.data)) frames.forEach(frame => this.handleFrame(frame)) } catch { this.listeners.forEach(listener => listener.onError?.('Parse error')) } } this.ws.onclose = event => { this.listeners.forEach(listener => listener.onDisconnected?.(event.reason)) if (this.shouldReconnect) { this.reconnectTimer = setTimeout(() => { this.reconnectDelay = Math.min(this.reconnectDelay * 2, 30000) this.openSocket() }, this.reconnectDelay) } } this.ws.onerror = () => { this.listeners.forEach(listener => listener.onError?.('WebSocket error')) } } private handleFrame(frame: StompFrame) { if (frame.command === 'CONNECTED') { this.reconnectDelay = 3000 this.subscribe('/user/queue/messages', this.subscriptionId) this.groupSubscriptions.forEach(groupId => { this.subscribe(`/topic/group/${groupId}`, `group-${groupId}`) }) this.listeners.forEach(listener => listener.onConnected?.()) return } if (frame.command === 'MESSAGE') { const message: ImMessage = JSON.parse(frame.body) if (message.chatType === 'GROUP') { this.listeners.forEach(listener => listener.onGroupMessage?.(message)) return } this.listeners.forEach(listener => listener.onMessage?.(message)) return } if (frame.command === 'ERROR') { this.listeners.forEach(listener => listener.onError?.(frame.body || 'WebSocket error')) } } private subscribe(destination: string, id: string) { this.sendFrame('SUBSCRIBE', { destination, id }) } private sendFrame(command: string, headers: Record = {}, body = '') { if (!this.ws) return const headerLines = Object.entries(headers) .map(([key, value]) => `${key}:${value}`) .join('\n') const frame = `${command}\n${headerLines}\n\n${body}\u0000` this.ws.send(frame) } private parseFrames(raw: string): StompFrame[] { return raw .split('\u0000') .map(frame => frame.replace(/^\n+/, '').trim()) .filter(Boolean) .map(frame => { const separatorIndex = frame.indexOf('\n\n') const headerBlock = separatorIndex >= 0 ? frame.slice(0, separatorIndex) : frame const body = separatorIndex >= 0 ? frame.slice(separatorIndex + 2) : '' const [command, ...headerLines] = headerBlock.split('\n').filter(Boolean) const headers = Object.fromEntries( headerLines .filter(line => line.includes(':')) .map(line => { const index = line.indexOf(':') return [line.slice(0, index), line.slice(index + 1)] }), ) return { command, headers, body } }) } }