文档章节

Hadoop Outline Part 7 (MapReduce Type and Format)

woodo
 woodo
发布于 2014/10/04 10:50
字数 1982
阅读 49
收藏 0

1. MapReduce Types

1.1 MapReduce原形(ProtoType)

map:(k1,v1) -> list(k2,v2)
combine:(k2,list(v2)) ->list(k2,v2)
partition: (k2,v2)->Integer
reduce: (k2,list(v2)) ->list(k3,v3)


类型匹配
虽然没有强制要求,但是注意map的输出一定要和reduce的输入,类型相同。Conf设置里边的参数类型要和mapper/reducer里相同。hadoop程序没有编译时检查,但是运行时,会报告转换错误。
另外,给map/reduce函数加入@Override,也会帮助避免一些错误。

Combiner
合并器,shuffle阶段进行合并,注意它的函数型和reduce一样,不过这里注意,它和reduce唯一的不同是输出参数类型不一样,他的输出是k2/v2。也有使用reduce函数充当combine的,注意除非typeof(k2/v2) = typeof(k3/v3).

Partitioner
public abstract class Partitioner<KEY,VALUE>{
 public abstract int getPartition(Key key, Value value,int numPartitions);
}
分区器,Reduce阶段进行分区,以决定哪些数据进入哪个Reduce分区。


输入类型(Input Type)是有输入数据格式(InputFormat)来决定的。比如TextInputFormat使用LongWritable/Text作为输入。

输出类型由Reduce决定。但是格式仍然有输出数据格式(OutputFormat)决定。比如TextOutputFormat使用key\tvalue. 默认\t作为分隔符。

Type Mapping Diagram

1.2 默认Job类型配置

默认job程序,

@Override
public int run(String[] args) throws Exception {
    Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
    if (job == null) {
        return -1; 
    }

    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(Mapper.class);

    job.setMapOutputKeyClass(LongWritable.class);
    job.setMapOutputValueClass(Text.class);

    /// Default Partitioner 
    job.setPartitionerClass(HashPartitioner.class);
    /// Default Reduce number 
    job.setNumReduceTasks(1);
    job.setReducerClass(Reducer.class);

    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    return job.waitForCompletion(true) ? 0 : 1;
}

注意Partitioner使用的是HashPartitioner
加入程序和注释

public class HashPartitioner<K, V> extends Partitioner<K, V> {
    public int getPartition(K key, V value,int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

Streaming略。

2.输入类型(InputFormat)

2.1 Input Splits and Record

定义
- An input split is a chunk of the input that is processed by a single map. Notice that a split doesn’t contain the input data;
- Each map processes a single split.
- Each split is divided into records, and the map processes each record—a key-value pair in turn.
- Splits and records are logical,nothing tied to files.

public abstract class InputSplit {
    ///a length in bytes,largest get processed first,minimize the job runtime
    public abstract long getLength() throws IOException, InterruptedException;

    ///a set of storage locations, which are just hostname strings.place map tasks
    ///as close to the split’s data as possible,
    public abstract String[] getLocations() throws IOException,InterruptedException;
}

public abstract class InputFormat<K, V> {
    public abstract List<InputSplit> getSplits(JobContext context) 
                                     throws IOException, InterruptedException;

    ///On a tasktracker, the map task passes the split to the createRecordReader() method  
    ///on InputFormat to obtain a RecordReader for that split.
    public abstract RecordReader<K, V> createRecordReader(InputSplit split, 
                                                    TaskAttemptContext context)  
                                                throws IOException,InterruptedException;
}

RecordReader可以读取记录,产生key-value,并设置在Context里边,然后调用mapper->run(),如下:

public class Mapper{
    public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context); 
        }
        cleanup(context);
    }
}


注意:只有当nextKeyValue被调用时,才会改变当前currentkey和value。
Key和Value是immutable的,所以需要new Text(value),才能保留正确的值。

MultithreadedMapper可以使用多线程,并发的运行多个map函数。如果map函数有网络或磁盘io时,会显著提高效率。

2.2 InputFormat

