文档章节

Spark程序设计

o
 osc_isezqdgg
发布于 2019/09/18 17:47
字数 1762
阅读 0
收藏 0

精选30+云产品,助力企业轻松上云!>>>

 一、Spark编程模型

Spark常规步骤:

  1. 创建SparkContext对象:连接集群,提供创建RDD和广播变量等接口
  2. 输入数据:从Hadoop等外部数据源或Scala数据集创建RDD
    • 创建RDD-Scala集合/本地文件
      • 并行度3,创建3个partition
    •  HDFS

    •  文件被切分成block分布在多个节点上,通过textFile读入机器内存,转为RDD的partition对象。action触发之后,才真物理上去执行。

    • 总结
    • 外部的数据,比如HDFS上从外部来,变为RDD,不断转换也是RDD,
    • 内部的数据,比如一些变量,是Scala集合或容器,存在Driver当前APP单机的数据集,包括Broadcast变量,在单机有副本。而外部数据,最后通过collect等返回的数据,也返回到Driver单机数据
    • RDD上的数据,被每一台节点上spark的block manager管理的,里面存着。Driver节点上则知道全局RDD到底有哪些partition,究竟被每台机器怎么管理
    • broadcast变量声明初始化在Driver,可以分发到不同的节点上,节点上就有副本,供Task共享
  3. 处理:RDD上执行操作API
    • 核心API 按语义分类
  4. 输出:Action触发执行,保存结果到外部数据源或回收到Driver

 

其他[Optional]

  1. “共享变量”:Driver分发的全局变量:广播变量等
    • 声明初始化共享变量
    • 闭包传递,每个Task里都会有该变量,是和任务打包一起分发到节点上的
    • 使用broadcast,则每个Excutor只有一个,Task共享该变量,减少了数据传输,提高空间使用率

       2.Checkpoint:检查点备份

       3.Cache:缓存复用数据

       4.采样(sample):小数据集验证

       5.调试:take,foreach等

 

二、Spark优化

 通过程序或配置参数控制:

  • 控制任务并行度
  • 降低单条数据处理开销
  • 数据倾斜问题
  • RDD缓存复用
  • 操作符的选择
  • 为作业设置合理的资源

1.控制任务并行度

问题与影响

  • 数目过小:运行过慢,容易OOM
  • 数目过大:产生过多小任务,启动和调度开销较大

推荐数量

  • 每个CPU core对应2~3个Task

控制并行任务(Task)数量

  • Map任务并行度
    • 输入的Stage的Task并行度
    • 默认值:输入文件的数据块数量一致(HDFS block)
    • 通过API控制sc.textFile("input.txt", 100)
  • Reduce任务并行度
    • 默认值:使用parent RDD的partition数量
    • 通过设置配置spark.default.parallelism更改默认配置
    • 通过API控制groupByKey,reduceByKey等提供了相关参数
      • rdd.reduceByKey(_+_, 100)

 

 

 

2.降低单条记录开销

 

 

 mapPartitions

 

3.数据倾斜或任务倾斜问题

问题:某个任务T负载过大,造成拖慢整体Stage进度或Task出现异常

原因:任务T数据过多或任务T所在节点有问题

解决:

  • 数据:选择合适的Partition Key
  • 调度:spark.speculation设置为true
  • 节点:剔除所有问题节点

Stage0中下面Task数据量过大

 

 

 

可以将该Task根据随机数再切分为两个Task,三个Task处理的数据量基本就一致了

可以自定义方法解决该问题

 

 4.缓存重复使用RDD

 

 两个action

5.操作符的选择

 

 

6.为作业设置合理资源

 

 

7.监控与诊断

 

 

8.Spark参数配置方式

 

 

监控实例

作业提交到了Yarn

打开resourcemanager的端口,可看到所有正在运行或运行结束的Application

 

 点开正在运行的UserClick

 

 点击TracingUrl,进入Spark的监控界面

 

 

 

 如果是standalone模式,直接打开4040端口即可

 

三、案例

 电影受众分析

1.电影受众分析背景

 

 2.电影受众分析数据

 

用户数据:用户ID,性别,年龄,职业, 编码

电影数据:电影ID,电影名,风格

评价数据:用户ID,电影ID,评分,时间戳

 

3.电影受众分析任务

  • 看过“Sixteen Candles”用户年龄和性别分布
    • 电影受众分析数据:过滤
    • 连接
    • 分布:聚合运算,年龄和性别为key统计数量

