建设营销型网站广州,东莞房价将暴跌,个人网站备案地址选择,北京互联网公司招聘1. 背景
spark默认的jdbc只会用单task读取数据#xff0c;读取大数据量时#xff0c;效率低。 2. 解决方案
根据分区字段#xff0c;如日期进行划分#xff0c;增加task数量提升效率。 /*** 返回每个task按时间段划分的过滤语句* param startDate* param endDate* param …1. 背景
spark默认的jdbc只会用单task读取数据读取大数据量时效率低。 2. 解决方案
根据分区字段如日期进行划分增加task数量提升效率。 /*** 返回每个task按时间段划分的过滤语句* param startDate* param endDate* param threadCount* return*/def getPredicateDates(startDate: String, endDate: String, threadCount: Int): Array[String] {getPredicates(startDate, endDate, threadCount).map(xsrecordDate${x._1} and recordDate ${x._2})}/*** 将startDate到endDate间的日期根据给定的threadCount参数做时间段划分例如* getPredicates(2017-01-01, 2017-01-31, 10)* 返回* 2017-01-01 - 2017-01-04* 2017-01-05 - 2017-01-08* 2017-01-09 - 2017-01-12* 2017-01-13 - 2017-01-16* 2017-01-17 - 2017-01-20* 2017-01-21 - 2017-01-24* 2017-01-25 - 2017-01-28* 2017-01-29 - 2017-01-31** param startDate 开始日期* param endDate 结束日期* param threadCount 线程数* return 包含各个连续时段的数组*/def getPredicates(startDate: String, endDate: String, threadCount: Int): Array[(String, String)] {val dayDiff DateTimeUtils.rangeDay(startDate, endDate)val buff new ArrayBuffer[(String, String)]()if (dayDiff threadCount) {//天数差小于期望的线程数则按照每天一个线程处理var tempDate startDatewhile (tempDate endDate) {buff (tempDate - tempDate)tempDate DateTimeUtils.dateAddOne(tempDate)}} else {//天数差大于期望的线程数则按照线程数对时间段切分val offset (dayDiff / threadCount).toIntvar tempDate startDatewhile (DateTimeUtils.dateAddN(tempDate, offset) endDate) {buff (tempDate - DateTimeUtils.dateAddN(tempDate, offset))tempDate DateTimeUtils.dateAddOne(DateTimeUtils.dateAddN(tempDate, offset))}if (tempDate ! endDate) {buff (tempDate - endDate)}}buff.toArray}
DateTimeUtils工具类
import java.text.SimpleDateFormat
import java.util.{Calendar, Date, Locale}object DateTimeUtils {def rangeDay(startDateStr: String, endDateStr: String): Long {val dateFormat: SimpleDateFormat new SimpleDateFormat(yyyy-MM-dd)val startDate: Date dateFormat.parse(startDateStr)val endDate: Date dateFormat.parse(endDateStr)(endDate.getTime() - startDate.getTime()) / 1000 / 60 / 60 / 24}def dateAddOne(dateStr: String): String {var dateFormat: SimpleDateFormat new SimpleDateFormat(yyyy-MM-dd)var dateInfo: Date dateFormat.parse(dateStr)var cal: Calendar Calendar.getInstance()cal.setTime(dateInfo)cal.add(Calendar.DATE, 1)dateFormat.format(cal.getTime)}def dateAddN(dateStr: String, value: Int): String {var dateFormat: SimpleDateFormat new SimpleDateFormat(yyyy-MM-dd)var dateInfo: Date dateFormat.parse(dateStr)var cal: Calendar Calendar.getInstance()cal.setTime(dateInfo)cal.add(Calendar.DATE, value)dateFormat.format(cal.getTime)}
}
举例 val startDate DateTimeUtils.dateAddN(calcDate,-365) //获取计算日期一年前的日期作为开始时间val predicates getPredicateDates(startDate,calcDate,12) //分12个task读取提高性能val url PropUtils.getProxyJdbc() //jdbc连接的代理需按自己的项目实现val res spark.read.jdbc(url, tableName, predicates,PropUtils.getProperties()) 3. 实验及结论
使用1个节点 8核16G的Clickhouse数据库spark从clickhouse读取近4亿行数据。 单Task运行时间14min 按日期划分成12个Task运行时间1.6min 结论性能提升88.6%