package com.artfess.uc.kafka; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.ApplicationListener; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.kafka.listener.MessageListenerContainer; import org.springframework.stereotype.Component; /** * @Author: wsf * @Description: 手动启动Kafka监听 * 所有 Bean 初始化完成、Spring 容器刷新完毕、 * ApplicationRunner 和 CommandLineRunner 都已执行完毕之后 * @DateTime: 2025/6/19 10:44 **/ @Component @Slf4j public class KafkaConsumerStarter implements ApplicationListener { private final KafkaListenerEndpointRegistry registry; @Autowired public KafkaConsumerStarter(KafkaListenerEndpointRegistry registry) { this.registry = registry; } @Override public void onApplicationEvent(ApplicationReadyEvent event) { try { // 如果你有多个监听器,可根据 ID 启动指定的 String[] listenerIds = new String[]{ "orgListenerContainer", "userListenerContainer", "roleListenerContainer" }; for (String listenerId : listenerIds) { MessageListenerContainer container = registry.getListenerContainer(listenerId); if (container != null && !container.isRunning()) { container.start(); log.info("Kafka Listener [{}] 已手动启动", listenerId); } else { log.warn("Kafka Listener [{}] 未注册或已在运行", listenerId); } } } catch (Exception e) { log.error("KafkaListener 启动异常:{}", e.getMessage(), e); } } }