0


菜鸡学习zookeeper源码(三)NIOServer的启动

前言

上一篇写到了QuorumPeer的start方法,里面主要进行执行了loadDataBase方法(进行加载本地的数据信息,具体是怎么进行加载的,没在文章中进行说明,这块小园子也没看,等分析完整体的启动流程之后在进行分析), 这篇文章的话主要写startServerCnxnFactory方法,在上一篇文章中也进行说明,这个方法主要进行了启动了两个ServerCnxnFactory对象,一个是安全的,一个是不安全的,里面的默认实现都是NIOServerCnxnFactory

NIOServerCnxnFactory

这个还是老的习惯,这个类上有很多注释说明,可以先看下类的注释说明,这种开源的框架一般都会在类的说明上进行说明这个类是干什么的

在类上的说明我们可以看出来这个这个是通过nio非阻塞式socket进行连接的,线程之间的通信是通过队列来进行处理,它里面主要有1个接收线程来进行接收新的连接,并将新的连接给selector线程,selector线程数量是1-N个,通过工厂进行创建多个selector线程来进行支持大量的连接,当连接很多的时候,这块可能会成一个瓶颈,0-M个socket I/O worker 线程来进行I/O线程的读写操作,还有1个过期的线程来进行关闭空闲的连接信息。这个zookeeper的官方在这个类的上面给了一个示例:在32核机器上,1个接受线程,1个连接过期线程、4个selector线程和64个socket I/O worker。原文注释如下:

/**
 * NIOServerCnxnFactory implements a multi-threaded ServerCnxnFactory using
 * NIO non-blocking socket calls. Communication between threads is handled via
 * queues.
 *
 *   - 1   accept thread, which accepts new connections and assigns to a
 *         selector thread
 *   - 1-N selector threads, each of which selects on 1/N of the connections.
 *         The reason the factory supports more than one selector thread is that
 *         with large numbers of connections, select() itself can become a
 *         performance bottleneck.
 *   - 0-M socket I/O worker threads, which perform basic socket reads and
 *         writes. If configured with 0 worker threads, the selector threads
 *         do the socket I/O directly.
 *   - 1   connection expiration thread, which closes idle connections; this is
 *         necessary to expire connections on which no session is established.
 *
 * Typical (default) thread counts are: on a 32 core machine, 1 accept thread,
 * 1 connection expiration thread, 4 selector threads, and 64 worker threads.
 */

我们先看下构造函数

 /**
     * Construct a new server connection factory which will accept an unlimited number
     * of concurrent connections from each client (up to the file descriptor
     * limits of the operating system). startup(zks) must be called subsequently.
     */
    public NIOServerCnxnFactory() {
    }

在构造函数中我们可以看到一些注释,上面注释的大致意思是:这个NIOServer的连接工厂可以接收来自每个客户端的无限数量的并发连接(最多为文件描述符操作系统的限制)。我们可以后面看下怎么进行支持无限数量的并发连接。

在 runFromConfig方法解析中,可以看到都进行调用了NIOServerCnxnFactory#configure方法

NIOServerCnxnFactory#configure

 public void configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure) throws IOException {
        if (secure) {
            throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
        }
        
        configureSaslLogin();
//设置最大的连接数
        maxClientCnxns = maxcc;
        initMaxCnxns();

        sessionlessCnxnTimeout = Integer.getInteger(ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
        // We also use the sessionlessCnxnTimeout as expiring interval for
        // cnxnExpiryQueue. These don't need to be the same, but the expiring
        // interval passed into the ExpiryQueue() constructor below should be
        // less than or equal to the timeout.
         //设置过期的队列以及对过期线程进行初始化
        cnxnExpiryQueue = new ExpiryQueue<>(sessionlessCnxnTimeout);
        expirerThread = new ConnectionExpirerThread();
        //获取本地的CPU的核数
        int numCores = Runtime.getRuntime().availableProcessors();
        // 32 cores sweet spot seems to be 4 selector threads
        //selector线程的数量
        numSelectorThreads = Integer.getInteger(
            ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
            Math.max((int) Math.sqrt((float) numCores / 2), 1));
        if (numSelectorThreads < 1) {
            throw new IOException("numSelectorThreads must be at least 1");
        }
        //工作线程的数量
        numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
        workerShutdownTimeoutMS = Long.getLong(ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);

        String logMsg = "Configuring NIO connection handler with "
            + (sessionlessCnxnTimeout / 1000) + "s sessionless connection timeout, "
            + numSelectorThreads + " selector thread(s), "
            + (numWorkerThreads > 0 ? numWorkerThreads : "no") + " worker threads, and "
            + (directBufferBytes == 0 ? "gathered writes." : ("" + (directBufferBytes / 1024) + " kB direct buffers."));
        LOG.info(logMsg);
        for (int i = 0; i < numSelectorThreads; ++i) {
            selectorThreads.add(new SelectorThread(i));
        }

        listenBacklog = backlog;
        
        //从这往下就是常见的nio socket编程的
        //绑定端口号
        this.ss = ServerSocketChannel.open();
        ss.socket().setReuseAddress(true);
        LOG.info("binding to port {}", addr);
        if (listenBacklog == -1) {
            ss.socket().bind(addr);
        } else {
            ss.socket().bind(addr, listenBacklog);
        }
        if (addr.getPort() == 0) {
            // We're likely bound to a different port than was requested, so log that too
            LOG.info("bound to port {}", ss.getLocalAddress());
        }
        ss.configureBlocking(false);
        //初始化一个接收线程
        acceptThread = new AcceptThread(ss, addr, selectorThreads);
    }

这个configure方法,大体逻辑上是针对NIOServerCnxnFactory里面的一些属性进行一些赋值操作,会针对maxClientCnxns(最大连接数),expirerThread(过期线程)初始化,numSelectorThreads(selector线程的数量,会进行获取CPU的核数,根据核数进行计算selector线程数),numWorkerThreads(工作的线程数)以及常见的serverScoketChannel的初始化,最后在进行初始化一个接收线程。

NIOServerCnxnFactory#start

 public void start() {
        stopped = false;
        if (workerPool == null) {
            workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);
        }
        for (SelectorThread thread : selectorThreads) {
            if (thread.getState() == Thread.State.NEW) {
                thread.start();
            }
        }
        // ensure thread is started once and only once
        if (acceptThread.getState() == Thread.State.NEW) {
            acceptThread.start();
        }
        if (expirerThread.getState() == Thread.State.NEW) {
            expirerThread.start();
        }
    }

NIOServerCnxnFactory的start方法这个代码行数比较少,一眼看过去,主要进行了三种操作,WorkService(WorkerService是用于运行任务的工作线程池,并且是实现的使用一个或多个ExecutorServices.)的初始化,selectorThread的线程的启动,acceptThread(接收线程)的启动,expirerThread(过期线程)的启动

标签: 学习 zookeeper

本文转载自: https://blog.csdn.net/u013127325/article/details/135634735
版权归原作者 小园子的小菜 所有, 如有侵权,请联系我们删除。

“菜鸡学习zookeeper源码(三)NIOServer的启动”的评论:

还没有评论