HoodieTable defines the components a write to a Hudi table depends on as well as the table operation APIs (upsert, delete, etc.); for each operation it creates the corresponding BaseActionExecutor to carry out the write.
HoodieTable
- HoodieTable: the abstract base class; it holds the basic building blocks for writing a Hudi table, such as FileSystemViewManager, HoodieTableMetaClient, HoodieTableMetadata, and HoodieIndex.
- HoodieFlinkTable: creates either a HoodieFlinkCopyOnWriteTable or a HoodieFlinkMergeOnReadTable depending on the table type (see the sketch below).
- HoodieFlinkCopyOnWriteTable: insert, upsert, and delete on COW tables, plus the clean- and compaction-related operations.
- HoodieFlinkMergeOnReadTable: insert, upsert, and compaction-related operations on MOR tables.
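The table-type dispatch is simple; the following is a minimal sketch of the idea (not the verbatim `HoodieFlinkTable#create` source), built only from the constructors quoted below and the HoodieTableType enum:

```java
// Minimal sketch of the table-type dispatch, assuming a factory shaped like HoodieFlinkTable#create;
// constructor signatures follow the class excerpts quoted below.
public static <T extends HoodieRecordPayload> HoodieFlinkTable<T> create(
    HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
  switch (metaClient.getTableType()) {
    case COPY_ON_WRITE:
      return new HoodieFlinkCopyOnWriteTable<>(config, context, metaClient);
    case MERGE_ON_READ:
      return new HoodieFlinkMergeOnReadTable<>(config, context, metaClient);
    default:
      throw new HoodieException("Unsupported table type: " + metaClient.getTableType());
  }
}
```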
public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload>
extends HoodieFlinkTable<T> implements HoodieCompactionHandler<T> {
public HoodieFlinkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
super(config, context, metaClient);
}
@Override
public boolean isTableServiceAction(String actionType) {
return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
}
// ActionExecutors backing upsert, insert, and delete
public HoodieWriteMetadata<List<WriteStatus>> upsert(
HoodieEngineContext context,
HoodieWriteHandle<?, ?, ?, ?> writeHandle,
String instantTime,
List<HoodieRecord<T>> records) {
return new FlinkUpsertCommitActionExecutor<>(context, writeHandle, config, this, instantTime, records).execute();
}
public HoodieWriteMetadata<List<WriteStatus>> insert(
HoodieEngineContext context,
HoodieWriteHandle<?, ?, ?, ?> writeHandle,
String instantTime,
List<HoodieRecord<T>> records) {
return new FlinkInsertCommitActionExecutor<>(context, writeHandle, config, this, instantTime, records).execute();
}
public HoodieWriteMetadata<List<WriteStatus>> delete(
HoodieEngineContext context,
HoodieWriteHandle<?, ?, ?, ?> writeHandle,
String instantTime,
List<HoodieKey> keys) {
return new FlinkDeleteCommitActionExecutor<>(context, writeHandle, config, this, instantTime, keys).execute();
}
// Operations on prepped records; where prepped records come into play is not explored here
......
@Override
public HoodieWriteMetadata<List<WriteStatus>> insertOverwriteTable(
HoodieEngineContext context,
HoodieWriteHandle<?, ?, ?, ?> writeHandle,
String instantTime,
List<HoodieRecord<T>> records) {
return new FlinkInsertOverwriteTableCommitActionExecutor(context, writeHandle, config, this, instantTime, records).execute();
}
// Generate the HoodieCleanerPlan
@Override
public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
return new CleanPlanActionExecutor(context, config, this, instantTime, extraMetadata).execute();
}
@Override
public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback,
boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers) {
return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish,
shouldRollbackUsingMarkers).execute();
}
// Execute the clean
@Override
public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime, boolean skipLocking) {
return new CleanActionExecutor(context, config, this, cleanInstantTime).execute();
}
@Override
public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant,
boolean deleteInstants, boolean skipLocking) {
return new CopyOnWriteRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants, skipLocking).execute();
}
// -------------------------------------------------------------------------
// Used for compaction
// -------------------------------------------------------------------------
@Override
public Iterator<List<WriteStatus>> handleUpdate(
String instantTime, String partitionPath, String fileId,
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
// these are updates
HoodieMergeHandle upsertHandle = getUpdateHandle(instantTime, partitionPath, fileId, keyToNewRecords, oldDataFile);
return handleUpdateInternal(upsertHandle, instantTime, fileId);
}
protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?,?,?,?> upsertHandle, String instantTime,
String fileId) throws IOException {
if (upsertHandle.getOldFilePath() == null) {
throw new HoodieUpsertException(
"Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
} else {
FlinkMergeHelper.newInstance().runMerge(this, upsertHandle);
}
// TODO(vc): This needs to be revisited
if (upsertHandle.getPartitionPath() == null) {
LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", "
+ upsertHandle.writeStatuses());
}
return Collections.singletonList(upsertHandle.writeStatuses()).iterator();
}
protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
if (!config.populateMetaFields()) {
try {
keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieAvroKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps())));
} catch (IOException e) {
throw new HoodieIOException("Only BaseKeyGenerator (or any key generator that extends from BaseKeyGenerator) are supported when meta "
+ "columns are disabled. Please choose the right key generator if you wish to disable meta fields.", e);
}
}
if (requireSortedRecords()) {
return new HoodieSortedMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
} else {
return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
}
}
@Override
public Iterator<List<WriteStatus>> handleInsert(
String instantTime, String partitionPath, String fileId,
Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordMap) {
HoodieCreateHandle<?, ?, ?, ?> createHandle =
new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordMap, taskContextSupplier);
createHandle.write();
return Collections.singletonList(createHandle.close()).iterator();
}
}
public class HoodieFlinkMergeOnReadTable<T extends HoodieRecordPayload>
extends HoodieFlinkCopyOnWriteTable<T> {
HoodieFlinkMergeOnReadTable(
HoodieWriteConfig config,
HoodieEngineContext context,
HoodieTableMetaClient metaClient) {
super(config, context, metaClient);
}
@Override
public boolean isTableServiceAction(String actionType) {
return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
}
@Override
public HoodieWriteMetadata<List<WriteStatus>> upsert(
HoodieEngineContext context,
HoodieWriteHandle<?, ?, ?, ?> writeHandle,
String instantTime,
List<HoodieRecord<T>> hoodieRecords) {
ValidationUtils.checkArgument(writeHandle instanceof FlinkAppendHandle,
"MOR write handle should always be a FlinkAppendHandle");
// MOR tables always write through a FlinkAppendHandle
FlinkAppendHandle<?, ?, ?, ?> appendHandle = (FlinkAppendHandle<?, ?, ?, ?>) writeHandle;
return new FlinkUpsertDeltaCommitActionExecutor<>(context, appendHandle, config, this, instantTime, hoodieRecords).execute();
}
@Override
public HoodieWriteMetadata<List<WriteStatus>> insert(
HoodieEngineContext context,
HoodieWriteHandle<?, ?, ?, ?> writeHandle,
String instantTime,
List<HoodieRecord<T>> hoodieRecords) {
if (writeHandle instanceof FlinkAppendHandle) {
// insert into a MOR table also goes through the delta-commit executor
FlinkAppendHandle<?, ?, ?, ?> appendHandle = (FlinkAppendHandle<?, ?, ?, ?>) writeHandle;
return new FlinkUpsertDeltaCommitActionExecutor<>(context, appendHandle, config, this, instantTime, hoodieRecords).execute();
} else {
return super.insert(context, writeHandle, instantTime, hoodieRecords);
}
}
// Compaction and rollback
@Override
public Option<HoodieCompactionPlan> scheduleCompaction(
HoodieEngineContext context,
String instantTime,
Option<Map<String, String>> extraMetadata) {
ScheduleCompactionActionExecutor scheduleCompactionExecutor = new ScheduleCompactionActionExecutor(
context, config, this, instantTime, extraMetadata,
new HoodieFlinkMergeOnReadTableCompactor());
return scheduleCompactionExecutor.execute();
}
@Override
public HoodieWriteMetadata<List<WriteStatus>> compact(
HoodieEngineContext context, String compactionInstantTime) {
RunCompactionActionExecutor compactionExecutor = new RunCompactionActionExecutor(
context, config, this, compactionInstantTime, new HoodieFlinkMergeOnReadTableCompactor(),
new HoodieFlinkCopyOnWriteTable(config, context, getMetaClient()));
return convertMetadata(compactionExecutor.execute());
}
@Override
public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback,
boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers) {
return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish,
shouldRollbackUsingMarkers).execute();
}
@Override
public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant,
boolean deleteInstants, boolean skipLocking) {
return new MergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants,
skipLocking).execute();
}
}
BaseActionExecutor
All operations on a HoodieTable are carried out by action executors; the mapping between Flink table operations and executors is as follows.
COW table:

| Operation | ActionExecutor |
|---|---|
| upsert | FlinkUpsertCommitActionExecutor |
| insert | FlinkInsertCommitActionExecutor |
| delete | FlinkDeleteCommitActionExecutor |
| insertOverwrite | FlinkInsertOverwriteCommitActionExecutor |
| insertOverwriteTable | FlinkInsertOverwriteTableCommitActionExecutor |
| rollback | CopyOnWriteRollbackActionExecutor |

MOR table:

| Operation | ActionExecutor |
|---|---|
| upsert / insert | FlinkUpsertDeltaCommitActionExecutor |
| rollback | MergeOnReadRollbackActionExecutor |
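Each executor in the tables above is a thin subclass: it stores the input records, passes its WriteOperationType to the base constructor (the same constructor shape BaseFlinkDeltaCommitActionExecutor passes through further down), and hands the records to the shared BaseFlinkCommitActionExecutor#execute shown next. A simplified, non-verbatim sketch of what such a subclass looks like:

```java
// Simplified sketch of a thin commit action executor; not the verbatim Hudi source.
public class FlinkUpsertCommitActionExecutor<T extends HoodieRecordPayload<T>>
    extends BaseFlinkCommitActionExecutor<T> {

  private final List<HoodieRecord<T>> inputRecords;

  public FlinkUpsertCommitActionExecutor(HoodieEngineContext context,
                                         HoodieWriteHandle<?, ?, ?, ?> writeHandle,
                                         HoodieWriteConfig config,
                                         HoodieTable table,
                                         String instantTime,
                                         List<HoodieRecord<T>> inputRecords) {
    super(context, writeHandle, config, table, instantTime, WriteOperationType.UPSERT);
    this.inputRecords = inputRecords;
  }

  @Override
  public HoodieWriteMetadata<List<WriteStatus>> execute() {
    // Delegate to the shared execute(List<HoodieRecord<T>>) shown in the next section.
    return execute(inputRecords);
  }
}
```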
BaseFlinkCommitActionExecutor
Data writes for Flink COW/MOR tables all funnel through BaseFlinkCommitActionExecutor#execute, which calls handleInsert or handleUpdate depending on the concrete HoodieWriteHandle and the BucketType (this is also where the code connects to FlinkMergeHelper and FlinkLazyInsertIterable).
BaseFlinkCommitActionExecutor#execute
public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
List<WriteStatus> writeStatuses = new LinkedList<>();
final HoodieRecord<?> record = inputRecords.get(0);
final String partitionPath = record.getPartitionPath();
final String fileId = record.getCurrentLocation().getFileId();
// "I" marks a bucket written for the first time (INSERT); a bucket used before is an UPDATE
final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I")
? BucketType.INSERT
: BucketType.UPDATE;
// delegate to handleUpsertPartition
handleUpsertPartition(
instantTime,
partitionPath,
fileId,
bucketType,
inputRecords.iterator())
.forEachRemaining(writeStatuses::addAll);
setUpWriteMetadata(writeStatuses, result);
return result;
}
protected Iterator<List<WriteStatus>> handleUpsertPartition(
String instantTime,
String partitionPath,
String fileIdHint,
BucketType bucketType,
Iterator recordItr) {
try {
if (this.writeHandle instanceof HoodieCreateHandle) {
// used for COW table inserts
// During one checkpoint interval, an insert record could also be updated,
// for example, for an operation sequence of a record:
// I, U, | U, U
// - batch1 - | - batch2 -
// the first batch(batch1) operation triggers an INSERT bucket,
// the second batch batch2 tries to reuse the same bucket
// and append instead of UPDATE.
return handleInsert(fileIdHint, recordItr);
} else if (this.writeHandle instanceof HoodieMergeHandle) {
// used by the MOR compaction merge and by COW table updates
return handleUpdate(partitionPath, fileIdHint, recordItr);
} else {
// Flink append path for MOR tables
switch (bucketType) {
case INSERT:
// the first record routed to the bucket was marked "I"
return handleInsert(fileIdHint, recordItr);
case UPDATE:
// subsequent records for the bucket are marked "U"
return handleUpdate(partitionPath, fileIdHint, recordItr);
default:
throw new AssertionError();
}
}
} catch (Throwable t) {
String msg = "Error upsetting bucketType " + bucketType + " for partition :" + partitionPath;
LOG.error(msg, t);
throw new HoodieUpsertException(msg, t);
}
}
// Build a FlinkLazyInsertIterable
@Override
public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr)
throws Exception {
// This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
if (!recordItr.hasNext()) {
LOG.info("Empty partition");
return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
}
return new FlinkLazyInsertIterable<>(recordItr, true, config, instantTime, table, idPfx,
taskContextSupplier, new ExplicitWriteHandleFactory<>(writeHandle));
}
/**
 * Used by the MOR compaction merge and by COW table updates.
 */
@Override
public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId,
Iterator<HoodieRecord<T>> recordItr)
throws IOException {
// This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
if (!recordItr.hasNext()) {
LOG.info("Empty partition with fileId => " + fileId);
return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
}
// these are updates HoodieMergeHandle
HoodieMergeHandle<?, ?, ?, ?> upsertHandle = (HoodieMergeHandle<?, ?, ?, ?>) this.writeHandle;
return handleUpdateInternal(upsertHandle, fileId);
}
protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String fileId)
throws IOException {
if (upsertHandle.getOldFilePath() == null) {
// the previous base file could not be located
throw new HoodieUpsertException(
"Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
} else {
// read the parquet data at the mergeHandle's old file path, feed it back through the mergeHandle, and populate writeStatuses
FlinkMergeHelper.newInstance().runMerge(table, upsertHandle);
}
// TODO(vc): This needs to be revisited
if (upsertHandle.getPartitionPath() == null) {
LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", "
+ upsertHandle.writeStatuses());
}
return Collections.singletonList(upsertHandle.writeStatuses()).iterator();
}
BaseFlinkDeltaCommitActionExecutor
handleUpdate/handleInsert for MOR tables.
public abstract class BaseFlinkDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
extends BaseFlinkCommitActionExecutor<T> {
public BaseFlinkDeltaCommitActionExecutor(HoodieEngineContext context,
FlinkAppendHandle<?, ?, ?, ?> writeHandle,
HoodieWriteConfig config,
HoodieTable table,
String instantTime,
WriteOperationType operationType) {
super(context, writeHandle, config, table, instantTime, operationType);
}
@Override
public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
// for updates the records are already buffered inside the FlinkAppendHandle; no producer/consumer pipeline is involved
FlinkAppendHandle appendHandle = (FlinkAppendHandle) writeHandle;
// the handle already holds the records it was initialized with
// 1. append the buffered data
appendHandle.doAppend();
// 2. close the handle and collect the write statuses
List<WriteStatus> writeStatuses = appendHandle.close();
return Collections.singletonList(writeStatuses).iterator();
}
@Override
public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr) {
// goes through the producer/consumer pipeline (FlinkLazyInsertIterable)
return new FlinkLazyInsertIterable<>(recordItr, true, config, instantTime, table,
idPfx, taskContextSupplier, new ExplicitWriteHandleFactory(writeHandle));
}
}
HoodieClient
The write client that Hudi exposes to the outside; the focus here is the upsert flow:
- Create a HoodieFlinkTable according to the table type.
- Create the WriteHandle based on the first record in the buffer.
- Call upsert on the HoodieFlinkTable (see the sketch after this list).
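For orientation, a minimal sketch of what driving the write client looks like. This is a hedged illustration, not the actual StreamWriteFunction code: in the real pipeline the operator buffers the records, the bucket assigner has already stamped each record's currentLocation (fileId plus the "I"/"U"-style instant marker the handle selection below relies on), and instant creation/commit is coordinated by the write coordinator rather than inlined like this. `context`, `writeConfig`, and `bufferedRecords` are assumed to be prepared elsewhere.

```java
// Hedged usage sketch; see the assumptions listed above.
HoodieFlinkWriteClient<HoodieAvroPayload> writeClient =
    new HoodieFlinkWriteClient<>(context, writeConfig);
String instantTime = writeClient.startCommit();                                 // instant to write into
List<WriteStatus> statuses = writeClient.upsert(bufferedRecords, instantTime);  // the method shown below
writeClient.commit(instantTime, statuses);                                      // publish on the timeline
```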
HoodieFlinkWriteClient
@Override
public List<WriteStatus> upsert(List<HoodieRecord<T>> records, String instantTime) {
// Create the HoodieTable
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
// validate Upsert Schema
table.validateUpsertSchema();
// set UPSERT operation
preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
// Create the write handle based on the first record in the buffer; that record carries the fileId and the current instant
final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(),
instantTime, table, records.listIterator());
// Run the upsert on the HoodieTable
HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).upsert(context, writeHandle, instantTime, records);
if (result.getIndexLookupDuration().isPresent()) {
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
}
return postWrite(result, instantTime, table);
}
private HoodieWriteHandle<?, ?, ?, ?> getOrCreateWriteHandle(
HoodieRecord<T> record,
HoodieWriteConfig config,
String instantTime,
HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
Iterator<HoodieRecord<T>> recordItr) {
final HoodieRecordLocation loc = record.getCurrentLocation();
final String fileID = loc.getFileId();
final String partitionPath = record.getPartitionPath();
final boolean insertClustering = config.allowDuplicateInserts();
// a handle for this fileId already exists
if (bucketToHandles.containsKey(fileID)) {
MiniBatchHandle lastHandle = (MiniBatchHandle) bucketToHandles.get(fileID);
// a FlinkAppendHandle is never replaced
if (lastHandle.shouldReplace()) {
HoodieWriteHandle<?, ?, ?, ?> writeHandle = insertClustering
? new FlinkConcatAndReplaceHandle<>(config, instantTime, table, recordItr, partitionPath, fileID,
table.getTaskContextSupplier(), lastHandle.getWritePath())
// not insert clustering
: new FlinkMergeAndReplaceHandle<>(config, instantTime, table, recordItr, partitionPath, fileID,
table.getTaskContextSupplier(), lastHandle.getWritePath());
this.bucketToHandles.put(fileID, writeHandle); // override with new replace handle
return writeHandle;
}
}
final boolean isDelta = table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ);
final HoodieWriteHandle<?, ?, ?, ?> writeHandle;
if (isDelta) {
// MOR tables append the data to the log files through a FlinkAppendHandle
writeHandle = new FlinkAppendHandle<>(config, instantTime, table, partitionPath, fileID, recordItr,
table.getTaskContextSupplier());
} else if (loc.getInstantTime().equals("I")) {
// COW table: create a new base file
writeHandle = new FlinkCreateHandle<>(config, instantTime, table, partitionPath,
fileID, table.getTaskContextSupplier());
} else {
// COW table: update an existing base file
writeHandle = insertClustering
? new FlinkConcatHandle<>(config, instantTime, table, recordItr, partitionPath,
fileID, table.getTaskContextSupplier())
// regular merge path
: new FlinkMergeHandle<>(config, instantTime, table, recordItr, partitionPath,
fileID, table.getTaskContextSupplier());
}
this.bucketToHandles.put(fileID, writeHandle);
return writeHandle;
}
So far, the main classes involved in the Flink write path have been broken down: HoodieClient -> HoodieTable -> ActionExecutor -> the classes behind the producer/consumer pattern -> WriteHandle.