文档章节

Hadoop文件分片与InputFormat

o
 osc_gu9d45li
发布于 2019/04/28 14:55
字数 2475
阅读 15
收藏 0

精选30+云产品,助力企业轻松上云!>>>

 

1. Mapper Reducer 数量

对于一个默认的MapReduce Job 来说,map任务的数量等于输入文件被划分成的分块数,这个取决于输入文件的大小以及文件块的大小(如果此文件在 HDFS中)。但是对于 reduce的任务,并不会自动决定reducer数目的大小,若未指定,则默认为1。例如:

 

 

但单个reducer任务执行效率不尽人意,在实际场景中会将它设置为一个较大的数值。此时,决定Key条目被送往哪个reducer由方法 setPartitionerClass() 指定:job.setPartitionerClass(HashPartitioner.class); 

 
默认为HashPartitioner,它会将每条KeyHash,然后与最大的整型值做一次按位与操作,以得到一个非负整数。然后对分区数做取模(mod)操作,将key分配到其中一个分区。这里的分区数即为reducer数目。HashPartitioner 源码如下:
 
public class HashPartitioner<K, V> extends Partitioner<K, V> {
    
public HashPartitioner() {
    }

   
public int getPartition(K key, V value, int numReduceTasks) {
       
return (key.hashCode() & 2147483647) % numReduceTasks;
    }
}

 

若是为reducer数目设置为默认值1,则所有的中间数据都会被放入到一个reducer中,作业处理效率会非常低效。若是设置了过大的值,则每个reducer都会输出一个文件,会导致过多的小文件。

在为一个任务选择多少个reducer个数时,应遵循的原则为:目标reducer保持在每个运行5分钟左右,且产生至少一个HDFS块的输出比较合适。

记录在发送给 reducer 之前,会被MapReduce系统进行排序。因此输入文件中的行会被交叉放入一个合并后的输出文件。

 

2. 输入格式

我们已经了解到map的输入是分片(split),一个分片对应一个mapper,且仅被一个mapper处理。分片里面是多条记录(item)。“输入分片”在Hadoop中以InputSplit 接口的方式提供:

public interface InputSplit extends Writable {
   
long getLength() throws IOException;

    String[] getLocations()
throws IOException;
}

 

它包含两个方法,分别为getLength() getLocations()。其中getLength() 用于获取数据的长度(以字节为单位);getLocations() 用于获取一组存储位置(也就是一组主机名)。其中getLocations()的返回值由mapreduce系统获取后,实现data locality,也就是尽量将map任务放在离数据节点近的地方。而getLength() 的返回值用于排序分片,将最大的分片优先处理,以最小化整个作业运行的时间。

InputSplitmapreduce中的分片)由InputFormat创建,它负责创建输入分片,并将它们分成一条条记录(item)。首先简单看一下InputFormat 抽象类:

public abstract class InputFormat<K, V> {
   
public InputFormat() {
    }

   
public abstract List<InputSplit> getSplits(JobContext var1) throws IOException, InterruptedException;

   
public abstract RecordReader<K, V> createRecordReader(InputSplit var1, TaskAttemptContext var2) throws IOException, InterruptedException;
}

 

这里 getSplits() 方法计算分片,然后将计算得到的List 结果发给 application masterApplication master 根据其分片所在节点信息,调度map任务到离分片数据最近的节点。在map任务端,会把输入分片传给 InputFormat createRecordReader() 方法,此方法会返回一个 RecordReader 对象,用于迭代读取这个分片上的记录(item),并生成记录的键值对,之后传递给 map函数。通过查看 Mapper 类中的 run() 方法,更好的了解此过程:

public void run(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) throws IOException, InterruptedException {
   
this.setup(context);

   
try {
       
while(context.nextKeyValue()) {
           
this.map(context.getCurrentKey(), context.getCurrentValue(), context);
        }
    }
finally {
        
this.cleanup(context);
    }

}

 

这里,先运行setup() 操作,然后从 context 不断迭代获取分片的内容,并传给map() 方法,并由map() 方法进一步对 key-value 对进行处理。

 

3. FileInputFormat

Hadoop 中,数据源一般为文件,而 FileInputFormat 类就是用于处理数据源为文件的一个(继承于)InputFormat :

