import type { ImMessage, SendMessageParams, ImEventMap } from '../types'
import { getConfig, getToken, getUserId } from '../core/sdk'
import { DEFAULT_IM_WS_URL } from '../core/endpoints'

/** Handler signature registered for event `K` of {@link ImEventMap}. */
type EventListener<K extends keyof ImEventMap> = ImEventMap[K]

/** A decoded STOMP frame: command line, header map and raw body text. */
interface StompFrame {
  command: string
  headers: Record<string, string>
  body: string
}

/** Delay before the first reconnect attempt, and the value restored after a successful open. */
const INITIAL_RECONNECT_DELAY = 3_000
/** Upper bound for the doubling reconnect delay. */
const MAX_RECONNECT_DELAY = 30_000
/** Destination subscribed automatically every time a STOMP session is established. */
const USER_QUEUE_DESTINATION = '/user/queue/messages'

/**
 * Serializes a STOMP frame: command line, one `key:value` line per header,
 * a blank line, the optional body, and the terminating NUL character.
 *
 * Header names and values are written as given (no escaping is applied).
 *
 * @param command STOMP command, e.g. `CONNECT`, `SEND`, `SUBSCRIBE`.
 * @param headers Header name/value pairs.
 * @param body Optional frame body.
 * @returns The serialized frame, including the trailing `\x00`.
 */
function buildStompFrame(command: string, headers: Record<string, string>, body?: string): string {
  let frame = command + '\n'
  for (const [key, value] of Object.entries(headers)) {
    frame += `${key}:${value}\n`
  }
  frame += '\n'
  if (body) frame += body
  frame += '\x00'
  return frame
}

/**
 * Parses the first NUL-terminated STOMP frame found in `data`.
 *
 * @param data Text that may contain one or more serialized frames.
 * @returns The parsed frame, or `null` when `data` has no NUL terminator yet
 *   or the frame has no blank line separating headers from the body.
 */
function parseStompFrame(data: string): StompFrame | null {
  const terminator = data.indexOf('\x00')
  if (terminator < 0) return null

  // EOLs may precede the command (STOMP allows them between frames); drop them
  // so they are not mistaken for the header/body separator.
  const frame = data.substring(0, terminator).replace(/^[\r\n]+/, '')
  const splitIndex = frame.indexOf('\n\n')
  if (splitIndex < 0) return null

  const headerPart = frame.substring(0, splitIndex)
  const body = frame.substring(splitIndex + 2)
  const lines = headerPart.split('\n').filter((l) => l.trim() !== '')
  const command = lines[0]?.trim() || ''

  const headers: Record<string, string> = {}
  for (const line of lines.slice(1)) {
    const idx = line.indexOf(':')
    // Lines without a name before the colon are ignored.
    if (idx > 0) headers[line.substring(0, idx).trim()] = line.substring(idx + 1).trim()
  }
  return { command, headers, body }
}

/**
 * Instant-messaging client speaking STOMP over a WebSocket.
 *
 * Opens the socket, performs the STOMP CONNECT handshake, subscribes to the
 * user queue, dispatches incoming frames as events, and reconnects with a
 * doubling delay (capped at {@link MAX_RECONNECT_DELAY}) until
 * {@link ImClient.disconnect} is called.
 */
export class ImClient {
  private ws: WebSocket | null = null
  private reconnectDelay = INITIAL_RECONNECT_DELAY
  private reconnectTimer: ReturnType<typeof setTimeout> | null = null
  /** Set by `disconnect()`; once true, `connect()` and reconnects are no-ops. */
  private destroyed = false
  /** True between receiving STOMP CONNECTED and the socket closing. */
  private stompConnected = false
  private listeners: { [K in keyof ImEventMap]?: Set<EventListener<K>> } = {}
  private subscriptionSeed = 0
  private subscriptions = new Map<string, string>() // destination -> id

  /**
   * Registers `handler` for `event`.
   *
   * @returns `this`, for chaining.
   */
  on<K extends keyof ImEventMap>(event: K, handler: EventListener<K>): this {
    const store = this.listeners as Partial<Record<keyof ImEventMap, Set<unknown>>>
    const handlers = store[event] ?? new Set<unknown>()
    handlers.add(handler)
    store[event] = handlers
    return this
  }

  /**
   * Removes a handler previously registered with {@link ImClient.on}.
   *
   * @returns `this`, for chaining.
   */
  off<K extends keyof ImEventMap>(event: K, handler: EventListener<K>): this {
    const handlers = this.listeners[event] as Set<EventListener<K>> | undefined
    handlers?.delete(handler)
    return this
  }

  /** Invokes every handler registered for `event` with `args`. */
  private emit<K extends keyof ImEventMap>(event: K, ...args: Parameters<ImEventMap[K]>): void {
    const handlers = this.listeners[event] as Set<(...a: unknown[]) => void> | undefined
    handlers?.forEach((h) => h(...(args as unknown[])))
  }

