0


SpringBoot AMQP

SpringBoot 的支持

SpringBoot 已经提供了对 AMQP 协议完全支持的

spring-boot-starter-amqp

依赖,引入此依赖即可快速方便的在 SpringBoot 中使用 RabbitMQ。参考:Spring AMQP。

特点

  • 用于异步处理消费消息的监听器容器。
  • 用于发送和接收消息的 RabbitTemplate。
  • RabbitAdmin 用于自动声明队列、交换和绑定。

RabbitAdmin

作用

  • declareExchange:创建交换机。
  • deleteExchange:删除交换机。
  • declareQueue:创建队列。
  • deleteQueue:删除队列。
  • purge:清空队列。
  • declareBinding:新建绑定关系。
  • removeBinding:删除绑定关系。
  • getQueueProperties:查询队列属性。

加粗的为常用。

创建方式

// 连接工厂CachingConnectionFactory connectionFactory =newCachingConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");// 使用连接工厂创建 RabbitAdminRabbitAdmin rabbitAdmin =newRabbitAdmin(connectionFactory);

RabbitAdmin 方法

编程式实现

@ConfigurationpublicclassRabbitConfig{publicstaticfinalString EXCHANGE_NAME ="exchange.cat.dog";publicstaticfinalString EXCHANGE_DLX  ="exchange.dlx";publicstaticfinalString QUEUE_NAME    ="queue.cat";publicstaticfinalString QUEUE_DLX     ="queue.dlx";publicstaticfinalString KEY_NAME      ="key.yingduan";publicstaticfinalString KEY_DLX       ="#";@PostConstructvoidrabbitAdmin(){CachingConnectionFactory connectionFactory =newCachingConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");// 创建 RabbitAdminRabbitAdmin rabbitAdmin =newRabbitAdmin(connectionFactory);// 声明交换机DirectExchange directExchange =newDirectExchange(EXCHANGE_NAME);
        rabbitAdmin.declareExchange(directExchange);// 声明队列Queue queue =newQueue(QUEUE_NAME);
        rabbitAdmin.declareQueue(queue);// 声明绑定关系// 目的地名称、目的地类型、绑定交换机、绑定 key、参数Binding binding =newBinding(QUEUE_NAME,Binding.DestinationType.QUEUE, EXCHANGE_NAME, KEY_NAME,null);
        rabbitAdmin.declareBinding(binding);}}

声明式实现(推荐)

