文档章节

学习MapReduce(五),自定义InputFormat类

静下来想想静静
 静下来想想静静
发布于 2017/03/13 10:56
字数 1760
阅读 47
收藏 0

    2017.3.13继续学习MapReduce。昨天天学习了Partition的自定义,Combiner的自定义,InputFormat用了很长时间才算是马马虎虎有点感觉。默认使用的InputFormat类是底层提供的FileInputFormat,FileInputFormat使用的是默认的RecordReader,我们需要重写createRecordReader,返回自己写的RecordReader。

    RecordReader可以规定按照自己规定的K,V值传给Mapper类的输入K,V。比如:我们想在本地Map中对海量的小文件合并成一个大文件,我们可以规定文件名为K,文件里的全内容为V传给Mapper。代码如下:

----------------->>>自己写的RecordReader

package lcPhoneFolw.inputformat;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
/**
 * 将会传入一个k,v值
 * @author admin
 *
 */
public class MyRecordReaderDemo extends RecordReader {
	//第一次看这玩意 一脸蒙蔽啊,RecordReader是用来干什么,为什么要重写,根本没概念,不写了,去百度去了
	//百度进修回来了,看了一圈,还在蒙蔽中,只是有点感悟了,如:我们默认使用的LineRecordReader,和TextInputFormat
	//LineRecordReader是对一个文件一行一行的返回V值,以行的偏移量为(就是行号)K值返回给Map。我们写自己的RecordReader
	//无非就是改变给Map的K,V值。这个案例是将一整个文件当作V值返回给Map
	private boolean flag = false; //这个有什么用我是真的不知道---》可能以后会搞明白以后补充
	private FileSplit filesplit; //这个是一个文件类的实例 必须要
	private Configuration conf;//这个必须要
	private Text key;
	private Text value = new Text();//这个是返回给Map的V值
	
	/**
	 * 初始化(属于重写,参数是一个输入流,和一个Context。Context是hadoop中的上下文类,可以通过这个类获得流程的所有信息,所以我们可以通过他
	 * 来得到一个Configuration的一个实例。)
	 */
	@Override
	public void initialize(InputSplit split, TaskAttemptContext context)
			throws IOException, InterruptedException {
		this.filesplit = (FileSplit)split;//将输入流转化为文件类
		this.conf = context.getConfiguration();//从上下文类获得Configuration实例
		
		
	}

	@Override
	public boolean nextKeyValue() throws IOException, InterruptedException {
		if(!flag){
			byte[] b = new byte[(int)filesplit.getLength()];//新建一个文件字节长度的字节数组
			Path path = filesplit.getPath();//手里拿着文件类,是不是可以获得文件路径(目前还不知道获取路径是问了干什么)
			FileSystem fs = path.getFileSystem(conf);//原来是为了得到一个FileSystem的实例啊
			FSDataInputStream fis = fs.open(path);//通过FileSystem打开这个文件
			//这里我好像出问题了,我得到的输入流是一个byte类型的。。。。我返回的V值却是一个Text类型的
			IOUtils.readFully(fis,b,0,b.length);//这里我将文件中的内容写入b中
			value = new Text(new String(b));//将数组转换为字符串,在将字符串转换成一个Text
			key = new Text(path.toString());//将文件名当作key
			fis.close();//关闭流
			flag = true;//不知道为啥  --》以后补充
			return true;//不知道为啥 --》以后补充
		}
		return false;
	}

	@Override
	public Text getCurrentKey() throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		return key;
	}

	@Override
	public Text getCurrentValue() throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		return value;
	}

	@Override
	public float getProgress() throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		return flag?1.0f:0.0f;
	}

	@Override
	public void close() throws IOException {
		
	}

}

以上代码可以实现以文件名为K,文件全内容为V传给Map。然后,开始----写自己的InputFormat类:

