package org.jetlinks.community.network.mqtt.server.vertx;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.util.ReferenceCountUtil;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.SocketAddress;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.messages.MqttPublishMessage;
import io.vertx.mqtt.messages.MqttSubscribeMessage;
import io.vertx.mqtt.messages.MqttUnsubscribeMessage;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.jetlinks.community.network.mqtt.server.MqttConnection;
import org.jetlinks.community.network.mqtt.server.MqttPublishing;
import org.jetlinks.community.network.mqtt.server.MqttSubscription;
import org.jetlinks.community.network.mqtt.server.MqttUnSubscription;
import org.jetlinks.core.message.codec.EncodedMessage;
import org.jetlinks.core.message.codec.MqttMessage;
import org.jetlinks.core.message.codec.SimpleMqttMessage;
import org.jetlinks.core.server.mqtt.MqttAuth;
import org.jetlinks.core.utils.Reactors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection.class */
public class VertxMqttConnection implements MqttConnection {
    private final MqttEndpoint endpoint;
    private long keepAliveTimeoutMs;
    private int messageIdCounter;
    private volatile InetSocketAddress clientAddress;
    private static final Logger log = LoggerFactory.getLogger(VertxMqttConnection.class);
    private static final MqttAuth emptyAuth = new MqttAuth() { // from class: org.jetlinks.community.network.mqtt.server.vertx.VertxMqttConnection.1
        public String getUsername() {
            return "";
        }

        public String getPassword() {
            return "";
        }
    };
    private long lastPingTime = System.currentTimeMillis();
    private volatile boolean closed = false;
    private volatile boolean accepted = false;
    private volatile boolean autoAckSub = true;
    private volatile boolean autoAckUnSub = true;
    private volatile boolean autoAckMsg = false;
    private final Sinks.Many<MqttPublishing> messageProcessor = Reactors.createMany(Integer.MAX_VALUE, false);
    private final Sinks.Many<MqttSubscription> subscription = Reactors.createMany(Integer.MAX_VALUE, false);
    private final Sinks.Many<MqttUnSubscription> unsubscription = Reactors.createMany(Integer.MAX_VALUE, false);
    private final Consumer<MqttConnection> defaultListener = mqttConnection -> {
        log.debug("mqtt client [{}] disconnected", getClientId());
        this.subscription.tryEmitComplete();
        this.unsubscription.tryEmitComplete();
        this.messageProcessor.tryEmitComplete();
    };
    private Consumer<MqttConnection> disconnectConsumer = this.defaultListener;

    /* loaded from: input_file:org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection$VertxMqttAuth.class */
    class VertxMqttAuth implements MqttAuth {
        VertxMqttAuth() {
        }

        public String getUsername() {
            return VertxMqttConnection.this.endpoint.auth().getUsername();
        }

        public String getPassword() {
            return VertxMqttConnection.this.endpoint.auth().getPassword();
        }
    }

    /* loaded from: input_file:org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection$VertxMqttMqttUnSubscription.class */
    class VertxMqttMqttUnSubscription implements MqttUnSubscription {
        private final MqttUnsubscribeMessage message;
        private volatile boolean acknowledged;

        @Override // org.jetlinks.community.network.mqtt.server.MqttUnSubscription
        public MqttUnsubscribeMessage getMessage() {
            return this.message;
        }

        @Override // org.jetlinks.community.network.mqtt.server.MqttUnSubscription
        public synchronized void acknowledge() {
            if (this.acknowledged) {
                return;
            }
            VertxMqttConnection.log.info("acknowledge mqtt [{}] unsubscribe : {} ", VertxMqttConnection.this.getClientId(), this.message.topics());
            this.acknowledged = true;
            VertxMqttConnection.this.endpoint.unsubscribeAcknowledge(this.message.messageId());
        }

        public VertxMqttMqttUnSubscription(MqttUnsubscribeMessage mqttUnsubscribeMessage, boolean z) {
            this.message = mqttUnsubscribeMessage;
            this.acknowledged = z;
        }
    }

    /* loaded from: input_file:org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection$VertxMqttPublishing.class */
    class VertxMqttPublishing implements MqttPublishing {
        private final MqttPublishMessage message;
        private volatile boolean acknowledged;

