2014-11-17--Hadoop的基础学习(四)--编写不同MapReudce程序及其特性(上)
2014-11-17--Hadoop的基础学习(四)--编写不同MapReudce程序及其特性(上)
查封炉台 发表于3年前
2014-11-17--Hadoop的基础学习(四)--编写不同MapReudce程序及其特性(上)
  • 发表于 3年前
  • 阅读 150
  • 收藏 1
  • 点赞 0
  • 评论 0

腾讯云 新注册用户 域名抢购1元起>>>   

摘要: MapReduce本身的程序是有很多的扩展的同时的话还涉及到很多的知识点,包括(数据类型,序列化机制,Partitioner编程,自定义排序编程,Combiner编程,常见的MapReduce算法).在本博客中将分析mapreduce过程源码的分析. 本打算做一篇博文搞定,但是越写发现Mapreduce的知识点多而且碎.所以分为上下篇.1-4为上篇,5-9为下篇.

1.Hadoop的序列化机制

  序列化就是把 内存中的对象的状态信息,转换成 字节序列以便于存储(持久化)和网络传输。而反序列化就是将收到 字节序列或者是硬盘的持久化数据,转换成内存中的对象。

  其实在Java规范中,已经有了一套序列化的机制,某个面向对象的类实现Serializable接口就能实现序列化与反序列化,但是记得一定要加上序列化版本ID serialVersionUID .可是为什么Hadoop要自主研发序列化机制呢?它对比原生态的有什么特点和区别呢?

JDK在序列化的时候,算法会考虑这些事情:

  1. 将对象实例相关的类元数据输出。

  2. 递归地输出类的超类描述直到不再有超类。

  3. 类元数据完了以后,开始从最顶层的超类开始输出对象实例的实际数据值.

  4. 从上至下递归输出实例的数据

    优点:从上面来看java的序列化确实很强大,序列化后得到的信息也很详细,所以反序列化就变得特别简单.

所以我们只要implements Serializable接口,JDK会自动处理一切,Java的序列化机制相当复杂,能处理各种对象关系。

        缺点:Java的序列化机制计算量开销大,且序列化的结果体积太大,有时能达到对象大小的数倍.引用机制也会导致大文件不能分割.

       这些缺点对于Hadoop是非常致命的,因为在Hadoop集群之间需要通讯或者是RPC调用的话,需要序列化,而且要求序列化要快,且体积要小,占用带宽要小。所以Hadoop就自个玩了一套.

Hadoop的序列化的特点是:

 1 . 紧凑:由于带宽是集群中信息传递的最宝贵的资源所以我们必须想法设法缩小传递信息的大小,hadoop的序列化就   为了更好地坐到这一点而设计的。

 2 . 对象可重用:JDK的反序列化会不断地创建对象,这肯定会造成一定的系统开销,但是在hadoop的反序列化中,能重复的利用一个对象的readField方法来重新产生不同的对象。

 3 . 可扩展性:Hadoop的序列化有多中选择

             a.可以利用实现hadoop框架中的Writable接口。(原生的)

             b.使用开源的序列化框架protocol Buffers,Avro等框架。

     PS(网络来源):hadoop2.X之后是实现一个叫YARN,所有应用(如mapreduce,或者其他spark实时或者离线的计算框架都可以运行在YARN上),YARN还负责对资源的调度等等。YARN的序列化就是用Google开发的序列化框架protocol Buffers,目前支持支持三种语言C++,java,Python.所以RPC这一层我们就可以利用其他语言来做文章,满足其他语言开发者的需求。

接下来的话就是如何使用序列化机制,Writable介绍如下.

2.Writable接口及其它的实现类

Hadoop原生的序列化,hadoop原生的序列化类需要实现一个叫Writeable的接口,类似于Serializable接口。

还有hadoop也为我们提供了几个序列化类,他们都直接或者间接地实现了Writable接口。如:IntWritable,LongWritable,Text,org.apache.hadoop.io.WritableComparable<T>等等。

实现Writable接口必须实现两个方法:

public void write(DataOutput out) throws IOException ;
public void readFields(DataInput in) throws IOException ;

实现WritableComparable接口必须实现三个方法,翻阅该接口的的源码,都已经给出demo了.篇幅原因,自己去看吧

案例1:数据如下图,统计电话号码相同的的上传下载流量和总流量.电话号码,上传流量,下载流量,总流量.(1,lastest-2,lastest-3)

