0


RabbitMQ的七种工作模式代码介绍

作为MQ初学者的我,写下这篇博客用来加深对MQ代码的认识和了解。后续的MQ都是使用Spring集成进来的,与此文无关。

1.七种工作模式概述
  1. 简单模式(Simple)
  2. 工作队列模式(Work Queue)
  3. 发布订阅模式(Publish/Subscribe)
  4. 路由模式(Routing)
  5. 通配符模式(Topics)
  6. RPC模式(RPC)
  7. 发布确认模式(Publish Confirms)

上述工作模式,其中1-5使用的比较多,6-7较少,代码难度也比较大。

2.简单与工作队列模式

(1)简单模式

简单模式是rabbitmq的入门模式,也是最简单的

1)工作模式图

这种模式下,只有一个生产者和一个消费者,中间使用一个阻塞队列来连接.

特点:一个生产者,一个消费者,消息只能被消费一次。也称为点对点模式

2)代码写法

生产者:

建立连接--开启信道--指定队列(有不创无则创)--发送信息--关闭连接

  1. /**
  2. * 生产者代码编写
  3. */
  4. public class Producer {
  5. public static void main(String[] args) throws IOException, TimeoutException {
  6. //1.建立连接Connection
  7. ConnectionFactory factory = new ConnectionFactory();
  8. factory.setHost("8.138.121.41");//主机ip
  9. factory.setPort(5672);//端口号
  10. factory.setUsername("study");//用户名
  11. factory.setPassword("study");//用户密码
  12. factory.setVirtualHost("test01");//虚拟主机
  13. Connection connection = factory.newConnection();
  14. //2.开启信道Channel
  15. Channel channel = connection.createChannel();
  16. //3.指定队列
  17. /*
  18. queueDeclare(String queue, boolean durable, boolean exclusive,
  19. boolean autoDelete, Map<String, Object> arguments)
  20. 1.queue: 队列名称
  21. 2.durable: 是否持久化, 当mq重启之后, 消息还在
  22. 3.exclusive:是否独占, 只能有⼀个消费者监听队列,当Connection关闭时, 是否删除队列
  23. 4.autoDelete: 是否⾃动删除, 当没有Consumer时, ⾃动删除掉
  24. 5.arguments: ⼀些参数
  25. */
  26. channel.queueDeclare("queue01", true, false, false, null);
  27. //4.发送信息
  28. channel.basicPublish("", "queue01", null, "Hello World".getBytes());
  29. //5.资源释放
  30. channel.close();
  31. connection.close();
  32. }
  33. }

消费者:

建立连接--开启信道--指定队列(有不创无则创)--消费信息--关闭连接

  1. //消费者
  2. public class Consumer {
  3. public static void main(String[] args) throws IOException, TimeoutException {
  4. //1.建立连接
  5. ConnectionFactory factory = new ConnectionFactory();
  6. factory.setHost("8.138.121.41");//主机ip
  7. factory.setPort(5672);//端口号
  8. factory.setUsername("study");//用户名
  9. factory.setPassword("study");//用户密码
  10. factory.setVirtualHost("test01");//虚拟主机
  11. Connection connection = factory.newConnection();
  12. //2.开启信道
  13. Channel channel = connection.createChannel();
  14. //3.指定队列
  15. channel.queueDeclare("queue01", true, false, false, null);
  16. //前面三步和第五步跟生产者模型一致
  17. //4.消费信息
  18. /*
  19. 回调⽅法, 当收到消息后, 会⾃动执⾏该⽅法
  20. 1. consumerTag: 标识
  21. 2. envelope: 获取⼀些信息, 交换机, 路由key
  22. 3. properties: 配置信息
  23. 4. body: 数据
  24. */
  25. DefaultConsumer consumer = new DefaultConsumer(channel) {
  26. @Override
  27. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  28. System.out.println("接收到的消息后执行的逻辑,打印:"+new String(body));
  29. }
  30. };
  31. /**
  32. * String basicConsume(String var1, boolean var2, Consumer var3)
  33. * 参数解释:
  34. * var1: 队列名称
  35. * autoAck: 是否自动确认(信息)
  36. * callback: 接收到消息后,执行的逻辑(这里是打印消息) -- 回调方法
  37. */
  38. channel.basicConsume("queue01", true, consumer);//消费消息
  39. //5.释放连接
  40. channel.close();
  41. connection.close();
  42. }
  43. }

生产者和消费者的代码基本相同,区别就在消费消息和生产消息。

生产消息到指定队列中:

  1. //4.发送信息
  2. channel.basicPublish("", "queue01", null, "Hello World".getBytes());

第一个参数:指定交换机,不写则默认

第二个参数:指定消息存放的队列

第三个参数:是否携带额外的属性(比如优先级等)

第四个参数:消息体

(2)工作队列模式

工作队列模式在简单模式的基础上多加一个消费者,两个消费者共同消费一份信息(先到先得原则)

1)工作模式图

特点:

每个消费者会获得不同的消息,并且不会重复。

适用场景:集群环境中做异步处理

2)代码写法

生产者:和简单模式的一样

  1. /**
  2. * 生产者代码编写
  3. */
  4. public class Producer {
  5. public static void main(String[] args) throws IOException, TimeoutException {
  6. //1.建立连接Connection
  7. ConnectionFactory factory = new ConnectionFactory();
  8. factory.setHost("8.138.121.41");//主机ip
  9. factory.setPort(5672);//端口号
  10. factory.setUsername("study");//用户名
  11. factory.setPassword("study");//用户密码
  12. factory.setVirtualHost("test01");//虚拟主机
  13. Connection connection = factory.newConnection();
  14. //2.开启信道Channel
  15. Channel channel = connection.createChannel();
  16. //3.指定队列
  17. channel.queueDeclare("queue01", true, false, false, null);
  18. //4.发送信息
  19. for(int i=0;i<10;i++) {
  20. String msg = "简单队列模式: "+i;
  21. channel.basicPublish("", "work.queue", null, msg.getBytes());
  22. }
  23. //5.资源释放
  24. channel.close();
  25. connection.close();
  26. }
  27. }

消费者1:和简单模式一样

  1. public class Consumer1 {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1.建立连接
  4. ConnectionFactory factory = new ConnectionFactory();
  5. factory.setHost("8.138.121.41");//主机ip
  6. factory.setPort(5672);//端口号
  7. factory.setUsername("study");//用户名
  8. factory.setPassword("study");//用户密码
  9. factory.setVirtualHost("test01");//虚拟主机
  10. Connection connection = factory.newConnection();
  11. //2.开启信道
  12. Channel channel = connection.createChannel();
  13. //3.指定队列(使用默认的交换机)
  14. channel.queueDeclare("work.queue",true,false,false,null);
  15. //4.消费消息
  16. Consumer consumer = new DefaultConsumer(channel) {
  17. @Override
  18. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  19. System.out.println("接收到的消息:"+new String(body));
  20. }
  21. };
  22. channel.basicConsume("work.queue", true, consumer);//消费消息
  23. //5.关闭连接
  24. }
  25. }

另一个消费者也一样,这里就不多余写了

结果:

由此可见,这两个消费者消费同一个队列的消息,彼此获得消息各不相同。

3.发布订阅模式

发布订阅模式,一个生产者生产的消息,只要订阅了队列,就可以拿到消息。每个人都有份切不重复。

(1)工作模式图

(2)代码写法

