/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.community.device.message;

import java.util.HashMap;
import java.util.Map;
import org.hswebframework.ezorm.core.StaticMethodReferenceColumn;
import org.hswebframework.ezorm.rdb.mapping.ReactiveQuery;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.community.device.entity.DeviceInstanceEntity;
import org.jetlinks.community.device.service.LocalDeviceInstanceService;
import org.jetlinks.community.gateway.external.Message;
import org.jetlinks.community.gateway.external.SubscribeRequest;
import org.jetlinks.community.gateway.external.SubscriptionProvider;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.DeviceMessageReply;
import org.jetlinks.core.message.MessageType;
import org.jetlinks.core.message.RepayableDeviceMessage;
import org.jetlinks.core.utils.TopicUtils;
import org.reactivestreams.Publisher;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Component
public class DeviceMessageSendSubscriptionProvider
implements SubscriptionProvider {
    private final DeviceRegistry registry;
    private final LocalDeviceInstanceService instanceService;

    public String id() {
        return "device-message-sender";
    }

    public String name() {
        return "\u8bbe\u5907\u6d88\u606f\u53d1\u9001";
    }

    public String[] getTopicPattern() {
        return new String[]{"/device-message-sender/*/*"};
    }

    public Flux<Message> subscribe(SubscribeRequest request) {
        String topic = request.getTopic();
        Map variables = TopicUtils.getPathVariables((String)"/device-message-sender/{productId}/{deviceId}", (String)topic);
        String deviceId = (String)variables.get("deviceId");
        String productId = (String)variables.get("productId");
        if ("*".equals(deviceId)) {
            return ((ReactiveQuery)((ReactiveQuery)this.instanceService.createQuery().select(new StaticMethodReferenceColumn[]{DeviceInstanceEntity::getId})).where(DeviceInstanceEntity::getProductId, (Object)productId)).fetch().map(DeviceInstanceEntity::getId).flatMap(id -> this.doSend(request.getId(), topic, (String)id, (Map<String, Object>)new HashMap<String, Object>(request.getParameter())));
        }
        return Flux.fromArray((Object[])deviceId.split("[,]")).flatMap(id -> this.doSend(request.getId(), topic, (String)id, (Map<String, Object>)new HashMap<String, Object>(request.getParameter())));
    }

    public Flux<Message> doSend(String requestId, String topic, String deviceId, Map<String, Object> message) {
        message.put("messageId", IDGenerator.SNOW_FLAKE_STRING.generate());
        message.put("deviceId", deviceId);
        RepayableDeviceMessage msg = MessageType.convertMessage(message).filter(RepayableDeviceMessage.class::isInstance).map(RepayableDeviceMessage.class::cast).orElseThrow(() -> new UnsupportedOperationException("\u4e0d\u652f\u6301\u7684\u6d88\u606f\u683c\u5f0f"));
        return this.registry.getDevice(deviceId).switchIfEmpty(Mono.error(() -> new DeviceOperationException(ErrorCode.CLIENT_OFFLINE))).flatMapMany(deviceOperator -> deviceOperator.messageSender().send((Publisher)Mono.just((Object)msg))).map(reply -> Message.success((String)requestId, (String)topic, (Object)reply)).onErrorResume(error -> {
            DeviceMessageReply reply = msg.newReply();
            if (error instanceof DeviceOperationException) {
                reply.error(((DeviceOperationException)error).getCode());
            } else {
                reply.error(error);
            }
            return Mono.just((Object)Message.success((String)requestId, (String)topic, (Object)reply));
        });
    }

    public DeviceMessageSendSubscriptionProvider(DeviceRegistry registry, LocalDeviceInstanceService instanceService) {
        this.registry = registry;
        this.instanceService = instanceService;
    }
}

