一、介绍
在之前的文章中,我们详细的介绍了 kafka 的架构模型,在集群环境中,kafka 可以通过设置分区数来加快数据的消费速度。
光知道理论可不行,我们得真真切切的实践起来才行!
下面,我将结合生产环境的真实案例,以SpringBoot技术框架为基础,向大家介绍 kafka 的使用以及如何实现数据高吞吐!
kaka详解:https://blog.csdn.net/qq_45228323/article/details/120241266
二、代码实践
最近,公司大数据团队每天凌晨会将客户的订单数据进行统计计算,然后把业绩数据推送给我们,以便销售人员每天能看到昨天的业绩数据,数据的体量大约在 1000 多万条,以下是我对接的过程!
2.1、添加 kafka 依赖包
本次项目的
SpringBoot
版本为
2.1.5.RELEASE
,依赖的
kafka
的版本为
2.2.6.RELEASE
<!--kafka--><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.6.RELEASE</version></dependency>
2.2、添加 kafka 配置变量
当添加完了依赖包之后,我们只需要在
application.properties
中添加
kafka
配置变量,基本上就可以正常使用了。
# 指定kafka server的地址,集群配多个,中间,逗号隔开
spring.kafka.bootstrap-servers=197.168.25.196:9092
#重试次数
spring.kafka.producer.retries=3
#批量发送的消息数量
spring.kafka.producer.batch-size=1000
#32MB的批处理缓冲区
spring.kafka.producer.buffer-memory=33554432
#默认消费者组
spring.kafka.consumer.group-id=crm-user-service
#最早未被消费的offset
spring.kafka.consumer.auto-offset-reset=earliest
#批量一次最大拉取数据量
spring.kafka.consumer.max-poll-records=4000
#是否自动提交
spring.kafka.consumer.enable-auto-commit=true
#自动提交时间间隔,单位ms
spring.kafka.consumer.auto-commit-interval=1000
2.3、创建一个消费者
@ComponentpublicclassBigDataTopicListener{privatestaticfinalLogger log =LoggerFactory.getLogger(BigDataTopicListener.class);/**
* 监听kafka数据
* @param consumerRecords
* @param ack
*/@KafkaListener(topics ={"big_data_topic"})publicvoidconsumer(ConsumerRecord<?,?> consumerRecord){
log.info("收到bigData推送的数据'{}'", consumerRecord.toString());//...//db.save(consumerRecord);//插入或者更新数据}}
2.4、模拟对方推送数据测试
@RunWith(SpringRunner.class)@SpringBootTestpublicclassKafkaProducerTest{@AutowiredprivateKafkaTemplate<String,String> kafkaTemplate;@TestpublicvoidtestSend(){for(int i =0; i <5000; i++){Map<String,Object> map =newLinkedHashMap<>();
map.put("datekey",20210610);
map.put("userid", i);
map.put("salaryAmount", i);//向kafka的big_data_topic主题推送数据
kafkaTemplate.send("big_data_topic",JSONObject.toJSONString(map));}}}
起初,通过这种单条数据消费方式,进行测试程序没太大毛病!
但是,当上到生产之后,发现一个很大的问题,就是消费 1000 万条数据,至少需要3个小时,结果导致数据看板一直没数据。
第二天痛定思痛,决定改成批量消费模型,怎么操作呢,请看下面!
2.5、将 kafka 的消费模式改成批量消费
首先,创建一个
KafkaConfiguration
配置类,内容如下!
@ConfigurationpublicclassKafkaConfiguration{@Value("${spring.kafka.bootstrap-servers}")privateString bootstrapServers;@Value("${spring.kafka.producer.retries}")privateInteger retries;@Value("${spring.kafka.producer.batch-size}")privateInteger batchSize;@Value("${spring.kafka.producer.buffer-memory}")privateInteger bufferMemory;@Value("${spring.kafka.consumer.group-id}")privateString groupId;@Value("${spring.kafka.consumer.auto-offset-reset}")privateString autoOffsetReset;@Value("${spring.kafka.consumer.max-poll-records}")privateInteger maxPollRecords;@Value("${spring.kafka.consumer.batch.concurrency}")privateInteger batchConcurrency;@Value("${spring.kafka.consumer.enable-auto-commit}")privateBoolean autoCommit;@Value("${spring.kafka.consumer.auto-commit-interval}")privateInteger autoCommitInterval;/**
* 生产者配置信息
*/@BeanpublicMap<String,Object>producerConfigs(){Map<String,Object> props =newHashMap<>();
props.put(ProducerConfig.ACKS_CONFIG,"0");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG,1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);return props;}/**
* 生产者工厂
*/@BeanpublicProducerFactory<String,String>producerFactory(){returnnewDefaultKafkaProducerFactory<>(producerConfigs());}/**
* 生产者模板
*/@BeanpublicKafkaTemplate<String,String>kafkaTemplate(){returnnewKafkaTemplate<>(producerFactory());}/**
* 消费者配置信息
*/@BeanpublicMap<String,Object>consumerConfigs(){Map<String,Object> props =newHashMap<>();
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,30000);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,30000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);return props;}/**
* 消费者批量工厂
*/@BeanpublicKafkaListenerContainerFactory<?>batchFactory(){ConcurrentKafkaListenerContainerFactory<Integer,String> factory =newConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(newDefaultKafkaConsumerFactory<>(consumerConfigs()));//设置并发量,小于或等于Topic的分区数
factory.setConcurrency(batchConcurrency);
factory.getContainerProperties().setPollTimeout(1500);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);//设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
factory.setBatchListener(true);return factory;}}
同时,新增一个
spring.kafka.consumer.batch.concurrency
变量,用来设置并发数,通过这个参数我们可以指定几个线程来实现消费。
在
application.properties
配置文件中,添加如下变量
#批消费并发量,小于或等于Topic的分区数
spring.kafka.consumer.batch.concurrency =3
#设置每次批量拉取的最大数量为4000
spring.kafka.consumer.max-poll-records=4000
#设置自动提交改成false
spring.kafka.consumer.enable-auto-commit=false
最后,将单个消费方法改成批量消费方法模式
@ComponentpublicclassBigDataTopicListener{privatestaticfinalLogger log =LoggerFactory.getLogger(BigDataTopicListener.class);/**
* 监听kafka数据(批量消费)
* @param consumerRecords
* @param ack
*/@KafkaListener(topics ={"big_data_topic"}, containerFactory ="batchFactory")publicvoidbatchConsumer(List<ConsumerRecord<?,?>> consumerRecords,Acknowledgment ack){long start =System.currentTimeMillis();//...//db.batchSave(consumerRecords);//批量插入或者批量更新数据//手动提交
ack.acknowledge();
log.info("收到bigData推送的数据,拉取数据量:{},消费时间:{}ms", consumerRecords.size(),(System.currentTimeMillis()- start));}}
此时,消费性能大大的提升,数据处理的非常快,500万条数据,最多 30 分钟就全部消费完毕了。
本例中的消费微服务,生产环境部署了
3
台服务器,同时
big_data_topic
主题的分区数为
3
,因此并发数设置为
3
比较合适。
随着推送的数据量不断增加,如果你觉得消费速度还不够,你可以重新设置每次批量拉取的最大数量,活着横向扩展微服务的集群实例数量和
topic
的分区数,以此来加快数据的消费速度。
但是,如果在单台机器中,每次批量拉取的最大数量过大,大对象也会很大,会造成频繁的 gc 告警!
因此,在实际的使用过程中,每次批量拉取的最大数量并不是越大越好,根据当前服务器的硬件配置,调节到合适的阀值,才是最优的选择!
三、小结
本文主要以
SpringBoot
技术框架为背景,结合实际业务需求,采用 kafka 进行数据消费,实现数据量的高吞吐,在下篇文章中,我们会介绍消费失败的处理流程。
版权归原作者 大梦谁先觉i 所有, 如有侵权,请联系我们删除。