package org.jetlinks.community.network.tcp.gateway.device;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import org.hswebframework.web.logger.ReactiveLogger;
import org.jetlinks.community.gateway.AbstractDeviceGateway;
import org.jetlinks.community.gateway.DeviceGateway;
import org.jetlinks.community.gateway.DeviceGatewayHelper;
import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkType;
import org.jetlinks.community.network.tcp.TcpMessage;
import org.jetlinks.community.network.tcp.client.TcpClient;
import org.jetlinks.community.network.tcp.server.TcpServer;
import org.jetlinks.community.utils.TimeUtils;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceProductOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.device.session.DeviceSessionManager;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.FromDeviceMessageContext;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.server.DeviceGatewayContext;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.trace.DeviceTracer;
import org.jetlinks.core.trace.MonoTracer;
import org.jetlinks.supports.server.DecodedClientMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/jetlinks/community/network/tcp/gateway/device/TcpServerDeviceGateway.class */
/**
 * Device gateway that accepts TCP client connections from a {@link TcpServer},
 * decodes inbound {@link TcpMessage}s with the configured {@link ProtocolSupport}
 * and forwards the resulting {@link DeviceMessage}s through a {@link DeviceGatewayHelper}.
 *
 * <p>Each accepted client is wrapped in a {@link TcpConnection}. A connection that has not
 * been bound to a device session within {@code connectCheckTimeout} is closed.
 * The timeout is read from the system property
 * {@code gateway.tcp.network.connect-check-timeout} (default {@code 10s}).
 */
public class TcpServerDeviceGateway extends AbstractDeviceGateway implements DeviceGateway, MonitorSupportDeviceGateway {
    private static final Logger log = LoggerFactory.getLogger(TcpServerDeviceGateway.class);
    final TcpServer tcpServer;
    Mono<ProtocolSupport> protocol;
    private final DeviceRegistry registry;
    private final DeviceSessionManager sessionManager;
    /** Number of currently open connections; incremented on accept, decremented on disconnect. */
    private final LongAdder counter;
    /** {@code true} while the gateway is started; inbound messages are dropped otherwise. */
    private final AtomicBoolean started;
    /** Subscription to the server's connection stream, or {@code null} when not running. */
    private Disposable disposable;
    private final DeviceGatewayHelper helper;
    /** How long a new connection may stay unidentified before it is closed. */
    private final Duration connectCheckTimeout;

    /**
     * State and message handling for a single accepted TCP client.
     */
    class TcpConnection implements DeviceGatewayContext {
        final TcpClient client;
        /** Current session of this connection; starts as an {@link UnknownTcpDeviceSession}. */
        final AtomicReference<DeviceSession> sessionRef = new AtomicReference<>();
        final InetSocketAddress address;
        /** Pending legality check, cancelled once the first device message is handled. */
        Disposable legalityChecker;

        /**
         * Registers the connection with the monitor, installs the disconnect hook and
         * schedules the legality check.
         *
         * @param tcpClient the accepted client
         */
        TcpConnection(TcpClient tcpClient) {
            this.client = tcpClient;
            this.address = tcpClient.getRemoteAddress();
            // Count this connection before reporting; the disconnect hook below decrements it.
            TcpServerDeviceGateway.this.counter.increment();
            TcpServerDeviceGateway.this.monitor.totalConnection(TcpServerDeviceGateway.this.counter.sum());
            tcpClient.onDisconnect(() -> {
                TcpServerDeviceGateway.this.counter.decrement();
                TcpServerDeviceGateway.this.monitor.disconnected();
                TcpServerDeviceGateway.this.monitor.totalConnection(TcpServerDeviceGateway.this.counter.sum());
                // The hook is registered before sessionRef is populated, so guard against null.
                DeviceSession deviceSession = this.sessionRef.get();
                if (deviceSession != null && deviceSession.getDeviceId() != null) {
                    String deviceId = deviceSession.getDeviceId();
                    // NOTE(review): presumably looking the session up lets the session manager
                    // notice the dead connection - confirm against DeviceSessionManager.
                    TcpServerDeviceGateway.this.sessionManager
                        .getSession(deviceId)
                        .subscribe(
                            ignored -> {
                            },
                            err -> log.warn("Check session of device[{}] after TCP[{}] disconnect failed", deviceId, this.address, err));
                }
            });
            TcpServerDeviceGateway.this.monitor.connected();
            this.sessionRef.set(new UnknownTcpDeviceSession(tcpClient.getId(), tcpClient, TcpServerDeviceGateway.this.getTransport(), TcpServerDeviceGateway.this.monitor));
            this.legalityChecker = Schedulers.parallel().schedule(this::checkLegality, TcpServerDeviceGateway.this.connectCheckTimeout.toMillis(), TimeUnit.MILLISECONDS);
        }

