0


【从面试题看源码】-看完Kafka性能优化-让你吊打面试官

ProducerConfig和ConsumerConfig释义

Kafka客户端开发中有一个ProducerConfig和ConsumerConfig,熟悉这两个文件内容的含义对我们使用,调优Kafka是非常有帮助的。Ctrl+F搜索吧。

生产者配置参数释义
1.bootstrap.servers
指定Kafka集群所需的broker地址清单,默认”“

2.metadata.max.age.ms
强制刷新元数据时间,毫秒,默认300000,5分钟

3.batch.size
指定ProducerBatch内存区域的大小,默认16kb

4.acks
指定分区中必须有多少个副本收到这条消息,才算消息发送成功,默认值1,字符串类型

5.linger.ms
指定ProducerBatch在延迟多少毫秒后再发送,但如果在延迟的这段时间内batch的大小已经到了batch.size设置的大小,那么消息会被立即发送,不会再等待,默认值0

6.client.id
用户设定,用于跟踪记录消息,默认”“

7.send.buffer.bytes
Socket发送缓冲区大小,默认128kb,-1将使用操作系统的设置

8.receive.buffer.bytes
Socket接收缓冲区大小,默认32kb,-1将使用操作系统的设置

9.max.request.size
限制生产者客户端发送消息的最大值,默认1MB

10.reconnect.backoff.ms
连接失败后,尝试连接Kafka的时间间隔,默认50ms

11.reconnect.backoff.max.ms
尝试连接到Kafka,生产者客户端等待的最大时间,默认1000ms

12.max.block.ms
控制生产者客户端send()方法和partitionsFor()方法的阻塞时间。当生产者的发送缓存区已满,或者没有可用元数据时,这些方法就会阻塞,默认60s

13.buffer.memory
生产者客户端中用于缓存消息的缓存区大小,默认32MB

14.retry.backoff.ms
消息发送失败重试时间间隔,默认100ms

15.compression.type
指定消息的压缩方式,默认不压缩

16.metrics.sample.window.ms
样本计算时间窗口,默认30000ms

17.metrics.num.samples
用于维护metrics的样本数量,默认2

18.metrics.log.level
metrics日志记录级别,默认info

19.metric.reporters
类的列表,用于衡量指标,默认空list

20.max.in.flight.requests.per.connection
可以在一个connection中发送多个请求,叫作一个flight,这样可以减少开销,但是如果产生错误,可能会造成数据的发送顺序改变,默认5

21.retries
消息发送失败重试次数,默认0

22.key.serializer
key的序列化方式

23.value.serializer
value序列化类方式

24.connections.max.idle.ms
设置多久之后关闭空闲连接,默认540000ms

25.partitioner.class
分区类,实现Partitioner接口,可以自定义分区规则

26.request.timeout.ms
客户端将等待请求的响应的最大时间,如果在这个时间内没有收到响应,客户端将重发请求,超过重试次数将抛异常,默认30000ms

27.interceptor.classes
拦截器类,实现ProducerInterceptor接口,自定义拦截器

28.enable.idempotence
true为开启幂等性

29.transaction.timeout.ms
事务超时时间,默认60000ms

30.transactional.id
设置事务id,必须唯一

消费者配置参数释义
1.group.id
消费者所属消费组的唯一标识

2.max.poll.records
一次拉取请求的最大消息数,默认500条

3.max.poll.interval.ms
指定拉取消息线程最长空闲时间,默认300000ms

4.session.timeout.ms
检测消费者是否失效的超时时间,默认10000ms

5.heartbeat.interval.ms
消费者心跳时间,默认3000ms

6.bootstrap.servers
连接集群broker地址

7.enable.auto.commit
是否开启自动提交消费位移的功能,默认true

8.auto.commit.interval.ms
自动提交消费位移的时间间隔,默认5000ms

9.partition.assignment.strategy
消费者的分区配置策略

10.auto.offset.reset
如果分区没有初始偏移量,或者当前偏移量服务器上不存在时,将使用的偏移量设置,earliest从头开始消费,latest从最近的开始消费,none抛出异常

11.fetch.min.bytes
消费者客户端一次请求从Kafka拉取消息的最小数据量,如果Kafka返回的数据量小于该值,会一直等待,直到满足这个配置大小,默认1b

12.fetch.max.bytes
消费者客户端一次请求从Kafka拉取消息的最大数据量,默认50MB

13.fetch.max.wait.ms
从Kafka拉取消息时,在不满足fetch.min.bytes条件时,等待的最大时间,默认500ms

14.metadata.max.age.ms
强制刷新元数据时间,毫秒,默认300000,5分钟

15.max.partition.fetch.bytes
设置从每个分区里返回给消费者的最大数据量,区别于fetch.max.bytes,默认1MB

16.send.buffer.bytes
Socket发送缓冲区大小,默认128kb,-1将使用操作系统的设置

