spark 广播变量 之广播表(dataframe)

原创
2018/01/10 11:42
阅读数 3.6K

Broadcast variables(广播变量)允许程序员将一个 read-only(只读的)变量缓存到每台机器上,而不是给任务传递一个副本。它们是如何来使用呢,例如,广播变量可以用一种高效的方式给每个节点传递一份比较大的 input dataset(输入数据集)副本。在使用广播变量时,Spark 也尝试使用高效广播算法分发 broadcast variables(广播变量)以降低通信成本。

Spark 的 action(动作)操作是通过一系列的 stage(阶段)进行执行的,这些 stage(阶段)是通过分布式的 "shuffle" 操作进行拆分的。Spark 会自动广播出每个 stage(阶段)内任务所需要的公共数据。这种情况下广播的数据使用序列化的形式进行缓存,并在每个任务运行前进行反序列化。这也就意味着,只有在跨越多个 stage(阶段)的多个任务会使用相同的数据,或者在使用反序列化形式的数据特别重要的情况下,使用广播变量会有比较好的效果。(来自官网描述)

1、如果广播一个rdd的值的话,可以把rdd.collect 汇总到driver端,然后通过sc广播到excutor

2、但是如果广播一个表(datafram),上面的那种方式就不能这样做了

例如:我广播一个手机号码归属地的表,结构是(1353763,深圳市)

代码如下

  val df_operate = sqlContext.sparkContext
      .textFile(inpath_hdfs)
      .map(t => {
        val arr = t.split("\001")
        arr
      }).filter(t => {
      t.length == 7
    }).map(t => {
      (t(1), t(6))
    }).toDF("cell_seg", "cell_city")
    
//   进行广播
    val broadcast_df_standard = sc.broadcast(df_operate.as("t_base_phone_to_operate"))

 

sparkDS 的代码如下

contact_DStream.foreachRDD { rdd =>

      if (!rdd.isEmpty) {
        //        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)

        val sparkSession: SparkSession = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
        import sparkSession.implicits._

        //        val logger = Logger.getLogger(Contact.getClass.getName)
        val es_url = broadcast_es_uri1.value
        val es_port = broadcast_es_port.value
        val es_index = broadcast_es_index.value
        val es_type = broadcast_es_type.value

        //广播变量的值
        val url_config = broadcast_url_config.value
        val user_config = broadcast_user_config.value
        val passwd_config = broadcast_passwd_config.value
        val table_config_name = broadcast_table_config.value


        val index2kinesisStreamName = broadcast_index2kinesis.value


        val df_standard = broadcast_df_standard.value

        //外部数据归属地
//        df_standard.createGlobalTempView("t_base_phone_to_operate")
        df_standard.createOrReplaceTempView("t_base_phone_to_operate")
//      数据cache到内存里,防止大量的oom
        sparkSession.catalog.cacheTable("t_base_phone_to_operate")

       
//      cache后的数据记得要释放,不然太占内存了       
        sparkSession.catalog.clearCache()

      }
    }
    ssc.start()
    ssc.awaitTermination()
  }

 

 

展开阅读全文
打赏
1
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
1
分享
返回顶部
顶部