RabbitMQ简介
RabbitMQ 中文官方文档 (RabbitMQ 中文官方文档)
- 消息队列是应用程序和应用程序之间的一种通信方法。
- RabbitMQ : erlang语言开发、 基于AMQP协议。
- 同类产品:ActiveMQ、 ZeroMQ、 RabbitMQ、 RocketMQ、 Kafka。
- 六种模式: 简单模式、 工作模式、 发布与订阅模式、 路由模式、通配符模式、 远程调用模式(基本不会用到)
- 关键词:{Broker: 服务器实体、 Exchange :消息交换机、 Queue: 消息队列载体、Binding: 绑定 、Routing Key: 路由关键字、 VHost: 虚拟主机、Producer: 消息生产者 、 Consumer: 消息消费者、Channel: 消息通道 }
- 关键概念:由Exchange、Queue、RoutingKey三个才能决定一个从Exchange到Queue的唯一的线路。
RabbitMQ五大模式实战
基于SpringBoot开发的RabbitMQ应用程序,利用SpringBoot的自动配置和起步依赖方便更快的构建项目。
环境准备
- 准备一台RabbitMQ服务器 链接: RabbitMQ安装提取码: qhxi
- 本次使用的是SpringBoot项目
- pom依赖
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.4.RELEASE</version> <relativePath/> <!-- lookup parent from repository --></parent><dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-configuration-processor</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency></dependencies><build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins></build>
- 配置application.yml文件
spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guestserver: port: 8082
- 启动类和目录结构是SpringBoo正常设置,这里不再赘述。
注意:启动类名设置为RabbitmqProducerApplication
- 简单模式
- 简单模式配置文件
@Configurationpublic class RabbitSimpleConfig { @Bean public Queue simpleQueue(){ return new Queue("simpleQueue"); }}
- 简单模式生产者
@SpringBootTest(classes = RabbitmqProducerApplication.class)public class ProducerTest { @Autowired RabbitTemplate rabbitTemplate; @Test public void simpleProduct(){ for (int num = 0; num < 20; num++) { rabbitTemplate.convertAndSend("simpleQueue", "简单模式"+num); } }}
- 简单模式消费者
@Componentpublic class MessageListener { @RabbitListener(queues = "simpleQueue") public void simpleListener(String message){ System.out.println("简单模式监听器:"+message); } }
- 工作模式
- 工作模式配置文件
@Bean public Queue workQueue(){ return new Queue("workQueue"); }
- 工作模式生产者
@Testpublic void workProduct(){ for (int num = 0; num < 20; num++) { rabbitTemplate.convertAndSend("workQueue", "工作模式"+num); }}
- 工作模式消费者
@RabbitListener(queues = "workQueue") public void workListener1(String message) { System.out.println("工作模式监听器1:" + message); } @RabbitListener(queues = "workQueue") public void workListener2(String message) { System.out.println("工作模式监听器2:" + message); }
- 发布订阅模式
- 发布订阅模式配置文件
//配置交换器@Beanpublic FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange");}//配置队列@Beanpublic Queue fanoutQueue1() { return new Queue("fanoutQueue1", true, false, false, null);}@Beanpublic Queue fanoutQueue2() { return new Queue("fanoutQueue2", true, false, false, null);}//配置绑定@Beanpublic Binding fanoutBinding1(FanoutExchange fanoutExchange, Queue fanoutQueue1) { return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@Beanpublic Binding fanoutBinding2(FanoutExchange fanoutExchange, Queue fanoutQueue2) { return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
- 发布订阅模式生产者
@Testpublic void FanoutProduct(){ for (int num = 0; num < 10; num++) { rabbitTemplate.convertAndSend("fanoutExchange","","发布订阅模式"+num); }}
- 发布订阅模式消费者
@RabbitListener(queues = "fanoutQueue1")public void fanoutListener1(String message) { System.out.println("发布订阅监听器1:" + message);}@RabbitListener(queues = "fanoutQueue2")public void fanoutListener2(String message) { System.out.println("发布订阅监听器2:" + message);}
- 路由模式
- 路由模式配置文件
//配置交换机@Beanpublic DirectExchange directExchange() { return new DirectExchange("directExchange");}//配置队列@Beanpublic Queue directQueue1() { return new Queue("directQueue1", true, false, false, null);}@Beanpublic Queue directQueue2() { return new Queue("directQueue2", true, false, false, null);}//配置绑定@Beanpublic Binding directBinding1(Queue directQueue1, DirectExchange directExchange) { return BindingBuilder.bind(directQueue1).to(directExchange).with("one");}@Beanpublic Binding directBinding2(Queue directQueue2, DirectExchange directExchange) { return BindingBuilder.bind(directQueue2).to(directExchange).with("two");}
- 路由模式生产者
@Testpublic void directProduct1() { for (int num = 0; num < 5; num++) { rabbitTemplate.convertAndSend("directExchange","one", "发送到路由队列1消息"+num); }}@Testpublic void directProduct2() { for (int num = 0; num < 5; num++) { rabbitTemplate.convertAndSend("directExchange","two", "发送到路由队列2消息"+num); }}
- 路由模式消费者
@RabbitListener(queues = "directQueue1")public void fanoutListener1(String message) { System.out.println("路由模式监听器1:" + message);}@RabbitListener(queues = "directQueue2")public void fanoutListener2(String message) { System.out.println("路由模式监听器2:" + message);}
- 通配符模式
- 通配符模式配置文件
//配置队列@Beanpublic Queue topicQueue1() { return new Queue("topicQueue1");}@Beanpublic Queue topicQueue2() { return new Queue("topicQueue2");}//配置交换器@Beanpublic TopicExchange topicExchange() { return new TopicExchange("topicExchange");}//配置绑定@Beanpublic Binding topicBinding1(Queue topicQueue1, TopicExchange topicExchange) { return BindingBuilder.bind(topicQueue1).to(topicExchange).with("topic.*");}@Beanpublic Binding topicBinding2(Queue topicQueue2, TopicExchange topicExchange) { return BindingBuilder.bind(topicQueue2).to(topicExchange).with("topic.#");}
- 通配符模式生产者
/* * 通配符模式测试 * */@Testpublic void topicProduct() { rabbitTemplate.convertAndSend("topicExchange","topic.one", "routkey为topic.one的消息"); rabbitTemplate.convertAndSend("topicExchange","topic.one.two", "routkey为topic.one.two的消息");}
- 通配符模式消费者
@RabbitListener(queues = "topicQueue1")public void fanoutListener1(String message) { System.out.println("通配符监听器1:" + message);}@RabbitListener(queues = "topicQueue2")public void fanoutListener2(String message) { System.out.println("通配符监听器2:" + message);}
## 总结
以上就是SpringBoot+RabbitMQ五大模式的简单使用实例,到目前为止RabbitMQ也是Sping AMQP的唯一实现。感谢支持,你的支持是我前进的动力!!!
版权归原作者 方昭暮 所有, 如有侵权,请联系我们删除。