package org.jetlinks.community.network.mqtt.gateway.device;

import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.StatusCode;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import org.jetlinks.community.gateway.AbstractDeviceGateway;
import org.jetlinks.community.gateway.DeviceGatewayHelper;
import org.jetlinks.community.network.mqtt.gateway.device.session.MqttConnectionSession;
import org.jetlinks.community.network.mqtt.server.MqttConnection;
import org.jetlinks.community.network.mqtt.server.MqttPublishing;
import org.jetlinks.community.network.mqtt.server.MqttServer;
import org.jetlinks.community.utils.ObjectMappers;
import org.jetlinks.community.utils.SystemUtils;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.core.device.AuthenticationResponse;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.device.DeviceThingType;
import org.jetlinks.core.device.MqttAuthenticationRequest;
import org.jetlinks.core.device.session.DeviceSessionManager;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.FromDeviceMessageContext;
import org.jetlinks.core.message.codec.MqttMessage;
import org.jetlinks.core.message.codec.Transport;
import org.jetlinks.core.server.session.KeepOnlineSession;
import org.jetlinks.core.trace.DeviceTracer;
import org.jetlinks.core.trace.FluxTracer;
import org.jetlinks.core.trace.MonoTracer;
import org.jetlinks.supports.server.DecodedClientMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple3;
import reactor.util.function.Tuples;

/* loaded from: input_file:org/jetlinks/community/network/mqtt/gateway/device/MqttServerDeviceGateway.class */
class MqttServerDeviceGateway extends AbstractDeviceGateway {
    private static final Logger log = LoggerFactory.getLogger(MqttServerDeviceGateway.class);
    static AttributeKey<String> clientId = AttributeKey.stringKey("clientId");
    static AttributeKey<String> username = AttributeKey.stringKey("username");
    static AttributeKey<String> password = AttributeKey.stringKey("password");
    private final DeviceRegistry registry;
    private final DeviceSessionManager sessionManager;
    private final MqttServer mqttServer;
    private final DecodedClientMessageHandler messageHandler;
    private final LongAdder counter;
    private final Mono<ProtocolSupport> supportMono;
    private Disposable disposable;
    private final DeviceGatewayHelper helper;

    public MqttServerDeviceGateway(String str, DeviceRegistry deviceRegistry, DeviceSessionManager deviceSessionManager, MqttServer mqttServer, DecodedClientMessageHandler decodedClientMessageHandler, Mono<ProtocolSupport> mono) {
        super(str);
        this.counter = new LongAdder();
        this.registry = deviceRegistry;
        this.sessionManager = deviceSessionManager;
        this.mqttServer = mqttServer;
        this.messageHandler = decodedClientMessageHandler;
        this.supportMono = mono;
        this.helper = new DeviceGatewayHelper(deviceRegistry, deviceSessionManager, decodedClientMessageHandler);
    }

    private void doStart() {
        if (this.disposable != null) {
            this.disposable.dispose();
        }
        this.disposable = this.mqttServer.handleConnection("device-gateway").filter(mqttConnection -> {
            if (isStarted()) {
                return true;
            }
            mqttConnection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
            this.monitor.rejected();
            return true;
        }).flatMap(mqttConnection2 -> {
            return (Mono) handleConnection(mqttConnection2).flatMap(tuple3 -> {
                return handleAuthResponse((DeviceOperator) tuple3.getT1(), (AuthenticationResponse) tuple3.getT2(), (MqttConnection) tuple3.getT3());
            }).flatMap(tuple32 -> {
                return handleAcceptedMqttConnection((MqttConnection) tuple32.getT1(), (DeviceOperator) tuple32.getT2(), (MqttConnectionSession) tuple32.getT3());
            }).onErrorResume(th -> {
                log.error(th.getMessage(), th);
                return Mono.empty();
            }).as(MonoTracer.create(DeviceTracer.SpanName.connection(mqttConnection2.getClientId()), spanBuilder -> {
                spanBuilder.setAttribute(clientId, mqttConnection2.getClientId());
                spanBuilder.setAttribute(DeviceTracer.SpanKey.address, mqttConnection2.getClientAddress().toString());
            }));
        }, Integer.MAX_VALUE).subscribe();
    }

