package org.jetlinks.community.network.mqtt.gateway.device;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections4.CollectionUtils;
import org.hswebframework.web.logger.ReactiveLogger;
import org.jetlinks.community.gateway.AbstractDeviceGateway;
import org.jetlinks.community.gateway.DeviceGatewayHelper;
import org.jetlinks.community.network.mqtt.client.MqttClient;
import org.jetlinks.community.network.mqtt.gateway.device.session.MqttClientSession;
import org.jetlinks.community.network.mqtt.gateway.device.session.UnknownDeviceMqttClientSession;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.device.session.DeviceSessionManager;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.DeviceMessageCodec;
import org.jetlinks.core.message.codec.FromDeviceMessageContext;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.route.MqttRoute;
import org.jetlinks.core.utils.TopicUtils;
import org.jetlinks.supports.server.DecodedClientMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

/* loaded from: input_file:org/jetlinks/community/network/mqtt/gateway/device/MqttClientDeviceGateway.class */
public class MqttClientDeviceGateway extends AbstractDeviceGateway {
    private static final Logger log = LoggerFactory.getLogger(MqttClientDeviceGateway.class);
    final MqttClient mqttClient;
    private final DeviceRegistry registry;
    private Mono<ProtocolSupport> protocol;
    private Mono<DeviceMessageCodec> codecMono;
    private final DeviceGatewayHelper helper;
    private final Map<RouteKey, Tuple2<Integer, Disposable>> routes;

    /* loaded from: input_file:org/jetlinks/community/network/mqtt/gateway/device/MqttClientDeviceGateway$RouteKey.class */
    private static class RouteKey {
        private String topic;
        private int qos;

        private RouteKey(String str, int i) {
            this.topic = str;
            this.qos = i;
        }

        public static RouteKey of(String str, int i) {
            return new RouteKey(str, i);
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof RouteKey)) {
                return false;
            }
            RouteKey routeKey = (RouteKey) obj;
            if (!routeKey.canEqual(this)) {
                return false;
            }
            String str = this.topic;
            String str2 = routeKey.topic;
            return str == null ? str2 == null : str.equals(str2);
        }

        protected boolean canEqual(Object obj) {
            return obj instanceof RouteKey;
        }

        public int hashCode() {
            String str = this.topic;
            return (1 * 59) + (str == null ? 43 : str.hashCode());
        }
    }

    public MqttClientDeviceGateway(String str, MqttClient mqttClient, DeviceRegistry deviceRegistry, Mono<ProtocolSupport> mono, DeviceSessionManager deviceSessionManager, DecodedClientMessageHandler decodedClientMessageHandler) {
        super(str);
        this.routes = new ConcurrentHashMap();
        this.mqttClient = (MqttClient) Objects.requireNonNull(mqttClient, "mqttClient");
        this.registry = (DeviceRegistry) Objects.requireNonNull(deviceRegistry, "registry");
        setProtocol(mono);
        this.helper = new DeviceGatewayHelper(deviceRegistry, deviceSessionManager, decodedClientMessageHandler);
    }

    protected Mono<ProtocolSupport> getProtocol() {
        return this.protocol;
    }

    public void setProtocol(Mono<ProtocolSupport> mono) {
        this.protocol = (Mono) Objects.requireNonNull(mono, "protocol");
        this.codecMono = mono.flatMap(protocolSupport -> {
            return protocolSupport.getMessageCodec(getTransport());
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Mono<Void> reload() {
        return getProtocol().flatMap(protocolSupport -> {
            Flux routes = protocolSupport.getRoutes(DefaultTransport.MQTT);
            Class<MqttRoute> cls = MqttRoute.class;
            MqttRoute.class.getClass();
            return routes.filter((v1) -> {
                return r1.isInstance(v1);
            }).cast(MqttRoute.class).collectList().doOnEach(ReactiveLogger.onNext(list -> {
                if (CollectionUtils.isEmpty(list)) {
                    log.warn("The protocol [{}] is not configured with topics information", protocolSupport.getId());
                }
            })).doOnNext(this::doReloadRoute);
        }).then();
    }

    protected void doReloadRoute(List<MqttRoute> list) {
        HashMap hashMap = new HashMap(this.routes);
        for (MqttRoute mqttRoute : list) {
            if (mqttRoute.isUpstream()) {
                RouteKey of = RouteKey.of(convertToMqttTopic(mqttRoute.getTopic()), mqttRoute.getQos());
                hashMap.remove(of);
                this.routes.compute(of, (routeKey, tuple2) -> {
                    if (tuple2 != null) {
                        if (((Integer) tuple2.getT1()).equals(Integer.valueOf(routeKey.qos))) {
                            return tuple2;
                        }
                        ((Disposable) tuple2.getT2()).dispose();
                    }
                    return Tuples.of(Integer.valueOf(routeKey.qos), doSubscribe(routeKey.topic, routeKey.qos));
                });
            }
        }
        for (Map.Entry entry : hashMap.entrySet()) {
            this.routes.remove(entry.getKey());
            ((Disposable) ((Tuple2) entry.getValue()).getT2()).dispose();
        }
    }

    protected static String convertToMqttTopic(String str) {
        return TopicUtils.convertToMqttTopic(str);
    }

    public Transport getTransport() {
        return DefaultTransport.MQTT;
    }

    protected Disposable doSubscribe(String str, int i) {
        return this.mqttClient.subscribe(Collections.singletonList(str), i).filter(mqttMessage -> {
            return isStarted();
        }).flatMap(mqttMessage2 -> {
            return this.codecMono.flatMapMany(deviceMessageCodec -> {
                return deviceMessageCodec.decode(FromDeviceMessageContext.of(new UnknownDeviceMqttClientSession(getId(), this.mqttClient, this.monitor), mqttMessage2, this.registry));
            }).flatMap(message -> {
                this.monitor.receivedMessage();
                return this.helper.handleDeviceMessage((DeviceMessage) message, deviceOperator -> {
                    return createDeviceSession(deviceOperator, this.mqttClient);
                }, deviceSession -> {
                }, () -> {
                    log.warn("can not get device info from message:{},{}", mqttMessage2.print(), message);
                });
            }).subscribeOn(Schedulers.parallel()).onErrorResume(th -> {
                log.error("handle mqtt client message error:{}", mqttMessage2, th);
                return Mono.empty();
            });
        }, Integer.MAX_VALUE).contextWrite(ReactiveLogger.start("gatewayId", getId())).subscribe();
    }

    private MqttClientSession createDeviceSession(DeviceOperator deviceOperator, MqttClient mqttClient) {
        return new MqttClientSession(deviceOperator.getDeviceId(), deviceOperator, mqttClient, this.monitor);
    }

    protected Mono<Void> doShutdown() {
        Iterator<Tuple2<Integer, Disposable>> it = this.routes.values().iterator();
        while (it.hasNext()) {
            ((Disposable) it.next().getT2()).dispose();
        }
        this.routes.clear();
        return Mono.empty();
    }

    protected Mono<Void> doStartup() {
        return reload();
    }
}
