0


Zookeeper+kafka集群部署

Zookeeper+kafka集群部署

需jdk环境

安装包下载地址:Index of /dist/zookeeper

上传到/usr/local

tar -zxf zookeeper-3.4.5-cdh5.5.4.tar.gz

rm -rf zookeeper-3.4.5-cdh5.5.4.tar.gz

进入到zookeeper的安装目录

cd zookeeper-3.4.5-cdh5.5.4/

新建data和logs文件夹

[root@test zookeeper-3.4.5]# mkdir data

[root@test zookeeper-3.4.5]# mkdir logs

复制zoo_sample.cfg文件

cd conf/

cp zoo_sample.cfg zoo.cfg

修改zoo.cfg文件

[root@test zookeeper-3.4.5]# vim conf/zoo.cfg

注释:

tickTime:心跳时间

initLimit:多少个心跳时间内,允许其他server连接并初始化数据

syncLimit:多少个tickTime内,允许follower节点同步

dataDir:存放内存数据文件目录,根据实际环境修改

dataLogDir:存放日志文件目录,根据实际环境修改

clientPort:监听端口,使用默认2181端口

server.x:配置集群主机信息,[hostname]:[通信端口]:[选举端口],根据自己的主机信息修改

maxClientCnxns:最大并发客户端数,用于防止DOS的,设置为0是不加限制

minSessionTimeout:最小的客户端session超时时间(单位是毫秒)

maxSessionTimeout:最大的客户端session超时时间(单位是毫秒)

将本机安装目录,通过scp全部拷贝至另外2台机器。

scp zoo.cfg root@10.1.50.137://usr/local/zookeeper-3.4.5/conf

scp zoo.cfg root@10.1.50.138://usr/local/zookeeper-3.4.5/conf

在三台服务器的zookeeper安装目录下的data文件夹下面新建文件myid, 分别输入数字1、2、3,对应上面配置文件的server后面的数字

vim myid

Zookeeper的启动停止 (要三台都启动才行!!!!),分别进入三台服务器的zookeeper安装目录!!!!!!!!!!,输入命令bin/zkServer.sh start 启动Zookeeper服务,查看状态命令:

bin/zkServer.sh start

./zkServer.sh status

进入节点,执行命令为:

./zkCli.sh -server 10.1.50.130:2181 回车

ls / (查看当前 ZooKeeper 中所包含的内容,输入命令quit 退出Zookeeper服务)

启动zookeeper服务后可以通过jps -l 命令查看zookeeper进程,进程名为QuorumPeerMai

在zookeeper安装目录下输入命令 bin/zkServer.sh status 各个节点的状态

如果需要停止zookeeper服务,则在zookeeper安装目录上输入命令 bin/zkServer.sh stop

部署kafka

下载地址:Apache Kafka

不要下载src包,需编译,直接下载tgz包

长传安装包至node-1节点

解压:

tar -zxvf kafka_2.12-3.2.3.tgz -C /opt

重命名:

mv /opt/kafka_2.12-3.2.3/ /opt/kafka

修改配置文件server.properties

vim /opt/kafka/config/server.properties

可用一下配置覆盖:

当前机器在集群中的唯一标识,和zookeeper的myid性质一样

broker.id=1

套接字服务器监听的地址。如果没有配置,主机名将等于的值

listeners=PLAINTEXT://192.168.1.101:9092

当前kafka对外提供服务的端口默认是9092

port=9092

这个是borker进行网络处理的线程数

num.network.threads=3

这个是borker进行I/O处理的线程数

num.io.threads=8

发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能

socket.send.buffer.bytes=102400

kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘

socket.receive.buffer.bytes=102400

这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小

socket.request.max.bytes=104857600

消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个

log.dirs=/opt/kafka/log/kafka-logs

默认的分区数,一个topic默认1个分区数

num.partitions=1

每个数据目录用来日志恢复的线程数目

num.recovery.threads.per.data.dir=1

默认消息的最大持久化时间,168小时,7天

log.retention.hours=168

这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件

log.segment.bytes=1073741824

每隔300000毫秒去检查上面配置的log失效时间

log.retention.check.interval.ms=300000

