/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.demo.protocol.tcp;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import java.util.Arrays;
import org.apache.commons.codec.binary.Hex;
import org.jetlinks.core.Value;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.DeviceMessageCodec;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.FromDeviceMessageContext;
import org.jetlinks.core.message.codec.MessageDecodeContext;
import org.jetlinks.core.message.codec.MessageEncodeContext;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.message.property.ReadPropertyMessage;
import org.jetlinks.core.message.property.ReportPropertyMessage;
import org.jetlinks.core.message.property.WritePropertyMessage;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.demo.protocol.tcp.DemoTcpMessage;
import org.jetlinks.demo.protocol.tcp.MessageType;
import org.jetlinks.demo.protocol.tcp.TcpDeviceMessage;
import org.jetlinks.demo.protocol.tcp.TcpStatus;
import org.jetlinks.demo.protocol.tcp.message.AuthRequest;
import org.jetlinks.demo.protocol.tcp.message.AuthResponse;
import org.jetlinks.demo.protocol.tcp.message.ErrorMessage;
import org.jetlinks.demo.protocol.tcp.message.Pong;
import org.jetlinks.demo.protocol.tcp.message.ReadProperty;
import org.jetlinks.demo.protocol.tcp.message.ReportProperty;
import org.jetlinks.demo.protocol.tcp.message.WriteProperty;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

public class DemoTcpMessageCodec
implements DeviceMessageCodec {
    private static final Logger log = LoggerFactory.getLogger(DemoTcpMessageCodec.class);
    private DeviceRegistry registry;

    public Transport getSupportTransport() {
        return DefaultTransport.TCP;
    }

    public Mono<DeviceMessage> decode(MessageDecodeContext context) {
        return Mono.defer(() -> {
            DemoTcpMessage message;
            FromDeviceMessageContext ctx = (FromDeviceMessageContext)context;
            ByteBuf byteBuf = context.getMessage().getPayload();
            byte[] payload = ByteBufUtil.getBytes((ByteBuf)byteBuf, (int)0, (int)byteBuf.readableBytes(), (boolean)false);
            if (log.isDebugEnabled()) {
                log.debug("handle tcp message:\n{}", (Object)Hex.encodeHexString((byte[])payload));
            }
            try {
                message = DemoTcpMessage.of(payload);
                if (log.isDebugEnabled()) {
                    log.debug("decode tcp message:\n{}\n{}", (Object)Hex.encodeHexString((byte[])payload), (Object)message);
                }
            }
            catch (Exception e) {
                log.warn("decode tcp message error:[{}]", (Object)Hex.encodeHexString((byte[])payload), (Object)e);
                return Mono.error((Throwable)e);
            }
            DeviceSession session = ctx.getSession();
            if (session.getOperator() == null) {
                if (message.getType() != MessageType.AUTH_REQ) {
                    log.warn("tcp session[{}], unauthorized.", (Object)session.getId());
                    return session.send(EncodedMessage.simple((ByteBuf)DemoTcpMessage.of(MessageType.ERROR, ErrorMessage.of(TcpStatus.UN_AUTHORIZED)).toByteBuf())).then(Mono.fromRunnable(() -> ((DeviceSession)session).close()));
                }
                AuthRequest request = (AuthRequest)message.getData();
                String deviceId = this.buildDeviceId(request.getDeviceId());
                return this.registry.getDevice(this.buildDeviceId(request.getDeviceId())).flatMap(operator -> operator.getConfig("tcp_auth_key").map(Value::asString).filter(key -> Arrays.equals(request.getKey(), key.getBytes())).flatMap(msg -> {
                    DeviceOnlineMessage onlineMessage = new DeviceOnlineMessage();
                    onlineMessage.setDeviceId(deviceId);
                    onlineMessage.setTimestamp(System.currentTimeMillis());
                    return session.send(EncodedMessage.simple((ByteBuf)DemoTcpMessage.of(MessageType.AUTH_RES, AuthResponse.of(request.getDeviceId(), TcpStatus.SUCCESS)).toByteBuf())).thenReturn((Object)onlineMessage);
                })).switchIfEmpty(Mono.defer(() -> session.send(EncodedMessage.simple((ByteBuf)DemoTcpMessage.of(MessageType.AUTH_RES, AuthResponse.of(request.getDeviceId(), TcpStatus.ILLEGAL_ARGUMENTS)).toByteBuf())).then(Mono.empty())));
            }
            if (message.getType() == MessageType.PING) {
                return session.send(EncodedMessage.simple((ByteBuf)Unpooled.wrappedBuffer((byte[])DemoTcpMessage.of(MessageType.PONG, new Pong()).toBytes()))).then(Mono.fromRunnable(() -> ((DeviceSession)session).ping()));
            }
            if (message.getData() instanceof TcpDeviceMessage) {
                return Mono.justOrEmpty((Object)((TcpDeviceMessage)((Object)message.getData())).toDeviceMessage());
            }
            return Mono.empty();
        });
    }

    public String buildDeviceId(long deviceId) {
        return String.valueOf(deviceId);
    }

    public Publisher<? extends EncodedMessage> encode(MessageEncodeContext context) {
        DemoTcpMessage of;
        Message message = context.getMessage();
        EncodedMessage encodedMessage = null;
        log.info("\u63a8\u9001\u8bbe\u5907\u6d88\u606f\uff0c\u6d88\u606fID\uff1a{}", (Object)message.getMessageId());
        if (message instanceof ReadPropertyMessage) {
            ReadPropertyMessage readPropertyMessage = (ReadPropertyMessage)message;
            of = DemoTcpMessage.of(MessageType.READ_PROPERTY, ReadProperty.of(readPropertyMessage));
            encodedMessage = EncodedMessage.simple((ByteBuf)of.toByteBuf());
        }
        if (message instanceof WritePropertyMessage) {
            WritePropertyMessage writePropertyMessage = (WritePropertyMessage)message;
            of = DemoTcpMessage.of(MessageType.WRITE_PROPERTY, WriteProperty.of(writePropertyMessage));
            encodedMessage = EncodedMessage.simple((ByteBuf)of.toByteBuf());
        }
        if (message instanceof ReportPropertyMessage) {
            ReportPropertyMessage reportPropertyMessage = (ReportPropertyMessage)message;
            of = DemoTcpMessage.of(MessageType.REPORT_TEMPERATURE, ReportProperty.of(reportPropertyMessage));
            encodedMessage = EncodedMessage.simple((ByteBuf)of.toByteBuf());
        }
        return encodedMessage != null ? Mono.just(encodedMessage) : Mono.empty();
    }

    public DemoTcpMessageCodec(DeviceRegistry registry) {
        this.registry = registry;
    }
}

