盐都区城乡建设局网站,网站优化 月付费,广州 建网站,网站域名怎么进行实名认证随着 Paimon 近两年的推广普及#xff0c;使用 FlinkPaimon 构建数据湖仓的实践也越来越多。在 Flink 实时数据开发中#xff0c;对于依赖大量状态 state 的场景#xff0c;如长周期的累加指标计算、回撤长历史数据并更新等#xff0c;使用实时数仓作为中间存储来代替 Flin… 随着 Paimon 近两年的推广普及使用 FlinkPaimon 构建数据湖仓的实践也越来越多。在 Flink 实时数据开发中对于依赖大量状态 state 的场景如长周期的累加指标计算、回撤长历史数据并更新等使用实时数仓作为中间存储来代替 Flink 的内部状态 state 是非常有必要的。 本文主要分享了使用 Paimon 作为实时状态存储并在 Flink 中通过 Lookup 维表 Join 的方式进行状态查询和更新的应用实践。 简介 ▐ 是什么 Apache Paimon是一种流批统一的数据湖存储格式结合 Flink 可以构建流批处理的实时湖仓一体架构。Paimon 具有实时更新的能力可应用于对时效性要求不太高的场景如 1-5 分钟其主键表支持大规模更新写入具有非常高的更新性能同时也支持定义合并引擎按照自定义的方式更新记录。 Paimon 底层使用 OSS/HDFS 等作为存储同时数据文件以 LSMtree 的格式进行组织具有更优的实时数据更新能力和完整的流处理能力。 图片来源于 paimon 官网 ▐ 为什么 1. 低成本、可扩展性实时数仓产品也可以作为 flink 的中间存储比如 hologres但是 Paimon 的存储成本约为其的 1/9通过查询官网OSS 的存储为 0.12 元/GB/月Hologres 为 1 元/GB/月。同时数据湖相比于数据仓库可以与更多的大数据引擎Hive/Spark/Trino 等等兼容解决数据孤岛和数据冗余存储的问题。 2. 实时性能相比于其他的数据湖产品Paimon 是天然面向 Flink 设计而诞生的相比于 hudi面向 Spark 批处理设计、Iceberg 等Paimon在与 Flink 结合具有更优的处理大批量数据的 upsert 能力同时数据更新时效性最短可支持到 1 分钟且性能稳定。 应用案例 ▐ 1. 排除逆向退款的用户下单标签 需求背景 目前奥格运营平台提供的下单相关的实时标签如用户最近一次实物购买时间等都是基于实时下单流来加工的即不考虑用户后续的逆向退款情况。然而运营同学需要实时圈选出近一段时间未成功购买未下单或下单后退款的人群制定运营策略及发放权益提高复购率。因此该需求可以明确为构建手淘用户排除逆向退款后的最近一次下单时间的实时标签。 问题分析 用户的下单行为和退款行为是有时序性的因此当用户在下单后发生逆向退款行为时需要回撤之前的订单结果并回溯最近一次支付成功且未退款的订单信息。由于下单流和退款流是两条独立的流因此我们首先考虑到使用双流 join 的方法将他们关联起来但是会有如下两个限制 1. 时效性限制由于用户的两个行为发生的时间间隔是无法确定的用户可能下单后立刻退款也可能隔几个月后再申请退款interval join 和 window join 对窗口的间隔长度有较大的限制而 regular join 会受到状态存储大小的限制无法将间隔时间很长的两条流关联在一起。 2. 状态依赖限制这些方法都是完全依赖状态任务一旦无状态重启状态会丢失导致数据不准确。 解决方法 为了突破上述限制考虑采用 Paimon 表作为中间存储表存储用户在某商家的历史订单的下单时间当退款流来的时候可以去进行 lookupjoin 维表关联获取该用户在该商家的历史订单信息并排除掉所有退款成功的订单后输出最近一次未退款的订单时间。 整体方案设计 方案优势 不依赖状态可以节省大量内存且任务可以随时无状态启动。突破业务行为时间上的限制即使用户在下单后 1 年退款成功依然可以实现回撤。 方案不足 Paimon 表固有的数据时效性限制。存储在 Paimon 表中的历史订单数限制不可能无限存储某用户 x 商家维度近一年的所有订单通过和业务沟通确定只需要存储近 10 单即在极端情况下如果用户近 10 单全部退款则无法再回溯。 Paimon 维表设计 CREATE TABLE paimon-catalog.paimon-db.test_dws_pay_refund (test_buyer_id VARCHAR,test_seller_id VARCHAR,test_order_dict VARCHAR, -- 历史订单字典格式见下文说明PRIMARY KEY (test_buyer_id, test_seller_id) NOT ENFORCED
) WITH (merge-engine deduplicate, -- 使用预聚合数据合并机制产生聚合表changelog-producer none,bucket 1000,bucket-key test_buyer_id,test_seller_id,delete-file.thread-num 32
); 说明 1. 非分区表该维表主要是用于存储用户的历史订单信息与日期没有直接关系因此直接设置为非分区表。 2. 主键表主键设置为买家 ID商家 ID合并引擎采用 deduplicate即保留同一主键下最新一条数据。 3. 分桶数量因为要进行维表 join为了能够采用 shuffle 策略设置固定分桶提前探查历史数据后预估维表大小在 TB 级别遵循每个桶的数据量大小在 2GB 左右的官方建议设置分桶数量为 1000。 4. 不产生 changelog由于该表只作为维表使用为了节省资源可以不产生 changelog。 5. 字段含义对于非主键字段test_order_dict 用于存储用的历史订单信息格式为${order_id}:${pay_timestamp}_${is_refund_flag}其中 order_id 表示订单号pay_timestamp 表示下单时间is_refund_flag 表示是否退款。 开发经验分享 1. 分桶数量设置不合理 问题描述初始化 Paimon 表后的大小为 TB 级别一开始设置的分桶数为 400即每个桶的数据量在 5-10GB 范围内启动任务后发现数据总是卡在 LookupJoin 这一节点一直增大并发至 2048也一直堵塞如下图所示 原因分析 通过日志发现主要是在读取 Paimon 表的 OSS 存储时出现了阻塞原因是单个桶的文件量太大且传输速率已达到带宽上线因此依然需要长时间的同步。对于 shuffle join会对 join key必须要包含所有的 bucket key基于 bucket 的数量进行 shuffle然后分配到各个并发上当算子的并发分桶数时每个并发刚好只需要加载一个桶的文件到内存如果并发数小于分桶数则某些并发可能需要加载多个桶的文件数据可能会出现倾斜的情况。 解决方法按照官方文档的推荐单个桶的数据量不要超过 5G因此增大桶的数量到 1000启动任务时LookupJoin 算子各并发加载 Paimon 维表的时间明显缩短3 分钟左右即可加载完成并正常处理数据。 2. 写 Paimon 表出现Heartbeat of TaskManager timed out 问题描述初始化 Paimon 维表时需要将 ODPS 计算好的历史数据批量写入到 Paimon 表中在 vvp 中提交批任务但是在 writer 算子阶段部分 TM 会出现Heartbeat of TaskManager timed out 大的报错导致频繁失败重启。 原因分析通过查看官方文档writer 算子出现此类情况通常是因为 TM 的堆内存不足导致的paimon 表使用堆内存只要是以下 3 种方式 Paimon 主键表 writer 算子的每个并发都有一个内存 buffer 用于排序。该 buffer 的大小受 write-buffer-size 表参数控制默认值为 256 mb。Paimon 默认使用 orc 文件格式因此还需要一个内存 buffer 将内存里的数据分批转为列存格式。该 buffer 的大小受 orc.write.batch-size 表参数控制默认值为 1024即默认保存 1024 行数据。每个被修改的分桶都有一个专用的 writer 对象处理该分桶的写入数据。 解决方法 批任务配置时增大 TM 的内存从 4GB 到 8GB设置如下参数如果 Flink 作业不依赖于状态可以避免使用托管内存从而增加对内存的使用。 taskmanager.memory.managed.size1m 3. 非分区表数据清除策略 问题描述对于非分区主键表无法向分区表一样设置分区定期清除策略这样会导致表的数据量越来越大影响后续该表的读写性能。 尝试方法Paimon 官网中有对记录级别过期清除的参数如下 但在实际应用的过程中发现这种方法只是在新记录写入时会根据时间字段去判断该记录是否过期若过期则不会写入这条新记录对于 Paimon 表中已有的历史记录无法生效。 解决方法 将 Paimon 表改为只写入不压缩模式 alter table paimon-catalog.paimon-db.test_dws_pay_refund set (write-only true); 通过 delete 语句清除过期数据【批处理】待清除完成后恢复压缩参数设置为 write-only false 说明如果直接启动批作业执行 delete 语句在提交阶段任务会直接失败因为当多个 writer 同时写入同一个分桶数据时由于多个 writer 标记了同一条数据为‘deleted,在压缩过程中会发生冲突导致任务失败重启但是批作业无法进行重启。 ▐ 累加类实时标签开发 需求背景 目前平台提供的下单类实时标签大都为某段时间是否下单的二元标签为了丰富实时标签的多元性考虑新增某段时间下单数量、下单金额等累计类的实时标签。 问题分析 对于长周期可累加型指标的问题通常有以下几种思路 1. 开窗聚合这种方法的缺点是长时间的开窗会受到状态大小的限制因此不适用于长周期指标的累加。 2. 离线 T-2 结果实时结果关联汇总这种方法需要 离线任务必须在当天产出在大促期间需要强保障实时任务依然需要进行长达 2 天的开窗任务一旦无状态重启就需要从 2 天前的点位重新计算。 解决方法 对于上述问题中间存储是一定需要的但是我们考虑使用 Paimon 表来代替方法 2 中的离线结果表这样既可以摆脱对离线任务的强依赖同时也可以随时进行无状态重启。 整体方案设计 1. 累计日期的选择累计周期 N 的取值可以让用户自定义选择这要求我们提前算好不同 N 天的结果并存储在数据库中 因此 Paimon 维表必须存储天粒度的累加数据方便不同时间段指标结果的累加。针对该方案累加周期 N 的选择不建议太长因为这样会增大 Paimon 表的存储成本和维表关联时的计算成本因此我们这里选择 N 的最大取值为 7也可以根据业务的需要适当增加。 2. DAU 刷新数据如果某个用户今日没有下单那么存储在 Tair 中的近 N 天结果是不会被更新的这就导致结果的不准确因此需要使用 DAU 流驱动近 N 天累加结果的更新。 Paimon 维表设计 CREATE TABLE paimon-catalog.paimon-db.test_dws_pay_cate1_result (test_buyer_id BIGINT NOT NULL, test_cate1_id STRING NOT NULL, -- 一级类目IDds STRING NOT NULL,test_order_cnt BIGINT, -- 下单数test_gmv DOUBLE, -- GMVPRIMARY KEY (test_buyer_id, test_cate1_id, ds) NOT ENFORCED
)
COMMENT
WITH (bucket 40,bucket-key test_buyer_id,changelog-producer lookup,fields.test_order_cnt.aggregate-function sum,fields.test_gmv.aggregate-function sum,merge-engine aggregation,partition.expiration-time 7d,partition.timestamp-formatter yyyyMMdd,partition.timestamp-pattern $ds
) 说明 1. 分区表分区粒度为天分区存储每天每个用户 x 一级类目的成交单数和金额设置分区过期日期为 7 天 2. 主键表主键设置为用户 ID一级类目 ID 分区字段 3. 分桶数量分桶键设置为用户 ID具体原因可在下文会详述分桶数量根据维表大小(GB 级别)设置为 40 4. 聚合引擎使用 Paimon 主键表的 aggregation 预聚合引擎分别对字段 test_order_cnt下单数和 test_gmv下单金额采用 sum 累加的方式预聚合。 5. changelog 产生方式设置为 lookup其实我们下游并不需要实时消费 paimon 维表但是对于使用 aggregation 引擎的表必须设置该参数。 开发经验分享 由于分桶键设置不合理导致的 LookupJoin 算子报错 问题描述在设计 Paimon 维表的时候一开始会自然得把分桶键设置为主键即 test_buyer_id 和test_cate1_id两个字段但是在LookupJoin 的时候发现如果 joinkey 只是 test_buyer_id即 bucket key 的一部分对应算子就会长时间处于初始化中最后报错。 原因分析如果 joinkey 只是 bucket key 的一部分就会导致在 join 的时候无法通过 joinkey 快速定位到目标维表数据所在分桶及目标数据的位置导致查询效率低下。 解决方法需要保证 joinkey 完全包括 bucket key这样才可以通过 joinkey 快速定位到维表数据所在的分桶从而定位快速读取维表数据。因此将 bucket key 设置为 test_buyer_id。 衍生问题采用上述解决方法后启动任务会发现 LookupJoin 算子会有 5-10 分钟的初始化过程这是因为我们在 join 的时候没有指定 ds 分区字段的值导致 join 的时候会读取多个分区而不同分区的数据是存在不同的分桶中的因此 vvp 在加载维表数据的时候不会通过 shuffle 策略将维表数据加载到对应内存中而是将维表的全量数据加载到每个 TM 的 rocksdb 中然后再去从 TM 本地根据 joinkey 查询数据因此加载需要较长的时间。 经验总结 通过上述两个案例我们成功实践了 Paimon Flink 的实时湖仓架构用于需要大量实时状态存储的场景。使用 Paimon 作为中间存储进行维表 JOIN可以解决 Flink 内部状态 state 成本高、不可重启、存储周期短等限制从而满足复杂实时场景的数据开发需求同时这些中间存储结果也可以通过流/批的形式被 ODPS/Hologres 等大数据引擎消费实现数据统一这也是我们后续会进一步探索和应用的场景。 参考资料 Writer 算子堆内存的使用https://paimon.apache.org/docs/master/maintenance/write-performance/#write-memoryFlink 维表 Joinhttps://help.aliyun.com/zh/flink/developer-reference/join-statements-for-dimension-tables#section-syw-j1p-cgbPaimon dedicated compactionhttps://paimon.apache.org/docs/0.9/maintenance/dedicated-compaction/记录级别过期策略https://paimon.apache.org/docs/0.9/primary-key-table/compaction/数据湖产品对比https://www.cnblogs.com/johnnyzen/p/18189005 团队介绍 淘天业务技术用户运营平台技术团队是一支懂用户技术驱动的年轻队伍以用户为中心通过技术创新提升用户全生命周期体验持续为用户创造价值。团队立足体系化打造业界领先的用户增长基础设施以媒体外投平台、ABTest平台、用户运营平台为代表的基础设施赋能阿里集团用户增长日均处理数据量千亿规模、调用QPS千万级。 ¤ 拓展阅读 ¤ 3DXR技术 | 终端技术 | 音视频技术 服务端技术 | 技术质量 | 数据算法