package org.jetlinks.simulator.core.benchmark;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import org.jetlinks.simulator.core.Connection;
import org.jetlinks.simulator.core.ConnectionManager;
import org.jetlinks.simulator.core.ExceptionUtils;
import org.jetlinks.simulator.core.report.Reporter;
import org.jetlinks.simulator.core.script.ExposedScript;
import org.jetlinks.simulator.core.script.Script;
import org.jetlinks.simulator.core.script.ScriptFactory;
import org.jetlinks.simulator.core.script.Scripts;
import org.joda.time.DateTime;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/simulator/core/benchmark/Benchmark.class */
public class Benchmark implements Disposable, BenchmarkHelper {
    /** Name of the reporter point opened for every connection attempt. */
    public static final String REPORT_CONNECTING = "connecting";
    /** Benchmark name; also used as the suffix of the compiled script name. */
    private final String name;
    private final BenchmarkOptions options;
    private final Reporter reporter;
    private final ConnectionManager connectionManager;
    /** Creates the connection for a given {@link ConnectCreateContext}. */
    private final Function<ConnectCreateContext, Mono<? extends Connection>> connectionFactory;
    /** Callbacks invoked with (index, context object) via {@code handleBeforeConnect}. */
    private final List<BiConsumer<Integer, Object>> beforeConnectHandler = new CopyOnWriteArrayList<>();
    /** Callbacks invoked for every established (or reset) connection. */
    private final List<Consumer<Connection>> connectionHandler = new CopyOnWriteArrayList<>();
    /** Callbacks run when the connect pipeline terminates, and at the end of a reload. */
    private final List<Runnable> completeHandler = new CopyOnWriteArrayList<>();
    /** Resources that live for the whole benchmark; disposed by {@code dispose()}. */
    private final Disposable.Composite disposable = Disposables.composite();
    /** Resources created by {@code interval(...)}; disposed and replaced on every {@code reload()}. */
    private Disposable.Composite reloadable = Disposables.composite();
    /** De-duplicated error messages recorded by {@code error(...)}. */
    private final Set<String> errors = ConcurrentHashMap.newKeySet();
    /** Lines written through {@code print(...)}. */
    private final Deque<String> logs = new ConcurrentLinkedDeque<>();
    /** Connection-manager summaries captured by {@code snapshot()}. */
    private final Deque<Snapshot> snapshots = new ConcurrentLinkedDeque<>();
    /** Most recent throwable passed to {@code error(...)}; reset to null by {@code clear()}. */
    private Throwable lastError;
    private static final Logger log = LoggerFactory.getLogger(Benchmark.class);
    /** Script factory obtained for the {@code "js"} language; used to compile benchmark scripts. */
    public static final ScriptFactory scriptFactory = Scripts.getFactory("js");

    /* loaded from: input_file:org/jetlinks/simulator/core/benchmark/Benchmark$ConnectCreateContextImpl.class */
    /**
     * {@link ConnectCreateContext} bound to a single connection index. Calls to
     * {@link #beforeConnect(Object)} are forwarded to the enclosing benchmark's
     * before-connect handlers together with that index.
     */
    class ConnectCreateContextImpl implements ConnectCreateContext {
        /** Index of the connection this context was created for. */
        private final int index;

        public ConnectCreateContextImpl(int index) {
            this.index = index;
        }

        /** @return the connection index given at construction time */
        @Override
        public int index() {
            return index;
        }

        /**
         * Forwards {@code context} to the enclosing benchmark's before-connect handlers.
         *
         * @param context object handed to every registered before-connect callback
         */
        @Override
        public void beforeConnect(Object context) {
            Benchmark.this.handleBeforeConnect(index, context);
        }
    }

    /* loaded from: input_file:org/jetlinks/simulator/core/benchmark/Benchmark$Snapshot.class */
    /**
     * Immutable capture of a {@link ConnectionManager.Summary} at a point in time,
     * optionally linked to the snapshot taken before it.
     */
    public static class Snapshot {
        /** Previous snapshot, or {@code null} when there is none. */
        private final Snapshot pre;
        /** Value supplied at construction; for a diff snapshot this holds an elapsed time instead. */
        private final long timestamp;
        private final ConnectionManager.Summary summary;

