Browse Source

createDemo

Y_ANXI 2 years ago
commit
0832634325

+ 33 - 0
.gitignore

@@ -0,0 +1,33 @@
+HELP.md
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/

+ 0 - 0
fd5a18f983da4b56b4c9226cebf77b40-tcp47961231801883/.lck


+ 108 - 0
pom.xml

@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>com.yunfei</groupId>
+    <artifactId>adapter</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <name>adapter</name>
+    <description>adapter</description>
+    <properties>
+        <java.version>17</java.version>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <spring-boot.version>3.0.2</spring-boot.version>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.t-io</groupId>
+            <artifactId>tio-core</artifactId>
+            <version>3.5.0.v20190822-RELEASE</version>
+        </dependency>
+        <dependency>
+            <groupId>cn.hutool</groupId>
+            <artifactId>hutool-all</artifactId>
+            <version>5.7.12</version>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.paho</groupId>
+            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+            <version>1.2.5</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-aop</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.80</version>
+        </dependency>
+        <dependency>
+            <groupId>org.aspectj</groupId>
+            <artifactId>aspectjweaver</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter</artifactId>
+        </dependency>
+
+    </dependencies>
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-dependencies</artifactId>
+                <version>${spring-boot.version}</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.8.1</version>
+                <configuration>
+                    <source>17</source>
+                    <target>17</target>
+                    <encoding>UTF-8</encoding>
+                    <compilerArgs>
+                        <arg>-parameters</arg>
+                    </compilerArgs>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <version>${spring-boot.version}</version>
+                <configuration>
+                    <mainClass>com.yunfei.adapter.AdapterApplication</mainClass>
+                    <skip>true</skip>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>repackage</id>
+                        <goals>
+                            <goal>repackage</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

+ 15 - 0
src/main/java/com/yunfei/adapter/AdapterApplication.java

@@ -0,0 +1,15 @@
+package com.yunfei.adapter;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
+@SpringBootApplication
+@EnableScheduling
+public class AdapterApplication {
+
+    public static void main(String[] args) {
+        SpringApplication.run(AdapterApplication.class, args);
+    }
+
+}

+ 62 - 0
src/main/java/com/yunfei/adapter/accept/MqttSubscriber.java

@@ -0,0 +1,62 @@
+package com.yunfei.adapter.accept;
+
+import com.alibaba.fastjson.JSON;
+import com.yunfei.adapter.http.uniagro.UniagroHttp;
+import jakarta.annotation.PostConstruct;
+import jakarta.annotation.Resource;
+import org.eclipse.paho.client.mqttv3.*;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+
+@Service
+public class MqttSubscriber implements MqttCallback {
+
+    @Autowired
+    private MqttClient mqttClient;
+    @Resource
+    private UniagroHttp uniagroHttp;
+
+    private HashMap<String, String> map = new HashMap<>();
+    private String topic = "/adapter/write/#";
+
+    @PostConstruct
+    public void init() throws MqttException {
+        mqttClient.setCallback(this);
+        mqttClient.connect();
+        mqttClient.subscribe(topic);
+
+        map.put("ZZ", "uniagro");
+        map.put("PY", "uniagro");
+    }
+
+    @Override
+    public void connectionLost(Throwable throwable) {
+        // 连接丢失时的处理逻辑
+    }
+
+    @Override
+    public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
+        try {
+            String[] split = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8).split("/");
+            String deviceId = split[4];
+            String product = split[3];
+            if (map.get(product) != null) {
+                if (map.get(product).equals("uniagro")) {
+                    HashMap<String, Object> hashMap = JSON.parseObject(new String(mqttMessage.getPayload(), StandardCharsets.UTF_8), HashMap.class);
+                    uniagroHttp.write(deviceId, hashMap);
+                }
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+
+    @Override
+    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
+        // 消息交付完成时的处理逻辑
+    }
+}

+ 28 - 0
src/main/java/com/yunfei/adapter/forword/MqttConfig.java

