0


SpringBoot 处理 @KafkaListener 消息

消息监听容器

1、KafkaMessageListenerContainer

由spring提供用于监听以及拉取消息,并将这些消息按指定格式转换后交给由@KafkaListener注解的方法处理,相当于一个消费者;

看看其整体代码结构:

可以发现其入口方法为doStart(), 往上追溯到实现了SmartLifecycle接口,很明显,由spring管理其start和stop操作;

ListenerConsumer, 内部真正拉取消息消费的是这个结构,其 实现了Runable接口,简言之,它就是一个后台线程轮训拉取并处理消息(while true死循环拉取消息)。

在doStart方法中会创建ListenerConsumer并交给线程池处理

以上步骤就开启了消息监听过程。

KafkaMessageListenerContainer#doStart
protected void doStart() {
        if (isRunning()) {
            return;
        }
        ContainerProperties containerProperties = getContainerProperties();
        if (!this.consumerFactory.isAutoCommit()) {
            AckMode ackMode = containerProperties.getAckMode();
            if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {
                Assert.state(containerProperties.getAckCount() > 0, "'ackCount' must be > 0");
            }
            if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME))
                    && containerProperties.getAckTime() == 0) {
                containerProperties.setAckTime(5000);
            }
        }

        Object messageListener = containerProperties.getMessageListener();
        Assert.state(messageListener != null, "A MessageListener is required");
        if (containerProperties.getConsumerTaskExecutor() == null) {
            SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
                    (getBeanName() == null ? "" : getBeanName()) + "-C-");
            containerProperties.setConsumerTaskExecutor(consumerExecutor);
        }
        Assert.state(messageListener instanceof GenericMessageListener, "Listener must be a GenericListener");
        this.listener = (GenericMessageListener<?>) messageListener;
        ListenerType listenerType = ListenerUtils.determineListenerType(this.listener);
        if (this.listener instanceof DelegatingMessageListener) {
            Object delegating = this.listener;
            while (delegating instanceof DelegatingMessageListener) {
                delegating = ((DelegatingMessageListener<?>) delegating).getDelegate();
            }
            listenerType = ListenerUtils.determineListenerType(delegating);
        }
        // 这里创建了监听消费者对象
        this.listenerConsumer = new ListenerConsumer(this.listener, listenerType);
        setRunning(true);
        // 将消费者对象放入到线程池中执行
        this.listenerConsumerFuture = containerProperties
                .getConsumerTaskExecutor()
                .submitListenable(this.listenerConsumer);
    }
