文档章节

Spark中文python文档

BryanYang
 BryanYang
发布于 2015/01/22 19:29
字数 2402
阅读 6828
收藏 31

Archive

Download Spark

Speed

Run programs up to 100x faster than Hadoop MapReduce in memory, or 10x faster on disk.

外文地址:

http://spark.apache.org/docs/latest/programming-guide.html

Spark has an advanced DAG execution engine that supports cyclic data flow and in-memory computing.

在最顶层,Spark包含一个起始程序来运行用户编写的main 方法,并且执行多样的并行操作在一个集群上。其中最主要的抽象就是Spark提供一个 RDD (弹性集群数据集),是一些可以通过节点访问执行并行任务的集群的集合。RDD 通过一个Hadoop文件系统中的文件被创建(或者其它支持Hadoop文件系统的系统)。用户经常会让Spark去保留一个RDD在内存里。允许它在并行操作时被高效重用。最后,Rdd将自动从节点错误中恢复。

第二个被抽象是,在并行操作时,变量将被共享。默认的,当Spark 在不同节点上并行运行任务集合中的一个方法,它为任务中每个方法的变量都保留一个备份。有时候,一些变量需要被跨任务共享,或者两个任务间共享,或者给起始程序共享。Spark提供两种形式的变量:广播变量,可以在所有节点上缓存变量在内存里。另一个是储蓄变量,只能被增加,例如数量,或者和。

本文展示这些功能,使用每个Spark支持的语言,本文是python。如果你打开 Spark的交互shell,bin/spark-shell, 或者python的shell, bin/pyspark. 很容易学的、


Spark 所使用的数据集RDDs 是一个可以被并行操作的容错数据集。有两种方式去创建它,1,并行一个你本地存在的集合。或者引用一个额外的储存系统。例如: 一个共享的文件系统,HDFS, HBase, 或者任何提供Hadoop输入格式的资源。


并行集合通过Spark 上下文 SparkContext’s parallelize 方法被创建,基于你磁盘上一个存在的可以迭代的集合。集合中的元素通过通过复制形成一个分布式数据集,这个数据集可以被并行使用。例如。下面是一个怎样创建一个并发数据集用数字1-5:

data = [1, 2, 3, 4, 5] distData = sc.parallelize(data)

一旦被创建。这个分布式数据集可以被并发使用。例如,我们可以 distData.reduce(lambda a, b: a + b) 去相加他们。我们稍后并发操作。

并发数据集的一个重要的参数是数据集被分割的子集数量。Spark 将为每个子集运行一个任务。代表性的,你想要没给CPU运行2-4个子集。正常情况下: Spark 将试图设计你分区的数量分局你的集群。但是,你依然可以通过传递第二个参数来手动设置它。 parallelize (e.g. sc.parallelize(data, 10)). 

注意:你代码中一些地方会用到 slices (子集的别名),为了上下兼容性而存在的。

其它数据集合:

Pyspark 可以从任何被hadoop 支持的存储资源中 创建分布式数据集。包括本地文件系统,HDFS, Cassandra, HBase, Amazon S3 等。Sparks 支持文本文件,hadoop 输入格式文件。

文本文件 RDDs 可以通过 SparkContext’s textFile 方法创建这个方法使用一个URI 为这个文件,不管是本地路径,还是hdfs://s3n://, URI。将它读作行的集合。下面是一个例子:

>>> distFile = sc.textFile("data.txt")

一旦被创建。文件可以通过集合操作使用。例如,我们可以相加行的数目通过使用 map 和 reduce 方法如下:

distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b).

在用Spark读取文件时候的一些注意项:

  • 如果使用一个本地文件系统,这个文件的权限必须被开放。可以被复制或者被共享。

  • 所有的Spark 文件输入方法,包括 textFile, 也支持运行目录,压缩文件,和通配符。例如,你可以使用 textFile("/my/directory") ,textFile("/my/directory/*.txt") 和  textFile("/my/directory/*.gz");

  • textFile 方法同样使用第二个参数去控制文件被分割的子集。默认,Spark为这个文件的每个Block (在 HDFS中是64M )创建一个子集,但是你可以通过传递一个参数来设定更高的值。注意,这个值不能比默认 blocks分割的 更小。

