/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.community.network.http.server.vertx;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import io.netty.util.ReferenceCountUtil;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpClosedException;
import io.vertx.core.http.ServerWebSocket;
import io.vertx.core.net.SocketAddress;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.community.network.http.server.WebSocketExchange;
import org.jetlinks.core.message.codec.http.Header;
import org.jetlinks.core.message.codec.http.websocket.DefaultWebSocketMessage;
import org.jetlinks.core.message.codec.http.websocket.WebSocketMessage;
import org.jetlinks.core.message.codec.http.websocket.WebSocketSession;
import org.jetlinks.core.message.codec.http.websocket.WebSocketSessionMessageWrapper;
import org.jetlinks.core.utils.Reactors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

public class VertxWebSocketExchange
implements WebSocketExchange {
    private static final Logger log = LoggerFactory.getLogger(VertxWebSocketExchange.class);
    private final ServerWebSocket serverWebSocket;
    private final InetSocketAddress address;
    private final Sinks.Many<WebSocketMessage> sink = Reactors.createMany();
    private final Map<String, Object> attributes = new ConcurrentHashMap<String, Object>();
    private long keepAliveTimeOutMs = Duration.ofHours(1L).toMillis();
    private long lastKeepAliveTime = System.currentTimeMillis();
    private final List<Runnable> closeHandler = new CopyOnWriteArrayList<Runnable>();
    private final String id;

    public VertxWebSocketExchange(ServerWebSocket serverWebSocket) {
        this.serverWebSocket = serverWebSocket;
        this.doReceived();
        SocketAddress socketAddress = serverWebSocket.remoteAddress();
        this.address = new InetSocketAddress(socketAddress.host(), socketAddress.port());
        this.id = (String)IDGenerator.RANDOM.generate();
    }

    public Optional<InetSocketAddress> getRemoteAddress() {
        return Optional.of(this.address);
    }

    private void doReceived() {
        this.serverWebSocket.textMessageHandler(text -> this.handle(this.textMessage((String)text))).binaryMessageHandler(msg -> this.handle(this.binaryMessage(msg.getByteBuf()))).pongHandler(buf -> this.handle(this.pongMessage(buf.getByteBuf()))).closeHandler(nil -> this.doClose()).exceptionHandler(err -> {
            if (err instanceof HttpClosedException) {
                return;
            }
            log.error(err.getMessage(), err);
        });
    }

    private void doClose() {
        this.sink.emitComplete(Reactors.emitFailureHandler());
        for (Runnable runnable : this.closeHandler) {
            runnable.run();
        }
        this.closeHandler.clear();
    }

    private void handle(WebSocketMessage message) {
        this.lastKeepAliveTime = System.currentTimeMillis();
        if (this.sink.currentSubscriberCount() > 0) {
            this.sink.emitNext((Object)WebSocketSessionMessageWrapper.of((WebSocketMessage)message, (WebSocketSession)this), Reactors.emitFailureHandler());
        } else {
            log.warn("websocket client[{}] session no handler", (Object)this.address);
        }
    }

    public String getUri() {
        return this.serverWebSocket.uri();
    }

    @Nonnull
    public List<Header> getHeaders() {
        return this.serverWebSocket.headers().entries().stream().map(entry -> {
            Header header = new Header();
            header.setName((String)entry.getKey());
            header.setValue(new String[]{(String)entry.getValue()});
            return header;
        }).collect(Collectors.toList());
    }

    public Optional<Header> getHeader(String s) {
        return Optional.ofNullable(this.serverWebSocket.headers().getAll(s)).map(list -> new Header(s, list.toArray(new String[0])));
    }

    public Mono<Void> close() {
        if (this.serverWebSocket.isClosed()) {
            return Mono.empty();
        }
        return Mono.fromRunnable(() -> ((ServerWebSocket)this.serverWebSocket).close());
    }

    public Mono<Void> close(int i) {
        return this.close(i, "Closed");
    }

    public Mono<Void> close(int status, String reason) {
        if (this.serverWebSocket.isClosed()) {
            return Mono.empty();
        }
        return Mono.defer(() -> {
            short code = WebSocketCloseStatus.isValidStatusCode((int)status) ? (short)status : (short)WebSocketCloseStatus.BAD_GATEWAY.code();
            return Mono.fromCompletionStage((CompletionStage)this.serverWebSocket.close(code, reason).toCompletionStage());
        });
    }

    public Map<String, Object> getAttributes() {
        return this.attributes;
    }

    public Optional<Object> getAttribute(String s) {
        return Optional.ofNullable(this.attributes.get(s));
    }

    public void setAttribute(String s, Object o) {
        this.attributes.put(s, o);
    }

    public Flux<WebSocketMessage> receive() {
        return this.sink.asFlux();
    }

    public Mono<Void> send(WebSocketMessage webSocketMessage) {
        ByteBuf payload = webSocketMessage.getPayload();
        return this.doWrite(handler -> {
            switch (webSocketMessage.getType()) {
                case TEXT: {
                    this.serverWebSocket.writeTextMessage(webSocketMessage.payloadAsString(), handler);
                    return;
                }
                case BINARY: {
                    this.serverWebSocket.writeBinaryMessage(Buffer.buffer((ByteBuf)payload), handler);
                    return;
                }
                case PING: {
                    this.serverWebSocket.writePing(Buffer.buffer((ByteBuf)payload));
                    handler.handle((Object)Future.succeededFuture());
                    return;
                }
            }
            throw new UnsupportedOperationException("unsupported message type" + webSocketMessage.getType());
        }).doAfterTerminate(() -> ReferenceCountUtil.safeRelease((Object)payload));
    }

    protected Mono<Void> doWrite(Consumer<Handler<AsyncResult<Void>>> handler) {
        this.lastKeepAliveTime = System.currentTimeMillis();
        return Mono.create(sink -> {
            try {
                handler.accept(result -> {
                    if (result.succeeded()) {
                        sink.success();
                    } else {
                        sink.error(result.cause());
                    }
                });
            }
            catch (Throwable e) {
                sink.error(e);
            }
        });
    }

    public WebSocketMessage textMessage(String s) {
        return DefaultWebSocketMessage.of((WebSocketMessage.Type)WebSocketMessage.Type.TEXT, (ByteBuf)Unpooled.wrappedBuffer((byte[])s.getBytes()));
    }

    public WebSocketMessage binaryMessage(ByteBuf byteBuf) {
        return DefaultWebSocketMessage.of((WebSocketMessage.Type)WebSocketMessage.Type.BINARY, (ByteBuf)byteBuf);
    }

    public WebSocketMessage pingMessage(ByteBuf byteBuf) {
        return DefaultWebSocketMessage.of((WebSocketMessage.Type)WebSocketMessage.Type.PING, (ByteBuf)byteBuf);
    }

    public WebSocketMessage pongMessage(ByteBuf byteBuf) {
        return DefaultWebSocketMessage.of((WebSocketMessage.Type)WebSocketMessage.Type.PONG, (ByteBuf)byteBuf);
    }

    @Override
    public boolean isAlive() {
        return !this.serverWebSocket.isClosed() && (this.keepAliveTimeOutMs <= 0L || System.currentTimeMillis() - this.lastKeepAliveTime < this.keepAliveTimeOutMs);
    }

    @Override
    public void setKeepAliveTimeout(Duration duration) {
        this.keepAliveTimeOutMs = duration.toMillis();
        this.lastKeepAliveTime = System.currentTimeMillis();
    }

    @Override
    public void closeHandler(Runnable handler) {
        this.closeHandler.add(handler);
    }

    @Override
    public long getLastKeepAliveTime() {
        return this.lastKeepAliveTime;
    }

    @Override
    public String getId() {
        return this.id;
    }
}

