/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.community.network.mqtt.gateway.device;

import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.StatusCode;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import org.jetlinks.community.gateway.AbstractDeviceGateway;
import org.jetlinks.community.gateway.DeviceGatewayHelper;
import org.jetlinks.community.network.mqtt.gateway.device.session.MqttConnectionSession;
import org.jetlinks.community.network.mqtt.server.MqttConnection;
import org.jetlinks.community.network.mqtt.server.MqttPublishing;
import org.jetlinks.community.network.mqtt.server.MqttServer;
import org.jetlinks.community.utils.ObjectMappers;
import org.jetlinks.community.utils.SystemUtils;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.device.AuthenticationRequest;
import org.jetlinks.core.device.AuthenticationResponse;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.device.DeviceThingType;
import org.jetlinks.core.device.MqttAuthenticationRequest;
import org.jetlinks.core.device.session.DeviceSessionManager;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.FromDeviceMessageContext;
import org.jetlinks.core.message.codec.MessageDecodeContext;
import org.jetlinks.core.message.codec.MqttMessage;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.core.server.session.KeepOnlineSession;
import org.jetlinks.core.things.ThingType;
import org.jetlinks.core.trace.DeviceTracer;
import org.jetlinks.core.trace.FluxTracer;
import org.jetlinks.core.trace.MonoTracer;
import org.jetlinks.supports.server.DecodedClientMessageHandler;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;

