0


zookeeper源码(07)leader、follower和observer

Leader

构造方法

publicLeader(QuorumPeer self,LeaderZooKeeperServer zk)throwsIOException{this.self = self;this.proposalStats =newBufferStats();// 获取节点间通信地址Set<InetSocketAddress> addresses;if(self.getQuorumListenOnAllIPs()){
        addresses = self.getQuorumAddress().getWildcardAddresses();}else{
        addresses = self.getQuorumAddress().getAllAddresses();}// 创建ServerSocket并bind地址,add到serverSockets集,启动LearnerCnxAcceptor时使用
    addresses.stream().map(address ->createServerSocket(address, self.shouldUsePortUnification(), self.isSslQuorum())).filter(Optional::isPresent).map(Optional::get).forEach(serverSockets::add);this.zk = zk;}

lead方法

QuorumPeer使用lead方法启动leader节点,从lead方法入手分析leader流程并分析重要的方法:

voidlead()throwsIOException,InterruptedException{
    self.end_fle =Time.currentElapsedTime();long electionTimeTaken = self.end_fle - self.start_fle;
    self.setElectionTimeTaken(electionTimeTaken);ServerMetrics.getMetrics().ELECTION_TIME.add(electionTimeTaken);
    self.start_fle =0;
    self.end_fle =0;

    zk.registerJMX(newLeaderBean(this, zk), self.jmxLocalPeerBean);try{
        self.setZabState(QuorumPeer.ZabState.DISCOVERY);
        self.tick.set(0);// 使用ZooKeeperServer的loadData方法加载db数据// 加载数据、清理session、生成快照(takeSnapshot)
        zk.loadData();

        leaderStateSummary =newStateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());// 启动线程接收Learner连接,创建LearnerHandler与客户端通信
        cnxAcceptor =newLearnerCnxAcceptor();
        cnxAcceptor.start();// 获取上一次同步最终epoch并计算本次的epoch和zxidlong epoch =getEpochToPropose(self.getMyId(), self.getAcceptedEpoch());// 设置新的zxid
        zk.setZxid(ZxidUtils.makeZxid(epoch,0));synchronized(this){
            lastProposed = zk.getZxid();}

        newLeaderProposal.packet =newQuorumPacket(NEWLEADER, zk.getZxid(),null,null);QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();QuorumVerifier curQV = self.getQuorumVerifier();if(curQV.getVersion()==0&& curQV.getVersion()== lastSeenQV.getVersion()){// qv.version == 0try{QuorumVerifier newQV = self.configFromString(curQV.toString());
                newQV.setVersion(zk.getZxid());
                self.setLastSeenQuorumVerifier(newQV,true);}catch(Exception e){thrownewIOException(e);}}

        newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());if(self.getLastSeenQuorumVerifier().getVersion()> self.getQuorumVerifier().getVersion()){
            newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());}// 等待足够数量的ACKEPOCH数据包,表示follower确认newEpoch// follower会把lastLoggedZxid、currentEpoch发送过来,leader用这些参数判断同步数据的方式waitForEpochAck(self.getMyId(), leaderStateSummary);
        self.setCurrentEpoch(epoch);// 设置新的currentEpoch
        self.setLeaderAddressAndId(self.getQuorumAddress(), self.getMyId());
        self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);try{// 等待follower的newLeaderAckwaitForNewLeaderAck(self.getMyId(), zk.getZxid());}catch(InterruptedException e){// 略return;}// 启动zookeeperServerstartZkServer();

        self.setZabState(QuorumPeer.ZabState.BROADCAST);
        self.adminServer.setZooKeeperServer(zk);// We ping twice a tick, so we only update the tick every other iterationboolean tickSkip =true;String shutdownMessage =null;while(true){synchronized(this){long start =Time.currentElapsedTime();long cur = start;long end = start + self.tickTime /2;// 等待tickTime / 2毫秒while(cur < end){wait(end - cur);
                    cur =Time.currentElapsedTime();}if(!tickSkip){
                    self.tick.incrementAndGet();}// 用来判断learner同步状态SyncedLearnerTracker syncedAckSet =newSyncedLearnerTracker();
                syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());if(self.getLastSeenQuorumVerifier()!=null&&
                    self.getLastSeenQuorumVerifier().getVersion()> self.getQuorumVerifier().getVersion()){
                    syncedAckSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());}

                syncedAckSet.addAck(self.getMyId());// 查询learner的ack状态for(LearnerHandler f :getLearners()){if(f.synced()){
                        syncedAckSet.addAck(f.getSid());}}if(!this.isRunning()){// shutdownbreak;}// 判断超半数learner已是同步状态// 1个tickTime周期判断一次if(!tickSkip &&!syncedAckSet.hasAllQuorums()&&!(self.getQuorumVerifier().overrideQuorumDecision(getForwardingFollowers())&&
                      self.getQuorumVerifier().revalidateOutstandingProp(this,newArrayList<>(outstandingProposals.values()), lastCommitted))){// Lost quorum of last committed and/or last proposed
                    shutdownMessage ="Not sufficient followers synced";break;}
                tickSkip =!tickSkip;}// ping learner// 1个tickTime周期ping两次for(LearnerHandler f :getLearners()){
                f.ping();}}if(shutdownMessage !=null){// leader goes in looking stateshutdown(shutdownMessage);}}finally{
        zk.unregisterJMX(this);}}

