文档章节

hadoop-reduce分析

cookqq
 cookqq
发布于 2014/01/05 20:22
字数 1086
阅读 129
收藏 0

【推荐】2019 Java 开发者跳槽指南.pdf(吐血整理) >>>

Map的结果,会通过partition分发到Reducer上,Reducer做完Reduce操作后,通过OutputFormat,进行输出。

* Licensed to the Apache Software Foundation (ASF) under one

package org.apache.hadoop.mapreduce;

import java.io.IOException;

 * Reduces a set of intermediate values which share a key to a smaller set of
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {

  public class Context 
    extends ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
    public Context(Configuration conf, TaskAttemptID taskid,
                   RawKeyValueIterator input, 
                   Counter inputKeyCounter,
                   Counter inputValueCounter,
                   RecordWriter<KEYOUT,VALUEOUT> output,
                   OutputCommitter committer,
                   StatusReporter reporter,
                   RawComparator<KEYIN> comparator,
                   Class<KEYIN> keyClass,
                   Class<VALUEIN> valueClass
                   ) throws IOException, InterruptedException {
      super(conf, taskid, input, inputKeyCounter, inputValueCounter,
            output, committer, reporter, 
            comparator, keyClass, valueClass);
    }
  }

  /**
   * Called once at the start of the task.
   */
  protected void setup(Context context
                       ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * This method is called once for each key. Most applications will define
   * their reduce class by overriding this method. The default implementation
   * is an identity function.
   */
  @SuppressWarnings("unchecked")
  protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                        ) throws IOException, InterruptedException {
    for(VALUEIN value: values) {
      context.write((KEYOUT) key, (VALUEOUT) value);
    }
  }

  /**
   * Called once at the end of the task.
   */
  protected void cleanup(Context context
                         ) throws IOException, InterruptedException {
    // NOTHING
  }

  /**
   * Advanced application writers can use the 
   * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
   * control how the reduce task works.
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    while (context.nextKey()) {
      reduce(context.getCurrentKey(), context.getValues(), context);
    }
    cleanup(context);
  }
}

Mapper的结果,可能送到可能的Combiner做合并,Combiner在系统中并没有自己的基类,而是用Reducer作为Combiner的基类,他们对外的功能是一样的,只是使用的位置和使用时的上下文不太一样而已。

Mapper最终处理的结果对<key, value>,是需要送到Reducer去合并的,合并的时候,有相同key的键/值对会送到同一个Reducer那,哪个key到哪个Reducer的分配过程,是由Partitioner规定的,它只有一个方法,输入是Map的结果对<key, value>和Reducer的数目,输出则是分配的Reducer(整数编号)。系统缺省的Partitioner是HashPartitioner,它以key的Hash值对Reducer的数目取模,得到对应的Reducer。

* Licensed to the Apache Software Foundation (ASF) under one

package org.apache.hadoop.mapreduce;

 * Partitions the key space.
public abstract class Partitioner<KEY, VALUE> {
  
  /** 
   * Get the partition number for a given key (hence record) given the total 
   * number of partitions i.e. number of reduce-tasks for the job.
   *   
   * <p>Typically a hash function on a all or a subset of the key.</p>
   *
   * @param key the key to be partioned.
   * @param value the entry value.
   * @param numPartitions the total number of partitions.
   * @return the partition number for the <code>key</code>.
   */
  public abstract int getPartition(KEY key, VALUE value, int numPartitions);
  
}

 * Licensed to the Apache Software Foundation (ASF) under one

package org.apache.hadoop.mapreduce.lib.partition;

import org.apache.hadoop.mapreduce.Partitioner;

/** Partition keys by their {@link Object#hashCode()}. */
public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
                          int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}

Reducer是所有用户定制Reducer类的基类,和Mapper类似,它也有setup,reduce,cleanup和run方法,其中setup和cleanup含义和Mapper相同,reduce是真正合并Mapper结果的地方,它的输入是key和这个key对应的所有value的一个迭代器,同时还包括Reducer的上下文。系统中定义了两个非常简单的Reducer,IntSumReducer和LongSumReducer,分别用于对整形/长整型的value求和。

* Licensed to the Apache Software Foundation (ASF) under one

package org.apache.hadoop.mapreduce.lib.reduce;

import java.io.IOException;

public class IntSumReducer<Key> extends Reducer<Key,IntWritable,
                                                Key,IntWritable> {
  private IntWritable result = new IntWritable();

  public void reduce(Key key, Iterable<IntWritable> values, 
                     Context context) throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
      sum += val.get();
    }
    result.set(sum);
    context.write(key, result);
  }

}

Reduce的结果,通过Reducer.Context的方法collect输出到文件中,和输入类似,Hadoop引入了OutputFormat。OutputFormat依赖两个辅助接口:RecordWriter和OutputCommitter,来处理输出。RecordWriter提供了write方法,用于输出<key, value>和close方法,用于关闭对应的输出。OutputCommitter提供了一系列方法,用户通过实现这些方法,可以定制OutputFormat生存期某些阶段需要的特殊操作。我们在TaskInputOutputContext中讨论过这些方法(明显,TaskInputOutputContext是OutputFormat和Reducer间的桥梁)。

