文档章节

Time Semantics

Yulong_
 Yulong_
发布于 2017/08/23 16:06
字数 1439
阅读 18
收藏 0

Core Concepts中介绍了三种语义Event time、Processing-time、Ingestion time。

这里需要注意的是:punctuate方法必须使用新数据才能触发。

时间语义设置

如何实现不同的时间语义主要取决于两个方面:

message timestamp类型

在0.10.x版本之后timestamps自动嵌入Kafka messages中。基于Kafka配置,这些timestamp代表event-time或者ingestion-time。参数如下,broker参数为log.message.timestamp.type,topic参数为message.timestamp.type。

log.message.timestamp.type:定义message的timestamp是create time或者log append time。参数值为CreateTime 或LogAppendTime。默认值是CreateTime。broker的参数配置。

message.timestamp.type:定义message的timestamp是create time或者log append time。参数值为CreateTime 或LogAppendTime。默认值是CreateTime。topic创建时参数配置,不设置则使用log.message.timestamp.type。

TimestampExtractor类型

接口TimestampExractor分为两类:

WallclockTimestampExtractor

WallclockTimestampExtractor提供了processing-time语义,只提供了一个方法extract。
  • extract方法获取时间为当前系统时间(System.currentTimeMillis())

ExtractRecordMetadataTimestamp

ExtractRecordMetadataTimestamp为抽象类及其子类提供了非processing-time语义。提供了两个方法extract和onInvalidTimestamp。
  • extract方法获取的是message的timestamp值。与message timestamp类型共同作用来提供event-time或者ingestion-time语义。
  • onInvalidTimestamp抽象方法需实现,主要是当遇到无效的message timestamp时如何进行处理。
ExtractRecordMetadataTimestamp包含如下子类:
  • FailOnInvalidTimestamp:如果某条记录含有无效的timestamp值时,extractor会抛出异常。
  • LogAndSkipOnInvalidTimestamp:如果某条记录含有无效的timestamp值时,将此记录在WARN日志中打印,依然返回当前的message timestamp值,最终会在数据处理时会导致忽略处理这条记录。
  • UsePreviousTimeOnInvalidTimestamp:如果某条记录含有无效的timestamp值时,将上一条有效记录的timestamp作为当前记录的timestamp值。

自定义TimestampExtractor

可以自定义实现TimestampExtractor接口,使用提取器提取记录中的部分数据作为返回,这样可以灵活设置语义。

一般情况下,我们使用producer发送message到kafka集群时,可以指定message timestamp来设置(也就是event-time),但是如果message timestamp并不是我们需要的,那么就需要自定义提取器来提取message的某个field。

总结

通过如下表格可见:

当使用WallclockTimestampExtractor提供processing-time语义。

当ExtractRecordMetadataTimestamp子类与CreateTime类型一起时,提供event-time语义。

当ExtractRecordMetadataTimestamp子类与LogAppendTime类型一起时,提供ingestion-time语义。

自定义实现TimestampExtractor接口,提供自定义time语义。

语义类型 message timestamp TimestampExractor
processing-time
 
WallclockTimestampExtractor
event-time
CreateTime
ExtractRecordMetadataTimestamp子类
ingestion-time
LogAppendTime
ExtractRecordMetadataTimestamp子类
自定义语义   自己实现TimestampExtractor

processing-time

事件或数据记录被流处理程序开始处理时的时间点,晚于event-time和ingestion-time

这里需要注意的是:punctuate方法必须使用新数据才能触发。

比如在process-time语义中,设置context.schedule(5000),程序执行时间为20秒,在0到5秒有数据,第一条数据到来会触发第一次punctuate:

如果5秒之后不再获取任何新数据(因为程序我们设置执行时间为20秒)则永远不会触发punctuate;

如果5秒之后才获取一条新数据,则同时触发(0-5)一次punctuate;

如果10秒之后才获取一条新数据,则同时触发(0-5),(5-10)两次punctuate;

