package org.jetlinks.community.network.http.device;

import io.netty.handler.codec.http.websocketx.WebSocketCloseStatus;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections4.CollectionUtils;
import org.hswebframework.web.logger.ReactiveLogger;
import org.jetlinks.community.gateway.AbstractDeviceGateway;
import org.jetlinks.community.gateway.DeviceGatewayHelper;
import org.jetlinks.community.network.http.server.HttpExchange;
import org.jetlinks.community.network.http.server.HttpServer;
import org.jetlinks.community.network.http.server.WebSocketExchange;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.device.AuthenticationResponse;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.device.DeviceThingType;
import org.jetlinks.core.device.WebsocketAuthenticationRequest;
import org.jetlinks.core.device.session.DeviceSessionManager;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.FromDeviceMessageContext;
import org.jetlinks.core.message.codec.http.websocket.WebSocketMessage;
import org.jetlinks.core.route.HttpRoute;
import org.jetlinks.core.route.WebsocketRoute;
import org.jetlinks.core.trace.MonoTracer;
import org.jetlinks.core.utils.TopicUtils;
import org.jetlinks.supports.server.DecodedClientMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/community/network/http/device/HttpServerDeviceGateway.class */
/**
 * Device gateway that receives device messages through an {@link HttpServer}, both as plain
 * HTTP requests and as WebSocket connections.
 *
 * <p>The routes to listen on are read from the current {@link ProtocolSupport}
 * ({@link #reload()}); one subscription per route is kept in {@link #handlers} (HTTP) and
 * {@link #websocketHandlers} (WebSocket) so that it can be disposed when the route disappears
 * or the gateway shuts down. Incoming payloads are decoded with the protocol's message codec
 * and handed to a {@link DeviceGatewayHelper}.
 */
public class HttpServerDeviceGateway extends AbstractDeviceGateway {
    private static final Logger log = LoggerFactory.getLogger(HttpServerDeviceGateway.class);

    final HttpServer httpServer;
    Mono<ProtocolSupport> protocol;
    private final DeviceRegistry registry;
    private final DeviceGatewayHelper helper;
    /** Active HTTP route subscriptions, keyed by method + path pattern. */
    private final Map<RouteKey, Disposable> handlers;
    /** Active WebSocket route subscriptions, keyed by path pattern. */
    private final Map<String, Disposable> websocketHandlers;

    /**
     * Map key identifying one registered HTTP route (method + path pattern).
     *
     * <p>Implements value-based {@code equals}/{@code hashCode}: keys are re-created on every
     * reload, so identity semantics would make every lookup in {@link #handlers} miss.
     */
    private static final class RouteKey {
        private final HttpMethod method;
        private final String url;

        private RouteKey(HttpMethod method, String url) {
            this.method = method;
            this.url = url;
        }

        public static RouteKey of(HttpMethod method, String url) {
            return new RouteKey(method, url);
        }

        @Override
        public boolean equals(Object other) {
            if (this == other) {
                return true;
            }
            if (!(other instanceof RouteKey)) {
                return false;
            }
            RouteKey that = (RouteKey) other;
            return Objects.equals(method, that.method) && Objects.equals(url, that.url);
        }

        @Override
        public int hashCode() {
            return Objects.hash(method, url);
        }
    }

    /**
     * Creates the gateway. No route is registered until {@link #reload()} (or startup) runs.
     *
     * @param id                   gateway id, passed to the superclass
     * @param httpServer           server on which route handlers are registered
     * @param protocol             source of the protocol used for authentication, routes and codecs
     * @param sessionManager       session manager handed to the {@link DeviceGatewayHelper}
     * @param registry             device registry used for authentication and decoding
     * @param clientMessageHandler handler handed to the {@link DeviceGatewayHelper}
     */
    public HttpServerDeviceGateway(String id,
                                   HttpServer httpServer,
                                   Mono<ProtocolSupport> protocol,
                                   DeviceSessionManager sessionManager,
                                   DeviceRegistry registry,
                                   DecodedClientMessageHandler clientMessageHandler) {
        super(id);
        this.handlers = new ConcurrentHashMap<>();
        this.websocketHandlers = new ConcurrentHashMap<>();
        this.httpServer = httpServer;
        this.registry = registry;
        this.helper = new DeviceGatewayHelper(registry, sessionManager, clientMessageHandler);
        this.protocol = protocol;
    }

