package org.jetlinks.community.network.http.server.vertx;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.util.ReferenceCountUtil;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClosedException;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.net.SocketAddress;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.community.network.http.server.WebSocketExchange;
import org.jetlinks.core.message.codec.http.Header;
import org.jetlinks.core.message.codec.http.websocket.DefaultWebSocketMessage;
import org.jetlinks.core.message.codec.http.websocket.WebSocketMessage;
import org.jetlinks.core.message.codec.http.websocket.WebSocketSessionMessageWrapper;
import org.jetlinks.core.utils.Reactors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:org/jetlinks/community/network/http/server/vertx/VertxWebSocketExchange.class */
public class VertxWebSocketExchange implements WebSocketExchange {
    private static final Logger log = LoggerFactory.getLogger(VertxWebSocketExchange.class);
    private final ServerWebSocket serverWebSocket;
    private final InetSocketAddress address;
    private final Sinks.Many<WebSocketMessage> sink = Reactors.createMany();
    private final Map<String, Object> attributes = new ConcurrentHashMap();
    private long keepAliveTimeOutMs = Duration.ofHours(1).toMillis();
    private long lastKeepAliveTime = System.currentTimeMillis();
    private final List<Runnable> closeHandler = new CopyOnWriteArrayList();
    private final String id;