是否启用log压缩,一般不用启用,启用的话可以提高性能

log.cleaner.enable=false

设置zookeeper的连接端口

zookeeper.connect=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181

设置zookeeper的连接超时时间

zookeeper.connection.timeout.ms=6000

创建server.properties配置文件中的日志存放目录:

mkdir -p /opt/kafka/log

修改producer.properties配置文件

[root@test kafka]# vim /opt/kafka/config/producer.properties

末行加入:

metadata.broker.list=10.1.50.130:2181,10.1.50.137:2181,10.1.50.138:2181

修改consumer.properties配置文件

[root@test kafka]# vim /opt/kafka/config/consumer.properties

末行加入:

zookeeper.connect=10.1.50.130:2181,10.1.50.137:2181,10.1.50.138:2181

各节点添加环境变量:

vim /etc/profile

末行加入:

export KAFKA_HOME=/opt/kafka

export PATH=$PATH:$KAFKA_HOME/bin

重载配置文件:

source /etc/profile

分发文件至node002与node001节点

[root@test config]# scp -r /opt/kafka root@10.1.50.137:/opt

[root@test config]# scp -r /opt/kafka root@10.1.50.138:/opt/

修改node002与node003的server.properties配置文件

将node002中server.properties的broker.id修改为2

将node003中server.properties的broker.id修改为3

将node002中server.properties的listeners修改为10.1.50.137:9092

将node003中server.properties的listeners修改为10.1.50.138:9092

各节点启动kafka服务,进入kafka服务/bin目录下执行:

kafka-server-start.sh /opt/kafka/config/server.properties

后台启动:

kafka-server-start.sh -daemon /opt/kafka/config/server.properties

查看:

Kafka设置密码

  1. 停止kafka服务

kafka-server-stop.sh

  1. 修改kafka配置文件 server.properties

添加如下配置

#使用的认证协议

security.inter.broker.protocol=SASL_PLAINTEXT

#SASL机制

sasl.enabled.mechanisms=PLAIN

sasl.mechanism.inter.broker.protocol=PLAIN

#完成身份验证的类

authorizer.class.name=kafka.security.authorizer.AclAuthorizer

#如果没有找到ACL(访问控制列表)配置,则允许任何操作。

allow.everyone.if.no.acl.found=false

#需要开启设置超级管理员,设置admin用户为超级管理员

super.users=User:admin

  1. 添加认证后

此处配置修改添加SASL协议

  1. 添加SASL配置文件

vim config/kafka_server_jaas.conf

KafkaServer {

org.apache.kafka.common.security.plain.PlainLoginModule required

    username="admin"

    password="admin@123"

    user_admin="admin@123";

};

以下为例,测试部署不做客户端操作

#########

客户端若配置登录认证

vim config/kafka_client_jaas.conf

KafkaClient {

   org.apache.kafka.common.security.plain.PlainLoginModule required

   username="admin"

   password="admin@123";

};

说明: 这里配置用户名和密码需要和服务端配置的账号密码保持一致,这里配置了admin这个用户

添加kafka-console-consumer.sh认证文件路径,后面启动消费者测试时使用

vim /opt/kafka/bin/kafka-console-consumer.sh ,找到 “KAFKA_HEAP_OPTS”,添加以下参数:

-Djava.security.auth.login.config=/opt/kafka/kafka_client_jaas.conf

#######

  1. vim /opt/kafka/bin/kafka-server-start.sh ,找到 export KAFKA_HEAP_OPTS , 添加jvm 参数为kafka_server_jaas.conf文件,加入一行:

-Djava.security.auth.login.config=/opt/kafka/config/kafka_server_jaas.conf

  1. 启动kafka

各节点启动kafka服务,进入kafka服务/bin目录下执行:

kafka-server-start.sh /opt/kafka/config/server.properties

后台启动:

kafka-server-start.sh -daemon /opt/kafka/config/server.properties

标签: linux 大数据 java

本文转载自: https://blog.csdn.net/LinuxKBG/article/details/131051754
版权归原作者 LinuxKBG 所有, 如有侵权,请联系我们删除。

“Zookeeper+kafka集群部署”的评论:

还没有评论