Time Semantics
博客专区 > Yulong_ 的博客 > 博客详情
Time Semantics
Yulong_ 发表于4个月前
Time Semantics
  • 发表于 4个月前
  • 阅读 8
  • 收藏 0
  • 点赞 0
  • 评论 0

腾讯云 新注册用户 域名抢购1元起>>>   

Core Concepts中介绍了三种语义Event time、Processing-time、Ingestion time。

这里需要注意的是:punctuate方法必须使用新数据才能触发。

时间语义设置

如何实现不同的时间语义主要取决于两个方面:

message timestamp类型

在0.10.x版本之后timestamps自动嵌入Kafka messages中。基于Kafka配置,这些timestamp代表event-time或者ingestion-time。参数如下,broker参数为log.message.timestamp.type,topic参数为message.timestamp.type。

log.message.timestamp.type:定义message的timestamp是create time或者log append time。参数值为CreateTime 或LogAppendTime。默认值是CreateTime。broker的参数配置。

message.timestamp.type:定义message的timestamp是create time或者log append time。参数值为CreateTime 或LogAppendTime。默认值是CreateTime。topic创建时参数配置,不设置则使用log.message.timestamp.type。

TimestampExtractor类型

接口TimestampExractor分为两类:

WallclockTimestampExtractor

WallclockTimestampExtractor提供了processing-time语义,只提供了一个方法extract。
  • extract方法获取时间为当前系统时间(System.currentTimeMillis())

ExtractRecordMetadataTimestamp

ExtractRecordMetadataTimestamp为抽象类及其子类提供了非processing-time语义。提供了两个方法extract和onInvalidTimestamp。
  • extract方法获取的是message的timestamp值。与message timestamp类型共同作用来提供event-time或者ingestion-time语义。
  • onInvalidTimestamp抽象方法需实现,主要是当遇到无效的message timestamp时如何进行处理。
ExtractRecordMetadataTimestamp包含如下子类:
  • FailOnInvalidTimestamp:如果某条记录含有无效的timestamp值时,extractor会抛出异常。
  • LogAndSkipOnInvalidTimestamp:如果某条记录含有无效的timestamp值时,将此记录在WARN日志中打印,依然返回当前的message timestamp值,最终会在数据处理时会导致忽略处理这条记录。
  • UsePreviousTimeOnInvalidTimestamp:如果某条记录含有无效的timestamp值时,将上一条有效记录的timestamp作为当前记录的timestamp值。

自定义TimestampExtractor

可以自定义实现TimestampExtractor接口,使用提取器提取记录中的部分数据作为返回,这样可以灵活设置语义。

一般情况下,我们使用producer发送message到kafka集群时,可以指定message timestamp来设置(也就是event-time),但是如果message timestamp并不是我们需要的,那么就需要自定义提取器来提取message的某个field。

总结

通过如下表格可见:

当使用WallclockTimestampExtractor提供processing-time语义。

当ExtractRecordMetadataTimestamp子类与CreateTime类型一起时,提供event-time语义。

当ExtractRecordMetadataTimestamp子类与LogAppendTime类型一起时,提供ingestion-time语义。

自定义实现TimestampExtractor接口,提供自定义time语义。

语义类型 message timestamp TimestampExractor
processing-time
 
WallclockTimestampExtractor
event-time
CreateTime
ExtractRecordMetadataTimestamp子类
ingestion-time
LogAppendTime
ExtractRecordMetadataTimestamp子类
自定义语义   自己实现TimestampExtractor

processing-time

事件或数据记录被流处理程序开始处理时的时间点,晚于event-time和ingestion-time

这里需要注意的是:punctuate方法必须使用新数据才能触发。

比如在process-time语义中,设置context.schedule(5000),程序执行时间为20秒,在0到5秒有数据,第一条数据到来会触发第一次punctuate:

如果5秒之后不再获取任何新数据(因为程序我们设置执行时间为20秒)则永远不会触发punctuate;

如果5秒之后才获取一条新数据,则同时触发(0-5)一次punctuate;

如果10秒之后才获取一条新数据,则同时触发(0-5),(5-10)两次punctuate;

如果15秒之后才获取一条新数据,则同时触发(0-5),(5-10),(10-15)三次punctuate;

在processing-time语义下我们需要如下配置:

props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.WallclockTimestampExtractor");//timestamp.extractor

 

