فهرست منبع

新增Iot消息队列

liuyaowen 1 سال پیش
والد
کامیت
a67903cc98

+ 3 - 2
src/main/java/com/yunfeiyun/agmp/iots/device/serviceImp/IotCbdImgService.java

@@ -6,7 +6,7 @@ import com.baomidou.mybatisplus.annotation.TableName;
 import com.yunfeiyun.agmp.common.constant.ResConstants;
 import com.yunfeiyun.agmp.common.constant.ResConstants;
 import com.yunfeiyun.agmp.common.utils.DateUtils;
 import com.yunfeiyun.agmp.common.utils.DateUtils;
 import com.yunfeiyun.agmp.common.utils.StringUtils;
 import com.yunfeiyun.agmp.common.utils.StringUtils;
-import com.yunfeiyun.agmp.iot.common.constant.mq.IotMqConstant;
+
 import com.yunfeiyun.agmp.iot.common.domain.*;
 import com.yunfeiyun.agmp.iot.common.domain.*;
 import com.yunfeiyun.agmp.iot.common.enums.EnumCbdRecogType;
 import com.yunfeiyun.agmp.iot.common.enums.EnumCbdRecogType;
 import com.yunfeiyun.agmp.iot.common.model.IotWarncheck;
 import com.yunfeiyun.agmp.iot.common.model.IotWarncheck;
