一、Kafka中数据清理(Log Deletion)
Kafka的消息存储在磁盘中,为了控制磁盘占用空间,Kafka需要不断地对过去的一些消息进行清理工作。Kafka的每个分区都有很多的日志文件,这样也是为了方便进行日志的清理。在Kafka中,提供两种日志清理方式:
- 日志删除(Log Deletion):按照指定的策略直接删除不符合条件的日志。
- 日志压缩(Log Compaction):按照消息的key进行整合,有相同key的但有不同value值,只保留最后一个版本。
在Kafka的broker或topic配置中:
配置项配置值说明log.cleaner.enabletrue(默认)开启自动清理日志功能log.cleanup.policydelete(默认)删除日志log.cleanup.policycompaction压缩日志log.cleanup.policydelete,compact同时支持删除、压缩
1.1、日志删除
日志删除是以段(segment日志)为单位来进行定期清理的。
1.1.1、定时日志删除任务
Kafka日志管理器中会有一个专门的日志删除任务来定期检测和删除不符合保留条件的日志分段文件,这个周期可以通过broker端参数log.retention.check.interval.ms来配置,默认值为300,000,即5分钟。当前日志分段的保留策略有3种:
- 基于时间的保留策略
- 基于日志大小的保留策略
- 基于日志起始偏移量的保留策略
1.1.2、基于时间的保留策略
以下三种配置可以指定如果Kafka中的消息超过指定的阈值,就会将日志进行自动清理:
- log.retention.hours
- log.retention.minutes
- log.retention.ms
其中,优先级为 log.retention.ms > log.retention.minutes > log.retention.hours。默认情况,在broker中,配置如下:
log.retention.hours=168
也就是,默认日志的保留时间为168小时,相当于保留7天。
删除日志分段时:
- 从日志文件对象中所维护日志分段的跳跃表中移除待删除的日志分段,以保证没有线程对这些日志分段进行读取操作
- 将日志分段文件添加上“.deleted”的后缀(也包括日志分段对应的索引文件)
- Kafka的后台定时任务会定期删除这些“.deleted”为后缀的文件,这个任务的延迟执行时间可以通过file.delete.delay.ms参数来设置,默认值为60000,即1分钟。
1.1.2.1、设置topic 5秒删除一次
设置topic的删除策略
- key: retention.ms
- value: 5000
1.1.3、基于日志大小的保留策略
日志删除任务会检查当前日志的大小是否超过设定的阈值来寻找可删除的日志分段的文件集合。可以通过broker端参数 log.retention.bytes 来配置,默认值为-1,表示无穷大。如果超过该大小,会自动将超出部分删除。
注意:
log.retention.bytes 配置的是日志文件的总大小,而不是单个的日志分段的大小,一个日志文件包含多个日志分段。
1.1.4、基于日志起始偏移量保留策略
每个segment日志都有它的起始偏移量,如果起始偏移量小于 logStartOffset,那么这些日志文件将会标记为删除。
1.2 日志压缩(Log Compaction)
Log Compaction是默认的日志删除之外的清理过时数据的方式。它会将相同的key对应的数据只保留一个版本。
- Log Compaction执行后,offset将不再连续,但依然可以查询Segment
- Log Compaction执行前后,日志分段中的每条消息偏移量保持不变。Log Compaction会生成一个新的Segment文件
- Log Compaction是针对key的,在使用的时候注意每个消息的key不为空
- 基于Log Compaction可以保留key的最新更新,可以基于Log Compaction来恢复消费者的最新状态
二、Kafka配额限速机制(Quotas)
生产者和消费者以极高的速度生产/消费大量数据或产生请求,从而占用broker上的全部资源,造成网络IO饱和。有了配额(Quotas)就可以避免这些问题。Kafka支持配额管理,从而可以对Producer和Consumer的produce&fetch操作进行流量限制,防止个别业务压爆服务器。
2.1、限制producer端速率
为所有client id设置默认值,以下为所有producer程序设置其TPS不超过1MB/s,即1048576/s,命令如下:
bin/kafka-configs.sh --zookeeper node1.angyan.cn:2181 --alter --add-config 'producer_byte_rate=1048576' --entity-type clients --entity-default
运行基准测试,观察生产消息的速率
bin/kafka-producer-perf-test.sh --topic test --num-records 500000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1.angyan.cn:9092,node2.angyan.cn:9092,node3.angyan.cn:9092 acks=1
结果:
50000 records sent, 1108.156028 records/sec (1.06 MB/sec)
2.2、限制consumer端速率
对consumer限速与producer类似,只不过参数名不一样。
为指定的topic进行限速,以下为所有consumer程序设置topic速率不超过1MB/s,即1048576/s。命令如下:
bin/kafka-configs.sh --zookeeper node1.angyan.cn:2181 --alter --add-config 'consumer_byte_rate=1048576' --entity-type clients --entity-default
运行基准测试:
bin/kafka-consumer-perf-test.sh --broker-list node1.angyan.cn:9092,node2.angyan.cn:9092,node3.angyan.cn:9092 --topic test --fetch-size 1048576 --messages 500000
结果为:
MB.sec:1.0743
2.3、取消Kafka的Quota配置
使用以下命令,删除Kafka的Quota配置
bin/kafka-configs.sh --zookeeper node1.angyan.cn:2181 --alter --delete-config 'producer_byte_rate' --entity-type clients --entity-default
bin/kafka-configs.sh --zookeeper node1.angyan.cn:2181 --alter --delete-config 'consumer_byte_rate' --entity-type clients --entity-default
三、Kafka实战
3.1、生产者
3.1.1、导入依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
3.1.2、配置文件
spring.kafka.angyan.bootstrap-servers=112.126.74.249:9092,112.126.74.249:9093
spring.kafka.angyan.clientId=TEST_DEMO_MESSAGE
spring.kafka.angyan.producer.compressionType=gzip
spring.kafka.angyan.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.angyan.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
# 提交延时
spring.kafka.angyan.linger.ms=1000
spring.kafka.angyan.template.defaultTopic=TEST_DEMO_TOPIC
3.1.3、发送消息
@Service
public class KafkaServiceImpl implements KafkaService {
@Qualifier("kafkaTemplate")
@Autowired
private KafkaTemplate kafkaRecordTemplate;
@Override
public String sendMessage(String key, byte[] bytes) {
Map header = new HashMap();
header.put(KafkaHeaders.KEY,key);
MessageHeaders messageHeaders = new MessageHeaders(header);
Message message = MessageBuilder.createMessage(bytes, messageHeaders);
kafkaRecordTemplate.send(message);
return null;
}
}
3.2、消费者
3.2.1、配置类
pring.kafka.angyan.bootstrap-servers=112.126.74.249:9092,112.126.74.249:9093
spring.kafka.angyan.consumer.group.id=TEST_DEMO_MESSAGE
spring.kafka.angyan.consumer.clientId=TEST_DEMO_MESSAGE
spring.kafka.angyan.defaultTopic=TEST_DEMO_TOPIC
spring.kafka.angyan.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.angyan.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
3.2.2、消费消息
@Component
public class KafkaCustomer {
@KafkaListener(topics = "${spring.kafka.angyan.defaultTopic}",containerFactory = "kafkaTemplateConsumer")
public void testKafka(ConsumerRecord<String,byte[]> record){
//处理业务逻辑
}
}
版权归原作者 之乎者也· 所有, 如有侵权,请联系我们删除。