使用 Spark 读写 HBase 数据

Use Spark to read and write HBase data

启动 hbase

start-hbase.sh

在 HBase 中准备 sample 数据

  • 1、运行 HBase shell
hbase shell
  • 2、创建一个含有 PersonalOffice 列簇的 Contacts
create 'Contacts', 'Personal', 'Office'
  • 3、加载一些样例数据
put 'Contacts', '1000', 'Personal:Name', 'John Dole'
put 'Contacts', '1000', 'Personal:Phone', '1-425-000-0001'
put 'Contacts', '1000', 'Office:Phone', '1-425-000-0002'
put 'Contacts', '1000', 'Office:Address', '1111 San Gabriel Dr.'
put 'Contacts', '8396', 'Personal:Name', 'Calvin Raji'
put 'Contacts', '8396', 'Personal:Phone', '230-555-0191'
put 'Contacts', '8396', 'Office:Phone', '230-555-0191'
put 'Contacts', '8396', 'Office:Address', '5415 San Gabriel Dr.'

运行 Spark Shell 引用 Spark HBase Connector

spark-shell --packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --repositories http://repo.hortonworks.com/content/repositories/releases

这会下载很多 jar 包, 完成后会进入 spark shell 界面, 继续下面的步骤。

定义一个 Catalog 并查询

  • 1)在你的 Spark Shell 中, 运行下面的 import 语句:
import org.apache.spark.sql.{SQLContext, _}
import org.apache.spark.sql.execution.datasources.hbase._
import org.apache.spark.{SparkConf, SparkContext}
import spark.sqlContext.implicits._
  • 2)给 HBase 表中的 Contacts 表定义一个 catalog

a. 给名为 Contacts 的 HBase 表定义一个 catalog
b. 将 rowkey 标识为 key,并将 Spark 中使用的列名映射到 HBase 中使用的列族,列名和列类型
c. 还必须将 r??owkey 详细定义为命名列(rowkey),其具有 rowkey 的特定列族 cf

def catalog = s"""{
     |"table":{"namespace":"default", "name":"Contacts"},
     |"rowkey":"key",
     |"columns":{
     |"rowkey":{"cf":"rowkey", "col":"key", "type":"string"},
     |"officeAddress":{"cf":"Office", "col":"Address", "type":"string"},
     |"officePhone":{"cf":"Office", "col":"Phone", "type":"string"},
     |"personalName":{"cf":"Personal", "col":"Name", "type":"string"},
     |"personalPhone":{"cf":"Personal", "col":"Phone", "type":"string"}
     |}
 |}""".stripMargin
    1. 定义一个方法,在 HBase 中的 Contacts 表周围提供 DataFrame
def withCatalog(cat: String): DataFrame = {
         spark.sqlContext
         .read
         .options(Map(HBaseTableCatalog.tableCatalog->cat))
         .format("org.apache.spark.sql.execution.datasources.hbase")
         .load()
     }
    1. 创建一个 DataFrame 的实例
val df = withCatalog(catalog)
  • 5)查询这个 DataFrame
df.show()
  • 6)你应该看到两行数据:
+------+--------------------+--------------+-------------+--------------+
|rowkey|       officeAddress|   officePhone| personalName| personalPhone|
+------+--------------------+--------------+-------------+--------------+
|  1000|1111 San Gabriel Dr.|1-425-000-0002|    John Dole|1-425-000-0001|
|  8396|5415 San Gabriel Dr.|  230-555-0191|  Calvin Raji|  230-555-0191|
+------+--------------------+--------------+-------------+--------------+
  • 7)注册临时表,以便使用 Spark SQL 查询 HBase 表
df.registerTempTable("contacts")
  • 8)针对 contacts 表发出 SQL 查询:
val query = spark.sqlContext.sql("select personalName, officeAddress from contacts")
query.show()
  • 9)你应该看到这样的结果:
+-------------+--------------------+
| personalName|       officeAddress|
+-------------+--------------------+
|    John Dole|1111 San Gabriel Dr.|
|  Calvin Raji|5415 San Gabriel Dr.|
+-------------+--------------------+

插入新数据

  • 1)要插入新的 Contact 联系人记录,请定义 ContactRecord 类:
case class ContactRecord(
     rowkey: String,
     officeAddress: String,
     officePhone: String,
     personalName: String,
     personalPhone: String
     )
  • 2)创建 ContactRecord 的实例并将其放在一个数组中:
val newContact = ContactRecord("16891", "40 Ellis St.", "674-555-0110", "John Jackson","230-555-0194")

var newData = new Array[ContactRecord](1)
newData(0) = newContact
  • 3)将新数据数组保存到 HBase:
sc.parallelize(newData).toDF.write.options(Map(HBaseTableCatalog.tableCatalog -> catalog,HBaseTableCatalog.newTable -> "5")).format("org.apache.spark.sql.execution.datasources.hbase").save()
  • 4)检查结果:
df.show()
    1. 你应该看到这样的输出:
+------+--------------------+--------------+------------+--------------+
|rowkey|       officeAddress|   officePhone|personalName| personalPhone|
+------+--------------------+--------------+------------+--------------+
|  1000|1111 San Gabriel Dr.|1-425-000-0002|   John Dole|1-425-000-0001|
| 16891|        40 Ellis St.|  674-555-0110|John Jackson|  230-555-0194|
|  8396|5415 San Gabriel Dr.|  230-555-0191| Calvin Raji|  230-555-0191|
+------+--------------------+--------------+------------+--------------+

参考文献

?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,992评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,212评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,535评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,197评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,310评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,383评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,409评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,191评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,621评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,910评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,084评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,763评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,403评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,083评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,318评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,946评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,967评论 2 351

推荐阅读更多精彩内容