  /**
   * Opens the WebSocket and starts the STOMP handshake.
   *
   * Does nothing after `disconnect()` has been called, or while a socket is
   * already open or connecting.
   */
  connect(): void {
    if (this.destroyed) return
    // Avoid orphaning a live socket when connect() is called more than once.
    if (
      this.ws &&
      (this.ws.readyState === WebSocket.OPEN || this.ws.readyState === WebSocket.CONNECTING)
    ) {
      return
    }

    const config = getConfig()
    const token = getToken()
    const baseUrl = config.wsUrl || DEFAULT_IM_WS_URL
    // Encode the token and respect a query string already present on the base URL.
    const separator = baseUrl.includes('?') ? '&' : '?'
    const url = `${baseUrl}${separator}token=${encodeURIComponent(token ?? '')}`

    const ws = new WebSocket(url)
    this.ws = ws
    this.stompConnected = false
    let buffer = ''

    ws.onopen = () => {
      this.reconnectDelay = INITIAL_RECONNECT_DELAY
      if (config.debug) console.log('[ImClient] ws opened, sending STOMP CONNECT')
      const wsUrl = new URL(url)
      const headers: Record<string, string> = {
        'accept-version': '1.2',
        'heart-beat': '0,0',
        host: wsUrl.hostname + (wsUrl.port ? ':' + wsUrl.port : ''),
      }
      if (token) headers['Authorization'] = `Bearer ${token}`
      this.sendFrame(buildStompFrame('CONNECT', headers))
    }

    ws.onmessage = (event) => {
      // Only text frames are handled; binary payloads are ignored.
      if (typeof event.data !== 'string') return
      buffer += event.data

      // Always consume up to the terminator, even when the frame is malformed,
      // so one bad frame cannot block every frame that follows it.
      let end = buffer.indexOf('\x00')
      while (end >= 0) {
        const raw = buffer.substring(0, end + 1)
        buffer = buffer.substring(end + 1)
        const frame = parseStompFrame(raw)
        if (frame !== null) this.handleStompFrame(frame, config)
        end = buffer.indexOf('\x00')
      }
    }

    ws.onclose = (event) => {
      this.stompConnected = false
      this.emit('disconnected', event.code, event.reason)
      if (!this.destroyed) this.scheduleReconnect()
    }

    ws.onerror = (event) => {
      this.emit('error', event)
    }
  }

  /** Dispatches one parsed STOMP frame (CONNECTED, MESSAGE or ERROR; others are ignored). */
  private handleStompFrame(frame: StompFrame, config: ReturnType<typeof getConfig>): void {
    const cmd = frame.command.toUpperCase()
    if (config.debug) console.log(`[ImClient] STOMP ${cmd}`, frame.headers)

    switch (cmd) {
      case 'CONNECTED': {
        if (config.debug) console.log('[ImClient] STOMP connected')
        this.stompConnected = true
        // Auto subscribe to user queue
        this.sendSubscribe(USER_QUEUE_DESTINATION)
        // Resubscribe previous subscriptions (including ones requested before the session was up)
        this.subscriptions.forEach((id, dest) => {
          if (dest !== USER_QUEUE_DESTINATION) {
            this.sendFrame(buildStompFrame('SUBSCRIBE', { id, destination: dest }))
          }
        })
        this.emit('connected')
        break
      }
      case 'MESSAGE': {
        try {
          const message = this.normalizeMessage(JSON.parse(frame.body) as ImMessage)
          if (config.debug) {
            console.log('[ImClient] MESSAGE', message.id, message.msgType, message.status)
          }
          if (message.status === 'READ') {
            this.emit('read', message)
          }
          if (message.revoked || message.status === 'REVOKED' || message.msgType === 'REVOKED') {
            this.emit('revoke', {
              msgId: message.id,
              operatorId: message.fromId ?? message.fromUserId,
            })
            // Don't return early — also emit as message for consistency with old behavior
          }
          this.emit('message', message)
        } catch (err: unknown) {
          // Best effort: a malformed frame must not break the receive loop.
          if (config.debug) console.warn('[ImClient] dropped MESSAGE frame', err)
        }
        break
      }
      case 'ERROR': {
        const reason = frame.body || frame.headers['message'] || 'STOMP error'
        if (config.debug) console.error('[ImClient] STOMP ERROR', reason)
        this.emit('error', new Event(reason))
        break
      }
    }
  }

