文档章节

Alpakka Kafka,反应式Kafka客户端

羊八井
 羊八井
发布于 02/24 00:11
字数 1036
阅读 16
收藏 0

Alpakka Kafka 是一个要用于 Java 和 Scala 语言的开源的流感知和反应式集成数据线项目。它建立在 Akka Stream之上,提供了 DSL 来支持反应式和流式编程,内置回压功能。Akka Streams 是 Reactive Streams 和JDK 9+ java.util.concurrent.Flow 的兼容实现,可无缝地与其进行互操作。

要使用 Alpakka Kafka,需要在你的项目添加如下依赖:

libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "1.0-RC2"

当前支持 kafka-clients 2.1.x 和 Akka Streams 2.5.21。

快速开始

对Akka Streams或Kafka不熟的,可先查阅两者的官方文档:

Alpakka Kafka 写的代码非常精致且简洁,也许你会一眼爱上它的美。

object KafkaGetting extends App {
  implicit val system = ActorSystem()
  implicit val mat = ActorMaterializer()
  import system.dispatcher
  val config = system.settings.config

  val producerSettings = 
    ProducerSettings(config.getConfig("akka.kafka.producer"),
      new StringSerializer, new StringSerializer)

  val consumerSettings = 
    ConsumerSettings(config.getConfig("akka.kafka.consumer"), 
      new StringDeserializer, new StringDeserializer)

  val producerQueue = Source
    .queue[String](128, OverflowStrategy.fail)
    .map(str => new ProducerRecord[String, String]("test", str))
    .toMat(Producer.plainSink(producerSettings))(Keep.left)
    .run()

  val consumerControl = Consumer
    .plainSource(consumerSettings, Subscriptions.topics("test"))
    .map(record => record.value())
    .toMat(Sink.foreach(value => println(value)))(Keep.left)
    .run()

  Source(1 to 10)
    .map(_.toString)
    .throttle(1, 2.seconds)
    .runForeach(message => producerQueue.offer(message))
    .onComplete(tryValue => println(s"producer send over, return $tryValue"))

  println("Press 'enter' key exit.")
  StdIn.readLine()
  producerQueue.complete()
  consumerControl.shutdown()
  system.terminate()
  Await.result(system.whenTerminated, 10.seconds)
}

上面的代码实现了一个完整的Kafka生产者、消费者数据处理流程,整个处理都是异步、非阻塞的。没有显示线程创建、没有类似 where(true) 这样的消费处理循环……接下来,让我们分析下以上代码。

代码分析

producerSettings

Alpakka Kafka 使用ProducerSettings来封装创建Kafka生产者时需要的参数,它使用了 Typesafe Config 通过可配置的方式来构建生产者。

producerSettings 使用 "akka.kafka.producer" 部分的参数来构造 Kafka 生产者,以下是一个示例的 Typesafe Config 配置:

akka.kafka.producer {
  # 同时可运行的send操作数量
  parallelism = 100

  # 调用 `KafkaProducer.close` 时等待关闭的时间
  close-timeout = 60s
  
  # 线程池
  use-dispatcher = "akka.kafka.default-dispatcher"

  # 定义 org.apache.kafka.clients.producer.ProducerConfig 属性需要的参数
  kafka-clients {
    # 使用英文逗号分隔多个Kafka服务地址
    bootstrap.servers = "localhost:9092"
  }
}

consumerSettings

consumerSettings 使用 "akka.kafka.consumer" 部分的参数来构造 Kafka 消费者,以下是一个示例的 Typesafe Config 配置:

akka.kafka.consumer {
  # 拉取数据间隔周期
  poll-interval = 50ms
  
  # 拉取数据超时时间
  poll-timeout = 30s

  # 调用 `KafkaConsumer.close` 时等待关闭的时间
  close-timeout = 20s
  
  # 线程池
  use-dispatcher = "akka.kafka.default-dispatcher"

  # 定义 org.apache.kafka.clients.producer.ProducerConfig 属性需要的参数
  kafka-clients {
    # 使用英文逗号分隔多个Kafka服务地址
    bootstrap.servers = "localhost:9092"

    # 自动commit消息
    enable.auto.commit = true

    # 消费者组ID
    group.id = "resource-dev"

    # 从最新的offset开始读取消息,否则从头开始读取
    auto.offset.reset = "earliest"
  }
}

producerQueue

