Browse Source

feat(仪表盘): 系统监控添加历史记录支持 (#284)

Zhang Ji 2 years ago
parent
commit
015570066a

+ 6 - 0
jetlinks-components/dashboard-component/pom.xml

@@ -30,6 +30,12 @@
         </dependency>
 
         <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>timeseries-component</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>com.github.oshi</groupId>
             <artifactId>oshi-core</artifactId>
             <version>6.2.2</version>

+ 148 - 4
jetlinks-components/dashboard-component/src/main/java/org/jetlinks/community/dashboard/measurements/sys/SystemMonitorMeasurementProvider.java

@@ -1,10 +1,17 @@
 package org.jetlinks.community.dashboard.measurements.sys;
 
+import com.google.common.collect.Maps;
+import org.hswebframework.web.api.crud.entity.QueryParamEntity;
 import org.hswebframework.web.bean.FastBeanCopier;
 import org.jetlinks.community.dashboard.*;
 import org.jetlinks.community.dashboard.measurements.MonitorObjectDefinition;
 import org.jetlinks.community.dashboard.supports.StaticMeasurement;
 import org.jetlinks.community.dashboard.supports.StaticMeasurementProvider;
+import org.jetlinks.community.timeseries.TimeSeriesData;
+import org.jetlinks.community.timeseries.TimeSeriesManager;
+import org.jetlinks.community.timeseries.TimeSeriesMetadata;
+import org.jetlinks.community.timeseries.TimeSeriesMetric;
+import org.jetlinks.community.utils.TimeUtils;
 import org.jetlinks.core.metadata.ConfigMetadata;
 import org.jetlinks.core.metadata.DataType;
 import org.jetlinks.core.metadata.DefaultConfigMetadata;
@@ -13,10 +20,17 @@ import org.jetlinks.core.metadata.types.ObjectType;
 import org.jetlinks.core.metadata.types.StringType;
 import org.reactivestreams.Publisher;
 import org.springframework.stereotype.Component;
+import reactor.core.Disposable;
+import reactor.core.Disposables;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
 
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
 import java.time.Duration;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -42,7 +56,55 @@ import java.util.Map;
  * <p>
  * 类型不同结构不同,memory: {@link MemoryInfo},cpu:{@link CpuInfo},disk:{@link DiskInfo},all:{@link SystemInfo}
  * <p>
- *  🌟: 企业版支持集群监控以及历史记录
+ *
+ * <h2>历史数据</h2>
+ *
+ * <pre>{@code
+ * POST /dashboard/_multi
+ *
+ *  [
+ *     {
+ *         "dashboard": "systemMonitor",
+ *         "object": "stats",
+ *         "measurement": "info",
+ *         "dimension": "history",
+ *         "group": "system-monitor",
+ *         "params": {
+ *              "from":"now-10m",
+ *              "to":"now"
+ *         }
+ *     }
+ * ]
+ *
+ * 返回:
+ *
+ *  [
+ *    {
+ *    "group":"system-monitor",
+ *    "data": {
+ *          "value": {
+ *              "memorySystemFree": 344, //系统可用内存
+ *              "memoryJvmHeapFree": 3038, //jvm可用内存
+ *              "memorySystemTotal": 49152, //系统总内存
+ *              "memoryJvmNonHeapTotal": 49152, //jvm堆外总内存
+ *              "diskTotal": 1907529, //磁盘总空间
+ *              "cpuSystemUsage": 11.8, //系统cpu使用率
+ *              "diskFree": 1621550, //磁盘可用空间
+ *              "clusterNodeId": "jetlinks-platform:8820", //集群节点ID
+ *              "memoryJvmHeapTotal": 4001, //jvm总内存
+ *              "cpuJvmUsage": 0.1, //jvm cpu使用率
+ *              "memoryJvmNonHeapFree": 48964, //jvm堆外可用内存
+ *              "id": "eSEeBYEBN57nz4ZBo0WI", // ID
+ *          },
+ *          "timeString": "2023-05-16 18:32:27",//时间
+ *          "timestamp": 1684233147193 //时间
+ *       }
+ *    }
+ *  ]
+ *
+ * }</pre>
+ *
+ *  🌟: 企业版支持集群监控
  *
  * @author zhouhao
  * @since 2.0
@@ -52,17 +114,56 @@ public class SystemMonitorMeasurementProvider extends StaticMeasurementProvider
 
     private final SystemMonitorService monitorService = new SystemMonitorServiceImpl();
 
+    private final Duration collectInterval = TimeUtils.parse(System.getProperty("monitor.system.collector.interval", "1m"));
+
+    private final Scheduler scheduler;
+
+    private final TimeSeriesManager timeSeriesManager;
+
+    static final TimeSeriesMetric metric = TimeSeriesMetric.of(System.getProperty("monitor.system.collector.metric", "system_monitor"));
+
+    private final Disposable.Composite disposable = Disposables.composite();
 
-    public SystemMonitorMeasurementProvider() {
+
+    public SystemMonitorMeasurementProvider(TimeSeriesManager timeSeriesManager) {
         super(DefaultDashboardDefinition.systemMonitor, MonitorObjectDefinition.stats);
+        this.timeSeriesManager = timeSeriesManager;
 
         addMeasurement(new StaticMeasurement(CommonMeasurementDefinition.info)
-                           .addDimension(new RealTimeDimension())
+            .addDimension(new RealTimeDimension())
+            .addDimension(new HistoryDimension())
         );
 
+        this.scheduler = Schedulers.newSingle("system-monitor-collector");
+
+        disposable.add(this.scheduler);
+    }
 
+    @PreDestroy
+    public void destroy() {
+        disposable.dispose();
     }
 
+    @PostConstruct
+    public void init() {
+        //注册监控信息
+        timeSeriesManager
+            .registerMetadata(
+                TimeSeriesMetadata.of(metric)
+            )
+            .block(Duration.ofSeconds(10));
+
+        //定时收集监控信息
+        disposable.add(Flux
+            .interval(collectInterval, scheduler)
+            .flatMap(ignore -> monitorService
+                .system()
+                .map(this::systemInfoToMap)
+                .flatMap(data -> timeSeriesManager.getService(metric).commit(data))
+                .onErrorResume(err -> Mono.empty()))
+            .subscribe()
+        );
+    }
 
     private void putTo(String prefix, MonitorInfo<?> source, Map<String, Object> target) {
         Map<String, Object> data = FastBeanCopier.copy(source, new HashMap<>());
@@ -73,6 +174,50 @@ public class SystemMonitorMeasurementProvider extends StaticMeasurementProvider
         });
     }
 
