Flink 使用介绍相关文档目录
背景
本篇接Flink 使用之 MySQL CDC。在这篇博客,我们解析CDC数据的时候用的是StringDebeziumDeserializationSchema
。实际上它仅仅调用接收到SourceRecord
的toString()
方法,将结果传递到下游。SourceRecord.toString()
返回的内容过多,层级较为复杂,给下游的解析造成了很大的困难。
为了解决这个问题,本篇为大家讲解如何自定义DeserializationSchema
。
目标
通常来说,我们关心的CDC内容包含如下:
- 数据库名
- 表名
- 操作类型(增删改)
- 变更前数据
- 变更后数据
- 其他字段(例如变更时间等,根据业务需要添加)
下面例子中,我们手工将SourceRecord
解析为一个Map类型。
实现方式
实现自己的DeserializationSchema
,我们可以参考官方给出的StringDebeziumDeserializationSchema
的源代码。如下所示:
public class StringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
private static final long serialVersionUID = -3168848963265670603L;
public StringDebeziumDeserializationSchema() {
}
public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
out.collect(record.toString());
}
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
它继承了DebeziumDeserializationSchema
,实现了2个方法:
-
deserialize
:反序列化的逻辑在此,CDC捕获到的原始数据为SourceRecord
,我们将其解析之后,通过Collector收集,即可传递给下游。 -
getProducedType
:返回解析之后的数据类型。
我们自己的解析逻辑只需要实现这个接口,在deserialize
完成自己的解析逻辑即可。当然,这个例子中仅仅是为了print看起来直观,将转换后的map转换为String后发往下游。实际生产中建议使用Java Bean,Scala Case Class或者JSON格式。
完整的代码和关键点讲解如下所示:
public class CustomDebeziumDeserializer implements DebeziumDeserializationSchema<String> {
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
Map<String, Object> parsedObject = new HashMap<>();
// 注意,SourceRecord中并没有获取操作类型的方法?;袢〔僮骼嘈托枰饷葱? Envelope.Operation operation = Envelope.operationFor(sourceRecord);
parsedObject.put("operation", operation.toString());
// topic返回的内容为:mysql_binlog_source.dbName.tableName,即数据源.数据库名.表名
// 按照业务使用要求解析即可
if (null != sourceRecord.topic()) {
String[] splitTopic = sourceRecord.topic().split("\\.");
if (splitTopic.length == 3) {
parsedObject.put("database", splitTopic[1]);
parsedObject.put("table", splitTopic[2]);
}
}
// value返回sourceRecord中携带的数据,它是一个Struct类型
// Struct的类型为org.apache.kafka.connect.data.Struct
Struct value = (Struct) sourceRecord.value();
// 变更前后的数据位于value这个Struct中,名称分别为before和after
Struct before = value.getStruct("before");
Struct after = value.getStruct("after");
// 对于新增或删除的数据,before和after可能不存在,需要做null检查
if (null != before) {
Map<String, Object> beforeMap = new HashMap<>();
// 获取Struct中包含所有字段名可以使用struct.schema().fields()方法,遍历即可
Schema beforeSchema = before.schema();
for (Field field : beforeSchema.fields()) {
beforeMap.put(field.name(), before.get(field));
}
parsedObject.put("before", beforeMap);
}
if (null != after) {
Map<String, Object> afterMap = new HashMap<>();
Schema afterSchema = after.schema();
for (Field field : afterSchema.fields()) {
afterMap.put(field.name(), after.get(field));
}
parsedObject.put("after", afterMap);
}
// 调用collector的collect方法,将转换后的数据发往下游
collector.collect(parsedObject.toString());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
}
最后和Flink 使用之 MySQL CDC类似。编写Flink主程序:
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 使用MySQLSource创建数据源
// 使用自己编写的CustomDebeziumDeserializer替换掉官方提供的StringDebeziumDeserializationSchema
val sourceFunction = MySQLSource.builder().hostname("your-ip").port(3306)
.databaseList("demo").username("root").password("123456")
.deserializer(new CustomDebeziumDeserializer).build();
// 单并行度打印,避免输出乱序
env.addSource(sourceFunction).print.setParallelism(1)
env.execute()