@Slf4j@ConfigurationpublicclassRabbitConfig{publicstaticfinalString EXCHANGE_NAME ="exchange.cat.dog";publicstaticfinalString EXCHANGE_DLX  ="exchange.dlx";publicstaticfinalString QUEUE_NAME    ="queue.cat";publicstaticfinalString QUEUE_DLX     ="queue.dlx";publicstaticfinalString KEY_NAME      ="key.yingduan";publicstaticfinalString KEY_DLX       ="#";@BeanConnectionFactoryconnectionFactory(){CachingConnectionFactory connectionFactory =newCachingConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");return connectionFactory;}@BeanRabbitAdminrabbitAdmin(@AutowiredConnectionFactory connectionFactory){returnnewRabbitAdmin(connectionFactory);}@BeanExchangeexchange(){returnnewDirectExchange(EXCHANGE_NAME);}@BeanQueuequeue(){returnnewQueue(QUEUE_NAME);}@BeanBindingbinding(){// 目的地名称、目的地类型、绑定交换机、绑定 key、参数returnnewBinding(QUEUE_NAME,Binding.DestinationType.QUEUE, EXCHANGE_NAME, KEY_NAME,null);}}

注意,以上配置再启动 SpringBoot 并不会立马创建交换机、队列、绑定,SpringBoot AMQP 有懒加载,需要等到使用

connection

时才会创建。**什么是使用

connection

呢**?

  • 比如创建 connection
@BeanConnectionFactoryconnectionFactory(){CachingConnectionFactory connectionFactory =newCachingConnectionFactory();
  connectionFactory.setHost("127.0.0.1");
  connectionFactory.setUsername("admin");
  connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");
  connectionFactory.createConnection();return connectionFactory;}
  • 再比如监听了队列
@RabbitListener(queues ={"test"})voidtest(){
  log.info("【测试监听消息】");}

死信队列机制

死信队列需要在创建 Queue 时指定对应属性:

@BeanQueuequeue(){// 配置声明队列时使用的参数Map<String,Object> args =newHashMap<>(1);// 设置死信队列指向的交换机
  args.put("x-dead-letter-exchange", EXCHANGE_DLX);returnnewQueue(QUEUE_NAME,true,false,false, args);}

RabbitTemplate

RabbitTemplate

是 SpringBoot AMQP 提供的快速发 RabbitMQ 消息的模板类,与 RestTemplate 有类似之处,意指方便、简单、快速的发 RabbitMQ 消息。

创建

@BeanRabbitTemplaterabbitTemplate(@AutowiredConnectionFactory connectionFactory){returnnewRabbitTemplate(connectionFactory);}

发送消息

// 通过 Spring 到处注入使用即可。
rabbitTemplate.send(EXCHANGE_NAME, KEY_NAME,newMessage("HelloWorld 中国".getBytes(StandardCharsets.UTF_8)))
rabbitTemplate.convertAndSend(EXCHANGE_NAME, KEY_NAME,"HelloWorld 中国");Message message = rabbitTemplate.sendAndReceive(RabbitConfig.EXCHANGE_NAME,RabbitConfig.KEY_NAME,newMessage("HelloWorld 中国".getBytes(StandardCharsets.UTF_8)));
  • send(final String exchange, final String routingKey, final Message message)(常用)

普通的消息发送,

Message

的带参构造中可以传递参数,比如消息过期时间。

  • convertAndSend(String exchange, String routingKey, final Object object)(常用)

可以转换 Java 对象成 AMQP 消息进行发送。

  • Message sendAndReceive(final String exchange, final String routingKey, final Message message)

阻塞等待 5 秒钟,返回的 Message 就是服务端返回的数据,阻塞时间可以使用

rabbitTemplate.setReplyTimeout(10000)

设置。

发送端确认机制 和 消息返回机制

之前的《RabbitMQ 消息百分百投递方案》中有详细的记录过非 SpringBoot 的

发送端确认机制

消息返回机制

。那改成 SpringBoot AMQP 之后肯定也是支持的。之前推荐使用

同步单条消息确认机制

,可以准确知道是哪一条消息出现问题方便做处理。

同步多条

异步

都不好确定是具体哪一条出现问题。

SpringBoot AMQP 提供的需要先配置

connectionFactory

@BeanConnectionFactoryconnectionFactory(){CachingConnectionFactory connectionFactory =newCachingConnectionFactory();
  connectionFactory.setHost("127.0.0.1");
  connectionFactory.setUsername("admin");
  connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");// 发送端确认的类型
  connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.SIMPLE);// 开启消息返回机制
  connectionFactory.setPublisherReturns(true);return connectionFactory;}

RibbitTemplate

中配置回调函数:

@BeanRabbitTemplaterabbitTemplate(@AutowiredConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate(connectionFactory);// 开启 Mandatory
  rabbitTemplate.setMandatory(true);// 配置 发送端确认 回调函数
  rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{
    log.info("【发送端确认】- 入参; correlationData: ${}$, ack: ${}$, cause: ${}$", correlationData, ack, cause);});// 配置 消息返回 回调函数,只有在消息没有从 Exchange 正确路由到 Queue 时才有回调。
  rabbitTemplate.setReturnsCallback(returned ->{
    log.info("【消息返回】- 入参; returned: ${}$", returned);});return rabbitTemplate;}

show:

2022-07-0513:24:16.549  INFO 59768---[nectionFactory1]com.lynchj.rabbitmq.config.RabbitConfig: 【消息返回】- 入参; returned: $ReturnedMessage[message=(Body:'[B@3589027c(byte[17])' MessageProperties[headers={}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, exchange=exchange.cat.dog, routingKey=key.yingduan]$
2022-07-0513:24:16.550  INFO 59768---[nectionFactory2]com.lynchj.rabbitmq.config.RabbitConfig: 【发送端确认】- 入参; correlationData: $null$, ack: $true$, cause: $null$

ConfirmType.CORRELATED

上面的配置,在发送端确认时是无法区分消息是哪一个的,观察日志也能看出来,就打印了一个 ack 的值。要想关联上对应的消息需要做如下配置:

// 发送端确认的类型从 SIMPLE 更改为 CORRELATED
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);

CORRELATED:指有关联性的。

在发送消息是修改如下:

// 发送消息时,增加 CorrelationData 字段,在发送端确认的回调函数中会回传过来。CorrelationData correlationData =newCorrelationData(UUID.randomUUID().toString());
rabbitTemplate.send(RabbitConfig.EXCHANGE_NAME,RabbitConfig.KEY_NAME,newMessage("HelloWorld 中国".getBytes(StandardCharsets.UTF_8)),
  correlationData
);

show:

2022-07-0513:30:09.779  INFO 54416---[nectionFactory1]com.lynchj.rabbitmq.config.RabbitConfig: 【发送端确认】- 入参; correlationData: $CorrelationData[id=976c94a6-2fa8-45dd-84e1-691c0db31460]$, ack: $true$, cause: $null$

SimpleMessageListenerContainer

SimpleMessageListenerContainer

可以帮助在开发中高效的监听消息,可以设置坚挺队列、设置消费者数量、重回队列、消息确认模式等等。主要功能如下:

  • 设置同时监听多个队列、自动启动、自动配置RabbitMQ。
  • 设置消费者数量(最大数量、最小数量、批量消费)。
  • 设置消息确认模式、是否重回队列、异常捕获。
  • 设置是否独占、其他消费者属性等。
  • 设置具体的监听器、消息转换器等。
  • 支持动态设置,运行中修改监听器配置。

代码实现

@BeanSimpleMessageListenerContainermessageListenerContainer(@AutowiredConnectionFactory connectionFactory){SimpleMessageListenerContainer messageListenerContainer =newSimpleMessageListenerContainer(connectionFactory);// 监听队列,可多个
  messageListenerContainer.setQueueNames(QUEUE_NAME);// 并发处理的线程最小数目,不能大于 maxConcurrentConsumers
  messageListenerContainer.setConcurrentConsumers(1);// 并发处理的线程最大数目,不能小于 concurrentConsumers
  messageListenerContainer.setMaxConcurrentConsumers(1);// Ack 的方式
  messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 消费端限流
  messageListenerContainer.setPrefetchCount(1);// 设置监听消息处理方法
  messageListenerContainer.setMessageListener((ChannelAwareMessageListener)(message, channel)->{
    log.info("【消费消息】- 入参;message: ${}$", message);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);});return messageListenerContainer;}

MessageListenerAdapter

上边实现的消费者监听是通过

messageListenerContainer.setMessageListener()

方法实现,业务代码写到了配置的代码中,耦合性比较强,更优雅一点的做法是使用

MessageListenerAdapter

@BeanSimpleMessageListenerContainermessageListenerContainer(@AutowiredConnectionFactory connectionFactory){SimpleMessageListenerContainer messageListenerContainer =newSimpleMessageListenerContainer(connectionFactory);// 监听队列,可多个
  messageListenerContainer.setQueueNames(QUEUE_NAME);// 并发处理的线程最小数目,不能大于 maxConcurrentConsumers
  messageListenerContainer.setConcurrentConsumers(1);// 并发处理的线程最大数目,不能小于 concurrentConsumers
  messageListenerContainer.setMaxConcurrentConsumers(1);// Ack 的方式
  messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 消费端限流
  messageListenerContainer.setPrefetchCount(1);// 设置监听消息处理方法/*messageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("【消费消息】- 入参;message: ${}$", message);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        });*/// 创建消息监听适配器MessageListenerAdapter messageListenerAdapter =newMessageListenerAdapter();
  messageListenerAdapter.setDelegate(handleMessage);// 设置处理消息的适配器
  messageListenerContainer.setMessageListener(messageListenerAdapter);return messageListenerContainer;}
handleMessage

是注入的另一个类:

@Slf4j@ComponentpublicclassHandleMessage{voidhandleMessage(byte[] message)throwsIOException{
        log.info("【消费消息】- 入参;message: ${}$",newString(message,StandardCharsets.UTF_8));}}

经过包装一层

MessageListenerAdapter

适配器,可以把真实的处理方法解耦出去,

MessageListenerAdapter

setDelegate()

方法设置了任意一个 Object,等到有消息消费时,会调用到这个 Object 的

