文档章节

Hadoop Outline Part 8 (MapReduce Features)

woodo
 woodo
发布于 2014/10/29 23:02
字数 2109
阅读 38
收藏 0

1. Counter
1.1 内建计数器(Built-in Counters)

1.11 Task counters
每一个Built-in计数器组要么包含一个task counters (for task progresses) 或者是job counters (for progresses).
Task Counter在每一个map或reduce中收集,定期的发给task Tracker, 然后发到job tracker.
MapReduce task counters
Filesystem counters
FileInputFormat counters
FileOutputFormat counters
Job counters

1.2. 自定义计数器(User-Defined Counters)

context.getCounter(Temperature.MISSING).increment(1);

1.2.1 动态计数器(Dynamic counters)

public void incrCounter(String group, String counter, long amount)

1.2.2 Readable counter names

配置一个property文件,放在counter所在类的同级目录.
命名方法: 使用下划线分隔不同的类,比如 MyWordCount_BadRecords.properties
本地化居然也可以:MyWordCount_BadRecords_zh_CN.properties

CounterGroupName=Air Temperature Records
MISSING.name=Missing
MALFORMED.name=Malformed

1.2.3 Retrieving counters

job激活数,mapred.jobtracker.completeuserjobs.maximum,默认100,超过的会被清除掉,所以job有可能空

Cluster cluster = new Cluster(getConf());
Job job = cluster.getJob(JobID.forName(jobID));

Counters counters = job.getCounters();
long missing = counters.findCounter(MaxTemperatureWithCounters.Temperature.MISSING).getValue();
long total = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();

1.3 User-Defined Streaming Counters

ignore for Streaming topic.

2. Sorting 

2.1 Partial Sort

我们知道shuffle会对key进行排序,然后在产生的每个partitionzhong,key都是排序的,只不过这个排序不是全局的,所以叫Partial Sort. 如果数据集比较小,可以使用merge和sort命令,如果大的结果集,请看下面Total Sort.
当然了,如果结果写入SequenceFile或MapFile,MR提供了Sort方法进行排序和索引。

 对于一个job,对sort的控制有如下步骤:
 1.job.setSortComparatorClass(),这个可以提供一个类来特化Key的排序规则。
 2.如果上面1没设置,key必须是WritableComparable的子类,因为必须实现compareTo方法。
 3.如果2种的类,没有注册RawComparator,那么RawComparator将使用Ser/De来构造对象,然后调用WritableComparable的compareTo方法。这当然是效率低的,所以实现自己的RawComparator
对效率提升很有必要,尤其是大对象。

2.2 Total Sort

对于一个Total Sort的需求,一个Naive的方法是只指定一个Reduce,这样自然是全局排序的.但是如果结果集比较大,则applicationMaster将负担过重,失去了分布式的优势。

既然Partitioner会造成Partial sort,那么如果,我们能够让Partitioner告诉MapReduce,什么样的值必须去某个Reduce,那不就是Total Sort了吗?

的确,它是可行的。我给这种方法取了名字,叫Ranged Partitioning。
实践中,带来一个问题,就是这个Range的boundary不好取,你没法保证range的均匀性。不均匀带来一个比较不好的性能。

当然全部浏览数据也是不现实的,可行的办法是做Sampling,来预测一个较好的数值分布。好消息是,Hadoop已经提供InputSampler.Sampler接口,和一些有用的实现. 这里我隆重介绍InputSampler和TotalSortPartitioner.

2.2.1 InputSampler

InputSampler类的结构如下,通过集成Sampler Interface的三种实现(SplitSampler,RandomSampler和IntervalSampler),当然你也可以自己写自己的Sampler.

/**
 * Utility for collecting samples and writing a partition file for
 * {@link  org.apache.hadoop.mapred.lib.TotalOrderPartitioner}.
 */
