0


RabbitMQ笔记

依赖

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version></dependency>

简单模式

在这里插入图片描述

生产者

packagecom.imooc.mq;importcom.rabbitmq.client.AMQP;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;/**
 * 构建简单模式的生产者,发送消息
 */publicclassFooProducer{publicstaticvoidmain(String[] args)throwsException{// 1. 创建连接工厂以及相关的参数配置ConnectionFactory factory =newConnectionFactory();
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");// 2. 通过工程创建连接 ConnectionConnection connection = factory.newConnection();// 3. 创建管道 ChannelChannel channel = connection.createChannel();// 4. 创建队列 Queue(简单模式不需要交换机Exchange)/**
         * queue: 队列名
         * durable: 是否持久化,true:重启之后,队列依然存在,false则不存在
         * exclusive: 是否独占,true:只能有一个消费者监听这个队列,一般设置为false
         * autoDelete: 是否自动删除,true:当没有消费者的时候,则自动删除这个队列
         * arguments: map类型的其他参数
         */
        channel.queueDeclare("hello",true,false,false,null);// 5. 向队列发送消息/**
         * exchange: 交换机的名称,简单模式下没有,所以直接设置为 ""
         * routingKey: 路由key,映射路径,如果交换机没有,则路由key和队列名保持一致
         * props: 配置参数
         * body: 消息数据
         */String msg ="Hello 慕课网~~~";
        channel.basicPublish("","hello",null, msg.getBytes());// 6. 释放资源
        channel.close();
        connection.close();}}

消费者

packagecom.imooc.mq;importcom.rabbitmq.client.*;importjava.io.IOException;/**
 * 构建简单模式的消费者,监听消费消息
 */publicclassFooConsumer{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();
        channel.queueDeclare("hello",true,false,false,null);Consumer consumer =newDefaultConsumer(channel){/**
             * 重写消息配送方法
             * @param consumerTag 消息的标签(标识)
             * @param envelope  信封(一些信息,比如交换机路由等等信息)
             * @param properties 配置信息
             * @param body 收到的消息数据
             * @throws IOException
             */@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("consumerTag = "+ consumerTag);System.out.println("envelope = "+ envelope.toString());System.out.println("BasicProperties = "+ properties.toString());System.out.println("body = "+newString(body));}};/**
         * queue: 监听的队列名
         * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
         * callback: 回调函数,处理监听到的消息
         */
        channel.basicConsume("hello",true, consumer);}}

工作队列模式

在这里插入图片描述
生产者

packagecom.imooc.mq;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;/**
 * 构建工作队列的生产者,发送消息
 */publicclassWorkQueuesProducer{publicstaticvoidmain(String[] args)throwsException{// 1. 创建连接工厂以及相关的参数配置ConnectionFactory factory =newConnectionFactory();
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");// 2. 通过工程创建连接 ConnectionConnection connection = factory.newConnection();// 3. 创建管道 ChannelChannel channel = connection.createChannel();// 4. 创建队列 Queue(简单模式不需要交换机Exchange)/**
         * queue: 队列名
         * durable: 是否持久化,true:重启之后,队列依然存在,false则不存在
         * exclusive: 是否独占,true:只能有一个消费者监听这个队列,一般设置为false
         * autoDelete: 是否自动删除,true:当没有消费者的时候,则自动删除这个队列
         * arguments: map类型的其他参数
         */
        channel.queueDeclare("work_queue",true,false,false,null);// 5. 向队列发送消息/**
         * exchange: 交换机的名称,简单模式下没有,所以直接设置为 ""
         * routingKey: 路由key,映射路径,如果交换机没有,则路由key和队列名保持一致
         * props: 配置参数
         * body: 消息数据
         */for(int i =0; i <10; i ++){String task ="开始上班,搬砖喽~ 开启任务["+ i +"]";
            channel.basicPublish("","work_queue",null, task.getBytes());}// 6. 释放资源
        channel.close();
        connection.close();}}

消费者A

