/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.community.elastic.search.things;

import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import org.hswebframework.ezorm.core.dsl.Query;
import org.hswebframework.web.api.crud.entity.PagerResult;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.jetlinks.community.elastic.search.service.AggregationService;
import org.jetlinks.community.elastic.search.service.ElasticSearchService;
import org.jetlinks.community.things.data.AggregationRequest;
import org.jetlinks.community.things.data.PropertyAggregation;
import org.jetlinks.community.things.data.operations.AbstractQueryOperations;
import org.jetlinks.community.things.data.operations.ColumnModeQueryOperationsBase;
import org.jetlinks.community.things.data.operations.DataSettings;
import org.jetlinks.community.things.data.operations.MetricBuilder;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.community.timeseries.query.Aggregation;
import org.jetlinks.community.timeseries.query.AggregationData;
import org.jetlinks.community.timeseries.query.AggregationQueryParam;
import org.jetlinks.community.timeseries.query.Group;
import org.jetlinks.community.timeseries.query.TimeGroup;
import org.jetlinks.core.things.ThingsRegistry;
import org.jetlinks.reactor.ql.utils.CastUtils;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ElasticSearchColumnModeQueryOperations
extends ColumnModeQueryOperationsBase {
    private final ElasticSearchService searchService;
    private final AggregationService aggregationService;

    public ElasticSearchColumnModeQueryOperations(String thingType, String thingTemplateId, String thingId, MetricBuilder metricBuilder, DataSettings settings, ThingsRegistry registry, ElasticSearchService service, AggregationService aggregationService) {
        super(thingType, thingTemplateId, thingId, metricBuilder, settings, registry);
        this.searchService = service;
        this.aggregationService = aggregationService;
    }

    protected Flux<TimeSeriesData> doQuery(String metric, Query<?, QueryParamEntity> query) {
        return this.searchService.query(metric, query.getParam(), data -> {
            long ts = CastUtils.castNumber((Object)data.getOrDefault("timestamp", 0L)).longValue();
            data.put("timestamp", ts);
            return TimeSeriesData.of((long)ts, (Map)data);
        });
    }

    protected <T> Mono<PagerResult<T>> doQueryPage(String metric, Query<?, QueryParamEntity> query, Function<TimeSeriesData, T> mapper) {
        return this.searchService.queryPager(metric, query.getParam(), data -> {
            long ts = CastUtils.castNumber((Object)data.getOrDefault("timestamp", 0L)).longValue();
            data.put("timestamp", ts);
            return mapper.apply(TimeSeriesData.of((long)ts, (Map)data));
        });
    }

    protected Flux<AggregationData> doAggregation(String metric, AggregationRequest request, AbstractQueryOperations.AggregationContext context) {
        DateTimeFormatter formatter = DateTimeFormat.forPattern((String)request.getFormat());
        PropertyAggregation[] properties = context.getProperties();
        return ((Flux)((AggregationQueryParam)((AggregationQueryParam)AggregationQueryParam.of().as(param -> {
            for (PropertyAggregation property : properties) {
                param.agg(property.getProperty(), property.getAlias(), property.getAgg());
            }
            return param;
        })).as(param -> {
            if (request.getInterval() == null) {
                return param;
            }
            return param.groupBy((Group)new TimeGroup(request.getInterval(), "time", request.getFormat()));
        })).limit(request.getLimit() * properties.length).from(request.getFrom()).to(request.getTo()).filter(request.getFilter()).execute(param -> this.aggregationService.aggregation(metric, (AggregationQueryParam)param))).map(AggregationData::of).groupBy(agg -> agg.getString("time", ""), Integer.MAX_VALUE).flatMap(group -> group.map(data -> {
            HashMap<String, Object> newMap = new HashMap<String, Object>();
            newMap.put("time", data.get("time").orElse(null));
            for (PropertyAggregation property : properties) {
                Object val = property.getAgg() == Aggregation.FIRST || property.getAgg() == Aggregation.TOP ? data.get(property.getProperty()).orElse(null) : data.get(property.getAlias()).orElse(null);
                if (null == val) continue;
                newMap.put(property.getAlias(), val);
            }
            return newMap;
        }).reduce((a, b) -> {
            a.putAll(b);
            return a;
        }).map(AggregationData::of)).sort(Comparator.comparing(agg -> DateTime.parse((String)agg.getString("time", ""), (DateTimeFormatter)formatter).toDate()).reversed()).take((long)request.getLimit());
    }
}

