当前位置: 首页 > news >正文

视频网站建设类图网络营销的功能有哪些

视频网站建设类图,网络营销的功能有哪些,北仑网站制作,哪里有手机网站建设线上运行的CEP中肯定经常遇到规则变更的情况#xff0c;如果每次变更时都将任务重启、重新发布是非常不优雅的。尤其在营销或者风控这种对实时性要求比较高的场景#xff0c;如果规则窗口过长#xff08;一两个星期#xff09;#xff0c;状态过大#xff0c;就会导致重启…        线上运行的CEP中肯定经常遇到规则变更的情况如果每次变更时都将任务重启、重新发布是非常不优雅的。尤其在营销或者风控这种对实时性要求比较高的场景如果规则窗口过长一两个星期状态过大就会导致重启时间延长期间就会造成一些想要处理的异常行为不能及时发现。 1.实现分析 外部加载通常规则引擎会有专门的规则管理模块提供用户去创建自己的规则对于Flink任务来说需要到外部去加载规则动态更新需要提供定时去检测规则是否变更历史状态清理在模式匹配中是一系列NFAState 的不断变更如果规则发生变更需要清理历史状态API需要对外提供易用的API 2.代码实现 首先实现一个用户API。 package cep.functions;import java.io.Serializable;import org.apache.flink.api.common.functions.Function;import cep.pattern.Pattern;/*** author StephenYou* Created on 2023-07-23* Description: 动态Pattern接口用户调用API不区分key*/ public interface DynamicPatternFunctionT extends Function, Serializable {/**** 初始化* throws Exception*/public void init() throws Exception;/*** 注入新的pattern* return*/public PatternT,T inject() throws Exception;/*** 一个扫描周期:ms* return*/public long getPeriod() throws Exception;/*** 规则是否发生变更* return*/public boolean isChanged() throws Exception; }希望上述API的调用方式如下。 //正常调用CEP.pattern(dataStream,pattern);//动态PatternCEP.injectionPattern(dataStream, new UserDynamicPatternFunction()) 所以需要修改CEP-Lib源码 b.增加injectionPattern函数。 public class CEP {/**** Dynamic injection pattern function * param input* param dynamicPatternFunction* return* param T*/public static T PatternStreamT injectionPattern throws Exception (DataStreamT input,DynamicPatternFunctionT dynamicPatternFunction){return new PatternStream(input, dynamicPatternFunction); } } 增加PatternStream构造函数因为需要动态更新所以有必要传进去整个函数。 public class PatternStreamT {PatternStream(final DataStreamT inputStream, DynamicPatternFunctionT dynamicPatternFunction) throws Exception {this(PatternStreamBuilder.forStreamAndPatternFunction(inputStream, dynamicPatternFunction));} } 修改PatternStreamBuilder.build, 增加调用函数的过程。 final CepOperatorIN, K, OUT operator null;if (patternFunction null ) {operator new CepOperator(inputSerializer,isProcessingTime,nfaFactory,comparator,pattern.getAfterMatchSkipStrategy(),processFunction,lateDataOutputTag);} else {operator new CepOperator(inputSerializer,isProcessingTime,patternFunction,comparator,null,processFunction,lateDataOutputTag);} 增加对应的CepOperator构造函数。 public CepOperator(final TypeSerializerIN inputSerializer,final boolean isProcessingTime,final DynamicPatternFunction patternFunction,Nullable final EventComparatorIN comparator,Nullable final AfterMatchSkipStrategy afterMatchSkipStrategy,final PatternProcessFunctionIN, OUT function,Nullable final OutputTagIN lateDataOutputTag) {super(function);this.inputSerializer Preconditions.checkNotNull(inputSerializer);this.patternFunction patternFunction;this.isProcessingTime isProcessingTime;this.comparator comparator;this.lateDataOutputTag lateDataOutputTag;if (afterMatchSkipStrategy null) {this.afterMatchSkipStrategy AfterMatchSkipStrategy.noSkip();} else {this.afterMatchSkipStrategy afterMatchSkipStrategy;}this.nfaFactory null;} 加载Pattern构造NFA Overridepublic void open() throws Exception {super.open();timerService getInternalTimerService(watermark-callbacks, VoidNamespaceSerializer.INSTANCE, this);//初始化if (patternFunction ! null) {patternFunction.init();Pattern pattern patternFunction.inject();afterMatchSkipStrategy pattern.getAfterMatchSkipStrategy();boolean timeoutHandling getUserFunction() instanceof TimedOutPartialMatchHandler;nfaFactory NFACompiler.compileFactory(pattern, timeoutHandling);long period patternFunction.getPeriod();// 注册定时器检测规则是否变更if (period 0) {getProcessingTimeService().registerTimer(timerService.currentProcessingTime() period, this::onProcessingTime);}}nfa nfaFactory.createNFA();nfa.open(cepRuntimeContext, new Configuration());context new ContextFunctionImpl();collector new TimestampedCollector(output);cepTimerService new TimerServiceImpl();// metricsthis.numLateRecordsDropped metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);} 状态清理一共分为两块: 匹配状态数据清理、定时器清理 进行状态清理: Overridepublic void processElement(StreamRecordIN element) throws Exception {if (patternFunction ! null) {// 规则版本更新if (needRefresh.value() refreshVersion.get()) {//清除状态computationStates.clear();elementQueueState.clear();partialMatches.releaseCacheStatisticsTimer();//清除定时器IterableLong registerTime registerTimeState.get();if (registerTime ! null) {IteratorLong iterator registerTime.iterator();while (iterator.hasNext()) {Long l iterator.next();//删除定时器timerService.deleteEventTimeTimer(VoidNamespace.INSTANCE, l);timerService.deleteProcessingTimeTimer(VoidNamespace.INSTANCE, l);//状态清理iterator.remove();}}//更新当前的版本needRefresh.update(refreshVersion.get());}} } 上面是在处理每条数据时清除状态和版本。接下来要进行状态和版本的初始化。 Overridepublic void initializeState(StateInitializationContext context) throws Exception {super.initializeState(context);//初始化状态if (patternFunction ! null) {/*** 两个标识位状态*/refreshFlagState context.getOperatorStateStore().getUnionListState(new ListStateDescriptorInteger(refreshFlagState, Integer.class));if (context.isRestored()) {if (refreshFlagState.get().iterator().hasNext()) {refreshVersion new AtomicInteger(refreshFlagState.get().iterator().next());}} else {refreshVersion new AtomicInteger(0);}needRefresh context.getKeyedStateStore().getState(new ValueStateDescriptorInteger(needRefreshState, Integer.class, 0));} } 3.测试验证 设置每10s变更一次Pattern。 PatternStream patternStream CEP.injectionPattern(source, new TestDynamicPatternFunction());patternStream.select(new PatternSelectFunctionTuple3String, Long, String, Map() {Overridepublic Map select(Map map) throws Exception {map.put(processingTime, System.currentTimeMillis());return map;}}).print();env.execute(SyCep);}public static class TestDynamicPatternFunction implements DynamicPatternFunctionTuple3String, Long, String {public TestDynamicPatternFunction() {this.flag true;}boolean flag;int time 0;Overridepublic void init() throws Exception {flag true;}Overridepublic PatternTuple3String, Long, String, Tuple3String, Long, String inject()throws Exception {// 2种patternif (flag) {Pattern pattern Pattern.Tuple3String, Long, Stringbegin(start).where(new IterativeConditionTuple3String, Long, String() {Overridepublic boolean filter(Tuple3String, Long, String value,ContextTuple3String, Long, String ctx) throws Exception {return value.f2.equals(success);}}).times(1).followedBy(middle).where(new IterativeConditionTuple3String, Long, String() {Overridepublic boolean filter(Tuple3String, Long, String value,ContextTuple3String, Long, String ctx) throws Exception {return value.f2.equals(fail);}}).times(1).next(end);return pattern;} else {Pattern pattern Pattern.Tuple3String, Long, Stringbegin(start2).where(new IterativeConditionTuple3String, Long, String() {Overridepublic boolean filter(Tuple3String, Long, String value,ContextTuple3String, Long, String ctx) throws Exception {return value.f2.equals(success2);}}).times(2).next(middle2).where(new IterativeConditionTuple3String, Long, String() {Overridepublic boolean filter(Tuple3String, Long, String value,ContextTuple3String, Long, String ctx) throws Exception {return value.f2.equals(fail2);}}).times(2).next(end2);return pattern;}}Overridepublic long getPeriod() throws Exception {return 10000;}Overridepublic boolean isChanged() throws Exception {flag !flag ;time getPeriod();System.out.println(change pattern : time);return true;}} 打印结果符合预期 4.源码地址 感觉有用的话帮忙点个小星星。^_^ GitHub - StephenYou520/SyCep: CEP 动态Pattern
http://www.hkea.cn/news/14357665/

