package org.jetlinks.community.network.manager.session;

import com.google.common.collect.Maps;
import java.time.Duration;
import java.util.Date;
import java.util.HashMap;
import java.util.Optional;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.hswebframework.web.utils.DigestUtils;
import org.jetlinks.community.Interval;
import org.jetlinks.community.dashboard.CommonDimensionDefinition;
import org.jetlinks.community.dashboard.DefaultDashboardDefinition;
import org.jetlinks.community.dashboard.DimensionDefinition;
import org.jetlinks.community.dashboard.MeasurementDefinition;
import org.jetlinks.community.dashboard.MeasurementDimension;
import org.jetlinks.community.dashboard.MeasurementParameter;
import org.jetlinks.community.dashboard.SimpleMeasurementValue;
import org.jetlinks.community.dashboard.supports.StaticMeasurement;
import org.jetlinks.community.dashboard.supports.StaticMeasurementProvider;
import org.jetlinks.community.gateway.monitor.measurements.GatewayObjectDefinition;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.community.timeseries.TimeSeriesManager;
import org.jetlinks.community.timeseries.TimeSeriesMetadata;
import org.jetlinks.community.timeseries.TimeSeriesMetric;
import org.jetlinks.community.timeseries.TimeSeriesService;
import org.jetlinks.community.timeseries.query.Aggregation;
import org.jetlinks.community.timeseries.query.AggregationQueryParam;
import org.jetlinks.community.utils.TimeUtils;
import org.jetlinks.core.config.ConfigKey;
import org.jetlinks.core.device.DeviceConfigKey;
import org.jetlinks.core.device.session.DeviceSessionManager;
import org.jetlinks.core.metadata.ConfigMetadata;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.DefaultConfigMetadata;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.SimplePropertyMetadata;
import org.jetlinks.core.metadata.types.ArrayType;
import org.jetlinks.core.metadata.types.DateTimeType;
import org.jetlinks.core.metadata.types.IntType;
import org.jetlinks.core.metadata.types.ObjectType;
import org.jetlinks.core.metadata.types.StringType;
import org.jetlinks.core.server.session.DeviceSession;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.reactivestreams.Publisher;
import org.springframework.stereotype.Component;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

@Component
/* loaded from: input_file:org/jetlinks/community/network/manager/session/DeviceSessionMeasurementProvider.class */
public class DeviceSessionMeasurementProvider extends StaticMeasurementProvider {
    private final DeviceSessionManager sessionManager;
    private final TimeSeriesMetric metric;
    private final TimeSeriesManager timeSeriesManager;
    private final Duration interval;
    private final Disposable.Composite disposable;

    /* loaded from: input_file:org/jetlinks/community/network/manager/session/DeviceSessionMeasurementProvider$DeviceSessionMeasurementDimension.class */
    class DeviceSessionMeasurementDimension implements MeasurementDimension {
        DeviceSessionMeasurementDimension() {
        }

        public DimensionDefinition getDefinition() {
            return CommonDimensionDefinition.agg;
        }

        public DataType getValueType() {
            return new ObjectType();
        }

        public ConfigMetadata getParams() {
            return new DefaultConfigMetadata();
        }

        public boolean isRealTime() {
            return false;
        }

        private AggregationQueryParam createQuery(MeasurementParameter measurementParameter) {
            return AggregationQueryParam.of().agg("deviceId", "count", Aggregation.DISTINCT_COUNT).groupBy(measurementParameter.getInterval("interval", measurementParameter.getInterval("time", (Interval) null)), measurementParameter.getString("format", "yyyy-MM-dd")).from(measurementParameter.getDate("from", TimeUtils.parseDate("now-1d"))).to((Date) measurementParameter.getDate("to").orElseGet(Date::new)).filter(query -> {
                query.is("name", "duration").is("productId", measurementParameter.getString("productId", (String) null)).is("deviceId", measurementParameter.getString("deviceId", (String) null));
            });
        }

        public Publisher<? extends Object> getValue(MeasurementParameter measurementParameter) {
            AggregationQueryParam createQuery = createQuery(measurementParameter);
            DateTimeFormatter forPattern = DateTimeFormat.forPattern(measurementParameter.getString("format", "yyyy-MM-dd"));
            return DeviceSessionMeasurementProvider.this.timeSeriesManager.getService(DeviceSessionMeasurementProvider.this.metric).aggregation(createQuery).map(aggregationData -> {
                Long valueOf = Long.valueOf(aggregationData.getLong("count", 0L));
                String string = aggregationData.getString("time", "");
                Optional string2 = aggregationData.getString("time");
                forPattern.getClass();
                return SimpleMeasurementValue.of(valueOf, string, ((Long) string2.map(forPattern::parseMillis).orElse(0L)).longValue());
            }).sort().take(measurementParameter.getInt("limit", 30));
        }
    }

