文档章节

Hadoop之MapReduce理论篇02

飞鱼说编程
 飞鱼说编程
发布于 2019/01/30 20:31
字数 2776
阅读 24
收藏 0

1. ReduceTask工作机制

1.1. 设置ReduceTask 

ReduceTask 的并行度同样影响整个 job 的执行并发度和执行效率,但与 MapTask 的并发数由切片数决定不同,ReduceTask 数量的决定是可以直接手动设置:

// 默认值是1,手动设置为4
job.setNumReduceTasks(4);

1.2. 注意

(1) 如果数据分布不均匀,就有可能在 Reduce 阶段产生数据倾斜;
(2) ReduceTask 数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有 1 个 ReduceTask;
(3) 具体多少个 ReduceTask,需要根据集群性能而定;
(4) 如果分区数不是 1,但是 ReduceTask 为 1,是否执行分区过程。答案是:不执行分区过程。因为在 MapTask 的源码中,执行分区的前提是先判断 ReduceNum 个数是否大于 1。不大于 1 肯定不执行。

1.3. 实验:测试ReduceTask多少合适

(1) 实验环境:1 个 master 节点,16 个 slave 节点: CPU:8GHZ , 内存: 2G,MapTask 数量为 16,测试数据量为 1G;
(2) 实验结论:

Reduce task

1

5

10

15

16

20

25

30

45

60

总时间

892

146

110

92

88

100

128

101

145

104

1.4. ReduceTask工作机制

(1) Copy 阶段:ReduceTask 从各个 MapTask 上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中;
(2) Merge 阶段:在远程拷贝数据的同时,ReduceTask 启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多;
(3) Sort 阶段:按照 MapReduce 语义,用户编写 reduce() 函数输入数据是按 key 进行聚集的一组数据。为了将 key 相同的数据聚在一起,Hadoop 采用了基于排序的策略。由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此,ReduceTask 只需对所有数据进行一次归并排序即可;
(4) Reduce 阶段:reduce() 函数将计算结果写到 HDFS 上。

2. 自定义OutputFormat

2.1. 基本概念

要在一个 mapreduce 程序中根据数据的不同输出两类结果到不同目录,这类灵活的输出需求可以通过自定义 outputformat 来实现:
(1) 自定义 outputformat;
(2) 改写 recordwriter,具体改写输出数据的方法 write();

2.2. 案例实操

2.2.1. 需求

使用自定义 OutputFormat 实现过滤日志及自定义日志输出路径:
过滤输入的 log 日志中是否包含 bigdata
    (1)包含 bigdata 的日志输出到 e:/bigdata.log
    (2)不包含 bigdata 的日志输出到 e:/other.log

2.2.2. 代码实现

(1) 自定义一个 outputformat

package com.test.mapreduce.outputformat;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{

	@Override
	public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)
			throws IOException, InterruptedException {

		// 创建一个RecordWriter
		return new FilterRecordWriter(job);
	}
}

(2) 具体的写数据 RecordWriter

package com.test.mapreduce.outputformat;
import java.io.IOException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {
	FSDataOutputStream bigdataOut = null;
	FSDataOutputStream otherOut = null;

	public FilterRecordWriter(TaskAttemptContext job) {
		// 1 获取文件系统
		FileSystem fs;

		try {
			fs = FileSystem.get(job.getConfiguration());

			// 2 创建输出文件路径
			Path bigdataPath = new Path("e:/bigdata.log");
			Path otherPath = new Path("e:/other.log");

			// 3 创建输出流
			bigdataOut = fs.create(bigdataPath);
			otherOut = fs.create(otherPath);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	@Override
	public void write(Text key, NullWritable value) throws IOException, InterruptedException {

		// 判断是否包含“bigdata”输出到不同文件
		if (key.toString().contains("bigdata")) {
			bigdataOut.write(key.toString().getBytes());
		} else {
			otherOut.write(key.toString().getBytes());
		}
	}

	@Override
	public void close(TaskAttemptContext context) throws IOException, InterruptedException {
		// 关闭资源
		if (bigdataOut!= null) {
			bigdataOut.close();
		}
		
		if (otherOut != null) {
			otherOut.close();
		}
	}
}

(3) 编写 FilterMapper

package com.test.mapreduce.outputformat;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable>{
	
	Text k = new Text();
	
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// 1 获取一行
		String line = value.toString();
		
		k.set(line);
		
		// 3 写出
		context.write(k, NullWritable.get());
	}
}

(4) 编写 FilterReducer

package com.test.mapreduce.outputformat;
import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

	@Override
	protected void reduce(Text key, Iterable<NullWritable> values, Context context)
			throws IOException, InterruptedException {

		String k = key.toString();
		k = k + "\r\n";

		context.write(new Text(k), NullWritable.get());
	}
}

(5) 编写 FilterDriver

package com.test.mapreduce.outputformat;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FilterDriver {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();

		Job job = Job.getInstance(conf);

		job.setJarByClass(FilterDriver.class);
		job.setMapperClass(FilterMapper.class);
		job.setReducerClass(FilterReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(NullWritable.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);

		// 要将自定义的输出格式组件设置到job中
		job.setOutputFormatClass(FilterOutputFormat.class);

		FileInputFormat.setInputPaths(job, new Path(args[0]));

		// 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat
		// 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		boolean result = job.waitForCompletion(true);
		System.exit(result ? 0 : 1);
	}
}

3. MapReduce数据压缩

3.1. 基本概念 

      压缩技术能够有效减少底层存储系统 (HDFS) 读写字节数。压缩提高了网络带宽和磁盘空间的效率。在 Hadood 下,尤其是数据规模很大和工作负载密集的情况下,使用数据压缩显得非常重要。在这种情况下, I/O 操作和网络数据传输要花大量的时间。还有,Shuffle 与 Merge 过程同样也面临着巨大的 I/O 压力。
      鉴于磁盘 I/O 和网络带宽是 Hadoop 的宝贵资源,数据压缩对于节省资源、最小化磁盘 I/O 和网络传输非常有帮助。不过,尽管压缩与解压操作的 CPU 开销不高,其性能的提升和资源的节省并非没有代价。
      如果磁盘 I/O 和网络带宽影响了 MapReduce 作业性能,在任意 MapReduce 阶段启用压缩都可以改善端到端处理时间并减少 I/O 和网络流量。
      压缩 MapReduce 的一种优化策略:通过压缩编码对 Mapper 或者 Reducer 的输出进行压缩,以减少磁盘 IO,提高MR程序运行速度(但相应增加了 CPU 运算负担)
      注意:压缩特性运用得当能提高性能,但运用不当也可能降低性能
      基本原则:
(1) 运算密集型的 job,少用压缩
(2) IO 密集型的 job,多用压缩

3.2. MR 支持的压缩编码

压缩格式

工具

算法

文件扩展名

是否可切分

DEFAULT

DEFAULT

.deflate

Gzip

gzip

DEFAULT

.gz

bzip2

bzip2

bzip2

.bz2

LZO

lzop

LZO

.lzo

LZ4

LZ4

.lz4

Snappy

Snappy

.snappy

为了支持多种压缩/解压缩算法,Hadoop 引入了编码/解码类,如下表所示:

压缩格式

对应的编码/解码类

DEFLATE

org.apache.hadoop.io.compress.DefaultCodec

gzip

org.apache.hadoop.io.compress.GzipCodec

bzip2

org.apache.hadoop.io.compress.BZip2Codec

LZO

com.hadoop.compression.lzo.LzopCodec

LZ4

org.apache.hadoop.io.compress.Lz4Codec

Snappy

org.apache.hadoop.io.compress.SnappyCodec

压缩性能的比较:

压缩算法

原始文件大小

压缩文件大小

压缩速度

解压速度

gzip

8.3GB

1.8GB

17.5MB/s

58MB/s

bzip2

8.3GB

1.1GB

2.4MB/s

9.5MB/s

LZO-bset

8.3GB

2GB

4MB/s

60.6MB/s

LZO

8.3GB

2.9GB

49.3MB/s

74.6MB/s

3.3. 采用压缩的位置

(1) 输入压缩:
      在有大量数据并计划重复处理的情况下,应该考虑对输入进行压缩。然而,你无须显示指定使用的编解码方式。Hadoop 自动检查文件扩展名,如果扩展名能够匹配,就会用恰当的编解码方式对文件进行压缩和解压。否则,Hadoop 就不会使用任何编解码。
(2) 压缩 mapper 输出:
当 map 任务输出的中间数据量很大时,应考虑在此阶段采用压缩技术。这能显著改善内部数据 Shuffle 过程,而 Shuffle 过程在 Hadoop 处理过程中是资源消耗最多的环节。如果发现数据量大造成网络传输缓慢,应该考虑使用压缩技术。可用于压缩 mapper 输出的快速编解码包括 LZO、LZ4 或者 Snappy。
      注:LZO 是供 Hadoop 压缩数据用的通用压缩编解码。其设计目标是达到与硬盘读取速度相当的压缩速度,因此速度是优先考虑的因素,而不是压缩率。与 gzip 编解码相比,它的压缩速度是 gzip 的 5 倍,而解压速度是 gzip 的 2 倍。同一个文件用 LZO 压缩后比用 gzip 压缩后大 50%,但比压缩前小 25%~50%。这对改善性能非常有利,map 阶段完成时间快4倍。
(3) 压缩 reducer 输出:
      在此阶段启用压缩技术能够减少要存储的数据量,因此降低所需的磁盘空间。当 mapreduce 作业形成作业链条时,因为第二个作业的输入也已压缩,所以启用压缩同样有效。

4. Yarn资源管理

4.1. 基本概念

Yarn 是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而 mapreduce 等运算程序则相当于运行于操作系统之上的应用程序。

4.2. Yarn 的重要概念

(1) Yarn 并不清楚用户提交的程序的运行机制;
(2) Yarn 只提供运算资源的调度 (用户程序向 Yarn 申请资源,Yarn 就负责分配资源);
(3) Yarn 中的主管角色叫 ResourceManager;
(4) Yarn 中具体提供运算资源的角色叫NodeManager;
(5) 这样一来,Yarn 其实就与运行的用户程序完全解耦,就意味着 Yarn 上可以运行各种类型的分布式运算程序 (mapreduce 只是其中的一种),比如 mapreduce、storm 程序,spark程序等;
(6) 所以,spark、storm 等运算框架都可以整合在 Yarn 上运行,只要他们各自的框架中有符合 Yarn 规范的资源请求机制即可;
(7) Yarn 就成为一个通用的资源调度平台,从此,企业中以前存在的各种运算集群都可以整合在一个物理集群上,提高资源利用率,方便数据共享。

4.3. Yarn 的工作机制

(0) Mr 程序提交到客户端所在的节点;
(1) YarnRunner 向 Resourcemanager 申请一个 application;
(2) RM 将该应用程序的资源路径返回给 YarnRunner;
(3) 该程序将运行所需资源提交到 HDFS 上;
(4) 程序资源提交完毕后,申请运行 MRAppMaster;
(5) RM 将用户的请求初始化成一个 Task;
(6) 其中一个 NodeManager 领取到 Task 任务;
(7) 该 NodeManager 创建容器 Container,并产生 MRAppmaster;
(8) Container 从 HDFS 上拷贝资源到本地;
(9) MRAppmaster 向 RM 申请运行 MapTask 容器;
(10) RM 将运行 MaptTask 任务分配给另外两个 NodeManager,另两个 NodeManager 分别领取任务并创建容器;
(11) MR 向两个接收到任务的 NodeManager 发送程序启动脚本,这两个 NodeManager 分别启动 MapTask,MapTask 对数据分区排序;
(12) MRAppmaster 向 RM 申请 2 个容器,运行 ReduceTask;
(13) ReduceTask 向 MapTask 获取相应分区的数据;
(14) 程序运行完毕后,MR 会向 RM 注销自己。
 

