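Source Code Analysis of the ZooKeeper Reconnection Event Callback

“A journey of a thousand miles cannot be made without accumulating small steps.”

Background

  • Goal: confirm that, with Curator as the ZooKeeper client, every reconnection after a network outage (one that raised a SUSPENDED or LOST event) calls back org.apache.curator.framework.state.ConnectionStateListener#stateChanged with the event type org.apache.curator.framework.state.ConnectionState#RECONNECTED.
  • The deployed ZooKeeper version is 3.8.3, the latest stable release at the time of writing; the curator-recipes dependency is version 5.5.0.

Source code analysis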

  • First, build a CuratorFramework object, create a LeaderSelector on top of it to implement leader election, and start the selector.

```java
// Imports needed by the two snippets below
import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ThreadUtils;

public static final String leaderSelectorPath = "/source-code-analyse/reconnect";

public static void main(String[] args) throws InterruptedException {
    CuratorFramework curatorFramework = newCuratorFramework();
    LeaderSelectorListener leaderSelectorListener = new LeaderSelectorListener() {
        @Override
        public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
            System.out.println("Thread " + Thread.currentThread().getName() + " Connection state changed : " + connectionState);
        }

        @Override
        public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
            System.out.println("Thread " + Thread.currentThread().getName() + " get the leader.");
            TimeUnit.SECONDS.sleep(20);
        }
    };
    LeaderSelector leaderSelector = new LeaderSelector(curatorFramework, leaderSelectorPath, leaderSelectorListener);
    leaderSelector.start();
    TimeUnit.SECONDS.sleep(100);
    leaderSelector.close();
    curatorFramework.close();
    System.out.println("Test completed.");
}
```

```java
public static CuratorFramework newCuratorFramework() {
    CuratorFramework curatorFramework = CuratorFrameworkFactory.builder()
            .connectString("192.168.0.104:2181")
            .sessionTimeoutMs(30000)
            .connectionTimeoutMs(6000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .threadFactory(ThreadUtils.newThreadFactory("ReconnectionTestThread"))
            .build();
    curatorFramework.start();
    return curatorFramework;
}
```
  • Because LeaderSelector is built on CuratorFramework, the start method of CuratorFramework is the place to look first. Go straight to the implementation class, CuratorFrameworkImpl.

```java
@Override
public void start() {
    log.info("Starting");
    if (!state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED)) {
        throw new IllegalStateException("Cannot be started more than once");
    }

    try {
        connectionStateManager.start();
        // remaining code omitted
```
  • CuratorFrameworkImpl turns out to maintain a connection state manager (ConnectionStateManager) internally, and its start method starts that manager.
  • The start method of ConnectionStateManager submits a task to a thread pool, and that task calls processEvents.

```java
public void start() {
    Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");

    service.submit(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            processEvents();
            return null;
        }
    });
}
```
  • The core of processEvents is a loop that repeatedly calls poll on a blocking queue named eventQueue to obtain ConnectionState values. Because the poll sits inside a while loop, it keeps running for as long as the manager itself is in the STARTED state (the loop condition checks the manager's state, not the health of the connection).

```java
private void processEvents() {
    while (state.get() == State.STARTED) {
        try {
            int useSessionTimeoutMs = getUseSessionTimeoutMs();
            long elapsedMs = startOfSuspendedEpoch == 0
                    ? useSessionTimeoutMs / 2
                    : System.currentTimeMillis() - startOfSuspendedEpoch;
            long pollMaxMs = useSessionTimeoutMs - elapsedMs;

            final ConnectionState newState = eventQueue.poll(pollMaxMs, TimeUnit.MILLISECONDS);
            if (newState != null) {
                if (listeners.isEmpty()) {
                    log.warn("There are no ConnectionStateListeners registered.");
                }

                listeners.forEach(listener -> listener.stateChanged(client, newState));
            } else if (sessionExpirationPercent > 0) {
                synchronized (this) {
                    checkSessionExpiration();
                }
            }

            synchronized (this) {
                if ((currentConnectionState == ConnectionState.LOST) && client.getZookeeperClient().isConnected()) {
                    // CURATOR-525 - there is a race whereby LOST is sometimes set after the connection has been repaired
                    // this "hack" fixes it by forcing the state to RECONNECTED
                    log.warn("ConnectionState is LOST but isConnected() is true. Forcing RECONNECTED.");
                    addStateChange(ConnectionState.RECONNECTED);
                }
            }
        } catch (InterruptedException e) {
            // swallow the interrupt as it's only possible from either a background
            // operation and, thus, doesn't apply to this loop or the instance
            // is being closed in which case the while test will get it
        }
    }
}
```
  • The loop then iterates over all ConnectionStateListener instances and calls stateChanged on each. LeaderSelector has a static nested class named WrappedListener that implements LeaderSelectorListener, so the stateChanged method of this WrappedListener is the one called back.

```java
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
    try {
        listener.stateChanged(client, newState);
    } catch (CancelLeadershipException dummy) {
        // If we cancel only leadership but not whole election, then we could hand over
        // dated leadership to client with no further cancellation. Dated leadership is
        // possible due to separated steps in leadership acquire: server data(e.g. election sequence)
        // change and client flag(e.g. hasLeadership) set.
        leaderSelector.cancelElection();
    }
}
```
  • The listener variable in listener.stateChanged(client, newState) above is the third constructor argument passed to LeaderSelector, that is, our own LeaderSelectorListener implementation. The call therefore ends up in our custom LeaderSelectorListener#stateChanged() method.
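The delegation described above can be modelled without Curator on the classpath. The following is a minimal sketch, not Curator's code: `WrappedListenerSketch`, `Listener`, `Wrapper` and `CancelLeadership` are hypothetical stand-ins for LeaderSelector.WrappedListener, LeaderSelectorListener and CancelLeadershipException, and a boolean flag stands in for the call to leaderSelector.cancelElection().

```java
import java.util.ArrayList;
import java.util.List;

final class WrappedListenerSketch {
    enum State { SUSPENDED, LOST, RECONNECTED }

    /** Hypothetical stand-in for a connection state listener. */
    interface Listener {
        void stateChanged(State newState);
    }

    /** Hypothetical stand-in for the exception a listener throws to give up leadership. */
    static final class CancelLeadership extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }

    /** Forwards every state change to the user listener and reacts to CancelLeadership. */
    static final class Wrapper implements Listener {
        private final Listener delegate;
        private boolean electionCancelled;

        Wrapper(Listener delegate) {
            this.delegate = delegate;
        }

        @Override
        public void stateChanged(State newState) {
            try {
                delegate.stateChanged(newState);
            } catch (CancelLeadership e) {
                // The real wrapper calls leaderSelector.cancelElection() here.
                electionCancelled = true;
            }
        }

        boolean isElectionCancelled() {
            return electionCancelled;
        }
    }

    public static void main(String[] args) {
        List<State> seen = new ArrayList<>();
        Wrapper wrapper = new Wrapper(newState -> {
            seen.add(newState);
            if (newState == State.LOST) {
                throw new CancelLeadership();
            }
        });
        wrapper.stateChanged(State.RECONNECTED);
        wrapper.stateChanged(State.LOST);
        System.out.println(seen + " cancelled=" + wrapper.isElectionCancelled());
        // prints "[RECONNECTED, LOST] cancelled=true"
    }
}
```

The point of the wrapper is that the user listener sees every state change first, and the framework only steps in when the listener explicitly asks to cancel.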
  • The next question is where elements get put into the eventQueue of ConnectionStateManager.
  • Tracing the callers shows that elements are offered to eventQueue in ConnectionStateManager#postState.

```java
private void postState(ConnectionState state) {
    log.info("State change: " + state);

    notifyAll();

    // If the queue is full and offer fails, poll the oldest entry first, then offer again
    while (!eventQueue.offer(state)) {
        eventQueue.poll();
        log.warn("ConnectionStateManager queue full - dropping events to make room");
    }
}
```
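The offer-then-drop-oldest pattern in postState can be tried in isolation. This is a minimal sketch using only the JDK; the class name `DropOldestQueueSketch`, the helper `offerDroppingOldest` and the capacity of 2 are assumptions chosen for the demonstration, not values taken from Curator.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class DropOldestQueueSketch {
    /**
     * Offers an item to a bounded queue. While the queue is full, the oldest
     * entry is removed to make room. Returns how many entries were dropped.
     */
    static <T> int offerDroppingOldest(BlockingQueue<T> queue, T item) {
        int dropped = 0;
        while (!queue.offer(item)) {
            if (queue.poll() != null) {
                dropped++;
            }
        }
        return dropped;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2); // capacity is arbitrary
        offerDroppingOldest(queue, "SUSPENDED");
        offerDroppingOldest(queue, "LOST");
        int dropped = offerDroppingOldest(queue, "RECONNECTED");
        System.out.println(queue + " dropped=" + dropped);
        // prints "[LOST, RECONNECTED] dropped=1"
    }
}
```

The design favours the newest state: if the consumer falls behind, stale connection states are discarded rather than the latest one.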
  • Tracing further leads to org.apache.curator.framework.state.ConnectionStateManager#addStateChange.

```java
public synchronized boolean addStateChange(ConnectionState newConnectionState) {
    // Return false right away if the manager is not in the STARTED state
    if (state.get() != State.STARTED) {
        return false;
    }

    ConnectionState previousState = currentConnectionState;
    // If the new state equals the previous one, the connection state has not changed:
    // produce no event and return
    if (previousState == newConnectionState) {
        return false;
    }
    currentConnectionState = newConnectionState;

    ConnectionState localState = newConnectionState;
    boolean isNegativeMessage = ((newConnectionState == ConnectionState.LOST)
            || (newConnectionState == ConnectionState.SUSPENDED)
            || (newConnectionState == ConnectionState.READ_ONLY));
    // On the first successful connection, report CONNECTED instead
    if (!isNegativeMessage && initialConnectMessageSent.compareAndSet(false, true)) {
        localState = ConnectionState.CONNECTED;
    }

    postState(localState);

    return true;
}
```
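The two rules in addStateChange (ignore a repeated state, and report the first positive state as CONNECTED) explain why the very first connection shows up as CONNECTED while later ones show up as RECONNECTED. Here is a minimal model of those rules; `StateChangeSketch`, `ConnState` and the `posted` list are hypothetical names, and the list replaces the real event queue.

```java
import java.util.ArrayList;
import java.util.List;

final class StateChangeSketch {
    enum ConnState { CONNECTED, SUSPENDED, RECONNECTED, LOST, READ_ONLY }

    private ConnState current;            // null until the first change arrives
    private boolean initialConnectSent;   // becomes true once CONNECTED was posted
    private final List<ConnState> posted = new ArrayList<>();

    /** Returns true if the change produced an event, false if it was a repeat. */
    synchronized boolean addStateChange(ConnState next) {
        if (current == next) {
            return false; // same state as before: no event
        }
        current = next;

        boolean negative = next == ConnState.LOST
                || next == ConnState.SUSPENDED
                || next == ConnState.READ_ONLY;
        ConnState toPost = next;
        if (!negative && !initialConnectSent) {
            initialConnectSent = true;
            toPost = ConnState.CONNECTED; // first positive state is reported as CONNECTED
        }
        posted.add(toPost);
        return true;
    }

    synchronized List<ConnState> posted() {
        return new ArrayList<>(posted);
    }

    public static void main(String[] args) {
        StateChangeSketch manager = new StateChangeSketch();
        manager.addStateChange(ConnState.RECONNECTED); // first connect
        manager.addStateChange(ConnState.RECONNECTED); // repeat, ignored
        manager.addStateChange(ConnState.SUSPENDED);   // network drops
        manager.addStateChange(ConnState.RECONNECTED); // network returns
        System.out.println(manager.posted());
        // prints "[CONNECTED, SUSPENDED, RECONNECTED]"
    }
}
```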
  • addStateChange is in turn called by org.apache.curator.framework.imps.CuratorFrameworkImpl#validateConnection.

```java
void validateConnection(Watcher.Event.KeeperState state) {
    // Disconnected produces a SUSPENDED event
    if (state == Watcher.Event.KeeperState.Disconnected) {
        suspendConnection();
    }
    // Expired produces a LOST event
    else if (state == Watcher.Event.KeeperState.Expired) {
        connectionStateManager.addStateChange(ConnectionState.LOST);
    }
    // SyncConnected produces a RECONNECTED event
    else if (state == Watcher.Event.KeeperState.SyncConnected) {
        connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
    }
    // ConnectedReadOnly produces a READ_ONLY event
    else if (state == Watcher.Event.KeeperState.ConnectedReadOnly) {
        connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
    }
}
```
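Read as a lookup table, validateConnection maps four ZooKeeper session states to Curator connection states and ignores the rest. The sketch below restates that table with plain enums; `KeeperStateMappingSketch` and its two enums are hypothetical stand-ins, and AuthFailed is included only as an example of a state the method shown above does not handle.

```java
import java.util.Optional;

final class KeeperStateMappingSketch {
    enum KeeperState { Disconnected, Expired, SyncConnected, ConnectedReadOnly, AuthFailed }
    enum ConnState { SUSPENDED, LOST, RECONNECTED, READ_ONLY }

    /** Returns the connection state produced for a session state, or empty if none is produced. */
    static Optional<ConnState> toConnectionState(KeeperState state) {
        switch (state) {
            case Disconnected:
                return Optional.of(ConnState.SUSPENDED);
            case Expired:
                return Optional.of(ConnState.LOST);
            case SyncConnected:
                return Optional.of(ConnState.RECONNECTED);
            case ConnectedReadOnly:
                return Optional.of(ConnState.READ_ONLY);
            default:
                return Optional.empty(); // not handled by the mapping above
        }
    }

    public static void main(String[] args) {
        for (KeeperState state : KeeperState.values()) {
            System.out.println(state + " -> " + toConnectionState(state));
        }
    }
}
```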
  • The caller of validateConnection() is org.apache.curator.framework.imps.CuratorFrameworkImpl#processEvent.

```java
private void processEvent(final CuratorEvent curatorEvent) {
    // validateConnection is called only when the event type is WATCHED;
    // connection state changes arrive as WATCHED events
    if (curatorEvent.getType() == CuratorEventType.WATCHED) {
        validateConnection(curatorEvent.getWatchedEvent().getState());
    }
    // remaining code omitted
}
```
  • When the connection state changes, processEvent is invoked by an anonymous Watcher that the CuratorFrameworkImpl constructor passes to CuratorZookeeperClient.

```java
public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder) {
    // ZookeeperFactory is the factory Curator uses to create ZooKeeper instances
    ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory(), builder.getZkClientConfig());
    this.client = new CuratorZookeeperClient(
            localZookeeperFactory,
            builder.getEnsembleProvider(),
            builder.getSessionTimeoutMs(),
            builder.getConnectionTimeoutMs(),
            builder.getWaitForShutdownTimeoutMs(),
            new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    CuratorEvent event = new CuratorEventImpl(
                            CuratorFrameworkImpl.this,
                            CuratorEventType.WATCHED,
                            watchedEvent.getState().getIntValue(),
                            unfixForNamespace(watchedEvent.getPath()),
                            null, null, null, null, null,
                            watchedEvent,
                            null, null);
                    processEvent(event);
                }
            },
            builder.getRetryPolicy(),
            builder.canBeReadOnly());
    // remaining code omitted
}
```
  • The CuratorZookeeperClient constructor creates a ConnectionState object to manage connection events between the client and ZooKeeper. (This is the class org.apache.curator.ConnectionState, not the enum org.apache.curator.framework.state.ConnectionState seen earlier.) The Watcher from the previous step is passed to it as a constructor argument and stored in a queue named parentWatchers.

```java
public CuratorZookeeperClient(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider,
        int sessionTimeoutMs, int connectionTimeoutMs, int waitForShutdownTimeoutMs,
        Watcher watcher, RetryPolicy retryPolicy, boolean canBeReadOnly) {
    if (sessionTimeoutMs < connectionTimeoutMs) {
        log.warn(String.format("session timeout [%d] is less than connection timeout [%d]", sessionTimeoutMs, connectionTimeoutMs));
    }

    retryPolicy = Preconditions.checkNotNull(retryPolicy, "retryPolicy cannot be null");
    ensembleProvider = Preconditions.checkNotNull(ensembleProvider, "ensembleProvider cannot be null");

    this.connectionTimeoutMs = connectionTimeoutMs;
    this.waitForShutdownTimeoutMs = waitForShutdownTimeoutMs;
    // Create a ConnectionState object that manages the connection state between client and ZooKeeper
    state = new ConnectionState(zookeeperFactory, ensembleProvider, sessionTimeoutMs, watcher, tracer, canBeReadOnly);
    setRetryPolicy(retryPolicy);
}
```

```java
ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs,
        Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly) {
    this.ensembleProvider = ensembleProvider;
    this.tracer = tracer;
    if (parentWatcher != null) {
        // The anonymous Watcher is passed in here and stored in parentWatchers
        parentWatchers.offer(parentWatcher);
    }

    handleHolder = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
}
```
  • Looking for where ConnectionState uses parentWatchers leads to its process() method.

```java
@Override
public void process(WatchedEvent event) {
    if (LOG_EVENTS) {
        log.debug("ConnectState watcher: " + event);
    }

    if (event.getType() == Watcher.Event.EventType.None) {
        boolean wasConnected = isConnected.get();
        boolean newIsConnected = checkState(event.getState(), wasConnected);
        if (newIsConnected != wasConnected) {
            isConnected.set(newIsConnected);
            connectionStartMs = System.currentTimeMillis();
            if (newIsConnected) {
                lastNegotiatedSessionTimeoutMs.set(handleHolder.getNegotiatedSessionTimeoutMs());
                log.debug("Negotiated session timeout: " + lastNegotiatedSessionTimeoutMs.get());
            }
        }
    }

    for (Watcher parentWatcher : parentWatchers) {
        OperationTrace trace = new OperationTrace("connection-state-parent-process", tracer.get(), getSessionId());
        // Iterate over the watchers and call process on each. The only one known so far is the
        // anonymous Watcher created in the CuratorFrameworkImpl constructor, which eventually
        // leads back to our custom ConnectionStateListener
        parentWatcher.process(event);
        trace.commit();
    }
}
```
  • Where is ConnectionState#process itself called? This one is buried deeper. Setting breakpoints shows that it is called from org.apache.zookeeper.ClientCnxn.EventThread#processEvent.

```java
private void processEvent(Object event) {
    try {
        if (event instanceof WatcherSetEventPair) {
            // each watcher will process the event
            WatcherSetEventPair pair = (WatcherSetEventPair) event;
            for (Watcher watcher : pair.watchers) {
                try {
                    watcher.process(pair.event);
                } catch (Throwable t) {
                    LOG.error("Error while calling watcher ", t);
                }
            }
            // remaining code omitted
```
  • ClientCnxn is no longer Curator code. It belongs to the native ZooKeeper API and is the lowest-level component that manages the connection between the client and the ZooKeeper server. It is initialized when a ZooKeeper object is created, and, as mentioned earlier, Curator wraps that ZooKeeper object inside ConnectionState.

```java
ConnectionState(ZookeeperFactory zookeeperFactory, EnsembleProvider ensembleProvider, int sessionTimeoutMs,
        Watcher parentWatcher, AtomicReference<TracerDriver> tracer, boolean canBeReadOnly) {
    this.ensembleProvider = ensembleProvider;
    this.tracer = tracer;
    if (parentWatcher != null) {
        parentWatchers.offer(parentWatcher);
    }

    // zookeeperFactory encapsulates how the ZooKeeper instance is obtained
    handleHolder = new HandleHolder(zookeeperFactory, this, ensembleProvider, sessionTimeoutMs, canBeReadOnly);
}
```
  • org.apache.zookeeper.ClientCnxn.EventThread#processEvent is called from org.apache.zookeeper.ClientCnxn.EventThread#run. EventThread is an inner class that extends Thread, and it is started with start() when the ZooKeeper object is created. SendThread is started at the same time.

```java
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException {
    // code omitted ... ...
    cnxn = createConnection(
            connectStringParser.getChrootPath(),
            hostProvider,
            sessionTimeout,
            this.clientConfig,
            watcher,
            getClientCnxnSocket(),
            canBeReadOnly);
    cnxn.start();
}
```

```java
public void start() {
    sendThread.start();
    eventThread.start();
}
```
  • Following the EventThread source shows that its run method also uses a while loop, repeatedly taking events from a blocking queue named waitingEvents.

```java
private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>();

@Override
@SuppressFBWarnings("JLM_JSR166_UTILCONCURRENT_MONITORENTER")
public void run() {
    try {
        isRunning = true;
        while (true) {
            Object event = waitingEvents.take();
            // eventOfDeath is a sentinel created with new Object();
            // any other event is handed to processEvent
            if (event == eventOfDeath) {
                wasKilled = true;
            } else {
                processEvent(event);
            }
            // unrelated code omitted ... ...
        }
```
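The take loop with a sentinel object is a common way to run and stop a single consumer thread. The following is a minimal sketch of that pattern with JDK classes only. `EventLoopSketch` and its methods are hypothetical names, and unlike the real EventThread (which sets wasKilled and keeps draining), this version simply returns when it sees the sentinel.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

final class EventLoopSketch {
    // Sentinel compared by identity, in the spirit of eventOfDeath
    private static final Object EVENT_OF_DEATH = new Object();

    private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    private final List<Object> processed = new CopyOnWriteArrayList<>();
    private final Thread worker = new Thread(this::runLoop, "event-loop-sketch");

    private void runLoop() {
        try {
            while (true) {
                Object event = waitingEvents.take(); // blocks until an event arrives
                if (event == EVENT_OF_DEATH) {
                    return; // simplified shutdown
                }
                processed.add(event); // stands in for processEvent(event)
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void start() {
        worker.start();
    }

    void queueEvent(Object event) {
        waitingEvents.add(event);
    }

    /** Queues the sentinel and waits for the worker to finish. */
    void shutdownAndWait() {
        waitingEvents.add(EVENT_OF_DEATH);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    List<Object> processed() {
        return processed;
    }

    public static void main(String[] args) {
        EventLoopSketch loop = new EventLoopSketch();
        loop.start();
        loop.queueEvent("Disconnected");
        loop.queueEvent("SyncConnected");
        loop.shutdownAndWait();
        System.out.println(loop.processed());
        // prints "[Disconnected, SyncConnected]"
    }
}
```

Because the queue is FIFO, every event queued before the sentinel is processed before the thread exits.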
  • So the key question is where elements are added to waitingEvents.
  • Searching ClientCnxn for this variable turns up two candidates that add to it: queueEvent and queuePacket. Judging by the names, the second adds the data packets exchanged with ZooKeeper (breakpoints later confirmed this), while queueEvent() is the one that adds event data.

```java
public void queueEvent(WatchedEvent event) {
    queueEvent(event, null);
}

private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
    if (event.getType() == EventType.None && sessionState == event.getState()) {
        return;
    }
    sessionState = event.getState();
    final Set<Watcher> watchers;
    if (materializedWatchers == null) {
        // materialize the watchers based on the event
        watchers = watchManager.materialize(event.getState(), event.getType(), event.getPath());
    } else {
        watchers = new HashSet<>(materializedWatchers);
    }
    WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
    // queue the pair (watch set & event) for later processing
    waitingEvents.add(pair);
}
```
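The first if in queueEvent filters out session events (type None) whose state equals the last recorded session state. A reconnect always follows a disconnect, so its SyncConnected event differs from the recorded state and passes the filter. Below is a minimal model of just that filter; `SessionEventDedupSketch` and its enums are hypothetical, and the initial value Disconnected is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

final class SessionEventDedupSketch {
    enum EventType { None, NodeDataChanged }
    enum KeeperState { Disconnected, SyncConnected }

    // Assumed initial value for this sketch
    private KeeperState sessionState = KeeperState.Disconnected;
    private final List<String> queued = new ArrayList<>();

    /** Returns true if the event was queued, false if it was filtered out as a repeat. */
    boolean queueEvent(EventType type, KeeperState state) {
        if (type == EventType.None && sessionState == state) {
            return false; // session state unchanged: nothing to report
        }
        sessionState = state;
        queued.add(type + ":" + state);
        return true;
    }

    List<String> queued() {
        return queued;
    }

    public static void main(String[] args) {
        SessionEventDedupSketch thread = new SessionEventDedupSketch();
        thread.queueEvent(EventType.None, KeeperState.SyncConnected); // initial connect
        thread.queueEvent(EventType.None, KeeperState.SyncConnected); // repeat, filtered
        thread.queueEvent(EventType.None, KeeperState.Disconnected);  // network drops
        thread.queueEvent(EventType.None, KeeperState.SyncConnected); // network returns
        System.out.println(thread.queued());
        // prints "[None:SyncConnected, None:Disconnected, None:SyncConnected]"
    }
}
```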
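  • In other words, when the connection state between the client and the ZooKeeper server changes (for example on reconnect), something must call queueEvent to put the new state on the blocking queue, where it waits to be consumed.
  • That code is fairly complex. If you are interested, read the source of org.apache.zookeeper.ClientCnxn.SendThread.
  • In short, the flow is as follows: when a ZooKeeper object is created, it creates a ClientCnxn and starts two threads, eventThread and sendThread.
  • SendThread is mainly responsible for communicating with ZooKeeper. It also runs a heartbeat, pinging the server periodically to confirm that the connection is healthy.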
  • The run method of SendThread contains a while loop that checks whether the client is disconnected. If it is, the loop keeps trying to re-establish the connection through ClientCnxnSocket, repeating this step whenever an attempt fails.

```java
// If not connected, keep trying to establish a connection (see startConnect for details).
// A failure is caught by the outer catch block, and the while loop then tries again.
if (!clientCnxnSocket.isConnected()) {
    // don't re-establish connection if we are closing
    if (closing) {
        break;
    }
    if (rwServerAddress != null) {
        serverAddress = rwServerAddress;
        rwServerAddress = null;
    } else {
        serverAddress = hostProvider.next(1000);
    }
    onConnecting(serverAddress);
    // This method ends up connecting to ZooKeeper via clientCnxnSocket.connect(addr)
    startConnect(serverAddress);
    // Update now to start the connection timer right after we make a connection attempt
    clientCnxnSocket.updateNow();
    clientCnxnSocket.updateLastSendAndHeard();
}
```
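The retry behaviour above boils down to: pick the next server, try to connect, and on failure go around the loop again. Here is a minimal sketch of that idea with a simulated connector. `ReconnectLoopSketch`, `Connector` and `connectWithRetry` are hypothetical names, servers are picked round-robin as a simplification of hostProvider.next, and the `maxAttempts` bound exists only so the example terminates (the real loop has no such parameter).

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

final class ReconnectLoopSketch {
    /** Hypothetical stand-in for the socket layer that opens a connection. */
    interface Connector {
        void connect(String address) throws IOException;
    }

    /**
     * Tries servers round-robin until one accepts the connection.
     * Returns the number of attempts used, or -1 if maxAttempts was exhausted.
     */
    static int connectWithRetry(List<String> servers, Connector connector, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            String address = servers.get((attempt - 1) % servers.size());
            try {
                connector.connect(address);
                return attempt;
            } catch (IOException e) {
                // Connection failed: fall through and try the next server
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated network: the first two attempts fail, the third succeeds
        Connector flaky = address -> {
            if (calls[0]++ < 2) {
                throw new IOException("unreachable: " + address);
            }
        };
        int attempts = connectWithRetry(Arrays.asList("zk1:2181", "zk2:2181"), flaky, 10);
        System.out.println("connected after " + attempts + " attempt(s)");
        // prints "connected after 3 attempt(s)"
    }
}
```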
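  • Once the connection is re-established, org.apache.zookeeper.ClientCnxn.SendThread#run calls clientCnxnSocket.doTransport and starts exchanging packets with ZooKeeper.

```java
// pendingQueue holds packets that have been sent and are awaiting a response
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
```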
  • doTransport contains the NIO code; study it on your own if you are interested.
  • Finally, org.apache.zookeeper.ClientCnxnSocket#readConnectResult reads the connect response from ZooKeeper and calls org.apache.zookeeper.ClientCnxn.SendThread#onConnected, which puts the event on the waitingEvents blocking queue.

```java
void readConnectResult() throws IOException {
    // unrelated code omitted
    sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
}
```

    Because our connection to ZooKeeper is not read-only, the state carried by the event is SyncConnected.

```java
void onConnected(int _negotiatedSessionTimeout, long _sessionId, byte[] _sessionPasswd, boolean isRO) throws IOException {
    // unrelated code omitted
    KeeperState eventState = (isRO) ? KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
    eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, eventState, null));
}
```

    As the validateConnection code showed earlier, a KeeperState of KeeperState.SyncConnected triggers a RECONNECTED event, which is finally delivered to the stateChanged method of our custom ConnectionStateListener.
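  • To verify this yourself, debug with breakpoints along the path described here. Some of the steps are asynchronous, so pay attention to when each breakpoint is hit.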

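Verification result

  • With CuratorFramework as the ZooKeeper client, a reconnection after a network outage is confirmed to produce a RECONNECTED event in the stateChanged method of the custom ConnectionStateListener.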

Reposted from: https://blog.csdn.net/Josh_scott/article/details/133826783
Copyright belongs to the original author, 知秋丶. If this infringes your rights, please contact us for removal.
