在流处理中,时间是一个十分核心的概念,是整个系统的基石。例如,我们常常会碰到这样的需求:给定一个时间窗口,例如一个小时,统计时间窗口的内数据指标。那怎么划分什么数据将步入这个窗口呢?在窗口的定义之前,首先须要确定一个应用使用哪些样的时间语义。
本文将介绍Flink的EventTime、ProcessingTime和IngestionTime三种时间语义,接着会详尽介绍EventTime和Watermark的工作机制,以及怎样对数据流设置EventTime并生成Watermark。
Flink的三种时间语义
Flink的三种时间语义
如上图所示,Flink支持三种时间语义:
EventTime
EventTime指的是数据流中每位元素或则每位风波自带的时间属性,通常是风波发生的时间。因为风波从发生到步入Flink时间算子之间有好多环节,一个较早发生的风波由于延后可能较晚抵达,因而使用EventTime意味着风波抵达有可能是正序的。
使用EventTime时,最理想的情况下,我们可以仍然等待所有的风波抵达后再进行时间窗口的处理。假定一个时间窗口内的所有数据都早已抵达,基于EventTime的流处理会得到正确且一致的结果:无论我们是将同一个程序布署在不同的估算环境还是在相同的环境下多次估算同一份数据,都还能得到同样的估算结果。我们根本不同害怕正序抵达的问题。但这只是理想情况,现实中难以实现,由于我们既不晓得到底要等多长时间能够确认所有风波都早已抵达,更不可能无限地仍然等待下去。在实际应用中,当涉及到对风波依照时间窗口进行统计时,Flink会将窗口内的风波缓存出来,直至接收到一个Watermark,以确认不会有更晚数据的抵达。Watermark意味着在一个时间窗口下,Flink会等待一个有限的时间,这在一定程度上减少了估算结果的绝对确切性,并且降低了系统的延后。在流处理领域,比起其他几种时间语义,使用EventTime的用处是某个风波的时间是确定的,这样才能保证估算结果在一定程度上的可预测性。
一个基于EventTime的Flink程序中必须定义EventTime,以及怎样生成Watermark。我们可以使用元素中自带的时间,也可以在元素抵达Flink后人为给EventTime形参。
使用EventTime的优势是结果的可预测性,缺点是缓存较大,降低了延后,且调试和定位问题更复杂。
ProcessingTime
对于某个算子来说如何安装linux,ProcessingTime指算子使用当前机器的系统时钟来定义时间。在ProcessingTime的时间窗口场景下,无论风波哪些时侯发生,只要该风波在某个时间段达到了某个算子,都会被归结到该窗口下,不须要Watermark机制。对于一个程序在同一个估算环境来说,每位算子都有一定的历时,同一个风波的ProcessingTime,第n个算子和第n+1个算子不同。假如一个程序在不同的集群和环境下执行时,限于软硬件诱因,不同环境下前序算子处理速率不同,对于下游算子来说,风波的ProcessingTime也会不同,不同环境下时间窗口的估算结果会发生变化。为此,ProcessingTime在时间窗口下的估算会有不确定性。
ProcessingTime只依赖当前执行机器的系统时钟,不须要依赖Watermark,无需缓存。ProcessingTime是实现上去十分简单也是延后最小的一种时间语义。
IngestionTime
IngestionTime是风波抵达FlinkSouce的时间。从Source到下游各个算子中间可能有好多估算环节,任何一个算子的处理速率快慢可能影响到下游算子的ProcessingTime。而IngestionTime定义的是数据流最早步入Flink的时间,因而不会被算子处理速率影响。
IngestionTime一般是EventTime和ProcessingTime之间的一个折中方案。比起EventTime,IngestionTime可以不须要设置复杂的Watermark,因而也不须要太多缓存,延后较低。比起ProcessingTime,IngestionTime的时间是Souce形参的,一个风波在整个处理过程从头至尾都使用这个时间,但是后续算子不受前序算子处理速率的影响,估算结果相对确切一些,但估算成本稍高。
设置时间语义
在Flink中,我们须要在执行环境层面设置使用哪种时间语义。下边的代码使用EventTime:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
假如想用另外两种时间语义,须要替换为:TimeCharacteristic.ProcessingTime和TimeCharacteristic.IngestionTime。
EventTime和Watermark
Flink的三种时间语义中,ProcessingTime和IngestionTime都可以不用设置Watermark。假如我们要使用EventTime语义,以下两项配置缺一不可:第一,使用一个时间戳为数据流中每位风波的EventTime形参;第二,生成Watermark。
实际上,EventTime是每位风波的元数据,Flink并不晓得每位风波的发生时间是哪些,我们必需要为每位风波的EventTime形参一个时间戳。关于时间戳,包括Flink在内的绝大多数系统都支持Unix时间戳系统(Unixtime或Unixepoch)。Unix时间戳系统以1970-01-0100:00:00.000为起始点,其他时间记为距离该起始时间的整数差值,通常是微秒(millisecond)精度。
有了EventTime时间戳,我们还必须生成Watermark。Watermark是Flink插入到数据流中的一种特殊的数据结构,它包含一个时间戳,并假定后续不会有大于该时间戳的数据。右图展示了一个正序数据流,其中方框是单个风波,方框中的数字是其对应的EventTime时间戳,圆圈为Watermark,圆圈中的数字为Watermark对应的时间戳。
Watermark的生成有以下几点须要注意:
分布式环境下Watermark的传播
在实际估算过程中,Flink的算子通常分布在多个并行的分区(或则称为实例)上,Flink须要将Watermark在并行环境下往前传播。如右图所示,Flink的每位并行算子子任务会维护针对该子任务的EventTime时钟,这个时钟记录了这个算子子任务Watermark处理进度,随着上游Watermark数据不断向上发送,算子子任务的EventTime时钟也要不断往前更新。因为上游各分区的处理速率不同,抵达当前算子的Watermark也会有先后快慢之分,每位算子子任务会维护来自上游不同分区的Watermark信息,这是一个列表,列表内对应上游算子各分区的Watermark时间戳等信息。
当上游某分区有Watermark步入该算子子任务后,Flink先判定新流入的Watermark时间戳是否小于PartitionWatermark列表内记录的该分区的历史Watermark时间戳,假如新流入的更大,则更新该分区的Watermark。比如,某个分区新流入的Watermark时间戳为4,算子子任务维护的该分区Watermark为1,这么Flink会更新PartitionWatermark列表为最新的时间戳4。接着,Flink会遍历PartitionWatermark列表中的所有时间戳,选择最小的一个作为该算子子任务的EventTime。同时,Flink会将更新的EventTime作为Watermark发送给下游所有算子子任务。算子子任务EventTime的更新意味着该子任务将时间推动到了这个时间,该时间之前的风波早已被处理并发送到下游。诸如,图中第二步和第三步,PartitionWatermark列表更新后,造成列表中最小时间戳发生了变化,算子子任务的EventTime时钟也相应进行了更新。整个过程完成了数据流中的Watermark推进算子子任务Watermark的时钟更新过程。Watermark像一个幕后推进者,不断将流处理系统的EventTime往前推动。我们可以将这些机制总结为:
Flink某算子子任务按照各上游流入的Watermark来更新PartitionWatermark列表。选定PartitionWatermark列表中最小的时间作为该算子的EventTimelinux makefile,并将这个时间发送给下游算子。
这样的设计机制满足了并行环境下Watermark在各算子中的传播问题,并且如果某个上游分区的Watermark仍然不更新,PartitionWatermark列表其他地方都在正常更新,惟独某些分区的时间逗留在很早的某个时间,这会造成算子的EventTime时钟不更新,相应的时间窗口估算也不会被触发,大量的数据积压在算子内部得不到处理,整个流处理处于空转状态。这些问题可能出现在使用数据流自带的Watermark,自带的Watermark在个别分区下没有及时更新。针对这些问题,一种解决办法是按照机器当前的时钟周期性地生成Watermark。
据悉,在union等多数据流处理时,Flink也使用上述Watermark更新机制,那就意味着,多个数据流的时间必须对齐,假如一方的Watermark时间较老,那整个应用的EventTime时钟也会使用这个较老的时间,其他数据流的数据会被积压。一旦发觉某个数据流不再生成新的Watermark,我们要在SourceFunction中的SourceContext里调用markAsTemporarilyIdle设置该数据流为空闲状态。
抽取时间戳及生成Watermark
至此,我们早已了解了Flink的EventTime时间戳和Watermark机制的大致工作原理,接出来我们将展示怎样在代码层面设置时间戳并生成Watermark。其实,对时间和Watermark的设置只对EventTime时间语义起作用,假若一个作业基于ProcessingTime或IngestionTime,那时间的设置没有哪些意义。由于时间在后续处理中还会用到,时间的设置要在任何时间窗口操作之前,其实,时间越早设置越好。Flink提供了以下方式设置时间戳和Watermark:
Source
我们可以在Source阶段,通过自定义SourceFunction或RichSourceFunction,在SourceContext里重画voidcollectWithTimestamp(Telement,longtimestamp)和voidemitWatermark(Watermarkmark)两个方式,其中,collectWithTimestamp给数据流中的每位元素T形参一个timestamp作为EventTime,emitWatermark生成Watermark。下边的代码展示了使用Scala调用这两个方式抽取时间戳并生成Watermark。
case class MyType(data: Double, eventTime: Long, hasWatermark:Boolean, watermarkTime: Long)
class MySource extends RichSourceFunction[MyType] {
override def run(ctx: SourceContext[MyType]): Unit = {
while (/* condition */) {
val next: MyType = getNext()
ctx.collectWithTimestamp(next, next.eventTimestamp)
if (next.hasWatermark) {
ctx.emitWatermark(new Watermark(next.watermarkTime))
}
}
}
}
在Source以后通过TimestampAssigner设置
假如我们不想更改Source,也可以在Source以后,通过时间戳指定器(TimestampAssigner)来设置。TimestampAssigner是一个在DataStream[T]上调用的算子,它会给数据流生成时间戳和Watermark,但不改变数据流的类型T。例如,我们可以在Source以后,先过滤掉不须要的内容,之后设置时间戳和Watermark。下边的代码展示了使用TimestampAssigner的大致流程。
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 使用EventTime时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream: DataStream[MyType] = env.addSource(...)
// 先过滤不需要的内容,然后设置Timestamp和Watermark。
val withTimestampsAndWatermarks: DataStream[MyType] = stream
.filter( item => "ERROR".equals(item.info) )
// 我们要实现一个MyTimestampsAndWatermarks,MyTimestampsAndWatermarks继承并实现了TimestampAssigner,告知Flink如何抽取时间戳并生成Watermark。
.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
withTimestampsAndWatermarks
.keyBy(...)
.timeWindow(Time.seconds(10))
.reduce( (a, b) => a.add(b) )
.addSink(...)
MyTimestampsAndWatermarks须要承继并实现TimestampAssigner。TimestampAssigner是一个函数式插口类unix时间戳在线转换工具,它的源码如下:
public interface TimestampAssigner extends Function {
long extractTimestamp(T element, long previousElementTimestamp);
}
extractTimestamp方式为数据流中的每位元素T的EventTime形参。
TimestampAssigner主要有两种实现方法,一种是周期性地(Periodic)生成Watermark,一种是挨个式地(Punctuated)生成Watermark。假如同时也在Source阶段设置了时间戳,那使用这些方法设置的时间戳和Watermark会将Source阶段的设置覆盖。
AssignerWithPeriodicWatermarks
AssignerWithPeriodicWatermarks是一个承继了TimestampAssigner的插口类:
public interface AssignerWithPeriodicWatermarks extends TimestampAssigner {
Watermark getCurrentWatermark();
}
它可以周期性地生成Watermark,其中,这个周期是可以设置的,默认情况下是每200微秒生成一个Watermark,或则说Flink每200微秒调用一次getCurrentWatermark技巧。我们可以在执行环境中设置这个周期:
// 每5000毫秒生成一个Watermark
env.getConfig.setAutoWatermarkInterval(5000L)
下边的代码具体实现了AssignerWithPeriodicWatermarksunix时间戳在线转换工具,它抽取元素中的第二个数组为EventTime,每次抽取完时间戳后,更新时间戳最大值,之后以时间戳最大值慢1分钟的时间作为Watermark发送出去。
input.assignTimestampsAndWatermarks(new MyPeriodicAssigner)
// 假设数据流的元素有两个字段(String, Long),其中第二个字段是该元素的时间戳
class MyPeriodicAssigner extends AssignerWithPeriodicWatermarks[(String, Long)] {
val bound: Long = 60 * 1000 // 1分钟
var maxTs: Long = Long.MinValue // 已抽取的timestamp最大值
override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {
// 更新maxTs为当前遇到的最大值
maxTs = maxTs.max(element._2)
// 使用第二个字段作为这个元素的Event Time
element._2
}
override def getCurrentWatermark: Watermark = {
// Watermark比Timestamp最大值慢1分钟
val watermark = new Watermark(maxTs - bound)
watermark
}
}
里面的代码假定了Watermark比已流入数据中时间戳最大者慢1分钟,超过1分钟的将被视为迟到数据。考虑到这些场景比较普遍,Flink早已帮我们封装好了这样的代码,名为BoundedOutOfOrdernessTimestampExtractor,其内部实现与前面的代码几乎一致,我们只须要将最大的延后时间作为参数传入。
val boundedOutOfOrder = input.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[(String, Long)](Time.minutes(1)) {
override def extractTimestamp(element: (String, Long)): Long = {
element._2
}
})
AssignerWithPunctuatedWatermarks
public interface AssignerWithPunctuatedWatermarks extends TimestampAssigner {
Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
}
这些方法对数据流中的每位元素挨个进行检测,假如数据流的元素中有一些特殊标记,我们要在checkAndGetNextWatermark方式中加以判定,并生成Watermark。checkAndGetNextWatermark方式会在extractTimestamp方式然后调用。
// 数据流有三个字段 第二个字段是时间戳,第三个字段判断是否为Watermark的标记
class MyPunctuatedAssigner extends AssignerWithPunctuatedWatermarks[(String, Long, Boolean)] {
override def extractTimestamp(element: (String, Long, Boolean), previousElementTimestamp: Long): Long = {
element._2
}
override def checkAndGetNextWatermark(element: (String, Long, Boolean), extractedTimestamp: Long): Watermark = {
if (element._3)
new Watermark(extractedTimestamp)
else
null
}
}
里面的代码中,假定数据流有三个数组,第二个数组是EventTime时间戳,第三个数组标记是否是Watermark。checkAndGetNextWatermark对每位元素进行检测,判定是否须要生成新的Watermark。
平衡延后和确切性
至此,我们早已了解了Flink的EventTime和Watermark生成方式,这么具体怎么操作呢?实际上,这个问题可能并没有一个标准答案。批处理中,数据都早已打算好了,不须要考虑未来新流入的数据,而流处理中,我们没法完全预知有多少迟到数据,数据的流入依赖业务的场景、数据的输入、网络的传输、集群的性能等等。Watermark是一种在延后和确切性之间平衡的策略:Watermark与风波的时间戳贴合较紧,一些重要数据将被当作迟到数据,影响估算结果的确切性;Watermark设置得较松,整个应用的延后降低,更多的数据会先缓存上去以等待估算,会降低显存的压力。对待具体的业务场景,我们可能须要反复尝试,通过一些监控手段来不断迭代和调整时间策略。