handleMessage

方法,这个方法名是

MessageListenerAdapter

内部的一个常量:

ORIGINAL_DEFAULT_LISTENER_METHOD

也可以通过调用

MessageListenerAdapter

setDefaultListenerMethod()

方法来更改默认调用方法名。

还可以配置监听多个队列,并给不同的队列设置不同的处理方法:

// 监听多个队列
messageListenerContainer.setQueueNames("cat","dog","queue.dog.cat");// 创建消息监听适配器MessageListenerAdapter adapter =newMessageListenerAdapter(handleMessage);// 设置真实处理业务消息的默认方法名称,如果没有设置,那么默认的处理器中的默认方式是 handleMessage 方法
adapter.setDefaultListenerMethod("onMessage");// 配置队列与真实处理业务消息的方法对应名称Map<String,String> queueOrTagToMethodName =newHashMap<>(8);
queueOrTagToMethodName.put("cat","onCat");
queueOrTagToMethodName.put("dog","onDog");
queueOrTagToMethodName.put("queue.dog.cat","onInfo");
adapter.setQueueOrTagToMethodName(queueOrTagToMethodName);// 设置处理消息的适配器
messageListenerContainer.setMessageListener(adapter);
@Slf4j@ComponentpublicclassHandleMessage{voidhandleMessage(byte[] message){
        log.info("【消费消息】- 入参;message: ${}$",newString(message,StandardCharsets.UTF_8));}voidonCat(byte[] message){
        log.info("【消费消息】- 入参;message: ${}$",newString(message,StandardCharsets.UTF_8));}voidonDog(byte[] message){
        log.info("【消费消息】- 入参;message: ${}$",newString(message,StandardCharsets.UTF_8));}voidonInfo(byte[] message){
        log.info("【消费消息】- 入参;message: ${}$",newString(message,StandardCharsets.UTF_8));}}

注意:美中不足的是

MessageListenerAdapter

中适配的真实处理业务消息的方法入参只能是

byte[]

MessageConverter

先说说其作用,之前收发消息时,使用了 Byte[] 数组作为消息体,而在编写业务逻辑时,需要使用 Java 对象,这样就避免不了要来回从 Byte[] <> String <> Java 对象之间的相互转换。MessageConverter 就是用来在收发消息时自动转换 AMQP 内部消息和 Java 对象的

MessageConverter

本身是接口,无法直接使用,不过 AMQP 内已经提供了一个其实现

org.springframework.amqp.support.converter.Jackson2JsonMessageConverter

方便直接使用,一般其况下使用这个就足够了,因为项目中大部分应该都是 JSON 形式的数据。当然,如果出现一些比较少见的格式,也可以自定义,只需要重写

toMessage

fromMessage

即可。

Jackson2JsonMessageConverter

Student

Model:

@NoArgsConstructor@AllArgsConstructor@DatapublicclassStudent{privateString name;privateInteger age;}

发消息的方法:

voidsend()throwsException{// 发送消息时,增加 CorrelationData 字段,在发送端确认的回调函数中会回传过来。CorrelationData correlationData =newCorrelationData(UUID.randomUUID().toString());Student student =newStudent("大漠知秋",18);ObjectMapper objectMapper =newObjectMapper();String s = objectMapper.writeValueAsString(student);

  rabbitTemplate.send(RabbitConfig.EXCHANGE_NAME,RabbitConfig.KEY_NAME,newMessage(s.getBytes(StandardCharsets.UTF_8)),
    correlationData
  );}

消费者监听配置:

@BeanSimpleMessageListenerContainermessageListenerContainer(@AutowiredConnectionFactory connectionFactory){SimpleMessageListenerContainer messageListenerContainer =newSimpleMessageListenerContainer(connectionFactory);// 监听队列,可多个
  messageListenerContainer.setQueueNames(QUEUE_NAME);// 并发处理的线程最小数目,不能大于 maxConcurrentConsumers
  messageListenerContainer.setConcurrentConsumers(1);// 并发处理的线程最大数目,不能小于 concurrentConsumers
  messageListenerContainer.setMaxConcurrentConsumers(1);// Ack 的方式
  messageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 消费端限流
  messageListenerContainer.setPrefetchCount(1);// 创建消息监听适配器MessageListenerAdapter messageListenerAdapter =newMessageListenerAdapter();
  messageListenerAdapter.setDelegate(handleMessage);// 配置 MessageConverterJackson2JsonMessageConverter messageConverter =newJackson2JsonMessageConverter();
  messageListenerAdapter.setMessageConverter(messageConverter);// 设置处理消息的适配器
  messageListenerContainer.setMessageListener(messageListenerAdapter);return messageListenerContainer;}
