作为rabbitMQ的生产者,发送消息到MQ的过程中,是通过routingkey发送给交换机,由交换机进行路由,把信息发送的最终的队列中。而rabbitMQ消费的时候,是要明确指明消费的队列的。
消费模式
rabbitMQ的消费模式分为两种,推模式和拉模式。推模式使用的是Basic.Consume 进行消费,而拉模式通过调用Basic.Get进行消费。推模式用于持续的获取消息,在推模式中,RabbitMQ会不断的推送消息给消费者,不过推送的数量可以通过Basic.Qos进行限制。拉模式可以单条的获取信息。
消费端的确认和拒绝
为了保证消息可以从队列可靠的到达消费者,RabbitMQ提供了消息确认机制。消费者在订阅队列时,可以指定autoAck参数。
- 如果autoAck参数为true,RabbitMQ会自动把发送出去的消息置为确认,并且从内存(磁盘)中删除,不关消费者是否真正进行了正确消费。
- 如果autoAck参数设置为false,RabbitMQ会等待消费者显式地回复确认信号后才从内存或者磁盘中删除。
采用消息确认机制后,只要设置autoAck为false,消费者就有足够的时间处理消息,不用担心处理消息过程中,消费者进程断掉导致消息丢失的问题。RabbitMQ会一直等到消费者显式调用Basic.ack命令为止。
Springboot使用amqp进行消费
在springboot中,对消息进行消费有两种方式。
- 轮询:使用rabbitMQTemplate.receive()等相关方法进行消费,每次消费一条。
- 注册侦听器:这个方式也是更为灵活,更为常用的。通过
@Bean
设置监听器端点和@RabbitListener
注解的方式实现。
代码演示
@Testpublicvoidreceive()throwsUnsupportedEncodingException{Message receive = rabbitTemplate.receive("queue-msg”);//指定队列名称System.out.println(newString(receive.getBody(),"utf-8"));Object o = rabbitTemplate.receiveAndConvert("queue-msg”);//可以支持类型转换System.out.println(o);}
这是最简单的消费方式,但是可以看到,虽然简单,功能也很少。
那么更常用的是使用配置监听器的方式
@ComponentpublicclassRabbitMQReceiver{@RabbitListener(queues ="test.topic",ackMode ="MANUAL")@RabbitHandler()publicvoidreceive(String msg,Channel channel,Message message)throwsIOException,InterruptedException{System.out.println("接收到消息:RabbitMQReceiver"+message);Thread.sleep(2000);
channel.basicQos(2);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}@RabbitListener(queues ="test.topic",ackMode ="MANUAL")@RabbitHandler()publicvoidreceive2(String msg,Channel channel,Message message)throwsIOException,InterruptedException{System.out.println("wqerdf:RabbitMQReceiver"+message);Thread.sleep(2000);
channel.basicQos(2);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}}/**
这里通过RabbitListener注解指定了队列名称,看到queues应该已经想到了,可以指定多个; ackMode指定了消息需要人为的确认
需要注意的是,如果多个消费者方法,每个方法都要有一个RabbitListener
*/
版权归原作者 被代码耽误的段子手 所有, 如有侵权,请联系我们删除。