/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.community.network.mqtt.client;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.ReferenceCountUtil;
import io.vertx.core.buffer.Buffer;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkType;
import org.jetlinks.community.network.mqtt.client.MqttClient;
import org.jetlinks.core.message.codec.MqttMessage;
import org.jetlinks.core.message.codec.SimpleMqttMessage;
import org.jetlinks.core.topic.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;

public class VertxMqttClient
implements MqttClient {
    private static final Logger log = LoggerFactory.getLogger(VertxMqttClient.class);
    private io.vertx.mqtt.MqttClient client;
    private final Topic<Tuple3<String, FluxSink<MqttMessage>, Integer>> subscriber = Topic.createRoot();
    private final String id;
    private volatile boolean loading;
    private final List<Runnable> loadSuccessListener = new CopyOnWriteArrayList<Runnable>();
    private String topicPrefix;

    public void setLoading(boolean loading) {
        this.loading = loading;
        if (!loading) {
            this.loadSuccessListener.forEach(Runnable::run);
            this.loadSuccessListener.clear();
        }
    }

    public boolean isLoading() {
        return this.loading;
    }

    public VertxMqttClient(String id) {
        this.id = id;
    }

    public void setClient(io.vertx.mqtt.MqttClient client) {
        if (this.client != null && this.client != client) {
            try {
                this.client.disconnect();
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        this.client = client;
        client.closeHandler(nil -> log.debug("mqtt client [{}] closed", (Object)this.id)).publishHandler(msg -> {
            try {
                SimpleMqttMessage mqttMessage = SimpleMqttMessage.builder().messageId(msg.messageId()).topic(msg.topicName()).payload(msg.payload().getByteBuf()).dup(msg.isDup()).retain(msg.isRetain()).qosLevel(msg.qosLevel().value()).properties(msg.properties()).build();
                log.debug("handle mqtt message \n{}", (Object)mqttMessage);
                this.subscriber.findTopic(msg.topicName().replace("#", "**").replace("+", "*")).flatMapIterable(Topic::getSubscribers).subscribe(arg_0 -> VertxMqttClient.lambda$null$4((MqttMessage)mqttMessage, arg_0));
            }
            catch (Throwable e) {
                log.error("handle mqtt message error", e);
            }
        });
        if (this.loading) {
            this.loadSuccessListener.add(this::reSubscribe);
        } else if (this.isAlive()) {
            this.reSubscribe();
        }
    }

    private void reSubscribe() {
        this.subscriber.getAllSubscriber().filter(topic -> topic.getSubscribers().size() > 0).collectMap(topic -> this.getCompleteTopic(this.convertMqttTopic((String)((Tuple3)topic.getSubscribers().iterator().next()).getT1())), topic -> (Integer)((Tuple3)topic.getSubscribers().iterator().next()).getT3()).filter(MapUtils::isNotEmpty).subscribe(topics -> {
            log.debug("subscribe mqtt topic {}", topics);
            this.client.subscribe(topics);
        });
    }

    private String convertMqttTopic(String topic) {
        return topic.replace("**", "#").replace("*", "+");
    }

    protected String parseTopic(String topic) {
        if (topic.startsWith("$share")) {
            topic = Stream.of(topic.split("/")).skip(2L).collect(Collectors.joining("/", "/", ""));
        } else if (topic.startsWith("$queue")) {
            topic = topic.substring(6);
        }
        if (topic.startsWith("//")) {
            return topic.substring(1);
        }
        return topic;
    }

    protected String getCompleteTopic(String topic) {
        if (StringUtils.isEmpty((CharSequence)this.topicPrefix)) {
            return topic;
        }
        return this.topicPrefix.concat(topic);
    }

    @Override
    public Flux<MqttMessage> subscribe(List<String> topics, int qos) {
        return Flux.create(sink -> {
            Disposable.Composite composite = Disposables.composite();
            for (String topic : topics) {
                String realTopic = this.parseTopic(topic);
                String completeTopic = this.getCompleteTopic(topic);
                Topic sinkTopic = this.subscriber.append(realTopic.replace("#", "**").replace("+", "*"));
                Tuple3 topicQos = Tuples.of((Object)topic, (Object)sink, (Object)qos);
                boolean first = sinkTopic.getSubscribers().size() == 0;
                sinkTopic.subscribe((Object[])new Tuple3[]{topicQos});
                composite.add(() -> {
                    if (sinkTopic.unsubscribe((Object[])new Tuple3[]{topicQos}).size() > 0 && this.isAlive()) {
                        this.client.unsubscribe(this.convertMqttTopic(completeTopic), result -> {
                            if (result.succeeded()) {
                                log.debug("unsubscribe mqtt topic {}", (Object)completeTopic);
                            } else {
                                log.debug("unsubscribe mqtt topic {} error", (Object)completeTopic, (Object)result.cause());
                            }
                        });
                    }
                });
                if (!this.isAlive() || !first) continue;
                log.debug("subscribe mqtt topic {}", (Object)completeTopic);
                this.client.subscribe(this.convertMqttTopic(completeTopic), qos, result -> {
                    if (!result.succeeded()) {
                        sink.error(result.cause());
                    }
                });
            }
            sink.onDispose((Disposable)composite);
        });
    }

    private Mono<Void> doPublish(MqttMessage message) {
        return Mono.create(sink -> {
            ByteBuf payload = message.getPayload();
            Buffer buffer = Buffer.buffer((ByteBuf)payload);
            this.client.publish(message.getTopic(), buffer, MqttQoS.valueOf((int)message.getQosLevel()), message.isDup(), message.isRetain(), result -> {
                try {
                    if (result.succeeded()) {
                        log.info("publish mqtt [{}] message success: {}", (Object)this.client.clientId(), (Object)message);
                        sink.success();
                    } else {
                        log.info("publish mqtt [{}] message error : {}", new Object[]{this.client.clientId(), message, result.cause()});
                        sink.error(result.cause());
                    }
                }
                finally {
                    ReferenceCountUtil.safeRelease((Object)payload);
                }
            });
        });
    }

    @Override
    public Mono<Void> publish(MqttMessage message) {
        if (this.loading) {
            return Mono.create(sink -> this.loadSuccessListener.add(() -> this.doPublish(message).doOnSuccess(arg_0 -> ((MonoSink)sink).success(arg_0)).doOnError(arg_0 -> ((MonoSink)sink).error(arg_0)).subscribe()));
        }
        return this.doPublish(message);
    }

    public String getId() {
        return this.id;
    }

    public NetworkType getType() {
        return DefaultNetworkType.MQTT_CLIENT;
    }

    public void shutdown() {
        this.loading = false;
        if (this.isAlive()) {
            try {
                this.client.disconnect();
            }
            catch (Exception exception) {
                // empty catch block
            }
            this.client = null;
        }
    }

    public boolean isAlive() {
        return this.client != null && this.client.isConnected();
    }

    public boolean isAutoReload() {
        return true;
    }

    public io.vertx.mqtt.MqttClient getClient() {
        return this.client;
    }

    public void setTopicPrefix(String topicPrefix) {
        this.topicPrefix = topicPrefix;
    }

    private static /* synthetic */ void lambda$null$4(MqttMessage mqttMessage, Tuple3 sink) {
        try {
            ((FluxSink)sink.getT2()).next((Object)mqttMessage);
        }
        catch (Exception e) {
            log.error("handle mqtt message error", (Throwable)e);
        }
    }
}

