/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.community.network.mqtt.gateway.device;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections4.CollectionUtils;
import org.hswebframework.web.logger.ReactiveLogger;
import org.jetlinks.community.gateway.AbstractDeviceGateway;
import org.jetlinks.community.gateway.DeviceGatewayHelper;
import org.jetlinks.community.network.mqtt.client.MqttClient;
import org.jetlinks.community.network.mqtt.gateway.device.session.MqttClientSession;
import org.jetlinks.community.network.mqtt.gateway.device.session.UnknownDeviceMqttClientSession;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.device.session.DeviceSessionManager;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.DeviceMessageCodec;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.FromDeviceMessageContext;
import org.jetlinks.core.message.codec.MessageDecodeContext;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.route.MqttRoute;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.utils.TopicUtils;
import org.jetlinks.supports.server.DecodedClientMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

/**
 * Device gateway that receives device messages through an {@link MqttClient}.
 *
 * <p>The gateway asks the configured {@link ProtocolSupport} for its MQTT routes,
 * subscribes to the topic of every upstream route, decodes each received MQTT
 * message with the protocol's {@link DeviceMessageCodec} and passes the decoded
 * messages to a {@link DeviceGatewayHelper}.
 */
public class MqttClientDeviceGateway extends AbstractDeviceGateway {
    private static final Logger log = LoggerFactory.getLogger(MqttClientDeviceGateway.class);

    /** Client used to subscribe to topics and to build device sessions. */
    final MqttClient mqttClient;
    private final DeviceRegistry registry;
    private Mono<ProtocolSupport> protocol;
    /** Codec lookup derived from {@link #protocol}; replaced by {@link #setProtocol(Mono)}. */
    private Mono<DeviceMessageCodec> codecMono;
    private final DeviceGatewayHelper helper;
    /**
     * Active subscriptions. The key is compared by topic only; the value holds
     * the qos the subscription was created with and the handle used to cancel it.
     */
    private final Map<RouteKey, Tuple2<Integer, Disposable>> routes = new ConcurrentHashMap<>();

    /**
     * Creates the gateway.
     *
     * @param id                   gateway id, passed to the superclass
     * @param mqttClient           client used for all subscriptions
     * @param registry             device registry handed to the decode context and the helper
     * @param protocol             source of the protocol whose routes and codec are used
     * @param sessionManager       passed to the {@link DeviceGatewayHelper}
     * @param clientMessageHandler passed to the {@link DeviceGatewayHelper}
     * @throws NullPointerException if {@code mqttClient}, {@code registry} or {@code protocol} is null
     */
    public MqttClientDeviceGateway(String id, MqttClient mqttClient, DeviceRegistry registry, Mono<ProtocolSupport> protocol, DeviceSessionManager sessionManager, DecodedClientMessageHandler clientMessageHandler) {
        super(id);
        this.mqttClient = Objects.requireNonNull(mqttClient, "mqttClient");
        this.registry = Objects.requireNonNull(registry, "registry");
        this.setProtocol(protocol);
        this.helper = new DeviceGatewayHelper(registry, sessionManager, clientMessageHandler);
    }

    /**
     * @return the protocol source most recently set via {@link #setProtocol(Mono)}
     */
    protected Mono<ProtocolSupport> getProtocol() {
        return this.protocol;
    }

    /**
     * Replaces the protocol source and rebuilds the codec lookup for
     * {@link #getTransport()}. Existing subscriptions are not touched here.
     *
     * @param protocol new protocol source
     * @throws NullPointerException if {@code protocol} is null
     */
    public void setProtocol(Mono<ProtocolSupport> protocol) {
        this.protocol = Objects.requireNonNull(protocol, "protocol");
        this.codecMono = protocol.flatMap(p -> p.getMessageCodec(this.getTransport()));
    }

    /**
     * Reads the MQTT routes of the current protocol and reconciles the active
     * subscriptions with them via {@link #doReloadRoute(List)}. A warning is
     * logged when the protocol yields no {@link MqttRoute} at all.
     *
     * @return a Mono that completes once the routes have been applied
     */
    protected Mono<Void> reload() {
        return this.getProtocol()
            .flatMap(support -> support
                .getRoutes(DefaultTransport.MQTT)
                .filter(MqttRoute.class::isInstance)
                .cast(MqttRoute.class)
                .collectList()
                .doOnEach(ReactiveLogger.onNext(mqttRoutes -> {
                    if (CollectionUtils.isEmpty(mqttRoutes)) {
                        log.warn("The protocol [{}] is not configured with topics information", support.getId());
                    }
                }))
                .doOnNext(this::doReloadRoute))
            .then();
    }

    /**
     * Reconciles {@link #routes} with the given route list: upstream routes are
     * subscribed (or re-subscribed when their qos changed), and subscriptions
     * whose topic is no longer listed are disposed and removed. Non-upstream
     * routes are ignored.
     *
     * @param routes routes declared by the protocol
     */
    protected void doReloadRoute(List<MqttRoute> routes) {
        // Start from everything currently subscribed; whatever is still in this
        // map after the loop was not mentioned by the new route list.
        Map<RouteKey, Tuple2<Integer, Disposable>> readyToRemove = new HashMap<>(this.routes);
        for (MqttRoute mqttRoute : routes) {
            if (!mqttRoute.isUpstream()) {
                continue;
            }
            String topic = MqttClientDeviceGateway.convertToMqttTopic(mqttRoute.getTopic());
            int qos = mqttRoute.getQos();
            RouteKey key = RouteKey.of(topic, qos);
            readyToRemove.remove(key);
            this.routes.compute(key, (ignored, existing) -> {
                if (existing != null) {
                    // Key equality ignores qos, so the qos actually in use is the
                    // one stored in the tuple; keep the subscription if it matches.
                    if (existing.getT1().equals(qos)) {
                        return existing;
                    }
                    existing.getT2().dispose();
                }
                return Tuples.of(qos, this.doSubscribe(topic, qos));
            });
        }
        for (Map.Entry<RouteKey, Tuple2<Integer, Disposable>> entry : readyToRemove.entrySet()) {
            this.routes.remove(entry.getKey());
            entry.getValue().getT2().dispose();
        }
    }

    /**
     * Converts a route topic to an MQTT topic by delegating to
     * {@link TopicUtils#convertToMqttTopic(String)}.
     *
     * @param topic topic as declared by the route
     * @return the converted topic
     */
    protected static String convertToMqttTopic(String topic) {
        return TopicUtils.convertToMqttTopic(topic);
    }

    /**
     * @return {@link DefaultTransport#MQTT}
     */
    public Transport getTransport() {
        return DefaultTransport.MQTT;
    }

    /**
     * Subscribes to one topic and starts processing its messages.
     *
     * <p>Messages received while {@link #isStarted()} is false are dropped. Every
     * other message is decoded with the current codec using an
     * {@link UnknownDeviceMqttClientSession}, counted on the monitor and handed
     * to the {@link DeviceGatewayHelper}. An error raised while processing a
     * message is logged and replaced with an empty result.
     *
     * @param topic MQTT topic to subscribe to
     * @param qos   qos passed to the client's subscribe call
     * @return handle that cancels the subscription when disposed
     */
    protected Disposable doSubscribe(String topic, int qos) {
        return this.mqttClient
            .subscribe(Collections.singletonList(topic), qos)
            .filter(msg -> this.isStarted())
            .flatMap(mqttMessage -> this.codecMono
                .flatMapMany(codec -> codec.decode(FromDeviceMessageContext.of(
                    new UnknownDeviceMqttClientSession(this.getId(), this.mqttClient, this.monitor),
                    mqttMessage,
                    this.registry)))
                .flatMap(message -> {
                    this.monitor.receivedMessage();
                    return this.helper.handleDeviceMessage(
                        (DeviceMessage) message,
                        device -> this.createDeviceSession(device, this.mqttClient),
                        ignore -> {},
                        () -> log.warn("can not get device info from message:{},{}", mqttMessage.print(), message));
                })
                // The per-message pipeline is subscribed on the parallel scheduler.
                .subscribeOn(Schedulers.parallel())
                .onErrorResume(err -> {
                    log.error("handle mqtt client message error:{}", mqttMessage, err);
                    return Mono.empty();
                }), Integer.MAX_VALUE)
            .contextWrite(ReactiveLogger.start("gatewayId", this.getId()))
            .subscribe();
    }

    /**
     * Builds the session used for a device resolved from a decoded message.
     *
     * @param device resolved device
     * @param client MQTT client the session is bound to
     * @return a new session identified by the device id
     */
    private MqttClientSession createDeviceSession(DeviceOperator device, MqttClient client) {
        return new MqttClientSession(device.getDeviceId(), device, client, this.monitor);
    }

    /**
     * Disposes every active subscription and empties {@link #routes}.
     *
     * @return an empty Mono
     */
    protected Mono<Void> doShutdown() {
        for (Tuple2<Integer, Disposable> value : this.routes.values()) {
            value.getT2().dispose();
        }
        this.routes.clear();
        return Mono.empty();
    }

    /**
     * Starts the gateway by loading the protocol routes.
     *
     * @return the result of {@link #reload()}
     */
    protected Mono<Void> doStartup() {
        return this.reload();
    }

    /**
     * Map key for a subscription. Equality and hash code use the topic only, so
     * two keys with the same topic but different qos address the same entry.
     */
    private static class RouteKey {
        private final String topic;
        private final int qos;

        private RouteKey(String topic, int qos) {
            this.topic = topic;
            this.qos = qos;
        }

        public static RouteKey of(String topic, int qos) {
            return new RouteKey(topic, qos);
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof RouteKey)) {
                return false;
            }
            // qos is deliberately left out of the comparison.
            return Objects.equals(this.topic, ((RouteKey) o).topic);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(this.topic);
        }
    }
}

