Bladeren bron

调整Mqtt消息处理逻辑,新增处理线程

liuyaowen 1 jaar geleden
bovenliggende
commit
beaabfe3b2

+ 2 - 0
src/main/java/com/yunfeiyun/agmp/iots/AgmpIotsApplication.java

@@ -7,6 +7,7 @@ import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
 import org.springframework.context.annotation.ComponentScan;
+import org.springframework.scheduling.annotation.EnableAsync;
 import org.springframework.scheduling.annotation.EnableScheduling;
 
 /**
@@ -17,6 +18,7 @@ import org.springframework.scheduling.annotation.EnableScheduling;
 @EnableScheduling
 @SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
 @ComponentScan({"com.yunfeiyun"})
+@EnableAsync
 @MapperScan({"com.yunfeiyun.agmp.**.mapper"})
 public class AgmpIotsApplication {
     private static final Logger log = LoggerFactory.getLogger(AgmpIotsApplication.class);

+ 12 - 0
src/main/java/com/yunfeiyun/agmp/iots/config/CmdAsyncTaskConfig.java

@@ -43,4 +43,16 @@ public class CmdAsyncTaskConfig {
         threadPoolTaskExecutor.initialize();
         return threadPoolTaskExecutor;
     }
+    @Bean("MqttTopicMessageHandlerExecutor")
+    public ThreadPoolTaskExecutor MqttTopicMessageHandlerExecutor() {
+        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
+        threadPoolTaskExecutor.setThreadNamePrefix("MqttTopicMessageHandlerExecutor-");//线程前缀
+        threadPoolTaskExecutor.setCorePoolSize( 2);//核心线程数
+        threadPoolTaskExecutor.setMaxPoolSize(4);//最大线程数
+        threadPoolTaskExecutor.setQueueCapacity(50);//等待队列
+        threadPoolTaskExecutor.setKeepAliveSeconds(30);//线程池维护线程所允许的空闲时间,单位为秒
+        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 线程池对拒绝任务(无线程可用)的处理策略
+        threadPoolTaskExecutor.initialize();
+        return threadPoolTaskExecutor;
+    }
 }

+ 2 - 2
src/main/java/com/yunfeiyun/agmp/iots/core/cmd/core/task/CmdTaskService.java

@@ -429,7 +429,7 @@ public class CmdTaskService {
         taskResult.setCompleteDate(DateUtils.dateTimeNow());
         //TODO
         // iotsMqService.sendTaskResultMsg(taskResult);
-        //TestConst.printError("!!!!!!!!!!【看这里:临时测试】需要完善mq发送回执消息");
+        //  TestConst.printError("!!!!!!!!!!【看这里:临时测试】需要完善mq发送回执消息");
 
     }
 
@@ -473,7 +473,7 @@ public class CmdTaskService {
         taskResult.setExtraBody(ext);
         // TODO
         // iotsMqService.sendTaskResultMsg(taskResult);
-        //TestConst.printError("!!!!!!!!!!【看这里:临时测试】需要完善mq发送回执消息");
+        // TestConst.printError("!!!!!!!!!!【看这里:临时测试】需要完善mq发送回执消息");
     }
 
     public static long getTimeDifference(String datetime1, String datetime2) {

+ 20 - 0
src/main/java/com/yunfeiyun/agmp/iots/core/manager/ConnectionManager.java

@@ -1,5 +1,6 @@
 package com.yunfeiyun.agmp.iots.core.manager;
 
+import com.alibaba.fastjson2.JSON;
 import com.alibaba.fastjson2.JSONArray;
 import com.alibaba.fastjson2.JSONObject;
 import com.yunfeiyun.agmp.common.utils.StringUtils;
@@ -7,6 +8,7 @@ import com.yunfeiyun.agmp.iot.common.constant.devicetype.IotDeviceDictEnum;
 import com.yunfeiyun.agmp.iot.common.constant.devicetype.IotDeviceTypeLv1Enum;
 import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iot.common.domain.IotDeviceconn;
+import com.yunfeiyun.agmp.iot.common.domain.TosDevicetype;
 import com.yunfeiyun.agmp.iot.common.model.mq.IotDeviceEditMqModel;
 import com.yunfeiyun.agmp.iot.common.service.DeviceconnCacheService;
 import com.yunfeiyun.agmp.iot.common.service.TypeCacheService;
@@ -352,5 +354,23 @@ public class ConnectionManager {
         httpManager.deviceCreateHandle(iotDevice);
     }
 
+    public void updateCommonManager(TosDevicetype tosDevicetype) {
+        List<IotDeviceconnResVo> iotDeviceconnResVos = businessCoreService.selectDevConnResVoListByDevTypeBid(tosDevicetype.getDevtypeBid());
+        // 中断设备连接
+        for(IotDeviceconnResVo iotDeviceconnResVo : iotDeviceconnResVos){
+            // 构建topic
+            List<IotDevice> devices = deviceTopicService.getDevicesByDevConnBid(iotDeviceconnResVo.getDevconnBid());
+            for(IotDevice iotDevice : devices){
+                // 停止设备订阅
+                deleteDeviceHandle(iotDevice);
+            }
+            // 释放连接缓存
+            deleteIotDeviceconnHandle(iotDeviceconnResVo);
+        }
+        // 创建新连接
+        for(IotDeviceconnResVo iotDeviceconnResVo : iotDeviceconnResVos){
+            createIotDeviceconnHandle(iotDeviceconnResVo);
+        }
+    }
 }
 

+ 6 - 0
src/main/java/com/yunfeiyun/agmp/iots/core/manager/HttpManager.java

@@ -1,8 +1,10 @@
 package com.yunfeiyun.agmp.iots.core.manager;
 
+import com.alibaba.fastjson2.JSONArray;
 import com.alibaba.fastjson2.JSONObject;
 import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iot.common.domain.IotDeviceconn;
+import com.yunfeiyun.agmp.iot.common.domain.TosDevicetype;
 import com.yunfeiyun.agmp.iot.common.enums.IotDeviceconnTypeEnum;
 import com.yunfeiyun.agmp.iot.common.service.DeviceconnCacheService;
 import com.yunfeiyun.agmp.iot.common.service.TypeCacheService;
@@ -115,4 +117,8 @@ public class HttpManager {
     public void deleteHttpConnection(String devconnBid) {
         privateHttpClientByConnBid.remove(devconnBid);
     }
+
+    public void updateCommonConnection(TosDevicetype tosDevicetype, JSONArray jsonConfig) {
+
+    }
 }

+ 7 - 0
src/main/java/com/yunfeiyun/agmp/iots/core/manager/MqttManager.java

@@ -10,6 +10,7 @@ import com.yunfeiyun.agmp.iot.common.constant.devicetype.IotDeviceDictEnum;
 import com.yunfeiyun.agmp.iot.common.constant.devicetype.IotDeviceTypeLv1Enum;
 import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iot.common.domain.IotDeviceconn;
+import com.yunfeiyun.agmp.iot.common.domain.TosDevicetype;
 import com.yunfeiyun.agmp.iot.common.enums.IotDeviceconnTypeEnum;
 import com.yunfeiyun.agmp.iot.common.exception.IotBizException;
 import com.yunfeiyun.agmp.iot.common.service.DeviceconnCacheService;
@@ -128,6 +129,8 @@ public class MqttManager {
                 mqttCore.buildMqttCore(cfgYf);
                 addConnectionMap(connectionId, mqttCore);
             }
+            log.info("【开始构建MQTT连接】构建完成 devconnId:{} ,devconnName: {}, tosDeviceTypeName:{}, jsonConfig: {}", iotDeviceconnResVo.getDevconnBid(), iotDeviceconnResVo.getDevconnName(), iotDeviceconnResVo.getDevtypeBid(), jsonConfig);
+
         } catch (Exception e) {
             log.error("【构建MqttCore失败】 异常信息: {} ,{}", e.getMessage(), e);
         }
@@ -550,4 +553,8 @@ public class MqttManager {
     }
 
 
+    public void updateCommonConnection(TosDevicetype tosDevicetype, JSONArray jsonConfig) {
+
+
+    }
 }

+ 31 - 25
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/network/MqttSubscriber.java

@@ -9,8 +9,12 @@ import com.yunfeiyun.agmp.iots.core.cmd.checker.CmdResultCheckService;
 import org.eclipse.paho.client.mqttv3.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
 import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
 
 public class MqttSubscriber implements MqttCallbackExtended {
 
@@ -19,11 +23,13 @@ public class MqttSubscriber implements MqttCallbackExtended {
     private MqttClient mqttClient;
 
     private MqttCore mqttCore;
+    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
 
 
     public void init(MqttCore mqttCore) throws MqttException {
         this.mqttCore = mqttCore;
         this.mqttClient = mqttCore.getClient();
+        this.threadPoolTaskExecutor = SpringUtils.getBean("MqttTopicMessageHandlerExecutor");
         mqttClient.setCallback(this);
     }
 
@@ -39,34 +45,34 @@ public class MqttSubscriber implements MqttCallbackExtended {
      */
     @Override
     public void messageArrived(String topic, MqttMessage mqttMessage) {
-        try {
-
-            String msgContent = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);
-            log.info("【上报数据:收到】收到mqtt消息:" + topic + ", " + msgContent);
-            JSONObject obj = null;
+        CompletableFuture.runAsync(() -> {
             try {
-                obj = JSON.parseObject(msgContent);
+                String msgContent = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);
+                log.info("【上报数据:收到】收到mqtt消息:" + topic + ", " + msgContent);
+                JSONObject obj = null;
+                try {
+                    obj = JSON.parseObject(msgContent);
+                } catch (Exception e) {
+                    log.error("【上报数据:解析异常】收到mqtt消息:" + topic + ", " + msgContent);
+                    return;
+                }
+                Device device = mqttCore.getDevice(topic);
+                if (null == device) {
+                    // 当收到mqtt消息无对应的设备类型时,则丢弃消息。
+                    IotException.UNKNOWN_DEVICE.throwException();
+                }
+                Object result = device.receiveData(topic, obj, mqttCore.getConnectionId());
+                log.info("【上报数据:处理结果】{}", result);
+                //如果是“设备属性”消息,进行执行结果检查
+                if (device.isDeviceProps(obj)) {
+                    SpringUtils.getBean(CmdResultCheckService.class).beginCheck(device.findIotDevice(topic, obj, mqttCore.getConnectionId()), obj);
+                } else {
+                    log.error("其它数据");
+                }
             } catch (Exception e) {
-                log.error("【上报数据:解析异常】收到mqtt消息:" + topic + ", " + msgContent);
-                return;
-            }
-
-            Device device = mqttCore.getDevice(topic);
-            if (null == device) {
-                // 当收到mqtt消息无对应的设备类型时,则丢弃消息。
-                IotException.UNKNOWN_DEVICE.throwException();
+                log.error("【接收上报】异常{}", e);
             }
-            Object result = device.receiveData(topic, obj, mqttCore.getConnectionId());
-            log.info("【上报数据:处理结果】{}", result);
-            //如果是“设备属性”消息,进行执行结果检查
-            if (device.isDeviceProps(obj)) {
-                SpringUtils.getBean(CmdResultCheckService.class).beginCheck(device.findIotDevice(topic, obj, mqttCore.getConnectionId()), obj);
-            } else {
-                log.error("其它数据");
-            }
-        } catch (Exception e) {
-            log.error("【接收上报】异常{}", e);
-        }
+        },threadPoolTaskExecutor);
     }
 
 

