文档章节

学习MapReduce(六)自定义OutputFormat

静下来想想静静
 静下来想想静静
发布于 2017/03/13 17:21
字数 1317
阅读 45
收藏 0

    2017.3.13,上午学习了InputFormat,下午有点虚度光阴了,并且感觉有点小累,具体代码就不贴了,因为我也没有敲,看看明天早上有没有精神吧。

    OutputFormat看名就知道,它是用来规定文件输出规则的。它和InputFormat一样一样的,只是OutputFormat写一个RecordWriter类,而InputFormat需要写一个RecordReader类。RecordWriter规定了将reduce的输出结果写入到哪个自己定义的输出流中:如下代码所示

public class MyRecordWriter extends RecordWriter{
        //继承了RecordWriter,先定义两个输出流
        FSDataOutputStream enhanceOut = null;
        FSDataOutputStream toCrawlOut = null;
        
        public MyRecordWriter(FSDataOutputStream enhanceOut, FSDataOutputStream toCrawlOut) {
            this.enhanceOut = enhanceOut;
            this.toCrawlOut = toCrawlOut;
        }
		//重写write方法,这里的K,V值是reduce阶段的输出值,下面代码表示了key值中是否包含字符串tocrawl,如果包含,在输出流toCrawlOut写入这个K值,
		//不存在就将它写入enhanceOut中
        @Override
        public void write(Text key, NullWritable value) throws IOException, InterruptedException {
            
            //有了数据,你来负责写到目的地 —— hdfs
            //判断,进来内容如果是带tocrawl的,就往待爬清单输出流中写 toCrawlOut
            if(key.toString().contains("tocrawl")){
                toCrawlOut.write(key.toString().getBytes());
            }else{
                enhanceOut.write(key.toString().getBytes());
            }
                
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            
            if(toCrawlOut!=null){
                toCrawlOut.close();
            }
            if(enhanceOut!=null){
                enhanceOut.close();
            }
        }
    }
}

这时,我们就可以写一个自己的OutputFormat类,代码如下:

public class LogEnhancerOutputFormat extends FileOutputFormat{
	//这个类继承了FileOutputFormat,咱们往下看
    
    @Override
    public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {

		//通过TaskAttemptContext获取到一个Configuration的实例,在获取到一个FileSystem的实例
        FileSystem fs = FileSystem.get(context.getConfiguration());
		//新建两个文件路劲,这两个路径是为了接受相对应的输入流的。
        Path enhancePath = new Path("hdfs://hdp-node01:9000/flow/enhancelog/enhanced.log");
        Path toCrawlPath = new Path("hdfs://hdp-node01:9000/flow/tocrawl/tocrawl.log");
        //新建了两个输入流
        FSDataOutputStream enhanceOut = fs.create(enhancePath);
        FSDataOutputStream toCrawlOut = fs.create(toCrawlPath);
        
        //这里返回一个自己写的RecordWriter实例,注意这点我们将上面建的那两个输入流传给我MyRecordWriter,
		//而在MyRecordWriter的write方法,我们将一些内容写进这俩输出流里,那是不是在程序结束的时候,我们会有两个
		//日志文件,这俩日志文件是不是会有内容?内容就是我们规定的内容。
        return new MyRecordWriter(enhanceOut,toCrawlOut);
    }
}

以上写完时候我们只需要在Driver类,也就是由main方法的类中,通过Job来设定自己的输出格式就行了:

------------------------------>>>>贴上我自己敲的代码:

------------------------------>>>>Mapper,Reducer和Driver类一起贴了