        /**
         * @param snapshot the preceding snapshot, may be {@code null}
         * @param j        timestamp value to store
         * @param summary  summary captured for this snapshot
         */
        public Snapshot(Snapshot snapshot, long j, ConnectionManager.Summary summary) {
            this.pre = snapshot;
            this.timestamp = j;
            this.summary = summary;
        }

        /**
         * Computes the change relative to the previous snapshot.
         *
         * @return a new snapshot without predecessor. With a predecessor, its timestamp is
         *         the difference between the two timestamps and its summary is
         *         {@code summary.sub(pre.summary)}; without one, the timestamp is 0 and
         *         the summary is this snapshot's summary unchanged.
         */
        public Snapshot getDiff() {
            if (pre == null) {
                return new Snapshot(null, 0L, summary);
            }
            ConnectionManager.Summary delta = summary.sub(pre.summary);
            long elapsed = timestamp - pre.timestamp;
            return new Snapshot(null, elapsed, delta);
        }

        /** @return the previous snapshot, or {@code null} */
        public Snapshot getPre() {
            return pre;
        }

        /** @return the stored timestamp value */
        public long getTimestamp() {
            return timestamp;
        }

        /** @return the stored summary */
        public ConnectionManager.Summary getSummary() {
            return summary;
        }
    }

    /**
     * Creates a benchmark; nothing is started until {@link #start()} is called.
     *
     * @param str               benchmark name
     * @param benchmarkOptions  options read by {@code start()}, {@code reload()} and script execution
     * @param reporter          reporter used to record connection attempts
     * @param connectionManager manager that receives every established connection
     * @param function          factory producing the connection for a given create-context
     */
    public Benchmark(String str,
                     BenchmarkOptions benchmarkOptions,
                     Reporter reporter,
                     ConnectionManager connectionManager,
                     Function<ConnectCreateContext, Mono<? extends Connection>> function) {
        this.connectionFactory = function;
        this.connectionManager = connectionManager;
        this.reporter = reporter;
        this.options = benchmarkOptions;
        this.name = str;
    }

    /**
     * Factory shortcut that supplies a fresh reporter from {@code Reporter.create()}.
     *
     * @param str               benchmark name
     * @param benchmarkOptions  benchmark options
     * @param connectionManager manager that receives established connections
     * @param function          connection factory
     * @return a new, not yet started benchmark
     */
    public static Benchmark create(String str,
                                   BenchmarkOptions benchmarkOptions,
                                   ConnectionManager connectionManager,
                                   Function<ConnectCreateContext, Mono<? extends Connection>> function) {
        Reporter freshReporter = Reporter.create();
        return new Benchmark(str, benchmarkOptions, freshReporter, connectionManager, function);
    }

    /** @return the reporter this benchmark records connection attempts on */
    public Reporter getReporter() {
        return reporter;
    }

    /** @return the live deque of lines written through {@code print(...)} (not a copy) */
    public Deque<String> getLogs() {
        return logs;
    }

    /**
     * Starts the benchmark: runs the configured script (if any), begins taking a
     * snapshot every second, and opens {@code options.getSize()} connections starting
     * at index {@code options.getIndex()}, bounded by {@code options.getConcurrency()}.
     * Registered completion handlers run once the connect pipeline terminates.
     *
     * <p>Does nothing when the benchmark-wide disposable already holds entries.
     * NOTE(review): that guard also trips if {@code doOnDispose(...)} was called
     * before this method, since both use the same composite - confirm callers
     * always start first.
     */
    public void start() {
        if (disposable.size() > 0) {
            return;
        }
        if (options.getFile() != null) {
            executeScript(Paths.get(options.getFile().toURI()));
        }

        Disposable snapshotTask = Flux
            .interval(Duration.ofSeconds(1L))
            .flatMap(tick -> snapshot())
            .subscribe();
        disposable.add(snapshotTask);

        Disposable connectTask = Flux
            .range(options.getIndex(), options.getSize())
            .flatMap(index -> connect(index), options.getConcurrency(), options.getConcurrency())
            .doOnNext(this::handleConnected)
            .then()
            .doFinally(signal -> {
                for (Runnable handler : completeHandler) {
                    handler.run();
                }
            })
            .subscribe();
        disposable.add(connectTask);
    }

