kafka千万级数据挤压问题解决
1. 背景
之前遇到过这样一个项目需求,我们需要从kafka中拿到数据,之后将数据进行转化为第三方需要的格式,再将数据推送到第三方的kafka中。由于每天的数据量快达到的千万级别,且每条数据中包含了多张图片的base64格式,这就导致数据量多的同时每条数据报文还很大,因此数据消费速度是根本无法跟得上数据生产速度,造成数据堆积。
2. 解决方案
2.1 单节点
首先需要从单节点开始优化,使得每个节点的性能达到最大。
2.1.1 消费者端
- 增加消费者线程数,使消费者线程数和订阅的kafka的topic的分区数一致。每一个消费者可以消费多个分区,但是每个分区只能被一个消费者组中的一个消费者所消费。因此,在同一个消费者组中,当消费者数量和topic的分区数相等时,消费速率最高;若消费者数量大于分区数时,多余的消费者并不会和其他消费者同时消费数据,会造成资源浪费。 因此,我先修改了kafka消费者的concurrency参数,将其值设置为分区大小。 但是,之后我使用kafka命令查看分区消费情况时,却发现多个分区只被一个消费者线程消费(一个线程并发消费了多个分区),这与我们最开始想的一个线程消费一个分区不相符。于是我放弃了通过配置来实现多线程消费的方式,使用代码控制线程消费分区情况: 1)消费端由最开始的@KafkaListener注解监听的方式修改为主动通过程序poll消息的方式。 2)在配置文件中配置要监听的分区数,通过代码精准控制每个消费者仅仅消费那一个分区。 实现代码思路如下: 1)配置文件(其中,start.partition为要消费的开始分区,end.partition为要消费的结束分区):2)消费者伪代码:
publicclassConsumerThreadextendsThread{privateKafkaConsumer<String,String> kafkaConsumer;/**
* 消费者线程的构造方法,
* @param properties kafka的配置信息
* @param topic 消费者订阅的topic
* @param i 要消费的分区数
*/publicConsumerThread(Properties properties,String topic,Integer i){this.kafkaConsumer =newKafkaConsumer(properties);this.kafkaConsumer.assign(Arrays.asList(newTopicPartition(topic, i)));}@Overridepublicvoidrun(){try{while(true){// 拉取kafka中的数据ConsumerRecords<String,String> records = kafkaConsumer.poll(Duration.ofMillis(100));for(ConsumerRecord<String,String>record: records){
logger.info("所属分区:"+record.partition());//实现处理逻辑
……
}}}catch(Exception e){
logger.error("线程执行异常", e);}finally{
kafkaConsumer.close();}}}
3)创建消费者线程伪代码:
publicstaticvoidinitNormalTask(){// 获取kafka配置信息Properties properties = consumerProperty.initConfig();// 获取配置文件中的开始分区,即start.partition
logger.info("start:"+ApplicationUtils.getStartPartition());// 获取配置文件中的结束分区,即end.partition
logger.info("end:"+ApplicationUtils.getEndPartition());// 根据分区数创建消费者线程,for循环中的i正好对应分区数for(int i =ApplicationUtils.getStartPartition(); i <=ApplicationUtils.getEndPartition(); i++){newConsumerThread(properties, normalTopic, i).start();}}
通过以上三步,则将消费者线程和分区数进行了一一对应。
2. 增加分区数量。
增加了分区数量,我们就可以相应地增加消费者的数量,从而提高了消费者的并行能力,也就提高了数据的消费能力(分区数量决定了消费者最大线程数)。
2.1.2 数据处理端
最开始,从kafka中获取到数据后,直接进行数据转化,然后推送至第三方,整个过程是一口气完成的。试想,就算将数据直接推送至第三方,都未必能跟上数据的生产速度,更何况还需要对数据进行格式转化。
如果处理数据的地方对消息的处理非常迅速,那么poll拉取的频次也会更高,进而整体消费的性能也会提升;相反,如果在这里对消息的处理缓慢,比如进行一个事务性操作,或者等待一个RPC的同步响应,那么poll拉取的频次也会随之下降,进而造成整体消费性能的下降。因此,在数据处理时,使用多线程的方式,也是提高数据消费速率的一种方式。模型图如下:
优化策略:消费者从kafka监听到数据后,将数据放到阻塞队列中,然后定义数据处理线程,并创建多个数据处理线程从阻塞队列中获取数据、处理数据并将数据推送至第三方,这样就大大提升了一条数据从消费者获取到最后推送至第三方的速率。
2.2 多节点
由于将单个节点的性能调整到最大后,消费速度依然跟不上kafka生产数据的速度,还是有数据堆积,因此萌生了部署多个节点的想法。
最开始为了将集群的性能调到最大,部署了10个节点,每个节点消费一个分区,首先将数据处理线程设置为20个,在这种情况下,消费速度明显提升,已经不存在数据堆积的问题。
2.3 信号量
由于消费者监听到数据后,是存入了阻塞队列中,若节点重启,则会丢失阻塞队列中的数据,针对这一问题,准备尝试使用信号量解决。
信号量即Semaphore,是一种计数器,可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源,通常用于那些资源有明确访问数量限制的场景,常用于限流 。通俗来说就是,如果线程要访问一个资源就必须先获得信号量。如果信号量内部计数器大于等于要获得的
信号量数,信号量减相应数量,然后允许共享这个资源;否则,信号量将会把线程置入休眠直至计数器大于等于要获得的信号量数。当信号量使用完时,必须释放。
Semaphore是JUC(java.util.concurrent)包下的类,常用方法有:
1)Semaphore(int permits):构造方法,参数为初始信号量数;
2)void acquire():线程占用一个许可。可以传一个int参数,意为线程占用n个许可,没有许可时会阻塞住;
3)void release():线程释放一个许可。可以传一个int参数,意为线程释放n个许可;
4)int availablePermits(): 获取当前信号量可用的许可。
为了验证信号量是否可以解决阻塞队列导致数据丢失问题,我写了一个Demo:
1)模拟kafka产生数据的代码:
publicclassKafkaThreadimplementsRunnable{privateKafkaQueue queue =KafkaQueue.getInstance();publicvoidrun(){while(queue.size()<10000000){
queue.put(1);}}}
2)模拟从kafka中消费数据,获取到数据后,先执行Semaphore的acquire方法获取信号量,若还有信号量则向下执行,将数据存入MyQueue阻塞队列中,否则阻塞在这里:
publicclassConsumerThreadimplementsRunnable{privateKafkaQueue kafkaQueue =KafkaQueue.getInstance();privateMyQueue myQueue =MyQueue.getInstance();publicvoidrun(){while(kafkaQueue.size()>0){try{
myQueue.acquire();System.out.println(Thread.currentThread().getName()+"逻辑处理");
myQueue.put(kafkaQueue.get());System.out.println("阻塞队列大小:"+ myQueue.size());}catch(InterruptedException e){
e.printStackTrace();}}}}
3)模拟数据处理线程,从阻塞队列MyQueue中取出,进行数据处理,处理完毕后调用Semaphore的release方法,释放信号量:
publicclassDataHandleThreadimplementsRunnable{privateMyQueue myQueue =MyQueue.getInstance();publicvoidrun(){while(true){Integer i =null;if(myQueue.size()>0){try{
i = myQueue.get();System.out.println(Thread.currentThread().getName()+"处理逻辑");
myQueue.release();System.out.println("信号量大小:"+ myQueue.availablePermits());TimeUnit.MILLISECONDS.sleep(100);}catch(InterruptedException e){
e.printStackTrace();}}}}}
4)阻塞队列MyQueue,内置信号量作为属性,初始信号量值设置为10:
publicclassMyQueue{privatestaticArrayBlockingQueue<Integer> queue;privatestaticSemaphore semaphore;privateMyQueue(){
queue =newArrayBlockingQueue<Integer>(300);
semaphore =newSemaphore(10);}publicvoidput(Integer i)throwsInterruptedException{
queue.put(i);}publicintget()throwsInterruptedException{return queue.take();}publicintsize(){return queue.size();}privatestaticclassSingleTon{privatestaticMyQueue instance =newMyQueue();}publicstaticMyQueuegetInstance(){returnSingleTon.instance;}// 获取信号量publicvoidacquire()throwsInterruptedException{
semaphore.acquire();}// 释放信号量publicvoidrelease(){
semaphore.release();}// 查看可用信号量数publicintavailablePermits(){return semaphore.availablePermits();}}
5)主方法Main:
publicclassMain{publicstaticvoidmain(String[] args){ExecutorService executor =Executors.newFixedThreadPool(10);// 执行kafka线程,通过一直往KafkaQueue中存入数据来模拟kafka生产数据
executor.execute(newKafkaThread());// 执行消费者线程,指从kafkaQueue中获取数据存入阻塞队列中,用来模拟从kafka中获取数据
executor.execute(newConsumerThread());// 执行生产者线程,指从阻塞队列中获取数据进行数据处理for(int i =0; i <5; i++){
executor.execute(newProducerThread());}}}
6)执行结果(开启5个数据处理线程):
a. 开启一个消费者线程,若阻塞队列大小大于信号量的初始值,则MyQueue队列中的数据个数最多与信号量大小一致。阻塞队列大小为300,信号量初始值为10,则队列中的数据最多有10个,执行结果如下:
pool-1-thread-2逻辑处理
阻塞队列大小:1
pool-1-thread-2逻辑处理
阻塞队列大小:2
pool-1-thread-2逻辑处理
阻塞队列大小:3
pool-1-thread-2逻辑处理
阻塞队列大小:4
pool-1-thread-2逻辑处理
阻塞队列大小:5
pool-1-thread-2逻辑处理
阻塞队列大小:6
pool-1-thread-2逻辑处理
阻塞队列大小:7
pool-1-thread-2逻辑处理
阻塞队列大小:8
pool-1-thread-2逻辑处理
阻塞队列大小:9
pool-1-thread-2逻辑处理
阻塞队列大小:10
pool-1-thread-3处理逻辑
信号量大小:1
pool-1-thread-2逻辑处理
阻塞队列大小:10
pool-1-thread-4处理逻辑
信号量大小:1
pool-1-thread-2逻辑处理
阻塞队列大小:10
......
b. 开启一个消费者线程,若阻塞队列大小小于信号量的初始值,则MyQueue队列中的数据个数最多与阻塞队列大小一致。将阻塞队列大小设置为5,信号量初始值为10,则队列中的数据最多有5个,执行结果如下:
pool-1-thread-2逻辑处理
阻塞队列大小:1
pool-1-thread-2逻辑处理
阻塞队列大小:2
pool-1-thread-2逻辑处理
阻塞队列大小:3
pool-1-thread-2逻辑处理
阻塞队列大小:4
pool-1-thread-2逻辑处理
阻塞队列大小:5
pool-1-thread-2逻辑处理
pool-1-thread-4处理逻辑
信号量大小:5
......
因此,信号量在本项目中是适用的,可以将信号量的初始值设置成与数据处理线程每秒处理数据个数大致相同。
信号量也适用于同种线程对公共资源并发访问时,控制线程数的情况。如下Demo:
@Slf4jpublicclassTestMain{publicstaticvoidmain(String[] args){Semaphore semaphore =newSemaphore(5);ExecutorService pool =Executors.newFixedThreadPool(10);for(int i =0; i <10; i++){
pool.execute(()->{try{
semaphore.acquire();
log.info("成功获取令牌");TimeUnit.SECONDS.sleep(1);}catch(InterruptedException e){
e.printStackTrace();}finally{
log.info("释放令牌");
semaphore.release();}});}}}
执行结果:
10:29:50.584 [pool-1-thread-2] INFO com.ljp.main.TestMain - 成功获取令牌
10:29:50.584 [pool-1-thread-7] INFO com.ljp.main.TestMain - 成功获取令牌
10:29:50.584 [pool-1-thread-8] INFO com.ljp.main.TestMain - 成功获取令牌
10:29:50.585 [pool-1-thread-3] INFO com.ljp.main.TestMain - 成功获取令牌
10:29:50.584 [pool-1-thread-1] INFO com.ljp.main.TestMain - 成功获取令牌
10:29:51.596 [pool-1-thread-3] INFO com.ljp.main.TestMain - 释放令牌
10:29:51.596 [pool-1-thread-8] INFO com.ljp.main.TestMain - 释放令牌
10:29:51.596 [pool-1-thread-7] INFO com.ljp.main.TestMain - 释放令牌
10:29:51.596 [pool-1-thread-4] INFO com.ljp.main.TestMain - 成功获取令牌
10:29:51.596 [pool-1-thread-1] INFO com.ljp.main.TestMain - 释放令牌
10:29:51.596 [pool-1-thread-6] INFO com.ljp.main.TestMain - 成功获取令牌
10:29:51.596 [pool-1-thread-5] INFO com.ljp.main.TestMain - 成功获取令牌
10:29:51.596 [pool-1-thread-9] INFO com.ljp.main.TestMain - 成功获取令牌
10:29:51.596 [pool-1-thread-2] INFO com.ljp.main.TestMain - 释放令牌
10:29:51.597 [pool-1-thread-10] INFO com.ljp.main.TestMain - 成功获取令牌
10:29:52.610 [pool-1-thread-6] INFO com.ljp.main.TestMain - 释放令牌
10:29:52.610 [pool-1-thread-10] INFO com.ljp.main.TestMain - 释放令牌
10:29:52.610 [pool-1-thread-4] INFO com.ljp.main.TestMain - 释放令牌
10:29:52.610 [pool-1-thread-9] INFO com.ljp.main.TestMain - 释放令牌
10:29:52.610 [pool-1-thread-5] INFO com.ljp.main.TestMain - 释放令牌
由执行结果可以看出,10个线程并不是一次性都执行完的,根据打印的时间,看出前五个线程是同时进行的,因为我们将信号量的初始值设为了5,等有线程释放了信号量之后,其他线程再继续执行。
版权归原作者 卡卡罗枫 所有, 如有侵权,请联系我们删除。