Leader
构造方法
publicLeader(QuorumPeer self,LeaderZooKeeperServer zk)throwsIOException{this.self = self;this.proposalStats =newBufferStats();// 获取节点间通信地址Set<InetSocketAddress> addresses;if(self.getQuorumListenOnAllIPs()){
addresses = self.getQuorumAddress().getWildcardAddresses();}else{
addresses = self.getQuorumAddress().getAllAddresses();}// 创建ServerSocket并bind地址,add到serverSockets集,启动LearnerCnxAcceptor时使用
addresses.stream().map(address ->createServerSocket(address, self.shouldUsePortUnification(), self.isSslQuorum())).filter(Optional::isPresent).map(Optional::get).forEach(serverSockets::add);this.zk = zk;}
lead方法
QuorumPeer使用lead方法启动leader节点,从lead方法入手分析leader流程并分析重要的方法:
voidlead()throwsIOException,InterruptedException{
self.end_fle =Time.currentElapsedTime();long electionTimeTaken = self.end_fle - self.start_fle;
self.setElectionTimeTaken(electionTimeTaken);ServerMetrics.getMetrics().ELECTION_TIME.add(electionTimeTaken);
self.start_fle =0;
self.end_fle =0;
zk.registerJMX(newLeaderBean(this, zk), self.jmxLocalPeerBean);try{
self.setZabState(QuorumPeer.ZabState.DISCOVERY);
self.tick.set(0);// 使用ZooKeeperServer的loadData方法加载db数据// 加载数据、清理session、生成快照(takeSnapshot)
zk.loadData();
leaderStateSummary =newStateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());// 启动线程接收Learner连接,创建LearnerHandler与客户端通信
cnxAcceptor =newLearnerCnxAcceptor();
cnxAcceptor.start();// 获取上一次同步最终epoch并计算本次的epoch和zxidlong epoch =getEpochToPropose(self.getMyId(), self.getAcceptedEpoch());// 设置新的zxid
zk.setZxid(ZxidUtils.makeZxid(epoch,0));synchronized(this){
lastProposed = zk.getZxid();}
newLeaderProposal.packet =newQuorumPacket(NEWLEADER, zk.getZxid(),null,null);QuorumVerifier lastSeenQV = self.getLastSeenQuorumVerifier();QuorumVerifier curQV = self.getQuorumVerifier();if(curQV.getVersion()==0&& curQV.getVersion()== lastSeenQV.getVersion()){// qv.version == 0try{QuorumVerifier newQV = self.configFromString(curQV.toString());
newQV.setVersion(zk.getZxid());
self.setLastSeenQuorumVerifier(newQV,true);}catch(Exception e){thrownewIOException(e);}}
newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());if(self.getLastSeenQuorumVerifier().getVersion()> self.getQuorumVerifier().getVersion()){
newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());}// 等待足够数量的ACKEPOCH数据包,表示follower确认newEpoch// follower会把lastLoggedZxid、currentEpoch发送过来,leader用这些参数判断同步数据的方式waitForEpochAck(self.getMyId(), leaderStateSummary);
self.setCurrentEpoch(epoch);// 设置新的currentEpoch
self.setLeaderAddressAndId(self.getQuorumAddress(), self.getMyId());
self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);try{// 等待follower的newLeaderAckwaitForNewLeaderAck(self.getMyId(), zk.getZxid());}catch(InterruptedException e){// 略return;}// 启动zookeeperServerstartZkServer();
self.setZabState(QuorumPeer.ZabState.BROADCAST);
self.adminServer.setZooKeeperServer(zk);// We ping twice a tick, so we only update the tick every other iterationboolean tickSkip =true;String shutdownMessage =null;while(true){synchronized(this){long start =Time.currentElapsedTime();long cur = start;long end = start + self.tickTime /2;// 等待tickTime / 2毫秒while(cur < end){wait(end - cur);
cur =Time.currentElapsedTime();}if(!tickSkip){
self.tick.incrementAndGet();}// 用来判断learner同步状态SyncedLearnerTracker syncedAckSet =newSyncedLearnerTracker();
syncedAckSet.addQuorumVerifier(self.getQuorumVerifier());if(self.getLastSeenQuorumVerifier()!=null&&
self.getLastSeenQuorumVerifier().getVersion()> self.getQuorumVerifier().getVersion()){
syncedAckSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());}
syncedAckSet.addAck(self.getMyId());// 查询learner的ack状态for(LearnerHandler f :getLearners()){if(f.synced()){
syncedAckSet.addAck(f.getSid());}}if(!this.isRunning()){// shutdownbreak;}// 判断超半数learner已是同步状态// 1个tickTime周期判断一次if(!tickSkip &&!syncedAckSet.hasAllQuorums()&&!(self.getQuorumVerifier().overrideQuorumDecision(getForwardingFollowers())&&
self.getQuorumVerifier().revalidateOutstandingProp(this,newArrayList<>(outstandingProposals.values()), lastCommitted))){// Lost quorum of last committed and/or last proposed
shutdownMessage ="Not sufficient followers synced";break;}
tickSkip =!tickSkip;}// ping learner// 1个tickTime周期ping两次for(LearnerHandler f :getLearners()){
f.ping();}}if(shutdownMessage !=null){// leader goes in looking stateshutdown(shutdownMessage);}}finally{
zk.unregisterJMX(this);}}
getEpochToPropose方法
获取上一次同步的最终epoch并计算zxid的值:
publiclonggetEpochToPropose(long sid,long lastAcceptedEpoch)throwsInterruptedException,IOException{synchronized(connectingFollowers){if(!waitingForNewEpoch){return epoch;}if(lastAcceptedEpoch >= epoch){
epoch = lastAcceptedEpoch +1;// 更新最新epoch}if(isParticipant(sid)){
connectingFollowers.add(sid);}QuorumVerifier verifier = self.getQuorumVerifier();// 连接的follower超过了半数if(connectingFollowers.contains(self.getMyId())&& verifier.containsQuorum(connectingFollowers)){
waitingForNewEpoch =false;
self.setAcceptedEpoch(epoch);// 设置新的epoch
connectingFollowers.notifyAll();}else{long start =Time.currentElapsedTime();if(sid == self.getMyId()){
timeStartWaitForEpoch = start;}long cur = start;long end = start + self.getInitLimit()* self.getTickTime();// 等待initLimit*tickTime毫秒,如果还是waitingForNewEpoch状态抛错,会触发重新选举while(waitingForNewEpoch && cur < end &&!quitWaitForEpoch){
connectingFollowers.wait(end - cur);
cur =Time.currentElapsedTime();}if(waitingForNewEpoch){thrownewInterruptedException("Timeout while waiting for epoch from quorum");}}return epoch;}}
waitForEpochAck方法
等待足够数量的ACKEPOCH数据包,表示follower确认newEpoch,follower会把lastLoggedZxid、currentEpoch发送过来,leader用这些参数判断同步数据的方式:
publicvoidwaitForEpochAck(long id,StateSummary ss)throwsIOException,InterruptedException{synchronized(electingFollowers){if(electionFinished){return;}// 略QuorumVerifier verifier = self.getQuorumVerifier();if(electingFollowers.contains(self.getMyId())&& verifier.containsQuorum(electingFollowers)){
electionFinished =true;
electingFollowers.notifyAll();}else{long start =Time.currentElapsedTime();long cur = start;long end = start + self.getInitLimit()* self.getTickTime();while(!electionFinished && cur < end){
electingFollowers.wait(end - cur);
cur =Time.currentElapsedTime();}if(!electionFinished){thrownewInterruptedException("Timeout while waiting for epoch to be acked by quorum");}}}}
waitForNewLeaderAck方法
等待足够数量的Leader.ACK请求上来,之后才能开始正常通信:
publicvoidwaitForNewLeaderAck(long sid,long zxid)throwsInterruptedException{synchronized(newLeaderProposal.qvAcksetPairs){if(quorumFormed){return;}long currentZxid = newLeaderProposal.packet.getZxid();if(zxid != currentZxid){
LOG.error("NEWLEADER ACK from sid: {} is from a different epoch - current 0x{} received 0x{}",
sid,Long.toHexString(currentZxid),Long.toHexString(zxid));return;}// Note that addAck already checks that the learner is a PARTICIPANT.
newLeaderProposal.addAck(sid);if(newLeaderProposal.hasAllQuorums()){
quorumFormed =true;
newLeaderProposal.qvAcksetPairs.notifyAll();}else{long start =Time.currentElapsedTime();long cur = start;long end = start + self.getInitLimit()* self.getTickTime();while(!quorumFormed && cur < end){
newLeaderProposal.qvAcksetPairs.wait(end - cur);
cur =Time.currentElapsedTime();}if(!quorumFormed){thrownewInterruptedException("Timeout while waiting for NEWLEADER to be acked by quorum");}}}}
LearnerCnxAcceptor类
启动LearnerCnxAcceptor线程:
// Start thread that waits for connection requests from new followers.
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();
LearnerCnxAcceptor类:
publicvoidrun(){if(!stop.get()&&!serverSockets.isEmpty()){ExecutorService executor =Executors.newFixedThreadPool(serverSockets.size());CountDownLatch latch =newCountDownLatch(serverSockets.size());// 启动LearnerCnxAcceptorHandler
serverSockets.forEach(serverSocket ->
executor.submit(newLearnerCnxAcceptorHandler(serverSocket, latch)));try{
latch.await();}catch(InterruptedException ie){}finally{// 关闭连接、线程池}}}
LearnerCnxAcceptorHandler类启动监听,接受连接:
classLearnerCnxAcceptorHandlerimplementsRunnable{privateServerSocket serverSocket;privateCountDownLatch latch;LearnerCnxAcceptorHandler(ServerSocket serverSocket,CountDownLatch latch){this.serverSocket = serverSocket;this.latch = latch;}@Overridepublicvoidrun(){try{while(!stop.get()){acceptConnections();// 接受连接}}catch(Exception e){// 关闭}finally{
latch.countDown();// countdown到0会唤醒LearnerCnxAcceptor}}privatevoidacceptConnections()throwsIOException{Socket socket =null;boolean error =false;try{
socket = serverSocket.accept();// 接受客户端连接
socket.setSoTimeout(self.tickTime * self.initLimit);// timeout
socket.setTcpNoDelay(nodelay);BufferedInputStream is =newBufferedInputStream(socket.getInputStream());// 封装LearnerHandler对象,与客户端通信LearnerHandler fh =newLearnerHandler(socket, is,Leader.this);
fh.start();}catch(Exception e){// 略}finally{// 略}}}
LearnerHandler
负责与learner(follower/observer)节点通信:leader每接受一个learner连接就创建一个LearnerHandler。
关键字段
// socket connected to the learner
protected final Socket sock;
// the Leader object
final LearnerMaster learnerMaster;
// unique identifier of the learner
protected long sid = 0;
// outbound packet queue
final LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<>();
// zxid
protected volatile long lastZxid = -1;
// input / output streams
private BinaryInputArchive ia;
private BinaryOutputArchive oa;
private final BufferedInputStream bufferedInput;
private BufferedOutputStream bufferedOutput;
// learner type: PARTICIPANT / OBSERVER
private LearnerType learnerType = LearnerType.PARTICIPANT;
run方法
- 接收Leader.FOLLOWERINFO或Leader.OBSERVERINFO数据包,解析type、sid等关键字段,计算newEpoch和newLeaderZxid
- 发送Leader.LEADERINFO数据包,包含newLeaderZxid值
- 读取Leader.ACKEPOCH数据包,解析对端的epoch、zxid
- 根据对端zxid判断是否需要同步数据、如何同步数据(txnlog/committedlog/snapshot)
peerLastZxid = ss.getLastZxid();// 对端最新processZxid// 同步txnlog或committedlog数据,或者返回true使用SNAP方式同步快照数据boolean needSnap =syncFollower(peerLastZxid, learnerMaster);// 比对maxCommittedLog、minCommittedLog与peerLastZxid同步txnlog和committedlog数据或者使用SNAP同步数据// committedlog在内存里面,性能更好
- 同步txnlog和committedlog数据
if((maxCommittedLog >= peerLastZxid)&&(minCommittedLog <= peerLastZxid)){// 对端lastZxid在minCommittedLog和maxCommittedLog之间// 直接使用committedlog同步Iterator<Proposal> itr = db.getCommittedLog().iterator(); currentZxid =queueCommittedProposals(itr, peerLastZxid,null, maxCommittedLog); needSnap =false;}elseif(peerLastZxid < minCommittedLog && txnLogSyncEnabled){// 使用txnlog和committedLog同步// 默认"最新snapshot文件字节数 * 0.33"long sizeLimit = db.calculateTxnLogSizeLimit();// 从txnlog查找数据,当数据字节数大于sizeLimit将返回空集,强制使用SNAP同步Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(peerLastZxid, sizeLimit);if(txnLogItr.hasNext()){// 使用txnlog同步 currentZxid =queueCommittedProposals(txnLogItr, peerLastZxid, minCommittedLog, maxCommittedLog);// txnlog同步未达到minCommittedLog表示txnlog和committedLog数据存在缺失// 将强制使用SNAP同步if(currentZxid < minCommittedLog){ currentZxid = peerLastZxid;// Clear out currently queued requests and revert to sending a snapshot queuedPackets.clear(); needOpPacket =true;}else{// 使用committedlog同步Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator(); currentZxid =queueCommittedProposals(committedLogItr, currentZxid,null, maxCommittedLog); needSnap =false;}}// 略}
- 启动转发功能
// Start forwardingleaderLastZxid = learnerMaster.startForwarding(this, currentZxid);// 把toBeApplied数据(待commit状态)发出去// 添加到forwardingFollowers/observingLearners集
- 如果needSnap为true则需要发送SNAP请求让learner读取输入流加载dataTree
long zxidToSend = learnerMaster.getZKDatabase().getDataTreeLastProcessedZxid();// 发送SNAP请求oa.writeRecord(newQuorumPacket(Leader.SNAP, zxidToSend,null,null),"packet");messageTracker.trackSent(Leader.SNAP);bufferedOutput.flush();// 将dataTree序列化发给learnerlearnerMaster.getZKDatabase().serializeSnapshot(oa);oa.writeString("BenWasHere","signature");bufferedOutput.flush();
- 发送NEWLEADER请求
if (getVersion() < 0x10000) {
    // Older protocol: write NEWLEADER directly, without quorum-verifier bytes.
    QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, null, null);
    oa.writeRecord(newLeaderQP, "packet");
} else {
    // Newer protocol: NEWLEADER carries the quorum verifier and goes through the send queue.
    QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, learnerMaster.getQuorumVerifierBytes(), null);
    queuedPackets.add(newLeaderQP);
}
- 启动sendPackets线程:从queuedPackets取消息发给learner节点
- 等待NEWLEADER ACK响应
qp = new QuorumPacket();
ia.readRecord(qp, "packet");
messageTracker.trackReceived(qp.getType());
// Anything other than an ACK ends this handler.
if (qp.getType() != Leader.ACK) {
    return;
}
learnerMaster.waitForNewLeaderAck(getSid(), qp.getZxid());
- 等待zookeeperServer启动完成
- 发送UPTODATE请求,告知follower处于最新状态,并且可以开始响应客户端
// Tell the learner it is up to date and may start serving clients.
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
- 启动while循环与learner保持通信,处理ACK、PING、REVALIDATE、REQUEST等数据包
Follower
包含了follower的逻辑。
followLeader方法
follower跟随leader的主方法(The main method called by the follower to follow the leader.)
voidfollowLeader()throwsInterruptedException{
self.end_fle =Time.currentElapsedTime();long electionTimeTaken = self.end_fle - self.start_fle;
self.setElectionTimeTaken(electionTimeTaken);ServerMetrics.getMetrics().ELECTION_TIME.add(electionTimeTaken);
self.start_fle =0;
self.end_fle =0;
fzk.registerJMX(newFollowerBean(this, zk), self.jmxLocalPeerBean);long connectionTime =0;boolean completedSync =false;try{
self.setZabState(QuorumPeer.ZabState.DISCOVERY);// 查找leader服务器QuorumServer leaderServer =findLeader();try{// 连接leader服务器connectToLeader(leaderServer.addr, leaderServer.hostname);
connectionTime =System.currentTimeMillis();// 获取事务idlong newEpochZxid =registerWithLeader(Leader.FOLLOWERINFO);if(self.isReconfigStateChange()){thrownewException("learned about role change");}// zxid >> 32L得到epochlong newEpoch =ZxidUtils.getEpochFromZxid(newEpochZxid);if(newEpoch < self.getAcceptedEpoch()){thrownewIOException("Error: Epoch of leader is lower");}long startTime =Time.currentElapsedTime();
self.setLeaderAddressAndId(leaderServer.addr, leaderServer.getId());
self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);// 与leader同步数据syncWithLeader(newEpochZxid);
self.setZabState(QuorumPeer.ZabState.BROADCAST);
completedSync =true;long syncTime =Time.currentElapsedTime()- startTime;ServerMetrics.getMetrics().FOLLOWER_SYNC_TIME.add(syncTime);if(self.getObserverMasterPort()>0){// 创建ObserverMaster用来链式复制,此处不做分析
om =newObserverMaster(self, fzk, self.getObserverMasterPort());
om.start();}else{
om =null;}// 保持通信QuorumPacket qp =newQuorumPacket();while(this.isRunning()){readPacket(qp);processPacket(qp);// 处理leader的数据包}}catch(Exception e){// ...}}finally{// ...}}
connectToLeader方法
protectedvoidconnectToLeader(MultipleAddresses multiAddr,String hostname)throwsIOException{this.leaderAddr = multiAddr;Set<InetSocketAddress> addresses;if(self.isMultiAddressReachabilityCheckEnabled()){
addresses = multiAddr.getAllReachableAddressesOrAll();}else{
addresses = multiAddr.getAllAddresses();}ExecutorService executor =Executors.newFixedThreadPool(addresses.size());CountDownLatch latch =newCountDownLatch(addresses.size());AtomicReference<Socket> socket =newAtomicReference<>(null);// 使用LeaderConnector异步建立连接,此处考虑到了多地址的情况
addresses.stream().map(address ->newLeaderConnector(address, socket, latch)).forEach(executor::submit);try{
latch.await();}catch(InterruptedException e){}finally{// 关闭executor}if(socket.get()==null){thrownewIOException("Failed connect to "+ multiAddr);}else{
sock = socket.get();
sockBeingClosed.set(false);}// 认证 略
self.authLearner.authenticate(sock, hostname);// 获取输入输出流
leaderIs =BinaryInputArchive.getArchive(newBufferedInputStream(sock.getInputStream()));
bufferedOutput =newBufferedOutputStream(sock.getOutputStream());
leaderOs =BinaryOutputArchive.getArchive(bufferedOutput);// 启动发送线程,基于BlockingQueue的生产者消费者模式if(asyncSending){startSendingThread();}}
registerWithLeader方法
protectedlongregisterWithLeader(int pktType)throwsIOException{// 1. 先发送一个Leader.FOLLOWERINFO类型数据包:// Leader.FOLLOWERINFO, zxid, sid, protocolVersion, quorumVersionlong lastLoggedZxid = self.getLastLoggedZxid();QuorumPacket qp =newQuorumPacket();
qp.setType(pktType);
qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(),0));LearnerInfo li =newLearnerInfo(self.getMyId(),0x10000, self.getQuorumVerifier().getVersion());ByteArrayOutputStream bsid =newByteArrayOutputStream();BinaryOutputArchive boa =BinaryOutputArchive.getArchive(bsid);
boa.writeRecord(li,"LearnerInfo");
qp.setData(bsid.toByteArray());writePacket(qp,true);// 把数据包写出去// 2. 读取leader的Leader.LEADERINFO数据包readPacket(qp);finallong newEpoch =ZxidUtils.getEpochFromZxid(qp.getZxid());// 解析newEpochif(qp.getType()==Leader.LEADERINFO){// 使用1.0版本协议
leaderProtocolVersion =ByteBuffer.wrap(qp.getData()).getInt();byte[] epochBytes =newbyte[4];finalByteBuffer wrappedEpochBytes =ByteBuffer.wrap(epochBytes);if(newEpoch > self.getAcceptedEpoch()){
wrappedEpochBytes.putInt((int) self.getCurrentEpoch());
self.setAcceptedEpoch(newEpoch);// 设置acceptEpoch}elseif(newEpoch == self.getAcceptedEpoch()){
wrappedEpochBytes.putInt(-1);}else{thrownewIOException("...");}// 3. 发送ACKEPOCH类型数据包:// 包含self.lastLoggedZxid和self.currentEpochQuorumPacket ackNewEpoch =newQuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes,null);writePacket(ackNewEpoch,true);returnZxidUtils.makeZxid(newEpoch,0);}else{// 低版本分支,略}}
syncWithLeader方法
- 读取leader数据包:
    - DIFF - 表示无需快照,leader将直接以增量方式同步新数据(后续的PROPOSAL/COMMIT等数据包)
    - SNAP - 将leader输入流(leader的dataTree快照数据)反序列化到zkDb:
// SNAP: rebuild the local database from the dataTree snapshot streamed by the leader,
// then record the SNAP packet's zxid as the last processed zxid.
zk.getZKDatabase().deserializeSnapshot(leaderIs);
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
    - TRUNC - 将数据truncate到指定位置:
boolean truncated = zk.getZKDatabase().truncateLog(qp.getZxid());
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
- 继续读leader数据包,leader可能使用txnlog或committedlog同步数据
- 同步数据并提交:
    - PROPOSAL - 提案数据会放入packetsNotCommitted集待处理
    - COMMIT/COMMITANDACTIVATE - 提交数据会放入packetsCommitted集待处理
    - INFORM/INFORMANDACTIVATE - 同上
    - NEWLEADER - leader已经停止同步数据,follower会takeSnapshot、setCurrentEpoch、将packetsNotCommitted中的数据都通过fzk.logRequest交给zk处理、响应ACK
// fzk.logRequest(p.hdr, p.rec, p.digest);publicvoidlogRequest(TxnHeader hdr,Record txn,TxnDigest digest){Request request =newRequest( hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid()); request.setTxnDigest(digest);if((request.zxid &0xffffffffL)!=0){ pendingTxns.add(request);// 待处理的事务集} syncProcessor.processRequest(request);// 持久化磁盘}
    - UPTODATE - leader会等待足够的follower响应ACK并且确定各种组件已启动之后,发送一个UPTODATE数据包,表示follower已经处于同步状态,停止同步,跳出循环
        - 处理packetsNotCommitted和packetsCommitted集,处理事务或写磁盘
FollowerZooKeeperServer fzk = (FollowerZooKeeperServer) zk;
// Log every proposal received during sync ...
for (PacketInFlight p : packetsNotCommitted) {
    fzk.logRequest(p.hdr, p.rec, p.digest);
}
// ... then commit the zxids the leader has already committed.
for (Long zxid : packetsCommitted) {
    fzk.commit(zxid);
}
// Requests are processed by the RequestProcessor chain (covered in a later article).
- Observer会执行下面代码处理
ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
for (PacketInFlight p : packetsNotCommitted) {
    Long zxid = packetsCommitted.peekFirst();
    // peekFirst() returns null once packetsCommitted is exhausted; comparing a long with a
    // null Long would unbox it and throw NullPointerException, so treat it as "no matching commit".
    if (zxid == null || p.hdr.getZxid() != zxid) {
        // log warning message if there is no matching commit
        // old leader send outstanding proposal to observer
        continue;
    }
    packetsCommitted.remove();
    Request request = new Request(p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), p.hdr, p.rec, -1);
    request.setTxnDigest(p.digest);
    ozk.commitRequest(request);
}
processPacket方法
在连接建立、数据处于同步状态后,follower会阻塞读取来自leader的数据包,之后使用processPacket方法处理:
// create a reusable packet to reduce gc impactQuorumPacket qp =newQuorumPacket();while(this.isRunning()){readPacket(qp);processPacket(qp);}
processPacket方法:
protectedvoidprocessPacket(QuorumPacket qp)throwsException{switch(qp.getType()){caseLeader.PING:ping(qp);break;caseLeader.PROPOSAL:ServerMetrics.getMetrics().LEARNER_PROPOSAL_RECEIVED_COUNT.add(1);TxnLogEntry logEntry =SerializeUtils.deserializeTxn(qp.getData());TxnHeader hdr = logEntry.getHeader();Record txn = logEntry.getTxn();TxnDigest digest = logEntry.getDigest();if(hdr.getZxid()!= lastQueued +1){
LOG.warn("Got zxid 0x{} expected 0x{}",Long.toHexString(hdr.getZxid()),Long.toHexString(lastQueued +1));}
lastQueued = hdr.getZxid();if(hdr.getType()==OpCode.reconfig){SetDataTxn setDataTxn =(SetDataTxn) txn;QuorumVerifier qv = self.configFromString(newString(setDataTxn.getData(), UTF_8));
self.setLastSeenQuorumVerifier(qv,true);}// 封装Request使用syncProcessor.processRequest(request)写磁盘
fzk.logRequest(hdr, txn, digest);// 略break;caseLeader.COMMIT:ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);// 使用commitProcessor.commit(request)提交请求
fzk.commit(qp.getZxid());// 略break;caseLeader.COMMITANDACTIVATE:// get the new configuration from the requestRequest request = fzk.pendingTxns.element();SetDataTxn setDataTxn =(SetDataTxn) request.getTxn();QuorumVerifier qv = self.configFromString(newString(setDataTxn.getData(), UTF_8));// get new designated leader from (current) leader's messageByteBuffer buffer =ByteBuffer.wrap(qp.getData());long suggestedLeaderId = buffer.getLong();finallong zxid = qp.getZxid();boolean majorChange = self.processReconfig(qv, suggestedLeaderId, zxid,true);// commit (writes the new config to ZK tree (/zookeeper/config)
fzk.commit(zxid);// 略break;caseLeader.UPTODATE:// 正常情况下主从复制数据不应该出现这种类型数据包break;caseLeader.REVALIDATE:if(om ==null||!om.revalidateLearnerSession(qp)){revalidate(qp);}break;caseLeader.SYNC:
fzk.sync();break;default:
LOG.warn("Unknown packet type: {}",LearnerHandler.packetToString(qp));break;}}
Observer
observeLeader方法
voidobserveLeader()throwsException{
zk.registerJMX(newObserverBean(this, zk), self.jmxLocalPeerBean);long connectTime =0;boolean completedSync =false;try{
self.setZabState(QuorumPeer.ZabState.DISCOVERY);// 获取leader或一个observerMaster服务器QuorumServer master =findLearnerMaster();try{// 连接leader或observerMasterconnectToLeader(master.addr, master.hostname);
connectTime =System.currentTimeMillis();long newLeaderZxid =registerWithLeader(Leader.OBSERVERINFO);if(self.isReconfigStateChange()){thrownewException("learned about role change");}finallong startTime =Time.currentElapsedTime();
self.setLeaderAddressAndId(master.addr, master.getId());
self.setZabState(QuorumPeer.ZabState.SYNCHRONIZATION);syncWithLeader(newLeaderZxid);
self.setZabState(QuorumPeer.ZabState.BROADCAST);
completedSync =true;finallong syncTime =Time.currentElapsedTime()- startTime;ServerMetrics.getMetrics().OBSERVER_SYNC_TIME.add(syncTime);QuorumPacket qp =newQuorumPacket();while(this.isRunning()&& nextLearnerMaster.get()==null){readPacket(qp);processPacket(qp);}}catch(Exception e){closeSocket();// clear pending revalidations
pendingRevalidations.clear();}}finally{
currentLearnerMaster =null;
zk.unregisterJMX(this);if(connectTime !=0){long connectionDuration =System.currentTimeMillis()- connectTime;
messageTracker.dumpToLog(leaderAddr.toString());}}}
processPacket方法
protectedvoidprocessPacket(QuorumPacket qp)throwsException{TxnLogEntry logEntry;TxnHeader hdr;TxnDigest digest;Record txn;switch(qp.getType()){caseLeader.PING:ping(qp);break;caseLeader.PROPOSAL:
LOG.warn("Ignoring proposal");break;caseLeader.COMMIT:
LOG.warn("Ignoring commit");break;caseLeader.UPTODATE:
LOG.error("Received an UPTODATE message after Observer started");break;caseLeader.REVALIDATE:revalidate(qp);break;caseLeader.SYNC:((ObserverZooKeeperServer) zk).sync();break;caseLeader.INFORM:ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
logEntry =SerializeUtils.deserializeTxn(qp.getData());
hdr = logEntry.getHeader();
txn = logEntry.getTxn();
digest = logEntry.getDigest();Request request =newRequest(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn,0);
request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY);
request.setTxnDigest(digest);ObserverZooKeeperServer obs =(ObserverZooKeeperServer) zk;
obs.commitRequest(request);// 提交break;caseLeader.INFORMANDACTIVATE:// reconfig功能使用break;default:
LOG.warn("Unknown packet type: {}",LearnerHandler.packetToString(qp));break;}}
Leader与Follower通信总结
Leader Follower
FOLLOWERINFO/OBSERVERINFO数据包发送acceptEpoch
<-------------------------------------------------------------------
leader计算newEpoch、newZxid
LEADERINFO数据包发送最新的zxid
------------------------------------------------------------------->
follower接受newEpoch
ACKEPOCH数据包发送lastLoggedZxid、currentEpoch
<------------------------------------------------------------------
leader确定数据同步方式
DIFF/TRUNC/SNAP或者同步数据(loop)
------------------------------------------------------------------->
NEWLEADER数据包
------------------------------------------------------------------->
ACK数据包
<------------------------------------------------------------------
UPTODATE数据包
------------------------------------------------------------------->
PROPOSAL数据包
------------------------------------------------------------------->
ACK数据包
<------------------------------------------------------------------
COMMIT数据包
------------------------------------------------------------------->
ACK数据包
<------------------------------------------------------------------
版权归原作者 xuguofeng2016 所有, 如有侵权,请联系我们删除。