文档章节

ScalaInAction 数据预处理

clebeg
 clebeg
发布于 2015/10/27 17:14
字数 770
阅读 280
收藏 0

ScalaInAction 数据预处理

前言

使用 Spark+Scala 进行数据预处理,最大的优势就是可以处理大数据量而且速度还会很快。
Scala 不仅拥有类似与R语言一样的语法特点,还比R语言更加灵活,可以开发自己想要的工具。
Spark 基于 Scala 开发,虽然在大数据处理的时候某些语法和 List 有些不同,但是使用起来也是非常顺手。

数据源

下载测试数据集合,解压缩,数据来源是:UC Irvine Machine Learning Repository,这个里面有很多好东西。

$ mkdir linkage
$ cd linkage/
$ curl -o donation.zip http://bit.ly/1Aoywaq
$ unzip donation.zip
$ unzip 'block_*.zip'

如果有 Spark 集群可以将数据上传到集群上面,下面的测试代码只需要修改两个地方就可以运行。

数据清洗

首先数据集合有10个文件,每个文件都是 Table 结果,列按照逗号分割,而且每个文件都有 header,空值使用 ? 表示。下面要做如下几件事:

  1. 去掉 header
  def isHead(line: String): Boolean = {
    line.contains("id_1")
  }
  1. 逗号分割转换 ? 为 Double.NaN 并且保存列到指定的对象
  case class MatchData(id1: Int, id2: Int, scores: Array[Double], matched: Boolean)
  def toDouble(s: String) = {
    if ("?".equals(s)) Double.NaN else s.toDouble
  }

  def parse(line: String) = {
    val pieces = line.split(',')
    val id1 = pieces(0).toInt
    val id2 = pieces(1).toInt
    val scores = pieces.slice(2, 11).map(x => toDouble(x))
    val matched = pieces(11).toBoolean
    MatchData(id1, id2, scores, matched)
  }

RDD[Double] 基本统计

这个转换的过程是比较有技术含量的,必须新建一个类,代替原来的 StatCounter 类,因为原来的 StatCounter 类不考虑空值。新建 DoubleNaNStatCounter

class DoubleNaNStatCounter extends Serializable {
  val stats: StatCounter = new StatCounter()
  var nan: Long = 0
  def add(x: Double): DoubleNaNStatCount = {
    if (java.lang.Double.isNaN(x))
      nan += 1
    else
      stats.merge(x)
    this
  }

  def merge(other: DoubleNaNStatCount): DoubleNaNStatCount = {
    stats.merge(other.stats)
    nan += other.nan
    this
  }

  override def toString = {
    "stats: " + stats.toString() + " NaN: " + nan
  }
}

object DoubleNaNStatCount extends Serializable {
  def apply(x: Double) = new DoubleNaNStatCount().add(x)
}

再加一个辅助方法,这个方法非常重要,partition本地处理,减少数据传输,优化效率:

  def statsWithMissing(rdd: RDD[Array[Double]]): Array[DoubleNaNStatCount] = {
    val nastats = rdd.mapPartitions((iter: Iterator[Array[Double]]) => {
      val nas: Array[DoubleNaNStatCount] = iter.next().map(d => DoubleNaNStatCount(d))
      iter.foreach(arr => {
        nas.zip(arr).foreach { case (n, d) => n.add(d) }
      })
      Iterator(nas)
    })
    nastats.reduce((n1, n2) => {
      n1.zip(n2).map { case (a, b) => a.merge(b) }
    })
  }

上面类和方法看懂了,基本对 Spark 工作原理就懂了。上面两个操作,等于自己实现了一个更加通用的 Stat,可以作为以后的工作方法用。

