/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.core.device;

import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.interceptor.DeviceMessageSenderInterceptor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * A {@link DeviceMessageSenderInterceptor} that delegates every callback to a
 * list of registered interceptors, in ascending {@code getOrder()} order.
 *
 * <p>The delegates are held in a {@link CopyOnWriteArrayList}, so each callback
 * iterates over an immutable snapshot of the list. Registration publishes a new
 * snapshot with a single insertion at the correct position, so a concurrent
 * callback never observes a partially ordered list.
 */
public class CompositeDeviceMessageSenderInterceptor
implements DeviceMessageSenderInterceptor {
    /** Orders interceptors by ascending {@code getOrder()} value. */
    private static final Comparator<DeviceMessageSenderInterceptor> ORDER_COMPARATOR =
            Comparator.comparingInt(DeviceMessageSenderInterceptor::getOrder);

    private final List<DeviceMessageSenderInterceptor> interceptors = new CopyOnWriteArrayList<>();

    /**
     * Registers an interceptor, keeping the list sorted by ascending order value.
     *
     * <p>Interceptors with equal order values keep their registration order: the
     * new one is placed after every existing interceptor whose order is less
     * than or equal to its own.
     *
     * @param interceptor the interceptor to register; must not be {@code null}
     * @throws NullPointerException if {@code interceptor} is {@code null}
     */
    public synchronized void addInterceptor(DeviceMessageSenderInterceptor interceptor) {
        // Reject null before touching the list so a bad call cannot leave a
        // null entry behind that would break every later callback.
        Objects.requireNonNull(interceptor, "interceptor");

        // Writers are serialized by the method lock, so the position computed
        // here is still valid when the element is inserted.
        int index = 0;
        for (DeviceMessageSenderInterceptor existing : this.interceptors) {
            if (ORDER_COMPARATOR.compare(existing, interceptor) > 0) {
                break;
            }
            index++;
        }
        // One mutation only: readers see either the old sorted snapshot or the
        // new sorted snapshot, never an unsorted intermediate state.
        this.interceptors.add(index, interceptor);
    }

    /**
     * Chains the {@code preSend} callbacks of all registered interceptors.
     *
     * <p>The chain is assembled from the interceptors registered at call time;
     * each interceptor's {@code preSend} is invoked lazily, on subscription,
     * with the message emitted by the previous interceptor.
     *
     * @param device  the device the message is being sent to
     * @param message the message about to be sent
     * @return a {@link Mono} emitting the message produced by the last interceptor,
     *         or {@code message} itself when no interceptor is registered
     */
    @Override
    public Mono<DeviceMessage> preSend(DeviceOperator device, DeviceMessage message) {
        Mono<DeviceMessage> chain = Mono.just(message);
        for (DeviceMessageSenderInterceptor interceptor : this.interceptors) {
            chain = chain.flatMap(current -> interceptor.preSend(device, current));
        }
        return chain;
    }

    /**
     * Passes the sender stream through the {@code doSend} callback of each
     * registered interceptor in order, feeding each one the result of the previous.
     *
     * @param device the device the message is being sent to
     * @param source the message being sent
     * @param sender the stream that performs the actual send
     * @return the stream returned by the last interceptor, or {@code sender}
     *         itself when no interceptor is registered
     */
    @Override
    public Flux<DeviceMessage> doSend(DeviceOperator device, DeviceMessage source, Flux<DeviceMessage> sender) {
        Flux<DeviceMessage> current = sender;
        for (DeviceMessageSenderInterceptor interceptor : this.interceptors) {
            current = interceptor.doSend(device, source, current);
        }
        return current;
    }

    /**
     * Passes the reply stream through the {@code afterSent} callback of each
     * registered interceptor in order, feeding each one the result of the previous.
     *
     * @param device  the device the message was sent to
     * @param message the message that was sent
     * @param reply   the stream of replies
     * @param <R>     the reply message type
     * @return the stream returned by the last interceptor, or {@code reply}
     *         itself when no interceptor is registered
     */
    @Override
    public <R extends DeviceMessage> Flux<R> afterSent(DeviceOperator device, DeviceMessage message, Flux<R> reply) {
        Flux<R> current = reply;
        for (DeviceMessageSenderInterceptor interceptor : this.interceptors) {
            current = interceptor.afterSent(device, message, current);
        }
        return current;
    }
}

