文档章节

逐行分析Hadoop的HelloWorld

牧师-Panda
 牧师-Panda
发布于 2016/11/06 19:50
字数 2376
阅读 49
收藏 0

学写代码的时候,我们总是先从helloworld开始写起,那么学习Hadoop,我们也必不可少的从helloworld开始,那么WordCount作为经典的Hadoop程序,可以作为我们庖丁解牛的材料,进而从代码的角度学习一下mapreduce的实现过程。下面我们就开始一步步的探索。

先从源码看起,再一步步剖析

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("mapred.job.tracker", "172.16.10.15:9001");//自己额外加的代码
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

大家可以看到整个源代码分为三个部分:

1. Map

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context) 
throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

a) 定义一个自己的Map过程,TokenizerMapper 这个类名自己设定,这个类需要继承org.apache.hadoop.mapreduce包中的Mapper类,四个参数分别表示输入键key的参数类型,输入值value的参数类型,输出键key的参数类型,输出值value的参数类型 值得注意的是Hadoop本身提供了一套可优化的网络序列化传输的基本类型,而不是用java内嵌的类型。这些类型都是在org.apache.hadoop.io包中。其中LongWritable类型相当于Long类型,Text类型相当于String类型,IntWritable相当于Integer类型。
b) map方法中参数value是指文本文件中的一行,参数key是为该行首字母相对于文本文件首地址的偏移量
c) StringTokenizer类是一个用来分隔String的应用类,类似于split。

//它的构造函数有三种:
public StringTokenizer(String str)
public StringTokenizer(String str,String delim)
public StringTokenizer(String str,String delim,boolean returnDelims)
//其中第一个参数为要分隔的String,第二个参数为分隔字符集合,第三个参数为分隔符是否作为标记返回,如果不指定分隔符,默认是'\t\n\r\f'
//它的方法主要有三种:
public boolean hasMoreTokens()//返回是否还有分隔符
public String nextToken()//返回从当前位置到下一个分隔符的字符串
public int countTokens()//返回nextToken方法被调用的次数

d) 经过StringTolenizer 处理之后会得到一个个 < word,1 > 这样的键值对,放在context里,Context用于输出内容的写入,读起来有点儿绕口,自己理解一下。

2. Reduce

public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
	private IntWritable result = new IntWritable();

	public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		int sum = 0;
		for (IntWritable val : values) {
			sum += val.get();
		}
		result.set(sum);
		context.write(key, result);
	}
}

a) 同mapper 过程一样,Reduce过程需要继承org.apache.hadoop.mapreduce包中Reducer类,并重写其reduce方法。 
b) reduce方法中输入参数key 指单个单词,values 指对应单词的计数值的列表 
c) reduce 方法的目的就是对列表的值进行加和处理 
d) 输出的是< key,value>,key 指单个单词,value 指对应单词的计数值的列表的值的总和。

3. Main

public static void main(String[] args) throws Exception {
	Configuration conf = new Configuration();
	conf.set("mapred.job.tracker", "172.16.10.15:9001");//自己额外加的代码
	String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
	if (otherArgs.length != 2) {
		System.err.println("Usage: wordcount <in> <out>");
		System.exit(2);
	}
	Job job = new Job(conf, "word count");
	job.setJarByClass(WordCount.class);
	job.setMapperClass(TokenizerMapper.class);
	job.setCombinerClass(IntSumReducer.class);
	job.setReducerClass(IntSumReducer.class);
	job.setOutputKeyClass(Text.class);
	job.setOutputValueClass(IntWritable.class);
	FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
	FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
	System.exit(job.waitForCompletion(true) ? 0 : 1);
}

a) Configuration conf = new Configuration(); 默认情况下,Configuration开始实例化的时候,会从Hadoop的配置文件里读取参数。 
b) conf.set(“mapred.job.tracker”, “172.16.10.15:9001”);设置这句代码是由于我们要把使用eclipse提交作业到Hadoop集群,所以手动添加Job运行地址。若是直接在Hadoop 集群进行运行,不用加这句代码。 而且你可以看到只要前三句使用了这个代码,所以这三句以后的代码才是所有Hadoop例子中都会包含的。
c) 接下来这一句也是读取参数,这里是从命令行参数里读取参数。 
d) Job job = new Job(conf, “word count”); 在MapReduce处理过程中,由Job对象负责管理和运行一个计算任务,然后通过Job的若干方法来对任务的参数进行设置。”word count”是Job的名字,(当然了,根据所有java语言规范规定的那样,你也可以用

Job job = new Job();
job.setJobName("Name");

的形式做声明)。 
e) job.setJarByClass(WordCount.class);是根据WordCount类的位置设置Jar文件 。

