文档章节

Hadoop Outline Part 5 (MapReduce- Conf,Test,Debug)

woodo
 woodo
发布于 2014/09/12 18:57
字数 2345
阅读 45
收藏 0

Map-Reduce 之 配置,测试,调试

Evironvemnt:

cdh5.1

Configuration 

配置文件位置

使用cdh5.1,该文件位于

/etc/hadoop/conf, 其实/etc/hadoop下面有几种目录,比如conf.dist,conf.pseudo,conf.impala


文件列表

hadoop-env.sh,可以控制全局环境变量


core-site.xml,最重要的是参数fs.defaultFS

1.value = File:\\\home\, 这个是单机模式(single-node standalone),hadoop daemon运行在一个jvm进程。主要方便调试。

2.value = hdfs://localhost:8020,这个是伪分布式(Pseudo-distributes),就是每个daemon运行在单独的jvm进程,但还是都在一台主机上。主要用于学习测试调试等。

3.value = hdfs://host:8020, 集群模式.


hdfs-site.xml,最重要的是参数dfs.replication

除了集群模式是3,一般都设置为1.

dfs.namenode.replication.min = 1,块复制的底线


mapred-site.xml,最重要的是参数mapred.job.tracker

也就是jobtracker运行在那一台机器上。


yarn-site.xml,主要用来配置resourcemanager。


hadoop-metrics.properties,如果配置了Ambari,需要配置此文件,以便于发射监控指标给Ambari服务器。

log4j.properties


如果有多个配置文件加载,那么一般情况下,后加载的配置覆盖相同的早加载的配置文件。为了防止不期望的覆盖,配置文件中有final的关键字,它可以防止后面的覆盖。


conf和jvm的配置, 我们可以把某些配置写入jvm properties,如果这样做,它是最高优先级的,比conf高。

hadoop jar -Ddfs.replication=1


Map-Reduce Sample

首先说主程序,MyWordCount继承于Tool 和 Configured, Configured主要用来帮助Tool实现Configurable.

interface Tool extends Configurable

Configured extends Configurable

一般都会调用ToolRunner来运行程序,ToolRunner内部会调用GenericOptionsParser,所以你的程序可以添加参数的能力。

这里和hadoop1的不同在于org.apache.hadoop.mapreduce,我记得1.0,好像是mapred.

/**
 * write by jinbao
 */
package com.jinbao.hadoop.mapred.unittest;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


/**
 * @author cloudera
 *
 */
public class MyWordCount  extends Configured implements Tool  {

	/**
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {

		try {
			ToolRunner.run(new MyWordCount(), args);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Override
	public int run(String[] args) throws Exception {
		if(args.length != 2){
			System.err.printf("usage: %s,  [generic options] <input> <output> \n",getClass().getSimpleName());
			ToolRunner.printGenericCommandUsage(System.err);
			return -1;
		}
		
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf,"word counting");
		
		job.setJarByClass(MyWordCount.class);
		job.setMapperClass(TokenizerMapper.class);
		job.setReducerClass(SumReducer.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job,new Path(args[1]));

		
		System.exit(job.waitForCompletion(true)?0:1);
		
		return 0;
	}
	/**
	 * @author cloudera
	 *
	 */
	public static class TokenizerMapper extends Mapper<Object,Text,Text,IntWritable> {
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();
		
		public void map(Object key, Text value, Context context) throws IOException,InterruptedException {
			StringTokenizer itr = new StringTokenizer(value.toString());
			while ( itr.hasMoreTokens()){
				word.set(itr.nextToken());
				context.write(word, one);
			}
		}
	}
	
	public static class SumReducer extends Reducer<Text,IntWritable, Text, IntWritable>{
		private static IntWritable result = new IntWritable();
		public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable val:values){
				sum += val.get();
			}
			
			result.set(sum);
			context.write(key, result);
		}
	}

}

MapReduce Web UI
MRv1: http://jobtracker-host:50030
MRv2: http://resourcemgr-host:8088/cluster
      application细节,可以到job history里边去看。

单元测试-MRUnit

这是一个专门针对map-reduce单元测试的工具包

需要下载依赖

1. junit,这个eclipse已经自带了,hadoop的lib下面也有。

2. mockito,这个下面的包里有。

3. powermock,下载连接here

4. MRUnit,去apache家找here


下面上我的程序:

package com.jinbao.hadoop.mapred.unittest;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.io.*;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;


public class MyWordCountTest {
	private MapDriver<Object, Text, Text, IntWritable> mapDriver;
	private ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
	private MapReduceDriver<Object, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver;

	@Before
	public void setUp() {
		MyWordCount.TokenizerMapper mapper = new MyWordCount.TokenizerMapper();
		MyWordCount.SumReducer reducer = new MyWordCount.SumReducer();

		mapDriver = MapDriver.newMapDriver(mapper);
		reduceDriver = ReduceDriver.newReduceDriver(reducer);
		mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer);
	}

	@Test
	public void testMapper() throws IOException {
		mapDriver.withInput(new LongWritable(), new Text("test input from unit test"));
		
		ArrayList<Pair<Text,IntWritable>> outputRecords = new ArrayList<Pair<Text,IntWritable>>();
		outputRecords.add( new Pair<Text,IntWritable>(new Text("test"),new IntWritable(1) ) );
		outputRecords.add( new Pair<Text,IntWritable>(new Text("input"),new IntWritable(1) ) );
		outputRecords.add( new Pair<Text,IntWritable>(new Text("from"),new IntWritable(1) ) );
		outputRecords.add( new Pair<Text,IntWritable>(new Text("unit"),new IntWritable(1) ) );
		outputRecords.add( new Pair<Text,IntWritable>(new Text("test"),new IntWritable(1) ) );
		
		mapDriver.withAllOutput(outputRecords);
		mapDriver.runTest();
	}

	@Test
	public void testReducer() throws IOException {
		reduceDriver.withInput(new Text("input"), new ArrayList<IntWritable>(Arrays.asList(new IntWritable(1), new IntWritable(3))) );
		reduceDriver.withOutput(new Text("input"), new IntWritable(4));
		reduceDriver.runTest();
	}
	
	@Test
	public void testMapperReducer() throws IOException {
		mapReduceDriver.withInput(new LongWritable(), new Text("test input input input input input test") );
		
		ArrayList<Pair<Text,IntWritable>> outputRecords = new ArrayList<Pair<Text,IntWritable>>();
		
		outputRecords.add( new Pair<Text,IntWritable>(new Text("input"),new IntWritable(5) ) );
		outputRecords.add( new Pair<Text,IntWritable>(new Text("test"),new IntWritable(2) ) );
		
		mapReduceDriver.withAllOutput(outputRecords);
		mapReduceDriver.runTest();
	}
}


Run MRUnit

上图直接运行@Test方法就可以解决90%以上的问题,否则你的UnitTest覆盖率太低,那么后期在cluster出问题,就debug成本比较高了.


Run Locally

Eclipse里边配置Debug Configuration:
/home/cloudera/workspace/in /home/cloudera/workspace/out
注意:job runner运行的都是本地目录,使用toolrunner默认是启动一个standalone的jvm来运行hadoop,另外,只能有0或1个reduce.这个不是问题,只要非常方便的调试就可以了.

YARN里边默认是mapreduce.framework.name必须设置为local,不过这都是默认的,不需要管它。


Run in Cluster

导出jar,我都是用eclipse来干,用ant,命令行等都可以,看喜好了。
如果你的jar包有依赖,那么也要把依赖包到处在某个lib里边,并且minifest里边配置main class是哪一个.这个package和war打包没什么区别

%hadoop fs -copyFromLocal /home/cloudera/word.txt data/in
%hadoop jar wordcount.jar data/in data/out


IsolationRunner and Remote Debugger

前提:keep.failed.task.files,该选项默认为 false,表示对于失败的task,其运行的临时数据和目录是不会被保存的。这是一个per job的配置,运行job的时候加上这个选项。
如何重跑: 
    当fail的task环境具备以后,就可以对单独的task进行重跑了。重跑的方式为:
1. 上到task出错的tasktracker机器 上
2. 在该tasktracker上找到fail的task运行时的目录环境 1. 在 tasktracker中,对于每一个task都会有一个单独的执行环境,其中包括其work目录,其对应的中间文件,以及其运行时需要用到的配置文件等
2. 这些 目录是由tasktracker的配置决定,配置选项为: mapred.local.dir. 该选项可能是一个逗号分隔的路径list,每个 list都是tasktracker对在其上执行的task建立工作目录的根目录。比如如果mapred.local.dir=/disk1 /mapred/local,/disk2/mapred/local,那么task的执行环境就是mapred.local.dir /taskTracker/jobcache/job-ID/task-attempt-ID
3. 找到该task的执行工作目录后,就可以进入到 该目录下,然后其中就会有该task的运行环境,通常包括一个work目录,一个job.xml文件,以及一个task要进行操作的数据文件(对map来 说是split.dta,对reduce来说是file.out)。
4. 找到环境以后,就可以重跑task了。 1. cd work
2. hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml
•   ◦ 这样,IsolationRunner就会读取job.xml的配置(这里的job.xml相当 于提交客户端的hadoop-site.xml配置文件与命令行-D配置的接合),然后对该map或者reduce进行重新运行。
1. 到这里为止,已经实现了task单独重跑,但是还是没有解决对其进行单步断点debug。这里利用到的其实是jvm的远程 debug的功能。方式如下: 1. 在重跑task之前,export一个环境变 量:export HADOOP_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=8888"
2. 这 样,hadoop的指令就会通过8888端口将debug信息发送出去
3. 然后在自己本地的开发环境IDE中(比如 eclipse),launch一个远程调试,并在代码中打一个断点,就可以对在tasktracker上运行的独立map或者reduce task进行远程单步调试了。
详细可以去到这个blog看看。
http://blog.csdn.net/cwyspy/article/details/10004995

Note: 非常不幸,在最近的版本里面,IsolationRunner已经不能使用,所以在hadoop2里边,需要找到失败节点后,把问题文件拷贝出来,进行单机调试。

合并结果集

根据Reduce个数,可以会有多个part的结果集,那么可以使用下面命令来合并

% hadoop fs -getmerge max-temp max-temp-local

% sort max-temp-local | tail


Tuning a Job

  1. Number of mappers

  2. Number of reducers

  3. Combiners

  4. Intermediate compression

  5. Custom serialization

  6. Shuffle tweaks


MapReduce Workflows

In other words, as a rule of thumb, think about adding more jobs, rather than adding complexity to jobs.


ChainMapper and ChainReducer

It's a Map*/Reduce model, which means multiple mappers work as a chain, and after last mapper, output will go to reducer. this sounds reduced network IO.

Though called 'ChainReducer', actually only a Reducer working for ChainMapper, so gets the name.


Mapper1->Mapper2->MapperN->Reducer


JobControl

MR has a class JobControl, but as I test it's really not maintained well. 

Simply to use:

if(Run(job1)

   Run(job2)


Apache Oozie

Oozie是一种Java Web应用程序,它运行在Java servlet容器——即Tomcat——中,并使用数据库来存储以下内容:

  • 工作流定义

  • 当前运行的工作流实例,包括实例的状态和变量

Oozie工作流是放置在控制依赖DAG(有向无环图 Direct Acyclic Graph)中的一组动作(例如,Hadoop的Map/Reduce作业、Pig作业等),其中指定了动作执行的顺序。我们会使用hPDL(一种XML流程定义语言)来描述这个图。

hPDL是一种很简洁的语言,只会使用少数流程控制和动作节点。控制节点会定义执行的流程,并包含工作流的起点和终点(start、end和fail节点)以及控制工作流执行路径的机制(decision、fork和join节点)。动作节点是一些机制,通过它们工作流会触发执行计算或者处理任务。Oozie为以下类型的动作提供支持: Hadoop map-reduce、Hadoop文件系统、Pig、Java和Oozie的子工作流(SSH动作已经从Oozie schema 0.2之后的版本中移除了)


这个是个重头戏,我需要单独一张来深入。


© 著作权归作者所有

woodo
粉丝 5
博文 57
码字总数 32118
作品 0
朝阳
高级程序员
私信 提问
加载中

评论(1)

woodo
woodo 博主
Hadoop mapper and reducer output mismatch

java.lang.Exception: java.io.IOException: wrong value class: class org.apache.hadoop.io.LongWritable is not class org.apache.hadoop.io.IntWritable
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:354)
Caused by: java.io.IOException: wrong value class: class org.apache.hadoop.io.LongWritable is not class org.apache.hadoop.io.IntWritable
at org.apache.hadoop.mapred.IFile$Writer.append(IFile.java:168)
at org.apache.hadoop.mapred.Task$CombineOutputCollector.collect(Task.java:1160)
at org.myorg.Sort$Reduce.reduce(Sort.java:34)
at org.myorg.Sort$Reduce.reduce(Sort.java:28)
at org.apache.hadoop.mapred.Task$OldCombinerRunner.combine(Task.java:1436)

Comment out the below line and the program should work.
conf.setCombinerClass(Reduce.class);

When a combiner class is defined for a job, the intermediate key value pairs are combined on the same node as the map task before sending to the reducer. Combiner reduces the network traffic
【Hadoop] - windows开发环境搭建

文章说明:因Linux平台再GUI页面通过IDE进行Hadoop开发,会导致Linux在GUI上极度消耗资源,对于一些配置不是很高的PC,可能会出现卡顿的情况,非常影响程序编写,本文就详细介绍如何在windo...

ZeroneLove
02/24
23
0
第6章-MapReduce的工作机制-笔记

作业的提交 可以只用一行代码来运行一个MapReduce作业: JobClient.runJob(conf)。 作业的调度 Hadoop作业调度演进 1、早期版本的Hadoop使用FIFO调度算法来运行作业 早期版本的Hadoop使用一种...

hiqj
2014/10/16
37
0
安装一个单节点的 Hadoop 分布式系统

构建 MapReduce Tar包 你可以获取 MapReduce 的tar包,也可以从源码中构建这个tar包 注意: 你需要使用 2.4.1 或者更新版本的 protoc 如果你想忽略 mapreduce 内置的构建,可以忽略 maven 的 ...

oschina
2013/07/23
983
0
mapreduce操作hbase报错NoClassDefFoundError

下面是具体的报错信息,执行方式:/opt/cloudera/parcels/CDH/bin/hadoop jar mapreduce.jar 查了官方文档,可以通过制定hadoop_classpath实现,但是我效,操作如下: 第一步:指定环境变量 ...

skysky
2015/05/27
3.1K
2
大数据之---Yarn伪分布式部署和MapReduce案例

1、软件环境 本次涉及伪分布式部署只是要主机hadoop01,软件安装参考伪分布式部署终极篇 2、配置yarn和mapreduce 3、提交测试jar计算圆周率 job15248048138350001 job命名格式: jobunix时间...

ycwyong
2018/05/17
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Replugin借助“UI进程”来快速释放Dex

public static boolean preload(PluginInfo pi) { if (pi == null) { return false; } // 借助“UI进程”来快速释放Dex(见PluginFastInstallProviderProxy的说明) return PluginFastInsta......

Gemini-Lin
52分钟前
4
0
Hibernate 5 的模块/包(modules/artifacts)

Hibernate 的功能被拆分成一系列的模块/包(modules/artifacts),其目的是为了对依赖进行独立(模块化)。 模块名称 说明 hibernate-core 这个是 Hibernate 的主要(main (core))模块。定义...

honeymoose
今天
4
0
CSS--属性

一、溢出 当内容多,元素区域小的时候,就会产生溢出效果,默认是纵向溢出 横向溢出:在内容和容器之间再套一层容器,并且内部容器要比外部容器宽 属性:overflow/overflow-x/overflow-y 取值...

wytao1995
今天
4
0
精华帖

第一章 jQuery简介 jQuery是一个JavaScript库 jQuery具备简洁的语法和跨平台的兼容性 简化了JavaScript的操作。 在页面中引入jQuery jQuery是一个JavaScript脚本库,不需要特别的安装,只需要...

流川偑
今天
7
0
语音对话英语翻译在线翻译成中文哪个方法好用

想要进行将中文翻译成英文,或者将英文翻译成中文的操作,其实有一个非常简单的工具就能够帮助完成将语音进行翻译转换的软件。 在应用市场或者百度手机助手等各大应用渠道里面就能够找到一款...

401恶户
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部