文档章节

Hadoop MapReduce流程

满小茂
 满小茂
发布于 2017/04/18 22:32
字数 2643
阅读 290
收藏 5

1.Hadoop MapReduce框架

   hadoop1.x和hadoop2.x使用的的MapReduce模型是不同的,hadoop1.x使用的JobTrackTaskTrack来分配任务和执行任务,而hadoop2.x是使用(yarn框架) 资源管理器ResourceManager、应用主体ApplicationMaster和节点管理器NodeManager来分配任务和执行任务。

 hadoop2.x    yarn框架结构图

     

 

一个应用程序所需的Container分为两大类,如下:

  1. 运行ApplicationMaster的Container:这是由ResourceManager(向内部的资源调度器)申请和启动的,用户提交应用程序时,可指定唯一的ApplicationMaster所需的资源;

 2.运行各类任务的Container:这是由ApplicationMaster向ResourceManager申请的,并由ApplicationMaster与NodeManager通信以启动之。以上两类Container可能在任意节点上,它们的位置通常而言是随机的,即ApplicationMaster可能与它管理的任务运行在一个节点上。

2.MapReduce工作流程

map完成后的中间过程

    map过程包括:
        1). 从磁盘读入数据
        2). 运行map任务
        3). 写结果到磁盘
    reduce过程包括:
        1). shuffle&sort
        2). 运行reduce任务
        3). 写结果到磁盘
    分析
        在map的第三个阶段,map任务的输出会被
Partitioner类以指定的方式区分地写出到输出文件里,如果提供了Combiner,在Mapper输出键值时,键值对不会被马上写到输出里,他们会被缓冲在内存中,当达到一定的数据量时,这部分数据会在Combiner中进行合并,然后再输出到Partitioner中。这个阶段通过将数据写入磁盘提高了系统的可靠性,但降低了性能。

      排序过程:先根据数据所属的partition进行排序,然后每个partition中的数据再按key来排序。partition的目是将记录划分到不同的Reducer上去,以期望能够达到负载均衡,以后的Reducer就会根据partition来读取自己对应的数据。
       在reduce的第一个阶段,Hadoop框架会根据Map结果中的key,将相关的结果传输到某一个Reducer上,这个步骤中的远程传输使用了HTTP协议。

        map->combiner->partitioner->comparator(根据partition排序,每个partition中的数据再根据map输出的key进行排序)->划分到不同Reducer中。

   官网解释的shuffe流程

    

hadoop shuffe阶段调优参数

        mapreduce的优化通常围绕shuffle的过程展开,包括如何增加并发、多用内存少用磁盘、减少shuffle的文件大小等。shuffe阶段我们可以通过配置一些 Hadoop Job 的参数调整 Hadoop shuffle 性能。Hadoop中途的sort及merge,使用多路归并排序方法。

        详细配置参数可看     https://my.oschina.net/manmao/blog/1551042

3.Hadoop知识点小结

(1)map数量控制【显示控制HDFS文件分片大小】

        map数量和InputSpilt的个数有关系,一般是一个InputSpilt一个map,为了调节map个数,往往可以通过设置InputSpilt分片大小调节map个数。

        实现方法,对mapred.min.split.sizemapred.max.split.size的设置重新赋值,我们在程序只需重新赋值给这两个值就可以控制InputSplit分片的大小了。

    假如我们想要设置的分片大小为10MB,我们可以在MapReduce程序的驱动部分添加如下代码

TextInputFormat.setMinInputSplitSize(job,1024L);//设置最小分片大小

TextInputFormat.setMaxInputSplitSize(job,1024×1024×10L);//设置最大分片大小

(2)多文件输入

    对于多个路径的多种文件格式输入可以采用MultipleInputs来处理,不同格式的文件可以采用不同的map操作。

MultipleInputs.addInputPath(job, firstPath, FirstInputFormat.class, FirstMap.class);
MultipleInputs.addInputPath(job, sencondPath, SecondInputFormat.class, SecondMap.class);

