Springboot整合RabbitMQ
文章目录
1.pom依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.30</version></dependency></dependencies>
2.yml配置
#配置使用的配置文件spring:#配置rabbitmqrabbitmq:host: 127.0.0.1 #主机地址port:5672#端口号username: xxx #用户名password: xxx #密码virtual-host: my_vhost #虚拟主机地址#开启消息送达提示publisher-returns:true# springboot.rabbitmq.publisher-confirm 新版本已被弃用,现在使用 spring.rabbitmq.publisher-confirm-type = correlated 实现相同效果publisher-confirm-type: correlated
listener:#消息监听配置type: simple
simple:acknowledge-mode: manual #manual手动确认消息 auto没有异常时 进行自动确认 (异常类型 消息重新入队)prefetch:1#限制每次发送一条数据。concurrency:3#同一个队列启动几个消费者max-concurrency:3#启动消费者最大数量#重试策略相关配置retry:# 开启消费者(程序出现异常)重试机制,默认开启并一直重试enabled:true# 最大重试次数max-attempts:5# 重试间隔时间(毫秒)initial-interval:3000server:port:18082address: 127.0.0.1
servlet:context-path: /
3.配置队列、交换机
方式一:直接通过配置类配置bean
推送消息时不存在创建队列和交换机
/**
* direct模式声明配置
*/@ConfigurationpublicclassRabbitDirectConfig{publicstaticfinalString EXCHANGE_NAME="direct-exchange";publicstaticfinalString QUEUE_NAME="direct-queue";publicstaticfinalString BINDING_KEY="change:direct";/**
* 声明直连交换机
* name:交换机的名称
* durable 队列是否持久化
* autoDelete:是否自动删除,(当该交换机上绑定的最后一个队列解除绑定后,该交换机自动删除)
* argument:其他一些参数
*/@BeanpublicDirectExchangedirectExchange(){returnnewDirectExchange(EXCHANGE_NAME,false,false,null);}/**
* 声明队列
* queue 队列的名称
* durable 队列是否持久化
* exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
*/@BeanpublicQueuedirectQueue(){returnnewQueue(QUEUE_NAME,false,false,false,null);}/**
* 交换机队列绑定
*/@BeanpublicBindingspringExchangeBindSpringQueue(){returnBindingBuilder.bind(directQueue()).to(directExchange()).with(BINDING_KEY);}}
方式二:消息监听通过注解配置
启动时创建队列和交换机
@RabbitListener(
bindings =@QueueBinding(
value =@Queue(value ="direct1-queue",durable ="true"),
exchange =@Exchange(value ="direct1-exchange",type =ExchangeTypes.DIRECT,durable ="true"),
key ="change1:direct"))
注意:rabbitmq同名的队列只能创建一个,创建多个会报错,推送消息时需确保队列和交换机已存在,
方式一队列和交换机在第一次推送消息时才会自动创建队列和交换机,方式二注解在启动时就会创建
4.编写消息监听发送测试
监听
@Slf4j@ComponentpublicclassRabbitMQListener{@RabbitListener(queues ="direct-queue")@RabbitHandlerpublicvoidbootMsg(Channel channel,Message message){String message1 =newString(message.getBody(),StandardCharsets.UTF_8);System.out.println(" direct 消费者:'"+ message1 +"'");//手动确认该消息try{//消息确认,根据消息序号(false只确认当前一个消息收到,true确认所有比当前序号小的消息(成功消费,消息从队列中删除 ))
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(IOException e){
log.error("执行异常",e);// 拒绝消息并重新入队
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}@RabbitListener(
bindings =@QueueBinding(
value =@Queue(value ="direct1-queue",durable ="true"),
exchange =@Exchange(value ="direct1-exchange",type =ExchangeTypes.DIRECT,durable ="true"),
key ="change1:direct"))@RabbitHandlerpublicvoidbootMsg1(Channel channel,Message message){String message1 =newString(message.getBody(),StandardCharsets.UTF_8);System.out.println(" direct 消费者:'"+ message1 +"'");//手动确认该消息try{//消息确认,根据消息序号(false只确认当前一个消息收到,true确认所有比当前序号小的消息(成功消费,消息从队列中删除 ))
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(IOException e){
log.error("执行异常",e);// 拒绝消息并重新入队
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);}}}
测试
@Slf4j@SpringBootTest(classes =RabbitProviderApplication.class)publicclassRabbitTest{@AutowiredprivateAmqpTemplate amqpTemplate;@TestpublicvoiddirectProvider(){String message ="direct模式消息推送。。。。。";/**
* 参数分别为,交换机,路由key,消息体
*/
amqpTemplate.convertAndSend("direct-exchange","change:direct",message);System.out.println(" 消息发送 :'"+message +"'");}@TestpublicvoiddirectProvider1(){String message ="direct模式消息推送1。。。。。";/**
* 参数分别为,交换机,路由key,消息体
*/
amqpTemplate.convertAndSend("direct1-exchange","change1:direct",message);System.out.println(" 消息发送1 :'"+message +"'");}}
5.其他类型交换机配置
1.FanoutExchange
/**
* fanout模式声明配置
*/@ConfigurationpublicclassRabbitFanoutConfig{publicstaticfinalString EXCHANGE_NAME="fanout-exchange";publicstaticfinalString QUEUE_NAME1="fanout-queue1";publicstaticfinalString QUEUE_NAME2="fanout-queue2";/**
* 声明交换机
*/@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange(EXCHANGE_NAME,false,false,null);}/**
* 声明队列
*/@BeanpublicQueuefanoutQueue1(){returnnewQueue(QUEUE_NAME1,false,false,false,null);}@BeanpublicQueuefanoutQueue2(){returnnewQueue(QUEUE_NAME2,false,false,false,null);}/**
* 交换机队列绑定
*/@BeanpublicBindingspringExchangeBindQueue1(){returnBindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());}/**
* 交换机队列绑定
*/@BeanpublicBindingspringExchangeBindQueue2(){returnBindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());}}
监听
@RabbitListener(queues ="fanout-queue1")publicvoidfanoutMsg1(Channel channel,Message message){String message1 =newString(message.getBody(),StandardCharsets.UTF_8);System.out.println(" fanout-queue1 消费者:'"+ message1 +"'");}@RabbitListener(queues ="fanout-queue2")publicvoidfanoutMsg2(Channel channel,Message message){String message1 =newString(message.getBody(),StandardCharsets.UTF_8);System.out.println(" fanout-queue2 消费者:'"+ message1 +"'");}
测试
@TestpublicvoidfanoutProvider(){String message ="fanout模式消息推送。。。。。";
amqpTemplate.convertAndSend("fanout-exchange","",message);System.out.println(" 消息发送 :'"+message +"'");}
2.TopicExchange
/**
* topic模式声明配置
*/@ConfigurationpublicclassRabbitTopicConfig{publicstaticfinalString EXCHANGE_NAME="topic-exchange";publicstaticfinalString QUEUE_NAME="topic-queue";publicstaticfinalString BINDING_KEY="*.orange.#";/**
* 声明交换机
*/@BeanpublicTopicExchangetopicExchange(){returnnewTopicExchange(EXCHANGE_NAME,false,false,null);}/**
* 声明队列
*/@BeanpublicQueuetopicQueue(){returnnewQueue(QUEUE_NAME,false,false,false,null);}/**
* 交换机队列绑定
*/@BeanpublicBindingtopicExchangeBindQueue(){returnBindingBuilder.bind(topicQueue()).to(topicExchange()).with(BINDING_KEY);}}
@RabbitListener(queues ="topic-queue")publicvoidtopicMsg2(Channel channel,Message message){String message1 =newString(message.getBody(),StandardCharsets.UTF_8);System.out.println(" topic-queue2 消费者:'"+ message1 +"'");}
测试
@TestpublicvoidtopicProvider(){String message1 ="topic test模式消息推送。。。。。";String message2 ="topic test.aaa模式消息推送。。。。。";
amqpTemplate.convertAndSend("topic-exchange","com.orange.test",message1);
amqpTemplate.convertAndSend("topic-exchange","com.orange.test.aaa",message2);System.out.println(" 消息发送");}
3.HeadersExchange
/**
* headers模式声明配置
* 与路由key无关,只需要消息的头参数匹配即可
* x-match参数代表是全部匹配还是部分匹配
*/@ConfigurationpublicclassRabbitHeadersConfig{publicstaticfinalString EXCHANGE_NAME="headers-exchange";publicstaticfinalString QUEUE_NAME="headers-queue";publicstaticfinalString QUEUE_NAME1="headers-queue1";/**
* 声明交换机
*/@BeanpublicHeadersExchangeheadersExchange(){returnnewHeadersExchange(EXCHANGE_NAME,false,false,null);}/**
* 声明队列
*/@BeanpublicQueueheadersQueue(){returnnewQueue(QUEUE_NAME,false,false,false,null);}@BeanpublicQueueheadersQueue2(){returnnewQueue(QUEUE_NAME1,false,false,false,null);}/**
* 交换机队列绑定(任意匹配)
* whereAny 等同于x-match = any
*/@BeanpublicBindingheadersExchangeBindSpringQueue(){HashMap<String,Object> header =newHashMap<>();
header.put("test","111");
header.put("test1","222");returnBindingBuilder.bind(headersQueue()).to(headersExchange()).whereAny(header).match();}/**
* 交换机队列绑定(全部匹配)
* whereAny 等同于x-match = all
*/@BeanpublicBindingheadersExchangeBindSpringQueue1(){HashMap<String,Object> header =newHashMap<>();
header.put("test","111");
header.put("test1","222");returnBindingBuilder.bind(headersQueue2()).to(headersExchange()).whereAll(header).match();}}
发送测试
@TestpublicvoidheaderProvider(){String param ="headers 模式消息推送。。。。。";MessageProperties messageProperties =newMessageProperties();
messageProperties.setContentType("text/plain");
messageProperties.setContentEncoding("utf-8");
messageProperties.setHeader("test","111");Message message =newMessage(param.getBytes(), messageProperties);
amqpTemplate.convertAndSend("headers-exchange",null,message);System.out.println(" 消息发送");}
队列queue任意匹配有数据,queue1全部匹配无数据
headers-queue
headers-queue1
消息监听
@RabbitListener(queues ="headers-queue")publicvoidheadersMsg2(Channel channel,Message message){String message1 =newString(message.getBody(),StandardCharsets.UTF_8);System.out.println(" headers-queue 消费者:'"+ message1 +"'");}@RabbitListener(queues ="headers-queue1")publicvoidheaders1Msg2(Channel channel,Message message){String message1 =newString(message.getBody(),StandardCharsets.UTF_8);System.out.println(" headers-queue1 消费者:'"+ message1 +"'");}
6.延迟消息处理(TTL)
- 第一种是使用普通队列和死信队列来模拟实现延迟的效果。将消息放入一个没有被监听的队列上,
设置TTL(一条消息的最大存活时间)为延迟的时间
,时间到了没有被消费,直接成为死信,进入死信队列。后监听私信队列来消息消费 - 第二种是使用rabbitmq官方提供的
delayed插件
来真正实现延迟队列。
方式一:ttl配置
超时自动删除
/**
* rabbitmq的ttl延迟过期时间配置
*/@ConfigurationpublicclassRabbitMQTTLConfig{/**
* 声明交换机
* @return
*/@BeanpublicDirectExchangettlDirectExchange(){returnnewDirectExchange("ttl-direct-exchange");}/**
* 声明队列
* @return
*/@BeanpublicQueuettlQueue(){//设置参数Map<String,Object> args =newHashMap<>();//设置ttl过期时间,需设置int值
args.put("x-message-ttl",5000);returnnewQueue("ttl-direct-queue",true,false,false,args);}/**
* 绑定队列
* @return
*/@BeanpublicBindingttlBingQueue(){returnBindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("direct:ttl:key");}}
测试
@TestpublicvoidttlSendMessageTest(){String exchange ="ttl-direct-exchange";String routingKey ="direct:ttl:key";String msg = UUID.randomUUID().toString();//发送并设置
amqpTemplate.convertAndSend(exchange,routingKey,msg);System.out.println("消息发送成功====="+msg);}
方式二:消息发送设置
注释掉
x-message-ttl
参数,使用普通队列,发送消息时设置过期时间
@TestpublicvoidttlSendMessageTest(){String exchange ="ttl-direct-exchange";String routingKey ="direct:ttl:key";String msg = UUID.randomUUID().toString();//设置过期时间MessagePostProcessor messagePostProcessor =newMessagePostProcessor(){@OverridepublicMessagepostProcessMessage(Message message)throwsAmqpException{
message.getMessageProperties().setExpiration("5000");
message.getMessageProperties().setContentEncoding("UTF-8");return message;}};//发送并设置
amqpTemplate.convertAndSend(exchange,routingKey,msg,messagePostProcessor);System.out.println("消息发送成功====="+msg);}
**注意:如果项目中即使用了ttl配置过期时间,有设置了消息过期时间,则执行时以最小的时间为准,ttl过期队列的消息过期会写到死信,而
设置方式的普通队列则不会自动写到死信队列
**
7.死信队列
**死信的情况:
消息被拒绝,消息过期,队列达到最大长度
**
死信队列声明
@ConfigurationpublicclassRabbitMQDLXConfig{/**
* 声明死信交换机
* @return
*/@BeanpublicDirectExchangedlxDirectExchange(){returnnewDirectExchange("dlx-direct-exchange");}/**
* 声明死信队列
* @return
*/@BeanpublicQueuedlxQueue(){;returnnewQueue("dlx-direct-queue",true);}/**
* 绑定队列
* @return
*/@BeanpublicBindingdlxBingQueue(){returnBindingBuilder.bind(dlxQueue()).to(dlxDirectExchange()).with("direct:dlx:key");}}
过期推送到死信设置
/**
* 声明ttl队列
* @return
*/@BeanpublicQueuettlQueue(){//设置参数Map<String,Object> args =newHashMap<>();//设置ttl过期时间,需设置int值
args.put("x-message-ttl",5000);
args.put("x-max-length",5);//最大长度//消息过期死信队列入队配置
args.put("x-dead-letter-exchange","dlx-direct-exchange");//设置死信交换机
args.put("x-dead-letter-routing-key","direct:dlx:key");//死信路由key,fanout模式不需要设置路由keyreturnnewQueue("ttl-direct-queue",true,false,false,args);}
注意:队列参数修改后,不会重新创建覆盖而是会报错,需要手动删除重新创建,生产环境中则可以通过重新创建一个队列,进行转移
测试
消息过期进死信队列
版权归原作者 陌殇殇 所有, 如有侵权,请联系我们删除。