文档章节

Spark GraphX之Dijkstra、Prime、FloydWarshall

绝世武神
 绝世武神
发布于 2017/02/09 17:09
字数 1463
阅读 371
收藏 0

输入图片说明

ShortestPaths的源码如下:

package org.apache.spark.graphx.lib

import scala.reflect.ClassTag

import org.apache.spark.graphx._

/**
 * Computes shortest paths to the given set of landmark vertices, returning a graph where each
 * vertex attribute is a map containing the shortest-path distance to each reachable landmark.
 */
object ShortestPaths {
  /** Stores a map from the vertex id of a landmark to the distance to that landmark. */
  type SPMap = Map[VertexId, Int]

  private def makeMap(x: (VertexId, Int)*) = Map(x: _*)

  private def incrementMap(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) }

  private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap =
    (spmap1.keySet ++ spmap2.keySet).map {
      k => k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue))
    }.toMap

  /**
   * Computes shortest paths to the given set of landmark vertices.
   *
   * @tparam ED the edge attribute type (not used in the computation)
   *
   * @param graph the graph for which to compute the shortest paths
   * @param landmarks the list of landmark vertex ids. Shortest paths will be computed to each
   * landmark.
   *
   * @return a graph where each vertex attribute is a map containing the shortest-path distance to
   * each reachable landmark vertex.
   */
  def run[VD, ED: ClassTag](graph: Graph[VD, ED], landmarks: Seq[VertexId]): Graph[SPMap, ED] = {
    val spGraph = graph.mapVertices { (vid, attr) =>
      if (landmarks.contains(vid)) makeMap(vid -> 0) else makeMap()
    }

    val initialMessage = makeMap()

    def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = {
      addMaps(attr, msg)
    }

    def sendMessage(edge: EdgeTriplet[SPMap, _]): Iterator[(VertexId, SPMap)] = {
      val newAttr = incrementMap(edge.dstAttr)
      if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr))
      else Iterator.empty
    }

    Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)
  }
}

关于单源最短路径,我们可以调用 ShortestPaths .run(graph, landmarks) 得到graph中的顶点到landmarks的“距离”,但是这个“距离”只是“跳数”。换句话说,只在graph中每条边的权重都为1的情况下,才能保证结果的正确性。而现实情况中,往往都不满足这个条件。那么问题来了,我们该如何做呢?学过图论的朋友都知道,Dijkstra算法可以解决这个问题。遗憾的是,GraphX目前(Spark2.0.2)并未提供这样的API,所以基于GraphX实现Dijkstra算法变得很有必要。

Dijkstra(单源最短路径)

  //单源最短路径
  def dijkstra[VD: ClassTag](g : Graph[VD, Double], origin: VertexId) = {
    //初始化,其中属性为(boolean, double,Long)类型,boolean用于标记是否访问过,double为顶点距离原点的距离,Long是上一个顶点的id
    var g2 = g.mapVertices((vid, _) => (false, if(vid == origin) 0 else Double.MaxValue, -1L))

    for(i <- 1L to g.vertices.count()) {
      //从没有访问过的顶点中找出距离原点最近的点
      val currentVertexId = g2.vertices.filter(! _._2._1).reduce((a,b) => if (a._2._2 < b._2._2) a else b)._1
      //更新currentVertexId邻接顶点的‘double’值
      val newDistances = g2.aggregateMessages[(Double, Long)](
        triplet => if(triplet.srcId == currentVertexId && !triplet.dstAttr._1) {    //只给未确定的顶点发送消息
          triplet.sendToDst((triplet.srcAttr._2 + triplet.attr, triplet.srcId))
        },
        (x, y) => if(x._1 < y._1) x else y ,
        TripletFields.All
      )
      //newDistances.foreach(x => println("currentVertexId\t"+currentVertexId+"\t->\t"+x))
      //更新图形
      g2 = g2.outerJoinVertices(newDistances) {
        case (vid, vd, Some(newSum)) => (vd._1 || vid == currentVertexId, math.min(vd._2, newSum._1), if(vd._2 <= newSum._1) vd._3 else newSum._2 )
        case (vid, vd, None) => (vd._1|| vid == currentVertexId, vd._2, vd._3)
      }
      //g2.vertices.foreach(x => println("currentVertexId\t"+currentVertexId+"\t->\t"+x))
    }

    //g2
    g.outerJoinVertices(g2.vertices)( (vid, srcAttr, dist) => (srcAttr, dist.getOrElse(false, Double.MaxValue, -1)._2, dist.getOrElse(false, Double.MaxValue, -1)._3) )
  }

Prime(最小生成树)

