0


RabbitMQ(三)Java客户端

1.快速入门


在idea里面创建两个springboot项目,一个模块是consumer,一个是publisher

两者有自己的启动类,继承同一父工程的pom。

父工程的pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>cn.itcast.demo</groupId>
  7. <artifactId>mq-demo</artifactId>
  8. <version>1.0-SNAPSHOT</version>
  9. <modules>
  10. <module>publisher</module>
  11. <module>consumer</module>
  12. </modules>
  13. <packaging>pom</packaging>
  14. <parent>
  15. <groupId>org.springframework.boot</groupId>
  16. <artifactId>spring-boot-starter-parent</artifactId>
  17. <version>2.3.9.RELEASE</version>
  18. <relativePath/>
  19. </parent>
  20. <properties>
  21. <maven.compiler.source>8</maven.compiler.source>
  22. <maven.compiler.target>8</maven.compiler.target>
  23. </properties>
  24. <dependencies>
  25. <dependency>
  26. <groupId>org.projectlombok</groupId>
  27. <artifactId>lombok</artifactId>
  28. </dependency>
  29. <!--AMQP依赖,包含RabbitMQ-->
  30. <dependency>
  31. <groupId>org.springframework.boot</groupId>
  32. <artifactId>spring-boot-starter-amqp</artifactId>
  33. </dependency>
  34. <!--单元测试-->
  35. <dependency>
  36. <groupId>org.springframework.boot</groupId>
  37. <artifactId>spring-boot-starter-test</artifactId>
  38. </dependency>
  39. </dependencies>
  40. </project>

publisher的pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <parent>
  6. <artifactId>mq-demo</artifactId>
  7. <groupId>cn.itcast.demo</groupId>
  8. <version>1.0-SNAPSHOT</version>
  9. </parent>
  10. <modelVersion>4.0.0</modelVersion>
  11. <artifactId>publisher</artifactId>
  12. <properties>
  13. <maven.compiler.source>8</maven.compiler.source>
  14. <maven.compiler.target>8</maven.compiler.target>
  15. </properties>
  16. </project>

consumer的pom.xml

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <parent>
  6. <artifactId>mq-demo</artifactId>
  7. <groupId>cn.itcast.demo</groupId>
  8. <version>1.0-SNAPSHOT</version>
  9. </parent>
  10. <modelVersion>4.0.0</modelVersion>
  11. <artifactId>consumer</artifactId>
  12. <properties>
  13. <maven.compiler.source>8</maven.compiler.source>
  14. <maven.compiler.target>8</maven.compiler.target>
  15. </properties>
  16. </project>

入门案例

直接省略交换机,生产者->队列->消费者(虚拟主机不能省略)

  • 利用控制台创建队列simple.queue
  • 在publisher服务中,利用SpringAMQP直接向simple.queue发送消息
  • 在consumer服务中,利用SpringAMQP编写消费者,监听simple.queue队列

那么怎么发送消息呢?

SpringAMQP提供了RabbitTemplate工具类,方便我们发送信息。

  1. @SpringBootTest
  2. public class SpringAmqpTest {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. @Test
  6. void testSendMessage2Queue(){
  7. String queueName="simple.queue";
  8. String msg="hello,amqp!";
  9. rabbitTemplate.convertAndSend(queueName,msg);
  10. }
  11. }


那么如何接收信息呢?

  1. @Component
  2. @Slf4j
  3. public class MqListener {
  4. @RabbitListener(queues ="simple.queue" )
  5. public void listenSimpleQueue(String msg){//发送来的是string类型,所以我们也用string接收
  6. System.out.println("消费者收到了simple.queue的消息【"+msg+"】");
  7. }
  8. }

然后用启动类启动(这个不是测试类)

控制台输出:消费者收到了simple.queue的消息【hello,amqp!】

此时,观察控制台发现,程序并没有停止,消费者始终处于监听的状态

然后publisher再次发送一个消息,consumer依然能够监听到

消费者收到了simple.queue的消息【hello,amqp!】

消费者收到了simple.queue的消息【hello,我是第二个消息】

最后,消息不存在队列当中。。。。。。。。。。。。。


2.WorkQueue

WorkQueue,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息.

案例

模拟workqueue,实现一个队列绑定多个消费者


  • 在RabbitMQ的控制台创建一个队列,名为work.queue
  • 在publisher服务中定义测试方法,在1秒内产生50条消息,发送到work.queue
  • 在consumer服务中定义两个消息监听者,都监听work.queue队列
  • 消费者1每秒处理50条消息,消费者2每秒处理5条消息

1.创建队列work.queue

2.publisher发送消息

  1. @Test
  2. void testWorkQueue() throws InterruptedException {
  3. for (int i = 0; i < 50; i++) {
  4. String queueName="work.queue";
  5. String msg="hello,我是第【"+i+"】条消息呀";
  6. rabbitTemplate.convertAndSend(queueName,msg);
  7. Thread.sleep(20);
  8. }
  9. }

线程休眠20ms发送一个消息,这样能看到循序渐进的效果。

3.consumer两个消息监听者,共同监听队列

  1. @RabbitListener(queues ="work.queue" )
  2. public void listenSimpleQueue1(String msg) throws InterruptedException {
  3. //发送来的是string类型,所以我们也用string接收
  4. System.out.println("消费者1 收到了work.queue的消息【"+msg+"】");
  5. Thread.sleep(20);
  6. }
  7. @RabbitListener(queues ="work.queue" )
  8. public void listenSimpleQueue2(String msg) throws InterruptedException {
  9. //发送来的是string类型,所以我们也用string接收 err.print是打印红色的
  10. System.err.println("消费者2 收到了work.queue的消息【"+msg+"】");
  11. Thread.sleep(200);
  12. }

如果两个监听器不加线程休眠,那么性能是一样的:

消费者1 收到了work.queue的消息【hello,我是第【0】条消息呀】
消费者2 收到了work.queue的消息【hello,我是第【1】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【2】条消息呀】

消费者2 收到了work.queue的消息【hello,我是第【3】条消息呀】


如果两个监听器加上不同的线程休眠,导致性能不一样:

消费者1 收到了work.queue的消息【hello,我是第【0】条消息呀】
消费者2 收到了work.queue的消息【hello,我是第【1】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【2】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【4】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【6】条消息呀】
消费者2 收到了work.queue的消息【hello,我是第【3】条消息呀】

但是你会发现,依旧是02468--13579????

消费者消息推送限制

默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但是并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。

因此我们需要修改application.yml,设置preFetch的值为1,确保同一时刻最多投递给消费者1条消息。

消费者2 收到了work.queue的消息【hello,我是第【0】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【1】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【2】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【3】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【4】条消息呀】
消费者2 收到了work.queue的消息【hello,我是第【5】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【6】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【7】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【8】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【9】条消息呀】
消费者2 收到了work.queue的消息【hello,我是第【10】条消息呀】

总结

Work模型的使用:
  • 多个消费者绑定到一个队列中,可以加快消息处理速度
  • 同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者领取的消息数量,处理完一条再处理下一条,实现能者多劳

3.交换机

真正的生产环境都会经过交换机来发送消息,交换机有路由功能,而不是直接发送到队列,交换机的类型有三种:

  • Fanout广播
  • Direct定向
  • Topic话题
1.Fanout交换机

Fanout Exchange会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式

案例---利用SpringAMQP演示FanoutExchange的使用

实现思路如下:

1.在控制台中,声明队列fanout.queue1和fanout.queue2

2.在控制台中,声明交换机hmall.fanout,将两个队列与其绑定

3.在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

4.在publisher中编写测试方法,向hmall.fanout发送消息




现在Fanout广播交换机和两个队列已经就绪,

在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

  1. @RabbitListener(queues ="fanout.queue1" )
  2. public void listenFanoutQueue1(String msg) throws InterruptedException {//发送来的是string类型,所以我们也用string接收
  3. System.out.println("消费者1 收到了fanout.queue1的消息【"+msg+"】");
  4. }
  5. @RabbitListener(queues ="fanout.queue2" )
  6. public void listenFanoutQueue2(String msg) throws InterruptedException {//发送来的是string类型,所以我们也用string接收
  7. System.out.println("消费者2 收到了fanout.queue2的消息【"+msg+"】");
  8. }

在publisher中编写测试方法,向hmall.fanout发送消息

  1. @Test
  2. void testSendFanout(){
  3. String exchangeName="hmall.fanout";
  4. String msg="hello,everyone";
  5. rabbitTemplate.convertAndSend(exchangeName,null,msg);
  6. }

消费者2 收到了fanout.queue2的消息【hello,everyone】
消费者1 收到了fanout.queue1的消息【hello,everyone】

2.Direct交换机

Direct Exchange会将接收到的消息根据规则路由到指定的queue,因此叫定向路由。

  • 每一个Queue都与Exchage设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

案例

1.在RabbitMQ控制台中,声明队列direct.queue1和direct.queue2

2.在RabbitMQ控制台中,声明交换机hmall.direct,将两个队列与其绑定

3.在consumer服务里面,编写两个消费者方法,分别监听direct.queue1和direct.queue2

4.在publisher中编写测试方法,利用不同的RoutingKey向hmall.direct发送消息


消费者监听:

  1. @RabbitListener(queues ="direct.queue1" )
  2. public void listenDirectQueue1(String msg) throws InterruptedException {//发送来的是string类型,所以我们也用string接收
  3. System.out.println("消费者1 收到了direct.queue1的消息【"+msg+"】");
  4. }
  5. @RabbitListener(queues ="direct.queue2" )
  6. public void listenDirectQueue2(String msg) throws InterruptedException {//发送来的是string类型,所以我们也用string接收
  7. System.out.println("消费者2 收到了direct.queue2的消息【"+msg+"】");
  8. }

publisher中编写测试方法,利用不同的RoutingKey向hmall.direct发送消息:

  1. @Test
  2. void testSendDirect(){
  3. String exchangeName="hmall.direct";
  4. String msg="红色红色";
  5. rabbitTemplate.convertAndSend(exchangeName,"red",msg);
  6. }

结果:

消费者1 收到了direct.queue1的消息【红色红色】
消费者2 收到了direct.queue2的消息【红色红色】

3.Topic交换机

案例:


  1. **消费者接收:**
  2. @RabbitListener(queues ="topic.queue1" )
  3. public void listenTopicQueue1(String msg) throws InterruptedException {//发送来的是string类型,所以我们也用string接收
  4. System.out.println("消费者1 收到了topic.queue1的消息【"+msg+"】");
  5. }
  6. @RabbitListener(queues ="topic.queue2" )
  7. public void listenTopicQueue2(String msg) throws InterruptedException {//发送来的是string类型,所以我们也用string接收
  8. System.out.println("消费者2 收到了topic.queue2的消息【"+msg+"】");
  9. }

publisher发送:

  1. @Test
  2. void testSendTopic(){
  3. String exchangeName="hmall.topic";
  4. String msg="小日本的新闻";
  5. rabbitTemplate.convertAndSend(exchangeName,"japan.news",msg);
  6. }

控制台:

消费者2 收到了topic.queue2的消息【小日本的新闻】

4.声明队列和交换机的方式(新建队列&交换机)

就比如说刚才我们声明一个新的队列和交换机,都是在RabbitMQ的控制台里面生成的,并不是在JAVA代码里面生成的。


方式一:

在消费者里面建立一个config包:

  1. @Configuration
  2. public class FanoutConfiguration {
  3. //交换机
  4. @Bean
  5. public FanoutExchange fanoutExchange() {
  6. // ExchangeBuilder.fanoutExchange("hmall.fanout2").build();
  7. return new FanoutExchange("hmall.fanout2");
  8. }
  9. //队列 durable持久的
  10. @Bean
  11. public Queue fanoutQueue3() {
  12. // QueueBuilder.durable("").build();
  13. return new Queue("fanout.queue3");
  14. }
  15. @Bean
  16. public Queue fanoutQueue4() {
  17. return new Queue("fanout.queue4");
  18. }
  19. //绑定
  20. @Bean
  21. public Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange) {
  22. return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
  23. }
  24. @Bean
  25. public Binding fanoutBinding4(Queue fanoutQueue4, FanoutExchange fanoutExchange) {
  26. return BindingBuilder.bind(fanoutQueue4).to(fanoutExchange);
  27. //后面加上 .with 就是routingKey
  28. }
  29. // 在configuration中的bean会被直接动态代理,而在service里
  30. // 方法在不注入自己的情况下调用自己的方法会使事物无效,原因就是没有动态代理,只有用到该类时才会代理
  31. }

方式二:基于注解声明

SpringAMQP还提供了基于@RabbitListener注解来声明队列和交换机的方式:

在消费者的监听器里面:

  1. @RabbitListener(bindings = @QueueBinding(
  2. value = @Queue(name = "direct.queue1",durable = "true"),
  3. exchange=@Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
  4. key={"red","blue"}
  5. ))
  6. public void listenDirectQueue1(String msg) throws InterruptedException {
  7. System.out.println("消费者1 收到了direct.queue1的消息【"+msg+"】");
  8. }

5.消息转换器

  1. @Test
  2. void testSendObject(){
  3. Map<String, Object>msg=new HashMap<>(2);
  4. msg.put("name","jack");
  5. msg.put("aga",21);
  6. rabbitTemplate.convertAndSend("object.queue",msg);
  7. }




  1. <dependency>
  2. <groupId>com.fasterxml.jackson.dataformat</groupId>
  3. <artifactId>jackson-dataformat-xml</artifactId>
  4. <version>2.17.1</version>
  5. </dependency>

在发布者启动类里配置:

  1. import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
  2. import org.springframework.amqp.support.converter.MessageConverter;
  3. @SpringBootApplication
  4. public class PublisherApplication {
  5. public static void main(String[] args) {
  6. SpringApplication.run(PublisherApplication.class);
  7. }
  8. @Bean
  9. public MessageConverter jacksonMessageConvertor() {
  10. return new Jackson2JsonMessageConverter();
  11. }
  12. }

成功!!!!!!!!!

接收和以前一样!!!!!!!


END-业务改造

↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓

消费者监听器里面:

  1. @Component
  2. @RequiredArgsConstructor
  3. public class PayStatusListener {
  4. private final IOrderService orderService;
  5. @RabbitListener(bindings = @QueueBinding(
  6. value = @Queue(name = "mark.order.pay.queue", durable = "true"),
  7. exchange = @Exchange(name = "pay.topic", type = ExchangeTypes.TOPIC),
  8. key = "pay.success"
  9. ))
  10. public void listenOrderPay(Long orderId) {
  11. //标记订单状态为已经支付
  12. orderService.markOrderPaySuccess(orderId);
  13. }
  14. }
  1. @RequiredArgsConstructor@Autowired

这两种方式在实现依赖注入的方式和效果上有一些区别,主要体现在以下几个方面:

  1. 生成方式:- @RequiredArgsConstructor 是 Lombok 提供的功能,它会为所有被 final 修饰的字段生成一个构造函数,并将这些字段作为参数进行注入。- @Autowired 是 Spring 提供的注解,用来标注依赖注入的位置(字段、构造函数、或者方法)。
  2. 字段可变性:- @RequiredArgsConstructor 要求依赖字段必须是 final 的,因为它生成的构造函数会使用 final 字段作为参数,并在构造函数中进行赋值。- @Autowired 不要求字段必须是 final 的,可以是普通的私有字段或者通过 setter 方法注入。
  3. 灵活性和推荐性:- @RequiredArgsConstructor 通常适用于希望保持类中依赖字段不变(immutable)的情况,例如通过构造函数注入一次性初始化这些依赖,而后不再修改。- @Autowired 则更为灵活,可以在字段或者方法中使用,依赖字段可以是 final 也可以不是,更适合在需要动态变化依赖或者在单元测试中替换依赖时使用。
  4. 使用场景:- 如果你的类设计为依赖在初始化后不再改变,并且希望利用 final 字段确保其不可变性,那么使用 @RequiredArgsConstructor 是一个很好的选择。- 如果你希望在类的生命周期中动态更改依赖,或者依赖字段不一定是 final 的,那么使用 @Autowired 更为灵活。

示例比较:

使用

  1. @RequiredArgsConstructor

的示例:

  1. import lombok.RequiredArgsConstructor;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. @RequiredArgsConstructor
  5. public class MyService {
  6. private final SomeOtherService otherService;
  7. // 其他方法
  8. }

使用

  1. @Autowired

的示例:

  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class MyService {
  5. @Autowired
  6. private SomeOtherService otherService;
  7. // 其他方法
  8. }

在这两个示例中,第一个示例利用

  1. @RequiredArgsConstructor

自动生成了构造函数,并使用

  1. final

字段

  1. otherService

进行依赖注入。第二个示例则显式地使用了

  1. @Autowired

注解在构造函数中进行依赖注入,没有要求

  1. otherService

  1. final

字段。

综上所述,主要区别在于字段的

  1. final

要求、代码生成方式以及推荐的使用场景。选择哪种方式取决于你的设计需求和项目的规范。

发消息者:



异步修改订单状态,尽量用try、catch



本文转载自: https://blog.csdn.net/weixin_73253843/article/details/140865342
版权归原作者 let_ 所有, 如有侵权,请联系我们删除。

“RabbitMQ(三)Java客户端”的评论:

还没有评论