Преглед изворни кода

阶段提交:先激活设备,给出设备消费订阅的方法

yf_zn пре 1 година
родитељ
комит
4033498b47

+ 9 - 12
src/main/java/com/yunfeiyun/agmp/iots/core/manager/MqttManager.java

@@ -101,7 +101,7 @@ public class MqttManager {
         log.info("【创建MqttCore实例】 mqttCore: {} connectionId:{} devSize:{}", mqttCore,connectionId,devices.size());
         List<MqttTopicValue> mqttTopicValues=new ArrayList<>();
         for(IotDevice iotDevice:devices){
-
+            // 根据设备code获取topics
             String[] topics= deviceTopicService.getTopic(serviceName, iotDevice.getDevCode());
             for(String s: topics){
                 MqttTopicValue mqttTopicValue=new MqttTopicValue();
@@ -110,7 +110,6 @@ public class MqttManager {
                 mqttTopicValue.setTopic(s);
                 mqttTopicValues.add(mqttTopicValue);
             }
-
         }
 
         String[] topics=new String[mqttTopicValues.size()];
@@ -392,16 +391,18 @@ public class MqttManager {
         log.info("【开始批量订阅】 connectionId: {}, serviceName: {}, deviceIds: {}", connectionId, serviceName, mqttTopicValues);
 
         // 获取批量订阅的主题
-        String[] topics = deviceTopicService.getBatchTopic(serviceName, mqttTopicValues);
+        String[] topics = new String[mqttTopicValues.size()];
+        for(int i=0;i<topics.length;i++){
+            topics[i]=mqttTopicValues.get(i).getTopic();
+        }
         log.info("【获取批量订阅主题】 topics: {}", Arrays.toString(topics));
-
         // 获取MqttCore实例
         MqttCore mqttCore = getMqttCoreByConnectionId(connectionId);
         log.info("【获取MqttCore实例】 mqttCore: {}", mqttCore);
-
         // 执行订阅
         try {
             mqttCore.subscribe(topics);
+            mqttCore.bindTopicToDeviceId(mqttTopicValues);
             log.info("【批量订阅成功】 connectionId: {}, serviceName: {}, topics: {}, serviceType: {}", connectionId, serviceName, Arrays.toString(topics), mqttCore.getServiceType());
         } catch (MqttException e) {
             log.error("【批量订阅失败】 connectionId: {}, serviceName: {}, topics: {}, 异常信息: {}", connectionId, serviceName, Arrays.toString(topics), e.getMessage(), e);
@@ -417,15 +418,11 @@ public class MqttManager {
      *
      * @param connectionId
      * @param serviceName
-     * @param mqttTopicValue
+     * @param mqttTopicValues
      * @throws MqttException
      */
-    public void topicSingleSubscribeDevice(String connectionId, String serviceName, MqttTopicValue mqttTopicValue) throws MqttException {
-        log.info("【开始单个订阅】 connectionId: {}, serviceName: {}, deviceId: {}", connectionId, serviceName, mqttTopicValue);
-
-        // 转换为批量订阅
-        List<MqttTopicValue> mqttTopicValues = new ArrayList<>();
-        mqttTopicValues.add(mqttTopicValue);
+    public void topicSingleSubscribeDevice(String connectionId, String serviceName, List<MqttTopicValue>  mqttTopicValues) throws MqttException {
+        log.info("【开始单个订阅】 connectionId: {}, serviceName: {}, deviceId: {}", connectionId, serviceName, mqttTopicValues);
         // 调用批量订阅方法
         topicBatchSubscribeDevices(connectionId, serviceName, mqttTopicValues);
 

+ 2 - 0
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/network/MqttCore.java

@@ -240,7 +240,9 @@ public class MqttCore {
      * @param mqttTopicValues
      */
     public void bindTopicToDeviceId(List<MqttTopicValue> mqttTopicValues){
+        log.info("xxxxxxxxxxxxxxxxxy映射");
         for(MqttTopicValue mqttTopicValue:mqttTopicValues){
+            log.info("xxxxxxxxxxxxxxxxx {}",mqttTopicValue);
             topicToDevId.put(mqttTopicValue.getTopic(),mqttTopicValue.getDevId());
 
         }

+ 1 - 0
src/main/java/com/yunfeiyun/agmp/iots/device/serviceImp/CqCbdDeviceImpl.java

@@ -212,6 +212,7 @@ public class CqCbdDeviceImpl implements ICbdDevice {
         String devId=mqttManager.getDevIdByTopic(connectionId,topic);
         IotDevice iotDeviceFromDb= iIotDeviceService.selectIotDeviceByDevBid(devId);
         if (iotDeviceFromDb == null) {
+            log.info("【测报灯】iotDeviceFromDb 空 {} ", devId);
             return ;
         }
         boolean isCbd = true;

+ 28 - 0
src/main/java/com/yunfeiyun/agmp/iots/mq/listener/IotmBaseDataChannelAwareMessageListener.java

@@ -5,9 +5,14 @@ import com.rabbitmq.client.Channel;
 import com.yunfeiyun.agmp.common.framework.mq.rabbitmq.model.SynGlobalTenantInfoDto;
 import com.yunfeiyun.agmp.common.utils.JSONUtils;
 import com.yunfeiyun.agmp.iot.common.constant.mq.IotActionEnums;
+import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
 import com.yunfeiyun.agmp.iot.common.model.cmd.CmdGroupModel;
 import com.yunfeiyun.agmp.iot.common.model.cmd.CmdModel;
+import com.yunfeiyun.agmp.iot.common.service.TypeCacheService;
 import com.yunfeiyun.agmp.iots.core.cmd.core.CmdDispatcherService;
+import com.yunfeiyun.agmp.iots.core.manager.MqttManager;
+import com.yunfeiyun.agmp.iots.core.mqtt.DeviceTopicService;
+import com.yunfeiyun.agmp.iots.core.mqtt.modal.MqttTopicValue;
 import com.yunfeiyun.agmp.iots.device.serviceImp.IotCbdImgService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.Message;
@@ -17,6 +22,8 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
 import org.springframework.stereotype.Component;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * 负责处理AGMP 子系统的消息
@@ -31,6 +38,13 @@ public class IotmBaseDataChannelAwareMessageListener implements ChannelAwareMess
     @Autowired
     private IotCbdImgService iotCbdImgService;
 
+    @Autowired
+    private MqttManager mqttManager;
+    @Autowired
+    private TypeCacheService cacheService;
+    @Autowired
+    private DeviceTopicService deviceTopicService;
+
     @Override
     public void onMessage(Message message, Channel channel) throws Exception {
         try {
@@ -54,6 +68,20 @@ public class IotmBaseDataChannelAwareMessageListener implements ChannelAwareMess
                         break;
                     //设备创建
                     case IOT_DEVICE_CREATE:
+                        //临时写到这里了,先激活设备,等到负责写链接重新订阅的人统一调整
+                        IotDevice iotDevice = synGlobalTenantInfoDto.getData().to(IotDevice.class);
+                        String serviceName=cacheService.getServiceNameByDevTypeBid(iotDevice.getDevtypeBid());
+                        String[] topics= deviceTopicService.getTopic(serviceName ,iotDevice.getDevCode());
+                        List<MqttTopicValue>  mqttTopicValues=new ArrayList<>();
+                        //改设备的所有topics
+                        for(String s:topics){
+                            MqttTopicValue mqttTopicValue=new MqttTopicValue();
+                            mqttTopicValue.setDevCode(iotDevice.getDevCode());
+                            mqttTopicValue.setDevId(iotDevice.getDevBid());
+                            mqttTopicValue.setTopic(s);
+                            mqttTopicValues.add(mqttTopicValue);
+                        }
+                        mqttManager.topicSingleSubscribeDevice(iotDevice.getDevconnBid(),serviceName,mqttTopicValues);
                         break;
                     //设备更新
                     case IOT_DEVICE_UPDATE: