文档章节

【译文】MapReduce:大型集群上的简化数据处理

Gaussic
 Gaussic
发布于 2015/03/17 11:57
字数 5447
阅读 1120
收藏 0
点赞 0
评论 0

【译文】MapReduce:大型集群上的简化数据处理

      作者:Jeffrey Dean 和 Sanjay Ghemawat

摘要:

      MapReduce是一个编程模型,以及处理和生成大型数据集的一个相关实现,它适合各种各样的现实任务。用户指定计算的map和reduce函数。底层运行系统自动地将大规模集群机器间的计算并行化,处理机器故障,以及调度机器间通信以充分利用网络和磁盘。程序员会发现这个系统很好使用:在过去的去年中,超过一万个不同的MapReduce程序已经在Google内部实现,平均每天有十万个MapReuce作业在Google集群上被执行,每天总共处理20PB以上的数据。

1 简介

      在MapReduce开发之前,作者和其他许多的Google员工实现了数以百计的处理大量原始数据(如抓取到的文档、Web请求日志等等)的专用计算方法,以计算各种导出的数据,如倒排索引、Web文档图结构的各种表示、每个host抓取到的页面数的总结、某一天最频繁的一组查询。大多数这样的计算在概念上是非常简单的,然而它们的输入数据量通常非常大。为了在合理的时间内完成这些计算,它们必须分布到成百上千的机器上。如何并行化计算,分发数据,以及处理故障,这些问题结合起来,往往会让程序员使用大量复杂代码来处理,而掩盖了原本简单的计算。

      为了应对这一复杂性,我们设计了一个新的抽象,它允许我们表达试图执行的简单计算,但将并行化、容错、数据分布和负载均衡等凌乱的细节隐藏到了库中。这个抽象的灵感来源于出现在Lisp和许多其他函数式语言中的map和reduce原语。我们实现了大部分的计算,包括为输入的每一个逻辑记录应用一个map操作以计算一组中间键值对,然后对所有共享同一个键的值应用一个reduce操作以恰当地结合导出的数据。此函数式模型支持用户自定义map和reduce操作,使我们能非常容易地并行处理大型计算,和使用再执行(reexecution)作为主要的容错机制。

      这项工作的主要贡献就是一个简单而强大的接口,它完成自动并行化、大规模分布计算,结合该接口的一个实现在大型商用PC集群上获得了很高的性能表现。该编程模型还可以用于同一台机器上多个核心间的并行计算。

      第2部分描述了基本的编程模型并给出几个例子。第3部分描述了MapReduce接口专门针对基于集群的计算环境的一个实现。第4部分描述了我们发现的这个编程模型的几个很有用的改进(refinements)。第5部分描述了对各种不同任务的实现的性能度量。第6部分探索了MapReduce在Google中的应用,包括使用它作为重写我们的生产索引系统的基础的一些经验。第7部分讨论了相关和未来的工作。

2 编程模型

      这个计算需要一组输入键/值对,并生成一组输出键/值对。MapReduce库的使用者将计算表达为两个函数:map和reduce。

      map,由用户编写,需要一对输入并生成一组中间键/值对。MapReduce库将所有与相同键值 I 相关联的值组合到一起,并将它们传递给reduce函数。

      Reduce函数,同样由用户编写,接受中间键 I 和这个键的一组值。它将这些值合并以形成一组可能更小的值。通常每次reduce调用只生成0个或1个输出值。中间值靠一个迭代器提供给用户的reduce函数。这使我们能够处理大量太大以至于不能装入内存的值列表。

2.1 例子

      考虑一下在一个巨大的文档集合中统计每个单词出现次数的问题。使用者会编写与下面伪代码类似的代码:

map(String key, String value);
// key: document name
// value: document contents
for each word w in value:
    EmitIntermediate(w, "1");
    
reduce(String key, String values);
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
    result += ParseInt(v);
