/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.community.network.http.device;

import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.apache.commons.collections4.CollectionUtils;
import org.hswebframework.web.logger.ReactiveLogger;
import org.jetlinks.community.gateway.AbstractDeviceGateway;
import org.jetlinks.community.gateway.DeviceGatewayHelper;
import org.jetlinks.community.network.http.device.HttpDeviceSession;
import org.jetlinks.community.network.http.device.UnknownHttpDeviceSession;
import org.jetlinks.community.network.http.device.WebSocketDeviceSession;
import org.jetlinks.community.network.http.server.HttpExchange;
import org.jetlinks.community.network.http.server.HttpServer;
import org.jetlinks.community.network.http.server.WebSocketExchange;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.device.AuthenticationRequest;
import org.jetlinks.core.device.AuthenticationResponse;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.device.DeviceThingType;
import org.jetlinks.core.device.WebsocketAuthenticationRequest;
import org.jetlinks.core.device.session.DeviceSessionManager;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.FromDeviceMessageContext;
import org.jetlinks.core.message.codec.MessageDecodeContext;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.message.codec.http.websocket.WebSocketMessage;
import org.jetlinks.core.message.codec.http.websocket.WebSocketSession;
import org.jetlinks.core.route.HttpRoute;
import org.jetlinks.core.route.WebsocketRoute;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.things.ThingType;
import org.jetlinks.core.trace.MonoTracer;
import org.jetlinks.core.utils.TopicUtils;
import org.jetlinks.supports.server.DecodedClientMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;

