/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.community.rule.engine.executor;

import java.util.Collection;
import java.util.Optional;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.reactor.ql.ReactorQL;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.RuleDataHelper;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
import org.jetlinks.rule.engine.api.task.TaskExecutor;
import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
import org.jetlinks.rule.engine.defaults.AbstractTaskExecutor;
import org.reactivestreams.Publisher;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Component
public class ReactorQLTaskExecutorProvider
implements TaskExecutorProvider {
    private final EventBus eventBus;

    public String getExecutor() {
        return "reactor-ql";
    }

    public Mono<TaskExecutor> createTask(ExecutionContext context) {
        return Mono.just((Object)((Object)new ReactorQLTaskExecutor(context)));
    }

    public ReactorQLTaskExecutorProvider(EventBus eventBus) {
        this.eventBus = eventBus;
    }

    class ReactorQLTaskExecutor
    extends AbstractTaskExecutor {
        private ReactorQL reactorQL;

        public ReactorQLTaskExecutor(ExecutionContext context) {
            super(context);
            this.reactorQL = this.createQl();
        }

        public String getName() {
            return "ReactorQL";
        }

        protected Disposable doStart() {
            Flux dataStream = !CollectionUtils.isEmpty((Collection)this.context.getJob().getInputs()) ? this.context.getInput().accept().flatMap(ruleData -> this.reactorQL.start(Flux.just((Object)RuleDataHelper.toContextMap((RuleData)ruleData))).map(arg_0 -> ((RuleData)ruleData).newData(arg_0)).onErrorResume(err -> {
                this.context.getLogger().error(err.getMessage(), new Object[]{err});
                return this.context.onError(err, null).then(Mono.empty());
            })) : this.reactorQL.start(table -> {
                if (table == null || table.equalsIgnoreCase("dual")) {
                    return Flux.just((Object)1);
                }
                if (table.startsWith("/")) {
                    return ReactorQLTaskExecutorProvider.this.eventBus.subscribe(Subscription.builder().subscriberId("rule-engine:".concat(this.context.getInstanceId()).concat(":").concat(this.context.getJob().getNodeId())).topics(new String[]{table}).local().build()).flatMap(payload -> {
                        try {
                            return Mono.just((Object)payload.bodyToJson(true));
                        }
                        catch (Throwable error) {
                            return this.context.onError(error, null);
                        }
                    });
                }
                return Flux.just((Object)1);
            }).cast(Object.class);
            return dataStream.flatMap(result -> {
                RuleData data = this.context.newRuleData(result);
                return this.context.getOutput().write((Publisher)Mono.just((Object)data)).then(this.context.fireEvent("result", data));
            }).onErrorResume(err -> this.context.onError(err, null)).subscribe();
        }

        protected ReactorQL createQl() {
            try {
                ReactorQL.Builder builder = Optional.ofNullable(this.context.getJob().getConfiguration()).map(map -> map.get("sql")).map(String::valueOf).map(arg_0 -> ReactorQLTaskExecutor.lambda$createQl$68(ReactorQL.builder(), arg_0)).orElseThrow(() -> new IllegalArgumentException("\u914d\u7f6esql\u9519\u8bef"));
                return builder.build();
            }
            catch (Exception e) {
                throw new IllegalArgumentException("SQL\u683c\u5f0f\u9519\u8bef:" + e.getMessage(), e);
            }
        }

        public void reload() {
            this.reactorQL = this.createQl();
            if (this.disposable != null) {
                this.disposable.dispose();
            }
            this.start();
        }

        public void validate() {
            this.createQl();
        }

        private static /* synthetic */ ReactorQL.Builder lambda$createQl$68(ReactorQL.Builder rec$, String xva$0) {
            return rec$.sql(new String[]{xva$0});
        }
    }
}