        /**
         * Closes the connection if it is still bound to an {@link UnknownTcpDeviceSession},
         * i.e. no device session has replaced the initial one.
         */
        public void checkLegality() {
            if (this.sessionRef.get() instanceof UnknownTcpDeviceSession) {
                log.info("tcp [{}] connection is illegal, close it.", this.address);
                try {
                    this.client.disconnect();
                } catch (Throwable err) {
                    // Best effort: the connection is being dropped anyway, so only record the failure.
                    log.debug("close illegal tcp [{}] connection failed", this.address, err);
                }
            }
        }

        /**
         * Notifies the protocol of the new client, then consumes the client's inbound
         * messages while the gateway is started.
         *
         * @return a Mono completing when the inbound stream ends; a stream error is logged
         *         and the client is shut down, and cancellation also shuts the client down
         */
        Mono<Void> accept() {
            return TcpServerDeviceGateway.this
                .getProtocol()
                .flatMap(protocolSupport -> protocolSupport.onClientConnect(TcpServerDeviceGateway.this.getTransport(), this.client, this))
                .then(this.client
                          .subscribe()
                          .filter(tcpMessage -> TcpServerDeviceGateway.this.started.get())
                          .flatMap(this::handleTcpMessage)
                          .onErrorResume(err -> {
                              log.error(err.getMessage(), err);
                              this.client.shutdown();
                              return Mono.empty();
                          })
                          .then())
                .doOnCancel(this.client::shutdown);
        }

