深入理解Kafka设计:RequestPurgatory(2)

版权声明:本文为博主原创文章,未经博主允许不得转载。

摘要

上一节 深入理解Kafka设计:RequestPurgatory(1),从类结构设计方面介绍了RequestPurgatory相关的几个主要组件类的功能,这一节将从源码层面来分析RequestPurgatory是如何处理延迟请求的。

RequestPurgatory工作原理

Kafka的延迟请求分两种类型:一种是Produce延迟请求,需要等待请求涉及到的数据都被其他follower同步;另一种则是Fetch延迟请求,为了避免不断的轮询,Fetch请求会等待有足够的数据量才返回。下面以Produce延迟请求为例来分析RequestPurgatory的工作原理。

处理DelayedRequest

图1.处理延迟ProduceRequest的时序图

KafkaRequestHandler处理请求的过程已经在深入理解Kafka设计:高性能服务器模型(1)中介绍过。这里直接从KafkaApis处理发送请求的方法入手。

KafkaApis.handleProducerOrOffsetCommitRequest用于接收Producer发送过来的新消息,然后将其持久化,并根据请求参数requiredAcks的值决定是否立即返回响应或将请求延迟处理。

def handleProducerOrOffsetCommitRequest(request: RequestChannel.Request) {
    .....
    val localProduceResults = appendToLocalLog(produceRequest, offsetCommitRequestOpt.nonEmpty)//持久化消息数据
    .....
    if(produceRequest.requiredAcks == 0) {//producer不需要响应
    .....
    } else if (produceRequest.requiredAcks == 1 ||
        produceRequest.numPartitions <= 0 ||
        numPartitionsInError == produceRequest.numPartitions) {//producer只需要leader成功接受到消息即可
    .....
    } else {//producer需要所有follower成功同步到消息
            //为这个延迟请求生成(topic, parition)列表,Wathcers会将列表中的元素用作key
            val producerRequestKeys = produceRequest.data.keys.toSeq
            val statuses = localProduceResults.map(r =>
        r.key -> DelayedProduceResponseStatus(r.end + 1, ProducerResponseStatus(r.errorCode, r.start))).toMap
            val delayedRequest =  new DelayedProduce(
                producerRequestKeys,
                request,
                produceRequest.ackTimeoutMs.toLong,
                produceRequest,
                statuses,
                offsetCommitRequestOpt)//构建延迟请求
        
            //延迟请求交给producerRequestPurgatory
            val satisfiedByMe = producerRequestPurgatory.checkAndMaybeWatch(delayedRequest)
            if (satisfiedByMe)
                producerRequestPurgatory.respond(delayedRequest)
    }
    .....
 }

当requiredAcks>1时,会将原请求封装成延迟请求DelayedProduce,并交予producerRequestPurgatory来处理。如果该请求此时已经满足条件(即其他follower都已经成功同步了数据),那么会立即返回响应给producer。

ProducerRequestPurgatory.checkAndMaybeWatch是处理DelayedProduce的唯一入口,他会先检查请求是否满足条件,否则再延迟处理请求,并返回false。

def checkAndMaybeWatch(delayedRequest: T): Boolean = {
    if (delayedRequest.keys.size <= 0)//如果请求不包含消息,那么已满足条件
      return isSatisfiedByMe(delayedRequest)

    var isSatisfied = delayedRequest synchronized checkSatisfied(delayedRequest)//首先检查请求是否已满足条件
    if (isSatisfied)
      return isSatisfiedByMe(delayedRequest)//更新isSatisied标志位,reaper线程会使用到

    //由于一个请求可能包含多个topic-partition的数据,每个topic-partition都需要做条件检查
    for(key <- delayedRequest.keys) {
      val lst = watchersFor(key)//为该topic-partition创建Watcher
      if (!lst.addIfNotSatisfied(delayedRequest)) {//如果request还没有满足条件,那么添加到该key对应的Watchers里面
        return false//如果已经满足条件,那么为了节约开销,不再检查余下的key
      }
    }

    isSatisfied = delayedRequest synchronized checkSatisfied(delayedRequest)//再一次检查请求是否满足条件
    if (isSatisfied)
      return isSatisfiedByMe(delayedRequest)
    else {
      //仍然不满足时,需要加入超时控制
      expiredRequestReaper.enqueue(delayedRequest)
      return false
    }
  }