如果15秒之后才获取一条新数据,则同时触发(0-5),(5-10),(10-15)三次punctuate;

在processing-time语义下我们需要如下配置:

props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.WallclockTimestampExtractor");//timestamp.extractor

 

import org.apache.commons.io.FileUtils;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import java.io.File;
import java.util.Locale;
import java.util.Properties;
/**
 * Demonstrates, using the low-level Processor APIs, how to implement the
 * WordCount program that computes a simple word occurrence histogram from an
 * input text.
 *
 * In this example, the input stream reads from a topic named
 * "streams-file-input", where the values of messages represent lines of text;
 * and the histogram output is written to topic
 * "streams-wordcount-processor-output" where each record is an updated count of
 * a single word.
 *
 * Before running this example you must create the input topic and the output
 * topic (e.g. via bin/kafka-topics.sh --create ...), and write some data to the
 * input topic (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see
 * any data arriving in the output topic.
 */
public class WordCountProcessorDemo {
	private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
		@Override
		public Processor<String, String> get() {
			return new Processor<String, String>() {
				private ProcessorContext context;
				private KeyValueStore<String, Integer> kvStore;
				@Override
				@SuppressWarnings("unchecked")
				public void init(ProcessorContext context) {
					this.context = context;
					this.context.schedule(5000);
					this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
				}
				@Override
				public void process(String dummy, String line) {
					String[] words = line.toLowerCase(Locale.getDefault()).split(" ");
					System.out.println("line:" + line);
					for (String word : words) {
						Integer oldValue = this.kvStore.get(word);
						if (oldValue == null) {
							this.kvStore.put(word, 1);
						} else {
							this.kvStore.put(word, oldValue + 1);
						}
					}
					context.commit();
				}
				@Override
				public void punctuate(long timestamp) {
					try (KeyValueIterator<String, Integer> iter = this.kvStore.all()) {
						System.out.println("----------- " + timestamp + "----------- ");
						//System.out.println(TimestampUtil.TimestampFormat(timestamp));
						while (iter.hasNext()) {
							KeyValue<String, Integer> entry = iter.next();
							System.out.println("[" + entry.key + ", " + entry.value + "]");
							context.forward(entry.key, entry.value.toString());
						}
					}
				}
				@Override
				public void close() {
				}
			};
		}
	}
	public static void main(String[] args) throws Exception {
		Properties props = new Properties();
		props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
		props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "breath:9092");
		props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
		props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.WallclockTimestampExtractor");
		TopologyBuilder builder = new TopologyBuilder();
		builder.addSource("Source", "streams-file-input");
		builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
		builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(),
				"Process");
		builder.addSink("Sink", "streams-wordcount-processor-output", "Process");
		KafkaStreams streams = new KafkaStreams(builder, props);
		streams.start();
		Thread.sleep(20000L);
		streams.close();
		// 清空state store
		String appStateDir = props.get(StreamsConfig.STATE_DIR_CONFIG) + System.getProperty("file.separator")
				+ props.get(StreamsConfig.APPLICATION_ID_CONFIG);
		FileUtils.deleteDirectory(new File(appStateDir));
		// 清空application相关中间topic以及input topic的offset
		String kafkaHome = System.getenv("KAFKA_HOME");
		Runtime runtime = Runtime.getRuntime();
		runtime.exec(kafkaHome + "/bin/kafka-streams-application-reset.sh " + "--application-id "
				+ props.get(StreamsConfig.APPLICATION_ID_CONFIG) + " " + "--bootstrap-servers breath:9092 "
				+ "--zookeeper breath:2181/kafka01021 " + "--input-topics streams-file-input");
	}
}


event-time

事件或数据记录发生的时间点,通常是源头产生。早于processing-time和ingestion-time

event-time的timestamp生成方式有两种

  • 用户可以指定timestamp,在producer.send方法中发送的ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)进行设置。
  • 用户不指定timestamp,则producer.send当时的时间作为timestamp。

