文档章节

PageRank算法并行实现(二)

东方神剑
 东方神剑
发布于 2014/11/26 13:45
字数 1007
阅读 750
收藏 6
点赞 1
评论 0

4). PageRank计算: PageRank.java

pagerank-step1

矩阵解释:

  • 实现邻接与PR矩阵的乘法

  • map以邻接矩阵的行号为key,由于上一步是输出的是列,所以这里需要转成行

  • reduce计算得到未标准化的特征值

新建文件: PageRank.java

package org.conan.myhadoop.pagerank;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;
public class PageRank {
    public static class PageRankMapper extends Mapper<LongWritable, Text, Text, Text> {
        private String flag;// tmp1 or result
        private static int nums = 4;// 页面数
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getParent().getName();// 判断读的数据集
        }
        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            String[] tokens = PageRankJob.DELIMITER.split(values.toString());
            if (flag.equals("tmp1")) {
                String row = values.toString().substring(0,1);
                String[] vals = PageRankJob.DELIMITER.split(values.toString().substring(2));// 矩阵转置
                for (int i = 0; i < vals.length; i++) {
                    Text k = new Text(String.valueOf(i + 1));
                    Text v = new Text(String.valueOf("A:" + (row) + "," + vals[i]));
                    context.write(k, v);
                }
            } else if (flag.equals("pr")) {
                for (int i = 1; i <= nums; i++) {
                    Text k = new Text(String.valueOf(i));
                    Text v = new Text("B:" + tokens[0] + "," + tokens[1]);
                    context.write(k, v);
                }
            }
        }
    }
    public static class PageRankReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            Map<Integer, Float> mapA = new HashMap<Integer, Float>();
            Map<Integer, Float> mapB = new HashMap<Integer, Float>();
            float pr = 0f;
            for (Text line : values) {
                System.out.println(line);
                String vals = line.toString();
                if (vals.startsWith("A:")) {
                    String[] tokenA = PageRankJob.DELIMITER.split(vals.substring(2));
                    mapA.put(Integer.parseInt(tokenA[0]), Float.parseFloat(tokenA[1]));
                }
                if (vals.startsWith("B:")) {
                    String[] tokenB = PageRankJob.DELIMITER.split(vals.substring(2));
                    mapB.put(Integer.parseInt(tokenB[0]), Float.parseFloat(tokenB[1]));
                }
            }
            Iterator iterA = mapA.keySet().iterator();
            while(iterA.hasNext()){
                int idx = iterA.next();
                float A = mapA.get(idx);
                float B = mapB.get(idx);
                pr += A * B;
            }
            context.write(key, new Text(PageRankJob.scaleFloat(pr)));
            // System.out.println(key + ":" + PageRankJob.scaleFloat(pr));
        }
    }
    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = PageRankJob.config();
        String input = path.get("tmp1");
        String output = path.get("tmp2");
        String pr = path.get("input_pr");
        HdfsDAO hdfs = new HdfsDAO(PageRankJob.HDFS, conf);
        hdfs.rmr(output);
        Job job = new Job(conf);
        job.setJarByClass(PageRank.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(PageRankMapper.class);
         job.setReducerClass(PageRankReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(input), new Path(pr));
        FileOutputFormat.setOutputPath(job, new Path(output));
        job.waitForCompletion(true);
        hdfs.rmr(pr);
        hdfs.rename(output, pr);
    }
}

5). PR标准化: Normal.java

normal-step1

矩阵解释:

  • 对PR的计算结果标准化,让所以PR值落在(0,1)区间

新建文件:Normal.java

package org.conan.myhadoop.pagerank;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;
public class Normal {
    public static class NormalMapper extends Mapper<LongWritable, Text, Text, Text> {
        Text k = new Text("1");
        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            context.write(k, values);
        }
    }
    public static class NormalReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            List vList = new ArrayList();
            float sum = 0f;
            for (Text line : values) {
                vList.add(line.toString());
                String[] vals = PageRankJob.DELIMITER.split(line.toString());
                float f = Float.parseFloat(vals[1]);
                sum += f;
            }
            for (String line : vList) {
                String[] vals = PageRankJob.DELIMITER.split(line.toString());
                Text k = new Text(vals[0]);
                float f = Float.parseFloat(vals[1]);
                Text v = new Text(PageRankJob.scaleFloat((float) (f / sum)));
                context.write(k, v);
                System.out.println(k + ":" + v);
            }
        }
    }
    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = PageRankJob.config();
        String input = path.get("input_pr");
        String output = path.get("result");
        HdfsDAO hdfs = new HdfsDAO(PageRankJob.HDFS, conf);
        hdfs.rmr(output);
        Job job = new Job(conf);
        job.setJarByClass(Normal.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapperClass(NormalMapper.class);
        job.setReducerClass(NormalReducer.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        job.waitForCompletion(true);
    }
}

