MapReduce框架原理

最全的MapReduce框架原理,方便以后复习。知识点来自尚硅谷的课程学习。课程链接


一、InputFormat数据输入

1. 切片与MapTask并行度决定机制

数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。实际存储在磁盘上,还是按照HDFS将数据分成一个一个Block进行存储。
MapTask的并行速度决定Map阶段的任务处理并发度,进而影响到整个Job的处理速度。

决定机制原理

  • 当切片大小为100M时,不同节点之间需要数据传输,耗费大量IO,不高效;
  • 当切分时,只针对单个文件进行切分,不考虑文件之间的大小;
  • MapTask的数量由切片数量决定,切片的大小默认是块大小。

2. Job提交流程源码解析

Job提交流程源码解析
waitForCompletion();

// 1 建立连接
connect();
  // 1.1 创建提交Job的代理
  new Cluster(getConfiguration());
  // 1.2 判断是本地yarn还是远程yarn
  initialize(jobTrackAddr, conf);

// 2 提交job
submitter.submitJobInternal(Job.this, cluster)
  // 2.1 创建给集群提交数据的Stag路径
  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  // 2.2 获取jobid,并创建Job路径
  JobID jobId = submitClient.getNewJobID();
  // 2.3 拷贝jar包到集群
  copyAndConfigureFiles(job, submitJobDir);
  rUpLoader.uploadFiles(job, jobSubmitDir);
  // 2.4 计算切片,生成切片规划文件
  writeSplits(job, submitJobDir);
  maps = writeNewSplits(job, jobSubmitDir);
  input.getSplits(job);
  // 2.5 向Stag路径写XML配置文件
  writeConf(conf, submitJobFile);
  conf.writeXml(out);
  // 2.6 提交Job,返回提交状态
  status = submitClint.submitJob(jobId, submitJobDir.toString(), job.getCredentials());

3. FileInputFormat切片机制

  • 根据计算公式,切片大小默认为块大小,本地模式切片大小为32M;
  • 公式中,默认minSize=1,maxSize=Long.MAXValue;
  • 切片大小设置:maxSize调的小于blockSize,则切片会变小。minSize调的比blockSize大,则可以让切片变大;
  • MrAppMaster根据切片数量计算MapTask个数;
  • 获取切片的文件名:
    String name = inputSplit.getPath().getName();
  • 根据文件类型获取切片信息:
    FileSplit inputSplit = (FileSplit) context.getInputSplit();

4. CombineTextInputFormat切片机制

框架默认的TextInputFormat切片机制是对任务按照文件进行切片,当有大量小文件时,就会产生大量的MapTask,处理效率及其低下。

CombineTextFormat用于小文件过多的场景,从逻辑上将多个小文件规划到一个切片中,交给一个MapTask处理。


  • 虚拟存储切片最大值可以任意设置,但是要根据实际的小文件大小来具体设置:
    combineTextFormat.setMaxInputSplitSize(job, 4194304); // 4M
  • 主要分为两个过程:虚拟存储过程和切片过程。
  • 当存储时当文件大于4M,会判断是否比2*4M大。
  • 在实际使用时,需要处理很多小文件时,可以按如下操作,在Driver类中增加如下信息:
// 如果不设置InputFormat,默认是TextInputFormat.class
job.setInputFormatClass(CombineTextInputForamt.class);
// 设置虚拟存储切片最大值
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);

5. FileInputFormat实现类

在运行MapReduce程序时,针对不同格式的输入文件,MapReduce是如何读取这些数据的?

FileInputFormat常见的接口实现类包括:

  • TextInputFormat
  • KeyValueTextInputFormat
  • NLineInputFormat
  • CombineTextInputFormat
  • 自定义的InputFormat
5.1 TextInputFormat

TextinputFormat是默认的FileInputFormat实现类。按行读取每条记录,健是存储该行在整个文件中的起始字节偏移量,LongWritable类型。值是这行的内容,不包括任何终止符(换行,回车等),Text类型。

