0


ZooKeeper客户端源码(二)——向服务端发起请求(顺序响应+同步阻塞+异步回调)

首发CSDN:徐同学呀,原创不易,转载请注明源链接。我是徐同学,用心输出高质量文章,希望对你有所帮助。 本篇源码基于

ZooKeeper3.7.0

版本。

向服务端发起请求

一、向服务端发起请求

客户端与服务端通信的最小单元是

Packet

。所有请求在发送给服务端之前,都需要先构建一个

Packet

,再将

Packet

提交给请求处理队列

outgoingQueue

并唤醒

SendThread

线程,最后处理写事件,从

outgoingQueue

中取出

Packet

,将其序列化写入网络发送缓冲区。

1、构建协议包

Packet

中包含请求头、请求体、响应头、响应体、本地回调函数、

watcher

注册等信息。

(1)请求体和响应体

不同的请求API有不同的请求体和响应体,比如

getData

的请求体是

GetDataRequest

,响应体是

GetDataResponse

setData

的请求体是

SetDataRequest

,响应体是

SetDataResponse

如下是不同请求体和响应体的类关系图:

Record-Request.drawio

Record-Response.drawio

如下图是常见的几个请求体和响应体的内容结构:

请求体和响应体内容

(2)请求头

请求头

RequestHeader

定义了操作类型

OpCode

和请求序号

xid

  • 最常见OpCodecreate=1delete=2exists=3getData=5setData=6ping=11等,详细参考org.apache.zookeeper.ZooDefs.OpCode
  • xid用于记录客户端请求发起的先后序号,用来确保单个客户端请求的响应顺序。正常从1开始自增,但是也有几个特殊的xid定义,NOTIFICATION_XID=-1``````watcher通知信息,PING_XID=-2心跳请求,AUTHPACKET_XID=-4授权数据包请求,SET_WATCHES_XID=-8设置watcher请求。

根据协议规定,除非是“会话创建”请求,其他所有的客户端请求都会带上请求头。

(3)getData源码示例

getData

源码为例,其他类似:

ZooKeeper.getData

2、发送数据包

(1)提交给outgoingQueue

构建好

Packet

,就提交给

outgoingQueue

队列,然后通知

SendThread

线程:

ClientCnxn.queuePacket

(2)SendThread处理写事件

SendThread

线程轮询

SelectionKey

列表,处理写事件:

ClientCnxnSocketNIO.doIO

除了会话建立请求、心跳请求,其他正常请求发送完毕后,都需要添加到

pendingQueue

队列,其目的是按顺序处理响应。

(3)网络包序列化

真正要发给服务端的只有请求头和请求体以及长度等少量信息。

请求协议组成

如下是

Packet

序列化过程:

ClientCnxn.Packet.createBB

发送的网络包,需要序列化为

byte

数组,而

ZooKeeper

并没有使用多么高深的序列化技术,实则还是用的Java原生的序列化和反序列化技术

ByteArrayOutputStream
  • DataOutputStream
    

二、接收服务端响应

1、按顺序处理响应

正常请求,如

getData

setData

create

delete

等的响应都需要按顺序处理。接收服务端发来的响应信息按顺序和

pendingQueue

队列中的

Packet

对比

xid

是否相等,相等就是同一个请求,不相等就说明顺序乱了,抛出异常。

如下是处理响应的部分源码:

// org.apache.zookeeper.ClientCnxn.SendThread#readResponsevoidreadResponse(ByteBuffer incomingBuffer)throws IOException {
    ByteBufferInputStream bbis =newByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ReplyHeader replyHdr =newReplyHeader();// 解码
    replyHdr.deserialize(bbia,"header");// 暂时省略 Xid事件的处理// 必须按顺序处理响应,pendingQueue 按顺序出队列
    Packet packet;synchronized(pendingQueue){if(pendingQueue.size()==0){thrownewIOException("Nothing in the queue, but got "+ replyHdr.getXid());}// 从 pendingQueue 中取出 packet
        packet = pendingQueue.remove();}/*
     * Since requests are processed in order, we better get a response to the first request!
     */try{// 对比xid是否一致,若不一致则抛出Xid out of order异常if(packet.requestHeader.getXid()!= replyHdr.getXid()){
            packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());thrownewIOException("Xid out of order. Got Xid "+ replyHdr.getXid()+" with err "+ replyHdr.getErr()+" expected Xid "+ packet.requestHeader.getXid()+" for a packet with details: "+ packet);}// 填充 replyHeader
        packet.replyHeader.setXid(replyHdr.getXid());
        packet.replyHeader.setErr(replyHdr.getErr());
        packet.replyHeader.setZxid(replyHdr.getZxid());if(replyHdr.getZxid()>0){
            lastZxid = replyHdr.getZxid();}if(packet.response != null && replyHdr.getErr()==0){// 反序列化 response
            packet.response.deserialize(bbia,"response");}

        LOG.debug("Reading reply session id: 0x{}, packet:: {}", Long.toHexString(sessionId), packet);}finally{// 进行packet处理的收尾工作,如注册watcher、唤醒同步阻塞的主线程、触发本地回调函数等finishPacket(packet);}}

从网络底层读取数据,然后反序列化出响应头和响应体。

响应协议

2、唤醒同步阻塞

请求的同步阻塞方式到底如何实现的呢?

以同步阻塞方式等待响应结果的请求API,都是调用方法

org.apache.zookeeper.ClientCnxn#submitRequest

submitRequest

packet

提交给

outgoingQueue

队列后,就调用

packet.wait()

阻塞当前线程。接收到响应,解析对比完

packet

后,调用

finishPacket()

方法进行收尾工作,如果没有设置

Callback

,就调用

packet.notifyAll()

唤醒刚才阻塞的线程。

finishPacket

3、异步回调通知

以异步回调通知响应结果,就直接调用的

org.apache.zookeeper.ClientCnxn#queuePacket

,直接将

packet

添加到

outgoingQueue

队列。

在调用

finishPacket()

方法进行收尾工作时,判断如果设置了

Callback

,就将

packet

交给

EventThread

进行回调通知。

首先将

packet

添加到

EventThread

线程的

waitingEvents

队列,然后

EventThread

线程循环遍历

waitingEvents

队列取出

packet

处理:

EventThread.queuePacket

EventThread.run

processEvent部分源码

三、总结与参考

一图以蔽之。

请求发起和响应过程.drawio
推荐阅读:《从Paxos到Zookeeper:分布式一致性原理与实践》倪超著。

如若文章有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。


本文转载自: https://blog.csdn.net/weixin_36586120/article/details/123455097
版权归原作者 徐同学呀 所有, 如有侵权,请联系我们删除。

“ZooKeeper客户端源码(二)——向服务端发起请求(顺序响应+同步阻塞+异步回调)”的评论:

还没有评论