一. 引入依赖
SpringBoot 和 Kafka 搭配使用的场景,引入 spring-kafka 即可;
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version></dependency>
二. 核心结构
先来看一下 spring-kafka 核心图;
当我们在 Spring 中注册一个 Listener,框架就会为我们自动生成一个对应的 ConcurrentMessageListenerContainer 容器来管理,再根据你配置的并发度来创建多个 KafkaMessageListenerContainer 容器,每个 KafkaMessageListenerContainer 可以粗浅的认为是一个线程,这个线程会不断向 server 端发起 poll 请求来实现监听;
- ConcurrentMessageListenerContainer 是通过 ConcurrentMessageListenerContainerFactory 生产的;一般我们不需要去自定义 ConcurrentMessageListenerContainerFactory,Spring 容器会生成默认的 ConcurrentMessageListenerContainerFactory,也有场景需要我们去自定义 ContainerFactory;
- ConcurrentMessageListenerContainer 中有一个属性 List<KafkaMessageListenerContainer<K, V>> containers,就是用来存放各个 KafkaMessageListenerContainer;需要厘清两者的关系;
三. 核心流程
先来看一下核心方法的调用流程图,略去了部分非核心流程;
执行流程如下:
- Spring 启动;
- Spring 生命周期为 finishRefresh() 时,调用 KafkaListenerEndpointRegistry 中的 start();
- 根据 @KafkaListener 创建对应数量的 ConcurrentMessageListenerContainer;
- 根据并发配置 concurrency 往 ConcurrentMessageListenerContainer 创建对应数量的 KafkaMessageListenerContainer;
- 在每个 KafkaMessageListenerContainer 中创建一个 SimpleAsyncTaskExecutor,值得注意的是 SimpleAsyncTaskExecutor 的作用是创建一条新的线程,并在线程停止时执行 stop();
- 创建一个 ListenerConsumer 注册到 SimpleAsyncTaskExecutor 中,这里的 ListenerConsumer 是一个 Runnable 对象,并且内部会创建聚合一个 KafkaConsumer 对象,SimpleAsyncTaskExecutor 中创建出的线程会执行 ListenerConsumer.run();
- ListenerConsumer 的 run() 被调用;
- run 中开启自旋;
- 不断调用 kafka-client 提供的 poll() 拉取新的消息; - 收到新的消息就执行,执行完了就继续自旋;- 收不新消息,重启下一轮自旋;
四. 分析
1. 启动入口
入口在 SpringApplication.run() -> SpringApplication.refreshContext() -> AbstractApplicationContext.refresh() -> AbstractApplicationContext.finishRefresh();
这个 finishRefresh() 中会调用 LifecycleProssor.onRefresh() 启动 kafka 监听器;
// ------------------------------ AbstractApplicationContext ----------------------------protectedvoidfinishRefresh(){clearResourceCaches();initLifecycleProcessor();// 调用 LifecycleProcessor.onRefresh(),Spring 中默认的是 DefaultLifecycleProcessorgetLifecycleProcessor().onRefresh();publishEvent(newContextRefreshedEvent(this));if(!NativeDetector.inNativeImage()){LiveBeansView.registerApplicationContext(this);}}// ------------------------------ DefaultLifecycleProcessor ----------------------------publicvoidonRefresh(){startBeans(true);this.running =true;}// ------------------------------ DefaultLifecycleProcessor ----------------------------privatevoiddoStart(Map<String,?extendsLifecycle> lifecycleBeans,String beanName,boolean autoStartupOnly){Lifecycle bean = lifecycleBeans.remove(beanName);if(bean !=null&& bean !=this){String[] dependenciesForBean =getBeanFactory().getDependenciesForBean(beanName);for(String dependency : dependenciesForBean){doStart(lifecycleBeans, dependency, autoStartupOnly);}if((!autoStartupOnly ||!(bean instanceofSmartLifecycle)||((SmartLifecycle) bean).isAutoStartup())){try{// 获取容器中的 LifeCycle bean 对象,调用它的 start()// SpringKafka 中对应的是 KafkaListenerEndpointRegistry// 我们重点看一下 KafkaListenerEndpointRegistry.start()
bean.start();}catch(Throwable ex){thrownewApplicationContextException("Failed to start bean '"+ beanName +"'", ex);}}}}
2. KafkaListenerEndpointRegistry
KafkaListenerEndpointRegistry 是 SpringKafka 中很重要的类,是一个 SmartLifecycle 实现类对象,它里面有一个属性 listenerContainers,存放了我们的 ConcurrentMessageListenerContainer 对象;
我们先看它的 start();
// ---------------------------- KafkaListenerEndpointRegistry ---------------------------publicvoidstart(){// 轮询所有的 ConcurrentMessageListenerContainer 对象// 执行 ConcurrentMessageListenerContainer.start()for(MessageListenerContainer listenerContainer :getListenerContainers()){startIfNecessary(listenerContainer);}this.running =true;}// ---------------------------- KafkaListenerEndpointRegistry ---------------------------privatevoidstartIfNecessary(MessageListenerContainer listenerContainer){if((this.contextRefreshed &&this.alwaysStartAfterRefresh)|| listenerContainer.isAutoStartup()){// 执行 ConcurrentMessageListenerContainer.start()
listenerContainer.start();}}// ---------------------------- AbstractMessageListenerContainer ---------------------------publicfinalvoidstart(){checkGroupId();synchronized(this.lifecycleMonitor){if(!isRunning()){// 调用真正干事的 doStart(),进入 ConcurrentMessageListenerContainer.doStart()doStart();}}}
我们看 ConcurrentMessageListenerContainer.doStart() 干了些啥;
3. ConcurrentMessageListenerContainer
我们看下 ConcurrentMessageListenerContainer.doStart() 干了啥;
// ---------------------------- ConcurrentMessageListenerContainer ---------------------------protectedvoiddoStart(){if(!isRunning()){checkTopics();ContainerProperties containerProperties =getContainerProperties();TopicPartitionOffset[] topicPartitions = containerProperties.getTopicPartitions();if(topicPartitions !=null&&this.concurrency > topicPartitions.length){this.concurrency = topicPartitions.length;}setRunning(true);// 1. 根据 @KafkaListener 中配置的 concurrency 轮询for(int i =0; i <this.concurrency; i++){// 2. 创建 KafkaMessageListenerContainerKafkaMessageListenerContainer<K,V> container =constructContainer(containerProperties, topicPartitions, i);// 3. 对刚创建出的 KafkaMessageListenerContainer 做一些配置configureChildContainer(i, container);if(isPaused()){
container.pause();}// 4. 启动 KafkaMessageListenerContainer
container.start();// 5. 将 KafkaMessageListenerContainer 添加到 ConcurrentMessageListenerContainer 中this.containers.add(container);}}}
关键流程是第 3 步和第 4 步,我们分开来看;
3.1 configureChildContainer()
对刚创建出的 KafkaMessageListenerContainer 做一些配置;
这里创建了一个 SimpleAsyncTaskExecutor,设置进 KafkaMessageListenerContainer 中;
privatevoidconfigureChildContainer(int index,KafkaMessageListenerContainer<K,V> container){String beanName =getBeanName();
beanName =(beanName ==null?"consumer": beanName)+"-"+ index;
container.setBeanName(beanName);ApplicationContext applicationContext =getApplicationContext();if(applicationContext !=null){
container.setApplicationContext(applicationContext);}ApplicationEventPublisher publisher =getApplicationEventPublisher();if(publisher !=null){
container.setApplicationEventPublisher(publisher);}// 设置 clinetIdSuffix,clientId 前缀
container.setClientIdSuffix(this.concurrency >1||this.alwaysClientIdSuffix ?"-"+ index :"");
container.setGenericErrorHandler(getGenericErrorHandler());
container.setCommonErrorHandler(getCommonErrorHandler());
container.setAfterRollbackProcessor(getAfterRollbackProcessor());
container.setRecordInterceptor(getRecordInterceptor());
container.setBatchInterceptor(getBatchInterceptor());
container.setInterceptBeforeTx(isInterceptBeforeTx());
container.setListenerInfo(getListenerInfo());AsyncListenableTaskExecutor exec = container.getContainerProperties().getConsumerTaskExecutor();if(exec ==null){// 1. 创建出 SimpleAsyncTaskExecutor,并加入到 this.executors
exec =newSimpleAsyncTaskExecutor(beanName +"-C-");this.executors.add(exec);// 2. 将当前创建的 SimpleAsyncTaskExecutor 设置到 KafkaMessageListenerContainer
container.getContainerProperties().setConsumerTaskExecutor(exec);}}
3.2 container.start()
调用 KafkaMessageListenerContainer 的 start(),最终调用 KafkaMessageListenerContainer.doStart();
protectedvoiddoStart(){if(isRunning()){return;}ContainerProperties containerProperties =getContainerProperties();checkAckMode(containerProperties);Object messageListener = containerProperties.getMessageListener();AsyncListenableTaskExecutor consumerExecutor = containerProperties.getConsumerTaskExecutor();if(consumerExecutor ==null){
consumerExecutor =newSimpleAsyncTaskExecutor((getBeanName()==null?"":getBeanName())+"-C-");
containerProperties.setConsumerTaskExecutor(consumerExecutor);}GenericMessageListener<?> listener =(GenericMessageListener<?>) messageListener;ListenerType listenerType =determineListenerType(listener);// 1. 创建 ListenerConsumer// ListenerConsumer 是一个 Runnable 对象// new ListenerConsumer() 中会创建一个 KafkaConsumer,并作为属性成员// 它的 run() 比较重要this.listenerConsumer =newListenerConsumer(listener, listenerType);setRunning(true);this.startLatch =newCountDownLatch(1);// 2. 将 ListenerConsumer 任务放入到 SimpleAsyncTaskExecutor 中异步调用this.listenerConsumerFuture = consumerExecutor.submitListenable(this.listenerConsumer);}
ListenerConsumer 是一个 Runnable 对象,new ListenerConsumer() 中会创建一个 KafkaConsumer,并作为属性成员,我们看下 ListenerConsumer.run();
4. ListenerConsumer.run()
我们看下 ListenerConsumer 的 run();可以看到这个任务会进入自旋去处理任务;
publicvoidrun(){ListenerUtils.setLogOnlyMetadata(this.containerProperties.isOnlyLogRecordMetadata());publishConsumerStartingEvent();this.consumerThread =Thread.currentThread();setupSeeks();KafkaUtils.setConsumerGroupId(this.consumerGroupId);this.count =0;this.last =System.currentTimeMillis();initAssignedPartitions();publishConsumerStartedEvent();Throwable exitThrowable =null;// 开启自旋while(isRunning()){// 通过 KafkaConsumer 向 kafka-server 发起 poll 请求pollAndInvoke();}wrapUp(exitThrowable);}
ListenerConsumer 的 pollAndInvoke() 比较绕,总之我们知道它会通过反射调用我们 @KafkaListener 声明的方法;
我们简单看下最终调我们 @KafkaListener 声明方法的地方;
4.1 HandlerAdapter.invoke()
调用到 RecordMessagingMessageListenerAdapter.invoke();
publicObjectinvoke(Message<?> message,Object... providedArgs)throwsException{if(this.invokerHandlerMethod !=null){// 最终的执行入口// 最后会通过反射调用我们的 @KafkaListener 声明的方法returnthis.invokerHandlerMethod.invoke(message, providedArgs);}elseif(this.delegatingHandler.hasDefaultHandler()){Object[] args =newObject[providedArgs.length +1];
args[0]= message.getPayload();System.arraycopy(providedArgs,0, args,1, providedArgs.length);returnthis.delegatingHandler.invoke(message, args);}else{returnthis.delegatingHandler.invoke(message, providedArgs);}}
至此,SpringKafka 分析完毕;
版权归原作者 买断 所有, 如有侵权,请联系我们删除。