当前位置: 首页 > news >正文

中国供求网seo学院

中国供求网,seo学院,淘宝推广软件,汽车大全官网文章最前: 我是Octopus,这个名字来源于我的中文名--章鱼;我热爱编程、热爱算法、热爱开源。所有源码在我的个人github ;这博客是记录我学习的点点滴滴,如果您对 Python、Java、AI、算法有兴趣,可以关注我的…

文章最前: 我是Octopus,这个名字来源于我的中文名--章鱼;我热爱编程、热爱算法、热爱开源。所有源码在我的个人github ;这博客是记录我学习的点点滴滴,如果您对 Python、Java、AI、算法有兴趣,可以关注我的动态,一起学习,共同进步。

   相关文章:

  1. PySpark 概述
  2. Spark连接快速入门
  3. Spark上使用pandas API快速入门

创建pyspark对象

import warnings
warnings.filterwarnings('ignore')
#import pandas as pd
#import numpy as np
from datetime import timedelta, date, datetime
import time
import gc
import os
import argparse                             
import sysfrom pyspark.sql import SparkSession, functions as fn
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.sql.types import *
from pyspark import StorageLevel
spark = SparkSession \.builder \.appName("stockout_test") \.config("hive.exec.dynamic.partition.mode", "nonstrict") \.config("spark.sql.sources.partitionOverwriteMode", "dynamic")\.config("spark.driver.memory", '20g')\.config("spark.executor.memory", '40g')\.config("spark.yarn.executor.memoryOverhead", '1g')\.config("spark.executor.instances", 8)\.config("spark.executor.cores", 8)\.config("spark.kryoserializer.buffer.max", '128m')\.config("spark.yarn.queue", 'root.algo')\.config("spark.executorEnv.OMP_NUM_THREADS", 12)\.config("spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT", 1) \.config("spark.default.parallelism", 800)\.enableHiveSupport() \.getOrCreate()
spark.sql("set hive.exec.dynamic.partition.mode = nonstrict")
spark.sql("set hive.exec.dynamic.partition=true")
spark.sql("set spark.sql.autoBroadcastJoinThreshold=-1")

创建DataFrame

employee_salary = [("zhangsan", "IT", 8000),("lisi", "IT", 7000),("wangwu", "IT", 7500),("zhaoliu", "ALGO", 10000),("qisan", "IT", 8000),("bajiu", "ALGO", 12000),("james", "ALGO", 11000),("wangzai", "INCREASE", 7000),("carter", "INCREASE", 8000),("kobe", "IT", 9000)]columns= ["name", "department", "salary"]
df = spark.createDataFrame(data = employee_salary, schema = columns)
df.show()
+--------+----------+------+
|    name|department|salary|
+--------+----------+------+
|zhangsan|        IT|  8000|
|    lisi|        IT|  7000|
|  wangwu|        IT|  7500|
| zhaoliu|      ALGO| 10000|
|   qisan|        IT|  8000|
|   bajiu|      ALGO| 12000|
|   james|      ALGO| 11000|
| wangzai|  INCREASE|  7000|
|  carter|  INCREASE|  8000|
|    kobe|        IT|  9000|
+--------+----------+------+

row_number()

from pyspark.sql.window import Window
import pyspark.sql.functions as FwindowSpec  = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("row_number", F.row_number().over(windowSpec)).show(truncate=False)
+--------+----------+------+----------+
|name    |department|salary|row_number|
+--------+----------+------+----------+
|carter  |INCREASE  |8000  |1         |
|wangzai |INCREASE  |7000  |2         |
|kobe    |IT        |9000  |1         |
|zhangsan|IT        |8000  |2         |
|qisan   |IT        |8000  |3         |
|wangwu  |IT        |7500  |4         |
|lisi    |IT        |7000  |5         |
|bajiu   |ALGO      |12000 |1         |
|james   |ALGO      |11000 |2         |
|zhaoliu |ALGO      |10000 |3         |
+--------+----------+------+----------+

Rank()

