/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.simulator.core.network.tcp;

import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
import java.io.File;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.jetlinks.simulator.core.Connection;
import org.jetlinks.simulator.core.ExceptionUtils;
import org.jetlinks.simulator.core.network.AbstractConnection;
import org.jetlinks.simulator.core.network.Address;
import org.jetlinks.simulator.core.network.AddressManager;
import org.jetlinks.simulator.core.network.NetworkType;
import org.jetlinks.simulator.core.network.NetworkUtils;
import org.jetlinks.simulator.core.network.tcp.TcpOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

public class TcpClient
extends AbstractConnection {
    private static final Logger log = LoggerFactory.getLogger(TcpClient.class);
    static final AtomicLong count = new AtomicLong();
    public static final String ATTR_ADDRESS = "address";
    private final String id;
    private final NetSocket socket;
    private final Address address;
    private final List<Consumer<Buffer>> bufferHandlers = new CopyOnWriteArrayList<Consumer<Buffer>>();
    private Consumer<Buffer> parser;

    public TcpClient(String id, NetSocket socket, Address address) {
        this.id = id;
        this.socket = socket;
        this.address = address;
        this.attribute(ATTR_ADDRESS, socket.localAddress().host() + ":" + socket.localAddress().port());
        this.socket.handler(buffer -> {
            this.received(buffer.length());
            this.parser.accept((Buffer)buffer);
        }).closeHandler(ignore -> this.changeState(Connection.State.closed));
        this.changeState(Connection.State.connected);
    }

    private void handleBuffer(Buffer buffer) {
        for (Consumer<Buffer> bufferHandler : this.bufferHandlers) {
            try {
                bufferHandler.accept(buffer);
            }
            catch (Throwable e) {
                log.warn("handle socket error :{}", (Object)ExceptionUtils.getErrorMessage(e));
            }
        }
    }

    public static Mono<TcpClient> connect(Vertx vertx, TcpOptions tcpOptions) {
        Address addr = AddressManager.global().takeAddress(tcpOptions.getLocalAddress());
        tcpOptions.setLocalAddress(addr.getAddress().getHostAddress());
        return Mono.create(sink -> vertx.createNetClient(tcpOptions).connect(tcpOptions.getPort(), tcpOptions.getHost()).map(socket -> {
            try {
                String id = tcpOptions.getId() == null ? "tcp-client-" + count.incrementAndGet() : tcpOptions.getId();
                TcpClient client = new TcpClient(id, (NetSocket)socket, addr);
                client.parser = tcpOptions.createParser(client::handleBuffer);
                sink.success(client);
            }
            catch (Throwable e) {
                sink.error(e);
                socket.close();
            }
            return socket;
        }).onFailure(sink::error)).doOnError(err -> addr.release());
    }

    @Override
    public String getId() {
        return this.id;
    }

    public TcpClient handlePayload(Consumer<Buffer> buffer) {
        this.bufferHandlers.add(buffer);
        return this;
    }

    public void send(Object data) {
        this.sendAsync(data).subscribe(ignore -> {}, error -> log.warn("send tcp [{}] error:{}", (Object)this.id, (Object)ExceptionUtils.getErrorMessage(error)));
    }

    public void sendFile(String data) {
        this.sendFileAsync(new File(data)).subscribe(ignore -> {}, error -> log.warn("send tcp file [{}] error:{}", (Object)this.id, (Object)ExceptionUtils.getErrorMessage(error)));
    }

    public Mono<Void> sendAsync(Object data) {
        return Mono.create(sink -> {
            ByteBuf buf = NetworkUtils.castToByteBuf(data);
            Buffer buffer = Buffer.buffer(buf);
            int len = buffer.length();
            this.socket.write(buffer, res -> {
                try {
                    if (res.succeeded()) {
                        this.sent(len);
                        sink.success();
                    } else {
                        sink.error(res.cause());
                    }
                }
                finally {
                    ReferenceCountUtil.safeRelease(buf);
                }
            });
        }).doOnError(this::error);
    }

    public Mono<Void> sendFileAsync(String file) {
        return this.sendFileAsync(new File(file));
    }

    public Mono<Void> sendFileAsync(File file) {
        return Mono.create(sink -> {
            long len = file.length();
            this.socket.sendFile(file.getAbsolutePath(), res -> {
                if (res.succeeded()) {
                    this.sent((int)len);
                    sink.success();
                } else {
                    sink.error(res.cause());
                }
            });
        }).doOnError(this::error);
    }

    @Override
    public NetworkType getType() {
        return NetworkType.tcp_client;
    }

    @Override
    public boolean isAlive() {
        return this.state() != Connection.State.closed;
    }

    @Override
    protected void doDisposed() {
        this.address.release();
        try {
            this.socket.close();
        }
        catch (Throwable throwable) {
            // empty catch block
        }
        super.doDisposed();
    }

    @Override
    public void reset() {
        super.reset();
        this.bufferHandlers.clear();
    }
}

