liuyaowen 1 年間 前
コミット
9e3f121661

+ 0 - 51
src/main/java/com/yunfeiyun/agmp/iots/mq/IotsMqService.java

@@ -1,51 +0,0 @@
-package com.yunfeiyun.agmp.iots.mq;
-
-import com.alibaba.fastjson2.JSON;
-import com.yunfeiyun.agmp.common.framework.message.MqMsg;
-import com.yunfeiyun.agmp.common.utils.JSONUtils;
-import com.yunfeiyun.agmp.iot.common.constant.mq.IotMqConstant;
-import com.yunfeiyun.agmp.iot.common.model.task.TaskResult;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.amqp.AmqpException;
-import org.springframework.amqp.core.*;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.PostConstruct;
-
-@Component
-@Slf4j
-public class IotsMqService {
-
-    @Autowired
-    AmqpTemplate amqpTemplate;
-
-    @Value("${application.name}")
-    private String applicationName;
-
-    @PostConstruct
-    public void init() {
-
-    }
-
-    public void sendTaskResultMsg(TaskResult taskResult) {
-        MqMsg mqMsg = new MqMsg("result", JSONUtils.toJSONString(taskResult), applicationName);
-        Message message = MessageBuilder.withBody(JSON.toJSONString(mqMsg).getBytes()).build();
-        try {
-            this.amqpTemplate.convertAndSend("amq.topic", IotMqConstant.TOPIC_TASK_STATUS, message);
-            log.debug("iots --》 iotm    mq消息 {}" + this, mqMsg);
-        } catch (AmqpException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void sendMsg(String msgType, String queue, Object data) {
-        //log.debug("发送mq消息1 {}" + this, data);
-        MqMsg mqMsg = new MqMsg(msgType, JSONUtils.toJSONString(data), applicationName);
-        Message message = MessageBuilder.withBody(JSONUtils.toJSONString((mqMsg)).getBytes()).build();
-        this.amqpTemplate.convertAndSend("amq.topic", queue, message);
-        log.debug("发送mq消息2 {}" + this, mqMsg);
-    }
-
-}

+ 97 - 0
src/main/java/com/yunfeiyun/agmp/iots/mq/bussiness/AgmpMqBusConfig.java

@@ -0,0 +1,97 @@
+package com.yunfeiyun.agmp.iots.mq.bussiness;
+
+import com.yunfeiyun.agmp.iot.common.constant.mq.IotMqExchange;
+import com.yunfeiyun.agmp.iot.common.constant.mq.IotMqQueue;
+
+import com.yunfeiyun.agmp.iots.mq.listener.IotmChannelAwareMessageListener;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.*;
+import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
+import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import javax.annotation.PostConstruct;
+
+/**
+ * 智慧农业SaaS系统的MQ配置类
+ */
+@Configuration
+@Slf4j
+@ConditionalOnBean(name = "agmpMqConfig")
+public class AgmpMqBusConfig {
+
+    /**
+     * 注入AMQP管理员,用于声明队列、交换机和绑定
+     */
+    @Autowired
+    @Qualifier("agmpAmqpAdmin")
+    private AmqpAdmin agmpAmqpAdmin;
+
+    /**
+     * 初始化方法,在Spring容器初始化时声明队列、交换机和绑定
+     */
+    @PostConstruct
+    public void init() {
+        log.info("加载Agmp Mq");
+        agmpAmqpAdmin.declareQueue(agmpQueue());
+        agmpAmqpAdmin.declareExchange(agmpExchange());
+        agmpAmqpAdmin.declareBinding(agmpBinding());
+    }
+
+    /**
+     * 定义一个持久化的队列
+     *
+     * @return 队列对象
+     */
+    @Bean("agmpQueue")
+    public Queue agmpQueue() {
+        return QueueBuilder.durable(IotMqQueue.IOTM_TO_IOTS_CMD_QUEUE).build();
+    }
+
+    /**
+     * 定义一个主题交换机
+     *
+     * @return 交换机对象
+     */
+    @Bean("agmpExchange")
+    public FanoutExchange agmpExchange() {
+        return new FanoutExchange(IotMqExchange.IOTM_TO_IOTS_EXCHANGE);
+    }
+
+    /**
+     * 定义队列与交换机的绑定关系
+     *
+     * @return 绑定对象
+     */
+    @Bean("agmpBinding")
+    public Binding agmpBinding() {
+        return BindingBuilder.bind(agmpQueue()).to(agmpExchange());
+    }
+
+
+    /**
+     * 配置一个具体的消息监听器容器,用于监听特定的队列并处理消息
+     *
+     * @param connectionFactory 连接工厂
+     * @param listener          消息监听器
+     * @return 消息监听器容器
+     */
+    @Bean("agmpSimpleMessageListenerContainer")
+    public SimpleMessageListenerContainer agmpSimpleMessageListenerContainer(
+            @Qualifier("agmpConnectionFactory") CachingConnectionFactory connectionFactory,
+            IotmChannelAwareMessageListener listener) {
+        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
+        container.setQueues(agmpQueue());
+        container.setMessageListener(listener);
+        // 设置确认模式为自动确认
+        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
+        container.setAmqpAdmin(agmpAmqpAdmin);
+
+        return container;
+    }
+
+}

+ 0 - 79
src/main/java/com/yunfeiyun/agmp/iots/mq/bussiness/IotMqBusConfig.java

@@ -1,79 +0,0 @@
-package com.yunfeiyun.agmp.iots.mq.bussiness;
-
-
-import com.yunfeiyun.agmp.iot.common.constant.mq.IotMqExchange;
-import com.yunfeiyun.agmp.iot.common.constant.mq.IotMqQueue;
-import com.yunfeiyun.agmp.iots.mq.listener.IotsChannelGlobalAwareMessageListener;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.amqp.core.*;
-import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
-import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-/**/
-import javax.annotation.PostConstruct;
-
-/**
- * TOS 的 MQ配置
- * 1. 链接信息
- * 2. queue 定义
- * 3. exchange 定义
- * 4. routing key 定义
- * 5. 链接工厂定义
- */
-@Slf4j
-@Configuration
-@ConditionalOnBean(name = "iotMqBusConfig")
-public class IotMqBusConfig {
-
-    @Autowired
-    @Qualifier("iotAmqpAdmin")
-    private AmqpAdmin iotAmqpAdmin;
-
-
-
-    @PostConstruct
-    public void init() {
-        log.info("加载Tos 全局租户通道 Mq");
-        iotAmqpAdmin.declareExchange(iotmToIotsGlobalExchange());
-        iotAmqpAdmin.declareQueue(topicTaskQueue());
-        iotAmqpAdmin.declareBinding(topicTaskBinding());
-    }
-
-    public FanoutExchange iotmToIotsGlobalExchange() {
-        return new FanoutExchange(IotMqExchange.IOTM_TO_IOTS_EXCHANGE);
-    }
-
-    public Queue topicTaskQueue() {
-        return QueueBuilder.durable(IotMqQueue.IOTM_TO_IOTS_CMD_QUEUE).build();
-    }
-    public Binding topicTaskBinding() {
-        return BindingBuilder.bind(topicTaskQueue()).to(iotmToIotsGlobalExchange());
-    }
-
-    // ########################################## 接受tos消息 全部租户通道结束##########################################
-
-
-    /**
-     * 配置接受所有租户都要处理的监听器,用户消息回执
-     *
-     * @param connectionFactory
-     * @param lobleAwareMessageListener
-     * @return
-     */
-    @Bean("iotGlobleSimpleMessageListenerContainer")
-    public SimpleMessageListenerContainer iotsGlobleSimpleMessageListenerContainer(
-            @Qualifier("iotConnectionFactory") CachingConnectionFactory connectionFactory,
-            IotsChannelGlobalAwareMessageListener lobleAwareMessageListener) {
-        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
-        container.setQueues(topicTaskQueue());
-        container.setMessageListener(lobleAwareMessageListener);
-        container.setAmqpAdmin(iotAmqpAdmin);
-        // 设置确认模式为手动确认
-        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
-        return container;
-    }
-}

+ 34 - 0
src/main/java/com/yunfeiyun/agmp/iots/mq/listener/IotmChannelAwareMessageListener.java

@@ -0,0 +1,34 @@
+package com.yunfeiyun.agmp.iots.mq.listener;
+
+import com.rabbitmq.client.Channel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+
+/**
+ * 负责处理AGMP 子系统的消息
+ */
+@Component
+@Slf4j
+@ConditionalOnBean(name = "agmpMqConfig")
+public class IotmChannelAwareMessageListener implements ChannelAwareMessageListener {
+
+    @Override
+    public void onMessage(Message message, Channel channel) throws Exception {
+        try {
+            // 处理消息
+            byte[] body = message.getBody();
+            String content = new String(body);
+            log.info("【SAAS:】收到AGMP消息:{}", content);
+            // 手动确认消息
+            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
+        } catch (IOException e) {
+            // 处理异常,例如重新入队或拒绝消息
+            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
+        }
+    }
+}

+ 0 - 55
src/main/java/com/yunfeiyun/agmp/iots/mq/listener/IotsChannelGlobalAwareMessageListener.java

@@ -1,55 +0,0 @@
-package com.yunfeiyun.agmp.iots.mq.listener;
-
-import com.rabbitmq.client.Channel;
-import com.yunfeiyun.agmp.common.framework.mq.rabbitmq.enums.TosActionEnums;
-import com.yunfeiyun.agmp.common.framework.mq.rabbitmq.model.SynGlobalTenantInfoDto;
-import com.yunfeiyun.agmp.common.utils.JSONUtils;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.amqp.core.Message;
-import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
-import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.Resource;
-import java.io.IOException;
-
-/**
- * 专门处理tos[所有租户]发的消息
- * 目前暂时没有这个情况,但后面遇到全局的字典就会用到
- *
- * @author zhangnn
- * @data 2024年10月9日
- */
-@Component
-@Slf4j
-@ConditionalOnBean(name = "iotMqConfig")
-public class IotsChannelGlobalAwareMessageListener implements ChannelAwareMessageListener {
-
-
-    @Override
-    public void onMessage(Message message, Channel channel) throws Exception {
-        try {
-            // 处理消息
-            byte[] body = message.getBody();
-            String content = new String(body);
-            log.info("【SAAS:】收到Tos:所有租户处理的消息:{}", content);
-            SynGlobalTenantInfoDto synGlobalTenantInfoDto = JSONUtils.toObject(content, SynGlobalTenantInfoDto.class);
-            //根据不同的action进行相应业务处理
-            String action = synGlobalTenantInfoDto.getAction();
-            TosActionEnums tosActionEnums = TosActionEnums.getAction(action);
-            if (tosActionEnums != null) {
-                switch (tosActionEnums) {
-
-                }
-            } else {
-                log.error("【SAAS:】收到Tos:所有租户处理的消息:action 为空,忽略消息");
-            }
-            // 手动确认消息
-            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
-        } catch (IOException e) {
-            // 处理异常,例如重新入队或拒绝消息
-            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
-        }
-    }
-}

+ 2 - 2
src/main/resources/application-dev.yml

@@ -131,12 +131,12 @@ spring:
       # 热部署开关
       enabled: true
   rabbitmq:
-    iot:
+    agmp:
       host: 192.168.1.228
       port: 5672
       username: admin
       password: admin
-      virtual-host: /agmp-sass-tos
+      virtual-host: /local_IP_05
       connection-timeout: 15000
       publisher-returns: true
       enabled: true