kafka入门
文章目录
1. 简介
1.1 什么是事件流?
事件流是人体中枢神经的数字等效果产物。它是“永远在线”世界技术基础,在这个世界中,企业越来越多的由软件定义和自动化,并且使用软件的用户越来越多。
从技术上来讲,事件流就是从事件源(如数据库、传感器、移动设备、云服务和软件应用程序)以事件流的形式实时捕获数据的实践;持久存储这些事件流以供以后检索;实时和回顾性的操作、处理和响应事件流;并根据需要将事件流路由到不同的目标技术。因此,事件流确保了数据的连续流动和解释,以便正确的信息在正确的时间出现在正确的位置。
1.2 事件流可以干什么?
事件流应用于众多行业和组织的各种用例,比如:
- 实时的处理支付和金融交易,例如证券交易所、银行和保险中。
- 实时跟踪和监控汽车、卡车、车队和货运,例如在物流和汽车行业。
- 持续捕获和分析来自物联网设备或其他设备的传感器数据,例如工厂和风电厂。
- 收集并立即响应客户互动和订单,例如零售、酒店和旅游行业以及移动应用程序。
- 监测住院病人,预测病情变化,确保在紧急情况下及时治疗。
- 连接、存储和提供公司不同部门产生的数据。
- 作为数据平台、事件驱动架构和微服务的基础
1.3 Apache Kafka是一个事件流平台
Kafka结合了三个关键功能,因此可以通过一个实战考验的解决方案实现端到端的事件流用例:
- 发布(写入)和订阅(读取)事件流,包括从其他系统持续导入/导出数据。
- 根据需要持久可靠的存储事件流。
- 在事件发生时或回顾时处理事件流。
所有这些功能都以分布式、高度可扩展、弹性、容错和安全的方式提供。Kafka可以部署在裸机硬件、虚拟机和容器上,也可以部署在本地和云端。可以自行管理Kafka环境和使用各种供应商提供完全的托管服务之间进行选择。
1.4 kafka是如何工作的?
Kafka是一个分布式系统,由通过高性能TCP网络协议进行通信的服务器和客户端组成。它可以部署在本地和云环境的裸机硬件、虚拟机和容器上。
服务器:Kafka作为一个或多个服务器的集群运行,可以跨越多个数据中心或云区域。其中一些服务器形成存储层,称为代理。其他服务器运行Kafka Connect以将数据作为事件流持续导入和导出,以及将Kafka与现有的系统(例如关系数据库以及其他kafka集群)集成。为了实现关键任务用例,Kafka集群具有高度可扩展性和容错性:如果其中任何一个服务器出现故障,其他服务器将接管他们的工作,以确保持续运行不会丢失任何数据。
客户端:他们允许编写分布式应用程序和微服务,以并行、大规模和容错方式读取、写入和处理事件流,即使在网络问题或机器故障的情况下也是如此。Kafka附带了一些这样的客户端,这些客户端由Kafka社区提供的数十个客户端Plus:客户端可用于Java和Scala,包括更高级别的Kafka Stream库,用于Go、Python、C/C++和许多其他编程语言以及REST API。
1.5 主要概念和术语
事件:记录了世界或业务中“发生了某事” 的事实。在文档中也称为记录或消息。当您向 Kafka 读取或写入数据时,您以事件的形式执行此操作。从概念上讲,事件具有键、值、时间戳和可选的元数据标头。
生产者是那些向 Kafka 发布(写入)事件的客户端应用程序,而消费者是订阅(读取和处理)这些事件的那些客户端应用程序。在 Kafka 中,生产者和消费者完全解耦并且彼此不可知,这是实现 Kafka 众所周知的高可扩展性的关键设计元素。例如,生产者永远不需要等待消费者。Kafka 提供了各种保证,例如一次性处理事件的能力。
事件被组织并持久地存储在主题中。非常简化,主题类似于文件系统中的文件夹,事件是该文件夹中的文件。示例主题名称可以是“付款”。Kafka 中的主题始终是多生产者和多订阅者:一个主题可以有零个、一个或多个向其写入事件的生产者,以及订阅这些事件的零个、一个或多个消费者。主题中的事件可以根据需要随时读取——与传统的消息传递系统不同,事件在消费后不会被删除。相反,您可以通过每个主题的配置设置来定义 Kafka 应该将您的事件保留多长时间,之后旧事件将被丢弃。Kafka 的性能在数据大小方面实际上是恒定的,因此长时间存储数据是非常好的。
主题是分区的,这意味着一个主题分布在位于不同 Kafka 代理上的多个“桶”中。数据的这种分布式放置对于可扩展性非常重要,因为它允许客户端应用程序同时从多个代理读取和写入数据。当一个新事件发布到一个主题时,它实际上是附加到主题的分区之一。具有相同事件键(例如,客户或车辆 ID)的事件被写入同一个分区,并且 Kafka保证给定主题分区的任何消费者将始终以与写入事件完全相同的顺序读取该分区的事件。
此示例主题有四个分区 P1–P4。两个不同的生产者客户端通过网络将事件写入主题的分区,彼此独立地向主题发布新事件。具有相同键的事件(在图中由它们的颜色表示)被写入同一个分区。请注意,如果合适的话,两个生产者都可以写入同一个分区。
为了使数据具有容错性和高可用性,可以复制每个主题,甚至跨地理区域或数据中心,以便始终有多个代理拥有数据副本。一个常见的生产设置是复制因子为 3,即始终存在三个数据副本。此复制在主题分区级别执行
1.6 kafka架构
整体来看,Kafka架构中包含四大组件:生产者、消费者、Kafka集群、zoopkeeper集群。对照上面架构图,得出以下分析:
- boker:kafka集群包含一个和多个服务器,每个服务器节点称为一个broker;
- topic:每条发布到kafka集群的消息都有一个类别,这个类别称为topic,其实就是将消息按照topic来分类,topic就是逻辑上的分类,同一个topic的数据既可以在同一个boker上也可以在不同的boker节点上。
- partition:分区,每个topic被物理划分为一个或者多个分区,每个分区再物理上对应一个文件夹,该文件夹里面存储了这个分区所有的消息和索引文件。在创建topic时可以指定partion数量,生产者将消息发送到topic时,消息会根据分区策略追加到分区文件的末尾,属于顺序写磁盘,因此效率非常高(顺序写磁盘比随机写内存还要高,这是kafka高吞吐率的很重要的一个保证)。上面提到了分区策略,所谓分区策略就是决定生产者将消息发送到哪个分区的算法。kafka为我们提供了默认的分区策略,同时它也支持自定义分区策略。kafka允许为每个消息设置一个key,一旦消息被定义了key,那么就可以保证同一个key的所有消息都进入到相同的分区,这种策略属于自定义策略的一种,被称作“按消息保存策略“,或者key-ordering策略。同一个主题的多个分区可以部署在多个机器上,以此来实现kafka的伸缩性。同一partition中的数据是有序的,但topic下的多个partition之间在消费数据时不能保证有序性,在需要严格保证消息顺序消费的场景下,可以将partition数设为1,但是这种做法的缺点就是降低了吞吐,一般来说,只需要保证每个分区的有序性,再对消息设置key来保证相同的key的消息落入同一个分区,就可以满足绝大多数应用。
- offset:partition中的每条消息都被标记了一个序号,这个序号表示消息在partition中的偏移量,称为offset,每一条消息在partition都有唯一的offset,消息者通过指定offset来指定要消费的信息。正常情况下,消费者在消费完一条消息后会递增offset,准备去消费下一条消息,但也可以将offset设成一个比较小的值,重新消费一些消费国的信息,可见offset是由consumer控制的,consumer想消费哪一条消息就消费哪一条消息,所以kafka borker是无状态的,它不需要标记哪些消息被消费过。
- producer:生产者发送消息到指定topic下,消息再根据分配规则append到某个partition的末尾。
- consumer:消费者从topic中消费数据。
- consumer group:消费者组,每个consumer属于一个特定的consumer group,可为每个consumer指定consumer group,若不指定则属于默认的group。同一topic的一条消息只能被同一consumer group内的consumer消费,但多个consumer group可同时消费这一消息。这也是kafka用来实现一个topic消息的广播和手段,如果需要实现广播,一个consumer group内只放一个消费者即可,要实现单播,将所有的消费者放到同一个consumer group即可。用consumer group还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。
- leader:每个partition有多个副本,其中有且仅有一个作为leader,leader会负责所有的客户端读写操作。
- follower:follower不对外提供服务,只与leader保持数据同步,如果leader失效,则选举一个follower来充当新的leader。当follower与leader挂掉、卡住或者同步太慢,leader会把这个follower从ISR列表中删除,重新创建一个follower。
- rebalance:同一个consumer group下的多个消费者互相协调消费工作,我们这样想,一个topic分为多个分区,一个consumer group里面的所有消费者合作,一起去消费所订阅的某个topic下的所有分区(每个消费者消费部分分区),kafka会将该topic下的所有分区均匀的分配给consumer group下的每个消费者,如下图,rebalance表示"重平衡",consumer group内某个消费者挂掉后,其他消费者自动重新分配订阅主题分区的过程,是 Kafka 消费者端实现高可用的重要手段。如下图Consumer Group A中的C2挂掉,C1会接收P1和P2,以达到重新平衡。同样的,当有新消费者加入consumer group,也会触发重平衡操作。
1.7Kafka API
c除了用于管理和管理任务的命令行工具外,Kafka还为Java和Scala提供了五个核心API:
- 用于管理和管理任务的和检查主题、代理和其他Kafka 对象 的管理 API 。
- 将事件流发布(写入)到一个或多个 Kafka 主题 的Producer API 。
- Consumer API订阅(读取)一个或多个主题并处理向它们生成的事件流 。
- 用于实现流处理应用程序和微服务 的Kafka Streams API 。它提供了更高级别的函数来处理事件流,包括转换、聚合和连接等有状态操作、窗口化、基于事件时间的处理等等。从一个或多个主题读取输入以生成一个或多个主题的输出,有效地将输入流转换为输出流。
- Kafka Connect API用于构建和运行可重用 的数据导入/导出连接器,这些连接器从外部系统和应用程序消耗(读取)或生成(写入)事件流,以便它们可以与 Kafka 集成。例如,与 PostgreSQL 等关系数据库的连接器可能会捕获对一组表的每次更改。但是,在实践中,您通常不需要实现自己的连接器,因为 Kafka 社区已经提供了数百个即用型连接器。
2. 用例
2.1 消息传递
Kafka可以很好的替代更传统的消息代理。消息代理的使用有多种原因(将处理与数据生产者分离,缓冲未处理的消息等)。与大多数消息传递系统相比,Kafka具有更好的吞吐量、内置的分区、复制和容错能力,使其成为大规模的消息处理应用程序的良好解决方案。
消息传递的使用通常吞吐量相对较低,但可能需要较低的端到端延迟,并且通常依赖于Kafka提供强大的持久性保证。(在这个领域,Kafka可与ActiveMQ或RabbitMQ等传统消息传递系统相媲美)。
2.2 网站活动跟踪
Kafka的原始用例是能够将用户活动跟踪管道重建为一组实时发布——订阅源。这意味着站点活动(页面查看、搜索或用户可能采取其他操作)将发布到中心主题,每种活动类型都有一个主题。这些订阅源可用于订阅一系列用例,包括实时处理、实时监控以及加载到Hadoop或离线数据仓库系统以进行离线处理和报告。
活动跟踪i的数量通常非常高,因为每个用户页面查看都会生成许多活动消息。
2.3 指标
Kafka常用于运营监控数据。这涉及聚合来自分布式应用程序的统计数据以生成操作数据的集中提要。
2.4 日志聚合
许多人使用Kafka作为日志聚合解决方案的替代品。日志聚合通常从服务器收集物理日志文件,并将它们放在一个中心位置(可能是文件服务器或HDFS)进行处理。Kafka抽象出文件的细节,并将日志或事件数据更清晰的抽象为消息流。这允许更低延迟的处理和更容易支持多个数据源和分布式数据消费。与Scribe或Flume等以日志为中心的系统相比,Kafka提供同样出色的性能,由于复制而产生的更强大的持久性保证以及更低的端到端延迟。
2.5 流处理
许多Kafka用户在由多个阶段组成的处理管道中处理数据,其中原始输入数据从kafka主题中消费,然后聚合、丰富或以其他方式转换为新主题以供进一步消费或后续处理。例如:用于推荐新闻文章的处理管道可能会从RSS提要中抓取文章内容并将其发布发到“文章”主题;最终处理阶段可能会尝试向用户推荐次内容。此类处理管道基于各个主题创建实时数据流图。从0.10.0.0开始,一个轻量级但功能强大的流处理库。称为Kafka Streams可以在Apache Kafka中执行上述数据处理。除了Kafka Streams,替代的开源流处理工具包括Apache Strom和Apache Samza。
2.6 事件溯源
事件溯源是一种应用程序设计风格,其中状态更改被记录为按时间排序的记录序列。Kafka对非常大的存储日志数据的支持使其成为以这种风格构建的应用程序的出色后端。
2.7 提交日志
Kafka可以作为分布式系统的一种外部提交日志。该日志有助于在节点之间复制数据,并充当故障节点恢复其数据的同步机制。Kafka中的日志压缩功能有助于支持这种用法。在这种用法中,Kafka类似于Apache BookKeeper项目。
3. 快速入门
第一步:获取Kafka
点击下载 最新的 Kafka 版本并解压:
$ tar -xzf kafka_2.13-3.1.0.tgz
$ cd kafka_2.13-3.1.0
第二步:启动Kafka环境
(*ps:本地环境必须安装Java 8+*)
运行以下命令以按正确的顺序启动所有服务:
# Start the ZooKeeper service# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties
打开另一个终端会话并运行:
# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties
成功启动所有服务后,您将拥有一个基本的 Kafka 环境运行并可以使用。
第三步:创建一个主题来存储事件
Kafka 是一个分布式事件流平台,可让您跨多台机器 读取、写入、存储和处理 事件(在文档中也称为记录或 消息)。
示例事件包括支付交易、手机的地理位置更新、运输订单、物联网设备或医疗设备的传感器测量等等。这些事件被组织并存储在 主题中。非常简化,主题类似于文件系统中的文件夹,事件是该文件夹中的文件。
因此,在编写第一个事件之前,必须创建一个主题。打开另一个终端会话并运行:
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
Kafka 的所有命令行工具都有其他选项:运行
kafka-topics.sh
不带任何参数的命令以显示使用信息。例如,它还可以显示 新主题 的分区数等详细信息:
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic:quickstart-events PartitionCount:1 ReplicationFactor:1 Configs:
Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0
第四步:创建一些事件写入主题
Kafka 客户端通过网络与 Kafka 代理通信以写入(或读取)事件。一旦收到,代理将以持久和容错的方式存储事件,只要您需要 - 甚至永远。
运行控制台生产者客户端将一些事件写入您的主题。默认情况下,您输入的每一行都会导致将一个单独的事件写入主题。
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event
您可以随时停止生产者客户端
Ctrl-C
。
第五步:阅读事件
打开另一个终端会话并运行控制台使用者客户端以读取刚刚创建的事件:
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
可以随时停止消费者客户端
Ctrl-C
。
随意尝试:例如,切换回生产者终端(上一步)以编写其他事件,并查看这些事件如何立即显示在消费者终端中。
因为事件被持久地存储在 Kafka 中,所以它们可以被尽可能多的消费者多次读取。可以通过打开另一个终端会话并再次重新运行上一个命令来轻松验证这一点。
第六步:使用Kafka Connect 将数据作为事件流导入/导出
现有系统(如关系数据库或传统消息传递系统)中拥有大量数据,以及许多已经使用这些系统的应用程序。 Kafka Connect允许不断地将来自外部系统的数据摄取到 Kafka 中,反之亦然。因此很容易将现有系统与 Kafka 集成。为了使这个过程更容易,有数百个这样的连接器随时可用。
第七步:使用Kafka Streams处理事件
一旦您的数据作为事件存储在 Kafka 中,您就可以使用 Java/Scala 的 Kafka Streams客户端库处理数据。它允许您实现关键任务的实时应用程序和微服务,其中输入和/或输出数据存储在 Kafka 主题中。Kafka Streams 将在客户端编写和部署标准 Java 和 Scala 应用程序的简单性与 Kafka 的服务器端集群技术的优势相结合,使这些应用程序具有高度可扩展性、弹性、容错性和分布式性。该库支持一次性处理、有状态操作和聚合、窗口化、连接、基于事件时间的处理等等。
为了让您初步了解,以下是实现流行
WordCount
算法的方法:
KStream<String, String> textLines = builder.stream("quickstart-events");
KTable<String, Long> wordCounts = textLines
.flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" "))).groupBy((keyIgnored, word)-> word).count();
wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Ser
第八步:终止Kafka环境
现在已经完成了快速入门,请随意拆除 Kafka 环境,或者继续玩。
Ctrl-C
如果还没有这样做,请 使用 停止生产者和消费者客户端。- 使用 停止 Kafka 代理
Ctrl-C
。 - 最后,使用 . 停止 ZooKeeper 服务器
Ctrl-C
。
如果还想删除本地 Kafka 环境的任何数据,包括在此过程中创建的任何事件,请运行以下命令:
$ rm -rf /tmp/kafka-logs /tmp/zookeeper
版权归原作者 低调小马(mcy) 所有, 如有侵权,请联系我们删除。