HandleMessage

@Slf4j@ComponentpublicclassHandleMessage{voidhandleMessage(Student student){
        log.info("【消费消息】- 入参;message: ${}$", student);}}

如果仅仅是这样配置上

MessageConverter

就启动的话会报如下错:

Caused by:org.springframework.amqp.rabbit.support.ListenerExecutionFailedException:Failedtoinvoke target method 'handleMessage' withargument type =[classjava.util.LinkedHashMap], value =[{{name=大漠知秋, age=18}}]
Jackson2JsonMessageConverter

默认转换的 Java 对象为 LinkedHashMap,而在

handleMessage

处理方法中的参数是

Student

,所以就报错了。需要指定一下类型:

// 配置 MessageConverterJackson2JsonMessageConverter messageConverter =newJackson2JsonMessageConverter();
messageConverter.setClassMapper(newClassMapper(){@OverridepublicvoidfromClass(Class<?> clazz,MessageProperties properties){}@OverridepublicClass<?>toClass(MessageProperties properties){returnStudent.class;}});
messageListenerAdapter.setMessageConverter(messageConverter);

show:

2022-07-0517:19:22.724  INFO 34140---[enerContainer-1]c.lynchj.rabbitmq.handle.HandleMessage: 【消费消息】- 入参;message: $Student(name=大漠知秋, age=18)$

@RabbitListener(终极监听方案)

使用此方案做监听消息功能,就可以把之前的

SimpleMessageListenerContainer

进行监听的方案舍弃掉了,就是这么的喜新厌旧,不过之前的 SimpleMessageListenerContainer 也不是一无是处,学过之后可以更好的理解内部的一些逻辑。

@RabbitListener

的特点:

  • RabbitListener 是 SpringBoot 架构中监听消息的终极方案
  • RabbitListener 使用注解声明,对业务代码无侵入。
  • RabbitListener 可以在 SpringBoot 配置文件中进行配置。
@RabbitListener

本身是 Java 中的注解,可以搭配其他注解一起使用:

  • @Exchange:自动声明 Exchange。
  • @Queue:自动声明队列。
  • @QueueBinding:自动声明绑定关系。

基本使用

首先在

RabbitConfig

中新增创建

RabbitListenerContainerFactory

的 Bean,看名字应该就知道是用来替换掉

SimpleMessageListenerContainer

的工厂。方便后边使用

@RabbitListener

时创建

ListenerContainer

@Slf4j@ConfigurationpublicclassRabbitConfig{publicstaticfinalString EXCHANGE_NAME ="exchange.cat.dog";publicstaticfinalString EXCHANGE_DLX  ="exchange.dlx";publicstaticfinalString QUEUE_NAME    ="queue.cat";publicstaticfinalString QUEUE_DLX     ="queue.dlx";publicstaticfinalString KEY_NAME      ="key.yingduan";publicstaticfinalString KEY_DLX       ="#";publicstaticfinalString RABBIT_ADMIN                      ="rabbitAdmin";publicstaticfinalString RABBIT_LISTENER_CONTAINER_FACTORY ="rabbitListenerContainerFactory";@BeanConnectionFactoryconnectionFactory(){CachingConnectionFactory connectionFactory =newCachingConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");// 发送端确认的类型
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);// 开启消息返回机制
        connectionFactory.setPublisherReturns(true);return connectionFactory;}@Bean(name = RABBIT_ADMIN)RabbitAdminrabbitAdmin(@AutowiredConnectionFactory connectionFactory){returnnewRabbitAdmin(connectionFactory);}@BeanExchangeexchange(){returnnewDirectExchange(EXCHANGE_NAME);}@BeanQueuequeue(){// 配置声明队列时使用的参数Map<String,Object> args =newHashMap<>(1);// 设置死信队列指向的交换机
        args.put("x-dead-letter-exchange", EXCHANGE_DLX);returnnewQueue(QUEUE_NAME,true,false,false, args);}@BeanBindingbinding(){// 目的地名称、目的地类型、绑定交换机、绑定 key、参数returnnewBinding(QUEUE_NAME,Binding.DestinationType.QUEUE, EXCHANGE_NAME, KEY_NAME,null);}@BeanRabbitTemplaterabbitTemplate(@AutowiredConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate(connectionFactory);// 开启 Mandatory
        rabbitTemplate.setMandatory(true);// 配置 发送端确认 回调函数
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{
            log.info("【发送端确认】- 入参; correlationData: ${}$, ack: ${}$, cause: ${}$", correlationData, ack, cause);});// 配置 消息返回 回调函数,只有在消息没有从 Exchange 正确路由到 Queue 时才有回调。
        rabbitTemplate.setReturnsCallback(returned ->{
            log.info("【消息返回】- 入参; returned: ${}$", returned);});return rabbitTemplate;}@Bean(name = RABBIT_LISTENER_CONTAINER_FACTORY)RabbitListenerContainerFactoryrabbitListenerContainerFactory(@AutowiredConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory listenerContainerFactory =newSimpleRabbitListenerContainerFactory();
        listenerContainerFactory.setConnectionFactory(connectionFactory);return listenerContainerFactory;}}