addInputPath 默认加入目录下的所有文件,但是并不递归下面子目录。可以通过set mapred.input.dir.recursive = true 来enable这个功能。
PathFilter 来排出不想要的文件。默认隐藏文件不被包含。比如以.和_开头的文件。

Split的重要属性
mapred.min.split.size,默认1,最小的有效split的大小,单位byte
mapred.max.split.size,默认Long.MAX_VALUE,最大的有效split的大小,单位byte
dfs.block.size,默认64M, hdfs的block大小,单位byte
splitsize = max(minimumSplitSize,min(maximumSplitSize,blockSize));

一个有趣的问题: 对于一个split来说,如果一个record就比splitsize大了,怎么办? 这个其实如果读了代码,就好理解,所谓splitsize都是在读完一个record后判断的,所以这种情况下,实际split的size确实大于约定的splitsize, 也就是说在map的输入你会得到整个record,不会因为blocksize小,而被截断. 再说record是logical record, 也是跨block的。


CombineFileInputFormat
对于很多数量的小文件,FileInputFormat会把每个文件作为一个split来处理,这样图增很多map任务,如果文件很小,map会有浪费。CombineFileInputFormat可以把多个文件的数据合并到一个split,而且还要考虑data locality的问题,选择最优的小文件合并。实际上就是: 合并读入.

不过应该尽量避免小文件,可选的方案:
 - SequenceFile,合并小文件到这种文件,文件名作key,内容作value。
 - Archive File
Sample: CombineFileInputFormat

阻止分片(preventing splitting)
 - dirty方案:set minimumSplitSize = Long.MAX_VALUE
 - 最佳方案:继承一个InputFormat
  

 @Override
 public boolean isSplitable(JobContext context,Path file){
     return false
 }

Map中访问文件信息
InputSplit split = Context.getInputSplit();
split.getPath(); //input file path
split.getStart(); //byte offset of current split from the beginning
split.getLength(); //size of split by byte

Sample: WholeFileInputFormat
Sample: SmallFilesToSequenceFileConventer

TextInputFormat
默认的InputFormat。每条记录是一串(行)文字,不包含\r\n.Key是LongWritable,byte offset in file。

跨越block的行。logical record会读取下一个block中残余的行。

KeyValueTextInputFormat
这个类正好匹配hadoop的输出TextOutputFormat
delimiter默认是\t.设置mapreduce.input.keyvaluelinerecordreader.key.value.separator

注意: Key-value都是Text类型,如果需要其他类型,需要自己转换.

NLineInputFormat
默认情况下的TextInputFormat,每个split可能包含可变数目的record,这个数目决定于split size和line length。
NLineInputFormat可以控制每个mapper收到固定数目的record,N的设定在:mapreduce.input.lineinputformat.linespermap.
map必须10分钟报告进度,否则可能timeout

XML
如何读取XML呢,xml必须是顶级结点的行。如root,node1,node2。。
job.setInputFormatClass = StreamInputFormat.class
set stream.recordreader.class = org.apache.hadoop.streaming.StreamXmlRecordReader
set start-tag and end-tag.
key:LongWritable, Value:<start-tag>....</start-tag>

上面的这些在hadoop2中已经失效,所以我模仿mahout的xmlInputFormat写了一个,代码如下:
Sample: XmlInputFormat,

package com.jinbao.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
 * Reads records that are delimited by a specifc begin/end tag.
 */
