0


Java17 --- RabbitMQ之常规使用

一、实现消息可靠性投递

1.1、消息生产者端确认机制

修改yml

  1. spring:
  2. rabbitmq:
  3. host: 192.168.200.110
  4. port: 5672
  5. username: guest
  6. password: 123456
  7. virtual-host: /
  8. publisher-confirm-type: correlated #交换机确认
  9. publisher-returns: true #队列确认
  1. @Configuration
  2. @Slf4j
  3. public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. @PostConstruct
  7. public void initRabbitTemplate(){
  8. rabbitTemplate.setConfirmCallback(this);
  9. rabbitTemplate.setReturnsCallback(this);
  10. }
  11. //消息发送到交换机成功或失败都会调用
  12. @Override
  13. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  14. log.info("correlationData:" + correlationData);
  15. log.info("ack:" + ack);
  16. log.info("cause:" + cause);
  17. }
  18. //发送到队列失败调用
  19. @Override
  20. public void returnedMessage(ReturnedMessage returnedMessage) {
  21. log.info("消息主体:" + new String(returnedMessage.getMessage().getBody()));
  22. log.info("应答码:" + returnedMessage.getReplyCode());
  23. log.info("描述:" + returnedMessage.getReplyText());
  24. log.info("使用交换机:" + returnedMessage.getExchange());
  25. log.info("消息使用的路由键:" + returnedMessage.getRoutingKey());
  26. }
  27. }
  1. @SpringBootTest
  2. public class MQTest {
  3. public static final String EXCHANGE_DIRECT = "exchange.direct.order";
  4. public static final String ROUTING_KEY = "order";
  5. @Autowired
  6. private RabbitTemplate rabbitTemplate;
  7. @Test
  8. public void test1(){
  9. rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"你好,美羊羊");
  10. }
  11. }

1.2、备份交换机

创建备份交换机

创建绑定队列

将原交换机与备份交换机绑定

  1. @Test
  2. public void test1(){
  3. //交换机名要对,路由键错才行
  4. rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY + "1","你好,慢羊羊");
  5. }

1.3、消费端确认机制

修改yml

  1. spring:
  2. rabbitmq:
  3. host: 192.168.200.110
  4. port: 5672
  5. username: guest
  6. password: 123456
  7. virtual-host: /
  8. listener:
  9. simple:
  10. acknowledge-mode: manual #手动确认
  1. @RabbitListener(queues = {QUEUE_NAME})
  2. public void getMessage(String date, Message message, Channel channel) throws IOException {
  3. //获取当前deliveryTagID
  4. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  5. try {
  6. log.info(" "+1 / 0);
  7. //成功返回ACK信息
  8. channel.basicAck(deliveryTag,false);
  9. log.info("接收消息为:" + date);
  10. } catch (Exception e) {
  11. //获取消息是否重复投递
  12. Boolean redelivered = message.getMessageProperties().getRedelivered();
  13. //失败返回NACK信息
  14. if (redelivered){
  15. //long var1,
  16. // boolean var3,
  17. // boolean var4 控制消息是否重新放回队列
  18. channel.basicNack(deliveryTag,false,false);
  19. }else {
  20. channel.basicNack(deliveryTag,false,true);
  21. }
  22. throw new RuntimeException(e);
  23. }
  24. }
  25. }

二、消费端限流设置

只需要修改yml

  1. spring:
  2. rabbitmq:
  3. host: 192.168.200.110
  4. port: 5672
  5. username: guest
  6. password: 123456
  7. virtual-host: /
  8. listener:
  9. simple:
  10. acknowledge-mode: manual #手动确认
  11. prefetch: 1 #设置每次从队列中读取消息数

三、消息超时设置

3.1、从队列设置全局超时时间

3.2、设置消息本身超时时间

  1. @Test
  2. public void test4(){
  3. //创建消息后置处理器
  4. MessagePostProcessor messagePostProcessor = message -> {
  5. //设置过期时间,单位毫秒
  6. message.getMessageProperties().setExpiration("10000");
  7. return message;
  8. };
  9. rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"a",messagePostProcessor);
  10. }

四、死信

4.1、消费端拒绝接收消息

4.1.1、创建死信交换机与队列

正常创建绑定即可

4.1.2、创建常规交换机与队列