+    public TimeSeriesData systemInfoToMap(SystemInfo info) {
+        Map<String, Object> map = Maps.newLinkedHashMapWithExpectedSize(12);
+        putTo("cpu", info.getCpu(), map);
+        putTo("disk", info.getDisk(), map);
+        putTo("memory", info.getMemory(), map);
+        return TimeSeriesData.of(System.currentTimeMillis(), map);
+    }
+
+    //历史记录
+    class HistoryDimension implements MeasurementDimension {
+
+        @Override
+        public DimensionDefinition getDefinition() {
+            return CommonDimensionDefinition.history;
+        }
+
+        @Override
+        public DataType getValueType() {
+            return new ObjectType();
+        }
+
+        @Override
+        public ConfigMetadata getParams() {
+            return new DefaultConfigMetadata();
+        }
+
+        @Override
+        public boolean isRealTime() {
+            return false;
+        }
+
+        @Override
+        public Flux<? extends MeasurementValue> getValue(MeasurementParameter parameter) {
+            Date from = parameter.getDate("from", TimeUtils.parseDate("now-1h"));
+            Date to = parameter.getDate("to", TimeUtils.parseDate("now"));
+
+            return QueryParamEntity
+                .newQuery()
+                .noPaging()
+                .between("timestamp", from, to)
+                .execute(timeSeriesManager.getService(metric)::query)
+                .map(tsData -> SimpleMeasurementValue.of(tsData.getData(), tsData.getTimestamp()));
+        }
+    }
 
     //实时监控
     class RealTimeDimension implements MeasurementDimension {
@@ -91,7 +236,6 @@ public class SystemMonitorMeasurementProvider extends StaticMeasurementProvider
         public ConfigMetadata getParams() {
 
             return new DefaultConfigMetadata()
-                .add("serverNodeId", "服务节点ID", StringType.GLOBAL)
                 .add("interval", "更新频率", StringType.GLOBAL)
                 .add("type", "指标类型", new EnumType()
                     .addElement(EnumType.Element.of("all", "全部"))