/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.community.rule.engine.alarm;

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.apache.commons.collections4.CollectionUtils;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
import org.hswebframework.ezorm.rdb.mapping.ReactiveUpdate;
import org.hswebframework.web.api.crud.entity.GenericEntity;
import org.hswebframework.web.bean.FastBeanCopier;
import org.hswebframework.web.crud.events.EntityCreatedEvent;
import org.hswebframework.web.crud.events.EntityDeletedEvent;
import org.hswebframework.web.crud.events.EntityModifyEvent;
import org.hswebframework.web.crud.events.EntitySavedEvent;
import org.hswebframework.web.i18n.LocaleUtils;
import org.hswebframework.web.id.IDGenerator;
import org.jetlinks.community.gateway.annotation.Subscribe;
import org.jetlinks.community.rule.engine.RuleEngineConstants;
import org.jetlinks.community.rule.engine.alarm.AlarmData;
import org.jetlinks.community.rule.engine.alarm.AlarmHandleInfo;
import org.jetlinks.community.rule.engine.alarm.AlarmRuleHandler;
import org.jetlinks.community.rule.engine.alarm.AlarmTarget;
import org.jetlinks.community.rule.engine.alarm.AlarmTargetInfo;
import org.jetlinks.community.rule.engine.entity.AlarmConfigEntity;
import org.jetlinks.community.rule.engine.entity.AlarmHandleHistoryEntity;
import org.jetlinks.community.rule.engine.entity.AlarmHistoryInfo;
import org.jetlinks.community.rule.engine.entity.AlarmRecordEntity;
import org.jetlinks.community.rule.engine.entity.AlarmRuleBindEntity;
import org.jetlinks.community.rule.engine.enums.AlarmHandleType;
import org.jetlinks.community.rule.engine.enums.AlarmRecordState;
import org.jetlinks.community.rule.engine.enums.AlarmState;
import org.jetlinks.community.rule.engine.service.AlarmConfigService;
import org.jetlinks.community.rule.engine.service.AlarmHistoryService;
import org.jetlinks.community.rule.engine.service.AlarmRecordService;
import org.jetlinks.community.topic.Topics;
import org.jetlinks.community.utils.ObjectMappers;
import org.jetlinks.core.config.ConfigStorage;
import org.jetlinks.core.config.ConfigStorageManager;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.utils.CompositeSet;
import org.jetlinks.core.utils.Reactors;
import org.jetlinks.reactor.ql.utils.CastUtils;
import org.jetlinks.rule.engine.api.RuleData;
import org.jetlinks.rule.engine.api.RuleDataHelper;
import org.jetlinks.rule.engine.api.task.ExecutionContext;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

