当前位置: 首页 > news >正文

网站营销方案设计公司怎么写代码做网站

网站营销方案设计公司,怎么写代码做网站,云服务器一年多少钱,长春市网站推广文章目录 1.UDF2.UDAF2.1 UDF函数实现原理2.2需求:计算用户平均年龄2.2.1 使用RDD实现2.2.2 使用UDAF弱类型实现2.2.3 使用UDAF强类型实现 1.UDF 用户可以通过 spark.udf 功能添加自定义函数#xff0c;实现自定义功能。 如#xff1a;实现需求在用户name前加上Name:… 文章目录 1.UDF2.UDAF2.1 UDF函数实现原理2.2需求:计算用户平均年龄2.2.1 使用RDD实现2.2.2 使用UDAF弱类型实现2.2.3 使用UDAF强类型实现 1.UDF 用户可以通过 spark.udf 功能添加自定义函数实现自定义功能。 如实现需求在用户name前加上Name:字符串并打印在控制台 def main(args: Array[String]): Unit {//创建上下文环境配置对象val conf: SparkConf new SparkConf().setMaster(local[*]).setAppName(SparkSQLDemo03)//创建 SparkSession 对象val sc: SparkSession SparkSession.builder().config(conf).getOrCreate()import sc.implicits._//创建DataFrameval dataRDD: RDD[(String,Int)] sc.sparkContext.makeRDD(List((zhangsan,21),(lisi,24)))val dataframe dataRDD.toDF(name,age)//注册udf函数sc.udf.register(addName,(x:String)Name:x)//创建临时视图dataframe.createOrReplaceTempView(people)//对临时视图使用udf函数sc.sql(select addName(name) from people).show()sc.stop()}2.UDAF 强类型的 Dataset 和弱类型的 DataFrame 都提供了相关的聚合函数 如 count()countDistinct()avg()max()min()。除此之外用户可以设定自己的自定义聚合函数。**通过继承 UserDefinedAggregateFunction 来实现用户自定义弱类型聚合函数。**从 Spark3.0 版本后UserDefinedAggregateFunction 已经不推荐使用了。可以统一采用强类型聚合函数Aggregator。 2.1 UDF函数实现原理 在Spark中UDF用户自定义函数在对表中的数据进行处理时通常会将数据放入缓冲区中以便进行计算。这种缓冲策略可以提高数据处理的效率特别是对于大数据集。 2.2需求:计算用户平均年龄 2.2.1 使用RDD实现 val dataRDD: RDD[(String,Int)] sc.sparkContext.makeRDD(List((zhangsan,21),(lisi,24),(wangwu,26)))val reduceResult: (Int, Int) dataRDD.map({case (name, age) {(age, 1)}}).reduce((t1, t2) {(t1._1 t2._1, t1._2 t2._2)})println(reduceResult._1/reduceResult._2)2.2.2 使用UDAF弱类型实现 需要用户自定义类实现UserDefinedAggregateFunction并重写其中的方法当前已不推荐使用。 package bigdata.wordcount.udfimport org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StructField, StructType} import org.apache.spark.util.AccumulatorV2/*** 用户自定义函数*/ object UDF_Demo02 {def main(args: Array[String]): Unit {//创建上下文环境配置对象val conf: SparkConf new SparkConf().setMaster(local[*]).setAppName(SparkSQLDemo03)//创建 SparkSession 对象val sc: SparkSession SparkSession.builder().config(conf).getOrCreate()import sc.implicits._val dataRDD: RDD[(String, Int)] sc.sparkContext.makeRDD(List((zhangsan, 19), (lisi, 21), (wangwu, 22)))val dataFrame: DataFrame dataRDD.toDF(name,age)dataFrame.createOrReplaceTempView(user)//创建聚合函数var myAvgnew MyAverageUDAF()//在Spark中注册自定义的聚合函数sc.udf.register(avgMy,myAvg)sc.sql(select avgMy(age) from user).show()sc.stop()}case class User(var name:String,var age:Int)}class MyAverageUDAF extends UserDefinedAggregateFunction{//输入的要进行聚合的参数的类型override def inputSchema: StructType StructType(Array(StructField(age,IntegerType)))//聚合函数缓冲区中的值的数据类型override def bufferSchema: StructType StructType(Array(StructField(sum,LongType),StructField(count,LongType)))//函数返回的值的数据类型override def dataType: DataType DoubleType//判断函数的稳定性//对于相同类型的输入是否有相同类型的输出override def deterministic: Boolean true//聚合函数缓冲区中值的初始化//因为数据是弱类型的函数缓冲区中是根据索引来找到对应的变量override def initialize(buffer: MutableAggregationBuffer): Unit {//年龄的总和buffer(0)0L//年龄的个数buffer(1)0L}//更新缓冲区中的数据(执行操作步骤)override def update(buffer: MutableAggregationBuffer, input: Row): Unit {//第0个索引值是否为空if(!input.isNullAt(0)) {//更新年龄sum的值buffer(0)buffer.getLong(0)input.getInt(0)//更新年龄个数buffer(1)buffer.getLong(1)1;}}//合并缓冲区override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit {buffer1(0)buffer1.getLong(0)buffer2.getLong(0)buffer1(1)buffer1.getLong(1)buffer2.getLong(1)}//计算最终结果override def evaluate(buffer: Row): Double {buffer.getLong(0).toDouble / buffer.getLong(1)} }2.2.3 使用UDAF强类型实现 Spark3.0 版本可以采用强类型的 Aggregator 方式代替 UserDefinedAggregateFunction package bigdata.wordcount.udfimport org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Encoders, Row, SparkSession, TypedColumn} import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StructField, StructType} import org.apache.spark.util.AccumulatorV2/*** 用户自定义函数*/ object UDF_Demo03 {def main(args: Array[String]): Unit {//创建上下文环境配置对象val conf: SparkConf new SparkConf().setMaster(local[*]).setAppName(SparkSQLDemo03)//创建 SparkSession 对象val sc: SparkSession SparkSession.builder().config(conf).getOrCreate()import sc.implicits._val dataRDD: RDD[(String, Int)] sc.sparkContext.makeRDD(List((zhangsan, 19), (lisi, 21), (wangwu, 22)))val dataFrame: DataFrame dataRDD.toDF(name,age)val dataset: Dataset[User01] dataFrame.as[User01]//创建聚合函数var myAvgnew MyAverageUDAF01()//将聚合函数转换为查询的列val col: TypedColumn[User01, Double] myAvg.toColumn//执行查询操作dataset.select(col).show()sc.stop()}case class User(var name:String,var age:Int)}//输入数据类型 case class User01(var name:String,var age:Int) //缓存中的数据类型 case class AgeBuffer(var sum:Long,var count:Long)class MyAverageUDAF01 extends Aggregator[User01,AgeBuffer,Double]{//设置初始值override def zero: AgeBuffer {AgeBuffer(0L,0L)}//缓冲区实现聚合override def reduce(b: AgeBuffer, a: User01): AgeBuffer {b.sum b.sum a.ageb.count b.count 1b}//合并缓冲区override def merge(b1: AgeBuffer, b2: AgeBuffer): AgeBuffer {b1.sumb2.sumb1.countb2.countb1}//计算最终结果override def finish(buff: AgeBuffer): Double {buff.sum.toDouble/buff.count}//设置编码器和解码器//自定义类型就是 product 自带类型根据类型选择override def bufferEncoder: Encoder[AgeBuffer] {Encoders.product}override def outputEncoder: Encoder[Double] {Encoders.scalaDouble} }
http://www.hkea.cn/news/14499326/

