网站建设知识
Apache Flink源码解析之stream-operator
2025-07-22 09:59  点击:0

前面我们谈论了Flink stream中的transformation。你可以将transformation看成编写Flink程序并构建流式处理程序的必要组成部分(静态表现形式);而本篇我们将探讨transformation在Flink运行时对应的动态表现形式——operator。他们之间的映射关系见下图:

具体的探讨可以查看前文:Flink中的一些核心概念

StreamOperator

所有operator的最终基类,operator的分类方式,按照输入流个数不同分为:

无输入:StreamSource 单个流输入:oneInputStreamOperator 两个流输入:TwoInputStreamOperator

跟生命周期有关的核心抽象方法:

setup : 实例化operator open :该方法会在任何元素被处理之前执行,它的实现通常包含了operator的初始化逻辑 close :该方法在所有的元素都进入到operator被处理之后调用 dispose :该方法在operator生命周期的最后阶段执行,主要用于回收资源

StreamOperator及其实现类中还包含了一些状态恢复与保存相关的逻辑,但这些不是本文的主题,所有暂时不做探讨。

先来看一下整个package的类关系图:

我们整个剖析方式大致也按照以上operator的分类方式以及类的层次结构来。

StreamSource

作为一个流处理DAG的起点,source operator相比其他operator无疑是特别的(从类的继承关系图也可以看出来)。

它需要接受SourceFunction的实例。并且我们可以看到,它的chaining strategy是HEAD(它表示operator不能有前置operator,但可以作为其他operator的前置operator,下文会谈到)。

this.chainingStrategy = ChainingStrategy.HEAD;

StreamSource的实现略显复杂,因为它涉及到我们前面文章谈SourceFunction时谈到的SourceFunction.SourceContext的实现。在这里提供了三个实现,分别对应我们之前谈到的Flink对事件时间的三个分类:

NonTimestampContext:针对ProcessingTime,该SourceContext将时间戳设置为-1,并且不发射watermark AutomaticWatermarkContext:针对IngestionTime,提供自动的watermark发射机制的SourceContext ManualWatermarkContext:针对EventTime的人工发射watermark的SourceContext

它们之间的对应关系也体现在其run方法的实现中:

switch (timeCharacteristic) {

case EventTime:

ctx = new ManualWatermarkContext<>(this, lockingObject, collector);

break;

case IngestionTime:

ctx = new AutomaticWatermarkContext<>(this, lockingObject, collector,

getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval());

break;

case ProcessingTime:

ctx = new NonTimestampContext<>(this, lockingObject, collector);

break;

default:

throw new Exception(String.valueOf(timeCharacteristic));

}

在run方法内部会调用SourceFunction的run方法:

try {

userFunction.run(ctx);

if (!isCanceledOrStopped()) {

ctx.emitWatermark(Watermark.MAX_WATERMARK);

}

} finally {

ctx.close();

}

StreamSource通过一个属性:canceledOrStopped来控制sourceFunction的停止。

整个StreamSource的运行逻辑由run来表述,通过cancel来控制停止逻辑。

NonTimestampContext

NonTimestampContext会忽略时间戳,因此它的实现里稍微特别一点的地方在下面的这两个方法:

public void collectWithTimestamp(T element, long timestamp) {

collect(element);

}

以及

public void emitWatermark(Watermark mark) {

owner.checkAsyncException();

}

第一个方法忽略了时间戳,第二个方法不发送watermark。

ManualWatermarkContext

无需特别说明

AutomaticWatermarkContext

该类是自动发送watermark的实现,在构造器中接收参数watermarkInterval来指定自动发送watermark的时间间隔。具体的实现机制是,新建一个独立的发射线程,以指定的时间间隔发射:

this.scheduleExecutor = Executors.newScheduledThreadPool(1);

this.watermarkTimer = scheduleExecutor.scheduleAtFixedRate(new Runnable() {

@Override

public void run() {

final long currentTime = System.currentTimeMillis();

if (currentTime > nextWatermarkTime) {

final long watermarkTime = currentTime - (currentTime % watermarkInterval);

synchronized (lockingObjectParam) {

if (currentTime > nextWatermarkTime) {

outputParam.emitWatermark(new Watermark(watermarkTime));

nextWatermarkTime += watermarkInterval;

}

}

}

}

}, 0, watermarkInterval, TimeUnit.MILLISECONDS);

除了这种基于时间的以固定频率发射watermark的机制,在collect方法被调用时,也会检查当前的时间戳,如果达到发送条件也会触发emit watermark。

而因为该类实现的是自动发送,在构造器中实现一个定时发送机制,所以emitWatermark方法也就不需要再实现发送逻辑(因为已不再需要用户程序调用emitWatermark方法了),而该方法在该类中的主要任务是负责停止自动发送。停止自动发送的触发条件是收到最后一个元素的信号(将最后一个元素的时间戳设置为Long.MAX_VALUE),emitWatermark收到该标识后,再将其往下游传递并关闭定时发送线程。

OneInputStreamOperator

单一输入流的operator接口,继承自StreamOperator。提供了两个接口方法:

processElement:处理到达该operator的一个元素 processWatermark:处理一个Watermark

TwoInputStreamOperator

支持两个流作为输入的operator,同样继承自StreamOperator。扩充了多个接口方法:

processElement1 : 处理来自第一个输入的某个元素 processElement2 : 处理来自第二个输入的某个元素 processWatermark1 : 处理来自第一个输入的一个Watermark processWatermark2 : 处理来自第二个输入的一个Watermark

辅助实现类

Output

Collector的扩展,增加了发射WaterMark的功能。该接口主要供operator用于发射元素或者WaterMark。

emitWatermark : 该发射WaterMark将广播给下游的所有operator

TimeCharacteristic

Flink在涉及到时间相关的处理时,将时间划分为三类。而时间类型的定义在Flink中就是用该枚举来表示:

ProcessingTime IngestionTime EventTime

这三种时间类型之前我们曾多次提及,这里不再啰嗦

TimestampedCollector

Output的包装器实现,它用于给元素设置时间戳

AbstractStreamOperator

该抽象类为实现一个具体的operator提供基本的支持,Flink内置提供的operator全部都直接或间接继承自AbstractStreamOperator。

它内部包含了三大类的属性:

配置属性 运行时属性 键值对状态属性

大都数方法都是辅助方法,值得一提的是setup方法。从这里我们可以看到所有operator标识符的生成方式:

String operatorIdentifier = getClass().getSimpleName() + "_" + config.getVertexID() + "_" + runtimeContext.getIndexOfThisSubtask();

可以看到标识是由”_”间隔的三段拼接而成。三段分别是:类名,vertex id,以及当前subtask的索引。

然后基于此标识,创建了用于存储状态的stateBackend:

stateBackend = container.createStateBackend(operatorIdentifier, keySerializer);

stateBackend 在 dispose方法中会被关闭。

AbstractStreamOperator并没有对open/close等生命周期方法提供具体的实现,这些方法的具体实现被后延至后面谈到的AbstractUdfStreamOperator中。

AbstractUdfStreamOperator

该类主要针对operator生命周期相关的方法(open/close/dispose)提供了模板实现。而这些实现都统一针对用户定义的Function的实例(简称udf)。

ChainingStrategy

该枚举定义了operator的chain strategy(链接策略)。当一个operator链接到其前置operator时,意味着它们将在同一个线程上执行。StreamOperator的默认值是HEAD,这意味着它将没有前置operator,不过它有可能成为其他operator的前置operator。大部分StreamOperator将该枚举以ALWAYS覆盖,表示它们将链接到一个前置operator。

它的三个枚举值:

ALWAYS :上面已经提到过,它允许将当前operator链接到某前置operator,这是提升性能的良好实践,它能够提升operator的并行度 NEVER :该策略不支持operator被链接到某前置operator也不支持被作为其他operator的前置operator。 HEAD :该策略表示operator没有前置operator,不过可以作为其他operator的chain header

内置的Operator实现

StreamCounter

元素累加器,没有什么特别的

StreamProject

这里需要解释一下,此处的project,并非通常所指的项目的意思,而是投射、投影的意思。你可以将其类比于SQL中的SELECT子句。因此他允许你选择你需要的fields集合。这通过其构造器的一个字段索引数组来指定:

在processElement方法中,它依次遍历所有需要的字段索引,将元素中需要的字段提取出来,放入一个用于输出的outTuple,最后再将其发射出去:

public void processElement(StreamRecord element) throws Exception {

for (int i = 0; i < this.numFields; i++) {

outTuple.setField(((Tuple) element.getValue()).getField(fields[i]), i);

}

output.collect(element.replace(outTuple));

}

StreamFilter

filter operator,处理逻辑很简单,根据自定义的FilterFunction方法,对每个元素进行过滤,如果满足过滤条件,则将该元素emit出去。

StreamMap

map operator,根据传入的MapFunction,对每个元素应用map操作后将其发射出去。

StreamFlatMap

flatmap operator接收FlatMapFunction函数,有一些特别之处:在其open方法中,它初始化了一个TimestampedCollector,作为传递给FlatMapFunction的collector,该collector是给那些特定的userFunction使用的,并且用于给他们操作的元素设置时间戳。

StreamGroupedFold

分组的fold operator,fold函数的执行依赖于一个初始化值initialValue。因此这里涉及到状态保存。并且状态是跟具体的分区关联的。因此,在open方法的实现中,需要获得跟分区关联的ValueState:

ValueStateDescriptor stateId = new ValueStateDescriptor<>(STATE_NAME, outTypeSerializer, null);

values = getPartitionedState(stateId);

processElement方法的实现,涉及到一系列的操作:从ValueState中获取数据,作为“新”的初始值跟当前元素一起进行fold函数运算,获得结果后更新ValueState,然后将获得的结果emit出去。

StreamGroupedReduce

按分组进行reduce操作的operator.

基于特定的状态名称:

private static final String STATE_NAME = "_op_state";

构建状态id

ValueStateDescriptor stateId = new ValueStateDescriptor<>(STATE_NAME, serializer, null);

然后再获取状态值:

values = getPartitionedState(stateId);

以上两个动作在open方法中实现

在processElement方法中,分为两种情况:

如果之前已存在状态值,那么拿当前值跟之前的状态值做reduce并获得结果,将结果再次更新到最新状态并emit出去 如果之前不存在状态值,那么直接将当前值更新到状态中,并将当前值emit出去

StreamSink

sink operator,通常是流处理的最后一个operator。它接收SinkFunction的实例。在processElement中依次调用其invoke方法。

小结

本文主要探讨了stream transformation的运行时形式operator的大致实现。