相关文章:

  • 网站怎么挂广告电子商务目前就业形势
  • 做饼干的网站网站建设7个基本流程步骤有哪些
  • 绍兴网站建设公司最好的网站优化公司
  • 合肥金融网站开发长沙百度推广运营公司
  • 有做任务赚赏金的网站吗wordpress顶踩插件
  • 上海企业网站建设电话一个公司建设网站
  • 网站设计程序设计公司起名怎么起好
  • 成都价格网站建设服务公司网站为什么显示正在建设中
  • 请牢记此域名深圳专业seo
  • 镇江网站开发信息服务平台有哪些
  • 织梦系统如何做网站地图成都哪家做网站建设比较好
  • 网站信息可以滨州网站建设九鲁
  • 网站建设费属于什么税目简述网页设计的流程
  • 网页设计作品及源码seo企业优化顾问
  • 外贸公司的网站建设交通设施东莞网站建设
  • 建设网站有哪些步骤推广普通话 奋进新征程
  • 自动采集更新的网站wordpress烟台 网站建设
  • 系统难还是网站设计难做北京信息网
  • 自适应响应式网站源码做美工需要哪些网站
  • 网站友情链接交易平台厦门网站推广的目标
  • 网站建设好后怎么更新内容wordpress for windows
  • 设计站网站设计制作的介绍
  • 湛江有人做网站 的吗做违规网站
  • 最简单的一个网站开发百度浏览器官方网站
  • 宁波营销型网站建设首选汉中公司做网站
  • flash网站项目背景wordpress移动端转发分享
  • 微信网站开发用什么语言开公司要什么条件
  • 通城网站建设uml电子商务网站建设文档
  • 怎么申请个人网站html手机网站如何制作
  • 哪个网站专门做高清壁纸前端培训心得