    /**
     * Re-runs the benchmark script against the connections that are still alive.
     *
     * <p>All registered handlers are dropped and everything tracked in the reloadable
     * composite is disposed before the script is executed again. Each alive connection
     * is then reset and passed to the freshly registered connection handlers, and
     * finally the completion handlers are run.
     */
    public void reload() {
        // Forget whatever the previous script run registered.
        beforeConnectHandler.clear();
        completeHandler.clear();
        connectionHandler.clear();

        reloadable.dispose();
        reloadable = Disposables.composite();

        if (options.getFile() != null) {
            executeScript(Paths.get(options.getFile().toURI()));
        }

        getConnectionManager()
            .getConnections()
            .filter(connection -> connection.isAlive())
            .doOnNext(connection -> connection.reset())
            .subscribe(this::fireConnectionListener);

        for (Runnable handler : completeHandler) {
            handler.run();
        }
    }

    /**
     * Registers a newly established connection with the connection manager and then
     * notifies the connection handlers.
     *
     * @param connection the connection that was just created
     */
    private void handleConnected(Connection connection) {
        connectionManager.addConnection(connection);
        fireConnectionListener(connection);
    }

    /**
     * Passes {@code connection} to every registered connection handler. A handler that
     * throws is logged at error level and does not stop the remaining handlers.
     *
     * @param connection the connection to publish
     */
    private void fireConnectionListener(Connection connection) {
        for (Consumer<Connection> handler : connectionHandler) {
            try {
                handler.accept(connection);
            } catch (Throwable failure) {
                log.error(failure.getMessage(), failure);
            }
        }
    }

    /**
     * Invokes every before-connect handler with the connection index and the given
     * object. A failing handler is logged as a warning (message only) and the
     * remaining handlers still run.
     *
     * @param i   index of the connection about to be created
     * @param obj object passed through to the handlers
     */
    void handleBeforeConnect(int i, Object obj) {
        for (BiConsumer<Integer, Object> handler : beforeConnectHandler) {
            try {
                handler.accept(i, obj);
            } catch (Throwable failure) {
                log.warn("handleBeforeConnect error:{}", ExceptionUtils.getErrorMessage(failure));
            }
        }
    }

    /**
     * Registers a callback that receives every connection published to the
     * connection handlers.
     *
     * @param consumer callback to add
     * @return this benchmark, for chaining
     */
    public Benchmark onConnected(Consumer<Connection> consumer) {
        connectionHandler.add(consumer);
        return this;
    }

    /**
     * Registers a callback invoked with (connection index, context object) whenever a
     * create-context's {@code beforeConnect} is called.
     *
     * @param biConsumer callback to add
     * @return this benchmark, for chaining
     */
    public Benchmark beforeConnect(BiConsumer<Integer, Object> biConsumer) {
        beforeConnectHandler.add(biConsumer);
        return this;
    }

    /**
     * Registers a callback run when the connect pipeline terminates and at the end of
     * each {@code reload()}.
     *
     * @param runnable callback to add
     * @return this benchmark, for chaining
     */
    public Benchmark onComplete(Runnable runnable) {
        completeHandler.add(runnable);
        return this;
    }

    /**
     * Adapts an arbitrary callback result to a {@code Mono}.
     *
     * @param obj value returned by a script callback; may be {@code null}
     * @return {@code Mono.from(obj)} when {@code obj} is a {@link Publisher}; otherwise a
     *         Mono built with {@code Mono.justOrEmpty(obj)}
     */
    private Mono<Object> castMono(Object obj) {
        if (obj instanceof Publisher) {
            return Mono.<Object>from((Publisher<?>) obj);
        }
        return Mono.justOrEmpty(obj);
    }

    /**
     * Executes the script stored at the given path with this benchmark's script context.
     *
     * @param str file-system path of the script
     * @return whatever {@link #executeScript(Path)} returns for that file
     */
    @Override
    public Object require(String str) {
        Path scriptPath = Paths.get(str);
        return executeScript(scriptPath);
    }