@@ -159,7 +159,8 @@ public class IotCbdImgService {
             //要求预警检查 V3
             //要求预警检查 V3
             IotWarncheck warncheck = new IotWarncheck();
             IotWarncheck warncheck = new IotWarncheck();
             warncheck.setDevBid(iotDevice.getDevBid());
             warncheck.setDevBid(iotDevice.getDevBid());
-            iotsMqService.sendMsg(IotMqConstant.TOPIC_WARNCHECK, IotMqConstant.TOPIC_WARNCHECK, warncheck);
+            // TODO
+            // iotsMqService.sendMsg(IotMqConstant.TOPIC_WARNCHECK, IotMqConstant.TOPIC_WARNCHECK, warncheck);
         }else{
         }else{
             //预警
             //预警
             warnService.checkCbdData("0","",iotDevice.getDevBid());
             warnService.checkCbdData("0","",iotDevice.getDevBid());

+ 0 - 72
src/main/java/com/yunfeiyun/agmp/iots/mq/IotSpdrecogMqService.java

@@ -1,72 +0,0 @@
-package com.yunfeiyun.agmp.iots.mq;
-
-import com.alibaba.fastjson2.JSONObject;
-import com.yunfeiyun.agmp.common.constant.ResConstants;
-import com.yunfeiyun.agmp.common.framework.manager.ResManager;
-import com.yunfeiyun.agmp.common.framework.message.MqMsg;
-import com.yunfeiyun.agmp.common.utils.DateUtils;
-import com.yunfeiyun.agmp.iot.common.constant.mq.IotMqConstant;
-import com.yunfeiyun.agmp.iot.common.domain.IotSpdrecog;
-import com.yunfeiyun.agmp.iot.common.service.MongoService;
-import com.yunfeiyun.agmp.iot.common.util.tmn.CustomerIdUtil;
-import com.yunfeiyun.agmp.iots.core.http.HikCloudHttpClient;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.amqp.rabbit.annotation.Exchange;
-import org.springframework.amqp.rabbit.annotation.Queue;
-import org.springframework.amqp.rabbit.annotation.QueueBinding;
-import org.springframework.amqp.rabbit.annotation.RabbitListener;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.Resource;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.LinkedHashMap;
-import java.util.List;
-
-@Component
-@Slf4j
-public class IotSpdrecogMqService {
-    @Resource
-    private ResManager resManager;
-    @Resource
-    private HikCloudHttpClient hikCloudHttpClient;
-    @Resource
-    private MongoService mongoService;
-
-
-    @RabbitListener(bindings = @QueueBinding(value = @Queue(IotMqConstant.TOPIC_HIK_SPD_RECOG_QUEUE)
-            , exchange = @Exchange(value = "amq.topic", durable = "true", type = "topic")
-            , key = IotMqConstant.TOPIC_HIK_SPD_RECOG_QUEUE))
-    public void recvDeviceDelete(String s) {
-        log.info("【IOTS】收到海康多光谱相机触发定时接收光谱数据 messageBody={}", s);
-        try {
-            MqMsg mqMsg = JSONObject.parseObject(s, MqMsg.class);
-            log.info("海康多光谱相机触发定时接收光谱数据任务,触发时间:{}", DateUtils.dateTimeNow());
-            List<LinkedHashMap<String,Object>> data = JSONObject.parseObject(String.valueOf(mqMsg.getCmd()), ArrayList.class);
-            if(null != data&&!data.isEmpty()){
-                for(LinkedHashMap<String,Object> map : data){
-                    String content =String.valueOf(map.get("content"));
-                    JSONObject contentObject = JSONObject.parseObject(content);
-                    IotSpdrecog iotSpdrecog = new IotSpdrecog();
-                    iotSpdrecog.setSpdrecogBid(iotSpdrecog.getUUId());
-                    iotSpdrecog.setSpdrecogCreatedDate(DateUtils.dateTimeNow());
-                    iotSpdrecog.setSpdrecogIndextype(contentObject.getString("indexType"));
-                    iotSpdrecog.setSpdrecogEventtime(DateUtils.parseDateToStr(DateUtils.YYYY_MM_DD_HH_MM_SS,new Date(Long.parseLong(contentObject.getString("eventTime")))));
-                    iotSpdrecog.setSpdrecogIndexvalue(contentObject.getString("indexValue"));
-                    iotSpdrecog.setCId(CustomerIdUtil.getCustomerId());
-                    iotSpdrecog.setDevCode(contentObject.getString("deviceSerial"));
-                    iotSpdrecog.setSpdrecogScenename(contentObject.getString("sceneName"));
-                    String ossUrl = resManager.saveMonitorHttpsImg(contentObject.getString("picUrl"), ResConstants.BizType.IOT_SIOT_SPECTRAL_DETECT_PIC,"","jpg");
-                    iotSpdrecog.setSpdrecogPicurl(ossUrl);
-                    mongoService.saveOne(iotSpdrecog);
-                }
-            }
-
-        } catch (Exception e) {
-            log.error("【IOTS】收到海康多光谱相机触发定时接收光谱数据", e);
-        }
-    }
-
-
-
-}

+ 0 - 38
src/main/java/com/yunfeiyun/agmp/iots/mq/IotsCbdRecogReceiver.java

@@ -1,38 +0,0 @@
-package com.yunfeiyun.agmp.iots.mq;
-
-import com.alibaba.fastjson2.JSONObject;
-import com.yunfeiyun.agmp.common.framework.message.MqMsg;
-import com.yunfeiyun.agmp.iot.common.constant.mq.IotMqConstant;
-import com.yunfeiyun.agmp.iots.device.serviceImp.IotCbdImgService;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.amqp.rabbit.annotation.Exchange;
-import org.springframework.amqp.rabbit.annotation.Queue;
-import org.springframework.amqp.rabbit.annotation.QueueBinding;
-import org.springframework.amqp.rabbit.annotation.RabbitListener;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-@Slf4j
-@Component
-public class IotsCbdRecogReceiver {
-
-    @Autowired
-    private IotCbdImgService iotCbdImgService;
-
-    @RabbitListener(bindings = @QueueBinding(value = @Queue(IotMqConstant.CBD_IMAGE_AGAIN_RECOGE)
-            , exchange = @Exchange(value = "amq.topic", durable = "true", type = "topic")
-            , key = IotMqConstant.CBD_IMAGE_AGAIN_RECOGE))
-    public String againRecog(String content) {
-        log.info("【测报灯重新识别】:{}", content);
-        String result = null;
-        try{
-            MqMsg mqMsg = JSONObject.parseObject(content, MqMsg.class);
-            iotCbdImgService.iotmAgainRecog(mqMsg.getCmd());
-        }catch (Exception e){
-            log.error("【测报灯重新识别】:{} {}", content, e.getMessage());
-            result = "识别失败,请重试";
-            e.printStackTrace();
-        }
-        return result;
-    }
-}

+ 0 - 72
src/main/java/com/yunfeiyun/agmp/iots/mq/IotsFtpMqReceiver.java

@@ -1,72 +0,0 @@
-package com.yunfeiyun.agmp.iots.mq;
-
-import com.alibaba.fastjson2.JSONObject;
-import com.yunfeiyun.agmp.common.framework.manager.ResManager;
-import com.yunfeiyun.agmp.common.utils.DateUtils;
-import com.yunfeiyun.agmp.common.utils.StringUtils;
-import com.yunfeiyun.agmp.iot.common.constant.IotFtpFileTypeEnum;
-import com.yunfeiyun.agmp.iot.common.constant.devicetype.ServiceNameConst;
-import com.yunfeiyun.agmp.iot.common.domain.IotDevice;
-import com.yunfeiyun.agmp.iot.common.domain.IotMonitorCapture;
-import com.yunfeiyun.agmp.iot.common.service.MongoService;
-import com.yunfeiyun.agmp.iots.service.IIotDeviceService;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.amqp.rabbit.annotation.Exchange;
-import org.springframework.amqp.rabbit.annotation.Queue;
-import org.springframework.amqp.rabbit.annotation.QueueBinding;
-import org.springframework.amqp.rabbit.annotation.RabbitListener;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.Resource;
-
-/**
- * @author liuyaowen
- */
-@Slf4j
-@Component
-public class IotsFtpMqReceiver {
-    @Resource
-    private ResManager resManager;
-
-    @Resource
-    private MongoService mongoService;
-    @Resource
-    private IIotDeviceService iotDeviceService;
-
-
-    @RabbitListener(bindings = @QueueBinding(value = @Queue("ftp.file.upload.queue")
-            , exchange = @Exchange(value = "ftp.file.upload.exchange", durable = "true", type = "topic")
-            , key = "ftp.file.upload.routeKey"))
-    public void recvDeviceDelete(String s) {
-        log.info("【IOTS】收到FTP上传文件信息 messageBody={}", s);
-        try {
-            JSONObject fileData = JSONObject.parseObject(s);
-            if(fileData!=null&& IotFtpFileTypeEnum.KIK_MONITOR_IMAGE.getType().equals(fileData.get("fileType"))){
-                saveHikMonitorImage(fileData);
-            }
-        } catch (Exception e) {
-            log.error("【IOTS】接收FTP上传文件信息异常", e);
-        }
-    }
-
-    private void saveHikMonitorImage(JSONObject fileData){
-        String path = fileData.getString("path");
-        if(StringUtils.isNotEmpty(path)){
-            String[] pathParam = path.split("/");
-            String devCode = pathParam[pathParam.length-1];
-            String ossUrl = fileData.getString("ossUrl");
-            IotDevice iotDevice = iotDeviceService.selectDeviceByDeviceServiceNameAndDevCode(ServiceNameConst.SERVICE_HIK_EZVIZ_MINITOR,devCode);
-            if(null != iotDevice){
-                IotMonitorCapture iotMonitorCapture = new IotMonitorCapture();
-                iotMonitorCapture.setDevBid(iotDevice.getDevBid());
-                iotMonitorCapture.setPicBid(iotMonitorCapture.getUUId());
-                iotMonitorCapture.setPicCreatedDate(DateUtils.dateTimeNow());
-                iotMonitorCapture.setPicUrl(ossUrl);
-                iotMonitorCapture.setCid(iotDevice.getTid());
-                mongoService.saveOne(iotMonitorCapture);
-            }
-        }
-
-    }
-
-}

+ 0 - 244
src/main/java/com/yunfeiyun/agmp/iots/mq/IotsMqReceiver.java

@@ -1,244 +0,0 @@
-package com.yunfeiyun.agmp.iots.mq;
-
-import com.alibaba.fastjson2.JSONArray;
-import com.alibaba.fastjson2.JSONObject;
-import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
-import com.yunfeiyun.agmp.common.framework.message.MqMsg;
-import com.yunfeiyun.agmp.common.utils.JSONUtils;
-import com.yunfeiyun.agmp.common.utils.StringUtils;
-import com.yunfeiyun.agmp.iot.common.constant.devicetype.IotDeviceDictConst;
-import com.yunfeiyun.agmp.iot.common.constant.devicetype.IotDeviceDictEnum;
-import com.yunfeiyun.agmp.iot.common.constant.mq.IotMqConstant;
-import com.yunfeiyun.agmp.iot.common.model.IotWarncheck;
-import com.yunfeiyun.agmp.iot.common.model.cmd.CmdGroupModel;
-import com.yunfeiyun.agmp.iots.core.cmd.core.CmdDispatcherService;
-import com.yunfeiyun.agmp.iots.core.manager.MqttManager;
-import com.yunfeiyun.agmp.iots.device.common.HttpDevice;
-import com.yunfeiyun.agmp.iots.service.IIotFirmdevService;
-import com.yunfeiyun.agmp.iots.service.impl.WarnService;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.amqp.rabbit.annotation.Exchange;
-import org.springframework.amqp.rabbit.annotation.Queue;
-import org.springframework.amqp.rabbit.annotation.QueueBinding;
-import org.springframework.amqp.rabbit.annotation.RabbitListener;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.stereotype.Component;
-
-import javax.annotation.Resource;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-
-/**
- * @author zhangn
- */
-@Slf4j
-@Component
-public class IotsMqReceiver {
-
-    @Autowired
-    private CmdDispatcherService cmdDispatcherService;
-
-    @Autowired
-    private MqttManager mqttManager;
-
-    @Autowired
-    private IIotFirmdevService iIotFirmdevService;
-
-
-
-
-
-    @Autowired
-    private WarnService warnService;
-    @Autowired
-    @Qualifier("cmdAsyncTaskExecutor")
-    private ThreadPoolTaskExecutor cmdAsyncTaskExecutor;
-    @Resource
-    private Map<String, HttpDevice> httpDeviceMap;
-
-    /**
-     * 指令集执行状态消息处理
-     *
-     * @param content
-     */
-    @RabbitListener(bindings = @QueueBinding(value = @Queue(IotMqConstant.TOPIC_TASK)
-            , exchange = @Exchange(value = "amq.topic", durable = "true", type = "topic")
-            , key = IotMqConstant.TOPIC_TASK))
-    public void recvTask(String content) {
-        log.info("【IOTS】收到iotm的 任务 messageBody={}", content);
-        try {
-            MqMsg mqMsg = JSONObject.parseObject(content, MqMsg.class);
-            log.info("【IOTS】收到iotm的 任务 cmd groups ={}", mqMsg.getCmd());
-            CmdGroupModel cmdGroupModel = JSONUtils.toObject(mqMsg.getCmd(), CmdGroupModel.class);
-            cmdDispatcherService.handleCmd(cmdGroupModel);
-        } catch (Exception e) {
-            log.error("【IOTS】【接收消息:指令集任务】【异常】", e);
-        }
-
-    }
-
-    /**
-     * 创建设备,并进行订阅
-     * 1. 解析数据
-     * 2. 订阅设备
-     * 3. 订阅结果返回给iotm
-     *
-     * @param content
-     */
-    @RabbitListener(bindings = @QueueBinding(value = @Queue(IotMqConstant.DEVICE_CREATE)
-            , exchange = @Exchange(value = "amq.topic", durable = "true", type = "topic")
-            , key = IotMqConstant.DEVICE_CREATE))
-    public void recvDeviceCreate(String content) {
-        log.info("【IOTS】收到iotm的 设备创建消息 messageBody={}", content);
-        try {
-            MqMsg mqMsg = JSONObject.parseObject(content, MqMsg.class);
-            log.info("【IOTS】收到iotm的 设备创建消息 messageBody ={} | {} ", mqMsg.getCmd(), mqMsg.getData());
-            Map<String, List<String>> serviceDeviceMap = getServiceDeviceMap(mqMsg);
-            //mqttManager.topicBatchUnSubscribeDevices(serviceDeviceMap);
-
-        } catch (Exception e) {
-            log.error("【IOTS】【接收消息:设备创建】【异常】", e);
-        }
-    }
-
-    @RabbitListener(bindings = @QueueBinding(value = @Queue(IotMqConstant.DEVICE_DELETE)
-            , exchange = @Exchange(value = "amq.topic", durable = "true", type = "topic")
-            , key = IotMqConstant.DEVICE_DELETE))
-    public void recvDeviceDelete(String s) {
-        log.info("【IOTS】收到iotm的 设备删除消息 messageBody={}", s);
-        try {
-            MqMsg mqMsg = JSONObject.parseObject(s, MqMsg.class);
-            log.info("【IOTS】收到iotm的 设备删除消息 messageBody ={} | {} ", mqMsg.getCmd(), mqMsg.getData());
-            Map<String, List<String>> serviceDeviceMap = getServiceDeviceMap(mqMsg);
-            //mqttManager.topicBatchUnSubscribeDevices(serviceDeviceMap);
-
-        } catch (Exception e) {
-            log.error("【IOTS】【接收消息:设备删除】【异常】", e);
-        }
-    }
-
-    /***
-     *
-     * 用于创建设备后需要主动获取最新信息,但是又不是mqtt协议
-     * 己根据类型自行实现策略 zhangn 2024年6月5日
-     * @param content
-     */
-    @RabbitListener(bindings = @QueueBinding(value = @Queue(IotMqConstant.DEVICE_CREATE_V2)
-            , exchange = @Exchange(value = "amq.topic", durable = "true", type = "topic")
-            , key = IotMqConstant.DEVICE_CREATE_V2))
-    public void recvDeviceCreateV2(String content) {
-        log.info("【IOTS】收到iotm的 设备创建消息V2 messageBody={}", content);
-        try {
-            MqMsg mqMsg = JSONObject.parseObject(content, MqMsg.class);
-            log.info("【IOTS】收到iotm的 设备创建消息V2 messageBody ={} | {} ", mqMsg.getCmd(), mqMsg.getData());
-            JSONArray mqArray = JSONArray.parseArray(mqMsg.getCmd());
-            List<String> xphDevCodeList = new ArrayList<>();
-            List<String> xphGpQxzDevCodeList = new ArrayList<>();
-            for (Object msg : mqArray) {
-                JSONObject deviceInfo = (JSONObject) msg;
-                String devtypeBid = deviceInfo.getString("devtypeBid");
-                String devBid = deviceInfo.getString("devBid");
-                // 自己判断处理
-                if (IotDeviceDictConst.TYPE_HS_YBQ_DWB.equals(devtypeBid) || IotDeviceDictConst.TYPE_HS_YBQ_CMB.equals(devtypeBid) || IotDeviceDictConst.TYPE_HS_YBQ_DBB.equals(devtypeBid)) {
-
-                } else if (IotDeviceDictConst.TYPE_XPH_WSKZ.equals(devtypeBid)
-                        || IotDeviceDictConst.TYPE_XPH_TRSH_CL.equals(devtypeBid)
-                        || IotDeviceDictConst.TYPE_XPH_WSHJ_JC.equals(devtypeBid)) {
-                    xphDevCodeList.add(deviceInfo.getString("devCode"));
-                } else if (IotDeviceDictConst.TYPE_XPH_GP_QXZ.equals(devtypeBid)) {
-                    xphGpQxzDevCodeList.add(deviceInfo.getString("devCode"));
-                }
-            }
-
-        } catch (Exception e) {
-            log.error("【IOTS】【接收消息:设备创建V2】【异常】", e);
-        }
-    }
-
-    private Map<String, List<String>> getServiceDeviceMap(MqMsg mqMsg) {
-        JSONArray mqArray = JSONArray.parseArray(mqMsg.getCmd());
-        List<String> devBidList = new ArrayList<>();
-        Map<String, List<String>> serviceKeyMap = new HashMap<>();
-        for (Object msg : mqArray) {
-            JSONObject deviceInfo = (JSONObject) msg;
-            String devCode = deviceInfo.getString("devCode");
-            String firmBizId = deviceInfo.getString("firmBid");
-            String devtypeBid = deviceInfo.getString("devtypeBid");
-            String devBid = deviceInfo.getString("devBid");
-            devBidList.add(devBid);
-
-            String k = firmBizId + devtypeBid;
-            if (!serviceKeyMap.containsKey(k)) {
-                serviceKeyMap.put(k, new ArrayList<>());
-            }
-            serviceKeyMap.get(k).add(devCode);
-        }
-        Map<String, String> serviceNameMap = iIotFirmdevService.getServiceNameMap(devBidList);
-        Map<String, List<String>> serviceDeviceMap = new HashMap<>();
-        for (Map.Entry<String, List<String>> entry : serviceKeyMap.entrySet()) {
-            String k = entry.getKey();
-            List<String> v = entry.getValue();
-            if (serviceNameMap.containsKey(k)) {
-                serviceDeviceMap.put(serviceNameMap.get(k), v);
-            }
-        }
-        return serviceDeviceMap;
-    }
-
-    /**
-     * ”预警检查“消息处理
-     *
-     * @param content
-     */
-    @RabbitListener(bindings = @QueueBinding(value = @Queue(IotMqConstant.TOPIC_WARNCHECK)
-            , exchange = @Exchange(value = "amq.topic", durable = "true", type = "topic")
-            , key = IotMqConstant.TOPIC_WARNCHECK))
-    public void recvWarncheck(String content) {
-        log.info("【IOTS】收到 预警检查 任务 messageBody={}", content);
-        try {
-            MqMsg mqMsg = JSONObject.parseObject(content, MqMsg.class);
-            log.info("【IOTS】收到 预警检查 任务 cmd  ={}", mqMsg.getCmd());
-            IotWarncheck warncheck = JSONUtils.toObject(mqMsg.getCmd(), IotWarncheck.class);
-            warnService.check(warncheck);
-        } catch (Exception e) {
-            log.error("【IOTS】【接收消息:预警检查 任务】【异常】", e);
-        }
-    }
-    /**
-     * 用于更新同步指定类型设备的所有同步接口,
-     * 一般用于http对接设备,用于从平台拉取设备所有信息
-     * */
-    @RabbitListener(bindings = @QueueBinding(value = @Queue(IotMqConstant.DEVICE_ALL_SYN)
-            , exchange = @Exchange(value = "amq.topic", durable = "true",type = "topic")
-            , key=IotMqConstant.DEVICE_ALL_SYN))
-    public void recvDeviceRefresh(String content){
-        log.info("【IOTS】收到设备信息同步任务 messageBody={}",content);
-        try {
-           MqMsg mqMsg = JSONObject.parseObject(content,MqMsg.class);
-           JSONObject param = JSONObject.parseObject(mqMsg.getCmd(),JSONObject.class);
-           String devTypeBid = param.getString("devTypeBid");
-           if(StringUtils.isEmpty(devTypeBid)){
-               log.error("【IOTS】设备信息同步任务异常,devTypeBid为空");
-               return;
-           }
-           IotDeviceDictEnum iotDeviceDictEnum = IotDeviceDictEnum.findEnumByCode(devTypeBid);
-           if(ObjectUtils.isNull(iotDeviceDictEnum)){
-               log.error("【IOTS】设备信息同步任务异常,未对接的设备类型");
-               return;
-           }
-           HttpDevice httpDevice = httpDeviceMap.get(iotDeviceDictEnum.getCode());
-           httpDevice.sysAllDevice();
-        }catch (Exception e){
-            log.error("【IOTS】执行设备信息同步任务失败",e);
-        }
-    }
-
-
-
-}

+ 79 - 0
src/main/java/com/yunfeiyun/agmp/iots/mq/bussiness/IotMqBusConfig.java

@@ -0,0 +1,79 @@
+package com.yunfeiyun.agmp.iots.mq.bussiness;
+
+
+import com.yunfeiyun.agmp.iot.common.constant.mq.IotMqExchange;
+import com.yunfeiyun.agmp.iot.common.constant.mq.IotMqQueue;
+import com.yunfeiyun.agmp.iots.mq.listener.IotsChannelGlobalAwareMessageListener;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.*;
+import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
+import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+/**/
+import javax.annotation.PostConstruct;
+
+/**
+ * TOS 的 MQ配置
+ * 1. 链接信息
+ * 2. queue 定义
+ * 3. exchange 定义
+ * 4. routing key 定义
+ * 5. 链接工厂定义
+ */
+@Slf4j
+@Configuration
+@ConditionalOnBean(name = "iotMqBusConfig")
+public class IotMqBusConfig {
+
+    @Autowired
+    @Qualifier("iotAmqpAdmin")
+    private AmqpAdmin iotAmqpAdmin;
+
+
+
+    @PostConstruct
+    public void init() {
+        log.info("加载Tos 全局租户通道 Mq");
+        iotAmqpAdmin.declareExchange(iotmToIotsGlobalExchange());
+        iotAmqpAdmin.declareQueue(topicTaskQueue());
+        iotAmqpAdmin.declareBinding(topicTaskBinding());
+    }
+
+    public FanoutExchange iotmToIotsGlobalExchange() {
+        return new FanoutExchange(IotMqExchange.IOTM_TO_IOTS_EXCHANGE);
+    }
+
+    public Queue topicTaskQueue() {
+        return QueueBuilder.durable(IotMqQueue.IOTM_TO_IOTS_CMD_QUEUE).build();
+    }
+    public Binding topicTaskBinding() {
+        return BindingBuilder.bind(topicTaskQueue()).to(iotmToIotsGlobalExchange());
+    }
+
+    // ########################################## 接受tos消息 全部租户通道结束##########################################
+
+
+    /**
+     * 配置接受所有租户都要处理的监听器,用户消息回执
+     *
+     * @param connectionFactory
+     * @param lobleAwareMessageListener
+     * @return
+     */
+    @Bean("iotGlobleSimpleMessageListenerContainer")
+    public SimpleMessageListenerContainer iotsGlobleSimpleMessageListenerContainer(
+            @Qualifier("iotConnectionFactory") CachingConnectionFactory connectionFactory,
+            IotsChannelGlobalAwareMessageListener lobleAwareMessageListener) {
+        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
+        container.setQueues(topicTaskQueue());
+        container.setMessageListener(lobleAwareMessageListener);
+        container.setAmqpAdmin(iotAmqpAdmin);
+        // 设置确认模式为手动确认
+        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
+        return container;
+    }
+}

+ 55 - 0
src/main/java/com/yunfeiyun/agmp/iots/mq/listener/IotsChannelGlobalAwareMessageListener.java

@@ -0,0 +1,55 @@
+package com.yunfeiyun.agmp.iots.mq.listener;
+
+import com.rabbitmq.client.Channel;
+import com.yunfeiyun.agmp.common.framework.mq.rabbitmq.enums.TosActionEnums;
+import com.yunfeiyun.agmp.common.framework.mq.rabbitmq.model.SynGlobalTenantInfoDto;
+import com.yunfeiyun.agmp.common.utils.JSONUtils;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.io.IOException;
+
+/**
+ * 专门处理tos[所有租户]发的消息
+ * 目前暂时没有这个情况,但后面遇到全局的字典就会用到
+ *
+ * @author zhangnn
+ * @data 2024年10月9日
+ */
+@Component
+@Slf4j
+@ConditionalOnBean(name = "iotMqConfig")
+public class IotsChannelGlobalAwareMessageListener implements ChannelAwareMessageListener {
+
+
+    @Override
+    public void onMessage(Message message, Channel channel) throws Exception {
+        try {
+            // 处理消息
+            byte[] body = message.getBody();
+            String content = new String(body);
+            log.info("【SAAS:】收到Tos:所有租户处理的消息:{}", content);
+            SynGlobalTenantInfoDto synGlobalTenantInfoDto = JSONUtils.toObject(content, SynGlobalTenantInfoDto.class);
+            //根据不同的action进行相应业务处理
+            String action = synGlobalTenantInfoDto.getAction();
+            TosActionEnums tosActionEnums = TosActionEnums.getAction(action);
+            if (tosActionEnums != null) {
+                switch (tosActionEnums) {
+
+                }
+            } else {
+                log.error("【SAAS:】收到Tos:所有租户处理的消息:action 为空,忽略消息");
+            }
+            // 手动确认消息
+            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
+        } catch (IOException e) {
+            // 处理异常,例如重新入队或拒绝消息
+            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
+        }
+    }
+}

+ 3 - 2
src/main/resources/application-dev.yml

@@ -40,7 +40,7 @@ server:
 logging:
 logging:
   level:
   level:
     com.yunfeiyun: INFO
     com.yunfeiyun: INFO
-    org.springframework: DEBUG
+    org.springframework: INFO
 
 
 # 用户配置
 # 用户配置
 user:
 user:
@@ -131,7 +131,7 @@ spring:
       # 热部署开关
       # 热部署开关
       enabled: true
       enabled: true
   rabbitmq:
   rabbitmq:
-    tos:
+    iot:
       host: 192.168.1.228
       host: 192.168.1.228
       port: 5672
       port: 5672
       username: admin
       username: admin
@@ -141,6 +141,7 @@ spring:
       publisher-returns: true
       publisher-returns: true
       enabled: true
       enabled: true
 
 
+
   # redis 配置
   # redis 配置
   redis:
   redis:
     # 地址
     # 地址