package org.jetlinks.community.device.service.data;

import com.google.common.collect.Maps;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.sql.JDBCType;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.hswebframework.ezorm.core.ValueCodec;
import org.hswebframework.ezorm.rdb.codec.ClobValueCodec;
import org.hswebframework.ezorm.rdb.codec.DateTimeCodec;
import org.hswebframework.ezorm.rdb.codec.JsonValueCodec;
import org.hswebframework.ezorm.rdb.codec.NumberValueCodec;
import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrappers;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
import org.hswebframework.ezorm.rdb.mapping.defaults.record.Record;
import org.hswebframework.ezorm.rdb.metadata.RDBColumnMetadata;
import org.hswebframework.ezorm.rdb.metadata.RDBSchemaMetadata;
import org.hswebframework.ezorm.rdb.metadata.RDBTableMetadata;
import org.hswebframework.ezorm.rdb.operator.DatabaseOperator;
import org.hswebframework.ezorm.rdb.operator.ddl.TableBuilder;
import org.hswebframework.ezorm.rdb.operator.dml.SelectColumnSupplier;
import org.hswebframework.ezorm.rdb.operator.dml.query.Selects;
import org.hswebframework.web.api.crud.entity.QueryParamEntity;
import org.hswebframework.web.exception.ValidationException;
import org.jetlinks.community.ConfigMetadataConstants;
import org.jetlinks.community.buffer.BufferProperties;
import org.jetlinks.community.buffer.BufferSettings;
import org.jetlinks.community.buffer.PersistenceBuffer;
import org.jetlinks.community.device.entity.DeviceLatestData;
import org.jetlinks.community.device.service.data.DeviceLatestDataService;
import org.jetlinks.community.gateway.DeviceMessageUtils;
import org.jetlinks.community.gateway.annotation.Subscribe;
import org.jetlinks.community.timeseries.query.Aggregation;
import org.jetlinks.community.timeseries.query.AggregationColumn;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.message.DeviceMessage;
import org.jetlinks.core.message.event.EventMessage;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.DeviceMetadata;
import org.jetlinks.core.metadata.EventMetadata;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.types.ArrayType;
import org.jetlinks.core.metadata.types.DateTimeType;
import org.jetlinks.core.metadata.types.DoubleType;
import org.jetlinks.core.metadata.types.EnumType;
import org.jetlinks.core.metadata.types.FloatType;
import org.jetlinks.core.metadata.types.GeoPoint;
import org.jetlinks.core.metadata.types.GeoType;
import org.jetlinks.core.metadata.types.IntType;
import org.jetlinks.core.metadata.types.LongType;
import org.jetlinks.core.metadata.types.NumberType;
import org.jetlinks.core.metadata.types.ObjectType;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.core.utils.SerializeUtils;
import org.jetlinks.core.utils.StringBuilderUtils;
import org.jetlinks.reactor.ql.utils.CastUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.math.MathFlux;
import reactor.util.function.Tuples;

/* loaded from: input_file:org/jetlinks/community/device/service/data/DatabaseDeviceLatestDataService.class */
public class DatabaseDeviceLatestDataService implements DeviceLatestDataService {
    /** Database entry point used for metadata lookups, DDL and DML on the latest-data tables. */
    private final DatabaseOperator databaseOperator;
    /** Buffer configuration passed to {@code BufferSettings.create} when the write buffer is built. */
    private final BufferProperties buffer;
    /** Write buffer created by {@link #init()}; its batches are flushed through {@code doWrite}. */
    private PersistenceBuffer<Buffer> writer;
    private static final Logger log = LoggerFactory.getLogger(DatabaseDeviceLatestDataService.class);
    /** Shared codec installed on geo-typed columns. */
    static GeoCodec geoCodec = new GeoCodec();
    /** Shared codec installed on enum and short string columns. */
    static StringCodec stringCodec = new StringCodec();
    /**
     * Maps an aggregation kind to the function used to combine per-product results when
     * the caller asks for a merged result. Populated by the static initializer of this class.
     */
    static Map<Aggregation, Function<Flux<Object>, Mono<? extends Number>>> aggMappers = new HashMap<>();
    /** Average of all values, each converted to {@code double} first. */
    static Function<Flux<Object>, Mono<? extends Number>> avg =
        flux -> MathFlux.averageDouble(flux.map(CastUtils::castNumber).map(Number::doubleValue));
    /** Largest of all values, each converted to {@code double} first. */
    static Function<Flux<Object>, Mono<? extends Number>> max =
        flux -> MathFlux.max(flux.map(CastUtils::castNumber).map(Number::doubleValue));
    /** Smallest of all values, each converted to {@code double} first. */
    static Function<Flux<Object>, Mono<? extends Number>> min =
        flux -> MathFlux.min(flux.map(CastUtils::castNumber).map(Number::doubleValue));
    /** Sum of all values, each converted to {@code double} first. */
    static Function<Flux<Object>, Mono<? extends Number>> sum =
        flux -> MathFlux.sumDouble(flux.map(CastUtils::castNumber).map(Number::doubleValue));