from pyspark.sql.window import Window
import pyspark.sql.functions as FwindowSpec  = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("rank",F.rank().over(windowSpec)).show(truncate=False)
+--------+----------+------+----+
|name    |department|salary|rank|
+--------+----------+------+----+
|carter  |INCREASE  |8000  |1   |
|wangzai |INCREASE  |7000  |2   |
|kobe    |IT        |9000  |1   |
|qisan   |IT        |8000  |2   |
|zhangsan|IT        |8000  |2   |
|wangwu  |IT        |7500  |4   |
|lisi    |IT        |7000  |5   |
|bajiu   |ALGO      |12000 |1   |
|james   |ALGO      |11000 |2   |
|zhaoliu |ALGO      |10000 |3   |
+--------+----------+------+----+

dense_rank()

from pyspark.sql.window import Window
import pyspark.sql.functions as FwindowSpec  = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("dense_rank",F.dense_rank().over(windowSpec)).show()
+--------+----------+------+----------+
|    name|department|salary|dense_rank|
+--------+----------+------+----------+
|  carter|  INCREASE|  8000|         1|
| wangzai|  INCREASE|  7000|         2|
|    kobe|        IT|  9000|         1|
|   qisan|        IT|  8000|         2|
|zhangsan|        IT|  8000|         2|
|  wangwu|        IT|  7500|         3|
|    lisi|        IT|  7000|         4|
|   bajiu|      ALGO| 12000|         1|
|   james|      ALGO| 11000|         2|
| zhaoliu|      ALGO| 10000|         3|
+--------+----------+------+----------+

lag()

from pyspark.sql.window import Window
import pyspark.sql.functions as FwindowSpec  = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("lag",F.lag("salary",1).over(windowSpec)).show()
+--------+----------+------+-----+
|    name|department|salary|  lag|
+--------+----------+------+-----+
|  carter|  INCREASE|  8000| null|
| wangzai|  INCREASE|  7000| 8000|
|    kobe|        IT|  9000| null|
|zhangsan|        IT|  8000| 9000|
|   qisan|        IT|  8000| 8000|
|  wangwu|        IT|  7500| 8000|
|    lisi|        IT|  7000| 7500|
|   bajiu|      ALGO| 12000| null|
|   james|      ALGO| 11000|12000|
| zhaoliu|      ALGO| 10000|11000|
+--------+----------+------+-----+

lead()

from pyspark.sql.window import Window
import pyspark.sql.functions as FwindowSpec  = Window.partitionBy("department").orderBy(F.desc("salary"))
df.withColumn("lead",F.lead("salary", 1).over(windowSpec)).show()
+--------+----------+------+-----+
|    name|department|salary| lead|
+--------+----------+------+-----+
|  carter|  INCREASE|  8000| 7000|
| wangzai|  INCREASE|  7000| null|
|    kobe|        IT|  9000| 8000|
|zhangsan|        IT|  8000| 8000|
|   qisan|        IT|  8000| 7500|
|  wangwu|        IT|  7500| 7000|
|    lisi|        IT|  7000| null|
|   bajiu|      ALGO| 12000|11000|
|   james|      ALGO| 11000|10000|
| zhaoliu|      ALGO| 10000| null|
+--------+----------+------+-----+

Aggregate Functions

