一、准备
确保服务器上已经搭建完成JDK,zookeeper服务;
如果未搭建完成,请移步参考以下文章:
安装zookeeper:
1.准备
zookeeper官网地址:Apache ZooKeeper
下载安装方式
- 使用wget命令行下载
wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.12.tar.gz
- 采用下载安装包的方式:Index of /zookeeper/stable
由于要安装在远程服务器上,故采用第一种方式安装;
注意:要下载源码包,否则启动客户端是启动失败;后缀带-bin.tar.gz
启动报错如下:
ZooKeeper JMX enabled by default Using config: /opt/software/zookeeper/bin/../conf/zoo.cfg Starting zookeeper ... FAILED TO START
看下zookeeper日志文件具体报错信息:
错误: 找不到或无法加载主类 org.apache.zookeeper.ZooKeeperMain
2、安装与配置
创建和解压
tar -zxvf apache-zookeeper-3.5.9.tar.gz
重命名:mv apache-zookeeper-3.5.9 zookeeper
创建数据存储目录与日志目录
进入zookeeper解压缩后的目录,新建数据文件夹dataDir和日志文件夹dataLogDir
命令:mkdir dataDir和mkdir dataLogDir
conf配置文件
进入配置目录,赋值拷贝样本文件
命令:cp zoo_sample.cfg zoo.cfg
修改 zoo.cfg文件内容
1.修改数据存储文件地址,按照上面建立的目录,小编的如下/opt/software/zookeeper/dataLogDir
[root@localhost kafka-logs]# cat /opt/zk/apache-zookeeper-3.8.0-bin/conf/zoo.cfg
# The number of milliseconds of each tick
tickTime=2000 #服务心跳时间
# The number of ticks that the initial
# synchronization phase can take
initLimit=10 #投票选举新leader的初始化时间
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5 #leader与follower心跳检测最大容忍时间
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/opt/zk/apache-zookeeper-3.8.0-bin/data #数据目录
dataLogDir=/opt/zk/apache-zookeeper-3.8.0-bin/log #日志目录
pidfile=/var/run/zookeeper.pid
# the port at which the clients will connect
clientPort=2181 #zk端口
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpHost=0.0.0.0
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true
- 配置环境变量
- 打开/etc/profile 文件:
vim /etc/profile
- 新增以下配置:
#ZOOKEEPER_HOME
export ZOOKEEPER_HOME=/opt/software/zookeeper
export PATH=$ZOOKEEPER_HOME/bin:$PATH
3.重新加载配置:
source /etc/profile
3、运行
- 启动命令:
./zkServer.sh start
- 停止命令:
./zkServer.sh stop
- 查看运行状态:
./zkServer.sh status
kafka:
kafka官网下载:Apache Download Mirrors
Index of /dist/kafka/2.2.2
二、安装与配置
- 解压缩:
tar -zxvf kafka_2.11-2.2.0.tgz
- 重命名:
mv kafka_2.11-2.2.0 kafka
[root@localhost kafa]# ls -al
总用量 164020
drwxr-xr-x. 4 root root 95 8月 16 08:26 .
drwxr-xr-x. 8 root root 82 5月 18 16:09 ..
drwxr-xr-x. 8 root root 119 8月 16 08:33 kafka
-rw-r--r--. 1 root root 63999924 8月 15 17:22 kafka_2.11-2.2.0.tgz
-rw-r--r--. 1 root root 103956099 8月 15 16:03 kafka_2.13-3.2.1.tgz
- 创建
logs
文件,用于存储kafka日志: 在kafka安装目录下创建:mkdir kafka-logs /opt/kafka/kafka-logs - 修改
server.properties
配置文件[root@localhost kafka]# cat config/server.properties | grep -v "#" | grep -v "^$"broker.id=1listeners=PLAINTEXT://0.0.0.0:9092advertised.listeners=PLAINTEXT://192.168.1.36:9092num.network.threads=3num.io.threads=8socket.send.buffer.bytes=102400socket.receive.buffer.bytes=102400socket.request.max.bytes=104857600log.dirs=/opt/kafa/kafka/kafka-logsnum.partitions=1num.recovery.threads.per.data.dir=1offsets.topic.replication.factor=1transaction.state.log.replication.factor=1transaction.state.log.min.isr=1log.retention.hours=168log.segment.bytes=1073741824log.retention.check.interval.ms=300000zookeeper.connect=localhost:2181zookeeper.connection.timeout.ms=6000group.initial.rebalance.delay.ms=0
解释:#一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变broker.id的话不会影响consumersbroker.id=1 #listeners=PLAINTEXT://:9092 #advertised.listeners=PLAINTEXT://your.host.name:9092 #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SS # broker 处理消息的最大线程数,一般情况下不需要去修改num.network.threads=3 # broker处理磁盘IO 的线程数 ,数值应该大于你的硬盘数num.io.threads=8 # socket的发送缓冲区(SO_SNDBUF)socket.send.buffer.bytes=102400 # socket的接收缓冲区 (SO_RCVBUF)socket.receive.buffer.bytes=102400 # socket请求的最大字节数。为了防止内存溢出,message.max.bytes必然要小于socket.request.max.bytes=104857600 #kafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2log.dirs=/opt/software/kafka/kafka-logs # 每个topic的分区个数,更多的partition会产生更多的segment filenum.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1transaction.state.log.replication.factor=1transaction.state.log.min.isr=1 # 当达到下面的消息数量时,会将数据flush到日志文件中。默认10000#log.flush.interval.messages=10000 # 当达到下面的时间(ms)时,执行一次强制的flush操作。interval.ms和interval.messages无论哪个达到,都会flush。默认3000ms#log.flush.interval.ms=1000 # 日志保存时间 (hours|minutes),默认为7天(168小时)。超过这个时间会根据policy处理数据。bytes和minutes无论哪个先达到都会触发。log.retention.hours=168 #log.retention.bytes=1073741824 # 控制日志segment文件的大小,超出该大小则追加到一个新的日志segment文件中(-1表示没有限制)log.segment.bytes=1073741824 # 日志片段文件的检查周期,查看它们是否达到了删除策略的设置(log.retention.hours或log.retention.bytes)log.retention.check.interval.ms=300000 # Zookeeper quorum设置。如果有多个使用逗号分割 例如 ip:prot,ip:prot,ip:protzookeeper.connect=localhost:2181 # 连接zk的超时时间zookeeper.connection.timeout.ms=6000 # ZooKeeper集群中leader和follower之间的同步实际group.initial.rebalance.delay.ms=0
四、运行
1.启动zookeeper服务
由于小编服务器上已经启动过zookeeper服务,故不需要重新执行启动命令;
如果服务器zookeeper服务未启动,则在kafka目录下执行以下命令:
使用安装包中的脚本启动单节点Zookeeper 实例:
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
注意:以下命令,小编均选择在kafka安装根目录下执行;
2.启动kafka服务
以下方式任选其一
kafka目录启动
使用kafka-server-start.sh 启动kafka 服务:
bin/kafka-server-start.sh config/server.properties
这种命令执行并不是后台进程运行,故使用以下命令
bin/kafka-server-start.sh -daemon config/server.properties
命令需要切换到kafka的bin目录下,
./kafka-server-start.sh -daemon /opt/software/kafka//config/server.properties
3.创建topic。只有一个broker,所以这里一个topic,5个分区,1个副本(不能大于broker数量)
[root@localhost kafka]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic wubo
Created topic wubo.
4 查看描述 Topic 信息
[root@localhost kafka]# kafka-topics.sh --describe --zookeeper localhost:2181 --topic wubo
Topic:wubo PartitionCount:5 ReplicationFactor:1 Configs:
Topic: wubo Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: wubo Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: wubo Partition: 2 Leader: 1 Replicas: 1 Isr: 1
Topic: wubo Partition: 3 Leader: 1 Replicas: 1 Isr: 1
Topic: wubo Partition: 4 Leader: 1 Replicas: 1 Isr: 1
说明:
第一行给出了所有分区的摘要,每个附加行给出了关于一个分区的信息。 由于我们只有一个分区,所以只有一行。
Leader: 是负责给定分区的所有读取和写入的节点。 每个节点将成为分区随机选择部分的领导者。
Replicas: 是复制此分区日志的节点列表,无论它们是否是领导者,或者即使他们当前处于活动状态。
Isr: 是一组“同步”副本。这是复制品列表的子集,当前活着并被引导到领导者。
5.查看topic
[root@localhost kafka]# kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
jettech
test
wubo
6 .产生消息:kafka-console-producer.sh
[root@localhost kafka]# kafka-console-producer.sh --broker-list localhost:9092 --topic wubo
>"jettech hello world"
>^C[root@localhost kafka]#
使用Ctrl+C退出生成消息;
7. 消费消息:kafka-console-consumer.sh
使用kafka-console-consumer.sh 接收消息并在终端打印
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning (这里用删除线标识,并不是代表命名错误,低版本仍然可以适用!!!)
[root@localhost kafka]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wubo --from-beginning
"jettech hello world"
8.删除topic
kafka-topics.sh --delete --zookeeper localhost:2181 --topic wubo
版权归原作者 Michaelwubo 所有, 如有侵权,请联系我们删除。