文档章节

spark程序性能调优实践

jhonephone
 jhonephone
发布于 2015/05/18 16:49
字数 898
阅读 47
收藏 0

    初学者刚开始写spark程序的时候,往往只注重实现相应的功能,而容易忽略采用何种实现方式能够实现最高的效率。本文后面讲详细阐述作者在实际项目中遇到的spark程序调优问题。

    1. 下面这段代码的背景是这样的,panelFeatureMid1类型为RDD[(String, (scala.collection.mutable.HashMap[String,Double], (Option[String], Option[String])))],表示一个UUID(可以理解为cookie id)上出现的SPID(可以理解为网页上某个监测点号)及其频次的HashMap统计结果,后面的两个Option[String]中一个表示的是该UUID的人群属性信息(例如性别,年龄,教育程度,收入),另一个可以忽略。

    现在需要以SPID及其频次为特征,训练人群属性的分类器。这就需要对SPID进行编号,下面的一段程序就是实现这个功能。

        //step2: broadcast all unique spid and index them
        val spidSet = panelFeatureMid1.mapPartitions(tp => {
           val spidSet = tp.foldLeft(HashSet.empty[String])((s, elem) => s.union(elem._2._1.keySet))
           spidSet.map(s => (s, 1)).toIterator
           }).reduceByKey((a, b) => 1).map(tp => tp._1).collect
        println("spidSet num:" + spidSet.size)
        val indexSet = sc.broadcast(spidSet)
        val panelFeatureMid = panelFeatureMid1.mapPartitions(tp => 
          {var i = -1;
           val indexMap = indexSet.value.foldLeft(HashMap.empty[String, Int])((s, elem) => {i = i+1; s.put(elem, i); s});
           val out = tp.map(s => (s._1, (s._2._2, s._2._1.map(spid => (indexMap.get(spid._1).get, spid._2) ))))
           out.toIterator
           }).cache

       个人经验:a. 在后面多个task中将使用到的RDD,需要调用cache函数保存在内存中

                     b. 对于规模不大,而又需要全局使用的数据集,可以作为广播变量broadcast出去

                     c. 在涉及需要全局数据参与map操作过程时,尽量使用mapPartitions

    2. 在spark程序中,需要特别留意的是需要进行IO shuffle的操作,因为shuffle操作将导致RDD数据的网络IO,非常耗时。而其中join操作(包括left,right,full各种join)尤其令初学者容易陷入耗时的shuffle操作中,不能自拔,而对其产生畏惧。

        下面的这段代码的背景是这样的:panel和cookieMapping都是RDD[(String, String)]类型,第一个String表示的是UUID(同上),第二个String分别表示人群属性和cookie mapping属性,需要将同一个UUID的人群属性和cookie mapping属性连接到一起,即对它们做full join。

        val panel = getPanel(sc, panelDir + "/l" + month).repartition(numPartitions)
        val cookieMapping = getCookieMapping(sc, new             StringBuilder(cookieMappingDir).append("/").append(month).append("/*").toString)
             .repartition(numPartitions)
        //step 2: panel full outer join cookie mapping data, repartition and cache
        val total = cookieMapping.fullOuterJoin(panel).cache


       个人经验:a. 在进行join操作前,尽量确保参与join操作的两个RDD的分区数量相同,这样可以避免无谓的shuffle操作,同时在groupByKey和reduceByKey等操作中,也提供了分区数量参数,在这里设置分区数量,可以省略额外的repartition操作,如下例:

        val uuids = sc.broadcast(total.map(tp => tp._1).collect.toSet)
        println("uuids number = " + uuids.value.size)
        println("driver after collect freeMem = " + Runtime.getRuntime().freeMemory() + " totalMem = " + Runtime.getRuntime().totalMemory())
        val uuidSpid = sc.newAPIHadoopFile(new StringBuilder(monitorEtlDir).append("/").append(month).append("*/campaign*").toString,
        classOf[MzSequenceFileInputFormat], classOf[LongWritable], classOf[Text])
        .mapPartitions(tp => {
          val uuidSet = uuids.value
 val freeMem = Runtime.getRuntime().freeMemory()
          val totalMem = Runtime.getRuntime().totalMemory()
          println("map freeMem = " + freeMem + " totalMem = " + totalMem)
          tp.flatMap(ts => {val items = ts._2.toString().split("\\^").map(s => {val kv = s.split("=")
      (kv(0), kv(1))}).toMap
      val uuid = items("uuid");
      if(uuidSet.contains(uuid)) Iterator((uuid, items.get("p")))
      else Iterator()})
        }).groupByKey(numPartitions)
    .map(tp => { val spidPv = tp._2.foldLeft(HashMap.empty[String, Double])(
    (pv, p) => {val spid = p.get;
        if(spid != null){
        if(pv.contains(spid))
        pv.put(spid, pv.get(spid).get+1)
        else pv.put(spid, 1)}
        pv })
    (tp._1, spidPv) })
    
    val panelFeatureMid1 = uuidSpid.join(total)

