0


必学消息队列-RabbitMQ(下集)

个人简介

作者是一个来自河源的大三在校生,以下笔记都是作者自学之路的一些浅薄经验,如有错误请指正,将来会不断的完善笔记,帮助更多的Java爱好者入门。

文章目录

什么是RabbitMQ

MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

MQ的特点

  • MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。
  • MQ遵循了AMQP协议的具体实现和产品。

MQ的使用场景

  • 在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
  • 异步处理(常用)
  • 应用解耦(常用)
  • 流量削峰(常用)

各种MQ对比

在目前主流的消息队列中有(ActiveMQ,RocketMQ,RabbitMQ,kafka)

RabbitMQ在上面的各种消息队列中对于消息的保护是十分到位的(不会丢失消息),相对于kafka,虽然kafka性能十分强悍,在大数据中处理海量数据游刃有余,但是kafka容易丢失消息,而RabbitMQ虽然性能不及kafka,但是也不会很差,对于消息要求完整性很高的系统中用RabbitMQ十分好。

SpringBoot+RabbitMQ

导入启动器

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.3.9.RELEASE</version></dependency>

application.yml

spring:rabbitmq:username: ems
    password:123456virtual-host: /ems
    host: localhost

自定义RabbitTemplate

SpringBoot默认使用CachingConnectionFactory连接工厂

@ConfigurationpublicclassrabbitTemplateConfig{//注入SpringBoot默认的CachingConnectonFactory@Beanpublic RabbitTemplate rabbitTemplate(@Qualifier("rabbitConnectionFactory") CachingConnectionFactory cachingConnectionFactory){
        RabbitTemplate rabbitTemplate =newRabbitTemplate(cachingConnectionFactory);/**
         * 当mandatory标志位设置为true时
         * 如果exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息
         * 那么broker会调用basic.return方法将消息返还给生产者
         * 当mandatory设置为false时,出现上述情况broker会直接将消息丢弃
         */
        rabbitTemplate.setMandatory(true);//使用单独的发送连接,避免生产者由于各种原因阻塞而导致消费者同样阻塞
        rabbitTemplate.setUsePublisherConnection(true);return rabbitTemplate;}}

RabbitTemplate实现发送消息

最简单的使用HelloWorld

经过SpringBoot整合的RabbitMQ,发送消息只要一条语句

对比如下:

原生RabbitMQ:(11行)

publicstaticvoidmain(String[] args)throws IOException, TimeoutException {
        ConnectionFactory factory =newConnectionFactory();
        factory.setUsername("ems");
        factory.setPassword("123456");
        factory.setVirtualHost("/ems");//虚拟主机
        factory.setHost("127.0.0.1");//rabbitMQ的主机名(ip)
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("hello",true,false,false,null);
        channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"第一个RabbitMQ程序!!!".getBytes());
        channel.close();
        connection.close();}

SpringBoot整合RabbitMQ:(1行)

@SpringBootTest@RunWith(SpringRunner.class)publicclassprovider{@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublicvoidsend(){//一条代码即可发送消息/**
         * 参数1:交换机名称
         * 参数2:路由键
         * 参数3:消息内容(不需要转换成byte数组)
         */
        rabbitTemplate.convertAndSend("","boot_hello","boot_helloWorld");}}
@Component//所有RabbitMQ的消费者都需要“”加上“”Spring的组件注解,RabbitMQ消费者监听方法不用运行都可以被自动生效。。。。publicclassconsumer{//RabbitMQ消费者监听方法@RabbitListener(queuesToDeclare ={@Queue(name ="boot_hello",durable ="true",exclusive ="false",autoDelete ="false")})publicvoidreceive(String msg){
        System.out.println(msg);}}
workqueue模式
@SpringBootTest@RunWith(SpringRunner.class)//加载上下文publicclassworkqueueTest{@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublicvoidsend(){//        System.out.println(rabbitTemplate);for(int i =0; i <10; i++){
            rabbitTemplate.convertAndSend("","boot_work","workqueue===>"+i);}}}
@Componentpublicclassconsumer1{@RabbitListener(queuesToDeclare =@Queue(name ="boot_work",durable ="true"))publicvoidreceive1(String msg1){
        System.out.println("consumer1===>"+msg1);}}@Componentclassconsumer2{@RabbitListener(queuesToDeclare =@Queue(name ="boot_work",durable ="true"))publicvoidreceive2(String msg2){
        System.out.println("consumer2===>"+msg2);}}