class MqttServerDeviceGateway
extends AbstractDeviceGateway {
    private static final Logger log = LoggerFactory.getLogger(MqttServerDeviceGateway.class);
    static AttributeKey<String> clientId = AttributeKey.stringKey((String)"clientId");
    static AttributeKey<String> username = AttributeKey.stringKey((String)"username");
    static AttributeKey<String> password = AttributeKey.stringKey((String)"password");
    private final DeviceRegistry registry;
    private final DeviceSessionManager sessionManager;
    private final MqttServer mqttServer;
    private final DecodedClientMessageHandler messageHandler;
    private final LongAdder counter = new LongAdder();
    private final Mono<ProtocolSupport> supportMono;
    private Disposable disposable;
    private final DeviceGatewayHelper helper;

    public MqttServerDeviceGateway(String id, DeviceRegistry registry, DeviceSessionManager sessionManager, MqttServer mqttServer, DecodedClientMessageHandler messageHandler, Mono<ProtocolSupport> customProtocol) {
        super(id);
        this.registry = registry;
        this.sessionManager = sessionManager;
        this.mqttServer = mqttServer;
        this.messageHandler = messageHandler;
        this.supportMono = customProtocol;
        this.helper = new DeviceGatewayHelper(registry, sessionManager, messageHandler);
    }

    private void doStart() {
        if (this.disposable != null) {
            this.disposable.dispose();
        }
        this.disposable = this.mqttServer.handleConnection("device-gateway").filter(conn -> {
            if (!this.isStarted()) {
                conn.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
                this.monitor.rejected();
            }
            return true;
        }).flatMap(connection -> (Mono)this.handleConnection((MqttConnection)connection).flatMap(tuple3 -> this.handleAuthResponse((DeviceOperator)tuple3.getT1(), (AuthenticationResponse)tuple3.getT2(), (MqttConnection)tuple3.getT3())).flatMap(tp -> this.handleAcceptedMqttConnection((MqttConnection)tp.getT1(), (DeviceOperator)tp.getT2(), (MqttConnectionSession)tp.getT3())).onErrorResume(err -> {
            log.error(err.getMessage(), err);
            return Mono.empty();
        }).as((Function)MonoTracer.create((String)DeviceTracer.SpanName.connection((String)connection.getClientId()), builder -> {
            builder.setAttribute(clientId, (Object)connection.getClientId());
            builder.setAttribute(DeviceTracer.SpanKey.address, (Object)connection.getClientAddress().toString());
        })), Integer.MAX_VALUE).subscribe();
    }

    private Mono<Tuple3<DeviceOperator, AuthenticationResponse, MqttConnection>> handleConnection(MqttConnection connection) {
        if (SystemUtils.memoryIsOutOfWatermark()) {
            connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
            return Mono.empty();
        }
        return ((Mono)Mono.justOrEmpty(connection.getAuth()).flatMap(auth -> {
            MqttAuthenticationRequest request = new MqttAuthenticationRequest(connection.getClientId(), auth.getUsername(), auth.getPassword(), this.getTransport());
            return this.supportMono.map(support -> support.authenticate((AuthenticationRequest)request, this.registry)).defaultIfEmpty((Object)Mono.defer(() -> this.registry.getDevice(connection.getClientId()).flatMap(device -> device.authenticate((AuthenticationRequest)request)))).flatMap(Function.identity()).switchIfEmpty(Mono.fromRunnable(() -> connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD)));
        }).flatMap(resp -> {
            String deviceId = StringUtils.isEmpty((Object)resp.getDeviceId()) ? connection.getClientId() : resp.getDeviceId();
            return this.registry.getDevice(deviceId).map(operator -> Tuples.of((Object)operator, (Object)resp, (Object)connection)).switchIfEmpty(Mono.fromRunnable(() -> connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED)));
        }).as((Function)MonoTracer.create((String)DeviceTracer.SpanName.auth((String)connection.getClientId()), (span, tp3) -> {
            AuthenticationResponse response = (AuthenticationResponse)tp3.getT2();
            if (!response.isSuccess()) {
                span.setStatus(StatusCode.ERROR, response.getMessage());
            }
        }, (span, hasValue) -> {
            if (!hasValue.booleanValue()) {
                span.setStatus(StatusCode.ERROR, "device not exists");
            }
            span.setAttribute(DeviceTracer.SpanKey.address, (Object)connection.getClientAddress().toString());
            span.setAttribute(clientId, (Object)connection.getClientId());
        }))).onErrorResume(err -> Mono.fromRunnable(() -> {
            log.error("MQTT\u8fde\u63a5\u8ba4\u8bc1[{}]\u5931\u8d25", (Object)connection.getClientId(), err);
            this.monitor.rejected();
            connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
        }));
    }

    private Mono<Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession>> handleAuthResponse(DeviceOperator device, AuthenticationResponse resp, MqttConnection connection) {
        return Mono.defer(() -> {
            String deviceId = device.getDeviceId();
            if (resp.isSuccess()) {
                connection.onClose(conn -> {
                    this.counter.decrement();
                    this.monitor.disconnected();
                    this.monitor.totalConnection(this.counter.sum());
                    this.sessionManager.getSession(deviceId, false).flatMap(_tmp -> {
                        MqttConnectionSession connectionSession;
                        if (_tmp != null && _tmp.isWrapFrom(MqttConnectionSession.class) && !(_tmp instanceof KeepOnlineSession) && (connectionSession = (MqttConnectionSession)_tmp.unwrap(MqttConnectionSession.class)).getConnection() == conn) {
                            return this.sessionManager.remove(deviceId, true);
                        }
                        return Mono.empty();
                    }).subscribe();
                });
                this.counter.increment();
                return this.sessionManager.compute(deviceId, old -> {
                    MqttConnectionSession newSession = new MqttConnectionSession(deviceId, device, this.getTransport(), connection, this.monitor);
                    return old.map(session -> {
                        if (session instanceof KeepOnlineSession) {
                            return new KeepOnlineSession((DeviceSession)newSession, session.getKeepAliveTimeout());
                        }
                        return newSession;
                    }).defaultIfEmpty((Object)newSession);
                }).mapNotNull(session -> {
                    try {
                        return Tuples.of((Object)connection.accept(), (Object)device, (Object)session.unwrap(MqttConnectionSession.class));
                    }
                    catch (IllegalStateException ignore) {
                        return null;
                    }
                }).doOnNext(o -> {
                    this.monitor.connected();
                    this.monitor.totalConnection(this.counter.sum());
                }).switchIfEmpty(Mono.fromRunnable(() -> connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED)));
            }
            connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
            this.monitor.rejected();
            log.warn("MQTT\u5ba2\u6237\u7aef\u8ba4\u8bc1[{}]\u5931\u8d25:{}", (Object)deviceId, (Object)resp.getMessage());
            return Mono.empty();
        }).onErrorResume(error -> Mono.fromRunnable(() -> {
            log.error(error.getMessage(), error);
            this.monitor.rejected();
            connection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
        }));
    }

    private Mono<Void> handleAcceptedMqttConnection(MqttConnection connection, DeviceOperator operator, MqttConnectionSession session) {
        return Flux.usingWhen((Publisher)Mono.just((Object)connection), MqttConnection::handleMessage, MqttConnection::close).filter(pb -> this.isStarted()).flatMap(publishing -> (Mono)this.decodeAndHandleMessage(operator, session, (MqttMessage)publishing, connection).as((Function)MonoTracer.create((String)DeviceTracer.SpanName.upstream((String)connection.getClientId()), span -> span.setAttribute(DeviceTracer.SpanKey.message, (Object)publishing.print())))).mergeWith((Publisher)Mono.justOrEmpty(connection.getWillMessage()).flatMap(mqttMessage -> this.decodeAndHandleMessage(operator, session, (MqttMessage)mqttMessage, connection))).then();
    }

    private Mono<Void> decodeAndHandleMessage(DeviceOperator operator, MqttConnectionSession session, MqttMessage message, MqttConnection connection) {
        this.monitor.receivedMessage();
        return ((Flux)operator.getProtocol().flatMap(protocol -> protocol.getMessageCodec(this.getTransport())).flatMapMany(codec -> codec.decode((MessageDecodeContext)FromDeviceMessageContext.of((DeviceSession)session, (EncodedMessage)message, (DeviceRegistry)this.registry))).cast(DeviceMessage.class).flatMap(msg -> {
            if (!StringUtils.hasText((String)msg.getDeviceId())) {
                msg.thingId((ThingType)DeviceThingType.device, operator.getDeviceId());
            }
            return this.handleMessage(operator, (DeviceMessage)msg, connection);
        }).doOnComplete(() -> {
            if (message instanceof MqttPublishing) {
                ((MqttPublishing)message).acknowledge();
            }
        }).as((Function)FluxTracer.create((String)DeviceTracer.SpanName.decode((String)operator.getDeviceId()), (span, msg) -> span.setAttribute(DeviceTracer.SpanKey.message, (Object)this.toJsonString(msg.toJson()))))).onErrorResume(err -> {
            log.error("handle mqtt message [{}] error:{}", new Object[]{operator.getDeviceId(), message, err});
            return Mono.empty();
        }).then();
    }

    private String toJsonString(Object data) {
        return ObjectMappers.JSON_MAPPER.writeValueAsString(data);
    }

    private Mono<DeviceMessage> handleMessage(DeviceOperator mainDevice, DeviceMessage message, MqttConnection connection) {
        if (!connection.isAlive()) {
            return this.messageHandler.handleMessage(mainDevice, (Message)message).thenReturn((Object)message);
        }
        return this.helper.handleDeviceMessage(message, device -> new MqttConnectionSession(device.getDeviceId(), (DeviceOperator)device, this.getTransport(), connection, this.monitor), session -> {}, () -> log.warn("\u65e0\u6cd5\u4eceMQTT[{}]\u6d88\u606f\u4e2d\u83b7\u53d6\u8bbe\u5907\u4fe1\u606f:{}", (Object)connection.getClientId(), (Object)message)).thenReturn((Object)message);
    }

    protected Mono<Void> doStartup() {
        this.doStart();
        return Mono.empty();
    }

    protected Mono<Void> doShutdown() {
        if (this.disposable != null && !this.disposable.isDisposed()) {
            this.disposable.dispose();
        }
        return Mono.empty();
    }

    public Transport getTransport() {
        return DefaultTransport.MQTT;
    }
}

