blink hive catalog吐血整理

1.blink在flink的基础上做了大量的优化,其中有两点:

1.1Catalog

在catalog上做了如下修改和优化:

  • 通过引入全新的 ReadableCatalog and ReadableWritableCatalog 接口统一了 Flink 的内部和外部 catalog。Flink 所有的 catalog 会被 TableEnvironment 中的 CatalogManager管理。
  • 实现了两种新的 catalog - FlinkInMemoryCatalog and HiveCatalog。FlinkInMemoryCatalog 会将所有元数据存在内存中。HiveCatalog 会连接 Hive metastore 并桥接 Flink 和 Hive 之间的元数据。目前,这个HiveCatalog 可以提供读取 Hive 元数据的能力,包括数据库(databases),表(tables),表分区(table partitions), 简单的数据类型(simple data types), 表和列的统计信息(table and column stats)。
  • 重新清晰定义了引用目标的层级,即 'mycatalog.mydatabase.mytable'。通过定义默认 catalog 和默认数据库,用户可以将引用层级简单化为 'mytable’。

未来,我们还将加入对更多类型的元数据以及catalog的支持。

1.2Hive兼容性

我们的目标是在元数据(meta data)和数据层将 Flink 和 Hive 对接和打通。

  • 在这个版本上,Flink可以通过上面提到的HiveCatalog读取Hive的metaData。
  • 这个版本实现了HiveTableSource,使得Flink job可以直接读取Hive中普通表和分区表的数据,以及做分区的裁剪。

通过这个版本,用户可以使用Flink SQL读取已有的Hive meta和data,做数据处理。未来我们将在Flink上继续加大对Hive兼容性的支持,包括支持Hive特有的data type,和Hive UDF等等。

2.如何连接hive 源数据

2.1 代码

通过flink-sql连接外部数据源(比如hive),需要写一些代码声明。


image.png

2.2 blink sql-client

image.png

3. 环境准备

3.1安装hadoop

参考https://blog.csdn.net/hubin232/article/details/76769265

 cd /usr/local/Cellar/hadoop/3.1.1/libexec/sbin
 ./start-all.sh //即可启动 hadoop namenode,secondnamenode,datanode,resource mananger组件

3.2 安装hive

mac 环境下brew install hive 即可安装最新版 (需要先装mysql或者一个能连的上mysql也行)

3.3 配置

cd /usr/local/Cellar/hive/3.1.1/libexec/conf
cp hive-default.xml.template hive-site.xml

编辑hive-site.xml及后续参考http://08643.cn/p/5c11073d19d3
安装后,建表,插数据。

image.png

3.4.metastore server开启

注意?。?! 一定要确保hive开启了 metastore server
lsof -i:9083 查询是否开启了。
开启方式有两种

1

/usr/local/Cellar/hive/3.1.1/libexec/hcatalog/sbin/hcat_server.sh start
提示

Started metastore server init, testing if initialized correctly...
Metastore initialized successfully on port[9083].

就说明成功了。

2 hive --service metastore (这个没试过)

lsof -i:9083


image.png

3.5 为什么要开启metastore 呢?

blink catalog架构图


image.png

红框内就是连接的hive metastore,所以需要先开启 hive 的metastore server。

4. blink源码修改

1. sql-client Environment 类

 修改位置:
org.apache.flink.table.client.config.Environment
enrich 方法
enrichedEnv.deployment = DeploymentEntry.enrich(env.deployment, properties);
下方加入
enrichedEnv.catalogs = new HashMap<>(env.catalogs);

这块没有将catalogs复制过去,会导致从环境中读取到的catalogs丢失,用户永远没发定义catalog。

2.hive connector

官方支持的hive版本是2.4,我的是3.1.1,会报错

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.BatchTableSourceFactory' in
the classpath.

Reason: 
The matching factory 'org.apache.flink.streaming.connectors.hive.HiveTableFactory' doesn't support 'bucketing_version'.

所以要在 flink-connector-hive???/p>

在类org.apache.flink.table.catalog.hive.config.HiveTableConfig
加入:
public static final String DEFAULT_TABLE_BUCKETING_VERSION = "bucketing_version";
public static final String DEFAULT_TABLE_COLUMN_STATS_ACCURATE = "column_stats_accurate";
在类org.apache.flink.streaming.connectors.hive.HiveTableFactory 
supportedProperties 方法加入:
properties.add(HiveTableConfig.DEFAULT_TABLE_BUCKETING_VERSION);
properties.add(HiveTableConfig.DEFAULT_TABLE_COLUMN_STATS_ACCURATE);


修改pom


image.png

image.png

否则会报找不到HadoopInputFormatCommonBase 这个类。

3 jar包替换。

代码修改后
在sql-client???code>mvn clean install -Dmaven.test.skip -Dcheckstyle.skip -Drat.ignoreErrors=true
将flink-sql-client-1.5.1.jar包拷到 /apache-flink/build-target/opt/sql-client
在flink-connector-hive模块 mvn clean install -Dmaven.test.skip -Dcheckstyle.skip -Drat.ignoreErrors=true
将flink-connector-hive_2.11-1.5.1.jar拷到/apache-flink/build-target/opt/connectors

5.应用

5.1配置

进入/apache-flink/build-target/bin
cp ../conf/sql-client-default.ymal sql-client-hive.ymal
修改sql-client-hive.ymal
execution配置:
(streaming模式不支持,应该可以通过修改flink-connector-hive模块代码支持。)


image.png

catalogs配置:

image.png

5.2.执行./sql-client.sh embedded -e sql-client-hive.yaml

image.png
image.png

至此就打通blink的 sql-client与 hive源数据了。

总结:在hive环境的配置比较耗时间,blink源码catalog的bug得debug才能发现。过程比较繁琐,但是结果是很好的,通过打通catalog,flink就可以像spark一样读取并处理hive数据了,对于小公司或者数据量不大的情况下,只选用flink技术栈就可以做到批流处理。

最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,172评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,346评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,788评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,299评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,409评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,467评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,476评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,262评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,699评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,994评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,167评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,499评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,149评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,387评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,028评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,055评论 2 352