SpringBoot 的支持
SpringBoot 已经提供了对 AMQP 协议完全支持的
spring-boot-starter-amqp
依赖,引入此依赖即可快速方便的在 SpringBoot 中使用 RabbitMQ。参考:Spring AMQP。
特点
- 用于异步处理消费消息的监听器容器。
- 用于发送和接收消息的 RabbitTemplate。
- RabbitAdmin 用于自动声明队列、交换和绑定。
RabbitAdmin
作用
- declareExchange:创建交换机。
- deleteExchange:删除交换机。
- declareQueue:创建队列。
- deleteQueue:删除队列。
- purge:清空队列。
- declareBinding:新建绑定关系。
- removeBinding:删除绑定关系。
- getQueueProperties:查询队列属性。
加粗的为常用。
创建方式
// 连接工厂CachingConnectionFactory connectionFactory =newCachingConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");// 使用连接工厂创建 RabbitAdminRabbitAdmin rabbitAdmin =newRabbitAdmin(connectionFactory);
编程式实现
@ConfigurationpublicclassRabbitConfig{publicstaticfinalString EXCHANGE_NAME ="exchange.cat.dog";publicstaticfinalString EXCHANGE_DLX ="exchange.dlx";publicstaticfinalString QUEUE_NAME ="queue.cat";publicstaticfinalString QUEUE_DLX ="queue.dlx";publicstaticfinalString KEY_NAME ="key.yingduan";publicstaticfinalString KEY_DLX ="#";@PostConstructvoidrabbitAdmin(){CachingConnectionFactory connectionFactory =newCachingConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");// 创建 RabbitAdminRabbitAdmin rabbitAdmin =newRabbitAdmin(connectionFactory);// 声明交换机DirectExchange directExchange =newDirectExchange(EXCHANGE_NAME);
rabbitAdmin.declareExchange(directExchange);// 声明队列Queue queue =newQueue(QUEUE_NAME);
rabbitAdmin.declareQueue(queue);// 声明绑定关系// 目的地名称、目的地类型、绑定交换机、绑定 key、参数Binding binding =newBinding(QUEUE_NAME,Binding.DestinationType.QUEUE, EXCHANGE_NAME, KEY_NAME,null);
rabbitAdmin.declareBinding(binding);}}
声明式实现(推荐)
@Slf4j@ConfigurationpublicclassRabbitConfig{publicstaticfinalString EXCHANGE_NAME ="exchange.cat.dog";publicstaticfinalString EXCHANGE_DLX ="exchange.dlx";publicstaticfinalString QUEUE_NAME ="queue.cat";publicstaticfinalString QUEUE_DLX ="queue.dlx";publicstaticfinalString KEY_NAME ="key.yingduan";publicstaticfinalString KEY_DLX ="#";@BeanConnectionFactoryconnectionFactory(){CachingConnectionFactory connectionFactory =newCachingConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");return connectionFactory;}@BeanRabbitAdminrabbitAdmin(@AutowiredConnectionFactory connectionFactory){returnnewRabbitAdmin(connectionFactory);}@BeanExchangeexchange(){returnnewDirectExchange(EXCHANGE_NAME);}@BeanQueuequeue(){returnnewQueue(QUEUE_NAME);}@BeanBindingbinding(){// 目的地名称、目的地类型、绑定交换机、绑定 key、参数returnnewBinding(QUEUE_NAME,Binding.DestinationType.QUEUE, EXCHANGE_NAME, KEY_NAME,null);}}
注意,以上配置再启动 SpringBoot 并不会立马创建交换机、队列、绑定,SpringBoot AMQP 有懒加载,需要等到使用
connection
时才会创建。**什么是使用
connection
呢**?
- 比如创建
connection
@BeanConnectionFactoryconnectionFactory(){CachingConnectionFactory connectionFactory =newCachingConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");
connectionFactory.createConnection();return connectionFactory;}
- 再比如监听了队列
@RabbitListener(queues ={"test"})voidtest(){
log.info("【测试监听消息】");}
死信队列机制
死信队列需要在创建 Queue 时指定对应属性:
@BeanQueuequeue(){// 配置声明队列时使用的参数Map<String,Object> args =newHashMap<>(1);// 设置死信队列指向的交换机
args.put("x-dead-letter-exchange", EXCHANGE_DLX);returnnewQueue(QUEUE_NAME,true,false,false, args);}
RabbitTemplate
RabbitTemplate
是 SpringBoot AMQP 提供的快速发 RabbitMQ 消息的模板类,与 RestTemplate 有类似之处,意指方便、简单、快速的发 RabbitMQ 消息。
创建
@BeanRabbitTemplaterabbitTemplate(@AutowiredConnectionFactory connectionFactory){returnnewRabbitTemplate(connectionFactory);}
发送消息
// 通过 Spring 到处注入使用即可。
rabbitTemplate.send(EXCHANGE_NAME, KEY_NAME,newMessage("HelloWorld 中国".getBytes(StandardCharsets.UTF_8)))
rabbitTemplate.convertAndSend(EXCHANGE_NAME, KEY_NAME,"HelloWorld 中国");Message message = rabbitTemplate.sendAndReceive(RabbitConfig.EXCHANGE_NAME,RabbitConfig.KEY_NAME,newMessage("HelloWorld 中国".getBytes(StandardCharsets.UTF_8)));
send(final String exchange, final String routingKey, final Message message)
(常用)
普通的消息发送,
Message
的带参构造中可以传递参数,比如消息过期时间。
convertAndSend(String exchange, String routingKey, final Object object)(常用)
可以转换 Java 对象成 AMQP 消息进行发送。
Message sendAndReceive(final String exchange, final String routingKey, final Message message)
阻塞等待 5 秒钟,返回的 Message 就是服务端返回的数据,阻塞时间可以使用
rabbitTemplate.setReplyTimeout(10000)
设置。
发送端确认机制 和 消息返回机制
之前的《RabbitMQ 消息百分百投递方案》中有详细的记录过非 SpringBoot 的
发送端确认机制
和
消息返回机制
。那改成 SpringBoot AMQP 之后肯定也是支持的。之前推荐使用
同步单条消息确认机制
,可以准确知道是哪一条消息出现问题方便做处理。
同步多条
和
异步
都不好确定是具体哪一条出现问题。
SpringBoot AMQP 提供的需要先配置
connectionFactory
:
@BeanConnectionFactoryconnectionFactory(){CachingConnectionFactory connectionFactory =newCachingConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");// 发送端确认的类型
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.SIMPLE);// 开启消息返回机制
connectionFactory.setPublisherReturns(true);return connectionFactory;}
在
RibbitTemplate
中配置回调函数:
@BeanRabbitTemplaterabbitTemplate(@AutowiredConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate(connectionFactory);// 开启 Mandatory
rabbitTemplate.setMandatory(true);// 配置 发送端确认 回调函数
rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{
log.info("【发送端确认】- 入参; correlationData: ${}$, ack: ${}$, cause: ${}$", correlationData, ack, cause);});// 配置 消息返回 回调函数,只有在消息没有从 Exchange 正确路由到 Queue 时才有回调。
rabbitTemplate.setReturnsCallback(returned ->{
log.info("【消息返回】- 入参; returned: ${}$", returned);});return rabbitTemplate;}
show:
2022-07-0513:24:16.549 INFO 59768---[nectionFactory1]com.lynchj.rabbitmq.config.RabbitConfig: 【消息返回】- 入参; returned: $ReturnedMessage[message=(Body:'[B@3589027c(byte[17])' MessageProperties[headers={}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=exchange.cat.dog, routingKey=key.yingduan]$
2022-07-0513:24:16.550 INFO 59768---[nectionFactory2]com.lynchj.rabbitmq.config.RabbitConfig: 【发送端确认】- 入参; correlationData: $null$, ack: $true$, cause: $null$
ConfirmType.CORRELATED
上面的配置,在发送端确认时是无法区分消息是哪一个的,观察日志也能看出来,就打印了一个 ack 的值。要想关联上对应的消息需要做如下配置:
// 发送端确认的类型从 SIMPLE 更改为 CORRELATED
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
CORRELATED:指有关联性的。
在发送消息是修改如下:
// 发送消息时,增加 CorrelationData 字段,在发送端确认的回调函数中会回传过来。CorrelationData correlationData =newCorrelationData(UUID.randomUUID().toString());
rabbitTemplate.send(RabbitConfig.EXCHANGE_NAME,RabbitConfig.KEY_NAME,newMessage("HelloWorld 中国".getBytes(StandardCharsets.UTF_8)),
correlationData
);
show:
2022-07-0513:30:09.779 INFO 54416---[nectionFactory1]com.lynchj.rabbitmq.config.RabbitConfig: 【发送端确认】- 入参; correlationData: $CorrelationData[id=976c94a6-2fa8-45dd-84e1-691c0db31460]$, ack: $true$, cause: $null$
SimpleMessageListenerContainer
SimpleMessageListenerContainer
可以帮助在开发中高效的监听消息,可以设置坚挺队列、设置消费者数量、重回队列、消息确认模式等等。主要功能如下:
- 设置同时监听多个队列、自动启动、自动配置RabbitMQ。
- 设置消费者数量(最大数量、最小数量、批量消费)。
- 设置消息确认模式、是否重回队列、异常捕获。
- 设置是否独占、其他消费者属性等。
- 设置具体的监听器、消息转换器等。
- 支持动态设置,运行中修改监听器配置。
代码实现
@BeanSimpleMessageListenerContainermessageListenerContainer(@AutowiredConnectionFactory connectionFactory){SimpleMessageListenerContainer messageListenerContainer =newSimpleMessageListenerContainer(connectionFactory);// 监听队列,可多个
messageListenerContainer.setQueueNames(QUEUE_NAME);// 并发处理的线程最小数目,不能大于 maxConcurrentConsumers
messageListenerContainer.setConcurrentConsumers(1);// 并发处理的线程最大数目,不能小于 concurrentConsumers
messageListenerContainer.setMaxConcurrentConsumers(1);// Ack 的方式
messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 消费端限流
messageListenerContainer.setPrefetchCount(1);// 设置监听消息处理方法
messageListenerContainer.setMessageListener((ChannelAwareMessageListener)(message, channel)->{
log.info("【消费消息】- 入参;message: ${}$", message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);});return messageListenerContainer;}
MessageListenerAdapter
上边实现的消费者监听是通过
messageListenerContainer.setMessageListener()
方法实现,业务代码写到了配置的代码中,耦合性比较强,更优雅一点的做法是使用
MessageListenerAdapter
。
@BeanSimpleMessageListenerContainermessageListenerContainer(@AutowiredConnectionFactory connectionFactory){SimpleMessageListenerContainer messageListenerContainer =newSimpleMessageListenerContainer(connectionFactory);// 监听队列,可多个
messageListenerContainer.setQueueNames(QUEUE_NAME);// 并发处理的线程最小数目,不能大于 maxConcurrentConsumers
messageListenerContainer.setConcurrentConsumers(1);// 并发处理的线程最大数目,不能小于 concurrentConsumers
messageListenerContainer.setMaxConcurrentConsumers(1);// Ack 的方式
messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 消费端限流
messageListenerContainer.setPrefetchCount(1);// 设置监听消息处理方法/*messageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.info("【消费消息】- 入参;message: ${}$", message);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
});*/// 创建消息监听适配器MessageListenerAdapter messageListenerAdapter =newMessageListenerAdapter();
messageListenerAdapter.setDelegate(handleMessage);// 设置处理消息的适配器
messageListenerContainer.setMessageListener(messageListenerAdapter);return messageListenerContainer;}
handleMessage
是注入的另一个类:
@Slf4j@ComponentpublicclassHandleMessage{voidhandleMessage(byte[] message)throwsIOException{
log.info("【消费消息】- 入参;message: ${}$",newString(message,StandardCharsets.UTF_8));}}
经过包装一层
MessageListenerAdapter
适配器,可以把真实的处理方法解耦出去,
MessageListenerAdapter
的
setDelegate()
方法设置了任意一个 Object,等到有消息消费时,会调用到这个 Object 的
handleMessage
方法,这个方法名是
MessageListenerAdapter
内部的一个常量:
也可以通过调用
MessageListenerAdapter
的
setDefaultListenerMethod()
方法来更改默认调用方法名。
还可以配置监听多个队列,并给不同的队列设置不同的处理方法:
// 监听多个队列
messageListenerContainer.setQueueNames("cat","dog","queue.dog.cat");// 创建消息监听适配器MessageListenerAdapter adapter =newMessageListenerAdapter(handleMessage);// 设置真实处理业务消息的默认方法名称,如果没有设置,那么默认的处理器中的默认方式是 handleMessage 方法
adapter.setDefaultListenerMethod("onMessage");// 配置队列与真实处理业务消息的方法对应名称Map<String,String> queueOrTagToMethodName =newHashMap<>(8);
queueOrTagToMethodName.put("cat","onCat");
queueOrTagToMethodName.put("dog","onDog");
queueOrTagToMethodName.put("queue.dog.cat","onInfo");
adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);// 设置处理消息的适配器
messageListenerContainer.setMessageListener(adapter);
@Slf4j@ComponentpublicclassHandleMessage{voidhandleMessage(byte[] message){
log.info("【消费消息】- 入参;message: ${}$",newString(message,StandardCharsets.UTF_8));}voidonCat(byte[] message){
log.info("【消费消息】- 入参;message: ${}$",newString(message,StandardCharsets.UTF_8));}voidonDog(byte[] message){
log.info("【消费消息】- 入参;message: ${}$",newString(message,StandardCharsets.UTF_8));}voidonInfo(byte[] message){
log.info("【消费消息】- 入参;message: ${}$",newString(message,StandardCharsets.UTF_8));}}
注意:美中不足的是
MessageListenerAdapter
中适配的真实处理业务消息的方法入参只能是
byte[]
。
MessageConverter
先说说其作用,之前收发消息时,使用了 Byte[] 数组作为消息体,而在编写业务逻辑时,需要使用 Java 对象,这样就避免不了要来回从 Byte[] <> String <> Java 对象之间的相互转换。MessageConverter 就是用来在收发消息时自动转换 AMQP 内部消息和 Java 对象的。
MessageConverter
本身是接口,无法直接使用,不过 AMQP 内已经提供了一个其实现
org.springframework.amqp.support.converter.Jackson2JsonMessageConverter
方便直接使用,一般其况下使用这个就足够了,因为项目中大部分应该都是 JSON 形式的数据。当然,如果出现一些比较少见的格式,也可以自定义,只需要重写
toMessage
和
fromMessage
即可。
Jackson2JsonMessageConverter
Student
Model:
@NoArgsConstructor@AllArgsConstructor@DatapublicclassStudent{privateString name;privateInteger age;}
发消息的方法:
voidsend()throwsException{// 发送消息时,增加 CorrelationData 字段,在发送端确认的回调函数中会回传过来。CorrelationData correlationData =newCorrelationData(UUID.randomUUID().toString());Student student =newStudent("大漠知秋",18);ObjectMapper objectMapper =newObjectMapper();String s = objectMapper.writeValueAsString(student);
rabbitTemplate.send(RabbitConfig.EXCHANGE_NAME,RabbitConfig.KEY_NAME,newMessage(s.getBytes(StandardCharsets.UTF_8)),
correlationData
);}
消费者监听配置:
@BeanSimpleMessageListenerContainermessageListenerContainer(@AutowiredConnectionFactory connectionFactory){SimpleMessageListenerContainer messageListenerContainer =newSimpleMessageListenerContainer(connectionFactory);// 监听队列,可多个
messageListenerContainer.setQueueNames(QUEUE_NAME);// 并发处理的线程最小数目,不能大于 maxConcurrentConsumers
messageListenerContainer.setConcurrentConsumers(1);// 并发处理的线程最大数目,不能小于 concurrentConsumers
messageListenerContainer.setMaxConcurrentConsumers(1);// Ack 的方式
messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 消费端限流
messageListenerContainer.setPrefetchCount(1);// 创建消息监听适配器MessageListenerAdapter messageListenerAdapter =newMessageListenerAdapter();
messageListenerAdapter.setDelegate(handleMessage);// 配置 MessageConverterJackson2JsonMessageConverter messageConverter =newJackson2JsonMessageConverter();
messageListenerAdapter.setMessageConverter(messageConverter);// 设置处理消息的适配器
messageListenerContainer.setMessageListener(messageListenerAdapter);return messageListenerContainer;}
HandleMessage
:
@Slf4j@ComponentpublicclassHandleMessage{voidhandleMessage(Student student){
log.info("【消费消息】- 入参;message: ${}$", student);}}
如果仅仅是这样配置上
MessageConverter
就启动的话会报如下错:
Caused by:org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:Failedtoinvoke target method 'handleMessage' withargument type =[classjava.util.LinkedHashMap], value =[{{name=大漠知秋, age=18}}]
Jackson2JsonMessageConverter
默认转换的 Java 对象为 LinkedHashMap,而在
handleMessage
处理方法中的参数是
Student
,所以就报错了。需要指定一下类型:
// 配置 MessageConverterJackson2JsonMessageConverter messageConverter =newJackson2JsonMessageConverter();
messageConverter.setClassMapper(newClassMapper(){@OverridepublicvoidfromClass(Class<?> clazz,MessageProperties properties){}@OverridepublicClass<?>toClass(MessageProperties properties){returnStudent.class;}});
messageListenerAdapter.setMessageConverter(messageConverter);
show:
2022-07-0517:19:22.724 INFO 34140---[enerContainer-1]c.lynchj.rabbitmq.handle.HandleMessage: 【消费消息】- 入参;message: $Student(name=大漠知秋, age=18)$
@RabbitListener(终极监听方案)
使用此方案做监听消息功能,就可以把之前的
SimpleMessageListenerContainer
进行监听的方案舍弃掉了,就是这么的喜新厌旧,不过之前的 SimpleMessageListenerContainer 也不是一无是处,学过之后可以更好的理解内部的一些逻辑。
@RabbitListener
的特点:
- RabbitListener 是 SpringBoot 架构中监听消息的
终极方案
。 - RabbitListener 使用注解声明,对业务代码无侵入。
- RabbitListener 可以在 SpringBoot 配置文件中进行配置。
@RabbitListener
本身是 Java 中的注解,可以搭配其他注解一起使用:
- @Exchange:自动声明 Exchange。
- @Queue:自动声明队列。
- @QueueBinding:自动声明绑定关系。
基本使用
首先在
RabbitConfig
中新增创建
RabbitListenerContainerFactory
的 Bean,看名字应该就知道是用来替换掉
SimpleMessageListenerContainer
的工厂。方便后边使用
@RabbitListener
时创建
ListenerContainer
。
@Slf4j@ConfigurationpublicclassRabbitConfig{publicstaticfinalString EXCHANGE_NAME ="exchange.cat.dog";publicstaticfinalString EXCHANGE_DLX ="exchange.dlx";publicstaticfinalString QUEUE_NAME ="queue.cat";publicstaticfinalString QUEUE_DLX ="queue.dlx";publicstaticfinalString KEY_NAME ="key.yingduan";publicstaticfinalString KEY_DLX ="#";publicstaticfinalString RABBIT_ADMIN ="rabbitAdmin";publicstaticfinalString RABBIT_LISTENER_CONTAINER_FACTORY ="rabbitListenerContainerFactory";@BeanConnectionFactoryconnectionFactory(){CachingConnectionFactory connectionFactory =newCachingConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");// 发送端确认的类型
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);// 开启消息返回机制
connectionFactory.setPublisherReturns(true);return connectionFactory;}@Bean(name = RABBIT_ADMIN)RabbitAdminrabbitAdmin(@AutowiredConnectionFactory connectionFactory){returnnewRabbitAdmin(connectionFactory);}@BeanExchangeexchange(){returnnewDirectExchange(EXCHANGE_NAME);}@BeanQueuequeue(){// 配置声明队列时使用的参数Map<String,Object> args =newHashMap<>(1);// 设置死信队列指向的交换机
args.put("x-dead-letter-exchange", EXCHANGE_DLX);returnnewQueue(QUEUE_NAME,true,false,false, args);}@BeanBindingbinding(){// 目的地名称、目的地类型、绑定交换机、绑定 key、参数returnnewBinding(QUEUE_NAME,Binding.DestinationType.QUEUE, EXCHANGE_NAME, KEY_NAME,null);}@BeanRabbitTemplaterabbitTemplate(@AutowiredConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate(connectionFactory);// 开启 Mandatory
rabbitTemplate.setMandatory(true);// 配置 发送端确认 回调函数
rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{
log.info("【发送端确认】- 入参; correlationData: ${}$, ack: ${}$, cause: ${}$", correlationData, ack, cause);});// 配置 消息返回 回调函数,只有在消息没有从 Exchange 正确路由到 Queue 时才有回调。
rabbitTemplate.setReturnsCallback(returned ->{
log.info("【消息返回】- 入参; returned: ${}$", returned);});return rabbitTemplate;}@Bean(name = RABBIT_LISTENER_CONTAINER_FACTORY)RabbitListenerContainerFactoryrabbitListenerContainerFactory(@AutowiredConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory listenerContainerFactory =newSimpleRabbitListenerContainerFactory();
listenerContainerFactory.setConnectionFactory(connectionFactory);return listenerContainerFactory;}}
发送端:
@Slf4j@ComponentpublicclassPublisherConfirm{@ResourceprivateRabbitTemplate rabbitTemplate;@PostConstructvoidsend()throwsException{// 发送消息时,增加 CorrelationData 字段,在发送端确认的回调函数中会回传过来。CorrelationData correlationData =newCorrelationData(UUID.randomUUID().toString());Student student =newStudent("大漠知秋",18);ObjectMapper objectMapper =newObjectMapper();String s = objectMapper.writeValueAsString(student);
rabbitTemplate.send(RabbitConfig.EXCHANGE_NAME,RabbitConfig.KEY_NAME,newMessage(s.getBytes(StandardCharsets.UTF_8)),
correlationData
);}}
消费者端:
@Slf4j@ComponentpublicclassRabbitListenerTest{@RabbitListener(
containerFactory =RabbitConfig.RABBIT_LISTENER_CONTAINER_FACTORY,
queues ={RabbitConfig.QUEUE_NAME})voidlistenCat(@PayloadMessage message){
log.info("【消费消息】- 入参;message: ${}$", message);
log.info("【消费消息】- student: ${}$",newString(message.getBody(),StandardCharsets.UTF_8));}}
show:
2022-07-0518:03:03.101 INFO 60560---[ntContainer#0-1]c.l.rabbitmq.handle.RabbitListenerTest: 【消费消息】- 入参;message: $(Body:'[B@251d4c4b(byte[32])' MessageProperties[headers={spring_listener_return_correlation=0e26e018-0e0a-43af-a197-67428c8fc800, spring_returned_message_correlation=e8937d7d-1257-46a2-93ec-e65bd2fac5ad}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=exchange.cat.dog, receivedRoutingKey=key.yingduan, deliveryTag=1, consumerTag=amq.ctag-A8eb9Qb1Uwyrdz6KcsvwBA, consumerQueue=queue.cat])$
2022-07-0518:03:03.101 INFO 60560---[ntContainer#0-1]c.l.rabbitmq.handle.RabbitListenerTest: 【消费消息】- student: ${"name":"大漠知秋","age":18}$
使用 bindings 创建 Exchange、Queue、Binding
简化后的
RabbitConfig
:
@Slf4j@ConfigurationpublicclassRabbitConfig{publicstaticfinalString EXCHANGE_NAME ="exchange.cat.dog";publicstaticfinalString EXCHANGE_DLX ="exchange.dlx";publicstaticfinalString QUEUE_NAME ="queue.cat";publicstaticfinalString QUEUE_DLX ="queue.dlx";publicstaticfinalString KEY_NAME ="key.yingduan";publicstaticfinalString KEY_DLX ="#";publicstaticfinalString RABBIT_ADMIN ="rabbitAdmin";publicstaticfinalString RABBIT_LISTENER_CONTAINER_FACTORY ="rabbitListenerContainerFactory";@BeanConnectionFactoryconnectionFactory(){CachingConnectionFactory connectionFactory =newCachingConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");// 发送端确认的类型
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);// 开启消息返回机制
connectionFactory.setPublisherReturns(true);return connectionFactory;}@Bean(name = RABBIT_ADMIN)RabbitAdminrabbitAdmin(@AutowiredConnectionFactory connectionFactory){returnnewRabbitAdmin(connectionFactory);}@BeanRabbitTemplaterabbitTemplate(@AutowiredConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate(connectionFactory);// 开启 Mandatory
rabbitTemplate.setMandatory(true);// 配置 发送端确认 回调函数
rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{
log.info("【发送端确认】- 入参; correlationData: ${}$, ack: ${}$, cause: ${}$", correlationData, ack, cause);});// 配置 消息返回 回调函数,只有在消息没有从 Exchange 正确路由到 Queue 时才有回调。
rabbitTemplate.setReturnsCallback(returned ->{
log.info("【消息返回】- 入参; returned: ${}$", returned);});return rabbitTemplate;}@Bean(name = RABBIT_LISTENER_CONTAINER_FACTORY)RabbitListenerContainerFactoryrabbitListenerContainerFactory(@AutowiredConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory listenerContainerFactory =newSimpleRabbitListenerContainerFactory();
listenerContainerFactory.setConnectionFactory(connectionFactory);return listenerContainerFactory;}}
RabbitListenerTest
:
@Slf4j@ComponentpublicclassRabbitListenerTest{@RabbitListener(
containerFactory =RabbitConfig.RABBIT_LISTENER_CONTAINER_FACTORY,// 指定 RabbitAdmin,创建 Exchange、Queue、Binding 时使用
admin =RabbitConfig.RABBIT_ADMIN,// 绑定关系,没有的 Exchange、Queue、Binding 没有的会自动创建。
bindings ={// 第一个绑定关系,可以多个@QueueBinding(// 队列
value =@Queue(// 队列名
name =RabbitConfig.QUEUE_NAME,// 队列参数
arguments ={// 队列中消息超时时间@Argument(
name ="x-message-ttl",
value ="1000",
type ="java.lang.Integer"),// 死信队列配置信息@Argument(
name ="x-dead-letter-exchange",
value =RabbitConfig.EXCHANGE_DLX,// 默认值就是 String, 也可以不写
type ="java.lang.String")}),// 交换机
exchange =@Exchange(// 交换机名
name =RabbitConfig.EXCHANGE_NAME
),// 绑定 Key
key ={RabbitConfig.KEY_NAME})})voidlistenCat(@PayloadMessage message){
log.info("【消费消息】- 入参;message: ${}$", message);
log.info("【消费消息】- student: ${}$",newString(message.getBody(),StandardCharsets.UTF_8));}}
这种方式注解写到崩溃,不建议使用。。。
SpringBoot 使用 RabbitMQ 终极方案
SpringBoot 的开发原则就是
约定大于配置
,上面的代码中,还存在着不少
@Bean
的配置代码,这显然很不 SpringBoot,应该把一些常规配置,配置到
.yml
或
.properties
中,让项目可以从配置文件中自动加载好 Bean,对此,SpringBoot AMQP 包提供了对应的支持。
配置文件(.yml/.properties)
spring:rabbitmq:host:'localhost'port:5672username:'admin'password:'kzh_mxg4vfb2QRP*xkv'virtual-host:'/'# 发送端确认机制开启,并且使用关联性的类型publisher-confirm-type: correlated
# 开启消息返回机制publisher-returns:truetemplate:# 开启委托,配合 publisher-returns 使用mandatory:truelistener:simple:# Ack 模式acknowledge-mode: manual
# 消费者限流prefetch:10# 并发处理的线程最小数目,不能大于 max-concurrencyconcurrency:3# 并发处理的线程最大数目,不能小于 concurrencymax-concurrency:5
RabbitConfig
主要是配置
发送端确认回调
、
消息返回回调
、
Exchange
、
Queue
、
Binding
的创建。
@Slf4j@ConfigurationpublicclassRabbitConfigimplementsInitializingBean{publicstaticfinalString EXCHANGE_NAME ="exchange.cat.dog";publicstaticfinalString EXCHANGE_DLX ="exchange.dlx";publicstaticfinalString QUEUE_NAME ="queue.cat";publicstaticfinalString QUEUE_DLX ="queue.dlx";publicstaticfinalString KEY_NAME ="key.yingduan";publicstaticfinalString KEY_DLX ="#";@ResourceprivateRabbitTemplate rabbitTemplate;@OverridepublicvoidafterPropertiesSet()throwsException{// 发送端确认 回调配置
rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{
log.info("【发送端确认】- 入参; correlationData: ${}$, ack: ${}$, cause: ${}$", correlationData, ack, cause);});// 消息返回 回调配置,只有在 Exchange 无法路由到 Queue 时回调
rabbitTemplate.setReturnsCallback(returned ->{
log.error("【消息返回】- 入参; returned: ${}$", returned);});}@BeanExchangeexchange(){returnnewDirectExchange(EXCHANGE_NAME);}@BeanQueuequeue(){// 配置声明队列时使用的参数Map<String,Object> args =newHashMap<>(1);// 设置死信队列指向的交换机
args.put("x-dead-letter-exchange", EXCHANGE_DLX);// 设置队列内消息过期时间,单位:毫秒
args.put("x-message-ttl",15000);returnnewQueue(QUEUE_NAME,true,false,false, args);}@BeanBindingbinding(){// 目的地名称、目的地类型、绑定交换机、绑定 key、参数returnnewBinding(QUEUE_NAME,Binding.DestinationType.QUEUE, EXCHANGE_NAME, KEY_NAME,null);}@BeanTopicExchangedlxExchange(){returnnewTopicExchange(EXCHANGE_DLX);}@BeanQueuedlxQueue(){returnnewQueue(QUEUE_DLX);}@BeanBindingdlxBinding(){// 目的地名称、目的地类型、绑定交换机、绑定 key、参数returnnewBinding(QUEUE_DLX,Binding.DestinationType.QUEUE, EXCHANGE_DLX, KEY_DLX,null);}}
发送消息
@Slf4j@RestController@RequestMapping("/send")publicclassSendController{@ResourceprivateRabbitTemplate rabbitTemplate;@GetMapping("/sendOne/{name}")voidsendOne(@PathVariable(name ="name")String name)throwsJsonProcessingException{
log.info("【sendOne】- 入参: ${}$", name);// 发送消息时,增加 CorrelationData 字段,在发送端确认的回调函数中会回传过来。CorrelationData correlationData =newCorrelationData(UUID.randomUUID().toString());Student student =newStudent(name,18);ObjectMapper objectMapper =newObjectMapper();String s = objectMapper.writeValueAsString(student);MessageProperties messageProperties =newMessageProperties();// 消息过期时间 10 秒
messageProperties.setExpiration("10000");
rabbitTemplate.send(RabbitConfig.EXCHANGE_NAME,RabbitConfig.KEY_NAME,newMessage(s.getBytes(StandardCharsets.UTF_8), messageProperties),
correlationData
);}}
监听消息
@Slf4j@ComponentpublicclassRabbitListeners{@RabbitListener(queues ={RabbitConfig.QUEUE_NAME})voidlistenCat(String content,@PayloadMessage message,Channel channel)throwsIOException,InterruptedException{
log.info("【消费消息】- 入参;content: ${}$, message: ${}$, channel: ${}$", content, message, channel);
log.info("【消费消息】- student: ${}$",newString(message.getBody(),StandardCharsets.UTF_8));TimeUnit.SECONDS.sleep(3);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}
Student
@NoArgsConstructor@AllArgsConstructor@DatapublicclassStudent{privateString name;privateInteger age;}
说明
这一套下来包括了
发送端确认
、
消息返回
、
手动 Ack
、
消费者限流
、
消息过期
、
死信队列
。可以有效地保证消息的发送、路由、消费能够正常执行。
版权归原作者 大漠知秋 所有, 如有侵权,请联系我们删除。