Spring Cloud系列目前已经有了Spring Cloud五大核心组件:分别是,Eureka注册中心,Zuul网关,Hystrix熔断降级,openFeign声明式远程调用,ribbon负载均衡。这五个模块,对了,有没有发现,其实我这五个模块中ribbon好像还没有案例例举,目前只有一个Ribbon模块的搭建,后边我会完善的。
今天我们不主要围绕Spring Cloud的五大组件,本篇会以新的模块进行,完成一个以Rabbit MQ消息队列为核心的模块功能设计。在模块进行之前,我们先了解Spring Cloud 的Stream,这个很重要。
Spring Cloud Steam 是一个可以用来作为微服务应用构建消息驱动能力的框架,他可以基于Spring Boot来进行创建一个独立的,并且可以用来生产的Spring 应用程序。他通过使用Spring Integration(一体化)来链接消息嗲了中间件用以实现消息事件驱动的微服务应用。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动装配的实现,并且熟悉Spring的都知道他有个发布-订阅模式,消费组,以及消息分区核心部分。
Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。通过使用Spring Cloud Stream,可以有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员可以有更多的精力关注于核心业务逻辑的处理。由于Spring Cloud Stream基于Spring Boot实现,所以它秉承了Spring Boot的优点。
- Binder:Binder是Spring Cloud Stream的核心组件之一,它提供了与消息中间件的连接和交互。通过Binder,开发者可以将应用程序与特定的消息中间件进行集成,而无需关注底层的细节。
- 消息通道:Spring Cloud Stream使用消息通道作为应用程序中消息的传输媒介。消息通道可以是发布-订阅模式或点对点模式,开发者可以根据需求选择合适的通道类型。
- 绑定注解:通过使用绑定注解,开发者可以将消息通道与应用程序中的方法进行绑定。例如,@Input注解用于将方法绑定到输入通道,@Output注解用于将方法绑定到输出通道。
- 消息转换:Spring Cloud Stream支持自动的消息转换,使得开发者可以使用不同的数据格式和协议进行消息的传输。它提供了一些内置的消息转换器,同时也支持自定义的消息转换器。
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency>
在应用程序中配置Rabbit MQ的链接信息,在application.yml或者application.properties中添加配置。
spring.cloud.stream.bindings.input.destination=myInputQueue
spring.cloud.stream.bindings.output.destination=myOutputQueue
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
这里的myInputQueue和myOutputQueue是你自定义的队列名称,你可以根据自己的需求进行命名。
接下来,创建一个消息处理器来处理输入和输出的消息。你可以使用
@StreamListener
注解来监听输入消息,并使用
@EnableBinding
注解来绑定输入和输出通道。
importorg.springframework.cloud.stream.annotation.EnableBinding;importorg.springframework.cloud.stream.annotation.StreamListener;importorg.springframework.cloud.stream.messaging.Sink;@EnableBinding(Sink.class)publicclassMessageProcessor{@StreamListener(Sink.INPUT)publicvoidprocessMessage(String message){// 处理输入消息System.out.println("Received message: "+ message);// 处理完后可以发送输出消息// output.send(MessageBuilder.withPayload("Output message").build());}}
@EnableBinding(Sink.class)
将输入通道绑定到Sink,
@StreamListener(Sink.INPUT)
注解用于监听输入消息。你可以在
processMessage
方法中处理输入消息。
如果你想发送输出消息,你可以注入
MessageChannel
并使用它发送消息。例如,你可以在
MessageProcessor
类中注入
MessageChannel
:
importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.cloud.stream.messaging.Source;importorg.springframework.messaging.MessageChannel;importorg.springframework.messaging.support.MessageBuilder;@EnableBinding(Source.class)publicclassMessageProcessor{privatefinalMessageChannel output;@AutowiredpublicMessageProcessor(Source source){this.output = source.output();}@StreamListener(Sink.INPUT)publicvoidprocessMessage(String message){// 处理输入消息System.out.println("Received message: "+ message);// 处理完后发送输出消息
output.send(MessageBuilder.withPayload("Output message").build());}}
这样,当有消息发送到输入通道时,processMessage方法将被调用,并处理输入消息。在处理完输入消息后,它将发送一个输出消息到输出通道。
版权归原作者 Miaow.Y.Hu 所有, 如有侵权,请联系我们删除。