package org.jetlinks.community.network.mqtt.client;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.ReferenceCountUtil;
import io.vertx.core.buffer.Buffer;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkType;
import org.jetlinks.core.message.codec.MqttMessage;
import org.jetlinks.core.message.codec.SimpleMqttMessage;
import org.jetlinks.core.topic.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;

/* loaded from: input_file:org/jetlinks/community/network/mqtt/client/VertxMqttClient.class */
public class VertxMqttClient implements MqttClient {
    private static final Logger log = LoggerFactory.getLogger(VertxMqttClient.class);
    private io.vertx.mqtt.MqttClient client;
    private final String id;
    private volatile boolean loading;
    private String topicPrefix;
    private final Topic<Tuple3<String, FluxSink<MqttMessage>, Integer>> subscriber = Topic.createRoot();
    private final List<Runnable> loadSuccessListener = new CopyOnWriteArrayList();

    public void setLoading(boolean z) {
        this.loading = z;
        if (z) {
            return;
        }
        this.loadSuccessListener.forEach((v0) -> {
            v0.run();
        });
        this.loadSuccessListener.clear();
    }

    public boolean isLoading() {
        return this.loading;
    }

    public VertxMqttClient(String str) {
        this.id = str;
    }

    public void setClient(io.vertx.mqtt.MqttClient mqttClient) {
        if (this.client != null && this.client != mqttClient) {
            try {
                this.client.disconnect();
            } catch (Exception e) {
            }
        }
        this.client = mqttClient;
        mqttClient.closeHandler(r5 -> {
            log.debug("mqtt client [{}] closed", this.id);
        }).publishHandler(mqttPublishMessage -> {
            try {
                SimpleMqttMessage build = SimpleMqttMessage.builder().messageId(mqttPublishMessage.messageId()).topic(mqttPublishMessage.topicName()).payload(mqttPublishMessage.payload().getByteBuf()).dup(mqttPublishMessage.isDup()).retain(mqttPublishMessage.isRetain()).qosLevel(mqttPublishMessage.qosLevel().value()).properties(mqttPublishMessage.properties()).build();
                log.debug("handle mqtt message \n{}", build);
                this.subscriber.findTopic(mqttPublishMessage.topicName().replace("#", "**").replace("+", "*")).flatMapIterable((v0) -> {
                    return v0.getSubscribers();
                }).subscribe(tuple3 -> {
                    try {
                        ((FluxSink) tuple3.getT2()).next(build);
                    } catch (Exception e2) {
                        log.error("handle mqtt message error", e2);
                    }
                });
            } catch (Throwable th) {
                log.error("handle mqtt message error", th);
            }
        });
        if (this.loading) {
            this.loadSuccessListener.add(this::reSubscribe);
        } else if (isAlive()) {
            reSubscribe();
        }
    }

    private void reSubscribe() {
        this.subscriber.getAllSubscriber().filter(topic -> {
            return topic.getSubscribers().size() > 0;
        }).collectMap(topic2 -> {
            return getCompleteTopic(convertMqttTopic((String) ((Tuple3) topic2.getSubscribers().iterator().next()).getT1()));
        }, topic3 -> {
            return (Integer) ((Tuple3) topic3.getSubscribers().iterator().next()).getT3();
        }).filter(MapUtils::isNotEmpty).subscribe(map -> {
            log.debug("subscribe mqtt topic {}", map);
            this.client.subscribe(map);
        });
    }

    private String convertMqttTopic(String str) {
        return str.replace("**", "#").replace("*", "+");
    }

    protected String parseTopic(String str) {
        if (str.startsWith("$share")) {
            str = (String) Stream.of((Object[]) str.split("/")).skip(2L).collect(Collectors.joining("/", "/", ""));
        } else if (str.startsWith("$queue")) {
            str = str.substring(6);
        }
        return str.startsWith("//") ? str.substring(1) : str;
    }

