kafka安装准备
- jdk-8u131-linux-x64.tar.gz
- kafka_2.13-3.2.0.tgz
- apache-zookeeper-3.6.1-bin.tar.gz
- kafka-eagle-bin-2.1.0.tar.gz
- zktools.zip
- canal.deployer-1.1.4.tar.gz
kafka单机安装
更新jdk
CentOS7自带了一个openjdk,使用的时候用诸多问题,例如明明配置了Java环境变量但是不能使用,这个时候需要卸载重新安装。
查看已有openjdk版本
rpm -qa|grep jdk
卸载openjdk
remove后面的参数是上面得到的结果.noarch结尾的包
yum -y remove copy-jdk-configs-3.3-10.el7_5.noarch
下载jdk1.8
下载jdk-8u40-linux-x64.tar.gz,上传到/usr/local/java
链接:百度网盘 请输入提取码
提取码:2673
解压
tar -zxvf jdk-8u40-linux-x64.tar.gz -C /usr/local/java
配置环境变量
在/etc/profile文件的末尾加上
export JAVA_HOME=/usr/local/java/jdk1.8.0_40
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH
export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH
编译使之生效
source /etc/profile
验证
java -version
下载解压kafka
获取下载地址(点开具体版本): http://kafka.apache.org/downloads, 下载 Binary 二进制版本而不是源码
通过下列命令直接下载,或者直接将下载好的 kafka_2.13-3.2.0.tgz 压缩包放到 /usr/local/ 目录,然后解压安装
cd /usr/local
wget https://mirror.bit.edu.cn/apache/kafka/3.2.0/kafka_2.13-3.2.0.tgz
tar -xzvf kafka_2.13-3.2.0.tgz
cd kafka_2.13-3.2.0
启动zookeeper
kafka需要依赖ZK,kafka安装包中已经自带了一个ZK,也可以在其他机器单独安装运行zk,改成指定已运行的ZK。
如果改成指定的ZK需要修改修改 kafka 安装目录下的 config/server.properties 文件中的 zookeeper.connect 。这里使用自带的ZK。
如果需要单独安装zookeeper: https://blog.csdn.net/luciferlongxu/article/details/126187553
后台启动ZK:
nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties >> zookeeper.nohup &
如果启动zookeeper时报错nohup
[root@rmq101 kafka_2.13-3.2.0]# nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties >> zookeeper.nohup &
[1] 1579
[root@rmq101 kafka_2.13-3.2.0]# nohup: 忽略输入重定向错误到标准输出端
修改命令,在&前加上 2>&1
[root@rmq101 kafka_2.13-3.2.0]# nohup ./bin/zookeeper-server-start.sh config/zookeeper.properties >> zookeeper.nohup 2>&1 &
[2] 1929
检查zookeeper是否启动成功:
ps -ef|grep zookeeper
启动kafka
修改相关配置
vim config/server.properties
Broker ID启动以后就不能改了
broker.id=1
取消注释,改成本机IP:
listeners=PLAINTEXT://192.168.1.101:9092
num.partitions后面增加2行。
发送到不存在topic自动创建。允许永久删除topic。(注意:生产环境正式运行必须设置为false)
num.partitions=1
auto.create.topics.enable=true
delete.topic.enable=true
后台启动kafka(kafka安装目录下):
nohup ./bin/kafka-server-start.sh ./config/server.properties &
如果启动kafka时报错nohup
[root@rmq101 kafka_2.13-3.2.0]# nohup ./bin/kafka-server-start.sh ./config/server.properties 2>&1 &
[2] 2277
[root@rmq101 kafka_2.13-3.2.0]# nohup: 忽略输入并把输出追加到"nohup.out"
修改命令,在&前加上 >/dev/null 2>&1
[root@rmq101 kafka_2.13-3.2.0]# nohup ./bin/kafka-server-start.sh ./config/server.properties >/dev/null 2>&1 &
[3] 3027
通过 ps -ef|grep kafka 命令可以查看kafka是否启动成功
日志在logs目录下
创建Topic
创建一个名为lucifer-topic的topic,只有一个副本,一个分区:
sh bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic lucifer-topic
如果kafka创建topic时报错
[root@rmq101 kafka_2.13-3.2.0]# sh bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic lucifer-topic --replication-factor 1 --partitions 1
Exception in thread "main" joptsimple.UnrecognizedOptionException: zookeeper is not a recognized option
at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
at joptsimple.OptionParser.parse(OptionParser.java:396)
at kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:567)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:47)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
在较新版本(2.2 及更高版本)的 Kafka 不再需要 ZooKeeper 连接字符串,即- -zookeeper localhost:2181。使用 Kafka Broker的 --bootstrap-server localhost:9092来替代。
[root@rmq101 kafka_2.13-3.2.0]# sh bin/kafka-topics.sh --create --topic lucifer-topic --replication-factor 1 --partitions 1 --bootstrap-server 192.168.1.101:9092
Created topic lucifer-topic.
修改为 --bootstrap-server 192.168.1.101:9092后执行成功,可以通过下面命令查看topic是否创建成功
[root@rmq101 kafka_2.13-3.2.0]# sh bin/kafka-topics.sh --bootstrap-server 192.168.1.101:9092 --describe --topic lucifer-topic
Topic: lucifer-topic TopicId: awcXOAvsTUmWltdSsgPz2Q PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: lucifer-topic Partition: 0 Leader: 101 Replicas: 101 Isr: 101
[root@rmq101 kafka_2.13-3.2.0]#
或者通过下面命令查看已经创建的 topic:
# 2.2 以前版本
sh bin/kafka-topics.sh -list -zookeeper localhost:2181
# 2.2以后版本
sh bin/kafka-topics.sh -list -bootstrap-server 192.168.1.101:9092
启动Producer
打开一个窗口,在kafka解压目录下:
sh bin/kafka-console-producer.sh --broker-list 192.168.1.101:9092 --topic lucifer-topic
启动Consumer
在一个新的远程窗口中:当生产者发送消息,消费者就能实时收到消息
sh bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.101:9092 --topic lucifer-topic --from-beginning
查看消息内容
生产者发送消息:
[root@rmq101 kafka_2.13-3.2.0]# sh bin/kafka-console-producer.sh --broker-list 192.168.1.101:9092 --topic lucifer-topic
>hello, Im ^H^H^H^H^H^H^H^H
>hello, I'm lucifer from producer client
>how are you
>are you ready
>go
>very good
>
消费者收到消息:
[root@rmq101 kafka_2.13-3.2.0]# sh bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.101:9092 --topic lucifer-topic --from-beginning
hello, Im
hello, I'm lucifer from producer client
how are you
are you ready
go
very good
在 /tmp/kafka-logs/ 目录下查看内容:
/tmp/kafka-logs/lucifer-topic-0/00000000000000000000.log
[root@rmq101 lucifer-topic-0]# /usr/local/kafka_2.13-3.2.0/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/lucifer-topic-0/00000000000000000000.log --print-data-log
Dumping /tmp/kafka-logs/lucifer-topic-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: 0 lastSequence: 0 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 0 CreateTime: 1659453466769 size: 86 magic: 2 compresscodec: none crc: 842542502 isvalid: true
| offset: 0 CreateTime: 1659453466769 keySize: -1 valueSize: 18 sequence: 0 headerKeys: [] payload: hello, Im
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: 1 lastSequence: 1 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 86 CreateTime: 1659453478488 size: 107 magic: 2 compresscodec: none crc: 2883387325 isvalid: true
| offset: 1 CreateTime: 1659453478488 keySize: -1 valueSize: 39 sequence: 1 headerKeys: [] payload: hello, I'm lucifer from producer client
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: 2 lastSequence: 2 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 193 CreateTime: 1659453503674 size: 79 magic: 2 compresscodec: none crc: 1027305610 isvalid: true
| offset: 2 CreateTime: 1659453503674 keySize: -1 valueSize: 11 sequence: 2 headerKeys: [] payload: how are you
baseOffset: 3 lastOffset: 3 count: 1 baseSequence: 3 lastSequence: 3 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 272 CreateTime: 1659453513330 size: 81 magic: 2 compresscodec: none crc: 3754693900 isvalid: true
| offset: 3 CreateTime: 1659453513330 keySize: -1 valueSize: 13 sequence: 3 headerKeys: [] payload: are you ready
baseOffset: 4 lastOffset: 4 count: 1 baseSequence: 4 lastSequence: 4 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 353 CreateTime: 1659453515317 size: 70 magic: 2 compresscodec: none crc: 3065572672 isvalid: true
| offset: 4 CreateTime: 1659453515317 keySize: -1 valueSize: 2 sequence: 4 headerKeys: [] payload: go
baseOffset: 5 lastOffset: 5 count: 1 baseSequence: 5 lastSequence: 5 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 423 CreateTime: 1659453522193 size: 77 magic: 2 compresscodec: none crc: 350799265 isvalid: true
| offset: 5 CreateTime: 1659453522193 keySize: -1 valueSize: 9 sequence: 5 headerKeys: [] payload: very good
baseOffset: 6 lastOffset: 6 count: 1 baseSequence: 6 lastSequence: 6 producerId: 0 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false deleteHorizonMs: OptionalLong.empty position: 500 CreateTime: 1659453880343 size: 68 magic: 2 compresscodec: none crc: 3512345729 isvalid: true
| offset: 6 CreateTime: 1659453880343 keySize: -1 valueSize: 0 sequence: 6 headerKeys: [] payload:
/tmp/kafka-logs/lucifer-topic-0/00000000000000000000.index
[root@rmq101 lucifer-topic-0]# /usr/local/kafka_2.13-3.2.0/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/lucifer-topic-0/00000000000000000000.index --print-data-log
Dumping /tmp/kafka-logs/lucifer-topic-0/00000000000000000000.index
offset: 0 position: 0
/tmp/kafka-logs/lucifer-topic-0/00000000000000000000.timeindex
[root@rmq101 lucifer-topic-0]# /usr/local/kafka_2.13-3.2.0/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/lucifer-topic-0/00000000000000000000.timeindex --print-data-log
Dumping /tmp/kafka-logs/lucifer-topic-0/00000000000000000000.timeindex
timestamp: 0 offset: 0
Found timestamp mismatch in :/tmp/kafka-logs/lucifer-topic-0/00000000000000000000.timeindex
Index timestamp: 0, log timestamp: 1659453466769
删除kafka全部数据步骤:
1、停止每台机器上的kafka;
2、删除kafka存储目录(server.properties文件log.dirs配置,默认为“/tmp/kafka-logs”)全部topic的数据目录;
3、删除zookeeper上与kafka相关的znode节点;除了/zookeeper
4、重启kafka。
版权归原作者 Lucifer Zhao 所有, 如有侵权,请联系我们删除。