/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.community.device.measurements.message;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import lombok.Generated;
import org.hswebframework.ezorm.core.dsl.Query;
import org.jetlinks.community.dashboard.CommonDimensionDefinition;
import org.jetlinks.community.dashboard.DimensionDefinition;
import org.jetlinks.community.dashboard.MeasurementDefinition;
import org.jetlinks.community.dashboard.MeasurementDimension;
import org.jetlinks.community.dashboard.MeasurementParameter;
import org.jetlinks.community.dashboard.MeasurementValue;
import org.jetlinks.community.dashboard.SimpleMeasurementValue;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.community.timeseries.TimeSeriesService;
import org.jetlinks.community.timeseries.query.AggregationQueryParam;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.event.TopicPayload;
import org.jetlinks.core.metadata.ConfigMetadata;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.DateTimeType;
import org.jetlinks.core.metadata.types.IntType;
import org.jetlinks.core.metadata.types.StringType;
import reactor.core.publisher.Flux;

class DeviceMessageMeasurement
extends StaticMeasurement {
    private final EventBus eventBus;
    private final TimeSeriesManager timeSeriesManager;
    static MeasurementDefinition definition = MeasurementDefinition.of((String)"quantity", (String)"\u8bbe\u5907\u6d88\u606f\u91cf");
    static ConfigMetadata realTimeConfigMetadata = new DefaultConfigMetadata().add("interval", "\u6570\u636e\u7edf\u8ba1\u5468\u671f", "\u4f8b\u5982: 1s,10s", (DataType)new StringType());
    static ConfigMetadata historyConfigMetadata = new DefaultConfigMetadata().add("productId", "\u8bbe\u5907\u578b\u53f7", "", (DataType)new StringType()).add("time", "\u5468\u671f", "\u4f8b\u5982: 1h,10m,30s", (DataType)new StringType()).add("format", "\u65f6\u95f4\u683c\u5f0f", "\u5982: MM-dd:HH", (DataType)new StringType()).add("msgType", "\u6d88\u606f\u7c7b\u578b", "", (DataType)new StringType()).add("limit", "\u6700\u5927\u6570\u636e\u91cf", "", (DataType)new IntType()).add("from", "\u65f6\u95f4\u4ece", "", (DataType)new DateTimeType()).add("to", "\u65f6\u95f4\u81f3", "", (DataType)new DateTimeType());

    public DeviceMessageMeasurement(EventBus eventBus, TimeSeriesManager timeSeriesManager) {
        super(definition);
        this.eventBus = eventBus;
        this.timeSeriesManager = timeSeriesManager;
        this.addDimension(new RealTimeMessageDimension());
        this.addDimension(new AggMessageDimension());
    }

    class AggMessageDimension
    implements MeasurementDimension {
        AggMessageDimension() {
        }

        @Generated
        public DimensionDefinition getDefinition() {
            return CommonDimensionDefinition.agg;
        }

        @Generated
        public DataType getValueType() {
            return IntType.GLOBAL;
        }

        @Generated
        public ConfigMetadata getParams() {
            return historyConfigMetadata;
        }

        @Generated
        public boolean isRealTime() {
            return false;
        }

        public AggregationQueryParam createQueryParam(MeasurementParameter parameter) {
            return AggregationQueryParam.of().sum("count").groupBy(parameter.getInterval("interval", parameter.getInterval("time", null)), parameter.getString("format").orElse("MM\u6708dd\u65e5 HH\u65f6")).filter(query -> {
                Query cfr_ignored_0 = (Query)((Query)query.where("name", (Object)"message-count")).is("productId", parameter.getString("productId").orElse(null));
            }).limit(parameter.getInt("limit").orElse(1).intValue()).from(parameter.getDate("from").orElseGet(() -> Date.from(LocalDateTime.now().plusDays(-1L).atZone(ZoneId.systemDefault()).toInstant()))).to(parameter.getDate("to").orElse(new Date()));
        }

        public Flux<SimpleMeasurementValue> getValue(MeasurementParameter parameter) {
            AggregationQueryParam param = this.createQueryParam(parameter);
            return Flux.defer(() -> ((Flux)param.execute(arg_0 -> ((TimeSeriesService)DeviceMessageMeasurement.this.timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceMetrics())).aggregation(arg_0))).index((index, data) -> SimpleMeasurementValue.of((Object)data.getLong("count", 0L), (String)data.getString("time").orElse(""), (long)index))).take((long)param.getLimit());
        }
    }

    class RealTimeMessageDimension
    implements MeasurementDimension {
        RealTimeMessageDimension() {
        }

        @Generated
        public DimensionDefinition getDefinition() {
            return CommonDimensionDefinition.realTime;
        }

        @Generated
        public DataType getValueType() {
            return IntType.GLOBAL;
        }

        @Generated
        public ConfigMetadata getParams() {
            return realTimeConfigMetadata;
        }

        @Generated
        public boolean isRealTime() {
            return true;
        }

        public Flux<MeasurementValue> getValue(MeasurementParameter parameter) {
            return DeviceMessageMeasurement.this.eventBus.subscribe(Subscription.of((String)"real-time-device-message", (String)"/device/**", (Subscription.Feature[])new Subscription.Feature[]{Subscription.Feature.local, Subscription.Feature.broker})).doOnNext(TopicPayload::release).window(parameter.getDuration("interval").orElse(Duration.ofSeconds(1L))).flatMap(Flux::count).map(total -> SimpleMeasurementValue.of((Object)total, (long)System.currentTimeMillis()));
        }
    }
}

