ktlb

## 背景:

1. spark graphx并未提供极大团挖掘算法
2. 当下的极大团算法都是串行化的算法,基于Bron–Kerbosch算法

## 思路:

1. spark graphx提供了连通图的算法,连通图和极大团都是无向图中的概念,极大团为连通图的子集
2. 利用spark graphx 找出连通图,再从各个连通图中,利用串行化的极大团算法,找出极大团 (伪并行化)
3. 对于关联性较强的图,找出来的连通图非常大,这时串行化的极大团算法,仍然会耗时很久,这里利用剪枝的思想减少样本数据量,但是对于大图,优化空间有限
4. 期待真正的并行化的极大团算法

## 配置文件:

``````graph_data_path=hdfs://localhost/graph_data
out_path=hdfs://localhost/clique
ck_path=hdfs://localhost/checkpoint
numIter=50		剪枝次数
count=3			极大团顶点数大小
algorithm=2		极大团算法,1:个人实现  2:jgrapht
percent=90		剪枝后的顶点数,占前一次的百分比,如果剪完后,还剩下90%的数据,那么剪枝效率已然不高
spark.master=local
spark.app.name=graph
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+UseCompressedOops -XX:+DisableExplicitGC
spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+UseCompressedOops -XX:+DisableExplicitGC
spark.driver.maxResultSize=10g
spark.default.parallelism=60
``````

jgrapht

## 样本数据:

``````{"src":"0","dst":"1"}
{"src":"0","dst":"2"}
{"src":"0","dst":"3"}
{"src":"1","dst":"0"}
{"src":"2","dst":"1"}
{"src":"3","dst":"5"}
{"src":"4","dst":"6"}
{"src":"5","dst":"4"}
{"src":"6","dst":"5"}
{"src":"3","dst":"2"}
{"src":"2","dst":"3"}
{"src":"6","dst":"4"}
{"src":"3","dst":"4"}
{"src":"4","dst":"3"}
{"src":"2","dst":"6"}
{"src":"6","dst":"2"}
{"src":"6","dst":"7"}
{"src":"7","dst":"6"}
``````

## 输出:

``````0,1,2
0,2,3
3,4,5
4,5,6
``````

## 代码实现:

``````import java.util
import java.util.Properties

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import org.jgrapht.alg.BronKerboschCliqueFinder
import org.jgrapht.graph.{DefaultEdge, SimpleGraph}

import scala.collection.JavaConverters._
import scala.collection.mutable

/**
 * Maximal-clique mining on top of Spark GraphX.
 *
 * Pipeline: load the edge list -> iteratively prune vertices whose degree
 * cannot support a clique of `count` vertices -> compute connected components
 * with GraphX -> run a serial Bron-Kerbosch finder inside each component ->
 * write the cliques as comma-separated vertex lists.
 */
object ApplicationTitan {

