/** * An operator that implements the logic for windowing based on a { @link WindowAssigner} and * { @link Trigger}. * *
* When an element arrives it gets assigned a key using a {
@link KeySelector} and it gets * assigned to zero or more windows using a { @link WindowAssigner}. Based on this, the element * is put into panes. A pane is the bucket of elements that have the same key and same * { @code Window}. An element can be in multiple panes if it was assigned to multiple windows by the * { @code WindowAssigner}. * *
* Each pane gets its own instance of the provided {
@code Trigger}. This trigger determines when * the contents of the pane should be processed to emit results. When a trigger fires, * the given { @link InternalWindowFunction} is invoked to produce the results that are emitted for * the pane to which the { @code Trigger} belongs. * * @param The type of key returned by the { @code KeySelector}. * @param The type of the incoming elements. * @param The type of elements emitted by the { @code InternalWindowFunction}. * @param The type of { @code Window} that the { @code WindowAssigner} assigns. */@Internalpublic class WindowOperator extends AbstractUdfStreamOperator > implements OneInputStreamOperator , Triggerable, InputTypeConfigurable { // ------------------------------------------------------------------------ // Configuration values and user functions // ------------------------------------------------------------------------ protected final WindowAssigner windowAssigner; protected final KeySelector keySelector; protected final Trigger trigger; protected final StateDescriptor , ?> windowStateDescriptor; /** * The allowed lateness for elements. This is used for: *
*
Deciding if an element should be dropped from a window due to lateness. *
Clearing the state of a window if the system time passes the * { @code window.maxTimestamp + allowedLateness} landmark. *
*/ protected final long allowedLateness; //允许late多久,即当watermark已经触发后 /** * To keep track of the current watermark so that we can immediately fire if a trigger * registers an event time callback for a timestamp that lies in the past. */ protected transient long currentWatermark = Long.MIN_VALUE; protected transient Context context = new Context(null, null); //Trigger Context protected transient WindowAssigner.WindowAssignerContext windowAssignerContext; //只为获取getCurrentProcessingTime // ------------------------------------------------------------------------ // State that needs to be checkpointed // ------------------------------------------------------------------------ /** * Processing time timers that are currently in-flight. */ protected transient PriorityQueue > processingTimeTimersQueue; //Timer用于存储timestamp,key,window, queue按时间排序 /** * Current waiting watermark callbacks. */ protected transient Set > watermarkTimers; protected transient PriorityQueue > watermarkTimersQueue; // protected transient Map > mergingWindowsByKey; //用于记录merge后的stateWindow和window的对应关系
对于window operator而言,最关键的是WindowAssigner和Trigger
WindowAssigner
WindowAssigner,用于指定一个tuple应该被分配到哪些windows去
借用个图,可以看出有多少种WindowAssigner
对于WindowAssigner,最关键的接口是,assignWindows
为一个element,分配一组windows, Collection<W>
@PublicEvolvingpublic abstract class WindowAssigner implements Serializable { private static final long serialVersionUID = 1L; /** * Returns a { @code Collection} of windows that should be assigned to the element. * * @param element The element to which windows should be assigned. * @param timestamp The timestamp of the element. * @param context The { @link WindowAssignerContext} in which the assigner operates. */ public abstract Collection assignWindows(T element, long timestamp, WindowAssignerContext context); /** * Returns the default trigger associated with this { @code WindowAssigner}. */ public abstract Trigger getDefaultTrigger(StreamExecutionEnvironment env); /** * Returns a { @link TypeSerializer} for serializing windows that are assigned by * this { @code WindowAssigner}. */ public abstract TypeSerializer getWindowSerializer(ExecutionConfig executionConfig);
实际看下,具体WindowAssigner的实现
public class TumblingProcessingTimeWindows extends WindowAssigner
public class SlidingEventTimeWindows extends WindowAssigner { private final long size; private final long slide; @Override public Collection assignWindows(Object element, long timestamp, WindowAssignerContext context) { if (timestamp > Long.MIN_VALUE) { List windows = new ArrayList<>((int) (size / slide)); long lastStart = timestamp - timestamp % slide; for (long start = lastStart; start > timestamp - size; start -= slide) { windows.add(new TimeWindow(start, start + size)); //可以看到这里会assign多个TimeWindow,因为是slide } return windows; } else { } } @Override public Trigger getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); }
@Overridepublic void processWatermark(Watermark mark) throws Exception { boolean fire; do { Timer timer = watermarkTimersQueue.peek(); //这叫watermarkTimersQueue,是否有些歧义,叫eventTimerQueue更好理解些 if (timer != null && timer.timestamp <= mark.getTimestamp()) { fire = true; watermarkTimers.remove(timer); watermarkTimersQueue.remove(); context.key = timer.key; context.window = timer.window; setKeyContext(timer.key); //stateBackend.setCurrentKey(key); AppendingState windowState; MergingWindowSet mergingWindows = null; if (windowAssigner instanceof MergingWindowAssigner) { //MergingWindow mergingWindows = getMergingWindowSet(); W stateWindow = mergingWindows.getStateWindow(context.window); if (stateWindow == null) { // then the window is already purged and this is a cleanup // timer set due to allowed lateness that has nothing to clean, // so it is safe to just ignore continue; } windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); } else { //普通window windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); //取得window的state } ACC contents = windowState.get(); if (contents == null) { // if we have no state, there is nothing to do continue; } TriggerResult triggerResult = context.onEventTime(timer.timestamp); //触发onEvent if (triggerResult.isFire()) { fire(context.window, contents); } if (triggerResult.isPurge() || (windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) { cleanup(context.window, windowState, mergingWindows); } } else { fire = false; } } while (fire); //如果fire为true,继续看下个waterMarkTimer是否需要fire output.emitWatermark(mark); //把waterMark传递下去 this.currentWatermark = mark.getTimestamp(); //更新currentWaterMark}
@Overridepublic void trigger(long time) throws Exception { boolean fire; //Remove information about the triggering task processingTimeTimerFutures.remove(time); processingTimeTimerTimestamps.remove(time, processingTimeTimerTimestamps.count(time)); do { Timer timer = processingTimeTimersQueue.peek(); if (timer != null && timer.timestamp <= time) { fire = true; processingTimeTimers.remove(timer); processingTimeTimersQueue.remove(); context.key = timer.key; context.window = timer.window; setKeyContext(timer.key); AppendingState windowState; MergingWindowSet mergingWindows = null; if (windowAssigner instanceof MergingWindowAssigner) { mergingWindows = getMergingWindowSet(); W stateWindow = mergingWindows.getStateWindow(context.window); if (stateWindow == null) { // then the window is already purged and this is a cleanup // timer set due to allowed lateness that has nothing to clean, // so it is safe to just ignore continue; } windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor); } else { windowState = getPartitionedState(context.window, windowSerializer, windowStateDescriptor); } ACC contents = windowState.get(); if (contents == null) { // if we have no state, there is nothing to do continue; } TriggerResult triggerResult = context.onProcessingTime(timer.timestamp); if (triggerResult.isFire()) { fire(context.window, contents); } if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.timestamp))) { cleanup(context.window, windowState, mergingWindows); } } else { fire = false; } } while (fire);}
EvictingWindowOperator
Evicting对于WindowOperator而言,就是多了Evictor
private void fire(W window, Iterable > contents) throws Exception { timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp()); // Work around type system restrictions... int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window); //执行evict FluentIterable projectedContents = FluentIterable .from(contents) .skip(toEvict) .transform(new Function , IN>() { @Override public IN apply(StreamRecord input) { return input.getValue(); } }); userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);}