package org.jetlinks.community.notify.manager.service;

import java.lang.invoke.SerializedLambda;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.hswebframework.web.authorization.Authentication;
import org.hswebframework.web.authorization.ReactiveAuthenticationHolder;
import org.hswebframework.web.crud.events.EntityCreatedEvent;
import org.hswebframework.web.crud.events.EntityDeletedEvent;
import org.hswebframework.web.crud.events.EntityModifyEvent;
import org.hswebframework.web.crud.events.EntitySavedEvent;
import org.hswebframework.web.crud.service.GenericReactiveCrudService;
import org.jetlinks.community.notify.manager.entity.Notification;
import org.jetlinks.community.notify.manager.entity.NotifySubscriberEntity;
import org.jetlinks.community.notify.manager.enums.SubscribeState;
import org.jetlinks.community.notify.manager.subscriber.Notify;
import org.jetlinks.community.notify.manager.subscriber.SubscriberProvider;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.core.event.EventBus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Manages notification subscriptions.
 *
 * <p>Subscription records ({@link NotifySubscriberEntity}) are persisted through the inherited
 * CRUD service. Whenever a record is created, saved, modified or deleted, the change is
 * published to a cluster topic; every instance listening on that topic then starts, replaces
 * or stops the live subscriber for that record. Notifications produced by a live subscriber
 * are forwarded to the {@link EventBus}.
 */
@Service
public class NotifySubscriberService extends GenericReactiveCrudService<NotifySubscriberEntity, String> implements CommandLineRunner {
    private static final Logger log = LoggerFactory.getLogger(NotifySubscriberService.class);

    /** Name of the cluster topic on which subscription changes are broadcast. */
    private static final String CHANGE_TOPIC = "notification-changed";

    private final EventBus eventBus;
    private final ClusterManager clusterManager;

    /** Available subscriber providers, keyed by {@link SubscriberProvider#getId()}. */
    private final Map<String, SubscriberProvider> providers = new ConcurrentHashMap<>();

    /** Running subscriptions, keyed by the id of the subscription entity. */
    private final Map<String, Disposable> subscribers = new ConcurrentHashMap<>();

    /**
     * Creates the service and indexes the given providers by their id.
     *
     * @param eventBus       bus that receives the notifications produced by subscribers
     * @param clusterManager used to obtain the cluster topic for change broadcasts
     * @param providerList   all available subscriber providers
     */
    public NotifySubscriberService(EventBus eventBus, ClusterManager clusterManager, List<SubscriberProvider> providerList) {
        this.eventBus = eventBus;
        this.clusterManager = clusterManager;
        for (SubscriberProvider provider : providerList) {
            this.providers.put(provider.getId(), provider);
        }
    }

    /**
     * Looks up a subscriber provider by id.
     *
     * @param providerId provider id, may be {@code null}
     * @return the provider, or an empty Optional when the id is {@code null} or unknown
     */
    public Optional<SubscriberProvider> getProvider(String providerId) {
        // ofNullable guards the lookup: ConcurrentHashMap.get(null) would throw.
        return Optional.ofNullable(providerId).map(providers::get);
    }

    /** Starts listening on the change topic and routes every received entity to {@link #handleSubscribe}. */
    private void doStart() {
        clusterManager
            .<NotifySubscriberEntity>getTopic(CHANGE_TOPIC)
            .subscribe()
            .subscribe(this::handleSubscribe);
    }

    /**
     * Broadcasts a changed subscription on the change topic. The publish is retried up to
     * three times and is fire-and-forget: this method does not wait for the result.
     *
     * @param entity the subscription that changed
     */
    protected void doNotifyChange(NotifySubscriberEntity entity) {
        clusterManager
            .<NotifySubscriberEntity>getTopic(CHANGE_TOPIC)
            .publish(Mono.just(entity))
            .retry(3L)
            .subscribe();
    }

    /** Broadcasts every newly created subscription. */
    @EventListener
    public void handleEvent(EntityCreatedEvent<NotifySubscriberEntity> event) {
        event.getEntity().forEach(this::doNotifyChange);
    }

    /** Broadcasts every saved subscription. */
    @EventListener
    public void handleEvent(EntitySavedEvent<NotifySubscriberEntity> event) {
        event.getEntity().forEach(this::doNotifyChange);
    }

