说明
- 本文基于 kafka 2.7 编写。
- @author JellyfishMIX - github / blog.jellyfishmix.com
- LICENSE GPL-2.0
java NIO 组件
几个 java NIO 的组件。
- Buffer: 缓冲区。这是一个接口,kafka 用它的 ByteBuffer 实现类,配合 SocketChannel 实现读写操作。读的时候,调用 channel#read(buffer) 把 SocketChannel 的数据读到 ByteBuffer 内。写的时候,调用 channel.write(buffer) 把 Buffer 中的数据写到 SocketChannel 内。
- SocketChannel: 网络连接通道, byte 数据的读写都发生在这个通道上,包括从通道中读出数据, 将数据写入通道。
- SelectionKey: 选择键。每个 SocketChannel 向 Selector 注册标识时,都会创建一个 SelectionKey。SelectionKey 里可以定义 Selector 监听 SocketChannel 的事件,包括连接、读、写事件(SelectionKey#OP_CONNECT, OP_READ, OP_WRITE)。
- Selector: 选择器,用来监听注册的 SelectionKey 关注的事件。
本文涉及 java NIO 相关内容, 推荐先阅读 ByteBuffer 相关内容。
kafka 对 java NIO 组件的封装
- Selector(Kafka 自己的 Selector 类): 对 NIO 中 Selector 的封装。
- TransportLayer: 对 NIO 中 SocketChannel 的封装。TransportLayer 是一个接口, 实现类有 PlaintextTransportLayer 和 SslTransportLayer,其中,PlaintextTransportLayer 是明文传输的实现,SslTransportLayer 是 SSL 加密传输的实现。本文只涉及 PlaintextTransportLayer。
- NetworkReceive: 对 NIO 中读 Buffer 的封装,用来缓存接收的数据。
- NetworkSend: 对 NIO 中写 Buffer 的封装,用来缓存发送的数据。
- KafkaChannel: 把 TransportLayer, NetworkReceive 和 NetworkSend 又做了一次封装,隐藏了底层组件的细节。
- Kafka 对 NIO 中的 SelectionKey 没有封装,直接使用。
kafka 封装的 NIO 组件关系
- Selector 监听到客户端的读写事件后,会获取绑定在 SelectionKey 上的 KafkaChannel。
- KafkaChannel 会调用 TransportLayer 进行读写操作, TransportLayer 会调用 SocketChannel 进行读写操作, 完成数据的发送。数据的接收流程类似。
TransportLayer
TransportLayer 是对 NIO 中 SocketChannel 的封装。它的实现类有 2 个:
- PlaintextTransportLayer, 明文传输的实现。
- SslTransportLayer 类, SSL 加密传输的实现。
本文只涉及 PlaintextTransportLayer。
PlaintextTransportLayer
PlaintextTransportLayer#finishConnect 方法 – 完成网络连接
org.apache.kafka.common.network.PlaintextTransportLayer#finishConnect
- 调用 SocketChannel#finishConnect 方法,返回连接是否已经建立。
- 如果连接已经建立,则取消对连接事件的监听,增加对读事件的监听。
@OverridepublicbooleanfinishConnect()throwsIOException{// 调用 SocketChannel#finishConnect 方法,返回连接是否已经建立boolean connected = socketChannel.finishConnect();// 如果连接已经建立,则取消对连接事件的监听,增加对读事件的监听if(connected)
key.interestOps(key.interestOps()&~SelectionKey.OP_CONNECT |SelectionKey.OP_READ);return connected;}
PlaintextTransportLayer#read 方法 – 读取数据
org.apache.kafka.common.network.PlaintextTransportLayer#read(java.nio.ByteBuffer)
把 SocketChannel 中的数据读取到 ByteBuffer 中。
@Overridepublicintread(ByteBuffer dst)throwsIOException{// 把 SocketChannel 中的数据读取到 ByteBuffer 中return socketChannel.read(dst);}
PlaintextTransportLayer#write – 写入数据
org.apache.kafka.common.network.PlaintextTransportLayer#write(java.nio.ByteBuffer)
把 ByteBuffer 中的数据写入到 SocketChannel 中。
@Overridepublicintwrite(ByteBuffer src)throwsIOException{// 把 ByteBuffer 中的数据写入到 SocketChannel 中return socketChannel.write(src);}
NetworkReceive
- java NIO 一次读写不一定读写完数据,这样需要判断读写是否完成,没有读写完的数据需要继续执行读写操作。
- 这样的操作较为繁琐,对调用方不友好。于是 kafka 把 ByteBuffer 进行了封装,用于读的 Buffer 封装成 NetworkReceive, 用于写的 Buffer 封装成 NetworkSend。
NetworkReceive 的属性
/**
* channelId
*/privatefinalString source;/**
* size 是固定大小的 4 byte ByteBuffer, kafka 传输数据时, 约定把要传输数据的长度放在最开头 4 byte, size 只用来接收这 4 byte 的长度信息
*/privatefinalByteBuffer size;/**
* 能接收的最大消息
*/privatefinalint maxSize;/**
* 内存池
*/privatefinalMemoryPool memoryPool;/**
* 记录真正数据内容长度信息大小
*/privateint requestedBufferSize =-1;/**
* buffer 用来承载真正的数据内容, 即 4 byte 长度数据后的内容
*/privateByteBuffer buffer;
NetworkReceive#readFrom 方法 – 把 channel 中的数据读到 ByteBuffer 中
org.apache.kafka.common.network.NetworkReceive#readFrom
- 注意 size 的作用, size 是固定大小的 4 byte ByteBuffer, kafka 传输数据时, 约定把要传输数据的长度放在最开头 4 byte, size 只用来接收这 4 byte 的长度信息。
- 判断 size 是否有剩余空间, 有剩余空间则从 channel 中读取数据至 size 中。 1. 如果从 channel 中读取数据后, size 没有剩余空间了, 说明长度信息读取完了(因为长度信息总共只占 4 byte, 读取后刚好把 size 占满)。2. 前 4 个 byte 存放了数据的长度, 以 int 类型获取。3. 针对本次通过 channel 传输数据的长度做校验。
- 从 channel 中读取真正的数据内容, 即 4 byte 长度数据后的内容, buffer 用来承载真正的数据内容。 1. 给 buffer 分配 size 中记录的长度信息大小的内存空间。2. 把 channel 中的数据读到 buffer 中。维护读取的字节大小数。
/**
* 把 channel 中的数据读到 ByteBuffer 中
*/publiclongreadFrom(ScatteringByteChannel channel)throwsIOException{// 维护读取的字节大小数int read =0;// 注意 size 的作用, size 是固定大小的 4 byte ByteBuffer, kafka 传输数据时, 约定把要传输数据的长度放在最开头 4 byte, size 只用来接收这 4 byte 的长度信息// 判断 size 是否有剩余空间, 有剩余空间则从 channel 中读取数据至 size 中if(size.hasRemaining()){int bytesRead = channel.read(size);if(bytesRead <0)thrownewEOFException();// 维护读取的字节大小数
read += bytesRead;// 如果从 channel 中读取数据后, size 没有剩余空间了, 说明长度信息读取完了(因为长度信息总共只占 4 byte, 读取后刚好把 size 占满)if(!size.hasRemaining()){// ByteBuffer#position 置 0, 从头开始读取
size.rewind();// 前 4 个 byte 存放了数据的长度, 以 int 类型获取int receiveSize = size.getInt();// 针对本次通过 channel 传输数据的长度做校验if(receiveSize <0)thrownewInvalidReceiveException("Invalid receive (size = "+ receiveSize +")");if(maxSize != UNLIMITED && receiveSize > maxSize)thrownewInvalidReceiveException("Invalid receive (size = "+ receiveSize +" larger than "+ maxSize +")");
requestedBufferSize = receiveSize;//may be 0 for some payloads (SASL)if(receiveSize ==0){
buffer = EMPTY_BUFFER;}}}// 下面要从 channel 中读取真正的数据内容, 即 4 byte 长度数据后的内容, buffer 用来承载真正的数据内容// 给 buffer 分配 size 中记录的长度信息大小的内存空间if(buffer ==null&& requestedBufferSize !=-1){//we know the size we want but havent been able to allocate it yet
buffer = memoryPool.tryAllocate(requestedBufferSize);if(buffer ==null)
log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source);}// 把 channel 中的数据读到 buffer 中if(buffer !=null){int bytesRead = channel.read(buffer);if(bytesRead <0)thrownewEOFException();// 维护读取的字节大小数
read += bytesRead;}return read;}
NetworkSend
层次关系
NetworkSend extends ByteBufferSend, ByteBufferSend implements Send
ByteBufferSend#writeTo 方法 – 把 ByteBuffer 中的数据写入 SocketChannel
org.apache.kafka.common.network.ByteBufferSend#writeTo
- 把 ByteBuffer 中的数据写入 SocketChannel, 返回写入的字节数。
- 维护还剩多少字节没有写进 SocketChannel。
/**
* 把 ByteBuffer 中的数据写入 SocketChannel
*/@OverridepubliclongwriteTo(GatheringByteChannel channel)throwsIOException{// 把 ByteBuffer 中的数据写入 SocketChannel, 返回写入的字节数long written = channel.write(buffers);if(written <0)thrownewEOFException("Wrote negative bytes to channel. This shouldn't happen.");// 维护还剩多少字节没有写进 SocketChannel
remaining -= written;
pending =TransportLayers.hasPendingWrites(channel);return written;}
NetworkSend#sizeBuffer 方法 – 分配 4 个字节的 sizeBuffer
org.apache.kafka.common.network.NetworkSend#sizeBuffer
分配 4 个字节的 sizeBuffer, 用来存储要发送的数据长度
/**
* 分配 4 个字节的 sizeBuffer, 用来存储要发送的数据长度
*/privatestaticByteBuffersizeBuffer(int size){ByteBuffer sizeBuffer =ByteBuffer.allocate(4);
sizeBuffer.putInt(size);
sizeBuffer.rewind();return sizeBuffer;}
KafkaChannel
org.apache.kafka.common.network.KafkaChannel
KafkaChannel#setSend 方法-- 正式发送请求前设置 NetworkSend
org.apache.kafka.common.network.KafkaChannel#setSend
正式发送请求前设置 NetworkSend(用于发送的 byteBuffer), 并让 SelectionKey 关注写事件。
/**
* 正式发送请求前设置 NetworkSend(用于发送的 byteBuffer), 并让 SelectionKey 关注写事件
*/publicvoidsetSend(Send send){if(this.send !=null)thrownewIllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is "+ id);this.send = send;// SelectionKey 关注写事件this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);}
KafkaChannel#write – 发送数据
org.apache.kafka.common.network.KafkaChannel#write
把 NetworkSend 中的数据写入 SocketChannel。
/**
* 把 NetworkSend 中的数据发送出去
*/publiclongwrite()throwsIOException{if(send ==null)return0;
midWrite =true;// 把 NetworkSend 中的数据写入 SocketChannelreturn send.writeTo(transportLayer);}
KafkaChannel#read 方法 – 读取数据
org.apache.kafka.common.network.KafkaChannel#read
- 把 SocketChannel 中的数据读取到 NetworkReceive 中。
- 判断是否读完的条件是 NetworkReceive 里的 size 和 buffer 是否用完, 因为 NetworkReceive 的 size 和 buffer 两个 byteBuffer 的大小,正好是 SocketChannel 中接收到数据的大小。
/**
* 把 SocketChannel 中的数据读取到 NetworkReceive 中
*/publiclongread()throwsIOException{if(receive ==null){
receive =newNetworkReceive(maxReceiveSize, id, memoryPool);}// 把 SocketChannel 中的数据读取到 NetworkReceive 中, 返回读取信息的字节数long bytesReceived =receive(this.receive);// 判断是否读完的条件是 NetworkReceive 里的 size 和 buffer 是否用完, 因为 NetworkReceive 的 size 和 buffer 两个 byteBuffer 的大小,正好是 SocketChannel 中接收到数据的大小if(this.receive.requiredMemoryAmountKnown()&&!this.receive.memoryAllocated()&&isInMutableState()){//pool must be out of memory, mute ourselves.mute();}return bytesReceived;}
Selector
Selector#connect – 建立连接
org.apache.kafka.common.network.Selector#connect
- 验证。
- 创建并配置 SocketChannel。 1. 包括配置非阻塞模式, 设置长连接, 设置 SO_SNDBUF 和 SO_RCVBUF 的大小。SO_SNDBUF、SO_RCVBUF 表示发送和接收数据缓存的大小。2. 建立一个连接,由于是非阻塞建立连接,方法会直接返回,不一定连接建立完毕。后面会通过 Selector#finishConnect 方法, 连接并确认是否连接成功。3. 将上面创建的 SocketChannel 注册到 nioSelector 上,关注 OP_CONNECT 事件。
@Overridepublicvoidconnect(String id,InetSocketAddress address,int sendBufferSize,int receiveBufferSize)throwsIOException{// 验证ensureNotRegistered(id);// 创建 SocketChannelSocketChannel socketChannel =SocketChannel.open();SelectionKey key =null;try{// 配置 SocketChannelconfigureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);// 建立一个连接,由于是非阻塞建立连接,方法会直接返回,不一定连接建立完毕// 后面会通过 Selector#finishConnect 方法, 连接并确认是否连接成功boolean connected =doConnect(socketChannel, address);// 将上面创建的 SocketChannel 注册到 nioSelector 上,关注 OP_CONNECT 事件
key =registerChannel(id, socketChannel,SelectionKey.OP_CONNECT);// 如果已经连接成功了,则取消对OP_CONNECT的监听if(connected){// OP_CONNECT won't trigger for immediately connected channels
log.debug("Immediately connected to node {}", id);
immediatelyConnectedKeys.add(key);
key.interestOps(0);}}catch(IOException|RuntimeException e){if(key !=null)
immediatelyConnectedKeys.remove(key);
channels.remove(id);
socketChannel.close();throw e;}}
Selector#send – 将 Send 设置到 KafkaChannel 中
org.apache.kafka.common.network.Selector#send
- 获取 channelId 作为 connectionId, 获取连接。
- 把 send 放入 KafkaChannel 里,并让 SelectionKey 关注写事件。
/**
* 将 Send 设置到 KafkaChannel 的 send 字段中,并让 SelectionKey 关注写事件
*/publicvoidsend(Send send){// 获取 channelId 作为 connectionIdString connectionId = send.destination();// 获取连接KafkaChannel channel =openOrClosingChannelOrFail(connectionId);// 如果连接是关闭的,就把 connectionId 放到 closingChannels 集合里if(closingChannels.containsKey(connectionId)){// ensure notification via `disconnected`, leave channel in the state in which closing was triggeredthis.failedSends.add(connectionId);}else{try{// 把 send 放入 KafkaChannel 里,并让 SelectionKey 关注写事件
channel.setSend(send);}catch(Exception e){// update the state for consistency, the channel will be discarded after `close`// 异常处理
channel.state(ChannelState.FAILED_SEND);// ensure notification via `disconnected` when `failedSends` are processed in the next pollthis.failedSends.add(connectionId);close(channel,CloseMode.DISCARD_NO_NOTIFY);if(!(e instanceofCancelledKeyException)){
log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}",
connectionId, e);throw e;}}}}
Selector#write 方法 – 调用 KafkaChannel 执行写操作
org.apache.kafka.common.network.Selector#write
- 获取 KafkaChannel 对应的 nodeId。
- 把 NetworkSend 中的数据发送出去。
- 如果发送完成,则返回 send,并取消 SelectionKey 对写事件的关注。
/**
* 调用 KafkaChannel 执行写操作
*/// package-private for testingvoidwrite(KafkaChannel channel)throwsIOException{// 获取 KafkaChannel 对应的 nodeIdString nodeId = channel.id();// 把 NetworkSend 中的数据发送出去long bytesSent = channel.write();// 如果发送完成,则返回 send,并取消 SelectionKey 对写事件的关注Send send = channel.maybeCompleteSend();// We may complete the send with bytesSent < 1 if `TransportLayer.hasPendingWrites` was true and `channel.write()`// caused the pending writes to be written to the socket channel bufferif(bytesSent >0|| send !=null){long currentTimeMs = time.milliseconds();if(bytesSent >0)this.sensors.recordBytesSent(nodeId, bytesSent, currentTimeMs);if(send !=null){this.completedSends.add(send);this.sensors.recordCompletedSend(nodeId, send.size(), currentTimeMs);}}}
Selector#attemptWrite 方法 – 尝试调用 KafkaChannel 执行写操作
org.apache.kafka.common.network.Selector#attemptWrite
尝试调用 KafkaChannel 执行写操作,需满足如下条件:
- send 不为空。
- KafkaChannel 连接正常。
- SelectionKey 是可写状态。
- 客户端验证没有开启。
/**
* 尝试调用 KafkaChannel 执行写操作
*/privatevoidattemptWrite(SelectionKey key,KafkaChannel channel,long nowNanos)throwsIOException{/*
* 1. send 不为空
* 2. KafkaChannel 连接正常
* 3. SelectionKey 是可写状态
* 4. 客户端验证没有开启
*/if(channel.hasSend()&& channel.ready()&& key.isWritable()&&!channel.maybeBeginClientReauthentication(()-> nowNanos)){write(channel);}}
Selector#attemptRead 方法 – 尝试调用 kafkaChannel 执行读操作
org.apache.kafka.common.network.Selector#attemptRead
- 调用 kafkaChannel 执行读操作, 返回读取的字节数。
- 如果当前 NetworkReceive 读取满了(说明本次请求完整接收了),则将其置空,下次读操作时会创建新的 NetworkReceive 对象。
- 读完的 NetworkReceive 加入 completedReceives 队列中。
/**
* 尝试调用 kafkaChannel 执行读操作
*/privatevoidattemptRead(KafkaChannel channel)throwsIOException{String nodeId = channel.id();// 调用 kafkaChannel 执行读操作, 返回读取的字节数long bytesReceived = channel.read();if(bytesReceived !=0){long currentTimeMs = time.milliseconds();
sensors.recordBytesReceived(nodeId, bytesReceived, currentTimeMs);
madeReadProgressLastPoll =true;// 如果当前 NetworkReceive 读取满了(说明本次请求完整接收了),则将其置空,下次读操作时会创建新的 NetworkReceive 对象NetworkReceive receive = channel.maybeCompleteReceive();if(receive !=null){// 读完的 NetworkReceive 加入 completedReceives 队列中addToCompletedReceives(channel, receive, currentTimeMs);}}if(channel.isMuted()){
outOfMemory =true;//channel has muted itself due to memory pressure.}else{
madeReadProgressLastPoll =true;}}
Selector#poll 方法 – 获取监听的网络 IO 事件并处理
org.apache.kafka.common.network.Selector#poll
- 将上一次 poll 方法的结果全部清除掉。
- nioSelector 线程 selectNow 非阻塞或 select 阻塞地获取 IO 事件。
- 监听到 IO 事件, 或立即连接的集合不为空,或有数据在缓存中,则进行处理。 1. 获取有 IO 事件的 SelectionKey 集合。2. 调用处理有 IO 事件的 SelectionKey。3. 处理立即连接的 SelectionKey。
@Overridepublicvoidpoll(long timeout)throwsIOException{if(timeout <0)thrownewIllegalArgumentException("timeout should be >= 0");boolean madeReadProgressLastCall = madeReadProgressLastPoll;// 将上一次 poll 方法的结果全部清除掉clear();boolean dataInBuffers =!keysWithBufferedRead.isEmpty();if(!immediatelyConnectedKeys.isEmpty()||(madeReadProgressLastCall && dataInBuffers))
timeout =0;if(!memoryPool.isOutOfMemory()&& outOfMemory){//we have recovered from memory pressure. unmute any channel not explicitly muted for other reasons
log.trace("Broker no longer low on memory - unmuting incoming sockets");for(KafkaChannel channel : channels.values()){if(channel.isInMutableState()&&!explicitlyMutedChannels.contains(channel)){
channel.maybeUnmute();}}
outOfMemory =false;}/* check ready keys */long startSelect = time.nanoseconds();// nioSelector 线程 selectNow 非阻塞或 select 阻塞地获取 IO 事件int numReadyKeys =select(timeout);long endSelect = time.nanoseconds();this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());// 监听到 IO 事件, 或立即连接的集合不为空,或有数据在缓存中if(numReadyKeys >0||!immediatelyConnectedKeys.isEmpty()|| dataInBuffers){// 获取有 IO 事件的 SelectionKey 集合Set<SelectionKey> readyKeys =this.nioSelector.selectedKeys();// Poll from channels that have buffered data (but nothing more from the underlying socket)if(dataInBuffers){
keysWithBufferedRead.removeAll(readyKeys);//so no channel gets polled twiceSet<SelectionKey> toPoll = keysWithBufferedRead;
keysWithBufferedRead =newHashSet<>();//poll() calls will repopulate if neededpollSelectionKeys(toPoll,false, endSelect);}// Poll from channels where the underlying socket has more data// 处理有 IO 事件的 SelectionKeypollSelectionKeys(readyKeys,false, endSelect);// Clear all selected keys so that they are included in the ready count for the next select
readyKeys.clear();// 处理立即连接的 SelectionKeypollSelectionKeys(immediatelyConnectedKeys,true, endSelect);
immediatelyConnectedKeys.clear();}else{
madeReadProgressLastPoll =true;//no work is also "progress"}long endIo = time.nanoseconds();this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());// Close channels that were delayed and are now ready to be closedcompleteDelayedChannelClose(endIo);// we use the time at the end of select to ensure that we don't close any connections that// have just been processed in pollSelectionKeysmaybeCloseOldestConnection(endSelect);}
Selector#pollSelectionKeys 方法 – 处理监听到的 IO 事件
org.apache.kafka.common.network.Selector#pollSelectionKeys
具体处理监听到的 IO 事件,包括连接事件, 读事件和写事件,处理立即完成的连接。
- 遍历有 IO 事件的 SelectionKey。
- 判断连接是否建立好了, 如果连接已经建立,则取消对连接事件的监听,增加对读事件的监听。 1. 连接尚未建立, 跳过当前 SelectionKey。
- 维护 KafkaChannel 的状态。
- 处理读事件()和写事件。
voidpollSelectionKeys(Set<SelectionKey> selectionKeys,boolean isImmediatelyConnected,long currentTimeNanos){// 遍历有 IO 事件的 SelectionKeyfor(SelectionKey key :determineHandlingOrder(selectionKeys)){KafkaChannel channel =channel(key);long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds():0;boolean sendFailed =false;String nodeId = channel.id();// register all per-connection metrics at once
sensors.maybeRegisterConnectionMetrics(nodeId);if(idleExpiryManager !=null)
idleExpiryManager.update(nodeId, currentTimeNanos);try{/* complete any connections that have finished their handshake (either normally or immediately) */// 判断连接是否建立好了, 如果连接已经建立,则取消对连接事件的监听,增加对读事件的监听if(isImmediatelyConnected || key.isConnectable()){if(channel.finishConnect()){this.connected.add(nodeId);this.sensors.connectionCreated.record();SocketChannel socketChannel =(SocketChannel) key.channel();
log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
socketChannel.socket().getReceiveBufferSize(),
socketChannel.socket().getSendBufferSize(),
socketChannel.socket().getSoTimeout(),
nodeId);}else{// 连接尚未建立, 跳过当前 SelectionKeycontinue;}}/* if channel is not ready finish prepare */if(channel.isConnected()&&!channel.ready()){
channel.prepare();if(channel.ready()){long readyTimeMs = time.milliseconds();boolean isReauthentication = channel.successfulAuthentications()>1;if(isReauthentication){
sensors.successfulReauthentication.record(1.0, readyTimeMs);if(channel.reauthenticationLatencyMs()==null)
log.warn("Should never happen: re-authentication latency for a re-authenticated channel was null; continuing...");else
sensors.reauthenticationLatency
.record(channel.reauthenticationLatencyMs().doubleValue(), readyTimeMs);}else{
sensors.successfulAuthentication.record(1.0, readyTimeMs);if(!channel.connectedClientSupportsReauthentication())
sensors.successfulAuthenticationNoReauth.record(1.0, readyTimeMs);}
log.debug("Successfully {}authenticated with {}", isReauthentication ?"re-":"", channel.socketDescription());}}// 维护 KafkaChannel 的状态if(channel.ready()&& channel.state()==ChannelState.NOT_CONNECTED)
channel.state(ChannelState.READY);Optional<NetworkReceive> responseReceivedDuringReauthentication = channel.pollResponseReceivedDuringReauthentication();
responseReceivedDuringReauthentication.ifPresent(receive ->{long currentTimeMs = time.milliseconds();addToCompletedReceives(channel, receive, currentTimeMs);});//if channel is ready and has bytes to read from socket or buffer, and has no//previous completed receive then read from itif(channel.ready()&&(key.isReadable()|| channel.hasBytesBuffered())&&!hasCompletedReceive(channel)&&!explicitlyMutedChannels.contains(channel)){// 处理读事件attemptRead(channel);}if(channel.hasBytesBuffered()){
keysWithBufferedRead.add(key);}/* if channel is ready write to any sockets that have space in their buffer and for which we have data */long nowNanos = channelStartTimeNanos !=0? channelStartTimeNanos : currentTimeNanos;try{// 处理写事件attemptWrite(key, channel, nowNanos);}catch(Exception e){
sendFailed =true;throw e;}/* cancel any defunct sockets */if(!key.isValid())close(channel,CloseMode.GRACEFUL);}catch(Exception e){// ...}finally{maybeRecordTimePerConnection(channel, channelStartTimeNanos);}}}
版权归原作者 JellyfishMIX 所有, 如有侵权,请联系我们删除。