|
@@ -0,0 +1,153 @@
|
|
|
+package cn.org.bjca.trust.java.imserver.im;
|
|
|
+
|
|
|
+
|
|
|
+import cn.org.bjca.trust.java.imserver.common.SpringUtilsAuTo;
|
|
|
+import cn.org.bjca.trust.java.imserver.common.json.GsonImplHelp;
|
|
|
+import cn.org.bjca.trust.java.imserver.enums.PacketType;
|
|
|
+import cn.org.bjca.trust.java.imserver.im.bean.PacketMessage;
|
|
|
+import cn.org.bjca.trust.java.imserver.im.msg.ConnectMessage;
|
|
|
+import cn.org.bjca.trust.java.imserver.im.msg.ConnectedMessage;
|
|
|
+import cn.org.bjca.trust.java.imserver.im.msg.msg.SZYXMessage;
|
|
|
+import cn.org.bjca.trust.java.imserver.im.msg.msg.SendAckMessage;
|
|
|
+import cn.org.bjca.trust.java.imserver.repository.UserRepository;
|
|
|
+import org.eclipse.paho.client.mqttv3.*;
|
|
|
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
|
|
|
+
|
|
|
+import java.nio.charset.StandardCharsets;
|
|
|
+
|
|
|
+public class ImManager {
|
|
|
+
|
|
|
+ private final UserRepository userRepository = SpringUtilsAuTo.getBean(UserRepository.class);
|
|
|
+
|
|
|
+ public static ImManager getInstance() {
|
|
|
+ return ImManagerHolder.instance;
|
|
|
+ }
|
|
|
+
|
|
|
+ private static final class ImManagerHolder {
|
|
|
+ static final ImManager instance = new ImManager();
|
|
|
+ }
|
|
|
+
|
|
|
+ private MqttClient mqttClient;
|
|
|
+ private final MqttConnectOptions connectOptions;
|
|
|
+
|
|
|
+ public ImManager() {
|
|
|
+ connectOptions = new MqttConnectOptions();
|
|
|
+ connectOptions.setCleanSession(false);
|
|
|
+ connectOptions.setUserName("server");
|
|
|
+ connectOptions.setPassword("server".toCharArray());
|
|
|
+ connectOptions.setConnectionTimeout(30);
|
|
|
+ connectOptions.setKeepAliveInterval(10);
|
|
|
+ connectOptions.setAutomaticReconnect(true);
|
|
|
+
|
|
|
+
|
|
|
+ try {
|
|
|
+ mqttClient = new MqttClient("tcp://114.115.203.60:18883",
|
|
|
+ "server" + System.currentTimeMillis(), new MemoryPersistence());
|
|
|
+ mqttClient.setCallback(new MqttCallbackExtended() {
|
|
|
+ @Override
|
|
|
+ public void connectComplete(boolean reconnect, String serverURI) {
|
|
|
+ System.out.println("======>connectComplete:reconnect?" + reconnect + "::" + serverURI);
|
|
|
+ if (reconnect) return;
|
|
|
+ try {
|
|
|
+ mqttClient.subscribe("server" + PacketType.CONNECT, 2);
|
|
|
+ mqttClient.subscribe("server" + PacketType.SEND, 2);
|
|
|
+ mqttClient.subscribe("server" + PacketType.PING, 2);
|
|
|
+ mqttClient.subscribe("server" + PacketType.REQUEST, 2);
|
|
|
+ } catch (MqttException ignored) {
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void connectionLost(Throwable cause) {
|
|
|
+ System.out.println("======>connectionLost" + cause.getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void messageArrived(String topic, MqttMessage message) throws Exception {
|
|
|
+ String msg = new String(message.getPayload());
|
|
|
+ if (topic.equals("server" + PacketType.CONNECT)) {
|
|
|
+ sendConnectedMsg(GsonImplHelp.get().toObject(msg, ConnectMessage.class));
|
|
|
+ } else if (topic.equals("server" + PacketType.SEND)) {
|
|
|
+ SZYXMessage szyxMessage = GsonImplHelp.get().toObject(msg, SZYXMessage.class);
|
|
|
+
|
|
|
+ if (!szyxMessage.isGroup()) {
|
|
|
+// List<UserInfo> userInfoList = userRepository.findAllByUserIdAndAppId(szyxMessage.getToClientId(), szyxMessage.getAppId());
|
|
|
+
|
|
|
+// for (UserInfo userInfo : userInfoList) {
|
|
|
+ SZYXMessage message1 = szyxMessage.copy();
|
|
|
+ message1.setUserName(szyxMessage.getToClientId());
|
|
|
+ message1.setArrive(true);
|
|
|
+ message1.setStatus(0);
|
|
|
+// sendPacketMessage(szyxMessage.getPacketType(), message1);
|
|
|
+// }
|
|
|
+ sendPacketMessage(szyxMessage.getPacketType(), message1);
|
|
|
+ }
|
|
|
+ // TODO: 这里应该有计时器,判断转发消息是否送达,如果没送达则ack返回失败状态
|
|
|
+ sendAckMsg(szyxMessage);
|
|
|
+
|
|
|
+ } else
|
|
|
+ System.out.println("======>messageArrived:\n" + topic + "::" + msg);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void deliveryComplete(IMqttDeliveryToken token) {
|
|
|
+ }
|
|
|
+
|
|
|
+ });
|
|
|
+ } catch (MqttException e) {
|
|
|
+ System.out.println("======>MqttException" + e.getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ public void connect() {
|
|
|
+ new Thread(() -> {
|
|
|
+ try {
|
|
|
+ if (!mqttClient.isConnected()) {
|
|
|
+ mqttClient.connect(connectOptions);
|
|
|
+ }
|
|
|
+ } catch (MqttException e) {
|
|
|
+ System.out.println("======>connect" + e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ ).start();
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ private void sendConnectedMsg(ConnectMessage connectMessage) {
|
|
|
+ ConnectedMessage message = new ConnectedMessage();
|
|
|
+ message.setStatus(200);
|
|
|
+ message.setAppId(connectMessage.getAppId());
|
|
|
+ message.setUserName(connectMessage.getUserName());
|
|
|
+ sendPacketMessage(PacketType.CONNECTED, message);
|
|
|
+ System.out.println("======>登录成功\n" + GsonImplHelp.get().toJson(message));
|
|
|
+ }
|
|
|
+
|
|
|
+ private void sendAckMsg(SZYXMessage szyxMessage) {
|
|
|
+ SendAckMessage message = new SendAckMessage();
|
|
|
+ message.setAckId(szyxMessage.getMsgId());
|
|
|
+ message.setAckStatus(0);
|
|
|
+ message.setAppId(szyxMessage.getAppId());
|
|
|
+ message.setUserName(szyxMessage.getUserName());
|
|
|
+ sendPacketMessage(PacketType.SEND_ACK, message);
|
|
|
+ }
|
|
|
+
|
|
|
+ private void sendPacketMessage(PacketType packetType, PacketMessage message) {
|
|
|
+ if (null == connectOptions) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ String msg = GsonImplHelp.get().toJson(message);
|
|
|
+ try {
|
|
|
+ String topic = message.getAppId() + "/" + packetType + message.getUserName();
|
|
|
+ mqttClient.publish(topic, msg.getBytes(StandardCharsets.UTF_8), 2, false);
|
|
|
+
|
|
|
+ System.out.println("======>发送消息:\n" + topic + "\n" + msg);
|
|
|
+ } catch (MqttException ignored) {
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+}
|