packagecom.imooc.mq;importcom.rabbitmq.client.*;importjava.io.IOException;/**
 * 构建工作队列模式的消费者,监听消费消息
 */publicclassWorkQueuesConsumerA{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();
        channel.queueDeclare("work_queue",true,false,false,null);Consumer consumer =newDefaultConsumer(channel){/**
             * 重写消息配送方法
             * @param consumerTag 消息的标签(标识)
             * @param envelope  信封(一些信息,比如交换机路由等等信息)
             * @param properties 配置信息
             * @param body 收到的消息数据
             * @throws IOException
             */@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body = "+newString(body));}};/**
         * queue: 监听的队列名
         * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
         * callback: 回调函数,处理监听到的消息
         */
        channel.basicConsume("work_queue",true, consumer);}}

消费者B

packagecom.imooc.mq;importcom.rabbitmq.client.*;importjava.io.IOException;/**
 * 构建工作队列模式的消费者,监听消费消息
 */publicclassWorkQueuesConsumerB{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();
        channel.queueDeclare("work_queue",true,false,false,null);Consumer consumer =newDefaultConsumer(channel){/**
             * 重写消息配送方法
             * @param consumerTag 消息的标签(标识)
             * @param envelope  信封(一些信息,比如交换机路由等等信息)
             * @param properties 配置信息
             * @param body 收到的消息数据
             * @throws IOException
             */@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body = "+newString(body));}};/**
         * queue: 监听的队列名
         * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
         * callback: 回调函数,处理监听到的消息
         */
        channel.basicConsume("work_queue",true, consumer);}}

发布订阅模式

在这里插入图片描述

生产者

packagecom.imooc.mq;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;/**
 * 构建发布订阅模式的生产者,发送消息
 */publicclassPubSubProducer{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 创建交换机 Exchange/**
         * exchange: 交换机的名称
         * type: 交换机的类型
         *  FANOUT("fanout"): 广播模式,发布订阅,把消息发送给所有的绑定的队列
         *  DIRECT("direct"): 定向投递模式,把消息发送给指定的“routing key”的队列
         *  TOPIC("topic"): 通配符模式,把消息发送给符合的“routing pattern”的队列
         *  HEADERS("headers"): 使用率不多,参数匹配
         * durable: 是否持久化
         * autoDelete: 是否自动删除
         * internal: 内部意思,true:表示当前exchange是rabbitmq内部使用的,用户创建的队列不会消费该类型交换机下的消息,所以我们一般使用false即可
         * arguments: map类型的参数
         */String fanout_exchange ="fanout_exchange";
        channel.exchangeDeclare(fanout_exchange,BuiltinExchangeType.FANOUT,true,false,false,null);// 定义两个队列String fanout_queue_a ="fanout_queue_a";String fanout_queue_b ="fanout_queue_b";
        channel.queueDeclare(fanout_queue_a,true,false,false,null);
        channel.queueDeclare(fanout_queue_b,true,false,false,null);// 绑定交换机和队列
        channel.queueBind(fanout_queue_a, fanout_exchange,"");
        channel.queueBind(fanout_queue_b, fanout_exchange,"");for(int i =0; i <10; i ++){String task ="开始上班,搬砖喽~ 开启任务["+ i +"]";
            channel.basicPublish(fanout_exchange,"",null, task.getBytes());}

        channel.close();
        connection.close();}}

消费者A

packagecom.imooc.mq;importcom.rabbitmq.client.*;importjava.io.IOException;/**
 * 构建发布订阅模式的消费者,监听消费消息
 */publicclassPubSubConsumerA{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();String fanout_queue_a ="fanout_queue_a";
        channel.queueDeclare(fanout_queue_a,true,false,false,null);Consumer consumer =newDefaultConsumer(channel){/**
             * 重写消息配送方法
             * @param consumerTag 消息的标签(标识)
             * @param envelope  信封(一些信息,比如交换机路由等等信息)
             * @param properties 配置信息
             * @param body 收到的消息数据
             * @throws IOException
             */@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body = "+newString(body));}};/**
         * queue: 监听的队列名
         * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
         * callback: 回调函数,处理监听到的消息
         */
        channel.basicConsume(fanout_queue_a,true, consumer);}}

消费者B

