1.下载
https://kafka.apache.org/downloads
2.解压
tar zxvf kafka_2.12-3.2.0.tgz
3.配置
3.1 配置/config/server.properties
(1).存活时间(默认168)
log.retention.hours=168
(2).配置IP与端口号
host.name=内网IP
# 如果操作topic时,报could not be established. Broker may not be available 错误
# 将listeners=PLAINTEXT: 改为 host:port:=PLAINTEXT:
listeners=PLAINTEXT://内网IP:9092
#主要给外网访问,常在云服务器上配置
advertised.listeners=PLAINTEXT://公网IP:9092
3.2 配置/config/zookeeper.properties
# 先创建,再指定zookeeper的数据与日志目录,这里在kafka的安装主目录下创 建/zookeeper/data 与 /zookeeper/log
dataDir=/zookeeper/data
dataLogDir=/zookeeper/log
# 如果启动zookeeper时,报上面地址访问不到,则使用绝对路径即可解决
4.启动zookeeper
# 在安装主目录下执行
(1).挂起启动
bin/zookeeper-server-start.sh config/zookeeper.properties
(2).后台启动
nohup bin/zookeeper-server-start.sh config/zookeeper.properties 2>&1 &
5.启动kafka
# 在安装主目录下执行
(1).挂起启动
bin/kafka-server-start.sh config/server.properties
(2).后台启动
nohup bin/kafka-server-start.sh config/server.properties 2>&1 &
6.创建topic(kafka-topic)存储消息
bin/kafka-topics.sh --create --topic kafka-topic --bootstrap-server localhost:9092
7.查看topic(kafka-topic)状态
bin/kafka-topics.sh --describe --topic kafka-topic --bootstrap-server localhost:9092
8.生产消息
bin/kafka-console-producer.sh --topic kafka-topic --bootstrap-server localhost:9092
9.消费消息
bin/kafka-console-consumer.sh --topic kafka-topic --from-beginning --bootstrap-server localhost:9092
# ps1:在消费消息时如果没有指定group,不会记录消息的offset,生产消息时没有消费者,消息会被忽略,除非有这条指定 --from-beginning
# ps2:在消费消息时如果指定group,则会记录消息的offset,产生消息时就算没有消费者,之后再有消费都连接的时候会从未消费过的消息开始递送。
10.关闭
先关闭生产者和消费者客户端,再关闭kafka broker,最后关闭zookeeper
10.1 关闭kafka
//关闭挂起运行
ctrl+c
//关闭后台运行
bin/kafka-server-stop.sh
10.2 关闭zookeeper
//关闭挂起运行
ctrl+c
//关闭后台运行
bin/zookeeper-server-stop.sh
版权归原作者 羁客% 所有, 如有侵权,请联系我们删除。