/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.simulator.cmd.mqtt;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.vertx.mqtt.messages.MqttPublishMessage;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.jetlinks.simulator.cmd.AbstractCommand;
import org.jetlinks.simulator.cmd.CommonCommand;
import org.jetlinks.simulator.cmd.ConnectionAttachCommand;
import org.jetlinks.simulator.cmd.mqtt.MqttPublishCommand;
import org.jetlinks.simulator.core.ExceptionUtils;
import org.jetlinks.simulator.core.network.mqtt.MqttClient;
import org.jline.utils.AttributedString;
import org.joda.time.DateTime;
import picocli.CommandLine;

/**
 * Interactive "attach" view for a single MQTT client connection.
 *
 * <p>On initialisation the command registers {@link #appendMessage} as a message handler on the
 * connection's {@link MqttClient}; each received publish message is rendered as a timestamped
 * header line followed by the payload (UTF-8 text when possible, otherwise a hex dump). While
 * attached, the nested {@link AttachCommands} exposes {@code publish}, {@code subscribe},
 * {@code unsubscribe} and {@code disconnect} sub-commands operating on the same connection.
 */
@CommandLine.Command(name="subscribe", aliases={"attach"}, description={"Subscribe mqtt client topic"}, hidden=false)
public class MqttAttachCommand
extends ConnectionAttachCommand {
    /** Maximum number of received messages kept for display; the oldest entry is dropped beyond this. */
    private static final int MAX_MESSAGES = 50;

    /** Upper bound on how long the {@code publish} sub-command waits for the publish to complete. */
    private static final Duration PUBLISH_TIMEOUT = Duration.ofSeconds(10L);

    /**
     * Sets the id of the client to attach to. Overridden only to attach the picocli parameter
     * metadata (label and completion candidates) to the inherited setter.
     *
     * @param id the client id given on the command line
     */
    @Override
    @CommandLine.Parameters(paramLabel="clientId", completionCandidates=MqttPublishCommand.IdComplete.class)
    public void setId(String id) {
        super.setId(id);
    }

    /**
     * Registers {@link #appendMessage} as a handler on the MQTT client and records the returned
     * handle in {@code disposable} (presumably so it is released when the command is destroyed --
     * confirm against the base class).
     */
    @Override
    protected void doInit() {
        MqttClient client = this.connection.unwrap(MqttClient.class);
        this.disposable.add(client.handle(this::appendMessage));
    }

    /** Delegates to the base class; no MQTT-specific teardown is performed here. */
    @Override
    protected void doDestroy() {
        super.doDestroy();
    }

    /**
     * Appends a "Subscriptions:" line listing every current subscription of the client (topic and
     * QoS) after the header lines produced by the base class.
     *
     * @param lines the header lines to append to
     */
    @Override
    protected void createHeader(List<AttributedString> lines) {
        super.createHeader(lines);
        lines.add(MqttAttachCommand.createLine(builder -> {
            builder.append("Subscriptions: ");
            for (MqttClient.Subscriber subscription : this.connection.unwrap(MqttClient.class).getSubscriptions()) {
                builder.append(subscription.getTopic(), green).append("(QoS " + subscription.getQos() + ") ", blue);
            }
        }));
    }

    /**
     * Creates the sub-command container used while attached.
     *
     * @return a new {@link AttachCommands} bound to this instance
     */
    @Override
    protected AbstractCommand createCommand() {
        return new AttachCommands();
    }

    /**
     * Renders a received publish message and appends it to {@code messages}, evicting the oldest
     * entry once more than {@link #MAX_MESSAGES} are held.
     *
     * @param message the received MQTT publish message
     */
    private void appendMessage(MqttPublishMessage message) {
        List<AttributedString> msgLine = new ArrayList<>();
        // Header line: time, topic and QoS level of the message.
        msgLine.add(MqttAttachCommand.createLine(builder -> builder.append(new DateTime().toString("HH:mm:ss"), red).append(" ").append(message.topicName(), green).append(" ").append("(QoS " + message.qosLevel().value() + ") ", blue)));
        ByteBuf byteBuf = message.payload().getByteBuf();
        String str = ByteBufUtil.isText(byteBuf, StandardCharsets.UTF_8) ? byteBuf.toString(StandardCharsets.UTF_8) : ByteBufUtil.prettyHexDump(byteBuf);
        // Accept both LF and CRLF so that no stray '\r' ends up in a rendered line; the hex dump
        // uses the platform line separator and text payloads may carry either convention.
        for (String n : str.split("\\r?\\n")) {
            msgLine.add(MqttAttachCommand.createLine(builder -> builder.append(n, green)));
        }
        this.messages.add(msgLine);
        if (this.messages.size() > MAX_MESSAGES) {
            this.messages.removeFirst();
        }
    }

    /** Sub-command that disposes the attached connection. */
    @CommandLine.Command(name="disconnect", description={"Disconnect mqtt"})
    static class Disconnect
    extends CommonCommand {
        Disconnect() {
        }

        @Override
        public void run() {
            ((AttachCommands)this.getParent()).disconnect();
        }
    }

    /** Sub-command that unsubscribes the attached client from one or more topics. */
    @CommandLine.Command(name="unsubscribe", description={"Unsubscribe mqtt message"})
    static class Unsubscribe
    extends CommonCommand {
        // At least one topic is required; without an explicit arity picocli would leave the
        // array null when no positional argument is supplied.
        @CommandLine.Parameters(arity="1..*", description={"mqtt topic"})
        String[] topics;

        Unsubscribe() {
        }

        @Override
        public void run() {
            ((AttachCommands)this.getParent()).unsubscribe(this);
        }
    }

    /** Sub-command that subscribes the attached client to one or more topics at a given QoS. */
    @CommandLine.Command(name="subscribe", description={"Subscribe mqtt message"})
    static class Subscribe
    extends CommonCommand {
        // At least one topic is required; without an explicit arity picocli would leave the
        // array null when no positional argument is supplied.
        @CommandLine.Parameters(arity="1..*", description={"mqtt topic"})
        String[] topics;
        @CommandLine.Option(names={"-q", "--qos"}, description={"QoS Level"}, defaultValue="0")
        int qos;

        Subscribe() {
        }

        @Override
        public void run() {
            ((AttachCommands)this.getParent()).subscribe(this);
        }
    }

    /** Sub-command that publishes a single message through the attached client. */
    @CommandLine.Command(name="publish", description={"Publish mqtt message"})
    static class Publish
    extends CommonCommand {
        @CommandLine.Option(names={"-t", "--topic"}, required=true, description={"mqtt topic"})
        String topic;
        @CommandLine.Option(names={"-q", "--qos"}, description={"QoS Level"}, defaultValue="0")
        int qos;
        // The description reads "a leading 0x means hexadecimal"; it is user-facing text and is
        // kept exactly as is.
        @CommandLine.Parameters(arity="1", description={"0x\u5f00\u5934\u4e3a16\u8fdb\u5236"})
        String payload;

        Publish() {
        }

        @Override
        public void run() {
            ((AttachCommands)this.getParent()).publish(this);
        }
    }

    /**
     * Container for the sub-commands available while attached. It is an inner (non-static) class
     * because every operation acts on the enclosing command's {@code connection}.
     */
    @CommandLine.Command(name="", subcommands={Publish.class, Subscribe.class, Unsubscribe.class, Disconnect.class}, customSynopsis={""}, synopsisHeading="")
    class AttachCommands
    extends CommonCommand {
        AttachCommands() {
        }

        /**
         * Publishes the message described by {@code publish} and waits up to
         * {@link MqttAttachCommand#PUBLISH_TIMEOUT} for completion, printing the outcome.
         *
         * @param publish the parsed publish sub-command (topic, QoS, payload)
         */
        void publish(Publish publish) {
            MqttClient client = MqttAttachCommand.this.connection.unwrap(MqttClient.class);
            try {
                this.printf("publishing ", new Object[0]);
                client.publishAsync(publish.topic, publish.qos, publish.payload).block(PUBLISH_TIMEOUT);
                this.printf("success!%n", new Object[0]);
            }
            catch (Throwable e) {
                // Best-effort interactive command: report any failure instead of propagating it.
                this.printfError("error:%s%n", ExceptionUtils.getErrorMessage(e));
            }
        }

        /**
         * Unsubscribes the attached client from every topic listed in {@code unsubscribe}.
         *
         * @param unsubscribe the parsed unsubscribe sub-command
         */
        void unsubscribe(Unsubscribe unsubscribe) {
            if (unsubscribe.topics == null) {
                // Nothing to do; guards against direct invocation without parsed parameters.
                return;
            }
            MqttClient client = MqttAttachCommand.this.connection.unwrap(MqttClient.class);
            for (String topic : unsubscribe.topics) {
                client.unsubscribe(topic);
            }
        }

        /**
         * Subscribes the attached client to every topic listed in {@code subscribe} at the
         * requested QoS. The per-subscription callback is a no-op because received messages are
         * already rendered by the handler registered in {@code doInit}.
         *
         * @param subscribe the parsed subscribe sub-command
         */
        void subscribe(Subscribe subscribe) {
            if (subscribe.topics == null) {
                // Nothing to do; guards against direct invocation without parsed parameters.
                return;
            }
            MqttClient client = MqttAttachCommand.this.connection.unwrap(MqttClient.class);
            for (String topic : subscribe.topics) {
                client.subscribe(topic, subscribe.qos, ignore -> {});
            }
        }

        /** Disposes the attached connection. */
        void disconnect() {
            MqttAttachCommand.this.connection.dispose();
        }
    }
}

