HoodieClient, HoodieTable, and ActionExecutor: Writing Data

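HoodieTable defines the components that writing a Hudi table depends on, together with the table operation API (upsert, delete, and so on). For each operation it creates a BaseActionExecutor that performs the actual write.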

HoodieTable

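  • HoodieTable: the abstract base class. It holds the basic components needed to write a Hudi table, such as FileSystemViewManager, HoodieTableMetaClient, HoodieTableMetadata, and HoodieIndex.
  • HoodieFlinkTable: creates a HoodieFlinkCopyOnWriteTable or a HoodieFlinkMergeOnReadTable depending on the table type.
  • HoodieFlinkCopyOnWriteTable: insert, upsert, and delete on COW (copy-on-write) tables, plus the clean and compact related operations.
  • HoodieFlinkMergeOnReadTable: insert and upsert on MOR (merge-on-read) tables, plus the compact related operations.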
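
The relationship between these classes can be sketched in a few lines. This is a simplified model, not Hudi code: TableType, SketchTable, SketchCowTable, and SketchMorTable are hypothetical stand-ins, and the action strings "commit" and "deltacommit" are assumed to match HoodieTimeline.COMMIT_ACTION and HoodieTimeline.DELTA_COMMIT_ACTION. It only shows that the table is chosen by type, that the MOR table extends the COW table, and that each one treats every action other than its own write action as a table service.

```java
// Simplified stand-ins for illustration; these are NOT the real Hudi classes.
enum TableType { COPY_ON_WRITE, MERGE_ON_READ }

abstract class SketchTable {
    // Any action other than the table's regular write action is a table service.
    abstract boolean isTableServiceAction(String actionType);

    // Factory in the spirit of HoodieFlinkTable: pick the subclass by table type.
    static SketchTable create(TableType type) {
        switch (type) {
            case COPY_ON_WRITE:
                return new SketchCowTable();
            case MERGE_ON_READ:
                return new SketchMorTable();
            default:
                throw new IllegalArgumentException("Unsupported table type: " + type);
        }
    }
}

class SketchCowTable extends SketchTable {
    @Override
    boolean isTableServiceAction(String actionType) {
        // Assumed value of HoodieTimeline.COMMIT_ACTION
        return !actionType.equals("commit");
    }
}

// The MOR table reuses the COW table and overrides only what differs.
class SketchMorTable extends SketchCowTable {
    @Override
    boolean isTableServiceAction(String actionType) {
        // Assumed value of HoodieTimeline.DELTA_COMMIT_ACTION
        return !actionType.equals("deltacommit");
    }
}
```

With this model, a "commit" action is a regular write on a COW table but counts as a table service on a MOR table, where regular writes are delta commits.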

public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload>
    extends HoodieFlinkTable<T> implements HoodieCompactionHandler<T> {

  public HoodieFlinkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
    super(config, context, metaClient);
  }

  @Override
  public boolean isTableServiceAction(String actionType) {
    return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
  }

  // ActionExecutors for upsert, insert, and delete
  public HoodieWriteMetadata<List<WriteStatus>> upsert(
      HoodieEngineContext context,
      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
      String instantTime,
      List<HoodieRecord<T>> records) {
    return new FlinkUpsertCommitActionExecutor<>(context, writeHandle, config, this, instantTime, records).execute();
  }


  public HoodieWriteMetadata<List<WriteStatus>> insert(
      HoodieEngineContext context,
      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
      String instantTime,
      List<HoodieRecord<T>> records) {
    return new FlinkInsertCommitActionExecutor<>(context, writeHandle, config, this, instantTime, records).execute();
  }

  public HoodieWriteMetadata<List<WriteStatus>> delete(
      HoodieEngineContext context,
      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
      String instantTime,
      List<HoodieKey> keys) {
    return new FlinkDeleteCommitActionExecutor<>(context, writeHandle, config, this, instantTime, keys).execute();
  }

  // Operations on prepped records; where prepped records are used has not been investigated yet
  ......

  @Override
  public HoodieWriteMetadata<List<WriteStatus>> insertOverwriteTable(
      HoodieEngineContext context,
      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
      String instantTime,
      List<HoodieRecord<T>> records) {
    return new FlinkInsertOverwriteTableCommitActionExecutor(context, writeHandle, config, this, instantTime, records).execute();
  }

  // Generate the HoodieCleanerPlan
  @Override
  public Option<HoodieCleanerPlan> scheduleCleaning(HoodieEngineContext context, String instantTime, Option<Map<String, String>> extraMetadata) {
    return new CleanPlanActionExecutor(context, config, this, instantTime, extraMetadata).execute();
  }

  @Override
  public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback,
                                                     boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers) {
    return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish,
        shouldRollbackUsingMarkers).execute();
  }
  // Execute the clean
  @Override
  public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstantTime, boolean skipLocking) {
    return new CleanActionExecutor(context, config, this, cleanInstantTime).execute();
  }

  @Override
  public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant,
                                         boolean deleteInstants, boolean skipLocking) {
    return new CopyOnWriteRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants, skipLocking).execute();
  }


  // -------------------------------------------------------------------------
  //  Used for compaction
  // -------------------------------------------------------------------------
  @Override
  public Iterator<List<WriteStatus>> handleUpdate(
      String instantTime, String partitionPath, String fileId,
      Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile oldDataFile) throws IOException {
    // these are updates
    HoodieMergeHandle upsertHandle = getUpdateHandle(instantTime, partitionPath, fileId, keyToNewRecords, oldDataFile);
    return handleUpdateInternal(upsertHandle, instantTime, fileId);
  }

  protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?,?,?,?> upsertHandle, String instantTime,
                                                             String fileId) throws IOException {
    if (upsertHandle.getOldFilePath() == null) {
      throw new HoodieUpsertException(
          "Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
    } else {
      FlinkMergeHelper.newInstance().runMerge(this, upsertHandle);
    }

    // TODO(vc): This needs to be revisited
    if (upsertHandle.getPartitionPath() == null) {
      LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", "
          + upsertHandle.writeStatuses());
    }

    return Collections.singletonList(upsertHandle.writeStatuses()).iterator();
  }

  protected HoodieMergeHandle getUpdateHandle(String instantTime, String partitionPath, String fileId,
                                              Map<String, HoodieRecord<T>> keyToNewRecords, HoodieBaseFile dataFileToBeMerged) {
    Option<BaseKeyGenerator> keyGeneratorOpt = Option.empty();
    if (!config.populateMetaFields()) {
      try {
        keyGeneratorOpt = Option.of((BaseKeyGenerator) HoodieAvroKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps())));
      } catch (IOException e) {
        throw new HoodieIOException("Only BaseKeyGenerator (or any key generator that extends from BaseKeyGenerator) are supported when meta "
            + "columns are disabled. Please choose the right key generator if you wish to disable meta fields.", e);
      }
    }
    if (requireSortedRecords()) {
      return new HoodieSortedMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
          dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
    } else {
      return new HoodieMergeHandle<>(config, instantTime, this, keyToNewRecords, partitionPath, fileId,
          dataFileToBeMerged, taskContextSupplier, keyGeneratorOpt);
    }
  }

  @Override
  public Iterator<List<WriteStatus>> handleInsert(
      String instantTime, String partitionPath, String fileId,
      Map<String, HoodieRecord<? extends HoodieRecordPayload>> recordMap) {
    HoodieCreateHandle<?, ?, ?, ?> createHandle =
        new HoodieCreateHandle(config, instantTime, this, partitionPath, fileId, recordMap, taskContextSupplier);

    createHandle.write();

    return Collections.singletonList(createHandle.close()).iterator();
  }
}

public class HoodieFlinkMergeOnReadTable<T extends HoodieRecordPayload>
    extends HoodieFlinkCopyOnWriteTable<T> {

  HoodieFlinkMergeOnReadTable(
      HoodieWriteConfig config,
      HoodieEngineContext context,
      HoodieTableMetaClient metaClient) {
    super(config, context, metaClient);
  }

  @Override
  public boolean isTableServiceAction(String actionType) {
    return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
  }

  @Override
  public HoodieWriteMetadata<List<WriteStatus>> upsert(
      HoodieEngineContext context,
      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
      String instantTime,
      List<HoodieRecord<T>> hoodieRecords) {
    ValidationUtils.checkArgument(writeHandle instanceof FlinkAppendHandle,
        "MOR write handle should always be a FlinkAppendHandle");
    // MOR tables use FlinkAppendHandle
    FlinkAppendHandle<?, ?, ?, ?> appendHandle = (FlinkAppendHandle<?, ?, ?, ?>) writeHandle;
    return new FlinkUpsertDeltaCommitActionExecutor<>(context, appendHandle, config, this, instantTime, hoodieRecords).execute();
  }

  @Override
  public HoodieWriteMetadata<List<WriteStatus>> insert(
      HoodieEngineContext context,
      HoodieWriteHandle<?, ?, ?, ?> writeHandle,
      String instantTime,
      List<HoodieRecord<T>> hoodieRecords) {
    if (writeHandle instanceof FlinkAppendHandle) {
      // Insert into a MOR table
      FlinkAppendHandle<?, ?, ?, ?> appendHandle = (FlinkAppendHandle<?, ?, ?, ?>) writeHandle;
      return new FlinkUpsertDeltaCommitActionExecutor<>(context, appendHandle, config, this, instantTime, hoodieRecords).execute();
    } else {
      return super.insert(context, writeHandle, instantTime, hoodieRecords);
    }
  }

  // Compaction and rollback
  @Override
  public Option<HoodieCompactionPlan> scheduleCompaction(
      HoodieEngineContext context,
      String instantTime,
      Option<Map<String, String>> extraMetadata) {
    ScheduleCompactionActionExecutor scheduleCompactionExecutor = new ScheduleCompactionActionExecutor(
        context, config, this, instantTime, extraMetadata,
        new HoodieFlinkMergeOnReadTableCompactor());
    return scheduleCompactionExecutor.execute();
  }

  @Override
  public HoodieWriteMetadata<List<WriteStatus>> compact(
      HoodieEngineContext context, String compactionInstantTime) {
    RunCompactionActionExecutor compactionExecutor = new RunCompactionActionExecutor(
        context, config, this, compactionInstantTime, new HoodieFlinkMergeOnReadTableCompactor(),
        new HoodieFlinkCopyOnWriteTable(config, context, getMetaClient()));
    return convertMetadata(compactionExecutor.execute());
  }

  @Override
  public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context, String instantTime, HoodieInstant instantToRollback,
                                                     boolean skipTimelinePublish, boolean shouldRollbackUsingMarkers) {
    return new BaseRollbackPlanActionExecutor(context, config, this, instantTime, instantToRollback, skipTimelinePublish,
        shouldRollbackUsingMarkers).execute();
  }

  @Override
  public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant,
                                         boolean deleteInstants, boolean skipLocking) {
    return new MergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants,
        skipLocking).execute();
  }
}

BaseActionExecutor


Every operation on a HoodieTable is carried out by an action executor. For Flink, table operations map to executors as follows.

COW table:

upsert                FlinkUpsertCommitActionExecutor
insert                FlinkInsertCommitActionExecutor
delete                FlinkDeleteCommitActionExecutor
insertOverwrite       FlinkInsertOverwriteCommitActionExecutor
insertOverwriteTable  FlinkInsertOverwriteTableCommitActionExecutor
rollback              CopyOnWriteRollbackActionExecutor

MOR table:

upsert / insert       FlinkUpsertDeltaCommitActionExecutor
rollback              MergeOnReadRollbackActionExecutor
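
The mapping above can also be written as a small lookup, which is convenient when tracing a given operation through the code. This is only a sketch: ExecutorLookup and executorFor are hypothetical names, and the maps contain nothing beyond the pairs listed above, so any operation not listed for a table type is rejected instead of guessed.

```java
import java.util.HashMap;
import java.util.Map;

// Lookup-table sketch; the class and method names are hypothetical.
class ExecutorLookup {
    private static final Map<String, String> COW = new HashMap<>();
    private static final Map<String, String> MOR = new HashMap<>();

    static {
        COW.put("upsert", "FlinkUpsertCommitActionExecutor");
        COW.put("insert", "FlinkInsertCommitActionExecutor");
        COW.put("delete", "FlinkDeleteCommitActionExecutor");
        COW.put("insertOverwrite", "FlinkInsertOverwriteCommitActionExecutor");
        COW.put("insertOverwriteTable", "FlinkInsertOverwriteTableCommitActionExecutor");
        COW.put("rollback", "CopyOnWriteRollbackActionExecutor");

        // On MOR tables, upsert and insert share the same delta-commit executor.
        MOR.put("upsert", "FlinkUpsertDeltaCommitActionExecutor");
        MOR.put("insert", "FlinkUpsertDeltaCommitActionExecutor");
        MOR.put("rollback", "MergeOnReadRollbackActionExecutor");
    }

    // Returns the executor class name listed for the given table type and operation.
    static String executorFor(boolean mergeOnRead, String operation) {
        String name = (mergeOnRead ? MOR : COW).get(operation);
        if (name == null) {
            throw new IllegalArgumentException("No executor listed for: " + operation);
        }
        return name;
    }
}
```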

BaseFlinkCommitActionExecutor

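In Flink, data operations on both MOR and COW tables go through BaseFlinkCommitActionExecutor#execute, which calls handleInsert or handleUpdate depending on the type of the HoodieWriteHandle and on the BucketType. (Note how this connects to FlinkMergeHelper and FlinkLazyInsertIterable.)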
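
Before reading the full listing, the routing rule can be reduced to a small decision function. The sketch below is a model under stated assumptions, not the Hudi implementation: SketchBucketType, HandleKind, and UpsertDispatch are hypothetical names, and handle types are represented by an enum instead of real HoodieWriteHandle subclasses. It mirrors two rules from the listing: an instant time of "I" on the record's current location means INSERT, and the handle type takes precedence over the bucket type.

```java
// Hypothetical stand-ins used only to illustrate the routing rule.
enum SketchBucketType { INSERT, UPDATE }

enum HandleKind { CREATE, MERGE, APPEND }

class UpsertDispatch {
    // A location instant time of "I" marks a bucket that has not been written before.
    static SketchBucketType bucketTypeOf(String locationInstantTime) {
        return "I".equals(locationInstantTime) ? SketchBucketType.INSERT : SketchBucketType.UPDATE;
    }

    // Returns the name of the method that would handle the batch.
    static String route(HandleKind handle, SketchBucketType bucketType) {
        if (handle == HandleKind.CREATE) {
            // A create handle always inserts, even if the bucket type says UPDATE.
            return "handleInsert";
        }
        if (handle == HandleKind.MERGE) {
            // A merge handle always updates.
            return "handleUpdate";
        }
        // Otherwise (append handle) the bucket type decides.
        switch (bucketType) {
            case INSERT:
                return "handleInsert";
            case UPDATE:
                return "handleUpdate";
            default:
                throw new AssertionError();
        }
    }
}
```

The CREATE branch ignoring the bucket type corresponds to the case described in the listing's comments, where a record inserted in one mini-batch is updated again within the same checkpoint interval and the bucket is reused.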

BaseFlinkCommitActionExecutor#execute
public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
    HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();

    List<WriteStatus> writeStatuses = new LinkedList<>();
    final HoodieRecord<?> record = inputRecords.get(0);
    final String partitionPath = record.getPartitionPath();
    final String fileId = record.getCurrentLocation().getFileId();
    // An instant time of "I" marks a new bucket (INSERT); a bucket that has been used before is UPDATE
    final BucketType bucketType = record.getCurrentLocation().getInstantTime().equals("I")
        ? BucketType.INSERT
        : BucketType.UPDATE;

    // Delegate to handleUpsertPartition
    handleUpsertPartition(
        instantTime,
        partitionPath,
        fileId,
        bucketType,
        inputRecords.iterator())
        .forEachRemaining(writeStatuses::addAll);

    setUpWriteMetadata(writeStatuses, result);
    return result;
  }

protected Iterator<List<WriteStatus>> handleUpsertPartition(
      String instantTime,
      String partitionPath,
      String fileIdHint,
      BucketType bucketType,
      Iterator recordItr) {
    try {
      if (this.writeHandle instanceof HoodieCreateHandle) {
        // Used for COW table inserts
        // During one checkpoint interval, an insert record could also be updated,
        // for example, for an operation sequence of a record:
        //    I, U,   | U, U
        // - batch1 - | - batch2 -
        // the first batch(batch1) operation triggers an INSERT bucket,
        // the second batch batch2 tries to reuse the same bucket
        // and append instead of UPDATE.
        return handleInsert(fileIdHint, recordItr);
      } else if (this.writeHandle instanceof HoodieMergeHandle) {
        // Used for MOR merges and COW table updates
        return handleUpdate(partitionPath, fileIdHint, recordItr);
      } else {
        // Used for Flink appends to MOR tables
        switch (bucketType) {
          case INSERT:
            // The first record in the bucket is marked I
            return handleInsert(fileIdHint, recordItr);
          case UPDATE:
            // Subsequent records in the bucket are marked U
            return handleUpdate(partitionPath, fileIdHint, recordItr);
          default:
            throw new AssertionError();
        }
      }
    } catch (Throwable t) {
      String msg = "Error upsetting bucketType " + bucketType + " for partition :" + partitionPath;
      LOG.error(msg, t);
      throw new HoodieUpsertException(msg, t);
    }
  }


  // Build a FlinkLazyInsertIterable
 @Override
  public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr)
      throws Exception {
    // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
    if (!recordItr.hasNext()) {
      LOG.info("Empty partition");
      return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
    }

    return new FlinkLazyInsertIterable<>(recordItr, true, config, instantTime, table, idPfx,
        taskContextSupplier, new ExplicitWriteHandleFactory<>(writeHandle));
  }
  /**
   * Used for MOR merges and COW table updates.
   */
  @Override
  public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId,
                                                  Iterator<HoodieRecord<T>> recordItr)
      throws IOException {
    // This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 records
    if (!recordItr.hasNext()) {
      LOG.info("Empty partition with fileId => " + fileId);
      return Collections.singletonList((List<WriteStatus>) Collections.EMPTY_LIST).iterator();
    }
    // These are updates, so the write handle is a HoodieMergeHandle
    HoodieMergeHandle<?, ?, ?, ?> upsertHandle = (HoodieMergeHandle<?, ?, ?, ?>) this.writeHandle;

    return handleUpdateInternal(upsertHandle, fileId);
  }

  protected Iterator<List<WriteStatus>> handleUpdateInternal(HoodieMergeHandle<?, ?, ?, ?> upsertHandle, String fileId)
      throws IOException {
    if (upsertHandle.getOldFilePath() == null) {
      // The old file does not exist
      throw new HoodieUpsertException(
          "Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId);
    } else {
      // Read the parquet data at the merge handle's old file path, feed it to the merge handle, and populate writeStatuses
      FlinkMergeHelper.newInstance().runMerge(table, upsertHandle);
    }

    // TODO(vc): This needs to be revisited
    if (upsertHandle.getPartitionPath() == null) {
      LOG.info("Upsert Handle has partition path as null " + upsertHandle.getOldFilePath() + ", "
          + upsertHandle.writeStatuses());
    }

    return Collections.singletonList(upsertHandle.writeStatuses()).iterator();
  }

BaseFlinkDeltaCommitActionExecutor

handleUpdate and handleInsert for MOR tables.

public abstract class BaseFlinkDeltaCommitActionExecutor<T extends HoodieRecordPayload<T>>
    extends BaseFlinkCommitActionExecutor<T> {

  public BaseFlinkDeltaCommitActionExecutor(HoodieEngineContext context,
                                            FlinkAppendHandle<?, ?, ?, ?> writeHandle,
                                            HoodieWriteConfig config,
                                            HoodieTable table,
                                            String instantTime,
                                            WriteOperationType operationType) {
    super(context, writeHandle, config, table, instantTime, operationType);
  }

  @Override
  public Iterator<List<WriteStatus>> handleUpdate(String partitionPath, String fileId, Iterator<HoodieRecord<T>> recordItr) {
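    // On update, records go straight into the FlinkAppendHandle's buffer, bypassing the producer/consumer pattern
    FlinkAppendHandle appendHandle = (FlinkAppendHandle) writeHandle;
    // The handle already received the records when it was constructed
    // 1. Append the data
    appendHandle.doAppend();
    // 2. Close the handle and collect the write statuses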
    List<WriteStatus> writeStatuses = appendHandle.close();

    return Collections.singletonList(writeStatuses).iterator();
  }

  @Override
  public Iterator<List<WriteStatus>> handleInsert(String idPfx, Iterator<HoodieRecord<T>> recordItr) {
    // Uses the producer/consumer pattern
    return new FlinkLazyInsertIterable<>(recordItr, true, config, instantTime, table,
        idPfx, taskContextSupplier, new ExplicitWriteHandleFactory(writeHandle));
  }
}

HoodieClient

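HoodieClient is the external-facing client for Hudi storage. This section focuses on the upsert flow.

  1. Create the HoodieFlinkTable according to the table type.
  2. Create the WriteHandle from the first record in the buffer.
  3. Call upsert on the HoodieFlinkTable.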
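
The three steps can be sketched as a toy client. Everything in the block is a hypothetical stand-in (UpsertFlowSketch, Table, Handle, and the string results); the real client works with HoodieTable, HoodieWriteHandle, and WriteStatus objects. The sketch only shows the order of the calls and that the handle is derived from the first buffered record.

```java
import java.util.List;

// Toy model of the three-step upsert flow; not the real HoodieFlinkWriteClient.
class UpsertFlowSketch {
    interface Handle {
        String describe();
    }

    interface Table {
        String upsert(Handle handle, List<String> records);
    }

    // Step 1: the table type decides which table (and so which executor) is used.
    static Table createTable(boolean mergeOnRead) {
        String executor = mergeOnRead
            ? "FlinkUpsertDeltaCommitActionExecutor"
            : "FlinkUpsertCommitActionExecutor";
        return (handle, records) ->
            executor + " wrote " + records.size() + " record(s) via " + handle.describe();
    }

    // Step 2: the write handle is built from the first record of the batch.
    static Handle createHandle(String firstRecord) {
        return () -> "handle(" + firstRecord + ")";
    }

    // Step 3: the table's upsert runs with that handle and the whole batch.
    static String upsert(boolean mergeOnRead, List<String> records) {
        Table table = createTable(mergeOnRead);
        Handle handle = createHandle(records.get(0));
        return table.upsert(handle, records);
    }
}
```

Because the handle comes from the first record only, the sketch assumes, as the real flow appears to, that all records in one batch belong to the same bucket.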
HoodieFlinkWriteClient

@Override
public List<WriteStatus> upsert(List<HoodieRecord<T>> records, String instantTime) {
  // Create the HoodieTable
  HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
      getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
  // Validate the upsert schema
  table.validateUpsertSchema();
  // Set the operation type to UPSERT
  preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());

  // Create the write handle from the first record in the buffer.
  // That record carries the fileID and the current instant.
  final HoodieWriteHandle<?, ?, ?, ?> writeHandle = getOrCreateWriteHandle(records.get(0), getConfig(),
      instantTime, table, records.listIterator());

  // Run the upsert on the HoodieTable
  HoodieWriteMetadata<List<WriteStatus>> result = ((HoodieFlinkTable<T>) table).upsert(context, writeHandle, instantTime, records);

  if (result.getIndexLookupDuration().isPresent()) {
    metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
  }
  return postWrite(result, instantTime, table);
}

private HoodieWriteHandle<?, ?, ?, ?> getOrCreateWriteHandle(
    HoodieRecord<T> record,
    HoodieWriteConfig config,
    String instantTime,
    HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
    Iterator<HoodieRecord<T>> recordItr) {

  final HoodieRecordLocation loc = record.getCurrentLocation();
  final String fileID = loc.getFileId();
  final String partitionPath = record.getPartitionPath();
  final boolean insertClustering = config.allowDuplicateInserts();
  // The fileID has been seen before
  if (bucketToHandles.containsKey(fileID)) {
    MiniBatchHandle lastHandle = (MiniBatchHandle) bucketToHandles.get(fileID);
    // A FlinkAppendHandle is never replaced
    if (lastHandle.shouldReplace()) {
      HoodieWriteHandle<?, ?, ?, ?> writeHandle = insertClustering
          ? new FlinkConcatAndReplaceHandle<>(config, instantTime, table, recordItr, partitionPath, fileID,
              table.getTaskContextSupplier(), lastHandle.getWritePath())
          // insertClustering is false
          : new FlinkMergeAndReplaceHandle<>(config, instantTime, table, recordItr, partitionPath, fileID,
              table.getTaskContextSupplier(), lastHandle.getWritePath());

      this.bucketToHandles.put(fileID, writeHandle); // override with new replace handle
      return writeHandle;
    }
  }

  final boolean isDelta = table.getMetaClient().getTableType().equals(HoodieTableType.MERGE_ON_READ);
  final HoodieWriteHandle<?, ?, ?, ?> writeHandle;
  if (isDelta) {
    // MOR tables create a FlinkAppendHandle to append the data to the log file
    writeHandle = new FlinkAppendHandle<>(config, instantTime, table, partitionPath, fileID, recordItr,
        table.getTaskContextSupplier());
  } else if (loc.getInstantTime().equals("I")) {
    // COW table: create
    writeHandle = new FlinkCreateHandle<>(config, instantTime, table, partitionPath,
        fileID, table.getTaskContextSupplier());
  } else {
    // COW table: update
    writeHandle = insertClustering
        ? new FlinkConcatHandle<>(config, instantTime, table, recordItr, partitionPath,
            fileID, table.getTaskContextSupplier())
        // insertClustering is false
        : new FlinkMergeHandle<>(config, instantTime, table, recordItr, partitionPath,
            fileID, table.getTaskContextSupplier());
  }

  this.bucketToHandles.put(fileID, writeHandle);
  return writeHandle;
}
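
The branching in getOrCreateWriteHandle is easier to follow as a pure decision function. The sketch below returns the handle class name instead of constructing a handle. HandleSelector and its parameters are hypothetical. The hasReplaceableHandle flag stands for the condition that bucketToHandles already holds a handle for the fileID and that handle's shouldReplace() returns true, and allowDuplicateInserts corresponds to the insertClustering flag in the listing.

```java
// Decision-table sketch of the handle selection; names are hypothetical.
class HandleSelector {
    static String select(boolean hasReplaceableHandle,
                         boolean mergeOnRead,
                         String locationInstantTime,
                         boolean allowDuplicateInserts) {
        // 1. A cached, replaceable handle for this fileID wins over everything else.
        if (hasReplaceableHandle) {
            return allowDuplicateInserts
                ? "FlinkConcatAndReplaceHandle"
                : "FlinkMergeAndReplaceHandle";
        }
        // 2. MOR tables always append to the log.
        if (mergeOnRead) {
            return "FlinkAppendHandle";
        }
        // 3. COW table, new bucket: create a new base file.
        if ("I".equals(locationInstantTime)) {
            return "FlinkCreateHandle";
        }
        // 4. COW table, existing bucket: concatenate or merge into the old file.
        return allowDuplicateInserts ? "FlinkConcatHandle" : "FlinkMergeHandle";
    }
}
```

For example, a COW table writing to a bucket whose location instant is "I" and with no cached handle yields FlinkCreateHandle. A second mini-batch for the same fileID within the same checkpoint interval would take the first branch instead, provided the cached handle reports that it should be replaced.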

So far, the main classes on the Flink write path have been broken down in call order: HoodieClient -> HoodieTable -> ActionExecutor -> the classes behind the producer/consumer pattern -> writeHandle.