public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {

 

可以看到它是一个抽象类,它的实现类有CombineFileInputFormatTextInputFormatKeyValueTextInputFormatNLineInputFormat以及SequenceFileInputFormat

FileInputFormat类提供两个功能:1. 指出作业的输入文件位置;2. 为输入文件生成分片的代码实现。

FileInputFormat中,作业的输入可以是一个文件、一个目录,也可以是目录与文件的集合。它提供了四种方法用于设置Job的输入路径:

public static void addInputPath(JobConf conf, Path path)
public static void addInputPaths(JobConf conf, String commaSeparatedPaths)
public static void setInputPaths(JobConf conf, Path... inputPaths)
public static void setInputPaths(JobConf conf, String commaSeparatedPaths) 

 

其中addInputPath() addInputPaths() 用于添加路径,以构成路径列表。而setInputPath() 用于一次性设置完整的路径列表(会替换前面所有路径设置)。

在设置路径后,也可以指定需要排除的特定文件,此功能由 setInputPathFilter() 实现:

public static void setInputPathFilter(JobConf conf, Class<? extends PathFilter> filter) {
    conf.setClass(
"mapreduce.input.pathFilter.class", filter, PathFilter.class);
}

 

它可以设置一个过滤器PathFilter,默认的实现是过滤掉隐藏文件(以 . _ 开头的文件)。如果通过setInputPathFilter() 设置过滤器,它会在默认过滤器的基础上进行过滤,也就是说,仅会在非隐藏文件中再次进行过滤。

输入路径的设置可以通过属性与选项进行配置,在属性配置中相关配置为:

mapreduce.input.fileinputformat.inputdir (逗号分隔属性,无默认值)

mapreduce.input.pathFilter.class  PathFilter 类名,无默认值)

 

4. FileInputFormat 类处理输入分片

在设置了一组文件后,FileInputFormat会将文件转换为输入分片。这里需要注意的是:在HDFS中,一个文件可以占用(分布到)多个block,但是不会存在一个block中存多个文件。对于小文件(小于一个HDFS 块大小的文件)来说,一个文件就是占用一个block,但是不会占据整个block的空间。例如,当一个1MB的文件存储在一个128MB 的块中时,文件只使用 1MB 的磁盘空间,而不是128MB)。

FileInputFormat 只分割大文件,也就是文件超过HDFS块的大小。在FileInputFormat中,控制分片大小的属性有:

mapreduce.input.fileinputformat.split.minsize  一个文件分片最小的有效字节数(int类型),默认值为1(字节)

mapreduce.input.fileinputformat.split.maxsize  一个文件分片中最大的有效字节数(long 类型),默认值为Long.MAX_VALUE,即9223372036854775807

dfs.blocksize        HDFS中的块大小(按字节),默认为 128MB(即 134217728

 

最小分片通常为1个字节,用户可以设置最小分片的大小超过HDFS 块大小,这样会强制分片比HDFS块大。但是如果数据存储在 HDFS 上,则这样对data locality 来说,并不友好,以至于延长任务执行时间。

最大分片默认是 Java Long 类型的最大值,只有把它的值设置为小于 HDFS Block 大小才有效,此时会强制分片比块小。

FileInputFormat中,分片的大小由以下公式计算:

protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
   
return Math.max(minSize, Math.min(maxSize, blockSize));
}

 

其中参数部分为:

long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
long maxSize = getMaxSplitSize(job);

 

protected long getFormatMinSplitSize() {
   
return 1L;
}

 

public static long getMinSplitSize(JobContext job) {
   
return job.getConfiguration().getLong("mapreduce.input.fileinputformat.split.minsize", 1L);
}

 

public static long getMaxSplitSize(JobContext context) {
   
return context.getConfiguration().getLong("mapreduce.input.fileinputformat.split.maxsize", 9223372036854775807L);
}

 

minSize 若未指定,则默认为 1MaxSize默认为Java Long类型最大值。再计算时,先取 maxSize blockSize 的最小值,然后再取结果与 minSize的最大值。

在默认情况下:minSize < blockSize < maxSize

所以分片的大小就是 blockSize大小。

 

5. 小文件与 CombineFileInputFormat

相对于大量的小文件,Hadoop更适合处理少量的大文件。其中一个原因是:对于每个小文件(远小于 HDFS块大小),FileInputFormat 都会生成一个分片(生成的分片要么是文件的整个内容,要么是文件的部分内容),这样会产生大量的 map 任务,并且每个map任务仅处理一小部分数据,这样会导致任务执行效率低下,时间过长。

CombineFileInputFormat 可以缓解此问题,它针对小文件而设计。FileInputFormat 为每个小文件产生一个分片,而CombineFileInpurtFormat 把多个文件打包到一个分片中,以便于每个 mapper 可以处理更多的数据。更重要的是:CombineFileInputFormat在分配多个block到同一个 split时,会考虑到node locality 以及 rack locality。所以它的速度在一个典型的 mr 任务中,处理输入的速度并不会下降。

不过尽可能要避免小文件过多的情况,原因有:

  1. 处理小文件会增加运行作业而必须的寻址次数

  2. 浪费namenode的内存

可以尝试使用顺序文件(sequence file)将这些小文件合并成一个或多个大文件:例如将文件名作为key,文件内容作为 value。但是如果集群里已经有了大量小文件,可以尝试一下CombineFileInputFormat 方法。

