0


Kafka消费端concurrency参数

首先说一下结论,这个参数用来增加消费者实例,或者可以理解为@KafkaListener注解实例的数量。当消费者服务数量小于topic的分区数的时候使用此参数可以提升消费能力,spring-kafka在初始化的时候会启动**

concurrency

**个Consumer线程来执行

@KafkaListener

里面的方法。

Consumer线程

用来直接调用kafka-client的poll()方法获取消息。如果是自动提交offset,poll()方法获取消息后会直接给到listener线程执行。

Listener线程

真正调用处理我们代码中标有

@KafkaListener

注解方法的线程。具体实现在KafkaMessageListenerContainer 类中。

KafkaMessageListenerContainer

protected void pollAndInvoke() {
            if (!this.autoCommit && !this.isRecordAck) {
                processCommits();
            }
            processSeeks();
            checkPaused();
            ConsumerRecords<K, V> records = this.consumer.poll(this.pollTimeout);
            this.lastPoll = System.currentTimeMillis();
            checkResumed();
            debugRecords(records);
            if (records != null && records.count() > 0) {
                if (this.containerProperties.getIdleEventInterval() != null) {
                    this.lastReceive = System.currentTimeMillis();
                }
                // 这里可以看到如果是自动提交offset,会直接把consumer poll下来的消息给到listener执行,
// 即kafka consumer所在线程会直接调用我们的@KafkaListener方法
                invokeListener(records);
            }
            else {
                checkIdle();
            }
        }
如果是手动提交offset,即enable-auto-commit设置为false,则是将消息投放到阻塞队列中,另一边由Listener线程取出执行。

ConcurrentMessageListenerContainer

当使用了concurrency参数是,在consumer启动过程会通过这个类去初始化。其实就是根据concurrency的值for循环调用KafkaMessageListenerContainer的dostart方法创建实例

标签: kafka java 大数据

本文转载自: https://blog.csdn.net/qq_29569183/article/details/131253360
版权归原作者 WannaRunning 所有, 如有侵权,请联系我们删除。

“Kafka消费端concurrency参数”的评论:

还没有评论