学习MapReduce(六)自定义OutputFormat

原创
2017/03/13 17:21
阅读数 484

    2017.3.13,上午学习了InputFormat,下午有点虚度光阴了,并且感觉有点小累,具体代码就不贴了,因为我也没有敲,看看明天早上有没有精神吧。

    OutputFormat看名字就知道,它是用来规定文件输出规则的。它的写法和InputFormat基本一样,区别在于:OutputFormat需要提供一个RecordWriter类,而InputFormat需要提供一个RecordReader类。RecordWriter决定了reduce的输出结果写入到哪个自己定义的输出流中,如下代码所示:

public class MyRecordWriter extends RecordWriter{
        //继承了RecordWriter,先定义两个输出流
        FSDataOutputStream enhanceOut = null;
        FSDataOutputStream toCrawlOut = null;
        
        public MyRecordWriter(FSDataOutputStream enhanceOut, FSDataOutputStream toCrawlOut) {
            this.enhanceOut = enhanceOut;
            this.toCrawlOut = toCrawlOut;
        }
		//重写write方法,这里的K,V值是reduce阶段的输出值,下面代码表示了key值中是否包含字符串tocrawl,如果包含,在输出流toCrawlOut写入这个K值,
		//不存在就将它写入enhanceOut中
        @Override
        public void write(Text key, NullWritable value) throws IOException, InterruptedException {
            
            //有了数据,你来负责写到目的地 —— hdfs
            //判断,进来内容如果是带tocrawl的,就往待爬清单输出流中写 toCrawlOut
            if(key.toString().contains("tocrawl")){
                toCrawlOut.write(key.toString().getBytes());
            }else{
                enhanceOut.write(key.toString().getBytes());
            }
                
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            
            if(toCrawlOut!=null){
                toCrawlOut.close();
            }
            if(enhanceOut!=null){
                enhanceOut.close();
            }
        }
    }
}

这时,我们就可以写一个自己的OutputFormat类,代码如下:

public class LogEnhancerOutputFormat extends FileOutputFormat{
	//这个类继承了FileOutputFormat,咱们往下看
    
    @Override
    public RecordWriter getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException {

		//通过TaskAttemptContext获取到一个Configuration的实例,在获取到一个FileSystem的实例
        FileSystem fs = FileSystem.get(context.getConfiguration());
		//新建两个文件路劲,这两个路径是为了接受相对应的输入流的。
        Path enhancePath = new Path("hdfs://hdp-node01:9000/flow/enhancelog/enhanced.log");
        Path toCrawlPath = new Path("hdfs://hdp-node01:9000/flow/tocrawl/tocrawl.log");
        //新建了两个输入流
        FSDataOutputStream enhanceOut = fs.create(enhancePath);
        FSDataOutputStream toCrawlOut = fs.create(toCrawlPath);
        
        //这里返回一个自己写的RecordWriter实例,注意这点我们将上面建的那两个输入流传给我MyRecordWriter,
		//而在MyRecordWriter的write方法,我们将一些内容写进这俩输出流里,那是不是在程序结束的时候,我们会有两个
		//日志文件,这俩日志文件是不是会有内容?内容就是我们规定的内容。
        return new MyRecordWriter(enhanceOut,toCrawlOut);
    }
}

以上写完之后,我们只需要在Driver类(也就是有main方法的类)中,通过Job来设定自己的输出格式就行了:

------------------------------>>>>贴上我自己敲的代码:

------------------------------>>>>Mapper,Reducer和Driver类一起贴了

package lcPhoneFolw.mrOutputFormat;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Mapper that parses tab-separated flow records and emits (phone number, total flow).
 *
 * <p>Column 0 is read as the phone number. Columns 1 and 2 are read as two flow figures and
 * summed. NOTE(review): presumably these are upstream/downstream traffic — confirm against the
 * input data. A line with fewer than three columns, or non-numeric columns, fails the task
 * with a runtime exception.
 */
public class MyMapper extends Mapper<LongWritable,Text,LongWritable,LongWritable>{

