/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.core.device;

import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.jetlinks.core.device.DeviceOperationBroker;
import org.jetlinks.core.device.DeviceStateInfo;
import org.jetlinks.core.device.ReplyFailureHandler;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.BroadcastMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.server.MessageHandler;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class StandaloneDeviceMessageBroker
implements DeviceOperationBroker,
MessageHandler {
    private static final Logger log = LoggerFactory.getLogger(StandaloneDeviceMessageBroker.class);
    private final Sinks.Many<Message> messageEmitterProcessor;
    private final Map<String, Sinks.Many<DeviceMessageReply>> replyProcessor = new ConcurrentHashMap<String, Sinks.Many<DeviceMessageReply>>();
    private final Map<String, AtomicInteger> partCache = new ConcurrentHashMap<String, AtomicInteger>();
    private ReplyFailureHandler replyFailureHandler = (error, message) -> log.warn("unhandled reply message:{}", (Object)message, (Object)error);
    private Function<Publisher<String>, Flux<DeviceStateInfo>> stateHandler;

    public StandaloneDeviceMessageBroker() {
        this(Sinks.many().multicast().onBackpressureBuffer());
    }

    public StandaloneDeviceMessageBroker(Sinks.Many<Message> processor) {
        this.messageEmitterProcessor = processor;
    }

    @Override
    public Flux<Message> handleSendToDeviceMessage(String serverId) {
        return this.messageEmitterProcessor.asFlux();
    }

    @Override
    public Disposable handleGetDeviceState(String serverId, Function<Publisher<String>, Flux<DeviceStateInfo>> stateMapper) {
        this.stateHandler = stateMapper;
        return () -> {
            this.stateHandler = null;
        };
    }

    @Override
    public Flux<DeviceStateInfo> getDeviceState(String serviceId, Collection<String> deviceIdList) {
        if (this.stateHandler != null) {
            return this.stateHandler.apply(Flux.fromIterable(deviceIdList));
        }
        return Flux.empty();
    }

    @Override
    public Mono<Boolean> reply(DeviceMessageReply message) {
        return Mono.defer(() -> {
            String messageId = message.getMessageId();
            if (StringUtils.isEmpty(messageId)) {
                log.warn("reply message messageId is empty: {}", (Object)message);
                return Mono.just(false);
            }
            String partMsgId = message.getHeader(Headers.fragmentBodyMessageId).orElse(null);
            if (partMsgId != null) {
                Sinks.Many<DeviceMessageReply> processor = this.replyProcessor.getOrDefault(partMsgId, this.replyProcessor.get(messageId));
                if (processor == null || processor.currentSubscriberCount() == 0) {
                    this.replyFailureHandler.handle(new NullPointerException("no reply handler"), message);
                    this.replyProcessor.remove(partMsgId);
                    return Mono.just(false);
                }
                int partTotal = message.getHeader(Headers.fragmentNumber).orElse(1);
                AtomicInteger counter = this.partCache.computeIfAbsent(partMsgId, ignore -> new AtomicInteger(partTotal));
                processor.emitNext(message, Sinks.EmitFailureHandler.FAIL_FAST);
                if (counter.decrementAndGet() <= 0) {
                    processor.tryEmitComplete();
                    this.replyProcessor.remove(partMsgId);
                }
                return Mono.just(true);
            }
            Sinks.Many<DeviceMessageReply> processor = this.replyProcessor.get(messageId);
            Sinks.EmitResult result = processor.tryEmitNext(message);
            if (result.isFailure()) {
                this.replyProcessor.remove(messageId);
                this.replyFailureHandler.handle(new NullPointerException("no reply handler " + result.name()), message);
                return Mono.just(false);
            }
            processor.tryEmitComplete();
            return Mono.just(true);
        }).doOnError(err -> this.replyFailureHandler.handle((Throwable)err, message));
    }

    @Override
    public Flux<DeviceMessageReply> handleReply(String deviceId, String messageId, Duration timeout) {
        return this.replyProcessor.computeIfAbsent(messageId, ignore -> Sinks.many().multicast().onBackpressureBuffer()).asFlux().timeout(timeout, Mono.error(() -> new DeviceOperationException(ErrorCode.TIME_OUT))).doFinally(signal -> this.replyProcessor.remove(messageId));
    }

    @Override
    public Mono<Integer> send(String serverId, Publisher<? extends Message> message) {
        if (this.messageEmitterProcessor.currentSubscriberCount() == 0) {
            return Mono.just(0);
        }
        return Flux.from(message).doOnNext(this.messageEmitterProcessor::tryEmitNext).then(Mono.just(Long.valueOf(this.messageEmitterProcessor.currentSubscriberCount()).intValue()));
    }

    @Override
    public Mono<Integer> send(Publisher<? extends BroadcastMessage> message) {
        return Mono.just(0);
    }

    public void setReplyFailureHandler(ReplyFailureHandler replyFailureHandler) {
        this.replyFailureHandler = replyFailureHandler;
    }
}

