/*
 * Decompiled with CFR 0.152.
 */
package org.jetlinks.community.gateway.spring;

import java.lang.reflect.AnnotatedElement;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.hswebframework.web.logger.ReactiveLogger;
import org.hswebframework.web.utils.TemplateParser;
import org.jetlinks.community.gateway.annotation.Subscribe;
import org.jetlinks.community.gateway.spring.ProxyMessageListener;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.jetlinks.core.event.TopicPayload;
import org.jetlinks.core.utils.TopicUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.annotation.AnnotationAttributes;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;

@Component
public class SpringMessageBroker
implements BeanPostProcessor {
    private static final Logger log = LoggerFactory.getLogger(SpringMessageBroker.class);
    private final EventBus eventBus;
    private final Environment environment;

    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        Class type = ClassUtils.getUserClass((Object)bean);
        ReflectionUtils.doWithMethods((Class)type, method -> {
            AnnotationAttributes subscribes = AnnotatedElementUtils.getMergedAnnotationAttributes((AnnotatedElement)method, Subscribe.class);
            if (CollectionUtils.isEmpty((Map)subscribes)) {
                return;
            }
            String id = subscribes.getString("id");
            if (!StringUtils.hasText((String)id)) {
                id = type.getSimpleName().concat(".").concat(method.getName());
            }
            Subscription subscription = Subscription.builder().subscriberId("spring:" + id).topics((Collection)Arrays.stream(subscribes.getStringArray("value")).map(this::convertTopic).flatMap(topic -> TopicUtils.expand((String)topic).stream()).collect(Collectors.toList())).features((Subscription.Feature[])subscribes.get((Object)"features")).build();
            ProxyMessageListener listener = new ProxyMessageListener(bean, method);
            Consumer logError = ReactiveLogger.onError(error -> log.error("handle[{}] event message error", (Object)listener, error));
            this.eventBus.subscribe(subscription).doOnNext(msg -> {
                try {
                    listener.onMessage((TopicPayload)msg).doOnEach(logError).subscribe();
                }
                catch (Throwable e) {
                    log.error("handle[{}] event message error", (Object)listener, (Object)e);
                }
            }).subscribe();
        });
        return bean;
    }

    protected String convertTopic(String topic) {
        if (!topic.contains("${")) {
            return topic;
        }
        return TemplateParser.parse((String)topic, template -> {
            String[] arr = template.split(":", 2);
            String property = this.environment.getProperty(arr[0], arr.length > 1 ? arr[1] : "");
            if (StringUtils.isEmpty((Object)property)) {
                throw new IllegalArgumentException("Parse topic [" + template + "] error, can not get property : " + arr[0]);
            }
            return property;
        });
    }

    public SpringMessageBroker(EventBus eventBus, Environment environment) {
        this.eventBus = eventBus;
        this.environment = environment;
    }
}

