package org.jetlinks.community.rule.engine.commons;

import io.swagger.v3.oas.annotations.media.Schema;
import java.io.Serializable;
import java.time.Duration;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;

/* loaded from: input_file:org/jetlinks/community/rule/engine/commons/ShakeLimit.class */
public class ShakeLimit implements Serializable {
    private static final long serialVersionUID = -6849794470754667710L;

    @Schema(description = "是否开启防抖")
    private boolean enabled;

    @Schema(description = "时间间隔(秒)")
    private int time;

    @Schema(description = "触发阈值(次)")
    private int threshold;

    @Schema(description = "是否第一次满足条件就触发")
    private boolean alarmFirst;

    public String wrapReactorQl(@Nonnull String str, @Nullable String str2) {
        if (!this.enabled || this.time <= 0) {
            return str;
        }
        int max = Math.max(this.threshold, 1);
        return "select t.* from (" + str + ") t group by " + (StringUtils.hasText(str2) ? str2 + "," : "") + "_window('" + this.time + "s'),trace(),take(" + (this.alarmFirst ? max : -1) + ") having row.index >= " + max;
    }

    public <T> Flux<T> transfer(Flux<T> flux, BiFunction<Duration, Flux<T>, Flux<Flux<T>>> biFunction, BiConsumer<T, Long> biConsumer) {
        if (!this.enabled || this.time <= 0) {
            return flux;
        }
        int threshold = getThreshold();
        Duration ofSeconds = Duration.ofSeconds(getTime());
        return ((Flux) flux.as(flux2 -> {
            return (Flux) biFunction.apply(ofSeconds, flux2);
        })).flatMap(flux3 -> {
            return ((Flux) flux3.index((l, obj) -> {
                return Tuples.of(Long.valueOf(l.longValue() + 1), obj);
            }).filter(tuple2 -> {
                return ((Long) tuple2.getT1()).longValue() >= ((long) threshold);
            }).as(flux3 -> {
                return isAlarmFirst() ? flux3.take(1L) : flux3.takeLast(1);
            })).map(tuple22 -> {
                biConsumer.accept(tuple22.getT2(), tuple22.getT1());
                return tuple22.getT2();
            });
        });
    }

    public boolean isEnabled() {
        return this.enabled;
    }

    public int getTime() {
        return this.time;
    }

    public int getThreshold() {
        return this.threshold;
    }

    public boolean isAlarmFirst() {
        return this.alarmFirst;
    }

    public void setEnabled(boolean z) {
        this.enabled = z;
    }

    public void setTime(int i) {
        this.time = i;
    }

    public void setThreshold(int i) {
        this.threshold = i;
    }

    public void setAlarmFirst(boolean z) {
        this.alarmFirst = z;
    }
}