17.receive.buffer.bytes
Socket发送缓冲区大小,默认64kb,-1将使用操作系统的设置

18.client.id
消费者客户端的id

19.reconnect.backoff.ms
连接失败后,尝试连接Kafka的时间间隔,默认50ms

20.reconnect.backoff.max.ms
尝试连接到Kafka,生产者客户端等待的最大时间,默认1000ms

21.retry.backoff.ms
消息发送失败重试时间间隔,默认100ms

22.metrics.sample.window.ms
样本计算时间窗口,默认30000ms

23.metrics.num.samples
用于维护metrics的样本数量,默认2

24.metrics.log.level
metrics日志记录级别,默认info

25.metric.reporters
类的列表,用于衡量指标,默认空list

26.check.crcs
自动检查CRC32记录的消耗

27.key.deserializer
key反序列化方式

28.value.deserializer
value反序列化方式

29.connections.max.idle.ms
设置多久之后关闭空闲连接,默认540000ms

30.request.timeout.ms
客户端将等待请求的响应的最大时间,如果在这个时间内没有收到响应,客户端将重发请求,超过重试次数将抛异常,默认30000ms

31.default.api.timeout.ms
设置消费者api超时时间,默认60000ms

32.interceptor.classes
自定义拦截器

33.exclude.internal.topics
内部的主题:一consumer_offsets 和一transaction_state。该参数用来指定 Kafka 中的内部主题是否可以向消费者公开,默认值为 true。如果设置为 true,那么只能使用 subscribe(Collection)的方式而不能使用 subscribe(Pattern)的方式来订阅内部主题,设置为 false 则没有这个限制。

34.internal.leave.group.on.close

35.isolation.level
用来配置消费者的事务隔离级别。如果设置为“read committed”,那么消费者就会忽略事务未提交的消息,即只能消费到 LSO (LastStableOffset)的位置,默认情况下为 “read_uncommitted”,即可以消 费到 HW (High Watermark)处的位置

Kafka服务端脚本详解(1)-topics

脚本名称脚本用途kafka-topics.shtopic管理脚本connect-distributed.sh连接分布式模式脚本connect-standalone.sh连接单机模式脚本

kafka-topics.sh

--partitions

创建或修改主题的分区数

--replication-factor

副本因子,副本数量

--replica-assignment

手动指定分区副本分配方案,使用该参数,不用指定--partitions 和 --replication-factor

--topic

主题名称

--zookeeper

连接kafka zk地址

--alter

修改分区,副本,配置

--bootstrap-server

kafka服务器地址

--create

创建主题

--delete

删除主题

--list

列出所有的可用主题

 [[email protected] kafka_2]*# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --list*
 __consumer_offsets
 first
 test
 topic-3
 topic-4
 topic-5
 topic-6
 topic-admin
 topic-create-diff
 topic-two

--describe

列出主题的详细信息

--exclude-internal

使用--list --describe 命令时是否列出内部主题,默认列出内部主题

--command-config

以配置文件的形式修改Admin Client的配置,支持的配置见org.apache.kafka.clients.admin.AdminClientConfig

*//me.properties*
request.timeout.ms=200000

*//*
bin/kafka-topics.sh --bootstrap-server  10.211.55.3:9092 --topic topic-two --list  --command-config config/me.properties 

--config

在创建/修改主题的时候可以对主题默认参数进行覆盖,具体支持的参数见http://kafka.apachecn.org/documentation.html#topicconfigs
该参数将在以后废弃,请使用kafka-configs.sh

 [[email protected] kafka_2.11-2.2.0]# bin/kafka-topics.sh --bootstrap-server  10.211.55.3:9092 --topic topic-two --describe
