/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.mqtt.impl;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageFactory;
import io.netty.handler.codec.mqtt.MqttMessageIdAndPropertiesVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPubReplyMessageVariableHeader;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttReasonCodeAndPropertiesVariableHeader;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttUnsubAckPayload;
import io.netty.handler.codec.mqtt.MqttVersion;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.net.impl.NetSocketInternal;
import io.vertx.mqtt.MqttAuth;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttTopicSubscription;
import io.vertx.mqtt.MqttWill;
import io.vertx.mqtt.messages.MqttDisconnectMessage;
import io.vertx.mqtt.messages.MqttPubAckMessage;
import io.vertx.mqtt.messages.MqttPubCompMessage;
import io.vertx.mqtt.messages.MqttPubRecMessage;
import io.vertx.mqtt.messages.MqttPubRelMessage;
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mqtt.messages.MqttSubscribeMessage;
import io.vertx.mqtt.messages.MqttUnsubscribeMessage;
import io.vertx.mqtt.messages.codes.MqttDisconnectReasonCode;
import io.vertx.mqtt.messages.codes.MqttPubAckReasonCode;
import io.vertx.mqtt.messages.codes.MqttPubCompReasonCode;
import io.vertx.mqtt.messages.codes.MqttPubRecReasonCode;
import io.vertx.mqtt.messages.codes.MqttPubRelReasonCode;
import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode;
import io.vertx.mqtt.messages.codes.MqttUnsubAckReasonCode;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import javax.net.ssl.SSLSession;

/**
 * {@link MqttEndpoint} implementation that exchanges MQTT messages with the remote peer
 * over a {@link NetSocketInternal}.
 *
 * <p>Handler registration, inbound dispatch (the package-private {@code handleXxx} methods)
 * and outbound writes all synchronize on the underlying connection object.
 */
