package org.jetlinks.community.device.message;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.community.PropertyConstants;
import org.jetlinks.core.Values;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.device.session.DeviceSessionEvent;
import org.jetlinks.core.device.session.DeviceSessionManager;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.message.ChildDeviceMessage;
import org.jetlinks.core.message.ChildDeviceMessageReply;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.DeviceOfflineMessage;
import org.jetlinks.core.message.DeviceOnlineMessage;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.MessageType;
import org.jetlinks.core.message.event.EventMessage;
import org.jetlinks.core.server.MessageHandler;
import org.jetlinks.core.server.session.ChildrenDeviceSession;
import org.jetlinks.core.server.session.DeviceSession;
import org.jetlinks.supports.server.DecodedClientMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/community/device/message/DeviceMessageConnector.class */
public class DeviceMessageConnector implements DecodedClientMessageHandler {
    private static final Logger log = LoggerFactory.getLogger(DeviceMessageConnector.class);
    private static final String[] allConfigHeader = {PropertyConstants.productId.getKey(), PropertyConstants.deviceName.getKey(), PropertyConstants.orgId.getKey()};
    private static final Function<Throwable, Mono<Void>> doOnError = th -> {
        log.error(th.getMessage(), th);
        return Mono.empty();
    };
    private static final Function<DeviceOperator, Mono<Values>> configGetter = deviceOperator -> {
        return deviceOperator.getSelfConfigs(allConfigHeader);
    };
    private static final Values emptyValues = Values.of(Collections.emptyMap());
    private static final BiConsumer<Message, StringBuilder>[] fastTopicBuilder = new BiConsumer[MessageType.values().length];
    private final DeviceRegistry registry;
    private final EventBus eventBus;
    private final MessageHandler messageHandler;

    public DeviceMessageConnector(EventBus eventBus, DeviceRegistry deviceRegistry, MessageHandler messageHandler, DeviceSessionManager deviceSessionManager) {
        this.registry = deviceRegistry;
        this.eventBus = eventBus;
        this.messageHandler = messageHandler;
        deviceSessionManager.listenEvent(deviceSessionEvent -> {
            return deviceSessionEvent.isClusterExists() ? Mono.empty() : deviceSessionEvent.getType() == DeviceSessionEvent.Type.unregister ? handleSessionUnregister(deviceSessionEvent.getSession()) : deviceSessionEvent.getType() == DeviceSessionEvent.Type.register ? handleSessionRegister(deviceSessionEvent.getSession()) : Mono.empty();
        });
    }

    protected Mono<Void> handleSessionRegister(DeviceSession deviceSession) {
        DeviceOnlineMessage deviceOnlineMessage = new DeviceOnlineMessage();
        deviceOnlineMessage.addHeader("from", "session-register");
        deviceOnlineMessage.addHeader("address", deviceSession.getClientAddress().map((v0) -> {
            return v0.toString();
        }).orElse(""));
        deviceOnlineMessage.setDeviceId(deviceSession.getDeviceId());
        deviceOnlineMessage.setTimestamp(System.currentTimeMillis());
        return onMessage(deviceOnlineMessage).onErrorResume(doOnError);
    }

    protected Mono<Void> handleSessionUnregister(DeviceSession deviceSession) {
        DeviceOfflineMessage deviceOfflineMessage = new DeviceOfflineMessage();
        deviceOfflineMessage.addHeader("from", "session-unregister");
        deviceOfflineMessage.setDeviceId(deviceSession.getDeviceId());
        deviceOfflineMessage.setTimestamp(System.currentTimeMillis());
        if (deviceSession.isWrapFrom(ChildrenDeviceSession.class)) {
            deviceOfflineMessage.addHeader("parentId", deviceSession.unwrap(ChildrenDeviceSession.class).getParentDevice().getDeviceId());
        }
        return onMessage(deviceOfflineMessage).onErrorResume(doOnError);
    }