    /**
     * Reads, compiles and runs the script at {@code path}.
     *
     * <p>The context map handed to the script contains every entry of
     * {@code options.getScriptArgs()} (when present), the arguments map itself under
     * {@code "args"} (an empty map when there are none), and this benchmark under
     * {@code "benchmark"}. The script is compiled under the name
     * {@code "benchmark_" + name} and exposed through {@link BenchmarkHelper}.
     *
     * @param path location of the script file, decoded as UTF-8
     * @return the value produced by calling the compiled script
     * @throws UncheckedIOException if the file cannot be read
     */
    protected Object executeScript(Path path) {
        final String source;
        try {
            // Decode explicitly as UTF-8 so the result does not depend on the platform default charset.
            source = new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException("Failed to read benchmark script: " + path, e);
        }

        Map<String, Object> context = new HashMap<>();
        if (this.options.getScriptArgs() != null) {
            // Each argument is bound individually and the whole map is also available as "args".
            context.putAll(this.options.getScriptArgs());
            context.put("args", this.options.getScriptArgs());
        } else {
            context.put("args", Collections.emptyMap());
        }
        context.put("benchmark", this);

        // This benchmark is the BenchmarkHelper instance exposed to the script; it is passed
        // as-is because Benchmark does not implement ExposedScript and cannot be cast to it.
        return scriptFactory
            .compileExpose(Script.of("benchmark_" + this.name, source).returnNative(), BenchmarkHelper.class)
            .call(this, context);
    }

    /**
     * Applies {@code function} to the connections returned by
     * {@code connectionManager.randomConnection(i)}, processing up to {@code i} results
     * concurrently. Each result is adapted with {@code castMono}.
     *
     * @param i        number passed to {@code randomConnection} and used as flatMap concurrency
     * @param function callback applied to each connection; may return a Publisher or a plain value
     * @return a Mono that completes once all callback results have completed
     */
    public Mono<Void> randomConnectionAsync(int i, Function<Connection, Object> function) {
        return connectionManager
            .randomConnection(i)
            .flatMap(connection -> castMono(function.apply(connection)), i)
            .then();
    }

    /**
     * Subscribing variant of {@link #randomConnectionAsync(int, Function)}.
     *
     * @param i        number of connections requested
     * @param function callback applied to each connection
     * @return handle for the running subscription
     */
    public Disposable randomConnection(int i, Function<Connection, Object> function) {
        Mono<Void> task = randomConnectionAsync(i, function);
        return task.subscribe();
    }

    /**
     * Runs {@code callable} once after {@code i} milliseconds. The result is adapted
     * with {@code castMono}; any failure is recorded via {@code error("delay execute", ...)}
     * and swallowed.
     *
     * @param callable task to run; may return a Publisher or a plain value
     * @param i        delay in milliseconds
     * @return handle for the scheduled task
     */
    public Disposable delay(Callable<Object> callable, int i) {
        Duration wait = Duration.ofMillis(i);
        return Mono
            .delay(wait)
            .flatMap(tick -> Mono
                .fromCallable(callable)
                .flatMap(this::castMono)
                .onErrorResume(failure -> {
                    error("delay execute", failure);
                    return Mono.empty();
                }))
            .subscribe();
    }

    /**
     * Runs {@code callable} every {@code i} milliseconds. Ticks are dropped under
     * backpressure and executions are chained with {@code concatMap}. Failures are
     * recorded via {@code error("interval execute", ...)} and swallowed.
     *
     * <p>The subscription is tracked in the reloadable composite, which
     * {@code reload()} and {@code dispose()} dispose.
     *
     * @param callable task to run; may return a Publisher or a plain value
     * @param i        period in milliseconds
     * @return handle that untracks and disposes the timer
     */
    public Disposable interval(Callable<Object> callable, int i) {
        Duration period = Duration.ofMillis(i);
        Disposable timer = Flux
            .interval(period)
            .onBackpressureDrop()
            .concatMap(tick -> Mono
                .fromCallable(callable)
                .flatMap(this::castMono)
                .onErrorResume(failure -> {
                    error("interval execute", failure);
                    return Mono.empty();
                }))
            .subscribe();
        reloadable.add(timer);
        return () -> {
            reloadable.remove(timer);
            timer.dispose();
        };
    }

    /** @return the live deque of captured snapshots (not a copy) */
    public Deque<Snapshot> snapshots() {
        return snapshots;
    }

