0


Kafka-02 @KafkaListener学习

一. 引入依赖

SpringBoot 和 Kafka 搭配使用的场景,引入 spring-kafka 即可;

<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version></dependency>

二. 核心结构

先来看一下 spring-kafka 核心图;

当我们在 Spring 中注册一个 Listener,框架就会为我们自动生成一个对应的 ConcurrentMessageListenerContainer 容器来管理,再根据你配置的并发度来创建多个 KafkaMessageListenerContainer 容器,每个 KafkaMessageListenerContainer 可以粗浅的认为是一个线程,这个线程会不断向 server 端发起 poll 请求来实现监听;

  • ConcurrentMessageListenerContainer 是通过 ConcurrentMessageListenerContainerFactory 生产的;一般我们不需要去自定义 ConcurrentMessageListenerContainerFactory,Spring 容器会生成默认的 ConcurrentMessageListenerContainerFactory,也有场景需要我们去自定义 ContainerFactory;
  • ConcurrentMessageListenerContainer 中有一个属性 List<KafkaMessageListenerContainer<K, V>> containers,就是用来存放各个 KafkaMessageListenerContainer;需要厘清两者的关系;

在这里插入图片描述

三. 核心流程

先来看一下核心方法的调用流程图,略去了部分非核心流程;

执行流程如下:

  1. Spring 启动;
  2. Spring 生命周期为 finishRefresh() 时,调用 KafkaListenerEndpointRegistry 中的 start();
  3. 根据 @KafkaListener 创建对应数量的 ConcurrentMessageListenerContainer;
  4. 根据并发配置 concurrency 往 ConcurrentMessageListenerContainer 创建对应数量的 KafkaMessageListenerContainer;
  5. 在每个 KafkaMessageListenerContainer 中创建一个 SimpleAsyncTaskExecutor,值得注意的是 SimpleAsyncTaskExecutor 的作用是创建一条新的线程,并在线程停止时执行 stop();
  6. 创建一个 ListenerConsumer 注册到 SimpleAsyncTaskExecutor 中,这里的 ListenerConsumer 是一个 Runnable 对象,并且内部会创建聚合一个 KafkaConsumer 对象,SimpleAsyncTaskExecutor 中创建出的线程会执行 ListenerConsumer.run();
  7. ListenerConsumer 的 run() 被调用;
  8. run 中开启自旋;
  9. 不断调用 kafka-client 提供的 poll() 拉取新的消息; - 收到新的消息就执行,执行完了就继续自旋;- 收不新消息,重启下一轮自旋;

四. 分析

1. 启动入口

入口在 SpringApplication.run() -> SpringApplication.refreshContext() -> AbstractApplicationContext.refresh() -> AbstractApplicationContext.finishRefresh();

这个 finishRefresh() 中会调用 LifecycleProssor.onRefresh() 启动 kafka 监听器;

// ------------------------------ AbstractApplicationContext ----------------------------protectedvoidfinishRefresh(){clearResourceCaches();initLifecycleProcessor();// 调用 LifecycleProcessor.onRefresh(),Spring 中默认的是 DefaultLifecycleProcessorgetLifecycleProcessor().onRefresh();publishEvent(newContextRefreshedEvent(this));if(!NativeDetector.inNativeImage()){LiveBeansView.registerApplicationContext(this);}}// ------------------------------ DefaultLifecycleProcessor ----------------------------publicvoidonRefresh(){startBeans(true);this.running =true;}// ------------------------------ DefaultLifecycleProcessor ----------------------------privatevoiddoStart(Map<String,?extendsLifecycle> lifecycleBeans,String beanName,boolean autoStartupOnly){Lifecycle bean = lifecycleBeans.remove(beanName);if(bean !=null&& bean !=this){String[] dependenciesForBean =getBeanFactory().getDependenciesForBean(beanName);for(String dependency : dependenciesForBean){doStart(lifecycleBeans, dependency, autoStartupOnly);}if((!autoStartupOnly ||!(bean instanceofSmartLifecycle)||((SmartLifecycle) bean).isAutoStartup())){try{// 获取容器中的 LifeCycle bean 对象,调用它的 start()// SpringKafka 中对应的是 KafkaListenerEndpointRegistry// 我们重点看一下 KafkaListenerEndpointRegistry.start()
                bean.start();}catch(Throwable ex){thrownewApplicationContextException("Failed to start bean '"+ beanName +"'", ex);}}}}

