|
|
@@ -1,5 +1,6 @@
|
|
|
package com.yunfeiyun.agmp.iots.core.manager;
|
|
|
|
|
|
+import cn.hutool.core.util.ArrayUtil;
|
|
|
import com.alibaba.fastjson2.JSONObject;
|
|
|
import com.yunfeiyun.agmp.common.constant.ErrorCode;
|
|
|
import com.yunfeiyun.agmp.common.utils.DateUtils;
|
|
|
@@ -17,6 +18,7 @@ import com.yunfeiyun.agmp.iots.device.common.Device;
|
|
|
import com.yunfeiyun.agmp.iots.service.IIotDeviceService;
|
|
|
import com.yunfeiyun.agmp.iots.service.checker.CmdResultCheckService;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.collections4.CollectionUtils;
|
|
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
|
|
import org.eclipse.paho.client.mqttv3.MqttException;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
@@ -157,16 +159,21 @@ public class MqttManager {
|
|
|
try {
|
|
|
// 尝试关闭MqttCore
|
|
|
log.info("【尝试关闭MQTT连接】 connectionId: {}", connectionId);
|
|
|
+ String[] topics = mqttCore.getSubTopic();
|
|
|
+ if(!ArrayUtil.isEmpty(topics)){
|
|
|
+ mqttCore.unsubscribe(topics);
|
|
|
+ }
|
|
|
mqttCore.close();
|
|
|
log.info("【成功关闭MQTT连接】 connectionId: {}", connectionId);
|
|
|
} catch (Exception e) {
|
|
|
// 记录关闭失败的日志
|
|
|
log.error("【链接关闭失败】 connectionId: {}, 异常信息: {}", connectionId, e.getMessage(), e);
|
|
|
+ throw e;
|
|
|
+ } finally {
|
|
|
+ // 从map中移除该MqttCore
|
|
|
+ serviceMqttMap.remove(connectionId);
|
|
|
+ log.info("【从映射中移除MQTT连接】 connectionId: {}", connectionId);
|
|
|
}
|
|
|
-
|
|
|
- // 从map中移除该MqttCore
|
|
|
- serviceMqttMap.remove(connectionId);
|
|
|
- log.info("【从映射中移除MQTT连接】 connectionId: {}", connectionId);
|
|
|
} else {
|
|
|
// 如果找不到对应的MqttCore,则抛出异常
|
|
|
log.warn("【尝试关闭不存在的MQTT连接】 connectionId: {}", connectionId);
|