Просмотр исходного кода

优化设备批量创建订阅逻辑

liuyaowen 11 месяцев назад
Родитель
Сommit
6ac5ca2f9b

+ 52 - 18
src/main/java/com/yunfeiyun/agmp/iotm/mq/service/SendToIotsMsgService.java

@@ -17,6 +17,7 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -39,19 +40,18 @@ public class SendToIotsMsgService {
      * @param iotDevice
      */
     public void sendIotDeviceInsertMsg(IotDevice iotDevice) {
-        mqService.sendBaseDataToIots( IotActionEnums.IOT_DEVICE_CREATE, iotDevice, "【Iotm】to【Iots】物联网新增设备,同步到Iots");
+        mqService.sendBaseDataToIots(IotActionEnums.IOT_DEVICE_CREATE, iotDevice, "【Iotm】to【Iots】物联网新增设备,同步到Iots");
 
     }
 
 
-
     /**
      * 物联网编辑设备,同步到iots  ok
      *
      * @param iotDevice
      */
     public void sendIotDeviceUpdateMsg(IotDeviceEditMqModel iotDeviceEditMqModel) {
-        mqService.sendBaseDataToIots( IotActionEnums.IOT_DEVICE_UPDATE, iotDeviceEditMqModel, "【Iotm】to【Iots】物联网编辑设备,同步到Iots");
+        mqService.sendBaseDataToIots(IotActionEnums.IOT_DEVICE_UPDATE, iotDeviceEditMqModel, "【Iotm】to【Iots】物联网编辑设备,同步到Iots");
 
     }
 
@@ -61,15 +61,16 @@ public class SendToIotsMsgService {
      * @param iotDevice
      */
     public void sendIotDeviceDeleteMsg(IotDevice iotDevice) {
-        mqService.sendBaseDataToIots( IotActionEnums.IOT_DEVICE_DELETE, iotDevice, "【Iotm】to【Iots】物联网删除设备,同步到Iots");
+        mqService.sendBaseDataToIots(IotActionEnums.IOT_DEVICE_DELETE, iotDevice, "【Iotm】to【Iots】物联网删除设备,同步到Iots");
     }
+
     /**
      * 物联网新增设备型号,同步到iots ok
      *
      * @param iotDeviceconn
      */
     public void sendIotDeviceconnInsertMsg(IotDeviceconn iotDeviceconn) {
-        mqService.sendBaseDataToIots( IotActionEnums.DEVICE_CONN_CREATE, iotDeviceconn, "【Iotm】to【Iots】物联网新增设备连接,同步到Iots");
+        mqService.sendBaseDataToIots(IotActionEnums.DEVICE_CONN_CREATE, iotDeviceconn, "【Iotm】to【Iots】物联网新增设备连接,同步到Iots");
     }
 
     /**
@@ -78,7 +79,7 @@ public class SendToIotsMsgService {
      * @param iotDeviceconn
      */
     public void sendIotDeviceconnUpdateMsg(IotDeviceconn iotDeviceconn) {
-        mqService.sendBaseDataToIots( IotActionEnums.DEVICE_CONN_UPDATE, iotDeviceconn, "【Iotm】to【Iots】物联网编辑设备连接,同步到Iots");
+        mqService.sendBaseDataToIots(IotActionEnums.DEVICE_CONN_UPDATE, iotDeviceconn, "【Iotm】to【Iots】物联网编辑设备连接,同步到Iots");
     }
 
     /**
@@ -87,32 +88,65 @@ public class SendToIotsMsgService {
      * @param iotDeviceconn
      */
     public void sendIotDeviceconnDeleteMsg(IotDeviceconn iotDeviceconn) {
-        mqService.sendBaseDataToIots( IotActionEnums.DEVICE_CONN_DELETE, iotDeviceconn, "【Iotm】to【Iots】物联网删除设备连接,同步到Iots");
+        mqService.sendBaseDataToIots(IotActionEnums.DEVICE_CONN_DELETE, iotDeviceconn, "【Iotm】to【Iots】物联网删除设备连接,同步到Iots");
     }
 
-    public void sendCmdMsg(CmdGroupModel cmdGroupModel){
-        mqService.sendCmdToIots(cmdGroupModel,"【Iotm】 to 【iots】物联网设备指令,同步到Iots");
+    public void sendCmdMsg(CmdGroupModel cmdGroupModel) {
+        mqService.sendCmdToIots(cmdGroupModel, "【Iotm】 to 【iots】物联网设备指令,同步到Iots");
     }
 
     /**
      * 物联网同步所有设备信息,同步iots  ok
+     *
      * @param devTypeCode 设备型号
-     * @param times 时间间隔
-     * @param timeUnit 时间单位
+     * @param times       时间间隔
+     * @param timeUnit    时间单位
      */
-    public void sendIotSynAllDeviceMsg(String devTypeCode,long times,TimeUnit timeUnit) {
-        if(Boolean.TRUE.equals(redisCacheManager.hasKey(RedisCacheKey.IOT_SYN_ALL_DEVICE_TIME_LIMIT,devTypeCode))){
-            log.debug("更新设备状态频繁,设备类型:{},间隔时间:{},间隔单位:{}",devTypeCode,times,timeUnit);
+    public void sendIotSynAllDeviceMsg(String devTypeCode, long times, TimeUnit timeUnit) {
+        if (Boolean.TRUE.equals(redisCacheManager.hasKey(RedisCacheKey.IOT_SYN_ALL_DEVICE_TIME_LIMIT, devTypeCode))) {
+            SendToIotsMsgService.log.debug("更新设备状态频繁,设备类型:{},间隔时间:{},间隔单位:{}", devTypeCode, times, timeUnit);
             return;
         }
         JSONObject data = new JSONObject();
-        data.put("devTypeCode",devTypeCode);
-        mqService.sendBaseDataToIots( IotActionEnums.DEVICE_ALL_SYN,data, "【Iotm】to【Iots】物联网同步所有设备信息,同步到Iots");
-        redisCacheManager.setCacheObject(RedisCacheKey.IOT_SYN_ALL_DEVICE_TIME_LIMIT,devTypeCode,devTypeCode,times,timeUnit);
+        data.put("devTypeCode", devTypeCode);
+        mqService.sendBaseDataToIots(IotActionEnums.DEVICE_ALL_SYN, data, "【Iotm】to【Iots】物联网同步所有设备信息,同步到Iots");
+        redisCacheManager.setCacheObject(RedisCacheKey.IOT_SYN_ALL_DEVICE_TIME_LIMIT, devTypeCode, devTypeCode, times, timeUnit);
 
     }
 
     public void sendTosDeviceTypeUpdate(TosDevicetype tosDevicetype) {
-         mqService.sendBaseDataToIots(IotActionEnums.DEVICE_COMMON_CONN_UPDATE,tosDevicetype,"【Iotm】to【Iots】物联网设备型号更新,同步到Iots");
+        mqService.sendBaseDataToIots(IotActionEnums.DEVICE_COMMON_CONN_UPDATE, tosDevicetype, "【Iotm】to【Iots】物联网设备型号更新,同步到Iots");
+    }
+
+    public void sendIotDeviceInsertByBatchMsg(IotDeviceconn iotDeviceconn, List<IotDevice> iotDeviceList) {
+        // 考虑mq消息长度对性能的影响,将数据进行拆分
+        // 每条消息的包含的设备数量
+        int pageSize = 100;
+        // 页码
+        int pageNo = 1;
+        // 起始索引
+        int startIndex = 0;
+        // 中止索引
+        int endIndex = Math.min(100, iotDeviceList.size());
+        while (startIndex < endIndex) {
+            // 进行分片
+            List<IotDevice> bodyItem = iotDeviceList.subList(startIndex, endIndex);
+            if (!bodyItem.isEmpty()) {
+                // 有数据的情况下,进行推送
+                JSONObject body = new JSONObject();
+                body.put("bodyItem", bodyItem);
+                body.put("iotDeviceconn", iotDeviceconn);
+                mqService.sendBaseDataToIots(IotActionEnums.IOT_DEVICE_CREATE, body, "【】");
+            } else {
+                // 当前页无数据,直接退出
+                return;
+            }
+            // 页码增加
+            pageNo++;
+            // 从中止索引重新开始分片
+            startIndex = endIndex;
+            // 取当前页的最大数据索引与总数据长度的最小值作为中止索引
+            endIndex = Math.min(pageNo * pageSize, iotDeviceList.size());
+        }
     }
 }

+ 1 - 1
src/main/java/com/yunfeiyun/agmp/iotm/web/service/IIotDeviceService.java

@@ -49,7 +49,7 @@ public interface IIotDeviceService {
      * @param iotDeviceList 设备基础列表
      * @return 结果
      */
-    public int insertIotDeviceByBatch(List<IotDevice> iotDeviceList);
+    public int insertIotDeviceByBatch(IotDeviceconn iotDeviceconn, List<IotDevice> iotDeviceList);
 
     /**
      * 修改设备基础

+ 6 - 3
src/main/java/com/yunfeiyun/agmp/iotm/web/service/impl/IotDeviceServiceImpl.java

@@ -64,6 +64,7 @@ public class IotDeviceServiceImpl implements IIotDeviceService {
     @Value("${runMode.env:1}")
     private String runMode;
 
+
     /**
      * 新增设备基础
      *
@@ -138,7 +139,7 @@ public class IotDeviceServiceImpl implements IIotDeviceService {
             insertIotDeviceList.add(iotDevice);
         }
 
-        return insertIotDeviceByBatch(insertIotDeviceList);
+        return insertIotDeviceByBatch(iotDeviceconn, insertIotDeviceList);
     }
 
     @Override
@@ -256,6 +257,7 @@ public class IotDeviceServiceImpl implements IIotDeviceService {
         return selectIotDeviceList(iotDevice, true);
     }
 
+
     /**
      * 查询设备基础列表
      *
@@ -279,7 +281,7 @@ public class IotDeviceServiceImpl implements IIotDeviceService {
      * @return 结果
      */
     @Override
-    public int insertIotDeviceByBatch(List<IotDevice> iotDeviceList) {
+    public int insertIotDeviceByBatch(IotDeviceconn iotDeviceconn, List<IotDevice> iotDeviceList) {
         for (IotDevice iotDevice : iotDeviceList) {
             iotDevice.setTid(SecurityUtils.getTid());
         }
@@ -288,8 +290,9 @@ public class IotDeviceServiceImpl implements IIotDeviceService {
         // 获取连接 订阅设备
         for (IotDevice iotDevice : iotDeviceList) {
             sendToTosMsgService.sendIotDeviceInsertMsg(iotDevice);
-            sendToIotsMsgService.sendIotDeviceInsertMsg(iotDevice);
         }
+        // 同步至iots进行设备订阅
+        sendToIotsMsgService.sendIotDeviceInsertByBatchMsg(iotDeviceconn, iotDeviceList);
         new Thread(() -> {
             try {
                 Thread.sleep(2000);