Просмотр исходного кода

阶段提交:配置加载,注销tcp

yf_zn 1 год назад
Родитель
Сommit
0badb94f2d

+ 3 - 1
src/main/java/com/yunfeiyun/agmp/iots/common/modal/IotDeviceconnResVo.java

@@ -14,8 +14,10 @@ import lombok.Data;
 @Data
 public class IotDeviceconnResVo extends IotDeviceconn
 {
-    private String devclassName;
     private String firmName;
     private String devNum;
     private String firmBid;
+    private String devtypeName;
+    private String devtypeCode;
+    private String devClassName;
 }

+ 19 - 7
src/main/java/com/yunfeiyun/agmp/iots/core/manager/MqttManager.java

@@ -19,6 +19,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
@@ -78,7 +79,7 @@ public class MqttManager {
      * @throws MqttException
      */
     public void buildMqttConnection(IotDeviceconnResVo iotDeviceconnResVo, JSONObject jsonConfig) throws MqttException {
-        log.info("【开始构建MQTT连接】 tosDeviceTypeName: {}, tosDeviceTypeId:{}, jsonConfig: {}", iotDeviceconnResVo.getDevclassName(), iotDeviceconnResVo.getDevtypeBid(), jsonConfig);
+        log.info("【开始构建MQTT连接】 tosDeviceTypeName: {}, tosDeviceTypeId:{}, jsonConfig: {}", iotDeviceconnResVo.getDevtypeName(), iotDeviceconnResVo.getDevtypeBid(), jsonConfig);
 
         // 构建配置
         MqttConfig cfgYf = buildMqttConfig(iotDeviceconnResVo, jsonConfig);
@@ -99,15 +100,16 @@ public class MqttManager {
         log.info("【创建MqttCore实例】 mqttCore: {}", mqttCore);
 
         // 查询topics【需要实现:重新更改获取该型号下的设备】
-        //String[] topics = deviceTopicService.getBatchTopic(serviceName, deviceTopicService.getDeviceIdByFirmBizId(firmBizId, deviceTypeId));
-        String[] topics = deviceTopicService.getBatchTopic(serviceName, new String[]{"", "", ""});
+        String connectionId=iotDeviceconnResVo.getDevconnBid();
+        String[] topics = deviceTopicService.getBatchTopic(serviceName, deviceTopicService.getDeviceCodesByConectionId(connectionId));
+        //String[] topics = deviceTopicService.getBatchTopic(serviceName, new String[]{"", "", ""});
         cfgYf.setSubTopic(topics);
         cfgYf.setServiceName(serviceName);
         log.info("【初始化厂商加载配置】 {} {} {}", firmName, type, Arrays.toString(topics));
 
-        // 处理连接ID的逻辑
-        log.info("【添加公共连接】 connectionId: connectionId");
-        addConnectionMap("connectionId", mqttCore);
+        // 处理连接ID的逻辑 IP+port+name
+        log.info("【添加公共连接】 connectionId: {}",connectionId);
+        addConnectionMap(connectionId, mqttCore);
 
         // 构建MqttCore
         try {
@@ -177,7 +179,7 @@ public class MqttManager {
         cfgYf.setType(type);
         //其他信息
         cfgYf.setFirmName(iotDeviceconnResVo.getFirmName());
-        cfgYf.setDeviceType(DevType.valueOfCode(iotDeviceconnResVo.getDevtypeBid()));
+        cfgYf.setDeviceType(iotDeviceconnResVo.getDevtypeName());
         cfgYf.setDeviceTypeBizId(iotDeviceconnResVo.getDevtypeBid());
 
         cfgYf.setIp(jsonConfig.getString("ip"));
@@ -548,4 +550,14 @@ public class MqttManager {
         }
     }
 
+    /**
+     * 模拟异步发送
+     *
+     * @param msg
+     */
+    @Async
+    public void receiveMsg(String msg) throws InterruptedException {
+        Thread.sleep(5000);
+
+    }
 }

+ 20 - 4
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/DeviceTopicService.java

@@ -11,6 +11,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
+import java.util.ArrayList;
 import java.util.List;
 
 
@@ -78,7 +79,7 @@ public class DeviceTopicService {
             case ServiceNameConst.SERVICE_YF_XYCB_2: {
                 return getYfXycb2BatchSubTopic(deviceId);
             }
-            case ServiceNameConst.SERVICE_BIG_DATA_MONITOR:{
+            case ServiceNameConst.SERVICE_BIG_DATA_MONITOR: {
                 return getHKBatchSubTopic();
             }
             case ServiceNameConst.SERVICE_XPH:
@@ -100,7 +101,7 @@ public class DeviceTopicService {
             topics = new String[iotDeviceList.size()];
             for (int i = 0; i < iotDeviceList.size(); i++) {
                 // 只接受上报的订阅
-                topics[i] = IotMqttConstant.MonitorPhotoTopic.TOPIC_MONITOR_PHOTO_PREFIX + iotDeviceList.get(i).getDevCode() ;
+                topics[i] = IotMqttConstant.MonitorPhotoTopic.TOPIC_MONITOR_PHOTO_PREFIX + iotDeviceList.get(i).getDevCode();
             }
         }
         return topics;
@@ -173,19 +174,20 @@ public class DeviceTopicService {
         return getTopics(deviceId, topicArray);
     }
 
-    private String[] getTopics(String[] deviceId, String[] topicArray){
+    private String[] getTopics(String[] deviceId, String[] topicArray) {
         String[] topics = new String[0];
         if (deviceId != null) {
             int topicLen = topicArray.length;
             topics = new String[deviceId.length * topicLen];
             for (int i = 0; i < deviceId.length; i++) {
-                for(int x = 0; x < topicLen; x++){
+                for (int x = 0; x < topicLen; x++) {
                     topics[i * topicLen + x] = topicArray[x] + deviceId[i];
                 }
             }
         }
         return topics;
     }
+
     /**
      * 获取性诱2.0设备 topic
      *
@@ -239,4 +241,18 @@ public class DeviceTopicService {
         };
         return getTopics(deviceId, topicArray);
     }
+
+    /**
+     * 根据id获取设备信息
+     *
+     * @param connectionId
+     * @return
+     */
+    public String[] getDeviceCodesByConectionId(String connectionId) {
+        List<String> ids = iotDeviceService.getDeviceCodesByConectionId(connectionId);
+        if (ids == null || ids.isEmpty()) {
+            return new String[]{};
+        }
+        return ids.toArray(new String[]{});
+    }
 }

+ 11 - 3
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/network/MqttConfig.java

@@ -18,12 +18,18 @@ public class MqttConfig {
     private String username;
     private String password;
     /**
-     * 设备类型 ,用于标明实现类
-     * 测报灯:YF-CBD
+     * 设备型号名称
      */
     private String deviceType;
+    /**
+     * 设备型号的id
+     */
     private String deviceTypeBizId;
+    /**
+     * 厂家id
+     */
     private String firmBizId;
+
     private String firmName;
 
     private MqttClient mqttClient;
@@ -33,7 +39,9 @@ public class MqttConfig {
      * 上报的topic
      */
     private String[] subTopic;
-
+    /**
+     * 标识实现类
+     */
     private String serviceName;
     private String type;
 }

+ 1 - 1
src/main/java/com/yunfeiyun/agmp/iots/core/tcp/NettyServer.java

@@ -53,7 +53,7 @@ public class NettyServer {
      *
      * @throws InterruptedException
      */
-    @PostConstruct
+    //@PostConstruct
     public void start() throws InterruptedException {
         ServerBootstrap bootstrap = new ServerBootstrap();
         bootstrap.group(boss, work)

+ 2 - 0
src/main/java/com/yunfeiyun/agmp/iots/device/mapper/IotDeviceMapper.java

@@ -95,6 +95,8 @@ public interface IotDeviceMapper {
     List<IotDevice> selectIotDeviceByIotDeviceDTO(IotDeviceDto iotDeviceDto);
 
     int updateIotDeviceStatusByType(@Param("devStatus") String devStatus,@Param("firmBid") String firmBid,@Param("devtypeBid")String devtypeBid);
+
+    List<String> getDeviceCodesByConectionId(@Param("connectionId") String connectionId);
 }
 
 

+ 2 - 0
src/main/java/com/yunfeiyun/agmp/iots/service/IIotDeviceService.java

@@ -129,5 +129,7 @@ public interface IIotDeviceService {
     Map<String, List<IotDevice>> getXphHttpDeviceMapByDevtypeBids(String firmBizId, List<String> devtypeBids);
 
     public void updateIotDeviceBatch(List<IotDevice> iotDeviceList);
+
+    List<String> getDeviceCodesByConectionId(String connectionId);
 }
 

+ 5 - 0
src/main/java/com/yunfeiyun/agmp/iots/service/impl/IotDeviceServiceImpl.java

@@ -319,6 +319,11 @@ public class IotDeviceServiceImpl implements IIotDeviceService {
         iotDeviceMapper.updateIotDeviceBatch(iotDeviceList);
     }
 
+    @Override
+    public List<String> getDeviceCodesByConectionId(String connectionId) {
+        return iotDeviceMapper.getDeviceCodesByConectionId(connectionId);
+    }
+
 
     /**
      * 是否是属于需要忽略状态的设备

+ 11 - 4
src/main/resources/mapper/BusinessCoreMapper.xml

@@ -8,10 +8,17 @@
     <!-- * 这个方法的定位就是:获取设备型号的信息,用于构建链接 *-->
     <select id="selectTosDevicetypeResVoList"
             resultType="com.yunfeiyun.agmp.iots.common.modal.IotDeviceconnResVo">
-        SELECT
-        *
-        from
-        IotDeviceconn
+            SELECT
+                ic.*,
+        it.devtypeName,
+        it.devtypeCode ,
+        ifd.firmName,
+        ifd.firmBid
+
+                from
+                IotDeviceconn ic
+        LEFT JOIN TosDevicetype it on ic.devtypeBid = it.devtypeBid
+        LEFT JOIN TosFirm ifd on ifd.firmBid=it.firmBid
 
     </select>
 

+ 3 - 0
src/main/resources/mapper/IotDeviceMapper.xml

@@ -509,6 +509,9 @@
         and d.devDelstatus='0'
         group by d.devBid
     </select>
+    <select id="getDeviceCodesByConectionId" resultType="java.lang.String">
+        select  devCode from IotDevice where devconnBid=#{connectionId} and devDelstatus='0'
+    </select>
 </mapper>