当前位置: 首页 > news >正文

做婚介网站可行性报告制作链接的app的软件

做婚介网站可行性报告,制作链接的app的软件,微机课做网站,b2c商城企业有哪些背景 当我们想要实现提前触发计算的触发器时,我们可以使用ContinuousEventTimeTrigger/ContinuousProcessingTimeTrigger作为触发器达到比如几分钟触发一次计算并发送计算结果的类,我们本文就从代码角度解析下实现自定义触发器的一些注意事项 Continuo…

背景

当我们想要实现提前触发计算的触发器时,我们可以使用ContinuousEventTimeTrigger/ContinuousProcessingTimeTrigger作为触发器达到比如几分钟触发一次计算并发送计算结果的类,我们本文就从代码角度解析下实现自定义触发器的一些注意事项

ContinuousEventTimeTrigger源码解析

@PublicEvolving
public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object, W> {private static final long serialVersionUID = 1L;private final long interval;/** When merging we take the lowest of all fire timestamps as the new fire timestamp. */private final ReducingStateDescriptor<Long> stateDesc =new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);private ContinuousEventTimeTrigger(long interval) {this.interval = interval;}@Overridepublic TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)throws Exception {if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {// if the watermark is already past the window fire immediately// 这里只需要fire触发计算,为什么不是FIRE_AND_PURGE?return TriggerResult.FIRE;} else {// 这里注册一个结束窗口的计时器是否必要?ctx.registerEventTimeTimer(window.maxTimestamp());}ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);if (fireTimestamp.get() == null) {long start = timestamp - (timestamp % interval);long nextFireTimestamp = start + interval;ctx.registerEventTimeTimer(nextFireTimestamp);fireTimestamp.add(nextFireTimestamp);}return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {//为什么是FIRE而不是FIRE_AND_PURGEif (time == window.maxTimestamp()) {return TriggerResult.FIRE;}ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);Long fireTimestamp = fireTimestampState.get();if (fireTimestamp != null && fireTimestamp == time) {fireTimestampState.clear();fireTimestampState.add(time + interval);ctx.registerEventTimeTimer(time + interval);return TriggerResult.FIRE;}return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)throws Exception {return TriggerResult.CONTINUE;}@Overridepublic void clear(W window, TriggerContext ctx) throws Exception {// 清除计时器ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);Long timestamp = fireTimestamp.get();if (timestamp != null) {ctx.deleteEventTimeTimer(timestamp);fireTimestamp.clear();}}@Overridepublic boolean canMerge() {return true;}@Overridepublic void onMerge(W window, OnMergeContext ctx) throws Exception {ctx.mergePartitionedState(stateDesc);Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get();if (nextFireTimestamp != null) {ctx.registerEventTimeTimer(nextFireTimestamp);}}@Overridepublic String toString() {return "ContinuousEventTimeTrigger(" + interval + ")";}@VisibleForTestingpublic long getInterval() {return interval;}/*** Creates a trigger that continuously fires based on the given interval.** @param interval The time interval at which to fire.* @param <W> The type of {@link Window Windows} on which this trigger can operate.*/public static <W extends Window> ContinuousEventTimeTrigger<W> of(Time interval) {return new ContinuousEventTimeTrigger<>(interval.toMilliseconds());}private static class Min implements ReduceFunction<Long> {private static final long serialVersionUID = 1L;@Overridepublic Long reduce(Long value1, Long value2) throws Exception {return Math.min(value1, value2);}}
}

ContinuousProcessingTimeTrigger源码

@PublicEvolving
public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<Object, W> {private static final long serialVersionUID = 1L;private final long interval;/** When merging we take the lowest of all fire timestamps as the new fire timestamp. */private final ReducingStateDescriptor<Long> stateDesc =new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);private ContinuousProcessingTimeTrigger(long interval) {this.interval = interval;}@Overridepublic TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)throws Exception {ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);timestamp = ctx.getCurrentProcessingTime();// 注册计时器,为什么这里不需要类似ContinuousEventTimeTrigger一样注册一个窗口结束时间的计时器?if (fireTimestamp.get() == null) {long start = timestamp - (timestamp % interval);long nextFireTimestamp = start + interval;ctx.registerProcessingTimeTimer(nextFireTimestamp);fireTimestamp.add(nextFireTimestamp);return TriggerResult.CONTINUE;}return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {return TriggerResult.CONTINUE;}@Overridepublic TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)throws Exception {ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);if (fireTimestamp.get().equals(time)) {fireTimestamp.clear();fireTimestamp.add(time + interval);ctx.registerProcessingTimeTimer(time + interval);return TriggerResult.FIRE;}//为什么这里没有FIRE_AND_PURGE?状态是何时清理的?return TriggerResult.CONTINUE;}@Overridepublic void clear(W window, TriggerContext ctx) throws Exception {//清除计时器// State could be merged into new window.ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);Long timestamp = fireTimestamp.get();if (timestamp != null) {ctx.deleteProcessingTimeTimer(timestamp);fireTimestamp.clear();}}@Overridepublic boolean canMerge() {return true;}@Overridepublic void onMerge(W window, OnMergeContext ctx) throws Exception {// States for old windows will lose after the call.ctx.mergePartitionedState(stateDesc);// Register timer for this new window.Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get();if (nextFireTimestamp != null) {ctx.registerProcessingTimeTimer(nextFireTimestamp);}}@VisibleForTestingpublic long getInterval() {return interval;}@Overridepublic String toString() {return "ContinuousProcessingTimeTrigger(" + interval + ")";}/*** Creates a trigger that continuously fires based on the given interval.** @param interval The time interval at which to fire.* @param <W> The type of {@link Window Windows} on which this trigger can operate.*/public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(Time interval) {return new ContinuousProcessingTimeTrigger<>(interval.toMilliseconds());}private static class Min implements ReduceFunction<Long> {private static final long serialVersionUID = 1L;@Overridepublic Long reduce(Long value1, Long value2) throws Exception {return Math.min(value1, value2);}}
}

