文档章节

算法——水塘抽样 reservoirSample

freeli
 freeli
发布于 2018/12/12 20:39
字数 1156
阅读 8
收藏 0

简介

reservoir的作用是:**在不知道文件总行数的情况下,如何从文件中随机的抽取一行?**即是说如果最后发现文字档共有N行,则每一行被抽取的概率均为1/N?

我们可以:定义取出的行号为choice,第一次直接以第一行作为取出行 choice ,而后第二次以二分之一概率决定是否用第二行替换 choice ,第三次以三分之一的概率决定是否以第三行替换 choice ……,以此类推。由上面的分析我们可以得出结论,在取第n个数据的时候,我们生成一个0到1的随机数p,如果p小于1/n,保留第n个数。大于1/n,继续保留前面的数。直到数据流结束,返回此数,算法结束。

这个问题的扩展就是:如何从未知或者很大样本空间随机地取k个数?亦即是说,如果档案共有N ≥ k行,则每一行被抽取的概率为k/N。

  根据上面(随机取出一元素)的分析,我们可以把上面的1/n变为k/n即可。思路为:在取第n个数据的时候,我们生成一个0到1的随机数p,如果p小于k/n,替换池中任意一个为第n个数。大于k/n,继续保留前面的数。直到数据流结束,返回此k个数。但是为了保证计算机计算分数额准确性,一般是生成一个0到n的随机数,跟k相比,道理是一样的。

伪代码

从S中抽取首k项放入「水塘」中
对于每一个S[j]项(j ≥ k):
   随机产生一个范围0到j的整数r
   若 r < k 则把水塘中的第r项换成S[j]项
/*
  S has items to sample, R will contain the result
*/
ReservoirSample(S[1..n], R[1..k])
  // fill the reservoir array
  for i = 1 to k
      R[i] := S[i]
 
  // replace elements with gradually decreasing probability
  for i = k+1 to n
    j := random(1, i)   // important: inclusive range
    if j <= k
        R[j] := S[i]

实现概述

  1. 获取到需要抽样RDD分区的样本大小k和分区的所有KEY数组input
  2. 初始化抽样结果集reservoir为分区前K个KEY值
  3. 如果分区的总数小于预计样本大小k,则将当前分区的所有数据作为样本数据,否则到第四步
  4. 遍历分区里所有Key组成的数组input
  5. 生成随机需要替换input数组的下标,如果下标小于K则替换
  6. 返回抽取的key值数组和当前分区的总数据量: (reservoir, l)

实现源码

/**
   * Reservoir sampling implementation that also returns the input size.
   *
   * @param input:RDD的分区里面的key组成的Iterator
   * @param k :抽样大小=
   		val sampleSize = math.min(20.0 * partitions, 1e6)
   		val k=math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
   * @param seed random seed:选取随机数的种子
   * @return (samples, input size)
   */
  def reservoirSampleAndCount[T: ClassTag](
      input: Iterator[T],
      k: Int,
      seed: Long = Random.nextLong())
    : (Array[T], Long) = {
    val reservoir = new Array[T](k)
    // Put the first k elements in the reservoir.
    // 初始化水塘数据为input的钱K个数,即:reservoir数组中放了RDD分区的前K个key值
    var i = 0
    while (i < k && input.hasNext) {
      val item = input.next()
      reservoir(i) = item
      i += 1
    }

    // If we have consumed all the elements, return them. Otherwise do the replacement.
    // 如果当前的RDD总数小于预设值的采样量则全部作为采样数据并结束采样
    if (i < k) {
      // If input size < k, trim the array to return only an array of input size.
      val trimReservoir = new Array[T](i)
      System.arraycopy(reservoir, 0, trimReservoir, 0, i)
      (trimReservoir, i)
    } else {
      // If input size > k, continue the sampling process.
      var l = i.toLong
      val rand = new XORShiftRandom(seed)
      // 遍历所有的key
      while (input.hasNext) {
        val item = input.next()
        l += 1
        // There are k elements in the reservoir, and the l-th element has been
        // consumed. It should be chosen with probability k/l. The expression
        // below is a random long chosen uniformly from [0,l)
        // 计算出需要替换的数组下标
        // 选取第n个数的概率是:n/l; 如果随机替换数组值的概率是p=rand.nextDouble,
        // 则如果p<k/l;则替换池中任意一个数,即: p*l < k 则进行替换,用p*l作为随机替换的下标
        val replacementIndex = (rand.nextDouble() * l).toLong
        if (replacementIndex < k) {
          // 替换reservoir[随机抽取的下标]的值为input[l]的值item
          reservoir(replacementIndex.toInt) = item
        }
      }
      (reservoir, l)
    }
  }

