
Spark GraphX Pregel API: An Example

openthings
Published 2016/09/02 14:23

GraphX Pregel API

Graph data and graph processing have been getting more and more attention lately in various fields. It has become apparent that a large number of real-world problems can be described in terms of graphs, for instance the Web graph, social network graphs, train network graphs and language graphs. These graphs are often exceptionally large: the number of web pages, for example, is estimated to have exceeded 30 billion. We need systems that are able to process the graphs created by modern applications.

The importance of graph data has driven the development of numerous tools and frameworks for graph processing and graph storage, for instance Apache Giraph, GraphLab, Spark GraphX, Neo4j and Titan. In this article, we will focus on one of them, namely the GraphX Pregel API.

What is GraphX

Most of us have heard of the Apache Spark cluster computing framework and how the Databricks team used Spark to set a new world record in large-scale sorting. GraphX is a distributed graph processing framework built on top of Spark. It started off as a research project at UC Berkeley AMPLab and Databricks (have a look at this paper), and it is now part of the Apache Spark project.


GraphX extends the Spark Resilient Distributed Dataset (RDD) by introducing the Resilient Distributed Property Graph on top of it. GraphX ships with a collection of graph algorithms (e.g., PageRank, Triangle Counting and Connected Components) as well as fundamental graph operations. It also provides an optimised variant of the Pregel API. Pregel is a popular graph processing architecture developed at Google and described in a 2010 paper, and it aims to solve some of the problems faced in large-scale graph processing.

Pregel API

As mentioned earlier, graphs created by modern applications can be very large (potentially terabytes or petabytes in size), so a single node (computer) cannot hold the whole graph in memory. The way forward is to partition the graph and process the partitions in parallel on many machines.
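To make "partition the graph" concrete, here is a minimal plain-Scala sketch (no Spark involved) that splits an edge list into `numParts` groups by source vertex id, so all outgoing edges of a vertex end up on the same machine. This is similar in spirit to GraphX's `PartitionStrategy.EdgePartition1D`, but the object and function names and the simple modulo hash are hypothetical choices for illustration, not GraphX's implementation.

```scala
// Hypothetical helper for illustration only; not part of GraphX.
object EdgePartitioning {
  type VertexId = Long

  /** Groups edges (src, dst) into partitions 0 until numParts using the source id,
    * so every outgoing edge of one vertex is stored in the same partition. */
  def partitionBySource(edges: Seq[(VertexId, VertexId)],
                        numParts: Int): Map[Int, Seq[(VertexId, VertexId)]] = {
    require(numParts > 0, "numParts must be positive")
    // Simple modulo hash; adding numParts keeps the result non-negative for negative ids.
    edges.groupBy { case (src, _) => (((src % numParts) + numParts) % numParts).toInt }
  }
}
```

With the edges of the example used later in this article and two partitions, the out-edges of vertices 1 and 3 land in partition 1 and the single out-edge of vertex 2 in partition 0. Each partition can then be processed by a different machine.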

Pregel employs a vertex-centric approach to processing large distributed graphs. As described in this paper, a Pregel computation consists of a sequence of iterations called supersteps. During a superstep S, the following happens in the framework:

  • Each vertex V receives and reads the messages that were sent to it during the previous superstep S-1.
  • A user-defined function f is applied to each vertex in parallel, so f specifies the behaviour of a single vertex V in a single superstep S.
  • f can mutate the state of V.
  • f can send messages to other vertices (typically along outgoing edges), which those vertices will receive in the next superstep S+1.

In this approach, all communication happens between supersteps S and S+1: a message sent during superstep S is only read in superstep S+1. Within each superstep, the same user-defined function f is applied to all of the vertices in parallel.
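The S to S+1 rule can be sketched in plain Scala. This is a minimal model and not how Pregel or GraphX are implemented: the names `Superstep`, `Outcome` and `run` are hypothetical, vertex state lives in an ordinary `Map`, and `f` returns the new value, whether the vertex votes to halt, and the messages it wants to send. Messages emitted during superstep S are only collected into the inbox for S+1, so no vertex ever sees a message from the superstep it is currently in.

```scala
// Hypothetical single-machine model of one Pregel superstep; not Pregel's or GraphX's code.
object Superstep {
  type VertexId = Long

  /** What f decides for one vertex in one superstep. */
  final case class Outcome[V, M](newValue: V, voteToHalt: Boolean, outgoing: Seq[(VertexId, M)])

  /** Runs superstep S on the active vertices. `inbox` holds the messages sent in S-1;
    * the returned inbox holds the messages sent in S, to be read in S+1. */
  def run[V, M](values: Map[VertexId, V],
                active: Set[VertexId],
                inbox: Map[VertexId, Seq[M]],
                f: (VertexId, V, Seq[M]) => Outcome[V, M])
      : (Map[VertexId, V], Set[VertexId], Map[VertexId, Seq[M]]) = {
    // Conceptually parallel: every f call sees the same snapshot of values and inbox.
    val outcomes = active.toSeq.map(v => v -> f(v, values(v), inbox.getOrElse(v, Seq.empty)))
    val newValues = values ++ outcomes.map { case (v, o) => v -> o.newValue }
    val stillActive = outcomes.collect { case (v, o) if !o.voteToHalt => v }.toSet
    // Buffer all outgoing messages by destination for the next superstep.
    val nextInbox = outcomes.flatMap(_._2.outgoing)
      .groupBy(_._1)
      .map { case (dst, ms) => dst -> ms.map(_._2) }
    (newValues, stillActive, nextInbox)
  }
}
```

Keeping the outgoing messages in a separate inbox, rather than writing them straight into the receivers' state, is what lets every vertex in a superstep run independently of the others.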

Termination condition

Each vertex is either active or inactive. In each superstep, a vertex that receives a message becomes active if it is not already, and f is applied only to the active vertices.

In superstep 0, all vertices are active and they all receive a user-defined initial message, m. They then perform the operations described above; at the end of the superstep, the vertices that remain active send messages to other vertices (typically along outgoing edges), while inactive vertices do nothing.

The iteration terminates when all vertices are inactive and no messages remain to be delivered.
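The activation and termination rules reduce to two small set operations. In this plain-Scala sketch (hypothetical names, not part of any library), `stillActive` holds the vertices that did not vote to halt in the last superstep and `inbox` holds the messages waiting for the next one.

```scala
// Hypothetical helpers modelling Pregel's activation and termination rules.
object Termination {
  type VertexId = Long

  /** A vertex runs in the next superstep if it did not vote to halt,
    * or if at least one message is waiting for it (a message reactivates it). */
  def nextActive[M](stillActive: Set[VertexId], inbox: Map[VertexId, Seq[M]]): Set[VertexId] =
    stillActive ++ inbox.collect { case (v, msgs) if msgs.nonEmpty => v }

  /** The computation stops once no vertex is active and no message is in flight. */
  def isDone[M](stillActive: Set[VertexId], inbox: Map[VertexId, Seq[M]]): Boolean =
    nextActive(stillActive, inbox).isEmpty
}
```

For instance, if only one vertex is still active and it has just sent a message to a halted neighbour, `nextActive` returns both vertices and `isDone` is false.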

Example

Let’s look at a simple example: given the graph below with 4 nodes, each holding an integer value, we wish to find the minimum value in the graph.

[Figure: the initial graph with four vertices, alongside the pseudo-code of f]

The figure above also gives the pseudo-code definition of f, which will:

  • Set originalValue to the current value of the vertex.
  • Set the value of the vertex to the minimum of originalValue and all incoming messages.
  • If originalValue and value are the same, render the vertex inactive. Otherwise, send the value as a message to all of its outgoing neighbours.
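The pseudo-code of f translates almost line for line into Scala. In this sketch, the `MinVertexProgram` object and `Step` class are hypothetical names. f returns the new value, the original value and whether the vertex becomes inactive. Sending `value` to the outgoing neighbours of a vertex that does not halt is left to the caller.

```scala
// Hypothetical plain-Scala version of the pseudo-code f; not GraphX code.
object MinVertexProgram {
  /** Result of applying f to one vertex in one superstep. */
  final case class Step(value: Int, originalValue: Int, halt: Boolean)

  def f(currentValue: Int, messages: Seq[Int]): Step = {
    val originalValue = currentValue                   // remember the value before the update
    val value = (originalValue +: messages).min        // minimum of old value and all messages
    Step(value, originalValue, halt = value == originalValue) // unchanged => become inactive
  }
}
```

For example, a vertex holding 3 that receives 7 keeps 3 and halts, while a vertex holding 7 that receives 2 switches to 2 and stays active.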

In the diagrams that follow, active vertices are shaded green. After f is applied, the internal value of a vertex is shown as m(n), where m is the new value and n is the original value.

In superstep 0, all vertices are initialised to the active state with -1 as their originalValue. In this superstep, the vertices send their values as messages to all of their outgoing neighbours, as illustrated in the following diagram.

[Figure: superstep 0]

In superstep 1, f is applied to all active vertices. A vertex that receives a message (from superstep 0) smaller than its current value updates its current value to that smaller value (while keeping track of the original value); all other vertices remain unchanged.

Once that is done, vertices that satisfy the predicate originalValue == value are switched to the inactive state (shaded pink). For instance, the upper right vertex receives the message 7, so it sets its originalValue to 3 and its value to min(3, 7) = 3; now originalValue == value, which renders it inactive. Inactive vertices do not send any further messages to their neighbours, while the remaining active vertices send their value to their outgoing neighbours.

[Figure: superstep 1]

Similar things happen in superstep 2. Vertices that receive one or more messages become active if they are not already (e.g., the upper right vertex was inactive in the previous superstep but becomes active in this one because it has an incoming message). Then f is applied to all the active vertices, updating some of them and making some of them inactive along the way, before further messages are sent. In our case, only one active vertex is left (upper right), and it sends the message 2 to its neighbour.

[Figure: superstep 2]

Once again, in superstep 3, f is applied to the active vertices (upper right and lower right in our case). This time, however, every vertex ends up with originalValue == value, so all vertices become inactive and no further messages are sent. We have reached the termination condition, and every vertex in the new graph now holds the minimum value.
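The walkthrough can be replayed with a small plain-Scala simulation. It assumes the vertex ids and edges used in the GraphX section below: vertex 1 holds 7, vertex 2 holds 3, vertex 3 holds 2, vertex 4 holds 6, and the edges are 1→2, 1→4, 2→4, 3→1 and 3→4. `MinValueTrace` is a hypothetical name. The sketch follows the model described in this article rather than GraphX: a vertex runs f if it is still active or has received a message, and it halts when its value does not change.

```scala
// Hypothetical simulation of the diagrams; models the description above, not GraphX.
object MinValueTrace {
  type VertexId = Long

  /** Returns the vertex values after each superstep (index 0 = after superstep 0). */
  def run(initial: Map[VertexId, Int],
          outEdges: Map[VertexId, Seq[VertexId]]): Seq[Map[VertexId, Int]] = {
    // Every sender passes its current value along each outgoing edge;
    // the result is the inbox read in the next superstep.
    def send(senders: Set[VertexId], values: Map[VertexId, Int]): Map[VertexId, Seq[Int]] =
      senders.toSeq
        .flatMap(v => outEdges.getOrElse(v, Seq.empty).map(dst => dst -> values(v)))
        .groupBy(_._1)
        .map { case (dst, pairs) => dst -> pairs.map(_._2) }

    var values = initial
    var active: Set[VertexId] = initial.keySet   // superstep 0: every vertex is active
    var inbox = send(active, values)             // ... and sends its value
    val history = scala.collection.mutable.ArrayBuffer(values)

    while (active.nonEmpty || inbox.nonEmpty) {
      // Still-active vertices plus every vertex with an incoming message run f.
      val running = active ++ inbox.keySet
      val updated = running.toSeq.map { v =>
        v -> (values(v) +: inbox.getOrElse(v, Seq.empty)).min
      }.toMap
      // Vertices whose value did not change vote to halt; the rest send their new value.
      active = running.filter(v => updated(v) != values(v))
      values = values ++ updated
      inbox = send(active, values)
      history += values
    }
    history.toList
  }
}
```

On that graph the simulation records four snapshots (after supersteps 0 to 3). After superstep 1 the values are 2, 3, 2 and 2 for vertices 1 to 4, and in the last snapshot every vertex holds 2.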

[Figure: superstep 3]

We have shown how the Pregel model works with a simple example, and we mentioned that GraphX exposes a variant of the Pregel API. In the next section, we will implement the same method in GraphX and look at how it works there.

Example in GraphX

In GraphX, a graph is a property graph that is parameterised over the vertex (VD) and edge (ED) attribute types. Each vertex is keyed by a unique 64-bit long identifier (VertexId).

```scala
// Simplified view of the GraphX Graph class (the actual class is abstract)
abstract class Graph[VD, ED] {
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
}
```

VertexRDD[VD] extends RDD[(VertexId, VD)] and adds the constraint that each VertexId occurs only once, while EdgeRDD[ED] extends RDD[Edge[ED]].

Let’s take the same example as in the previous section.

[Figure: the initial graph of the example, as used in GraphX]

Our graph is a simple graph. We model each vertex as having a property of type (Int, Int) (the current value and the original value), while edges carry no meaningful property; for the sake of simplicity, we give every edge a Boolean property that is always true. So our graph has the type Graph[(Int, Int), Boolean]. Let’s write the code to generate the graph.

```scala
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// `sc` is an existing SparkContext (predefined in spark-shell)

// Create an RDD for the vertices: (id, (value, originalValue))
val vertices: RDD[(VertexId, (Int, Int))] =
  sc.parallelize(Array((1L, (7, -1)), (2L, (3, -1)),
                       (3L, (2, -1)), (4L, (6, -1))))

// Create an RDD for the edges; the Boolean attribute is always true
val relationships: RDD[Edge[Boolean]] =
  sc.parallelize(Array(Edge(1L, 2L, true), Edge(1L, 4L, true),
                       Edge(2L, 4L, true), Edge(3L, 1L, true),
                       Edge(3L, 4L, true)))

// Create the graph
val graph = Graph(vertices, relationships)

// Check the graph
graph.vertices.collect.foreach(println)
// output (order may vary):
//    (1,(7,-1))
//    (2,(3,-1))
//    (3,(2,-1))
//    (4,(6,-1))
```

Pregel API in GraphX

So now, graph is the GraphX representation of the initial graph shown in the diagram above. Let’s take a look at the Pregel API in GraphX.

```scala
def pregel[A]
    (initialMsg: A,
     maxIter: Int = Int.MaxValue,
     activeDir: EdgeDirection = EdgeDirection.Out)
    (vprog: (VertexId, VD, A) => VD,
     sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
     mergeMsg: (A, A) => A)
  : Graph[VD, ED]
```

The first argument list contains configuration parameters:

  • initialMsg is the user-defined message that every vertex receives before superstep 0.
  • maxIter is the maximum number of iterations to perform before terminating.
  • activeDir determines which edges sendMsg is run on: it is the direction of edges incident to the vertices that received a message in the previous iteration. Normally this is EdgeDirection.Out, so only the outgoing edges of those vertices are considered.
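The effect of activeDir can be modelled on a plain edge list. The sketch below is not GraphX code: `ActiveDirection`, `Dir` and `edgesToVisit` are hypothetical stand-ins that mimic the four `EdgeDirection` values (`Out`, `In`, `Either`, `Both`) and return the edges on which sendMsg would be called, given the set of vertices that received a message in the previous iteration.

```scala
// Hypothetical model of activeDir; GraphX's real type is EdgeDirection.
object ActiveDirection {
  type VertexId = Long

  sealed trait Dir
  case object Out extends Dir    // source received a message
  case object In extends Dir     // destination received a message
  case object Either extends Dir // at least one endpoint received a message
  case object Both extends Dir   // both endpoints received a message

  /** Edges (src, dst) on which sendMsg would run in the next iteration. */
  def edgesToVisit(edges: Seq[(VertexId, VertexId)],
                   received: Set[VertexId],
                   dir: Dir): Seq[(VertexId, VertexId)] =
    edges.filter { case (src, dst) =>
      dir match {
        case Out    => received(src)
        case In     => received(dst)
        case Either => received(src) || received(dst)
        case Both   => received(src) && received(dst)
      }
    }
}
```

With the example graph from this article, vertices 1, 2 and 4 receive messages after the initial pass, so `Out` keeps only the edges leaving them (1→2, 1→4, 2→4); the edges leaving vertex 3 are skipped.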

The second argument list contains the functions that handle state mutation and message passing:

  • vprog is the user-defined vertex program: it receives a vertex id, the vertex's current attribute and the incoming (merged) message, and returns the new attribute.
  • sendMsg is the user-defined function that determines which messages to send out for the next iteration and where to send them.
  • mergeMsg is the user-defined function that merges multiple messages arriving at the same vertex at the start of a superstep, before the vertex program vprog is applied.
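mergeMsg is applied pairwise to however many messages reach a vertex. Because GraphX may combine those messages in any order (for example, partly within each partition before they are shuffled), it should be commutative and associative, as min is. Here is a plain-Scala sketch of the merging step, using a hypothetical `mergeByDestination` helper:

```scala
// Hypothetical helper modelling the mergeMsg step; not part of GraphX.
object MessageMerging {
  type VertexId = Long

  /** Collapses all messages addressed to the same vertex into one message,
    * which is what vprog then receives. */
  def mergeByDestination[A](sent: Seq[(VertexId, A)])(mergeMsg: (A, A) => A): Map[VertexId, A] =
    sent.groupBy(_._1).map { case (dst, pairs) => dst -> pairs.map(_._2).reduce(mergeMsg) }
}
```

Reversing the input does not change the result, which is exactly what commutativity and associativity guarantee: with `_ min _`, vertex 4 ends up with 2 no matter in which order its messages 7, 3 and 2 are merged.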

Edge Triplet

Notice the EdgeTriplet[VD, ED] in the signature of sendMsg? An edge triplet represents an edge together with the attributes of its source and destination vertices. It has the following properties:

  • srcId The source vertex id.
  • srcAttr The source vertex attribute.
  • dstId The destination vertex id.
  • dstAttr The destination vertex attribute.
  • attr The edge attribute.

Graphically, the difference between Vertices, Edges and Triplets is illustrated in the diagram below.

[Figure: vertices, edges and triplets compared]
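A triplet is essentially an edge joined with both endpoint attributes. The plain-Scala sketch below mirrors the five fields listed above with a hypothetical `Triplet` case class (GraphX's real class is `EdgeTriplet`, which extends `Edge`). It also shows how the article's sendMsg logic only needs `srcAttr` and `dstId`.

```scala
// Hypothetical mirror of EdgeTriplet for illustration; not GraphX code.
object Triplets {
  type VertexId = Long

  final case class Triplet[VD, ED](srcId: VertexId, srcAttr: VD,
                                   dstId: VertexId, dstAttr: VD, attr: ED)

  /** Joins every edge (src, dst, attr) with the attributes of both endpoints. */
  def triplets[VD, ED](vertices: Map[VertexId, VD],
                       edges: Seq[(VertexId, VertexId, ED)]): Seq[Triplet[VD, ED]] =
    edges.map { case (src, dst, attr) =>
      Triplet(src, vertices(src), dst, vertices(dst), attr)
    }

  /** The article's sendMsg logic on this Triplet: send the source value to the
    * destination unless both components of the source attribute are equal. */
  def minSendMsg(t: Triplet[(Int, Int), Boolean]): Iterator[(VertexId, Int)] =
    if (t.srcAttr._1 == t.srcAttr._2) Iterator.empty
    else Iterator((t.dstId, t.srcAttr._1))
}
```

For the edge 1→2 of the example, the triplet carries (7,-1) as srcAttr, so the sendMsg logic emits the message 7 for vertex 2.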

User-defined functions

Now that we have seen the GraphX Pregel API, let’s start by defining our own vprog, sendMsg and mergeMsg functions.

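```scala
import org.apache.spark.graphx.{EdgeTriplet, VertexId}

// Initial message; assumes no real vertex value equals 9999
val initialMsg = 9999

// Vertex program: the attribute is (value, originalValue)
def vprog(vertexId: VertexId, value: (Int, Int), message: Int): (Int, Int) = {
  if (message == initialMsg)
    value                            // initial call: keep the attribute as it is
  else
    (message min value._1, value._1) // (new value, previous value)
}

// Send the source value along the edge unless the source value did not change
def sendMsg(triplet: EdgeTriplet[(Int, Int), Boolean]): Iterator[(VertexId, Int)] = {
  val sourceVertex = triplet.srcAttr
  if (sourceVertex._1 == sourceVertex._2)
    Iterator.empty
  else
    Iterator((triplet.dstId, sourceVertex._1))
}

// Several messages for the same vertex are combined by keeping the smallest
def mergeMsg(msg1: Int, msg2: Int): Int = msg1 min msg2
```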

We have chosen 9999 as the initial message, sent to all vertices in superstep 0 to get things started, on the assumption that all vertex values are smaller than 9999, so that no real message can be mistaken for the initial one. Feel free to choose Int.MaxValue instead if needed.
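Why does vprog need the `message == initialMsg` test at all? GraphX runs vprog on every vertex with initialMsg before the first round of sendMsg, and the article's sendMsg stays silent when the two components of the source attribute are equal. The plain-Scala sketch below (hypothetical names, tuples instead of a graph) shows that without the test, the initial call would overwrite -1 with the current value, so no vertex would ever send a message. It also shows the flip side of the 9999 assumption.

```scala
// Hypothetical plain-Scala replicas of the article's functions, for comparison only.
object SentinelCheck {
  /** vprog from the article on plain tuples (value, originalValue). */
  def vprog(initialMsg: Int)(value: (Int, Int), message: Int): (Int, Int) =
    if (message == initialMsg) value else (message min value._1, value._1)

  /** Hypothetical variant without the sentinel test. */
  def vprogNoCheck(value: (Int, Int), message: Int): (Int, Int) =
    (message min value._1, value._1)

  /** The article's sendMsg condition for a source attribute. */
  def wouldSend(srcAttr: (Int, Int)): Boolean = srcAttr._1 != srcAttr._2
}
```

With the test, the initial call leaves (7,-1) untouched and vertex 1 still sends its value. Without it, the initial call turns (7,-1) into (7,7) and vertex 1 falls silent. Conversely, a genuine message equal to the sentinel would be ignored, which is why the initial message must be a value that never occurs in the data.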

With all the functions defined, let’s feed them into the GraphX Pregel API as shown below.

```scala
import org.apache.spark.graphx.EdgeDirection

val minGraph = graph.pregel(initialMsg,
                            Int.MaxValue,
                            EdgeDirection.Out)(
                            vprog,
                            sendMsg,
                            mergeMsg)
```

Once the execution finishes, minGraph contains the resulting graph. Since it is a small graph, we can inspect it by printing out its vertices.

```scala
minGraph.vertices.collect.foreach {
  case (vertexId, (value, originalValue)) => println(value)
}
// output:
//    2
//    2
//    2
//    2
```

Thus, the minimum value in the graph is 2.
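The GraphX run behaves slightly differently from the diagrams. There is no explicit vote to halt, and after the first round a vertex runs vprog only if it has received a message. The plain-Scala sketch below imitates, on local collections, the documented behaviour of the Pregel operator with EdgeDirection.Out: vprog runs on every vertex with initialMsg, sendMsg runs once on every edge, and afterwards only vertices that received a message run vprog, and only their out-edges run sendMsg. It is a model, not GraphX's code. `GraphXStylePregel.run` is a hypothetical name, and sendMsg receives the triplet fields as separate arguments instead of an `EdgeTriplet`.

```scala
// Hypothetical local model of GraphX's Pregel loop with EdgeDirection.Out.
object GraphXStylePregel {
  type VertexId = Long

  def run[VD, A](vertices: Map[VertexId, VD],
                 edges: Seq[(VertexId, VertexId)],
                 initialMsg: A,
                 maxIter: Int = Int.MaxValue)(
                 vprog: (VertexId, VD, A) => VD,
                 sendMsg: (VertexId, VD, VertexId, VD) => Iterator[(VertexId, A)],
                 mergeMsg: (A, A) => A): Map[VertexId, VD] = {
    // sendMsg on the out-edges of "active" sources, then mergeMsg per destination.
    def messages(g: Map[VertexId, VD], active: VertexId => Boolean): Map[VertexId, A] =
      edges.filter { case (src, _) => active(src) }
        .flatMap { case (src, dst) => sendMsg(src, g(src), dst, g(dst)) }
        .groupBy(_._1)
        .map { case (dst, ms) => dst -> ms.map(_._2).reduce(mergeMsg) }

    var g = vertices.map { case (id, attr) => id -> vprog(id, attr, initialMsg) }
    var inbox = messages(g, _ => true)  // the first round considers every edge
    var i = 0
    while (inbox.nonEmpty && i < maxIter) {
      // Only vertices that received a message run vprog.
      g = g ++ inbox.map { case (id, msg) => id -> vprog(id, g(id), msg) }
      inbox = messages(g, inbox.keySet)
      i += 1
    }
    g
  }
}
```

Fed with the article's vprog, sendMsg and mergeMsg logic on the example graph, the model ends with the attributes (2,7), (2,3), (2,-1) and (2,2) for vertices 1 to 4. Vertex 3 never receives a message, so it keeps -1 as its second component, yet its value 2 is still the minimum.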

Conclusion

We have demonstrated a simple example of the Pregel API in GraphX, which is enough to get us started. Of course, this is not the only way to find the minimum value in a graph; other algorithms might make use of the edge attributes to solve the same problem. Since the publication of the Pregel paper in 2010, plenty of examples that use Pregel to solve complex graph problems have appeared online. There is also Apache Giraph, an open-source implementation of Pregel.

In the next article, we will talk about GraphX in more detail and how we can make good use of its graph parallelism to solve complex graph problems effectively. So stay tuned.

Reposted from: http://www.cakesolutions.net/teamblogs/graphx-pregel-api-an-example
