/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.community.rule.engine.executor;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import org.hswebframework.web.bean.FastBeanCopier;
import org.jetlinks.community.utils.TimeUtils;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.RuleDataHelper;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
import org.jetlinks.rule.engine.api.task.TaskExecutor;
import org.jetlinks.rule.engine.api.task.TaskExecutorProvider;
import org.jetlinks.rule.engine.defaults.AbstractTaskExecutor;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

@Component
public class DelayTaskExecutorProvider
implements TaskExecutorProvider {
    private static final Logger log = LoggerFactory.getLogger(DelayTaskExecutorProvider.class);
    public static final String EXECUTOR = "delay";
    private final Scheduler scheduler;
    static ObjectMapper objectMapper = new ObjectMapper();

    public String getExecutor() {
        return EXECUTOR;
    }

    public Mono<TaskExecutor> createTask(ExecutionContext context) {
        return Mono.just((Object)((Object)new DelayTaskExecutor(context, this.scheduler)));
    }

    public DelayTaskExecutorProvider(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    public static enum PauseType {
        delayv{

            @Override
            Flux<RuleData> create(DelayTaskExecutorConfig config, Flux<RuleData> flux, ExecutionContext context, Scheduler scheduler) {
                return flux.delayUntil(el -> {
                    Map map = RuleDataHelper.toContextMap((RuleData)el);
                    if (map.get(DelayTaskExecutorProvider.EXECUTOR) == null) {
                        return Mono.never();
                    }
                    Duration duration = TimeUtils.parse((String)String.valueOf(map.get(DelayTaskExecutorProvider.EXECUTOR)));
                    context.getLogger().debug("delay execution {} ", new Object[]{duration});
                    return Mono.delay((Duration)duration, (Scheduler)scheduler);
                });
            }
        }
        ,
        delay{

            @Override
            Flux<RuleData> create(DelayTaskExecutorConfig config, Flux<RuleData> flux, ExecutionContext context, Scheduler scheduler) {
                return flux.delayUntil(el -> {
                    Duration duration = Duration.of(config.getTimeout(), config.getTimeoutUnits());
                    context.getLogger().debug("delay execution {} ", new Object[]{duration});
                    return Mono.delay((Duration)duration, (Scheduler)scheduler);
                });
            }
        }
        ,
        random{

            @Override
            Flux<RuleData> create(DelayTaskExecutorConfig config, Flux<RuleData> flux, ExecutionContext context, Scheduler scheduler) {
                return flux.delayUntil(el -> {
                    Duration duration = Duration.of(ThreadLocalRandom.current().nextLong(config.getRandomFirst(), config.getRandomLast()), config.getRandomUnits());
                    context.getLogger().debug("delay execution {} ", new Object[]{duration});
                    return Mono.delay((Duration)duration, (Scheduler)scheduler);
                });
            }
        }
        ,
        rate{

            @Override
            Flux<RuleData> create(DelayTaskExecutorConfig config, Flux<RuleData> flux, ExecutionContext context, Scheduler scheduler) {
                Duration duration = Duration.of(config.nbRateUnits, config.getRateUnits());
                return flux.window(duration, scheduler).flatMap(window -> {
                    AtomicLong counter = new AtomicLong();
                    Flux stream = config.isErrorOnDrop() ? window.index().flatMap(tp2 -> {
                        if ((Long)tp2.getT1() < (long)config.getRate()) {
                            return Mono.just((Object)tp2.getT2());
                        }
                        return context.fireEvent("error", context.newRuleData(tp2.getT2()));
                    }) : window.take((long)config.getRate());
                    return stream.doOnNext(v -> counter.incrementAndGet()).doOnComplete(() -> {
                        if (counter.get() > 0L) {
                            context.getLogger().debug("rate limit execution {}/{}", new Object[]{counter, duration});
                        }
                    });
                }, Integer.MAX_VALUE);
            }
        };


        abstract Flux<RuleData> create(DelayTaskExecutorConfig var1, Flux<RuleData> var2, ExecutionContext var3, Scheduler var4);
    }

    public static class DelayTaskExecutorConfig {
        private PauseType pauseType;
        private int timeout;
        private ChronoUnit timeoutUnits;
        private int rate;
        private int nbRateUnits;
        private ChronoUnit rateUnits;
        private int randomFirst;
        private int randomLast;
        private ChronoUnit randomUnits;
        private String groupExpression;
        private boolean errorOnDrop;

        public Flux<RuleData> create(Flux<RuleData> flux, ExecutionContext context, Scheduler scheduler) {
            return this.pauseType.create(this, flux, context, scheduler);
        }

        public static DelayTaskExecutorConfig of(Map<String, Object> configuration) {
            return (DelayTaskExecutorConfig)FastBeanCopier.copy(configuration, (Object)new DelayTaskExecutorConfig(), (String[])new String[0]);
        }

        public PauseType getPauseType() {
            return this.pauseType;
        }

        public int getTimeout() {
            return this.timeout;
        }

        public ChronoUnit getTimeoutUnits() {
            return this.timeoutUnits;
        }

        public int getRate() {
            return this.rate;
        }

        public int getNbRateUnits() {
            return this.nbRateUnits;
        }

        public ChronoUnit getRateUnits() {
            return this.rateUnits;
        }

        public int getRandomFirst() {
            return this.randomFirst;
        }

        public int getRandomLast() {
            return this.randomLast;
        }

        public ChronoUnit getRandomUnits() {
            return this.randomUnits;
        }

        public String getGroupExpression() {
            return this.groupExpression;
        }

        public boolean isErrorOnDrop() {
            return this.errorOnDrop;
        }

        public void setPauseType(PauseType pauseType) {
            this.pauseType = pauseType;
        }

        public void setTimeout(int timeout) {
            this.timeout = timeout;
        }

        public void setTimeoutUnits(ChronoUnit timeoutUnits) {
            this.timeoutUnits = timeoutUnits;
        }

        public void setRate(int rate) {
            this.rate = rate;
        }

        public void setNbRateUnits(int nbRateUnits) {
            this.nbRateUnits = nbRateUnits;
        }

        public void setRateUnits(ChronoUnit rateUnits) {
            this.rateUnits = rateUnits;
        }

        public void setRandomFirst(int randomFirst) {
            this.randomFirst = randomFirst;
        }

        public void setRandomLast(int randomLast) {
            this.randomLast = randomLast;
        }

        public void setRandomUnits(ChronoUnit randomUnits) {
            this.randomUnits = randomUnits;
        }

        public void setGroupExpression(String groupExpression) {
            this.groupExpression = groupExpression;
        }

        public void setErrorOnDrop(boolean errorOnDrop) {
            this.errorOnDrop = errorOnDrop;
        }
    }

    static class DelayTaskExecutor
    extends AbstractTaskExecutor {
        private DelayTaskExecutorConfig config;
        private final Scheduler scheduler;

        public DelayTaskExecutor(ExecutionContext context, Scheduler scheduler) {
            super(context);
            this.scheduler = scheduler;
            this.init();
        }

        protected Disposable doStart() {
            if (this.disposable != null) {
                this.disposable.dispose();
            }
            return this.config.create((Flux<RuleData>)this.context.getInput().accept(), this.context, this.scheduler).map(arg_0 -> ((ExecutionContext)this.context).newRuleData(arg_0)).flatMap(ruleData -> this.context.fireEvent("result", ruleData).then(this.context.getOutput().write((Publisher)Mono.just((Object)ruleData)))).onErrorResume(err -> this.context.onError(err, null)).subscribe();
        }

        void init() {
            this.config = DelayTaskExecutorConfig.of(this.context.getJob().getConfiguration());
        }

        public void reload() {
            this.init();
            this.disposable = this.doStart();
        }

        public String getName() {
            return "\u5ef6\u8fdf";
        }
    }
}

