作者:禅与计算机程序设计艺术
1.简介
Kafka是一个分布式消息系统,它可以实现消息的持久化、高并发量处理以及实时的可靠传输。相比于其他消息队列中间件(例如RabbitMQ、ActiveMQ),其最大的优点在于它提供的跨越语言的API支持,支持多种编程语言的客户端。作为一种轻量级的分布式消息传递系统,它能够很好的满足互联网、移动互联网等领域的实时性要求。本文将以Spring Boot为开发框架搭建一个基于Kafka的消息发布/订阅应用,主要涉及以下方面:
- Spring Boot基础知识:通过引入Spring Boot框架,能快速构建可执行的应用;掌握Spring Bean、依赖注入、配置管理等知识;
- Apache Kafka知识:学习Kafka的消息模型、生产者消费者模式、集群搭建以及客户端的使用方法;
- 消息发布/订阅相关知识:了解如何使用Kafka作为消息中间件,进行消息发布和订阅;
- Spring Messaging:使用Spring Messaging模块,构建复杂的消息流程;
- Spring Cloud Stream:探索Spring Cloud Stream模块,集成Kafka支持消息通讯;
2.核心概念术语说明
2.1 Spring Boot
2.1.1 Spring Boot简介
Spring Boot是一个新的Java平台全栈框架,其设计目的是用来简化新Spring应用的初始搭建以及开发过程。该框架使用了特定的方式来进行配置,从而使开发人员不再需要定义样板化的XML文件。通过少量注解,开发人员即可完成对Spring组件的依赖配置。相对于传统的Spring应用,Spring Boot启动时间更快,内嵌Tomcat,Jetty或者Undertow,提供独立运行的能力,并且可以打包成一个单独的“ runnable” jar 文件,可以直接通过java -jar 命令运行。
2.1.2 Spring Boot特性
- 创建独立运行的JAR包:提供了spring boot starter的方式让用户选择自己所需的服务;使用内置Tomcat,Jetty或undertow等嵌入式容器来创建独立运行的Jar包;
- 提供了一系列默认配置项:通过 spring-boot-autoconfigure 模块为大多数常用第三方库提供了自动配置,无需任何代码侵入;
- 通过starter依赖(Starter POMs)进行第三方库管理:允许用户添加所需功能的依赖,如数据访问层框架mybatis,消息中间件Kafka,HTTP框架SpringMVC等;
- 可插入式框架:Spring Boot通过自动配置和Starters支持各种项目场景需求,包括监控、日志、安全等;
- “约定大于配置”原则:大量的自动配置选项可以避免配置文件过于复杂且难以理解;
- 生产就绪状态检查:当应用启动完成后,Spring Boot会检查依赖的数据库连接、缓存服务器是否正常工作,并提供对应的提示信息。
2.1.3 Spring Boot优点
Spring Boot有很多优点,其中最突出的是以下几点:
- 快速入门:由于自动配置,Spring Boot可以帮助用户快速上手,只要简单地导入相关依赖并编写一些配置代码,就可以创建一个可运行的应用程序。
- 分离关注点:Spring Boot并没有把所有东西都囊括进来,而是采用分层架构,提倡业务逻辑划分到不同的层次中。这一架构思想鼓励不同层次之间松耦合。
- 测试友好:Spring Boot使用了内置的Tomcat作为嵌入式容器,在测试的时候可以很方便地集成各种工具,如MockMvc、Hamcrest等,让单元测试变得简单又有效率。
- 支持多环境:Spring Boot可以通过active profile和配置文件来适配不同的运行环境,比如开发环境、测试环境、生产环境等。
2.2 Apache Kafka
2.2.1 Apache Kafka概述
Apache Kafka是一种分布式流处理平台,由LinkedIn开发,是Apache软件基金会下的顶级开源项目。它是一个快速、可扩展、高吞吐量的分布式 messaging system,它是以分布式日志存储和流处理平台为核心,同时也是为实时消费者设计的。它完全兼容性而开放源代码,并为超过十亿个消息的实时数据传递提供动力。Kafka 具有以下几个主要特征:
- 高吞吐量:Kafka 以磁盘上的快速访问时间闻名,因此其在处理实时数据流时的性能十分出色。Kafka 以每秒数千万的消息量持续产生。
- 低延迟:Kafka 被设计用于处理实时数据,具有毫秒级别的低延迟,平均延迟只有几百微秒。同时 Kafka 的 producer 和 consumer 可以并行发送和接收消息,每个 partition 只负责自己的部分数据,所以它可以实现任意的 throughput。
- 可扩展性:Kafka 可以通过水平扩展来实现消息的容错和容量扩充。这意味着你可以通过增加 broker 的数量来提升处理能力和数据容量。
- 灵活的数据模型:Kafka 支持多种数据格式,如 JSON、XML、Avro、Protocol Buffers,甚至可以使用自定义的 key-value 格式。
2.2.2 Apache Kafka架构
上图展示了一个简单的 Kafka 架构。其中,Producer 产生的消息首先被保存到一个叫作 “Topic” 的 topic 中,然后被分区 (partition) 在多个节点上复制。多个 Consumer 可以读取同一个 Topic 中的数据,但只能消费其中的特定分区的数据,并且只能消费已经被确认消费过的消息。为了保证消息不丢失,Kafka 使用了类似于 Paxos 的协议,确保 producer 和 consumer 数据的一致性。这种架构使得 Kafka 非常适合作为一个统一的消息总线,为实时应用程序提供了强大的基础。 ## 2.3 消息发布/订阅模式 ### 2.3.1 什么是消息发布/订阅? 消息发布/订阅模式就是指两个或更多的进程间存在一对多的依赖关系,一方(生产者)产生消息,另一方(消费者)订阅这些消息并接收。它通常用于异步通信,生产者和消费者之间不需要事先知道对方的信息,异步通信天生就是解耦的。在此,消息传递方式无关紧要,只要它们遵循发布/订阅模式,最终的结果都是一样的。 ### 2.3.2 为何要用消息发布/订阅模式? 在微服务架构下,系统被拆分为独立的服务,这些服务之间需要通过异步消息通信来进行数据交换。原因如下: * 服务之间的松耦合:服务之间通过消息发布/订阅模式解耦,使得系统可以独立部署、扩展,降低耦合程度; * 异步通信:使用消息发布/订阅模式能实现异步通信,削峰填谷,减小响应时间,促进系统弹性伸缩; * 最终一致性:很多时候,我们希望消费者获取到的一定是最新的数据,而不是实时的,这时候就需要用到消息中间件中的事务机制来实现最终一致性。 ## 2.4 Spring Messaging ### 2.4.1 Spring Messaging简介 Spring Messaging 是 Spring Framework 中的一组抽象概念,围绕消息传递机制进行抽象。它包括两种核心概念:Message(消息) 和 MessageChannel(消息通道)。Message 对象封装了具体的消息载荷,可以是文本、XML、JSON 或任何其他序列化形式。MessageChannel 是 Spring Messaging 对消息发送/接收通道的抽象。典型的消息传递场景包括发布/订阅、点对点、工作流。 ### 2.4.2 Spring Messaging API 下表列出了 Spring Messaging 的主要 API: | 接口 | 描述 | | ---------------------------------- | -------------------------------------------------------------- | | Destination
| 表示目的地的通用接口 | | Message
| 表示消息的接口 | | MessageHandler
| 表示用于处理消息的方法的接口 | | MessagingException
| 表示 Spring Messaging 抛出的异常 | | SubscribableChannel
| 表示可以向它订阅的通道 | | MessageChannel
| 表示用于发布/订阅消息的通道 | | MessageConverter
| 表示转换器,用于转换消息的头部和体部 | | MessageDeliveryFailureHandler
| 表示用于处理消息失败的接口 | | MessageHandlerMethodFactory
| 表示用于创建消息处理器的方法工厂 | | ReactiveMessageHandlerAdapter
| 表示用于支持 reactive programming 的消息处理器适配器 | | AbstractSubscribableChannel
| 表示不可修改的 SubscribableChannel | | AbstractMessageChannel
| 表示不可修改的 MessageChannel | | PollableChannel
| 表示支持轮询的消息通道 | | MessageHandlerInterceptor
| 表示用于拦截消息的接口 | | SimpleMessageListenerContainer
| 表示用于启动和停止消息监听器的简单容器 | | StompBrokerRelay
| 表示 STOMP 代理,可以将 STOMP 消息转发到消息代理 | | ChannelInterceptor
| 表示用于拦截消息通道的接口 | | ErrorMessageHandler
| 表示用于处理错误消息的接口 | 从上表可以看出,Spring Messaging 提供的 API 比较广泛,可以用于构建各种消息传递应用。 ## 2.5 Spring Cloud Stream ### 2.5.1 Spring Cloud Stream简介 Spring Cloud Stream是一个轻量级的框架,用于构建微服务架构下的事件驱动的消息传递管道。它利用 Spring Boot 的开发便利性及特有的注解绑定特性,在应用中整合了 Kafka、RabbitMQ、Amazon SQS 等多种消息中间件,为应用开发者提供了统一的消息输入/输出端点。通过声明式消息模型,Spring Cloud Stream 提供了一种声明式的方法来编码解耦各服务间的通信,让开发者聚焦于应用的核心业务逻辑。 Spring Cloud Stream 为微服务架构提供了一种简单而统一的消息传递机制,使得系统的各个微服务之间具备高度自治、松耦合的特性。基于 Spring Integration 来实现的模块化与弹性扩展支持,给予 Spring Cloud Stream 更大的灵活性与适应性。 ### 2.5.2 Spring Cloud Stream架构
从上图可以看到 Spring Cloud Stream 的架构。Spring Cloud Stream 有三个主要角色:Binder、Source 和 Sink。它们分别负责实现各种消息中间件的绑定、消息的生成、消费。其中,Source 负责向外发送消息,Sink 负责从外面接收消息。在 Spring Cloud Stream 中,消息的路由、过滤、编解码等操作都通过 Binder 来实现。 ### 2.5.3 Spring Cloud Stream案例 本节基于 Spring Boot + Spring Cloud Stream + Docker 来演示如何基于 Spring Cloud Stream 来实现一个事件驱动的消息传递应用。假设我们有一个应用 A,它需要将订单信息发送给另外的一个应用 B,这个过程称之为事件驱动。Spring Cloud Stream 提供了 @EnableBinding 和 @StreamListener 两个注解来实现应用间的消息传递。首先,我们需要安装 Docker。在命令行下输入以下指令安装 docker: shell $ sudo apt install docker.io
安装成功之后,我们可以在终端中输入命令查看 docker 是否安装成功: shell $ sudo docker version
如果出现版本号,即表示 docker 已安装。接下来,我们准备编写应用 A 的代码。新建一个 Spring Boot Maven 工程,加入必要的依赖: xml <?xml version="1.0" encoding="UTF-8"?> 4.0.0 com.example appa 0.0.1-SNAPSHOT jar appa http://maven.apache.org org.springframework.cloud spring-cloud-dependencies ${springcloud.version} pom import
配置 application.yml,指定 kafka 作为 binder: yaml server: port: 8080 spring: cloud: stream: bindings: output: destination: orders input: group: order-group destination: orders kafka: binder: brokers: localhost:9092
编写控制器 OrderController: java package com.example.appa; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.web.bind.annotation.*; /** * 订单控制器 */ @RestController public class OrderController { private final static String OUTPUT = "output"; private final static String INPUT = "input"; @Autowired private StreamBridge bridge; @PostMapping("/orders") public void createOrder(@RequestBody Order order) throws Exception{ // 发送消息 this.bridge.send(OUTPUT, order); } }
编写订单对象 Order: java package com.example.appa; import lombok.Data; /** * 订单实体类 */ @Data public class Order { private Long id; private String description; }
此处我们仅仅定义了订单 ID 和描述属性,真正的消息模型需要根据实际情况制定。最后,我们编写单元测试: ```java package com.example.appa; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.mock.mockito.MockBean; import org.springframework.cloud.stream.messaging.Source; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.support.GenericMessage; import org.springframework.test.context.junit4.SpringRunner; /** * 单元测试 */ @RunWith(SpringRunner.class) @SpringBootTest public class AppATest { @Autowired private Source source; @MockBean private OrderEventHandler eventHandler; @Test public void testSend() throws Exception{ Order order = new Order(); order.setId(1L); order.setDescription("An order");
GenericMessage<Object> message = MessageBuilder.withPayload(order).setHeader("contentType", "application/json").build();
((MessageChannel)this.source.getOutput()).send(message);
}
}
此处我们模拟了订单创建事件,并发送到消息通道中,然后观察订单事件是否被正确处理。编写事件处理器 OrderEventHandler:
```java
package com.example.appa;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
/**
* 订单事件处理器
*/
public class OrderEventHandler {
private final Logger log = LoggerFactory.getLogger(getClass());
@StreamListener(AppA.INPUT)
public void handle(@Payload Order order){
log.info("Received an order with id={}, description={}", order.getId(), order.getDescription());
}
}
此处我们通过 @StreamListener 指定消息通道和消息类型,并接收到消息后打印日志。至此,应用 A 的编写就完成了。接下来,我们编写应用 B 的代码,也叫作应用 C。
新建一个 Spring Boot Maven 工程,加入必要的依赖:
<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.example</groupId><artifactId>appc</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><name>appc</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><springframework.version>5.2.9.RELEASE</springframework.version><springcloud.version>Hoxton.SR4</springcloud.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--引入 Spring Cloud Stream --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream</artifactId></dependency><!--引入 kafka binder --><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-stream-binder-kafka</artifactId></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${springcloud.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement></project>
配置 application.yml,指定 kafka 作为 binder:
server:port:8081logging:level:root: INFO
pattern:console:"%d{yyyy-MM-dd HH:mm:ss} %-5level %logger{50} - %msg%n"spring:cloud:stream:bindings:input:group: order-group
destination: orders
kafka:binder:brokers: localhost:9092
编写控制器 OrderController:
packagecom.example.appc;importorg.springframework.cloud.stream.annotation.Input;importorg.springframework.messaging.SubscribableChannel;importorg.springframework.stereotype.Service;/**
* 订单控制器
*/@ServicepublicclassOrderController{@Input("input")SubscribableChannel channel;publicvoidreceive(){while(true){try{Object msg = channel.receive().getPayload();System.out.println("received a message :"+msg);}catch(InterruptedException e){Thread.currentThread().interrupt();}}}}
此处我们通过 @Input 指定消息通道和消息类型,并接收消息。至此,应用 B 的编写就完成了。接下来,我们编写单元测试:
packagecom.example.appc;importorg.junit.jupiter.api.Disabled;importorg.junit.jupiter.api.Test;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;importorg.springframework.boot.test.mock.mockito.MockBean;importorg.springframework.cloud.stream.binding.BindableProxyFactory;importorg.springframework.cloud.stream.config.BindingsProperties;importorg.springframework.cloud.stream.converter.CompositeMessageConverterFactory;importorg.springframework.cloud.stream.provisioning.ProvisioningProvider;importorg.springframework.cloud.stream.test.binder.MessageCollector;importorg.springframework.integration.channel.QueueChannel;importorg.springframework.messaging.support.GenericMessage;importorg.springframework.util.MimeTypeUtils;importjava.time.Duration;/**
* 单元测试
*/@SpringBootTest(classes ={AppCApplication.class})@Disabled("for demo only, should be enabled to run the tests.")publicclassAppCTest{@AutowiredprivateBindingsProperties bindingsProperties;@AutowiredprivateCompositeMessageConverterFactory compositeMessageConverterFactory;@AutowiredprivateProvisioningProvider provisioningProvider;@AutowiredprivateBindableProxyFactory bindableProxyFactory;@MockBeanprivateMessageCollector messageCollector;@TestpublicvoidtestReceive()throwsException{OrderController controller =newOrderController();QueueChannel channel =newQueueChannel();BindableProxyFactory.BindableProxy proxy = bindableProxyFactory.createBindableProxy(controller,"order-group","orders",null,false,true, queue ->{},()->{});BindingsProperties.Consumer properties =this.bindingsProperties.getInput().get("orders");
provisioner =newKafkaChannelProvisioner(properties.isCreateTopics(), properties.isUseNativeDebeziumHeaders(),
properties.isExtendedBindingProperties(),
compositeMessageConverterFactory, provisioningProvider);
provisioner.provisionInputDestination(proxy,MimeTypeUtils.APPLICATION_JSON,
properties.getExtension(), channel,null);
controller.receive();for(int i =0; i <10; i++){GenericMessage<String> message =newGenericMessage<>(String.format("{\"id\": \"%s\", \"description\": \"%s\"}", i,"order"));
channel.send(message);}Thread.sleep(Duration.ofSeconds(3).toMillis());}}
此处我们模拟了一个 kafka 消费者,等待接收来自应用 A 的订单事件。我们还通过 BindableProxyFactory 创建了一个代理,调用其 provisionInputDestination 方法,创建了消息通道。然后,我们模拟向订单事件通道中发送 10 个订单消息。最后,我们等待 3 秒钟,期望收到相应的消息。至此,我们可以运行单元测试。
注意:目前暂时不能运行测试,原因是在 pom.xml 中,指定了 junit 版本冲突的问题,将 junit.jupiter.api 版本设置为 5.6.2 就行了。
版权归原作者 禅与计算机程序设计艺术 所有, 如有侵权,请联系我们删除。