Emit(AsString(result));

      map函数发出每个单词加一个相关的出现次数(count)(在这个简单例子中仅为1)。reduce函数对发给一个单词的所有数(count)求和。

      此外,用户编写代码将输入和输出文件名以及可选的调优参数填入mapreduce规范对象中。然后调用MapReduce函数,将它传递给规范对象。用户的代码与MapReduce库(C++实现)相连接。我们最初的MapReduce资料中有这个例子的完整程序【8】。

2.2 类型

      尽管前面的伪代码是按照输入输出字符串形式编写的,概念上由用户提供的map和reduce函数是有相关类型的。

map        (k1, v1)        --> list(k2, v2)
reduce     (k2, list(v2))  --> list(v2)

      也就是说,输入键和值与输出键和值来自不同的域。此外,中间键和值与输出键和值来自同一个域。

3 实现

      MapRedue接口的许多不同实现都是可能的。正确的选择取决于环境。例如,一种实现可能适合一个小型的共享内存的机器,另外一种可能适合一个大型的NUMA多处理器,而另外一种可能适合一个更大的联网计算机集合。在我们最初的文章发表以后,已经发展出了很多MapReduce的开源实现【1, 2】,MapReduce在各种问题领域的适用性也得到了研究【7, 16】。

      这一部分描述了我们的一种MapReduce实现,其目标是目前广泛应用在Google中的计算环境:由交换千兆以太网连接在一起的大型PC集群【4】。在该环境中,机器通常运行Linux系统,有双核 x86 处理器以及4-8GB内存。个别机器拥有1GB/s的网络带宽,但每台机器等分的带宽远远低于1GB/s。一个计算集群包含了成千上万台机器,因此机器故障是很常见的。存储由直接附在单独机器上的廉价IDE磁盘提供。GFS,Google内部开发的一个分布式文件系统【10】,用来管理存储在这些磁盘上的数据。文件系统使用复制来提供不可靠的硬件之上的可用性与可靠性。

      使用者提交 jobs 给调度系统。每个 job 包含一组任务,且由调度程序映射(mapped)到集群间的一组可用的机器上。

3.1 执行概述

      通过自动将输入数据分割为一个有M个分裂(splits)的组,map调用分布在多台机器间。输入分裂可以由不同的机器并行处理。reduce调用通过利用分割函数(比如,hash(key) mod R)将中间键空间划分为R片进行分布。分割数R和分割函数都是由使用者指定的。

      图1展示了在我们的实现中MapReduce操作的整体流程。当用户程序调用MapReduce函数,以下顺序行为将会发生(图1中标记的数字对应下面列中的数字)。

 1. 用户程序中的MapReduce库首先将输入文件划分为M片,通常每片16~64MB(由用户通过可选参数控制)。然后启动集群中程序的多个副本。

 2. 这些程序副本中有一个特殊的master副本。其他副本则是由master分配了work的workers。集群中需要分配 M 个 map tasks 和 R 个 reduce tasks。master挑选闲置的workers且为每个worker分配一个 map task 或 reduce task。

 3. 分配了 map task 的一个worker读取相应输入划分的内容。它从输入数据中解析出键/值对并将每一对传递给用户定义的map函数。由map函数产生的中间键/值对缓冲在内存中。

 4. 缓冲区的键/值对定期地写入本地磁盘,由partition函数划分到 R 个区域中。这些本地磁盘上的缓冲对的位置被传递会master,它将负责转发这些位置给 reduce workers。

 5. 当一个 reduce worker 被 master 通知了这些位置后,它使用远程进程调用来读取来自map workers的本地磁盘中的缓冲数据。当reduce worker读取到了所有分区中的中间数据后,它按照中间键将其排序,从而使所有相同键的出现次数组合在了一起。排序是必要的,因为通常很多不同的键被map到了同一个reduce task。如果中间数据太大以至于不能放在内存中,还需要使用一个外部的排序。

 6. reduce worker对排序好的中间数据执行迭代,对每个唯一的中间键,它将这个键以及相应的一组中间值传递个用户的 reduce 函数。reduce 函数的输出被附加到这个reduce分区的最终输出文件中。

 7. 当所有的 map tasks 和 reduce tasks 都完成后,master唤醒用户程序。在这一点上,用户程序的MapReduce调用返回到用户代码处。


       成功完成后,mapreduce执行的输出可以在R个输出文件中获得(每个reduce task一个,由用户指定文件名)。通常,用户无需将这R个输出文件合并到一个文件中;他们通常将这些文件作为另一个MapReduce调用的输入,或者在来自另外一个可以处理划分到了多个文件中的输入的分布式应用程序中使用它们。