public class HttpServerDeviceGateway
extends AbstractDeviceGateway {
    private static final Logger log = LoggerFactory.getLogger(HttpServerDeviceGateway.class);
    final HttpServer httpServer;
    Mono<ProtocolSupport> protocol;
    private final DeviceRegistry registry;
    private final DeviceGatewayHelper helper;
    private final Map<RouteKey, Disposable> handlers = new ConcurrentHashMap<RouteKey, Disposable>();
    private final Map<String, Disposable> websocketHandlers = new ConcurrentHashMap<String, Disposable>();

    public HttpServerDeviceGateway(String id, HttpServer server, Mono<ProtocolSupport> protocol, DeviceSessionManager sessionManager, DeviceRegistry registry, DecodedClientMessageHandler messageHandler) {
        super(id);
        this.httpServer = server;
        this.registry = registry;
        this.helper = new DeviceGatewayHelper(registry, sessionManager, messageHandler);
        this.protocol = protocol;
    }

    private Disposable handleRequest(HttpMethod method, String url) {
        return this.httpServer.handleRequest(method, url).flatMap(this::handleHttpRequest, Integer.MAX_VALUE).subscribe();
    }

    private Disposable handleWebsocketRequest(String url) {
        return this.httpServer.handleWebsocket(url).flatMap(this::handleWebsocketRequest, Integer.MAX_VALUE).subscribe();
    }

    private Mono<Void> handleWebsocketRequest(WebSocketExchange exchange) {
        return this.protocol.flatMap(protocol -> protocol.authenticate((AuthenticationRequest)WebsocketAuthenticationRequest.of((WebSocketSession)exchange), this.registry).onErrorResume(err -> Mono.just((Object)AuthenticationResponse.error((int)500, (String)err.getMessage()))).flatMap(result -> {
            if (result.isSuccess()) {
                String deviceId = result.getDeviceId();
                if (StringUtils.hasText((String)deviceId)) {
                    DeviceOnlineMessage message = new DeviceOnlineMessage();
                    message.setDeviceId(deviceId);
                    return this.handleWebsocketMessage((DeviceMessage)message, exchange).flatMap(device -> exchange.receive().flatMap(msg -> this.handleWebsocketRequest(exchange, (WebSocketMessage)msg, (DeviceOperator)device)).then());
                }
                return exchange.receive().flatMap(msg -> this.handleWebsocketRequest(exchange, (WebSocketMessage)msg, null)).then();
            }
            log.warn("\u8bbe\u5907[{}] Websocket \u8ba4\u8bc1\u5931\u8d25:{}", exchange.getRemoteAddress().orElse(null), (Object)result.getMessage());
            return exchange.close(HttpStatus.UNAUTHORIZED);
        }));
    }

    private Mono<Void> handleWebsocketRequest(WebSocketExchange exchange, WebSocketMessage msg, DeviceOperator device) {
        if (!this.isStarted()) {
            return exchange.close(WebSocketCloseStatus.BAD_GATEWAY.code()).onErrorResume(error -> {
                log.error(error.getMessage(), error);
                return Mono.empty();
            });
        }
        WebSocketDeviceSession session = new WebSocketDeviceSession(device, exchange);
        return ((Mono)this.protocol.flatMap(protocol -> {
            if (log.isDebugEnabled()) {
                log.debug("\u6536\u5230HTTP\u8bf7\u6c42\n{}", (Object)msg);
            }
            return protocol.getMessageCodec((Transport)DefaultTransport.WebSocket).flatMapMany(codec -> codec.decode((MessageDecodeContext)FromDeviceMessageContext.of((DeviceSession)session, (EncodedMessage)msg, (DeviceRegistry)this.registry))).cast(DeviceMessage.class).flatMap(deviceMessage -> {
                this.monitor.receivedMessage();
                if (!StringUtils.hasText((String)deviceMessage.getDeviceId())) {
                    deviceMessage.thingId((ThingType)DeviceThingType.device, session.getDeviceId());
                }
                return this.handleWebsocketMessage((DeviceMessage)deviceMessage, exchange);
            }).doOnNext(session::setOperator).onErrorResume(err -> {
                log.error("\u5904\u7406http\u8bf7\u6c42\u5931\u8d25:\n{}", (Object)msg, err);
                return exchange.close(HttpStatus.BAD_REQUEST).then(Mono.empty());
            }).then();
        }).as((Function)MonoTracer.create((String)("http-device-gateway/" + this.getId() + exchange.getPath())))).onErrorResume(error -> {
            log.error(error.getMessage(), error);
            return exchange.close(HttpStatus.INTERNAL_SERVER_ERROR.value()).then(Mono.empty());
        });
    }

    private Mono<DeviceOperator> handleWebsocketMessage(DeviceMessage message, WebSocketExchange exchange) {
        return this.helper.handleDeviceMessage(message, device -> new WebSocketDeviceSession((DeviceOperator)device, exchange), deviceSession -> {
            if (deviceSession.isWrapFrom(WebSocketDeviceSession.class)) {
                ((WebSocketDeviceSession)deviceSession.unwrap(WebSocketDeviceSession.class)).setExchange(exchange);
            } else if (deviceSession.isWrapFrom(HttpDeviceSession.class)) {
                ((HttpDeviceSession)deviceSession.unwrap(HttpDeviceSession.class)).setWebsocket(exchange);
            }
        }, () -> exchange.close(HttpStatus.NOT_FOUND).then(Mono.empty()));
    }

    private Mono<Void> handleHttpRequest(HttpExchange exchange) {
        if (!this.isStarted()) {
            return exchange.error(HttpStatus.SERVICE_UNAVAILABLE).onErrorResume(error -> {
                log.error(error.getMessage(), error);
                return Mono.empty();
            });
        }
        return ((Mono)this.protocol.flatMap(protocol -> exchange.toExchangeMessage().flatMap(httpMessage -> {
            if (log.isDebugEnabled()) {
                log.debug("\u6536\u5230HTTP\u8bf7\u6c42\n{}", httpMessage);
            }
            InetSocketAddress address = exchange.request().getClientAddress();
            UnknownHttpDeviceSession session = new UnknownHttpDeviceSession(exchange);
            return protocol.getMessageCodec((Transport)DefaultTransport.HTTP).flatMapMany(codec -> codec.decode((MessageDecodeContext)FromDeviceMessageContext.of((DeviceSession)session, (EncodedMessage)httpMessage, (DeviceRegistry)this.registry))).cast(DeviceMessage.class).flatMap(deviceMessage -> {
                this.monitor.receivedMessage();
                return this.helper.handleDeviceMessage(deviceMessage, device -> new HttpDeviceSession((DeviceOperator)device, address), ignore -> {}, () -> {
                    log.warn("\u65e0\u6cd5\u4eceHTTP\u6d88\u606f\u4e2d\u83b7\u53d6\u8bbe\u5907\u4fe1\u606f:\n{}\n\n\u8bbe\u5907\u6d88\u606f:{}", httpMessage, deviceMessage);
                    return exchange.error(HttpStatus.NOT_FOUND).then(Mono.empty());
                });
            }).then(Mono.defer(() -> {
                if (!exchange.isClosed()) {
                    return exchange.ok();
                }
                return Mono.empty();
            })).onErrorResume(err -> {
                log.error("\u5904\u7406http\u8bf7\u6c42\u5931\u8d25:\n{}", httpMessage, err);
                return this.response500Error(exchange, (Throwable)err);
            }).then();
        })).as((Function)MonoTracer.create((String)("http-device-gateway/" + this.getId() + exchange.request().getPath())))).onErrorResume(error -> {
            log.error(error.getMessage(), error);
            return this.response500Error(exchange, (Throwable)error);
        });
    }

    private void doReloadRoute(List<HttpRoute> routes) {
        HashMap<RouteKey, Disposable> readyToRemove = new HashMap<RouteKey, Disposable>(this.handlers);
        for (HttpRoute route : routes) {
            for (HttpMethod httpMethod : route.getMethod()) {
                String addr = TopicUtils.convertToMqttTopic((String)route.getAddress()).replace("+", "*").replace("#", "**");
                RouteKey key = RouteKey.of(httpMethod, addr);
                readyToRemove.remove(key);
                this.handlers.computeIfAbsent(key, _key -> this.handleRequest(((RouteKey)_key).method, ((RouteKey)_key).url));
            }
        }
        for (Disposable value : readyToRemove.values()) {
            value.dispose();
        }
    }

    private void doReloadRouteWebsocket(List<WebsocketRoute> routes) {
        HashMap<String, Disposable> readyToRemove = new HashMap<String, Disposable>(this.websocketHandlers);
        for (WebsocketRoute route : routes) {
            String addr = TopicUtils.convertToMqttTopic((String)route.getAddress()).replace("+", "*").replace("#", "**");
            readyToRemove.remove(addr);
            this.websocketHandlers.computeIfAbsent(addr, this::handleWebsocketRequest);
        }
        for (Disposable value : readyToRemove.values()) {
            value.dispose();
        }
    }

    final Mono<Void> reloadHttp() {
        return this.protocol.flatMap(support -> support.getRoutes((Transport)DefaultTransport.HTTP).filter(HttpRoute.class::isInstance).cast(HttpRoute.class).collectList().doOnEach(ReactiveLogger.onNext(routes -> {
            if (CollectionUtils.isEmpty((Collection)routes)) {
                log.warn("The protocol [{}] is not configured with url information", (Object)support.getId());
            }
        })).doOnNext(this::doReloadRoute)).then();
    }

    final Mono<Void> reloadWebsocket() {
        return this.protocol.flatMap(support -> support.getRoutes((Transport)DefaultTransport.WebSocket).filter(WebsocketRoute.class::isInstance).cast(WebsocketRoute.class).collectList().doOnNext(this::doReloadRouteWebsocket)).then();
    }

    final Mono<Void> reload() {
        return this.reloadHttp().then(this.reloadWebsocket());
    }

    private Mono<Void> response500Error(HttpExchange exchange, Throwable err) {
        return exchange.error(HttpStatus.INTERNAL_SERVER_ERROR, err);
    }

    protected Mono<Void> doStartup() {
        return this.reload();
    }

    protected Mono<Void> doShutdown() {
        for (Disposable value : this.handlers.values()) {
            value.dispose();
        }
        this.handlers.clear();
        return Mono.empty();
    }

    public void setProtocol(Mono<ProtocolSupport> protocol) {
        this.protocol = protocol;
    }

    private static class RouteKey {
        private HttpMethod method;
        private String url;

        private RouteKey(HttpMethod method, String url) {
            this.method = method;
            this.url = url;
        }

        public static RouteKey of(HttpMethod method, String url) {
            return new RouteKey(method, url);
        }
    }
}