2. KafkaListenerEndpointRegistry

KafkaListenerEndpointRegistry 是 SpringKafka 中很重要的类,是一个 SmartLifecycle 实现类对象,它里面有一个属性 listenerContainers,存放了我们的 ConcurrentMessageListenerContainer 对象;

我们先看它的 start();

// ---------------------------- KafkaListenerEndpointRegistry ---------------------------publicvoidstart(){// 轮询所有的 ConcurrentMessageListenerContainer 对象// 执行 ConcurrentMessageListenerContainer.start()for(MessageListenerContainer listenerContainer :getListenerContainers()){startIfNecessary(listenerContainer);}this.running =true;}// ---------------------------- KafkaListenerEndpointRegistry ---------------------------privatevoidstartIfNecessary(MessageListenerContainer listenerContainer){if((this.contextRefreshed &&this.alwaysStartAfterRefresh)|| listenerContainer.isAutoStartup()){// 执行 ConcurrentMessageListenerContainer.start()
        listenerContainer.start();}}// ---------------------------- AbstractMessageListenerContainer ---------------------------publicfinalvoidstart(){checkGroupId();synchronized(this.lifecycleMonitor){if(!isRunning()){// 调用真正干事的 doStart(),进入 ConcurrentMessageListenerContainer.doStart()doStart();}}}

我们看 ConcurrentMessageListenerContainer.doStart() 干了些啥;

3. ConcurrentMessageListenerContainer

我们看下 ConcurrentMessageListenerContainer.doStart() 干了啥;

// ---------------------------- ConcurrentMessageListenerContainer ---------------------------protectedvoiddoStart(){if(!isRunning()){checkTopics();ContainerProperties containerProperties =getContainerProperties();TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();if(topicPartitions !=null&&this.concurrency > topicPartitions.length){this.concurrency = topicPartitions.length;}setRunning(true);// 1. 根据 @KafkaListener 中配置的 concurrency 轮询for(int i =0; i <this.concurrency; i++){// 2. 创建 KafkaMessageListenerContainerKafkaMessageListenerContainer<K,V> container =constructContainer(containerProperties, topicPartitions, i);// 3. 对刚创建出的 KafkaMessageListenerContainer 做一些配置configureChildContainer(i, container);if(isPaused()){
                container.pause();}// 4. 启动 KafkaMessageListenerContainer
            container.start();// 5. 将 KafkaMessageListenerContainer 添加到 ConcurrentMessageListenerContainer 中this.containers.add(container);}}}

关键流程是第 3 步和第 4 步,我们分开来看;

3.1 configureChildContainer()

对刚创建出的 KafkaMessageListenerContainer 做一些配置;

这里创建了一个 SimpleAsyncTaskExecutor,设置进 KafkaMessageListenerContainer 中;

privatevoidconfigureChildContainer(int index,KafkaMessageListenerContainer<K,V> container){String beanName =getBeanName();
    beanName =(beanName ==null?"consumer": beanName)+"-"+ index;
    container.setBeanName(beanName);ApplicationContext applicationContext =getApplicationContext();if(applicationContext !=null){
        container.setApplicationContext(applicationContext);}ApplicationEventPublisher publisher =getApplicationEventPublisher();if(publisher !=null){
        container.setApplicationEventPublisher(publisher);}// 设置 clinetIdSuffix,clientId 前缀
    container.setClientIdSuffix(this.concurrency >1||this.alwaysClientIdSuffix ?"-"+ index :"");
    container.setGenericErrorHandler(getGenericErrorHandler());
    container.setCommonErrorHandler(getCommonErrorHandler());
    container.setAfterRollbackProcessor(getAfterRollbackProcessor());
    container.setRecordInterceptor(getRecordInterceptor());
    container.setBatchInterceptor(getBatchInterceptor());
    container.setInterceptBeforeTx(isInterceptBeforeTx());
    container.setListenerInfo(getListenerInfo());AsyncListenableTaskExecutor exec = container.getContainerProperties().getConsumerTaskExecutor();if(exec ==null){// 1. 创建出 SimpleAsyncTaskExecutor,并加入到 this.executors
        exec =newSimpleAsyncTaskExecutor(beanName +"-C-");this.executors.add(exec);// 2. 将当前创建的 SimpleAsyncTaskExecutor 设置到 KafkaMessageListenerContainer
        container.getContainerProperties().setConsumerTaskExecutor(exec);}}

3.2 container.start()

调用 KafkaMessageListenerContainer 的 start(),最终调用 KafkaMessageListenerContainer.doStart();

protectedvoiddoStart(){if(isRunning()){return;}ContainerProperties containerProperties =getContainerProperties();checkAckMode(containerProperties);Object messageListener = containerProperties.getMessageListener();AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();if(consumerExecutor ==null){
        consumerExecutor =newSimpleAsyncTaskExecutor((getBeanName()==null?"":getBeanName())+"-C-");
        containerProperties.setConsumerTaskExecutor(consumerExecutor);}GenericMessageListener<?> listener =(GenericMessageListener<?>) messageListener;ListenerType listenerType =determineListenerType(listener);// 1. 创建 ListenerConsumer// ListenerConsumer 是一个 Runnable 对象// new ListenerConsumer() 中会创建一个 KafkaConsumer,并作为属性成员// 它的 run() 比较重要this.listenerConsumer =newListenerConsumer(listener, listenerType);setRunning(true);this.startLatch =newCountDownLatch(1);// 2. 将 ListenerConsumer 任务放入到 SimpleAsyncTaskExecutor 中异步调用this.listenerConsumerFuture = consumerExecutor.submitListenable(this.listenerConsumer);}

ListenerConsumer 是一个 Runnable 对象,new ListenerConsumer() 中会创建一个 KafkaConsumer,并作为属性成员,我们看下 ListenerConsumer.run();

4. ListenerConsumer.run()

我们看下 ListenerConsumer 的 run();可以看到这个任务会进入自旋去处理任务;

publicvoidrun(){ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());publishConsumerStartingEvent();this.consumerThread =Thread.currentThread();setupSeeks();KafkaUtils.setConsumerGroupId(this.consumerGroupId);this.count =0;this.last =System.currentTimeMillis();initAssignedPartitions();publishConsumerStartedEvent();Throwable exitThrowable =null;// 开启自旋while(isRunning()){// 通过 KafkaConsumer 向 kafka-server 发起 poll 请求pollAndInvoke();}wrapUp(exitThrowable);}

ListenerConsumer 的 pollAndInvoke() 比较绕,总之我们知道它会通过反射调用我们 @KafkaListener 声明的方法;

我们简单看下最终调我们 @KafkaListener 声明方法的地方;

4.1 HandlerAdapter.invoke()

调用到 RecordMessagingMessageListenerAdapter.invoke();

publicObjectinvoke(Message<?> message,Object... providedArgs)throwsException{if(this.invokerHandlerMethod !=null){// 最终的执行入口// 最后会通过反射调用我们的 @KafkaListener 声明的方法returnthis.invokerHandlerMethod.invoke(message, providedArgs);}elseif(this.delegatingHandler.hasDefaultHandler()){Object[] args =newObject[providedArgs.length +1];
      args[0]= message.getPayload();System.arraycopy(providedArgs,0, args,1, providedArgs.length);returnthis.delegatingHandler.invoke(message, args);}else{returnthis.delegatingHandler.invoke(message, providedArgs);}}

至此,SpringKafka 分析完毕;

标签: kafka 学习 linq

本文转载自: https://blog.csdn.net/qq_45149567/article/details/140319227
版权归原作者 买断 所有, 如有侵权,请联系我们删除。

“Kafka-02 @KafkaListener学习”的评论:

还没有评论