Преглед на файлове

feat(通知订阅): 支持告警消息站内信通知 (#274)

bestfeng1020 преди 2 години
родител
ревизия
6a2035fee6
променени са 13 файла, в които са добавени 567 реда и са изтрити 54 реда
  1. 11 0
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/Notification.java
  2. 21 1
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/NotificationEntity.java
  3. 87 0
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/NotifyChannelEntity.java
  4. 19 0
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/NotifySubscriberEntity.java
  5. 19 0
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/enums/NotifyChannelState.java
  6. 93 46
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/service/NotifySubscriberService.java
  7. 8 1
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/Subscriber.java
  8. 25 0
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/SubscriberProviders.java
  9. 61 0
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/channel/InsideMailChannelProvider.java
  10. 172 0
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/channel/NotificationDispatcher.java
  11. 17 0
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/channel/NotifyChannel.java
  12. 16 0
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/channel/NotifyChannelProvider.java
  13. 18 6
      jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/providers/AlarmProvider.java

+ 11 - 0
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/Notification.java

@@ -7,6 +7,7 @@ import org.hswebframework.web.id.IDGenerator;
 import org.jetlinks.community.notify.manager.subscriber.Notify;
 
 import java.io.Serializable;
+import java.util.List;
 
 @Getter
 @Setter
@@ -27,10 +28,17 @@ public class Notification implements Serializable {
 
     private String message;
 
+
+    private Object detail;
+
+    private String code;
+
     private String dataId;
 
     private long notifyTime;
 
+    private List<String> notifyChannels;
+
     public static Notification from(NotifySubscriberEntity entity) {
         Notification notification = new Notification();
 
@@ -39,6 +47,7 @@ public class Notification implements Serializable {
         notification.subscriber = entity.getSubscriber();
         notification.topicName = entity.getTopicName();
         notification.setTopicProvider(entity.getTopicProvider());
+        notification.setNotifyChannels(entity.getNotifyChannels());
 
         return notification;
     }
@@ -49,6 +58,8 @@ public class Notification implements Serializable {
         target.setMessage(message.getMessage());
         target.setDataId(message.getDataId());
         target.setNotifyTime(message.getNotifyTime());
+        target.setDetail(message.getDetail());
+        target.setCode(message.getCode());
 
         return target;
     }

+ 21 - 1
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/NotificationEntity.java

@@ -4,12 +4,14 @@ import io.swagger.v3.oas.annotations.Hidden;
 import io.swagger.v3.oas.annotations.media.Schema;
 import lombok.Getter;
 import lombok.Setter;
+import lombok.SneakyThrows;
 import org.hswebframework.ezorm.rdb.mapping.annotation.ColumnType;
 import org.hswebframework.ezorm.rdb.mapping.annotation.DefaultValue;
 import org.hswebframework.ezorm.rdb.mapping.annotation.EnumCodec;
 import org.hswebframework.web.api.crud.entity.GenericEntity;
 import org.hswebframework.web.bean.FastBeanCopier;
 import org.jetlinks.community.notify.manager.enums.NotificationState;
+import org.jetlinks.community.utils.ObjectMappers;
 
 import javax.persistence.Column;
 import javax.persistence.Index;
@@ -58,6 +60,16 @@ public class NotificationEntity extends GenericEntity<String> {
     @Schema(description = "通知时间")
     private Long notifyTime;
 
+    @Column(length = 128)
+    @Schema(description = "通知编码")
+    private String code;
+
+    @Column
+    @Schema(description = "详情")
+    @ColumnType(jdbcType = JDBCType.CLOB, javaType = String.class)
+    private String detailJson;
+
+
     @Column(length = 32)
     @EnumCodec
     @DefaultValue("unread")
@@ -69,7 +81,15 @@ public class NotificationEntity extends GenericEntity<String> {
     @Schema(description = "说明")
     private String description;
 
+    @SneakyThrows
     public static NotificationEntity from(Notification notification) {
-        return FastBeanCopier.copy(notification, new NotificationEntity());
+        NotificationEntity entity = FastBeanCopier.copy(notification, new NotificationEntity());
+        Object detail = notification.getDetail();
+
+        entity.setCode(notification.getCode());
+        if (detail != null) {
+            entity.setDetailJson(ObjectMappers.JSON_MAPPER.writeValueAsString(detail));
+        }
+        return entity;
     }
 }

+ 87 - 0
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/NotifyChannelEntity.java

@@ -0,0 +1,87 @@
+package org.jetlinks.community.notify.manager.entity;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.Getter;
+import lombok.Setter;
+import org.hswebframework.ezorm.rdb.mapping.annotation.ColumnType;
+import org.hswebframework.ezorm.rdb.mapping.annotation.DefaultValue;
+import org.hswebframework.ezorm.rdb.mapping.annotation.EnumCodec;
+import org.hswebframework.ezorm.rdb.mapping.annotation.JsonCodec;
+import org.hswebframework.web.api.crud.entity.GenericEntity;
+import org.hswebframework.web.validator.CreateGroup;
+//import org.jetlinks.community.authorize.AuthenticationSpec;
+import org.jetlinks.community.notify.manager.enums.NotifyChannelState;
+import org.jetlinks.community.notify.manager.subscriber.SubscriberProvider;
+import org.jetlinks.community.notify.manager.subscriber.channel.NotifyChannelProvider;
+
+import javax.persistence.Column;
+import javax.persistence.Table;
+import javax.validation.constraints.NotBlank;
+import java.sql.JDBCType;
+import java.util.Map;
+
+/**
+ * 通知通道(配置).
+ * 用于定义哪些权限范围(grant),哪种主题(topicProvider),支持何种方式(channel)进行通知
+ * <p>
+ * 比如: 管理员角色的用户可以使用邮件通知,但是普通用户只能使用站内信通知.
+ *
+ * @author zhouhao
+ * @since 2.0
+ */
+@Table(name = "notify_channel")
+@Getter
+@Setter
+@Schema(description = "通知通道(配置)")
+public class NotifyChannelEntity extends GenericEntity<String> {
+
+    @Column(nullable = false, length = 32)
+    @NotBlank(groups = CreateGroup.class)
+    @Schema(description = "名称")
+    private String name;
+
+//    @Column
+//    @JsonCodec
+//    @ColumnType(jdbcType = JDBCType.LONGVARCHAR, javaType = String.class)
+//    @Schema(description = "权限范围")
+//    private AuthenticationSpec grant;
+
+    /**
+     * @see SubscriberProvider#getId()
+     */
+    @Column(nullable = false, length = 32, updatable = false)
+    @NotBlank(groups = CreateGroup.class)
+    @Schema(description = "主题提供商标识")
+    private String topicProvider;
+
+    @Column(nullable = false, length = 32)
+    @NotBlank(groups = CreateGroup.class)
+    @Schema(description = "主题提供商名称")
+    private String topicName;
+
+    /**
+     * @see NotifyChannelProvider#getId()
+     */
+    @Column(nullable = false, length = 32, updatable = false)
+    @NotBlank(groups = CreateGroup.class)
+    @Schema(description = "通知类型")
+    private String channelProvider;
+
+    /**
+     * @see NotifyChannelProvider#createChannel(Map)
+     * @see org.jetlinks.community.notify.manager.subscriber.channel.notifiers.NotifierChannelProvider.NotifyChannelConfig
+     */
+    @Column
+    @JsonCodec
+    @ColumnType(jdbcType = JDBCType.LONGVARCHAR, javaType = String.class)
+    @Schema(description = "通知配置")
+    private Map<String, Object> channelConfiguration;
+
+    @Column(length = 32)
+    @EnumCodec
+    @ColumnType(javaType = String.class)
+    @DefaultValue("enabled")
+    @Schema(description = "状态")
+    private NotifyChannelState state;
+
+}

+ 19 - 0
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/entity/NotifySubscriberEntity.java

@@ -12,6 +12,8 @@ import org.jetlinks.community.notify.manager.enums.SubscribeState;
 import javax.persistence.Column;
 import javax.persistence.Index;
 import javax.persistence.Table;
+import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 
 /**
@@ -75,5 +77,22 @@ public class NotifySubscriberEntity extends GenericEntity<String> {
     @Schema(description = "状态.")
     private SubscribeState state;
 
+    @Column(length = 32)
+    @Schema(description = "订阅语言")
+    private String locale;
+
+
+    /**
+     * @see NotifyChannelEntity#getId()
+     */
+    @Column(length = 3000)
+    @Schema(description = "通知方式")
+    @JsonCodec
+    @ColumnType(javaType = String.class)
+    private List<String> notifyChannels;
+
 
+    public Locale toLocale() {
+        return locale == null ? Locale.getDefault() : Locale.forLanguageTag(locale);
+    }
 }

+ 19 - 0
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/enums/NotifyChannelState.java

@@ -0,0 +1,19 @@
+package org.jetlinks.community.notify.manager.enums;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import org.hswebframework.web.dict.EnumDict;
+
+@Getter
+@AllArgsConstructor
+public enum NotifyChannelState implements EnumDict<String> {
+    enabled("正常"),
+    disabled("禁用");
+
+    private final String text;
+
+    @Override
+    public String getValue() {
+        return name();
+    }
+}

+ 93 - 46
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/service/NotifySubscriberService.java

@@ -1,23 +1,28 @@
 package org.jetlinks.community.notify.manager.service;
 
+
 import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.authorization.ReactiveAuthenticationHolder;
-import org.hswebframework.web.crud.events.EntityCreatedEvent;
-import org.hswebframework.web.crud.events.EntityDeletedEvent;
-import org.hswebframework.web.crud.events.EntityModifyEvent;
-import org.hswebframework.web.crud.events.EntitySavedEvent;
+import org.hswebframework.web.crud.events.*;
 import org.hswebframework.web.crud.service.GenericReactiveCrudService;
+import org.hswebframework.web.exception.BusinessException;
+import org.hswebframework.web.i18n.LocaleUtils;
+import org.jetlinks.core.cluster.ClusterManager;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.community.gateway.annotation.Subscribe;
 import org.jetlinks.community.notify.manager.entity.Notification;
 import org.jetlinks.community.notify.manager.entity.NotifySubscriberEntity;
 import org.jetlinks.community.notify.manager.enums.SubscribeState;
 import org.jetlinks.community.notify.manager.subscriber.SubscriberProvider;
-import org.jetlinks.core.cluster.ClusterManager;
-import org.jetlinks.core.event.EventBus;
+import org.jetlinks.community.notify.manager.subscriber.SubscriberProviders;
+import org.springframework.beans.factory.ObjectProvider;
 import org.springframework.boot.CommandLineRunner;
+import org.springframework.context.ApplicationEventPublisher;
 import org.springframework.context.event.EventListener;
 import org.springframework.stereotype.Service;
 import org.springframework.util.StringUtils;
 import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 import java.util.List;
@@ -31,19 +36,20 @@ public class NotifySubscriberService extends GenericReactiveCrudService<NotifySu
 
     private final EventBus eventBus;
 
-    private final ClusterManager clusterManager;
-
     private final Map<String, SubscriberProvider> providers = new ConcurrentHashMap<>();
 
     private final Map<String, Disposable> subscribers = new ConcurrentHashMap<>();
 
+    private final ApplicationEventPublisher eventPublisher;
+
     public NotifySubscriberService(EventBus eventBus,
-                                   ClusterManager clusterManager,
-                                   List<SubscriberProvider> providers) {
+                                   ObjectProvider<SubscriberProvider> providers,
+                                   ApplicationEventPublisher eventPublisher) {
         this.eventBus = eventBus;
-        this.clusterManager = clusterManager;
+        this.eventPublisher=eventPublisher;
         for (SubscriberProvider provider : providers) {
             this.providers.put(provider.getId(), provider);
+            SubscriberProviders.register(provider);
         }
     }
 
@@ -51,69 +57,109 @@ public class NotifySubscriberService extends GenericReactiveCrudService<NotifySu
         return Optional.ofNullable(provider).map(providers::get);
     }
 
-    private void doStart() {
-        clusterManager.<NotifySubscriberEntity>getTopic("notification-changed")
-            .subscribe()
-            .subscribe(this::handleSubscribe);
+    protected Mono<Void> doNotifyChange(NotifySubscriberEntity entity) {
+        return eventBus
+            .publish("/notification-changed", entity)
+            .then();
+
     }
 
-    protected void doNotifyChange(NotifySubscriberEntity entity) {
-        clusterManager.<NotifySubscriberEntity>getTopic("notification-changed")
-            .publish(Mono.just(entity))
-            .retry(3)
-            .subscribe();
+    @EventListener
+    public void handleEvent(EntityPrepareCreateEvent<NotifySubscriberEntity> entity) {
+        //填充语言
+        entity.async(
+            LocaleUtils
+                .currentReactive()
+                .doOnNext(locale -> {
+                    for (NotifySubscriberEntity subscriber : entity.getEntity()) {
+                        if (subscriber.getLocale() == null) {
+                            subscriber.setLocale(locale.toLanguageTag());
+                        }
+                    }
+                })
+        );
+    }
+
+    @EventListener
+    public void handleEvent(EntityPrepareSaveEvent<NotifySubscriberEntity> entity) {
+        //填充语言
+        entity.async(
+            LocaleUtils
+                .currentReactive()
+                .doOnNext(locale -> {
+                    for (NotifySubscriberEntity subscriber : entity.getEntity()) {
+                        if (subscriber.getLocale() == null) {
+                            subscriber.setLocale(locale.toLanguageTag());
+                        }
+                    }
+                })
+        );
     }
 
     @EventListener
     public void handleEvent(EntityCreatedEvent<NotifySubscriberEntity> entity) {
-        entity.getEntity().forEach(this::doNotifyChange);
+        entity.async(
+            Flux.fromIterable(entity.getEntity())
+                .flatMap(this::doNotifyChange)
+        );
     }
 
     @EventListener
     public void handleEvent(EntitySavedEvent<NotifySubscriberEntity> entity) {
-        entity.getEntity().forEach(this::doNotifyChange);
+        entity.async(
+            Flux.fromIterable(entity.getEntity())
+                .flatMap(this::doNotifyChange)
+        );
     }
 
     @EventListener
     public void handleEvent(EntityDeletedEvent<NotifySubscriberEntity> entity) {
-        entity.getEntity().forEach(e -> {
-            e.setState(SubscribeState.disabled);
-            doNotifyChange(e);
-        });
+        entity.async(
+            Flux.fromIterable(entity.getEntity())
+                .doOnNext(e -> e.setState(SubscribeState.disabled))
+                .flatMap(this::doNotifyChange)
+        );
+
     }
 
     @EventListener
     public void handleEvent(EntityModifyEvent<NotifySubscriberEntity> entity) {
-        entity.getAfter().forEach(this::doNotifyChange);
+        entity.async(
+            Flux.fromIterable(entity.getAfter())
+                .flatMap(this::doNotifyChange)
+        );
     }
 
-    private void handleSubscribe(NotifySubscriberEntity entity) {
+    @Subscribe("/notification-changed")
+    public void handleSubscribe(NotifySubscriberEntity entity) {
 
         //取消订阅
         if (entity.getState() == SubscribeState.disabled) {
             Optional.ofNullable(subscribers.remove(entity.getId()))
-                .ifPresent(Disposable::dispose);
+                    .ifPresent(Disposable::dispose);
             log.debug("unsubscribe:{}({}),{}", entity.getTopicProvider(), entity.getTopicName(), entity.getId());
             return;
         }
 
         //模版
         Notification template = Notification.from(entity);
-        //转发通知
-        String dispatch = template.createTopic();
 
         Disposable old = subscribers
             .put(entity.getId(),
-                Mono.zip(ReactiveAuthenticationHolder.get(entity.getSubscriber()), Mono.justOrEmpty(getProvider(entity.getTopicProvider())))
-                    .flatMap(tp2 -> tp2.getT2().createSubscriber(entity.getId(),tp2.getT1(), entity.getTopicConfig()))
-                    .flatMap(subscriber ->
-                        subscriber
-                            .subscribe()
-                            .map(template::copyWithMessage)
-                            .flatMap(notification -> eventBus.publish(dispatch, notification))
-                            .onErrorContinue((err, obj) -> log.error(err.getMessage(), err))
-                            .then())
-                    .subscribe()
+                 Mono
+                     .zip(ReactiveAuthenticationHolder.get(entity.getSubscriber()), Mono.justOrEmpty(getProvider(entity.getTopicProvider())))
+                     .flatMap(tp2 -> tp2.getT2().createSubscriber(entity.getId(), tp2.getT1(), entity.getTopicConfig()))
+                     .flatMap(subscriber ->
+                                  subscriber
+                                      .subscribe(entity.toLocale())
+                                      .map(template::copyWithMessage)
+                                      .doOnNext(eventPublisher::publishEvent)
+                                      .onErrorResume((err) -> {
+                                          log.error(err.getMessage(), err);
+                                          return Mono.empty();
+                                      })
+                                      .then())
+                     .subscribe()
             );
         log.debug("subscribe :{}({})", template.getTopicProvider(), template.getTopicName());
 
@@ -124,8 +170,9 @@ public class NotifySubscriberService extends GenericReactiveCrudService<NotifySu
     }
 
     public Mono<Void> doSubscribe(NotifySubscriberEntity entity) {
-        return Mono.justOrEmpty(getProvider(entity.getTopicProvider()))
-            .switchIfEmpty(Mono.error(() -> new IllegalArgumentException("不支持的主题:" + entity.getTopicProvider())))
+        return Mono
+            .justOrEmpty(getProvider(entity.getTopicProvider()))
+            .switchIfEmpty(Mono.error(() -> new BusinessException("error.unsupported_topics", 500, entity.getTopicProvider())))
             .map(provider -> {
                 entity.setTopicName(provider.getName());
                 return entity;
@@ -133,9 +180,10 @@ public class NotifySubscriberService extends GenericReactiveCrudService<NotifySu
             .flatMap(subEntity -> {
                 if (StringUtils.isEmpty(entity.getId())) {
                     entity.setId(null);
-                    return save(Mono.just(entity));
+                    return save(entity);
                 } else {
-                    return createUpdate().set(entity)
+                    return createUpdate()
+                        .set(entity)
                         .where(NotifySubscriberEntity::getId, entity.getId())
                         .and(NotifySubscriberEntity::getSubscriberType, entity.getSubscriberType())
                         .and(NotifySubscriberEntity::getSubscriber, entity.getSubscriber())
@@ -147,7 +195,6 @@ public class NotifySubscriberService extends GenericReactiveCrudService<NotifySu
 
     @Override
     public void run(String... args) {
-        doStart();
         createQuery()
             .where(NotifySubscriberEntity::getState, SubscribeState.enabled)
             .fetch()

+ 8 - 1
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/Subscriber.java

@@ -2,7 +2,14 @@ package org.jetlinks.community.notify.manager.subscriber;
 
 import reactor.core.publisher.Flux;
 
+import java.util.Locale;
+
 public interface Subscriber {
 
-    Flux<Notify> subscribe();
+    Flux<Notify> subscribe(Locale locale);
+
+    default Flux<Notify> subscribe() {
+        return subscribe(Locale.getDefault());
+    }
+
 }

+ 25 - 0
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/SubscriberProviders.java

@@ -0,0 +1,25 @@
+package org.jetlinks.community.notify.manager.subscriber;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class SubscriberProviders {
+
+    private final static Map<String, SubscriberProvider> providers = new ConcurrentHashMap<>();
+
+
+    public static void register(SubscriberProvider provider) {
+        providers.put(provider.getId(), provider);
+    }
+
+    public static List<SubscriberProvider> getProviders() {
+        return new ArrayList<>(providers.values());
+    }
+
+    public static Optional<SubscriberProvider> getProvider(String id) {
+        return Optional.ofNullable(providers.get(id));
+    }
+}

+ 61 - 0
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/channel/InsideMailChannelProvider.java

@@ -0,0 +1,61 @@
+package org.jetlinks.community.notify.manager.subscriber.channel;
+
+import lombok.AllArgsConstructor;
+import org.jetlinks.community.notify.manager.entity.Notification;
+import org.jetlinks.core.event.EventBus;
+import org.springframework.core.Ordered;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Mono;
+
+import java.util.Map;
+
+/**
+ * 站内信通知,通过推送通知信息到事件总线.
+ * <p>
+ * 由{@link org.jetlinks.community.notify.manager.message.NotificationsPublishProvider}推送到前端.
+ * <p>
+ * 由{@link org.jetlinks.community.notify.manager.service.NotificationService}写入到数据库.
+ *
+ * @author zhouhao
+ * @since 2.0
+ */
+@Component
+@AllArgsConstructor
+public class InsideMailChannelProvider implements NotifyChannelProvider, NotifyChannel {
+    public static final String provider = "inside-mail";
+
+    private final EventBus eventBus;
+
+    @Override
+    public String getId() {
+        return "inside-mail";
+    }
+
+    @Override
+    public String getName() {
+        return "站内信";
+    }
+
+    @Override
+    public Mono<NotifyChannel> createChannel(Map<String, Object> configuration) {
+        return Mono.just(this);
+    }
+
+    @Override
+    public Mono<Void> sendNotify(Notification notification) {
+        //设置了站内信的订阅才推送的事件总线
+        return eventBus
+            .publish(notification.createTopic(), notification)
+            .then();
+    }
+
+    @Override
+    public void dispose() {
+
+    }
+
+    @Override
+    public int getOrder() {
+        return Ordered.HIGHEST_PRECEDENCE;
+    }
+}

+ 172 - 0
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/channel/NotificationDispatcher.java

@@ -0,0 +1,172 @@
+package org.jetlinks.community.notify.manager.subscriber.channel;
+
+import lombok.extern.slf4j.Slf4j;
+import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
+import org.hswebframework.web.crud.events.EntityCreatedEvent;
+import org.hswebframework.web.crud.events.EntityDeletedEvent;
+import org.hswebframework.web.crud.events.EntityModifyEvent;
+import org.hswebframework.web.crud.events.EntitySavedEvent;
+import org.jetlinks.community.gateway.annotation.Subscribe;
+import org.jetlinks.community.notify.manager.entity.Notification;
+import org.jetlinks.community.notify.manager.entity.NotifyChannelEntity;
+import org.jetlinks.community.notify.manager.enums.NotifyChannelState;
+import org.jetlinks.core.cache.ReactiveCacheContainer;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.event.Subscription;
+import org.springframework.beans.factory.ObjectProvider;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * 通知订阅转发器,将通知信息转发到对应的订阅通道中
+ *
+ * @author zhouhao
+ * @since 2.0
+ */
+@Component
+@Slf4j
+public class NotificationDispatcher implements CommandLineRunner {
+
+    private final EventBus eventBus;
+
+    private final ReactiveCacheContainer<String, NotifyChannel> channels = ReactiveCacheContainer.create();
+
+    private final Map<String, NotifyChannelProvider> providers = new HashMap<>();
+
+    private final ReactiveRepository<NotifyChannelEntity, String> channelRepository;
+
+    public NotificationDispatcher(EventBus eventBus,
+                                  ObjectProvider<NotifyChannelProvider> providers,
+                                  ReactiveRepository<NotifyChannelEntity, String> channelRepository) {
+        this.eventBus = eventBus;
+        this.channelRepository = channelRepository;
+        //默认支持站内信
+        this.channels.put(InsideMailChannelProvider.provider, new InsideMailChannelProvider(eventBus));
+
+        for (NotifyChannelProvider provider : providers) {
+            this.providers.put(provider.getId(), provider);
+        }
+    }
+
+    @EventListener
+    public void handleNotifications(Notification notification) {
+
+        List<String> channelIdList = notification.getNotifyChannels();
+        //默认站内信
+        if (channelIdList == null) {
+            channelIdList = Collections.singletonList(InsideMailChannelProvider.provider);
+        }
+        //发送通知
+        for (String notifyChannel : channelIdList) {
+            NotifyChannel dispatcher = channels.getNow(notifyChannel);
+            if (dispatcher != null) {
+                dispatcher
+                    .sendNotify(notification)
+                    .subscribe();
+            }
+        }
+
+    }
+
+    @EventListener
+    public void handleEvent(EntityCreatedEvent<NotifyChannelEntity> event) {
+
+        event.async(
+            register(event.getEntity())
+        );
+    }
+
+    @EventListener
+    public void handleEvent(EntitySavedEvent<NotifyChannelEntity> event) {
+
+        event.async(
+            register(event.getEntity())
+        );
+    }
+
+    @EventListener
+    public void handleEvent(EntityModifyEvent<NotifyChannelEntity> event) {
+
+        event.async(
+            register(event.getAfter())
+        );
+    }
+
+    @EventListener
+    public void handleEvent(EntityDeletedEvent<NotifyChannelEntity> event) {
+        event.async(
+            unregister(event.getEntity())
+        );
+    }
+
+    @Subscribe(value = "/_sys/notify-channel/unregister", features = Subscription.Feature.broker)
+    public void unregister(NotifyChannelEntity entity) {
+        channels.remove(entity.getId());
+    }
+
+    @Subscribe(value = "/_sys/notify-channel/register", features = Subscription.Feature.broker)
+    public Mono<Void> register(NotifyChannelEntity entity) {
+        if (entity.getState() == NotifyChannelState.disabled) {
+            channels.remove(entity.getId());
+        } else {
+            return channels
+                .compute(entity.getId(), (ignore, old) -> {
+                    if (null != old) {
+                        old.dispose();
+                    }
+                    return createChannel(entity);
+                })
+                .then();
+        }
+        return Mono.empty();
+    }
+
+    private Mono<NotifyChannel> createChannel(NotifyChannelEntity entity) {
+        NotifyChannelProvider provider = providers.get(entity.getChannelProvider());
+        if (null == provider) {
+            return Mono.empty();
+        }
+        return provider.createChannel(entity.getChannelConfiguration());
+    }
+
+    private Mono<Void> unregister(List<NotifyChannelEntity> entities) {
+        for (NotifyChannelEntity entity : entities) {
+            unregister(entity);
+        }
+        return Flux.fromIterable(entities)
+                   .flatMap(e -> eventBus.publish("/_sys/notify-channel/unregister", e))
+                   .then();
+    }
+
+    private Mono<Void> register(List<NotifyChannelEntity> entities) {
+        return Flux.fromIterable(entities)
+                   .flatMap(e -> register(e)
+                       .then(eventBus.publish("/_sys/notify-channel/register", e)))
+                   .then();
+
+    }
+
+    @Override
+    public void run(String... args) throws Exception {
+        channelRepository
+            .createQuery()
+            .where(NotifyChannelEntity::getState, NotifyChannelState.enabled)
+            .fetch()
+            .flatMap(e -> this
+                .register(e)
+                .onErrorResume(er -> {
+                    log.warn("register notify channel error", er);
+                    return Mono.empty();
+                }))
+            .subscribe();
+
+    }
+}

+ 17 - 0
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/channel/NotifyChannel.java

@@ -0,0 +1,17 @@
+package org.jetlinks.community.notify.manager.subscriber.channel;
+
+import org.jetlinks.community.notify.manager.entity.Notification;
+import reactor.core.Disposable;
+import reactor.core.publisher.Mono;
+
+/**
+ * 订阅通知通道,用于发送通知信息
+ *
+ * @author zhouhao
+ * @since 2.0
+ */
+public interface NotifyChannel extends Disposable {
+
+    Mono<Void> sendNotify(Notification notification);
+
+}

+ 16 - 0
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/channel/NotifyChannelProvider.java

@@ -0,0 +1,16 @@
+package org.jetlinks.community.notify.manager.subscriber.channel;
+
+import org.springframework.core.Ordered;
+import reactor.core.publisher.Mono;
+
+import java.util.Map;
+
+public interface NotifyChannelProvider extends Ordered {
+
+    String getId();
+
+    String getName();
+
+    Mono<NotifyChannel> createChannel(Map<String, Object> configuration);
+
+}

+ 18 - 6
jetlinks-manager/notify-manager/src/main/java/org/jetlinks/community/notify/manager/subscriber/providers/AlarmProvider.java

@@ -3,6 +3,7 @@ package org.jetlinks.community.notify.manager.subscriber.providers;
 import com.alibaba.fastjson.JSONObject;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.hswebframework.web.authorization.Authentication;
 import org.hswebframework.web.i18n.LocaleUtils;
 import org.jetlinks.community.ValueObject;
@@ -17,14 +18,20 @@ import org.jetlinks.core.metadata.DefaultConfigMetadata;
 import org.jetlinks.core.metadata.PropertyMetadata;
 import org.jetlinks.core.metadata.SimplePropertyMetadata;
 import org.jetlinks.core.metadata.types.StringType;
+import org.jetlinks.core.utils.FluxUtils;
 import org.springframework.stereotype.Component;
+import reactor.core.Disposable;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
+import java.time.Duration;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Consumer;
 
 @Component
+@Slf4j
 public class AlarmProvider implements SubscriberProvider {
 
     private final EventBus eventBus;
@@ -56,11 +63,15 @@ public class AlarmProvider implements SubscriberProvider {
         String alarmId = configs.getString("alarmConfigId").orElse("*");
 
         String topic = Topics.alarm("*", "*", alarmId);
-        return Mono.justOrEmpty(()-> createSubscribe(id, new String[]{topic}));
+
+        return Mono.just(locale -> createSubscribe(locale, id, new String[]{topic})
+            //有效期内去重,防止同一个用户所在多个部门推送同一个告警
+            .as(FluxUtils.distinct(Notify::getDataId, Duration.ofSeconds(10))));
 
     }
 
-    private Flux<Notify> createSubscribe(String id,
+    private Flux<Notify> createSubscribe(Locale locale,
+                                         String id,
                                          String[] topics) {
         Subscription.Feature[] features = new Subscription.Feature[]{Subscription.Feature.local};
         return Flux
@@ -70,7 +81,7 @@ public class AlarmProvider implements SubscriberProvider {
                 .map(msg -> {
                     JSONObject json = msg.bodyToJson();
                     return Notify.of(
-                        getNotifyMessage(json),
+                        getNotifyMessage(locale, json),
                         //告警记录ID
                         json.getString("id"),
                         System.currentTimeMillis(),
@@ -80,18 +91,19 @@ public class AlarmProvider implements SubscriberProvider {
                 }));
     }
 
-    private static String getNotifyMessage(JSONObject json) {
+
+    private static String getNotifyMessage(Locale locale, JSONObject json) {
 
         String message;
         TargetType targetType = TargetType.of(json.getString("targetType"));
         String targetName = json.getString("targetName");
-        String alarmName = json.getString("alarmName");
+        String alarmName = json.getString("alarmConfigName");
         if (targetType == TargetType.other) {
             message = String.format("[%s]发生告警:[%s]!", targetName, alarmName);
         } else {
             message = String.format("%s[%s]发生告警:[%s]!", targetType.getText(), targetName, alarmName);
         }
-        return LocaleUtils.resolveMessage("message.alarm.notify." + targetType.name(), message, targetName, alarmName);
+        return LocaleUtils.resolveMessage("message.alarm.notify." + targetType.name(), locale, message, targetName, alarmName);
     }
 
     @Override