|
@@ -26,6 +26,7 @@ public class MqttSubscriber implements MqttCallback {
|
|
|
private HashMap<String, String> map = new HashMap<>();
|
|
private HashMap<String, String> map = new HashMap<>();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
+ //声明了监听的队列(初始化监听队列)
|
|
|
@PostConstruct
|
|
@PostConstruct
|
|
|
public void init() throws MqttException {
|
|
public void init() throws MqttException {
|
|
|
String[] topic = {
|
|
String[] topic = {
|
|
@@ -50,22 +51,25 @@ public class MqttSubscriber implements MqttCallback {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
@Override
|
|
@Override
|
|
|
|
|
+ /**
|
|
|
|
|
+ * 实际接收消息
|
|
|
|
|
+ */
|
|
|
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
|
|
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
|
|
|
try {
|
|
try {
|
|
|
String[] split = s.split("/");
|
|
String[] split = s.split("/");
|
|
|
- String deviceId = split[2];
|
|
|
|
|
- String product = split[1];
|
|
|
|
|
|
|
+ String deviceId = split[2]; //从Mqtt topic上取设备编号
|
|
|
|
|
+ String product = split[1]; //从Mqtt topic上取产品编号
|
|
|
JSONObject obj = JSON.parseObject(new String(mqttMessage.getPayload(), StandardCharsets.UTF_8));
|
|
JSONObject obj = JSON.parseObject(new String(mqttMessage.getPayload(), StandardCharsets.UTF_8));
|
|
|
obj.put("deviceId", deviceId);
|
|
obj.put("deviceId", deviceId);
|
|
|
- if (!obj.containsKey("functionId")) {
|
|
|
|
|
|
|
+ if (!obj.containsKey("functionId")) { //根据方法id动态加载方法
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
- if (map.get(product) != null) {
|
|
|
|
|
|
|
+ if (map.get(product) != null) { //从配置项中根据产品获取对应的适配器
|
|
|
Object bean = beanInit.getBean(map.get(product));
|
|
Object bean = beanInit.getBean(map.get(product));
|
|
|
Method[] declaredMethods = bean.getClass().getDeclaredMethods();
|
|
Method[] declaredMethods = bean.getClass().getDeclaredMethods();
|
|
|
for (Method declaredMethod : declaredMethods) {
|
|
for (Method declaredMethod : declaredMethods) {
|
|
|
- if (declaredMethod.getName().equals(obj.get("functionId"))) {
|
|
|
|
|
- declaredMethod.setAccessible(true);
|
|
|
|
|
|
|
+ if (declaredMethod.getName().equals(obj.get("functionId"))) { //根据MQTT传过来来的方法名 加载方法
|
|
|
|
|
+ declaredMethod.setAccessible(true); //将访问修饰符设置为public
|
|
|
declaredMethod.invoke(bean, new String(mqttMessage.getPayload(), StandardCharsets.UTF_8));
|
|
declaredMethod.invoke(bean, new String(mqttMessage.getPayload(), StandardCharsets.UTF_8));
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|