/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.community.network.manager.debug;

import java.util.HashMap;
import java.util.Map;
import org.jetlinks.community.gateway.external.SubscribeRequest;
import org.jetlinks.community.gateway.external.SubscriptionProvider;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkManager;
import org.jetlinks.community.network.NetworkType;
import org.jetlinks.community.network.manager.debug.DebugAuthenticationHandler;
import org.jetlinks.community.network.manager.web.response.MqttMessageResponse;
import org.jetlinks.community.network.mqtt.server.MqttConnection;
import org.jetlinks.community.network.mqtt.server.MqttPublishing;
import org.jetlinks.community.network.mqtt.server.MqttSubscription;
import org.jetlinks.community.network.mqtt.server.MqttUnSubscription;
import org.jetlinks.core.utils.TopicUtils;
import org.jetlinks.rule.engine.executor.PayloadType;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

@Component
public class MqttServerDebugSubscriptionProvider
implements SubscriptionProvider {
    private static final Logger log = LoggerFactory.getLogger(MqttServerDebugSubscriptionProvider.class);
    private final NetworkManager networkManager;

    public MqttServerDebugSubscriptionProvider(NetworkManager networkManager) {
        this.networkManager = networkManager;
    }

    public String id() {
        return "network-server-mqtt-debug";
    }

    public String name() {
        return "MQTT\u670d\u52a1\u8c03\u8bd5";
    }

    public String[] getTopicPattern() {
        return new String[]{"/network/mqtt/server/*/_subscribe/*"};
    }

    public Flux<MqttClientMessage> subscribe(SubscribeRequest request) {
        DebugAuthenticationHandler.handle(request);
        Map vars = TopicUtils.getPathVariables((String)"/network/mqtt/server/{id}/_subscribe/{type}", (String)request.getTopic());
        String clientId = (String)vars.get("id");
        PayloadType type = PayloadType.valueOf((String)((String)vars.get("type")).toUpperCase());
        return Flux.create(sink -> sink.onDispose(this.networkManager.getNetwork((NetworkType)DefaultNetworkType.MQTT_SERVER, clientId).flatMap(mqtt -> mqtt.handleConnection().doOnNext(conn -> {
            sink.next((Object)MqttClientMessage.of(conn.accept()));
            conn.onClose(disconnect -> sink.next((Object)MqttClientMessage.ofDisconnect(disconnect)));
        }).flatMap(conn -> Flux.merge((Publisher[])new Publisher[]{conn.handleSubscribe(true).map(sub -> MqttClientMessage.of(conn, sub)), conn.handleUnSubscribe(true).map(sub -> MqttClientMessage.of(conn, sub)), conn.handleMessage().map(sub -> MqttClientMessage.of(conn, sub, type))})).doOnNext(arg_0 -> ((FluxSink)sink).next(arg_0)).then()).doOnError(arg_0 -> ((FluxSink)sink).error(arg_0)).doOnSubscribe(sub -> log.debug("start mqtt server[{}] debug", (Object)clientId)).doOnCancel(() -> log.debug("stop mqtt server[{}] debug", (Object)clientId)).subscribe()));
    }

    public static class MqttClientMessage {
        private String type;
        private String typeText;
        private Object data;

        public static MqttClientMessage of(MqttConnection connection) {
            HashMap<String, String> data = new HashMap<String, String>();
            data.put("clientId", connection.getClientId());
            data.put("address", connection.getClientAddress().toString());
            connection.getAuth().ifPresent(auth -> {
                data.put("username", auth.getUsername());
                data.put("password", auth.getPassword());
            });
            return MqttClientMessage.of("connection", "\u8fde\u63a5", data);
        }

        public static MqttClientMessage ofDisconnect(MqttConnection connection) {
            HashMap<String, String> data = new HashMap<String, String>();
            data.put("clientId", connection.getClientId());
            data.put("address", connection.getClientAddress().toString());
            connection.getAuth().ifPresent(auth -> {
                data.put("username", auth.getUsername());
                data.put("password", auth.getPassword());
            });
            return MqttClientMessage.of("disconnection", "\u65ad\u5f00\u8fde\u63a5", data);
        }

        public static MqttClientMessage of(MqttConnection connection, MqttSubscription subscription) {
            HashMap<String, Object> data = new HashMap<String, Object>();
            data.put("clientId", connection.getClientId());
            data.put("address", connection.getClientAddress().toString());
            data.put("topics", subscription.getMessage().topicSubscriptions().stream().map(subs -> "QoS:" + subs.qualityOfService().value() + " Topic:" + subs.topicName()));
            return MqttClientMessage.of("subscription", "\u8ba2\u9605", data);
        }

        public static MqttClientMessage of(MqttConnection connection, MqttUnSubscription subscription) {
            HashMap<String, Object> data = new HashMap<String, Object>();
            data.put("clientId", connection.getClientId());
            data.put("address", connection.getClientAddress().toString());
            data.put("topics", subscription.getMessage().topics());
            return MqttClientMessage.of("unsubscription", "\u53d6\u6d88\u8ba2\u9605", data);
        }

        public static MqttClientMessage of(MqttConnection connection, MqttPublishing subscription, PayloadType type) {
            HashMap<String, Object> data = new HashMap<String, Object>();
            data.put("clientId", connection.getClientId());
            data.put("address", connection.getClientAddress().toString());
            data.put("message", MqttMessageResponse.of(subscription.getMessage(), type));
            return MqttClientMessage.of("publish", "\u63a8\u9001\u6d88\u606f", data);
        }

        private MqttClientMessage(String type, String typeText, Object data) {
            this.type = type;
            this.typeText = typeText;
            this.data = data;
        }

        public static MqttClientMessage of(String type, String typeText, Object data) {
            return new MqttClientMessage(type, typeText, data);
        }

        public String getType() {
            return this.type;
        }

        public String getTypeText() {
            return this.typeText;
        }

        public Object getData() {
            return this.data;
        }

        public void setType(String type) {
            this.type = type;
        }

        public void setTypeText(String typeText) {
            this.typeText = typeText;
        }

        public void setData(Object data) {
            this.data = data;
        }
    }
}

