/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.simulator.core.network.udp;

import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.datagram.DatagramPacket;
import io.vertx.core.datagram.DatagramSocket;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import org.jetlinks.simulator.core.Connection;
import org.jetlinks.simulator.core.Global;
import org.jetlinks.simulator.core.network.AbstractConnection;
import org.jetlinks.simulator.core.network.Address;
import org.jetlinks.simulator.core.network.AddressManager;
import org.jetlinks.simulator.core.network.NetworkType;
import org.jetlinks.simulator.core.network.NetworkUtils;
import org.jetlinks.simulator.core.network.udp.UDPOptions;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;

/**
 * UDP client connection backed by a Vert.x {@link DatagramSocket}.
 *
 * <p>Each instance is bound to a local {@link Address} and has a default remote
 * endpoint. Incoming packets are counted through {@code received(int)} and then
 * dispatched to the consumers registered with {@link #handle(Consumer)}.
 */
public class UDPClient
extends AbstractConnection {
    private final String id;
    private final DatagramSocket socket;
    private final InetSocketAddress remote;
    private final Address address;
    // Copy-on-write list: handlers may be added or removed while a packet is being dispatched.
    private final List<Consumer<DatagramPacket>> handlers = new CopyOnWriteArrayList<>();

    /**
     * Wraps an existing socket, switches the connection state to
     * {@code connected} and installs the packet dispatcher on the socket.
     *
     * @param id      identifier returned by {@link #getId()}
     * @param socket  datagram socket used for sending and receiving
     * @param remote  default destination used by {@link #sendAsync(Object)}
     * @param address local address handle, released when this connection is disposed
     */
    public UDPClient(String id, DatagramSocket socket, InetSocketAddress remote, Address address) {
        this.id = id;
        this.socket = socket;
        this.remote = remote;
        this.address = address;
        this.changeState(Connection.State.connected);
        socket.handler(packet -> {
            // Record the payload size first, then fan the packet out to every registered handler.
            this.received(packet.data().length());
            for (Consumer<DatagramPacket> handler : this.handlers) {
                handler.accept(packet);
            }
        });
    }

    /**
     * @return the default remote endpoint given at construction time
     */
    public InetSocketAddress getRemote() {
        return this.remote;
    }

    /**
     * @return an unresolved socket address built from the host and port the
     *         underlying socket reports as its local address
     */
    public InetSocketAddress getLocal() {
        return InetSocketAddress.createUnresolved(this.socket.localAddress().host(), this.socket.localAddress().port());
    }

    /**
     * Creates a client bound to an address taken from the global
     * {@link AddressManager}, listening on port {@code 0}.
     *
     * <p>The address is taken, and written back into {@code options}, as soon as
     * this method is called; the socket itself is created on subscription. If
     * creating or binding the socket fails, the socket is closed and the taken
     * address is released again so neither is leaked.
     *
     * @param options UDP options; its local address and reuse-port flag are overwritten
     * @return a Mono emitting the connected client, or the failure that prevented its creation
     */
    public static Mono<UDPClient> create(UDPOptions options) {
        Address address = AddressManager.global().takeAddress(options.getLocalAddress());
        options.setLocalAddress(address.getAddress().getHostAddress());
        Mono<UDPClient> created = Mono.fromCompletionStage(() -> {
            options.setReusePort(true);
            DatagramSocket socket = Global.vertx().createDatagramSocket(options);
            return socket
                    .listen(0, options.getLocalAddress())
                    .map(bound -> new UDPClient(options.getId(), bound, InetSocketAddress.createUnresolved(options.getHost(), options.getPort()), address))
                    // No UDPClient owns the socket when binding or construction failed, so close it here.
                    .onFailure(cause -> socket.close())
                    .toCompletionStage();
        });
        // Without a client, doDisposed() will never run; hand the address back on failure.
        return created.doOnError(cause -> address.release());
    }

    @Override
    public String getId() {
        return this.id;
    }

    /**
     * Sends a packet to the default remote endpoint without waiting for the
     * result; the send is triggered by subscribing to {@link #sendAsync(Object)}.
     *
     * @param packet payload, converted by {@code NetworkUtils.castToByteBuf}
     */
    public void send(Object packet) {
        this.sendAsync(packet).subscribe();
    }

    /**
     * Registers a consumer that receives only the payload of each incoming packet.
     *
     * @param buffer consumer invoked with the data of every received packet
     * @return a handle whose disposal removes the registration
     */
    public Disposable handlePayload(Consumer<Buffer> buffer) {
        return this.handle(packet -> buffer.accept(packet.data()));
    }

    /**
     * Registers a consumer for incoming packets.
     *
     * @param consumer consumer invoked with every received packet
     * @return a handle whose disposal removes the consumer from the handler list
     */
    public Disposable handle(Consumer<DatagramPacket> consumer) {
        this.handlers.add(consumer);
        return () -> this.handlers.remove(consumer);
    }

    /**
     * Resets the parent state and removes every registered packet handler.
     */
    @Override
    public void reset() {
        super.reset();
        this.handlers.clear();
    }

    /**
     * Sends a packet to an explicit destination.
     *
     * <p>The payload is converted when this method is called; the actual send
     * happens on subscription. Once the returned Mono terminates (successfully or
     * with an error) the payload length is reported through {@code sent(int)} and
     * the converted buffer is released. Errors are additionally passed to
     * {@code error(Throwable)}.
     *
     * @param host   destination host
     * @param port   destination port
     * @param packet payload, converted by {@code NetworkUtils.castToByteBuf}
     * @return a Mono completing when the send has finished
     */
    public Mono<Void> sendAsync(String host, int port, Object packet) {
        ByteBuf buf = NetworkUtils.castToByteBuf(packet);
        // One wrapper serves both the length lookup and the send.
        Buffer buffer = Buffer.buffer(buf);
        int len = buffer.length();
        return Mono.fromCompletionStage(() -> this.socket.send(buffer, port, host).toCompletionStage()).doAfterTerminate(() -> {
            this.sent(len);
            ReferenceCountUtil.safeRelease(buf);
        }).doOnError(this::error);
    }

    /**
     * Sends a packet to the default remote endpoint.
     *
     * @param packet payload, converted by {@code NetworkUtils.castToByteBuf}
     * @return a Mono completing when the send has finished
     */
    public Mono<Void> sendAsync(Object packet) {
        return this.sendAsync(this.remote.getHostString(), this.remote.getPort(), packet);
    }

    /**
     * Releases the local address, closes the socket and then runs the parent
     * disposal logic. Each step runs even if an earlier one throws.
     */
    @Override
    protected void doDisposed() {
        try {
            this.address.release();
        } finally {
            try {
                this.socket.close();
            } finally {
                super.doDisposed();
            }
        }
    }

    @Override
    public NetworkType getType() {
        return NetworkType.udp_client;
    }

    /**
     * @return always {@code true}; this implementation performs no liveness check
     */
    @Override
    public boolean isAlive() {
        return true;
    }
}

