文档章节

MapReduce知识点详解

simpler
 simpler
发布于 2014/05/19 00:25
字数 1761
阅读 8.3K
收藏 5

了解计数器

hadoop计数器:可以让开发人员以全局的视角来审查程序的运行情况以及各项指标,及时做出错误诊断并进行相应处理。

内置计数器(MapReduce相关、文件系统相关和作业调度相关)

也可以通过http://master:50030/jobdetails.jsp查看

对文件内容为

hello you

hello me

的数据进行简单统计的 计数器信息

Counters: 19
   File Output Format Counters
     Bytes Written=19  //reduce输出到hdfs的字节数
   FileSystemCounters
     FILE_BYTES_READ=481
     HDFS_BYTES_READ=38
     FILE_BYTES_WRITTEN=81316
     HDFS_BYTES_WRITTEN=19
   File Input Format Counters
     Bytes Read=19  //map从hdfs读取的字节数
   Map-Reduce Framework
     Map output materialized bytes=49
     Map input records=2    //map读入的记录行数
     Reduce shuffle bytes=0
     Spilled Records=8
     Map output bytes=35
     Total committed heap usage (bytes)=266469376
     SPLIT_RAW_BYTES=105
     Combine input records=0
     Reduce input records=4  //reduce从map端接收的记录行数
     Reduce input groups=3  //reduce函数接收的key数量,即归并后的k2数量
     Combine output records=0
     Reduce output records=3  //reduce输出的记录行数
     Map output records=4  //map输出的记录行数

计数器声明

1.通过枚举声明

context.getCounter(Enum enum)

2.动态声明

context.getCounter(String groupName,String counterName)

计数器操作

counter.setValue(long value);//设置初始值

counter.increment(long incr);//增加计数


Combiners编程

每一个map可能会产生大量的输出,combiner的作用就是在map端对输出先做一次合并,以减少传输到reducer的数据量。

combiner最基本是实现本地key的归并,combiner具有类似本地的reduce功能。

如果不用combiner,那么,所有的结果都是reduce完成,效率会相对低下。使用combiner,先完成的map会在本地聚合,提升速度。

注意:Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。所以从我的想法来看,Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。

排序和分组

1.在map和reduce阶段进行排序时,比较的是k2。v2是不参与排序比较的。如果要想让v2也进行排序,需要把k2和v2组装成新的类,作为k2,才能参与比较。

2.分组时也是按照k2进行比较的。

如下内容 按照第一列升序排列,当第一列相同时,第二列升序排列

3    3

3    1

3    2

2    1

2    2

1    1

-----------------

结果如下

1    1

2    1

2    2

3    1

3    2

3    3

package sort;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import java.net.URI;

import mapreduce.WordCountApp;

import mapreduce.WordCountApp.MyMapper;

import mapreduce.WordCountApp.MyReducer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IOUtils;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**

 * 二次排序Secondary Sort

 * @author wuchao

 *

 */

public class SortApp {

    public static final String INPUT_PATH = "hdfs://hadoop:9000/data";

    public static final String OUT_PATH = "hdfs://hadoop:9000/out";

    

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);

        fileSystem.delete(new Path(OUT_PATH), true);

        

        final Job job = new Job(conf , WordCountApp.class.getSimpleName());

        job.setJarByClass(SortApp.class);

        //1.1

        FileInputFormat.setInputPaths(job, INPUT_PATH);

        job.setInputFormatClass(TextInputFormat.class);

        //1.2

        job.setMapperClass(MyMapper.class);

        job.setMapOutputKeyClass(TwoInt.class);

        job.setMapOutputValueClass(IntWritable.class);

        //1.3

        //1.4 

        //1.5

        

        //2.1

        //2.2

        job.setReducerClass(MyReducer.class);

        job.setOutputKeyClass(IntWritable.class);

        job.setOutputValueClass(IntWritable.class);

        //2.3

        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));

        job.setOutputFormatClass(TextOutputFormat.class);

        

        //把代码提交给JobTracker执行

        job.waitForCompletion(true);

        

        final FSDataInputStream in = fileSystem.open(new Path(OUT_PATH+"/part-r-00000"));

        IOUtils.copyBytes(in, System.out, 1024, true);

    }

    

    public static class MyMapper extends Mapper<LongWritable, Text, TwoInt, IntWritable>{

        

        final TwoInt k2 = new TwoInt();

        final IntWritable v2 = new IntWritable();

        

        protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,TwoInt,IntWritable>.Context context) throws java.io.IOException ,InterruptedException {

            final String[] splited = value.toString().split("\t");

            k2.set(Integer.parseInt(splited[0]), Integer.parseInt(splited[1]));

            v2.set(Integer.parseInt(splited[1]));

            context.write(k2v2);

        };

    }

    //排序后,会产生6个组

    public static class MyReducer extends Reducer<TwoInt, IntWritable, IntWritable, IntWritable>{

        

        final IntWritable k3 = new IntWritable();

        protected void reduce(TwoInt key2, java.lang.Iterable<IntWritable> value2s, org.apache.hadoop.mapreduce.Reducer<TwoInt,IntWritable,IntWritable,IntWritable>.Context context) throws java.io.IOException ,InterruptedException {

            k3.set(key2.first);

            for (IntWritable value2 : value2s) {

                context.write(k3, value2);

            }

        };

    }

}

