package com.artfess.uc.kafka; import cn.hutool.json.JSONUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.SendResult; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; /** * @Author: wsf * @Description: kafka 生产者 * @DateTime: 2025/6/17 11:22 **/ @Component @Slf4j public class KafkaProducer { @Autowired private KafkaTemplate kafkaTemplate; /** * 组织同步订阅Topic null Json序列化内容 **/ public static final String ORG_TOPIC = "basic-org-info"; /** * 用户同步订阅Topic null Json序列化内容 **/ public static final String USER_TOPIC = "basic-user-info"; /** * 角色同步订阅Topic null Json序列化内容 **/ public static final String ROLE_TOPIC = "basic-userrole-info"; /** * 安全生产系统 订阅自定义分组 **/ public static final String TOPIC_GROUP = "consumer-xcjy-user"; public void send(Object obj, String topicKey) { String obj2String = JSONUtil.toJsonStr(obj); log.info("准备发送消息为:{}", obj2String); //发送消息 ListenableFuture> future = kafkaTemplate.send(topicKey, obj); future.addCallback(new ListenableFutureCallback>() { @Override public void onFailure(Throwable throwable) { //发送失败的处理 System.out.println("发送消息失败"); log.error("{} - 生产者 发送消息失败:{}", topicKey, throwable.getMessage()); } @Override public void onSuccess(SendResult stringObjectSendResult) { //成功的处理 System.out.println("发送消息成功"); log.info("{} - 生产者 发送消息成功:{}", topicKey, stringObjectSendResult.toString()); } }); } }