文档章节

kafka + spark streaming 确保不丢失不重复消费的offset管理方法

d
 dreamness
发布于 07/04 10:29
字数 310
阅读 36
收藏 0

在使用Spark Streaming消费kafka中的数据时, 官方推荐将offset信息存储在kafka中

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

但是kafka不支持事务,这样可能会出现问题, 若spark当前批次开始处理时, 上一批次的offset没有更新完或更新失败, 就会发生重复消费的状况。

因此可以利用数据库的事务, 将业务逻辑及更新offset一起放在事务中(数据存放到哪里, offset就存放在哪里), 利用了事务的原子性、幂等性保证避免了数据重复消费、没有消费的情况。

val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
  new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  val results = yourCalculation(rdd)

  // begin your transaction

  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly

  // end your transaction
}
参考资料:(spark官方文档)http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

© 著作权归作者所有

d

dreamness

粉丝 2
博文 43
码字总数 14078
作品 0
沈阳
私信 提问
spark-streaming-kafka之createDirectStream模式

最近一直在用directstream方式消费kafka中的数据,特此总结,整个代码工程分为三个部分 一. 完整工程代码如下(某些地方特意做了说明, 这个代码的部分函数直接用的是spark-streaming-kafka-0...

hblt-j
2018/11/16
86
0
Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式

Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式,可以从代码中简单理解成Receiver方式是通过zookeeper来连接kafka队列,Direct方式是直接连接到kafka的节点上获取数据了。 ...

张欢19933
2017/06/19
161
0
Spark Streaming 中使用kafka低级api+zookeeper 保存 offset 并重用 以及 相关代码整合

在 Spark Streaming 中消费 Kafka 数据的时候,有两种方式分别是 1)基于 Receiver-based 的 createStream 方法和 2)Direct Approach (No Receivers) 方式的 createDirectStream 方法,详细...

cql252283126
2018/04/13
0
0
干货 | Spark Streaming 和 Flink 详细对比

本文从编程模型、任务调度、时间机制、Kafka 动态分区的感知、容错及处理语义、背压等几个方面对比 Spark Stream 与 Flink,希望对有实时处理需求业务的企业端用户在框架选型有所启发。本文篇...

xiaomin0322
05/29
52
0
kafka direct方式获取数据解析

---title: Spark Streaming基于kafka获取数据源码解析subtitle: kafka的direct方式获取数据description: kafkaRDD计算解析keywords: [spark,kafkaRDD,direct]date: 2019-01-08tags: [spark,k......

freeli
01/09
48
0

没有更多内容

加载失败,请刷新页面

加载更多

springboot全家桶,集成shiro,rabbitmq,redis, mybatis,druid,swagger

搭建了一个SpringBoot的模板,下载即可作为单体应用的服务后台模板 项目地址:https://gitee.com/devilscode/springboot_template 项目集成框架 springboot 作为基础框架 shiro 权限管理 ra...

devils_os
14分钟前
2
0
云服务器的优势有哪些?

与传统的服务器相比,云服务器有多种显著的优势,因此近年来云服务器租用一直都很受消费者的欢迎。 那么云服务器都有哪些优势呢? 云服务器的稳定性很好 虚拟主机是很多用户建站的首选方案,这...

云漫网络Ruan
14分钟前
2
0
好程序员web前端学习路线分享CSS浮动-清除浮动篇

好程序员web前端学习路线分享CSS浮动-清除浮动篇,为什么要清除浮动   这里所说的清除浮动,并不是不要浮动了,而是清除浮动与浮动之间的影响。那么到底会有什么影响呢? 1.高度塌陷   ...

好程序员官方
25分钟前
1
0
我有酒,你有故事吗?

记录学习的每一秒,巨杉技术社区有奖征文全新启动! 如果, 你热爱数据技术,乐于尝(zhe)试(teng), 又喜欢写点什么~ 那么, 无论你是架构大佬,还是DBA大神, 只要你愿意通过文字分享你...

巨杉数据库
29分钟前
1
0
【Git】ignore文件不生效

简介 使用git的过程中, 某些文件可能不希望上传到服务器,例如日志、本地化配置等信息。这时候,我们可以通过配置.gitignore文件忽略这些文件的提交。 配置.gitignore 常见的配置如下: HELP...

Areya
31分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部