Learning MapReduce (4): Using Partition

2017/03/12 17:16

    2017.3.12. MapReduce consists of two main phases, Map and Reduce, but between them sits the Shuffle phase, which itself breaks down into several steps, such as Combiner, Partition, and so on. The overall flow can be sketched like this:

        Map --> Partition --> Combiner (if one is set) --> Reduce ----> output to HDFS <<<these are the steps I know of so far; there may be more, finer-grained ones to add later>>>

        If we do not write our own partitioner, Hadoop falls back on a class of its own to do the partitioning --> HashPartitioner. But many cases and business requirements call for custom partitioning, and then we have to write our own partitioner class by extending Partitioner.
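        For reference, the built-in HashPartitioner does nothing fancier than hashing the key modulo the number of reduce tasks. A simplified sketch of that class (written from memory of the Hadoop source, not a verbatim copy):

import org.apache.hadoop.mapreduce.Partitioner;

public class HashPartitioner<K, V> extends Partitioner<K, V> {
	@Override
	public int getPartition(K key, V value, int numReduceTasks) {
		// Mask off the sign bit so the result is never negative,
		// then spread keys evenly across the reduce tasks.
		return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
	}
}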

    Locally, the Map step runs first, and Map's output becomes the Partitioner's input. From there, write your own logic according to the business requirements.

The code is as follows:

----------------------->>>> The custom serializable (Writable) class

 

package lcPhoneFolw;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class PhoneFlow implements WritableComparable<PhoneFlow>{
	private long upload;
	private long download;
	private long sumload;
	
	public PhoneFlow(){}
	
	@Override
	public String toString() {
		return upload+"\t"+download+"\t"+sumload;
	}

	public PhoneFlow(long upload, long download) {
		this.upload = upload;
		this.download = download;
		this.sumload = upload+download;
	}

	public long getUpload() {
		return upload;
	}

	public void setUpload(long upload) {
		this.upload = upload;
	}

	public long getDownload() {
		return download;
	}

	public void setDownload(long download) {
		this.download = download;
	}

	public long getSumload() {
		return sumload;
	}

	public void setSumload(long sumload) {
		this.sumload = sumload;
	}
	//Serialization: write the fields out in a fixed order
	public void write(DataOutput out) throws IOException {
		out.writeLong(upload);
		out.writeLong(download);
		out.writeLong(sumload);
	}
	//Deserialization: the fields must be read back in exactly the order they
	//were written; reading them in a different order would scramble the values
	public void readFields(DataInput in) throws IOException {
		upload = in.readLong();
		download = in.readLong();
		sumload = in.readLong();
	}


	//Custom ordering: when PhoneFlow is the map output key, the framework
	//sorts records by this method before handing them to reduce.
	//Here: descending by total flow (Long.compare keeps the compareTo
	//contract by returning 0 for equal totals).
	public int compareTo(PhoneFlow o) {
		return Long.compare(o.getSumload(), this.sumload);
	}



}
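As a quick sanity check of the write/readFields contract (this helper class is my own addition, not part of the original job), a PhoneFlow can be round-tripped through a byte stream in plain Java:

package lcPhoneFolw;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class PhoneFlowRoundTrip {
	public static void main(String[] args) throws Exception {
		PhoneFlow original = new PhoneFlow(100L, 200L);

		// Serialize the way the framework would: write() onto a byte stream.
		ByteArrayOutputStream bytes = new ByteArrayOutputStream();
		original.write(new DataOutputStream(bytes));

		// Deserialize into a fresh object with readFields().
		PhoneFlow copy = new PhoneFlow();
		copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

		System.out.println(copy); // prints: 100	200	300
	}
}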

--------->>>>>> The custom Partitioner class

 

package lcPhoneFolw;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Business requirement: partition by phone number. The k,v arriving here are
 * the map phase's output k,v.
 * @author admin
 *
 */
public class PhonePartitioner extends Partitioner<Text, PhoneFlow>{
	//For this test only one partition rule is defined: numbers starting with 1380 go to their own partition (partition 0)
	public static Map<String,Integer> map = new HashMap<String,Integer>();
	static{
		map.put("1380",0);
	}
	@Override
	public int getPartition(Text key, PhoneFlow values, int numPartitions) {
		// The first four digits of the phone number pick the partition;
		// any prefix not in the map falls through to partition 1.
		String number = key.toString().substring(0,4);
		Integer partition = map.get(number);
		return partition==null ? 1 : partition;
	}
	}

}
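Because getPartition is an ordinary method, the partitioning rule can be spot-checked without submitting a job. A minimal sketch (the class name and sample numbers are made up for illustration):

package lcPhoneFolw;

import org.apache.hadoop.io.Text;

public class PhonePartitionerCheck {
	public static void main(String[] args) {
		PhonePartitioner p = new PhonePartitioner();
		// A 1380 number lands in partition 0; everything else in partition 1.
		System.out.println(p.getPartition(new Text("13801234567"), null, 2)); // 0
		System.out.println(p.getPartition(new Text("13901234567"), null, 2)); // 1
	}
}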

------------------------->>>>> The driver/test class (I wrote the Mapper and Reducer as inner classes)

package lcPhoneFolw;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver class for testing the Partitioner.
 * @author admin
 *
 */
public class PhoneAllPartitionDemo {
	static class PhoneMapper3 extends Mapper<LongWritable,Text,Text,PhoneFlow>{
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String[] strs = line.split("\t");
			Text phoneNumber = new Text(strs[0]);
			long upload =  Long.valueOf(strs[1]);
			long download =  Long.valueOf(strs[2]);
			PhoneFlow pf = new PhoneFlow(upload,download);
			//Emit the phone number and its PhoneFlow; PhonePartitioner decides the partition
			context.write(phoneNumber, pf);
		}
	}
	
	static class PhoneReducer3 extends Reducer<Text, PhoneFlow, Text, PhoneFlow>{
		
		@Override
		protected void reduce(Text key, Iterable<PhoneFlow> values,Context context)
				throws IOException, InterruptedException {
			// This demo only cares about partitioning, so we simply emit the
			// first flow record per phone number instead of aggregating.
			context.write(key,values.iterator().next());
		}
	}
	/**
	 * The main method changes slightly: we have to set the partitioner class
	 * and decide how many reduce tasks to start.
	 * @param args
	 */
	public static void main(String[] args)throws Exception {
		Configuration conf = new Configuration();
		
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(PhoneAllPartitionDemo.class);
		
		//Set the partitioner class and the number of reduce tasks
		job.setPartitionerClass(PhonePartitioner.class);
		//Two partitions are defined, so start two reduce tasks
		job.setNumReduceTasks(2);
		
		//Below is the usual Mapper and Reducer setup
		job.setMapperClass(PhoneMapper3.class);
		job.setReducerClass(PhoneReducer3.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(PhoneFlow.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(PhoneFlow.class);
		
		FileInputFormat.setInputPaths(job,new Path(args[0]));
		FileOutputFormat.setOutputPath(job,new Path(args[1]));
		
		job.waitForCompletion(true);

	}
}
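To reproduce the result, package the three classes into a jar and submit it in the usual way. The jar name and the input/output paths below are placeholders of my own, not from the original setup:

hadoop jar phoneflow.jar lcPhoneFolw.PhoneAllPartitionDemo /flow/input /flow/output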

Running the test class produces the following output files:

    The number of output files has gone from the default one to two, one per reduce task: one file for the 1380 prefix and one for all other numbers. Opening the two files confirms exactly that.

    That is how Partition is used. Thanks for reading.

 
