文档章节

Hadoop实战读书笔记(6)

祥林会跟你远走高飞
 祥林会跟你远走高飞
发布于 2014/12/08 15:03
字数 2001
阅读 54
收藏 3

putmerge程序的大体流程是?

1、根据用户定义的参数设置本地目录和HDFS的目录文件

2、提取本地输入目录中每个文件的信息

3、创建一个输出流写入到HDF文件

4、遍历本地目录中的每个文件,打开一个输入流来读取该文件,剩下就是一个标准的Java文件复制过程了

具体程序如下:

public static void main(String[] args) throws IOException {

       Configuration conf = new Configuration();

       FileSystem hdfs = FileSystem.get(conf);

       FileSystem local = FieSystem.getLocal(conf);

       // 设定输入目录与输出文件

       Path inputDir = new Path(args[0]);

       Path hdfsFile = new Path(args[1]);

 

       try {

              // 得到本地文件列表

              FileStatus[] inputFiles = local.listStatus(inputDir);

              // 生成HDFS输出流

              FSDataOutputStream out = hdfs.create(hdfsFile);

 

              for (int i = 0; i < inputFiles.length; i++) {

                     System.out.println(inputFiles[i].getPath().getName());

                     // 打开本地输入流

                     FSDataInputStream in = local.open(inputFiles[i].getPath());

                     byte buffer[] = new byte[256];

                     int bytesRead = 0;

                     while ( (bytesRead = in.read(buffer)) > 0) {

                            out.write(buffer, 0, bytesRead);

                     }

                     in.close();

              }

              out.close();

       } catch (IOException e) {

              e.printStackTrace();

       }

}

 

那么现在有数据了,还要对它进行处理、分析以及做其他的操作。

MapReduce程序通过操作键/值对来处理数据,一般形式为

map: (K1, V1) -> list(K2, V2)

reduce:(K2, list(V2)) -> list(K3, V3)

 

Hadoop数据类型有哪些?

MapReduce框架并不允许它们是任意的类。

虽然我们经常把某些键与值称为整数、字符串等,但它们实际上并不是IntegerString等哪些标准的Java类。为了让键/值对可以在集群上移动,MapReduce框架提供了一种序列化键/值对的方法。因此,只有那些支持这种序列化的类能够在这个框架中充当键或者值。

 

更具体的Hadoop类型说明

实现Writable接口的类可以是值

而实现WritableComparable<T>接口的类既可以是键也可以是值

注意WritableComparable<T>接口是Writablejava.lang.Comparable<T>接口的组合,对于键而言,我们需要这个比较,因为它们将在Reduce阶段进行排序,而值仅会被简单地传递。

 

/值对经常使用的数据类型列表,这些类均实现WritableComparable接口

描述

BooleanWritable

标准布尔变量的封装

ByteWritable

单字节数的封装

DoubleWritable

双字节数的封装

FloatWritable

浮点数的封装

IntWritable

整数的封装

LongWritable

Long的封装

Text

使用UTF-8格式的文本封装

NullWritable

无键值时的站位符

 

如何自定义数据类型?

只要它实现了Writable(WritableComparable<T>)接口。

 

定义一个Edge类型用于表示一个网络的边界

public class Edge implements WritableComparable<Efge> {

       private String departureNode;

       private String arrivalNode;

 

       public String getDepartureNode() {

              return departureNode;

       }

       // 说明如何读入数据

       @Override

       public void readFields(DataInput in) throws IOException {

              departureNode = in.readUTF();

              arrivalNode = in.readUTF();

       }

       // 说明如何写出数据

       @Override

       public void write(DataOutput out) throws IOException {

              out.writeUTF(departureNode);

              out.writeUTF(arrivalNode);

       }

       // 定义数据排序

       @Override

       public int compareTo(Edge o) {

              return (departureNode.compareTo(o.departureNode) != 0)

                     ? departureNode.compareTo(o.departureNode)

                     : arrivalNode.compareTo(o.arrivalNode);

       }

}

 

Mapper类是什么?

一个类要作为mapper,需继承MapReduceBase基类并实现Mapper接口。

mapperreducer的基类均为MapReduceBase

其中包含一些函数或方法:

1void configure(JobConf job),该函数提取XML配置文件或者应用程序主类中的参数,在数据处理之前调用该函数。

2void close(),作为map任务结束前的最后一个操作,该函数完成所有的结尾工作,如关闭数据库连接、打开文件等。

Mapper接口负责数据处理阶段,它采用Mapper<K1, V1, K2, V2>Java泛型,这里键类和值类分别实现WritableComparableWritable接口。

