Hadoop MapReduce Overview

manonline
Published on 2017/07/22 17:56

MapReduce Job

Overview

  • Client -> Submit Job -> YARN Resource Manager -> YARN Node Manager -> Application Master and Tasks
    • MapReduce Application Master
    • MapReduce Tasks
    • HDFS
      • JobResources : Jar, configuration, input splits, etc
      • Input Files
      • Output Files

Lifecycle

  • Job Submission (Client -> Yarn Resource Manager)
  • Job Initialization (Yarn Resource Manager -> Yarn Node Manager)
    • Scheduler -> Container -> Application Master
    • Application Master.MRAppMaster
      • bookkeeping objects to track the job's progress and status updates;
      • get input splits
      • determine map and reduce tasks
        • number of map tasks = number of splits
        • number of reduce tasks = mapreduce.job.reduces;
      • Tuning guideline
        • a map task should run for at least ~1 minute;
        • a reduce task should run for ~5 minutes;
  • Task Assignment (Application Master-> Node Manager -> Tasks)
    • Data Locality, Map Tasks first
    • Resource Requirement
      • mapreduce.map.memory.mb
      • mapreduce.map.cpu.vcores
      • mapreduce.reduce.memory.mb
      • mapreduce.reduce.cpu.vcores
    • container allocation
  • Task Execution (Node Manager)
    • YarnChild()
      • Localize the resources/files;
      • Run map tasks and reduce tasks;
    • Progress & Status Update
      • Client -> Polling/getJobStatus() -> Application Master <- YarnChild <- Report <- Task
      • The client polls the Application Master for progress (every second when using waitForCompletion(), or on demand via getJobStatus())
      • Tasks Report progress to Application Master
  • Job Completion
    • Clean up intermediate output and working state
  • Retrieving the Results
    • each reduce task produces one part file (e.g. part-r-00000)

Code Highlight

  • ToolRunner.run(new MapReduceApp(), args)
  • MapReduceApp.class
    • Handle passed-in args (Configuration)
    • Setup Job
      • setJarByClass()/setJar() : the jar to be deployed to the cluster;
      • MapReduceConfiguration() : configure MapReduce tasks;
        • InputFormat
        • MapperClass
        • ReducerClass
        • OutputKeyClass
        • OutputValueClass
        • OutputFormat
    • Run Job
      • waitForCompletion() : submits the job and polls for progress; see the driver sketch below.
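
Below is a minimal driver sketch tying these pieces together, following the outline above; MyMapper, MyReducer, and the Text/IntWritable output types are illustrative placeholders, not code from this post.

//-------------MapReduceApp.java (sketch)-----------
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapReduceApp extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // Handle passed-in args (the Configuration is already populated by ToolRunner)
        if (args.length != 2) {
            System.err.println("Usage: MapReduceApp <input path> <output path>");
            return -1;
        }

        Job job = Job.getInstance(getConf(), "MapReduceApp");
        job.setJarByClass(MapReduceApp.class);          // jar to be deployed in the cluster

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Configure the MapReduce tasks
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(MyMapper.class);             // hypothetical mapper
        job.setReducerClass(MyReducer.class);           // hypothetical reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Submit the job and poll the progress
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MapReduceApp(), args));
    }
}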

Failures

Task Failure

  • Detection
    • Tasks report back failure
    • The Node Manager reports the failure (for instance, the task JVM exits unexpectedly)
    • Timeout (no progress report from the task for 10 minutes, mapreduce.task.timeout)
  • Handling
    • retry the task up to 4 times (mapreduce.map.maxattempts / mapreduce.reduce.maxattempts);
    • fail the whole job once the same task has failed 4 times;
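
A minimal sketch of the retry/timeout knobs above, assuming the standard property names (the values shown are the usual defaults):

import org.apache.hadoop.conf.Configuration;

// Sketch: tuning task-failure handling
Configuration conf = new Configuration();
conf.setInt("mapreduce.map.maxattempts", 4);       // attempts per map task before it is marked failed
conf.setInt("mapreduce.reduce.maxattempts", 4);    // attempts per reduce task before it is marked failed
conf.setLong("mapreduce.task.timeout", 600000);    // 10 minutes without a progress report -> task failed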

Application Master Failure

  • Detection
    • Application master sends heartbeats to Resource Manager;
    • mapreduce.am.max-attempts
  • Handling
    • the Resource Manager spins up a new Application Master
    • recover the state of the tasks : yarn.app.mapreduce.am.job.recovery.enable
    • the client switches to the new Application Master
      • after timing out while polling the old one, the client asks the Resource Manager for the address of the new Application Master.

Node Manager Failure

  • Detection :
    • Node Manager fails to send heartbeats to Resource Manager;
    • yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms
  • Handling :
    • Recover Application Master and Tasks (as above)

Resource Manager Failure

  • HA solution (active-standby Resource Managers) + ZooKeeper

InputFormat

Takes the input and produces splits for map tasks; the mapping between splits and map tasks is one to one.

  • Raw File -> InputFormatClass -> K1, V1
    • The client calculates the splits using
      • InputFormat.getSplits() : split = (size of the split, reference to the data)
    • The client sends the splits to the Application Master; a split is just a reference to the data, rather than the data itself;
    • The Application Master schedules map tasks
    • A map task processes its split by invoking
      • InputFormat.createRecordReader()
      • Mapper.run(), which takes one record at a time and invokes Mapper.map() with (K1, V1)
//-------------InputFormat.java-----------
public abstract class InputFormat<K, V> {
    public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;

    public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}

//-------------Mapper.java-----------------
// Mapper.run()
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    // context.nextKeyValue() delegates to the RecordReader
    // that was created by InputFormat.createRecordReader();
    while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
}

// Mapper.map() (default identity implementation)
protected void map(KEYIN key, VALUEIN value, Context context)
        throws IOException, InterruptedException {
    // map processing based on the given key and value;
    context.write((KEYOUT) key, (VALUEOUT) value);
}

FileInputFormat

  • Specifies the files to be processed, i.e. the input files;
  • Specifies how to split the input files: splitSize = max(minSize, min(maxSize, blockSize))
    • mapreduce.input.fileinputformat.split.minsize
    • mapreduce.input.fileinputformat.split.maxsize
    • dfs.blocksize
  • Preventing splits
    • make minSize larger than the largest file size, or
    • have a subclass override isSplitable() to return false (see the sketch below);
  • Processing a whole file as a single record
    • isSplitable() returns false
    • createRecordReader() -> use a WholeFileRecordReader
    • can be used for PACKAGING small files into SEQUENCE files;
      • key = original file name
      • value = content of the file
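
A sketch of such a non-splittable input format. WholeFileRecordReader is a hypothetical reader that loads the entire file into a single BytesWritable value; everything else uses the standard mapreduce API.

//-------------WholeFileInputFormat.java (sketch)-----------
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        return false;   // never split: one file == one split == one record
    }

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // hypothetical reader that reads the whole file as one record
        WholeFileRecordReader reader = new WholeFileRecordReader();
        reader.initialize(split, context);
        return reader;
    }
}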

CombineFileInputFormat

  • generates one split per node, which may be made of multiple blocks.
  • decouples the amount of data a mapper consumes from the block size.
  • GOOD for processing many small files.

Text Input

  • key = byte offset of the line within the whole file (i.e. a global offset);
  • value = line content;
    • line numbers are impractical to track, since the file is broken into splits
  • lines won't be broken across splits
    • lines can be broken across HDFS blocks;
    • a little data transfer is needed to complete the last line of a split, but it is minimal.

KeyValueTextInputFormat

  • key = first part of a line;
  • value = second part of the line;
  • where the two parts are separated by
    • mapreduce.input.keyvaluelinerecordreader.key.value.separator (default TAB)

NLineInputFormat

  • key = byte offset of the line (as with TextInputFormat)
  • value = line content
  • N = how many lines per split (i.e. per mapper)
    • mapreduce.input.lineinputformat.linespermap (default to 1)
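
A configuration sketch for the two formats above (pick one per job). The class and property names are the standard ones listed here; the ',' separator and the 10-lines-per-mapper value are just examples.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

Job job = Job.getInstance(new Configuration(), "input-format-config");

// Option A: KeyValueTextInputFormat, splitting each line on ',' instead of the default TAB
job.getConfiguration().set(
        "mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
job.setInputFormatClass(KeyValueTextInputFormat.class);

// Option B: NLineInputFormat, giving each mapper 10 lines instead of the default 1
// (equivalent to setting mapreduce.input.lineinputformat.linespermap=10)
// NLineInputFormat.setNumLinesPerSplit(job, 10);
// job.setInputFormatClass(NLineInputFormat.class);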

XML

  • org.apache.hadoop.streaming.mapreduce.StreamXmlRecordReader

Binary Input

SequenceFileInputFormat

  • Stores sequences of binary key-value pairs.

SequenceFileAsTextInputFormat

  • Converts binary key-value pairs to Text key-value pairs as the input to Mapper, which is done via toString()

SequenceFileAsBinaryInputFormat

  • Passes raw binary key-value pairs to the Mapper, which must interpret them itself.

FixedLengthInputFormat

  • Reading fixed-width binary records from a file.

Multiple Inputs

  • Allows developers to specify an InputFormat and a Mapper for each input path:
MultipleInputs.addInputPath(job, inputPathA, TextInputFormat.class, MaxTemperatureMapper.class);

MultipleInputs.addInputPath(job, inputPathB, TextInputFormat.class, AnotherTemperatureMapper.class);

Database Input

  • DBInputFormat is an input format for reading data from a relational database, using JDBC
  • Sqoop

OutputFormat

Text Output

  • The default OutputFormat; the default key-value separator is TAB (mapreduce.output.textoutputformat.separator)

Binary Output

  • SequenceFileOutputFormat
  • SequenceFileAsBinaryOutputFormat
  • MapFileOutputFormat

Multiple Outputs

The default behavior is that each Reducer produces one output file, and files are named by the partition number: part-r-00000, part-r-00001, and so on.

MultipleOutputs allows you to write data to files whose names are derived from the output keys and values, or in fact from an arbitrary string; see the reducer sketch below.
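
A reducer sketch using MultipleOutputs to derive file names from the key. PartitionByKeyReducer and the Text key/value types are illustrative assumptions; the MultipleOutputs calls are the standard API.

//-------------PartitionByKeyReducer.java (sketch)-----------
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class PartitionByKeyReducer extends Reducer<Text, Text, NullWritable, Text> {

    private MultipleOutputs<NullWritable, Text> multipleOutputs;

    @Override
    protected void setup(Context context) {
        multipleOutputs = new MultipleOutputs<>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            // base output path derived from the key, e.g. "2017/part-r-00000"
            multipleOutputs.write(NullWritable.get(), value, key.toString());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        multipleOutputs.close();
    }
}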

MapReduce

Data Type

Mapper

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public abstract class Context implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        // ...
    }
    protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {
        // ...
    }
}

Reducer

public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public abstract class Context implements ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        // ...
    }
    protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException {
        // ...
    }
}

Data Flow

  • Mapper : (K1, V1) -> list(K2, V2)
  • Group/Sort : (K2, list(V2)) - handled by framework
  • Reducer : (K2, list(V2)) -> list(K3, V3)
   
Defaults used when nothing is set explicitly:

  • InputFormatClass : TextInputFormat.class
  • MapperClass : Mapper.class
  • MapOutputKeyClass : LongWritable.class
  • MapOutputValueClass : Text.class
  • PartitionerClass : HashPartitioner.class
  • NumReduceTasks : 1
  • ReducerClass : Reducer.class
  • OutputKeyClass : LongWritable.class
  • OutputValueClass : Text.class
  • OutputFormatClass : TextOutputFormat.class
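
The same defaults expressed as explicit Job settings, as a sketch (normally you would only override the ones you need):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

Job job = Job.getInstance(new Configuration(), "defaults");
job.setInputFormatClass(TextInputFormat.class);
job.setMapperClass(Mapper.class);                  // identity mapper
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setPartitionerClass(HashPartitioner.class);
job.setNumReduceTasks(1);
job.setReducerClass(Reducer.class);                // identity reducer
job.setOutputKeyClass(LongWritable.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(TextOutputFormat.class);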

Shuffle & Sort

Map Output

  • Map writes output to -> a circular memory buffer (100 MB by default, mapreduce.task.io.sort.mb)
  • Buffer reaches Threshold ->
    • Divides the data into Partitions
    • Perform Sort by Key
    • Perform Combiner function (if any)
    • Spill content to local Disk (mapreduce.cluster.local.dir), i.e. Spill Files;
  • Combiner function again (if any), and only if there are at least 3 spill files (mapreduce.map.combine.minspills);
    • with fewer than 3 spill files, the potential reduction in map output size is not worth the overhead of invoking the combiner again.
  • Merge Spill files into A Single Partitioned and Sorted Output file;
  • Compress Output file
    • mapreduce.map.output.compress=true
    • mapreduce.map.output.compress.codec=library
  • Enable HTTP access so reducers can fetch the output
    • mapreduce.shuffle.max.threads (default 0, meaning 2 x the number of CPUs on the node manager)
  • Notify the Application Master, via the heartbeat mechanism, that the map is done, along with
    • the map output locations and hosts, so that reduce tasks know where to fetch their partitions
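
A sketch of the map-output tuning knobs mentioned above, using the standard property names (the values are examples; the codec assumes Snappy is available on the cluster):

import org.apache.hadoop.conf.Configuration;

Configuration conf = new Configuration();
conf.setInt("mapreduce.task.io.sort.mb", 200);              // circular buffer size (default 100 MB)
conf.setBoolean("mapreduce.map.output.compress", true);     // compress map output before the shuffle
conf.set("mapreduce.map.output.compress.codec",
         "org.apache.hadoop.io.compress.SnappyCodec");      // compression library to use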

Reduce Input

  • The reduce task copies its partition from the different nodes (map tasks) that hold it
    • mapreduce.reduce.shuffle.parallelcopies=5
    • spill to disk if mapreduce.reduce.merge.inmem.threshold is met
    • perform the combiner function (if any)
    • a background thread merges spills into larger sorted files (to save some processing time later)
  • Sort/Merge phase (when all copy is done)
  • Reduce Phase

Reduce Output

  • Set up and create the temporary files, then promote them to the final output files.

Complex Processing

  • Single but Complex Mapper/Reducer
  • ChainMapper/ChainReducer
  • Multiple Jobs
    • Linear Jobs/Synchronized Jobs
      • runJob(conf1)
      • runJob(conf2)
    • DAG
      • JobControl: setup dependencies, query previous job status/result.
  • Workflow Engine: STORES and RUNS workflows
    • Workflow: a collection of jobs (MapReduce, Pig, Hive, etc)
    • Engine: coordinate jobs execution based on predefined schedule and data availability.
    • Action Node: perform the jobs;
    • Control Node: conditional logic or parallel execution;
      • start control node
      • kill control node
      • end control node
    • Tools :
      • Oozie: runs workflows as a service rather than on the client side (unlike JobControl); makes it easier to rerun a failed workflow without re-running the jobs that already succeeded.
  • Higher-level Language
    • Pig
    • Hive
    • Cascading
    • Crunch
    • Spark

Advanced Features

Counter

  • A useful CHANNEL for gathering statistics about the job. Usually better than logging.
  • Built-in Counters (Grouping)
    • org.apache.hadoop.mapreduce.FileSystemCounter
    • org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter
    • org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter
    • org.apache.hadoop.mapreduce.JobCounter
      • TOTAL_LAUNCHED_MAPS
    • org.apache.hadoop.mapreduce.TaskCounter
      • MAP_INPUT_RECORDS :
        • records read by each map task.
        • sent to the application master periodically.
  • User-Defined Java Counters: 
    • Group Name = enum name; Counter Name = enum field;
    • incremented as desired in the mapper or reducer
    • aggregated globally
enum CounterGroupA {
    Counter1,
    Counter2,
    Countern
}

enum CounterGroupB {
    Counter1,
    Counter2,
    Countern
}

protected void map(KEYIN key, VALUEIN value, Context context) {
    // ...
    // enum-based counter: group name = enum class, counter name = enum field
    context.getCounter(CounterGroupA.Counter1).increment(1);

    // ...
    // dynamic counter: group and counter names passed as strings
    context.getCounter("DynamicGroup", "DynamicCounter").increment(1);
}
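
A sketch of reading the counters back on the client once the job completes; job is the Job instance from the driver, and CounterGroupA is the enum defined above.

import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskCounter;

if (job.waitForCompletion(true)) {
    Counters counters = job.getCounters();
    long counter1 = counters.findCounter(CounterGroupA.Counter1).getValue();
    long mapInputRecords = counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
    System.out.println("Counter1 = " + counter1
            + ", map input records = " + mapInputRecords);
}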

Sorting

  • Partial Sort
    • Rely on the sort the framework already performs on the intermediate data by key; no custom sort logic is needed.
    • Save the output to sequence files.
    • However, records are sorted within each output file, but not across output files.
  • Total Sort
    • Use a Partitioner that respects the total order of the output.
    • The key range distribution has to be estimated so that keys are spread evenly across partitions; otherwise one reducer could end up doing most of the work.
      • looping through the whole dataset: overkill
      • use a sampling class (e.g. InputSampler together with TotalOrderPartitioner)
  • Secondary Sort
    • Sorting the values in list(values)
    • Steps (see the sketch below)
      • Build a composite key = (old key, value field), so that the framework sorts by both;
      • A custom Partitioner partitions on the old key only, so that records with the same old key end up in the same partition and, in turn, the same reducer;
      • A grouping comparator makes the reducer group values by the old key only;
      • The sort comparator, which orders by the full composite key, is applied on both the map and reduce sides.
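
A job-wiring sketch for those steps. CompositeKey, NaturalKeyPartitioner, CompositeKeyComparator, and NaturalKeyGroupingComparator are hypothetical classes implementing, respectively, the composite key, partition-by-old-key, the full sort order, and group-by-old-key; the Job setters are the standard API.

// job is the Job instance from the driver
job.setMapOutputKeyClass(CompositeKey.class);                        // composite key = (old key, value field)
job.setPartitionerClass(NaturalKeyPartitioner.class);                // partition on the old key only
job.setSortComparatorClass(CompositeKeyComparator.class);            // sort on the full composite key
job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class);  // group reducer input by the old key only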

Joins

Rather than writing MapReduce programs, you might consider using a higher-level framework such as Pig, Hive, Cascading, Crunch, or Spark.

Map-Side Joins

  • Two sources need to be sorted by the same key (join key) AND they also need to be partitioned in the same way.
  • Take the result of the above sorting as the input to Map tasks;
    • Use CompositeInputFormat (org.apache.hadoop.mapreduce.lib.join) to join the two inputs.

Reduce-Side Joins

  • Use MultipleInputs to configure a different mapper for each source
    • each mapper reads its own source
    • the map output key has to be (JOIN key + tag), which ensures that
      • data pertaining to the same join key lands on the same reducer
      • the (comparable) tag controls which source the reducer sees first, i.e. which side drives the join
  • Map tasks send all the data pertaining to the same join key to the same reducer.
  • For each join key, the reduce task
    • gets the value iterator
    • reads the driving side first, then joins it against the remaining values from the iterator (see the wiring sketch below).
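
A wiring sketch for the reduce-side join described above. RecordAMapper, RecordBMapper, JoinKeyPartitioner, JoinKeyGroupingComparator, JoinReducer, and the two input paths are hypothetical placeholders; MultipleInputs and the Job setters are the standard API.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

MultipleInputs.addInputPath(job, new Path("/data/sourceA"),
        TextInputFormat.class, RecordAMapper.class);    // emits key = (joinKey, tag "0")
MultipleInputs.addInputPath(job, new Path("/data/sourceB"),
        TextInputFormat.class, RecordBMapper.class);    // emits key = (joinKey, tag "1")

job.setPartitionerClass(JoinKeyPartitioner.class);                // partition on the join key only
job.setGroupingComparatorClass(JoinKeyGroupingComparator.class);  // group reducer input by the join key only
job.setReducerClass(JoinReducer.class);                           // joins, driven by the tag-"0" side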
