为了提升查找消息的性能,从Kafka0.8开始,为每个日志文件添加了对应的索引文件。OffsetIndex对象对应磁盘管理上的一个索引文件,与上节分析的FileMessageSet共同构成一个LogSegment对象。
1.首先介绍索引文件中索引项的格式:每个索引项有8个字节,分为两部分,第一部分是相对offset,占4个字节;第二部分是物理地址,就是其索引消息在日志文件中对应的position位置,占4个字节。这样就实现了offset与物理地址直接的映射。相对offset表示的是消息相对于baseOffset的偏移量。例如,分段后的一个日志文件的baseOffset是20,当然它的文件名就是20.log,那么offset为23的Message在索引文件中的相对offset就是23-20=3。消息的offset是Long类型,4个字节可能无法直接存储消息的offset,所以使用相对的offset,这样可以减少索引文件占用的空间。
Kafka使用稀疏索引的方式构造消息的索引,它不保证每个消息在索引文件中都有对应的索引项,这算是磁盘空间,内存空间,查找时间等多方面的折中。不断减少索引文件大小的目的是为了将索引文件映射到内存,在OffsetIndex中会使用MappedByteBuffer将索引文件映射到内存中。
介绍完索引文件的相关概念后,我们来介绍下OffsetIndex字段。
- _file:指向磁盘上的索引文件。
- baseOffset:对应日志文件中第一个消息的offset。
- mmap:用来操作索引文件的MappedByteBuffer。
- lock:ReentrantLock对象,在对mmap进行操作时,需要加锁保护。
- _entries:当前索引文件中的索引项个数。
- _maxEntries:当前索引文件中最多能够保存的索引项个数。
- _lastOffset:保存最后一个索引项的offset。
在OffsetIndex初始化的过程中会初始化上述字段,因为会有多个Handler线程并发写入索引文件,所以这些字段使用@volatile修饰,保证线程之间的可见性。初始化代码如下:
/* initialize the memory mapping for this index */
@volatile
private[this] var mmap: MappedByteBuffer = {
//如果索引文件不存在,则创建新文件并返回true,反之返回false。
val newlyCreated = _file.createNewFile()
val raf = new RandomAccessFile(_file, "rw")
try {
/* pre-allocate the file if necessary */
if (newlyCreated) {//对于新创建的的索引文件,进行扩容
if (maxIndexSize < 8)
throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
//根据maxIndexSize的值对索引文件进行扩容,扩容结果是小于maxIndexSize的最大的8的倍数
raf.setLength(roundToExactMultiple(maxIndexSize, 8))
}
/* memory-map the file 进行内存映射 */
val len = raf.length()
val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
/* set the position in the index for the next entry
* 将新创建的索引文件的positon设置为0,从头开始写文件。
* */
if (newlyCreated)
idx.position(0)
else
// if this is a pre-existing index, assume it is all valid and set position to last entry
// 对于原来就存在的索引文件,则将position移动到所有索引项的结束位置,防止数据覆盖
idx.position(roundToExactMultiple(idx.limit, 8))
idx // 返回MappedByteBuffer
} finally {
CoreUtils.swallow(raf.close())
}
}
OffsetIndex提供了向索引文件中添加索引项的append()方法,将索引文件截断到某个位置的truncateTo()方法和truncateToEntries()方法,进行文件扩容的resize()方法。这些方法实际上都是通过mmap字段的相关操作完成的。
OffsetIndex中最常用的还是查找相关的方法,使用的是二分查找,涉及的方法是indexSlotFor和lookup()。值得注意的地方是,查找的目标小于targetOffset的最大offset对应的物理地址(position)。下面是lookup()方法的代码:
/**
* Find the largest offset less than or equal to the given targetOffset
* and return a pair holding this offset and its corresponding physical file position.
*
* @param targetOffset The offset to look up.
*
* @return The offset found and the corresponding file position for this offset.
* If the target offset is smaller than the least entry in the index (or the index is empty),
* the pair (baseOffset, 0) is returned.
*/
def lookup(targetOffset: Long): OffsetPosition = {
maybeLock(lock) {//window操作要加锁,其他操作不加做
val idx = mmap.duplicate//创建一个副本
val slot = indexSlotFor(idx, targetOffset)//二分查找的具体实现
if(slot == -1)
OffsetPosition(baseOffset, 0)
else//将offset和物理地址(position)封装成OffsetPosition对象并返回
OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
//relativeOffset()方法和physical()方法是获取索引项内容的辅助方法,分别实现了
// 读取索引项中的相对offset和索引项中的物理地址(position)的功能
}
}
/**
* Find the slot in which the largest offset less than or equal to the given
* target offset is stored.
*
* @param idx The index buffer
* @param targetOffset The offset to look for
*
* @return The slot found or -1 if the least entry in the index is larger than the target offset or the index is empty
*/
private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = {
// we only store the difference from the base offset so calculate that
val relOffset = targetOffset - baseOffset
// check if the index is empty
if (_entries == 0)
return -1
// check if the target offset is smaller than the least offset
if (relativeOffset(idx, 0) > relOffset)
return -1
// binary search for the entry 标准的二分查找法
var lo = 0
var hi = _entries - 1
while (lo < hi) {
val mid = ceil(hi/2.0 + lo/2.0).toInt
val found = relativeOffset(idx, mid)
if (found == relOffset)
return mid
else if (found < relOffset)
lo = mid
else
hi = mid - 1
}
lo//如果找不到targetOffset对应的索引项,则返回小于targetOffset的最大的索引项位置
}