/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.community.rule.engine.scene;

import java.time.Duration;
import java.util.Map;
import java.util.function.Function;
import org.apache.commons.collections4.CollectionUtils;
import org.hswebframework.ezorm.rdb.executor.SqlRequest;
import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.community.PropertyConstants;
import org.jetlinks.community.rule.engine.scene.SceneData;
import org.jetlinks.community.rule.engine.scene.SceneFilter;
import org.jetlinks.community.rule.engine.scene.SceneRule;
import org.jetlinks.community.rule.engine.scene.Trigger;
import org.jetlinks.community.rule.engine.scene.term.limit.ShakeLimitGrouping;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.trace.TraceHolder;
import org.jetlinks.core.utils.FluxUtils;
import org.jetlinks.reactor.ql.ReactorQL;
import org.jetlinks.reactor.ql.ReactorQLContext;
import org.jetlinks.reactor.ql.ReactorQLRecord;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
import org.jetlinks.rule.engine.api.task.TaskExecutor;
import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
import org.jetlinks.rule.engine.defaults.AbstractTaskExecutor;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.function.Function3;
import reactor.util.context.ContextView;

public class SceneTaskExecutorProvider
implements TaskExecutorProvider {
    private static final Logger log = LoggerFactory.getLogger(SceneTaskExecutorProvider.class);
    private static final int BACKPRESSURE_BUFFER_MAX_SIZE = Integer.getInteger("scene.backpressure-buffer-size", 100000);
    public static final String EXECUTOR = "scene";
    private final EventBus eventBus;
    private final SceneFilter filter;

    public String getExecutor() {
        return EXECUTOR;
    }

    public Mono<TaskExecutor> createTask(ExecutionContext context) {
        return Mono.just((Object)((Object)new SceneTaskExecutor(context)));
    }

    public SceneTaskExecutorProvider(EventBus eventBus, SceneFilter filter) {
        this.eventBus = eventBus;
        this.filter = filter;
    }

    class SceneTaskExecutor
    extends AbstractTaskExecutor {
        private SceneRule rule;

        public SceneTaskExecutor(ExecutionContext context) {
            super(context);
            this.load();
        }

        public String getName() {
            return this.context.getJob().getName();
        }

        protected Disposable doStart() {
            this.disposable = this.init();
            return this.disposable;
        }

        public void validate() {
            this.rule.validate();
        }

        public void reload() {
            this.load();
            this.doStart();
        }

        private void load() {
            SceneRule sceneRule = (SceneRule)FastBeanCopier.copy((Object)this.context.getJob().getConfiguration(), (Object)new SceneRule(), (String[])new String[0]);
            sceneRule.validate();
            this.rule = sceneRule;
        }

        private Object getDataId(Map<String, Object> data) {
            Object header = data.get("headers");
            Object id = header instanceof Map ? ((Map)header).get(PropertyConstants.uid.getKey()) : data.get(PropertyConstants.uid.getKey());
            if (null == id) {
                id = IDGenerator.RANDOM.generate();
            }
            return id;
        }

        private ReactorQLContext createReactorQLContext() {
            return ReactorQLContext.ofDatasource(table -> {
                if (table.startsWith("/")) {
                    return (Publisher)this.subscribe((String)table).as(FluxUtils.distinct(this::getDataId, (Duration)Duration.ofSeconds(1L)));
                }
                return this.context.getInput().accept().flatMap(RuleData::dataToMap);
            });
        }

        private Disposable init() {
            Flux source;
            boolean useBranch;
            SqlRequest request;
            if (this.disposable != null) {
                this.disposable.dispose();
            }
            if ((request = this.rule.createSql(!(useBranch = CollectionUtils.isNotEmpty(this.rule.getBranches())))).isEmpty()) {
                source = this.context.getInput().accept().flatMap(RuleData::dataToMap);
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("init scene [{}:{}], sql:{}", new Object[]{this.rule.getId(), this.rule.getName(), request.toNativeSql()});
                }
                ReactorQLContext qlContext = this.createReactorQLContext();
                for (Object parameter : request.getParameters()) {
                    qlContext.bind(parameter);
                }
                source = ReactorQL.builder().sql(new String[]{request.getSql()}).build().start(qlContext).map(ReactorQLRecord::asMap);
            }
            if (useBranch) {
                return this.rule.createBranchHandler((Flux<Map<String, Object>>)source, (Function3<Integer, String, Map<String, Object>, Mono<Void>>)((Function3)(idx, nodeId, data) -> {
                    if (log.isDebugEnabled()) {
                        log.debug("scene [{}] branch [{}] execute", (Object)this.rule.getId(), nodeId);
                    }
                    RuleData ruleData = this.context.newRuleData(data);
                    return (Mono)this.context.getOutput().write(nodeId, ruleData).onErrorResume(err -> this.context.onError(err, ruleData)).as((Function)this.tracer());
                }));
            }
            Trigger.GroupShakeLimit shakeLimit = this.rule.getTrigger().getShakeLimit();
            if (shakeLimit != null && shakeLimit.isEnabled()) {
                ShakeLimitGrouping<Map<String, Object>> grouping = shakeLimit.createGrouping();
                source = shakeLimit.transfer(source, (time, flux) -> grouping.group((Flux<Map<String, Object>>)flux).flatMap(group -> group.window(time), Integer.MAX_VALUE), (map, total) -> map.put("_total", total));
            }
            return source.flatMap(this::handleOutput).subscribe();
        }

        private Flux<Map<String, Object>> subscribe(String topic) {
            return SceneTaskExecutorProvider.this.eventBus.subscribe(Subscription.builder().justLocal().topics(new String[]{topic}).subscriberId("scene:" + this.rule.getId()).build()).handle((topicPayload, synchronousSink) -> {
                try {
                    synchronousSink.next((Object)topicPayload.bodyToJson(true));
                }
                catch (Throwable err) {
                    log.warn("decode payload error {}", (Object)topicPayload.getTopic(), (Object)err);
                }
            });
        }

        private Mono<Void> handleOutput(RuleData data) {
            return data.dataToMap().filterWhen(map -> {
                SceneData sceneData = new SceneData();
                sceneData.setId((String)IDGenerator.SNOW_FLAKE_STRING.generate());
                sceneData.setRule(this.rule);
                sceneData.setOutput((Map<String, Object>)map);
                log.info("execute scene {} {} : {}", new Object[]{this.rule.getId(), this.rule.getName(), map});
                return SceneTaskExecutorProvider.this.filter.filter(sceneData).defaultIfEmpty((Object)true);
            }).flatMap(map -> ((Mono)this.context.getOutput().write(data.newData(map)).as((Function)this.tracer())).contextWrite(ctx -> TraceHolder.readToContext((ContextView)ctx, (Map)map))).onErrorResume(err -> this.context.onError(err, data)).then();
        }

        private Mono<Void> handleOutput(Map<String, Object> data) {
            return this.handleOutput(this.context.newRuleData(data));
        }

        public Mono<Void> execute(RuleData ruleData) {
            if (CollectionUtils.isNotEmpty(this.rule.getBranches())) {
                if (log.isDebugEnabled()) {
                    log.debug("scene [{}] execute", (Object)this.rule.getId());
                }
                RuleData newData = this.context.newRuleData((Object)ruleData);
                return ((Mono)this.context.getOutput().write(newData).onErrorResume(err -> this.context.onError(err, ruleData)).as((Function)this.tracer())).then();
            }
            return this.handleOutput(ruleData);
        }
    }
}