@@ -0,0 +1,28 @@
+package com.yunfei.adapter.forword;
+
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.UUID;
+
+@Configuration
+public class MqttConfig {
+
+
+    @Bean
+    public MqttConnectOptions mqttConnectOptions() {
+        MqttConnectOptions options = new MqttConnectOptions();
+        options.setServerURIs(new String[]{"tcp://47.96.123.180:1883"});
+        options.setUserName("admin");
+        options.setPassword("admin".toCharArray());
+        return options;
+    }
+
+    @Bean
+    public MqttClient mqttClient() throws MqttException {
+        return new MqttClient("tcp://47.96.123.180:1883", UUID.randomUUID().toString().replace("-",""));
+    }
+}

+ 21 - 0
src/main/java/com/yunfei/adapter/forword/MqttPublisher.java

@@ -0,0 +1,21 @@
+package com.yunfei.adapter.forword;
+
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class MqttPublisher {
+
+    @Autowired
+    private MqttClient mqttClient;
+
+
+    public void publish(String topic, String message) throws MqttException {
+        if (!mqttClient.isConnected()) {
+            mqttClient.connect();
+        }
+        mqttClient.publish(topic, message.getBytes(), 0, false);
+    }
+}

File diff suppressed because it is too large
+ 15 - 0
src/main/java/com/yunfei/adapter/http/uniagro/UniagroConfig.java


+ 60 - 0
src/main/java/com/yunfei/adapter/http/uniagro/UniagroHttp.java

@@ -0,0 +1,60 @@
+package com.yunfei.adapter.http.uniagro;
+
+import cn.hutool.http.HttpUtil;
+import jakarta.annotation.Resource;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.Objects;
+
+@Component
+//@Slf4j
+public class UniagroHttp {
+    //请求域名
+    public String domain = "https://openapi.uniagro.cn";
+
+    //获取设备状态
+    private static final String getDeviceStatus = "/v1/remote/{id}?realtime=true";
+
+    //设置设备参数
+    private static final String setDevice = "/v1/remote/{id}";
+
+    @Resource
+    private UniagroConfig config;
+
+    /**
+     * 读取上报数据
+     * @param deviceId 设备id
+     * @return
+     */
+    public String readReport(String deviceId) {
+        try {
+            String url = domain + getDeviceStatus.replace("{id}", deviceId);
+            String body = HttpUtil.createGet(url).header("Authorization","Bearer "+config.token).execute().body();
+//            log.info("读取设备" + deviceId + "消息,响应数据为" + body);
+            return body;
+        } catch (Exception e) {
+//            e.printStackTrace();
+        }
+        return null;
+    }
+
+    /**
+     * 写入设备参数
+     * @param deviceId 设备id
+     * @param params 参数
+     * @return
+     */
+    public String write(String deviceId, HashMap<String, Object> params) {
+        try {
+            String url = domain + setDevice.replace("{id}", deviceId);
+            String body = HttpUtil.createPost(url).header("Authorization","Bearer "+config.token).form(params).execute().body();
+//            log.info("写入设备" + deviceId + "参数为:" + params + ",响应数据为:" + body);
+            return body;
+        } catch (Exception e) {
+//            e.printStackTrace();
+            return null;
+        }
+    }
+
+}

+ 52 - 0
src/main/java/com/yunfei/adapter/http/uniagro/UniagroHttpAspect.java

