文档章节

Spark数据挖掘-深入GraphX(1)

clebeg
 clebeg
发布于 2015/11/26 09:57
字数 4189
阅读 904
收藏 22

Spark数据挖掘-深入GraphX(1)

1 网络数据集

当图被用来描述系统中的组件之间的交互关系的时候,图可以被用来表示任何系统。图原理提供了通用的语言和一系列工具来表示和分析复杂的系统。简单的说:图由一系列顶点和边组成,每条边连接两个顶点表示这两个顶点之间的某种关系。下面介绍一下本文将会演示的几个有趣的图将要用到的数据:

图名字 数据集地址 下载文件名 描述
邮件交流网络图 https://snap.stanford.edu/data/email-Enron.html email-Enron.txt.gz Enron公司158名雇员的电子邮件往来数据构成一个邮件交流网络有向图
食品品味网络图 http://yongyeol.com/2011/12/15/paper-flavor-network.html ingr_comp.zip 通过三个食品网站获取得到的每个食品组成成分和每个成分对应的化学合成物构成一个网络
个人社交网络图 http://snap.stanford.edu/data/egonets-Gplus.html gplus.tar.gz 数据中的用户圈子组成一个个人社交网络,数据集还包括个人属性信息

2 GraphX 图形创建方式

在GraphX里面有四种创建一个属性图的方法。每种构建图的方法对数据都有一定的格式要求。下面一一分析。

2.1 利用 Object Graph 的工厂方法创建

Object Graph 是 Class Graph 的伴生对象,它定义了创建 Graph 对象的 apply 方法定义如下:

def apply[VD, ED](
  vertices: RDD[(VertexId, VD)],
  edges: RDD[Edge[ED]],
  defaultVertexAttr: VD = null
  ): Graph[VD, ED]

此方法通过传入顶点:RDD[(VertexId,VD)]和边:RDD[Edge[ED]] 就可以创建一个图。注意参数: defaultVertexAttr 是用来设置那些边中的顶点不在传入的顶点集合当中的顶点的默认属性,所以这个值的类型必须是和传入顶点的属性的类型一样。

2.2 利用 edgeListFile 创建

一个非常常见的场景是:你数据集里的数据表示的是顶点与顶点的关系即只表示边。这种情况下Graphx提供了GraphLoader.edgeListFile函数来自动生成图,函数的定义如下:

def edgeListFile(
  sc: SparkContext,
  path: String,
  canonicalOrientation: Boolean = false,
  numEdgePartitions: Int = -1)
  : Graph[Int, Int]

sc、path 这两个参数不用多说,需要注意的参数解析如下:

  • path 指向包含边的文件或文件夹 要求:文件每一行用两个按照多个空格分割的正整数表示的边,如: scrId dstId,Spark 读取的时候会忽略# 开头的行
  • canonicalOrientation 表示图是否有方向 如果值为true,那么只会加载 srcId > dstId 的边,否则全部加载
  • 加载完所有边之后,自动按照边生成顶点,默认的每个顶点的属性是1
  • numEdgePartitions 边分区个数默认是按照文件分区来划分的,也可以指定

下面看一下关键源码:

val edges = lines.mapPartitionsWithIndex { (pid, iter) =>
  val builder = new EdgePartitionBuilder[Int, Int]
  iter.foreach { line =>
    if (!line.isEmpty && line(0) != '#') {
      val lineArray = line.split("\\s+")
      if (lineArray.length < 2) {
        throw new IllegalArgumentException("Invalid line: " + line)
      }
      val srcId = lineArray(0).toLong
      val dstId = lineArray(1).toLong
      if (canonicalOrientation && srcId > dstId) {
        builder.add(dstId, srcId, 1)
      } else {
        builder.add(srcId, dstId, 1)
      }
    }
  }
}

2.3 利用 fromEdges 创建

这个方法可以理解为edgeListFile方法内部就是调用这个方法。原理就是只根据边: RDD[Edge[ED]] 来生成图,顶点就是由所有构成边的顶点组成,顶点的默认属性用户可以指定,定义如下:

def fromEdges[VD: ClassTag, ED: ClassTag](
    edges: RDD[Edge[ED]],
    defaultValue: VD): Graph[VD, ED]

2.4 利用 fromEdgeTuples 创建

这个方法也可以理解为edgeListFile方法内部就是调用这个方法。原理就是只根据边: RDD[(VertexId, VertexId)] 来生成图,连边的属性都不知道,默认边的属性当然可以设置,顶点就是由所有构成边的顶点组成,顶点的默认属性用户可以指定,定义如下:

def fromEdgeTuples[VD](
  rawEdges: RDD[(VertexId, VertexId)],
  defaultValue: VD,
  uniqueEdges: Option[PartitionStrategy] = None)
  : Graph[VD, Int]

其实后面三种方式都是不明确指定顶点,而是通过边来推导出顶点,这非常适合无属性图,比较常用的是第一种和第二种方式。当然也可以自己实现第三种方式的文件读取方式,比如文件中不止两列,还有属性列等等,非常简单。

3 GraphX 图形创建实战

3.1 创建一个双向图

先拿上面数据列表中的第一份数据,数据解压之后的文件名为:Email-Enron.txt,前面十条示例数据如下:

# Directed graph (each unordered pair of nodes is saved once): Email-Enron.txt
# Enron email network (edge indicated that email was exchanged, undirected edges)
# Nodes: 36692 Edges: 367662
# FromNodeId	ToNodeId
0	1
1	0
1	2
1	3

可以发现这个数据集合非常适合上面edgeListFile方法创建图形,代码如下:

val emailGraph = GraphLoader.edgeListFile(sc, projectDir + "Email-Enron.txt")

查看一下图中前面5个顶点和边

emailGraph.vertices.take(5).foreach(println)
(19021,1)
(28730,1)
(23776,1)
(34207,1)
(31037,1)
emailGraph.edges.take(5).foreach(println)
Edge(0,1,1)
Edge(1,0,1)
Edge(1,2,1)
Edge(1,3,1)
Edge(1,4,1)

查看一下是否是双向图(任何两个点只要有连接必须是来回指向),这里只是查看顶点ID为19021的点:

emailGraph.edges.filter(_.srcId == 19021).map(_.dstId).collect().foreach(println)
696
4232
6811
8315
26007
emailGraph.edges.filter(_.dstId == 19021).map(_.srcId).collect().foreach(println)
696
4232
6811
8315
26007

3.2 创建一个二分图

什么是二分图?简单来说:二分图指的是图的顶点分为两个集合,其中任意集合内部顶点不可能有边关联,关联的边顶点一定分布在两个不同的集合之中。详细原理见Wiki百科
本文第二个数据集食物成分和化合物的关系图就是二分图。将下载的数据解压,先来看一下压缩包中每个原始文件前十条数据:

  • 文件1:ingr_info.tsv 从文件名可以知道它是按照制表符分割的文件 表示的是食物原料的信息
下面三列分别表示:原料ID	原料名字	分类
# id	ingredient name	category
0	magnolia_tripetala	flower
1	calyptranthes_parriculata	plant
2	chamaecyparis_pisifera_oil	plant derivative
3	mackerel	fish/seafood
4	mimusops_elengi_flower	flower
5	hyssop	herb
6	buchu	plant
7	black_pepper	spice
8	eryngium_poterium_oil	plant derivative
9	peanut_butter	plant derivative
  • 文件2:comp_info.tsv 这个表示化合物的基础信息
下面三列分别表示:化合物ID	化合物名字	CAS编号
# id	Compound name	CAS number
0	jasmone	488-10-8
1	5-methylhexanoic_acid	628-46-6
2	l-glutamine	56-85-9
3	1-methyl-3-methoxy-4-isopropylbenzene	1076-56-8
4	methyl-3-phenylpropionate	103-25-3
5	3-mercapto-2-methylpentan-1-ol_(racemic)	227456-27-1
6	ethyl-3-hydroxybutyrate	5405-41-4
7	cyclohexyl_butyrate	1551-44-6
8	methyl_dihydrojasmonate	24851-98-7
9	methyl_2-methylthiobutyrate	42075-45-6
  • 文件3:ingr_comp.tsv 这个记录的是 ingredient 和 compound 对应关系
# ingredient id	compound id
1392	906
1259	861
1079	673
22	906
103	906
1005	906
1005	278
1005	171

有了数据之后,如果你盲目的使用第三个文件直接按照上面的第一种方式建图的话,那么就会大错特错。因为第一列的ID和第二列的ID不是表示同一个事物,但是它们有交叉的数值。一个简单的办法就是第二列的值转化为第一列最大值+1之后再加上自身的数值,这样保证两个集合的ID没有交叉。请看下面的代码:

package clebeg.spark.graph

import org.apache.spark.graphx.{EdgeTriplet, VertexId, Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}


//定义下面的类将 ingredient 和 compount 统一表示 注意父类一定要可以序列化
class FoodNode(val name: String) extends Serializable
case class Ingredient(override val name: String, val cat: String) extends FoodNode(name)
case class Compound(override val name: String, val cas: String) extends FoodNode(name)
/**
  * Created by clebegxie on 2015/11/25.
  */
object Graph1Food {
  val projectDir = "your_data_dir/"
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkInAction").setMaster("local[4]")
    val sc = new SparkContext(conf)
    val ingredients: RDD[(VertexId, FoodNode)] = sc.textFile(projectDir + "ingr_info.tsv").filter {
      !_.startsWith("#")
    }.map {
      line =>
        val array = line.split("\t")
        (array(0).toLong, Ingredient(array(1), array(2)))
    }
    //获取得到最大的 ingredient 的ID 并且加1
    val maxIngrId = ingredients.keys.max() + 1
    val compounds: RDD[(VertexId, FoodNode)] = sc.textFile(projectDir + "comp_info.tsv").filter {
      !_.startsWith("#")
    }.map {
      line =>
        val array = line.split("\t")
        (maxIngrId + array(0).toLong, Compound(array(1), array(2)))
    }
    //根据文件 ingr_comp.csv 生成边,注意第二列的所有顶点都要加上 maxIngrId
    val links = sc.textFile(projectDir + "ingr_comp.tsv").filter {
      !_.startsWith("#")
    }.map {
      line =>
        val array = line.split("\t")
        Edge(array(0).toLong, maxIngrId + array(1).toLong, 1)
    }
    //将两个顶点合并
    val vertices = ingredients ++ compounds
    val foodNetWork = Graph(vertices, links)
    //foodNetWork.vertices.take(10).foreach(println)
    //访问一下这个网络前面5条triplet的对应关系
    foodNetWork.triplets.take(5).foreach(showTriplet _ andThen println _)
  }

  def showTriplet(t: EdgeTriplet[FoodNode, Int]): String =
    "The ingredient " ++ t.srcAttr.name ++ " contains " ++ t.dstAttr.name
}

运行结果为:

The ingredient calyptranthes_parriculata contains citral_(neral)
The ingredient chamaecyparis_pisifera_oil contains undecanoic_acid
The ingredient hyssop contains myrtenyl_acetate
The ingredient hyssop contains 4-(2,6,6-trimethyl-cyclohexa-1,3-dienyl)but-2-en-4-one
The ingredient buchu contains menthol

3.3 创建一个人与人之间相似性权重图

数据集是使用上面介绍的Google+提供的个人关系数据,解压之后有792个文件,每一个文件名去掉后缀代表的是网络ID,每个网络ID有6个文件,所以这里有132个个人关系网络。下面以ID为100129275726588145876的网络说明一下每个文件的含义:

  • .edges 记录的是边,即ID对应的用户之间有关联,示例数据为:
116374117927631468606 101765416973555767821
112188647432305746617 107727150903234299458
116719211656774388392 100432456209427807893
117421021456205115327 101096322838605097368
116407635616074189669 113556266482860931616
105706178492556563330 111169963967137030210
107527001343993112621 110877363259509543172
105513412023818293063 115710735637044108808
108736646334864181044 112393248315358692010
108683283643126638695 107111579950257773726
  • .feat 记录的是每个用户ID对应的特征,每个维度上面都是取值为 0 1,示例数据为:
#注意这里只是一行数据
114985346359714431656 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 1 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 1 1 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0
  • .featnames 记录的是上面feat每个维度对应的含义(注意:上面之所以每个维度取值都是 0 1, 是因为这里的特征都是分类变量,并且做了 1 of n 编码),示例数据为:
//从下面的gender可以看出,做了 1 of n 编码
0 gender:1
1 gender:2
2 gender:3
3 institution:
4 institution:AMC Theatres
5 institution:AOL
6 institution:AT&T
7 institution:Aardvark
8 institution:Accenture
9 institution:Adobe Systems

下面之间给出建图代码,代码意图都有注释:

val projectDir = "your_data_dir/"
val id = "100129275726588145876" //只建立这个ID对应的社交关系图
type Feature = breeze.linalg.SparseVector[Int]
def main(args: Array[String]) {
  val conf = new SparkConf().setAppName("SparkInAction").setMaster("local[4]")
  val sc = new SparkContext(conf)
  //通过 .feat 文件读取每个顶点的属性向量
  val featureMap = Source.fromFile(projectDir + id + ".feat").getLines().
  map {
    line =>
      val row = line.split(" ")
      //注意:ID 不能之间当作 Long 型的时候 常常用 hashcode 代替
      val key = abs(row.head.hashCode.toLong)
      val feat = SparseVector(row.tail.map(_.toInt))
      (key, feat)
  }.toMap

  //通过 .edges 文件得到两个用户之间的关系 并且计算他们相同特征的个数
  val edges = sc.textFile(projectDir + id + ".edges").map {
    line =>
      val row = line.split(" ")
      val srcId = abs(row(0).hashCode.toLong)
      val dstId = abs(row(1).hashCode.toLong)
      val srcFeat = featureMap(srcId)
      val dstFeat = featureMap(dstId)
      val numCommonFeats: Int = srcFeat dot dstFeat
      Edge(srcId, dstId, numCommonFeats)
  }

  //利用 fromEdges 建立图
  val egoNetwork = Graph.fromEdges(edges, 1)

  //查看一下具有3个相同特征的用户对
  print(egoNetwork.edges.filter(_.attr == 3).count())
}

这里需要注意下面两个地方:

  • Graphx对应的顶点必须为 Long 型,对于不符合的类型,通常取 HashCode
  • fromEdges 建图只需要已知边即可

个人微信公众号

欢迎关注本人微信公众号,会定时发送关于大数据、机器学习、Java、Linux 等技术的学习文章,而且是一个系列一个系列的发布,无任何广告,纯属个人兴趣。
Clebeg能量集结号

© 著作权归作者所有

clebeg
粉丝 45
博文 40
码字总数 40057
作品 0
广州
程序员
私信 提问
加载中

评论(2)

clebeg
clebeg 博主

引用来自“jacksu”的评论

#utils4s#,欢迎一起交流学习79
欢迎欢迎,保持联系
jacksu
jacksu
#utils4s#,欢迎一起交流学习79
Spark GraphX宝刀出鞘,图文并茂研习图计算秘笈与熟练的掌握Scala语言【大数据Spark

Spark GraphX宝刀出鞘,图文并茂研习图计算秘笈 大数据的概念与应用,正随着智能手机、平板电脑的快速流行而日渐普及,大数据中图的并行化处理一直是一个非常热门的话题。图计算正在被广泛地...

Spark亚太研究院
2014/08/29
1K
0
Spark之GraphX的特点

1.基于内存实现了数据的复用与快速读取 具有较多迭代次数是图计算算法的一个重要特点。在海量数据背景下,如何保证图计算算法的执行效率是所有图计算模型面对的一个难题。基于MapReduce的图计...

mmake1994
2018/04/16
0
0
Spark2.1.0之模块设计

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/80386736 在阅读本文之前,读者最好已经阅读了《Spark2.1.0之初识Spark》和《Spark...

泰山不老生
2018/06/05
0
0
Spark—4(Spark核心组件)

1、Spark Streaming Spark Sreaming基于Spark Core实现了可扩展、高吞吐和容错的实时数据流处理。现在支持的数据源和处理后的结果存储如下图所示。 Spark Streaming将流式计算分解成一系列短...

叶枫啦啦
07/09
49
0
Spark 数据分析导论-笔记

Spark Core Spark Core 实现了Spark 的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。 Spark Core 中还包含了 对弹性分布式数据集(resilient distributed dataset,简...

Java搬砖工程师
2018/12/26
53
0

没有更多内容

加载失败,请刷新页面

加载更多

75、GridFS

GridFS是MongoDB提供的用于持久化存储文件的模块,CMS使用Mongo DB存储数据,使用FGridFS可以快速集成开发。 工作原理: 在GridFS存储文件是将文件分块存储,文件会按照256KB的大小分割成多个...

lianbang_W
56分钟前
4
0
js bind 绑定this指向

本文转载于:专业的前端网站➱js bind 绑定this指向 1、示例代码 <!DOCTYPE html><html lang="zh"> <head> <meta charset="UTF-8" /> <title>bind函数绑定this指向......

前端老手
59分钟前
4
0
CentOS Linux 7上将ISO映像文件写成可启动U盘

如今,电脑基本上都支持U盘启动,所以,可以将ISO文件写到U盘上,用来启动并安装操作系统。 我想将一个CentOS Linux 7的ISO映像文件写到U盘上,在CentOS Linux 7操作系统上,执行如下命令: ...

大别阿郎
今天
4
0
深入vue-公司分享ppt

组件注册 全局注册 注册组件,传入一个扩展过的构造器 Vue.component('my-component', Vue.extend({/*...*/})) 注册组件,传入一个选项对象(自动调用Vue.extend) Vue.component('my-comp...

莫西摩西
今天
4
0
gitlab重置管理员密码

登录gitlab服务器 [root@localhost bin]# sudo gitlab-rails console productionLoading production environment (Rails 5.2.3)irb(main):001:0> u = User.where(email: 'admin@example.co......

King华仔o0
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部