        @Nonnull
        public String getTopic() {
            return this.message.topicName();
        }

        public String getClientId() {
            return VertxMqttConnection.this.getClientId();
        }

        public int getMessageId() {
            return this.message.messageId();
        }

        public boolean isWill() {
            return false;
        }

        public int getQosLevel() {
            return this.message.qosLevel().value();
        }

        public boolean isDup() {
            return this.message.isDup();
        }

        public boolean isRetain() {
            return this.message.isRetain();
        }

        @Nonnull
        public ByteBuf getPayload() {
            return this.message.payload().getByteBuf();
        }

        public String toString() {
            return print();
        }

        public MqttProperties getProperties() {
            return this.message.properties();
        }

        @Override // org.jetlinks.community.network.mqtt.server.MqttPublishing
        public MqttMessage getMessage() {
            return this;
        }

        @Override // org.jetlinks.community.network.mqtt.server.MqttPublishing
        public void acknowledge() {
            if (this.acknowledged) {
                return;
            }
            this.acknowledged = true;
            if (this.message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
                VertxMqttConnection.log.debug("PUBACK QoS1 mqtt[{}] message[{}]", getClientId(), Integer.valueOf(this.message.messageId()));
                VertxMqttConnection.this.endpoint.publishAcknowledge(this.message.messageId());
            } else if (this.message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
                VertxMqttConnection.log.debug("PUBREC QoS2 mqtt[{}] message[{}]", getClientId(), Integer.valueOf(this.message.messageId()));
                VertxMqttConnection.this.endpoint.publishReceived(this.message.messageId());
            }
        }

        public VertxMqttPublishing(MqttPublishMessage mqttPublishMessage, boolean z) {
            this.message = mqttPublishMessage;
            this.acknowledged = z;
        }
    }

    /* loaded from: input_file:org/jetlinks/community/network/mqtt/server/vertx/VertxMqttConnection$VertxMqttSubscription.class */
    class VertxMqttSubscription implements MqttSubscription {
        private final MqttSubscribeMessage message;
        private volatile boolean acknowledged;

        @Override // org.jetlinks.community.network.mqtt.server.MqttSubscription
        public MqttSubscribeMessage getMessage() {
            return this.message;
        }

        @Override // org.jetlinks.community.network.mqtt.server.MqttSubscription
        public synchronized void acknowledge() {
            if (this.acknowledged) {
                return;
            }
            this.acknowledged = true;
            VertxMqttConnection.this.endpoint.subscribeAcknowledge(this.message.messageId(), (List) this.message.topicSubscriptions().stream().map((v0) -> {
                return v0.qualityOfService();
            }).collect(Collectors.toList()));
        }

        public VertxMqttSubscription(MqttSubscribeMessage mqttSubscribeMessage, boolean z) {
            this.message = mqttSubscribeMessage;
            this.acknowledged = z;
        }
    }

    public VertxMqttConnection(MqttEndpoint mqttEndpoint) {
        this.endpoint = mqttEndpoint;
        this.keepAliveTimeoutMs = (mqttEndpoint.keepAliveTimeSeconds() + 10) * 1000;
    }

    @Override // org.jetlinks.community.network.mqtt.server.MqttConnection
    public Duration getKeepAliveTimeout() {
        return Duration.ofMillis(this.keepAliveTimeoutMs);
    }

    @Override // org.jetlinks.community.network.mqtt.server.MqttConnection
    public void onClose(Consumer<MqttConnection> consumer) {
        this.disconnectConsumer = this.disconnectConsumer.andThen(consumer);
    }

    @Override // org.jetlinks.community.network.mqtt.server.MqttConnection
    public Optional<MqttAuth> getAuth() {
        return this.endpoint.auth() == null ? Optional.of(emptyAuth) : Optional.of(new VertxMqttAuth());
    }

    @Override // org.jetlinks.community.network.mqtt.server.MqttConnection
    public void reject(MqttConnectReturnCode mqttConnectReturnCode) {
        if (this.closed) {
            return;
        }
        try {
            this.endpoint.reject(mqttConnectReturnCode);
        } catch (Throwable th) {
        }
        try {
            complete();
        } catch (Throwable th2) {
        }
    }