@@ -0,0 +1,52 @@
+package com.yunfei.adapter.http.uniagro;
+
+import cn.hutool.core.bean.BeanUtil;
+import cn.hutool.http.HttpUtil;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import org.aspectj.lang.annotation.AfterReturning;
+import org.aspectj.lang.annotation.Aspect;
+import org.aspectj.lang.annotation.Before;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.util.StringUtils;
+
+import java.util.Map;
+
+@Aspect
+@Component
+public class UniagroHttpAspect {
+
+    @Autowired
+    private UniagroConfig config;
+
+    @Before("execution(* com.yunfei.adapter.http.uniagro.UniagroHttp.readReport(..)) || " +
+            "execution(* com.yunfei.adapter.http.uniagro.UniagroHttp.write(..))")
+    public void checkAndInjectToken() {
+        if (StringUtils.isEmpty(config.token)) {
+            this.login();
+        }
+    }
+
+    @AfterReturning(pointcut = "execution(* com.yunfei.adapter.http.uniagro.UniagroHttp.readReport(..)) || execution(* com.yunfei.adapter.http.uniagro.UniagroHttp.write(..))", returning = "responseString")
+    public void processResponse(String responseString) {
+        try {
+//            System.err.println("asdfasdfasdf");
+        } catch (Exception e) {
+//             处理JSON转换的异常
+            System.out.println("Error processing JSON response: " + e.getMessage());
+        }
+    }
+
+    private void login(){
+        String loginUrl = "login.uniagro.cn/connect/token";
+        Map<String, Object> params = BeanUtil.beanToMap(config);
+        params.put("grant_type","password");
+        String body = HttpUtil.createPost(loginUrl).form(params).contentType("application/x-www-form-urlencoded").execute().body();
+        System.err.println(body);
+        JSONObject obj = JSON.parseObject(body);
+        if(obj.containsKey("access_token")){
+            config.token = obj.getString("access_token");
+        }
+    }
+}

+ 70 - 0
src/main/java/com/yunfei/adapter/http/uniagro/runner/ScheduledTasks.java

