docker搭建kafka集群完整版(windows)
1.安装docker desktop.
打开docker官网,下载docker desktop,这里直接给出网址:Install Docker Desktop on Windows | Docker Docs
如下图,点击下载即可。
下载好后 点击运行exe文件,我们采用交互式安装程序。
安装完成后直接重启即可,默认安装在c盘,如果不想安装在c盘就采用命令行的方式安装。官网有教程。
点击接受
之后点击登录
当然不登录也没关系。
接下来我们安装一下Linux内核,打开windows powershell,运行wsl --date,即可(看情况,电脑没有或软件没有提示的情况就要安装)
配置一下环境,如下,打开右上角的设置,更改下面的数据位置
之后配置国内镜像源,可用参考网上给的代码
{
"registry-mirrors": [
"https://registry.docker-cn.com",
"http://hub-mirror.c.163.com",
"https://docker.mirrors.ustc.edu.cn",
"https://cr.console.aliyun.com",
"https://mirror.ccs.tencentyun.com"
],
"builder": {
"gc": {
"defaultKeepStorage": "20GB",
"enabled": true
}
},
"experimental": false,
"features": {
"buildkit": true
}
}
之后点击应用并重启即可。
过程中可能出现的问题:
1.Docker 一直starting
遇到这种情况,一般是因为没有安装wsl 2(或者没有打开),安装即可。在安装这个之前需要启用虚拟化,一般都开启了,这里不详细介绍。
安装完成后重启电脑即可。
如下图,便是安装完成了
上面有一个是我拉取的一个image,一开始没有。
这样我们便可以在windows上使用docker了。
注意:后续如果关闭后一直显示正在启动中建议重启电脑重新启动,亲测有效。
2.下载镜像
打开Windows powershell,运行下面命令:
docker pull bitnami/kafka
docker pull zookeeper
版本随意,但建议都采用最新的版本,老版本可能会出现版本冲突,但你不知道会不会发生冲突,容易出问题。
准备工作:
在开始新建集群之前,新建好文件夹,根据下面的yml配置文件选择的地址来建文件夹(冒号后面的可以不建),如下图(可以自己改变位置):
不然数据会默认安装到C盘。
3.创建docker网络
运行下面命令:
docker network create zk-net
如下图:
4.docker compose 搭建kafka集群
之前还要搭建zookeeper集群,用了俩个compose.yml文件,这里合并一下,缩减操作。
创建一个yml文件,名字随意,这里取名为docker-compose-kafka.yml
文件配置如下:
version:"3"networks:zk-net:external:name: zk-net
services:z1:image:'zookeeper:latest'container_name: z1
hostname: z1
environment:ZOO_MY_ID:1ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=z2:2888:3888;2181 server.3=z3:2888:3888;2181
ALLOW_ANONYMOUS_LOGIN:"yes"networks:- zk-net
ports:- 2181:2181- 8081:8080volumes:- /D/docker_desktop/z1/z1/data:/data
- /D/docker_desktop/z1/z1/datalog:/datalog
z2:image:'zookeeper:latest'container_name: z2
hostname: z2
environment:ZOO_MY_ID:2ZOO_SERVERS: server.1=z1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=z3:2888:3888;2181
ALLOW_ANONYMOUS_LOGIN:"yes"networks:- zk-net
ports:- 2182:2181- 8082:8080volumes:- /D/docker_desktop/z1/z2/data:/data
- /D/docker_desktop/z1/z2/datalog:/datalog
z3:image:'zookeeper:latest'container_name: z3
hostname: z3
environment:ZOO_MY_ID:3ZOO_SERVERS: server.1=z1:2888:3888;2181 server.2=z2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
ALLOW_ANONYMOUS_LOGIN:"yes"networks:- zk-net
ports:- 2183:2181- 8083:8080volumes:- /D/docker_desktop/z1/z3/data:/data
- /D/docker_desktop/z1/z3/datalog:/datalog
kafka1:image:'bitnami/kafka:latest'restart: always
container_name: kafka1
hostname: kafka1
ports:-'9092:9092'environment:- ALLOW_NONE_AUTHENTICATION=yes
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_BROKER_ID=1
- KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9092- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:9092- KAFKA_CFG_ZOOKEEPER_CONNECT=z1:2181,z2:2181,z3:2181volumes:- /D/docker_desktop/k1/kafka1:/bitnami/kafka
networks:- zk-net
kafka2:image:'bitnami/kafka:latest'restart: always
container_name: kafka2
hostname: kafka2
ports:-'9093:9093'environment:- ALLOW_NONE_AUTHENTICATION=yes
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_BROKER_ID=2
- KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9093- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:9093- KAFKA_CFG_ZOOKEEPER_CONNECT=z1:2181,z2:2181,z3:2181volumes:- /D/docker_desktop/k1/kafka2:/bitnami/kafka
networks:- zk-net
kafka3:image:'bitnami/kafka:latest'restart: always
container_name: kafka3
hostname: kafka3
ports:-'9094:9094'environment:- ALLOW_NONE_AUTHENTICATION=yes
- ALLOW_PLAINTEXT_LISTENER=yes
- KAFKA_BROKER_ID=3
- KAFKA_CFG_LISTENERS=PLAINTEXT://0.0.0.0:9094- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka3:9094- KAFKA_CFG_ZOOKEEPER_CONNECT=z1:2181,z2:2181,z3:2181volumes:- /D/docker_desktop/k1/kafka3:/bitnami/kafka
networks:- zk-net
5.启动kafka集群
之后在windows powershell上运行下面命令:
docker-compose -f D:\docker_desktop\z1\docker-compose_kafka.yml up -d
停止配置文件运行代码如下:(不要运行,一般是上面代码报错再运行停止服务的)
docker-compose -f D:\docker_desktop\z1\docker-compose_kafka.yml stop
6.使用docker desktop
如下图,便是docker desktop的一个界面:
点击kafka镜像的状态或者左上角的容器,如下图:
选择一个容器进入即可:
进入后如下图:
其中,logs代表日志信息,inspect可以查看kafka的配置信息,包括网络和集群等信息,如下图:
exec就是容器内部了,可以通过写指令来操控容器,如图所示:
容器内部其实就是一个Linux系统样的东西,在Files里面可以看到容器的结构。如图所示:
更多关于docker desktop的使用请自己摸索或查阅。这里只是简单介绍一下方便下面的教学。
6.创建主题
搭建好集群后我们需要创建一个主题,来进行后面的测试。
进入kafka容器,俩种方式,一种是通过docker desktop进入,还有一种是通过命令行的方式进入,命令行的方式自己去搜,这里通过docker desktop进入,跟上面一样进入一个kafka点击exec即可,如下图:
注意有上角的灰色垃圾箱代表清空桌面,红色的代表删除容器。
刚进入容器默认在/目录下,我们需要进入到kafka的bin目录下,使用cd命令即可,如下图:
注意kafka的文件夹在/opt/bitnami/目录里面。
之后我们通过下面命令创建主题:
./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic topic1
./kafka-topics.sh --create
: 这一部分告诉Kafka命令行工具你想要创建一个新的主题。--bootstrap-server localhost:9092
: 这一部分指定了Kafka服务的地址和端口。在这个例子中,服务运行在本地主机的9092端口。--replication-factor 3
: 这一部分指定了主题的副本因子,即数据在Kafka集群中的复制次数。在这个例子中,数据将被复制3次。--partitions 3
: 这一部分指定了主题的分区数。在Kafka中,数据被组织成多个分区,每个分区可以独立地处理和存储。在这个例子中,主题将有3个分区。--topic topic1
: 这一部分指定了要创建的主题名称。在这个例子中,主题名称为"topic1"。
注意:在Kafka中,副本数不可以大于分区数。因为副本是以目录存储在各个broker节点的data目录下,如果副本数量大于broker节点数量,那么在同一个Broker节点的data目录下会有两个一样的文件夹,这是不允许的。
网上的命令都有点老了,可能会报错,建议用这个命令,或者用–help查阅怎么使用,如下:
./kafka-topics.sh --help
使用下面命令可以查看主题:
kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic topic1
如下图:
7.命令行使用生产者和消费者程序
在容器内运行如下命令,打开消费端:
./kafka-console-consumer.sh --from-beginning --topic ysh --bootstrap-server localhost:9092
之后在打开windows powershell进入一个kafka容器打开生产者程序,命令如下:
docker exec -it kafka2 /bin/bash
之后进入kafka bin目录下 ,命令如下:
cd /opt/bitnami/kafka/bin
打开生产者程序,命令如下:
./kafka-console-producer.sh --broker-list localhost:9093 --topic topic1
注意:上面的我端口是9093,不是9092,9092是kafka1的端口,而我进入的是kafka2,不然会报错,要想在所有kafka集群里面都可以直接用9092就需要改一下上面的配置文件yml,如下图:
将他们的端口都映射到9092就可以了,跟zookeeper一样,每个kafka都要改。
之后再生产者端写入数据,可以看到消费端有数据出来,如下图:
8.kafka Java API 编写生产者程序和消费者程序(以读取股票信息为例)
在编写代码前需要先在C盘,windows/system32/drivers/hosts文件里面将kafka的网络添加进去,不然idea无法识别,idea机制问题,
这样就没问题了。
导入依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.2</version>
</dependency>
编写生产者程序:
importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerRecord;importjava.io.*;importjava.util.Properties;publicclassKafkaProducerTest{publicstaticvoidmain(String[] args)throwsFileNotFoundException,UnsupportedEncodingException{Properties props =newProperties();//1.指定Kafaka集群的ip地址和端口号
props.put("bootstrap.servers","kafka1:9092,kafka2:9093,kafka3:9094");//2.等待所有副本节点的应答
props.put("acks","all");//3.消息发送最大尝试次数
props.put("retries",0);//4.指定一批消息处理次数
props.put("batch.size",16384);//5.指定请求延时
props.put("linger.ms",1);//6.指定缓存区内存大小
props.put("buffer.memory",33554432);//7.设置key序列化
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");//8.设置value序列化
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");// 9、生产数据KafkaProducer<String,String> producer =newKafkaProducer<String,String>(props);// 定义CSV文件路径String csvFile ="C:\\Users\\asus\\Desktop\\data\\股票a.csv";// 读取CSV文件并发送到Kafkatry{//BufferedReader reader = new BufferedReader(new FileReader(csvFile),);BufferedReader reader =newBufferedReader(newInputStreamReader(newFileInputStream(csvFile),"GBK"));String line;
reader.readLine();while((line = reader.readLine())!=null){String[] data = line.split(",");// 假设CSV文件使用逗号分隔String key = data[1];// 假设交易笔数为关键字String value = data[0]+","+ data[1]+","+ data[2]+","+ data[3]+","+ data[4]+","+ data[5]+","+ data[6]+","+ data[7]+","+ data[8];// 假设交易总量为值,使用逗号分隔
producer.send(newProducerRecord<String,String>("ysh",value));System.out.printf(value+"\n");}
producer.close();// 关闭Kafka生产者}catch(IOException e){System.out.printf("文件打开失败");// 处理IO异常}}}
编写消费者程序:
importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.KafkaConsumer;importjava.util.Arrays;importjava.util.Properties;importjava.util.concurrent.atomic.AtomicInteger;importjava.util.concurrent.atomic.AtomicLong;publicclassKafkaConsumerTest{publicstaticvoidmain(String[] args){//1、准备配置文件Properties props =newProperties();//2、指定kafka集群主机名和端口号//props.put("zookeeper.connect", "localhost:2181");
props.put("bootstrap.servers","kafka1:9092,kafka2:9093,kafka3:9094");//3、指定消费者组id,在同一时刻同一消费组中只有一个线程可以//去消费一个分区消息,不同的消费组可以去消费同一个分区消息
props.put("group.id","consumer");//4、自动提交偏移量
props.put("enable.auto.commit","true");//5、自动提交时间间隔,每秒提交一次
props.put("auto.commit.interval.ms","1000");
props.put("auto.offset.reset","earliest");
props.put("client.id","zy_client_id");
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String,String> kafkaConsumer =newKafkaConsumer<String,String>(props);//6、订阅消息,这里的topic可以是多个
kafkaConsumer.subscribe(Arrays.asList("ysh"));AtomicInteger count =newAtomicInteger(0);// 原子整数用于统计交易笔数之和AtomicLong totalAmount =newAtomicLong(0);// 原子长整型用于统计交易总量之和//System.out.printf("yse"); //7、获取消息long startTime =System.currentTimeMillis();while(true){//每隔10s拉取一次ConsumerRecords<String,String> records = kafkaConsumer.poll(100);if(records.isEmpty()){// 如果 records 为空,则跳过当前循环continue;}for(ConsumerRecord<String,String> record : records){System.out.printf("value=%s%n", record.value());String value = record.value();// 获取消息值(交易总量)String[] values = value.split(",");// 使用逗号分隔值(假设格式为交易笔数,交易总量)int tradeCount =1;// 解析交易笔数(关键字)为整数并加到总和中(假设第一列是交易笔数)long tradeAmount =Long.parseLong(values[4]);// 解析交易总量(值)为长整数并加到总和中(假设第二列是交易总量)
totalAmount.addAndGet(tradeAmount);//
count.addAndGet(tradeCount);}long endTime =System.currentTimeMillis();System.out.printf("tradeCount=%d,totalAmount=%d%n",count.get(),totalAmount.get());System.out.printf("total_time=%d ms %n",endTime-startTime);}}}
先运行消费者程序,再运行生产者程序结果如下:
测试完毕,下面进行参数调优和结果比较。
这里以缓存区内存大小为例:
下面是内存大小为335544b时的运行结果,花费时间为3966ms
下图为缓存大小为33554b时的运行结果,花费时间为3952ms
需要注意的是在缓存大小一定的情况下,花费时间也不是固定的,还收网络速度等因素的影响,下图为缓存大小为33554b时花费时间为3774,较上图速度明显减小,但缓存大小未变。
9.总结
在window上搭建kafka集群并用java API 的过程中,因为对很多知识点的不了解,导致过程之中发生了很多意外,比如如何使用window desktop,如何在windows上面搭建docker,docker如何搭建kafka集群,如何配置网络连接,搭建好kafka后如何创建主题,如何查看主题,如何运行生产者程序,如何运行消费者程序,idea如何连接容器内的kafka集群,idea无法连接容器kafka集群,消息遗漏等一系列问题。在搭建kafka集群的过程中,虽然遇到了很多问题,但也让我学到了很多,包括kafka和docker的一些常用命令,kafka API,消息遗漏,网络连接通信的知识。
版权归原作者 啊华的程序人生 所有, 如有侵权,请联系我们删除。