3.2 master数据结构

       master中有多种数据结构。对每一个map task和reduce task,它存储了其状态信息(限制,进行,或完成)和worker机器的身份(对于非闲置tasks)。

       master是map tasks传播中间文件区域位置到reduce tasks的导管。因此,对于每个完成了的map task,master存储由这个map task生成的R个中间文件区域的位置和大小。master在map tasks称后接收到这些位置和大小信息的更新。这些信息将逐步推送到正在进行reduce tasks的workers中。

3.3 容错

        由于MapReduce库旨在帮助利用成百上千的机器来处理大量数据,它必须优雅地容忍机器故障。

处理worker故障

        master会定期地ping每一个worker。如果在一定时间内没有收到来自某台worker的响应,master将这个worker标记为故障。任何由worker完成的map tasks都被重置为初始闲置状态,因而可以在其他的workers中调度。同样,在故障worker上的任何正在进行的map task和reduce task也被重置为闲置状态以便进行重新调度。

       故障worker上已完成的map task需要重新执行,因为它们的输出存储在了故障机器的本地磁盘中导致无法访问。已完成的reduce tasks无需重新执行,因为它们的输出存储在了全局文件系统中。

       当一个map task首先由worker A执行然后又由worker B执行(因为A发生了故障),所有执行reduce task的workers将被通知重新执行。任何还未从worker A读取数据的reduce task将从worker B读取数据。

       MapReduce适应于大规模的worker故障。例如,在一个MapReduce操作中,在运行中的集群上的网络维护导致了一组80台机器在几分钟内无法到达。MapReduce master简单地重新执行无法到达的worker机器的工作且继续前进,最终完成MapReduce操作。

语义失败

       当用户提供的map和reduce操作是它们他们的输入值的特定函数时,我们的分布式实现生成的输出将与整个程序的无错顺序执行生成的输出相同。

       我们依靠map和reduce任务输出的原子的提交来实现这一性质。每个正在进行的task将其输出写入私有临时文件中。一个reduce task生成一个这样的文件,map task生成R个这样的文件(每个reduce task一个)。当一个map task完成后,worker发送一条消息给master,这条消息中包含了R个临时文件的名字。如果master接收到了来自一个已完成的map task的完成消息,它将忽略这条消息。否则,它将这R个文件名记录到master数据结构中。

       当一个reduce task完成后,reduce worker自动重命名其临时输出文件为最终输出文件。如果同一个reduce task在多台机器上执行,同一个最终输出文件的多个重命名调用将被执行。我们依靠由底层文件系统提供的原子的重命名操作来保证最终文件系统状态仅包含来自一个reduce任务执行生成的数据。

       绝大多数的map和reduce操作是确定的,事实上,我们的语义等价于这种情况下的一次顺序执行,这使得程序员能够非常容易地推断程序的行为。当 map 和/或 reduce 操作不确定时,我们提供了较弱但仍然合理的语义。在不确定操作存在时,一个特定reduce task R1的输出等价于由非确定性程序的一次顺序执行R1生成的输出。然而,另一个不同的reduce task R2的输出可能对应该非确定性程序的另一个不同顺序执行R2的输出。

       考虑map task M和reduce task R1和R2。令 e(Ri) 作为作为R1的执行(这确实是一个这样的执行)。较弱的语义出现因为 e(R1) 可能读取了M的一次执行生成的输出,e(R2)可能读取了M的另一次执行生成的输出。

