package org.jetlinks.community.network.manager.debug;

import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;
import org.jetlinks.community.gateway.external.SubscribeRequest;
import org.jetlinks.community.gateway.external.SubscriptionProvider;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkManager;
import org.jetlinks.community.network.manager.web.response.MqttMessageResponse;
import org.jetlinks.community.network.mqtt.server.MqttConnection;
import org.jetlinks.community.network.mqtt.server.MqttPublishing;
import org.jetlinks.community.network.mqtt.server.MqttServer;
import org.jetlinks.community.network.mqtt.server.MqttSubscription;
import org.jetlinks.community.network.mqtt.server.MqttUnSubscription;
import org.jetlinks.core.utils.TopicUtils;
import org.jetlinks.rule.engine.executor.PayloadType;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Component
/* loaded from: input_file:org/jetlinks/community/network/manager/debug/MqttServerDebugSubscriptionProvider.class */
public class MqttServerDebugSubscriptionProvider implements SubscriptionProvider {
    private static final Logger log = LoggerFactory.getLogger(MqttServerDebugSubscriptionProvider.class);
    private final NetworkManager networkManager;

    /* loaded from: input_file:org/jetlinks/community/network/manager/debug/MqttServerDebugSubscriptionProvider$MqttClientMessage.class */
    public static class MqttClientMessage {
        private String type;
        private String typeText;
        private Object data;

        public static MqttClientMessage of(MqttConnection mqttConnection) {
            HashMap hashMap = new HashMap();
            hashMap.put("clientId", mqttConnection.getClientId());
            hashMap.put("address", mqttConnection.getClientAddress().toString());
            mqttConnection.getAuth().ifPresent(mqttAuth -> {
                hashMap.put("username", mqttAuth.getUsername());
                hashMap.put("password", mqttAuth.getPassword());
            });
            return of("connection", "连接", hashMap);
        }

        public static MqttClientMessage ofDisconnect(MqttConnection mqttConnection) {
            HashMap hashMap = new HashMap();
            hashMap.put("clientId", mqttConnection.getClientId());
            hashMap.put("address", mqttConnection.getClientAddress().toString());
            mqttConnection.getAuth().ifPresent(mqttAuth -> {
                hashMap.put("username", mqttAuth.getUsername());
                hashMap.put("password", mqttAuth.getPassword());
            });
            return of("disconnection", "断开连接", hashMap);
        }

        public static MqttClientMessage of(MqttConnection mqttConnection, MqttSubscription mqttSubscription) {
            HashMap hashMap = new HashMap();
            hashMap.put("clientId", mqttConnection.getClientId());
            hashMap.put("address", mqttConnection.getClientAddress().toString());
            hashMap.put("topics", mqttSubscription.getMessage().topicSubscriptions().stream().map(mqttTopicSubscription -> {
                return "QoS:" + mqttTopicSubscription.qualityOfService().value() + " Topic:" + mqttTopicSubscription.topicName();
            }));
            return of("subscription", "订阅", hashMap);
        }

        public static MqttClientMessage of(MqttConnection mqttConnection, MqttUnSubscription mqttUnSubscription) {
            HashMap hashMap = new HashMap();
            hashMap.put("clientId", mqttConnection.getClientId());
            hashMap.put("address", mqttConnection.getClientAddress().toString());
            hashMap.put("topics", mqttUnSubscription.getMessage().topics());
            return of("unsubscription", "取消订阅", hashMap);
        }

        public static MqttClientMessage of(MqttConnection mqttConnection, MqttPublishing mqttPublishing, PayloadType payloadType) {
            HashMap hashMap = new HashMap();
            hashMap.put("clientId", mqttConnection.getClientId());
            hashMap.put("address", mqttConnection.getClientAddress().toString());
            hashMap.put("message", MqttMessageResponse.of(mqttPublishing.getMessage(), payloadType));
            return of("publish", "推送消息", hashMap);
        }

        private MqttClientMessage(String str, String str2, Object obj) {
            this.type = str;
            this.typeText = str2;
            this.data = obj;
        }

        public static MqttClientMessage of(String str, String str2, Object obj) {
            return new MqttClientMessage(str, str2, obj);
        }

        public String getType() {
            return this.type;
        }

        public String getTypeText() {
            return this.typeText;
        }

        public Object getData() {
            return this.data;
        }

        public void setType(String str) {
            this.type = str;
        }

        public void setTypeText(String str) {
            this.typeText = str;
        }

        public void setData(Object obj) {
            this.data = obj;
        }
    }

    public MqttServerDebugSubscriptionProvider(NetworkManager networkManager) {
        this.networkManager = networkManager;
    }

    public String id() {
        return "network-server-mqtt-debug";
    }

    public String name() {
        return "MQTT服务调试";
    }

    public String[] getTopicPattern() {
        return new String[]{"/network/mqtt/server/*/_subscribe/*"};
    }

    public Flux<MqttClientMessage> subscribe(SubscribeRequest subscribeRequest) {
        DebugAuthenticationHandler.handle(subscribeRequest);
        Map pathVariables = TopicUtils.getPathVariables("/network/mqtt/server/{id}/_subscribe/{type}", subscribeRequest.getTopic());
        String str = (String) pathVariables.get("id");
        PayloadType valueOf = PayloadType.valueOf(((String) pathVariables.get("type")).toUpperCase());
        return Flux.create(fluxSink -> {
            Mono flatMap = this.networkManager.getNetwork(DefaultNetworkType.MQTT_SERVER, str).flatMap(mqttServer -> {
                Flux flatMap2 = mqttServer.handleConnection().doOnNext(mqttConnection -> {
                    fluxSink.next(MqttClientMessage.of(mqttConnection.accept()));
                    mqttConnection.onClose(mqttConnection -> {
                        fluxSink.next(MqttClientMessage.ofDisconnect(mqttConnection));
                    });
                }).flatMap(mqttConnection2 -> {
                    return Flux.merge(new Publisher[]{mqttConnection2.handleSubscribe(true).map(mqttSubscription -> {
                        return MqttClientMessage.of(mqttConnection2, mqttSubscription);
                    }), mqttConnection2.handleUnSubscribe(true).map(mqttUnSubscription -> {
                        return MqttClientMessage.of(mqttConnection2, mqttUnSubscription);
                    }), mqttConnection2.handleMessage().map(mqttPublishing -> {
                        return MqttClientMessage.of(mqttConnection2, mqttPublishing, valueOf);
                    })});
                });
                fluxSink.getClass();
                return flatMap2.doOnNext((v1) -> {
                    r1.next(v1);
                }).then();
            });
            fluxSink.getClass();
            fluxSink.onDispose(flatMap.doOnError(fluxSink::error).doOnSubscribe(subscription -> {
                log.debug("start mqtt server[{}] debug", str);
            }).doOnCancel(() -> {
                log.debug("stop mqtt server[{}] debug", str);
            }).subscribe());
        });
    }
}
