文档章节

Time Semantics

Yulong_
 Yulong_
发布于 2017/08/23 16:06
字数 1439
阅读 12
收藏 0
点赞 0
评论 0

Core Concepts中介绍了三种语义Event time、Processing-time、Ingestion time。

这里需要注意的是:punctuate方法必须使用新数据才能触发。

时间语义设置

如何实现不同的时间语义主要取决于两个方面:

message timestamp类型

在0.10.x版本之后timestamps自动嵌入Kafka messages中。基于Kafka配置,这些timestamp代表event-time或者ingestion-time。参数如下,broker参数为log.message.timestamp.type,topic参数为message.timestamp.type。

log.message.timestamp.type:定义message的timestamp是create time或者log append time。参数值为CreateTime 或LogAppendTime。默认值是CreateTime。broker的参数配置。

message.timestamp.type:定义message的timestamp是create time或者log append time。参数值为CreateTime 或LogAppendTime。默认值是CreateTime。topic创建时参数配置,不设置则使用log.message.timestamp.type。

TimestampExtractor类型

接口TimestampExractor分为两类:

WallclockTimestampExtractor

WallclockTimestampExtractor提供了processing-time语义,只提供了一个方法extract。
  • extract方法获取时间为当前系统时间(System.currentTimeMillis())

ExtractRecordMetadataTimestamp

ExtractRecordMetadataTimestamp为抽象类及其子类提供了非processing-time语义。提供了两个方法extract和onInvalidTimestamp。
  • extract方法获取的是message的timestamp值。与message timestamp类型共同作用来提供event-time或者ingestion-time语义。
  • onInvalidTimestamp抽象方法需实现,主要是当遇到无效的message timestamp时如何进行处理。
ExtractRecordMetadataTimestamp包含如下子类:
  • FailOnInvalidTimestamp:如果某条记录含有无效的timestamp值时,extractor会抛出异常。
  • LogAndSkipOnInvalidTimestamp:如果某条记录含有无效的timestamp值时,将此记录在WARN日志中打印,依然返回当前的message timestamp值,最终会在数据处理时会导致忽略处理这条记录。
  • UsePreviousTimeOnInvalidTimestamp:如果某条记录含有无效的timestamp值时,将上一条有效记录的timestamp作为当前记录的timestamp值。

自定义TimestampExtractor

可以自定义实现TimestampExtractor接口,使用提取器提取记录中的部分数据作为返回,这样可以灵活设置语义。

一般情况下,我们使用producer发送message到kafka集群时,可以指定message timestamp来设置(也就是event-time),但是如果message timestamp并不是我们需要的,那么就需要自定义提取器来提取message的某个field。

总结

通过如下表格可见:

当使用WallclockTimestampExtractor提供processing-time语义。

当ExtractRecordMetadataTimestamp子类与CreateTime类型一起时,提供event-time语义。

当ExtractRecordMetadataTimestamp子类与LogAppendTime类型一起时,提供ingestion-time语义。

自定义实现TimestampExtractor接口,提供自定义time语义。

语义类型 message timestamp TimestampExractor
processing-time
 
WallclockTimestampExtractor
event-time
CreateTime
ExtractRecordMetadataTimestamp子类
ingestion-time
LogAppendTime
ExtractRecordMetadataTimestamp子类
自定义语义   自己实现TimestampExtractor

processing-time

事件或数据记录被流处理程序开始处理时的时间点,晚于event-time和ingestion-time

这里需要注意的是:punctuate方法必须使用新数据才能触发。

比如在process-time语义中,设置context.schedule(5000),程序执行时间为20秒,在0到5秒有数据,第一条数据到来会触发第一次punctuate:

如果5秒之后不再获取任何新数据(因为程序我们设置执行时间为20秒)则永远不会触发punctuate;

如果5秒之后才获取一条新数据,则同时触发(0-5)一次punctuate;

如果10秒之后才获取一条新数据,则同时触发(0-5),(5-10)两次punctuate;

如果15秒之后才获取一条新数据,则同时触发(0-5),(5-10),(10-15)三次punctuate;

在processing-time语义下我们需要如下配置:

props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.WallclockTimestampExtractor");//timestamp.extractor

 

import org.apache.commons.io.FileUtils;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import java.io.File;
import java.util.Locale;
import java.util.Properties;
/**
 * Demonstrates, using the low-level Processor APIs, how to implement the
 * WordCount program that computes a simple word occurrence histogram from an
 * input text.
 *
 * In this example, the input stream reads from a topic named
 * "streams-file-input", where the values of messages represent lines of text;
 * and the histogram output is written to topic
 * "streams-wordcount-processor-output" where each record is an updated count of
 * a single word.
 *
 * Before running this example you must create the input topic and the output
 * topic (e.g. via bin/kafka-topics.sh --create ...), and write some data to the
 * input topic (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see
 * any data arriving in the output topic.
 */
