保安公司网站如何做,做一个营销网站,建设装修网站,网站icp备案需求描述#xff1a;
我现在有一个dataframe,名为dfin,样例数据如下
a1_id_lxha2_PHtime比亚迪_汉1232023-11-15 12:12:23比亚迪_汉1252023-11-15 13:14:51比亚迪_汉1232023-11-15 12:13:23比亚迪_汉1262023-11-16 14:12:34比亚迪_秦2312023-11-15 14:12:28比亚迪_秦2342023…需求描述
我现在有一个dataframe,名为dfin,样例数据如下
a1_id_lxha2_PHtime比亚迪_汉1232023-11-15 12:12:23比亚迪_汉1252023-11-15 13:14:51比亚迪_汉1232023-11-15 12:13:23比亚迪_汉1262023-11-16 14:12:34比亚迪_秦2312023-11-15 14:12:28比亚迪_秦2342023-11-16 16:12:51比亚迪_秦2312023-11-15 12:13:51比亚迪_秦2312023-11-15 12:14:51 现在我每天接入实时数据保存在库里时间为最近时间的两天数据比如今天是20号数据库里存的数据就是19号20号的数据现在我要进行跑批任务将数据库里的19号的数据离线跑批进行部分逻辑处理后增加的存入
代码
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import java.text.SimpleDateFormat
import java.util.Dateval spark SparkSession.builder.appName(DateSubtraction).getOrCreate()import spark.implicits._
//测试data,实际生产中我们接前序dataframe
//20231121_lxh
//https://blog.csdn.net/qq_52128187?typeblog
val data Seq((比亚迪_汉, 123, 2023-11-15 12:12:23),(比亚迪_汉, 125, 2023-11-15 13:14:51),(比亚迪_汉, 123, 2023-11-15 12:13:23),(比亚迪_汉, 126, 2023-11-16 14:12:34),(比亚迪_秦, 231, 2023-11-15 14:12:28),(比亚迪_秦, 234, 2023-11-16 16:12:51),(比亚迪_秦, 231, 2023-11-15 12:13:51),(比亚迪_秦, 231, 2023-11-15 12:14:51)
)
//以此为例
val df data.toDF(a1, a2, time)val dateFormat new SimpleDateFormat(yyyy-MM-dd)
val currentDate dateFormat.format(new Date())val dfinWithNewColumn df.withColumn(date_subtracted, date_sub(to_date($time), 1))dfinWithNewColumn.show()
//后面在处理的话直接转为创建临时表或者再进行后续操作即可
//https://blog.csdn.net/qq_52128187?typeblog 输出结果
a1a2timedate_subtracted比亚迪_汉1232023-11-15 12:12:232023-11-14 12:12:23比亚迪_汉1252023-11-15 13:14:512023-11-14 13:14:51比亚迪_汉1232023-11-15 12:13:232023-11-14 12:13:23比亚迪_汉1262023-11-16 14:12:342023-11-15 14:12:34比亚迪_秦2312023-11-15 14:12:282023-11-14 14:12:28比亚迪_秦2342023-11-16 16:12:512023-11-15 16:12:51比亚迪_秦2312023-11-15 12:13:512023-11-14 12:13:51比亚迪_秦2312023-11-15 12:14:512023-11-14 12:14:5