OutputFormat和RecordWriter分别对应着InputFormat和RecordReader,系统提供了空输出NullOutputFormat(什么结果都不输出,NullOutputFormat.RecordWriter只是示例,系统中没有定义),LazyOutputFormat(没在类图中出现,不分析),FilterOutputFormat(不分析)和基于文件FileOutputFormat的SequenceFileOutputFormat和TextOutputFormat输出。

基于文件的输出FileOutputFormat利用了一些配置项配合工作,包括mapred.output.compress:是否压缩;mapred.output.compression.codec:压缩方法;mapred.output.dir:输出路径;mapred.work.output.dir:输出工作路径。FileOutputFormat还依赖于FileOutputCommitter,通过FileOutputCommitter提供一些和Job,Task相关的临时文件管理功能。如FileOutputCommitter的setupJob,会在输出路径下创建一个名为_temporary的临时目录,cleanupJob则会删除这个目录。

SequenceFileOutputFormat输出和TextOutputFormat输出分别对应输入的SequenceFileInputFormat和TextInputFormat



本文转载自:http://caibinbupt.iteye.com/blog/339192

上一篇: hadoop-ID分析
下一篇: hadoop-Mapper分析
cookqq

cookqq

粉丝 119
博文 268
码字总数 156096
作品 0
海淀
技术主管
私信 提问
加载中

评论(2)

cookqq
cookqq 博主
sdf
cookqq
cookqq 博主
sdf
分布式计算核武器hadoop--性能篇

hadoop获得的巨大的成功令人不得不关注和研究它。本文将从它如何提高计算性能,减少计算时间入手分析hadoop。 1.性能可以线性提高 hadoop的计算模型就是map/reduce,每一个计算任务会被分割成...

loveczp
2011/08/19
0
0
Hadoop实战之MapReducer项目结构分析

一.MapReducer项目结构分析 1.前言 参考本例子前: 1.需要确保搭建好了hadoop集群环境。 2.安装了eclipse开发hadoop的环境。 3.这是关于MapReducer中的Mapper、Reduce、Job三个类的学习分析。...

itbiggod
2018/05/06
0
0
记Hadoop2.5.0线上mapreduce任务执行map任务划分的一次问题解决

前言 近日在线上发现有些mapreduce作业的执行时间很长,我们需要解决这个问题。输入文件的大小是5G,采用了lzo压缩,整个集群的默认block大小是128M。本文将详细描述这次线上问题的排查过程。...

小萝卜_
2016/12/08
42
0
Mahout安装与配置笔记

一、硬件环境 操作系统:Linux ubuntu-13.04-desktop-i386 jdk安装版本:jdk-7u51-linux-i586 Hadoop版本:Hadoop-1.1.1(一个Namenode,三个Datanode部署) 二、安装步骤 在Mahout安装之前读...

kartik
2014/06/01
0
0
JSON查询语言--Jaql

Jaql 是一个 JSON 的查询语言,用于简化 JSON 数据的建模和操作,主要用于分析大规模的半结构化数据。核心特性包括:可扩展性和并行处理。使用 Hadoop 的 Map-Reduce 作为并行计算平台。 Ja...

匿名
2011/07/21
2.5K
2

没有更多内容

加载失败,请刷新页面

加载更多

dynamic-connectivity 动态连通性问题之 quick-union 算法

quick-union 的思想是:若对象 p 的 root_id 和对象 q 的 root_id 相等,则认为 p 和 q 连通。 若要将对象 p 和对象 q 连通(已知两对象未连通),则将 p 的 root_id 的值设为 q 的 root_id ...

Phpythoner_Alei
今天
40
0
OSChina 周六乱弹 —— 实在选不出来就唱国歌

Osc乱弹歌单(2020)请戳(这里) 【今日歌曲】 @花间小酌 :#今日歌曲推荐# 分享阿冗的单曲《你的答案》。--祝大家在2020年都找到自己答案。 《你的答案》- 阿冗 手机党少年们想听歌,请使劲...

小小编辑
今天
25
1
Maven打包可执行Jar包的方法

在使用Java开发中,会使用到将工程打包成可执行的jar包的情况,那么在maven中怎么将项目中的依赖包都添加到jar中呢。在pom.xml中添加一下插件: <build><plugins><plugin><ar...

CapJes
今天
13
0
使用vue 开发地图类系统(openlayers.js)的注意。

使用vue 开发地图类系统的注意。 1、使用地图应该创建的对象 少使用 vue 的data 和计算属性(comments)存数据或是vuex。 为什么要要注意这个问题呢? 答:这个就要了解到vue的实现原理 。原理...

DY-Tao
昨天
10
0
web移动端学习:高德地图demo(一)

在高德地图开发中申请开发者资格,然后在控制台中新建应用,获得KEY; 新建模板HTML文件; <!DOCTYPE html><html lang="en"><head><meta charset="UTF-8"><title>地图demo</title><scri......

dxiya
昨天
10
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部