Flink – window operator
Published: 2019-06-27



WindowOperator

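The window operator implements its logic with a WindowAssigner and a Trigger.

When an element arrives, a KeySelector first assigns it a key, and a WindowAssigner assigns it to zero or more windows. The element is then put into the corresponding panes.

A pane holds all elements that have the same key and the same window.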
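The key/window/pane relationship can be made concrete with a toy model in plain Java. This is not Flink code. `PaneBuffer` is a hypothetical class that assumes sliding windows with a fixed size and slide, so each window is identified by its start timestamp. A `(key, windowStart)` pair serves as the pane identifier.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Toy pane model (not Flink code): a pane is identified by (key, windowStart).
// Assumption: sliding windows with fixed size and slide, so the start identifies the window.
class PaneBuffer {
    private final long size;
    private final long slide;
    private final Map<Map.Entry<String, Long>, List<String>> panes = new HashMap<>();

    PaneBuffer(long size, long slide) {
        this.size = size;
        this.slide = slide;
    }

    // Key the element, assign it to every window containing its timestamp,
    // and append it to the pane of each (key, window) pair.
    void add(String element, long timestamp, Function<String, String> keySelector) {
        String key = keySelector.apply(element);
        long lastStart = timestamp - timestamp % slide;
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            Map.Entry<String, Long> paneId = new SimpleImmutableEntry<>(key, start);
            panes.computeIfAbsent(paneId, id -> new ArrayList<>()).add(element);
        }
    }

    // Elements buffered for one (key, window) pair, or an empty list.
    List<String> pane(String key, long windowStart) {
        Map.Entry<String, Long> paneId = new SimpleImmutableEntry<>(key, windowStart);
        return panes.getOrDefault(paneId, Collections.emptyList());
    }

    int paneCount() {
        return panes.size();
    }
}
```

With size 10 and slide 5, an element with timestamp 7 lands in two panes: the windows starting at 5 and at 0. This is the "one element, several panes" case described above.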

/**
 * An operator that implements the logic for windowing based on a {@link WindowAssigner} and
 * {@link Trigger}.
 *
 * <p>When an element arrives it gets assigned a key using a {@link KeySelector} and it gets
 * assigned to zero or more windows using a {@link WindowAssigner}. Based on this, the element
 * is put into panes. A pane is the bucket of elements that have the same key and same
 * {@code Window}. An element can be in multiple panes if it was assigned to multiple windows by the
 * {@code WindowAssigner}.
 *
 * <p>Each pane gets its own instance of the provided {@code Trigger}. This trigger determines when
 * the contents of the pane should be processed to emit results. When a trigger fires,
 * the given {@link InternalWindowFunction} is invoked to produce the results that are emitted for
 * the pane to which the {@code Trigger} belongs.
 *
 * @param <K> The type of key returned by the {@code KeySelector}.
 * @param <IN> The type of the incoming elements.
 * @param <OUT> The type of elements emitted by the {@code InternalWindowFunction}.
 * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
 */
@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
        extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
        implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {

    // ------------------------------------------------------------------------
    // Configuration values and user functions
    // ------------------------------------------------------------------------

    protected final WindowAssigner<? super IN, W> windowAssigner;

    protected final KeySelector<IN, K> keySelector;

    protected final Trigger<? super IN, ? super W> trigger;

    protected final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;

    /**
     * The allowed lateness for elements. This is used for:
     * <ul>
     *     <li>Deciding if an element should be dropped from a window due to lateness.
     *     <li>Clearing the state of a window if the system time passes the
     *         {@code window.maxTimestamp + allowedLateness} landmark.
     * </ul>
     */
    protected final long allowedLateness; // how long elements may still arrive after the watermark has passed their window

    /**
     * To keep track of the current watermark so that we can immediately fire if a trigger
     * registers an event time callback for a timestamp that lies in the past.
     */
    protected transient long currentWatermark = Long.MIN_VALUE;

    protected transient Context context = new Context(null, null); // the Trigger context

    protected transient WindowAssigner.WindowAssignerContext windowAssignerContext; // only used to call getCurrentProcessingTime()

    // ------------------------------------------------------------------------
    // State that needs to be checkpointed
    // ------------------------------------------------------------------------

    /**
     * Processing time timers that are currently in-flight.
     */
    protected transient PriorityQueue<Timer<K, W>> processingTimeTimersQueue; // a Timer stores timestamp, key and window; the queue is ordered by time

    /**
     * Current waiting watermark callbacks.
     */
    protected transient Set<Timer<K, W>> watermarkTimers;
    protected transient PriorityQueue<Timer<K, W>> watermarkTimersQueue;

    protected transient Map<K, MergingWindowSet<W>> mergingWindowsByKey; // per key, records which state window backs each merged window

    // ...
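The fields above store event-time timers twice: once in a `Set` (`watermarkTimers`) and once in a `PriorityQueue` (`watermarkTimersQueue`). One plausible reading is that the set de-duplicates timers while the queue keeps them ordered by timestamp. The sketch below assumes that reading. `SimpleTimer` and `TimerService` are hypothetical names, and keys and windows are plain strings here.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;

// Hypothetical simplified timer: fires at `timestamp` for one (key, window) pair.
final class SimpleTimer implements Comparable<SimpleTimer> {
    final long timestamp;
    final String key;
    final String window;

    SimpleTimer(long timestamp, String key, String window) {
        this.timestamp = timestamp;
        this.key = key;
        this.window = window;
    }

    // Queue order: earliest timestamp first.
    @Override
    public int compareTo(SimpleTimer other) {
        return Long.compare(timestamp, other.timestamp);
    }

    // Equality on all three fields, so the same timer is stored only once in a Set.
    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SimpleTimer)) {
            return false;
        }
        SimpleTimer t = (SimpleTimer) o;
        return timestamp == t.timestamp && key.equals(t.key) && window.equals(t.window);
    }

    @Override
    public int hashCode() {
        return Objects.hash(timestamp, key, window);
    }
}

// A Set for de-duplication plus a PriorityQueue for time order (assumed purpose of the paired fields).
class TimerService {
    final Set<SimpleTimer> timers = new HashSet<>();
    final PriorityQueue<SimpleTimer> queue = new PriorityQueue<>();

    void register(SimpleTimer timer) {
        if (timers.add(timer)) { // add() returns false if an equal timer is already registered
            queue.add(timer);
        }
    }
}
```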

 

For the window operator, the two key pieces are the WindowAssigner and the Trigger.

 

WindowAssigner

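A WindowAssigner decides which windows an element should be assigned to.

A borrowed diagram (not preserved in this copy) shows the different kinds of WindowAssigner.

The key method of WindowAssigner is assignWindows. It assigns an element to a set of windows and returns them as a Collection<W>.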

@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
    private static final long serialVersionUID = 1L;

    /**
     * Returns a {@code Collection} of windows that should be assigned to the element.
     *
     * @param element The element to which windows should be assigned.
     * @param timestamp The timestamp of the element.
     * @param context The {@link WindowAssignerContext} in which the assigner operates.
     */
    public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);

    /**
     * Returns the default trigger associated with this {@code WindowAssigner}.
     */
    public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

    /**
     * Returns a {@link TypeSerializer} for serializing windows that are assigned by
     * this {@code WindowAssigner}.
     */
    public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);

    // ...
}

Next, let's look at some concrete WindowAssigner implementations.

public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
    // ... (size field, constructor, etc.)

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        final long now = context.getCurrentProcessingTime();
        long start = now - (now % size);
        return Collections.singletonList(new TimeWindow(start, start + size)); // simple: a single TimeWindow is assigned
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeTrigger.create(); // the default trigger is ProcessingTimeTrigger, as the name suggests
    }
    // ...
}
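The tumbling assignment above is plain arithmetic: round the current time down to a multiple of `size`. Below is a minimal sketch in which a hypothetical `SimpleTimeWindow` stands in for Flink's `TimeWindow`. The window covers `[start, end)`, so its last timestamp is `end - 1`.

```java
// Hypothetical stand-in for Flink's TimeWindow: covers [start, end).
final class SimpleTimeWindow {
    final long start;
    final long end;

    SimpleTimeWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // Largest timestamp still inside the window (end is exclusive).
    long maxTimestamp() {
        return end - 1;
    }
}

final class TumblingSketch {
    // Same arithmetic as TumblingProcessingTimeWindows.assignWindows above.
    static SimpleTimeWindow assign(long now, long size) {
        long start = now - (now % size);
        return new SimpleTimeWindow(start, start + size);
    }
}
```

For processing time, `now` is positive. For a negative timestamp, Java's `%` returns a negative remainder, so this formula would pick a window that does not contain the timestamp. Keep that in mind before reusing it for arbitrary event times.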
public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
    private final long size;
    private final long slide;

    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        if (timestamp > Long.MIN_VALUE) {
            List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
            long lastStart = timestamp - timestamp % slide;
            for (long start = lastStart; start > timestamp - size; start -= slide) {
                windows.add(new TimeWindow(start, start + size)); // several TimeWindows are assigned here, because sliding windows overlap
            }
            return windows;
        } else {
            // ... (branch for elements without a timestamp elided)
        }
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create();
    }
    // ...
}
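The sliding loop starts from the latest window start that covers the timestamp and walks backwards one `slide` at a time. It stops once a window would end at or before the timestamp, because that window no longer contains it. The sketch below isolates that loop and returns only the window starts. `SlidingSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingSketch {
    // Start of every window [start, start + size) that contains `timestamp`,
    // using the same loop as SlidingEventTimeWindows.assignWindows above.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>((int) (size / slide));
        long lastStart = timestamp - timestamp % slide;
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

For `size = 10` and `slide = 5`, timestamp 7 falls into the windows starting at 5 and 0. With `size = 15`, it falls into `size / slide = 3` windows.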

 

Trigger, Evictor


Next, let's look at the three main entry points. They invoke onElement, onEventTime and onProcessingTime respectively.

processElement

Handles the arrival of an element and invokes onElement.

public void processElement(StreamRecord<IN> element) throws Exception {
    Collection<W> elementWindows = windowAssigner.assignWindows( // the WindowAssigner assigns the element to a set of windows
        element.getValue(), element.getTimestamp(), windowAssignerContext);

    final K key = (K) getStateBackend().getCurrentKey();

    if (windowAssigner instanceof MergingWindowAssigner) { // merging windows
        // .......
    } else { // ordinary (non-merging) windows
        for (W window: elementWindows) {

            // drop if the window is already late
            if (isLate(window)) { // late data is dropped by default
                continue;
            }

            AppendingState<IN, ACC> windowState = getPartitionedState( // fetch this window's state, i.e. its buffered elements, from the state backend
                window, windowSerializer, windowStateDescriptor);
            windowState.add(element.getValue()); // add the current element to the buffered state

            context.key = key;
            context.window = window; // the design of context is rather tricky and obscure

            TriggerResult triggerResult = context.onElement(element); // invoke onElement and get a TriggerResult

            if (triggerResult.isFire()) { // act on the TriggerResult
                ACC contents = windowState.get();
                if (contents == null) {
                    continue;
                }
                fire(window, contents); // on FIRE, actually evaluate the elements in the window
            }

            if (triggerResult.isPurge()) {
                cleanup(window, windowState, null); // on PURGE, clean up the buffered elements
            } else {
                registerCleanupTimer(window);
            }
        }
    }
}
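The non-merging branch boils down to four steps: buffer the element, ask the trigger, then fire and/or purge. The sketch below replays that flow in plain Java with a single global window per key. `SketchTriggerResult` mirrors the four values of Flink's `TriggerResult` (CONTINUE, FIRE, PURGE, FIRE_AND_PURGE) and its `isFire()`/`isPurge()` helpers. Two parts are assumptions for illustration: the count-based trigger, which fires and purges every `maxCount` elements, and the sum used as the window function.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Mirrors the four TriggerResult values and their fire/purge flags.
enum SketchTriggerResult {
    CONTINUE(false, false), FIRE(true, false), PURGE(false, true), FIRE_AND_PURGE(true, true);

    private final boolean fire;
    private final boolean purge;

    SketchTriggerResult(boolean fire, boolean purge) {
        this.fire = fire;
        this.purge = purge;
    }

    boolean isFire() { return fire; }
    boolean isPurge() { return purge; }
}

// Toy operator (hypothetical): one buffer per key, a count trigger, a sum as the window function.
class CountWindowSketch {
    private final int maxCount;
    private final Map<String, List<Integer>> buffers = new HashMap<>();
    final List<Integer> emitted = new ArrayList<>(); // results produced on FIRE

    CountWindowSketch(int maxCount) {
        this.maxCount = maxCount;
    }

    // Assumed trigger: fire and purge once the buffer holds maxCount elements.
    private SketchTriggerResult onElement(List<Integer> buffer) {
        return buffer.size() >= maxCount ? SketchTriggerResult.FIRE_AND_PURGE : SketchTriggerResult.CONTINUE;
    }

    void processElement(String key, int value) {
        List<Integer> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);                                      // like windowState.add(...)
        SketchTriggerResult result = onElement(buffer);         // like context.onElement(...)
        if (result.isFire()) {                                  // like fire(window, contents)
            emitted.add(buffer.stream().mapToInt(Integer::intValue).sum());
        }
        if (result.isPurge()) {                                 // like cleanup(...)
            buffer.clear();
        }
    }
}
```

Feeding `a:1, a:2, b:5, a:3, b:6` with `maxCount = 2` emits 3 (for key a) and then 11 (for key b). The element `a:3` stays buffered.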

 

The logic that decides whether a window is already late (only event-time windows can be late):

protected boolean isLate(W window) {
    return (windowAssigner.isEventTime() && (cleanupTime(window) <= currentWatermark));
}

private long cleanupTime(W window) {
    long cleanupTime = window.maxTimestamp() + allowedLateness;
    // if the addition overflowed, the sum wraps below maxTimestamp; fall back to Long.MAX_VALUE
    return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
}
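Here is a worked example of the lateness check, written as standalone static methods. `LatenessSketch` is a hypothetical name. For an event-time window `[0, 10)`, `maxTimestamp()` is 9. With an allowed lateness of 5 the cleanup time is 14, so the window counts as late once the watermark reaches 14. The overflow guard turns an overflowing sum into `Long.MAX_VALUE`.

```java
final class LatenessSketch {
    // Same as cleanupTime(): maxTimestamp + allowedLateness, or Long.MAX_VALUE if the sum
    // overflowed (an overflowing sum wraps around to a value smaller than maxTimestamp).
    static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
        long cleanupTime = windowMaxTimestamp + allowedLateness;
        return cleanupTime >= windowMaxTimestamp ? cleanupTime : Long.MAX_VALUE;
    }

    // Same as isLate() for an event-time assigner: late once the watermark reaches the cleanup time.
    static boolean isLate(long windowMaxTimestamp, long allowedLateness, long currentWatermark) {
        return cleanupTime(windowMaxTimestamp, allowedLateness) <= currentWatermark;
    }
}
```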

 

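The fire logic: results are emitted with the window's maxTimestamp() as their timestamp, and the user's window function is applied to the buffered contents.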

private void fire(W window, ACC contents) throws Exception {
    timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
    userFunction.apply(context.key, context.window, contents, timestampedCollector);
}

 

processWatermark

Handles an incoming watermark and invokes onEventTime for every timer that the watermark makes due.

@Override
public void processWatermark(Watermark mark) throws Exception {
    boolean fire;

    do {
        Timer<K, W> timer = watermarkTimersQueue.peek(); // "watermarkTimersQueue" is a somewhat ambiguous name; "eventTimerQueue" would be clearer
        if (timer != null && timer.timestamp <= mark.getTimestamp()) {
            fire = true;

            watermarkTimers.remove(timer);
            watermarkTimersQueue.remove();

            context.key = timer.key;
            context.window = timer.window;
            setKeyContext(timer.key); // stateBackend.setCurrentKey(key);

            AppendingState<IN, ACC> windowState;
            MergingWindowSet<W> mergingWindows = null;

            if (windowAssigner instanceof MergingWindowAssigner) { // merging windows
                mergingWindows = getMergingWindowSet();
                W stateWindow = mergingWindows.getStateWindow(context.window);
                if (stateWindow == null) {
                    // then the window is already purged and this is a cleanup
                    // timer set due to allowed lateness that has nothing to clean,
                    // so it is safe to just ignore
                    continue;
                }
                windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
            } else { // ordinary windows
                windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); // get the window's state
            }

            ACC contents = windowState.get();
            if (contents == null) {
                // if we have no state, there is nothing to do
                continue;
            }

            TriggerResult triggerResult = context.onEventTime(timer.timestamp); // invoke onEventTime
            if (triggerResult.isFire()) {
                fire(context.window, contents);
            }

            if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
                cleanup(context.window, windowState, mergingWindows);
            }

        } else {
            fire = false;
        }
    } while (fire); // if a timer fired, check whether the next timer in the queue is due as well

    output.emitWatermark(mark); // forward the watermark downstream

    this.currentWatermark = mark.getTimestamp(); // update currentWatermark
}
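Stripped of state handling, the loop above drains every event-time timer whose timestamp is at or below the new watermark, in timestamp order. Only then does it record the watermark. The do/while with a `fire` flag is equivalent to a `while` loop that peeks at the head of the queue, which is what this sketch uses. `WatermarkDrainSketch` is a hypothetical name, and timers are reduced to bare timestamps.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

class WatermarkDrainSketch {
    final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>(); // earliest timestamp at the head
    final List<Long> fired = new ArrayList<>();
    long currentWatermark = Long.MIN_VALUE;

    void processWatermark(long watermark) {
        Long timer;
        // Pop timers while the earliest one is due; stop at the first one in the future.
        while ((timer = eventTimeTimers.peek()) != null && timer <= watermark) {
            eventTimeTimers.remove();
            fired.add(timer); // stands in for context.onEventTime(...) plus fire/cleanup
        }
        // The real operator forwards the watermark downstream at this point.
        currentWatermark = watermark;
    }
}
```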

 

trigger

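First, the name of this method is odd: why doesn't it match the process… naming of the methods above? (It is the callback of the Triggerable interface that WindowOperator implements.)

It invokes onProcessingTime and is driven by timers based on system time. The logic is essentially the same as processWatermark. The firing condition differs, the cleanup check uses !windowAssigner.isEventTime(), and no watermark is forwarded.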

@Override
public void trigger(long time) throws Exception {
    boolean fire;

    // Remove information about the triggering task
    processingTimeTimerFutures.remove(time);
    processingTimeTimerTimestamps.remove(time, processingTimeTimerTimestamps.count(time));

    do {
        Timer<K, W> timer = processingTimeTimersQueue.peek();
        if (timer != null && timer.timestamp <= time) {
            fire = true;

            processingTimeTimers.remove(timer);
            processingTimeTimersQueue.remove();

            context.key = timer.key;
            context.window = timer.window;
            setKeyContext(timer.key);

            AppendingState<IN, ACC> windowState;
            MergingWindowSet<W> mergingWindows = null;

            if (windowAssigner instanceof MergingWindowAssigner) {
                mergingWindows = getMergingWindowSet();
                W stateWindow = mergingWindows.getStateWindow(context.window);
                if (stateWindow == null) {
                    // then the window is already purged and this is a cleanup
                    // timer set due to allowed lateness that has nothing to clean,
                    // so it is safe to just ignore
                    continue;
                }
                windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);
            } else {
                windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor);
            }

            ACC contents = windowState.get();
            if (contents == null) {
                // if we have no state, there is nothing to do
                continue;
            }

            TriggerResult triggerResult = context.onProcessingTime(timer.timestamp);
            if (triggerResult.isFire()) {
                fire(context.window, contents);
            }

            if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) {
                cleanup(context.window, windowState, mergingWindows);
            }

        } else {
            fire = false;
        }
    } while (fire);
}
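`trigger(long time)` is a callback: something has to call it when the wall clock reaches a registered time. The sketch below shows one way such a callback could be driven, using a `ScheduledExecutorService`. It is an assumption for illustration only, not the actual Flink code. `ProcessingTimeSketch` and `registerTimer` are hypothetical names.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

class ProcessingTimeSketch {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    // Schedule callback.accept(time) for when the wall clock reaches `time` (epoch millis).
    // A time in the past is scheduled with zero delay, i.e. it fires as soon as possible.
    void registerTimer(long time, LongConsumer callback) {
        long delay = Math.max(0, time - System.currentTimeMillis());
        executor.schedule(() -> callback.accept(time), delay, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        executor.shutdown(); // stop accepting new timers
    }
}
```

The callback receives the registered timestamp rather than the current time. This matches the loop above, which compares queued timers against `time`.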

 

EvictingWindowOperator

Compared with WindowOperator, EvictingWindowOperator just adds an Evictor.

private void fire(W window, Iterable<StreamRecord<IN>> contents) throws Exception {
    timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());

    // Work around type system restrictions...
    int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window); // run the evictor

    FluentIterable<IN> projectedContents = FluentIterable
        .from(contents)
        .skip(toEvict)
        .transform(new Function<StreamRecord<IN>, IN>() {
            @Override
            public IN apply(StreamRecord<IN> input) {
                return input.getValue();
            }
        });
    userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);
}

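The key logic is in fire: before the window function is applied, the Evictor reports how many elements to evict, and that many elements are skipped from the front of the buffer.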
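The evict-then-apply step can be written in plain Java. `EvictSketch` is hypothetical. `countEvict` assumes a count-based evictor that keeps only the last `maxCount` elements. `subList` replaces Guava's `FluentIterable.skip` from the excerpt.

```java
import java.util.ArrayList;
import java.util.List;

final class EvictSketch {
    // Assumed count-based evictor: evict everything except the last maxCount elements.
    static int countEvict(int size, int maxCount) {
        return size > maxCount ? size - maxCount : 0;
    }

    // Skip `toEvict` elements from the front, then hand the rest to the window function.
    static <T> List<T> evictAndProject(List<T> contents, int maxCount) {
        int toEvict = countEvict(contents.size(), maxCount);
        return new ArrayList<>(contents.subList(toEvict, contents.size()));
    }
}
```

Because eviction only skips elements from the front, it works on the iteration order of the buffer. An evictor in this design can only drop the oldest buffered elements, not arbitrary ones.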

Repost address: http://tkbbl.baihongyu.com/