    protected String getCompleteTopic(String str) {
        return StringUtils.isEmpty(this.topicPrefix) ? str : this.topicPrefix.concat(str);
    }

    @Override // org.jetlinks.community.network.mqtt.client.MqttClient
    public Flux<MqttMessage> subscribe(List<String> list, int i) {
        return Flux.create(fluxSink -> {
            Disposable.Composite composite = Disposables.composite();
            Iterator it = list.iterator();
            while (it.hasNext()) {
                String str = (String) it.next();
                String parseTopic = parseTopic(str);
                String completeTopic = getCompleteTopic(str);
                Topic append = this.subscriber.append(parseTopic.replace("#", "**").replace("+", "*"));
                Tuple3 of = Tuples.of(str, fluxSink, Integer.valueOf(i));
                boolean z = append.getSubscribers().size() == 0;
                append.subscribe(new Tuple3[]{of});
                composite.add(() -> {
                    if (append.unsubscribe(new Tuple3[]{of}).size() <= 0 || !isAlive()) {
                        return;
                    }
                    this.client.unsubscribe(convertMqttTopic(completeTopic), asyncResult -> {
                        if (asyncResult.succeeded()) {
                            log.debug("unsubscribe mqtt topic {}", completeTopic);
                        } else {
                            log.debug("unsubscribe mqtt topic {} error", completeTopic, asyncResult.cause());
                        }
                    });
                });
                if (isAlive() && z) {
                    log.debug("subscribe mqtt topic {}", completeTopic);
                    this.client.subscribe(convertMqttTopic(completeTopic), i, asyncResult -> {
                        if (asyncResult.succeeded()) {
                            return;
                        }
                        fluxSink.error(asyncResult.cause());
                    });
                }
            }
            fluxSink.onDispose(composite);
        });
    }

    private Mono<Void> doPublish(MqttMessage mqttMessage) {
        return Mono.create(monoSink -> {
            ByteBuf payload = mqttMessage.getPayload();
            this.client.publish(mqttMessage.getTopic(), Buffer.buffer(payload), MqttQoS.valueOf(mqttMessage.getQosLevel()), mqttMessage.isDup(), mqttMessage.isRetain(), asyncResult -> {
                try {
                    if (asyncResult.succeeded()) {
                        log.info("publish mqtt [{}] message success: {}", this.client.clientId(), mqttMessage);
                        monoSink.success();
                    } else {
                        log.info("publish mqtt [{}] message error : {}", new Object[]{this.client.clientId(), mqttMessage, asyncResult.cause()});
                        monoSink.error(asyncResult.cause());
                    }
                } finally {
                    ReferenceCountUtil.safeRelease(payload);
                }
            });
        });
    }

    @Override // org.jetlinks.community.network.mqtt.client.MqttClient
    public Mono<Void> publish(MqttMessage mqttMessage) {
        return this.loading ? Mono.create(monoSink -> {
            this.loadSuccessListener.add(() -> {
                Mono<Void> doPublish = doPublish(mqttMessage);
                monoSink.getClass();
                Mono doOnSuccess = doPublish.doOnSuccess((v1) -> {
                    r1.success(v1);
                });
                monoSink.getClass();
                doOnSuccess.doOnError(monoSink::error).subscribe();
            });
        }) : doPublish(mqttMessage);
    }

    public String getId() {
        return this.id;
    }

    public NetworkType getType() {
        return DefaultNetworkType.MQTT_CLIENT;
    }

    public void shutdown() {
        this.loading = false;
        if (isAlive()) {
            try {
                this.client.disconnect();
            } catch (Exception e) {
            }
            this.client = null;
        }
    }

    public boolean isAlive() {
        return this.client != null && this.client.isConnected();
    }

    public boolean isAutoReload() {
        return true;
    }

    public io.vertx.mqtt.MqttClient getClient() {
        return this.client;
    }

    public void setTopicPrefix(String str) {
        this.topicPrefix = str;
    }
}
