
Hadoop, Part 3: Friend Recommendation

captainliu · Published 2016/09/16 23:26

Two MapReduce jobs, chained in RunJob, implement a simple friend-of-friend (FOF) recommendation: job 1 counts, for every pair of users who are not already direct friends, how many friends they have in common; job 2 sorts each user's candidates by that count in descending order and concatenates them into a recommendation list. Three classes are involved: Fof (the unordered pair key), RunJob (both jobs with their mappers, reducers and comparators) and User (the composite key used for the secondary sort in job 2).

Fof.java:

package www.bjsxt.fof;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.Text;

/**
 * Key type for job 1: an unordered pair of user names. join() always puts the
 * lexicographically smaller name first, so (a, b) and (b, a) become the same key.
 */
public class Fof extends Text{

	// no-arg constructor, needed because Hadoop instantiates key objects by reflection
	public Fof(){
	}
	public Fof(String a,String b) {
		super(join(a, b));
	}
	public static String join(String a, String b) {
		if (StringUtils.isNotBlank(a)&&StringUtils.isNotBlank(b)) {
			if (a.compareTo(b)<0) {
				return a+"\t"+b;
			}else {
				return b+"\t"+a;
			}
		}
		return null;
	}
}
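
The whole point of this key class is that join() always puts the lexicographically smaller name first, so the pairs (A, B) and (B, A) become the same key and end up in the same reduce() call. A minimal sketch of that behaviour (the demo class and the names in it are made up for illustration):

package www.bjsxt.fof;

public class FofJoinDemo {
	public static void main(String[] args) {
		// both orderings normalize to the same key: "cat\ttom"
		System.out.println(Fof.join("tom", "cat"));
		System.out.println(Fof.join("cat", "tom"));
	}
}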

 

RunJob.java:

package www.bjsxt.fof;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



 

public class RunJob {
	
	public static void main(String[] args) {
		Configuration conf = new Configuration();
		try {
	 		 	 conf.set("fs.defaultFS", "hdfs://node11:8020");
//	 		 	 conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "|");
	 			//conf.set("mapred.jar", "C:\\Users\\root\\Desktop\\wordCount.jar");
				
				FileSystem fs = FileSystem.get(conf);
				Job job = Job.getInstance(conf, "friend");
				job.setJarByClass(RunJob.class);
				
				job.setMapperClass(FriendMapper1.class);
 				job.setReducerClass(FriendReducer1.class);
 				job.setMapOutputKeyClass(Fof.class);
 				job.setMapOutputValueClass(IntWritable.class);
			 
						
//				the default key/value separator of KeyValueTextInputFormat is the first tab on each line (KEY_VALUE_SEPERATOR)
				job.setInputFormatClass(KeyValueTextInputFormat.class);
				
				FileInputFormat.addInputPath(job, new Path("/usr/friend/"));
				Path outpath = new Path("/usr/friendoutput/");
				
				if (fs.exists(outpath)) {
					fs.delete(outpath,true);
				}
				FileOutputFormat.setOutputPath(job, outpath);
				
				boolean f = job.waitForCompletion(true);
				if (f) {
					System.out.println("friend  1    任务执行成功");
					
					System.out.println("friend  2    任务开始执行");
					
					job = Job.getInstance(conf, "friend2");
					job.setJarByClass(RunJob.class);
					
					job.setMapperClass( FriendMapper2.class);
	 				job.setReducerClass(FriendReducer2.class);
	 				job.setMapOutputKeyClass(User.class);
	 				job.setMapOutputValueClass(Text.class);
	 				job.setSortComparatorClass(MySort.class);
	 				job.setGroupingComparatorClass(MyGroup.class);
				 
							
//					the default key/value separator of KeyValueTextInputFormat is the first tab on each line (KEY_VALUE_SEPERATOR)
					job.setInputFormatClass(KeyValueTextInputFormat.class);
					
					FileInputFormat.addInputPath(job, new Path("/usr/friendoutput"));
					Path outpath2 = new Path("/usr/friendoutput2");
					
					if (fs.exists(outpath2)) {
						fs.delete(outpath2,true);
					}
					FileOutputFormat.setOutputPath(job, outpath2);
					
					boolean f2 = job.waitForCompletion(true);
					if (f2) {
						System.out.println("friend  2    任务执行成功");
					}
					
				}
		
		} catch ( Exception e) {
			e.printStackTrace();
		}
	}
	
	
	/**
	 * Mapper for job 1. With KeyValueTextInputFormat, the part of each input line before
	 * the first tab is the key (a user) and the rest is the value (that user's friend list).
	 * @author root
	 */
	static class FriendMapper1 extends Mapper<Text, Text, Fof, IntWritable>{

		@Override
		protected void map(Text key, Text value, Mapper<Text, Text, Fof, IntWritable>.Context context) throws IOException, InterruptedException {
			 String user = key.toString();
			 String[] friends = StringUtils.split(value.toString(),"\t");
			 for (int i = 0; i < friends.length; i++) {
				String f1 = friends[i];
//				mark direct friends with 2 so the reducer can drop these pairs
				context.write(new Fof(user,f1), new IntWritable(2));
				for (int j = i+1; j < friends.length; j++) {
					String f2 = friends[j];
					Fof fof = new Fof(f1, f2);
					context.write(fof, new IntWritable(1));
				}
				 
			}
			 
		} 
		
		
	}
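
To make the two marker values concrete, suppose (purely as an example) one input line is tom<TAB>cat<TAB>hello, meaning tom's friend list is cat and hello. The mapper above then emits:

	cat	tom	-> 2   (tom and cat are direct friends)
	hello	tom	-> 2   (tom and hello are direct friends)
	cat	hello	-> 1   (cat and hello share the common friend tom)

The value 2 only marks a pair as already being direct friends; the 1s are what the reducer below adds up.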

	
	
	/**
	 * Reducer for job 1: if a pair ever carries the marker value 2, the two users are already
	 * direct friends and the pair is dropped; otherwise the 1s are summed into the number of
	 * common friends.
	 * @author root
	 */
	static class FriendReducer1 extends Reducer<Fof, IntWritable, Fof, IntWritable>{

		@Override
		protected void reduce(Fof arg0, Iterable<IntWritable> arg1, Reducer<Fof, IntWritable, Fof, IntWritable>.Context arg2) throws IOException, InterruptedException {
		 
			int num = 0;
			boolean notDirectFriends = true;
			for (IntWritable i : arg1) {
				if (i.get() == 2) {
					// the two users are already direct friends, skip this pair
					notDirectFriends = false;
					break;
				} else {
					num += i.get();
				}
			}
			// only emit pairs that are not direct friends; num = number of common friends
			if (notDirectFriends) {
				arg2.write(arg0, new IntWritable(num));
			}
			
		} 
	}
		
		/**
		 * Mapper for job 2: reads the pair/count output of job 1 and emits it in both
		 * directions, so every user collects all of its recommendation candidates.
		 */
		static class FriendMapper2 extends Mapper<Text, Text, User, Text>{

			@Override
			protected void map(Text key, Text value, Mapper<Text, Text, User, Text>.Context context) throws IOException, InterruptedException { 
				String user = key.toString();
				String[] fields = value.toString().split("\t");
				String other = fields[0];
				int count = Integer.parseInt(fields[1]);
			    User u1 = new User();
			    u1.setUser(user);
			    u1.setOther(other);
			    u1.setCount(count);
			    User u2 = new User();
				u2.setUser(other);
				u2.setOther(user);
				u2.setCount(count);
				
				context.write(u1, new Text(other+"\t"+count));
				context.write(u2, new Text(user+"\t"+count));
			} 
			
			
		}

		
		/**
		 * Sort comparator for job 2 (secondary sort): records with the same user are ordered
		 * by count in descending order, so each user's best candidates come first.
		 * @author root
		 */
		static class  MySort   extends  WritableComparator   {

			public MySort() {
				super(User.class,true);
			}
			@Override
			public int compare(WritableComparable a, WritableComparable b) {
				 User u1 =(User) a;
				 User u2 =(User) b;
				 int r1 = u1.getUser().compareTo(u2.getUser());
				 if (r1==0) {
					return  - Integer.compare(u1.getCount(), u2.getCount());
				}
				 return r1;
			} 
			
		}
		
		
		/**
		 * Grouping comparator for job 2: keys with the same user go into the same reduce()
		 * call, regardless of count.
		 * @author root
		 */
		static class  MyGroup   extends  WritableComparator   {

			public MyGroup() {
				super(User.class,true);
			}
			@Override
			public int compare(WritableComparable a, WritableComparable b) {
				 User u1 =(User) a;
				 User u2 =(User) b;
				 return u1.getUser().compareTo(u2.getUser());
		 
			} 
			
		}
		
		
		/**
		 * Reducer for job 2: concatenates the (already sorted) candidate list of each user
		 * into a single recommendation line.
		 * @author root
		 */
		static class FriendReducer2 extends Reducer<User, Text, Text, Text>{

			@Override
			protected void reduce(User arg0, Iterable<Text> arg1, Reducer<User, Text, Text, Text>.Context arg2) throws IOException, InterruptedException {
				 StringBuilder sb = new StringBuilder();
				for (Text t : arg1) {
					sb.append(t+",");
				}
				arg2.write(new Text(arg0.getUser()+": "), new Text(sb.toString()));
			} 
			
			
		}


}
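
Putting the two jobs together, here is a rough sketch of the data flow with made-up names and counts. If job 1 writes, say,

	cat	hello	3
	cat	world	2
	hive	tom	2

then job 2 reads each line with KeyValueTextInputFormat again (the first column becomes the key), FriendMapper2 emits every pair in both directions, MySort orders each user's candidates by the number of common friends in descending order, MyGroup gathers them into one reduce() call per user, and FriendReducer2 writes something like

	cat: 	hello	3,world	2,
	hello: 	cat	3,
	hive: 	tom	2,
	tom: 	hive	2,
	world: 	cat	2,

The raw input under /usr/friend/ is expected to be one line per user: the user name, a tab, then that user's friends separated by tabs.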

 

User.java:

package www.bjsxt.fof;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;


/**
 * Composite key for job 2: the user whose recommendations are being built, one
 * candidate (other) and their number of common friends (count). The actual sorting
 * and grouping are done by MySort and MyGroup in RunJob.
 */
public class User implements WritableComparable<User>{
	
	@Override
	public int compareTo(User o) {
		// order by user name, then by count descending (consistent with MySort);
		// in this job the explicit comparators registered in RunJob do the actual work
		int r = this.user.compareTo(o.user);
		if (r == 0) {
			return -Integer.compare(this.count, o.count);
		}
		return r;
	}
	@Override
	public void write(DataOutput out) throws IOException {
		 out.writeUTF(user);
		 out.writeUTF(other);
		 out.writeInt(count);
		
	}
	@Override
	public void readFields(DataInput in) throws IOException {
		 user = in.readUTF();
		 other = in.readUTF();
		 count = in.readInt();
		
	}
	private String user;
	private String other;
	private int  count;
	public String getUser() {
		return user;
	}
	public void setUser(String user) {
		this.user = user;
	}
	public String getOther() {
		return other;
	}
	public void setOther(String other) {
		this.other = other;
	}
	public int getCount() {
		return count;
	}
	public void setCount(int count) {
		this.count = count;
	}
	

}
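
One caveat: job 2 relies on the default HashPartitioner, which partitions map output by key.hashCode(). User does not override hashCode(), so this only works because the job runs with the default single reducer; with several reducers, records for the same user could land in different partitions. A hedged sketch of an override (not part of the original post) that would partition by the user field alone:

	// hypothetical addition to User: hash on the "user" field only, so the default
	// HashPartitioner sends all candidates for one user to the same reduce task
	@Override
	public int hashCode() {
		return user == null ? 0 : user.hashCode();
	}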

 
