package org.jetlinks.simulator.core.network.udp;

import io.netty.buffer.ByteBuf;
import io.netty.util.ReferenceCountUtil;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.datagram.DatagramPacket;
import io.vertx.core.datagram.DatagramSocket;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import org.jetlinks.simulator.core.Connection;
import org.jetlinks.simulator.core.Global;
import org.jetlinks.simulator.core.network.AbstractConnection;
import org.jetlinks.simulator.core.network.Address;
import org.jetlinks.simulator.core.network.AddressManager;
import org.jetlinks.simulator.core.network.NetworkType;
import org.jetlinks.simulator.core.network.NetworkUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/simulator/core/network/udp/UDPClient.class */
public class UDPClient extends AbstractConnection {
    private final String id;
    private final DatagramSocket socket;
    private final InetSocketAddress remote;
    private final Address address;
    private final List<Consumer<DatagramPacket>> handlers = new CopyOnWriteArrayList();

    public UDPClient(String str, DatagramSocket datagramSocket, InetSocketAddress inetSocketAddress, Address address) {
        this.id = str;
        this.socket = datagramSocket;
        this.remote = inetSocketAddress;
        this.address = address;
        changeState(Connection.State.connected);
        datagramSocket.handler2(datagramPacket -> {
            received(datagramPacket.data().length());
            Iterator<Consumer<DatagramPacket>> it = this.handlers.iterator();
            while (it.hasNext()) {
                it.next().accept(datagramPacket);
            }
        });
    }

    public InetSocketAddress getRemote() {
        return this.remote;
    }

    public InetSocketAddress getLocal() {
        return InetSocketAddress.createUnresolved(this.socket.localAddress().host(), this.socket.localAddress().port());
    }

    public static Mono<UDPClient> create(UDPOptions uDPOptions) {
        Address takeAddress = AddressManager.global().takeAddress(uDPOptions.getLocalAddress());
        uDPOptions.setLocalAddress(takeAddress.getAddress().getHostAddress());
        return Mono.fromCompletionStage(() -> {
            uDPOptions.setReusePort(true);
            return Global.vertx().createDatagramSocket(uDPOptions).listen(0, uDPOptions.getLocalAddress()).map(datagramSocket -> {
                return new UDPClient(uDPOptions.getId(), datagramSocket, InetSocketAddress.createUnresolved(uDPOptions.getHost(), uDPOptions.getPort()), takeAddress);
            }).toCompletionStage();
        });
    }

    @Override // org.jetlinks.simulator.core.Connection
    public String getId() {
        return this.id;
    }

    public void send(Object obj) {
        sendAsync(obj).subscribe();
    }

    public Disposable handlePayload(Consumer<Buffer> consumer) {
        return handle(datagramPacket -> {
            consumer.accept(datagramPacket.data());
        });
    }

    public Disposable handle(Consumer<DatagramPacket> consumer) {
        this.handlers.add(consumer);
        return () -> {
            this.handlers.remove(consumer);
        };
    }

    @Override // org.jetlinks.simulator.core.Connection
    public void reset() {
        super.reset();
        this.handlers.clear();
    }

    public Mono<Void> sendAsync(String str, int i, Object obj) {
        ByteBuf castToByteBuf = NetworkUtils.castToByteBuf(obj);
        int length = Buffer.buffer(castToByteBuf).length();
        return Mono.fromCompletionStage(() -> {
            return this.socket.send(Buffer.buffer(castToByteBuf), i, str).toCompletionStage();
        }).doAfterTerminate(() -> {
            sent(length);
            ReferenceCountUtil.safeRelease(castToByteBuf);
        }).doOnError(this::error);
    }

    public Mono<Void> sendAsync(Object obj) {
        return sendAsync(this.remote.getHostString(), this.remote.getPort(), obj);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.jetlinks.simulator.core.network.AbstractConnection
    public void doDisposed() {
        this.address.release();
        this.socket.close();
        super.doDisposed();
    }

    @Override // org.jetlinks.simulator.core.Connection
    public NetworkType getType() {
        return NetworkType.udp_client;
    }

    @Override // org.jetlinks.simulator.core.Connection
    public boolean isAlive() {
        return true;
    }
}
