XuqmGroup-AndroidSDK/sdk-im/src/main/java/com/xuqm/sdk/im/ImClient.kt
2026-05-07 19:39:38 +08:00

327 行
11 KiB
Kotlin

package com.xuqm.sdk.im
import com.google.gson.Gson
import com.xuqm.sdk.im.listener.ImEventListener
import com.xuqm.sdk.im.model.ImMessage
import android.util.Log
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.Response
import okhttp3.WebSocket
import okhttp3.WebSocketListener
import java.net.URI
import java.util.concurrent.CopyOnWriteArraySet
import java.util.concurrent.TimeUnit
class ImClient(
private val wsUrl: String,
private val token: String,
private val appKey: String,
) {
companion object {
private const val TAG = "XuqmImClient"
}
private var webSocket: WebSocket? = null
private val listeners = CopyOnWriteArraySet<ImEventListener>()
private val gson = Gson()
private val subscriptions = mutableMapOf<String, String>()
private var subscriptionSeed = 0
private var connected = false
private var inboundBuffer = StringBuilder()
private val okhttp = OkHttpClient.Builder()
.connectTimeout(10, TimeUnit.SECONDS)
.readTimeout(0, TimeUnit.SECONDS)
.build()
fun connect() {
Log.d(TAG, "connect() wsUrl=$wsUrl appKey=$appKey")
disconnect(closeSocket = false)
val request = Request.Builder()
.url(wsUrl)
.build()
webSocket = okhttp.newWebSocket(request, object : WebSocketListener() {
override fun onOpen(webSocket: WebSocket, response: Response) {
Log.d(TAG, "websocket onOpen code=${response.code}")
sendConnectFrame()
}
override fun onMessage(webSocket: WebSocket, text: String) {
Log.d(TAG, "websocket raw frame received length=${text.length}")
handleIncoming(text)
}
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
connected = false
Log.e(TAG, "websocket onFailure connected=false reason=${t.message}", t)
listeners.forEach { it.onDisconnected(t.message) }
}
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
connected = false
Log.d(TAG, "websocket onClosed code=$code reason=$reason")
listeners.forEach { it.onDisconnected(reason) }
}
})
}
fun subscribe(destination: String) {
val subscriptionId = synchronized(subscriptions) {
if (subscriptions.containsKey(destination)) return
val id = nextSubscriptionId()
subscriptions[destination] = id
id
}
if (connected) {
sendSubscribe(destination, subscriptionId)
}
}
fun unsubscribe(destination: String) {
val subscriptionId = synchronized(subscriptions) { subscriptions.remove(destination) } ?: return
if (connected) {
sendFrame("UNSUBSCRIBE", mapOf("id" to subscriptionId), null)
}
}
fun sendMessage(
messageId: String,
toId: String,
chatType: String,
msgType: String,
content: String,
mentionedUserIds: String? = null,
): Boolean {
Log.d(TAG, "sendMessage messageId=$messageId toId=$toId chatType=$chatType msgType=$msgType contentLength=${content.length} mentioned=${mentionedUserIds.orEmpty()}")
val payload = linkedMapOf(
"appKey" to appKey,
"messageId" to messageId,
"toId" to toId,
"chatType" to chatType,
"msgType" to msgType,
"content" to content,
)
if (!mentionedUserIds.isNullOrBlank()) {
payload["mentionedUserIds"] = mentionedUserIds
}
return sendFrame(
"SEND",
mapOf(
"destination" to "/app/chat.send",
"content-type" to "application/json",
),
gson.toJson(payload),
)
}
fun revokeMessage(messageId: String) {
sendFrame(
"SEND",
mapOf(
"destination" to "/app/chat.revoke",
"content-type" to "application/json",
),
gson.toJson(
mapOf(
"appKey" to appKey,
"messageId" to messageId,
)
),
)
}
private fun sendSync() {
sendFrame(
"SEND",
mapOf(
"destination" to "/app/chat.sync",
"content-type" to "application/json",
),
gson.toJson(mapOf("appKey" to appKey)),
)
}
fun addListener(listener: ImEventListener) = listeners.add(listener)
fun removeListener(listener: ImEventListener) = listeners.remove(listener)
fun disconnect() {
disconnect(closeSocket = true)
}
private fun sendConnectFrame() {
connected = false
Log.d(TAG, "send CONNECT frame")
sendFrame(
"CONNECT",
mapOf(
"accept-version" to "1.2",
"heart-beat" to "0,0",
"host" to URI.create(wsUrl).host.orEmpty(),
"Authorization" to "Bearer $token",
),
null,
)
}
private fun handleIncoming(chunk: String) {
if (chunk.isBlank()) return
inboundBuffer.append(chunk)
while (true) {
val terminator = inboundBuffer.indexOf("\u0000")
if (terminator < 0) return
val frame = inboundBuffer.substring(0, terminator)
inboundBuffer = StringBuilder(inboundBuffer.substring(terminator + 1))
if (frame.isNotBlank()) {
Log.d(TAG, "stomp frame completed size=${frame.length}")
handleFrame(frame)
}
}
}
private fun handleFrame(frame: String) {
val parts = frame.split("\n\n", limit = 2)
val headerLines = parts.firstOrNull().orEmpty().split("\n").filter { it.isNotBlank() }
val command = headerLines.firstOrNull()?.trim().orEmpty()
val headers = parseHeaders(headerLines.drop(1))
val body = parts.getOrNull(1).orEmpty()
when (command.uppercase()) {
"CONNECTED" -> {
connected = true
Log.d(TAG, "stomp CONNECTED subscriptionCount=${subscriptions.size}")
listeners.forEach { it.onConnected() }
sendSubscribe("/user/queue/messages", nextSubscriptionId(prefix = "user"))
val pendingSubscriptions = synchronized(subscriptions) { subscriptions.toMap() }
pendingSubscriptions.forEach { (destination, id) ->
if (destination != "/user/queue/messages") {
sendSubscribe(destination, id)
}
}
sendSync()
}
"MESSAGE" -> {
runCatching {
val msg = gson.fromJson(body, ImMessage::class.java)
Log.d(
TAG,
buildString {
append("stomp MESSAGE destination=").append(headers["destination"].orEmpty())
append(" id=").append(msg.id)
append(" chatType=").append(msg.chatType)
append(" msgType=").append(msg.msgType)
append(" from=").append(msg.fromId)
append(" to=").append(msg.toId)
append(" status=").append(msg.status)
},
)
if (msg.status.uppercase() == "READ") {
listeners.forEach { it.onRead(msg) }
}
if (msg.status.uppercase() == "REVOKED" || msg.msgType.uppercase() == "REVOKED") {
listeners.forEach { it.onRevoke(msg) }
return
}
if (msg.chatType.uppercase() == "GROUP") {
listeners.forEach { it.onGroupMessage(msg) }
} else {
listeners.forEach { it.onMessage(msg) }
}
}.onFailure { e ->
Log.e(TAG, "failed to parse MESSAGE frame body length=${body.length}", e)
listeners.forEach { it.onError("Parse error: ${e.message}") }
}
}
"ERROR" -> {
val reason = body.ifBlank { headers["message"].orEmpty() }
Log.e(TAG, "stomp ERROR reason=$reason")
listeners.forEach { it.onError(reason.ifBlank { "STOMP error" }) }
}
}
}
private fun sendSubscribe(destination: String, subscriptionId: String) {
sendFrame(
"SUBSCRIBE",
mapOf(
"id" to subscriptionId,
"destination" to destination,
),
null,
)
}
private fun sendFrame(command: String, headers: Map<String, String>, body: String?): Boolean {
val socket = webSocket ?: return false
val frame = buildString {
append(command).append('\n')
headers.forEach { (key, value) ->
append(escapeHeader(key)).append(':').append(escapeHeader(value)).append('\n')
}
append('\n')
if (body != null) {
append(body)
}
append('\u0000')
}
return socket.send(frame)
}
private fun parseHeaders(lines: List<String>): Map<String, String> {
val headers = linkedMapOf<String, String>()
lines.forEach { line ->
val index = line.indexOf(':')
if (index <= 0) return@forEach
val key = unescapeHeader(line.substring(0, index))
val value = unescapeHeader(line.substring(index + 1))
headers[key] = value
}
return headers
}
private fun nextSubscriptionId(prefix: String = "sub"): String {
subscriptionSeed += 1
return "$prefix-$subscriptionSeed"
}
private fun disconnect(closeSocket: Boolean) {
connected = false
synchronized(subscriptions) { subscriptions.clear() }
inboundBuffer = StringBuilder()
if (closeSocket) {
webSocket?.close(1000, "User disconnect")
}
webSocket = null
}
private fun escapeHeader(value: String): String =
value.replace("\\", "\\\\")
.replace("\r", "\\r")
.replace("\n", "\\n")
.replace(":", "\\c")
private fun unescapeHeader(value: String): String {
val builder = StringBuilder()
var index = 0
while (index < value.length) {
val ch = value[index]
if (ch == '\\' && index + 1 < value.length) {
when (value[index + 1]) {
'r' -> builder.append('\r')
'n' -> builder.append('\n')
'c' -> builder.append(':')
'\\' -> builder.append('\\')
else -> {
builder.append(value[index + 1])
}
}
index += 2
} else {
builder.append(ch)
index += 1
}
}
return builder.toString()
}
}