prometheus/tsdb 的源码阅读笔记 0x01

分析一下
github.com/prometheus/prometheus/tsdb/chunkenc

github.com/prometheus/prometheus/tsdb/chunks
这两个 pkg

1. chunkenc

chunkenc
├── bstream.go
├── chunk.go
├── chunk_test.go
└── xor.go

chunkenc 提供了时序数据点的编码格式.

它定义了一个 Chunk 接口 及其附属的 AppenderIterator接口.

此外给出了 Chunk 的一个实现 XORChunk .

Chunk, Appender, Iterator

定义了数据块Chunk, 其为一组数据点的集合.

可以通过 Appender 继续写入, 及通过 Iterator 遍历已有的数据点.

这里明确指出了一个数据点是一对 时间戳 (int64) 和值 (float64).

Pool

Chunk 对象池的定义, 并给出了一个基于内存的实现.

XORChunk

这是目前给出的唯一一个 Chunk 实现, 使用了 Gorilla 的算法思路.

func (a *xorAppender) Append(t int64, v float64) {
    var tDelta uint64
    num := binary.BigEndian.Uint16(a.b.bytes())

    if num == 0 {
        // ...

    } else if num == 1 {
        // ...

    } else {
        tDelta = uint64(t - a.t)
        dod := int64(tDelta - a.tDelta)

        // Gorilla has a max resolution of seconds, Prometheus milliseconds.
        // Thus we use higher value range steps with larger bit size.
        switch {
        case dod == 0:
            a.b.writeBit(zero)
        case bitRange(dod, 14):
            a.b.writeBits(0x02, 2) // '10'
            a.b.writeBits(uint64(dod), 14)
        case bitRange(dod, 17):
            a.b.writeBits(0x06, 3) // '110'
            a.b.writeBits(uint64(dod), 17)
        case bitRange(dod, 20):
            a.b.writeBits(0x0e, 4) // '1110'
            a.b.writeBits(uint64(dod), 20)
        default:
            a.b.writeBits(0x0f, 4) // '1111'
            a.b.writeBits(uint64(dod), 64)
        }

        a.writeVDelta(v)
    }

    a.t = t
    a.v = v
    binary.BigEndian.PutUint16(a.b.bytes(), num+1)
    a.tDelta = tDelta
}

上述代码的 switch 部分对 dod (delta of delta) 的大小范围进行判定, 以确定一个最多 4bit 的标识, 及标识后的数据长度.

这里为了支持毫秒级的时间精度 (原始算法中为秒级), 对每一级的范围和长度做了调整.

数据点的压缩比率会受到一些影响, 但能适应更多的使用场景.

bstream

XORChunk 写入和读取点数据都依赖于 bstream提供的 bit 流读写能力, 核心是

func (b *bstream) writeBit(bit bit) {
    // ...
}

func (b *bstream) readBit() (bit, error) {
    // ...
}
func (b *bstream) writeByte(byt byte) {
    // ...
}

func (b *bstream) readByte() (byte, error) {
    // ...
}
func (b *bstream) writeBits(u uint64, nbits int) {
    // ...
}

func (b *bstream) readBits(nbits int) (uint64, error) {
    // ...
}

三组方法.

2. chunks

chunks
└── chunks.go

chunks 是 chunk 数据持久化的实现

Meta

Chunk 的元数据

Writer

是一个基于文件目录的 ChunkWriter 实现.

执行一组 Chunk 的写入, 并按体积进行分片, 每一片称为一个 sequenceFile.

  1. 结构体定义

    // Writer implements the ChunkWriter interface for the standard
    // serialization format.
    type Writer struct {
     // 当前目录
     dirFile *os.File
    
     // 所有用于写入的数据文件, 只有最后一个是当前有效的
     files   []*os.File
     wbuf    *bufio.Writer
     
     // 当前分片文件已写入的字节数
     n       int64
    
     // 复用的 crc32, 用于每一个写入的 Chunk 的校验
     crc32   hash.Hash
    
     // 分片的尺寸, 目前是 512 << 20
     segmentSize int64
    }
    
  2. Writer.finalizeTail & Writer.cut

    // 安全地关闭当前用于写入的文件
    func (w *Writer) finalizeTail() error {
     // ...
     
     // As the file was pre-allocated, we truncate any superfluous zero bytes.
     // 由于每个 seq file 都会预先分配空间, 因此需要按照实际使用量进行一次 Truncate
     off, err := tf.Seek(0, os.SEEK_CUR)
     if err != nil {
         return err
     }
     if err := tf.Truncate(off); err != nil {
         return err
     }
    
     return tf.Close()
    }
    
    func (w *Writer) cut() error {
     // Sync current tail to disk and close.
     if err := w.finalizeTail(); err != nil {
         return err
     }
    
         // 打开一个新文件用于数据写入
     p, _, err := nextSequenceFile(w.dirFile.Name())
     if err != nil {
         return err
     }
     f, err := os.OpenFile(p, os.O_WRONLY|os.O_CREATE, 0666)
     if err != nil {
         return err
     }
      
         // 为文件预分配 segmentSize 大小
     if err = fileutil.Preallocate(f, w.segmentSize, true); err != nil {
         return err
     }
     if err = w.dirFile.Sync(); err != nil {
         return err
     }
    
     // Write header metadata for new file.
    
     metab := make([]byte, 8)
     binary.BigEndian.PutUint32(metab[:4], MagicChunks)
     metab[4] = chunksFormatV1
    
     if _, err := f.Write(metab); err != nil {
         return err
     }
    
         // 重置或初始化一个 bufio.Writer
     w.files = append(w.files, f)
     if w.wbuf != nil {
         w.wbuf.Reset(f)
     } else {
         w.wbuf = bufio.NewWriterSize(f, 8*1024*1024)
     }
      
         // 重置已写入的字节数, 8 是 文件头 MagicChunks 占用的大小
         // 即前面 `Write header metadata for new file.` 的 b 部分
     w.n = 8
    
     return nil
    }
    
  3. Writer.WriteChunks

    func (w *Writer) WriteChunks(chks ...Meta) error {
     // Calculate maximum space we need and cut a new segment in case
     // we don't fit into the current one.
         // 计算所有 chunks 可能占用的空间
     maxLen := int64(binary.MaxVarintLen32) // The number of chunks.
     for _, c := range chks {
         maxLen += binary.MaxVarintLen32 + 1 // The number of bytes in the chunk and its encoding.
         maxLen += int64(len(c.Chunk.Bytes()))
     }
     newsz := w.n + maxLen
    
      // 根据这里执行 w.cut() 的判断条件, 实际上是可能出现单个文件超过 w.segmentSize 的
     if w.wbuf == nil || w.n > w.segmentSize || newsz > w.segmentSize && maxLen <= w.segmentSize {
         if err := w.cut(); err != nil {
             return err
         }
     }
    
     var (
         // 初始化 b 作为写入长度, 写入 chunk 编码方式, 计算 hash 时等的 buffer
         b   = [binary.MaxVarintLen32]byte{}
         seq = uint64(w.seq()) << 32
     )
     for i := range chks {
         chk := &chks[i]
    
         // 用于定位 Chunk 
         chk.Ref = seq | uint64(w.n)
    
         // ...
    
         // 校验数据
         w.crc32.Reset()
         if err := chk.writeHash(w.crc32); err != nil {
             return err
         }
         if err := w.write(w.crc32.Sum(b[:0])); err != nil {
             return err
         }
     }
    
     return nil
    }
    
ByteSlice

用于 Reader 中逐段读取数据

Reader

用于读取数据块

  1. NewDirReader

    // NewDirReader returns a new Reader against sequentially numbered files in the
    // given directory.
    func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) {
         // 根据命名规则得到所有数据文件
     files, err := sequenceFiles(dir)
     if err != nil {
         return nil, err
     }
      
         // 初始化一个 chunkenc.Pool
     if pool == nil {
         pool = chunkenc.NewPool()
     }
    
     var bs []ByteSlice
     var cs []io.Closer
    
     for _, fn := range files {
         f, err := fileutil.OpenMmapFile(fn)
         if err != nil {
             return nil, errors.Wrapf(err, "mmap files")
         }
         cs = append(cs, f)
         bs = append(bs, realByteSlice(f.Bytes()))
     }
     return newReader(bs, cs, pool)
    }
    
  2. Reader.Chunk

    // 根据定位读取指定 Chunk
    func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) {
     // 分别计算文件所在的位置, 和 Chunk 数据的起始位置
     var (
         seq = int(ref >> 32)
         off = int((ref << 32) >> 32)
     )
     
     // ...
    
     // 将数据封装成一个 chunkenc.Chunk
     return s.pool.Get(chunkenc.Encoding(r[0]), r[1:1+l])
    }
    

    ?

最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,100评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,308评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,718评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,275评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,376评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,454评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,464评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,248评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,686评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,974评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,150评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,817评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,484评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,140评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,374评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,012评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,041评论 2 351

推荐阅读更多精彩内容

  • Lua 5.1 参考手册 by Roberto Ierusalimschy, Luiz Henrique de F...
    苏黎九歌阅读 13,780评论 0 38
  • 谈到docker源码,其实网上有很多的源码的分析的文章,也看过一些大牛写的docker源码解读的文章,收获很大。我...
    跨界师阅读 1,319评论 2 3
  • 不支持上传文件,所以就复制过来了。作者信息什么的都没删。对前端基本属于一窍不通,所以没有任何修改,反正用着没问题就...
    全栈在路上阅读 1,955评论 0 2
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,644评论 18 139
  • //公共引用 varfs =require('fs'), path =require('path'); 1、读取文...
    才気莮孒阅读 829评论 0 1