package org.jetlinks.community.rule.engine.scene;

import java.time.Duration;
import java.util.Map;
import org.apache.commons.collections4.CollectionUtils;
import org.hswebframework.ezorm.rdb.executor.SqlRequest;
import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.community.PropertyConstants;
import org.jetlinks.community.rule.engine.scene.Trigger;
import org.jetlinks.community.rule.engine.scene.term.limit.ShakeLimitGrouping;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.trace.TraceHolder;
import org.jetlinks.core.utils.FluxUtils;
import org.jetlinks.reactor.ql.ReactorQL;
import org.jetlinks.reactor.ql.ReactorQLContext;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
import org.jetlinks.rule.engine.api.task.TaskExecutor;
import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
import org.jetlinks.rule.engine.defaults.AbstractTaskExecutor;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/community/rule/engine/scene/SceneTaskExecutorProvider.class */
/**
 * {@link TaskExecutorProvider} for the {@value #EXECUTOR} executor type.
 *
 * <p>Every task created by this provider is a {@link SceneTaskExecutor} that reads a
 * {@link SceneRule} from the job configuration, builds a reactive source for it and forwards
 * matching data to the task output.
 */
public class SceneTaskExecutorProvider implements TaskExecutorProvider {
    private static final Logger log = LoggerFactory.getLogger(SceneTaskExecutorProvider.class);

    /**
     * Value of the {@code scene.backpressure-buffer-size} system property (default 100000).
     * NOTE(review): not referenced anywhere in this class as it stands; kept so the property is
     * still read at class initialization exactly as before.
     */
    private static final int BACKPRESSURE_BUFFER_MAX_SIZE = Integer.getInteger("scene.backpressure-buffer-size", 100000);

    /** Executor name reported by {@link #getExecutor()}. */
    public static final String EXECUTOR = "scene";

    private final EventBus eventBus;
    private final SceneFilter filter;

    /**
     * @param eventBus    event bus used to subscribe to topics referenced by a scene's SQL
     * @param sceneFilter filter consulted before non-branch scene data is written to the output
     */
    public SceneTaskExecutorProvider(EventBus eventBus, SceneFilter sceneFilter) {
        this.eventBus = eventBus;
        this.filter = sceneFilter;
    }

    /**
     * @return the executor name, {@value #EXECUTOR}
     */
    public String getExecutor() {
        return EXECUTOR;
    }

    /**
     * Creates a new scene task bound to the given execution context.
     *
     * <p>The executor is constructed eagerly, so a rule that fails to load or validate throws
     * from this method rather than being signalled through the returned {@link Mono}.
     *
     * @param executionContext context supplying the job configuration, input and output
     * @return a {@link Mono} emitting the new executor
     */
    public Mono<TaskExecutor> createTask(ExecutionContext executionContext) {
        return Mono.just(new SceneTaskExecutor(executionContext));
    }

    /**
     * Task executor that runs a single {@link SceneRule}.
     *
     * <p>Deliberately a non-static inner class: it uses the enclosing provider's
     * {@code eventBus} and {@code filter}.
     */
    /* loaded from: input_file:org/jetlinks/community/rule/engine/scene/SceneTaskExecutorProvider$SceneTaskExecutor.class */
    class SceneTaskExecutor extends AbstractTaskExecutor {
        /** Rule copied from the job configuration by {@link #load()}. */
        private SceneRule rule;

        /**
         * Loads and validates the rule from the job configuration of {@code executionContext}.
         */
        public SceneTaskExecutor(ExecutionContext executionContext) {
            super(executionContext);
            load();
        }

        /**
         * @return the name of the job this task belongs to
         */
        public String getName() {
            return this.context.getJob().getName();
        }

        /**
         * Builds the processing pipeline and records its handle in {@code disposable}.
         *
         * @return the handle of the newly started pipeline
         */
        protected Disposable doStart() {
            Disposable started = init();
            this.disposable = started;
            return started;
        }

        /** Delegates to {@link SceneRule#validate()} on the currently loaded rule. */
        public void validate() {
            this.rule.validate();
        }