    /**
     * Subscribes to HTTP requests matching {@code method} and {@code url}.
     *
     * @return the subscription; disposing it stops handling this route
     */
    private Disposable handleRequest(HttpMethod method, String url) {
        return httpServer
            .handleRequest(method, url)
            .flatMap(this::handleHttpRequest, Integer.MAX_VALUE)
            .subscribe();
    }

    /**
     * Subscribes to WebSocket connections matching {@code url}.
     *
     * @return the subscription; disposing it stops handling this route
     */
    private Disposable handleWebsocketRequest(String url) {
        return httpServer
            .handleWebsocket(url)
            .flatMap(this::handleWebsocketRequest, Integer.MAX_VALUE)
            .subscribe();
    }

    /**
     * Authenticates a new WebSocket connection and then processes every message it sends.
     *
     * <p>On failed authentication the exchange is closed with {@code UNAUTHORIZED}. When the
     * authentication result carries a device id, a {@link DeviceOnlineMessage} for that device
     * is handled first and the resulting operator is attached to all following messages;
     * otherwise messages are processed without a known operator.
     */
    private Mono<Void> handleWebsocketRequest(WebSocketExchange exchange) {
        return protocol.flatMap(support -> support
            .authenticate(WebsocketAuthenticationRequest.of(exchange), registry)
            // An error raised by the authenticator is turned into a failed (500) response.
            .onErrorResume(err -> Mono.just(AuthenticationResponse.error(500, err.getMessage())))
            .flatMap(response -> {
                if (!response.isSuccess()) {
                    log.warn("设备[{}] Websocket 认证失败:{}", exchange.getRemoteAddress().orElse(null), response.getMessage());
                    return exchange.close(HttpStatus.UNAUTHORIZED);
                }
                String deviceId = response.getDeviceId();
                if (!StringUtils.hasText(deviceId)) {
                    return receiveMessages(exchange, null);
                }
                DeviceOnlineMessage online = new DeviceOnlineMessage();
                online.setDeviceId(deviceId);
                return handleWebsocketMessage(online, exchange)
                    .flatMap(operator -> receiveMessages(exchange, operator));
            }));
    }

    /** Processes every message received on {@code exchange}; {@code device} may be {@code null}. */
    private Mono<Void> receiveMessages(WebSocketExchange exchange, DeviceOperator device) {
        return exchange
            .receive()
            .flatMap(message -> handleWebsocketRequest(exchange, message, device))
            .then();
    }

