主题日常管理
- 创建topic,版本2.2
bin/kafka-topics.sh --bootstrap-server broker_host:port --create --topic my_topic_name --partitions 1 --replication-factor 1
从2.2版本,使用--bootstrap-server 参数替换 --zookeeper 参数
原因:
- 使用--zookeeper 参数会绕过kafka的安全体系
- 使用 --bootstrap-server 与集群进行交互,越来越成为使用 Kafka 的标准姿势
- 查询主题列表
bin/kafka-topics.sh --bootstrap-server broker_host:port --list
- 查询主题详细信息
bin/kafka-topics.sh --bootstrap-server broker_host:port --describe --topic <topic_name>
- 修改主题分区alert
bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic <topic_name> --partitions <新分区数>
这里指定的分区一定要比原有分区数大,否则会报错InvalidPartitionsException
- 修改主题级别参数 kafka-configs
bin/kafka-configs.sh --zookeeper zookeeper_host:port --entity-type topics --entity-name <topic_name> --alter --add-config max.message.bytes=10485760
- 变更副本数:kafka-reassign-partitions
- 修改主题限速:这里主要是指设置Leader副本和Follower副本使用的带宽
broker端参数:leader.replication.throttled.rate 和 follower.replication.throttled.rate
#--entity-name 就是 Broker ID
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.rate=104857600,follower.replication.throttled.rate=104857600' --entity-type brokers --entity-name 0
# 为该主题设置要限制的副本,*表示所有的副本
bin/kafka-configs.sh --zookeeper zookeeper_host:port --alter --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' --entity-type topics --entity-name test
- 删除主题
bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic <topic_name>
删除操作是异步的,执行完这条命令不代表主题立即就被删除了,它仅仅是被标记成已删除 状态而已,kafka会在后台默默地开启主题删除操作
- 特殊主题的管理与运维
offsets.topic.replication.factor:控制__consumer_offsets的副本数,如果当前运行的broker数量小于offsets.topic.replication.factor的值,kafka会创建主题失败,并显示抛出异常
修改内部主题:假设副本值一开始是1,如何增加到3
- 创建一个json文件,显示提供50个分区对应的副本数
{"version":1, "partitions":[
{"topic":"__consumer_offsets","partition":0,"replicas":[0,1,2]},
{"topic":"__consumer_offsets","partition":1,"replicas":[0,2,1]},
{"topic":"__consumer_offsets","partition":2,"replicas":[1,0,2]},
{"topic":"__consumer_offsets","partition":3,"replicas":[1,2,0]},
...
{"topic":"__consumer_offsets","partition":49,"replicas":[0,1,2]} ]}`
2. 执行kafka-rreassign-partitions脚本
bin/kafka-reassign-partitions.sh --zookeeper zookeeper_host:port --reassignment-json-file reassign.json --execute
通过内部主题查看消费者组提交的位移数据
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --from-beginning
通过内部主题查看消费者组的状态信息
bin/kafka-console-consumer.sh --bootstrap-server kafka_host:port --topic __consumer_offsets --formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" --from-beginning
常见错误
- 主题删除失败
原因:
- 副本所在的broker宕机了,重启对应的Broker之后就能自动恢复
- 待删除主题的部分分区依然在执行迁移过程
- _consumer_offsets占用太多磁盘
使用jstack命令查看kafka-log-cleaner-thread 前缀的线程状态。通常情况下,这都是因为该线程挂掉了,无法及时清理此内部主题,,如果是这个原因导致的,只能重启相应的broker
kafka动态配置
静态参数:在 server.properties中配置的需要重启才能生效的参数称为静态参数
动态参数:修改参数值后,无需重启Broker就能立即生效。1.1版本及以后
kafka官网参数表说明:
- read-only。被标记为 read-only 的参数和原来的参数行为一样,只有重启 Broker,才能令修改生效。
- per-broker。被标记为 per-broker 的参数属于动态参数,修改它之后,只会在对应的 Broker 上生效。
- cluster-wide。被标记为 cluster-wide 的参数也属于动态参数,修改它之后,会在整个集群范围内生效,也就是说,对所有 Broker 都生效。你也可以为具体的 Broker 修改 cluster-wide 参数。
使用场景:
- 动态调整 Broker 端各种线程池大小,实时应对突发流量。
- 动态调整 Broker 端连接信息或安全配置信息。
- 动态更新 SSL Keystore 有效期。
- 动态调整 Broker 端 Compact 操作性能。
- 实时变更 JMX 指标收集器 (JMX Metrics Reporter)。
动态配置如何保存?
kafka将动态broker参数保存在zookeeper中,具体的znode路径为:
- changes用来实时监测动态参数变更,不会保存参数值
- topics用来保存kafka主题级别参数的,不属于动态broker端参数,但是他们也是能够动态变更的
- user和client用于动态调整客户端配额(Quota)的znode节点,配额是指kafka运维人员限制连入集群的客户端的吞吐量或是限定他们使用的CPU资源
- brokers保存动态Broker参数的地方,子节点保存的是cluster-wide范围的动态参数,broker.id保存的是特定broker的per-broker范围参数
优先级:per-broker 参数 > cluster-wide 参数 > static 参数 > Kafka 默认值。
如何配置
#设置cluster-wide 范围的参数
$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --alter --add-config unclean.leader.election.enable=true
#查看
$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --describe
# 删除cluster-wide范围参数
$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-default --alter --delete-config unclean.leader.election.enable
#设置per-broker 范围的参数
$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --alter --add-config unclean.leader.election.enable=false
#查看
$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --describe
# 删除per-broker范围参数
$ bin/kafka-configs.sh --bootstrap-server kafka-host:port --entity-type brokers --entity-name 1 --alter --delete-config unclean.leader.election.enable
# 查看动态Broker参数的方式:
$ bin/kafka-configs.sh --bootstrap-server kafka-host:port
如何重设消费者位移
kafka和传统消息中间件的一个较为显著的区别是,kafka的消费者读取消息是可以重演的,别的中间件处理和响应消息的方式是破坏性的,既一旦被成功处理,就会被从broker上删除,但是kafka是基于日志结构的消息引擎,消费者消费消息,仅仅只是从磁盘上读取数据,并不会删除消息,并且位移数据也是有消费者控制的,所以能够很容易的修改位移的值,实现重复消费历史数据的功能
选举传统中间件还是kafka的依据:
- 如果消息处理逻辑非常复杂,处理代价很高,同事又不关心消息之间的顺序,那么传统的中间间比较合适
- 如果需要较高的吞吐量,但每条消息的处理时间很短,同事又很在意消息的顺序,则选择kafka
重设位移策略:
- 位移维度:把消费者的位移值重设成给定的位移值
- 时间维度:给定一个时间,让消费者把位移调成大于该时间的最小位移,也可以给出时间间隔,让消费者直接将位移调回30分钟之前的位移值
重设消费组位移的方式:
- 通过消费者API来实现
seek,seekToBeginning,seekToEnd
void seek(TopicPartition partition, long offset);
void seek(TopicPartition partition, OffsetAndMetadata offsetAndMetadata);
void seekToBeginning(Collection<TopicPartition> partitions);
void seekToEnd(Collection<TopicPartition> partitions);
Properties consumerProperties = new Properties();
// 要禁用自动提交位移
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList); String topic = "test"; // 要重设位移的Kafka主题
try (final KafkaConsumer<String, String> consumer =
new KafkaConsumer<>(consumerProperties)) {
consumer.subscribe(Collections.singleton(topic));
consumer.poll(0);//要调用带长整型的poll方法
consumer.seekToBeginning(
consumer.partitionsFor(topic).stream().map(partitionInfo ->
new TopicPartition(topic, partitionInfo.partition()))
.collect(Collectors.toList()));
}
- 通过kafka-consumer-groups命令行脚本来实现
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-earliest –execute
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-latest --execute
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-current --execute
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --all-topics --to-offset <offset> --execute
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --shift-by <offset_N> --execute
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute
bin/kafka-consumer-groups.sh --bootstrap-server kafka-host:port --group test-group --reset-offsets --by-duration PT0H30M0S --execute
kafka脚本工具大全
- connect-standalone 和 connect-distributed:kafka connect组件的启动脚本
- kafka-acls:用于设置kafka权限
- kafka-broker-api-versions:验证不同kafka版本之间服务器和客户端的适配性
- kafka-configs
- kafka-console-consumer
$ bin/kafka-console-consumer.sh --bootstrap-server kafka-host:port --topic test-topic --group test-group --from-beginning --consumer-property enable.auto.commit=false
- kafka-console-producer
$ bin/kafka-console-producer.sh --broker-list kafka-host:port --topic test-topic --request-required-acks -1 --producer-property compression.type=lz4
- kafka-consumer-perf-test:生产者的性能测试工具
#1723.243是吞吐量,但是这个脚本没有计算不同分位数下的分布情况
$ bin/kafka-consumer-perf-test.sh --broker-list kafka-host:port --messages 10000000 --topic test-topic
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec,
rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec 2019-06-26 15:24:18:138,
2019-06-26 15:24:23:805, 9765.6202, 1723.2434, 10000000, 1764602.0822, 16, 5651, 1728.1225,
1769598.3012
- kafka-producer-perf-test:消费者的性能测试工具
#向指定命令发送10000000条消息,每条消息1kb大小,producer-props后面指定要设置的上生产者参数,比如本例中的压缩算法和延时时间等
$ bin/kafka-producer-perf-test.sh --topic test-topic --num-records 10000000 --throughput -1 --record-size 1024 --producer-props bootstrap.servers=kafka-host:port acks=-1 linger.ms=2000 compression.type=lz4
2175479 records sent, 435095.8 records/sec (424.90 MB/sec), 131.1 ms avg latency, 681.0 ms max latency.
4190124 records sent, 838024.8 records/sec (818.38 MB/sec), 4.4 ms avg latency, 73.0 ms max latency.
10000000 records sent, 737463.126844 records/sec (720.18 MB/sec), 31.81 ms avg latency, 681.00 ms max latency, 4 ms 50th, 126 ms 95th, 604 ms 99th, 672 ms 99.9th.
#99th分位值是604ms,说明99%的消息的延时都在604ms以内
- kafka-consumer-groups
- kafka-delegation-tokens:管理Delegation Token,基于Delagation认证是一种轻量级的认证机制,补充了现有的SASL认证机制
- kafka-delete-records:用于删除kafka的分区消息,但是鉴于kafka本身又自己的自动消息删除策略,所以这个脚本很少用
- kafka-dump-log:能够查看kafka消息文件的内容,包括消息的各种元数据信息,甚至是消息体本身
#只是指定--files,那么该命令显示的是消息批次和消息集合的元数据信息,比如创建时间,使用的压缩算法,CRC校验值
$ bin/kafka-dump-log.sh --files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log
#深入看每条具体的消息
$ bin/kafka-dump-log.sh --files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log --deep-iteration
#查看消息里面的实际数据
$ bin/kafka-dump-log.sh --files ../data_dir/kafka_1/test-topic-1/00000000000000000000.log --deep-iteration --print-data-log
- kafka-log-dirs:可以帮助查询各个Broker上的各个日志路径的磁盘占用情况
- kafka-mirror-maker:实现kafka集群间的消息同步
- kafka-preferred-replica-election:执行Preferred Leader选举,可以为指定的主题执行“换Leader”的操作
- kafka-reassign-partitions:用于执行分区副本迁移以及副本文件路径迁移
- kafka-topics
- kafka-run-class:可以用这个脚本执行任何带main方法的kafka类,早期实现命令辅助工具,现在已经不会用到了
- kafka-server-start和kafka-server-stop:启动和停止kafka broker进程的
- kafka-streams-application-reset:用来给kafka Stream 应用程序重设位移,以便重新消费数据
- trogdor:kafka的测试框架,用于执行各种基准测试和负载测试
查看主题消息总数
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host:port --time -2 --topic test-topic
test-topic:0:0
test-topic:1:0
$ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list kafka-host:port --time -1 --topic test-topic
test-topic:0:5500000
test-topic:1:5500000
使用GetOffsetShell来计算给定主题特定分区当前的最早位移和最新位移,将两者的差值累加起来,就能得到该主题当前总的消息书
test-topic总的消息数为:550000+550000=11000000
kafka运维利器kakfaAdminClient
引入原因:
- 命令行的脚本都只能运行在控制台,而不能集成在应用程序,运维框架或者是监控平台中
- 这些命令行脚本很多都是通过连接zk来提供服务的,直连zk存在一些潜在问题,会绕开kafka的安全设置
- 运行这些脚本需要使用kafka内部的类实现,也就是kafka服务端的代码,社区希望用户只是用kafka客户端代码,通过现有的请求机制来运维管理集群
如何使用:
- 现在工程中引入依赖,比如maven
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.3.0</version>
</dependency>
- AdminClient对象的完整路径是:org.apache.kafka.clients.admin.AdminClient,而不是 kafka.admin.AdminClient,后者是服务端的AdminClient,已经不被推荐使用了
- AdminClient实例的创建与销毁
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-host:port");
props.put("request.timeout.ms", 600000);
try (AdminClient client = AdminClient.create(props)) {
// 执行你要做的操作……
}
- 创建主题
String newTopicName = "test-topic";
try (AdminClient client = AdminClient.create(props)) {
NewTopic newTopic = new NewTopic(newTopicName, 10, (short) 3);
CreateTopicsResult result = client.createTopics(Arrays.asList(newTopic));
result.all().get(10, TimeUnit.SECONDS);
}
- 查询消费者位移
String groupID = "test-group";
try (AdminClient client = AdminClient.create(props)) {
ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
Map<TopicPartition, OffsetAndMetadata> offsets =
result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
System.out.println(offsets);
}
- 获取broker磁盘占用
try (AdminClient client = AdminClient.create(props)) {
DescribeLogDirsResult ret =
client.describeLogDirs(Collections.singletonList(targetBrokerId)); // 指定Broker id
long size = 0L;
for (Map<String, DescribeLogDirsResponse.LogDirInfo> logDirInfoMap : ret.all().get().values()) {
size += logDirInfoMap.values().stream().map(logDirInfo -> logDirInfo.replicaInfos).flatMap(
topicPartitionReplicaInfoMap ->
topicPartitionReplicaInfoMap.values().stream().map(replicaInfo -> replicaInfo.size))
.mapToLong(Long::longValue).sum();
}
System.out.println(size);
}
功能:
- 主题管理:包括主题的创建,删除和查询
- 权限管理:包括具体权限的配置和删除
- 配置参数管理:包括kafka各种资源的参数设置,详情查询,所谓的kafka资源,主要有Broker,主题,用户,Client-id等
- 副本日志管理:包括副本底层日志路径的变更和详情查询
- 分区管理:创建额外的主题分区
- 消息删除:即删除指定位移之前的分区消息
- Delegatition Token管理:包括Dalegatition Token的创建,更新,过期和详情查询
- 消费组管理:包括消费组的查询,位移查询和删除
- Preferred领导者选举:推选指定主题分区的Preferred Broker为领导者
工作原理:
AdminClient 是一个双线程设计:前端主线程和I/O线程
前端线程负责:将用户要执行的操作转换成对应的请求,再将请求发送到后端I/O线程的队列中
后端I/O线程(kafka-admin-client-thread):从队列中读取相应的请求,然后发送到对应的Broker节点上。之后把执行结果保存起来,以便等待前端线程的获取
- 前端主线程创建名为Call的请求对象实例,实例有两个任务: - 构建对应的请求对象,比如:创建主题则创建CreateTopicRequest,查询消费组位移,则创建OffsetFetchRequest- 指定相应的回调逻辑,比如:从Broker端接收到CreateTopicRequest,一旦创建好Call实例,前端主线程会将其放入到新请求队列中,此时前端主线程的任务就算完成了,著需要等待结果返回即可
- 后端 I/O 线程使用了 3 个队列来承载不同时期的请求对象,它们分别是新请求队列、待发送请求队列和处理中请求队列。为什么要使用 3 个呢?原因是目前新请求队列的线程安全是由 Java 的 monitor 锁来保证的。为了确保前端主线程不会因为 monitor 锁被阻塞,后端 I/O 线程会定期地将新请求队列中的所有 Call 实例全部搬移到待发送请求队列中进行处理。图中的待发送请求队列和处理中请求队列只由后端 I/O 线程处理,因此无需任何锁机制来保证线程安全。
- 当 I/O 线程在处理某个请求时,它会显式地将该请求保存在处理中请求队列。一旦处理完成,I/O 线程会自动地调用 Call 对象中的回调逻辑完成最后的处理。把这些都做完之后,I/O 线程会通知前端主线程说结果已经准备完毕,这样前端主线程能够及时获取到执行操作的结果。AdminClient 是使用 Java Object 对象的 wait 和 notify 实现的这种通知机制。
- AdminClient 并没有使用 Java 已有的队列去实现上面的请求队列,它是使用 ArrayList 和 HashMap 这样的简单容器类,再配以 monitor 锁来保证线程安全的
kafka认证机制
kafka支持的SASL机制:
- GSSAPI:也就是kerberos使用的安全接口,在0.9版本中被引入,只要让你们的 Kerberos 管理员给每个 Broker 和要访问 Kafka 集群的操作系统用户申请 principal 就好了。
- PLAIN:使用简单的用户名、密码认证的机制,在0.10版本中被引入,适用于用户系统不复杂,访问kafka集群的用户不是很多的场景,因为SASL/PLAIN的配置和运维成本相对较小,但是不能动态的增减认证用户,必须重启kafka集群才能生效,因为所有认证用户信息全部保存在静态文件中。
- SCRAM:主要用于解决PLAIN机制安全问题的新机制,在0.10.2版本中被引入,将认证用户信息保存在zk中,可以使用kafka提供的命令动态的创建和删除用户,无需重启整个集群
- OAUTHBEARER:是基于OAuth2认证框架,在2.0版本中被引进
- Delegation Token:补充现有SASL机制的轻量级认证机制,在1.1.0版本被引入,是一种轻量级的认证机制,如果要使用,需要先配置好SASL认证,再 利用kafka提供的API去获取对应的Delegation Token,这样Broker和客户端在做认证的时候,可以直接使用这个token
可以使用SSL来做通信加密,SASL来做kafka的认证实现
认证:authentication,完成对用户身份的确认,主要目的是确认当前声称为某种身份的用户确实是所声称的用户
授权:authorization,一般是指对信息安全或计算机安全相关的资源定义与授予相应的访问权限
云环境的授权
权限模型:
- ACL:Access-contro List ,访问控制列表
- RBAC:Role-Based Access Control 基于角色的权限控制
- ABAC:attribute-Based Access Control 基于属性的权限控制
- PBAC:Policy-Based Access Control 基于策略的权限控制
kafka使用ACL模型进行授权:
Principal P is [Allowed/Denied] Operation O From Host H On Resource R.
- principal:表示访问kafka集群的用户
- Operation:表示一个具体的访问类型,如读写消息或者创建主题等
- Host:表示连接kafka集群的客户端应用程序IP地址,Host支持星号占位符,表示所有IP地址
- Resource:表示kafka资源,2.3版本为例:包括:TOPIC、CLUSTER、GROUP、TRANSACTIONALID 和 DELEGATION TOKEN。
kafka提供可插拔的授权实现机制,将配置的所有ACL项保存在Zookeeper下的/kafka-acl节点中,可以通过kafka自带的kafka-acls脚本动态的对ACL项进行增删改查
开启ACL
#在server.propoties中添加
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
超级用户
在开启ACL授权后,必须显示的为不同用户设置访问某项资源的权限,否则,默认情况下,没有配置任何ACL的资源是不能被访问的。但是超级用户能够访问所有的资源,及时没有为他们设置任何ACL项
#设置超级用户,在server.propoties中添加
super.users=User:superuser1;User:superuser2
#如果配置了该项,所有用户都可以访问没有设置任何ACL的资源,但是生产环境不建议如此配置
allow.everyone.if.no.acl.found=true
kafka-acls脚本
#为用户Alice增加集群级别的所有权限
$ kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:Alice --operation All --topic '*' --cluster
#User 后面的星号表示所有用户,allow-host 后面的星号则表示所有 IP 地址。这个命令的意思是,允许所有的用户使用任意的 IP 地址读取名为 test-topic 的主题数据,同时也禁止 BadUser 用户和 10.205.96.119 的 IP 地址访问 test-topic 下的消息。
$ bin/kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:'*' --allow-host '*' --deny-principal User:BadUser --deny-host 10.205.96.119 --operation Read --topic test-topic
ACL权限
如果没有配置认证机制,授权机制可以单独使用,但是只能为IP地址设置权限
$ bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --deny-principal User:* --deny-host 127.0.0.1 --operation Write --topic test
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hello
[2019-07-16 10:10:57,283] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2019-07-16 10:10:57,284] ERROR [Producer clientId=console-producer] Topic authorization failed for topics [test] (org.apache.kafka.clients.Metadata)
[2019-07-16 10:10:57,284] ERROR Error when sending message to topic test with key: null, value: 5 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TopicAuthorizationException: <span class="orange">Not authorized to access topics: [test]
</span class="orange">
跨集群备份解决方案MirrorMaker
通常我们把数据在单个集群下不同节点之间的拷贝称为备份,而把数据在集群间的拷贝称为镜像
MirrorMaker:
mirrorMaker就是一个消费者+生产者程序,消费者负责从源集消费数据,生产者负责向目标集群发送消息
可使用多套MirrorMaker来连接多套集群
运行MirrorMaker
$ bin/kafka-mirror-maker.sh --consumer.config ./config/consumer.properties --producer.config ./config/producer.properties --num.streams 8 --whitelist ".*"
- consumer.config指定了MirrorMaker中消费者的配置文件地址,最主要的配置项是bootstrap.servers,执行从哪个kafka集群读取消息,因为 MirrorMaker 有可能在内部创建多个消费者实例并使用消费者组机制,因此你还需要设置 group.id 参数。另外,我建议你额外配置 auto.offset.reset=earliest,否则的话,MirrorMaker 只会拷贝那些在它启动之后到达源集群的消息。
consumer.properties:
bootstrap.servers=localhost:9092
group.id=mirrormaker
auto.offset.reset=earliest
- producer.config参数:指定MirrorMaker内部生产者组件的配置文件地址,需要显示指定bootstrap.servers
producer.properties:
bootstrap.servers=localhost:9093
- num.streams:指定kafkaConsumer实例的个数
- whitelist参数:这个参数接收一个正则表达式。所有匹配该正则表达式的主题都会被自动地执行镜像。在这个命令中,我指定了“.*”,这表明我要同步源集群上的所有主题。
注意:MirrorMaker 在执行消息镜像的过程中,如果发现要同步的主题在目标集群上不存在的话,它就会根据 Broker 端参数 num.partitions 和 default.replication.factor 的默认值,自动将主题创建出来
kafka监控
JVM监控
指标
- FuLL GC发生的频率和时长,长时间的停顿会令Borker端抛出各种超市异常
- 活跃对象大小,是设置对大小的重要依据,一般堆内存设置为FULL GC后存活数据大小的1.5到2倍
- 应用程序总数,帮助了解borker进程对CPU的使用情况
监控GC日志,kafkaServer-gc.log,如果进程频繁Full GC,可以开始G1的-XX:+PrintAdaptiveSizePolicy 开关,让 JVM 告诉你到底是谁引发了 Full GC。
集群监控
- broiker进程是否启动,端口是否监听
- broker端的关键日志,服务端日志:server.log,控制器日志controller.log,主题分区状态变更日志state-change.log
- broker端关键线程运行状态,log Compatiton线程,以kafka-log-cleaner-Thread开头;以ReplicaFetcherThread开头的副本拉取消息的线程
- borker端的关键JMX指标 - BytesIn/BytesOut:Broker 端每秒入站和出站字节数。你要确保这组值不要接近你的网络带宽,否则这通常都表示网卡已被“打满”,很容易出现网络丢包的情形。- NetworkProcessorAvgIdlePercent:网络线程池线程平均的空闲比例。应该确保这个JMX值长期大于30%,如果小于这个值,表明网络线程池非常繁忙,需要通过增加网络线程数或者将负载转移给其他服务器的方式,来给Borker减负- RequestHandlerAvgIdlePercent:即I/O线程池平均的空闲比例,如果该值长期小于30%,需要调整I/O线程池的数量,来减少broker端的负载- UnderReplicatedPartitions:未充分备份的分区数,通常表明该分区有可能会出现数据丢失- ISRShink/ISRExpand:ISR收缩和扩容的频次指标,如果出现ISR中副本频繁进出的情形,那么这组值将会很高,需要诊断副本频繁进出ISR的原因,并采取适当的措施- ActiveCobtrollerCount:当前处于激活状态的控制器数量,正常情况下Controller所在的broker上的这个JMX指标值是1,其他broker是0,如果有多台是1 ,则通常表明集群出现了脑裂,主要查询网络连通性,kafka目前依托zk来防止脑裂,一旦出现脑裂,kafka无法保证正常工作
监控kafka客户端
- 客户端所在机器和broker机器之间的网络往返时延 ping 查看RTT
- kafka-producer-network-thread 开头的线程,负责实际消息发送的线程
- kafka-coordinator-hearbeat-thread开头的心跳线程,事关rebalance
- JMX指标request-latency,消息生产请求的延时,join rate 和sync rate 说明rebalance的频繁程度
kafka调优
kafka性能指标:吞吐量和延时
吞吐量:TPS,指borker端进程或者client端应用程序每秒能处理得字节数或者消息数
延时:表示从Produce端发送消息到borker端持久化完成之间的时间间隔
优化漏斗
优化效果,逐层递减
应用程序层,是指优化kafka客户端应用程序代码,比如使用合理的数据结构,缓存,计算开销大的运算结果,抑或是复用构造成本高的对象实例等
- 不要频繁的创建Producer和Consumer对象实例,构造这些对象的开销很大,尽量复用2. 用完及时关闭,这些对象底层会创建很多物理资源,如socket连接,ByteBuffer缓冲区等,不及时关闭的话,会造成资源泄露3. 合理利用多线程来改善性能,producer是线程安全的,可以放心在多个线程中共享一个实例
框架层,是指合理设置kafka集群各种参数
- 尽力保持客户端版本和Broker端版本一致,不同的版本会使kafka丧失很多性能收益,比如Zero Copy
JVM层
- 堆大小, 一般是 6
8G,如果想要精确调整,可以查看GC log,关注Full GC之后堆上存活对象的总大小,设置为该值的1.52倍2. GC收集器的选择,建议选G1
- 堆大小, 一般是 6
操作系统层
- 最好在挂载文件系统时禁用atime(access time),记录的是文件最后被访问的时间,记录atime需要操作系统访问inode资源,禁用atime可以避免inode访问时间的写入操作,减少文件系统的写操作数,可以执行mount -o noatime进行设置2. 至少选择ext4或者是XFS的文件系统,尤其是XFS3. 设置ulimit -n和vm.max_map_count,前者设置得太小,会报错Too Many File Open,后者值太小,在一个主题数比较多的Borker环境上,会碰到OutOfMemoryError:Map failed4. 操作系统页缓存大小,给kafka预留的也缓存越大越好,最小值至少要容纳一个日志段的大小,也就是Borker端参数log.segment.bytes的值,参数的默认值是1GB,预留出一个日志段的大小,至少能保证kafka可以将整个日志段全部放入页缓存
调优TPS
- num.replica.fetchers:表示的是Follower副本用多少个线程来拉取消息,默认是使用1个线程,如果Broker端CPU资源充足,可以调大改参数值,加快Follower副本的同步速度,因为实际生产环境中,配置了acks=all的Produce程序是吞吐量被拖累的首要因素
- Producer端,要改善吞吐量,通常标配是增加消息批次的大小以及批次缓存时间,即batch.size和linger.ms
- fetch.min.bytes,默认是1字节,表示只要broker端积攒了1字节数据,就返回给consumer
延时调优
版权归原作者 paradoxxi 所有, 如有侵权,请联系我们删除。