有哪些网站可以免费做推广的,建设网站 注册与登陆,精准客户营销,新任上海市领导调整公示文章目录
Strctured Streaming简单应用
一、Output Modes输出模式
二、Streaming Table API
三、Triggers
1、unspecified#xff08;默认模式#xff09;
2、Fixed interval micro-batches默认模式
2、Fixed interval micro-batches固定间隔批次
3、 One-time micro-batch 仅一次触发
4、Continuous with fixed checkpoint interval连续处理 Strctured Streaming简单应用
一、Output Modes输出模式
Structured Streaming中结果输出时outputMode可以设置三种模式三种默认区别如下
Append Mode默认模式追加模式只有自上次触发后追加到结果表中的新行才会被输出。只有select、where、map、flatmap、filter、join查询支持追加模式。Complete Mode完整模式将整个更新的结果输出。仅可用于代码中有聚合查询情况代码中没有聚合查询不能使用。Update Mode更新模式自Spark2.1.1版本后可用只有自上次触发后更新的行才会被输出。这种模式仅仅输出自上次触发以来发生更改的行。如果结果数据没有聚合操作那么相当于Append Mode。 二、Streaming Table API
在Spark3.1版本之后我们可以通过DataStreamReader.table()方式实时读取流式表中的数据使用DataStreamWriter.toTable()向表中实时写数据。
案例读取Socket数据实时写入到Spark流表中然后读取流表数据展示数据。
代码示例如下
package com.lanson.structuredStreamingimport org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SparkSession}object StreamTableAPI {def main(args: Array[String]): Unit {//1.创建对象val spark: SparkSession SparkSession.builder().master(local).appName(StreamTableAPI).config(spark.sql.shuffle.partitions, 1).config(spark.sql.warehouse.dir, ./my-spark-warehouse).getOrCreate()spark.sparkContext.setLogLevel(Error);import spark.implicits._//2.读取socket数据注册流表val df: DataFrame spark.readStream.format(socket).option(host, node3).option(port, 9999).load()//3.对df进行转换val personinfo: DataFrame df.as[String].map(line {val arr: Array[String] line.split(,)(arr(0).toInt, arr(1), arr(2).toInt)}).toDF(id, name, age)//4.将以上personinfo 写入到流表中personinfo.writeStream.option(checkpointLocation,./checkpoint/dir1).toTable(mytbl)import org.apache.spark.sql.functions._//5.读取mytbl 流表中的数据val query: StreamingQuery spark.readStream.table(mytbl).withColumn(new_age, col(age).plus(6)).select(id, name, age, new_age).writeStream.format(console).start()query.awaitTermination()}
}以上代码编写完成后启动向控制台输入以下数据
1,zs,18
2,ls,19
3,ww,20
4,ml,21
5,tq,22
结果输入如下 注意以上代码执行时Spark中写出的表由Spark 参数”spark.sql.warehouse.dir”指定的路径临时维护数据每次执行时需要将该路径下的表数据清空。 三、Triggers
Structured Streaming Triggers 决定了流式数据被处理时是微批处理还是连续实时处理以下是支持的Triggers
实时处理以下是支持的Triggers: Trigger Type 描述 Unspecified(默认) 代码使用Trigger.ProcessingTime(0L)。代码中没有明确指定触发类型则查询默认以微批模式执行表示尽可能快的执行查询。 Fixed interval micro-batches(固定间隔批次) 代码使用Trigger.ProcessingTime(long interval,TimeUnit timeUnit)查询将以微批模式处理批次间隔根据用户指定的时间间隔决定 如果前一个微批处理时间在时间间隔内完成则会等待间隔时间完成后再开始下一个微批处理如果前一个微批处理时间超过了时间间隔那么下一个微批处理将在前一个微批处理完成后立即开始。如果没有新数据可用则不会启动微批处理。 One-time micro-batch(仅一次性触发) 代码使用Trigger.Once()只执行一个微批次查询所有可用数据然后自动停止适用于一次性作业。 Continuous with fixed checkpoint interval(以固定checkpoint interval连续处理实验阶段) 代码使用Trigger.Continuous(long interval,TimeUnit timeUnit)以固定的Checkpoint间隔(interval)连续处理。在这种模式下连续处理引擎将每隔一定的间隔(interval)做一次checkpoint可获得低至1ms的延迟。
下面以读取Socket数据为例Scala代码演示各个模式 1、unspecified默认模式
代码如下
//3.默认微批模式执行查询尽快将结果写出到控制台
val query: StreamingQuery frame.writeStream.format(console).start()query.awaitTermination() 2、Fixed interval micro-batches固定间隔批次
代码如下
//3.用户指定固定间隔批次触发查询val query: StreamingQuery frame.writeStream.format(console).trigger(Trigger.ProcessingTime(5 seconds))
// .trigger(Trigger.ProcessingTime(5,TimeUnit.SECONDS).start()query.awaitTermination()
注意这种固定间隔批次指的是第一批次处理完成等待间隔时间然后处理第二批次数据依次类推。 3、 One-time micro-batch 仅一次触发
代码如下
//4.仅一次触发执行
val query: StreamingQuery frame.writeStream.format(console).trigger(Trigger.Once()).start()
query.awaitTermination() 4、Continuous with fixed checkpoint interval连续处理
Continuous不再是周期性启动task的批量执行数而是启动长期运行的task而是不断一个一个数据进行处理周期性的通过指定checkpoint来记录状态如果不指定checkpoint目录会将状态记录在Temp目录下保证exactly-once语义这样就可以实现低延迟。详细内容可以参照后续“Continuous处理”章节。
代码如下
//3.Continuous 连续触发执行
val query: StreamingQuery frame.writeStream.format(console)//每10ms 记录一次状态而不是执行一次.trigger(Trigger.Continuous(10,TimeUnit.MILLISECONDS)).option(checkpointLocation,./checkpint/dir4).start()
query.awaitTermination() 博客主页https://lansonli.blog.csdn.net欢迎点赞 收藏 ⭐留言 如有错误敬请指正本文由 Lansonli 原创首发于 CSDN博客停下休息的时候不要忘了别人还在奔跑希望大家抓紧时间学习全力奔赴更美好的生活✨