    public static Flux<String> createDeviceMessageTopic(DeviceRegistry deviceRegistry, Message message) {
        return Flux.defer(() -> {
            if (!(message instanceof DeviceMessage)) {
                return Mono.just("/device/unknown/message/unknown");
            }
            DeviceMessage deviceMessage = (DeviceMessage) message;
            String deviceId = deviceMessage.getDeviceId();
            if (deviceId != null) {
                return deviceRegistry.getDevice(deviceId).flatMap(configGetter).defaultIfEmpty(emptyValues).flatMapIterable(values -> {
                    Map allValues = values.getAllValues();
                    deviceMessage.getClass();
                    allValues.forEach(deviceMessage::addHeader);
                    String createDeviceMessageTopic = createDeviceMessageTopic((String) deviceMessage.getHeader(PropertyConstants.productId).orElse("null"), deviceId, deviceMessage);
                    ArrayList arrayList = new ArrayList(2);
                    arrayList.add(createDeviceMessageTopic);
                    values.getValue(PropertyConstants.orgId).ifPresent(str -> {
                        arrayList.add("/org/" + str + createDeviceMessageTopic);
                    });
                    return arrayList;
                });
            }
            log.warn("无法从消息中获取设备ID:{}", deviceMessage);
            return Mono.empty();
        });
    }

    public static String createDeviceMessageTopic(String str, String str2, DeviceMessage deviceMessage) {
        StringBuilder append = new StringBuilder(64).append("/device/").append(str).append("/").append(str2);
        appendDeviceMessageTopic(deviceMessage, append);
        return append.toString();
    }

    private static void createFastBuilder(MessageType messageType, String str) {
        fastTopicBuilder[messageType.ordinal()] = (message, sb) -> {
            sb.append(str);
        };
    }

    private static void createFastBuilder(MessageType messageType, BiConsumer<Message, StringBuilder> biConsumer) {
        fastTopicBuilder[messageType.ordinal()] = biConsumer;
    }

    public static void appendDeviceMessageTopic(Message message, StringBuilder sb) {
        BiConsumer<Message, StringBuilder> biConsumer = fastTopicBuilder[message.getMessageType().ordinal()];
        if (null != biConsumer) {
            biConsumer.accept(message, sb);
        } else {
            sb.append("/message/").append(message.getMessageType().name().toLowerCase());
        }
    }

    public Mono<Void> onMessage(Message message) {
        if (null == message) {
            return Mono.empty();
        }
        message.addHeader(PropertyConstants.uid, IDGenerator.SNOW_FLAKE_STRING.generate());
        return getTopic(message).flatMap(str -> {
            return this.eventBus.publish(str, message).then();
        }).onErrorResume(doOnError).then();
    }

    private Flux<String> getTopic(Message message) {
        Flux<String> createDeviceMessageTopic = createDeviceMessageTopic(this.registry, message);
        return message instanceof ChildDeviceMessage ? onMessage(((ChildDeviceMessage) message).getChildDeviceMessage()).thenMany(createDeviceMessageTopic) : message instanceof ChildDeviceMessageReply ? onMessage(((ChildDeviceMessageReply) message).getChildDeviceMessage()).thenMany(createDeviceMessageTopic) : createDeviceMessageTopic;
    }

    protected Mono<Boolean> handleChildrenDeviceMessage(Message message) {
        return message instanceof DeviceMessageReply ? doReply((DeviceMessageReply) message) : Mono.just(true);
    }

    protected Mono<Boolean> handleChildrenDeviceMessageReply(ChildDeviceMessage childDeviceMessage) {
        return handleChildrenDeviceMessage(childDeviceMessage.getChildDeviceMessage());
    }

    protected Mono<Boolean> handleChildrenDeviceMessageReply(ChildDeviceMessageReply childDeviceMessageReply) {
        return handleChildrenDeviceMessage(childDeviceMessageReply.getChildDeviceMessage());
    }

    public Mono<Boolean> handleMessage(DeviceOperator deviceOperator, @Nonnull Message message) {
        return onMessage(message).then(message instanceof ChildDeviceMessageReply ? doReply((ChildDeviceMessageReply) message).then(handleChildrenDeviceMessageReply((ChildDeviceMessageReply) message)) : message instanceof ChildDeviceMessage ? handleChildrenDeviceMessageReply((ChildDeviceMessage) message) : message instanceof DeviceMessageReply ? doReply((DeviceMessageReply) message) : Mono.just(true)).defaultIfEmpty(false);
    }

