🔥《Kafka运维管控平台LogiKM》🔥
✏️更强大的管控能力✏️ 🎾更高效的问题定位能力🎾 🌅更便捷的集群运维能力🌅 🎼更专业的资源治理🎼 🌞更友好的运维生态🌞
文章目录
前面我们有讲解Kafka的网络通信模型 , 但是那篇文章主要讲的是 作为服务端是如何处理的。
那么,今天我们再来讲一讲 客户端是如何发起请求的。
带着几个问题思考一下
- 如何发起Request请求
- 如果配置了多个listeners,如何正确的选择listener发起请求?
- Controller2Broker、Broker2Broker、Client2Broker 的区别是什么?
构建Request并发起请求
关键类
客户端发起请求的几个关键类
NetworkSend
该类继承自
ByteBufferSend
, 超类是 Send,有以下几个接口
Stringdestination();booleancompleted();longwriteTo(GatheringByteChannel channel)throwsIOException;longsize();
它的作用主要是用来缓存待发送的数据的,
writeTo
方法会把缓存的数据写入到入参的通道里面。
例如
ByteBufferSend
,的写入方法如下。
@OverridepubliclongwriteTo(GatheringByteChannel channel)throwsIOException{long written = channel.write(buffers);if(written <0)thrownewEOFException("Wrote negative bytes to channel. This shouldn't happen.");
remaining -= written;
pending =TransportLayers.hasPendingWrites(channel);return written;}
Send接口,还有很多其他的实现类。
NetworkClientUtils
客户端的工具类, 只要构建好了NetworkClient,就可以用这个工具类发送请求。
NetworkClient
用于异步请求/响应网络 i/o 的网络客户端。这是一个内部类,用于实现面向用户的生产者和消费者客户端。
这个类不是线程安全的!
NetworkClient的一些关键属性
/* 用于执行网络 io 的选择器 */privatefinal Selectable selector;/* Metadata元信息的更新器, 他可以尝试更新元信息 */privatefinal MetadataUpdater metadataUpdater;/* 每个节点的连接状态 */privatefinal ClusterConnectionStates connectionStates;/* 当前正在发送或等待响应的一组请求 */privatefinal InFlightRequests inFlightRequests;/* 套接字发送缓冲区大小(以字节为单位) */privatefinal int socketSendBuffer;/* 套接字接收大小缓冲区(以字节为单位) */privatefinal int socketReceiveBuffer;/* 用于在对服务器的请求中识别此客户端的客户端 ID */privatefinalString clientId;/* 向服务器发送请求时使用的当前关联 ID*/private int correlation;/* 单个请求等待服务器确认的默认超时*/privatefinal int defaultRequestTimeoutMs;//.... 省略
这里构建NetworkClient涉及到的Broker配置有:
属性描述默认request.timeout.ms配置控制客户端等待请求响应的最长时间。如果在超时之前没有收到响应,客户端将在必要时重新发送请求,或者如果重试次数用尽,则请求失败。这应该大于replica.lag.time.max.ms(代理配置),以减少由于不必要的生产者重试而导致消息重复的可能性。30000(30 秒)socket.connection.setup.timeout.ms客户端等待套接字连接建立的时间。如果在超时之前没有建立连接,客户端将关闭套接字通道。10000(10 秒)socket.connection.setup.timeout.max.ms客户端等待建立套接字连接的最长时间。对于每个连续的连接失败,连接设置超时将成倍增加,直至达到此最大值。为避免连接风暴,将对超时应用 0.2 的随机化因子,从而产生低于计算值 20% 到高于 20% 的随机范围。127000(127 秒)
元信息更新
请看
发送请求
// 根据拿到的BrokerNode,和RequestBuilder构建 Request请求val clientRequest = networkClient.newClientRequest(brokerNode.idString, requestBuilder,
time.milliseconds(),true)// 发起请求并接受Response
clientResponse = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)
主要的发送请求逻辑就是上面的关键代码, 先构建clientRequest请求,然后用NetworkClientUtils发送请求。
具体代码就不贴出来了, 简要概述一下整个流程吧
- 创建networkclient还有clientRequest, 注意brokerNode是具体Broker的EndPoint,一个Broker可能有多个EndPoint,具体选择哪个是由调用层决定的。
- 开始执行发送流程
- 校验是否能够发送Request,判断逻辑为:连接状态Ready&&通道Ready&&当然正在发送中的请求数量<
maxInFlightRequestsPerConnection
(最大未完成请求数,这个是上层参数决定的) 。当然,如果这个请求的类型是内部请求,是不需要这个判断的。 - 如果能够发送Request, 则开始构建NetworkSend, 然后调用
Selector.send(send)
开始发送,这个过程其实是注册SelectionKey.OP_WRITE 事件。当然在这之前会将请求保存起来放到inFlightRequests中,用于后面判断请求数是否超过阈值等等。 - 循环遍历
networkClient.poll
获取Response, 直到结束。
Request的几个场景
客户端发起请求,总共分为以下几个场景。
Controller2Broker
关键类 ControllerChannelManager
Controller会向Broker发起一些请求,比如UpdateMetadataRequest 更新元信息请求。
- 那么Controller是如何构建networkClient的呢?
- 如果Broker配置了多个listeners, 怎么选择listeners去发送请求呢?
在Controller重新选举初始化的时候,或者有新的Broker启动上线之后, Controller节点会执行添加Broker的操作。
ControllerChannelManager#addBroker
privatedef addNewBroker(broker: Broker):Unit={val messageQueue =new LinkedBlockingQueue[QueueItem]// 获取内部Broker之间通信的监听器名称val controllerToBrokerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)// 读取内部Broker之间通信的安全协议val controllerToBrokerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)// 根据监听器名称选择合适的节点和监听器名称val brokerNode = broker.node(controllerToBrokerListenerName)// 省略部分.......val networkClient =new NetworkClient(
selector,new ManualMetadataUpdater(Seq(brokerNode).asJava),
config.brokerId.toString,// 一次只能发一个请求,保证顺序性1,0,0,
Selectable.USE_DEFAULT_BUFFER_SIZE,
Selectable.USE_DEFAULT_BUFFER_SIZE,
config.requestTimeoutMs,
config.connectionSetupTimeoutMs,
config.connectionSetupTimeoutMaxMs,
ClientDnsLookup.USE_ALL_DNS_IPS,
time,false,new ApiVersions,
logContext
)// 省略部分.......val requestThread =new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient,
brokerNode, config, time, requestRateAndQueueTimeMetrics, stateChangeLogger, threadName)}
- 解析配置
control.plane.listener.name
获取ControllerPlane(控制面板)使用的listeners, 并解析出listener_name 和 安全协议。 如果没有配置control.plane.listener.name
, 则使用inter.broker.listener.name
的监听器和安全协议 - 根据上面得到的监听器名称, 就可以指定使用哪个BrokerNode了。例如某个新增的Broker配置如下:
listeners = OUTSIDE1://172.23.164.160:9093,OUTSIDE2://172.23.164.160:9094listener.security.protocol.map=OUTSIDE1:PLAINTEXT,OUTSIDE2:PLAINTEXTinter.broker.listener.name=OUTSIDE2
那么新增Broker的时候,Controller得到的Broker就有2个Endpoint。 那么Controller应该选哪个EndPoint去跟这个Broker建立连接呢?这得看Controller这台Broker的配置是什么了。假设Controller的配置如下:listeners = OUTSIDE1://xxx.xxx:xxx,OUTSIDE2://xxx.xxx:xxxlistener.security.protocol.map=OUTSIDE1:PLAINTEXT,OUTSIDE2:PLAINTEXTinter.broker.listener.name=OUTSIDE2
那么通过上面1中的判断逻辑, 会找到监听器为OUTSIDE2 的EndPoint进行连接。注意:是Controller拿本地配置,去匹配Broker的EndPoint配置。一般情况下,所有的Broker配置都应该一致!如果Controller本地配置的监听器,不存在于其他Broker中会造成什么情况? 如果找不到正确的BrokerNode,从日志里面看,好像并没有打印出明显的异常。但是实际上它是会抛出一个异常BrokerEndPointNotAvailableException.s"End point with listener name ${listenerName.value} not found " + s"for broker $id"
- 构建NetworkClient, 将用于发起网络请求。
- 构建完NetworkClient,创建RequestSendThread线程对象,该对象包含networClient、BrokerNode等等实例。ThreadName为: “Controller-当前BrokerID-to-broker-目标BrokerID-send-thread”
- 启动RequestSendThread, 这个线程做的事情就是跟BrokerNode建立起连接、发起UpdateMetadataRequest请求, 接受请求Response。就是上面的NetworkClientUtils.sendAndReceive流程
PS: 这里传入的
maxInFlightRequestsPerConnection
是1,也就说Controller给某个Broker发送请求同一时间只有一个请求。确保请求的顺序性。
Broker2Controller
在Kafka启动过程中,会构建一个brokerToControllerChannelManager 的实例。这个是专门管理Broker向Controller发起请求的类,里面有一个BrokerToControllerRequestThread线程负责真正的想Controller发起请求。
brokerToControllerChannelManager =new BrokerToControllerChannelManagerImpl(metadataCache, time, metrics, config, threadNamePrefix)
brokerToControllerChannelManager.start()
线程名格式:自定义前缀:broker-${config.brokerId}-to-controller-send-thread
可以看看他里面的类, 也是先构建networkClient, 然后发起请求。具体构建就不再分析了,跟上面的Controller2Broker一样。但是列出几个重点需要注意的地方:
- Controller2Broker通过配置可以找到具体的BrokerNode, 也就是说在发起请求之前就知道向Broker的哪个EndPoint发起请求, 这个时候的Broker2Controller在发起之前是不知道的,只知道监听器名称。当然这个监听器名称的寻找逻辑 跟Controller2Broker一样,也是先找配置
control.plane.listener.name
,找不到就用inter.broker.listener.name
配置。 - Broker刚启动的时候,还没有设置
activeController
,不知道谁是Controller,所以等元信息更新之后,才拿到Controller Broker节点,但是Controller可能有多个EndPoint,那么获取哪个呢? 当然是根据上面的1中获取到的监听器名 listenerName,过滤出BrokerNode。并赋值给activeController
(是具体的Node,一个Broker可以有多个Node的) - 元信息更新器是
ManualMetadataUpdater
. 这个更新器是手动更新,直接调用metadataUpdater.setNodes
来更新节点。// 获取正确的ControllerNode activeController = Option(controllerOpt.get.node(listenerName))// 手动更新一下Nodes信息。所有其他的Broker都只接收listenerName的Node metadataUpdater.setNodes(metadataCache.getAliveBrokers.map(_.node(listenerName)).asJava)
- 有了activeController之后,就可以正常的走网络请求了。但是这个时候还没有请求进来, 它会去循环的poll
requestQueue
里面的请求, 有请求的话就走请求流程。请求成功后,会调用回调接口request.callback.onComplete(response)
- 所以,你想要发送一个请求,只需要把请求参数放到队列
requestQueue
里面就行了。 例如:Broker定时向Controller发送AlterIsr请求。AlterIsrManager.star()
PS: 这里传入的
maxInFlightRequestsPerConnection
也是1,也就说Broker给Controller发送请求同一时间只有一个请求。确保请求的顺序性。
Broker2Broker
Broker之间的请求, 例如 AbstractFetcherThread 副本同步线程。
Follower去Leader Fetch数据,FetchRequest 请求, 那么他们的通信又是什么样子呢?
基本上都是差不多的, 需要注意几个问题
- Broker2Broker 之间的请求用什么EndPoint呢? 答: 用本地的
inter.broker.listener.name
配置去匹配对应的EndPoint。Broker2Broker是属于内部Broker之间的请求。具体的代码在ReplicaManager#makeFollowers
- ReplicaFetcher的线程名: “配置的前缀ReplicaFetcherThread- f e t c h e r I d − fetcherId- fetcherId−{sourceBroker.id}”
- 这里传入的
maxInFlightRequestsPerConnection
也是1,也就说Broker给Controller发送请求同一时间只有一个请求。确保请求的顺序性。
Client2Broker
这个就是 例如 Producer 和 Consumer 等等向Broker发起请求模块。
方式都是一样的,构建自己的 networkClient,配置不同属性。
书籍: 《数据结构和算法基础》
👇🏻 扫描 下方 关注公众号 参与每周福利👇🏻
版权归原作者 石臻臻的杂货铺 所有, 如有侵权,请联系我们删除。