        /** Re-reads the rule from the job configuration and restarts the pipeline. */
        public void reload() {
            load();
            doStart();
        }

        /**
         * Copies the job configuration into a fresh {@link SceneRule} and validates it.
         * The field is assigned only after validation, so a failing configuration leaves the
         * previously loaded rule in place.
         */
        private void load() {
            SceneRule loaded = (SceneRule) FastBeanCopier.copy(this.context.getJob().getConfiguration(), new SceneRule(), new String[0]);
            loaded.validate();
            this.rule = loaded;
        }

        /**
         * @return {@code true} when the loaded rule declares at least one branch
         */
        private boolean hasBranches() {
            return CollectionUtils.isNotEmpty(this.rule.getBranches());
        }

        /**
         * Resolves the identifier used to de-duplicate event-bus messages.
         *
         * <p>When the message has a {@code headers} entry that is a map, the uid is read from
         * it; otherwise it is read from the message itself. A random id is generated when no
         * uid is present.
         *
         * @param map decoded message
         * @return the uid of the message, never {@code null}
         */
        private Object getDataId(Map<String, Object> map) {
            String uidKey = PropertyConstants.uid.getKey();
            Object headers = map.get("headers");
            Object id = headers instanceof Map ? ((Map<?, ?>) headers).get(uidKey) : map.get(uidKey);
            if (id == null) {
                id = IDGenerator.RANDOM.generate();
            }
            return id;
        }

        /**
         * Creates the ReactorQL context whose data sources are resolved by name: names starting
         * with {@code "/"} are subscribed as event-bus topics, de-duplicated by
         * {@link #getDataId(Map)} with a one second duration; any other name reads from the
         * task input.
         */
        private ReactorQLContext createReactorQLContext() {
            // The raw Publisher cast is kept as found; FluxUtils.distinct's signature is not visible here.
            return ReactorQLContext.ofDatasource(source ->
                source.startsWith("/")
                    ? (Publisher) subscribe(source).as(FluxUtils.distinct(this::getDataId, Duration.ofSeconds(1L)))
                    : this.context.getInput().accept().flatMap(data -> data.dataToMap()));
        }

        /**
         * Disposes any previous pipeline and starts a new one for the current rule.
         *
         * <p>Rules with branches are handed to the rule's branch handler; other rules go
         * through the optional shake limit and then {@link #handleOutput(Map)}.
         *
         * @return the handle of the started pipeline
         */
        private Disposable init() {
            if (this.disposable != null) {
                this.disposable.dispose();
            }
            boolean branched = hasBranches();
            Flux<Map<String, Object>> source = createSource(this.rule.createSql(!branched));
            if (branched) {
                return startBranches(source);
            }
            return applyShakeLimit(source).flatMap(this::handleOutput).subscribe();
        }

        /**
         * Builds the data source: the raw task input when the rule has no SQL, otherwise the
         * result rows of the rule's SQL executed by ReactorQL.
         */
        private Flux<Map<String, Object>> createSource(SqlRequest sql) {
            if (sql.isEmpty()) {
                return this.context.getInput().accept().flatMap(data -> data.dataToMap());
            }
            if (SceneTaskExecutorProvider.log.isDebugEnabled()) {
                SceneTaskExecutorProvider.log.debug("init scene [{}:{}], sql:{}", this.rule.getId(), this.rule.getName(), sql.toNativeSql());
            }
            ReactorQLContext qlContext = createReactorQLContext();
            for (Object parameter : sql.getParameters()) {
                qlContext.bind(parameter);
            }
            return ReactorQL.builder().sql(new String[]{sql.getSql()}).build().start(qlContext).map(record -> record.asMap());
        }

        /**
         * Hands the source to the rule's branch handler; each branch hit is written to the
         * output under the branch name, with write errors routed to the context error handler.
         */
        private Disposable startBranches(Flux<Map<String, Object>> source) {
            return this.rule.createBranchHandler(source, (index, branchName, data) -> {
                if (SceneTaskExecutorProvider.log.isDebugEnabled()) {
                    SceneTaskExecutorProvider.log.debug("scene [{}] branch [{}] execute", this.rule.getId(), branchName);
                }
                RuleData out = this.context.newRuleData(data);
                return (Mono) this.context.getOutput().write(branchName, out).onErrorResume(err -> {
                    return this.context.onError(err, out);
                }).as(tracer());
            });
        }

