package org.jetlinks.reactor.ql.supports.group;

import java.time.Duration;
import java.util.List;
import java.util.function.Function;
import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.LongValue;
import net.sf.jsqlparser.expression.StringValue;
import net.sf.jsqlparser.expression.operators.relational.ExpressionList;
import org.apache.commons.collections.CollectionUtils;
import org.jetlinks.reactor.ql.ReactorQLMetadata;
import org.jetlinks.reactor.ql.ReactorQLRecord;
import org.jetlinks.reactor.ql.feature.FeatureId;
import org.jetlinks.reactor.ql.feature.GroupFeature;
import org.jetlinks.reactor.ql.utils.CastUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

/* loaded from: input_file:org/jetlinks/reactor/ql/supports/group/GroupByWindowFeature.class */
/**
 * Group feature backing the {@code _window(...)} function in a GROUP BY clause.
 *
 * <p>The argument list decides which Reactor windowing operator is applied:
 * <ul>
 *   <li>{@code _window(n)} : {@code Flux.window(int)}</li>
 *   <li>{@code _window('10s')} : {@code Flux.window(Duration)}</li>
 *   <li>{@code _window(n, m)} : {@code Flux.window(int, int)}</li>
 *   <li>{@code _window('10s', '5s')} : {@code Flux.window(Duration, Duration)}</li>
 *   <li>{@code _window(n, '10s')} : {@code Flux.windowTimeout(int, Duration)}</li>
 * </ul>
 *
 * <p>Every invalid argument is reported as an {@link UnsupportedOperationException}.
 */
public class GroupByWindowFeature implements GroupFeature {
    private static final Logger log = LoggerFactory.getLogger(GroupByWindowFeature.class);
    private static final String ID = FeatureId.GroupBy.of("_window").getId();

    /**
     * Builds the windowing function for the given {@code _window(...)} call.
     *
     * @param expression        the function call expression; cast to a jsqlparser {@code Function}
     * @param reactorQLMetadata query metadata, forwarded to the parameter-specific factories
     * @return a mapper that splits the record stream into windows
     * @throws UnsupportedOperationException if no parameters are given, if more than two are
     *                                       given, or if a parameter is of an unsupported
     *                                       type or out of range
     */
    @Override // org.jetlinks.reactor.ql.feature.GroupFeature
    public Function<Flux<ReactorQLRecord>, Flux<Flux<ReactorQLRecord>>> createGroupMapper(Expression expression, ReactorQLMetadata reactorQLMetadata) {
        ExpressionList parameters = ((net.sf.jsqlparser.expression.Function) expression).getParameters();
        List<Expression> expressions = parameters == null ? null : parameters.getExpressions();
        if (CollectionUtils.isEmpty(expressions)) {
            throw new UnsupportedOperationException("窗口函数必须传入参数,如: window('10s') , window(30)");
        }
        try {
            switch (expressions.size()) {
                case 1:
                    return createOneParameter(expressions, reactorQLMetadata);
                case 2:
                    return createTwoParameter(expressions, reactorQLMetadata);
                default:
                    throw new UnsupportedOperationException("函数[ " + expression + " ]参数数量错误,最小1,最大2.");
            }
        } catch (UnsupportedOperationException e) {
            // Re-wrap so the message always names the offending function call.
            throw new UnsupportedOperationException("不支持的函数[ " + expression + " ] : " + e.getMessage(), e);
        }
    }

    /**
     * Handles the single-argument form: a numeric literal windows by element count,
     * a string literal windows by duration.
     *
     * @param list              the function parameters; only index 0 is read
     * @param reactorQLMetadata query metadata (not used by this implementation)
     * @return the windowing mapper
     * @throws UnsupportedOperationException if the argument is neither a long nor a string
     *                                       literal, is not positive, or (for a count) does
     *                                       not fit in an {@code int}
     */
    protected Function<Flux<ReactorQLRecord>, Flux<Flux<ReactorQLRecord>>> createOneParameter(List<Expression> list, ReactorQLMetadata reactorQLMetadata) {
        Expression expression = list.get(0);
        if (expression instanceof LongValue) {
            int size = toWindowSize(expression, "窗口数量不能小于0:");
            return flux -> flux.window(size);
        }
        if (expression instanceof StringValue) {
            Duration duration = CastUtils.parseDuration(((StringValue) expression).getValue());
            if (duration.toMillis() <= 0) {
                throw new UnsupportedOperationException("窗口时间不能小于0:" + expression);
            }
            return flux -> flux.window(duration);
        }
        throw new UnsupportedOperationException("不支持的窗口表达式:" + expression);
    }

