测网站打开的速度的网址,六安哪家公司做网站好,企业建站系统漏洞,网络规划设计师证书样本在Spark应用中#xff0c;外部系统经常需要使用到Spark DStream处理后的数据#xff0c;因此#xff0c;需要采用输出操作把DStream的数据输出到数据库或者文件系统中。
这里以《Spark2.1.0入门#xff1a;DStream输出操作》中介绍的NetworkWordCountStateful.scala为基础…在Spark应用中外部系统经常需要使用到Spark DStream处理后的数据因此需要采用输出操作把DStream的数据输出到数据库或者文件系统中。
这里以《Spark2.1.0入门DStream输出操作》中介绍的NetworkWordCountStateful.scala为基础进行修改。
把DStream输出到文本文件中
NetworkWordCountStateful.scala
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevelobject NetworkWordCountStateful {def main(args: Array[String]) {//定义状态更新函数val updateFunc (values: Seq[Int], state: Option[Int]) {val currentCount values.foldLeft(0)(_ _)val previousCount state.getOrElse(0)Some(currentCount previousCount)}StreamingExamples.setStreamingLogLevels() //设置log4j日志级别val conf new SparkConf().setMaster(local[2]).setAppName(NetworkWordCountStateful)val sc new StreamingContext(conf, Seconds(5))sc.checkpoint(file:///usr/local/spark/mycode/streaming/dstreamoutput/) //设置检查点检查点具有容错机制val lines sc.socketTextStream(localhost, 9999)val words lines.flatMap(_.split( ))val wordDstream words.map(x (x, 1))val stateDstream wordDstream.updateStateByKey[Int](updateFunc)stateDstream.print()//下面是新增的语句把DStream保存到文本文件中stateDstream.saveAsTextFiles(file:///usr/local/spark/mycode/streaming/dstreamoutput/output.txt)sc.start()sc.awaitTermination()}
}把DStream写入到MySQL数据库中
mysql use spark
mysql create table wordcount (word char(20), count int(4));
mysql select * from wordcount
//这个时候wordcount表是空的没有任何记录NetworkWordCountStateful.scala
import java.sql.{PreparedStatement, Connection, DriverManager}
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevelobject NetworkWordCountStateful {def main(args: Array[String]) {//定义状态更新函数val updateFunc (values: Seq[Int], state: Option[Int]) {val currentCount values.foldLeft(0)(_ _)val previousCount state.getOrElse(0)Some(currentCount previousCount)}val conf new SparkConf().setMaster(local[2]).setAppName(NetworkWordCountStateful)val sc new StreamingContext(conf, Seconds(5))sc.checkpoint(file:///usr/local/spark/mycode/streaming/dstreamoutput/) //设置检查点检查点具有容错机制val lines sc.socketTextStream(localhost, 9999)val words lines.flatMap(_.split( ))val wordDstream words.map(x (x, 1))val stateDstream wordDstream.updateStateByKey[Int](updateFunc)stateDstream.print()//下面是新增的语句把DStream保存到MySQL数据库中 stateDstream.foreachRDD(rdd {//内部函数def func(records: Iterator[(String,Int)]) {var conn: Connection nullvar stmt: PreparedStatement nulltry {val url jdbc:mysql://localhost:3306/sparkval user rootval password hadoop //笔者设置的数据库密码是hadoop请改成你自己的mysql数据库密码conn DriverManager.getConnection(url, user, password)records.foreach(p {val sql insert into wordcount(word,count) values (?,?)stmt conn.prepareStatement(sql);stmt.setString(1, p._1.trim)stmt.setInt(2,p._2.toInt)stmt.executeUpdate()})} catch {case e: Exception e.printStackTrace()} finally {if (stmt ! null) {stmt.close()}if (conn ! null) {conn.close()}}}val repartitionedRDD rdd.repartition(3)repartitionedRDD.foreachPartition(func)})sc.start()sc.awaitTermination()}
}对于stateDstream为了把它保存到MySQL数据库中我们采用了如下的形式
stateDstream.foreachRDD(function)其中function就是一个RDD[T]Unit类型的函数对于本程序而言就是RDD[(String,Int)]Unit类型的函数也就是说stateDstream中的每个RDD都是RDD[(String,Int)]类型想象一下统计结果的形式是(“hadoop”,3)。这样对stateDstream中的每个RDD都会执行function中的操作即把该RDD保存到MySQL的操作。
下面看function的处理逻辑在function部分函数体要执行的处理逻辑实际上是下面的形式 def func(records: Iterator[(String,Int)]){……}val repartitionedRDD rdd.repartition(3)repartitionedRDD.foreachPartition(func) 也就是说这里定义了一个内部函数func它的功能是接收records然后把records保存到MySQL中。到这里你可能会有疑问为什么不是把stateDstream中的每个RDD直接拿去保存到MySQL中还要调用rdd.repartition(3)对这些RDD重新设置分区数为3呢这是因为每次保存RDD到MySQL中都需要启动数据库连接如果RDD分区数量太大那么就会带来多次数据库连接开销为了减少开销就有必要把RDD的分区数量控制在较小的范围内所以这里就把RDD的分区数量重新设置为3。然后对于每个RDD分区就调用repartitionedRDD.foreachPartition(func)把每个分区的数据通过func保存到MySQL中这时传递给func的输入参数就是Iterator[(String,Int)]类型的records。如果你不好理解下面这种调用形式
repartitionedRDD.foreachPartition(func) //这种形式func没有带任何参数可能不太好理解不是那么直观实际上这句语句和下面的语句是等价的下面的语句形式你可能会更好理解
repartitionedRDD.foreachPartition(records func(records)) 上面这种等价的形式比较直观为func()函数传入了一个records参数这就正好和 def func(records: Iterator[(String,Int)])定义对应起来了方便理解。