	/**
	 * Emits (phone number, column1 + column2) for one input line.
	 *
	 * @param key     byte offset of the line (unused)
	 * @param value   one tab-separated input line
	 * @param context MapReduce context receiving the output pair
	 */
	@Override
	protected void map(LongWritable key,Text value,Context context)
			throws IOException, InterruptedException {
		String[] fields = value.toString().split("\t");
		long phone = Long.parseLong(fields[0].trim());
		// Parse both columns before adding. Concatenating the strings first
		// (e.g. "100" + 200 -> "100200") produces a wrong total.
		long totalFlow = Long.parseLong(fields[1].trim()) + Long.parseLong(fields[2].trim());
		context.write(new LongWritable(phone), new LongWritable(totalFlow));
	}
}
package lcPhoneFolw.mrOutputFormat;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * Reducer that totals the flow values reported for each phone number.
 *
 * <p>It emits one (phone number, total flow) pair per key. Which file that pair lands in is
 * decided by the OutputFormat configured on the job.
 * @author admin
 */
public class MyReduce extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {

	/**
	 * Sums every flow value grouped under {@code key} and writes the total.
	 *
	 * @param key     phone number
	 * @param values  all flow values for that phone number
	 * @param context MapReduce context receiving the (phone, total) pair
	 */
	@Override
	protected void reduce(LongWritable key, Iterable<LongWritable> values, Context context)
			throws IOException, InterruptedException {
		long total = 0L;
		for (LongWritable flow : values) {
			total = total + flow.get();
		}
		LongWritable totalWritable = new LongWritable(total);
		context.write(key, totalWritable);
	}
}
package lcPhoneFolw.mrOutputFormat;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Entry point that configures and runs the flow-summing job with {@link MyOutputFormat}.
 */
public class MyDriver {

	/**
	 * Configures the job, runs it, and exits with 0 on success or 1 on failure.
	 *
	 * @param args args[0] = input path; args[1] = job output directory
	 * @throws Exception if job setup or submission fails
	 */
	public static void main(String[] args) throws Exception {
		if (args.length < 2) {
			System.err.println("Usage: MyDriver <input path> <output dir>");
			System.exit(2);
		}

		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);

		job.setJarByClass(MyDriver.class);

		job.setMapperClass(MyMapper.class);
		job.setReducerClass(MyReduce.class);

		job.setMapOutputKeyClass(LongWritable.class);
		job.setMapOutputValueClass(LongWritable.class);

		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(LongWritable.class);
		// Use our own OutputFormat to decide which file each reduce output record goes to.
		job.setOutputFormatClass(MyOutputFormat.class);
		// An output directory is still required: MyOutputFormat extends FileOutputFormat, whose
		// output-spec check expects one, and the job's _SUCCESS marker is written there.
		FileInputFormat.setInputPaths(job,new Path(args[0]));
		FileOutputFormat.setOutputPath(job,new Path(args[1]));

		// Propagate the job result as the process exit code so scripts/schedulers can detect failure.
		boolean succeeded = job.waitForCompletion(true);
		System.exit(succeeded ? 0 : 1);
	}
}

----------------------------------->>>>自己敲的OutputFormat

package lcPhoneFolw.mrOutputFormat;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * FileOutputFormat that splits reduce output (phone number, total flow) into two text logs.
 *
 * <p>Records whose flow is strictly greater than the threshold go to the "big" log; all others
 * go to the "small" log. Each record is written as {@code phone \t flow \r\n} in UTF-8.
 *
 * <p>Paths and threshold can be overridden through the job Configuration (keys below). The
 * defaults are {@code E:\bigfloat.log}, {@code E:\smallfloat.log} and 44733592.
 * NOTE(review): the paths are the same for every task, so with more than one reduce task
 * every task calls create() on the same two files — presumably a single reducer is intended.
 */
public class MyOutputFormat extends FileOutputFormat<LongWritable,LongWritable>{

	/** Configuration key for the path of the log receiving flows above the threshold. */
	public static final String BIG_PATH_KEY = "myoutputformat.big.path";
	/** Configuration key for the path of the log receiving all other flows. */
	public static final String SMALL_PATH_KEY = "myoutputformat.small.path";
	/** Configuration key for the flow threshold (records with flow &gt; threshold are "big"). */
	public static final String THRESHOLD_KEY = "myoutputformat.threshold";

	static final String DEFAULT_BIG_PATH = "E:\\bigfloat.log";
	static final String DEFAULT_SMALL_PATH = "E:\\smallfloat.log";
	static final long DEFAULT_THRESHOLD = 44733592L;

