依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.16.0</version></dependency>
简单模式
生产者
packagecom.imooc.mq;importcom.rabbitmq.client.AMQP;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;/**
* 构建简单模式的生产者,发送消息
*/publicclassFooProducer{publicstaticvoidmain(String[] args)throwsException{// 1. 创建连接工厂以及相关的参数配置ConnectionFactory factory =newConnectionFactory();
factory.setHost("192.168.233.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");// 2. 通过工程创建连接 ConnectionConnection connection = factory.newConnection();// 3. 创建管道 ChannelChannel channel = connection.createChannel();// 4. 创建队列 Queue(简单模式不需要交换机Exchange)/**
* queue: 队列名
* durable: 是否持久化,true:重启之后,队列依然存在,false则不存在
* exclusive: 是否独占,true:只能有一个消费者监听这个队列,一般设置为false
* autoDelete: 是否自动删除,true:当没有消费者的时候,则自动删除这个队列
* arguments: map类型的其他参数
*/
channel.queueDeclare("hello",true,false,false,null);// 5. 向队列发送消息/**
* exchange: 交换机的名称,简单模式下没有,所以直接设置为 ""
* routingKey: 路由key,映射路径,如果交换机没有,则路由key和队列名保持一致
* props: 配置参数
* body: 消息数据
*/String msg ="Hello 慕课网~~~";
channel.basicPublish("","hello",null, msg.getBytes());// 6. 释放资源
channel.close();
connection.close();}}
消费者
packagecom.imooc.mq;importcom.rabbitmq.client.*;importjava.io.IOException;/**
* 构建简单模式的消费者,监听消费消息
*/publicclassFooConsumer{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
factory.setHost("192.168.233.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();
channel.queueDeclare("hello",true,false,false,null);Consumer consumer =newDefaultConsumer(channel){/**
* 重写消息配送方法
* @param consumerTag 消息的标签(标识)
* @param envelope 信封(一些信息,比如交换机路由等等信息)
* @param properties 配置信息
* @param body 收到的消息数据
* @throws IOException
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("consumerTag = "+ consumerTag);System.out.println("envelope = "+ envelope.toString());System.out.println("BasicProperties = "+ properties.toString());System.out.println("body = "+newString(body));}};/**
* queue: 监听的队列名
* autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
* callback: 回调函数,处理监听到的消息
*/
channel.basicConsume("hello",true, consumer);}}
工作队列模式
生产者
packagecom.imooc.mq;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;/**
* 构建工作队列的生产者,发送消息
*/publicclassWorkQueuesProducer{publicstaticvoidmain(String[] args)throwsException{// 1. 创建连接工厂以及相关的参数配置ConnectionFactory factory =newConnectionFactory();
factory.setHost("192.168.233.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");// 2. 通过工程创建连接 ConnectionConnection connection = factory.newConnection();// 3. 创建管道 ChannelChannel channel = connection.createChannel();// 4. 创建队列 Queue(简单模式不需要交换机Exchange)/**
* queue: 队列名
* durable: 是否持久化,true:重启之后,队列依然存在,false则不存在
* exclusive: 是否独占,true:只能有一个消费者监听这个队列,一般设置为false
* autoDelete: 是否自动删除,true:当没有消费者的时候,则自动删除这个队列
* arguments: map类型的其他参数
*/
channel.queueDeclare("work_queue",true,false,false,null);// 5. 向队列发送消息/**
* exchange: 交换机的名称,简单模式下没有,所以直接设置为 ""
* routingKey: 路由key,映射路径,如果交换机没有,则路由key和队列名保持一致
* props: 配置参数
* body: 消息数据
*/for(int i =0; i <10; i ++){String task ="开始上班,搬砖喽~ 开启任务["+ i +"]";
channel.basicPublish("","work_queue",null, task.getBytes());}// 6. 释放资源
channel.close();
connection.close();}}
消费者A
packagecom.imooc.mq;importcom.rabbitmq.client.*;importjava.io.IOException;/**
* 构建工作队列模式的消费者,监听消费消息
*/publicclassWorkQueuesConsumerA{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
factory.setHost("192.168.233.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();
channel.queueDeclare("work_queue",true,false,false,null);Consumer consumer =newDefaultConsumer(channel){/**
* 重写消息配送方法
* @param consumerTag 消息的标签(标识)
* @param envelope 信封(一些信息,比如交换机路由等等信息)
* @param properties 配置信息
* @param body 收到的消息数据
* @throws IOException
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body = "+newString(body));}};/**
* queue: 监听的队列名
* autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
* callback: 回调函数,处理监听到的消息
*/
channel.basicConsume("work_queue",true, consumer);}}
消费者B
packagecom.imooc.mq;importcom.rabbitmq.client.*;importjava.io.IOException;/**
* 构建工作队列模式的消费者,监听消费消息
*/publicclassWorkQueuesConsumerB{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
factory.setHost("192.168.233.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();
channel.queueDeclare("work_queue",true,false,false,null);Consumer consumer =newDefaultConsumer(channel){/**
* 重写消息配送方法
* @param consumerTag 消息的标签(标识)
* @param envelope 信封(一些信息,比如交换机路由等等信息)
* @param properties 配置信息
* @param body 收到的消息数据
* @throws IOException
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body = "+newString(body));}};/**
* queue: 监听的队列名
* autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
* callback: 回调函数,处理监听到的消息
*/
channel.basicConsume("work_queue",true, consumer);}}
发布订阅模式
生产者
packagecom.imooc.mq;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;/**
* 构建发布订阅模式的生产者,发送消息
*/publicclassPubSubProducer{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
factory.setHost("192.168.233.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 创建交换机 Exchange/**
* exchange: 交换机的名称
* type: 交换机的类型
* FANOUT("fanout"): 广播模式,发布订阅,把消息发送给所有的绑定的队列
* DIRECT("direct"): 定向投递模式,把消息发送给指定的“routing key”的队列
* TOPIC("topic"): 通配符模式,把消息发送给符合的“routing pattern”的队列
* HEADERS("headers"): 使用率不多,参数匹配
* durable: 是否持久化
* autoDelete: 是否自动删除
* internal: 内部意思,true:表示当前exchange是rabbitmq内部使用的,用户创建的队列不会消费该类型交换机下的消息,所以我们一般使用false即可
* arguments: map类型的参数
*/String fanout_exchange ="fanout_exchange";
channel.exchangeDeclare(fanout_exchange,BuiltinExchangeType.FANOUT,true,false,false,null);// 定义两个队列String fanout_queue_a ="fanout_queue_a";String fanout_queue_b ="fanout_queue_b";
channel.queueDeclare(fanout_queue_a,true,false,false,null);
channel.queueDeclare(fanout_queue_b,true,false,false,null);// 绑定交换机和队列
channel.queueBind(fanout_queue_a, fanout_exchange,"");
channel.queueBind(fanout_queue_b, fanout_exchange,"");for(int i =0; i <10; i ++){String task ="开始上班,搬砖喽~ 开启任务["+ i +"]";
channel.basicPublish(fanout_exchange,"",null, task.getBytes());}
channel.close();
connection.close();}}
消费者A
packagecom.imooc.mq;importcom.rabbitmq.client.*;importjava.io.IOException;/**
* 构建发布订阅模式的消费者,监听消费消息
*/publicclassPubSubConsumerA{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
factory.setHost("192.168.233.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();String fanout_queue_a ="fanout_queue_a";
channel.queueDeclare(fanout_queue_a,true,false,false,null);Consumer consumer =newDefaultConsumer(channel){/**
* 重写消息配送方法
* @param consumerTag 消息的标签(标识)
* @param envelope 信封(一些信息,比如交换机路由等等信息)
* @param properties 配置信息
* @param body 收到的消息数据
* @throws IOException
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body = "+newString(body));}};/**
* queue: 监听的队列名
* autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
* callback: 回调函数,处理监听到的消息
*/
channel.basicConsume(fanout_queue_a,true, consumer);}}
消费者B
packagecom.imooc.mq;importcom.rabbitmq.client.*;importjava.io.IOException;/**
* 构建发布订阅模式的消费者,监听消费消息
*/publicclassPubSubConsumerB{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
factory.setHost("192.168.233.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();String fanout_queue_b ="fanout_queue_b";
channel.queueDeclare(fanout_queue_b,true,false,false,null);Consumer consumer =newDefaultConsumer(channel){/**
* 重写消息配送方法
* @param consumerTag 消息的标签(标识)
* @param envelope 信封(一些信息,比如交换机路由等等信息)
* @param properties 配置信息
* @param body 收到的消息数据
* @throws IOException
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body = "+newString(body));}};/**
* queue: 监听的队列名
* autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
* callback: 回调函数,处理监听到的消息
*/
channel.basicConsume(fanout_queue_b,true, consumer);}}
路由(定向)模式
生产者
packagecom.imooc.mq;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;/**
* 构建路由(定向)模式的生产者,发送消息
*/publicclassRoutingProducer{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
factory.setHost("192.168.233.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 创建交换机 Exchange/**
* exchange: 交换机的名称
* type: 交换机的类型
* FANOUT("fanout"): 广播模式,发布订阅,把消息发送给所有的绑定的队列
* DIRECT("direct"): 定向投递模式,把消息发送给指定的“routing key”的队列
* TOPIC("topic"): 通配符模式,把消息发送给符合的“routing pattern”的队列
* HEADERS("headers"): 使用率不多,参数匹配
* durable: 是否持久化
* autoDelete: 是否自动删除
* internal: 内部意思,true:表示当前exchange是rabbitmq内部使用的,用户创建的队列不会消费该类型交换机下的消息,所以我们一般使用false即可
* arguments: map类型的参数
*/String routing_exchange ="routing_exchange";
channel.exchangeDeclare(routing_exchange,BuiltinExchangeType.DIRECT,true,false,false,null);// 定义两个队列String routing_queue_order ="routing_queue_order";String routing_queue_pay ="routing_queue_pay";
channel.queueDeclare(routing_queue_order,true,false,false,null);
channel.queueDeclare(routing_queue_pay,true,false,false,null);// 绑定交换机和队列
channel.queueBind(routing_queue_order, routing_exchange,"order_create");
channel.queueBind(routing_queue_order, routing_exchange,"order_delete");
channel.queueBind(routing_queue_order, routing_exchange,"order_update");
channel.queueBind(routing_queue_pay, routing_exchange,"order_pay");String msg1 ="创建订单A";String msg2 ="创建订单B";String msg3 ="删除订单C";String msg4 ="修改订单D";String msg5 ="支付订单E";String msg6 ="支付订单F";
channel.basicPublish(routing_exchange,"order_create",null, msg1.getBytes());
channel.basicPublish(routing_exchange,"order_create",null, msg2.getBytes());
channel.basicPublish(routing_exchange,"order_delete",null, msg3.getBytes());
channel.basicPublish(routing_exchange,"order_update",null, msg4.getBytes());
channel.basicPublish(routing_exchange,"order_pay",null, msg5.getBytes());
channel.basicPublish(routing_exchange,"order_pay",null, msg6.getBytes());
channel.close();
connection.close();}}
消费者A
packagecom.imooc.mq;importcom.rabbitmq.client.*;importjava.io.IOException;/**
* 构建路由模式的消费者,监听消费消息
*/publicclassRoutingOrderConsumerA{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
factory.setHost("192.168.233.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();String routing_queue_order ="routing_queue_order";
channel.queueDeclare(routing_queue_order,true,false,false,null);Consumer consumer =newDefaultConsumer(channel){/**
* 重写消息配送方法
* @param consumerTag 消息的标签(标识)
* @param envelope 信封(一些信息,比如交换机路由等等信息)
* @param properties 配置信息
* @param body 收到的消息数据
* @throws IOException
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body = "+newString(body));}};/**
* queue: 监听的队列名
* autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
* callback: 回调函数,处理监听到的消息
*/
channel.basicConsume(routing_queue_order,true, consumer);}}
消费者B
packagecom.imooc.mq;importcom.rabbitmq.client.*;importjava.io.IOException;/**
* 构建路由模式的消费者,监听消费消息
*/publicclassRoutingPayConsumerB{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
factory.setHost("192.168.233.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();String routing_queue_pay ="routing_queue_pay";
channel.queueDeclare(routing_queue_pay,true,false,false,null);Consumer consumer =newDefaultConsumer(channel){/**
* 重写消息配送方法
* @param consumerTag 消息的标签(标识)
* @param envelope 信封(一些信息,比如交换机路由等等信息)
* @param properties 配置信息
* @param body 收到的消息数据
* @throws IOException
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body = "+newString(body));}};/**
* queue: 监听的队列名
* autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
* callback: 回调函数,处理监听到的消息
*/
channel.basicConsume(routing_queue_pay,true, consumer);}}
通配符匹配模式 topic
生产者
packagecom.imooc.mq;importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;/**
* 构建通配符模式的生产者,发送消息
*/publicclassTopicsProducer{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
factory.setHost("192.168.233.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 创建交换机 Exchange/**
* exchange: 交换机的名称
* type: 交换机的类型
* FANOUT("fanout"): 广播模式,发布订阅,把消息发送给所有的绑定的队列
* DIRECT("direct"): 定向投递模式,把消息发送给指定的“routing key”的队列
* TOPIC("topic"): 通配符模式,把消息发送给符合的“routing pattern”的队列
* HEADERS("headers"): 使用率不多,参数匹配
* durable: 是否持久化
* autoDelete: 是否自动删除
* internal: 内部意思,true:表示当前exchange是rabbitmq内部使用的,用户创建的队列不会消费该类型交换机下的消息,所以我们一般使用false即可
* arguments: map类型的参数
*/String topics_exchange ="topics_exchange";
channel.exchangeDeclare(topics_exchange,BuiltinExchangeType.TOPIC,true,false,false,null);// 定义两个队列String topics_queue_order ="topics_queue_order";String topics_queue_pay ="topics_queue_pay";
channel.queueDeclare(topics_queue_order,true,false,false,null);
channel.queueDeclare(topics_queue_pay,true,false,false,null);// 绑定交换机和队列 *表示通配一个词,#表示通配一个或多个词
channel.queueBind(topics_queue_order, topics_exchange,"order.*");
channel.queueBind(topics_queue_pay, topics_exchange,"*.pay.#");String msg1 ="创建订单A";String msg2 ="创建订单B";String msg3 ="删除订单C";String msg4 ="修改订单D";String msg5 ="支付订单E";String msg6 ="超市订单F";String msg7 ="慕课订单G";
channel.basicPublish(topics_exchange,"order.create",null, msg1.getBytes());
channel.basicPublish(topics_exchange,"order.create",null, msg2.getBytes());
channel.basicPublish(topics_exchange,"order.delete",null, msg3.getBytes());
channel.basicPublish(topics_exchange,"order.update",null, msg4.getBytes());// order.pay 能匹配到两个队列,注意看两个队列是否都能消费到
channel.basicPublish(topics_exchange,"order.pay",null, msg5.getBytes());
channel.basicPublish(topics_exchange,"imooc.pay.super.market",null, msg6.getBytes());
channel.basicPublish(topics_exchange,"imooc.pay.course",null, msg7.getBytes());
channel.close();
connection.close();}}
消费者A
packagecom.imooc.mq;importcom.rabbitmq.client.*;importjava.io.IOException;/**
* 构建通配符模式的消费者,监听消费消息
*/publicclassTopicsOrderConsumerA{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
factory.setHost("192.168.233.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();String topics_queue_order ="topics_queue_order";
channel.queueDeclare(topics_queue_order,true,false,false,null);Consumer consumer =newDefaultConsumer(channel){/**
* 重写消息配送方法
* @param consumerTag 消息的标签(标识)
* @param envelope 信封(一些信息,比如交换机路由等等信息)
* @param properties 配置信息
* @param body 收到的消息数据
* @throws IOException
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body = "+newString(body));}};/**
* queue: 监听的队列名
* autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
* callback: 回调函数,处理监听到的消息
*/
channel.basicConsume(topics_queue_order,true, consumer);}}
消费者B
packagecom.imooc.mq;importcom.rabbitmq.client.*;importjava.io.IOException;/**
* 构建通配符模式的消费者,监听消费消息
*/publicclassTopicsPayConsumerB{publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
factory.setHost("192.168.233.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();String topics_queue_pay ="topics_queue_pay";
channel.queueDeclare(topics_queue_pay,true,false,false,null);Consumer consumer =newDefaultConsumer(channel){/**
* 重写消息配送方法
* @param consumerTag 消息的标签(标识)
* @param envelope 信封(一些信息,比如交换机路由等等信息)
* @param properties 配置信息
* @param body 收到的消息数据
* @throws IOException
*/@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{System.out.println("body = "+newString(body));}};/**
* queue: 监听的队列名
* autoAck: 是否自动确认,true:告知mq消费者已经消费的确认通知
* callback: 回调函数,处理监听到的消息
*/
channel.basicConsume(topics_queue_pay,true, consumer);}}
SpringBoot集成RabbitMQ
依赖(生产和消费)
<!--SpringBoot 整合RabbitMQ 依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
配置类 (生产者)
packagecom.imooc.api.mq;importorg.springframework.amqp.core.*;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/**
* RabbitMQ 的配置类
*/@ConfigurationpublicclassRabbitMQSMSConfig{// 定义交换机的名称publicstaticfinalString SMS_EXCHANGE ="sms_exchange";// 定义队列的名称publicstaticfinalString SMS_QUEUE ="sms_queue";// 统一定义路由keypublicstaticfinalString ROUTING_KEY_SMS_SEND_LOGIN ="imooc.sms.send.login";// 创建交换机@Bean(SMS_EXCHANGE)publicExchangeexchange(){returnExchangeBuilder.topicExchange(SMS_EXCHANGE).durable(true).build();}// 创建队列@Bean(SMS_QUEUE)publicQueuequeue(){// return new Queue(SMS_QUEUE);returnQueueBuilder.durable(SMS_QUEUE).build();}// 创建绑定关系@BeanpublicBindingsmsBinding(@Qualifier(SMS_EXCHANGE)Exchange exchange,@Qualifier(SMS_QUEUE)Queue queue){returnBindingBuilder.bind(queue).to(exchange).with("imooc.sms.#").noargs();}}
yml配置(生产和消费)
spring:
rabbitmq:
host:192.168.233.128
port:5672
virtual-host:/
username: guest
password: guest
生产者
@AutowiredprivateRabbitTemplate rabbitTemplate;@PostMapping("getSMSCode")publicGraceJSONResultgetSMSCode(String mobile,HttpServletRequest request)throwsException{if(StringUtils.isBlank(mobile)){returnGraceJSONResult.error();}// 获得用户ipString userIp =IPUtil.getRequestIp(request);// 限制用户只能在60s以内获得一次验证码
redis.setnx60s(MOBILE_SMSCODE +":"+ userIp, mobile);String code =(int)((Math.random()*9+1)*100000)+"";// 使用消息队列异步解耦发送短信SMSContentQO contentQO =newSMSContentQO();
contentQO.setMobile(mobile);
contentQO.setContent(code);
rabbitTemplate.convertAndSend("sms_exchange","imooc.sms.send.login",GsonUtils.object2String(contentQO));
GsonUtils
packagecom.imooc.utils;importcom.google.gson.*;importcom.google.gson.reflect.TypeToken;importjava.util.ArrayList;importjava.util.List;importjava.util.Map;publicclassGsonUtils{/**
* 不用创建对象,直接使用Gson.就可以调用方法
*/privatestaticGson gson =null;privatestaticJsonParser jsonParser =null;/**
* 判断gson对象是否存在了,不存在则创建对象
*/static{if(gson ==null){//gson = new Gson();// 当使用GsonBuilder方式时属性为空的时候输出来的json字符串是有键值key的,显示形式是"key":null,而直接new出来的就没有"key":null的
gson =newGsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create();}if(jsonParser ==null){
jsonParser =newJsonParser();}}privateGsonUtils(){}/**
* json 转对象
* @param strJson
* @return
*/publicstaticJsonObjectstring2Object(String strJson){return jsonParser.parse(strJson).getAsJsonObject();}/**
* 将对象转成json格式
* @param object
* @return String
*/publicstaticStringobject2String(Object object){String gsonString =null;if(gson !=null){
gsonString = gson.toJson(object);}return gsonString;}/**
* 将json转成特定的cls的对象
* @param gsonString
* @param cls
* @return
*/publicstatic<T>TstringToBean(String gsonString,Class<T> cls){T t =null;if(gson !=null){//传入json对象和对象类型,将json转成对象
t = gson.fromJson(gsonString, cls);}return t;}// public static <T> T stringToBean2(String gsonString, Class<T> cls) {//// JsonParser jsonParser = new JsonParser();// JsonObject jsonObject = jsonParser.parse(gsonString).getAsJsonObject();//// T t = null;// if (gson != null) {// //传入json对象和对象类型,将json转成对象// t = gson.fromJson(jsonObject, cls);// }// return t;// }/**
* json字符串转成list
* @param gsonString
* @param cls
* @return
*/publicstatic<T>List<T>stringToList(String gsonString,Class<T> cls){List<T> list =null;if(gson !=null){//根据泛型返回解析指定的类型,TypeToken<List<T>>{}.getType()获取返回类型
list = gson.fromJson(gsonString,newTypeToken<List<T>>(){}.getType());}return list;}publicstatic<T>List<T>stringToListAnother(String gsonString,Class<T> cls){List<T> list =newArrayList<>();JsonArray jsonArray =newJsonParser().parse(gsonString).getAsJsonArray();Gson gson =newGson();for(JsonElement jsonElement : jsonArray){
list.add(gson.fromJson(jsonElement,cls));}return list;}/**
* json字符串转成list中有map的
* @param gsonString
* @return
*/publicstatic<T>List<Map<String,T>>stringToListMaps(String gsonString){List<Map<String,T>> list =null;if(gson !=null){
list = gson.fromJson(gsonString,newTypeToken<List<Map<String,T>>>(){}.getType());}return list;}/**
* json字符串转成map的
* @param gsonString
* @return
*/publicstatic<T>Map<String,T>stringToMaps(String gsonString){Map<String,T> map =null;if(gson !=null){
map = gson.fromJson(gsonString,newTypeToken<Map<String,T>>(){}.getType());}return map;}publicstaticStringjsonElementAsString(JsonElement jsonElement){return jsonElement ==null?null: jsonElement.getAsString();}publicstaticIntegerjsonElementAsInt(JsonElement jsonElement){return jsonElement ==null?null: jsonElement.getAsInt();}}
消费者
/**
* 监听队列,并且处理消息
* @param payload
* @param message
*/@RabbitListener(queues ={RabbitMQSMSConfig.SMS_QUEUE})publicvoidwatchQueue(String payload,Message message)throwsException{
log.info("payload = "+ payload);String routingKey = message.getMessageProperties().getReceivedRoutingKey();
log.info("routingKey = "+ routingKey);String msg = payload;
log.info("msg = "+ msg);if(routingKey.equalsIgnoreCase(RabbitMQSMSConfig.ROUTING_KEY_SMS_SEND_LOGIN)){// 此处为短信发送的消息消费处理SMSContentQO contentQO =GsonUtils.stringToBean(msg,SMSContentQO.class);// smsUtils.sendSMS(contentQO.getMobile(), contentQO.getContent());}}
生产端消息可靠性机制
setConfirmCallback交换机确认接收到消息机制
配置文件:
rabbitmq:
host:192.168.233.128
port:5672
virtual-host:/
username: guest
password: guest
publisher-confirm-type: correlated
生产者实现
rabbitTemplate.convertAndSend("sms_exchange","imooc.sms.send.login",GsonUtils.object2String(contentQO),newCorrelationData(UUID.randomUUID().toString()));// 定义confirm回调
rabbitTemplate.setConfirmCallback(newRabbitTemplate.ConfirmCallback(){/**
* 回调函数
* @param correlationData 相关性数据
* @param ack 交换机是否成功接收到消息,true:成功
* @param cause 失败的原因
*/@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){
log.info("进入confirm");
log.info("correlationData:{}", correlationData.getId());if(ack){
log.info("交换机成功接收到消息~~ {}", cause);}else{
log.info("交换机接收消息失败~~失败原因: {}", cause);}}});
setReturnsCallback队列未接受到消息回退机制
配置文件
rabbitmq:
host:192.168.233.128
port:5672
virtual-host:/
username: guest
password: guest
publisher-confirm-type: correlated
publisher-returns:true
生产者实现
rabbitTemplate.convertAndSend("sms_exchange","123imooc.sms.send.login",GsonUtils.object2String(contentQO),newCorrelationData(UUID.randomUUID().toString()));// 定义路由匹配不到时候 return回调
rabbitTemplate.setReturnsCallback(newRabbitTemplate.ReturnsCallback(){@OverridepublicvoidreturnedMessage(ReturnedMessage returned){
log.info("进入return");
log.info(returned.toString());}});
这里把路由故意写为"123imooc.sms.send.login" 这匹配不到RabbitMQSMSConfig类中定义的路由,所以从交换机发布到对应的队列中
所以就会进入到setReturnsCallback方法中
消费端消息可靠性
ACK确认机制
none 默认自动确认
manual 手动确认(业务处理完成后手动确认,最常用)
auto 异常类型确认
配置文件
rabbitmq:
host:192.168.233.128
port:5672
virtual-host:/
username: guest
password: guest
listener:
simple:
acknowledge-mode: manual # 手动ack确认
消费者实现
@RabbitListener(queues ={RabbitMQSMSConfig.SMS_QUEUE})publicvoidwatchQueue(Message message,Channel channel)throwsException{try{String routingKey = message.getMessageProperties().getReceivedRoutingKey();
log.info("routingKey = "+ routingKey);int a =1/0;// 模拟异常String msg =newString(message.getBody());
log.info("msg = "+ msg);/**
* long deliveryTag: 消息投递的标签
* boolean multiple: 批量确认所有消费者获得的消息
*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);}catch(Exception e){
e.printStackTrace();/**
* long deliveryTag
* boolean multiple
* boolean requeue: true:重回队列 ;false:丢弃消息
*/
channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);}}
消费端消息拉取限流
配置文件
rabbitmq:
host:192.168.233.128
port:5672
virtual-host:/
username: guest
password: guest
listener:
simple:
acknowledge-mode: manual # 手动ack确认
prefetch:2 # 每次每个消费者从mq中拉去的消息的数量,直到手动ack确认之后,才会拉取下一个消息
模拟场景:
生产者连续发10条消息
for(int i =0; i <10; i ++){
rabbitTemplate.convertAndSend(RabbitMQSMSConfig.SMS_EXCHANGE,RabbitMQSMSConfig.ROUTING_KEY_SMS_SEND_LOGIN,GsonUtils.object2String(contentQO),newCorrelationData(UUID.randomUUID().toString()));}
消费者在手动ack的地方断点
查看RabbitMQ管理平台,每次确实只从队列中拉取2条消息
设置消息过期失效(移除消息)
在RabbitMQSMSConfig配置类中
// 创建队列@Bean(SMS_QUEUE)publicQueuequeue(){returnQueueBuilder.durable(SMS_QUEUE).withArgument("x-message-ttl",30*1000).build();}
.withArgument(“x-message-ttl”, 30*1000) 表示30秒后,消息自动失效
注意:只能对新的队列生效,对老的队列需要先在RabbitMQ管理平台中删除老队列
测试方法
可以先将消费者注释掉,或者不启动消费者
生产者生产的消息,过一段时间在RabbitMQ管理平台看看是否自动消失
死信队列
以下3中消息都会进入死信队列
1、 ttl超时被丢弃的消息
2、超过队列长度被丢弃的消息
3、手动nack或者reject,并且requeue为false的消息
死信队列配置类
packagecom.imooc.api.mq;importorg.springframework.amqp.core.*;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/**
* RabbitMQ 的配置类(用于死信队列的配置)
*/@ConfigurationpublicclassRabbitMQSMSConfig_Dead{// 定义交换机的名称publicstaticfinalString SMS_EXCHANGE_DEAD ="sms_exchange_dead";// 定义队列的名称publicstaticfinalString SMS_QUEUE_DEAD ="sms_queue_dead";// 统一定义路由keypublicstaticfinalString ROUTING_KEY_SMS_DEAD ="dead.sms.display";// 创建交换机@Bean(SMS_EXCHANGE_DEAD)publicExchangeexchange(){returnExchangeBuilder.topicExchange(SMS_EXCHANGE_DEAD).durable(true).build();}// 创建队列@Bean(SMS_QUEUE_DEAD)publicQueuequeue(){returnQueueBuilder.durable(SMS_QUEUE_DEAD).build();}// 创建绑定关系@BeanpublicBindingsmsDeadBinding(@Qualifier(SMS_EXCHANGE_DEAD)Exchange exchange,@Qualifier(SMS_QUEUE_DEAD)Queue queue){returnBindingBuilder.bind(queue).to(exchange).with("dead.sms.*").noargs();}}
正常队列配置类中添加死信队列配置
packagecom.imooc.api.mq;importorg.springframework.amqp.core.*;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/**
* RabbitMQ 的配置类
*/@ConfigurationpublicclassRabbitMQSMSConfig{// 定义交换机的名称publicstaticfinalString SMS_EXCHANGE ="sms_exchange";// 定义队列的名称publicstaticfinalString SMS_QUEUE ="sms_queue";// 统一定义路由keypublicstaticfinalString ROUTING_KEY_SMS_SEND_LOGIN ="imooc.sms.send.login";// 创建交换机@Bean(SMS_EXCHANGE)publicExchangeexchange(){returnExchangeBuilder.topicExchange(SMS_EXCHANGE).durable(true).build();}// 创建队列@Bean(SMS_QUEUE)publicQueuequeue(){returnQueueBuilder.durable(SMS_QUEUE).withArgument("x-message-ttl",30*1000)// 队列中消息过期时间,过期进入死信队列.withArgument("x-dead-letter-exchange",RabbitMQSMSConfig_Dead.SMS_EXCHANGE_DEAD)// 死信队列交换机.withArgument("x-dead-letter-routing-key",RabbitMQSMSConfig_Dead.ROUTING_KEY_SMS_DEAD)// 死信队列路由.withArgument("x-max-length",6)// 队列消息个数最大值,超出的进入死信队列.build();}// 创建绑定关系@BeanpublicBindingsmsBinding(@Qualifier(SMS_EXCHANGE)Exchange exchange,@Qualifier(SMS_QUEUE)Queue queue){returnBindingBuilder.bind(queue).to(exchange).with("imooc.sms.#").noargs();}}
死信队列消费者
packagecom.imooc.mq;importcom.imooc.api.mq.RabbitMQSMSConfig_Dead;importcom.rabbitmq.client.Channel;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;/**
* 死信队列监听消费者
*/@Slf4j@ComponentpublicclassRabbitMQSMSConsumer_Dead{/**
*
* @param message
* @param channel
* @throws Exception
*/@RabbitListener(queues ={RabbitMQSMSConfig_Dead.SMS_QUEUE_DEAD})publicvoidwatchQueue(Message message,Channel channel)throwsException{
log.info("++++++++++++++++++++++++++++++");String routingKey = message.getMessageProperties().getReceivedRoutingKey();
log.info("routingKey = "+ routingKey);String msg =newString(message.getBody());
log.info("msg = "+ msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
log.info("++++++++++++++++++++++++++++++");}}
测试方法:正常的消费者关闭,设置队列长度为6,然后发10条消息,会发现,首先有4条超过队列长度的消息,进入死信队列消费者,然后,到了消息过期时间后,又有6条消息进入死信队列中被死信消费者消费
版权归原作者 Sunny_yiyi 所有, 如有侵权,请联系我们删除。