package lcPhoneFolw.mrOutputFormat;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<LongWritable,Text,LongWritable,LongWritable>{

	//我是想简单实现下,规定自己的OutputFormat来实现不同v值得结果放入不同的文件中
	@Override
	protected void map(LongWritable key,Text value,Context context)
			throws IOException, InterruptedException {
		String line = value.toString();
		String[] strs = line.split("\t");
		LongWritable phoneNum = new LongWritable(Long.valueOf(strs[0]));
		LongWritable sumload = new LongWritable(Long.valueOf(strs[1]+Long.valueOf(strs[2])));
		context.write(phoneNum, sumload);
	}
}
package lcPhoneFolw.mrOutputFormat;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reducer类的输出是要根据自己定义的OutputFormat来放日志的。
 * @author admin
 *
 */
public class MyReduce extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {
	
	@Override
	protected void reduce(
			LongWritable key,
			Iterable<LongWritable> values,Context context)
			throws IOException, InterruptedException {
		//对相同手机号的总流量进行累加
		long sum = 0;
		for (LongWritable value : values) {
			sum+=value.get();
		}
		context.write(key,new LongWritable(sum));
	}
	
	
	
}
package lcPhoneFolw.mrOutputFormat;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyDriver {
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(MyDriver.class);
		
		job.setMapperClass(MyMapper.class);
		job.setReducerClass(MyReduce.class);
		
		job.setMapOutputKeyClass(LongWritable.class);
		job.setMapOutputValueClass(LongWritable.class);
		
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(LongWritable.class);
		//定义用哪个OutputFormat类为自己的输出类。(这里写我们自己写的类)
		job.setOutputFormatClass(MyOutputFormat.class);
		//设定输入和输出路径,虽然我们定义了自己的OutputFormat中有文件输出路径,但是mapreduce需要生产一个_success文件
		//所以我们还是需要写一个输出文件路径的
		FileInputFormat.setInputPaths(job,new Path(args[0]));
		FileOutputFormat.setOutputPath(job,new Path(args[1]));
		
		job.waitForCompletion(true);
	}
}

----------------------------------->>>>自己敲的OutputFormat

package lcPhoneFolw.mrOutputFormat;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyOutputFormat extends FileOutputFormat<LongWritable,LongWritable>{
	
	//这个方法主要是返回自己想要的RecordWriter,因为我的reduce输出的是LongWritable,LongWritable类型的 ,所以这里我
	//也定义它接受的是LongWritable,LongWritable的。
	@Override
	public RecordWriter<LongWritable, LongWritable> getRecordWriter(TaskAttemptContext context)
			throws IOException, InterruptedException {
		//通过上下文类得到这个流程中的配置类实例
		Configuration conf = context.getConfiguration();
		FileSystem fs = FileSystem.get(conf);//通过配置实例,得到一个文件系统实例。
		FSDataOutputStream fos1 = fs.create(new Path("E:\\bigfloat.log"));
		FSDataOutputStream fos2 = fs.create(new Path("E:\\smallfloat.log"));
		MyRecordWriter mw = new MyRecordWriter(fos1,fos2);
		return mw;
	}
	/**
	 * MyRecordWriter定义自己写的RecordWriter子类。重写的write方法,是reduce阶段输出的K,V。
	 * 这个例子我们只根据K,来判断是否要放进哪个日志中。
	 * @author admin
	 *
	 */
	static class MyRecordWriter extends RecordWriter<LongWritable,LongWritable>{
		//新建两个输出流,这两个流对应两个日志文件
		private FSDataOutputStream fos1;
		private FSDataOutputStream fos2;
		
		
		public MyRecordWriter() {}
		
		public MyRecordWriter(FSDataOutputStream fos1, FSDataOutputStream fos2) {
			this.fos1 = fos1;
			this.fos2 = fos2;
		}
		/**
		 * 重写write方法,这个方法就是自己规定的规则,比如我想让总流量大于44733592的K放入bigfloat.log里
		 * 将小于等于1000的放入smallfloat.log中,当然放的全是手机号
		 */
		@Override
		public void write(LongWritable key, LongWritable value) throws IOException,
				InterruptedException {
			int m = (int)value.get();
			
			if(m>44733592){//根据value的大小,将不同的手机号放入相应的日志中
				fos1.writeUTF(key.toString()+"\t"+value.toString()+"\r\n");
			}else {
				fos2.writeUTF(key.toString()+"\t"+value.toString()+"\r\n");
			}
			
		}

		@Override
		public void close(TaskAttemptContext context) throws IOException,
				InterruptedException {
			if(null != fos1){
				fos1.close();
			}
			if(null!=fos2){
				fos2.close();
			}
		}
	
	}
}

 

