文档章节

从零开始学习Spark--第2章 spark开发

brian_2017
 brian_2017
发布于 2017/01/17 09:37
字数 1586
阅读 8
收藏 0
1. 主要参考资料
http://spark.incubator.apache.org/docs/latest/scala-programming-guide.html
http://www.eecs.berkeley.edu/Pubs/TechRpts/2011/EECS-2011-82.pdf


2. 简介
每个Spark应用,都有一个驱动程序,它运行main函数,在集群上执行各种并行运算。Spark的最重要的抽象是RDD--Resilient Distributed Data(弹性分布式数据)。RDD存储数据,这些数据分布在Spark集群的各种节点上,这些数据可以进行并行计算。可以从一个HDFS文件或者类似的文件系统创建RDD,也可以从Scala的Collection创建RDD,也可以从另外一个RDD处理之后创建。RDD可以持久化到内存里。RDD可以自动从节点失效中恢复。


Spark第二个抽象是Shared Variables--共享变量,用于并行计算。Spark有两种共享变量:Broadcast varibles--广播变量,它在所有节点的内存里缓存一个值;Accumulators--累积量,它们只能被“加”起来,比如计数器或者是“求和”。


3. Spark程序的第一步是初始化SparkContext,它通知Spark如何获取一个集群:
    val sc = new SparkContext(master, appName, [sparkHome], [jars])


参数master指明集群的地址,是字符串,master可以是"local"--在本地单机运行,也可以是Spark或者Mesos集群的URL。
参数appName是Spark应用的名称,会在集群的web界面里显示出来。
参数sparkHome是spark的安装目录,注意,集群内所有节点的Spark必须安装在同样的目录下。
参数jars是打包后的Spark应用,是本地目录,这些Jar包会被复制发送到集群内的所有节点执行。


如果是运行Spark Shell,那么它会自从创建一个SparkContext,变量名sc,不要在Spark Shell创建新的SparkContext,不会生效的。在运行Spark Shell之前,可以指定环境变量,让Spark知道使用哪个集群,也可以用ADD_JARS环境变量把JARS添加到classpath。比如,如果想在spakr-shell在本地4核的cpu运行,需要如下方式启动:


$MASTER=local[4] ./spark-shell


这里的4,是启动4个工作线程。


如果要添加JARS,可以如下:
$MASTER=local[4]  ADD_JARS=code.jar ./spark-shell




4. Master URLs


Master的URL有4中:
local,本地,单线程
local[K],本地,K个线程
spark://HOST:PORT,在spark集群上运行。
mesos://HOST:PORT,在Mesos集群上运行。




5. RDD-弹性分布式数据
Spark以RDD为核心概念开发的,它的运行也是以RDD为中心。有两种RDD:第一种是并行Collections,它是Scala collection,可以进行并行计算;第二种是Hadoop数据集,它是并行计算HDFS文件的每条记录,凡是Hadoop支持的文件系统,都可以进行操作。这两种RDD都以同样的方式处理。


6. RDD之 并行Collections
并行Collections由SparkContext的parallelize方法,在一个已经存在的Scala collection上创建。这个collection上的成员会被copy成分布式数据库,也就是copy到所有节点,于是就可以进行并行计算了。举例如下:


#scala的collection
scala> val data = Array(1, 2, 3, 4, 5)
data: Array[Int] = Array(1, 2, 3, 4, 5)


#并行collection
scala> val distData = sc.parallelize(data)
distData: spark.RDD[Int] = spark.ParallelCollection@10d13e3e


第一条语句创建一个Scala collection,第二条语句将它转化成并行collection。并行collection有一个重要参数,就是slices数,spark在进行计算的时候,每个slice对应一个task。通常,一个CPU对应2~4个slice。一般情况下,Sparkt会根据集群的状况,自动计算slice,也可以手动指定,比如说,paralize(data,10)就是指定了10个slice。


7. RDD之 Hadoop数据集


Spark支持在任何Hadoop能处理的文件系统上创建分布式数据集,包括本地文件系统,Amazon S3,Hypertable,HBase等等。Spark支持文本文件,序列文件,以及任何Hadoop的InputFormat。


比如,从文本文件创建数据集的方式如下:


scala> val distFile = sc.textFile("data.txt")
distFile: spark.RDD[String] = spark.HadoopRDD@1d4cee08


如果给distFile设置slice数量,形如sc.textFile("data.txt",5)。默认情况下,sparkt为data.txt的每个block块设置一个slice,注意,手工设置的slice数,只能比文件的block块数量大,不能比它小。


对于SequenceFile-序列文件,SparkContext的sequenceFile[k, v]函数将它转化成RDD。


对其他的Hadoop InputFormat,SparkContext.hadoopRDD方法处理。


8. RDD运算
RDD支持两种运算:变换transformation-从已有的RDD创建一个新的RDD,如map;或者从action中创建RDD,如reduce。


Spark的transformation都是lazy的,Spark会记下这些transformation,不立刻计算结果,直到action需要返回结果的时候再进行计算。


