首发CSDN:徐同学呀,原创不易,转载请注明源链接。我是徐同学,用心输出高质量文章,希望对你有所帮助。 本篇源码基于
ZooKeeper3.7.0
版本。
一、向服务端发起请求
客户端与服务端通信的最小单元是
Packet
。所有请求在发送给服务端之前,都需要先构建一个
Packet
,再将
Packet
提交给请求处理队列
outgoingQueue
并唤醒
SendThread
线程,最后处理写事件,从
outgoingQueue
中取出
Packet
,将其序列化写入网络发送缓冲区。
1、构建协议包
Packet
中包含请求头、请求体、响应头、响应体、本地回调函数、
watcher
注册等信息。
(1)请求体和响应体
不同的请求API有不同的请求体和响应体,比如
getData
的请求体是
GetDataRequest
,响应体是
GetDataResponse
,
setData
的请求体是
SetDataRequest
,响应体是
SetDataResponse
。
如下是不同请求体和响应体的类关系图:
如下图是常见的几个请求体和响应体的内容结构:
(2)请求头
请求头
RequestHeader
定义了操作类型
OpCode
和请求序号
xid
。
- 最常见
OpCode
有create=1
、delete=2
、exists=3
、getData=5
、setData=6
、ping=11
等,详细参考org.apache.zookeeper.ZooDefs.OpCode
。 xid
用于记录客户端请求发起的先后序号,用来确保单个客户端请求的响应顺序。正常从1开始自增,但是也有几个特殊的xid
定义,NOTIFICATION_XID=-1``````watcher
通知信息,PING_XID=-2
心跳请求,AUTHPACKET_XID=-4
授权数据包请求,SET_WATCHES_XID=-8
设置watcher
请求。
根据协议规定,除非是“会话创建”请求,其他所有的客户端请求都会带上请求头。
(3)getData源码示例
以
getData
源码为例,其他类似:
2、发送数据包
(1)提交给outgoingQueue
构建好
Packet
,就提交给
outgoingQueue
队列,然后通知
SendThread
线程:
(2)SendThread处理写事件
SendThread
线程轮询
SelectionKey
列表,处理写事件:
除了会话建立请求、心跳请求,其他正常请求发送完毕后,都需要添加到
pendingQueue
队列,其目的是按顺序处理响应。
(3)网络包序列化
真正要发给服务端的只有请求头和请求体以及长度等少量信息。
如下是
Packet
序列化过程:
发送的网络包,需要序列化为
byte
数组,而
ZooKeeper
并没有使用多么高深的序列化技术,实则还是用的Java原生的序列化和反序列化技术
ByteArrayOutputStream
。DataOutputStream
二、接收服务端响应
1、按顺序处理响应
正常请求,如
getData
、
setData
、
create
、
delete
等的响应都需要按顺序处理。接收服务端发来的响应信息按顺序和
pendingQueue
队列中的
Packet
对比
xid
是否相等,相等就是同一个请求,不相等就说明顺序乱了,抛出异常。
如下是处理响应的部分源码:
// org.apache.zookeeper.ClientCnxn.SendThread#readResponsevoidreadResponse(ByteBuffer incomingBuffer)throws IOException {
ByteBufferInputStream bbis =newByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr =newReplyHeader();// 解码
replyHdr.deserialize(bbia,"header");// 暂时省略 Xid事件的处理// 必须按顺序处理响应,pendingQueue 按顺序出队列
Packet packet;synchronized(pendingQueue){if(pendingQueue.size()==0){thrownewIOException("Nothing in the queue, but got "+ replyHdr.getXid());}// 从 pendingQueue 中取出 packet
packet = pendingQueue.remove();}/*
* Since requests are processed in order, we better get a response to the first request!
*/try{// 对比xid是否一致,若不一致则抛出Xid out of order异常if(packet.requestHeader.getXid()!= replyHdr.getXid()){
packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());thrownewIOException("Xid out of order. Got Xid "+ replyHdr.getXid()+" with err "+ replyHdr.getErr()+" expected Xid "+ packet.requestHeader.getXid()+" for a packet with details: "+ packet);}// 填充 replyHeader
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());if(replyHdr.getZxid()>0){
lastZxid = replyHdr.getZxid();}if(packet.response != null && replyHdr.getErr()==0){// 反序列化 response
packet.response.deserialize(bbia,"response");}
LOG.debug("Reading reply session id: 0x{}, packet:: {}", Long.toHexString(sessionId), packet);}finally{// 进行packet处理的收尾工作,如注册watcher、唤醒同步阻塞的主线程、触发本地回调函数等finishPacket(packet);}}
从网络底层读取数据,然后反序列化出响应头和响应体。
2、唤醒同步阻塞
请求的同步阻塞方式到底如何实现的呢?
以同步阻塞方式等待响应结果的请求API,都是调用方法
org.apache.zookeeper.ClientCnxn#submitRequest
:
将
packet
提交给
outgoingQueue
队列后,就调用
packet.wait()
阻塞当前线程。接收到响应,解析对比完
packet
后,调用
finishPacket()
方法进行收尾工作,如果没有设置
Callback
,就调用
packet.notifyAll()
唤醒刚才阻塞的线程。
3、异步回调通知
以异步回调通知响应结果,就直接调用的
org.apache.zookeeper.ClientCnxn#queuePacket
,直接将
packet
添加到
outgoingQueue
队列。
在调用
finishPacket()
方法进行收尾工作时,判断如果设置了
Callback
,就将
packet
交给
EventThread
进行回调通知。
首先将
packet
添加到
EventThread
线程的
waitingEvents
队列,然后
EventThread
线程循环遍历
waitingEvents
队列取出
packet
处理:
三、总结与参考
一图以蔽之。
推荐阅读:《从Paxos到Zookeeper:分布式一致性原理与实践》倪超著。
如若文章有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。
版权归原作者 徐同学呀 所有, 如有侵权,请联系我们删除。