    /**
     * Handles the two-argument forms: (count, count), (duration, duration) and
     * (count, duration). Any other combination is rejected.
     *
     * @param list              the function parameters; indexes 0 and 1 are read
     * @param reactorQLMetadata query metadata (not used by this implementation)
     * @return the windowing mapper
     * @throws UnsupportedOperationException if the argument types are not one of the supported
     *                                       combinations, or a value is not positive, or a
     *                                       count does not fit in an {@code int}
     */
    protected Function<Flux<ReactorQLRecord>, Flux<Flux<ReactorQLRecord>>> createTwoParameter(List<Expression> list, ReactorQLMetadata reactorQLMetadata) {
        Expression first = list.get(0);
        Expression second = list.get(1);

        if ((first instanceof LongValue) && (second instanceof LongValue)) {
            // Validated in argument order so the message names the first offending argument.
            int firstCount = toWindowSize(first, "窗口时间不能小于0: ");
            int secondCount = toWindowSize(second, "窗口时间不能小于0: ");
            return flux -> flux.window(firstCount, secondCount);
        }

        if ((first instanceof StringValue) && (second instanceof StringValue)) {
            Duration firstDuration = CastUtils.parseDuration(((StringValue) first).getValue());
            Duration secondDuration = CastUtils.parseDuration(((StringValue) second).getValue());
            if (firstDuration.toMillis() <= 0) {
                throw new UnsupportedOperationException("窗口时间不能小于0: " + first);
            }
            if (secondDuration.toMillis() <= 0) {
                throw new UnsupportedOperationException("窗口时间不能小于0: " + second);
            }
            return flux -> flux.window(firstDuration, secondDuration);
        }

        if ((first instanceof LongValue) && (second instanceof StringValue)) {
            Duration timeout = CastUtils.parseDuration(((StringValue) second).getValue());
            // The duration is checked before the count, as in the original ordering.
            if (timeout.toMillis() <= 0) {
                throw new UnsupportedOperationException("窗口时间不能小于0: " + second);
            }
            int maxSize = toWindowSize(first, "窗口时间不能小于0: ");
            return flux -> flux.windowTimeout(maxSize, timeout);
        }

        throw new UnsupportedOperationException("不支持的参数: " + first + " , " + second);
    }

    /**
     * Converts a {@link LongValue} count argument to a positive {@code int}.
     *
     * <p>The range check is done on the {@code long} before narrowing: a plain
     * {@code (int)} cast would silently wrap values above {@link Integer#MAX_VALUE}
     * (for example {@code 4294967297L} would become {@code 1}).
     *
     * @param expression         the argument; must be a {@link LongValue}
     * @param nonPositiveMessage message prefix used when the value is zero or negative
     * @return the value as an {@code int}
     * @throws UnsupportedOperationException if the value is not in {@code [1, Integer.MAX_VALUE]}
     */
    private static int toWindowSize(Expression expression, String nonPositiveMessage) {
        long value = ((LongValue) expression).getValue();
        if (value <= 0) {
            throw new UnsupportedOperationException(nonPositiveMessage + expression);
        }
        if (value > Integer.MAX_VALUE) {
            throw new UnsupportedOperationException("窗口数量不能大于" + Integer.MAX_VALUE + ": " + expression);
        }
        return (int) value;
    }

    /**
     * @return the feature id, built once from {@code FeatureId.GroupBy.of("_window")}
     */
    @Override // org.jetlinks.reactor.ql.feature.Feature
    public String getId() {
        return ID;
    }
}
