前言
上一篇写到了QuorumPeer的start方法,里面主要进行执行了loadDataBase方法(进行加载本地的数据信息,具体是怎么进行加载的,没在文章中进行说明,这块小园子也没看,等分析完整体的启动流程之后在进行分析), 这篇文章的话主要写startServerCnxnFactory方法,在上一篇文章中也进行说明,这个方法主要进行了启动了两个ServerCnxnFactory对象,一个是安全的,一个是不安全的,里面的默认实现都是NIOServerCnxnFactory
NIOServerCnxnFactory
这个还是老的习惯,这个类上有很多注释说明,可以先看下类的注释说明,这种开源的框架一般都会在类的说明上进行说明这个类是干什么的
在类上的说明我们可以看出来这个这个是通过nio非阻塞式socket进行连接的,线程之间的通信是通过队列来进行处理,它里面主要有1个接收线程来进行接收新的连接,并将新的连接给selector线程,selector线程数量是1-N个,通过工厂进行创建多个selector线程来进行支持大量的连接,当连接很多的时候,这块可能会成一个瓶颈,0-M个socket I/O worker 线程来进行I/O线程的读写操作,还有1个过期的线程来进行关闭空闲的连接信息。这个zookeeper的官方在这个类的上面给了一个示例:在32核机器上,1个接受线程,1个连接过期线程、4个selector线程和64个socket I/O worker。原文注释如下:
/**
* NIOServerCnxnFactory implements a multi-threaded ServerCnxnFactory using
* NIO non-blocking socket calls. Communication between threads is handled via
* queues.
*
* - 1 accept thread, which accepts new connections and assigns to a
* selector thread
* - 1-N selector threads, each of which selects on 1/N of the connections.
* The reason the factory supports more than one selector thread is that
* with large numbers of connections, select() itself can become a
* performance bottleneck.
* - 0-M socket I/O worker threads, which perform basic socket reads and
* writes. If configured with 0 worker threads, the selector threads
* do the socket I/O directly.
* - 1 connection expiration thread, which closes idle connections; this is
* necessary to expire connections on which no session is established.
*
* Typical (default) thread counts are: on a 32 core machine, 1 accept thread,
* 1 connection expiration thread, 4 selector threads, and 64 worker threads.
*/
我们先看下构造函数
/**
* Construct a new server connection factory which will accept an unlimited number
* of concurrent connections from each client (up to the file descriptor
* limits of the operating system). startup(zks) must be called subsequently.
*/
public NIOServerCnxnFactory() {
}
在构造函数中我们可以看到一些注释,上面注释的大致意思是:这个NIOServer的连接工厂可以接收来自每个客户端的无限数量的并发连接(最多为文件描述符操作系统的限制)。我们可以后面看下怎么进行支持无限数量的并发连接。
在 runFromConfig方法解析中,可以看到都进行调用了NIOServerCnxnFactory#configure方法
NIOServerCnxnFactory#configure
public void configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure) throws IOException {
if (secure) {
throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
}
configureSaslLogin();
//设置最大的连接数
maxClientCnxns = maxcc;
initMaxCnxns();
sessionlessCnxnTimeout = Integer.getInteger(ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
// We also use the sessionlessCnxnTimeout as expiring interval for
// cnxnExpiryQueue. These don't need to be the same, but the expiring
// interval passed into the ExpiryQueue() constructor below should be
// less than or equal to the timeout.
//设置过期的队列以及对过期线程进行初始化
cnxnExpiryQueue = new ExpiryQueue<>(sessionlessCnxnTimeout);
expirerThread = new ConnectionExpirerThread();
//获取本地的CPU的核数
int numCores = Runtime.getRuntime().availableProcessors();
// 32 cores sweet spot seems to be 4 selector threads
//selector线程的数量
numSelectorThreads = Integer.getInteger(
ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
Math.max((int) Math.sqrt((float) numCores / 2), 1));
if (numSelectorThreads < 1) {
throw new IOException("numSelectorThreads must be at least 1");
}
//工作线程的数量
numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
workerShutdownTimeoutMS = Long.getLong(ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);
String logMsg = "Configuring NIO connection handler with "
+ (sessionlessCnxnTimeout / 1000) + "s sessionless connection timeout, "
+ numSelectorThreads + " selector thread(s), "
+ (numWorkerThreads > 0 ? numWorkerThreads : "no") + " worker threads, and "
+ (directBufferBytes == 0 ? "gathered writes." : ("" + (directBufferBytes / 1024) + " kB direct buffers."));
LOG.info(logMsg);
for (int i = 0; i < numSelectorThreads; ++i) {
selectorThreads.add(new SelectorThread(i));
}
listenBacklog = backlog;
//从这往下就是常见的nio socket编程的
//绑定端口号
this.ss = ServerSocketChannel.open();
ss.socket().setReuseAddress(true);
LOG.info("binding to port {}", addr);
if (listenBacklog == -1) {
ss.socket().bind(addr);
} else {
ss.socket().bind(addr, listenBacklog);
}
if (addr.getPort() == 0) {
// We're likely bound to a different port than was requested, so log that too
LOG.info("bound to port {}", ss.getLocalAddress());
}
ss.configureBlocking(false);
//初始化一个接收线程
acceptThread = new AcceptThread(ss, addr, selectorThreads);
}
这个configure方法,大体逻辑上是针对NIOServerCnxnFactory里面的一些属性进行一些赋值操作,会针对maxClientCnxns(最大连接数),expirerThread(过期线程)初始化,numSelectorThreads(selector线程的数量,会进行获取CPU的核数,根据核数进行计算selector线程数),numWorkerThreads(工作的线程数)以及常见的serverScoketChannel的初始化,最后在进行初始化一个接收线程。
NIOServerCnxnFactory#start
public void start() {
stopped = false;
if (workerPool == null) {
workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);
}
for (SelectorThread thread : selectorThreads) {
if (thread.getState() == Thread.State.NEW) {
thread.start();
}
}
// ensure thread is started once and only once
if (acceptThread.getState() == Thread.State.NEW) {
acceptThread.start();
}
if (expirerThread.getState() == Thread.State.NEW) {
expirerThread.start();
}
}
NIOServerCnxnFactory的start方法这个代码行数比较少,一眼看过去,主要进行了三种操作,WorkService(WorkerService是用于运行任务的工作线程池,并且是实现的使用一个或多个ExecutorServices.)的初始化,selectorThread的线程的启动,acceptThread(接收线程)的启动,expirerThread(过期线程)的启动
版权归原作者 小园子的小菜 所有, 如有侵权,请联系我们删除。