(3)多文件输出

       抽象类,主要参考org.apache.hadoop.mapred.lib.MultipleOutputFormat。子类唯一需要实现的方法是:String generateFileNameForKeyValue(K key, V value, Configuration conf),即通过key和value及conf配置信息决定文件名(含扩展名)。代码如下。

public static class AlphabetOutputFormat extends MultipleOutputFormat<Text, IntWritable> {  
        @Override  
        protected String generateFileNameForKeyValue(Text key, IntWritable value, Configuration conf) {  
            char c = key.toString().toLowerCase().charAt(0);  
            if (c >= 'a' && c <= 'z') {  
                return c + ".txt";  
            }  
            return "other.txt";  
        }  
    }  

 job.setOutputFormatClass(AlphabetOutputFormat.class);//设置输出格式  

(4) Map任务经验

    确定map任务数时依次优先参考如下几个原则

  • 每个map任务使用的内存不超过800M,尽量在500M以下。
  • 每个map任务运行时间控制在大约20分钟,最好1-3分钟。
  • 每个map任务处理的最大数据量为一个HDFS块大小(目前为256MB,默认64M,需要调整),一个map任务。
  • map任务总数不超过平台可用的任务槽位
  • map个数为split的份数
  • 压缩文件不可切分
  • 非压缩文件和sequence文件可以切分
  •  dfs.block.size决定block大小,默认64m 可根据数据量设置为128M或者512M

      输入分片,InputSpilt;

      Map任务执行时间 mapred.task.timeout 默认600秒。计算密集型任务需要扩大时间,或者定时发送心跳信息。Shuffe:组合器,分区器。

(5) yarn container 内存和CPU调优

<property>  
    <name>yarn.scheduler.minimum-allocation-mb</name>  
    <value>256<alue>  
</property>  
  
<property>  
    <name>yarn.nodemanager.resource.memory-mb</name>  
    <value>4096</value>  
</property>  

       一个节点上运行的任务数目主要由两个因素决定,一个是NodeManager可使用的资源总量,一个是单个任务的资源需求量,比如一个NodeManager上可用资源为8 GB内存,8 CPU。单个任务资源需求量为1 GB内存,1CPU,则该节点最多运行8个任务。

yarn-site.xml

NodeManager上可用资源是由管理员在配置文件yarn-site.xml中配置的,相关参数如下:
    yarn.nodemanager.resource.memory-mb节点总的可用物理内存量,默认是8192
    yarn.nodemanager.resource.cpu-vcores节点总的可用CPU数目,默认是8

对于任务的相关参数如下:
    yarn.scheduler.minimum-allocation-mb:container最小可申请内存量,默认是1024
    yarn.scheduler.minimum-allocation-vcores:container最小可申请CPU数,默认是1
    yarn.scheduler.maximum-allocation-mb:container最大可申请内存量,默认是8096
    yarn.scheduler.maximum-allocation-vcores:container最大可申请CPU数,默认是4

    yarn.nodemanager.vmem-pmem-ratio   物理内存 与 虚拟内存的比率,每用1M物理内存,默认使用2.1M虚拟内存,如果任务使用虚拟内存超过2.1 ,则会杀掉Container任务,(建议调大);
    yarn.nodemanager.vmem-check-enabled  虚拟内存的检查false掉,默认是true

 mapred-site.xml

    map/reduce的内存配置

    mapreduce.map.memory.mb = (1~2倍) * yarn.scheduler.minimum-allocation-mb     map内存
    mapreduce.reduce.memory.mb = (1~4倍) * yarn.scheduler.minimum-allocation-mb  reduce内存 

(6) MR 任务推测执行

         推测执行(Speculative Execution)是指在集群环境下运行MapReduce,可能是程序Bug,负载不均或者其他的一些问题,导致在一个JOB下的多个TASK速度不一致,比如有的任务已经完成,但是有些任务可能只跑了10%,根据木桶原理,这些任务将成为整个JOB的短板,如果集群启动了推测执行,这时为了最大限度的提高短板,Hadoop会为该task启动备份任务,让speculative task与原始task同时处理一份数据,哪个先运行完,则将谁的结果作为最终结果,并且在运行完成后Kill掉另外一个任务。

        推测执行(Speculative Execution)是通过利用更多的资源来换取时间的一种优化策略,但是在资源很紧张的情况下,推测执行也不一定能带来时间上的优化,推测执行配置mapred-site.xml  

         mapreduce.map.speculative:如果为true则Map Task可以推测执行,即一个Map Task可以启动Speculative Task运行并行执行,该Speculative Task与原始Task同时处理同一份数据,谁先处理完,则将谁的结果作为最终结果。默认为true。

         mapreduce.reduce.speculative:同上,默认值为true。

        mapreduce.job.speculative.speculative-cap-running-tasks:能够推测重跑正在运行任务(单个JOB)的百分之几,默认是:0.1。

        mapreduce.job.speculative.speculative-cap-total-tasks:能够推测重跑全部任务(单个JOB)的百分之几,默认是:0.01。

         mapreduce.job.speculative.minimum-allowed-tasks:可以推测重新执行允许的最小任务数。默认是:10。

首先,mapreduce.job.speculative.minimum-allowed-tasks和mapreduce.job.speculative.speculative-cap-total-tasks * 总任务数,取最大值。

然后,拿到上一步的值和mapreduce.job.speculative.speculative-cap-running-tasks * 正在运行的任务数,取最大值,该值就是猜测执行的运行的任务数

        mapreduce.job.speculative.retry-after-no-speculate:等待时间(毫秒)做下一轮的猜测,如果没有任务,推测在这一轮。默认:1000(ms)

        mapreduce.job.speculative.retry-after-speculate:等待时间(毫秒)做下一轮的猜测,如果有任务推测在这一轮。默认:15000(ms)

        mapreduce.job.speculative.slowtaskthreshold:标准差,任务的平均进展率必须低于所有正在运行任务的平均值才会被认为是太慢的任务,默认值:1.0。

4.总结

(1) InputFormat

InputSpilt和RecordReader是InputFormat的关键属性,有两个关键的方法,getSpilt方法将数据文件切分成spilt,spilt默认大小为hdfs的块大小(64M),getRecordReader将spilt解析成records,再将record解析成<Key,Value>。常见的InputFormat有如下:

       TextFileInputFormat,NLineInputFormat,KeyValueTextFileInputFormat,SequeceFileInputFormat。其中TextFileInputFormat的key为文件偏移量,value为每一行的数据。

(2) hadoop partitioner

        hadoop默认的分区算法,[key.hashcode]%[reduce数量]

(3) Mapreduce优化

        问题:

         1、数据倾斜。2、map和reduce个数设置不合理,reduce等待过多。 3、spill次数过多,merge次数过多。

         解决办法

          1.数据倾斜,partioner设置不合理,需要把key数量比较多数据均分到不同reduce上去。

         2.设置map.reduce共存,map运行到一定程度后,reduce也开始运行,下面就是map运行多少百分比后启动redcue。

               默认  mapreduce.job.reduce.slowstart.completedmaps=0.05.

                建议设置为0.8,map完成80%后才开始reduce copy

         3.合并小文件,减少map数量。减少spill次数,增大 map输出内存缓冲区(io.sort.mb)及spill内存上限(sort.spill.percent),减少文件溢出次数,从而减少磁盘 IO。

     4.减少merge次数。通过调整Reduce Task中合并小文件时,一次合并的文件的个数(mapreduce.task.io.sort.factor )。来减少merge次数。

(4)次排序 

(5)join

         map side jon (将小表加载到每个map内存中,输出大表中存在的key)。

         reduce side join (将相同key发送到reduce,在reduce端对两个表的数据进行join)

         semi jon(map端过滤掉不需要join的数据)。

(6)调度器

         容量调度器(Capacity Scheduler)  配置每个队列的容量百分比,同一个队列的任务排队等待,不同队列的job同时运行在yarn 

         公平调度器 (Fair Scheduler)   每个队列的job按照缺额公平调度job,缺额:理想计算资源和实际计算资源的差距,缺额越大,优先级越高,越先执行。不同队列的job公平调度。

    

 

 

附录

CentOS安装ClouderManager和hadoop组件

http://blog.csdn.net/zzq900503/article/details/52982828

自定义安装Haoop和Spark

 http://wuchong.me/blog/2015/04/04/spark-on-yarn-cluster-deploy/

© 著作权归作者所有

满小茂
粉丝 79
博文 121
码字总数 136849
作品 0
成都
程序员
私信 提问
大数据经典学习路线(及供参考)之 一

1.Linux基础和分布式集群技术 学完此阶段可掌握的核心能力: 熟练使用Linux,熟练安装Linux上的软件,了解熟悉负载均衡、高可靠等集群相关概念,搭建互联网高并发、高可靠的服务架构; 学完此...

柯西带你学编程
2018/05/22
0
0
大数据教程(8.1)mapreduce核心思想

上一章介绍了hadoop的HDFS文件系统的原理及API使用。本章博主将继续对hadoop的mapreduce编程框架进行分享。 mapreduce原理篇 mapreduce是一个分布式运算程序的编程框架,是用户开发“基于had...

em_aaron
2018/11/19
68
0
【hadoop】16.MapReduce-简介

简介 本章节我们先来了解一些关于MapReduce的理论知识。从本章节您可以学习到:MapReduce的相关知识。 1、概念 Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析...

Areya
01/12
19
0
阿里云 E-MapReduce产品优势及使用场景

E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 产品优势: 与自建集...

凹凹凸曼
2018/09/12
0
0
大数据MapReduce 编程实战

MapReduce 编程实战 一、大数据的起源 1、举例:(1)商品推荐 问题1:大量订单如何存储? 问题2:大量订单如何计算? (2)天气预报: 问题1:大量的天气数据如何存储? 问题2:大量的天气数...

我叫大兄弟
2018/05/06
0
0

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周六乱弹 —— 早上儿子问我他是怎么来的

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @凉小生 :#今日歌曲推荐# 少点戾气,愿你和这个世界温柔以待。中岛美嘉的单曲《僕が死のうと思ったのは (曾经我也想过一了百了)》 《僕が死の...

小小编辑
今天
1K
12
Excption与Error包结构,OOM 你遇到过哪些情况,SOF 你遇到过哪些情况

Throwable 是 Java 中所有错误与异常的超类,Throwable 包含两个子类,Error 与 Exception 。用于指示发生了异常情况。 Java 抛出的 Throwable 可以分成三种类型。 被检查异常(checked Exc...

Garphy
今天
22
0
计算机实现原理专题--二进制减法器(二)

在计算机实现原理专题--二进制减法器(一)中说明了基本原理,现准备说明如何来实现。 首先第一步255-b运算相当于对b进行按位取反,因此可将8个非门组成如下图的形式: 由于每次做减法时,我...

FAT_mt
昨天
17
0
好程序员大数据学习路线分享函数+map映射+元祖

好程序员大数据学习路线分享函数+map映射+元祖,大数据各个平台上的语言实现 hadoop 由java实现,2003年至今,三大块:数据处理,数据存储,数据计算 存储: hbase --> 数据成表 处理: hive --> 数...

好程序员官方
昨天
29
0
tabel 中含有复选框的列 数据理解

1、el-ui中实现某一列为复选框 实现多选非常简单: 手动添加一个el-table-column,设type属性为selction即可; 2、@selection-change事件:选项发生勾选状态变化时触发该事件 <el-table @sel...

everthing
昨天
11
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部