CombinedFileInputFormat不仅处理小文件有好处,处理大文件时也有益处。例如,如果mapper在处理一个block时仅花费很少的时间,则可以考虑使用CombineFileInputFormat,并将maximum split size 设置为 HDFS block 大小的几倍(参数为mapred.max.split.size)。这样每个mapper会处理多个block,使得整个处理时间下降。

 

6. 避免分片

有时候可能需要计算整个文件里的顺序关系,这种任务无法分布式处理,所以只能让文件由一个mapper处理,此时需要避免文件被分片。

有两种方式可以避免文件被分片,而是当作一个单独分片处理:

  1. 设置最小分片大小split.minsize Java Long类型最大值(long.MAX_VALUE

  2.  使用FileInputFormat 具体子类时,重写isSplitable() 方法,把返回值设置为 false

使用第二种方法时,以 TextInputFormat类为例:

public class TextInputFormat extends FileInputFormat<LongWritable, Text> implements JobConfigurable {
   
private CompressionCodecFactory compressionCodecs = null;

   
public TextInputFormat() {
    }

   
public void configure(JobConf conf) {
       
this.compressionCodecs = new CompressionCodecFactory(conf);
    }

   
protected boolean isSplitable(FileSystem fs, Path file) {
        CompressionCodec codec =
this.compressionCodecs.getCodec(file);
       
return null == codec ? true : codec instanceof SplittableCompressionCodec;
    }

….

}

默认会根据 CompressionCodec 类型判断是否切分,也可以直接指定return false,使得输入文件不可切分。

 

References:Hadoop权威指南第4版 

 

o
粉丝 0
博文 500
码字总数 0
作品 0
私信 提问
加载中
请先登录后再评论。
Hadoop实战读书笔记(7)

输入数据概要 输入数据通常驻留在较大的文件中,通常几十或者数百GB,甚至更大。MapReduce处理的基本原则之一是将输入数据分割成块。这些块可以在多台计算机上并行处理,在Hadoop的术语中这些...

祥林会跟你远走高飞
2014/12/08
142
0
【Hadoop-2.7 源码系列】0-WordCount 解析 及 整个MR Job的执行流程介绍

一、摘自hadoop-example 二、核心组件 Configuration Hadoop 提供了一套独有的配置文件管理系统,并提供API。 Configuration可以合并多个配置文件,产生一个配置文件。 Configuration提供属性...

HarryWu
2018/05/18
80
0
MapReduce的输入格式

MapReduce的输入格式 文件是 MapReduce 任务数据的初始存储地。正常情况下,输入文件一般是存储在 HDFS 里面。这些文件的格式可以是任意的:我们可以使用基于行的日志文件, 也可以使用二进制...

等待救赎
2016/02/18
8
0
Hadoop输入和输出的处理类(7)

hadoop输入的处理类 InputFormat InputFormat负责处理MR的输入部分。 作用: 1、验证作业的输入是否规范。 2、把输入文件切分成InputSplit。 3、提供RecordReader的实现类,把InputSplit读到...

肖鋭
2014/03/01
223
0
通过inputSplit分片size控制map数目

前言:在具体执行Hadoop程序的时候,我们要根据不同的情况来设置Map的个数。除了设置固定的每个节点上可运行的最大map个数外,我们还需要控制真正执行Map操作的任务个数。 1.如何控制实际运行...

仙剑奇侠
2014/12/05
3K
0

没有更多内容

加载失败,请刷新页面

加载更多

linux下java环境搭建

1、jdk下载: 官方地址:https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html 如下图所示,我这边选择的是红框中的版本 2、压缩包上传至服务器 将下载的压缩包上传...

wc_飞豆
49分钟前
17
0
面试题:Java对象不再使用时,为什么要赋值为null?

前言 许多Java开发者都曾听说过“不使用的对象应手动赋值为null“这句话,而且好多开发者一直信奉着这句话;问其原因,大都是回答“有利于GC更早回收内存,减少内存占用”,但再往深入问就回...

码农突围
52分钟前
22
0
设计模式(5) 原型模式

原型模式 原型模式的适用场景 浅拷贝 深拷贝 用Initialize方法修改初始化状态 原型模式与之前学习的各种工厂方法、单例模式、建造者模式最大、最直观的区别在于,它是从一个既有的对象“克隆...

zhixin9001
52分钟前
7
0
获取免费的pycharm激活码网站

http://www.lookdiv.com/

云烟成雨forever
52分钟前
27
0
用Helm部署Kubernetes应用,支持多环境部署与版本回滚

1 前言 Helm是优秀的基于Kubernetes的包管理器。利用Helm,可以快速安装常用的Kubernetes应用,可以针对同一个应用快速部署多套环境,还可以实现运维人员与开发人员的职责分离。现在让我们安...

南瓜慢说
53分钟前
25
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部