
Reproducing incorrect time handling in Flink, and an analysis of the cause

强子1985
Published 12/06 15:31

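While chatting with a friend, he mentioned that the time in his output was wrong. I hadn't noticed this during earlier testing, so I checked it in processing-time mode and found that the time was indeed incorrect.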

So I debugged it to find where the problem was, and eventually worked out the cause. My notes follow.

I posted the full reproduction steps and cause analysis at the bottom of my friend's PR, https://github.com/apache/flink/pull/7180.

Let me show how to reproduce the wrong result.

Background: a processing-time tumbling window, Flink 1.5.0.

The invocation stack is as follows:
[1] org.apache.calcite.runtime.SqlFunctions.internalToTimestamp (SqlFunctions.java:1,747)
[2] org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect (TimeWindowPropertyCollector.scala:53)
[3] org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply (IncrementalAggregateWindowFunction.scala:74)
[4] org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:72)
[5] org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:39)
[6] org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process (InternalSingleValueWindowFunction.java:46)
[7] org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents (WindowOperator.java:550)
[8] org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime (WindowOperator.java:505)
[9] org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime (HeapInternalTimerService.java:266)
[10] org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run (SystemProcessingTimeService.java:281)
[11] java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
[12] java.util.concurrent.FutureTask.run (FutureTask.java:266)
[13] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201 (ScheduledThreadPoolExecutor.java:180)
[14] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:293)
[15] java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1,142)
[16] java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617)
[17] java.lang.Thread.run (Thread.java:748)

Now we are at frame [1], org.apache.calcite.runtime.SqlFunctions.internalToTimestamp (SqlFunctions.java:1,747).

The code there is:
```java
public static Timestamp internalToTimestamp(long v) {
  return new Timestamp(v - LOCAL_TZ.getOffset(v));
}
```
Print the value of v when the method is called for windowStart:
print v
v = 1544074830000
Then print it again when the method is called for windowEnd:
print v
v = 1544074833000
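To see what this one-line method produces outside Flink, here is a minimal sketch that mimics it. It rests on three assumptions. First, Calcite's `LOCAL_TZ` is the JVM default time zone, although the sketch passes the zone in explicitly. Second, the zone is `Asia/Shanghai`. Third, the class and method names are illustrative and are not Calcite's.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

// Hypothetical demo class mimicking Calcite's internalToTimestamp.
// Assumption: Calcite's LOCAL_TZ is the JVM default zone; here it is a parameter.
class InternalToTimestampDemo {
    static Timestamp internalToTimestamp(long v, TimeZone localTz) {
        // Subtracts the zone's offset at v (8 h for Asia/Shanghai) from the epoch millis.
        return new Timestamp(v - localTz.getOffset(v));
    }

    public static void main(String[] args) {
        TimeZone shanghai = TimeZone.getTimeZone("Asia/Shanghai");
        // Timestamp.toString() formats in the JVM default zone, so pin it to China.
        TimeZone.setDefault(shanghai);
        long windowStart = 1544074830000L;
        long windowEnd = 1544074833000L;
        System.out.println(internalToTimestamp(windowStart, shanghai)); // 2018-12-06 05:40:30.0
        System.out.println(internalToTimestamp(windowEnd, shanghai));   // 2018-12-06 05:40:33.0
    }
}
```

The shifted instant is rendered in the China zone by `Timestamp.toString()`, which is why the 05:40 values appear in the output row.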

After this, execution returns to
[1] org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect (TimeWindowPropertyCollector.scala:51)

Then the following code is executed:
```scala
if (windowStartOffset.isDefined) {
  output.setField(
    lastFieldPos + windowStartOffset.get,
    SqlFunctions.internalToTimestamp(windowStart))
}

if (windowEndOffset.isDefined) {
  output.setField(
    lastFieldPos + windowEndOffset.get,
    SqlFunctions.internalToTimestamp(windowEnd))
}
```

Before this runs, the output row is:
output = "pro0,throwable0,ERROR,ip0,1,ymm-appmetric-dev-self1_5_924367729,null,null,null"
After it runs, the output row is:
output = "pro0,throwable0,ERROR,ip0,1,ymm-appmetric-dev-self1_5_924367729,2018-12-06 05:40:30.0,2018-12-06 05:40:33.0,null"

So, is it correct that the long value 1544074830000 is rendered as 2018-12-06 05:40:30.0, and the long value 1544074833000 as 2018-12-06 05:40:33.0?

I am in China (UTC+8), so I expect these timestamps to be 2018-12-06 13:40:30.0 and 2018-12-06 13:40:33.0.
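One quick way to check which reading is right is to format the raw window values with `java.time`, in both UTC and `Asia/Shanghai`. This is an illustrative sketch, and the class name is hypothetical.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Hypothetical helper: renders epoch millis as wall-clock time in a given zone.
class ExpectedWallClock {
    static LocalDateTime wallClock(long epochMillis, ZoneId zone) {
        return Instant.ofEpochMilli(epochMillis).atZone(zone).toLocalDateTime();
    }

    public static void main(String[] args) {
        ZoneId china = ZoneId.of("Asia/Shanghai");
        System.out.println(wallClock(1544074830000L, ZoneOffset.UTC)); // 2018-12-06T05:40:30
        System.out.println(wallClock(1544074830000L, china));          // 2018-12-06T13:40:30
        System.out.println(wallClock(1544074833000L, china));          // 2018-12-06T13:40:33
    }
}
```

The 05:40:30 that Flink printed is the UTC wall-clock time of the window start. The China wall-clock time of the same instant is 13:40:30.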

Okay, let's continue.

Next, the data is written to Kafka, and it is serialized before it is written.
Let's see what happens.

The call stack is as follows:
[1] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer._timestamp (DateSerializer.java:41)
[2] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer.serialize (DateSerializer.java:48)
[3] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.std.DateSerializer.serialize (DateSerializer.java:15)
[4] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue (DefaultSerializerProvider.java:130)
[5] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.writeValue (ObjectMapper.java:2,444)
[6] org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper.valueToTree (ObjectMapper.java:2,586)
[7] org.apache.flink.formats.json.JsonRowSerializationSchema.convert (JsonRowSerializationSchema.java:189)
[8] org.apache.flink.formats.json.JsonRowSerializationSchema.convertRow (JsonRowSerializationSchema.java:128)
[9] org.apache.flink.formats.json.JsonRowSerializationSchema.serialize (JsonRowSerializationSchema.java:102)
[10] org.apache.flink.formats.json.JsonRowSerializationSchema.serialize (JsonRowSerializationSchema.java:51)
[11] org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper.serializeValue (KeyedSerializationSchemaWrapper.java:46)
[12] org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010.invoke (FlinkKafkaProducer010.java:355)
[13] org.apache.flink.streaming.api.operators.StreamSink.processElement (StreamSink.java:56)
[14] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560)
[15] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535)
[16] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515)
[17] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679)
[18] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657)
[19] org.apache.flink.streaming.api.operators.StreamMap.processElement (StreamMap.java:41)
[20] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560)
[21] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535)
[22] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515)
[23] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679)
[24] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657)
[25] org.apache.flink.streaming.api.operators.TimestampedCollector.collect (TimestampedCollector.java:51)
[26] org.apache.flink.table.runtime.CRowWrappingCollector.collect (CRowWrappingCollector.scala:37)
[27] org.apache.flink.table.runtime.CRowWrappingCollector.collect (CRowWrappingCollector.scala:28)
[28] DataStreamCalcRule$88.processElement (null)
[29] org.apache.flink.table.runtime.CRowProcessRunner.processElement (CRowProcessRunner.scala:66)
[30] org.apache.flink.table.runtime.CRowProcessRunner.processElement (CRowProcessRunner.scala:35)
[31] org.apache.flink.streaming.api.operators.ProcessOperator.processElement (ProcessOperator.java:66)
[32] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator (OperatorChain.java:560)
[33] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:535)
[34] org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect (OperatorChain.java:515)
[35] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:679)
[36] org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect (AbstractStreamOperator.java:657)
[37] org.apache.flink.streaming.api.operators.TimestampedCollector.collect (TimestampedCollector.java:51)
[38] org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect (TimeWindowPropertyCollector.scala:65)
[39] org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply (IncrementalAggregateWindowFunction.scala:74)
[40] org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:72)
[41] org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply (IncrementalAggregateTimeWindowFunction.scala:39)
[42] org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process (InternalSingleValueWindowFunction.java:46)
[43] org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents (WindowOperator.java:550)
[44] org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime (WindowOperator.java:505)
[45] org.apache.flink.streaming.api.operators.HeapInternalTimerService.onProcessingTime (HeapInternalTimerService.java:266)
[46] org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run (SystemProcessingTimeService.java:281)
[47] java.util.concurrent.Executors$RunnableAdapter.call (Executors.java:511)
[48] java.util.concurrent.FutureTask.run (FutureTask.java:266)
[49] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201 (ScheduledThreadPoolExecutor.java:180)
[50] java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run (ScheduledThreadPoolExecutor.java:293)
[51] java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1,142)
[52] java.util.concurrent.ThreadPoolExecutor$Worker.run (ThreadPoolExecutor.java:617)
[53] java.lang.Thread.run (Thread.java:748)
The code at frame [1] is:
```java
protected long _timestamp(Date value) {
  return value == null ? 0L : value.getTime();
}
```

Taking windowEnd as an example, the value is:
value = "2018-12-06 05:40:33.0"
value.getTime() = 1544046033000

So the original value was 1544074833000, but the value that gets serialized is 1544046033000.

The difference is 28800000 ms, i.e. 8 hours, which is exactly the UTC+8 offset of my time zone (I am in China).
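The 8-hour loss can be reproduced in a few lines. The sketch first shifts the value the way `internalToTimestamp` does. It then reads the value back with a plain `getTime()`, as the quoted `_timestamp` does. The class and method names are hypothetical, and the zone is assumed to be `Asia/Shanghai`.

```java
import java.sql.Timestamp;
import java.util.Date;
import java.util.TimeZone;

// Hypothetical demo class: one side shifts by the zone offset, the other side does not undo it.
class OffsetLossDemo {
    // Same shift as internalToTimestamp: epoch millis minus the zone offset.
    static Timestamp shiftedTimestamp(long v, TimeZone tz) {
        return new Timestamp(v - tz.getOffset(v));
    }

    // Mirrors the quoted DateSerializer._timestamp: getTime() is read as-is.
    static long serializedMillis(Date value) {
        return value == null ? 0L : value.getTime();
    }

    public static void main(String[] args) {
        TimeZone china = TimeZone.getTimeZone("Asia/Shanghai");
        long windowEnd = 1544074833000L;
        long written = serializedMillis(shiftedTimestamp(windowEnd, china));
        System.out.println(written);             // 1544046033000
        System.out.println(windowEnd - written); // 28800000, i.e. 8 hours
    }
}
```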
Why? The key is SqlFunctions.internalToTimestamp:
```java
public static Timestamp internalToTimestamp(long v) {
  return new Timestamp(v - LOCAL_TZ.getOffset(v));
}
```

This code subtracts the LOCAL_TZ offset, and I think that subtraction is redundant here.

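Looking at it again, the root cause is that the time is converted back and forth by methods of two different classes rather than one. SqlFunctions subtracts the time-zone offset on the way in, and Jackson's DateSerializer reads getTime() without adding it back.

As a result the values get mixed up. If this is to be fixed, the change belongs in the SqlFunctions class.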
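One way to picture the root cause is to keep both directions of the conversion in one class. That way, whatever offset is subtracted on the way in is added back on the way out. The class below is hypothetical and is neither Calcite's nor Flink's actual code or fix. It only shows why a matched pair of conversions round-trips, while the mixed pair (Calcite in, Jackson `getTime()` out) does not.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

// Hypothetical helper: both conversion directions live in one class and use the same zone rule.
class SymmetricTimeConversion {
    private final TimeZone tz;

    SymmetricTimeConversion(TimeZone tz) {
        this.tz = tz;
    }

    // Same shift as internalToTimestamp: subtract the zone offset.
    Timestamp toTimestamp(long v) {
        return new Timestamp(v - tz.getOffset(v));
    }

    // Inverse: add the offset back. This assumes the offset is the same at both instants
    // (true for Asia/Shanghai); in DST zones it can be off by an hour near a transition.
    long fromTimestamp(Timestamp ts) {
        long t = ts.getTime();
        return t + tz.getOffset(t);
    }

    public static void main(String[] args) {
        SymmetricTimeConversion conv = new SymmetricTimeConversion(TimeZone.getTimeZone("Asia/Shanghai"));
        long windowEnd = 1544074833000L;
        System.out.println(conv.fromTimestamp(conv.toTimestamp(windowEnd)) == windowEnd); // true
    }
}
```

Dropping the offset subtraction entirely, as suggested above, would also make `getTime()` return the original value. Which option fits depends on how other Calcite code interprets the internal long, and this sketch does not check that.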

© Copyright belongs to the author.