class TwoInt implements WritableComparable<TwoInt>{

    int first;

    int second;

    

    public void set(int first, int second) {

        this.first = first;

        this.second = second;

    }

    

    @Override

    public void write(DataOutput out) throws IOException {

        out.writeInt(this.first);

        out.writeInt(this.second);

    }

    @Override

    public void readFields(DataInput in) throws IOException {

        this.first = in.readInt();

        this.second = in.readInt();

    }

    /**

     * 先比较first,如果first相同,再比较second

     * @param o

     * @return

     */

    @Override

    public int compareTo(TwoInt o) {

        if(this.first!=o.first) {

            return this.first-o.first;

        } else {

            return this.second - o.second;

        }

        

    }

}


分组测试:

如下内容 按照第一列升序分组,当第一列相同时,第二列升序排列

3    3

3    1

3    2

2    1

2    2

1    1

---------------结果如下

1    1

2    1,2

3    1,2,3

package group;

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import java.net.URI;

import mapreduce.WordCountApp;

import mapreduce.WordCountApp.MyMapper;

import mapreduce.WordCountApp.MyReducer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IOUtils;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.RawComparator;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.io.WritableComparable;

import org.apache.hadoop.io.WritableComparator;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**

 * 二次排序Secondary Sort

 * @author wuchao

 *

 */

public class GroupSort {

    public static final String INPUT_PATH = "hdfs://hadoop:9000/data";

    public static final String OUT_PATH = "hdfs://hadoop:9000/out";

    

    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();

        final FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);

        fileSystem.delete(new Path(OUT_PATH), true);

        

        final Job job = new Job(conf , GroupSort.class.getSimpleName());

        job.setJarByClass(GroupSort.class);

        //1.1

        FileInputFormat.setInputPaths(job, INPUT_PATH);

        job.setInputFormatClass(TextInputFormat.class);

        //1.2

        job.setMapperClass(MyMapper.class);

        job.setMapOutputKeyClass(TwoInt2.class);

        job.setMapOutputValueClass(IntWritable.class);

        //1.3

        //1.4 

        job.setGroupingComparatorClass(GroupingComparator2.class);

        //1.5

        

        //2.1

        //2.2

        job.setReducerClass(MyReducer.class);

        job.setOutputKeyClass(IntWritable.class);

        job.setOutputValueClass(IntWritable.class);

        //2.3

        FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));

        job.setOutputFormatClass(TextOutputFormat.class);

        

        //把代码提交给JobTracker执行

        job.waitForCompletion(true);

        

        final FSDataInputStream in = fileSystem.open(new Path(OUT_PATH+"/part-r-00000"));

        IOUtils.copyBytes(in, System.out, 1024, true);

    }

    

    public static class MyMapper extends Mapper<LongWritable, Text, TwoInt2, IntWritable>{

        

        final TwoInt2 k2 = new TwoInt2();

        final IntWritable v2 = new IntWritable();

        

        protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable,Text,TwoInt2,IntWritable>.Context context) throws java.io.IOException ,InterruptedException {

            final String[] splited = value.toString().split("\t");

            k2.set(Integer.parseInt(splited[0]), Integer.parseInt(splited[1]));

            v2.set(Integer.parseInt(splited[1]));

            context.write(k2v2);

        };

    }

    //分组后,会产生3个组

    public static class MyReducer extends Reducer<TwoInt2, IntWritable, IntWritable, Text>{

        

        final IntWritable k3 = new IntWritable();

        final IntWritable v3 = new IntWritable();

        protected void reduce(TwoInt2 key2, java.lang.Iterable<IntWritable> value2s, org.apache.hadoop.mapreduce.Reducer<TwoInt2,IntWritable,IntWritable,Text>.Context context) throws java.io.IOException ,InterruptedException {

            k3.set(key2.first);

            StringBuffer sb = new StringBuffer();

            for (IntWritable val : value2s) {

                sb.append(val+",");

            }

            if(sb.length()>0){

                sb.deleteCharAt(sb.length()-1);

            }

            context.write(k3new Text(sb.toString()));

        };

    }

}

class GroupingComparator2 implements RawComparator<TwoInt2>{

    @Override

    public int compare(TwoInt2 o1, TwoInt2 o2) {

        return 0;

    }

    /**

     * b1    第一个参与比较的字节数组

     * s1    参与比较的起始字节位置

     * l1    参与比较的字节长度

     * b2    第二个参与比较的字节数组

     * s2    参与比较的起始字节位置

     * l2    参与比较的字节长度

     */

    @Override

    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {

        return WritableComparator.compareBytes(b1, s1, 4, b2, s2, 4);

    }

    

}

class TwoInt2 implements WritableComparable<TwoInt2>{

