学习MapReduce(七),对自定义序列化类型的聚合和排序--GroupingComparator

原创
2017/03/14 11:35
阅读数 85

    2017.3.14 终于快见到曙光了,MapReduce的一些基本概念和常见工具类就快要学完了,今天来到了GroupingComparator。这个类对应的应用场景是你有很多数据,但是你想知道这些数据中相同id,url,或者用户的某个行为或者数据的最大值,最小值或者一些其他的。这个时候就需要用到GroupingComparator这个概念。

    首先要根据需求来写一个自己的序列化类,这个类中按照需求来写序列化,只是在比较的时候,需要注意一点,比如,你想知道这个ID中某个数据的最大值,你应该先比较这个id是否一样,如果一样在比较最大值。写完序列化,需要写一个GroupingComparator类,这个类继承了WriterableComparetor类。重写一个compare方法,注意这个方法的参数是两个WritableComparable类,而我们写的序列化类就是实现了这个WritableComparable接口。故我们可以将这两个参数给强转成自己写的序列化类。

    第一次写完,运行的时候,程序报空指针异常。看了看错误说是我的GroupingComparator类没有下一个K,V值,百度一下才知道GroupingComparator类的无参空构造必须要在super里制定自己的序列化类才行。修改过之后正常运行并且运算正确。下面贴代码

------------------------------->>>>自己的序列化类

package lcPhoneFolw.mrGroupCompared;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.WritableComparable;

/**
 * 还是手机流量的案例,现在需要求一个手机号访问哪个网页的时间最长(也就是访问哪个网站的次数最多),首先应该先根据用户和网址信息过滤一次
 * 
 * 然后再在这个过滤后的文件中找每个用户访问哪个网站的次数最多。前面那一步就不做了,直接做最后一个
 * 其实重在理解,理解了它的工作机制,业务上的需求,就根据自己的理解来做,做到瓶颈了就百度或者询问经验更加丰富的
 * 这样学习才能很快。。。。。。
 * @author admin
 *
 */

public class MyPhoneBean implements WritableComparable<MyPhoneBean> {
	//这个是自己写的序列化类我只取手机号和总流量
	private String phoneNum;
	private LongWritable sumload;
	
	
	
	public MyPhoneBean() {}

	public MyPhoneBean(String phoneNum, LongWritable sumload) {
		this.phoneNum = phoneNum;
		this.sumload = sumload;
	}

	public String getPhoneNum() {
		return phoneNum;
	}

	public void PhoneNum(String phoneNum) {
		this.phoneNum = phoneNum;
	}

	public LongWritable getSumload() {
		return sumload;
	}

	public void setSumload(LongWritable sumload) {
		this.sumload = sumload;
	}

	public void write(DataOutput out) throws IOException {
		out.writeUTF(phoneNum);
		out.writeLong(sumload.get());
	}

	public void readFields(DataInput in) throws IOException {
		phoneNum = in.readUTF();
		sumload = new LongWritable(in.readLong());
	}

	public int compareTo(MyPhoneBean o) {
		//先通过手机号比较如果一样再通过总流量比较
		int m = this.phoneNum.compareTo(o.phoneNum);
		if(m == 0){
			m = this.sumload.compareTo(o.getSumload());
		}
		//按照倒序排序
		return m;
	}

	@Override
	public String toString() {
		
		return this.phoneNum+"\t"+this.sumload+"\r\n";
	}
	
	
	
	
}

---------------------------->>>>>GroupingComparator类

package lcPhoneFolw.mrGroupCompared;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MyBeanKey extends WritableComparator {
	public MyBeanKey(){
		super(MyPhoneBean.class,true);
	}
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		MyPhoneBean mb1 = (MyPhoneBean)a;
		MyPhoneBean mb2 = (MyPhoneBean)b;
		
		return mb1.getPhoneNum().compareTo(mb2.getPhoneNum());
	}
	
	
	
	
}

------------------------------->>>>>测试类

package lcPhoneFolw.mrGroupCompared;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyPhoneGroupAll {
	static class MyphoneMapper extends Mapper<LongWritable, Text, MyPhoneBean, Text>{
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			String[] strs = value.toString().split("\t");
			String phoneNum = strs[0];
			long sumload = Long.valueOf(strs[1])+Long.valueOf(strs[2]);
			MyPhoneBean mb = new MyPhoneBean(phoneNum,new LongWritable(sumload));
			context.write(mb, new Text(phoneNum));
		}
	}
	static class MyphoneReducer extends Reducer<MyPhoneBean, Text, MyPhoneBean, NullWritable>{
		//这里接受的数据已经是通过排序比较后的了所以直接将K给输出就行了
		@Override
		protected void reduce(MyPhoneBean key,Iterable<Text> values,Context context)
				throws IOException, InterruptedException {
			System.out.println(key);
			context.write(key, NullWritable.get());
		}
	}
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		
		job.setJarByClass(MyPhoneGroupAll.class);
		job.setMapperClass(MyphoneMapper.class);
		job.setReducerClass(MyphoneReducer.class);
		//设定reduce端接收的聚合K的类
		job.setGroupingComparatorClass(MyBeanKey.class);
		
		job.setMapOutputKeyClass(MyPhoneBean.class);
		job.setMapOutputValueClass(Text.class);
		
		job.setOutputKeyClass(MyPhoneBean.class);
		job.setOutputValueClass(NullWritable.class);
		
		FileInputFormat.setInputPaths(job,new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		job.waitForCompletion(true);
	}
}

---------------------------------》》》》》

以上就是代码全内容,谢谢观看

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部