|
@@ -1,6 +1,8 @@
|
|
|
package com.yunfei.adapter.accept;
|
|
package com.yunfei.adapter.accept;
|
|
|
|
|
|
|
|
import com.alibaba.fastjson.JSON;
|
|
import com.alibaba.fastjson.JSON;
|
|
|
|
|
+import com.alibaba.fastjson.JSONArray;
|
|
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
import com.yunfei.adapter.http.uniagro.UniagroHttp;
|
|
import com.yunfei.adapter.http.uniagro.UniagroHttp;
|
|
|
import jakarta.annotation.PostConstruct;
|
|
import jakarta.annotation.PostConstruct;
|
|
|
import jakarta.annotation.Resource;
|
|
import jakarta.annotation.Resource;
|
|
@@ -8,28 +10,33 @@ import org.eclipse.paho.client.mqttv3.*;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
|
|
|
|
+import java.lang.reflect.Method;
|
|
|
import java.nio.charset.StandardCharsets;
|
|
import java.nio.charset.StandardCharsets;
|
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
|
|
|
+import java.util.Map;
|
|
|
|
|
|
|
|
@Service
|
|
@Service
|
|
|
public class MqttSubscriber implements MqttCallback {
|
|
public class MqttSubscriber implements MqttCallback {
|
|
|
|
|
|
|
|
@Autowired
|
|
@Autowired
|
|
|
private MqttClient mqttClient;
|
|
private MqttClient mqttClient;
|
|
|
- @Resource
|
|
|
|
|
- private UniagroHttp uniagroHttp;
|
|
|
|
|
|
|
|
|
|
|
|
+ @Resource
|
|
|
|
|
+ private FuncHandleBeanInit beanInit;
|
|
|
private HashMap<String, String> map = new HashMap<>();
|
|
private HashMap<String, String> map = new HashMap<>();
|
|
|
- private String topic = "/adapter/write/#";
|
|
|
|
|
|
|
+ private String[] topic = {
|
|
|
|
|
+ "/JP/A1030119BCE5EC92/adapter/write",
|
|
|
|
|
+ "/PY/9100/adapter/write"
|
|
|
|
|
+ };
|
|
|
|
|
|
|
|
@PostConstruct
|
|
@PostConstruct
|
|
|
public void init() throws MqttException {
|
|
public void init() throws MqttException {
|
|
|
mqttClient.setCallback(this);
|
|
mqttClient.setCallback(this);
|
|
|
mqttClient.connect();
|
|
mqttClient.connect();
|
|
|
mqttClient.subscribe(topic);
|
|
mqttClient.subscribe(topic);
|
|
|
-
|
|
|
|
|
map.put("ZZ", "uniagro");
|
|
map.put("ZZ", "uniagro");
|
|
|
map.put("PY", "uniagro");
|
|
map.put("PY", "uniagro");
|
|
|
|
|
+ map.put("JP", "NHH");
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -40,13 +47,21 @@ public class MqttSubscriber implements MqttCallback {
|
|
|
@Override
|
|
@Override
|
|
|
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
|
|
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
|
|
|
try {
|
|
try {
|
|
|
- String[] split = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8).split("/");
|
|
|
|
|
- String deviceId = split[4];
|
|
|
|
|
- String product = split[3];
|
|
|
|
|
|
|
+ String[] split = s.split("/");
|
|
|
|
|
+ String deviceId = split[2];
|
|
|
|
|
+ String product = split[1];
|
|
|
|
|
+ JSONObject obj = JSON.parseObject(new String(mqttMessage.getPayload(), StandardCharsets.UTF_8));
|
|
|
|
|
+ if (!obj.containsKey("functionId")) {
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
if (map.get(product) != null) {
|
|
if (map.get(product) != null) {
|
|
|
- if (map.get(product).equals("uniagro")) {
|
|
|
|
|
- HashMap<String, Object> hashMap = JSON.parseObject(new String(mqttMessage.getPayload(), StandardCharsets.UTF_8), HashMap.class);
|
|
|
|
|
- uniagroHttp.write(deviceId, hashMap);
|
|
|
|
|
|
|
+ Object bean = beanInit.getBean(map.get(product));
|
|
|
|
|
+ Method[] declaredMethods = bean.getClass().getDeclaredMethods();
|
|
|
|
|
+ for (Method declaredMethod : declaredMethods) {
|
|
|
|
|
+ if(declaredMethod.getName().equals(obj.get("functionId"))){
|
|
|
|
|
+ declaredMethod.setAccessible(true);
|
|
|
|
|
+ declaredMethod.invoke(bean, new String(mqttMessage.getPayload(), StandardCharsets.UTF_8));
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
@@ -59,4 +74,6 @@ public class MqttSubscriber implements MqttCallback {
|
|
|
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
|
|
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
|
|
|
// 消息交付完成时的处理逻辑
|
|
// 消息交付完成时的处理逻辑
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
}
|
|
}
|