fanout模式
@SpringBootTest@RunWith(SpringRunner.class)publicclassfanoutTest{@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublicvoidtest(){
        rabbitTemplate.convertAndSend("boot_fanout","","hello");}}
@Componentpublicclassconsumer3{@RabbitListener(bindings ={@QueueBinding(value =@Queue,exchange =@Exchange(value ="boot_fanout",type ="fanout"),key ="")})publicvoidreceive(String msg){
        System.out.println("consumer1===>"+msg);}}@Componentclassconsumer4{@RabbitListener(bindings =@QueueBinding(value =@Queue,exchange =@Exchange(value ="boot_fanout",type ="fanout"),key =""))publicvoidreceive(String msg){
        System.out.println("consumer2===>"+msg);}}
direct模式
@SpringBootTest@RunWith(SpringRunner.class)publicclassdirectTest{@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublicvoidtest(){
        rabbitTemplate.convertAndSend("direct_boot","user.log","direct");}}
@ComponentpublicclassdirectConsumer1{@RabbitListener(bindings =@QueueBinding(exchange =@Exchange(name ="direct_boot",type ="direct"),value =@Queue,key ="user"))publicvoidreceive(String msg){
        System.out.println("consumer1===>"+msg);}}@ComponentclassdirectConsumer2{@RabbitListener(bindings =@QueueBinding(exchange =@Exchange(name ="direct_boot",type ="direct"),value =@Queue,key ="user.log"))publicvoidreceive(String msg){
        System.out.println("consumer2==>"+msg);}}
topic模式
@SpringBootTest@RunWith(SpringRunner.class)publicclasstopicTest{@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublicvoidtest(){
        rabbitTemplate.convertAndSend("topic_boot","user.hello.log","hello");}}
@ComponentpublicclasstopicConsumer1{@RabbitListener(bindings =@QueueBinding(exchange =@Exchange(value ="topic_boot",type ="topic"),value =@Queue,key ="user.#"))publicvoidreceive(String msg){
        System.out.println("consumer1==>"+msg);}}@ComponentclasstopicConsumer2{@RabbitListener(bindings =@QueueBinding(exchange =@Exchange(value ="topic_boot",type ="topic"),value =@Queue,key ="user.*"))publicvoidreceive(String msg){
        System.out.println("consumer2==>"+msg);}}

RabbitMQ高级特性

消息队列的过期时间ttl

如果我们设置了消息队列的过期时间,假设我们设置了5000ms,5000ms过去了,如果这个队列还有未被消费的消息,那么这些消息将会被自动丢弃(无法找回)。。。。

队列里的消息的过期时间(有点坑)

消费者的消息的过期时间

设置消息队列的argument为x-message-ttl 为xxx值,比如value=“5000”,就是5秒过去了,消息队列未被消费的消息将会直接丢弃

坑:@argument注解设置参数一定要指定类型为Number子类,比如java.lang.Integer,不然会报错

比如:arguments = {@Argument(name = “x-message-ttl”,value = “5000”,type = “java.lang.Integer”)}

spring:
  rabbitmq:
    username: ems
    password:123456
    virtual-host:/ems
    host: localhost
    listener:
      direct:
        acknowledge-mode: manual #手动确认
      simple:
        acknowledge-mode: manual #手动确认
@Testpublicvoidtest1(){
        MessageProperties messageProperties =newMessageProperties();
        String msg ="hello_ttl";
        Message message =newMessage(msg.getBytes(),messageProperties);
        rabbitTemplate.convertAndSend("ttl_queue","ttl_a",message);}
