/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.community.elastic.search.things;

import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.hswebframework.ezorm.core.dsl.Query;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.jetlinks.community.elastic.search.service.AggregationService;
import org.jetlinks.community.elastic.search.service.ElasticSearchService;
import org.jetlinks.community.things.data.AggregationRequest;
import org.jetlinks.community.things.data.PropertyAggregation;
import org.jetlinks.community.things.data.ThingPropertyDetail;
import org.jetlinks.community.things.data.operations.AbstractQueryOperations;
import org.jetlinks.community.things.data.operations.DataSettings;
import org.jetlinks.community.things.data.operations.MetricBuilder;
import org.jetlinks.community.things.data.operations.RowModeQueryOperationsBase;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.community.timeseries.query.Aggregation;
import org.jetlinks.community.timeseries.query.AggregationColumn;
import org.jetlinks.community.timeseries.query.AggregationData;
import org.jetlinks.community.timeseries.query.AggregationQueryParam;
import org.jetlinks.community.timeseries.query.Group;
import org.jetlinks.community.timeseries.query.LimitAggregationColumn;
import org.jetlinks.community.timeseries.query.LimitGroup;
import org.jetlinks.community.timeseries.query.TimeGroup;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.things.ThingMetadata;
import org.jetlinks.core.things.ThingsRegistry;
import org.jetlinks.reactor.ql.utils.CastUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ElasticSearchRowModeQueryOperations
extends RowModeQueryOperationsBase {
    private final ElasticSearchService searchService;
    private final AggregationService aggregationService;

    public ElasticSearchRowModeQueryOperations(String thingType, String thingTemplateId, String thingId, MetricBuilder metricBuilder, DataSettings settings, ThingsRegistry registry, ElasticSearchService service, AggregationService aggregationService) {
        super(thingType, thingTemplateId, thingId, metricBuilder, settings, registry);
        this.searchService = service;
        this.aggregationService = aggregationService;
    }

    protected Flux<TimeSeriesData> doQuery(String metric, Query<?, QueryParamEntity> query) {
        return this.searchService.query(metric, query.getParam(), data -> {
            long ts = CastUtils.castNumber((Object)data.getOrDefault("timestamp", 0L)).longValue();
            return TimeSeriesData.of((long)ts, (Map)data);
        });
    }

    protected <T> Mono<PagerResult<T>> doQueryPage(String metric, Query<?, QueryParamEntity> query, Function<TimeSeriesData, T> mapper) {
        return this.searchService.queryPager(metric, query.getParam(), data -> {
            long ts = CastUtils.castNumber((Object)data.getOrDefault("timestamp", 0L)).longValue();
            return mapper.apply(TimeSeriesData.of((long)ts, (Map)data));
        });
    }

    protected Flux<ThingPropertyDetail> queryEachProperty(@Nonnull String metric, @Nonnull Query<?, QueryParamEntity> query, @Nonnull ThingMetadata metadata, @Nonnull Map<String, PropertyMetadata> properties) {
        QueryParamEntity param = (QueryParamEntity)query.getParam();
        if (!param.isPaging() || param.getPageSize() >= 1000) {
            return super.queryEachProperty(metric, query, metadata, properties);
        }
        if (properties.size() <= 200) {
            query.in("property", properties.keySet());
        }
        return ((Flux)AggregationQueryParam.of().agg((AggregationColumn)new LimitAggregationColumn("property", "property", Aggregation.FIRST, param.getPageSize())).groupBy((Group)new LimitGroup("property", "property", param.getPageSize() * properties.size())).filter(param).execute(params -> this.aggregationService.aggregation(metric, (AggregationQueryParam)params))).mapNotNull(data -> {
            long ts = CastUtils.castNumber((Object)data.getOrDefault("timestamp", 0L)).longValue();
            String property = data.getOrDefault("property", null);
            String thingId = data.getOrDefault(this.metricBuilder.getThingIdProperty(), null);
            Object value = data.getOrDefault("value", null);
            if (property == null || thingId == null || value == null) {
                return null;
            }
            return ThingPropertyDetail.of((TimeSeriesData)TimeSeriesData.of((long)ts, (Map)data), (PropertyMetadata)((PropertyMetadata)properties.get(property)));
        });
    }

    protected Flux<AggregationData> doAggregation(String metric, AggregationRequest request, AbstractQueryOperations.AggregationContext context) {
        PropertyAggregation[] properties = context.getProperties();
        if (properties.length == 1) {
            return ((Flux)((AggregationQueryParam)AggregationQueryParam.of().agg("numberValue", properties[0].getAlias(), properties[0].getAgg()).as(param -> {
                if (request.getInterval() == null) {
                    return param;
                }
                return param.groupBy(request.getInterval(), request.getFormat());
            })).limit(request.getLimit()).from(request.getFrom()).to(request.getTo()).filter(request.getFilter()).filter(query -> {
                Query cfr_ignored_0 = (Query)query.where("property", (Object)properties[0].getProperty());
            }).execute(param -> this.aggregationService.aggregation(metric, (AggregationQueryParam)param))).doOnNext(agg -> agg.remove("_time")).map(AggregationData::of).take((long)request.getLimit());
        }
        Map propertyAlias = context.getPropertyAlias();
        Map aliasProperty = context.getAliasToProperty();
        return ((Flux)((Flux)((AggregationQueryParam)((AggregationQueryParam)AggregationQueryParam.of().as(param -> {
            Arrays.stream(properties).forEach(agg -> param.agg("numberValue", "value_" + agg.getAlias(), agg.getAgg()));
            return param;
        })).as(param -> {
            if (request.getInterval() == null) {
                return param;
            }
            return param.groupBy((Group)new TimeGroup(request.getInterval(), "time", request.getFormat()));
        })).groupBy((Group)new LimitGroup("property", "property", properties.length)).limit(request.getLimit() * properties.length).from(request.getFrom()).to(request.getTo()).filter(request.getFilter()).filter(query -> {
            Query cfr_ignored_0 = (Query)((Query)query.where()).in("property", new HashSet(propertyAlias.values()));
        }).execute(param -> this.aggregationService.aggregation(metric, (AggregationQueryParam)param))).map(AggregationData::of).groupBy(agg -> agg.getString("time", ""), Integer.MAX_VALUE).as(flux -> {
            if (request.getInterval() != null) {
                return flux.flatMap(group -> {
                    String time = (String)group.key();
                    return group.groupBy(agg -> agg.getString("property", ""), Integer.MAX_VALUE).flatMap(propsGroup -> {
                        String property = (String)propsGroup.key();
                        return propsGroup.reduce(AggregationData::merge).map(agg -> {
                            HashMap<String, String> data = new HashMap<String, String>();
                            data.put("_time", agg.get("_time").orElse(time));
                            data.put("time", time);
                            aliasProperty.forEach((alias, prp) -> {
                                if (prp.getAgg() == Aggregation.FIRST || prp.getAgg() == Aggregation.TOP) {
                                    data.putIfAbsent((String)alias, agg.get("numberValue").orElse(agg.get("value").orElse(null)));
                                } else if (property.equals(prp.getProperty())) {
                                    Integer value = agg.get("value_" + alias).orElse(0);
                                    data.putIfAbsent((String)alias, (String)((Object)value));
                                }
                            });
                            return data;
                        });
                    }).reduceWith(HashMap::new, (a, b) -> {
                        a.putAll(b);
                        return a;
                    });
                });
            }
            return flux.flatMap(group -> group.reduce(AggregationData::merge).map(agg -> {
                HashMap values = new HashMap();
                for (Map.Entry props : propertyAlias.entrySet()) {
                    values.put(props.getKey(), agg.get("value_" + (String)props.getKey()).orElse(0));
                }
                return values;
            }));
        })).map(map -> {
            map.remove("");
            propertyAlias.keySet().forEach(key -> map.putIfAbsent(key, 0));
            return AggregationData.of((Map)map);
        }).sort(Comparator.comparing(agg -> CastUtils.castDate(agg.values().get("_time"))).reversed()).doOnNext(agg -> agg.values().remove("_time")).take((long)request.getLimit());
    }
}

