package org.jetlinks.community.network.tcp.server;

import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetSocket;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Collection;
import java.util.Iterator;
import java.util.function.Supplier;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkType;
import org.jetlinks.community.network.tcp.client.TcpClient;
import org.jetlinks.community.network.tcp.client.VertxTcpClient;
import org.jetlinks.community.network.tcp.parser.PayloadParser;
import org.jetlinks.core.utils.Reactors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:org/jetlinks/community/network/tcp/server/VertxTcpServer.class */
/**
 * {@link TcpServer} implementation backed by one or more Vert.x {@link NetServer} instances.
 *
 * <p>Every socket accepted by any of the underlying servers is wrapped in a
 * {@link VertxTcpClient} and published through the flux returned by
 * {@link #handleConnection()}. A connection that arrives while that flux has no
 * subscriber is closed immediately.
 */
public class VertxTcpServer implements TcpServer {
    private static final Logger log = LoggerFactory.getLogger(VertxTcpServer.class);

    /** Underlying Vert.x servers; {@code null} until {@link #setServer} is called and again after {@link #shutdown()}. */
    Collection<NetServer> tcpServers;

    /** Produces the payload parser handed to each newly accepted client. */
    private Supplier<PayloadParser> parserSupplier;

    private final String id;
    private String lastError;
    private InetSocketAddress bind;

    /** Keep-alive timeout, in milliseconds, applied to each accepted client (10 minutes by default). */
    private long keepAliveTimeout = Duration.ofMinutes(10).toMillis();

    /** Sink through which accepted clients are published to {@link #handleConnection()} subscribers. */
    private final Sinks.Many<TcpClient> sink = Reactors.createMany(Integer.MAX_VALUE, false);

    /**
     * Creates a server with the given identifier.
     *
     * @param str the server id; also used as the prefix of the ids of accepted clients
     */
    public VertxTcpServer(String str) {
        this.id = str;
    }

    /**
     * Returns the stream of clients accepted by this server.
     *
     * @return a flux view of the internal connection sink
     */
    @Override // org.jetlinks.community.network.tcp.server.TcpServer
    public Flux<TcpClient> handleConnection() {
        return this.sink.asFlux();
    }

    /**
     * Runs a close action, logging (instead of propagating) any exception it throws.
     *
     * @param runnable the action to run
     */
    private void execute(Runnable runnable) {
        try {
            runnable.run();
        } catch (Exception e) {
            log.warn("close tcp server error", e);
        }
    }

    /**
     * Closes a socket, logging (instead of propagating) any exception thrown by the close call.
     *
     * @param netSocket the socket to close
     */
    private void closeQuietly(NetSocket netSocket) {
        try {
            netSocket.close();
        } catch (Exception e) {
            log.warn("close tcp client socket error", e);
        }
    }

    /**
     * Returns the address previously stored through {@link #setBind(InetSocketAddress)}.
     *
     * @return the stored bind address, or {@code null} if none has been set
     */
    public InetSocketAddress getBindAddress() {
        return this.bind;
    }

    /**
     * Sets the supplier used to obtain a payload parser for each accepted connection.
     *
     * @param supplier the parser supplier
     */
    public void setParserSupplier(Supplier<PayloadParser> supplier) {
        this.parserSupplier = supplier;
    }

    /**
     * Replaces the underlying Vert.x servers and registers this instance as their connect handler.
     *
     * <p>If a non-empty set of servers is already installed it is shut down first.
     *
     * @param collection the servers to use; must not be {@code null}
     */
    public void setServer(Collection<NetServer> collection) {
        if (this.tcpServers != null && !this.tcpServers.isEmpty()) {
            shutdown();
        }
        this.tcpServers = collection;
        for (NetServer netServer : this.tcpServers) {
            netServer.connectHandler(this::acceptTcpConnection);
        }
    }

    /**
     * Handles a newly accepted socket: wraps it in a {@link VertxTcpClient} and emits it to subscribers.
     *
     * <p>The socket is closed right away when nobody is subscribed to {@link #handleConnection()}.
     * If setting up the client fails, both the client and the socket are released.
     *
     * @param netSocket the accepted socket
     */
    protected void acceptTcpConnection(NetSocket netSocket) {
        if (this.sink.currentSubscriberCount() == 0) {
            log.warn("not handler for tcp client[{}]", netSocket.remoteAddress());
            netSocket.close();
            return;
        }
        VertxTcpClient vertxTcpClient = new VertxTcpClient(this.id + "_" + netSocket.remoteAddress());
        vertxTcpClient.setKeepAliveTimeoutMs(this.keepAliveTimeout);
        try {
            netSocket.exceptionHandler(th -> {
                log.error("tcp server client [{}] error", netSocket.remoteAddress(), th);
            });
            vertxTcpClient.setRecordParser(this.parserSupplier.get());
            vertxTcpClient.setSocket(netSocket);
            this.sink.emitNext(vertxTcpClient, Reactors.emitFailureHandler());
            log.debug("accept tcp client [{}] connection", netSocket.remoteAddress());
        } catch (Exception e) {
            log.error("create tcp server client error", e);
            vertxTcpClient.shutdown();
            // The failure may have happened before setSocket(...) handed the socket to the
            // client (e.g. parserSupplier is null or threw). In that case the client's
            // shutdown() has no socket to release, so close it here to avoid leaking the
            // connection. NOTE(review): presumably a second close on an already closed
            // socket is harmless; any exception from it is logged and ignored.
            closeQuietly(netSocket);
        }
    }

    /**
     * Returns the network type of this component.
     *
     * @return {@link DefaultNetworkType#TCP_SERVER}
     */
    public NetworkType getType() {
        return DefaultNetworkType.TCP_SERVER;
    }

    /**
     * Closes every underlying server and clears the server collection.
     *
     * <p>An exception thrown by an individual close call is logged and does not stop the
     * remaining servers from being closed. Does nothing when no servers are installed.
     */
    @Override // org.jetlinks.community.network.tcp.server.TcpServer
    public void shutdown() {
        if (null != this.tcpServers) {
            log.debug("close tcp server :[{}]", this.id);
            for (NetServer netServer : this.tcpServers) {
                execute(netServer::close);
            }
            this.tcpServers = null;
        }
    }

    /**
     * Reports whether servers are currently installed.
     *
     * @return {@code true} if {@link #setServer} has been called and {@link #shutdown()} has not run since
     */
    public boolean isAlive() {
        return this.tcpServers != null;
    }

    /**
     * @return always {@code false}
     */
    public boolean isAutoReload() {
        return false;
    }

    /**
     * Sets the keep-alive timeout applied to clients accepted from now on.
     *
     * @param j the timeout in milliseconds
     */
    public void setKeepAliveTimeout(long j) {
        this.keepAliveTimeout = j;
    }

    /**
     * @return the id given at construction time
     */
    public String getId() {
        return this.id;
    }

    /**
     * @return the last error message stored via {@link #setLastError(String)}, or {@code null} if none
     */
    public String getLastError() {
        return this.lastError;
    }

    /**
     * Stores an error message for later retrieval through {@link #getLastError()}.
     *
     * @param str the error message
     */
    public void setLastError(String str) {
        this.lastError = str;
    }

    /**
     * Stores the address reported by {@link #getBindAddress()}.
     *
     * @param inetSocketAddress the bind address
     */
    public void setBind(InetSocketAddress inetSocketAddress) {
        this.bind = inetSocketAddress;
    }
}
