文档章节

Spark Streaming + Kafka 基于广播变量优化

k
 kunping
发布于 2017/06/11 13:23
字数 1240
阅读 534
收藏 2

一、问题描述

    主要是讨论的问题是Spark应用程序中的对象序列化和生命周期管理。经常会遇到java.io.NotSerializableException异常,这主要是因为某部分应用程序代码在Spark driver运行,而其他部分在Spark executors。下面进行简单的分析,在Spark driver上创建2个job,并在集群上的executor上执行。通常情况下,job的数量取决于应用程序的业务逻辑,而task的数量取决于数据分区。

代码片段

dstream.foreachRDD { rdd =>   //outer loop
  val where1 = "on the driver"
    rdd.foreach { record =>   //inner loop
      val where2 = "on different executors"
    }
  }
}

    外部循环的rdd是运行在本地的driver中,RDD(弹性分布式数据集)是一种将数据透明分布到集群节点的结构。driver是唯一可以访问rdd的地方。

    但是内部循环是以分布式方式运行的。RDD将被分区,内部循环遍历Spark executor上的RDD元素的子集。

    Spark使用Java(或Kyro)序列化将应用程序对象从driver发送到executors。首先,应用程序类必须用scala.Serializable或java.io.Serializable进行标记,以避免奇怪的异常。但这至少有两个缺点:

        a. 复杂的对象被序列化并发送到远程的集群节点时,可能会出现性能损失。但可以使用Spark的广播变量来优化它。

        b. 不是一切都是可序列化的,例如:TCP套接字不能被序列化并在节点之间发送。

二、整合Spark Streaming和Kafka producer

    在介绍之后,我们将在应用程序中讨论和解决该问题。该应用程序是部署在YARN群集上的长时间运行的Spark Streaming作业。该job从Kafka中接收数据,校验数据,将其转换成Avro二进制格式,并将其发送到另一个Kafka的topic。

代码片段

dstream.foreachRDD { rdd =>
  val producer = createKafkaProducer()
  rdd.foreach { message =>
    producer.send(message)
  }
  producer.close()
}

    producer是在driver上创建,但消息被发送到executor。这个producer与Kafka的brokers保持套接字连接,以至于它不能被序列化和在网络中发送。因此,该代码片段会抛java.io.NotSerializableException异常。

代码片段

dstream.foreachRDD { rdd =>
  rdd.foreach { message =>
    val producer = createKafkaProducer()
    producer.send(message)
    producer.close()
  }
}

    producer是在executor上创建并关闭,所以它不需要序列化。上述代码片段,每个消息都会创建和关闭producer。与集群建立连接是需要时间的,由于Kafka procuder需要在所有的分区中查找leaders,所以它比打开一个普通的套接字连接更耗时。

代码片段

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val producer = createKafkaProducer()
    partitionOfRecords.foreach { message =>
      producer.send(message)
    }
    producer.close()
  }
}

    采用foreachPartition来改进。每个partition的每条记录都是通过Spark task在单个executor上操作。您可以安全共享Kafka procuder(线程安全的)实例。但是在(20k消息/秒,64个分区,2秒批处理)的规模下,Kafka procuder每2秒创建64次,平均只发送625条消息。

    Kafka的procuder是一个线程安全的对象,所以它很容易被同一个JVM的多个Spark task共享。此外,Kafka procuder是异步的,并在发送之前缓冲数据。如果采用连接池的方式,需要考虑将procuder返回连接池中,数据是否仍在处理。

三、解决问题

    基于Scala的关键字lazy延迟加载和Spark广播机制来解决该问题。当Kafka的procuder完全被driver管理时,要确保只有一次使用Spark广播机制发送KafkaSink对象。

val kafkaSink = sparkContext.broadcast(KafkaSink(conf))

dstream.foreachRDD { rdd =>
  rdd.foreach { message =>
    kafkaSink.value.send(message)
  }
}

    KafkaSink类是Kafka procuder的简单封装。我们只需要发送一个如何在执行器中创建Kafka procuder的"方式",而不是procuder它本身。该类是可序列化的,而Kafka procuder在executor首次使用之前初始化。KafkaSink类的构造函数使用一个函数,该函数在调用时会返回Kafka procuder。一旦创建了Kafka procuder,它就被分配到一个procuder的变量中,避免在每一个send( )调用时初始化。

class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {

  lazy val producer = createProducer()

  def send(topic: String, value: String): Unit = producer.send(new ProducerRecord(topic, value))
}