默认情况下,每个RDD的transformation都会重新计算,但如果将RDD用persisi持久化到内存里,或者缓存到内存里,它就不重新计算了,由此加快查询速度。


9. RDD持久化
如果一个RDD被持久化了,那么,每个节点都会存数这个RDD的所有slice,于是可以在内存进行计算,可以重用,这样可以让后来的action计算的更快,通常会把速度提高至少十倍。对迭代式计算来说,持久化非常关键。RDD的persisi方法和cache方法都可以进行持久化。RDD是容错的--如果它的任何部分丢失了,都会重新计算创建。


RDD有不同的存储方式,可以存在硬盘,或者内存,或者复制到所有节点。而chach函数只有一个默认的存储方式就是内存。


10. 共享变量-广播变量
广播变量--在集群的每个节点机器上都缓存一个只读的变量,比如说,每个节点都保存一份输入数据的只读缓存。


广播变量的使用方式:
scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)


scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)


注意:创建了广播变量之后,就不能使用v了,要使用broadcaseCar;v值不能修改。


11. 共享变量-累计量:
只要是用作计数器counter或者求和sum,只能做add运算,例子如下:
scala> val accum = sc.accumulator(0)
accum: spark.Accumulator[Int] = 0


scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s


scala> accum.value
res2: Int = 10

© 著作权归作者所有

brian_2017
粉丝 3
博文 61
码字总数 145216
作品 0
私信 提问
我的第一本著作:Spark技术内幕上市!

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/anzhsoft2008/article/details/48594363 现在各大网站销售中! 京东:http://item.jd.com/11770787.html 当当...

anzhsoft
2015/09/20
0
0
【Spark亚太研究院系列丛书】Spark实战高手之路-第3章Spark架构设计与编程模型第1节②

三、你为什么需要Spark; 你需要Spark的十大理由: 1,Spark是可以革命Hadoop的目前唯一替代者,能够做Hadoop做的一切事情,同时速度比Hadoop快了100倍以上: Logistic regression in Hadoo...

Spark亚太研究院
2014/12/16
423
2
OSC 第 65 期高手问答 — Spark 企业级实战

OSCHINA 本期高手问答(3月23日-3月29日)我们请来了 @王家林 (王家林)为大家解答关于 Spark 开发方面的问题。 王 家林,Spark 亚太研究院院长和首席专家,当今云计算领域最火爆的技术Docke...

叶秀兰
2015/03/23
6.4K
22
一份顶级互联网公司实践:十万节点下的Spark灰度发布

作者介绍 郭俊,专注于大数据架构,熟悉Kafka和Flume源码,熟悉Hadoop和Spark原理,精通数据(仓)库模型设计和SQL调优。 本文转载自大数据架构订阅号,原文链接:www.jasongj.com/spark/ci_...

郭俊 Jason Guo
03/15
0
0
微软发布 .Net for Apache Spark :用什么语言开发大数据都可以

4 月 24 日,在 Spark+AI 峰会 上,我们很高兴地宣布推出 .NET for Apache Spark。Spark 是一种流行的开源分布式处理引擎,用于分析大型数据集。Spark 可用于处理批量数据、实时流、机器学习...

开源大数据EMR
05/22
0
0

没有更多内容

加载失败,请刷新页面

加载更多

rime设置为默认简体

转载 https://github.com/ModerRAS/ModerRAS.github.io/blob/master/_posts/2018-11-07-rime%E8%AE%BE%E7%BD%AE%E4%B8%BA%E9%BB%98%E8%AE%A4%E7%AE%80%E4%BD%93.md 写在开始 我的Arch Linux上......

zhenruyan
今天
5
0
简述TCP的流量控制与拥塞控制

1. TCP流量控制 流量控制就是让发送方的发送速率不要太快,要让接收方来的及接收。 原理是通过确认报文中窗口字段来控制发送方的发送速率,发送方的发送窗口大小不能超过接收方给出窗口大小。...

鏡花水月
今天
10
0
OSChina 周日乱弹 —— 别问,问就是没空

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @tom_tdhzz :#今日歌曲推荐# 分享容祖儿/彭羚的单曲《心淡》: 《心淡》- 容祖儿/彭羚 手机党少年们想听歌,请使劲儿戳(这里) @wqp0010 :周...

小小编辑
今天
1K
11
golang微服务框架go-micro 入门笔记2.1 micro工具之micro api

micro api micro 功能非常强大,本文将详细阐述micro api 命令行的功能 重要的事情说3次 本文全部代码https://idea.techidea8.com/open/idea.shtml?id=6 本文全部代码https://idea.techidea8....

非正式解决方案
今天
5
0
Spring Context 你真的懂了吗

今天介绍一下大家常见的一个单词 context 应该怎么去理解,正确的理解它有助于我们学习 spring 以及计算机系统中的其他知识。 1. context 是什么 我们经常在编程中见到 context 这个单词,当...

Java知其所以然
昨天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部