这是一个存放常量的类:

  1. public class Constants {
  2. //建立连接需要的常量
  3. public static final String HOST = "8.138.121.41";//主机
  4. public static final int PORT = 5672;//端口号
  5. public static final String USER_NAME = "study";//用户名
  6. public static final String PASSWORD = "study";//用户密码
  7. public static final String VIRTUAL_HOST = "test01";//虚拟主机
  8. //工作队列模式
  9. public static final String WORK_QUEUE = "work.queue";
  10. //发布订阅模式
  11. public static final String FANOUT_EXCHANGE = "fanout.exchange";//交换机
  12. public static final String FANOUT_QUEUE1 = "fanout.queue1";//队列
  13. public static final String FANOUT_QUEUE2 = "fanout.queue2";
  14. //路由模式
  15. public static final String DIRECT_EXCHANGE = "direct.exchange";//交换机
  16. public static final String DIRECT_QUEUE1 = "direct.queue1";
  17. public static final String DIRECT_QUEUE2 = "direct.queue2";
  18. //通配符模式
  19. public static final String TOPIC_EXCHANGE = "topic.exchange";//交换机
  20. public static final String TOPIC_QUEUE1 = "topic.queue1";
  21. public static final String TOPIC_QUEUE2 = "topic.queue2";
  22. //rpc模式
  23. public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";
  24. public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";
  25. //发布确认模式
  26. public static final String PUBLISH_CONFIRMS_QUEUE1 = "publish.confirms.queue1";
  27. public static final String PUBLISH_CONFIRMS_QUEUE2 = "publish.confirms.queue2";
  28. public static final String PUBLISH_CONFIRMS_QUEUE3 = "publish.confirms.queue3";
  29. }

1)生产者

  1. public class Produce {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1.建立连接Connection
  4. ConnectionFactory factory = new ConnectionFactory();
  5. factory.setHost(Constants.HOST);//主机ip
  6. factory.setPort(Constants.PORT);//端口号
  7. factory.setUsername(Constants.USER_NAME);//用户名
  8. factory.setPassword(Constants.PASSWORD);//用户密码
  9. factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
  10. Connection connection = factory.newConnection();
  11. //2.开启信道Channel
  12. Channel channel = connection.createChannel();
  13. //3.指定交换机
  14. channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
  15. //4.指定队列
  16. channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
  17. channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);
  18. //5.交换机和队列进行绑定(两个队列绑定同一个交换机)
  19. channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,"");
  20. channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");
  21. //6.发送信息
  22. for(int i=0;i<10;i++) {
  23. String msg = "发布订阅模式: "+i;
  24. channel.basicPublish(Constants.FANOUT_EXCHANGE,"", null, msg.getBytes());
  25. }
  26. //5.资源释放
  27. channel.close();
  28. connection.close();
  29. }
  30. }

这里和简单模式相比,多了指定交换机、交换机和队列进行绑定这两步。

指定交换机:

  1. //3.指定交换机
  2. channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
  1. 第一个参数:指定的交换机
  2. 第二个参数:交换机的类型(有三种,这里是FANOT模式,也就是发布订阅模式)
  3. 第三个参数:数据是否可持久化

交换机和队列绑定:

  1. //5.交换机和队列进行绑定(两个队列绑定同一个交换机)
  2. channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,"");
  3. channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");
  • 第一个参数:绑定的队列是谁
  • 第二个参数:要绑定的交换机是谁
  • 第三个参数:routingkey,也就是路由规则。发布订阅模式为null

2)消费者

这里的消费者和简单模式一样。并且生产者已经绑定好了交换机和队列的关系,所以这里无须指定,直接使用队列即可。

  1. public class Consumer1 {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1.建立连接
  4. ConnectionFactory factory = new ConnectionFactory();
  5. factory.setHost(Constants.HOST);//主机ip
  6. factory.setPort(Constants.PORT);//端口号
  7. factory.setUsername(Constants.USER_NAME);//用户名
  8. factory.setPassword(Constants.PASSWORD);//用户密码
  9. factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
  10. Connection connection = factory.newConnection();
  11. //2.开启信道
  12. Channel channel = connection.createChannel();
  13. //3.指定队列(使用默认的交换机)
  14. channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);
  15. //4.消费消息
  16. Consumer consumer = new DefaultConsumer(channel) {
  17. @Override
  18. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  19. System.out.println("接收到的消息:"+new String(body));
  20. }
  21. };
  22. channel.basicConsume(Constants.WORK_QUEUE, true, consumer);//消费消息
  23. //5.关闭连接
  24. channel.close();
  25. connection.close();
  26. }
  27. }

另一个消费者的代码也一样。

结果:

队列中的消息,两个消费者共享,也就是每个消费者都会有一份。

4.路由模式

路由模式就是发布订阅模式的变种,指定路由的方式。

(1)工作模式图

(2)代码写法

1)生产者

  1. public class Produce {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1.建立连接
  4. ConnectionFactory factory = new ConnectionFactory();
  5. factory.setHost(Constants.HOST);//主机ip
  6. factory.setPort(Constants.PORT);//端口号
  7. factory.setUsername(Constants.USER_NAME);//用户名
  8. factory.setPassword(Constants.PASSWORD);//用户密码
  9. factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
  10. Connection connection = factory.newConnection();
  11. //2.开启信道
  12. Channel channel = connection.createChannel();
  13. //3.指定交换机
  14. channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT,true);
  15. //4.指定队列
  16. channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);
  17. channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);
  18. //5.绑定路由关系
  19. channel.queueBind(Constants.DIRECT_QUEUE1,Constants.DIRECT_EXCHANGE,"a");
  20. channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"a");
  21. channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"b");
  22. channel.queueBind(Constants.DIRECT_QUEUE2,Constants.DIRECT_EXCHANGE,"c");
  23. //6.发送消息
  24. String msg = "hello direct, my routingkey is a....";
  25. channel.basicPublish(Constants.DIRECT_EXCHANGE,"a", null, msg.getBytes());
  26. String msg_b = "hello direct, my routingkey is b....";
  27. channel.basicPublish(Constants.DIRECT_EXCHANGE,"b", null, msg_b.getBytes());
  28. String msg_c = "hello direct, my routingkey is c....";
  29. channel.basicPublish(Constants.DIRECT_EXCHANGE,"c", null, msg_c.getBytes());
  30. System.out.println("消息发送成功");
  31. //7. 释放资源
  32. channel.close();
  33. connection.close();
  34. }
  35. }

根据指定的路由规则,就可以把特定的消息发送到对应的队列中。

2)消费者

  1. public class Consumer2 {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1.建立连接
  4. ConnectionFactory factory = new ConnectionFactory();
  5. factory.setHost(Constants.HOST);//主机ip
  6. factory.setPort(Constants.PORT);//端口号
  7. factory.setUsername(Constants.USER_NAME);//用户名
  8. factory.setPassword(Constants.PASSWORD);//用户密码
  9. factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
  10. Connection connection = factory.newConnection();
  11. //2.开启信道
  12. Channel channel = connection.createChannel();
  13. //3.声明队列
  14. channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);
  15. //4.消费消息
  16. DefaultConsumer consumer = new DefaultConsumer(channel) {
  17. @Override
  18. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  19. System.out.println("接收到的消息:"+new String(body));
  20. }
  21. };
  22. channel.basicConsume(Constants.DIRECT_QUEUE2,true,consumer);
  23. }
  24. }

3)结果

5.通配符模式

这个也是发布订阅模式的变种

(2)代码写法

1)生产者

  1. public class Produce {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1.建立连接
  4. ConnectionFactory factory = new ConnectionFactory();
  5. factory.setHost(Constants.HOST);//主机ip
  6. factory.setPort(Constants.PORT);//端口号
  7. factory.setUsername(Constants.USER_NAME);//用户名
  8. factory.setPassword(Constants.PASSWORD);//用户密码
  9. factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
  10. Connection connection = factory.newConnection();
  11. //2.开启信道
  12. Channel channel = connection.createChannel();
  13. //3.声明交换机
  14. channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC,true);
  15. //4.声明队列
  16. channel.queueDeclare(Constants.TOPIC_QUEUE1,true,false,false,null);
  17. channel.queueDeclare(Constants.TOPIC_QUEUE2,true,false,false,null);
  18. //5.绑定交换机和队列
  19. channel.queueBind(Constants.TOPIC_QUEUE1,Constants.TOPIC_EXCHANGE,"*.a.*");
  20. channel.queueBind(Constants.TOPIC_QUEUE2,Constants.TOPIC_EXCHANGE,"*.*.b");
  21. channel.queueBind(Constants.TOPIC_QUEUE2,Constants.TOPIC_EXCHANGE,"c.#");
  22. //6.发送消息
  23. String msg = "hello topic, my routingkey is ae.a.f....";
  24. channel.basicPublish(Constants.TOPIC_EXCHANGE,"ae.a.f", null, msg.getBytes()); //转发到Q1
  25. String msg_b = "hello topic, my routingkey is ef.a.b....";
  26. channel.basicPublish(Constants.TOPIC_EXCHANGE,"ef.a.b", null, msg_b.getBytes()); //转发到Q1和Q2
  27. String msg_c = "hello topic, my routingkey is c.ef.d....";
  28. channel.basicPublish(Constants.TOPIC_EXCHANGE,"c.ef.d", null, msg_c.getBytes());//转发Q2
  29. System.out.println("消息发送成功");
  30. //7. 释放资源
  31. channel.close();
  32. connection.close();
  33. }
  34. }

