package org.jetlinks.protocol.official;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.Unpooled;
import javax.annotation.Nonnull;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DisconnectDeviceMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.codec.DefaultTransport;
import org.jetlinks.core.message.codec.DeviceMessageCodec;
import org.jetlinks.core.message.codec.FromDeviceMessageContext;
import org.jetlinks.core.message.codec.MessageCodecContext;
import org.jetlinks.core.message.codec.MessageDecodeContext;
import org.jetlinks.core.message.codec.MessageEncodeContext;
import org.jetlinks.core.message.codec.MessagePayloadType;
import org.jetlinks.core.message.codec.MqttMessage;
import org.jetlinks.core.message.codec.SimpleMqttMessage;
import org.jetlinks.core.message.codec.ToDeviceMessageContext;
import org.jetlinks.core.message.codec.Transport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:jetlinks-official-protocol-3.0-SNAPSHOT.jar:org/jetlinks/protocol/official/JetLinksMqttDeviceMessageCodec.class */
public class JetLinksMqttDeviceMessageCodec implements DeviceMessageCodec {
    private static final Logger log = LoggerFactory.getLogger(JetLinksMqttDeviceMessageCodec.class);
    private final Transport transport;
    private final ObjectMapper mapper;

    public JetLinksMqttDeviceMessageCodec(Transport transport) {
        this.transport = transport;
        this.mapper = ObjectMappers.JSON_MAPPER;
    }

    public JetLinksMqttDeviceMessageCodec() {
        this(DefaultTransport.MQTT);
    }

    public Transport getSupportTransport() {
        return this.transport;
    }

    @Nonnull
    /* renamed from: encode, reason: merged with bridge method [inline-methods] */
    public Mono<MqttMessage> m111encode(@Nonnull MessageEncodeContext messageEncodeContext) {
        return Mono.defer(() -> {
            DeviceMessage deviceMessage;
            TopicPayload encode;
            DeviceMessage message = messageEncodeContext.getMessage();
            if (message instanceof DisconnectDeviceMessage) {
                return ((ToDeviceMessageContext) messageEncodeContext).disconnect().then(Mono.empty());
            }
            if ((message instanceof DeviceMessage) && (encode = TopicMessageCodec.encode(this.mapper, (deviceMessage = message))) != null) {
                return Mono.justOrEmpty(deviceMessage.getHeader("productId").map(String::valueOf)).switchIfEmpty(messageEncodeContext.getDevice(deviceMessage.getDeviceId()).flatMap(deviceOperator -> {
                    return deviceOperator.getSelfConfig(DeviceConfigKey.productId);
                })).defaultIfEmpty("null").map(str -> {
                    return SimpleMqttMessage.builder().clientId(deviceMessage.getDeviceId()).topic("/".concat(str).concat(encode.getTopic())).payloadType(MessagePayloadType.JSON).payload(Unpooled.wrappedBuffer(encode.getPayload())).build();
                });
            }
            return Mono.empty();
        });
    }

    @Nonnull
    /* renamed from: decode, reason: merged with bridge method [inline-methods] */
    public Flux<DeviceMessage> m112decode(@Nonnull MessageDecodeContext messageDecodeContext) {
        MqttMessage message = messageDecodeContext.getMessage();
        byte[] payloadAsBytes = message.payloadAsBytes();
        return TopicMessageCodec.decode(this.mapper, TopicMessageCodec.removeProductPath(message.getTopic()), payloadAsBytes).switchIfEmpty(FunctionalTopicHandlers.handle(messageDecodeContext.getDevice(), message.getTopic().split("/"), payloadAsBytes, this.mapper, topicPayload -> {
            return doReply(messageDecodeContext, topicPayload);
        }));
    }

    private Mono<Void> doReply(MessageCodecContext messageCodecContext, TopicPayload topicPayload) {
        return messageCodecContext instanceof FromDeviceMessageContext ? ((FromDeviceMessageContext) messageCodecContext).getSession().send(SimpleMqttMessage.builder().topic(topicPayload.getTopic()).payload(topicPayload.getPayload()).build()).then() : messageCodecContext instanceof ToDeviceMessageContext ? ((ToDeviceMessageContext) messageCodecContext).sendToDevice(SimpleMqttMessage.builder().topic(topicPayload.getTopic()).payload(topicPayload.getPayload()).build()).then() : Mono.empty();
    }
}
