Spring+RabbitMQ的相关配置
提示:消息中间件rabbitmq的使用
文章目录
前言
本文主要介绍springboot整合rabbitmq的使用。
一、RabbitMQ是什么?
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件。
特性:
可伸缩性:集群服务
消息持久化:从内存持久化消息到硬盘,再从硬盘加载到内存
可靠性:使用了一些机制来保证可靠性,比如持久化、生产确认、消费确认机制等。
灵活的路由:在消息进入队列之前,通过exchange来路由消息,对于典型的路由功能,RabbitMQ已经提供了一些内置的exchange来实现,针对更复杂的路由功能,可以将多个exchange绑定在一起,可以能通过插件机制来自己实现exchange。
消息集群:多个server组成一个集群。
高可用:队列可以在集群中的机器上进行镜像,部分节点出问题的情况下队列仍然可用。
多种协议:支持多种消息队列协议,如STOMP、MQTT等。
多语言客户端:几乎支持所有的常用语言,比如Java、ruby等。
管理界面:提供了易用的用户界面,用户可以监控和管理消息。
跟踪机制:如果消息异常,RabbitMQ提供了消息的跟踪机制,使用者可以找出发生了什么。
插件机制:提供了很多插件,也可以自己实现插件。
核心概念
1、生产者
产生数据,发送消息的程序是生产者。
2、消费者
消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。 请注意生产者,消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。
3、队列
队列是 RabbitMQ 内部使用的一种数据结构, 尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式。
4、交换机
交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定。
二、使用步骤
1.导入pom依赖
依赖如下:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2.yml配置
配置如下:
spring:rabbitmq:host: localhost
port:5672username: guest
password: guest
三、FanRabbitMqConfig配置
packagecom.example.demo.config;importcom.alibaba.fastjson.JSON;importcom.alibaba.fastjson.JSONObject;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.FanoutExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.util.HashMap;importjava.util.Map;@ConfigurationpublicclassFanRabbitMqConfig{// 创建消息队列@BeanpublicQueuecig1KQueen(){// 消息过期 特殊的args// args参数可自行查看官方文档进行使用Map<String,Object> args =newHashMap<>(16);
args.put("x-message-ttl",10000);// 设置队列可以存储的最大消息数量
args.put("x-max-length",10);returnnewQueue("test_queue",false,true,false, args);}// 创建交换机@BeanFanoutExchangeoneKExchange(){returnnewFanoutExchange("test");}// 消息队列绑定交换机@BeanBindingbindCigSingle(){returnBindingBuilder.bind(cig1KQueen()).to(oneKExchange());}@BeanpublicQueuecig10KQueen(){// 消息过期 特殊的argsMap<String,Object> args =newHashMap<>(16);
args.put("x-message-ttl",10000);// 设置队列可以存储的最大消息数量
args.put("x-max-length",10);returnnewQueue("test_queue2",false,true,false, args);}@BeanFanoutExchangetenKExchange(){returnnewFanoutExchange("test2");}@BeanBindingbindCig10K(){returnBindingBuilder.bind(cig10KQueen()).to(tenKExchange());}}
四、定义消息推送方法
packagecom.example.demo.config;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.AmqpTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importjava.time.LocalDateTime;/**
* @author lhp
* @version 1.0.0
* @Description 实体类描述
* @date 2022/7/29 17:05
*/@Component@Slf4jpublicclassSendMessage{@AutowiredprivateAmqpTemplate rabbitTemplate;/**
* 消息推送方法
* @param message
*/publicvoidsendMessage(String message){
log.info("message:"+message);
rabbitTemplate.convertAndSend("test_queue",message);
log.info("队列发送成功");}}
调用该类里面的sendMessage()方法推送消息
五、监听消息队列,处理消息
有多种方式,这里采用最简单的注解实现
@RabbitListener(queues ="test_queue")publicvoidgetMessage(String message)throwsException{System.out.println("1接收到的消息:"+message);// 序列化消息进行写入数据库等操作HashMap object = JSON.parseObject(message,HashMap.class);}
总结
本文主要是针对springboot和rabbitmq的整合,除此之外还有一些其他的消息中间件,如:Kafka、RocketMQ、轻量级消息中间件 MQTT等,感兴趣的朋友可以自行了解。
版权归原作者 佛系小李哥 所有, 如有侵权,请联系我们删除。