本文为原创文章,如果对你有一点点的帮助,别忘了点赞哦!比心!如需转载,请注明出处,谢谢!

 

© 著作权归作者所有

飞鱼说编程

飞鱼说编程

粉丝 195
博文 366
码字总数 685323
作品 0
深圳
程序员
私信 提问
国内第一篇详细讲解hadoop2的automatic HA+Federation+Yarn配置的教程

前言 hadoop是分布式系统,运行在linux之上,配置起来相对复杂。对于hadoop1,很多同学就因为不能搭建正确的运行环境,导致学习兴趣锐减。不过,我有免费的学习视频下载,请点击这里。 hado...

吴超沉思录
2014/02/12
1K
5
如何分布式运行mapreduce程序

如何分布式运行mapreduce程序 一、 首先要知道此前提 若在windows的Eclipse工程中直接启动mapreduc程序,需要先把hadoop集群的配置目录下的xml都拷贝到src目录下,让程序自动读取集群的地址后...

Zero零_度
2015/09/06
259
0
大数据教程(8.1)mapreduce核心思想

上一章介绍了hadoop的HDFS文件系统的原理及API使用。本章博主将继续对hadoop的mapreduce编程框架进行分享。 mapreduce原理篇 mapreduce是一个分布式运算程序的编程框架,是用户开发“基于had...

em_aaron
2018/11/19
124
0
【电子书】Hadoop实战手册 (样章第一章)

Hadoop实战手册 [美] Jonathan R. Owens,Jon Lentz,Brian Femiano 著; 傅杰,赵磊,卢学裕 译 内容简介   这是一本Hadoop实用手册,主要针对实际问题给出相应的解决方案。《Hadoop实战手...

dwf07223
2018/06/28
0
0
MapReduce初探之一~~基于Mongodb实现标签统计

MapReduce 是一种编程模型,是 Google 提出的一种软件架构,主要应用于分布式系统上。Google对其原始的定义是“ MapReduce is a framework for computing certain kinds of distributable pr...

zhiweiofli
2013/03/06
2.3K
5

没有更多内容

加载失败,请刷新页面

加载更多

CentOS-启用SFTP

创建用户组及用户 $ groupadd sftp $ useradd -g sftp -s /sbin/nologin -d /home/sftp sftp 设置密码 $ passwd sftp 输入密码(123456) 确认密码 修改sshd_config文件 $ vim /etc/ssh/sshd_......

自由人生-ZYRS
14分钟前
11
0
这个IM项目没时间搞了,开源算了。10万并发,基于golang。

先上效果 安装方法 本系统升级到golang1.12,请开启如下支持 #开启go mod支持export GO111MODULE=on#使用代理export GOPROXY=https://goproxy.io 1.下载项目 git clone https://github.c...

非正式解决方案
18分钟前
6
0
Mysql基本操作

查看mysql中已经有的数据库 二、删除已经有的数据库school 三、创建新数据库myschool 四、进入到myschool中 五、查看myschool库中所有的表 六、新建一张student表 七、查看student表结构 八、...

愚蠢的土豆
18分钟前
8
0
经典检索算法:BM25

BM25算法是一种常见用来做相关度打分的公式 思路比较简单,主要就是计算一个query里面所有词和文档的相关度, 然后在把分数做累加操作 而每个词的相关度分数主要还是受到tf/idf的影响 其实就...

Java搬砖工程师
25分钟前
5
0
详解mycat+haproxy+keepalived搭建高可用负载均衡mysql集群

概述 目前业界对数据库性能优化普遍采用集群方式,而oracle集群软硬件投入昂贵,mysql则比较推荐用mycat去搭建数据库集群,下面介绍一下怎么用mycat+haproxy+keepalived搭建一个属于mysql数据...

小致Daddy
26分钟前
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部