上一篇写到了QuorumPeer的start方法,里面主要进行执行了loadDataBase方法(进行加载本地的数据信息,具体是怎么进行加载的,没在文章中进行说明,这块小园子也没看,等分析完整体的启动流程之后在进行分析), 这篇文章的话主要写startServerCnxnFactory方法,在上一篇文章中也进行说明,这个方法主要进行了启动了两个ServerCnxnFactory对象,一个是安全的,一个是不安全的,里面的默认实现都是NIOServerCnxnFactory
在类上的说明我们可以看出来这个这个是通过nio非阻塞式socket进行连接的,线程之间的通信是通过队列来进行处理,它里面主要有1个接收线程来进行接收新的连接,并将新的连接给selector线程,selector线程数量是1-N个,通过工厂进行创建多个selector线程来进行支持大量的连接,当连接很多的时候,这块可能会成一个瓶颈,0-M个socket I/O worker 线程来进行I/O线程的读写操作,还有1个过期的线程来进行关闭空闲的连接信息。这个zookeeper的官方在这个类的上面给了一个示例:在32核机器上,1个接受线程,1个连接过期线程、4个selector线程和64个socket I/O worker。原文注释如下:
* NIOServerCnxnFactory implements a multi-threaded ServerCnxnFactory using
* NIO non-blocking socket calls. Communication between threads is handled via
* queues.
* - 1 accept thread, which accepts new connections and assigns to a
* selector thread
* - 1-N selector threads, each of which selects on 1/N of the connections.
* The reason the factory supports more than one selector thread is that
* with large numbers of connections, select() itself can become a
* performance bottleneck.
* - 0-M socket I/O worker threads, which perform basic socket reads and
* writes. If configured with 0 worker threads, the selector threads
* do the socket I/O directly.
* - 1 connection expiration thread, which closes idle connections; this is
* necessary to expire connections on which no session is established.
* Typical (default) thread counts are: on a 32 core machine, 1 accept thread,
* 1 connection expiration thread, 4 selector threads, and 64 worker threads.
* Construct a new server connection factory which will accept an unlimited number
* of concurrent connections from each client (up to the file descriptor
* limits of the operating system). startup(zks) must be called subsequently.
public NIOServerCnxnFactory() {
在 runFromConfig方法解析中,可以看到都进行调用了NIOServerCnxnFactory#configure方法
public void configure(InetSocketAddress addr, int maxcc, int backlog, boolean secure) throws IOException {
if (secure) {
throw new UnsupportedOperationException("SSL isn't supported in NIOServerCnxn");
maxClientCnxns = maxcc;
sessionlessCnxnTimeout = Integer.getInteger(ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
// We also use the sessionlessCnxnTimeout as expiring interval for
// cnxnExpiryQueue. These don't need to be the same, but the expiring
// interval passed into the ExpiryQueue() constructor below should be
// less than or equal to the timeout.
cnxnExpiryQueue = new ExpiryQueue<>(sessionlessCnxnTimeout);
expirerThread = new ConnectionExpirerThread();
int numCores = Runtime.getRuntime().availableProcessors();
// 32 cores sweet spot seems to be 4 selector threads
numSelectorThreads = Integer.getInteger(
Math.max((int) Math.sqrt((float) numCores / 2), 1));
if (numSelectorThreads < 1) {
throw new IOException("numSelectorThreads must be at least 1");
numWorkerThreads = Integer.getInteger(ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
workerShutdownTimeoutMS = Long.getLong(ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);
String logMsg = "Configuring NIO connection handler with "
+ (sessionlessCnxnTimeout / 1000) + "s sessionless connection timeout, "
+ numSelectorThreads + " selector thread(s), "
+ (numWorkerThreads > 0 ? numWorkerThreads : "no") + " worker threads, and "
+ (directBufferBytes == 0 ? "gathered writes." : ("" + (directBufferBytes / 1024) + " kB direct buffers."));
for (int i = 0; i < numSelectorThreads; ++i) {
selectorThreads.add(new SelectorThread(i));
listenBacklog = backlog;
//从这往下就是常见的nio socket编程的
this.ss = ServerSocketChannel.open();
LOG.info("binding to port {}", addr);
if (listenBacklog == -1) {
} else {
ss.socket().bind(addr, listenBacklog);
if (addr.getPort() == 0) {
// We're likely bound to a different port than was requested, so log that too
LOG.info("bound to port {}", ss.getLocalAddress());
acceptThread = new AcceptThread(ss, addr, selectorThreads);
public void start() {
stopped = false;
if (workerPool == null) {
workerPool = new WorkerService("NIOWorker", numWorkerThreads, false);
for (SelectorThread thread : selectorThreads) {
if (thread.getState() == Thread.State.NEW) {
// ensure thread is started once and only once
if (acceptThread.getState() == Thread.State.NEW) {
if (expirerThread.getState() == Thread.State.NEW) {
版权归原作者 小园子的小菜 所有, 如有侵权,请联系我们删除。