package com.artfess.rocketmq.producer; import com.artfess.rocketmq.model.MessageBody; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; import java.util.UUID; /** * @author 陈实 * @Package com.artfess.bzzgj.util * @date 2023/4/13 17:19 * @Description: */ @Slf4j @Component public class RocketMQUtil { private static enum MSG_TYPE{ ONEWAY, ASYNC, SYNC }; @Autowired private RocketMQTemplate rocketMQTemplate; /** * 发送消息,通用 * @param msg_type * @param topic * @param payload */ private void sendMsg(MSG_TYPE msg_type, String topic, Object payload, String msgSource){ String msgId = UUID.randomUUID().toString().replace("-", ""); MessageBody msgBody = new MessageBody(msgId, payload , msgSource); Message message = MessageBuilder.withPayload(msgBody).setHeader("KEYS",msgId ).build(); log.info(String.format("消息发送 MQService 开始: %s %s", topic, message)); SendResult result = null; switch (msg_type) { case ONEWAY: //单向 rocketMQTemplate.sendOneWay(topic, message); break; case ASYNC: //异步 rocketMQTemplate.asyncSend(topic, message,new SendCallback() { @Override public void onSuccess(SendResult sendResult) { } @Override public void onException(Throwable throwable) { log.error("MQService:" + ExceptionUtils.getStackTrace(throwable)); throw new RuntimeException(String.format("消息发送失败 topic_tag:%s", topic )); } }); break; case SYNC: //同步 result = rocketMQTemplate.syncSend(topic, message); break; } log.info(String.format("消息发送 MQService 结束: msgId: %s dest: %s msg: %s",result != null ? result.getMsgId() : "", topic, message)); } /** * 同步发送消息,会确认应答 * @param topic 主题名称 * @param payload 消息 * @param msgSource 消息来源 */ public void syncSendMsg(String topic, Object payload, String msgSource){ sendMsg(MSG_TYPE.SYNC,topic, payload,msgSource) ; } /** * 同步发送消息,会确认应答 * @param topic 主题名称 * @param tag 主题标签(与topic组合成一个新的主题名称) * @param payload 消息 * @param msgSource 消息来源 */ public void syncSendMsg(String topic, String tag, Object payload, String msgSource){ // 发送的消息体,消息体必须存在 // 业务主键作为消息key String destination = topic + ":" + tag; syncSendMsg(destination, payload,msgSource); } /** * 异步消息发送,异步日志确认异常 * @param topic 主题名称 * @param payload 消息 * @param msgSource 消息来源 */ public void asyncSendMsg(String topic, Object payload, String msgSource){ sendMsg(MSG_TYPE.ASYNC,topic, payload,msgSource); } /** * 异步消息发送,异步日志确认异常 * @param topic 主题名称 * @param tag 主题标签(与topic组合成一个新的主题名称) * @param payload 消息 * @param msgSource 消息来源 */ public void asyncSendMsg(String topic, String tag, Object payload, String msgSource){ // 发送的消息体,消息体必须存在 // 业务主键作为消息key String destination = topic + ":" + tag; asyncSendMsg(destination, payload,msgSource); } /** * 单向发送消息,不关注结果 * @param topic 主题名称 * @param payload 消息 * @param msgSource 消息来源 */ public void oneWaySendMsg(String topic, Object payload, String msgSource){ sendMsg(MSG_TYPE.ONEWAY,topic, payload,msgSource); } /** * 单向发送消息,不关注结果 * @param topic 主题名称 * @param tag 主题标签(与topic组合成一个新的主题名称) * @param payload 消息 * @param msgSource 消息来源 */ public void oneWaySendMsg(String topic, String tag, Object payload, String msgSource){ // 发送的消息体,消息体必须存在 // 业务主键作为消息key String destination = topic + ":" + tag; oneWaySendMsg(destination, payload,msgSource); } }