Source-Code Analysis of a ZooKeeper Node That Cannot Rejoin the Cluster

The problem:

After one node of a multi-node cluster was restarted, that node could not rejoin the ZooKeeper (ZK) cluster.

Symptoms

bin/zkServer.sh status

JMX enabled by default

......

Error contacting service. It is probably not running.

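The ZK process on that node is running, and its log keeps repeating messages like these:

"Have smaller server identifier, so dropping the connections: (3,2)";

"Notification timeout: 60000"

Temporary workaround

Restart the ZK process on the other nodes; the affected node can then join the ZK cluster normally.

Root cause

A bug in certain ZooKeeper versions: https://issues.apache.org/jira/browse/ZOOKEEPER-2186

When ZooKeeper starts, it creates a Listener child thread. That thread creates a ServerSocket bound to the election port, listens for socket connections from the other peer nodes in the ZK cluster, and accepts and processes them.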

    public class Listener extends Thread {

        volatile ServerSocket ss = null;

        /**
         * Sleeps on accept().
         */
        @Override
        public void run() {
            int numRetries = 0;
            InetSocketAddress addr;
            while((!shutdown) && (numRetries < 3)){
                try {
                    ss = new ServerSocket();
                    ss.setReuseAddress(true);
                    if (self.getQuorumListenOnAllIPs()) {
                        int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
                        addr = new InetSocketAddress(port);
                    } else {
                        addr = self.quorumPeers.get(self.getId()).electionAddr;
                    }
                    LOG.info("My election bind port: " + addr.toString());
                    setName(self.quorumPeers.get(self.getId()).electionAddr
                            .toString());
                    ss.bind(addr);
                    while (!shutdown) {
                        Socket client = ss.accept();
                        setSockOpts(client);
                        LOG.info("Received connection request "
                                + client.getRemoteSocketAddress());
                        receiveConnection(client);
                        numRetries = 0;
                    }
                } catch (IOException e) {
                    LOG.error("Exception while listening", e);
                    numRetries++;
                    try {
                        ss.close();
                        Thread.sleep(1000);
                    } catch (IOException ie) {
                        LOG.error("Error closing server socket", ie);
                    } catch (InterruptedException ie) {
                        LOG.error("Interrupted while sleeping. " +
                                  "Ignoring exception", ie);
                    }
                }
            }
            LOG.info("Leaving listener");
            if (!shutdown) {
                LOG.error("As I'm leaving the listener thread, "
                        + "I won't be able to participate in leader "
                        + "election any longer: "
                        + self.quorumPeers.get(self.getId()).electionAddr);
            }
        }

        /**
         * Halts this listener thread.
         */
        void halt(){
            try{
                LOG.debug("Trying to close listener: " + ss);
                if(ss != null) {
                    LOG.debug("Closing listener: " + self.getId());
                    ss.close();
                }
            } catch (IOException e){
                LOG.warn("Exception when shutting down listener: " + e);
            }
        }
    }

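Whenever the Listener thread accepts a socket connection from another peer, it logs "Received connection request xxxx". During the incident, however, no node in the cluster logged that message. The affected node's log kept printing "Have smaller server identifier, so dropping the connections: (3,2)" and "Notification timeout: 60000", which shows that it was repeatedly sending election notifications to node 3. Normally, when node 3 receives a socket connection from a node whose server id is smaller than its own, it closes that socket and opens a new connection back to that node. Here, though, node 3 logged nothing about an incoming connection from the affected node, and the affected node logged no connection-failure exception either.

The explanation is that the Listener thread on the other nodes had crashed. After the crash the main thread keeps running and the election port stays in the LISTEN state, so clients can still establish socket connections, but the Listener thread can no longer handle them, and the election fails. Restarting the ZK service on the other nodes brings their Listener threads back to a normal listening state, after which the affected node can complete the election.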

    public boolean receiveConnection(Socket sock) {
        Long sid = null;

        try {
            // Read server id
            DataInputStream din = new DataInputStream(sock.getInputStream());
            sid = din.readLong();
            if (sid < 0) { // this is not a server id but a protocol version (see ZOOKEEPER-1633)
                sid = din.readLong();
                // next comes the #bytes in the remainder of the message
                int num_remaining_bytes = din.readInt();
                byte[] b = new byte[num_remaining_bytes];
                // remove the remainder of the message from din
                int num_read = din.read(b);
                if (num_read != num_remaining_bytes) {
                    LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
                }
            }
            if (sid == QuorumPeer.OBSERVER_ID) {
                /*
                 * Choose identifier at random. We need a value to identify
                 * the connection.
                 */
                sid = observerCounter--;
                LOG.info("Setting arbitrary identifier to observer: " + sid);
            }
        } catch (IOException e) {
            closeSocket(sock);
            LOG.warn("Exception reading or writing challenge: " + e.toString());
            return false;
        }

        //If wins the challenge, then close the new connection.
        if (sid < self.getId()) {
            /*
             * This replica might still believe that the connection to sid is
             * up, so we have to shut down the workers before trying to open a
             * new connection.
             */
            SendWorker sw = senderWorkerMap.get(sid);
            if (sw != null) {
                sw.finish();
            }

            /*
             * Now we start a new connection
             */
            LOG.debug("Create new connection to server: " + sid);
            closeSocket(sock);
            connectOne(sid);

            // Otherwise start worker threads to receive data.
        } else {
            SendWorker sw = new SendWorker(sock, sid);
            RecvWorker rw = new RecvWorker(sock, sid, sw);
            sw.setRecv(rw);

            SendWorker vsw = senderWorkerMap.get(sid);
            if(vsw != null)
                vsw.finish();

            senderWorkerMap.put(sid, sw);
            if (!queueSendMap.containsKey(sid)) {
                queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
                        SEND_CAPACITY));
            }

            sw.start();
            rw.start();
            return true;
        }
        return false;
    }
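
The listing above reads num_remaining_bytes from the peer and passes it straight to new byte[...]. As a rough illustration of why that matters, the sketch below mimics the shape of Listener.run(), which catches only IOException, and shows that an OutOfMemoryError raised by an oversized allocation escapes that catch block and ends the thread. The class and method names (ListenerCrashDemo, runAndCapture, listenerLoop, handle) are hypothetical and not part of ZooKeeper, and the behaviour assumes a HotSpot JVM, where an array of Integer.MAX_VALUE bytes exceeds the VM limit.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; not ZooKeeper code.
final class ListenerCrashDemo {

    /** Runs body on a new thread and returns the Throwable that killed it, or null. */
    static Throwable runAndCapture(Runnable body) throws InterruptedException {
        AtomicReference<Throwable> fatal = new AtomicReference<>();
        Thread t = new Thread(body, "demo-listener");
        // Called on the dying thread when something escapes run().
        t.setUncaughtExceptionHandler((thread, e) -> fatal.set(e));
        t.start();
        t.join();
        return fatal.get();
    }

    /** Same shape as Listener.run(): only IOException is handled. */
    static void listenerLoop() {
        try {
            handle(Integer.MAX_VALUE);
        } catch (IOException e) {
            System.out.println("Handled: " + e);
        }
    }

    /** Allocates a buffer of the length claimed by the (untrusted) peer. */
    static int handle(int claimedLength) throws IOException {
        if (claimedLength < 0) {
            throw new IOException("negative length");
        }
        // Assumption: on HotSpot this throws OutOfMemoryError, an Error, not an IOException.
        byte[] b = new byte[claimedLength];
        return b.length;
    }

    public static void main(String[] args) throws InterruptedException {
        Throwable fatal = runAndCapture(ListenerCrashDemo::listenerLoop);
        System.out.println("Thread terminated by: " + fatal);
    }
}
```

Because OutOfMemoryError extends Error rather than Exception, even a catch (Exception e) block would not have stopped it; only a length check before the allocation, or catching Throwable, keeps the loop alive.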

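Once the Listener child thread has crashed, why can a socket client still connect to the ServerSocket without any connection error? The following demonstration shows why. First create a main thread, and inside it start a ServerSocket child thread. Then use a socket client to test the behaviour.

The main-thread class MainThread creates a child thread, SimpleSocketServer, which listens on port 3888 and passes each accepted socket to the receiveConnection method: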

    // MainThread.java
    public class MainThread extends Thread {
        @Override
        public void run() {
            int port = 3888;
            System.out.println("Start server on port: " + port);
            SimpleSocketServer simpleSocketServer = new SimpleSocketServer(port);
            simpleSocketServer.start();
            // Keep printing to show that this thread survives a crash of the child thread.
            while (true) {
                System.out.println("MainThread running...");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    // SimpleSocketServer.java
    import java.io.DataInputStream;
    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;

    public class SimpleSocketServer extends Thread {
        private ServerSocket serverSocket;
        private final int port;
        // volatile so that a stopServer() call from another thread is visible to run()
        private volatile boolean running = false;

        public SimpleSocketServer(int port) {
            this.port = port;
        }

        public void stopServer() {
            running = false;
            this.interrupt();
        }

        @Override
        public void run() {
            try {
                serverSocket = new ServerSocket(port);
            } catch (IOException e) {
                e.printStackTrace();
                return; // nothing to accept on if the bind failed
            }
            running = true;
            while (running) {
                try {
                    System.out.println("Listening for a connection");
                    // accept() blocks here until the next connection arrives
                    Socket socket = serverSocket.accept();
                    System.out.println("Received a connection");
                    receiveConnection(socket);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

        public void receiveConnection(Socket sock) {
            Long sid = null;
            try {
                // Read server id
                DataInputStream din = new DataInputStream(sock.getInputStream());
                sid = din.readLong();
                System.out.println("1 sid: " + sid);
                if (sid < 0) { // this is not a server id but a protocol version (see ZOOKEEPER-1633)
                    sid = din.readLong();
                    System.out.println("2 sid: " + sid);
                    // next comes the #bytes in the remainder of the message
                    int num_remaining_bytes = din.readInt();
                    System.out.println("num_remaining_bytes: " + num_remaining_bytes);
                    // Deliberately unchecked, as in the buggy ZooKeeper code
                    byte[] b = new byte[num_remaining_bytes];
                    // remove the remainder of the message from din
                    int num_read = din.read(b);
                    System.out.println("num_read: " + num_read);
                    if (num_read != num_remaining_bytes) {
                        System.out.println("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
                    }
                }
            } catch (IOException e) {
                closeSocket(sock);
                System.out.format("Exception reading or writing challenge: \n");
                e.printStackTrace();
                return;
            }
        }

        private void closeSocket(Socket sock) {
            try {
                sock.close();
            } catch (IOException ie) {
                System.out.format("Exception while closing %s \n", ie.toString());
            }
        }
    }

The Main class:

    public class Main {
        public static void main(String[] args) {
            MainThread mainThread = new MainThread();
            mainThread.start();
        }
    }

Start the main thread. The console prints:

Start server on port: 3888
MainThread running...
Listening for a connection

The socket client class connects to the socket server and then sends a specific sequence of bytes:

    import java.io.DataOutputStream;
    import java.net.Socket;
    import java.nio.charset.StandardCharsets;

    public class SocketClient {
        public static void main(String[] args) {
            // try-with-resources closes the socket when the block exits
            try (Socket socket = new Socket("localhost", 3888)) {
                System.out.println("Connected!");
                DataOutputStream out = new DataOutputStream(socket.getOutputStream());
                // protocol version - a negative number
                out.writeLong(0xffff0000);
                // server id
                out.writeLong(2L);
                // addr
                String addr = "1.2.3.4:3888";
                byte[] addrBytes = addr.getBytes(StandardCharsets.UTF_8);
                out.writeInt(addrBytes.length);
                out.write(addrBytes);
                out.flush();
                System.out.println("Sent!");
            } catch (Exception e) {
                System.out.println("Caught exception");
                e.printStackTrace();
            }
        }
    }

When this well-behaved SocketClient runs, the SimpleSocketServer thread receives the request and processes it normally. The console prints:

Received a connection
1 sid: -65536
2 sid: 2
num_remaining_bytes: 12
num_read: 12
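
The first value in that output, -65536, comes from how Java treats the literal passed to writeLong in the client. A small sketch, with a hypothetical class name, of the two ways the literal can be written: without an L suffix, 0xffff0000 is an int whose sign bit is set, so it is sign-extended when widened to long, and that is why the server takes the sid < 0 branch.

```java
// Hypothetical helper class used only to illustrate literal widening.
final class ProtocolVersionLiteral {

    /** The literal as written in the client: an int, sign-extended to long. */
    static long asWritten() {
        return 0xffff0000;
    }

    /** The same hex digits with an L suffix: a positive long. */
    static long withLongSuffix() {
        return 0xffff0000L;
    }

    public static void main(String[] args) {
        System.out.println(asWritten());      // -65536
        System.out.println(withLongSuffix()); // 4294901760
    }
}
```

Had the client used the L-suffixed form, the server would have read a positive value and treated it as a server id instead of a protocol version.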

Now modify SocketClient so that it sends a very large int value to SimpleSocketServer in place of the real length:

    out.writeInt(2147483647);

The complete modified client:

    import java.io.DataOutputStream;
    import java.net.Socket;
    import java.nio.charset.StandardCharsets;

    public class SocketClient {
        public static void main(String[] args) {
            // try-with-resources closes the socket when the block exits
            try (Socket socket = new Socket("localhost", 3888)) {
                System.out.println("Connected!");
                DataOutputStream out = new DataOutputStream(socket.getOutputStream());
                // protocol version - a negative number
                out.writeLong(0xffff0000);
                // server id
                out.writeLong(2L);
                // addr
                String addr = "1.2.3.4:3888";
                byte[] addrBytes = addr.getBytes(StandardCharsets.UTF_8);
                // out.writeInt(addrBytes.length);
                // send a very large int value instead of the real length
                out.writeInt(2147483647);
                out.write(addrBytes);
                out.flush();
                System.out.println("Sent!");
            } catch (Exception e) {
                System.out.println("Caught exception");
                e.printStackTrace();
            }
        }
    }

SimpleSocketServer throws:

Received a connection
1 sid: -65536
2 sid: 2
num_remaining_bytes: 2147483647
Exception in thread "Thread-1" java.lang.OutOfMemoryError: Requested array size exceeds VM limit
at com.demo.SimpleSocketServer.receiveConnection(SimpleSocketServer.java:108)
at com.demo.SimpleSocketServer.run(SimpleSocketServer.java:48)

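When SocketClient then tries to connect to SimpleSocketServer again, the connection succeeds without any error, and telnet localhost 3888 also gets through, yet the server never receives anything the client sends. The OutOfMemoryError killed only the SimpleSocketServer thread. The ServerSocket was never closed, so the operating system keeps completing TCP handshakes on port 3888 and queues the connections in the listen backlog, but no thread is left to call accept().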
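
This behaviour can be reproduced without crashing anything. The sketch below, using a hypothetical BacklogDemo class, binds a ServerSocket on an ephemeral loopback port and never calls accept(), which is the state a listener is left in once its accepting thread has died. The client still connects and writes without an exception, because the operating system completes the handshake and buffers the data on its own.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical demo class; not ZooKeeper code.
final class BacklogDemo {

    /** Connects and writes to a bound ServerSocket on which accept() is never called. */
    static boolean connectWithoutAccept() throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 asks the OS for any free port; 50 is the requested backlog length.
        try (ServerSocket server = new ServerSocket(0, 50, loopback);
             Socket client = new Socket(loopback, server.getLocalPort())) {
            OutputStream out = client.getOutputStream();
            out.write(new byte[] {1, 2, 3, 4}); // buffered by the OS; nobody reads it
            out.flush();
            return client.isConnected();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("Connected without accept(): " + connectWithoutAccept());
    }
}
```

From the client's side this is indistinguishable from a healthy server that is merely slow, which is why neither side of the ZooKeeper election logged a connection failure.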

Back to the bug discussed in this article: it has been fixed in newer ZooKeeper versions. The fix is:

    // next comes the #bytes in the remainder of the message
    // note that 0 bytes is fine (old servers)
    int num_remaining_bytes = din.readInt();
    // If num_remaining_bytes is out of range, log an ERROR, close the socket and return
    if (num_remaining_bytes < 0 || num_remaining_bytes > maxBuffer) {
        LOG.error("Unreasonable buffer length: {}", num_remaining_bytes);
        closeSocket(sock);
        return;
    }
    byte[] b = new byte[num_remaining_bytes];
    // remove the remainder of the message from din
    // At this point b has a reasonable length; read the bytes from din into b and get back the number of bytes read
    int num_read = din.read(b);
    if (num_read != num_remaining_bytes) {
        LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
    }
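
The same guard can be tried in isolation. The following is a minimal sketch of a bounds-checked, length-prefixed read, not the ZooKeeper implementation: the class BoundedFrameReader, the helper frame, and the limit value MAX_BUFFER = 2048 are assumptions made for illustration. It also uses readFully rather than read, since read(b) may legitimately return fewer bytes than requested.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical demo class; not ZooKeeper code.
final class BoundedFrameReader {

    // Assumed limit for this sketch; pick a value that fits the protocol.
    static final int MAX_BUFFER = 2048;

    /** Reads an int length followed by that many bytes, rejecting unreasonable lengths. */
    static byte[] readFrame(DataInputStream din) throws IOException {
        int length = din.readInt();
        if (length < 0 || length > MAX_BUFFER) {
            // Reject before allocating, so a bad peer cannot trigger OutOfMemoryError.
            throw new IOException("Unreasonable buffer length: " + length);
        }
        byte[] b = new byte[length];
        din.readFully(b); // throws EOFException if the peer sent fewer bytes
        return b;
    }

    /** Builds a frame whose length field may disagree with the payload, for testing. */
    static byte[] frame(int claimedLength, byte[] payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(claimedLength);
        out.write(payload);
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = "1.2.3.4:3888".getBytes("UTF-8");
        byte[] ok = frame(payload.length, payload);
        System.out.println(readFrame(new DataInputStream(new ByteArrayInputStream(ok))).length);
        byte[] bad = frame(Integer.MAX_VALUE, payload);
        try {
            readFrame(new DataInputStream(new ByteArrayInputStream(bad)));
        } catch (IOException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Throwing IOException here is a design choice for the sketch: it routes the bad frame into the catch block that a listener loop already has, so the accepting thread logs the problem and keeps running.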

Reposted from: https://blog.csdn.net/zss_89/article/details/124392023
Copyright belongs to the original author, 青草地@~~. If this infringes your rights, please contact us to have it removed.
