文档章节

hadoop 里执行 MapReduce 任务的几种常见方式

大数据之路
 大数据之路
发布于 2012/10/14 22:39
字数 1331
阅读 10174
收藏 11

说明:

测试文件:

echo -e "aa\tbb \tcc\nbb\tcc\tdd" > 3.txt
hadoop fs -put 3.txt /tmp/3.txt

全文的例子均以该文件做测试用例,统计单词出现的次数(WordCount)。

1、原生态的方式:java 源码编译打包成jar包后,由 hadoop 脚本调度执行,举例:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

	public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
		/**
		 * LongWritable, IntWritable, Text 均是 Hadoop 中实现的用于封装 Java 数据类型的类,
		 * 这些类实现了WritableComparable接口, 都能够被串行化从而便于在分布式环境中进行数据交换,
		 * 你可以将它们分别视为long,int,String 的替代品。
		 */
		// IntWritable one 相当于 java 原生类型 int 1
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text();

		public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
			// 每行记录都会调用 map 方法处理,此处是每行都被分词
			StringTokenizer itr = new StringTokenizer(value.toString());
			while (itr.hasMoreTokens()) {
				word.set(itr.nextToken());
				// 输出每个词及其出现的次数 1,类似 <word1,1><word2,1><word1,1>
				context.write(word, one);
			}
		}
	}

	public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
		private IntWritable result = new IntWritable();

		public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException,
				InterruptedException {
			// key 相同的键值对会被分发到同一个 reduce中处理
			// 例如 <word1,<1,1>>在 reduce1 中处理,而<word2,<1>> 会在 reduce2 中处理
			int sum = 0;
			// 相同的key(单词)的出现次数会被 sum 累加
			for (IntWritable val : values) {
				sum += val.get();
			}
			result.set(sum);
			// 1个 reduce 处理完1 个键值对后,会输出其 key(单词)对应的结果(出现次数)
			context.write(key, result);
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		// 多队列hadoop集群中,设置使用的队列
		conf.set("mapred.job.queue.name", "regular");
		// 之所以此处不直接用 argv[1] 这样的,是为了排除掉运行时的集群属性参数,例如队列参数,
		// 得到用户输入的纯参数,如路径信息等
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		for (String argsStr : otherArgs) {
			System.out.println("-->> " + argsStr);
		}
		if (otherArgs.length < 2) {
			System.err.println("Usage: wordcount <in> <out>");
			System.exit(2);
		}
		Job job = new Job(conf, "word count");
		job.setJarByClass(WordCount.class);
		// map、reduce 输入输出类
		job.setMapperClass(TokenizerMapper.class);
		job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass(IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		// 输入输出路径
		FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
		// 多子job的类中,可以保证各个子job串行执行
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

执行:

bin/hadoop jar /tmp/wordcount.jar WordCount /tmp/3.txt /tmp/5

结果:

hadoop fs -cat /tmp/5/*
aa      1
bb      2
cc      2
dd      1

参考资料:

Hadoop - Map/Reduce 通过WordCount例子的变化来了解新版hadoop接口的变化

http://blog.csdn.net/derekjiang/article/details/6836209

Hadoop示例程序WordCount运行及详解

http://samuschen.iteye.com/blog/763940

官方的 wordcount v1.0 例子

http://hadoop.apache.org/docs/r1.1.1/mapred_tutorial.html#Example%3A+WordCount+v1.0

2、基于 MR 的数据流 Like SQL 脚本开发语言:pig
A1 = load '/data/3.txt';
A = stream A1 through `sed "s/\t/ /g"`;
B = foreach A generate flatten(TOKENIZE((chararray)$0)) as word;
C = filter B by word matches '\\w+';
D = group C by word;
E = foreach D generate COUNT(C), group;
dump E;

注意:不同分隔符对load及后面的$0的影响。

详情请见:
https://gist.github.com/186460
http://www.slideshare.net/erikeldridge/a-brief-handson-introduction-to-hadoop-pig

3、构建数据仓库的类 SQL 开发语言:hive
create table textlines(text string);
load data inpath '/data/3.txt' overwrite into table textlines;
SELECT wordColumn, count(1) FROM textlines LATERAL VIEW explode(split(text,'\t+')) wordTable AS wordColumn GROUP BY wordColumn;

详情请见:

http://my.oschina.net/leejun2005/blog/83045
http://blog.csdn.net/techdo/article/details/7433222

4、跨平台的脚本语言:python
map:
#!/usr/bin/python
import os,re,sys
for line in sys.stdin:
	for i in line.strip().split("\t"):
		print i

reduce:

#!/usr/bin/python
import os,re,sys
arr = {}
for words in sys.stdin:
	word = words.strip()
	if word not in arr:
		arr[word] = 1
	else:
		arr[word] += 1
for k, v in arr.items():
	print str(k) + ": " + str(v)

最后在shell下执行:

hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-0.20.203.0.jar -file map.py -file reduce.py  -mapper map.py -reducer reduce.py -input /data/3.txt -output /data/py

注意:脚本开头需要显示指定何种解释器以及赋予脚本执行权限

详情请见:
http://blog.csdn.net/jiedushi/article/details/7390015

5、Linux 下的瑞士军刀:shell 脚本
map:
#!/bin/bash
tr '\t' '\n'

reduce:

#!/bin/bash
sort|uniq -c

最后在shell下执行:

june@deepin:~/hadoop/hadoop-0.20.203.0/tmp>
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming-0.20.203.0.jar -file map.py -file reduce.py  -mapper map.py -reducer reduce.py -input /data/3.txt -output /data/py
packageJobJar: [map.py, reduce.py, /home/june/data_hadoop/tmp/hadoop-unjar2676221286002400849/] [] /tmp/streamjob8722854685251202950.jar tmpDir=null
12/10/14 21:57:00 INFO mapred.FileInputFormat: Total input paths to process : 1
12/10/14 21:57:00 INFO streaming.StreamJob: getLocalDirs(): [/home/june/data_hadoop/tmp/mapred/local]
12/10/14 21:57:00 INFO streaming.StreamJob: Running job: job_201210141552_0041
12/10/14 21:57:00 INFO streaming.StreamJob: To kill this job, run:
12/10/14 21:57:00 INFO streaming.StreamJob: /home/june/hadoop/hadoop-0.20.203.0/bin/../bin/hadoop job  -Dmapred.job.tracker=localhost:9001 -kill job_201210141552_0041
12/10/14 21:57:00 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201210141552_0041
12/10/14 21:57:01 INFO streaming.StreamJob:  map 0%  reduce 0%
12/10/14 21:57:13 INFO streaming.StreamJob:  map 67%  reduce 0%
12/10/14 21:57:19 INFO streaming.StreamJob:  map 100%  reduce 0%
12/10/14 21:57:22 INFO streaming.StreamJob:  map 100%  reduce 22%
12/10/14 21:57:31 INFO streaming.StreamJob:  map 100%  reduce 100%
12/10/14 21:57:37 INFO streaming.StreamJob: Job complete: job_201210141552_0041
12/10/14 21:57:37 INFO streaming.StreamJob: Output: /data/py
june@deepin:~/hadoop/hadoop-0.20.203.0/tmp>
hadoop fs -cat /data/py/part-00000
      1 aa	
      1 bb 	
      1 bb	
      2 cc	
      1 dd	
june@deepin:~/hadoop/hadoop-0.20.203.0/tmp>


特别提示:上述有些方法对字段后的空格忽略或计算,请注意仔细甄别。


说明:列举了上述几种方法主要是给大家一个不同的思路,
在解决问题的过程中,开发效率、执行效率都是我们需要考虑的,不要太局限某一种方法了。

© 著作权归作者所有

大数据之路
粉丝 1605
博文 514
码字总数 333882
作品 0
武汉
架构师
私信 提问
Hadoop实战之 MapReduce

私塾在线 整体课程概览 第一部分:开始云计算之旅 第二部分:初识Hadoop 第三部分:Hadoop 环境安装部署 第四部分:Hadoop Shell 基本操作介绍 第五部分:Hadoop 分布式文件系统1 第五部分:...

linni
2014/01/08
737
0
大数据(hadoop-Mapreduce原理架构)

课程目标: 1:MapReduce的应用场景 2:MapReduce编程模型 3:MapReduce的架构 4:常见MapReduce应用场景 5:总结 MapReduce的定义 源自于Google的MapReduce论文 发表于2004年12月 Hadoop M...

这很耳东先生
04/30
31
0
Hadoop 版本 生态圈 MapReduce模型

一 Hadoop版本 和 生态圈 1. Hadoop版本 (1) Apache Hadoop版本介绍 Apache的开源项目开发流程 : -- 主干分支 : 新功能都是在 主干分支(trunk)上开发; -- 特性独有分支 : 很多新特性稳定性很...

日拱一卒
2014/05/17
71
0
2014-11-12--Hadoop的基础学习(三)--Hadoop中MapReduce框架入门

1.MapReduce的简单概念 百度百科:MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)"和"Reduce(归约)",和他们的主要思想,都是从函数式编程语言里借来的...

查封炉台
2014/11/16
8.3K
8
【Hadoop】- MapReduce 框架详细介绍

MapReduce 简介 说明: 通过由普通机器组成的集群对大量数据集进行并行处理可依靠的容错软件框架。 MapReduce作业可以将数据集分割为Map任务并行处理的数据块,框架对对Map过程产生的数据进行...

ZeroneLove
02/24
25
0

没有更多内容

加载失败,请刷新页面

加载更多

最简单的获取相机拍照的图片

  import android.content.Intent;import android.graphics.Bitmap;import android.os.Bundle;import android.os.Environment;import android.provider.MediaStore;import andr......

MrLins
58分钟前
4
0
说好不哭!数据可视化深度干货,前端开发下一个涨薪点在这里~

随着互联网在各行各业的影响不断深入,数据规模越来越大,各企业也越来越重视数据的价值。作为一家专业的数据智能公司,个推从消息推送服务起家,经过多年的持续耕耘,积累沉淀了海量数据,在...

个推
今天
8
0
第三方支付-返回与回调注意事项

不管是支付宝,微信,还是其它第三方支付,第四方支付,支付机构服务商只要涉及到钱的交易都要进行如下校验,全部成功了才视为成功订单 1.http请求是否成功 2.校验商户号 3.校验订单号及状态...

Shingfi
今天
4
0
简述Java内存分配和回收策略以及Minor GC 和 Major GC(Full GC)

内存分配: 1. 栈区:栈可分为Java虚拟机和本地方法栈 2. 堆区:堆被所有线程共享,在虚拟机启动时创建,是唯一的目的是存放对象实例,是gc的主要区域。通常可分为两个区块年轻代和年老代。更...

DustinChan
今天
6
0
Excel插入批注:可在批注插入文字、形状、图片

1.批注一直显示:审阅选项卡-------->勾选显示批注选项: 2.插入批注快捷键:Shift+F2 组合键 3.在批注中插入图片:鼠标右键点击批注框的小圆点【重点不可以在批注文本框内点击】----->调出批...

东方墨天
今天
6
1

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部