|
|
@@ -1,13 +1,13 @@
|
|
|
-package com.yunfeiyun.agmp.iots.core.mqtt;
|
|
|
+package com.yunfeiyun.agmp.iots.core.manager;
|
|
|
|
|
|
import com.alibaba.fastjson2.JSONObject;
|
|
|
-import com.yunfeiyun.agmp.common.framework.web.domain.server.Sys;
|
|
|
import com.yunfeiyun.agmp.common.utils.DateUtils;
|
|
|
import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
|
|
|
import com.yunfeiyun.agmp.iot.common.domain.IotFirmdev;
|
|
|
import com.yunfeiyun.agmp.iot.common.domain.resvo.IotFirmdevResVo;
|
|
|
import com.yunfeiyun.agmp.iot.common.enums.DevType;
|
|
|
-import com.yunfeiyun.agmp.iots.core.http.HttpManager;
|
|
|
+import com.yunfeiyun.agmp.iots.core.manager.HttpManager;
|
|
|
+import com.yunfeiyun.agmp.iots.core.mqtt.DeviceTopicService;
|
|
|
import com.yunfeiyun.agmp.iots.device.common.Device;
|
|
|
import com.yunfeiyun.agmp.iot.common.constant.IotErrorCode;
|
|
|
import com.yunfeiyun.agmp.iot.common.exception.IotBizException;
|
|
|
@@ -40,21 +40,13 @@ public class MqttManager {
|
|
|
* 实现类名称-->mqtt
|
|
|
*/
|
|
|
private Map<String, MqttCore> serviceMqttMap = new HashMap<>();
|
|
|
- /**
|
|
|
- * 厂家+设备类型--> mqtt
|
|
|
- */
|
|
|
- private Map<String, MqttCore> firmDevTypeMqttMap = new HashMap<>();
|
|
|
+
|
|
|
/**
|
|
|
* spring 自动注入
|
|
|
*/
|
|
|
@Resource
|
|
|
private Map<String, Device> deviceHandlerMap;
|
|
|
|
|
|
- /**
|
|
|
- * 厂家 -> 实现类
|
|
|
- */
|
|
|
- private Map<String, Map<String, MqttCore>> firmDeviceTypeMap = new HashMap<>();
|
|
|
-
|
|
|
@Resource
|
|
|
private CmdResultCheckService cmdResultCheckService;
|
|
|
|
|
|
@@ -72,65 +64,13 @@ public class MqttManager {
|
|
|
private HttpManager httpManager;
|
|
|
|
|
|
/**
|
|
|
- * 加载数据库设备配置
|
|
|
- *
|
|
|
- * @throws MqttException
|
|
|
- */
|
|
|
- @PostConstruct
|
|
|
- public void init() throws MqttException {
|
|
|
- log.info("【初始化】协议");
|
|
|
- IotFirmdev iotFirmdev = new IotFirmdev();
|
|
|
- List<IotFirmdevResVo> iotFirmdevResVos = iotFirmdevService.selectIotFirmdevList(iotFirmdev);
|
|
|
- for (IotFirmdevResVo iotFirmdevResVo : iotFirmdevResVos) {
|
|
|
- log.info("【初始化】【开始】协议加载,厂家:{},类型:{} ,配置:{}", iotFirmdevResVo.getFirmName(), iotFirmdevResVo.getDevtypeBid(), iotFirmdevResVo.getFirmdevCfg());
|
|
|
- if (TextUtils.isEmpty(iotFirmdevResVo.getFirmdevCfg())) {
|
|
|
- log.error("【初始化】协议加载, 对接配置 empty, {}", iotFirmdevResVo);
|
|
|
- continue;
|
|
|
- }
|
|
|
- //将配置信息转换成jsonObject
|
|
|
- JSONObject jsonConfig = null;
|
|
|
- try {
|
|
|
- jsonConfig = JSONObject.parseObject(iotFirmdevResVo.getFirmdevCfg());
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("【初始化】解析配置文件错误: \n" + iotFirmdevResVo.getFirmdevCfg() + "\n" + e);
|
|
|
- }
|
|
|
-
|
|
|
- if (jsonConfig != null) {
|
|
|
- String type = jsonConfig.getString("type");
|
|
|
- if (TextUtils.isEmpty(type)) {
|
|
|
- continue;
|
|
|
- }
|
|
|
- if (type.equals("mqtt") || type.equals("modbus-tcp")) {
|
|
|
- try {
|
|
|
- initConfig(iotFirmdevResVo, jsonConfig);
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("【初始化】加载配置文件错误: \n" + jsonConfig + "\n" + e);
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
- } else if (type.equals("http")) {
|
|
|
- initHttp(iotFirmdevResVo, jsonConfig);
|
|
|
- } else {
|
|
|
- log.info("【初始化】其它类型,跳过");
|
|
|
- continue;
|
|
|
- }
|
|
|
- } else {
|
|
|
- log.info("【初始化】协议加载,厂家:{},失败:配置为空", iotFirmdevResVo.getFirmName());
|
|
|
- }
|
|
|
- log.info("【初始化】【完成】协议加载,厂家:{},类型:{} ,", iotFirmdevResVo.getFirmName(), iotFirmdevResVo.getDevtypeBid());
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
- /**
|
|
|
* 根据厂家配置构建MqttCore
|
|
|
*
|
|
|
* @param iotFirmdevResVo
|
|
|
* @param jsonConfig
|
|
|
* @throws MqttException
|
|
|
*/
|
|
|
- void initConfig(IotFirmdevResVo iotFirmdevResVo, JSONObject jsonConfig) throws MqttException {
|
|
|
+ public void initConfig(IotFirmdevResVo iotFirmdevResVo, JSONObject jsonConfig) throws MqttException {
|
|
|
MqttConfig cfgYf = getMqttConfig(iotFirmdevResVo, jsonConfig);
|
|
|
String firmBizId = cfgYf.getFirmBizId();
|
|
|
String serviceName = cfgYf.getServiceName();
|
|
|
@@ -139,42 +79,19 @@ public class MqttManager {
|
|
|
String deviceTypeId = cfgYf.getDeviceTypeBizId();
|
|
|
log.info("【初始化厂商 配置 】 {} {} 开始 ##########################", firmName, type);
|
|
|
MqttCore mqttCore = new MqttCore();
|
|
|
- if (type.equals("mqtt")) {
|
|
|
- String[] topics = deviceTopicService.getBatchTopic(serviceName, getDeviceIdByFirmBizId(firmBizId, deviceTypeId));
|
|
|
- cfgYf.setSubTopic(topics);
|
|
|
- cfgYf.setServiceName(serviceName);
|
|
|
- MqttCore oldMqttCore = null;
|
|
|
- try{
|
|
|
- oldMqttCore = getMqttCoreByService(serviceName);
|
|
|
- if(Objects.equals(serviceName, "XPH_SERVICE")){
|
|
|
- mqttCore = oldMqttCore;
|
|
|
- }
|
|
|
- }catch (Exception ignored){
|
|
|
-
|
|
|
- }
|
|
|
-// if(oldMqttCore != null && Objects.equals(serviceName, "XPH_SERVICE")){
|
|
|
-// mqttCore = oldMqttCore;
|
|
|
-// }else{
|
|
|
-// mqttCore.buildMqttCore(cfgYf, cmdResultCheckService);
|
|
|
-// }
|
|
|
- log.info("【mqtt:{} 】 {} ", cfgYf.getDeviceType(), firmBizId);
|
|
|
- log.info("【初始化厂商 加载 配置】 {} {} {} ", firmName, type, topics);
|
|
|
-
|
|
|
- addFirmDeviceTypeMap(firmBizId, serviceName, deviceTypeId, mqttCore);
|
|
|
-
|
|
|
- if(oldMqttCore != null && Objects.equals(serviceName, "XPH_SERVICE")){
|
|
|
- subscribeByBatch(topics, mqttCore);
|
|
|
- }else{
|
|
|
- mqttCore.buildMqttCore(cfgYf, cmdResultCheckService);
|
|
|
- }
|
|
|
-
|
|
|
- } else {
|
|
|
- mqttCore.setMqttConfig(cfgYf);
|
|
|
- addFirmDeviceTypeMap(firmBizId, serviceName, deviceTypeId, mqttCore);
|
|
|
+ String[] topics = deviceTopicService.getBatchTopic(serviceName, getDeviceIdByFirmBizId(firmBizId, deviceTypeId));
|
|
|
+ cfgYf.setSubTopic(topics);
|
|
|
+ cfgYf.setServiceName(serviceName);
|
|
|
+ log.info("【mqtt:{} 】 {} ", cfgYf.getDeviceType(), firmBizId);
|
|
|
+ log.info("【初始化厂商 加载 配置】 {} {} {} ", firmName, type, topics);
|
|
|
+ //这里从配置获取到一个链接的唯一标识,内部处理复用问题,目前来看,链接id只要相同就实现复用链接
|
|
|
+ if("私有".equals("查询字段")){
|
|
|
+ addPrivateConnectionMap("connectionId", mqttCore);
|
|
|
+ }else{
|
|
|
+ addConmonConnectionMap("connectionId", mqttCore);
|
|
|
}
|
|
|
-
|
|
|
- // addFirmDeviceTypeMap(firmBizId, serviceName, deviceTypeId, mqttCore);
|
|
|
-
|
|
|
+ // 这个checker待梳理
|
|
|
+ mqttCore.buildMqttCore(cfgYf, cmdResultCheckService);
|
|
|
log.info("【初始化厂商 配置 】 {} {} 完成 ##########################", firmName, type);
|
|
|
}
|
|
|
|
|
|
@@ -304,46 +221,40 @@ public class MqttManager {
|
|
|
return mqttCore;
|
|
|
}
|
|
|
|
|
|
-
|
|
|
/**
|
|
|
- * 根据厂家id和设备devtypeBid找到mqtt
|
|
|
- *
|
|
|
- * @param firmId
|
|
|
- * @param devtypeBid
|
|
|
- * @return
|
|
|
+ * 添加私有的链接:自行修改入参,链接的id生成
|
|
|
+ * @param connectionId
|
|
|
+ * @param mqttCore
|
|
|
*/
|
|
|
- public MqttCore getMqttCoreByFirmAndDevType(String firmId, String devtypeBid) {
|
|
|
- return firmDevTypeMqttMap.get(firmId + "-" + devtypeBid);
|
|
|
+ public void addPrivateConnectionMap(String connectionId, MqttCore mqttCore) {
|
|
|
+ putConnection(connectionId,mqttCore);
|
|
|
+
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * 将厂家、设备类型与mqtt core 映射
|
|
|
- *
|
|
|
- * @param firmBizId
|
|
|
- * @param serviceName
|
|
|
- * @param deviceTypeId
|
|
|
+ * 添加公有的链接:自行修改入参,链接的id生成
|
|
|
+ * @param connectionId
|
|
|
* @param mqttCore
|
|
|
*/
|
|
|
- public void addFirmDeviceTypeMap(String firmBizId, String serviceName, String deviceTypeId, MqttCore mqttCore) {
|
|
|
- if (!firmDeviceTypeMap.containsKey(firmBizId)) {
|
|
|
- firmDeviceTypeMap.put(firmBizId, new HashMap<>());
|
|
|
- }
|
|
|
- Map<String, MqttCore> mqttCoreMap = firmDeviceTypeMap.get(firmBizId);
|
|
|
- if (!mqttCoreMap.containsKey(serviceName)) {
|
|
|
- // 维护 厂家--> List<mqtt>
|
|
|
- mqttCoreMap.put(serviceName, mqttCore);
|
|
|
- }
|
|
|
+ public void addConmonConnectionMap(String connectionId, MqttCore mqttCore) {
|
|
|
+ putConnection(connectionId,mqttCore);
|
|
|
+
|
|
|
+ }
|
|
|
|
|
|
- // 维护 厂家+设备类型--> mqtt
|
|
|
- firmDevTypeMqttMap.put(firmBizId + "-" + deviceTypeId, mqttCore);
|
|
|
- if (!serviceMqttMap.containsKey(serviceName)) {
|
|
|
+ /**
|
|
|
+ * 放链接
|
|
|
+ * @param connectionId
|
|
|
+ * @param mqttCore
|
|
|
+ */
|
|
|
+ void putConnection(String connectionId, MqttCore mqttCore){
|
|
|
+ if (!serviceMqttMap.containsKey(connectionId)) {
|
|
|
// 维护 服务名字--> mqtt
|
|
|
- serviceMqttMap.put(serviceName, mqttCore);
|
|
|
+ serviceMqttMap.put(connectionId, mqttCore);
|
|
|
+ } else {
|
|
|
+ log.info("【MQTT】重复配置 connectionId:{} ", connectionId);
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
|
|
|
-
|
|
|
/**
|
|
|
* 根据serviceName 获取mqttcore
|
|
|
*
|
|
|
@@ -351,21 +262,15 @@ public class MqttManager {
|
|
|
* @return
|
|
|
*/
|
|
|
public MqttCore getMqttCore(String serviceName) {
|
|
|
- for (Map<String, MqttCore> serviceMap : firmDeviceTypeMap.values()) {
|
|
|
- if (serviceMap.containsKey(serviceName)) {
|
|
|
- return serviceMap.get(serviceName);
|
|
|
- }
|
|
|
- }
|
|
|
- return null;
|
|
|
+ return serviceMqttMap.get(serviceName);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 打印已经加载的配置信息
|
|
|
*/
|
|
|
public void showConfig() {
|
|
|
- for (String serviceName : firmDevTypeMqttMap.keySet()) {
|
|
|
- MqttCore mqttCore = firmDevTypeMqttMap.get(serviceName);
|
|
|
- log.info("【MQTT】已经记载的配置 FirmBizName:{} ,FirmBizId-devType:{}, ServiceType:{}, ServiceName:{} ,SubTopic:{}", mqttCore.getFirmName(), serviceName, mqttCore.getServiceType(), mqttCore.getSubTopic());
|
|
|
+ for (MqttCore mqttCore : serviceMqttMap.values()) {
|
|
|
+ log.info("【MQTT】已经记载的配置 FirmBizName:{} ,FirmBizId-devType:{}, ServiceType:{}, ServiceName:{} ,SubTopic:{}", mqttCore.getFirmName(), mqttCore.getServiceName(), mqttCore.getServiceType(), mqttCore.getSubTopic());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -410,6 +315,7 @@ public class MqttManager {
|
|
|
|
|
|
/**
|
|
|
* 两个小时内没有产生更新便认为是设备订阅失效,重新订阅
|
|
|
+ *
|
|
|
* @param iotFirmdevResVo
|
|
|
* @param jsonConfig
|
|
|
*/
|
|
|
@@ -444,16 +350,16 @@ public class MqttManager {
|
|
|
MqttCore mqttCore = getMqttCoreByService(serviceName);
|
|
|
try {
|
|
|
MqttClient mqttClient = mqttCore.getClient();
|
|
|
- if(!mqttClient.isConnected()){
|
|
|
+ if (!mqttClient.isConnected()) {
|
|
|
mqttCore.buildMqttCore(cfgYf, cmdResultCheckService);
|
|
|
- }else{
|
|
|
- for(String topic : topics){
|
|
|
+ } else {
|
|
|
+ for (String topic : topics) {
|
|
|
mqttClient.unsubscribe(topic);
|
|
|
mqttClient.subscribe(topic);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- }catch (Exception e){
|
|
|
+ } catch (Exception e) {
|
|
|
log.error("【设备重新订阅】【订阅】【重连】:设备:{} {} ", topics, mqttCore.getServiceType());
|
|
|
}
|
|
|
}
|
|
|
@@ -468,7 +374,7 @@ public class MqttManager {
|
|
|
try {
|
|
|
JSONObject jsonConfig = JSONObject.parseObject(iotFirmdevResVo.getFirmdevCfg());
|
|
|
String type = jsonConfig.getString("type");
|
|
|
- if(!Objects.equals(type, "mqtt")){
|
|
|
+ if (!Objects.equals(type, "mqtt")) {
|
|
|
continue;
|
|
|
}
|
|
|
startSubscribe(iotFirmdevResVo, jsonConfig);
|