Spark GraphX 基础操作
JPblog 发表于6个月前
• 发表于 6个月前
• 阅读 11
• 收藏 0
• 评论 0

1. 目的

由数组构造图（Graph），并演示 GraphX 的基础操作

2. 素材

由Array生成vertexRDD和edgeRDD

3. 代码

/**
* Created by puwenchao on 2016-07-06.
*/
package test
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._
import org.apache.spark.graphx.lib.ShortestPaths
import org.apache.spark.rdd.RDD

object triangle {

  /** Attribute given to any vertex referenced by an edge but absent from the vertex array. */
  private val MissingVertex: (String, Int) = ("Missing", 0)

  /**
   * Entry point: builds a small property graph from in-memory arrays on a local Spark
   * master and prints the results of several basic GraphX operations (subgraph,
   * triplets, counts, degrees, an edge filter, triangle count, PageRank, shortest paths).
   *
   * @param args ignored
   */
  def main(args: Array[String]): Unit = {
    // Silence Spark and Jetty logging so only this program's output is shown.
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val conf = new SparkConf().setAppName("triangle").setMaster("local")
    val sc = new SparkContext(conf)
    // Stop the context even if a graph operation throws, so it is never leaked.
    try printGraphOperations(buildGraph(sc))
    finally sc.stop()
  }

  /**
   * Builds the demo graph from in-memory arrays.
   *
   * Vertices carry a (String, Int) attribute and edges carry an Int attribute.
   * Vertices that appear only as edge endpoints receive [[MissingVertex]].
   */
  private def buildGraph(sc: SparkContext): Graph[(String, Int), Int] = {
    val vertexArray: Array[(VertexId, (String, Int))] = Array(
      (3L, ("rxin", 28)),
      (7L, ("jgonzal", 27)),
      (5L, ("franklin", 65)),
      (2L, ("istoica", 42))
    )
    val edgeArray: Array[Edge[Int]] = Array(
      Edge(3L, 7L, 7),
      Edge(5L, 3L, 2),
      Edge(2L, 5L, 4),
      Edge(5L, 7L, 3)
    )

    val vertexRDD: RDD[(VertexId, (String, Int))] = sc.parallelize(vertexArray)
    val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
    // Pass an explicit default; otherwise dangling edge endpoints get a null attribute.
    Graph(vertexRDD, edgeRDD, MissingVertex)
  }

  /**
   * Runs each basic operation on `graph` and prints its result.
   *
   * RDD results are collected before printing so the output appears on the driver
   * rather than in executor logs.
   */
  private def printGraphOperations(graph: Graph[(String, Int), Int]): Unit = {
    // Keep only real vertices. Compare the name field: the Int field can never
    // equal the String sentinel, so testing it would keep every vertex.
    val subgraph = graph.subgraph(vpred = (_, attr) => attr._1 != MissingVertex._1)
    println("subgraph:")
    subgraph.vertices.collect().foreach(println)

    // Triplets: (source vertex, destination vertex, edge attribute).
    println("\ntriplets:")
    graph.triplets.take(10).foreach(println)

    println(s"\nnumEdges:${graph.numEdges}")

    println(s"\nnumVertices:${graph.numVertices}")

    println("\ninDegrees:" + graph.inDegrees.collect().mkString(","))

    println("\noutDegrees:" + graph.outDegrees.collect().mkString(","))

    // Count edges whose source id is greater than their destination id.
    val descendingEdges = graph.edges.filter { case Edge(src, dst, _) => src > dst }.count()
    println(s"\nedge-filter:$descendingEdges")

    // Per-vertex triangle count.
    // NOTE(review): older GraphX releases document that triangleCount expects edges in
    // canonical direction (srcId < dstId) and a graph partitioned with partitionBy;
    // edge 5 -> 3 here is not canonical. Confirm against the Spark version in use.
    println("\ntriangleCount:")
    graph.triangleCount().vertices.collect().foreach(println)

    // PageRank, iterated until it converges within tolerance 0.1.
    println("\npageRank:")
    graph.pageRank(tol = 0.1).vertices.collect().foreach(println)

    // Hop distance from every vertex to landmark vertex 7 (edge attributes are not weights).
    val landmarks: Seq[VertexId] = Seq(7L)
    println("\nShortestPaths:")
    ShortestPaths.run(graph, landmarks).vertices.collect().foreach(println)
  }
}

4. 输出

subgraph:
(3,(rxin,28))
(7,(jgonzal,27))
(5,(franklin,65))
(2,(istoica,42))

triplets:
((2,(istoica,42)),(5,(franklin,65)),4)
((3,(rxin,28)),(7,(jgonzal,27)),7)
((5,(franklin,65)),(3,(rxin,28)),2)
((5,(franklin,65)),(7,(jgonzal,27)),3)

numEdges:4

numVertices:4

inDegrees:(3,1),(7,2),(5,1)

outDegrees:(3,1),(5,2),(2,1)

edge-filter:1

triangleCount:
(3,1)
(7,1)
(5,1)
(2,0)

pageRank:
(3,0.2679375)
(7,0.39543749999999994)
(5,0.27749999999999997)
(2,0.15)

ShortestPaths:
(3,Map(7 -> 1))
(7,Map(7 -> 0))
(5,Map(7 -> 1))
(2,Map(7 -> 2))

×