文档章节

Graphx处理janusGraph数据实现

仔仔1993
 仔仔1993
发布于 05/21 17:52
字数 1038
阅读 15
收藏 0

声明: 此方案是在spark直接执行gremlinSQL方案实现受阻的情况下的备选方案,不涉及工作机密,不存在泄密可能,纯属个人思考,希望抛砖引玉

方案: 将gremlinSql的查询结果转化为startGraph,然后转写到HDFS,spark读取hdfs的starGraphJSon构建graphx可用的图,然后就可以调用graphx丰富的图计算算法;从而将实现graphX操作janusgraph的目的

1. gremlinSql的查询结果转换成starGraphJson

由于org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONWriter保存的graphSon格式无法满足需求,所以将查询出的带path的点边数据自己转换成单点图的json结构,转化方法如下,然后存到hdfs,存储方法不再赘述。

public StringBuilder generatorStarGraphJson(Vertex vertex, Edge edge, StringBuilder starGraphJson){
	String inVId;
	String outVId;
	String VId;
	starGraphJson.append("{").append("\"id\":"+vertex.id()+","+"\"label\":\""+vertex.label()+"\",");
	//这种情况有outE和inE,outE中会有inV信息,inE会有一个inV
	inVId = edge.inVertex().id().toString();
	outVId = edge.outVertex().id().toString();
	VId = vertex.id().toString();
	if(inVId.equalsIgnoreCase(VId)){
		starGraphJson.append("\"outE\":{").append("\""+edge.label()+"\":[{").append("\"id\":\""+edge.id()+"\",")
				.append("\"inV\":"+edge.inVertex().id()+",").append("\"properties\":{"+concatEdgeProperties(edge)+"}}]},");
	}else if(outVId.equalsIgnoreCase(VId)){
		starGraphJson.append("\"inE\":{").append("\""+edge.label()+"\":[{").append("\"id\":\""+edge.id()+"\",")
				.append("\"outV\":"+edge.inVertex().id()+",").append("\"properties\":{"+concatEdgeProperties(edge)+"}}]},");
	}else{
		throw new Exception("点边不对应数据错误!!!");
	}
	//拼接点的properties
	starGraphJson.append("\"properties\":{").append(concatVertexProperties(vertex)).append("}}");
	return  starGraphJson;
}

2. spark读取指定路径的starGraph转成graph