    /**
     * Decodes one WebSocket message with the protocol's WebSocket codec and dispatches the
     * resulting device messages.
     *
     * <p>If the gateway is not started the exchange is closed with {@code BAD_GATEWAY}. Decode
     * or dispatch failures close the exchange with {@code BAD_REQUEST}; any other failure
     * closes it with status 500.
     *
     * @param device operator known for this connection, or {@code null}
     */
    private Mono<Void> handleWebsocketRequest(WebSocketExchange exchange, WebSocketMessage message, DeviceOperator device) {
        if (!isStarted()) {
            return exchange
                .close(WebSocketCloseStatus.BAD_GATEWAY.code())
                .onErrorResume(err -> {
                    log.error(err.getMessage(), err);
                    return Mono.empty();
                });
        }
        WebSocketDeviceSession session = new WebSocketDeviceSession(device, exchange);
        Mono<Void> traced = protocol
            .flatMap(support -> {
                if (log.isDebugEnabled()) {
                    log.debug("收到HTTP请求\n{}", message);
                }
                return support
                    .getMessageCodec(DefaultTransport.WebSocket)
                    .flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(session, message, registry)))
                    .cast(DeviceMessage.class)
                    .flatMap(deviceMessage -> {
                        monitor.receivedMessage();
                        // Messages decoded without a device id inherit the session's device.
                        if (!StringUtils.hasText(deviceMessage.getDeviceId())) {
                            deviceMessage.thingId(DeviceThingType.device, session.getDeviceId());
                        }
                        return handleWebsocketMessage(deviceMessage, exchange);
                    })
                    .doOnNext(session::setOperator)
                    .onErrorResume(err -> {
                        log.error("处理http请求失败:\n{}", message, err);
                        return exchange.close(HttpStatus.BAD_REQUEST).then(Mono.empty());
                    })
                    .then();
            })
            .as(MonoTracer.create("http-device-gateway/" + getId() + exchange.getPath()));
        return traced.onErrorResume(err -> {
            log.error(err.getMessage(), err);
            return exchange.close(HttpStatus.INTERNAL_SERVER_ERROR.value()).then(Mono.empty());
        });
    }

    /**
     * Hands a device message received over WebSocket to the {@link DeviceGatewayHelper}.
     *
     * <p>A new session is created as a {@link WebSocketDeviceSession}; an existing session is
     * pointed at the current exchange. If the helper invokes the last callback, the exchange
     * is closed with {@code NOT_FOUND}.
     */
    private Mono<DeviceOperator> handleWebsocketMessage(DeviceMessage message, WebSocketExchange exchange) {
        // The lambdas below deliberately stay block-bodied (value-returning vs. void) so that
        // they select the same handleDeviceMessage overload as before.
        return helper.handleDeviceMessage(message, operator -> {
            return new WebSocketDeviceSession(operator, exchange);
        }, session -> {
            if (session.isWrapFrom(WebSocketDeviceSession.class)) {
                session.unwrap(WebSocketDeviceSession.class).setExchange(exchange);
            } else if (session.isWrapFrom(HttpDeviceSession.class)) {
                session.unwrap(HttpDeviceSession.class).setWebsocket(exchange);
            }
        }, () -> {
            return exchange.close(HttpStatus.NOT_FOUND).then(Mono.empty());
        });
    }

    /**
     * Decodes one HTTP request with the protocol's HTTP codec and dispatches the resulting
     * device messages.
     *
     * <p>If the gateway is not started the request is answered with {@code SERVICE_UNAVAILABLE}.
     * After all messages are handled, {@code exchange.ok()} is sent unless the exchange was
     * already closed. Failures are answered through {@link #response500Error}.
     */
    private Mono<Void> handleHttpRequest(HttpExchange exchange) {
        if (!isStarted()) {
            return exchange
                .error(HttpStatus.SERVICE_UNAVAILABLE)
                .onErrorResume(err -> {
                    log.error(err.getMessage(), err);
                    return Mono.empty();
                });
        }
        Mono<Void> traced = protocol
            .flatMap(support -> exchange
                .toExchangeMessage()
                .flatMap(httpMessage -> {
                    if (log.isDebugEnabled()) {
                        log.debug("收到HTTP请求\n{}", httpMessage);
                    }
                    InetSocketAddress clientAddress = exchange.request().getClientAddress();
                    UnknownHttpDeviceSession unknownSession = new UnknownHttpDeviceSession(exchange);
                    return support
                        .getMessageCodec(DefaultTransport.HTTP)
                        .flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(unknownSession, httpMessage, registry)))
                        .cast(DeviceMessage.class)
                        .flatMap(deviceMessage -> {
                            monitor.receivedMessage();
                            // Block-bodied lambdas keep the original overload selection.
                            return helper.handleDeviceMessage(deviceMessage, operator -> {
                                return new HttpDeviceSession(operator, clientAddress);
                            }, session -> {
                            }, () -> {
                                log.warn("无法从HTTP消息中获取设备信息:\n{}\n\n设备消息:{}", httpMessage, deviceMessage);
                                return exchange.error(HttpStatus.NOT_FOUND).then(Mono.empty());
                            });
                        })
                        .then(Mono.defer(() -> {
                            // Something earlier in the pipeline may already have answered.
                            if (exchange.isClosed()) {
                                return Mono.empty();
                            }
                            return exchange.ok();
                        }))
                        .onErrorResume(err -> {
                            log.error("处理http请求失败:\n{}", httpMessage, err);
                            return response500Error(exchange, err);
                        })
                        .then();
                }))
            .as(MonoTracer.create("http-device-gateway/" + getId() + exchange.request().getPath()));
        return traced.onErrorResume(err -> {
            log.error(err.getMessage(), err);
            return response500Error(exchange, err);
        });
    }

    /**
     * Converts a route address into the pattern used to register a handler: the address is
     * passed through {@link TopicUtils#convertToMqttTopic}, then {@code +} becomes {@code *}
     * and {@code #} becomes {@code **}.
     */
    private static String toPathPattern(String address) {
        return TopicUtils.convertToMqttTopic(address).replace("+", "*").replace("#", "**");
    }

    /**
     * Synchronizes the HTTP subscriptions with {@code routes}: registers handlers for new
     * method/path combinations, keeps existing ones, and disposes and forgets those that are
     * no longer listed.
     */
    private void doReloadRoute(List<HttpRoute> routes) {
        Map<RouteKey, Disposable> stale = new HashMap<>(handlers);
        for (HttpRoute route : routes) {
            for (HttpMethod method : route.getMethod()) {
                RouteKey key = RouteKey.of(method, toPathPattern(route.getAddress()));
                stale.remove(key);
                handlers.computeIfAbsent(key, k -> handleRequest(k.method, k.url));
            }
        }
        stale.forEach((key, registration) -> {
            // Drop the entry as well, otherwise a route that reappears later would find the
            // disposed registration and never be subscribed again.
            handlers.remove(key, registration);
            registration.dispose();
        });
    }

    /**
     * Synchronizes the WebSocket subscriptions with {@code routes}; same rules as
     * {@link #doReloadRoute(List)}.
     */
    private void doReloadRouteWebsocket(List<WebsocketRoute> routes) {
        Map<String, Disposable> stale = new HashMap<>(websocketHandlers);
        for (WebsocketRoute route : routes) {
            String pattern = toPathPattern(route.getAddress());
            stale.remove(pattern);
            websocketHandlers.computeIfAbsent(pattern, this::handleWebsocketRequest);
        }
        stale.forEach((pattern, registration) -> {
            websocketHandlers.remove(pattern, registration);
            registration.dispose();
        });
    }

    /**
     * Reloads the HTTP routes declared by the current protocol. Logs a warning when the
     * protocol declares none.
     */
    final Mono<Void> reloadHttp() {
        return protocol
            .flatMap(support -> support
                .getRoutes(DefaultTransport.HTTP)
                .ofType(HttpRoute.class)
                .collectList()
                .doOnEach(ReactiveLogger.onNext(routes -> {
                    if (CollectionUtils.isEmpty(routes)) {
                        log.warn("The protocol [{}] is not configured with url information", support.getId());
                    }
                }))
                .doOnNext(this::doReloadRoute))
            .then();
    }

    /** Reloads the WebSocket routes declared by the current protocol. */
    final Mono<Void> reloadWebsocket() {
        return protocol
            .flatMap(support -> support
                .getRoutes(DefaultTransport.WebSocket)
                .ofType(WebsocketRoute.class)
                .collectList()
                .doOnNext(this::doReloadRouteWebsocket))
            .then();
    }

    /** Reloads HTTP routes, then WebSocket routes. */
    public final Mono<Void> reload() {
        return reloadHttp().then(reloadWebsocket());
    }

    /** Answers {@code exchange} with {@code INTERNAL_SERVER_ERROR} and the given cause. */
    private Mono<Void> response500Error(HttpExchange exchange, Throwable cause) {
        return exchange.error(HttpStatus.INTERNAL_SERVER_ERROR, cause);
    }

    /** Startup step: registers handlers for all routes of the current protocol. */
    protected Mono<Void> doStartup() {
        return reload();
    }

    /**
     * Shutdown step: disposes every route subscription, HTTP and WebSocket alike, and clears
     * both registries so that a later startup registers them afresh.
     */
    protected Mono<Void> doShutdown() {
        disposeAll(handlers);
        disposeAll(websocketHandlers);
        return Mono.empty();
    }

    /** Disposes all subscriptions held in {@code registrations} and empties the map. */
    private static void disposeAll(Map<?, Disposable> registrations) {
        registrations.values().forEach(Disposable::dispose);
        registrations.clear();
    }

    /**
     * Replaces the protocol source used by subsequent requests and reloads. This method only
     * stores the reference; it does not reload routes itself.
     */
    public void setProtocol(Mono<ProtocolSupport> protocol) {
        this.protocol = protocol;
    }
}