驱动代码

  def main(args: Array[String]) {
    //master指定为本地,意味着这是测试
    val conf = new SparkConf().setAppName("SparkInAction").setMaster("local[4]")
    val sc = new SparkContext(conf)
    //如果不是测试这个路径修改为对应的正确位置
    val rdd = sc.textFile("F:\\clebeg\\spark\\donation")
    //去除每个文件的头信息
    val noHeader = rdd.filter(!isHead(_))
    val parsed = noHeader.map(parse)
    //如何转换成Map,如何为Map排序
    val matchCount = parsed.map(md => md.matched).countByValue()
    matchCount.foreach(println)
    matchCount.toSeq.sortBy(_._1).foreach(println)
    matchCount.toSeq.sortBy(_._2).foreach(println)
    //RDD[Double] 通过隐式类型转换具有 stats 方法

    //下面查看匹配和不匹配的数据之间的差异
    val nasm = statsWithMissing(parsed.filter(_.matched).map(_.scores))
    val nasn = statsWithMissing(parsed.filter(!_.matched).map(_.scores))
    val diff = nasm.zip(nasn).map{case (a, b) => (a.nan + b.nan, a.stats.mean - b.stats.mean)}
    diff.foreach(println)
  )

© 著作权归作者所有

clebeg
粉丝 45
博文 40
码字总数 40057
作品 0
广州
程序员
私信 提问
ScalaInAction测试——静态测试

ScalaInAction测试——静态测试 前言 首先,我们需要了解如何测试发送和接受消息,包括发送然后不管的模式以及发送之后等待回复的交互式模式。 使用的是 Scala 的测试框架 [ScalaTest]。这个...

clebeg
2015/10/26
62
0
时序数据库(II)- 预处理

物联网领域近期如火如荼,互联网和传统公司争相布局物联网。作为物联网领域数据存储的首选时序数据库也越来越多进入人们的视野,早在2016年7月,百度云在其天工物联网平台上发布了国内首个多...

张瑞鹤
2017/05/18
16
0
数据挖掘之数据预处理

1. 结构化数据的特征 结构化数据特性 2. 常见数据集类型 数据集类型 3. 数据预处理的流程 预处理流程 4. 数据质量 数据挖掘过程中 数据质量 对运行结果至关重要。 由于数据挖掘使用的数据常常...

七八音
2017/12/27
0
0
JFinal教程JfinalUIB 代码笔记 (4)--- 高仿mybatis的sql的集中管理

实现sql的集中管理,简单的把一些固定长度的sql移植进xml很简单,这没有什么好多说的,关键问题是我们平时处理的sql,有大量是动态长度的,比如说最常见的就是多条件的分页查询,往往我们会在...

littleant
2014/08/12
5.9K
5
19- OpenCV+TensorFlow 入门人工智能图像处理-刷脸识别实现

章节介绍 实现一个刷脸识别 输入一张图片,找到图片中的人脸,识别这个是哪一个人。 案例代码简单 人工网络层级低。 隐层只有一层。 综合: 样本收集 + 图像预处理 + TensorFlow神经网络 pyth...

天涯明月笙
2018/05/19
0
0

没有更多内容

加载失败,请刷新页面

加载更多

【2019年8月版本】OCP 071认证考试最新版本的考试原题-第13题

Choose the best answer. Examine this query: SELECT TRUNC (ROUND(156.00,-2),-1) FROM DUAL; What is the result? A) 16 B) 160 C) 150 D) 200 E) 100 Answer:D (解析:关键就是 round ......

oschina_5359
32分钟前
5
0
SolrJ 操作solr增删改查

Solr的特性包括: • 高级的全文搜索功能 • 专为高通量的网络流量进行的优化 • 基于开放接口(XML和HTTP)的标准 • 综合的HTML管理界面 • 可伸缩性-能够有效地复制到另外一个Solr搜索服...

雷开你的门
34分钟前
5
0
What’s the easiest way to teach a robot arm to stack blocks?

What’s the easiest way to teach a robot arm to stack blocks? That’s the nuanced question to which researchers at Google parent company Alphabet’s DeepMind sought answers dur......

巧克夹心
35分钟前
4
0
Vuex-dev demo npm install 时总是报phantomjs-prebuilt@2.1.14安装失败

解决方法,删除node_modules,package-lock.json以及package.json里面的 phantomjs-prebuilt。然后 npm install,最后安装phantomjs-prebuilt。 npm install phantomjs-prebuilt@2.1.14 --ig......

牧云橙
36分钟前
7
0
理解自定义starter的主要目的

自定义starter的主要功能是自动帮我们将某个JavaBean转换成SpringBean,在这个过程中,有些类的实例化会依赖其他的类,考虑到是否需要交给Spring管理,会有很多注解来辅助boot是否是实例化该...

redis缓存
44分钟前
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部