  /**
   * Sends a chat message to `/app/chat.send`.
   *
   * @param params Message parameters; `messageId` is generated when omitted.
   * @returns The local message record: status `SENDING` when the frame was
   *   written to the socket, `FAILED` when the socket is not open.
   */
  send(params: SendMessageParams): ImMessage {
    const config = getConfig()
    const userId = getUserId() ?? ''
    const messageId = params.messageId ?? this.generateMessageId()

    const outgoing: ImMessage = {
      id: messageId,
      appId: config.appKey,
      fromUserId: userId,
      fromId: userId,
      toId: params.toId,
      chatType: params.chatType,
      msgType: params.msgType,
      content: params.content,
      status: 'SENDING',
      mentionedUserIds: params.mentionedUserIds,
      revoked: false,
      createdAt: new Date().toISOString(),
    }

    if (this.ws?.readyState !== WebSocket.OPEN) {
      return { ...outgoing, status: 'FAILED' }
    }

    const payload: Record<string, unknown> = {
      appId: config.appKey,
      messageId,
      toId: params.toId,
      chatType: params.chatType,
      msgType: params.msgType,
      content: params.content,
    }
    if (params.mentionedUserIds) {
      payload.mentionedUserIds = params.mentionedUserIds
    }

    this.ws.send(
      buildStompFrame(
        'SEND',
        { destination: '/app/chat.send', 'content-type': 'application/json' },
        JSON.stringify(payload)
      )
    )
    return outgoing
  }

  /**
   * Requests revocation of a message via `/app/chat.revoke`.
   *
   * @param msgId Id of the message to revoke.
   * @throws Error when the WebSocket is not open.
   */
  revoke(msgId: string): void {
    if (this.ws?.readyState !== WebSocket.OPEN) {
      throw new Error('WebSocket not connected')
    }
    const config = getConfig()
    this.ws.send(
      buildStompFrame(
        'SEND',
        { destination: '/app/chat.revoke', 'content-type': 'application/json' },
        JSON.stringify({ appId: config.appKey, messageId: msgId })
      )
    )
  }

  /**
   * Subscribes to `destination` unless it is already registered.
   *
   * When no STOMP session is established yet, the subscription is only
   * recorded and is sent once CONNECTED is received.
   */
  subscribe(destination: string): void {
    if (this.subscriptions.has(destination)) return
    this.sendSubscribe(destination)
  }

  /** Records the subscription and sends SUBSCRIBE if the STOMP session is up. */
  private sendSubscribe(destination: string, id?: string): void {
    const sid = id ?? this.nextSubscriptionId()
    this.subscriptions.set(destination, sid)
    // Before CONNECTED, socket.send may throw and the frame would be duplicated
    // by the resubscribe pass, so defer to the CONNECTED handler.
    if (this.stompConnected) {
      this.sendFrame(buildStompFrame('SUBSCRIBE', { id: sid, destination }))
    }
  }

  /** Writes a serialized frame to the socket if, and only if, it is open. */
  private sendFrame(frame: string): void {
    if (this.ws?.readyState === WebSocket.OPEN) {
      this.ws.send(frame)
    }
  }

  /**
   * Permanently shuts the client down: cancels any pending reconnect, sends
   * STOMP DISCONNECT when the socket is open, closes the socket and forgets
   * all subscriptions. `connect()` is a no-op afterwards.
   */
  disconnect(): void {
    this.destroyed = true
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer)
      this.reconnectTimer = null
    }
    this.sendFrame(buildStompFrame('DISCONNECT', {}))
    this.ws?.close()
    this.ws = null
    this.stompConnected = false
    this.subscriptions.clear()
  }

  /** Arms a timer that reconnects after the current delay, then doubles the delay (capped). */
  private scheduleReconnect(): void {
    if (this.destroyed) return
    if (getConfig().debug) {
      console.log(`[ImClient] reconnect in ${this.reconnectDelay}ms`)
    }
    // Never keep two reconnect timers alive at once.
    if (this.reconnectTimer) clearTimeout(this.reconnectTimer)
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null
      this.connect()
      this.reconnectDelay = Math.min(this.reconnectDelay * 2, MAX_RECONNECT_DELAY)
    }, this.reconnectDelay)
  }

  /** Returns a UUID from `crypto.randomUUID` when available, else a time/random based id. */
  private generateMessageId(): string {
    const cryptoId = globalThis.crypto?.randomUUID?.()
    if (cryptoId) return cryptoId
    return `msg_${Date.now()}_${Math.random().toString(16).slice(2)}`
  }

  /** Returns the next `sub-N` subscription id. */
  private nextSubscriptionId(): string {
    this.subscriptionSeed += 1
    return `sub-${this.subscriptionSeed}`
  }

  /** Fills `fromId` from `fromUserId` and `revoked` from `status` when they are absent. */
  private normalizeMessage(message: ImMessage): ImMessage {
    return {
      ...message,
      fromId: message.fromId ?? message.fromUserId,
      revoked: message.revoked ?? (message.status === 'REVOKED'),
    }
  }
}