Quellcode durchsuchen

指令集 RabbitMq 初始化

liuyaowen vor 1 Jahr
Ursprung
Commit
790af9ad52

+ 1 - 1
src/main/java/com/yunfeiyun/agmp/iots/core/modbus/ModbusSlaveServer.java

@@ -45,7 +45,7 @@ public class ModbusSlaveServer implements CommandLineRunner {
             @Override
             public void run() {
 
-                createSalve();
+                // createSalve();
             }
         }).start();
     }

+ 71 - 18
src/main/java/com/yunfeiyun/agmp/iots/mq/bussiness/AgmpMqBusConfig.java

@@ -2,8 +2,8 @@ package com.yunfeiyun.agmp.iots.mq.bussiness;
 
 import com.yunfeiyun.agmp.iot.common.constant.mq.IotMqExchange;
 import com.yunfeiyun.agmp.iot.common.constant.mq.IotMqQueue;
-
-import com.yunfeiyun.agmp.iots.mq.listener.IotmChannelAwareMessageListener;
+import com.yunfeiyun.agmp.iots.mq.listener.IotmBaseDataChannelAwareMessageListener;
+import com.yunfeiyun.agmp.iots.mq.listener.IotmCmdChannelAwareMessageListener;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.*;
 import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
@@ -36,10 +36,15 @@ public class AgmpMqBusConfig {
      */
     @PostConstruct
     public void init() {
-        log.info("加载Agmp Mq");
-        agmpAmqpAdmin.declareQueue(agmpQueue());
-        agmpAmqpAdmin.declareExchange(agmpExchange());
-        agmpAmqpAdmin.declareBinding(agmpBinding());
+        log.info("加载Cmd Mq");
+        agmpAmqpAdmin.declareQueue(iotmToIotsCmdQueue());
+        agmpAmqpAdmin.declareExchange(iotmToIotsCmdExchange());
+        agmpAmqpAdmin.declareBinding(iotmToIotsCmdBinding());
+        log.info("加载Base Data Mq");
+        agmpAmqpAdmin.declareQueue(iotmToIotsCmdQueue());
+        agmpAmqpAdmin.declareExchange(iotmToIotsCmdExchange());
+        agmpAmqpAdmin.declareBinding(iotmToIotsCmdBinding());
+
     }
 
     /**
@@ -47,8 +52,8 @@ public class AgmpMqBusConfig {
      *
      * @return 队列对象
      */
-    @Bean("agmpQueue")
-    public Queue agmpQueue() {
+    @Bean("iotmToIotsCmdQueue")
+    public Queue iotmToIotsCmdQueue() {
         return QueueBuilder.durable(IotMqQueue.IOTM_TO_IOTS_CMD_QUEUE).build();
     }
 
@@ -57,8 +62,8 @@ public class AgmpMqBusConfig {
      *
      * @return 交换机对象
      */
-    @Bean("agmpExchange")
-    public FanoutExchange agmpExchange() {
+    @Bean("iotmToIotsCmdExchange")
+    public FanoutExchange iotmToIotsCmdExchange() {
         return new FanoutExchange(IotMqExchange.IOTM_TO_IOTS_EXCHANGE);
     }
 
@@ -67,9 +72,39 @@ public class AgmpMqBusConfig {
      *
      * @return 绑定对象
      */
-    @Bean("agmpBinding")
-    public Binding agmpBinding() {
-        return BindingBuilder.bind(agmpQueue()).to(agmpExchange());
+    @Bean("iotmToIotsCmdBinding")
+    public Binding iotmToIotsCmdBinding() {
+        return BindingBuilder.bind(iotmToIotsCmdQueue()).to(iotmToIotsCmdExchange());
+    }
+
+    /**
+     * 定义一个持久化的队列
+     *
+     * @return 队列对象
+     */
+    @Bean("iotmToIotsBaseDataQueue")
+    public Queue iotmToIotsBaseDataQueue() {
+        return QueueBuilder.durable(IotMqQueue.IOTM_TO_IOTS_CMD_QUEUE).build();
+    }
+
+    /**
+     * 定义一个主题交换机
+     *
+     * @return 交换机对象
+     */
+    @Bean("iotmToIotsBaseDataExchange")
+    public FanoutExchange iotmToIotsBaseDataExchange() {
+        return new FanoutExchange(IotMqExchange.IOTM_TO_IOTS_EXCHANGE);
+    }
+
+    /**
+     * 定义队列与交换机的绑定关系
+     *
+     * @return 绑定对象
+     */
+    @Bean("iotmToIotsBaseDataBinding")
+    public Binding iotmToIotsBaseDataBinding() {
+        return BindingBuilder.bind(iotmToIotsBaseDataQueue()).to(iotmToIotsBaseDataExchange());
     }
 
 
@@ -80,17 +115,35 @@ public class AgmpMqBusConfig {
      * @param listener          消息监听器
      * @return 消息监听器容器
      */
-    @Bean("agmpSimpleMessageListenerContainer")
-    public SimpleMessageListenerContainer agmpSimpleMessageListenerContainer(
+    @Bean("iotmBaseDataMessageListenerContainer")
+    public SimpleMessageListenerContainer iotmBaseDataMessageListenerContainer(
             @Qualifier("agmpConnectionFactory") CachingConnectionFactory connectionFactory,
-            IotmChannelAwareMessageListener listener) {
+            IotmBaseDataChannelAwareMessageListener listener) {
         SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
-        container.setQueues(agmpQueue());
+        container.setQueues(iotmToIotsBaseDataQueue());
+        container.setMessageListener(listener);
+        // 设置确认模式为自动确认
+        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
+        container.setAmqpAdmin(agmpAmqpAdmin);
+        return container;
+    }
+    /**
+     * 配置一个具体的消息监听器容器,用于监听特定的队列并处理消息
+     *
+     * @param connectionFactory 连接工厂
+     * @param listener          消息监听器
+     * @return 消息监听器容器
+     */
+    @Bean("iotmCmdMessageListenerContainer")
+    public SimpleMessageListenerContainer iotmCmdMessageListenerContainer(
+            @Qualifier("agmpConnectionFactory") CachingConnectionFactory connectionFactory,
+            IotmCmdChannelAwareMessageListener listener) {
+        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
+        container.setQueues(iotmToIotsCmdQueue());
         container.setMessageListener(listener);
         // 设置确认模式为自动确认
         container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
         container.setAmqpAdmin(agmpAmqpAdmin);
-
         return container;
     }
 

+ 4 - 4
src/main/java/com/yunfeiyun/agmp/iots/mq/listener/IotmChannelAwareMessageListener.java

@@ -2,12 +2,11 @@ package com.yunfeiyun.agmp.iots.mq.listener;
 
 import com.alibaba.fastjson2.JSONObject;
 import com.rabbitmq.client.Channel;
-import com.yunfeiyun.agmp.common.framework.mq.rabbitmq.enums.TosActionEnums;
 import com.yunfeiyun.agmp.common.framework.mq.rabbitmq.model.SynGlobalTenantInfoDto;
 import com.yunfeiyun.agmp.common.utils.JSONUtils;
 import com.yunfeiyun.agmp.iot.common.constant.mq.IotActionEnums;
-import com.yunfeiyun.agmp.iot.common.domain.reqvo.IotCbdrecogAgainReqVo;
 import com.yunfeiyun.agmp.iot.common.model.cmd.CmdGroupModel;
+import com.yunfeiyun.agmp.iot.common.model.cmd.CmdModel;
 import com.yunfeiyun.agmp.iots.core.cmd.core.CmdDispatcherService;
 import com.yunfeiyun.agmp.iots.device.serviceImp.IotCbdImgService;
 import lombok.extern.slf4j.Slf4j;
@@ -25,7 +24,7 @@ import java.io.IOException;
 @Component
 @Slf4j
 @ConditionalOnBean(name = "agmpMqConfig")
-public class IotmChannelAwareMessageListener implements ChannelAwareMessageListener {
+public class IotmBaseDataChannelAwareMessageListener implements ChannelAwareMessageListener {
     @Autowired
     private CmdDispatcherService cmdDispatcherService;
 
@@ -47,7 +46,8 @@ public class IotmChannelAwareMessageListener implements ChannelAwareMessageListe
                 switch (iotActionEnums) {
                     //控制指令
                     case CMD_TASK:
-                        CmdGroupModel cmdGroupModel = JSONUtils.toObject(synGlobalTenantInfoDto.getData().toString(), CmdGroupModel.class);
+                        CmdGroupModel cmdGroupModel = synGlobalTenantInfoDto.getData().to(CmdGroupModel.class);
+                      //  CmdGroupModel cmdGroupModel = JSONUtils.toObject(JSONUtils.toJSONString(synGlobalTenantInfoDto.getData()), CmdGroupModel.class);
                         cmdDispatcherService.handleCmd(cmdGroupModel);
                         break;
                     //控制指令结果

+ 55 - 0
src/main/java/com/yunfeiyun/agmp/iots/mq/listener/IotmCmdChannelAwareMessageListener.java

@@ -0,0 +1,55 @@
+package com.yunfeiyun.agmp.iots.mq.listener;
+
+import com.rabbitmq.client.Channel;
+import com.yunfeiyun.agmp.common.framework.mq.rabbitmq.model.SynGlobalTenantInfoDto;
+import com.yunfeiyun.agmp.common.utils.JSONUtils;
+import com.yunfeiyun.agmp.iot.common.constant.mq.IotActionEnums;
+import com.yunfeiyun.agmp.iot.common.model.cmd.CmdGroupModel;
+import com.yunfeiyun.agmp.iots.core.cmd.core.CmdDispatcherService;
+import com.yunfeiyun.agmp.iots.device.serviceImp.IotCbdImgService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+
+/**
+ * 负责处理AGMP 子系统的消息
+ */
+@Component
+@Slf4j
+@ConditionalOnBean(name = "agmpMqConfig")
+public class IotmCmdChannelAwareMessageListener implements ChannelAwareMessageListener {
+    @Autowired
+    private CmdDispatcherService cmdDispatcherService;
+
+    @Autowired
+    private IotCbdImgService iotCbdImgService;
+
+    @Override
+    public void onMessage(Message message, Channel channel) throws Exception {
+        try {
+            // 处理消息
+            byte[] body = message.getBody();
+            String content = new String(body);
+            log.info("【SAAS:】收到AGMP消息:{}", content);
+            SynGlobalTenantInfoDto synGlobalTenantInfoDto = JSONUtils.toObject(content, SynGlobalTenantInfoDto.class);
+            //根据不同的action进行相应业务处理
+            String action = synGlobalTenantInfoDto.getAction();
+            IotActionEnums iotActionEnums = IotActionEnums.getAction(action);
+            if (iotActionEnums != null) {
+
+            } else {
+                log.error("【SAAS:】收到Tos:所有租户处理的消息:action 为空,忽略消息");
+            }
+            // 手动确认消息
+            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
+        } catch (IOException e) {
+            // 处理异常,例如重新入队或拒绝消息
+            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
+        }
+    }
+}