RabbitMQ概述
RabbitMQ是一个开源的消息代理软件,也称为面向消息的中间件。它实现了高级消息队列协议(AMQP),由Erlang语言编写,旨在提供高性能、健壮以及可伸缩性的消息队列服务。RabbitMQ在分布式系统开发中应用广泛,支持多种操作系统和编程语言,如Linux、Windows、macOS等操作系统,以及Python、Java、Ruby、PHP、C#、JavaScript等编程语言。
简单来说RabbitMQ 是一个流行的消息代理和队列服务器,它允许应用程序之间进行异步通信
RabbitMQ的使用案例
下面我们来看看RabbitMQ的优点及使用场景
发布与订阅模式:
- 在此模式下,RabbitMQ充当了**消息队列**的角色,连接了生产者和消费者。生产者将消息发送到队列中,而消费者则从队列中获取消息进行处理。
- 例如,一个新闻网站可能使用RabbitMQ来处理新闻稿的发布。当新闻稿被创建时,它会被发送到RabbitMQ的某个队列中。然后,该网站的多个服务(如网站前端、移动应用后端等)可以作为消费者从该队列中获取新闻稿,并实时更新其显示内容。
- 配置参数可能包括队列名称、交换机类型(如直接交换机、主题交换机等)、路由键等。在Spring Boot项目中,可以通过添加`spring-boot-starter-amqp`依赖来集成RabbitMQ。
消息持久化
- RabbitMQ支持将消息从内存持久化到硬盘,以确保在服务器重启或故障时不会丢失消息。
- 例如,一个电子商务网站可能使用RabbitMQ来处理订单。当用户提交订单时,订单信息会被发送到RabbitMQ的队列中,并立即持久化到硬盘。然后,订单处理服务可以从队列中获取订单并处理它。如果订单处理服务在处理订单时崩溃,RabbitMQ可以确保订单信息不会丢失,并在服务恢复后继续处理
集群服务
- RabbitMQ支持集群服务,可以通过添加多个RabbitMQ节点来提高系统的可伸缩性和容错性。
- 例如,一个大型社交媒体应用可能使用RabbitMQ来处理用户之间的消息传递。为了处理大量的并发消息,该应用可以部署一个RabbitMQ集群,由多个节点组成。每个节点都可以处理一部分消息,从而提高整个系统的吞吐量
高可用性
- RabbitMQ的集群和故障转移机制可以确保在高负载或故障情况下系统的可用性和可靠性。
- 例如,在一个银行系统中,RabbitMQ可能被用于处理交易消息。为了确保系统的稳定性和可靠性,可以使用RabbitMQ的集群和故障转移机制来确保即使部分节点出现故障,整个系统仍然可以正常运行并处理交易消息。
下面我们来举几个RabbitMQ的例子来帮你加深一下印象
- 简单的消息发送和接收
发送者(Producer)
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
接收者(Consumer)
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
- 订单处理系统
订单发送(Order Producer)
// 省略了部分代码,只显示发送订单到队列的部分
// ...
channel.basicPublish("orders_queue", "", null, orderData.getBytes("UTF-8"));
// ...
订单处理(Order Consumer)
// 省略了部分代码,只显示从队列接收订单并处理的部分
// ...
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
String orderData = new String(body, "UTF-8");
// 处理订单...
System.out.println("Processing order: " + orderData);
}
};
channel.basicConsume("orders_queue", true, consumer);
// ...
3.日志发送和收集
日志发送者(Log Producer)
// 省略了部分代码,只显示发送日志到队列的部分
// ...
String logEntry = "Application log entry at " + new Date();
channel.basicPublish("logs", "", null, logEntry.getBytes("UTF-8"));
// ...
日志收集器(Log Consumer)
作为日志收集器(Log Consumer)来消费 RabbitMQ 队列中的日志消息。该应用程序将消费名为 "logs" 的队列,并将收到的消息打印到控制台
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class LogConsumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
// 获取连接
Connection connection = factory.newConnection();
// 获取信道
Channel channel = connection.createChannel();
// 声明队列(如果队列已存在,则不需要声明)
channel.queueDeclare("logs", false, false, false, null);
// 定义消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received message: " + message);
}
};
// 开始消费队列中的消息
channel.basicConsume("logs", true, consumer);
}
}
版权归原作者 無限神樂 所有, 如有侵权,请联系我们删除。