Hadoop MapReduce Overview

manonline · 2017/07/22

MapReduce Job

Overview

  • Client -> Submit Job -> YARN Resource Manager -> YARN Node Manager -> Application Master and Tasks
    • MapReduce Application Master
    • MapReduce Tasks
    • HDFS
      • JobResources : Jar, configuration, input splits, etc
      • Input Files
      • Output Files

Lifecycle

  • Job Submission (Client -> Yarn Resource Manager)
  • Job Initialization (Yarn Resource Manager -> Yarn Node Manager)
    • Scheduler -> Container -> Application Master
    • Application Master.MRAppMaster
      • bookkeeping objects: for status updates;
      • get input splits
      • determine the number of map and reduce tasks
        • map tasks = number of splits
        • reduce tasks = mapreduce.job.reduces;
      • Tuning rule of thumb
        • a map task should run for at least ~1 minute;
        • a reduce task should run for at least ~5 minutes;
  • Task Assignment (Application Master-> Node Manager -> Tasks)
    • Data Locality, Map Tasks first
    • Resource Requirement
      • mapreduce.map.memory.mb
      • mapreduce.map.cpu.vcores
      • mapreduce.reduce.memory.mb
      • mapreduce.reduce.cpu.vcores
    • container allocation
  • Task Execution (Node Manager)
    • YarnChild()
      • Localize the resources/files;
      • Run map tasks and reduce tasks;
    • Progress & Status Update
      • Client -> Polling/getJobStatus() -> Application Master <- YarnChild <- Report <- Task
      • Client polls the Application Master every second (or on demand via getJobStatus())
      • Tasks Report progress to Application Master
  • Job Completion
    • Clean up intermediate data
  • Retrieving the Results
    • each reduce task produces one part file (part-r-NNNNN)

Code Highlight

  • ToolRunner.run(new MapReduceApp(), args)
  • MapReduceApp.class
    • Handle passed-in args (Configuration)
    • Setup Job
      • setJarByClass()/setJar : jar to be deployed in cluster;
      • MapReduceConfiguration() : configure MapReduce tasks;
        • InputFormat
        • MapperClass
        • ReducerClass
        • OutputKeyClass
        • OutputValueClass
        • OutputFormat
    • Run Job
      • waitForCompletion() : submits/runs the job and polls for progress (see the driver sketch below);
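
A minimal driver sketch tying the above together. This is an illustration, not the original post's code: the class name MapReduceApp matches the outline, but the identity Mapper/Reducer and the path arguments are assumptions.

//-------------MapReduceApp.java (sketch)-----------
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapReduceApp extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // Handle passed-in args: args[0] = input path, args[1] = output path (assumed).
        Job job = Job.getInstance(getConf(), "mapreduce-app");
        job.setJarByClass(MapReduceApp.class);            // jar to be deployed in the cluster

        job.setInputFormatClass(TextInputFormat.class);   // InputFormat
        job.setMapperClass(Mapper.class);                 // MapperClass (identity mapper here)
        job.setReducerClass(Reducer.class);               // ReducerClass (identity reducer here)
        job.setOutputKeyClass(LongWritable.class);        // OutputKeyClass
        job.setOutputValueClass(Text.class);              // OutputValueClass
        job.setOutputFormatClass(TextOutputFormat.class); // OutputFormat

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job and poll its progress until completion.
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MapReduceApp(), args));
    }
}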

Failures

Task Failure

  • Detection
    • Tasks report back failure
    • Node Manager reports back failure (for instance, JVM failure)
    • Timeout (No progress report from tasks in 10 minutes)
  • Handling
    • retry up to 4 times (mapreduce.map.maxattempts / mapreduce.reduce.maxattempts, default 4);
    • fail the whole job after the attempts are exhausted;

Application Master Failure

  • Detection
    • Application master sends heartbeats to Resource Manager;
    • mapreduce.am.max-attempts
  • Handling
    • Resource Manager spins up a new Application Master
    • recover the state of the tasks : yarn.app.mapreduce.am.job.recovery.enable
    • the client switches to the new Application Master
      • after timing out while polling the previous one, the client asks the Resource Manager for the address of the new Application Master.

Node Manager Failure

  • Detection :
    • Node Manager fails to send heartbeats to Resource Manager;
    • yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms
  • Handling :
    • Recover Application Master and Tasks (as above)

Resource Manager Failure

  • HA solution: an active-standby pair of Resource Managers, with ZooKeeper-based failover

InputFormat

Takes the input and produces splits for map tasks; the mapping between splits and map tasks is one to one.

  • Raw File -> InputFormatClass -> K1, V1
    • The client calculates the splits using
      • InputFormat.getSplits() : split = (size of the split, reference to the data)
    • The client sends the splits to the Application Master; a split is just a reference to the data, rather than the data itself;
    • The Application Master schedules map tasks
    • A map task processes its split by invoking
      • InputFormat.createRecordReader()
      • Mapper.run(), which takes one record at a time and invokes Mapper.map() with (K1, V1)
//-------------InputFormat.java-----------
public abstract class InputFormat<K, V> {
    public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;

    public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}

//-------------Mapper.java-----------------
// Mapper.run()
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    // context.nextKeyValue() delegates to the RecordReader,
    // which is created by InputFormat.createRecordReader();
    while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
}

// Mapper.map() -- the default implementation is the identity function
protected void map(KEYIN key, VALUEIN value, Context context)
        throws IOException, InterruptedException {
    // map processing based on the given key and value;
    context.write((KEYOUT) key, (VALUEOUT) value);
}

FileInputFormat

  • Specifies the files to be processed, i.e. the input files;
  • Specifies how to split the input files: split size = max(minSize, min(maxSize, blockSize))
    • mapreduce.input.fileinputformat.split.minsize
    • mapreduce.input.fileinputformat.split.maxsize
    • dfs.blocksize
  • Preventing Split
    • make minSize > fileSize
    • subclass overrides isSplitable() to return false;
  • Processing a Whole File as a Record (see the sketch below)
    • isSplitable() = false
    • createRecordReader() -> use a WholeFileRecordReader
    • Can be used for PACKAGING small files into SEQUENCE files;
      • key = original file name
      • value = content of the file
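
A sketch of that idea follows. The class names WholeFileInputFormat/WholeFileRecordReader are illustrative (they are not shipped with Hadoop), and the key is NullWritable here for simplicity; a packaging mapper can recover the file name from its input split when writing the sequence file.

//-------------WholeFileInputFormat.java (sketch)-----------
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false; // never split: one map task per file
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context) {
        return new WholeFileRecordReader();
    }
}

class WholeFileRecordReader extends RecordReader<NullWritable, BytesWritable> {
    private FileSplit fileSplit;
    private Configuration conf;
    private final BytesWritable value = new BytesWritable();
    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) {
        this.fileSplit = (FileSplit) split;
        this.conf = context.getConfiguration();
    }

    @Override
    public boolean nextKeyValue() throws IOException {
        if (processed) {
            return false;
        }
        // Read the entire file into a single value.
        byte[] contents = new byte[(int) fileSplit.getLength()];
        Path file = fileSplit.getPath();
        FileSystem fs = file.getFileSystem(conf);
        FSDataInputStream in = null;
        try {
            in = fs.open(file);
            IOUtils.readFully(in, contents, 0, contents.length);
            value.set(contents, 0, contents.length);
        } finally {
            IOUtils.closeStream(in);
        }
        processed = true;
        return true;
    }

    @Override
    public NullWritable getCurrentKey() { return NullWritable.get(); }

    @Override
    public BytesWritable getCurrentValue() { return value; }

    @Override
    public float getProgress() { return processed ? 1.0f : 0.0f; }

    @Override
    public void close() { /* nothing to close */ }
}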

CombineFileInputFormat

  • generates one split per node, which may be made of multiple blocks.
  • decouples the amount of data that a mapper consumes from the block size.
  • GOOD for processing many small files (see the usage sketch below).
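
A brief usage sketch, assuming the concrete CombineTextInputFormat subclass and a 128 MB target split size (job is an org.apache.hadoop.mapreduce.Job):

//-------------Combine input usage (sketch)-----------
// Pack many small files into combined splits of roughly 128 MB each.
job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat.class);
org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat.setMaxInputSplitSize(job, 128 * 1024 * 1024);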

Text Input

  • key = byte offset of the whole file (i.e. global);
  • value = line content;
    • the line number is not practical to track, since the file is broken into splits
  • lines won't be broken across splits
    • a line may be broken across HDFS block boundaries;
    • some remote data transfer is needed to complete the split's last line, but it is minimal.

KeyValueTextInputFormat

  • key = first part of the line;
  • value = second part of the line;
  • where the two parts are separated by
    • mapreduce.input.keyvaluelinerecordreader.key.value.separator (default TAB)

NLineInputFormat

  • key = byte offset within the file (as in TextInputFormat)
  • value = line content
  • N = number of lines per split (i.e. per mapper)
    • mapreduce.input.lineinputformat.linespermap (default to 1)

XML

  • org.apache.hadoop.streaming.mapreduce.StreamXmlRecordReader

Binary Input

SequenceFileInputFormat

  • Stores sequences of binary key-value pairs.

SequenceFileAsTextInputFormat

  • Converts binary key-value pairs to Text key-value pairs as the input to Mapper, which is done via toString()

SequenceFileAsBinaryInputFormat

  • Passes the raw binary key-value pairs to the Mapper, which must interpret them itself.

FixedLengthInputFormat

  • Reading fixed-width binary records from a file.

Multiple Inputs

  • Allows developers to specify the InputFormat and Mapper for each input path:
MultipleInputs.addInputPath(job, inputPathA, TextInputFormat.class, MaxTemperatureMapper.class);

MultipleInputs.addInputPath(job, inputPathB, TextInputFormat.class, AnotherTemperatureMapper.class);

Database Input

  • DBInputFormat is an input format for reading data from a relational database, using JDBC
  • Sqoop

OutputFormat

Text Output

  • TextOutputFormat is the default, with TAB as the default key-value separator

Binary Output

  • SequenceFileOutputFormat
  • SequenceFileAsBinaryOutputFormat
  • MapFileOutputFormat

Multiple Outputs

The default behavior is that each Reducer produces one output file, and files are named by the partition number: part-r-00000, part-r-00001, and so on.

MultipleOutputs allows you to write data to files whose names are derived from the output keys and values, or in fact from an arbitrary string.
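
A minimal reducer sketch of that idea, assuming file names derived from the key (the class name PartitionByKeyReducer and the Text/Text input types are illustrative):

//-------------MultipleOutputs usage (sketch)-----------
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class PartitionByKeyReducer extends Reducer<Text, Text, NullWritable, Text> {
    private MultipleOutputs<NullWritable, Text> multipleOutputs;

    @Override
    protected void setup(Context context) {
        multipleOutputs = new MultipleOutputs<>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            // Write each record to a file named after the key,
            // e.g. <key>-r-00000 instead of the default part-r-00000.
            multipleOutputs.write(NullWritable.get(), value, key.toString());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }
}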

MapReduce

Data Type

Mapper

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public class Context extends MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        // ...
    }
    protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {
        // ...
    }
}

Reducer

public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public class Context extends ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        // ...
    }
    protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException {
        // ...
    }
}

Data Flow

  • Mapper : (K1, V1) -> list(K2, V2)
  • Shuffle/Sort : (K2, V2) -> (K2, list(V2)) - handled by the framework
  • Reducer : (K2, list(V2)) -> list(K3, V3)
   
The default MapReduce job settings:

  • InputFormatClass    : TextInputFormat.class
  • MapperClass         : Mapper.class
  • MapOutputKeyClass   : LongWritable.class
  • MapOutputValueClass : Text.class
  • PartitionerClass    : HashPartitioner.class
  • NumReduceTasks      : 1
  • ReducerClass        : Reducer.class
  • OutputKeyClass      : LongWritable.class
  • OutputValueClass    : Text.class
  • OutputFormatClass   : TextOutputFormat.class
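
The same defaults written out explicitly in the driver (a sketch; job is an org.apache.hadoop.mapreduce.Job and the classes come from org.apache.hadoop.io, org.apache.hadoop.mapreduce and its lib.* packages):

//-------------Default job settings, spelled out (sketch)-----------
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(Mapper.class);                   // identity mapper
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setPartitionerClass(HashPartitioner.class);
job.setNumReduceTasks(1);
job.setReducerClass(Reducer.class);                 // identity reducer
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(TextOutputFormat.class);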

Shuffle & Sort

Map Output

  • Map writes output to -> a circular memory buffer (100 MB by default, mapreduce.task.io.sort.mb)
  • When the buffer reaches its spill threshold (mapreduce.map.sort.spill.percent, default 0.80) ->
    • Divides the data into Partitions
    • Perform Sort by Key
    • Perform Combiner function (if any)
    • Spill content to local Disk (mapreduce.cluster.local.dir), i.e. Spill Files;
  • Run the combiner function again (if any), but only if there are at least 3 spill files (mapreduce.map.combine.minspills);
    • With fewer spills, the potential reduction in map output size is not worth the overhead of invoking the combiner.
  • Merge Spill files into A Single Partitioned and Sorted Output file;
  • Compress the output file (see the configuration sketch after this list)
    • mapreduce.map.output.compress=true
    • mapreduce.map.output.compress.codec=<codec library>
  • Serve map output to reducers over HTTP
    • mapreduce.shuffle.max.threads (default 0, meaning 2 x number of CPUs on the node manager)
  • Notify the Application Master, via the heartbeat mechanism, that the map is done
    • including the map output location/host, so that reduce tasks know where to fetch their partitions
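
For example, map-output compression can be turned on in the driver configuration. A sketch, assuming the Snappy codec is available on the cluster (conf is the job's org.apache.hadoop.conf.Configuration):

//-------------Map output compression (sketch)-----------
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec",
        org.apache.hadoop.io.compress.SnappyCodec.class,
        org.apache.hadoop.io.compress.CompressionCodec.class);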

Reduce Input

  • The reduce task copies its partition from each node whose map tasks produced output (the copy phase)
    • mapreduce.reduce.shuffle.parallelcopies=5
    • Spill to disk if mapreduce.reduce.merge.inmem.threshold is met
    • Perform the combiner function (if any)
    • A background thread merges the spills into larger sorted files (to save processing time later)
  • Sort/Merge phase (when all copies are done)
  • Reduce Phase

Reduce Output

  • Sets up and creates the temporary and final output files.

Complex Processing

  • Single but Complex Mapper/Reducer
  • ChainMapper/ChainReducer
  • Multiple Jobs
    • Linear chain of jobs (run synchronously, one after another)
      • runJob(conf1)
      • runJob(conf2)
    • DAG
      • JobControl: set up dependencies and query previous jobs' status/results (see the sketch after this list).
  • Workflow Engine: STORE and RUN workflow
    • Workflow: a collection of jobs (MapReduce, Pig, Hive, etc)
    • Engine: coordinate jobs execution based on predefined schedule and data availability.
    • Action Node: perform the jobs;
    • Control Node: conditional logic or parallel execution;
      • start control node
      • kill control node
      • end control node
    • Tools :
      • Oozie: runs workflows as a service rather than on the client side (as JobControl does); makes rerunning a failed workflow easier, without re-running the jobs that already succeeded.
  • Higher-level Language
    • Pig
    • Hive
    • Cascading
    • Crunch
    • Spark
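
A hedged sketch of the JobControl approach mentioned above (conf1 and conf2 are the two jobs' Configurations, the group name is arbitrary, and this fragment belongs inside a driver method):

//-------------JobControl (sketch)-----------
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

ControlledJob step1 = new ControlledJob(conf1);
ControlledJob step2 = new ControlledJob(conf2);
step2.addDependingJob(step1);            // step2 runs only after step1 succeeds

JobControl control = new JobControl("two-step-workflow");
control.addJob(step1);
control.addJob(step2);

// JobControl is a Runnable; run it in its own thread and poll until done.
Thread thread = new Thread(control);
thread.start();
while (!control.allFinished()) {
    Thread.sleep(1000);
}
control.stop();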

Advanced Features

Counter

  • A useful CHANNEL for gathering statistics about the job. Usually better than logging.
  • Built-in Counters (Grouping)
    • org.apache.hadoop.mapreduce.FileSystemCounter
    • org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter
    • org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter
    • org.apache.hadoop.mapreduce.JobCounter
      • TOTAL_LAUNCHED_MAPS
    • org.apache.hadoop.mapreduce.TaskCounter
      • MAP_INPUT_RECORDS :
        • records read by each map task.
        • sent to the application master periodically.
  • User-Defined Java Counters: 
    • Group Name = enum name; Counter Name = enum field;
    • incremented as desired in the mapper or reducer
    • aggregated globally
enum CounterGroupA {
    Counter1,
    Counter2,
    Countern
}

enum CounterGroupB {
    Counter1,
    Counter2,
    Countern
}

protected void map(KEYIN key, VALUEIN value, Context context) {
    // .....
    // Enum-based counter: group = CounterGroupA, counter = Counter1.
    context.getCounter(CounterGroupA.Counter1).increment(1);

    // ...
    // Dynamic counter: group and counter names passed as strings (illustrative names).
    context.getCounter("DynamicGroup", "DynamicCounter").increment(1);
}
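
After the job completes, the same counters can be read back on the client. A sketch, assuming job is the completed org.apache.hadoop.mapreduce.Job:

//-------------Reading counters on the client (sketch)-----------
import org.apache.hadoop.mapreduce.Counters;

Counters counters = job.getCounters();
long enumCount    = counters.findCounter(CounterGroupA.Counter1).getValue();
long dynamicCount = counters.findCounter("DynamicGroup", "DynamicCounter").getValue();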

Sorting

  • Partial Sort
    • Rely on the framework's built-in sort by key, so no extra sorting logic is needed.
    • Save to a sequence file.
    • However, records are sorted within each output file, but not across output files.
  • Total Sort
    • Use a partitioner that respects the total order of the output (e.g. TotalOrderPartitioner).
    • The data distribution / key ranges must be estimated so that data is evenly distributed; otherwise one machine ends up doing too much work.
      • Looping through the whole dataset: overkill.
      • Sampling (e.g. the InputSampler class) is the practical alternative.
  • Secondary Sort
    • Sorting the values in list(values)
    • Steps (see the wiring sketch after this list)
      • The new key = (Key, Value), i.e. a composite key, so that (Key, Value) pairs are sorted together;
      • A new Partitioner is built on the OLD (natural) key, so that composite keys sharing the same natural key end up in the same partition and, in turn, the same Reducer.
      • The Reducer groups values by the OLD key (grouping comparator).
      • The sort comparator must cover the full composite key and is used on both the map and reduce sides.
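
A condensed sketch of that wiring. IntPair, FirstPartitioner and GroupComparator are illustrative names; IntPair is assumed to be a WritableComparable<IntPair> holding (first, second), with an int getFirst() accessor, whose compareTo() orders by first, then by second:

//-------------Secondary sort wiring (sketch)-----------
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;

// Partition on the OLD (natural) key only, so every value for the same
// natural key reaches the same reducer regardless of the secondary field.
public class FirstPartitioner extends Partitioner<IntPair, NullWritable> {
    @Override
    public int getPartition(IntPair key, NullWritable value, int numPartitions) {
        return (key.getFirst() & Integer.MAX_VALUE) % numPartitions;
    }
}

// Group reducer input by the natural key only, so a single reduce() call
// sees all (first, *) records, already sorted by the secondary field.
public class GroupComparator extends WritableComparator {
    protected GroupComparator() {
        super(IntPair.class, true);
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        return Integer.compare(((IntPair) a).getFirst(), ((IntPair) b).getFirst());
    }
}

// Driver wiring:
//   job.setMapOutputKeyClass(IntPair.class);
//   job.setPartitionerClass(FirstPartitioner.class);
//   job.setGroupingComparatorClass(GroupComparator.class);
//   // The sort order itself comes from IntPair.compareTo();
//   // set job.setSortComparatorClass(...) only if a different order is needed.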

Joins

Rather than writing MapReduce programs, you might consider using a higher-level framework such as Pig, Hive, Cascading, Crunch, or Spark.

Map-Side Joins

  • Two sources need to be sorted by the same key (join key) AND they also need to be partitioned in the same way.
  • Take the result of the above sorting as the input to Map tasks;
    • Use CompositeInputFormat (org.apache.hadoop.mapreduce.lib.join) to join the two inputs.

Reduce-Side Joins

  • MultipleInputs configures a different mapper for each source
    • To read the different sources
    • The map output key has to be the JOIN key + a tag, which ensures that
      • data pertaining to the same join key lands on the same Reducer
      • the (comparable) tag decides which source the Reducer receives first, i.e. which side drives the join.
  • Map Tasks send all the data pertaining to the same join key to the same reducer.
  • For each join key, the reduce task
    • gets the value iterator
    • takes the record(s) of the driving side first (thanks to the tag ordering) and joins them with the remaining values from the iterator (see the wiring sketch below).
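
A hedged sketch of that wiring. The mapper, partitioner and comparator class names are illustrative only; a production version would use a composite text-pair key plus a custom partitioner and grouping comparator on the join key alone:

//-------------Reduce-side join wiring (sketch)-----------
// Each mapper emits (joinKey, tag) as the key; the tag ("0" for the smaller,
// dimension-like source, "1" for the larger one) controls which side the
// reducer sees first and therefore drives the join.
MultipleInputs.addInputPath(job, sourceAPath, TextInputFormat.class, SourceAJoinMapper.class);
MultipleInputs.addInputPath(job, sourceBPath, TextInputFormat.class, SourceBJoinMapper.class);

// Partition and group on the join key ONLY, so both tagged variants of the
// same key arrive in the same reduce() call (custom classes, not shown here).
job.setPartitionerClass(JoinKeyPartitioner.class);
job.setGroupingComparatorClass(JoinKeyGroupingComparator.class);
job.setReducerClass(JoinReducer.class);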
