文档章节

Spark+Kafka的Direct方式将偏移量发送到Zookeeper实现

hblt-j
 hblt-j
发布于 2018/11/16 20:59
字数 941
阅读 77
收藏 1

 Apache Spark 1.3.0引入了Direct API,利用Kafka的低层次API从Kafka集群中读取数据,并且在SparkStreaming系统里面维护偏移量相关的信息,并且通过这种方式去实现零数据丢失(zero data loss)相比使用基于Receiver的方法要高效。但是因为是Spark Streaming系统自己维护Kafka的读偏移量,而Spark Streaming系统并没有将这个消费的偏移量发送到Zookeeper中,这将导致那些基于偏移量的Kafka集群监控软件(比如:Apache Kafka监控之Kafka Web ConsoleApache Kafka监控之KafkaOffsetMonitor等)失效。本文就是基于为了解决这个问题,使得我们编写的Spark Streaming程序能够在每次接收到数据之后自动地更新Zookeeper中Kafka的偏移量。

  我们从Spark的官方文档可以知道,维护Spark内部维护Kafka便宜了信息是存储在HasOffsetRanges类的offsetRanges中,我们可以在Spark Streaming程序里面获取这些信息:

val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

这样我们就可以获取所以分区消费信息,只需要遍历offsetsList,然后将这些信息发送到Zookeeper即可更新Kafka消费的偏移量。完整的代码片段如下:

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

      messages.foreachRDD(rdd => {

        val offsetsList = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

        val kc = new KafkaCluster(kafkaParams)

        for (offsets < - offsetsList) {

          val topicAndPartition = TopicAndPartition("iteblog", offsets.partition)

          val o = kc.setConsumerOffsets(args(0), Map((topicAndPartition, offsets.untilOffset)))

          if (o.isLeft) {

            println(s"Error updating the offset to Kafka cluster: ${o.left.get}")

          }

        }

})

  KafkaCluster类用于建立和Kafka集群的链接相关的操作工具类,我们可以对Kafka中Topic的每个分区设置其相应的偏移量Map((topicAndPartition, offsets.untilOffset)),然后调用KafkaCluster类的setConsumerOffsets方法去更新Zookeeper里面的信息,这样我们就可以更新Kafka的偏移量,最后我们就可以通过KafkaOffsetMonitor之类软件去监控Kafka中相应Topic的消费信息,下图是KafkaOffsetMonitor的监控情况:



如果想及时了解Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共帐号:iteblog_hadoop

  从图中我们可以看到KafkaOffsetMonitor监控软件已经可以监控到Kafka相关分区的消费情况,这对监控我们整个Spark Streaming程序来非常重要,因为我们可以任意时刻了解Spark读取速度。另外,KafkaCluster工具类的完整代码如下:

package org.apache.spark.streaming.kafka

 

import kafka.api.OffsetCommitRequest

import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}

import kafka.consumer.SimpleConsumer

import org.apache.spark.SparkException

import org.apache.spark.streaming.kafka.KafkaCluster.SimpleConsumerConfig

 

import scala.collection.mutable.ArrayBuffer

import scala.util.Random

import scala.util.control.NonFatal

 

/**

 * User: 过往记忆

 * Date: 2015-06-02

 * Time: 下午23:46

 * bolg: https://www.iteblog.com

 * 本文地址:https://www.iteblog.com/archives/1381

 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货

 * 过往记忆博客微信公共帐号:iteblog_hadoop

 */

 

class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {

  type Err = ArrayBuffer[Throwable]

 

  @transient private var _config: SimpleConsumerConfig = null

 

  def config: SimpleConsumerConfig = this.synchronized {

    if (_config == null) {

      _config = SimpleConsumerConfig(kafkaParams)

    }

    _config

  }

 

  def setConsumerOffsets(groupId: String,

                         offsets: Map[TopicAndPartition, Long]

                          ): Either[Err, Map[TopicAndPartition, Short]] = {

    setConsumerOffsetMetadata(groupId, offsets.map { kv =>

      kv._1 -> OffsetMetadataAndError(kv._2)

    })

  }

 

  def setConsumerOffsetMetadata(groupId: String,

                                metadata: Map[TopicAndPartition, OffsetMetadataAndError]

                                 ): Either[Err, Map[TopicAndPartition, Short]] = {

    var result = Map[TopicAndPartition, Short]()

    val req = OffsetCommitRequest(groupId, metadata)

    val errs = new Err

    val topicAndPartitions = metadata.keySet

    withBrokers(Random.shuffle(config.seedBrokers), errs) { consumer =>

      val resp = consumer.commitOffsets(req)

      val respMap = resp.requestInfo

      val needed = topicAndPartitions.diff(result.keySet)

      needed.foreach { tp: TopicAndPartition =>

        respMap.get(tp).foreach { err: Short =>

          if (err == ErrorMapping.NoError) {

            result += tp -> err

          } else {

            errs.append(ErrorMapping.exceptionFor(err))

          }

        }

      }

      if (result.keys.size == topicAndPartitions.size) {

        return Right(result)

      }

    }

    val missing = topicAndPartitions.diff(result.keySet)

    errs.append(new SparkException(s"Couldn't set offsets for ${missing}"))

    Left(errs)

  }

 

