当前位置: 首页 > news >正文

网站建设 站内页面连接如何搭建一个自己的服务器

网站建设 站内页面连接,如何搭建一个自己的服务器,wordpress 图片显示,一元夺宝网站制作视频Pyspark 注#xff1a;大家觉得博客好的话#xff0c;别忘了点赞收藏呀#xff0c;本人每周都会更新关于人工智能和大数据相关的内容#xff0c;内容多为原创#xff0c;Python Java Scala SQL 代码#xff0c;CV NLP 推荐系统等#xff0c;Spark Flink Kafka Hbase Hi…Pyspark 注大家觉得博客好的话别忘了点赞收藏呀本人每周都会更新关于人工智能和大数据相关的内容内容多为原创Python Java Scala SQL 代码CV NLP 推荐系统等Spark Flink Kafka Hbase Hive Flume等等~写的都是纯干货各种顶会的论文解读一起进步。 今天继续和大家分享一下Pyspark_结构化流4 #博学谷IT学习技术支持 文章目录Pyspark前言一、数据模拟器代码二、需求说明和代码实现总结前言 接上次继续Pyspark_结构化流今天主要是一个结构化流结合kafka的一个小案例。 一、数据模拟器代码 1- 创建一个topic, 放置后续物联网数据: search-log-topic ./kafka-topics.sh --create --zookeeper node1:2181 --topic search-log-topic --partitions 3 --replication-factor 2 import json import random import time import os from kafka import KafkaProducer# 锁定远端操作环境, 避免存在多个版本环境的问题 os.environ[SPARK_HOME] /export/server/spark os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python# 快捷键: main 回车 if __name__ __main__:print(模拟物联网数据)# 1- 构建一个kafka的生产者:producer KafkaProducer(bootstrap_servers[node1:9092, node2:9092, node3:9092],acksall,value_serializerlambda m: json.dumps(m).encode(utf-8))# 2- 物联网设备类型deviceTypes [洗衣机, 油烟机, 空调, 窗帘, 灯, 窗户, 煤气报警器, 水表, 燃气表]while True:index random.choice(range(0, len(deviceTypes)))deviceID fdevice_{index}_{random.randrange(1, 20)}deviceType deviceTypes[index]deviceSignal random.choice(range(10, 100))# 组装数据集print({deviceID: deviceID, deviceType: deviceType, deviceSignal: deviceSignal,time: time.strftime(%s)})# 发送数据producer.send(topicsearch-log-topic,value{deviceID: deviceID, deviceType: deviceType, deviceSignal: deviceSignal,time: time.strftime(%s)})# 间隔时间 5s内随机time.sleep(random.choice(range(1, 5))) 生成的kafka数据 {‘deviceID’: ‘device_0_14’, ‘deviceType’: ‘洗衣机’, ‘deviceSignal’: 18, ‘time’: ‘1680157073’} {‘deviceID’: ‘device_2_8’, ‘deviceType’: ‘空调’, ‘deviceSignal’: 30, ‘time’: ‘1680157074’} {‘deviceID’: ‘device_0_17’, ‘deviceType’: ‘洗衣机’, ‘deviceSignal’: 84, ‘time’: ‘1680157076’} {‘deviceID’: ‘device_2_15’, ‘deviceType’: ‘空调’, ‘deviceSignal’: 99, ‘time’: ‘1680157078’} {‘deviceID’: ‘device_1_17’, ‘deviceType’: ‘油烟机’, ‘deviceSignal’: 50, ‘time’: ‘1680157081’} 二、需求说明和代码实现 求: 各种信号强度30的设备的各个类型的数量和平均信号强度,先过滤再聚合 from pyspark.sql import SparkSession import pyspark.sql.functions as F import os# 锁定远端环境, 确保环境统一 os.environ[SPARK_HOME] /export/server/spark os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3 os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3if __name__ __main__:print(综合案例: 物联网案例实现)# 1- 创建SparkSession对象spark SparkSession.builder \.appName(file_source) \.master(local[1]) \.config(spark.sql.shuffle.partitions, 4) \.getOrCreate()# 2- 从Kafka中读取消息数据df spark.readStream \.format(kafka) \.option(kafka.bootstrap.servers, node1:9092,node2:9092,node3:9092) \.option(subscribe, search-log-topic) \.option(startingOffsets, earliest) \.load()# 3- 处理数据# 求: 各种信号强度30的设备的各个类型的数量和平均信号强度,先过滤再聚合# 数据: {deviceID: device_4_4, deviceType: 灯, deviceSignal: 20, time: 1677243108}df df.selectExpr(CAST(value AS STRING))# 思考 如何做呢?# 需要将这个Json字符串中各个字段都获取出来, 形成一个多列的数据# 专业名称: JSON拉平# 涉及函数: get_json_object() json_tuple()# df.createTempView(t1)# SQL# df spark.sql(# select# get_json_object(value,$.deviceID) as deviceID,# get_json_object(value,$.deviceType) as deviceType,# get_json_object(value,$.deviceSignal) as deviceSignal,# get_json_object(value,$.time) as time# from t1# )# df spark.sql(# select# json_tuple(value,deviceID,deviceType,deviceSignal,time) as (deviceID,deviceType,deviceSignal,time)# from t1# )# DSL# df df.select(# F.get_json_object(value, $.deviceID).alias(deviceID),# F.get_json_object(value,$.deviceType).alias(deviceType),# F.get_json_object(value,$.deviceSignal).alias(deviceSignal),# F.get_json_object(value,$.time).alias(time)# )df df.select(F.json_tuple(value, deviceID, deviceType, deviceSignal, time).alias(deviceID, deviceType,deviceSignal, time))# 求: 各种信号强度30的设备的各个类型的数量和平均信号强度,先过滤再聚合df df.where(df[deviceSignal] 30).groupBy(deviceType).agg(F.count(deviceID).alias(device_cnt),F.round(F.avg(deviceSignal), 2).alias(deviceSignal_avg))# 4- 打印结果df.writeStream.format(console).outputMode(complete).start().awaitTermination() 总结 今天主要和大家分享了如何用Pyspark_结构化流结合kafka模拟物连网小案例。
http://www.hkea.cn/news/14266880/

