Hadoop 之三、好友推荐

原创
2016/09/16 23:26
阅读数 79
package www.bjsxt.fof;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.Text;

/**
 * "Friend of friend" key: a {@link Text} holding two user names separated by
 * a tab, always stored with the lexicographically smaller name first so that
 * the pairs (a, b) and (b, a) produce the same key.
 */
public class Fof extends Text{

	/** No-argument constructor; creates an empty key. */
	public Fof(){
		
	}

	/**
	 * Creates the key for the unordered pair {@code (a, b)}.
	 *
	 * @param a first user name, must not be blank
	 * @param b second user name, must not be blank
	 * @throws IllegalArgumentException if either name is null, empty or whitespace only
	 */
	public Fof(String a,String b) {
		super(requireJoined(a, b));
	}

	/**
	 * Joins two user names with a tab, smaller name first.
	 *
	 * @param a first user name
	 * @param b second user name
	 * @return the joined pair, or {@code null} if either name is blank
	 */
	public static String join(String a, String b) {
		if (StringUtils.isNotBlank(a)&&StringUtils.isNotBlank(b)) {
			if (a.compareTo(b)<0) {
				return a+"\t"+b;
			}else {
				return b+"\t"+a;
			}
		}
		return null;
	}

	/**
	 * Like {@link #join(String, String)} but rejects blank input instead of
	 * returning {@code null}; passing {@code null} on to the {@link Text}
	 * constructor would otherwise fail there with an unexplained
	 * NullPointerException.
	 */
	private static String requireJoined(String a, String b) {
		String joined = join(a, b);
		if (joined == null) {
			throw new IllegalArgumentException(
					"Both user names must be non-blank: a=[" + a + "], b=[" + b + "]");
		}
		return joined;
	}
}

 

package www.bjsxt.fof;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



 

public class RunJob {
	
	/**
	 * Drives the two chained MapReduce jobs of the friend-recommendation example.
	 * <p>
	 * Job "friend" reads {@code /usr/friend/} and writes its result to
	 * {@code /usr/friendoutput/}; job "friend2" reads that directory and writes
	 * to {@code /usr/friendoutput2}. An already existing output directory is
	 * deleted before the corresponding job is submitted. The second job is only
	 * started when the first one succeeds.
	 * <p>
	 * If either job fails or an exception is thrown, the failure is reported and
	 * the JVM exits with status 1 so that a calling shell or scheduler can
	 * detect it; on success the method returns normally.
	 *
	 * @param args ignored
	 */
	public static void main(String[] args) {
		Configuration conf = new Configuration();
		boolean succeeded = false;
		try {
			conf.set("fs.defaultFS", "hdfs://node11:8020");
			// To use a key/value separator other than the default tab, set
			// "mapreduce.input.keyvaluelinerecordreader.key.value.separator" on conf.

			FileSystem fs = FileSystem.get(conf);

			// ---- Job 1 ----
			Job job = Job.getInstance(conf, "friend");
			job.setJarByClass(RunJob.class);

			job.setMapperClass(FriendMapper1.class);
			job.setReducerClass(FriendReducer1.class);
			job.setMapOutputKeyClass(Fof.class);
			job.setMapOutputValueClass(IntWritable.class);
			// Declare the reducer's output types as well instead of relying on the defaults.
			job.setOutputKeyClass(Fof.class);
			job.setOutputValueClass(IntWritable.class);

			// Each input line is split at the first separator (tab by default)
			// into a Text key and a Text value.
			job.setInputFormatClass(KeyValueTextInputFormat.class);

			FileInputFormat.addInputPath(job, new Path("/usr/friend/"));
			Path outpath = new Path("/usr/friendoutput/");

			// The job refuses to start if its output directory already exists.
			if (fs.exists(outpath)) {
				fs.delete(outpath,true);
			}
			FileOutputFormat.setOutputPath(job, outpath);

			if (job.waitForCompletion(true)) {
				System.out.println("friend  1    任务执行成功");

				System.out.println("friend  2    任务开始执行");

				// ---- Job 2: consumes the output of job 1 ----
				Job job2 = Job.getInstance(conf, "friend2");
				job2.setJarByClass(RunJob.class);

				job2.setMapperClass(FriendMapper2.class);
				job2.setReducerClass(FriendReducer2.class);
				job2.setMapOutputKeyClass(User.class);
				job2.setMapOutputValueClass(Text.class);
				job2.setOutputKeyClass(Text.class);
				job2.setOutputValueClass(Text.class);
				job2.setSortComparatorClass(MySort.class);
				job2.setGroupingComparatorClass(MyGroup.class);

				job2.setInputFormatClass(KeyValueTextInputFormat.class);

				FileInputFormat.addInputPath(job2, new Path("/usr/friendoutput"));
				Path outpath2 = new Path("/usr/friendoutput2");

				if (fs.exists(outpath2)) {
					fs.delete(outpath2,true);
				}
				FileOutputFormat.setOutputPath(job2, outpath2);

				if (job2.waitForCompletion(true)) {
					System.out.println("friend  2    任务执行成功");
					succeeded = true;
				} else {
					System.err.println("friend  2    任务执行失败");
				}
			} else {
				System.err.println("friend  1    任务执行失败");
			}
		} catch ( Exception e) {
			// Top-level boundary: report the problem, then fall through to the non-zero exit.
			e.printStackTrace();
		}

		if (!succeeded) {
			System.exit(1);
		}
	}
	
	
	/**
	 * First-pass mapper. The input key is the text before the first tab of a
	 * line (the user) and the input value is the rest of the line, which is
	 * split on tabs into that user's friends.
	 * <p>
	 * For every friend it emits the pair (user, friend) with the marker value 2,
	 * meaning "already direct friends", followed by one record with value 1 for
	 * each pair formed by that friend and a later friend in the list, i.e. two
	 * people who share {@code user} as a common friend.
	 */
	static class FriendMapper1 extends Mapper<Text, Text, Fof, IntWritable>{

