/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.community.network.mqtt.server.vertx;

import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.vertx.mqtt.MqttEndpoint;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import org.jctools.maps.NonBlockingHashMap;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkType;
import org.jetlinks.community.network.mqtt.server.MqttConnection;
import org.jetlinks.community.network.mqtt.server.MqttServer;
import org.jetlinks.community.network.mqtt.server.vertx.VertxMqttConnection;
import org.jetlinks.core.utils.Reactors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

/**
 * {@link MqttServer} implementation backed by one or more Vert.x MQTT server instances.
 *
 * <p>Every endpoint accepted by the underlying Vert.x servers is wrapped in a
 * {@link VertxMqttConnection} and offered to:
 * <ul>
 *   <li>the shared sink exposed by {@link #handleConnection()}, and</li>
 *   <li>for each holder registered through {@link #handleConnection(String)}, one randomly
 *       chosen sink of that holder.</li>
 * </ul>
 * If none of these accepts the connection it is rejected with
 * {@link MqttConnectReturnCode#CONNECTION_REFUSED_SERVER_UNAVAILABLE}.
 */
public class VertxMqttServer implements MqttServer {
    private static final Logger log = LoggerFactory.getLogger(VertxMqttServer.class);

    /**
     * Sink behind {@link #handleConnection()}.
     * NOTE(review): the arguments are presumably buffer size and an auto-cancel flag — confirm
     * against {@code Reactors.createMany}.
     */
    private final Sinks.Many<MqttConnection> sink = Reactors.createMany(5120, false);

    /** Per-holder sinks; each list is a {@link CopyOnWriteArrayList} created in {@link #handleConnection(String)}. */
    private final Map<String, List<Sinks.Many<MqttConnection>>> sinks = new NonBlockingHashMap<>();

    private Collection<io.vertx.mqtt.MqttServer> mqttServer;
    private final String id;
    private String lastError;
    private InetSocketAddress bind;

    /**
     * @param id identifier returned by {@link #getId()}
     */
    public VertxMqttServer(String id) {
        this.id = id;
    }

    /**
     * Replaces the underlying Vert.x servers and installs the exception and endpoint handlers
     * on each of them. Servers that were set previously are shut down first.
     *
     * @param mqttServer the servers to use; {@code null} is treated as "no servers"
     */
    public void setMqttServer(Collection<io.vertx.mqtt.MqttServer> mqttServer) {
        if (this.mqttServer != null && !this.mqttServer.isEmpty()) {
            this.shutdown();
        }
        this.mqttServer = mqttServer;
        if (mqttServer == null) {
            // Nothing to wire up; isAlive() already reports false for a null collection.
            return;
        }
        for (io.vertx.mqtt.MqttServer server : mqttServer) {
            server
                .exceptionHandler(error -> log.error(error.getMessage(), error))
                .endpointHandler(endpoint -> this.handleConnection(new VertxMqttConnection(endpoint)));
        }
    }

    /**
     * Tries to push a connection into the given sink.
     *
     * @param sink       target sink
     * @param connection connection to emit
     * @return {@code true} if the sink had at least one subscriber and the emission did not throw;
     *         {@code false} otherwise
     */
    private boolean emitNext(Sinks.Many<MqttConnection> sink, VertxMqttConnection connection) {
        if (sink.currentSubscriberCount() <= 0) {
            return false;
        }
        try {
            sink.emitNext(connection, Reactors.emitFailureHandler());
            return true;
        } catch (Throwable error) {
            // Best effort: a failed emission only means this sink did not take the connection,
            // but leave a trace instead of dropping the failure silently.
            log.warn("emit mqtt connection failed", error);
            return false;
        }
    }

