文档章节

Spark数据挖掘-FPGrowth算法

clebeg
 clebeg
发布于 2015/11/03 15:15
字数 2445
阅读 706
收藏 9

Spark数据挖掘-FPGrowth算法

主要内容

  1. 什么是关联规则挖掘?
  2. 关联规则有哪些术语?
  3. 什么是FP-Growth算法?

1.1 FPGrowth算法

1.1.1 基本概念

关联规则挖掘的一个典型例子是购物篮分析。关联规则研究有助于发现交易数据库中不同商品(项)之间的联系,找出顾客购买行为模式,如购买了某一商品对购买其他商品的影响,分析结果可以应用于商品货架布局、货存安排以及根据购买模式对用户进行分类。

关联规则的相关术语如下:

**(1)项与项集 **
这是一个集合的概念,在一篮子商品中的一件消费品即为一项(Item),则若干项的集合为项集,如{啤酒,尿布}构成一个二元项集。

**(2)关联规则 **
一般记为的形式,X为先决条件,Y为相应的关联结果,用于表示数据内隐含的关联性。如:表示购买了尿布的消费者往往也会购买啤酒。 关联性强度如何,由三个概念——支持度、置信度、提升度来控制和评价。 例:有10000个消费者购买了商品,其中购买尿布1000个,购买啤酒2000个,购买面包500个,同时购买尿布和面包800个,同时购买尿布和面包100个。

(3)支持度(Support)
支持度是指在所有项集中{X, Y}出现的可能性,即项集中同时含有X和Y的概率: 该指标作为建立强关联规则的第一个门槛,衡量了所考察关联规则在“量”上的多少。通过设定最小阈值(minsup),剔除“出镜率”较低的无意义规则,保留出现较为频繁的项集所隐含的规则。 设定最小阈值为5%,由于{尿布,啤酒}的支持度为800/10000=8%,满足基本输了要求,成为频繁项集,保留规则;而{尿布,面包}的支持度为100/10000=1%,被剔除。

(4)置信度(Confidence)
置信度表示在先决条件X发生的条件下,关联结果Y发生的概率: 这是生成强关联规则的第二个门槛,衡量了所考察的关联规则在“质”上的可靠性。相似的,我们需要对置信度设定最小阈值(mincon)来实现进一步筛选。 具体的,当设定置信度的最小阈值为70%时,置信度为800/1000=80%,而的置信度为800/2000=40%,被剔除。

(5)提升度(lift)
提升度表示在含有X的条件下同时含有Y的可能性与没有X这个条件下项集中含有Y的可能性之比:公式为confidence(artichok => cracker)/support(cracker) = 80%/50% = 1.6。该指标与置信度同样衡量规则的可靠性,可以看作是置信度的一种互补指标。

1.1.2 FP-Growth算法

FP-Growth(频繁模式增长)算法是韩家炜老师在2000年提出的关联分析算法,它采取如下分治策略:将提供频繁项集的数据库压缩到一棵频繁模式树(FP-Tree),但仍保留项集关联信息;该算法和Apriori算法最大的不同有两点:第一,不产生候选集,第二,只需要两次遍历数据库,大大提高了效率。

**(1)按以下步骤构造FP-树 **
(a) 扫描事务数据库D一次。收集频繁项的集合F和它们的支持度。对F按支持度降序排序,结果为频繁项表L。
(b) 创建FP-树的根结点,以“null”标记它。对于D 中每个事务Trans,执行:选择 Trans 中的频繁项,并按L中的次序排序。设排序后的频繁项表为[p | P],其中,p 是第一个元素,而P 是剩余元素的表。调用insert_tree([p | P], T)。该过程执行情况如下。如果T有子女N使得N.item-name = p.item-name,则N 的计数增加1;否则创建一个新结点N将其计数设置为1,链接到它的父结点T,并且通过结点链结构将其链接到具有相同item-name的结点。如果P非空,递归地调用insert_tree(P, N)。

