作为MQ初学者的我,写下这篇博客用来加深对MQ代码的认识和了解。后续的MQ都是使用Spring集成进来的,与此文无关。
1.七种工作模式概述
- 简单模式(Simple)
- 工作队列模式(Work Queue)
- 发布订阅模式(Publish/Subscribe)
- 路由模式(Routing)
- 通配符模式(Topics)
- RPC模式(RPC)
- 发布确认模式(Publish Confirms)
上述工作模式,其中1-5使用的比较多,6-7较少,代码难度也比较大。
2.简单与工作队列模式
(1)简单模式
简单模式是rabbitmq的入门模式,也是最简单的
1)工作模式图
这种模式下,只有一个生产者和一个消费者,中间使用一个阻塞队列来连接.
特点:一个生产者,一个消费者,消息只能被消费一次。也称为点对点模式
2)代码写法
生产者:
建立连接--开启信道--指定队列(有不创无则创)--发送信息--关闭连接
/**
* 生产者代码编写
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接Connection
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("8.138.121.41");//主机ip
factory.setPort(5672);//端口号
factory.setUsername("study");//用户名
factory.setPassword("study");//用户密码
factory.setVirtualHost("test01");//虚拟主机
Connection connection = factory.newConnection();
//2.开启信道Channel
Channel channel = connection.createChannel();
//3.指定队列
/*
queueDeclare(String queue, boolean durable, boolean exclusive,
boolean autoDelete, Map<String, Object> arguments)
1.queue: 队列名称
2.durable: 是否持久化, 当mq重启之后, 消息还在
3.exclusive:是否独占, 只能有⼀个消费者监听队列,当Connection关闭时, 是否删除队列
4.autoDelete: 是否⾃动删除, 当没有Consumer时, ⾃动删除掉
5.arguments: ⼀些参数
*/
channel.queueDeclare("queue01", true, false, false, null);
//4.发送信息
channel.basicPublish("", "queue01", null, "Hello World".getBytes());
//5.资源释放
channel.close();
connection.close();
}
}
消费者:
建立连接--开启信道--指定队列(有不创无则创)--消费信息--关闭连接
//消费者
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("8.138.121.41");//主机ip
factory.setPort(5672);//端口号
factory.setUsername("study");//用户名
factory.setPassword("study");//用户密码
factory.setVirtualHost("test01");//虚拟主机
Connection connection = factory.newConnection();
//2.开启信道
Channel channel = connection.createChannel();
//3.指定队列
channel.queueDeclare("queue01", true, false, false, null);
//前面三步和第五步跟生产者模型一致
//4.消费信息
/*
回调⽅法, 当收到消息后, 会⾃动执⾏该⽅法
1. consumerTag: 标识
2. envelope: 获取⼀些信息, 交换机, 路由key
3. properties: 配置信息
4. body: 数据
*/
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到的消息后执行的逻辑,打印:"+new String(body));
}
};
/**
* String basicConsume(String var1, boolean var2, Consumer var3)
* 参数解释:
* var1: 队列名称
* autoAck: 是否自动确认(信息)
* callback: 接收到消息后,执行的逻辑(这里是打印消息) -- 回调方法
*/
channel.basicConsume("queue01", true, consumer);//消费消息
//5.释放连接
channel.close();
connection.close();
}
}
生产者和消费者的代码基本相同,区别就在消费消息和生产消息。
生产消息到指定队列中:
//4.发送信息
channel.basicPublish("", "queue01", null, "Hello World".getBytes());
第一个参数:指定交换机,不写则默认
第二个参数:指定消息存放的队列
第三个参数:是否携带额外的属性(比如优先级等)
第四个参数:消息体
(2)工作队列模式
工作队列模式在简单模式的基础上多加一个消费者,两个消费者共同消费一份信息(先到先得原则)
1)工作模式图
特点:
每个消费者会获得不同的消息,并且不会重复。
适用场景:集群环境中做异步处理
2)代码写法
生产者:和简单模式的一样
/**
* 生产者代码编写
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接Connection
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("8.138.121.41");//主机ip
factory.setPort(5672);//端口号
factory.setUsername("study");//用户名
factory.setPassword("study");//用户密码
factory.setVirtualHost("test01");//虚拟主机
Connection connection = factory.newConnection();
//2.开启信道Channel
Channel channel = connection.createChannel();
//3.指定队列
channel.queueDeclare("queue01", true, false, false, null);
//4.发送信息
for(int i=0;i<10;i++) {
String msg = "简单队列模式: "+i;
channel.basicPublish("", "work.queue", null, msg.getBytes());
}
//5.资源释放
channel.close();
connection.close();
}
}
消费者1:和简单模式一样
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("8.138.121.41");//主机ip
factory.setPort(5672);//端口号
factory.setUsername("study");//用户名
factory.setPassword("study");//用户密码
factory.setVirtualHost("test01");//虚拟主机
Connection connection = factory.newConnection();
//2.开启信道
Channel channel = connection.createChannel();
//3.指定队列(使用默认的交换机)
channel.queueDeclare("work.queue",true,false,false,null);
//4.消费消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到的消息:"+new String(body));
}
};
channel.basicConsume("work.queue", true, consumer);//消费消息
//5.关闭连接
}
}
另一个消费者也一样,这里就不多余写了
结果:
由此可见,这两个消费者消费同一个队列的消息,彼此获得消息各不相同。
3.发布订阅模式
发布订阅模式,一个生产者生产的消息,只要订阅了队列,就可以拿到消息。每个人都有份切不重复。
(1)工作模式图
(2)代码写法
这是一个存放常量的类:
public class Constants {
//建立连接需要的常量
public static final String HOST = "8.138.121.41";//主机
public static final int PORT = 5672;//端口号
public static final String USER_NAME = "study";//用户名
public static final String PASSWORD = "study";//用户密码
public static final String VIRTUAL_HOST = "test01";//虚拟主机
//工作队列模式
public static final String WORK_QUEUE = "work.queue";
//发布订阅模式
public static final String FANOUT_EXCHANGE = "fanout.exchange";//交换机
public static final String FANOUT_QUEUE1 = "fanout.queue1";//队列
public static final String FANOUT_QUEUE2 = "fanout.queue2";
//路由模式
public static final String DIRECT_EXCHANGE = "direct.exchange";//交换机
public static final String DIRECT_QUEUE1 = "direct.queue1";
public static final String DIRECT_QUEUE2 = "direct.queue2";
//通配符模式
public static final String TOPIC_EXCHANGE = "topic.exchange";//交换机
public static final String TOPIC_QUEUE1 = "topic.queue1";
public static final String TOPIC_QUEUE2 = "topic.queue2";
//rpc模式
public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";
public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";
//发布确认模式
public static final String PUBLISH_CONFIRMS_QUEUE1 = "publish.confirms.queue1";
public static final String PUBLISH_CONFIRMS_QUEUE2 = "publish.confirms.queue2";
public static final String PUBLISH_CONFIRMS_QUEUE3 = "publish.confirms.queue3";
}
1)生产者
public class Produce {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接Connection
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Constants.HOST);//主机ip
factory.setPort(Constants.PORT);//端口号
factory.setUsername(Constants.USER_NAME);//用户名
factory.setPassword(Constants.PASSWORD);//用户密码
factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
Connection connection = factory.newConnection();
//2.开启信道Channel
Channel channel = connection.createChannel();
//3.指定交换机
channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
//4.指定队列
channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);
//5.交换机和队列进行绑定(两个队列绑定同一个交换机)
channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,"");
channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");
//6.发送信息
for(int i=0;i<10;i++) {
String msg = "发布订阅模式: "+i;
channel.basicPublish(Constants.FANOUT_EXCHANGE,"", null, msg.getBytes());
}
//5.资源释放
channel.close();
connection.close();
}
}
这里和简单模式相比,多了指定交换机、交换机和队列进行绑定这两步。
指定交换机:
//3.指定交换机
channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
- 第一个参数:指定的交换机
- 第二个参数:交换机的类型(有三种,这里是FANOT模式,也就是发布订阅模式)
- 第三个参数:数据是否可持久化
交换机和队列绑定:
//5.交换机和队列进行绑定(两个队列绑定同一个交换机)
channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,"");
channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");
- 第一个参数:绑定的队列是谁
- 第二个参数:要绑定的交换机是谁
- 第三个参数:routingkey,也就是路由规则。发布订阅模式为null
2)消费者
这里的消费者和简单模式一样。并且生产者已经绑定好了交换机和队列的关系,所以这里无须指定,直接使用队列即可。
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Constants.HOST);//主机ip
factory.setPort(Constants.PORT);//端口号
factory.setUsername(Constants.USER_NAME);//用户名
factory.setPassword(Constants.PASSWORD);//用户密码
factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
Connection connection = factory.newConnection();
//2.开启信道
Channel channel = connection.createChannel();
//3.指定队列(使用默认的交换机)
channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);
//4.消费消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到的消息:"+new String(body));
}
};
channel.basicConsume(Constants.WORK_QUEUE, true, consumer);//消费消息
//5.关闭连接
channel.close();
connection.close();
}
}
另一个消费者的代码也一样。
结果:
队列中的消息,两个消费者共享,也就是每个消费者都会有一份。
4.路由模式
路由模式就是发布订阅模式的变种,指定路由的方式。
(1)工作模式图
(2)代码写法
1)生产者
public class Produce {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Constants.HOST);//主机ip
factory.setPort(Constants.PORT);//端口号
factory.setUsername(Constants.USER_NAME);//用户名
factory.setPassword(Constants.PASSWORD);//用户密码
factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
Connection connection = factory.newConnection();
//2.开启信道
Channel channel = connection.createChannel();
//3.指定交换机
channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT,true);
//4.指定队列
channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);
channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);
//5.绑定路由关系
channel.queueBind(Constants.DIRECT_QUEUE1,Constants.DIRECT_EXCHANGE,"a");
channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"a");
channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"b");
channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"c");
//6.发送消息
String msg = "hello direct, my routingkey is a....";
channel.basicPublish(Constants.DIRECT_EXCHANGE,"a", null, msg.getBytes());
String msg_b = "hello direct, my routingkey is b....";
channel.basicPublish(Constants.DIRECT_EXCHANGE,"b", null, msg_b.getBytes());
String msg_c = "hello direct, my routingkey is c....";
channel.basicPublish(Constants.DIRECT_EXCHANGE,"c", null, msg_c.getBytes());
System.out.println("消息发送成功");
//7. 释放资源
channel.close();
connection.close();
}
}
根据指定的路由规则,就可以把特定的消息发送到对应的队列中。
2)消费者
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Constants.HOST);//主机ip
factory.setPort(Constants.PORT);//端口号
factory.setUsername(Constants.USER_NAME);//用户名
factory.setPassword(Constants.PASSWORD);//用户密码
factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
Connection connection = factory.newConnection();
//2.开启信道
Channel channel = connection.createChannel();
//3.声明队列
channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);
//4.消费消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到的消息:"+new String(body));
}
};
channel.basicConsume(Constants.DIRECT_QUEUE2,true,consumer);
}
}
3)结果
5.通配符模式
这个也是发布订阅模式的变种
(2)代码写法
1)生产者
public class Produce {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Constants.HOST);//主机ip
factory.setPort(Constants.PORT);//端口号
factory.setUsername(Constants.USER_NAME);//用户名
factory.setPassword(Constants.PASSWORD);//用户密码
factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
Connection connection = factory.newConnection();
//2.开启信道
Channel channel = connection.createChannel();
//3.声明交换机
channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC,true);
//4.声明队列
channel.queueDeclare(Constants.TOPIC_QUEUE1,true,false,false,null);
channel.queueDeclare(Constants.TOPIC_QUEUE2,true,false,false,null);
//5.绑定交换机和队列
channel.queueBind(Constants.TOPIC_QUEUE1,Constants.TOPIC_EXCHANGE,"*.a.*");
channel.queueBind(Constants.TOPIC_QUEUE2,Constants.TOPIC_EXCHANGE,"*.*.b");
channel.queueBind(Constants.TOPIC_QUEUE2,Constants.TOPIC_EXCHANGE,"c.#");
//6.发送消息
String msg = "hello topic, my routingkey is ae.a.f....";
channel.basicPublish(Constants.TOPIC_EXCHANGE,"ae.a.f", null, msg.getBytes()); //转发到Q1
String msg_b = "hello topic, my routingkey is ef.a.b....";
channel.basicPublish(Constants.TOPIC_EXCHANGE,"ef.a.b", null, msg_b.getBytes()); //转发到Q1和Q2
String msg_c = "hello topic, my routingkey is c.ef.d....";
channel.basicPublish(Constants.TOPIC_EXCHANGE,"c.ef.d", null, msg_c.getBytes());//转发Q2
System.out.println("消息发送成功");
//7. 释放资源
channel.close();
connection.close();
}
}
2)消费者
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Constants.HOST);//主机ip
factory.setPort(Constants.PORT);//端口号
factory.setUsername(Constants.USER_NAME);//用户名
factory.setPassword(Constants.PASSWORD);//用户密码
factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
Connection connection = factory.newConnection();
//2.开启信道
Channel channel = connection.createChannel();
//3.声明队列
channel.queueDeclare(Constants.TOPIC_QUEUE1,true,false,false,null);
//4.消费消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到的消息:"+new String(body));
}
};
channel.basicConsume(Constants.TOPIC_QUEUE1,true,consumer);
}
}
另一个消费者负责消费队列2中的消息
3)结果
6.RPC模式
这个模式分为一个请求队列和响应队列,客户端把请求发送到请求队列中,服务器会根据请求队列中的请求,把对应的响应(带有编号)放入响应队列中,客户端取出即可。
**(1)工作模式图 **
(2)代码写法
客户端:
public class RpcClient {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//1.建立连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Constants.HOST);//主机ip
factory.setPort(Constants.PORT);//端口号
factory.setUsername(Constants.USER_NAME);//用户名
factory.setPassword(Constants.PASSWORD);//用户密码
factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
Connection connection = factory.newConnection();
//2.开启信道声明对应
Channel channel = connection.createChannel();
channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
//3.发送请求(放入请求队列)
//3.1.构造消息体
String msg = "我是一条rpc请求消息";
//3.2.消息唯一标识符(序号)
String correlationID = UUID.randomUUID().toString();
//3.3.设置消息的属性
AMQP.BasicProperties props = new AMQP.BasicProperties()
.builder()
.correlationId(correlationID)//消息标识符
.replyTo(Constants.RPC_RESPONSE_QUEUE)//响应放回的队列
.build();
channel.basicPublish("",Constants.RPC_REQUEST_QUEUE,props,msg.getBytes());
//4.接收响应(从响应队列获取)
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body);
System.out.println("接收到回调消息"+msg);
//放入阻塞队列中
if(correlationID.equals(properties.getCorrelationId())) {
response.offer(msg);
}
}
};
channel.basicConsume(Constants.RPC_RESPONSE_QUEUE,true,consumer);
//5.从阻塞队列中取出响应
String result = response.take();
System.out.println("接收到的响应:[]"+result);
}
}
服务端:
public class RpcServer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.建立连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Constants.HOST);//主机ip
factory.setPort(Constants.PORT);//端口号
factory.setUsername(Constants.USER_NAME);//用户名
factory.setPassword(Constants.PASSWORD);//用户密码
factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
Connection connection = factory.newConnection();
//2.开启信道
Channel channel = connection.createChannel();
//3.接受请求并返回响应
channel.basicQos(1);//每次接受一个请求
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String request = new String(body,"UTF-8");
System.out.println("接收到请求:"+ request);
String response = "针对request:"+ request +", 响应成功";
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
.correlationId(properties.getCorrelationId())
.build();
channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());//返回响应
channel.basicAck(envelope.getDeliveryTag(), false);//手动确认
}
};
channel.basicConsume(Constants.RPC_REQUEST_QUEUE,false,consumer);
}
}
7.发布确认模式
背景:
所以发布确认模式是针对生产者的,确定消息已经发送出去,如果没收到,会让发送方重新发送。
和前面的模式大同小异,这里需要将信道设置为确认模式,也需要指定队列。
发布确认模式有三种策略:单独确认、批量确认、异步确认。
(1)单独确认
private static final Integer MAX_COUNT = 100;
/**
* 建立连接
*
* @return 连接
*/
static Connection createConnection() throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Constants.HOST);//主机ip
factory.setPort(Constants.PORT);//端口号
factory.setUsername(Constants.USER_NAME);//用户名
factory.setPassword(Constants.PASSWORD);//用户密码
factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
return factory.newConnection();
}
/**
* 单独确认
*/
private static void publishingMessageIndividually() throws Exception {
try(Connection connection = createConnection()) {
//1.开启信道
Channel channel = connection.createChannel();
//2.设置信道类型(确认模式)
channel.confirmSelect();
//3.声明队列
channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE1,true,false,false,null);
//4.发送消息
long start = System.currentTimeMillis();
for(int i=0;i<MAX_COUNT;i++) {
String msg = "Message #" + i;
channel.basicPublish("",Constants.PUBLISH_CONFIRMS_QUEUE1,null,msg.getBytes());//发送消息
channel.waitForConfirmsOrDie(5000);//等待确认,超过5000ms报错
}
long end = System.currentTimeMillis();
System.out.printf("单独确认策略, 消息条数: %d, 耗时: %d ms \n",MAX_COUNT, end-start);
}
}
(2)批量确认
private static final Integer MAX_COUNT = 100;
/**
* 建立连接
*
* @return 连接
*/
static Connection createConnection() throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Constants.HOST);//主机ip
factory.setPort(Constants.PORT);//端口号
factory.setUsername(Constants.USER_NAME);//用户名
factory.setPassword(Constants.PASSWORD);//用户密码
factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
return factory.newConnection();
}
/**
* 批量确认
*/
private static void publishingMessageInBatches() throws Exception {
try(Connection connection = createConnection()) {
//1.开启信道
Channel channel = connection.createChannel();
//2.设置信道类型(确认模式)
channel.confirmSelect();
//3.声明队列
channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE2,true,false,false,null);
//4.发送消息
long start = System.currentTimeMillis();
int size = 100;//确定消息的个数
int outstandingMessageCount = 0;//计数
for(int i=0;i<MAX_COUNT;i++) {
String msg = "Message #" + i;
channel.basicPublish("",Constants.PUBLISH_CONFIRMS_QUEUE2,null,msg.getBytes());//发送消息
outstandingMessageCount++;
if(outstandingMessageCount == size) {
channel.waitForConfirmsOrDie(5000);
outstandingMessageCount = 0;
}
}
if(outstandingMessageCount > 0) {
channel.waitForConfirmsOrDie(5000);
}
long end = System.currentTimeMillis();
System.out.printf("批量确认策略, 消息条数: %d, 耗时: %d ms \n",MAX_COUNT, end-start);
}
}
(3)异步确认
private static final Integer MAX_COUNT = 100;
/**
* 建立连接
*
* @return 连接
*/
static Connection createConnection() throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(Constants.HOST);//主机ip
factory.setPort(Constants.PORT);//端口号
factory.setUsername(Constants.USER_NAME);//用户名
factory.setPassword(Constants.PASSWORD);//用户密码
factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
return factory.newConnection();
}
/**
* 异步确认
*/
private static void handlingPublisherConfirmsAsynchronously() throws Exception {
//可以一遍发送消息一边进行确认
try(Connection connection = createConnection()) {
//1.开启信道
Channel channel = connection.createChannel();
//2.设置确认模式
channel.confirmSelect();
//3.指定队列
channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE3,true,false,false,null);
//4.监听队列
long start = System.currentTimeMillis();
SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());//用来存储未被确认的消息序号
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long l, boolean b) throws IOException {
//收到ack
if(b) {
//批量确认
confirmSeqNo.headSet(l+1).clear();
}else {
//单独确认
confirmSeqNo.remove(l);//移除id为l的消息
}
}
@Override
public void handleNack(long l, boolean b) throws IOException {
//未收到ack
if(b) {
confirmSeqNo.headSet(l+1).clear();
}else {
confirmSeqNo.remove(l);
}
//
}
});
//5.发送消息
for(int i=0;i<MAX_COUNT;i++) {
String msg = "Message #" + i;
long seqNo = channel.getNextPublishSeqNo();//获取序号
channel.basicPublish("",Constants.PUBLISH_CONFIRMS_QUEUE3,null,msg.getBytes());
confirmSeqNo.add(seqNo);//存储序号
}
long end = System.currentTimeMillis();
System.out.printf("异步确认策略, 消息条数: %d, 耗时: %d ms \n",MAX_COUNT, end-start);
}
}
版权归原作者 代码小娥 所有, 如有侵权,请联系我们删除。