1.快速入门
在idea里面创建两个springboot项目,一个模块是consumer,一个是publisher
两者有自己的启动类,继承同一父工程的pom。
父工程的pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.itcast.demo</groupId>
<artifactId>mq-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<modules>
<module>publisher</module>
<module>consumer</module>
</modules>
<packaging>pom</packaging>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.9.RELEASE</version>
<relativePath/>
</parent>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
</project>
publisher的pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>mq-demo</artifactId>
<groupId>cn.itcast.demo</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>publisher</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
</project>
consumer的pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>mq-demo</artifactId>
<groupId>cn.itcast.demo</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>consumer</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
</project>
入门案例
直接省略交换机,生产者->队列->消费者(虚拟主机不能省略)
- 利用控制台创建队列simple.queue
- 在publisher服务中,利用SpringAMQP直接向simple.queue发送消息
- 在consumer服务中,利用SpringAMQP编写消费者,监听simple.queue队列
那么怎么发送消息呢?
SpringAMQP提供了RabbitTemplate工具类,方便我们发送信息。
@SpringBootTest public class SpringAmqpTest { @Autowired private RabbitTemplate rabbitTemplate; @Test void testSendMessage2Queue(){ String queueName="simple.queue"; String msg="hello,amqp!"; rabbitTemplate.convertAndSend(queueName,msg); } }
那么如何接收信息呢?
@Component @Slf4j public class MqListener { @RabbitListener(queues ="simple.queue" ) public void listenSimpleQueue(String msg){//发送来的是string类型,所以我们也用string接收 System.out.println("消费者收到了simple.queue的消息【"+msg+"】"); } }
然后用启动类启动(这个不是测试类)
控制台输出:消费者收到了simple.queue的消息【hello,amqp!】
此时,观察控制台发现,程序并没有停止,消费者始终处于监听的状态
然后publisher再次发送一个消息,consumer依然能够监听到
消费者收到了simple.queue的消息【hello,amqp!】
消费者收到了simple.queue的消息【hello,我是第二个消息】
最后,消息不存在队列当中。。。。。。。。。。。。。
2.WorkQueue
WorkQueue,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息.
案例
模拟workqueue,实现一个队列绑定多个消费者
- 在RabbitMQ的控制台创建一个队列,名为work.queue
- 在publisher服务中定义测试方法,在1秒内产生50条消息,发送到work.queue
- 在consumer服务中定义两个消息监听者,都监听work.queue队列
- 消费者1每秒处理50条消息,消费者2每秒处理5条消息
1.创建队列work.queue
2.publisher发送消息
@Test void testWorkQueue() throws InterruptedException { for (int i = 0; i < 50; i++) { String queueName="work.queue"; String msg="hello,我是第【"+i+"】条消息呀"; rabbitTemplate.convertAndSend(queueName,msg); Thread.sleep(20); } }
线程休眠20ms发送一个消息,这样能看到循序渐进的效果。
3.consumer两个消息监听者,共同监听队列
@RabbitListener(queues ="work.queue" ) public void listenSimpleQueue1(String msg) throws InterruptedException { //发送来的是string类型,所以我们也用string接收 System.out.println("消费者1 收到了work.queue的消息【"+msg+"】"); Thread.sleep(20); } @RabbitListener(queues ="work.queue" ) public void listenSimpleQueue2(String msg) throws InterruptedException { //发送来的是string类型,所以我们也用string接收 err.print是打印红色的 System.err.println("消费者2 收到了work.queue的消息【"+msg+"】"); Thread.sleep(200); }
如果两个监听器不加线程休眠,那么性能是一样的:
消费者1 收到了work.queue的消息【hello,我是第【0】条消息呀】
消费者2 收到了work.queue的消息【hello,我是第【1】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【2】条消息呀】消费者2 收到了work.queue的消息【hello,我是第【3】条消息呀】
如果两个监听器加上不同的线程休眠,导致性能不一样:
消费者1 收到了work.queue的消息【hello,我是第【0】条消息呀】
消费者2 收到了work.queue的消息【hello,我是第【1】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【2】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【4】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【6】条消息呀】
消费者2 收到了work.queue的消息【hello,我是第【3】条消息呀】
但是你会发现,依旧是02468--13579????
消费者消息推送限制
默认情况下,RabbitMQ的会将消息依次轮询投递给绑定在队列上的每一个消费者。但是并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。
因此我们需要修改application.yml,设置preFetch的值为1,确保同一时刻最多投递给消费者1条消息。
消费者2 收到了work.queue的消息【hello,我是第【0】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【1】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【2】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【3】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【4】条消息呀】
消费者2 收到了work.queue的消息【hello,我是第【5】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【6】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【7】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【8】条消息呀】
消费者1 收到了work.queue的消息【hello,我是第【9】条消息呀】
消费者2 收到了work.queue的消息【hello,我是第【10】条消息呀】
总结
Work模型的使用:
- 多个消费者绑定到一个队列中,可以加快消息处理速度
- 同一条消息只会被一个消费者处理
- 通过设置prefetch来控制消费者领取的消息数量,处理完一条再处理下一条,实现能者多劳
3.交换机
真正的生产环境都会经过交换机来发送消息,交换机有路由功能,而不是直接发送到队列,交换机的类型有三种:
- Fanout广播
- Direct定向
- Topic话题
1.Fanout交换机
Fanout Exchange会将接收到的消息广播到每一个跟其绑定的queue,所以也叫广播模式
案例---利用SpringAMQP演示FanoutExchange的使用
实现思路如下:
1.在控制台中,声明队列fanout.queue1和fanout.queue2
2.在控制台中,声明交换机hmall.fanout,将两个队列与其绑定
3.在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
4.在publisher中编写测试方法,向hmall.fanout发送消息
现在Fanout广播交换机和两个队列已经就绪,
在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
@RabbitListener(queues ="fanout.queue1" )
public void listenFanoutQueue1(String msg) throws InterruptedException {//发送来的是string类型,所以我们也用string接收
System.out.println("消费者1 收到了fanout.queue1的消息【"+msg+"】");
}
@RabbitListener(queues ="fanout.queue2" )
public void listenFanoutQueue2(String msg) throws InterruptedException {//发送来的是string类型,所以我们也用string接收
System.out.println("消费者2 收到了fanout.queue2的消息【"+msg+"】");
}
在publisher中编写测试方法,向hmall.fanout发送消息
@Test
void testSendFanout(){
String exchangeName="hmall.fanout";
String msg="hello,everyone";
rabbitTemplate.convertAndSend(exchangeName,null,msg);
}
消费者2 收到了fanout.queue2的消息【hello,everyone】
消费者1 收到了fanout.queue1的消息【hello,everyone】
2.Direct交换机
Direct Exchange会将接收到的消息根据规则路由到指定的queue,因此叫定向路由。
- 每一个Queue都与Exchage设置一个BindingKey
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
案例
1.在RabbitMQ控制台中,声明队列direct.queue1和direct.queue2
2.在RabbitMQ控制台中,声明交换机hmall.direct,将两个队列与其绑定
3.在consumer服务里面,编写两个消费者方法,分别监听direct.queue1和direct.queue2
4.在publisher中编写测试方法,利用不同的RoutingKey向hmall.direct发送消息
消费者监听:
@RabbitListener(queues ="direct.queue1" ) public void listenDirectQueue1(String msg) throws InterruptedException {//发送来的是string类型,所以我们也用string接收 System.out.println("消费者1 收到了direct.queue1的消息【"+msg+"】"); } @RabbitListener(queues ="direct.queue2" ) public void listenDirectQueue2(String msg) throws InterruptedException {//发送来的是string类型,所以我们也用string接收 System.out.println("消费者2 收到了direct.queue2的消息【"+msg+"】"); }
publisher中编写测试方法,利用不同的RoutingKey向hmall.direct发送消息:
@Test void testSendDirect(){ String exchangeName="hmall.direct"; String msg="红色红色"; rabbitTemplate.convertAndSend(exchangeName,"red",msg); }
结果:
消费者1 收到了direct.queue1的消息【红色红色】
消费者2 收到了direct.queue2的消息【红色红色】
3.Topic交换机
案例:
**消费者接收:** @RabbitListener(queues ="topic.queue1" ) public void listenTopicQueue1(String msg) throws InterruptedException {//发送来的是string类型,所以我们也用string接收 System.out.println("消费者1 收到了topic.queue1的消息【"+msg+"】"); } @RabbitListener(queues ="topic.queue2" ) public void listenTopicQueue2(String msg) throws InterruptedException {//发送来的是string类型,所以我们也用string接收 System.out.println("消费者2 收到了topic.queue2的消息【"+msg+"】"); }
publisher发送:
@Test void testSendTopic(){ String exchangeName="hmall.topic"; String msg="小日本的新闻"; rabbitTemplate.convertAndSend(exchangeName,"japan.news",msg); }
控制台:
消费者2 收到了topic.queue2的消息【小日本的新闻】
4.声明队列和交换机的方式(新建队列&交换机)
就比如说刚才我们声明一个新的队列和交换机,都是在RabbitMQ的控制台里面生成的,并不是在JAVA代码里面生成的。
方式一:
在消费者里面建立一个config包:
@Configuration
public class FanoutConfiguration {
//交换机
@Bean
public FanoutExchange fanoutExchange() {
// ExchangeBuilder.fanoutExchange("hmall.fanout2").build();
return new FanoutExchange("hmall.fanout2");
}
//队列 durable持久的
@Bean
public Queue fanoutQueue3() {
// QueueBuilder.durable("").build();
return new Queue("fanout.queue3");
}
@Bean
public Queue fanoutQueue4() {
return new Queue("fanout.queue4");
}
//绑定
@Bean
public Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);
}
@Bean
public Binding fanoutBinding4(Queue fanoutQueue4, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue4).to(fanoutExchange);
//后面加上 .with 就是routingKey
}
// 在configuration中的bean会被直接动态代理,而在service里
// 方法在不注入自己的情况下调用自己的方法会使事物无效,原因就是没有动态代理,只有用到该类时才会代理
}
方式二:基于注解声明
SpringAMQP还提供了基于@RabbitListener注解来声明队列和交换机的方式:
在消费者的监听器里面:
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "direct.queue1",durable = "true"), exchange=@Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT), key={"red","blue"} )) public void listenDirectQueue1(String msg) throws InterruptedException { System.out.println("消费者1 收到了direct.queue1的消息【"+msg+"】"); }
5.消息转换器
@Test
void testSendObject(){
Map<String, Object>msg=new HashMap<>(2);
msg.put("name","jack");
msg.put("aga",21);
rabbitTemplate.convertAndSend("object.queue",msg);
}
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.17.1</version>
</dependency>
在发布者启动类里配置:
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
@SpringBootApplication
public class PublisherApplication {
public static void main(String[] args) {
SpringApplication.run(PublisherApplication.class);
}
@Bean
public MessageConverter jacksonMessageConvertor() {
return new Jackson2JsonMessageConverter();
}
}
成功!!!!!!!!!
接收和以前一样!!!!!!!
END-业务改造
↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
消费者监听器里面:
@Component
@RequiredArgsConstructor
public class PayStatusListener {
private final IOrderService orderService;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "mark.order.pay.queue", durable = "true"),
exchange = @Exchange(name = "pay.topic", type = ExchangeTypes.TOPIC),
key = "pay.success"
))
public void listenOrderPay(Long orderId) {
//标记订单状态为已经支付
orderService.markOrderPaySuccess(orderId);
}
}
@RequiredArgsConstructor和@Autowired
这两种方式在实现依赖注入的方式和效果上有一些区别,主要体现在以下几个方面:
- 生成方式:-
@RequiredArgsConstructor
是 Lombok 提供的功能,它会为所有被final
修饰的字段生成一个构造函数,并将这些字段作为参数进行注入。-@Autowired
是 Spring 提供的注解,用来标注依赖注入的位置(字段、构造函数、或者方法)。- 字段可变性:-
@RequiredArgsConstructor
要求依赖字段必须是final
的,因为它生成的构造函数会使用final
字段作为参数,并在构造函数中进行赋值。-@Autowired
不要求字段必须是final
的,可以是普通的私有字段或者通过 setter 方法注入。- 灵活性和推荐性:-
@RequiredArgsConstructor
通常适用于希望保持类中依赖字段不变(immutable)的情况,例如通过构造函数注入一次性初始化这些依赖,而后不再修改。-@Autowired
则更为灵活,可以在字段或者方法中使用,依赖字段可以是final
也可以不是,更适合在需要动态变化依赖或者在单元测试中替换依赖时使用。- 使用场景:- 如果你的类设计为依赖在初始化后不再改变,并且希望利用
final
字段确保其不可变性,那么使用@RequiredArgsConstructor
是一个很好的选择。- 如果你希望在类的生命周期中动态更改依赖,或者依赖字段不一定是final
的,那么使用@Autowired
更为灵活。
示例比较:
使用
@RequiredArgsConstructor
的示例:
import lombok.RequiredArgsConstructor; import org.springframework.stereotype.Service; @Service @RequiredArgsConstructor public class MyService { private final SomeOtherService otherService; // 其他方法 }
使用
@Autowired
的示例:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @Service public class MyService { @Autowired private SomeOtherService otherService; // 其他方法 }
在这两个示例中,第一个示例利用
@RequiredArgsConstructor
自动生成了构造函数,并使用
final
字段
otherService
进行依赖注入。第二个示例则显式地使用了
@Autowired
注解在构造函数中进行依赖注入,没有要求
otherService
是
final
字段。
综上所述,主要区别在于字段的
final
要求、代码生成方式以及推荐的使用场景。选择哪种方式取决于你的设计需求和项目的规范。
发消息者:
异步修改订单状态,尽量用try、catch
版权归原作者 let_ 所有, 如有侵权,请联系我们删除。