当前位置: 首页 > news >正文

人人商城网站开发成都网站建站推广

人人商城网站开发,成都网站建站推广,dede网站名称不能中文,百度信誉任何在网站展示线上运行的CEP中肯定经常遇到规则变更的情况#xff0c;如果每次变更时都将任务重启、重新发布是非常不优雅的。尤其在营销或者风控这种对实时性要求比较高的场景#xff0c;如果规则窗口过长#xff08;一两个星期#xff09;#xff0c;状态过大#xff0c;就会导致重启…        线上运行的CEP中肯定经常遇到规则变更的情况如果每次变更时都将任务重启、重新发布是非常不优雅的。尤其在营销或者风控这种对实时性要求比较高的场景如果规则窗口过长一两个星期状态过大就会导致重启时间延长期间就会造成一些想要处理的异常行为不能及时发现。 1.实现分析 外部加载通常规则引擎会有专门的规则管理模块提供用户去创建自己的规则对于Flink任务来说需要到外部去加载规则动态更新需要提供定时去检测规则是否变更历史状态清理在模式匹配中是一系列NFAState 的不断变更如果规则发生变更需要清理历史状态API需要对外提供易用的API 2.代码实现 首先实现一个用户API。 package cep.functions;import java.io.Serializable;import org.apache.flink.api.common.functions.Function;import cep.pattern.Pattern;/*** author StephenYou* Created on 2023-07-23* Description: 动态Pattern接口用户调用API不区分key*/ public interface DynamicPatternFunctionT extends Function, Serializable {/**** 初始化* throws Exception*/public void init() throws Exception;/*** 注入新的pattern* return*/public PatternT,T inject() throws Exception;/*** 一个扫描周期:ms* return*/public long getPeriod() throws Exception;/*** 规则是否发生变更* return*/public boolean isChanged() throws Exception; }希望上述API的调用方式如下。 //正常调用CEP.pattern(dataStream,pattern);//动态PatternCEP.injectionPattern(dataStream, new UserDynamicPatternFunction()) 所以需要修改CEP-Lib源码 b.增加injectionPattern函数。 public class CEP {/**** Dynamic injection pattern function * param input* param dynamicPatternFunction* return* param T*/public static T PatternStreamT injectionPattern throws Exception (DataStreamT input,DynamicPatternFunctionT dynamicPatternFunction){return new PatternStream(input, dynamicPatternFunction); } } 增加PatternStream构造函数因为需要动态更新所以有必要传进去整个函数。 public class PatternStreamT {PatternStream(final DataStreamT inputStream, DynamicPatternFunctionT dynamicPatternFunction) throws Exception {this(PatternStreamBuilder.forStreamAndPatternFunction(inputStream, dynamicPatternFunction));} } 修改PatternStreamBuilder.build, 增加调用函数的过程。 final CepOperatorIN, K, OUT operator null;if (patternFunction null ) {operator new CepOperator(inputSerializer,isProcessingTime,nfaFactory,comparator,pattern.getAfterMatchSkipStrategy(),processFunction,lateDataOutputTag);} else {operator new CepOperator(inputSerializer,isProcessingTime,patternFunction,comparator,null,processFunction,lateDataOutputTag);} 增加对应的CepOperator构造函数。 public CepOperator(final TypeSerializerIN inputSerializer,final boolean isProcessingTime,final DynamicPatternFunction patternFunction,Nullable final EventComparatorIN comparator,Nullable final AfterMatchSkipStrategy afterMatchSkipStrategy,final PatternProcessFunctionIN, OUT function,Nullable final OutputTagIN lateDataOutputTag) {super(function);this.inputSerializer Preconditions.checkNotNull(inputSerializer);this.patternFunction patternFunction;this.isProcessingTime isProcessingTime;this.comparator comparator;this.lateDataOutputTag lateDataOutputTag;if (afterMatchSkipStrategy null) {this.afterMatchSkipStrategy AfterMatchSkipStrategy.noSkip();} else {this.afterMatchSkipStrategy afterMatchSkipStrategy;}this.nfaFactory null;} 加载Pattern构造NFA Overridepublic void open() throws Exception {super.open();timerService getInternalTimerService(watermark-callbacks, VoidNamespaceSerializer.INSTANCE, this);//初始化if (patternFunction ! null) {patternFunction.init();Pattern pattern patternFunction.inject();afterMatchSkipStrategy pattern.getAfterMatchSkipStrategy();boolean timeoutHandling getUserFunction() instanceof TimedOutPartialMatchHandler;nfaFactory NFACompiler.compileFactory(pattern, timeoutHandling);long period patternFunction.getPeriod();// 注册定时器检测规则是否变更if (period 0) {getProcessingTimeService().registerTimer(timerService.currentProcessingTime() period, this::onProcessingTime);}}nfa nfaFactory.createNFA();nfa.open(cepRuntimeContext, new Configuration());context new ContextFunctionImpl();collector new TimestampedCollector(output);cepTimerService new TimerServiceImpl();// metricsthis.numLateRecordsDropped metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);} 状态清理一共分为两块: 匹配状态数据清理、定时器清理 进行状态清理: Overridepublic void processElement(StreamRecordIN element) throws Exception {if (patternFunction ! null) {// 规则版本更新if (needRefresh.value() refreshVersion.get()) {//清除状态computationStates.clear();elementQueueState.clear();partialMatches.releaseCacheStatisticsTimer();//清除定时器IterableLong registerTime registerTimeState.get();if (registerTime ! null) {IteratorLong iterator registerTime.iterator();while (iterator.hasNext()) {Long l iterator.next();//删除定时器timerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, l);timerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, l);//状态清理iterator.remove();}}//更新当前的版本needRefresh.update(refreshVersion.get());}} } 上面是在处理每条数据时清除状态和版本。接下来要进行状态和版本的初始化。 Overridepublic void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);//初始化状态if (patternFunction ! null) {/*** 两个标识位状态*/refreshFlagState context.getOperatorStateStore().getUnionListState(new ListStateDescriptorInteger(refreshFlagState, Integer.class));if (context.isRestored()) {if (refreshFlagState.get().iterator().hasNext()) {refreshVersion new AtomicInteger(refreshFlagState.get().iterator().next());}} else {refreshVersion new AtomicInteger(0);}needRefresh context.getKeyedStateStore().getState(new ValueStateDescriptorInteger(needRefreshState, Integer.class, 0));} } 3.测试验证 设置每10s变更一次Pattern。 PatternStream patternStream CEP.injectionPattern(source, new TestDynamicPatternFunction());patternStream.select(new PatternSelectFunctionTuple3String, Long, String, Map() {Overridepublic Map select(Map map) throws Exception {map.put(processingTime, System.currentTimeMillis());return map;}}).print();env.execute(SyCep);}public static class TestDynamicPatternFunction implements DynamicPatternFunctionTuple3String, Long, String {public TestDynamicPatternFunction() {this.flag true;}boolean flag;int time 0;Overridepublic void init() throws Exception {flag true;}Overridepublic PatternTuple3String, Long, String, Tuple3String, Long, String inject()throws Exception {// 2种patternif (flag) {Pattern pattern Pattern.Tuple3String, Long, Stringbegin(start).where(new IterativeConditionTuple3String, Long, String() {Overridepublic boolean filter(Tuple3String, Long, String value,ContextTuple3String, Long, String ctx) throws Exception {return value.f2.equals(success);}}).times(1).followedBy(middle).where(new IterativeConditionTuple3String, Long, String() {Overridepublic boolean filter(Tuple3String, Long, String value,ContextTuple3String, Long, String ctx) throws Exception {return value.f2.equals(fail);}}).times(1).next(end);return pattern;} else {Pattern pattern Pattern.Tuple3String, Long, Stringbegin(start2).where(new IterativeConditionTuple3String, Long, String() {Overridepublic boolean filter(Tuple3String, Long, String value,ContextTuple3String, Long, String ctx) throws Exception {return value.f2.equals(success2);}}).times(2).next(middle2).where(new IterativeConditionTuple3String, Long, String() {Overridepublic boolean filter(Tuple3String, Long, String value,ContextTuple3String, Long, String ctx) throws Exception {return value.f2.equals(fail2);}}).times(2).next(end2);return pattern;}}Overridepublic long getPeriod() throws Exception {return 10000;}Overridepublic boolean isChanged() throws Exception {flag !flag ;time getPeriod();System.out.println(change pattern : time);return true;}} 打印结果符合预期 4.源码地址 感觉有用的话帮忙点个小星星。^_^ GitHub - StephenYou520/SyCep: CEP 动态Pattern
http://www.hkea.cn/news/14314934/