一般来说,大部分请求很快便会满足条件,所以接连两次调用checkSatisfied,以减轻内存溢出风险。如果两次检查仍然没有满足条件,那么该请求也要控制超时时间了。

ExpiredRequestReaper的工作

ExpiredRequestReaper线程随Broker启动而启动,其目的是不断地检查超时请求,触发RequestPurgatory处理超时响应,并帮助其清理超时请求对象以缓解内存压力。

图2.ExpiredRequestReaper线程工作时序图

ExpiredRequestReaper.run中的主循环可以看出该线程的主要逻辑:

def run() {
      while(running.get) {
        try {
          val curr = pollExpired()//获取下一个超时请求
          if (curr != null) {
            curr synchronized {
              expire(curr)//触发RequestPurgatory执行请求超时的动作,具体行为由子类来实现
            }
          }
          //RequestPurgatory监听的请求个数超过1000时
          if (RequestPurgatory.this.watched() >= purgeInterval) {
            val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum//清除已满足条件的请求
          }
          //当超时请求列表超过1000时
          if (delayed() >= purgeInterval) {
            val purged = purgeSatisfied()//清除已满足条件的请求
          }
        } catch {
            .....
        }
      }
      .....
    }
    
    //获取下一个超时请求
    private def pollExpired(): T = {
      while(true) {
        val curr = delayedQueue.poll(200L, TimeUnit.MILLISECONDS)//获取第一个超时的对象
        if (curr == null)
          return null.asInstanceOf[T]
        val updated = curr.satisfied.compareAndSet(false, true)//更新statisfied标志位
        if(updated) {
          return curr
        }
      }
      throw new RuntimeException("This should not happen")
    }

不论是ExpiredRequestReaper.purgeSatisfied还是Watchers.purgeSatisfied,都是遍历他们的Request列表(数据结构不一样,ExpiredRequestReaper是DelayedQueue实现,另一个则是LinkedList),然后从中remove掉已经满足条件的Request对象,使得他们可以被GC回收掉,达到最终目的。

释放DelayedRequest

图3.ExpiredRequestReaper线程工作时序图

上面提到DelayedProduce的创建、清理和超时控制,那么他的条件在什么时候会被满足呢?肯定是在数据同步以后了。KafkaApis.handleFetchRequest不仅是处理消费者拉取消息的逻辑,同时也是follower同步leader数据的逻辑入口,所以从这个方法入手来分析。

def handleFetchRequest(request: RequestChannel.Request) {
    val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
    val dataRead = replicaManager.readMessageSets(fetchRequest)//根据topic,partition,offset,size等参数来读取相应的消息数据

    //如果是从follower过来的请求
    if(fetchRequest.isFromFollower)
      recordFollowerLogEndOffsets(fetchRequest.replicaId, dataRead.mapValues(_.offset))//那么更新内存中replica的相应offset
      
    .....
    if(fetchRequest.maxWait <= 0 ||//请求不需要等待
       fetchRequest.numPartitions <= 0 ||//请求不获取任何数据
       bytesReadable >= fetchRequest.minBytes ||//已经获取到足够数据
       errorReadingData) {//读取消息时出错
      .....
      //立即返回响应
      val response = new FetchResponse(fetchRequest.correlationId, dataRead.mapValues(_.data))
      requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
    } else {//这里是处理DelayedFetch的逻辑,不再介绍
      .....
    }
  }
  
  private def recordFollowerLogEndOffsets(replicaId: Int, offsets: Map[TopicAndPartition, LogOffsetMetadata]) {
    .....
      case (topicAndPartition, offset) =>
        //更新该replicaId在这个Leader中所有topic-partition的offset
replicaManager.updateReplicaLEOAndPartitionHW(topicAndPartition.topic,
          topicAndPartition.partition, replicaId, offset)
//此时要检查是否可以满足某些produce请求的条件了
replicaManager.unblockDelayedProduceRequests(topicAndPartition)
    }
  }
  
  def unblockDelayedProduceRequests(key: TopicAndPartition) {
    val satisfied = producerRequestPurgatory.update(key)//更新key所对应的DelayedProduce

    //为每一个满足了条件的请求返回响应
    satisfied.foreach(producerRequestPurgatory.respond(_))
  }

