package org.jetlinks.community.things.data.operations;

import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.collections.MapUtils;
import org.hswebframework.web.id.IDGenerator;
import org.hswebframework.web.utils.DigestUtils;
import org.jetlinks.community.things.ThingConstants;
import org.jetlinks.community.things.data.ThingLogType;
import org.jetlinks.community.things.data.ThingsDataConstants;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.community.utils.ObjectMappers;
import org.jetlinks.core.message.DeviceLogMessage;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.ThingMessage;
import org.jetlinks.core.message.event.ThingEventMessage;
import org.jetlinks.core.message.property.PropertyMessage;
import org.jetlinks.core.message.property.ThingReportPropertyMessage;
import org.jetlinks.core.metadata.Converter;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.Feature;
import org.jetlinks.core.metadata.MetadataFeature;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.types.ObjectType;
import org.jetlinks.core.metadata.types.UnknownType;
import org.jetlinks.core.things.ThingMetadata;
import org.jetlinks.core.things.ThingsRegistry;
import org.jetlinks.core.utils.StringBuilderUtils;
import org.jetlinks.core.utils.TimestampUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

/* loaded from: input_file:org/jetlinks/community/things/data/operations/AbstractSaveOperations.class */
public abstract class AbstractSaveOperations implements SaveOperations {
    private static final Logger log = LoggerFactory.getLogger(AbstractSaveOperations.class);
    protected final ThingsRegistry registry;
    protected final MetricBuilder metricBuilder;
    protected final DataSettings settings;

    @Override // org.jetlinks.community.things.data.operations.SaveOperations
    public final Mono<Void> save(ThingMessage thingMessage) {
        return convertMessageToTimeSeriesData(thingMessage).flatMap(tuple2 -> {
            return doSave((String) tuple2.getT1(), (TimeSeriesData) tuple2.getT2());
        }).then();
    }

    @Override // org.jetlinks.community.things.data.operations.SaveOperations
    public final Mono<Void> save(Collection<? extends ThingMessage> collection) {
        return save((Publisher<? extends ThingMessage>) Flux.fromIterable(collection));
    }

    @Override // org.jetlinks.community.things.data.operations.SaveOperations
    public final Mono<Void> save(Publisher<? extends ThingMessage> publisher) {
        return Flux.from(publisher).flatMap(this::convertMessageToTimeSeriesData).groupBy((v0) -> {
            return v0.getT1();
        }).flatMap(groupedFlux -> {
            return doSave((String) groupedFlux.key(), groupedFlux.map((v0) -> {
                return v0.getT2();
            }));
        }).then();
    }

    protected Map<String, Object> createLogData(ThingMessage thingMessage) {
        HashMap newHashMapWithExpectedSize = Maps.newHashMapWithExpectedSize(8);
        newHashMapWithExpectedSize.put(ThingsDataConstants.COLUMN_ID, IDGenerator.SNOW_FLAKE_STRING.generate());
        newHashMapWithExpectedSize.put(this.metricBuilder.getThingIdProperty(), thingMessage.getThingId());
        newHashMapWithExpectedSize.put(ThingsDataConstants.COLUMN_TIMESTAMP, Long.valueOf(thingMessage.getTimestamp()));
        newHashMapWithExpectedSize.put(ThingsDataConstants.COLUMN_CREATE_TIME, Long.valueOf(System.currentTimeMillis()));
        newHashMapWithExpectedSize.put(ThingsDataConstants.COLUMN_MESSAGE_ID, thingMessage.getMessageId());
        newHashMapWithExpectedSize.put("type", ThingLogType.of(thingMessage).name());
        newHashMapWithExpectedSize.put(ThingsDataConstants.COLUMN_LOG_CONTENT, thingMessage instanceof DeviceLogMessage ? ((DeviceLogMessage) thingMessage).getLog() : ObjectMappers.toJsonString(thingMessage.toJson()));
        return newHashMapWithExpectedSize;
    }

    protected String getTemplateIdFromMessage(ThingMessage thingMessage) {
        String str = (String) thingMessage.getHeader(Headers.productId).orElse(null);
        if (str == null) {
            str = (String) thingMessage.getHeader(ThingConstants.templateId).orElse(null);
        }
        return str == null ? "null" : str;
    }

    protected Flux<Tuple2<String, TimeSeriesData>> convertMessageToTimeSeriesData(ThingMessage thingMessage) {
        boolean booleanValue = ((Boolean) thingMessage.getHeaderOrDefault(Headers.ignoreStorage)).booleanValue();
        boolean booleanValue2 = ((Boolean) thingMessage.getHeaderOrDefault(Headers.ignoreLog)).booleanValue();
        if (booleanValue && booleanValue2) {
            return Flux.empty();
        }
        String templateIdFromMessage = getTemplateIdFromMessage(thingMessage);
        ArrayList arrayList = new ArrayList(2);
        if (!booleanValue) {
            if (thingMessage instanceof ThingEventMessage) {
                arrayList.add(convertEventMessageToTimeSeriesData(templateIdFromMessage, (ThingEventMessage) thingMessage));
            } else if ((thingMessage instanceof PropertyMessage) && (!this.settings.getProperty().isOnlySaveReport() || (thingMessage instanceof ThingReportPropertyMessage))) {
                PropertyMessage propertyMessage = (PropertyMessage) thingMessage;
                Map<String, Object> properties = propertyMessage.getProperties();
                if (MapUtils.isNotEmpty(properties)) {
                    Map<String, Long> propertySourceTimes = propertyMessage.getPropertySourceTimes();
                    if (propertySourceTimes == null) {
                        propertySourceTimes = Collections.emptyMap();
                    }
                    arrayList.add(convertProperties(templateIdFromMessage, thingMessage, properties, propertySourceTimes));
                }
            }
        }
        if (this.settings.getLogFilter().match(thingMessage.getMessageType()) && !booleanValue2) {
            arrayList.add(createDeviceMessageLog(templateIdFromMessage, thingMessage));
        }
        return Flux.merge(arrayList);
    }

    private Mono<Tuple2<String, TimeSeriesData>> convertEventMessageToTimeSeriesData(String str, ThingEventMessage thingEventMessage) {
        return this.registry.getTemplate(thingEventMessage.getThingType(), str).flatMap(thingTemplate -> {
            return this.settings.getEvent().eventIsAllInOne() ? thingTemplate.getMetadata() : this.registry.getThing(thingEventMessage.getThingType(), thingEventMessage.getThingId()).flatMap((v0) -> {
                return v0.getTemplate();
            }).flatMap((v0) -> {
                return v0.getMetadata();
            });
        }).handle((thingMetadata, synchronousSink) -> {
            if (this.settings.getEvent().shouldIgnoreUndefined() && thingMetadata.getEventOrNull(thingEventMessage.getEvent()) == null) {
                log.warn("{}[{}] event [{}] metadata undefined", new Object[]{thingEventMessage.getThingType(), thingEventMessage.getThingId(), thingEventMessage.getEvent()});
            } else {
                synchronousSink.next(TimeSeriesData.of(TimestampUtils.toMillis(thingEventMessage.getTimestamp()), createEventData(thingEventMessage, thingMetadata)));
            }
        }).map(timeSeriesData -> {
            return Tuples.of(createEventMetric(thingEventMessage.getThingType(), str, thingEventMessage.getThingId(), thingEventMessage.getEvent()), timeSeriesData);
        });
    }

    private String createEventMetric(String str, String str2, String str3, String str4) {
        return this.settings.getEvent().eventIsAllInOne() ? this.metricBuilder.createEventAllInOneMetric(str, str2, str3) : this.metricBuilder.createEventMetric(str, str2, str3, str4);
    }

    protected Object convertValue(Object obj, DataType dataType) {
        return dataType instanceof Converter ? ((Converter) dataType).convert(obj) : obj;
    }

    protected Map<String, Object> createEventData(ThingEventMessage thingEventMessage, ThingMetadata thingMetadata) {
        HashMap newHashMapWithExpectedSize;
        Object data = thingEventMessage.getData();
        ObjectType objectType = (DataType) thingMetadata.getEvent(thingEventMessage.getEvent()).map((v0) -> {
            return v0.getType();
        }).orElseGet(UnknownType::new);
        Object convertValue = convertValue(data, objectType);
        if (this.settings.getEvent().isUsingJsonString()) {
            newHashMapWithExpectedSize = Maps.newHashMapWithExpectedSize(16);
            newHashMapWithExpectedSize.put("value", convertValue instanceof String ? convertValue : ObjectMappers.toJsonString(convertValue));
        } else if (convertValue instanceof Map) {
            Map map = (Map) convertValue;
            newHashMapWithExpectedSize = Maps.newHashMapWithExpectedSize(map.size());
            newHashMapWithExpectedSize.putAll(map);
            if (this.settings.isStrict() && (objectType instanceof ObjectType)) {
                HashSet hashSet = new HashSet(newHashMapWithExpectedSize.keySet());
                Iterator it = objectType.getProperties().iterator();
                while (it.hasNext()) {
                    hashSet.remove(((PropertyMetadata) it.next()).getId());
                }
                newHashMapWithExpectedSize.getClass();
                hashSet.forEach((v1) -> {
                    r1.remove(v1);
                });
            }
        } else {
            newHashMapWithExpectedSize = Maps.newHashMapWithExpectedSize(16);
            newHashMapWithExpectedSize.put("value", convertValue);
        }
        if (this.settings.getEvent().eventIsAllInOne()) {
            newHashMapWithExpectedSize.put(ThingsDataConstants.COLUMN_EVENT_ID, thingEventMessage.getEvent());
        }
        newHashMapWithExpectedSize.put(ThingsDataConstants.COLUMN_ID, createEventDataId(thingEventMessage));
        newHashMapWithExpectedSize.put(this.metricBuilder.getThingIdProperty(), thingEventMessage.getThingId());
        newHashMapWithExpectedSize.put(ThingsDataConstants.COLUMN_CREATE_TIME, Long.valueOf(System.currentTimeMillis()));
        newHashMapWithExpectedSize.put(ThingsDataConstants.COLUMN_TIMESTAMP, Long.valueOf(thingEventMessage.getTimestamp()));
        return newHashMapWithExpectedSize;
    }

    protected String createEventDataId(ThingMessage thingMessage) {
        return DigestUtils.md5Hex(StringBuilderUtils.buildString(thingMessage, (thingMessage2, sb) -> {
            sb.append(thingMessage2.getThingId()).append('-').append(thingMessage2.getTimestamp());
        }));
    }

    private Mono<Tuple2<String, TimeSeriesData>> createDeviceMessageLog(String str, ThingMessage thingMessage) {
        return Mono.just(Tuples.of(this.metricBuilder.createLogMetric(thingMessage.getThingType(), str, thingMessage.getThingId()), TimeSeriesData.of(thingMessage.getTimestamp(), createLogData(thingMessage))));
    }

    protected abstract Flux<Tuple2<String, TimeSeriesData>> convertProperties(String str, ThingMessage thingMessage, Map<String, Object> map, Map<String, Long> map2);

    protected abstract Mono<Void> doSave(String str, TimeSeriesData timeSeriesData);

    protected abstract Mono<Void> doSave(String str, Flux<TimeSeriesData> flux);

    @Override // org.jetlinks.community.things.data.operations.SaveOperations
    public Flux<Feature> getFeatures() {
        return this.settings.getEvent().eventIsAllInOne() ? Flux.empty() : Flux.just(new Feature[]{MetadataFeature.eventNotInsertable, MetadataFeature.eventNotModifiable});
    }

    public AbstractSaveOperations(ThingsRegistry thingsRegistry, MetricBuilder metricBuilder, DataSettings dataSettings) {
        this.registry = thingsRegistry;
        this.metricBuilder = metricBuilder;
        this.settings = dataSettings;
    }
}