    @Override // org.jetlinks.community.network.mqtt.server.MqttConnection
    public Optional<MqttMessage> getWillMessage() {
        return Optional.ofNullable(this.endpoint.will()).filter(mqttWill -> {
            return mqttWill.getWillMessageBytes() != null;
        }).map(mqttWill2 -> {
            return SimpleMqttMessage.builder().will(true).payload(Unpooled.wrappedBuffer(mqttWill2.getWillMessageBytes())).topic(mqttWill2.getWillTopic()).qosLevel(mqttWill2.getWillQos()).build();
        });
    }

    @Override // org.jetlinks.community.network.mqtt.server.MqttConnection
    public MqttConnection accept() {
        if (this.accepted) {
            return this;
        }
        log.debug("mqtt client [{}] connected", getClientId());
        this.accepted = true;
        try {
            if (!this.endpoint.isConnected()) {
                this.endpoint.accept();
            }
            init();
            return this;
        } catch (Exception e) {
            close().subscribe();
            log.warn(e.getMessage(), e);
            return this;
        }
    }

    @Override // org.jetlinks.community.network.mqtt.server.MqttConnection
    public void keepAlive() {
        ping();
    }

    void ping() {
        this.lastPingTime = System.currentTimeMillis();
    }

    void init() {
        this.endpoint.disconnectHandler(r3 -> {
            complete();
        }).closeHandler(r32 -> {
            complete();
        }).exceptionHandler(th -> {
            if ((th instanceof DecoderException) && th.getMessage().contains("too large message")) {
                log.error("MQTT消息过大,请在网络组件中设置[最大消息长度].", th);
            } else {
                log.error(th.getMessage(), th);
            }
        }).pingHandler(r33 -> {
            ping();
            if (this.endpoint.isAutoKeepAlive()) {
                return;
            }
            this.endpoint.pong();
        }).publishHandler(mqttPublishMessage -> {
            ping();
            VertxMqttPublishing vertxMqttPublishing = new VertxMqttPublishing(mqttPublishMessage, false);
            boolean z = this.messageProcessor.currentSubscriberCount() > 0;
            if (this.autoAckMsg && z) {
                vertxMqttPublishing.acknowledge();
            }
            if (z) {
                this.messageProcessor.emitNext(vertxMqttPublishing, Reactors.emitFailureHandler());
            }
        }).publishAcknowledgeHandler(num -> {
            ping();
            log.debug("PUBACK mqtt[{}] message[{}]", getClientId(), num);
        }).publishReceivedHandler(num2 -> {
            ping();
            log.debug("PUBREC mqtt[{}] message[{}]", getClientId(), num2);
            this.endpoint.publishRelease(num2.intValue());
        }).publishReleaseHandler(num3 -> {
            ping();
            log.debug("PUBREL mqtt[{}] message[{}]", getClientId(), num3);
            this.endpoint.publishComplete(num3.intValue());
        }).publishCompletionHandler(num4 -> {
            ping();
            log.debug("PUBCOMP mqtt[{}] message[{}]", getClientId(), num4);
        }).subscribeHandler(mqttSubscribeMessage -> {
            ping();
            VertxMqttSubscription vertxMqttSubscription = new VertxMqttSubscription(mqttSubscribeMessage, false);
            boolean z = this.subscription.currentSubscriberCount() > 0;
            if (this.autoAckSub || !z) {
                vertxMqttSubscription.acknowledge();
            }
            if (z) {
                this.subscription.emitNext(vertxMqttSubscription, Reactors.emitFailureHandler());
            }
        }).unsubscribeHandler(mqttUnsubscribeMessage -> {
            ping();
            VertxMqttMqttUnSubscription vertxMqttMqttUnSubscription = new VertxMqttMqttUnSubscription(mqttUnsubscribeMessage, false);
            boolean z = this.unsubscription.currentSubscriberCount() > 0;
            if (this.autoAckUnSub || !z) {
                vertxMqttMqttUnSubscription.acknowledge();
            }
            if (z) {
                this.unsubscription.emitNext(vertxMqttMqttUnSubscription, Reactors.emitFailureHandler());
            }
        });
    }

