0


(Kafka源码五)Kafka服务端处理消息

Kafka 服务端(Broker)采用 Reactor 的架构思想,通过1 个 Acceptor,N 个 Processor(N默认为3),M 个 KafkaRequestHandler(M默认为8),来处理客户端请求,这种模式结合了多线程和事件驱动的设计,优点是能够有效地利用系统资源,可以实现高效地处理请求,无需为每个连接或请求创建新的线程,减少了线程上下文切换的开销,以实现高并发和高吞吐量。

文章目录

服务端整体架构

在这里插入图片描述

Kafka 服务端的网络结构主要包含以下三层:

  • 网络连接层:Acceptor 线程接收客户端的连接请求并创建网络连接。
  • 请求转发层:Acceptor 线程以轮询的方式分发给Processor 线程,从而实现负载均衡的效果,Processor 线程将请求放到请求队列中。
  • 请求处理层:KafkaRequestHandler线程不断地从请求队列中获取请求,解析请求,调用KafkaAPIs获取对应的操作结果,并将结果返回给客户端。

执行流程

  • Acceptor 线程在初始化的时候会往selector注册 OP_ACCEPT事件,表示可以接受客户端的连接请求,当客户端有请求连接过来时,根据selectionkey可以得到socketChannel,再将socketChannel以轮询的方式交给Processor线程(默认有3个Processor线程)处理。
  • Processor线程收到Acceptor线程分发的连接后,会先将连接放入自己的队列newConnections中,然后在selector注册OP_READ事件,表示可以读取客户端的请求,当客户端发送消息过来时,Processor线程就会处理OP_READ事件,然后Processor线程会将客户端的请求连接放入requestChannel的RequestQueue(请求队列被所有Processor线程共享)里并取消OP_READ事件的监听
  • KafkaRequestHandler线程(默认会创建8个线程)会从RequestQueue取出请求进行处理,通过KafkaApis调用得到响应结果,将处理后的响应结果放入responseQueues中(每个Processor线程对应一个responseQueues)。
  • Processor线程往selector注册OP_WRITE事件,表示可以将响应结果发送给客户端,当Processor线程检测到有OP_WRITE事件时,Processor线程就会从对应的responseQueues中取出响应结果,并通过selector.poll()方法将响应结果发送给对应的客户端且取消OP_WRITE事件的监听,最后Processor线程就会重新注册OP_READ事件,准备下一个请求的处理。

源码剖析

服务端的核心代码都在

kafka.scala