  def main(args: Array[String]) {
    val prop = new Properties()
    // The scraped original never loaded the properties (every getProperty
    // would return null). Read them from the file given as the first CLI
    // argument, falling back to "config.properties" on the classpath.
    val propStream =
      if (args.nonEmpty) new java.io.FileInputStream(args(0))
      else getClass.getClassLoader.getResourceAsStream("config.properties")
    prop.load(propStream)
    propStream.close()

    val graph_data_path = prop.getProperty("graph_data_path")
    val out_path = prop.getProperty("out_path")
    val ck_path = prop.getProperty("ck_path")
    val count = Integer.parseInt(prop.getProperty("count"))         // minimum clique size
    val numIter = Integer.parseInt(prop.getProperty("numIter"))     // max pruning rounds
    val algorithm = Integer.parseInt(prop.getProperty("algorithm")) // 1: CliqueFinder, 2: jgrapht
    val percent = Integer.parseInt(prop.getProperty("percent"))     // pruning early-stop threshold (%)

    val conf = new SparkConf()
    // Best-effort removal of a previous run's output directory.
    try {
      Runtime.getRuntime.exec("hdfs dfs -rm -r " + out_path)
      // Runtime.getRuntime.exec("cmd.exe /C rd /s /q " + out_path)
    } catch {
      case ex: Exception =>
        ex.printStackTrace(System.out)
    }

    // Forward every "spark.*" property straight into the SparkConf.
    prop.stringPropertyNames().asScala.foreach { s =>
      if (s.startsWith("spark")) {
        conf.set(s, prop.getProperty(s))
      }
    }
    conf.registerKryoClasses(Array(getClass))
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    sc.setCheckpointDir(ck_path)
    val sqlc = new SQLContext(sc)
    try {
      // NOTE(review): the scrape dropped this read; restored so the dangling
      // ".parquet" call has a receiver. Schema is expected to be (dst, src).
      val e_df = sqlc.read
        // .json(graph_data_path)
        .parquet(graph_data_path)

      // Normalise each edge to (min, max) so the graph is undirected and the
      // duplicate pair a->b / b->a collapses into a single record.
      var e_rdd = e_df
        .mapPartitions(it => {
          it.map({
            case Row(dst: String, src: String) =>
              val src_long = src.toLong
              val dst_long = dst.toLong
              if (src_long < dst_long) (src_long, dst_long) else (dst_long, src_long)
          })
        }).distinct()
      e_rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)

      var iter = 0
      var bc_size = 0
      // Pruning: any vertex inside a clique of `count` vertices must have
      // degree >= count - 1, so repeatedly drop vertices below that bound.
      while (iter <= numIter) {
        val temp = e_rdd
          .flatMap(x => List((x._1, 1), (x._2, 1)))
          .reduceByKey((x, y) => x + y)
          .filter(x => x._2 >= count - 1)
          .mapPartitions(it => it.map(x => x._1))
        // The original referenced an undefined `bc`: broadcast the surviving
        // vertex set so every executor can filter its edges locally.
        val bc = sc.broadcast(temp.collect().toSet)
        e_rdd = e_rdd.filter(x => bc.value.contains(x._1) && bc.value.contains(x._2))
        e_rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
        // Truncate the lineage that grows with every filtering round
        // (the checkpoint dir was configured but never used).
        e_rdd.checkpoint()
        iter += 1
        // Stop early once a round keeps `percent`% or more of the previous
        // vertex count: pruning is no longer paying for itself.
        if (bc_size != 0 && bc.value.size >= bc_size * percent / 100) {
          println("total iter : " + iter)
          iter = Int.MaxValue
        }
        bc_size = bc.value.size
      }

      // Build the undirected graph from the pruned edge set.
      val edge: RDD[Edge[Long]] = e_rdd.mapPartitions(it => it.map(x => Edge(x._1, x._2)))
      val graph = Graph.fromEdges(edge, 0, StorageLevel.MEMORY_AND_DISK_SER, StorageLevel.MEMORY_AND_DISK_SER)

      // Connected components: every clique lives entirely inside one component.
      val cc = graph.connectedComponents().vertices
      cc.persist(StorageLevel.MEMORY_AND_DISK_SER)

      cc.join(e_rdd)
        // Key by component id prefixed with one random digit so a single huge
        // component is spread over up to 10 reducers during the first
        // aggregation; the digit is stripped before the final merge.
        .mapPartitions(it => it.map(x => ((math.random * 10).toInt.toString.concat(x._2._1.toString), (x._1, x._2._2))))
        .aggregateByKey(List[(Long, Long)]())((list, v) => list :+ v, (list1, list2) => list1 ::: list2)
        .mapPartitions(it => it.map(x => (x._1.substring(1), x._2)))
        .aggregateByKey(List[(Long, Long)]())((list1, list2) => list1 ::: list2, (list3, list4) => list3 ::: list4)
        .filter(x => x._2.size >= count - 1)
        .flatMap(x => {
          if (algorithm == 1)
            find(x, count)
          else
            find2(x, count)
        })
        // Render each clique as "v1,v2,...". The original match had an
        // unreachable `case _ =>` that widened the element type to Any.
        .mapPartitions(it => it.map(set => set.asScala.mkString(",")))
        // .coalesce(1)
        .saveAsTextFile(out_path)
    }
    catch {
      case ex: Exception =>
        ex.printStackTrace(System.out)
    }
    sc.stop()
  }

  /**
   * Hand-rolled Bron-Kerbosch (see CliqueFinder) over one component's edges.
   *
   * @param x     (component key, list of edges belonging to that component)
   * @param count minimum clique size to report
   */
  def find(x: (String, List[(Long, Long)]), count: Int): mutable.Set[util.Set[String]] = {
    println(x._1 + "|s|" + x._2.size)
    println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
    // Build the adjacency map. The scraped original created the empty sets
    // but never inserted the neighbours, so every search came back empty.
    val neighbors = new util.HashMap[String, util.Set[String]]
    x._2.foreach { r =>
      val v1 = r._1.toString
      val v2 = r._2.toString
      if (!neighbors.containsKey(v1)) {
        neighbors.put(v1, new util.HashSet[String]())
      }
      if (!neighbors.containsKey(v2)) {
        neighbors.put(v2, new util.HashSet[String]())
      }
      neighbors.get(v1).add(v2)
      neighbors.get(v2).add(v1)
    }
    val finder = new CliqueFinder(neighbors, count)
    println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
    finder.findMaxCliques().asScala
  }

  /**
   * jgrapht's Bron-Kerbosch over one component's edges; keeps only cliques
   * with at least `count` vertices.
   */
  def find2(x: (String, List[(Long, Long)]), count: Int): Set[util.Set[String]] = {
    println(x._1 + "|s|" + x._2.size)
    println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
    val to_clique = new SimpleGraph[String, DefaultEdge](classOf[DefaultEdge])
    // The scraped original never inserted vertices or edges into the graph.
    x._2.foreach { r =>
      val v1 = r._1.toString
      val v2 = r._2.toString
      to_clique.addVertex(v1)
      to_clique.addVertex(v2)
      to_clique.addEdge(v1, v2)
    }
    val finder = new BronKerboschCliqueFinder(to_clique)
    val list = finder.getAllMaximalCliques.asScala
    var result = Set[util.Set[String]]()
    list.foreach { c =>
      if (c.size() >= count)
        result = result + c
    }
    println("BKCliqueFinder---" + x._1 + "---" + System.currentTimeMillis())
    result
  }
}
``````

