Kafka Streams Core Concepts

2017/08/20 18:22

Core Concepts

Kafka Streams is a client library for processing and analyzing data stored in Kafka. It builds upon important stream processing concepts such as properly distinguishing between event time and processing time, windowing support, and simple yet efficient management of application state.


Kafka Streams has a low barrier to entry: You can quickly write and run a small-scale proof-of-concept on a single machine; and you only need to run additional instances of your application on multiple machines to scale up to high-volume production workloads. Kafka Streams transparently handles the load balancing of multiple instances of the same application by leveraging Kafka's parallelism model.


Some highlights of Kafka Streams:


  • Designed as a simple and lightweight client library, which can be easily embedded in any Java application and integrated with any existing packaging, deployment and operational tools that users have for their streaming applications.
  • Has no external dependencies on systems other than Apache Kafka itself as the internal messaging layer; notably, it uses Kafka's partitioning model to horizontally scale processing while maintaining strong ordering guarantees.
  • Supports fault-tolerant local state, which enables very fast and efficient stateful operations like windowed joins and aggregations.
  • Supports exactly-once processing semantics to guarantee that each record will be processed once and only once, even when there is a failure on either Streams clients or Kafka brokers in the middle of processing.
  • Employs one-record-at-a-time processing to achieve millisecond processing latency, and supports event-time based windowing operations with late arrival of records.
  • Offers necessary stream processing primitives, along with a high-level Streams DSL and a low-level Processor API.

We first summarize the key concepts of Kafka Streams.


Stream Processing Topology

  • A stream is the most important abstraction provided by Kafka Streams: it represents an unbounded, continuously updating data set. A stream is an ordered, replayable, and fault-tolerant sequence of immutable data records, where a data record is defined as a key-value pair.
  • A stream processing application is any program that makes use of the Kafka Streams library. It defines its computational logic through one or more processor topologies, where a processor topology is a graph of stream processors (nodes) that are connected by streams (edges).
  • A stream processor is a node in the processor topology; it represents a processing step that transforms data in streams by receiving one input record at a time from its upstream processors in the topology, applying its operation to it, and possibly producing one or more output records for its downstream processors.

There are two special processors in the topology:

  • Source Processor: A source processor is a special type of stream processor that does not have any upstream processors. It produces an input stream to its topology from one or multiple Kafka topics by consuming records from these topics and forwarding them to its downstream processors.
  • Sink Processor: A sink processor is a special type of stream processor that does not have any downstream processors. It sends any records received from its upstream processors to a specified Kafka topic.

Note that in normal processor nodes other remote systems can also be accessed while processing the current record. Therefore the processed results can either be streamed back into Kafka or written to an external system.
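To make the source, processor, and sink roles concrete, here is a toy model in plain Java. It is only a sketch: `ToyTopology`, `Sink`, and `run` are hypothetical names that do not belong to the Kafka Streams API, and an in-memory list stands in for the Kafka topics.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Toy model of a processor topology; none of these types are part of Kafka Streams.
class ToyTopology {
    // Stand-in for a sink processor: collects records instead of writing to a Kafka topic.
    static final class Sink {
        final List<String> written = new ArrayList<>();

        void process(String record) {
            written.add(record);
        }
    }

    // Runs source -> filter -> map -> sink, handing over one record at a time.
    static List<String> run(List<String> inputTopic) {
        Predicate<String> dropEmpty = value -> !value.isEmpty(); // intermediate processor
        Function<String, String> toUpper = String::toUpperCase;  // intermediate processor
        Sink sink = new Sink();                                  // no downstream processors
        for (String record : inputTopic) {                       // source: no upstream processors
            if (dropEmpty.test(record)) {
                sink.process(toUpper.apply(record));
            }
        }
        return sink.written;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "", "b"))); // prints [A, B]
    }
}
```

Each record travels through the whole chain before the next one is read, which mirrors the one-record-at-a-time processing described above.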




Kafka Streams offers two ways to define the stream processing topology: the Kafka Streams DSL provides the most common data transformation operations, such as map, filter, join and aggregations, out of the box; the lower-level Processor API allows developers to define and connect custom processors as well as to interact with state stores.


A processor topology is merely a logical abstraction for your stream processing code. At runtime, the logical topology is instantiated and replicated inside the application for parallel processing (see Stream Partitions and Tasks for details).



Time

A critical aspect in stream processing is the notion of time, and how it is modeled and integrated. For example, some operations such as windowing are defined based on time boundaries.
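As an illustration of a time boundary, the sketch below assigns a record timestamp to a fixed-size (tumbling) window by rounding it down to a multiple of the window size. `WindowSketch` and `windowStart` are hypothetical names, and the code assumes non-negative timestamps in milliseconds; it is not taken from the Kafka Streams implementation.

```java
// Hypothetical helper: maps a record timestamp to the start of its tumbling window.
class WindowSketch {
    // Assumes timestampMs >= 0 and windowSizeMs > 0.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        // With one-minute windows, a record stamped 125 s falls into the window starting at 120 s.
        System.out.println(windowStart(125_000L, 60_000L)); // prints 120000
    }
}
```

Which timestamp is fed into such a function (event time, ingestion time, or processing time) determines which records end up in the same window.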


Common notions of time in streams are:

  • Event time - The point in time when an event or data record occurred, i.e. was originally created "at the source". Example: If the event is a geo-location change reported by a GPS sensor in a car, then the associated event-time would be the time when the GPS sensor captured the location change.
  • Processing time - The point in time when the event or data record happens to be processed by the stream processing application, i.e. when the record is being consumed. The processing time may be milliseconds, hours, or days etc. later than the original event time. Example: Imagine an analytics application that reads and processes the geo-location data reported from car sensors to present it to a fleet management dashboard. Here, processing-time in the analytics application might be milliseconds or seconds (e.g. for real-time pipelines based on Apache Kafka and Kafka Streams) or hours (e.g. for batch pipelines based on Apache Hadoop or Apache Spark) after event-time.
  • Ingestion time - The point in time when an event or data record is stored in a topic partition by a Kafka broker. The difference to event time is that this ingestion timestamp is generated when the record is appended to the target topic by the Kafka broker, not when the record is created "at the source". The difference to processing time is that processing time is when the stream processing application processes the record. For example, if a record is never processed, there is no notion of processing time for it, but it still has an ingestion time.

The choice between event-time and ingestion-time is actually done through the configuration of Kafka (not Kafka Streams): From Kafka 0.10.x onwards, timestamps are automatically embedded into Kafka messages. Depending on Kafka's configuration these timestamps represent event-time or ingestion-time. The respective Kafka configuration setting can be specified on the broker level or per topic. The default timestamp extractor in Kafka Streams will retrieve these embedded timestamps as-is. Hence, the effective time semantics of your application depend on the effective Kafka configuration for these embedded timestamps.
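The following sketch shows what that choice could look like as a topic-level configuration map. The setting name `message.timestamp.type` and its values `CreateTime` and `LogAppendTime` come from Kafka's topic configuration reference rather than from this article (the broker-level counterpart is `log.message.timestamp.type`); `TimestampTypeConfig` and `topicConfig` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper that builds the topic-level setting controlling embedded timestamps.
class TimestampTypeConfig {
    static Map<String, String> topicConfig(boolean useIngestionTime) {
        Map<String, String> config = new HashMap<>();
        // CreateTime: keep the timestamp set by the producer (event-time if the producer sets it so).
        // LogAppendTime: the broker overwrites it with the append time (ingestion-time).
        config.put("message.timestamp.type", useIngestionTime ? "LogAppendTime" : "CreateTime");
        return config;
    }

    public static void main(String[] args) {
        System.out.println(topicConfig(true)); // prints {message.timestamp.type=LogAppendTime}
    }
}
```

A map like this would typically be supplied when the topic is created or altered; how that is done depends on the tooling in use.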


Kafka Streams assigns a timestamp to every data record via the TimestampExtractor interface. Concrete implementations of this interface may retrieve or compute timestamps based on the actual contents of data records such as an embedded timestamp field to provide event-time semantics, or use any other approach such as returning the current wall-clock time at the time of processing, thereby yielding processing-time semantics to stream processing applications. Developers can thus enforce different notions of time depending on their business needs. For example, per-record timestamps describe the progress of a stream with regards to time (although records may be out-of-order within the stream) and are leveraged by time-dependent operations such as joins.
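The difference between the two extraction strategies can be sketched in plain Java. `ToyTimestampExtractor` is a hypothetical stand-in, not the real `TimestampExtractor` interface (whose method signature is not reproduced here), and `Reading` is an assumed record type carrying an embedded timestamp field.

```java
import java.util.function.LongSupplier;

class ExtractorSketch {
    // Assumed record payload with an embedded timestamp field.
    static final class Reading {
        final String sensorId;
        final long capturedAtMs;

        Reading(String sensorId, long capturedAtMs) {
            this.sensorId = sensorId;
            this.capturedAtMs = capturedAtMs;
        }
    }

    // Hypothetical stand-in for a timestamp extractor; not the Kafka Streams interface.
    interface ToyTimestampExtractor {
        long extract(Reading reading);
    }

    // Event-time semantics: use the timestamp embedded in the record contents.
    static ToyTimestampExtractor eventTime() {
        return reading -> reading.capturedAtMs;
    }

    // Processing-time semantics: ignore the record and read the clock at processing time.
    static ToyTimestampExtractor processingTime(LongSupplier clock) {
        return reading -> clock.getAsLong();
    }

    public static void main(String[] args) {
        Reading reading = new Reading("car-1", 1_000L);
        System.out.println(eventTime().extract(reading)); // prints 1000
        System.out.println(processingTime(System::currentTimeMillis).extract(reading));
    }
}
```

Passing the clock in as a `LongSupplier` keeps the processing-time variant testable with a fixed value.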


Finally, whenever a Kafka Streams application writes records to Kafka, it also assigns timestamps to these new records. The way the timestamps are assigned depends on the context:


  • When new output records are generated via processing some input record, for example, context.forward() triggered in the process() function call, output record timestamps are inherited from input record timestamps directly.
  • When new output records are generated via periodic functions such as punctuate(), the output record timestamp is defined as the current internal time (obtained through context.timestamp()) of the stream task.
  • For aggregations, the timestamp of a resulting aggregate update record will be that of the latest arrived input record that triggered the update.
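The aggregation rule in the last bullet can be illustrated with a toy counter. This is a sketch of the rule as stated above, not Kafka Streams code; `AggregateTimestampSketch`, `Update`, and `onRecord` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Toy count aggregation that stamps each update with the timestamp of the triggering record.
class AggregateTimestampSketch {
    static final class Update {
        final String key;
        final long count;
        final long timestampMs;

        Update(String key, long count, long timestampMs) {
            this.key = key;
            this.count = count;
            this.timestampMs = timestampMs;
        }
    }

    private final Map<String, Long> counts = new HashMap<>();

    // The update inherits the timestamp of the record that just arrived,
    // even if an earlier record for the same key carried a larger timestamp.
    Update onRecord(String key, long recordTimestampMs) {
        long updated = counts.merge(key, 1L, Long::sum);
        return new Update(key, updated, recordTimestampMs);
    }

    public static void main(String[] args) {
        AggregateTimestampSketch aggregation = new AggregateTimestampSketch();
        aggregation.onRecord("k", 200L);
        Update update = aggregation.onRecord("k", 100L); // out-of-order arrival
        System.out.println(update.count + " @ " + update.timestampMs); // prints 2 @ 100
    }
}
```

Note that "latest arrived" refers to arrival order, so an out-of-order record can produce an update with an older timestamp than the previous update.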


States

Some stream processing applications don't require state, which means the processing of a message is independent from the processing of all other messages. However, being able to maintain state opens up many possibilities for sophisticated stream processing applications: you can join input streams, or group and aggregate data records. Many such stateful operators are provided by the Kafka Streams DSL.


Kafka Streams provides so-called state stores, which can be used by stream processing applications to store and query data. This is an important capability when implementing stateful operations. Every task in Kafka Streams embeds one or more state stores that can be accessed via APIs to store and query data required for processing. These state stores can either be a persistent key-value store, an in-memory hashmap, or another convenient data structure. Kafka Streams offers fault-tolerance and automatic recovery for local state stores.
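One common way to make a local store recoverable is to log every update and replay the log after a failure. The sketch below illustrates that idea with in-memory collections. It assumes a changelog-style design, which this article does not describe, and `ChangelogStoreSketch` with its methods are hypothetical names rather than Kafka Streams APIs.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy key-value store that records every write in a changelog so it can be rebuilt.
class ChangelogStoreSketch {
    private final Map<String, Long> local = new HashMap<>();
    private final List<Map.Entry<String, Long>> changelog;

    ChangelogStoreSketch(List<Map.Entry<String, Long>> changelog) {
        this.changelog = changelog;
    }

    // Writes go to the local store and are appended to the changelog.
    void put(String key, long value) {
        local.put(key, value);
        changelog.add(new AbstractMap.SimpleImmutableEntry<String, Long>(key, value));
    }

    Long get(String key) {
        return local.get(key);
    }

    // Rebuilds a store by replaying the changelog in order; later entries win.
    static ChangelogStoreSketch restore(List<Map.Entry<String, Long>> changelog) {
        ChangelogStoreSketch restored = new ChangelogStoreSketch(new ArrayList<>(changelog));
        for (Map.Entry<String, Long> entry : changelog) {
            restored.local.put(entry.getKey(), entry.getValue());
        }
        return restored;
    }
}
```

After a simulated crash, calling `restore` with the surviving changelog yields a store that returns the same values as the lost one.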


Kafka Streams allows direct read-only queries of the state stores by methods, threads, processes or applications external to the stream processing application that created the state stores. This is provided through a feature called Interactive Queries. All stores are named and Interactive Queries exposes only the read operations of the underlying implementation.
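The idea of named stores that expose only read operations to outside callers can be sketched with standard collections. `ReadOnlyStoreSketch`, `writable`, and `readOnly` are hypothetical names; the actual Interactive Queries API is not shown here.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Toy registry of named stores with a read-only view for external callers.
class ReadOnlyStoreSketch {
    private final Map<String, Map<String, Long>> storesByName = new HashMap<>();

    // For the processing code: full read-write access, creating the store on first use.
    Map<String, Long> writable(String name) {
        return storesByName.computeIfAbsent(name, n -> new HashMap<>());
    }

    // For external callers: a live view of the same store that rejects writes.
    // In this toy, asking for an unknown name creates an empty store.
    Map<String, Long> readOnly(String name) {
        return Collections.unmodifiableMap(writable(name));
    }

    public static void main(String[] args) {
        ReadOnlyStoreSketch stores = new ReadOnlyStoreSketch();
        stores.writable("counts").put("a", 3L);
        System.out.println(stores.readOnly("counts").get("a")); // prints 3
    }
}
```

Calling `put` on the read-only view throws `UnsupportedOperationException`, while reads always reflect the latest writes made by the processing code.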



Processing Guarantees

In stream processing, one of the most frequently asked questions is: "Does my stream processing system guarantee that each record is processed once and only once, even if some failures are encountered in the middle of processing?" Failing to guarantee exactly-once stream processing is a deal-breaker for many applications that cannot tolerate any data loss or data duplicates; in that case a batch-oriented framework is usually used in addition to the stream processing pipeline, an approach known as the Lambda Architecture.

Earlier Kafka releases provided only at-least-once delivery guarantees, and hence any stream processing system that used Kafka as its backend storage could not guarantee end-to-end exactly-once semantics. In fact, even for those stream processing systems that claimed to support exactly-once processing, as long as they read from or wrote to Kafka as the source or sink, their applications could not actually guarantee that no duplicates would be generated throughout the pipeline.

Kafka has since added support for producers to send messages to different topic partitions in a transactional and idempotent manner, and Kafka Streams has added end-to-end exactly-once processing semantics by leveraging these features. More specifically, it guarantees that for any record read from the source Kafka topics, its processing results will be reflected exactly once in the output Kafka topic as well as in the state stores for stateful operations.

The key difference between the end-to-end exactly-once guarantee of Kafka Streams and the guarantees claimed by other stream processing frameworks is that Kafka Streams tightly integrates with the underlying Kafka storage system and ensures that commits on the input topic offsets, updates on the state stores, and writes to the output topics are completed atomically, instead of treating Kafka as an external system that may have side effects.

For more details on how this is done inside Kafka Streams, see KIP-129. To achieve exactly-once semantics when running Kafka Streams applications, users can simply set the processing.guarantee config value to exactly_once (the default value is at_least_once). More details can be found in the Kafka Streams Configs section.
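A minimal configuration sketch follows. The `processing.guarantee` key and its `exactly_once` value are taken from the text above; `application.id` and `bootstrap.servers` are standard Streams settings not mentioned in this article; `ExactlyOnceConfig`, `build`, and the example values are hypothetical.

```java
import java.util.Properties;

// Hypothetical helper that assembles Streams settings with exactly-once processing enabled.
class ExactlyOnceConfig {
    static Properties build(String applicationId, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", applicationId);        // identifies the application
        props.put("bootstrap.servers", bootstrapServers);  // Kafka brokers to connect to
        props.put("processing.guarantee", "exactly_once"); // default is at_least_once
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; replace them with those of a real deployment.
        Properties props = build("my-streams-app", "localhost:9092");
        System.out.println(props.getProperty("processing.guarantee")); // prints exactly_once
    }
}
```

A `Properties` object of this shape is what a Kafka Streams application would normally hand to the library at startup.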
