文档章节

Spark Python 常用API

AllenOR灵感
 AllenOR灵感
发布于 2017/09/10 01:26
字数 2873
阅读 0
收藏 0
点赞 0
评论 0

该文主要学习一下《Spark快速大数据分析》这本书,然后记录了一些常用的Python接口,完整版接口点击这里


Spark中的RDD就是一个不可变的分布式对象集合,每个RDD都被分为多个分区,这些分区运行在集群中的不同节点上。用户可以使用两种方法创建RDD:读取一个外部数据集,或在驱动器程序里分发驱动器程序中的对象集合(比如 list 和 set)。创建出来之后,RDD支持两种类型的操作:转化操作(transformation)和行动操作(action)。转化操作会由一个RDD生成一个新的RDD,比如 filter() 函数。行动操作会对RDD计算出一个结果,并把结果返回到驱动器程序中,或把结果存储到外部存储系统(如HDFS)中,比如 first() 函数。如果,你对于当前的函数无法判断是转化操作还是行动操作时,你可以看该函数的返回值是什么,如果是一个RDD,那么该函数就是一个转化操作,如果是其他的数据类型,那么该函数就是一个行动操作。


有关转化操作的API


filter()函数

函数例子:

errorsRDD = inputRDD.filter(lambda x: "error" in x)

# 或者
def hasError(line):
  return 'error' in line

errorsRDD = inputRDD.filter(hasError)

这个API的作用是挑选出包含 error 的内容。注意,filter()操作不会改变已有的 inputRDD 中的数据。实际上,该操作会返回一个全新的RDD。


union()函数

函数例子:

errorsRDD = inputRDD.filter(lambda line : "error" in line)
warningRDD = inputRDD.filter(lambda line : "warning" in line)
badLinesRDD = errorsRDD.union(warningRDD)

这个API的作用是计算两个RDD的并集。如果两个RDD之间有重复的元素,那么在新生成的RDD中也会包含重复的元素。


intersection()函数

函数例子:

inputRDD = sc.parallelize([1,2,3,4,5,6,7,8,9])
a = inputRDD.filter(lambda x : x % 2 == 0) # 2,4,6,8
b = inputRDD.filter(lambda x : x > 5) # 6,7,8,9
c = a.intersection(b) # 8,6

这个API的作用是计算两个RDD的交集。该API在运行时也会去掉所有重复的元素(单个RDD内的重复元素也会一起移除)。尽管intersection()与union()的概念相似,intersection()的性能却要差很多,因为它需要通过网络混洗数据来发现共有的元素。


subtract()函数

函数例子:

inputRDD = sc.parallelize([1,2,3,4,5,6,7,8,9])
a = inputRDD.filter(lambda x : x % 2 == 0) # 2,4,6,8
b = inputRDD.filter(lambda x : x > 5) # 6,7,8,9
c = a.subtract(b) # 2,4

这个API的作用是计算两个RDD的差集,即返回一个由只存在于第一个RDD中而不存在与第二个RDD中的所有元素组成的RDD。和intersection()一样,该API也需要进行数据混洗。


cartesian()函数

函数例子:

inputRDD = sc.parallelize([1,2,3,4,5,6,7,8,9])
a=sc.parallelize(['a','b','c','d'])
b = inputRDD.filter(lambda x : x > 5) # 6,7,8,9
c = a.cartesian(b) 
# output
[('a', 6), ('a', 7), ('a', 8), ('a', 9), ('b', 6), ('b', 7), ('b', 8), ('b', 9), ('c', 6), ('c', 7), ('c', 8), ('c', 9), ('d', 6), ('d', 7), ('d', 8), ('d', 9)]

这个API的作用是计算两个RDD的笛卡尔积。该API转化操作会返回所有可能的(a,b)对,其中a是源RDD中的元素,而b则是来自另一个RDD。笛卡尔积在我们希望考虑所有可能的组合的相似度时比较有用,比如计算各用户对各种产品的预期兴趣程度。我们也可以求一个RDD与其自身的笛卡尔积,这可以用于求用户相似度的应用中。不过要特别注意的是,求大规模RDD的笛卡尔积开销巨大。


