package org.jetlinks.community.notify;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nonnull;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

@Component
/* loaded from: input_file:org/jetlinks/community/notify/DefaultNotifierManager.class */
public class DefaultNotifierManager implements NotifierManager, BeanPostProcessor, CommandLineRunner {
    private static final Logger log = LoggerFactory.getLogger(DefaultNotifierManager.class);
    private final Map<String, Map<String, NotifierProvider>> providers = new ConcurrentHashMap();
    private Map<String, Notifier> notifiers = new ConcurrentHashMap();
    private NotifyConfigManager configManager;
    private EventBus eventBus;

    public DefaultNotifierManager(EventBus eventBus, NotifyConfigManager notifyConfigManager) {
        this.configManager = notifyConfigManager;
        this.eventBus = eventBus;
    }

    protected Mono<NotifierProperties> getProperties(NotifyType notifyType, String str) {
        return this.configManager.getNotifyConfig(notifyType, str);
    }

    @Override // org.jetlinks.community.notify.NotifierManager
    public Mono<Void> reload(String str) {
        return doReload(str).then(this.eventBus.publish("/_sys/notifier/reload", str)).then();
    }

    private Mono<String> doReload(String str) {
        log.debug("reload notifer config {}", str);
        return Mono.justOrEmpty(this.notifiers.remove(str)).flatMap((v0) -> {
            return v0.close();
        }).thenReturn(str);
    }

    @Override // org.jetlinks.community.notify.NotifierManager
    @Nonnull
    public Mono<Notifier> createNotifier(NotifierProperties notifierProperties) {
        return Mono.justOrEmpty(this.providers.get(notifierProperties.getType())).switchIfEmpty(Mono.error(() -> {
            return new UnsupportedOperationException("不支持的通知类型:" + notifierProperties.getType());
        })).flatMap(map -> {
            return Mono.justOrEmpty(map.get(notifierProperties.getProvider()));
        }).switchIfEmpty(Mono.error(() -> {
            return new UnsupportedOperationException("不支持的服务商:" + notifierProperties.getProvider());
        })).flatMap(notifierProvider -> {
            return notifierProvider.createNotifier(notifierProperties);
        }).map(notifier -> {
            return new NotifierEventDispatcher(this.eventBus, notifier);
        }).flatMap(notifierEventDispatcher -> {
            return Mono.justOrEmpty(this.notifiers.put(notifierProperties.getId(), notifierEventDispatcher)).flatMap((v0) -> {
                return v0.close();
            }).thenReturn(notifierEventDispatcher);
        });
    }

    @Override // org.jetlinks.community.notify.NotifierManager
    @Nonnull
    public Mono<Notifier> getNotifier(@Nonnull NotifyType notifyType, @Nonnull String str) {
        return Mono.justOrEmpty(this.notifiers.get(str)).switchIfEmpty(Mono.defer(() -> {
            return getProperties(notifyType, str).flatMap(this::createNotifier);
        }));
    }

    public void registerProvider(NotifierProvider notifierProvider) {
        this.providers.computeIfAbsent(notifierProvider.getType().getId(), str -> {
            return new ConcurrentHashMap();
        }).put(notifierProvider.getProvider().getId(), notifierProvider);
    }

    public Object postProcessAfterInitialization(Object obj, String str) throws BeansException {
        if (obj instanceof NotifierProvider) {
            registerProvider((NotifierProvider) obj);
        }
        return obj;
    }

    public void run(String... strArr) throws Exception {
        this.eventBus.subscribe(Subscription.builder().subscriberId("notifier-loader").topics(new String[]{"/_sys/notifier/reload"}).justBroker().build(), String.class).flatMap(str -> {
            return doReload(str).onErrorResume(th -> {
                log.error("reload notifer config error", th);
                return Mono.empty();
            });
        }).subscribe();
    }
}