		@Override
		protected void map(Text key, Text value, Mapper<Text, Text, Fof, IntWritable>.Context context) throws IOException, InterruptedException {
			final String owner = key.toString();
			final String[] friendList = StringUtils.split(value.toString(),"\t");

			for (int pos = 0; pos < friendList.length; pos++) {
				final String current = friendList[pos];
				// Direct friendship marker.
				context.write(new Fof(owner, current), new IntWritable(2));
				emitPairsWithLaterFriends(current, friendList, pos + 1, context);
			}
		}

		/**
		 * Emits (first, friendList[k]) with value 1 for every index k from
		 * {@code from} to the end of the list.
		 */
		private void emitPairsWithLaterFriends(String first, String[] friendList, int from, Context context) throws IOException, InterruptedException {
			for (int k = from; k < friendList.length; k++) {
				context.write(new Fof(first, friendList[k]), new IntWritable(1));
			}
		}
	}

	
	
	/**
	 * First-pass reducer. For one pair of users it adds up the values received
	 * and writes the pair with that sum. As soon as a value of 2 is seen the
	 * pair is dropped and nothing is written for it (the mapper uses 2 to mark
	 * pairs that are already direct friends).
	 */
	static class FriendReducer1 extends Reducer<Fof, IntWritable, Fof, IntWritable>{

		@Override
		protected void reduce(Fof arg0, Iterable<IntWritable> arg1, Reducer<Fof, IntWritable, Fof, IntWritable>.Context arg2) throws IOException, InterruptedException {
			int commonFriends = 0;
			for (IntWritable marker : arg1) {
				final int v = marker.get();
				if (v == 2) {
					// Already direct friends: no recommendation for this pair.
					return;
				}
				commonFriends += v;
			}

			// Not direct friends: emit the pair with its common-friend count.
			arg2.write(arg0, new IntWritable(commonFriends));
		}
	}
		
		/**
		 * Second-pass mapper. The key is the first user of a pair; the value holds
		 * the second user and the count, separated by a tab. Each input record is
		 * emitted twice, once keyed by each of the two users, with the value
		 * "otherUser&lt;TAB&gt;count".
		 */
		static class FriendMapper2 extends Mapper<Text, Text, User, Text>{

			@Override
			protected void map(Text key, Text value, Mapper<Text, Text, User, Text>.Context context) throws IOException, InterruptedException {
				final String first = key.toString();
				final String[] fields = value.toString().split("\t");
				final String second = fields[0];
				final int common = Integer.parseInt(fields[1]);

				final User forward = newUser(first, second, common);
				final User backward = newUser(second, first, common);

				context.write(forward, new Text(second+"\t"+common));
				context.write(backward, new Text(first+"\t"+common));
			}

			/** Builds a populated {@link User} key. */
			private static User newUser(String user, String other, int count) {
				User result = new User();
				result.setUser(user);
				result.setOther(other);
				result.setCount(count);
				return result;
			}
		}

		
		/**
		 * Sort comparator for {@link User} keys: ascending by user name and, for
		 * the same user, descending by count.
		 */
		static class  MySort   extends  WritableComparator   {