    /* renamed from: org.jetlinks.community.device.service.data.DatabaseDeviceLatestDataService$1, reason: invalid class name */
    /* loaded from: input_file:org/jetlinks/community/device/service/data/DatabaseDeviceLatestDataService$1.class */
    // Compiler-generated lookup table: translates Aggregation.ordinal() into the case
    // label (1..5) used by the switch in createAggColumn. Slots that are never assigned
    // keep the int[] default of 0, which matches no case label and therefore reaches
    // that switch's default branch.
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$jetlinks$community$timeseries$query$Aggregation = new int[Aggregation.values().length];

        static {
            // Each constant is registered in its own try/catch so that a constant missing
            // at runtime (NoSuchFieldError) does not prevent the remaining ones from
            // being registered.
            try {
                $SwitchMap$org$jetlinks$community$timeseries$query$Aggregation[Aggregation.COUNT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$jetlinks$community$timeseries$query$Aggregation[Aggregation.AVG.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$jetlinks$community$timeseries$query$Aggregation[Aggregation.MAX.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$jetlinks$community$timeseries$query$Aggregation[Aggregation.MIN.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$jetlinks$community$timeseries$query$Aggregation[Aggregation.SUM.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/jetlinks/community/device/service/data/DatabaseDeviceLatestDataService$Buffer.class */
    /**
     * One pending latest-data row for a single device, queued in the write buffer.
     *
     * <p>Implements {@link Externalizable}, so the class keeps its implicit public
     * no-argument constructor; instances are otherwise created through
     * {@link #of(String, String, String, Map, long)}.
     */
    public static class Buffer implements Externalizable {
        /** Age limit in milliseconds (30 seconds) applied by {@link #isEffective()}. */
        private static final long expires = Duration.ofSeconds(30).toMillis();
        private String table;
        private String deviceId;
        private String deviceName;
        private Map<String, Object> properties;
        private long timestamp;

        /**
         * @return {@code true} while the current system time is less than 30 seconds
         *         past this buffer's timestamp
         */
        public boolean isEffective() {
            return System.currentTimeMillis() - this.timestamp < expires;
        }

        /**
         * Creates a populated buffer.
         *
         * @param str  target table name
         * @param str2 device id
         * @param str3 device name
         * @param map  column values to write; stored by reference, not copied
         * @param j    message timestamp in milliseconds
         * @return the new buffer
         */
        public static Buffer of(String str, String str2, String str3, Map<String, Object> map, long j) {
            Buffer created = new Buffer();
            created.table = str;
            created.deviceId = str2;
            created.deviceName = str3;
            created.properties = map;
            created.timestamp = j;
            return created;
        }

        /**
         * Combines two buffers, letting the one with the larger timestamp win.
         *
         * <p>The winner keeps all of its own values and additionally receives every
         * property of the other buffer that it does not have yet. On equal timestamps
         * this instance wins. The winner's property map is modified in place.
         *
         * @param buffer the buffer to combine with this one
         * @return the winning buffer (either {@code this} or {@code buffer})
         */
        public Buffer merge(Buffer buffer) {
            if (buffer.timestamp > this.timestamp) {
                return buffer.merge(this);
            }
            // Older values only fill gaps; they never overwrite a newer value.
            buffer.properties.forEach(this.properties::putIfAbsent);
            return this;
        }

        /**
         * @return the number of buffered properties, or 0 when there is no property map
         */
        int size() {
            return this.properties == null ? 0 : this.properties.size();
        }

        /**
         * Writes table, device id, device name, timestamp and properties, in that order.
         */
        @Override
        public void writeExternal(ObjectOutput objectOutput) throws IOException {
            objectOutput.writeUTF(this.table);
            objectOutput.writeUTF(this.deviceId);
            objectOutput.writeUTF(this.deviceName);
            objectOutput.writeLong(this.timestamp);
            SerializeUtils.writeObject(this.properties, objectOutput);
        }

        /**
         * Reads the fields in the same order {@link #writeExternal(ObjectOutput)} wrote them.
         */
        @Override
        @SuppressWarnings("unchecked") // writeExternal only ever stores a Map<String, Object> here
        public void readExternal(ObjectInput objectInput) throws IOException, ClassNotFoundException {
            this.table = objectInput.readUTF();
            this.deviceId = objectInput.readUTF();
            this.deviceName = objectInput.readUTF();
            this.timestamp = objectInput.readLong();
            this.properties = (Map<String, Object>) SerializeUtils.readObject(objectInput);
        }

        public String getTable() {
            return this.table;
        }

        public String getDeviceId() {
            return this.deviceId;
        }

        public String getDeviceName() {
            return this.deviceName;
        }

        public Map<String, Object> getProperties() {
            return this.properties;
        }

        public long getTimestamp() {
            return this.timestamp;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/jetlinks/community/device/service/data/DatabaseDeviceLatestDataService$GeoCodec.class */
    public static class GeoCodec implements ValueCodec<String, GeoPoint> {
        GeoCodec() {
        }

        /* renamed from: encode, reason: merged with bridge method [inline-methods] */
        public String m73encode(Object obj) {
            return String.valueOf(obj);
        }

        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public GeoPoint m72decode(Object obj) {
            return GeoPoint.of(obj);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/jetlinks/community/device/service/data/DatabaseDeviceLatestDataService$StringCodec.class */
    public static class StringCodec implements ValueCodec<String, String> {
        StringCodec() {
        }

        /* renamed from: encode, reason: merged with bridge method [inline-methods] */
        public String m75encode(Object obj) {
            return String.valueOf(obj);
        }

        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public String m74decode(Object obj) {
            return String.valueOf(obj);
        }
    }

    /**
     * Stores the collaborators and immediately calls {@link #init()}, which creates
     * and starts the write buffer.
     *
     * @param databaseOperator database entry point used for all table access
     * @param bufferProperties configuration for the write buffer
     */
    public DatabaseDeviceLatestDataService(DatabaseOperator databaseOperator, BufferProperties bufferProperties) {
        this.databaseOperator = databaseOperator;
        this.buffer = bufferProperties;
        init();
    }

    /**
     * Derives the latest-data table name for a product.
     *
     * <p>The name is {@code dev_lst_} followed by the product id with every
     * {@code '-'} and {@code '.'} replaced by {@code '_'} and every other character
     * lower-cased.
     *
     * @param str product id
     * @return the table name
     */
    public static String getLatestTableTableName(String str) {
        return StringBuilderUtils.buildString(str, (ignored, builder) -> {
            builder.append("dev_lst_");
            char[] chars = str.toCharArray();
            for (int i = 0; i < chars.length; i++) {
                char ch = chars[i];
                boolean separator = ch == '-' || ch == '.';
                builder.append(separator ? '_' : Character.toLowerCase(ch));
            }
        });
    }

    /**
     * Builds the column name for one field of an event: {@code <event>_<field>}.
     *
     * @param str  event id
     * @param str2 field id inside the event
     * @return the two ids joined by an underscore
     */
    private String getEventColumn(String str, String str2) {
        return String.join("_", str, str2);
    }

    /**
     * Flushes a batch of buffered rows to the database.
     *
     * <p>Rows are grouped by table and the tables are processed one after another.
     * Within a table, all rows of the same device are collapsed into one with
     * {@link Buffer#merge(Buffer)}, and the merged rows are written in chunks of up
     * to 200. A failed chunk is logged and dropped so the remaining chunks are still
     * written.
     *
     * @param flux the buffered rows to persist
     * @return a Mono that completes with {@code Reactors.ALWAYS_FALSE} once every
     *         chunk has been attempted
     */
    private Mono<Boolean> doWrite(Flux<Buffer> flux) {
        return flux
            .groupBy(Buffer::getTable, Integer.MAX_VALUE)
            .concatMap(tableGroup -> tableGroup
                .groupBy(Buffer::getDeviceId, Integer.MAX_VALUE)
                .flatMap(deviceGroup -> deviceGroup.reduce(Buffer::merge))
                .buffer(200)
                .flatMap(batch -> {
                    List<Map<String, Object>> rows = batch
                        .stream()
                        .map(Buffer::getProperties)
                        .collect(Collectors.toList());
                    // Every element of the batch belongs to this group's table.
                    return doUpdateLatestData(tableGroup.key(), rows)
                        .onErrorResume(th -> {
                            log.error("save device latest data error", th);
                            return Mono.empty();
                        });
                }))
            .then(Reactors.ALWAYS_FALSE);
    }

    /**
     * Creates the write buffer named {@code device-latest-data}, backed by
     * {@code ./data/buffer} with a buffer size of 100000, and starts it. Buffered
     * rows are flushed through {@code doWrite}.
     */
    public void init() {
        this.writer = new PersistenceBuffer<Buffer>(
            BufferSettings.create("./data/buffer", this.buffer),
            Buffer::new,
            this::doWrite)
            .name("device-latest-data")
            .settings(settings -> settings.bufferSize(100000));
        this.writer.start();
    }

    /**
     * Disposes the write buffer created by {@link #init()}.
     */
    public void destroy() {
        this.writer.dispose();
    }

    /**
     * Maps a metadata data type to the Java class used for its values.
     *
     * @param dataType the metadata type; may be {@code null}
     * @return {@code Map.class} for {@code null}, {@code object} and {@code geoPoint};
     *         the matching boxed/JDK class for {@code int}, {@code long}, {@code float},
     *         {@code double}, {@code boolean}, {@code date} and {@code array};
     *         {@code String.class} for every other type id
     */
    Class<?> getJavaType(DataType dataType) {
        if (dataType == null) {
            return Map.class;
        }
        switch (dataType.getType()) {
            case "int":
                return Integer.class;
            case "long":
                return Long.class;
            case "float":
                return Float.class;
            case "double":
                return Double.class;
            case "boolean":
                return Boolean.class;
            case "date":
                return Date.class;
            case "array":
                return List.class;
            case "geoPoint":
            case "object":
                return Map.class;
            default:
                return String.class;
        }
    }

    /**
     * Builds the column definition for one property.
     *
     * <p>The column is named after the property id and commented with the property
     * name. The JDBC type, length and value codec depend on the property's value type:
     * <ul>
     *   <li>numbers: {@code NUMERIC} with length and precision 32; double and float
     *       also get a scale (2 when the type declares none)</li>
     *   <li>object and array: {@code CLOB} holding JSON</li>
     *   <li>date/time: {@code TIMESTAMP}</li>
     *   <li>geo: {@code VARCHAR(128)}; enum: {@code VARCHAR(64)}</li>
     *   <li>anything else: {@code VARCHAR} sized by the {@code maxLength} expand
     *       (default 255), or {@code LONGVARBINARY} when that length exceeds 2048</li>
     * </ul>
     *
     * @param propertyMetadata the property to convert
     * @return a new column definition, not yet attached to any table
     */
    RDBColumnMetadata convertColumn(PropertyMetadata propertyMetadata) {
        RDBColumnMetadata column = new RDBColumnMetadata();
        column.setName(propertyMetadata.getId());
        column.setComment(propertyMetadata.getName());
        DataType valueType = propertyMetadata.getValueType();
        if (valueType instanceof NumberType) {
            column.setLength(32);
            column.setPrecision(32);
            if (valueType instanceof DoubleType) {
                column.setScale(Optional.ofNullable(((DoubleType) valueType).getScale()).orElse(2));
                column.setValueCodec(new NumberValueCodec(Double.class));
                column.setJdbcType(JDBCType.NUMERIC, Double.class);
            } else if (valueType instanceof FloatType) {
                column.setScale(Optional.ofNullable(((FloatType) valueType).getScale()).orElse(2));
                column.setValueCodec(new NumberValueCodec(Float.class));
                column.setJdbcType(JDBCType.NUMERIC, Float.class);
            } else if (valueType instanceof LongType) {
                column.setValueCodec(new NumberValueCodec(Long.class));
                column.setJdbcType(JDBCType.NUMERIC, Long.class);
            } else {
                // The codec takes the Java value class, like every branch above and like
                // the Java type registered on the next line; the metadata class
                // IntType.class was passed here before.
                column.setValueCodec(new NumberValueCodec(Integer.class));
                column.setJdbcType(JDBCType.NUMERIC, Integer.class);
            }
        } else if (valueType instanceof ObjectType) {
            column.setJdbcType(JDBCType.CLOB, String.class);
            column.setValueCodec(JsonValueCodec.of(Map.class));
        } else if (valueType instanceof ArrayType) {
            column.setJdbcType(JDBCType.CLOB, String.class);
            Class<?> elementClass = getJavaType(((ArrayType) valueType).getElementType());
            column.setValueCodec(JsonValueCodec.ofCollection(ArrayList.class, elementClass));
        } else if (valueType instanceof DateTimeType) {
            column.setJdbcType(JDBCType.TIMESTAMP, Long.class);
            String format = ((DateTimeType) valueType).getFormat();
            // "timestamp" is not a date pattern; use a concrete pattern for the codec.
            if ("timestamp".equals(format)) {
                format = "yyyy-MM-dd HH:mm:ss";
            }
            column.setValueCodec(new DateTimeCodec(format, Long.class));
        } else if (valueType instanceof GeoType) {
            column.setJdbcType(JDBCType.VARCHAR, String.class);
            column.setValueCodec(geoCodec);
            column.setLength(128);
        } else if (valueType instanceof EnumType) {
            column.setJdbcType(JDBCType.VARCHAR, String.class);
            column.setValueCodec(stringCodec);
            column.setLength(64);
        } else {
            int maxLength = valueType
                .getExpand(ConfigMetadataConstants.maxLength.getKey())
                .filter(configured -> !StringUtils.isEmpty(configured))
                .map(CastUtils::castNumber)
                .map(Number::intValue)
                .orElse(255);
            if (maxLength > 2048) {
                column.setJdbcType(JDBCType.LONGVARBINARY, String.class);
                column.setValueCodec(ClobValueCodec.INSTANCE);
            } else {
                column.setJdbcType(JDBCType.VARCHAR, String.class);
                column.setLength(maxLength);
                column.setValueCodec(stringCodec);
            }
        }
        return column;
    }

    /**
     * Rebuilds the in-memory table definition for a product from its device metadata.
     *
     * <p>The definition holds an {@code id} primary key, a {@code device_name} column
     * (alias {@code deviceName}), one column per property, and one column per field of
     * every event whose type is an object type. If the schema already knows the table,
     * that definition is replaced; otherwise the new one is added. No DDL is issued by
     * this method.
     *
     * @param str            product id
     * @param deviceMetadata metadata describing the product's properties and events
     * @return a Mono that completes when the schema metadata has been updated
     */
    @Override
    public Mono<Void> reloadMetadata(String str, DeviceMetadata deviceMetadata) {
        return Mono.defer(() -> {
            String tableName = getLatestTableTableName(str);
            log.debug("reload product[{}] metadata,table name:[{}] ", str, tableName);
            RDBSchemaMetadata schema = this.databaseOperator.getMetadata().getCurrentSchema();
            RDBTableMetadata table = schema.newTable(tableName);

            RDBColumnMetadata idColumn = table.newColumn();
            idColumn.setName("id");
            idColumn.setLength(64);
            idColumn.setPrimaryKey(true);
            idColumn.setJdbcType(JDBCType.VARCHAR, String.class);
            table.addColumn(idColumn);

            RDBColumnMetadata nameColumn = table.newColumn();
            nameColumn.setLength(128);
            nameColumn.setName("device_name");
            nameColumn.setAlias("deviceName");
            nameColumn.setJdbcType(JDBCType.VARCHAR, String.class);
            table.addColumn(nameColumn);

            for (PropertyMetadata property : deviceMetadata.getProperties()) {
                table.addColumn(convertColumn(property));
            }
            for (EventMetadata event : deviceMetadata.getEvents()) {
                DataType eventType = event.getType();
                // Only object-typed events are expanded into per-field columns.
                if (eventType instanceof ObjectType) {
                    for (PropertyMetadata field : ((ObjectType) eventType).getProperties()) {
                        RDBColumnMetadata fieldColumn = convertColumn(field);
                        fieldColumn.setName(getEventColumn(event.getId(), field.getId()));
                        table.addColumn(fieldColumn);
                    }
                }
            }
            return schema
                .getTableReactive(tableName, false)
                .doOnNext(existing -> existing.replace(table))
                .switchIfEmpty(Mono.fromRunnable(() -> schema.addTable(table)))
                .then();
        });
    }

    /**
     * Creates or alters the latest-data table of a product so that it matches the
     * given device metadata.
     *
     * <p>The table gets an {@code id} primary key, a non-null {@code device_name}
     * column (alias {@code deviceName}), one column per property, and one column per
     * field of every event whose type is an object type. The DDL is built with
     * {@code merge(true)} and subscribed on the bounded-elastic scheduler.
     *
     * @param str            product id
     * @param deviceMetadata metadata describing the product's properties and events
     * @param z              forwarded to {@code TableBuilder.allowAlter}
     * @return a Mono that completes when the DDL has been committed
     */
    @Transactional(propagation = Propagation.NEVER)
    public Mono<Void> upgradeMetadata(String str, DeviceMetadata deviceMetadata, boolean z) {
        return Mono.defer(() -> {
            String tableName = getLatestTableTableName(str);
            log.debug("upgrade product[{}] metadata,table name:[{}] ", str, tableName);
            TableBuilder builder = this.databaseOperator
                .ddl()
                .createOrAlter(tableName)
                .addColumn("id").primaryKey().varchar(64).commit()
                .addColumn("device_name").alias("deviceName").varchar(128).notNull().commit()
                .merge(true)
                .allowAlter(z);
            for (PropertyMetadata property : deviceMetadata.getProperties()) {
                builder.addColumn(convertColumn(property));
            }
            for (EventMetadata event : deviceMetadata.getEvents()) {
                DataType eventType = event.getType();
                // Only object-typed events are expanded into per-field columns.
                if (eventType instanceof ObjectType) {
                    for (PropertyMetadata field : ((ObjectType) eventType).getProperties()) {
                        RDBColumnMetadata fieldColumn = convertColumn(field);
                        fieldColumn.setName(getEventColumn(event.getId(), field.getId()));
                        builder.addColumn(fieldColumn);
                    }
                }
            }
            return builder
                .commit()
                .reactive()
                .subscribeOn(Schedulers.boundedElastic())
                .then();
        });
    }

    /**
     * Creates or alters the product's latest-data table with altering of existing
     * columns allowed; delegates to the three-argument overload with {@code true}.
     *
     * @param str            product id
     * @param deviceMetadata metadata describing the product's properties and events
     * @return a Mono that completes when the DDL has been committed
     */
    @Override // org.jetlinks.community.device.service.data.DeviceLatestDataService
    public Mono<Void> upgradeMetadata(String str, DeviceMetadata deviceMetadata) {
        return upgradeMetadata(str, deviceMetadata, true);
    }

    /**
     * Queues the values carried by a device message for writing to the product's
     * latest-data table.
     *
     * <p>Values come from the message's properties when it has any. Otherwise, for an
     * event message, a map payload becomes one {@code <event>_<key>} entry per key and
     * any other payload becomes a single {@code <event>_value} entry. Messages that
     * yield no values are ignored. The row also receives {@code id} (device id) and
     * {@code deviceName} (header {@code deviceName}, falling back to the device id).
     * The target table is derived from the {@code productId} header, or the literal
     * {@code "null"} when the header is absent.
     *
     * <p>Any exception is logged and not propagated.
     *
     * @param deviceMessage the incoming message
     */
    @Override
    @Subscribe(topics = {"/device/**"}, features = {Subscription.Feature.local})
    public void save(DeviceMessage deviceMessage) {
        try {
            Map<String, Object> reported = DeviceMessageUtils
                .tryGetProperties(deviceMessage)
                .orElseGet(() -> {
                    if (!(deviceMessage instanceof EventMessage)) {
                        return null;
                    }
                    EventMessage eventMessage = (EventMessage) deviceMessage;
                    Object data = eventMessage.getData();
                    String event = eventMessage.getEvent();
                    if (data instanceof Map) {
                        Map<?, ?> fields = (Map<?, ?>) data;
                        Map<String, Object> columns = Maps.newHashMapWithExpectedSize(fields.size());
                        fields.forEach((key, value) ->
                            columns.put(getEventColumn(event, String.valueOf(key)), value));
                        return columns;
                    }
                    return Collections.singletonMap(getEventColumn(event, "value"), data);
                });
            if (CollectionUtils.isEmpty(reported)) {
                return;
            }
            String productId = deviceMessage
                .getHeader("productId")
                .map(String::valueOf)
                .orElse("null");
            String deviceName = deviceMessage
                .getHeader("deviceName")
                .map(String::valueOf)
                .orElse(deviceMessage.getDeviceId());
            String tableName = getLatestTableTableName(productId);
            // Copy first: the source map may be an immutable singleton map.
            Map<String, Object> row = new HashMap<>(reported);
            row.put("id", deviceMessage.getDeviceId());
            row.put("deviceName", deviceName);
            this.writer.write(Buffer.of(
                tableName, deviceMessage.getDeviceId(), deviceName, row, deviceMessage.getTimestamp()));
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    /**
     * Upserts a batch of rows into a latest-data table.
     *
     * <p>Nothing is written when the schema does not know the table. When the table
     * is known but has no {@code deviceName} column, a warning is logged (the message
     * says the latest-data table structure is wrong) and nothing is written.
     *
     * @param str  table name
     * @param list rows to upsert; the {@code id} column is excluded from updates
     * @return a Mono that completes when the upsert has finished or was skipped
     */
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public Mono<Void> doUpdateLatestData(String str, List<Map<String, Object>> list) {
        return this.databaseOperator
            .getMetadata()
            .getCurrentSchema()
            .getTableReactive(str, false)
            .flatMap(table -> {
                if (!table.getColumn("deviceName").isPresent()) {
                    log.warn("设备最新数据表[{}]结构错误", str);
                    return Mono.empty();
                }
                return this.databaseOperator
                    .dml()
                    .upsert(str)
                    .ignoreUpdate(new String[]{"id"})
                    .values(list)
                    .execute()
                    .reactive()
                    .then();
            });
    }

    /**
     * Creates a reactive repository over the latest-data table of a product.
     *
     * @param str product id
     * @return a repository bound to the table named by {@code getLatestTableTableName(str)}
     */
    public ReactiveRepository<Record, String> getRepository(String str) {
        return this.databaseOperator.dml().createReactiveRepository(getLatestTableTableName(str));
    }

    /**
     * Queries the latest data of a product's devices.
     *
     * @param str              product id
     * @param queryParamEntity query conditions applied to the latest-data table
     * @return one {@link DeviceLatestData} per matching row
     */
    @Override
    public Flux<DeviceLatestData> query(String str, QueryParamEntity queryParamEntity) {
        return getRepository(str)
            .createQuery()
            .setParam(queryParamEntity)
            .fetch()
            .map(DeviceLatestData::new);
    }

    /**
     * Loads the latest data of a single device.
     *
     * @param str  product id
     * @param str2 device id, used as the row id
     * @return the device's latest data, or an empty Mono when no row exists
     */
    @Override
    public Mono<DeviceLatestData> queryDeviceData(String str, String str2) {
        ReactiveRepository<Record, String> repository = getRepository(str);
        return repository.findById(str2).map(DeviceLatestData::new);
    }

    /**
     * Counts the rows of a product's latest-data table that match the query.
     *
     * @param str              product id
     * @param queryParamEntity query conditions applied to the latest-data table
     * @return the number of matching rows
     */
    @Override // org.jetlinks.community.device.service.data.DeviceLatestDataService
    public Mono<Integer> count(String str, QueryParamEntity queryParamEntity) {
        return getRepository(str).createQuery().setParam(queryParamEntity).count();
    }

    /**
     * Translates one aggregation request into the matching SQL select column,
     * aliased with the request's alias.
     *
     * @param aggregationColumn the aggregation to translate
     * @return the select column for COUNT, AVG, MAX, MIN or SUM
     * @throws UnsupportedOperationException for any other aggregation kind
     */
    private SelectColumnSupplier createAggColumn(AggregationColumn aggregationColumn) {
        switch (aggregationColumn.getAggregation()) {
            case COUNT:
                return Selects.count(aggregationColumn.getProperty()).as(aggregationColumn.getAlias());
            case AVG:
                return Selects.avg(aggregationColumn.getProperty()).as(aggregationColumn.getAlias());
            case MAX:
                return Selects.max(aggregationColumn.getProperty()).as(aggregationColumn.getAlias());
            case MIN:
                return Selects.min(aggregationColumn.getProperty()).as(aggregationColumn.getAlias());
            case SUM:
                return Selects.sum(aggregationColumn.getProperty()).as(aggregationColumn.getAlias());
            default:
                throw new UnsupportedOperationException("unsupported agg:" + aggregationColumn.getAggregation());
        }
    }

    /**
     * Translates every aggregation request into its SQL select column, keeping the
     * order of the input list.
     *
     * @param list aggregation requests
     * @return one select column per request
     */
    private SelectColumnSupplier[] createAggColumns(List<AggregationColumn> list) {
        return list
            .stream()
            .map(this::createAggColumn)
            .toArray(SelectColumnSupplier[]::new);
    }

    /**
     * Runs aggregate functions over the latest-data table of one product.
     *
     * <p>Columns whose property is not a column of the table are skipped; if none
     * remain, the call fails with a validation error naming the product and the
     * unknown properties. Every requested alias that is missing from the result is
     * filled with {@code 0}. When the query fails with a message containing
     * {@code "doesn't exist "}, an empty result is returned instead of the error.
     *
     * <p>The emitted map is always mutable, so callers may add their own entries.
     *
     * @param str              product id
     * @param list             aggregations to compute; must not be empty
     * @param queryParamEntity query conditions; paging is removed from a copy of it
     * @return the aggregation result keyed by alias, or an empty Mono when the schema
     *         does not know the table or the query yields no row
     */
    @Override
    public Mono<Map<String, Object>> aggregation(String str, List<AggregationColumn> list, QueryParamEntity queryParamEntity) {
        if (CollectionUtils.isEmpty(list)) {
            return Mono.error(new ValidationException("columns", "error.aggregate_column_cannot_be_empty", new Object[0]));
        }
        String tableName = getLatestTableTableName(str);
        return this.databaseOperator
            .getMetadata()
            .getTableReactive(tableName)
            .flatMap(table -> {
                List<String> unknownProperties = new ArrayList<>();
                List<AggregationColumn> knownColumns = new ArrayList<>();
                for (AggregationColumn column : list) {
                    if (table.getColumn(column.getProperty()).isPresent()) {
                        knownColumns.add(column);
                    } else {
                        unknownProperties.add(column.getProperty());
                    }
                }
                if (knownColumns.isEmpty()) {
                    return Mono.error(new ValidationException(
                        "columns",
                        "error.invalid_product_attribute_or_event",
                        new Object[]{str, unknownProperties}));
                }
                return this.databaseOperator
                    .dml()
                    .query(tableName)
                    .select(createAggColumns(knownColumns))
                    .setParam(queryParamEntity.clone().noPaging())
                    .fetch(ResultWrappers.map())
                    .reactive()
                    .take(1L)
                    .singleOrEmpty()
                    .doOnNext(row -> {
                        // Skipped and null-valued aliases are reported as 0.
                        for (AggregationColumn column : list) {
                            row.putIfAbsent(column.getAlias(), 0);
                        }
                    })
                    // A fresh mutable map per failure: the Flux overload of aggregation
                    // adds "productId" to the result, which the shared immutable
                    // Collections.emptyMap() would reject.
                    .onErrorResume(
                        th -> StringUtils.hasText(th.getMessage()) && th.getMessage().contains("doesn't exist "),
                        th -> Mono.<Map<String, Object>>just(new HashMap<>()));
            });
    }

    /**
     * Runs aggregations for several products.
     *
     * <p>Without merging, one result map is emitted per product and each map also
     * carries a {@code productId} entry. With merging, the per-product results are
     * combined into a single map: values sharing an alias are folded with the
     * function registered in {@code aggMappers} for that alias's aggregation kind,
     * taken from the columns of the first request only; aliases not found there are
     * summed.
     *
     * @param flux the per-product requests; cached because merging reads it twice
     * @param z    {@code true} to merge all products into one result
     * @return per-product results, or a single merged result when {@code z} is set
     */
    @Override
    public Flux<Map<String, Object>> aggregation(Flux<DeviceLatestDataService.QueryProductLatestDataRequest> flux, boolean z) {
        Flux<DeviceLatestDataService.QueryProductLatestDataRequest> requests = flux.cache();
        Flux<Map<String, Object>> perProduct = requests.flatMap(request ->
            aggregation(request.getProductId(), request.getColumns(), request.getQuery())
                .doOnNext(result -> {
                    if (!z) {
                        result.put("productId", request.getProductId());
                    }
                }));
        if (!z) {
            return perProduct;
        }
        return requests
            .take(1L)
            .flatMapIterable(DeviceLatestDataService.QueryProductLatestDataRequest::getColumns)
            .collectMap(
                AggregationColumn::getAlias,
                column -> aggMappers.getOrDefault(column.getAggregation(), sum))
            .flatMap(mergers -> perProduct
                .flatMapIterable(Map::entrySet)
                .groupBy(Map.Entry::getKey, Integer.MAX_VALUE)
                .flatMap(group -> mergers
                    .getOrDefault(group.key(), sum)
                    .apply(group.map(Map.Entry::getValue))
                    // Widen to Object so the collected map matches the declared result type.
                    .map(merged -> Tuples.of(String.valueOf(group.key()), (Object) merged)))
                .collectMap(tuple -> tuple.getT1(), tuple -> tuple.getT2()))
            .flux();
    }

    // Registers the function used to combine per-product values for each aggregation
    // kind when results are merged. COUNT is registered with the sum function, so
    // per-product counts are added together.
    static {
        aggMappers.put(Aggregation.AVG, avg);
        aggMappers.put(Aggregation.MAX, max);
        aggMappers.put(Aggregation.MIN, min);
        aggMappers.put(Aggregation.SUM, sum);
        aggMappers.put(Aggregation.COUNT, sum);
    }
}
