文档章节

Spark Streaming + Kafka Integration Guide

刺猬一号
 刺猬一号
发布于 2018/07/18 20:48
字数 1535
阅读 74
收藏 4

The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 Direct Stream approach. It provides simple parallelism, 1:1 correspondence between Kafka partitions and Spark partitions, and access to offsets and metadata. However, because the newer integration uses the new Kafka consumer API instead of the simple API, there are notable differences in usage. This version of the integration is marked as experimental, so the API is potentially subject to change.

Linking

For Scala/Java applications using SBT/Maven project definitions, link your streaming application with the following artifact (see Linking section in the main programming guide for further information).

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.11
version = 2.3.1

Do not manually add dependencies on org.apache.kafka artifacts (e.g. kafka-clients). The spark-streaming-kafka-0-10 artifact has the appropriate transitive dependencies already, and different versions may be incompatible in hard to diagnose ways.

Creating a Direct Stream

Note that the namespace for the import includes the version, org.apache.spark.streaming.kafka010

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))

Each item in the stream is a ConsumerRecord

For possible kafkaParams, see Kafka consumer config docs. If your Spark batch duration is larger than the default Kafka heartbeat session timeout (30 seconds), increase heartbeat.interval.ms and session.timeout.ms appropriately. For batches larger than 5 minutes, this will require changing group.max.session.timeout.ms on the broker. Note that the example sets enable.auto.commit to false, for discussion see Storing Offsets below.

LocationStrategies

The new Kafka consumer API will pre-fetch messages into buffers. Therefore it is important for performance reasons that the Spark integration keep cached consumers on executors (rather than recreating them for each batch), and prefer to schedule partitions on the host locations that have the appropriate consumers.

In most cases, you should use LocationStrategies.PreferConsistent as shown above. This will distribute partitions evenly across available executors. If your executors are on the same hosts as your Kafka brokers, use PreferBrokers, which will prefer to schedule partitions on the Kafka leader for that partition. Finally, if you have a significant skew in load among partitions, use PreferFixed. This allows you to specify an explicit mapping of partitions to hosts (any unspecified partitions will use a consistent location).

The cache for consumers has a default maximum size of 64. If you expect to be handling more than (64 * number of executors) Kafka partitions, you can change this setting via spark.streaming.kafka.consumer.cache.maxCapacity.

If you would like to disable the caching for Kafka consumers, you can set spark.streaming.kafka.consumer.cache.enabled to false. Disabling the cache may be needed to workaround the problem described in SPARK-19185. This property may be removed in later versions of Spark, once SPARK-19185 is resolved.

The cache is keyed by topicpartition and group.id, so use a separate group.id for each call to createDirectStream.

ConsumerStrategies

The new Kafka consumer API has a number of different ways to specify topics, some of which require considerable post-object-instantiation setup.ConsumerStrategies provides an abstraction that allows Spark to obtain properly configured consumers even after restart from checkpoint.

ConsumerStrategies.Subscribe, as shown above, allows you to subscribe to a fixed collection of topics. SubscribePattern allows you to use a regex to specify topics of interest. Note that unlike the 0.8 integration, using Subscribe or SubscribePattern should respond to adding partitions during a running stream. Finally, Assign allows you to specify a fixed collection of partitions. All three strategies have overloaded constructors that allow you to specify the starting offset for a particular partition.

If you have specific consumer setup needs that are not met by the options above, ConsumerStrategy is a public class that you can extend.

Creating an RDD

If you have a use case that is better suited to batch processing, you can create an RDD for a defined range of offsets.

// Import dependencies and create kafka params as in Create Direct Stream above

val offsetRanges = Array(
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange("test", 0, 0, 100),
  OffsetRange("test", 1, 0, 100)
)

val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)

Note that you cannot use PreferBrokers, because without the stream there is not a driver-side consumer to automatically look up broker metadata for you. Use PreferFixed with your own metadata lookups if necessary.

Obtaining Offsets

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}

Note that the typecast to HasOffsetRanges will only succeed if it is done in the first method called on the result of createDirectStream, not later down a chain of methods. Be aware that the one-to-one mapping between RDD partition and Kafka partition does not remain after any methods that shuffle or repartition, e.g. reduceByKey() or window().

Storing Offsets

Kafka delivery semantics in the case of failure depend on how and when offsets are stored. Spark output operations are at-least-once. So if you want the equivalent of exactly-once semantics, you must either store offsets after an idempotent output, or store offsets in an atomic transaction alongside output. With this integration, you have 3 options, in order of increasing reliability (and code complexity), for how to store offsets.

Checkpoints

If you enable Spark checkpointing, offsets will be stored in the checkpoint. This is easy to enable, but there are drawbacks. Your output operation must be idempotent, since you will get repeated outputs; transactions are not an option. Furthermore, you cannot recover from a checkpoint if your application code has changed. For planned upgrades, you can mitigate this by running the new code at the same time as the old code (since outputs need to be idempotent anyway, they should not clash). But for unplanned failures that require code changes, you will lose data unless you have another way to identify known good starting offsets.

Kafka itself

Kafka has an offset commit API that stores offsets in a special Kafka topic. By default, the new consumer will periodically auto-commit offsets. This is almost certainly not what you want, because messages successfully polled by the consumer may not yet have resulted in a Spark output operation, resulting in undefined semantics. This is why the stream example above sets “enable.auto.commit” to false. However, you can commit offsets to Kafka after you know your output has been stored, using the commitAsync API. The benefit as compared to checkpoints is that Kafka is a durable store regardless of changes to your application code. However, Kafka is not transactional, so your outputs must still be idempotent.

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

