一、Docker拉取镜像并启动RabbitMQ
拉取镜像
docker pull rabbitmq:3.8.8-management
查看镜像
docker images rabbitmq
启动镜像
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8.8-management
Linux虚拟机记得开放5672端口或者关闭防火墙,在window通过 主机ip:15672 访问rabbitmq控制台
用户名密码默认为guest
二、Hello World
(一)依赖导入
<!--指定 jdk 编译版本-->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<!--rabbitmq 依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
</dependencies>
(二)消息生产者
工作原理
Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
Connection:publisher/consumer 和 broker 之间的 TCP 连接
Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。**Channel 作为轻量级的 ****Connection ****极大减少了操作系统建立 ****TCP connection **的开销
Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
Queue:消息最终被送到这里等待 consumer 取走
我们需要先获取连接(Connection),然后通过连接获取信道(Channel),这里我们演示简单例子,可以直接跳过交换机(Exchange)发送队列(Queue)
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置主机ip
factory.setHost("182.92.234.71");
// 设置用户名
factory.setUsername("guest");
// 设置密码
factory.setPassword("guest");
//channel 实现了自动 close 接口 自动关闭 不需要显示关闭
Connection connection = factory.newConnection();
// 获取信道
Channel channel = connection.createChannel();
/*
* 生成一个队列
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments)
* 1.队列名称
* 2.队列里面的消息是否持久化 默认消息存储在内存中
* 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
* 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
* 5.其他参数
**/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "hello rabbitmq";
/*
* 发送一个消息
* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
* 1.发送到哪个交换机
* 2.路由的key是哪个
* 3.其他的参数信息
* 4.发送消息的消息体
*
**/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送成功");
}
}
(三)消息消费者
public class Consumer {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置主机ip
factory.setHost("182.92.234.71");
// 设置用户名
factory.setUsername("guest");
// 设置密码
factory.setPassword("guest");
//channel 实现了自动 close 接口 自动关闭 不需要显示关闭
Connection connection = factory.newConnection();
// 获取信道
Channel channel = connection.createChannel();
// 推送的消息如何进行消费的回调接口
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody()));
};
// 取消消费的一个回调接口,如在消费的时候队列被删除了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断");
};
/*
* 消费者消费消息
* basicConsume(String queue, boolean autoAck,
* DeliverCallback deliverCallback, CancelCallback cancelCallback)
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
* 3.消费者未成功消费的回调
**/
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
三、实现轮训分发消息
(一)抽取工具类
可以发现,上面获取连接工厂,然后获取连接,再获取信道的步骤是一致的,我们可以抽取成一个工具类来调用,并使用单例模式-饿汉式完成信道的初始化
public class RabbitMqUtils {
private static Channel channel;
static {
ConnectionFactory factory = new ConnectionFactory();
// 设置ip地址
factory.setHost("192.168.23.100");
// 设置用户名
factory.setUsername("guest");
// 设置密码
factory.setPassword("guest");
try {
// 创建连接
Connection connection = factory.newConnection();
// 获取信道
channel = connection.createChannel();
} catch (Exception e) {
System.out.println("创建信道失败,错误信息:" + e.getMessage());
}
}
public static Channel getChannel() {
return channel;
}
}
(二)启动两个工作线程
相当于前面的消费者,我们只需要写一个类,通过ideal实现多线程启动即可模拟两个线程
public class Worker01 {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = ( consumerTag, message) -> {
System.out.println("接受到消息:" + new String(message.getBody()));
};
CancelCallback cancelCallback = (cunsumerTag) -> {
System.out.println("消费者取消消费接口回调逻辑");
};
// 启动两次,第一次为C1, 第二次为C2
System.out.println("C2消费者等待消费消息");
channel.basicConsume(QUEUE_NAME, true, deliverCallback,cancelCallback);
}
}
(三)启动发送线程
public class Test01 {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException {
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 通过控制台输入充当消息,使轮训演示更明显
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME,null, message.getBytes() );
System.out.println("消息发送完成:" + message);
}
}
}
结果
四、实现手动应答
(一)消息应答概念
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成
了部分突然它挂掉了,会发生什么情况。**RabbitMQ 一旦向消费者传递了一条消息,便立即将该消 **
**息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续 **
**发送给该消费这的消息,因为它无法接收到。 **
为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:**消费者在接 **
**收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。 **
自动应答:消费者发送后立即被认为已经传送成功。这种模式需要在高吞吐量和数据传输安全性方面做权****衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了。
当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,
当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终 使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并****以某种速率能够处理这些消息的情况下使用。
手动应答:消费者接受到消息并顺利完成业务后再调用方法进行确认,rabbitmq 才可以把该消息删除
(二)消息应答的方法
- Channel.basicAck(用于肯定确认) - RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
- Channel.basicNack(用于否定确认)
- Channel.basicReject(用于否定确认) - 与 Channel.basicNack 相比少一个参数Multiple- multiple 的 true 和 false 代表不同意思 true 代表批量应答 channel 上未应答的消息 比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答 false 同上面相比 只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答- 不处理该消息了直接拒绝,可以将其丢弃了
(三)消息自动重新入队
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确
保不会丢失任何消息。
(四)消息手动应答代码
1、生产者
public class Test01 {
private final static String QUEUE_NAME = "ack";
public static void main(String[] args) throws IOException {
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME,null, message.getBytes() );
System.out.println("消息发送完成:" + message);
}
}
}
2、睡眠工具类模拟业务执行
public class SleepUtils {
public static void sleep(int second) {
try {
Thread.sleep(1000 * second);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
3、消费者
public class Worker01 {
private final static String QUEUE_NAME = "ack";
public static void main(String[] args) throws Exception {
System.out.println("C1,业务时间短");
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = ( consumerTag, message) -> {
SleepUtils.sleep(1); // 模拟业务执行1秒
System.out.println("接受到消息:" + new String(message.getBody()));
/*
* 1、消息标识
* 2、是否启动批量确认,false:否。
* 启用批量有可能造成消息丢失,比如未消费的消息提前被确然删除,后面业务消费该消息
* 时出现异常会导致该消息的丢失
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
CancelCallback cancelCallback = (cunsumerTag) -> {
System.out.println("消费者取消消费接口回调逻辑");
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback);
}
}
==============================================================================
public class Worker02 {
private final static String QUEUE_NAME = "ack";
public static void main(String[] args) throws Exception {
System.out.println("C2,业务时间长");
Channel channel = RabbitMqUtils.getChannel();
DeliverCallback deliverCallback = ( consumerTag, message) -> {
SleepUtils.sleep(15); // 模拟业务执行15秒
System.out.println("接受到消息:" + new String(message.getBody()));
/*
* 1、消息标识
* 2、是否启动批量确认,false:否。
* 启用批量有可能造成消息丢失,比如未消费的消息提前被确然删除,后面业务消费该消息
* 时出现异常会导致该消息的丢失
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
};
CancelCallback cancelCallback = (cunsumerTag) -> {
System.out.println("消费者取消消费接口回调逻辑");
};
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback);
}
}
worker01业务时间短,worker02业务时间长,我们提前终止worker02模拟出异常,可以看到消息dd会被放回队列由worker01接收处理。
注意:这里需要先启动生产者声明队列ack,不然启动消费者会报错
最后一个案例我们可以看到消息轮训+消息自动重新入队+手动应答。
版权归原作者 zoeil 所有, 如有侵权,请联系我们删除。