3.4 局部性

       在我们的计算环境中,网络带宽是一个相对稀缺的资源。我们靠充分利用输入数据(由GFS管理【10】)存储在组成集群的机器的本地磁盘中这一事实来节省网络带宽。GFS将每个文件分成64MB的块且在不同机器上存储了每个块的多个副本(通常3个)。MapReeuce master考虑每个输入文件的位置信息且试图调度一台含有相应输入数据的机器上的一个map task。如果失败,它将试图调度与该任务的输入的复制品相邻的一个map task(例如,同一网络交换机中包含相同数据的两台机器)。当在一个集群的 workers 重要部分运行大型MapReduce操作时,大多数输入数据都是本地读取的,并不消耗网络带宽。

3.5 Task粒度

       我们将map阶段细分为M个片段,reduce阶段细分为R个片段,如前所述。理想情况下,M和R应该远高于worker机器的数量。每个worker执行多个不同tasks改善了负载均衡,且当一个worker故障后加快了恢复速度:它完成的多个map tasks可以分布到所有其他worker机器上重新执行。

       由于master必须做O(M+R)此调度决策和在内存中保持O(M*R)个状态,如前所述,在我们的实现中M和R的数量大小是有实际界限的。(然而,内存的使用量很小。O(M*R)个状态中大约包含每个map/reduce task对一字节的数据。)

       此外,R通常受到用户限制,因为每个reduce task的输出最终保存在一个单独的输出文件中。在实践中,我们倾向于选择M因而每个独立task大约有16MB到64MB的输入数据(因而之前所述的局部优化达到最搞笑),且我们让R是我们希望使用的机器数量的一个小的倍数。我们通常以M=200000, R=5000,使用2000台worker机器执行MapReduce。

3.6 备份Tasks

       延长MapReduce操作总时间的一个普遍原因是一个掉队者(straggler),也就是说,在这个计算中有一台机器花了异常长的时间来完成最后几个map或reduce tasks。掉队者会以一大堆的理由出现。比如说,一台拥有坏磁盘的机器可能经历频繁的矫正错误从而使读取性能从30MB/s降低到了1MB/s。集群调度系统可能在这个机器上调度了其他任务,导致它更慢地执行MapReduce代码,由于竞争CPU、内存、本地磁盘或网络带宽等资源。我们经历的一个最近的问题是机器初始化代码中的一个bug导致处理器缓存失效:受影响的机器计算速度放慢了100倍。

       我们有一个通用机制来减轻掉队者问题。当一个MapReduce操作接近完成时,master将调度还在进行的任务的备份执行。无论是原始或者备份执行完成,这个任务都被标记为完成。我们调整了这个机制,因而它增加了该计算的计算资源的使用,但不超过几个百分点。我们发现它大大降低了完成大型MapReduce操作的时间。作为一个例子,当没有备份task机制时,在5.3部分描述的排序程序多花了44%的时间完成。

4 改进

       虽然由简单编写的map和reduce函数提供的基本功能已足以满足大多数需求,我们发现了一些有用的扩展。这包括:

 • 用户指定的分区(partition)函数来决定如何将中间键值对映射到R个reduce碎片;

 • 排序保证:我们的实现保证这R个reduce分区中的每个,中间键值对都按键的升序处理;

 • 用户指定的结合(combiner)函数的作用是,在同一个map task内,对按照同一个键生成的中间值进行局部结合,以减少必须在网络间传输的中间数据数量;

 • 自定义输入输出类型,为了读新的输入格式和生成新的输出格式;

 • 在单机上执行简单debug和小规模测试的一种方式。

       在【8】中有对这几项的详细讨论。

5 性能表现

        在此部分,我们利用大型集群上的两个计算来测量MapReduce的性能表现。一个计算通过搜索大约1TB的数据来找到一个特定的模式。另一个计算对大约1TB的数据进行排序。这两个程序代表由MapReduce用户编写的真正程序的一个大的子集-----程序的一个类用来从一个表示(representation)向另一个表示shuffle数据,另一个类从大数据集中提取小部分关注的数据。