/**
     *  ==小坑:
     * 使用RabbitListener实现队列的过期时间ttl必须要指定argument的“type”为Number类的子类,比如java.lang.Integer
     * =======切记,ttl和消息队列长度都要用Number的子类,使用默认的会报错======
     * 因为argument默认是java.lang.String类型,必须修改。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
     * 。。。
     *///@Queue和@Exchange指定value就会使这个队列和交换机设置为不过期的,没有value就是暂时的@RabbitListener(bindings =@QueueBinding(value =@Queue(value ="ttl_temp",durable ="true",arguments ={@Argument(name ="x-message-ttl",value ="5000",type ="java.lang.Integer")}//一定要指定类型),exchange =@Exchange(value ="ttl_queue",type ="direct"),key ={"ttl_a"}))publicvoidreceive(String msg,Message message,Channel channel){
        System.out.println("msg==="+msg);
        System.out.println("message==="+message);
        System.out.println("channel==="+channel);//        try {//            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); //手动确认//        } catch (IOException e) {//            e.printStackTrace();//        }}
指定消息的过期时间

生产者消息的过期时间

核心代码:messageProperties.setExpiration(“5000”);

@Testpublicvoidtest2(){
        MessageProperties messageProperties =newMessageProperties();

        messageProperties.setExpiration("5000");//设置指定消息的过期时间

        String str="ttl_test2";
        Message message =newMessage(str.getBytes(),messageProperties);
        rabbitTemplate.convertAndSend("","ttl_declare",message);}
@RabbitListener(queuesToDeclare =@Queue(name ="ttl_declare"))publicvoidreceive1(String msg,Message message,Channel channel)throws IOException {
        System.out.println(msg);//        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}

死信队列

消息放进死信队列的条件:

1:消息过期了,如果有死信队列则放入死信队列,如果没有死信队列则直接丢弃无法找回。

2:某个消息队列长度已经达到最大值,此时在把消息发送到这个队列中,如果有死信队列则放入死信队列,没有则丢弃

3:消息被拒绝(basic.reject / basic.nack)

================创建死信队列步骤

1:创建一个普通队列

@SpringBootTest@RunWith(SpringRunner.class)publicclassdeadLetter{@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublicvoidtest(){

        Message message =newMessage("deadLetter".getBytes(),newMessageProperties());
        rabbitTemplate.convertAndSend("","nomal_dead",message);}}
@ComponentpublicclassnomalQueue{/**
     * 这里我们只演示一种消息放入死信队列的情况(当消息过期后)
     * 在某个队列设置了x-dead-letter-exchange和x-dead-letter-routing-key后,如果出现丢弃消息就会
     * 通过x-dead-letter-exchange和x-dead-letter-routing-key找到指定的队列,这个队列就会默认是死信队列
     * 其实死信队列也是正常的队列。。。。配置全都一样
     */@RabbitListener(queuesToDeclare =@Queue(value ="nomal_dead",arguments ={@Argument(name ="x-message-ttl",value ="5000",type ="java.lang.Integer"),@Argument(name ="x-dead-letter-exchange",value ="deadletter_exchange1"),@Argument(name ="x-dead-letter-routing-key",value ="deadletter_key1")}))publicvoidreceive(String msg, Message message, Channel channel){
        System.out.println("msg1="+msg);}}
@ComponentpublicclassdeadLetterQueue{/**
     * 这里的交换机和路由key都要和配置的死信交换机、死信路由key一样。
     */@RabbitListener(bindings =@QueueBinding(value =@Queue("deadLetterQueue"),exchange =@Exchange(value ="deadletter_exchange1",type ="direct"),key ="deadletter_key1"))publicvoidreceive_deadLetter(String msg){
        System.out.println(msg);}}

固定长度的消息队列

核心代码:arguments = @Argument(name = “x-max-length”,value = “6”,type = “java.lang.Integer”)

@Testpublicvoidtest(){
            Message message =newMessage(("max").getBytes(),newMessageProperties());
            rabbitTemplate.convertAndSend("","maxLength_queue",message);}
@RabbitListener(queuesToDeclare =@Queue(value ="maxLength_queue",durable ="true",arguments =@Argument(name ="x-max-length",value ="6",type ="java.lang.Integer")))publicvoidreceive(String msg, Message message, Channel channel)throws IOException {
        System.out.println(msg);}

延时队列

应用场景:下了订单过了30分钟未支付,然后就自动取消订单

rabbitmq本身是没有延迟队列的,我们可以通过ttl过期时间和死信队列(DLX)来实现


本文转载自: https://blog.csdn.net/weixin_50071998/article/details/123897652
版权归原作者 摸鱼打酱油 所有, 如有侵权,请联系我们删除。

“必学消息队列-RabbitMQ(下集)”的评论:

还没有评论