Mapper类只有是一个方法-map,用于处理一个单独的键/值对。

void map (K1 key,

              V1 value,

              OutputCollector<K2, V2> output,

              Reporter reporter

              ) throws IOException

 

上面这个map函数的参数都是什么意思?

该函数处理一个给定的键/值对 (K1, V1),生成一个键/值对 (K2, V2) 的列表 (该列表页可能为空)

OutputCollector接收这个映射过程的输出

Reporter可提供对mapper相关附加信息的记录

 

Hadoop提供了一些有用的mapper实现,这些实现是?

描述

IdentityMapper<K, V>

实现Mapper<K,   V, K, V>, 将输入直接映射到输出

InverseMapper<K, V>

实现Mapper<K,   V, V, K> 反转键/值对

RegexMapper<K>

实现Mapper<K,   Text, Text, LongWritable>, 为每个常规表达式的匹配项生成一个   match 1  

TokenCountMapper<K>

实现Mapper<K,   Text, Text, LongWritable>, 当输入的值为分词时,   生成一个(token 1  

 

Reducer是什么?

一个类要作为reducer,需继承MapReduceBase基类并实现Reducer接口。

以便于允许配置和清理。

此外,它还必须实现Reducer接口使其具有如下的单一方法:

void reduce (K2 key,

                     Iterator<V2> values,

                     OutputCollector<K3, V3> output,

                     Reporter reporter

                     ) throws IOException

reducer任务接收来自各个mapper的输出时,它按照键/值对中的键对输入数据进行排序,并将相同键的值归并。然后调用reduce()函数,并通过迭代处理哪些与指定键相关联的值,生成一个 (可能为空的) 列表 K3, V3

OutputCollector接收reduce阶段的输出,并写入输出文件

Reporter可提供对reducer相关附加信息的记录,形成任务进度

 

一些非常有用的由Hadoop预定义的Reducer实现

描述

IdentityReducer<K, V>

实现Reducer<K,   V, K, V>, 将输入直接映射到输出

LongSumReducer<K>

实现<K,   LongWritable, K, LongWritable>   计算与给定键相对应的所有值的和

 

注:虽然我们将Hadoop程序称为MapReduce应用,但是在mapreduce两个阶段之间还有一个极其重要的步骤:将mapper的结果输出给不同的reducer。这就是partitioner的工作。

 

初次使用MapReduce的程序员通常有一个误解?

仅需要一个reducer 采用单一的reducer可以在处理之前对所有的数据进行排序。

No,采用单一的reducer忽略了并行计算的好处。

那么就应该使用多个reducer是么?但需要解决一个问题,如何确定mapper应该把键/值对输出给谁。

默认的作法是对键进行散列来确定reducerHadoop通过HashPartitioner类强制执行这个策略。但有时HashPartitioner会出错。

 

HashPartitioner会出什么错?

假如你使用Edge类来分析航班信息来决定从各个机场离港的乘客数目,这些数据可能是:

(San Francisco, Los Angeles) Chuck Lam

(San Francisco, Dallas) James Warren

如果你使用HashPartitioner,这两行可以被送到不同的reducer 离港的乘客数目被处理两次并且两次都是错误的

 

如何为你的应用量身定制partitioner呢?

上面的情况,我希望具有相同离港地的所有edge被送往相同的reducer,怎么做呢?只要对Edge类的departureNode成员进行散列就可以了:

public class EdgePartitioner implements Partitioner<Edge, Writable> {

       @Override

       public int getPartition (Edge key, Writable value, int numPartitions) {

              return key.getDepartureNode().hashCode() % numPartitions;

       }

       @Override

       public void configure(JobConf conf) { }

}

一个定制的partitioner只需要实现configure()getPartition()两个函数,前者将Hadoop对作业的配置应用在patitioner上,而后者返回一个介于0reduce任务数之间的整数,指向键/值对将要发送的reducer

 

Combiner:本地reduce

在许多MapReduce应用场景中,我们不妨在分发mapper结果之前做一下 "本地Reduce"。再考虑一下WordCount的例子,如果作业处理的文件中单词 "the" 出现了574次,存储并洗牌一次 ("the", 574) /值对比许多次 ("the", 1) 更为高效。这种处理步骤被称为合并。

 

预定义mapperReducer类的单词计数

public class WordCount {

       public static void main (String[] args) {

              JobClient client = new JobClient();

              JobConf conf = new JobConf(WordCount.class);

 

              FileInputFormat.addInputPath(conf, new Path(args[0]));

              FileOutputFormat.setOutputPath(conf, new Path(args[1]));

      

              conf.setOutputKeyClass(Text.class);

              conf.setOutputValueClass(LongWritable.class);

              conf.setMapperClass(TokenCountMapper.class); // Hadoop自己的TokenCountMapper

              conf.setCombinerClass(LongSumReducer.class);

              conf.setReducerClass(LongSumReduver.class); // Hadoop自己的LongSumReducer

              client.setConf(conf);

              try {

                     JobClient.runJob(conf);

              } catch (Exception e) {

                     e.printStackTrace();

              }

       }

}

使用Hadoop预定义的类TokenCountMapperLongSumReducer,编写MapReduce分厂的容易,Hadoop也支持生成更复杂的程序,这里只是强调Hadoop允许你通过最小的代码量快速生成实用的程序。


© 著作权归作者所有

共有 人打赏支持
祥林会跟你远走高飞
粉丝 26
博文 50
码字总数 98029
作品 0
昌平
程序员
Hadoop实战读书笔记(5)

HDFS文件操作 你可以把一个大数据集(100TB)在HDFS中存储为单个文件,而大多数其他的文件系统无力实现这一点。虽然该文件存在多个副本分布在多台机器上来支持并行处理,你也不必考虑这些细节...

祥林会跟你远走高飞
2014/12/08
0
0
Hadoop实战读书笔记(8)

什么是开发数据集? 一个流行的开发策略是为生产环境中的大数据集建立一个较小的、抽样的数据子集,称为开发数据集。这个开发数据集可能只有几百兆字节。当你以单机或者伪分布式模式编写程序...

祥林会跟你远走高飞
2014/12/08
0
0
敏捷教练成长记:漫漫长路第三周

看到跆拳道的软文,讲到: 学跆拳道的正确顺序: 第一阶段:有兴趣 第二阶段:没兴趣 第三阶段:逼练习 第四阶段:成习惯 第五阶段:有兴趣 第六阶段:真热爱 大部分家长在孩子第二阶段时放弃...

转型实践者
2017/11/17
0
0
2017年年终总结

前言 不知不觉,2017年又接近尾声了,又到了该写年终总结的时候了,往年这个时候都会熙熙攘攘,各大平台提早预热过年的气氛,而今年显得格外的平静,这可能正如我的现在的心境,波澜而不惊!...

韩俊强
01/03
0
0
Hadoop实战读书笔记(7)

输入数据概要 输入数据通常驻留在较大的文件中,通常几十或者数百GB,甚至更大。MapReduce处理的基本原则之一是将输入数据分割成块。这些块可以在多台计算机上并行处理,在Hadoop的术语中这些...

祥林会跟你远走高飞
2014/12/08
0
0

没有更多内容

加载失败,请刷新页面

加载更多

20180920 rzsz传输文件、用户和用户组相关配置文件与管理

利用rz、sz实现Linux与Windows互传文件 [root@centos01 ~]# yum install -y lrzsz # 安装工具sz test.txt # 弹出对话框,传递到选择的路径下rz # 回车后,会从对话框中选择对应的文件传递...

野雪球
今天
2
0
OSChina 周四乱弹 —— 毒蛇当辣条

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @ 达尔文:分享花澤香菜/前野智昭/小野大輔/井上喜久子的单曲《ミッション! 健?康?第?イチ》 《ミッション! 健?康?第?イチ》- 花澤香菜/前野智...

小小编辑
今天
9
3
java -jar运行内存设置

java -Xms64m #JVM启动时的初始堆大小 -Xmx128m #最大堆大小 -Xmn64m #年轻代的大小,其余的空间是老年代 -XX:MaxMetaspaceSize=128m # -XX:CompressedClassSpaceSize=6...

李玉长
今天
4
0
Spring | 手把手教你SSM最优雅的整合方式

HEY 本节主要内容为:基于Spring从0到1搭建一个web工程,适合初学者,Java初级开发者。欢迎与我交流。 MODULE 新建一个Maven工程。 不论你是什么工具,选这个就可以了,然后next,直至finis...

冯文议
今天
2
0
RxJS的另外四种实现方式(四)——性能最高的库(续)

接上一篇RxJS的另外四种实现方式(三)——性能最高的库 上一篇文章我展示了这个最高性能库的实现方法。下面我介绍一下这个性能提升的秘密。 首先,为了弄清楚Most库究竟为何如此快,我必须借...

一个灰
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部