Spark Streaming + Kafka 基于广播变量优化

原创
2017/06/11 13:23
阅读数 4K

一、问题描述

    主要是讨论的问题是Spark应用程序中的对象序列化和生命周期管理。经常会遇到java.io.NotSerializableException异常,这主要是因为某部分应用程序代码在Spark driver运行,而其他部分在Spark executors。下面进行简单的分析,在Spark driver上创建2个job,并在集群上的executor上执行。通常情况下,job的数量取决于应用程序的业务逻辑,而task的数量取决于数据分区。

代码片段

dstream.foreachRDD { rdd =>   //outer loop
  val where1 = "on the driver"
    rdd.foreach { record =>   //inner loop
      val where2 = "on different executors"
    }
  }
}

    外部循环的rdd是运行在本地的driver中,RDD(弹性分布式数据集)是一种将数据透明分布到集群节点的结构。driver是唯一可以访问rdd的地方。

    但是内部循环是以分布式方式运行的。RDD将被分区,内部循环遍历Spark executor上的RDD元素的子集。

    Spark使用Java(或Kyro)序列化将应用程序对象从driver发送到executors。首先,应用程序类必须用scala.Serializable或java.io.Serializable进行标记,以避免奇怪的异常。但这至少有两个缺点:

        a. 复杂的对象被序列化并发送到远程的集群节点时,可能会出现性能损失。但可以使用Spark的广播变量来优化它。

        b. 不是一切都是可序列化的,例如:TCP套接字不能被序列化并在节点之间发送。

二、整合Spark Streaming和Kafka producer

    在介绍之后,我们将在应用程序中讨论和解决该问题。该应用程序是部署在YARN群集上的长时间运行的Spark Streaming作业。该job从Kafka中接收数据,校验数据,将其转换成Avro二进制格式,并将其发送到另一个Kafka的topic。

代码片段

dstream.foreachRDD { rdd =>
  val producer = createKafkaProducer()
  rdd.foreach { message =>
    producer.send(message)
  }
  producer.close()
}

    producer是在driver上创建,但消息被发送到executor。这个producer与Kafka的brokers保持套接字连接,以至于它不能被序列化和在网络中发送。因此,该代码片段会抛java.io.NotSerializableException异常。

代码片段

dstream.foreachRDD { rdd =>
  rdd.foreach { message =>
    val producer = createKafkaProducer()
    producer.send(message)
    producer.close()
  }
}

    producer是在executor上创建并关闭,所以它不需要序列化。上述代码片段,每个消息都会创建和关闭producer。与集群建立连接是需要时间的,由于Kafka procuder需要在所有的分区中查找leaders,所以它比打开一个普通的套接字连接更耗时。

代码片段

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val producer = createKafkaProducer()
    partitionOfRecords.foreach { message =>
      producer.send(message)
    }
    producer.close()
  }
}

    采用foreachPartition来改进。每个partition的每条记录都是通过Spark task在单个executor上操作。您可以安全共享Kafka procuder(线程安全的)实例。但是在(20k消息/秒,64个分区,2秒批处理)的规模下,Kafka procuder每2秒创建64次,平均只发送625条消息。

    Kafka的procuder是一个线程安全的对象,所以它很容易被同一个JVM的多个Spark task共享。此外,Kafka procuder是异步的,并在发送之前缓冲数据。如果采用连接池的方式,需要考虑将procuder返回连接池中,数据是否仍在处理。

三、解决问题

    基于Scala的关键字lazy延迟加载和Spark广播机制来解决该问题。当Kafka的procuder完全被driver管理时,要确保只有一次使用Spark广播机制发送KafkaSink对象。

val kafkaSink = sparkContext.broadcast(KafkaSink(conf))

dstream.foreachRDD { rdd =>
  rdd.foreach { message =>
    kafkaSink.value.send(message)
  }
}

    KafkaSink类是Kafka procuder的简单封装。我们只需要发送一个如何在执行器中创建Kafka procuder的"方式",而不是procuder它本身。该类是可序列化的,而Kafka procuder在executor首次使用之前初始化。KafkaSink类的构造函数使用一个函数,该函数在调用时会返回Kafka procuder。一旦创建了Kafka procuder,它就被分配到一个procuder的变量中,避免在每一个send( )调用时初始化。

class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {

  lazy val producer = createProducer()

  def send(topic: String, value: String): Unit = producer.send(new ProducerRecord(topic, value))
}

object KafkaSink {
  def apply(config: Map[String, Object]): KafkaSink = {
    val f = () => {
      new KafkaProducer[String, String](config)
    }
    new KafkaSink(f)
  }
}

    在生产部署之前,需要对KafkaSink进行一些改进。在executor的JVM关闭之前,我们必须关闭Kafka procuder。缺少这一步,Kafka procuder内部缓冲的所有消息都将丢失。

object KafkaSink {
  def apply(config: Map[String, Object]): KafkaSink = {
    val f = () => {
      val producer = new KafkaProducer[String, String](config)

      sys.addShutdownHook {
        producer.close()
      }

      producer
    }
    new KafkaSink(f)
  }
}

    函数f在executor上运行时,我们可以注册关闭的钩子来关闭Kafka procuder。关闭的钩子在executor的JVM关闭之前执行,Kafka procuder将刷新所有缓冲的消息。

四、总结

    虽然这篇文章涉及到Kafka,但这种方法可以很容易地适应其他情况,如需要创建有限数量复杂的,不可序列化的对象的实例。

 

 

 

展开阅读全文
打赏
1
2 收藏
分享
加载中
更多评论
打赏
0 评论
2 收藏
1
分享
返回顶部
顶部