除了 textFile , Spark 的Python API 提供了其它的数据格式:

  • SparkContext.wholeTextFiles 让你读一个包含很多小文件的目录。然后返回以键值对的方式返回它们(文件名,内容)。这个方法相对 textFile ,后者返回文件的每一行。

  • RDD.saveAsPickleFile and SparkContext.pickleFile  支持以简单python对象格式存储一个RDD 。

  • SequenceFile and Hadoop Input/Output Formats

注意,这个功能目前在实验阶段。用于高级用户。它或许被替代在将来。新的方式将采用基于 Saprk SQL 读写。那时候,Spark SQL 将是最优先的方式。

写支持:

PySpark SequenceFile 支持加载一个java 键值组成的RDD ,根据java 类型转换成可写的。当保存一个键值对组成的RDD 到 SequenceFile中, PySpark 进行这个转换。它把Python对象转成 Java对象,然后把它们转变成可写的。下面这些类型自动转换:

 

数组不被处理,用户需要去指定自定义的 ArrayWritable 子类型 当读写的时候。在写的时候,用户还需要指定自定义的转换把 数组转换成 自动以的 ArrayWritalbe 子类型。在读的时候,默认的转换将自定义的ArrayWritable 子类型转成Java 对象。然后转成Python元组。

保存和加载 SequenceFiles

和文本文件类似。SequenceFiles 通过指定路径被保存和加载。key 和 value 可以被分开。但是对于标准写来讲,不需要如此。

>>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
>>> rdd.saveAsSequenceFile("path/to/file")
>>> sorted(sc.sequenceFile("path/to/file").collect()) 
[(1, u'a'), (2, u'aa'), (3, u'aaa')]

Rdd 操作:

Rdd 支持两种形式的操作,1转换,根据现有的数据集创建一个新的。2动作,在数据集上运行一些计算后返回值。例如,map 是一个转换,通过一个方法将数据集的每个元素返回到一个rdd结果中,另一边,reduce 是一个动作,通过方法将RDDd的元素合并成一个最终结果(reduceByKey 返回一个分布式数据集)。

Spark 中所有的转换都是懒执行的。所以它们并不马上计算它们的结果。代替的,它们记着这些用于基础数据集的转换,当一个动作要求一个结果被返回来,那么才执行这些计算。这个设计可以使Spark 运行更高效。例如。我们可以实现一个通过map创建的数据集被reduce 使用然后返回一个reduce结果,而不是一个大的map过的集合。

默认,每一个被转换的RDD或许被重新计算当你每次对它使用action时。但是,你可以缓存一个RDD在内存里,通过使用 persist或者cache,方法。此时,Spark将保存这个元素在集群上,为了你下次更快使用它。也支持缓存rdds 在磁盘上。或者复制在多节点上。

lines = sc.textFile("data.txt") 
lineLengths = lines.map(lambda s: len(s)) 
totalLength = lineLengths.reduce(lambda a, b: a + b)

第一行根据一个外部文件定义了一个基础rdd。数据集不被加载到内存,除非被动作执行。lines 仅仅是一个文件的指针,第二行定义一个 lineLength 作为一个map 的转换结果。同样的,lineLength 不是立即被计算。因为懒执行。最后,我么可以reduce, 它是一个动作。 此时,Spark 分割计算成很多任务。在各个机器上运行,每个机器运行它自己部分的map和一个reduce 的结果。仅仅返回这个结果到起始程序中。

如果我们之后需要多次使用lineLength,我们可以缓存它:

lineLengths.persist()

在reduce 之前,lineLengths 将再第一次被计算出来后缓存到内存。

传递方法给Spark:

Spark 的API 特别支持传递方法给起始程序,然后在集群上运行。有三种推荐方式:

  • Lambda表达式, 对于简单的方法可以写成表达式,Lambda不支持多行方法 或者不返回值的方法。

  • 对于长一点的代码。在方法内部定义。

  • 作为一个模块。

例如,传递一个长一点的方法而不是使用lomba。看下面代码:

"""MyScript.py""" 
if __name__ == "__main__":     
    def myFunc(s):         
        words = s.split(" ")         
        return len(words)     
    sc = SparkContext(...)     
    sc.textFile("file.txt").map(myFunc)

注意,当一个对象实例方法 允许传入引用时(和单例对象相反)。需要传递这个class和方法一起:例如:

class MyClass(object):     
    def func(self, s):         
        return s     
    def doStuff(self, rdd):         
        return rdd.map(self.func)

创建了一个新的 MyClass, 然后调用 doStuff。内部的map 引用这个MyClass 实例的 func ,所以整个对象需要被传递到 集群。

类似的,外部对象的可访问领域,将引用真个对象。

class MyClass(object):     
    def __init__(self):         
        self.field = "Hello"     
    def doStuff(self, rdd):         
        return rdd.map(lambda s: self.field + x)

为了防止出错,最简单的方式是复制 field 到一个本地变量代替访问它:

def doStuff(self, rdd):     
    field = self.field     
    return rdd.map(lambda s: field + x)

待续!

© 著作权归作者所有

BryanYang
粉丝 16
博文 165
码字总数 52036
作品 0
石景山
程序员
私信 提问
加载中

评论(1)

苏生不惑
苏生不惑
SPARK 源码分析技术分享(带bilibili视频)

SPARK 源码分析技术分享 (带bilibili视频) 【本站点正在持续更新中…2018-12-05…】 SPARK 1.6.0-cdh5.15.0 Hadoop 2.6.0-cdh5.15.0 spark-scala-maven 微信(技术交流) : thinktothings SPA...

thinktothings
2018/12/02
0
0
工作日志——基于k8s搭建spark集群

使用k8s搭建spark集群 这几天尝试在k8s中搭建spark集群,踩了一些坑,跟大家分享一下。 Spark的组件介绍可参考官方文档 大数据生态圈简介可参考这篇文章 基于k8s官方示例 具体参考github k8...

店家小二
2018/12/16
0
0
Apache Spark 1.6 预览版:更简便的搜索

日前,Databricks公司发布了一个Apache Spark主要版本的可用性。除了可用性、可移植性等几个新的特性外,本次发布还提供了对尚未发布的Apache Spark 1.6预览。Databricks用户有机会在官方发布...

水门-kay
2015/12/21
2.9K
4
在Windows上build Spark

在本机(Windows 8)上安装spark玩一玩,Spark的Quick Start上没有指明具体步骤,自己Build好了之后把步骤记录一下。 首先,到Spark的官网上下载Spark的压缩包,然后解压缩到本地文件夹。我下...

wangyugod
2014/11/13
6.7K
0
微软发布 .Net for Apache Spark :用什么语言开发大数据都可以

4 月 24 日,在 Spark+AI 峰会 上,我们很高兴地宣布推出 .NET for Apache Spark。Spark 是一种流行的开源分布式处理引擎,用于分析大型数据集。Spark 可用于处理批量数据、实时流、机器学习...

开源大数据EMR
05/22
0
0

没有更多内容

加载失败,请刷新页面

加载更多

mac下redis安装、设置、启动停止

常用命令说明 redis服务器:redis-server redis客户端:redis-cli redis性能测试工具:redis-benchmark AOF文件修复工具:redis-check-aof RDB文件修复工具:redis-check-rd redis设置临时密...

botkenni
16分钟前
3
0
好程序员web前端分享HTML5常见面试题集锦四

好程序员web前端分享HTML5常见面试题集锦四 1、为什么要初始化CSS样式? 答案:因为浏览器的兼容问题,不同浏览器对有些标签的默认值是不同的,如果没对CSS初始化往往会出现浏览器之间的页面...

好程序员官方
17分钟前
2
0
CDN的网络架构是什么?

CDN网络架构主要由两大部分,分为中心和边缘两部分,中心指CDN网管中心和DNS重定向解析中心,负责全局负载均衡,设备系统安装在管理中心机房,边缘主要指异地节点,CDN分发的载体,主要由Cac...

云漫网络Ruan
17分钟前
2
0
pandas入门07---可视化

01 入门 制作提供信息的可视化是数据分析的重要任务之一。首先介绍一下matplotlib库。 import matplotlib.pyplot as pltimport numpy as npdata = np.arange(10)print(data)plt.plot(da...

筠初
18分钟前
2
0
201_PyTorch中文教程:Torch与Numpy互操作

201_PyTorch中文教程:Torch与Numpy互操作 Numpy是经典的数学计算库,Torch中的Tensor可以与之互相转换,从而可以充分利用二者的计算函数和模型,以及使用其它支持Numpy的软件库和工具。但需...

openthings
18分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部