package org.jetlinks.community.rule.engine.executor;

import com.google.common.collect.Maps;
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.commons.collections4.MapUtils;
import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.id.IDGenerator;
import org.hswebframework.web.utils.TemplateParser;
import org.jetlinks.community.relation.utils.VariableSource;
import org.jetlinks.community.rule.engine.executor.device.DeviceSelectorProviders;
import org.jetlinks.community.rule.engine.executor.device.DeviceSelectorSpec;
import org.jetlinks.core.device.DeviceOperator;
import org.jetlinks.core.device.DeviceRegistry;
import org.jetlinks.core.enums.ErrorCode;
import org.jetlinks.core.exception.DeviceOperationException;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.MessageType;
import org.jetlinks.core.message.RepayableDeviceMessage;
import org.jetlinks.core.message.function.FunctionInvokeMessage;
import org.jetlinks.core.message.property.ReadPropertyMessage;
import org.jetlinks.core.message.property.WritePropertyMessage;
import org.jetlinks.reactor.ql.supports.DefaultPropertyFeature;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.RuleDataHelper;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
import org.jetlinks.rule.engine.api.task.TaskExecutor;
import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
import org.jetlinks.rule.engine.defaults.FunctionTaskExecutor;
import org.reactivestreams.Publisher;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@Component
/* loaded from: input_file:org/jetlinks/community/rule/engine/executor/DeviceMessageSendTaskExecutorProvider.class */
public class DeviceMessageSendTaskExecutorProvider implements TaskExecutorProvider {
    public static final String EXECUTOR = "device-message-sender";
    private final DeviceRegistry registry;
    private final DeviceSelectorBuilder selectorBuilder;

    /* loaded from: input_file:org/jetlinks/community/rule/engine/executor/DeviceMessageSendTaskExecutorProvider$DeviceMessageSendConfig.class */
    public static class DeviceMessageSendConfig {
        private String deviceId;
        private String productId;
        private DeviceSelectorSpec selectorSpec;
        private String from;
        private Map<String, Object> message;
        private boolean async;
        private Map<String, Object> responseHeaders;
        private Duration timeout = Duration.ofSeconds(10);
        private String waitType = "sync";
        private String stateOperator = "ignoreOffline";
        private long delayMillis = 0;

        public Map<String, Object> toMap() {
            Map<String, Object> map = (Map) FastBeanCopier.copy(this, new HashMap(), new String[0]);
            map.put("timeout", this.timeout.toString());
            return map;
        }

        public Flux<DeviceMessage> doSend(Map<String, Object> map, ExecutionContext executionContext, DeviceOperator deviceOperator, RuleData ruleData) {
            HashMap hashMap = new HashMap("pre-node".equals(this.from) ? map : this.message);
            hashMap.put("messageId", IDGenerator.SNOW_FLAKE_STRING.generate());
            hashMap.put("deviceId", deviceOperator.getDeviceId());
            hashMap.put("timestamp", Long.valueOf(System.currentTimeMillis()));
            return ((Mono) Mono.justOrEmpty(MessageType.convertMessage(hashMap)).switchIfEmpty(executionContext.onError(() -> {
                return new DeviceOperationException(ErrorCode.UNSUPPORTED_MESSAGE);
            }, ruleData)).cast(DeviceMessage.class).flatMap(deviceMessage -> {
                return applyMessageExpression((Map<String, Object>) map, deviceMessage);
            }).doOnNext(deviceMessage2 -> {
                deviceMessage2.addHeader(Headers.async, Boolean.valueOf(this.async || !"sync".equals(this.waitType))).addHeader(Headers.sendAndForget, Boolean.valueOf("forget".equals(this.waitType))).addHeader(Headers.timeout, Long.valueOf(this.timeout.toMillis()));
            }).as(mono -> {
                return this.delayMillis > 0 ? mono.delayElement(Duration.ofMillis(this.delayMillis)) : mono;
            })).flatMapMany(deviceMessage3 -> {
                return "forget".equals(this.waitType) ? deviceOperator.messageSender().send(deviceMessage3).then(Mono.empty()) : deviceOperator.messageSender().send(deviceMessage3).onErrorResume(th -> {
                    return deviceMessage3 instanceof RepayableDeviceMessage ? Mono.just(((RepayableDeviceMessage) deviceMessage3).newReply().error(th)) : Mono.error(th);
                });
            });
        }