    /**
     * Captures the connection manager's current summary as a new {@link Snapshot}
     * linked to the most recent one, appends it, and drops the oldest entry once the
     * deque holds 86400 or more. Failures are recorded via {@code error("snapshot", ...)}
     * and swallowed.
     *
     * <p>NOTE(review): every snapshot keeps a reference to its predecessor, so removing
     * the head of the deque does not by itself make older snapshots unreachable -
     * confirm whether that retention is intended.
     *
     * @return a Mono completing when the snapshot has been stored (or the error handled)
     */
    private Mono<Void> snapshot() {
        return connectionManager
            .summary()
            .map(summary -> new Snapshot(snapshots.peekLast(), System.currentTimeMillis(), summary))
            .doOnNext(captured -> {
                snapshots.addLast(captured);
                if (snapshots.size() >= 86400) {
                    snapshots.removeFirst();
                }
            })
            .onErrorResume(failure -> {
                error("snapshot", failure);
                return Mono.empty();
            })
            .then();
    }

    /**
     * Remembers {@code th} as the last error and adds {@code "<str>:<message>"} to the
     * error set. The set is emptied first when it already holds more than 100 entries.
     *
     * @param str label describing the operation that failed
     * @param th  the failure
     */
    private void error(String str, Throwable th) {
        lastError = th;
        boolean overflowing = errors.size() > 100;
        if (overflowing) {
            errors.clear();
        }
        String entry = str + ":" + ExceptionUtils.getErrorMessage(th);
        errors.add(entry);
    }

    /**
     * Creates connection number {@code i} through the connection factory while timing
     * it on a {@code "connecting"} reporter point. Success marks the point successful;
     * a failure marks it failed, is recorded via {@code error("connect", ...)} and
     * results in an empty Mono.
     *
     * @param i index of the connection to create
     * @return the created connection, or empty when creation failed
     */
    private Mono<? extends Connection> connect(int i) {
        Reporter.Point point = reporter.newPoint(REPORT_CONNECTING);
        point.start();
        return connectionFactory
            .apply(new ConnectCreateContextImpl(i))
            .doOnNext(connected -> point.success())
            .onErrorResume(failure -> {
                point.error(getErrorMessage(failure));
                error("connect", failure);
                return Mono.empty();
            });
    }

    /**
     * @param th the failure to describe
     * @return the throwable's message, or its simple class name when the message is null
     */
    private String getErrorMessage(Throwable th) {
        if (th.getMessage() != null) {
            return th.getMessage();
        }
        return th.getClass().getSimpleName();
    }

    /** Disposes the reloadable resources first, then the benchmark-wide ones. */
    @Override
    public void dispose() {
        reloadable.dispose();
        disposable.dispose();
    }

    /** @return whether the benchmark-wide composite has been disposed */
    @Override
    public boolean isDisposed() {
        return disposable.isDisposed();
    }

    /**
     * Adds {@code disposable} to the benchmark-wide composite so that it is disposed
     * together with this benchmark.
     *
     * @param disposable resource to tie to this benchmark
     */
    public void doOnDispose(Disposable disposable) {
        Disposable.Composite owned = this.disposable;
        owned.add(disposable);
    }

    /** Empties the printed log lines and recorded errors and forgets the last error. */
    public void clear() {
        logs.clear();
        errors.clear();
        lastError = null;
    }

    /**
     * Appends a line of the form {@code "HH:mm:ss.SSS <formatted text>"} to the log
     * deque, dropping the oldest line once more than 100 are held.
     *
     * @param str    format string as understood by {@link String#format(String, Object...)}
     * @param objArr format arguments
     */
    public void print(String str, Object... objArr) {
        String time = new DateTime().toString("HH:mm:ss.SSS");
        String message = String.format(str, objArr);
        logs.add(time + " " + message);
        if (logs.size() > 100) {
            logs.removeFirst();
        }
    }

    /** @return the benchmark name given at construction */
    public String getName() {
        return name;
    }

    /** @return the options given at construction */
    public BenchmarkOptions getOptions() {
        return options;
    }

    /** @return the connection manager that holds this benchmark's connections */
    public ConnectionManager getConnectionManager() {
        return connectionManager;
    }

    /** @return the most recently recorded failure, or {@code null} if none since the last {@code clear()} */
    public Throwable getLastError() {
        return lastError;
    }
}
