0


RabbitMQ应用

1. 7种工作模式介绍

1.1 Simple(简单模式)

  • P: ⽣产者,也就是要发送消息的程序
  • C: 消费者,消息的接收者
  • Queue: 消息队列(图中⻩⾊背景部分)类似⼀个邮箱,可以缓存消息;⽣产者向其中投递消息,消费者从 其中取出消息

特点:

  • ⼀个⽣产者P,⼀个消费者C, 消息只能被消费⼀次.
  • 也称为点对点(Point-to-Point)模式.

适⽤场景:消息只能被单个消费者处理

1.2 WorkQueue(⼯作队列)

  • ⼀个⽣产者P,多个消费者C1,C2.
  • 在多个消息的情况下,WorkQueue会将消息分派给不同的消费者,每个消费者都会接收到不同的消息.

特点:

消息不会重复,分配给不同的消费者.

适⽤场景:集群环境中做异步处理

1.3 Publish/Subscribe(发布/订阅)

  • X表⽰交换机
  • ⼀个⽣产者P,多个消费者C1,C2,X代表交换机消息复制多份,每个消费者接收相同的消息
  • ⽣产者发送⼀条消息,经过交换机转发到多个不同的队列,多个不同的队列就有多个不同的消费者

适合场景:消息需要被多个消费者同时接收的场景.如:实时通知或者⼴播消息

Exchange:

作⽤:

⽣产者将消息发送到Exchange,由交换机将消息按⼀定规则路由到⼀个或多个队列中(上图中⽣产者将消息投递到队列中,实际上这个在RabbitMQ中不会发⽣.)

RabbitMQ交换机有四种类型:fanout,direct,topic,headers,不同类型有着不同的路由策略:

①Fanout:⼴播,将消息交给所有绑定到交换机的队列(Publish/Subscribe模式)

②Direct:定向,把消息交给符合指定routingkey的队列(Routing模式)

③Topic:通配符,把消息交给符合routingpattern(路由模式)的队列(Topics模式)

④headers类型的交换器不依赖于路由键的匹配规则来路由消息,⽽是根据发送的消息内容中的 headers属性进⾏匹配.headers类型的交换器性能会很差,⽽且也不实⽤,基本上不会看到它的存在.

  • Exchange(交换机)只负责转发消息,不具备存储消息的能⼒,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息就会丢失
  • RoutingKey: 路由键.⽣产者将消息发给交换器时,指定的⼀个字符串,⽤来告诉交换机应该如何处理这个消息.
  • BindingKey:绑定.RabbitMQ中通过Binding(绑定)将交换器与队列关联起来,在绑定的时候⼀般会指定⼀个BindingKey, 这样RabbitMQ就知道如何正确地将消息路由到队列了.
  • 在使⽤绑定的时候,需要的路由键是BindingKey.
  • 在发送消息的时候,需要的路由键是RoutingKey.

1.4 Routing(路由模式)

  • 路由模式是发布订阅模式的变种,在发布订阅基础上,增加路由key
  • Exchange根据RoutingKey的规则, 将数据筛选后发给对应的消费者队列

适合场景:需要根据特定规则分发消息的场景.

1.5 Topics(通配符模式)

Topics和Routing的基本原理相同

即:⽣产者将消息发给交换机,交换机根据RoutingKey将消息转发给与RoutingKey匹配的队列

不同之处是:routingKey的匹配⽅式不同,Routing模式是相等匹配,topics模式是通配符匹配.

适合场景:需要灵活匹配和过滤消息的场景

在topic类型的交换机在匹配规则上,有些要求:

①RoutingKey是⼀系列由点( . )分隔的单词,⽐如" " q uick.orange.rabbit " stock.usd.nyse ","

②BindingKey 和RoutingKey⼀样,也是点( . )分割的字符串.

③Binding Key中可以存在两种特殊字符串,⽤于模糊匹配

  • *表⽰⼀个单词
  • #表⽰多个单词(0-N个)

比如:

• BindingKey为"d.a.b"会同时路由到Q1和Q2

• BindingKey为"d.a.f"会路由到Q1

1.6 RPC(RPC通信)

  • 客⼾端发送消息到⼀个指定的队列,并在消息属性中设置replyTo字段,这个字段指定了⼀个回调队 列,⽤于接收服务端的响应.
  • 服务端接收到请求后,处理请求并发送响应消息到replyTo指定的回调队列
  • 客⼾端在回调队列上等待响应消息.⼀旦收到响应,客⼾端会检查消息的correlationId属性,以确保它是所期望的响应.

1.7 Publisher Confirms(发布确认)

Publisher Confirms模式是RabbitMQ提供的⼀种确保消息可靠发送到RabbitMQ服务器的机制。

在这种模式下,⽣产者可以等待RabbitMQ服务器的确认,以确保消息已经被服务器接收并处理.

  • ⽣产者将Channel设置为confirm模式(通过调⽤channel.confirmSelect()完成)后,发布的每⼀条消 息都会获得⼀个唯⼀的ID,⽣产者可以将这些序列号与消息关联起来,以便跟踪消息的状态.
  • 当消息被RabbitMQ服务器接收并处理后,服务器会异步地向⽣产者发送⼀个确认(ACK)给⽣产者 (包含消息的唯⼀ID),表明消息已经送达.

通过PublisherConfirms模式,⽣产者可以确保消息被RabbitMQ服务器成功接收,从⽽避免消息丢失 的问题.

适⽤场景:对数据安全性要求较⾼的场景.⽐如⾦融交易,订单处理

2. 使用案例

  1. public class Constants {
  2. public static final String HOST = "119.91.154.99";
  3. public static final int PORT = 5672;
  4. public static final String USER_NAME = "xuexue";
  5. public static final String PASSWORD = "xuexue";
  6. public static final String VIRTUAL_HOST = "bit";
  7. //工作队列模式
  8. public static final String WORK_QUEUE = "work.queue";
  9. //发布订阅模式
  10. public static final String FANOUT_EXCHANGE = "fanout.exchange";
  11. public static final String FANOUT_QUEUE1 = "fanout.queue1";
  12. public static final String FANOUT_QUEUE2 = "fanout.queue2";
  13. //路由模式
  14. public static final String DIRECT_EXCHANGE = "direct.exchange";
  15. public static final String DIRECT_QUEUE1 = "direct.queue1";
  16. public static final String DIRECT_QUEUE2 = "direct.queue2";
  17. //通配符模式
  18. public static final String TOPIC_EXCHANGE = "topic.exchange";
  19. public static final String TOPIC_QUEUE1 = "topic_queue1";
  20. public static final String TOPIC_QUEUE2 = "topic_queue2";
  21. //rpc 模式
  22. public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";
  23. public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";
  24. //publisher confirms
  25. public static final String PUBLISHER_CONFIRMS_QUEUE1 = "publisher.confirms.queue1";
  26. public static final String PUBLISHER_CONFIRMS_QUEUE2 = "publisher.confirms.queue2";
  27. public static final String PUBLISHER_CONFIRMS_QUEUE3 = "publisher.confirms.queue3";
  28. //推拉模式
  29. public static final String MESSAGE_QUEUE = "message.queue";
  30. }

2.1 简单模式

2.2 WorkQueues(⼯作队列)

生产者:

  1. public class Producer {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1. 建立连接
  4. ConnectionFactory connectionFactory = new ConnectionFactory();
  5. connectionFactory.setHost(Constants.HOST);
  6. connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
  7. connectionFactory.setUsername(Constants.USER_NAME);//账号
  8. connectionFactory.setPassword(Constants.PASSWORD); //密码
  9. connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
  10. Connection connection = connectionFactory.newConnection();
  11. //2. 开启信道
  12. Channel channel = connection.createChannel();
  13. //3. 声明队列 使用内置的交换机
  14. //如果队列不存在, 则创建, 如果队列存在, 则不创建
  15. channel.queueDeclare(Constants.WORK_QUEUE,true,false,false,null);
  16. //4. 发送消息
  17. for (int i = 0; i < 10; i++) {
  18. String msg = "hello work queue...."+i;
  19. channel.basicPublish("",Constants.WORK_QUEUE, null, msg.getBytes());
  20. }
  21. System.out.println("消息发送成功~");
  22. //5. 资源释放
  23. channel.close();
  24. connection.close();
  25. }
  26. }

消费者(两个一样):

  1. public class Consumer1 {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1. 建立连接
  4. ConnectionFactory connectionFactory = new ConnectionFactory();
  5. connectionFactory.setHost(Constants.HOST);
  6. connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
  7. connectionFactory.setUsername(Constants.USER_NAME);//账号
  8. connectionFactory.setPassword(Constants.PASSWORD); //密码
  9. connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
  10. Connection connection = connectionFactory.newConnection();
  11. //2. 开启信道
  12. Channel channel = connection.createChannel();
  13. //3. 声明队列 使用内置的交换机
  14. //如果队列不存在, 则创建, 如果队列存在, 则不创建
  15. channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
  16. //4. 消费消息
  17. DefaultConsumer consumer = new DefaultConsumer(channel){
  18. //从队列中收到消息, 就会执行的方法
  19. @Override
  20. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  21. System.out.println("接收到消息:"+ new String(body));
  22. }
  23. };
  24. //默认交换机,RoutingKey=队列名称
  25. channel.basicConsume(Constants.WORK_QUEUE, true, consumer);
  26. //5. 资源释放
  27. channel.close();
  28. connection.close();
  29. }
  30. }

2.3 Publish/Subscribe(发布/订阅)

生产者:

  1. public class Producer {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1. 建立连接
  4. ConnectionFactory connectionFactory = new ConnectionFactory();
  5. connectionFactory.setHost(Constants.HOST);
  6. connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
  7. connectionFactory.setUsername(Constants.USER_NAME);//账号
  8. connectionFactory.setPassword(Constants.PASSWORD); //密码
  9. connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
  10. Connection connection = connectionFactory.newConnection();
  11. //2. 开启信道
  12. Channel channel = connection.createChannel();
  13. //3.声明交换机
  14. channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT,true);
  15. //4. 声明队列
  16. channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);
  17. channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);
  18. //5.交换机和队列绑定
  19. channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,"");
  20. channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");
  21. //6. 发布消息
  22. String msg = "hello fanout....";
  23. channel.basicPublish(Constants.FANOUT_EXCHANGE,"", null, msg.getBytes());
  24. System.out.println("消息发送成功");
  25. //7. 释放资源
  26. channel.close();
  27. connection.close();
  28. }
  29. }

消费者:

  1. public class Consumer1 {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1. 建立连接
  4. ConnectionFactory connectionFactory = new ConnectionFactory();
  5. connectionFactory.setHost(Constants.HOST);
  6. connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
  7. connectionFactory.setUsername(Constants.USER_NAME);//账号
  8. connectionFactory.setPassword(Constants.PASSWORD); //密码
  9. connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
  10. Connection connection = connectionFactory.newConnection();
  11. //2. 开启信道
  12. Channel channel = connection.createChannel();
  13. //3. 声明队列
  14. channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);
  15. //4. 消费消息
  16. DefaultConsumer consumer = new DefaultConsumer(channel){
  17. //从队列中收到消息, 就会执行的方法
  18. @Override
  19. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  20. System.out.println("接收到消息:"+ new String(body));
  21. }
  22. };
  23. channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);
  24. }
  25. }
  1. public class Consumer2 {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1. 建立连接
  4. ConnectionFactory connectionFactory = new ConnectionFactory();
  5. connectionFactory.setHost(Constants.HOST);
  6. connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
  7. connectionFactory.setUsername(Constants.USER_NAME);//账号
  8. connectionFactory.setPassword(Constants.PASSWORD); //密码
  9. connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
  10. Connection connection = connectionFactory.newConnection();
  11. //2. 开启信道
  12. Channel channel = connection.createChannel();
  13. //3. 声明队列
  14. channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);
  15. //4. 消费消息
  16. DefaultConsumer consumer = new DefaultConsumer(channel){
  17. //从队列中收到消息, 就会执行的方法
  18. @Override
  19. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  20. System.out.println("接收到消息:"+ new String(body));
  21. }
  22. };
  23. channel.basicConsume(Constants.FANOUT_QUEUE2, true, consumer);
  24. }
  25. }

2.4 Routing(路由模式)

生产者:

  1. public class Producer {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1. 建立连接
  4. ConnectionFactory connectionFactory = new ConnectionFactory();
  5. connectionFactory.setHost(Constants.HOST);
  6. connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
  7. connectionFactory.setUsername(Constants.USER_NAME);//账号
  8. connectionFactory.setPassword(Constants.PASSWORD); //密码
  9. connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
  10. Connection connection = connectionFactory.newConnection();
  11. //2. 开启信道
  12. Channel channel = connection.createChannel();
  13. //3. 声明交换机
  14. channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);
  15. //4. 声明队列
  16. channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
  17. channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);
  18. //5. 绑定交换机和队列
  19. channel.queueBind(Constants.DIRECT_QUEUE1,Constants.DIRECT_EXCHANGE,"a");
  20. channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");
  21. channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");
  22. channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");
  23. //6. 发送消息
  24. String msg = "hello direct, my routingkey is a....";
  25. channel.basicPublish(Constants.DIRECT_EXCHANGE,"a", null, msg.getBytes());
  26. String msg_b = "hello direct, my routingkey is b....";
  27. channel.basicPublish(Constants.DIRECT_EXCHANGE,"b", null, msg_b.getBytes());
  28. String msg_c = "hello direct, my routingkey is c....";
  29. channel.basicPublish(Constants.DIRECT_EXCHANGE,"c", null, msg_c.getBytes());
  30. System.out.println("消息发送成功");
  31. //7. 释放资源
  32. channel.close();
  33. connection.close();
  34. }
  35. }

消费者:

  1. public class Consumer1 {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1. 建立连接
  4. ConnectionFactory connectionFactory = new ConnectionFactory();
  5. connectionFactory.setHost(Constants.HOST);
  6. connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
  7. connectionFactory.setUsername(Constants.USER_NAME);//账号
  8. connectionFactory.setPassword(Constants.PASSWORD); //密码
  9. connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
  10. Connection connection = connectionFactory.newConnection();
  11. //2. 开启信道
  12. Channel channel = ((Connection) connection).createChannel();
  13. //3. 声明队列
  14. channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);
  15. //4. 消费消息
  16. DefaultConsumer consumer = new DefaultConsumer(channel){
  17. //从队列中收到消息, 就会执行的方法
  18. @Override
  19. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  20. System.out.println("接收到消息:"+ new String(body));
  21. }
  22. };
  23. channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);
  24. }
  25. }
  1. public class Consumer2 {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1. 建立连接
  4. ConnectionFactory connectionFactory = new ConnectionFactory();
  5. connectionFactory.setHost(Constants.HOST);
  6. connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
  7. connectionFactory.setUsername(Constants.USER_NAME);//账号
  8. connectionFactory.setPassword(Constants.PASSWORD); //密码
  9. connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
  10. Connection connection = connectionFactory.newConnection();
  11. //2. 开启信道
  12. Channel channel = ((Connection) connection).createChannel();
  13. //3. 声明队列
  14. channel.queueDeclare(Constants.DIRECT_QUEUE2,true,false,false,null);
  15. //4. 消费消息
  16. DefaultConsumer consumer = new DefaultConsumer(channel){
  17. //从队列中收到消息, 就会执行的方法
  18. @Override
  19. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  20. System.out.println("接收到消息:"+ new String(body));
  21. }
  22. };
  23. channel.basicConsume(Constants.DIRECT_QUEUE2, true, consumer);
  24. }
  25. }

2.5 Topics(通配符模式)

生产者:

  1. public class Producer {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1. 建立连接
  4. ConnectionFactory connectionFactory = new ConnectionFactory();
  5. connectionFactory.setHost(Constants.HOST);
  6. connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
  7. connectionFactory.setUsername(Constants.USER_NAME);//账号
  8. connectionFactory.setPassword(Constants.PASSWORD); //密码
  9. connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
  10. Connection connection = connectionFactory.newConnection();
  11. //2. 开启信道
  12. Channel channel = ((Connection) connection).createChannel();
  13. //3. 声明交换机
  14. channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);
  15. //4. 声明队列
  16. channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
  17. channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);
  18. //5. 绑定交换机和队列
  19. channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*");
  20. channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b");
  21. channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");
  22. //6. 发送消息
  23. String msg = "hello topic, my routingkey is ae.a.f....";
  24. channel.basicPublish(Constants.TOPIC_EXCHANGE,"ae.a.f", null, msg.getBytes()); //转发到Q1
  25. String msg_b = "hello topic, my routingkey is ef.a.b....";
  26. channel.basicPublish(Constants.TOPIC_EXCHANGE,"ef.a.b", null, msg_b.getBytes()); //转发到Q1和Q2
  27. String msg_c = "hello topic, my routingkey is c.ef.d....";
  28. channel.basicPublish(Constants.TOPIC_EXCHANGE,"c.ef.d", null, msg_c.getBytes());//转发Q2
  29. System.out.println("消息发送成功");
  30. //7. 释放资源
  31. channel.close();
  32. connection.close();
  33. }
  34. }

消费者:

  1. public class Consumer1 {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1. 建立连接
  4. ConnectionFactory connectionFactory = new ConnectionFactory();
  5. connectionFactory.setHost(Constants.HOST);
  6. connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
  7. connectionFactory.setUsername(Constants.USER_NAME);//账号
  8. connectionFactory.setPassword(Constants.PASSWORD); //密码
  9. connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
  10. Connection connection = connectionFactory.newConnection();
  11. //2. 开启信道
  12. Channel channel = connection.createChannel();
  13. //3. 声明队列
  14. channel.queueDeclare(Constants.TOPIC_QUEUE1,true,false,false,null);
  15. //4. 消费消息
  16. DefaultConsumer consumer = new DefaultConsumer(channel){
  17. //从队列中收到消息, 就会执行的方法
  18. @Override
  19. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  20. System.out.println("接收到消息:"+ new String(body));
  21. }
  22. };
  23. channel.basicConsume(Constants.TOPIC_QUEUE1, true, consumer);
  24. }
  25. }
  1. public class Consumer2 {
  2. public static void main(String[] args) throws IOException, TimeoutException {
  3. //1. 建立连接
  4. ConnectionFactory connectionFactory = new ConnectionFactory();
  5. connectionFactory.setHost(Constants.HOST);
  6. connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
  7. connectionFactory.setUsername(Constants.USER_NAME);//账号
  8. connectionFactory.setPassword(Constants.PASSWORD); //密码
  9. connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
  10. Connection connection = connectionFactory.newConnection();
  11. //2. 开启信道
  12. Channel channel = connection.createChannel();
  13. //3. 声明队列
  14. channel.queueDeclare(Constants.TOPIC_QUEUE2,true,false,false,null);
  15. //4. 消费消息
  16. DefaultConsumer consumer = new DefaultConsumer(channel){
  17. //从队列中收到消息, 就会执行的方法
  18. @Override
  19. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  20. System.out.println("接收到消息:"+ new String(body));
  21. }
  22. };
  23. channel.basicConsume(Constants.TOPIC_QUEUE2, true, consumer);
  24. }
  25. }

2.6 RPC(RPC通信)

客户端:

  1. /**
  2. * 1.发送请求
  3. * 2.接收响应
  4. */
  5. public class Client {
  6. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException, IOException, TimeoutException {
  7. //1. 建立连接
  8. ConnectionFactory connectionFactory = new ConnectionFactory();
  9. connectionFactory.setHost(Constants.HOST);
  10. connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
  11. connectionFactory.setUsername(Constants.USER_NAME);//账号
  12. connectionFactory.setPassword(Constants.PASSWORD); //密码
  13. connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
  14. Connection connection = connectionFactory.newConnection();
  15. //2. 开启信道
  16. Channel channel = connection.createChannel();
  17. //3.声明队列
  18. channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
  19. channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
  20. //4. 发送请求
  21. String msg = "hello rpc...";
  22. //设置请求的唯一标识
  23. String correlationID = UUID.randomUUID().toString();
  24. //设置请求的相关属性
  25. AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
  26. .correlationId(correlationID)
  27. .replyTo(Constants.RPC_RESPONSE_QUEUE)
  28. //指定了⼀个回调队列,服务端处理后,会把响应结果发送到这个队列
  29. .build();
  30. //默认交换机,RoutingKey=队列名称
  31. //props属性
  32. channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());
  33. //4. 接收响应(校验ID)
  34. //使用阻塞队列, 来存储响应信息
  35. //不适用阻塞队列,很快就接收响应,但是响应还没有传过来
  36. final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
  37. DefaultConsumer consumer = new DefaultConsumer(channel){
  38. @Override
  39. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  40. String respMsg = new String(body);
  41. System.out.println("接收到回调消息:"+respMsg);
  42. if (correlationID.equals(properties.getCorrelationId())){
  43. //如果唯⼀标识正确, 放到阻塞队列中
  44. response.offer(respMsg);
  45. }
  46. }
  47. };
  48. channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);
  49. //获取回调的结果
  50. String result = response.take();
  51. System.out.println("[RPC Client 响应结果]:"+ result);
  52. }
  53. }

服务端:

  1. /**
  2. * 1.接收请求
  3. * 2.发送响应
  4. */
  5. public class Server {
  6. public static void main(String[] args) throws IOException, TimeoutException {
  7. //1. 建立连接
  8. ConnectionFactory connectionFactory = new ConnectionFactory();
  9. connectionFactory.setHost(Constants.HOST);
  10. connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
  11. connectionFactory.setUsername(Constants.USER_NAME);//账号
  12. connectionFactory.setPassword(Constants.PASSWORD); //密码
  13. connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
  14. Connection connection = connectionFactory.newConnection();
  15. //2. 开启信道
  16. Channel channel = connection.createChannel();
  17. //3. 接收请求
  18. //设置服务端同时最多只能获取⼀个消息
  19. channel.basicQos(1);
  20. DefaultConsumer consumer = new DefaultConsumer(channel) {
  21. @Override
  22. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  23. //将请求转化为string
  24. String request = new String(body, "UTF-8");
  25. System.out.println("接收到请求:" + request);
  26. //响应
  27. String response = "针对request:" + request + ", 响应成功";
  28. AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
  29. .correlationId(properties.getCorrelationId())
  30. .build();
  31. channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());
  32. //消息应答
  33. channel.basicAck(envelope.getDeliveryTag(), false);
  34. }
  35. };
  36. //false:手动确认
  37. channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);
  38. }
  39. }

RabbitMQ消息确定机制

在RabbitMQ中,basicConsume⽅法的autoAck参数⽤于指定消费者是否应该⾃动向消息队列确认 消息

  • ⾃动确认(autoAck=true):消息队列在将消息发送给消费者后,会⽴即从内存中删除该消息.这意味着,如果消费者处理消息失败,消息将丢失,因为消息队列认为消息已经被成功消费
  • ⼿动确认(autoAck=false):消息队列在将消息发送给消费者后,需要消费者显式地调⽤basicAck ⽅法来确认消息.⼿动确认提供了更⾼的可靠性,确保消息不会被意外丢失,适⽤于消息处理重要且需 要确保每个消息都被正确处理的场景

2.7 Publisher Confirms(发布确认)

消息丢失⼤概分为三种情况:

  1. ⽣产者问题.因为应⽤程序故障,⽹络抖动等各种原因,⽣产者没有成功向broker发送消息.

  2. 消息中间件⾃⾝问题.⽣产者成功发送给了Broker,但是Broker没有把消息保存好,导致消息丢失.

  3. 消费者问题.Broker发送消息到消费者,消费者在消费消息时,因为没有处理好,导致broker将消费 失败的消息从队列中删除了.


针对问题1,可以采⽤发布确认(PublisherConfirms)机制实现:

⽣产者将信道设置成confirm(确认)模式,⼀旦信道进⼊confirm模式,所有在该信道上⾯发布的消息都 会被指派⼀个唯⼀的ID(从1开始),⼀旦消息被投递到所有匹配的队列之后,RabbitMQ就会发送⼀个确认给⽣产者(包含消息的唯⼀ID)

这就使得⽣产者知道消息已经正确到达⽬的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写⼊磁盘之后发出

broker回传给⽣产者的确认消息中deliveryTag 包含了确认消息的序号,此外broker也可以设置channel.basicAck⽅法中的multiple参数,表⽰到这个序号之前的所有消息都已经得到了处理

发送⽅确认机制最⼤的好处在于它是异步的,⽣产者可以同时发布消息和等待信道返回确认消息.

当消息最终得到确认之后,⽣产者可以通过回调⽅法来处理该确认消息.

如果RabbitMQ因为⾃⾝内部错误导致消息丢失,就会发送⼀条nack(Basic.Nack)命令,⽣产者同样可以在回调⽅法中处理该nack命令.

  1. public class PublisherConfirms {
  2. private static final Integer MESSAGE_COUNT = 100;
  3. static Connection createConnection() throws Exception {
  4. ConnectionFactory connectionFactory = new ConnectionFactory();
  5. connectionFactory.setHost(Constants.HOST);
  6. connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
  7. connectionFactory.setUsername(Constants.USER_NAME);//账号
  8. connectionFactory.setPassword(Constants.PASSWORD); //密码
  9. connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
  10. return connectionFactory.newConnection();
  11. }
  12. public static void main(String[] args) throws Exception {
  13. //Strategy #1: Publishing Messages Individually
  14. //单独确认
  15. //publishingMessagesIndividually();
  16. //Strategy #2: Publishing Messages in Batches
  17. //批量确认
  18. //publishingMessagesInBatches();
  19. //Strategy #3: Handling Publisher Confirms Asynchronously
  20. //异步确认
  21. handlingPublisherConfirmsAsynchronously();
  22. }
  23. /**
  24. * 异步确认
  25. */
  26. private static void handlingPublisherConfirmsAsynchronously() throws Exception {
  27. try (Connection connection = createConnection()){
  28. //1. 开启信道
  29. Channel channel = connection.createChannel();
  30. //2. 设置信道为confirm模式
  31. channel.confirmSelect();
  32. //3. 声明队列
  33. channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);
  34. //4. 监听confirm
  35. //创建有序集合.中存储的是未确认的消息ID
  36. long start = System.currentTimeMillis();
  37. SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());
  38. channel.addConfirmListener(new ConfirmListener() {
  39. @Override
  40. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
  41. if (multiple){
  42. //<=都清除
  43. //headSet返回<n的集合,但是这条也要被删除
  44. confirmSeqNo.headSet(deliveryTag+1).clear();
  45. }else {
  46. confirmSeqNo.remove(deliveryTag);
  47. }
  48. }
  49. @Override
  50. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
  51. if (multiple){
  52. confirmSeqNo.headSet(deliveryTag+1).clear();
  53. }else {
  54. confirmSeqNo.remove(deliveryTag);
  55. }
  56. //业务需要根据实际场景进行处理, 比如重发, 此处代码省略
  57. }
  58. });
  59. //5.发送消息
  60. for (int i = 0; i < MESSAGE_COUNT; i++) {
  61. String msg = "hello publisher confirms"+i;
  62. //获取要发送消息的序号
  63. long seqNo = channel.getNextPublishSeqNo();
  64. channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes());
  65. //放
  66. confirmSeqNo.add(seqNo);
  67. }
  68. while (!confirmSeqNo.isEmpty()){
  69. Thread.sleep(10);
  70. }
  71. long end = System.currentTimeMillis();
  72. System.out.printf("异步确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);
  73. }
  74. }
  75. /**
  76. * 批量确认
  77. */
  78. private static void publishingMessagesInBatches() throws Exception{
  79. try(Connection connection = createConnection()) {
  80. //1. 开启信道
  81. Channel channel = connection.createChannel();
  82. //2. 设置信道为confirm模式
  83. channel.confirmSelect();
  84. //3. 声明队列
  85. channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);
  86. //4. 发送消息, 并进行确认
  87. long start = System.currentTimeMillis();
  88. int batchSize = 100;
  89. int outstandingMessageCount = 0;
  90. for (int i = 0; i < MESSAGE_COUNT; i++) {
  91. String msg = "hello publisher confirms"+i;
  92. channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());
  93. outstandingMessageCount++;
  94. if (outstandingMessageCount==batchSize){
  95. channel.waitForConfirmsOrDie(5000);
  96. outstandingMessageCount = 0;
  97. }
  98. }
  99. if (outstandingMessageCount>0){
  100. channel.waitForConfirmsOrDie(5000);
  101. }
  102. long end = System.currentTimeMillis();
  103. System.out.printf("批量确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);
  104. }
  105. }
  106. /**
  107. * 单独确认
  108. */
  109. private static void publishingMessagesIndividually() throws Exception {
  110. try (Connection connection = createConnection()){
  111. //1.开启新道
  112. Channel channel = connection.createChannel();
  113. //2.设置信道为confirm模式
  114. channel.confirmSelect();
  115. //3. 声明队列
  116. channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);
  117. //4.发送信息,并等待确认
  118. long start = System.currentTimeMillis();
  119. for(int i =0; i< MESSAGE_COUNT;i++){
  120. String msg = "hello publisher confirms"+i;
  121. channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());
  122. //等待确认
  123. channel.waitForConfirmsOrDie(5000);
  124. }
  125. long end = System.currentTimeMillis();
  126. System.out.printf("单独确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);
  127. }
  128. }
  129. }

异步确认:

Channel接⼝提供了⼀个⽅法addConfirmListener.这个⽅法可以添加ConfirmListener回调接⼝

deliveryTag 表⽰发送消息的序号

multiple 表⽰是否批量确认

我们需要为每⼀个Channel维护⼀个已发送消息的序号集合.当收到RabbitMQ的confirm回调时,从集 合中删除对应的消息.当Channel开启confirm模式后,channel上发送消息都会附带⼀个从1开始递增的 deliveryTag序号. 可以使⽤SortedSet的有序性来维护这个已发消息的集合.

  • 当收到ack时,从序列中删除该消息的序号.如果为批量确认消息,表⽰⼩于等于当前序号 deliveryTag的消息都收到了,则清除对应集合
  • 当收到nack时,处理逻辑类似,不过需要结合具体的业务情况,进⾏消息重发等操作.

对比:

消息数越多,异步确认的优势越明显

3. Spring Boot整合RabbitMQ

创建项⽬时, 加⼊依赖:

添加配置:

详情代码看idea

3.1 工作队列模式

声明队列(@Bean交给spring进行管理):

生产者:

消费者:

① Message message

②String message

此处返回的是message的具体内容

channel:

3.2 Publish/Subscribe(发布订阅模式)

声明队列,交换机:

绑定:

这里@Qualifier指定绑定的队列和交换机

发送消息:

3.3 Routing(路由模式)

此时绑定需要指定routingkey

注意:放到路径里,需要使用注解@PathVariable

3.4 Topics(通配符模式)

4. 基于SpringBoot+RabbitMQ完成应⽤通信

需求:

⽤⼾下单成功之后, 通知物流系统, 进⾏发货

订单系统:生产者

物流系统:消费者

创建项目

创建⼀个空的项⽬ rabbitmq-communication(其实就是⼀个空的⽂件夹),将两个项⽬放在⼀个项⽬中

生产者

配置

  1. spring:
  2. rabbitmq:
  3. addresses: amqp://xuexue:xuexue@119.91.154.99:5672/order
  4. server:
  5. port: 9090

声明队列

  1. @Configuration
  2. public class RabbitMQConfig {
  3. @Bean("orderQueue")
  4. public Queue orderQueue(){
  5. return QueueBuilder.durable("order.create").build();
  6. }
  7. }

发送订单消息

  1. @RequestMapping("/order")
  2. @RestController
  3. public class OrderController {
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. @RequestMapping("/create")
  7. public String create(){
  8. //发送消息
  9. String orderId = UUID.randomUUID().toString();
  10. rabbitTemplate.convertAndSend("","order.create","订单信息,订单ID:"+orderId);
  11. return "下单成功";
  12. }
  13. }

启动服务,观察结果

119.91.154.99:15672

消费者

配置

  1. spring:
  2. rabbitmq:
  3. addresses: amqp://xuexue:xuexue@119.91.154.99:5672/order
  4. server:
  5. port: 8080

监听队列

  1. @Component
  2. public class OrderListener {
  3. @RabbitListener(queues = "order.create")
  4. public void handMessage(String orderInfo){
  5. System.out.println("接收到订单消息:"+orderInfo);
  6. }
  7. }

结果

发送消息格式为对象

如果通过 RabbitTemplate 发送⼀个对象作为消息, 我们需要对该对象进⾏序列化.

Spring AMQP推荐使⽤JSON序列化,Spring AMQP提供了 Jackson2JsonMessageConverter,我们需要把⼀个 MessageConverter 设置 到 RabbitTemplate 中

JSON序列化(生产者和消费者都要添加)

  1. @Configuration
  2. public class RabbitMQConfig {
  3. @Bean("orderQueue")
  4. public Queue orderQueue(){
  5. return QueueBuilder.durable("order.create").build();
  6. }
  7. @Bean
  8. public Jackson2JsonMessageConverter jackson2JsonMessageConverter(){
  9. return new Jackson2JsonMessageConverter();
  10. }
  11. @Bean
  12. public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,Jackson2JsonMessageConverter jackson2JsonMessageConverter){
  13. RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
  14. rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
  15. return rabbitTemplate;
  16. }
  17. }

定义对象

  1. @Data
  2. public class OrderInfo {
  3. private String orderId;
  4. private String name;
  5. }

生产者

  1. @RequestMapping("/order")
  2. @RestController
  3. public class OrderController {
  4. @Autowired
  5. private RabbitTemplate rabbitTemplate;
  6. @RequestMapping("/create")
  7. public String create(){
  8. //发送消息
  9. String orderId = UUID.randomUUID().toString();
  10. rabbitTemplate.convertAndSend("","order.create","订单信息,订单ID:"+orderId);
  11. return "下单成功";
  12. }
  13. @RequestMapping("/create2")
  14. public String create2(){
  15. //发送消息
  16. OrderInfo orderInfo = new OrderInfo();
  17. orderInfo.setOrderId( UUID.randomUUID().toString());
  18. orderInfo.setName("价格"+new Random().nextInt(100));
  19. rabbitTemplate.convertAndSend("","order.create","订单信息,订单ID:"+orderInfo);
  20. return "下单成功";
  21. }
  22. }

查看消息:

消费者

  1. @Component
  2. @RabbitListener(queues = "order.create")
  3. public class OrderListener {
  4. @RabbitHandler
  5. public void handMessage(String orderInfo){
  6. System.out.println("接收到订单消息String:"+orderInfo);
  7. }
  8. @RabbitHandler
  9. public void handMessage(OrderInfo orderInfo){
  10. System.out.println("接收到订单消息OrderInfo:"+orderInfo);
  11. }
  12. }

@RabbitListener(queues="order.create")可以加在类上,也可以加在方法上,⽤于定于⼀个类或者方法作为消息的监听器

@RabbitHandler是一个方法级别的注解,使用它该方法被调用处理特定的消息


本文转载自: https://blog.csdn.net/qq_74812898/article/details/143441346
版权归原作者 小笨猪- 所有, 如有侵权,请联系我们删除。

“RabbitMQ应用”的评论:

还没有评论