    int first;

    int second;

    

    public void set(int first, int second) {

        this.first = first;

        this.second = second;

    }

    

    @Override

    public void write(DataOutput out) throws IOException {

        out.writeInt(this.first);

        out.writeInt(this.second);

    }

    @Override

    public void readFields(DataInput in) throws IOException {

        this.first = in.readInt();

        this.second = in.readInt();

    }

    /**

     * 先比较first,如果first相同,再比较second

     * @param o

     * @return

     */

    @Override

    public int compareTo(TwoInt2 o) {

        if(this.first!=o.first) {

            return this.first-o.first;

        }else {

            return this.second - o.second;

        }

    }

}

Shuffle

图片1

Map阶段

1.每个map有一个环形内存缓冲区,用于存储任务的输出。默认大小100MB(io.sort.mb属性),一旦达到阀值0.8(io.sort.spill.percent),一个后台线程把内容写到(spill)磁盘的指定目录(mapred.local.dir)下的新建的一个溢出写文件。

2.写磁盘前,要partition,sort。如果有combiner,combine排序后数据。

3.等最后记录写完,合并全部溢出写文件为一个分区且排序的文件。

Reduce阶段

1.Reducer通过Http方式得到输出文件的分区。

2.TaskTracker为分区文件运行Reduce任务。复制阶段把Map输出复制到Reducer的内存或磁盘。一个Map任务完成,Reduce就开始复制输出。

3.排序阶段合并map输出。然后走Reduce阶段。

hadoop的压缩codec

Codec为压缩,解压缩的算法实现。  
在Hadoop中,codec由CompressionCode的实现来表示

图片2

图片3

图片5

© 著作权归作者所有

下一篇: Hive详细教程
simpler
粉丝 25
博文 44
码字总数 40338
作品 0
成都
程序员
私信 提问
大数据经典学习路线(及供参考)之 一

1.Linux基础和分布式集群技术 学完此阶段可掌握的核心能力: 熟练使用Linux,熟练安装Linux上的软件,了解熟悉负载均衡、高可靠等集群相关概念,搭建互联网高并发、高可靠的服务架构; 学完此...

柯西带你学编程
2018/05/22
0
0
大数据hadoop是什么?初学者如何进行系统学习?

大数据的火爆程度在不断的增加,似乎一个行业不和大数据相对接就会显得很“LOW”。大数据行业的薪资水平越来越高,决定了更多的人纷纷学习大数据,hadoop作为大数据的一个核心自然就是学习的...

hainiubuluo
2018/05/25
0
0
Hadoop编写调试MapReduce程序详解

编程学习,最好的方法还是自己动手,所以这里简单介绍在Hadoop上编写调试一个MapReduce程序。 先说一下我的开发环境,我的操作系统是Centos6.0,Hadoop版本是0.20.2,开发环境是eclipse。在H...

miaosu
2013/03/20
1.2W
3
2014-11-12--Hadoop的基础学习(三)--Hadoop中MapReduce框架入门

1.MapReduce的简单概念 百度百科:MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)"和"Reduce(归约)",和他们的主要思想,都是从函数式编程语言里借来的...

查封炉台
2014/11/16
8.3K
8
第6章-MapReduce的工作机制-笔记

作业的提交 可以只用一行代码来运行一个MapReduce作业: JobClient.runJob(conf)。 作业的调度 Hadoop作业调度演进 1、早期版本的Hadoop使用FIFO调度算法来运行作业 早期版本的Hadoop使用一种...

hiqj
2014/10/16
47
0

没有更多内容

加载失败,请刷新页面

加载更多

Spring AOP-06-切入点类型

切入点是匹配连接点的拦截规则。之前使用的是注解@Pointcut,该注解是AspectJ中的。除了这个注解之外,Spring也提供了其他一些切入点类型: • 静态方法切入点StaticMethodMatcherPointcut •...

moon888
昨天
90
0
Class Loaders in Java

1. Introduction to Class Loaders Class loaders are responsible for loading Java classes during runtime dynamically to the JVM (Java Virtual Machine). Also, they are part of the ......

Ciet
昨天
78
0
以Lazada为例,看电商系统架构演进

什么是Lazada? Lazada 2012年成立于新加坡,是东南亚第一电商,2016年阿里投资10亿美金,2017年完成对lazada的收购。 业务模式上Lazada更偏重自营,类似于亚马逊,自建仓储和为商家提供服务...

春哥大魔王的博客
昨天
60
0
【自用】 Flutter Timer 简单用法

dart: void _startTime() async { _timer = Timer(Duration(seconds: sec), () { fun(xxx,yyy,zzz); }); } @override void dispose() { _timer.cancel()......

Tensor丨思悟
昨天
65
0
计算机实现原理专题--自动化(六)

目前实现的加减法器可以扩充到更高位宽的数值范围,不仅可以实现连续n个数的相加减,还可以实现其中几个数的相加减,并保存相应的结果。但是还不能应对某些场景,例如需要复用中间的临时结果...

FAT_mt
昨天
58
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部