	/**
	 * Opens the two log files and returns a writer that routes records between them.
	 *
	 * <p>The key/value types match the reducer's (LongWritable, LongWritable) output.
	 *
	 * @param context task context supplying the job Configuration
	 * @return a writer that owns (and will close) both output streams
	 * @throws IOException if either file cannot be created; any stream already opened is closed first
	 */
	@Override
	public RecordWriter<LongWritable, LongWritable> getRecordWriter(TaskAttemptContext context)
			throws IOException, InterruptedException {
		Configuration conf = context.getConfiguration();
		Path bigPath = new Path(conf.get(BIG_PATH_KEY, DEFAULT_BIG_PATH));
		Path smallPath = new Path(conf.get(SMALL_PATH_KEY, DEFAULT_SMALL_PATH));
		long threshold = conf.getLong(THRESHOLD_KEY, DEFAULT_THRESHOLD);

		// Resolve each path's own FileSystem. For scheme-less paths this is the default FS
		// (as FileSystem.get(conf) was); a configured fully-qualified URI is also honoured.
		FileSystem bigFs = bigPath.getFileSystem(conf);
		FSDataOutputStream fos1 = bigFs.create(bigPath);
		FSDataOutputStream fos2;
		try {
			fos2 = smallPath.getFileSystem(conf).create(smallPath);
		} catch (IOException e) {
			// Do not leak the first output stream if the second cannot be opened.
			try {
				fos1.close();
			} catch (IOException closeFailure) {
				e.addSuppressed(closeFailure);
			}
			throw e;
		}
		return new MyRecordWriter(fos1, fos2, threshold);
	}

	/**
	 * RecordWriter that routes each reduce output pair by the size of its value (total flow).
	 *
	 * <p>Values strictly greater than the threshold go to {@code fos1} (big log); all others go
	 * to {@code fos2} (small log).
	 * @author admin
	 */
	static class MyRecordWriter extends RecordWriter<LongWritable,LongWritable>{
		// Receives records whose flow exceeds the threshold.
		private FSDataOutputStream fos1;
		// Receives all remaining records.
		private FSDataOutputStream fos2;
		// Records with flow > threshold are routed to fos1.
		private long threshold = DEFAULT_THRESHOLD;

		/** Creates a writer with no streams; write() will fail until streams are supplied. */
		public MyRecordWriter() {}

		/** Creates a writer using the default threshold (44733592). */
		public MyRecordWriter(FSDataOutputStream fos1, FSDataOutputStream fos2) {
			this(fos1, fos2, DEFAULT_THRESHOLD);
		}

		/**
		 * @param fos1      stream for records whose flow is greater than {@code threshold}
		 * @param fos2      stream for all other records
		 * @param threshold routing threshold on the flow value
		 */
		public MyRecordWriter(FSDataOutputStream fos1, FSDataOutputStream fos2, long threshold) {
			this.fos1 = fos1;
			this.fos2 = fos2;
			this.threshold = threshold;
		}

		/**
		 * Writes {@code phone \t flow \r\n} as UTF-8 to the stream chosen by the flow value.
		 *
		 * @param key   phone number
		 * @param value total flow for that phone number
		 */
		@Override
		public void write(LongWritable key, LongWritable value) throws IOException,
				InterruptedException {
			// Keep the full 64-bit value: an (int) cast wraps large totals and misroutes them.
			long flow = value.get();
			FSDataOutputStream target = flow > threshold ? fos1 : fos2;
			// Write plain UTF-8 bytes. writeUTF would prepend a 2-byte length and use modified
			// UTF-8, which corrupts a text log.
			String line = key.toString() + "\t" + value.toString() + "\r\n";
			target.write(line.getBytes(StandardCharsets.UTF_8));
		}

		/**
		 * Closes both streams.
		 *
		 * <p>The second stream is closed in a finally block, so it is not leaked when closing
		 * the first one throws.
		 */
		@Override
		public void close(TaskAttemptContext context) throws IOException,
				InterruptedException {
			try {
				if (null != fos1) {
					fos1.close();
				}
			} finally {
				if (null != fos2) {
					fos2.close();
				}
			}
		}
	}
}

 

job.setOutputFormatClass(自己写的OutputFormat类.class);

 

以上就是我对OutputFormat的理解,谢谢观看!

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部