Flink源码分析系列文档目录
请点击:Flink 源码分析系列文档目录
源代码分支
release-0.9.0
Hudi 源代码GitHub地址:apache/hudi: Upserts, Deletes And Incremental Processing on Big Data. (github.com)
HoodieTableFactory
Flink通过SPI机制加载org.apache.flink.table.factories.Factory
接口的实现类。Hudi的hudi-flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
文件内容如下:
org.apache.hudi.table.HoodieTableFactory
这个类是Flink SQL创建Table Sink和Source的入口类。本篇我们从这个类开始,分析HoodieTableSink
的创建过程。创建TableSink的入口方法逻辑如下:
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
// 获取create table是否with子句附带的参数
Configuration conf = FlinkOptions.fromMap(context.getCatalogTable().getOptions());
// 获取表的物理Schema,意思是不包含计算字段和元数据字段
TableSchema schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
// 检查参数合理性
// 检查hoodie.datasource.write.recordkey.field和write.precombine.field配置项是否包含在表字段中,如果不包含则抛出异常
sanityCheck(conf, schema);
// 根据table定义和主键等配置,Hudi自动附加一些属性配置
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), context.getCatalogTable(), schema);
// 返回HoodieTableSink
return new HoodieTableSink(conf, schema);
}
HoodieTableSink
Flink SQL在执行过程中最终被解析转换为Flink的TableSink
或者TableSource
。本篇我们关注数据写入Hudi的过程。HoodieTableSink
写入数据的逻辑位于getSinkRuntimeProvider
方法。它的内容和解析如下所示:
@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
return (DataStreamSinkProvider) dataStream -> {
// setup configuration
// 获取checkpoint超时配置
long ckpTimeout = dataStream.getExecutionEnvironment()
.getCheckpointConfig().getCheckpointTimeout();
// 设置Hudi的instant commit超时时间为Flink的checkpoint超时时间
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
// 获取schema对应每列数据类型
RowType rowType = (RowType) schema.toRowDataType().notNull().getLogicalType();
// bulk_insert mode
// 获取写入操作类型,默认是upsert
final String writeOperation = this.conf.get(FlinkOptions.OPERATION);
// 如果写入操作类型配置的为bulk_insert,进入这个if分支
if (WriteOperationType.fromValue(writeOperation) == WriteOperationType.BULK_INSERT) {
// 创建出批量插入operator工厂类
BulkInsertWriteOperator.OperatorFactory<RowData> operatorFactory = BulkInsertWriteOperator.getFactory(this.conf, rowType);
// 获取分区字段
final String[] partitionFields = FilePathUtils.extractPartitionKeys(this.conf);
if (partitionFields.length > 0) {
// 创建出key生成器,用于指定数据分组,keyBy算子使用
RowDataKeyGen rowDataKeyGen = RowDataKeyGen.instance(conf, rowType);
// 如果启用write.bulk_insert.shuffle_by_partition
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SHUFFLE_BY_PARTITION)) {
// shuffle by partition keys
// 数据流按照分区字段值进行keyBy操作
dataStream = dataStream.keyBy(rowDataKeyGen::getPartitionPath);
}
// 如果需要按照分区排序
if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_BY_PARTITION)) {
// 创建一个排序operator
SortOperatorGen sortOperatorGen = new SortOperatorGen(rowType, partitionFields);
// sort by partition keys
// 为datastream增加一个排序操作符
dataStream = dataStream
.transform("partition_key_sorter",
TypeInformation.of(RowData.class),
sortOperatorGen.createSortOperator())
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
ExecNode$.MODULE$.setManagedMemoryWeight(dataStream.getTransformation(),
conf.getInteger(FlinkOptions.WRITE_SORT_MEMORY) * 1024L * 1024L);
}
}
// 为dataStream加入批量写入operator并返回
return dataStream
.transform("hoodie_bulk_insert_write",
TypeInformation.of(Object.class),
operatorFactory)
// follow the parallelism of upstream operators to avoid shuffle
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS))
.addSink(new CleanFunction<>(conf))
.setParallelism(1)
.name("clean_commits");
}
// 对于非批量写入模式,采用流式写入
// stream write
int parallelism = dataStream.getExecutionConfig().getParallelism();
// 创建流式写入operator
StreamWriteOperatorFactory<HoodieRecord> operatorFactory = new StreamWriteOperatorFactory<>(conf);
// 将数据从RowData格式转换为HoodieRecord
DataStream<HoodieRecord> dataStream1 = dataStream
.map(RowDataToHoodieFunctions.create(rowType, conf), TypeInformation.of(HoodieRecord.class));
// bootstrap index
// TODO: This is a very time-consuming operation, will optimization
// 是否启动时加载索引
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
// 如果启用,会在启动时自动加载索引,包装为IndexRecord发往下游
dataStream1 = dataStream1.rebalance()
.transform(
"index_bootstrap",
TypeInformation.of(HoodieRecord.class),
new ProcessOperator<>(new BootstrapFunction<>(conf)))
.setParallelism(conf.getOptional(FlinkOptions.INDEX_BOOTSTRAP_TASKS).orElse(parallelism))
.uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
}
// 按照record key分区,然后使用ucketAssignFunction分桶
// 再按照分桶id分区,使用StreamWriteFunction流式写入
DataStream<Object> pipeline = dataStream1
// Key-by record key, to avoid multiple subtasks write to a bucket at the same time
.keyBy(HoodieRecord::getRecordKey)
.transform(
"bucket_assigner",
TypeInformation.of(HoodieRecord.class),
new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
.uid("uid_bucket_assigner_" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getOptional(FlinkOptions.BUCKET_ASSIGN_TASKS).orElse(parallelism))
// shuffle by fileId(bucket id)
.keyBy(record -> record.getCurrentLocation().getFileId())
.transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
.uid("uid_hoodie_stream_write" + conf.getString(FlinkOptions.TABLE_NAME))
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
// compaction
// 如果需要压缩(表类型为MERGE_ON_READ,并且启用了异步压缩)
if (StreamerUtil.needsAsyncCompaction(conf)) {
// 首先在coordinator通知checkpoint完毕的时候生成压缩计划
// 然后使用CompactFunction压缩hudi table数据
return pipeline.transform("compact_plan_generate",
TypeInformation.of(CompactionPlanEvent.class),
new CompactionPlanOperator(conf))
.setParallelism(1) // plan generate must be singleton
.rebalance()
.transform("compact_task",
TypeInformation.of(CompactionCommitEvent.class),
new ProcessOperator<>(new CompactFunction(conf)))
.setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
.addSink(new CompactionCommitSink(conf))
.name("compact_commit")
.setParallelism(1); // compaction commit should be singleton
} else {
return pipeline.addSink(new CleanFunction<>(conf))
.setParallelism(1)
.name("clean_commits");
}
};
}
从上面源代码我们可大致梳理出数据入Hudi表的流程:
- 如果配置了批量插入,采用
BulkInsertWriteOperator
批量写入数据。根据是否需要排序的要求,决定是否采用SortOperator
。 - 将
RowData
格式的数据转换为Hudi专用的HoodieRecord
格式。 - 根据配置需要,确定是否使用
BootstrapFunction
加载索引,此步骤耗时较长。 - 根据数据的partition分配数据的存储位置(BucketAssignFunction)。
- 将数据通过流的方式落地
StreamWriteFunction
。 - 如果是MOR类型表,且开启了异步压缩,schedule一个压缩操作(
CompactionPlanOperator
和CompactFunction
)。
批量插入相关
BulkInsertWriteOperator
BulkInsertWriteOperator
使用BulkInsertWriteFunction
进行批量数据插入操作。
BulkInsertWriteFunction
的初始化逻辑位于open
方法中,代码如下所示:
@Override
public void open(Configuration parameters) throws IOException {
// 获取批量插入数据作业的taskID
this.taskID = getRuntimeContext().getIndexOfThisSubtask();
// 创建writeClient,它负责创建index,提交数据和回滚,以及数据增删改查操作
this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext());
// 根据table类型和写入操作类型推断操作类型
this.actionType = CommitUtils.getCommitActionType(
WriteOperationType.fromValue(config.getString(FlinkOptions.OPERATION)),
HoodieTableType.valueOf(config.getString(FlinkOptions.TABLE_TYPE)));
// 获取上一个进行中的instant时间戳
this.initInstant = this.writeClient.getLastPendingInstant(this.actionType);
// 发送一个WriteMetadataEvent到coordinator,结束上一批数据写入过程
sendBootstrapEvent();
// 初始化writerHelper,用于辅助进行数据批量插入
initWriterHelper();
}
该function遇到每一个元素,通过writerHelper
将这个元素写入到Parquet文件中。
@Override
public void processElement(I value, Context ctx, Collector<O> out) throws IOException {
this.writerHelper.write((RowData) value);
}
每一批数据结束后,会调用endInput
方法。执行writeHelper
关闭和通知coordinator批量插入完毕。
public void endInput() {
final List<WriteStatus> writeStatus;
try {
// 关闭writeHelper
this.writerHelper.close();
// 获取所有HoodieRowDataCreateHandle对应的writeStatus,每个数据写入的partitionPath对应一个handle
writeStatus = this.writerHelper.getWriteStatuses().stream()
.map(BulkInsertWriteFunction::toWriteStatus).collect(Collectors.toList());
} catch (IOException e) {
throw new HoodieException("Error collect the write status for task [" + this.taskID + "]");
}
// 发送本批数据已完全写入的event给coordinator
final WriteMetadataEvent event = WriteMetadataEvent.builder()
.taskID(taskID)
.instantTime(this.writerHelper.getInstantTime())
.writeStatus(writeStatus)
.lastBatch(true)
.endInput(true)
.build();
this.eventGateway.sendEventToCoordinator(event);
}
SortOperator
SortOperator
用于将一批插入的数据排序后再写入???code>write.bulk_insert.sort_by_partition配置项会启用此特性。
它的初始化逻辑位于open
方法,内容和分析如下:
@Override
public void open() throws Exception {
super.open();
LOG.info("Opening SortOperator");
// 获取用户代码classloader
ClassLoader cl = getContainingTask().getUserCodeClassLoader();
// 获取RowData序列化器
AbstractRowDataSerializer inputSerializer =
(AbstractRowDataSerializer)
getOperatorConfig().getTypeSerializerIn1(getUserCodeClassloader());
// 创建Hudi专用的序列化器,传入参数为RowData字段数
this.binarySerializer = new BinaryRowDataSerializer(inputSerializer.getArity());
NormalizedKeyComputer computer = gComputer.newInstance(cl);
RecordComparator comparator = gComparator.newInstance(cl);
gComputer = null;
gComparator = null;
// 获取作业的内存管理器
MemoryManager memManager = getContainingTask().getEnvironment().getMemoryManager();
// 使用Flink提供的二进制MergeSort工具对RowData排序
this.sorter =
new BinaryExternalSorter(
this.getContainingTask(),
memManager,
computeMemorySize(),
this.getContainingTask().getEnvironment().getIOManager(),
inputSerializer,
binarySerializer,
computer,
comparator,
getContainingTask().getJobConfiguration());
// 排序工具包含了排序线程,合并线程以及溢写Thread,该方法启动这些线程
this.sorter.startThreads();
// 创建结果收集器,用于发送结果到下游
collector = new StreamRecordCollector<>(output);
// register the the metrics.
// 创建监控仪表,包含内存已用字节数,溢写文件数和溢写字节数
getMetricGroup().gauge("memoryUsedSizeInBytes", (Gauge<Long>) sorter::getUsedMemoryInBytes);
getMetricGroup().gauge("numSpillFiles", (Gauge<Long>) sorter::getNumSpillFiles);
getMetricGroup().gauge("spillInBytes", (Gauge<Long>) sorter::getSpillInBytes);
}
SortOperator
每次接收到一个RowData类型数据,都把它放入BinaryExternalSorter
的缓存中。
@Override
public void processElement(StreamRecord<RowData> element) throws Exception {
this.sorter.write(element.getValue());
}
当一批数据插入过程结束时,SortOperator
将sorter
中以排序的二进制RowData数据顺序取出,发往下游。
@Override
public void endInput() throws Exception {
BinaryRowData row = binarySerializer.createInstance();
MutableObjectIterator<BinaryRowData> iterator = sorter.getIterator();
while ((row = iterator.next(row)) != null) {
collector.collect(row);
}
}
RowDataToHoodieFunction
负责将RowData映射为HoodieRecord
,转换的逻辑位于toHoodieRecord
方法中。
private HoodieRecord toHoodieRecord(I record) throws Exception {
// 根据AvroSchema,将RowData数据转换为Avro格式
GenericRecord gr = (GenericRecord) this.converter.convert(this.avroSchema, record);
// 获取HoodieKey,它由record key字段值和partitionPath(分区路径)共同确定
final HoodieKey hoodieKey = keyGenerator.getKey(gr);
// 创建数据载体,该对象包含RowData数据
HoodieRecordPayload payload = payloadCreation.createPayload(gr);
// 获取操作类型,增删改查
HoodieOperation operation = HoodieOperation.fromValue(record.getRowKind().toByteValue());
// 构造出HoodieRecord
return new HoodieRecord<>(hoodieKey, payload, operation);
}
BootstrapFunction
通途为加载时候生成索引。该特性通过index.bootstrap.enabled
配置项开启。索引在接收到数据的时候开始加载,只加载index.partition.regex
配置项正则表达式匹配的partition path对应的索引。加载完毕之后,该算子将不再进行任何其他操作,直接将数据发往下游。
public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
// 标记是否已启动,初始值为false
if (!alreadyBootstrap) {
// 获取hoodie表元数据所在路径
String basePath = hoodieTable.getMetaClient().getBasePath();
int taskID = getRuntimeContext().getIndexOfThisSubtask();
LOG.info("Start loading records in table {} into the index state, taskId = {}", basePath, taskID);
// 遍历表包含的所有partitionPath
for (String partitionPath : FSUtils.getAllFoldersWithPartitionMetaFile(FSUtils.getFs(basePath, hadoopConf), basePath)) {
// pattern为index.partition.regex配置项的值,决定加载哪些partition的index,默认全加载
if (pattern.matcher(partitionPath).matches()) {
// 加载分区索引
loadRecords(partitionPath, out);
}
}
// wait for others bootstrap task send bootstrap complete.
// 等待其他task启动完毕
waitForBootstrapReady(taskID);
// 标记已启动完毕
alreadyBootstrap = true;
LOG.info("Finish sending index records, taskId = {}.", getRuntimeContext().getIndexOfThisSubtask());
}
// send the trigger record
// 把数据原封不动发往下游
// 该算子不操作数据,仅仅是通过数据触发加载索引的操作
out.collect((O) value);
}
loadRecords
方法加载partition的索引。索引是Indexrecord
格式,保存了record key,partition path(两者合起来为HoodieKey)和所在fileSlice的对应关系。
private void loadRecords(String partitionPath, Collector<O> out) throws Exception {
long start = System.currentTimeMillis();
// 根据存储格式,创建对应的格式处理工具,目前支持Parquet和Orc
BaseFileUtils fileUtils = BaseFileUtils.getInstance(this.hoodieTable.getBaseFileFormat());
// 获取table对应的avro schema
Schema schema = new TableSchemaResolver(this.hoodieTable.getMetaClient()).getTableAvroSchema();
// 获取并行度,最大并行度和taskID
final int parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
final int maxParallelism = getRuntimeContext().getMaxNumberOfParallelSubtasks();
final int taskID = getRuntimeContext().getIndexOfThisSubtask();
// 获取时间线上最后一个已提交的instant
Option<HoodieInstant> latestCommitTime = this.hoodieTable.getMetaClient().getCommitsTimeline()
.filterCompletedInstants().lastInstant();
// 如果这个instant存在
if (latestCommitTime.isPresent()) {
// 获取这个commit时间之前的所有FileSlice
List<FileSlice> fileSlices = this.hoodieTable.getSliceView()
.getLatestFileSlicesBeforeOrOn(partitionPath, latestCommitTime.get().getTimestamp(), true)
.collect(toList());
for (FileSlice fileSlice : fileSlices) {
// 判断这个fileSlice是否归本task加载
// 如果不是则跳过
if (!shouldLoadFile(fileSlice.getFileId(), maxParallelism, parallelism, taskID)) {
continue;
}
LOG.info("Load records from {}.", fileSlice);
// load parquet records
// 加载FlieSlice中的数据文件
fileSlice.getBaseFile().ifPresent(baseFile -> {
// filter out crushed files
// 根据文件类型,校验文件是否正常
if (!isValidFile(baseFile.getFileStatus())) {
return;
}
final List<HoodieKey> hoodieKeys;
try {
// 获取Partition对应的HoodieKey
hoodieKeys =
fileUtils.fetchRecordKeyPartitionPath(this.hadoopConf, new Path(baseFile.getPath()));
} catch (Exception e) {
throw new HoodieException(String.format("Error when loading record keys from file: %s", baseFile), e);
}
// 发送indexRecord(各个HoodieKey和fileSlice的对应关系)到下游,这里是列存储文件的index
for (HoodieKey hoodieKey : hoodieKeys) {
out.collect((O) new IndexRecord(generateHoodieRecord(hoodieKey, fileSlice)));
}
});
// load avro log records
// 加载所有avro格式log文件的路径
List<String> logPaths = fileSlice.getLogFiles()
// filter out crushed files
.filter(logFile -> isValidFile(logFile.getFileStatus()))
.map(logFile -> logFile.getPath().toString())
.collect(toList());
// 扫描log文件,合并record key相同的数据
HoodieMergedLogRecordScanner scanner = FormatUtils.logScanner(logPaths, schema, latestCommitTime.get().getTimestamp(),
writeConfig, hadoopConf);
try {
// 遍历合并后的数据,遍历他们的record key
// 发送IndexRecord到下游,这里处理的是log文件中数据的index
for (String recordKey : scanner.getRecords().keySet()) {
out.collect((O) new IndexRecord(generateHoodieRecord(new HoodieKey(recordKey, partitionPath), fileSlice)));
}
} catch (Exception e) {
throw new HoodieException(String.format("Error when loading record keys from files: %s", logPaths), e);
} finally {
scanner.close();
}
}
}
BucketAssignFunction
执行数据分桶操作。为每一条数据分配它的存储位置。如果开启了索引加载(BootstrapFunction
),BucketAssignFunction
会把索引数据(IndexRecord
)加载入operator状态缓存中。
@Override
public void processElement(I value, Context ctx, Collector<O> out) throws Exception {
// 如果接收到的是索引数据
// 如果启用的加载索引,上一节的BootstrapFunction会产生IndexRecord
// 这里需要根据索引,更新recordKey和储存位置的对应关系
if (value instanceof IndexRecord) {
IndexRecord<?> indexRecord = (IndexRecord<?>) value;
// 设置operator StateHandler当前处理的key为record key
this.context.setCurrentKey(indexRecord.getRecordKey());
// 更新indexState为索引数据对应的位置
// 将IndexRecord携带的recordKey和location信息对应存入indexState中
this.indexState.update((HoodieRecordGlobalLocation) indexRecord.getCurrentLocation());
} else {
// 进入此分支伤命接收到的事HoodieRecord,开始处理数据过程
processRecord((HoodieRecord<?>) value, out);
}
}
数据处理过程位于processRecord
方法,逻辑如下所示:
private void processRecord(HoodieRecord<?> record, Collector<O> out) throws Exception {
// 1. put the record into the BucketAssigner;
// 2. look up the state for location, if the record has a location, just send it out;
// 3. if it is an INSERT, decide the location using the BucketAssigner then send it out.
// 获取HoodieKey,分别拿出recordKey和partitionPath
final HoodieKey hoodieKey = record.getKey();
final String recordKey = hoodieKey.getRecordKey();
final String partitionPath = hoodieKey.getPartitionPath();
// 封装了HoodieRecord的存储位置,即这条HoodieRecord对应哪个文件
final HoodieRecordLocation location;
// Only changing records need looking up the index for the location,
// append only records are always recognized as INSERT.
// 获取index中保存的location信息
HoodieRecordGlobalLocation oldLoc = indexState.value();
// 如果操作类型为UPSERT,DELETE或者UPSERT_PREPPED,isChangingRecords为true
if (isChangingRecords && oldLoc != null) {
// Set up the instant time as "U" to mark the bucket as an update bucket.
// 如果index的partitionPath和当前HoodieRecord的不同
if (!Objects.equals(oldLoc.getPartitionPath(), partitionPath)) {
// 由index.global.enabled配置项控制
// 表示一个相同key的record到来但是partitionPath不同,是否需要更新旧的partitionPath
if (globalIndex) {
// if partition path changes, emit a delete record for old partition path,
// then update the index state using location with new partition path.
// 创建一个删除元素发给下游,删除老的partitionPath信息
HoodieRecord<?> deleteRecord = new HoodieRecord<>(new HoodieKey(recordKey, oldLoc.getPartitionPath()),
payloadCreation.createDeletePayload((BaseAvroPayload) record.getData()));
deleteRecord.setCurrentLocation(oldLoc.toLocal("U"));
deleteRecord.seal();
out.collect((O) deleteRecord);
}
// 通过BucketAssigner获取新的存储位置
location = getNewRecordLocation(partitionPath);
// 更新IndexState为新的partitionPath和location
updateIndexState(partitionPath, location);
} else {
location = oldLoc.toLocal("U");
// 加入更新数据的位置信息到bucketAssigner
this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
}
} else {
// 如果不是数据更新操作
location = getNewRecordLocation(partitionPath);
this.context.setCurrentKey(recordKey);
}
// always refresh the index
// 确保数据更新操作刷新索引(indexState)
if (isChangingRecords) {
updateIndexState(partitionPath, location);
}
// 设置record的存放位置,发送给下游
record.setCurrentLocation(location);
out.collect((O) record);
}
StreamWriteFunction
用于写入HoodieRecord到文件系统中。
@Override
public void processElement(I value, KeyedProcessFunction<K, I, O>.Context ctx, Collector<O> out) {
bufferRecord((HoodieRecord<?>) value);
}
processElement
又调用了bufferRecord
方法。在存入数据到buffer之前,先检查是否需要flush bucket和buffer。先提前判断如果某条数据加入bucket后将超过了bucket大小限制,会flush这个bucket。buffer为多个bucket的最大占用内存数量总和,如果buffer空闲容量耗尽,Hudi挑一个当前数据写入最多的bucket执行flush。代码如下所示:
private void bufferRecord(HoodieRecord<?> value) {
// 根据HoodieRecord的partitionPath和fileId构建出bucketID
final String bucketID = getBucketID(value);
// 根据bucketID缓存了一组DataBucket,保存在buckets变量
// 如果bucketID对应的DataBucket不存在,这里创建一个新的并放入buckets中
// bucket batch大小设置为write.batch.size
// partitionPath和fileID与HoodieRecord一致
DataBucket bucket = this.buckets.computeIfAbsent(bucketID,
k -> new DataBucket(this.config.getDouble(FlinkOptions.WRITE_BATCH_SIZE), value));
// 将HoodieRecord转换为DataItem
// DataItem为数据保存在buffer中的格式,在flush之前DataItem会再转换回HoodieRecord
final DataItem item = DataItem.fromHoodieRecord(value);
// buffer中已存元素大小加上当前dataitem是否大于batch size,如果大于需要flush
boolean flushBucket = bucket.detector.detect(item);
// 检查buffer size是否超过最大缓存容量
// 最大缓存容量为write.task.max.size - 100MB - write.merge.max_memory
boolean flushBuffer = this.tracer.trace(bucket.detector.lastRecordSize);
// 如果需要flushBucket
if (flushBucket) {
// 如果bucket数据被writeClient成功写入
if (flushBucket(bucket)) {
// tracer持有的缓存使用量减掉bucket容量
this.tracer.countDown(bucket.detector.totalSize);
// 清空bucket
bucket.reset();
}
} else if (flushBuffer) {
// 如果需要清空buffer,找到大小最大的bucket然后flush它
// find the max size bucket and flush it out
// 找到所有的bucket,按照totalSize从大到小排序
List<DataBucket> sortedBuckets = this.buckets.values().stream()
.sorted((b1, b2) -> Long.compare(b2.detector.totalSize, b1.detector.totalSize))
.collect(Collectors.toList());
// 取出第一个bucket,即totalSize最大的bucket
final DataBucket bucketToFlush = sortedBuckets.get(0);
// flush这个bucket
if (flushBucket(bucketToFlush)) {
this.tracer.countDown(bucketToFlush.detector.totalSize);
bucketToFlush.reset();
} else {
LOG.warn("The buffer size hits the threshold {}, but still flush the max size data bucket failed!", this.tracer.maxBufferSize);
}
}
// 将record加入bucket中
bucket.records.add(item);
}
CompactionPlanOperator
如果符合数据压缩的条件(Merge on Read表,并且启用异步压缩),CompactionPlanOperator
将会生成数据压缩计划。CompactionPlanOperator
不处理数据,只在checkpoint完成之后,schedule一个compact操作。
@Override
public void notifyCheckpointComplete(long checkpointId) {
try {
// 获取Hoodie表
HoodieFlinkTable hoodieTable = writeClient.getHoodieTable();
// 回滚之前没进行完的压缩操作
CompactionUtil.rollbackCompaction(hoodieTable, writeClient, conf);
// schedule一个新的压缩操作
scheduleCompaction(hoodieTable, checkpointId);
} catch (Throwable throwable) {
// make it fail safe
LOG.error("Error while scheduling compaction at instant: " + compactionInstantTime, throwable);
}
}
scheduleCompaction
方法:
private void scheduleCompaction(HoodieFlinkTable<?> table, long checkpointId) throws IOException {
// the last instant takes the highest priority.
// 获取最近一个活跃的可被压缩的instant
Option<HoodieInstant> lastRequested = table.getActiveTimeline().filterPendingCompactionTimeline()
.filter(instant -> instant.getState() == HoodieInstant.State.REQUESTED).lastInstant();
if (!lastRequested.isPresent()) {
// do nothing.
LOG.info("No compaction plan for checkpoint " + checkpointId);
return;
}
// 获取这个instant的时间
String compactionInstantTime = lastRequested.get().getTimestamp();
// 如果当前正在压缩的instant时间和最近一个活跃的可被压缩的instant时间相同
// 说明schedule的compact操作重复了
if (this.compactionInstantTime != null
&& Objects.equals(this.compactionInstantTime, compactionInstantTime)) {
// do nothing
LOG.info("Duplicate scheduling for compaction instant: " + compactionInstantTime + ", ignore");
return;
}
// generate compaction plan
// should support configurable commit metadata
// 创建HoodieCompactionPlan
HoodieCompactionPlan compactionPlan = CompactionUtils.getCompactionPlan(
table.getMetaClient(), compactionInstantTime);
if (compactionPlan == null || (compactionPlan.getOperations() == null)
|| (compactionPlan.getOperations().isEmpty())) {
// do nothing.
LOG.info("No compaction plan for checkpoint " + checkpointId + " and instant " + compactionInstantTime);
} else {
this.compactionInstantTime = compactionInstantTime;
// 获取要压缩的instant
HoodieInstant instant = HoodieTimeline.getCompactionRequestedInstant(compactionInstantTime);
// Mark instant as compaction inflight
// 标记该instant状态为inflight(正在处理)
table.getActiveTimeline().transitionCompactionRequestedToInflight(instant);
table.getMetaClient().reloadActiveTimeline();
// 创建压缩操作
List<CompactionOperation> operations = compactionPlan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
LOG.info("CompactionPlanOperator compacting " + operations + " files");
// 逐个发送压缩操作到下游
for (CompactionOperation operation : operations) {
output.collect(new StreamRecord<>(new CompactionPlanEvent(compactionInstantTime, operation)));
}
}
}
CompactFunction
接前一步生成的压缩计划,执行数据压缩过程。
@Override
public void processElement(CompactionPlanEvent event, Context context, Collector<CompactionCommitEvent> collector) throws Exception {
// 获取要压缩的instant
final String instantTime = event.getCompactionInstantTime();
// 获取压缩操作
final CompactionOperation compactionOperation = event.getOperation();
// 如果是异步压缩,通过线程池执行doCompaction方法
if (asyncCompaction) {
// executes the compaction task asynchronously to not block the checkpoint barrier propagate.
executor.execute(
() -> doCompaction(instantTime, compactionOperation, collector),
"Execute compaction for instant %s from task %d", instantTime, taskID);
} else {
// executes the compaction task synchronously for batch mode.
LOG.info("Execute compaction for instant {} from task {}", instantTime, taskID);
doCompaction(instantTime, compactionOperation, collector);
}
}
doCompaction
方法:
private void doCompaction(String instantTime, CompactionOperation compactionOperation, Collector<CompactionCommitEvent> collector) throws IOException {
// 通过FlinkCompactHelpers执行数据压缩操作
List<WriteStatus> writeStatuses = FlinkCompactHelpers.compact(writeClient, instantTime, compactionOperation);
// 收集数据压缩结果到下游
collector.collect(new CompactionCommitEvent(instantTime, writeStatuses, taskID));
}
到此为止,Flink写入Hudi表的流程已分析完毕。
本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。