文档章节

广播变量和累加器是多少首都师大

新大陆2号
 新大陆2号
发布于 2017/03/22 16:15
字数 1195
阅读 3
收藏 0

一、广播变量和累加器

1.1 广播变量:

广播变量允许程序员将一个只读的变量缓存在每台机器上,而不用在任务之间传递变量。广播变量可被用于有效地给每个节点一个大输入数据集的副本。Spark还尝试使用高效地广播算法来分发变量,进而减少通信的开销。 Spark的动作通过一系列的步骤执行,这些步骤由分布式的洗牌操作分开。Spark自动地广播每个步骤每个任务需要的通用数据。这些广播数据被序列化地缓存,在运行任务之前被反序列化出来。这意味着当我们需要在多个阶段的任务之间使用相同的数据,或者以反序列化形式缓存数据是十分重要的时候,显式地创建广播变量才有用。

1.2 累加器:

累加器是仅仅被相关操作累加的变量,因此可以在并行中被有效地支持。它可以被用来实现计数器和总和。Spark原生地只支持数字类型的累加器,编程者可以添加新类型的支持。如果创建累加器时指定了名字,可以在Spark的UI界面看到。这有利于理解每个执行阶段的进程。(对于python还不支持) 累加器通过对一个初始化了的变量v调用SparkContext.accumulator(v)来创建。在集群上运行的任务可以通过add或者”+=”方法在累加器上进行累加操作。但是,它们不能读取它的值。只有驱动程序能够读取它的值,通过累加器的value方法。

二.Java和Scala版本的实战演示

2.1 Java版本:

package com.Streaming;

import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaStreamingContext;


import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import scala.Tuple2;

import java.util.*;

/**
 * 利用广播进行黑名单过滤!
 *
 * 无论是计数器还是广播!都不是想象的那么简单!
 * 联合使用非常强大!!!绝对是高端应用!
 *
 * 如果 联合使用扩展的话,该怎么做!!!
 *
 * ?
 */
public class BroadcastAccumulator {

	/**
	 * 肯定要创建一个广播List
	 *
	 * 在上下文中实例化!
     */
	private static volatile Broadcast<List<String>> broadcastList = null;

	/**
	 * 计数器!
	 * 在上下文中实例化!
	 */
	private static volatile Accumulator<Integer> accumulator = null;

	public static void main(String[] args) {

		SparkConf conf = new SparkConf().setMaster("local[2]").
				setAppName("WordCountOnlieBroadcast");

		JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));


		/**
		 * 没有action的话,广播并不会发出去!
		 *
		 * 使用broadcast广播黑名单到每个Executor中!
		 */
		broadcastList = jsc.sc().broadcast(Arrays.asList("Hadoop","Mahout","Hive"));

		/**
		 * 全局计数器!用于统计在线过滤了多少个黑名单!
		 */
		accumulator = jsc.sparkContext().accumulator(0,"OnlineBlackListCounter");


		JavaReceiverInputDStream<String> lines = jsc.socketTextStream("Master", 9999);


		/**
		 * 这里省去flatmap因为名单是一个个的!
		 */
		JavaPairDStream<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
			@Override
			public Tuple2<String, Integer> call(String word) {
				return new Tuple2<String, Integer>(word, 1);
			}
		});

		JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
			@Override
			public Integer call(Integer v1, Integer v2) {
				return v1 + v2;
			}
		});

		/**
		 * Funtion里面 前几个参数是 入参。
		 * 后面的出参。
		 * 体现在call方法里面!
		 *
		 * 这里直接基于RDD进行操作了!
		 */
		wordsCount.foreach(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
			@Override
			public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws Exception {
				rdd.filter(new Function<Tuple2<String, Integer>, Boolean>() {
					@Override
					public Boolean call(Tuple2<String, Integer> wordPair) throws Exception {
						if (broadcastList.value().contains(wordPair._1)) {

							/**
							 * accumulator不应该仅仅用来计数。
							 * 可以同时写进数据库或者redis中!
							 */
							accumulator.add(wordPair._2);
							return false;
						}else {
							return true;
						}
					};
					/**
					 * 这里真的希望 广播和计数器执行的话。要进行一个action操作!
					 */
				}).collect();

				System.out.println("广播器里面的值"+broadcastList.value());
				System.out.println("计时器里面的值"+accumulator.value());
				return null;
			}
		});


		jsc.start();
		jsc.awaitTermination();
		jsc.close();

	}

	}



2.2 Scala版本

package com.Streaming

import java.util

import org.apache.spark.streaming.{Duration, StreamingContext}
import org.apache.spark.{Accumulable, Accumulator, SparkContext, SparkConf}
import org.apache.spark.broadcast.Broadcast

/**
  * Created by lxh on 2016/6/30.
  */
object BroadcastAccumulatorStreaming {

  /**
    * 声明一个广播和累加器!
    */
  private var broadcastList:Broadcast[List[String]]  = _
  private var accumulator:Accumulator[Int] = _