+ 1 - 0
src/main/java/com/yunfeiyun/agmp/iots/device/mapper/BusinessCoreMapper.java

@@ -12,4 +12,5 @@ public interface BusinessCoreMapper {
      */
     List<IotDeviceconnResVo> selectDevConnResVoList();
 
+    List<IotDeviceconnResVo> selectDevConnResVoListByDevTypeBid(String devTypeBid);
 }

+ 4 - 0
src/main/java/com/yunfeiyun/agmp/iots/mq/listener/IotmBaseDataChannelAwareMessageListener.java

@@ -87,6 +87,10 @@ public class IotmBaseDataChannelAwareMessageListener implements ChannelAwareMess
                     case DEVICE_CONN_DELETE:
                         connectionManager.deleteIotDeviceconnHandle(synGlobalTenantInfoDto.getData().to(IotDeviceconn.class));
                         break;
+                    //通用连接信息更新
+                    case DEVICE_COMMON_CONN_UPDATE:
+                        connectionManager.updateCommonManager(synGlobalTenantInfoDto.getData().to(TosDevicetype.class));
+                        break;
                     //更新所有设备信息
                     case DEVICE_ALL_SYN:
                         Device device = SpringUtils.getBean(typeCacheService.getServiceNameByDevTypeCode(synGlobalTenantInfoDto.getData().getString("devTypeCode")));