packagecom.imooc.mq;importcom.rabbitmq.client.*;importjava.io.IOException;/**
 * 构建发布订阅模式的消费者,监听消费消息
 */publicclassPubSubConsumerB{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();String fanout_queue_b ="fanout_queue_b";
        channel.queueDeclare(fanout_queue_b,true,false,false,null);Consumer consumer =newDefaultConsumer(channel){/**
             * 重写消息配送方法
             * @param consumerTag 消息的标签(标识)
             * @param envelope  信封(一些信息,比如交换机路由等等信息)
             * @param properties 配置信息
             * @param body 收到的消息数据
             * @throws IOException
             */@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body = "+newString(body));}};/**
         * queue: 监听的队列名
         * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
         * callback: 回调函数,处理监听到的消息
         */
        channel.basicConsume(fanout_queue_b,true, consumer);}}

路由(定向)模式

在这里插入图片描述

生产者

packagecom.imooc.mq;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;/**
 * 构建路由(定向)模式的生产者,发送消息
 */publicclassRoutingProducer{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 创建交换机 Exchange/**
         * exchange: 交换机的名称
         * type: 交换机的类型
         *  FANOUT("fanout"): 广播模式,发布订阅,把消息发送给所有的绑定的队列
         *  DIRECT("direct"): 定向投递模式,把消息发送给指定的“routing key”的队列
         *  TOPIC("topic"): 通配符模式,把消息发送给符合的“routing pattern”的队列
         *  HEADERS("headers"): 使用率不多,参数匹配
         * durable: 是否持久化
         * autoDelete: 是否自动删除
         * internal: 内部意思,true:表示当前exchange是rabbitmq内部使用的,用户创建的队列不会消费该类型交换机下的消息,所以我们一般使用false即可
         * arguments: map类型的参数
         */String routing_exchange ="routing_exchange";
        channel.exchangeDeclare(routing_exchange,BuiltinExchangeType.DIRECT,true,false,false,null);// 定义两个队列String routing_queue_order ="routing_queue_order";String routing_queue_pay ="routing_queue_pay";
        channel.queueDeclare(routing_queue_order,true,false,false,null);
        channel.queueDeclare(routing_queue_pay,true,false,false,null);// 绑定交换机和队列
        channel.queueBind(routing_queue_order, routing_exchange,"order_create");
        channel.queueBind(routing_queue_order, routing_exchange,"order_delete");
        channel.queueBind(routing_queue_order, routing_exchange,"order_update");
        channel.queueBind(routing_queue_pay, routing_exchange,"order_pay");String msg1 ="创建订单A";String msg2 ="创建订单B";String msg3 ="删除订单C";String msg4 ="修改订单D";String msg5 ="支付订单E";String msg6 ="支付订单F";
        channel.basicPublish(routing_exchange,"order_create",null, msg1.getBytes());
        channel.basicPublish(routing_exchange,"order_create",null, msg2.getBytes());
        channel.basicPublish(routing_exchange,"order_delete",null, msg3.getBytes());
        channel.basicPublish(routing_exchange,"order_update",null, msg4.getBytes());
        channel.basicPublish(routing_exchange,"order_pay",null, msg5.getBytes());
        channel.basicPublish(routing_exchange,"order_pay",null, msg6.getBytes());

        channel.close();
        connection.close();}}

消费者A

packagecom.imooc.mq;importcom.rabbitmq.client.*;importjava.io.IOException;/**
 * 构建路由模式的消费者,监听消费消息
 */publicclassRoutingOrderConsumerA{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();String routing_queue_order ="routing_queue_order";
        channel.queueDeclare(routing_queue_order,true,false,false,null);Consumer consumer =newDefaultConsumer(channel){/**
             * 重写消息配送方法
             * @param consumerTag 消息的标签(标识)
             * @param envelope  信封(一些信息,比如交换机路由等等信息)
             * @param properties 配置信息
             * @param body 收到的消息数据
             * @throws IOException
             */@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body = "+newString(body));}};/**
         * queue: 监听的队列名
         * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
         * callback: 回调函数,处理监听到的消息
         */
        channel.basicConsume(routing_queue_order,true, consumer);}}

消费者B

packagecom.imooc.mq;importcom.rabbitmq.client.*;importjava.io.IOException;/**
 * 构建路由模式的消费者,监听消费消息
 */publicclassRoutingPayConsumerB{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();String routing_queue_pay ="routing_queue_pay";
        channel.queueDeclare(routing_queue_pay,true,false,false,null);Consumer consumer =newDefaultConsumer(channel){/**
             * 重写消息配送方法
             * @param consumerTag 消息的标签(标识)
             * @param envelope  信封(一些信息,比如交换机路由等等信息)
             * @param properties 配置信息
             * @param body 收到的消息数据
             * @throws IOException
             */@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body = "+newString(body));}};/**
         * queue: 监听的队列名
         * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
         * callback: 回调函数,处理监听到的消息
         */
        channel.basicConsume(routing_queue_pay,true, consumer);}}

通配符匹配模式 topic

在这里插入图片描述
生产者

packagecom.imooc.mq;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;/**
 * 构建通配符模式的生产者,发送消息
 */publicclassTopicsProducer{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 创建交换机 Exchange/**
         * exchange: 交换机的名称
         * type: 交换机的类型
         *  FANOUT("fanout"): 广播模式,发布订阅,把消息发送给所有的绑定的队列
         *  DIRECT("direct"): 定向投递模式,把消息发送给指定的“routing key”的队列
         *  TOPIC("topic"): 通配符模式,把消息发送给符合的“routing pattern”的队列
         *  HEADERS("headers"): 使用率不多,参数匹配
         * durable: 是否持久化
         * autoDelete: 是否自动删除
         * internal: 内部意思,true:表示当前exchange是rabbitmq内部使用的,用户创建的队列不会消费该类型交换机下的消息,所以我们一般使用false即可
         * arguments: map类型的参数
         */String topics_exchange ="topics_exchange";
        channel.exchangeDeclare(topics_exchange,BuiltinExchangeType.TOPIC,true,false,false,null);// 定义两个队列String topics_queue_order ="topics_queue_order";String topics_queue_pay ="topics_queue_pay";
        channel.queueDeclare(topics_queue_order,true,false,false,null);
        channel.queueDeclare(topics_queue_pay,true,false,false,null);// 绑定交换机和队列 *表示通配一个词,#表示通配一个或多个词
        channel.queueBind(topics_queue_order, topics_exchange,"order.*");
        channel.queueBind(topics_queue_pay, topics_exchange,"*.pay.#");String msg1 ="创建订单A";String msg2 ="创建订单B";String msg3 ="删除订单C";String msg4 ="修改订单D";String msg5 ="支付订单E";String msg6 ="超市订单F";String msg7 ="慕课订单G";
        channel.basicPublish(topics_exchange,"order.create",null, msg1.getBytes());
        channel.basicPublish(topics_exchange,"order.create",null, msg2.getBytes());
        channel.basicPublish(topics_exchange,"order.delete",null, msg3.getBytes());
        channel.basicPublish(topics_exchange,"order.update",null, msg4.getBytes());// order.pay 能匹配到两个队列,注意看两个队列是否都能消费到
        channel.basicPublish(topics_exchange,"order.pay",null, msg5.getBytes());
        channel.basicPublish(topics_exchange,"imooc.pay.super.market",null, msg6.getBytes());
        channel.basicPublish(topics_exchange,"imooc.pay.course",null, msg7.getBytes());

        channel.close();
        connection.close();}}

消费者A

packagecom.imooc.mq;importcom.rabbitmq.client.*;importjava.io.IOException;/**
 * 构建通配符模式的消费者,监听消费消息
 */publicclassTopicsOrderConsumerA{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();String topics_queue_order ="topics_queue_order";
        channel.queueDeclare(topics_queue_order,true,false,false,null);Consumer consumer =newDefaultConsumer(channel){/**
             * 重写消息配送方法
             * @param consumerTag 消息的标签(标识)
             * @param envelope  信封(一些信息,比如交换机路由等等信息)
             * @param properties 配置信息
             * @param body 收到的消息数据
             * @throws IOException
             */@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body = "+newString(body));}};/**
         * queue: 监听的队列名
         * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
         * callback: 回调函数,处理监听到的消息
         */
        channel.basicConsume(topics_queue_order,true, consumer);}}

消费者B

packagecom.imooc.mq;importcom.rabbitmq.client.*;importjava.io.IOException;/**
 * 构建通配符模式的消费者,监听消费消息
 */publicclassTopicsPayConsumerB{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
        factory.setHost("192.168.233.128");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();String topics_queue_pay ="topics_queue_pay";
        channel.queueDeclare(topics_queue_pay,true,false,false,null);Consumer consumer =newDefaultConsumer(channel){/**
             * 重写消息配送方法
             * @param consumerTag 消息的标签(标识)
             * @param envelope  信封(一些信息,比如交换机路由等等信息)
             * @param properties 配置信息
             * @param body 收到的消息数据
             * @throws IOException
             */@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body = "+newString(body));}};/**
         * queue: 监听的队列名
         * autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
         * callback: 回调函数,处理监听到的消息
         */
        channel.basicConsume(topics_queue_pay,true, consumer);}}

SpringBoot集成RabbitMQ

依赖(生产和消费)

<!--SpringBoot 整合RabbitMQ 依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

配置类 (生产者)

packagecom.imooc.api.mq;importorg.springframework.amqp.core.*;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/**
 * RabbitMQ 的配置类
 */@ConfigurationpublicclassRabbitMQSMSConfig{// 定义交换机的名称publicstaticfinalString SMS_EXCHANGE ="sms_exchange";// 定义队列的名称publicstaticfinalString SMS_QUEUE ="sms_queue";// 统一定义路由keypublicstaticfinalString ROUTING_KEY_SMS_SEND_LOGIN ="imooc.sms.send.login";// 创建交换机@Bean(SMS_EXCHANGE)publicExchangeexchange(){returnExchangeBuilder.topicExchange(SMS_EXCHANGE).durable(true).build();}// 创建队列@Bean(SMS_QUEUE)publicQueuequeue(){//        return new Queue(SMS_QUEUE);returnQueueBuilder.durable(SMS_QUEUE).build();}// 创建绑定关系@BeanpublicBindingsmsBinding(@Qualifier(SMS_EXCHANGE)Exchange exchange,@Qualifier(SMS_QUEUE)Queue queue){returnBindingBuilder.bind(queue).to(exchange).with("imooc.sms.#").noargs();}}

yml配置(生产和消费)

spring:
  rabbitmq:
    host:192.168.233.128
    port:5672
    virtual-host:/
    username: guest
    password: guest

生产者

@AutowiredprivateRabbitTemplate rabbitTemplate;@PostMapping("getSMSCode")publicGraceJSONResultgetSMSCode(String mobile,HttpServletRequest request)throwsException{if(StringUtils.isBlank(mobile)){returnGraceJSONResult.error();}// 获得用户ipString userIp =IPUtil.getRequestIp(request);// 限制用户只能在60s以内获得一次验证码
            redis.setnx60s(MOBILE_SMSCODE +":"+ userIp, mobile);String code =(int)((Math.random()*9+1)*100000)+"";// 使用消息队列异步解耦发送短信SMSContentQO contentQO =newSMSContentQO();
            contentQO.setMobile(mobile);
            contentQO.setContent(code);

            rabbitTemplate.convertAndSend("sms_exchange","imooc.sms.send.login",GsonUtils.object2String(contentQO));

GsonUtils

packagecom.imooc.utils;importcom.google.gson.*;importcom.google.gson.reflect.TypeToken;importjava.util.ArrayList;importjava.util.List;importjava.util.Map;publicclassGsonUtils{/**
     * 不用创建对象,直接使用Gson.就可以调用方法
     */privatestaticGson gson =null;privatestaticJsonParser jsonParser =null;/**
     * 判断gson对象是否存在了,不存在则创建对象
     */static{if(gson ==null){//gson = new Gson();// 当使用GsonBuilder方式时属性为空的时候输出来的json字符串是有键值key的,显示形式是"key":null,而直接new出来的就没有"key":null的
            gson =newGsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create();}if(jsonParser ==null){
            jsonParser =newJsonParser();}}privateGsonUtils(){}/**
     * json 转对象
     * @param strJson
     * @return
     */publicstaticJsonObjectstring2Object(String strJson){return jsonParser.parse(strJson).getAsJsonObject();}/**
     * 将对象转成json格式
     * @param object
     * @return String
     */publicstaticStringobject2String(Object object){String gsonString =null;if(gson !=null){
            gsonString = gson.toJson(object);}return gsonString;}/**
     * 将json转成特定的cls的对象
     * @param gsonString
     * @param cls
     * @return
     */publicstatic<T>TstringToBean(String gsonString,Class<T> cls){T t =null;if(gson !=null){//传入json对象和对象类型,将json转成对象
            t = gson.fromJson(gsonString, cls);}return t;}//    public static <T> T stringToBean2(String gsonString, Class<T> cls) {////        JsonParser jsonParser = new JsonParser();//        JsonObject jsonObject = jsonParser.parse(gsonString).getAsJsonObject();////        T t = null;//        if (gson != null) {//            //传入json对象和对象类型,将json转成对象//            t = gson.fromJson(jsonObject, cls);//        }//        return t;//    }/**
     * json字符串转成list
     * @param gsonString
     * @param cls
     * @return
     */publicstatic<T>List<T>stringToList(String gsonString,Class<T> cls){List<T> list =null;if(gson !=null){//根据泛型返回解析指定的类型,TypeToken<List<T>>{}.getType()获取返回类型
            list = gson.fromJson(gsonString,newTypeToken<List<T>>(){}.getType());}return list;}publicstatic<T>List<T>stringToListAnother(String gsonString,Class<T> cls){List<T> list =newArrayList<>();JsonArray jsonArray =newJsonParser().parse(gsonString).getAsJsonArray();Gson gson =newGson();for(JsonElement jsonElement : jsonArray){
            list.add(gson.fromJson(jsonElement,cls));}return list;}/**
     * json字符串转成list中有map的
     * @param gsonString
     * @return
     */publicstatic<T>List<Map<String,T>>stringToListMaps(String gsonString){List<Map<String,T>> list =null;if(gson !=null){
            list = gson.fromJson(gsonString,newTypeToken<List<Map<String,T>>>(){}.getType());}return list;}/**
     * json字符串转成map的
     * @param gsonString
     * @return
     */publicstatic<T>Map<String,T>stringToMaps(String gsonString){Map<String,T> map =null;if(gson !=null){
            map = gson.fromJson(gsonString,newTypeToken<Map<String,T>>(){}.getType());}return map;}publicstaticStringjsonElementAsString(JsonElement jsonElement){return jsonElement ==null?null: jsonElement.getAsString();}publicstaticIntegerjsonElementAsInt(JsonElement jsonElement){return jsonElement ==null?null: jsonElement.getAsInt();}}

消费者

/**
     * 监听队列,并且处理消息
     * @param payload
     * @param message
     */@RabbitListener(queues ={RabbitMQSMSConfig.SMS_QUEUE})publicvoidwatchQueue(String payload,Message message)throwsException{

        log.info("payload = "+ payload);String routingKey = message.getMessageProperties().getReceivedRoutingKey();
        log.info("routingKey = "+ routingKey);String msg = payload;
        log.info("msg = "+ msg);if(routingKey.equalsIgnoreCase(RabbitMQSMSConfig.ROUTING_KEY_SMS_SEND_LOGIN)){// 此处为短信发送的消息消费处理SMSContentQO contentQO =GsonUtils.stringToBean(msg,SMSContentQO.class);//            smsUtils.sendSMS(contentQO.getMobile(), contentQO.getContent());}}

生产端消息可靠性机制

setConfirmCallback交换机确认接收到消息机制

配置文件:

  rabbitmq:
    host:192.168.233.128
    port:5672
    virtual-host:/
    username: guest
    password: guest
    publisher-confirm-type: correlated

生产者实现

        rabbitTemplate.convertAndSend("sms_exchange","imooc.sms.send.login",GsonUtils.object2String(contentQO),newCorrelationData(UUID.randomUUID().toString()));// 定义confirm回调
        rabbitTemplate.setConfirmCallback(newRabbitTemplate.ConfirmCallback(){/**
             * 回调函数
             * @param correlationData 相关性数据
             * @param ack 交换机是否成功接收到消息,true:成功
             * @param cause 失败的原因
             */@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){
                log.info("进入confirm");
                log.info("correlationData:{}", correlationData.getId());if(ack){
                    log.info("交换机成功接收到消息~~ {}", cause);}else{
                    log.info("交换机接收消息失败~~失败原因: {}", cause);}}});

setReturnsCallback队列未接受到消息回退机制

配置文件

  rabbitmq:
    host:192.168.233.128
    port:5672
    virtual-host:/
    username: guest
    password: guest
    publisher-confirm-type: correlated
    publisher-returns:true

生产者实现

        rabbitTemplate.convertAndSend("sms_exchange","123imooc.sms.send.login",GsonUtils.object2String(contentQO),newCorrelationData(UUID.randomUUID().toString()));// 定义路由匹配不到时候 return回调
        rabbitTemplate.setReturnsCallback(newRabbitTemplate.ReturnsCallback(){@OverridepublicvoidreturnedMessage(ReturnedMessage returned){
                log.info("进入return");
                log.info(returned.toString());}});

这里把路由故意写为"123imooc.sms.send.login" 这匹配不到RabbitMQSMSConfig类中定义的路由,所以从交换机发布到对应的队列中
在这里插入图片描述
所以就会进入到setReturnsCallback方法中
在这里插入图片描述

消费端消息可靠性

ACK确认机制

    none 默认自动确认
    manual 手动确认(业务处理完成后手动确认,最常用)
    auto 异常类型确认

配置文件

  rabbitmq:
    host:192.168.233.128
    port:5672
    virtual-host:/
    username: guest
    password: guest
    listener:
      simple:
        acknowledge-mode: manual  # 手动ack确认

消费者实现

@RabbitListener(queues ={RabbitMQSMSConfig.SMS_QUEUE})publicvoidwatchQueue(Message message,Channel channel)throwsException{try{String routingKey = message.getMessageProperties().getReceivedRoutingKey();
            log.info("routingKey = "+ routingKey);int a =1/0;// 模拟异常String msg =newString(message.getBody());
            log.info("msg = "+ msg);/**
             * long deliveryTag: 消息投递的标签
             * boolean multiple: 批量确认所有消费者获得的消息
             */
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}catch(Exception e){
            e.printStackTrace();/**
             * long deliveryTag
             * boolean multiple
             * boolean requeue: true:重回队列 ;false:丢弃消息
             */
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);}}

消费端消息拉取限流

配置文件

  rabbitmq:
    host:192.168.233.128
    port:5672
    virtual-host:/
    username: guest
    password: guest
    listener:
      simple:
        acknowledge-mode: manual  # 手动ack确认
        prefetch:2  # 每次每个消费者从mq中拉去的消息的数量,直到手动ack确认之后,才会拉取下一个消息

模拟场景:
生产者连续发10条消息

for(int i =0; i <10; i ++){
            rabbitTemplate.convertAndSend(RabbitMQSMSConfig.SMS_EXCHANGE,RabbitMQSMSConfig.ROUTING_KEY_SMS_SEND_LOGIN,GsonUtils.object2String(contentQO),newCorrelationData(UUID.randomUUID().toString()));}

消费者在手动ack的地方断点
在这里插入图片描述
查看RabbitMQ管理平台,每次确实只从队列中拉取2条消息
在这里插入图片描述

设置消息过期失效(移除消息)

在RabbitMQSMSConfig配置类中

// 创建队列@Bean(SMS_QUEUE)publicQueuequeue(){returnQueueBuilder.durable(SMS_QUEUE).withArgument("x-message-ttl",30*1000).build();}

.withArgument(“x-message-ttl”, 30*1000) 表示30秒后,消息自动失效
注意:只能对新的队列生效,对老的队列需要先在RabbitMQ管理平台中删除老队列
在这里插入图片描述
测试方法
可以先将消费者注释掉,或者不启动消费者
生产者生产的消息,过一段时间在RabbitMQ管理平台看看是否自动消失
在这里插入图片描述

死信队列

以下3中消息都会进入死信队列
1、 ttl超时被丢弃的消息
2、超过队列长度被丢弃的消息
3、手动nack或者reject,并且requeue为false的消息

死信队列配置类

packagecom.imooc.api.mq;importorg.springframework.amqp.core.*;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/**
 * RabbitMQ 的配置类(用于死信队列的配置)
 */@ConfigurationpublicclassRabbitMQSMSConfig_Dead{// 定义交换机的名称publicstaticfinalString SMS_EXCHANGE_DEAD ="sms_exchange_dead";// 定义队列的名称publicstaticfinalString SMS_QUEUE_DEAD ="sms_queue_dead";// 统一定义路由keypublicstaticfinalString ROUTING_KEY_SMS_DEAD ="dead.sms.display";// 创建交换机@Bean(SMS_EXCHANGE_DEAD)publicExchangeexchange(){returnExchangeBuilder.topicExchange(SMS_EXCHANGE_DEAD).durable(true).build();}// 创建队列@Bean(SMS_QUEUE_DEAD)publicQueuequeue(){returnQueueBuilder.durable(SMS_QUEUE_DEAD).build();}// 创建绑定关系@BeanpublicBindingsmsDeadBinding(@Qualifier(SMS_EXCHANGE_DEAD)Exchange exchange,@Qualifier(SMS_QUEUE_DEAD)Queue queue){returnBindingBuilder.bind(queue).to(exchange).with("dead.sms.*").noargs();}}

正常队列配置类中添加死信队列配置
在这里插入图片描述

packagecom.imooc.api.mq;importorg.springframework.amqp.core.*;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/**
 * RabbitMQ 的配置类
 */@ConfigurationpublicclassRabbitMQSMSConfig{// 定义交换机的名称publicstaticfinalString SMS_EXCHANGE ="sms_exchange";// 定义队列的名称publicstaticfinalString SMS_QUEUE ="sms_queue";// 统一定义路由keypublicstaticfinalString ROUTING_KEY_SMS_SEND_LOGIN ="imooc.sms.send.login";// 创建交换机@Bean(SMS_EXCHANGE)publicExchangeexchange(){returnExchangeBuilder.topicExchange(SMS_EXCHANGE).durable(true).build();}// 创建队列@Bean(SMS_QUEUE)publicQueuequeue(){returnQueueBuilder.durable(SMS_QUEUE).withArgument("x-message-ttl",30*1000)// 队列中消息过期时间,过期进入死信队列.withArgument("x-dead-letter-exchange",RabbitMQSMSConfig_Dead.SMS_EXCHANGE_DEAD)// 死信队列交换机.withArgument("x-dead-letter-routing-key",RabbitMQSMSConfig_Dead.ROUTING_KEY_SMS_DEAD)// 死信队列路由.withArgument("x-max-length",6)// 队列消息个数最大值,超出的进入死信队列.build();}// 创建绑定关系@BeanpublicBindingsmsBinding(@Qualifier(SMS_EXCHANGE)Exchange exchange,@Qualifier(SMS_QUEUE)Queue queue){returnBindingBuilder.bind(queue).to(exchange).with("imooc.sms.#").noargs();}}

死信队列消费者

packagecom.imooc.mq;importcom.imooc.api.mq.RabbitMQSMSConfig_Dead;importcom.rabbitmq.client.Channel;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;/**
 * 死信队列监听消费者
 */@Slf4j@ComponentpublicclassRabbitMQSMSConsumer_Dead{/**
     *
     * @param message
     * @param channel
     * @throws Exception
     */@RabbitListener(queues ={RabbitMQSMSConfig_Dead.SMS_QUEUE_DEAD})publicvoidwatchQueue(Message message,Channel channel)throwsException{

        log.info("++++++++++++++++++++++++++++++");String routingKey = message.getMessageProperties().getReceivedRoutingKey();
        log.info("routingKey = "+ routingKey);String msg =newString(message.getBody());
        log.info("msg = "+ msg);

        channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);

        log.info("++++++++++++++++++++++++++++++");}}

测试方法:正常的消费者关闭,设置队列长度为6,然后发10条消息,会发现,首先有4条超过队列长度的消息,进入死信队列消费者,然后,到了消息过期时间后,又有6条消息进入死信队列中被死信消费者消费
在这里插入图片描述


本文转载自: https://blog.csdn.net/ZHUSHANGLIN/article/details/131070363
版权归原作者 Sunny_yiyi 所有, 如有侵权,请联系我们删除。

“RabbitMQ笔记”的评论:

还没有评论