5.1 集群配置

       所有程序都在一个拥有大约1800台机器的集群上执行。每台机器拥有两个支持超线程的2GHz的Intel Xeon处理器,4GB内存,两个160GB的IDE磁盘,和千兆以太网接入。这些机器被安排在一个二级树形的交换网络中,该网络根部大约有100~200Gbps的聚合带宽。所有机器都在同一个托管设施中,因此任何一对机器间的往返通信时间不超过1毫秒。

       虽然有4GB内存,但是大约1~1.5GB保留给了运行在集群上的其他任务。这些程序在一个周末的下午执行,此时CPUs,磁盘和网络带宽基本都空闲。

5.2 Grep

      grep程序扫描了10^10个100字节的记录,搜索一个相对稀有的三字符模式串(该模式串大约出现在92337个记录中)。输入被划分为了大约64MB大小的片(M=15000),整个输出都放在了一个文件中(R=1)。

       图2展示了计算随时间推移的进展。Y轴显示了输入数据的扫描速率。随着安排到MapReduce计算的机器越来越多,速率也在逐步提升,当安排了1764个workers时速度达到峰值30GB/s以上。map任务结束后,速率来时下降且在大约80秒时到达0。整个计算从开始到结束大约花费了150秒。这包括1分钟的启动消耗。这个消耗来自向所有workers机器传播程序、延迟与GFS的交互以开启一组1000个输入文件,和获取局部优化所需的信息。

© 著作权归作者所有

共有 人打赏支持
Gaussic
粉丝 384
博文 28
码字总数 66788
作品 0
宝山
[HCNA Cloud]FusionInsight架构与原理

大数据是指无法再一定时间内用常规软件工具对其内容进行抓取、管理和处理的数据集合。 Yarn是Hadoop2.0中的资源管理系统,它是一个通用的资源管理模块,可为各类应用程序进行资源管理和调度。...

Grodd ⋅ 04/25 ⋅ 0

HADOOP集群MAPREDUCE原理篇

Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架; Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程...

weixin_40747272 ⋅ 04/16 ⋅ 0

大数据开发 | MapReduce介绍

1. MapReduce 介绍 1.1MapReduce的作用 假设有一个计算文件中单词个数的需求,文件比较多也比较大,在单击运行的时候机器的内存受限,磁盘受限,运算能力受限,而一旦将单机版程序扩展到集群...

嘿你好夏天 ⋅ 04/18 ⋅ 0

大数据经典学习路线(及供参考)之 一

1.Linux基础和分布式集群技术 学完此阶段可掌握的核心能力: 熟练使用Linux,熟练安装Linux上的软件,了解熟悉负载均衡、高可靠等集群相关概念,搭建互联网高并发、高可靠的服务架构; 学完此...

柯西带你学编程 ⋅ 05/22 ⋅ 0

谷歌三篇论文之二---MapReduce

MapReduce:超大机群上的简单数据处理 MapReduce是一个编程模型,和处理、产生大数据集的相关实现。用户指定一个map函数处理一个key/value对,从而产生中间的key/value对集。然后再指定一个r...

qq_37111953 ⋅ 04/14 ⋅ 0

架构精讲: Hadoop技术框架和架构演进方向

Apache Hadoop是一个开源软件框架,可安装在一个商用机器集群中,使机器可彼此通信并协同工作,以高度分布式的方式共同存储和处理大量数据。最初,Hadoop 包含以下两个主要组件: Hadoop Dist...

btb5e6nsu1g511eg5xeg ⋅ 05/15 ⋅ 0

【Hive】Hive介绍及Hive环境搭建

1、Hive简介 Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射成一张表,并提供类SQL查询功能。Hive是由Facebook开源用于解决海量结构化日志的数据统计的工具。 在Hadoop生态...

gongxifacai_believe ⋅ 04/27 ⋅ 0

Hadoop、MapReduce、YARN和Spark的区别与联系