2)消费者

  1. public class Consumer1 {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1.建立连接
  4. ConnectionFactory factory = new ConnectionFactory();
  5. factory.setHost(Constants.HOST);//主机ip
  6. factory.setPort(Constants.PORT);//端口号
  7. factory.setUsername(Constants.USER_NAME);//用户名
  8. factory.setPassword(Constants.PASSWORD);//用户密码
  9. factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
  10. Connection connection = factory.newConnection();
  11. //2.开启信道
  12. Channel channel = connection.createChannel();
  13. //3.声明队列
  14. channel.queueDeclare(Constants.TOPIC_QUEUE1,true,false,false,null);
  15. //4.消费消息
  16. DefaultConsumer consumer = new DefaultConsumer(channel) {
  17. @Override
  18. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  19. System.out.println("接收到的消息:"+new String(body));
  20. }
  21. };
  22. channel.basicConsume(Constants.TOPIC_QUEUE1,true,consumer);
  23. }
  24. }

另一个消费者负责消费队列2中的消息

3)结果

6.RPC模式

这个模式分为一个请求队列和响应队列,客户端把请求发送到请求队列中,服务器会根据请求队列中的请求,把对应的响应(带有编号)放入响应队列中,客户端取出即可。

**(1)工作模式图 **

(2)代码写法

客户端:

  1. public class RpcClient {
  2. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
  3. //1.建立连接
  4. ConnectionFactory factory = new ConnectionFactory();
  5. factory.setHost(Constants.HOST);//主机ip
  6. factory.setPort(Constants.PORT);//端口号
  7. factory.setUsername(Constants.USER_NAME);//用户名
  8. factory.setPassword(Constants.PASSWORD);//用户密码
  9. factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
  10. Connection connection = factory.newConnection();
  11. //2.开启信道声明对应
  12. Channel channel = connection.createChannel();
  13. channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
  14. channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
  15. //3.发送请求(放入请求队列)
  16. //3.1.构造消息体
  17. String msg = "我是一条rpc请求消息";
  18. //3.2.消息唯一标识符(序号)
  19. String correlationID = UUID.randomUUID().toString();
  20. //3.3.设置消息的属性
  21. AMQP.BasicProperties props = new AMQP.BasicProperties()
  22. .builder()
  23. .correlationId(correlationID)//消息标识符
  24. .replyTo(Constants.RPC_RESPONSE_QUEUE)//响应放回的队列
  25. .build();
  26. channel.basicPublish("",Constants.RPC_REQUEST_QUEUE,props,msg.getBytes());
  27. //4.接收响应(从响应队列获取)
  28. final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
  29. DefaultConsumer consumer = new DefaultConsumer(channel) {
  30. @Override
  31. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  32. String msg = new String(body);
  33. System.out.println("接收到回调消息"+msg);
  34. //放入阻塞队列中
  35. if(correlationID.equals(properties.getCorrelationId())) {
  36. response.offer(msg);
  37. }
  38. }
  39. };
  40. channel.basicConsume(Constants.RPC_RESPONSE_QUEUE,true,consumer);
  41. //5.从阻塞队列中取出响应
  42. String result = response.take();
  43. System.out.println("接收到的响应:[]"+result);
  44. }
  45. }