1363157985066 	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
1363157995052 	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4			4	0	264	0	200
1363157991076 	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99			2	4	132	1512	200
1363154400022 	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4			4	0	240	0	200
1363157993044 	18211575961	94-71-AC-CD-E6-18:CMCC-EASY	120.196.100.99	iface.qiyi.com	视频网站	15	12	1527	2106	200
1363157995074 	84138413	5C-0E-8B-8C-E8-20:7DaysInn	120.197.40.4	122.72.52.12		20	16	4116	1432	200
1363157993055 	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			18	15	1116	954	200
1363157995033 	15920133257	5C-0E-8B-C7-BA-20:CMCC	120.197.40.4	sug.so.360.cn	信息安全	20	20	3156	2936	200
1363157983019 	13719199419	68-A1-B7-03-07-B1:CMCC-EASY	120.196.100.82			4	0	240	0	200
1363157984041 	13660577991	5C-0E-8B-92-5C-20:CMCC-EASY	120.197.40.4	s19.cnzz.com	站点统计	24	9	6960	690	200
1363157973098 	15013685858	5C-0E-8B-C7-F7-90:CMCC	120.197.40.4	rank.ie.sogou.com	搜索引擎	28	27	3659	3538	200
1363157986029 	15989002119	E8-99-C4-4E-93-E0:CMCC-EASY	120.196.100.99	www.umeng.com	站点统计	3	3	1938	180	200
1363157992093 	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			15	9	918	4938	200
1363157986041 	13480253104	5C-0E-8B-C7-FC-80:CMCC-EASY	120.197.40.4			3	3	180	180	200
1363157984040 	13602846565	5C-0E-8B-8B-B6-00:CMCC	120.197.40.4	2052.flash2-http.qq.com	综合门户	15	12	1938	2910	200
1363157995093 	13922314466	00-FD-07-A2-EC-BA:CMCC	120.196.100.82	img.qfc.cn		12	12	3008	3720	200
1363157982040 	13502468823	5C-0A-5B-6A-0B-D4:CMCC-EASY	120.196.100.99	y0.ifengimg.com	综合门户	57	102	7335	110349	200
1363157986072 	18320173382	84-25-DB-4F-10-1A:CMCC-EASY	120.196.100.99	input.shouji.sogou.com	搜索引擎	21	18	9531	2412	200
1363157990043 	13925057413	00-1F-64-E1-E6-9A:CMCC	120.196.100.55	t3.baidu.com	搜索引擎	69	63	11058	48243	200
1363157988072 	13760778710	00-FD-07-A4-7B-08:CMCC	120.196.100.82			2	2	120	120	200
1363157985066 	13726238888	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
1363157993055 	13560436666	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			18	15	1116	954	200

  1. 定义可序列化的JavaBean.com.codewatching.fluxcount.bean.FlowBean

package com.codewatching.fluxcount.bean;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class FlowBean implements Writable {
	private String phoneNum;
	private long upFlow;
	private long downFlow;
	private long sumFlow;
	public FlowBean(){}
	public FlowBean(String phoneNum, long upFlow, long downFlow) {
		super();
		this.phoneNum = phoneNum;
		this.upFlow = upFlow;
		this.downFlow = downFlow;
		this.sumFlow = upFlow+downFlow;
	}
	public String getPhoneNum() {
		return phoneNum;
	}
	public void setPhoneNum(String phoneNum) {
		this.phoneNum = phoneNum;
	}
	public long getUpFlow() {
		return upFlow;
	}
	public void setUpFlow(long upFlow) {
		this.upFlow = upFlow;
	}
	public long getDownFlow() {
		return downFlow;
	}
	public void setDownFlow(long downFlow) {
		this.downFlow = downFlow;
	}
	public long getSumFlow() {
		return sumFlow;
	}

	public void setSumFlow(long sumFlow) {
		this.sumFlow = sumFlow;
	}
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(phoneNum);
		out.writeLong(downFlow);
		out.writeLong(upFlow);
		out.writeLong(sumFlow);
	}
	@Override
	public void readFields(DataInput in) throws IOException {
		phoneNum = in.readUTF();
		downFlow = in.readLong();
		upFlow = in.readLong();
		sumFlow = in.readLong();
	}
	@Override
	public String toString() {
		return upFlow+"\t"+downFlow+"\t"+sumFlow;
	}
}

       2. 编写Mapper,Reducer,Runner.

