# GraphX Pregel API

Graph data and graph processing have been attracting more and more attention in various fields. Many real-world problems can be described in terms of graphs: the Web graph, social network graphs, train network graphs and language graphs, for instance. These graphs are often exceptionally large. Take the Web graph: it is estimated that the number of web pages may have exceeded 30 billion. We need systems that can process the graphs created by modern applications.

The importance of graph data has driven the development of numerous tools and frameworks for graph processing or graph storage, for instance Apache Giraph, GraphLab, Spark GraphX, Neo4j, Titan and others. In this article, we will focus on one such tool, namely the GraphX Pregel API.

## What is GraphX

Most of us have heard of the Apache Spark cluster computing framework and how the Databricks team used Spark to set a new world record in large-scale sorting. GraphX is a distributed graph processing framework built on top of Spark. It started off as a research project at UC Berkeley AMPLab and Databricks (have a look at this paper) and is now part of the Apache Spark project.

GraphX extends the Resilient Distributed Dataset (RDD) in Spark by introducing the Resilient Distributed Property Graph on top of it. Built into GraphX are a collection of graph algorithms (e.g., PageRank, Triangle Counting and Connected Components) and some fundamental graph operations. It also has an optimised variant of the Pregel API, a popular graph processing architecture that Google described in a 2010 paper and that aims to solve some of the problems faced in large graph processing.

## Pregel API

As mentioned earlier, graphs created by modern applications can be very large (potentially terabytes or petabytes in size), so a single node (computer) will not be able to hold all the vertices and edges in memory. The way forward is to partition the graph across many machines and process the partitions in parallel.

Pregel employs a vertex-centric approach to processing large distributed graphs. As described in this paper, Pregel computations consist of a sequence of iterations, called **supersteps**. During a superstep `S`, the following can happen in the framework for a vertex `V`:

- It receives and reads messages that were sent to `V` in the previous superstep `S-1`.
- It applies a user-defined function `f` to each vertex in parallel, so `f` essentially specifies the behaviour of a single vertex `V` at a single superstep `S`.
- It can mutate the state of `V`.
- It can send messages to other vertices (typically along outgoing edges) that those vertices will receive in the next superstep `S+1`.

In this approach, all communication happens between supersteps `S` and `S+1`. Within each superstep, the same user-defined function `f` is applied to all of the vertices in parallel.
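To make the mechanics of one superstep concrete, here is a minimal plain-Scala sketch over in-memory maps. It is an illustration only, not how Pregel or GraphX is implemented: the names `SuperstepSketch`, `superstep`, `states` and `inbox` are hypothetical, and `f` is assumed to return the new vertex state together with the messages the vertex wants to send.

```scala
object SuperstepSketch {
  type VertexId = Long

  // One superstep: every vertex reads its inbox from S-1, f computes the new
  // state, and the messages f emits become the inboxes for superstep S+1.
  def superstep[S, M](
      states: Map[VertexId, S],
      inbox: Map[VertexId, List[M]],
      f: (VertexId, S, List[M]) => (S, List[(VertexId, M)])
  ): (Map[VertexId, S], Map[VertexId, List[M]]) = {
    // Apply f to each vertex (sequentially here; a real system runs this in parallel).
    val results = states.map { case (id, s) => id -> f(id, s, inbox.getOrElse(id, Nil)) }
    val newStates = results.map { case (id, (s, _)) => id -> s }
    // Group outgoing messages by destination vertex.
    val outbox = results.values.flatMap(_._2).toList
      .groupBy(_._1)
      .map { case (dst, msgs) => dst -> msgs.map(_._2) }
    (newStates, outbox)
  }
}
```

Note that messages produced during a call are only visible in the returned outbox, which mirrors the rule that communication happens between supersteps and never within one.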

### Termination condition

Each vertex is in one of two states, **active** or **inactive**. In each superstep, vertices that receive a message become active if they are not already, and `f` is applied only to vertices that are **active**.

In superstep 0, all vertices are in the **active** state and they all receive a user-defined initial message, `m`. They then perform the operations mentioned above. At the end, vertices that remain active send messages to other vertices (typically along outgoing edges), while inactive vertices do nothing. The iteration **terminates** when all vertices are in the **inactive** state and no vertex will receive any further messages.
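The activation rule and the termination condition can be written down as two small predicates. This is a sketch under the assumptions of this section only; `TerminationSketch`, `activeNow` and `terminated` are hypothetical names, and `stillActive` stands for the vertices that did not become inactive in the previous superstep.

```scala
object TerminationSketch {
  type VertexId = Long

  // Vertices with at least one pending message.
  private def withMail[M](inbox: Map[VertexId, List[M]]): Set[VertexId] =
    inbox.collect { case (id, msgs) if msgs.nonEmpty => id }.toSet

  // A vertex is active if it stayed active, or if a message wakes it up.
  def activeNow[M](stillActive: Set[VertexId], inbox: Map[VertexId, List[M]]): Set[VertexId] =
    stillActive ++ withMail(inbox)

  // The computation stops once nobody is active and no message is in flight.
  def terminated[M](stillActive: Set[VertexId], inbox: Map[VertexId, List[M]]): Boolean =
    stillActive.isEmpty && withMail(inbox).isEmpty
}
```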

## Example

Let’s have a look at a simple example. Given the graph below with 4 nodes, each holding an integer value, we wish to find the minimum value in this graph.

The pseudo-code definition of `f` is also given above. It will:

- Set `originalValue` to the current `value` of the vertex.
- Mutate the `value` of the vertex to the **minimum** of all the incoming messages and `originalValue`.
- If `originalValue` and `value` are the same, render the vertex **inactive**. Otherwise, send a message to all its outgoing neighbours.
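The three steps of `f` translate almost line by line into Scala. The sketch below is one possible reading of the pseudo-code, not the article's original figure: `MinVertexProgram` and `VertexState` are hypothetical names, and the returned `Option[Int]` stands for the value the vertex would broadcast to its outgoing neighbours. Superstep 0, where every vertex sends its value unconditionally, is assumed to be handled separately.

```scala
object MinVertexProgram {
  final case class VertexState(value: Int, originalValue: Int, active: Boolean)

  // Returns the new state and, if the value changed, the message to send out.
  def f(state: VertexState, incoming: List[Int]): (VertexState, Option[Int]) = {
    // Step 1: remember the current value.
    val originalValue = state.value
    // Step 2: take the minimum of the incoming messages and the old value.
    val value = (originalValue :: incoming).min
    // Step 3: unchanged means inactive; otherwise stay active and send the new value.
    if (value == originalValue) (VertexState(value, originalValue, active = false), None)
    else (VertexState(value, originalValue, active = true), Some(value))
  }
}
```

For example, a vertex holding `3` that receives `7` keeps its value and becomes inactive, while a vertex holding `7` that receives `3` and `2` switches to `2` and sends `2` onwards.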

In the diagrams that follow, active vertices are indicated by green shades. After `f` is applied, the internal value of the vertex is indicated by `m(n)`, with `m` representing the new value and `n` representing the original value.

In superstep 0, all the vertices are initialised to be in the active state with `-1` as their `originalValue`. In this superstep, the vertices send their values as messages to all their outgoing neighbours, as illustrated in the following diagram.

In superstep 1, `f` is applied to all the active vertices. Vertices that receive a message (from superstep 0) that is smaller than their current value mutate their current value to the new value (while keeping track of the original value); all the other vertices remain unchanged.

Once that’s done, vertices that fulfil the predicate `originalValue == value` are switched to the **inactive** state (indicated by pink shades). For instance, the upper right vertex receives the message `7`, then sets its `originalValue` to `3` and its `value` to `3`. Notice that now `originalValue == value`, which renders it **inactive**. Inactive vertices do not send any further messages to their neighbours, while the remaining active vertices send their values to the next vertices (outgoing neighbours).

Similar things happen in superstep 2. Vertices that receive one or more messages become **active** if they are not already (e.g., the upper right vertex was inactive in the previous superstep, but became active in this superstep since it has an incoming message). Then `f` is applied to all the active vertices, mutating some of them and making some of them inactive along the way, before further messages are sent out. In our case, there is only one active vertex left (upper right) and it sends the message `2` to its neighbour.

Once again, in superstep 3, `f` is applied to the active vertices (upper right and lower right in our case). Only this time, after `f` is applied, all the vertices have `originalValue == value`, which renders them **inactive**, and no further message passing is involved. We have reached our termination condition. If we take a look at the new graph, it tells us the minimum value among all its vertices.

We have shown how the Pregel API works with a simple example, and we also mentioned that GraphX exposes a variant of the Pregel API. In the next section, we will take the same method and see how it works in GraphX by implementing it in code.

## Example in GraphX

In GraphX, a graph is defined to be a property graph that is parameterised over the vertex (`VD`) and edge (`ED`) types. Each vertex is keyed by a unique 64-bit long identifier (`VertexId`).

```
class Graph[VD, ED] {
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
}
```

`VertexRDD[A]` extends `RDD[(VertexId, A)]` and adds the additional constraint that each `VertexId` occurs only once, while `EdgeRDD[ED]` extends `RDD[Edge[ED]]`.

Take the same example as given in the previous section.

In our case, the graph is a simple graph. We model the vertices to have a property of type `(Int, Int)` (the current value and the original value), while the edges have no property. For the sake of simplicity, we give the edges a Boolean property that always has the value `true`. So, our final graph will have the type `Graph[(Int, Int), Boolean]`. Let’s write the code to generate the graph.

```
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// `sc` is assumed to be an existing SparkContext (e.g. the one provided by spark-shell)

// Create an RDD for the vertices: (id, (value, originalValue))
val vertices: RDD[(VertexId, (Int, Int))] =
  sc.parallelize(Array((1L, (7,-1)), (2L, (3,-1)),
                       (3L, (2,-1)), (4L, (6,-1))))

// Create an RDD for edges
val relationships: RDD[Edge[Boolean]] =
  sc.parallelize(Array(Edge(1L, 2L, true), Edge(1L, 4L, true),
                       Edge(2L, 4L, true), Edge(3L, 1L, true),
                       Edge(3L, 4L, true)))

// Create the graph
val graph = Graph(vertices, relationships)

// Check the graph
graph.vertices.collect.foreach(println)

// output (order may vary):
// (1,(7,-1))
// (2,(3,-1))
// (3,(2,-1))
// (4,(6,-1))
```

### Pregel API in GraphX

So now, `graph` is the GraphX representation of our **Initial Graph** shown in the diagram above. Let’s take a look at the Pregel API in GraphX.

```
def pregel[A]
    (initialMsg: A,
     maxIter: Int = Int.MaxValue,
     activeDir: EdgeDirection = EdgeDirection.Out)
    (vprog: (VertexId, VD, A) => VD,
     sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
     mergeMsg: (A, A) => A)
  : Graph[VD, ED]
```

The first argument list contains configuration parameters:

- `initialMsg` is the user-defined message that will be sent to the vertices prior to superstep 0.
- `maxIter` is the maximum number of iterations to perform before terminating.
- `activeDir` is the edge direction in which to send messages; normally this will be `EdgeDirection.Out`.

The second argument list contains the functions that handle the mutation of state and message passing:

- `vprog` is the user-defined function for receiving messages.
- `sendMsg` is the user-defined function that determines which messages to send out for the next iteration and where to send them.
- `mergeMsg` is the user-defined function that merges multiple messages arriving at the same vertex at the start of a superstep, before the vertex program `vprog` is applied.
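The role of `mergeMsg` is easiest to see in isolation: all messages addressed to the same vertex are reduced to a single message before `vprog` sees them. Here is a minimal sketch in plain Scala (no Spark involved); `MergeSketch` and `combine` are hypothetical names used only for illustration.

```scala
object MergeSketch {
  type VertexId = Long

  // Reduce all messages addressed to the same vertex into one message.
  def combine[A](sent: Seq[(VertexId, A)], mergeMsg: (A, A) => A): Map[VertexId, A] =
    sent.groupBy(_._1).map { case (dst, msgs) => dst -> msgs.map(_._2).reduce(mergeMsg) }
}
```

Because the messages for a vertex may be combined in any order and grouping, `mergeMsg` should be commutative and associative; `min` satisfies both.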

#### Edge Triplet

Notice `EdgeTriplet[VD, ED]` in the signature of `sendMsg`? An edge triplet represents an edge, along with the vertex attributes of its neighboring vertices. It has the following properties:

- `srcId`: the source vertex id.
- `srcAttr`: the source vertex attribute.
- `dstId`: the destination vertex id.
- `dstAttr`: the destination vertex attribute.
- `attr`: the edge attribute.
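Conceptually, the triplet view is the edge list joined with the vertex attributes at both ends. The plain-Scala sketch below illustrates that idea with local stand-ins: `TripletSketch`, `Edge` and `Triplet` here are hypothetical case classes that only borrow the field names listed above, not the GraphX classes themselves.

```scala
object TripletSketch {
  type VertexId = Long

  final case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED)
  final case class Triplet[VD, ED](
      srcId: VertexId, srcAttr: VD, dstId: VertexId, dstAttr: VD, attr: ED)

  // Attach the source and destination vertex attributes to every edge.
  def triplets[VD, ED](vertices: Map[VertexId, VD], edges: Seq[Edge[ED]]): Seq[Triplet[VD, ED]] =
    edges.map(e => Triplet(e.srcId, vertices(e.srcId), e.dstId, vertices(e.dstId), e.attr))
}
```

This is why `sendMsg` can decide what to send by looking only at its triplet argument: the attributes of both endpoints are already there.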

Graphically, the difference between **Vertices**, **Edges** and **Triplets** is illustrated in the diagram below.

#### User defined functions `f`

Now that we have seen the GraphX Pregel API, let’s start by defining our own `vprog`, `sendMsg` and `mergeMsg` functions.

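```
import org.apache.spark.graphx.{EdgeTriplet, VertexId}

val initialMsg = 9999

// Vertex program: keep the vertex as is on the initial message,
// otherwise store (new minimum, previous value).
def vprog(vertexId: VertexId, value: (Int, Int), message: Int): (Int, Int) = {
  if (message == initialMsg)
    value
  else
    (message min value._1, value._1)
}

// Send the source value along the edge only if it changed in the last step.
def sendMsg(triplet: EdgeTriplet[(Int, Int), Boolean]): Iterator[(VertexId, Int)] = {
  val sourceVertex = triplet.srcAttr

  if (sourceVertex._1 == sourceVertex._2)
    Iterator.empty
  else
    Iterator((triplet.dstId, sourceVertex._1))
}

// Of several messages for the same vertex, keep the smallest.
def mergeMsg(msg1: Int, msg2: Int): Int = msg1 min msg2
```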

We have chosen `9999` (on the assumption that no value in the graph will exceed it) as the initial message that is sent to all the vertices in superstep 0 to get things started, but feel free to use `Int.MaxValue` if needed.
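It may not be obvious why `vprog` needs the `message == initialMsg` check at all, since taking the minimum with a large sentinel leaves the value untouched anyway. The sketch below compares the guarded version with a hypothetical `vprogNoGuard`; both drop the `vertexId` parameter for brevity, and `Int.MaxValue` is used as the sentinel here purely for illustration.

```scala
object InitialMsgSketch {
  val initialMsg: Int = Int.MaxValue

  // Same logic as the article's vprog, without the vertexId parameter.
  def vprog(value: (Int, Int), message: Int): (Int, Int) =
    if (message == initialMsg) value else (message min value._1, value._1)

  // Hypothetical variant without the guard.
  def vprogNoGuard(value: (Int, Int), message: Int): (Int, Int) =
    (message min value._1, value._1)
}
```

With the guard, `(7, -1)` stays `(7, -1)`, so `sendMsg` sees two different numbers and sends `7` onwards. Without it, the pair becomes `(7, 7)`, which `sendMsg` interprets as unchanged, and no vertex would ever send its first message. One caveat of any sentinel is that a genuine message equal to it would be mistaken for the initial message.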

With all the functions defined, let’s feed them into the GraphX Pregel API as shown below.

```
val minGraph = graph.pregel(initialMsg,
                            Int.MaxValue,
                            EdgeDirection.Out)(
                            vprog,
                            sendMsg,
                            mergeMsg)
```

Once the execution finishes, `minGraph` will contain the resulting graph. Since it’s a small graph, we can inspect it by printing out its vertices.

```
minGraph.vertices.collect.foreach {
  case (vertexId, (value, original_value)) => println(value)
}

// output:
// 2
// 2
// 2
// 2
```

Thus, the minimum value in the graph is 2.
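As a cross-check that does not need a Spark cluster, the same three functions can be run through a small in-memory loop. This is only an approximation of what GraphX does, written under my own assumptions: `LocalMinPregel`, its local `Edge` class and `run` are hypothetical, `sendMsg` takes the source attribute directly instead of an `EdgeTriplet`, and `EdgeDirection.Out` is modelled as "only edges whose source received a message in the previous round may send".

```scala
object LocalMinPregel {
  type VertexId = Long
  type Attr = (Int, Int) // (value, originalValue), as in the article

  final case class Edge(srcId: VertexId, dstId: VertexId)

  val initialMsg = 9999

  def vprog(id: VertexId, value: Attr, message: Int): Attr =
    if (message == initialMsg) value else (message min value._1, value._1)

  // A message for the destination, unless the source value did not change.
  def sendMsg(e: Edge, srcAttr: Attr): Option[(VertexId, Int)] =
    if (srcAttr._1 == srcAttr._2) None else Some((e.dstId, srcAttr._1))

  def mergeMsg(a: Int, b: Int): Int = a min b

  def run(vertices: Map[VertexId, Attr], edges: Seq[Edge]): Map[VertexId, Attr] = {
    // Messages from edges whose source is in activeSrc, merged per destination.
    def messagesFrom(g: Map[VertexId, Attr], activeSrc: Set[VertexId]): Map[VertexId, Int] =
      edges.filter(e => activeSrc(e.srcId))
        .flatMap(e => sendMsg(e, g(e.srcId)))
        .groupBy(_._1)
        .map { case (dst, msgs) => dst -> msgs.map(_._2).reduce(mergeMsg) }

    // Every vertex first receives the initial message.
    var g = vertices.map { case (id, attr) => id -> vprog(id, attr, initialMsg) }
    var messages = messagesFrom(g, g.keySet)
    // Keep iterating until no messages are in flight.
    while (messages.nonEmpty) {
      val inbox = messages
      g = g.map { case (id, attr) =>
        id -> inbox.get(id).map(vprog(id, attr, _)).getOrElse(attr)
      }
      messages = messagesFrom(g, inbox.keySet)
    }
    g
  }
}
```

Calling `LocalMinPregel.run` with the four vertices and five edges defined earlier gives every vertex the value `2`, which agrees with the GraphX output above.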

### Conclusion

We have demonstrated a simple example of the Pregel API in GraphX. It is enough to get us started, but of course this is not the only way to find the minimum value in a graph; there might be other algorithms that make use of the relationship attribute to solve the same problem. Since the publication of the Pregel paper in 2010, plenty of examples have appeared on the internet that use Pregel to solve complex graph problems. There is also Apache Giraph, which is an open source implementation of Pregel.

In the next article, we will talk about GraphX in detail and how we can make good use of its graph parallelism to solve some complex graph problems effectively. So stay tuned.