package org.jetlinks.community.device.measurements.message;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import lombok.Generated;
import org.jetlinks.community.Interval;
import org.jetlinks.community.dashboard.CommonDimensionDefinition;
import org.jetlinks.community.dashboard.DimensionDefinition;
import org.jetlinks.community.dashboard.MeasurementDefinition;
import org.jetlinks.community.dashboard.MeasurementDimension;
import org.jetlinks.community.dashboard.MeasurementParameter;
import org.jetlinks.community.dashboard.MeasurementValue;
import org.jetlinks.community.dashboard.SimpleMeasurementValue;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.device.timeseries.DeviceTimeSeriesMetric;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.community.timeseries.TimeSeriesService;
import org.jetlinks.community.timeseries.query.AggregationQueryParam;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.metadata.ConfigMetadata;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.types.DateTimeType;
import org.jetlinks.core.metadata.types.IntType;
import org.jetlinks.core.metadata.types.StringType;
import reactor.core.publisher.Flux;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/jetlinks/community/device/measurements/message/DeviceMessageMeasurement.class */
public class DeviceMessageMeasurement extends StaticMeasurement {
    private final EventBus eventBus;
    private final TimeSeriesManager timeSeriesManager;
    static MeasurementDefinition definition = MeasurementDefinition.of("quantity", "设备消息量");
    static ConfigMetadata realTimeConfigMetadata = new DefaultConfigMetadata().add("interval", "数据统计周期", "例如: 1s,10s", new StringType());
    static ConfigMetadata historyConfigMetadata = new DefaultConfigMetadata().add("productId", "设备型号", "", new StringType()).add("time", "周期", "例如: 1h,10m,30s", new StringType()).add("format", "时间格式", "如: MM-dd:HH", new StringType()).add("msgType", "消息类型", "", new StringType()).add("limit", "最大数据量", "", new IntType()).add("from", "时间从", "", new DateTimeType()).add("to", "时间至", "", new DateTimeType());

    /* loaded from: input_file:org/jetlinks/community/device/measurements/message/DeviceMessageMeasurement$AggMessageDimension.class */
    class AggMessageDimension implements MeasurementDimension {
        AggMessageDimension() {
        }

        @Generated
        public DimensionDefinition getDefinition() {
            return CommonDimensionDefinition.agg;
        }

        @Generated
        public DataType getValueType() {
            return IntType.GLOBAL;
        }

        @Generated
        public ConfigMetadata getParams() {
            return DeviceMessageMeasurement.historyConfigMetadata;
        }

        @Generated
        public boolean isRealTime() {
            return false;
        }

        public AggregationQueryParam createQueryParam(MeasurementParameter measurementParameter) {
            return AggregationQueryParam.of().sum("count").groupBy(measurementParameter.getInterval("interval", measurementParameter.getInterval("time", (Interval) null)), (String) measurementParameter.getString("format").orElse("MM月dd日 HH时")).filter(query -> {
                query.where("name", "message-count").is("productId", measurementParameter.getString("productId").orElse(null));
            }).limit(((Integer) measurementParameter.getInt("limit").orElse(1)).intValue()).from((Date) measurementParameter.getDate("from").orElseGet(() -> {
                return Date.from(LocalDateTime.now().plusDays(-1L).atZone(ZoneId.systemDefault()).toInstant());
            })).to((Date) measurementParameter.getDate("to").orElse(new Date()));
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public Flux<SimpleMeasurementValue> m49getValue(MeasurementParameter measurementParameter) {
            AggregationQueryParam createQueryParam = createQueryParam(measurementParameter);
            return Flux.defer(() -> {
                TimeSeriesService service = DeviceMessageMeasurement.this.timeSeriesManager.getService(DeviceTimeSeriesMetric.deviceMetrics());
                service.getClass();
                return ((Flux) createQueryParam.execute(service::aggregation)).index((l, aggregationData) -> {
                    return SimpleMeasurementValue.of(Long.valueOf(aggregationData.getLong("count", 0L)), (String) aggregationData.getString("time").orElse(""), l.longValue());
                });
            }).take(createQueryParam.getLimit());
        }
    }

    /* loaded from: input_file:org/jetlinks/community/device/measurements/message/DeviceMessageMeasurement$RealTimeMessageDimension.class */
    class RealTimeMessageDimension implements MeasurementDimension {
        RealTimeMessageDimension() {
        }

        @Generated
        public DimensionDefinition getDefinition() {
            return CommonDimensionDefinition.realTime;
        }

        @Generated
        public DataType getValueType() {
            return IntType.GLOBAL;
        }

        @Generated
        public ConfigMetadata getParams() {
            return DeviceMessageMeasurement.realTimeConfigMetadata;
        }

        @Generated
        public boolean isRealTime() {
            return true;
        }

        /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
        public Flux<MeasurementValue> m50getValue(MeasurementParameter measurementParameter) {
            return DeviceMessageMeasurement.this.eventBus.subscribe(Subscription.of("real-time-device-message", "/device/**", new Subscription.Feature[]{Subscription.Feature.local, Subscription.Feature.broker})).doOnNext((v0) -> {
                v0.release();
            }).window((Duration) measurementParameter.getDuration("interval").orElse(Duration.ofSeconds(1L))).flatMap((v0) -> {
                return v0.count();
            }).map(l -> {
                return SimpleMeasurementValue.of(l, System.currentTimeMillis());
            });
        }
    }

    public DeviceMessageMeasurement(EventBus eventBus, TimeSeriesManager timeSeriesManager) {
        super(definition);
        this.eventBus = eventBus;
        this.timeSeriesManager = timeSeriesManager;
        addDimension(new RealTimeMessageDimension());
        addDimension(new AggMessageDimension());
    }
}
