Low-level Processor API (Translation)
Posted by Yulong_ 3 months ago

Processor

A stream processor is a node in the processor topology. With the Processor API, developers can define arbitrary stream processors, including processors that operate on received records and processors that interact with state stores, and connect them into a processor topology that represents their custom processing logic.

The Processor interface provides two main API methods: process and punctuate. The process method is called on each received record; the punctuate method is called periodically. In addition, the processor can keep the ProcessorContext instance variable initialized in the init method, and use the context to schedule the punctuation period (context().schedule), to forward key/value pairs to downstream processors (context().forward), and to commit the current processing progress (context().commit).

The following is a simple word-count Processor implementation:

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;

public class MyProcessor implements Processor<String, String> {
    private ProcessorContext context;
    private KeyValueStore<String, Long> kvStore;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // keep the processor context locally because we need it in punctuate() and commit()
        this.context = context;

        // call this processor's punctuate() method every 1000 milliseconds
        this.context.schedule(1000);

        // retrieve the key-value state store named "Counts"
        this.kvStore = (KeyValueStore<String, Long>) context.getStateStore("Counts");
    }

    @Override
    public void process(String dummy, String line) {
        // dummy is the key of the received record (unused here); line is its value
        String[] words = line.toLowerCase().split(" ");

        for (String word : words) {
            // look up the current count for this word in the state store
            Long oldValue = this.kvStore.get(word);

            if (oldValue == null) {
                // first occurrence of this word
                this.kvStore.put(word, 1L);
            } else {
                this.kvStore.put(word, oldValue + 1L);
            }
        }
    }

    @Override
    public void punctuate(long timestamp) {
        // periodically emit the accumulated counts to the downstream processors
        KeyValueIterator<String, Long> iter = this.kvStore.all();

        while (iter.hasNext()) {
            KeyValue<String, Long> entry = iter.next();
            context.forward(entry.key, entry.value.toString());
        }

        iter.close();
        // commit the current processing progress
        context.commit();
    }

    @Override
    public void close() {
        // close any resources managed by this processor.
        // Note: do not close any StateStores, as these are managed by the library
    }
}

 

In the above implementation, the following actions are performed:

 

  • In the init method, schedule the punctuation every 1 second and retrieve the local state store by its name "Counts".
  • In the process method, upon each received record, split the value string into words, and update their counts into the state store (we will talk about this feature later in the section).
  • In the punctuate method, iterate the local state store, send the aggregated counts to the downstream processor, and commit the current stream state.
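The get-then-put counting logic in process() does not depend on Kafka itself. A minimal self-contained sketch of the same update rule against a plain HashMap (the class and method names here are illustrative, not part of the Kafka Streams API):

```java
import java.util.HashMap;
import java.util.Map;

public class WordCountSketch {
    // Mirrors the kvStore.get()/put() update in process(): for each word,
    // insert 1 on first sight, otherwise increment the stored count.
    static Map<String, Long> countWords(String line, Map<String, Long> store) {
        for (String word : line.toLowerCase().split(" ")) {
            Long oldValue = store.get(word);
            if (oldValue == null) {
                store.put(word, 1L);              // first occurrence of this key
            } else {
                store.put(word, oldValue + 1L);   // increment existing count
            }
        }
        return store;
    }

    public static void main(String[] args) {
        Map<String, Long> store = new HashMap<>();
        countWords("hello kafka hello streams", store);
        System.out.println(store.get("hello")); // prints 2
    }
}
```

In the real processor the HashMap is replaced by the "Counts" KeyValueStore, which survives restarts when backed by a persistent, log-backed store.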
     

Processor Topology

With the customized processors defined in the Processor API, developers can use the TopologyBuilder to build a processor topology by connecting these processors together:


TopologyBuilder builder = new TopologyBuilder();

builder.addSource("SOURCE", "src-topic")

    // add the "PROCESS1" node which takes the source processor "SOURCE" as its upstream processor
    .addProcessor("PROCESS1", () -> new MyProcessor1(), "SOURCE")

    // add the "PROCESS2" node which takes "PROCESS1" as its upstream processor
    .addProcessor("PROCESS2", () -> new MyProcessor2(), "PROCESS1")

    // add the "PROCESS3" node which takes "PROCESS1" as its upstream processor
    .addProcessor("PROCESS3", () -> new MyProcessor3(), "PROCESS1")

    // add the sink processor node "SINK1" that takes Kafka topic "sink-topic1"
    // as output and the "PROCESS1" node as its upstream processor
    .addSink("SINK1", "sink-topic1", "PROCESS1")

    // add the sink processor node "SINK2" that takes Kafka topic "sink-topic2"
    // as output and the "PROCESS2" node as its upstream processor
    .addSink("SINK2", "sink-topic2", "PROCESS2")

    // add the sink processor node "SINK3" that takes Kafka topic "sink-topic3"
    // as output and the "PROCESS3" node as its upstream processor
    .addSink("SINK3", "sink-topic3", "PROCESS3");

 

There are several steps in the above code to build the topology, and here is a quick walk through:

  • First of all a source node named "SOURCE" is added to the topology using the addSource method, with one Kafka topic "src-topic" fed to it.
  • Three processor nodes are then added using the addProcessor method; here the first processor is a child of the "SOURCE" node, but is the parent of the other two processors.
  • Finally three sink nodes are added to complete the topology using the addSink method, each piping from a different parent processor node and writing to a separate Kafka topic.
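To actually run a built topology, it is wrapped in a KafkaStreams instance together with a configuration. The sketch below only assembles the configuration with plain java.util.Properties ("application.id" and "bootstrap.servers" are the standard Kafka Streams config keys; the values are placeholders for illustration) and notes in comments how the topology would then be started:

```java
import java.util.Properties;

public class StreamsLauncher {
    // Minimal configuration for a Kafka Streams application.
    // The values below are placeholders; point bootstrap.servers at a real broker.
    public static Properties streamsConfig() {
        Properties props = new Properties();
        props.put("application.id", "wordcount-processor"); // unique per application
        props.put("bootstrap.servers", "localhost:9092");   // Kafka broker address
        return props;
    }

    // With the Kafka Streams dependency on the classpath, the topology built
    // above would be started roughly as follows:
    //
    //   KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(streamsConfig()));
    //   streams.start();
}
```

The application.id also serves as the prefix for internal topic names and the consumer group id, so two applications sharing it will split the input partitions between them.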

 

State Stores

Note that the Processor API is not limited to only accessing the current records as they arrive in the process() method, but can also maintain processing states that keep recently arrived records to use in stateful processing operations such as windowed joins or aggregation. To take advantage of these states, users can define a state store by implementing the StateStore interface (the Kafka Streams library also has a few extended interfaces such as KeyValueStore); in practice, though, users usually do not need to customize such a state store from scratch but can simply use the Stores factory to define a state store by specifying whether it should be persistent, log-backed, etc. In the following example, a persistent key-value store named "Counts" with key type String and value type Long is created.


StateStoreSupplier countStore = Stores.create("Counts")
    .withKeys(Serdes.String())
    .withValues(Serdes.Long())
    .persistent()
    .build();

To take advantage of these state stores, developers can use the TopologyBuilder.addStateStore method when building the processor topology to create the local state and associate it with the processor nodes that need to access it; or they can connect a created state store with existing processor nodes through TopologyBuilder.connectProcessorAndStateStores.


TopologyBuilder builder = new TopologyBuilder();

builder.addSource("SOURCE", "src-topic")

    .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")

    // add the created state store "Counts" associated with processor "PROCESS1"
    .addStateStore(countStore, "PROCESS1")

    .addProcessor("PROCESS2", MyProcessor2::new /* the ProcessorSupplier that can generate MyProcessor2 */, "PROCESS1")
    .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")

    // connect the state store "Counts" with processor "PROCESS2"
    .connectProcessorAndStateStores("PROCESS2", "Counts")

    .addSink("SINK1", "sink-topic1", "PROCESS1")
    .addSink("SINK2", "sink-topic2", "PROCESS2")
    .addSink("SINK3", "sink-topic3", "PROCESS3");

 
