当前位置: 首页 > news >正文

扬州做阿里巴巴的公司网站优化seo搜索排名

扬州做阿里巴巴的公司网站,优化seo搜索排名,找人做app网站吗,中企动力网站优化1.5 window 滚动窗口滑动窗口 window操作就是窗口函数。Spark Streaming提供了滑动窗口操作的支持#xff0c;从而让我们可以对一个滑动窗口内的数据执行计算操作。每次掉落在窗口内的RDD的数据#xff0c;会被聚合起来执行计算操作#xff0c;然后生成的RDD#xff0c;会…1.5 window 滚动窗口滑动窗口 window操作就是窗口函数。Spark Streaming提供了滑动窗口操作的支持从而让我们可以对一个滑动窗口内的数据执行计算操作。每次掉落在窗口内的RDD的数据会被聚合起来执行计算操作然后生成的RDD会作为window DStream的一个RDD。比如下图中就是对每三秒钟的数据执行一次滑动窗口计算这3秒内的3个RDD会被聚合起来进行处理然后过了两秒钟又会对最近三秒内的数据执行滑动窗口计算。所以每个滑动窗口操作都必须指定两个参数窗口长度以及滑动间隔而且这两个参数值都必须是batch间隔的整数倍。 红色的矩形就是一个窗口窗口hold的是一段时间内的数据流。 这里面每一个time都是时间单元在官方的例子中每隔window size是3 time unit, 而且每隔2个单位时间窗口会slide一次。 所以基于窗口的操作需要指定2个参数 window length - The duration of the window (3 in the figure) slide interval - The interval at which the window-based operation is performed (2 in the figure). 窗口大小个人感觉是一段时间内数据的容器。 滑动间隔就是我们可以理解的cron表达式吧。 案例实现 package com.qianfeng.sparkstreaming ​ import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{Seconds, StreamingContext} ​ /*** 统计截止到目前为止出现的每一个key的次数* window窗口操作每个多长M时间,通过过往N长时间内产生的数据* M就是滑动长度sliding interval* N就是窗口长度window length*/ object Demo05_WCWithWindow {def main(args: Array[String]): Unit {val conf new SparkConf().setAppName(WordCountUpdateStateByKey).setMaster(local[*])val batchInterval 2val duration Seconds(batchInterval)val ssc new StreamingContext(conf, duration)val lines:DStream[String] ssc.socketTextStream(qianfeng01, 6666)val pairs:DStream[(String, Int)] lines.flatMap(_.split(\\s)).map((_, 1)) ​val ret:DStream[(String, Int)] pairs.reduceByKeyAndWindow(__,windowDuration Seconds(batchInterval * 3),slideDuration Seconds(batchInterval * 2)) ​ret.print() ​ssc.start()ssc.awaitTermination()} } 1.6 SparkSQL和SparkStreaming的整合案例 Spark最强大的地方在于可以与Spark Core、Spark SQL整合使用之前已经通过transform、foreachRDD等算子看到如何将DStream中的RDD使用Spark Core执行批处理操作。现在就来看看如何将DStream中的RDD与Spark SQL结合起来使用。 案例top3的商品排序 最新的top3 这里就是基于updatestateByKey统计截止到目前为止的不同品类下的商品销量top3 代码实现 package com.qianfeng.sparkstreaming ​ import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.dstream.DStream /*** SparkStreaming整合SparkSQL的案例之热门品类top3排行* 输入数据格式* id brand category* 1 huwei watch* 2 huawei phone**/ object Demo06_SQLWithStreaming {def main(args: Array[String]): Unit {val conf new SparkConf().setAppName(StreamingIntegerationSQL).setMaster(local[*])val batchInterval 2val duration Seconds(batchInterval)val spark SparkSession.builder().config(conf).getOrCreate()val ssc new StreamingContext(spark.sparkContext, duration)ssc.checkpoint(/Users/liyadong/data/sparkdata/streamingdata/chk-1)val lines:DStream[String] ssc.socketTextStream(qianfeng01, 6666)//001 mi moblieval pairs:DStream[(String, Int)] lines.map(line {val fields line.split(\\s)if(fields null || fields.length ! 3) {(, -1)} else {val brand fields(1)val category fields(2)(s${category}_${brand}, 1)}}).filter(t t._2 ! -1) ​val usb:DStream[(String, Int)] pairs.updateStateByKey(updateFunc) ​usb.foreachRDD((rdd, bTime) {if(!rdd.isEmpty()) {//category_brand countimport spark.implicits._val df rdd.map{case (cb, count) {val category cb.substring(0, cb.indexOf(_))val brand cb.substring(cb.indexOf(_) 1)(category, brand, count)}}.toDF(category, brand, sales) ​df.createOrReplaceTempView(tmp_category_brand_sales)val sql |select| t.category,| t.brand,| t.sales,| t.rank|from (| select|   category,|   brand,|   sales,|   row_number() over(partition by category order by sales desc) rank| from tmp_category_brand_sales|) t|where t.rank 4|;.stripMarginspark.sql(sql).show()}}) ​ssc.start()ssc.awaitTermination()} ​def updateFunc(seq: Seq[Int], option: Option[Int]): Option[Int] {Option(seq.sum option.getOrElse(0))} } 1.7 SparkStreaming整合Reids //将实时结果写入Redis中 dStream.foreachRDD((w,c){val jedis new Jedis(192.168.10.101, 6379)   //抽到公共地方即可jedis.auth(root)jedis.set(w.toString(),c.toString())  //一个key对应多个值可以考虑hset }) Guff_hys_python数据结构,大数据开发学习,python实训项目-CSDN博客
http://www.hkea.cn/news/14422337/

相关文章:

  • 企业网站建设需要考虑内容淄博临淄网站建设
  • 某公司人事管理网站开发网站广告接入
  • 武进做网站的公司个人网站 费用
  • 广州 创意的网站设计做菠菜网站判多久
  • 网站源码怎么写山西企业模板建站信息
  • 王晴儿网站建设玩具网站建设
  • 保健品网站可以做网站微信文章导入wordpress
  • 菜鸟教程网站是怎么做的吉林电商网站建设
  • 网站建设产品中心重庆市建设考试报名网站
  • 外贸网站建设如何做推广运营是做什么的
  • 公司网站建设电话南通五建宏业建设工程有限公司网站
  • 网站开发外包平台西宁做网站的好公司
  • 深圳网站建设网站设计软文推广怎么建设两个大淘客网站
  • 怎样才能建设只是于自己的网站中山百度首页推广
  • 广州海珠区赤岗 新港网站建设公司高稳定性的网站设计制作
  • wordpress刷量插件网站页脚优化怎么做
  • 农林网站建设公司上海营销型网站制作
  • 网站建设汇报方案ppt做网站必须用域名吗
  • 网站建设的有什么需求企业网站建设 新天地网络
  • 网站收录低的原因优化师证书
  • 收录网站是怎么做的常州商城网站建设
  • 代码网站建设红河县网站建设
  • 广东营销式网站做百度网站分录
  • 河北省建设厅报名网站宿州医疗网站建设
  • 山东莱州市建设局网站买个域名
  • ps切图做网站网站定位策划书
  • 怎么做免流网站天元建设集团有限公司申请破产了吗
  • 建网站卖东西拼多多电商运营模式
  • 胶州国际网站建设效果商事主体信息查询平台
  • 招聘网站开发计划书百科网站模板