**(2)FP-树的挖掘 **
通过调用FP_growth(FP_tree, null)实现。该过程实现如下:

  • FP_growth(Tree, α)
  • if Tree 含单个路径P then
  • for 路径 P 中结点的每个组合(记作β)
  • 产生模式β ∪ α,其支持度support = β中结点的最小支持度;
  • else for each ai在Tree的头部(按照支持度由低到高顺序进行扫描) {
  • 产生一个模式β = ai ∪ α,其支持度support = ai .support;
  • 构造β的条件模式基,然后构造β的条件FP-树Treeβ;
  • if Treeβ ≠ ∅ then
  • 调用 FP_growth (Treeβ, β);}
  • end

1.1.3 FP-Growth算法演示—构造FP-树

  • **事务数据库建立 **
    0

  • **创建根结点和频繁项目表 **
    1

  • 加入第一个事务(I2,I1,I5)
    2

  • 加入第二个事务(I2,I4)
    3

  • 加入第三个事务(I2,I3)
    4

以此类推加入第5、6、7、8、9个事务。

  • 加入第九个事务(I2,I1,I3)
    5

1.1.4 FP-Growth算法演示—FP-树挖掘

FP-树建好后,就可以进行频繁项集的挖掘,挖掘算法称为FpGrowth(Frequent Pattern Growth)算法,挖掘从表头header的最后一个项开始,以此类推。本文以I5、I3为例进行挖掘。

(1)挖掘I5:
对于I5,得到条件模式基:<(I2,I1:1)>、<I2,I1,I3:1>
构造条件FP-tree:
6
得到I5频繁项集:{{I2,I5:2},{I1,I5:2},{I2,I1,I5:2}}
I4、I1的挖掘与I5类似,条件FP-树都是单路径。
(2)挖掘I3:
I5的情况是比较简单的,因为I5对应的条件FP-树是单路径的,I3稍微复杂一点。I3的条件模式基是(I2 I1:2), (I2:2), (I1:2),生成的条件FP-树如下图:
6
I3的条件FP-树仍然是一个多路径树,首先把模式后缀I3和条件FP-树中的项头表中的每一项取并集,得到一组模式{I2 I3:4, I1 I3:4},但是这一组模式不是后缀为I3的所有模式。还需要递归调用FP-growth,模式后缀为{I1,I3},{I1,I3}的条件模式基为{I2:2},其生成的条件FP-树如下图所示。
6
在FP_growth中把I2和模式后缀{I1,I3}取并得到模式{I1 I2 I3:2}。 理论上还应该计算一下模式后缀为{I2,I3}的模式集,但是{I2,I3}的条件模式基为空,递归调用结束。最终模式后缀I3的支持度>2的所有模式为:{ I2 I3:4, I1 I3:4, I1 I2 I3:2}。

1.2 Spark Mllib FPGrowth源码分析

FPGrowth源码包括:FPGrowth、FPTree两部分。 其中FPGrowth中包括:run方法、genFreqItems方法、genFreqItemsets方法、genCondTransactions方法; FPTree中包括:add方法、merge方法、project方法、getTransactions方法、extract方法。

// run 计算频繁项集
  /**
   * Computes an FP-Growth model that contains frequent itemsets.
   * @param data input data set, each element contains a transaction
   * @return an [[FPGrowthModel]]
   */
  def run[Item: ClassTag](data: RDD[Array[Item]]): FPGrowthModel[Item] = {
    if (data.getStorageLevel == StorageLevel.NONE) {
      logWarning("Input data is not cached.")
    }
    val count = data.count()//计算事务总数
    val minCount = math.ceil(minSupport * count).toLong//计算最小支持度
    val numParts = if (numPartitions > 0) numPartitions else data.partitions.length
val partitioner = new HashPartitioner(numParts)
//freqItems计算满足最小支持度的Items项
val freqItems = genFreqItems(data, minCount, partitioner)
//freqItemsets计算频繁项集
    val freqItemsets = genFreqItemsets(data, minCount, freqItems, partitioner)
    new FPGrowthModel(freqItemsets)
}
// genFreqItems计算满足最小支持度的Items项
/**
   * Generates frequent items by filtering the input data using minimal support level.
   * @param minCount minimum count for frequent itemsets
   * @param partitioner partitioner used to distribute items
   * @return array of frequent pattern ordered by their frequencies
   */
  privatedef genFreqItems[Item: ClassTag](
      data: RDD[Array[Item]],
      minCount: Long,
      partitioner: Partitioner): Array[Item] = {
    data.flatMap { t =>
      val uniq = t.toSet
      if (t.size != uniq.size) {
        thrownew SparkException(s"Items in a transaction must be unique but got ${t.toSeq}.")
      }
      t
    }.map(v => (v, 1L))
      .reduceByKey(partitioner, _ + _)
      .filter(_._2 >= minCount)
      .collect()
      .sortBy(-_._2)
      .map(_._1)
}//统计每个Items项的频次,对小于minCount的Items项过滤,返回Items项。

