背景
对于Upsert操作,Hudi需要定位到数据所在的File Group。当File Group很多的时候,定位File Group的过程会成为性能瓶颈。
Hudi 提供了索引的方式,保存了每个record key和他所属的file id的对应关系。然后将这些对应关系保存到外部存储系统(HBase, Flink状态后端等)。这种方式需要引入外部系统,运维的复杂度较高且索引数据量较大。除此之外Hudi还提供了Bloom filter方式。每个parquet文件都对应一个bloom filter。通过这个bloom filter可以很容易确定数据不在这个parquet文件。有助于在扫描parquet文件的时候快速跳过无关的文件。但是在确认数据在某个parquet的时候,因bloom filter存在误判的可能性,需要逐条比对数据,存在较大的性能消耗。
在这个背景下提出了Hudi bucket Index。它是一种优化措施,将每个partition中的file group分为N份,N为bucket个数。每个分区下的File group个数一旦确定不再会变化(除了Clustering的时候)。未启用bucket index的情况下file group的file id使用UUID标识。启用了bucket index之后。每个file id的前8为被替换为bucket number(同一个partition中的不同bucket使用bucket number标识)。通过数据的record key取hash运算可以将数据映射到不同的bucket上。也就是说bucket index通过partition -> bucket number两个层级来定位record所属的file group。这两级查找时间复杂度都是O(1),无需遍历数据文件,极大的提高了查找的速度。
除此之外,在查询的时候如果使用bucket字段作为查询筛选条件,由于bucket字段相同的数据一定位于同一个bucket中,可以跳过其他的file group,减少扫描的数据量。
使用bucket index需要注意的是,每个partition的bucket数量一旦确定就无法更改。Hudi的小文件处理策略和大文件分块不再有效。所以说使用前需要预估数据量。如果bucket数量过少,每个file group文件大小会过大,不利于并发处理。如果bucket数量过多,会遇到大量小文件问题,会增大分布式文件系统元数据负载,降低持续读写性能。
Bucket index配置项
- index.type(Flink) / hoodie.index.type(Spark)。使用的索引类型。如果要使用bucket index,需要配置为
BUCKET
。 - hoodie.bucket.index.num.buckets。bucket个数,默认为256。在Flink中默认为4。
- hoodie.bucket.index.hash.field。按照哪个资源hash分桶。不配置默认使用record key。
Bucket Index的原理
Pipelines
我们从构建bucket写入逻辑的BucketStreamWriteOperator
所在的Pipelines
的hoodieStreamWrite
方法开始分析。它的代码如下:
public static DataStream<Object> hoodieStreamWrite(Configuration conf, DataStream<HoodieRecord> dataStream) {
// 如果表的index类型是BUCKET。对应配置项index.type
if (OptionsResolver.isBucketIndexType(conf)) {
// 使用Bucket类型的StreamWriter
WriteOperatorFactory<HoodieRecord> operatorFactory = BucketStreamWriteOperator.getFactory(conf);
// 获取bucket个数,对应配置项hoodie.bucket.index.num.buckets。在Flink中默认为4
int bucketNum = conf.getInteger(FlinkOptions.BUCKET_INDEX_NUM_BUCKETS);
// 获取index key字段,对应配置项hoodie.bucket.index.hash.field。如果没有配置,使用record key
String indexKeyFields = conf.getString(FlinkOptions.INDEX_KEY_FIELD);
// 使用bucket index分区器,根据record key,partition和flink channel数量分区
BucketIndexPartitioner<HoodieKey> partitioner = new BucketIndexPartitioner<>(bucketNum, indexKeyFields);
return dataStream.partitionCustom(partitioner, HoodieRecord::getKey)
.transform(opName("bucket_write", conf), TypeInformation.of(Object.class), operatorFactory)
.uid(opUID("bucket_write", conf))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
} else {
// 其他数据类型使用StreamWriteOperator
WriteOperatorFactory<HoodieRecord> operatorFactory = StreamWriteOperator.getFactory(conf);
return dataStream
// Key-by record key, to avoid multiple subtasks write to a bucket at the same time
// 先按照record key分区
// 这里的bucket概念和bucket index中的bucket不同
// 这里的bucket是根据file id和partition分组的概念,同时还考虑到了小文件聚合(将insert的数据优先分配到小文件)
// bucket作为整体flush到磁盘上
.keyBy(HoodieRecord::getRecordKey)
// 分配bucket
.transform(
"bucket_assigner",
TypeInformation.of(HoodieRecord.class),
new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
.uid(opUID("bucket_assigner", conf))
.setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
// shuffle by fileId(bucket id)
// 确定好数据所属的file id,分区写入
.keyBy(record -> record.getCurrentLocation().getFileId())
.transform(opName("stream_write", conf), TypeInformation.of(Object.class), operatorFactory)
.uid(opUID("stream_write", conf))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
}
}
BucketStreamWriteOperator
代码较少,在构造函数中创建出了BucketStreamWriteFunction
。我们接下来分析它。
BucketStreamWriteFunction
在processElement
方法中,Hudi根据record的key计算出record对应的bucket number。结合record所在的partition可以很快的确定数据所在的file group。
@Override
public void processElement(I i, ProcessFunction<I, Object>.Context context, Collector<Object> collector) throws Exception {
HoodieRecord<?> record = (HoodieRecord<?>) i;
// 获取key
final HoodieKey hoodieKey = record.getKey();
// 获取partition path
final String partition = hoodieKey.getPartitionPath();
final HoodieRecordLocation location;
// 在同一个partition中,从Hudi表中读取bucket number和file id的对应关系,放入索引中(bucketIndex)
// 后面分析
bootstrapIndexIfNeed(partition);
// 从索引读取该分区中bucket number和file id的对应关系
Map<Integer, String> bucketToFileId = bucketIndex.computeIfAbsent(partition, p -> new HashMap<>());
// 获取record对应的bucket number
// 代码为getHashKeys(hoodieKey, indexKeyFields).hashCode() & Integer.MAX_VALUE) % numBuckets
// indexKeyField为配置项hoodie.bucket.index.hash.field的值,如果没有配置,使用record key
final int bucketNum = BucketIdentifier.getBucketId(hoodieKey, indexKeyFields, this.bucketNum);
// 组装bucket id
final String bucketId = partition + "/" + bucketNum;
// incBucketIndex是新增数据的bucketIndex缓存
if (incBucketIndex.contains(bucketId)) {
// 如果是新增数据
// 根据bucket number找到对应的file group的id
location = new HoodieRecordLocation("I", bucketToFileId.get(bucketNum));
} else if (bucketToFileId.containsKey(bucketNum)) {
// 如果索引中有,说明是修改的数据
location = new HoodieRecordLocation("U", bucketToFileId.get(bucketNum));
} else {
// 如果任何索引中都没有,该bucket number还没有对应的file group,需要创建一个
// 生成新的file id,替换前8位为bucket number
String newFileId = BucketIdentifier.newBucketFileIdPrefix(bucketNum);
location = new HoodieRecordLocation("I", newFileId);
// 加入索引缓存中
bucketToFileId.put(bucketNum, newFileId);
incBucketIndex.add(bucketId);
}
record.unseal();
// 设置数据的location
record.setCurrentLocation(location);
record.seal();
// 将数据缓存起来
bufferRecord(record);
}
bootstrapIndexIfNeed
方法在指定的partition中,从Hudi表中读取bucket number和file id的对应关系放入索引。代码如下:
private void bootstrapIndexIfNeed(String partition) {
// 如果是insert overwrite,跳过
if (OptionsResolver.isInsertOverwrite(config)) {
// skips the index loading for insert overwrite operation.
return;
}
// 如果partition已被索引,返回
if (bucketIndex.containsKey(partition)) {
return;
}
LOG.info(String.format("Loading Hoodie Table %s, with path %s", this.metaClient.getTableConfig().getTableName(),
this.metaClient.getBasePath() + "/" + partition));
// Load existing fileID belongs to this task
// 索引的数据类型为map,key为bucket number
// valu为file id
Map<Integer, String> bucketToFileIDMap = new HashMap<>();
// 遍历partition中的所有file slice
this.writeClient.getHoodieTable().getHoodieView().getLatestFileSlices(partition).forEach(fileSlice -> {
// 获取file id
String fileId = fileSlice.getFileId();
// file id的前8位是bucket number,获取它
int bucketNumber = BucketIdentifier.bucketIdFromFileId(fileId);
// 检查这个bucket是否归本task处理
// 每个task只缓存自己需要处理的bucket的索引
if (isBucketToLoad(bucketNumber, partition)) {
LOG.info(String.format("Should load this partition bucket %s with fileId %s", bucketNumber, fileId));
// Validate that one bucketId has only ONE fileId
// 检查一个bucket number只对应一个file id
if (bucketToFileIDMap.containsKey(bucketNumber)) {
throw new RuntimeException(String.format("Duplicate fileId %s from bucket %s of partition %s found "
+ "during the BucketStreamWriteFunction index bootstrap.", fileId, bucketNumber, partition));
} else {
LOG.info(String.format("Adding fileId %s to the bucket %s of partition %s.", fileId, bucketNumber, partition));
// 对应关系加入缓存中
bucketToFileIDMap.put(bucketNumber, fileId);
}
}
});
// 加入缓存
bucketIndex.put(partition, bucketToFileIDMap);
}
Flink Hudi默认的state索引
作为对比,我们再去分析下Flink state索引的实现方式。Flink state索引保存了record key和file id的对应关系,保存在Flink的状态后端中。
接下来我们分别分析使用索引和加载索引的方式。
使用索引方式
按照前面Pipelines
的分析,在数据流向BucketAssignFunction
的processElement
方法之前已经按照record key分区。所以索引和record key是一一对应关系。
@Override
public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
if (value instanceof IndexRecord) {
// 如果读进来的数据是IndexRecord类型,说明处于加载索引的阶段
IndexRecord<?> indexRecord = (IndexRecord<?>) value;
// 更新保存的索引状态
this.indexState.update((HoodieRecordGlobalLocation) indexRecord.getCurrentLocation());
} else {
// 处理用户数据
processRecord((HoodieRecord<?>) value, out);
}
}
processRecord
方法读取状态中的索引。如果record的partition path没有发生变化,数据还在原先索引指向的位置,否则需要分配新的位置,更新索引。数据的位置和partition, record key有关。
private void processRecord(HoodieRecord<?> record, Collector<O> out) throws Exception {
// 1. put the record into the BucketAssigner;
// 2. look up the state for location, if the record has a location, just send it out;
// 3. if it is an INSERT, decide the location using the BucketAssigner then send it out.
final HoodieKey hoodieKey = record.getKey();
// 获取key和partition path
final String recordKey = hoodieKey.getRecordKey();
final String partitionPath = hoodieKey.getPartitionPath();
final HoodieRecordLocation location;
// Only changing records need looking up the index for the location,
// append only records are always recognized as INSERT.
// 从状态中获取上次record位置
HoodieRecordGlobalLocation oldLoc = indexState.value();
// upsert,upsert_prepped或者delete的时候isChangingRecords为true
if (isChangingRecords && oldLoc != null) {
// Set up the instant time as "U" to mark the bucket as an update bucket.
// 如果partition path发生了变化
// record的partition字段值发生变化会导致partition path发生变化
if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) {
// 如果开启了全局索引,意思如果是新旧数据的partition path不同,是否更新旧数据的partition path
if (globalIndex) {
// if partition path changes, emit a delete record for old partition path,
// then update the index state using location with new partition path.
// 生成一个删除类型的数据,指向旧的partition path
HoodieRecord<?> deleteRecord = new HoodieAvroRecord<>(new HoodieKey(recordKey, oldLoc.getPartitionPath()),
payloadCreation.createDeletePayload((BaseAvroPayload) record.getData()));
deleteRecord.unseal();
// 设置instant time为U
deleteRecord.setCurrentLocation(oldLoc.toLocal("U"));
deleteRecord.seal();
out.collect((O) deleteRecord);
}
// 获取新数据的location
location = getNewRecordLocation(partitionPath);
} else {
// 如果partition path没有发生变化
location = oldLoc.toLocal("U");
// 为update类型record创建或加入bucket
this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
}
} else {
// 新增数据,创建新的location
location = getNewRecordLocation(partitionPath);
}
// always refresh the index
if (isChangingRecords) {
// 如果数据更新,需要紧接着更新index状态变量
updateIndexState(partitionPath, location);
}
// 配置record的location
record.unseal();
record.setCurrentLocation(location);
record.seal();
out.collect((O) record);
}
这段方法中BucketAssigner
更详细的分析可以参考Hudi 源码之数据写入逻辑。
加载索引的方式
Pipelines::bootstrap
启动方法流式启动调用的是streamBootstrap
。该方法创建了BootstrapOperator
。
BootstrapOperator
在启动初始化状态量的时候调用initializeState
,从Hudi表加载索引。
@Override
public void initializeState(StateInitializationContext context) throws Exception {
ListStateDescriptor<String> instantStateDescriptor = new ListStateDescriptor<>(
"instantStateDescriptor",
Types.STRING
);
instantState = context.getOperatorStateStore().getListState(instantStateDescriptor);
if (context.isRestored()) {
Iterator<String> instantIterator = instantState.get().iterator();
if (instantIterator.hasNext()) {
lastInstantTime = instantIterator.next();
}
}
this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf);
this.writeConfig = FlinkWriteClients.getHoodieClientConfig(this.conf, true);
this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext());
this.ckpMetadata = CkpMetadata.getInstance(hoodieTable.getMetaClient().getFs(), this.writeConfig.getBasePath());
this.aggregateManager = getRuntimeContext().getGlobalAggregateManager();
preLoadIndexRecords();
}
继续分析preLoadIndexRecords
方法。该方法判断需要加载哪些分区的索引。代码如下:
protected void preLoadIndexRecords() throws Exception {
String basePath = hoodieTable.getMetaClient().getBasePath();
int taskID = getRuntimeContext().getIndexOfThisSubtask();
LOG.info("Start loading records in table {} into the index state, taskId = {}", basePath, taskID);
// 遍历所有的分区
for (String partitionPath : FSUtils.getAllFoldersWithPartitionMetaFile(FSUtils.getFs(basePath, hadoopConf), basePath)) {
// 如果分区名称匹配正则(对应index.partition.regex配置项),加载该分区的索引
if (pattern.matcher(partitionPath).matches()) {
loadRecords(partitionPath);
}
}
LOG.info("Finish sending index records, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask());
// wait for the other bootstrap tasks finish bootstrapping.
waitForBootstrapReady(getRuntimeContext().getIndexOfThisSubtask());
}
loadRecords
方法读取所有分区下的record(包含base file和log),包装为IndexRecord
发往下游。Flink下游算子接收到IndexRecord
会更新状态变量。
protected void loadRecords(String partitionPath) throws Exception {
long start = System.currentTimeMillis();
// 获取并行度
final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
// 最大并行度
final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
// 当前作业id
final int taskID = getRuntimeContext().getIndexOfThisSubtask();
HoodieTimeline commitsTimeline = this.hoodieTable.getMetaClient().getCommitsTimeline();
// 获取上次snapshot之后的所有commit组成的timeline
if (!StringUtils.isNullOrEmpty(lastInstantTime)) {
commitsTimeline = commitsTimeline.findInstantsAfter(lastInstantTime);
}
// 找到最近的已完成的commit instant
Option<HoodieInstant> latestCommitTime = commitsTimeline.filterCompletedInstants().lastInstant();
// 如果存在
if (latestCommitTime.isPresent()) {
// 根据不同的文件类型,获取不同的文件读取工具
BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat());
// 读取schema
Schema schema = new TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableAvroSchema();
// 获取latestCommitTime之前的instant,如果有还没有完成的compaction,将这个的instant和前一个合并(将这两个file slice的log文件视为一个file slice的)后返回
List<FileSlice> fileSlices = this.hoodieTable.getSliceView()
.getLatestMergedFileSlicesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp())
.collect(toList());
for (FileSlice fileSlice : fileSlices) {
// 如果这个fileSlice不归该任务处理,跳过
if (!shouldLoadFile(fileSlice.getFileId(), maxParallelism, parallelism, taskID)) {
continue;
}
LOG.info("Load records from {}.", fileSlice);
// load parquet records
fileSlice.getBaseFile().ifPresent(baseFile -> {
// filter out crushed files
// 如果base file为空或受损,跳过
if (!isValidFile(baseFile.getFileStatus())) {
return;
}
try (ClosableIterator<HoodieKey> iterator = fileUtils.getHoodieKeyIterator(this.hadoopConf, new Path(baseFile.getPath()))) {
// 逐个读取base file中保存的record,包装为IndexRecord类型发往下游
iterator.forEachRemaining(hoodieKey -> {
output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice))));
});
}
});
// load avro log records
// 获取所有的log file路径
List<String> logPaths = fileSlice.getLogFiles()
.sorted(HoodieLogFile.getLogFileComparator())
// filter out crushed files
.filter(logFile -> isValidFile(logFile.getFileStatus()))
.map(logFile -> logFile.getPath().toString())
.collect(toList());
HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(logPaths, schema, latestCommitTime.get().getTimestamp(),
writeConfig, hadoopConf);
try {
// 读取出这些log文件中的数据,包装为IndexRecord类型发往下游
for (String recordKey : scanner.getRecords().keySet()) {
output.collect(new StreamRecord(new IndexRecord(generateHoodieRecord(new HoodieKey(recordKey, partitionPath), fileSlice))));
}
} catch (Exception e) {
throw new HoodieException(String.format("Error when loading record keys from files: %s", logPaths), e);
} finally {
scanner.close();
}
}
}
long cost = System.currentTimeMillis() - start;
LOG.info("Task [{}}:{}}] finish loading the index under partition {} and sending them to downstream, time cost: {} milliseconds.",
this.getClass().getSimpleName(), taskID, partitionPath, cost);
}