当前位置: 首页 > news >正文

国外网站翻墙怎么做买域名哪个网站好

国外网站翻墙怎么做,买域名哪个网站好,如何做网站授权,柬埔寨做网站背景 在处理窗口函数时#xff0c;ProcessWindowFunction处理函数可以定义三个状态#xff1a; 富函数getRuntimeContext.getState, 每个key每个窗口的状态context.windowState(),每个key的状态context.globalState#xff0c;那么这几个状态之间有什么关系呢#xff1f; …背景 在处理窗口函数时ProcessWindowFunction处理函数可以定义三个状态 富函数getRuntimeContext.getState, 每个key每个窗口的状态context.windowState(),每个key的状态context.globalState那么这几个状态之间有什么关系呢 ProcessWindowFunction处理函数三种状态之间的关系 1.getRuntimeContext.getState这个定义的状态是每个key维度的也就是可以跨时间窗口并维持状态的 2.context.windowState()这个定义的状态是和每个key以及窗口相关的也就是虽然key相同但是时间窗口不同他们的值也不一样. 3.context.globalState这个定义的状态是和每个key相关的也就是和getRuntimeContext.getState的定义一样可以跨窗口维护状态 验证代码如下所示 package wikiedits.func;import org.apache.flink.api.common.state.ValueState;import org.apache.flink.api.common.state.ValueStateDescriptor;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.TimeCharacteristic;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;import org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import org.apache.flink.util.Collector; import wikiedits.func.model.KeyCount;import java.text.SimpleDateFormat;import java.util.Date;public class ProcessWindowFunctionDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 使用处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);// 并行度为1env.setParallelism(1);// 设置数据源一共三个元素DataStreamTuple2String, Integer dataStream env.addSource(new SourceFunctionTuple2String, Integer() {Overridepublic void run(SourceContextTuple2String, Integer ctx) throws Exception {int xxxNum 0;int yyyNum 0;for (int i 1; i Integer.MAX_VALUE; i) {// 只有XXX和YYY两种nameString name (0 i % 2) ? XXX : YYY;//更新aaa和bbb元素的总数if (0 i % 2) {xxxNum;} else {yyyNum;}// 使用当前时间作为时间戳long timeStamp System.currentTimeMillis();// 将数据和时间戳打印出来用来验证数据System.out.println(String.format(source%s, %s, XXX total : %d, YYY total : %d\n,name,time(timeStamp),xxxNum,yyyNum));// 发射一个元素并且戴上了时间戳ctx.collectWithTimestamp(new Tuple2String, Integer(name, 1), timeStamp);// 每发射一次就延时1秒Thread.sleep(1000);}}Overridepublic void cancel() {}});// 将数据用5秒的滚动窗口做划分再用ProcessWindowFunctionSingleOutputStreamOperatorString mainDataStream dataStream// 以Tuple2的f0字段作为key本例中实际上key只有aaa和bbb两种.keyBy(value - value.f0)// 5秒一次的滚动窗口.timeWindow(Time.seconds(5))// 统计每个key当前窗口内的元素数量然后把key、数量、窗口起止时间整理成字符串发送给下游算子.process(new ProcessWindowFunctionTuple2String, Integer, String, String, TimeWindow() {// 自定义状态private ValueStateKeyCount state;Overridepublic void open(Configuration parameters) throws Exception {// 初始化状态name是myStatestate getRuntimeContext().getState(new ValueStateDescriptor(myState, KeyCount.class));}public void clear(Context context){ValueStateKeyCount contextWindowValueState context.windowState().getState(new ValueStateDescriptor(myWindowState, KeyCount.class));contextWindowValueState.clear();}Overridepublic void process(String s, Context context, IterableTuple2String, Integer iterable,CollectorString collector) throws Exception {// 从backend取得当前单词的myState状态KeyCount current state.value();// 如果myState还从未没有赋值过就在此初始化if (current null) {current new KeyCount();current.key s;current.count 0;}int count 0;// iterable可以访问该key当前窗口内的所有数据// 这里简单处理只统计了元素数量for (Tuple2String, Integer tuple2 : iterable) {count;}// 更新当前key的元素总数current.count count;// 更新状态到backendstate.update(current);System.out.println(getRuntimeContext() context : (getRuntimeContext() context));ValueStateKeyCount contextWindowValueState context.windowState().getState(new ValueStateDescriptor(myWindowState, KeyCount.class));ValueStateKeyCount contextGlobalValueState context.globalState().getState(new ValueStateDescriptor(myGlobalState, KeyCount.class));KeyCount windowValue contextWindowValueState.value();if (windowValue null) {windowValue new KeyCount();windowValue.key s;windowValue.count 0;}windowValue.count count;contextWindowValueState.update(windowValue);KeyCount globalValue contextGlobalValueState.value();if (globalValue null) {globalValue new KeyCount();globalValue.key s;globalValue.count 0;}globalValue.count count;contextGlobalValueState.update(globalValue);ValueStateKeyCount contextWindowSameNameState context.windowState().getState(new ValueStateDescriptor(myState, KeyCount.class));ValueStateKeyCount contextGlobalSameNameState context.globalState().getState(new ValueStateDescriptor(myState, KeyCount.class));System.out.println(contextWindowSameNameState contextGlobalSameNameState : (contextWindowSameNameState contextGlobalSameNameState));System.out.println(state contextGlobalSameNameState : (state contextGlobalSameNameState));// 将当前key及其窗口的元素数量还有窗口的起止时间整理成字符串String value String.format(window, %s, %s - %s, %d, total : %d, windowStateCount :%s, globalStateCount :%s\n,// 当前keys,// 当前窗口的起始时间time(context.window().getStart()),// 当前窗口的结束时间time(context.window().getEnd()),// 当前key在当前窗口内元素总数count,// 当前key出现的总数current.count,contextWindowValueState.value(),contextGlobalValueState.value());// 发射到下游算子collector.collect(value);}});// 打印结果通过分析打印信息检查ProcessWindowFunction中可以处理所有key的整个窗口的数据mainDataStream.print();env.execute(processfunction demo : processwindowfunction);}public static String time(long timeStamp) {return new SimpleDateFormat(hh:mm:ss).format(new Date(timeStamp));}} 输出结果 window, XXX, 08:34:45 - 08:34:50, 3, total : 22, windowStateCount :KeyCount{keyXXX, count3}, globalStateCount :KeyCount{keyXXX, count22} window, YYY, 08:34:45 - 08:34:50, 2, total : 22, windowStateCount :KeyCount{keyYYY, count2}, globalStateCount :KeyCount{keyYYY, count22}从结果可以验证以上的结论此外需要特别注意的一点是context.windowState()的状态需要在clear方法中清理掉因为一旦时间窗口结束就再也没有机会清理了 从这个例子中还发现一个比较有趣的现象 ValueStateKeyCount state getRuntimeContext().getState(new ValueStateDescriptor(myState, KeyCount.class)); ValueStateKeyCount contextWindowSameNameState context.windowState().getState(new ValueStateDescriptor(myState, KeyCount.class)); ValueStateKeyCount contextGlobalSameNameState context.globalState().getState(new ValueStateDescriptor(myState, KeyCount.class));在open中通过getRuntimeContext().getState定义的状态竟然可以通过 context.windowState()/ context.globalState()访问到并且他们指向的都是同一个变量可以参见代码的输出 System.out.println(contextWindowSameNameState contextGlobalSameNameState : (contextWindowSameNameState contextGlobalSameNameState)); System.out.println(state contextGlobalSameNameState : (state contextGlobalSameNameState));结果如下 contextWindowSameNameState contextGlobalSameNameState :true state contextGlobalSameNameState :true参考文献 https://cloud.tencent.com/developer/article/1815079
http://www.hkea.cn/news/14358142/

相关文章:

  • 网上书城网站开发意义电子商务网站建设软件选择
  • 栖霞酒店网站设计价格广州网站建设在线
  • 河南省建设网站政务公开网站项目建设书
  • 推进网站集约化建设wordpress 不能更换主题
  • 特色的南昌网站制作现在感染症状有哪些
  • 四川手机网站建设费用健身房网站的建设情况
  • 怎么做粉丝福利购网站wordpress文章加背景颜色
  • 关于网站设计的论文建设厅网站上报名
  • 北京网站推广公司全球网站排名
  • 重庆建设工程质量协会网站宿州百度seo排名软件
  • 涿州做网站建设wordpress后台接口数据
  • 做网站会什么软件站酷网站源码
  • 公司怎样制作网站如何用付费音乐做视频网站
  • 网站开发方法有哪些o2o是什么意思通俗讲
  • 湖南网站建设 系统asp网站空间申请
  • 企业网站建设需求调查centos启动wordpress
  • 格豪网络建站兰州做网站哪个平台好
  • 不用代码的网站建设百度指数查询网
  • 做盗版网站塘厦镇住房规划建设局网站
  • 如东住房和城乡建设局网站什么是销售型网站
  • 建设网站用什么软件排版珠海做网站开发服务公司
  • 清新大气企业公司网站源码自己做网站自己做SEO
  • 屏蔽ip地址访问网站响应式外贸网站价格
  • 西安做网站企业阿里云网站建设初衷
  • 海淀手机网站设计公司seo刷点击软件
  • 站群服务器是什么意思仙游县网站建设
  • 十堰网站建设网站建站 上海
  • 简述企业网站的基本功能自己创建一个网站
  • 电子商务网站建设 下载珠海移动网站建设公司
  • 外贸建站与推广广州越秀区发布紧急通告