As with HasOffsetRanges, the cast to CanCommitOffsets will only succeed if called on the result of createDirectStream, not after transformations. The commitAsync call is threadsafe, but must occur after outputs if you want meaningful semantics.

Your own data store

For data stores that support transactions, saving offsets in the same transaction as the results can keep the two in sync, even in failure situations. If you’re careful about detecting repeated or skipped offset ranges, rolling back the transaction prevents duplicated or lost messages from affecting results. This gives the equivalent of exactly-once semantics. It is also possible to use this tactic even for outputs that result from aggregations, which are typically hard to make idempotent.

// The details depend on your data store, but the general idea looks like this

// begin from the the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
  new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  val results = yourCalculation(rdd)

  // begin your transaction

  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly

  // end your transaction
}

SSL / TLS

The new Kafka consumer supports SSL. To enable it, set kafkaParams appropriately before passing to createDirectStream / createRDD. Note that this only applies to communication between Spark and Kafka brokers; you are still responsible for separately securing Spark inter-node communication.

val kafkaParams = Map[String, Object](
  // the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
  "security.protocol" -> "SSL",
  "ssl.truststore.location" -> "/some-directory/kafka.client.truststore.jks",
  "ssl.truststore.password" -> "test1234",
  "ssl.keystore.location" -> "/some-directory/kafka.client.keystore.jks",
  "ssl.keystore.password" -> "test1234",
  "ssl.key.password" -> "test1234"
)

Deploying

As with any Spark applications, spark-submit is used to launch your application.

For Scala and Java applications, if you are using SBT or Maven for project management, then package spark-streaming-kafka-0-10_2.11 and its dependencies into the application JAR. Make sure spark-core_2.11 and spark-streaming_2.11 are marked as provided dependencies as those are already present in a Spark installation. Then use spark-submit to launch your application (see Deploying section in the main programming guide).

© 著作权归作者所有

共有 人打赏支持
刺猬一号
粉丝 11
博文 373
码字总数 616361
作品 0
深圳
私信 提问
Spark Streaming 中使用kafka低级api+zookeeper 保存 offset 并重用 以及 相关代码整合

在 Spark Streaming 中消费 Kafka 数据的时候,有两种方式分别是 1)基于 Receiver-based 的 createStream 方法和 2)Direct Approach (No Receivers) 方式的 createDirectStream 方法,详细...

cql252283126
2018/04/13
0
0
Spark 学习资源收集【Updating】

(一)spark 相关安装部署、开发环境 1、Spark 伪分布式 & 全分布式 安装指南 http://my.oschina.net/leejun2005/blog/394928 2、Apache Spark探秘:三种分布式部署方式比较 http://dongxic...

大数据之路
2014/09/08
0
1
整合Kafka到Spark Streaming——代码示例和挑战

作者Michael G. Noll是瑞士的一位工程师和研究员,效力于Verisign,是Verisign实验室的大规模数据分析基础设施(基础Hadoop)的技术主管。本文,Michael详细的演示了如何将Kafka整合到Spark...

stark_summer
2015/03/03
0
0
Spring XD 1.1 RC1 发布

Spring XD 1.1 RC1 发布,此版本现已提供下载,包括一些新特性和 bug 修复,主要是 Stream 处理。Spring XD 1.1. M2 中处理模块引入了对 Reactor 的 Stream API 的支持。在 1.1 RC1 支持使用...

oschina
2015/01/29
1K
4
Apache Spark 2.4.0 正式发布

Apache Spark 2.4 与昨天正式发布,Apache Spark 2.4 版本是 2.x 系列的第五个版本。 如果想及时了解 Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共帐号: itebloghadoop Apache Spa...

Spark
2018/11/09
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Temp-Memo

Recommended Ref : SQL High CPU troubleshooting checklist -- Top 50 SQL highly consuming cpuSELECT TOP 50[Avg. MultiCore/CPU time(sec)] = qs.total_worker_time / 1000000 / qs......

Goopand
18分钟前
1
0
dotConnect for Oracle入门指南(七):存储过程

【下载dotConnect for Oracle最新版本】 dotConnect for Oracle(原名OraDirect.NET)建立在ADO.NET技术上,为基于Oracle数据库的应用程序提供完整的解决方案。它为设计应用程序结构带来了新的...

电池盒
20分钟前
2
0
如何使用阿里云ARMS轻松重现用户浏览器问题

客户投诉不断,本地却无法重现? 页面加载较慢是用户经常会反馈的问题,也是前端非常关注的问题之一。但定位、排查解决这类问题就通常会花费非常多的时间,主要原因如下: 页面是在用户端的浏...

阿里云官方博客
24分钟前
1
0
因资源用尽导致服务宕机

1. 事故的发生 服务调用场景和发生的事件如下图所示,红色表示服务不可用. 服务A和服务B都是内部服务,服务C_*为不同运营商提供的服务,遵循一样的协议。 某一天,突然发现所有服务A调用服务...

北风刮的不认真了
28分钟前
3
0
锤子科技"临死前"被"接盘" ,内部人士爆料已改签今日头条母公司

就在昨天,据据锤子科技内部人士透露,部分锤子科技员工在昨天已经接到了相关的临时通知,要求改签劳动合同至今日头条的母公司——字节跳动。至于这是锤子科技真正再度复活还是借尸还魂都不重...

终端研发部
38分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部