    /**
     * Returns one randomly chosen element of the list, or {@code null} if it is empty.
     *
     * <p>The choice is made on a single {@code toArray()} snapshot, so a concurrent removal
     * (see the {@code doOnCancel} hook in {@link #handleConnection(String)}) cannot make the
     * size check and the indexed read disagree.
     */
    @SuppressWarnings("unchecked") // the snapshot only contains elements of the typed list
    private static Sinks.Many<MqttConnection> pickRandom(List<Sinks.Many<MqttConnection>> candidates) {
        Object[] snapshot = candidates.toArray();
        if (snapshot.length == 0) {
            return null;
        }
        return (Sinks.Many<MqttConnection>) snapshot[ThreadLocalRandom.current().nextInt(snapshot.length)];
    }

    /**
     * Dispatches a freshly accepted connection to the shared sink and to one random sink per
     * holder; rejects the connection when nobody accepted it.
     */
    private void handleConnection(VertxMqttConnection connection) {
        boolean anyHandled = this.emitNext(this.sink, connection);
        for (List<Sinks.Many<MqttConnection>> holderSinks : this.sinks.values()) {
            Sinks.Many<MqttConnection> chosen = pickRandom(holderSinks);
            if (chosen != null && this.emitNext(chosen, connection)) {
                anyHandled = true;
            }
        }
        if (!anyHandled) {
            connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
        }
    }

    /**
     * @return the flux view of the shared connection sink
     */
    @Override
    public Flux<MqttConnection> handleConnection() {
        return this.sink.asFlux();
    }

    /**
     * Registers a new sink under {@code holder} and returns its flux. When several sinks exist
     * for the same holder, each incoming connection is offered to only one of them, chosen at
     * random. The sink is unregistered when the returned flux is cancelled.
     *
     * @param holder key that groups the sinks sharing incoming connections
     * @return flux of the connections routed to this registration
     */
    @Override
    public Flux<MqttConnection> handleConnection(String holder) {
        List<Sinks.Many<MqttConnection>> holderSinks =
            this.sinks.computeIfAbsent(holder, ignore -> new CopyOnWriteArrayList<>());
        // NOTE(review): arguments presumably mean "unbounded buffer, auto-cancel" — confirm
        // against Reactors.createMany.
        Sinks.Many<MqttConnection> holderSink = Reactors.createMany(Integer.MAX_VALUE, true);
        holderSinks.add(holderSink);
        return holderSink.asFlux().doOnCancel(() -> holderSinks.remove(holderSink));
    }

    /**
     * @return {@code true} while a non-empty server collection is set
     */
    public boolean isAlive() {
        return this.mqttServer != null && !this.mqttServer.isEmpty();
    }

    /**
     * @return always {@code false}
     */
    public boolean isAutoReload() {
        return false;
    }

    /**
     * @return the id given at construction time
     */
    public String getId() {
        return this.id;
    }

    /**
     * @return {@link DefaultNetworkType#MQTT_SERVER}
     */
    public NetworkType getType() {
        return DefaultNetworkType.MQTT_SERVER;
    }

    /**
     * Requests a close of every underlying Vert.x server, logging the outcome of each close
     * from its callback, and then clears the server collection so that {@link #isAlive()}
     * returns {@code false}.
     */
    public void shutdown() {
        if (this.mqttServer != null) {
            for (io.vertx.mqtt.MqttServer server : this.mqttServer) {
                server.close(res -> {
                    if (res.failed()) {
                        log.error(res.cause().getMessage(), res.cause());
                    } else {
                        log.debug("mqtt server [{}] closed", (Object) server.actualPort());
                    }
                });
            }
            this.mqttServer.clear();
        }
    }

    /**
     * @return the address stored via {@link #setBind(InetSocketAddress)}, or {@code null} if none was set
     */
    public InetSocketAddress getBindAddress() {
        return this.bind;
    }

    /**
     * @return the message stored via {@link #setLastError(String)}, or {@code null} if none was set
     */
    public String getLastError() {
        return this.lastError;
    }

    /**
     * @param lastError message to be returned by {@link #getLastError()}
     */
    public void setLastError(String lastError) {
        this.lastError = lastError;
    }

    /**
     * @param bind address to be returned by {@link #getBindAddress()}
     */
    void setBind(InetSocketAddress bind) {
        this.bind = bind;
    }
}

