Spark Learning

无断无灭无住无念
Published 2017/09/06 15:41

Official manual: https://spark.apache.org/docs/latest/rdd-programming-guide.html

一: Spark versus Hadoop

Spark is generally faster than Hadoop MapReduce because Hadoop writes intermediate results to disk to achieve fault tolerance, whereas Spark keeps working sets in memory and recovers from failures through its functional programming model: RDDs are immutable, so a lost partition can be recomputed from its recorded lineage of transformations.

二: Spark RDD (Resilient Distributed Datasets)

TRANSFORMATION: lazy to execute, e.g. filter(), map(), flatMap(), and so forth. Because transformations are lazy, Spark can optimize a whole chain of them and never materializes the intermediate results.

ACTION: eager to execute, e.g. count(), foreach(), countByKey(), and so forth. An action triggers evaluation of the chain of transformations before it.
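
A minimal sketch of this laziness (the value names are made up for illustration; `sc` is a SparkContext like the one defined in the code further below):

val nums    = sc.parallelize(1 to 1000000)   // build an RDD; nothing runs yet
val evens   = nums.filter(_ % 2 == 0)        // transformation: lazy, only records the plan
val doubled = evens.map(_ * 2)               // still lazy, chained onto the same plan
val total   = doubled.count()                // action: eager, launches the job

Only count() launches a job; filter and map are pipelined into a single pass over the data instead of producing two intermediate datasets.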

三: Spark job execution

四: COMMONLY USED APIs

Transformations such as groupBy, groupByKey, reduceByKey, mapValues, and keys do not compute their result immediately (they are lazy); see the sketch below.
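
A minimal sketch contrasting groupByKey and reduceByKey on a pair RDD (the sample data is made up for illustration):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))

// groupByKey shuffles every value across the network and reduces afterwards
val viaGroup  = pairs.groupByKey().mapValues(_.sum)

// reduceByKey combines values within each partition first (map-side combine),
// so far less data crosses the network
val viaReduce = pairs.reduceByKey(_ + _)

viaReduce.collect()   // action: Array(("a", 2), ("b", 1)), order may vary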

WikipediaRanking assignment: ranking with an inverted index plus reduceByKey is roughly twice as fast as the traditional approach of scanning every article and counting matches with aggregate.

package wikipedia

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

case class WikipediaArticle(title: String, text: String) {
  /**
    * @return Whether the text of this article mentions `lang` or not
    * @param lang Language to look for (e.g. "Scala")
    */
  def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)
}

object WikipediaRanking {

  val langs = List(
    "JavaScript", "Java", "PHP", "Python", "C#", "C++", "Ruby", "CSS",
    "Objective-C", "Perl", "Scala", "Haskell", "MATLAB", "Clojure", "Groovy")

  val conf: SparkConf = new SparkConf().setAppName("Spark RDD").setMaster("local[*]").set("spark.executor.memory", "2g")
  val sc: SparkContext = new SparkContext(conf)

  // Hint: use a combination of `sc.textFile`, `WikipediaData.filePath` and `WikipediaData.parse`
  // `textFile` already yields one line per record, so each line is parsed directly
  val wikiRdd: RDD[WikipediaArticle] = sc.textFile(WikipediaData.filePath).map(WikipediaData.parse)
  /** Returns the number of articles on which the language `lang` occurs.
   *  Hint1: consider using method `aggregate` on RDD[T].
   *  Hint2: consider using method `mentionsLanguage` on `WikipediaArticle`
   */
  def occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int =
    rdd.aggregate(0)(
      (acc, article) => if (article.mentionsLanguage(lang)) acc + 1 else acc, // count within a partition
      (acc1, acc2) => acc1 + acc2                                             // merge partition counts
    )

  /* (1) Use `occurrencesOfLang` to compute the ranking of the languages
   *     (`val langs`) by determining the number of Wikipedia articles that
   *     mention each language at least once. Don't forget to sort the
   *     languages by their occurrence, in decreasing order!
   *
   *   Note: this operation is long-running. It can potentially run for
   *   several seconds.
   */
  // Result: List(("Scala", 999999), ("JavaScript", 1278), ("LOLCODE", 982), ("Java", 42))
  def rankLangs(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] =
    langs.map(lang => (lang, occurrencesOfLang(lang, rdd))).sortWith((x, y) => x._2 > y._2)



  /* Compute an inverted index of the set of articles, mapping each language
   * to the Wikipedia pages in which it occurs.
   */
  def makeIndex(langs: List[String], rdd: RDD[WikipediaArticle]): RDD[(String, Iterable[WikipediaArticle])] = {
    rdd.flatMap(article => langs.filter(article.mentionsLanguage).map(lang => (lang, article))) // one (lang, article) pair per mention
      .groupByKey()
  }




  /* (2) Compute the language ranking again, but now using the inverted index. Can you notice
   *     a performance improvement?
   *
   *   Note: this operation is long-running. It can potentially run for
   *   several seconds.
   */
  def rankLangsUsingIndex(index: RDD[(String, Iterable[WikipediaArticle])]): List[(String, Int)] =
    index.mapValues(_.size).sortBy(_._2, ascending = false).collect().toList

  /* (3) Use `reduceByKey` so that the computation of the index and the ranking are combined.
   *     Can you notice an improvement in performance compared to measuring *both* the computation of the index
   *     and the computation of the ranking? If so, can you think of a reason?
   *
   *   Note: this operation is long-running. It can potentially run for
   *   several seconds.
   */
  def rankLangsReduceByKey(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] =
    rdd.flatMap(article => langs.filter(article.mentionsLanguage).map(lang => (lang, 1))) // one (lang, 1) per mention
      .reduceByKey(_ + _)               // combined map-side before the shuffle
      .sortBy(_._2, ascending = false)
      .collect()
      .toList


  def main(args: Array[String]) {

    /* Languages ranked according to (1) */
    val langsRanked: List[(String, Int)] = timed("Part 1: naive ranking", rankLangs(langs, wikiRdd))

    /* An inverted index mapping languages to wikipedia pages on which they appear */
    def index: RDD[(String, Iterable[WikipediaArticle])] = makeIndex(langs, wikiRdd)

    /* Languages ranked according to (2), using the inverted index */
    val langsRanked2: List[(String, Int)] = timed("Part 2: ranking using inverted index", rankLangsUsingIndex(index))

    /* Languages ranked according to (3) */
    val langsRanked3: List[(String, Int)] = timed("Part 3: ranking using reduceByKey", rankLangsReduceByKey(langs, wikiRdd))

    /* Output the speed of each ranking */
    println(timing)
    sc.stop()
  }

  val timing = new StringBuffer
  def timed[T](label: String, code: => T): T = {
    val start = System.currentTimeMillis()
    val result = code
    val stop = System.currentTimeMillis()
    timing.append(s"Processing $label took ${stop - start} ms.\n")
    result
  }
}
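
Why part 3 tends to win (my reading, not stated in the assignment): rankLangs launches one job per language, and rankLangsUsingIndex shuffles whole articles when grouping by key, while rankLangsReduceByKey shuffles only small (lang, 1) counters that are pre-combined on each partition, so it ranks everything in a single pass with minimal network traffic.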

Pair RDDs: RDDs of key-value tuples (RDD[(K, V)]). Key-oriented operations such as groupByKey, reduceByKey, mapValues, and keys are only available on pair RDDs.
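
A minimal sketch of building a pair RDD from the wikiRdd defined above (the value names are made up for illustration):

val byTitle = wikiRdd.map(article => (article.title, article)) // keying by title yields a pair RDD
val titles  = byTitle.keys                                     // lazy transformation on pair RDDs
val lengths = byTitle.mapValues(_.text.length)                 // transform values, keys unchanged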
