0


RabbitMQ 交换机的类型

在 RabbitMQ 中,交换机(Exchange)是一个核心组件,负责接收来自生产者的消息,并根据特定的路由规则将消息分发到相应的队列。交换机的存在改变了消息发送的模式,使得消息的路由更加灵活和高效。

交换机的类型

RabbitMQ 提供了四种主要类型的交换机,每种交换机的路由规则不同:

  1. Direct Exchange(直连交换机):- 功能:基于路由键(Routing Key)将消息发送到与该路由键完全匹配的队列。- 应用场景:适用于需要精确匹配路由键的场景。- 示例:假设有两个队列 A 和 B,A 绑定了路由键 key1,B 绑定了路由键 key2。当生产者发送一条路由键为 key1 的消息时,只有队列 A 会接收到这条消息。
  2. Fanout Exchange(扇出交换机):- 功能:将消息广播到所有绑定到该交换机的队列,不考虑路由键。- 应用场景:适用于需要将消息广播到多个队列的场景。- 示例:假设有两个队列 A 和 B 都绑定到了一个 Fanout 交换机上。当生产者发送一条消息到该交换机时,A 和 B 都会接收到这条消息。
  3. Topic Exchange(主题交换机):- 功能:基于路由键的模式匹配(使用通配符)将消息发送到匹配的队列。- 应用场景:适用于需要基于模式匹配路由键的场景。- 示例:假设有两个队列 A 和 B,A 绑定了路由键模式 key.*,B 绑定了路由键模式 key.#。当生产者发送一条路由键为 key.test 的消息时,A 和 B 都会接收到这条消息。
  4. Headers Exchange(头交换机):- 功能:基于消息的头部属性进行匹配,将消息发送到匹配的队列。- 应用场景:适用于需要基于消息头部属性进行路由的场景。- 示例:这种交换机使用较少,通常在特定情况下才会使用。

交换机的作用

  • 消息路由:交换机根据路由规则将消息分发到相应的队列。
  • 解耦生产者和消费者:生产者只需将消息发送到交换机,不需要知道消息的最终目的地队列。
  • 灵活性和扩展性:通过不同类型的交换机,可以实现复杂的消息路由逻辑,满足各种业务需求。

示例代码

以下是如何使用 Direct Exchange 和 Fanout Exchange 的示例代码:

Direct Exchange 示例
const amqp =require('amqplib/callback_api');

amqp.connect('amqp://localhost',function(error0, connection){if(error0){throw error0;}
  connection.createChannel(function(error1, channel){if(error1){throw error1;}const exchange ='direct_logs';const msg ='Hello World!';const routingKey ='key1';

    channel.assertExchange(exchange,'direct',{ durable:true});
    channel.publish(exchange, routingKey, Buffer.from(msg));
    console.log(" [x] Sent %s: '%s'", routingKey, msg);});setTimeout(function(){
    connection.close();
    process.exit(0);},500);});
Fanout Exchange 示例
const amqp =require('amqplib/callback_api');

amqp.connect('amqp://localhost',function(error0, connection){if(error0){throw error0;}
  connection.createChannel(function(error1, channel){if(error1){throw error1;}const exchange ='logs';const msg ='Hello World!';

    channel.assertExchange(exchange,'fanout',{ durable:true});
    channel.publish(exchange,'', Buffer.from(msg));
    console.log(" [x] Sent %s", msg);});setTimeout(function(){
    connection.close();
    process.exit(0);},500);});
Topic Exchange 示例

Topic Exchange 允许使用通配符进行路由,支持更复杂的路由规则。

发布者代码
const amqp =require('amqplib/callback_api');

amqp.connect('amqp://localhost',function(error0, connection){if(error0){throw error0;}
  connection.createChannel(function(error1, channel){if(error1){throw error1;}const exchange ='topic_logs';const msg ='Hello World!';const routingKey ='quick.orange.rabbit';

    channel.assertExchange(exchange,'topic',{ durable:true});
    channel.publish(exchange, routingKey, Buffer.from(msg));
    console.log(" [x] Sent %s: '%s'", routingKey, msg);});setTimeout(function(){
    connection.close();
    process.exit(0);},500);});
消费者代码
const amqp =require('amqplib/callback_api');

amqp.connect('amqp://localhost',function(error0, connection){if(error0){throw error0;}
  connection.createChannel(function(error1, channel){if(error1){throw error1;}const exchange ='topic_logs';const queue ='topic_queue';

    channel.assertExchange(exchange,'topic',{ durable:true});
    channel.assertQueue(queue,{ durable:true});// 绑定队列到交换机,使用通配符
    channel.bindQueue(queue, exchange,'*.orange.*');

    channel.consume(queue,function(msg){if(msg.content){
        console.log(" [x] Received %s: '%s'", msg.fields.routingKey, msg.content.toString());}},{ noAck:true});});});

在这个示例中,发布者将消息发送到

topic_logs

交换机,使用路由键

quick.orange.rabbit

。消费者绑定到

topic_logs

交换机,使用通配符

*.orange.*

,因此会接收到所有包含

orange

的消息。

Headers Exchange 示例

Headers Exchange 基于消息头部属性进行路由,适用于需要复杂路由规则的场景。

发布者代码
const amqp =require('amqplib/callback_api');

amqp.connect('amqp://localhost',function(error0, connection){if(error0){throw error0;}
  connection.createChannel(function(error1, channel){if(error1){throw error1;}const exchange ='headers_logs';const msg ='Hello World!';

    channel.assertExchange(exchange,'headers',{ durable:true});
    channel.publish(exchange,'', Buffer.from(msg),{
      headers:{'format':'pdf','type':'report'}});
    console.log(" [x] Sent %s", msg);});setTimeout(function(){
    connection.close();
    process.exit(0);},500);});
消费者代码
const amqp =require('amqplib/callback_api');

amqp.connect('amqp://localhost',function(error0, connection){if(error0){throw error0;}
  connection.createChannel(function(error1, channel){if(error1){throw error1;}const exchange ='headers_logs';const queue ='headers_queue';

    channel.assertExchange(exchange,'headers',{ durable:true});
    channel.assertQueue(queue,{ durable:true});// 绑定队列到交换机,使用头部属性
    channel.bindQueue(queue, exchange,'',{'x-match':'all','format':'pdf','type':'report'});

    channel.consume(queue,function(msg){if(msg.content){
        console.log(" [x] Received %s", msg.content.toString());}},{ noAck:true});});});

在这个示例中,发布者将消息发送到

headers_logs

交换机,并设置消息头部属性

format: pdf

type: report

。消费者绑定到

headers_logs

交换机,使用头部属性匹配

format: pdf

type: report

,因此会接收到符合这些头部属性的消息。


本文转载自: https://blog.csdn.net/canduecho/article/details/142821768
版权归原作者 田猿笔记 所有, 如有侵权,请联系我们删除。

“RabbitMQ 交换机的类型”的评论:

还没有评论