A.集成
一:添加依赖
在pom.xml文件中添加spring-boot-starter-amqp依赖,以便使用Spring Boot提供的RabbitMQ支持:
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
二:配置RabbitMQ连接信息
rabbitmq:host: 13X.9.1XX.7X
port:5672#通过控制台可以查看 记得开启这个端口的防护username: admin
password: admin
三:创建队列
importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitMQConfig{@BeanpublicQueuequeue(){//name,名字;durable,是否开启持久化returnnewQueue("logs",false);}}
启动就可以得到下队列
四:创建控制类来生产数据
importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RestController;@RestControllerpublicclassRabbitMQController{privatestaticfinalLogger logger =LoggerFactory.getLogger(RabbitMQController.class);@AutowiredprivateRabbitTemplate rabbitTemplate;@GetMapping("aaa")publicvoidsimpleTest(){
logger.info("RabbitMQController开始!");
rabbitTemplate.convertAndSend("logs","hello world!");
logger.info("RabbitMQController结束!");}}
因为只创建了生产,消费者没有创建,所以在RabbitMQ客户端可以查看,然后点击,消费可得数据
五:创建消费者,获取数据
importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassConsumeBean{privatestaticfinalLogger logger =LoggerFactory.getLogger(ConsumeBean.class);@RabbitListener(queues={"logs"})publicvoidgetMsg(String message){
logger.info("消费者:{}",message);}}
这样就可以看出,消息自动就被接收,消费掉了
B.消息传递的开放标准协议(AMQP)
AMQP(Advanced Message Queuing Protocol)它定义了一种抽象的消息传递模型,包括以下几个主要组件:
消息
(Message):AMQP中的基本单位,是要在消息队列系统中传递的数据。消息通常包括消息体和消息头,消息体是实际要传递的数据,而消息头包含元数据信息,如消息的路由键、优先级等。
生产者
(Producer):负责创建并发送消息到消息队列中的实体。生产者将消息发布到交换机(Exchange),交换机根据路由规则将消息路由到一个或多个队列中。
消费者
(Consumer):从消息队列中接收并处理消息的实体。消费者订阅一个或多个队列,并在有消息到达时接收并处理它们。
交换机
(Exchange):用于接收生产者发送的消息,并根据路由规则将消息路由到一个或多个队列中。AMQP定义了不同类型的交换机,如直连交换机(Direct Exchange)、主题交换机(Topic Exchange)、扇出交换机(Fanout Exchange)等。
队列
(Queue):存储消息的容器,消费者从队列中获取消息进行处理。消息可以被一个或多个消费者订阅,但每条消息只会被一个消费者接收。
绑定
(Binding):用于将交换机和队列之间建立关联关系的规则。绑定定义了消息如何从交换机路由到队列,通常包括交换机名称、路由键等信息。
连接
(Connection):生产者和消费者与消息代理(如RabbitMQ)之间建立的网络连接。连接是长期的、持久的,用于传输消息和管理通信。
通过这些抽象组件,AMQP定义了一个灵活且可扩展的消息传递模型,使得不同的消息队列系统可以遵循相同的协议进行通信和交互。这种抽象模型使得开发者可以更容易地实现消息传递系统,并实现消息的可靠传递和处理。
六大模式
1.简单队列 一个生产者一个队列一个消费者
2.工作队列 一个生产者一个队列多个消费者
3.订阅模式 一个生产者一个交换机 多个队列多个消费者(对与消一对一)
4.路由模式 一个生产者一个交换机 分类进入队列 多个队列多个消费者(对与消一对一)
5.主题模式(通配符模式) 一个生产者一个交换机 通配符分类进入队列 多个队列多个消费者(对与消一对一)
6.RPC 是一种实现远程过程调用的方式,允许客户端应用程序调用远程服务器上的服务,并等待服务端返回结果。
1.简单队列
创建生产者(Producer):
importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;@ComponentpublicclassMessageProducer{@AutowiredprivateRabbitTemplate rabbitTemplate;publicvoidsendMessage(String message){
rabbitTemplate.convertAndSend("queueName", message);}}//创建消费者importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassMessageConsumer{@RabbitListener(queues ="queueName")publicvoidreceiveMessage(String message){System.out.println("Received message: "+ message);}}//队列配置importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitConfig{@BeanpublicQueuequeue1(){returnnewQueue("queueName");}}
2.工作队列
//队列配置importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitConfig2{@BeanpublicQueuetaskQueue(){returnnewQueue("taskQueue");}}//创建生产者importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;@ComponentpublicclassTaskProducer{@AutowiredprivateRabbitTemplate rabbitTemplate;publicvoidsendTask(String task){
rabbitTemplate.convertAndSend("taskQueue", task);}}//创建消费者importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassTaskConsumer{@RabbitListener(queues ="taskQueue")publicvoidprocessTask(String task){System.out.println("Processing task: "+ task);// Simulate task processingtry{Thread.sleep(1000);// Simulate task processing time}catch(InterruptedException e){Thread.currentThread().interrupt();}System.out.println("Task processed: "+ task);}}
3.订阅模式
//创建生产者(Producer)importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;@ComponentpublicclassMessageProducer3{@AutowiredprivateRabbitTemplate rabbitTemplate;publicvoidsendMessage(String message){
rabbitTemplate.convertAndSend("fanoutExchange","", message);}}//创建消费者(Consumer)importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassMessageConsumerA{@RabbitListener(queues ="queueFanout1")publicvoidreceiveMessage(String message){System.out.println("Consumer 1 received message: "+ message);}}@ComponentpublicclassMessageConsumerB{@RabbitListener(queues ="queueFanout2")publicvoidreceiveMessage(String message){System.out.println("Consumer 2 received message: "+ message);}}//配置交换机和队列importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitConfig3{@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("fanoutExchange");}@BeanpublicQueuequeueFanout1(){returnnewQueue("queueFanout1");}@BeanpublicQueuequeueFanout2(){returnnewQueue("queueFanout2");}@BeanpublicBindingbinding1(){returnBindingBuilder.bind(queueFanout1()).to(fanoutExchange());}@BeanpublicBindingbinding2(){returnBindingBuilder.bind(queueFanout2()).to(fanoutExchange());}}
4.路由模式
//创建生产者(Producer)importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;@ComponentpublicclassMessageProducer4{@AutowiredprivateRabbitTemplate rabbitTemplate;publicvoidsendMessage(String message,String routingKey){
rabbitTemplate.convertAndSend("directExchange", routingKey, message);}}//创建消费者(Consumer)importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassMessageConsumerly1{@RabbitListener(queues ="queueDirect1")publicvoidreceiveMessage(String message){System.out.println("Consumer 1 received message: "+ message);}}@ComponentpublicclassMessageConsumerly2{@RabbitListener(queues ="queueDirect2")publicvoidreceiveMessage(String message){System.out.println("Consumer 2 received message: "+ message);}}//配置交换机和队列importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.DirectExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitConfig4{@BeanpublicDirectExchangedirectExchange(){returnnewDirectExchange("directExchange");}@BeanpublicQueuequeueDirect1(){returnnewQueue("queueDirect1");}@BeanpublicQueuequeueDirect2(){returnnewQueue("queueDirect2");}@BeanpublicBindingbindingDirect1(){returnBindingBuilder.bind(queueDirect1()).to(directExchange()).with("routingDirectKey1");}@BeanpublicBindingbindingDirect2(){returnBindingBuilder.bind(queueDirect2()).to(directExchange()).with("routingDirectKey2");}}
5.主题模式
//创建生产者(Producer)importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;@ComponentpublicclassMessageProducer5{@AutowiredprivateRabbitTemplate rabbitTemplate;publicvoidsendMessage(String message,String routingKey){
rabbitTemplate.convertAndSend("topicExchange", routingKey, message);}}//创建消费者(Consumer)importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassMessageConsumer5{@RabbitListener(queues ="queueTopic5")publicvoidreceiveMessage(String message){System.out.println("Received message: "+ message);}}//配置交换机和队列importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.TopicExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitConfig5{@BeanpublicTopicExchangetopicExchange(){returnnewTopicExchange("topicExchange");}@BeanpublicQueuequeueTopic5(){returnnewQueue("queueTopic5");}@BeanpublicBindingbindingTopic5(){returnBindingBuilder.bind(queueTopic5()).to(topicExchange()).with("topic.*");}}
6.RPC模式
//创建RPC客户端(Client)importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;@ComponentpublicclassRpcClient{@AutowiredprivateRabbitTemplate rabbitTemplate;publicStringsendMessageAndReceiveResponse(String message){return(String) rabbitTemplate.convertSendAndReceive("rpcExchange","rpcQueue", message);}}//创建RPC服务端(Server)importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;@ComponentpublicclassRpcServer{@RabbitListener(queues ="rpcQueue")publicStringprocessMessage(String message){// Perform some processing based on the messagereturn"Processed: "+ message;}}//配置交换机和队列importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.DirectExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitConfig6{@BeanpublicDirectExchangerpcExchange(){returnnewDirectExchange("rpcExchange");}@BeanpublicQueuerpcQueue(){returnnewQueue("rpcQueue");}@BeanpublicBindingrpcBinding(){returnBindingBuilder.bind(rpcQueue()).to(rpcExchange()).with("rpcQueue");}}
版权归原作者 秃头佛爷 所有, 如有侵权,请联系我们删除。