0


RabbitMQ(三)Java客户端

1.快速入门


在idea里面创建两个springboot项目,一个模块是consumer,一个是publisher

两者有自己的启动类,继承同一父工程的pom。

父工程的pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.itcast.demo</groupId>
    <artifactId>mq-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>publisher</module>
        <module>consumer</module>
    </modules>
    <packaging>pom</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.9.RELEASE</version>
        <relativePath/>
    </parent>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--单元测试-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>
</project>

publisher的pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>mq-demo</artifactId>
        <groupId>cn.itcast.demo</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>publisher</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

</project>

consumer的pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>mq-demo</artifactId>
        <groupId>cn.itcast.demo</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>consumer</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

</project>

入门案例

直接省略交换机,生产者->队列->消费者(虚拟主机不能省略)

  • 利用控制台创建队列simple.queue
  • 在publisher服务中,利用SpringAMQP直接向simple.queue发送消息
  • 在consumer服务中,利用SpringAMQP编写消费者,监听simple.queue队列

那么怎么发送消息呢?

SpringAMQP提供了RabbitTemplate工具类,方便我们发送信息。

@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    void testSendMessage2Queue(){
        String queueName="simple.queue";
        String msg="hello,amqp!";
        rabbitTemplate.convertAndSend(queueName,msg);
    }
}


那么如何接收信息呢?

@Component
@Slf4j
public class MqListener {
    @RabbitListener(queues ="simple.queue" )
    public void listenSimpleQueue(String msg){//发送来的是string类型,所以我们也用string接收
        System.out.println("消费者收到了simple.queue的消息【"+msg+"】");
    }
}

然后用启动类启动(这个不是测试类)

控制台输出:消费者收到了simple.queue的消息【hello,amqp!】

此时,观察控制台发现,程序并没有停止,消费者始终处于监听的状态

然后publisher再次发送一个消息,consumer依然能够监听到

消费者收到了simple.queue的消息【hello,amqp!】

消费者收到了simple.queue的消息【hello,我是第二个消息】

最后,消息不存在队列当中。。。。。。。。。。。。。


2.WorkQueue

WorkQueue,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息.

案例

模拟workqueue,实现一个队列绑定多个消费者


  • 在RabbitMQ的控制台创建一个队列,名为work.queue
  • 在publisher服务中定义测试方法,在1秒内产生50条消息,发送到work.queue
  • 在consumer服务中定义两个消息监听者,都监听work.queue队列
  • 消费者1每秒处理50条消息,消费者2每秒处理5条消息

1.创建队列work.queue

2.publisher发送消息

@Test
void testWorkQueue() throws InterruptedException {
    for (int i = 0; i < 50; i++) {
        String queueName="work.queue";
        String msg="hello,我是第【"+i+"】条消息呀";
        rabbitTemplate.convertAndSend(queueName,msg);
        Thread.sleep(20);
    }
}

线程休眠20ms发送一个消息,这样能看到循序渐进的效果。

3.consumer两个消息监听者,共同监听队列

@RabbitListener(queues ="work.queue" )
public void listenSimpleQueue1(String msg) throws InterruptedException {
//发送来的是string类型,所以我们也用string接收
    System.out.println("消费者1 收到了work.queue的消息【"+msg+"】");
    Thread.sleep(20);
}

@RabbitListener(queues ="work.queue" )
public void listenSimpleQueue2(String msg) throws InterruptedException {
//发送来的是string类型,所以我们也用string接收  err.print是打印红色的
    System.err.println("消费者2 收到了work.queue的消息【"+msg+"】");
    Thread.sleep(200);
}

如果两个监听器不加线程休眠,那么性能是一样的:

消费者1 收到了work.queue的消息【hello,我是第【0】条消息呀】
消费者2 收到了work.queue的消息【hello,我是第【1】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【2】条消息呀】

消费者2 收到了work.queue的消息【hello,我是第【3】条消息呀】


如果两个监听器加上不同的线程休眠,导致性能不一样:

消费者1 收到了work.queue的消息【hello,我是第【0】条消息呀】
消费者2 收到了work.queue的消息【hello,我是第【1】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【2】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【4】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【6】条消息呀】
消费者2 收到了work.queue的消息【hello,我是第【3】条消息呀】

但是你会发现,依旧是02468--13579????

消费者消息推送限制

默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但是并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。

因此我们需要修改application.yml,设置preFetch的值为1,确保同一时刻最多投递给消费者1条消息。

消费者2 收到了work.queue的消息【hello,我是第【0】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【1】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【2】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【3】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【4】条消息呀】
消费者2 收到了work.queue的消息【hello,我是第【5】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【6】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【7】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【8】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【9】条消息呀】
消费者2 收到了work.queue的消息【hello,我是第【10】条消息呀】

总结

Work模型的使用:
  • 多个消费者绑定到一个队列中,可以加快消息处理速度
  • 同一条消息只会被一个消费者处理
  • 通过设置prefetch来控制消费者领取的消息数量,处理完一条再处理下一条,实现能者多劳

3.交换机

真正的生产环境都会经过交换机来发送消息,交换机有路由功能,而不是直接发送到队列,交换机的类型有三种:

  • Fanout广播
  • Direct定向
  • Topic话题
1.Fanout交换机

Fanout Exchange会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式

案例---利用SpringAMQP演示FanoutExchange的使用

实现思路如下:

1.在控制台中,声明队列fanout.queue1和fanout.queue2

2.在控制台中,声明交换机hmall.fanout,将两个队列与其绑定

3.在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

4.在publisher中编写测试方法,向hmall.fanout发送消息




现在Fanout广播交换机和两个队列已经就绪,

在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

@RabbitListener(queues ="fanout.queue1" )
    public void listenFanoutQueue1(String msg) throws InterruptedException {//发送来的是string类型,所以我们也用string接收
        System.out.println("消费者1 收到了fanout.queue1的消息【"+msg+"】");
    }

    @RabbitListener(queues ="fanout.queue2" )
    public void listenFanoutQueue2(String msg) throws InterruptedException {//发送来的是string类型,所以我们也用string接收
        System.out.println("消费者2 收到了fanout.queue2的消息【"+msg+"】");
    }

在publisher中编写测试方法,向hmall.fanout发送消息

@Test
    void testSendFanout(){
        String exchangeName="hmall.fanout";
        String msg="hello,everyone";
        rabbitTemplate.convertAndSend(exchangeName,null,msg);
    }

消费者2 收到了fanout.queue2的消息【hello,everyone】
消费者1 收到了fanout.queue1的消息【hello,everyone】

2.Direct交换机

Direct Exchange会将接收到的消息根据规则路由到指定的queue,因此叫定向路由。

  • 每一个Queue都与Exchage设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

案例

1.在RabbitMQ控制台中,声明队列direct.queue1和direct.queue2

2.在RabbitMQ控制台中,声明交换机hmall.direct,将两个队列与其绑定

3.在consumer服务里面,编写两个消费者方法,分别监听direct.queue1和direct.queue2

4.在publisher中编写测试方法,利用不同的RoutingKey向hmall.direct发送消息


消费者监听:

@RabbitListener(queues ="direct.queue1" )
public void listenDirectQueue1(String msg) throws InterruptedException {//发送来的是string类型,所以我们也用string接收
    System.out.println("消费者1 收到了direct.queue1的消息【"+msg+"】");
}

@RabbitListener(queues ="direct.queue2" )
public void listenDirectQueue2(String msg) throws InterruptedException {//发送来的是string类型,所以我们也用string接收
    System.out.println("消费者2 收到了direct.queue2的消息【"+msg+"】");
}

publisher中编写测试方法,利用不同的RoutingKey向hmall.direct发送消息:

@Test
void testSendDirect(){
    String exchangeName="hmall.direct";
    String msg="红色红色";
    rabbitTemplate.convertAndSend(exchangeName,"red",msg);
}

结果:

消费者1 收到了direct.queue1的消息【红色红色】
消费者2 收到了direct.queue2的消息【红色红色】

3.Topic交换机

案例:


**消费者接收:**
@RabbitListener(queues ="topic.queue1" )
public void listenTopicQueue1(String msg) throws InterruptedException {//发送来的是string类型,所以我们也用string接收
    System.out.println("消费者1 收到了topic.queue1的消息【"+msg+"】");
}

@RabbitListener(queues ="topic.queue2" )
public void listenTopicQueue2(String msg) throws InterruptedException {//发送来的是string类型,所以我们也用string接收
    System.out.println("消费者2 收到了topic.queue2的消息【"+msg+"】");
}

publisher发送:

@Test
void testSendTopic(){
    String exchangeName="hmall.topic";
    String msg="小日本的新闻";
    rabbitTemplate.convertAndSend(exchangeName,"japan.news",msg);
}

控制台:

消费者2 收到了topic.queue2的消息【小日本的新闻】

4.声明队列和交换机的方式(新建队列&交换机)

就比如说刚才我们声明一个新的队列和交换机,都是在RabbitMQ的控制台里面生成的,并不是在JAVA代码里面生成的。


方式一:

在消费者里面建立一个config包:

@Configuration
public class FanoutConfiguration {

    //交换机
    @Bean
    public FanoutExchange fanoutExchange() {
//  ExchangeBuilder.fanoutExchange("hmall.fanout2").build();
        return new FanoutExchange("hmall.fanout2");
    }

    //队列    durable持久的
    @Bean
    public Queue fanoutQueue3() {
//        QueueBuilder.durable("").build();
        return new Queue("fanout.queue3");
    }

    @Bean
    public Queue fanoutQueue4() {
        return new Queue("fanout.queue4");
    }

    //绑定
    @Bean
    public Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);

    }

    @Bean
    public Binding fanoutBinding4(Queue fanoutQueue4, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue4).to(fanoutExchange);
        //后面加上 .with  就是routingKey
    }

//    在configuration中的bean会被直接动态代理,而在service里
//    方法在不注入自己的情况下调用自己的方法会使事物无效,原因就是没有动态代理,只有用到该类时才会代理
}

方式二:基于注解声明

SpringAMQP还提供了基于@RabbitListener注解来声明队列和交换机的方式:

在消费者的监听器里面:

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue1",durable = "true"),
        exchange=@Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),
        key={"red","blue"}
))
public void listenDirectQueue1(String msg) throws InterruptedException {
    System.out.println("消费者1 收到了direct.queue1的消息【"+msg+"】");
}

5.消息转换器

@Test
    void testSendObject(){
        Map<String, Object>msg=new HashMap<>(2);
        msg.put("name","jack");
        msg.put("aga",21);
        rabbitTemplate.convertAndSend("object.queue",msg);
    }




        <dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-xml</artifactId>
            <version>2.17.1</version>
        </dependency>

在发布者启动类里配置:

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
@SpringBootApplication
public class PublisherApplication {
    public static void main(String[] args) {
        SpringApplication.run(PublisherApplication.class);
    }
    @Bean
    public MessageConverter jacksonMessageConvertor() {
        return new Jackson2JsonMessageConverter();
    }
}

成功!!!!!!!!!

接收和以前一样!!!!!!!


END-业务改造

↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓

消费者监听器里面:

@Component
@RequiredArgsConstructor
public class PayStatusListener {

    private final IOrderService orderService;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "mark.order.pay.queue", durable = "true"),
            exchange = @Exchange(name = "pay.topic", type = ExchangeTypes.TOPIC),
            key = "pay.success"
    ))
    public void listenOrderPay(Long orderId) {
        //标记订单状态为已经支付
        orderService.markOrderPaySuccess(orderId);
    }

}
@RequiredArgsConstructor和@Autowired

这两种方式在实现依赖注入的方式和效果上有一些区别,主要体现在以下几个方面:

  1. 生成方式:- @RequiredArgsConstructor 是 Lombok 提供的功能,它会为所有被 final 修饰的字段生成一个构造函数,并将这些字段作为参数进行注入。- @Autowired 是 Spring 提供的注解,用来标注依赖注入的位置(字段、构造函数、或者方法)。
  2. 字段可变性:- @RequiredArgsConstructor 要求依赖字段必须是 final 的,因为它生成的构造函数会使用 final 字段作为参数,并在构造函数中进行赋值。- @Autowired 不要求字段必须是 final 的,可以是普通的私有字段或者通过 setter 方法注入。
  3. 灵活性和推荐性:- @RequiredArgsConstructor 通常适用于希望保持类中依赖字段不变(immutable)的情况,例如通过构造函数注入一次性初始化这些依赖,而后不再修改。- @Autowired 则更为灵活,可以在字段或者方法中使用,依赖字段可以是 final 也可以不是,更适合在需要动态变化依赖或者在单元测试中替换依赖时使用。
  4. 使用场景:- 如果你的类设计为依赖在初始化后不再改变,并且希望利用 final 字段确保其不可变性,那么使用 @RequiredArgsConstructor 是一个很好的选择。- 如果你希望在类的生命周期中动态更改依赖,或者依赖字段不一定是 final 的,那么使用 @Autowired 更为灵活。

示例比较:

使用

@RequiredArgsConstructor

的示例:

import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class MyService {
    
    private final SomeOtherService otherService;
    
    // 其他方法
}

使用

@Autowired

的示例:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MyService {
    @Autowired
    private SomeOtherService otherService;
    
    
    
    
    // 其他方法
}

在这两个示例中,第一个示例利用

@RequiredArgsConstructor

自动生成了构造函数,并使用

final

字段

otherService

进行依赖注入。第二个示例则显式地使用了

@Autowired

注解在构造函数中进行依赖注入,没有要求

otherService

final

字段。

综上所述,主要区别在于字段的

final

要求、代码生成方式以及推荐的使用场景。选择哪种方式取决于你的设计需求和项目的规范。

发消息者:



异步修改订单状态,尽量用try、catch



本文转载自: https://blog.csdn.net/weixin_73253843/article/details/140865342
版权归原作者 let_ 所有, 如有侵权,请联系我们删除。

“RabbitMQ(三)Java客户端”的评论:

还没有评论