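Apache Spark with Scala - Hands On with Big Data
Scala is a functional language (it also supports object-oriented programming)
prep
IntelliJ fails when importing the project with the error: scala compiler not enough space
Fix: add the environment variable _JAVA_OPTIONS: -Xmx512M
A "picked up ..." message is then reported, but everything still runs to completion
Create a new Scala worksheet
Scala basic syntax
- values are immutable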
val hello: String = "Hello"
- variables are mutable
- types: Int Boolean Char Double Float Long Byte
println()
println(f"is $a%.3f")
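Like printf: formatted printing. $ marks a variable; the % part sets the format
%05d
pads with leading zeros to 5 digits
println(s"is $a")
inserts a variable
${1+2}
evaluates an expression
Regular expressions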
val theUltimateAnswer: String = "To life, the universe, and everything is 42."
val pattern = """.* ([\d]+).*""".r
val pattern(answerString) = theUltimateAnswer
val answer = answerString.toInt
println(answer)
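The extractor form val pattern(answerString) = ... throws a scala.MatchError when the string does not match the regex. A minimal sketch of a more forgiving variant, assuming a hypothetical helper named extractNumber (not part of the course code), wraps the same pattern in a match that returns an Option:

```scala
// Same regex as in the notes: a run of digits preceded by a space.
val numberPattern = """.* ([\d]+).*""".r

// Hypothetical helper: Some(n) when the text matches, None otherwise.
def extractNumber(text: String): Option[Int] = text match {
  case numberPattern(digits) => Some(digits.toInt)
  case _                     => None
}

val found = extractNumber("To life, the universe, and everything is 42.")
val missing = extractNumber("no digits here")
println(found)   // Some(42)
println(missing) // None
```

Returning an Option pushes the "no match" case to the caller instead of failing at the val definition.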
// VALUES are immutable constants.
val hello: String = "Hola!"
// VARIABLES are mutable
var helloThere: String = hello
helloThere = hello + " There!"
println(helloThere)
val immutableHelloThere = hello + " There"
println(immutableHelloThere)
// Data Types
val numberOne: Int = 1
val truth: Boolean = true
val letterA: Char = 'a'
val pi: Double = 3.14159265
val piSinglePrecision: Float = 3.14159265f
val bigNumber: Long = 123456789
val smallNumber: Byte = 127
println("Here is a mess: " + numberOne + truth + letterA + pi + bigNumber)
println(f"Pi is about $piSinglePrecision%.3f")
println(f"Zero padding on the left: $numberOne%05d")
println(s"I can use the s prefix to use variables like $numberOne $truth $letterA")
println(s"The s prefix isn't limited to variables; I can include any expression. Like ${1+2}")
// Booleans
val isGreater = 1 > 2
val isLesser = 1 < 2
val impossible = isGreater & isLesser
val anotherWay = isGreater || isLesser
val picard: String = "Picard"
val bestCaptain: String = "Picard"
val isBest: Boolean = picard == bestCaptain
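The snippet above combines Booleans with both & and ||. On Booleans, & and | always evaluate both operands, while && and || stop as soon as the result is known. A small sketch that makes the difference visible, using a hypothetical side-effecting function check that is not part of the course code:

```scala
var calls = 0
// Hypothetical helper that records each time it is evaluated.
def check(): Boolean = { calls += 1; true }

val shortCircuit = false && check() // right side is skipped
val callsAfterShortCircuit = calls  // still 0
val eager = false & check()         // right side is evaluated anyway
val callsAfterEager = calls         // now 1

println(s"$shortCircuit $callsAfterShortCircuit $eager $callsAfterEager")
```

For plain comparisons like isGreater and isLesser the result is the same either way; the distinction only matters when the right-hand side is expensive or has side effects.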
Control Flow
// Flow control
// If / else:
if (1 > 3) println("Impossible!") else println("The world makes sense.")
if (1 > 3) {
println("Impossible!")
println("Really?")
} else {
println("The world makes sense.")
println("still.")
}
// Matching
val number = 2
number match {
case 1 => println("One")
case 2 => println("Two")
case 3 => println("Three")
case _ => println("Something else")
}
for (x <- 1 to 4) {
val squared = x * x
println(squared)
}
var x = 10
while (x >= 0) {
println(x)
x -= 1
}
x = 0
do { println(x); x+=1 } while (x <= 10)
// Expressions
{val x = 10; x + 20}
println({val x = 10; x + 20})
Expressions
{val x = 10; x + 20}
A block evaluates to the value of its last expression
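The same rule applies beyond braces: if/else and match are expressions too, so their result can be assigned directly to a val. A short sketch (the names parity, label and blockValue are illustrative only):

```scala
val number = 2

// if/else yields a value, so no mutable variable is needed.
val parity = if (number % 2 == 0) "even" else "odd"

// match is an expression as well.
val label = number match {
  case 1 => "One"
  case 2 => "Two"
  case _ => "Something else"
}

// A block evaluates to its last expression; the inner x is local to the block.
val blockValue = { val x = 10; x + 20 }

println(s"$parity, $label, $blockValue")
```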
Functions
Don't forget the equals sign
No return is needed; the value of the last expression is returned by default
def squareIt(x: Int) : Int = {
x * x
}
Functions can take other functions as parameters
def transformInt(x: Int, f: Int => Int): Int = {
f(x)
}
Pass the name of a function as the argument f
or pass an anonymous function
transformInt(2, cubeIt)
transformInt(3, x => x * x * x)
Full code
// Functions
// format def <function name>(parameter name: type...) : return type = { }
def squareIt(x: Int) : Int = {
x * x
}
def cubeIt(x : Int) : Int = {x * x * x}
println(squareIt(2))
println(cubeIt(3))
def transformInt(x: Int, f: Int => Int): Int = {
f(x)
}
val result = transformInt(2, cubeIt)
println(result)
// Anonymous functions
transformInt(3, x => x * x * x)
transformInt(10, x => x / 2)
transformInt(2, x => {val y = x * 2; y * y}) // multi-statement anonymous function
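The same pattern works for any type, not just Int. A minimal sketch, assuming a hypothetical transformString and shout (neither appears in the course code), showing the three common ways to supply the function argument:

```scala
// Hypothetical string counterpart of transformInt.
def transformString(s: String, f: String => String): String = f(s)

// Hypothetical named function to pass in.
def shout(s: String): String = s.toUpperCase + "!"

val byName = transformString("make it so", shout)            // named function
val byLambda = transformString("engage", s => s.reverse)     // anonymous function
val byPlaceholder = transformString("Picard", _.toLowerCase) // underscore shorthand

println(byName)
println(byLambda)
println(byPlaceholder)
```

The underscore form is just a shorter way to write an anonymous function that uses its parameter once.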
data structures
tuples
elements can have different types
1-based
// Tuples
// Immutable lists
val captainStuff = ("Picard", "Enterprise-D", "NCC-1701-D")
println(captainStuff)
// Refer to the individual fields with a ONE-BASED index
println(captainStuff._1)
println(captainStuff._2)
println(captainStuff._3)
val picardsShip = "Picard" -> "Enterprise-D"
println(picardsShip._2)
val aBunchOfStuff = ("Kirk", 1964, true)
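Besides the ._1 style accessors, a tuple can be taken apart in one step with a pattern on the left-hand side. A short sketch reusing the values from the notes (the names captain, ship, registry, pair and year are illustrative):

```scala
val captainStuff = ("Picard", "Enterprise-D", "NCC-1701-D")

// Destructure all three fields at once.
val (captain, ship, registry) = captainStuff

// The arrow syntax builds an ordinary two-element tuple.
val pair = "Picard" -> "Enterprise-D"

// Each position keeps its own type.
val mixed: (String, Int, Boolean) = ("Kirk", 1964, true)
val year: Int = mixed._2

println(s"$captain commands $ship ($registry)")
```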
lists
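all elements must be the same type
0-based
head: the first element
tail: all remaining elements after the first
map: applies a function to every element of the list
reduce: x receives the running result, y the next element
concatenate lists: ++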
// Lists
// Like a tuple, but more functionality
// Must be of same type
val shipList = List("Enterprise", "Defiant", "Voyager", "Deep Space Nine")
println(shipList(1))
// zero-based
println(shipList.head)
println(shipList.tail)
for (ship <- shipList) {println(ship)}
val backwardShips = shipList.map( (ship: String) => {ship.reverse})
for (ship <- backwardShips) {println(ship)}
// reduce() to combine together all the items in a collection using some function
val numberList = List(1, 2, 3, 4, 5)
val sum = numberList.reduce( (x: Int, y: Int) => x + y)
println(sum)
// filter() removes stuff
val iHateFives = numberList.filter( (x: Int) => x != 5)
val iHateThrees = numberList.filter(_ != 3)
// Concatenate lists
val moreNumbers = List(6,7,8)
val lotsOfNumbers = numberList ++ moreNumbers
val reversed = numberList.reverse
val sorted = reversed.sorted
val lotsOfDuplicates = numberList ++ numberList
val distinctValues = lotsOfDuplicates.distinct
val maxValue = numberList.max
val total = numberList.sum
val hasThree = iHateThrees.contains(3)
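To make the running-result idea behind reduce concrete, the sketch below spells out the order of evaluation and shows two alternatives for the empty-list case, where reduce throws an exception. The names evenSquares, emptySum and maybeSum are illustrative only.

```scala
val numberList = List(1, 2, 3, 4, 5)

// On a List, reduce combines left to right: ((((1 + 2) + 3) + 4) + 5)
val sum = numberList.reduce((x, y) => x + y)

// filter and map chain naturally; _ is shorthand for the single parameter.
val evenSquares = numberList.filter(_ % 2 == 0).map(x => x * x)

// reduce fails on an empty list; foldLeft takes a start value instead,
// and reduceOption returns None when there is nothing to combine.
val emptySum = List.empty[Int].foldLeft(0)(_ + _)
val maybeSum = List.empty[Int].reduceOption(_ + _)

println(s"$sum $evenSquares $emptySum $maybeSum")
```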
Maps
similar to a dictionary
// MAPS
val shipMap = Map("Kirk" -> "Enterprise", "Picard" -> "Enterprise-D", "Sisko" -> "Deep Space Nine", "Janeway" -> "Voyager")
println(shipMap("Janeway"))
println(shipMap.contains("Archer"))
val archersShip = util.Try(shipMap("Archer")) getOrElse "Unknown"
println(archersShip)
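Wrapping the lookup in util.Try works, but Map also has built-in ways to handle a missing key without raising an exception first. A minimal sketch with a shortened version of shipMap:

```scala
val shipMap = Map("Kirk" -> "Enterprise", "Picard" -> "Enterprise-D")

// get returns an Option instead of throwing on a missing key.
val maybeShip: Option[String] = shipMap.get("Archer")

// getOrElse supplies a default in one call.
val archersShip = shipMap.getOrElse("Archer", "Unknown")
val picardsShip = shipMap.getOrElse("Picard", "Unknown")

println(maybeShip)
println(archersShip)
println(picardsShip)
```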
RDD
RDD: Resilient Distributed Dataset
an RDD is made up of rows
Transforming RDDs
- map
- flatMap: one row of the RDD -> multiple rows
- filter
- distinct
- sample
- union, intersection, subtract, cartesian
RDD actions
- collect
- count
- countByValue
- take
- top
- reduce
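Several of these operations have same-named or close counterparts on ordinary Scala collections, which makes their behavior easy to try in a worksheet. The sketch below uses a local List as a stand-in for an RDD: it is an analogy under that assumption, not Spark code, and the sample values are made up.

```scala
val rows = List(3, 1, 3, 2, 5, 1) // made-up sample "rows"
val other = List(5, 6)

// Transformations: each one returns a new collection.
val doubled = rows.map(_ * 2)
val expanded = rows.flatMap(x => List(x, x * 10)) // one row in, two rows out
val big = rows.filter(_ > 2)
val unique = rows.distinct
val combined = rows ++ other       // local counterpart of union
val common = rows.intersect(other) // local counterpart of intersection
val remaining = rows.diff(other)   // close to subtract for this input

// Actions: each one produces a plain result.
val count = rows.size // local counterpart of count
val countByValue = rows.groupBy(identity).map { case (v, vs) => v -> vs.size }
val firstTwo = rows.take(2)
val topTwo = rows.sorted(Ordering[Int].reverse).take(2) // local counterpart of top(2)
val total = rows.reduce(_ + _)

println(countByValue)
```

One real difference to keep in mind: on a local List every step runs immediately, whereas Spark only records transformations and runs them when an action is called.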
Key/Value RDD
val totalsByAge = rdd.map(x => (x, 1))
- reduceByKey()
rdd.reduceByKey((x, y) => x + y)
Adds up all values that share a key. x can be thought of as the current running total, y as the next value
- groupByKey()
- sortByKey()
- keys, values: create an RDD containing only the keys or only the values
- join, rightOuterJoin, leftOuterJoin, cogroup, subtractByKey
- mapValues: applies a function to the values only
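Code walkthrough
Each row of the rdd is a tuple (age, numFriends)
- mapValues: turns each value from numFriends into (numFriends, 1)
- reduceByKey: x and y are both tuples; x is the partial result so far, y is the next tuple not yet folded in. The first elements are summed, and the second elements are summed
- result: (age, (totalFriends, totalInstances))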
val totalsByAge = rdd.mapValues(x => (x, 1)).reduceByKey( (x,y) => (x._1 + y._1, x._2 + y._2))
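The walkthrough can be traced by hand on a few rows. The sketch below reproduces the same two steps on a local List, with groupBy plus reduce standing in for reduceByKey; the (age, numFriends) rows are made up, and the final averagesByAge step is an assumed follow-up that the notes do not show.

```scala
// Made-up (age, numFriends) rows.
val rows = List((33, 385), (33, 2), (55, 221), (40, 465), (55, 100))

// mapValues step: numFriends becomes (numFriends, 1).
val withCounts = rows.map { case (age, numFriends) => (age, (numFriends, 1)) }

// reduceByKey step: per age, sum the first elements and sum the second elements.
val totalsByAge = withCounts.groupBy(_._1).map { case (age, pairs) =>
  age -> pairs.map(_._2).reduce((x, y) => (x._1 + y._1, x._2 + y._2))
}

// Assumed follow-up: average friends per age = total / instances.
val averagesByAge = totalsByAge.map { case (age, (total, count)) =>
  age -> total.toDouble / count
}

println(totalsByAge)
println(averagesByAge)
```

Carrying the count of 1 alongside each value is what lets a single pass produce both the sum and the number of instances needed for an average.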
Filter
Pass a function that returns a Boolean inside the parentheses
val minTemps = parsedLines.filter(x=>x._2 == "TMIN")
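A small sketch of the same predicate on a local List, assuming parsedLines holds (stationID, entryType, temperature) tuples; the rows are made up. The second form names the tuple fields with a pattern instead of using ._2.

```scala
// Made-up (stationID, entryType, temperature) tuples standing in for parsedLines.
val parsedLines = List(("S1", "TMIN", -1.5f), ("S1", "TMAX", 4.0f), ("S2", "TMIN", 0.5f))

// Keep only the rows whose second field is "TMIN".
val minTemps = parsedLines.filter(x => x._2 == "TMIN")

// Same predicate, written with a pattern match on the tuple.
val minTempsAlt = parsedLines.filter { case (_, entryType, _) => entryType == "TMIN" }

println(minTemps)
```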
Map & FlatMap
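map is a one-to-one transformation: one row in, one row out
flatMap is a one-to-many transformation: one row in, zero or more rows out
Regular expression
\\W+ (one or more non-word characters)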
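The difference is easiest to see when splitting lines of text into words. The sketch below uses a local List in place of an RDD (an analogy, with made-up input lines) and the \\W+ pattern from the notes; the word-count step at the end is an assumed illustration.

```scala
val lines = List("Hello, Spark world", "hello again") // made-up input

// map: one result per line, so the output is a list of word lists.
val mapped = lines.map(line => line.split("\\W+").toList)

// flatMap: the per-line results are flattened into one list of words.
val words = lines.flatMap(line => line.split("\\W+").toList)

// Assumed illustration: count occurrences of each lowercased word.
val wordCounts = words.map(_.toLowerCase).groupBy(identity).map { case (w, ws) => w -> ws.size }

println(mapped)
println(words)
println(wordCounts)
```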
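Project summary
RatingsCounter
Data
- Create an sc object (SparkContext) and read the data
- lines is an RDD; each row is a String
- map runs a function on each row, extracting the third column; the result is stored in ratings (an RDD)
- countByValue counts how many times each unique value occurs across all rows; results is a Map[String,Long]
- results.toSeq.sortBy(_._1): converts the Map to a Seq and sorts it by the first element
- Print with foreach(println)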
val sc = new SparkContext("local[*]", "RatingsCounter")
val lines = sc.textFile("data/ml-100k/u.data")
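The remaining steps from the list can be tried without Spark. The sketch below runs them on a local Seq[String], with groupBy standing in for countByValue. It assumes tab-separated fields with the rating in the third column; countRatings is a hypothetical helper and the sample lines are made up.

```scala
// Hypothetical helper: the listed steps on a local Seq[String] instead of an RDD.
def countRatings(lines: Seq[String]): Seq[(String, Long)] = {
  // map step: keep only the third tab-separated field (assumed to be the rating)
  val ratings = lines.map(line => line.split("\t")(2))
  // local stand-in for countByValue: rating -> number of occurrences
  val results = ratings.groupBy(identity).map { case (r, rs) => r -> rs.size.toLong }
  // a Map has no defined order, so convert to a Seq and sort by the rating
  results.toSeq.sortBy(_._1)
}

// Made-up lines in the assumed layout: userID, movieID, rating, timestamp
val sample = Seq("1\t10\t3\t100", "2\t10\t5\t101", "3\t11\t3\t102")
val sortedResults = countRatings(sample)
sortedResults.foreach(println)
```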
MaxTemperatures
parseLine function: takes one line, returns a tuple
val fields = line.split(",")
use .toFloat and .toInt to convert string fields to numbers
rdd.filter(x => x._2 == "TMAX")
reduceByKey
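result = rdd.collect()  Nothing is computed until this point: transformations are lazy, and an action such as collect triggers the work.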
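Putting these notes together, the sketch below walks the parse, filter and reduce-by-key steps on a local List. It is an analogy rather than Spark code: the column layout (stationID, date, entryType, value) is an assumption not confirmed by the notes, the input lines are made up, and groupBy plus reduce stands in for reduceByKey.

```scala
// Assumed column layout: stationID, date, entryType, value
def parseLine(line: String): (String, String, Float) = {
  val fields = line.split(",")
  (fields(0), fields(2), fields(3).toFloat)
}

// Made-up input lines.
val lines = List(
  "S1,18000101,TMAX,-75",
  "S1,18000102,TMAX,-60",
  "S2,18000101,TMIN,-148",
  "S2,18000101,TMAX,-86"
)

// Parse every line, then keep only the TMAX entries.
val maxTemps = lines.map(parseLine).filter(x => x._2 == "TMAX")

// Local stand-in for reduceByKey((x, y) => math.max(x, y)) on (station, temp) pairs.
val maxByStation = maxTemps
  .map(x => (x._1, x._3))
  .groupBy(_._1)
  .map { case (station, temps) => station -> temps.map(_._2).reduce((x, y) => math.max(x, y)) }

println(maxByStation)
```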