文档章节

hadoop2 自定义OutputFormat场景杂记

山哥
 山哥
发布于 2016/09/28 09:16
字数 750
阅读 59
收藏 1

提示:以下代码都是在 Hadoop2.7.x 最新API下进行。

场景1:自定义输出文件名前缀

示例:计算学生的平均分成绩,输出:学生姓名和平均分成绩;要求:根据成绩的范围(0~59, 60~70, 70~80, 80~90, 90~100)输出到不同的文件中,文件名前缀为:student_score_059、student_score_6070、student_score_7080、student_score_8090、student_score_90100。

方法1(最简单,推荐):Override MultipleTextOutputFormatgenerateFileNameForKeyValue()方法。

// 重载 MultipleTextOutputFormat 的 generateFileNameForKeyValue()方法来实现
public class PartitionScoreOutputFormat extends MultipleTextOutputFormat<Text, IntWritable>
{
	private static final String PREFIX = "student_score_";
	
	@Override
	protected String generateFileNameForKeyValue(Text key, IntWritable value, String name)
	{
		int score = value.get();
		if(score < 60) {
			return PREFIX + "059";
		}
		if(score < 70) {
			return PREFIX + "6070";
		}
		if(score < 80) {
			return PREFIX + "7080";
		}
		if(score < 90) {
			return PREFIX + "8090";
		}
		
		return PREFIX + "90100";
	}
}

// 调用
job.setOutputFormat(PartitionScoreOutputFormat.class)

方法2:使用 MultipleOutputs.addNamedOutput() 方法

public class StudentScoreReducer extends Reducer<Text, IntWritable, Text, IntWritable>
{
	// 使用 MultipleOutputs
	private MultipleOutputs<Text, IntWritable> mos;
	
	@Override
	protected void setup(Context context) throws ...
	{
		super.setup(context);
		mos = new MultipleOutputs<Text,IntWritable>(context);
	}
	
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws ...
	{
		int totalScore = 0;
		int count = 0;
		
		for(IntWritable score : values) {
			totalScore += score.get();
			count++;
		}
		
		int avgScore = count > 0 ? totalScore/count : 0;
		
		// 使用 named output,对应在 Task 中的 MultipleOutputs.addNamedOutput(...) 定义
		mos.write(getNamed(avgScore), key, new IntWritable(avgScore));
		
	}
	
	@Override
	protected void cleanup(...) 
	{
		// ...
		mos.close();
	}
	
	private static String getNamed(int score) 
	{
		if(score < 60) {
			return "score059";
		}
		if(score < 70) {
			return "score6070";
		}
		if(score < 80) {
			return "score7080";
		}
		if(score < 90) {
			return "score8090";
		}
		return "score90100";
	}
	
}
public class StudentScoreTask
{
	public static void main(String[] args) throws ...
	{
		Job job = Job.getInstance(new Configuration());
		job.setJobName("...");
		
		// ...
		// 重要:定义命名输出规则: 
		// 第二个参数:score059 等名字要和 Reducer中的 MultipleOutputs.write(namedParam, ...) 命名一致
		MultipleOutputs.addNamedOutput(job, "score059", TextOutputFormat.class, Text.class, IntWritable.class);
		MultipleOutputs.addNamedOutput(job, "score6070", ...);
		MultipleOutputs.addNamedOutput(job, "score7080", ...);
		MultipleOutputs.addNamedOutput(job, "score8090", ...);
		MultipleOutputs.addNamedOutput(job, "score90100", ...);
		
		// ...
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
	
}

场景2:完全自定义输出的文件名和后缀

示例:计算学生的平均分成绩,将学生姓名和成绩输出到文件中,文件名格式为:student_score_${yyyyMMdd}_${taskId}.txt(student_score_20160930_0.txt)

方法:由于是输出到文本文件,因此 Override TextOutputFormat 的 getDefaultWorkFile() 方法即可:

public class StudentScoreOutputFormat<K,V> extends TextOutputFormcat<K,V>
{
	@Override
	public Path getDefaultWorkFile(TaskAttemptContext context, String extension) throws IOException
	{
		FileOutputCommitter comitter = (FileOutputCommitter) super.getOutputCommitter(context);
		
		return new Path(committer.getWorkPath(), generateFileName(context));
	}
	
	public synchronized static String generateFileName(TaskAttemptContext context)
	{
		TaskID taskId = context.getTaskAttemptID().getTaskID();
		int partition = taskId.getId();
		
		String currentDate = new SimpleDateFormat("yyyyMMdd").format(Calendar.getInstance().getTime());
		
		return String.format("student_score_%s_%d.txt", currentDate, partition);
	}
}

// 使用方式
Job job = Job.getInstance(...);
//...

// 将输出文件格式化类指定为自定义的 StudentScoreOutputFormat 即可
job.setOutputFormatClass(StudentScoreOutputFormat.class);

// 如果不想生成空文件 part-r-xxxx 等,使用 LazyOutputFormat 设置替代上面的设置即可
// LazyOutputFormat.setOutputFormatClass(job, StudentScoreOutputFormat.class);

场景3:修改默认一个Task一个文件输出,将所有输出合并到一个文件中,该文件采用固定size进行分割为多个文件(类似日志文件输出,比如当文件size达到1G时,自动生成第二个文件,后续输出到第二个文件中,以此类推)。

 

================== 未完待续,后面会持续补充遇到的特殊文件输出要求,更欢迎大家提供~~~

 

 

© 著作权归作者所有

共有 人打赏支持
山哥

山哥

粉丝 246
博文 353
码字总数 136686
作品 0
南京
程序员
私信 提问
java使用thumbnailator-0.4.8.jar 生成缩略图

场景一:图片尺寸不变,修改图片文件类型 使用: Thumbnails.of("F:\image\IMG20131229114806.png") .scale(1f) .outputFormat("jpg") .toFile("F:\image\output\IMG20131229114806"); 注意:......

不打伞的小鱼
2015/05/21
0
1
基于hive的日志数据统计实战

基于hive的日志数据统计实战 一、 hive 简介 hive 是一个基于 hadoop 的开源数据仓库工具,用于存储和处理海量结构化数据。 它把海量数据存储于 hadoop 文件系统,而不是数据库,但提供了一套...

空_明
2013/12/11
0
0
hive指定多个字符作为列分隔符

hive创建表指定分隔符,不支持多个字符作为分隔符,如果想使用多个字符作为分割符的话就需要实现InputFormat.主要重写next方法,代码如下 package hiveStream;import java.io.IOException; imp...

秋叶不落
2014/02/18
0
1
SpyDroid源码分析系列9:AudioStream之方法分析

~~ publicAudioStream() { setAudioSource(MediaRecorder.AudioSource.CAMCORDER); } public voidsetAudioSource(int audioSource) { mAudioSource = audioSource; } //设置mAudioSource pu......

强子哥哥
2013/11/05
660
0
程序员杂记系列

有感而发,糊撇乱侃的文章。宣泄一些郁闷,分享一些快乐。 程序员杂记:以显摆的名义! http://jianle.blog.51cto.com/429692/642710 程序员杂记:奋战在青青网吧! http://jianle.blog.51c...

剑了
2011/11/15
0
0

没有更多内容

加载失败,请刷新页面

加载更多

linux 服务管理 Crontba、Ntpdate、Logrotate、Supervisor

crond linux 系统则是由 cron (crond) 这个系统服务来控制的。Linux 系统上面原本就有非常多的计划性工作,因此这个系统服务是默认启动的。 另外, 由于使用者自己也可以设置计划任务,所以,...

狼王黄师傅
39分钟前
1
0
Sobel算子和Scharr滤波器

Sobel算子在数学上的本质是微分,对离散信号,是求邻域内的增量。 基本原理:在图像上,对图像信号在某点进行微分,表示图像的某个特征(如,强度、色调或者饱和度)在该点的变换程度。以强度...

yepanl
56分钟前
1
0
Jenkins API 使用

Jenkins 是一款流行的开源持续集成工具,可以用来做一些软件开发的自动化工作,如打包,测试,自动部署等。 Jenkins 中有 view 和 job 的概念, view 相当于组, job 则是具体的任务。 view...

YanWen
57分钟前
5
0
聊聊jest的NodeChecker

序 本文主要研究一下jest的NodeChecker NodeChecker jest-common-6.3.1-sources.jar!/io/searchbox/client/config/discovery/NodeChecker.java public class NodeChecker extends AbstractS......

go4it
今天
3
0
深入分析String.intern和String常量的实现原理

背景 字符串类型在实际应用场景中使用非常频繁,如果为每个字符串常量都生成一个对应的String对象,明显会造成内存的浪费,针对这一问题,虚拟机实现一个字符串常量池的概念,提供了如下实现...

群星纪元
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部