Hadoop平台下面实现PageRank算法

原创
2014/09/22 16:38
阅读数 310

这里我们在hadoop平台下面实现Pagerank算法


输入文件格式如下:

1    1.0 2 3 4 5 6 7 8
2    2.0 3 4 5 6 7 8
3    3.0 4 5 6 7 8
4    4.0 5 6 7 8
5    5.0 6 7 8
6    6.0 7 8
7    7.0 8
8    8.0 1 2 3 4 5 6 7


拿第一行进行说明: 1表示网址  然后用tab键隔开,记住一定要是tab键,1.0为给予的初始pr值,2,3,4,5,6,7,8为从网址1指向的网址
下面几行都是如此


代码如下:

package com.apache.hadoop.io;

import java.io.IOException;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.StringTokenizer;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class PageRank {

  public static class MyMapper   extends Mapper<Object, Text, Text, Text>
  {
	  	private Text id = new Text();
  	 	public void map(Object key, Text value, Context context ) throws IOException, InterruptedException
  	 	{
  	 		String line = value.toString();
  	 		//判断是否为输入文件
  	 		if(line.substring(0,1).matches("[0-9]{1}"))
  	 		{
				  boolean flag = false;
				  if(line.contains("_"))
				  {
						line = line.replace("_","");
						flag = true;
				  }
				  //对输入文件进行处理
				  String[] values = line.split("\t");
				  Text t = new Text(values[0]);
				  String[] vals = values[1].split(" ");
				  //将url保存下来,下次计算也要用到
				  String url="_";
				  
				  double pr = 0;
				  int i = 0;
				  int num = 0;
				  
				  if(flag)
				  {
					  i=2;
					  pr=Double.valueOf(vals[1]);
					  num=vals.length-2;
				  }
				  else
				  {
					  i=1;
					  pr=Double.valueOf(vals[0]);
					  num=vals.length-1;
				  }
				  
				  for(;i<vals.length;i++)
				  {
					  url=url+vals[i]+" ";
					  id.set(vals[i]);
					  Text prt = new Text(String.valueOf(pr/num));
					  context.write(id,prt);
				  }
				  context.write(t,new Text(url));
			  }
		  }
  }

  public static class MyReducer  extends Reducer<Text,Text,Text,Text>
  {
			  private Text result = new Text();
			  private Double pr = new Double(0);
			  
		 public void reduce(Text key, Iterable<Text> values,  Context context  ) throws IOException, InterruptedException
		 {
			  double sum=0;
			  String url="";
			  
			  for(Text val:values)
			  {
					  //发现_标记则表明是url,否则是外链pr,要参与计算
				  if(!val.toString().contains("_"))
				  {
					  sum=sum+Double.valueOf(val.toString());
				  }
				  else
				 {
					  url=val.toString();
				  }
	    	  }
			  pr=0.15+0.85*sum;
			  String str=String.format("%.3f",pr);
			  result.set(new Text(str+" "+url));
			  context.write(key,result);
		  }
 }

	public static void main(String[] args) throws Exception
	{
			 // String paths="/user/root/input11/file.txt";
		//这里的路径为用户自己的输入文件的位置
		     String paths="hdfs://localhost:9000/user/root/input11";
			  String path1=paths;
			  String path2="";

			  //这里我们迭代20次
			  for(int i=1;i<=20;i++)
			  {
			       System.out.println("This is the "+i+"th job!");
			        System.out.println("path1:"+path1);
				 System.out.println("path2:"+path2);
						  
				  Configuration conf = new Configuration();
				Job job = new Job(conf, "PageRank");
						  
				 path2=paths+i;
						  
				 job.setJarByClass(PageRank.class);
			         job.setMapperClass(MyMapper.class);
				  job.setCombinerClass(MyReducer.class);
				job.setReducerClass(MyReducer.class);
				job.setOutputKeyClass(Text.class);
				job.setOutputValueClass(Text.class);
			       FileInputFormat.addInputPath(job, new Path(path1));
			      FileOutputFormat.setOutputPath(job, new Path(path2));
						  
			  path1=path2;
						  
			 job.waitForCompletion(true);
			System.out.println(i+"th end!");
		}
	  }	
 }

结果如下:

1    0.150 0.501 _2 3 4 5 6 7 8
2    0.150 0.562 _3 4 5 6 7 8
3    0.150 0.644 _4 5 6 7 8
4    0.150 0.755 _5 6 7 8
5    0.150 0.919 _6 7 8
6    0.150 1.184 _7 8
7    0.150 1.698 _8
8    0.150 2.822 _1 2 3 4 5 6 7 


可以看出此时的结果比8要大,还没有到达预期结果,我们可以自己设置迭代次数

在进行实验室实验中间结果可能产生很多,我们可以在hadoop的HDFS中进行删除相应的命令为


bin/hadoop fs -rmr /user/root/input11*






展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部