public class XmlInputFormat extends FileInputFormat<LongWritable, Text> {
 public static final String START_TAG_KEY = "xmlinput.start";
 public static final String END_TAG_KEY = "xmlinput.end";
 @Override
 public RecordReader<LongWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
  XmlRecordReader reader = new XmlRecordReader();
  reader.initialize(split, context);
  return reader;
 }
 /**
  * XMLRecordReader class to read through a given xml document to output xml
  * blocks as records as specified by the start tag and end tag
  * 
  */
 public static class XmlRecordReader extends RecordReader<LongWritable, Text> {
  private FileSplit fileSplit;
  private Configuration conf;
  
  private byte[] startTag;
  private byte[] endTag;
  private long start;
  private long end;
  private FSDataInputStream fsin;
  private DataOutputBuffer buffer = new DataOutputBuffer();
  private LongWritable key = new LongWritable();
  private Text value = new Text();
  
  @Override
  public void initialize(InputSplit split, TaskAttemptContext context) 
          throws IOException, InterruptedException {
   this.fileSplit = (FileSplit) split;
   this.conf = context.getConfiguration();
   
   startTag = conf.get(START_TAG_KEY).getBytes("utf-8");
   endTag = conf.get(END_TAG_KEY).getBytes("utf-8");
   // open the file and seek to the start of the split
   start = fileSplit.getStart();
   end = start + fileSplit.getLength();
   Path file = fileSplit.getPath();
   FileSystem fs = file.getFileSystem(conf);
   fsin = fs.open(fileSplit.getPath());
   fsin.seek(start);
  }
  
  @Override
  public boolean nextKeyValue() throws IOException, InterruptedException {
   if (fsin.getPos() < end) {
    if (readUntilMatch(startTag, false)) {
     try {
      buffer.write(startTag);
      if (readUntilMatch(endTag, true)) {
       key.set(fsin.getPos());
       value.set(buffer.getData(), 0, buffer.getLength());
       return true;
      }
     } finally {
      buffer.reset();
     }
    }
   }
   return false;
  }
  private boolean readUntilMatch(byte[] match, boolean withinBlock)
    throws IOException {
   int i = 0;
   while (true) {
    int b = fsin.read();
    // end of file:
    if (b == -1)
     return false;
    // save to buffer:
    if (withinBlock)
     buffer.write(b);
    // check if we're matching:
    if (b == match[i]) {
     i++;
     if (i >= match.length)
      return true;
    } else
     i = 0;
    // see if we've passed the stop point:
    if (!withinBlock && i == 0 && fsin.getPos() >= end)
     return false;
   }
  }
  
  @Override
  public void close() throws IOException {
   fsin.close();   
  }
  @Override
  public LongWritable getCurrentKey() throws IOException, InterruptedException {
   return key;
  }
  @Override
  public Text getCurrentValue() throws IOException, InterruptedException {
   return value;
  }
  @Override
  public float getProgress() throws IOException, InterruptedException {
   return (fsin.getPos() - start) / (float) (end - start);
  }
 }
}

  public static int run(Configuration conf,String in, String out) throws Exception {
     //for input xml
     conf.set("xmlinput.start", "<person>");
     conf.set("xmlinput.end", "</person>"); // no reduces

     Job job = Job.getInstance(conf,"XML2Tabular");
     job.setJarByClass(Xml2Tabular.class);
     job.setInputFormatClass(XmlInputFormat.class);
     job.setOutputFormatClass(TextOutputFormat.class);
     job.setMapperClass(XMLMapper.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(Text.class);
     FileInputFormat.addInputPath(job, new Path(in));
     FileOutputFormat.setOutputPath(job,new Path(out));
     return job.waitForCompletion(true) ? 0 : 1;

 }


SequenceFileInputFormat
它也能读MapFile。key:Value in SequenceFile。
对应output:SequenceFileOutputFormat

SequenceFileAsTextInputFormat
它转换key/value为Text类型,主要为Streaming服务。
无对应output

SequenceFileAsBinaryInputFormat
对应output:SequenceFileAsBinaryOutputFormat

MultipleInputs
MultipleInputs.addInputPath(job,path1,TextInputFormat.class,Mapper1.class);
MultipleInputs.addInputPath(job,path2,SequenceFileInputFormat.class,Mapper2.class);

DBInputFormat/DBOutputFormat

DBInputFormat用jdbc接口访问数据库。注意数据库太大,内存局限。
DBOutputFormat用来导出MR结果到数据库中。
另外,Sqoop也是一个选择。

TableInputFormat/TableOutputForm

是为HBase准备的table访问类。

2.输出类型(OutputFormat)

TextOutputFormat
分隔符是tab字符(\t),由mapreduce.output.textoutputformat.separator修改。
它的输出可以由KeyValueTextInputFormat读取。
Suppress key或value:NullWritable type
如果没有输出,可以是NullOutputFormat

MapFileOutputFormat
写MapFile,key是排序的。

MultipleOutputs

可以拥有额外输出,可以有自定义的文件名,一般在reduce函数中用. 但是map中应该不可以用.

String basePath = String.format("%d/part", key.get());  
multipleOutputs.write(new Text(valuesum), key, basePath);


© 著作权归作者所有

woodo
粉丝 5
博文 57
码字总数 32118
作品 0
朝阳
高级程序员
私信 提问
hadoop集群搭建

最近在搭建hadoop集群,格式化namenode成功之后又突然shutting down 了,不知道是什么问题,搭建流程如下: Hadoop集群搭建步骤 1. 架构图 2. 准备5台机器 centosa: 192.168.42.128 centosb:...

lvzhl
2016/12/11
642
3
Hadoop源代码分析(包hadoop.mapred中的MapReduce接口)

前面已经完成了对org.apache.hadoop.mapreduce的分析,这个包提供了Hadoop MapReduce部分的应用API,用于用户实现自己的MapReduce应用。但这些接口是给未来的MapReduce应用的,目前MapReduce...

超人学院
2015/05/25
114
0
大数据之---Yarn伪分布式部署和MapReduce案例

1、软件环境 本次涉及伪分布式部署只是要主机hadoop01,软件安装参考伪分布式部署终极篇 2、配置yarn和mapreduce 3、提交测试jar计算圆周率 job15248048138350001 job命名格式: jobunix时间...

ycwyong
2018/05/17
0
0
关于hadoop-2.0.2-alpha源码在eclipse下编译的问题

近日,小弟想要在eclipse下编写mapreduce程序,由于新版本中已不附带eclipse-plugin,所以只能自己动手编译了,可却遇到了以下错误,故来请教各位前辈同行,这是我的原因还是hadoop新版本的源...

wmsz
2012/11/28
2.4K
3
新手求助:格式化HDFS文件系统 报错 namenode

刚开始学习hadoop,现在我在虚拟机中模拟了两台centos虚拟机,配置到格式化HDFs文件系统时报错。 百度了一翻并不知道怎么解决,求大神指导,谢谢 [hadoop@Master ~]$ hdfs namenode -format 1...

小刺猬2号
2015/11/10
1K
1

没有更多内容

加载失败,请刷新页面

加载更多

深入理解JVM - 类加载机制

类加载过程 一个类型从被加载到虚拟机内存中开始,到卸载出内存为止,它的整个生命周期将会经历加载(Loading)、验证(Verification)、准备(Preparation)、解析(Resolution)、初始化(...

xiaolyuh
11分钟前
57
0
脸盲症的小伙伴 测试下你的脸盲症程度

笔者在背单词的时候突然想到了一个问题,就是背单词的时候,相近的词容易混淆,例如:coast和roast,在我背诵的时候,我就很烦恼,不光是英文单词,还有汉字,例如“籍”和“藉“,我还是个中...

蛤蟆丸子
13分钟前
50
0
「网易官方」极客战记(codecombat)攻略-地牢-囚犯the-prisoner

解放囚犯,你会得到盟友。 简介 敬请期待! 默认代码 # 释放囚犯,击败守卫并夺取宝石。 # 从"Weak Door"后解救Patrick。 # 击败名为"Two"的守卫。 # 获得宝石。 概览 您可以按照名称 "Weak ...

极客战记
14分钟前
12
0
Final cut pro 10.4.4中文版本

1.双击打开dmg,点击红框图示 2.出现这个界面后直接回车 3直接将fcp拖拽到application文件夹 然后就可以直接打开了! 百度网盘地址:链接: https://pan.baidu.com/s/1Db9hXmzPV4EdR7_LxEqctA...

kylin_ink
15分钟前
32
0
jquery.validate

规则名称 类型 描述 required Boolean 设置该项内容为必填 remote Json|String 请求远程资源来校验内容有效性 minlength Number 设置内容的最少字符长度 maxlength Number 设置内容的最多字符...

愚蠢的土豆
15分钟前
129
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部