public class InputSampler<K,V> implements Tool {
  /**
   *采样器接口
   */
  public interface Sampler<K,V> {
    /**
     * 从输入数据几种获得一个数据采样的子集,然后通过这些采样数据在Map端由
     * TotalOrderPartitioner对处理数据做hash分组,以保证不同Reduce处理数据的有序性。
     * 该方法的具体采样逻辑由继承类实现。
     * For a given job, collect and return a subset of the keys from the
     * input data.
     */
    K[] getSample(InputFormat<K,V> inf, JobConf job) throws IOException;
  }
  /**
   * 分片数据采样器,即从N个分片中采样,效率最高
   * Samples the first n records from s splits.
   * Inexpensive way to sample random data.
   */
  public static class SplitSampler<K,V> implements Sampler<K,V> {
     ...
  }
  /**
   * 通用的随机数据采样器,按一定的频率对所有数据做随机采样,效率很低,
   * 但是对于分布不可预测的数据可能效果比较好.
   * Sample from random points in the input.
   * General-purpose sampler. Takes numSamples / maxSplitsSampled inputs from
   * each split.
   */
  public static class RandomSampler<K,V> implements Sampler<K,V> {
    ...
  }
  /**
   * 有固定采样间隔的数据采样器,适合有序的数据集,效率较随机数据采样器要好一些
   * Sample from s splits at regular intervals.Useful for sorted data.
   */
  public static class IntervalSampler<K,V> implements Sampler<K,V> {
  }
}


我们先不管Sampler的具体抽样细节,从外围看都是产生一个系列的K[] = getSample(),这个key数组提供给TotalSortPartitioner来做Boundary。Sampler一般都是获取所需要的splits进行split级别的sampling,然后在record级别进行sampling. 当然不同的sampling策略,会影响sampling的效果和数据分布.这也是上面三个不同实现的重点部分.

题记: 这里发点负能量的评论. 看hadoop看到这个sampler,我真心觉得有点蛋疼:
1. 这个sampler是基于mapper输入的,而不是基于输出,这也是为什么很多人抱怨的原因,这尼玛怎么这么stupid呢,难道不是输出才需要作partitioning吗?
2. 这个sampler是基于本地的,也就是有网络io+磁盘io的,居然没有map-reduce版本的sampling吗,开始写的时候,没有灵活设计吗?
3. 我用cdh5.1测试,发现_partition.lst不能产生,产生也不能产生预想的结果。在这里浪费了几天时间,无奈暂时撤兵,后来再看。

对这个部分的实现不敢恭维,以后没人改,我写一个看看。。


2.2.2 TotalSortPartitioner

TotalOrderPartitioner依赖于一个partition file来distribute keys,partition file是一个实现计算好的sequence file,如果我们设置的reducer number是N,那么这个文件包含(N-1)个key分割点,并且是基于key comparator排好序的。TotalOrderPartitioner会检查每一个key属于哪一个reducer的范围内,然后决定分发给哪一个reducer。 

2.3 Secondary Sort

所谓Secondary Sort,就是对于k-v有多个排序纬度,比如wordcount中,第一排序字段是count倒序,第二排序字段是word字母顺序,这样的需求无法以来单一key的shuffle自然排序来完成,所以必须建立自定义的WritableComparable,并依次提供Partitioner,GroupingSortComparator,Sort:
Partitioner,可以使需要grouping的kv输出到同一个Reducer

- Grouping, setGroupingComparatorClass

- Sort, setSortComparatorClass

3. Joins

3.1 Map Side Join

所谓MapSide Join简单而言就是Join数据连接操作发生在Map阶段,一般没有Reducer,也就是说没有Shuffle过程,其自然是非常高效的.这里有一个概念Side Data,是说这种类型的Join,适合一个大数据集和一个小数据集的连接,其中小数据集小道比较容易存放在一个机器的内存中.那么这个小数据集称为SideData.
实践中,有这么几种方法来存储SideData:
- Configuration参数存储,如果SideData是非常小的,可以使用序列化+压缩的办法,把数据存储在Job配置中。缺点是:这种办法会显著增加jobtraker和tasktracker进程的内存,浪费读取时间。
- Distribute Cache存储,简单说就是把数据存储在某固定存储上,一般用HDFS文件存储,HBase,MongoDB等。特点是在Mapper的setup节点,读取这些文件。Hadoop特别提供了-file,可以以逗号添加多个文件,可以来自于本地,hdfs,S3等。


问题: 如果是大文件,无法在单机内存加载怎么办呢?书上推荐MapFile格式存储,可以基于Map特征进行 FileAPI 级别的检索数据,只加载命中数据。HBase等也是一个选择,如果有的话。

3.2 Reduce Side Join