服务端:

  1. public class RpcServer {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1.建立连接
  4. ConnectionFactory factory = new ConnectionFactory();
  5. factory.setHost(Constants.HOST);//主机ip
  6. factory.setPort(Constants.PORT);//端口号
  7. factory.setUsername(Constants.USER_NAME);//用户名
  8. factory.setPassword(Constants.PASSWORD);//用户密码
  9. factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
  10. Connection connection = factory.newConnection();
  11. //2.开启信道
  12. Channel channel = connection.createChannel();
  13. //3.接受请求并返回响应
  14. channel.basicQos(1);//每次接受一个请求
  15. DefaultConsumer consumer = new DefaultConsumer(channel) {
  16. @Override
  17. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  18. String request = new String(body,"UTF-8");
  19. System.out.println("接收到请求:"+ request);
  20. String response = "针对request:"+ request +", 响应成功";
  21. AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
  22. .correlationId(properties.getCorrelationId())
  23. .build();
  24. channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());//返回响应
  25. channel.basicAck(envelope.getDeliveryTag(), false);//手动确认
  26. }
  27. };
  28. channel.basicConsume(Constants.RPC_REQUEST_QUEUE,false,consumer);
  29. }
  30. }

7.发布确认模式

背景:

所以发布确认模式是针对生产者的,确定消息已经发送出去,如果没收到,会让发送方重新发送。

和前面的模式大同小异,这里需要将信道设置为确认模式,也需要指定队列。

发布确认模式有三种策略:单独确认、批量确认、异步确认。

(1)单独确认

  1. private static final Integer MAX_COUNT = 100;
  2. /**
  3. * 建立连接
  4. *
  5. * @return 连接
  6. */
  7. static Connection createConnection() throws Exception{
  8. ConnectionFactory factory = new ConnectionFactory();
  9. factory.setHost(Constants.HOST);//主机ip
  10. factory.setPort(Constants.PORT);//端口号
  11. factory.setUsername(Constants.USER_NAME);//用户名
  12. factory.setPassword(Constants.PASSWORD);//用户密码
  13. factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
  14. return factory.newConnection();
  15. }
  16. /**
  17. * 单独确认
  18. */
  19. private static void publishingMessageIndividually() throws Exception {
  20. try(Connection connection = createConnection()) {
  21. //1.开启信道
  22. Channel channel = connection.createChannel();
  23. //2.设置信道类型(确认模式)
  24. channel.confirmSelect();
  25. //3.声明队列
  26. channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE1,true,false,false,null);
  27. //4.发送消息
  28. long start = System.currentTimeMillis();
  29. for(int i=0;i<MAX_COUNT;i++) {
  30. String msg = "Message #" + i;
  31. channel.basicPublish("",Constants.PUBLISH_CONFIRMS_QUEUE1,null,msg.getBytes());//发送消息
  32. channel.waitForConfirmsOrDie(5000);//等待确认,超过5000ms报错
  33. }
  34. long end = System.currentTimeMillis();
  35. System.out.printf("单独确认策略, 消息条数: %d, 耗时: %d ms \n",MAX_COUNT, end-start);
  36. }
  37. }

(2)批量确认

  1. private static final Integer MAX_COUNT = 100;
  2. /**
  3. * 建立连接
  4. *
  5. * @return 连接
  6. */
  7. static Connection createConnection() throws Exception{
  8. ConnectionFactory factory = new ConnectionFactory();
  9. factory.setHost(Constants.HOST);//主机ip
  10. factory.setPort(Constants.PORT);//端口号
  11. factory.setUsername(Constants.USER_NAME);//用户名
  12. factory.setPassword(Constants.PASSWORD);//用户密码
  13. factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
  14. return factory.newConnection();
  15. }
  16. /**
  17. * 批量确认
  18. */
  19. private static void publishingMessageInBatches() throws Exception {
  20. try(Connection connection = createConnection()) {
  21. //1.开启信道
  22. Channel channel = connection.createChannel();
  23. //2.设置信道类型(确认模式)
  24. channel.confirmSelect();
  25. //3.声明队列
  26. channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE2,true,false,false,null);
  27. //4.发送消息
  28. long start = System.currentTimeMillis();
  29. int size = 100;//确定消息的个数
  30. int outstandingMessageCount = 0;//计数
  31. for(int i=0;i<MAX_COUNT;i++) {
  32. String msg = "Message #" + i;
  33. channel.basicPublish("",Constants.PUBLISH_CONFIRMS_QUEUE2,null,msg.getBytes());//发送消息
  34. outstandingMessageCount++;
  35. if(outstandingMessageCount == size) {
  36. channel.waitForConfirmsOrDie(5000);
  37. outstandingMessageCount = 0;
  38. }
  39. }
  40. if(outstandingMessageCount > 0) {
  41. channel.waitForConfirmsOrDie(5000);
  42. }
  43. long end = System.currentTimeMillis();
  44. System.out.printf("批量确认策略, 消息条数: %d, 耗时: %d ms \n",MAX_COUNT, end-start);
  45. }
  46. }

(3)异步确认

  1. private static final Integer MAX_COUNT = 100;
  2. /**
  3. * 建立连接
  4. *
  5. * @return 连接
  6. */
  7. static Connection createConnection() throws Exception{
  8. ConnectionFactory factory = new ConnectionFactory();
  9. factory.setHost(Constants.HOST);//主机ip
  10. factory.setPort(Constants.PORT);//端口号
  11. factory.setUsername(Constants.USER_NAME);//用户名
  12. factory.setPassword(Constants.PASSWORD);//用户密码
  13. factory.setVirtualHost(Constants.VIRTUAL_HOST);//虚拟主机
  14. return factory.newConnection();
  15. }
  16. /**
  17. * 异步确认
  18. */
  19. private static void handlingPublisherConfirmsAsynchronously() throws Exception {
  20. //可以一遍发送消息一边进行确认
  21. try(Connection connection = createConnection()) {
  22. //1.开启信道
  23. Channel channel = connection.createChannel();
  24. //2.设置确认模式
  25. channel.confirmSelect();
  26. //3.指定队列
  27. channel.queueDeclare(Constants.PUBLISH_CONFIRMS_QUEUE3,true,false,false,null);
  28. //4.监听队列
  29. long start = System.currentTimeMillis();
  30. SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());//用来存储未被确认的消息序号
  31. channel.addConfirmListener(new ConfirmListener() {
  32. @Override
  33. public void handleAck(long l, boolean b) throws IOException {
  34. //收到ack
  35. if(b) {
  36. //批量确认
  37. confirmSeqNo.headSet(l+1).clear();
  38. }else {
  39. //单独确认
  40. confirmSeqNo.remove(l);//移除id为l的消息
  41. }
  42. }
  43. @Override
  44. public void handleNack(long l, boolean b) throws IOException {
  45. //未收到ack
  46. if(b) {
  47. confirmSeqNo.headSet(l+1).clear();
  48. }else {
  49. confirmSeqNo.remove(l);
  50. }
  51. //
  52. }
  53. });
  54. //5.发送消息
  55. for(int i=0;i<MAX_COUNT;i++) {
  56. String msg = "Message #" + i;
  57. long seqNo = channel.getNextPublishSeqNo();//获取序号
  58. channel.basicPublish("",Constants.PUBLISH_CONFIRMS_QUEUE3,null,msg.getBytes());
  59. confirmSeqNo.add(seqNo);//存储序号
  60. }
  61. long end = System.currentTimeMillis();
  62. System.out.printf("异步确认策略, 消息条数: %d, 耗时: %d ms \n",MAX_COUNT, end-start);
  63. }
  64. }

本文转载自: https://blog.csdn.net/2301_77053417/article/details/143039795
版权归原作者 代码小娥 所有, 如有侵权,请联系我们删除。

“RabbitMQ的七种工作模式代码介绍”的评论:

还没有评论