文档章节

spark编程基础

JPblog
 JPblog
发布于 2016/07/18 14:44
字数 1050
阅读 10
收藏 0

1. RDD概念和构建

        Spark 核心的概念是Resilient Distributed Dataset (RDD):一个可并行操作的有容错机制的数据集合。有2 种方式创建RDDs:第一种是在你的驱动程序中并行化一个已经存在的集合;另外一种是引用一个外部存储系统的数据集,例如共享的文件系统,HDFS,HBase或其他Hadoop 数据格式的数据源。

并行集合

    并行集合(Parallelized collections) 的创建是通过在一个已有的集合(Scala Seq)上调用SparkContext 的parallelize方法实现的。集合中的元素被复制到一个可并行操作的分布式数据集中。

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data,2)

(Parallelize可指定第二个参数用以指定并行个数,建议指定为机器核心数的2-4倍。)

一旦创建完成,这个分布式数据集(distData)就可以被并行操作。例如,我们可以调用distData.reduce((a, b) => a + b)将这个数组中的元素相加。

外部数据集

spark 可以从任何一个Hadoop 支持的存储源创建分布式数据集,包括你的本地文件系统,HDFS,Cassandra,HBase,Amazon S3等。Spark 支持文本文件(textfiles),SequenceFiles和其他Hadoop InputFormat。文本文件RDDs 可以使用SparkContext 的textFile方法创建。

val distFile = sc.textFile("data.txt",2)
distFile: RDD[String] = MappedRDD@1d4cee08

(textFile可指定第二个参数用以指定并行个数,建议指定为机器核心数的2-4倍。)

一旦创建完成,distFiile就能做数据集操作。例如,我们可以用下面的方式使用map和reduce操作将所有行的长度相加:distFile.map(s => s.length).reduce((a, b) => a + b)。

 

2. transformation操作

map

val num1 = sc.parallelize(List(1, 2, 3, 4), 2)  //定义一个列表形式的RD
val mapRDD1=num1.map(number=>number*2) //转换成原来的两倍
val mapRDD2=num1.map(x=>(x,1)) //转换成key-value形式
println(mapRDD1.collect.mkString(","))
println(mapRDD2.collect.mkString(","))

    结果:2,4,6,8

              (1,1),(2,1),(3,1),(4,1)

flatMap

val num1 = sc.parallelize(List(1, 2, 3, 4), 2)
val flatMapRDD=num1.flatMap(x=>1 to x)
println(flatMapRDD.collect.mkString(","))

    结果:1,1,2,1,2,3,1,2,3,4

filter

val str = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "lion"), 2)
val filterRDD=str.filter(word=>word.contains("i"))
println(filterRDD.collect.mkString(","))

    结果:tiger,lion,lion

union

val num1 = sc.parallelize(List(1, 2, 3, 4), 2)
val num2 = sc.parallelize(List(4, 3, 8, 7), 2)
val unionRDD=num1.union(num2)
println(unionRDD.collect.mkString(","))

    结果:1,2,3,4,4,3,8,7

intersection

val num1 = sc.parallelize(List(1, 2, 3), 2)
val num2 = sc.parallelize(List(4, 3, 8), 2)
val intersectionRDD=num1.intersection(num2)
println(intersectionRDD.collect.mkString(","))
    结果:4,3

distinct

val str = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "lion"), 2)
val distinctRDD=str.distinct
println(distinctRDD.collect.mkString(","))

    结果:dog, tiger, lion, cat, panther

groupByKey

val str = sc.parallelize(List(("pet","dog"), ("animal","tiger"), ("animal","lion"), ("pet","cat")), 2)
 val groupByKeyRDD=str.groupByKey()
 println(groupByKeyRDD.collect.mkString(","))
    结果:(animal,CompactBuffer(tiger, lion)),(pet,CompactBuffer(dog, cat))

reduceByKey

val kv = sc.parallelize(List(("dog",1), ("tiger",1), ("lion",2), ("dog",2)), 2)
val reduceByKeyRDD=kv.reduceByKey(_+_)
println(reduceByKeyRDD.collect.mkString(","))

    结果:(dog,3),(lion,2),(tiger,1)

sortByKey

val kv = sc.parallelize(List(("dog",1), ("tiger",1), ("lion",2), ("dog",2)), 2)
val sortByKeyRDD=kv.sortByKey()
println(sortByKeyRDD.collect.mkString(","))
    结果:(dog,1),(dog,2),(lion,2),(tiger,1)

coGroup

val str = sc.parallelize(List(("pet","dog"), ("animal","tiger"), ("animal","lion"), ("pet","cat")), 2)
val str1 = sc.parallelize(List(("pet","bird"), ("animal","sheep"), ("animal","cow"), ("pet","snake")), 2)
val coGroupRDD=str.cogroup(str1)
coGroupRDD.foreach(println)
    结果:(animal,(CompactBuffer(tiger, lion),CompactBuffer(sheep, cow)))
             (pet,(CompactBuffer(dog, cat),CompactBuffer(bird, snake)))


join

val array1 = sc.parallelize(List((1,2),(3,8),(5,9),(7,6)), 2)
val array2 = sc.parallelize(List((1,4),(2,8),(7,9),(9,6)), 2)
val joinRDD=array1.join(array2)
println(joinRDD.collect().mkString(","))
    结果:(1,(2,4)),(7,(6,9))

cartesian