KafkaMessageListenerContainer.ListenerConsumer#run
public void run() {
            this.consumerThread = Thread.currentThread();
            if (this.genericListener instanceof ConsumerSeekAware) {
                ((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);
            }
            if (this.transactionManager != null) {
                ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId);
            }
            this.count = 0;
            this.last = System.currentTimeMillis();
            if (isRunning() && this.definedPartitions != null) {
                try {
                    initPartitionsIfNeeded();
                }
                catch (Exception e) {
                    this.logger.error("Failed to set initial offsets", e);
                }
            }
            long lastReceive = System.currentTimeMillis();
            long lastAlertAt = lastReceive;
            while (isRunning()) {
                try {
                    if (!this.autoCommit && !this.isRecordAck) {
                        processCommits();
                    }
                    processSeeks();
                    if (!this.consumerPaused && isPaused()) {
                        this.consumer.pause(this.consumer.assignment());
                        this.consumerPaused = true;
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Paused consumption from: " + this.consumer.paused());
                        }
                        publishConsumerPausedEvent(this.consumer.assignment());
                    }
                    // 拉取信息
                    ConsumerRecords<K, V> records = this.consumer.poll(this.containerProperties.getPollTimeout());
                    this.lastPoll = System.currentTimeMillis();
                    if (this.consumerPaused && !isPaused()) {
                        if (this.logger.isDebugEnabled()) {
                            this.logger.debug("Resuming consumption from: " + this.consumer.paused());
                        }
                        Set<TopicPartition> paused = this.consumer.paused();
                        this.consumer.resume(paused);
                        this.consumerPaused = false;
                        publishConsumerResumedEvent(paused);
                    }
                    if (records != null && this.logger.isDebugEnabled()) {
                        this.logger.debug("Received: " + records.count() + " records");
                        if (records.count() > 0 && this.logger.isTraceEnabled()) {
                            this.logger.trace(records.partitions().stream()
                                .flatMap(p -> records.records(p).stream())
                                // map to same format as send metadata toString()
                                .map(r -> r.topic() + "-" + r.partition() + "@" + r.offset())
                                .collect(Collectors.toList()));
                        }
                    }
                    if (records != null && records.count() > 0) {
                        if (this.containerProperties.getIdleEventInterval() != null) {
                            lastReceive = System.currentTimeMillis();
                        }
                        invokeListener(records);
                    }
                    else {
                        if (this.containerProperties.getIdleEventInterval() != null) {
                            long now = System.currentTimeMillis();
                            if (now > lastReceive + this.containerProperties.getIdleEventInterval()
                                    && now > lastAlertAt + this.containerProperties.getIdleEventInterval()) {
                                publishIdleContainerEvent(now - lastReceive, this.isConsumerAwareListener
                                        ? this.consumer : null, this.consumerPaused);
                                lastAlertAt = now;
                                if (this.genericListener instanceof ConsumerSeekAware) {
                                    seekPartitions(getAssignedPartitions(), true);
                                }
                            }
                        }
                    }
                }
                catch (WakeupException e) {
                    // Ignore, we're stopping
                }
                catch (NoOffsetForPartitionException nofpe) {
                    this.fatalError = true;
                    ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);
                    break;
                }
                catch (Exception e) {
                    handleConsumerException(e);
                }
            }
            ProducerFactoryUtils.clearConsumerGroupId();
            if (!this.fatalError) {
                if (this.kafkaTxManager == null) {
                    commitPendingAcks();
                    try {
                        this.consumer.unsubscribe();
                    }
                    catch (WakeupException e) {
                        // No-op. Continue process
                    }
                }
            }
            else {
                ListenerConsumer.this.logger.error("No offset and no reset policy; stopping container");
                KafkaMessageListenerContainer.this.stop();
            }
            this.monitorTask.cancel(true);
            if (!this.taskSchedulerExplicitlySet) {
                ((ThreadPoolTaskScheduler) this.taskScheduler).destroy();
            }
            this.consumer.close();
            this.logger.info("Consumer stopped");
        }

2、ConcurrentMessageListenerContainer

并发消息监听,相当于创建消费者;其底层逻辑仍然是通过KafkaMessageListenerContainer实现处理;从实现上看就是在KafkaMessageListenerContainer上做了层包装,有多少的concurrency就创建多个KafkaMessageListenerContainer,也就是concurrency个消费者。

    protected void doStart() {
        if (!isRunning()) {
            ContainerProperties containerProperties = getContainerProperties();
            TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
            if (topicPartitions != null
                    && this.concurrency > topicPartitions.length) {
                this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
                        + "equal to the number of partitions; reduced from " + this.concurrency + " to "
                        + topicPartitions.length);
                this.concurrency = topicPartitions.length;
            }
            setRunning(true);

            // 创建多个消费者
            for (int i = 0; i < this.concurrency; i++) {
                KafkaMessageListenerContainer<K, V> container;
                if (topicPartitions == null) {
                    container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,
                            containerProperties);
                }
                else {
                    container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,
                            containerProperties, partitionSubset(containerProperties, i));
                }
                String beanName = getBeanName();
                container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
                if (getApplicationEventPublisher() != null) {
                    container.setApplicationEventPublisher(getApplicationEventPublisher());
                }
                container.setClientIdSuffix("-" + i);
                container.setAfterRollbackProcessor(getAfterRollbackProcessor());
                container.start();
                this.containers.add(container);
            }
        }
    }

3、@KafkaListener底层监听原理

上面已经介绍了

KafkaMessageListenerContainer

的作用是拉取并处理消息,但还缺少关键的一步,即 如何将我们的业务逻辑与KafkaMessageListenerContainer的处理逻辑联系起来?

那么这个桥梁就是@KafkaListener注解

KafkaListenerAnnotationBeanPostProcessor, 从后缀BeanPostProcessor就可以知道这是Spring IOC初始化bean相关的操作,当然这里也是;此类会扫描带@KafkaListener注解的类或者方法,通过 KafkaListenerContainerFactory工厂创建对应的KafkaMessageListenerContainer,并调用start方法启动监听,也就是这样打通了这条路…

4、Spring Boot 自动加载kafka相关配置

1、KafkaAutoConfiguration
自动生成kafka相关配置,比如当缺少这些bean的时候KafkaTemplate、ProducerListener、ConsumerFactory、ProducerFactory等,默认创建bean实例

2、KafkaAnnotationDrivenConfiguration
主要是针对于spring-kafka提供的注解背后的相关操作,比如 @KafkaListener;

在开启了@EnableKafka注解后,spring会扫描到此配置并创建缺少的bean实例,比如当配置的工厂beanName不是kafkaListenerContainerFactory的时候,就会默认创建一个beanName为kafkaListenerContainerFactory的实例,这也是为什么在springboot中不用定义consumer的相关配置也可以通过@KafkaListener正常的处理消息

5、消息处理

1、单条消息处理

@Configuration
public class KafkaConsumerConfiguration {

  

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaCustomizeContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(2);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    private ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bizConfig.getReconciliationInstanceKafkaServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, bizConfig.getReconciliationInstanceKafkaConsumerGroupId());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 300);
        // poll 一次拉取的阻塞的最大时长,单位:毫秒。这里指的是阻塞拉取需要满足至少 fetch-min-size 大小的消息
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 10000);
        return props;
    }

}

这种方式的@KafkaLisener中的参数是单条的。

2、批量处理

@Configuration
@EnableKafka
public class KafkaConfig {
 
    @Bean
public KafkaListenerContainerFactory<?, ?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    // 增加开启批量处理
    factory.setBatchListener(true);  // <<<<<<<<<<<<<<<<<<<<<<<<<
    return factory;
}
 
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }
}

// 注意:这里接受的是集合类型
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
    ...
}

这种方式的@KafkaLisener中的参数是多条的。

6、线程池相关

如果没有额外给Kafka指定线程池,底层默认用的是SimpleAsyncTaskExecutor类,它不使用线程池,而是为每个任务创建新线程。相当于一个消费者用一个独立的线程来跑。

总结

spring为了将kafka融入其生态,方便在spring大环境下使用kafka,开发了spring-kafa这一模块,本质上是为了帮助开发者更好的以spring的方式使用kafka

@KafkaListener就是这么一个工具,在同一个项目中既可以有单条的消息处理,也可以配置多条的消息处理,稍微改变下配置即可实现,很是方便

当然,@KafkaListener单条或者多条消息处理仍然是spring自行封装处理,与kafka-client客户端的拉取机制无关;比如一次性拉取50条消息,对于单条处理来说就是循环50次处理,而多条消息处理则可以一次性处理50条;本质上来说这套逻辑都是spring处理的,并不是说单条消费就是通过kafka-client一次只拉取一条消息

在使用过程中需要注意spring自动的创建的一些bean实例,当然也可以覆盖其自动创建的实例以满足特定的需求场景。

原文链接:https://blog.csdn.net/yuechuzhixing/article/details/124725713

标签: spring boot java spring

本文转载自: https://blog.csdn.net/weixin_38687619/article/details/142252107
版权归原作者 小豹子的技术笔记 所有, 如有侵权,请联系我们删除。

“SpringBoot 处理 @KafkaListener 消息”的评论:

还没有评论