|
|
@@ -0,0 +1,98 @@
|
|
|
+package com.yunfeiyun.agmp.iotm.mq.bussiness;
|
|
|
+
|
|
|
+import com.yunfeiyun.agmp.common.framework.mq.rabbitmq.consts.MqAgmpConsts;
|
|
|
+import com.yunfeiyun.agmp.iot.common.constant.mq.IotMqExchange;
|
|
|
+import com.yunfeiyun.agmp.iot.common.constant.mq.IotMqQueue;
|
|
|
+
|
|
|
+import com.yunfeiyun.agmp.iotm.mq.listener.AgmpChannelAwareMessageListener;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.amqp.core.*;
|
|
|
+import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
|
|
+import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.beans.factory.annotation.Qualifier;
|
|
|
+import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
|
|
|
+import org.springframework.context.annotation.Bean;
|
|
|
+import org.springframework.context.annotation.Configuration;
|
|
|
+
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 智慧农业SaaS系统的MQ配置类
|
|
|
+ */
|
|
|
+@Configuration
|
|
|
+@Slf4j
|
|
|
+@ConditionalOnBean(name = "agmpMqConfig")
|
|
|
+public class AgmpMqBusConfig {
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 注入AMQP管理员,用于声明队列、交换机和绑定
|
|
|
+ */
|
|
|
+ @Autowired
|
|
|
+ @Qualifier("agmpAmqpAdmin")
|
|
|
+ private AmqpAdmin agmpAmqpAdmin;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 初始化方法,在Spring容器初始化时声明队列、交换机和绑定
|
|
|
+ */
|
|
|
+ @PostConstruct
|
|
|
+ public void init() {
|
|
|
+ log.info("加载Agmp Mq");
|
|
|
+ agmpAmqpAdmin.declareQueue(agmpQueue());
|
|
|
+ agmpAmqpAdmin.declareExchange(agmpExchange());
|
|
|
+ agmpAmqpAdmin.declareBinding(agmpBinding());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 定义一个持久化的队列
|
|
|
+ *
|
|
|
+ * @return 队列对象
|
|
|
+ */
|
|
|
+ @Bean("agmpQueue")
|
|
|
+ public Queue agmpQueue() {
|
|
|
+ return QueueBuilder.durable(IotMqQueue.IOTM_TO_IOTS_CMD_QUEUE).build();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 定义一个主题交换机
|
|
|
+ *
|
|
|
+ * @return 交换机对象
|
|
|
+ */
|
|
|
+ @Bean("agmpExchange")
|
|
|
+ public FanoutExchange agmpExchange() {
|
|
|
+ return new FanoutExchange(IotMqExchange.IOTM_TO_IOTS_EXCHANGE);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 定义队列与交换机的绑定关系
|
|
|
+ *
|
|
|
+ * @return 绑定对象
|
|
|
+ */
|
|
|
+ @Bean("agmpBinding")
|
|
|
+ public Binding agmpBinding() {
|
|
|
+ return BindingBuilder.bind(agmpQueue()).to(agmpExchange());
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 配置一个具体的消息监听器容器,用于监听特定的队列并处理消息
|
|
|
+ *
|
|
|
+ * @param connectionFactory 连接工厂
|
|
|
+ * @param listener 消息监听器
|
|
|
+ * @return 消息监听器容器
|
|
|
+ */
|
|
|
+ @Bean("agmpSimpleMessageListenerContainer")
|
|
|
+ public SimpleMessageListenerContainer agmpSimpleMessageListenerContainer(
|
|
|
+ @Qualifier("agmpConnectionFactory") CachingConnectionFactory connectionFactory,
|
|
|
+ AgmpChannelAwareMessageListener listener) {
|
|
|
+ SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
|
|
|
+ container.setQueues(agmpQueue());
|
|
|
+ container.setMessageListener(listener);
|
|
|
+ // 设置确认模式为自动确认
|
|
|
+ container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
|
|
|
+ container.setAmqpAdmin(agmpAmqpAdmin);
|
|
|
+
|
|
|
+ return container;
|
|
|
+ }
|
|
|
+
|
|
|
+}
|