/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.community.rule.engine.commons;

import io.swagger.v3.oas.annotations.media.Schema;
import java.io.Serializable;
import java.time.Duration;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;

/**
 * Debounce ("shake limit") settings: within a time window of {@link #getTime()} seconds,
 * data is only let through once at least {@link #getThreshold()} items have been seen.
 *
 * <p>The settings can be applied either to a ReactorQL statement
 * ({@link #wrapReactorQl(String, String)}) or directly to a {@link Flux}
 * ({@link #transfer(Flux, BiFunction, BiConsumer)}). In both cases the limit is a no-op
 * unless it is enabled and the window time is positive.
 */
public class ShakeLimit
implements Serializable {
    private static final long serialVersionUID = -6849794470754667710L;

    /** Whether debouncing is enabled. */
    @Schema(description="\u662f\u5426\u5f00\u542f\u9632\u6296")
    private boolean enabled;

    /** Window length, in seconds. */
    @Schema(description="\u65f6\u95f4\u95f4\u9694(\u79d2)")
    private int time;

    /** Number of occurrences within one window required before anything is emitted. */
    @Schema(description="\u89e6\u53d1\u9608\u503c(\u6b21)")
    private int threshold;

    /** Whether to emit as soon as the threshold is first reached, rather than at the end of the window. */
    @Schema(description="\u662f\u5426\u7b2c\u4e00\u6b21\u6ee1\u8db3\u6761\u4ef6\u5c31\u89e6\u53d1")
    private boolean alarmFirst;

    /**
     * Wraps a query in an outer query that applies this limit.
     *
     * <p>The outer query groups by the optional {@code groupBy} expression, a
     * {@code _window('<time>s')} window, {@code trace()} and {@code take(n)}, and keeps rows
     * with {@code row.index >= max(threshold, 1)}. {@code n} is {@code max(threshold, 1)} when
     * {@link #isAlarmFirst()} is set and {@code -1} otherwise.
     *
     * <p>NOTE(review): {@code sql} and {@code groupBy} are concatenated verbatim, so callers
     * must only pass trusted query fragments.
     *
     * @param sql     the inner query to wrap
     * @param groupBy extra group-by expression placed before the window; ignored when
     *                {@code null} or blank
     * @return the wrapped query, or {@code sql} unchanged when the limit is disabled or
     *         {@code time <= 0}
     */
    public String wrapReactorQl(@Nonnull String sql, @Nullable String groupBy) {
        if (!this.enabled || this.time <= 0) {
            return sql;
        }
        // A threshold below 1 is treated as 1.
        int takes = Math.max(this.threshold, 1);
        String groupPrefix = StringUtils.hasText(groupBy) ? groupBy + "," : "";
        // presumably take(-1) selects the last row of the window in ReactorQL - TODO confirm
        int takeArgument = this.alarmFirst ? takes : -1;
        return "select t.* from (" + sql + ") t"
            + " group by " + groupPrefix
            + "_window('" + this.time + "s')"
            + ",trace()"
            + ",take(" + takeArgument + ")"
            + " having row.index >= " + takes;
    }

    /**
     * Applies this limit to a stream.
     *
     * <p>The source is split into windows by {@code windowFunction}, which is given a duration
     * of {@link #getTime()} seconds. Inside each window the elements are numbered from 1 and
     * only those whose number is {@code >= threshold} are kept. Of these, the first one is
     * emitted when {@link #isAlarmFirst()} is set, otherwise the last one. A window that never
     * reaches the threshold emits nothing.
     *
     * @param source         the stream to limit
     * @param windowFunction splits the stream into windows of the given duration
     * @param totalConsumer  called for every emitted element with that element and its 1-based
     *                       position inside its window
     * @param <T>            element type
     * @return the limited stream, or {@code source} itself when the limit is disabled or
     *         {@code time <= 0}
     */
    public <T> Flux<T> transfer(Flux<T> source, BiFunction<Duration, Flux<T>, Flux<Flux<T>>> windowFunction, BiConsumer<T, Long> totalConsumer) {
        if (!this.enabled || this.time <= 0) {
            return source;
        }
        int thresholdNumber = this.getThreshold();
        Duration windowTime = Duration.ofSeconds(this.getTime());
        return source
            .as(flux -> windowFunction.apply(windowTime, flux))
            .flatMap(group -> group
                // index() is zero-based; shift by one so the value is a count within the window.
                .index((index, data) -> Tuples.of(index + 1L, data))
                .filter(tp -> tp.getT1() >= thresholdNumber)
                .as(flux -> this.isAlarmFirst() ? flux.take(1L) : flux.takeLast(1))
                .map(tp -> {
                    totalConsumer.accept(tp.getT2(), tp.getT1());
                    return tp.getT2();
                }));
    }

    /** @return whether debouncing is enabled */
    public boolean isEnabled() {
        return this.enabled;
    }

    /** @return the window length in seconds */
    public int getTime() {
        return this.time;
    }

    /** @return the number of occurrences per window required before emitting */
    public int getThreshold() {
        return this.threshold;
    }

    /** @return whether to emit on the first element that reaches the threshold */
    public boolean isAlarmFirst() {
        return this.alarmFirst;
    }

    /** @param enabled whether debouncing is enabled */
    public void setEnabled(boolean enabled) {
        this.enabled = enabled;
    }

    /** @param time the window length in seconds */
    public void setTime(int time) {
        this.time = time;
    }

    /** @param threshold the number of occurrences per window required before emitting */
    public void setThreshold(int threshold) {
        this.threshold = threshold;
    }

    /** @param alarmFirst whether to emit on the first element that reaches the threshold */
    public void setAlarmFirst(boolean alarmFirst) {
        this.alarmFirst = alarmFirst;
    }
}