    private Mono<Tuple3<DeviceOperator, AuthenticationResponse, MqttConnection>> handleConnection(MqttConnection mqttConnection) {
        if (!SystemUtils.memoryIsOutOfWatermark()) {
            return ((Mono) Mono.justOrEmpty(mqttConnection.getAuth()).flatMap(mqttAuth -> {
                MqttAuthenticationRequest mqttAuthenticationRequest = new MqttAuthenticationRequest(mqttConnection.getClientId(), mqttAuth.getUsername(), mqttAuth.getPassword(), getTransport());
                return this.supportMono.map(protocolSupport -> {
                    return protocolSupport.authenticate(mqttAuthenticationRequest, this.registry);
                }).defaultIfEmpty(Mono.defer(() -> {
                    return this.registry.getDevice(mqttConnection.getClientId()).flatMap(deviceOperator -> {
                        return deviceOperator.authenticate(mqttAuthenticationRequest);
                    });
                })).flatMap(Function.identity()).switchIfEmpty(Mono.fromRunnable(() -> {
                    mqttConnection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
                }));
            }).flatMap(authenticationResponse -> {
                return this.registry.getDevice(StringUtils.isEmpty(authenticationResponse.getDeviceId()) ? mqttConnection.getClientId() : authenticationResponse.getDeviceId()).map(deviceOperator -> {
                    return Tuples.of(deviceOperator, authenticationResponse, mqttConnection);
                }).switchIfEmpty(Mono.fromRunnable(() -> {
                    mqttConnection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
                }));
            }).as(MonoTracer.create(DeviceTracer.SpanName.auth(mqttConnection.getClientId()), (span, tuple3) -> {
                AuthenticationResponse authenticationResponse2 = (AuthenticationResponse) tuple3.getT2();
                if (authenticationResponse2.isSuccess()) {
                    return;
                }
                span.setStatus(StatusCode.ERROR, authenticationResponse2.getMessage());
            }, (span2, bool) -> {
                if (!bool.booleanValue()) {
                    span2.setStatus(StatusCode.ERROR, "device not exists");
                }
                span2.setAttribute(DeviceTracer.SpanKey.address, mqttConnection.getClientAddress().toString());
                span2.setAttribute(clientId, mqttConnection.getClientId());
            }))).onErrorResume(th -> {
                return Mono.fromRunnable(() -> {
                    log.error("MQTT连接认证[{}]失败", mqttConnection.getClientId(), th);
                    this.monitor.rejected();
                    mqttConnection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
                });
            });
        }
        mqttConnection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
        return Mono.empty();
    }