// genFreqItemsets计算频繁项集:生成FP-Trees,挖掘FP-Trees
/**
   * Generate frequent itemsets by building FP-Trees, the extraction is done on each partition.
   * @param data transactions
   * @param minCount minimum count for frequent itemsets
   * @param freqItems frequent items
   * @param partitioner partitioner used to distribute transactions
   * @return an RDD of (frequent itemset, count)
   */
  privatedef genFreqItemsets[Item: ClassTag](
      data: RDD[Array[Item]],
      minCount: Long,
      freqItems: Array[Item],
      partitioner: Partitioner): RDD[FreqItemset[Item]] = {
    val itemToRank = freqItems.zipWithIndex.toMap//表头
    data.flatMap { transaction =>
      genCondTransactions(transaction, itemToRank, partitioner)
    }.aggregateByKey(new FPTree[Int], partitioner.numPartitions)( //生成FP树
      (tree, transaction) => tree.add(transaction, 1L), //FP树增加一条事务
      (tree1, tree2) => tree1.merge(tree2)) //FP树合并
    .flatMap { case (part, tree) =>
      tree.extract(minCount, x => partitioner.getPartition(x) == part)//FP树挖掘频繁项
    }.map { case (ranks, count) =>
      new FreqItemset(ranks.map(i => freqItems(i)).toArray, count)
    }
}
// add FP-Trees增加一条事务数据
/** Adds a transaction with count. */
  def add(t: Iterable[T], count: Long = 1L): this.type = {
    require(count > 0)
    var curr = root
    curr.count += count
    t.foreach { item =>
      val summary = summaries.getOrElseUpdate(item, new Summary)
      summary.count += count
      val child = curr.children.getOrElseUpdate(item, {
        val newNode = new Node(curr)
        newNode.item = item
        summary.nodes += newNode
        newNode
      })
      child.count += count
      curr = child
    }
    this
}
// merge FP-Trees合并
  /** Merges another FP-Tree. */
  def merge(other: FPTree[T]): this.type = {
    other.transactions.foreach { case (t, c) =>
      add(t, c)
    }
    this
}
// extract FP-Trees挖掘,返回所有频繁项集
  /** Extracts all patterns with valid suffix and minimum count. */
  def extract(
      minCount: Long,
      validateSuffix: T => Boolean = _ => true): Iterator[(List[T], Long)] = {
    summaries.iterator.flatMap { case (item, summary) =>
      if (validateSuffix(item) && summary.count >= minCount) {
        Iterator.single((item :: Nil, summary.count)) ++
          project(item).extract(minCount).map { case (t, c) =>
            (item :: t, c)
          }
      } else {
        Iterator.empty
      }
    }
  }
}

1.3 Mllib FPGrowth实例

1、数据

数据格式为:物品1物品2物品3…
r z h k p
z y x w v u t s
s x o n r
x z y m t s q e
z
x z y r q t p

2、代码

//读取样本数据
 valdata_path = "/home/tmp/sample_fpgrowth.txt"
 valdata = sc.textFile(data_path)
 valexamples = data.map(_.split(" ")).cache()
 //建立模型
 valminSupport = 2
 valnumPartition = 10
 valmodel = new FPGrowth()
   .setMinSupport(minSupport)
   .setNumPartitions(numPartition)
   .run(examples)
 //打印结果
 println(s"Number of frequent itemsets: ${model.freqItemsets.count()}")
 model.freqItemsets.collect().foreach { itemset =>
   println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
 }

