package org.jetlinks.community.network.mqtt.server.vertx;

import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import org.jctools.maps.NonBlockingHashMap;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkType;
import org.jetlinks.community.network.mqtt.server.MqttConnection;
import org.jetlinks.community.network.mqtt.server.MqttServer;
import org.jetlinks.core.utils.Reactors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:org/jetlinks/community/network/mqtt/server/vertx/VertxMqttServer.class */
/**
 * {@link MqttServer} implementation backed by a collection of Vert.x MQTT servers.
 *
 * <p>Every endpoint accepted by one of the underlying servers is wrapped in a
 * {@link VertxMqttConnection} and offered to the subscribers of {@link #handleConnection()}
 * and to one subscriber per holder registered through {@link #handleConnection(String)}.
 * A connection that no subscriber accepts is rejected with
 * {@link MqttConnectReturnCode#CONNECTION_REFUSED_SERVER_UNAVAILABLE}.
 */
public class VertxMqttServer implements MqttServer {
    private static final Logger log = LoggerFactory.getLogger(VertxMqttServer.class);

    /**
     * Sink behind {@link #handleConnection()}.
     * NOTE(review): the meaning of the (5120, false) arguments is defined by
     * {@code Reactors.createMany} — presumably buffer size and an auto-cancel flag; confirm.
     */
    private final Sinks.Many<MqttConnection> sink = Reactors.createMany(5120, false);

    /** Per-holder sinks created by {@link #handleConnection(String)}, keyed by holder id. */
    private final Map<String, List<Sinks.Many<MqttConnection>>> sinks = new NonBlockingHashMap<>();

    /** Underlying Vert.x servers; {@code null} until {@link #setMqttServer(Collection)} is called. */
    private Collection<io.vertx.mqtt.MqttServer> mqttServer;
    private final String id;
    private String lastError;
    private InetSocketAddress bind;

    /**
     * @param str identifier returned by {@link #getId()}
     */
    public VertxMqttServer(String str) {
        this.id = str;
    }

    /**
     * Replaces the underlying Vert.x servers and installs the exception and endpoint
     * handlers on each of them. Any previously configured, non-empty set of servers is
     * shut down first.
     *
     * @param collection the new servers; {@code null} is treated as "no servers"
     */
    public void setMqttServer(Collection<io.vertx.mqtt.MqttServer> collection) {
        if (this.mqttServer != null && !this.mqttServer.isEmpty()) {
            shutdown();
        }
        this.mqttServer = collection;
        if (collection == null) {
            // Nothing to wire up; isAlive() reports false from here on.
            return;
        }
        for (io.vertx.mqtt.MqttServer server : collection) {
            server
                .exceptionHandler(th -> log.error(th.getMessage(), th))
                .endpointHandler(endpoint -> handleConnection(new VertxMqttConnection(endpoint)));
        }
    }

    /**
     * Tries to push a connection into the given sink.
     *
     * @return {@code true} if the sink has at least one subscriber and the emission did
     *         not throw; {@code false} otherwise
     */
    private boolean emitNext(Sinks.Many<MqttConnection> many, VertxMqttConnection vertxMqttConnection) {
        if (many.currentSubscriberCount() <= 0) {
            return false;
        }
        try {
            many.emitNext(vertxMqttConnection, Reactors.emitFailureHandler());
            return true;
        } catch (Throwable th) {
            // Still reported to the caller as "not accepted", but no longer swallowed silently.
            log.warn("mqtt server [{}] failed to emit connection", this.id, th);
            return false;
        }
    }

    /**
     * Picks one sink at random from the given list.
     *
     * <p>The choice is made on a single {@code toArray()} copy so that a concurrent removal
     * (see the cancel hook in {@link #handleConnection(String)}) cannot invalidate the
     * index between the size check and the element access.
     *
     * @return a randomly selected sink, or {@code null} if the list is empty
     */
    @SuppressWarnings("unchecked") // the array only ever holds elements of the typed list
    private Sinks.Many<MqttConnection> pickRandom(List<Sinks.Many<MqttConnection>> candidates) {
        Object[] snapshot = candidates.toArray();
        if (snapshot.length == 0) {
            return null;
        }
        return (Sinks.Many<MqttConnection>) snapshot[ThreadLocalRandom.current().nextInt(snapshot.length)];
    }

    /**
     * Dispatches a new connection to the shared sink and to one randomly chosen sink of
     * every holder; rejects the connection when nobody accepted it.
     */
    private void handleConnection(VertxMqttConnection vertxMqttConnection) {
        boolean accepted = emitNext(this.sink, vertxMqttConnection);
        for (List<Sinks.Many<MqttConnection>> holderSinks : this.sinks.values()) {
            Sinks.Many<MqttConnection> target = pickRandom(holderSinks);
            if (target != null && emitNext(target, vertxMqttConnection)) {
                accepted = true;
            }
        }
        if (!accepted) {
            vertxMqttConnection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
        }
    }

    /**
     * @return a flux view of the shared connection sink
     */
    @Override // org.jetlinks.community.network.mqtt.server.MqttServer
    public Flux<MqttConnection> handleConnection() {
        return this.sink.asFlux();
    }

    /**
     * Registers an additional sink under the given holder and returns its flux. Each
     * incoming connection is offered to one randomly chosen sink per holder. The sink is
     * unregistered when the returned flux is cancelled.
     *
     * @param str holder id used as the grouping key
     * @return flux of connections routed to this registration
     */
    @Override // org.jetlinks.community.network.mqtt.server.MqttServer
    public Flux<MqttConnection> handleConnection(String str) {
        List<Sinks.Many<MqttConnection>> holderSinks =
            this.sinks.computeIfAbsent(str, ignored -> new CopyOnWriteArrayList<>());
        Sinks.Many<MqttConnection> holderSink = Reactors.createMany(Integer.MAX_VALUE, true);
        holderSinks.add(holderSink);
        return holderSink.asFlux().doOnCancel(() -> holderSinks.remove(holderSink));
    }

    /**
     * @return {@code true} if at least one underlying server is configured
     */
    public boolean isAlive() {
        return this.mqttServer != null && !this.mqttServer.isEmpty();
    }

    /**
     * @return always {@code false}
     */
    public boolean isAutoReload() {
        return false;
    }

    /**
     * @return the identifier passed to the constructor
     */
    public String getId() {
        return this.id;
    }

    /**
     * @return {@link DefaultNetworkType#MQTT_SERVER}
     */
    public NetworkType getType() {
        return DefaultNetworkType.MQTT_SERVER;
    }

    /**
     * Requests an asynchronous close of every underlying server, logging the outcome of
     * each close, and then clears the server collection. Does nothing if no collection
     * has been set.
     */
    public void shutdown() {
        if (this.mqttServer != null) {
            for (io.vertx.mqtt.MqttServer server : this.mqttServer) {
                server.close(result -> {
                    if (result.failed()) {
                        log.error(result.cause().getMessage(), result.cause());
                    } else {
                        log.debug("mqtt server [{}] closed", server.actualPort());
                    }
                });
            }
            this.mqttServer.clear();
        }
    }

    /**
     * @return the address stored by {@link #setBind(InetSocketAddress)}, or {@code null} if unset
     */
    public InetSocketAddress getBindAddress() {
        return this.bind;
    }

    /**
     * @return the message stored by {@link #setLastError(String)}, or {@code null} if unset
     */
    public String getLastError() {
        return this.lastError;
    }

    /**
     * @param str the error message to store
     */
    public void setLastError(String str) {
        this.lastError = str;
    }

    /**
     * Stores the address reported by {@link #getBindAddress()}.
     * Decompiler note: access modifier was changed from package-private.
     *
     * @param inetSocketAddress the bind address to store
     */
    public void setBind(InetSocketAddress inetSocketAddress) {
        this.bind = inetSocketAddress;
    }
}