    private Mono<Tuple3<MqttConnection, DeviceOperator, MqttConnectionSession>> handleAuthResponse(DeviceOperator deviceOperator, AuthenticationResponse authenticationResponse, MqttConnection mqttConnection) {
        return Mono.defer(() -> {
            String deviceId = deviceOperator.getDeviceId();
            if (authenticationResponse.isSuccess()) {
                mqttConnection.onClose(mqttConnection2 -> {
                    this.counter.decrement();
                    this.monitor.disconnected();
                    this.monitor.totalConnection(this.counter.sum());
                    this.sessionManager.getSession(deviceId, false).flatMap(deviceSession -> {
                        return (deviceSession == null || !deviceSession.isWrapFrom(MqttConnectionSession.class) || (deviceSession instanceof KeepOnlineSession) || ((MqttConnectionSession) deviceSession.unwrap(MqttConnectionSession.class)).getConnection() != mqttConnection2) ? Mono.empty() : this.sessionManager.remove(deviceId, true);
                    }).subscribe();
                });
                this.counter.increment();
                return this.sessionManager.compute(deviceId, mono -> {
                    MqttConnectionSession mqttConnectionSession = new MqttConnectionSession(deviceId, deviceOperator, getTransport(), mqttConnection, this.monitor);
                    return mono.map(deviceSession -> {
                        return deviceSession instanceof KeepOnlineSession ? new KeepOnlineSession(mqttConnectionSession, deviceSession.getKeepAliveTimeout()) : mqttConnectionSession;
                    }).defaultIfEmpty(mqttConnectionSession);
                }).mapNotNull(deviceSession -> {
                    try {
                        return Tuples.of(mqttConnection.accept(), deviceOperator, deviceSession.unwrap(MqttConnectionSession.class));
                    } catch (IllegalStateException e) {
                        return null;
                    }
                }).doOnNext(tuple3 -> {
                    this.monitor.connected();
                    this.monitor.totalConnection(this.counter.sum());
                }).switchIfEmpty(Mono.fromRunnable(() -> {
                    mqttConnection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
                }));
            }
            mqttConnection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
            this.monitor.rejected();
            log.warn("MQTT客户端认证[{}]失败:{}", deviceId, authenticationResponse.getMessage());
            return Mono.empty();
        }).onErrorResume(th -> {
            return Mono.fromRunnable(() -> {
                log.error(th.getMessage(), th);
                this.monitor.rejected();
                mqttConnection.reject(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
            });
        });
    }

    private Mono<Void> handleAcceptedMqttConnection(MqttConnection mqttConnection, DeviceOperator deviceOperator, MqttConnectionSession mqttConnectionSession) {
        return Flux.usingWhen(Mono.just(mqttConnection), (v0) -> {
            return v0.handleMessage();
        }, (v0) -> {
            return v0.close();
        }).filter(mqttPublishing -> {
            return isStarted();
        }).flatMap(mqttPublishing2 -> {
            return (Mono) decodeAndHandleMessage(deviceOperator, mqttConnectionSession, mqttPublishing2, mqttConnection).as(MonoTracer.create(DeviceTracer.SpanName.upstream(mqttConnection.getClientId()), spanBuilder -> {
                spanBuilder.setAttribute(DeviceTracer.SpanKey.message, mqttPublishing2.print());
            }));
        }).mergeWith(Mono.justOrEmpty(mqttConnection.getWillMessage()).flatMap(mqttMessage -> {
            return decodeAndHandleMessage(deviceOperator, mqttConnectionSession, mqttMessage, mqttConnection);
        })).then();
    }

    private Mono<Void> decodeAndHandleMessage(DeviceOperator deviceOperator, MqttConnectionSession mqttConnectionSession, MqttMessage mqttMessage, MqttConnection mqttConnection) {
        this.monitor.receivedMessage();
        return ((Flux) deviceOperator.getProtocol().flatMap(protocolSupport -> {
            return protocolSupport.getMessageCodec(getTransport());
        }).flatMapMany(deviceMessageCodec -> {
            return deviceMessageCodec.decode(FromDeviceMessageContext.of(mqttConnectionSession, mqttMessage, this.registry));
        }).cast(DeviceMessage.class).flatMap(deviceMessage -> {
            if (!StringUtils.hasText(deviceMessage.getDeviceId())) {
                deviceMessage.thingId(DeviceThingType.device, deviceOperator.getDeviceId());
            }
            return handleMessage(deviceOperator, deviceMessage, mqttConnection);
        }).doOnComplete(() -> {
            if (mqttMessage instanceof MqttPublishing) {
                ((MqttPublishing) mqttMessage).acknowledge();
            }
        }).as(FluxTracer.create(DeviceTracer.SpanName.decode(deviceOperator.getDeviceId()), (span, deviceMessage2) -> {
            span.setAttribute(DeviceTracer.SpanKey.message, toJsonString(deviceMessage2.toJson()));
        }))).onErrorResume(th -> {
            log.error("handle mqtt message [{}] error:{}", new Object[]{deviceOperator.getDeviceId(), mqttMessage, th});
            return Mono.empty();
        }).then();
    }

    private String toJsonString(Object obj) {
        return ObjectMappers.JSON_MAPPER.writeValueAsString(obj);
    }

    private Mono<DeviceMessage> handleMessage(DeviceOperator deviceOperator, DeviceMessage deviceMessage, MqttConnection mqttConnection) {
        return !mqttConnection.isAlive() ? this.messageHandler.handleMessage(deviceOperator, deviceMessage).thenReturn(deviceMessage) : this.helper.handleDeviceMessage(deviceMessage, deviceOperator2 -> {
            return new MqttConnectionSession(deviceOperator2.getDeviceId(), deviceOperator2, getTransport(), mqttConnection, this.monitor);
        }, deviceSession -> {
        }, () -> {
            log.warn("无法从MQTT[{}]消息中获取设备信息:{}", mqttConnection.getClientId(), deviceMessage);
        }).thenReturn(deviceMessage);
    }

    protected Mono<Void> doStartup() {
        doStart();
        return Mono.empty();
    }

    protected Mono<Void> doShutdown() {
        if (this.disposable != null && !this.disposable.isDisposed()) {
            this.disposable.dispose();
        }
        return Mono.empty();
    }

    public Transport getTransport() {
        return DefaultTransport.MQTT;
    }
}
