package org.jetlinks.community.rule.engine.executor;

import java.util.Optional;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.reactor.ql.ReactorQL;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.RuleDataHelper;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
import org.jetlinks.rule.engine.api.task.TaskExecutor;
import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
import org.jetlinks.rule.engine.defaults.AbstractTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Component
/* loaded from: input_file:org/jetlinks/community/rule/engine/executor/ReactorQLTaskExecutorProvider.class */
public class ReactorQLTaskExecutorProvider implements TaskExecutorProvider {
    private final EventBus eventBus;

    /* loaded from: input_file:org/jetlinks/community/rule/engine/executor/ReactorQLTaskExecutorProvider$ReactorQLTaskExecutor.class */
    class ReactorQLTaskExecutor extends AbstractTaskExecutor {
        private ReactorQL reactorQL;

        public ReactorQLTaskExecutor(ExecutionContext executionContext) {
            super(executionContext);
            this.reactorQL = createQl();
        }

        public String getName() {
            return "ReactorQL";
        }

        protected Disposable doStart() {
            return (!CollectionUtils.isEmpty(this.context.getJob().getInputs()) ? this.context.getInput().accept().flatMap(ruleData -> {
                Flux start = this.reactorQL.start(Flux.just(RuleDataHelper.toContextMap(ruleData)));
                ruleData.getClass();
                return start.map((v1) -> {
                    return r1.newData(v1);
                }).onErrorResume(th -> {
                    this.context.getLogger().error(th.getMessage(), new Object[]{th});
                    return this.context.onError(th, (RuleData) null).then(Mono.empty());
                });
            }) : this.reactorQL.start(str -> {
                return (str == null || str.equalsIgnoreCase("dual")) ? Flux.just(1) : str.startsWith("/") ? ReactorQLTaskExecutorProvider.this.eventBus.subscribe(Subscription.builder().subscriberId("rule-engine:".concat(this.context.getInstanceId()).concat(":").concat(this.context.getJob().getNodeId())).topics(new String[]{str}).local().build()).flatMap(topicPayload -> {
                    try {
                        return Mono.just(topicPayload.bodyToJson(true));
                    } catch (Throwable th) {
                        return this.context.onError(th, (RuleData) null);
                    }
                }) : Flux.just(1);
            }).cast(Object.class)).flatMap(obj -> {
                RuleData newRuleData = this.context.newRuleData(obj);
                return this.context.getOutput().write(Mono.just(newRuleData)).then(this.context.fireEvent("result", newRuleData));
            }).onErrorResume(th -> {
                return this.context.onError(th, (RuleData) null);
            }).subscribe();
        }

        protected ReactorQL createQl() {
            try {
                Optional map = Optional.ofNullable(this.context.getJob().getConfiguration()).map(map2 -> {
                    return map2.get("sql");
                }).map(String::valueOf);
                ReactorQL.Builder builder = ReactorQL.builder();
                builder.getClass();
                return ((ReactorQL.Builder) map.map(str -> {
                    return builder.sql(new String[]{str});
                }).orElseThrow(() -> {
                    return new IllegalArgumentException("配置sql错误");
                })).build();
            } catch (Exception e) {
                throw new IllegalArgumentException("SQL格式错误:" + e.getMessage(), e);
            }
        }

        public void reload() {
            this.reactorQL = createQl();
            if (this.disposable != null) {
                this.disposable.dispose();
            }
            start();
        }

        public void validate() {
            createQl();
        }
    }

    public String getExecutor() {
        return "reactor-ql";
    }

    public Mono<TaskExecutor> createTask(ExecutionContext executionContext) {
        return Mono.just(new ReactorQLTaskExecutor(executionContext));
    }

    public ReactorQLTaskExecutorProvider(EventBus eventBus) {
        this.eventBus = eventBus;
    }
}