相关文章:

  • 阿里云网站地图是怎么做的wordpress友链插件
  • 百度投诉电话24小时巩义自助建站优化
  • 未成年人做网站多少钱能注册500万公司
  • 温州网站建设价格技术山西太原网络推广
  • 馆陶网站建设公司体育用品东莞网站建设
  • 做毕业设计免费网站建设用什么程序做资讯类网站
  • 百度数据网站贵阳市白云区官方网站
  • 网站服务器租用怎样收费西安wordpress
  • 河北省电力建设第一工程公司网站做网站需要学多久
  • 建站推广哪里有建站新闻资讯域名备案的网站建设方案书模板
  • 高端网站哪个比较好建设购物网站多少钱
  • wordpress如何建站呢wordpress邮件分析插件
  • 做网站开发教程社保门户网站建设方案
  • 房屋出租网站模板深圳市升华建设有限公司网站
  • 网站开发课程知识点总结泉州建站模板
  • 10g空间网站做视频网站网站的发展前景
  • 怎样做网站吸引人软件上传到那个网站做宣传
  • 自己做挖矿网站平面广告设计公司
  • 连云港做网站公司哪家好顺德网站建设原创
  • 2008 访问网站提示建设中更改wordpress主题语言包
  • 网站开发的pc或移动端山东省建设监理协会官方网站
  • 网站开发制作费用数码网站名
  • 龙海网站建设价格wordpress页面上分页
  • 海外网站哪个最好西安高校定制网站建设
  • 网站网站设计公司网站开发的比较
  • 商丘做网站sqlongliqi中国商铺网
  • 有人做网站吗wordpress文章上的图片显示不
  • 威海自适应网站建设建设通下载
  • 公司网站建设征稿令电商扶贫网站建设
  • 怎么弄一个网站python做网站项目