做h5页面的网站哪个好,南京市浦口区城乡建设局网站,物联网网站开发公司,凡科建站官网页更换视频这是仿真过程某图#xff1a; 仿真实战kafka kafka消费sink端和StructuredStreaming集成通信成功 #xff0c; 数据接收全部接收
数据落地情况#xff1a; 全部接收到并all存入mysql
下面就简单分享一下StructuredStreaming代码吧
import org.apache.spark.sql.function… 这是仿真过程某图 仿真实战kafka kafka消费sink端和StructuredStreaming集成通信成功 数据接收全部接收
数据落地情况 全部接收到并all存入mysql
下面就简单分享一下StructuredStreaming代码吧
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.streaming.{ OutputMode, Trigger}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}val spark: SparkSession SparkSession.builder().appName(kafkaConsumer).master(local[3]).getOrCreate()import spark.implicits._// 定义json字段类型格式val Jsonschmea: StructType new StructType().add(id, dataType IntegerType).add(name, dataType StringType).add(sorce, dataType IntegerType)val message: DataFrame spark.readStream // message为从kafka读到的原数据.format(kafka).option(kafka.bootstrap.servers, xxxxx:9092,xxxx:9092,xxxx:9092).option(subscribe, xxxx).option(startingOffsets, latest).load()// 将json字符串转化为结构化数据val streamData: DataFrame message.selectExpr(cast(value as String) as message) .select(from_json($message, Jsonschmea).alias(data))// 将json结构化为新的df// 预加载mysql驱动// 实时写入 第二个参数预占位want给每一批次加入唯一表示 but本次仅占位没有传参数def writeToMysql(batchDF: DataFrame, epochId: Long): Unit {val sqlurl jdbc:mysql://localhost:xxxx/xxxxval sqluser xxxxval sqlpass xxxxxClass.forName(com.mysql.cj.jdbc.Driver) // mysql 8.0后得驱动旧版本去掉cjbatchDF.foreachPartition {partitionOfRecords val connection DriverManager.getConnection(sqlurl, sqluser, sqlpass)// 关闭自动提交以支持增量写入connection.setAutoCommit(false)// 创建预编译的插入语句val insertsql insert into jsonstream(id,name,sorce) values(?,?,?)val preparedStatement connection.prepareStatement(insertsql)partitionOfRecords.foreach {row
// val id row.getAs[Int](data.id)
// val name row.getAs[String](data.name)
// val score row.getAs[Int](data.sorce)val id row.getAs[Row](data).getAs[Int](id)val name row.getAs[Row](data).getAs[String](name)val sorce row.getAs[Row](data).getAs[Int](sorce)// 设置参数到预处理sql函数中preparedStatement.setInt(1, id)preparedStatement.setString(2, name)preparedStatement.setInt(3, sorce)// 执行添加到批次操作preparedStatement.addBatch()}preparedStatement.executeBatch()connection.commit() // 执行批处理后手动提交事务preparedStatement.close() // 手动GCconnection.close()}}// 数据落地到数据库streamData.writeStream.outputMode(OutputMode.Append()).foreachBatch(writeToMysql _).trigger(Trigger.ProcessingTime(1 millisecond)) // 1 毫秒每个batch.start().awaitTermination()
存储按照一定批次量做存储
友情提示 上述程序是经过脱敏处理的哦 ----彩蛋----
如果你看到者你会知道scala在11更新之后也就是12版本如下
batchDF.foreachPartition {partitionOfRecords ... 这个位置 Dataset的foreachPartition 里面不能处理 Row的Iterator 所以需要转为rdd在做处理
所以更改后为
batchDF.rdd.foreachPartition { partitionOfRecords ...
而且这里不能用foreach 否则无法序列化就能存储到mysql 不能被序列化的数据是不能在网络中进行传输的通过二进制流的形式传出在被反序列化回来转化为对象的形式存储
ok -----