5.2 KeyValueTextInputFormat

每一行均为一条记录,被分割符分割为key和value,默认的分割符是"\t"??梢栽谇嘀薪猩柚茫?br> conf.set(KeyValueLineRecoredReader,KEY_VALUE_SEPERATOR, "\t");
此时的key是每行排在分割符之前的Text序列。

5.3 NLineInputFormat

如果使用NLineInpurFormat,代表每个MapTask进程处理的InputSplit不再按照Block块去切分,而是按照NLineInputFormat指定的行数N来切分。
输入文件的总行数 \div N=切片数,如果不整除,切片数=商+1。
此时的key-value与TextInputFormat生成的一样??梢栽谇嘀薪猩柚茫?br> NLineInputFormat.setNumLinesPerSplit(job, 3);

5.4 自定义InputFormat

具体步骤:
1)自定义一个类继承FileInputFormat;
2)改写RecordReader,实现自定义读取并封装为key-value;
3)在输出时使用SequenceFileOutPutFormat输出合并文件。

SequenceFile文件是Hadoop用来存储二进制形式的key-value对的文件格式,文件路径+文件名为key,文件内容为value。

自定义InputFormat案例:


实现步骤

代码实现:
WholeFileInputFormat.java

package Inputformat;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import java.io.IOException;

public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {

        WholeRecordReader recordReader = new WholeRecordReader();
        recordReader.initialize(split, context);

        return recordReader;
    }
}

WholeRecordReader.java

package Inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class WholeRecordReader extends RecordReader<Text, BytesWritable> {

    FileSplit split;
    Configuration configuration;
    Text k = new Text();
    BytesWritable v = new BytesWritable();
    boolean isProgress = true;

    // 初始化
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
        this.split = (FileSplit) split;
        configuration = context.getConfiguration();
    }

    // 核心的业务逻辑
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {

        if (isProgress) {
            byte[] buf = new byte[(int) split.getLength()];

            // 1 获取Fs对象
            Path path = split.getPath();
            FileSystem fs = path.getFileSystem(configuration);

            // 2 获取输入流
            FSDataInputStream fis = fs.open(path);

            // 3 拷贝
            IOUtils.readFully(fis, buf, 0, buf.length);

            // 4 封装v
            v.set(buf, 0, buf.length);

            // 5 封装k
            k.set(path.toString());

            // 6 关闭资源
            IOUtils.closeStream(fis);

            isProgress = false;

            return true;
        }

        return false;
    }

    public Text getCurrentKey() throws IOException, InterruptedException {
        return k;
    }

    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return v;
    }

    public float getProgress() throws IOException, InterruptedException {
        return 0;
    }

    public void close() throws IOException {

    }
}

完成相应的Mapper和Reducer类,并在Driver中增加两句:

// 4 设置输入的InputFormat
job.setInputFormatClass(WholeFileInputFormat.class);

// 5 设置输出的OutputFormat
job.setOutputFormatClass(SequenceFileOutputFormat.class);

二、Shuffle机制

Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。

1. Partition分区

分区就是将计算结果按照条件输出到不同文件中。比如按照手机号归属地将不同省份输出到不同文件中。

