/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.community.network.tcp.gateway.device;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import org.hswebframework.web.logger.ReactiveLogger;
import org.jetlinks.community.gateway.AbstractDeviceGateway;
import org.jetlinks.community.gateway.DeviceGateway;
import org.jetlinks.community.gateway.DeviceGatewayHelper;
import org.jetlinks.community.gateway.monitor.MonitorSupportDeviceGateway;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkType;
import org.jetlinks.community.network.tcp.TcpMessage;
import org.jetlinks.community.network.tcp.client.TcpClient;
import org.jetlinks.community.network.tcp.gateway.device.TcpDeviceSession;
import org.jetlinks.community.network.tcp.gateway.device.UnknownTcpDeviceSession;
import org.jetlinks.community.network.tcp.server.TcpServer;
import org.jetlinks.community.utils.TimeUtils;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceProductOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.device.session.DeviceSessionManager;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.FromDeviceMessageContext;
import org.jetlinks.core.message.codec.MessageDecodeContext;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.server.ClientConnection;
import org.jetlinks.core.server.DeviceGatewayContext;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.trace.DeviceTracer;
import org.jetlinks.core.trace.MonoTracer;
import org.jetlinks.supports.server.DecodedClientMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class TcpServerDeviceGateway
extends AbstractDeviceGateway
implements DeviceGateway,
MonitorSupportDeviceGateway {
    private static final Logger log = LoggerFactory.getLogger(TcpServerDeviceGateway.class);
    final TcpServer tcpServer;
    Mono<ProtocolSupport> protocol;
    private final DeviceRegistry registry;
    private final DeviceSessionManager sessionManager;
    private final LongAdder counter = new LongAdder();
    private final AtomicBoolean started = new AtomicBoolean();
    private Disposable disposable;
    private final DeviceGatewayHelper helper;
    private Duration connectCheckTimeout = TimeUtils.parse((String)System.getProperty("gateway.tcp.network.connect-check-timeout", "10s"));

    public TcpServerDeviceGateway(String id, Mono<ProtocolSupport> protocol, DeviceRegistry deviceRegistry, DecodedClientMessageHandler clientMessageHandler, DeviceSessionManager sessionManager, TcpServer tcpServer) {
        super(id);
        this.protocol = protocol;
        this.registry = deviceRegistry;
        this.tcpServer = tcpServer;
        this.sessionManager = sessionManager;
        this.helper = new DeviceGatewayHelper(this.registry, sessionManager, clientMessageHandler);
    }

    public Mono<ProtocolSupport> getProtocol() {
        return this.protocol;
    }

    public long totalConnection() {
        return this.counter.sum();
    }

    public Transport getTransport() {
        return DefaultTransport.TCP;
    }

    public NetworkType getNetworkType() {
        return DefaultNetworkType.TCP_SERVER;
    }

    private void doStart() {
        if (this.started.getAndSet(true) || this.disposable != null) {
            return;
        }
        this.disposable = this.tcpServer.handleConnection().publishOn(Schedulers.parallel()).flatMap(client -> new TcpConnection((TcpClient)client).accept().onErrorResume(err -> {
            log.error("handle tcp client[{}] error", (Object)client.getRemoteAddress(), err);
            return Mono.empty();
        }), Integer.MAX_VALUE).contextWrite(ReactiveLogger.start((String)"network", (String)this.tcpServer.getId())).subscribe(ignore -> {}, error -> log.error(error.getMessage(), error));
    }

    protected Mono<Void> doStartup() {
        return Mono.fromRunnable(this::doStart);
    }

    protected Mono<Void> doShutdown() {
        return Mono.fromRunnable(() -> {
            if (null != this.disposable) {
                this.disposable.dispose();
                this.disposable = null;
            }
        });
    }

    class TcpConnection
    implements DeviceGatewayContext {
        final TcpClient client;
        final AtomicReference<DeviceSession> sessionRef = new AtomicReference();
        final InetSocketAddress address;
        Disposable legalityChecker;

        TcpConnection(TcpClient client) {
            this.client = client;
            this.address = client.getRemoteAddress();
            TcpServerDeviceGateway.this.monitor.totalConnection(TcpServerDeviceGateway.this.counter.sum());
            client.onDisconnect(() -> {
                TcpServerDeviceGateway.this.counter.decrement();
                TcpServerDeviceGateway.this.monitor.disconnected();
                TcpServerDeviceGateway.this.monitor.totalConnection(TcpServerDeviceGateway.this.counter.sum());
                DeviceSession session = this.sessionRef.get();
                if (session.getDeviceId() != null) {
                    TcpServerDeviceGateway.this.sessionManager.getSession(session.getDeviceId()).subscribe();
                }
            });
            TcpServerDeviceGateway.this.monitor.connected();
            this.sessionRef.set(new UnknownTcpDeviceSession(client.getId(), client, TcpServerDeviceGateway.this.getTransport(), TcpServerDeviceGateway.this.monitor));
            this.legalityChecker = Schedulers.parallel().schedule(this::checkLegality, TcpServerDeviceGateway.this.connectCheckTimeout.toMillis(), TimeUnit.MILLISECONDS);
        }

        public void checkLegality() {
            if (this.sessionRef.get() instanceof UnknownTcpDeviceSession) {
                log.info("tcp [{}] connection is illegal, close it.", (Object)this.address);
                try {
                    this.client.disconnect();
                }
                catch (Throwable throwable) {
                    // empty catch block
                }
            }
        }

        Mono<Void> accept() {
            return TcpServerDeviceGateway.this.getProtocol().flatMap(protocol -> protocol.onClientConnect(TcpServerDeviceGateway.this.getTransport(), (ClientConnection)this.client, (DeviceGatewayContext)this)).then(this.client.subscribe().filter(tcp -> TcpServerDeviceGateway.this.started.get()).flatMap(this::handleTcpMessage).onErrorResume(err -> {
                log.error(err.getMessage(), err);
                this.client.shutdown();
                return Mono.empty();
            }).then()).doOnCancel(() -> ((TcpClient)this.client).shutdown());
        }

        Mono<Void> handleTcpMessage(TcpMessage message) {
            long time = System.nanoTime();
            return TcpServerDeviceGateway.this.getProtocol().flatMap(pt -> pt.getMessageCodec(TcpServerDeviceGateway.this.getTransport())).flatMapMany(codec -> codec.decode((MessageDecodeContext)FromDeviceMessageContext.of((DeviceSession)this.sessionRef.get(), (EncodedMessage)message, (DeviceRegistry)TcpServerDeviceGateway.this.registry))).cast(DeviceMessage.class).flatMap(msg -> (Mono)this.handleDeviceMessage((DeviceMessage)msg).as((Function)MonoTracer.create((String)DeviceTracer.SpanName.decode((String)msg.getDeviceId()), builder -> {
                builder.setAttribute(DeviceTracer.SpanKey.message, (Object)msg.toString());
                builder.setStartTimestamp(time, TimeUnit.NANOSECONDS);
            }))).doOnEach(ReactiveLogger.onError(err -> log.error("Handle TCP[{}] message failed:\n{}", new Object[]{this.address, message, err}))).onErrorResume(err -> Mono.fromRunnable(this.client::reset)).subscribeOn(Schedulers.parallel()).then();
        }

        Mono<DeviceMessage> handleDeviceMessage(DeviceMessage message) {
            Disposable checker = this.legalityChecker;
            if (checker != null) {
                checker.dispose();
                this.legalityChecker = null;
            }
            TcpServerDeviceGateway.this.monitor.receivedMessage();
            return TcpServerDeviceGateway.this.helper.handleDeviceMessage(message, device -> new TcpDeviceSession((DeviceOperator)device, this.client, TcpServerDeviceGateway.this.getTransport(), TcpServerDeviceGateway.this.monitor), session -> {
                TcpDeviceSession deviceSession = (TcpDeviceSession)session.unwrap(TcpDeviceSession.class);
                deviceSession.setClient(this.client);
                this.sessionRef.set(deviceSession);
            }, () -> log.warn("TCP{}: The device[{}] in the message body does not exist:{}", new Object[]{this.address, message.getDeviceId(), message})).thenReturn((Object)message);
        }

        public Mono<DeviceOperator> getDevice(String deviceId) {
            return TcpServerDeviceGateway.this.registry.getDevice(deviceId);
        }

        public Mono<DeviceProductOperator> getProduct(String productId) {
            return TcpServerDeviceGateway.this.registry.getProduct(productId);
        }

        public Mono<Void> onMessage(DeviceMessage message) {
            return this.handleDeviceMessage(message).then();
        }
    }
}