getEpochToPropose方法

收集各参与者上报的acceptedEpoch,计算本轮新的epoch(至少为已上报的最大acceptedEpoch加1),并等待leader自身和过半参与者都连接后返回;lead方法再用该epoch生成新的zxid:

publiclonggetEpochToPropose(long sid,long lastAcceptedEpoch)throwsInterruptedException,IOException{synchronized(connectingFollowers){if(!waitingForNewEpoch){return epoch;}if(lastAcceptedEpoch >= epoch){
            epoch = lastAcceptedEpoch +1;// 更新最新epoch}if(isParticipant(sid)){
            connectingFollowers.add(sid);}QuorumVerifier verifier = self.getQuorumVerifier();// 连接的follower超过了半数if(connectingFollowers.contains(self.getMyId())&& verifier.containsQuorum(connectingFollowers)){
            waitingForNewEpoch =false;
            self.setAcceptedEpoch(epoch);// 设置新的epoch
            connectingFollowers.notifyAll();}else{long start =Time.currentElapsedTime();if(sid == self.getMyId()){
                timeStartWaitForEpoch = start;}long cur = start;long end = start + self.getInitLimit()* self.getTickTime();// 等待initLimit*tickTime毫秒,如果还是waitingForNewEpoch状态抛错,会触发重新选举while(waitingForNewEpoch && cur < end &&!quitWaitForEpoch){
                connectingFollowers.wait(end - cur);
                cur =Time.currentElapsedTime();}if(waitingForNewEpoch){thrownewInterruptedException("Timeout while waiting for epoch from quorum");}}return epoch;}}

waitForEpochAck方法

等待足够数量的ACKEPOCH数据包,表示follower确认newEpoch,follower会把lastLoggedZxid、currentEpoch发送过来,leader用这些参数判断同步数据的方式:

publicvoidwaitForEpochAck(long id,StateSummary ss)throwsIOException,InterruptedException{synchronized(electingFollowers){if(electionFinished){return;}// 略QuorumVerifier verifier = self.getQuorumVerifier();if(electingFollowers.contains(self.getMyId())&& verifier.containsQuorum(electingFollowers)){
            electionFinished =true;
            electingFollowers.notifyAll();}else{long start =Time.currentElapsedTime();long cur = start;long end = start + self.getInitLimit()* self.getTickTime();while(!electionFinished && cur < end){
                electingFollowers.wait(end - cur);
                cur =Time.currentElapsedTime();}if(!electionFinished){thrownewInterruptedException("Timeout while waiting for epoch to be acked by quorum");}}}}

waitForNewLeaderAck方法

等待足够数量的learner对NEWLEADER回复Leader.ACK(需满足newLeaderProposal中所有quorum校验,即hasAllQuorums),之后才能开始正常通信:

publicvoidwaitForNewLeaderAck(long sid,long zxid)throwsInterruptedException{synchronized(newLeaderProposal.qvAcksetPairs){if(quorumFormed){return;}long currentZxid = newLeaderProposal.packet.getZxid();if(zxid != currentZxid){
            LOG.error("NEWLEADER ACK from sid: {} is from a different epoch - current 0x{} received 0x{}",
                      sid,Long.toHexString(currentZxid),Long.toHexString(zxid));return;}// Note that addAck already checks that the learner is a PARTICIPANT.
        newLeaderProposal.addAck(sid);if(newLeaderProposal.hasAllQuorums()){
            quorumFormed =true;
            newLeaderProposal.qvAcksetPairs.notifyAll();}else{long start =Time.currentElapsedTime();long cur = start;long end = start + self.getInitLimit()* self.getTickTime();while(!quorumFormed && cur < end){
                newLeaderProposal.qvAcksetPairs.wait(end - cur);
                cur =Time.currentElapsedTime();}if(!quorumFormed){thrownewInterruptedException("Timeout while waiting for NEWLEADER to be acked by quorum");}}}}

LearnerCnxAcceptor类

启动LearnerCnxAcceptor线程:

// Start thread that waits for connection requests from new followers.
// (Excerpt from lead(): LearnerCnxAcceptor accepts learner connections on serverSockets.)
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();

LearnerCnxAcceptor类:

publicvoidrun(){if(!stop.get()&&!serverSockets.isEmpty()){ExecutorService executor =Executors.newFixedThreadPool(serverSockets.size());CountDownLatch latch =newCountDownLatch(serverSockets.size());// 启动LearnerCnxAcceptorHandler
        serverSockets.forEach(serverSocket ->
                executor.submit(newLearnerCnxAcceptorHandler(serverSocket, latch)));try{
            latch.await();}catch(InterruptedException ie){}finally{// 关闭连接、线程池}}}

LearnerCnxAcceptorHandler类启动监听,接受连接:

classLearnerCnxAcceptorHandlerimplementsRunnable{privateServerSocket serverSocket;privateCountDownLatch latch;LearnerCnxAcceptorHandler(ServerSocket serverSocket,CountDownLatch latch){this.serverSocket = serverSocket;this.latch = latch;}@Overridepublicvoidrun(){try{while(!stop.get()){acceptConnections();// 接受连接}}catch(Exception e){// 关闭}finally{
            latch.countDown();// countdown到0会唤醒LearnerCnxAcceptor}}privatevoidacceptConnections()throwsIOException{Socket socket =null;boolean error =false;try{
            socket = serverSocket.accept();// 接受客户端连接
            socket.setSoTimeout(self.tickTime * self.initLimit);// timeout
            socket.setTcpNoDelay(nodelay);BufferedInputStream is =newBufferedInputStream(socket.getInputStream());// 封装LearnerHandler对象,与客户端通信LearnerHandler fh =newLearnerHandler(socket, is,Leader.this);
            fh.start();}catch(Exception e){// 略}finally{// 略}}}

LearnerHandler

负责与单个learner(follower或observer)通信:LearnerCnxAcceptorHandler每接受一个learner连接,就创建并启动一个LearnerHandler。

关键字段

// Socket connected to the learner.
protected final Socket sock;
// The Leader (as a LearnerMaster).
final LearnerMaster learnerMaster;
// Unique id of the learner (its sid).
protected long sid = 0;
// Queue of packets waiting to be sent to the learner.
final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<>();
// zxid (presumably the learner's last known zxid; -1 until set — TODO confirm)
protected volatile long lastZxid = -1;
// Archives and buffered streams used to read from / write to the learner.
private BinaryInputArchive ia;
private BinaryOutputArchive oa;
private final BufferedInputStream bufferedInput;
private BufferedOutputStream bufferedOutput;
// Learner type: PARTICIPANT or OBSERVER.
private LearnerType learnerType = LearnerType.PARTICIPANT;

run方法

  1. 接收Leader.FOLLOWERINFO或Leader.OBSERVERINFO数据包,解析type、sid等关键字段,计算newEpoch和newLeaderZxid
  2. 发送Leader.LEADERINFO数据包,包含newLeaderZxid值
  3. 读取Leader.ACKEPOCH数据包,解析对端的epoch、zxid
  4. 根据对端zxid判断是否需要同步数据、如何同步数据(txnlog/committedlog/snapshot):
     peerLastZxid = ss.getLastZxid(); // the peer's last processed zxid
     // Sync via txnlog or committedLog; returns true when a SNAP (full snapshot) sync is needed instead.
     boolean needSnap = syncFollower(peerLastZxid, learnerMaster);
     syncFollower比对maxCommittedLog、minCommittedLog与peerLastZxid,决定使用txnlog/committedlog同步还是SNAP同步;committedlog在内存中,性能更好。
  5. 同步txnlog和committedlog数据(syncFollower内部逻辑):
     if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) {
         // The peer's lastZxid lies between minCommittedLog and maxCommittedLog:
         // sync directly from committedLog.
         Iterator<Proposal> itr = db.getCommittedLog().iterator();
         currentZxid = queueCommittedProposals(itr, peerLastZxid, null, maxCommittedLog);
         needSnap = false;
     } else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
         // Sync using txnlog followed by committedLog.
         // Default limit: "size of the latest snapshot file in bytes * 0.33".
         long sizeLimit = db.calculateTxnLogSizeLimit();
         // Read proposals from txnlog; if they exceed sizeLimit bytes an empty iterator is
         // returned, which forces a SNAP sync.
         Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(peerLastZxid, sizeLimit);
         if (txnLogItr.hasNext()) {
             // Sync from txnlog.
             currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid, minCommittedLog, maxCommittedLog);
             // If the txnlog sync did not reach minCommittedLog, txnlog and committedLog have a gap;
             // fall back to a SNAP sync.
             if (currentZxid < minCommittedLog) {
                 currentZxid = peerLastZxid;
                 // Clear out currently queued requests and revert to sending a snapshot
                 queuedPackets.clear();
                 needOpPacket = true;
             } else {
                 // Continue from committedLog.
                 Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();
                 currentZxid = queueCommittedProposals(committedLogItr, currentZxid, null, maxCommittedLog);
                 needSnap = false;
             }
         }
         // omitted
     }
  6. 启动转发功能:
     // Start forwarding
     leaderLastZxid = learnerMaster.startForwarding(this, currentZxid);
     startForwarding会把toBeApplied数据(待commit状态)发出去,并把当前LearnerHandler添加到forwardingFollowers/observingLearners集。
  7. 如果needSnap为true,则发送SNAP请求,让learner从输入流读取并加载dataTree:
     long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();
     // Send the SNAP packet.
     oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
     messageTracker.trackSent(Leader.SNAP);
     bufferedOutput.flush();
     // Serialize the dataTree and send it to the learner.
     learnerMaster.getZKDatabase().serializeSnapshot(oa);
     oa.writeString("BenWasHere", "signature");
     bufferedOutput.flush();
  8. 发送NEWLEADER请求(旧协议版本<0x10000时直接写出;新版本放入queuedPackets,由发送线程发出):
     if (getVersion() < 0x10000) {
         QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null);
         oa.writeRecord(newLeaderQP, "packet");
     } else {
         QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, learnerMaster.getQuorumVerifierBytes(), null);
         queuedPackets.add(newLeaderQP);
     }
  9. 启动sendPackets线程:从queuedPackets取消息发给learner节点
  10. 等待NEWLEADER ACK响应
// Read the learner's reply to NEWLEADER; anything other than an ACK makes this
// handler return without waiting for the quorum.
qp = new QuorumPacket();
ia.readRecord(qp, "packet");

messageTracker.trackReceived(qp.getType());
if (qp.getType() != Leader.ACK) {
    return;
}

learnerMaster.waitForNewLeaderAck(getSid(), qp.getZxid());
  11. 等待ZooKeeperServer启动完成
  12. 发送UPTODATE请求,告知learner已处于最新状态,可以开始响应客户端:
      queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
  13. 进入while循环与learner保持通信,处理ACK、PING、REVALIDATE、REQUEST等请求

Follower

包含了follower的逻辑。

followLeader方法

the main method called by the follower to follow the leader.

voidfollowLeader()throwsInterruptedException{
    self.end_fle =Time.currentElapsedTime();long electionTimeTaken = self.end_fle - self.start_fle;
    self.setElectionTimeTaken(electionTimeTaken);ServerMetrics.getMetrics().ELECTION_TIME.add(electionTimeTaken);
    self.start_fle =0;
    self.end_fle =0;
    fzk.registerJMX(newFollowerBean(this, zk), self.jmxLocalPeerBean);long connectionTime =0;boolean completedSync =false;try{
        self.setZabState(QuorumPeer.ZabState.DISCOVERY);// 查找leader服务器QuorumServer leaderServer =findLeader();try{// 连接leader服务器connectToLeader(leaderServer.addr, leaderServer.hostname);
            connectionTime =System.currentTimeMillis();// 获取事务idlong newEpochZxid =registerWithLeader(Leader.FOLLOWERINFO);if(self.isReconfigStateChange()){thrownewException("learned about role change");}// zxid >> 32L得到epochlong newEpoch =ZxidUtils.getEpochFromZxid(newEpochZxid);if(newEpoch < self.getAcceptedEpoch()){thrownewIOException("Error: Epoch of leader is lower");}long startTime =Time.currentElapsedTime();
            self.setLeaderAddressAndId(leaderServer.addr, leaderServer.getId());
            self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);// 与leader同步数据syncWithLeader(newEpochZxid);
            self.setZabState(QuorumPeer.ZabState.BROADCAST);
            completedSync =true;long syncTime =Time.currentElapsedTime()- startTime;ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime);if(self.getObserverMasterPort()>0){// 创建ObserverMaster用来链式复制,此处不做分析
                om =newObserverMaster(self, fzk, self.getObserverMasterPort());
                om.start();}else{
                om =null;}// 保持通信QuorumPacket qp =newQuorumPacket();while(this.isRunning()){readPacket(qp);processPacket(qp);// 处理leader的数据包}}catch(Exception e){// ...}}finally{// ...}}

connectToLeader方法

protectedvoidconnectToLeader(MultipleAddresses multiAddr,String hostname)throwsIOException{this.leaderAddr = multiAddr;Set<InetSocketAddress> addresses;if(self.isMultiAddressReachabilityCheckEnabled()){
        addresses = multiAddr.getAllReachableAddressesOrAll();}else{
        addresses = multiAddr.getAllAddresses();}ExecutorService executor =Executors.newFixedThreadPool(addresses.size());CountDownLatch latch =newCountDownLatch(addresses.size());AtomicReference<Socket> socket =newAtomicReference<>(null);// 使用LeaderConnector异步建立连接,此处考虑到了多地址的情况
    addresses.stream().map(address ->newLeaderConnector(address, socket, latch)).forEach(executor::submit);try{
        latch.await();}catch(InterruptedException e){}finally{// 关闭executor}if(socket.get()==null){thrownewIOException("Failed connect to "+ multiAddr);}else{
        sock = socket.get();
        sockBeingClosed.set(false);}// 认证 略
    self.authLearner.authenticate(sock, hostname);// 获取输入输出流
    leaderIs =BinaryInputArchive.getArchive(newBufferedInputStream(sock.getInputStream()));
    bufferedOutput =newBufferedOutputStream(sock.getOutputStream());
    leaderOs =BinaryOutputArchive.getArchive(bufferedOutput);// 启动发送线程,基于BlockingQueue的生产者消费者模式if(asyncSending){startSendingThread();}}

registerWithLeader方法

protectedlongregisterWithLeader(int pktType)throwsIOException{// 1. 先发送一个Leader.FOLLOWERINFO类型数据包:// Leader.FOLLOWERINFO, zxid, sid, protocolVersion, quorumVersionlong lastLoggedZxid = self.getLastLoggedZxid();QuorumPacket qp =newQuorumPacket();
    qp.setType(pktType);
    qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(),0));LearnerInfo li =newLearnerInfo(self.getMyId(),0x10000, self.getQuorumVerifier().getVersion());ByteArrayOutputStream bsid =newByteArrayOutputStream();BinaryOutputArchive boa =BinaryOutputArchive.getArchive(bsid);
    boa.writeRecord(li,"LearnerInfo");
    qp.setData(bsid.toByteArray());writePacket(qp,true);// 把数据包写出去// 2. 读取leader的Leader.LEADERINFO数据包readPacket(qp);finallong newEpoch =ZxidUtils.getEpochFromZxid(qp.getZxid());// 解析newEpochif(qp.getType()==Leader.LEADERINFO){// 使用1.0版本协议
        leaderProtocolVersion =ByteBuffer.wrap(qp.getData()).getInt();byte[] epochBytes =newbyte[4];finalByteBuffer wrappedEpochBytes =ByteBuffer.wrap(epochBytes);if(newEpoch > self.getAcceptedEpoch()){
            wrappedEpochBytes.putInt((int) self.getCurrentEpoch());
            self.setAcceptedEpoch(newEpoch);// 设置acceptEpoch}elseif(newEpoch == self.getAcceptedEpoch()){
            wrappedEpochBytes.putInt(-1);}else{thrownewIOException("...");}// 3. 发送ACKEPOCH类型数据包:// 包含self.lastLoggedZxid和self.currentEpochQuorumPacket ackNewEpoch =newQuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes,null);writePacket(ackNewEpoch,true);returnZxidUtils.makeZxid(newEpoch,0);}else{// 低版本分支,略}}

syncWithLeader方法

  1. 读取leader发来的同步方式数据包:
     - DIFF - 差异同步:follower无需加载快照,leader随后只发送follower缺失的提案(可能为空)
     - SNAP - 将leader输入流(leader的dataTree快照数据)反序列化到zkDb:
       zk.getZKDatabase().deserializeSnapshot(leaderIs);
       zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
     - TRUNC - 将数据truncate到指定位置:
       boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
       zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
  2. 继续读leader数据包,leader可能使用txnlog或committedlog同步数据
  3. 同步数据并提交:
     - PROPOSAL - 提案数据会放入packetsNotCommitted集待处理
     - COMMIT/COMMITANDACTIVATE - 提交数据会放入packetsCommitted集待处理
     - INFORM/INFORMANDACTIVATE - 同上
     - NEWLEADER - leader已经停止发送同步数据,follower会takeSnapshot、setCurrentEpoch、将packetsNotCommitted中的提案通过logRequest交给zk记录(持久化),然后响应ACK:
       // fzk.logRequest(p.hdr, p.rec, p.digest);
       public void logRequest(TxnHeader hdr, Record txn, TxnDigest digest) {
           Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
           request.setTxnDigest(digest);
           if ((request.zxid & 0xffffffffL) != 0) {
               pendingTxns.add(request); // set of pending transactions
           }
           syncProcessor.processRequest(request); // persist to disk
       }
     - UPTODATE - leader会等待足够的follower响应ACK并且确定各种组件已启动之后,发送一个UPTODATE数据包,表示follower已经处于同步状态,停止同步,跳出循环
  4. 处理packetsNotCommitted和packetsCommitted集,写磁盘或提交事务:
     FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
     for (PacketInFlight p : packetsNotCommitted) {
         fzk.logRequest(p.hdr, p.rec, p.digest);
     }
     for (Long zxid : packetsCommitted) {
         fzk.commit(zxid);
     }
     // Requests are handled by the RequestProcessor chain (covered in detail later).
  5. Observer会执行下面的代码处理:
     ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
     for (PacketInFlight p : packetsNotCommitted) {
         Long zxid = packetsCommitted.peekFirst();
         if (p.hdr.getZxid() != zxid) {
             // log warning message if there is no matching commit
             // old leader send outstanding proposal to observer
             continue;
         }
         packetsCommitted.remove();
         Request request = new Request(p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), p.hdr, p.rec, -1);
         request.setTxnDigest(p.digest);
         ozk.commitRequest(request);
     }

processPacket方法

在连接建立、数据处于同步状态后,follower会阻塞读取来自leader的数据包,之后使用processPacket方法处理:

// create a reusable packet to reduce gc impactQuorumPacket qp =newQuorumPacket();while(this.isRunning()){readPacket(qp);processPacket(qp);}

processPacket方法:

protectedvoidprocessPacket(QuorumPacket qp)throwsException{switch(qp.getType()){caseLeader.PING:ping(qp);break;caseLeader.PROPOSAL:ServerMetrics.getMetrics().LEARNER_PROPOSAL_RECEIVED_COUNT.add(1);TxnLogEntry logEntry =SerializeUtils.deserializeTxn(qp.getData());TxnHeader hdr = logEntry.getHeader();Record txn = logEntry.getTxn();TxnDigest digest = logEntry.getDigest();if(hdr.getZxid()!= lastQueued +1){
            LOG.warn("Got zxid 0x{} expected 0x{}",Long.toHexString(hdr.getZxid()),Long.toHexString(lastQueued +1));}
        lastQueued = hdr.getZxid();if(hdr.getType()==OpCode.reconfig){SetDataTxn setDataTxn =(SetDataTxn) txn;QuorumVerifier qv = self.configFromString(newString(setDataTxn.getData(), UTF_8));
            self.setLastSeenQuorumVerifier(qv,true);}// 封装Request使用syncProcessor.processRequest(request)写磁盘
        fzk.logRequest(hdr, txn, digest);// 略break;caseLeader.COMMIT:ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);// 使用commitProcessor.commit(request)提交请求
        fzk.commit(qp.getZxid());// 略break;caseLeader.COMMITANDACTIVATE:// get the new configuration from the requestRequest request = fzk.pendingTxns.element();SetDataTxn setDataTxn =(SetDataTxn) request.getTxn();QuorumVerifier qv = self.configFromString(newString(setDataTxn.getData(), UTF_8));// get new designated leader from (current) leader's messageByteBuffer buffer =ByteBuffer.wrap(qp.getData());long suggestedLeaderId = buffer.getLong();finallong zxid = qp.getZxid();boolean majorChange = self.processReconfig(qv, suggestedLeaderId, zxid,true);// commit (writes the new config to ZK tree (/zookeeper/config)
        fzk.commit(zxid);// 略break;caseLeader.UPTODATE:// 正常情况下主从复制数据不应该出现这种类型数据包break;caseLeader.REVALIDATE:if(om ==null||!om.revalidateLearnerSession(qp)){revalidate(qp);}break;caseLeader.SYNC:
        fzk.sync();break;default:
        LOG.warn("Unknown packet type: {}",LearnerHandler.packetToString(qp));break;}}

Observer

observeLeader方法

voidobserveLeader()throwsException{
    zk.registerJMX(newObserverBean(this, zk), self.jmxLocalPeerBean);long connectTime =0;boolean completedSync =false;try{
        self.setZabState(QuorumPeer.ZabState.DISCOVERY);// 获取leader或一个observerMaster服务器QuorumServer master =findLearnerMaster();try{// 连接leader或observerMasterconnectToLeader(master.addr, master.hostname);
            connectTime =System.currentTimeMillis();long newLeaderZxid =registerWithLeader(Leader.OBSERVERINFO);if(self.isReconfigStateChange()){thrownewException("learned about role change");}finallong startTime =Time.currentElapsedTime();
            self.setLeaderAddressAndId(master.addr, master.getId());
            self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);syncWithLeader(newLeaderZxid);
            self.setZabState(QuorumPeer.ZabState.BROADCAST);
            completedSync =true;finallong syncTime =Time.currentElapsedTime()- startTime;ServerMetrics.getMetrics().OBSERVER_SYNC_TIME.add(syncTime);QuorumPacket qp =newQuorumPacket();while(this.isRunning()&& nextLearnerMaster.get()==null){readPacket(qp);processPacket(qp);}}catch(Exception e){closeSocket();// clear pending revalidations
            pendingRevalidations.clear();}}finally{
        currentLearnerMaster =null;
        zk.unregisterJMX(this);if(connectTime !=0){long connectionDuration =System.currentTimeMillis()- connectTime;
            messageTracker.dumpToLog(leaderAddr.toString());}}}

