package org.jetlinks.community.gateway.external;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.stereotype.Component;
import org.springframework.util.AntPathMatcher;
import org.springframework.util.PathMatcher;
import reactor.core.publisher.Flux;

/**
 * Default {@link MessagingManager} that dispatches a subscribe request to the first registered
 * {@link SubscriptionProvider} whose topic pattern matches the request's topic.
 *
 * <p>The class also implements {@link BeanPostProcessor}: every initialized bean that is a
 * {@link SubscriptionProvider} is passed to {@link #register(SubscriptionProvider)}.
 *
 * <p>Providers are kept in a {@link ConcurrentHashMap} keyed by topic pattern. Patterns are
 * evaluated with an {@link AntPathMatcher} in the map's iteration order, so when several
 * registered patterns match the same topic, which provider is chosen depends on that order.
 */
@Component
public class DefaultMessagingManager implements MessagingManager, BeanPostProcessor {
    /** Matcher used to test a registered topic pattern against a requested topic. */
    private static final PathMatcher matcher = new AntPathMatcher();

    /** Registered providers, keyed by the topic pattern they declared. */
    private final Map<String, SubscriptionProvider> subProvider = new ConcurrentHashMap<>();

    /**
     * Subscribes to the topic named in the request.
     *
     * <p>Provider lookup is deferred until the returned flux is subscribed to. Items emitted by
     * the matching provider that are already {@link Message} instances are passed through
     * unchanged; any other item is wrapped via {@code Message.success} together with the id and
     * topic of the request.
     *
     * @param subscribeRequest the subscribe request carrying the id and topic
     * @return the provider's items as messages, or a flux that fails with
     *         {@link UnsupportedOperationException} when no registered pattern matches the topic
     */
    @Override // org.jetlinks.community.gateway.external.MessagingManager
    public Flux<Message> subscribe(SubscribeRequest subscribeRequest) {
        return Flux.defer(() -> {
            for (Map.Entry<String, SubscriptionProvider> entry : this.subProvider.entrySet()) {
                // The first matching pattern wins; remaining entries are not consulted.
                if (matcher.match(entry.getKey(), subscribeRequest.getTopic())) {
                    return entry.getValue()
                        .subscribe(subscribeRequest)
                        .map(obj -> obj instanceof Message
                            ? (Message) obj
                            : Message.success(subscribeRequest.getId(), subscribeRequest.getTopic(), obj));
                }
            }
            return Flux.error(new UnsupportedOperationException("不支持的topic"));
        });
    }

    /**
     * Registers a provider under each topic pattern it declares.
     *
     * <p>A pattern that is already registered is overwritten, so the most recently registered
     * provider for a given pattern replaces the earlier one.
     *
     * @param subscriptionProvider the provider to register
     */
    public void register(SubscriptionProvider subscriptionProvider) {
        for (String str : subscriptionProvider.getTopicPattern()) {
            this.subProvider.put(str, subscriptionProvider);
        }
    }

    /**
     * {@link BeanPostProcessor} callback that registers the bean when it is a
     * {@link SubscriptionProvider}.
     *
     * @param obj the initialized bean instance
     * @param str the bean name (not used here)
     * @return the same bean instance, unmodified
     * @throws BeansException declared by the {@link BeanPostProcessor} contract
     */
    public Object postProcessAfterInitialization(Object obj, String str) throws BeansException {
        if (obj instanceof SubscriptionProvider) {
            register((SubscriptionProvider) obj);
        }
        return obj;
    }
}