package lcPhoneFolw.inputformat;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
/*
 * 自己定义的InputFormat类,这个类的主要作用是用来返回自己定义RecordReader
 */
public class MyInuputFormat extends FileInputFormat{
	
	
	//约束文件是否是可切片的,这里返回false,表示不可切片
	@Override
	protected boolean isSplitable(JobContext context, Path filename) {
		// TODO Auto-generated method stub
		return false;
	}

	@Override
	public RecordReader createRecordReader(InputSplit split,
			TaskAttemptContext context) throws IOException,
			InterruptedException {
		MyRecordReaderDemo mrd = new MyRecordReaderDemo();
		mrd.initialize(split, context);
		return mrd;
	}

}

    其实写自己的InputFormat类就是重写了一个createRecordReader方法,这个方法的返回值为RecordReader,返回自己写的RecordReader就可以了。因为我们想要合并文件,所以重写isSplitable返回false约束文件不能被切片。

    然后写自己的实现类,这个实现类继承了Configured并实现了Tool接口,这个类主要是重写run方法。当然,写run方法之前需要些一个Mapper类来实现map(毕竟我们需要map来运行计算不是?)

-------------------------->>>>自己写的Mapper类:

package lcPhoneFolw.inputformat;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SequenceFileMapper extends Mapper<Text,Text,Text,Text> {

	
	@Override
	protected void map(Text key, Text value,Context context) throws IOException,
			InterruptedException {
		context.write(key, value);

	}
	
}

这个Mapper类非常简单,接受来自RecordReader传过来的K,V值,输出相应的K,V值

------------------------>>>自己的实现类

package lcPhoneFolw.inputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * 这个类写的是本地通过mapreduce来处理文件的合并的,需要继承Configured和实现Tool
 * Configured是什么,Tool又是什么?,字面意思Configured和Configuration一个意思?
 * Tool--->>工具?百度进修去。。。。。。。。
 * 大概明白了,这个类主要是为了运行本地计算,诸如:小文件合并成一个大文件,等等。重写的run方法可以
 * 写一个Job,这个Job可以只有Map,也可以没有job,只得到想要的东西。也可以mapreduce都写。。。我是这样理解的。
 * 下面这个是将无数小文件合并成一个大文件。
 * @author admin
 *
 */
public class SmallFileToBigFile extends Configured implements Tool {
	
	public int run(String[] args) throws Exception {
		Configuration conf = new Configuration();//新建一个Configuration实例
//		System.setProperty("HADOOP_USER_NAME", "hdfs");//如果本地环境变量已配置,可以不写
//		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
//		if (otherArgs.length != 2) {
//			System.err.println("Usage: combinefiles");
//			System.exit(2);
//		}   这么一大堆东西只是判断输入的参数是否是两个,写上去更好。(主要是制定输入路径和输出路径)
		//新建一个Job实例
		Job job = Job.getInstance(conf,"combine small files to sequencefile");
		//设定文件输入格式为自己规定的输入格式(如果没有设定,也可以合并成一个文件,但是这个文件还是按照一行一行的读取的。
		//但是如果想要成功运行,自己写的Mapper类必须使用LongWritable为输入K)
		job.setInputFormatClass(MyInuputFormat.class);
		//设定输出格式------>这个也可以自己写,还没研究,这里使用底层提供的SequenceFileOutputFormat类
		job.setOutputFormatClass(SequenceFileOutputFormat.class);
		//设定自己定义的Mapper类
		job.setMapperClass(SequenceFileMapper.class);
		//设定输出K,V值,这里没有Reduce所以可以直接使用setOutputKeyClass和setOutputValueClass
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		//设定输入输出文件路径
		MyInuputFormat.setInputPaths(job,new Path(args[0]));
		FileOutputFormat.setOutputPath(job,new Path(args[1]));
		//返回提交状态
		return job.waitForCompletion(true) ? 0 : 1;

	}
	public static void main(String[] args) throws Exception {
		//运行run方法
		int exitCode = ToolRunner.run(new SmallFileToBigFile(),args);
		System.exit(exitCode);
	}
		

}

    代码中我已经写的很清楚了,重新run方法,其实就是提交了一个Job,这个Job规定了Mapper类,输入格式和输出格式,其中输入格式就是自己规定的InputFormat,也就是说在运行这个代码的时候,文件先进入自己写InputFoamat中实现自己规定的K,V输出。然后交个Mapper,Mapper开始计算,输出格式按照底层提供的类输出。

    大概就是这样。。。。。谢谢观看

 

 