class GraphSon2GraphXRDD() extends Serializable {

def getGraphConf(HDFSFilePath : String): BaseConfiguration ={ val inputGraphConf = new BaseConfiguration inputGraphConf.setProperty("gremlin.graph", classOf\[HadoopGraph\].getName) inputGraphConf.setProperty(Constants.GREMLIN\_HADOOP\_GRAPH\_READER, classOf\[GraphSONInputFormat\].getName) inputGraphConf.setProperty(Constants.GREMLIN\_HADOOP\_INPUT\_LOCATION, HDFSFilePath) inputGraphConf.setProperty(Constants.MAPREDUCE\_INPUT\_FILEINPUTFORMAT_INPUTDIR, HDFSFilePath) inputGraphConf }

def getSc(sparkHost:String ,isRemote:Boolean): SparkContext ={ var sparkConf = new SparkConf() if(isRemote){ //待完善 }else{ sparkConf.setMaster("local\[*\]").setAppName("GraphSon2GraphX") } val sc = new SparkContext(sparkConf) sc }

def getJavaRDD(conf : BaseConfiguration, sc : SparkContext): JavaPairRDD\[AnyRef, VertexWritable\] ={ val jsc = JavaSparkContext.fromSparkContext(sc) val graphRDDInput = new InputFormatRDD val vertexWritableJavaPairRDD = graphRDDInput.readGraphRDD(conf, jsc) vertexWritableJavaPairRDD }

def getVertexRDD(vertexWritableJavaPairRDD : JavaPairRDD\[AnyRef, VertexWritable\]): RDD\[(Long,util.HashMap\[String,java.io.Serializable\])\] ={ vertexWritableJavaPairRDD.rdd.map((tuple2: Tuple2\[AnyRef, VertexWritable\]) => { // Get the center vertex val v = tuple2._2.get val g = StarGraph.of(v) // In case the vertex id in TinkerGraph is not long type // val vid = convertStringIDToLongID([v.id](http://v.id)().toString) val vid = [v.id](http://v.id)().toString.toLong // Pass the vertex properties to GraphX vertex value map and remain the original vertex id var graphxValueMap : util.HashMap\[String,java.io.Serializable\] = new util.HashMapString,java.io.Serializable graphxValueMap.put("originalID",[v.id](http://v.id)().toString) graphxValueMap.putAll(g.traversal.V([v.id](http://v.id)).valueMap().next(1).get(0)) (vid,graphxValueMap) }) }

def getEdgeRDD(vertexWritableJavaPairRDD : JavaPairRDD\[AnyRef, VertexWritable\]): RDD\[graphx.Edge\[util.HashMap\[String, java.io.Serializable\]\]\] ={ val edge = vertexWritableJavaPairRDD.rdd.flatMap((tuple2: Tuple2\[AnyRef, VertexWritable\]) => { val v = tuple2._2.get val g = StarGraph.of(v) val edgelist:util.List\[Edge\] = g.traversal.V([v.id](http://v.id)).outE().toList

  // Put all edges of the center vertex into the list
  val list = new collection.mutable.ArrayBuffer[graphx.Edge[util.HashMap[String,java.io.Serializable]]]()
  var x = 0
  for(x <- 0 until edgelist.size()){
    var srcId = edgelist.get(x).inVertex.id().toString
    var dstId = edgelist.get(x).outVertex.id().toString
    //        val md1 = convertStringIDToLongID(srcId)
    //        val md2 = convertStringIDToLongID(dstId)
    val md1 = srcId.toLong
    val md2 = dstId.toLong
    // Get the properties of the edge
    var edgeAttr = new util.HashMap[String,java.io.Serializable]()
    var perporties : util.Iterator[Property[Nothing]] = edgelist.get(x).properties()
    while(perporties.hasNext){
      val property = perporties.next()
      edgeAttr.put(property.key(),property.value().toString)
    }
    list.append(graphx.Edge(md1,md2,edgeAttr))
  }
  list
})
val edgeRDD = edge.distinct()
edgeRDD

}

def doLAP(vertexWritableJavaPairRDD : JavaPairRDD\[AnyRef, VertexWritable\], iterationNum : Int): Array\[Array\[String\]\] = { val vertexRDD = getVertexRDD(vertexWritableJavaPairRDD)

val edgeRDD = getEdgeRDD(vertexWritableJavaPairRDD)

val graph = graphx.Graph[util.HashMap[String,java.io.Serializable],
  util.HashMap[String,java.io.Serializable]](vertexRDD,edgeRDD,new util.HashMap[String,java.io.Serializable]())

val LVMRsult = lib.LabelPropagation.run(graph , iterationNum).vertices.collect.sortWith (_._1 < _._1).map(f => {
  println(f.toString())
  f})
getFinalCommunit(LVMRsult)

}

def getFinalCommunit(LVMRsult:Array\[(Long,Long)\]): Array\[Array\[String\]\] ={ var result = new Array[Array\[String\]](LVMRsult.length) var tmp = new ArrayBufferString for(i <- 0 until LVMRsult.length){ var k = 0 val array = new ArrayBufferString
  //社区中包含多个值
  for(j &lt;- (i+1) until LVMRsult.length) {
    if(LVMRsult(i)._2.equals(LVMRsult(j)._2)){
      if(!tmp.contains(LVMRsult(i)._1.toString)){
        array += LVMRsult(i)._1.toString
        tmp += LVMRsult(i)._1.toString
      }
      if(!tmp.contains(LVMRsult(j)._1.toString)){
        array += LVMRsult(j)._1.toString
        tmp += LVMRsult(j)._1.toString
      }
      k = k+1
    }
  }

  //自己为一个社区
  if(k.equals(0)){
    if(!tmp.contains(LVMRsult(i)._1.toString)){
      array += LVMRsult(i)._1.toString
      tmp += LVMRsult(i)._1.toString
    }

  }
  if(array.length &gt; 0){
    result.update(i,array.toArray.distinct)
  }
}
result.filter(f =&gt; {
  println(if (f.length &gt;0) f.mkString("(",",",")"))
  f != null
})

}

def doPageRank(vertexWritableJavaPairRDD : JavaPairRDD\[AnyRef, VertexWritable\], stopThreshold : Double): Array\[Array\[Any\]\] = { val vertexRDD:RDD\[(Long,util.HashMap\[String,java.io.Serializable\])\] = getVertexRDD(vertexWritableJavaPairRDD)


val edgeRDD = getEdgeRDD(vertexWritableJavaPairRDD)

val graph = graphx.Graph[util.HashMap[String,java.io.Serializable],
  util.HashMap[String,java.io.Serializable]](vertexRDD,edgeRDD,new util.HashMap[String,java.io.Serializable]())
val gpgraph = graph.pageRank(stopThreshold).cache()

val titleAndPrGraph = graph.outerJoinVertices(gpgraph.vertices) {
  (v, title, rank) =&gt; (rank.getOrElse(0.0), title)
}

//倒序 false  正序 true

// titleAndPrGraph.vertices.sortBy((entry: (VertexId, (Double, Object))) => entry.\_2.\_1, false).foreach(f => println(f.\_1+":"+f.\_2._1))

val pageRank = titleAndPrGraph.vertices.sortBy((entry: (VertexId, (Double, Object))) =&gt; entry._2._1, false).map(f =&gt; {
  println(f._1+":"+f._2._1)
  Array(f._1.toString,f._2._1)
})
pageRank.collect()

}

}

这样就贯通了janusgraph和graphx,调用graphx的丰富的图计算功能就畅通无阻,就是实现有点挫,希望抛砖引玉

© 著作权归作者所有

仔仔1993
粉丝 8
博文 59
码字总数 55422
作品 0
武汉
后端工程师
私信 提问
JanusGraph-配置(第四章)--英文翻译

Part III, “Storage Backends” 列出了全部支持的存储后台, 如何配置参数. 如果需要支持图的高级检索(例如:全文检索, 地理位置检索, 和范围检索) 需要配置索引后台. Part IV, “Index Backe...

landebug
2018/01/04
0
0
图数据库JanusGraph快速实战

本节将使用Gods图作为演示示例,此图在JanusGraph演示中广泛使用。该图如下图所示。这个抽象的数据模型对应图模型中的属性,这个特定的实例描述了罗马万神殿中人物和地点之间的关系。此外,图...

咻咻ing
04/24
0
0
创建分布式图数据库JanusGraph对象的两种方法

JanusGraph 是一个分布式图数据库,相对于neo4j可进行横向扩展,且存储和图引擎分离,架构优美,本文将介绍JanusGraph的两种创建方式。 1、添加Maven依赖 有以下两种方式构建JanusGraph对象 ...

wodwl
2018/11/27
0
0
Spark GraphX宝刀出鞘,图文并茂研习图计算秘笈与熟练的掌握Scala语言【大数据Spark

Spark GraphX宝刀出鞘,图文并茂研习图计算秘笈 大数据的概念与应用,正随着智能手机、平板电脑的快速流行而日渐普及,大数据中图的并行化处理一直是一个非常热门的话题。图计算正在被广泛地...

Spark亚太研究院
2014/08/29
0
0
Spark之GraphX的特点

1.基于内存实现了数据的复用与快速读取 具有较多迭代次数是图计算算法的一个重要特点。在海量数据背景下,如何保证图计算算法的执行效率是所有图计算模型面对的一个难题。基于MapReduce的图计...

mmake1994
2018/04/16
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Flutter移动端实战手册

该文章属于<简书 — 刘小壮>原创,转载请注明: <简书 — 刘小壮> https://www.jianshu.com/p/d27c1f5ee3ff iOS接入Flutter 在进行iOS和Flutter的混编时,iOS比Android的接入方式略复杂,但也...

刘小壮
21分钟前
0
0
测试环境项目一键部署

git项目测试环境一键部署脚本 #!/bash/sh pid=`ps -ef | grep 'user_demo.jar' | grep -v grep | awk '{print $2}'` kill -9 $pid #跳转到git clone下目录 cd /home/apple/userdemo/ #拉取最......

ccman996
24分钟前
0
0
应用框架(spring-boot参数配置与使用)

这很耳东先生
31分钟前
0
0
使用SecureCRT从windows上传文件到Linux

点击File,然后选择SFTP,打开SFTP,然后进行下列操作。 1.指定文件所在路径 lcd E:\ 2.查看当前windows文件所在路径 lpwd 3.指定linux下文件上传路径 cd /usr/local 4.查询当前linux文件上传...

大雁南飞了
33分钟前
1
0
前后端分离-前端搭建(Vue)(3)加入Element和axios

继续上篇博客的Vue刚刚搭建完 , https://my.oschina.net/u/4116654/blog/3064431 这次我们来把Element 和axios弄好 首先先安装Element 下载好了之后 在src目录下的main.js里去添加Element的...

咸鱼-李y
44分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部