+ 6 - 0
src/main/java/com/yunfeiyun/agmp/iots/service/BusinessCoreService.java

@@ -24,5 +24,11 @@ public class BusinessCoreService {
     public List<IotDeviceconnResVo> selectDevConnResVoList() {
         return businessCoreMapper.selectDevConnResVoList();
     }
+    /**
+     * 获取设备型号的信息
+     * */
+    public List<IotDeviceconnResVo> selectDevConnResVoListByDevTypeBid(String devTypeBid){
+        return businessCoreMapper.selectDevConnResVoListByDevTypeBid(devTypeBid);
+    }
 
 }

+ 15 - 0
src/main/resources/mapper/BusinessCoreMapper.xml

@@ -21,5 +21,20 @@
 
     </select>
 
+    <select id="selectDevConnResVoListByDevTypeBid"
+            resultType="com.yunfeiyun.agmp.iots.common.modal.IotDeviceconnResVo">
+        SELECT
+        ic.*,
+        it.devtypeName,
+        it.devtypeCode ,
+        ifd.firmName,
+        ifd.firmBid
+        from
+        IotDeviceconn ic
+        LEFT JOIN TosDevicetype it on ic.devtypeBid = it.devtypeBid
+        LEFT JOIN TosFirm ifd on ifd.firmBid=it.firmBid
+        where ic.devtypeBid = #{devtypeBid}
+    </select>
+
 
 </mapper>