Topic:topic-two PartitionCount:1        ReplicationFactor:1     Configs:segment.bytes=1073741824,retention.bytes=1073741824
Topic: topic-two        Partition: 0    Leader: 0       Replicas: 0     Isr: 0

 [[email protected] kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper  10.211.55.3:2181 --alter --topic topic-two --config segment.bytes=1048577
 WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
         Going forward, please use kafka-configs.sh for this functionality
 Updated config for topic topic-two.
 
[[email protected] kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper  10.211.55.3:2181 --describe --topic topic-two
Topic:topic-two PartitionCount:1        ReplicationFactor:1     Configs:segment.bytes=1048577
Topic: topic-two        Partition: 0    Leader: 0       Replicas: 0     Isr: 0

----delete-config

删除一个配置项

1[[email protected] kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --topic topic-two --alter --delete-config segment.bytes 
2WARNING: Altering topic configuration from this script has been deprecated and may be removed in future releases.
3         Going forward, please use kafka-configs.sh for this functionality
4Updated config for topic topic-two.
5
6[[email protected] kafka_2.11-2.2.0]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --topic topic-two --describe
7Topic:topic-two PartitionCount:1        ReplicationFactor:1     Configs:
8        Topic: topic-two        Partition: 0    Leader: 0       Replicas: 0     Isr: 0

--disable-rack-aware

忽略机架信息
有两个broker,一个配了机架信息,另一个没配,在创建topic的时候就会报错

 1[[email protected] kafka_2.11-2.2.0]*# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --create --topic topic-6 --replication-factor 1  --partitions 2*
 2Error while executing topic command : Not all brokers have rack information. Add --disable-rack-aware in command line to make replica assignment without rack information.
 3[2018-12-27 05:22:40,834] ERROR kafka.admin.AdminOperationException: Not all brokers have rack information. Add --disable-rack-aware in command line to make replica assignment without rack information.
 4        at kafka.zk.AdminZkClient.getBrokerMetadatas(AdminZkClient.scala:71)
 5        at kafka.zk.AdminZkClient.createTopic(AdminZkClient.scala:54)
 6        at kafka.admin.TopicCommand$ZookeeperTopicService.createTopic(TopicCommand.scala:274)
 7        at kafka.admin.TopicCommand$TopicService$class.createTopic(TopicCommand.scala:134)
 8        at kafka.admin.TopicCommand$ZookeeperTopicService.createTopic(TopicCommand.scala:266)
 9        at kafka.admin.TopicCommand$.main(TopicCommand.scala:60)
10        at kafka.admin.TopicCommand.main(TopicCommand.scala)
11 (kafka.admin.TopicCommand$)
12
13[[email protected] kafka_2.11-2.2.0]*# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --create --topic topic-6 --replication-factor 1  --partitions 2 --disable-rack-aware*
14Created topic topic-6.

--if-exists

只有当主题存在时,相关命令才会执行,不会显示错误

 1[[email protected] kafka_2]*# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --topic topic-7  --alter --config segment.bytes=104857 --if-exists*
 2
 3[[email protected] kafka_2]*# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --topic topic-7  --alter --config segment.bytes=104857*
 4Error while executing topic command : Topics in [] does not exist
 5[2018-12-27 06:01:25,638] ERROR java.lang.IllegalArgumentException: Topics in [] does not exist
 6        at kafka.admin.TopicCommand$.kafka$admin$TopicCommand$$ensureTopicExists(TopicCommand.scala:416)
 7        at kafka.admin.TopicCommand$ZookeeperTopicService.alterTopic(TopicCommand.scala:294)
 8        at kafka.admin.TopicCommand$.main(TopicCommand.scala:62)
 9        at kafka.admin.TopicCommand.main(TopicCommand.scala)
10 (kafka.admin.TopicCommand$)

--if-not-exists

创建主题的时候,只有当主题不存在时,命令才执行,存在时不会报错

1[[email protected] kafka_2]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --topic topic-6  --create --partitions 1 --replication-factor 1 --if-not-exists
2
3[[email protected] kafka_2]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --topic topic-6  --create --partitions 1 --replication-factor 1 
4Error while executing topic command : Topic 'topic-6' already exists.
5[2018-12-27 06:07:54,185] ERROR org.apache.kafka.common.errors.TopicExistsException: Topic 'topic-6' already exists.
6 (kafka.admin.TopicCommand$)

--topics-with-overrides

显示覆盖过配置的主题

--unavailable-partitions

查看没有leader副本的分区

1[[email protected] kafka_2]# bin/kafka-topics.sh --zookeeper 10.211.55.3:2181 --topic topic-6  --describe --unavailable-partitions
2        Topic: topic-6  Partition: 0    Leader: -1      Replicas: 1     Isr: 1

--under-replicated-partitions

查看所有包含失效副本的分区


connect-distributed.sh & connect-standalone.sh

Kafka Connect 是一款可扩展并且可靠的在 Apache Kafka 和其他系统之间进行数据传输的工具。

1bin/connect-standalone.sh config/connect-standalone.properties  config/connect-file-source.properties
2
3bin/connect-distributed.sh config/connect-distributed.properties

Kafka服务端脚本详解(2)一log,verifiable

脚本名称脚本用途kafka-log-dirs.sh查看指定broker上日志目录使用情况kafka-verifiable-consumer.sh检验kafka消费者kafka-verifiable-producer.sh检验kafka生产者

kafka-log-dirs.sh

--bootstrap-server

kafka地址

--broker-list

要查询的broker地址列表,broker之间逗号隔开,不配置该命令则查询所有broker

--topic-list

指定查询的topic列表,逗号隔开

--command-config

配置Admin Client

--describe

显示详情

1[[email protected] kafka_2.11-2.2.0]# bin/kafka-log-dirs.sh --bootstrap-server 10.211.55.3:9092 --describe --broker-list 0 --topic-list first,topic-3
2Querying brokers for log directories information
3Received log directory information from brokers 0
4{"version":1,"brokers":[{"broker":0,"logDirs":[{"logDir":"/tmp/kafka-logs","error":null,"partitions":[{"partition":"topic-3-0","size":474,"offsetLag":0,"isFuture":false},{"partition":"first-0","size":310,"offsetLag":0,"isFuture":false}]}]}]}

kafka-verifiable-consumer.sh

--broker-list

broker列表, HOST1:PORT1,HOST2:PORT2,…

--topic

要消费的topic

--group-id

消费组id

--max-messages

最大消费消息数量,默认-1,一直消费

 1#设置消费两次后,自动停止
 2[[email protected] kafka_2.11-2.2.0]# bin/kafka-verifiable-consumer.sh --broker-list 10.211.55.3:9092 --topic first --group-id group.demo --max-messages 2
 3{"timestamp":1558869583036,"name":"startup_complete"}
 4{"timestamp":1558869583329,"name":"partitions_revoked","partitions":[]}
 5{"timestamp":1558869583366,"name":"partitions_assigned","partitions":[{"topic":"first","partition":0}]}
 6{"timestamp":1558869590352,"name":"records_consumed","count":1,"partitions":[{"topic":"first","partition":0,"count":1,"minOffset":37,"maxOffset":37}]}
 7{"timestamp":1558869590366,"name":"offsets_committed","offsets":[{"topic":"first","partition":0,"offset":38}],"success":true}
 8{"timestamp":1558869595328,"name":"records_consumed","count":1,"partitions":[{"topic":"first","partition":0,"count":1,"minOffset":38,"maxOffset":38}]}
 9{"timestamp":1558869595335,"name":"offsets_committed","offsets":[{"topic":"first","partition":0,"offset":39}],"success":true}
10{"timestamp":1558869595355,"name":"shutdown_complete"}

--session-timeout

消费者会话超时时间,默认30000ms,服务端如果在该时间内没有接收到消费者的心跳,就会将该消费者从消费组中删除

--enable-autocommit

自动提交,默认false

 1#比较一下两者的差别
 2#没有--enable-autocommit
 3[[email protected] kafka_2.11-2.2.0]# bin/kafka-verifiable-consumer.sh --broker-list 10.211.55.3:9092 --topic first --group-id group.demo
 4{"timestamp":1558875063613,"name":"startup_complete"}
 5{"timestamp":1558875063922,"name":"partitions_revoked","partitions":[]}
 6{"timestamp":1558875063952,"name":"partitions_assigned","partitions":[{"topic":"first","partition":0}]}
 7{"timestamp":1558875069603,"name":"records_consumed","count":1,"partitions":[{"topic":"first","partition":0,"count":1,"minOffset":47,"maxOffset":47}]}
 8{"timestamp":1558875069614,"name":"offsets_committed","offsets":[{"topic":"first","partition":0,"offset":48}],"success":true}
 9
10#有--enable-autocommit
11[[email protected] kafka_2.11-2.2.0]# bin/kafka-verifiable-consumer.sh --broker-list 10.211.55.3:9092 --topic first --group-id group.demo --enable-autocommit
12{"timestamp":1558874772119,"name":"startup_complete"}
13{"timestamp":1558874772408,"name":"partitions_revoked","partitions":[]}
14{"timestamp":1558874772449,"name":"partitions_assigned","partitions":[{"topic":"first","partition":0}]}
15{"timestamp":1558874820898,"name":"records_consumed","count":1,"partitions":[{"topic":"first","partition":0,"count":1,"minOffset":46,"maxOffset":46}]}

--reset-policy

设置消费偏移量,earliest从头开始消费,latest从最近的开始消费,none抛出异常,默认earliest

--assignment-strategy

消费者的分区配置策略, 默认 RangeAssignor

--consumer.config

配置文件


kafka-verifiable-producer.sh

该脚本可以生产测试数据发送到指定topic,并将数据已json格式打印到控制台

--topic

主题名称

--broker-list

broker列表, HOST1:PORT1,HOST2:PORT2,…

--max-messages

最大消息数量,默认-1,一直生产消息

--throughput

设置吞吐量,默认-1

--acks

指定分区中必须有多少个副本收到这条消息,才算消息发送成功,默认-1

--producer.config

配置文件

--message-create-time

设置消息创建的时间,时间戳

--value-prefix

设置消息前缀

--repeating-keys

key从0开始,每次递增1,直到指定的值,然后再从0开始

 1[[email protected] kafka_2.11-2.2.0]# bin/kafka-verifiable-producer.sh --broker-list 10.211.55.3:9092 --topic first --message-create-time 1527351382000 --value-prefix 1 --repeating-keys 10 --max-messages 20
 2{"timestamp":1558877565069,"name":"startup_complete"}
 3{"timestamp":1558877565231,"name":"producer_send_success","key":"0","value":"1.0","topic":"first","partition":0,"offset":1541118}
 4{"timestamp":1558877565238,"name":"producer_send_success","key":"1","value":"1.1","topic":"first","partition":0,"offset":1541119}
 5{"timestamp":1558877565238,"name":"producer_send_success","key":"2","value":"1.2","topic":"first","partition":0,"offset":1541120}
 6{"timestamp":1558877565238,"name":"producer_send_success","key":"3","value":"1.3","topic":"first","partition":0,"offset":1541121}
 7{"timestamp":1558877565238,"name":"producer_send_success","key":"4","value":"1.4","topic":"first","partition":0,"offset":1541122}
 8{"timestamp":1558877565239,"name":"producer_send_success","key":"5","value":"1.5","topic":"first","partition":0,"offset":1541123}
 9{"timestamp":1558877565239,"name":"producer_send_success","key":"6","value":"1.6","topic":"first","partition":0,"offset":1541124}
10{"timestamp":1558877565239,"name":"producer_send_success","key":"7","value":"1.7","topic":"first","partition":0,"offset":1541125}
11{"timestamp":1558877565239,"name":"producer_send_success","key":"8","value":"1.8","topic":"first","partition":0,"offset":1541126}
12{"timestamp":1558877565239,"name":"producer_send_success","key":"9","value":"1.9","topic":"first","partition":0,"offset":1541127}
13{"timestamp":1558877565239,"name":"producer_send_success","key":"0","value":"1.10","topic":"first","partition":0,"offset":1541128}
14{"timestamp":1558877565239,"name":"producer_send_success","key":"1","value":"1.11","topic":"first","partition":0,"offset":1541129}
15{"timestamp":1558877565239,"name":"producer_send_success","key":"2","value":"1.12","topic":"first","partition":0,"offset":1541130}
16{"timestamp":1558877565240,"name":"producer_send_success","key":"3","value":"1.13","topic":"first","partition":0,"offset":1541131}
17{"timestamp":1558877565240,"name":"producer_send_success","key":"4","value":"1.14","topic":"first","partition":0,"offset":1541132}
18{"timestamp":1558877565241,"name":"producer_send_success","key":"5","value":"1.15","topic":"first","partition":0,"offset":1541133}
19{"timestamp":1558877565244,"name":"producer_send_success","key":"6","value":"1.16","topic":"first","partition":0,"offset":1541134}
20{"timestamp":1558877565244,"name":"producer_send_success","key":"7","value":"1.17","topic":"first","partition":0,"offset":1541135}
21{"timestamp":1558877565244,"name":"producer_send_success","key":"8","value":"1.18","topic":"first","partition":0,"offset":1541136}
22{"timestamp":1558877565244,"name":"producer_send_success","key":"9","value":"1.19","topic":"first","partition":0,"offset":1541137}
23{"timestamp":1558877565262,"name":"shutdown_complete"}
24{"timestamp":1558877565263,"name":"tool_data","sent":20,"acked":20,"target_throughput":-1,"avg_throughput":100.50251256281408}

Kafka服务端脚本详解(3)-性能测试脚本

脚本名称脚本用途kafka-producer-perf-test.shkafka 生产者性能测试脚本kafka-consumer-perf-test.shkafka 消费者性能测试脚本kafka-console-producer.shkafka 生产者控制台kafka-console-consumer.shkafka 消费者控制台

kafka-producer-perf-test.sh

kafka 生产者性能测试脚本

--topic
消息主题名称
----num-records
需要生产的消息数量
--payload-delimiter
指定 --payload-file 文件的分隔符,默认为换行符 \n
--throughput
设置消息吞吐量,messages/sec
--producer-props
发送端配置信息,配置信息优先于 --producer.config
--producer.config
发送端配置文件
--print-metrics
是否打印测试指标,默认 false
--transactional-id
用于测试并发事务的性能 (默认值:performance-producer-default-transactional-id)
--transaction-duration-ms
事务时间最大值,超过这个值就提交事务,只有 > 0 时才生效
--record-size
每条消息字节数
--payload-file
测试数据文件

测试 10w 条数据,每条数据 1000 字节,每秒发送 2000 条数据

[[email protected] kafka_2.11-2.2.0]# bin/kafka-producer-perf-test.sh --producer-props bootstrap.servers=10.211.55.3:9092 --topic first --record-size 1000 --num-records 100000  --throughput 2000
9999 records sent, 1999.8 records/sec (1.91 MB/sec), 8.6 ms avg latency, 406.0 ms max latency.
10007 records sent, 2001.4 records/sec (1.91 MB/sec), 0.7 ms avg latency, 8.0 ms max latency.
10002 records sent, 2000.4 records/sec (1.91 MB/sec), 0.7 ms avg latency, 10.0 ms max latency.
10000 records sent, 2000.0 records/sec (1.91 MB/sec), 0.8 ms avg latency, 37.0 ms max latency.
10008 records sent, 2001.2 records/sec (1.91 MB/sec), 0.6 ms avg latency, 7.0 ms max latency.
10004 records sent, 2000.4 records/sec (1.91 MB/sec), 0.7 ms avg latency, 5.0 ms max latency.
10000 records sent, 2000.0 records/sec (1.91 MB/sec), 0.8 ms avg latency, 35.0 ms max latency.
10004 records sent, 2000.8 records/sec (1.91 MB/sec), 0.8 ms avg latency, 33.0 ms max latency.
10004 records sent, 2000.4 records/sec (1.91 MB/sec), 0.7 ms avg latency, 5.0 ms max latency.
100000 records sent, 1999.280259 records/sec (1.91 MB/sec), 1.50 ms avg latency, 406.00 ms max latency, 1 ms 50th, 2 ms 95th, 43 ms 99th, 91 ms 99.9th.

测试结果为:每秒发送 1.91MB 数据,平均延迟 1.5ms,最大延迟 406ms, 延迟小于 1ms 占 50%,小于 2ms 占 95%...


kafka-consumer-perf-test.sh

kafka 消费者性能测试脚本
--topic
消费的主题名称
--broker-list
kafka 地址
--consumer.config
消费端配置文件
--date-format
格式化时间
--fetch-size
一次请求拉取的消息大小,默认 1048576 字节
--from-latest
如果消费者还没有已建立的偏移量,就从日志中的最新消息开始,而不是最早的消息
--group
消费者组 id,默认 perf-consumer-94851
--hide-header
如果设置,就跳过打印统计信息的标题
--messages
要获取的消息数量
--num-fetch-threads
获取消息的线程数量
--print-metrics
打印指标信息
--reporting-interval
打印进度信息的间隔,默认 5000ms
--show-detailed-stats
如果设置,将按 --reporting-interval 的间隔打印统计信息
--socket-buffer-size
TCP 获取信息的缓存大小 默认 2097152(2M)
--threads
处理线程数,默认 10
--timeout
返回记录的超时时间

测试消费 50w 条数据

[[email protected] kafka_2.11-2.2.0]# bin/kafka-consumer-perf-test.sh --topic first --broker-list 10.211.55.3:9092 --messages 500000  --timeout 300000
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2019-05-30 01:21:27:072, 2019-05-30 01:21:30:801, 488.6162, 131.0314, 500343, 134176.1866, 25, 3704, 131.9158, 135081.8035

测试结果为:共消费 488.6162MB 数据,每秒消费 131.0314MB, 共消费 500343 条数据,每秒消费 134176.1866 条

Kafka生产者端优化

测试环境虚拟机
CPU:2 核
RAM:2G
Kafka Topic 为 1 分区,1 副本

Kafka 生产者端发送延迟优化

batch.size

batch.size 单位为字节,为了方便这里都表示为kb

默认配置,batch.size=16kb

[[email protected] kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic first  --record-size 1024 --num-records 1000000  --throughput 50000
249892 records sent, 49978.4 records/sec (48.81 MB/sec), 153.6 ms avg latency, 537.0 ms max latency.
250193 records sent, 50038.6 records/sec (48.87 MB/sec), 1.4 ms avg latency, 12.0 ms max latency.
211747 records sent, 42349.4 records/sec (41.36 MB/sec), 194.3 ms avg latency, 1106.0 ms max latency.
1000000 records sent, 49972.515117 records/sec (48.80 MB/sec), 119.65 ms avg latency, 1106.00 ms max latency, 2 ms 50th, 488 ms 95th, 1043 ms 99th, 1102 ms 99.9th.

结果显示平均延迟有 456.94 ms,最高延迟 5308.00 ms
现在我要降低最高延迟数,batch.size 的意思是 ProducerBatch 的内存区域充满后,消息就会被立即发送,那我们把值改小看看
batch.size=8kb

[[email protected] kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic first  --record-size 1024 --num-records 1000000  --throughput 50000
148553 records sent, 29710.6 records/sec (29.01 MB/sec), 812.4 ms avg latency, 1032.0 ms max latency.
195468 records sent, 39093.6 records/sec (38.18 MB/sec), 735.9 ms avg latency, 907.0 ms max latency.
189700 records sent, 37940.0 records/sec (37.05 MB/sec), 763.4 ms avg latency, 1053.0 ms max latency.
208418 records sent, 41683.6 records/sec (40.71 MB/sec), 689.7 ms avg latency, 923.0 ms max latency.
196504 records sent, 39300.8 records/sec (38.38 MB/sec), 718.1 ms avg latency, 1056.0 ms max latency.
1000000 records sent, 37608.123355 records/sec (36.73 MB/sec), 741.56 ms avg latency, 1056.00 ms max latency, 725 ms 50th, 937 ms 95th, 1029 ms 99th, 1051 ms 99.9th.

但经过测试发现,延迟反而很高,连设定的 50000 吞吐量都达不到,原因应该是这样:batch.size 小了,消息很快就会充满,这样消息就会被立即发送的服务端,但这样的话发送的次数就变多了,但由于网络原因是不可控的,有时候网络发生抖动就会造成较高的延迟
那就改大看看。
batch.size=32kb

[[email protected] kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic first  --record-size 1024 --num-records 1000000  --throughput 50000
249852 records sent, 49970.4 records/sec (48.80 MB/sec), 88.8 ms avg latency, 492.0 ms max latency.
250143 records sent, 50028.6 records/sec (48.86 MB/sec), 1.2 ms avg latency, 15.0 ms max latency.
250007 records sent, 49991.4 records/sec (48.82 MB/sec), 1.2 ms avg latency, 17.0 ms max latency.
1000000 records sent, 49952.545082 records/sec (48.78 MB/sec), 31.07 ms avg latency, 492.00 ms max latency, 1 ms 50th, 305 ms 95th, 440 ms 99th, 486 ms 99.9th.

测试后,平均延迟,最高延迟都降下来很多,而且比默认值延迟都要小很多,那再改大延迟还会降低吗
batch.size=50kb

[[email protected] kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic first  --record-size 1024 --num-records 1000000  --throughput 50000
249902 records sent, 49970.4 records/sec (48.80 MB/sec), 27.3 ms avg latency, 219.0 ms max latency.
250200 records sent, 50030.0 records/sec (48.86 MB/sec), 1.2 ms avg latency, 8.0 ms max latency.
250098 records sent, 50019.6 records/sec (48.85 MB/sec), 18.6 ms avg latency, 288.0 ms max latency.
242327 records sent, 48407.3 records/sec (47.27 MB/sec), 121.3 ms avg latency, 920.0 ms max latency.
1000000 records sent, 49823.127896 records/sec (48.66 MB/sec), 41.98 ms avg latency, 920.00 ms max latency, 1 ms 50th, 221 ms 95th, 792 ms 99th, 910 ms 99.9th.

如上测试在不同的机器上结果会有不同,但总体的变化曲线是一样的,成 U 型变化

batch.size 代码实现

Kafka 客户端有一个 RecordAccumulator 类,叫做消息记录池,内部有一个 BufferPool 内存区域

RecordAccumulator(LogContext logContext,
                             int batchSize,
                             CompressionType compression,
                             int lingerMs,
                             long retryBackoffMs,
                             int deliveryTimeoutMs,
                             Metrics metrics,
                             String metricGrpName,
                             Time time,
                             ApiVersions apiVersions,
                             TransactionManager transactionManager,
                             BufferPool bufferPool)

当该判断为 true,消息就会被发送

if (result.batchIsFull || result.newBatchCreated) {
   log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
   this.sender.wakeup();
}

max.in.flight.requests.per.connection

该参数可以在一个 connection 中发送多个请求,叫作一个 flight, 这样可以减少开销,但是如果产生错误,可能会造成数据的发送顺序改变,默认 5

在 batch.size=100kb 的基础上,增加该参数值到 10,看看效果

[[email protected] kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic two   --record-size 1024 --num-records 1000000  --throughput 50000
249902 records sent, 49960.4 records/sec (48.79 MB/sec), 16.1 ms avg latency, 185.0 ms max latency.
250148 records sent, 50019.6 records/sec (48.85 MB/sec), 1.3 ms avg latency, 14.0 ms max latency.
239585 records sent, 47917.0 records/sec (46.79 MB/sec), 6.4 ms avg latency, 226.0 ms max latency.
1000000 records sent, 49960.031974 records/sec (48.79 MB/sec), 9.83 ms avg latency, 226.00 ms max latency, 1 ms 50th, 83 ms 95th, 182 ms 99th, 219 ms 99.9th.

多次测试结果延迟都比原来降低了 10 倍多,效果还是很明显的
但物极必反,如果你再调大后,效果就不明显了,最终延迟反而变高,这个 batch.size 道理是一样的

compression.type

指定消息的压缩方式,默认不压缩

在原来 batch.size=100kb,max.in.flight.requests.per.connection=10 的基础上,设置 compression.type=gzip 看看延迟是否还可以降低

[[email protected] kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic two   --record-size 1024 --num-records 1000000  --throughput 50000
249785 records sent, 49957.0 records/sec (48.79 MB/sec), 2.5 ms avg latency, 199.0 ms max latency.
250091 records sent, 50008.2 records/sec (48.84 MB/sec), 1.9 ms avg latency, 17.0 ms max latency.
250123 records sent, 50024.6 records/sec (48.85 MB/sec), 1.5 ms avg latency, 18.0 ms max latency.
1000000 records sent, 49960.031974 records/sec (48.79 MB/sec), 1.89 ms avg latency, 199.00 ms max latency, 2 ms 50th, 4 ms 95th, 6 ms 99th, 18 ms 99.9th.

测试结果发现延迟又降低了,是不是感觉很强大😁

acks

指定分区中必须有多少个副本收到这条消息,才算消息发送成功,默认值 1
如果配置 acks=0 还能降低一点点延迟,就是不等待 broker 返回是否成功,发出去就完了

[[email protected] kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic two   --record-size 1024 --num-records 1000000  --throughput 50000
249919 records sent, 49963.8 records/sec (48.79 MB/sec), 1.4 ms avg latency, 179.0 ms max latency.
250157 records sent, 50021.4 records/sec (48.85 MB/sec), 1.2 ms avg latency, 10.0 ms max latency.
250228 records sent, 50015.6 records/sec (48.84 MB/sec), 0.9 ms avg latency, 8.0 ms max latency.
1000000 records sent, 49967.521111 records/sec (48.80 MB/sec), 1.09 ms avg latency, 179.00 ms max latency, 1 ms 50th, 3 ms 95th, 4 ms 99th, 6 ms 99.9th.

通过测试上面几个参数,如果只配置其中一个,compression.type=gzip 效果是最好的

[[email protected] kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic two   --record-size 1024 --num-records 1000000  --throughput 50000
249882 records sent, 49956.4 records/sec (48.79 MB/sec), 11.9 ms avg latency, 191.0 ms max latency.
248708 records sent, 49731.7 records/sec (48.57 MB/sec), 2.9 ms avg latency, 92.0 ms max latency.
251380 records sent, 50276.0 records/sec (49.10 MB/sec), 2.0 ms avg latency, 23.0 ms max latency.
249980 records sent, 49996.0 records/sec (48.82 MB/sec), 1.5 ms avg latency, 18.0 ms max latency.
1000000 records sent, 49960.031974 records/sec (48.79 MB/sec), 4.55 ms avg latency, 191.00 ms max latency, 2 ms 50th, 12 ms 95th, 88 ms 99th, 163 ms 99.9th.

在当前环境下,平均延迟能只有 4.55ms, 最大延迟 191ms

如上测试是在单机1分区,1副本的情况下的,为了能看到效果,延迟只是一个指标,但实际中并不是一味追求某个指标,还需要综合考虑,比如低延迟下,还要提高吞吐量,这就会要牺牲一部分的低延迟。不同的优化点,需要调整不同的参数,具体参数可以见 https://dwz.cn/Sl5L3zoq

另外:
如果 Topic 是多分区,也有显著效果,如果还需要降低延迟,可以再通过如上的参数进行优化

比如在当前环境下,我现在要达到 10w 的吞吐量,默认配置下是达不到的

[[email protected] kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic two    --record-size 1024 --num-records 1000000  --throughput 100000
1 records sent, 0.1 records/sec (0.00 MB/sec), 7194.0 ms avg latency, 7194.0 ms max latency.
91167 records sent, 3306.3 records/sec (3.23 MB/sec), 519.4 ms avg latency, 26096.0 ms max latency.
330075 records sent, 66015.0 records/sec (64.47 MB/sec), 2843.5 ms avg latency, 26106.0 ms max latency.
227535 records sent, 45507.0 records/sec (44.44 MB/sec), 556.2 ms avg latency, 2306.0 ms max latency.
236940 records sent, 38577.0 records/sec (37.67 MB/sec), 522.0 ms avg latency, 3439.0 ms max latency.
1000000 records sent, 18762.078088 records/sec (18.32 MB/sec), 1402.18 ms avg latency, 26106.00 ms max latency, 443 ms 50th, 4018 ms 95th, 26073 ms 99th, 26095 ms 99.9th.

通过这几个配置

batch.size=204800 compression.type=gzip 

就近乎达到了 10w 的吞吐量

C[[email protected] kafka_2.11-2.2.0]# ./bin/kafka-producer-perf-test.sh --producer.config config/me.properties --topic tw   --record-size 1024 --num-records 2000000  --throughput 100000
397998 records sent, 79599.6 records/sec (77.73 MB/sec), 3.4 ms avg latency, 193.0 ms max latency.
489610 records sent, 97922.0 records/sec (95.63 MB/sec), 2.5 ms avg latency, 24.0 ms max latency.
522791 records sent, 104558.2 records/sec (102.11 MB/sec), 1.8 ms avg latency, 29.0 ms max latency.
485255 records sent, 96973.4 records/sec (94.70 MB/sec), 1.8 ms avg latency, 26.0 ms max latency.
2000000 records sent, 94665.593790 records/sec (92.45 MB/sec), 2.31 ms avg latency, 193.00 ms max latency, 2 ms 50th, 5 ms 95th, 12 ms 99th, 23 ms 99.9th.

到这里Kafka的所有配置上的性能优化到此就结束了。

标签: kafka java 分布式

本文转载自: https://blog.csdn.net/weixin_40972073/article/details/125286355
版权归原作者 阿提说说2 所有, 如有侵权,请联系我们删除。

“【从面试题看源码】-看完Kafka性能优化-让你吊打面试官”的评论:

还没有评论