package com.codewatching.fluxcount.hadoop;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import com.codewatching.fluxcount.bean.FlowBean;
public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
	@Override
	protected void map(LongWritable key, Text value,Context context)
			throws IOException, InterruptedException {
		String line = value.toString();
		String[] fileds = line.split("\t");
		int length = fileds.length;
		String phoneNum = fileds[1];
		long upFlow = Long.parseLong(fileds[length-3]);
		long downFlow = Long.parseLong(fileds[length-2]);
		FlowBean flowBean = new FlowBean(phoneNum, upFlow, downFlow);
		//以flowBean为value供reducer处理
		context.write(new Text(phoneNum), flowBean);
	}
}
package com.codewatching.fluxcount.hadoop;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import com.codewatching.fluxcount.bean.FlowBean;
public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
	@Override
	protected void reduce(Text key, Iterable<FlowBean> values,Context context)
			throws IOException, InterruptedException {
		long _downFlow = 0;
		long _upFlow = 0;
		for (FlowBean flowBean : values) {
			_downFlow += flowBean.getDownFlow();
			_upFlow += flowBean.getUpFlow();
		}
		FlowBean bean = new FlowBean(key.toString(), _upFlow, _downFlow);
		context.write(key, bean);
	}
}
package com.codewatching.fluxcount.hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.codewatching.fluxcount.bean.FlowBean;
public class FlowSumRunner extends Configured implements Tool{
	@Override
	public int run(String[] args) throws Exception {
		Configuration configuration = new Configuration();
		Job job = Job.getInstance(configuration);
		configuration.set("mapreduce.job.jar", "fluxcount.jar");
		job.setJarByClass(FlowSumRunner.class);
		job.setMapperClass(FlowSumMapper.class);
		job.setReducerClass(FlowSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		FileSystem fileSystem = FileSystem.get(configuration);
		Path path = new Path(args[1]);
		if(fileSystem.exists(path)){
			fileSystem.delete(path, true);
		}
		FileOutputFormat.setOutputPath(job, path);
		
		return job.waitForCompletion(true)?0:1;
	}
	public static void main(String[] args) throws Exception {
		ToolRunner.run(new Configuration(), new FlowSumRunner(), args);
	}
}

3.Partitioner类的编程

hadoop的map/reduce中支持对key进行分区,从而让map出来的数据均匀分布在reduce上.Map的结果,会通过partition分发到Reducer上,Reducer做完Reduce操作后,通过OutputFormat,进行输出结果.Mapper的结果,可能送到Combiner(下面回讲到)做合并, Mapper最终处理的键值对<key, value>,是需要送到Reducer去合并的,合并的时候,有相同key的键/值对会送到同一个Reducer。哪个key到哪个Reducer的分配过程,是由Partitioner规定的.说的真麻烦。如果我们去查阅Partitioner类的源码,就知道它是个抽象类,里面有个抽象方法:

/** 
   * Get the partition number for a given key (hence record) given the total 
   * number of partitions i.e. number of reduce-tasks for the job.
   *   
   * <p>Typically a hash function on a all or a subset of the key.</p>
   *
   * @param key the key to be partioned.
   * @param value the entry value.
   * @param numPartitions the total number of partitions.
   * @return the partition number for the <code>key</code>.
   */
public abstract int getPartition(KEY key, VALUE value, int numPartitions);

而在类的注释也是非常的全面,不得抱怨一句。洋文如果好一点的话,学起来会轻松多了.唉,老大难.

Partitioner
controls the partitioning of the keys of the  intermediate map-outputs.....省略..

案例2:在案例1的基础上,然后将号码进行分区,假设135是北京,139是江西...将各地区的统计出来,并且各地区单独存放文件.效果图如下:

  1. 在案例的基础上,编写一个Partitionner实现类.

package com.codewatching.fluxcount.hadoop;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
import com.codewatching.fluxcount.bean.FlowBean;
public class AreaPartitioner extends Partitioner<Text, FlowBean>{
	private static Map<String,Integer> areaMap;
	static{
		areaMap = new HashMap<String, Integer>();
		areaMap.put("135", 0);    //模拟分区,存在内存中。
		areaMap.put("137", 1);
		areaMap.put("138", 2);
		areaMap.put("139", 3);
	}
	@Override
	public int getPartition(Text key, FlowBean value, int numPartitions) {
		int area = 4;  //默认都是为4
		String prefix = key.toString().substring(0,3);
		//判断是否在某个分区中
		Integer index = areaMap.get(prefix);
		if(index!=null){
			area = index;  //如果存在,取相应的数字0,1,2,3
		}
		return area;
	}
}

   2.在Runner中添加两行代码.

   3.在Hadoop中的运行结果.

其实上Hadoop已经提供了一个默认的实现类叫着HashPartitioner.看看它如何key分区的.

将key均匀分布在ReduceTasks上,举例如果Key为Text的话,Text的hashcode方法跟String的基本一致,都是采用的Horner公式计算,得到一个int,string太大的话这个int值可能会溢出变成负数,所以与上Integer.MAX_VALUE(即0111111111111111),然后再对reduce个数取余,这样就可以让key均匀分布在reduce上。 

PS:这个简单算法得到的结果可能不均匀,因为key毕竟不会那么线性连续.

4.MapReduce的输出处理类和输入处理类

输入处理类:InputFormat的作用负责MR的输入部分

              1、验证作业的输入是否规范。

              2、把输入文件切分成InputSplit。

              3、提供RecordReader的实现类,把InputSplit读到Mapper中进行处理.

     

      最佳分片的大小应该与块大小相同:因为它是确保可以存储在单个节点上的最大输入块的大小。如果分片跨越2个数据块,那么对于任何一个HDFS节点,基本上都不可能同时存储着2个数据块,因此分片中的部分数据需要通过网络传输到Map任务节点,与使用本地数据运行整个Map任务相比,这种方法显然效率更低。

PS:还可以编写自定义的输入处理类,继承InputFormat,重写相应的方法,当然,首先要知道方法的作用.--建议读源代码.

输出处理类:OutputFormat,在Ruduce处理之后.

编程时,输出输入处理类在哪使用指定:

5.Combiner编程

Combiner实质上就是不同上下文的Reducer函数启的功能是差不多的.

6.Shuffle过程分析

7.自定义的排序编程---倒排索引

8.常见的MapReduce算法

9.Split原理及源码分析

标签: MR Partitioner
共有 人打赏支持
粉丝 50
博文 56
码字总数 138491
×
查封炉台
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: