Prechádzať zdrojové kódy

fix(设备会话): 修复设置DeviceOnlineMessage header无效

zhouhao 2 rokov pred
rodič
commit
a9b906a0d5

+ 36 - 33
jetlinks-manager/device-manager/src/main/java/org/jetlinks/community/device/message/DeviceMessageConnector.java

@@ -110,7 +110,7 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
             Message msg = ((ChildDeviceMessage) message).getChildDeviceMessage();
             if (msg instanceof DeviceMessage) {
                 builder.append("/message/children/")
-                    .append(((DeviceMessage) msg).getDeviceId());
+                       .append(((DeviceMessage) msg).getDeviceId());
             } else {
                 builder.append("/message/children");
             }
@@ -121,7 +121,7 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
             Message msg = ((ChildDeviceMessageReply) message).getChildDeviceMessage();
             if (msg instanceof DeviceMessage) {
                 builder.append("/message/children/reply/")
-                    .append(((DeviceMessage) msg).getDeviceId());
+                       .append(((DeviceMessage) msg).getDeviceId());
             } else {
                 builder.append("/message/children/reply");
             }
@@ -142,48 +142,51 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
         this.registry = registry;
         this.eventBus = eventBus;
         this.messageHandler = messageHandler;
-        sessionManager.listenEvent(event->{
-            if(event.isClusterExists()){
+        sessionManager.listenEvent(event -> {
+            if (event.isClusterExists()) {
                 return Mono.empty();
             }
             //从会话管理器里监听会话注册,转发为设备上线消息
-            if(event.getType()== DeviceSessionEvent.Type.unregister){
-                return this.handleSessionUnregister(event.getSession());
+            if (event.getType() == DeviceSessionEvent.Type.unregister) {
+                return handleSessionMessage(new DeviceOnlineMessage(),event.getSession());
             }
             //从会话管理器里监听会话注销,转发为设备离线消息
-            if(event.getType()== DeviceSessionEvent.Type.register){
-                return this.handleSessionRegister(event.getSession());
+            if (event.getType() == DeviceSessionEvent.Type.register) {
+                return handleSessionMessage(new DeviceOfflineMessage(),event.getSession());
             }
             return Mono.empty();
         });
     }
 
+    private Mono<Void> handleSessionMessage(CommonDeviceMessage<?> message, DeviceSession session) {
+        return Mono.deferContextual(ctx -> {
 
-    protected Mono<Void> handleSessionRegister(DeviceSession session) {
-        DeviceOnlineMessage message = new DeviceOnlineMessage();
-        message.addHeader("from", "session-register");
-        //添加客户端地址信息
-        message.addHeader("address", session.getClientAddress().map(InetSocketAddress::toString).orElse(""));
-        message.setDeviceId(session.getDeviceId());
-        message.setTimestamp(System.currentTimeMillis());
-        return this
-            .onMessage(message)
-            .onErrorResume(doOnError);
-    }
+            //填充触发会话的header信息
+            ctx.<DeviceMessage>getOrEmpty(DeviceMessage.class)
+               .ifPresent(msg -> {
+                   if (msg.getHeaders() != null) {
+                       msg.getHeaders().forEach(message::addHeaderIfAbsent);
+                   }
+                   //上线离线由何种消息触发
+                   message.addHeader("_createBy", msg.getMessageType().name());
+               });
+
+            message.setDeviceId(session.getDeviceId());
+            message.setTimestamp(System.currentTimeMillis());
+
+            message.addHeader("connectTime", session.connectTime());
+            message.addHeader("from", "session");
+            //子设备会话时添加上级设备id到header中,下游可以直接通过获取header来获取上级设备id
+            if (session.isWrapFrom(ChildrenDeviceSession.class)) {
+                ChildrenDeviceSession child = session.unwrap(ChildrenDeviceSession.class);
+                message.addHeader("parentId", child.getParentDevice().getDeviceId());
+            }
+            return this
+                .onMessage(message)
+                .onErrorResume(doOnError);
+
+        });
 
-    protected Mono<Void> handleSessionUnregister(DeviceSession session) {
-        DeviceOfflineMessage message = new DeviceOfflineMessage();
-        message.addHeader("from", "session-unregister");
-        message.setDeviceId(session.getDeviceId());
-        message.setTimestamp(System.currentTimeMillis());
-        //子设备会话时添加上级设备id到header中,下游可以直接通过获取header来获取上级设备id
-        if (session.isWrapFrom(ChildrenDeviceSession.class)) {
-            ChildrenDeviceSession child = session.unwrap(ChildrenDeviceSession.class);
-            message.addHeader("parentId", child.getParentDevice().getDeviceId());
-        }
-        return this
-            .onMessage(message)
-            .onErrorResume(doOnError);
     }
 
     public static Flux<String> createDeviceMessageTopic(DeviceRegistry deviceRegistry, Message message) {
@@ -206,7 +209,7 @@ public class DeviceMessageConnector implements DecodedClientMessageHandler {
                         List<String> topics = new ArrayList<>(2);
                         topics.add(topic);
                         configs.getValue(PropertyConstants.orgId)
-                            .ifPresent(orgId -> topics.add("/org/" + orgId + topic));
+                               .ifPresent(orgId -> topics.add("/org/" + orgId + topic));
 
                         return topics;
                     });