package org.jetlinks.community.rule.engine.executor;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.community.utils.TimeUtils;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.RuleDataHelper;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
import org.jetlinks.rule.engine.api.task.TaskExecutor;
import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
import org.jetlinks.rule.engine.defaults.AbstractTaskExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

@Component
/* loaded from: input_file:org/jetlinks/community/rule/engine/executor/DelayTaskExecutorProvider.class */
public class DelayTaskExecutorProvider implements TaskExecutorProvider {
    public static final String EXECUTOR = "delay";
    private final Scheduler scheduler;
    private static final Logger log = LoggerFactory.getLogger(DelayTaskExecutorProvider.class);
    static ObjectMapper objectMapper = new ObjectMapper();

    /* loaded from: input_file:org/jetlinks/community/rule/engine/executor/DelayTaskExecutorProvider$DelayTaskExecutor.class */
    static class DelayTaskExecutor extends AbstractTaskExecutor {
        private DelayTaskExecutorConfig config;
        private final Scheduler scheduler;

        public DelayTaskExecutor(ExecutionContext executionContext, Scheduler scheduler) {
            super(executionContext);
            this.scheduler = scheduler;
            init();
        }

        protected Disposable doStart() {
            if (this.disposable != null) {
                this.disposable.dispose();
            }
            Flux<RuleData> create = this.config.create(this.context.getInput().accept(), this.context, this.scheduler);
            ExecutionContext executionContext = this.context;
            executionContext.getClass();
            return create.map((v1) -> {
                return r1.newRuleData(v1);
            }).flatMap(ruleData -> {
                return this.context.fireEvent("result", ruleData).then(this.context.getOutput().write(Mono.just(ruleData)));
            }).onErrorResume(th -> {
                return this.context.onError(th, (RuleData) null);
            }).subscribe();
        }

        void init() {
            this.config = DelayTaskExecutorConfig.of(this.context.getJob().getConfiguration());
        }

        public void reload() {
            init();
            this.disposable = doStart();
        }

        public String getName() {
            return "延迟";
        }
    }

    /* loaded from: input_file:org/jetlinks/community/rule/engine/executor/DelayTaskExecutorProvider$DelayTaskExecutorConfig.class */
    public static class DelayTaskExecutorConfig {
        private PauseType pauseType;
        private int timeout;
        private ChronoUnit timeoutUnits;
        private int rate;
        private int nbRateUnits;
        private ChronoUnit rateUnits;
        private int randomFirst;
        private int randomLast;
        private ChronoUnit randomUnits;
        private String groupExpression;
        private boolean errorOnDrop;

        public Flux<RuleData> create(Flux<RuleData> flux, ExecutionContext executionContext, Scheduler scheduler) {
            return this.pauseType.create(this, flux, executionContext, scheduler);
        }

        public static DelayTaskExecutorConfig of(Map<String, Object> map) {
            return (DelayTaskExecutorConfig) FastBeanCopier.copy(map, new DelayTaskExecutorConfig(), new String[0]);
        }

        public PauseType getPauseType() {
            return this.pauseType;
        }

        public int getTimeout() {
            return this.timeout;
        }

        public ChronoUnit getTimeoutUnits() {
            return this.timeoutUnits;
        }

        public int getRate() {
            return this.rate;
        }

        public int getNbRateUnits() {
            return this.nbRateUnits;
        }

        public ChronoUnit getRateUnits() {
            return this.rateUnits;
        }

        public int getRandomFirst() {
            return this.randomFirst;
        }

        public int getRandomLast() {
            return this.randomLast;
        }

        public ChronoUnit getRandomUnits() {
            return this.randomUnits;
        }

        public String getGroupExpression() {
            return this.groupExpression;
        }

        public boolean isErrorOnDrop() {
            return this.errorOnDrop;
        }

        public void setPauseType(PauseType pauseType) {
            this.pauseType = pauseType;
        }

        public void setTimeout(int i) {
            this.timeout = i;
        }

        public void setTimeoutUnits(ChronoUnit chronoUnit) {
            this.timeoutUnits = chronoUnit;
        }

        public void setRate(int i) {
            this.rate = i;
        }

        public void setNbRateUnits(int i) {
            this.nbRateUnits = i;
        }

        public void setRateUnits(ChronoUnit chronoUnit) {
            this.rateUnits = chronoUnit;
        }

        public void setRandomFirst(int i) {
            this.randomFirst = i;
        }

        public void setRandomLast(int i) {
            this.randomLast = i;
        }

        public void setRandomUnits(ChronoUnit chronoUnit) {
            this.randomUnits = chronoUnit;
        }

        public void setGroupExpression(String str) {
            this.groupExpression = str;
        }

        public void setErrorOnDrop(boolean z) {
            this.errorOnDrop = z;
        }
    }