    /** Broadcasts every deleted subscription, marked as disabled so that receivers unsubscribe it. */
    @EventListener
    public void handleEvent(EntityDeletedEvent<NotifySubscriberEntity> event) {
        event.getEntity().forEach(entity -> {
            entity.setState(SubscribeState.disabled);
            doNotifyChange(entity);
        });
    }

    /** Broadcasts the post-modification state of every modified subscription. */
    @EventListener
    public void handleEvent(EntityModifyEvent<NotifySubscriberEntity> event) {
        event.getAfter().forEach(this::doNotifyChange);
    }

    /**
     * Applies a subscription change locally.
     *
     * <p>A disabled subscription has its running subscriber (if any) disposed. Any other state
     * starts a new subscriber and then disposes the one previously registered under the same id.
     *
     * @param entity the subscription to apply
     */
    private void handleSubscribe(NotifySubscriberEntity entity) {
        // Unsubscribe.
        if (entity.getState() == SubscribeState.disabled) {
            Optional.ofNullable(subscribers.remove(entity.getId())).ifPresent(Disposable::dispose);
            log.debug("unsubscribe:{}({}),{}", entity.getTopicProvider(), entity.getTopicName(), entity.getId());
            return;
        }

        // Template that each incoming message is copied into, and the topic it is forwarded to.
        Notification template = Notification.from(entity);
        String dispatchTopic = template.createTopic();

        // Mono.zip completes empty when either the authentication or the provider is missing,
        // in which case no subscriber is created.
        Disposable subscription = Mono
            .zip(ReactiveAuthenticationHolder.get(entity.getSubscriber()),
                 Mono.justOrEmpty(getProvider(entity.getTopicProvider())))
            .flatMap(tp -> tp.getT2().createSubscriber(entity.getId(), tp.getT1(), entity.getTopicConfig()))
            .flatMap(subscriber -> subscriber
                .subscribe()
                .map(template::copyWithMessage)
                .flatMap(notification -> eventBus.publish(dispatchTopic, notification))
                // Log and skip a failing element instead of terminating the stream.
                .onErrorContinue((err, obj) -> log.error(err.getMessage(), err))
                .then())
            .subscribe();

        Disposable previous = subscribers.put(entity.getId(), subscription);
        log.debug("subscribe :{}({})", template.getTopicProvider(), template.getTopicName());
        if (previous != null) {
            log.debug("close old subscriber:{}({})", template.getTopicProvider(), template.getTopicName());
            previous.dispose();
        }
    }

    /**
     * Creates or updates a subscription.
     *
     * <p>The topic name is taken from the matching provider. When the entity has no id it is
     * saved as a new record; otherwise the update only matches a row with the same id,
     * subscriber type and subscriber.
     *
     * @param entity the subscription to store
     * @return a Mono that completes when the record has been written, or fails with
     *         {@link IllegalArgumentException} when no provider matches the entity's topic provider
     */
    public Mono<Void> doSubscribe(NotifySubscriberEntity entity) {
        return Mono
            .justOrEmpty(getProvider(entity.getTopicProvider()))
            // Message text means "unsupported topic".
            .switchIfEmpty(Mono.error(() -> new IllegalArgumentException("不支持的主题:" + entity.getTopicProvider())))
            .map(provider -> {
                entity.setTopicName(provider.getName());
                return entity;
            })
            .flatMap(ignored -> {
                if (StringUtils.isEmpty(entity.getId())) {
                    // Normalise an empty id to null before saving
                    // (presumably so that an id gets generated; confirm against the entity defaults).
                    entity.setId(null);
                    return save(Mono.just(entity));
                }
                // The columns must be getter method references rather than lambdas:
                // NOTE(review): the DSL appears to derive the column from the referenced getter's name; confirm.
                return createUpdate()
                    .set(entity)
                    .where(NotifySubscriberEntity::getId, entity.getId())
                    .and(NotifySubscriberEntity::getSubscriberType, entity.getSubscriberType())
                    .and(NotifySubscriberEntity::getSubscriber, entity.getSubscriber())
                    .execute();
            })
            .then();
    }

    /**
     * Startup hook: begins listening for subscription changes, then loads every enabled
     * subscription and starts its subscriber.
     *
     * @param args command line arguments, unused
     */
    @Override
    public void run(String... args) {
        doStart();
        createQuery()
            .where(NotifySubscriberEntity::getState, SubscribeState.enabled)
            .fetch()
            .doOnNext(this::handleSubscribe)
            .subscribe();
    }
}
