spark join和union+reducebykey 对比

原创
2019/08/20 11:19
阅读数 621

spark join和union+reducebykey 对比

对于某些聚合需求可以用union+reducebykey的方式替换join,join将会做两个表的连接操作比较消耗新能,union+reducebykey,可以在map的端进行预聚合,然后再reduce阶段再次聚合,涉及到的shuffle较少

具体举例,需求统计每个文件夹昨天和今天的文件数相加总和

hdfsdir filecount day
文件夹 文件数 日期

join实现如下

package cn.x2q

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession

/**
  * @Auther: huowang
  * @Date: 14:15 2019/08/16
  * @DES: spark
  *       测试程序 测试  spark 的 join 和 union + reducebykey  效率
  *
  *
  * @Modified By:
  */
object TestJoin extends Logging {


  def main(args: Array[String]): Unit = {

    log.info("==============>start TestJoin  ====> ")
    val sparkConf = new SparkConf().setAppName("TestJoin").setMaster("local")
    //val sparkConf = new SparkConf().setAppName("TestJoin")
    val session = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()
    /////////////////// 测试  //////////////////////////////////
    val df1 = session.sql(
      s"""
         |
         |SELECT
         |	t1.hdfsdir,
         |  (t1.filecount+t2.filecount) filecount
         |FROM
         |	ops.ops_fsimage_base t1
         |  full outer join ops.ops_fsimage_base t2
         |  on t1.hdfsdir=t2.hdfsdir and t2.day=20190814
         |where t1.day=20190815
         |
      """.stripMargin
    ).rdd
      .saveAsTextFile("/user/huowang/join")

    session.stop()

  }

}

DAG

union 实现

package cn.x2q

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession

/**
  * @Auther: huowang
  * @Date: 14:15 2019/08/16
  * @DES: spark
  *       测试程序 测试  spark 的 join 和 union + reducebykey  效率
  *
  *
  * @Modified By:
  */
object TestUnion extends Logging {


  def main(args: Array[String]): Unit = {

    log.info("==============>start TestUnion  ====> ")
    val sparkConf = new SparkConf().setAppName("TestUnion").setMaster("local")
   // val sparkConf = new SparkConf().setAppName("TestJoin")
    val session = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()
    /////////////////// 测试  //////////////////////////////////
    val rdd = session.sql(
      s"""
         |
         |SELECT
         |	t1.hdfsdir hdfsdir,
         |  t1.filecount filecount
         |FROM
         |	ops.ops_fsimage_base t1
         |where t1.day=20190815
         |union all
         |SELECT
         |	t2.hdfsdir hdfsdir,
         |  t2.filecount filecount
         |FROM
         |	ops.ops_fsimage_base t2
         |where t2.day=20190814
      """.stripMargin
    ).rdd.map(x=>{
      (x.getString(0),x.getLong(1))
    }).reduceByKey(_+_)
      .saveAsTextFile("/user/huowang/union")
    session.stop()

  }

}

DAG

总结

可以看出 union的流程更简单,task的数量也更少

从执行的时间也可以看出 union耗时更少(由于数据量较少时间差不明显 但是可以看出union的耗时是更少的)

而且如果在不自定义输出文件的合并的情况下  union的输出结果更为集中 但是join的输出结果文件更为碎片化

 

展开阅读全文
加载中

作者的其它热门文章

打赏
0
0 收藏
分享
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部
返回顶部
顶部