map()函数

函数例子:

doubleRDD = inputRDD.map(lambda x: x * 2)

这个API的作用是遍历inputRDD中所有的元素,然后返回的新的RDD中的元素是原来的两倍。


flatMap()函数

函数例子:

inputRDD = sc.parallelize(['i love you', 'hello world'])
outputRDD = inputRDD.flatMap(lambda x: x.split(' '))
print outputRDD.count() # 5

这个API的作用是被应用到输入inputRDD中每个元素上,不过返回的不是一个一个元素,而是一个返回值序列的迭代器。输出的RDD倒不是由迭代器组成的。我们得到的是一个包含各个迭代器可访问的所有元素的RDD。


distinct()函数

函数例子:

inputRDD = sc.parallelize([2,4,3,1,2,3,3,2,1,3,4,2,3,1,4])
distinctRDD = inputRDD.distinct()
dictinctRDD.collect()   # 1,2,3,4

这个API的作用是来生成一个只包含不同元素的新RDD。不过由于该操作需要对所有数据通过网络进行混洗(shuffle),所有这个操作非常消耗时间。


sample()函数

函数例子:

inputRDD = sc.parallelize([1,2,3,4,5,6,7,8,9,0])
sampleRDD = inputRDD.sample(False, 0.5)
# 2,3,4,7
sampleRDD = inputRDD.sample(True, 0.5)
# 2,2,2,5,6,6

这个API的作用是来随机采集RDD中的数据,第一个参数表示RDD中的元素是否可以被重复采集,如果True,那么表示可以重复采集。第二个参数是元素是否被采集的概率,取值范围必须是 [0,1]


reduceByKey()函数

函数例子:

inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
outputRDD = inputRDD.reduceByKey(lambda x,y: x+y)
# output
[(1, 2), (3, 10)]

这个API的作用是来合并具有相同键的值。


groupByKey()函数

函数例子:

inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
outputRDD = inputRDD.groupByKey(lambda x,y: x+y)
# {(1, [1]), (3,[4,6])}
for (i,j) in outputRDD.collect():
  for item in j:
    print item
# output item
<pyspark.resultiterable.ResultIterable object at 0x110a7ec90>
2
<pyspark.resultiterable.ResultIterable object at 0x110a7ed50>
4
6

这个API的作用是对具有相同键的值进行分组。


mapValues()函数

函数例子:

inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
outputRDD = inputRDD.mapValues(lambda x: x+1)
# {(1, [1]), (3,[4,6])}
for (i,j) in outputRDD.collect():
  for item in j:
    print item
# output item
<pyspark.resultiterable.ResultIterable object at 0x110a7ec90>
2
<pyspark.resultiterable.ResultIterable object at 0x110a7ed50>
4
6

这个API的作用是对具有相同键的值进行分组。


flatMapValues()函数

函数例子:

inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
outputRDD = inputRDD.flatMapValues(lambda x: range(x, 6))
# output
[(1, 2), (1, 3), (1, 4), (1, 5), (3, 4), (3, 5)]

这个API的作用是对pair RDD中的每个值应用一个返回迭代器的函数,然后对返回的每个元素都生成一个对应原键的键值对记录。通常用于符号化。


keys()函数

函数例子:

inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
outputRDD = inputRDD.keys()
# output
[1,3,3]

这个API的作用是返回一个仅包含键的RDD。


values()函数

函数例子:

inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
outputRDD = inputRDD.values()
# output
[2,4,6]

这个API的作用是返回一个仅包含值的RDD。


sortByKey()函数

函数例子:

inputRDD = sc.parallelize([(11,2),(13,4),(3,6)])
outputRDD = inputRDD.sortByKey()
# output
[(3,6),(11,2),(13,4)]

这个API的作用是返回一个根据键排序的RDD。


combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)函数

函数例子:

inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
outputRDD = inputRDD.combineByKey(
  (lambda x: (x, 1)),
  (lambda x, y: (x[0] + y, x[1] + 1)),
  (lambda x, y: (x[0] + y[0], x[1] + y[1]))
)
# output
{[(1, (2, 1)), (3, (10, 2))]}

