package org.jetlinks.community.gateway.spring;

import java.util.Arrays;
import java.util.Collection;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.hswebframework.web.logger.ReactiveLogger;
import org.hswebframework.web.utils.TemplateParser;
import org.jetlinks.community.gateway.annotation.Subscribe;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.utils.TopicUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.annotation.AnnotationAttributes;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;

@Component
/* loaded from: input_file:org/jetlinks/community/gateway/spring/SpringMessageBroker.class */
public class SpringMessageBroker implements BeanPostProcessor {
    private static final Logger log = LoggerFactory.getLogger(SpringMessageBroker.class);
    private final EventBus eventBus;
    private final Environment environment;

    public Object postProcessAfterInitialization(Object obj, String str) throws BeansException {
        Class userClass = ClassUtils.getUserClass(obj);
        ReflectionUtils.doWithMethods(userClass, method -> {
            AnnotationAttributes mergedAnnotationAttributes = AnnotatedElementUtils.getMergedAnnotationAttributes(method, Subscribe.class);
            if (CollectionUtils.isEmpty(mergedAnnotationAttributes)) {
                return;
            }
            String string = mergedAnnotationAttributes.getString("id");
            if (!StringUtils.hasText(string)) {
                string = userClass.getSimpleName().concat(".").concat(method.getName());
            }
            Subscription build = Subscription.builder().subscriberId("spring:" + string).topics((Collection) Arrays.stream(mergedAnnotationAttributes.getStringArray("value")).map(this::convertTopic).flatMap(str2 -> {
                return TopicUtils.expand(str2).stream();
            }).collect(Collectors.toList())).features((Subscription.Feature[]) mergedAnnotationAttributes.get("features")).build();
            ProxyMessageListener proxyMessageListener = new ProxyMessageListener(obj, method);
            Consumer onError = ReactiveLogger.onError(th -> {
                log.error("handle[{}] event message error", proxyMessageListener, th);
            });
            this.eventBus.subscribe(build).doOnNext(topicPayload -> {
                try {
                    proxyMessageListener.onMessage(topicPayload).doOnEach(onError).subscribe();
                } catch (Throwable th2) {
                    log.error("handle[{}] event message error", proxyMessageListener, th2);
                }
            }).subscribe();
        });
        return obj;
    }

    protected String convertTopic(String str) {
        return !str.contains("${") ? str : TemplateParser.parse(str, str2 -> {
            String[] split = str2.split(":", 2);
            String property = this.environment.getProperty(split[0], split.length > 1 ? split[1] : "");
            if (StringUtils.isEmpty(property)) {
                throw new IllegalArgumentException("Parse topic [" + str2 + "] error, can not get property : " + split[0]);
            }
            return property;
        });
    }

    public SpringMessageBroker(EventBus eventBus, Environment environment) {
        this.eventBus = eventBus;
        this.environment = environment;
    }
}
