/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.simulator.core;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import org.jetlinks.reactor.ql.ReactorQL;
import org.jetlinks.reactor.ql.ReactorQLContext;
import org.jetlinks.simulator.core.Connection;
import org.jetlinks.simulator.core.ConnectionManager;
import org.springframework.util.ConcurrentReferenceHashMap;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class DefaultConnectionManager
implements ConnectionManager {
    static ConnectionManager global = new DefaultConnectionManager();
    private static final Map<String, ReactorQL> queryCache = new ConcurrentReferenceHashMap<String, ReactorQL>();
    private final Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>(10240);

    @Override
    public void dispose() {
        this.connections.values().forEach(Disposable::dispose);
    }

    @Override
    public long getConnectionSize() {
        return this.connections.size();
    }

    @Override
    public Flux<Connection> getConnections() {
        return Flux.fromIterable(this.connections.values());
    }

    @Override
    public Mono<Connection> getConnection(String id) {
        return Mono.justOrEmpty(this.connections.get(id));
    }

    @Override
    public Optional<Connection> getConnectionNow(String id) {
        return Optional.ofNullable(this.connections.get(id));
    }

    private ReactorQL createQl(String ql) {
        return ReactorQL.builder().sql("select id from dual where ", ql).build();
    }

    @Override
    public Flux<Connection> findConnection(String ql) {
        if (!StringUtils.hasText(ql)) {
            return this.getConnections();
        }
        return Flux.defer(() -> queryCache.computeIfAbsent(ql, this::createQl).start(ReactorQLContext.ofDatasource(ignore -> this.getConnections().map(Connection::attributes))).mapNotNull(record -> {
            String id = (String)record.asMap().get("id");
            return id == null ? null : this.connections.get(id);
        }));
    }

    @Override
    public Flux<Connection> randomConnection(int size) {
        int total = this.connections.size();
        if (total <= size) {
            return Flux.fromIterable(this.connections.values()).filter(Connection::isAlive);
        }
        int sub = total - size;
        return this.getConnections().skip(ThreadLocalRandom.current().nextInt(0, sub)).take(size).filter(Connection::isAlive);
    }

    @Override
    public ConnectionManager addConnection(Connection connection) {
        if (!connection.isAlive()) {
            return this;
        }
        connection.attribute("id", connection.getId());
        connection.attribute("type", connection.getType().name());
        connection.attribute("connectTime", connection.getConnectTime());
        this.connections.put(connection.getId(), connection);
        return this;
    }
}