unblockDelayedProduceRequests是一个重要的方法,顾名思义,解锁Produce延迟请求。每一次follower成功获取到数据以后,除了要更新对应的offset外,最后还要解锁相应的Produce延迟请求。所以他调用了RequestPurgatory.update去更新topic-parition所对应的DelayedRequest状态。

def update(key: Any): Seq[T] = {
    val w = watchersForKey.get(key)
    if(w == null)
      Seq.empty
    else
      w.collectSatisfiedRequests()
  }

Watchers.collectSatisfiedRequests会找到topic-parition所对应的Watchers对象,然后遍历其Request列表并触发条件检查,并返回已满足的请求。

def collectSatisfiedRequests(): Seq[T] = {
      val response = new mutable.ArrayBuffer[T]
      synchronized {
        val iter = requests.iterator()
        while(iter.hasNext) {
          val curr = iter.next
          if(curr.satisfied.get) {
            iter.remove()//这里也会去除已满足条件的请求
          } else {
            val satisfied = curr synchronized checkSatisfied(curr)
            if(satisfied) {
              iter.remove()
              val updated = curr.satisfied.compareAndSet(false, true)
              if(updated == true) {
                response += curr
              }
            }
          }
        }
      }
      response
    }

由于ExpiredRequestReaper线程也会触发所有Watchers遍历Request列表的动作,所以对每个Watchers对象做了同步控制。另外在RequestPurgatory.checkAndMaybeWatch方法中,业务线程也会执行checkSatisfied操作,跟这里的遍历有极小的可能会操作同一个request对象,所以也加入了同步控制curr synchronized checkSatisfied(curr)。

而且对一个老的延迟请求来说,Watchers.collectSatisfiedRequests这里是他更新状态的唯一入口了。

Wathcers.checkSatisfied最终会调用DelayedRequest.isSatisfied,实现由具体子类来决定,看看DelayedProduce.isSatisfied

  def isSatisfied(replicaManager: ReplicaManager) = {
    //检查请求中每一个topic-partition
    partitionStatus.foreach { case (topicAndPartition, fetchPartitionStatus) =>
      // skip those partitions that have already been satisfied
      if (fetchPartitionStatus.acksPending) {//acksPending代表是否还有数据没有同步完
        val partitionOpt = replicaManager.getPartition(topicAndPartition.topic, topicAndPartition.partition)
        val (hasEnough, errorCode) = partitionOpt match {
          case Some(partition) =>
            partition.checkEnoughReplicasReachOffset(
              fetchPartitionStatus.requiredOffset,
              produce.requiredAcks)//检查该topic-partition的所有备份都已经追上了该请求的offset
          case None =>
            (false, ErrorMapping.UnknownTopicOrPartitionCode)
        }
        if (errorCode != ErrorMapping.NoError) {
          fetchPartitionStatus.acksPending = false
          fetchPartitionStatus.responseStatus.error = errorCode
        } else if (hasEnough) {
          fetchPartitionStatus.acksPending = false
          fetchPartitionStatus.responseStatus.error = ErrorMapping.NoError
        }
      }
    }

    //如果该请求的所有topic-partition都已满足条件
    val satisfied = ! partitionStatus.exists(p => p._2.acksPending)
    satisfied
  }

DelayedProduce经过isSatisfied方法检查,确认条件已经满足,那么isSatisfied标志位被置为true,等待ExpiredRequestReaper定时执行清理,从Watchers和Reaper的列表中移除,最终被GC回收掉。完成Produce延迟请求的生命周期。

缺陷

上一节也提到了Reaper线程正是为了缓解内存压力而设计出来的,通过设置工作频率(purgeInterval)可以使其及时的清理已满足条件的延迟请求。但是工作频率设置过高,那么reaper线程会频繁地遍历Wathcers和他的Request列表(同步控制),带来了性能损耗。因此,kafka在0.9.x引入了新的设计来解决这个问题。

总结

这一节通过跟踪DelayedProduce请求的处理流程,梳理了其生命周期,也从中明白了RequestPurgatory的工作原理与不足。下一节,将介绍新的设计是如何解决这个问题的。

最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,128评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,316评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,737评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,283评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,384评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,458评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,467评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,251评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,688评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,980评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,155评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,818评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,492评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,142评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,382评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,020评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,044评论 2 352

推荐阅读更多精彩内容