@Component
public class DefaultAlarmRuleHandler
implements AlarmRuleHandler,
CommandLineRunner {
    private static final Logger log = LoggerFactory.getLogger(DefaultAlarmRuleHandler.class);
    private static final Set<String> configInfoKey = new HashSet<String>(Arrays.asList("alarmConfigId", "alarmName", "level", "targetType", "state", "ownerId"));
    private final Map<Tuple2<String, Integer>, Set<String>> ruleAlarmBinds = new ConcurrentHashMap<Tuple2<String, Integer>, Set<String>>();
    private final AlarmRecordService alarmRecordService;
    private final AlarmHistoryService historyService;
    private final ConfigStorageManager storageManager;
    private final ApplicationEventPublisher eventPublisher;
    private final EventBus eventBus;
    public final ReactiveRepository<AlarmRuleBindEntity, String> bindRepository;
    private final ReactiveRepository<AlarmHandleHistoryEntity, String> handleHistoryRepository;
    public final AlarmConfigService alarmConfigService;
    static final String TOPIC_ALARM_CONFIG_SAVE = "/_sys/device-alarm-config/save";
    static final String TOPIC_ALARM_CONFIG_DELETE = "/_sys/device-alarm-rule/del";
    static final String TOPIC_ALARM_RULE_BIND = "/_sys/device-alarm-rule/bind";
    static final String TOPIC_ALARM_RULE_UNBIND = "/_sys/device-alarm-rule/unbind";

    @Override
    public Flux<AlarmRuleHandler.Result> triggered(ExecutionContext context, RuleData data) {
        return this.parseAlarmInfo(context, data).flatMap(this::triggerAlarm);
    }

    @Override
    public Flux<AlarmRuleHandler.Result> relieved(ExecutionContext context, RuleData data) {
        return this.parseAlarmInfo(context, data).flatMap(this::relieveAlarm);
    }

    private Flux<AlarmInfo> parseAlarmInfo(ExecutionContext context, RuleData data) {
        if (this.ruleAlarmBinds.isEmpty()) {
            return Flux.empty();
        }
        int branchIndex = context.getJob().getConfiguration("_branchIndex").map(idx -> CastUtils.castNumber((Object)idx).intValue()).orElse(-1);
        Set<String> alarmId = this.getBoundAlarmId(context.getInstanceId(), branchIndex);
        if (CollectionUtils.isEmpty(alarmId)) {
            return Flux.empty();
        }
        Map contextMap = RuleDataHelper.toContextMap((RuleData)data);
        return Flux.fromIterable(alarmId).flatMap(this::getAlarmStorage).flatMap(store -> this.parseAlarm(context, (ConfigStorage)store, contextMap));
    }

    private Set<String> getBoundAlarmId(String ruleId, int branchIndex) {
        Set<String> specific = this.ruleAlarmBinds.get(Tuples.of((Object)ruleId, (Object)branchIndex));
        Set<String> any = this.ruleAlarmBinds.get(Tuples.of((Object)ruleId, (Object)-1));
        if (CollectionUtils.isEmpty(specific) && CollectionUtils.isEmpty(any)) {
            return Collections.emptySet();
        }
        if (CollectionUtils.isNotEmpty(specific) && CollectionUtils.isEmpty(any)) {
            return specific;
        }
        if (CollectionUtils.isEmpty(specific) && CollectionUtils.isNotEmpty(any)) {
            return any;
        }
        return new CompositeSet(specific, any);
    }

    private AlarmRecordEntity ofRecord(AlarmRuleHandler.Result result) {
        AlarmRecordEntity entity = new AlarmRecordEntity();
        entity.setAlarmConfigId(result.getAlarmConfigId());
        entity.setState(AlarmRecordState.warning);
        entity.setAlarmTime(System.currentTimeMillis());
        entity.setLevel(result.getLevel());
        entity.setTargetType(result.getTargetType());
        entity.setTargetName(result.getTargetName());
        entity.setTargetId(result.getTargetId());
        entity.setSourceType(result.getSourceType());
        entity.setSourceName(result.getSourceName());
        entity.setSourceId(result.getSourceId());
        entity.setAlarmName(result.getAlarmName());
        entity.generateId();
        return entity;
    }

    private Flux<AlarmInfo> parseAlarm(ExecutionContext context, ConfigStorage alarm, Map<String, Object> contextMap) {
        return this.getAlarmInfo(alarm).flatMapMany(result -> {
            String ruleName = RuleEngineConstants.getRuleName((ExecutionContext)context).orElse(result.getAlarmName());
            AlarmData alarmData = AlarmData.of(result.getAlarmConfigId(), result.getAlarmName(), context.getInstanceId(), ruleName, contextMap);
            result.setData(alarmData);
            return AlarmTarget.of(result.getTargetType()).convert(alarmData).map(result::copyWith);
        }).flatMap(info -> this.getRecordCache(info.createRecordId()).map(info::with).defaultIfEmpty(info));
    }

    private Mono<AlarmInfo> relieveAlarm(AlarmInfo result) {
        if (result.isCached() && !result.isAlarming()) {
            return Mono.empty();
        }
        AlarmRecordEntity record = this.ofRecord(result);
        return Mono.zip(this.alarmRecordService.changeRecordState(AlarmRecordState.normal, (String)record.getId()), this.updateRecordCache((String)record.getId(), RecordCache::withNormal), (total, ignore) -> total).flatMap(total -> {
            if (total > 0) {
                result.setAlarming(true);
                return this.saveAlarmHandleHistory(record);
            }
            return Mono.empty();
        }).thenReturn((Object)result);
    }

    private Mono<Void> saveAlarmHandleHistory(AlarmRecordEntity record) {
        AlarmHandleInfo alarmHandleInfo = new AlarmHandleInfo();
        alarmHandleInfo.setHandleTime(System.currentTimeMillis());
        alarmHandleInfo.setAlarmRecordId((String)record.getId());
        alarmHandleInfo.setAlarmConfigId(record.getAlarmConfigId());
        alarmHandleInfo.setAlarmTime(record.getAlarmTime());
        alarmHandleInfo.setState(AlarmRecordState.normal);
        alarmHandleInfo.setType(AlarmHandleType.system);
        alarmHandleInfo.setDescribe(LocaleUtils.resolveMessage((String)"message.scene_triggered_relieve_alarm", (String)"\u573a\u666f\u89e6\u53d1\u89e3\u9664\u544a\u8b66", (Object[])new Object[0]));
        return this.handleHistoryRepository.save((Object)AlarmHandleHistoryEntity.of(alarmHandleInfo)).then();
    }

    private Mono<AlarmInfo> triggerAlarm(AlarmInfo result) {
        AlarmRecordEntity record = this.ofRecord(result);
        return ((ReactiveUpdate)((ReactiveUpdate)((ReactiveUpdate)this.alarmRecordService.createUpdate().set((Object)record)).where(GenericEntity::getId, record.getId())).and(AlarmRecordEntity::getState, (Object)AlarmRecordState.warning)).execute().onErrorResume(err -> {
            log.error("trigger alarm error", err);
            return Reactors.ALWAYS_ZERO;
        }).flatMap(total -> {
            AlarmHistoryInfo historyInfo = this.createHistory(record, result);
            result.setAlarmTime(record.getAlarmTime());
            if (total == 0) {
                result.setFirstAlarm(true);
                result.setAlarming(false);
                return this.alarmRecordService.save((Object)record).then(this.historyService.save(historyInfo)).then(this.publishAlarmRecord(historyInfo, result)).then(this.publishEvent(historyInfo)).then(this.saveAlarmCache(result, record));
            }
            result.setFirstAlarm(false);
            result.setAlarming(true);
            return this.historyService.save(historyInfo).then(this.publishEvent(historyInfo)).then(this.saveAlarmCache(result, record));
        });
    }

    private Mono<Void> publishEvent(AlarmHistoryInfo historyInfo) {
        return Mono.fromRunnable(() -> this.eventPublisher.publishEvent((Object)historyInfo));
    }

    private AlarmHistoryInfo createHistory(AlarmRecordEntity record, AlarmInfo alarmInfo) {
        AlarmHistoryInfo info = new AlarmHistoryInfo();
        info.setId((String)IDGenerator.RANDOM.generate());
        info.setAlarmConfigId(record.getAlarmConfigId());
        info.setAlarmConfigName(record.getAlarmName());
        info.setAlarmRecordId((String)record.getId());
        info.setLevel(record.getLevel());
        info.setAlarmTime(record.getAlarmTime());
        info.setTargetName(record.getTargetName());
        info.setTargetId(record.getTargetId());
        info.setTargetType(record.getTargetType());
        info.setSourceType(record.getSourceType());
        info.setSourceName(record.getSourceName());
        info.setSourceId(record.getSourceId());
        info.setAlarmInfo(ObjectMappers.toJsonString(alarmInfo.getData().getOutput()));
        return info;
    }

    public Mono<Void> publishAlarmRecord(AlarmHistoryInfo historyInfo, AlarmInfo alarmInfo) {
        String topic = Topics.alarm((String)historyInfo.getTargetType(), (String)historyInfo.getTargetId(), (String)historyInfo.getAlarmConfigId());
        return this.eventBus.publish(topic, (Object)historyInfo).then();
    }

    private Mono<AlarmInfo> saveAlarmCache(AlarmInfo result, AlarmRecordEntity record) {
        return this.updateRecordCache((String)record.getId(), cache -> cache.with(result)).thenReturn((Object)result);
    }

    private Mono<AlarmInfo> getAlarmInfo(ConfigStorage alarm) {
        return alarm.getConfigs(configInfoKey).mapNotNull(values -> {
            if (values.getString("state", AlarmState.enabled.name()).equals(AlarmState.disabled.name())) {
                return null;
            }
            AlarmInfo result = (AlarmInfo)FastBeanCopier.copy((Object)values.getAllValues(), (Object)new AlarmInfo(), (String[])new String[0]);
            if (result.getAlarmConfigId() == null || result.getAlarmName() == null) {
                return null;
            }
            return result;
        });
    }

    private Mono<ConfigStorage> getAlarmStorage(String alarmId) {
        return this.storageManager.getStorage("alarm:" + alarmId);
    }

    @EventListener
    public void handleConfigEvent(EntitySavedEvent<AlarmConfigEntity> event) {
        event.async((Publisher)Flux.fromIterable((Iterable)event.getEntity()).flatMap(e -> this.eventBus.publish(TOPIC_ALARM_CONFIG_SAVE, (Object)e)));
    }

    @EventListener
    public void handleConfigEvent(EntityCreatedEvent<AlarmConfigEntity> event) {
        event.async((Publisher)Flux.fromIterable((Iterable)event.getEntity()).flatMap(e -> this.eventBus.publish(TOPIC_ALARM_CONFIG_SAVE, (Object)e)));
    }

    @EventListener
    public void handleConfigEvent(EntityModifyEvent<AlarmConfigEntity> event) {
        event.async((Publisher)Flux.fromIterable((Iterable)event.getAfter()).flatMap(e -> this.eventBus.publish(TOPIC_ALARM_CONFIG_SAVE, (Object)e)));
    }

    @EventListener
    public void handleConfigEvent(EntityDeletedEvent<AlarmConfigEntity> event) {
        event.async((Publisher)Flux.fromIterable((Iterable)event.getEntity()).flatMap(e -> this.eventBus.publish(TOPIC_ALARM_CONFIG_DELETE, (Object)e)));
    }

    @Subscribe(value={"/_sys/device-alarm-config/save"}, features={Subscription.Feature.local, Subscription.Feature.broker})
    public Mono<Void> handleAlarmConfig(AlarmConfigEntity entity) {
        return this.getAlarmStorage((String)entity.getId()).flatMap(store -> store.setConfigs(entity.toConfigMap())).then();
    }

    @Subscribe(value={"/_sys/device-alarm-rule/del"}, features={Subscription.Feature.local, Subscription.Feature.broker})
    public Mono<Void> removeAlarmConfig(AlarmConfigEntity entity) {
        return this.getAlarmStorage((String)entity.getId()).flatMap(ConfigStorage::clear).then();
    }

    @EventListener
    public void handleBindEvent(EntitySavedEvent<AlarmRuleBindEntity> event) {
        event.async((Publisher)Flux.fromIterable((Iterable)event.getEntity()).flatMap(e -> this.eventBus.publish(TOPIC_ALARM_RULE_BIND, (Object)e)));
    }

    @EventListener
    public void handleBindEvent(EntityCreatedEvent<AlarmRuleBindEntity> event) {
        event.async((Publisher)Flux.fromIterable((Iterable)event.getEntity()).flatMap(e -> this.eventBus.publish(TOPIC_ALARM_RULE_BIND, (Object)e)));
    }

    @EventListener
    public void handleBindEvent(EntityDeletedEvent<AlarmRuleBindEntity> event) {
        event.async((Publisher)Flux.fromIterable((Iterable)event.getEntity()).flatMap(e -> this.eventBus.publish(TOPIC_ALARM_RULE_UNBIND, (Object)e)));
    }

    @Subscribe(value={"/_sys/device-alarm-rule/unbind"}, features={Subscription.Feature.local, Subscription.Feature.broker})
    public void handleUnBind(AlarmRuleBindEntity entity) {
        Integer index = entity.getBranchIndex();
        if (index == null) {
            index = -1;
        }
        this.ruleAlarmBinds.compute((Tuple2<String, Integer>)Tuples.of((Object)entity.getRuleId(), (Object)index), (key, value) -> {
            if (value == null) {
                return null;
            }
            value.remove(entity.getAlarmId());
            if (value.isEmpty()) {
                return null;
            }
            return value;
        });
    }

    @Subscribe(value={"/_sys/device-alarm-rule/bind"}, features={Subscription.Feature.local, Subscription.Feature.broker})
    public void handleBind(AlarmRuleBindEntity entity) {
        Integer index = entity.getBranchIndex();
        if (index == null) {
            index = -1;
        }
        this.ruleAlarmBinds.computeIfAbsent((Tuple2<String, Integer>)Tuples.of((Object)entity.getRuleId(), (Object)index), ignore -> ConcurrentHashMap.newKeySet()).add(entity.getAlarmId());
    }

    public void run(String ... args) throws Exception {
        this.bindRepository.createQuery().fetch().doOnNext(this::handleBind).thenMany((Publisher)this.alarmConfigService.createQuery().fetch().doOnNext(this::handleAlarmConfig)).subscribe();
    }

    private Mono<RecordCache> getRecordCache(String recordId) {
        return this.storageManager.getStorage("alarm-records").flatMap(store -> store.getConfig(recordId).map(val -> (RecordCache)val.as(RecordCache.class)));
    }

    private Mono<RecordCache> updateRecordCache(String recordId, Function<RecordCache, RecordCache> handler) {
        return this.storageManager.getStorage("alarm-records").flatMap(store -> store.getConfig(recordId).map(val -> (RecordCache)val.as(RecordCache.class)).switchIfEmpty(Mono.fromSupplier(RecordCache::new)).mapNotNull(handler).flatMap(cache -> store.setConfig(recordId, cache).thenReturn(cache)));
    }

    public DefaultAlarmRuleHandler(AlarmRecordService alarmRecordService, AlarmHistoryService historyService, ConfigStorageManager storageManager, ApplicationEventPublisher eventPublisher, EventBus eventBus, ReactiveRepository<AlarmRuleBindEntity, String> bindRepository, ReactiveRepository<AlarmHandleHistoryEntity, String> handleHistoryRepository, AlarmConfigService alarmConfigService) {
        this.alarmRecordService = alarmRecordService;
        this.historyService = historyService;
        this.storageManager = storageManager;
        this.eventPublisher = eventPublisher;
        this.eventBus = eventBus;
        this.bindRepository = bindRepository;
        this.handleHistoryRepository = handleHistoryRepository;
        this.alarmConfigService = alarmConfigService;
    }

    public static class RecordCache
    implements Externalizable {
        static final byte stateNormal = 1;
        static final byte stateAlarming = 2;
        byte state;
        long alarmTime;
        long lastAlarmTime;

        public boolean isAlarming() {
            return this.state == 2;
        }

        public RecordCache withNormal() {
            this.state = 1;
            return this;
        }

        public RecordCache withAlarming() {
            this.state = (byte)2;
            return this;
        }

        public RecordCache with(AlarmRuleHandler.Result record) {
            this.lastAlarmTime = this.alarmTime == 0L ? record.getAlarmTime() : this.alarmTime;
            this.alarmTime = record.getAlarmTime();
            this.state = record.isAlarming() || record.isFirstAlarm() ? (byte)2 : (byte)1;
            return this;
        }

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeByte(this.state);
            out.writeLong(this.alarmTime);
            out.writeLong(this.lastAlarmTime);
        }

        @Override
        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            this.state = in.readByte();
            this.alarmTime = in.readLong();
            this.lastAlarmTime = in.readLong();
        }
    }

    public static class AlarmInfo
    extends AlarmRuleHandler.Result {
        private String ownerId;
        private AlarmData data;
        private boolean cached;

        @Override
        public AlarmInfo copyWith(AlarmTargetInfo targetInfo) {
            AlarmInfo result = (AlarmInfo)FastBeanCopier.copy((Object)this, (Object)new AlarmInfo(), (String[])new String[0]);
            result.setTargetType(targetInfo.getTargetType());
            result.setTargetId(targetInfo.getTargetId());
            result.setTargetName(targetInfo.getTargetName());
            result.setSourceId(targetInfo.getSourceId());
            result.setSourceType(targetInfo.getSourceType());
            result.setSourceName(targetInfo.getSourceName());
            return result;
        }

        public AlarmInfo with(RecordCache cache) {
            this.setAlarmTime(cache.alarmTime);
            this.setLastAlarmTime(cache.lastAlarmTime);
            this.setAlarming(cache.isAlarming());
            this.cached = true;
            return this;
        }

        public String createRecordId() {
            return AlarmRecordEntity.generateId(this.getTargetId(), this.getTargetType(), this.getAlarmConfigId());
        }

        public String getOwnerId() {
            return this.ownerId;
        }

        public AlarmData getData() {
            return this.data;
        }

        public boolean isCached() {
            return this.cached;
        }

        public void setOwnerId(String ownerId) {
            this.ownerId = ownerId;
        }

        public void setData(AlarmData data) {
            this.data = data;
        }

        public void setCached(boolean cached) {
            this.cached = cached;
        }
    }
}

