/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.community.things.data.operations;

import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.apache.commons.collections.MapUtils;
import org.hswebframework.web.id.IDGenerator;
import org.hswebframework.web.utils.DigestUtils;
import org.jetlinks.community.things.ThingConstants;
import org.jetlinks.community.things.data.ThingLogType;
import org.jetlinks.community.things.data.operations.DataSettings;
import org.jetlinks.community.things.data.operations.MetricBuilder;
import org.jetlinks.community.things.data.operations.SaveOperations;
import org.jetlinks.community.timeseries.TimeSeriesData;
import org.jetlinks.community.utils.ObjectMappers;
import org.jetlinks.core.message.DeviceLogMessage;
import org.jetlinks.core.message.Headers;
import org.jetlinks.core.message.Message;
import org.jetlinks.core.message.ThingMessage;
import org.jetlinks.core.message.event.ThingEventMessage;
import org.jetlinks.core.message.property.PropertyMessage;
import org.jetlinks.core.message.property.ThingReportPropertyMessage;
import org.jetlinks.core.metadata.Converter;
import org.jetlinks.core.metadata.DataType;
import org.jetlinks.core.metadata.EventMetadata;
import org.jetlinks.core.metadata.Feature;
import org.jetlinks.core.metadata.MetadataFeature;
import org.jetlinks.core.metadata.PropertyMetadata;
import org.jetlinks.core.metadata.types.ObjectType;
import org.jetlinks.core.metadata.types.UnknownType;
import org.jetlinks.core.things.Thing;
import org.jetlinks.core.things.ThingMetadata;
import org.jetlinks.core.things.ThingTemplate;
import org.jetlinks.core.things.ThingsRegistry;
import org.jetlinks.core.utils.StringBuilderUtils;
import org.jetlinks.core.utils.TimestampUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

public abstract class AbstractSaveOperations
implements SaveOperations {
    private static final Logger log = LoggerFactory.getLogger(AbstractSaveOperations.class);
    protected final ThingsRegistry registry;
    protected final MetricBuilder metricBuilder;
    protected final DataSettings settings;

    @Override
    public final Mono<Void> save(ThingMessage thingMessage) {
        return this.convertMessageToTimeSeriesData(thingMessage).flatMap(tp2 -> this.doSave((String)tp2.getT1(), (TimeSeriesData)tp2.getT2())).then();
    }

    @Override
    public final Mono<Void> save(Collection<? extends ThingMessage> thingMessage) {
        return this.save((Publisher<? extends ThingMessage>)Flux.fromIterable(thingMessage));
    }

    @Override
    public final Mono<Void> save(Publisher<? extends ThingMessage> thingMessage) {
        return Flux.from(thingMessage).flatMap(this::convertMessageToTimeSeriesData).groupBy(Tuple2::getT1).flatMap(group -> this.doSave((String)group.key(), (Flux<TimeSeriesData>)group.map(Tuple2::getT2))).then();
    }

    protected Map<String, Object> createLogData(ThingMessage message) {
        HashMap data = Maps.newHashMapWithExpectedSize((int)8);
        data.put("id", IDGenerator.SNOW_FLAKE_STRING.generate());
        data.put(this.metricBuilder.getThingIdProperty(), message.getThingId());
        data.put("timestamp", message.getTimestamp());
        data.put("createTime", System.currentTimeMillis());
        data.put("messageId", message.getMessageId());
        data.put("type", ThingLogType.of((Message)message).name());
        String log = message instanceof DeviceLogMessage ? ((DeviceLogMessage)message).getLog() : ObjectMappers.toJsonString((Object)message.toJson());
        data.put("content", log);
        return data;
    }

    protected String getTemplateIdFromMessage(ThingMessage message) {
        String templateId = message.getHeader(Headers.productId).orElse(null);
        if (templateId == null) {
            templateId = message.getHeader(ThingConstants.templateId).orElse(null);
        }
        return templateId == null ? "null" : templateId;
    }

    protected Flux<Tuple2<String, TimeSeriesData>> convertMessageToTimeSeriesData(ThingMessage message) {
        boolean ignoreStorage = (Boolean)message.getHeaderOrDefault(Headers.ignoreStorage);
        boolean ignoreLog = (Boolean)message.getHeaderOrDefault(Headers.ignoreLog);
        if (ignoreStorage && ignoreLog) {
            return Flux.empty();
        }
        String templateId = this.getTemplateIdFromMessage(message);
        ArrayList<Object> all = new ArrayList<Object>(2);
        if (!ignoreStorage) {
            PropertyMessage propertyMessage;
            Map properties;
            if (message instanceof ThingEventMessage) {
                all.add(this.convertEventMessageToTimeSeriesData(templateId, (ThingEventMessage)message));
            } else if (message instanceof PropertyMessage && (!this.settings.getProperty().isOnlySaveReport() || message instanceof ThingReportPropertyMessage) && MapUtils.isNotEmpty((Map)(properties = (propertyMessage = (PropertyMessage)message).getProperties()))) {
                Map<String, Long> propertiesTimes = propertyMessage.getPropertySourceTimes();
                if (propertiesTimes == null) {
                    propertiesTimes = Collections.emptyMap();
                }
                all.add(this.convertProperties(templateId, message, properties, propertiesTimes));
            }
        }
        if (this.settings.getLogFilter().match(message.getMessageType()) && !ignoreLog) {
            all.add(this.createDeviceMessageLog(templateId, message));
        }
        return Flux.merge(all);
    }

    private Mono<Tuple2<String, TimeSeriesData>> convertEventMessageToTimeSeriesData(String templateId, ThingEventMessage message) {
        return this.registry.getTemplate(message.getThingType(), templateId).flatMap(thing -> {
            if (this.settings.getEvent().eventIsAllInOne()) {
                return thing.getMetadata();
            }
            return this.registry.getThing(message.getThingType(), message.getThingId()).flatMap(Thing::getTemplate).flatMap(ThingTemplate::getMetadata);
        }).handle((metadata, sink) -> {
            if (this.settings.getEvent().shouldIgnoreUndefined() && metadata.getEventOrNull(message.getEvent()) == null) {
                log.warn("{}[{}] event [{}] metadata undefined", new Object[]{message.getThingType(), message.getThingId(), message.getEvent()});
                return;
            }
            Map<String, Object> data = this.createEventData(message, (ThingMetadata)metadata);
            sink.next((Object)TimeSeriesData.of((long)TimestampUtils.toMillis((long)message.getTimestamp()), data));
        }).map(data -> Tuples.of((Object)this.createEventMetric(message.getThingType(), templateId, message.getThingId(), message.getEvent()), (Object)data));
    }

    private String createEventMetric(String thingType, String thingTemplateId, String thingId, String eventId) {
        return this.settings.getEvent().eventIsAllInOne() ? this.metricBuilder.createEventAllInOneMetric(thingType, thingTemplateId, thingId) : this.metricBuilder.createEventMetric(thingType, thingTemplateId, thingId, eventId);
    }

    protected Object convertValue(Object value, DataType type) {
        if (type instanceof Converter) {
            return ((Converter)type).convert(value);
        }
        return value;
    }

    protected Map<String, Object> createEventData(ThingEventMessage message, ThingMetadata metadata) {
        HashMap data;
        Object value = message.getData();
        DataType dataType = metadata.getEvent(message.getEvent()).map(EventMetadata::getType).orElseGet(UnknownType::new);
        Object tempValue = this.convertValue(value, dataType);
        if (this.settings.getEvent().isUsingJsonString()) {
            data = Maps.newHashMapWithExpectedSize((int)16);
            data.put("value", tempValue instanceof String ? tempValue : ObjectMappers.toJsonString((Object)tempValue));
        } else if (tempValue instanceof Map) {
            Map mapValue = (Map)tempValue;
            int size = mapValue.size();
            data = Maps.newHashMapWithExpectedSize((int)size);
            data.putAll(mapValue);
            if (this.settings.isStrict() && dataType instanceof ObjectType) {
                HashSet nonexistent = new HashSet(data.keySet());
                ObjectType objType = (ObjectType)dataType;
                for (PropertyMetadata property : objType.getProperties()) {
                    nonexistent.remove(property.getId());
                }
                nonexistent.forEach(data::remove);
            }
        } else {
            data = Maps.newHashMapWithExpectedSize((int)16);
            data.put("value", tempValue);
        }
        if (this.settings.getEvent().eventIsAllInOne()) {
            data.put("event", message.getEvent());
        }
        data.put("id", this.createEventDataId((ThingMessage)message));
        data.put(this.metricBuilder.getThingIdProperty(), message.getThingId());
        data.put("createTime", System.currentTimeMillis());
        data.put("timestamp", message.getTimestamp());
        return data;
    }

    protected String createEventDataId(ThingMessage message) {
        return DigestUtils.md5Hex((String)StringBuilderUtils.buildString((Object)message, (msg, builder) -> builder.append(msg.getThingId()).append('-').append(msg.getTimestamp())));
    }

    private Mono<Tuple2<String, TimeSeriesData>> createDeviceMessageLog(String templateId, ThingMessage message) {
        return Mono.just((Object)Tuples.of((Object)this.metricBuilder.createLogMetric(message.getThingType(), templateId, message.getThingId()), (Object)TimeSeriesData.of((long)message.getTimestamp(), this.createLogData(message))));
    }

    protected abstract Flux<Tuple2<String, TimeSeriesData>> convertProperties(String var1, ThingMessage var2, Map<String, Object> var3, Map<String, Long> var4);

    protected abstract Mono<Void> doSave(String var1, TimeSeriesData var2);

    protected abstract Mono<Void> doSave(String var1, Flux<TimeSeriesData> var2);

    @Override
    public Flux<Feature> getFeatures() {
        if (this.settings.getEvent().eventIsAllInOne()) {
            return Flux.empty();
        }
        return Flux.just((Object[])new Feature[]{MetadataFeature.eventNotInsertable, MetadataFeature.eventNotModifiable});
    }

    public AbstractSaveOperations(ThingsRegistry registry, MetricBuilder metricBuilder, DataSettings settings) {
        this.registry = registry;
        this.metricBuilder = metricBuilder;
        this.settings = settings;
    }
}

