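How a ZooKeeper-Based Registry Works
Service registration:
- When the Spring Boot project starts, a custom ApplicationListener listens for the web server startup event
- Once the web server has started successfully, the event callback is triggered
- In the callback, an ephemeral node is created under a designated ZooKeeper node; the node records the ip + port of the running instance
- If an instance crashes, its ephemeral node is deleted automatically once the session has been disconnected for a certain time (cited here as 30s by default; the effective value is the session timeout negotiated between client and server)
Service discovery:
- When the Spring Boot project starts, it fetches from the designated ZooKeeper node the list of all available URLs for the target service (this list can be cached)
- A load-balancing algorithm then routes each request to one server in the URL list
- A watch on the ZooKeeper node is created through Spring's spring.factories extension mechanism; whenever the node list changes, the cached URL list is updated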
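Overall registration flow
A service should be registered only after its container has started successfully. For a microservice project this is usually Spring Boot's embedded Tomcat: only once it is up is the service usable, and only then should the service be registered in ZooKeeper.
So how do we know that the Tomcat container has started successfully?
Through Spring's event listener mechanism: when Tomcat starts successfully an event is published, and we can listen for that event and respond to it.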
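The publish/listen idea described above can be sketched in plain Java. This is not Spring's implementation: `MiniEventBus` and `ServerStartedEvent` are hypothetical names invented for illustration, and the bus only mimics how a listener declared for one event type receives events of that type and nothing else.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical event type standing in for Spring's WebServerInitializedEvent.
final class ServerStartedEvent {
    final int port;

    ServerStartedEvent(int port) {
        this.port = port;
    }
}

// Hypothetical, heavily simplified event bus (an assumption, not a Spring class).
final class MiniEventBus {
    // Pairs an event type with the callback interested in it.
    private static final class Registration<T> {
        final Class<T> type;
        final Consumer<T> callback;

        Registration(Class<T> type, Consumer<T> callback) {
            this.type = type;
            this.callback = callback;
        }

        void deliverIfMatches(Object event) {
            if (type.isInstance(event)) {
                callback.accept(type.cast(event));
            }
        }
    }

    private final List<Registration<?>> registrations = new ArrayList<>();

    <T> void addListener(Class<T> type, Consumer<T> callback) {
        registrations.add(new Registration<>(type, callback));
    }

    // Synchronously calls every listener whose declared type matches the event.
    void publish(Object event) {
        for (Registration<?> registration : registrations) {
            registration.deliverIfMatches(event);
        }
    }
}
```

In the registry use case, the callback registered for the "server started" event is the place where the service would register itself.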
Spring event listener mechanism
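1. Hand-Written Registry
1.1 Service Registration
How service registration works
- A custom ApplicationListener listens for the startup event of the Spring Boot project's embedded container
- As soon as the embedded Tomcat has started successfully, the listener's callback is triggered
- In the callback, an ephemeral node is created under the designated ZooKeeper node, with the running instance's ip:port as the node name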
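The node layout described above (a service node with one child per instance, named ip:port) can be sketched as a small path helper. `RegistryPaths` is a hypothetical name; the `/zk-registry` base path used in the comments is the one this article uses later.

```java
// Hypothetical helper (an assumption for illustration) that builds and parses
// registry paths of the form <basePath>/<serviceName>/<ip>:<port>.
final class RegistryPaths {
    private RegistryPaths() {
    }

    // ("/zk-registry", "order-service") gives "/zk-registry/order-service"
    static String servicePath(String basePath, String serviceName) {
        return basePath + "/" + serviceName;
    }

    // Full path of the ephemeral node for one running instance.
    static String instancePath(String basePath, String serviceName, String ip, int port) {
        return servicePath(basePath, serviceName) + "/" + ip + ":" + port;
    }

    // Recovers the "ip:port" node name from a full instance path.
    static String address(String instancePath) {
        return instancePath.substring(instancePath.lastIndexOf('/') + 1);
    }
}
```

Because the address is the node name itself, a consumer only needs the list of child names to know where to send requests.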
Create the Spring Boot project order-service
(1) pom.xml
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.8</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.1</version>
</dependency>
</dependencies>
(2) Configuration file application.properties
server.ip=192.168.9.1
server.port=9090
# Custom configuration properties
zk.service-name=order-service
zk.server=192.168.1.104:2181
(3) Create the ApplicationListener
It listens for WebServerInitializedEvent, which is published once the web server has finished initializing
import org.springframework.boot.web.context.WebServerInitializedEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.core.env.Environment;

public class ZkApplicationListener implements ApplicationListener<WebServerInitializedEvent> {
    @Override
    public void onApplicationEvent(WebServerInitializedEvent event) {
        System.out.println("Event listener callback invoked...");
        // Read the properties defined in application.properties
        Environment environment = event.getApplicationContext().getEnvironment();
        String serviceName = environment.getProperty("zk.service-name");
        String ip = environment.getProperty("server.ip");
        String port = environment.getProperty("server.port");
        String zkServer = environment.getProperty("zk.server");
        // Register this instance in ZooKeeper
        ServiceRegistry zookeeperServiceRegistry = new ZookeeperServiceRegistry(serviceName, ip, port, zkServer);
        zookeeperServiceRegistry.register();
    }
}
(4) SPI configuration in META-INF/spring.factories
# Application Listeners
org.springframework.context.ApplicationListener=\
com.zk.serviceregistry.orderservice.listener.ZkApplicationListener
(5) Register the service in ZooKeeper
// Custom interface, modeled on the service registration interface that the Spring Cloud team provides
public interface ServiceRegistry {
void register();
}
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class ZookeeperServiceRegistry implements ServiceRegistry {
    private final CuratorFramework curatorFramework;
    private final String ip;
    private final String port;
    private final String serviceName;
    private final String basePath = "/zk-registry";

    public ZookeeperServiceRegistry(String serviceName, String ip, String port, String zkServer) {
        this.serviceName = serviceName;
        this.ip = ip;
        this.port = port;
        this.curatorFramework = CuratorFrameworkFactory
                .builder()
                .connectionTimeoutMs(20000)
                .connectString(zkServer)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        curatorFramework.start();
    }

    @Override
    public void register() {
        // Path of the service node, e.g. /zk-registry/order-service
        String serviceNamePath = basePath + "/" + serviceName;
        try {
            if (curatorFramework.checkExists().forPath(serviceNamePath) == null) {
                // Create a persistent node that represents the service name
                this.curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(serviceNamePath);
            }
            // Create an ephemeral child node named ip:port for this instance
            String urlNode = curatorFramework.create().withMode(CreateMode.EPHEMERAL).forPath(serviceNamePath + "/" + ip + ":" + port);
            System.out.println("Service " + urlNode + " registered with the ZooKeeper server...");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
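(6) Start the service and test
Registration takes effect: the log shows that 192.168.9.1:9090 has been registered with the ZooKeeper server
Inspecting ZooKeeper shows that a new node has been created
Start several more instances (192.168.9.1:9091, 192.168.9.1:9092, 192.168.9.1:9093, 192.168.9.1:9094); each new ip:port is registered in ZooKeeper in turn
Stop one instance, for example 192.168.9.1:9094, to simulate a crash. When the ZooKeeper server has heard nothing from 192.168.9.1:9094 within a certain time (cited here as 30s by default; the effective value is the negotiated session timeout), it considers the instance dead and deletes its node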
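The timeout behaviour in the last step can be imitated with a small in-memory simulation. This is only a sketch of the idea: ZooKeeper tracks sessions on the server side, and `EphemeralRegistrySim`, its methods, and the explicit clock parameter are all assumptions made so the example can run without a ZooKeeper server.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// In-memory simulation only (hypothetical class, not part of ZooKeeper or Curator).
final class EphemeralRegistrySim {
    private final long sessionTimeoutMs;
    // instance address -> time of its last heartbeat, in milliseconds
    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    EphemeralRegistrySim(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    // Registering and heartbeating are the same operation in this sketch.
    void heartbeat(String address, long nowMs) {
        lastHeartbeat.put(address, nowMs);
    }

    // Drops every instance whose last heartbeat is older than the timeout,
    // then returns the remaining addresses in sorted order.
    List<String> liveInstances(long nowMs) {
        lastHeartbeat.values().removeIf(ts -> nowMs - ts > sessionTimeoutMs);
        List<String> live = new ArrayList<>(lastHeartbeat.keySet());
        Collections.sort(live);
        return live;
    }
}
```

Passing the current time in as a parameter keeps the simulation deterministic; a real client would rely on the session heartbeats that the ZooKeeper client library sends for it.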
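1.2 Service Discovery
How service discovery works
- On startup, the project automatically fetches from ZooKeeper the list of all available URLs of the service it needs to call, order-service
- Thanks to ZooKeeper's ephemeral nodes, if a service node crashes, its ephemeral node is deleted automatically after a certain time
- When user-service is accessed, a load-balancing algorithm picks one node URL from the list of available service URLs
Create the Spring Boot project user-service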
(1) pom.xml
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.8</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.2.1</version>
</dependency>
</dependencies>
(2) Configuration file application.properties
server.port=9999
zk.server=192.168.1.104:2181
(3) SPI configuration in META-INF/spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.zk.servicediscovery.userservice.config.ZookeeperDiscoveryAutoConfiguration
(4) Auto-configuration: perform service discovery when the project starts
@Configuration
public class ZookeeperDiscoveryAutoConfiguration {
@Resource
private Environment environment;
@Bean
public ServiceDiscoveryImpl serviceDiscovery(){
return new ServiceDiscoveryImpl(environment.getProperty("zk.server"));
}
}
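(5) Service discovery and watching
public interface ServiceDiscovery {
    // Service discovery: fetch all child nodes (the list of all available service URLs)
    List<String> discovery(String serviceName);
    // Register a watch: when the child nodes change (an instance was added or has crashed),
    // the watch fires and the service URL list is updated
    void registerWatch(String serviceNamePath);
}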
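import java.util.Collections;
import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class ServiceDiscoveryImpl implements ServiceDiscovery {
    private final CuratorFramework curatorFramework;
    private final String basePath = "/zk-registry";

    public ServiceDiscoveryImpl(String zkServer) {
        this.curatorFramework = CuratorFrameworkFactory
                .builder()
                .connectionTimeoutMs(20000)
                .connectString(zkServer)
                .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                .build();
        curatorFramework.start();
    }

    @Override
    public List<String> discovery(String serviceName) {
        // e.g. /zk-registry/order-service
        String serviceNamePath = basePath + "/" + serviceName;
        try {
            if (this.curatorFramework.checkExists().forPath(serviceNamePath) != null) {
                return this.curatorFramework.getChildren().forPath(serviceNamePath);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        // Unknown service or failed lookup: return an empty list instead of null
        return Collections.emptyList();
    }

    @Override
    public void registerWatch(String serviceNamePath) {
        // Persistent watch: fires whenever the children of /zk-registry/order-service change
        CuratorCache curatorCache = CuratorCache.build(curatorFramework, serviceNamePath);
        CuratorCacheListener listener = CuratorCacheListener.builder()
                .forPathChildrenCache(serviceNamePath, curatorFramework, new PathChildrenCacheListener() {
                    @Override
                    public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                        // Pull mode: re-read the child list on every change
                        System.out.println("Latest urls: " + curatorFramework.getChildren().forPath(serviceNamePath));
                    }
                }).build();
        curatorCache.listenable().addListener(listener);
        curatorCache.start();
    }
}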
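The overview mentions that the URL list can be cached and refreshed when the watch fires, while the listing above only prints the new list. A minimal sketch of such a cache follows; `ServiceUrlCache` and its method names are hypothetical, and the idea is that the watch callback would call `refresh` with the freshly read child list while request handling calls `get`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical cache of service URLs (an assumption, not part of the original project).
final class ServiceUrlCache {
    private final Map<String, List<String>> urlsByService = new ConcurrentHashMap<>();

    // Intended to be called from the watch callback with the latest child list.
    // A defensive copy is stored so later changes to the argument do not leak in.
    void refresh(String serviceName, List<String> latestUrls) {
        urlsByService.put(serviceName, Collections.unmodifiableList(new ArrayList<>(latestUrls)));
    }

    // Intended for the request path; returns an empty list for unknown services.
    List<String> get(String serviceName) {
        return urlsByService.getOrDefault(serviceName, Collections.<String>emptyList());
    }
}
```

Replacing the whole list on each refresh keeps readers free of locks: a request sees either the old snapshot or the new one, never a half-updated list.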
(6) Pick a service node at random to simulate load balancing
public interface LoadBalance {
String select(List<String> urls);
}
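import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

public class RandomLoadBalance implements LoadBalance {
    @Override
    public String select(List<String> urls) {
        if (urls == null || urls.isEmpty()) {
            throw new IllegalStateException("No available service instance");
        }
        // ThreadLocalRandom avoids allocating a new Random on every call
        return urls.get(ThreadLocalRandom.current().nextInt(urls.size()));
    }
}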
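Random selection is only one possible strategy. As an alternative that the article does not use, a round-robin selector can be sketched as follows. `RoundRobinSelector` is a hypothetical name; it is kept independent of the `LoadBalance` interface above so the snippet compiles on its own, but its `select` method has the same shape.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin selector (an alternative to the random strategy above).
final class RoundRobinSelector {
    private final AtomicInteger counter = new AtomicInteger();

    String select(List<String> urls) {
        if (urls == null || urls.isEmpty()) {
            throw new IllegalStateException("No available service instance");
        }
        // floorMod keeps the index non-negative even after the counter overflows
        int index = Math.floorMod(counter.getAndIncrement(), urls.size());
        return urls.get(index);
    }
}
```

Round-robin spreads requests evenly when all instances are equally capable; if the URL list changes between calls, the rotation simply continues over the new list.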
(7) UserController: simulating a request
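import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

@RestController
@RequestMapping("/user")
public class UserController {
    @Autowired
    private ServiceDiscovery serviceDiscovery;

    @RequestMapping("/discovery")
    public void discovery() {
        List<String> urls = this.serviceDiscovery.discovery("order-service");
        LoadBalance loadBalance = new RandomLoadBalance();
        String url = loadBalance.select(urls);
        System.out.println("Selected order-service node: " + url);
        String response = new RestTemplate().getForObject("http://" + url + "/order/query", String.class);
        System.out.println("order-service response: " + response);
        // Watch the order-service node. Note that this demo registers a new watch on
        // every request; registering it once at startup would avoid the duplicates
        this.serviceDiscovery.registerWatch("/zk-registry/order-service");
    }
}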
(8) Test
Visit http://192.168.9.1:9999/user/discovery to test
Stop one of the order-service nodes and, without restarting anything, access user-service again
2. Implementing a Registry with Spring Cloud Zookeeper
2.1 Sample Code
Create a Spring Boot project named spring-cloud-zookeeper, using Spring Boot 2.6.8
(1) pom.xml
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.8</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- spring cloud zookeeper config: configuration center -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zookeeper-config</artifactId>
</dependency>
<!-- dependency required for the bootstrap.yaml file -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bootstrap</artifactId>
</dependency>
<!-- spring cloud zookeeper discovery: registry -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-zookeeper-discovery</artifactId>
</dependency>
</dependencies>
<!-- version management -->
<dependencyManagement>
<dependencies>
<!-- Spring Cloud version -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>2021.0.3</version>
<scope>import</scope>
<type>pom</type>
</dependency>
</dependencies>
</dependencyManagement>
(2) Configuration file
application.properties
server.port=9091
spring.cloud.zookeeper.connect-string=192.168.1.104:2181
spring.cloud.zookeeper.discovery.root=/services/registries
spring.application.name=spring-cloud-zookeeper
Or application.yml
spring:
  cloud:
    zookeeper:
      connect-string: 192.168.1.104:2181
      discovery:
        root: /services/registries
  application:
    name: spring-cloud-zookeeper
server:
  port: 9091
(3) bootstrap.yaml
spring:
  profiles:
    active: dev
  application:
    name: spring-cloud-zookeeper # which ZNode to look up: spring-cloud-zookeeper-dev
  cloud:
    zookeeper:
      config:
        root: config # corresponds to /config/spring-cloud-zookeeper-dev
        profile-separator: "-"
        enabled: true
      connect-string: 192.168.1.104:2181
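(4) Start the Spring Boot project and inspect the data on the ZooKeeper server
The corresponding nodes have been created automatically on the ZooKeeper server
(5) Service discovery code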
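import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SpringCloudZkDiscoveryController {
    // 1. Inject the service discovery client interface
    @Autowired
    private DiscoveryClient discoveryClient;

    @RequestMapping("/sc-zk-discovery")
    public List<ServiceInstance> serviceUrl() {
        // 2. getInstances returns all available instances of the service
        List<ServiceInstance> instances = discoveryClient.getInstances("spring-cloud-zookeeper");
        if (!instances.isEmpty()) {
            String url = instances.get(0).getUri().toString();
            System.out.println("url=" + url);
        }
        // Reuse the list instead of querying the registry a second time
        return instances;
    }
}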
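Test by visiting /sc-zk-discovery
2.2 How the Spring Cloud Zookeeper Registry Works
(1) The listener AbstractAutoServiceRegistration listens for web container startup
Much like our hand-written registration, Spring Cloud defines its own listener, AbstractAutoServiceRegistration, which listens for the web server startup event WebServerInitializedEvent
Source excerpt from org.springframework.cloud.client.serviceregistry.AbstractAutoServiceRegistration#onApplicationEvent(WebServerInitializedEvent):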
public abstract class AbstractAutoServiceRegistration<R extends Registration> implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {
private final ServiceRegistry<R> serviceRegistry;
// .................
public void onApplicationEvent(WebServerInitializedEvent event) {
this.bind(event);
}
@Deprecated
public void bind(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();
if (!(context instanceof ConfigurableWebServerApplicationContext) || !"management".equals(((ConfigurableWebServerApplicationContext)context).getServerNamespace())) {
this.port.compareAndSet(0, event.getWebServer().getPort());
this.start();
}
}
public void start() {
if (!this.isEnabled()) {
} else {
if (!this.running.get()) {
this.context.publishEvent(new InstancePreRegisteredEvent(this, this.getRegistration()));
this.register();
if (this.shouldRegisterManagement()) {
this.registerManagement();
}
this.context.publishEvent(new InstanceRegisteredEvent(this, this.getConfiguration()));
this.running.compareAndSet(false, true);
}
}
}
}
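(2) Service registration and discovery
getRegistration() obtains the concrete registration implementation class
org.springframework.cloud.zookeeper.serviceregistry.ZookeeperAutoServiceRegistration
serviceRegistry.register(registration) performs the actual service registration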
public class ZookeeperServiceRegistry implements ServiceRegistry<ZookeeperRegistration>, SmartInitializingSingleton, Closeable {
protected CuratorFramework curator;
protected ZookeeperDiscoveryProperties properties;
private ServiceDiscovery<ZookeeperInstance> serviceDiscovery;
public void register(ZookeeperRegistration registration) {
try {
this.getServiceDiscovery().registerService(registration.getServiceInstance());
} catch (Exception var3) {
ReflectionUtils.rethrowRuntimeException(var3);
}
}
}
Service discovery implementation class (Curator's ServiceDiscoveryImpl): creates the ZooKeeper node and sets up a watch on it
public class ServiceDiscoveryImpl<T> implements ServiceDiscovery<T> {
public void registerService(ServiceInstance<T> service) throws Exception {
ServiceDiscoveryImpl.Entry<T> newEntry = new ServiceDiscoveryImpl.Entry(service);
ServiceDiscoveryImpl.Entry<T> oldEntry = (ServiceDiscoveryImpl.Entry)this.services.putIfAbsent(service.getId(), newEntry);
ServiceDiscoveryImpl.Entry<T> useEntry = oldEntry != null ? oldEntry : newEntry;
synchronized(useEntry) {
if (useEntry == newEntry) {
// Set up the node watch
useEntry.cache = this.makeNodeCache(service);
}
// Create the ZooKeeper node
this.internalRegisterService(service);
}
}
// Set up the node watch
private CuratorCacheBridge makeNodeCache(ServiceInstance<T> instance) {
if (!this.watchInstances) {
return null;
} else {
CuratorCacheBridge cache = CuratorCache.bridgeBuilder(this.client, this.pathForInstance(instance.getName(), instance.getId())).withOptions(new Options[]{Options.SINGLE_NODE_CACHE}).withDataNotCached().build();
CuratorCacheListener listener = CuratorCacheListener.builder().afterInitialized().forAll((__, ___, data) -> {
if (data != null) {
try {
ServiceInstance<T> newInstance = this.serializer.deserialize(data.getData());
ServiceDiscoveryImpl.Entry<T> entry = (ServiceDiscoveryImpl.Entry)this.services.get(newInstance.getId());
if (entry != null) {
synchronized(entry) {
entry.service = newInstance;
}
}
} catch (Exception var10) {
this.log.debug("Could not deserialize: " + data.getPath());
}
} else {
this.log.warn("Instance data has been deleted for: " + instance);
}
}).build();
cache.listenable().addListener(listener);
cache.start();
return cache;
}
}
// Create the ZooKeeper node
@VisibleForTesting
protected void internalRegisterService(ServiceInstance<T> service) throws Exception {
byte[] bytes = this.serializer.serialize(service);
String path = this.pathForInstance(service.getName(), service.getId());
int MAX_TRIES = 2; // the decompiler rendered this constant as "true"; the loop below tries at most twice
boolean isDone = false;
for(int i = 0; !isDone && i < 2; ++i) {
try {
CreateMode mode;
switch(service.getServiceType()) {
case DYNAMIC:
mode = CreateMode.EPHEMERAL;
break;
case DYNAMIC_SEQUENTIAL:
mode = CreateMode.EPHEMERAL_SEQUENTIAL;
break;
default:
mode = CreateMode.PERSISTENT;
}
((ACLBackgroundPathAndBytesable)this.client.create().creatingParentContainersIfNeeded().withMode(mode)).forPath(path, bytes);
isDone = true;
} catch (NodeExistsException var8) {
this.client.delete().forPath(path);
}
}
}
}
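The retry loop in internalRegisterService follows a "create; if the node already exists, delete it and try again" pattern. The sketch below imitates that pattern against an in-memory map instead of ZooKeeper. `CreateOrReplaceSim` is a hypothetical class, and unlike the excerpt above it throws when every attempt fails, which is a choice made here for clarity rather than something taken from Curator.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// In-memory imitation of the create-or-replace loop (hypothetical, not Curator code).
final class CreateOrReplaceSim {
    static final int MAX_TRIES = 2;
    // path -> data, standing in for znodes
    private final Map<String, String> nodes = new ConcurrentHashMap<>();

    // Returns the number of attempts that were needed to create the node.
    int register(String path, String data) {
        for (int attempt = 1; attempt <= MAX_TRIES; attempt++) {
            if (nodes.putIfAbsent(path, data) == null) {
                return attempt;     // created successfully
            }
            nodes.remove(path);     // a stale node exists: delete it, then retry
        }
        throw new IllegalStateException("Could not create " + path);
    }

    String read(String path) {
        return nodes.get(path);
    }
}
```

Deleting and recreating, rather than overwriting in place, means a restarted instance replaces a leftover node from its previous session with a fresh one.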
(3) Spring container startup events
Execution flow from Spring Boot startup, to web server startup, to the listener handling the server startup event:
SpringApplication.run(args) -> refreshContext(context) -> refresh(context) -> ServletWebServerApplicationContext.refresh() -> AbstractApplicationContext.refresh() -> finishRefresh() -> DefaultLifecycleProcessor.onRefresh() -> startBeans(true) -> DefaultLifecycleProcessor$LifecycleGroup.start() -> doStart() -> WebServerStartStopLifecycle.start() -> AbstractApplicationContext.publishEvent(new ServletWebServerInitializedEvent(this.webServer, this.applicationContext)) -> SimpleApplicationEventMulticaster.multicastEvent(applicationEvent, eventType) -> invokeListener(listener, event) -> doInvokeListener(listener, event) -> listener.onApplicationEvent(event);
Stack trace:
onApplicationEvent:12, ZkApplicationListener (com.zk.serviceregistry.orderservice.listener)
doInvokeListener:176, SimpleApplicationEventMulticaster (org.springframework.context.event)
invokeListener:169, SimpleApplicationEventMulticaster (org.springframework.context.event)
multicastEvent:143, SimpleApplicationEventMulticaster (org.springframework.context.event)
publishEvent:421, AbstractApplicationContext (org.springframework.context.support)
publishEvent:378, AbstractApplicationContext (org.springframework.context.support)
start:46, WebServerStartStopLifecycle (org.springframework.boot.web.servlet.context)
doStart:178, DefaultLifecycleProcessor (org.springframework.context.support)
access$200:54, DefaultLifecycleProcessor (org.springframework.context.support)
start:356, DefaultLifecycleProcessor$LifecycleGroup (org.springframework.context.support)
accept:-1, 1643565953 (org.springframework.context.support.DefaultLifecycleProcessor$$Lambda$541)
forEach:75, Iterable (java.lang)
startBeans:155, DefaultLifecycleProcessor (org.springframework.context.support)
onRefresh:123, DefaultLifecycleProcessor (org.springframework.context.support)
finishRefresh:935, AbstractApplicationContext (org.springframework.context.support)
refresh:586, AbstractApplicationContext (org.springframework.context.support)
refresh:145, ServletWebServerApplicationContext (org.springframework.boot.web.servlet.context)
refresh:745, SpringApplication (org.springframework.boot)
refreshContext:420, SpringApplication (org.springframework.boot)
run:307, SpringApplication (org.springframework.boot)
run:1317, SpringApplication (org.springframework.boot)
run:1306, SpringApplication (org.springframework.boot)
main:10, OrderServiceApplication (com.zk.serviceregistry.orderservice)
- SpringApplication.run()
public class SpringApplication {
public ConfigurableApplicationContext run(String... args) {
long startTime = System.nanoTime();
DefaultBootstrapContext bootstrapContext = this.createBootstrapContext();
ConfigurableApplicationContext context = null;
this.configureHeadlessProperty();
SpringApplicationRunListeners listeners = this.getRunListeners(args);
listeners.starting(bootstrapContext, this.mainApplicationClass);
try {
ApplicationArguments applicationArguments = new DefaultApplicationArguments(args);
ConfigurableEnvironment environment = this.prepareEnvironment(listeners, bootstrapContext, applicationArguments);
this.configureIgnoreBeanInfo(environment);
Banner printedBanner = this.printBanner(environment);
context = this.createApplicationContext();
context.setApplicationStartup(this.applicationStartup);
this.prepareContext(bootstrapContext, context, environment, listeners, applicationArguments, printedBanner);
this.refreshContext(context); // 1
this.afterRefresh(context, applicationArguments);
Duration timeTakenToStartup = Duration.ofNanos(System.nanoTime() - startTime);
if (this.logStartupInfo) {
(new StartupInfoLogger(this.mainApplicationClass)).logStarted(this.getApplicationLog(), timeTakenToStartup);
}
listeners.started(context, timeTakenToStartup);
this.callRunners(context, applicationArguments);
} catch (Throwable var12) {
this.handleRunFailure(context, var12, listeners);
throw new IllegalStateException(var12);
}
try {
Duration timeTakenToReady = Duration.ofNanos(System.nanoTime() - startTime);
listeners.ready(context, timeTakenToReady);
return context;
} catch (Throwable var11) {
this.handleRunFailure(context, var11, (SpringApplicationRunListeners)null);
throw new IllegalStateException(var11);
}
}
}
- refreshContext(context)
private void refreshContext(ConfigurableApplicationContext context) {
if (this.registerShutdownHook) {
shutdownHook.registerApplicationContext(context);
}
this.refresh(context); // 2
}
- AbstractApplicationContext.refresh()
public void refresh() throws BeansException, IllegalStateException {
synchronized(this.startupShutdownMonitor) {
StartupStep contextRefresh = this.applicationStartup.start("spring.context.refresh");
this.prepareRefresh();
ConfigurableListableBeanFactory beanFactory = this.obtainFreshBeanFactory();
this.prepareBeanFactory(beanFactory);
try {
this.postProcessBeanFactory(beanFactory);
StartupStep beanPostProcess = this.applicationStartup.start("spring.context.beans.post-process");
this.invokeBeanFactoryPostProcessors(beanFactory);
this.registerBeanPostProcessors(beanFactory);
beanPostProcess.end();
this.initMessageSource();
this.initApplicationEventMulticaster();
this.onRefresh();
this.registerListeners();
this.finishBeanFactoryInitialization(beanFactory);
this.finishRefresh(); // 3
} catch (BeansException var10) {
if (this.logger.isWarnEnabled()) {
this.logger.warn("Exception encountered during context initialization - cancelling refresh attempt: " + var10);
}
this.destroyBeans();
this.cancelRefresh(var10);
throw var10;
} finally {
this.resetCommonCaches();
contextRefresh.end();
}
}
}
- finishRefresh()
protected void finishRefresh() {
this.clearResourceCaches();
this.initLifecycleProcessor();
this.getLifecycleProcessor().onRefresh(); // 4
this.publishEvent((ApplicationEvent)(new ContextRefreshedEvent(this)));
if (!NativeDetector.inNativeImage()) {
LiveBeansView.registerApplicationContext(this);
}
}
- DefaultLifecycleProcessor.onRefresh()
public void onRefresh() {
this.startBeans(true); // 5
this.running = true;
}
- startBeans(true)
private void startBeans(boolean autoStartupOnly) {
Map<String, Lifecycle> lifecycleBeans = this.getLifecycleBeans();
Map<Integer, DefaultLifecycleProcessor.LifecycleGroup> phases = new TreeMap();
lifecycleBeans.forEach((beanName, bean) -> {
if (!autoStartupOnly || bean instanceof SmartLifecycle && ((SmartLifecycle)bean).isAutoStartup()) {
int phase = this.getPhase(bean);
((DefaultLifecycleProcessor.LifecycleGroup)phases.computeIfAbsent(phase, (p) -> {
return new DefaultLifecycleProcessor.LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly);
})).add(beanName, bean);
}
});
if (!phases.isEmpty()) {
phases.values().forEach(DefaultLifecycleProcessor.LifecycleGroup::start); // 6
}
}
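startBeans groups the lifecycle beans by phase in a TreeMap, so the groups are started in ascending phase order. The grouping step can be sketched on its own as below. `PhaseOrderSim` and the bean names used with it are hypothetical; only the TreeMap grouping idea is taken from the excerpt above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of phase-ordered startup (not Spring code).
final class PhaseOrderSim {
    private PhaseOrderSim() {
    }

    // beanPhases maps bean name -> phase. The result lists the names in start
    // order: ascending phase, and input iteration order within one phase.
    static List<String> startOrder(Map<String, Integer> beanPhases) {
        Map<Integer, List<String>> phases = new TreeMap<>();
        beanPhases.forEach((name, phase) ->
                phases.computeIfAbsent(phase, p -> new ArrayList<>()).add(name));
        List<String> order = new ArrayList<>();
        phases.values().forEach(order::addAll);
        return order;
    }
}
```

Because a TreeMap iterates its keys in sorted order, no explicit sort of the groups is needed; a bean with a late phase is started after everything with an earlier phase is already running.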
- DefaultLifecycleProcessor$LifecycleGroup.start()
public void start() {
if (!this.members.isEmpty()) {
if (DefaultLifecycleProcessor.this.logger.isDebugEnabled()) {
DefaultLifecycleProcessor.this.logger.debug("Starting beans in phase " + this.phase);
}
Collections.sort(this.members);
Iterator var1 = this.members.iterator();
while(var1.hasNext()) {
DefaultLifecycleProcessor.LifecycleGroupMember member = (DefaultLifecycleProcessor.LifecycleGroupMember)var1.next();
DefaultLifecycleProcessor.this.doStart(this.lifecycleBeans, member.name, this.autoStartupOnly); // 7
}
}
}
- DefaultLifecycleProcessor.doStart()
private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {
Lifecycle bean = (Lifecycle)lifecycleBeans.remove(beanName);
if (bean != null && bean != this) {
String[] dependenciesForBean = this.getBeanFactory().getDependenciesForBean(beanName);
String[] var6 = dependenciesForBean;
int var7 = dependenciesForBean.length;
for(int var8 = 0; var8 < var7; ++var8) {
String dependency = var6[var8];
this.doStart(lifecycleBeans, dependency, autoStartupOnly);
}
if (!bean.isRunning() && (!autoStartupOnly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle)bean).isAutoStartup())) {
if (this.logger.isTraceEnabled()) {
this.logger.trace("Starting bean '" + beanName + "' of type [" + bean.getClass().getName() + "]");
}
try {
bean.start(); // 8
} catch (Throwable var10) {
throw new ApplicationContextException("Failed to start bean '" + beanName + "'", var10);
}
}
}
}
- WebServerStartStopLifecycle.start() publishes the web server started event
The event: ServletWebServerInitializedEvent extends WebServerInitializedEvent
public void start() {
this.webServer.start();
this.running = true;
this.applicationContext.publishEvent(new ServletWebServerInitializedEvent(this.webServer, this.applicationContext)); // 9
}
- Publishing the event: AbstractApplicationContext.publishEvent()
protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
// ... excerpt: the code that wraps event into applicationEvent is omitted here
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
} else {
this.getApplicationEventMulticaster().multicastEvent((ApplicationEvent)applicationEvent, eventType); // 10
}
if (this.parent != null) {
if (this.parent instanceof AbstractApplicationContext) {
((AbstractApplicationContext)this.parent).publishEvent(event, eventType);
} else {
this.parent.publishEvent(event);
}
}
}
- The multicaster dispatches the event: SimpleApplicationEventMulticaster.multicastEvent(event, eventType)
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = eventType != null ? eventType : this.resolveDefaultEventType(event);
Executor executor = this.getTaskExecutor();
Iterator var5 = this.getApplicationListeners(event, type).iterator();
while(var5.hasNext()) {
ApplicationListener<?> listener = (ApplicationListener)var5.next();
if (executor != null) {
executor.execute(() -> {
this.invokeListener(listener, event);
});
} else {
this.invokeListener(listener, event); // 11
}
}
}
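The branch on the executor in multicastEvent decides whether listeners run on the publishing thread or are handed to an executor. A stripped-down sketch of that decision follows; `MiniMulticaster` is a hypothetical class that keeps only this branch and leaves out listener type matching and error handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical simplification of a multicaster (not Spring's class).
final class MiniMulticaster<E> {
    private final List<Consumer<E>> listeners = new ArrayList<>();
    private final Executor executor; // null means: call listeners on the publishing thread

    MiniMulticaster(Executor executor) {
        this.executor = executor;
    }

    void addListener(Consumer<E> listener) {
        listeners.add(listener);
    }

    void multicast(E event) {
        for (Consumer<E> listener : listeners) {
            if (executor != null) {
                executor.execute(() -> listener.accept(event)); // hand off to the executor
            } else {
                listener.accept(event);                         // synchronous call
            }
        }
    }
}
```

With no executor configured, a listener that blocks, for example while connecting to ZooKeeper, also blocks the thread that published the event.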
- Invoking the listener: invokeListener(listener, event)
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
ErrorHandler errorHandler = this.getErrorHandler();
if (errorHandler != null) {
try {
this.doInvokeListener(listener, event);
} catch (Throwable var5) {
errorHandler.handleError(var5);
}
} else {
this.doInvokeListener(listener, event); // 12
}
}
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
try {
listener.onApplicationEvent(event); // 13
} catch (ClassCastException var6) {}
}
- Calling the listener's event callback
public abstract class AbstractAutoServiceRegistration<R extends Registration> implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {
public void onApplicationEvent(WebServerInitializedEvent event) {
// TODO ..................
}
}
Copyright belongs to the original author, Lucifer Zhao.