这个类,首先是main入口方法,该方法主要设置参数,然后调用启动方法

  def main(args:Array[String]):Unit={try{//启动服务端的时候,在这里解析参数
      val serverProps =getPropsFromArgs(args)
      val kafkaServerStartable =KafkaServerStartable.fromProps(serverProps)//启动的核心代码方法
      kafkaServerStartable.startup
         //...}

kafka的服务端核心方法都在startup()里面

  def startup(){//启动服务
      server.startup()//...}

创建SocketServer,startup启动后,会创建

Acceptor线程

和三个

Processor线程

并启动

//Kafka 服务端的功能 都是在这里面实现
def startup(){//创建NIO的服务端
        socketServer =newSocketServer(config, metrics, kafkaMetricsTime)
        socketServer.startup()}
 def startup(){this.synchronized{// 设置发送和接收的缓冲区大小
      val sendBufferSize = config.socketSendBufferBytes
      val recvBufferSize = config.socketReceiveBufferBytes
      //获取当前broker主机id
      val brokerId = config.brokerId
      var processorBeginIndex =0//endpoints表示Kafka配置文件config/server.properties中的信息//正常情况下,只有一个服务实例
      endpoints.values.foreach { endpoint =>
        val protocol = endpoint.protocolType
        //processorEndIndex = 0 + 3
        val processorEndIndex = processorBeginIndex + numProcessorThreads
        //创建了三个Processor的线程for(i <- processorBeginIndex until processorEndIndex)//默认新建3个Processor线程processors(i)=newProcessor(i, connectionQuotas, protocol)//Acceptor类的主构造函数会启动3个Processor线程
        val acceptor =newAcceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
          processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)

        acceptors.put(endpoint, acceptor)// Utils是线程工具类,启动acceptor线程,Utils.newThread("kafka-socket-acceptor-%s-%d".format(protocol.toString, endpoint.port), acceptor,false).start()
        acceptor.awaitStartup()

        processorBeginIndex = processorEndIndex
      }}}

Acceptor 线程处理

Utils.newThread启动

acceptor

线程的

start()

方法后,就会执行该线程的

run

方法

  • 首先 serverChannelnioSelector 注册 OP_ACCEPT 事件,nioSelector就会监听serverChannel是否有新的连接请求
  • 若有新的连接请求到来,根据该连接的key创建 SocketChannel,然后通过轮询的方式分发给不同的 processors线程处理,从而保证processor线程的负载均衡。
  def run(){//ServerChannel往Selector注册OP_ACCEPT事件,表示可以接收客户端的请求,//Selector就会检查ServerChannel是否有新的请求到达
    serverChannel.register(nioSelector,SelectionKey.OP_ACCEPT)startupComplete()try{var currentProcessor =0//死循环,不断轮询while(isRunning){try{//selecotr 查看是否有新的注册事件
          val ready = nioSelector.select(500)//大于0,说明有新事件到来if(ready >0){//获取事件的key
            val keys = nioSelector.selectedKeys()//遍历注册的所有key
            val iter = keys.iterator()while(iter.hasNext && isRunning){try{
                val key = iter.next
                //遍历完就删除
                iter.remove()//如果事件是OP_ACCEPT,就会调用accept()方法接收请求if(key.isAcceptable)// 创建SocketChannel,将其分发给Processor线程处理accept(key,processors(currentProcessor))elsethrownewIllegalStateException("Unrecognized key state for acceptor thread.")// 轮询遍历下一个Processor线程
                currentProcessor =(currentProcessor +1)% processors.length
              }catch{case e:Throwable=>error("Error while accepting connection", e)}}}}}

根据key封装

socketChannel

,分发给

processor线程

处理,processor线程将socketChannel放入自己的队列

newConnections

中,该队列是由

ConcurrentLinkedQueue

实现的队列,然后唤醒

processor

selector

处理

def accept(key:SelectionKey, processor:Processor){//根据SelectionKey获取到serverSocketChannel
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]//获取到一个socketChannel
    val socketChannel = serverSocketChannel.accept()try{
      connectionQuotas.inc(socketChannel.socket().getInetAddress)
      socketChannel.configureBlocking(false)
      socketChannel.socket().setTcpNoDelay(true)
      socketChannel.socket().setKeepAlive(true)// processor调用accept方法对socketChannel进行处理
      processor.accept(socketChannel)}}
 def accept(socketChannel:SocketChannel){//将接收的 SocketChannel放入到自己的队列
    newConnections.add(socketChannel)// 唤醒 Processor 的 selector 进行处理wakeup()}

Processor 线程处理

在上面的startup()中已经创建了3个Processor线程,然后在Acceptor的主构造函数中进行启动

//主构造函数,new出来的时候会被运行
private[kafka] class Acceptor(val endPoint: EndPoint,
                              val sendBufferSize: Int,
                              val recvBufferSize: Int,
                              brokerId: Int,
                              processors: Array[Processor],
                              connectionQuotas: ConnectionQuotas)
  extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
  this.synchronized {//启动在startup()创建的3个Processor线程
    processors.foreach { processor =>
      Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, endPoint.protocolType.toString, processor.id), processor,false).start()}}