发送端:

@Slf4j@ComponentpublicclassPublisherConfirm{@ResourceprivateRabbitTemplate rabbitTemplate;@PostConstructvoidsend()throwsException{// 发送消息时,增加 CorrelationData 字段,在发送端确认的回调函数中会回传过来。CorrelationData correlationData =newCorrelationData(UUID.randomUUID().toString());Student student =newStudent("大漠知秋",18);ObjectMapper objectMapper =newObjectMapper();String s = objectMapper.writeValueAsString(student);

        rabbitTemplate.send(RabbitConfig.EXCHANGE_NAME,RabbitConfig.KEY_NAME,newMessage(s.getBytes(StandardCharsets.UTF_8)),
                correlationData
        );}}

消费者端:

@Slf4j@ComponentpublicclassRabbitListenerTest{@RabbitListener(
            containerFactory =RabbitConfig.RABBIT_LISTENER_CONTAINER_FACTORY,
            queues ={RabbitConfig.QUEUE_NAME})voidlistenCat(@PayloadMessage message){
        log.info("【消费消息】- 入参;message: ${}$", message);
        log.info("【消费消息】- student: ${}$",newString(message.getBody(),StandardCharsets.UTF_8));}}

show:

2022-07-0518:03:03.101  INFO 60560---[ntContainer#0-1]c.l.rabbitmq.handle.RabbitListenerTest: 【消费消息】- 入参;message: $(Body:'[B@251d4c4b(byte[32])' MessageProperties[headers={spring_listener_return_correlation=0e26e018-0e0a-43af-a197-67428c8fc800, spring_returned_message_correlation=e8937d7d-1257-46a2-93ec-e65bd2fac5ad}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=exchange.cat.dog, receivedRoutingKey=key.yingduan, deliveryTag=1, consumerTag=amq.ctag-A8eb9Qb1Uwyrdz6KcsvwBA, consumerQueue=queue.cat])$
2022-07-0518:03:03.101  INFO 60560---[ntContainer#0-1]c.l.rabbitmq.handle.RabbitListenerTest: 【消费消息】- student: ${"name":"大漠知秋","age":18}$

使用 bindings 创建 Exchange、Queue、Binding

简化后的

RabbitConfig

@Slf4j@ConfigurationpublicclassRabbitConfig{publicstaticfinalString EXCHANGE_NAME ="exchange.cat.dog";publicstaticfinalString EXCHANGE_DLX  ="exchange.dlx";publicstaticfinalString QUEUE_NAME    ="queue.cat";publicstaticfinalString QUEUE_DLX     ="queue.dlx";publicstaticfinalString KEY_NAME      ="key.yingduan";publicstaticfinalString KEY_DLX       ="#";publicstaticfinalString RABBIT_ADMIN                      ="rabbitAdmin";publicstaticfinalString RABBIT_LISTENER_CONTAINER_FACTORY ="rabbitListenerContainerFactory";@BeanConnectionFactoryconnectionFactory(){CachingConnectionFactory connectionFactory =newCachingConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("kzh_mxg4vfb2QRP*xkv");// 发送端确认的类型
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);// 开启消息返回机制
        connectionFactory.setPublisherReturns(true);return connectionFactory;}@Bean(name = RABBIT_ADMIN)RabbitAdminrabbitAdmin(@AutowiredConnectionFactory connectionFactory){returnnewRabbitAdmin(connectionFactory);}@BeanRabbitTemplaterabbitTemplate(@AutowiredConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate(connectionFactory);// 开启 Mandatory
        rabbitTemplate.setMandatory(true);// 配置 发送端确认 回调函数
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{
            log.info("【发送端确认】- 入参; correlationData: ${}$, ack: ${}$, cause: ${}$", correlationData, ack, cause);});// 配置 消息返回 回调函数,只有在消息没有从 Exchange 正确路由到 Queue 时才有回调。
        rabbitTemplate.setReturnsCallback(returned ->{
            log.info("【消息返回】- 入参; returned: ${}$", returned);});return rabbitTemplate;}@Bean(name = RABBIT_LISTENER_CONTAINER_FACTORY)RabbitListenerContainerFactoryrabbitListenerContainerFactory(@AutowiredConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory listenerContainerFactory =newSimpleRabbitListenerContainerFactory();
        listenerContainerFactory.setConnectionFactory(connectionFactory);return listenerContainerFactory;}}
