/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.core.topic;

import java.util.function.Function;
import org.jetlinks.core.topic.Router;
import org.jetlinks.core.topic.Topic;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * {@link Router} implementation that keeps its handlers as subscribers of a
 * {@link Topic} tree.
 *
 * <p>Every routed handler is a function from an input value to a
 * {@link Publisher} of results. Handlers are attached to the node that
 * {@link Topic#append(String)} returns for the routed topic string, below a
 * single root created by {@link Topic#createRoot()}.
 *
 * @param <T> type of the data handed to the handlers
 * @param <R> element type of the publishers produced by the handlers
 */
class TopicRouter<T, R> implements Router<T, R> {
    private static final Logger log = LoggerFactory.getLogger(TopicRouter.class);

    /** Root of the topic tree; handlers are stored as subscribers of its descendants. */
    private final Topic<Function<T, Publisher<R>>> root = Topic.createRoot();

    TopicRouter() {
    }

    /**
     * Registers {@code handler} as a subscriber of the node for {@code topic}.
     *
     * @param topic   topic string passed to {@link Topic#append(String)}
     * @param handler function invoked with the data given to {@link #execute(String, Object)}
     * @return this router, to allow call chaining
     */
    @Override
    public Router<T, R> route(String topic, Function<T, Publisher<R>> handler) {
        this.root.append(topic).subscribe(handler);
        return this;
    }

    /**
     * Drops every handler subscribed to the node looked up for {@code topic}.
     * Nothing happens when the lookup yields no node.
     *
     * @param topic topic string passed to {@link Topic#getTopic(String)}
     * @return this router, to allow call chaining
     */
    @Override
    public Router<T, R> remove(String topic) {
        this.root.getTopic(topic).ifPresent(Topic::unsubscribeAll);
        return this;
    }

    /**
     * Applies every handler found for {@code topic} to {@code data}.
     *
     * <p>The handlers of all nodes returned by {@link Topic#findTopic(String)}
     * are collected and de-duplicated, so a handler reachable through more than
     * one node is applied only once. The resulting publishers are emitted as-is;
     * this method does not subscribe to them.
     *
     * @param topic topic string used for the lookup
     * @param data  value passed to each handler
     * @return the publishers returned by the handlers; empty when no handler is found
     */
    @Override
    public Flux<Publisher<R>> execute(String topic, T data) {
        return this.root
                .findTopic(topic)
                .flatMapIterable(Topic::getSubscribers)
                // The fallback Mono only logs and then completes without a value,
                // so the resulting Flux stays empty when nothing is subscribed.
                .switchIfEmpty(Mono.fromRunnable(() -> log.debug("not handler for {}", topic)))
                .distinct()
                // apply() is already typed Publisher<R>; no raw-type cast is needed.
                .map(runner -> runner.apply(data));
    }

    /**
     * Releases the router by calling {@link Topic#clean()} on the root of the topic tree.
     */
    @Override
    public void close() {
        this.root.clean();
    }
}