object KafkaSink {
  def apply(config: Map[String, Object]): KafkaSink = {
    val f = () => {
      new KafkaProducer[String, String](config)
    }
    new KafkaSink(f)
  }
}

    在生产部署之前,需要对KafkaSink进行一些改进。在executor的JVM关闭之前,我们必须关闭Kafka procuder。缺少这一步,Kafka procuder内部缓冲的所有消息都将丢失。

object KafkaSink {
  def apply(config: Map[String, Object]): KafkaSink = {
    val f = () => {
      val producer = new KafkaProducer[String, String](config)

      sys.addShutdownHook {
        producer.close()
      }

      producer
    }
    new KafkaSink(f)
  }
}

    函数f在executor上运行时,我们可以注册关闭的钩子来关闭Kafka procuder。关闭的钩子在executor的JVM关闭之前执行,Kafka procuder将刷新所有缓冲的消息。

四、总结

    虽然这篇文章涉及到Kafka,但这种方法可以很容易地适应其他情况,如需要创建有限数量复杂的,不可序列化的对象的实例。

 

 

 

© 著作权归作者所有

k
粉丝 3
博文 61
码字总数 29867
作品 0
深圳
私信 提问
Spark Streaming应该如何消费Kafka?

前言 在项目中,需要对每天千万级的游戏评论信息进行词频统计,在生产者一端,我们将数据按照每天的拉取时间存入了Kafka当中,而在消费者一端,我们利用了spark streaming从kafka中不断拉取数...

王知无
07/20
0
0
Spark实战(一)SparkStreaming集成Kafka

Spark Streaming + Kafka集成指南 Kafka项目在版本0.8和0.10之间引入了一个新的消费者API,因此有两个独立的相应Spark Streaming包可用。请选择正确的包, 请注意,0.8集成与后来的0.9和0.1...

FrankDeng
2018/07/15
0
0
Spark调优 | Spark Streaming 调优实践

京东HBase招聘,坐标京东 , 北辰职场 HBase 资深大数据工程师(HBase/Hadoop/Phoenix) 主要职责: 1. 构建分布式大数据服务平台,参与构建公司海量数据存储、实时查询系统; 2. 负责HBase...

HBase技术社区
08/12
0
0
基于Spark SQL实现对HDFS操作的实时监控报警

1.前言 E-MapReduce计划从EMR-3.18.1版本开始提供Spark Streaming SQL的预览版功能。Spark Streaming SQL是在Spark Structured Streaming的基础上做了进一步封装,方便用户使用SQL语言进行S...

鱼跟猫
04/12
0
0
Spark实战 | Kafka与Spark Streaming的联姻

Kafka与Spark虽然没有很直接的必然关系,但是实际应用中却经常以couple的形式存在。如果你的Kafka的爱好者,流式计算框架Spark、Flink等也不得不去了解;如果你是Spark的爱好者,Kafka又或许...

HBase技术社区
02/28
0
0

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周日乱弹 —— 我,小小编辑,食人族酋长

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @宇辰OSC :分享娃娃的单曲《飘洋过海来看你》: #今日歌曲推荐# 《飘洋过海来看你》- 娃娃 手机党少年们想听歌,请使劲儿戳(这里) @宇辰OSC...

小小编辑
今天
161
8
spring cloud

一、从面试题入手 1.1、什么事微服务 1.2、微服务之间如何独立通讯的 1.3、springCloud和Dubbo有哪些区别 1.通信机制:DUbbo基于RPC远程过程调用;微服务cloud基于http restFUL API 1.4、spr...

榴莲黑芝麻糊
今天
2
0
Executor线程池原理与源码解读

线程池为线程生命周期的开销和资源不足问题提供了解决方 案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。 线程实现方式 Thread、Runnable、Callable //实现Runnable接口的...

小强的进阶之路
昨天
6
0
maven 环境隔离

解决问题 即 在 resource 文件夹下面 ,新增对应的资源配置文件夹,对应 开发,测试,生产的不同的配置内容 <resources> <resource> <directory>src/main/resources.${deplo......

之渊
昨天
8
0
详解箭头函数和普通函数的区别以及箭头函数的注意事项、不适用场景

箭头函数是ES6的API,相信很多人都知道,因为其语法上相对于普通函数更简洁,深受大家的喜爱。就是这种我们日常开发中一直在使用的API,大部分同学却对它的了解程度还是不够深... 普通函数和...

OBKoro1
昨天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部