为什么要这么做?因为我们在Hadoop集群上运行这个作业时候,要把代码打包成一个JAR文件,用以在集群上发布这个文件。Hadoop利用这个传递进去的类来查找包含它的JAR文件。
f) job.setMapperClass(TokenizerMapper.class);设置Mapper 
g) job.setCombinerClass(IntSumReducer.class);设置Combiner,这里先使用Reduce类来进行Mapper 的中间结果的合并,能够减轻网络传输的压力。 
h) job.setReducerClass(IntSumReducer.class);设置Reduce 
i) job.setOutputKeyClass(Text.class);和 job.setOutputValueClass(IntWritable.class);分别是设置输出键的类型和设置输出值的类型 
j) FileInputFormat.addInputPath(job, new Path(otherArgs[0]));设置输入文件,它是otherArgs第一个参数 
k) FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));设置输出文件,将输出结果写入这个文件里,它是otherArgs第二个参数 。

注意:在运行作业前这个输出目录不应该存在,否则Hadoop会报错并拒绝运行该作业。这种预防措施的目的是防止数据丢失(如果长时间运行的数据结果被意外覆盖,肯定是非常恼人的)
l) System.exit(job.waitForCompletion(true) ? 0 : 1);job执行,等待执行结果

4. 各个包的功能

到此为止,三大部分就分析完毕,然后再来看看引入的有哪些类:

a) package org.apache.hadoop.examples;Java 提供包机制管理代码,关键词是package, 包名字可以自己定,但不能重复。通常为了包的唯一性,推荐使用公司域名的逆序作为包,于是有了上面例子中的‘org.apache.hadoop’这样的包名。 
b) import java.io.IOException; 凡是以java开头的包,在JDK1.7的API里可以找到类的资料。这里是从java.io中引入IOException,是一个输入输出异常类。 
c) import java.util.StringTokenizer;这是从java.util包中引入的StringTokenizer类,是一个解析文本的类。具体用法上文中已提过了。 
d) import org.apache.hadoop.conf.Configuration;凡是以org.apache.hadoop开头的包,在Hadoop1.2.1 的API文档可以找到类的资料。这里是从hadoop的conf包中引入Configuration类,它是一个读写和保存配置信息的类。 
e) import org.apache.hadoop.fs.Path;  Path类保存文件或者目录的路径字符串 
f) import org.apache.hadoop.io.IntWritable;  IntWritable是一个以类表示的可序化的整数。在java中,要表示一个整数,可以使用int类型,也可以使用integer类型,integer封装了int类型,且integer类是可序化的。但Hadoop认为integer的可序化不合适,于是实现了IntWritable。 
g) import org.apache.hadoop.io.Text;  从io包中引入Text类,是一个存储字符串的可比较可序化的类。 
h) import org.apache.hadoop.mapreduce.Job;  引入Job类,Hadoop中每个需要执行的任务是一个Job,这个Job负责参数配置、设置MapReduce细节、提交到Hadoop集群、执行控制等操作。 
i) import org.apache.hadoop.mapreduce.Mapper;引入Mapper类,负责MapReduce中的Map过程。 
j) import org.apache.hadoop.mapreduce.Reducer;引入Reduce类,负责MapReduce中的Reduce过程。 
k) import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;引入FileInputFormat类,主要功能是将文件进行切片。 
l) import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;FileOutputFormat类是将输出结果写入文件。 
m) import org.apache.hadoop.util.GenericOptionsParser;这个类负责解析命令行参数。

 

从代码的功能上,我们已经对map reduce有了一个清晰的认识,那么wordcount程序具体是怎么执行的呢?

将文件file1.txt,file2.txt 上传到hdfs中的hdfsinput1文件夹里(上传的方式可以通过eclipse客户端,也可以通过Hadoop命令行),然后在eclipse上编写wordcount.java文件(也即是第一部分分析的源码) 
这里写图片描述

