前言:
Zookeeper客户端创建完成之后,后续就可以使用其进行请求发送,在请求发送之前,会先检查客户端与服务端的连接是否存在(是否有对应Session),如果不存在,则会先创建Session会话,后续的操作都会依据当前Session来发送。
而当连接发生异常时(网络波动等),客户端会尝试重连服务端,其中又会涉及到Session状态的变化。本文就来了解下Session的创建及其状态的变化过程。
1.客户端连接的状态
客户端与服务端三次握手完成后,会发送ConnectRequest请求,服务端处理完成后,返回ConnectResponse,代表当前Session会话创建完成,会话里面有一个sessionId,代表当前会话(很关键)。
那么连接具体有哪些状态呢?具体都写在Zookeeper.States里,具体代码如下
public enum States {
CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
CLOSED, AUTH_FAILED, NOT_CONNECTED;
// 当前连接是否存活
public boolean isAlive() {
return this != CLOSED && this != AUTH_FAILED;
}
// 当前连接是否连接成功
public boolean isConnected() {
return this == CONNECTED || this == CONNECTEDREADONLY;
}
}
状态还是比较多的,下面我们来看下这个状态的变化过程。
2.NOT_CONNECTED 未连接
未连接状态是默认状态,具体见ClientCnxn
volatile States state = States.NOT_CONNECTED;
3.CONNECTING 连接中
CONNECTING代表还未连接到Zookeeper服务端,在以下场景中会被设置
3.1 SendThread新建
SendThread(ClientCnxnSocket clientCnxnSocket) {
super(makeThreadName("-SendThread()"));
// 当SendThread新建时会将ClientCnxn 的连接状态设置为CONNECTING
state = States.CONNECTING;
this.clientCnxnSocket = clientCnxnSocket;
setDaemon(true);
}
**3.2 ClientCnxn.startConnect() **
public class ClientCnxn {
private void startConnect(InetSocketAddress addr) throws IOException {
state = States.CONNECTING;
...
logStartConnect(addr);
clientCnxnSocket.connect(addr);
}
}
在客户端发送请求之前,若检测到与服务端之间的连接未创建,则会先发起创建连接的请求;
若发生网络异常时,也会重新创建连接。
4.CONNECTED 、CONNECTEDREADONLY 连接已完成
当成功接收到服务端对于ConnectRequest请求的响应(ConnectResponse)时,则会设置当前连接状态为已完成,具体代码如下
public class ClientCnxnSocketNIO extends ClientCnxnSocket {
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
...
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
recvCount++;
readLength();
} else if (!initialized) {
// 如果发现还未初始化完成,说明当前响应为连接的响应
readConnectResult();
...
}
}
}
}
void readConnectResult() throws IOException {
// 处理连接响应
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ConnectResponse conRsp = new ConnectResponse();
conRsp.deserialize(bbia, "connect");
// 客户端的readonly标志
boolean isRO = false;
try {
isRO = bbia.readBool("readOnly");
} catch (IOException e) {
LOG.warn("Connected to an old server; r-o mode will be unavailable");
}
this.sessionId = conRsp.getSessionId();
// 继续交由SendThread执行
sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
conRsp.getPasswd(), isRO);
}
}
SendThread.onConnected()
创建连接完成后,则当前会话状态变成CONNECTED或CONNECTEDREADONLY,这个具体由请求的Packet.readOnly标识
5.AUTH_FAILED 验证失败
当服务端连接需要用户名密码时,那么客户端连接的创建需要将正确的用户名密码传入,如果错误的话,服务端会返回该异常。具体代码如下
class SendThread extends ZooKeeperThread {
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(
incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");
...
if (replyHdr.getXid() == -4) {
// 代表验证失败
if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
state = States.AUTH_FAILED;
eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null) );
}
return;
}
...
}
}
6.CLOSED 关闭连接
当出现会话超时或者客户端主动退出时,那么会话的状态则直接变为CLOSED,具体场景如下
6.1 会话超时时间异常
class SendThread extends ZooKeeperThread {
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
byte[] _sessionPasswd, boolean isRO) throws IOException {
negotiatedSessionTimeout = _negotiatedSessionTimeout;
// 如果协商出的超时时间<=0,那么直接关闭当前连接
if (negotiatedSessionTimeout <= 0) {
state = States.CLOSED;
}
}
}
6.2 ClinetCnxn.close() 客户端主动退出
public class ClientCnxn {
public void close() throws IOException {
try {
RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.closeSession);
submitRequest(h, null, null, null);
} catch (InterruptedException e) {
// ignore, close the send/event threads
} finally {
// 关闭连接
disconnect();
}
}
public void disconnect() {
// SendThread发起关闭
sendThread.close();
eventThread.queueEventOfDeath();
}
}
class SendThread extends ZooKeeperThread {
void close() {
// 状态设置为CLOSED
state = States.CLOSED;
clientCnxnSocket.wakeupCnxn();
}
}
总结:
我们需要注意的是,在使用Zookeeper客户端连接服务端时,若服务端发生异常,导致连接中断时,则客户端会不断发起重连,不断的重试Zookeeper集群中的其他机器,重试代码在SendThread.run()方法中。
当客户端在超时时间sessionTimeout还没有连接上,那么服务端会清理该会话,此时客户端哪怕连接上,也会被服务端认为是非法会话而中断。这时我们则只能手动新建一个Zookeeper客户端会话。
最后,借用一张网络上图片来总结下会话状态的转换(来自【分布式】Zookeeper会话 - leesf - 博客园 )
版权归原作者 恐龙弟旺仔 所有, 如有侵权,请联系我们删除。