文档章节

大数据教程(9.2)MR内部的shuffle过程详解&combiner的运行机制及代码实现

em_aaron
 em_aaron
发布于 2018/12/04 22:04
字数 1477
阅读 108
收藏 1

        之前的文章已经简单介绍过mapreduce的运作流程,不过其内部的shuffle过程并未深入讲解;本篇博客将分享shuffle的全过程。

        一、mapreduce运作流程长卷图(其中[深]朱红色代表是可以用户自定义的部分,当然它们有默认实现)

        二、shuffle过程中的combiner自定义实现

               首先combiner组件有什么作用呢?它可以减少我们在shuffle归并排序是的次数、reduce阶段处理的数据次数,同时可以有效提供程序的执行效率。

               以下是wordcount使用combiner实现的代码

               (1) maper实现:

package com.empire.hadoop.mr.wccombinerdemo;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * WordcountMapper.java的实现描述: KEYIN: 默认情况下,是mr框架所读到的一行文本的起始偏移量,Long,
 * 但是在hadoop中有自己的更精简的序列化接口,所以不直接用Long,而用LongWritable
 * VALUEIN:默认情况下,是mr框架所读到的一行文本的内容,String,同上,用Text
 * KEYOUT:是用户自定义逻辑处理完成之后输出数据中的key,在此处是单词,String,同上,用Text
 * VALUEOUT:是用户自定义逻辑处理完成之后输出数据中的value,在此处是单词次数,Integer,同上,用IntWritable 类
 * 
 * @author arron 2018年12月4日 下午9:30:09
 */

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * map阶段的业务逻辑就写在自定义的map()方法中 maptask会对每一行输入数据调用一次我们自定义的map()方法
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //将maptask传给我们的文本内容先转换成String
        String line = value.toString();
        //根据空格将这一行切分成单词
        String[] words = line.split(" ");

        //将单词输出为<单词,1>
        for (String word : words) {
            //将单词作为key,将次数1作为value,以便于后续的数据分发,可以根据单词分发,以便于相同单词会到相同的reduce task
            context.write(new Text(word), new IntWritable(1));
        }

    }

}

               (2) reducer实现实现:

package com.empire.hadoop.mr.wccombinerdemo;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * 类 WordcountReducer.java的实现描述:KEYIN, VALUEIN 对应 mapper输出的KEYOUT,VALUEOUT类型对应
 * KEYOUT, VALUEOUT 是自定义reduce逻辑处理结果的输出数据类型 KEYOUT是单词 VLAUEOUT是总次数
 * 
 * @author arron 2018年12月4日 下午9:51:15
 */
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * <angelababy,1><angelababy,1><angelababy,1><angelababy,1><angelababy,1>
     * <hello,1><hello,1><hello,1><hello,1><hello,1><hello,1>
     * <banana,1><banana,1><banana,1><banana,1><banana,1><banana,1>
     * 入参key,是一组相同单词kv对的key
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        int count = 0;
        /*
         * Iterator<IntWritable> iterator = values.iterator();
         * while(iterator.hasNext()){ count += iterator.next().get(); }
         */

        for (IntWritable value : values) {

            count += value.get();
        }

        context.write(key, new IntWritable(count));

    }

}

               (3) combiner实现实现:

package com.empire.hadoop.mr.wccombinerdemo;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * 类 WordcountCombiner.java的实现描述:输如为map的输出
 * 
 * @author arron 2018年12月4日 下午9:29:25
 */
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        int count = 0;
        for (IntWritable v : values) {

            count += v.get();
        }

        context.write(key, new IntWritable(count));

    }

}

               (4) mapreduce主程序驱动类实现:

package com.empire.hadoop.mr.wccombinerdemo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 类 WordcountDriver.java的实现描述:相当于一个yarn集群的客户端 需要在此封装我们的mr程序的相关运行参数,指定jar包
 * 最后提交给yarn
 * 
 * @author arron 2018年12月4日 下午9:29:48
 */