参考:https://www.iteblog.com/archives/1525.html

© 著作权归作者所有

共有 人打赏支持
freeli
粉丝 1
博文 61
码字总数 85793
作品 0
朝阳
程序员
私信 提问
sparkCore源码解析之RangePartitioner

---title: spark源码解析之RangePartitionersubtitle: spark rdd 分区器description: ,RangePartitioner详解keywords: [水塘抽样,RangePartitioner,分区]author: liyzdate: 2018-12-10tags: ......

freeli
2018/12/12
0
0
[LeetCode] Shuffle an Array 数组洗牌

Shuffle a set of numbers without duplicates. Example: // Init an array with set 1, 2, and 3.int[] nums = {1,2,3};Solution solution = new Solution(nums);// Shuffle the array [1,2......

机器的心脏
2017/12/12
0
0
sparkCore源码解析之partition

---title: sparkCore源码解析之partitionsubtitle: 分区的源码解析description: 水塘抽样keywords: [spark,core,源码,partition]author: liyzdate: 2019-01-14tags: [spark,源码解析] categ......

freeli
01/14
0
0
sparkCore源码解析之RangePartitioner源码[具体流程见上篇]

---title: spark源码解析之RangePartitioner源码subtitle: spark rdd 分区器description: ,RangePartitioner详解keywords: [水塘抽样,RangePartitioner,分区]author: liyzdate: 2018-12-10ta......

freeli
2018/12/12
0
1
Spark分区器HashPartitioner和RangePartitioner代码详解

1.HashPartitioner分区怎么用源码解析? 2.RangePartitioner分区怎么用源码解析? 3.定位分区ID怎么用源码解析? HashPartitioner分区 复制代码 RangePartitioner分区...

亮亮-AC米兰
2017/02/09
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Django进阶 1.1 ORM基础—ORM 1.2.1 增删改查之查询 1.2.2 删改增 (1) 1.2.3 删改增 (2)

ORM基础 ORM是Django操作数据库的API,Django的作者将sql语句封装在里面供我们使用。 我们前面还提到过Django提供一个模拟数据库的工具,sqlite,供我们学习测试使用。 如果我们想使用mysql...

隐匿的蚂蚁
今天
1
0
Windows 上安装 Scala

在安装 Scala 之前需要先安装 Java 环境,具体安装的详细方法就不在这里描述了。 您可以自行搜索我们网站中的内容获得其他网站的帮助来获得如何安装 Java 环境的方法。 接下来,我们可以从 ...

honeymose
今天
3
0
数据库篇多表操作

第1章 多表操作 实际开发中,一个项目通常需要很多张表才能完成。例如:一个商城项目就需要分类表(category)、商品表(products)、订单表(orders)等多张表。且这些表的数据之间存在一定的关系...

stars永恒
今天
3
0
nginx日志自动切割

1.日志配置(Nginx 日志) access.log----记录哪些用户,哪些页面以及用户浏览器,IP等访问信息;error.log------记录服务器错误的日志 #配置日志存储路径:location / {      a...

em_aaron
昨天
5
0
java 反射

基本概念 RTTI,即Run-Time Type Identification,运行时类型识别。RTTI能在运行时就能够自动识别每个编译时已知的类型。   要想理解反射的原理,首先要了解什么是类型信息。Java让我们在运...

细节探索者
昨天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部