/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.simulator.core.network.mqtt;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.ReferenceCountUtil;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.MqttConnectionException;
import io.vertx.mqtt.messages.MqttConnAckMessage;
import io.vertx.mqtt.messages.MqttPublishMessage;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import org.jetlinks.core.utils.TopicUtils;
import org.jetlinks.simulator.core.Connection;
import org.jetlinks.simulator.core.ExceptionUtils;
import org.jetlinks.simulator.core.Global;
import org.jetlinks.simulator.core.network.AbstractConnection;
import org.jetlinks.simulator.core.network.Address;
import org.jetlinks.simulator.core.network.AddressManager;
import org.jetlinks.simulator.core.network.NetworkType;
import org.jetlinks.simulator.core.network.NetworkUtils;
import org.jetlinks.simulator.core.network.mqtt.MqttOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

public class MqttClient
extends AbstractConnection {
    private static final Logger log = LoggerFactory.getLogger(MqttClient.class);
    io.vertx.mqtt.MqttClient client;
    Address address;
    private List<Consumer<MqttPublishMessage>> handlers;
    private final Map<Tuple2<String, Integer>, Subscriber> subscribers = new ConcurrentHashMap<Tuple2<String, Integer>, Subscriber>();

    private MqttClient(io.vertx.mqtt.MqttClient client, Address address) {
        this.client = client;
        this.address = address;
        this.client.publishHandler(msg -> {
            this.received(msg.payload().length());
            if (this.handlers != null) {
                for (Consumer<MqttPublishMessage> handler : this.handlers) {
                    try {
                        handler.accept((MqttPublishMessage)msg);
                    }
                    catch (Throwable error) {
                        log.warn("handle mqtt message {} {} error:{}", msg.topicName(), msg.payload().toString(), ExceptionUtils.getErrorMessage(error));
                    }
                }
            }
        }).closeHandler(e -> this.dispose());
    }

    public static Mono<MqttClient> connect(InetSocketAddress server, MqttOptions options) {
        return MqttClient.connect(Global.vertx(), server, options);
    }

    public static Mono<MqttClient> connect(Vertx vertx, InetSocketAddress server, MqttOptions options) {
        Address localAddress = AddressManager.global().takeAddress(options.getLocalAddress());
        return Mono.create(sink -> {
            MqttOptions clientOptions = options.copy();
            clientOptions.setClientId(options.getClientId());
            clientOptions.setUsername(options.getUsername());
            clientOptions.setPassword(options.getPassword());
            clientOptions.setLocalAddress(localAddress.getAddress().getHostAddress());
            clientOptions.setAutoKeepAlive(true);
            clientOptions.setTcpKeepAlive(true);
            clientOptions.setMaxMessageSize(0x100000);
            clientOptions.setReusePort(true);
            io.vertx.mqtt.MqttClient client = io.vertx.mqtt.MqttClient.create(vertx, clientOptions);
            client.connect(server.getPort(), server.getHostString(), res -> {
                if (res.failed()) {
                    sink.error(res.cause());
                    return;
                }
                MqttConnAckMessage msg = (MqttConnAckMessage)res.result();
                if (msg != null) {
                    if (msg.code() == MqttConnectReturnCode.CONNECTION_ACCEPTED) {
                        MqttClient mqttClient = new MqttClient(client, localAddress);
                        mqttClient.attribute("clientId", options.getClientId());
                        mqttClient.attribute("username", options.getUsername());
                        mqttClient.changeState(Connection.State.connected);
                        sink.success(mqttClient);
                    } else {
                        sink.error(new MqttConnectionException(msg.code()));
                    }
                }
                sink.success();
            });
        }).doOnError(err -> localAddress.release());
    }

    @Override
    public String getId() {
        return this.client.clientId();
    }

    @Override
    public NetworkType getType() {
        return NetworkType.mqtt_client;
    }

    @Override
    public boolean isAlive() {
        return this.client.isConnected();
    }

    public synchronized Disposable handle(Consumer<MqttPublishMessage> handler) {
        if (this.handlers == null) {
            this.handlers = new ArrayList<Consumer<MqttPublishMessage>>();
        }
        this.handlers.add(handler);
        return () -> this.handlers.remove(handler);
    }

    public void unsubscribe(String topic) {
        for (Subscriber value : this.subscribers.values()) {
            if (!value.topic.equals(topic)) continue;
            value.dispose();
        }
    }

    public Disposable subscribe(String topic, int qoS, Consumer<MqttPublishMessage> handler) {
        Subscriber subscriber = this.subscribers.computeIfAbsent(Tuples.of(topic, qoS), tp2 -> new Subscriber((String)tp2.getT1(), (Integer)tp2.getT2()));
        return subscriber.addHandler(handler);
    }

    public void publish(String topic, int qos, Object payload) {
        this.publishAsync(topic, qos, payload).doOnError(err -> log.warn("publish error {} {} {}", topic, payload, ExceptionUtils.getErrorMessage(err))).subscribe();
    }

    public Mono<Void> publishAsync(String topic, int qos, Object payload) {
        return this.publishAsync(topic, qos, NetworkUtils.castToByteBuf(payload));
    }

    public Mono<Void> publishAsync(String topic, int qos, ByteBuf payload) {
        Buffer buffer = Buffer.buffer(payload);
        int len = buffer.length();
        return Mono.create(sink -> this.client.publish(topic, buffer, MqttQoS.valueOf(qos), false, false, res -> {
            ReferenceCountUtil.safeRelease(payload);
            this.sent(len);
            if (res.failed()) {
                sink.error(res.cause());
            } else {
                sink.success();
            }
        })).doOnError(this::error);
    }

    public List<Subscriber> getSubscriptions() {
        return new ArrayList<Subscriber>(this.subscribers.values());
    }

    @Override
    protected void doDisposed() {
        this.address.release();
        super.doDisposed();
        if (this.client.isConnected()) {
            this.client.disconnect();
        }
    }

    @Override
    public void reset() {
        super.reset();
        this.getSubscriptions().forEach(Disposable::dispose);
        this.subscribers.clear();
    }

    public class Subscriber
    implements Disposable,
    Consumer<MqttPublishMessage> {
        private final String topic;
        private final int qos;
        private final List<Consumer<MqttPublishMessage>> handlers = new CopyOnWriteArrayList<Consumer<MqttPublishMessage>>();
        private final Disposable disposable;

        public Subscriber(String topic, int qos) {
            this.topic = topic;
            this.qos = qos;
            this.disposable = MqttClient.this.handle(this);
        }

        private Disposable addHandler(Consumer<MqttPublishMessage> handler) {
            if (this.handlers.isEmpty()) {
                MqttClient.this.client.subscribe(this.topic, this.qos);
            }
            this.handlers.add(handler);
            return () -> {
                this.handlers.remove(handler);
                this.tryDispose();
            };
        }

        private void tryDispose() {
            if (this.handlers.isEmpty()) {
                this.dispose();
            }
        }

        @Override
        public void dispose() {
            MqttClient.this.client.unsubscribe(this.topic);
            MqttClient.this.subscribers.clear();
            this.disposable.dispose();
            MqttClient.this.subscribers.remove(Tuples.of(this.topic, this.qos), this);
        }

        @Override
        public void accept(MqttPublishMessage msg) {
            if (TopicUtils.match(this.topic, msg.topicName())) {
                for (Consumer<MqttPublishMessage> handler : this.handlers) {
                    handler.accept(msg);
                }
            }
        }

        public String getTopic() {
            return this.topic;
        }

        public int getQos() {
            return this.qos;
        }
    }
}