疑问:

1.为什么ContinuousEventTimeTrigger需要注册一个窗口结束时间的计时器而ContinuousProcessingTimeTrigger不注册?
答案: 其实我们需要看下它注册后的目的作用是什么,ContinuousEventTimeTrigger的ontimer在处理窗口结束的触发器时会返回FIRE触发计算,那问题就来了,如果只是触发计算,那么如果没有,那么仅仅只是窗口结束的时候没有触发一次计算而已。所以这里不是必须的
2.为什么ContinuousEventTimeTrigger/ContinuousProcessingTimeTrigger返回的结果是FIRE而不是FIRE_AND_PURGE?状态是什么时候清理的?
答案: 首先要明确状态的清理这个逻辑,状态其实包括窗口的状态,触发器的状态等,返回FIRE仅仅是触发计算,而不会清理任何状态,而假设返回FIRE_AND_PURGE的作用是触发计算,并进行窗口状态的清理(注意这里是不包括触发器的状态的清理的),其实状态的清理是由WindowOperator在清理时间到时进行的(对于事件时间是窗口结束时间+迟到容忍间隔时间,对于处理时间是窗口结束时间),所以不必要在窗口结束时间到的时候返回FIRE_AND_PURGE,可以统一由WindowOperator在清理时间到之后统一清理状态

http://www.hkea.cn/news/896268/

相关文章:

  • 视频直播app开发网站南京最新消息今天
  • 溧阳手机网站哪里做万网域名注册官网查询
  • 网站维护收费推广产品吸引人的句子
  • 怎么用一个主机做多个网站许昌网络推广公司
  • 网站域名所有权郑州网站运营专业乐云seo
  • 桂园精品网站建设费用网站seo查询站长之家
  • 安卓手机怎么做网站站长工具seo综合查询广告
  • 余姚网站建设的公司手机百度账号申请注册
  • 预付网站制作费怎么做凭证如何自制网站
  • 定制网站多少钱北京seo网站管理
  • 南昌做网站公司哪家好如何建立独立网站
  • 成都解放号网站建设什么是百度竞价
  • 网站优化的基本思想与原则百度号码
  • 沧州网站建设制作设计优化深圳seo优化推广
  • 建立一个网站需要什么技术网上培训机构
  • 网站设计与管理论文百度账号注册平台
  • 网站空间商推荐seo是什么职位缩写
  • 怎么建设boss网站文件外链
  • 百度推广网站建设费百度搜索引擎的网址是多少
  • php 手机网站 上传图片定制网站建设
  • 关于网站建设的问题百度关键词分析
  • 登录官方网站装修公司网络推广方案
  • 设计网站官网入口网站搜索优化方法
  • 网站优化qq群山东做网站
  • wordpress icomoon太原seo快速排名
  • 中华建设杂志网站记者数据指数
  • 网站开发测试情况南召seo快速排名价格
  • 上海仓储公司小红书seo优化
  • 南京建设公司网站网络营销整合推广
  • wordpress更改语言沈阳seo优化