public class WordcountDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        //是否运行为本地模式,就是看这个参数值是否为local,默认就是local
        /* conf.set("mapreduce.framework.name", "local"); */

        //本地模式运行mr程序时,输入输出的数据可以在本地,也可以在hdfs上
        //到底在哪里,就看以下两行配置你用哪行,默认就是file:///
        /* conf.set("fs.defaultFS", "hdfs://mini1:9000/"); */
        /* conf.set("fs.defaultFS", "file:///"); */

        //运行集群模式,就是把程序提交到yarn中去运行
        //要想运行为集群模式,以下3个参数要指定为集群上的值
        /*
         * conf.set("mapreduce.framework.name", "yarn");
         * conf.set("yarn.resourcemanager.hostname", "mini1");
         * conf.set("fs.defaultFS", "hdfs://mini1:9000/");
         */
        Job job = Job.getInstance(conf);

        job.setJar("c:/wc.jar");
        //指定本程序的jar包所在的本地路径
        /* job.setJarByClass(WordcountDriver.class); */

        //指定本业务job要使用的mapper/Reducer业务类
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        //指定mapper输出数据的kv类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //指定最终输出的数据的kv类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //指定需要使用combiner,以及用哪个类作为combiner的逻辑
        /* job.setCombinerClass(WordcountCombiner.class); */
        job.setCombinerClass(WordcountReducer.class);

        //如果不设置InputFormat,它默认用的是TextInputformat.class
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
        CombineTextInputFormat.setMinInputSplitSize(job, 2097152);

        //指定job的输入原始文件所在目录
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //指定job的输出结果所在目录
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //将job中配置的相关参数,以及job所用的java类所在的jar包,提交给yarn去运行
        /* job.submit(); */
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);

    }

}

        三、最后总结

               虽然combiner组件在shuffle阶段使用的话,可以提高程序效率;但是,它有一个使用限制条件,那就是不能影响最后的执行结果;例如:这里讲述一个反例,对多个输入的数进行求平均数,如果此时使用combiner将不能得到正确的结果。       

        最后寄语,以上是博主本次文章的全部内容,如果大家觉得博主的文章还不错,请点赞;如果您对博主其它服务器大数据技术或者博主本人感兴趣,请关注博主博客,并且欢迎随时跟博主沟通交流。

 

© 著作权归作者所有

em_aaron
粉丝 85
博文 133
码字总数 223239
作品 3
黄浦
高级程序员
私信 提问
hadoop 学习笔记:mapreduce框架详解

hadoop 学习笔记:mapreduce框架详解   开始聊mapreduce,mapreduce是hadoop的计算框架,我 学hadoop是从hive开始入手,再到hdfs,当我学习hdfs时候,就感觉到hdfs和mapreduce关系的紧密。...

MR_White
2014/08/28
684
0
MapReduce:详解Shuffle过程

Shuffle过程是MapReduce的核心,也被称为奇迹发生的地方。要想理解MapReduce, Shuffle是必须要了解的。我看过很多相关的资料,但每次看完都云里雾里的绕着,很难理清大致的逻辑,反而越搅越...

pczhangtl
2013/11/24
80
1
MapReduce:详解Shuffle过程

Shuffle过程是MapReduce的核心,也被称为奇迹发生的地方。要想理解MapReduce, Shuffle是必须要了解的。我看过很多相关的资料,但每次看完都云里雾里的绕着,很难理清大致的逻辑,反而越搅越...

MR_White
2014/09/04
1K
0
MapReduce:详解Shuffle过程---map和reduce数据交互的关键

http://langyu.iteye.com/blog/992916 Shuffle描述着数据从map task输出到reduce task输入的这段过程。 个人理解: map执行的结果会保存为本地的一个文件中: 只要map执行 完成,内存中的map...

new_chaos
2018/01/12
32
0
MapReduce: 详解 Shuffle 过程

Shuffle过程是MapReduce的核心,也被称为奇迹发生的地方。要想理解MapReduce, Shuffle是必须要了解的。我看过很多相关的资料,但每次看完都云里雾里的绕着,很难理清大致的逻辑,反而越搅越...

大数据之路
2012/08/20
919
1

没有更多内容

加载失败,请刷新页面

加载更多

微服务架构一直火,为什么服务化要搞懂?

微服务架构,这 5 年左右一直被认可,是软件架构的未来方向。需要大家理解的是,为什么需要服务化。比如微服务架构对企业来说,带来什么价值?有啥弊端? 这里浅谈一下微服务架构,主要还是在...

泥瓦匠BYSocket
40分钟前
3
0
总结:单机与分布式

传统计算方案演变 1、单机并行运算 1,打开数据源 2,统计出有多少个文件。 3,为每个文件执行相同的统计命令 4,等待所有命令执行成功。 5,合并统计后结果输出或执行进一步统计 2、分布式并...

浮躁的码农
51分钟前
5
0
关于怎么解决CENTOS7没有ETH0网卡这个问题

CentOS7系统安装完毕之后,输入ifconfig命令发现没有eth0,不符合我们的习惯。而且也无法远程ssh连接。 1.进入目录/etc/sysconfig/network-scripts/ 2.将文件ifcfg-ens33重命名为ifcfg-eth0;...

无名氏的程序员
57分钟前
5
0
HTML5 Web Storage 存储介绍

Web Storage是HTML5 API提供一个新的重要的特性; 最新的Web Storage草案中提到,在web客户端可用html5 API,以Key-Value形式来进行数据持久存储; 目前主要的浏览器已经支持该功能: 常见的...

前端老手
今天
5
0
安装mxnet出现的错误

我出现下面的错误:是因为我前面的安装步骤都正确,只是这一步出现错误,sudo python setup.py install 其实我看了下我默认的python是3.6,是大于3.5 ,改为sudo python3 setup.py install就...

南桥北木
今天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部