前言
本文主要涉及的技术:HDFS、MapReduce、YARN、Avro、Flume、Pig、Crunch、Hive、HBase、Spark、Flink、Beam。每个技术会大概讲一下其架构和功能,让大家对大数据技术体系有个宏观的认识。
2003~2005 Google连续发表3篇论文,用于解决大规模数据存储和处理,依据三篇论文思想,业界开源出了Hadoop相关大数据组件,拉开了大数据技术帷幕。
1. 2003年 GFS:《The Google File System 》(https://blog.csdn.net/xuleicsu/article/details/526386):解决非结构化数据大规模存储
2. 2004年 MapReduce:《MapReduce: Simplified Data Processing on Large Clusters》(https://blog.csdn.net/active1001/article/details/1675920):解决大规模数据处理
3. 2005年 BigTale: 《Bigtable: A Distributed Storage System for Structured Data》(https://blog.csdn.net/accesine960/article/details/595628):解决结构化/半结构化数据存储
1. HDFS
针对GFS论文的开源实现版本
架构:
文件最小存储单位:chunk(文件块),默认128MB
HDFS上文件被划分为多个分块(chunk),作为独立的存储单元。但与面向单一磁盘的文件系统不同的是,HDFS中小于一个块大小的文件不会占据整个块空间(例如:当一个1MB的文件存储在一个128MB的块中时,文件只使用1MB的磁盘空间,而不是128MB)。
Namenode具备主备双节点。当活动的namenode失效,备用的namenode就会接管它的任务并开始服务于来自客户端的请求。
1.1?HDFS Federation
联邦HDFS:每个namenode管理文件系统命名空间的一部分
由于NameNode要管理所有的文件和数据块的引用关系,对于一个超大规模的HDFS集群,Namenode成为瓶颈。联邦HDFS能够支持多个Namenode,对Namenode数据进行分片,减轻每个Namenode存储和请求压力。
例如:一个Namenode可能管理/user目录下的所有文件,另一个Namenode管理/share目录下的所有文件。
1.2 HDFS读取流程
1.3 HDFS写流程?
2. MapReduce
针对MapReduce论文的开源实现版本
MapReduce运行逻辑图如下:(此过程也称为shuffle)
默认:一个chunk(文件块)会启动一个Map任务
当对HDFS上的一个文件运行MapReduce任务时,如果该文件大小是1280MB,那么该文件就会对应10个chunk,默认会启动10个Map任务去处理(并发度可以理解为10),每个Map任务处理一个chunk,并且该Map任务会尽量分配和chunk在相同的机器上,避免文件读取跨网络
Reduce任务数量并非由输入数据大小决定的,相反是独立指定的。默认任务数是1,可以在运行时指定reduce任务数
Map:对数据进行处理,并按照一定分区规则,将处理好的数据,下发给Reduce任务。
Map输出会写到运行Map任务机器本地磁盘上
Reduce:对分区后的数据进行处理输出(一般也输出在HDFS 存储上)
Map任务完成后,它们会使用心跳机制通知她们的application master。因此,application master知道Map输出和主机位置之间的映射关系。reducer中的一个线程定期询问master以便获取Map输出主机的位置,并开始将数据复制到Reduce任务的JVM内中,如果内存放不下,就会溢出写到磁盘中。
2.1 MapReduce运行架构
Hadoop 1.x版本时:
Hadoop 2.x版本时:
通过YARN进行资源分配
相当于将1.x中JobTracker拆分成:ResourceManager、MRAppMaster两个角色。减轻JobTracker压力。ResourceManager只负责资源管理,MRAppMaster:每个MapReduce任务对应一个。不同的MapReduce任务的Master可以在不同机器上,避免一个JobTracker要管理很多个MapReduce任务,造成压力过大。
2.2 wordcount例子
统计一段文本中,每个单词出现的次数。
Map:
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* Mapper类继承Mapper方法
* <LongWritable,Text,Text, IntWritable> 输入的key-value类型&输出的key-value 类型
*/
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
? ? // 重写Mapper父类的map方法,实现对value的操作
? ? //Map端核心业务的处理方法,每输入一行数据会调用一次
? ? @Override
? ? protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
? ? ? ? String string = value.toString();
? ? ? ? String[] word = string.split(" ");
? ? ? ? if (StringUtils.isEmpty(string)){
? ? ? ? ? ? System.out.println("为空");
? ? ? ? }
? ? ? ? // 遍历word数组 封装数据key-value
? ? ? ? for (String s : word) {
? ? ? ? ? ? // 将遍历出来的单词s写出去 传递给redcue处理
? ? ? ? ? ? System.out.println("test s:"+ s );
? ? ? ? ? ? context.write(new Text(s),new IntWritable(1));
? ? ? ? }
? ? }
}
Reduce:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;
import java.io.IOException;
/**
* 继承Redcuer类,重写reduce方法,实现聚合操作
* <Text,Text,Text,Text> 输入的key-value & 输出的key-value
*/
public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
? ? //和Map方法一样,每输入一行数据会调用一次
? ? private Text outk = new Text();
? ? private? IntWritable outv = new IntWritable();
? ? @Override
? ? protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
? ? ? //定义一个变量,实现累加操作
? ? ? ? int total = 0;
? ? ? ? for (IntWritable value : values) {
? ? ? ? ? ? outk.set(key);
? ? ? ? ? ? total += value.get();
? ? ? ? }
? ? ? ? outv.set(total);
? ? ? ? context.write(outk, outv);
? ? }
}
Driver:启动类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.io.Text;
import java.io.IOException;
/**
* MR程序的驱动类:主要用于提交MR任务
*/
public class WordCountDriver {
? ? public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
? ? ? ? // 1- 创建job对象
? ? ? ? Configuration conf = new Configuration();
? ? ? ? Job job = Job.getInstance(conf);
? ? ? ? // 2- 设置Driver驱动类
? ? ? ? job.setJarByClass(WordCountDriver.class);
? ? ? ? // 3- 设置读取文件的输入目录
? ? ? ? FileInputFormat.setInputPaths(job,new Path("data\\word.txt"));
? ? ? ? // 4- 设置Mapper的主类
? ? ? ? job.setMapperClass(WordCountMapper.class);
? ? ? ? // 5- 设置Mapper的输出key-value
? ? ? ? job.setMapOutputKeyClass(Text.class);
? ? ? ? job.setMapOutputValueClass(IntWritable.class);
? ? ? ? // 6- 设置Redcuer的主类
? ? ? ? job.setReducerClass(WordCountReducer.class);
? ? ? ? // 7- 设置Redcuer的输出key-value
? ? ? ? job.setOutputKeyClass(Text.class);
? ? ? ? job.setOutputValueClass(IntWritable.class);
? ? ? ? // 8- 设置文件的输出路径
? ? ? ? FileOutputFormat.setOutputPath(job,new Path("output\\wcoutput2"));
? ? ? ? // 9- 提交job
? ? ? ? boolean flag = job.waitForCompletion(true);
? ? ? ? System.out.println(flag ? 0:1);
? ? }
}
3. YARN
Hadoop集群资源管理系统
负责管理Hadoop上机器资源,以及进行任务和资源分配,同时对相关任务通过container或者cgroup进行cpu、内存等资源的限制,上面提到MapRedce 2.x允许架构就是通过YARN运行的。
同时YARN也是作为SPARK、FLINK等组件运行调度器,有点类似k8s
4. AVRO
序列化框架,普遍用于大数据生态下。
Avro属于Apache Hadoop的一个子项目。 Avro提供两种序列化格式:JSON格式或者Binary格式。Binary格式在空间开销和解析性能方面可以和Protobuf媲美,JSON格式方便测试阶段的调试。 Avro支持的数据类型非常丰富,包括C++语言里面的union类型。Avro支持JSON格式的IDL和类似于Thrift和Protobuf的IDL,这两者之间可以互转。Schema可以在传输数据的同时发送,加上JSON的自我描述属性,这使得Avro非常适合动态类型语言。 Avro在做文件持久化的时候,一般会和Schema一起存储,所以Avro序列化文件自身具有自我描述属性,所以非常适合于做Hive、Pig和MapReduce的持久化数据格式。对于不同版本的Schema,在进行RPC调用的时候,服务端和客户端可以在握手阶段对Schema进行互相确认,大大提高了最终的数据解析速度。
Avro提供了两种序列化和反序列化的方式:一种是通过Schema文件来生成代码的方式,一种是不生成代码的通用方式,这两种方式都需要构建Schema文件。
4.1 示例
通过Avro不生成代码方式,定义一个部门类的序列化。
定义dept.avsc:
{
"namespace":"com.hc.bean",
"type":"record",
"name":"Dept",
"fields":[
? {"name":"deptno","type":"int"},
? {"name":"dname","type":"string"},
? {"name":"loc","type":"string"}
]
}
序列化:
@Test
public void serialize() throws IOException {
? ? Schema schema = new Schema.Parser().parse(new File("src/main/resources/dept.avsc"));
? ? GenericRecord dept = new GenericData.Record(schema);
? ? dept.put("deptno", 90);
? ? dept.put("dname", "ee");
? ? dept.put("loc", "eeeeeeeee");
? ? DatumWriter<GenericRecord> datumWriter = new SpecificDatumWriter<>(schema); //泛型参数为GenericRecord
? ? DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter);
? ? dataFileWriter.create(schema, new File("dept.avro"));
? ? dataFileWriter.append(dept);
? ? dataFileWriter.close();
}
反序列化:
@Test
public void deSerialize() throws IOException {
? ? Schema schema = new Schema.Parser().parse(new File("src/main/resources/dept.avsc"));
? ? File file = new File("dept.avro");
? ? DatumReader<GenericRecord> datumReader = new SpecificDatumReader<GenericRecord>(schema);
? ? DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(file, datumReader);
? ? GenericRecord dept = null;
? ? while(dataFileReader.hasNext()) {
? ? ? ? dept = dataFileReader.next(dept );
? ? ? ? System.out.println(dept );
? ? }
}
4.2 序列化性能对比
详细参考:https://tech.meituan.com/2015/02/26/serialization-vs-deserialization.html
avro-specific:通过二进制形式。? ? ?avro-generic:通过json形式
5.?Flume
Flume是一个分布式、可靠和高可用的海量日志聚合的系统,支持在系统读取各类数据发送方,用于收集数据;同时,flume提供对数据进行简单处理,并写到各种数据接收方(可定制)的能力。
Flume架构:
source:source组件是专门用来收集数据的,可以处理各种类型、各种格式的日志数据,包括avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy、自定义数据等。
channel:source组件把数据收集以后,临时存放在channel中,即channel组件在agent中是专门用来存放临时数据的——对采集到的数据进行简单的缓存,可以存放在memory、jdbc、file等。
sink:sink组件是用于把数据发送到目的地 的组件,目的地包括hdfs、logger、avro、thrift、ipc、file、null、hbse、solr、自定义。
5.1 示例:采集文件到 HDFS
需求:比如业务系统使用log4j生成的日志,日志内容不断增加,需要把追加到日志文件中的数据实时采集到hdfs
分析:根据需求,首先定义以下3大要素
采集源,即source——监控文件内容更新 : exec ‘tail -F file’
下沉目标,即sink——HDFS文件系统 : hdfs sink
Source和sink之间的传递通道——channel,可用file channel 也可以用 内存channel
配置:通过如下配置,启动flume即可完成需求:flume-ng agent -c conf -f conf/tail-file.conf -n agent1 -Dflume.root.logger=INFO,console
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Describe/configure tail -F source1
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -F /export/servers/taillogs/access_log
agent1.sources.source1.channels = channel1
# Describe sink1
agent1.sinks.sink1.type = hdfs
#a1.sinks.k1.channel = c1
agent1.sinks.sink1.hdfs.path = hdfs://node01:8020/weblog/flume-collection/%y-%m-%d/%H-%M
agent1.sinks.sink1.hdfs.filePrefix = access_log
agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
agent1.sinks.sink1.hdfs.batchSize= 100
agent1.sinks.sink1.hdfs.fileType = DataStream
agent1.sinks.sink1.hdfs.writeFormat =Text
agent1.sinks.sink1.hdfs.round = true
agent1.sinks.sink1.hdfs.roundValue = 10
agent1.sinks.sink1.hdfs.roundUnit = minute
agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.keep-alive = 120
agent1.channels.channel1.capacity = 500000
agent1.channels.channel1.transactionCapacity = 600
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
5.2 级联
一个flume采集的数据,可以上报到另一个flume上。右侧的flume可以将数据进行进一步处理、聚合在上报到HDFS上。
6. Pig
针对MapReduce提供了更高层次的抽象,方便MapReduce开发,Apache Pig将这些脚本转换为一系列MapReduce作业,因此,它使程序员的工作变得容易
Apache的Pig最大的作用就是对mapreduce算法(框架)实现了一套shell脚本 ,类似我们通常熟悉的 SQL 语句,在Pig中称之为 Pig Latin?,在这套脚本中我们可以对加载出来的数据进行 排序、过滤、求和、分组(group by)、关联(Joining),Pig也可以由用户自定义一些函数对数据集进行操作,也就是传说中的UDF(user-defined functions)。
7.?Crunch
Apache Crunch是FlumeJava的实现,为不太方便直接开发和使用的MapReduce程序,开发一套MR流水线,具备数据表示模型,提供基础原语和高级原语,根据底层执行引擎对MR Job的执行进行优化。从分布式计算角度看,Crunch提供的许多计算原语,可以在Spark、Hive、Pig等地方找到很多相似之处,而本身的数据读写,序列化处理,分组、排序、聚合的实现,类似MapReduce各阶段的拆分都可以在Hadoop里找到影子。(外在表现类似java版的Pig)
8. Hive
通过HDFS提供结构化存储,并兼容SQL语法查询
架构:
在Hadoop上架了一套Meta store 和Driver,实现结构化数据存储。
MetaStore: 记录元数据,包括:表名、表所属的数据库(默认是default)、表的拥有者、列/分区字段,标的类型(表是否为外部表)、表的数据所在目录。这是数据默认存储在Hive自带的derby数据库中,推荐使用MySQL数据库存储MetaStore。
Driver:将sql转化为对应MapReduce任务(底层也可以用spark实现)
Hive默认在HDFS上存储的是文本文件格式,其中每行存储一个数据行(row),行内通过分隔符Control-A(ASCII码为1)划分每列
8.1 Hive支持文件格式
表示的是Hive底层在HDFS存储的文件格式
a.?Textfile文本格式:Hive的默认格式,数据不压缩,磁盘开销大、数据解析开销大??山岷螱zip、Bzip2使用,但使用Gzip这种方式,hive不会对数据进行切分,从而无法对数据进行并行操作
b. SequenceFile:Hadoop提供的一种二进制文件格式是Hadoop支持的标准文件格式(其他生态系统并不适用),可以直接将对序列化到文件中,所以sequencefile文件不能直接查看,可以通过Hadoop fs -text查看。具有使用方便,可分割,可压缩,可进行切片。压缩支持NONE, RECORD, BLOCK(优先)等格式,可进行切片
c. RCFile:是一种行列存储相结合的存储方式,先将数据按行进行分块再按列式存储,保证同一条记录在一个块上,避免读取多个块,有利于数据压缩和快速进行列存储
d. ORCFile:orcfile是对rcfile的优化,可以提高hive的读写、数据处理性能,提供更高的压缩效率(目前主流选择之一)。列式存储
e. Parquet:一种列格式, 可提供对其他 hadoop 工具的可移植性, 包括Hive, Drill, Impala, Crunch, and Pig
f. Avro:Avro是一个数据序列化系统,设计用于支持大批量数据交换的应用。它的主要特点有:支持二进制序列化方式,可以便捷,快速地处理大量数据;动态语言友好,Avro提供的机制使动态语言可以方便地处理Avro数据。
Hive建表:
CREATE EXTERNAL TABLE IF NOT EXISTS tb_test(
? ? id bigint COMMENT 'id',
? ? name string COMMENT 'name'? ? ?
)
COMMENT 'test table'
PARTITIONED BY (dt string, country STRING) //指定分区方式
STORED AS ORC? ? ? //指定文件格式
tblproperties ('orc.compress'='SNAPPY');? //指定压缩算法
其中分区方式,表示Hive在存储时,会将文件划分多份,放入不同的目录下:
如下图所示,根据dt(日期)、country(国家)划分目录,将数据存入对应文件目录下。做到一个数据分片作用。
9. HBase
BigTable论文开源版本实现:基于HDFS的 key-value 列式存储
关键词:HDFS、Rowkey、Region、LSM、Meta表
10.新一代批处理引擎:Spark
MapReduce任务对于复杂流程编写较困难(需要组合多个MapRedece任务才能实现),同时任务执行过程中涉及大量磁盘IO(比如Map输出),运行速度很慢,所以MapReduce已慢慢被Spark代替。
Spark通过将数据缓存在内存(RDD:弹性分布式数据集),加速处理过程(Spark的处理速度是Hadoop的10~100倍)
从狭义上来看,Spark只是MapReduce的替代方案,大部分应用场景中,它还要依赖于HDFS和HBase来存储数据,依赖于YARN来管理集群和资源。
当然,Spark并不是一定要依附于Hadoop才能生存,它还可以运行在Apache Mesos、Kubernetes、standalone等其他云平台上。
Spark架构:
10.1 workflow模型
Spark通过将一系列操作转换成一个有向无环图(DAG),这样能够在一个任务里实现多个逻辑,并且支持:groupby、Filter、reduce、Map、join等操作,避免了需要多个MapReduce才能实现一个复杂任务。
10.2 RDD
一个RDD可以基本理解DAG 图中对应的一个操作。比如:RDD1对应上图中A,RDD2对应上图中B
RDD是一个只读的有属性的数据集。属性用来描述当前数据集的状态,数据集是由数据的分区(Data)组成,并(由block)映射成真实数据。RDD属性包括名称、分区类型、父RDD指针、数据本地化、数据依赖关系等,主要属性如下:
a. Dependencies 表示的是与其他RDD 的关系
b. SparkContext是所有Spark功能的入口,它代表了与Spark节点的连接,可以用来创建RDD对象以及在节点中的广播变量等。一个线程只有一个SparkContext。SparkConf则是一些参数配置信息。
c. Data 它代表RDD中数据的逻辑结构,每个Partition会映射到某个节点内存或硬盘的一个数据块。
d. Partitioner决定了RDD的分区方式,目前有两种主流的分区方式:Hash partitioner和Range partitioner。Hash,顾名思义就是对数据的Key进行散列分区,Range则是按照Key的排序进行均匀分区。此外我们还可以创建自定义的Partitioner。
e.检查点(Checkpoint):基于RDD的依赖关系,如果任意一个RDD在相应的节点丢失,你只需要从上一步的RDD出发再次计算,便可恢复该RDD。但是,如果一个RDD的依赖链比较长,而且中间又有多个RDD出现故障的话,进行恢复可能会非常耗费时间和计算资源。在计算过程中,对于一些计算过程比较耗时的RDD,我们可以将它缓存至硬盘或HDFS中,标记这个RDD有被检查点处理过,并且清空它的所有依赖关系。同时,给它新建一个依赖于CheckpointRDD的依赖关系,CheckpointRDD可以用来从硬盘中读取RDD和生成新的分区信息。
f.存储级别(Storage Level)是一个枚举类型,用来记录RDD持久化时的存储级别,常用的有以下几个:
MEMORY_ONLY:只缓存在内存中,如果内存空间不够则不缓存多出来的部分。这是RDD存储级别的默认值。
MEMORY_AND_DISK:缓存在内存中,如果空间不够则缓存在硬盘中。
DISK_ONLY:只缓存在硬盘中。
MEMORY_ONLY_2和MEMORY_AND_DISK_2等:与上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本。
g. 迭代函数(Iterator)和计算函数(Compute)是用来表示RDD怎样通过父RDD计算得到的。
RDD并行执行原理:
图中每个RDD下的 p 代表真实数据(上一个图中的data部分),每个 p 代表需要处理数据的一部分,这些 p分布在不同机器上,执行时都是并行执行。
RDD的转换操作:Map、Filter、groupByKey
RDD动作操作:Collect、Reduce、Count
例子:
rdd = sc.parallelize(["b", "a", "c"])
rdd.map(lambda x: (x, 1)).collect() // [('b', 1), ('a', 1), ('c', 1)]
RDD是延迟执行的,只有 动作 操作才能触发RDD执行。转换操作只是帮助构建DAG图,到达 动作操作时,才触发真正执行。
10.3 Spark SQL
在RDD底层的API上封装了一层SQL引擎:负责将SQL转换为对应的RDD API,并对任务进行优化,执行相应任务。
同时Spark SQL暴露了 DataFrame、DataSet API,可以让用户用更高层、简洁API进行开发
10.4 Spark Streaming流式计算
通过微批次实现流式计算:每次获取固定时间间隔数据进行处理
示例:
sc = SparkContext(master, appName)
ssc = StreamingContext(sc, 1)
lines = sc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
10.5?Structured Streaming 流批一体
通过DataFrame API(Saprk SQL)实现流式计算(微批次实现流式计算),做到用同一套API,既可以处理流计算,也可以用于批处理。
words = ... #这个DataFrame代表词语的数据流,schema是 { timestamp: Timestamp, word: String}
windowedCounts = words.groupBy(
? window(words.timestamp, "1 minute", "10 seconds"),
? words.word
).count()
.sort(desc("count"))
.limit(10)
基于词语的生成时间,我们创建了一个窗口长度为1分钟,滑动间隔为10秒的window。然后,把输入的词语表根据window和词语本身聚合起来,并统计每个window内每个词语的数量。之后,再根据词语的数量进行排序,只返回前10的词语。
当我们编写 Spark Streaming 程序的时候,本质上就是要去构造RDD的DAG执行图,然后通过 Spark Engine 运行。这样开发者身上的担子就很重,很多时候要自己想办法去提高程序的处理效率
Structured Streaming提供的DataFrame API就是这么一个相对高level的API,大部分开发者都很熟悉关系型数据库和SQL。这样的数据抽象可以让他们用一套统一的方案去处理批处理和流处理,不用去关心具体的执行细节。而且,DataFrame API是在Spark SQL的引擎上执行的,Spark SQL有非常多的优化功能,比如执行计划优化和内存管理等,所以Structured Streaming的应用程序性能很好。
11. 原生流式计算引擎:Flink
相比spark 微批次实现的流式计算,flink支持原生的流式计算(可以理解为:数据来了就处理),提供更低延迟。
支持:select、groupby、count、map、combine、join、filter、keyby、flatmap、Aggregation等
关键词:窗口、水位线、滑动窗口
11.1 Flink架构
11.2 Flink流批一体
Flink提供的两个核心API就是DataSet API和DataStream API。DataSet代表有界的数据集,而DataStream代表流数据。所以,DataSet API是用来做批处理的,而DataStream API是做流处理的。
在内部,DataSet其实也用Stream表示,静态的有界数据也可以被看作是特殊的流数据,而且DataSet与DataStream可以无缝切换。所以,Flink的核心是DataStream。
wordCount示例:
public class WindowWordCount {
public static void main(String[] args) throws Exception {
? StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
? DataStream> dataStream = env.socketTextStream("localhost", 9999)
.flatMap(new Splitter()).keyBy(0)
.timeWindow(Time.seconds(5))
.sum(1);
? dataStream.print();
? env.execute("Window WordCount");
}
public static class Splitter implements FlatMapFunction> {
? @Override
? public void flatMap(String sentence, Collector> out) {
? ? for (String word: sentence.split(" ")) {
? ? ? out.collect(new Tuple2(word, 1));
? ? }
? }
}
12. 流批一体:Apache Beam
Google在2016年的时候联合了Talend、Data Artisans、Cloudera这些大数据公司,基于Dataflow Model的思想开发出了一套SDK,并贡献给了Apache Software Foundation
很多时候我们不可避免地需要对数据同时进行批处理和流处理。Beam提供了一套统一的API来处理这两种数据处理模式,让我们只需要将注意力专注于在数据处理的算法上,而不用再花时间去对两种数据处理模式上的差异进行维护。
Apache Beam的主要目标是统一批处理和流处理的编程范式,为无限,乱序,web-scale的数据集处理提供简单灵活,功能丰富以及表达能力十分强大的SDK。Apache Beam项目重点在于数据处理的编程范式和接口定义,并不涉及具体执行引擎的实现,Apache Beam希望基于Beam开发的数据处理程序可以执行在任意的分布式计算引擎上。现阶段Apache Beam支持的Runner有近十种,包括了我们很熟悉的Apache Spark和Apache Flink。
随着分布式数据处理不断发展,新的分布式数据处理技术也不断被提出,业界涌现出了越来越多的分布式数据处理框架,从最早的Hadoop MapReduce,到Apache Spark,Apache Storm,以及更近的Apache Flink,Apache Apex等。新的分布式处理框架可能带来的更高的性能,更强大的功能,更低的延迟等,但用户切换到新的分布式处理框架的代价也非常大:需要学习一个新的数据处理框架,并重写所有的业务逻辑。解决这个问题的思路包括两个部分,首先,需要一个编程范式,能够统一,规范分布式数据处理的需求,例如,统一批处理和流处理的需求。其次,生成的分布式数据处理任务应该能够在各个分布式执行引擎上执行,用户可以自由切换分布式数据处理任务的执行引擎与执行环境。Apache Beam正是为了解决以上问题而提出的。
Apache Beam主要由Beam SDK和Beam Runner组成,Beam SDK定义了开发分布式数据处理任务业务逻辑的API接口,生成的的分布式数据处理任务Pipeline交给具体的Beam Runner执行引擎。Apache Beam目前支持的API接口是由Java语言实现的,Python版本的API正在开发之中。Apache Beam支持的底层执行引擎包括Apache Flink,Apache Spark以及Google Cloud Platform,此外Apache Storm,Apache Hadoop,Apache Gearpump等执行引擎的支持也在讨论或开发当中。其基本架构如下图所示:
13. 经典大数据架构
13.1 Lamda架构?
twitter提出
Lambda 架构将数据处理流程整体分成了三层:离线层(Batch Layer),加速层(Speed layer),服务层(Serving layer)
离线层:新数据不断喂给数据系统,期间每个样本都会同时发给离线和加速层。所有进入离线层的数据流都会在数据湖(Data Lake)上进行计算处理。数据湖多会使用基于内存的数据库或 NoSQL 类的永久存储设施,数据存好后离线层使用 MapReduce 或一些机器学习方法对数据进行处理并由此对接下来的内容进行预测。
加速层:加速层会享用离线层事件溯源成果。离线层中的数据处理会涉及增量程序,MapReduce 或机器学习模型的更新,模型会被加速层进一步用来处理新数据。就这样,加速层借助富集过程得到结果,保证服务层请求响应延迟处于低位。而加速层也几乎只处理实时数据,计算负荷较低,延迟有保障。
服务层:从离线层得到离线视图(batch view),加速层得到准实时视图(near-real time view),统一送给服务层。服务层使用这些信息临时应付那些等候的查询。
如果将整个过程用函数方程表示,任意大数据查询都可归结为如下形式
Query=λ(Complete data)=λ(live streaming data)?λ(stored data)Query=λ(Complete data)=λ(live streaming data)?λ(stored data)
方程中所用符号名为 Lambda_(音译:拉姆达)_,Lambda 架构的名字也源自于此。这个方程广为熟悉数据分析轶事的人所知,从中可以看出所有数据相关查询,结合离线历史信息和实时流,Lambda 架构都能处理。
13.2 Kappa架构
LinkedIn提出
Kappa架构相比Lamda架构,去掉了离线层,只保留加速层,这样就不用同时维护2套架构,更加简洁。当需要对数据重新处理时,可以借助数据源(比如:Kafka)的消息回溯功能,将消息回溯到1个月甚至1年前,进行重新处理。
将 Kappa 架构的整体流程转换成功能方程,任意数据查询形式归结为
Query=K(New data)=K(Live streaming data)Query=K(New data)=K(Live streaming data)
从方程可以看出所有查询都能通过加速层对实时数据流的处理得到满足。同时也指出流处理过程发生在 Kappa 架构的加速层中