/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.simulator.core.benchmark;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.IllegalFormatException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import org.jetlinks.simulator.core.Connection;
import org.jetlinks.simulator.core.ConnectionManager;
import org.jetlinks.simulator.core.ExceptionUtils;
import org.jetlinks.simulator.core.benchmark.BenchmarkHelper;
import org.jetlinks.simulator.core.benchmark.BenchmarkOptions;
import org.jetlinks.simulator.core.benchmark.ConnectCreateContext;
import org.jetlinks.simulator.core.report.Reporter;
import org.jetlinks.simulator.core.script.Script;
import org.jetlinks.simulator.core.script.ScriptFactory;
import org.jetlinks.simulator.core.script.Scripts;
import org.joda.time.DateTime;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Drives a connection benchmark: opens {@code options.getSize()} connections through the
 * supplied connection factory, registers every successful one with the
 * {@link ConnectionManager}, and records a {@link Snapshot} of the manager's summary once
 * per second while running.
 *
 * <p>An optional script file (see {@link BenchmarkOptions#getFile()}) is executed on
 * {@link #start()} and {@link #reload()}; the script receives this instance under the
 * {@code benchmark} key and can register handlers through {@link #beforeConnect},
 * {@link #onConnected} and {@link #onComplete}.
 */
public class Benchmark implements Disposable, BenchmarkHelper {
    private static final Logger log = LoggerFactory.getLogger(Benchmark.class);

    /** Name of the reporter point used to measure connection attempts. */
    public static final String REPORT_CONNECTING = "connecting";

    /** Script factory used to compile benchmark scripts (looked up with the id {@code "js"}). */
    public static final ScriptFactory scriptFactory = Scripts.getFactory("js");

    /** Once the snapshot history reaches this size the oldest entry is dropped (86400 = one day of 1s samples). */
    private static final int SNAPSHOT_LIMIT = 86400;

    /** The log history is trimmed once it holds more than this many lines. */
    private static final int LOG_LIMIT = 100;

    /** The distinct-error set is cleared once it holds more than this many entries. */
    private static final int ERROR_LIMIT = 100;

    private final String name;
    private final BenchmarkOptions options;
    private final Reporter reporter;
    private final ConnectionManager connectionManager;
    private final Function<ConnectCreateContext, Mono<? extends Connection>> connectionFactory;
    private final List<BiConsumer<Integer, Object>> beforeConnectHandler = new CopyOnWriteArrayList<>();
    private final List<Consumer<Connection>> connectionHandler = new CopyOnWriteArrayList<>();
    private final List<Runnable> completeHandler = new CopyOnWriteArrayList<>();

    /** Resources that live until {@link #dispose()}. */
    private final Disposable.Composite disposable = Disposables.composite();

    /** Resources (script intervals) that are thrown away on every {@link #reload()}. */
    private Disposable.Composite reloadable = Disposables.composite();

    private final Set<String> errors = ConcurrentHashMap.newKeySet();
    private final Deque<String> logs = new ConcurrentLinkedDeque<>();
    private final Deque<Snapshot> snapshots = new ConcurrentLinkedDeque<>();
    private Throwable lastError;

    /**
     * @param name              benchmark name; also used to build the script name
     * @param options           benchmark options (start index, size, concurrency, script file and args)
     * @param reporter          reporter that receives the connection timing points
     * @param connectionManager manager that stores the established connections
     * @param connectionFactory creates one connection per {@link ConnectCreateContext}
     */
    public Benchmark(String name, BenchmarkOptions options, Reporter reporter, ConnectionManager connectionManager, Function<ConnectCreateContext, Mono<? extends Connection>> connectionFactory) {
        this.name = name;
        this.options = options;
        this.reporter = reporter;
        this.connectionManager = connectionManager;
        this.connectionFactory = connectionFactory;
    }

    /**
     * Creates a benchmark backed by a fresh reporter from {@link Reporter#create()}.
     *
     * @return the new, not yet started, benchmark
     */
    public static Benchmark create(String name, BenchmarkOptions options, ConnectionManager connectionManager, Function<ConnectCreateContext, Mono<? extends Connection>> connectionFactory) {
        return new Benchmark(name, options, Reporter.create(), connectionManager, connectionFactory);
    }

    public Reporter getReporter() {
        return this.reporter;
    }

    /**
     * @return the live log deque written by {@link #print(String, Object...)} (not a copy)
     */
    public Deque<String> getLogs() {
        return this.logs;
    }

    /**
     * Starts the benchmark: runs the configured script (if any), begins taking one snapshot
     * per second and opens the configured range of connections with the configured
     * concurrency. The complete handlers run when the connect pipeline terminates.
     *
     * <p>This is a no-op when the long-lived composite already holds a disposable.
     * NOTE(review): that is also the case when {@link #doOnDispose(Disposable)} was called
     * before {@code start()} - confirm whether that ordering can occur.
     */
    public void start() {
        if (this.disposable.size() > 0) {
            return;
        }
        this.loadScript();
        this.disposable.add(Flux
                .interval(Duration.ofSeconds(1L))
                .flatMap(ignore -> this.snapshot())
                .subscribe());
        this.disposable.add(Flux
                .range(this.options.getIndex(), this.options.getSize())
                .flatMap(this::connect, this.options.getConcurrency(), this.options.getConcurrency())
                .doOnNext(this::handleConnected)
                .then()
                .doFinally(ignore -> this.fireComplete())
                .subscribe());
    }

    /**
     * Drops every registered handler and reloadable resource, re-executes the script, then
     * resets each alive connection and replays it to the freshly registered connection
     * handlers. Finally the (freshly registered) complete handlers are run.
     */
    public void reload() {
        this.beforeConnectHandler.clear();
        this.completeHandler.clear();
        this.connectionHandler.clear();
        this.reloadable.dispose();
        this.reloadable = Disposables.composite();
        this.loadScript();
        this.getConnectionManager()
                .getConnections()
                .filter(Connection::isAlive)
                .doOnNext(Connection::reset)
                .subscribe(this::fireConnectionListener);
        this.fireComplete();
    }

    /** Executes the script configured in the options; does nothing when no file is configured. */
    private void loadScript() {
        if (this.options.getFile() != null) {
            this.executeScript(Paths.get(this.options.getFile().toURI()));
        }
    }

    /** Runs every complete handler in registration order; an exception aborts the remaining ones. */
    private void fireComplete() {
        for (Runnable handler : this.completeHandler) {
            handler.run();
        }
    }

    /** Registers a newly established connection with the manager and notifies the listeners. */
    private void handleConnected(Connection connection) {
        this.connectionManager.addConnection(connection);
        this.fireConnectionListener(connection);
    }

    /** Passes the connection to every connection handler; a failing handler is logged and skipped. */
    private void fireConnectionListener(Connection connection) {
        for (Consumer<Connection> handler : this.connectionHandler) {
            try {
                handler.accept(connection);
            } catch (Throwable e) {
                // Handlers are user/script supplied: one failure must not stop the others.
                log.error(e.getMessage(), e);
            }
        }
    }

    /**
     * Invokes every before-connect handler with the connection index and the context object
     * supplied by the connection factory. A failing handler is logged and skipped.
     */
    void handleBeforeConnect(int index, Object ctx) {
        for (BiConsumer<Integer, Object> handler : this.beforeConnectHandler) {
            try {
                handler.accept(index, ctx);
            } catch (Throwable e) {
                log.warn("handleBeforeConnect error:{}", (Object) ExceptionUtils.getErrorMessage(e));
            }
        }
    }

    /**
     * Registers a handler invoked for each established connection (and again for every alive
     * connection after a {@link #reload()}).
     *
     * @return this benchmark, for chaining
     */
    public Benchmark onConnected(Consumer<Connection> consumer) {
        this.connectionHandler.add(consumer);
        return this;
    }

    /**
     * Registers a handler invoked with (index, context) when the connection factory calls
     * {@link ConnectCreateContext#beforeConnect(Object)}.
     *
     * @return this benchmark, for chaining
     */
    public Benchmark beforeConnect(BiConsumer<Integer, Object> handler) {
        this.beforeConnectHandler.add(handler);
        return this;
    }

    /**
     * Registers a handler run when the connect pipeline terminates and at the end of
     * {@link #reload()}.
     *
     * @return this benchmark, for chaining
     */
    public Benchmark onComplete(Runnable runnable) {
        this.completeHandler.add(runnable);
        return this;
    }

    /**
     * Adapts an arbitrary handler result to a {@link Mono}: a {@link Publisher} is wrapped via
     * {@link Mono#from}, {@code null} becomes an empty Mono, anything else a single-value Mono.
     */
    private Mono<Object> castMono(Object obj) {
        if (obj instanceof Publisher) {
            return Mono.from((Publisher<?>) obj);
        }
        return Mono.justOrEmpty(obj);
    }

    /**
     * Executes the script at the given file-system location and returns its result.
     *
     * @throws UncheckedIOException if the file cannot be read
     */
    @Override
    public Object require(String location) {
        return this.executeScript(Paths.get(location));
    }

    /**
     * Reads, compiles and runs a script file. The script context contains every script
     * argument as a top-level entry, the argument map itself under {@code "args"} (an empty
     * map when none are configured) and this instance under {@code "benchmark"}.
     *
     * @param file script file, decoded as UTF-8
     * @return whatever the script evaluates to
     * @throws UncheckedIOException if the file cannot be read
     */
    protected Object executeScript(Path file) {
        String script;
        try {
            // Explicit charset: the platform default differs between machines and JDK versions.
            script = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to read benchmark script: " + file, e);
        }
        HashMap<String, Object> context = new HashMap<>();
        if (this.options.getScriptArgs() != null) {
            context.putAll(this.options.getScriptArgs());
            context.put("args", this.options.getScriptArgs());
        } else {
            context.put("args", Collections.emptyMap());
        }
        context.put("benchmark", this);
        return scriptFactory
                .compileExpose(Script.of("benchmark_" + this.name, script).returnNative(), BenchmarkHelper.class)
                .call((BenchmarkHelper) this, context);
    }

    /**
     * Applies {@code handler} to connections obtained from
     * {@link ConnectionManager#randomConnection(int)}, running up to {@code size} handler
     * results concurrently.
     *
     * @return a Mono completing when every handler result has completed
     */
    public Mono<Void> randomConnectionAsync(int size, Function<Connection, Object> handler) {
        return this.connectionManager
                .randomConnection(size)
                .flatMap(conn -> this.castMono(handler.apply((Connection) conn)), size)
                .then();
    }

    /**
     * Fire-and-forget variant of {@link #randomConnectionAsync(int, Function)}.
     *
     * @return handle that cancels the operation
     */
    public Disposable randomConnection(int size, Function<Connection, Object> handler) {
        return this.randomConnectionAsync(size, handler).subscribe();
    }

    /**
     * Runs {@code callable} once after {@code ms} milliseconds. A failure is recorded as a
     * benchmark error instead of being propagated.
     *
     * @return handle that cancels the pending execution
     */
    public Disposable delay(Callable<Object> callable, int ms) {
        return Mono
                .delay(Duration.ofMillis(ms))
                .flatMap(ignore -> Mono
                        .fromCallable(callable)
                        .flatMap(this::castMono)
                        .onErrorResume(err -> {
                            this.error("delay execute", err);
                            return Mono.empty();
                        }))
                .subscribe();
    }

    /**
     * Runs {@code callable} every {@code ms} milliseconds, one execution at a time; ticks that
     * arrive while an execution is still running are dropped. Failures are recorded as
     * benchmark errors and do not stop the interval. The interval is also cancelled by
     * {@link #reload()} and {@link #dispose()}.
     *
     * @return handle that cancels the interval
     */
    public Disposable interval(Callable<Object> callable, int ms) {
        Disposable interval = Flux
                .interval(Duration.ofMillis(ms))
                .onBackpressureDrop()
                .concatMap(ignore -> Mono
                        .fromCallable(callable)
                        .flatMap(this::castMono)
                        .onErrorResume(err -> {
                            this.error("interval execute", err);
                            return Mono.empty();
                        }))
                .subscribe();
        this.reloadable.add(interval);
        return () -> {
            this.reloadable.remove(interval);
            interval.dispose();
        };
    }

    /**
     * @return the live snapshot history, oldest first (not a copy)
     */
    public Deque<Snapshot> snapshots() {
        return this.snapshots;
    }

    /**
     * Takes one snapshot of the connection manager summary and appends it to the history,
     * dropping the oldest entry once the history reaches {@link #SNAPSHOT_LIMIT}. Failures
     * are recorded as benchmark errors.
     */
    private Mono<Void> snapshot() {
        return this.connectionManager
                .summary()
                .map(sum -> {
                    // Link a chain-free copy of the predecessor: linking the live object would
                    // keep the whole history reachable and make eviction below pointless.
                    Snapshot previous = detached(this.snapshots.peekLast());
                    return new Snapshot(previous, System.currentTimeMillis(), (ConnectionManager.Summary) sum);
                })
                .doOnNext(taken -> {
                    this.snapshots.add(taken);
                    if (this.snapshots.size() >= SNAPSHOT_LIMIT) {
                        // pollFirst (not removeFirst) so a concurrent drain cannot make this throw.
                        this.snapshots.pollFirst();
                    }
                })
                .onErrorResume(err -> {
                    this.error("snapshot", err);
                    return Mono.empty();
                })
                .then();
    }

    /** Copies timestamp and summary of a snapshot without its predecessor link; {@code null} stays {@code null}. */
    private static Snapshot detached(Snapshot source) {
        return source == null ? null : new Snapshot(null, source.getTimestamp(), source.getSummary());
    }

    /** Remembers the error as the last error and adds "operation:message" to the distinct-error set. */
    private void error(String operation, Throwable e) {
        this.lastError = e;
        if (this.errors.size() > ERROR_LIMIT) {
            this.errors.clear();
        }
        this.errors.add(operation + ":" + ExceptionUtils.getErrorMessage(e));
    }

    /**
     * Opens the connection with the given index and reports the attempt under
     * {@link #REPORT_CONNECTING}. A failed attempt is reported and recorded, then swallowed
     * so that it does not abort the remaining connections.
     */
    private Mono<? extends Connection> connect(int index) {
        Reporter.Point point = this.reporter.newPoint(REPORT_CONNECTING);
        point.start();
        return this.connectionFactory
                .apply(new ConnectCreateContextImpl(index))
                .doOnNext(ignore -> point.success())
                .onErrorResume(err -> {
                    point.error(this.getErrorMessage(err));
                    this.error("connect", err);
                    return Mono.empty();
                });
    }

    /** Returns the exception message, or the simple class name when there is no message. */
    private String getErrorMessage(Throwable error) {
        return error.getMessage() == null ? error.getClass().getSimpleName() : error.getMessage();
    }

    /** Cancels the script intervals, the snapshot timer, the connect pipeline and everything added via {@link #doOnDispose}. */
    @Override
    public void dispose() {
        this.reloadable.dispose();
        this.disposable.dispose();
    }

    @Override
    public boolean isDisposed() {
        return this.disposable.isDisposed();
    }

    /** Registers a resource to be disposed together with this benchmark. */
    public void doOnDispose(Disposable disposable) {
        this.disposable.add(disposable);
    }

    /** Clears the log history, the recorded errors and the last error. */
    public void clear() {
        this.logs.clear();
        this.errors.clear();
        this.lastError = null;
    }

    /**
     * Appends a time-stamped line to the log history and trims the history to its limit.
     *
     * @param log  {@link String#format} pattern; when it is not a valid pattern for the given
     *             arguments (e.g. plain text containing a stray {@code %}) it is logged verbatim
     * @param args format arguments
     */
    public void print(String log, Object ... args) {
        String message;
        try {
            message = String.format(log, args);
        } catch (IllegalFormatException ignored) {
            // Callers often pass plain text; losing the line over a stray '%' helps nobody.
            message = log;
        }
        this.logs.add(new DateTime().toString("HH:mm:ss.SSS") + " " + message);
        if (this.logs.size() > LOG_LIMIT) {
            // pollFirst (not removeFirst) so a concurrent clear() cannot make this throw.
            this.logs.pollFirst();
        }
    }

    public String getName() {
        return this.name;
    }

    public BenchmarkOptions getOptions() {
        return this.options;
    }

    public ConnectionManager getConnectionManager() {
        return this.connectionManager;
    }

    /**
     * @return the most recently recorded error, or {@code null} if none was recorded since
     *         construction or the last {@link #clear()}
     */
    public Throwable getLastError() {
        return this.lastError;
    }

    /** Context handed to the connection factory; forwards before-connect callbacks to the enclosing benchmark. */
    class ConnectCreateContextImpl implements ConnectCreateContext {
        private final int index;

        public ConnectCreateContextImpl(int index) {
            this.index = index;
        }

        @Override
        public int index() {
            return this.index;
        }

        @Override
        public void beforeConnect(Object context) {
            Benchmark.this.handleBeforeConnect(this.index, context);
        }
    }

    /** Connection-manager summary captured at a point in time, optionally linked to its predecessor. */
    public static class Snapshot {
        private final Snapshot pre;
        private final long timestamp;
        private final ConnectionManager.Summary summary;

        /**
         * @param pre       previous snapshot, or {@code null} for the first one
         * @param timestamp capture time in epoch milliseconds (or, for a diff, the elapsed milliseconds)
         * @param summary   captured summary
         */
        public Snapshot(Snapshot pre, long timestamp, ConnectionManager.Summary summary) {
            this.pre = pre;
            this.timestamp = timestamp;
            this.summary = summary;
        }

        /**
         * Computes the change since the previous snapshot.
         *
         * @return a snapshot without predecessor whose summary is {@code summary.sub(pre.summary)}
         *         and whose timestamp holds the elapsed milliseconds; without a predecessor the
         *         summary is returned as is and the elapsed time is 0
         */
        public Snapshot getDiff() {
            if (this.pre == null) {
                return new Snapshot(null, 0L, this.summary);
            }
            ConnectionManager.Summary delta = this.summary.sub(this.pre.summary);
            long elapsed = this.timestamp - this.pre.timestamp;
            return new Snapshot(null, elapsed, delta);
        }

        public Snapshot getPre() {
            return this.pre;
        }

        public long getTimestamp() {
            return this.timestamp;
        }

        public ConnectionManager.Summary getSummary() {
            return this.summary;
        }
    }
}