创建常规队列注意事项

  1. //监听正常队列
  2. //@RabbitListener(queues = {QUEUE_NAME_NORMAL})
  3. public void getMessageNormal(String date, Message message, Channel channel) throws Exception {
  4. //获取当前deliveryTagID
  5. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  6. //返回ACK信息
  7. channel.basicReject(deliveryTag,false);
  8. log.info("拒绝接收消息" );
  9. }
  10. //监听死信队列
  11. //@RabbitListener(queues = {QUEUE_NAME_DEAD})
  12. public void getMessageDead(String date, Message message, Channel channel) throws Exception {
  13. //获取当前deliveryTagID
  14. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  15. //成功返回ACK信息
  16. channel.basicAck(deliveryTag,false);
  17. log.info("接收消息为:" + date);
  18. }

4.2、消息数量超过队列容纳限度

  1. @Test
  2. public void test5(){
  3. for (int i = 1; i <=20 ; i++) {
  4. rabbitTemplate.convertAndSend(EXCHANGE_DIRECT_NORMAL,ROUTING_KEY_NORMAL,"a"+i);
  5. }
  6. }

五、延迟队列

5.1、使用死信队列实现

5.2、使用插件实现

docker inspect rabbitmq

下载的插件放入source后的目录

进入容器内部

docker exec -it rabbitmq /bin/bash

启动插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

退出容器

exit

重启容器

docker restart rabbitmq

5.3、创建交换机与队列

队列正常创建,无需参数设置

测试代码:

  1. public static final String EXCHANGE_DIRECT_DELAY = "exchange.delay";
  2. public static final String ROUTING_KEY_DELAY = "delay";
  3. @Test
  4. public void test6(){
  5. //创建消息后置处理器
  6. MessagePostProcessor messagePostProcessor = message -> {
  7. //设置过期时间,单位毫秒
  8. //必须安装启动延迟插件设置才生效
  9. message.getMessageProperties().setHeader("x-delay","10000");
  10. return message;
  11. };
  12. rabbitTemplate.convertAndSend(EXCHANGE_DIRECT_DELAY,ROUTING_KEY_DELAY,
  13. "你好,插件" + new SimpleDateFormat("HH:mm:ss").format(new Date()),messagePostProcessor);
  14. }
  1. public static final String QUEUE_NAME_DELAY = "queue.delay";
  2. @RabbitListener(queues = {QUEUE_NAME_DELAY})
  3. public void getMessageDelay(String date, Message message, Channel channel) throws Exception {
  4. //获取当前deliveryTagID
  5. long deliveryTag = message.getMessageProperties().getDeliveryTag();
  6. //成功返回ACK信息
  7. channel.basicAck(deliveryTag,false);
  8. log.info("接收消息为:" + date);
  9. log.info("当前时间为:" + new SimpleDateFormat("HH:mm:ss").format(new Date()));
  10. }

六、事务消息

在Java配置类进行设置

  1. @Bean
  2. public RabbitTransactionManager transactionManager(CachingConnectionFactory cachingConnectionFactory){
  3. return new RabbitTransactionManager(cachingConnectionFactory);
  4. }
  5. @Bean
  6. public RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory){
  7. RabbitTemplate rabbitTemplate1 = new RabbitTemplate(cachingConnectionFactory);
  8. rabbitTemplate1.setChannelTransacted(true);
  9. return rabbitTemplate1;
  10. }
  1. @Test
  2. @Transactional
  3. @Rollback(value = false)
  4. public void test7(){
  5. rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"异常前");
  6. int var = 3 / 0;
  7. rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"异常后");
  8. }

七、优先级队列

创建交换机与队列

  1. @Test
  2. public void test8(){
  3. //创建消息后置处理器
  4. MessagePostProcessor messagePostProcessor = message -> {
  5. message.getMessageProperties().setPriority(3);
  6. return message;
  7. };
  8. rabbitTemplate.convertAndSend("exchange.priority","priority","第3级",messagePostProcessor);
  9. }
标签: 中间件

本文转载自: https://blog.csdn.net/qq_46093575/article/details/139503756
版权归原作者 鸭鸭老板 所有, 如有侵权,请联系我们删除。

“Java17 --- RabbitMQ之常规使用”的评论:

还没有评论