由于测试用的文件较小,所以每个文件为一个split,并将文件按行分割形成< key,value>,这一步由MapReduce框架自动完成,其中key值为该行首字母相对于文本文件首地址的偏移量。 
这里写图片描述

将分割好的< key,value>对交给自己定义的map方法,输出新的< key,value>对。 
这里写图片描述

得到map方法输出的< key,value>对后,进行Combine操作。这里Combine 执行的是Reduce的代码。 
这里写图片描述

同样,在Reduce过程中先对输入的数据进行排序,再交由自定义的reduce方法进行处理,得到新的< key,value>对,并作为WordCount的输出结果,输出结果存放在第一张图的lxnoutputssss文件夹下的part-r-00000里。 
这里写图片描述

© 著作权归作者所有

牧师-Panda
粉丝 33
博文 146
码字总数 180044
作品 0
浦东
私信 提问
HDFS-2.7.0系列2: hadoop namenode -format脚本解析

我们都知道,在Hadoop 2.7.0中,系统刚开始执行时,需要执行一个命令就是 hadoop namenode -format 下面我们就来逐行分析下,这个命令对应的脚本的真正内涵! --- bin=bin=bin= 打印出来的b...

强子哥哥
2015/06/01
123
0
Node.js开发入门—HelloWorld再分析

在Node.js开发入门(1)我们用http模块实现了一个简单的HelloWorld网站,这次我们再来仔细分析下代码,了解更多的细节。 先看看http版本的HelloWorld代码: 代码就是这么简单: HelloWorld代码...

foruok
2015/08/05
0
0
Linux:“awk”命令的妙用

导读 awk是一个强大的文本分析工具,简单来说awk就是把文件逐行读入,(空格,制表符)为默认分隔符将每行切片,切开的部分再进行各种分析处理。 0、基本用法 awk是一个强大的文本分析工具,...

问题终结者
03/27
28
0
Hadoop(十):简单了解Hadoop数据类型,输入输出格式及用户如何自定义。

一:Hadoop内置的数据类型。 Hadoop提供如下内置的数据类型,这些数据类型都实现了WritableComparable接口,以便用这些类型定义的数据可以被序列化进行网络传输和文件存储,以及进行大小比较...

牧羊人Berg
2016/05/13
2.2K
0
2-Hadoop学习之旅-MapReduce

MapReduce设计理念 移动计算,而不是移动数据。 MapReduce之Helloworld(Word Count)处理过程 MapReduce的Split大小 max.split(200M) min.split(50M) block(128M) max(min.split,min(max.s......

SET
2016/09/10
119
1

没有更多内容

加载失败,请刷新页面

加载更多

32位与64位Linux系统下各类型长度对比

64 位的优点:64 位的应用程序可以直接访问 4EB 的内存和文件大小最大达到4 EB(2 的 63 次幂);可以访问大型数据库。本文介绍的是64位下C语言开发程序注意事项。 1. 32 位和 64 位C数据类型...

mskk
6分钟前
1
0
Vue 实现点击空白处隐藏某节点(三种方式:指令、普通、遮罩)

在项目中往往会有这样的需求: 弹出框(或Popover)在 show 后,点击空白处可以将其 hide。 针对此需求,整理了三种实现方式,大家按实际情况选择。 当然,我们做项目肯定会用到 UI 框架,常...

张兴华ZHero
12分钟前
1
0
SpringBoot激活profiles你知道几种方式?

多环境是最常见的配置隔离方式之一,可以根据不同的运行环境提供不同的配置信息来应对不同的业务场景,在SpringBoot内支持了多种配置隔离的方式,可以激活单个或者多个配置文件。 激活Profi...

恒宇少年
14分钟前
2
0
PDF修改文字的方法有哪些?怎么修改PDF文件中的文字

PDF修改文字一直以来都是一个难以解决的问题,很多的办公族在办公的时候会有修改PDF文件中的文字的需要,可是PDF文件一般是不能进行编辑和修改的,难道就没有什么办法解决这个问题了嘛?不要...

趣味办公社
17分钟前
1
0
企业组织中采用服务网格的挑战

作者:Christian Posta 译者:罗广明 原文:https://blog.christianposta.com/challenges-of-adopting-service-mesh-in-enterprise-organizations/ 编者按 本文作者介绍了企业组织采用服务网...

jimmysong
26分钟前
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部