        private Mono<ReadPropertyMessage> applyMessageExpression(Map<String, Object> map, ReadPropertyMessage readPropertyMessage) {
            return Mono.just(readPropertyMessage);
        }

        private Mono<WritePropertyMessage> applyMessageExpression(Map<String, Object> map, WritePropertyMessage writePropertyMessage) {
            Map properties = writePropertyMessage.getProperties();
            if (!CollectionUtils.isEmpty(properties)) {
                writePropertyMessage.setProperties(Maps.transformValues(properties, obj -> {
                    return VariableSource.of(obj).resolveStatic(map);
                }));
            }
            return Mono.just(writePropertyMessage);
        }

        private Mono<FunctionInvokeMessage> applyMessageExpression(Map<String, Object> map, FunctionInvokeMessage functionInvokeMessage) {
            List inputs = functionInvokeMessage.getInputs();
            return !CollectionUtils.isEmpty(inputs) ? Flux.fromIterable(inputs).flatMap(functionParameter -> {
                Flux resolve = VariableSource.of(functionParameter.getValue()).resolve(map);
                functionParameter.getClass();
                return resolve.doOnNext(functionParameter::setValue);
            }).then(Mono.just(functionInvokeMessage)) : Mono.just(functionInvokeMessage);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public String getDeviceIdInMessage(Map<String, Object> map) {
            String str = (String) this.message.get("deviceId");
            if (StringUtils.hasText(str)) {
                return str.contains("${") ? TemplateParser.parse(str, str2 -> {
                    return (String) DefaultPropertyFeature.GLOBAL.getProperty(str2, map).map(String::valueOf).orElse("");
                }) : str;
            }
            return null;
        }

        private Mono<? extends DeviceMessage> applyMessageExpression(Map<String, Object> map, DeviceMessage deviceMessage) {
            return deviceMessage instanceof ReadPropertyMessage ? applyMessageExpression(map, (ReadPropertyMessage) deviceMessage) : deviceMessage instanceof WritePropertyMessage ? applyMessageExpression(map, (WritePropertyMessage) deviceMessage) : deviceMessage instanceof FunctionInvokeMessage ? applyMessageExpression(map, (FunctionInvokeMessage) deviceMessage) : Mono.just(deviceMessage);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isFixed() {
            return DeviceSelectorProviders.PROVIDER_FIXED.equals(this.from);
        }

        private boolean isPreNode() {
            return "pre-node".equals(this.from);
        }

        public void validate() {
            if (DeviceSelectorProviders.PROVIDER_FIXED.equals(this.from)) {
                MessageType.convertMessage(this.message).orElseThrow(() -> {
                    return new IllegalArgumentException("不支持的消息格式");
                });
            }
        }

        public String getDeviceId() {
            return this.deviceId;
        }

        public String getProductId() {
            return this.productId;
        }

        public DeviceSelectorSpec getSelectorSpec() {
            return this.selectorSpec;
        }

        public String getFrom() {
            return this.from;
        }

        public Duration getTimeout() {
            return this.timeout;
        }

        public Map<String, Object> getMessage() {
            return this.message;
        }

        public boolean isAsync() {
            return this.async;
        }

        public String getWaitType() {
            return this.waitType;
        }

        public String getStateOperator() {
            return this.stateOperator;
        }

        public long getDelayMillis() {
            return this.delayMillis;
        }

        public Map<String, Object> getResponseHeaders() {
            return this.responseHeaders;
        }

        public void setDeviceId(String str) {
            this.deviceId = str;
        }

        public void setProductId(String str) {
            this.productId = str;
        }

        public void setSelectorSpec(DeviceSelectorSpec deviceSelectorSpec) {
            this.selectorSpec = deviceSelectorSpec;
        }

        public void setFrom(String str) {
            this.from = str;
        }

        public void setTimeout(Duration duration) {
            this.timeout = duration;
        }

        public void setMessage(Map<String, Object> map) {
            this.message = map;
        }

        public void setAsync(boolean z) {
            this.async = z;
        }

        public void setWaitType(String str) {
            this.waitType = str;
        }

        public void setStateOperator(String str) {
            this.stateOperator = str;
        }

        public void setDelayMillis(long j) {
            this.delayMillis = j;
        }

        public void setResponseHeaders(Map<String, Object> map) {
            this.responseHeaders = map;
        }
    }

    /* loaded from: input_file:org/jetlinks/community/rule/engine/executor/DeviceMessageSendTaskExecutorProvider$DeviceMessageSendTaskExecutor.class */
    class DeviceMessageSendTaskExecutor extends FunctionTaskExecutor {
        private DeviceMessageSendConfig config;
        private Function<Map<String, Object>, Flux<DeviceOperator>> selector;

        public DeviceMessageSendTaskExecutor(ExecutionContext executionContext) {
            super("发送设备消息", executionContext);
            reload();
        }

        protected Flux<DeviceOperator> selectDevice(Map<String, Object> map) {
            return this.selector.apply(map);
        }

        protected Publisher<RuleData> apply(RuleData ruleData) {
            Map<String, Object> contextMap = RuleDataHelper.toContextMap(ruleData);
            return ("ignoreOffline".equals(this.config.getStateOperator()) ? selectDevice(contextMap).filterWhen((v0) -> {
                return v0.isOnline();
            }) : selectDevice(contextMap)).switchIfEmpty(this.context.onError(() -> {
                return new DeviceOperationException(ErrorCode.SYSTEM_ERROR, "无可用设备");
            }, ruleData)).flatMap(deviceOperator -> {
                return this.config.doSend(contextMap, this.context, deviceOperator, ruleData).onErrorResume(th -> {
                    return this.context.onError(th, ruleData);
                }).subscribeOn(Schedulers.parallel());
            }).map(deviceMessage -> {
                RuleData newRuleData = this.context.newRuleData(ruleData.newData(deviceMessage.toJson()));
                if (this.config.getResponseHeaders() != null) {
                    Map<String, Object> responseHeaders = this.config.getResponseHeaders();
                    newRuleData.getClass();
                    responseHeaders.forEach(newRuleData::setHeader);
                }
                return newRuleData;
            });
        }

        public void validate() {
            if (CollectionUtils.isEmpty(this.context.getJob().getConfiguration())) {
                throw new IllegalArgumentException("配置不能为空");
            }
            ((DeviceMessageSendConfig) FastBeanCopier.copy(this.context.getJob().getConfiguration(), new DeviceMessageSendConfig(), new String[0])).validate();
        }

        public void reload() {
            this.config = (DeviceMessageSendConfig) FastBeanCopier.copy(this.context.getJob().getConfiguration(), new DeviceMessageSendConfig(), new String[0]);
            this.config.validate();
            if (this.config.getSelectorSpec() != null) {
                DeviceSelector createSelector = DeviceMessageSendTaskExecutorProvider.this.selectorBuilder.createSelector(this.config.getSelectorSpec());
                createSelector.getClass();
                this.selector = createSelector::select;
            } else {
                if (StringUtils.hasText(this.config.deviceId)) {
                    this.selector = map -> {
                        return DeviceMessageSendTaskExecutorProvider.this.registry.getDevice(this.config.getDeviceId()).flux();
                    };
                    return;
                }
                if (StringUtils.hasText(this.config.productId)) {
                    DeviceSelector createSelector2 = DeviceMessageSendTaskExecutorProvider.this.selectorBuilder.createSelector(DeviceSelectorProviders.product(this.config.productId));
                    createSelector2.getClass();
                    this.selector = createSelector2::select;
                } else if (this.config.isFixed() && MapUtils.isNotEmpty(this.config.getMessage())) {
                    this.selector = map2 -> {
                        return DeviceMessageSendTaskExecutorProvider.this.registry.getDevice(this.config.getDeviceIdInMessage(map2)).flux();
                    };
                } else {
                    this.selector = map3 -> {
                        return DeviceMessageSendTaskExecutorProvider.this.registry.getDevice((String) map3.getOrDefault("deviceId", this.config.getMessage() == null ? null : this.config.getMessage().get("deviceId"))).flux();
                    };
                }
            }
        }
    }

    public String getExecutor() {
        return EXECUTOR;
    }

    public Mono<TaskExecutor> createTask(ExecutionContext executionContext) {
        return Mono.just(new DeviceMessageSendTaskExecutor(executionContext));
    }

    public DeviceMessageSendTaskExecutorProvider(DeviceRegistry deviceRegistry, DeviceSelectorBuilder deviceSelectorBuilder) {
        this.registry = deviceRegistry;
        this.selectorBuilder = deviceSelectorBuilder;
    }
}
