1.ProcessFunction介绍
1.1 ProcessFunction基本构成
ProcessFunction
是一个低级流处理操作,可以访问所有(非循环)流应用程序的基本构建块:
- 事件(流元素)
- state(容错,一致,仅在keyed stream上)
- 定时器(事件时间和处理时间,仅限keyed stream)
该ProcessFunction
可以被看作是一个可以访问keyed state
和定时器的FlatMapFunction
。它可以对输入流中接收的每个事件进行调用处理。
状态
对于容错的state,ProcessFunction
可以访问Flink的keyed state,可以通过其访问 RuntimeContext
,类似于其他有状态函数访问keyed state
的方式。
time
定时器允许应用程序对processing time
和 event time.
的变化作出反应。每次调用该函数processElement(...)
都会获得一个Context
对象,该对象可以访问元素的事件时间戳和TimerService。的TimerService
可用于注册为将来事件- /处理-时刻回调。
触发时间
达到计时器的特定时间时,将onTimer(...)
调用该方法。在event time上注册时间为T的timer,一旦watermark大于或等于T,就会触发onTimer。在该调用期间,所有状态再次限定为创建计时器的key的状态,允许计时器操纵keyed state
。
注意如果要访问键控状态和计时器,则必须应用ProcessFunction
键控流:
stream.keyBy(...).process(new MyProcessFunction())
2.实战 机器宕机告警
2.1 需求
filebeat采集机器数据到kafka,对于某台机器2分钟内没有新数据流入kafka,则判定这台机器宕机。
2.2 Main函数
StreamExecutionEnvironment env = ...; //
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //使用eventTime
DataSource<MetricEvent> source ;//读取kafka
SingleOutputStreamOperator<SimplifyMetricEvent> data = source.flatMap(new FirstFilterFunction()).assignTimestampsAndWatermarks(new MetricLagWatermarkExtractor());
int timeLimit=2*60*1000;
SingleOutputStreamOperator<SimplifyMetricEvent>
data.keyBy(SimplifyMetricEvent::getIp).process(new OutageFunction(timeLimit)).print();
注意:MetricLagWatermarkExtractor 一定要对异常数据做处理,比如时间戳大于当前时间,这个时间戳就不能作为watermark,否则后续onTimer方法的调用时间就不准确。
2.3 ProcessFunction
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.List;
public class OutageFunction extends KeyedProcessFunction<String, SimplifyMetricEvent, SimplifyMetricEvent> {
private ValueState<SimplifyMetricEvent> state;
private int delay;
@Override
public void open(Configuration configuration) {
TypeInformation<SimplifyMetricEvent> info = TypeInformation.of(new TypeHint<SimplifyMetricEvent>() {
});
TypeInformation<Boolean> resolveInfo = TypeInformation.of(new TypeHint<Boolean>() {
});
state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", info));
}
public OutageFunction(int delay) {
this.delay = delay;
}
@Override
public void processElement(SimplifyMetricEvent simplifyMetricEvent, Context ctx, Collector<SimplifyMetricEvent> collector) throws Exception {
SimplifyMetricEvent current = state.value();
if (current == null) {
current = new SimplifyMetricEvent(simplifyMetricEvent.getClusterName(), simplifyMetricEvent.getHostIp(),
simplifyMetricEvent.getTimestamp(), simplifyMetricEvent.getResolve(), System.currentTimeMillis());
}
current.setTimestamp(simplifyMetricEvent.getTimestamp());
current.setSystemTimestamp(System.currentTimeMillis());
state.update(current);
ctx.timerService().registerEventTimeTimer(current.getSystemTimestamp() + delay);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<SimplifyMetricEvent> out) throws Exception {
// get the state for the key that scheduled the timer
//获取计划定时器的key的状态
SimplifyMetricEvent result = state.value();
// 检查是否是过时的定时器或最新的定时器
if (result != null && timestamp >= result.getSystemTimestamp() + delay) {
out.collect(result); //宕机发生,往下游发送事件
ctx.timerService().registerEventTimeTimer(timestamp + delay);//注册下一个宕机事件
result.setSystemTimestamp(timestamp);
state.update(result);
}
}
}
ctx.timerService().registerEventTimeTimer(current.getSystemTimestamp() + delay);
就是定义一个事件触发器,触发的时间是current.getSystemTimestamp() + delay
。到达该时间则调用
onTimer(long timestamp, OnTimerContext ctx, Collector<SimplifyMetricEvent> out)
。
3.流程解析
1
data:SingleOutputStreamOperator 调用keyBy形成 KeyedStream,调用process
@Internal
public <R> SingleOutputStreamOperator<R> process(
KeyedProcessFunction<KEY, T, R> keyedProcessFunction,
TypeInformation<R> outputType) {
KeyedProcessOperator<KEY, T, R> operator = new KeyedProcessOperator<>(clean(keyedProcessFunction));
return transform("KeyedProcess", outputType, operator);
}
keyedProcessFunction
就是上边我们自定义的OutageFunction
。
这里生成的 KeyedProcessOperator
2
public class KeyedProcessOperator<K, IN, OUT>
extends AbstractUdfStreamOperator<OUT, KeyedProcessFunction<K, IN, OUT>>
implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
KeyedProcessOperator
实现了 Triggerable
@Override //实现Triggerable
public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
collector.setAbsoluteTimestamp(timer.getTimestamp());
invokeUserFunction(TimeDomain.EVENT_TIME, timer);
}
@Override //实现Triggerable
public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
collector.eraseTimestamp();
invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
}
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
collector.setTimestamp(element);
context.element = element;
userFunction.processElement(element.getValue(), context, collector);
context.element = null;
}
private void invokeUserFunction(
TimeDomain timeDomain,
InternalTimer<K, VoidNamespace> timer) throws Exception {
onTimerContext.timeDomain = timeDomain;
onTimerContext.timer = timer;
userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
onTimerContext.timeDomain = null;
onTimerContext.timer = null;
}
userFunction.processElement(element.getValue(), context, collector);
userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
userFunction
就是我们上面的OutageFunction
。
这里看到在onEventTime
或者onProcessingTime
方法调用的时候才会调用userFunction.onTimer。那么 onEventTime
什么时候触发呢?
3.以onEventTime为例
进入到
InternalTimerServiceImpl
public void advanceWatermark(long time) throws Exception {
currentWatermark = time;
InternalTimer<K, N> timer;
while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
eventTimeTimersQueue.poll();
keyContext.setCurrentKey(timer.getKey());
triggerTarget.onEventTime(timer);
}
}
也就是说InternalTimerServiceImpl
调用advanceWatermark
时我们的onEventTime
方法才调用。而advanceWatermark
方法的入参time
是当前operator的watermark所代表的时间。那么什么时候调用advanceWatermark
呢?这个等下再看。
这个方法里面的eventTimeTimersQueue
是
/**
* Event time timers that are currently in-flight.
*/
private final KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue;
当我们调用时ctx.timerService().registerEventTimeTimer(current.getSystemTimestamp() + delay);
就是调用
@Override
public void registerEventTimeTimer(N namespace, long time) {
eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
}
向里eventTimeTimersQueue
存储TimerHeapInternalTimer
(包含key,timestamp等)。
当调用advanceWatermark时,更新currentWatermark,从eventTimeTimersQueue里peek出timer,判断当前watermark的时间是否大于timer里的时间,若大于,则从队列里弹出这个timer调用 triggerTarget.onEventTime(timer)
也就是调用 KeyedProcessOperator.onEventTime
,最终调用到里我们自定义OutageFunction
的onTimer
方法。
3.总结一下
如果我们的env用的是 TimeCharacteristic.EventTime
,那么我们自定义的 KeyedProcessFunction
的onTimer
触发时间是这个算子的watermark时间大于 ctx.timerService().registerEventTimeTimer(current.getSystemTimestamp() + delay)
注册的时间时才会触发。
注意因为这里的触发时间和watermark强相关,在上游算子assignTimestampsAndWatermarks
时一定正确处理wartermark的值。
todo:
什么时候调用InternalTimerServiceImpl
的advanceWatermark
呢?