Hadoop MapReduce Overview


MapReduce Job

Overview

  • Client -> Submit Job -> YARN Resource Manager -> YARN Node Manager -> Application Master and Tasks
    • MapReduce Application Master
    • MapReduce Tasks
    • HDFS
      • JobResources : Jar, configuration, input splits, etc
      • Input Files
      • Output Files

Lifecycle

  • Job Submission (Client -> Yarn Resource Manager)
  • Job Initialization (Yarn Resource Manager -> Yarn Node Manager)
    • Scheduler -> Container -> Application Master
    • Application Master (MRAppMaster)
      • creates bookkeeping objects to track job/task status updates;
      • gets the input splits;
      • determines the number of map and reduce tasks;
        • map tasks = number of input splits;
        • reduce tasks = mapreduce.job.reduces;
      • Tuning guideline
        • a map task should run for roughly 1 minute, a reduce task for roughly 5 minutes;
        • much shorter tasks spend a disproportionate share of their time on setup/teardown.
  • Task Assignment (Application Master-> Node Manager -> Tasks)
    • Data Locality, Map Tasks first
    • Resource Requirement
      • mapreduce.map.memory.mb
      • mapreduce.map.cpu.vcores
      • mapreduce.reduce.memory.mb
      • mapreduce.reduce.cpu.vcores
    • container allocation
  • Task Execution (Node Manager)
    • YarnChild()
      • Localize the resources/files;
      • Run the map or reduce task;
    • Progress & Status Update
      • Client -> Polling/getJobStatus() -> Application Master <- YarnChild <- Report <- Task
      • The client polls the Application Master for progress (every second by default), or calls getJobStatus()
      • Tasks Report progress to Application Master
  • Job Completion
    • Clean up intermediate output and working state
  • Retrieving the Results
    • each reduce task produces one part file (part-r-NNNNN)

Code Highlight

  • ToolRunner.run(new MapReduceApp(), args)
  • MapReduceApp.class
    • Handle passed-in args (Configuration)
    • Setup Job
      • setJarByClass()/setJar : jar to be deployed in the cluster;
      • MapReduceConfiguration() : helper that configures the MapReduce job:
        • InputFormat
        • MapperClass
        • ReducerClass
        • OutputKeyClass
        • OutputValueClass
        • OutputFormat
    • Run Job
      • waitForCompletion() : submits the job and polls its progress (see the driver sketch below);
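
A minimal driver sketch tying the above together, assuming the identity Mapper/Reducer and that args[0]/args[1] are the input and output paths (everything beyond the MapReduceApp name from the notes is illustrative):

//-------------MapReduceApp.java (sketch)-----------
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapReduceApp extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // assumption: args[0] = input path, args[1] = output path
        Job job = Job.getInstance(getConf(), "mapreduce-app");
        job.setJarByClass(MapReduceApp.class);             // jar to be deployed in the cluster

        job.setInputFormatClass(TextInputFormat.class);    // InputFormat
        job.setMapperClass(Mapper.class);                  // MapperClass (identity mapper here)
        job.setReducerClass(Reducer.class);                // ReducerClass (identity reducer here)
        job.setOutputKeyClass(LongWritable.class);         // OutputKeyClass
        job.setOutputValueClass(Text.class);               // OutputValueClass
        job.setOutputFormatClass(TextOutputFormat.class);  // OutputFormat

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // submit the job and poll its progress until completion
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new MapReduceApp(), args));
    }
}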

Failures

Task Failure

  • Detection
    • The task reports the failure back to the Application Master
    • The Node Manager reports the failure (for instance, the task JVM exited unexpectedly)
    • Timeout (no progress report from the task for 10 minutes, mapreduce.task.timeout)
  • Handling
    • retry the task up to 4 times (mapreduce.map.maxattempts / mapreduce.reduce.maxattempts);
    • fail the whole job after 4 failed attempts of the same task;

Application Master Failure

  • Detection
    • Application master sends heartbeats to Resource Manager;
    • mapreduce.am.max-attempts
  • Handling
    • The Resource Manager spins up a new Application Master
    • recover the state of the tasks : yarn.app.mapreduce.am.job.recovery.enable
    • the client switches to the new Application Master
      • after timing out while polling the old one, the client asks the Resource Manager for the new Application Master's address.

Node Manager Failure

  • Detection :
    • Node Manager fails to send heartbeats to Resource Manager;
    • yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms
  • Handling :
    • Tasks (and any Application Master) that were running on the failed node are recovered and rerun as above

Resource Manager Failure

  • HA: an active-standby pair of Resource Managers, with ZooKeeper-based failover

InputFormat

Takes the input and produces splits for map tasks; the mapping between splits and map tasks is one-to-one.

  • Raw File -> InputFormatClass -> K1, V1
    • The client calculates the splits using
      • InputFormat.getSplits() : split = (size of the split, reference to the data)
    • The client sends the splits to the Application Master; a split is just a reference to the data, rather than the data itself;
    • The Application Master schedules map tasks
    • The map task processes its split by invoking
      • InputFormat.createRecordReader()
      • Mapper.run(), which takes one record at a time and invokes Mapper.map() with (K1, V1)
//-------------InputFormat.java-----------
public abstract class InputFormat<K, V> {
    public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;

    public abstract RecordReader<K, V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException;
}

//-------------Mapper.java-----------------
// Mapper.run()
public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    // context.nextKeyValue() delegates to the RecordReader
    // created by InputFormat.createRecordReader();
    while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
}

// Mapper.map() -- the default implementation is the identity function
protected void map(KEYIN key, VALUEIN value, Context context)
        throws IOException, InterruptedException {
    // user code overrides this; the default just passes the pair through
    context.write((KEYOUT) key, (VALUEOUT) value);
}

FileInputFormat

  • Specifies the input files to be processed;
  • Specifies how to split the input files: splitSize = max(minSize, min(maxSize, blockSize))
    • mapreduce.input.fileinputformat.split.minsize
    • mapreduce.input.fileinputformat.split.maxsize
    • dfs.blocksize
  • Preventing splits
    • make minSize > fileSize
    • subclass overrides isSplitable() to return false (see the sketch after this list);
  • Processing a whole file as a single record
    • isSplitable = false
    • createRecordReader -> use a WholeFileRecordReader;
    • Can be used for PACKAGING small files into SEQUENCE files;
      • key = original file name
      • value = content of the file
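
A minimal sketch of the isSplitable() override (NonSplittableTextInputFormat is an illustrative name; a full whole-file reader would also need a custom WholeFileRecordReader, which is not shown):

//-------------NonSplittableTextInputFormat.java (sketch)-----------
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

public class NonSplittableTextInputFormat extends TextInputFormat {
    @Override
    protected boolean isSplitable(JobContext context, Path file) {
        // never split: each file becomes exactly one split, hence one map task
        return false;
    }
}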

CombineFileInputFormat

  • generates one split per node, which may be made of multiple blocks.
  • decouples the amount of data that a mapper consumes from the block size.
  • GOOD for processing many small files.

Text Input

  • key = byte offset within the whole file (i.e. global, not per split);
  • value = line content;
    • a line number is not practical to track, since the file is broken into splits
  • lines are never broken across splits
    • lines may be broken across HDFS blocks;
    • a small amount of remote data transfer is needed to complete the last line of a split, but it is minimal.

KeyValueTextInputFormat

  • key = first part of a line;
  • value = second part of the line;
  • where the two parts are separated by (see the snippet below)
    • mapreduce.input.keyvaluelinerecordreader.key.value.separator (default TAB)
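
A small sketch of wiring KeyValueTextInputFormat, assuming comma-separated input (the helper class name is illustrative):

//-------------KeyValueTextInputFormat setup (sketch)-----------
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

public class KeyValueInputSetup {
    public static Job newJob() throws Exception {
        Configuration conf = new Configuration();
        // default separator is TAB; here we assume comma-separated input
        conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
        Job job = Job.getInstance(conf, "kv-input");
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        // the mapper then receives key = first field (Text), value = rest of the line (Text)
        return job;
    }
}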

NLineInputFormat

  • key = byte offset within the file (as with TextInputFormat)
  • value=line content
  • N=how many lines in a split
    • mapreduce.input.lineinputformat.linespermap (default to 1)

XML

  • org.apache.hadoop.streaming.mapreduce.StreamXmlRecordReader

Binary Input

SequenceFileInputFormat

  • Stores sequences of binary key-value pairs.

SequenceFileAsTextInputFormat

  • Converts binary key-value pairs to Text key-value pairs as the input to Mapper, which is done via toString()

SequenceFileAsBinaryInputFormat

  • Binary key-value pairs to Mapper, Mapper to interpret.

FixedLengthInputFormat

  • Reading fixed-width binary records from a file.

Multiple Inputs

  • Allows developers to specify an InputFormat and a Mapper for each input path, e.g.:
MultipleInputs.addInputPath(job, inputPathA, TextInputFormat.class, MaxTemperatureMapper.class);

MultipleInputs.addInputPath(job, inputPathB, TextInputFormat.class, AnotherTemperatureMapper.class);

Database Input

  • DBInputFormat is an input format for reading data from a relational database, using JDBC
  • Sqoop

OutputFormat

Text Output

  • TextOutputFormat is the default, with TAB as the default key-value separator

Binary Output

  • SequenceFileOutputFormat
  • SequenceFileAsBinaryOutputFormat
  • MapFileOutputFormat

Multiple Outputs

The default behavior is that each Reducer produces one output file, and files are named by the partition number: part-r-00000, part-r-00001, and so on.

MultipleOutputs allows you to write data to files whose names are derived from the output keys and values, or in fact from an arbitrary string, as in the sketch below.
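
A minimal sketch of MultipleOutputs inside a reducer; the class name and the key-derived base path are illustrative:

//-------------MultipleOutputs usage (sketch)-----------
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

public class PartitionByKeyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    private MultipleOutputs<Text, LongWritable> mos;

    @Override
    protected void setup(Context context) {
        mos = new MultipleOutputs<>(context);
    }

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        for (LongWritable value : values) {
            // base output path derived from the key instead of the default part-r-NNNNN
            mos.write(key, value, key.toString());
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();  // flush and close the extra outputs
    }
}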

MapReduce

Data Type

Mapper

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public abstract class Context implements MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        // ...
    }
    protected void map(KEYIN key, VALUEIN value, Context context) throws IOException, InterruptedException {
        // ...
    }
}

Reducer

public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    public abstract class Context implements ReduceContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        // ...
    }
    protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context) throws IOException, InterruptedException {
        // ...
    }
}

Data Flow

  • Mapper : (K1, V1) -> list(K2, V2)
  • Group/Sort : (K2, list(V2)) - handled by framework
  • Reducer : (K2, list(V2)) -> list(K3, V3)
   
Job property          Default
InputFormatClass      TextInputFormat.class
MapperClass           Mapper.class
MapOutputKeyClass     LongWritable.class
MapOutputValueClass   Text.class
PartitionerClass      HashPartitioner.class
NumReduceTasks        1
ReducerClass          Reducer.class
OutputKeyClass        LongWritable.class
OutputValueClass      Text.class
OutputFormatClass     TextOutputFormat.class

Shuffle & Sort

Map Output

  • The map writes output to -> a circular memory buffer (100 MB by default, mapreduce.task.io.sort.mb)
  • When the buffer reaches the spill threshold (mapreduce.map.sort.spill.percent, default 0.80) ->
    • Divides the data into Partitions
    • Perform Sort by Key
    • Perform Combiner function (if any)
    • Spill content to local Disk (mapreduce.cluster.local.dir), i.e. Spill Files;
  • Run the combiner again (if any) during the merge, but only if there are at least 3 spill files (mapreduce.map.combine.minspills);
    • With fewer spills, the potential reduction in map output size is not worth the overhead of invoking the combiner.
  • Merge Spill files into A Single Partitioned and Sorted Output file;
  • Compress the output file (see the snippet after this list)
    • mapreduce.map.output.compress=true
    • mapreduce.map.output.compress.codec=<codec class>
  • Serve the map output to reducers over HTTP
    • mapreduce.shuffle.max.threads (default 0, meaning 2 x the number of CPUs on the node manager)
  • Notify the Application Master, via the heartbeat mechanism, that the map is done,
    • including the map output location and host, so that reduce tasks know where to fetch their partitions
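
A hedged snippet setting the compression properties above on the job configuration; SnappyCodec is just one possible codec and the helper class name is illustrative:

//-------------Map output compression (sketch)-----------
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;

public class MapOutputCompression {
    public static Job newJob() throws Exception {
        Configuration conf = new Configuration();
        // enable compression of intermediate map output
        conf.setBoolean("mapreduce.map.output.compress", true);
        // choose the codec (assumption: Snappy is installed on the cluster)
        conf.setClass("mapreduce.map.output.compress.codec",
                SnappyCodec.class, CompressionCodec.class);
        return Job.getInstance(conf, "compressed-map-output");
    }
}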

Reduce Input

  • The reduce task copies the map outputs that belong to its partition from the various map-task nodes
    • mapreduce.reduce.shuffle.parallelcopies=5
    • Spill to disk if mapreduce.reduce.merge.inmem.threshold is met
    • Perform the combiner function (if any)
    • a background thread merges the copied outputs into larger sorted files (to save merge work later)
  • Sort/Merge phase (when all copies are done)
  • Reduce Phase

Reduce Output

  • The reduce output is written to a temporary file and then committed (renamed) to the final output file, typically in HDFS.

Complex Processing

  • Single but Complex Mapper/Reducer
  • ChainMapper/ChainReducer
  • Multiple Jobs
    • Linear Jobs/Synchronized Jobs
      • runJob(conf1)
      • runJob(conf2)
    • DAG
      • JobControl: set up dependencies, query previous jobs' status/results (see the sketch after this list).
  • Workflow Engine: STORE and RUN workflow
    • Workflow: a collection of jobs (MapReduce, Pig, Hive, etc)
    • Engine: coordinate jobs execution based on predefined schedule and data availability.
    • Action Node: perform the jobs;
    • Control Node: conditional logic or parallel execution;
      • start control node
      • kill control node
      • end control node
    • Tools :
      • Oozie: runs workflows as a service rather than on the client side (as JobControl does); makes it easier to rerun a failed workflow without re-running the jobs that already succeeded.
  • Higher-level Language
    • Pig
    • Hive
    • Cascading
    • Crunch
    • Spark
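
A sketch of a two-job DAG with JobControl, assuming conf1 and conf2 are already fully configured job Configurations (class and group names are illustrative):

//-------------JobControl DAG (sketch)-----------
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

public class DagRunner {
    public static void runDag(Configuration conf1, Configuration conf2) throws Exception {
        ControlledJob first = new ControlledJob(conf1);
        ControlledJob second = new ControlledJob(conf2);
        second.addDependingJob(first);            // second runs only after first succeeds

        JobControl jobControl = new JobControl("two-stage-dag");
        jobControl.addJob(first);
        jobControl.addJob(second);

        Thread runner = new Thread(jobControl);   // JobControl implements Runnable
        runner.start();
        while (!jobControl.allFinished()) {
            Thread.sleep(1000);                   // poll previous jobs' status/results
        }
        jobControl.stop();
    }
}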

Advanced Features

Counter

  • A useful CHANNEL for gathering statistics about the job. Usually better than logging.
  • Built-in Counters (Grouping)
    • org.apache.hadoop.mapreduce.FileSystemCounter
    • org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter
    • org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter
    • org.apache.hadoop.mapreduce.JobCounter
      • TOTAL_LAUNCHED_MAPS
    • org.apache.hadoop.mapreduce.TaskCounter
      • MAP_INPUT_RECORDS :
        • records read by each map task.
        • sent to the application master periodically.
  • User-Defined Java Counters: 
    • Group Name = enum name; Counter Name = enum field;
    • incremented as desired in the mapper or reducer
    • aggregated globally
enum CounterGroupA {
    Counter1,
    Counter2,
    Countern
}

enum CounterGroupB {
    Counter1,
    Counter2,
    Countern
}

protected void map(KEYIN key, VALUEIN value, Context context) {
    // .....
    // enum-based counter: group name = enum class name, counter name = enum field
    context.getCounter(CounterGroupA.Counter1).increment(1);

    // ...
    // dynamic counter: group and counter names are arbitrary strings
    // ("DynamicGroup" is an illustrative group name)
    context.getCounter("DynamicGroup", "DynamicCounter").increment(1);
}
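
After the job finishes, the aggregated counter values can be read back on the client; a sketch using the hypothetical CounterGroupA enum above:

//-------------Reading counters on the client (sketch)-----------
import org.apache.hadoop.mapreduce.Job;

public class CounterReport {
    // job is assumed to be a completed Job (e.g. after waitForCompletion)
    public static void print(Job job) throws Exception {
        long count = job.getCounters()
                        .findCounter(CounterGroupA.Counter1)
                        .getValue();
        System.out.println("CounterGroupA.Counter1 = " + count);
    }
}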

Sorting

  • Partial Sort
    • Rely on the framework's built-in sort: each reducer receives its keys in sorted order.
    • Save the output to sequence files.
    • Records are sorted within each output file, but not across output files (hence "partial").
  • Total Sort
    • Use a partitioner that respects the total order of the output
    • The key distribution/range has to be estimated so that data is spread evenly across reducers; otherwise one reducer ends up doing most of the work
      • looping through the whole dataset to compute it: overkill
      • use a sampler (e.g. InputSampler with TotalOrderPartitioner) to estimate the distribution
  • Secondary Sort
    • Sorting the values within list(values)
    • Steps (see the sketch below)
      • The new key = (key, value), so that pairs are sorted by value within each key;
      • A new partitioner is built on the natural (old) key, so that all composite keys with the same natural key end up in the same partition and, in turn, the same reducer.
      • A grouping comparator makes the reducer group values by the old key.
      • The sort comparator (map side) and the grouping comparator (reduce side) both need to understand the composite key.
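
A sketch of the secondary-sort building blocks described above; the class names and the Text/IntWritable key layout are assumptions:

//-------------Secondary sort building blocks (sketch)-----------
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;

// new key = (old key, value): the pair is what gets sorted
public class CompositeKey implements WritableComparable<CompositeKey> {
    private final Text naturalKey = new Text();
    private final IntWritable secondary = new IntWritable();

    public void set(String key, int value) { naturalKey.set(key); secondary.set(value); }
    public Text getNaturalKey() { return naturalKey; }

    @Override public void write(DataOutput out) throws IOException {
        naturalKey.write(out); secondary.write(out);
    }
    @Override public void readFields(DataInput in) throws IOException {
        naturalKey.readFields(in); secondary.readFields(in);
    }
    @Override public int compareTo(CompositeKey other) {
        int cmp = naturalKey.compareTo(other.naturalKey);
        return cmp != 0 ? cmp : secondary.compareTo(other.secondary);  // sort by value within a key
    }
}

// partition on the natural (old) key only, so all values of a key reach the same reducer
class NaturalKeyPartitioner extends Partitioner<CompositeKey, IntWritable> {
    @Override public int getPartition(CompositeKey key, IntWritable value, int numPartitions) {
        return (key.getNaturalKey().hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}

// group on the natural key only, so one reduce() call sees all (sorted) values of a key
class NaturalKeyGroupingComparator extends WritableComparator {
    protected NaturalKeyGroupingComparator() { super(CompositeKey.class, true); }
    @Override public int compare(WritableComparable a, WritableComparable b) {
        return ((CompositeKey) a).getNaturalKey().compareTo(((CompositeKey) b).getNaturalKey());
    }
}

Wiring on the Job would then be job.setMapOutputKeyClass(CompositeKey.class), job.setPartitionerClass(NaturalKeyPartitioner.class), and job.setGroupingComparatorClass(NaturalKeyGroupingComparator.class).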

Joins

Rather than writing MapReduce programs, you might consider using a higher-level framework such as Pig, Hive, Cascading, Crunch, or Spark.

Map-Side Joins

  • The two sources need to be sorted by the same (join) key AND partitioned in the same way.
  • Take the result of that sorting as the input to the map tasks;
    • Use CompositeInputFormat (org.apache.hadoop.mapreduce.lib.join) to join the two inputs.

Reduce-Side Joins

  • Use MultipleInputs to configure a different mapper for each source
    • To read the different sources
    • The map output key has to be the JOIN key + a tag, which ensures
      • data pertaining to the same join key lands on the same reducer (partition on the join key only)
      • the (comparable) tag controls which source the reducer sees first, i.e. which side drives the join.
  • Map tasks send all the data pertaining to the same join key to the same reducer.
  • For each join key, the reduce task
    • gets the value iterator
    • reads the tagged records from the iterator and performs the join (see the simplified sketch after this list).
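
A simplified reduce-side join sketch: the notes above describe a composite key (join key + tag), while this variant tags the values instead, which avoids custom comparators at the cost of buffering one side in memory; the mapper classes and the key<TAB>payload record layout are assumptions:

//-------------Reduce-side join via value tagging (simplified sketch)-----------
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// one mapper per source (wired with MultipleInputs); each emits (joinKey, taggedValue)
class SourceAMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String[] fields = line.toString().split("\t", 2);   // assumed layout: key<TAB>payload
        if (fields.length < 2) return;
        context.write(new Text(fields[0]), new Text("A\t" + fields[1]));
    }
}

class SourceBMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String[] fields = line.toString().split("\t", 2);
        if (fields.length < 2) return;
        context.write(new Text(fields[0]), new Text("B\t" + fields[1]));
    }
}

// all values for a join key arrive at the same reducer; split them by tag and join
class JoinReducer extends Reducer<Text, Text, Text, Text> {
    @Override protected void reduce(Text joinKey, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        List<String> sideA = new ArrayList<>();
        List<String> sideB = new ArrayList<>();
        for (Text value : values) {
            String[] tagged = value.toString().split("\t", 2);
            if ("A".equals(tagged[0])) sideA.add(tagged[1]); else sideB.add(tagged[1]);
        }
        for (String a : sideA) {
            for (String b : sideB) {
                context.write(joinKey, new Text(a + "\t" + b));   // cross product per join key
            }
        }
    }
}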
