/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.mqtt.impl;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.MessageToMessageEncoder;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.ReferenceCountUtil;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.impl.NetSocketInternal;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
import io.vertx.mqtt.impl.MqttServerConnection;
import java.util.List;

public class MqttServerImpl
implements MqttServer {
    private static final Logger log = LoggerFactory.getLogger(MqttServerImpl.class);
    private final VertxInternal vertx;
    private final NetServer server;
    private Handler<MqttEndpoint> endpointHandler;
    private Handler<Throwable> exceptionHandler;
    private MqttServerOptions options;

    public MqttServerImpl(Vertx vertx, MqttServerOptions options) {
        this.vertx = (VertxInternal)vertx;
        this.server = vertx.createNetServer(options);
        this.options = options;
    }

    @Override
    public Future<MqttServer> listen() {
        return this.listen(this.options.getPort());
    }

    @Override
    public Future<MqttServer> listen(int port, String host) {
        Handler<MqttEndpoint> h1 = this.endpointHandler;
        Handler<Throwable> h2 = this.exceptionHandler;
        if (h1 == null) {
            return this.vertx.getOrCreateContext().failedFuture(new IllegalStateException("Please set handler before server is listening"));
        }
        this.server.connectHandler(so -> {
            NetSocketInternal soi = (NetSocketInternal)so;
            ChannelPipeline pipeline = soi.channelHandlerContext().pipeline();
            this.initChannel(pipeline);
            MqttServerConnection conn = new MqttServerConnection(soi, h1, h2, this.options);
            soi.eventHandler(evt -> {
                if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
                    MqttServerConnection mqttServerConnection = conn;
                    synchronized (mqttServerConnection) {
                        conn.handleHandshakeComplete((WebSocketServerProtocolHandler.HandshakeComplete)evt);
                    }
                }
                ReferenceCountUtil.release(evt);
            });
            soi.messageHandler(msg -> {
                MqttServerConnection mqttServerConnection = conn;
                synchronized (mqttServerConnection) {
                    conn.handleMessage(msg);
                }
            });
        });
        return this.server.listen(port, host).map(this);
    }

    @Override
    public Future<MqttServer> listen(int port) {
        return this.listen(port, this.options.getHost());
    }

    @Override
    public MqttServer listen(int port, Handler<AsyncResult<MqttServer>> listenHandler) {
        return this.listen(port, this.options.getHost(), listenHandler);
    }

    @Override
    public MqttServer listen(Handler<AsyncResult<MqttServer>> listenHandler) {
        return this.listen(this.options.getPort(), listenHandler);
    }

    @Override
    public MqttServer listen(int port, String host, Handler<AsyncResult<MqttServer>> listenHandler) {
        Future<MqttServer> fut = this.listen(port, host);
        if (listenHandler != null) {
            fut.onComplete(listenHandler);
        }
        return this;
    }

    @Override
    public synchronized MqttServer endpointHandler(Handler<MqttEndpoint> handler) {
        this.endpointHandler = handler;
        return this;
    }

    @Override
    public synchronized MqttServer exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    @Override
    public int actualPort() {
        return this.server.actualPort();
    }

    @Override
    public Future<Void> close() {
        return this.server.close();
    }

    @Override
    public void close(Handler<AsyncResult<Void>> completionHandler) {
        this.server.close(completionHandler);
    }

    private void initChannel(ChannelPipeline pipeline) {
        pipeline.addBefore("handler", "mqttEncoder", MqttEncoder.INSTANCE);
        if (this.options.getMaxMessageSize() > 0) {
            pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder(this.options.getMaxMessageSize()));
        } else {
            pipeline.addBefore("handler", "mqttDecoder", new MqttDecoder());
        }
        pipeline.addBefore("handler", "idle", new IdleStateHandler(this.options.timeoutOnConnect(), 0, 0));
        pipeline.addBefore("handler", "timeoutOnConnect", new ChannelDuplexHandler(){

            @Override
            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
                IdleStateEvent e;
                if (evt instanceof IdleStateEvent && (e = (IdleStateEvent)evt).state() == IdleState.READER_IDLE) {
                    ctx.channel().close();
                }
                super.userEventTriggered(ctx, evt);
            }
        });
        if (this.options.isUseWebSocket()) {
            int maxFrameSize = this.options.getWebSocketMaxFrameSize();
            pipeline.addBefore("mqttEncoder", "httpServerCodec", new HttpServerCodec());
            pipeline.addAfter("httpServerCodec", "aggregator", new HttpObjectAggregator(maxFrameSize));
            pipeline.addAfter("aggregator", "webSocketHandler", new WebSocketServerProtocolHandler("/mqtt", "mqtt, mqttv3.1, mqttv3.1.1", false, maxFrameSize, 10000L));
            pipeline.addAfter("webSocketHandler", "bytebuf2wsEncoder", new ByteBufToWebSocketFrameEncoder());
            pipeline.addAfter("bytebuf2wsEncoder", "ws2bytebufDecoder", new WebSocketFrameToByteBufDecoder());
        }
    }

    static class ByteBufToWebSocketFrameEncoder
    extends MessageToMessageEncoder<ByteBuf> {
        ByteBufToWebSocketFrameEncoder() {
        }

        @Override
        protected void encode(ChannelHandlerContext chc, ByteBuf bb, List<Object> out) throws Exception {
            BinaryWebSocketFrame result = new BinaryWebSocketFrame();
            result.content().writeBytes(bb);
            out.add(result);
        }
    }

    static class WebSocketFrameToByteBufDecoder
    extends MessageToMessageDecoder<BinaryWebSocketFrame> {
        WebSocketFrameToByteBufDecoder() {
        }

        @Override
        protected void decode(ChannelHandlerContext chc, BinaryWebSocketFrame frame, List<Object> out) throws Exception {
            ByteBuf bb = frame.content();
            bb.retain();
            out.add(bb);
        }
    }
}