创建Object

package org.sparkcourse.movie

import org.apache.spark._


object MovieUser{
  def main(args: Array[String]): Unit = {
    //创建SparkContext
    val master = if(args.length > 0) args(0).toString else "local"
    val datapath = if(args.length > 1) args(1).toString else "data/ml-1m"
    val conf = new SparkConf().setMaster(master).setAppName("MovieUser")
    val sc = new SparkContext(conf)
    //输入数据
    val usersRdd = sc.textFile(datapath+"/users.dat")
    val ratingsRdd = sc.textFile(datapath+"/ratings.dat")
    //抽取数据的属性,过滤符合条件的电影
    //RDD[(userId, (gender, age))]
    val users = usersRdd.map(_.split("::")).map(x=>{
      (x(0), (x(1), x(2)))
    })
    //RDD[(userID, movieID)] split返回数组
    val rating = ratingsRdd.map(_.split("::")).map(x =>
      (x(0), x(1))).filter(x => x._2.equals("2144"))
    //join两个数据集
    val userRating = rating.join(users)
    userRating.take(num=1).foreach(println(_))
    //统计分析
    val userDistribution = userRating.map(x=>{
      (x._2._2, 1)
    }).reduceByKey(_+_)
      .foreach(println(_))
    sc.stop()

  }
}

users  (userID, (gender, age))

rating  (userID, movieID)

userRating (userID, (movieID, (gender, age))) 例如,(4425, (2144, (M, 35)))

userDistribution ((gender, age), 1) 求和

 

  • 年龄段20-30的男性年轻人,最喜欢看哪10部电影
    • 年龄段:过滤
    • 最喜欢10部电影:聚合,排序
package org.sparkcourse.movie

import org.apache.spark._

import scala.collection.immutable.HashSet

object PopularMovie {
  def main(args: Array[String]): Unit = {


    val conf = new SparkConf().setMaster("local").setAppName("PopularMovie")
    val sc = new SparkContext(conf)
    //输入数据
    val usersRdd = sc.textFile("data/ml-1m/users.dat")
    val ratingsRdd =  sc.textFile("data/ml-1m/ratings.dat")
    //抽取数据和过滤
    val users = usersRdd.map(_.split("::")).map(x=>{
      (x(0), x(2)) //userid, age
    }).filter(x=>x._2.toInt>=20&&x._2.toInt<=30)
      .map(_._1)
      .collect() // 变为了Driver上单机的变量,用广播变量发出去
    val userSet = HashSet() ++ users
    val broadcastUserSet = sc.broadcast(userSet)
    //聚合和排序
    val topKMovies = ratingsRdd.map(_.split("::"))
      .map(x=>{(x(0), x(1))}) //userid, movieid
      .filter(x => {
      broadcastUserSet.value.contains(x._1)
    }).map(x=>{
      (x._2, 1)
    }).reduceByKey(_+_)
      .map(x =>(x._2, x._1))
      .sortByKey(false) // ascending=false
      .map(x=>{(x._2, x._1)})
      .take(3)
      .foreach(println(_))
  }
}

 

  • 最受欢迎的前3部电影
package org.sparkcourse.movie

import org.apache.spark._

object TopKMovie {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("TopKMovie")
    val sc = new SparkContext(conf)

    // 输入
    val ratingsRdd = sc.textFile("data/ml-1m/ratings.dat")

    // 数据抽取
    val ratings = ratingsRdd.map(_.split("::"))
      .map(x => {
        (x(0), x(1), x(2)) // userid, movieid, rating
      })

    // 分析
    val topKScoreMostMovie = ratings.map(x => {
      (x._2, (x._3.toInt, 1)) // (movieid, (rating, 1))
    }).reduceByKey((v1, v2) => { 
      (v1._1 + v2._1, v1._2 + v2._2) // {movieid: (rating之和,数量之和)}
    }).map(x => {
      (x._2._1.toFloat / x._2._2.toFloat, x._1) // (平均分, movieid)
    }).sortByKey(false)
        .take(3)
        .foreach(println(_))
    sc.stop()
  }
}

 reduceByKey((v1, v2) => { (v1._1 + v2._1, v1._2 + v2._2)})

v1和v2都是value

即对于(movieid, (rating, 1))来说,是(rating, 1)

reducByKey对相同的键的进行value的操作