    private Mono<Boolean> doReply(DeviceMessageReply deviceMessageReply) {
        if (log.isDebugEnabled()) {
            log.debug("reply message {}", deviceMessageReply.getMessageId());
        }
        return this.messageHandler.reply(deviceMessageReply).thenReturn(true).doOnError(th -> {
            log.error("reply message error", th);
        });
    }

    static {
        createFastBuilder(MessageType.EVENT, (BiConsumer<Message, StringBuilder>) (message, sb) -> {
            sb.append("/message/event/").append(((EventMessage) message).getEvent());
        });
        createFastBuilder(MessageType.REPORT_PROPERTY, "/message/property/report");
        createFastBuilder(MessageType.READ_PROPERTY, "/message/send/property/read");
        createFastBuilder(MessageType.READ_PROPERTY_REPLY, "/message/property/read/reply");
        createFastBuilder(MessageType.WRITE_PROPERTY, "/message/send/property/write");
        createFastBuilder(MessageType.WRITE_PROPERTY_REPLY, "/message/property/write/reply");
        createFastBuilder(MessageType.INVOKE_FUNCTION, "/message/send/function");
        createFastBuilder(MessageType.INVOKE_FUNCTION_REPLY, "/message/function/reply");
        createFastBuilder(MessageType.REGISTER, "/register");
        createFastBuilder(MessageType.UN_REGISTER, "/unregister");
        createFastBuilder(MessageType.REQUEST_FIRMWARE, "/firmware/pull");
        createFastBuilder(MessageType.REQUEST_FIRMWARE_REPLY, "/firmware/pull/reply");
        createFastBuilder(MessageType.REPORT_FIRMWARE, "/firmware/report");
        createFastBuilder(MessageType.UPGRADE_FIRMWARE_PROGRESS, "/firmware/progress");
        createFastBuilder(MessageType.UPGRADE_FIRMWARE, "/firmware/push");
        createFastBuilder(MessageType.UPGRADE_FIRMWARE_REPLY, "/firmware/push/reply");
        createFastBuilder(MessageType.UNKNOWN, "/message/unknown");
        createFastBuilder(MessageType.LOG, "/message/log");
        createFastBuilder(MessageType.DIRECT, "/message/direct");
        createFastBuilder(MessageType.UPDATE_TAG, "/message/tags/update");
        createFastBuilder(MessageType.ONLINE, "/online");
        createFastBuilder(MessageType.OFFLINE, "/offline");
        createFastBuilder(MessageType.DISCONNECT, "/disconnect");
        createFastBuilder(MessageType.DISCONNECT_REPLY, "/disconnect/reply");
        createFastBuilder(MessageType.CHILD, (BiConsumer<Message, StringBuilder>) (message2, sb2) -> {
            DeviceMessage childDeviceMessage = ((ChildDeviceMessage) message2).getChildDeviceMessage();
            if (childDeviceMessage instanceof DeviceMessage) {
                sb2.append("/message/children/").append(childDeviceMessage.getDeviceId());
            } else {
                sb2.append("/message/children");
            }
            appendDeviceMessageTopic(childDeviceMessage, sb2);
        });
        createFastBuilder(MessageType.CHILD_REPLY, (BiConsumer<Message, StringBuilder>) (message3, sb3) -> {
            DeviceMessage childDeviceMessage = ((ChildDeviceMessageReply) message3).getChildDeviceMessage();
            if (childDeviceMessage instanceof DeviceMessage) {
                sb3.append("/message/children/reply/").append(childDeviceMessage.getDeviceId());
            } else {
                sb3.append("/message/children/reply");
            }
            appendDeviceMessageTopic(childDeviceMessage, sb3);
        });
        createFastBuilder(MessageType.DERIVED_METADATA, "/metadata/derived");
    }
}
