package org.jetlinks.community.network.tcp.client;

import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.SocketAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.commons.codec.binary.Hex;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkType;
import org.jetlinks.community.network.tcp.TcpMessage;
import org.jetlinks.community.network.tcp.parser.PayloadParser;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.utils.Reactors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:org/jetlinks/community/network/tcp/client/VertxTcpClient.class */
public class VertxTcpClient implements TcpClient {
    private static final Logger log = LoggerFactory.getLogger(VertxTcpClient.class);
    public volatile NetClient client;
    public NetSocket socket;
    volatile PayloadParser payloadParser;
    private final String id;
    private long keepAliveTimeoutMs = Duration.ofMinutes(10).toMillis();
    private volatile long lastKeepAliveTime = System.currentTimeMillis();
    private final List<Runnable> disconnectListener = new CopyOnWriteArrayList();
    private final Sinks.Many<TcpMessage> sink = Reactors.createMany();
    private final boolean serverClient = true;

    @Override // org.jetlinks.community.network.tcp.client.TcpClient
    public void keepAlive() {
        this.lastKeepAliveTime = System.currentTimeMillis();
    }

    @Override // org.jetlinks.community.network.tcp.client.TcpClient
    public void setKeepAliveTimeout(Duration duration) {
        this.keepAliveTimeoutMs = duration.toMillis();
    }

    @Override // org.jetlinks.community.network.tcp.client.TcpClient
    public void reset() {
        if (null != this.payloadParser) {
            this.payloadParser.reset();
        }
    }

    public InetSocketAddress address() {
        return getRemoteAddress();
    }

    public Mono<Void> sendMessage(EncodedMessage encodedMessage) {
        return Mono.create(monoSink -> {
            if (this.socket == null) {
                monoSink.error(new SocketException("socket closed"));
                return;
            }
            ByteBuf payload = encodedMessage.getPayload();
            Buffer buffer = Buffer.buffer(payload);
            buffer.length();
            this.socket.write(buffer, asyncResult -> {
                ReferenceCountUtil.safeRelease(payload);
                if (!asyncResult.succeeded()) {
                    monoSink.error(asyncResult.cause());
                } else {
                    keepAlive();
                    monoSink.success();
                }
            });
        });
    }

    public Flux<EncodedMessage> receiveMessage() {
        return subscribe().cast(EncodedMessage.class);
    }

    public void disconnect() {
        shutdown();
    }

    public boolean isAlive() {
        return this.socket != null && (this.keepAliveTimeoutMs < 0 || System.currentTimeMillis() - this.lastKeepAliveTime < this.keepAliveTimeoutMs);
    }

    public boolean isAutoReload() {
        return true;
    }

    public VertxTcpClient(String str) {
        this.id = str;
    }

    protected void received(TcpMessage tcpMessage) {
        this.sink.emitNext(tcpMessage, Reactors.RETRY_NON_SERIALIZED);
    }

    @Override // org.jetlinks.community.network.tcp.client.TcpClient
    public Flux<TcpMessage> subscribe() {
        return this.sink.asFlux();
    }

    private void execute(Runnable runnable) {
        try {
            runnable.run();
        } catch (Exception e) {
            log.warn("close tcp client error", e);
        }
    }

    @Override // org.jetlinks.community.network.tcp.client.TcpClient
    public InetSocketAddress getRemoteAddress() {
        if (null == this.socket) {
            return null;
        }
        SocketAddress remoteAddress = this.socket.remoteAddress();
        return InetSocketAddress.createUnresolved(remoteAddress.host(), remoteAddress.port());
    }

    public NetworkType getType() {
        return DefaultNetworkType.TCP_CLIENT;
    }

    public void shutdown() {
        if (this.socket == null) {
            return;
        }
        log.debug("tcp client [{}] disconnect", getId());
        synchronized (this) {
            if (null != this.client) {
                NetClient netClient = this.client;
                netClient.getClass();
                execute(netClient::close);
                this.client = null;
            }
            if (null != this.socket) {
                NetSocket netSocket = this.socket;
                netSocket.getClass();
                execute(netSocket::close);
                this.socket = null;
            }
            if (null != this.payloadParser) {
                PayloadParser payloadParser = this.payloadParser;
                payloadParser.getClass();
                execute(payloadParser::close);
                this.payloadParser = null;
            }
        }
        Iterator<Runnable> it = this.disconnectListener.iterator();
        while (it.hasNext()) {
            execute(it.next());
        }
        this.disconnectListener.clear();
        if (this.serverClient) {
            this.sink.tryEmitComplete();
        }
    }

    public void setClient(NetClient netClient) {
        if (this.client != null && this.client != netClient) {
            this.client.close();
        }
        keepAlive();
        this.client = netClient;
    }

    public void setRecordParser(PayloadParser payloadParser) {
        synchronized (this) {
            if (null != this.payloadParser && this.payloadParser != payloadParser) {
                this.payloadParser.close();
            }
            this.payloadParser = payloadParser;
            this.payloadParser.handlePayload().subscribe(buffer -> {
                received(new TcpMessage(buffer.getByteBuf()));
            });
        }
    }

    public void setSocket(NetSocket netSocket) {
        synchronized (this) {
            Objects.requireNonNull(this.payloadParser);
            if (this.socket != null && this.socket != netSocket) {
                this.socket.close();
            }
            this.socket = netSocket.closeHandler(r3 -> {
                shutdown();
            }).handler(buffer -> {
                if (log.isDebugEnabled()) {
                    log.debug("handle tcp client[{}] payload:[{}]", netSocket.remoteAddress(), Hex.encodeHexString(buffer.getBytes()));
                }
                keepAlive();
                this.payloadParser.handle(buffer);
                if (this.socket != netSocket) {
                    log.warn("tcp client [{}] memory leak ", netSocket.remoteAddress());
                    netSocket.close();
                }
            });
        }
    }

    @Override // org.jetlinks.community.network.tcp.client.TcpClient
    public Mono<Boolean> send(TcpMessage tcpMessage) {
        return sendMessage(tcpMessage).thenReturn(true);
    }

    @Override // org.jetlinks.community.network.tcp.client.TcpClient
    public void onDisconnect(Runnable runnable) {
        this.disconnectListener.add(runnable);
    }

    public String getId() {
        return this.id;
    }

    public void setKeepAliveTimeoutMs(long j) {
        this.keepAliveTimeoutMs = j;
    }
}