public class WordCountProcessorDemo {
	private static class MyProcessorSupplier implements ProcessorSupplier<String, String> {
		@Override
		public Processor<String, String> get() {
			return new Processor<String, String>() {
				private ProcessorContext context;
				private KeyValueStore<String, Integer> kvStore;
				@Override
				@SuppressWarnings("unchecked")
				public void init(ProcessorContext context) {
					this.context = context;
					this.context.schedule(5000);
					this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
				}
				@Override
				public void process(String dummy, String line) {
					String[] words = line.toLowerCase(Locale.getDefault()).split(" ");
					System.out.println("line:" + line);
					for (String word : words) {
						Integer oldValue = this.kvStore.get(word);
						if (oldValue == null) {
							this.kvStore.put(word, 1);
						} else {
							this.kvStore.put(word, oldValue + 1);
						}
					}
					context.commit();
				}
				@Override
				public void punctuate(long timestamp) {
					try (KeyValueIterator<String, Integer> iter = this.kvStore.all()) {
						System.out.println("----------- " + timestamp + "----------- ");
						//System.out.println(TimestampUtil.TimestampFormat(timestamp));
						while (iter.hasNext()) {
							KeyValue<String, Integer> entry = iter.next();
							System.out.println("[" + entry.key + ", " + entry.value + "]");
							context.forward(entry.key, entry.value.toString());
						}
					}
				}
				@Override
				public void close() {
				}
			};
		}
	}
	public static void main(String[] args) throws Exception {
		Properties props = new Properties();
		props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
		props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "breath:9092");
		props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
		props.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
		props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.WallclockTimestampExtractor");
		TopologyBuilder builder = new TopologyBuilder();
		builder.addSource("Source", "streams-file-input");
		builder.addProcessor("Process", new MyProcessorSupplier(), "Source");
		builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(),
				"Process");
		builder.addSink("Sink", "streams-wordcount-processor-output", "Process");
		KafkaStreams streams = new KafkaStreams(builder, props);
		streams.start();
		Thread.sleep(20000L);
		streams.close();
		// 清空state store
		String appStateDir = props.get(StreamsConfig.STATE_DIR_CONFIG) + System.getProperty("file.separator")
				+ props.get(StreamsConfig.APPLICATION_ID_CONFIG);
		FileUtils.deleteDirectory(new File(appStateDir));
		// 清空application相关中间topic以及input topic的offset
		String kafkaHome = System.getenv("KAFKA_HOME");
		Runtime runtime = Runtime.getRuntime();
		runtime.exec(kafkaHome + "/bin/kafka-streams-application-reset.sh " + "--application-id "
				+ props.get(StreamsConfig.APPLICATION_ID_CONFIG) + " " + "--bootstrap-servers breath:9092 "
				+ "--zookeeper breath:2181/kafka01021 " + "--input-topics streams-file-input");
	}
}


event-time

事件或数据记录发生的时间点,通常是源头产生。早于processing-time和ingestion-time

event-time的timestamp生成方式有两种

  • 用户可以指定timestamp,在producer.send方法中发送的ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value)进行设置。
  • 用户不指定timestamp,则producer.send当时的时间作为timestamp。

在event-time语义下我们需要保证topic的message timestamp类型为CreateTime,同时设置ExtractRecordMetadataTimestamp子类,根据需要如下三种选择:

  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.FailOnInvalidTimestamp");//timestamp.extractor
  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp");//timestamp.extractor
  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp");//timestamp.extractor

ingestion-time

事件或数据记录被存储在Kafka broker的某个topic partition中的时间点。早于processing-time,晚于event-time。

在event-time语义下我们需要保证topic的message timestamp类型为LogAppendTime,同时设置ExtractRecordMetadataTimestamp子类,根据需要如下三种选择:

  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.FailOnInvalidTimestamp");//timestamp.extractor
  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp");//timestamp.extractor
  • props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp");//timestamp.extractor

© 著作权归作者所有

共有 人打赏支持
Yulong_
粉丝 8
博文 79
码字总数 169760
作品 0
朝阳
部门经理
What is the difference between syntax and semantic

Syntax is about the structure or the grammar of the language. It answers the question: how do I construct a valid sentence? All languages, even English and other human (aka "nat......

pczhangtl ⋅ 2014/07/02 ⋅ 0

Oracle下各个NLS相关参数取得方法

1)--------------------------------------------------------------------------------------------------------- NLSNCHARCHARACTERSET: SQL> select * from nls_database_parameters whe......

嗯哼9925 ⋅ 2017/12/26 ⋅ 0

关于python multiprocessing进程通信的pipe和queue方式

这两天温故了python 的multiprocessing多进程模块,看到的pipe和queue这两种ipc方式,啥事ipc? ipc就是进程间的通信模式,常用的一半是socke,rpc,pipe和消息队列等。 今个就再把pipe和que...

rfyiamcool ⋅ 2014/09/08 ⋅ 0

Apache Drill 0.4.0 发布,大型数据集分析系统

Apache Drill是为大数据集的互动分析而生,是Google的Dremel的开源版本。它的目标是可以高效地对大数据集进行分析,可以运行在1000台以上的服务器,在几秒内处理PB级的数据和万亿条的数据记录...

亚当李 ⋅ 2014/08/14 ⋅ 6

FileZilla Client 3.6.0 Beta1 发布