Job.setoutputFormat(自己写的OutputFormat类.class)

 

以上就是我对OutputFormat的理解,谢谢观看!

© 著作权归作者所有

静下来想想静静
粉丝 2
博文 14
码字总数 12348
作品 0
焦作
私信 提问
【hadoop】16.MapReduce-简介

简介 本章节我们先来了解一些关于MapReduce的理论知识。从本章节您可以学习到:MapReduce的相关知识。 1、概念 Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析...

Areya
01/12
19
0
大数据(hadoop-Mapreduce原理架构)

课程目标: 1:MapReduce的应用场景 2:MapReduce编程模型 3:MapReduce的架构 4:常见MapReduce应用场景 5:总结 MapReduce的定义 源自于Google的MapReduce论文 发表于2004年12月 Hadoop M...

这很耳东先生
04/30
22
0
大数据教程(10.5)运营商流量日志解析增强

上一篇文章分析了如何使用hadoop来实现sql中group by 并且取每组中最大值的需求--订单中成交金额最大的订单项分析,本篇博客博主将继续分享一个mapreduce实战例子--运营商流量日志解析增强;...

em_aaron
2018/12/23
105
0
Hadoop实战读书笔记(7)

输入数据概要 输入数据通常驻留在较大的文件中,通常几十或者数百GB,甚至更大。MapReduce处理的基本原则之一是将输入数据分割成块。这些块可以在多台计算机上并行处理,在Hadoop的术语中这些...

祥林会跟你远走高飞
2014/12/08
123
0
Hadoop之MapReduce的基本概念

MapReduce 的概念 MapReduce 是一个分布式运算程序的编程框架,是用户开发“基于 hadoop 的数据分析应用”的核心框架; MapReduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一...

飞鱼说编程
01/16
47
0

没有更多内容

加载失败,请刷新页面

加载更多

CentOS7.6中安装使用fcitx框架

内容目录 一、为什么要使用fcitx?二、安装fcitx框架三、安装搜狗输入法 一、为什么要使用fcitx? Gnome3桌面自带的输入法框架为ibus,而在使用ibus时会时不时出现卡顿无法输入的现象。 搜狗和...

技术训练营
昨天
5
0
《Designing.Data-Intensive.Applications》笔记 四

第九章 一致性与共识 分布式系统最重要的的抽象之一是共识(consensus):让所有的节点对某件事达成一致。 最终一致性(eventual consistency)只提供较弱的保证,需要探索更高的一致性保证(stro...

丰田破产标志
昨天
8
0
docker 使用mysql

1, 进入容器 比如 myslq1 里面进行操作 docker exec -it mysql1 /bin/bash 2. 退出 容器 交互: exit 3. mysql 启动在容器里面,并且 可以本地连接mysql docker run --name mysql1 --env MY...

之渊
昨天
10
0
python数据结构

1、字符串及其方法(案例来自Python-100-Days) def main(): str1 = 'hello, world!' # 通过len函数计算字符串的长度 print(len(str1)) # 13 # 获得字符串首字母大写的...

huijue
昨天
6
0
PHP+Ajax微信手机端九宫格抽奖实例

PHP+Ajax结合lottery.js制作的一款微信手机端九宫格抽奖实例,抽奖完成后有收货地址添加表单出现。支持可以设置中奖概率等。 奖品列表 <div class="lottery_list clearfix" id="lottery"> ......

ymkjs1990
昨天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部