PageRank算法并行实现（一）
PageRank算法并行实现（一）

PageRank算法并行实现（一）
• 发表于 3年前
• 阅读 216
• 收藏 1
• 评论 0

1. PageRank算法并行化原理

2. MapReduce分步式编程

## 1. PageRank算法分步式原理

PageRank的分步式算法原理，简单来讲，就是通过矩阵计算实现并行化。

1). 把邻接矩阵的列，按数据行存储

``````          [,1]   [,2]   [,3]   [,4]
[1,] 0.0375000 0.0375 0.0375 0.0375
[2,] 0.3208333 0.0375 0.0375 0.8875
[3,] 0.3208333 0.4625 0.0375 0.0375
[4,] 0.3208333 0.4625 0.8875 0.0375``````

``````1       0.037499994,0.32083333,0.32083333,0.32083333
2       0.037499994,0.037499994,0.4625,0.4625
3       0.037499994,0.037499994,0.037499994,0.88750005
4       0.037499994,0.88750005,0.037499994,0.037499994``````

2). 迭代：求矩阵特征值

map过程：

• input: 邻接矩阵, pr值

• output: key为pr的行号，value为邻接矩阵和pr值的乘法求和公式

reduce过程：

• input: key为pr的行号，value为邻接矩阵和pr值的乘法求和公式

• output: key为pr的行号, value为计算的结果，即pr值

``````0.0375000 0.0375 0.0375 0.0375     1     0.150000
0.3208333 0.0375 0.0375 0.8875  *  1  =  1.283333
0.3208333 0.4625 0.0375 0.0375     1     0.858333
0.3208333 0.4625 0.8875 0.0375     1     1.708333``````

``````0.0375000 0.0375 0.0375 0.0375     0.150000      0.150000
0.3208333 0.0375 0.0375 0.8875  *  1.283333  =   1.6445833
0.3208333 0.4625 0.0375 0.0375     0.858333      0.7379167
0.3208333 0.4625 0.8875 0.0375     1.708333      1.4675000``````

… 10次迭代

``````0.1500000
1.4955721
0.8255034
1.5289245``````

3). 标准化PR值

``````0.150000                                              0.0375000
1.4955721  / (0.15+1.4955721+0.8255034+1.5289245) =   0.3738930
0.8255034                                             0.2063759
1.5289245                                             0.3822311``````

## 2. MapReduce分步式编程

MapReduce流程分解

HDFS目录

• input(/user/hdfs/pagerank): HDFS的根目录

• input_pr(/user/hdfs/pagerank/pr): 临时目录，存储中间结果PR值

• tmp1(/user/hdfs/pagerank/tmp1):临时目录，存储邻接矩阵

• tmp2(/user/hdfs/pagerank/tmp2):临时目录，迭代计算PR值，然后保存到input_pr

• result(/user/hdfs/pagerank/result): PR值输出结果

•

• 网页链接关系数据: page.csv

• 出始的PR数据:pr.csv

• PageRank计算: PageRank.java

• PR标准化: Normal.java

• 启动程序: PageRankJob.java

1). 网页链接关系数据: page.csv

``````1,2
1,3
1,4
2,3
2,4
3,4
4,2``````

2). 出始的PR数据:pr.csv

``````1,1
2,1
3,1
4,1``````

• 阻尼系数为0.85

• 页面数为4

• reduce以行输出矩阵的列，输出列主要用于分步式存储，下一步需要转成行

``````package org.conan.myhadoop.pagerank;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
private static int nums = 4;// 页面数
private static float d = 0.85f;// 阻尼系数
public static class AdjacencyMatrixMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
System.out.println(values.toString());
String[] tokens = PageRankJob.DELIMITER.split(values.toString());
Text k = new Text(tokens[0]);
Text v = new Text(tokens[1]);
context.write(k, v);
}
}
public static class AdjacencyMatrixReducer extends Reducer<Text, Text, Text, Text> {
@Override
public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
float[] G = new float[nums];// 概率矩阵列
Arrays.fill(G, (float) (1 - d) / G.length);
float[] A = new float[nums];// 近邻矩阵列
int sum = 0;// 链出数量
for (Text val : values) {
int idx = Integer.parseInt(val.toString());
A[idx - 1] = 1;
sum++;
}
if (sum == 0) {// 分母不能为0
sum = 1;
}
StringBuilder sb = new StringBuilder();
for (int i = 0; i < A.length; i++) {
sb.append("," + (float) (G[i] + d * A[i] / sum));
}
Text v = new Text(sb.toString().substring(1));
System.out.println(key + ":" + v.toString());
context.write(key, v);
}
}
public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
JobConf conf = PageRankJob.config();
String input = path.get("input");
String input_pr = path.get("input_pr");
String output = path.get("tmp1");
String page = path.get("page");
String pr = path.get("pr");
HdfsDAO hdfs = new HdfsDAO(PageRankJob.HDFS, conf);
hdfs.rmr(input);
hdfs.mkdirs(input);
hdfs.mkdirs(input_pr);
hdfs.copyFile(page, input);
hdfs.copyFile(pr, input_pr);
Job job = new Job(conf);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path(page));
FileOutputFormat.setOutputPath(job, new Path(output));
job.waitForCompletion(true);
}
}``````

×