之前Flink的Watermark原理老是不明白,并且在CSDN上找的一篇文章,似乎是因为版本的问题,两年前的博客,代码验证下来始终有问题,在网上和人谈论之后,重新用代码验证了,才有点清晰明了,在此记录一下。
WaterMark
实时计算中,数据时间比较敏感。有eventTime
和processTime
区分,一般来说eventTime
是从原始的消息中提取过来的,processTime
是Flink
自己提供的,Flink
中一个亮点就是可以基于eventTime
计算,这个功能很有用,因为实时数据可能会经过比较长的链路,多少会有延时,并且有很大的不确定性,对于一些需要精确体现事件变化趋势的场景中,单纯使用processTime
显然是不合理的。
概念
watermark
是一种衡量Event Time
进展的机制,它是数据本身的一个隐藏属性。通常基于Event Time的数据,自身都包含一个timestamp.watermark
是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark
机制结合window
来实现。
流处理从事件产生,到流经source
,再到operator
,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order
或者说late element
)。
但是对于late element
,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window
去进行计算了。这个特别的机制,就是watermark
。
window划分
window
的设定无关数据本身,而是系统定义好了的。window
是flink
中划分数据一个基本单位,window
的划分方式是固定的,默认会根据自然时间划分window
,并且划分方式是前闭后开。
window划分 | w1 | w2 | w3 |
---|---|---|---|
3s | [00:00:00~00:00:03) | [00:00:03~00:00:06) | [00:00:06~00:00:09) |
5s | [00:00:00~00:00:05) | [00:00:05~00:00:10) | [00:00:10~00:00:15) |
10s | [00:00:00~00:00:10) | [00:00:10~00:00:20) | [00:00:20~00:00:30) |
1min | [00:00:00~00:01:00) | [00:01:00~00:02:00) | [00:02:00~00:03:00) |
示例
如果设置最大允许的乱序时间是10s,滚动时间窗口为5s
1 | {"datetime":"2019-03-26 16:25:24","name":"zhangsan"} |
触达改记录的时间窗口应该为2019-03-26 16:25:20~2019-03-26 16:25:25
即当有数据eventTime >= 2019-03-26 16:25:35 时
1 | {"datetime":"2019-03-26 16:25:35","name":"zhangsan"} |
提取watermark
watermark的提取工作在taskManager中完成,意味着这项工作是并行进行的的,而watermark是一个全局的概念,就是一个整个Flink作业之后一个warkermark。
AssignerWithPeriodicWatermarks
定时提取watermark,这种方式会定时提取更新wartermark。
1 | //默认200ms |
AssignerWithPunctuatedWatermarks
伴随event的到来就提取watermark,就是每一个event到来的时候,就会提取一次Watermark。
这样的方式当然设置watermark更为精准,但是当数据量大的时候,频繁的更新wartermark会比较影响性能。
通常情况下采用定时提取就足够了。
使用
设置数据流时间特征
1 | //设置为事件时间 |
默认为TimeCharacteristic.ProcessingTime
,默认水位线更新每隔200ms
入口文件
1 | val env = StreamExecutionEnvironment.getExecutionEnvironment |
1 | class MyWaterMark extends AssignerWithPeriodicWatermarks[EventObj] { |
代码详解
- 设置为事件时间
- 接受本地socket数据
- 抽取timestamp生成watermark,打印(线程id,key,eventTime,currentMaxTimestamp,watermark)
- event time每隔3秒触发一次窗口,打印(key,窗口内元素个数,窗口内最早元素的时间,窗口内最晚元素的时间,窗口自身开始时间,窗口自身结束时间)
试验
第一次
数据
1 | {"datetime":"2019-03-26 16:25:24","name":"zhangsan"} |
输出
1 | |currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:24],currentMaxTimestamp:[2019-03-26 16:25:24],watermark:[2019-03-26 16:25:14] |
汇总
Key | EventTime | currentMaxTimestamp | Watermark |
---|---|---|---|
zhangsan | 2019-03-26 16:25:24 | 2019-03-26 16:25:24 | 2019-03-26 16:25:14 |
第二次
数据
1 | {"datetime":"2019-03-26 16:25:27","name":"zhangsan"} |
输出
1 | currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:27],currentMaxTimestamp:[2019-03-26 16:25:27],watermark:[2019-03-26 16:25:17] |
随着EventTime的升高,Watermark升高。
汇总
Key | EventTime | currentMaxTimestamp | Watermark |
---|---|---|---|
zhangsan | 2019-03-26 16:25:24 | 2019-03-26 16:25:24 | 2019-03-26 16:25:14 |
zhangsan | 2019-03-26 16:25:27 | 2019-03-26 16:25:27 | 2019-03-26 16:25:17 |
第三次
数据
1 | {"datetime":"2019-03-26 16:25:34","name":"zhangsan"} |
输出
1 | currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:34],currentMaxTimestamp:[2019-03-26 16:25:34],watermark:[2019-03-26 16:25:24] |
到这里,window仍然没有被触发,此时watermark的时间已经等于了第一条数据的Event Time了。
汇总
Key | EventTime | currentMaxTimestamp | Watermark |
---|---|---|---|
zhangsan | 2019-03-26 16:25:24 | 2019-03-26 16:25:24 | 2019-03-26 16:25:14 |
zhangsan | 2019-03-26 16:25:27 | 2019-03-26 16:25:27 | 2019-03-26 16:25:17 |
zhangsan | 2019-03-26 16:25:34 | 2019-03-26 16:25:34 | 2019-03-26 16:25:24 |
第四次
数据
1 | {"datetime":"2019-03-26 16:25:35","name":"zhangsan"} |
输出
1 | currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:35],currentMaxTimestamp:[2019-03-26 16:25:35],watermark:[2019-03-26 16:25:25](zhangsan,1,2019-03-26 16:25:24,2019-03-26 16:25:24,2019-03-26 16:25:20,2019-03-26 16:25:25) |
输出
1 | currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:35],currentMaxTimestamp:[2019-03-26 16:25:35],watermark:[2019-03-26 16:25:25](zhangsan,1,2019-03-26 16:25:24,2019-03-26 16:25:24,2019-03-26 16:25:20,2019-03-26 16:25:25) |
直接证明了window的设定无关数据本身,而是系统定义好了的。
输入的数据中,根据自身的Event Time,将数据划分到不同的window中,如果window中有数据,则当watermark时间>=Event Time时,就符合了window触发的条件了,最终决定window触发,还是由数据本身的Event Time所属的window中的window_end_time决定。
当最后一条数据16:25:35到达是,Watermark提升到16:25:25,此时窗口16:25:20~16:25:25中有数据,Window被触发。
汇总
Key | EventTime | currentMaxTimestamp | Watermark | WindowStartTime | WindowEndTime |
---|---|---|---|---|---|
zhangsan | 2019-03-26 16:25:24 | 2019-03-26 16:25:24 | 2019-03-26 16:25:14 | ||
zhangsan | 2019-03-26 16:25:27 | 2019-03-26 16:25:27 | 2019-03-26 16:25:17 | ||
zhangsan | 2019-03-26 16:25:34 | 2019-03-26 16:25:34 | 2019-03-26 16:25:24 | ||
zhangsan | 2019-03-26 16:25:35 | 2019-03-26 16:25:35 | 2019-03-26 16:25:25 | [2019-03-26 16:25:20 | 2019-03-26 16:25:25) |
第五次
数据
1 | {"datetime":"2019-03-26 16:25:37","name":"zhangsan"} |
输出
1 | currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:37],currentMaxTimestamp:[2019-03-26 16:25:37],watermark:[2019-03-26 16:25:27] |
此时,watermark时间虽然已经达到了第二条数据的时间,但是由于其没有达到第二条数据所在window的结束时间,所以window并没有被触发。
第二条数据所在的window时间是:[2019-03-26 16:25:25,2019-03-26 16:25:30)
汇总
Key | EventTime | currentMaxTimestamp | Watermark | WindowStartTime | WindowEndTime |
---|---|---|---|---|---|
zhangsan | 2019-03-26 16:25:24 | 2019-03-26 16:25:24 | 2019-03-26 16:25:14 | ||
zhangsan | 2019-03-26 16:25:27 | 2019-03-26 16:25:27 | 2019-03-26 16:25:17 | ||
zhangsan | 2019-03-26 16:25:34 | 2019-03-26 16:25:34 | 2019-03-26 16:25:24 | ||
zhangsan | 2019-03-26 16:25:35 | 2019-03-26 16:25:35 | 2019-03-26 16:25:25 | [2019-03-26 16:25:20 | 2019-03-26 16:25:25) |
zhangsan | 2019-03-26 16:25:37 | 2019-03-26 16:25:37 | 2019-03-26 16:25:27 |
第六次
数据
1 | {"datetime":"2019-03-26 16:25:40","name":"zhangsan"} |
输出
1 | currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:40],currentMaxTimestamp:[2019-03-26 16:25:40],watermark:[2019-03-26 16:25:30](zhangsan,1,2019-03-26 16:25:27,2019-03-26 16:25:27,2019-03-26 16:25:25,2019-03-26 16:25:30) |
Key | EventTime | currentMaxTimestamp | Watermark | WindowStartTime | WindowEndTime |
---|---|---|---|---|---|
zhangsan | 2019-03-26 16:25:24 | 2019-03-26 16:25:24 | 2019-03-26 16:25:14 | ||
zhangsan | 2019-03-26 16:25:27 | 2019-03-26 16:25:27 | 2019-03-26 16:25:17 | ||
zhangsan | 2019-03-26 16:25:34 | 2019-03-26 16:25:34 | 2019-03-26 16:25:24 | ||
zhangsan | 2019-03-26 16:25:35 | 2019-03-26 16:25:35 | 2019-03-26 16:25:25 | [2019-03-26 16:25:20 | 2019-03-26 16:25:25) |
zhangsan | 2019-03-26 16:25:37 | 2019-03-26 16:25:37 | 2019-03-26 16:25:27 | ||
zhangsan | 2019-03-26 16:25:40 | 2019-03-26 16:25:40 | 2019-03-26 16:25:30 | [2019-03-26 16:25:25 | 2019-03-26 16:25:30) |
结论
window的触发要符合以下几个条件:
- watermark时间 >= window_end_time
- 在[window_start_time,window_end_time)中有数据存在
同时满足了以上2个条件,window才会触发。
watermark是一个全局的值,不是某一个key下的值,所以即使不是同一个key的数据,其warmark也会增加.
多并行度
总结
Flink如何处理乱序?
watermark+window机制。window中可以对input进行按照Event Time排序,使得完全按照Event Time发生的顺序去处理数据,以达到处理乱序数据的目的。
Flink何时触发window?
对于late element太多的数据而言
- Event Time < watermark时间
对于out-of-order以及正常的数据而言
- watermark时间 >= window_end_time
- 在[window_start_time,window_end_time)中有数据存在
Flink应该如何设置最大乱序时间?
结合自己的业务以及数据情况去设置。