val num1 = sc.parallelize(List(1, 2, 3), 2)
val num2 = sc.parallelize(List(4, 3, 8), 2)
val cartesianRDD=num1.cartesian(num2)
println(cartesianRDD.collect.mkString(","))
    结果:(1,4),(1,3),(1,8),(2,4),(3,4),(2,3),(2,8),(3,3),(3,8)

 

3. action操作

reduce

val num1 = sc.parallelize(List(1, 2, 3), 2)
val reduceNUM = num1.reduce((x, y) => x + y)
println(reduceNUM)

    结果:6

collect

val num1 = sc.parallelize(List(1, 2, 3), 2)
println(num1.collect.mkString(","))  //输出逗号分隔的num1中的元素

    结果:1,2,3

count

val num1 = sc.parallelize(List(1, 2, 3), 2)
val countNUM = num1.count()
println(countNUM)

    结果:3

first

val num1 = sc.parallelize(List(1, 2, 3), 2)
val firstNUM=num1.first()
println(firstNUM)

    结果:1

take

val num1 = sc.parallelize(List(1, 2, 3), 2)
val takeNUM=num1.take(2)
println(takeNUM.mkString(“,”))  //输出逗号分隔的元素集合

    结果:1,2

takeOrdered

val num2 = sc.parallelize(List(4, 3, 8), 2)
val takeOrderedNUM=num2.takeOrdered(2)
println(takeOrderedNUM.mkString(","))

    结果:3,4

saveAsTextFile

val str = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "lion"), 2)
str.saveAsTextFile("/result")

    结果:

        [root@localhost result]# ls -l

        part-00000    part-00001    _SUCCESS

        [root@localhost result]# cat part-00000

        dog

        tiger

        lion

countByKey

val str = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "lion"), 2)
val countByKeyNUM=str.countByValue()
println(countByKeyNUM.mkString(","))

    结果:lion -> 2,tiger -> 1,dog -> 1,panther -> 1,cat -> 1

foreach

val num1 = sc.parallelize(List(1, 2, 3), 2)
num1.foreach(println)

    结果:1

              2

              3

 

© 著作权归作者所有

共有 人打赏支持
上一篇: spark RDD持久化
下一篇: sparkCore RDD join
JPblog
粉丝 14
博文 65
码字总数 42633
作品 0
朝阳
程序员
私信 提问
18小时内掌握Spark,全面提升Spark技能!

伴随着大数据相关技术和产业的逐步成熟,继Hadoop之后,Spark技术以其无可比拟的优势,发展迅速,将成为替代Hadoop的下一代云计算、大数据核心技术。   Spark是基于内存,是云计算领域的继...

Spark亚太研究院
2014/06/12
62
0
Spark2.1.0之基础知识

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/80303035 在阅读本文之前,读者最好已经阅读了《Spark2.1.0之初识Spark》一文,本文...

泰山不老生
2018/05/24
0
0
Spark 2.0 时代全面到来 —— 2.0.1 版本发布

距离Spark 2.0.0发布两个月后,Spark 2.0.1版本发布了,这是一个修正版本,共处理了300多个Issue,涉及spark稳定性和bug等方面的修复 ,它的发布意味着Spark 2.0接近生产环境使用要求,想要尝...

达尔文
2016/10/08
13.1K
22
如何成为云计算大数据Spark高手

Spark采用一个统一的技术堆栈解决了云计算大数据的如流处理、图技术、机器学习、NoSQL查询等方面的所有核心问题,具有完善的生态系统,这直接奠定了其一统云计算大数据领域的霸主地位; 要想...

Spark亚太研究院
2014/06/13
2K
3
# Apache Spark系列技术直播# 第五讲【 Spark RDD编程入门 】

内容提要:本次讲座主要涵盖Spark RDD编程入门基础,包括: Spark、RDD简介 RDD API简介 打包与spark-submit 性能分析与调优基础 主讲人:王道远(健身) 阿里巴巴计算平台EMR技术专家 直播时间...

spark小能手
2018/12/12
0
0

没有更多内容

加载失败,请刷新页面

加载更多

dockerfile 镜像构建(1)

通用dockerfile 利用已经编译好的.jar 来构建镜像。要构建的目录如下: [root@iZuf61quxhnlk9m2tkx16cZ demo_jar]# docker build -t demo:1 . 运行镜像: [root@iZuf61quxhnlk9m2tkx16cZ de...

Canaan_
14分钟前
0
0
Redis radix tree源码解析

Redis实现了不定长压缩前缀的radix tree,用在集群模式下存储slot对应的的所有key信息。本文将详述在Redis中如何实现radix tree。 核心数据结构 raxNode是radix tree的核心数据结构,其结构体...

阿里云云栖社区
17分钟前
4
0
vue import 传入变量

在做动态添加component的时候,传入变量就会报错,出现以下错误信息: vue-router.esm.js?fe87:1921 Error: Cannot find module '@/components/index'. at eval (eval at ./src/components ......

朝如青丝暮成雪
19分钟前
0
0
Flutter开发 Dio拦截器实现token验证过期的功能

前言: 之前分享过在Android中使用Retrofit实现token失效刷新的处理方案,现在Flutter项目也有“token验证过期”的需求,所以接下来我简单总结一下在Flutter项目中如何实现自动刷新token并重...

EmilyWu
20分钟前
5
0
final Map可以修改内容,final 常量不能修改

1.final Map 可以put元素,但是不可以重新赋值 如: final Map map = new HashMap(); map = new HashMap();//不可以 因为栈中变量map引用地址不能修改 2.final str = “aa”; str = "bb";/......

qimh
23分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部