一、kafka简介
Apache Kafka 是一个分布式流处理平台,最初由 LinkedIn 开发,并于 2011 年开源,随后捐赠给 Apache 基金会。Kafka 被广泛应用于构建实时数据管道和流处理应用,旨在以高吞吐量、低延迟和高容错性处理大量实时数据流,核心概念包括:
- Producer(生产者):向 Kafka 主题(Topic)发布消息的应用程序。生产者将数据发送到 Kafka 主题中,每条消息都会被附加到日志的末尾。
- Consumer(消费者):从 Kafka 主题中读取消息的应用程序。消费者可以订阅一个或多个主题,并以顺序或随机的方式读取消息。
- Topic(主题):消息的分类和存储单位。每个主题被分为多个分区(Partition),每个分区中的消息被存储为一个日志文件。
- Partition(分区):主题的水平分片,允许数据在多个 Kafka 服务器上进行分布式存储和处理。每个分区内的消息具有一个唯一的偏移量(Offset),标识其在日志中的位置。
- Broker(代理):Kafka 集群中的一台服务器,负责存储和传递消息。一个 Kafka 集群由多个 Broker 组成,每个 Broker 负责一个或多个分区。
- Consumer Group(消费者组):一组共同消费同一主题的消费者实例。Kafka 保证每个分区的消息只被同一个 Consumer Group 中的一个消费者处理。
- Zookeeper:用于管理 Kafka 集群的协调服务。Zookeeper 负责管理集群的元数据,包括 Broker 的状态、主题的分区信息等。
kafka常见用于消息队列,实际上还有很多其他应用场景,主要使用场景如下:
- 日志收集:Kafka 常用于日志收集系统,将应用程序生成的日志和事件数据集中收集,供后续分析使用。
- 流处理:与流处理框架(如 Apache Flink、Apache Storm)结合,Kafka 能够支持实时数据流处理。
- 消息队列:Kafka 可以作为一个高效的分布式消息队列系统,处理异步消息传递。
- 数据集成:Kafka 可以用于数据集成平台,将不同数据源的数据实时传输到目标系统中,如数据仓库或实时分析平台。
二、安装步骤
博文以安装最新稳定版kafka3.8.0为例进行介绍,kafka从2.8.0开始要求Java环境最低版本为Java11,kafka3.8.0推荐Java17,兼容Java11版本,博文实验环境如下:
- 操作系统:centos7.9
- Java版本:JAVA17.0.11
- kafka版本:3.8.0
- Scala版本:2.13
1、安装JAVA环境
JAVA环境的安装可以参考博文Linux之JDK安装与升级,这里不再赘述。
[wuhs@minio_test ~]$ java --version
java 17.0.11 2024-04-16 LTS
Java™ SE Runtime Environment (build 17.0.11+7-LTS-207)
Java HotSpot™ 64-Bit Server VM (build 17.0.11+7-LTS-207, mixed mode, sharing)
2、官网下载kafka软件包
软件的安装我们尽量通过官方渠道下载,确保软件的安全可靠性。
[wuhs@minio_test ~]$ wget https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz
3、哈希值校验
下载后的软件包查看哈希值,对比官网哈希值,两者一致才可以放心使用。
[wuhs@minio_test ~]$ sha512sum kafka_2.13-3.8.0.tgz
0a33b7be7b6fa53249ba80f9d02cda71ed81927c160aa6ee9be1e3d3c1c4b50466ffc905293143fd88ceac7f5e7d8f5bec28ef972addd3c459cc8b1291e738aa kafka_2.13-3.8.0.tgz
4、解压并创建软链接
创建软链接的好处是如果后续版本更新,我们只需要修改软链接,应用部署记录和环境变量一些参数等都不需要修改。另外只要可以使用普通用户部署的服务,都建议使用普通用户进行部署。
[wuhs@minio_test ~]$ tar -zxvf kafka_2.13-3.8.0.tgz
[wuhs@minio_test ~]$ ln -s kafka_2.13-3.8.0 kafka
5、启停zookeeper服务
zookeeper默认配置如下,单机环境下直接启用服务即可,如果需要做持久化,我们可以修改dataDir目录将数据存储到数据盘。可以使用daemon参数让服务后台运行。
[wuhs@minio_test kafka]$ cat config/zookeeper.properties |grep -Ev “^#|^$”
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
[wuhs@minio_test kafka]$ ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
6、检查zookeeper服务
看到2181端口已监听,说明zookeeper服务已启动。
7、修改kafka服务配置
kafka默认配置如下,我们主要需要修改advertised.listeners参数,该参数默认配置为PLAINTEXT://your.host.name:9092,远程客户端需要hostname调用并配置了正确的hosts映射关系,我们可以修改为网卡IP,让客户端可与通过IP进行远程消费。
#默认配置如下
[wuhs@minio_test kafka]$ cat config/server.properties |grep -Ev “^#|^$”
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
#增加配置
advertised.listeners=PLAINTEXT://192.168.20.10:9092
8、启动kafka服务
[wuhs@minio_test kafka]$ ./bin/kafka-server-start.sh -daemon config/server.properties
9、检查kafka服务
看到服务端口已监听说明kafka服务已经启动成功。
三、使用简介
1、服务验证
我们创建一个主题,通过生产消息,消费者可以正常消费获得消息,说明服务正常。
- 创建主题
[wuhs@minio_test kafka]$ ./bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
- 查看主题
[wuhs@minio_test kafka]$ ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
test
- 发送消息
[wuhs@minio_test kafka]$ ./bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
this is a test
…
hello wuhs
this is a test
- 另起一个窗口,消费消息
[wuhs@minio_test kafka]$ ./bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
this is a test
2、服务启停管理
- zookeeper启动服务
$ ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
- zookeeper停止服务
$ ./bin/zookeeper-server-stop.sh
- kafka启动服务
$ ./bin/kafka-server-start.sh -daemon config/server.properties
- kafka停止服务
$ ./bin/kafka-server-stop.sh
- 查看kafka版本
[wuhs@minio_test kafka]$ ./bin/kafka-server-start.sh --version
3.8.0
[wuhs@minio_test kafka]$ ./bin/kafka-topics.sh --version
3.8.0
- 查看主题列表
[wuhs@minio_test kafka]$ ./bin/kafka-topics.sh --list --bootstrap-server 192.168.20.10:9092
__consumer_offsets
test
3、版本升级注意事项
kafka版本升级的时候,启动新服务的时候自动会生成新的cluster.id,如果数据存储路径还是原来的,启动kefka的时候会报错提示cluster.id冲突,这个时候我们可以通过zookeeper客户端查看cluster-id,可以修改为原来的cluster.id,如果不需要历史数据,我们可以修改数据存储目录,自动生产并使用新的cluster.id。
#通过zookeeper查看cluster.id
[zk: localhost:2181(CONNECTED) 0] get /cluster/id
{“version”:“1”,“id”:“KuhMl4rsRwynLaFWeZzVFg”}
#通过meta.properties文件查看cluster.id
[wuhs@minio_test kafka]$ cat /tmp/kafka-logs/meta.properties
5、kafka配置文件目录下文件说明
server.properties
:用途:用于配置 Kafka Broker。zookeeper.properties
:用途:用于配置 ZooKeeper 服务。producer.properties
:用途:用于配置 Kafka Producer。consumer.properties
:用途:用于配置 Kafka Consumer。connect-standalone.properties
:用途:用于配置 Kafka Connect 以独立模式运行。connect-distributed.properties
:用途:用于配置 Kafka Connect 以分布式模式运行。log4j.properties
或log4j2.properties
:用途:用于配置 Kafka 的日志记录(Log4j 1.x 或 Log4j 2.x)。
版权归原作者 恒悦sunsite 所有, 如有侵权,请联系我们删除。