processPacket方法

protectedvoidprocessPacket(QuorumPacket qp)throwsException{TxnLogEntry logEntry;TxnHeader hdr;TxnDigest digest;Record txn;switch(qp.getType()){caseLeader.PING:ping(qp);break;caseLeader.PROPOSAL:
        LOG.warn("Ignoring proposal");break;caseLeader.COMMIT:
        LOG.warn("Ignoring commit");break;caseLeader.UPTODATE:
        LOG.error("Received an UPTODATE message after Observer started");break;caseLeader.REVALIDATE:revalidate(qp);break;caseLeader.SYNC:((ObserverZooKeeperServer) zk).sync();break;caseLeader.INFORM:ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
        logEntry =SerializeUtils.deserializeTxn(qp.getData());
        hdr = logEntry.getHeader();
        txn = logEntry.getTxn();
        digest = logEntry.getDigest();Request request =newRequest(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn,0);
        request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY);
        request.setTxnDigest(digest);ObserverZooKeeperServer obs =(ObserverZooKeeperServer) zk;
        obs.commitRequest(request);// 提交break;caseLeader.INFORMANDACTIVATE:// reconfig功能使用break;default:
        LOG.warn("Unknown packet type: {}",LearnerHandler.packetToString(qp));break;}}

Leader与Follower通信总结

Leader                                                                           Follower
                     FOLLOWERINFO/OBSERVERINFO数据包发送acceptEpoch
         <-------------------------------------------------------------------
                             leader计算newEpoch、newZxid

                             LEADERINFO数据包发送最新的zxid
         ------------------------------------------------------------------->
                               follower接受newEpoch

                     ACKEPOCH数据包发送lastLoggedZxid、currentEpoch
         <------------------------------------------------------------------
                                leader确定数据同步方式

                           DIFF/TRUNC/SNAP或者同步数据(loop)
         ------------------------------------------------------------------->

                                    NEWLEADER数据包
         ------------------------------------------------------------------->

                                       ACK数据包
         <------------------------------------------------------------------

                                    UPTODATE数据包
         ------------------------------------------------------------------->

                                    PROPOSAL数据包
         ------------------------------------------------------------------->

                                       ACK数据包
         <------------------------------------------------------------------

                                     COMMIT数据包
         ------------------------------------------------------------------->

                        follower提交事务(COMMIT不需要回复ACK)

本文转载自: https://blog.csdn.net/xuguofeng2016/article/details/135930571
版权归原作者 xuguofeng2016 所有, 如有侵权,请联系我们删除。

“zookeeper源码(07)leader、follower和observer”的评论:

还没有评论