public class MqttEndpointImpl
implements MqttEndpoint {
    /** Largest packet identifier accepted by {@code publish} and produced by {@link #nextMessageId()}. */
    private static final int MAX_MESSAGE_ID = 65535;
    private static final Logger log = LoggerFactory.getLogger(MqttEndpointImpl.class);
    /** Underlying connection; also used as the monitor guarding mutable endpoint state. */
    private final NetSocketInternal conn;
    private String clientIdentifier;
    private final MqttAuth auth;
    private final MqttWill will;
    private final boolean isCleanSession;
    private final int protocolVersion;
    private final String protocolName;
    private final int keepAliveTimeoutSeconds;
    private final MqttProperties connectProperties;
    private final MultiMap httpHeaders;
    private final String httpRequestUri;
    private Handler<MqttSubscribeMessage> subscribeHandler;
    private Handler<MqttUnsubscribeMessage> unsubscribeHandler;
    private Handler<MqttPublishMessage> publishHandler;
    private Handler<Integer> pubackHandler;
    private Handler<MqttPubAckMessage> pubackHandlerWithMessage;
    private Handler<Integer> pubrecHandler;
    private Handler<MqttPubRecMessage> pubrecHandlerWithMessage;
    private Handler<Integer> pubrelHandler;
    private Handler<MqttPubRelMessage> pubrelHandlerWithMessage;
    private Handler<Integer> pubcompHandler;
    private Handler<MqttPubCompMessage> pubcompHandlerWithMessage;
    private Handler<Void> disconnectHandler;
    private Handler<MqttDisconnectMessage> disconnectHandlerWithMessage;
    private Handler<Void> pingreqHandler;
    private Handler<Void> closeHandler;
    private Handler<Throwable> exceptionHandler;
    /** Set once a CONNACK with {@code CONNECTION_ACCEPTED} has been written; cleared on cleanup. */
    private boolean isConnected;
    private boolean isClosed;
    /** Last identifier handed out by {@link #nextMessageId()}; 0 until the first one is generated. */
    private int messageIdCounter;
    private boolean isSubscriptionAutoAck;
    private boolean isPublishAutoAck;
    private boolean isAutoKeepAlive = true;

    /**
     * Creates an endpoint over the given connection; all arguments are stored as-is.
     *
     * @param conn                    connection used to write messages and as the synchronization monitor
     * @param clientIdentifier        client identifier (may later be replaced via {@link #setClientIdentifier})
     * @param auth                    authentication information
     * @param will                    will information
     * @param isCleanSession          clean-session flag
     * @param protocolVersion         MQTT protocol level
     * @param protocolName            MQTT protocol name
     * @param keepAliveTimeoutSeconds keep-alive timeout in seconds
     * @param connectProperties       properties associated with the connect request
     * @param httpHeaders             HTTP headers returned by {@link #httpHeaders()}
     * @param httpRequestUri          HTTP request URI returned by {@link #httpRequestURI()}
     */
    public MqttEndpointImpl(NetSocketInternal conn, String clientIdentifier, MqttAuth auth, MqttWill will, boolean isCleanSession, int protocolVersion, String protocolName, int keepAliveTimeoutSeconds, MqttProperties connectProperties, MultiMap httpHeaders, String httpRequestUri) {
        this.conn = conn;
        this.clientIdentifier = clientIdentifier;
        this.auth = auth;
        this.will = will;
        this.isCleanSession = isCleanSession;
        this.protocolVersion = protocolVersion;
        this.protocolName = protocolName;
        this.keepAliveTimeoutSeconds = keepAliveTimeoutSeconds;
        this.connectProperties = connectProperties;
        this.httpHeaders = httpHeaders;
        this.httpRequestUri = httpRequestUri;
    }

    /** @return the current client identifier */
    @Override
    public String clientIdentifier() {
        return this.clientIdentifier;
    }

    /** @return the authentication information given at construction */
    @Override
    public MqttAuth auth() {
        return this.auth;
    }

    /** @return the will information given at construction */
    @Override
    public MqttWill will() {
        return this.will;
    }

    /** @return the clean-session flag given at construction */
    @Override
    public boolean isCleanSession() {
        return this.isCleanSession;
    }

    /** @return the protocol level given at construction */
    @Override
    public int protocolVersion() {
        return this.protocolVersion;
    }

    /** @return the protocol name given at construction */
    @Override
    public String protocolName() {
        return this.protocolName;
    }

    /** @return the keep-alive timeout, in seconds, given at construction */
    @Override
    public int keepAliveTimeSeconds() {
        return this.keepAliveTimeoutSeconds;
    }

    /** @return the most recently generated message identifier, or 0 if none was generated yet */
    @Override
    public int lastMessageId() {
        return this.messageIdCounter;
    }

    /** Enables or disables automatic SUBACK/UNSUBACK replies in the subscribe/unsubscribe dispatchers. */
    @Override
    public void subscriptionAutoAck(boolean isSubscriptionAutoAck) {
        this.isSubscriptionAutoAck = isSubscriptionAutoAck;
    }

    /** @return whether automatic subscribe/unsubscribe acknowledgement is enabled */
    @Override
    public boolean isSubscriptionAutoAck() {
        return this.isSubscriptionAutoAck;
    }

    /** Enables or disables the automatic PUBACK/PUBREC/PUBREL/PUBCOMP replies sent by the dispatchers. */
    @Override
    public MqttEndpoint publishAutoAck(boolean isPublishAutoAck) {
        this.isPublishAutoAck = isPublishAutoAck;
        return this;
    }

    /** @return whether automatic publish acknowledgement is enabled */
    @Override
    public boolean isPublishAutoAck() {
        return this.isPublishAutoAck;
    }

    /** Enables or disables the automatic PINGRESP sent by {@link #handlePingreq()} (enabled by default). */
    @Override
    public MqttEndpoint autoKeepAlive(boolean isAutoKeepAlive) {
        this.isAutoKeepAlive = isAutoKeepAlive;
        return this;
    }

    /** @return whether automatic keep-alive replies are enabled */
    @Override
    public boolean isAutoKeepAlive() {
        return this.isAutoKeepAlive;
    }

    /** @return {@code true} once the connection was accepted and until the endpoint is cleaned up */
    @Override
    public boolean isConnected() {
        synchronized (this.conn) {
            return this.isConnected;
        }
    }

    /** @return the connect properties given at construction */
    @Override
    public MqttProperties connectProperties() {
        return this.connectProperties;
    }

    /** Replaces the client identifier. */
    @Override
    public MqttEndpoint setClientIdentifier(String clientIdentifier) {
        synchronized (this.conn) {
            this.clientIdentifier = clientIdentifier;
        }
        return this;
    }

    /**
     * Sets the handler invoked (with {@code null}) by {@link #handleDisconnect}.
     *
     * @throws IllegalStateException if the endpoint is closed
     */
    @Override
    public MqttEndpointImpl disconnectHandler(Handler<Void> handler) {
        synchronized (this.conn) {
            this.checkClosed();
            this.disconnectHandler = handler;
            return this;
        }
    }

    /**
     * Sets the handler that receives the {@link MqttDisconnectMessage} built by {@link #handleDisconnect}.
     *
     * @throws IllegalStateException if the endpoint is closed
     */
    @Override
    public MqttEndpointImpl disconnectMessageHandler(Handler<MqttDisconnectMessage> handler) {
        synchronized (this.conn) {
            this.checkClosed();
            this.disconnectHandlerWithMessage = handler;
            return this;
        }
    }

    /**
     * Sets the handler invoked by {@link #handleSubscribe}.
     *
     * @throws IllegalStateException if the endpoint is closed
     */
    @Override
    public MqttEndpointImpl subscribeHandler(Handler<MqttSubscribeMessage> handler) {
        synchronized (this.conn) {
            this.checkClosed();
            this.subscribeHandler = handler;
            return this;
        }
    }

    /**
     * Sets the handler invoked by {@link #handleUnsubscribe}.
     *
     * @throws IllegalStateException if the endpoint is closed
     */
    @Override
    public MqttEndpointImpl unsubscribeHandler(Handler<MqttUnsubscribeMessage> handler) {
        synchronized (this.conn) {
            this.checkClosed();
            this.unsubscribeHandler = handler;
            return this;
        }
    }

    /**
     * Sets the handler invoked by {@link #handlePublish}.
     *
     * @throws IllegalStateException if the endpoint is closed
     */
    @Override
    public MqttEndpointImpl publishHandler(Handler<MqttPublishMessage> handler) {
        synchronized (this.conn) {
            this.checkClosed();
            this.publishHandler = handler;
            return this;
        }
    }

    /**
     * Sets the handler that receives the message id passed to {@link #handlePuback}.
     *
     * @throws IllegalStateException if the endpoint is closed
     */
    @Override
    public MqttEndpointImpl publishAcknowledgeHandler(Handler<Integer> handler) {
        synchronized (this.conn) {
            this.checkClosed();
            this.pubackHandler = handler;
            return this;
        }
    }

    /**
     * Sets the handler that receives the {@link MqttPubAckMessage} built by {@link #handlePuback}.
     *
     * @throws IllegalStateException if the endpoint is closed
     */
    @Override
    public MqttEndpointImpl publishAcknowledgeMessageHandler(Handler<MqttPubAckMessage> handler) {
        synchronized (this.conn) {
            this.checkClosed();
            this.pubackHandlerWithMessage = handler;
            return this;
        }
    }

    /**
     * Sets the handler that receives the message id passed to {@link #handlePubrec}.
     *
     * @throws IllegalStateException if the endpoint is closed
     */
    @Override
    public MqttEndpointImpl publishReceivedHandler(Handler<Integer> handler) {
        synchronized (this.conn) {
            this.checkClosed();
            this.pubrecHandler = handler;
            return this;
        }
    }

    /**
     * Sets the handler that receives the {@link MqttPubRecMessage} built by {@link #handlePubrec}.
     *
     * @throws IllegalStateException if the endpoint is closed
     */
    @Override
    public MqttEndpointImpl publishReceivedMessageHandler(Handler<MqttPubRecMessage> handler) {
        synchronized (this.conn) {
            this.checkClosed();
            this.pubrecHandlerWithMessage = handler;
            return this;
        }
    }

    /**
     * Sets the handler that receives the message id passed to {@link #handlePubrel}.
     *
     * @throws IllegalStateException if the endpoint is closed
     */
    @Override
    public MqttEndpointImpl publishReleaseHandler(Handler<Integer> handler) {
        synchronized (this.conn) {
            this.checkClosed();
            this.pubrelHandler = handler;
            return this;
        }
    }

    /**
     * Sets the handler that receives the {@link MqttPubRelMessage} built by {@link #handlePubrel}.
     *
     * @throws IllegalStateException if the endpoint is closed
     */
    @Override
    public MqttEndpointImpl publishReleaseMessageHandler(Handler<MqttPubRelMessage> handler) {
        synchronized (this.conn) {
            this.checkClosed();
            this.pubrelHandlerWithMessage = handler;
            return this;
        }
    }

    /**
     * Sets the handler that receives the message id passed to {@link #handlePubcomp}.
     *
     * @throws IllegalStateException if the endpoint is closed
     */
    @Override
    public MqttEndpointImpl publishCompletionHandler(Handler<Integer> handler) {
        synchronized (this.conn) {
            this.checkClosed();
            this.pubcompHandler = handler;
            return this;
        }
    }

    /**
     * Sets the handler that receives the {@link MqttPubCompMessage} built by {@link #handlePubcomp}.
     *
     * @throws IllegalStateException if the endpoint is closed
     */
    @Override
    public MqttEndpointImpl publishCompletionMessageHandler(Handler<MqttPubCompMessage> handler) {
        synchronized (this.conn) {
            this.checkClosed();
            this.pubcompHandlerWithMessage = handler;
            return this;
        }
    }

    /**
     * Sets the handler invoked (with {@code null}) by {@link #handlePingreq()}.
     *
     * @throws IllegalStateException if the endpoint is closed
     */
    @Override
    public MqttEndpointImpl pingHandler(Handler<Void> handler) {
        synchronized (this.conn) {
            this.checkClosed();
            this.pingreqHandler = handler;
            return this;
        }
    }

    /**
     * Sets the handler invoked (with {@code null}) by {@link #handleClosed()}.
     *
     * @throws IllegalStateException if the endpoint is closed
     */
    @Override
    public MqttEndpointImpl closeHandler(Handler<Void> handler) {
        synchronized (this.conn) {
            this.checkClosed();
            this.closeHandler = handler;
            return this;
        }
    }

    /**
     * Sets the handler invoked by {@link #handleException}.
     *
     * @throws IllegalStateException if the endpoint is closed
     */
    @Override
    public MqttEndpointImpl exceptionHandler(Handler<Throwable> handler) {
        synchronized (this.conn) {
            this.checkClosed();
            this.exceptionHandler = handler;
            return this;
        }
    }

    /**
     * Writes a CONNACK; on {@code CONNECTION_ACCEPTED} the endpoint becomes connected,
     * for any other return code the endpoint is closed right after the write.
     */
    private MqttEndpointImpl connack(MqttConnectReturnCode returnCode, boolean sessionPresent, MqttProperties properties) {
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttConnAckVariableHeader variableHeader = new MqttConnAckVariableHeader(returnCode, sessionPresent, properties);
        MqttMessage connack = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
        this.write(connack);
        if (returnCode != MqttConnectReturnCode.CONNECTION_ACCEPTED) {
            this.close();
        } else {
            this.isConnected = true;
        }
        return this;
    }

    /** Accepts the connection with {@code sessionPresent = false} and no properties. */
    @Override
    public MqttEndpoint accept() {
        return this.accept(false);
    }

    /** Accepts the connection with the given session-present flag and no properties. */
    @Override
    public MqttEndpointImpl accept(boolean sessionPresent) {
        return this.accept(sessionPresent, MqttProperties.NO_PROPERTIES);
    }

    /**
     * Accepts the connection by writing a CONNACK with {@code CONNECTION_ACCEPTED}.
     *
     * @throws IllegalStateException if the connection was already accepted
     */
    @Override
    public MqttEndpointImpl accept(boolean sessionPresent, MqttProperties properties) {
        synchronized (this.conn) {
            if (this.isConnected) {
                throw new IllegalStateException("Connection already accepted");
            }
            return this.connack(MqttConnectReturnCode.CONNECTION_ACCEPTED, sessionPresent, properties);
        }
    }

    /** Rejects the connection with the given return code and no properties. */
    @Override
    public MqttEndpointImpl reject(MqttConnectReturnCode returnCode) {
        return this.reject(returnCode, MqttProperties.NO_PROPERTIES);
    }

    /**
     * Rejects the connection by writing a CONNACK with the given return code
     * (session-present is always {@code false}) and closing the endpoint.
     *
     * @throws IllegalArgumentException if {@code returnCode} is {@code CONNECTION_ACCEPTED}
     */
    @Override
    public MqttEndpointImpl reject(MqttConnectReturnCode returnCode, MqttProperties properties) {
        synchronized (this.conn) {
            if (returnCode == MqttConnectReturnCode.CONNECTION_ACCEPTED) {
                throw new IllegalArgumentException("Need to use the 'accept' method for accepting connection");
            }
            return this.connack(returnCode, false, properties);
        }
    }

    /** Writes a SUBACK whose payload holds the numeric values of the granted QoS levels; no properties. */
    @Override
    public MqttEndpointImpl subscribeAcknowledge(int subscribeMessageId, List<MqttQoS> grantedQoSLevels) {
        return this.subscribeAcknowledgeWithCode(subscribeMessageId, grantedQoSLevels.stream().mapToInt(MqttQoS::value).toArray(), MqttProperties.NO_PROPERTIES);
    }

    /** Writes a SUBACK whose payload holds the numeric values of the given reason codes. */
    @Override
    public MqttEndpointImpl subscribeAcknowledge(int subscribeMessageId, List<MqttSubAckReasonCode> reasonCodes, MqttProperties properties) {
        return this.subscribeAcknowledgeWithCode(subscribeMessageId, reasonCodes.stream().mapToInt(MqttSubAckReasonCode::value).toArray(), properties);
    }

    /** Builds and writes the SUBACK shared by both public {@code subscribeAcknowledge} overloads. */
    private MqttEndpointImpl subscribeAcknowledgeWithCode(int subscribeMessageId, int[] reasonCodes, MqttProperties properties) {
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttMessageIdAndPropertiesVariableHeader variableHeader = new MqttMessageIdAndPropertiesVariableHeader(subscribeMessageId, properties);
        MqttSubAckPayload payload = new MqttSubAckPayload(reasonCodes);
        MqttMessage suback = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
        this.write(suback);
        return this;
    }

    /** Writes an UNSUBACK with no reason codes and no properties. */
    @Override
    public MqttEndpointImpl unsubscribeAcknowledge(int unsubscribeMessageId) {
        return this.unsubscribeAcknowledge(unsubscribeMessageId, Collections.emptyList(), MqttProperties.NO_PROPERTIES);
    }

    /** Writes an UNSUBACK whose payload holds the numeric values of the given reason codes. */
    @Override
    public MqttEndpointImpl unsubscribeAcknowledge(int unsubscribeMessageId, List<MqttUnsubAckReasonCode> reasonCodes, MqttProperties properties) {
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttMessageIdAndPropertiesVariableHeader variableHeader = new MqttMessageIdAndPropertiesVariableHeader(unsubscribeMessageId, properties);
        short[] reasonCodeValues = new short[reasonCodes.size()];
        for (int i = 0; i < reasonCodeValues.length; ++i) {
            reasonCodeValues[i] = reasonCodes.get(i).value();
        }
        MqttUnsubAckPayload payload = new MqttUnsubAckPayload(reasonCodeValues);
        MqttMessage unsuback = MqttMessageFactory.newMessage(fixedHeader, variableHeader, payload);
        this.write(unsuback);
        return this;
    }

    /** Writes a PUBACK with reason code {@code SUCCESS} and no properties. */
    @Override
    public MqttEndpointImpl publishAcknowledge(int publishMessageId) {
        return this.publishAcknowledge(publishMessageId, MqttPubAckReasonCode.SUCCESS, MqttProperties.NO_PROPERTIES);
    }

    /** Writes a PUBACK carrying the given message id, reason code and properties. */
    @Override
    public MqttEndpointImpl publishAcknowledge(int publishMessageId, MqttPubAckReasonCode reasonCode, MqttProperties properties) {
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttPubReplyMessageVariableHeader variableHeader = new MqttPubReplyMessageVariableHeader(publishMessageId, reasonCode.value(), properties);
        MqttMessage puback = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
        this.write(puback);
        return this;
    }

    /** Writes a PUBREC with reason code {@code SUCCESS} and no properties. */
    @Override
    public MqttEndpointImpl publishReceived(int publishMessageId) {
        return this.publishReceived(publishMessageId, MqttPubRecReasonCode.SUCCESS, MqttProperties.NO_PROPERTIES);
    }

    /** Writes a PUBREC carrying the given message id, reason code and properties. */
    @Override
    public MqttEndpointImpl publishReceived(int publishMessageId, MqttPubRecReasonCode reasonCode, MqttProperties properties) {
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0);
        // Same header type as PUBACK so the reason code is actually carried by the message
        // instead of being silently discarded.
        MqttPubReplyMessageVariableHeader variableHeader = new MqttPubReplyMessageVariableHeader(publishMessageId, reasonCode.value(), properties);
        MqttMessage pubrec = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
        this.write(pubrec);
        return this;
    }

    /** Writes a PUBREL with reason code {@code SUCCESS} and no properties. */
    @Override
    public MqttEndpointImpl publishRelease(int publishMessageId) {
        return this.publishRelease(publishMessageId, MqttPubRelReasonCode.SUCCESS, MqttProperties.NO_PROPERTIES);
    }

    /** Writes a PUBREL (fixed-header QoS {@code AT_LEAST_ONCE}) carrying the given message id, reason code and properties. */
    @Override
    public MqttEndpointImpl publishRelease(int publishMessageId, MqttPubRelReasonCode reasonCode, MqttProperties properties) {
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);
        // Same header type as PUBACK so the reason code is actually carried by the message.
        MqttPubReplyMessageVariableHeader variableHeader = new MqttPubReplyMessageVariableHeader(publishMessageId, reasonCode.value(), properties);
        MqttMessage pubrel = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
        this.write(pubrel);
        return this;
    }

    /** Writes a PUBCOMP with reason code {@code SUCCESS} and no properties. */
    @Override
    public MqttEndpointImpl publishComplete(int publishMessageId) {
        return this.publishComplete(publishMessageId, MqttPubCompReasonCode.SUCCESS, MqttProperties.NO_PROPERTIES);
    }

    /** Writes a PUBCOMP carrying the given message id, reason code and properties. */
    @Override
    public MqttEndpointImpl publishComplete(int publishMessageId, MqttPubCompReasonCode reasonCode, MqttProperties properties) {
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.AT_MOST_ONCE, false, 0);
        // Same header type as PUBACK so the reason code is actually carried by the message.
        MqttPubReplyMessageVariableHeader variableHeader = new MqttPubReplyMessageVariableHeader(publishMessageId, reasonCode.value(), properties);
        MqttMessage pubcomp = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
        this.write(pubcomp);
        return this;
    }

    /** Publishes using a freshly generated message id; the future completes with that id. */
    @Override
    public Future<Integer> publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain) {
        return this.publish(topic, payload, qosLevel, isDup, isRetain, this.nextMessageId());
    }

    /** Publishes using a freshly generated message id and notifies {@code publishSentHandler} if it is non-null. */
    @Override
    public MqttEndpointImpl publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain, Handler<AsyncResult<Integer>> publishSentHandler) {
        return this.publish(topic, payload, qosLevel, isDup, isRetain, this.nextMessageId(), publishSentHandler);
    }

    /** Publishes with the given message id and no properties. */
    @Override
    public Future<Integer> publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain, int messageId) {
        return this.publish(topic, payload, qosLevel, isDup, isRetain, messageId, MqttProperties.NO_PROPERTIES);
    }

    /**
     * Writes a PUBLISH message.
     *
     * @param topic      topic name placed in the variable header
     * @param payload    message payload; its bytes are copied into the outgoing message
     * @param qosLevel   QoS placed in the fixed header
     * @param isDup      DUP flag
     * @param isRetain   RETAIN flag
     * @param messageId  packet identifier, must be within {@code [0, 65535]}
     * @param properties properties placed in the variable header
     * @return a future mapped from the write result to the packet identifier
     * @throws IllegalArgumentException if {@code messageId} is outside {@code [0, 65535]}
     * @throws IllegalStateException    if the connection has not been accepted yet
     */
    @Override
    public Future<Integer> publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain, int messageId, MqttProperties properties) {
        if (messageId > MAX_MESSAGE_ID || messageId < 0) {
            throw new IllegalArgumentException("messageId must be non-negative integer not larger than 65535");
        }
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, isDup, qosLevel, isRetain, 0);
        MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(topic, messageId, properties);
        ByteBuf buf = Unpooled.copiedBuffer(payload.getBytes());
        MqttMessage publish = MqttMessageFactory.newMessage(fixedHeader, variableHeader, buf);
        return this.write(publish).map(variableHeader.packetId());
    }

    /** Publishes with the given message id and no properties, notifying {@code publishSentHandler} if it is non-null. */
    @Override
    public MqttEndpointImpl publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain, int messageId, Handler<AsyncResult<Integer>> publishSentHandler) {
        return this.publish(topic, payload, qosLevel, isDup, isRetain, messageId, MqttProperties.NO_PROPERTIES, publishSentHandler);
    }

    /** Publishes with the given message id and properties, notifying {@code publishSentHandler} if it is non-null. */
    @Override
    public MqttEndpointImpl publish(String topic, Buffer payload, MqttQoS qosLevel, boolean isDup, boolean isRetain, int messageId, MqttProperties properties, Handler<AsyncResult<Integer>> publishSentHandler) {
        Future<Integer> fut = this.publish(topic, payload, qosLevel, isDup, isRetain, messageId, properties);
        if (publishSentHandler != null) {
            fut.onComplete(publishSentHandler);
        }
        return this;
    }

    /** Writes a PINGRESP. */
    @Override
    public MqttEndpointImpl pong() {
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttMessage pingresp = MqttMessageFactory.newMessage(fixedHeader, null, null);
        this.write(pingresp);
        return this;
    }

    /**
     * Closes the endpoint; when the protocol level is at least MQTT 5 a DISCONNECT carrying
     * the reason code and properties is written first.
     */
    @Override
    public MqttEndpointImpl disconnect(MqttDisconnectReasonCode code, MqttProperties properties) {
        if (this.protocolVersion >= MqttVersion.MQTT_5.protocolLevel()) {
            MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.DISCONNECT, false, MqttQoS.AT_MOST_ONCE, false, 0);
            MqttReasonCodeAndPropertiesVariableHeader variableHeader = new MqttReasonCodeAndPropertiesVariableHeader(code.value(), properties);
            MqttMessage disconnect = MqttMessageFactory.newMessage(fixedHeader, variableHeader, null);
            this.write(disconnect);
        }
        this.close();
        return this;
    }

    /** Dispatches a SUBSCRIBE to the subscribe handler, then auto-acknowledges with the requested QoS levels if enabled. */
    void handleSubscribe(MqttSubscribeMessage msg) {
        synchronized (this.conn) {
            if (this.subscribeHandler != null) {
                this.subscribeHandler.handle(msg);
            }
            if (this.isSubscriptionAutoAck) {
                this.subscribeAcknowledge(msg.messageId(), msg.topicSubscriptions().stream().map(MqttTopicSubscription::qualityOfService).collect(Collectors.toList()));
            }
        }
    }

    /** Dispatches an UNSUBSCRIBE to the unsubscribe handler, then auto-acknowledges if enabled. */
    void handleUnsubscribe(MqttUnsubscribeMessage msg) {
        synchronized (this.conn) {
            if (this.unsubscribeHandler != null) {
                this.unsubscribeHandler.handle(msg);
            }
            if (this.isSubscriptionAutoAck) {
                this.unsubscribeAcknowledge(msg.messageId());
            }
        }
    }

    /**
     * Dispatches a PUBLISH to the publish handler; with auto-ack enabled replies with PUBACK
     * for {@code AT_LEAST_ONCE} and PUBREC for {@code EXACTLY_ONCE}.
     */
    void handlePublish(MqttPublishMessage msg) {
        synchronized (this.conn) {
            if (this.publishHandler != null) {
                this.publishHandler.handle(msg);
            }
            if (this.isPublishAutoAck) {
                switch (msg.qosLevel()) {
                    case AT_LEAST_ONCE: {
                        this.publishAcknowledge(msg.messageId());
                        break;
                    }
                    case EXACTLY_ONCE: {
                        this.publishReceived(msg.messageId());
                        break;
                    }
                    default: {
                        // Other QoS levels get no automatic reply.
                        break;
                    }
                }
            }
        }
    }

    /** Dispatches a PUBACK to the id-only handler and to the message handler, whichever are set. */
    void handlePuback(int pubackMessageId, MqttPubAckReasonCode code, MqttProperties properties) {
        synchronized (this.conn) {
            if (this.pubackHandler != null) {
                this.pubackHandler.handle(pubackMessageId);
            }
            if (this.pubackHandlerWithMessage != null) {
                this.pubackHandlerWithMessage.handle(MqttPubAckMessage.create(pubackMessageId, code, properties));
            }
        }
    }

    /** Dispatches a PUBREC to the registered handlers, then replies with PUBREL if publish auto-ack is enabled. */
    void handlePubrec(int pubrecMessageId, MqttPubRecReasonCode code, MqttProperties properties) {
        synchronized (this.conn) {
            if (this.pubrecHandler != null) {
                this.pubrecHandler.handle(pubrecMessageId);
            }
            if (this.pubrecHandlerWithMessage != null) {
                this.pubrecHandlerWithMessage.handle(MqttPubRecMessage.create(pubrecMessageId, code, properties));
            }
            if (this.isPublishAutoAck) {
                this.publishRelease(pubrecMessageId);
            }
        }
    }

    /** Dispatches a PUBREL to the registered handlers, then replies with PUBCOMP if publish auto-ack is enabled. */
    void handlePubrel(int pubrelMessageId, MqttPubRelReasonCode code, MqttProperties properties) {
        synchronized (this.conn) {
            if (this.pubrelHandler != null) {
                this.pubrelHandler.handle(pubrelMessageId);
            }
            if (this.pubrelHandlerWithMessage != null) {
                this.pubrelHandlerWithMessage.handle(MqttPubRelMessage.create(pubrelMessageId, code, properties));
            }
            if (this.isPublishAutoAck) {
                this.publishComplete(pubrelMessageId);
            }
        }
    }

    /** Dispatches a PUBCOMP to the id-only handler and to the message handler, whichever are set. */
    void handlePubcomp(int pubcompMessageId, MqttPubCompReasonCode code, MqttProperties properties) {
        synchronized (this.conn) {
            if (this.pubcompHandler != null) {
                this.pubcompHandler.handle(pubcompMessageId);
            }
            if (this.pubcompHandlerWithMessage != null) {
                this.pubcompHandlerWithMessage.handle(MqttPubCompMessage.create(pubcompMessageId, code, properties));
            }
        }
    }

    /** Dispatches a PINGREQ to the ping handler, then replies with PINGRESP if auto keep-alive is enabled. */
    void handlePingreq() {
        synchronized (this.conn) {
            if (this.pingreqHandler != null) {
                this.pingreqHandler.handle(null);
            }
            if (this.isAutoKeepAlive) {
                this.pong();
            }
        }
    }

    /** Dispatches a DISCONNECT to the registered handlers and then closes the endpoint if it is still open. */
    void handleDisconnect(MqttDisconnectReasonCode code, MqttProperties properties) {
        synchronized (this.conn) {
            if (this.disconnectHandler != null) {
                this.disconnectHandler.handle(null);
            }
            if (this.disconnectHandlerWithMessage != null) {
                this.disconnectHandlerWithMessage.handle(MqttDisconnectMessage.create(code, properties));
            }
            // A handler may already have closed the endpoint; calling close() again would
            // throw IllegalStateException from checkClosed().
            if (!this.isClosed) {
                this.close();
            }
        }
    }

    /** Marks the endpoint closed and then invokes the close handler, if set. */
    void handleClosed() {
        synchronized (this.conn) {
            this.cleanup();
            if (this.closeHandler != null) {
                this.closeHandler.handle(null);
            }
        }
    }

    /** Forwards the error to the exception handler, if set. */
    void handleException(Throwable t) {
        synchronized (this.conn) {
            if (this.exceptionHandler != null) {
                this.exceptionHandler.handle(t);
            }
        }
    }

    /**
     * Closes the underlying connection and marks the endpoint closed and disconnected.
     *
     * @throws IllegalStateException if the endpoint is already closed
     */
    @Override
    public void close() {
        synchronized (this.conn) {
            this.checkClosed();
            this.conn.close();
            this.cleanup();
        }
    }

    /**
     * @return the local address reported by the underlying connection
     * @throws IllegalStateException if the endpoint is closed
     */
    @Override
    public SocketAddress localAddress() {
        synchronized (this.conn) {
            this.checkClosed();
            return this.conn.localAddress();
        }
    }

    /**
     * @return the remote address reported by the underlying connection
     * @throws IllegalStateException if the endpoint is closed
     */
    @Override
    public SocketAddress remoteAddress() {
        synchronized (this.conn) {
            this.checkClosed();
            return this.conn.remoteAddress();
        }
    }

    /**
     * @return whether the underlying connection reports SSL
     * @throws IllegalStateException if the endpoint is closed
     */
    @Override
    public boolean isSsl() {
        synchronized (this.conn) {
            this.checkClosed();
            return this.conn.isSsl();
        }
    }

    /** @return the HTTP headers given at construction */
    @Override
    public MultiMap httpHeaders() {
        return this.httpHeaders;
    }

    /** @return the HTTP request URI given at construction */
    @Override
    public String httpRequestURI() {
        return this.httpRequestUri;
    }

    /**
     * @return the SSL session reported by the underlying connection
     * @throws IllegalStateException if the endpoint is closed
     */
    @Override
    public SSLSession sslSession() {
        synchronized (this.conn) {
            this.checkClosed();
            return this.conn.sslSession();
        }
    }

    /**
     * Writes a message on the underlying connection.
     *
     * @throws IllegalStateException if the message is not a CONNACK and the connection has not been accepted
     */
    private Future<Void> write(MqttMessage mqttMessage) {
        synchronized (this.conn) {
            // CONNACK is the message that establishes the connected state, so it is exempt.
            if (mqttMessage.fixedHeader().messageType() != MqttMessageType.CONNACK) {
                this.checkConnected();
            }
            return this.conn.writeMessage(mqttMessage);
        }
    }

    /** @throws IllegalStateException if the endpoint is closed */
    private void checkClosed() {
        if (this.isClosed) {
            throw new IllegalStateException("MQTT endpoint is closed");
        }
    }

    /** @throws IllegalStateException if the connection has not been accepted */
    private void checkConnected() {
        if (!this.isConnected) {
            throw new IllegalStateException("Connection not accepted yet");
        }
    }

    /** Marks the endpoint closed and disconnected; a no-op when already closed. */
    private void cleanup() {
        if (!this.isClosed) {
            this.isClosed = true;
            this.isConnected = false;
        }
    }

    /** Advances the message id counter, wrapping from 0 or {@link #MAX_MESSAGE_ID} to 1, and returns it. */
    private int nextMessageId() {
        this.messageIdCounter = this.messageIdCounter % MAX_MESSAGE_ID != 0 ? this.messageIdCounter + 1 : 1;
        return this.messageIdCounter;
    }
}

