Przeglądaj źródła

新增mqtt发布消息线程池

liuyaowen 1 rok temu
rodzic
commit
144aef8beb

+ 13 - 0
src/main/java/com/yunfeiyun/agmp/iots/config/CmdAsyncTaskConfig.java

@@ -30,4 +30,17 @@ public class CmdAsyncTaskConfig {
         threadPoolTaskExecutor.initialize();
         return threadPoolTaskExecutor;
     }
+
+    @Bean("MqttTopicPublishExecutor")
+    public ThreadPoolTaskExecutor MqttTopicPublishExecutor() {
+        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
+        threadPoolTaskExecutor.setThreadNamePrefix("MqttTopicPublishExecutor-");//线程前缀
+        threadPoolTaskExecutor.setCorePoolSize( 2);//核心线程数
+        threadPoolTaskExecutor.setMaxPoolSize(4);//最大线程数
+        threadPoolTaskExecutor.setQueueCapacity(50);//等待队列
+        threadPoolTaskExecutor.setKeepAliveSeconds(30);//线程池维护线程所允许的空闲时间,单位为秒
+        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 线程池对拒绝任务(无线程可用)的处理策略
+        threadPoolTaskExecutor.initialize();
+        return threadPoolTaskExecutor;
+    }
 }

+ 4 - 1
src/main/java/com/yunfeiyun/agmp/iots/core/manager/MqttManager.java

@@ -20,6 +20,7 @@ import com.yunfeiyun.agmp.iots.core.mqtt.network.MqttPublisher;
 import com.yunfeiyun.agmp.iots.device.common.Device;
 import com.yunfeiyun.agmp.iots.service.IIotDeviceService;
 import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
@@ -52,7 +53,7 @@ public class MqttManager {
     /**
      * 实现类名称-->mqtt
      */
-    private ConcurrentHashMap<String, MqttCore> mqttCoreMap = new ConcurrentHashMap<>();
+    private Map<String, MqttCore> mqttCoreMap = new HashMap<>();
 
     /**
      * spring 自动注入
@@ -544,4 +545,6 @@ public class MqttManager {
         }
         return null;
     }
+
+
 }

+ 23 - 2
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/network/MqttPublisher.java

@@ -1,10 +1,17 @@
 package com.yunfeiyun.agmp.iots.core.mqtt.network;
 
 
+import cn.hutool.core.bean.BeanUtil;
+import cn.hutool.extra.spring.SpringUtil;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttException;
+import org.springframework.context.annotation.Bean;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadPoolExecutor;
 
 @Slf4j
 public class MqttPublisher {
@@ -13,13 +20,27 @@ public class MqttPublisher {
 
     private MqttClient mqttClient;
 
+    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
+
+
+
     public void init(MqttCore mqttCore) {
         this.mqttCore = mqttCore;
         this.mqttClient = mqttCore.getClient();
+        this.threadPoolTaskExecutor = SpringUtil.getBean("MqttTopicPublishExecutor");
     }
 
     public void publish(String topic, String message) throws MqttException {
-        log.info("发MQTT消息,topic:{},message:{}",topic,message);
-        mqttClient.publish(topic, message.getBytes(), 0, false);
+         CompletableFuture.runAsync(() -> {
+            try {
+                log.info("发MQTT消息,topic:{},message:{}",topic,message);
+                mqttClient.publish(topic, message.getBytes(), 0, false);
+            }catch (Exception e){
+                log.error("发MQTT消息失败",e);
+            }
+        },threadPoolTaskExecutor);
+
     }
+
+
 }

+ 4 - 4
src/main/java/com/yunfeiyun/agmp/iots/core/mqtt/network/MqttSubscriber.java

@@ -38,9 +38,9 @@ public class MqttSubscriber implements MqttCallbackExtended {
      * 实际接收消息
      */
     @Override
-    //@Async("asyncServiceExecutor")
     public void messageArrived(String topic, MqttMessage mqttMessage) {
         try {
+
             String msgContent = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);
             log.info("【上报数据:收到】收到mqtt消息:" + topic + ", " + msgContent);
             JSONObject obj = null;
@@ -88,11 +88,11 @@ public class MqttSubscriber implements MqttCallbackExtended {
         if(reconnect){
             loggerMsg = "连接断开,重连成功";
         }
-        log.info("[MQTT] {}...{}" ,loggerMsg, serverURI);
+        log.info("[MQTT] clientId :{} {}...{}",this.mqttClient.getClientId() ,loggerMsg, serverURI);
         String[] topics = mqttCore.getSubTopic();
         if(topics==null||topics.length==0){
             mqttClient.setCallback(this);
-            log.info("[MQTT] {} 无topic ",loggerMsg);
+            log.info("[MQTT] clientId :{} {} 无topic ",this.mqttClient.getClientId(),loggerMsg);
             return;
         }
         for (String topic : topics) {
@@ -102,7 +102,7 @@ public class MqttSubscriber implements MqttCallbackExtended {
             }
             try {
                 mqttClient.subscribe(topic);
-                log.info("[MQTT]{} {} 订阅主题 {}",mqttCore.getServiceType(), loggerMsg, topic);
+                log.info("[MQTT] clientId :{} {} {} 订阅主题 {}",this.mqttClient.getClientId(),mqttCore.getServiceType(), loggerMsg, topic);
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }