0


Linux之kafka单机安装和使用简介

一、kafka简介

  Apache Kafka 是一个分布式流处理平台,最初由 LinkedIn 开发,并于 2011 年开源,随后捐赠给 Apache 基金会。Kafka 被广泛应用于构建实时数据管道和流处理应用,旨在以高吞吐量、低延迟和高容错性处理大量实时数据流,核心概念包括:

  • Producer(生产者):向 Kafka 主题(Topic)发布消息的应用程序。生产者将数据发送到 Kafka 主题中,每条消息都会被附加到日志的末尾。
  • Consumer(消费者):从 Kafka 主题中读取消息的应用程序。消费者可以订阅一个或多个主题,并以顺序或随机的方式读取消息。
  • Topic(主题):消息的分类和存储单位。每个主题被分为多个分区(Partition),每个分区中的消息被存储为一个日志文件。
  • Partition(分区):主题的水平分片,允许数据在多个 Kafka 服务器上进行分布式存储和处理。每个分区内的消息具有一个唯一的偏移量(Offset),标识其在日志中的位置。
  • Broker(代理):Kafka 集群中的一台服务器,负责存储和传递消息。一个 Kafka 集群由多个 Broker 组成,每个 Broker 负责一个或多个分区。
  • Consumer Group(消费者组):一组共同消费同一主题的消费者实例。Kafka 保证每个分区的消息只被同一个 Consumer Group 中的一个消费者处理。
  • Zookeeper:用于管理 Kafka 集群的协调服务。Zookeeper 负责管理集群的元数据,包括 Broker 的状态、主题的分区信息等。

  kafka常见用于消息队列,实际上还有很多其他应用场景,主要使用场景如下:

  • 日志收集:Kafka 常用于日志收集系统,将应用程序生成的日志和事件数据集中收集,供后续分析使用。
  • 流处理:与流处理框架(如 Apache Flink、Apache Storm)结合,Kafka 能够支持实时数据流处理。
  • 消息队列:Kafka 可以作为一个高效的分布式消息队列系统,处理异步消息传递。
  • 数据集成:Kafka 可以用于数据集成平台,将不同数据源的数据实时传输到目标系统中,如数据仓库或实时分析平台。

二、安装步骤

  博文以安装最新稳定版kafka3.8.0为例进行介绍,kafka从2.8.0开始要求Java环境最低版本为Java11,kafka3.8.0推荐Java17,兼容Java11版本,博文实验环境如下:

  • 操作系统:centos7.9
  • Java版本:JAVA17.0.11
  • kafka版本:3.8.0
  • Scala版本:2.13

1、安装JAVA环境

  JAVA环境的安装可以参考博文Linux之JDK安装与升级,这里不再赘述。

[wuhs@minio_test ~]$ java --version
java 17.0.11 2024-04-16 LTS
Java™ SE Runtime Environment (build 17.0.11+7-LTS-207)
Java HotSpot™ 64-Bit Server VM (build 17.0.11+7-LTS-207, mixed mode, sharing)

2、官网下载kafka软件包

  软件的安装我们尽量通过官方渠道下载,确保软件的安全可靠性。

[wuhs@minio_test ~]$ wget https://downloads.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz

3、哈希值校验

  下载后的软件包查看哈希值,对比官网哈希值,两者一致才可以放心使用。

[wuhs@minio_test ~]$ sha512sum kafka_2.13-3.8.0.tgz
0a33b7be7b6fa53249ba80f9d02cda71ed81927c160aa6ee9be1e3d3c1c4b50466ffc905293143fd88ceac7f5e7d8f5bec28ef972addd3c459cc8b1291e738aa kafka_2.13-3.8.0.tgz
在这里插入图片描述

4、解压并创建软链接

  创建软链接的好处是如果后续版本更新,我们只需要修改软链接,应用部署记录和环境变量一些参数等都不需要修改。另外只要可以使用普通用户部署的服务,都建议使用普通用户进行部署。

[wuhs@minio_test ~]$ tar -zxvf kafka_2.13-3.8.0.tgz
[wuhs@minio_test ~]$ ln -s kafka_2.13-3.8.0 kafka

5、启停zookeeper服务

  zookeeper默认配置如下,单机环境下直接启用服务即可,如果需要做持久化,我们可以修改dataDir目录将数据存储到数据盘。可以使用daemon参数让服务后台运行。

[wuhs@minio_test kafka]$ cat config/zookeeper.properties |grep -Ev “^#|^$”

dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
[wuhs@minio_test kafka]$ ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

6、检查zookeeper服务

  看到2181端口已监听,说明zookeeper服务已启动。
在这里插入图片描述

7、修改kafka服务配置

  kafka默认配置如下,我们主要需要修改advertised.listeners参数,该参数默认配置为PLAINTEXT://your.host.name:9092,远程客户端需要hostname调用并配置了正确的hosts映射关系,我们可以修改为网卡IP,让客户端可与通过IP进行远程消费。

#默认配置如下
[wuhs@minio_test kafka]$ cat config/server.properties |grep -Ev “^#|^$”
broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
#增加配置
advertised.listeners=PLAINTEXT://192.168.20.10:9092

8、启动kafka服务

[wuhs@minio_test kafka]$ ./bin/kafka-server-start.sh -daemon config/server.properties

9、检查kafka服务

  看到服务端口已监听说明kafka服务已经启动成功。

在这里插入图片描述

三、使用简介

1、服务验证

  我们创建一个主题,通过生产消息,消费者可以正常消费获得消息,说明服务正常。

  • 创建主题

[wuhs@minio_test kafka]$ ./bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

  • 查看主题

[wuhs@minio_test kafka]$ ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
test

  • 发送消息

[wuhs@minio_test kafka]$ ./bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
this is a test

hello wuhs
this is a test

  • 另起一个窗口,消费消息

[wuhs@minio_test kafka]$ ./bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
this is a test
在这里插入图片描述

2、服务启停管理

  • zookeeper启动服务

$ ./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

  • zookeeper停止服务

$ ./bin/zookeeper-server-stop.sh

  • kafka启动服务

$ ./bin/kafka-server-start.sh -daemon config/server.properties

  • kafka停止服务

$ ./bin/kafka-server-stop.sh

  • 查看kafka版本

[wuhs@minio_test kafka]$ ./bin/kafka-server-start.sh --version
3.8.0
[wuhs@minio_test kafka]$ ./bin/kafka-topics.sh --version
3.8.0

  • 查看主题列表

[wuhs@minio_test kafka]$ ./bin/kafka-topics.sh --list --bootstrap-server 192.168.20.10:9092
__consumer_offsets
test

3、版本升级注意事项

  kafka版本升级的时候,启动新服务的时候自动会生成新的cluster.id,如果数据存储路径还是原来的,启动kefka的时候会报错提示cluster.id冲突,这个时候我们可以通过zookeeper客户端查看cluster-id,可以修改为原来的cluster.id,如果不需要历史数据,我们可以修改数据存储目录,自动生产并使用新的cluster.id。

#通过zookeeper查看cluster.id
[zk: localhost:2181(CONNECTED) 0] get /cluster/id
{“version”:“1”,“id”:“KuhMl4rsRwynLaFWeZzVFg”}
#通过meta.properties文件查看cluster.id
[wuhs@minio_test kafka]$ cat /tmp/kafka-logs/meta.properties

5、kafka配置文件目录下文件说明

  • server.properties用途:用于配置 Kafka Broker。
  • zookeeper.properties用途:用于配置 ZooKeeper 服务。
  • producer.properties用途:用于配置 Kafka Producer。
  • consumer.properties用途:用于配置 Kafka Consumer。
  • connect-standalone.properties用途:用于配置 Kafka Connect 以独立模式运行。
  • connect-distributed.properties用途:用于配置 Kafka Connect 以分布式模式运行。
  • log4j.propertieslog4j2.properties用途:用于配置 Kafka 的日志记录(Log4j 1.x 或 Log4j 2.x)。

本文转载自: https://blog.csdn.net/carefree2005/article/details/141378917
版权归原作者 恒悦sunsite 所有, 如有侵权,请联系我们删除。

“Linux之kafka单机安装和使用简介”的评论:

还没有评论