基于spark的图计算框架GraphX

原创
2014/09/04 11:24
阅读数 862

基于Spark的GraphX.pptx

1. Property Graph:用户定义的有向图,图中的每个顶点和每条边都附加一个用户定义的对象,允许在两个顶点之间并行存在多条边。每个顶点都具有一个64位的唯一标识(VertexID),GraphX并不强制VertexID有序。每条边则由起始和终止VertexID标识。

Graph具有两个参数化的类型:Vertex(VD)和Edge(ED),分别对应附加在顶点和边上的对象。当VD和ED为基本的数据类型时,Graph将把它们保存在数组中。

Graph和RDD一样(spark的基本数据类型,Resilient Distributed Dataset),创建之后不可再改变,分布式存储在集群上,并且具有容错能力。对图中结构和值的改变,都将需要产生一个新的Graph对象,新的Graph将与之前的Graph共享大部分数据结构。Graph通过顶点分割方法,分割在不同的机器上。任何数据分片所在机器的失败都将引发该数据分片在其它机器上重新创建。

逻辑上Graph包含VertexRDD和EdgeRDD,即:

    class Graph[VD,ED] {

    val vertices: VertexRDD[VD]

    val edges: EdgeRDD[ED,VD]

    }

    其中,VertexRDD[VD]和EdgeRDD[ED,VD]分别是RDD[VertexID,VD]和RDD[Edge[ED]]经过优化(extends)后的版本,提供了图计算相关功能,并做了内部优化。

    2. Graph类的成员变量

    class Graph[VD, ED] {
      //Graph的基本信息:边数,顶点数,入度,出度,度
      val numEdges: Long
      val numVertices: Long
      val inDegrees: VertexRDD[Int]
      val outDegrees: VertexRDD[Int]
      val degrees: VertexRDD[Int]
      //Graph的顶点RDD,边RDD,以及三元组RDD
      val vertices: VertexRDD[VD]
      val edges: EdgeRDD[ED, VD]
      val triplets: RDD[EdgeTriplet[VD, ED]]
     }
    class Graph[VD, ED] {
      def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
      def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
      def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
    }
    上面的每个操作都将改变Graph中vertex和edge特性,并产生一个新的Graph
class Graph[VD, ED] {
  def reverse: Graph[VD, ED]
  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
               vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
  def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED]
  def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED]
}

reverse操作返回一个愿图中边的方向反转的新图。由于该操作没有改变顶点和边的特性,所以不需要数据的移动

subgraph操作返回连接的点和边满足vpred和epred构成的子图

mask操作返回两个图相交的子图,groupEdges操作合并重复的边

5. 连接操作

class Graph[VD, ED] {
  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD)
    : Graph[VD, ED]
  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]}

joinVertices操作,连接顶点和输入的RDD,然后对连接得到的顶点,应用用户定义的map函数,若在RDD中没有匹配连接的顶点,则保持顶点原有的值不变

outerjoinVertices类似于joinVertices,只是用户定义的map函数应用于所有的顶点,且可以改变顶点的类型

其中f(a)(b)的写法类似于f(a,b),只是参数b的类型取决于a

6. 邻域聚合

GraphX中,经过深度优化的核心聚合操作是mapReduceTriplets

class Graph[VD, ED] {
  def mapReduceTriplets[A](
      map: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      reduce: (A, A) => A)
    : VertexRDD[A]}

mapReduceTriplets接收一个用户定义的map函数,应用于Graph的每一个三元组,产生消息(message)給三元组中的任意顶点。为了便于预先聚合优化,暂时只支持給其中一个顶点发送消息。随后,用户定义的reduce函数结合发送给每一个顶点的消息。最终返回VertexRDD[A],没有收到消息的顶点不包含在该结果之中

mapRedeceTriplets还包含一个可选的参数:activeSetOpt,指定执行map操作的顶点集合

7. 在spark中,RDD默认是不会一直保存在内存中的,为了避免重复计算,需要显式的指定:Graph.cache(),显式指定保存在内存中的RDD只有在系统内存不足时,才会强制采用LRU(least recently uesd)方式调出内存。然而,对于迭代计算则应该uncaching迭代产生的中间数据,因此,在进行图的迭代计算时,推荐采用Pregel API,它会自动的unpersist不需要的中间结果。

8. GraphX Pregel API

图天然就是一个递归的数据结构,图中顶点的特性取决于它们邻域顶点的特性,反过来又影响其邻域顶点的特性。因此,很多重要的图算法都需要迭代计算每个顶点的特性,直到收敛。GraphX提供类似于Pregel的操作,其是Google Pregel和GraphLab框架抽象的结合。

class GraphOps[VD, ED] {
  def pregel[A]
      (initialMsg: A,    //初始消息,最大迭代次数,消息传递方向
       maxIter: Int = Int.MaxValue,
       activeDir: EdgeDirection = EdgeDirection.Out)
      (vprog: (VertexId, VD, A) => VD,
       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
       mergeMsg: (A, A) => A)
    : Graph[VD, ED] = {
    var g = mapVertices( (vid, vdata) => vprog(vid, vdata, initialMsg) ).cache()
    var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
    var activeMessages = messages.count()
    var i = 0
    while (activeMessages > 0 && i < maxIterations) {
      val newVerts = g.vertices.innerJoin(messages)(vprog).cache()
      g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }.cache()
      messages = g.mapReduceTriplets(sendMsg, mergeMsg, Some((newVerts, activeDir))).cache()
      activeMessages = messages.count()
      i += 1
    }
    g
  }
}

9. 创建Graph

GraphX提供了根据顶点和边RDD或者从磁盘上创建图的方法。默认情况下,图构建器不会重新分割图的边,即边将留在它们起始分片所在机器。然而,Graph.groupEdges要求将图重新分片,因为该操作假设相同的边处在相同的分片中。所以需先调用Graph.partitionBy操作。

GraphLoader.edgeListFile操作,从磁盘加载图,解析sourceVD destinationVD,跳过#开始的注释行,顶点值默认为1

10.VertexRDD和EdgeRDD

GraphX提供Graph的VertexRDD和EdgeRDD,由于GraphX对顶点和边的数据结构进行了优化,因此还提供一些额外的功能。Vertex[A]继承自RDD[VertexID,A],并且约束VertexID只能出现一次,采用哈希表的方式存储顶点属性A。EdgeRDD继承自RDD[Edge[ED]]依据策略PartitionStrategy,将边保存在分块中。在每个分块中,边的结构和属性保存在不同的结构中。

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部