Parcourir la source

拆分消息队列,将指令消息与基础数据通知拆分

liuyaowen il y a 1 an
Parent
commit
9d4dd70a57

+ 11 - 10
src/main/java/com/yunfeiyun/agmp/iots/mq/bussiness/AgmpMqBusConfig.java

@@ -41,9 +41,10 @@ public class AgmpMqBusConfig {
         agmpAmqpAdmin.declareExchange(iotmToIotsCmdExchange());
         agmpAmqpAdmin.declareBinding(iotmToIotsCmdBinding());
         log.info("加载Base Data Mq");
-        agmpAmqpAdmin.declareQueue(iotmToIotsCmdQueue());
-        agmpAmqpAdmin.declareExchange(iotmToIotsCmdExchange());
-        agmpAmqpAdmin.declareBinding(iotmToIotsCmdBinding());
+
+        agmpAmqpAdmin.declareQueue(iotmToIotsBaseDataQueue());
+        agmpAmqpAdmin.declareExchange(iotmToIotsBaseDataExchange());
+        agmpAmqpAdmin.declareBinding(iotmToIotsBaseDataBinding());
 
     }
 
@@ -63,8 +64,8 @@ public class AgmpMqBusConfig {
      * @return 交换机对象
      */
     @Bean("iotmToIotsCmdExchange")
-    public FanoutExchange iotmToIotsCmdExchange() {
-        return new FanoutExchange(IotMqExchange.IOTM_TO_IOTS_EXCHANGE);
+    public DirectExchange iotmToIotsCmdExchange() {
+        return new DirectExchange(IotMqExchange.IOTM_TO_IOTS_EXCHANGE);
     }
 
     /**
@@ -74,7 +75,7 @@ public class AgmpMqBusConfig {
      */
     @Bean("iotmToIotsCmdBinding")
     public Binding iotmToIotsCmdBinding() {
-        return BindingBuilder.bind(iotmToIotsCmdQueue()).to(iotmToIotsCmdExchange());
+        return BindingBuilder.bind(iotmToIotsCmdQueue()).to(iotmToIotsCmdExchange()).with(IotMqQueue.IOTM_TO_IOTS_CMD_QUEUE);
     }
 
     /**
@@ -84,7 +85,7 @@ public class AgmpMqBusConfig {
      */
     @Bean("iotmToIotsBaseDataQueue")
     public Queue iotmToIotsBaseDataQueue() {
-        return QueueBuilder.durable(IotMqQueue.IOTM_TO_IOTS_CMD_QUEUE).build();
+        return QueueBuilder.durable(IotMqQueue.IOTM_TO_IOTS_BASE_DATA_QUEUE).build();
     }
 
     /**
@@ -93,8 +94,8 @@ public class AgmpMqBusConfig {
      * @return 交换机对象
      */
     @Bean("iotmToIotsBaseDataExchange")
-    public FanoutExchange iotmToIotsBaseDataExchange() {
-        return new FanoutExchange(IotMqExchange.IOTM_TO_IOTS_EXCHANGE);
+    public DirectExchange iotmToIotsBaseDataExchange() {
+        return new DirectExchange(IotMqExchange.IOTM_TO_IOTS_EXCHANGE);
     }
 
     /**
@@ -104,7 +105,7 @@ public class AgmpMqBusConfig {
      */
     @Bean("iotmToIotsBaseDataBinding")
     public Binding iotmToIotsBaseDataBinding() {
-        return BindingBuilder.bind(iotmToIotsBaseDataQueue()).to(iotmToIotsBaseDataExchange());
+        return BindingBuilder.bind(iotmToIotsBaseDataQueue()).to(iotmToIotsBaseDataExchange()).with(IotMqQueue.IOTM_TO_IOTS_BASE_DATA_QUEUE);
     }
 
 

+ 1 - 2
src/main/java/com/yunfeiyun/agmp/iots/mq/listener/IotmBaseDataChannelAwareMessageListener.java

@@ -38,7 +38,7 @@ public class IotmBaseDataChannelAwareMessageListener implements ChannelAwareMess
             byte[] body = message.getBody();
             String content = new String(body);
             log.info("【SAAS:】收到AGMP消息:{}", content);
-            SynGlobalTenantInfoDto synGlobalTenantInfoDto = JSONUtils.toObject(content, SynGlobalTenantInfoDto.class);
+            SynGlobalTenantInfoDto synGlobalTenantInfoDto = JSONObject.parseObject(content, SynGlobalTenantInfoDto.class);
             //根据不同的action进行相应业务处理
             String action = synGlobalTenantInfoDto.getAction();
             IotActionEnums iotActionEnums = IotActionEnums.getAction(action);
@@ -47,7 +47,6 @@ public class IotmBaseDataChannelAwareMessageListener implements ChannelAwareMess
                     //控制指令
                     case CMD_TASK:
                         CmdGroupModel cmdGroupModel = synGlobalTenantInfoDto.getData().to(CmdGroupModel.class);
-                      //  CmdGroupModel cmdGroupModel = JSONUtils.toObject(JSONUtils.toJSONString(synGlobalTenantInfoDto.getData()), CmdGroupModel.class);
                         cmdDispatcherService.handleCmd(cmdGroupModel);
                         break;
                     //控制指令结果

+ 3 - 2
src/main/java/com/yunfeiyun/agmp/iots/mq/listener/IotmCmdChannelAwareMessageListener.java

@@ -35,13 +35,14 @@ public class IotmCmdChannelAwareMessageListener implements ChannelAwareMessageLi
             // 处理消息
             byte[] body = message.getBody();
             String content = new String(body);
-            log.info("【SAAS:】收到AGMP消息:{}", content);
+            log.info("【IOTS】收到IOTM的指令消息{}", content);
             SynGlobalTenantInfoDto synGlobalTenantInfoDto = JSONUtils.toObject(content, SynGlobalTenantInfoDto.class);
             //根据不同的action进行相应业务处理
             String action = synGlobalTenantInfoDto.getAction();
             IotActionEnums iotActionEnums = IotActionEnums.getAction(action);
             if (iotActionEnums != null) {
-
+                CmdGroupModel cmdGroupModel = synGlobalTenantInfoDto.getData().to(CmdGroupModel.class);
+                cmdDispatcherService.handleCmd(cmdGroupModel);
             } else {
                 log.error("【SAAS:】收到Tos:所有租户处理的消息:action 为空,忽略消息");
             }