要理解combineByKey(),要先理解它在处理数据时是如何处理每个元素的。由于combineByKey()会遍历分区中的所有的元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同。
如果这是一个新的元素,combineByKey()会使用一个叫作createCombiner()的函数来创建那个键对应的累加器的初始值。需要注意的是,这一过程会在每个分区中第一次出现各个键时发生,而不是在整个RDD中第一次出现一个键时发生。
如果这是一个在处理当前分区之前已经遇到的值,它会使用mergeValue()方法将该键的累加器对应的当前值与这个新的值进行合并。
由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器,如果有两个或者更多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners()方法将各个分区的结果进行合并。


有关行动操作的API


top()函数

函数例子:

outputdata = inputRDD.top(10)
for line in outputdata:
  print line

这个API的作用是返回inputRDD最前面的K个元素,返回的数据类型是一个list,长度是K。


take()函数

函数例子:

outputdata = inputRDD.take(10)
for line in outputdata:
  print line

这个API的作用是返回inputRDD中的K个元素,返回的数据类型是一个list,长度是K。


first()函数

函数例子:

inputRDD.first()

这个API的作用是返回inputRDD最前面的元素,返回的数据类型是一个字符串,编码是Unicode编码。


collect()函数

函数例子:

outputdata = inputRDD.collect()
for line in outputdata:
  print line

这个API的作用是返回inputRDD中所有的元素,返回的数据类型是一个list。注意,这个API只能在小数据上面使用,如果数据量太大,非常消耗时间和内存。


count()函数

函数例子:

len = inputRDD.count()
print len

这个API的作用是返回inputRDD中元素的个数。


reduce()函数

函数例子:

inputRDD = sc.parallelize([1,2,3,4,5,6,7,8,9])
output = inputRDD.reduce(lambda x,y : x+y)
# output
45

这个API的作用是接收一个函数作为参数,这个函数要操作两个RDD的元素类型的数据并返回一个同样类型的新元素。


takeSample(withReplacement, num, seed = None)函数

函数例子:

inputRDD = sc.parallelize(range(10))
output = inputRDD.takeSample(True, 20)
# output
[8, 5, 5, 7, 7, 6, 3, 1, 0, 7, 5, 5, 4, 3, 3, 4, 8, 2, 7, 4]
output = inputRDD.takeSample(False, 5)
# output
[2, 9, 7, 8, 0]

这个API的作用是返回一个指定长度的子集。如果 withReplacement 是True,那么返回的元素可以是重复采集的。


countByValue()函数

函数例子:

inputRDD = sc.parallelize(['a','b','c','d'])
output = inputRDD.countByValue()
# output
{'a': 1, 'c': 1, 'b': 1, 'd': 1}

这个API的作用是计算各元素在RDD中出现的次数。


substractByKey()函数

函数例子:

inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
otherRDD = sc.parallelize([(3,9)])
output = inputRDD.subtractByKey(otherRDD)
# output
[(1,2)]

这个API的作用是删除inputRDD中键与otherRDD中的键相同的元素。


join()函数

函数例子:

inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
otherRDD = sc.parallelize([(3,9)])
output = inputRDD.join(otherRDD)
# output
[(3, (4, 9)), (3, (6, 9))]

这个API的作用是对两个RDD进行内连接。


cogroup()函数

函数例子:

inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
otherRDD = sc.parallelize([(3,9)])
output = inputRDD.cogroup(otherRDD)
# output
{(1, ([2], [])), (3, ([4, 6], [9]))}

这个API的作用是将两个RDD中拥有相同键的数据分组到一起。


rightOuterJoin()函数

函数例子:

inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
otherRDD = sc.parallelize([(3,9)])
output = inputRDD.rightOuterJoin(otherRDD)
# output
[(3, (4, 9)), (3, (6, 9))]

这个API的作用是对两个RDD进行连接操作,确保第一个RDD的键必须存在(右外连接)。


leftOuterJoin()函数

函数例子:

inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
otherRDD = sc.parallelize([(3,9)])
output = inputRDD.leftOuterJoin(otherRDD)
# output
[(1, (2, None)), (3, (4, 9)), (3, (6, 9))]