知道Dijkstra算法的人也一定知道Prime算法。

  //最小生成树
  def prime[VD: ClassTag](g : Graph[VD, Double], origin: VertexId) = {
    //初始化,其中属性为(boolean, double,Long)类型,boolean用于标记是否访问过,double为加入当前顶点的代价,Long是上一个顶点的id
    var g2 = g.mapVertices((vid, _) => (false, if(vid == origin) 0 else Double.MaxValue, -1L))

    for(i <- 1L to g.vertices.count()) {
      //从没有访问过的顶点中找出 代价最小 的点
      val currentVertexId = g2.vertices.filter(! _._2._1).reduce((a,b) => if (a._2._2 < b._2._2) a else b)._1
      //更新currentVertexId邻接顶点的‘double’值
      val newDistances = g2.aggregateMessages[(Double, Long)](
        triplet => if(triplet.srcId == currentVertexId && !triplet.dstAttr._1) {    //只给未确定的顶点发送消息
          triplet.sendToDst((triplet.attr, triplet.srcId))
        },
        (x, y) => if(x._1 < y._1) x else y ,
        TripletFields.All
      )
      //newDistances.foreach(x => println("currentVertexId\t"+currentVertexId+"\t->\t"+x))
      //更新图形
      g2 = g2.outerJoinVertices(newDistances) {
        case (vid, vd, Some(newSum)) => (vd._1 || vid == currentVertexId, math.min(vd._2, newSum._1), if(vd._2 <= newSum._1) vd._3 else newSum._2 )
        case (vid, vd, None) => (vd._1|| vid == currentVertexId, vd._2, vd._3)
      }
      //g2.vertices.foreach(x => println("currentVertexId\t"+currentVertexId+"\t->\t"+x))
    }

    //g2
    g.outerJoinVertices(g2.vertices)( (vid, srcAttr, dist) => (srcAttr, dist.getOrElse(false, Double.MaxValue, -1)._2, dist.getOrElse(false, Double.MaxValue, -1)._3) )
  }

FloydWarshall(多源最短路径)

  //多源最短路径
  def floydWarshall[VD: ClassTag](g: Graph[VD, Double]) = {
    def mergeMaps(a: Map[VertexId, Double], b: Map[VertexId, Double]) = {
      (a.keySet ++ b.keySet).map{ k => (k, math.min(a.getOrElse(k, Double.MaxValue), b.getOrElse(k, Double.MaxValue))) }.toMap
    }

    val N = g.vertices.count()    //图顶点的个数
    var n = -1
    //初始化图
    var g2 = g.mapVertices( (vid, _) => Map(vid -> 0.0) )

    //当n = N*N时,退出循环。注:不难发现最终结果是一个实对称矩阵
    while(n < N * N) {
      val newVertices = g2.aggregateMessages[Map[VertexId, Double]](
        triplet =>{
          val dstPlus = triplet.dstAttr.map{ case (vid, distance) => (vid, triplet.attr+distance) }
          if(dstPlus != triplet.srcAttr) { triplet.sendToSrc(dstPlus) }
        },
        (a, b) => mergeMaps(a, b) ,
        TripletFields.Dst
      )

      g2 = g2.outerJoinVertices(newVertices)( (_, oldAttr, opt) => mergeMaps(oldAttr, opt.get) )

      n = g2.vertices.map{ case (vid, srcAttr) => srcAttr.size }.reduce(_ + _)
      //println("number\t" + n)
    }

    g2
  }

纸上得来终觉浅,绝知此事要躬行。下面开始实战、实战、实战,重要的事情说三遍!!!

    val myVertices = sc.makeRDD(Array((1L, "A"), (2L, "B"), (3L, "C"), (4L, "D"), (5L, "E"), (6L, "F"), (7L, "G")))
    val initialEdges = sc.makeRDD(Array(Edge(1L, 2L, 7.0), Edge(1L, 4L, 5.0),
                                   Edge(2L, 3L, 8.0), Edge(2L, 4L, 9.0), Edge(2L, 5L, 7.0),
                                   Edge(3L, 5L, 5.0),
                                   Edge(4L, 5L, 15.0), Edge(4L, 6L, 6.0),
                                   Edge(5L, 6L, 8.0), Edge(5L, 7L, 9.0),
                                   Edge(6L, 7L, 11.0)))
    val myEdges = initialEdges.filter(e => e.srcId != e.dstId).flatMap(e => Array(e, Edge(e.dstId, e.srcId, e.attr))).distinct()  //去掉自循环边,有向图变为无向图,去除重复边
    val myGraph = Graph(myVertices, myEdges).cache()

    println(ShortestPaths.run(myGraph, Seq(3)).vertices.collect().mkString(","))
    println(dijkstra(myGraph, 3L).vertices.map(x => (x._1, x._2)).collect().mkString(" | "))
    println(prime(myGraph, 3L).vertices.map(x => (x._1, x._2)).collect().mkString(" | "))
    floydWarshall(myGraph).vertices.foreach(println)

输出依次如下:

