package org.jetlinks.simulator.core.network.mqtt;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.ReferenceCountUtil;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.MqttConnectionException;
import io.vertx.mqtt.messages.MqttConnAckMessage;
import io.vertx.mqtt.messages.MqttPublishMessage;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import org.jetlinks.core.utils.TopicUtils;
import org.jetlinks.simulator.core.Connection;
import org.jetlinks.simulator.core.ExceptionUtils;
import org.jetlinks.simulator.core.Global;
import org.jetlinks.simulator.core.network.AbstractConnection;
import org.jetlinks.simulator.core.network.Address;
import org.jetlinks.simulator.core.network.AddressManager;
import org.jetlinks.simulator.core.network.NetworkType;
import org.jetlinks.simulator.core.network.NetworkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

/* loaded from: input_file:org/jetlinks/simulator/core/network/mqtt/MqttClient.class */
public class MqttClient extends AbstractConnection {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) MqttClient.class);
    io.vertx.mqtt.MqttClient client;
    Address address;
    private List<Consumer<MqttPublishMessage>> handlers;
    private final Map<Tuple2<String, Integer>, Subscriber> subscribers = new ConcurrentHashMap();

    /* loaded from: input_file:org/jetlinks/simulator/core/network/mqtt/MqttClient$Subscriber.class */
    public class Subscriber implements Disposable, Consumer<MqttPublishMessage> {
        private final String topic;
        private final int qos;
        private final List<Consumer<MqttPublishMessage>> handlers = new CopyOnWriteArrayList();
        private final Disposable disposable;

        public Subscriber(String str, int i) {
            this.topic = str;
            this.qos = i;
            this.disposable = MqttClient.this.handle(this);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public Disposable addHandler(Consumer<MqttPublishMessage> consumer) {
            if (this.handlers.isEmpty()) {
                MqttClient.this.client.subscribe(this.topic, this.qos);
            }
            this.handlers.add(consumer);
            return () -> {
                this.handlers.remove(consumer);
                tryDispose();
            };
        }

        private void tryDispose() {
            if (this.handlers.isEmpty()) {
                dispose();
            }
        }

        @Override // reactor.core.Disposable
        public void dispose() {
            MqttClient.this.client.unsubscribe(this.topic);
            MqttClient.this.subscribers.clear();
            this.disposable.dispose();
            MqttClient.this.subscribers.remove(Tuples.of(this.topic, Integer.valueOf(this.qos)), this);
        }

        @Override // java.util.function.Consumer
        public void accept(MqttPublishMessage mqttPublishMessage) {
            if (TopicUtils.match(this.topic, mqttPublishMessage.topicName())) {
                Iterator<Consumer<MqttPublishMessage>> it = this.handlers.iterator();
                while (it.hasNext()) {
                    it.next().accept(mqttPublishMessage);
                }
            }
        }

        public String getTopic() {
            return this.topic;
        }

        public int getQos() {
            return this.qos;
        }
    }

    private MqttClient(io.vertx.mqtt.MqttClient mqttClient, Address address) {
        this.client = mqttClient;
        this.address = address;
        this.client.publishHandler(mqttPublishMessage -> {
            received(mqttPublishMessage.payload().length());
            if (this.handlers != null) {
                Iterator<Consumer<MqttPublishMessage>> it = this.handlers.iterator();
                while (it.hasNext()) {
                    try {
                        it.next().accept(mqttPublishMessage);
                    } catch (Throwable th) {
                        log.warn("handle mqtt message {} {} error:{}", mqttPublishMessage.topicName(), mqttPublishMessage.payload().toString(), ExceptionUtils.getErrorMessage(th));
                    }
                }
            }
        }).closeHandler(r3 -> {
            dispose();
        });
    }

    public static Mono<MqttClient> connect(InetSocketAddress inetSocketAddress, MqttOptions mqttOptions) {
        return connect(Global.vertx(), inetSocketAddress, mqttOptions);
    }

    public static Mono<MqttClient> connect(Vertx vertx, InetSocketAddress inetSocketAddress, MqttOptions mqttOptions) {
        Address takeAddress = AddressManager.global().takeAddress(mqttOptions.getLocalAddress());
        return Mono.create(monoSink -> {
            MqttOptions copy = mqttOptions.copy();
            copy.setClientId(mqttOptions.getClientId());
            copy.setUsername(mqttOptions.getUsername());
            copy.setPassword(mqttOptions.getPassword());
            copy.setLocalAddress(takeAddress.getAddress().getHostAddress());
            copy.setAutoKeepAlive(true);
            copy.setTcpKeepAlive(true);
            copy.setMaxMessageSize(1048576);
            copy.setReusePort(true);
            io.vertx.mqtt.MqttClient create = io.vertx.mqtt.MqttClient.create(vertx, copy);
            create.connect(inetSocketAddress.getPort(), inetSocketAddress.getHostString(), asyncResult -> {
                if (asyncResult.failed()) {
                    monoSink.error(asyncResult.cause());
                    return;
                }
                MqttConnAckMessage mqttConnAckMessage = (MqttConnAckMessage) asyncResult.result();
                if (mqttConnAckMessage != null) {
                    if (mqttConnAckMessage.code() == MqttConnectReturnCode.CONNECTION_ACCEPTED) {
                        MqttClient mqttClient = new MqttClient(create, takeAddress);
                        mqttClient.attribute("clientId", mqttOptions.getClientId());
                        mqttClient.attribute("username", mqttOptions.getUsername());
                        mqttClient.changeState(Connection.State.connected);
                        monoSink.success(mqttClient);
                    } else {
                        monoSink.error(new MqttConnectionException(mqttConnAckMessage.code()));
                    }
                }
                monoSink.success();
            });
        }).doOnError(th -> {
            takeAddress.release();
        });
    }

    @Override // org.jetlinks.simulator.core.Connection
    public String getId() {
        return this.client.clientId();
    }

    @Override // org.jetlinks.simulator.core.Connection
    public NetworkType getType() {
        return NetworkType.mqtt_client;
    }

    @Override // org.jetlinks.simulator.core.Connection
    public boolean isAlive() {
        return this.client.isConnected();
    }

    public synchronized Disposable handle(Consumer<MqttPublishMessage> consumer) {
        if (this.handlers == null) {
            this.handlers = new ArrayList();
        }
        this.handlers.add(consumer);
        return () -> {
            this.handlers.remove(consumer);
        };
    }

    public void unsubscribe(String str) {
        for (Subscriber subscriber : this.subscribers.values()) {
            if (subscriber.topic.equals(str)) {
                subscriber.dispose();
            }
        }
    }

    public Disposable subscribe(String str, int i, Consumer<MqttPublishMessage> consumer) {
        return this.subscribers.computeIfAbsent(Tuples.of(str, Integer.valueOf(i)), tuple2 -> {
            return new Subscriber((String) tuple2.getT1(), ((Integer) tuple2.getT2()).intValue());
        }).addHandler(consumer);
    }

    public void publish(String str, int i, Object obj) {
        publishAsync(str, i, obj).doOnError(th -> {
            log.warn("publish error {} {} {}", str, obj, ExceptionUtils.getErrorMessage(th));
        }).subscribe();
    }

    public Mono<Void> publishAsync(String str, int i, Object obj) {
        return publishAsync(str, i, NetworkUtils.castToByteBuf(obj));
    }

    public Mono<Void> publishAsync(String str, int i, ByteBuf byteBuf) {
        Buffer buffer = Buffer.buffer(byteBuf);
        int length = buffer.length();
        return Mono.create(monoSink -> {
            this.client.publish(str, buffer, MqttQoS.valueOf(i), false, false, asyncResult -> {
                ReferenceCountUtil.safeRelease(byteBuf);
                sent(length);
                if (asyncResult.failed()) {
                    monoSink.error(asyncResult.cause());
                } else {
                    monoSink.success();
                }
            });
        }).doOnError(this::error);
    }

    public List<Subscriber> getSubscriptions() {
        return new ArrayList(this.subscribers.values());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.jetlinks.simulator.core.network.AbstractConnection
    public void doDisposed() {
        this.address.release();
        super.doDisposed();
        if (this.client.isConnected()) {
            this.client.disconnect();
        }
    }

    @Override // org.jetlinks.simulator.core.Connection
    public void reset() {
        super.reset();
        getSubscriptions().forEach((v0) -> {
            v0.dispose();
        });
        this.subscribers.clear();
    }
}
