文档章节

kafka + spark streaming 确保不丢失不重复消费的offset管理方法

d
 dreamness
发布于 07/04 10:29
字数 310
阅读 44
收藏 0

在使用Spark Streaming消费kafka中的数据时, 官方推荐将offset信息存储在kafka中

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

但是kafka不支持事务,这样可能会出现问题, 若spark当前批次开始处理时, 上一批次的offset没有更新完或更新失败, 就会发生重复消费的状况。

因此可以利用数据库的事务, 将业务逻辑及更新offset一起放在事务中(数据存放到哪里, offset就存放在哪里), 利用了事务的原子性、幂等性保证避免了数据重复消费、没有消费的情况。

val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
  new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  val results = yourCalculation(rdd)

  // begin your transaction

  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly

  // end your transaction
}
参考资料:(spark官方文档)http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html

© 著作权归作者所有

d

dreamness

粉丝 3
博文 48
码字总数 17330
作品 0
沈阳
私信 提问
spark-streaming-kafka之createDirectStream模式

最近一直在用directstream方式消费kafka中的数据,特此总结,整个代码工程分为三个部分 一. 完整工程代码如下(某些地方特意做了说明, 这个代码的部分函数直接用的是spark-streaming-kafka-0...

hblt-j
2018/11/16
89
0
Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式

Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式,可以从代码中简单理解成Receiver方式是通过zookeeper来连接kafka队列,Direct方式是直接连接到kafka的节点上获取数据了。 ...

张欢19933
2017/06/19
164
0
Spark Streaming 中使用kafka低级api+zookeeper 保存 offset 并重用 以及 相关代码整合

在 Spark Streaming 中消费 Kafka 数据的时候,有两种方式分别是 1)基于 Receiver-based 的 createStream 方法和 2)Direct Approach (No Receivers) 方式的 createDirectStream 方法,详细...

cql252283126
2018/04/13
0
0
干货 | Spark Streaming 和 Flink 详细对比

本文从编程模型、任务调度、时间机制、Kafka 动态分区的感知、容错及处理语义、背压等几个方面对比 Spark Stream 与 Flink,希望对有实时处理需求业务的企业端用户在框架选型有所启发。本文篇...

xiaomin0322
05/29
59
0
kafka direct方式获取数据解析

---title: Spark Streaming基于kafka获取数据源码解析subtitle: kafka的direct方式获取数据description: kafkaRDD计算解析keywords: [spark,kafkaRDD,direct]date: 2019-01-08tags: [spark,k......

freeli
01/09
49
0

没有更多内容

加载失败,请刷新页面

加载更多

iptables删除命令中的相关问题

最近在做一个中间件的配置工作,在配置iptables的时候,当用户想删除EIP(即释放当前连接),发现使用iptables的相关命令会提示错误。iptables: Bad rule (does a matching rule exist in t...

xiangyunyan
43分钟前
2
0
IT兄弟连 HTML5教程 HTML5表单 新增的表单属性1

HTML5 Input表单为<form>和<input>标签添加了几个新属性,属性如表1。 1 autocomplete属性 autocomplete属性规定form或input域应该拥有自动完成功能,当用户在自动完成域中开始输入时,浏览器...

老码农的一亩三分地
今天
7
0
OSChina 周五乱弹 —— 葛优理论+1

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @这次装个文艺青年吧 :#今日歌曲推荐# 分享米津玄師的单曲《LOSER》: mv中的舞蹈诡异却又美丽,如此随性怕是难再跳出第二次…… 《LOSER》-...

小小编辑
今天
1K
21
nginx学习笔记

中间件位于客户机/ 服务器的操作系统之上,管理计算机资源和网络通讯。 是连接两个独立应用程序或独立系统的软件。 web请求通过中间件可以直接调用操作系统,也可以经过中间件把请求分发到多...

码农实战
今天
5
0
Spring Security 实战干货:玩转自定义登录

1. 前言 前面的关于 Spring Security 相关的文章只是一个预热。为了接下来更好的实战,如果你错过了请从 Spring Security 实战系列 开始。安全访问的第一步就是认证(Authentication),认证...

码农小胖哥
今天
16
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部