文档章节

Spark Learning

王桥修道院副院长
 王桥修道院副院长
发布于 2017/09/06 15:41
字数 697
阅读 43
收藏 0

offcially manual : https://spark.apache.org/docs/latest/rdd-programming-guide.html

一: Spark versus Hadoop

Spark is faster than Hadoop cause hadoop execute disk io to retain failure tolerant function,whereas Spark through its functional programming .

二 :spark RDD(resilient distributed datasets)

TRANSFORMATION: LAZY to execute,like filter(),map(),flatMap() and so forth,spark could optimize chain operations,never execute intermediate process.

ACTION: EAGER to execute. like count(), foreach()  countByKey and so forth

三:spark job execution

四:COMMON USED API

transformation: groupBy   groupByKey  reduceBy reduceByKey  mapValues keys不会立即计算结果(lazy)

WikipediaRanking assignment:使用inverted index配合reduceByKey排序 比传统的遍历行查找内容aggregate速度快上一倍

package wikipedia

import java.util.stream.Collectors

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

case class WikipediaArticle(title: String, text: String) {
  /**
    * @return Whether the text of this article mentions `lang` or not
    * @param lang Language to look for (e.g. "Scala")
    */
  def mentionsLanguage(lang: String): Boolean = text.split(' ').contains(lang)
}

object WikipediaRanking {

  val langs = List(
    "JavaScript", "Java", "PHP", "Python", "C#", "C++", "Ruby", "CSS",
    "Objective-C", "Perl", "Scala", "Haskell", "MATLAB", "Clojure", "Groovy")

  val conf: SparkConf = new SparkConf().setAppName("Spark RDD").setMaster("local[*]").set("spark.executor.memory", "2g")
  val sc: SparkContext = new SparkContext(conf)

  // Hint: use a combination of `sc.textFile`, `WikipediaData.filePath` and `WikipediaData.parse`
  val wikiRdd: RDD[WikipediaArticle] = sc.textFile(WikipediaData.filePath).flatMap(lines => lines.split("\n")).map(x=>WikipediaData.parse(x))
  /** Returns the number of articles on which the language `lang` occurs.
   *  Hint1: consider using method `aggregate` on RDD[T].
   *  Hint2: consider using method `mentionsLanguage` on `WikipediaArticle`
   */
  def occurrencesOfLang(lang: String, rdd: RDD[WikipediaArticle]): Int = rdd.aggregate(0)((acc,article)=>
    if(article.mentionsLanguage(lang)) acc+1 else acc,(acc1, acc2) => (acc1 + acc2))

  /* (1) Use `occurrencesOfLang` to compute the ranking of the languages
   *     (`val langs`) by determining the number of Wikipedia articles that
   *     mention each language at least once. Don't forget to sort the
   *     languages by their occurrence, in decreasing order!
   *
   *   Note: this operation is long-running. It can potentially run for
   *   several seconds.
   */
  //Result��List(("Scala", 999999), ("JavaScript", 1278), ("LOLCODE", 982), ("Java", 42))
  def rankLangs(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] =
    langs.map((lang)=>(lang,occurrencesOfLang(lang,rdd))).sortWith((x,y)=>x._2>y._2)



  /* Compute an inverted index of the set of articles, mapping each language
   * to the Wikipedia pages in which it occurs.
   */
  def makeIndex(langs: List[String], rdd: RDD[WikipediaArticle]): RDD[(String, Iterable[WikipediaArticle])] = {
    rdd.map((w)=>(w,langs.filter((o)=>w.mentionsLanguage(o)).toList)).map(x=>x._2.map((ls)=>(ls,x._1))).flatMap(x=>x).groupByKey()
  }




  /* (2) Compute the language ranking again, but now using the inverted index. Can you notice
   *     a performance improvement?
   *
   *   Note: this operation is long-running. It can potentially run for
   *   several seconds.
   */
  def rankLangsUsingIndex(index: RDD[(String, Iterable[WikipediaArticle])]): List[(String, Int)] =
  index.map((o)=>(o._1,o._2.size)).sortBy(_._2,false).collect().toList

  /* (3) Use `reduceByKey` so that the computation of the index and the ranking are combined.
   *     Can you notice an improvement in performance compared to measuring *both* the computation of the index
   *     and the computation of the ranking? If so, can you think of a reason?
   *
   *   Note: this operation is long-running. It can potentially run for
   *   several seconds.
   */
  def rankLangsReduceByKey(langs: List[String], rdd: RDD[WikipediaArticle]): List[(String, Int)] =
    rdd.map((w)=>(w,langs.filter((o)=>w.mentionsLanguage(o)).toList)).map(x=>x._2.map((ls)=>(ls,x._1))).flatMap(x=>x).map((m)=>(m._1,1)).reduceByKey(_+_).sortBy(_._2,false).collect().toList


