package org.jetlinks.community.network.manager.debug;

import io.netty.buffer.Unpooled;
import java.util.HashMap;
import org.jetlinks.community.gateway.external.SubscribeRequest;
import org.jetlinks.community.gateway.external.SubscriptionProvider;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkManager;
import org.jetlinks.community.network.tcp.TcpMessage;
import org.jetlinks.community.network.tcp.client.TcpClient;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Subscription provider that streams debug events of a TCP server network component.
 *
 * <p>For the server addressed by the subscribed topic, every accepted client connection,
 * every client disconnect and every message received from a client is pushed to the
 * subscriber as a {@link TcpClientMessage}. If the subscribe request carries a non-blank
 * {@code response} parameter, its bytes are sent back to the client after each received message.
 */
@Component
public class TcpServerDebugSubscriptionProvider implements SubscriptionProvider {
    private final NetworkManager networkManager;

    /**
     * Event pushed to debug subscribers: a machine-readable type, a display text and a payload.
     */
    public static class TcpClientMessage {
        private String type;
        private String typeText;
        private Object data;

        /**
         * Creates the event emitted when a client connects.
         *
         * @param tcpClient the connected client
         * @return a {@code "connection"} event whose data holds the client's remote address object
         */
        public static TcpClientMessage of(TcpClient tcpClient) {
            return of("connection", "连接", addressOnly(tcpClient));
        }

        /**
         * Creates the event emitted when a client disconnects.
         *
         * @param tcpClient the disconnected client
         * @return a {@code "disconnection"} event whose data holds the client's remote address object
         */
        public static TcpClientMessage ofDisconnect(TcpClient tcpClient) {
            return of("disconnection", "断开连接", addressOnly(tcpClient));
        }

        /**
         * Creates the event emitted when a client sends a message.
         *
         * <p>Unlike the connection events, the address is stored here as its string form.
         *
         * @param tcpClient  the sending client
         * @param tcpMessage the received message
         * @return a {@code "publish"} event with the address and message rendered via {@code toString()}
         */
        public static TcpClientMessage of(TcpClient tcpClient, TcpMessage tcpMessage) {
            HashMap<String, Object> data = new HashMap<>();
            data.put("address", tcpClient.getRemoteAddress().toString());
            data.put("message", tcpMessage.toString());
            // NOTE(review): the display text reads "subscribe" while the type is "publish" - confirm intended.
            return of("publish", "订阅", data);
        }

        private TcpClientMessage(String str, String str2, Object obj) {
            this.type = str;
            this.typeText = str2;
            this.data = obj;
        }

        /**
         * Generic factory.
         *
         * @param str  event type
         * @param str2 display text of the event type
         * @param obj  event payload
         * @return a new message carrying the given values
         */
        public static TcpClientMessage of(String str, String str2, Object obj) {
            return new TcpClientMessage(str, str2, obj);
        }

        /** Builds the payload shared by the connect and disconnect events. */
        private static HashMap<String, Object> addressOnly(TcpClient tcpClient) {
            HashMap<String, Object> data = new HashMap<>();
            data.put("address", tcpClient.getRemoteAddress());
            return data;
        }

        public String getType() {
            return this.type;
        }

        public String getTypeText() {
            return this.typeText;
        }

        public Object getData() {
            return this.data;
        }

        public void setType(String str) {
            this.type = str;
        }

        public void setTypeText(String str) {
            this.typeText = str;
        }

        public void setData(Object obj) {
            this.data = obj;
        }
    }

    /**
     * @param networkManager manager used to look up the TCP server network component
     */
    public TcpServerDebugSubscriptionProvider(NetworkManager networkManager) {
        this.networkManager = networkManager;
    }

    public String id() {
        return "network-tcp-server-debug";
    }

    public String name() {
        return "TCP服务调试";
    }

    public String[] getTopicPattern() {
        return new String[]{"/network/tcp/server/*/_subscribe"};
    }

    /**
     * Subscribes to the debug events of the server named in the request topic.
     *
     * @param subscribeRequest request whose topic matches {@code /network/tcp/server/{id}/_subscribe}
     * @return stream of connection, disconnection and message events
     */
    public Flux<TcpClientMessage> subscribe(SubscribeRequest subscribeRequest) {
        // The topic starts with '/', so segment 0 is empty and the server id is segment 4.
        return subscribe(subscribeRequest.getTopic().split("[/]")[4], subscribeRequest);
    }

    /**
     * Subscribes to the debug events of the given TCP server.
     *
     * @param str              id of the TCP server network component
     * @param subscribeRequest request; its optional {@code response} parameter is the reply
     *                         sent to a client after each message received from it
     * @return stream of connection, disconnection and message events; errors of the
     *         underlying server pipeline are forwarded to the subscriber
     */
    public Flux<TcpClientMessage> subscribe(String str, SubscribeRequest subscribeRequest) {
        byte[] responsePayload = DebugUtils.stringToBytes((String) subscribeRequest.getString("response").filter(StringUtils::hasText).orElse(null));
        // Decide once whether a reply is configured; the null check guards against the helper returning null.
        boolean hasResponse = responsePayload != null && responsePayload.length > 0;
        return Flux.create(fluxSink ->
            // Tie the server pipeline to the sink so it is disposed together with the subscriber.
            fluxSink.onDispose(this.networkManager
                .getNetwork(DefaultNetworkType.TCP_SERVER, str)
                // NOTE(review): tcpServer relies on the inferred network type exposing handleConnection() - confirm.
                .flatMap(tcpServer -> tcpServer
                    .handleConnection()
                    .doOnNext(tcpClient -> fluxSink.next(TcpClientMessage.of(tcpClient)))
                    .flatMap(tcpClient -> {
                        tcpClient.onDisconnect(() -> fluxSink.next(TcpClientMessage.ofDisconnect(tcpClient)));
                        return tcpClient
                            .subscribe()
                            .map(tcpMessage -> TcpClientMessage.of(tcpClient, tcpMessage))
                            .doOnNext(fluxSink::next)
                            .flatMap(tcpClientMessage -> hasResponse
                                ? tcpClient.send(new TcpMessage(Unpooled.wrappedBuffer(responsePayload)))
                                : Mono.empty())
                            .then();
                    })
                    .then())
                .doOnError(fluxSink::error)
                .subscriberContext(fluxSink.currentContext())
                .subscribe()));
    }
}