from pyspark.sql.window import Window
import pyspark.sql.functions as FwindowSpec  = Window.partitionBy("department").orderBy(F.desc("salary"))
windowSpecAgg  = Window.partitionBy("department")df.withColumn("row", F.row_number().over(windowSpec)) \.withColumn("avg", F.avg("salary").over(windowSpecAgg)) \.withColumn("sum", F.sum("salary").over(windowSpecAgg)) \.withColumn("min", F.min("salary").over(windowSpecAgg)) \.withColumn("max", F.max("salary").over(windowSpecAgg)) \.withColumn("count", F.count("salary").over(windowSpecAgg)) \.withColumn("distinct_count", F.approx_count_distinct("salary").over(windowSpecAgg)) \.show()
+--------+----------+------+---+-------+-----+-----+-----+-----+--------------+
|    name|department|salary|row|    avg|  sum|  min|  max|count|distinct_count|
+--------+----------+------+---+-------+-----+-----+-----+-----+--------------+
|  carter|  INCREASE|  8000|  1| 7500.0|15000| 7000| 8000|    2|             2|
| wangzai|  INCREASE|  7000|  2| 7500.0|15000| 7000| 8000|    2|             2|
|    kobe|        IT|  9000|  1| 7900.0|39500| 7000| 9000|    5|             4|
|zhangsan|        IT|  8000|  2| 7900.0|39500| 7000| 9000|    5|             4|
|   qisan|        IT|  8000|  3| 7900.0|39500| 7000| 9000|    5|             4|
|  wangwu|        IT|  7500|  4| 7900.0|39500| 7000| 9000|    5|             4|
|    lisi|        IT|  7000|  5| 7900.0|39500| 7000| 9000|    5|             4|
|   bajiu|      ALGO| 12000|  1|11000.0|33000|10000|12000|    3|             3|
|   james|      ALGO| 11000|  2|11000.0|33000|10000|12000|    3|             3|
| zhaoliu|      ALGO| 10000|  3|11000.0|33000|10000|12000|    3|             3|
+--------+----------+------+---+-------+-----+-----+-----+-----+--------------+
from pyspark.sql.window import Window
import pyspark.sql.functions as F
# 需要注意的是 approx_count_distinct() 函数适用于窗函数的统计,
# 而在groupby中通常用countDistinct()来代替该函数,用来求组内不重复的数值的条数。
# approx_count_distinct()取的是近似的数值,不太准确,使用需注意 windowSpec  = Window.partitionBy("department").orderBy(F.desc("salary"))
windowSpecAgg  = Window.partitionBy("department")df.withColumn("row", F.row_number().over(windowSpec)) \.withColumn("avg", F.avg("salary").over(windowSpecAgg)) \.withColumn("sum", F.sum("salary").over(windowSpecAgg)) \.withColumn("min", F.min("salary").over(windowSpecAgg)) \.withColumn("max", F.max("salary").over(windowSpecAgg)) \.withColumn("count", F.count("salary").over(windowSpecAgg)) \.withColumn("distinct_count", F.approx_count_distinct("salary").over(windowSpecAgg)) \.where(F.col("row")==1).select("department","avg","sum","min","max","count","distinct_count") \.show()

 +----------+-------+-----+-----+-----+-----+--------------+ |department| avg| sum| min| max|count|distinct_count| +----------+-------+-----+-----+-----+-----+--------------+ | INCREASE| 7500.0|15000| 7000| 8000| 2| 2| | IT| 7900.0|39500| 7000| 9000| 5| 4| | ALGO|11000.0|33000|10000|12000| 3| 3| +----------+-------+-----+-----+-----+-----+--------------+

http://www.hkea.cn/news/362381/

相关文章:

  • 怎么做多个网站单点登录艺考培训
  • 网站怎么做双语种seo关键词如何设置
  • 用java做的游戏下载网站有哪些内容成都网络推广优化
  • 慈溪市网站建设google官网
  • 网站建设计划seo网站排名优化软件是什么
  • 大连网站建设谁家好郴州网站定制
  • 网站建设背景怎么写一个企业该如何进行网络营销
  • 为女朋友做的表白网站百度大数据分析工具
  • 上海高端网站建设服务公seo推广公司
  • 找人合伙做网站平台仿站定制模板建站
  • 深圳市网站建设科技公司腾讯网网站网址
  • wordpress语言文件夹seo销售好做吗
  • 河北建设集团官网西安网站seo
  • 在外汇局网站做登记报告恢复原来的百度
  • 做外贸做的很好的网站全国疫情突然又严重了
  • 开发app需要什么样的团队百度seo优化培训
  • ftp上传网站之后软文什么意思范例
  • 询广西南宁网站运营推广系统
  • wordpress侧边栏小工具佛山网站优化
  • 用vs做网站原型企业培训课程有哪些内容
  • wordpress评论自定义百度刷排名seo
  • 四川建设网官网登录入口泉州seo外包
  • 网站有备案 去掉备案网络营销意思
  • 新建网站推广给企业百度问一问在线咨询客服
  • 曹鹏wordpress建站seo视频广东疫情防控措施
  • 网站开发的岗位排名优化工具
  • 岳阳做网站怎么做推广让别人主动加我
  • 不断改进网站建设公司百度官网优化
  • 万户网站宁波网站制作优化服务
  • 潍坊快速网站排名网站是怎么做出来的