RabbitMQ架构设计
- Producer:负责产生消息。
- Connection:RabbitMQ客户端和代理服务器之间的TCP连接。
- Channel:建立在连接之上的虚拟连接,RabbitMQ操作都是在信道中进行。
- Broker:一个Broker可以看做一个RabbitMQ服务节点或者服务实例。
- Exchange:生产者发送消息到交换器,交换器根据路由key投递到相应的队列。
- Queue:存储消息的队列 。
- RoutingKey:路由键,指定消息的路由规则。
- BindingKey:绑定键,关联交换器和队列。
- Consumer:消费消息。
路由机制
- Direct:默认方式,根据消息的路由键完全匹配队列的绑定键来分发消息。
- fanout:广播模式,将消息投递到所有绑定到交换器的队列。
- topic:使用模糊匹配的方式根据路由键将消息分发到不同的队列中,支持通配符(*和#)进行匹配。
- header:不依赖路由键,而是根据消息的头部信息来进行匹配和分发。
连接RabbitMQ
final ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setVirtualHost(VIRTUAL_HOST);
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
Connection 可以用来创建多个 Channel,但是 Channel 不能线程共享使用。channel 的开启有一个 isOpen 方法可以得知
com.rabbitmq.client.impl.ShutdownNotifierComponent#isOpen
@Override
public boolean isOpen() {
synchronized(this.monitor) {
return this.shutdownCause == null;
}
}
生产者发送消息
- 生产者连接到 RabbitMO Broker,建立一个连接(Connection),开启一个信道(Channel)
- 生产者声明一个交换器,并设置相关属性,比如交换器类型、是否持久化等
- 生产者声明一个队列并设置相关属性,比如是否排他、是否持久化、是否自动删除等
- 生产者通过路由键将交换器和队列绑定起来
- 生产者发送消息至 RabbitMO Broker,其中包含路由键、交换器等信息
- 相应的交换器根据接收到的路由键查找相匹配的队列。
- 如果找到,则将从生产者发送过来的消息存入相应的队列中。
- 如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
- 关闭信道。
- 关闭连接。
交换器和队列
创建临时队列
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
final String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue, EXCHANGE_NAME, routingKey);
上面创建一个持久化的、绑定类型为 direct 的交换器,同时也创建了一个非持久化的、排他的、自动删除的队列(队列名称由 RabbitMQ 自动生成)。
创建持久化队列
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(queue, EXCHANGE_NAME, routingKey);
分配一个固定的队列名称,并设置持久化、非排他的、非自动删除的队列
生产者和消费者都可以声明一个交换器或则队列,如果尝试声明一个已经存在的交换器或队列(只要声明的参数完全匹配已存在的交换器或队列),RabbitMQ 则什么都不做,直接返回成功。如果参数不匹配则会抛出异常。
创建交换器
public Exchange.DeclareOk exchangeDeclare(String exchange, String type,
boolean durable,
boolean autoDelete,
boolean internal,
Map<String, Object> arguments)
返回 Exchange.DeclareOk 标识成功声明了一个交换器
- exchange:交换器名称
- type:交换器类型;常见的有:fanout、direct、topic...
com.rabbitmq.client.BuiltinExchangeType
类定义了交换器类型 - durable:是否持久化持久化将交换器存盘,服务重启时不会丢失相关信息
- autoDelete:是否自动删除;自动删除的前提是:至少有一个队列或则交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或则交换器都与此解绑。注意:这里自动删除,不是当连接断开时,自动删除这个交换器。
- internal:是否内置的;如果是内置的交换器,客户端程序无法直接发送消息到这个交换器 中,只能通过交换器路由 到交换器这种方式。
- arguments:其他一些结构化参数
删除交换器
void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;
- exchange:交换器名称
- isUnused:设置为 true ,则只有交换器没有被使用时,才被删除。
创建队列
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
- queue:队列名称
- durable:是否持久化
- exclusive:是否排他;当一个队列被声明为排他队列,该队列 仅对首次声明它 的连接可见,并在连接断开时自动删除。这里需要注意一点:就算是持久化的,一旦连接关闭,这个排他队列也会被自动删除。
- autoDelete:是否自动删除,与交换器定义一致;
- arguments:设置队列的其他一些参数如
x-message-ttl
、x-expires
、x-max-length
、x-max-length-bytes
、x-dead-letter-exchange
、x-dead-letter-routing-key
、x-max-priority
删除队列
Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException;
队列绑定交换器
Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
- queue:队列名
- exchange:交换器名称
- routingKey:用来绑定队列和交换器的路由键
- arguments:定义绑定的一些参数
交换器与交换器绑定
Exchange.BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
发送消息
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
throws IOException;
- exchange:交换器名称,如果为空,则会发送到 RabbitMQ 默认的交换器中
- routingKey:路由键
- mandatory:mandatory 参数设为 true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么 RabbitMQ会调用 Basic.Return 命令将消息返回给生产者 。当 mandatory 数设置为 false 时,出现上述情形,则消息直接被丢弃那么生产者如何获取到没有被正确路由到合适队列的消息呢?这时候可以通过调用channel addReturnListener 来添加 ReturnListener 监昕器实现。
- props:消息的基本属性集
消费消息
RabbitMQ 消费模式分两种:
- Push:推模式;采用 Basic.Consume 进行消费
- Pull:拉模式;则使用 Basic.Get 进行消费
消息分发
当RabbitMQ 队列拥有多个消费者时 ,队列收到的消息将以轮询 (round-robin )的分发方式发送给消费者。每条消息只会发送给订阅列表里的一个消费者。这种方式非常适合扩展,而且它是专门为并发程序设计的。如果现在负载加重,那么只需要创建更多的消费者来消费处理消息即可。
默认情况下,如果有 个消费者,那么 RabbitMQ会将第 条消息分发给第 m%n (取余的方式)个消费者, RabbitMQ 不管消费者是否消费并己经确认 (Basic.Ack) 了消息。
如果某些消费者任务繁重,来不及消费那么多的消息,而某些其他消费者由于某些原因很快地处理完了所分配到的消息,进而进程空闲,这样就会造成整体应用吞吐量的下降。
这里就要用到 channel.basicQos(int prefetchCount) 这个方法,channel.basicQos 方法允许限制信道上的消费者所能保持的最大未确认消息的数量。例如在订阅消费队列之前,消费端程序调用了 channel.basicQos(5) ,之后订阅了某个队列进行消费。 RabbitMQ会保存一个消费者的列表,每发送一条消息都会为对应的消费者计数,如果达到了所设定的上限,那么 RabbitMQ 就不会向这个消费者再发送任何消息。直到消费者确认了某条消息之后 RabbitMQ 将相应的计数减1,之后消费者可以继续接收消息,直到再次到达计数上限。
Basic.Qos 的使用对于拉模式的消费方式无效.
版权归原作者 知知之之 所有, 如有侵权,请联系我们删除。