舞钢市城乡建设局网站,漯河建网站,企业vi设计的作用与意义,app网站开发教程实验5 Spark Structured Streaming编程实践
实验内容和要求
0.结构化流练习任务 0.1 讲义文件源-json数据任务。按照讲义中json数据的生成及分析#xff0c;复现实验#xff0c;并适当分析。 #xff08;1#xff09;创建程序生成JSON格式的File源测试数据
import osimp…实验5 Spark Structured Streaming编程实践
实验内容和要求
0.结构化流练习任务 0.1 讲义文件源-json数据任务。按照讲义中json数据的生成及分析复现实验并适当分析。 1创建程序生成JSON格式的File源测试数据
import osimport shutilimport randomimport time
TEST_DATA_TEMP_DIR /tmp/
TEST_DATA_DIR /tmp/testdata/ACTION_DEF [login, logout, purchase]
DISTRICT_DEF [fujian, beijing, shanghai, guangzhou]
JSON_LINE_PATTERN {{eventTime: {}, action: {}, district: {}}}\n‘# 测试的环境搭建判断文件夹是否存在如果存在则删除旧数据并建立文件夹
def test_setUp():if os.path.exists(TEST_DATA_DIR):shutil.rmtree(TEST_DATA_DIR, ignore_errorsTrue)os.mkdir(TEST_DATA_DIR)
# 测试环境的恢复对文件夹进行清理
def test_tearDown():if os.path.exists(TEST_DATA_DIR):shutil.rmtree(TEST_DATA_DIR, ignore_errorsTrue)# 生成测试文件
def write_and_move(filename, data):with open(TEST_DATA_TEMP_DIR filename,wt, encodingutf-8) as f:f.write(data)shutil.move(TEST_DATA_TEMP_DIR filename,TEST_DATA_DIR filename)if __name__ __main__:test_setUp()# 这里生成200个文件for i in range(200):filename e-mall-{}.json.format(i)content rndcount list(range(100))random.shuffle(rndcount)for _ in rndcount:content JSON_LINE_PATTERN.format(str(int(time.time())),random.choice(ACTION_DEF),random .choice(DISTRICT_DEF))write_and_move(filename, content)time.sleep(1)test_tearDown()2创建程序对数据进行统计
# 导入需要用到的模块
import os
import shutil
from pprint import pprintfrom pyspark.sql import SparkSession
from pyspark.sql.functions import window, asc
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import TimestampType, StringType
# 定义JSON文件的路径常量此为本地路径
TEST_DATA_DIR_SPARK /tmp/testdata/
if __name__ __main__:# 定义模式为时间戳类型的eventTime、字符串类型的操作和省份组成schema StructType([StructField(eventTime, TimestampType(), True),StructField(action, StringType(), True),StructField(district, StringType(), True)])spark SparkSession \.builder \.appName(StructuredEMallPurchaseCount) \.getOrCreate()spark.sparkContext.setLogLevel(WARN)lines spark \.readStream \.format(json) \.schema(schema) \.option(maxFilesPerTrigger, 100) \.load(TEST_DATA_DIR_SPARK)# 定义窗口windowDuration 1 minuteswindowedCounts lines \.filter(action purchase) \.groupBy(district, window(eventTime, windowDuration)) \.count() \.sort(asc(window)) query windowedCounts \.writeStream \.outputMode(complete) \.format(console) \.option(truncate, false) \.trigger(processingTime10 seconds) \.start()query.awaitTermination()
3测试运行程序 0.2 讲义kafka源2字母单词分析任务按照讲义要求复现kafka源实验。 1.启动Kafka 在Linux系统中新建一个终端记作“Zookeeper终端”输入下面命令启动Zookeeper服务 cd /usr/local/kafkabin/zookeeper-server-start.sh config/zookeeper.properties 新建第二个终端记作“Kafka终端”然后输入下面命令启动Kafka服务 cd /usr/local/kafkabin/kafka-server-start.sh config/server.properties 新建第三个终端记作“监控输入终端”执行如下命令监控Kafka收到的文本 cd /usr/local/kafkabin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wordcount-topic 新建第四个终端记作“监控输出终端”执行如下命令监控输出的结果文本 cd /usr/local/kafkabin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wordcount-result-topic 2.编写生产者Producer程序
# spark_ss_kafka_producer.pyimport string
import random
import timefrom kafka import KafkaProducerif __name__ __main__:producer KafkaProducer(bootstrap_servers[localhost:9092])while True:s2 (random.choice(string.ascii_lowercase) for _ in range(2))word .join(s2)value bytearray(word, utf-8)producer.send(wordcount-topic, valuevalue) \.get(timeout10)time.sleep(0.1)
3.编写消费者Consumer程序
# spark_ss_kafka_consumer.pyfrom pyspark.sql import SparkSessionif __name__ __main__:spark SparkSession \.builder \.appName(StructuredKafkaWordCount) \.getOrCreate()spark.sparkContext.setLogLevel(WARN) lines spark \.readStream \.format(kafka) \.option(kafka.bootstrap.servers, localhost:9092) \.option(subscribe, wordcount-topic) \.load() \.selectExpr(CAST(value AS STRING))wordCounts lines.groupBy(value).count()query wordCounts \.selectExpr(CAST(value AS STRING) as key, CONCAT(CAST(value AS STRING), :, CAST(count AS STRING)) as value) \.writeStream \.outputMode(complete) \.format(kafka) \.option(kafka.bootstrap.servers, localhost:9092) \.option(topic, wordcount-result-topic) \.option(checkpointLocation, file:///tmp/kafka-sink-cp) \.trigger(processingTime8 seconds) \.start()query.awaitTermination()
在终端中执行如下命令运行消费者程序 0.3 讲义socket源结构化流实现词频统计。按照讲义要求复现socket源实验。 代码文件spark_ss_rate.py
# spark_ss_rate.pyfrom pyspark.sql import SparkSessionif __name__ __main__:spark SparkSession \.builder \.appName(TestRateStreamSource) \.getOrCreate()spark.sparkContext.setLogLevel(WARN)lines spark \.readStream \.format(rate) \.option(rowsPerSecond, 5) \.load()print(lines.schema)query lines \.writeStream \.outputMode(update) \.format(console) \.option(truncate, false) \.start()query.awaitTermination()
在Linux终端中执行spark_ss_rate.py 0.4不选使用rate源评估系统性能。 1.日志分析任务 1.1通过Socket传送Syslog到Spark日志分析是一个大数据分析中较为常见的场景。 实验原理 在Unix类操作系统里Syslog广泛被应用于系统或者应用的日志记录中。Syslog通常被记录在本地文件内比如Ubuntu内为/var/log/syslog文件名也可以被发送给远程Syslog服务器。Syslog日志内一般包括产生日志的时间、主机名、程序模块、进程名、进程ID、严重性和日志内容。日志一般会通过Kafka等有容错保障的源发送本实验为了简化直接将Syslog通过Socket源发送。 实验过程 新建一个终端执行如下命令tail -n1 -f /var/log/syslog | nc -lk 9988“tail -n1 -f /var/log/syslog” 表示从第一行开始打印文件syslog的内容“-f”表示如果文件有增加则持续输出最新的内容。 然后通过管道把文件内容发送到nc程序nc程序可以进一步把数据发送给Spark。如果/var/log/syslog内的内容增长速度较慢可以再新开一个终端计作“手动发送日志终端”手动在终端输入如下内容来增加日志信息到/var/log/syslog内logger ‘I am a test error log message.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext# 创建SparkContext和StreamingContext
sc SparkContext(appNameSyslogAnalysis)
ssc StreamingContext(sc, 1)# 创建一个DStream接收来自Socket的数据流
lines ssc.socketTextStream(localhost, 9988)# 在数据流上应用转换和操作
word_counts lines.flatMap(lambda line: line.split( )) \.map(lambda word: (word, 1)) \.reduceByKey(lambda x, y: x y)# 输出结果到控制台
word_counts.pprint()# 启动StreamingContext
ssc.start()
ssc.awaitTermination() 1.2对Syslog进行查询 由Spark接收nc程序发送过来的日志信息然后完成以下任务 统计CRON这个进程每小时生成的日志数并以时间顺序排列水印设置为1分钟。统计每小时的每个进程或者服务分别产生的日志总数水印设置为1分钟。输出所有日志内容带error的日志。
from pyspark.sql.functions import window
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType# 创建SparkSession
spark SparkSession.builder \.appName(LogAnalysis) \.getOrCreate()# 定义日志数据的模式
schema StructType([StructField(timestamp, TimestampType(), True),StructField(message, StringType(), True)
])# 从socket接收日志数据流
logs spark.readStream \.format(socket) \.option(host, localhost) \.option(port, 9988) \.load()# 将接收到的日志数据流应用模式
logs logs.selectExpr(CAST(value AS STRING)) \.selectExpr(to_timestamp(value, yyyy-MM-dd HH:mm:ss) AS timestamp, value AS message) \.select(col(timestamp), col(message).alias(log_message))# 统计CRON进程每小时生成的日志数并按时间顺序排列
cron_logs logs.filter(col(log_message).contains(CRON)) \.groupBy(window(timestamp, 1 hour)) \.count() \.orderBy(window)# 统计每小时每个进程或服务产生的日志总数
service_logs logs.groupBy(window(timestamp, 1 hour), log_message) \.count() \.orderBy(window)# 输出所有带有error的日志内容
error_logs logs.filter(col(log_message).contains(error))# 设置水印为1分钟
cron_logs cron_logs.withWatermark(window, 1 minute)
service_logs service_logs.withWatermark(window, 1 minute)
error_logs error_logs.withWatermark(timestamp, 1 minute)# 启动流式处理并输出结果
query_cron_logs cron_logs.writeStream \.outputMode(complete) \.format(console) \.start()query_service_logs service_logs.writeStream \.outputMode(complete) \.format(console) \.start()query_error_logs error_logs.writeStream \.outputMode(append) \.format(console) \.start()# 等待流式处理完成
query_cron_logs.awaitTermination()
query_service_logs.awaitTermination()
query_error_logs.awaitTermination()
2.股市分析任务进阶任务
数据集采用dj30数据集见教学平台。实验说明 本实验将使用两个移动均线策略短期移动均线为10天长期移动均线为40天。当短期移动均线越过长期移动均线时这是一个买入信号因为它表明趋势正在向上移动。这就是所谓的黄金交叉。同时当短期移动均线穿过长期移动均线下方时这是一个卖出信号因为它表明趋势正在向下移动。这就是所谓的死亡交叉。两种叉形如下图所示:dj30.csv包含了道琼斯工业平均指数25年的价格历史。 实验要求 1.设置流以将数据输入structed streaming。2.使用structed streaming窗口累计 dj30sum和dj30ct分别为价格的总和和计数。3.将这两个structed streaming (dj30sum和dj30ct)分开产生dj30avg从而创建10天MA和40天MA的移动平均值。4.比较两个移动平均线(短期移动平均线和长期移动平均线)来指示买入和卖出信号。 您的输出[dj30-feeder只有一个符号的数据:DJI这是隐含的。这个问题的输出将是[(日期买入DJI)(日期卖出DJI)等等]。应该是[(日期买入符号)(日期卖出符号)等等]的形式。 1.设置流以将数据输入structed streaming。 from pyspark.sql import SparkSession
from pyspark.sql.functions import *# 创建一个SparkSession对象
spark SparkSession.builder \.appName(StructuredStreamingExample) \.getOrCreate()
inputPath path_to_dj30.csv# 读取dj30.csv文件并创建一个输入流
df spark.readStream \.format(csv) \.option(header, true) \.load(inputPath)# 对数据进行处理和转换
df df.withColumn(timestamp, to_timestamp(col(date), yyyy-MM-dd))# 定义输出操作
agg_df df.groupBy(window(timestamp, 1 hour)).agg(sum(price).alias(dj30sum), count(price).alias(dj30ct))# 启动流式处理
query agg_df.writeStream \.outputMode(complete) \.format(console) \.start()# 等待流式处理完成
query.awaitTermination()
from pyspark.sql import SparkSession
from pyspark.sql.functions import colspark SparkSession.builder \.appName(DJ30 Structured Streaming) \.getOrCreate()dj30_data spark.read.csv(path/to/dj30.csv, headerTrue)streaming_data dj30_data.select(col(Long Date).alias(date), col(Close).cast(float).alias(close))streaming_data.createOrReplaceTempView(dj30_stream)streaming_df spark.sql(SELECT * FROM dj30_stream) 2.使用structed streaming窗口累计 dj30sum和dj30ct分别为价格的总和和计数 3.将这两个structed streaming (dj30sum和dj30ct)分开产生dj30avg从而创建10天MA和40天MA的移动平均值 4.比较两个移动平均线(短期移动平均线和长期移动平均线)来指示买入和卖出信号。