package com.artfess.uc.kafka;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.stereotype.Component;

/**
 * @Author: wsf
 * @Description: 手动启动Kafka监听
 * 所有 Bean 初始化完成、Spring 容器刷新完毕、
 * ApplicationRunner 和 CommandLineRunner 都已执行完毕之后
 * @DateTime: 2025/6/19 10:44
 **/
@Component
@Slf4j
public class KafkaConsumerStarter implements ApplicationListener<ApplicationReadyEvent> {

    private final KafkaListenerEndpointRegistry registry;

    @Autowired
    public KafkaConsumerStarter(KafkaListenerEndpointRegistry registry) {
        this.registry = registry;
    }

    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        try {
            // 如果你有多个监听器，可根据 ID 启动指定的
            String[] listenerIds = new String[]{
                    "orgListenerContainer",
                    "userListenerContainer",
                    "roleListenerContainer"
            };

            for (String listenerId : listenerIds) {
                MessageListenerContainer container = registry.getListenerContainer(listenerId);
                if (container != null && !container.isRunning()) {
                    container.start();
                    log.info("Kafka Listener [{}] 已手动启动", listenerId);
                } else {
                    log.warn("Kafka Listener [{}] 未注册或已在运行", listenerId);
                }
            }
        } catch (Exception e) {
            log.error("KafkaListener 启动异常：{}", e.getMessage(), e);
        }
    }
}