  def main(args: Array[String]) {

    /* Languages ranked according to (1) */
    val langsRanked: List[(String, Int)] = timed("Part 1: naive ranking", rankLangs(langs, wikiRdd))

    /* An inverted index mapping languages to wikipedia pages on which they appear */
    def index: RDD[(String, Iterable[WikipediaArticle])] = makeIndex(langs, wikiRdd)

    /* Languages ranked according to (2), using the inverted index */
    val langsRanked2: List[(String, Int)] = timed("Part 2: ranking using inverted index", rankLangsUsingIndex(index))

    /* Languages ranked according to (3) */
    val langsRanked3: List[(String, Int)] = timed("Part 3: ranking using reduceByKey", rankLangsReduceByKey(langs, wikiRdd))

    /* Output the speed of each ranking */
    println(timing)
    sc.stop()
  }

  val timing = new StringBuffer
  def timed[T](label: String, code: => T): T = {
    val start = System.currentTimeMillis()
    val result = code
    val stop = System.currentTimeMillis()
    timing.append(s"Processing $label took ${stop - start} ms.\n")
    result
  }
}

pair RDDs

© 著作权归作者所有

共有 人打赏支持
上一篇: Freemarker
王桥修道院副院长
粉丝 15
博文 56
码字总数 39619
作品 0
宁波
CTO(技术副总裁)
私信 提问
Spark+AI Summit Europe 2018 PPT下载[共95个]

为期三天的 Spark+AI Summit Europe 于 2018-10-02 ~ 04 在伦敦举行,一如往前,本次会议包含大量 AI 相关的议题,某种意义上也代表着 Spark 未来的发展方向。作为大数据领域的顶级会议,Spa...

Spark
2018/10/13
0
0
Spark和Scala的网络资源汇集

版权声明:本文为博主chszs的原创文章,未经博主允许不得转载。 https://blog.csdn.net/chszs/article/details/51194023 Spark和Scala的网络资源汇集 作者:chszs,未经博主允许不得转载。经...

chszs
2016/04/19
0
0
你不能错过的 spark 学习资源

1. 书籍,在线文档 2. 网站 3. Databricks Blog 4. 文章,博客 5. 视频

u012608836
2018/04/12
0
0
Spark的39个机器学习库-英文

Apache Spark itself 1. MLlib AMPLab Spark originally came out of Berkeley AMPLab and even today AMPLab projects, even though they are not in Apache Spark Foundation, enjoy a sta......

MoksMo
2015/11/04
0
1
在 Databricks 可获得 Spark 1.5 预览版

我们兴奋地宣布,从今天开始,Apache Spark1.5.0的预览数据砖是可用的。我们的用户现在可以选择提供集群与Spark 1.5或先前的火花版本准备好几个点击。 正式,Spark 1.5预计将在数周内公布,和社区...

stark_summer
2015/08/25
0
0

没有更多内容

加载失败,请刷新页面

加载更多

乱入Linux界的我是如何学习的

欢迎来到建哥学Linux,咳!咳!咳!开个玩笑哈,我是一个IT男,IT界的入门选手,正在学习Linux。 在之前,一直想进军IT界,学习IT技术,但是苦于没有人指导,也不知道学什么,最开始我自己在...

linux-tao
50分钟前
1
0
乱入Linux界的我是如何学习的

欢迎来到建哥学Linux,咳!咳!咳!开个玩笑哈,我是一个IT男,IT界的入门选手,正在学习Linux。 在之前,一直想进军IT界,学习IT技术,但是苦于没有人指导,也不知道学什么,最开始我自己在...

linuxprobe16
今天
9
0
OSChina 周日乱弹 —— 没时间 没头发 但有钱

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @开源中国总经办主任 :分享齐一的单曲《这个年纪》 《这个年纪》- 齐一 手机党少年们想听歌,请使劲儿戳(这里) @肿肿卷 :我真的可以睡一天...

小小编辑
今天
110
6
Django进阶 1.1 ORM基础—ORM 1.2.1 增删改查之查询 1.2.2 删改增 (1) 1.2.3 删改增 (2)

ORM基础 ORM是Django操作数据库的API,Django的作者将sql语句封装在里面供我们使用。 我们前面还提到过Django提供一个模拟数据库的工具,sqlite,供我们学习测试使用。 如果我们想使用mysql...

隐匿的蚂蚁
今天
3
0
Windows 上安装 Scala

在安装 Scala 之前需要先安装 Java 环境,具体安装的详细方法就不在这里描述了。 您可以自行搜索我们网站中的内容获得其他网站的帮助来获得如何安装 Java 环境的方法。 接下来,我们可以从 ...

honeymose
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部