0


Java17 --- RabbitMQ之常规使用

一、实现消息可靠性投递

1.1、消息生产者端确认机制

修改yml

spring:
  rabbitmq:
    host: 192.168.200.110
    port: 5672
    username: guest
    password: 123456
    virtual-host: /
    publisher-confirm-type: correlated #交换机确认
    publisher-returns: true #队列确认
@Configuration
@Slf4j
public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @PostConstruct
    public void initRabbitTemplate(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }
    //消息发送到交换机成功或失败都会调用
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("correlationData:" + correlationData);
        log.info("ack:" + ack);
        log.info("cause:" + cause);
    }
    //发送到队列失败调用
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
         log.info("消息主体:" + new String(returnedMessage.getMessage().getBody()));
         log.info("应答码:" + returnedMessage.getReplyCode());
         log.info("描述:" + returnedMessage.getReplyText());
         log.info("使用交换机:" + returnedMessage.getExchange());
         log.info("消息使用的路由键:" + returnedMessage.getRoutingKey());
    }
}
@SpringBootTest
public class MQTest {
    public static final String EXCHANGE_DIRECT = "exchange.direct.order";
    public static final String ROUTING_KEY = "order";
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void test1(){
        rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"你好,美羊羊");
    }
}

1.2、备份交换机

创建备份交换机

创建绑定队列

将原交换机与备份交换机绑定

 @Test
    public void test1(){
       //交换机名要对,路由键错才行
        rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY + "1","你好,慢羊羊");
    }

1.3、消费端确认机制

修改yml

spring:
  rabbitmq:
    host: 192.168.200.110
    port: 5672
    username: guest
    password: 123456
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual #手动确认
    @RabbitListener(queues = {QUEUE_NAME})
    public void getMessage(String date, Message message, Channel channel) throws IOException {
        //获取当前deliveryTagID
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            log.info(" "+1 / 0);
            //成功返回ACK信息
            channel.basicAck(deliveryTag,false);
            log.info("接收消息为:" + date);
        } catch (Exception e) {
            //获取消息是否重复投递
            Boolean redelivered = message.getMessageProperties().getRedelivered();
            //失败返回NACK信息

                if (redelivered){
                    //long var1,
                    // boolean var3,
                    // boolean var4 控制消息是否重新放回队列
                    channel.basicNack(deliveryTag,false,false);
                }else {
                    channel.basicNack(deliveryTag,false,true);
                }
            throw new RuntimeException(e);
        }

    }
}

二、消费端限流设置

只需要修改yml

spring:
  rabbitmq:
    host: 192.168.200.110
    port: 5672
    username: guest
    password: 123456
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual #手动确认
        prefetch: 1 #设置每次从队列中读取消息数

三、消息超时设置

3.1、从队列设置全局超时时间

3.2、设置消息本身超时时间

@Test
    public void test4(){
        //创建消息后置处理器
        MessagePostProcessor messagePostProcessor = message -> {
            //设置过期时间,单位毫秒
            message.getMessageProperties().setExpiration("10000");
            return message;
        };
        rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"a",messagePostProcessor);

    }

四、死信

4.1、消费端拒绝接收消息

4.1.1、创建死信交换机与队列

正常创建绑定即可

4.1.2、创建常规交换机与队列

创建常规队列注意事项

//监听正常队列
    //@RabbitListener(queues = {QUEUE_NAME_NORMAL})
    public void getMessageNormal(String date, Message message, Channel channel) throws Exception {
        //获取当前deliveryTagID
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        //返回ACK信息
        channel.basicReject(deliveryTag,false);
        log.info("拒绝接收消息" );
    }
    //监听死信队列
    //@RabbitListener(queues = {QUEUE_NAME_DEAD})
    public void getMessageDead(String date, Message message, Channel channel) throws Exception {
        //获取当前deliveryTagID
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        //成功返回ACK信息
        channel.basicAck(deliveryTag,false);
        log.info("接收消息为:" + date);
    }

4.2、消息数量超过队列容纳限度

 @Test
    public void test5(){
        for (int i = 1; i <=20 ; i++) {
            rabbitTemplate.convertAndSend(EXCHANGE_DIRECT_NORMAL,ROUTING_KEY_NORMAL,"a"+i);
        }
    }

五、延迟队列

5.1、使用死信队列实现

5.2、使用插件实现

docker inspect rabbitmq

下载的插件放入source后的目录

进入容器内部

docker exec -it rabbitmq /bin/bash

启动插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

退出容器

exit

重启容器

docker restart rabbitmq

5.3、创建交换机与队列

队列正常创建,无需参数设置

测试代码:

 public static final String EXCHANGE_DIRECT_DELAY = "exchange.delay";
    public static final String ROUTING_KEY_DELAY = "delay";
@Test
    public void test6(){
        //创建消息后置处理器
        MessagePostProcessor messagePostProcessor = message -> {
            //设置过期时间,单位毫秒
            //必须安装启动延迟插件设置才生效
            message.getMessageProperties().setHeader("x-delay","10000");
            return message;
        };
        rabbitTemplate.convertAndSend(EXCHANGE_DIRECT_DELAY,ROUTING_KEY_DELAY,
                "你好,插件" + new SimpleDateFormat("HH:mm:ss").format(new Date()),messagePostProcessor);
    }
public static final String QUEUE_NAME_DELAY = "queue.delay";
@RabbitListener(queues = {QUEUE_NAME_DELAY})
    public void getMessageDelay(String date, Message message, Channel channel) throws Exception {
        //获取当前deliveryTagID
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        //成功返回ACK信息
        channel.basicAck(deliveryTag,false);
        log.info("接收消息为:" + date);
        log.info("当前时间为:" + new SimpleDateFormat("HH:mm:ss").format(new Date()));
    }

六、事务消息

在Java配置类进行设置

@Bean
    public RabbitTransactionManager transactionManager(CachingConnectionFactory cachingConnectionFactory){
        return new RabbitTransactionManager(cachingConnectionFactory);
    }
    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory){
        RabbitTemplate rabbitTemplate1 = new RabbitTemplate(cachingConnectionFactory);
        rabbitTemplate1.setChannelTransacted(true);
        return rabbitTemplate1;
    }
 @Test
    @Transactional
    @Rollback(value = false)
    public void test7(){
        rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"异常前");
        int var = 3 / 0;
        rabbitTemplate.convertAndSend(EXCHANGE_DIRECT,ROUTING_KEY,"异常后");
    }

七、优先级队列

创建交换机与队列

@Test
    public void test8(){
        //创建消息后置处理器
        MessagePostProcessor messagePostProcessor = message -> {
            message.getMessageProperties().setPriority(3);
            return message;
        };
        rabbitTemplate.convertAndSend("exchange.priority","priority","第3级",messagePostProcessor);
    }
标签: 中间件

本文转载自: https://blog.csdn.net/qq_46093575/article/details/139503756
版权归原作者 鸭鸭老板 所有, 如有侵权,请联系我们删除。

“Java17 --- RabbitMQ之常规使用”的评论:

还没有评论