ShortestPaths:
(1,Map(3 -> 2)) | (2,Map(3 -> 1)) | (3,Map(3 -> 0)) | (4,Map(3 -> 2)) | (5,Map(3 -> 1)) | (6,Map(3 -> 2)) | (7,Map(3 -> 2))
Dijkstra:
(1,(A,15.0,2)) | (2,(B,8.0,3)) | (3,(C,0.0,-1)) | (4,(D,17.0,2)) | (5,(E,5.0,3)) | (6,(F,13.0,5)) | (7,(G,14.0,5))
Prime:
(1,(A,7.0,2)) | (2,(B,7.0,5)) | (3,(C,0.0,-1)) | (4,(D,5.0,1)) | (5,(E,5.0,3)) | (6,(F,6.0,4)) | (7,(G,9.0,5))
FloydWarshall:
(4,Map(5 -> 14.0, 1 -> 5.0, 6 -> 6.0, 2 -> 9.0, 7 -> 17.0, 3 -> 17.0, 4 -> 0.0))
(2,Map(5 -> 7.0, 1 -> 7.0, 6 -> 15.0, 2 -> 0.0, 7 -> 16.0, 3 -> 8.0, 4 -> 9.0))
(7,Map(5 -> 9.0, 1 -> 22.0, 6 -> 11.0, 2 -> 16.0, 7 -> 0.0, 3 -> 14.0, 4 -> 17.0))
(5,Map(5 -> 0.0, 1 -> 14.0, 6 -> 8.0, 2 -> 7.0, 7 -> 9.0, 3 -> 5.0, 4 -> 14.0))
(3,Map(5 -> 5.0, 1 -> 15.0, 6 -> 13.0, 2 -> 8.0, 7 -> 14.0, 3 -> 0.0, 4 -> 17.0))
(1,Map(5 -> 14.0, 1 -> 0.0, 6 -> 11.0, 2 -> 7.0, 7 -> 22.0, 3 -> 15.0, 4 -> 5.0))
(6,Map(5 -> 8.0, 1 -> 11.0, 6 -> 0.0, 2 -> 15.0, 7 -> 11.0, 3 -> 13.0, 4 -> 6.0))

友情链接

© 著作权归作者所有

绝世武神
粉丝 20
博文 33
码字总数 48343
作品 0
海淀
程序员
私信 提问
Spark GraphX宝刀出鞘,图文并茂研习图计算秘笈与熟练的掌握Scala语言【大数据Spark

Spark GraphX宝刀出鞘,图文并茂研习图计算秘笈 大数据的概念与应用,正随着智能手机、平板电脑的快速流行而日渐普及,大数据中图的并行化处理一直是一个非常热门的话题。图计算正在被广泛地...

Spark亚太研究院
2014/08/29
1K
0
Spark之GraphX的特点

1.基于内存实现了数据的复用与快速读取 具有较多迭代次数是图计算算法的一个重要特点。在海量数据背景下,如何保证图计算算法的执行效率是所有图计算模型面对的一个难题。基于MapReduce的图计...

mmake1994
2018/04/16
0
0
Spark 数据分析导论-笔记

Spark Core Spark Core 实现了Spark 的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。 Spark Core 中还包含了 对弹性分布式数据集(resilient distributed dataset,简...

Java搬砖工程师
2018/12/26
28
0
Spark2.1.0之模块设计

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/80386736 在阅读本文之前,读者最好已经阅读了《Spark2.1.0之初识Spark》和《Spark...

泰山不老生
2018/06/05
0
0
Spark—4(Spark核心组件)

1、Spark Streaming Spark Sreaming基于Spark Core实现了可扩展、高吞吐和容错的实时数据流处理。现在支持的数据源和处理后的结果存储如下图所示。 Spark Streaming将流式计算分解成一系列短...

叶枫啦啦
07/09
33
0

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周六乱弹 —— 早上儿子问我他是怎么来的

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @凉小生 :#今日歌曲推荐# 少点戾气,愿你和这个世界温柔以待。中岛美嘉的单曲《僕が死のうと思ったのは (曾经我也想过一了百了)》 《僕が死の...

小小编辑
今天
2.1K
14
Excption与Error包结构,OOM 你遇到过哪些情况,SOF 你遇到过哪些情况

Throwable 是 Java 中所有错误与异常的超类,Throwable 包含两个子类,Error 与 Exception 。用于指示发生了异常情况。 Java 抛出的 Throwable 可以分成三种类型。 被检查异常(checked Exc...

Garphy
今天
38
0
计算机实现原理专题--二进制减法器(二)

在计算机实现原理专题--二进制减法器(一)中说明了基本原理,现准备说明如何来实现。 首先第一步255-b运算相当于对b进行按位取反,因此可将8个非门组成如下图的形式: 由于每次做减法时,我...

FAT_mt
昨天
40
0
好程序员大数据学习路线分享函数+map映射+元祖

好程序员大数据学习路线分享函数+map映射+元祖,大数据各个平台上的语言实现 hadoop 由java实现,2003年至今,三大块:数据处理,数据存储,数据计算 存储: hbase --> 数据成表 处理: hive --> 数...

好程序员官方
昨天
61
0
tabel 中含有复选框的列 数据理解

1、el-ui中实现某一列为复选框 实现多选非常简单: 手动添加一个el-table-column,设type属性为selction即可; 2、@selection-change事件:选项发生勾选状态变化时触发该事件 <el-table @sel...

everthing
昨天
20
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部