package org.jetlinks.community.things.data;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.hswebframework.web.exception.I18nSupportException;
import org.jetlinks.community.things.ThingsDataRepository;
import org.jetlinks.community.things.data.ThingsDataRepositoryStrategy;
import org.jetlinks.community.things.data.operations.DataSettings;
import org.jetlinks.community.things.data.operations.MetricBuilder;
import org.jetlinks.community.things.data.operations.SaveOperations;
import org.jetlinks.community.things.data.operations.TemplateOperations;
import org.jetlinks.community.things.data.operations.ThingOperations;
import org.jetlinks.core.message.ThingMessage;
import org.jetlinks.core.things.ThingsRegistry;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

/* loaded from: input_file:org/jetlinks/community/things/data/DefaultThingsDataRepository.class */
/**
 * Default {@link ThingsDataRepository} implementation.
 *
 * <p>Every operation is delegated to a {@link ThingsDataRepositoryStrategy} ("policy").
 * The policy id is read from the {@link ThingsDataConstants#storePolicyConfigKey} config of
 * the thing (or of the template); when that config is absent the default policy id is used.
 * Each delegated call also receives an
 * {@link ThingsDataRepositoryStrategy.OperationsContext}: the one customized for the thing
 * type if there is one, otherwise the default context.
 *
 * <p>The instance is its own {@link SaveOperations} (see {@link #opsForSave()}) and its own
 * {@link ThingsDataContext}.
 */
public class DefaultThingsDataRepository implements ThingsDataRepository, ThingsDataContext, SaveOperations {
    private final ThingsRegistry registry;

    /** Registered strategies, keyed by {@link ThingsDataRepositoryStrategy#getId()}. */
    private final Map<String, ThingsDataRepositoryStrategy> policies = new ConcurrentHashMap<>();

    /** Per-thing-type operation contexts created by the {@code custom*} methods. */
    private final Map<String, ThingsDataRepositoryStrategy.OperationsContext> contexts = new ConcurrentHashMap<>();

    // The two defaults are written by the ThingsDataContext setters and read inside reactive
    // lambdas that may run on other threads, so they are volatile for visibility.
    private volatile String defaultPolicy = "default-row";
    private volatile ThingsDataRepositoryStrategy.OperationsContext defaultContext =
        new ThingsDataRepositoryStrategy.OperationsContext(MetricBuilder.DEFAULT, new DataSettings());

    /**
     * @param thingsRegistry registry used to look up things and templates
     */
    public DefaultThingsDataRepository(ThingsRegistry thingsRegistry) {
        this.registry = thingsRegistry;
    }

    /**
     * Looks up a registered strategy.
     *
     * @param policyId strategy id
     * @return the strategy registered under {@code policyId}
     * @throws I18nSupportException if no strategy has been registered under that id
     */
    private ThingsDataRepositoryStrategy getPolicyNow(String policyId) {
        ThingsDataRepositoryStrategy strategy = this.policies.get(policyId);
        if (strategy == null) {
            throw new I18nSupportException("error.thing_data_policy_unsupported", new Object[]{policyId});
        }
        return strategy;
    }

    /** Returns the context customized for {@code thingType}, or the default context. */
    private ThingsDataRepositoryStrategy.OperationsContext contextFor(String thingType) {
        return this.contexts.getOrDefault(thingType, this.defaultContext);
    }

    /**
     * Resolves the template id and the storage strategy of a thing.
     *
     * @return a tuple of (template id, strategy); empty when the registry lookup or the
     *         template lookup yields nothing
     */
    private Mono<Tuple2<String, ThingsDataRepositoryStrategy>> getPolicyByThing(String thingType, String thingId) {
        return this.registry
            .getThing(thingType, thingId)
            .flatMap(thing -> Mono.zip(
                thing.getTemplate().map(template -> template.getId()),
                thing.getConfig(ThingsDataConstants.storePolicyConfigKey)
                     .defaultIfEmpty(this.defaultPolicy)
                     .map(this::getPolicyNow)));
    }

    /**
     * Resolves the storage strategy configured on a template.
     *
     * @return the strategy; empty when the registry has no such template
     */
    private Mono<ThingsDataRepositoryStrategy> getPolicyByTemplate(String thingType, String templateId) {
        return this.registry
            .getTemplate(thingType, templateId)
            .flatMap(template -> template
                .getConfig(ThingsDataConstants.storePolicyConfigKey)
                .defaultIfEmpty(this.defaultPolicy))
            .map(this::getPolicyNow);
    }

    /**
     * @return this instance, which routes each saved message to the strategy of its thing
     */
    @Override
    public SaveOperations opsForSave() {
        return this;
    }

    /**
     * @param str  thing type
     * @param str2 thing id
     * @return operations bound to the thing, produced by the thing's storage strategy
     */
    @Override
    public Mono<ThingOperations> opsForThing(String str, String str2) {
        return getPolicyByThing(str, str2)
            .map(resolved -> resolved.getT2().opsForThing(str, resolved.getT1(), str2, contextFor(str)));
    }

    /**
     * @param str  thing type
     * @param str2 template id
     * @return operations bound to the template, produced by the template's storage strategy
     */
    @Override
    public Mono<TemplateOperations> opsForTemplate(String str, String str2) {
        return getPolicyByTemplate(str, str2)
            .map(strategy -> strategy.opsForTemplate(str, str2, contextFor(str)));
    }