相关文章:

  • p2p借贷网站开发 论文上海注册公司注册资金
  • 江苏网站建设平台photoshop教程
  • 外贸多语言网站网站更新方法
  • 做充气气模产品一般去哪些网站wordpress电影资源网站
  • 番禺网站开发哪家专业coding免费搭建wordpress
  • wordpress vip购买页面如何优化百度seo排名
  • 建网站公司哪个比较好seo基础视频教程
  • 清远东莞网站建设上海市安全建设监理协会网站
  • pc网站开发语言宝安网站设计制作
  • python 做网站 用哪个框架好十大装修公司排名哪家最好
  • 淄博 建设网站重庆一般做一个网站需要多少钱
  • 成品网站w灬源码伊园上海建设银行官网网站
  • 哔哩哔哩黄页网站江苏省建设考试信息管理系统网站
  • 余姚市城乡建设局网站建设一个网站的基本步骤
  • 桂林网站建设企业管理咨询收费标准
  • 吉林省建设厅网站特殊工种潍坊微信网站
  • 网站外链建设策略网站建设步和客户沟通
  • 网站制作案例网站开发的国内外现状
  • 浙江短视频seo优化网站云南网络营销文化优化
  • 网站开发浏览器举报网站制度建设方面
  • 做网站可以临摹吗深圳广告公司画册设计
  • 上饶市网站建设wordpress后台编辑小工具
  • 营销型网站建设策划书怎么写凡科建站官网登录
  • 学校如何重视校园网站建设企业管理软件开发软件公司
  • 散热器 东莞网站建设工程建设公司官网
  • 成都市建设工程质量协会网站眼科医院网站做竞价带来的询盘量
  • 网站地图制作怎么做?0基础wordpress
  • 网站扫二维码怎么做青海建设厅通报网站
  • 东莞知名网站网络项目工作室
  • 购物网站简介南京网页设计照片