由于map,mapPartitions等操作不改变分区数量,所以这里可以确保做join时的两个RDD分区数量相同

© 著作权归作者所有

jhonephone
粉丝 5
博文 19
码字总数 24506
作品 0
合肥
高级程序员
私信 提问
spark submit参数及调优

spark submit参数介绍 你可以通过spark-submit --help或者spark-shell --help来查看这些参数。 使用格式: ./bin/spark-submit --class --master --deploy-mode --conf = ... # other option......

citibank
2018/07/17
0
0
Spark 通用的性能配置方法:内存和CPU的配置

前言 本文主要介绍关于通过配置Spark任务运行时的内存和CPU(Vcore)来提升Spark性能的方法。通过配置内存和CPU(Vcore)是比较基础、通用的方法。本文出现的Demo以X-Pack Spark数据工作台为...

云hbase+spark
07/08
0
0
Spark性能优化篇一:资源调优

Spark性能优化篇一:资源调优 所谓的Spark资源参数调优,其实主要就是对Spark运行过程中各个使用资源的地方,通过调节各种参数,来优化资源使用的效率,从而提升Spark作业的执行性能。以下参...

u010262291
2018/05/30
0
0
18小时内掌握Spark,全面提升Spark技能!

伴随着大数据相关技术和产业的逐步成熟,继Hadoop之后,Spark技术以其无可比拟的优势,发展迅速,将成为替代Hadoop的下一代云计算、大数据核心技术。   Spark是基于内存,是云计算领域的继...

Spark亚太研究院
2014/06/12
79
0
Apache Spark调优(Tuning Spark)

由于Spark基于内存计算的特性,集群的任何资源都可以成为Spark程序的瓶颈:CPU,网络带宽,或者内存。通常,如果内存容得下数据,瓶颈会是网络带宽。不过有时你同样需要做些优化,例如将RDD以...

Spark
02/20
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Spring Boot + Mybatis-Plus 集成与使用(二)

前言: 本章节介绍MyBatis-Puls的CRUD使用。在开始之前,先简单讲解下上章节关于Spring Boot是如何自动配置MyBatis-Plus。 一、自动配置 当Spring Boot应用从主方法main()启动后,首先加载S...

伴学编程
昨天
7
0
用最通俗的方法讲spring [一] ──── AOP

@[TOC](用最通俗的方法讲spring [一] ──── AOP) 写这个系列的目的(可以跳过不看) 自己写这个系列的目的,是因为自己是个比较笨的人,我曾一度怀疑自己的智商不适合干编程这个行业.因为在我...

小贼贼子
昨天
7
0
Flutter系列之在 macOS 上安装和配置 Flutter 开发环境

本文为Flutter开发环境在macOS下安装全过程: 一、系统配置要求 想要安装并运行 Flutter,你的开发环境需要最低满足以下要求: 操作系统:macOS(64位) 磁盘空间:700 MB(不包含 IDE 或其余...

過愙
昨天
6
0
OSChina 周六乱弹 —— 早上儿子问我他是怎么来的

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @凉小生 :#今日歌曲推荐# 少点戾气,愿你和这个世界温柔以待。中岛美嘉的单曲《僕が死のうと思ったのは (曾经我也想过一了百了)》 《僕が死の...

小小编辑
昨天
2.5K
16
Excption与Error包结构,OOM 你遇到过哪些情况,SOF 你遇到过哪些情况

Throwable 是 Java 中所有错误与异常的超类,Throwable 包含两个子类,Error 与 Exception 。用于指示发生了异常情况。 Java 抛出的 Throwable 可以分成三种类型。 被检查异常(checked Exc...

Garphy
昨天
42
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部