个人微信公众号

欢迎关注本人微信公众号,会定时发送关于大数据、机器学习、Java、Linux 等技术的学习文章,而且是一个系列一个系列的发布,无任何广告,纯属个人兴趣。
Clebeg能量集结号

本文转载自:http://www.aboutyun.com/thread-13009-1-1.html

clebeg
粉丝 45
博文 40
码字总数 40057
作品 0
广州
程序员
私信 提问
【Spark亚太研究院系列丛书】Spark实战高手之路-第3章Spark架构设计与编程模型第1节②

三、你为什么需要Spark; 你需要Spark的十大理由: 1,Spark是可以革命Hadoop的目前唯一替代者,能够做Hadoop做的一切事情,同时速度比Hadoop快了100倍以上: Logistic regression in Hadoo...

Spark亚太研究院
2014/12/16
432
2
Spark部署及技术选型

Spark部署及技术选型 在飞速发展的云计算大数据时代,Spark是继Hadoop之后,成为替代Hadoop的下一代云计算大数据核心技术,目前Spark已经构建了自己的整个大数据处理生态系统,如流处理、图技...

Spark亚太研究院
2014/07/01
2.2K
4
从Hadoop到Spark的架构实践

当下,Spark已经在国内得到了广泛的认可和支持:2014年,Spark Summit China在北京召开,场面火爆;同年,Spark Meetup在北京、上海、深圳和杭州四个城市举办,其中仅北京就成功举办了5次,内...

Emilypz
2015/10/10
1K
0
Spark成为大数据高手进阶步骤

什么是Spark Spark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapRedu...

MoksMo
2015/11/05
2.1K
1
【Spark亚太研究院系列丛书】Spark实战高手之路-第3章Spark架构设计与编程模型第1节 ①

一、MapReduce已死,Spark称霸 由于Hadoop的MapReduce高延迟的死穴,导致Hadoop无力处理很多对时间有要求的场景,人们对其批评越来越多,Hadoop无力改变现在而导致正在死亡。正如任何领域一样...

Spark亚太研究院
2014/12/12
311
0

没有更多内容

加载失败,请刷新页面

加载更多

linux 磁盘不足异常

linux 报 No space left on device 异常 ,则是磁盘不足 ,导致异常 运行 df -h 命令查询磁盘使用率,如果有100%,则查找目录大日志文件删除 1.磁盘不足导致系统应用写入文件失败,如系统日志...

zaolonglei
21分钟前
2
0
即学即用的 30 段 Python 实用代码

☞ 分享:最全最新的Python学习大礼包 ☜ 点击查看 编译:Pita & AI开发者,作者:Fatos Morina Python是目前最流行的语言之一,它在数据科学、机器学习、web开发、脚本编写、自动化方面被许...

Object_Man
22分钟前
3
0
The server time zone value 'EDT' is unrecognized or represents more than one time zone.

2019-10-14 18:07:43.714 ERROR 74363 --- [Druid-ConnectionPool-Create-1855026648] com.alibaba.druid.pool.DruidDataSource : create connection SQLException, url: jdbc:mysql://10.30......

yizhichao
35分钟前
8
0
html加载顺序以及影响页面二次渲染额的因素

本文转载于:专业的前端网站➱html加载顺序以及影响页面二次渲染额的因素 浏览器请求发往服务器以后,返回HTML页面,页面内容开始渲染,具体的执行顺序为: 1. 浏览器开始载入html代码,发现<...

前端老手
37分钟前
9
0
BeginnersBook JSP、JSTL、Servlet 教程

来源:ApacheCN BeginnersBook 翻译项目 译者:飞龙 协议:CC BY-NC-SA 4.0 贡献指南 本项目需要校对,欢迎大家提交 Pull Request。 请您勇敢地去翻译和改进翻译。虽然我们追求卓越,但我们并...

ApacheCN_飞龙
49分钟前
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部