© 著作权归作者所有

静下来想想静静
粉丝 2
博文 14
码字总数 12348
作品 0
焦作
私信 提问
大数据教程(10.6)自定义inputFormat(小文件合并)

上一篇文章分析了运营商流量日志解析增强的实现,至此,mapreduce的组件中除了inputFormat全都自定义写过了!博主今天将继续分享自定义inputFormat。 一、需求 无论hdfs还是mapreduce,对于小...

em_aaron
2018/12/29
26
0
MapReduce多种输入格式

MapReduce多种输入格式 文件是 MapReduce 任务数据的初始存储地。正常情况下,输入文件一般是存储在 HDFS 里面。这些文件的格式可以是任意的:我们可以使用基于行的日志文件,也可以使用二进...

wypersist
2018/05/06
0
0
自定义 hadoop MapReduce InputFormat 切分输入文件

在上一篇中,我们实现了按 cookieId 和 time 进行二次排序,现在又有新问题:假如我需要按 cookieId 和 cookieId&time 的组合进行分析呢?此时最好的办法是自定义 InputFormat,让 mapreduc...

大数据之路
2013/05/26
12.3K
16
Hadoop实战读书笔记(7)

输入数据概要 输入数据通常驻留在较大的文件中,通常几十或者数百GB,甚至更大。MapReduce处理的基本原则之一是将输入数据分割成块。这些块可以在多台计算机上并行处理,在Hadoop的术语中这些...

祥林会跟你远走高飞
2014/12/08
123
0
【hadoop】16.MapReduce-简介

简介 本章节我们先来了解一些关于MapReduce的理论知识。从本章节您可以学习到:MapReduce的相关知识。 1、概念 Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析...

Areya
01/12
19
0

没有更多内容

加载失败,请刷新页面

加载更多

python数据结构

1、字符串及其方法(案例来自Python-100-Days) def main(): str1 = 'hello, world!' # 通过len函数计算字符串的长度 print(len(str1)) # 13 # 获得字符串首字母大写的...

huijue
5分钟前
0
0
OSChina 周日乱弹 —— 我,小小编辑,食人族酋长

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @宇辰OSC :分享娃娃的单曲《飘洋过海来看你》: #今日歌曲推荐# 《飘洋过海来看你》- 娃娃 手机党少年们想听歌,请使劲儿戳(这里) @宇辰OSC...

小小编辑
今天
735
10
MongoDB系列-- SpringBoot 中对 MongoDB 的 基本操作

SpringBoot 中对 MongoDB 的 基本操作 Database 库的创建 首先 在MongoDB 操作客户端 Robo 3T 中 创建数据库: 增加用户User: 创建 Collections 集合(类似mysql 中的 表): 后面我们大部分都...

TcWong
今天
40
0
spring cloud

一、从面试题入手 1.1、什么事微服务 1.2、微服务之间如何独立通讯的 1.3、springCloud和Dubbo有哪些区别 1.通信机制:DUbbo基于RPC远程过程调用;微服务cloud基于http restFUL API 1.4、spr...

榴莲黑芝麻糊
今天
26
0
Executor线程池原理与源码解读

线程池为线程生命周期的开销和资源不足问题提供了解决方 案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。 线程实现方式 Thread、Runnable、Callable //实现Runnable接口的...

小强的进阶之路
昨天
79
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部