所谓ReduceSide Join是Join发生在Reduce阶段,其模型是比较简单的,过程如下:
  a. Map输入,这里一般多个Input,无论是否使用MutlipleInput或CompositeInputFormat,总之,Join的On字段,将作为key输出。
  如:k1->v1, k1->v2
  b. Reduce合并,把同一个key的Values,Join在一起:
  如:k1,v1,v2
  c. 笛卡尔乘积, 上面的如果是1-1,那么OK。如果是1-n或者m-n怎么处理呢?答案是进行Value和文件名的关联,这样在Reduce的时候,可以进行m*n的笛卡尔乘积。为了避免长文件名编入PairValue,可以对文件名做Hash值,或者做自然数全局索引。
  原理如下:file1 join file2, map产出:    k1->Pair<f1,v1>, k1->Pair<f2,v2> and k1->Pair<f2,v3>, 那么Reduce时,比较简单做出v1-v2,v1-v3的join处理。

3.3. Side Data Distribution

见MapSide Join

3.4. MapReduce Library Classes

- ChainMapper+ChainReducer,M(Ms)+R(Ms)
- FieldSelectionMapper and FieldSelectionReducer (new API)
- IntSumReducer,LongSumReducer

- MultithreadedMapper (new API)

- TokenCounterMapper
- RegexMapper

© 著作权归作者所有

woodo
粉丝 5
博文 57
码字总数 32118
作品 0
朝阳
高级程序员
私信 提问
MapReduce和Spark的区别

性能: Spark是在内存中处理数据的,而MapReduce是通过map和reduce操作在磁盘中处理数据,所以正常情况下Spark的处理速度会比mapreduce快。但是当数据量大,不能一次性加载到内存的时候,Spa...

无精疯
04/15
106
0
阿里云 E-MapReduce产品优势及使用场景

E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 产品优势: 与自建集...

凹凹凸曼
2018/09/12
0
0
如何分布式运行mapreduce程序

如何分布式运行mapreduce程序 一、 首先要知道此前提 若在windows的Eclipse工程中直接启动mapreduc程序,需要先把hadoop集群的配置目录下的xml都拷贝到src目录下,让程序自动读取集群的地址后...

Zero零_度
2015/09/06
248
0
Hadoop2.X的安装与配置(二)本地模式

在上一篇文章中,我们介绍了Hadoop2.X安装与配置前的准备阶段。 在本地模式配置前,首先完成准备阶段。 点击如下链接,进入准备阶段的配置 https://blog.csdn.net/weixin38187469/article/d...

weixin_38187469
2018/04/16
0
0
从分治算法到 MapReduce

从分治算法说起 要说 MapReduce 就不得不说分治算法,而分治算法其实说白了,就是四个字 分而治之 。其实就是将一个复杂的问题分解成多组相同或类似的子问题,对这些子问题再分,然后再分。直...

终日而思一
2018/11/23
42
0

没有更多内容

加载失败,请刷新页面

加载更多

java通过ServerSocket与Socket实现通信

首先说一下ServerSocket与Socket. 1.ServerSocket ServerSocket是用来监听客户端Socket连接的类,如果没有连接会一直处于等待状态. ServetSocket有三个构造方法: (1) ServerSocket(int port);...

Blueeeeeee
今天
6
0
用 Sphinx 搭建博客时,如何自定义插件?

之前有不少同学看过我的个人博客(http://python-online.cn),也根据我写的教程完成了自己个人站点的搭建。 点此:使用 Python 30分钟 教你快速搭建一个博客 为防有的同学不清楚 Sphinx ,这...

王炳明
昨天
5
0
黑客之道-40本书籍助你快速入门黑客技术免费下载

场景 黑客是一个中文词语,皆源自英文hacker,随着灰鸽子的出现,灰鸽子成为了很多假借黑客名义控制他人电脑的黑客技术,于是出现了“骇客”与"黑客"分家。2012年电影频道节目中心出品的电影...

badaoliumang
昨天
15
0
很遗憾,没有一篇文章能讲清楚线程的生命周期!

(手机横屏看源码更方便) 注:java源码分析部分如无特殊说明均基于 java8 版本。 简介 大家都知道线程是有生命周期,但是彤哥可以认真负责地告诉你网上几乎没有一篇文章讲得是完全正确的。 ...

彤哥读源码
昨天
15
0
jquery--DOM操作基础

本文转载于:专业的前端网站➭jquery--DOM操作基础 元素的访问 元素属性操作 获取:attr(name);$("#my").attr("src"); 设置:attr(name,value);$("#myImg").attr("src","images/1.jpg"); ......

前端老手
昨天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部