使用Akka Streams构造一个生产者队列 producerQueue,再由 Producer.plainSink 来消费发送到 producerQueue 里的消息。需要注意的是构造 Source.queue[String] 时设置的 128 这个参数并不是 Kafka 的消息队列容量,而是 Akka Streams Source 构造出来的一个Queue。Producer.plainSink 是一个 下游 ,它消费来自 producerQueue 这个上游的消息,再将数据发送到 Kafka 服务。

consumerControl

通过 Consumer 这个Akka Streams Source构造了一个Kafka消费者,并监听指定的 "test" 主题。consumerControl 流首先从收到的每个消息(ConsumerRecord)中取得 value,并发送到下游,下游通过 Sink.foreach 接收数据并打印到终端。

Source(1 to 10)

生成从1到10的字符串消息值,并每隔2秒通过 producerQueue 发送一个消息到Kafka。

小结

本文通过一个简单的示例展现怎样通过 Alpakka Kafka 来实现对 Kafka 的集成,完成的代码示例见:https://github.com/ihongka/akka-fusion/blob/master/fusion-kafka/src/test/scala/fusion/kafka/getting/KafkaGetting.scala

Kafka发展到现在,已不单单再是一个消息系统了,在MQ之外,它还提供了KSQL和Connector特性。应用基于 Kafka 可以有更多的设计和实现,而Akka Stream + Kafka是一个强大的组合,接下来我会写一系列文章介绍怎样使用 Alpakka Kafka 来基于 Kafka 进行应用和架构设计。

© 著作权归作者所有

共有 人打赏支持
羊八井

羊八井

粉丝 93
博文 41
码字总数 51095
作品 0
朝阳
技术主管
私信 提问
E-MapReduce上如何采集Kafka客户端Metrics

1. 背景 我们知道Kafka提供一套非常完善的Metrics数据,覆盖Broker,Consumer,Producer,Stream以及Connect。E-MapReduce通过Ganglia收集了Kafka Broker metrics信息,可以很好地监控Broke...

鱼跟猫
2018/08/09
0
0
Kafka安全机制解析及重构(一)

许久不写文章,主要是心情不佳,而且恰逢项目方向变动,学习和研究都放下了不少。最近开始主攻Kafka,在了解Kafka的功能后,要把Kafka构建成一个企业级的消息平台,首先需要完善的就是安全机...

MisterCH
2018/07/27
0
0
消息中间件(Kafka/RabbitMQ)收录集

本篇主要整理工作中遇到的一些消息中间件的相关知识,包括Kafka, RabbitMQ, RocketMQ, ActiveMQ等,不排除收录其他消息中间件的可能。 这里会持续收录相关知识,包括安装、部署、使用示例、监...

u013256816
2017/01/26
0
0
kafka设置外网消费者

kafka 的默认配置比较简单,但想把其端口暴露给外网(指定端口),则有一些额外的注意情况 kafka 的版本,0.8和0.10 的配置不同,这里以0.10.0.0 为主, 没有用集群,一台机做测试 zookeeper安...

mingover
2017/06/22
0
0
Nginx集成Kafka

xupan002 #librdkafka 是一个C实现的高性能 Apache Kafka 客户端,为生产环境提供了一个可靠和高性能的客户端。 librdkafka 同样也提供了传统的 C++ 接口。 性能 librdkafka 是一个基于现代硬...

xpttxsok
2017/12/25
11
0

没有更多内容

加载失败,请刷新页面

加载更多

RadosClient OSDC

RadosClient.h class librados::RadosClient : public Dispatcher//继承自Dispatcher(消息分发类){public: using Dispatcher::cct; md_config_t *conf;//配置文件private: ......

banwh
30分钟前
0
0
如果让你写一个消息队列,该如何进行架构设计?

面试题 如果让你写一个消息队列,该如何进行架构设计?说一下你的思路。 面试官心理分析 其实聊到这个问题,一般面试官要考察两块: 你有没有对某一个消息队列做过较为深入的原理的了解,或者...

李红欧巴
今天
4
0
错题

无知的小狼
今天
2
0
PowerShell因为在此系统中禁止执行脚本的解决方法

参考:window系统包管理工具--chocolatey 报错提示: & : 无法加载文件 C:\Users\liuzidong\AppData\Local\Temp\chocolatey\chocInstall\tools\chocolateyInstall.ps1,因为在此系统上禁止运...

近在咫尺远在天涯
今天
3
0
TP5 跨域请求处理

https://blog.csdn.net/a593706205/article/details/81774987 https://blog.csdn.net/wyk9916/article/details/82315700...

15834278076
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部