package org.jetlinks.community.network.http.server.vertx;

import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Generated;
import org.jetlinks.community.network.DefaultNetworkType;
import org.jetlinks.community.network.NetworkType;
import org.jetlinks.community.network.http.server.HttpExchange;
import org.jetlinks.community.network.http.server.HttpServer;
import org.jetlinks.community.network.http.server.WebSocketExchange;
import org.jetlinks.core.topic.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/community/network/http/server/vertx/VertxHttpServer.class */
public class VertxHttpServer implements HttpServer {
    private static final Logger log = LoggerFactory.getLogger(VertxHttpServer.class);
    private Collection<io.vertx.core.http.HttpServer> httpServers;
    private HttpServerConfig config;
    private String id;
    private final Topic<FluxSink<HttpExchange>> route = Topic.createRoot();
    private final Topic<FluxSink<WebSocketExchange>> websocketRoute = Topic.createRoot();
    private String lastError;
    private InetSocketAddress bindAddress;

    public VertxHttpServer(HttpServerConfig httpServerConfig) {
        this.config = httpServerConfig;
        this.id = httpServerConfig.getId();
    }

    public InetSocketAddress getBindAddress() {
        return this.bindAddress;
    }

    public void setHttpServers(Collection<io.vertx.core.http.HttpServer> collection) {
        if (isAlive()) {
            shutdown();
        }
        this.httpServers = collection;
        for (io.vertx.core.http.HttpServer httpServer : this.httpServers) {
            httpServer.webSocketHandler(serverWebSocket -> {
                serverWebSocket.exceptionHandler(th -> {
                    log.error(th.getMessage(), th);
                });
                String path = serverWebSocket.path();
                if (path.endsWith("/")) {
                    path = path.substring(0, path.length() - 1);
                }
                VertxWebSocketExchange vertxWebSocketExchange = new VertxWebSocketExchange(serverWebSocket);
                this.websocketRoute.findTopic("/ws" + path).flatMapIterable((v0) -> {
                    return v0.getSubscribers();
                }).doOnNext(fluxSink -> {
                    fluxSink.next(vertxWebSocketExchange);
                }).switchIfEmpty(Mono.fromRunnable(() -> {
                    log.warn("http server no handler for:[{}://{}{}]", new Object[]{serverWebSocket.scheme(), serverWebSocket.host(), serverWebSocket.path()});
                    serverWebSocket.reject(404);
                })).subscribe();
            }).requestHandler(httpServerRequest -> {
                httpServerRequest.exceptionHandler(th -> {
                    log.error(th.getMessage(), th);
                });
                VertxHttpExchange vertxHttpExchange = new VertxHttpExchange(httpServerRequest, this.config);
                String url = vertxHttpExchange.getUrl();
                if (url.endsWith("/")) {
                    url = url.substring(0, url.length() - 1);
                }
                this.route.findTopic("/" + vertxHttpExchange.request().getMethod().name().toLowerCase() + url).flatMapIterable((v0) -> {
                    return v0.getSubscribers();
                }).doOnNext(fluxSink -> {
                    fluxSink.next(vertxHttpExchange);
                }).switchIfEmpty(Mono.fromRunnable(() -> {
                    log.warn("http server no handler for:[{} {}://{}{}]", new Object[]{httpServerRequest.method(), httpServerRequest.scheme(), httpServerRequest.host(), httpServerRequest.path()});
                    httpServerRequest.response().setStatusCode(HttpStatus.NOT_FOUND.value()).end();
                })).subscribe();
            });
            httpServer.exceptionHandler(th -> {
                log.error(th.getMessage(), th);
            });
        }
    }

    @Override // org.jetlinks.community.network.http.server.HttpServer
    public Flux<HttpExchange> handleRequest() {
        return handleRequest("*", "/**");
    }

    @Override // org.jetlinks.community.network.http.server.HttpServer
    public Flux<WebSocketExchange> handleWebsocket(String str) {
        return createRoute(this.websocketRoute, "ws", str);
    }

    @Override // org.jetlinks.community.network.http.server.HttpServer
    public Flux<HttpExchange> handleRequest(String str, String... strArr) {
        return createRoute(this.route, str, strArr);
    }

    private <T> Flux<T> createRoute(Topic<FluxSink<T>> topic, String str, String... strArr) {
        return Flux.create(fluxSink -> {
            Disposable.Composite composite = Disposables.composite();
            for (String str2 : strArr) {
                String str3 = (String) Stream.of((Object[]) str2.split("/")).map(str4 -> {
                    return (str4.startsWith("{") && str4.endsWith("}")) ? "*" : str4;
                }).collect(Collectors.joining("/"));
                if (str3.endsWith("/")) {
                    str3 = str3.substring(0, str3.length() - 1);
                }
                if (!str3.startsWith("/")) {
                    str3 = "/".concat(str3);
                }
                String str5 = "/" + str + str3;
                log.debug("handle http request : {}", str5);
                Topic append = topic.append(str5);
                append.subscribe(new FluxSink[]{fluxSink});
                composite.add(() -> {
                    append.unsubscribe(new FluxSink[]{fluxSink});
                });
            }
            fluxSink.onDispose(composite);
        });
    }

    @Generated
    public String getId() {
        return this.id;
    }

    @Generated
    public NetworkType getType() {
        return DefaultNetworkType.HTTP_SERVER;
    }

    @Override // org.jetlinks.community.network.http.server.HttpServer
    public void shutdown() {
        if (this.httpServers != null) {
            for (io.vertx.core.http.HttpServer httpServer : this.httpServers) {
                httpServer.close(asyncResult -> {
                    if (asyncResult.failed()) {
                        log.error(asyncResult.cause().getMessage(), asyncResult.cause());
                    } else {
                        log.debug("http server [{}] closed", Integer.valueOf(httpServer.actualPort()));
                    }
                });
            }
            this.httpServers.clear();
            this.httpServers = null;
        }
    }

    public boolean isAlive() {
        return (this.httpServers == null || this.httpServers.isEmpty()) ? false : true;
    }

    public boolean isAutoReload() {
        return false;
    }

    public void setConfig(HttpServerConfig httpServerConfig) {
        this.config = httpServerConfig;
    }

    public void setId(String str) {
        this.id = str;
    }

    public VertxHttpServer(Collection<io.vertx.core.http.HttpServer> collection, HttpServerConfig httpServerConfig, String str, String str2, InetSocketAddress inetSocketAddress) {
        this.httpServers = collection;
        this.config = httpServerConfig;
        this.id = str;
        this.lastError = str2;
        this.bindAddress = inetSocketAddress;
    }

    public VertxHttpServer() {
    }

    public String getLastError() {
        return this.lastError;
    }

    public void setLastError(String str) {
        this.lastError = str;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setBindAddress(InetSocketAddress inetSocketAddress) {
        this.bindAddress = inetSocketAddress;
    }
}
