package org.jetlinks.simulator.core;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import org.jetlinks.reactor.ql.ReactorQL;
import org.jetlinks.reactor.ql.ReactorQLContext;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/simulator/core/DefaultConnectionManager.class */
/**
 * In-memory {@link ConnectionManager} that keeps connections in a concurrent map keyed by
 * connection id.
 *
 * <p>Filter expressions passed to {@link #findConnection(String)} are compiled to
 * {@link ReactorQL} queries and cached per expression text in a static, reference-based map
 * shared by all instances.
 */
public class DefaultConnectionManager implements ConnectionManager {
    /** Shared instance created when the class is loaded. */
    static ConnectionManager global = new DefaultConnectionManager();

    /** Compiled filter queries keyed by the raw expression text; shared by all instances. */
    private static final Map<String, ReactorQL> queryCache = new ConcurrentReferenceHashMap<>();

    /** Registered connections keyed by {@code Connection#getId()}. */
    private final Map<String, Connection> connections = new ConcurrentHashMap<>(10240);

    /**
     * Disposes every connection currently registered. Entries are not removed from the
     * internal map by this method.
     */
    @Override // reactor.core.Disposable
    public void dispose() {
        this.connections.values().forEach(Connection::dispose);
    }

    /**
     * @return the number of entries currently held in the connection map
     */
    @Override // org.jetlinks.simulator.core.ConnectionManager
    public long getConnectionSize() {
        return this.connections.size();
    }

    /**
     * @return a flux over all registered connections (alive or not)
     */
    @Override // org.jetlinks.simulator.core.ConnectionManager
    public Flux<Connection> getConnections() {
        return Flux.fromIterable(this.connections.values());
    }

    /**
     * Looks up a connection by id.
     *
     * @param str the connection id
     * @return a mono emitting the connection, or an empty mono when the id is not registered
     */
    @Override // org.jetlinks.simulator.core.ConnectionManager
    public Mono<Connection> getConnection(String str) {
        return Mono.justOrEmpty(this.connections.get(str));
    }

    /**
     * Synchronous variant of {@link #getConnection(String)}.
     *
     * @param str the connection id
     * @return the connection, or {@link Optional#empty()} when the id is not registered
     */
    @Override // org.jetlinks.simulator.core.ConnectionManager
    public Optional<Connection> getConnectionNow(String str) {
        return Optional.ofNullable(this.connections.get(str));
    }

    /**
     * Compiles a filter expression into a query that selects the {@code id} column of matching rows.
     *
     * <p>NOTE(review): the expression is embedded into the query text as-is, so it must come
     * from a trusted caller.
     *
     * @param str the filter expression placed after {@code where}
     * @return the compiled query
     */
    private ReactorQL createQl(String str) {
        return ReactorQL.builder().sql("select id from dual where ", str).build();
    }

    /**
     * Finds connections whose attributes match a filter expression.
     *
     * <p>When the expression has no text, all connections are returned. Otherwise the expression
     * is compiled (or fetched from the cache) and run against the attribute maps of all registered
     * connections; each resulting {@code id} is resolved back to a connection. Rows without an
     * {@code id}, or whose id is no longer registered, are skipped.
     *
     * @param str the filter expression; may be {@code null} or blank
     * @return a flux of matching connections
     */
    @Override // org.jetlinks.simulator.core.ConnectionManager
    public Flux<Connection> findConnection(String str) {
        if (!StringUtils.hasText(str)) {
            return getConnections();
        }
        // Deferred so the cache lookup and query start happen per subscription.
        return Flux.defer(() -> queryCache
                .computeIfAbsent(str, this::createQl)
                // The same attribute stream is supplied whatever datasource name the query asks for.
                .start(ReactorQLContext.ofDatasource(
                        datasource -> getConnections().map(connection -> connection.attributes())))
                .mapNotNull(record -> {
                    String id = (String) record.asMap().get("id");
                    if (id == null) {
                        return null;
                    }
                    // A null here (id removed in the meantime) is dropped by mapNotNull.
                    return this.connections.get(id);
                }));
    }

    /**
     * Picks up to {@code i} alive connections from a randomly positioned window.
     *
     * <p>When no more than {@code i} connections are registered, all alive ones are returned.
     * Otherwise a random offset is chosen, {@code i} consecutive connections are taken from
     * there, and those that are not alive are filtered out, so fewer than {@code i} may be
     * emitted.
     *
     * @param i the maximum number of connections to return
     * @return a flux of alive connections
     */
    @Override // org.jetlinks.simulator.core.ConnectionManager
    public Flux<Connection> randomConnection(int i) {
        // Read the size once so the comparison and the random bound use the same snapshot.
        int total = this.connections.size();
        if (total <= i) {
            return Flux.fromIterable(this.connections.values()).filter(Connection::isAlive);
        }
        // total > i here, so the bound passed to nextInt is strictly greater than the origin.
        int offset = ThreadLocalRandom.current().nextInt(0, total - i);
        return getConnections().skip(offset).take(i).filter(Connection::isAlive);
    }

    /**
     * Registers a connection under its id, replacing any previous entry with the same id.
     *
     * <p>Connections that are not alive are ignored. Before registration, the {@code id},
     * {@code type} and {@code connectTime} attributes are set on the connection.
     *
     * @param connection the connection to register
     * @return this manager
     */
    @Override // org.jetlinks.simulator.core.ConnectionManager
    public ConnectionManager addConnection(Connection connection) {
        if (!connection.isAlive()) {
            return this;
        }
        String id = connection.getId();
        connection.attribute("id", id);
        connection.attribute("type", connection.getType().name());
        connection.attribute("connectTime", Long.valueOf(connection.getConnectTime()));
        this.connections.put(id, connection);
        return this;
    }
}