    /* renamed from: org.jetlinks.community.network.http.server.vertx.VertxWebSocketExchange$1, reason: invalid class name */
    /* loaded from: input_file:org/jetlinks/community/network/http/server/vertx/VertxWebSocketExchange$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$jetlinks$core$message$codec$http$websocket$WebSocketMessage$Type = new int[WebSocketMessage.Type.values().length];

        static {
            try {
                $SwitchMap$org$jetlinks$core$message$codec$http$websocket$WebSocketMessage$Type[WebSocketMessage.Type.TEXT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$jetlinks$core$message$codec$http$websocket$WebSocketMessage$Type[WebSocketMessage.Type.BINARY.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$jetlinks$core$message$codec$http$websocket$WebSocketMessage$Type[WebSocketMessage.Type.PING.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public VertxWebSocketExchange(ServerWebSocket serverWebSocket) {
        this.serverWebSocket = serverWebSocket;
        doReceived();
        SocketAddress remoteAddress = serverWebSocket.remoteAddress();
        this.address = new InetSocketAddress(remoteAddress.host(), remoteAddress.port());
        this.id = (String) IDGenerator.RANDOM.generate();
    }

    public Optional<InetSocketAddress> getRemoteAddress() {
        return Optional.of(this.address);
    }

    private void doReceived() {
        this.serverWebSocket.textMessageHandler(str -> {
            handle(textMessage(str));
        }).binaryMessageHandler(buffer -> {
            handle(binaryMessage(buffer.getByteBuf()));
        }).pongHandler(buffer2 -> {
            handle(pongMessage(buffer2.getByteBuf()));
        }).closeHandler(r3 -> {
            doClose();
        }).exceptionHandler(th -> {
            if (th instanceof HttpClosedException) {
                return;
            }
            log.error(th.getMessage(), th);
        });
    }

    private void doClose() {
        this.sink.emitComplete(Reactors.emitFailureHandler());
        Iterator<Runnable> it = this.closeHandler.iterator();
        while (it.hasNext()) {
            it.next().run();
        }
        this.closeHandler.clear();
    }

    private void handle(WebSocketMessage webSocketMessage) {
        this.lastKeepAliveTime = System.currentTimeMillis();
        if (this.sink.currentSubscriberCount() > 0) {
            this.sink.emitNext(WebSocketSessionMessageWrapper.of(webSocketMessage, this), Reactors.emitFailureHandler());
        } else {
            log.warn("websocket client[{}] session no handler", this.address);
        }
    }

    public String getUri() {
        return this.serverWebSocket.uri();
    }

    @Nonnull
    public List<Header> getHeaders() {
        return (List) this.serverWebSocket.headers().entries().stream().map(entry -> {
            Header header = new Header();
            header.setName((String) entry.getKey());
            header.setValue(new String[]{(String) entry.getValue()});
            return header;
        }).collect(Collectors.toList());
    }

    public Optional<Header> getHeader(String str) {
        return Optional.ofNullable(this.serverWebSocket.headers().getAll(str)).map(list -> {
            return new Header(str, (String[]) list.toArray(new String[0]));
        });
    }

    public Mono<Void> close() {
        if (this.serverWebSocket.isClosed()) {
            return Mono.empty();
        }
        ServerWebSocket serverWebSocket = this.serverWebSocket;
        serverWebSocket.getClass();
        return Mono.fromRunnable(serverWebSocket::close);
    }

    public Mono<Void> close(int i) {
        return close(i, "Closed");
    }

    public Mono<Void> close(int i, String str) {
        return this.serverWebSocket.isClosed() ? Mono.empty() : Mono.defer(() -> {
            return Mono.fromCompletionStage(this.serverWebSocket.close(WebSocketCloseStatus.isValidStatusCode(i) ? (short) i : (short) WebSocketCloseStatus.BAD_GATEWAY.code(), str).toCompletionStage());
        });
    }

    public Map<String, Object> getAttributes() {
        return this.attributes;
    }

    public Optional<Object> getAttribute(String str) {
        return Optional.ofNullable(this.attributes.get(str));
    }

    public void setAttribute(String str, Object obj) {
        this.attributes.put(str, obj);
    }

    public Flux<WebSocketMessage> receive() {
        return this.sink.asFlux();
    }

    public Mono<Void> send(WebSocketMessage webSocketMessage) {
        ByteBuf payload = webSocketMessage.getPayload();
        return doWrite(handler -> {
            switch (AnonymousClass1.$SwitchMap$org$jetlinks$core$message$codec$http$websocket$WebSocketMessage$Type[webSocketMessage.getType().ordinal()]) {
                case 1:
                    this.serverWebSocket.writeTextMessage(webSocketMessage.payloadAsString(), handler);
                    return;
                case 2:
                    this.serverWebSocket.writeBinaryMessage(Buffer.buffer(payload), handler);
                    return;
                case 3:
                    this.serverWebSocket.writePing(Buffer.buffer(payload));
                    handler.handle(Future.succeededFuture());
                    return;
                default:
                    throw new UnsupportedOperationException("unsupported message type" + webSocketMessage.getType());
            }
        }).doAfterTerminate(() -> {
            ReferenceCountUtil.safeRelease(payload);
        });
    }

    protected Mono<Void> doWrite(Consumer<Handler<AsyncResult<Void>>> consumer) {
        this.lastKeepAliveTime = System.currentTimeMillis();
        return Mono.create(monoSink -> {
            try {
                consumer.accept(asyncResult -> {
                    if (asyncResult.succeeded()) {
                        monoSink.success();
                    } else {
                        monoSink.error(asyncResult.cause());
                    }
                });
            } catch (Throwable th) {
                monoSink.error(th);
            }
        });
    }

    public WebSocketMessage textMessage(String str) {
        return DefaultWebSocketMessage.of(WebSocketMessage.Type.TEXT, Unpooled.wrappedBuffer(str.getBytes()));
    }

    public WebSocketMessage binaryMessage(ByteBuf byteBuf) {
        return DefaultWebSocketMessage.of(WebSocketMessage.Type.BINARY, byteBuf);
    }

    public WebSocketMessage pingMessage(ByteBuf byteBuf) {
        return DefaultWebSocketMessage.of(WebSocketMessage.Type.PING, byteBuf);
    }

    public WebSocketMessage pongMessage(ByteBuf byteBuf) {
        return DefaultWebSocketMessage.of(WebSocketMessage.Type.PONG, byteBuf);
    }

    @Override // org.jetlinks.community.network.http.server.WebSocketExchange
    public boolean isAlive() {
        return !this.serverWebSocket.isClosed() && (this.keepAliveTimeOutMs <= 0 || System.currentTimeMillis() - this.lastKeepAliveTime < this.keepAliveTimeOutMs);
    }

    @Override // org.jetlinks.community.network.http.server.WebSocketExchange
    public void setKeepAliveTimeout(Duration duration) {
        this.keepAliveTimeOutMs = duration.toMillis();
        this.lastKeepAliveTime = System.currentTimeMillis();
    }

    @Override // org.jetlinks.community.network.http.server.WebSocketExchange
    public void closeHandler(Runnable runnable) {
        this.closeHandler.add(runnable);
    }

    @Override // org.jetlinks.community.network.http.server.WebSocketExchange
    public long getLastKeepAliveTime() {
        return this.lastKeepAliveTime;
    }

    @Override // org.jetlinks.community.network.http.server.WebSocketExchange
    public String getId() {
        return this.id;
    }
}
