0


RabbitMQ - 简单案例

0.引用

  1. https://note.oddfar.com/rabbitmq/

1.Hello world

1.1 依赖引用

  1. <dependencies>
  2. <!--rabbitmq 依赖客户端-->
  3. <dependency>
  4. <groupId>com.rabbitmq</groupId>
  5. <artifactId>amqp-client</artifactId>
  6. <version>5.8.0</version>
  7. </dependency>
  8. <!--操作文件流的一个依赖-->
  9. <dependency>
  10. <groupId>commons-io</groupId>
  11. <artifactId>commons-io</artifactId>
  12. <version>2.6</version>
  13. </dependency>
  14. </dependencies>

1.2 消息生产者

  1. package com.example.one;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. public class Producer {
  6. private final static String QUEUE_NAME = "quque";
  7. public static void main(String[] args) throws Exception {
  8. //创建一个连接工厂
  9. ConnectionFactory factory = new ConnectionFactory();
  10. factory.setHost("192.168.2.17");
  11. factory.setUsername("admin");
  12. factory.setPassword("admin");
  13. //channel 实现了自动 close 接口 自动关闭 不需要显示关闭
  14. //创建连接
  15. Connection connection = factory.newConnection();
  16. //获取信道
  17. Channel channel = connection.createChannel();
  18. /**
  19. * 生成一个队列
  20. * 1.QUEUE_NAME 队列名称
  21. * 2.durable 队列里面的消息是否持久化 也就是是否用完就删除
  22. * 3.exclusive 该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
  23. * 4.autoDelete是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
  24. * 5.其他参数
  25. */
  26. Boolean durable = true;
  27. Boolean exclusive = false;
  28. Boolean autoDelete = false;
  29. Map<String, Object> arguments = null;
  30. channel.queueDeclare(QUEUE_NAME,durable,exclusive,autoDelete, null);
  31. String message = "hello world";
  32. /**
  33. * 发送一个消息
  34. * 1.发送到那个交换机
  35. * 2.路由的 key 是哪个
  36. * 3.其他的参数信息
  37. * 4.发送消息的消息体
  38. */
  39. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  40. System.out.println("消息发送完毕");
  41. }
  42. }

1.3 消息消费者

  1. package com.example.one;
  2. import com.rabbitmq.client.*;
  3. public class Consumer {
  4. private final static String QUEUE_NAME = "quque";
  5. public static void main(String[] args) throws Exception {
  6. ConnectionFactory factory = new ConnectionFactory();
  7. factory.setHost("192.168.2.17");
  8. factory.setUsername("admin");
  9. factory.setPassword("admin");
  10. Connection connection = factory.newConnection();
  11. Channel channel = connection.createChannel();
  12. System.out.println("等待接收消息.........");
  13. //推送的消息如何进行消费的接口回调
  14. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  15. String message = new String(delivery.getBody());
  16. System.out.println(message);
  17. };
  18. //取消消费的一个回调接口 如在消费的时候队列被删除掉了
  19. CancelCallback cancelCallback = (consumerTag) -> {
  20. System.out.println("消息消费被中断");
  21. };
  22. /**
  23. * 消费者消费消息 - 接受消息
  24. * 1.消费哪个队列
  25. * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
  26. * 3.消费者未成功消费的回调
  27. * 4.消息被取消时的回调
  28. */
  29. channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
  30. }
  31. }

2.轮训分发消息

2.1 抽取工具类

  1. package com.example.utils;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. public class RabbitMqUtils {
  6. //得到一个连接的 channel
  7. public static Channel getChannel() throws Exception {
  8. //创建一个连接工厂
  9. ConnectionFactory factory = new ConnectionFactory();
  10. factory.setHost("192.168.2.17");
  11. factory.setUsername("admin");
  12. factory.setPassword("admin");
  13. Connection connection = factory.newConnection();
  14. Channel channel = connection.createChannel();
  15. return channel;
  16. }
  17. }

** 2.2 启动两个工作线程接受消息**

  1. package com.example.two;
  2. import com.oddfar.utils.RabbitMqUtils;
  3. import com.rabbitmq.client.CancelCallback;
  4. import com.rabbitmq.client.Channel;
  5. import com.rabbitmq.client.DeliverCallback;
  6. public class Worker01 {
  7. private static final String QUEUE_NAME = "quque";
  8. public static void main(String[] args) throws Exception {
  9. Channel channel = RabbitMqUtils.getChannel();
  10. //消息接受
  11. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  12. String receivedMessage = new String(delivery.getBody());
  13. System.out.println("接收到消息:" + receivedMessage);
  14. };
  15. //消息被取消
  16. CancelCallback cancelCallback = (consumerTag) -> {
  17. System.out.println(consumerTag + "消费者取消消费接口回调逻辑");
  18. };
  19. System.out.println("C1 消费者启动等待消费.................. ");
  20. channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
  21. }
  22. }

选中

  1. Allow multiple instances

image-20210627125840217

启动后

image-20210627130146584

** 2.3 启动一个发送消息线程**

  1. public class Task01 {
  2. public static final String QUEUE_NAME = "quque";
  3. public static void main(String[] args) throws Exception {
  4. Channel channel = RabbitMqUtils.getChannel();
  5. Scanner scanner = new Scanner(System.in);
  6. while (scanner.hasNext()) {
  7. String message = scanner.next();
  8. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
  9. System.out.println("消息发送完成:" + message);
  10. }
  11. }
  12. }

** 2.4 结果展示**

  1. 通过程序执行发现生产者总共发送 4 个消息,消费者 1 和消费者 2 分别分得两个消息,并且是按照有序的一个接收一次消息

3.消息应答

3.1 自动应答

  1. 消息发送后立即被认为已经传送成功

3.2 手动消息应答的方法

  • Channel.basicAck(用于肯定确认)
  • Channel.basicNack(用于否定确认)
  • Channel.basicReject(用于否定确认)

Multiple 的解释:

  1. 手动应答的好处是可以批量应答并且减少网络拥堵

  • true 代表批量应答 channel 上未应答的消息
  • false 同上面相比只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

RabbitMQ-00000018

3.3 消息自动重新入队

3.4 消息手动应答代码

  1. 消费者在上面代码的基础上增加了以下内容
  1. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

4.RabbitMQ 持久化

4.1 队列如何实现持久化

  1. //让队列持久化
  2. boolean durable = true;
  3. //声明队列
  4. channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);

4.2 消息实现持久化

  1. 需要在消息**生产者**修改代码,
  1. MessageProperties.PERSISTENT_TEXT_PLAIN

添加这个属性

RabbitMQ-00000028

5.不公平分发

为了避免这种情况,在消费者中消费之前,我们可以设置参数

  1. channel.basicQos(1);
  1. //不公平分发
  2. int prefetchCount = 1;
  3. channel.basicQos(prefetchCount);
  4. //采用手动应答
  5. boolean autoAck = false;
  6. channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, cancelCallback);

6.预取值分发

  1. 本身消息的发送就是异步发送的,所以在任何时候,channel 上肯定不止只有一个消息另外来自消费 者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能**限制此缓冲区的大小**,**以避免缓冲区里面无限制的未确认消息问题**。这个时候就可以通过使用 basic.qos 方法设 置“预取计数”值来完成的
标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/wang_peng/article/details/132115303
版权归原作者 wang_peng 所有, 如有侵权,请联系我们删除。

“RabbitMQ - 简单案例”的评论:

还没有评论