## 自己实现的极大团算法:

``````import java.util.*;

/**
* [@author](https://my.oschina.net/arthor) mopspecial@gmail.com
* [@date](https://my.oschina.net/u/2504391) 2017/7/31
*/
public class CliqueFinder {
private Map<String, Set<String>> neighbors;
private Set<String> nodes;
private Set<Set<String>> maxCliques = new HashSet<>();
private Integer minSize;

public CliqueFinder(Map<String, Set<String>> neighbors, Integer minSize) {
this.neighbors = neighbors;
this.nodes = neighbors.keySet();
this.minSize = minSize;
}

private void bk3(Set<String> clique, List<String> candidates, List<String> excluded) {
if (candidates.isEmpty() && excluded.isEmpty()) {
if (!clique.isEmpty() && clique.size() >= minSize) {
}
return;
}

for (String s : degeneracy_order(candidates)) {
List<String> new_candidates = new ArrayList<>(candidates);
new_candidates.retainAll(neighbors.get(s));

List<String> new_excluded = new ArrayList<>(excluded);
new_excluded.retainAll(neighbors.get(s));
Set<String> nextClique = new HashSet<>(clique);
bk2(nextClique, new_candidates, new_excluded);
candidates.remove(s);
}
}

private void bk2(Set<String> clique, List<String> candidates, List<String> excluded) {
if (candidates.isEmpty() && excluded.isEmpty()) {
if (!clique.isEmpty() && clique.size() >= minSize) {
}
return;
}
String pivot = pick_random(candidates);
if (pivot == null) {
pivot = pick_random(excluded);
}
List<String> tempc = new ArrayList<>(candidates);
tempc.removeAll(neighbors.get(pivot));

for (String s : tempc) {
List<String> new_candidates = new ArrayList<>(candidates);
new_candidates.retainAll(neighbors.get(s));

List<String> new_excluded = new ArrayList<>(excluded);
new_excluded.retainAll(neighbors.get(s));
Set<String> nextClique = new HashSet<>(clique);
bk2(nextClique, new_candidates, new_excluded);
candidates.remove(s);
}
}

private List<String> degeneracy_order(List<String> innerNodes) {
List<String> result = new ArrayList<>();
Map<String, Integer> deg = new HashMap<>();
for (String node : innerNodes) {
deg.put(node, neighbors.get(node).size());
}
while (!deg.isEmpty()) {
Integer min = Collections.min(deg.values());
String minKey = null;
for (String key : deg.keySet()) {
if (deg.get(key).equals(min)) {
minKey = key;
break;
}
}
deg.remove(minKey);
for (String k : neighbors.get(minKey)) {
if (deg.containsKey(k)) {
deg.put(k, deg.get(k) - 1);
}
}

}
return result;
}

private String pick_random(List<String> random) {
if (random != null && !random.isEmpty()) {
return random.get(0);
} else {
return null;
}
}

public Set<Set<String>> findMaxCliques() {
this.bk3(new HashSet<>(), new ArrayList<>(nodes), new ArrayList<>());
return maxCliques;
}

public static void main(String[] args) {
Map<String, Set<String>> neighbors = new HashMap<>();
neighbors.put("0", new HashSet<>(Arrays.asList("1", "2", "3")));
neighbors.put("1", new HashSet<>(Arrays.asList("0", "2")));
neighbors.put("2", new HashSet<>(Arrays.asList("0", "1", "3", "6")));
neighbors.put("3", new HashSet<>(Arrays.asList("0", "2", "4", "5")));
neighbors.put("4", new HashSet<>(Arrays.asList("3", "5", "6")));
neighbors.put("5", new HashSet<>(Arrays.asList("3", "4", "6")));
neighbors.put("6", new HashSet<>(Arrays.asList("2", "4", "5")));
neighbors.put("7", new HashSet<>(Arrays.asList("6")));
CliqueFinder finder = new CliqueFinder(neighbors, 3);
finder.bk3(new HashSet<>(), new ArrayList<>(neighbors.keySet()), new ArrayList<>());
System.out.println(finder.maxCliques);
}
}
``````

