当前位置: 首页 > news >正文

专题网站建设方案wordpress单页面博客

专题网站建设方案,wordpress单页面博客,深圳高端包装盒设计,ps怎么做网站首页界面文章目录集成Flink环境准备sql-client方式启动sql-client插入数据查询数据更新数据流式插入code 方式环境准备代码类型映射核心参数设置去重参数并发参数压缩参数文件大小Hadoop参数内存优化读取方式流读#xff08;Streaming Query#xff09;增量读取#xff08;Increment… 文章目录集成Flink环境准备sql-client方式启动sql-client插入数据查询数据更新数据流式插入code 方式环境准备代码类型映射核心参数设置去重参数并发参数压缩参数文件大小Hadoop参数内存优化读取方式流读Streaming Query增量读取Incremental Query限流写入方式CDC 数据同步离线批量导入全量接增量写入模式Changelog 模式Append 模式Bucket 索引Hudi Catalog离线 Compaction离线 Clustering常见基础问题核心原理分析数据去重原理表写入原理表读取原理集成Flink HudiSupported Flink version0.12.x1.15.x、1.14.x、1.13.x0.11.x1.14.x、1.13.x0.10.x1.13.x0.9.01.12.2 0.11.x不建议使用如果要用请使用补丁分支:https://github.com/apache/hudi/pull/6182 环境准备 1拷贝编译好的jar包到Flink的lib目录下 cp /opt/software/hudi-0.12.0/packaging/hudi-flink-bundle/target/hudi-flink1.13-bundle_2.12-0.12.0.jar /opt/module/flink-1.13.6/lib/2拷贝guava包解决依赖冲突 cp /opt/module/hadoop-3.1.3/share/hadoop/common/lib/guava-27.0-jre.jar /opt/module/flink-1.13.6/lib/3配置Hadoop环境变量 sudo vim /etc/profile.d/my_env.shexport HADOOP_CLASSPATHhadoop classpath export HADOOP_CONF_DIR$HADOOP_HOME/etc/hadoopsource /etc/profile.d/my_env.sh4启动Hadoop略 sql-client方式 启动sql-client 1修改flink-conf.yaml配置 vim /opt/module/flink-1.13.6/conf/flink-conf.yamlclassloader.check-leaked-classloader: false taskmanager.numberOfTaskSlots: 4state.backend: rocksdb execution.checkpointing.interval: 30000 state.checkpoints.dir: hdfs://hadoop1:8020/ckps state.backend.incremental: true2local模式 修改workers vim /opt/module/flink-1.13.6/conf/workers #表示会在本地启动3个TaskManager的 local集群 localhost localhost localhost启动Flink /opt/module/flink-1.13.6/bin/start-cluster.sh查看webuihttp://hadoop1:8081 启动Flink的sql-client /opt/module/flink-1.13.6/bin/sql-client.sh embedded3yarn-session模式 解决依赖问题 cp /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.1.3.jar /opt/module/flink-1.13.6/lib/启动yarn-session /opt/module/flink-1.13.6/bin/yarn-session.sh -d启动sql-client /opt/module/flink-1.13.6/bin/sql-client.sh embedded -s yarn-session插入数据 set sql-client.execution.result-modetableau;-- 创建hudi表 CREATE TABLE t1(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),partition VARCHAR(20) ) PARTITIONED BY (partition) WITH (connector hudi,path hdfs://hadoop1:8020/tmp/hudi_flink/t1,table.type MERGE_ON_READ –- 默认是COW ); 或如下写法 CREATE TABLE t1(uuid VARCHAR(20),name VARCHAR(10),age INT,ts TIMESTAMP(3),partition VARCHAR(20),PRIMARY KEY(uuid) NOT ENFORCED ) PARTITIONED BY (partition) WITH (connector hudi,path hdfs://hadoop1:8020/tmp/hudi_flink/t1,table.type MERGE_ON_READ );-- 插入数据 INSERT INTO t1 VALUES(id1,Danny,23,TIMESTAMP 1970-01-01 00:00:01,par1),(id2,Stephen,33,TIMESTAMP 1970-01-01 00:00:02,par1),(id3,Julian,53,TIMESTAMP 1970-01-01 00:00:03,par2),(id4,Fabian,31,TIMESTAMP 1970-01-01 00:00:04,par2),(id5,Sophia,18,TIMESTAMP 1970-01-01 00:00:05,par3),(id6,Emma,20,TIMESTAMP 1970-01-01 00:00:06,par3),(id7,Bob,44,TIMESTAMP 1970-01-01 00:00:07,par4),(id8,Han,56,TIMESTAMP 1970-01-01 00:00:08,par4);查询数据 select * from t1;更新数据 insert into t1 values(id1,Danny,27,TIMESTAMP 1970-01-01 00:00:01,par1);注意保存模式现在是Append。通常除非是第一次创建表否则请始终使用追加模式。现在再次查询数据将显示更新的记录。每个写操作都会生成一个用时间戳表示的新提交。查找前一次提交中相同的_hoodie_record_keys在_hoodie_commit_time、age字段中的变化。 流式插入 1创建测试表 CREATE TABLE sourceT (uuid varchar(20),name varchar(10),age int,ts timestamp(3),partition varchar(20) ) WITH (connector datagen,rows-per-second 1 );create table t2(uuid varchar(20),name varchar(10),age int,ts timestamp(3),partition varchar(20) ) with (connector hudi,path /tmp/hudi_flink/t2,table.type MERGE_ON_READ );2执行插入 insert into t2 select * from sourceT;3查看job 查看HDFS目录 4查询结果 set sql-client.execution.result-modetableau; select * from t2 limit 10;code 方式 除了用sql-client还可以自己编写FlinkSQL程序打包提交Flink作业。 环境准备 1手动install依赖 mvn install:install-file -DgroupIdorg.apache.hudi -DartifactIdhudi-flink_2.12 -Dversion0.12.0 -Dpackagingjar -Dfile./hudi-flink1.13-bundle-0.12.0.jar2创建Maven工程 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdcom.atguigu.hudi/groupIdartifactIdflink-hudi-demo/artifactIdversion1.0-SNAPSHOT/versionpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetflink.version1.13.6/flink.versionhudi.version0.12.0/hudi.versionjava.version1.8/java.versionscala.binary.version2.12/scala.binary.versionslf4j.version1.7.30/slf4j.version/propertiesdependenciesdependencygroupIdorg.apache.flink/groupIdartifactIdflink-java/artifactIdversion${flink.version}/versionscopeprovided/scope !--不会打包到依赖中只参与编译不参与运行 --/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-streaming-java_${scala.binary.version}/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-clients_${scala.binary.version}/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner-blink_${scala.binary.version}/artifactIdversion${flink.version}/versionscopeprovided/scope/dependency!--idea运行时也有webui--dependencygroupIdorg.apache.flink/groupIdartifactIdflink-runtime-web_${scala.binary.version}/artifactIdversion${flink.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.slf4j/groupIdartifactIdslf4j-api/artifactIdversion${slf4j.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.slf4j/groupIdartifactIdslf4j-log4j12/artifactIdversion${slf4j.version}/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.logging.log4j/groupIdartifactIdlog4j-to-slf4j/artifactIdversion2.14.0/versionscopeprovided/scope/dependencydependencygroupIdorg.apache.flink/groupIdartifactIdflink-statebackend-rocksdb_${scala.binary.version}/artifactIdversion${flink.version}/version/dependencydependencygroupIdorg.apache.hadoop/groupIdartifactIdhadoop-client/artifactIdversion3.1.3/versionscopeprovided/scope/dependency!--手动install到本地maven仓库--dependencygroupIdorg.apache.hudi/groupIdartifactIdhudi-flink_2.12/artifactIdversion${hudi.version}/versionscopeprovided/scope/dependency/dependenciesbuildpluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-shade-plugin/artifactIdversion3.2.4/versionexecutionsexecutionphasepackage/phasegoalsgoalshade/goal/goalsconfigurationartifactSetexcludesexcludecom.google.code.findbugs:jsr305/excludeexcludeorg.slf4j:*/excludeexcludelog4j:*/excludeexcludeorg.apache.hadoop:*/exclude/excludes/artifactSetfiltersfilter!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --artifact*:*/artifactexcludesexcludeMETA-INF/*.SF/excludeexcludeMETA-INF/*.DSA/excludeexcludeMETA-INF/*.RSA/exclude/excludes/filter/filterstransformers combine.childrenappendtransformer implementationorg.apache.maven.plugins.shade.resource.ServicesResourceTransformer/transformer/transformers/configuration/execution/executions/plugin/plugins/build/project代码 package com.atguigu.hudi.flink;import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend; import org.apache.flink.contrib.streaming.state.PredefinedOptions; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import java.util.concurrent.TimeUnit;public class HudiDemo {public static void main(String[] args) {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 设置状态后端RocksDBEmbeddedRocksDBStateBackend embeddedRocksDBStateBackend new EmbeddedRocksDBStateBackend(true);embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);env.setStateBackend(embeddedRocksDBStateBackend);// checkpoint配置env.enableCheckpointing(TimeUnit.SECONDS.toMillis(30), CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig env.getCheckpointConfig();checkpointConfig.setCheckpointStorage(hdfs://hadoop1:8020/ckps);checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(20));checkpointConfig.setTolerableCheckpointFailureNumber(5);checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);StreamTableEnvironment sTableEnv StreamTableEnvironment.create(env);sTableEnv.executeSql(CREATE TABLE sourceT (\n uuid varchar(20),\n name varchar(10),\n age int,\n ts timestamp(3),\n partition varchar(20)\n ) WITH (\n connector datagen,\n rows-per-second 1\n ));sTableEnv.executeSql(create table t2(\n uuid varchar(20),\n name varchar(10),\n age int,\n ts timestamp(3),\n partition varchar(20)\n )\n with (\n connector hudi,\n path /tmp/hudi_flink/t2,\n table.type MERGE_ON_READ\n ));sTableEnv.executeSql(insert into t2 select * from sourceT);} }提交运行 将代码打成jar包上传到目录myjars执行提交命令 bin/flink run -t yarn-per-job \ -c com.atguigu.hudi.flink.HudiDemo \ ./myjars/flink-hudi-demo-1.0-SNAPSHOT.jar类型映射 Flink SQL TypeHudi TypeAvro logical typeCHAR/VARCHAR/STRINGstringBOOLEANbooleanBINARY / VARBINARYbytesDECIMALfixeddecimalTINYINTintSMALLINTintINTintBIGINTlongFLOATfloatDOUBLEdoubleDATEintdateTIMEinttime-millisTIMESTAMPlongtimestamp-millisARRAYarrayMAP(key must be string/char/varchar type)mapMULTISET(element must be string/char/varchar type)mapROWrecord 核心参数设置 Flink可配参数https://hudi.apache.org/docs/configurations#FLINK_SQL 去重参数 通过如下语法设置主键 -- 设置单个主键 create table hoodie_table (f0 int primary key not enforced,f1 varchar(20),... ) with (connector hudi,... )-- 设置联合主键 create table hoodie_table (f0 int,f1 varchar(20),...primary key(f0, f1) not enforced ) with (connector hudi,... )名称说明默认值备注hoodie.datasource.write.recordkey.field主键字段–支持主键语法 PRIMARY KEY 设置支持逗号分隔的多个字段precombine.field(0.13.0 之前版本为 write.precombine.field)去重时间字段–record 合并的时候会按照该字段排序选值较大的 record 为合并结果不指定则为处理序选择后到的 record 并发参数 参数说明 名称说明默认值备注write.taskswriter 的并发每个 writer 顺序写 1~N 个 buckets4增加并发对小文件个数没影响write.bucket_assign.tasksbucket assigner 的并发Flink的并行度增加并发同时增加了并发写的 bucekt 数也就变相增加了小文件(小 bucket) 数write.index_bootstrap.tasksIndex bootstrap 算子的并发增加并发可以加快 bootstrap 阶段的效率bootstrap 阶段会阻塞 checkpoint因此需要设置多一些的 checkpoint 失败容忍次数Flink的并行度只在 index.bootstrap.enabled 为 true 时生效read.tasks读算子的并发batch 和 stream4compaction.tasksonline compaction 算子的并发writer 的并发online compaction 比较耗费资源建议走 offline compaction 案例演示 可以flink建表时在with中指定或Hints临时指定参数的方式在需要调整的表名后面加上 /* OPTIONS() */ insert into t2 /* OPTIONS(write.tasks2,write.bucket_assign.tasks3,compaction.tasks4) */ select * from sourceT;压缩参数 参数说明 在线压缩的参数通过设置 compaction.async.enabled false关闭在线压缩执行但是调度compaction.schedule.enabled 仍然建议开启之后通过离线压缩直接执行 在线压缩任务 阶段性调度的压缩 plan。 名称说明默认值备注compaction.schedule.enabled是否阶段性生成压缩 plantrue建议开启即使compaction.async.enabled 关闭的情况下compaction.async.enabled是否开启异步压缩true通过关闭此参数关闭在线压缩compaction.tasks压缩 task 并发4compaction.trigger.strategy压缩策略num_commits支持四种策略num_commits、time_elapsed、num_and_time、num_or_timecompaction.delta_commits默认策略5 个 commits 压缩一次5compaction.delta_seconds3600compaction.max_memory压缩去重的 hash map 可用内存100MB资源够用的话建议调整到 1GBcompaction.target_io每个压缩 plan 的 IO 上限默认 5GB500GB 案例演示 CREATE TABLE t3(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),partition VARCHAR(20) ) WITH (connector hudi,path hdfs://hadoop1:8020/tmp/hudi_flink/t3,compaction.async.enabled true,compaction.tasks 1,compaction.schedule.enabled true,compaction.trigger.strategy num_commits,compaction.delta_commits 2,table.type MERGE_ON_READ );set table.dynamic-table-options.enabledtrue; insert into t3 select * from sourceT/* OPTIONS(rows-per-second 5)*/;注意如果没有按照5.2.1中yarn-session模式解决hadoop依赖冲突问题那么无法compaction生成parquet文件报错很隐晦在Exception中看不到要搜索TaskManager中关于compaction才能看到报错。 文件大小 参数说明 Hudi会自管理文件大小避免向查询引擎暴露小文件其中自动处理文件大小起很大作用。在进行insert/upsert操作时Hudi可以将文件大小维护在一个指定文件大小。 目前只有 log 文件的写入大小可以做到精确控制parquet 文件大小按照估算值。 名称说明默认值备注hoodie.parquet.max.file.size最大可写入的 parquet 文件大小120 * 1024 * 1024默认 120MB(单位 byte)超过该大小切新的 file grouphoodie.logfile.to.parquet.compression.ratiolog文件大小转 parquet 的比率0.35hoodie 统一依据 parquet 大小来评估小文件策略hoodie.parquet.small.file.limit在写入时hudi 会尝试先追加写已存小文件该参数设置了小文件的大小阈值小于该参数的文件被认为是小文件104857600默认 100MB(单位 byte)大于 100MB小于 120MB 的文件会被忽略避免写过度放大hoodie.copyonwrite.record.size.estimate预估的 record 大小hoodie 会依据历史的 commits 动态估算 record 的大小但是前提是之前有单次写入超过 hoodie.parquet.small.file.limit 大小在未达到这个大小时会使用这个参数1024默认 1KB(单位 byte)如果作业流量比较小可以设置下这个参数hoodie.logfile.max.sizeLogFile最大大小。这是在将Log滚转到下一个版本之前允许的最大大小。1073741824默认1GB(单位 byte) 案例演示 CREATE TABLE t4(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),partition VARCHAR(20) ) WITH (connector hudi,path hdfs://hadoop1:8020/tmp/hudi_flink/t4,compaction.tasks 1,hoodie.parquet.max.file.size 10000,hoodie.parquet.small.file.limit5000,table.type MERGE_ON_READ );set table.dynamic-table-options.enabledtrue; insert into t4 select * from sourceT /* OPTIONS(rows-per-second 5)*/;Hadoop参数 从 0.12.0 开始支持如果有跨集群提交执行的需求可以通过 sql 的 ddl 指定 per-job级别的 hadoop 配置 名称说明默认值备注hadoop.${you option key}通过 hadoop.前缀指定 hadoop 配置项–支持同时指定多个 hadoop 配置项 内存优化 内存参数 名称说明默认值备注write.task.max.size一个 write task 的最大可用内存1024当前预留给 write buffer 的内存为write.task.max.size -compaction.max_memory当 write task 的内存 buffer达到阈值后会将内存里最大的 buffer flush 出去write.batch.sizeFlink 的写 task 为了提高写数据效率会按照写 bucket 提前 buffer 数据每个 bucket 的数据在内存达到阈值之前会一直 cache 在内存中当阈值达到会把数据 buffer 传递给 hoodie 的 writer 执行写操作256一般不用设置保持默认值就好write.log_block.sizehoodie 的 log writer 在收到 write task 的数据后不会马上 flush 数据writer 是以 LogBlock 为单位往磁盘刷数据的在 LogBlock 攒够之前 records 会以序列化字节的形式 buffer 在 writer 内部128一般不用设置保持默认值就好write.merge.max_memoryhoodie 在 COW 写操作的时候会有增量数据和 base file 数据 merge 的过程增量的数据会缓存在内存的 map 结构里这个 map 是可 spill 的这个参数控制了 map 可以使用的堆内存大小100一般不用设置保持默认值就好compaction.max_memory同 write.merge.max_memory: 100MB 类似只是发生在压缩时。100如果是 online compaction资源充足时可以开大些比如 1GB MOR 1state backend 换成 rocksdb (默认的 in-memory state-backend 非常吃内存) 2内存够的话compaction.max_memory 调大些 (默认是 100MB 可以调到 1GB) 3关注 TM 分配给每个 write task 的内存保证每个 write task 能够分配到 write.task.max.size 所配置的大小比如 TM 的内存是 4GB 跑了 2 个 StreamWriteFunction 那每个 write function 能分到 2GB尽量预留一些 buffer因为网络 bufferTM 上其他类型 task (比如 BucketAssignFunction 也会吃些内存) 4需要关注 compaction 的内存变化compaction.max_memory 控制了每个 compaction task 读 log 时可以利用的内存大小compaction.tasks 控制了 compaction task 的并发 注意: write.task.max.size - compaction.max_memory 是预留给每个 write task 的内存 buffer COW 1state backend 换成 rocksdb默认的 in-memory state-backend 非常吃内存。 2write.task.max.size 和 write.merge.max_memory 同时调大默认是 1GB 和 100MB 可以调到 2GB 和 1GB。 3关注 TM 分配给每个 write task 的内存保证每个 write task 能够分配到 write.task.max.size 所配置的大小比如 TM 的内存是 4GB 跑了 2 个 StreamWriteFunction 那每个 write function 能分到 2GB尽量预留一些 buffer因为网络 bufferTM 上其他类型 task比如 BucketAssignFunction 也会吃些内存。 注意write.task.max.size - write.merge.max_memory 是预留给每个 write task 的内存 buffer。 读取方式 流读Streaming Query 当前表默认是快照读取即读取最新的全量快照数据并一次性返回。通过参数 read.streaming.enabled 参数开启流读模式通过 read.start-commit 参数指定起始消费位置支持指定 earliest 从最早消费。 WITH参数 名称Required默认值说明read.streaming.enabledfalsefalse设置 true 开启流读模式read.start-commitfalse最新 commit指定 ‘yyyyMMddHHmmss’ 格式的起始 commit闭区间read.streaming.skip_compactionfalsefalse流读时是否跳过 compaction 的 commits跳过 compaction 有两个用途1避免 upsert 语义下重复消费 (compaction 的 instant 为重复数据如果不跳过有小概率会重复消费)2) changelog 模式下保证语义正确性****0.11 开始以上两个问题已经通过保留 compaction 的 instant time 修复****clean.retain_commitsfalse10cleaner 最多保留的历史 commits 数大于此数量的历史 commits 会被清理掉changelog 模式下这个参数可以控制 changelog 的保留时间例如 checkpoint 周期为 5 分钟一次默认最少保留 50 分钟的时间。 注意当参数 read.streaming.skip_compaction 打开并且 streaming reader 消费落后于clean.retain_commits 数时流读可能会丢失数据。从 0.11 开始compaction 不会再变更 record 的 instant time因此理论上数据不会再重复消费但是还是会重复读取并丢弃因此额外的开销还是无法避免对性能有要求的话还是可以开启此参数。 CREATE TABLE t5(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),partition VARCHAR(20) ) WITH (connector hudi,path hdfs://hadoop1:8020/tmp/hudi_flink/t5,table.type MERGE_ON_READ,read.streaming.enabled true,read.streaming.check-interval 4 -- 默认60s );insert into t5 select * from sourceT; select * from t5;增量读取Incremental Query 从 0.10.0 开始支持。 如果有增量读取 batch 数据的需求增量读取包含三种场景。 1Stream 增量消费通过参数 read.start-commit 指定起始消费位置 2Batch 增量消费通过参数 read.start-commit 指定起始消费位置通过参数 read.end-commit 指定结束消费位置区间为闭区间即包含起始、结束的 commit 3TimeTravelBatch 消费某个时间点的数据通过参数 read.end-commit 指定结束消费位置即可由于起始位置默认从最新所以无需重复声明 WITH 参数 名称Required默认值说明read.start-commitfalse默认从最新 commit支持 earliest 从最早消费read.end-commitfalse默认到最新 commit 限流 如果将全量数据(百亿数量级) 和增量先同步到 kafka再通过 flink 流式消费的方式将库表数据直接导成 hoodie 表因为直接消费全量部分数据量大吞吐高、乱序严重写入的 partition 随机会导致写入性能退化出现吞吐毛刺这时候可以开启限速参数保证流量平稳写入。 WITH 参数 名称Required默认值说明write.rate.limitfalse0默认关闭限速 写入方式 CDC 数据同步 CDC 数据保存了完整的数据库变更当前可通过两种途径将数据导入 hudi: 第一种通过 cdc-connector 直接对接 DB 的 binlog 将数据导入 hudi优点是不依赖消息队列缺点是对 db server 造成压力。第二种对接 cdc format 消费 kafka 数据导入 hudi优点是可扩展性强缺点是依赖 kafka。 注意如果上游数据无法保证顺序需要指定 write.precombine.field 字段。 1准备MySQL表 MySQL开启binlog并建表 create database test; use test; create table stu3 (id int unsigned auto_increment primary key COMMENT 自增id,name varchar(20) not null comment 学生名字,school varchar(20) not null comment 学校名字,nickname varchar(20) not null comment 学生小名,age int not null comment 学生年龄,class_num int not null comment 班级人数,phone bigint not null comment 电话号码,email varchar(64) comment 家庭网络邮箱,ip varchar(32) comment IP地址 ) engineInnoDB default charsetutf8;2flink读取mysql binlog并写入kafka 创建MySQL表 create table stu3_binlog(id bigint not null,name string,school string,nickname string,age int not null,class_num int not null,phone bigint not null,email string,ip string,primary key (id) not enforced ) with (connector mysql-cdc,hostname hadoop1,port 3306,username root,password aaaaaa,database-name test,table-name stu3 );创建Kafka表 create table stu3_binlog_sink_kafka(id bigint not null,name string,school string,nickname string,age int not null,class_num int not null,phone bigint not null,email string,ip string,primary key (id) not enforced ) with (connector upsert-kafka,topic cdc_mysql_stu3_sink,properties.zookeeper.connect hadoop1:2181,properties.bootstrap.servers hadoop1:9092,key.format json,value.format json );将mysql binlog日志写入kafka insert into stu3_binlog_sink_kafka select * from stu3_binlog;3flink读取kafka数据并写入hudi数据湖 创建kafka源表 create table stu3_binlog_source_kafka(id bigint not null,name string,school string,nickname string,age int not null,class_num int not null,phone bigint not null,email string,ip string ) with (connector kafka,topic cdc_mysql_stu3_sink,properties.bootstrap.servers hadoop1:9092,format json,scan.startup.mode earliest-offset,properties.group.id testGroup );创建hudi目标表 create table stu3_binlog_sink_hudi(id bigint not null,name string,school string,nickname string,age int not null,class_num int not null,phone bigint not null,email string,ip string,primary key (id) not enforced )partitioned by (school)with (connector hudi,path hdfs://hadoop1:8020/tmp/hudi_flink/stu3_binlog_sink_hudi,table.type MERGE_ON_READ,write.option insert,write.precombine.field school);将kafka数据写入到hudi中 insert into stu3_binlog_sink_hudi select * from stu3_binlog_source_kafka;4使用datafaker插入数据 datafaker安装及说明https://developer.aliyun.com/article/852227 新建meta.txt文件文件内容为 id||int||自增id[:inc(id,1)] name||varchar(20)||学生名字 school||varchar(20)||学校名字[:enum(qinghua,beida,shanghaijiaoda,fudan,xidian,zhongda)] nickname||varchar(20)||学生小名[:enum(tom,tony,mick,rich,jasper)] age||int||学生年龄[:age] class_num||int||班级人数[:int(10, 100)] phone||bigint||电话号码[:phone_number] email||varchar(64)||家庭网络邮箱[:email] ip||varchar(32)||IP地址[:ipv4]生成10000条数据并写入到mysql中的test.stu3表 datafaker rdb mysqlmysqldb://root:aaaaaahadoop1:3306/test?charsetutf8 stu3 10000 --meta meta.txt注意如果要再次生成测试数据则需要修改meta.txt将自增id中的1改为比10000大的数不然会出现主键冲突情况。 5统计数据入Hudi情况 create table stu3_binlog_hudi_view(id bigint not null,name string,school string,nickname string,age int not null,class_num int not null,phone bigint not null,email string,ip string,primary key (id) not enforced )partitioned by (school)with (connector hudi,path hdfs://hadoop1:8020/tmp/stu3_binlog_sink_hudi,table.type MERGE_ON_READ,write.precombine.field school);select count(*) from stu3_binlog_hudi_view; 6实时查看数据入湖情况 create table stu3_binlog_hudi_streaming_view(id bigint not null,name string,school string,nickname string,age int not null,class_num int not null,phone bigint not null,email string,ip string,primary key (id) not enforced )partitioned by (school)with (connector hudi,path hdfs://hadoop1:8020/tmp/stu3_binlog_sink_hudi,table.type MERGE_ON_READ,write.precombine.field school,read.streaming.enabled true);select * from stu3_binlog_hudi_streaming_view;离线批量导入 如果存量数据来源于其他数据源可以使用批量导入功能快速将存量数据导成 Hoodie 表格式。 1原理 批量导入省去了 avro 的序列化以及数据的 merge 过程后续不会再有去重操作数据的唯一性需要自己来保证。 bulk_insert 需要在 Batch Execuiton Mode 下执行更高效Batch 模式默认会按照 partition path 排序输入消息再写入 Hoodie避免 file handle 频繁切换导致性能下降。 SET execution.runtime-mode batch; SET execution.checkpointing.interval 0;bulk_insert write task 的并发通过参数 write.tasks 指定并发的数量会影响到小文件的数量理论上bulk_insert write task 的并发数就是划分的 bucket 数当然每个 bucket 在写到文件大小上限parquet 120 MB的时候会 roll over 到新的文件句柄所以最后写文件数量 bulk_insert write task 数。 2WITH参数 名称Required默认值说明write.operationtrueupsert配置 bulk_insert 开启该功能write.tasksfalse4bulk_insert 写 task 的并发最后的文件数 write.taskswrite.bulk_insert.shuffle_by_partitionwrite.bulk_insert.shuffle_input从 0.11 开始falsetrue是否将数据按照 partition 字段 shuffle 再通过 write task 写入开启该参数将减少小文件的数量 但是可能有数据倾斜风险write.bulk_insert.sort_by_partitionwrite.bulk_insert.sort_input从 0.11 开始falsetrue是否将数据线按照 partition 字段排序再写入当一个 write task 写多个 partition开启可以减少小文件数量write.sort.memory128sort 算子的可用 managed memory单位 MB 3案例 Mysql建表 create database test; use test; create table stu4 (id int unsigned auto_increment primary key COMMENT 自增id,name varchar(20) not null comment 学生名字,school varchar(20) not null comment 学校名字,nickname varchar(20) not null comment 学生小名,age int not null comment 学生年龄,score decimal(4,2) not null comment 成绩,class_num int not null comment 班级人数,phone bigint not null comment 电话号码,email varchar(64) comment 家庭网络邮箱,ip varchar(32) comment IP地址 ) engineInnoDB default charsetutf8;新建meta.txt文件文件内容为 id||int||自增id[:inc(id,1)] name||varchar(20)||学生名字 school||varchar(20)||学校名字[:enum(qinghua,beida,shanghaijiaoda,fudan,xidian,zhongda)] nickname||varchar(20)||学生小名[:enum(tom,tony,mick,rich,jasper)] age||int||学生年龄[:age] score||decimal(4,2)||成绩[:decimal(4,2,1)] class_num||int||班级人数[:int(10, 100)] phone||bigint||电话号码[:phone_number] email||varchar(64)||家庭网络邮箱[:email] ip||varchar(32)||IP地址[:ipv4]使用datafaker生成10万条数据并写入到mysql中的test.stu4表 datafaker rdb mysqlmysqldb://root:aaaaaahadoop1:3306/test?charsetutf8 stu4 100000 --meta meta.txt备注如果要再次生成测试数据则需要将meta.txt中的自增id改为比100000大的数不然会出现主键冲突情况。 Flink SQL client 创建myql数据源 create table stu4(id bigint not null,name string,school string,nickname string,age int not null,score decimal(4,2) not null,class_num int not null,phone bigint not null,email string,ip string,PRIMARY KEY (id) NOT ENFORCED ) with (connector jdbc,url jdbc:mysql://hadoop1:3306/test?serverTimezoneGMT%2B8,username root,password aaaaaa,table-name stu4 );Flink SQL client创建hudi表 create table stu4_sink_hudi(id bigint not null,name string,school string,nickname string,age int not null,score decimal(4,2) not null,class_num int not null,phone bigint not null,email string,ip string,primary key (id) not enforced )partitioned by (school)with (connector hudi,path hdfs://hadoop1:8020/tmp/hudi_flink/stu4_sink_hudi,table.type MERGE_ON_READ,write.option bulk_insert,write.precombine.field school);Flink SQL client执行mysql数据插入到hudi中 insert into stu4_sink_hudi select * from stu4;全量接增量 如果已经有全量的离线 Hoodie 表需要接上实时写入并且保证数据不重复可以开启 index bootstrap 功能。 如果觉得流程冗长可以在写入全量数据的时候资源调大直接走流模式写全量走完接新数据再将资源调小或者开启限流功能。 WITH 参数 名称Required默认值说明index.bootstrap.enabledtruefalse开启索引加载会将已存表的最新数据一次性加载到 state 中index.partition.regexfalse*设置正则表达式进行分区筛选默认为加载全部分区 使用流程 1 CREATE TABLE 创建和 Hoodie 表对应的语句注意 table type 要正确 2设置 index.bootstrap.enabled true开启索引加载功能 3flink conf 中设置 checkpoint 失败容忍 execution.checkpointing.tolerable-failed-checkpoints n(取决于checkpoint 调度次数) 4等待第一次 checkpoint 成功表示索引加载完成 5索引加载完成后可以退出并保存 savepoint (也可以直接用 externalized checkpoint) 6重启任务将 index.bootstrap.enabled 关闭参数配置到合适的大小如果RowDataToHoodieFunction 和 BootstrapFunction 并发不同可以重启避免 shuffle 说明 1索引加载是阻塞式所以在索引加载过程中 checkpoint 无法完成 2索引加载由数据流触发需要确保每个 partition 都至少有1条数据即上游 source 有数据进来 3索引加载为并发加载根据数据量大小加载时间不同可以在log中搜索 finish loading the index under partition 和 Load records from file 日志来观察索引加载的进度 4第一次checkpoint成功就表示索引已经加载完成后续从 checkpoint 恢复时无需再次加载索引 注意在当前的0.12版本以上划横线的部分已经不再需要了。0.9 cherry pick 分支之后 写入模式 Changelog 模式 如果希望 Hoodie 保留消息的所有变更I/-U/U/D之后接上 Flink 引擎的有状态计算实现全链路近实时数仓生产增量计算Hoodie 的 MOR 表通过行存原生支持保留消息的所有变更format 层面的集成通过流读 MOR 表可以消费到所有的变更记录。 1WITH 参数 名称Required默认值说明changelog.enabledfalsefalse默认是关闭状态即 UPSERT 语义所有的消息仅保证最后一条合并消息中间的变更可能会被 merge 掉改成 true 支持消费所有变更。 批快照读仍然会合并所有的中间结果不管 format 是否已存储中间状态。 开启 changelog.enabled 参数后中间的变更也只是 Best Effort: 异步的压缩任务会将中间变更合并成 1 条所以如果流读消费不够及时被压缩后只能读到最后一条记录。当然通过调整压缩的 buffer 时间可以预留一定的时间 buffer 给 reader比如调整压缩的两个参数 compaction.delta_commits:5compaction.delta_seconds: 3600。 说明 Changelog 模式开启流读的话要在 sql-client 里面设置参数 set sql-client.execution.result-modetableau; 或者 set sql-client.execution.result-modechangelog;否则中间结果在读的时候会被直接合并。参考https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/#running-sql-queries 2流读 changelog 仅在 0.10.0 支持本 feature 为实验性。 开启 changelog 模式后hudi 会保留一段时间的 changelog 供下游 consumer 消费我们可以通过流读 ODS 层 changelog 接上 ETL 逻辑写入到 DWD 层如下图的 pipeline 流读的时候我们要注意 changelog 有可能会被 compaction 合并掉中间记录会消除可能会影响计算结果需要关注sql-client的属性result-mode同上。 3演示案例 使用changelog set sql-client.execution.result-modetableau; CREATE TABLE t6(id int,ts int,primary key (id) not enforced ) WITH (connector hudi,path hdfs://hadoop1:8020/tmp/hudi_flink/t6,table.type MERGE_ON_READ,read.streaming.enabled true,read.streaming.check-interval 4,changelog.enabled true );insert into t6 values (1,1); insert into t6 values (1,2);set table.dynamic-table-options.enabledtrue; select * from t6/* OPTIONS(read.start-commitearliest)*/; select count(*) from t6/* OPTIONS(read.start-commitearliest)*/;不使用changelog CREATE TABLE t6_v(id int,ts int,primary key (id) not enforced ) WITH (connector hudi,path hdfs://hadoop1:8020/tmp/hudi_flink/t6,table.type MERGE_ON_READ,read.streaming.enabled true,read.streaming.check-interval 4 );select * from t6_v/* OPTIONS(read.start-commitearliest)*/; select count(*) from t6_v/* OPTIONS(read.start-commitearliest)*/;Append 模式 从 0.10 开始支持 对于 INSERT 模式 MOR 默认会 apply 小文件策略 会追加写 avro log 文件COW 每次直接写新的 parquet 文件没有小文件策略 Hudi 支持丰富的 Clustering 策略优化 INSERT 模式下的小文件问题 1Inline Clustering 只有 Copy On Write 表支持该模式 名称Required默认值说明write.insert.clusterfalsefalse是否在写入时合并小文件COW 表默认 insert 写不合并小文件开启该参数后每次写入会优先合并之前的小文件不会去重吞吐会受影响 2Async Clustering 从 0.12 开始支持 WITH参数 名称Required默认值说明clustering.schedule.enabledfalsefalse是否在写入时定时异步调度 clustering plan默认关闭clustering.delta_commitsfalse4调度 clsutering plan 的间隔 commitsclustering.schedule.enabled 为 true 时生效clustering.async.enabledfalsefalse是否异步执行 clustering plan默认关闭clustering.tasksfalse4Clustering task 执行并发clustering.plan.strategy.target.file.max.bytesfalse1024 * 1024 * 1024Clustering 单文件目标大小默认 1GBclustering.plan.strategy.small.file.limitfalse600小于该大小的文件才会参与 clustering默认600MBclustering.plan.strategy.sort.columnsfalseN/A支持指定特殊的排序字段clustering.plan.partition.filter.modefalseNONE支持NONE不做限制RECENT_DAYS按时间天回溯SELECTED_PARTITIONS指定固定的 partitionclustering.plan.strategy.daybased.lookback.partitionsfalse2RECENT_DAYS 生效默认 2 天 Clustering Plan Strategy 支持定制化的 clustering 策略。 名称Required默认值说明clustering.plan.partition.filter.modefalseNONE支持· NONE不做限制· RECENT_DAYS按时间天回溯· SELECTED_PARTITIONS指定固定的 partitionclustering.plan.strategy.daybased.lookback.partitionsfalse2RECENT_DAYS 生效默认 2 天clustering.plan.strategy.cluster.begin.partitionfalseN/ASELECTED_PARTITIONS 生效指定开始 partition(inclusive)clustering.plan.strategy.cluster.end.partitionfalseN/ASELECTED_PARTITIONS 生效指定结束 partition(incluseve)clustering.plan.strategy.partition.regex.patternfalseN/A正则表达式过滤 partitionsclustering.plan.strategy.partition.selectedfalseN/A显示指定目标 partitions支持逗号 , 分割多个 partition Bucket 索引 从 0.11 开始支持 默认的 flink 流式写入使用 state 存储索引信息primary key 到 fileId 的映射关系。当数据量比较大的时候state的存储开销可能成为瓶颈bucket 索引通过固定的 hash 策略将相同 key 的数据分配到同一个 fileGroup 中避免了索引的存储和查询开销。 1WITH参数 名称Required默认值说明index.typefalseFLINK_STATE设置 BUCKET 开启 Bucket 索引功能hoodie.bucket.index.hash.fieldfalse主键可以设置成主键的子集hoodie.bucket.index.num.bucketsfalse4默认每个 partition 的 bucket 数当前设置后则不可再变更。 2和 state 索引的对比 bucket index 没有 state 的存储计算开销性能较好bucket index 无法扩 bucketsstate index 则可以依据文件的大小动态扩容bucket index 不支持跨 partition 的变更(如果输入是 cdc 流则没有这个限制)state index 没有限制 Hudi Catalog 从 0.12.0 开始支持通过 catalog 可以管理 flink 创建的表避免重复建表操作另外 hms 模式的 catalog 支持自动补全 hive 同步参数。 DFS 模式 Catalog SQL样例 CREATE CATALOG hoodie_catalogWITH (typehudi,catalog.path ${catalog 的默认路径},modedfs );Hms 模式 Catalog SQL 样例 CREATE CATALOG hoodie_catalogWITH (typehudi,catalog.path ${catalog 的默认路径},hive.conf.dir ${hive-site.xml 所在的目录},modehms -- 支持 dfs 模式通过文件系统管理表属性);1WITH 参数 名称Required默认值说明catalog.pathtrue–默认的 catalog 根路径用作表路径的自动推导默认的表路径${catalog.path}/${db_name}/${table_name}default-databasefalsedefault默认的 database 名hive.conf.dirfalse–hive-site.xml 所在的目录只在 hms 模式下生效modefalsedfs支持 hms模式通过 hive 管理元数据table.externalfalsefalse是否创建外部表只在 hms 模式下生效 2使用dfs方式 创建sql-client初始化sql文件 vim /opt/module/flink-1.13.6/conf/sql-client-init.sqlCREATE CATALOG hoodie_catalogWITH (typehudi,catalog.path /tmp/hudi_catalog,modedfs );USE CATALOG hoodie_catalog;指定sql-client启动时加载sql文件 hadoop fs -mkdir /tmp/hudi_catalogbin/sql-client.sh embedded -i conf/sql-client-init.sql -s yarn-session建库建表插入 create database test; use test;create table t2(uuid varchar(20),name varchar(10),age int,ts timestamp(3),partition varchar(20),primary key (uuid) not enforced ) with (connector hudi,path /tmp/hudi_catalog/default/t2,table.type MERGE_ON_READ );insert into t2 values(1,zs,18,TIMESTAMP 1970-01-01 00:00:01,a);退出sql-client重新进入表信息还在 use test; show tables; select * from t2;离线 Compaction MOR 表的 compaction 默认是自动打开的策略是 5 个 commits 执行一次压缩。 因为压缩操作比较耗费内存和写流程放在同一个 pipeline在数据量比较大的时候10w/s qps容易干扰写流程此时采用离线定时任务的方式执行 compaction 任务更稳定。 1设置参数 compaction.async.enabled 为 false关闭在线 compaction。compaction.schedule.enabled 仍然保持开启由写任务阶段性触发压缩 plan。 2原理 一个 compaction 的任务的执行包括两部分 schedule 压缩 plan 该过程推荐由写任务定时触发写参数 compaction.schedule.enabled 默认开启 执行对应的压缩 plan 3使用方式 执行命令 离线 compaction 需要手动执行 Java 程序程序入口 hudi-flink1.13-bundle-0.12.0.jar org.apache.hudi.sink.compact.HoodieFlinkCompactor // 命令行的方式 ./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://xxx:8020/table参数配置 参数名required默认值备注–pathtrue–目标表的路径–compaction-tasksfalse-1压缩 task 的并发默认是待压缩 file group 的数量–compaction-max-memoryfalse100 单位 MB压缩时 log 数据的索引 map默认 100MB内存足够可以开大些–schedulefalsefalse是否要执行 schedule compaction 的操作当写流程还在持续写入表数据的时候开启这个参数有丢失查询数据的风险所以开启该参数一定要保证当前没有任务往表里写数据, 写任务的 compaction plan 默认是一直 schedule 的除非手动关闭默认 5 个 commits 一次压缩–seqfalseLIFO执行压缩任务的顺序默认是从最新的压缩 plan 开始执行可选值LIFO: 从最新的 plan 开始执行FIFO: 从最老的 plan 开始执行–servicefalsefalse是否开启 service 模式service 模式会打开常驻进程一直监听压缩任务并提交到集群执行从 0.11 开始执行–min-compaction-interval-secondsfalse600 单位 秒service 模式下的执行间隔默认 10 分钟 案例演示 创建表关闭在线压缩 create table t7(id int,ts int,primary key (id) not enforced ) with (connector hudi,path /tmp/hudi_catalog/default/t7,compaction.async.enabled false,compaction.schedule.enabled true,table.type MERGE_ON_READ );insert into t7 values(1,1); insert into t7 values(2,2); insert into t7 values(3,3); insert into t7 values(4,4); insert into t7 values(5,5);// 命令行的方式 ./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://hadoop1:8020/tmp/hudi_catalog/default/t7离线 Clustering 异步的 clustering 相对于 online 的 async clustering 资源隔离从而更加稳定。 1设置参数 clustering.async.enabled 为 false关闭在线 clustering。clustering.schedule.enabled 仍然保持开启由写任务阶段性触发 clustering plan。 2原理 一个 clustering 的任务的执行包括两部分 schedule plan 推荐由写任务定时触发写参数 clustering.schedule.enabled 默认开启。执行对应的 plan 3使用方式 执行命令 离线 clustering 需要手动执行 Java 程序程序入口 hudi-flink1.13-bundle-0.12.0.jarorg.apache.hudi.sink.clustering.HoodieFlinkClusteringJob 注意必须是分区表否则报错空指针异常。 // 命令行的方式 ./bin/flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://xxx:8020/table参数配置 参数名required默认值备注–pathtrue–目标表的路径。–clustering-tasksfalse-1Clustering task 的并发默认是待压缩 file group 的数量。–schedulefalsefalse是否要执行 schedule clustering plan 的操作当写流程还在持续写入表数据的时候开启这个参数有丢失查询数据的风险所以开启该参数一定要保证当前没有任务往表里写数据, 写任务的 clustering plan 默认是一直 schedule 的除非手动关闭默认 4 个 commits 一次 clustering。–seqfalseFIFO执行压缩任务的顺序默认是从最老的 clustering plan 开始执行可选值LIFO: 从最新的 plan 开始执行FIFO: 从最老的 plan 开始执行–target-file-max-bytesfalse1024 1024 1024最大目标文件默认 1GB。–small-file-limitfalse600小于该大小的文件会参与 clustering默认 600MB。–sort-columnsfalseN/AClustering 可选排序列。–servicefalsefalse是否开启 service 模式service 模式会打开常驻进程一直监听压缩任务并提交到集群执行从 0.11 开始执行。–min-compaction-interval-secondsfalse600 单位 秒service 模式下的执行间隔默认 10 分钟。 案例演示 创建表关闭在线压缩 create table t8(id int,age int,ts int,primary key (id) not enforced ) partitioned by (age) with (connector hudi,path /tmp/hudi_catalog/default/t8,clustering.async.enabled false,clustering.schedule.enabled true,table.type COPY_ON_WRITE );insert into t8 values(1,18,1); insert into t8 values(2,18,2); insert into t8 values(3,18,3); insert into t8 values(4,18,4); insert into t8 values(5,18,5);// 命令行的方式 ./bin/flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://hadoop1:8020/tmp/hudi_catalog/default/t8常见基础问题 1存储一直看不到数据 如果是 streaming 写请确保开启 checkpointFlink 的 writer 有 3 种刷数据到磁盘的策略 当某个 bucket 在内存积攒到一定大小 (可配默认 64MB)当总的 buffer 大小积攒到一定大小可配默认 1GB当 checkpoint 触发将内存里的数据全部 flush 出去 2数据有重复 如果是 COW 写需要开启参数 write.insert.drop.duplicatesCOW 写每个 bucket 的第一个文件默认是不去重的只有增量的数据会去重全局去重需要开启该参数MOR 写不需要开启任何参数定义好 primary key 后默认全局去重。注意从 0.10 版本开始该属性改名 write.precombine 并且默认为 true。 如果需要多 partition 去重需要开启参数: index.global.enabled 为 true。注意从 0.10 版本开始该属性默认为 true 索引 index 是判断数据重复的核心数据结构index.state.ttl 设置了索引保存的时间默认为 1.5 天对于长时间周期的更新比如更新一个月前的数据需要将 index.state.ttl 调大单位天设置小于 0 代表永久保存。注意从 0.10 版本开始该属性默认为 0。 3Merge On Read 写只有 log 文件 Merge On Read 默认开启了异步的 compaction策略是 5 个 commits 压缩一次当条件满足参会触发压缩任务另外压缩本身因为耗费资源所以不一定能跟上写入效率可能会有滞后。 可以先观察 log搜索 compaction 关键词看是否有 compact 任务调度 After filtering, Nothing to compact for 关键词说明本次 compaction strategy 是不做压缩。 核心原理分析 数据去重原理 Hoodie 的数据去重分两步 写入前攒 buffer 阶段去重核心接口HoodieRecordPayload#preCombine写入过程中去重核心接口HoodieRecordPayload#combineAndGetUpdateValue 1消息版本新旧 相同 record key 主键的数据通过write.precombine.field 指定的字段来判断哪个更新即 precombine 字段更大的 record 更新如果是相等的 precombine 字段则后来的数据更新。 从 0.10 版本开始write.precombine.field 字段为可选如果没有指定会看 schema 中是否有 ts 字段如果有ts 字段被选为 precombine 字段如果没有指定schema 中也没有 ts 字段则为处理顺序后来的消息默认较新。 2攒消息阶段的去重 Hoodie 将 buffer 消息发给 write handle 之前可以执行一次去重操作通过HoodieRecordPayload#preCombine 接口保留 precombine 字段较大的消息此操作为纯内存的计算在同一个 write task 中为单并发执行。 注意write.precombine 选项控制了攒消息的去重。 3写 parquet 增量消息的去重 在Hoodie 写入流程中Hoodie 每写一个 parquet 都会有 base 增量 merge 的过程增量的消息会先放到一个 spillable map 的数据结构构建内存 index这里的增量数据如果没有提前去重那么同 key 的后来消息会直接覆盖先来的消息。 Writer 接着扫 base 文件过程中会不断查看内存 index 是否有同 key 的新消息如果有会走 HoodieRecordPayload#combineAndGetUpdateValue 接口判断保留哪个消息。 注意: MOR 表的 compaction 阶段和 COW 表的写入流程都会有 parquet 增量消息去重的逻辑。 4跨 partition 的消息去重 默认情况下不同的 partition 的消息是不去重的即相同的 key 消息如果新消息换了 partition那么老的 partiiton 消息仍然保留。 开启 index.global.enabled 选项开启跨 partition 去重原理是先往老的 partiton 发一条删除消息再写新 partition。 表写入原理 分为三个模块数据写入、数据压缩与数据清理。 1数据写入分析 基础数据封装将数据流中flink的RowData封装成Hoodie实体BucketAssigner桶分配器主要是给数据分配写入的文件地址若为插入操作则取大小最小的FileGroup对应的FileId文件内进行插入在此文件的后续写入中文件 ID 保持不变并且提交时间会更新以显示最新版本。这也意味着记录的任何特定版本给定其分区路径都可以使用文件 ID 和 instantTime进行唯一定位若为更新操作则直接在当前location进行数据更新Hoodie Stream Writer数据写入将数据缓存起来在超过设置的最大flushSize或是做checkpoint时进行刷新到文件中Oprator Coordinator主要与Hoodie Stream Writer进行交互处理checkpoint等事件在做checkpoint时提交instant到timeLine上并生成下一个instant的时间算法为取当前最新的commi时间比对当前时间与commit时间若当前时间大于commit时间则返回否则一直循环等待生成。 2数据压缩 压缩compaction用于在 MergeOnRead存储类型时将基于行的log日志文件转化为parquet列式数据文件用于加快记录的查找。compaction首先会遍历各分区下最新的parquet数据文件和其对应的log日志文件进行合并并生成新的FileSlice在TimeLine 上提交新的Instance 具体策略分为4种具体见官网说明 compaction.trigger.strategy: Strategy to trigger compaction, options are 1.num_commits: trigger compaction when reach N delta commits; 2.time_elapsed: trigger compaction when time elapsed N seconds since last compaction; 3.num_and_time: trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied; 4.num_or_time: trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied. Default is num_commits Default Value: num_commits (Optional)在项目实践中需要注意参数’read.streaming.skip_compaction’ 参数的配置其表示在流式读取该表是否跳过压缩后的数据若该表用于后续聚合操作表的输入表则需要配置值为true,表示聚合操作表不再消费读取压缩数据。若不配置或配置为false,则该表中的数据在未被压缩之前被聚合操作表读取了一次在压缩后数据又被读取一次会导致聚合表的sum、count等算子结果出现双倍情况。 3数据清理 随着用户向表中写入更多数据对于每次更新Hudi会生成一个新版本的数据文件用于保存更新后的记录COPY_ON_WRITE或将这些增量更新写入日志文件以避免重写更新版本的数据文件MERGE_ON_READ。在这种情况下根据更新频率文件版本数可能会无限增长但如果不需要保留无限的历史记录则必须有一个流程服务来回收旧版本的数据这就是 Hudi 的清理服务。 具体清理策略可参考官网一般使用的清理策略为KEEP_LATEST_FILE_VERSIONS此策略具有保持 N 个文件版本而不受时间限制的效果。会删除N之外的FileSlice。 4Job图 如下为生产环境中flink Job图可以看到各task和上述分析过程对应需要注意的是可以调整并行度来提升写入速度。 表读取原理 如下为Hudi数据流式读取Job图。 其过程为 开启split_monitor算子每隔N秒(可配置)监听TimeLine上变化并将变更的Instance封装为FileSlice。分发log文件时候按照fileId值进行keyBy保证同一file group下数据文件都给一个Task进行处理从而保证数据处理的有序性。split_reader根据FileSlice信息进行数据读取。
http://www.hkea.cn/news/14482151/

相关文章:

  • 网站开发如何进行管理开发一个平台
  • 一个域名多个网站外贸 wordpress模板
  • 上海做展会的网站都有哪些浙江省城乡住房建设网站
  • 我想建立一个网站不知道怎么做啊东莞网络优化推广
  • 在地税网站怎么做税种认定哪个网站教做ppt模板
  • 黄山网站开发wordpress 标签前缀
  • dedecms精仿学校网站模板固戍网站建设
  • 网站备案一般需要多久做视频网站 视频放在哪
  • 宿迁网站建设案例团购网站建设方案
  • 网站标题作弊网站开发平台及常用开发工具
  • wordpress主页设置分类网站优化公司推荐
  • 河北企业建站系统信息移动端网站开发标题设置
  • 化工集团网站建设 中企动力重庆富通科技有限公司网站
  • 江干建设局网站南宁网站制作最新招聘信息
  • 网上做家教兼职哪个网站做照片用的视频模板下载网站好
  • 优斗士做网站怎么样高端网站搭建公司
  • 下载百度官方网站常用网站开发软件
  • 江苏省建设通官方网站ai做的网站怎么切图
  • fview网站开发建筑工程施工合同电子版
  • 网站建设销售员话术企业展厅设计施工
  • 专业做包装的电商网站廊坊公司快速建站
  • 门户网站建设公司咨询济南房产信息网
  • 南宁市企业网站建设黄页引流推广
  • 成都网站制作工具手机网上银行
  • 哪些购物网站有做拼团活动logo制作免费版
  • 网络平台指网站 建设项目所在地网站提示域名重定向怎么做
  • 系统优化软件哪个最好的网站seo在线优化
  • 如何快速搭建自己的网站如何免费建站
  • 网站建设-英九网络用cms做单页网站怎么做
  • 网站主机空间价格seo做得比较好的公司