|
|
@@ -5,17 +5,15 @@ import com.yunfeiyun.agmp.common.constant.ErrorCode;
|
|
|
import com.yunfeiyun.agmp.common.utils.DateUtils;
|
|
|
import com.yunfeiyun.agmp.iot.common.constant.IotErrorCode;
|
|
|
import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
|
|
|
-import com.yunfeiyun.agmp.iot.common.domain.IotFirmdev;
|
|
|
-import com.yunfeiyun.agmp.iot.common.domain.resvo.IotFirmdevResVo;
|
|
|
import com.yunfeiyun.agmp.iot.common.enums.DevType;
|
|
|
import com.yunfeiyun.agmp.iot.common.exception.IotBizException;
|
|
|
+import com.yunfeiyun.agmp.iots.common.modal.TosDevicetypeResVo;
|
|
|
import com.yunfeiyun.agmp.iots.core.mqtt.DeviceTopicService;
|
|
|
import com.yunfeiyun.agmp.iots.core.mqtt.network.MqttConfig;
|
|
|
import com.yunfeiyun.agmp.iots.core.mqtt.network.MqttCore;
|
|
|
import com.yunfeiyun.agmp.iots.core.mqtt.network.MqttPublisher;
|
|
|
import com.yunfeiyun.agmp.iots.device.common.Device;
|
|
|
import com.yunfeiyun.agmp.iots.device.service.IIotDeviceService;
|
|
|
-import com.yunfeiyun.agmp.iots.device.service.IIotFirmdevService;
|
|
|
import com.yunfeiyun.agmp.iots.service.CmdResultCheckService;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.eclipse.paho.client.mqttv3.MqttClient;
|
|
|
@@ -44,8 +42,6 @@ public class MqttManager {
|
|
|
@Autowired
|
|
|
IIotDeviceService iIotDeviceService;
|
|
|
|
|
|
- @Autowired
|
|
|
- IIotFirmdevService iotFirmdevService;
|
|
|
|
|
|
/**
|
|
|
* 实现类名称-->mqtt
|
|
|
@@ -62,12 +58,11 @@ public class MqttManager {
|
|
|
private CmdResultCheckService cmdResultCheckService;
|
|
|
|
|
|
|
|
|
-
|
|
|
/**
|
|
|
* 强制对于已经废弃/逻辑需要重新梳理抛出异常
|
|
|
*/
|
|
|
- void throwDeprecatedMethod(String msg){
|
|
|
- throw new IotBizException(ErrorCode.FAILURE.getCode(),msg == null ? "方法已经废弃/逻辑需要重新梳理,重新更换" : msg);
|
|
|
+ void throwDeprecatedMethod(String msg) {
|
|
|
+ throw new IotBizException(ErrorCode.FAILURE.getCode(), msg == null ? "方法已经废弃/逻辑需要重新梳理,重新更换" : msg);
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -77,19 +72,25 @@ public class MqttManager {
|
|
|
* 【链接管理-创建】
|
|
|
* 根据配置构建 MqttCoreConnection
|
|
|
* 创建连接,底层统一调这个
|
|
|
- * @param iotFirmdevResVo
|
|
|
+ *
|
|
|
+ * @param tosDeviceType
|
|
|
* @param jsonConfig
|
|
|
* @throws MqttException
|
|
|
*/
|
|
|
- public void buildMqttConnection(IotFirmdevResVo iotFirmdevResVo, JSONObject jsonConfig) throws MqttException {
|
|
|
- log.info("【开始构建MQTT连接】 iotFirmdevResVo: {}, jsonConfig: {}", iotFirmdevResVo, jsonConfig);
|
|
|
+ public void buildMqttConnection(TosDevicetypeResVo tosDeviceType, JSONObject jsonConfig) throws MqttException {
|
|
|
+ log.info("【开始构建MQTT连接】 tosDeviceTypeName: {}, tosDeviceTypeId:{}, jsonConfig: {}", tosDeviceType.getDevclassName(), tosDeviceType.getDevtypeBid(), jsonConfig);
|
|
|
|
|
|
// 构建配置
|
|
|
- MqttConfig cfgYf = buildMqttConfig(iotFirmdevResVo, jsonConfig);
|
|
|
+ MqttConfig cfgYf = buildMqttConfig(tosDeviceType, jsonConfig);
|
|
|
+ // 厂家id
|
|
|
String firmBizId = cfgYf.getFirmBizId();
|
|
|
+ //服务类Bean名称
|
|
|
String serviceName = cfgYf.getServiceName();
|
|
|
+ // 之前干嘛的待定,
|
|
|
String type = cfgYf.getType();
|
|
|
+ //厂家名称
|
|
|
String firmName = cfgYf.getFirmName();
|
|
|
+ //设备类型id(原来的类型小类id)
|
|
|
String deviceTypeId = cfgYf.getDeviceTypeBizId();
|
|
|
log.info("【初始化厂商配置】 {} {} 开始", firmName, type);
|
|
|
|
|
|
@@ -97,20 +98,16 @@ public class MqttManager {
|
|
|
MqttCore mqttCore = new MqttCore();
|
|
|
log.info("【创建MqttCore实例】 mqttCore: {}", mqttCore);
|
|
|
|
|
|
- // 查询topics
|
|
|
- String[] topics = deviceTopicService.getBatchTopic(serviceName, deviceTopicService.getDeviceIdByFirmBizId(firmBizId, deviceTypeId));
|
|
|
+ // 查询topics【需要实现:重新更改获取该型号下的设备】
|
|
|
+ //String[] topics = deviceTopicService.getBatchTopic(serviceName, deviceTopicService.getDeviceIdByFirmBizId(firmBizId, deviceTypeId));
|
|
|
+ String[] topics = deviceTopicService.getBatchTopic(serviceName, new String[]{"", "", ""});
|
|
|
cfgYf.setSubTopic(topics);
|
|
|
cfgYf.setServiceName(serviceName);
|
|
|
log.info("【初始化厂商加载配置】 {} {} {}", firmName, type, Arrays.toString(topics));
|
|
|
|
|
|
// 处理连接ID的逻辑
|
|
|
- if ("私有".equals("查询字段")) {
|
|
|
- log.info("【添加私有连接】 connectionId: connectionId");
|
|
|
- addPrivateConnectionMap("connectionId", mqttCore);
|
|
|
- } else {
|
|
|
- log.info("【添加公共连接】 connectionId: connectionId");
|
|
|
- addCommonConnectionMap("connectionId", mqttCore);
|
|
|
- }
|
|
|
+ log.info("【添加公共连接】 connectionId: connectionId");
|
|
|
+ addConnectionMap("connectionId", mqttCore);
|
|
|
|
|
|
// 构建MqttCore
|
|
|
try {
|
|
|
@@ -129,6 +126,7 @@ public class MqttManager {
|
|
|
/**
|
|
|
* 【链接管理-删除】
|
|
|
* 删除mqtt链接,底层统一调这个
|
|
|
+ *
|
|
|
* @param connectionId
|
|
|
* @throws MqttException
|
|
|
*/
|
|
|
@@ -164,13 +162,13 @@ public class MqttManager {
|
|
|
/**
|
|
|
* 将json配置文件解析,构建配置
|
|
|
*
|
|
|
- * @param iotFirmdevResVo
|
|
|
+ * @param tosDeviceType
|
|
|
* @param jsonConfig
|
|
|
* @return
|
|
|
*/
|
|
|
- MqttConfig buildMqttConfig(IotFirmdevResVo iotFirmdevResVo, JSONObject jsonConfig) {
|
|
|
+ MqttConfig buildMqttConfig(TosDevicetypeResVo tosDeviceType, JSONObject jsonConfig) {
|
|
|
MqttConfig cfgYf = new MqttConfig();
|
|
|
- String firmBizId = iotFirmdevResVo.getFirmBid();
|
|
|
+ String firmBizId = tosDeviceType.getFirmBid();
|
|
|
String serviceName = jsonConfig.getString("service");
|
|
|
String type = jsonConfig.getString("type");
|
|
|
|
|
|
@@ -178,9 +176,9 @@ public class MqttManager {
|
|
|
cfgYf.setFirmBizId(firmBizId);
|
|
|
cfgYf.setType(type);
|
|
|
//其他信息
|
|
|
- cfgYf.setFirmName(iotFirmdevResVo.getFirmName());
|
|
|
- cfgYf.setDeviceType(DevType.valueOfCode(iotFirmdevResVo.getDevtypeBid()));
|
|
|
- cfgYf.setDeviceTypeBizId(iotFirmdevResVo.getDevtypeBid());
|
|
|
+ cfgYf.setFirmName(tosDeviceType.getFirmName());
|
|
|
+ cfgYf.setDeviceType(DevType.valueOfCode(tosDeviceType.getDevtypeBid()));
|
|
|
+ cfgYf.setDeviceTypeBizId(tosDeviceType.getDevtypeBid());
|
|
|
|
|
|
cfgYf.setIp(jsonConfig.getString("ip"));
|
|
|
cfgYf.setPort(jsonConfig.getString("port"));
|
|
|
@@ -193,27 +191,18 @@ public class MqttManager {
|
|
|
|
|
|
|
|
|
/**
|
|
|
- * 添加私有的链接:自行修改入参,链接的id生成
|
|
|
- * @param connectionId
|
|
|
- * @param mqttCore
|
|
|
- */
|
|
|
- private void addPrivateConnectionMap(String connectionId, MqttCore mqttCore) {
|
|
|
- putConnection(connectionId,mqttCore);
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 添加公有的链接:自行修改入参,链接的id生成
|
|
|
+ * 添加链接:自行修改入参,链接的id生成
|
|
|
+ *
|
|
|
* @param connectionId
|
|
|
* @param mqttCore
|
|
|
*/
|
|
|
- private void addCommonConnectionMap(String connectionId, MqttCore mqttCore) {
|
|
|
- putConnection(connectionId,mqttCore);
|
|
|
-
|
|
|
+ private void addConnectionMap(String connectionId, MqttCore mqttCore) {
|
|
|
+ putConnection(connectionId, mqttCore);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 维护链接,connectionId => mqttCore
|
|
|
+ *
|
|
|
* @param connectionId
|
|
|
* @param mqttCore
|
|
|
*/
|
|
|
@@ -236,6 +225,7 @@ public class MqttManager {
|
|
|
/**
|
|
|
* 获取Publisher 根据服务名称
|
|
|
* 之前对外暴漏publish,现在将其包装,直接提供发布方法
|
|
|
+ *
|
|
|
* @param connectionId
|
|
|
* @return
|
|
|
*/
|
|
|
@@ -252,6 +242,7 @@ public class MqttManager {
|
|
|
/**
|
|
|
* 【操作-发布消息】
|
|
|
* 发布消息
|
|
|
+ *
|
|
|
* @param connectionId 链接id
|
|
|
* @param topic
|
|
|
* @param message
|
|
|
@@ -346,7 +337,6 @@ public class MqttManager {
|
|
|
}
|
|
|
|
|
|
|
|
|
-
|
|
|
/**
|
|
|
* 根据connectionId 获取mqttcore
|
|
|
*
|
|
|
@@ -376,6 +366,7 @@ public class MqttManager {
|
|
|
* 如此分类的原因
|
|
|
* 1. 相同connectionId下serviceName不一定相同
|
|
|
* 2. 相同的serviceName的connectionId不一定相同
|
|
|
+ *
|
|
|
* @param connectionId
|
|
|
* @param serviceName
|
|
|
* @param deviceIds
|
|
|
@@ -407,6 +398,7 @@ public class MqttManager {
|
|
|
/**
|
|
|
* 【订阅-单个订阅】
|
|
|
* 单个设备订阅
|
|
|
+ *
|
|
|
* @param connectionId
|
|
|
* @param serviceName
|
|
|
* @param deviceId
|
|
|
@@ -428,6 +420,7 @@ public class MqttManager {
|
|
|
|
|
|
/**
|
|
|
* 【订阅-批量取消订阅】
|
|
|
+ *
|
|
|
* @param connectionId
|
|
|
* @param serviceName
|
|
|
* @param deviceIds
|
|
|
@@ -458,6 +451,7 @@ public class MqttManager {
|
|
|
|
|
|
/**
|
|
|
* 【订阅-单个设备取消订阅】
|
|
|
+ *
|
|
|
* @param connectionId
|
|
|
* @param serviceName
|
|
|
* @param deviceId
|
|
|
@@ -481,46 +475,43 @@ public class MqttManager {
|
|
|
|
|
|
|
|
|
/**
|
|
|
- * @deprecated
|
|
|
- * 逻辑需要基于connectionId梳理
|
|
|
- *
|
|
|
+ * @deprecated 逻辑需要基于connectionId梳理
|
|
|
+ * <p>
|
|
|
* 两个小时内没有产生更新便认为是设备订阅失效,重新订阅
|
|
|
*/
|
|
|
@Deprecated
|
|
|
public void reSubscribe() {
|
|
|
throwDeprecatedMethod("重新订阅逻辑不完善,需要梳理构建新的逻辑");
|
|
|
- IotFirmdev iotFirmdev = new IotFirmdev();
|
|
|
- List<IotFirmdevResVo> iotFirmdevResVos = iotFirmdevService.selectIotFirmdevList(iotFirmdev);
|
|
|
- for (IotFirmdevResVo iotFirmdevResVo : iotFirmdevResVos) {
|
|
|
- try {
|
|
|
- JSONObject jsonConfig = JSONObject.parseObject(iotFirmdevResVo.getFirmdevCfg());
|
|
|
- String type = jsonConfig.getString("type");
|
|
|
- if (!Objects.equals(type, "mqtt")) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- startSubscribe(iotFirmdevResVo, jsonConfig);
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("【设备重新订阅】【订阅】【重连】 解析配置文件错误: \n" + iotFirmdevResVo.getFirmdevCfg() + "\n" + e);
|
|
|
- }
|
|
|
- }
|
|
|
+// IotFirmdev iotFirmdev = new IotFirmdev();
|
|
|
+// List<IotFirmdevResVo> iotFirmdevResVos = iotFirmdevService.selectIotFirmdevList(iotFirmdev);
|
|
|
+// for (IotFirmdevResVo tosDeviceType : iotFirmdevResVos) {
|
|
|
+// try {
|
|
|
+// JSONObject jsonConfig = JSONObject.parseObject(tosDeviceType.getFirmdevCfg());
|
|
|
+// String type = jsonConfig.getString("type");
|
|
|
+// if (!Objects.equals(type, "mqtt")) {
|
|
|
+// continue;
|
|
|
+// }
|
|
|
+// startSubscribe(tosDeviceType, jsonConfig);
|
|
|
+// } catch (Exception e) {
|
|
|
+// log.error("【设备重新订阅】【订阅】【重连】 解析配置文件错误: \n" + tosDeviceType.getFirmdevCfg() + "\n" + e);
|
|
|
+// }
|
|
|
+// }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * @deprecated
|
|
|
- * 逻辑需要基于connectionId梳理
|
|
|
- * 两个小时内没有产生更新便认为是设备订阅失效,重新订阅
|
|
|
- *
|
|
|
- * @param iotFirmdevResVo
|
|
|
+ * @param tosDeviceType
|
|
|
* @param jsonConfig
|
|
|
+ * @deprecated 逻辑需要基于connectionId梳理
|
|
|
+ * 两个小时内没有产生更新便认为是设备订阅失效,重新订阅
|
|
|
*/
|
|
|
@Deprecated
|
|
|
- private void startSubscribe(IotFirmdevResVo iotFirmdevResVo, JSONObject jsonConfig) {
|
|
|
- log.info("【设备重新订阅】【订阅】【重连】:设备:{} {}", iotFirmdevResVo, jsonConfig);
|
|
|
- MqttConfig cfgYf = buildMqttConfig(iotFirmdevResVo, jsonConfig);
|
|
|
+ private void startSubscribe(TosDevicetypeResVo tosDeviceType, JSONObject jsonConfig) {
|
|
|
+ log.info("【设备重新订阅】【订阅】【重连】:设备:{} {}", tosDeviceType, jsonConfig);
|
|
|
+ MqttConfig cfgYf = buildMqttConfig(tosDeviceType, jsonConfig);
|
|
|
String firmBizId = cfgYf.getFirmBizId();
|
|
|
String serviceName = cfgYf.getServiceName();
|
|
|
String deviceTypeId = cfgYf.getDeviceTypeBizId();
|
|
|
- String connectionId="获取链接id";
|
|
|
+ String connectionId = "获取链接id";
|
|
|
long nowTime = DateUtils.getNowDate().getTime() - (2 * 60 * 60 * 1000);
|
|
|
String devUpdateddate = DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS, new Date(nowTime));
|
|
|
|
|
|
@@ -546,7 +537,7 @@ public class MqttManager {
|
|
|
try {
|
|
|
MqttClient mqttClient = mqttCore.getClient();
|
|
|
if (!mqttClient.isConnected()) {
|
|
|
- buildMqttConnection(iotFirmdevResVo,jsonConfig);
|
|
|
+ buildMqttConnection(tosDeviceType, jsonConfig);
|
|
|
} else {
|
|
|
mqttCore.unsubscribe(topics);
|
|
|
mqttCore.subscribe(topics);
|