import org.apache.commons.io.FileUtils;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import java.io.File;
import java.util.Locale;
import java.util.Properties;
/**
 * Demonstrates, using the low-level Processor APIs, how to implement the
 * WordCount program that computes a simple word occurrence histogram from an
 * input text.
 *
 * In this example, the input stream reads from a topic named
 * "streams-file-input", where the values of messages represent lines of text;
 * and the histogram output is written to topic
 * "streams-wordcount-processor-output" where each record is an updated count of
 * a single word.
 *
 * Before running this example you must create the input topic and the output
 * topic (e.g. via bin/kafka-topics.sh --create ...), and write some data to the
 * input topic (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see
 * any data arriving in the output topic.
 */
public class WordCountProcessorDemo {
	private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
		@Override
		public Processor<String, String> get() {
			return new Processor<String, String>() {
				private ProcessorContext context;
				private KeyValueStore<String, Integer> kvStore;
				@Override
				@SuppressWarnings("unchecked")
				public void init(ProcessorContext context) {
					this.context = context;
					this.context.schedule(5000);
					this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
				}
				@Override
				public void process(String dummy, String line) {
					String[] words = line.toLowerCase(Locale.getDefault()).split(" ");
					System.out.println("line:" + line);
					for (String word : words) {
						Integer oldValue = this.kvStore.get(word);
						if (oldValue == null) {
							this.kvStore.put(word, 1);
						} else {
							this.kvStore.put(word, oldValue + 1);
						}
					}
					context.commit();
				}
				@Override
				public void punctuate(long timestamp) {
					try (KeyValueIterator<String, Integer> iter = this.kvStore.all()) {
						System.out.println("----------- " + timestamp + "----------- ");
						//System.out.println(TimestampUtil.TimestampFormat(timestamp));
						while (iter.hasNext()) {
							KeyValue<String, Integer> entry = iter.next();
							System.out.println("[" + entry.key + ", " + entry.value + "]");
							context.forward(entry.key, entry.value.toString());
						}
					}
				}
				@Override
				public void close() {
				}
			};
		}
	}
	public static void main(String[] args) throws Exception {
		Properties props = new Properties();
		props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
		props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "breath:9092");
		props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
		props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.WallclockTimestampExtractor");
		TopologyBuilder builder = new TopologyBuilder();
		builder.addSource("Source", "streams-file-input");
		builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
		builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(),
				"Process");
		builder.addSink("Sink", "streams-wordcount-processor-output", "Process");
		KafkaStreams streams = new KafkaStreams(builder, props);
		streams.start();
		Thread.sleep(20000L);
		streams.close();
		// 清空state store
		String appStateDir = props.get(StreamsConfig.STATE_DIR_CONFIG) + System.getProperty("file.separator")
				+ props.get(StreamsConfig.APPLICATION_ID_CONFIG);
		FileUtils.deleteDirectory(new File(appStateDir));
		// 清空application相关中间topic以及input topic的offset
		String kafkaHome = System.getenv("KAFKA_HOME");
		Runtime runtime = Runtime.getRuntime();
		runtime.exec(kafkaHome + "/bin/kafka-streams-application-reset.sh " + "--application-id "
				+ props.get(StreamsConfig.APPLICATION_ID_CONFIG) + " " + "--bootstrap-servers breath:9092 "
				+ "--zookeeper breath:2181/kafka01021 " + "--input-topics streams-file-input");
	}
}


event-time

事件或数据记录发生的时间点,通常是源头产生。早于processing-time和ingestion-time

event-time的timestamp生成方式有两种

  • 用户可以指定timestamp,在producer.send方法中发送的ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)进行设置。
  • 用户不指定timestamp,则producer.send当时的时间作为timestamp。

在event-time语义下我们需要保证topic的message timestamp类型为CreateTime,同时设置ExtractRecordMetadataTimestamp子类,根据需要如下三种选择:

  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.FailOnInvalidTimestamp");//timestamp.extractor
  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp");//timestamp.extractor
  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp");//timestamp.extractor

ingestion-time

事件或数据记录被存储在Kafka broker的某个topic partition中的时间点。早于processing-time,晚于event-time。

在event-time语义下我们需要保证topic的message timestamp类型为LogAppendTime,同时设置ExtractRecordMetadataTimestamp子类,根据需要如下三种选择:

  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.FailOnInvalidTimestamp");//timestamp.extractor
  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp");//timestamp.extractor
  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp");//timestamp.extractor
共有 人打赏支持
粉丝 9
博文 79
码字总数 169741
×
Yulong_
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: