/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.community.notify.manager.service;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.hswebframework.ezorm.rdb.mapping.ReactiveQuery;
import org.hswebframework.ezorm.rdb.mapping.ReactiveUpdate;
import org.hswebframework.web.api.crud.entity.GenericEntity;
import org.hswebframework.web.authorization.Authentication;
import org.hswebframework.web.authorization.ReactiveAuthenticationHolder;
import org.hswebframework.web.crud.events.EntityCreatedEvent;
import org.hswebframework.web.crud.events.EntityDeletedEvent;
import org.hswebframework.web.crud.events.EntityModifyEvent;
import org.hswebframework.web.crud.events.EntitySavedEvent;
import org.hswebframework.web.crud.service.GenericReactiveCrudService;
import org.jetlinks.community.notify.manager.entity.Notification;
import org.jetlinks.community.notify.manager.entity.NotifySubscriberEntity;
import org.jetlinks.community.notify.manager.enums.SubscribeState;
import org.jetlinks.community.notify.manager.subscriber.SubscriberProvider;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.core.event.EventBus;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;

/**
 * CRUD service for {@link NotifySubscriberEntity} that also keeps the live
 * notification subscriptions of this node in sync with the stored entities.
 *
 * <p>Every create / save / modify / delete event of a subscriber entity is
 * published to the cluster topic {@code "notification-changed"}. Each node
 * listens on that topic (see {@link #run(String...)}) and starts, replaces or
 * disposes the matching subscription in {@link #handleSubscribe}.
 */
@Service
public class NotifySubscriberService
extends GenericReactiveCrudService<NotifySubscriberEntity, String>
implements CommandLineRunner {
    private static final Logger log = LoggerFactory.getLogger(NotifySubscriberService.class);

    /** Name of the cluster topic used to broadcast subscriber changes. */
    private static final String CHANGE_TOPIC = "notification-changed";

    private final EventBus eventBus;
    private final ClusterManager clusterManager;

    /** Registered providers, keyed by {@link SubscriberProvider#getId()}. */
    private final Map<String, SubscriberProvider> providers = new ConcurrentHashMap<>();

    /** Active subscriptions, keyed by subscriber entity id. */
    private final Map<String, Disposable> subscribers = new ConcurrentHashMap<>();

    /**
     * @param eventBus       bus the resulting notifications are published to
     * @param clusterManager used to obtain the change-broadcast topic
     * @param providers      all available subscriber providers; indexed by their id
     */
    public NotifySubscriberService(EventBus eventBus, ClusterManager clusterManager, List<SubscriberProvider> providers) {
        this.eventBus = eventBus;
        this.clusterManager = clusterManager;
        for (SubscriberProvider provider : providers) {
            this.providers.put(provider.getId(), provider);
        }
    }

    /**
     * Looks up a provider by id.
     *
     * @param provider provider id, may be {@code null}
     * @return the provider, or empty when the id is {@code null} or unknown
     */
    public Optional<SubscriberProvider> getProvider(String provider) {
        return Optional.ofNullable(provider).map(this.providers::get);
    }

    /** Starts listening for subscriber changes broadcast on the cluster topic. */
    private void doStart() {
        this.clusterManager
            .<NotifySubscriberEntity>getTopic(CHANGE_TOPIC)
            .subscribe()
            .subscribe(
                this::safeHandleSubscribe,
                err -> log.error("listening on topic {} failed", CHANGE_TOPIC, err));
    }

    /**
     * Broadcasts a changed subscriber entity on the cluster topic. The publish
     * is retried up to 3 times; a final failure is logged instead of being
     * left without an error handler.
     *
     * @param entity the entity whose state changed
     */
    protected void doNotifyChange(NotifySubscriberEntity entity) {
        this.clusterManager
            .<NotifySubscriberEntity>getTopic(CHANGE_TOPIC)
            .publish(Mono.just(entity))
            .retry(3L)
            .onErrorResume(err -> {
                log.error("publish subscriber change failed:{}", entity.getId(), err);
                return Mono.empty();
            })
            .subscribe();
    }

    /** Broadcasts every newly created subscriber entity. */
    @EventListener
    public void handleEvent(EntityCreatedEvent<NotifySubscriberEntity> entity) {
        entity.getEntity().forEach(this::doNotifyChange);
    }

    /** Broadcasts every saved subscriber entity. */
    @EventListener
    public void handleEvent(EntitySavedEvent<NotifySubscriberEntity> entity) {
        entity.getEntity().forEach(this::doNotifyChange);
    }

    /**
     * Broadcasts every deleted subscriber entity. The state is set to
     * {@link SubscribeState#disabled} first so that receivers dispose the
     * corresponding subscription.
     */
    @EventListener
    public void handleEvent(EntityDeletedEvent<NotifySubscriberEntity> entity) {
        entity.getEntity().forEach(e -> {
            e.setState(SubscribeState.disabled);
            this.doNotifyChange(e);
        });
    }

    /** Broadcasts the post-modification version of every modified subscriber entity. */
    @EventListener
    public void handleEvent(EntityModifyEvent<NotifySubscriberEntity> entity) {
        entity.getAfter().forEach(this::doNotifyChange);
    }

    /**
     * Runs {@link #handleSubscribe} and logs any exception it throws, so that a
     * single faulty entity cannot terminate the stream delivering the entities.
     */
    private void safeHandleSubscribe(NotifySubscriberEntity entity) {
        try {
            this.handleSubscribe(entity);
        } catch (RuntimeException e) {
            log.error("handle subscriber change failed:{}", entity.getId(), e);
        }
    }

    /**
     * Applies a subscriber change on this node: a disabled entity has its
     * subscription removed and disposed; any other entity gets a new
     * subscription which replaces (and disposes) a previously stored one.
     */
    private void handleSubscribe(NotifySubscriberEntity entity) {
        if (entity.getState() == SubscribeState.disabled) {
            Optional.ofNullable(this.subscribers.remove(entity.getId())).ifPresent(Disposable::dispose);
            log.debug("unsubscribe:{}({}),{}", entity.getTopicProvider(), entity.getTopicName(), entity.getId());
            return;
        }
        Notification template = Notification.from(entity);
        String dispatch = template.createTopic();
        Disposable old = this.subscribers.put(entity.getId(), this.createSubscription(entity, template, dispatch));
        log.debug("subscribe :{}({})", template.getTopicProvider(), template.getTopicName());
        if (null != old) {
            log.debug("close old subscriber:{}({})", template.getTopicProvider(), template.getTopicName());
            old.dispose();
        }
    }

    /**
     * Builds and starts the pipeline for one subscriber entity: resolve the
     * subscriber's authentication and the topic provider, create the provider
     * specific subscriber, and forward each message it emits to the event bus
     * under {@code dispatch} as a copy of {@code template}.
     *
     * <p>Nothing is subscribed when the authentication or the provider is
     * absent (the zip completes empty). Errors raised while setting the
     * pipeline up are logged rather than left unhandled.
     */
    private Disposable createSubscription(NotifySubscriberEntity entity, Notification template, String dispatch) {
        return Mono
            .zip(ReactiveAuthenticationHolder.get(entity.getSubscriber()),
                 Mono.justOrEmpty(this.getProvider(entity.getTopicProvider())))
            .flatMap(tp2 -> tp2.getT2().createSubscriber(entity.getId(), tp2.getT1(), entity.getTopicConfig()))
            .flatMap(subscriber -> subscriber
                .subscribe()
                .map(template::copyWithMessage)
                .flatMap(notification -> this.eventBus.publish(dispatch, notification))
                // keep the subscription alive when a single notification fails
                .onErrorContinue((err, obj) -> log.error(err.getMessage(), err))
                .then())
            .onErrorResume(err -> {
                log.error("subscribe failed:{}({}),{}", entity.getTopicProvider(), entity.getTopicName(), entity.getId(), err);
                return Mono.empty();
            })
            .subscribe();
    }

    /**
     * Creates or updates a subscription entity.
     *
     * <p>The topic name is taken from the resolved provider. An entity without
     * an id is saved as a new one; otherwise the stored row matching id,
     * subscriber type and subscriber is updated.
     *
     * @param entity the subscription to persist
     * @return a {@link Mono} completing when the entity has been persisted, or
     *         failing with {@link IllegalArgumentException} when the entity's
     *         topic provider is not registered
     */
    public Mono<Void> doSubscribe(NotifySubscriberEntity entity) {
        return Mono.justOrEmpty(this.getProvider(entity.getTopicProvider()))
            .switchIfEmpty(Mono.error(() -> new IllegalArgumentException("\u4e0d\u652f\u6301\u7684\u4e3b\u9898:" + entity.getTopicProvider())))
            .map(provider -> {
                entity.setTopicName(provider.getName());
                return entity;
            })
            .flatMap(subEntity -> {
                if (StringUtils.isEmpty(entity.getId())) {
                    // normalise an empty id to null before saving
                    entity.setId(null);
                    return this.save(Mono.just(entity));
                }
                // the subscriber conditions restrict the update to the row owned by this subscriber
                return this.createUpdate()
                    .set(entity)
                    .where(GenericEntity::getId, entity.getId())
                    .and(NotifySubscriberEntity::getSubscriberType, entity.getSubscriberType())
                    .and(NotifySubscriberEntity::getSubscriber, entity.getSubscriber())
                    .execute();
            })
            .then();
    }

    /**
     * Startup hook: begins listening for cluster-wide changes, then starts a
     * subscription for every stored entity in state {@link SubscribeState#enabled}.
     */
    @Override
    public void run(String ... args) {
        this.doStart();
        this.createQuery()
            .where(NotifySubscriberEntity::getState, SubscribeState.enabled)
            .fetch()
            .subscribe(
                this::safeHandleSubscribe,
                err -> log.error("load enabled subscribers failed", err));
    }
}

