package com.artfess.rabbitmq.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Exchange; import org.springframework.amqp.core.ExchangeBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author 周渝森 * @date 2020/8/19 20:42 */ @Configuration public class RabbitMqConfig { /** * 队列 */ @Bean(name = "msg") public Queue queueMsg() { return new Queue(RabbitMqQueue.QUEUE_MSG); } /** * 队列 */ @Bean(name = "scada_webSocket") public Queue queueScadaWebSocket() { return QueueBuilder.durable(RabbitMqQueue.QUEUE_PLAT_SCADA_WEbSOCKET).build(); } /** * 队列 */ @Bean(name = "scada") public Queue queueScada() { return new Queue(RabbitMqQueue.QUEUE_PLAT_SCADA,true); } /** * 队列 */ @Bean(name = "cmsState") public Queue queueCmsState() { return new Queue(RabbitMqQueue.QUEUE_PLAT_CMS_STATE); } /** * 队列 */ @Bean(name = "cmsNow") public Queue queueCmsNow() { return new Queue(RabbitMqQueue.QUEUE_PLAT_CMS_NOW); } /** * 队列 */ @Bean(name = "scadaMsg") public Queue queueScadaMsg() { return new Queue(RabbitMqQueue.QUEUE_SCADA_MSG,true); } /** * 交换机 */ @Bean public Exchange exchange() { return ExchangeBuilder.topicExchange(RabbitMqQueue.EXCHANGE).durable(true).build(); } //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man //这样只要是消息携带的路由键是topic.man,才会分发到该队列 @Bean Binding bindingExchangeMessage() { return BindingBuilder.bind(queueMsg()).to(exchange()).with(RabbitMqQueue.QUEUE_MSG).noargs(); } @Bean Binding bindingExchangeScada() { return BindingBuilder.bind(queueScada()).to(exchange()).with(RabbitMqQueue.QUEUE_PLAT_SCADA).noargs(); } @Bean Binding bindingExchangeScadaWebSocket(@Qualifier("scada_webSocket") Queue queueScadaWebSocket, Exchange exchange) { return BindingBuilder.bind(queueScadaWebSocket).to(exchange).with("hz.WindPlat.scada.#").noargs();//*表示一个词,#表示零个或多个词 } @Bean Binding bindingExchangeScadaMsg() { return BindingBuilder.bind(queueScadaMsg()).to(exchange()).with(RabbitMqQueue.QUEUE_SCADA_MSG).noargs(); } @Bean Binding bindingExchangeCmsState() { return BindingBuilder.bind(queueCmsState()).to(exchange()).with(RabbitMqQueue.QUEUE_PLAT_CMS_STATE).noargs(); } @Bean Binding bindingExchangeCmsNow() { return BindingBuilder.bind(queueCmsNow()).to(exchange()).with(RabbitMqQueue.QUEUE_PLAT_CMS_NOW).noargs(); } //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.# // 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列 /* @Bean Binding bindingExchangeMessages() { return BindingBuilder.bind(queueMsg()).to(exchange()).with("hz.#").noargs(); }*/ // @Bean // Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages, Exchange exchange) { // return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#").noargs();//*表示一个词,#表示零个或多个词 // } @Bean(name = "messageConverter") MessageConverter messageConverter() { return new Jackson2JsonMessageConverter(); } @Bean //@Scope("prototype") public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,@Qualifier("messageConverter") MessageConverter messageConverter) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); //确保消息发送失败后可以重新返回到队列中 rabbitTemplate.setMandatory(true); //消息投递到队列失败回调处理 rabbitTemplate.setReturnCallback(new RabbitMqReturnCallback()); rabbitTemplate.setConfirmCallback(new RabbitMqConfirmCallback()); rabbitTemplate.setMessageConverter(messageConverter); return rabbitTemplate; } }