			public MySort() {
				// true: let the base class create User instances for comparison.
				super(User.class,true);
			}

			@Override
			public int compare(WritableComparable a, WritableComparable b) {
				final User left = (User) a;
				final User right = (User) b;
				final int byUser = left.getUser().compareTo(right.getUser());
				if (byUser != 0) {
					return byUser;
				}
				// Operands swapped on purpose: larger counts come first.
				return Integer.compare(right.getCount(), left.getCount());
			}

		}
		
		
		/**
		 * Grouping comparator for {@link User} keys: two keys belong to the same
		 * group when their user names are equal; the other fields are ignored.
		 */
		static class  MyGroup   extends  WritableComparator   {

			public MyGroup() {
				// true: let the base class create User instances for comparison.
				super(User.class,true);
			}

			@Override
			public int compare(WritableComparable a, WritableComparable b) {
				final String leftName = ((User) a).getUser();
				final String rightName = ((User) b).getUser();
				return leftName.compareTo(rightName);
			}

		}
		
		
		/**
		 * Second-pass reducer. Concatenates all values of a group, each followed
		 * by a comma, and writes them under the key "&lt;user&gt;: ".
		 */
		static class FriendReducer2 extends Reducer<User, Text, Text, Text>{

			@Override
			protected void reduce(User arg0, Iterable<Text> arg1, Reducer<User, Text, Text, Text>.Context arg2) throws IOException, InterruptedException {
				final StringBuilder recommendations = new StringBuilder();
				for (Text entry : arg1) {
					recommendations.append(entry).append(",");
				}
				final Text outKey = new Text(arg0.getUser()+": ");
				final Text outValue = new Text(recommendations.toString());
				arg2.write(outKey, outValue);
			}

		}


}

 

package www.bjsxt.fof;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;


/**
 * Writable key made of a user name, the name of another user and a count.
 * <p>
 * Natural order: ascending by {@code user}, then descending by {@code count},
 * then ascending by {@code other}. {@link #equals(Object)} compares all three
 * fields, while {@link #hashCode()} depends on {@code user} only, so that a
 * hash-based partitioner routes every record of one user to the same
 * partition.
 */
public class User implements WritableComparable<User>{

	private String user;
	private String other;
	private int  count;

	/**
	 * Compares by user (ascending), then count (descending), then other
	 * (ascending). Returns 0 exactly when {@link #equals(Object)} is true.
	 *
	 * @throws NullPointerException if {@code o} is null
	 */
	@Override
	public int compareTo(User o) {
		int byUser = compareNullable(user, o.user);
		if (byUser != 0) {
			return byUser;
		}
		// Operands swapped on purpose: larger counts sort first.
		int byCount = Integer.compare(o.count, count);
		if (byCount != 0) {
			return byCount;
		}
		return compareNullable(other, o.other);
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj) {
			return true;
		}
		if (!(obj instanceof User)) {
			return false;
		}
		User that = (User) obj;
		return count == that.count
				&& compareNullable(user, that.user) == 0
				&& compareNullable(other, that.other) == 0;
	}

	/**
	 * Hash of the {@code user} field only. Equal objects still get equal
	 * hashes, and keys that share a user always share a hash.
	 */
	@Override
	public int hashCode() {
		return user == null ? 0 : user.hashCode();
	}

	/** Null-safe string comparison; {@code null} sorts before any non-null value. */
	private static int compareNullable(String a, String b) {
		if (a == null) {
			return b == null ? 0 : -1;
		}
		if (b == null) {
			return 1;
		}
		return a.compareTo(b);
	}

	/** Serializes the fields in the order user, other, count. */
	@Override
	public void write(DataOutput out) throws IOException {
		 out.writeUTF(user);
		 out.writeUTF(other);
		 out.writeInt(count);
	}

	/** Reads the fields back in the order written by {@link #write(DataOutput)}. */
	@Override
	public void readFields(DataInput in) throws IOException {
		 user = in.readUTF();
		 other = in.readUTF();
		 count = in.readInt();
	}

	public String getUser() {
		return user;
	}
	public void setUser(String user) {
		this.user = user;
	}
	public String getOther() {
		return other;
	}
	public void setOther(String other) {
		this.other = other;
	}
	public int getCount() {
		return count;
	}
	public void setCount(int count) {
		this.count = count;
	}

}

 

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部