Flink 使用之 CDC 自定义 DeserializationSchema

Flink 使用介绍相关文档目录

Flink 使用介绍相关文档目录

背景

本篇接Flink 使用之 MySQL CDC。在这篇博客,我们解析CDC数据的时候用的是StringDebeziumDeserializationSchema。实际上它仅仅调用接收到SourceRecordtoString()方法,将结果传递到下游。SourceRecord.toString()返回的内容过多,层级较为复杂,给下游的解析造成了很大的困难。

为了解决这个问题,本篇为大家讲解如何自定义DeserializationSchema。

目标

通常来说,我们关心的CDC内容包含如下:

  • 数据库名
  • 表名
  • 操作类型(增删改)
  • 变更前数据
  • 变更后数据
  • 其他字段(例如变更时间等,根据业务需要添加)

下面例子中,我们手工将SourceRecord解析为一个Map类型。

实现方式

实现自己的DeserializationSchema,我们可以参考官方给出的StringDebeziumDeserializationSchema的源代码。如下所示:

public class StringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
    private static final long serialVersionUID = -3168848963265670603L;

    public StringDebeziumDeserializationSchema() {
    }

    public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
        out.collect(record.toString());
    }

    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

它继承了DebeziumDeserializationSchema,实现了2个方法:

  • deserialize:反序列化的逻辑在此,CDC捕获到的原始数据为SourceRecord,我们将其解析之后,通过Collector收集,即可传递给下游。
  • getProducedType:返回解析之后的数据类型。

我们自己的解析逻辑只需要实现这个接口,在deserialize完成自己的解析逻辑即可。当然,这个例子中仅仅是为了print看起来直观,将转换后的map转换为String后发往下游。实际生产中建议使用Java Bean,Scala Case Class或者JSON格式。

完整的代码和关键点讲解如下所示:

public class CustomDebeziumDeserializer implements DebeziumDeserializationSchema<String> {
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        Map<String, Object> parsedObject = new HashMap<>();

        // 注意,SourceRecord中并没有获取操作类型的方法?;袢〔僮骼嘈托枰饷葱?        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        parsedObject.put("operation", operation.toString());

        // topic返回的内容为:mysql_binlog_source.dbName.tableName,即数据源.数据库名.表名
        // 按照业务使用要求解析即可
        if (null != sourceRecord.topic()) {
            String[] splitTopic = sourceRecord.topic().split("\\.");
            if (splitTopic.length == 3) {
                parsedObject.put("database", splitTopic[1]);
                parsedObject.put("table", splitTopic[2]);
            }
        }

        // value返回sourceRecord中携带的数据,它是一个Struct类型
        // Struct的类型为org.apache.kafka.connect.data.Struct
        Struct value = (Struct) sourceRecord.value();
        // 变更前后的数据位于value这个Struct中,名称分别为before和after
        Struct before = value.getStruct("before");
        Struct after = value.getStruct("after");

        // 对于新增或删除的数据,before和after可能不存在,需要做null检查
        if (null != before) {
            Map<String, Object> beforeMap = new HashMap<>();
            // 获取Struct中包含所有字段名可以使用struct.schema().fields()方法,遍历即可
            Schema beforeSchema = before.schema();
            for (Field field : beforeSchema.fields()) {
                beforeMap.put(field.name(), before.get(field));
            }
            parsedObject.put("before", beforeMap);
        }

        if (null != after) {
            Map<String, Object> afterMap = new HashMap<>();
            Schema afterSchema = after.schema();
            for (Field field : afterSchema.fields()) {
                afterMap.put(field.name(), after.get(field));
            }

            parsedObject.put("after", afterMap);
        }

        // 调用collector的collect方法,将转换后的数据发往下游
        collector.collect(parsedObject.toString());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

最后和Flink 使用之 MySQL CDC类似。编写Flink主程序:

val env = StreamExecutionEnvironment.getExecutionEnvironment

// 使用MySQLSource创建数据源
// 使用自己编写的CustomDebeziumDeserializer替换掉官方提供的StringDebeziumDeserializationSchema
val sourceFunction = MySQLSource.builder().hostname("your-ip").port(3306)
    .databaseList("demo").username("root").password("123456")
    .deserializer(new CustomDebeziumDeserializer).build();

// 单并行度打印,避免输出乱序
env.addSource(sourceFunction).print.setParallelism(1)

env.execute()
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容