package org.jetlinks.simulator.core.network.tcp;

import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
import java.io.File;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.jetlinks.simulator.core.Connection;
import org.jetlinks.simulator.core.ExceptionUtils;
import org.jetlinks.simulator.core.network.AbstractConnection;
import org.jetlinks.simulator.core.network.Address;
import org.jetlinks.simulator.core.network.AddressManager;
import org.jetlinks.simulator.core.network.NetworkType;
import org.jetlinks.simulator.core.network.NetworkUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/simulator/core/network/tcp/TcpClient.class */
public class TcpClient extends AbstractConnection {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) TcpClient.class);
    static final AtomicLong count = new AtomicLong();
    public static final String ATTR_ADDRESS = "address";
    private final String id;
    private final NetSocket socket;
    private final Address address;
    private final List<Consumer<Buffer>> bufferHandlers = new CopyOnWriteArrayList();
    private Consumer<Buffer> parser;

    /* JADX WARN: Type inference failed for: r0v8, types: [io.vertx.core.net.NetSocket] */
    public TcpClient(String str, NetSocket netSocket, Address address) {
        this.id = str;
        this.socket = netSocket;
        this.address = address;
        attribute(ATTR_ADDRESS, netSocket.localAddress().host() + ":" + netSocket.localAddress().port());
        this.socket.handler2(buffer -> {
            received(buffer.length());
            this.parser.accept(buffer);
        }).mo1809closeHandler(r4 -> {
            changeState(Connection.State.closed);
        });
        changeState(Connection.State.connected);
    }

    private void handleBuffer(Buffer buffer) {
        Iterator<Consumer<Buffer>> it = this.bufferHandlers.iterator();
        while (it.hasNext()) {
            try {
                it.next().accept(buffer);
            } catch (Throwable th) {
                log.warn("handle socket error :{}", ExceptionUtils.getErrorMessage(th));
            }
        }
    }

    public static Mono<TcpClient> connect(Vertx vertx, TcpOptions tcpOptions) {
        Address takeAddress = AddressManager.global().takeAddress(tcpOptions.getLocalAddress());
        tcpOptions.setLocalAddress(takeAddress.getAddress().getHostAddress());
        return Mono.create(monoSink -> {
            Future<U> map = vertx.createNetClient(tcpOptions).connect(tcpOptions.getPort(), tcpOptions.getHost()).map(netSocket -> {
                try {
                    TcpClient tcpClient = new TcpClient(tcpOptions.getId() == null ? "tcp-client-" + count.incrementAndGet() : tcpOptions.getId(), netSocket, takeAddress);
                    tcpClient.getClass();
                    tcpClient.parser = tcpOptions.createParser(tcpClient::handleBuffer);
                    monoSink.success(tcpClient);
                } catch (Throwable th) {
                    monoSink.error(th);
                    netSocket.close();
                }
                return netSocket;
            });
            monoSink.getClass();
            map.onFailure(monoSink::error);
        }).doOnError(th -> {
            takeAddress.release();
        });
    }

    @Override // org.jetlinks.simulator.core.Connection
    public String getId() {
        return this.id;
    }

    public TcpClient handlePayload(Consumer<Buffer> consumer) {
        this.bufferHandlers.add(consumer);
        return this;
    }

    public void send(Object obj) {
        sendAsync(obj).subscribe(r1 -> {
        }, th -> {
            log.warn("send tcp [{}] error:{}", this.id, ExceptionUtils.getErrorMessage(th));
        });
    }

    public void sendFile(String str) {
        sendFileAsync(new File(str)).subscribe(r1 -> {
        }, th -> {
            log.warn("send tcp file [{}] error:{}", this.id, ExceptionUtils.getErrorMessage(th));
        });
    }

    public Mono<Void> sendAsync(Object obj) {
        return Mono.create(monoSink -> {
            ByteBuf castToByteBuf = NetworkUtils.castToByteBuf(obj);
            Buffer buffer = Buffer.buffer(castToByteBuf);
            int length = buffer.length();
            this.socket.write(buffer, asyncResult -> {
                try {
                    if (asyncResult.succeeded()) {
                        sent(length);
                        monoSink.success();
                    } else {
                        monoSink.error(asyncResult.cause());
                    }
                } finally {
                    ReferenceCountUtil.safeRelease(castToByteBuf);
                }
            });
        }).doOnError(this::error);
    }

    public Mono<Void> sendFileAsync(String str) {
        return sendFileAsync(new File(str));
    }

    public Mono<Void> sendFileAsync(File file) {
        return Mono.create(monoSink -> {
            long length = file.length();
            this.socket.sendFile(file.getAbsolutePath(), asyncResult -> {
                if (!asyncResult.succeeded()) {
                    monoSink.error(asyncResult.cause());
                } else {
                    sent((int) length);
                    monoSink.success();
                }
            });
        }).doOnError(this::error);
    }

    @Override // org.jetlinks.simulator.core.Connection
    public NetworkType getType() {
        return NetworkType.tcp_client;
    }

    @Override // org.jetlinks.simulator.core.Connection
    public boolean isAlive() {
        return state() != Connection.State.closed;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.jetlinks.simulator.core.network.AbstractConnection
    public void doDisposed() {
        this.address.release();
        try {
            this.socket.close();
        } catch (Throwable th) {
        }
        super.doDisposed();
    }

    @Override // org.jetlinks.simulator.core.Connection
    public void reset() {
        super.reset();
        this.bufferHandlers.clear();
    }
}