  def main(args: Array[String]) {

    val sparkConf = new SparkConf().setMaster("local[4]").setAppName("broadcasttest")
    val sc = new SparkContext(sparkConf)

    /**
      * duration是ms
      */
    val ssc = new StreamingContext(sc,Duration(2000))
   // broadcastList = ssc.sparkContext.broadcast(util.Arrays.asList("Hadoop","Spark"))
    broadcastList = ssc.sparkContext.broadcast(List("Hadoop","Spark"))
    accumulator= ssc.sparkContext.accumulator(0,"broadcasttest")

    /**
      * 获取数据!
      */
    val lines = ssc.socketTextStream("localhost",9999)

    /**
      * 拿到数据后 怎么处理!
      *
      * 1.flatmap把行分割成词。
      * 2.map把词变成tuple(word,1)
      * 3.reducebykey累加value
      * (4.sortBykey排名)
      * 4.进行过滤。 value是否在累加器中。
      * 5.打印显示。
      */
    val words = lines.flatMap(line => line.split(" "))

    val wordpair = words.map(word => (word,1))

    wordpair.filter(record => {broadcastList.value.contains(record._1)})


    val pair = wordpair.reduceByKey(_+_)

    /**
      *这步为什么要先foreachRDD?
      *
      * 因为这个pair 是PairDStream<String, Integer>
      *
      *   进行foreachRDD是为了?
      *
      */
/*    pair.foreachRDD(rdd => {
      rdd.filter(record => {

        if (broadcastList.value.contains(record._1)) {
          accumulator.add(1)
          return true
        } else {
          return false
        }

      })

    })*/

    val filtedpair = pair.filter(record => {
        if (broadcastList.value.contains(record._1)) {
          accumulator.add(record._2)
          true
        } else {
          false
        }

     }).print

    println("累加器的值"+accumulator.value)

   // pair.filter(record => {broadcastList.value.contains(record._1)})

   /* val keypair = pair.map(pair => (pair._2,pair._1))*/

    /**
      * 如果DStream自己没有某个算子操作。就通过转化transform!
      */
   /* keypair.transform(rdd => {
      rdd.sortByKey(false)//TODO
    })*/
    pair.print()
    ssc.start()
    ssc.awaitTermination()

  }

}

© 著作权归作者所有

下一篇: 桶表
新大陆2号
粉丝 0
博文 8
码字总数 2458
作品 0
东城
私信 提问
Spark共享变量(广播变量、累加器)

1.Spark两种共享变量:广播变量(broadcast variable)与累加器(accumulator) 累加器用来对信息进行聚合,而广播变量用来高效分发较大的对象。共享变量出现的原因: 通常在向 Spark 传递函...

张泽立
04/10
22
0
Spark2.1 共享变量(Broadcast Variables&Accumulators)分析。

版权声明:本文为博主原创文章,转载请标明出处:http://blog.csdn.net/leafagem https://blog.csdn.net/LeafageM/article/details/76381085 在spark中,当我们将一个function传递给算子去执...

繁城落叶
2017/07/30
0
0
Spark学习实例(Python):共享变量Shared Variables

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 https://blog.csdn.net/a544258023/article/details/97250230 通常在使用Spark算子函数,比如...

雷禄辉
08/20
0
0
Spark编程指南《Spark 官方文档》

《Spark 官方文档》Spark编程指南 spark-1.6.0 [原文地址] Spark编程指南 概述 总体上来说,每个Spark应用都包含一个驱动器(driver)程序,驱动器运行用户的main函数,并在集群上执行各种并...

openthings
2016/05/29
290
0
Apache Spark 官方文档 翻译 - 编程指南

最近用Apache Spark 处理一些大数据,学了spark官方英文文档,顺便翻译了方便学习。 spark 版本 2.2.0. 翻译官方文档原地址: http://spark.apache.org/docs/latest/rdd-programming-guide....

___k先生
2017/12/07
0
0

没有更多内容

加载失败,请刷新页面

加载更多

3_数组

3_数组

行者终成事
今天
7
0
经典系统设计面试题解析:如何设计TinyURL(二)

原文链接:https://www.educative.io/courses/grokking-the-system-design-interview/m2ygV4E81AR 编者注:本文以一道经典的系统设计面试题:《如何设计TinyURL》的参考答案和解析为例,帮助...

APEMESH
今天
7
0
使用logstash同步MySQL数据到ES

概述   在生成业务常有将MySQL数据同步到ES的需求,如果需要很高的定制化,往往需要开发同步程序用于处理数据。但没有特殊业务需求,官方提供的logstash就很有优势了。   在使用logstas...

zxiaofan666
今天
10
0
X-MSG-IM-分布式信令跟踪能力

经过一周多的鏖战, X-MSG-IM的分布式信令跟踪能力已基本具备, 特点是: 实时. 只有要RX/TX就会实时产生信令跟踪事件, 先入kafka, 再入influxdb待查. 同时提供实时sub/pub接口. 完备. 可以完整...

dev5
今天
7
0
OpenJDK之CyclicBarrier

OpenJDK8,本人看的是openJDK。以前就看过,只是经常忘记,所以记录下 图1 CyclicBarrier是Doug Lea在JDK1.5中引入的,作用就不详细描述了,主要有如下俩个方法使用: await()方法,如果当前线...

克虏伯
今天
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部