在event-time语义下我们需要保证topic的message timestamp类型为CreateTime,同时设置ExtractRecordMetadataTimestamp子类,根据需要如下三种选择:

  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.FailOnInvalidTimestamp");//timestamp.extractor
  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp");//timestamp.extractor
  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp");//timestamp.extractor

ingestion-time

事件或数据记录被存储在Kafka broker的某个topic partition中的时间点。早于processing-time,晚于event-time。

在event-time语义下我们需要保证topic的message timestamp类型为LogAppendTime,同时设置ExtractRecordMetadataTimestamp子类,根据需要如下三种选择:

  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.FailOnInvalidTimestamp");//timestamp.extractor
  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp");//timestamp.extractor
  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp");//timestamp.extractor

© 著作权归作者所有

共有 人打赏支持
Yulong_
粉丝 8
博文 93
码字总数 169760
作品 0
朝阳
部门经理
What is the difference between syntax and semantic

Syntax is about the structure or the grammar of the language. It answers the question: how do I construct a valid sentence? All languages, even English and other human (aka "nat......

pczhangtl
2014/07/02
0
0
Oracle下各个NLS相关参数取得方法

1)--------------------------------------------------------------------------------------------------------- NLSNCHARCHARACTERSET: SQL> select * from nls_database_parameters whe......

嗯哼9925
2017/12/26
0
0
关于python multiprocessing进程通信的pipe和queue方式

这两天温故了python 的multiprocessing多进程模块,看到的pipe和queue这两种ipc方式,啥事ipc? ipc就是进程间的通信模式,常用的一半是socke,rpc,pipe和消息队列等。 今个就再把pipe和que...

rfyiamcool
2014/09/08
0
0
Apache Drill 0.4.0 发布,大型数据集分析系统

Apache Drill是为大数据集的互动分析而生,是Google的Dremel的开源版本。它的目标是可以高效地对大数据集进行分析,可以运行在1000台以上的服务器,在几秒内处理PB级的数据和万亿条的数据记录...

亚当李
2014/08/14
3.1K
6
FileZilla Client 3.6.0 Beta1 发布

FileZilla Client 发布了 3.6.0 的首个 Beta 版,该版本改进主要包括: 新功能: Auto-scroll file lists if dragging an item near the top or bottom Add option to create empty files t......

oschina
2012/10/22
452
3

没有更多内容

加载失败,请刷新页面

加载更多

mysql 数据类型及占用字节数

数字类型 TINYINT                           1 字节 SMALLINT                          2 个字节 MEDIUMINT                         3 个字节...

会游泳的鱼_
42分钟前
3
0
高性能mysql:创建高性能的索引

性能优化简介 MySQL性能定义为完成某件任务所需要的时间量度,换句话说,性能即响应时间,这是一个非常重要的原则。我们通过任务和时间而不是资源来测量性能。数据库服务器的目的是执行SQL语...

背后的辛酸
57分钟前
4
0
HTTP get、post 中请求json与map传参格式

import java.io.IOException;import java.net.URI;import java.net.URISyntaxException;import java.nio.charset.Charset;import java.util.ArrayList;import java.util.List;im......

寒风中的独狼
今天
3
0
IDEA中tomcat启动慢 耗时10分钟

用idea中的tomcat以debug模式启动,会非常的慢,而正常启动没啥问题;原因是debug模式中View Breakpoints断点代码,断点的是jar包,而现在启动由于jar包发生变化,导致启动时一直处于等待中。...

GoodMarver
今天
5
0
Linux学习-10月18(awk)

9.6/9.7 awk 一、awk简介   1. awk是一种编程语言,用于对文本和数据进行处理的   2. 具有强大的文本格式化能力   3. 利用命令awk,可以将一些文本整理成为我们想要的样子   4. 命令awk...

wxy丶
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部