/*
 * Decompiled with CFR 0.152.
 */
package reactor.netty.incubator.quic;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.incubator.codec.quic.DefaultQuicStreamFrame;
import io.netty.incubator.codec.quic.QuicStreamChannel;
import io.netty.incubator.codec.quic.QuicStreamFrame;
import io.netty.incubator.codec.quic.QuicStreamType;
import io.netty.util.ReferenceCounted;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Predicate;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.FutureMono;
import reactor.netty.NettyOutbound;
import reactor.netty.ReactorNetty;
import reactor.netty.channel.AbortedException;
import reactor.netty.channel.ChannelOperations;
import reactor.netty.incubator.quic.QuicInbound;
import reactor.netty.incubator.quic.QuicOutbound;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;

class QuicStreamOperations
extends ChannelOperations<QuicInbound, QuicOutbound>
implements QuicInbound,
QuicOutbound {
    static final AtomicIntegerFieldUpdater<QuicStreamOperations> FIN_SENT = AtomicIntegerFieldUpdater.newUpdater(QuicStreamOperations.class, "finSent");
    volatile int finSent;
    static final Logger log = Loggers.getLogger(QuicStreamOperations.class);

    QuicStreamOperations(Connection connection, ConnectionObserver listener) {
        super(connection, listener);
        this.markPersistent(false);
    }

    @Override
    public String asLongText() {
        return this.asShortText() + ", " + this.channel().localAddress();
    }

    @Override
    public boolean isLocalStream() {
        return ((QuicStreamChannel)this.connection().channel()).isLocalCreated();
    }

    @Override
    public NettyOutbound send(Publisher<? extends ByteBuf> dataStream, Predicate<ByteBuf> predicate) {
        Objects.requireNonNull(predicate, "predicate");
        if (!this.channel().isActive()) {
            return this.then(Mono.error(AbortedException.beforeSend()));
        }
        if (dataStream instanceof Mono) {
            return this.then(((Mono)dataStream).flatMap(m3 -> {
                if (this.markFinSent()) {
                    return FutureMono.from(this.channel().writeAndFlush(new DefaultQuicStreamFrame((ByteBuf)m3, true)));
                }
                return FutureMono.from(this.channel().writeAndFlush(m3));
            }).doOnDiscard(ByteBuf.class, ReferenceCounted::release));
        }
        return super.send(dataStream, predicate);
    }

    @Override
    public NettyOutbound sendObject(Object message) {
        if (!this.channel().isActive()) {
            ReactorNetty.safeRelease(message);
            return this.then(Mono.error(AbortedException.beforeSend()));
        }
        if (!(message instanceof ByteBuf)) {
            return super.sendObject(message);
        }
        ByteBuf buffer = (ByteBuf)message;
        return this.then(FutureMono.deferFuture(() -> {
            if (this.markFinSent()) {
                return this.connection().channel().writeAndFlush(new DefaultQuicStreamFrame(buffer, true));
            }
            return this.connection().channel().writeAndFlush(buffer);
        }), () -> ReactorNetty.safeRelease(buffer));
    }

    @Override
    public long streamId() {
        return ((QuicStreamChannel)this.connection().channel()).streamId();
    }

    @Override
    public QuicStreamType streamType() {
        return ((QuicStreamChannel)this.connection().channel()).type();
    }

    @Override
    protected void onInboundCancel() {
        if (log.isDebugEnabled()) {
            log.debug(ReactorNetty.format(this.channel(), "Cancelling inbound stream. Sending WRITE_FIN."));
        }
        this.sendFinNow(f -> this.terminate());
    }

    @Override
    protected void onOutboundError(Throwable err) {
        if (log.isDebugEnabled()) {
            log.debug(ReactorNetty.format(this.channel(), "Outbound error happened. Sending WRITE_FIN."), err);
        }
        this.sendFinNow(f -> this.terminate());
    }

    @Override
    protected final void onInboundComplete() {
        super.onInboundComplete();
    }

    final boolean markFinSent() {
        return FIN_SENT.compareAndSet(this, 0, 1);
    }

    final void sendFinNow() {
        this.sendFinNow(null);
    }

    final void sendFinNow(@Nullable ChannelFutureListener listener) {
        if (this.markFinSent()) {
            ChannelFuture f = this.channel().writeAndFlush(QuicStreamFrame.EMPTY_FIN);
            if (listener != null) {
                f.addListener(listener);
            }
        }
    }

    static void callTerminate(Channel ch) {
        ChannelOperations<?, ?> ops = QuicStreamOperations.get(ch);
        if (ops == null) {
            return;
        }
        ((QuicStreamOperations)ops).terminate();
    }
}