reduceByKey(binary_function)
reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行binary_function的reduce操作,因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。

那么讲到这里,差不多函数功能已经明了了,而reduceByKey的是如何运行的呢?下面这张图就清楚了揭示了其原理:

 

它会在数据搬移以前,即在reduce之前就进行了reduce操作。

可以实现同样功能的还有GroupByKey函数,但是,groupbykey函数并不能提前进行reduce,也就是说,上面的处理过程会翻译成这样:

 

 

 

所以在处理大规模应用的时候,应该使用reduceByKey函数。

 

o
粉丝 0
博文 500
码字总数 0
作品 0
私信 提问
加载中
请先登录后再评论。
课时24 第六课Spark SQL程序设计与应用案例

Spark SQL程序设计基础 数据集准备 数据集 http://grouplens.org/datasets/movielens MovieLens 1M Dataset 相关数据文件 users.dat movies.dat ratings.dat Spark SQL命令访问 准备数据 ./......

刀锋
2018/12/28
0
0
【原创 Hadoop&Spark 动手实践 10】Spark SQL 程序设计基础与动手实践(下)

【原创 Hadoop&Spark 动手实践 10】Spark SQL 程序设计基础与动手实践(下) 目标: 1. 深入理解Spark SQL 程序设计的原理 2. 通过简单的命令来验证Spark SQL的运行原理 3. 通过一个完整的案...

Jonson Li
2017/05/22
0
0
Spark、Python spark、Hadoop简介

Spark、Python spark、Hadoop简介 Spark简介 1、Spark简介及功能模块 Spark是一个弹性的分布式运算框架,作为一个用途广泛的大数据运算平台,Spark允许用户将数据加载到cluster集群的内存中储...

SanFanCSgo
03/31
0
0
大数据实习之spark

Apache Spark是一个围绕速度、易用性和复杂分析构建的大数据处理框架。 与 Hadoop 和 Storm 等其他大数据和 MapReduce 技术相比,Spark 有如下优势。 首先,Spark 为我们提供了一个全面、统一...

osc_pll3h24t
2019/06/12
1
0
【转载】大数据用到的技术和软件(名词解释)

大数据学习路线 java   (Java se,javaweb) Linux   (shell,高并发架构,lucene,solr) Hadoop   (Hadoop,HDFS,Mapreduce,yarn,hive,hbase,sqoop,zookeeper,flume) 机器学习   (R,maho......

osc_es532h90
04/16
2
0

没有更多内容

加载失败,请刷新页面

加载更多

使用命名管道承载gRPC

最近GRPC很火,感觉整RPC不用GRPC都快跟不上时髦了。 gRPC设计 gRPC是一种与语言无关的高性能远程过程调用 (RPC) 框架。刚好需要使用一个的RPC应用系统,自然而然就盯上了它,但是它真能够解...

osc_nq69o22c
21分钟前
16
0
06-敏捷开发框架-apis 脚本库 引用位置无关性设计

动态引入技术的设计,对我们来说非常重要。 同时也说明动态语言的使用对我们来说也是非常重要。 没有动态语言的支撑,有些想法可能不容易实现,或者有替代方案,可能会花更大的代价。 前端开...

osc_5zg9z6t1
23分钟前
21
0
(三)学习了解OrchardCore笔记——灵魂中间件ModularTenantContainerMiddleware的第一行①的模块部分

  了解到了OrchardCore主要由两个中间件(ModularTenantContainerMiddleware和ModularTenantRouterMiddleware)构成,下面开始了解ModularTenantContainerMiddleware中间件第一行代码。   ...

osc_kdarxvx0
24分钟前
15
0
50Mn18Cr4V锻锻环件

电机无磁护环怎么锻性能才能《高高》?50Mn18Cr4V高锰无磁钢在变形温度为900~1 100℃、应变速率为0.1 ~10s-1条件下的热变形行为. 结果,VC第二相的应变诱导析出对50Mn18Cr4V的热变形行为产生...

无磁钢
24分钟前
16
0
【遇见offer】一汽-大众实习生专场来啦!成长+学习+福利,一个也不能少~

在上次一汽-大众的社招直播之后,实习生的专场招聘也终于来啦! 针对2020年暑期,我们提供了非常多的实习岗位给大家选择。 如果你想得到大厂实习的宝贵经验,如果你想得到更快速的成长,如果...

osc_b88oux8w
26分钟前
25
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部