0


RabbitMQ的四种消息传递模式与演示代码

RabbitMQ的四种消息传递模式与演示代码

RabbitMQ是一个功能强大的消息代理,提供了多种消息传递模式来满足不同场景下的需求。本文将介绍RabbitMQ的四种常用消息传递模式:Work、Fanout、Direct、Topic,并给出相应的Java示例代码。

1. Work模式

Work模式也被称为任务队列模式,它将任务分发给多个消费者,并由消费者竞争性地消费任务。每个任务只会被一个消费者处理。

Java示例代码:
importcom.rabbitmq.client.*;publicclassWorker{privatestaticfinalStringTASK_QUEUE_NAME="task_queue";publicstaticvoidmain(String[] args)throwsException{ConnectionFactory factory =newConnectionFactory();
        factory.setHost("localhost");finalConnection connection = factory.newConnection();finalChannel channel = connection.createChannel();

        channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        channel.basicQos(1);Consumer consumer =newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsIOException{String message =newString(body,"UTF-8");System.out.println(" [x] Received '"+ message +"'");try{doWork(message);}finally{System.out.println(" [x] Done");
                    channel.basicAck(envelope.getDeliveryTag(),false);}}};
        channel.basicConsume(TASK_QUEUE_NAME,false, consumer);}privatestaticvoiddoWork(String task){try{Thread.sleep(1000);}catch(InterruptedException _ignored){Thread.currentThread().interrupt();}}}

应用场景
一个常见的应用场景是在Web应用中实现异步任务处理。例如,用户在网站上提交了一个长时间处理的任务(如生成报表、发送邮件等),为了提高用户体验,可以将任务提交到RabbitMQ的任务队列中,然后由后台的消费者进行异步处理。这样一来,用户在网站上提交任务后即可立即得到响应,而不必等待任务处理完成,提高了系统的响应速度和并发处理能力。

下面是一个使用Work模式的简单示例代码,包括生产者和消费者部分,用Java编写,并添加了详细的注释。

生产者(Producer)代码:

importcom.rabbitmq.client.*;publicclassProducer{privatestaticfinalStringTASK_QUEUE_NAME="task_queue";publicstaticvoidmain(String[] args)throwsException{// 创建连接和频道ConnectionFactory factory =newConnectionFactory();
        factory.setHost("localhost");try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){// 声明队列
            channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);// 构造消息String message =String.join(" ", args);// 发送消息
            channel.basicPublish("",TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));System.out.println(" [x] Sent '"+ message +"'");}}}

消费者(Consumer)代码:

importcom.rabbitmq.client.*;publicclassConsumer{privatestaticfinalStringTASK_QUEUE_NAME="task_queue";publicstaticvoidmain(String[] args)throwsException{// 创建连接和频道ConnectionFactory factory =newConnectionFactory();
        factory.setHost("localhost");finalConnection connection = factory.newConnection();finalChannel channel = connection.createChannel();// 声明队列
        channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 设置消息处理的回调函数
        channel.basicConsume(TASK_QUEUE_NAME,false,newDefaultConsumer(channel){@OverridepublicvoidhandleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throwsjava.io.IOException{String message =newString(body,"UTF-8");// 模拟任务处理过程try{doWork(message);}catch(InterruptedException e){
                    e.printStackTrace();}finally{System.out.println(" [x] Done");// 手动发送应答
                    channel.basicAck(envelope.getDeliveryTag(),false);}}});}privatestaticvoiddoWork(String task)throwsInterruptedException{// 模拟任务处理过程,这里休眠1秒Thread.sleep(1000);}}

在上述代码中,生产者(Producer)将消息发布到名为

task_queue

的队列中,而消费者(Consumer)则从该队列中获取任务并进行处理。每个任务都会被一个消费者处理,通过模拟任务处理过程,演示了Work模式的基本使用方式。

2. Fanout模式

Fanout模式是RabbitMQ中的一种消息传递模式,它将消息广播到所有绑定到Exchange的队列中,即使在消息发布之后才创建的队列,也能接收到消息。

原理解析
在Fanout模式中,生产者将消息发布到一个名为Exchange的交换机中,而不是直接发送到队列中。交换机会将收到的消息广播到与其绑定的所有队列中。因此,无论在消息发布之前还是之后创建的队列,只要它们与交换机进行了绑定,就能接收到交换机广播的消息。

在示例代码中,首先声明一个名为logs的Fanout类型的Exchange,并将消息发布到该Exchange中。消息发布时,并没有指定具体的队列,而是将消息发送到了Exchange中。Exchange会将消息广播到所有与其绑定的队列中,这就是Fanout模式的工作原理。

应用场景
一个常见的应用场景是日志处理系统。在一个分布式的日志系统中,通常会有多个日志消费者,它们分别负责处理不同级别(如info、error等)的日志。通过使用Fanout模式,可以将日志消息广播到所有相关的队列中,每个消费者只需要关注自己负责处理的日志级别,从而实现了日志的分发和处理。

通过Fanout模式,我们可以实现消息的广播传递,适用于多个消费者需要同时接收同一份消息的场景,例如日志处理系统、实时广播等。

Java示例代码:
importcom.rabbitmq.client.*;publicclassEmitLog{privatestaticfinalStringEXCHANGE_NAME="logs";publicstaticvoidmain(String[] argv)throwsException{ConnectionFactory factory =newConnectionFactory();
        factory.setHost("localhost");try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){
            channel.exchangeDeclare(EXCHANGE_NAME,"fanout");String message =getMessage(argv);

            channel.basicPublish(EXCHANGE_NAME,"",null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '"+ message +"'");}}privatestaticStringgetMessage(String[] strings){if(strings.length <1){return"info: Hello World!";}returnString.join(" ", strings);}}
3. Direct模式

Direct模式将消息路由到与消息的RoutingKey完全匹配的队列中。
应用场景
一个常见的应用场景是日志级别过滤。在一个分布式的日志系统中,可能有多个消费者负责处理不同级别的日志(如info、error、warning等)。通过使用Direct模式,生产者可以根据日志的级别指定不同的RoutingKey,并将日志消息发布到Exchange中。Exchange会将日志消息路由到与其RoutingKey完全匹配的队列中,从而实现了日志的级别过滤和分发。

Java示例代码:
importcom.rabbitmq.client.*;publicclassEmitLogDirect{privatestaticfinalStringEXCHANGE_NAME="direct_logs";publicstaticvoidmain(String[] argv)throwsException{ConnectionFactory factory =newConnectionFactory();
        factory.setHost("localhost");try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){
            channel.exchangeDeclare(EXCHANGE_NAME,"direct");String severity =getSeverity(argv);String message =getMessage(argv);

            channel.basicPublish(EXCHANGE_NAME, severity,null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '"+ severity +"':'"+ message +"'");}}privatestaticStringgetSeverity(String[] strings){if(strings.length <1){return"info";}return strings[0];}privatestaticStringgetMessage(String[] strings){if(strings.length <2){return"Hello World!";}returnString.join(" ", strings);}}
4. Topic模式

Topic模式是RabbitMQ中的一种消息传递模式,它将消息发送到与匹配通配符的RoutingKey相匹配的队列中.

原理解析
在Topic模式中,生产者将消息发布到一个名为Exchange的交换机中,并且在发送消息时需要指定一个RoutingKey。RoutingKey可以使用通配符(和#)来匹配多个队列,其中表示匹配一个单词,#表示匹配零个或多个单词。交换机会将收到的消息根据RoutingKey和通配符匹配规则将消息路由到与之匹配的队列中。

在示例代码中,我们首先声明一个名为topic_logs的Topic类型的Exchange,并指定消息的RoutingKey。然后将消息发布到Exchange中,Exchange会根据消息的RoutingKey和通配符匹配规则将消息路由到与之匹配的队列中。

应用场景
一个常见的应用场景是日志过滤器。在一个分布式的日志系统中,可能有多个消费者负责处理不同模块或不同级别的日志。通过使用Topic模式,生产者可以根据日志的模块或级别指定特定的RoutingKey,并将日志消息发布到Exchange中。消费者可以使用通配符来订阅感兴趣的日志模块或级别,Exchange会将日志消息路由到与之匹配的队列中,从而实现了日志的模块化过滤和分发。

Java示例代码:
importcom.rabbitmq.client.*;publicclassEmitLogTopic{privatestaticfinalStringEXCHANGE_NAME="topic_logs";publicstaticvoidmain(String[] argv)throwsException{ConnectionFactory factory =newConnectionFactory();
        factory.setHost("localhost");try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){
            channel.exchangeDeclare(EXCHANGE_NAME,"topic");String routingKey =getRouting(argv);String message =getMessage(argv);

            channel.basicPublish(EXCHANGE_NAME, routingKey,null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '"+ routingKey +"':'"+ message +"'");}}privatestaticStringgetRouting(String[] strings){if(strings.length <1){return"anonymous.info";}return strings[0];}privatestaticStringgetMessage(String[] strings){if(strings.length <2){return"Hello World!";}returnString.join(" ", strings);}}
标签: rabbitmq ruby 分布式

本文转载自: https://blog.csdn.net/qq_51447496/article/details/137469244
版权归原作者 极客李华 所有, 如有侵权,请联系我们删除。

“RabbitMQ的四种消息传递模式与演示代码”的评论:

还没有评论