这个API的作用是对两个RDD进行连接操作,确保第二个RDD的键必须存在(左外连接)。


countByKey()函数

函数例子:

inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
output = inputRDD.countByKey()
# output
{1: 1, 3: 2}

这个API的作用是对每个键对应的元素分别计数。


collectAsMap()函数

函数例子:

inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
output = inputRDD.collectAsMap()
# output
{1: 2, 3: 6}

这个API的作用是将结果以映射表的形式返回,以便查询。但如果RDD中,同一个key存在多个value,那么后面的value将会覆盖掉前面的value,最终得到的key就是唯一的,而且对应一个value。


lookup()函数

函数例子:

inputRDD = sc.parallelize([(1,2),(3,4),(3,6)])
output = inputRDD.lookup(3)
# output
[4, 6]

这个API的作用是返回给定键对应的所有值。


从Hive中读取数据


from pyspark.sql import HiveContext

hiveCtx = HiveContext(sc)
rows = hiveCtx.sql("select name, age from users")
firstRow = rows.first()
print firstRow.name

Python向spark传递函数有三种方法


# 方法一
word = rdd.filter(lambda s: "error" in s)

# 方法二
def containsError(s):
  return "error" in s
word = rdd.filter(containsError)

# 方法三
class WordFunctions(object):
  ...
  def getMatchesNoReference(self, rdd):
    # 安全方式:只把需要的字段提取到局部变量中
    query = self.query
    return rdd.filter(lambda x: query in x)

本文转载自:http://www.jianshu.com/p/4a2a6c336372

共有 人打赏支持
AllenOR灵感
粉丝 10
博文 2634
码字总数 82983
作品 0
程序员
windows 安装 spark 及 pycharm 调试 TopN 实例

首先声明本文搭建的环境为:windows8.1 + spark1.6.0 + python2.7 + jdk8,spark on windows 对 windows及python版本不怎么挑,但是对 spark 版本要求极其苛刻,比如 spark1.6.1 就无法运行。...

大数据之路
2012/06/28
0
0
Spark 入门(Python、Scala 版)

本文中,我们将首先讨论如何在本地机器上利用Spark进行简单分析。然后,将在入门级水平探索Spark,了解Spark是什么以及它如何工作(希望可以激发更多探索)。最后两节将开始通过命令行与Spa...

大数据之路
2015/05/07
0
0
pycharm pyspark 配置

