0


zookeeper选举流程源码分析

zookeeper选举流程源码分析

选举的代码主要是在

QuorumPeer.java

这个类中。

它有一个内部枚举类,用来表示当前节点的状态。

publicenumServerState{LOOKING,FOLLOWING,LEADING,OBSERVING;}

LOOKING: 当前节点在选举过程中

FOLLOWING:当前节点是从节点

LEADING: 当前节点是主节点

OBSERVING: 当前节点是观察者状态,这种状态的节点不参与选举的投票。

QuorumPeer

有个

run

方法,就是用来根据当前节点不同的状态,进行不同的处理。

下面看下这段代码主要的框架

@Overridepublicvoidrun(){updateThreadName();LOG.debug("Starting quorum peer");// 这里是注册jmx消息,不用关注//下面就是选举的框架代码了try{//running 表示当前节点的状态,只要在运行过程中,就会一直根据当前节点的状态进行不同的处理while(running){//getPeerState()用来获取当前节点的状态,就是上面提到的枚举类。//下面就会根据不同的状态进行不同的处理switch(getPeerState()){caseLOOKING:LOG.info("LOOKING");......//选举就是调用下面的这行代码来完成的。//后面我们也就单独就这个代码来进行分析setCurrentVote(makeLEStrategy().lookForLeader());......break;caseOBSERVING:......//按照观察者的逻辑进行处理  break;caseFOLLOWING:......//按照从节点的逻辑进行处理break;caseLEADING:......//按照主节点的逻辑进行处理break;}
                start_fle =Time.currentElapsedTime();}}finally{......}}

上面代码的逻辑还是比较清楚的,就是一直在这几种状态之间处理。

每种状态的处理逻辑基本都是如下

                    try {
                      //处理业务逻辑,正常情况下,会一直在这里。
                      //除非当前的状态逻辑已经处理完毕,如LOOKING,或者抛出了异常,这时就需要重置状态
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                    } finally {
                         //重置状态
                      updateServerState();
                    }

下面我们看看上面选举的这行代码

 setCurrentVote(makeLEStrategy().lookForLeader());

。这行代码会调用具体执行选举的类执行具体的选举操作,并返回对应的投票信息,并设置成当前的投票信息。

默认的选举的是

FastLeaderElection

,对应的选举逻辑就在

lookForLeader

方法中。下面我们就直接去看看

FastLeaderElection

lookForLeader

方法吧。

选举的主要逻辑就是告诉其他节点。我是谁,我选谁做为主节点。

publicVotelookForLeader()throwsInterruptedException{......try{//recvset用来保存投票信息,//key表示选民身份,也就是这个票是谁投的(注意:每个节点只会有一个有效的投票,后面的投票会覆盖掉之前的投票)//value用来表示具体投票的内容HashMap<Long,Vote> recvset =newHashMap<Long,Vote>();HashMap<Long,Vote> outofelection =newHashMap<Long,Vote>();int notTimeout = finalizeWait;synchronized(this){//每次投票前,会先更新这个logicalclock逻辑时钟,这个用来表示当前是第几次选举了,对比投票信息的时候会用到,这个很关键
                logicalclock.incrementAndGet();//首先给自己投一票updateProposal(getInitId(),getInitLastLoggedZxid(),getPeerEpoch());}LOG.info("New election. My id =  "+ self.getId()+", proposed zxid=0x"+Long.toHexString(proposedZxid));//发送投票信息sendNotifications();

在上面的代码,首先会把

logicalclock

+1,表示当前是启动后的第几轮选取,这个参数是保存在内存中的,也就是每次启动都会从0开始。

那会不会出现节点之间

logicalclock

不同的情况呢,这个情况是有可能会出现的。不过后面选举过程中,相互发送消息也就会发送

logicalclock

,会和自己的

logicalclock

比较,进行修正。

在开始选举的时候,首先会给自己投一票。

会调用

sendNotifications

方法将投票者(自己)的信息和投票信息发出去。

会发送这些信息:

  • proposedLeader : 选举的主节点
  • proposedZxid: 选举的节点zxid,这个字段是long类型,前面32 bit表示epoch,后面32bit表示事务id
  • logicalclock:投票者的逻辑时钟
  • QuorumPeer.ServerState.LOOKING:投票者的状态(投票的状态肯定是looking)
  • sid: 投票者的id
  • proposedEpoch :选举节点的epoch,也就是proposedZxid的前面32 bit

下面看看具体的选举代码

//如果当前节点一直是looking,且服务没有停止,就会一直进行选举流程while((self.getPeerState()==ServerState.LOOKING)&&(!stop)){// 获取其他节点发送过来的消息Notification n = recvqueue.poll(notTimeout,TimeUnit.MILLISECONDS);//如果没有收到消息,就去检查下和其他节点的连接是否正常,尽力使消息能发送。if(n ==null){......}// 验证收到消息的节点和它选举的主节点是否有效elseif(validVoter(n.sid)&&validVoter(n.leader)){//这里就会根据收到消息的节点状态进行分别进行处理// 比如自己是后加入进来的,这时就已经有了leader节点,对应的也就有follow节点// 也有可能大家都刚启动,或者主节点挂掉了,这时大家都会又会是looking状态switch(n.state){// 如果对方节点是投票状态caseLOOKING:// If notification > current, replace and send messages out//首先比较logicalclock,如果对方的logicalclock比自己的大,就修正自己的`logicalclock`,同时清空自己的票箱,重新计票if(n.electionEpoch > logicalclock.get()){
                            logicalclock.set(n.electionEpoch);
                            recvset.clear();// 这里会比较票的信息,如果对方选的leader节点的比自己大,就推举对方选的leader节点,否则还是将票投给自己if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,getInitId(),getInitLastLoggedZxid(),getPeerEpoch())){updateProposal(n.leader, n.zxid, n.peerEpoch);}else{updateProposal(getInitId(),getInitLastLoggedZxid(),getPeerEpoch());}sendNotifications();// 如果自己的logicalclock 比对方的大,直接忽略对方的票}elseif(n.electionEpoch < logicalclock.get()){......break;// 如果logicalclock相等,那就直接比较自己当前选出来的leader和对方选出来的leader进行比较,如果自己的大,就不做处理,如果对方的大,就更新自己的票,重新投票}elseif(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                proposedLeader, proposedZxid, proposedEpoch)){updateProposal(n.leader, n.zxid, n.peerEpoch);sendNotifications();}//在这里将对方的票扔进投票箱// don't care about the version if it's in LOOKING state
                        recvset.put(n.sid,newVote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));//这里就对投票进行统计了,如果过半,就要设置leader了,不过在这之前,会再等一个时间,看看其他节点是否有选出更适合的leader。//如果没有,那就设置对方节点选出来的leader为主节点,对比下leader是不是自己,如果是自己,就将自己的状态修改为leader,否则就修改成follow。同时保存当前leader信息if(termPredicate(recvset,newVote(proposedLeader, proposedZxid,
                                        logicalclock.get(), proposedEpoch))){// Verify if there is any change in the proposed leaderwhile((n = recvqueue.poll(finalizeWait,TimeUnit.MILLISECONDS))!=null){if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        proposedLeader, proposedZxid, proposedEpoch)){
                                    recvqueue.put(n);break;}}/*
                             * This predicate is true once we don't read any new
                             * relevant message from the reception queue
                             */if(n ==null){
                                self.setPeerState((proposedLeader == self.getId())?ServerState.LEADING:learningState());Vote endVote =newVote(proposedLeader,
                                        proposedZxid, logicalclock.get(), 
                                        proposedEpoch);leaveInstance(endVote);return endVote;}}break;caseOBSERVING://这种状态的节点是不参与投票的,所以对它的发送的投票信息进行忽略。break;caseFOLLOWING:caseLEADING://如果对方是following或者leading,说明当前已经有主节点了,在这里就直接统计票数信息,并验证根据票数信息统计出来的leader节点和回应自己消息的自称leader节点 是不是同一个,如果是同一个,说明信息是吻合的,就会去设置自己的节点状态。需要注意的是,投票信息不但会发送给其他节点,也会给自己发送。所以这里会判断对方节点是否是当前节点。......break;default:......}}else{......}}returnnull;}finally{......}}

比较节点大小也比较简单。

((newEpoch > curEpoch) ||
                ((newEpoch == curEpoch) &&
                ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));

首先比较epoch,其次比较zxid,最后比较myid。

myid就是我们在zookeeper每个节点中设置myid文件中对应的值。

zxid是两部分,前32bit epoch,后32 bit 事务序号。在一个节点成为leader节点后,首先会将epoch的值+1,同时将事务序号设置成0。zxid是持久化写入文件的,所以重启也不会丢失。

logicalclock在内存中,所以每次启动都会从0开始。

给其他节点发送投票消息的时候,也会给自己发送,其他节点是通过网络发送,给自己是直接放到接收投票信息的队列。

标签: zookeeper linux java

本文转载自: https://blog.csdn.net/wbo112/article/details/132462218
版权归原作者 wbo112 所有, 如有侵权,请联系我们删除。

“zookeeper选举流程源码分析”的评论:

还没有评论