hadoop实例:使用MultipleInputs实现join功能

原创
2015/07/13 11:54
阅读数 197

上代码

package com.bigdata;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class LikeJoinMain {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException{
		Configuration conf = new Configuration();
		if(args.length != 3)
		{
			System.err.println("please add args likes input1 input2 output");
		}
		String[] params = new GenericOptionsParser(conf, args).getRemainingArgs();
		Job job = Job.getInstance(conf, "like_join");
		job.setJarByClass(LikeJoinMain.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(ValueBean.class);
		job.setReducerClass(AllReduce.class);
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);
		MultipleInputs.addInputPath(job, new Path(params[0]), TextInputFormat.class, InputMap1.class);
		MultipleInputs.addInputPath(job, new Path(params[1]), TextInputFormat.class, InputMap2.class);
		FileOutputFormat.setOutputPath(job, new Path(params[2]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

package com.bigdata;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class ValueBean implements WritableComparable<ValueBean>{

	private String flag;
	
	private String value;
	
	public ValueBean(){
		super();
	}
	
	public ValueBean(String flag, String value){
		super();
		this.flag = flag;
		this.value = value;
	}
	
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(flag);
		out.writeUTF(value);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.flag = in.readUTF();
		this.value = in.readUTF();
	}

	@Override
	public int compareTo(ValueBean o) {
		return this.value.compareTo(o.getValue());
	}

	public String getFlag(){
		return this.flag;
	}
	
	public void setFlag(String flag){
		this.flag = flag;
	}
	
	public String getValue(){
		return this.value;
	}
	
	public void setValue(String value){
		this.value = value;
	}
	
}

package com.bigdata;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class InputMap1 extends Mapper<LongWritable, Text, Text, ValueBean>{
	
	private String delimiter;
	
	@Override
	protected void setup(
			Mapper<LongWritable, Text, Text, ValueBean>.Context context)
			throws IOException, InterruptedException {
		delimiter = context.getConfiguration().get("delimiter", ",");
	}
	
	@Override
	protected void map(LongWritable key, Text value,
			Mapper<LongWritable, Text, Text, ValueBean>.Context context)
			throws IOException, InterruptedException {
		if(value.getLength() > 0)
		{
			String[] strArr = value.toString().trim().split(delimiter);
			if(strArr.length == 2)
			{
				context.write(new Text(strArr[1]), new ValueBean("1", strArr[0]));
			}
		}
	}
}

package com.bigdata;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class InputMap2 extends Mapper<LongWritable, Text, Text, ValueBean>{
	
	private String delimiter;
	
	@Override
	protected void setup(
			Mapper<LongWritable, Text, Text, ValueBean>.Context context)
			throws IOException, InterruptedException {
		delimiter = context.getConfiguration().get("delimiter", ",");
	}
	
	@Override
	protected void map(LongWritable key, Text value,
			Mapper<LongWritable, Text, Text, ValueBean>.Context context)
			throws IOException, InterruptedException {
		if(value.getLength() > 0)
		{
			String[] strArr = value.toString().trim().split(delimiter);
			if(strArr.length == 2)
			{
				context.write(new Text(strArr[0]), new ValueBean("2", strArr[1]));
			}
		}
	}
}

package com.bigdata;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class AllReduce extends org.apache.hadoop.mapreduce.Reducer<Text, ValueBean, NullWritable, Text>{
	@Override
	protected void reduce(Text arg0, Iterable<ValueBean> arg1,
			Reducer<Text, ValueBean, NullWritable, Text>.Context arg2)
			throws IOException, InterruptedException {
		String userValue = null;
		String phoneValue = null;
		for (ValueBean value : arg1) {
			if (value.getFlag() != null && value.getFlag().equals("1")) {
				userValue = value.getValue();
			} else {
				phoneValue = value.getValue();
			}
		}
		arg2.write(NullWritable.get(), new Text(userValue + "--" + arg0.toString() + "--" + phoneValue));
	}
}


参考文章:MapReduce对输入多文件的处理

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部