1 、安装了pycharm,下载spark(官网下载,我下的是spark-2.1.1-bin-hadoop2.7.tgz,解压缩后为文件夹spark-2.1.1-bin-hadoop2.7,我将文件放在了/Applications/spark/下,这个文件夹里面有p...

张欢19933
05/09
0
0
Ubuntu下Spark开发环境搭建

Ubuntu 64基本环境配置 安装JDK,下载jdk-8u45-linux-x64.tar.gz,解压到/opt/jdk1.8.045 下载地址:http://www.oracle.com/technetwork/java/javase/downloads/index.html 安装scala,下载s......

张国凯
2015/05/21
0
0
你不能错过的 spark 学习资源

1. 书籍,在线文档 2. 网站 3. Databricks Blog 4. 文章,博客 5. 视频

u012608836
04/12
0
0
慕课网Spark SQL日志分析 - 5.DateFrame&Dataset

5.DateFrame&Dataset 1.DateFrame产生背景 DataFrame 不是Spark Sql提出的。而是在早起的Python、R、Pandas语言中就早就有了的。 Spark诞生之初一个目标就是给大数据生态圈提供一个基于通用语...

Meet相识_bfa5
07/12
0
0
使用PySpark编写SparkSQL程序查询Hive数据仓库

作业脚本采用Python语言编写,Spark为Python开发者提供了一个API-----PySpark,利用PySpark可以很方便的连接Hive 下面是准备要查询的HiveSQL 下面是准备提交的Python脚本 脚本开头指定utf8编...

teaGod
04/11
0
0
Comprehensive Introduction to Apache Spark

Introduction Industry estimates that we are creating more than 2.5 Quintillion bytes of data every year. Think of it for a moment – 1 Qunitillion = 1 Million Billion! Can you i......

grasp_D
06/15
0
0
Spark 学习资源收集【Updating】

(一)spark 相关安装部署、开发环境 1、Spark 伪分布式 & 全分布式 安装指南 http://my.oschina.net/leejun2005/blog/394928 2、Apache Spark探秘:三种分布式部署方式比较 http://dongxic...

大数据之路
2014/09/08
0
1
打造基于hadoop的网站日志分析系统(5)之spark在日志分析系统里的简单应用

1.下载spark和运行 wget http://apache.fayea.com/apache-mirror/spark/spark-1.0.0/spark-1.0.0-bin-hadoop2.tgz 我这里下载的是1.0.0版,由于我们只是测试spark的用法所以不需要配置spark集...

豚鼠窝窝
2014/07/10
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

conda 换源

https://mirrors.tuna.tsinghua.edu.cn/help/anaconda/ conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/conda config --add channels https://mir......

阿豪boy
5分钟前
0
0
Confluence 6 安装补丁类文件

Atlassian 支持或者 Atlassian 缺陷修复小组可能针对有一些关键问题会提供补丁来解决这些问题,但是这些问题还没有放到下一个更新版本中。这些问题将会使用 Class 类文件同时在官方 Jira bug...

honeymose
14分钟前
0
0
设计模式:代理模式

代理模式可以分为三种:静态代理,动态代理,cglib代理 1.静态代理:被代理的类需要实现一接口或是继承一父类 委托类(被代理的类): package com.java.pattern.proxy.staticdemo;publ...

人觉非常君
17分钟前
0
0
非常实用的IDEA插件之总结

1、Alibaba Java Coding Guidelines 经过247天的持续研发,阿里巴巴于10月14日在杭州云栖大会上,正式发布众所期待的《阿里巴巴Java开发规约》扫描插件!该插件由阿里巴巴P3C项目组研发。P3C...

Gibbons
23分钟前
0
0
Tomcat介绍,安装jdk,安装tomcat,配置Tomcat监听80端口

Tomcat介绍 Tomcat是Apache软件基金会(Apache Software Foundation)的Jakarta项目中的一个核心项目,由Apache、Sun和其他一些公司及个人共同开发而成。 java程序写的网站用tomcat+jdk来运行...

TaoXu
23分钟前
0
0
TensorFlow,从一个 Android Demo 开始

TensorFlow Android Demo 项目地址 Machine Learning 既然提到了 TensorFlow,那是不是得神经网络、机器学习了解下? 如果你能坚持把 机器学习速成课程 给啃完了,觉得还挺有兴趣的,那可以考...

孟飞阳
25分钟前
0
0
JVM学习笔记二:内存结构规范

1、JVM基本结构图 2、java堆(Heap) 3、方法区(Method Area) 4、程序计数器 5、JAVA栈图解 局部变量表:八大基本类型,还可以存储引用类型 上一篇:JVM学习笔记一:类加载机制介绍...

刘祖鹏
30分钟前
0
0
mui集成微信H5支付(返回白屏问题已经解决)

一.项目需求 因为公司人员缺少,没有专门开发安卓和ios的人员,为了项目尽早上线采用了混合APP开发的方式,我选择了MUI混合开发框架,项目中需要在用户购买VIP会员的时候进行支付,所以需要在项目...

银装素裹
34分钟前
0
0
SpringBoot集成Redis--配置自定义的RedisCacheManager

配置自定义的RedisCacheManager--1自定义键生成规则 默认的键生成器 当不指定缓存的key时,SpringBoot会使用SimpleKeyGenerator生成key。 SimpleKeyGenerator SimpleKey 查看源码可以发现,它...

karma123
53分钟前
0
0
防火墙未来的发展趋势在哪里?

导读 防火墙(Firewall),也称防护墙,是由Check Point创立者Gil Shwed于1993年发明并引入国际互联网。当下互联网时代,无论是大小企业,大部分都会部署有防火墙的设备,但这些防火墙往往并不...

问题终结者
56分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部