0


RabbitMQ的两个简单示例

在实际应用中,消息队列常用于实现异步通信和解耦。例如,当商品上架时,将商品信息发送到消息队列,并由消费者处理这些消息。这可以用来完成各种任务,如更新库存、通知用户或记录日志等。

下面是一个详细的简单收发的示例,展示了如何在商品上架时使用 RabbitMQ 发送消息,包括生产者和消费者的实现。

步骤概述

  1. 添加 Maven 依赖。
  2. 配置 RabbitMQ。
  3. 实现生产者:当商品上架时发送消息。
  4. 实现消费者:从队列中接收并处理消息。

1. 添加 Maven 依赖

pom.xml

文件中添加 RabbitMQ 客户端依赖:

<dependencies><!-- RabbitMQ Java client --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.12.0</version></dependency><!-- SLF4J for logging --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.30</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.30</version></dependency></dependencies>

2. RabbitMQ 配置

application.yml

application.properties

中配置 RabbitMQ 连接属性(可选)。

spring:rabbitmq:host: localhost
    port:5672username: guest
    password: guest

3. 实现生产者

商品上架事件类

首先定义一个商品上架事件类,用于传递商品信息:

publicclassProduct{privateString id;privateString name;privatedouble price;// Constructors, Getters, SetterspublicProduct(String id,String name,double price){this.id = id;this.name = name;this.price = price;}publicStringgetId(){return id;}publicvoidsetId(String id){this.id = id;}publicStringgetName(){return name;}publicvoidsetName(String name){this.name = name;}publicdoublegetPrice(){return price;}publicvoidsetPrice(double price){this.price = price;}@OverridepublicStringtoString(){return"Product{"+"id='"+ id +'\''+", name='"+ name +'\''+", price="+ price +'}';}}
生产者类

实现一个生产者类,当有商品上架时发送消息到 RabbitMQ:

importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importcom.fasterxml.jackson.databind.ObjectMapper;publicclassProductProducer{privatefinalstaticStringEXCHANGE_NAME="product_exchange";privatefinalstaticStringROUTING_KEY="product.add";publicstaticvoidmain(String[] argv)throwsException{// 创建连接工厂ConnectionFactory factory =newConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");// 创建连接和频道try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){// 声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME,"topic");// 创建商品并发送上架消息Product product =newProduct("123","Laptop",1200.00);String message =newObjectMapper().writeValueAsString(product);// 发送消息到交换机
            channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,null, message.getBytes());System.out.println(" [x] Sent '"+ message +"'");}}}

4. 实现消费者

消费者从 RabbitMQ 队列中接收并处理消息:

importcom.rabbitmq.client.*;publicclassProductConsumer{privatefinalstaticStringQUEUE_NAME="product_queue";privatefinalstaticStringEXCHANGE_NAME="product_exchange";privatefinalstaticStringROUTING_KEY="product.add";publicstaticvoidmain(String[] argv)throwsException{// 创建连接工厂ConnectionFactory factory =newConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");// 创建连接和频道try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){// 声明队列
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);// 绑定队列到交换机
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 回调方法,当有消息到达时调用DeliverCallback deliverCallback =(consumerTag, delivery)->{String message =newString(delivery.getBody(),"UTF-8");System.out.println(" [x] Received '"+ message +"'");// 可以在这里处理接收到的消息,例如更新库存、通知用户等};
            channel.basicConsume(QUEUE_NAME,true, deliverCallback, consumerTag ->{});}}}

解释

  1. Exchange 和 Routing Key:使用交换机(EXCHANGE_NAME)和路由键(ROUTING_KEY)将消息从生产者发送到指定队列。
  2. ObjectMapper:使用 Jackson 的 ObjectMapper 将商品对象序列化为 JSON 字符串,在发送和接收消息时进行转换。
  3. Queue 和 Binding:在消费者中声明队列并将其绑定到交换机,确保能够收到相应的消息。

测试

  1. 启动 RabbitMQ 服务,你可以使用 Docker 启动:docker run -d--hostname my-rabbit --name rabbitmq -p5672:5672 -p15672:15672 rabbitmq:3-management
  2. 启动消费者。
  3. 启动生产者,上架商品并发送消息。

消费者将会接收到来自生产者的消息,并在控制台进行打印:

[*] Waiting for messages. To exit press CTRL+C
[x] Received '{"id":"123","name":"Laptop","price":1200.0}'

通过这种方式,可以将商品上架的事件异步发送到 RabbitMQ 消息队列进行处理,从而实现系统之间的解耦和异步通信。

另外RabbitMQ 还能实现延时消费的功能。如实现订单过期未付款的处理示例,可以用RabbitMQ的延时队列或者死信队列(Dead-Letter Queue,简称DLQ)功能。下面是一个详细的示例,展示了如何使用RabbitMQ创建一个延时队列来处理订单过期未付款的情况。

思路概述

  1. 创建一个普通队列,用于接收订单创建的消息。
  2. 创建一个死信交换机和死信队列,用于接收延时队列中未被及时处理的消息,即过期未付款的订单。
  3. 在普通队列中设置消息的TTL(Time to Live)。
  4. 当消息在普通队列中超时后,会被转发到死信队列。

实现生产者

订单类

首先定义一个订单类,用于传递订单信息:

publicclassOrder{privateString orderId;privateString product;privatedouble amount;// Constructors, Getters, SetterspublicOrder(String orderId,String product,double amount){this.orderId = orderId;this.product = product;this.amount = amount;}publicStringgetOrderId(){return orderId;}publicvoidsetOrderId(String orderId){this.orderId = orderId;}publicStringgetProduct(){return product;}publicvoidsetProduct(String product){this.product = product;}publicdoublegetAmount(){return amount;}publicvoidsetAmount(double amount){this.amount = amount;}@OverridepublicStringtoString(){return"Order{"+"orderId='"+ orderId +'\''+", product='"+ product +'\''+", amount="+ amount +'}';}}
生产者类

实现一个生产者类,当有新订单创建时发送消息到RabbitMQ:

importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.ConnectionFactory;importcom.fasterxml.jackson.databind.ObjectMapper;importjava.util.HashMap;importjava.util.Map;publicclassOrderProducer{privatefinalstaticStringEXCHANGE_NAME="order_exchange";privatefinalstaticStringQUEUE_NAME="order_queue";privatefinalstaticStringDLX_EXCHANGE_NAME="dlx_order_exchange";privatefinalstaticStringDLX_QUEUE_NAME="dlx_order_queue";publicstaticvoidmain(String[] args)throwsException{// 创建连接工厂ConnectionFactory factory =newConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");// 创建连接和频道try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){// 声明死信交换机和死信队列
            channel.exchangeDeclare(DLX_EXCHANGE_NAME,"direct");
            channel.queueDeclare(DLX_QUEUE_NAME,false,false,false,null);
            channel.queueBind(DLX_QUEUE_NAME,DLX_EXCHANGE_NAME,"dlx_order");// 声明交换机和队列,并设置生存时间和死信交换机Map<String,Object> argsMap =newHashMap<>();
            argsMap.put("x-message-ttl",60000);// 设置消息TTL为60秒
            argsMap.put("x-dead-letter-exchange",DLX_EXCHANGE_NAME);// 设置死信交换机
            argsMap.put("x-dead-letter-routing-key","dlx_order");

            channel.exchangeDeclare(EXCHANGE_NAME,"direct");
            channel.queueDeclare(QUEUE_NAME,false,false,false, argsMap);
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"order");// 创建订单并发送消息到普通队列Order order =newOrder("123","Laptop",1200.00);String message =newObjectMapper().writeValueAsString(order);// 发送消息到交换机
            channel.basicPublish(EXCHANGE_NAME,"order",null, message.getBytes());System.out.println(" [x] Sent '"+ message +"'");}}}

实现消费者

消费者从死信队列中接收并处理过期未付款的订单:

importcom.rabbitmq.client.*;importcom.fasterxml.jackson.databind.ObjectMapper;publicclassOrderConsumer{privatefinalstaticStringDLX_QUEUE_NAME="dlx_order_queue";publicstaticvoidmain(String[] argv)throwsException{// 创建连接工厂ConnectionFactory factory =newConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");// 创建连接和频道try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 回调方法,当有消息到达时调用DeliverCallback deliverCallback =(consumerTag, delivery)->{String message =newString(delivery.getBody(),"UTF-8");// 反序列化消息内容ObjectMapper objectMapper =newObjectMapper();Order order = objectMapper.readValue(message,Order.class);System.out.println(" [x] Received '"+ order +"'");// 处理过期未付款的订单,例如取消订单、通知用户等handleExpiredOrder(order);};

            channel.basicConsume(DLX_QUEUE_NAME,true, deliverCallback, consumerTag ->{});}}// 模拟处理过期未付款的订单privatestaticvoidhandleExpiredOrder(Order order){System.out.println("Handling expired order: "+ order.getOrderId());// 这里可以添加取消订单、通知用户等逻辑}}

解释

  1. 交换机和死信机制:- 创建了一个普通队列 order_queue,并为其设置了TTL(Time to Live)属性。- 当消息在普通队列中超时后,会转发到死信队列 dlx_order_queue 中进行处理。
  2. 生产者发送消息:- 将订单消息发送到普通交换机 order_exchange,通过路由键 order 转发到普通队列 order_queue。- 设置消息TTL为60秒,即消息在队列中存活60秒后,如果没有被消费,会转发到死信队列。
  3. 消费者处理消息:- 消费者从死信队列 dlx_order_queue 中接收过期未付款的订单。- 反序列化订单消息,并在控制台输出并处理。

测试

  1. 启动RabbitMQ服务,可以使用Docker进行启动:docker run -d--hostname my-rabbit --name rabbitmq -p5672:5672 -p15672:15672 rabbitmq:3-management
  2. 启动消费者。
  3. 启动生产者,创建订单并发送消息。

消费者将会在消息TTL过期后,从死信队列中接收到过期未付款的订单,并在控制台进行日志输出:

[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Order{orderId='123', product='Laptop', amount=1200.0}'
Handling expired order: 123

通过这种方式,可以有效地管理和处理订单过期未付款的情况。

标签: rabbitmq ruby 分布式

本文转载自: https://blog.csdn.net/qq_40592590/article/details/140094992
版权归原作者 青春丨猪头丨少年 所有, 如有侵权,请联系我们删除。

“RabbitMQ的两个简单示例”的评论:

还没有评论