(1) Hadoop 1.0 第一代Hadoop,由分布式存储系统HDFS和分布式计算框架MapReduce组成,其中,HDFS由一个NameNode和多个DataNode组成,MapReduce由一个JobTracker和多个TaskTracker组成,对应...

cuiyaonan2000 ⋅ 05/08 ⋅ 0

【数据蒋堂】第48期:Hadoop中理论与工程的错位

Hadoop是当前重要的大数据计算平台,它试图摒弃传统数据库的理念,重新构建一套新的大数据体系。但是,这并不是件很容易的事,在Hadoop的设计和实现中能看到一些先天不足的地方,其中一点就是...

润乾软件 ⋅ 04/17 ⋅ 0

MapReduce 实验 (一) 原理

官网 http://hadoop.apache.org/ hadoop三大组件 HDFS:分布式存储系统 https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsUserGuide.html MapReduce:分布式计算......

pcdog ⋅ 04/15 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

三步为你的App集成LivePhoto功能

摘要:LivePhoto是iOS9新推出的一种拍照方式,类似于拍摄Gif图或录制视频片段生成图片。如果没有画面感,可以联想《哈利波特》霍格沃茨城堡的壁画,哈哈,很炫酷有木有,但坑爹的是只有iphone6S以...

壹峰 ⋅ 16分钟前 ⋅ 0

centos7 git安装

由于centos中的源仓库中git不是最新版本,需要进行源码安装。 1、查看yum仓库git信息 [root@iZm5e3d4r5i5ml889vh6esZ zh]# yum info gitLoaded plugins: fastestmirrorLoading mirror s...

xixingzhe ⋅ 25分钟前 ⋅ 0

input file 重复上传同一张图片失效的解决办法

解决办法 方法一:来回切换input[type='file']的type属性值,可以是‘text’,'button','button'....,然后再切换回来‘file’ 方法二:每次取消图片预览后,重置input[type='file']的value的...

时刻在奔跑 ⋅ 26分钟前 ⋅ 0

Mahout推荐算法API详解

前言 用Mahout来构建推荐系统,是一件既简单又困难的事情。简单是因为Mahout完整地封装了“协同过滤”算法,并实现了并行化,提供非常简单的API接口;困难是因为我们不了解算法细节,很难去根...

xiaomin0322 ⋅ 31分钟前 ⋅ 0

WampServer默认web服务器根目录位置

安装WampServer之后的web服务器根目录默认位置在WampServer安装目录下的www:

临江仙卜算子 ⋅ 32分钟前 ⋅ 0

Redux的一些手法记录

Redux Redux的基本概念见另一篇文。 这里记录一下Redux在项目中的实际操作的手法。 actions 首先定义action.js,actions的type,可以另起一个action-type.js文件。 action-type.js用来存...

LinearLaw ⋅ 33分钟前 ⋅ 0

android 手势检测(左右滑动、上下滑动)

GestureDetector类可以让我们快速的处理手势事件,如点击,滑动等。 使用GestureDetector分三步: 1. 定义GestureDetector类 2. 初始化手势类,同时设置手势监听 3. 将touch事件交给gesture...

王先森oO ⋅ 47分钟前 ⋅ 0

java 方法的执行时间监控 设置超时(Future 接口)

java 方法的执行时间监控 设置超时(Future 接口) import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor......

青峰Jun19er ⋅ 52分钟前 ⋅ 0

一名开源小白的Apache成长自述

今天收到了来自Apache Vote我成为Serviceomb项目Committer的邮件,代表自己的贡献得到了充分的肯定;除了感谢团队的给力支持,我更希望将自己的成长经历——如何践行Apache Way的心得介绍给大...

微服务框架 ⋅ 54分钟前 ⋅ 0

vim介绍、颜色显示和移动光标、一般模式下复制、剪切和粘贴

1.vim 是 vi 的升级版 vim 是带有颜色显示的 mini安装的系统,一般都不带有vim [root@aminglinux-128 ~]# yum install -y vim-enhanced已加载插件:fastestmirror, langpacksLoading mir...

oschina130111 ⋅ 54分钟前 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部