FileZilla Client 发布了 3.6.0 的首个 Beta 版,该版本改进主要包括: 新功能: Auto-scroll file lists if dragging an item near the top or bottom Add option to create empty files t......

oschina ⋅ 2012/10/22 ⋅ 3

Google Protocol Buffers v3.0.0-alpha-1 发布

Google Protocol Buffers v3.0.0-alpha-1 发布,新特性包括: Removal of field presence logic for primitive value fields, removalof required fields, and removal of default values. ......

oschina ⋅ 2014/12/16 ⋅ 9

McSema

MC-Semantics (或者 mcsema, 发音 'em see se ma') 是一个用来将机器码翻译成 LLVM IR 的库,包括如下几个子项目: Control Flow Recovery Instruction Semantics Binary File Parsing Sema...

红薯 ⋅ 2014/08/08 ⋅ 0

Oracle字符集

Oracle字符集 10.1 Toad client显示乱码错误 10.2 Client端显示中文 ORACLE数据库有国家字符集(national character set)与数据库字符集(database character set)之分。两者都是在创建数据库...

wangbinbin0326 ⋅ 2015/08/26 ⋅ 0

Alluxio 1.1.0 发布,分布式文件系统

Alluxio 1.1.0 发布了,Alluxio是一个高容错的分布式文件系统,允许文件以内存的速度在集群框架中进行可靠的共享,类似Spark和MapReduce。通过利用lineage信息,积极地使用内存,Alluxio的吞...

愚_者 ⋅ 2016/06/12 ⋅ 4

利用jms 消息选择器 Message Selectors 过滤消息

下面是个人理解,如果有不正确的地方麻烦指出来.更正. JMS 消息选择器 配置 1. 消息选择器是根据 header 和 properties 允许客户端选择性的制定需要接收的消息 注意 消息选择器是无法利用 消息...

lwei ⋅ 2012/06/27 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

vbs 取文件大小 字节

dim namedim fs, s'name = Inputbox("姓名")'msgbox(name)set fs = wscript.createobject("scripting.filesystemobject") 'fs为FSO实例if (fs.folderexists("c:\temp"))......

vga ⋅ 10分钟前 ⋅ 0

高并发之Nginx的限流

首先Nginx的版本号有要求,最低为1.11.5 如果低于这个版本,在Nginx的配置中 upstream web_app { server 到达Ip1:端口 max_conns=10; server 到达Ip2:端口 max_conns=10; } server { listen ...

算法之名 ⋅ 今天 ⋅ 0

Spring | IOC AOP 注解 简单使用

写在前面的话 很久没更新笔记了,有人会抱怨:小冯啊,你是不是在偷懒啊,没有学习了。老哥,真的冤枉:我觉得我自己很菜,还在努力学习呢,正在学习Vue.js做管理系统呢。即便这样,我还是不...

Wenyi_Feng ⋅ 今天 ⋅ 0

博客迁移到 https://www.jianshu.com/u/aa501451a235

博客迁移到 https://www.jianshu.com/u/aa501451a235 本博客不再更新

为为02 ⋅ 今天 ⋅ 0

win10怎么彻底关闭自动更新

win10自带的更新每天都很多,每一次下载都要占用大量网络,而且安装要等得时间也蛮久的。 工具/原料 Win10 方法/步骤 单击左下角开始菜单点击设置图标进入设置界面 在设置窗口中输入“服务”...

阿K1225 ⋅ 今天 ⋅ 0

Elasticsearch 6.3.0 SQL功能使用案例分享

The best elasticsearch highlevel java rest api-----bboss Elasticsearch 6.3.0 官方新推出的SQL检索插件非常不错,本文一个实际案例来介绍其使用方法。 1.代码中的sql检索 @Testpu...

bboss ⋅ 今天 ⋅ 0

informix数据库在linux中的安装以及用java/c/c++访问

一、安装前准备 安装JDK(略) 到IBM官网上下载informix软件:iif.12.10.FC9DE.linux-x86_64.tar放在某个大家都可以访问的目录比如:/mypkg,并解压到该目录下。 我也放到了百度云和天翼云上...

wangxuwei ⋅ 今天 ⋅ 0

PHP语言系统ZBLOG或许无法重现月光博客的闪耀历史[图]

最近在写博客,希望通过自己努力打造一个优秀的教育类主题博客,名动江湖,但是问题来了,现在写博客还有前途吗?面对强大的自媒体站点围剿,还有信心和可能型吗? 至于程序部分,我选择了P...

原创小博客 ⋅ 今天 ⋅ 0

IntelliJ IDEA 2018.1新特性

工欲善其事必先利其器,如果有一款IDE可以让你更高效地专注于开发以及源码阅读,为什么不试一试? 本文转载自:netty技术内幕 3月27日,jetbrains正式发布期待已久的IntelliJ IDEA 2018.1,再...

Romane ⋅ 今天 ⋅ 0

浅谈设计模式之工厂模式

工厂模式(Factory Pattern)是 Java 中最常用的设计模式之一。这种类型的设计模式属于创建型模式,它提供了一种创建对象的最佳方式。 在工厂模式中,我们在创建对象时不会对客户端暴露创建逻...

佛系程序猿灬 ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部