The PySpark code in this article is run in Jupyter. My Python environment is Anaconda, with Python 3.7. If you have not set up an environment yet, see my other two articles:
Installing Anaconda on Windows 10
Setting up a PySpark environment on Windows
Anaconda ships with Jupyter, so you can open it and start coding right away.
Without further ado, here is the code.
Creating a SparkContext
from pyspark import SparkConf,SparkContext
# create the Spark context
conf = SparkConf().setMaster("local[*]").setAppName("wordcount")
sc = SparkContext.getOrCreate(conf)
# create an RDD from an in-memory list
data = [1, 2, 3, 4, 5]
rdd = sc.parallelize(data)
print(rdd.collect())
[1, 2, 3, 4, 5]
# print the number of partitions
print(rdd.getNumPartitions())
# repartition into 3 partitions and print the elements grouped by partition
print(rdd.repartition(3).glom().collect())
4
[[], [1, 3], [2, 4, 5]]
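If you want a specific number of partitions from the start, parallelize also accepts a partition count. A minimal sketch (partRdd is my own name, and the exact split per partition may differ):
# create the RDD with an explicit number of partitions
partRdd = sc.parallelize([1, 2, 3, 4, 5], numSlices=2)
print(partRdd.getNumPartitions())   # 2
print(partRdd.glom().collect())     # e.g. [[1, 2], [3, 4, 5]]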
RDD transformation operators
numbersRdd = sc.parallelize(range(1,11))
print(numbersRdd.collect())
# map
mapRdd = numbersRdd.map(lambda x : x*x)
print(mapRdd.collect())
# filter
filterRdd = numbersRdd.filter(lambda x : x%2 == 0)
print(filterRdd.collect())
# flatMap
flatMapRdd = numbersRdd.flatMap(lambda x : (x,x*x))
print(flatMapRdd.collect())
print(flatMapRdd.distinct())  # note: printing an RDD directly only shows its string representation; add .collect() to see the elements
# sample
sampleRdd = numbersRdd.sample(withReplacement = True,fraction = 0.5,seed=10)
print(sampleRdd.collect())
# sortBy
sortedRdd = flatMapRdd.sortBy(keyfunc = lambda x : x,ascending = False)
print(sortedRdd.collect())
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
[2, 4, 6, 8, 10]
[1, 1, 2, 4, 3, 9, 4, 16, 5, 25, 6, 36, 7, 49, 8, 64, 9, 81, 10, 100]
PythonRDD[41] at RDD at PythonRDD.scala:53
[5, 9]
[100, 81, 64, 49, 36, 25, 16, 10, 9, 9, 8, 7, 6, 5, 4, 4, 3, 2, 1, 1]
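Keep in mind that transformations are lazy: nothing runs until an action such as collect(), count() or take() is called. A small illustrative sketch (lazyRdd is my own name):
# only the lineage is recorded here, no job runs yet
lazyRdd = numbersRdd.map(lambda x: x * x)
# each action below triggers an actual computation
print(lazyRdd.count())    # 10
print(lazyRdd.take(3))    # [1, 4, 9]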
# chain several transformations in one job
def odd(x):
    # double odd numbers, leave even numbers unchanged
    if x % 2 == 1:
        return 2 * x
    else:
        return x
resultRdd = numbersRdd.map(lambda x: odd(x)).filter(lambda x: x > 6).distinct()
print(resultRdd.collect())
[8, 10, 14, 18]
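Besides collect(), a few other common actions can be tried on the same result (expected values shown as comments):
print(resultRdd.count())                      # 4
print(resultRdd.reduce(lambda a, b: a + b))   # 8 + 10 + 14 + 18 = 50
print(resultRdd.takeOrdered(2))               # [8, 10]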
Key-value (pair RDD) operators
rdd = sc.parallelize(["Hello hello","Hello New York","Hi Beijing"])
pairRdd = (rdd.flatMap(lambda x : x.split(" "))
.map(lambda word : word.lower())
.map(lambda word : (word,1)))
# groupByKey
groupRdd = pairRdd.groupByKey()
print(groupRdd.collect())
# reduceByKey
reduceRdd = pairRdd.reduceByKey(lambda x,y:x+y)
print(reduceRdd.collect())
# sortByKey
sortedRdd = reduceRdd.sortByKey(ascending = False,keyfunc = lambda x:x)
print(sortedRdd.collect())
# aggregateByKey
zeroValue = 0
seqFunc = lambda a,b : a+b
combFunc = lambda x,y : x+y
aggregateRdd = pairRdd.aggregateByKey(zeroValue,seqFunc,combFunc)
print(aggregateRdd.collect())
# sampleByKey
sampleRdd = pairRdd.sampleByKey(withReplacement=False,fractions={'hello':0.2,'new':0.1,'hi':0.2,'beijing':0.2,'york':0.1},seed = 81)
print(sampleRdd.collect())
[('hello', <pyspark.resultiterable.ResultIterable object at 0x0000014CA57BB448>), ('hi', <pyspark.resultiterable.ResultIterable object at 0x0000014CA57BB1C8>), ('beijing', <pyspark.resultiterable.ResultIterable object at 0x0000014CA578CB88>), ('new', <pyspark.resultiterable.ResultIterable object at 0x0000014CA578C108>), ('york', <pyspark.resultiterable.ResultIterable object at 0x0000014CA5C16848>)]
[('hello', 3), ('hi', 1), ('beijing', 1), ('new', 1), ('york', 1)]
[('york', 1), ('new', 1), ('hi', 1), ('hello', 3), ('beijing', 1)]
[('hello', 3), ('hi', 1), ('beijing', 1), ('new', 1), ('york', 1)]
[('hello', 1), ('york', 1), ('beijing', 1)]
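The groupByKey output above prints ResultIterable objects rather than the grouped values; one way to make them readable is to turn each group into a list with mapValues, for example:
# materialize each group as a plain list so it prints readably
print(groupRdd.mapValues(list).collect())
# e.g. [('hello', [1, 1, 1]), ('hi', [1]), ('beijing', [1]), ('new', [1]), ('york', [1])]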
Operators on two RDDs
rdd1 = sc.parallelize([1,2,3])
rdd2 = sc.parallelize([2,3,4])
# union
unionRdd = rdd1.union(rdd2)
print(unionRdd.collect())
# intersection
intersectionRdd = rdd1.intersection(rdd2)
print(intersectionRdd.collect())
# subtract (set difference)
subtractRdd = rdd1.subtract(rdd2)
print(subtractRdd.collect())
# cartesian (Cartesian product)
cartesianRdd = rdd1.cartesian(rdd2)
print(cartesianRdd.collect())
[1, 2, 3, 2, 3, 4]
[2, 3]
[1]
[(1, 2), (1, 3), (1, 4), (2, 2), (2, 3), (2, 4), (3, 2), (3, 3), (3, 4)]
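Another pairwise operator worth knowing is zip, which pairs elements position by position (both RDDs must have the same number of partitions and the same number of elements per partition); a quick sketch:
# zip pairs the i-th element of rdd1 with the i-th element of rdd2
zipRdd = rdd1.zip(rdd2)
print(zipRdd.collect())   # [(1, 2), (2, 3), (3, 4)]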
rdd1 = sc.parallelize([
("Bob","Jack"),
("Bob","John"),
("lown","jane"),
("Boss","Mary")
])
rdd2 = sc.parallelize([
("Bob",10),
("Boss",7),
("hello",6)
])
# inner join
innerRdd = rdd1.join(rdd2)
print(innerRdd.collect())
# left outer join
leftRdd = rdd1.leftOuterJoin(rdd2)
print(leftRdd.collect())
# right outer join
rightRdd = rdd1.rightOuterJoin(rdd2)
print(rightRdd.collect())
# full outer join
fullRdd = rdd1.fullOuterJoin(rdd2)
print(fullRdd.collect())
[('Boss', ('Mary', 7)), ('Bob', ('Jack', 10)), ('Bob', ('John', 10))]
[('lown', ('jane', None)), ('Boss', ('Mary', 7)), ('Bob', ('Jack', 10)), ('Bob', ('John', 10))]
[('hello', (None, 6)), ('Boss', ('Mary', 7)), ('Bob', ('Jack', 10)), ('Bob', ('John', 10))]
[('lown', ('jane', None)), ('hello', (None, 6)), ('Boss', ('Mary', 7)), ('Bob', ('Jack', 10)), ('Bob', ('John', 10))]
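Closely related to the joins above is cogroup, which, for each key, collects the values from both RDDs instead of producing one row per match; a quick sketch:
# cogroup returns, per key, a pair of iterables (values from rdd1, values from rdd2)
cogroupRdd = rdd1.cogroup(rdd2)
print(cogroupRdd.mapValues(lambda v: (list(v[0]), list(v[1]))).collect())
# e.g. [('Bob', (['Jack', 'John'], [10])), ('Boss', (['Mary'], [7])), ('lown', (['jane'], [])), ('hello', ([], [6]))]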
Word count
# read from the local filesystem
path = "file:///E:/spark-2.4.6-bin-hadoop2.7/data/mllib/sample_fpgrowth.txt"
rdd = sc.textFile(path)
print(rdd.collect())
result = (rdd.flatMap(lambda x : x.split(" "))
.map(lambda word : (word,1))
.repartition(10)
.reduceByKey(lambda a,b: a+b)
)
print(result.collect())
['r z h k p', 'z y x w v u t s', 's x o n r', 'x z y m t s q e', 'z', 'x z y r q t p']
[('u', 1), ('m', 1), ('y', 3), ('k', 1), ('t', 3), ('o', 1), ('n', 1), ('e', 1), ('r', 3), ('h', 1), ('p', 2), ('s', 3), ('x', 4), ('q', 2), ('z', 5), ('w', 1), ('v', 1)]
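To see the most frequent tokens, the counts can be sorted by value; a small follow-up sketch (ties among equal counts may come back in any order):
# sort by count in descending order and take the top 3
print(result.sortBy(lambda kv: kv[1], ascending=False).take(3))   # e.g. [('z', 5), ('x', 4), ('y', 3)]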
Reading from and writing to HDFS
# read an HDFS file into an RDD; if the two HDFS-related config files are not added to the Spark conf, paths default to the local filesystem
path = "hdfs://110.141.77.118:8020/user/sjmt_ml/data/mllib/sample_fpgrowth.txt"
rdd = sc.textFile(path)
# print(rdd.collect())
# word count
result = (rdd.flatMap(lambda x : x.split(" "))
.map(lambda word : (word,1))
.repartition(10)
.reduceByKey(lambda a,b: a+b)
)
from datetime import datetime
result.saveAsTextFile("/user/sjmt_ml/result/wc" + datetime.now().strftime('%Y-%m-%d'))
# mapPartitions: the function receives an iterator over one partition and must return an iterable,
# so we use a generator that yields the partition's elements one by one
def func(partition):
    for item in partition:
        yield item
result.mapPartitions(func).collect()
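To verify what saveAsTextFile wrote, the output directory can be read back with textFile (each line is the string form of a (word, count) tuple); a hedged sketch assuming the same path as above:
# saveAsTextFile writes one part-* file per partition; textFile can read the whole directory
savedPath = "/user/sjmt_ml/result/wc" + datetime.now().strftime('%Y-%m-%d')
print(sc.textFile(savedPath).take(5))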