Kafka源码分析-Server-日志存储(4)-OffsetIndex

为了提升查找消息的性能,从Kafka0.8开始,为每个日志文件添加了对应的索引文件。OffsetIndex对象对应磁盘管理上的一个索引文件,与上节分析的FileMessageSet共同构成一个LogSegment对象。
1.首先介绍索引文件中索引项的格式:每个索引项有8个字节,分为两部分,第一部分是相对offset,占4个字节;第二部分是物理地址,就是其索引消息在日志文件中对应的position位置,占4个字节。这样就实现了offset与物理地址直接的映射。相对offset表示的是消息相对于baseOffset的偏移量。例如,分段后的一个日志文件的baseOffset是20,当然它的文件名就是20.log,那么offset为23的Message在索引文件中的相对offset就是23-20=3。消息的offset是Long类型,4个字节可能无法直接存储消息的offset,所以使用相对的offset,这样可以减少索引文件占用的空间。
Kafka使用稀疏索引的方式构造消息的索引,它不保证每个消息在索引文件中都有对应的索引项,这算是磁盘空间,内存空间,查找时间等多方面的折中。不断减少索引文件大小的目的是为了将索引文件映射到内存,在OffsetIndex中会使用MappedByteBuffer将索引文件映射到内存中。
介绍完索引文件的相关概念后,我们来介绍下OffsetIndex字段。

  • _file:指向磁盘上的索引文件。
  • baseOffset:对应日志文件中第一个消息的offset。
  • mmap:用来操作索引文件的MappedByteBuffer。
  • lock:ReentrantLock对象,在对mmap进行操作时,需要加锁保护。
  • _entries:当前索引文件中的索引项个数。
  • _maxEntries:当前索引文件中最多能够保存的索引项个数。
  • _lastOffset:保存最后一个索引项的offset。
    在OffsetIndex初始化的过程中会初始化上述字段,因为会有多个Handler线程并发写入索引文件,所以这些字段使用@volatile修饰,保证线程之间的可见性。初始化代码如下:
 /* initialize the memory mapping for this index */
  @volatile
  private[this] var mmap: MappedByteBuffer = {
    //如果索引文件不存在,则创建新文件并返回true,反之返回false。
    val newlyCreated = _file.createNewFile()
    val raf = new RandomAccessFile(_file, "rw")
    try {
      /* pre-allocate the file if necessary */
      if (newlyCreated) {//对于新创建的的索引文件,进行扩容
        if (maxIndexSize < 8)
          throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
        //根据maxIndexSize的值对索引文件进行扩容,扩容结果是小于maxIndexSize的最大的8的倍数
        raf.setLength(roundToExactMultiple(maxIndexSize, 8))
      }

      /* memory-map the file 进行内存映射 */
      val len = raf.length()
      val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)

      /* set the position in the index for the next entry
      * 将新创建的索引文件的positon设置为0,从头开始写文件。
      * */
      if (newlyCreated)
        idx.position(0)
      else
        // if this is a pre-existing index, assume it is all valid and set position to last entry
        //  对于原来就存在的索引文件,则将position移动到所有索引项的结束位置,防止数据覆盖
        idx.position(roundToExactMultiple(idx.limit, 8))
      idx    //    返回MappedByteBuffer
    } finally {
      CoreUtils.swallow(raf.close())
    }
  }

OffsetIndex提供了向索引文件中添加索引项的append()方法,将索引文件截断到某个位置的truncateTo()方法和truncateToEntries()方法,进行文件扩容的resize()方法。这些方法实际上都是通过mmap字段的相关操作完成的。
OffsetIndex中最常用的还是查找相关的方法,使用的是二分查找,涉及的方法是indexSlotFor和lookup()。值得注意的地方是,查找的目标小于targetOffset的最大offset对应的物理地址(position)。下面是lookup()方法的代码:

/**
   * Find the largest offset less than or equal to the given targetOffset 
   * and return a pair holding this offset and its corresponding physical file position.
   * 
   * @param targetOffset The offset to look up.
   * 
   * @return The offset found and the corresponding file position for this offset. 
   * If the target offset is smaller than the least entry in the index (or the index is empty),
   * the pair (baseOffset, 0) is returned.
   */
  def lookup(targetOffset: Long): OffsetPosition = {
    maybeLock(lock) {//window操作要加锁,其他操作不加做
      val idx = mmap.duplicate//创建一个副本
      val slot = indexSlotFor(idx, targetOffset)//二分查找的具体实现
      if(slot == -1)
        OffsetPosition(baseOffset, 0)
      else//将offset和物理地址(position)封装成OffsetPosition对象并返回
        OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
      //relativeOffset()方法和physical()方法是获取索引项内容的辅助方法,分别实现了
      // 读取索引项中的相对offset和索引项中的物理地址(position)的功能
      }
  }

/**
   * Find the slot in which the largest offset less than or equal to the given
   * target offset is stored.
   * 
   * @param idx The index buffer
   * @param targetOffset The offset to look for
   * 
   * @return The slot found or -1 if the least entry in the index is larger than the target offset or the index is empty
   */
  private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = {
    // we only store the difference from the base offset so calculate that
    val relOffset = targetOffset - baseOffset
    
    // check if the index is empty
    if (_entries == 0)
      return -1
    
    // check if the target offset is smaller than the least offset
    if (relativeOffset(idx, 0) > relOffset)
      return -1
      
    // binary search for the entry  标准的二分查找法
    var lo = 0
    var hi = _entries - 1
    while (lo < hi) {
      val mid = ceil(hi/2.0 + lo/2.0).toInt
      val found = relativeOffset(idx, mid)
      if (found == relOffset)
        return mid
      else if (found < relOffset)
        lo = mid
      else
        hi = mid - 1
    }
    lo//如果找不到targetOffset对应的索引项,则返回小于targetOffset的最大的索引项位置
  }
最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,128评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,316评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,737评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,283评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,384评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,458评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,467评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,251评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,688评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,980评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,155评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,818评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,492评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,142评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,382评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,020评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,044评论 2 352

推荐阅读更多精彩内容