    /* loaded from: input_file:org/jetlinks/community/rule/engine/executor/DelayTaskExecutorProvider$PauseType.class */
    public enum PauseType {
        delayv { // from class: org.jetlinks.community.rule.engine.executor.DelayTaskExecutorProvider.PauseType.1
            @Override // org.jetlinks.community.rule.engine.executor.DelayTaskExecutorProvider.PauseType
            Flux<RuleData> create(DelayTaskExecutorConfig delayTaskExecutorConfig, Flux<RuleData> flux, ExecutionContext executionContext, Scheduler scheduler) {
                return flux.delayUntil(ruleData -> {
                    Map contextMap = RuleDataHelper.toContextMap(ruleData);
                    if (contextMap.get(DelayTaskExecutorProvider.EXECUTOR) == null) {
                        return Mono.never();
                    }
                    Duration parse = TimeUtils.parse(String.valueOf(contextMap.get(DelayTaskExecutorProvider.EXECUTOR)));
                    executionContext.getLogger().debug("delay execution {} ", new Object[]{parse});
                    return Mono.delay(parse, scheduler);
                });
            }
        },
        delay { // from class: org.jetlinks.community.rule.engine.executor.DelayTaskExecutorProvider.PauseType.2
            @Override // org.jetlinks.community.rule.engine.executor.DelayTaskExecutorProvider.PauseType
            Flux<RuleData> create(DelayTaskExecutorConfig delayTaskExecutorConfig, Flux<RuleData> flux, ExecutionContext executionContext, Scheduler scheduler) {
                return flux.delayUntil(ruleData -> {
                    Duration of = Duration.of(delayTaskExecutorConfig.getTimeout(), delayTaskExecutorConfig.getTimeoutUnits());
                    executionContext.getLogger().debug("delay execution {} ", new Object[]{of});
                    return Mono.delay(of, scheduler);
                });
            }
        },
        random { // from class: org.jetlinks.community.rule.engine.executor.DelayTaskExecutorProvider.PauseType.3
            @Override // org.jetlinks.community.rule.engine.executor.DelayTaskExecutorProvider.PauseType
            Flux<RuleData> create(DelayTaskExecutorConfig delayTaskExecutorConfig, Flux<RuleData> flux, ExecutionContext executionContext, Scheduler scheduler) {
                return flux.delayUntil(ruleData -> {
                    Duration of = Duration.of(ThreadLocalRandom.current().nextLong(delayTaskExecutorConfig.getRandomFirst(), delayTaskExecutorConfig.getRandomLast()), delayTaskExecutorConfig.getRandomUnits());
                    executionContext.getLogger().debug("delay execution {} ", new Object[]{of});
                    return Mono.delay(of, scheduler);
                });
            }
        },
        rate { // from class: org.jetlinks.community.rule.engine.executor.DelayTaskExecutorProvider.PauseType.4
            @Override // org.jetlinks.community.rule.engine.executor.DelayTaskExecutorProvider.PauseType
            Flux<RuleData> create(DelayTaskExecutorConfig delayTaskExecutorConfig, Flux<RuleData> flux, ExecutionContext executionContext, Scheduler scheduler) {
                Duration of = Duration.of(delayTaskExecutorConfig.nbRateUnits, delayTaskExecutorConfig.getRateUnits());
                return flux.window(of, scheduler).flatMap(flux2 -> {
                    AtomicLong atomicLong = new AtomicLong();
                    return (delayTaskExecutorConfig.isErrorOnDrop() ? flux2.index().flatMap(tuple2 -> {
                        return ((Long) tuple2.getT1()).longValue() < ((long) delayTaskExecutorConfig.getRate()) ? Mono.just(tuple2.getT2()) : executionContext.fireEvent("error", executionContext.newRuleData(tuple2.getT2()));
                    }) : flux2.take(delayTaskExecutorConfig.getRate())).doOnNext(ruleData -> {
                        atomicLong.incrementAndGet();
                    }).doOnComplete(() -> {
                        if (atomicLong.get() > 0) {
                            executionContext.getLogger().debug("rate limit execution {}/{}", new Object[]{atomicLong, of});
                        }
                    });
                }, Integer.MAX_VALUE);
            }
        };

        abstract Flux<RuleData> create(DelayTaskExecutorConfig delayTaskExecutorConfig, Flux<RuleData> flux, ExecutionContext executionContext, Scheduler scheduler);
    }

    public String getExecutor() {
        return EXECUTOR;
    }

    public Mono<TaskExecutor> createTask(ExecutionContext executionContext) {
        return Mono.just(new DelayTaskExecutor(executionContext, this.scheduler));
    }

    public DelayTaskExecutorProvider(Scheduler scheduler) {
        this.scheduler = scheduler;
    }
}
