0


zookeeper客户端会话状态分析

前言:

Zookeeper客户端创建完成之后,后续就可以使用其进行请求发送,在请求发送之前,会先检查客户端与服务端的连接是否存在(是否有对应Session),如果不存在,则会先创建Session会话,后续的操作都会依据当前Session来发送。

而当连接发生异常时(网络波动等),客户端会尝试重连服务端,其中又会涉及到Session状态的变化。本文就来了解下Session的创建及其状态的变化过程。

1.客户端连接的状态

客户端与服务端三次握手完成后,会发送ConnectRequest请求,服务端处理完成后,返回ConnectResponse,代表当前Session会话创建完成,会话里面有一个sessionId,代表当前会话(很关键)。

那么连接具体有哪些状态呢?具体都写在Zookeeper.States里,具体代码如下

  1. public enum States {
  2. CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
  3. CLOSED, AUTH_FAILED, NOT_CONNECTED;
  4. // 当前连接是否存活
  5. public boolean isAlive() {
  6. return this != CLOSED && this != AUTH_FAILED;
  7. }
  8. // 当前连接是否连接成功
  9. public boolean isConnected() {
  10. return this == CONNECTED || this == CONNECTEDREADONLY;
  11. }
  12. }

状态还是比较多的,下面我们来看下这个状态的变化过程。

2.NOT_CONNECTED 未连接

未连接状态是默认状态,具体见ClientCnxn

  1. volatile States state = States.NOT_CONNECTED;

3.CONNECTING 连接中

CONNECTING代表还未连接到Zookeeper服务端,在以下场景中会被设置

3.1 SendThread新建

  1. SendThread(ClientCnxnSocket clientCnxnSocket) {
  2. super(makeThreadName("-SendThread()"));
  3. // 当SendThread新建时会将ClientCnxn 的连接状态设置为CONNECTING
  4. state = States.CONNECTING;
  5. this.clientCnxnSocket = clientCnxnSocket;
  6. setDaemon(true);
  7. }

**3.2 ClientCnxn.startConnect() **

  1. public class ClientCnxn {
  2. private void startConnect(InetSocketAddress addr) throws IOException {
  3. state = States.CONNECTING;
  4. ...
  5. logStartConnect(addr);
  6. clientCnxnSocket.connect(addr);
  7. }
  8. }

在客户端发送请求之前,若检测到与服务端之间的连接未创建,则会先发起创建连接的请求;

若发生网络异常时,也会重新创建连接。

4.CONNECTED 、CONNECTEDREADONLY 连接已完成

当成功接收到服务端对于ConnectRequest请求的响应(ConnectResponse)时,则会设置当前连接状态为已完成,具体代码如下

  1. public class ClientCnxnSocketNIO extends ClientCnxnSocket {
  2. void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
  3. throws InterruptedException, IOException {
  4. SocketChannel sock = (SocketChannel) sockKey.channel();
  5. if (sockKey.isReadable()) {
  6. int rc = sock.read(incomingBuffer);
  7. ...
  8. if (!incomingBuffer.hasRemaining()) {
  9. incomingBuffer.flip();
  10. if (incomingBuffer == lenBuffer) {
  11. recvCount++;
  12. readLength();
  13. } else if (!initialized) {
  14. // 如果发现还未初始化完成,说明当前响应为连接的响应
  15. readConnectResult();
  16. ...
  17. }
  18. }
  19. }
  20. }
  21. void readConnectResult() throws IOException {
  22. // 处理连接响应
  23. ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
  24. BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
  25. ConnectResponse conRsp = new ConnectResponse();
  26. conRsp.deserialize(bbia, "connect");
  27. // 客户端的readonly标志
  28. boolean isRO = false;
  29. try {
  30. isRO = bbia.readBool("readOnly");
  31. } catch (IOException e) {
  32. LOG.warn("Connected to an old server; r-o mode will be unavailable");
  33. }
  34. this.sessionId = conRsp.getSessionId();
  35. // 继续交由SendThread执行
  36. sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
  37. conRsp.getPasswd(), isRO);
  38. }
  39. }

SendThread.onConnected()

创建连接完成后,则当前会话状态变成CONNECTED或CONNECTEDREADONLY,这个具体由请求的Packet.readOnly标识

5.AUTH_FAILED 验证失败

当服务端连接需要用户名密码时,那么客户端连接的创建需要将正确的用户名密码传入,如果错误的话,服务端会返回该异常。具体代码如下

  1. class SendThread extends ZooKeeperThread {
  2. void readResponse(ByteBuffer incomingBuffer) throws IOException {
  3. ByteBufferInputStream bbis = new ByteBufferInputStream(
  4. incomingBuffer);
  5. BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
  6. ReplyHeader replyHdr = new ReplyHeader();
  7. replyHdr.deserialize(bbia, "header");
  8. ...
  9. if (replyHdr.getXid() == -4) {
  10. // 代表验证失败
  11. if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
  12. state = States.AUTH_FAILED;
  13. eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None,
  14. Watcher.Event.KeeperState.AuthFailed, null) );
  15. }
  16. return;
  17. }
  18. ...
  19. }
  20. }

6.CLOSED 关闭连接

当出现会话超时或者客户端主动退出时,那么会话的状态则直接变为CLOSED,具体场景如下

6.1 会话超时时间异常

  1. class SendThread extends ZooKeeperThread {
  2. void onConnected(int _negotiatedSessionTimeout, long _sessionId,
  3. byte[] _sessionPasswd, boolean isRO) throws IOException {
  4. negotiatedSessionTimeout = _negotiatedSessionTimeout;
  5. // 如果协商出的超时时间<=0,那么直接关闭当前连接
  6. if (negotiatedSessionTimeout <= 0) {
  7. state = States.CLOSED;
  8. }
  9. }
  10. }

6.2 ClinetCnxn.close() 客户端主动退出

  1. public class ClientCnxn {
  2. public void close() throws IOException {
  3. try {
  4. RequestHeader h = new RequestHeader();
  5. h.setType(ZooDefs.OpCode.closeSession);
  6. submitRequest(h, null, null, null);
  7. } catch (InterruptedException e) {
  8. // ignore, close the send/event threads
  9. } finally {
  10. // 关闭连接
  11. disconnect();
  12. }
  13. }
  14. public void disconnect() {
  15. // SendThread发起关闭
  16. sendThread.close();
  17. eventThread.queueEventOfDeath();
  18. }
  19. }
  20. class SendThread extends ZooKeeperThread {
  21. void close() {
  22. // 状态设置为CLOSED
  23. state = States.CLOSED;
  24. clientCnxnSocket.wakeupCnxn();
  25. }
  26. }

总结:

我们需要注意的是,在使用Zookeeper客户端连接服务端时,若服务端发生异常,导致连接中断时,则客户端会不断发起重连,不断的重试Zookeeper集群中的其他机器,重试代码在SendThread.run()方法中。

当客户端在超时时间sessionTimeout还没有连接上,那么服务端会清理该会话,此时客户端哪怕连接上,也会被服务端认为是非法会话而中断。这时我们则只能手动新建一个Zookeeper客户端会话。

最后,借用一张网络上图片来总结下会话状态的转换(来自【分布式】Zookeeper会话 - leesf - 博客园 )

标签: zookeeper http java

本文转载自: https://blog.csdn.net/qq_26323323/article/details/125251145
版权归原作者 恐龙弟旺仔 所有, 如有侵权,请联系我们删除。

“zookeeper客户端会话状态分析”的评论:

还没有评论