    public DeviceSessionMeasurementProvider(DeviceSessionManager deviceSessionManager, TimeSeriesManager timeSeriesManager) {
        super(DefaultDashboardDefinition.device, GatewayObjectDefinition.session);
        this.interval = TimeUtils.parse(System.getProperty("device.session.report.interval", "1h"));
        this.disposable = Disposables.composite();
        this.sessionManager = deviceSessionManager;
        this.metric = TimeSeriesMetric.of("device_session_metric");
        this.timeSeriesManager = timeSeriesManager;
        addMeasurement(new StaticMeasurement(MeasurementDefinition.of("online", "在线统计")).addDimension(new DeviceSessionMeasurementDimension()));
    }

    @PostConstruct
    public void init() {
        this.timeSeriesManager.registerMetadata(TimeSeriesMetadata.of(this.metric, new PropertyMetadata[]{SimplePropertyMetadata.of("server", "server", StringType.GLOBAL), SimplePropertyMetadata.of("duration", "duration", IntType.GLOBAL), SimplePropertyMetadata.of("count", "count", IntType.GLOBAL), SimplePropertyMetadata.of("bindings", "bindings", new ArrayType().elementType(StringType.GLOBAL)), SimplePropertyMetadata.of("connectTime", "connectTime", DateTimeType.GLOBAL)})).block(Duration.ofSeconds(30L));
        Scheduler newSingle = Schedulers.newSingle("device-session-reporter");
        this.disposable.add(newSingle);
        if (!this.interval.isZero() && !this.interval.isNegative()) {
            this.disposable.add(Flux.interval(this.interval, newSingle).flatMap(l -> {
                return reportDeviceSession();
            }).subscribe());
        }
        this.disposable.add(this.sessionManager.listenEvent(deviceSessionEvent -> {
            return reportDeviceSession(deviceSessionEvent.getSession(), "session");
        }));
    }

    @PreDestroy
    public void shutdown() {
        this.disposable.dispose();
    }

    private Mono<Void> reportDeviceSession() {
        return this.sessionManager.getSessions().flatMap(this::reportDeviceSession).onErrorResume(th -> {
            return Mono.empty();
        }).then();
    }

    protected Mono<Void> reportDeviceSession(DeviceSession deviceSession) {
        return reportDeviceSession(deviceSession, "check");
    }

    private long computeDuration(long j) {
        return System.currentTimeMillis() - j;
    }

    protected Mono<Void> reportDeviceSession(DeviceSession deviceSession, String str) {
        if (null == deviceSession.getOperator()) {
            return Mono.empty();
        }
        Mono map = deviceSession.getOperator().getSelfConfigs(new ConfigKey[]{DeviceConfigKey.productId}).map(values -> {
            HashMap newHashMapWithExpectedSize = Maps.newHashMapWithExpectedSize(16);
            CharSequence[] charSequenceArr = new CharSequence[4];
            charSequenceArr[0] = deviceSession.getDeviceId();
            charSequenceArr[1] = String.valueOf(deviceSession.connectTime());
            charSequenceArr[2] = this.sessionManager.getCurrentServerId();
            charSequenceArr[3] = "session".equals(str) ? "session" : String.valueOf((int) (System.currentTimeMillis() / this.interval.toMillis()));
            newHashMapWithExpectedSize.put("id", DigestUtils.md5Hex(String.join("-", charSequenceArr)));
            newHashMapWithExpectedSize.put("name", "duration");
            newHashMapWithExpectedSize.put("productId", values.getValue(DeviceConfigKey.productId).orElse(""));
            newHashMapWithExpectedSize.put("deviceId", deviceSession.getDeviceId());
            newHashMapWithExpectedSize.put("server", this.sessionManager.getCurrentServerId());
            newHashMapWithExpectedSize.put("connectTime", Long.valueOf(deviceSession.connectTime()));
            newHashMapWithExpectedSize.put("duration", Long.valueOf(computeDuration(deviceSession.connectTime())));
            newHashMapWithExpectedSize.put("type", str);
            return TimeSeriesData.of(System.currentTimeMillis(), newHashMapWithExpectedSize);
        });
        TimeSeriesService service = this.timeSeriesManager.getService(this.metric);
        service.getClass();
        return ((Mono) map.as((v1) -> {
            return r1.commit(v1);
        })).onErrorResume(th -> {
            return Mono.empty();
        });
    }
}