@@ -0,0 +1,70 @@
+package com.yunfei.adapter.http.uniagro.runner;
+
+import cn.hutool.core.lang.Pair;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.yunfei.adapter.forword.MqttPublisher;
+import com.yunfei.adapter.http.uniagro.UniagroHttp;
+import jakarta.annotation.Resource;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+@Component
+public class ScheduledTasks {
+    @Resource
+    private UniagroHttp uniagroHttp;
+    @Resource
+    private MqttPublisher mqttPublisher;
+    static List<Pair<String, String>> deviceIds = new ArrayList<>();
+
+    static {
+        deviceIds.add(new Pair<>("9001", "ZZ"));
+        deviceIds.add(new Pair<>("9100", "PY"));
+    }
+
+    @Scheduled(fixedRate = 20000)
+    public void performTask() {
+        for (Pair<String, String> device : deviceIds) {
+            String msg = uniagroHttp.readReport(device.getKey());
+            try {
+                mqttPublisher.publish("/" + device.getValue() + "/" + device.getKey() + "/properties/report", this.decode(msg));
+            } catch (MqttException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public String decode(String msg) {
+        JSONObject result = new JSONObject();
+        JSONObject obj = JSON.parseObject(msg);
+        if (obj.containsKey("data")) {
+            JSONObject data = obj.getJSONObject("data");
+            if(data.containsKey("angle")){
+                this.putInputs(result,data.getJSONObject("angle"));
+            }
+        }
+        if (obj.containsKey("timeStamp")) {
+            result.put("timeStamp", obj.get("timeStamp"));
+        }
+        if (obj.containsKey("deviceType")) {
+            result.put("deviceType", obj.get("deviceType"));
+        }
+        if (obj.getJSONObject("data").containsKey("detail")) {
+            this.putInputs(result,obj.getJSONObject("data").getJSONObject("detail"));
+        }
+        JSONObject property = new JSONObject();
+        property.put("property",result);
+        return property.toJSONString();
+    }
+
+    public void putInputs(JSONObject result,JSONObject data){
+        for (String key : data.keySet()) {
+            result.put(key, data.get(key));
+        }
+    }
+}

+ 21 - 0
src/main/java/com/yunfei/adapter/tcp/DefPacket.java

@@ -0,0 +1,21 @@
+package com.yunfei.adapter.tcp;
+
+import org.tio.core.intf.Packet;
+
+public class DefPacket extends Packet {
+    private static final long serialVersionUID = -172060606924066412L;
+    public static final String CHARSET = "utf-8";
+    private byte[] body;
+    public static final String heartbeat = "hello";
+
+    public static final String response = "AA01000100BB00CC";
+
+
+    public byte[] getBody() {
+        return body;
+    }
+
+    public void setBody(byte[] body) {
+        this.body = body;
+    }
+}

+ 50 - 0
src/main/java/com/yunfei/adapter/tcp/TcpHandle.java

@@ -0,0 +1,50 @@
+package com.yunfei.adapter.tcp;
+
+import com.alibaba.fastjson.JSONObject;
+import com.yunfei.adapter.forword.MqttPublisher;
+import jakarta.annotation.Resource;
+import org.springframework.stereotype.Component;
+import org.tio.core.ChannelContext;
+import org.tio.core.TioConfig;
+import org.tio.core.exception.AioDecodeException;
+import org.tio.core.intf.Packet;
+import org.tio.server.intf.ServerAioHandler;
+
+import java.nio.ByteBuffer;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+
+@Component
+public class TcpHandle implements ServerAioHandler {
+    @Resource
+    private MqttPublisher mqttPublisher;
+
+    @Override
+    public Packet decode(ByteBuffer byteBuffer, int i, int i1, int i2, ChannelContext channelContext) throws AioDecodeException {
+        byte[] array = new byte[byteBuffer.remaining()];
+        byteBuffer.get(array);
+        DefPacket defPackage = new DefPacket();
+        defPackage.setBody(array);
+        return defPackage;
+    }
+
+    @Override
+    public ByteBuffer encode(Packet packet, TioConfig tioConfig, ChannelContext channelContext) {
+        return null;
+    }
+
+//    @Override
+//    public ByteBuffer encode(Packet packet, GroupContext groupContext, ChannelContext channelContext) {
+//        return null;
+//    }
+
+    @Override
+    public void handler(Packet packet, ChannelContext channelContext) throws Exception {
+        DefPacket defPackage = (DefPacket) packet;
+        byte[] body = defPackage.getBody();
+        String msg = new String(body);
+        String[] split = msg.split(",");
+        mqttPublisher.publish("/NHH/" + split[1] + "/property/report", msg);
+    }
+}

+ 44 - 0
src/main/java/com/yunfei/adapter/tcp/TcpListener.java

@@ -0,0 +1,44 @@
+package com.yunfei.adapter.tcp;
+
+import org.springframework.stereotype.Component;
+import org.tio.core.ChannelContext;
+import org.tio.core.intf.Packet;
+import org.tio.server.intf.ServerAioListener;
+
+@Component
+public class TcpListener implements ServerAioListener {
+    @Override
+    public void onAfterConnected(ChannelContext channelContext, boolean b, boolean b1) throws Exception {
+
+    }
+
+    @Override
+    public void onAfterDecoded(ChannelContext channelContext, Packet packet, int i) throws Exception {
+
+    }
+
+    @Override
+    public void onAfterReceivedBytes(ChannelContext channelContext, int i) throws Exception {
+
+    }
+
+    @Override
+    public void onAfterSent(ChannelContext channelContext, Packet packet, boolean b) throws Exception {
+
+    }
+
+    @Override
+    public void onAfterHandled(ChannelContext channelContext, Packet packet, long l) throws Exception {
+
+    }
+
+    @Override
+    public void onBeforeClose(ChannelContext channelContext, Throwable throwable, String s, boolean b) throws Exception {
+
+    }
+
+    @Override
+    public boolean onHeartbeatTimeout(ChannelContext channelContext, Long aLong, int i) {
+        return false;
+    }
+}

+ 28 - 0
src/main/java/com/yunfei/adapter/tcp/TioConfig.java

@@ -0,0 +1,28 @@
+package com.yunfei.adapter.tcp;
+
+import jakarta.annotation.Resource;
+import org.springframework.context.annotation.Bean;
+import org.springframework.stereotype.Component;
+import org.tio.server.ServerTioConfig;
+import org.tio.server.TioServer;
+
+import java.io.IOException;
+
+@Component
+public class TioConfig {
+    @Resource
+    private TcpHandle tcpHandle;
+    @Resource
+    private TcpListener tcpListener;
+
+    @Bean
+    public TioServer create(){
+        TioServer tioServer = new TioServer(new ServerTioConfig(tcpHandle, tcpListener));
+        try {
+            tioServer.start("0.0.0.0",6789);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return tioServer;
+    }
+}