        /**
         * Decodes one raw TCP message with the protocol's codec and handles every
         * resulting device message inside a trace span.
         *
         * @param tcpMessage the raw inbound message
         * @return a Mono completing when all decoded messages are handled; on failure the
         *         error is logged and the client is reset instead of propagating the error
         */
        Mono<Void> handleTcpMessage(TcpMessage tcpMessage) {
            // Captured up front so the span start reflects when the message arrived here.
            long startNanos = System.nanoTime();
            return TcpServerDeviceGateway.this
                .getProtocol()
                .flatMap(protocolSupport -> protocolSupport.getMessageCodec(TcpServerDeviceGateway.this.getTransport()))
                .flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(this.sessionRef.get(), tcpMessage, TcpServerDeviceGateway.this.registry)))
                .cast(DeviceMessage.class)
                .flatMap(deviceMessage -> handleDeviceMessage(deviceMessage)
                    .as(MonoTracer.create(DeviceTracer.SpanName.decode(deviceMessage.getDeviceId()), spanBuilder -> {
                        spanBuilder.setAttribute(DeviceTracer.SpanKey.message, deviceMessage.toString());
                        spanBuilder.setStartTimestamp(startNanos, TimeUnit.NANOSECONDS);
                    })))
                .doOnEach(ReactiveLogger.onError(err -> {
                    log.error("Handle TCP[{}] message failed:\n{}", this.address, tcpMessage, err);
                }))
                .onErrorResume(err -> Mono.fromRunnable(this.client::reset))
                .subscribeOn(Schedulers.parallel())
                .then();
        }

        /**
         * Cancels the pending legality check, records the message with the monitor and
         * delegates to the {@link DeviceGatewayHelper}, which supplies the session that
         * is then bound to this connection.
         *
         * @param deviceMessage the decoded message
         * @return a Mono emitting {@code deviceMessage} once the helper has completed
         */
        Mono<DeviceMessage> handleDeviceMessage(DeviceMessage deviceMessage) {
            Disposable checker = this.legalityChecker;
            if (checker != null) {
                checker.dispose();
                this.legalityChecker = null;
            }
            TcpServerDeviceGateway.this.monitor.receivedMessage();
            return TcpServerDeviceGateway.this.helper.handleDeviceMessage(deviceMessage, deviceOperator -> {
                return new TcpDeviceSession(deviceOperator, this.client, TcpServerDeviceGateway.this.getTransport(), TcpServerDeviceGateway.this.monitor);
            }, deviceSession -> {
                // Point the session at this connection's client and make it the current session.
                TcpDeviceSession tcpDeviceSession = (TcpDeviceSession) deviceSession.unwrap(TcpDeviceSession.class);
                tcpDeviceSession.setClient(this.client);
                this.sessionRef.set(tcpDeviceSession);
            }, () -> {
                log.warn("TCP{}: The device[{}] in the message body does not exist:{}", this.address, deviceMessage.getDeviceId(), deviceMessage);
            }).thenReturn(deviceMessage);
        }

        /**
         * Looks up a device in the gateway's registry.
         *
         * @param str the device id
         * @return the registry's lookup result
         */
        public Mono<DeviceOperator> getDevice(String str) {
            return TcpServerDeviceGateway.this.registry.getDevice(str);
        }

        /**
         * Looks up a product in the gateway's registry.
         *
         * @param str the product id
         * @return the registry's lookup result
         */
        public Mono<DeviceProductOperator> getProduct(String str) {
            return TcpServerDeviceGateway.this.registry.getProduct(str);
        }

        /**
         * Handles a device message handed over through this context.
         *
         * @param deviceMessage the message to handle
         * @return a Mono completing when the message has been handled
         */
        public Mono<Void> onMessage(DeviceMessage deviceMessage) {
            return handleDeviceMessage(deviceMessage).then();
        }
    }

    /**
     * Creates a gateway bound to the given TCP server.
     *
     * @param str                         gateway id, passed to the superclass
     * @param mono                        source of the protocol used to decode messages
     * @param deviceRegistry              registry used for device and product lookups
     * @param decodedClientMessageHandler handler passed to the {@link DeviceGatewayHelper}
     * @param deviceSessionManager        session manager
     * @param tcpServer                   server whose connections are handled
     */
    public TcpServerDeviceGateway(String str, Mono<ProtocolSupport> mono, DeviceRegistry deviceRegistry, DecodedClientMessageHandler decodedClientMessageHandler, DeviceSessionManager deviceSessionManager, TcpServer tcpServer) {
        super(str);
        this.counter = new LongAdder();
        this.started = new AtomicBoolean();
        this.connectCheckTimeout = TimeUtils.parse(System.getProperty("gateway.tcp.network.connect-check-timeout", "10s"));
        this.protocol = mono;
        this.registry = deviceRegistry;
        this.tcpServer = tcpServer;
        this.sessionManager = deviceSessionManager;
        this.helper = new DeviceGatewayHelper(this.registry, deviceSessionManager, decodedClientMessageHandler);
    }

    /**
     * @return the source of the protocol support used by this gateway
     */
    public Mono<ProtocolSupport> getProtocol() {
        return this.protocol;
    }

    /**
     * @return the current value of the connection counter
     */
    public long totalConnection() {
        return this.counter.sum();
    }

    /**
     * @return always {@link DefaultTransport#TCP}
     */
    public Transport getTransport() {
        return DefaultTransport.TCP;
    }

    /**
     * @return always {@link DefaultNetworkType#TCP_SERVER}
     */
    public NetworkType getNetworkType() {
        return DefaultNetworkType.TCP_SERVER;
    }

    /**
     * Subscribes to the server's connection stream unless the gateway is already started.
     * A failure while handling one client is logged and does not terminate the stream.
     */
    private void doStart() {
        if (this.started.getAndSet(true) || this.disposable != null) {
            return;
        }
        this.disposable = this.tcpServer
            .handleConnection()
            .publishOn(Schedulers.parallel())
            .flatMap(tcpClient -> new TcpConnection(tcpClient)
                .accept()
                .onErrorResume(err -> {
                    log.error("handle tcp client[{}] error", tcpClient.getRemoteAddress(), err);
                    return Mono.empty();
                }), Integer.MAX_VALUE)
            .contextWrite(ReactiveLogger.start("network", this.tcpServer.getId()))
            .subscribe(ignored -> {
            }, err -> {
                log.error(err.getMessage(), err);
            });
    }

    /**
     * @return a Mono that starts the gateway when subscribed
     */
    protected Mono<Void> doStartup() {
        return Mono.fromRunnable(this::doStart);
    }

    /**
     * @return a Mono that, when subscribed, stops accepting connections and marks the
     *         gateway as not started so a later startup subscribes again
     */
    protected Mono<Void> doShutdown() {
        return Mono.fromRunnable(() -> {
            // Reset the flag; otherwise doStart() would return early on the next startup.
            this.started.set(false);
            if (null != this.disposable) {
                this.disposable.dispose();
                this.disposable = null;
            }
        });
    }
}