        /**
         * Applies the trigger's shake limit when one is configured and enabled; otherwise
         * returns the source unchanged. The count supplied by the limiter is stored in the
         * data under {@code "_total"}.
         */
        private Flux<Map<String, Object>> applyShakeLimit(Flux<Map<String, Object>> source) {
            Trigger.GroupShakeLimit shakeLimit = this.rule.getTrigger().getShakeLimit();
            if (shakeLimit == null || !shakeLimit.isEnabled()) {
                return source;
            }
            ShakeLimitGrouping<Map<String, Object>> grouping = shakeLimit.createGrouping();
            // The inner lambda parameter must not reuse the outer name `flux`: a lambda parameter
            // cannot redeclare a variable of the enclosing scope.
            return shakeLimit.transfer(source, (duration, flux) -> {
                return grouping.group(flux).flatMap(group -> group.window(duration), Integer.MAX_VALUE);
            }, (data, total) -> {
                data.put("_total", total);
            });
        }

        /**
         * Subscribes to a local event-bus topic and decodes each payload as JSON.
         * Payloads that fail to decode are logged and dropped (best effort), so a single bad
         * message does not terminate the subscription.
         *
         * @param topic topic to subscribe to
         */
        private Flux<Map<String, Object>> subscribe(String topic) {
            Subscription subscription = Subscription.builder().justLocal().topics(new String[]{topic}).subscriberId("scene:" + this.rule.getId()).build();
            return SceneTaskExecutorProvider.this.eventBus.subscribe(subscription).handle((payload, sink) -> {
                try {
                    sink.next(payload.bodyToJson(true));
                } catch (Throwable err) {
                    SceneTaskExecutorProvider.log.warn("decode payload error {}", payload.getTopic(), err);
                }
            });
        }

        /**
         * Passes the rule data through the provider's {@link SceneFilter} and writes the
         * accepted data to the output. An empty filter result counts as accepted. Errors are
         * routed to the context error handler.
         */
        private Mono<Void> handleOutput(RuleData ruleData) {
            return ruleData.dataToMap().filterWhen(output -> {
                SceneData sceneData = new SceneData();
                sceneData.setId((String) IDGenerator.SNOW_FLAKE_STRING.generate());
                sceneData.setRule(this.rule);
                sceneData.setOutput(output);
                SceneTaskExecutorProvider.log.info("execute scene {} {} : {}", this.rule.getId(), this.rule.getName(), output);
                return SceneTaskExecutorProvider.this.filter.filter(sceneData).defaultIfEmpty(true);
            }).flatMap(output -> {
                return ((Mono) this.context.getOutput().write(ruleData.newData(output)).as(tracer())).contextWrite(ctx -> {
                    return TraceHolder.readToContext(ctx, output);
                });
            }).onErrorResume(err -> {
                return this.context.onError(err, ruleData);
            }).then();
        }

        /** Wraps the map in a new {@link RuleData} and delegates to {@link #handleOutput(RuleData)}. */
        private Mono<Void> handleOutput(Map<String, Object> map) {
            return handleOutput(this.context.newRuleData(map));
        }

        /**
         * Executes the scene for externally supplied data.
         *
         * <p>Without branches the data takes the filtered {@link #handleOutput(RuleData)}
         * path; with branches it is written straight to the output.
         *
         * @param ruleData data to execute the scene with
         * @return a {@link Mono} completing when the write has finished
         */
        public Mono<Void> execute(RuleData ruleData) {
            if (!hasBranches()) {
                return handleOutput(ruleData);
            }
            if (SceneTaskExecutorProvider.log.isDebugEnabled()) {
                SceneTaskExecutorProvider.log.debug("scene [{}] execute", this.rule.getId());
            }
            return ((Mono) this.context.getOutput().write(this.context.newRuleData(ruleData)).onErrorResume(err -> {
                return this.context.onError(err, ruleData);
            }).as(tracer())).then();
        }
    }
}
