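Preface
Message queues are commonly used for decoupling services, smoothing traffic peaks, and asynchronous communication. RabbitMQ is a widely used and stable open-source message broker. In this article we pull RabbitMQ into a project through the Spring Boot starter and walk through the main messaging patterns. Each pattern comes with a small example, so you can quickly see which one fits a given business scenario.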
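I. Dependency and configuration
1. Dependency
Spring AMQP (AMQP stands for Advanced Message Queuing Protocol) consists of two modules: spring-amqp is the base abstraction, and spring-rabbit is the RabbitMQ implementation.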
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. Configuration
For the available properties, see RabbitProperties.java.
spring:
  rabbitmq:
    host: 192.168.137.192
    port: 5672
    username: guest
    password: guest
    virtualHost: /
II. Usage
1. Queue
RabbitConfiguration
package com.student.rabbit.queue;

import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Bean;
import org.springframework.amqp.core.Queue;

/**
 * Create by zjg on 2024/3/9
 */
@Configuration
public class RabbitConfiguration {

    protected final String queueName = "queue";

    // Declares the queue that the producer sends to and the consumers listen on
    @Bean
    public Queue queue() {
        return new Queue(this.queueName);
    }
}
Producer
package rabbit.queue;

import com.student.SpringbootStart;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Create by zjg on 2024/3/9
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringbootStart.class)
public class Producer {

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private Queue queue;

    AtomicInteger count = new AtomicInteger(0);

    @Test
    public void send() {
        // Send 10 messages to the default exchange, using the queue name as the routing key
        for (int i = 0; i < 10; i++) {
            StringBuilder builder = new StringBuilder("Hello");
            builder.append(" " + count.incrementAndGet());
            String message = builder.toString();
            template.convertAndSend(queue.getName(), message);
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
Consumer
package com.student.rabbit.queue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Create by zjg on 2024/3/9
 */
@Component
public class Consumer {

    private static final Logger log = LoggerFactory.getLogger(Consumer.class);

    protected final String queueName = "queue";

    // Both listeners consume from the same queue, so they compete for its messages
    @RabbitListener(queues = queueName)
    public void receive1(String message) {
        log.debug("receive1:" + message);
    }

    @RabbitListener(queues = queueName)
    public void receive2(String message) {
        log.debug("receive2:" + message);
    }
}
Both listeners share the same queue, so each of them consumed 5 of the 10 messages.
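The even 5/5 split is what you would expect when a broker hands messages to the consumers of one queue in turn. The following is a minimal plain-Java sketch of that idea, assuming strict round-robin dispatch. It uses no Spring or RabbitMQ classes; `RoundRobinSketch` and `dispatch` are hypothetical names introduced only for illustration, and a real broker's distribution can also depend on prefetch settings and consumer speed.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation only: no broker or Spring classes are involved.
final class RoundRobinSketch {

    /** Hands message i to consumer (i mod consumerCount), as a strict round-robin dispatcher would. */
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> received = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            received.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            received.get(i % consumerCount).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            messages.add("Hello " + i);
        }
        List<List<String>> received = dispatch(messages, 2);
        System.out.println("receive1: " + received.get(0));
        System.out.println("receive2: " + received.get(1));
    }
}
```

With 10 messages and 2 consumers, each list ends up with 5 entries, matching the result seen in the logs.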
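2. Publish/Subscribe
There are four exchange types: fanout, direct, topic, and headers. Next we look at how to use each of them and how they differ.
2.1 fanout (broadcast)
P (the producer) publishes messages to X (the exchange), and X delivers a copy to every queue bound to it.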
RabbitFanoutConfiguration
We define the queues as AnonymousQueue, which creates a non-durable, exclusive, auto-delete queue with a generated name.
package com.student.rabbit.fanout;

import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Create by zjg on 2024/3/10
 */
@Configuration
public class RabbitFanoutConfiguration {

    @Bean
    public FanoutExchange fanout() {
        return new FanoutExchange("sys.fanout");
    }

    private static class ReceiverConfig {

        @Bean
        public Queue fanoutQueue1() {
            return new AnonymousQueue();
        }

        @Bean
        public Queue fanoutQueue2() {
            return new AnonymousQueue();
        }

        // A fanout binding needs no routing key
        @Bean
        public Binding bindingFanout1(FanoutExchange fanout, Queue fanoutQueue1) {
            return BindingBuilder.bind(fanoutQueue1).to(fanout);
        }

        @Bean
        public Binding bindingFanout2(FanoutExchange fanout, Queue fanoutQueue2) {
            return BindingBuilder.bind(fanoutQueue2).to(fanout);
        }
    }
}
FanoutProducer
package rabbit.fanout;

import com.student.SpringbootStart;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Create by zjg on 2024/3/10
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringbootStart.class)
public class FanoutProducer {

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private FanoutExchange fanout;

    @Test
    public void send() {
        AtomicInteger count = new AtomicInteger(0);
        for (int i = 0; i < 10; i++) {
            StringBuilder builder = new StringBuilder("Hello");
            builder.append(" " + count.incrementAndGet());
            String message = builder.toString();
            // The routing key is left empty because a fanout exchange ignores it
            template.convertAndSend(fanout.getName(), "", message);
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
FanoutConsumer
package com.student.rabbit.fanout;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Create by zjg on 2024/3/10
 */
@Component
public class FanoutConsumer {

    private static final Logger log = LoggerFactory.getLogger(FanoutConsumer.class);

    // The SpEL expression resolves the generated name of the anonymous queue bean
    @RabbitListener(queues = "#{fanoutQueue1.name}")
    public void receive1(String message) {
        log.debug("receive1:" + message);
    }

    @RabbitListener(queues = "#{fanoutQueue2.name}")
    public void receive2(String message) {
        log.debug("receive2:" + message);
    }
}
10 messages were sent in total, and each queue received all 10.
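To make the broadcast behavior concrete, here is a small plain-Java sketch of what a fanout exchange does conceptually: every published message is copied into each bound queue. It is a simulation under that assumption, not RabbitMQ's implementation; `FanoutSketch`, `bind`, `publish`, and `messagesIn` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation of fanout delivery: no broker or Spring classes are involved.
final class FanoutSketch {

    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Binds a queue to the exchange; a fanout binding carries no routing key. */
    void bind(String queueName) {
        queues.put(queueName, new ArrayList<>());
    }

    /** Copies the message into every bound queue. */
    void publish(String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    List<String> messagesIn(String queueName) {
        return queues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("fanoutQueue1");
        exchange.bind("fanoutQueue2");
        for (int i = 1; i <= 10; i++) {
            exchange.publish("Hello " + i);
        }
        System.out.println("fanoutQueue1 size: " + exchange.messagesIn("fanoutQueue1").size());
        System.out.println("fanoutQueue2 size: " + exchange.messagesIn("fanoutQueue2").size());
    }
}
```

Unlike the shared queue in the previous section, the messages are not split between consumers here: each queue holds its own full copy.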
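2.2 direct (Routing)
A direct exchange delivers each message according to its routing key, which makes it flexible: a consumer binds only to the keys it needs.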
RabbitDirectConfiguration
package com.student.rabbit.direct;

import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Create by zjg on 2024/3/10
 */
@Configuration
public class RabbitDirectConfiguration {

    @Bean
    public DirectExchange direct() {
        return new DirectExchange("sys.direct");
    }

    private static class ReceiverConfig {

        @Bean
        public Queue directQueue1() {
            return new AnonymousQueue();
        }

        @Bean
        public Queue directQueue2() {
            return new AnonymousQueue();
        }

        // Queue 1 is bound to "orange" and "black"
        @Bean
        public Binding bindingDirect1a(DirectExchange direct, Queue directQueue1) {
            return BindingBuilder.bind(directQueue1).to(direct).with("orange");
        }

        @Bean
        public Binding bindingDirect1b(DirectExchange direct, Queue directQueue1) {
            return BindingBuilder.bind(directQueue1).to(direct).with("black");
        }

        // Queue 2 is bound to "green" and "black"
        @Bean
        public Binding bindingDirect2a(DirectExchange direct, Queue directQueue2) {
            return BindingBuilder.bind(directQueue2).to(direct).with("green");
        }

        @Bean
        public Binding bindingDirect2b(DirectExchange direct, Queue directQueue2) {
            return BindingBuilder.bind(directQueue2).to(direct).with("black");
        }
    }
}
DirectProducer
package rabbit.direct;

import com.student.SpringbootStart;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Create by zjg on 2024/3/10
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringbootStart.class)
public class DirectProducer {

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private DirectExchange direct;

    private final String[] keys = {"orange", "black", "green"};

    @Test
    public void send() {
        AtomicInteger count = new AtomicInteger(0);
        // Send one message per routing key
        for (int i = 0; i < keys.length; i++) {
            StringBuilder builder = new StringBuilder("Hello to ");
            String key = keys[count.getAndIncrement()];
            // "Hello to " already ends with a space, so append the key directly
            builder.append(key);
            String message = builder.toString();
            template.convertAndSend(direct.getName(), key, message);
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
DirectConsumer
package com.student.rabbit.direct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Create by zjg on 2024/3/10
 */
@Component
public class DirectConsumer {

    private static final Logger log = LoggerFactory.getLogger(DirectConsumer.class);

    @RabbitListener(queues = "#{directQueue1.name}")
    public void receive1(String message) {
        log.debug("receive1:" + message);
    }

    @RabbitListener(queues = "#{directQueue2.name}")
    public void receive2(String message) {
        log.debug("receive2:" + message);
    }
}
3 messages were sent in total. Both queues are bound to black, so the black message was consumed twice, once from each queue.
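The routing rule behind this result is an exact match between the message's routing key and the binding key. The sketch below models the bindings from RabbitDirectConfiguration as a lookup table in plain Java. It is an illustration of the rule, not the broker's code; `DirectSketch`, `bind`, and `route` are hypothetical names.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Plain-Java simulation of direct routing: no broker or Spring classes are involved.
final class DirectSketch {

    // binding key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new TreeSet<>()).add(queueName);
    }

    /** Returns the queues whose binding key equals the routing key exactly (empty if none). */
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptySet());
    }

    public static void main(String[] args) {
        DirectSketch exchange = new DirectSketch();
        exchange.bind("directQueue1", "orange");
        exchange.bind("directQueue1", "black");
        exchange.bind("directQueue2", "green");
        exchange.bind("directQueue2", "black");
        for (String key : new String[] {"orange", "black", "green"}) {
            System.out.println(key + " -> " + exchange.route(key));
        }
    }
}
```

A routing key with no matching binding returns an empty set in this sketch, which mirrors a message that reaches no queue.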
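2.3 Topics
The topic pattern builds on routing by adding wildcard matching on the routingKey, which is a list of words separated by dots.
* (star) stands for exactly one word.
# (hash) stands for zero or more words.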
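The two wildcard rules can be checked with a short matcher. This is a minimal plain-Java sketch written for illustration, assuming words are separated by dots; it is not how RabbitMQ implements topic matching internally, and `TopicMatcher` and `matches` are hypothetical names.

```java
// Plain-Java sketch of topic pattern matching: no broker or Spring classes are involved.
final class TopicMatcher {

    /** Returns true if the routing key satisfies the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: succeed only if the key is exhausted too
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split point
            for (int next = j; next <= k.length; next++) {
                if (match(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word but the key has none left
            return false;
        }
        // '*' matches any single word; otherwise the words must be equal
        return (p[i].equals("*") || p[i].equals(k[j])) && match(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        String[] keys = {"quick.orange.rabbit", "lazy.brown.fox", "lazy"};
        String[] patterns = {"*.orange.*", "*.*.rabbit", "lazy.#", "quick.brown.*"};
        for (String key : keys) {
            for (String pattern : patterns) {
                System.out.println(key + " vs " + pattern + ": " + matches(pattern, key));
            }
        }
    }
}
```

Note the difference between the two wildcards: `lazy.#` accepts the single word `lazy` as well as `lazy.pink.rabbit`, while `*.orange.*` only accepts keys with exactly three words.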
RabbitTopicConfiguration
package com.student.rabbit.topic;

import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Create by zjg on 2024/3/10
 */
@Configuration
public class RabbitTopicConfiguration {

    @Bean
    public TopicExchange topic() {
        return new TopicExchange("sys.topic");
    }

    private static class ReceiverConfig {

        @Bean
        public Queue topicQueue1() {
            return new AnonymousQueue();
        }

        @Bean
        public Queue topicQueue2() {
            return new AnonymousQueue();
        }

        // Queue 1: three-word keys with "orange" in the middle or "rabbit" at the end
        @Bean
        public Binding bindingTopic1a(TopicExchange topic, Queue topicQueue1) {
            return BindingBuilder.bind(topicQueue1).to(topic).with("*.orange.*");
        }

        @Bean
        public Binding bindingTopic1b(TopicExchange topic, Queue topicQueue1) {
            return BindingBuilder.bind(topicQueue1).to(topic).with("*.*.rabbit");
        }

        // Queue 2: keys starting with "lazy", or three-word keys starting with "quick.brown"
        @Bean
        public Binding bindingTopic2a(TopicExchange topic, Queue topicQueue2) {
            return BindingBuilder.bind(topicQueue2).to(topic).with("lazy.#");
        }

        @Bean
        public Binding bindingTopic2b(TopicExchange topic, Queue topicQueue2) {
            return BindingBuilder.bind(topicQueue2).to(topic).with("quick.brown.*");
        }
    }
}
TopicProducer
package rabbit.topic;

import com.student.SpringbootStart;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Create by zjg on 2024/3/10
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringbootStart.class)
public class TopicProducer {

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private TopicExchange topic;

    private final String[] keys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox",
            "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};

    @Test
    public void send() {
        AtomicInteger count = new AtomicInteger(0);
        // Send one message per routing key
        for (int i = 0; i < keys.length; i++) {
            StringBuilder builder = new StringBuilder("Hello to ");
            String key = keys[count.getAndIncrement()];
            // "Hello to " already ends with a space, so append the key directly
            builder.append(key);
            String message = builder.toString();
            template.convertAndSend(topic.getName(), key, message);
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
TopicConsumer
package com.student.rabbit.topic;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Create by zjg on 2024/3/10
 */
@Component
public class TopicConsumer {

    private static final Logger log = LoggerFactory.getLogger(TopicConsumer.class);

    @RabbitListener(queues = "#{topicQueue1.name}")
    public void receive1(String message) {
        log.debug("receive1:" + message);
    }

    @RabbitListener(queues = "#{topicQueue2.name}")
    public void receive2(String message) {
        log.debug("receive2:" + message);
    }
}
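Queue 1 received the messages whose middle word is orange or whose last word is rabbit; queue 2 received the messages that start with lazy or with quick.brown.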
2.4 Headers
I could not find official documentation for the headers pattern, but the classes are still in the package, so it is covered here anyway.
RabbitHeadersConfiguration
package com.student.rabbit.headers;

import org.springframework.amqp.core.AnonymousQueue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

/**
 * Create by zjg on 2024/3/10
 */
@Configuration
public class RabbitHeadersConfiguration {

    @Bean
    public HeadersExchange headers() {
        return new HeadersExchange("sys.headers");
    }

    private static class ReceiverConfig {

        @Bean
        public Queue headersQueue1() {
            return new AnonymousQueue();
        }

        @Bean
        public Queue headersQueue2() {
            return new AnonymousQueue();
        }

        @Bean
        public Queue headersQueue3() {
            return new AnonymousQueue();
        }

        // Queue 1: the message must carry the header user=sys
        @Bean
        public Binding bindingHeaders1(HeadersExchange headers, Queue headersQueue1) {
            Map<String, Object> headerValue = new HashMap<>();
            headerValue.put("user", "sys");
            return BindingBuilder.bind(headersQueue1).to(headers).whereAll(headerValue).match();
        }

        // Queue 2: the message must carry the header user=admin
        @Bean
        public Binding bindingHeaders2(HeadersExchange headers, Queue headersQueue2) {
            Map<String, Object> headerValue = new HashMap<>();
            headerValue.put("user", "admin");
            return BindingBuilder.bind(headersQueue2).to(headers).whereAll(headerValue).match();
        }

        // Queue 3: the message only needs to have a "user" header, whatever its value
        @Bean
        public Binding bindingHeaders3(HeadersExchange headers, Queue headersQueue3) {
            return BindingBuilder.bind(headersQueue3).to(headers).where("user").exists();
        }
    }
}
HeadersProducer
package rabbit.headers;

import com.student.SpringbootStart;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Create by zjg on 2024/3/10
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringbootStart.class)
public class HeadersProducer {

    @Autowired
    private RabbitTemplate template;

    @Autowired
    private HeadersExchange headers;

    private final String[] keys = {"sys", "admin"};

    @Test
    public void send() {
        AtomicInteger count = new AtomicInteger(0);
        for (int i = 0; i < keys.length; i++) {
            StringBuilder builder = new StringBuilder("Hello to ");
            String key = keys[count.getAndIncrement()];
            // "Hello to " already ends with a space, so append the key directly
            builder.append(key);
            // Routing is decided by the "user" header, not by the routing key
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setHeader("user", key);
            Message message = MessageBuilder.withBody(builder.toString().getBytes())
                    .andProperties(messageProperties)
                    .build();
            template.send(headers.getName(), "", message);
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
HeadersConsumer
package com.student.rabbit.headers;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Create by zjg on 2024/3/10
 */
@Component
public class HeadersConsumer {

    private static final Logger log = LoggerFactory.getLogger(HeadersConsumer.class);

    @RabbitListener(queues = "#{headersQueue1.name}")
    public void receive1(Message message) {
        log.debug("receive1:" + new String(message.getBody()));
    }

    @RabbitListener(queues = "#{headersQueue2.name}")
    public void receive2(Message message) {
        log.debug("receive2:" + new String(message.getBody()));
    }

    @RabbitListener(queues = "#{headersQueue3.name}")
    public void receive3(Message message) {
        log.debug("receive3:" + new String(message.getBody()));
    }
}
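The first queue receives the sys message, the second queue receives the admin message, and the third queue receives every message that carries a user header.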
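The two kinds of header binding used above can be modeled in a few lines of plain Java. This is a rough sketch under the assumption that an "all" binding requires every listed header to be present with an equal value and that an "exists" binding only checks for the header's presence. It is not the broker's implementation, and `HeadersSketch`, `matchesAll`, and `exists` are hypothetical names.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

// Plain-Java sketch of headers matching: no broker or Spring classes are involved.
final class HeadersSketch {

    /** "All" match: every binding entry must appear in the message headers with an equal value. */
    static boolean matchesAll(Map<String, Object> bindingArgs, Map<String, Object> headers) {
        for (Map.Entry<String, Object> entry : bindingArgs.entrySet()) {
            if (!headers.containsKey(entry.getKey())
                    || !Objects.equals(entry.getValue(), headers.get(entry.getKey()))) {
                return false;
            }
        }
        return true;
    }

    /** "Exists" match: the header only has to be present, whatever its value. */
    static boolean exists(String headerName, Map<String, Object> headers) {
        return headers.containsKey(headerName);
    }

    public static void main(String[] args) {
        Map<String, Object> sysBinding = Collections.singletonMap("user", "sys");
        Map<String, Object> adminBinding = Collections.singletonMap("user", "admin");
        for (String user : new String[] {"sys", "admin"}) {
            Map<String, Object> headers = Collections.singletonMap("user", user);
            System.out.println("user=" + user
                    + " queue1=" + matchesAll(sysBinding, headers)
                    + " queue2=" + matchesAll(adminBinding, headers)
                    + " queue3=" + exists("user", headers));
        }
    }
}
```

A message without any user header would match none of the three bindings in this sketch.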
Summary
Copyright belongs to the original author, 你知道“铁甲小宝”吗丶. If there is any infringement, please contact us for removal.