6). 启动程序: PageRankJob.java

新建文件:PageRankJob.java

package org.conan.myhadoop.pagerank;
import java.text.DecimalFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.hadoop.mapred.JobConf;
public class PageRankJob {
    public static final String HDFS = "hdfs://192.168.1.210:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");
    public static void main(String[] args) {
        Map<String, String> path = new HashMap<String, String>();
        path.put("page", "logfile/pagerank/page.csv");// 本地的数据文件
        path.put("pr", "logfile/pagerank/pr.csv");// 本地的数据文件
        path.put("input", HDFS + "/user/hdfs/pagerank");// HDFS的目录
        path.put("input_pr", HDFS + "/user/hdfs/pagerank/pr");// pr存储目
        path.put("tmp1", HDFS + "/user/hdfs/pagerank/tmp1");// 临时目录,存放邻接矩阵
        path.put("tmp2", HDFS + "/user/hdfs/pagerank/tmp2");// 临时目录,计算到得PR,覆盖input_pr
        path.put("result", HDFS + "/user/hdfs/pagerank/result");// 计算结果的PR
        try {
            AdjacencyMatrix.run(path);
            int iter = 3;
            for (int i = 0; i < iter; i++) {// 迭代执行
                PageRank.run(path);
            }
            Normal.run(path);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(0);
    }
    public static JobConf config() {// Hadoop集群的远程配置信息
        JobConf conf = new JobConf(PageRankJob.class);
        conf.setJobName("PageRank");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }
    public static String scaleFloat(float f) {// 保留6位小数
        DecimalFormat df = new DecimalFormat("##0.000000");
        return df.format(f);
    }
}

© 著作权归作者所有

共有 人打赏支持
东方神剑

东方神剑

粉丝 63
博文 126
码字总数 93166
作品 0
朝阳
程序员
PageRank算法并行实现(一)

前言 Google通过PageRank算法模型,实现了对全互联网网页的打分。但对于海量数据的处理,在单机下是不可能实现,所以如何将PageRank并行计算,将是本文的重点。 目录 PageRank算法并行化原理...

东方神剑 ⋅ 2014/11/26 ⋅ 0

张尉东/GraphMapReduce

#GraphMapReduce: 基于MapReduce编程模型的图计算框架 (名词约束: 顶点Vertex-图中顶点;节点Process-计算单元节点),目录说明: 代码主要包含四个文件: gmr.cpp gmr.h algorithms.h graph.h |g...

张尉东 ⋅ 2016/01/22 ⋅ 0

从赌钱游戏看PageRank算法

谈到并行计算应用,会有人想到PageRank算法,我们有成千上万的网页分析链接关系确定排名先后,借助并行计算完成是一个很好的场景。长期以来,google的创始发明PageRank算法吸引了很多人学习研...

fourinone ⋅ 2013/03/27 ⋅ 19

深入探讨PageRank(一):PageRank算法原理入门

深入探讨PageRank(一):PageRank算法原理入门 一、PageRank简介 大名鼎鼎的PageRank算法是Google排名运算法则(排名公式)的一个非常重要的组成部分,其用于衡量一个网站好坏的标准。在揉合...

monkey_d_meng ⋅ 2011/06/19 ⋅ 0

【原创】机器学习之PageRank算法应用与C#实现(2)球队排名应用与C#代码

阅读目录 1.足球队排名问题 2.利用PageRank算法的思路 3.C#编程实现过程 4.算法测试   在上一篇文章:机器学习之PageRank算法应用与C#实现(1)算法介绍 中,对PageRank算法的原理和过程进行...

老朱第八 ⋅ 2017/11/11 ⋅ 0

网络爬虫的抓取策略:深度抓取策略、广度优先遍历策略、Partial PageRank策略、OCIP策略、大站优先策略

前言 遍历策略是爬虫的核心问题,在爬虫系统中,待抓取URL队列是很重要的一部分。待抓取URL队列中的URL以什么样的顺序排列也是一个很重要的问题,因为这涉及到先抓取那个页面,后抓取哪个页面...

扶七 ⋅ 05/10 ⋅ 0

基于Hadoop 的分布式网络爬虫技术学习笔记

基于Hadoop 的分布式网络爬虫技术学习笔记 一、网络爬虫原理 Web网络爬虫系统的功能是下载网页数据,为搜索引擎系统提供数据来源。很多大型的网络搜索引擎系统都被称为基于 Web数据采集的搜索...

TJXLJY ⋅ 2014/07/30 ⋅ 0

No.5 使用 PageRank 找到关系网中的牛人

0x00 前言 社交关系数据已经准备就绪,PageRank算法的原理和实现我们也已经大致掌握,下面就可以在此基础上做一些有意思的事情了。 本篇会在前面抓取的500w简书的粉丝数据上,使用 PageRank...

dantezhao ⋅ 2017/09/11 ⋅ 0

pagerank 算法 快速入门

背景 pageRank 是Google CEO 拉里佩奇提出的一种算法,来计算互联网里的网站的重要性,以对搜索进行排名。 此处为啥算法叫pagerank,因为是以Google公司创办人拉里·佩奇(Larry Page)之姓来...

yuejiewc ⋅ 05/23 ⋅ 0

PageRank算法计算网页的价值

现假设有A,B,C,D,E五个网页,其中 1) A网页有链接指向B,C,D,E 2) B网页有链接指向A,D 3) C网页有链接指向A,D 4) D网页有链接指向C 5) E网页有链接指向A,C A 请写出这个网页链接结构的Google矩...

灯下黑鬼吹灯 ⋅ 2016/12/04 ⋅ 17

没有更多内容

加载失败,请刷新页面

加载更多

下一页

笔试题之Java基础部分【简】【一】

基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语法,集合的语法,io 的语法,虚拟机方面的语法,其他 1.length、length()和size() length针对...

anlve ⋅ 7分钟前 ⋅ 0

table eg

user_id user_name full_name 1 zhangsan 张三 2 lisi 李四 `` ™ [========] 2018-06-18 09:42:06 星期一½ gdsgagagagdsgasgagadsgdasgagsa...

qwfys ⋅ 31分钟前 ⋅ 0

一个有趣的Java问题

先来看看源码: public class TestDemo { public static void main(String[] args) { Integer a = 10; Integer b = 20; swap(a, b); System.out......

linxyz ⋅ 35分钟前 ⋅ 0

十五周二次课

十五周二次课 17.1mysql主从介绍 17.2准备工作 17.3配置主 17.4配置从 17.5测试主从同步 17.1mysql主从介绍 MySQL主从介绍 MySQL主从又叫做Replication、AB复制。简单讲就是A和B两台机器做主...

河图再现 ⋅ 今天 ⋅ 0

docker安装snmp rrdtool环境

以Ubuntu16:04作为基础版本 docker pull ubuntu:16.04 启动一个容器 docker run -d -i -t --name flow_mete ubuntu:16.04 bash 进入容器 docker exec -it flow_mete bash cd ~ 安装基本软件 ......

messud4312 ⋅ 今天 ⋅ 0

OSChina 周一乱弹 —— 快别开心了,你还没有女友呢。

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @莱布妮子 :分享吴彤的单曲《好春光》 《好春光》- 吴彤 手机党少年们想听歌,请使劲儿戳(这里) @clouddyy :小萝莉街上乱跑,误把我认错成...

小小编辑 ⋅ 今天 ⋅ 8

Java 开发者不容错过的 12 种高效工具

Java 开发者常常都会想办法如何更快地编写 Java 代码,让编程变得更加轻松。目前,市面上涌现出越来越多的高效编程工具。所以,以下总结了一系列工具列表,其中包含了大多数开发人员已经使用...

jason_kiss ⋅ 昨天 ⋅ 0

Linux下php访问远程ms sqlserver

1、安装freetds(略,安装在/opt/local/freetds 下) 2、cd /path/to/php-5.6.36/ 进入PHP源码目录 3、cd ext/mssql进入MSSQL模块源码目录 4、/opt/php/bin/phpize生成编译配置文件 5、 . ./...

wangxuwei ⋅ 昨天 ⋅ 0

如何成为技术专家

文章来源于 -- 时间的朋友 拥有良好的心态。首先要有空杯心态,用欣赏的眼光发现并学习别人的长处,包括但不限于工具的使用,工作方法,解决问题以及规划未来的能力等。向别人学习的同时要注...

长安一梦 ⋅ 昨天 ⋅ 0

Linux vmstat命令实战详解

vmstat命令是最常见的Linux/Unix监控工具,可以展现给定时间间隔的服务器的状态值,包括服务器的CPU使用率,内存使用,虚拟内存交换情况,IO读写情况。这个命令是我查看Linux/Unix最喜爱的命令...

刘祖鹏 ⋅ 昨天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部