1.1系统默认的分区是Hash分区:
public class Hashpartitioner<K, V> extends Partitioner<K, V> {
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hasCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
  • 默认分区是根据key的HashCode对ReduceTasks个数取模得到的;
  • 用户没法控制那个Key存储到那个分区。
1.2 自定义Partition分区

自定义步骤:



案例实现:

按照手机号前三位将统计结果写入到不同的文件。

自定义Partition类ProvincePartitioner.java

package Flowsum;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<Text, FlowBean> {

    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {

        // 1 获取手机号前三位
        String prePhoneNum = key.toString().substring(0, 3);

        int partition = 3;
        if ("136".equals(prePhoneNum)) {
            partition = 0;
        }else if ("137".equals(prePhoneNum)) {
            partition = 1;
        }else if ("138".equals(prePhoneNum)) {
            partition = 2;
        }
        
        return partition;
    }
}

编写Mapper和Reducer,在Driver类中增加如下代码:

// 5 设置Partition分区
 job.setPartitionerClass(ProvincePartitioner.class);
 job.setNumReduceTasks(4);
  • 在定义分区时,分区号一定要严格按照顺序从0开始;
  • 在设置分区数时:
    当忘记设置或者设置为1,则最终只会产生一个文件,程序会将所有的分区数据统统写入一个文件;
    当设置的分区数在1和实际分区数之间会报错(IO异常);
    当设置的分区数大于实际分区数,会生成期待的结果,但是会多出超出分区个数个空文件,因为实际运行中,没有对应的数据传给多余的ReduceTask。
1.3 WritableComparable排序

排序是MapReduce中的重要操作。

MapTask和ReduceTask均会对数据按照key进行排序 ,属于Hadoop的默认行为。默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。

MapReduce排序分类:


自定义排序:
bean对象作为key传输,需要实现WritableComparable接口重写compareTo方法,就可以实现排序。

示例编写:

按照电话所属区将统计数据存储到不同的文件中,并在每个文件中实现按总流量的逆序。

重写编写FlowBean.java

package Sort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {
        super();
    }

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        sumFlow = upFlow + downFlow;
    }

    // 核心的比较
    public int compareTo(FlowBean bean) {

        int result;

        if (sumFlow > bean.getSumFlow()) {
            result = -1;
        }else if(sumFlow < bean.getSumFlow()) {
            result = 1;
        }else{
            result = 0;
        }

        return result;
    }

    // 序列化方法
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // 反序列化方法
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }
}
  • 此时,要继承WritableComparable类;
  • 重写compareTo方法,这是排序比较的核心;

编写Mapper类,FlowCountSortMapper.java

package Sort;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FlowCountSortMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    FlowBean k = new FlowBean();
    Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 1 获取一行
        String line = value.toString();

        // 2 拆分
        String[] fields = line.split("\t");

        // 3 封装
        k.setUpFlow(Long.parseLong(fields[1]));   // 流量作为key
        k.setDownFlow(Long.parseLong(fields[2]));
        k.setSumFlow(Long.parseLong(fields[3]));

        v.set(fields[0]);  // 电话号码作为Value

        // 3 写出
        context.write(k, v);
    }
}
  • 需要对数据按照总流量排序,所以Map阶段结束后的key应该是Flowbean对象,value为电话号码.

编写Reducer类,FlowCountSortReducer.java

package Sort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FlowCountSortReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        for (Text value : values) {
            context.write(value, key);
        }
    }
}
  • 排序结束后,生成的数据还是按照电话,流量展示,所以此时输出的key为电话号码,value为FlowBean。

编写Partition类,ProvincePartitioner.java

package Sort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitioner extends Partitioner<FlowBean, Text> {

    public int getPartition(FlowBean flowBean, Text text, int numPartitions) {

        String prePhoneNum = text.toString().substring(0, 3);

        int partition = 3;

        if ("136".equals(prePhoneNum)) {
            partition= 0;
        }else if ("137".equals(prePhoneNum)) {
            partition = 1;
        }else if ("138".equals(prePhoneNum)) {
            partition = 2;
        }

        return partition;
    }
}
  • 在Reducer阶段,需要将不同所属区的数据写入不同文件,此时需要对数据进行Partition分区。

编写驱动类,FlowCountSortDriver.java

package Sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FlowCountSortDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowCountSortDriver.class);
        job.setMapperClass(FlowCountSortMapper.class);
        job.setReducerClass(FlowCountSortReducer.class);

        job.setPartitionerClass(ProvincePartitioner.class);
        job.setNumReduceTasks(4);

        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.waitForCompletion(true);
    }
}
最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,992评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,212评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,535评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,197评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,310评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,383评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,409评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,191评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,621评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,910评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,084评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,763评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,403评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,083评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,318评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,946评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,967评论 2 351