    @Override // org.jetlinks.community.network.mqtt.server.MqttConnection
    public void setKeepAliveTimeout(Duration duration) {
        this.keepAliveTimeoutMs = duration.toMillis();
    }

    @Override // org.jetlinks.community.network.mqtt.server.MqttConnection
    public InetSocketAddress getClientAddress() {
        SocketAddress remoteAddress;
        try {
            if (this.clientAddress == null && this.endpoint != null && (remoteAddress = this.endpoint.remoteAddress()) != null) {
                this.clientAddress = new InetSocketAddress(remoteAddress.host(), remoteAddress.port());
            }
        } catch (Throwable th) {
        }
        return this.clientAddress;
    }

    @Override // org.jetlinks.community.network.mqtt.server.MqttConnection
    public String getClientId() {
        return this.endpoint.clientIdentifier();
    }

    @Override // org.jetlinks.community.network.mqtt.server.MqttConnection
    public Flux<MqttPublishing> handleMessage() {
        return this.messageProcessor.asFlux();
    }

    @Override // org.jetlinks.community.network.mqtt.server.MqttConnection
    public Mono<Void> publish(MqttMessage mqttMessage) {
        ping();
        int nextMessageId = mqttMessage.getMessageId() <= 0 ? nextMessageId() : mqttMessage.getMessageId();
        return Mono.create(monoSink -> {
            ByteBuf payload = mqttMessage.getPayload();
            this.endpoint.publish(mqttMessage.getTopic(), Buffer.buffer(payload), MqttQoS.valueOf(mqttMessage.getQosLevel()), mqttMessage.isDup(), mqttMessage.isRetain(), nextMessageId, mqttMessage.getProperties(), asyncResult -> {
                if (asyncResult.succeeded()) {
                    monoSink.success();
                } else {
                    monoSink.error(asyncResult.cause());
                }
                ReferenceCountUtil.safeRelease(payload);
            });
        });
    }

    @Override // org.jetlinks.community.network.mqtt.server.MqttConnection
    public Flux<MqttSubscription> handleSubscribe(boolean z) {
        this.autoAckSub = z;
        return this.subscription.asFlux();
    }

    @Override // org.jetlinks.community.network.mqtt.server.MqttConnection
    public Flux<MqttUnSubscription> handleUnSubscribe(boolean z) {
        this.autoAckUnSub = z;
        return this.unsubscription.asFlux();
    }

    public InetSocketAddress address() {
        return getClientAddress();
    }

    public Mono<Void> sendMessage(EncodedMessage encodedMessage) {
        return encodedMessage instanceof MqttMessage ? publish((MqttMessage) encodedMessage) : Mono.empty();
    }

    public Flux<EncodedMessage> receiveMessage() {
        return handleMessage().cast(EncodedMessage.class);
    }

    public void disconnect() {
        close().subscribe();
    }

    @Override // org.jetlinks.community.network.mqtt.server.MqttConnection
    public boolean isAlive() {
        return this.endpoint.isConnected() && (this.keepAliveTimeoutMs < 0 || System.currentTimeMillis() - this.lastPingTime < this.keepAliveTimeoutMs);
    }

    @Override // org.jetlinks.community.network.mqtt.server.MqttConnection
    public Mono<Void> close() {
        return this.closed ? Mono.empty() : Mono.fromRunnable(() -> {
            try {
                if (this.endpoint.isConnected()) {
                    this.endpoint.close();
                } else {
                    complete();
                }
            } catch (Throwable th) {
            }
        });
    }

    private void complete() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.disconnectConsumer.accept(this);
    }

    private int nextMessageId() {
        this.messageIdCounter = this.messageIdCounter % 65535 != 0 ? this.messageIdCounter + 1 : 1;
        return this.messageIdCounter;
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        return Objects.equals(this.endpoint, ((VertxMqttConnection) obj).endpoint);
    }

    public int hashCode() {
        return Objects.hash(this.endpoint);
    }

    @Override // org.jetlinks.community.network.mqtt.server.MqttConnection
    public long getLastPingTime() {
        return this.lastPingTime;
    }
}