相关文章:

  • 深圳三站合一网站建设互联网推广属于什么经营范围
  • 百度手机网站制作信息流广告投放
  • 做企业网站市场分析vs2015做网站
  • 在哪注册网站东风地区网站建设价格
  • 河南省建设监理网站重庆最新网站备案
  • 国际物流网站建设男人互做网站
  • 阿里巴巴网站图片怎么做的优秀的品牌策划案例
  • 做的新网站做百度推广怎么弄做网站要有哪些知识
  • 廊坊网站关键词推广wordpress的文件权限设置
  • 哪里 教做网站带维护做php网站开发能赚钱吗
  • dede网站地图栏目如何上传文件代刷网站建设
  • 深圳建设网站哪家最好南宁3及分销网站制作
  • 宁波信誉好品牌网站设计地址手机怎么管理wifi踢人
  • 昆山兼职做网站wordpress例行维护
  • 青岛seo网站建设济南做网站最好的单位
  • 网站建设招标采购需求paypal网站做外贸
  • 个人备案的网站名称订阅号上链接的网站怎么做的
  • 类似淘宝的网站怎么做宣城市建设银行网站
  • 河南省建设协会网站做企业网站的第一步需要啥
  • 织梦网站专题模板下载几百块钱可以做网站吗
  • 兰州网站移动端优化现在做推广的新渠道有哪些
  • 网站营销优化方案上海市场营销公司
  • WordPress开网站很慢新闻排版设计用什么软件
  • 南宁武鸣区建设局网站微信商城软件开发
  • 怎么做html5网站吗手机端百度收录入口
  • 西安市官网网站seo优化技术入门
  • 天河岗顶棠下上社网站建设天津做网站推广的网站
  • 大型网站开发成本开启wordpress多站点
  • wordpress百度站长主动推送网片挂钩
  • 黑龙江建筑职业技术学院招生网站正规网络公司关键词排名优化