  private def withBrokers(brokers: Iterable[(String, Int)], errs: Err)

                         (fn: SimpleConsumer => Any): Unit = {

    brokers.foreach { hp =>

      var consumer: SimpleConsumer = null

      try {

        consumer = connect(hp._1, hp._2)

        fn(consumer)

      } catch {

        case NonFatal(e) =>

          errs.append(e)

      } finally {

        if (consumer != null) {

          consumer.close()

        }

      }

    }

  }

 

  def connect(host: String, port: Int): SimpleConsumer =

    new SimpleConsumer(host, port, config.socketTimeoutMs,

      config.socketReceiveBufferBytes, config.clientId)

}

本文转载自:https://www.iteblog.com/archives/1381.html

hblt-j
粉丝 24
博文 218
码字总数 73000
作品 0
海淀
架构师
私信 提问
kafka direct方式获取数据解析

---title: Spark Streaming基于kafka获取数据源码解析subtitle: kafka的direct方式获取数据description: kafkaRDD计算解析keywords: [spark,kafkaRDD,direct]date: 2019-01-08tags: [spark,k......

freeli
01/09
48
0
Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式

Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式,可以从代码中简单理解成Receiver方式是通过zookeeper来连接kafka队列,Direct方式是直接连接到kafka的节点上获取数据了。 ...

张欢19933
2017/06/19
162
0
ZooKeeper源码:ZooKeeper类分析

ZooKeeper版本:3.4.10。 一.创建ZooKeeper对象 ZooKeeper类是ZooKeeper客户端的实现,用来发送命令给ZooKeeper服务器。 ZooKeeper中可以设置Watcher,每个Watcher在节点状态发生变化的时候被...

Jacktanger
2018/09/01
173
0
Spark Streaming管理Kafka偏移量

前言 为了让Spark Streaming消费kafka的数据不丢数据,可以创建Kafka Direct DStream,由Spark Streaming自己管理offset,并不是存到zookeeper。启用Spark Streaming的 checkpoints是存储偏移...

架构师springboot
2018/12/11
205
0
ZooKeeper的Watcher机制

ZooKeeper 提供了分布式数据的发布/订阅功能。 在 ZooKeeper 中,引入了 Watcher 机制来实现这种分布式的通知功能。 ZooKeeper 允许客户端向服务端注册一个 Watcher 监听, 当服务器的一些特...

Java搬砖工程师
2018/11/19
297
0

没有更多内容

加载失败,请刷新页面

加载更多

最简单的获取相机拍照的图片

  import android.content.Intent;import android.graphics.Bitmap;import android.os.Bundle;import android.os.Environment;import android.provider.MediaStore;import andr......

MrLins
17分钟前
1
0
说好不哭!数据可视化深度干货,前端开发下一个涨薪点在这里~

随着互联网在各行各业的影响不断深入,数据规模越来越大,各企业也越来越重视数据的价值。作为一家专业的数据智能公司,个推从消息推送服务起家,经过多年的持续耕耘,积累沉淀了海量数据,在...

个推
18分钟前
4
0
第三方支付-返回与回调注意事项

不管是支付宝,微信,还是其它第三方支付,第四方支付,支付机构服务商只要涉及到钱的交易都要进行如下校验,全部成功了才视为成功订单 1.http请求是否成功 2.校验商户号 3.校验订单号及状态...

Shingfi
21分钟前
3
0
简述Java内存分配和回收策略以及Minor GC 和 Major GC(Full GC)

内存分配: 1. 栈区:栈可分为Java虚拟机和本地方法栈 2. 堆区:堆被所有线程共享,在虚拟机启动时创建,是唯一的目的是存放对象实例,是gc的主要区域。通常可分为两个区块年轻代和年老代。更...

DustinChan
27分钟前
4
0
Excel插入批注:可在批注插入文字、形状、图片

1.批注一直显示:审阅选项卡-------->勾选显示批注选项: 2.插入批注快捷键:Shift+F2 组合键 3.在批注中插入图片:鼠标右键点击批注框的小圆点【重点不可以在批注文本框内点击】----->调出批...

东方墨天
50分钟前
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部