RabbitListenerTest

@Slf4j@ComponentpublicclassRabbitListenerTest{@RabbitListener(
            containerFactory =RabbitConfig.RABBIT_LISTENER_CONTAINER_FACTORY,// 指定 RabbitAdmin,创建 Exchange、Queue、Binding 时使用
            admin =RabbitConfig.RABBIT_ADMIN,// 绑定关系,没有的 Exchange、Queue、Binding 没有的会自动创建。
            bindings ={// 第一个绑定关系,可以多个@QueueBinding(// 队列
                            value =@Queue(// 队列名
                                    name =RabbitConfig.QUEUE_NAME,// 队列参数
                                    arguments ={// 队列中消息超时时间@Argument(
                                                    name ="x-message-ttl",
                                                    value ="1000",
                                                    type ="java.lang.Integer"),// 死信队列配置信息@Argument(
                                                    name ="x-dead-letter-exchange",
                                                    value =RabbitConfig.EXCHANGE_DLX,// 默认值就是 String, 也可以不写
                                                    type ="java.lang.String")}),// 交换机
                            exchange =@Exchange(// 交换机名
                                    name =RabbitConfig.EXCHANGE_NAME
                            ),// 绑定 Key
                            key ={RabbitConfig.KEY_NAME})})voidlistenCat(@PayloadMessage message){
        log.info("【消费消息】- 入参;message: ${}$", message);
        log.info("【消费消息】- student: ${}$",newString(message.getBody(),StandardCharsets.UTF_8));}}

这种方式注解写到崩溃,不建议使用。。。

SpringBoot 使用 RabbitMQ 终极方案

SpringBoot 的开发原则就是

约定大于配置

,上面的代码中,还存在着不少

@Bean

的配置代码,这显然很不 SpringBoot,应该把一些常规配置,配置到

.yml

.properties

中,让项目可以从配置文件中自动加载好 Bean,对此,SpringBoot AMQP 包提供了对应的支持。

配置文件(.yml/.properties)

spring:rabbitmq:host:'localhost'port:5672username:'admin'password:'kzh_mxg4vfb2QRP*xkv'virtual-host:'/'# 发送端确认机制开启,并且使用关联性的类型publisher-confirm-type: correlated
    # 开启消息返回机制publisher-returns:truetemplate:# 开启委托,配合 publisher-returns 使用mandatory:truelistener:simple:# Ack 模式acknowledge-mode: manual
        # 消费者限流prefetch:10# 并发处理的线程最小数目,不能大于 max-concurrencyconcurrency:3# 并发处理的线程最大数目,不能小于 concurrencymax-concurrency:5

RabbitConfig

主要是配置

发送端确认回调

消息返回回调

Exchange

Queue

Binding

的创建。

@Slf4j@ConfigurationpublicclassRabbitConfigimplementsInitializingBean{publicstaticfinalString EXCHANGE_NAME ="exchange.cat.dog";publicstaticfinalString EXCHANGE_DLX  ="exchange.dlx";publicstaticfinalString QUEUE_NAME    ="queue.cat";publicstaticfinalString QUEUE_DLX     ="queue.dlx";publicstaticfinalString KEY_NAME      ="key.yingduan";publicstaticfinalString KEY_DLX       ="#";@ResourceprivateRabbitTemplate rabbitTemplate;@OverridepublicvoidafterPropertiesSet()throwsException{// 发送端确认 回调配置
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{
            log.info("【发送端确认】- 入参; correlationData: ${}$, ack: ${}$, cause: ${}$", correlationData, ack, cause);});// 消息返回 回调配置,只有在 Exchange 无法路由到 Queue 时回调
        rabbitTemplate.setReturnsCallback(returned ->{
            log.error("【消息返回】- 入参; returned: ${}$", returned);});}@BeanExchangeexchange(){returnnewDirectExchange(EXCHANGE_NAME);}@BeanQueuequeue(){// 配置声明队列时使用的参数Map<String,Object> args =newHashMap<>(1);// 设置死信队列指向的交换机
        args.put("x-dead-letter-exchange", EXCHANGE_DLX);// 设置队列内消息过期时间,单位:毫秒
        args.put("x-message-ttl",15000);returnnewQueue(QUEUE_NAME,true,false,false, args);}@BeanBindingbinding(){// 目的地名称、目的地类型、绑定交换机、绑定 key、参数returnnewBinding(QUEUE_NAME,Binding.DestinationType.QUEUE, EXCHANGE_NAME, KEY_NAME,null);}@BeanTopicExchangedlxExchange(){returnnewTopicExchange(EXCHANGE_DLX);}@BeanQueuedlxQueue(){returnnewQueue(QUEUE_DLX);}@BeanBindingdlxBinding(){// 目的地名称、目的地类型、绑定交换机、绑定 key、参数returnnewBinding(QUEUE_DLX,Binding.DestinationType.QUEUE, EXCHANGE_DLX, KEY_DLX,null);}}

发送消息

@Slf4j@RestController@RequestMapping("/send")publicclassSendController{@ResourceprivateRabbitTemplate rabbitTemplate;@GetMapping("/sendOne/{name}")voidsendOne(@PathVariable(name ="name")String name)throwsJsonProcessingException{
        log.info("【sendOne】- 入参: ${}$", name);// 发送消息时,增加 CorrelationData 字段,在发送端确认的回调函数中会回传过来。CorrelationData correlationData =newCorrelationData(UUID.randomUUID().toString());Student student =newStudent(name,18);ObjectMapper objectMapper =newObjectMapper();String s = objectMapper.writeValueAsString(student);MessageProperties messageProperties =newMessageProperties();// 消息过期时间 10 秒
        messageProperties.setExpiration("10000");

        rabbitTemplate.send(RabbitConfig.EXCHANGE_NAME,RabbitConfig.KEY_NAME,newMessage(s.getBytes(StandardCharsets.UTF_8), messageProperties),
                correlationData
        );}}

监听消息

@Slf4j@ComponentpublicclassRabbitListeners{@RabbitListener(queues ={RabbitConfig.QUEUE_NAME})voidlistenCat(String content,@PayloadMessage message,Channel channel)throwsIOException,InterruptedException{
        log.info("【消费消息】- 入参;content: ${}$, message: ${}$, channel: ${}$", content, message, channel);
        log.info("【消费消息】- student: ${}$",newString(message.getBody(),StandardCharsets.UTF_8));TimeUnit.SECONDS.sleep(3);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}

Student

@NoArgsConstructor@AllArgsConstructor@DatapublicclassStudent{privateString name;privateInteger age;}

说明

这一套下来包括了

发送端确认

消息返回

手动 Ack

消费者限流

消息过期

死信队列

。可以有效地保证消息的发送、路由、消费能够正常执行。


本文转载自: https://blog.csdn.net/wo18237095579/article/details/125651191
版权归原作者 大漠知秋 所有, 如有侵权,请联系我们删除。

“SpringBoot AMQP”的评论:

还没有评论