Processor启动之后就会执行run方法

  override def run(){startupComplete()while(isRunning){try{//读取队列中的每个SocketChannel,都往Selector上面注册OP_READ事件configureNewConnections()//处理响应,并注册OP_WRITE事件processNewResponses()//读取和发送请求的代码应该都是在这个方法完成,用于处理OP_READ事件与OP_WRITE事件poll()//处理接收到的新请求,将这些请求放入requestChannel请求队列中并取消OP_READ事件processCompletedReceives()//处理已经发送出去的响应并重新监听OP_READ事件processCompletedSends()processDisconnected()}swallowError(closeAll())shutdownComplete()}

不断获取连接队列里所有的SocketChannel,解析参数得到ConnectionId,再往selector注册OP_READ事件,注册之后就可以读取客户端的请求。

private def configureNewConnections(){//当连接队列不为空while(!newConnections.isEmpty){//不断获取连接队列里面的SocketChannel
      val channel = newConnections.poll()try{//解析SocketChannel,获取对应的参数
        val localHost = channel.socket().getLocalAddress.getHostAddress
        val localPort = channel.socket().getLocalPort
        val remoteHost = channel.socket().getInetAddress.getHostAddress
        val remotePort = channel.socket().getPort
        val connectionId =ConnectionId(localHost, localPort, remoteHost, remotePort).toString
        //往selector注册OP_READ事件
        selector.register(connectionId, channel)}}}
从这段代码可以知道kafka对SocketChannel进行了封装,封装成KakaChannel,并将SelectionKey和KakaChannel进行二者的绑定,除此之外,Kafka还实现了channel复用,将connectionId和KakaChannel放入map中,避免每次发起请求都新建channel,减少了资源的消耗

publicvoidregister(String id,SocketChannel socketChannel)throwsClosedChannelException{//往自己的Selector上面注册OP_READ事件SelectionKey key = socketChannel.register(nioSelector,SelectionKey.OP_READ);//kafka对SocketChannel进行了封装,封装成KakaChannelKafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);//将key和channel进行绑定
        key.attach(channel);//channels护了多个网络连接,实现channel复用this.channels.put(id, channel);}

将客户端的请求放入请求队列中,并取消OP_READ事件

private def processCompletedReceives(){//遍历每一个请求
    selector.completedReceives.asScala.foreach { receive =>try{
        val channel = selector.channel(receive.source)
        val session =RequestChannel.Session(newKafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
          channel.socketAddress)//对于获取到的请求进行解析,得到request
        val req =RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)//将request放入请求队列中
        requestChannel.sendRequest(req)//取消OP_READ事件
        selector.mute(receive.source)}}}

KafkaRequestHandlerPool 处理请求

接下来就会通过

KafkaRequestHandler

线程去处理请求队列中的请求。回到最开始的 startup(),

  def startup(){//主要用于处理请求队列里面的请求
        requestHandlerPool =newKafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)//...}

新建的

KafkaRequestHandlerPool

,会在主构造函数创建8个KafkaRequestHandler

classKafkaRequestHandlerPool(val brokerId:Int,
                              val requestChannel:RequestChannel,
                              val apis:KafkaApis,
                              numThreads:Int)extendsLoggingwithKafkaMetricsGroup{

  val threads =newArray[Thread](numThreads)
  val runnables =newArray[KafkaRequestHandler](numThreads)//默认启动8个线程,一般情况下可以根据消息的吞吐量去设置这个参数for(i <-0 until numThreads){//创建线程runnables(i)=newKafkaRequestHandler(i, brokerId, aggregateIdleMeter, numThreads, requestChannel, apis)threads(i)=Utils.daemonThread("kafka-request-handler-"+ i,runnables(i))//线程启动,就会执行run方法threads(i).start()}}

KafkaRequestHandler启动之后就会执行run方法,将客户端的请求交由

KafkaAPIs

进行最终的处理。

def run(){while(true){try{var req :RequestChannel.Request=nullwhile(req ==null){
          val startSelectTime =SystemTime.nanoseconds
          //获取request对象
          req = requestChannel.receiveRequest(300)
          val idleTime =SystemTime.nanoseconds - startSelectTime
          aggregateIdleMeter.mark(idleTime / totalHandlerThreads)}//将请求交给KafkaApis进行处理
        apis.handle(req)}}}
  def handle(request:RequestChannel.Request){//处理生产者发送过来的请求caseApiKeys.PRODUCE =>handleProducerRequest(request)}
def handleProducerRequest(request:RequestChannel.Request){//获取到生产发送过来的请求信息
    val produceRequest = request.body.asInstanceOf[ProduceRequest]
    val numBytesAppended = request.header.sizeOf + produceRequest.sizeOf
    //按照分区的方式去遍历数据
    val (existingAndAuthorizedForDescribeTopics, nonExistingOrUnauthorizedForDescribeTopics)= produceRequest.partitionRecords.asScala.partition {//对发送过来的数据进行权限等判断。case(topicPartition, _)=>authorize(request.session,Describe,newResource(auth.Topic, topicPartition.topic))&& metadataCache.contains(topicPartition.topic)}//判断是否有写权限。
    val (authorizedRequestInfo, unauthorizedForWriteRequestInfo)= existingAndAuthorizedForDescribeTopics.partition {case(topicPartition, _)=>authorize(request.session,Write,newResource(auth.Topic, topicPartition.topic))}//把接收的数据追加到磁盘上
      replicaManager.appendMessages(
        produceRequest.timeout.toLong,
        produceRequest.acks,
        internalTopicsAllowed,
        authorizedMessagesPerPartition,
        sendResponseCallback)}

数据存储到磁盘后,调用sendResponseCallback()回调函数处理响应。

  def sendResponseCallback(responseStatus:Map[TopicPartition,PartitionResponse]){//...
   quotas.produce.recordAndMaybeThrottle(
        request.session.sanitizedUser,
        request.header.clientId,
        numBytesAppended,
        produceResponseCallback)}}

继续调用回调函数produceResponseCallback(),根据

ack

的值进行处理

  1. acks=0:生产者不会等待任何来自broker的确认。
  2. acks=1(默认):生产者会等待leader broker接收到消息并确认(但不保证所有副本都已同步)。
  3. acks=all 或 acks=-1:生产者会等待所有同步副本都确认接收到消息。
 def produceResponseCallback(delayTimeMs:Int){//acks = 0,表示生产者不关心数据的处理结果,所以不需要返回响应信息if(produceRequest.acks ==0){//...}else{//acks不为0,表明生产者需要响应消息//封装请求头
          val respHeader =newResponseHeader(request.header.correlationId)//封装请求体,也就是响应消息
          val respBody = request.header.apiVersion match {case0=>newProduceResponse(mergedResponseStatus.asJava)case version@(1|2)=>newProduceResponse(mergedResponseStatus.asJava, delayTimeMs, version)}//将响应消息发送给客户端
          requestChannel.sendResponse(newRequestChannel.Response(request,newResponseSend(request.connectionId, respHeader, respBody)))}}

将响应放入processor对应的responseQueue中,默认情况下有3个responseQueue。

  def sendResponse(response:RequestChannel.Response){//先从数组获取Processor对应的队列,再将响应放到这个队列responseQueues(response.processor).put(response)for(onResponse <- responseListeners)onResponse(response.processor)}

接着服务端需要响应客户端,回到processor的run方法

 override def run(){//处理响应,并注册OP_WRITE事件processNewResponses()//处理已经发送出去的响应并重新监听OP_READ事件processCompletedSends()}

处理responseQueues中的响应可以分为三种类型:

  • NoOpAction:对于不需要返回响应的请求,重新注册OP_READ监听事件。
  • SendAction:需要发送响应的情况,接下来注册OP_WRITE监听事件,并最终通过selector.poll()方法将响应结果发送给客户端。
  • CloseConnectionAction:需要关闭的响应。
private def processNewResponses(){//通过Process线程的id获取Response对象var curr = requestChannel.receiveResponse(id)while(curr !=null){try{
        curr.responseAction match {//对于不需要返回响应的请求caseRequestChannel.NoOpAction=>
            curr.request.updateRequestMetrics
            //重新监听OP_READ事件
            selector.unmute(curr.request.connectionId)//需要发送响应的情况caseRequestChannel.SendAction=>//注册OP_WRITE事件,发送响应sendResponse(curr)// 需要关闭的响应,关闭连接caseRequestChannel.CloseConnectionAction=>
            curr.request.updateRequestMetrics
            close(selector, curr.request.connectionId)}}finally{
        curr = requestChannel.receiveResponse(id)}}}

正常情况下处理已经发送出去的响应,将响应从响应队列中移除,并重新监听OP_READ事件,准备处理客户端的下一个请求。

private def processCompletedSends(){
    selector.completedSends.asScala.foreach { send =>//移除响应队列的响应
      val resp = inflightResponses.remove(send.destination).getOrElse {//...}
      resp.request.updateRequestMetrics()
      selector.unmute(send.destination)}}
标签: kafka 分布式 java

本文转载自: https://blog.csdn.net/weixin_42828342/article/details/136952911
版权归原作者 vibag 所有, 如有侵权,请联系我们删除。

“(Kafka源码五)Kafka服务端处理消息”的评论:

还没有评论