温州做高端网站公司,网站如何在国外推广,开购物网站需要多少钱,付费推广网站背景
当我们想要实现提前触发计算的触发器时#xff0c;我们可以使用ContinuousEventTimeTrigger/ContinuousProcessingTimeTrigger作为触发器达到比如几分钟触发一次计算并发送计算结果的类#xff0c;我们本文就从代码角度解析下实现自定义触发器的一些注意事项
Continuo…背景
当我们想要实现提前触发计算的触发器时我们可以使用ContinuousEventTimeTrigger/ContinuousProcessingTimeTrigger作为触发器达到比如几分钟触发一次计算并发送计算结果的类我们本文就从代码角度解析下实现自定义触发器的一些注意事项
ContinuousEventTimeTrigger源码解析
PublicEvolving
public class ContinuousEventTimeTriggerW extends Window extends TriggerObject, W {private static final long serialVersionUID 1L;private final long interval;/** When merging we take the lowest of all fire timestamps as the new fire timestamp. */private final ReducingStateDescriptorLong stateDesc new ReducingStateDescriptor(fire-time, new Min(), LongSerializer.INSTANCE);private ContinuousEventTimeTrigger(long interval) {this.interval interval;}Overridepublic TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)throws Exception {if (window.maxTimestamp() ctx.getCurrentWatermark()) {// if the watermark is already past the window fire immediately// 这里只需要fire触发计算为什么不是FIRE_AND_PURGE?return TriggerResult.FIRE;} else {// 这里注册一个结束窗口的计时器是否必要ctx.registerEventTimeTimer(window.maxTimestamp());}ReducingStateLong fireTimestamp ctx.getPartitionedState(stateDesc);if (fireTimestamp.get() null) {long start timestamp - (timestamp % interval);long nextFireTimestamp start interval;ctx.registerEventTimeTimer(nextFireTimestamp);fireTimestamp.add(nextFireTimestamp);}return TriggerResult.CONTINUE;}Overridepublic TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {//为什么是FIRE而不是FIRE_AND_PURGEif (time window.maxTimestamp()) {return TriggerResult.FIRE;}ReducingStateLong fireTimestampState ctx.getPartitionedState(stateDesc);Long fireTimestamp fireTimestampState.get();if (fireTimestamp ! null fireTimestamp time) {fireTimestampState.clear();fireTimestampState.add(time interval);ctx.registerEventTimeTimer(time interval);return TriggerResult.FIRE;}return TriggerResult.CONTINUE;}Overridepublic TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)throws Exception {return TriggerResult.CONTINUE;}Overridepublic void clear(W window, TriggerContext ctx) throws Exception {// 清除计时器ReducingStateLong fireTimestamp ctx.getPartitionedState(stateDesc);Long timestamp fireTimestamp.get();if (timestamp ! null) {ctx.deleteEventTimeTimer(timestamp);fireTimestamp.clear();}}Overridepublic boolean canMerge() {return true;}Overridepublic void onMerge(W window, OnMergeContext ctx) throws Exception {ctx.mergePartitionedState(stateDesc);Long nextFireTimestamp ctx.getPartitionedState(stateDesc).get();if (nextFireTimestamp ! null) {ctx.registerEventTimeTimer(nextFireTimestamp);}}Overridepublic String toString() {return ContinuousEventTimeTrigger( interval );}VisibleForTestingpublic long getInterval() {return interval;}/*** Creates a trigger that continuously fires based on the given interval.** param interval The time interval at which to fire.* param W The type of {link Window Windows} on which this trigger can operate.*/public static W extends Window ContinuousEventTimeTriggerW of(Time interval) {return new ContinuousEventTimeTrigger(interval.toMilliseconds());}private static class Min implements ReduceFunctionLong {private static final long serialVersionUID 1L;Overridepublic Long reduce(Long value1, Long value2) throws Exception {return Math.min(value1, value2);}}
}
ContinuousProcessingTimeTrigger源码
PublicEvolving
public class ContinuousProcessingTimeTriggerW extends Window extends TriggerObject, W {private static final long serialVersionUID 1L;private final long interval;/** When merging we take the lowest of all fire timestamps as the new fire timestamp. */private final ReducingStateDescriptorLong stateDesc new ReducingStateDescriptor(fire-time, new Min(), LongSerializer.INSTANCE);private ContinuousProcessingTimeTrigger(long interval) {this.interval interval;}Overridepublic TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)throws Exception {ReducingStateLong fireTimestamp ctx.getPartitionedState(stateDesc);timestamp ctx.getCurrentProcessingTime();// 注册计时器为什么这里不需要类似ContinuousEventTimeTrigger一样注册一个窗口结束时间的计时器if (fireTimestamp.get() null) {long start timestamp - (timestamp % interval);long nextFireTimestamp start interval;ctx.registerProcessingTimeTimer(nextFireTimestamp);fireTimestamp.add(nextFireTimestamp);return TriggerResult.CONTINUE;}return TriggerResult.CONTINUE;}Overridepublic TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {return TriggerResult.CONTINUE;}Overridepublic TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)throws Exception {ReducingStateLong fireTimestamp ctx.getPartitionedState(stateDesc);if (fireTimestamp.get().equals(time)) {fireTimestamp.clear();fireTimestamp.add(time interval);ctx.registerProcessingTimeTimer(time interval);return TriggerResult.FIRE;}//为什么这里没有FIRE_AND_PURGE状态是何时清理的return TriggerResult.CONTINUE;}Overridepublic void clear(W window, TriggerContext ctx) throws Exception {//清除计时器// State could be merged into new window.ReducingStateLong fireTimestamp ctx.getPartitionedState(stateDesc);Long timestamp fireTimestamp.get();if (timestamp ! null) {ctx.deleteProcessingTimeTimer(timestamp);fireTimestamp.clear();}}Overridepublic boolean canMerge() {return true;}Overridepublic void onMerge(W window, OnMergeContext ctx) throws Exception {// States for old windows will lose after the call.ctx.mergePartitionedState(stateDesc);// Register timer for this new window.Long nextFireTimestamp ctx.getPartitionedState(stateDesc).get();if (nextFireTimestamp ! null) {ctx.registerProcessingTimeTimer(nextFireTimestamp);}}VisibleForTestingpublic long getInterval() {return interval;}Overridepublic String toString() {return ContinuousProcessingTimeTrigger( interval );}/*** Creates a trigger that continuously fires based on the given interval.** param interval The time interval at which to fire.* param W The type of {link Window Windows} on which this trigger can operate.*/public static W extends Window ContinuousProcessingTimeTriggerW of(Time interval) {return new ContinuousProcessingTimeTrigger(interval.toMilliseconds());}private static class Min implements ReduceFunctionLong {private static final long serialVersionUID 1L;Overridepublic Long reduce(Long value1, Long value2) throws Exception {return Math.min(value1, value2);}}
}疑问
1.为什么ContinuousEventTimeTrigger需要注册一个窗口结束时间的计时器而ContinuousProcessingTimeTrigger不注册 答案: 其实我们需要看下它注册后的目的作用是什么ContinuousEventTimeTrigger的ontimer在处理窗口结束的触发器时会返回FIRE触发计算那问题就来了如果只是触发计算那么如果没有那么仅仅只是窗口结束的时候没有触发一次计算而已。所以这里不是必须的 2.为什么ContinuousEventTimeTrigger/ContinuousProcessingTimeTrigger返回的结果是FIRE而不是FIRE_AND_PURGE状态是什么时候清理的 答案 首先要明确状态的清理这个逻辑状态其实包括窗口的状态触发器的状态等返回FIRE仅仅是触发计算而不会清理任何状态而假设返回FIRE_AND_PURGE的作用是触发计算并进行窗口状态的清理(注意这里是不包括触发器的状态的清理的)其实状态的清理是由WindowOperator在清理时间到时进行的对于事件时间是窗口结束时间迟到容忍间隔时间对于处理时间是窗口结束时间所以不必要在窗口结束时间到的时候返回FIRE_AND_PURGE可以统一由WindowOperator在清理时间到之后统一清理状态