### 评论(1)

Spark GraphX宝刀出鞘，图文并茂研习图计算秘笈与熟练的掌握Scala语言【大数据Spark

Spark GraphX宝刀出鞘，图文并茂研习图计算秘笈 大数据的概念与应用，正随着智能手机、平板电脑的快速流行而日渐普及，大数据中图的并行化处理一直是一个非常热门的话题。图计算正在被广泛地...

Spark亚太研究院
2014/08/29
1K
0
Spark之GraphX的特点

1.基于内存实现了数据的复用与快速读取 具有较多迭代次数是图计算算法的一个重要特点。在海量数据背景下，如何保证图计算算法的执行效率是所有图计算模型面对的一个难题。基于MapReduce的图计...

mmake1994
2018/04/16
0
0
Spark2.1.0之模块设计

2018/06/05
0
0
Spark 数据分析导论-笔记

Spark Core Spark Core 实现了Spark 的基本功能，包含任务调度、内存管理、错误恢复、与存储系统交互等模块。 Spark Core 中还包含了 对弹性分布式数据集（resilient distributed dataset，简...

Java搬砖工程师
2018/12/26
28
0

2018/05/15
17
0

OSChina 周日乱弹 —— 我，小小编辑，食人族酋长

Osc乱弹歌单（2019）请戳（这里） 【今日歌曲】 @宇辰OSC ：分享娃娃的单曲《飘洋过海来看你》: #今日歌曲推荐# 《飘洋过海来看你》- 娃娃 手机党少年们想听歌，请使劲儿戳（这里） @宇辰OSC...

42分钟前
85
5
spring cloud

2
0
Executor线程池原理与源码解读

6
0
maven 环境隔离

8
0

OBKoro1

7
0