

Source Code Analysis: A ZooKeeper Node That Cannot Rejoin the Cluster

The problem:

After one node of a multi-node cluster was restarted, that node could not rejoin the ZK cluster.

Symptoms

bin/zkServer.sh status

JMX enabled by default

......

Error contacting service. It is probably not running.

The zk process on that node was running, and its log kept printing messages like these:

"Have smaller server identifier, so dropping the connections: (3,2)"

"Notification timeout: 60000"

Workaround

Restarting the zk process on the other nodes lets the affected node join the zk cluster normally.

Root cause

A bug in some ZooKeeper versions: https://issues.apache.org/jira/browse/ZOOKEEPER-2186

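When ZooKeeper starts, it creates a Listener thread. That thread creates a ServerSocket bound to the election port, which listens for socket connections from the other peers in the ZK cluster, accepts them, and passes each one to receiveConnection() for processing.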

 public class Listener extends Thread {

        volatile ServerSocket ss = null;

        /**
         * Sleeps on accept().
         */
        @Override
        public void run() {
            int numRetries = 0;
            InetSocketAddress addr;
            while((!shutdown) && (numRetries < 3)){
                try {
                    ss = new ServerSocket();
                    ss.setReuseAddress(true);
                    if (self.getQuorumListenOnAllIPs()) {
                        int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
                        addr = new InetSocketAddress(port);
                    } else {
                        addr = self.quorumPeers.get(self.getId()).electionAddr;
                    }
                    LOG.info("My election bind port: " + addr.toString());
                    setName(self.quorumPeers.get(self.getId()).electionAddr
                            .toString());
                    ss.bind(addr);
                    while (!shutdown) {
                        Socket client = ss.accept();
                        setSockOpts(client);
                        LOG.info("Received connection request "
                                + client.getRemoteSocketAddress());
                        receiveConnection(client);
                        numRetries = 0;
                    }
                } catch (IOException e) {
                    LOG.error("Exception while listening", e);
                    numRetries++;
                    try {
                        ss.close();
                        Thread.sleep(1000);
                    } catch (IOException ie) {
                        LOG.error("Error closing server socket", ie);
                    } catch (InterruptedException ie) {
                        LOG.error("Interrupted while sleeping. " +
                                  "Ignoring exception", ie);
                    }
                }
            }
            LOG.info("Leaving listener");
            if (!shutdown) {
                LOG.error("As I'm leaving the listener thread, "
                        + "I won't be able to participate in leader "
                        + "election any longer: "
                        + self.quorumPeers.get(self.getId()).electionAddr);
            }
        }
        
        /**
         * Halts this listener thread.
         */
        void halt(){
            try{
                LOG.debug("Trying to close listener: " + ss);
                if(ss != null) {
                    LOG.debug("Closing listener: " + self.getId());
                    ss.close();
                }
            } catch (IOException e){
                LOG.warn("Exception when shutting down listener: " + e);
            }
        }
    }

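When the Listener thread receives a socket connection from another peer, it logs "Received connection request xxxx", yet at this point none of the nodes in the cluster had that line in their logs. The faulty node kept logging "Have smaller server identifier, so dropping the connections: (3,2)" and "Notification timeout: 60000", which shows that it was repeatedly sending election notifications to node 3. Normally, when node 3 receives a connection from a node whose server id is smaller than its own, it closes that socket and opens a connection back to the smaller node itself. Here, however, node 3's log showed no incoming connection from the faulty node, and the faulty node's log showed no connection-failure exception either. The explanation is that the Listener thread had crashed. Listener.run() only catches IOException, so any other exception or Error raised while handling a connection ends the thread. The ZooKeeper process itself keeps running and the election port stays in the LISTEN state, so peers can still establish socket connections, but no thread is left to handle them, and leader election fails. Restarting the zk service on the other nodes brings their Listener threads back to a normal listening state, after which the faulty node completes the election successfully.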

public boolean receiveConnection(Socket sock) {
        Long sid = null;
        
        try {
            // Read server id
            DataInputStream din = new DataInputStream(sock.getInputStream());
            sid = din.readLong();
            if (sid < 0) { // this is not a server id but a protocol version (see ZOOKEEPER-1633)
                sid = din.readLong();
                // next comes the #bytes in the remainder of the message
                int num_remaining_bytes = din.readInt();
                byte[] b = new byte[num_remaining_bytes];
                // remove the remainder of the message from din
                int num_read = din.read(b);
                if (num_read != num_remaining_bytes) {
                    LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
                }
            }
            if (sid == QuorumPeer.OBSERVER_ID) {
                /*
                 * Choose identifier at random. We need a value to identify
                 * the connection.
                 */
                
                sid = observerCounter--;
                LOG.info("Setting arbitrary identifier to observer: " + sid);
            }
        } catch (IOException e) {
            closeSocket(sock);
            LOG.warn("Exception reading or writing challenge: " + e.toString());
            return false;
        }
        
        //If wins the challenge, then close the new connection.
        if (sid < self.getId()) {
            /*
             * This replica might still believe that the connection to sid is
             * up, so we have to shut down the workers before trying to open a
             * new connection.
             */
            SendWorker sw = senderWorkerMap.get(sid);
            if (sw != null) {
                sw.finish();
            }

            /*
             * Now we start a new connection
             */
            LOG.debug("Create new connection to server: " + sid);
            closeSocket(sock);
            connectOne(sid);

            // Otherwise start worker threads to receive data.
        } else {
            SendWorker sw = new SendWorker(sock, sid);
            RecvWorker rw = new RecvWorker(sock, sid, sw);
            sw.setRecv(rw);

            SendWorker vsw = senderWorkerMap.get(sid);
            
            if(vsw != null)
                vsw.finish();
            
            senderWorkerMap.put(sid, sw);
            
            if (!queueSendMap.containsKey(sid)) {
                queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
                        SEND_CAPACITY));
            }
            
            sw.start();
            rw.start();
            
            return true;    
        }
        return false;
    }
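Taken together, the two listings explain the crash. receiveConnection allocates new byte[num_remaining_bytes] with a length read straight from the peer: an oversized value makes the JVM throw OutOfMemoryError, and a negative one throws NegativeArraySizeException (a RuntimeException). Neither is an IOException, so the catch block in Listener.run() never sees them and the thread simply ends. The minimal sketch below is not ZooKeeper code; it mimics the loop shape of Listener.run() with a hypothetical handle() method that throws a simulated OutOfMemoryError, and records what escapes through an UncaughtExceptionHandler.

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class ListenerCrashDemo {

    static final AtomicInteger iterations = new AtomicInteger();
    static final AtomicReference<Throwable> uncaught = new AtomicReference<>();

    // Hypothetical stand-in for receiveConnection(): fails with an Error, not an IOException.
    static void handle() throws IOException {
        throw new OutOfMemoryError("simulated: Requested array size exceeds VM limit");
    }

    static Thread startListener() {
        Thread listener = new Thread(() -> {
            int numRetries = 0;
            // Same shape as Listener.run(): only IOException is caught and counted as a retry.
            while (numRetries < 3) {
                try {
                    iterations.incrementAndGet();
                    handle();
                } catch (IOException e) {
                    numRetries++;
                }
            }
        }, "Listener");
        // Record whatever escapes run() instead of letting the default handler print it.
        listener.setUncaughtExceptionHandler((t, e) -> uncaught.set(e));
        listener.start();
        return listener;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread listener = startListener();
        listener.join();
        System.out.println("loop iterations: " + iterations.get()); // 1: the retry logic never ran
        System.out.println("thread alive: " + listener.isAlive());  // false
        System.out.println("escaped: " + uncaught.get());
    }
}
```

The retry counter in the real Listener only protects against I/O failures; anything else thrown from receiveConnection takes the whole election listener down on the first bad message.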

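If the Listener thread has crashed, why can a socket client still connect to the ServerSocket without any connection exception being thrown? Here is a demonstration: create a main thread, start a ServerSocket thread from it, and then probe the behaviour with a socket client.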

The main thread class MainThread creates a child thread, SimpleSocketServer, which listens on port 3888 and passes each accepted socket to its receiveConnection method:

public class MainThread extends Thread {

    @Override
    public void run() {
        int port = 3888;
        System.out.println("Start server on port: " + port);
        SimpleSocketServer simpleSocketServer = new SimpleSocketServer(port);
        simpleSocketServer.start();

        while (true) {
            System.out.println("MainThread running...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

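import java.io.DataInputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class SimpleSocketServer extends Thread {

    private ServerSocket serverSocket;
    private int port;
    // volatile so that a stopServer() call from another thread is seen by run()
    private volatile boolean running = false;

    public SimpleSocketServer(int port) {
        this.port = port;
    }

    public void stopServer() {
        running = false;
        this.interrupt();
    }

    @Override
    public void run() {
        try {
            serverSocket = new ServerSocket(port);
        } catch (IOException e) {
            e.printStackTrace();
            return; // without a bound socket, accept() below would throw NullPointerException
        }
        running = true;
        while (running) {
            try {
                System.out.println("Listening for a connection");
                // accept() blocks here until the next connection arrives
                Socket socket = serverSocket.accept();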
                System.out.println("Received a connection");
                receiveConnection(socket);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void receiveConnection(Socket sock) {
        Long sid = null;
        try {
            // Read server id
            DataInputStream din = new DataInputStream(sock.getInputStream());
            sid = din.readLong();
            System.out.println("1 sid: " + sid);
            if (sid < 0) { // this is not a server id but a protocol version (see ZOOKEEPER-1633)
                sid = din.readLong();
                System.out.println("2 sid: " + sid);
                // next comes the #bytes in the remainder of the message
                int num_remaining_bytes = din.readInt();
                System.out.println("num_remaining_bytes: " + num_remaining_bytes);
                byte[] b = new byte[num_remaining_bytes];
                // remove the remainder of the message from din
                int num_read = din.read(b);
                System.out.println("num_read: " + num_read);
                if (num_read != num_remaining_bytes) {
                    System.out.println("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
                }
            }

        } catch (IOException e) {
            closeSocket(sock);
            System.out.format("Exception reading or writing challenge: \n");
            e.printStackTrace();
            return;
        }

    }

    private void closeSocket(Socket sock) {
        try {
            sock.close();
        } catch (IOException ie) {
            System.out.format("Exception while closing %s \n", ie.toString());
        }
    }

}

The Main class:

public class Main {
    public static void main(String[] args) {
        MainThread mainThread = new MainThread();
        mainThread.start();
    }
}

Start the main thread; the console prints:

Start server on port: 3888
MainThread running...
Listening for a connection

The socket client class connects to the SocketServer and then sends a specific message:

import java.io.DataOutputStream;
import java.net.Socket;

public class SocketClient {
    public static void main(String[] args) {
        try (Socket socket = new Socket("localhost", 3888)) {
            System.out.println("Connected!");
            DataOutputStream out = new DataOutputStream(socket.getOutputStream());
            // protocol version - a negative number (the int literal 0xffff0000 is -65536)
            out.writeLong(0xffff0000);
            // server id
            out.writeLong(2L);
            // addr
            String addr = "1.2.3.4:3888";
            byte[] addrBytes = addr.getBytes();
            out.writeInt(addrBytes.length);
            out.write(addrBytes);
            out.flush();
            System.out.println("Sent!");
        } catch (Exception e) {
            System.out.println("Caught exception");
            e.printStackTrace();
        }
    }
}

Run the unmodified SocketClient. The SimpleSocketServer thread receives the request and handles it normally; the console prints:

Received a connection
1 sid: -65536
2 sid: 2
num_remaining_bytes: 12
num_read: 12
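Two details of this output are worth checking. The first value prints as -65536 because 0xffff0000 in the client is an int literal with its sign bit set; writeLong sign-extends it, so the server reads a negative number and takes the protocol-version branch (writing 0xffff0000L would send the positive value 4294901760, which would be read as a server id). Second, num_remaining_bytes comes straight off the wire. The sketch below uses hypothetical helpers, encodeHeader and decodeHeader, that are not part of ZooKeeper; it reproduces the client's encoding with in-memory streams and reads the fields back in the same order as the pre-fix receiveConnection, showing that a 20-byte header is enough to request an array of Integer.MAX_VALUE bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class HeaderDemo {

    /** Hypothetical helper: writes the same header fields as the demo SocketClient, minus the payload. */
    static byte[] encodeHeader(long sid, int declaredLength) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeLong(0xffff0000);    // int literal -65536, sign-extended to a negative long
        out.writeLong(sid);
        out.writeInt(declaredLength);
        out.flush();
        return bytes.toByteArray();
    }

    /** Hypothetical helper: reads version, sid and length in the pre-fix receiveConnection order. */
    static long[] decodeHeader(byte[] message) throws IOException {
        DataInputStream din = new DataInputStream(new ByteArrayInputStream(message));
        long version = din.readLong();          // negative: the old code treats it as a protocol version
        long sid = din.readLong();
        int numRemainingBytes = din.readInt();  // the old code passes this straight to new byte[...]
        return new long[] {version, sid, numRemainingBytes};
    }

    public static void main(String[] args) throws IOException {
        long[] normal = decodeHeader(encodeHeader(2L, 12));
        byte[] hostile = encodeHeader(2L, Integer.MAX_VALUE);
        System.out.println(normal[0] + " " + normal[1] + " " + normal[2]);      // -65536 2 12
        System.out.println(decodeHeader(hostile)[2] + " requested by " + hostile.length + " bytes");
        System.out.println(0xffff0000L); // 4294901760: positive, would be read as a server id
    }
}
```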

Now modify SocketClient so that, in place of the address length, it sends SimpleSocketServer a very large int value:

out.writeInt(2147483647);

import java.io.DataOutputStream;
import java.net.Socket;

public class SocketClient {
    public static void main(String[] args) {
        try (Socket socket = new Socket("localhost", 3888)) {
            System.out.println("Connected!");
            DataOutputStream out = new DataOutputStream(socket.getOutputStream());
            // protocol version - a negative number
            out.writeLong(0xffff0000);
            // server id
            out.writeLong(2L);
            // addr
            String addr = "1.2.3.4:3888";
            byte[] addrBytes = addr.getBytes();
//            out.writeInt(addrBytes.length);
            // send a very large int (Integer.MAX_VALUE) as the length field instead
            out.writeInt(2147483647);
            out.write(addrBytes);
            out.flush();
            System.out.println("Sent!");
        } catch (Exception e) {
            System.out.println("Caught exception");
            e.printStackTrace();
        }
    }
}

SimpleSocketServer then throws:

Received a connection
1 sid: -65536
2 sid: 2
num_remaining_bytes: 2147483647
Exception in thread "Thread-1" java.lang.OutOfMemoryError: Requested array size exceeds VM limit
	at com.demo.SimpleSocketServer.receiveConnection(SimpleSocketServer.java:108)
	at com.demo.SimpleSocketServer.run(SimpleSocketServer.java:48)

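When SocketClient connects to SimpleSocketServer again, the connection succeeds without any exception, and telnet localhost 3888 connects as well, but the server never again receives anything the client sends.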
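Why does the connection still succeed? For a listening socket, the operating system completes the TCP handshake itself and queues the new connection in the socket's backlog until some thread calls accept(), regardless of whether any thread ever will. The sketch below uses only the standard java.net API (the class and method names are illustrative): it binds a ServerSocket on an ephemeral loopback port and never calls accept(), standing in for the dead listener thread. Connecting and writing still succeed on the client side. What happens once the backlog fills up is OS-dependent and not covered here.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class NoAcceptDemo {

    /** Connects to a bound server socket that nobody accepts on; returns true if connect and write succeed. */
    static boolean connectAndWriteWithoutAccept() throws IOException {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Bound and listening (port 0 = any free port, backlog 50), but accept() is never called.
        try (ServerSocket orphan = new ServerSocket(0, 50, loopback);
             Socket client = new Socket(loopback, orphan.getLocalPort())) {
            DataOutputStream out = new DataOutputStream(client.getOutputStream());
            out.writeLong(0xffff0000); // protocol version
            out.writeLong(2L);         // server id
            out.flush();               // bytes land in kernel buffers; nothing will ever read them
            return client.isConnected();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("connected and wrote: " + connectAndWriteWithoutAccept());
    }
}
```

This is why neither the demo client nor the faulty ZooKeeper node logs a connection error: from the sender's side, a peer whose Listener thread is dead looks exactly like a peer that is slow to answer.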

Back to the bug discussed in this article: it has been fixed in newer ZooKeeper versions, and the fixed code is:

                // next comes the #bytes in the remainder of the message
                // note that 0 bytes is fine (old servers)
                int num_remaining_bytes = din.readInt();
                // if num_remaining_bytes is negative or exceeds the maximum, log an ERROR and return
                if (num_remaining_bytes < 0 || num_remaining_bytes > maxBuffer) {
                    LOG.error("Unreasonable buffer length: {}", num_remaining_bytes);
                    closeSocket(sock);
                    return;
                }
                byte[] b = new byte[num_remaining_bytes];

                // remove the remainder of the message from din
                // at this point b has a reasonable length; din's bytes are read into b and the number of bytes read is returned
                int num_read = din.read(b);
                if (num_read != num_remaining_bytes) {
                    LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
                }
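The essence of the fix is to validate a length field received from the network before using it as an allocation size. Below is a minimal standalone sketch of that pattern, not ZooKeeper's code. The snippet above compares against maxBuffer, whose definition is not shown here, so the 2048 limit and the names readBoundedPayload and framed are assumptions for illustration. Unlike the snippet, the sketch rejects a bad length by throwing IOException rather than logging and returning, and it uses readFully, which keeps reading until the buffer is full instead of accepting a short read.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class BoundedRead {

    // Assumed limit for this sketch only; not taken from ZooKeeper.
    static final int MAX_BUFFER = 2048;

    /** Reads an int length prefix, rejects negative or oversized values, then reads exactly that many bytes. */
    static byte[] readBoundedPayload(DataInputStream din, int maxBuffer) throws IOException {
        int numRemainingBytes = din.readInt();
        if (numRemainingBytes < 0 || numRemainingBytes > maxBuffer) {
            // Reject before allocating, so a hostile length never reaches new byte[...]
            throw new IOException("Unreasonable buffer length: " + numRemainingBytes);
        }
        byte[] b = new byte[numRemainingBytes];
        din.readFully(b); // unlike read(b), loops until b is full or throws EOFException
        return b;
    }

    /** Hypothetical test helper: an in-memory stream holding a length prefix followed by a payload. */
    static DataInputStream framed(int declaredLength, byte[] payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(declaredLength);
        out.write(payload);
        out.flush();
        return new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    }

    public static void main(String[] args) throws IOException {
        byte[] addr = "1.2.3.4:3888".getBytes(StandardCharsets.UTF_8);
        byte[] ok = readBoundedPayload(framed(addr.length, addr), MAX_BUFFER);
        System.out.println(new String(ok, StandardCharsets.UTF_8)); // 1.2.3.4:3888
        for (int bad : new int[] {Integer.MAX_VALUE, -1}) {
            try {
                readBoundedPayload(framed(bad, addr), MAX_BUFFER);
            } catch (IOException e) {
                System.out.println("rejected: " + e.getMessage());
            }
        }
    }
}
```

Because the rejection is an IOException, a caller that already handles I/O failures for the connection can deal with it on the normal error path instead of letting an Error escape.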

Reposted from: https://blog.csdn.net/zss_89/article/details/124392023
Copyright belongs to the original author, 青草地@~~. In case of infringement, please contact us for removal.
