Spark GraphX Pregel API: An Example

2016/09/02 14:23

GraphX Pregel API

Graph data and graph processing are getting more and more attention lately in various fields. It has become apparent that a large number of real-world problems can be described in terms of graphs, for instance, the Web graph, the social network graph, the train network graph and the language graph. These graphs are often exceptionally large: the Web graph alone is estimated to contain more than 30 billion web pages. We need systems that can process the graphs created by modern applications.

The importance of graph data has driven the development of numerous tools and frameworks for graph processing and graph storage, for instance, Apache Giraph, GraphLab, Spark GraphX, Neo4j, Titan and others. In this article, we will focus on one of them, namely the GraphX Pregel API.

What is GraphX

Most of us have heard of the Apache Spark cluster computing framework and of how the Databricks team used Spark to set a new world record in large-scale sorting. GraphX is a distributed graph processing framework built on top of Spark. It started off as a research project at UC Berkeley AMPLab and Databricks (have a look at this paper) and is now part of the Apache Spark project.


GraphX extends the Spark Resilient Distributed Dataset (RDD) by introducing the Resilient Distributed Property Graph on top of it. GraphX comes with a collection of built-in graph algorithms (e.g., PageRank, Triangle Counting and Connected Components) as well as some fundamental graph operations. It also provides an optimised variant of the Pregel API. Pregel is a popular graph processing architecture developed at Google and described in a 2010 paper; it aims to solve some of the problems faced in large-scale graph processing.

Pregel API

As mentioned earlier, graphs created by modern applications can be very large (potentially terabytes or petabytes in size), so a single machine will not be able to hold all of their vertices and edges in memory. The way forward is to partition the graph across many machines and process the partitions in parallel.

Pregel employs a vertex-centric approach to processing large distributed graphs. As described in this paper, a Pregel computation consists of a sequence of iterations, called supersteps. During a superstep S, the following happens for each vertex V:

  • V receives and reads the messages that were sent to it during the previous superstep S-1.
  • A user-defined function f is applied to V. Since f is applied to all vertices in parallel, it essentially specifies the behaviour of a single vertex V in a single superstep S.
  • f can mutate the state of V.
  • f can send messages to other vertices (typically along outgoing edges), which those vertices will receive in the next superstep S+1.

In this approach, all communication happens between supersteps S and S+1. Within each superstep, the same user-defined function f is applied to all of the vertices in parallel.
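To make the "between S and S+1" rule concrete, here is a minimal single-machine sketch in plain Scala. It is not GraphX code, and the names `Supersteps` and `deliver` are hypothetical. Every message emitted during superstep S is collected first, and only once S has finished is it grouped into the per-vertex inboxes that the vertices read in S+1, so no vertex can observe a message sent in the same superstep.

```scala
object Supersteps {
  type VertexId = Long

  // Messages emitted by all vertices during superstep S, as (target, message) pairs,
  // are grouped into per-vertex inboxes only after S has finished.
  // The resulting inboxes are what the vertices read in superstep S+1.
  def deliver[M](outbox: Seq[(VertexId, M)]): Map[VertexId, List[M]] =
    outbox.groupBy(_._1).map { case (target, msgs) => target -> msgs.map(_._2).toList }
}
```

Vertices with no entry in the resulting map simply receive nothing in S+1.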

Termination condition

Each vertex is either active or inactive. In each superstep, a vertex that receives a message becomes active if it is not already, and f is applied only to vertices that are active.

In superstep 0, all vertices are active and they all receive a user-defined initial message, m. They then perform the operations described above, and at the end of the superstep the vertices that remain active send messages to other vertices (typically along outgoing edges), while inactive vertices do nothing.

The computation terminates when all vertices are inactive and no messages remain to be delivered.
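The two rules above, reactivation on receiving a message and the global stopping test, fit in a couple of lines. The helper below is hypothetical, written over plain Scala sets and maps and assuming integer messages; it is not part of any Pregel or GraphX API.

```scala
object Halting {
  type VertexId = Long

  // A vertex that voted to halt becomes active again as soon as it has mail.
  def activeNext(stillActive: Set[VertexId], inboxes: Map[VertexId, List[Int]]): Set[VertexId] =
    stillActive ++ inboxes.keySet

  // Stop only when every vertex is inactive AND no messages are waiting to be delivered.
  def terminated(stillActive: Set[VertexId], inboxes: Map[VertexId, List[Int]]): Boolean =
    stillActive.isEmpty && inboxes.isEmpty
}
```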


Let’s have a look at a simple example. Given the graph below, with 4 vertices each holding an integer value, we wish to find the minimum value in the graph.


The pseudo-code definition of f is also given in the diagram above. In each superstep, f will:

  • Set originalValue to the current value of the vertex.
  • Set the value of the vertex to the minimum of originalValue and all the incoming messages.
  • If originalValue and value are the same, render the vertex inactive. Otherwise, send value as a message to all of its outgoing neighbours.
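The three steps of f map almost directly onto a pure Scala function. The sketch below handles one vertex in one superstep (from superstep 1 onwards); the `Step` result type and the `outNeighbours` parameter are assumptions made for illustration rather than part of the pseudo-code in the diagram.

```scala
object MinVertexProgram {
  type VertexId = Long

  // Outcome of applying f to one vertex in one superstep (hypothetical type).
  final case class Step(value: Int, active: Boolean, outgoing: List[(VertexId, Int)])

  def f(value: Int, messages: List[Int], outNeighbours: List[VertexId]): Step = {
    val originalValue = value                      // remember the value before this superstep
    val newValue = (originalValue :: messages).min // minimum of originalValue and all messages
    if (newValue == originalValue)
      Step(newValue, active = false, outgoing = Nil) // unchanged: the vertex becomes inactive
    else
      Step(newValue, active = true, outgoing = outNeighbours.map(n => n -> newValue)) // propagate
  }
}
```

For example, a vertex holding 3 that receives 7 keeps 3 and becomes inactive, which is exactly what happens to the upper right vertex in the walkthrough.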

In the diagrams that follow, active vertices are indicated by green shading, and after f is applied, the internal state of each vertex is shown as m(n), where m is the new value and n is the original value.

In superstep 0, all the vertices are initialised to be active, with -1 as their originalValue. In this superstep, every vertex sends its value as a message to all of its outgoing neighbours, as illustrated in the following diagram.


In superstep 1, f is applied to all the active vertices. A vertex that receives a message (from superstep 0) smaller than its current value changes its current value to that message (while keeping track of its original value); all other vertices keep their values.

Once that’s done, vertices that satisfy the predicate originalValue == value are switched to the inactive state (indicated by pink shading). For instance, the upper right vertex receives the message 7, so it sets its originalValue to 3 and its value to min(7, 3) = 3. Now originalValue == value, which renders it inactive. Inactive vertices do not send any further messages to their neighbours, while the remaining active vertices send their values to their outgoing neighbours.


Similar things happen in superstep 2. Vertices that receive one or more messages become active if they are not already (e.g., the upper right vertex was inactive in the previous superstep, but becomes active in this one because it has an incoming message). Then f is applied to all the active vertices, updating some of them and rendering some of them inactive before further messages are sent out. In our case, only one vertex remains active (upper right), and it sends the message 2 to its neighbour.


Once again, in superstep 3, f is applied to the active vertices (upper right and lower right in our case). This time, however, after f is applied every vertex has originalValue == value, so all of them become inactive and no further messages are sent. We have reached the termination condition, and every vertex in the resulting graph now holds the minimum value of the graph.
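The whole walkthrough can be replayed on a single machine with plain Scala collections. The sketch below assumes that the diagrams show the same graph that is built in GraphX in the next section (values 7, 3, 2 and 6 on vertices 1 to 4, with edges (1, 2), (1, 4), (2, 4), (3, 1) and (3, 4)); the object and method names are hypothetical. Superstep 0 sends every value unconditionally, and each later superstep applies f to the active vertices plus any vertex that received a message.

```scala
import scala.annotation.tailrec

object MinValuePregel {
  type VertexId = Long

  /** Returns the final vertex values and the number of the last superstep executed. */
  def run(values: Map[VertexId, Int],
          edges: List[(VertexId, VertexId)]): (Map[VertexId, Int], Int) = {
    val outNeighbours: Map[VertexId, List[VertexId]] =
      edges.groupBy(_._1).map { case (src, es) => src -> es.map(_._2) }

    // Messages carrying x from v to each of its outgoing neighbours.
    def send(v: VertexId, x: Int): List[(VertexId, Int)] =
      outNeighbours.getOrElse(v, Nil).map(dst => dst -> x)

    @tailrec
    def loop(state: Map[VertexId, Int], active: Set[VertexId],
             outbox: List[(VertexId, Int)], superstep: Int): (Map[VertexId, Int], Int) =
      if (active.isEmpty && outbox.isEmpty) (state, superstep - 1) // termination condition
      else {
        // Messages sent in the previous superstep are read now and re-activate their targets.
        val inbox = outbox.groupBy(_._1).map { case (v, ms) => v -> ms.map(_._2) }
        // f: new value = min(originalValue, incoming messages)
        val updates = (active ++ inbox.keySet).toList.map { v =>
          v -> (state(v) :: inbox.getOrElse(v, Nil)).min
        }
        // Vertices whose value did not change become inactive; the others keep sending.
        val changed = updates.filter { case (v, x) => x != state(v) }
        loop(state ++ updates, changed.map(_._1).toSet,
             changed.flatMap { case (v, x) => send(v, x) }, superstep + 1)
      }

    // Superstep 0: every vertex is active and sends its value along its outgoing edges.
    loop(values, values.keySet, values.toList.flatMap { case (v, x) => send(v, x) }, 1)
  }
}
```

Run on that graph, the sketch stops after superstep 3 with every vertex holding 2, which agrees with the diagrams.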


We have shown how the Pregel API works with a simple example, and we also mentioned that GraphX exposes a variant of the Pregel API. In the next section, we will implement the same algorithm in GraphX and look at how it works there.

Example in GraphX

In GraphX, a graph is a property graph parameterised over the vertex (VD) and edge (ED) types. Each vertex is keyed by a unique 64-bit long identifier (VertexId).

```scala
class Graph[VD, ED] {
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
}
```

VertexRDD[A] extends RDD[(VertexId, A)] and adds the additional constraint that each VertexId occurs only once, while EdgeRDD[ED] extends RDD[Edge[ED]].
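To see the shape of the model without a cluster, the property graph can be mimicked with ordinary Scala collections. In this hypothetical local stand-in (not GraphX itself), a `Map` plays the role of `VertexRDD`: keying vertices by id is what guarantees that each `VertexId` occurs only once, while the edges are a plain sequence, much like `EdgeRDD`. The vertex property `(Int, Int)` anticipates the example built below.

```scala
object LocalPropertyGraph {
  type VertexId = Long // GraphX also defines VertexId as an alias for Long

  // Hypothetical local analogues of GraphX's Edge and Graph (not the real classes).
  final case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED)
  final case class Graph[VD, ED](vertices: Map[VertexId, VD], edges: Seq[Edge[ED]])

  // The running example: vertex property (value, originalValue), dummy Boolean edges.
  val example: Graph[(Int, Int), Boolean] = Graph(
    Map((1L, (7, -1)), (2L, (3, -1)), (3L, (2, -1)), (4L, (6, -1))),
    Seq(Edge(1L, 2L, true), Edge(1L, 4L, true), Edge(2L, 4L, true),
        Edge(3L, 1L, true), Edge(3L, 4L, true)))
}
```

Adding an entry for an existing id replaces the old entry instead of creating a duplicate, since a `Map` cannot hold two values for one key.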

Take the same example as given in the previous section.


Our graph is a simple one. We model the vertices with a property of type (Int, Int), holding the current value and the original value in that order, while the edges need no property; for the sake of simplicity, we give every edge a Boolean property that is always true. Our graph will therefore have the type Graph[(Int, Int), Boolean]. Let’s write the code to generate the graph.

```scala
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Assumes an existing SparkContext named sc (spark-shell provides one)

// Create an RDD for the vertices: (vertexId, (value, originalValue))
val vertices: RDD[(VertexId, (Int, Int))] =
  sc.parallelize(Array((1L, (7, -1)), (2L, (3, -1)),
                       (3L, (2, -1)), (4L, (6, -1))))

// Create an RDD for the edges; every edge carries the dummy property true
val relationships: RDD[Edge[Boolean]] =
  sc.parallelize(Array(Edge(1L, 2L, true), Edge(1L, 4L, true),
                       Edge(2L, 4L, true), Edge(3L, 1L, true),
                       Edge(3L, 4L, true)))

// Create the graph
val graph = Graph(vertices, relationships)

// Check the graph
graph.vertices.collect().foreach(println)
// output (order may vary):
//    (1,(7,-1))
//    (2,(3,-1))
//    (3,(2,-1))
//    (4,(6,-1))
```

Pregel API in GraphX

So now, graph is the GraphX representation of our Initial Graph shown in the diagram above. Let’s take a look at the Pregel API in GraphX.

```scala
def pregel[A]
    (initialMsg: A,
     maxIter: Int = Int.MaxValue,
     activeDir: EdgeDirection = EdgeDirection.Out)
    (vprog: (VertexId, VD, A) => VD,
     sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
     mergeMsg: (A, A) => A)
  : Graph[VD, ED]
```

The first argument list contains configuration parameters:

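  • initialMsg is the user-defined message that every vertex receives at the start of the computation (superstep 0).
  • maxIter is the maximum number of iterations to perform before the computation terminates.
  • activeDir controls which edges run sendMsg after the first iteration: with EdgeDirection.Out, only the outgoing edges of vertices that received a message in the previous iteration are considered. Normally, this will be EdgeDirection.Out.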

The second argument list contains the functions that handle state mutation and message passing:

  • vprog is the user-defined vertex program: it receives the incoming (merged) message and computes the new value of the vertex.
  • sendMsg is the user-defined function that determines which messages to send in the next iteration, and to which vertices.
  • mergeMsg is the user-defined function that merges multiple messages arriving at the same vertex at the start of a superstep, before the vertex program vprog is applied.
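To see how the three functions and the configuration parameters interact, here is a single-machine sketch of the control flow, based on our reading of the GraphX documentation and source rather than a copy of it. vprog is first applied to every vertex with initialMsg and sendMsg then runs on every edge; each later iteration applies vprog only to vertices that received a (merged) message and, for EdgeDirection.Out, runs sendMsg only on edges whose source received one. The `Edge` and `Triplet` case classes are hypothetical local stand-ins for GraphX's `Edge` and `EdgeTriplet`; GraphX itself performs these steps with distributed joins over RDDs.

```scala
import scala.annotation.tailrec

object LocalPregel {
  type VertexId = Long // GraphX defines VertexId as an alias for Long too

  // Hypothetical local stand-ins for GraphX's Edge and EdgeTriplet (not the real classes).
  final case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED)
  final case class Triplet[VD, ED](srcId: VertexId, srcAttr: VD,
                                   dstId: VertexId, dstAttr: VD, attr: ED)

  // Local sketch of pregel with activeDir = EdgeDirection.Out semantics.
  def pregel[VD, ED, A](
      vertices: Map[VertexId, VD],
      edges: Seq[Edge[ED]],
      initialMsg: A,
      maxIter: Int = Int.MaxValue)(
      vprog: (VertexId, VD, A) => VD,
      sendMsg: Triplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A): Map[VertexId, VD] = {

    // Run sendMsg on the given edges and combine messages per target vertex with mergeMsg.
    def messages(g: Map[VertexId, VD], active: Seq[Edge[ED]]): Map[VertexId, A] =
      active
        .flatMap(e => sendMsg(Triplet(e.srcId, g(e.srcId), e.dstId, g(e.dstId), e.attr)))
        .groupBy(_._1)
        .map { case (v, ms) => v -> ms.map(_._2).reduce(mergeMsg) }

    @tailrec
    def loop(g: Map[VertexId, VD], msgs: Map[VertexId, A], i: Int): Map[VertexId, VD] =
      if (msgs.isEmpty || i >= maxIter) g
      else {
        // Only vertices that received a message run vprog.
        val updated = g ++ msgs.map { case (v, m) => v -> vprog(v, g(v), m) }
        // Out direction: only edges whose source received a message run sendMsg.
        loop(updated, messages(updated, edges.filter(e => msgs.contains(e.srcId))), i + 1)
      }

    // Before the first iteration, every vertex gets initialMsg and every edge runs sendMsg.
    val start = vertices.map { case (v, d) => v -> vprog(v, d, initialMsg) }
    loop(start, messages(start, edges), 0)
  }
}
```

The loop stops as soon as an iteration produces no messages or maxIter iterations have run, mirroring the termination rule described for Pregel.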

Edge Triplet

Notice the EdgeTriplet[VD, ED] in the signature of sendMsg? An edge triplet represents an edge along with the attributes of the two vertices it connects. It has the following properties:

  • srcId The source vertex id.
  • srcAttr The source vertex attribute.
  • dstId The destination vertex id.
  • dstAttr The destination vertex attribute.
  • attr The edge attribute.
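The GraphX programming guide describes the triplet view as a logical join of the edges with the vertex attributes at both ends. A local sketch of that join over plain Scala collections looks like this; the case classes are hypothetical stand-ins mirroring the fields listed above, not GraphX's own classes, and unlike GraphX this sketch simply skips an edge whose endpoint has no attribute.

```scala
object LocalTriplets {
  type VertexId = Long

  // Hypothetical local stand-ins mirroring the EdgeTriplet fields listed above.
  final case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED)
  final case class Triplet[VD, ED](srcId: VertexId, srcAttr: VD,
                                   dstId: VertexId, dstAttr: VD, attr: ED)

  // Join every edge with the attributes of its source and destination vertices.
  // Edges with a missing endpoint are skipped in this sketch.
  def triplets[VD, ED](vertices: Map[VertexId, VD], edges: Seq[Edge[ED]]): Seq[Triplet[VD, ED]] =
    for {
      e   <- edges
      src <- vertices.get(e.srcId)
      dst <- vertices.get(e.dstId)
    } yield Triplet(e.srcId, src, e.dstId, dst, e.attr)
}
```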

Graphically, the difference between Vertices, Edges and Triplets is illustrated in the diagram below.


User defined functions f

Now that we have seen the GraphX Pregel API, let’s start by defining our own vprog, sendMsg and mergeMsg functions.

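```scala
import org.apache.spark.graphx.{EdgeTriplet, VertexId}

// Sentinel initial message (assumes no vertex value reaches 9999)
val initialMsg = 9999

// Vertex program: the vertex property is (value, originalValue)
def vprog(vertexId: VertexId, value: (Int, Int), message: Int): (Int, Int) = {
  if (message == initialMsg)
    value                             // superstep 0: keep the initial state
  else
    (message min value._1, value._1)  // new minimum, remembering the previous value
}

// Only vertices whose value changed in the last superstep send messages
def sendMsg(triplet: EdgeTriplet[(Int, Int), Boolean]): Iterator[(VertexId, Int)] = {
  val sourceVertex = triplet.srcAttr
  if (sourceVertex._1 == sourceVertex._2)
    Iterator.empty
  else
    Iterator((triplet.dstId, sourceVertex._1))
}

// Several messages for the same vertex are reduced to their minimum
def mergeMsg(msg1: Int, msg2: Int): Int = msg1 min msg2
```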

We have chosen 9999 (assuming that no vertex value exceeds it) as the initial message sent to all the vertices in superstep 0 to get things started, but feel free to choose Int.MaxValue if needed.
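Since vprog and mergeMsg do not touch Spark at all (GraphX's VertexId is simply an alias for Long), they can be sanity-checked on plain values before running anything on a cluster. The sketch below repeats them with Int.MaxValue, Scala's largest Int, as the sentinel; the object name is hypothetical, and sendMsg is left out because it needs an EdgeTriplet.

```scala
object MinFunctionsCheck {
  type VertexId = Long // same alias GraphX uses

  // Sentinel: assumes no real message is ever equal to Int.MaxValue.
  val initialMsg: Int = Int.MaxValue

  // Vertex property is (value, originalValue).
  def vprog(vertexId: VertexId, value: (Int, Int), message: Int): (Int, Int) =
    if (message == initialMsg) value       // superstep 0: keep the initial state
    else (message min value._1, value._1)  // new minimum, remembering the previous value

  def mergeMsg(msg1: Int, msg2: Int): Int = msg1 min msg2
}
```

For instance, `vprog(2L, (3, -1), 7)` returns `(3, 3)`: value equals originalValue, so sendMsg will send nothing from that vertex, just as in the walkthrough.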

With all the functions defined, let’s feed them into the GraphX Pregel API as shown below.

```scala
import org.apache.spark.graphx.EdgeDirection

val minGraph = graph.pregel(initialMsg,
                            Int.MaxValue,
                            EdgeDirection.Out)(
                            vprog,
                            sendMsg,
                            mergeMsg)
```

Once the execution finishes, minGraph contains the resulting graph. Since it is a small graph, we can inspect it by printing out its vertices.

```scala
minGraph.vertices.collect().foreach {
  case (vertexId, (value, originalValue)) => println(value)
}
// output:
//    2
//    2
//    2
//    2
```

Thus, the minimum value in the graph is 2.


We have demonstrated a simple example of the Pregel API in GraphX. It is enough to get us started, but of course this is not the only way to find the minimum value in a graph; other algorithms might, for instance, make use of the edge attributes to solve the same problem. Since the Pregel paper was published in 2010, plenty of examples have appeared online that use Pregel to solve complex graph problems. There is also Apache Giraph, an open-source implementation of Pregel.

In the next article, we will talk about GraphX in more detail and how we can make good use of its graph parallelism to solve complex graph problems effectively. So stay tuned.