    /**
     * Sets the metric builder used for a thing type. When the type has no context yet, one is
     * created with the current default settings.
     *
     * @param str           thing type
     * @param metricBuilder metric builder to use for that type
     */
    @Override
    public void customMetricBuilder(String str, MetricBuilder metricBuilder) {
        this.contexts.compute(str, (type, existing) -> existing == null
            ? new ThingsDataRepositoryStrategy.OperationsContext(metricBuilder, this.defaultContext.getSettings())
            : existing.metricBuilder(metricBuilder));
    }

    /**
     * Sets the data settings used for a thing type. When the type has no context yet, one is
     * created with the default metric builder.
     *
     * @param str          thing type
     * @param dataSettings settings to use for that type
     */
    @Override
    public void customSettings(String str, DataSettings dataSettings) {
        this.contexts.compute(str, (type, existing) -> existing == null
            ? new ThingsDataRepositoryStrategy.OperationsContext(this.defaultContext.getMetricBuilder(), dataSettings)
            : existing.settings(dataSettings));
    }

    /**
     * @param str id of the strategy used when a thing or template configures none
     */
    @Override
    public void setDefaultPolicy(String str) {
        this.defaultPolicy = str;
    }

    /**
     * Replaces the default settings. Contexts that still reference the previous default
     * settings instance (identity comparison) are switched to the new settings as well;
     * contexts with their own settings are left alone.
     *
     * @param dataSettings new default settings
     */
    @Override
    public void setDefaultSettings(DataSettings dataSettings) {
        DataSettings previousSettings = this.defaultContext.getSettings();
        // Publish the new default first so that contexts created concurrently by
        // customMetricBuilder do not pick up the outdated settings.
        this.defaultContext = new ThingsDataRepositoryStrategy.OperationsContext(MetricBuilder.DEFAULT, dataSettings);
        // replaceAll swaps each entry atomically, so a concurrent compute() is not overwritten.
        this.contexts.replaceAll((type, context) -> context.getSettings() == previousSettings
            ? new ThingsDataRepositoryStrategy.OperationsContext(context.getMetricBuilder(), dataSettings)
            : context);
    }

    /**
     * Registers a strategy, both in {@link ThingsDataRepositoryStrategies} and in this
     * repository under {@link ThingsDataRepositoryStrategy#getId()}.
     *
     * @param thingsDataRepositoryStrategy strategy to register
     */
    @Override
    public void addPolicy(ThingsDataRepositoryStrategy thingsDataRepositoryStrategy) {
        ThingsDataRepositoryStrategies.register(thingsDataRepositoryStrategy);
        this.policies.put(thingsDataRepositoryStrategy.getId(), thingsDataRepositoryStrategy);
    }

    /**
     * Saves one message through the strategy of the thing it belongs to. Completes without
     * saving when the thing cannot be resolved.
     */
    @Override
    public Mono<Void> save(ThingMessage thingMessage) {
        return doSave(thingMessage.getThingType(), thingMessage.getThingId(), ops -> ops.save(thingMessage));
    }

    /**
     * Saves a collection of messages; equivalent to {@link #save(Publisher)} over its elements.
     */
    @Override
    public Mono<Void> save(Collection<? extends ThingMessage> collection) {
        Flux<? extends ThingMessage> messages = Flux.fromIterable(collection);
        return save(messages);
    }

    /**
     * Saves a stream of messages. Messages are grouped by (thing type, thing id) and each
     * group is handed to the strategy of that thing.
     */
    @Override
    public Mono<Void> save(Publisher<? extends ThingMessage> publisher) {
        return Flux.from(publisher)
            .groupBy(message -> Tuples.of(message.getThingType(), message.getThingId()))
            // Groups only complete when the source completes, so every group has to be
            // subscribed concurrently; a bounded flatMap concurrency can stall the pipeline
            // once the number of distinct things exceeds it.
            .flatMap(group -> saveGroup(group.key(), group), Integer.MAX_VALUE)
            .then();
    }

    /**
     * Saves the messages of one thing.
     *
     * @param thingKey (thing type, thing id)
     * @param messages messages of that thing; may be subscribed only once
     */
    private Mono<Void> saveGroup(Tuple2<String, String> thingKey, Flux<? extends ThingMessage> messages) {
        String thingType = thingKey.getT1();
        return getPolicyByThing(thingType, thingKey.getT2())
            .map(resolved -> resolved.getT2().opsForSave(contextFor(thingType)).save(messages))
            // When the thing cannot be resolved the group must still be consumed, otherwise
            // its buffered messages keep groupBy from requesting more from the source.
            // The choice is made before subscribing because the group allows one subscriber.
            .defaultIfEmpty(messages.then())
            .flatMap(saving -> saving);
    }

    /**
     * Resolves the strategy of a thing and applies {@code function} to its save operations.
     * Completes empty when the thing cannot be resolved.
     */
    private Mono<Void> doSave(String thingType, String thingId, Function<SaveOperations, Mono<Void>> function) {
        return getPolicyByThing(thingType, thingId)
            .flatMap(resolved -> function.apply(resolved.getT2().opsForSave(contextFor(thingType))));
    }
}
