当前位置: 首页 > news >正文

北京网站优化诊断淘宝网pc首页

北京网站优化诊断,淘宝网pc首页,兴安盟建设局网站,手机网站模板下载免费Flink ⽀持⾮常多的数据 Join ⽅式#xff0c;主要包括以下三种#xff1a; 动态表#xff08;流#xff09;与动态表#xff08;流#xff09;的 Join动态表#xff08;流#xff09;与外部维表#xff08;⽐如 Redis#xff09;的 Join动态表字段的列转⾏#xf…Flink ⽀持⾮常多的数据 Join ⽅式主要包括以下三种 动态表流与动态表流的 Join动态表流与外部维表⽐如 Redis的 Join动态表字段的列转⾏⼀种特殊的 Join 细分 Flink SQL ⽀持的 Join Regular Join流与流的 Join包括 Inner Equal Join、Outer Equal Join Interval Join流与流的 Join两条流⼀段时间区间内的 Join Temporal Join流与流的 Join包括事件时间处理时间的 Temporal Join类似于离线中的快照 Join Lookup Join流与外部维表的 Join Array Expansion表字段的列转⾏类似于 Hive 的 explode 数据炸开的列转⾏ Table Function⾃定义函数的表字段的列转⾏⽀持 Inner Join 和 Left Outer Join 1.Regular Join **Regular Join 定义⽀持 Batch\Streaming**Regular Join 和离线 Hive SQL ⼀样的 Regular Join通过条件关联两条流数据输出。 **应⽤场景**⽐如⽇志关联扩充维度数据构建宽表⽇志通过 ID 关联计算 CTR。 Regular Join 包含以下⼏种以 L 作为左流中的数据标识 R 作为右流中的数据标识 Inner JoinInner Equal Join流任务中只有两条流 Join 到才输出输出 [L, R]Left JoinOuter Equal Join流任务中左流数据到达之后⽆论有没有 Join 到右流的数据都会输出Join 到输出 [L, R] 没 Join 到输出 [L, null] 如果右流数据到达之后发现左流之前输出过没有 Join 到的数据则会发起回撤流先输出 -[L, null] 然后输出 [L, R]Right JoinOuter Equal Join有 Left Join ⼀样左表和右表的执⾏逻辑完全相反Full JoinOuter Equal Join流任务中左流或者右流的数据到达之后⽆论有没有 Join 到另外⼀条流的数据都会输出对右流来说Join 到输出 [L, R] 没 Join 到输出 [null, R] 对左流来说Join 到输出 [L, R] 没 Join 到输出 [L, null] 。如果⼀条流的数据到达之后发现另⼀条流之前输出过没有 Join 到的数据则会发起回撤流左流数据到达为例回撤 -[null, R] 输出[L, R] 右流数据到达为例回撤 -[L, null] 输出 [L, R] **实际案例**案例为曝光⽇志关联点击⽇志筛选既有曝光⼜有点击的数据并且补充点击的扩展参数 aInner Join 案例 -- 曝光⽇志数据 CREATE TABLE show_log_table (log_id BIGINT,show_params STRING ) WITH (connector datagen,rows-per-second 2,fields.show_params.length 1,fields.log_id.min 1,fields.log_id.max 100 );-- 点击⽇志数据 CREATE TABLE click_log_table (log_id BIGINT,click_params STRING ) WITH (connector datagen,rows-per-second 2,fields.click_params.length 1,fields.log_id.min 1,fields.log_id.max 10 );CREATE TABLE sink_table (s_id BIGINT,s_params STRING,c_id BIGINT,c_params STRING ) WITH (connector print );-- 流的 INNER JOIN条件为 log_id INSERT INTO sink_table SELECTshow_log_table.log_id as s_id,show_log_table.show_params as s_params,click_log_table.log_id as c_id,click_log_table.click_params as c_params FROM show_log_table INNER JOIN click_log_table ON show_log_table.log_id click_log_table.log_id;输出结果如下 I[5, d, 5, f] I[5, d, 5, 8] I[5, d, 5, 2] I[3, 4, 3, 0] I[3, 4, 3, 3]bLeft Join 案例 CREATE TABLE show_log_table (log_id BIGINT,show_params STRING ) WITH (connector datagen,rows-per-second 1,fields.show_params.length 3,fields.log_id.min 1,fields.log_id.max 10 );CREATE TABLE click_log_table (log_id BIGINT,click_params STRING ) WITH (connector datagen,rows-per-second 1,fields.click_params.length 3,fields.log_id.min 1,fields.log_id.max 10 );CREATE TABLE sink_table (s_id BIGINT,s_params STRING,c_id BIGINT,c_params STRING ) WITH (connector print );set sql-client.execution.result-modechangelog;INSERT INTO sink_table SELECTshow_log_table.log_id as s_id,show_log_table.show_params as s_params,click_log_table.log_id as c_id,click_log_table.click_params as c_params FROM show_log_table LEFT JOIN click_log_table ON show_log_table.log_id click_log_table.log_id;输出结果如下 I[5, f3c, 5, c05] I[5, 6e2, 5, 1f6] I[5, 86b, 5, 1f6] I[5, f3c, 5, 1f6] -D[3, 4ab, null, null] -D[3, 6f2, null, null] I[3, 4ab, 3, 765] I[3, 6f2, 3, 765] I[2, 3c4, null, null] I[3, 4ab, 3, a8b] I[3, 6f2, 3, a8b] I[2, c03, null, null] ...cFull Join 案例 CREATE TABLE show_log_table (log_id BIGINT,show_params STRING ) WITH (connector datagen,rows-per-second 2,fields.show_params.length 1,fields.log_id.min 1,fields.log_id.max 10 );CREATE TABLE click_log_table (log_id BIGINT,click_params STRING )WITH (connector datagen,rows-per-second 2,fields.click_params.length 1,fields.log_id.min 1,fields.log_id.max 10 );CREATE TABLE sink_table (s_id BIGINT,s_params STRING,c_id BIGINT,c_params STRING ) WITH (connector print );INSERT INTO sink_table SELECTshow_log_table.log_id as s_id,show_log_table.show_params as s_params,click_log_table.log_id as c_id,click_log_table.click_params as c_params FROM show_log_table FULL JOIN click_log_table ON show_log_table.log_id click_log_table.log_id;输出结果如下 I[null, null, 7, 6] I[6, 5, null, null] -D[1, c, null, null] I[1, c, 1, 2] I[3, 1, null, null] I[null, null, 7, d] I[10, 0, null, null] I[null, null, 2, 6] -D[null, null, 7, 6] -D[null, null, 7, d] ...关于 Regular Join 的注意事项 实时 Regular Join 可以不是 等值 join等值 join 和 ⾮等值 join 区别在于等值 join 数据 shuffle 策略是 Hash会按照 Join on 中的等值条件作为 id 发往对应的下游 ⾮等值 join 数据 shuffle 策略是 Global所有数据发往⼀个并发按照⾮等值条件进⾏关联。 等值 Join 非等值 Join Join 的流程是左流新来⼀条数据之后会和右流中符合条件的所有数据做 Join然后输出。 流的上游是⽆限的数据要做到关联的话Flink 会将两条流的所有数据都存储在 State 中所以 Flink 任务的 State 会⽆限增⼤需要为 State 配置合适的 TTL以防⽌ State 过⼤。 2.Interval Join时间区间 Join **Interval Join 定义⽀持 Batch\Streaming**Interval Join 可以让⼀条流去 Join 另⼀条流中前后⼀段时间内的数据。 **应⽤场景**Regular Join 会产⽣回撤流在实时数仓中⼀般写⼊的 sink 是类似于 Kafka 的消息队列然后接 clickhouse 等引擎这些引擎不具备处理回撤流的能⼒Interval Join ⽤于消灭回撤流的。 Interval Join 包含以下⼏种以 L 作为左流中的数据标识 R 作为右流中的数据标识 Inner Interval Join流任务中只有两条流 Join 到满⾜ Join on 中的条件两条流的数据在时间区间 满⾜其他等值条件才输出输出 [L, R]Left Interval Join流任务中左流数据到达之后如果没有 Join 到右流的数据就会等待放在 State 中等如果右流之后数据到达发现能和刚刚那条左流数据 Join 到则会输出 [L,R] 。事件时间中随着 Watermark 的推进也⽀持处理时间。如果发现发现左流 State 中的数据过期了就把左流中过期的数据从 State 中删除然后输出 [L, null] 如果右流 State 中的数据过期了就直接从 State 中删除。Right Interval Join和 Left Interval Join 执⾏逻辑⼀样只不过左表和右表的执⾏逻辑完全相反。Full Interval Join流任务中左流或者右流的数据到达之后如果没有 Join 到另外⼀条流的数据就会等待左流放在左流对应的 State 中等右流放在右流对应的 State 中等如果之后另⼀条流数据到达之后发现能和刚刚那条数据 Join 到则会输出 [L, R] 。事件时间中随着 Watermark 的推进也⽀持处理时间发现 State 中的数据过期了就将这些数据从 State 中删除并且输出左流过期输出[L, null] 右流过期输出 -[null, R] **Inner Interval Join 和 Outer Interval Join 的区别在于**Outer 在随着时间推移的过程中如果有数据过期了之后会根据是否是 Outer 将没有 Join 到的数据也给输出。 **实际案例**曝光⽇志关联点击⽇志筛选既有曝光⼜有点击的数据条件是曝光发⽣之后4 ⼩时之内的点击并且补充点击的扩展参数 aInner Interval Join CREATE TABLE show_log_table (log_id BIGINT,show_params STRING,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time ) WITH (connector datagen,rows-per-second 1,fields.show_params.length 1,fields.log_id.min 1,fields.log_id.max 10 );CREATE TABLE click_log_table (log_id BIGINT,click_params STRING,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time ) WITH (connector datagen,rows-per-second 1,fields.click_params.length 1,fields.log_id.min 1,fields.log_id.max 10 );CREATE TABLE sink_table (s_id BIGINT,s_params STRING,c_id BIGINT,c_params STRING ) WITH (connector print );INSERT INTO sink_table SELECTshow_log_table.log_id as s_id,show_log_table.show_params as s_params,click_log_table.log_id as c_id,click_log_table.click_params as c_params FROM show_log_table INNER JOIN click_log_table ON show_log_table.log_id click_log_table.log_id AND show_log_table.row_time BETWEEN click_log_table.row_time - INTERVAL 4 SECOND AND click_log_table.row_time输出结果如下 6 I[2, a, 2, 6] 6 I[2, 6, 2, 6] 2 I[4, 1, 4, 5] 2 I[10, 8, 10, d] 2 I[10, 7, 10, d] 2 I[10, d, 10, d] 2 I[5, b, 5, d] 6 I[1, a, 1, 7]bLeft Interval Join CREATE TABLE show_log (log_id BIGINT,show_params STRING,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time ) WITH (connector datagen,rows-per-second 1,fields.show_params.length 1,fields.log_id.min 1,fields.log_id.max 10 );CREATE TABLE click_log (log_id BIGINT,click_params STRING,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time ) WITH (connector datagen,rows-per-second 1,fields.click_params.length 1,fields.log_id.min 1,fields.log_id.max 10 );CREATE TABLE sink_table (s_id BIGINT,s_params STRING,c_id BIGINT,c_params STRING ) WITH (connector print );INSERT INTO sink_table SELECTshow_log.log_id as s_id,show_log.show_params as s_params,click_log.log_id as c_id,click_log.click_params as c_params FROM show_log LEFT JOIN click_log ON show_log.log_id click_log.log_id AND show_log.row_time BETWEEN click_log.row_time - INTERVAL 5 SECOND AND click_log.row_time输出结果如下 I[6, e, 6, 7] I[11, d, null, null] I[7, b, null, null] I[8, 0, 8, 3] I[13, 6, null, null]cFull Interval Join CREATE TABLE show_log (log_id BIGINT,show_params STRING,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time ) WITH (connector datagen,rows-per-second 1,fields.show_params.length 1,fields.log_id.min 5,fields.log_id.max 15 );CREATE TABLE click_log (log_id BIGINT,click_params STRING,row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),WATERMARK FOR row_time AS row_time ) WITH (connector datagen,rows-per-second 1,fields.click_params.length 1,fields.log_id.min 1,fields.log_id.max 10 );CREATE TABLE sink_table (s_id BIGINT,s_params STRING,c_id BIGINT,c_params STRING ) WITH (connector print );INSERT INTO sink_table SELECTshow_log.log_id as s_id,show_log.show_params as s_params,click_log.log_id as c_id,click_log.click_params as c_params FROM show_log FULL JOIN click_log ON show_log.log_id click_log.log_id AND show_log.row_time BETWEEN click_log.row_time - INTERVAL 5 SECOND AND click_log.row_time输出结果如下 I[6, 1, null, null] I[7, 3, 7, 8] I[null, null, 6, 6] I[null, null, 4, d] I[8, d, null, null] I[null, null, 3, b]关于 Interval Join 的注意事项 实时 Interval Join 可以不是 等值 join 等值 join 和 ⾮等值 join 区别在于 等值 join 数据 shuffle 策略是 Hash会按照 Join on 中的等值条件作为 id 发往对应的下游 ⾮等值 join 数据 shuffle 策略是 Global所有数据发往⼀个并发将满⾜条件的数据进⾏关联输出。 3.Temporal Join快照 Join **Temporal Join 定义⽀持 Batch\Streaming**同离线中的 拉链快照表 Flink SQL 中对应的表叫做 Versioned Table 使⽤⼀个明细表去 join 这个 Versioned Table 的 join 操作就叫做 Temporal Join。 Temporal Join 中Versioned Table 是对同⼀条 key在 DDL 中以 primary key 标记同⼀个 key的历史版本根据时间划分版本做维护当有明细表 Join 这个表时可以根据明细表中的时间版本选择 Versioned Table 对应时间区间内的快照数据进⾏ join。 **应⽤场景**⽐如汇率数据实时的根据汇率计算总⾦额在 12:00 之前事件时间⼈⺠币和美元汇率是 7:1在 12:00 之后变为 6:1那么在 12:00 之前数据就要按照 7:1 进⾏计算12:00 之后就要按照 6:1 计算。 **Verisoned Table**Verisoned Table 中存储的数据通常来源于 CDC 或者会发⽣更新的数据。Flink SQL 会为 Versioned Table 维护 Primary Key 下的所有历史时间版本的数据。 **示例**汇率计算中定义 Versioned Table 的两种⽅式。 -- 定义⼀个汇率 versioned 表 CREATE TABLE currency_rates (currency STRING,conversion_rate DECIMAL(32, 2),update_time TIMESTAMP(3) METADATA FROM values.source.timestamp VIRTUAL,WATERMARK FOR update_time AS update_time,-- PRIMARY KEY 定义⽅式PRIMARY KEY(currency) NOT ENFORCED ) WITH (connector kafka,value.format debezium-json,/* ... */ );-- 将数据源表按照 Deduplicate ⽅式定义为 Versioned Table CREATE VIEW versioned_rates AS SELECT currency, conversion_rate, update_time -- 1. 定义 update_time 为时间字段FROM (SELECT *,ROW_NUMBER() OVER (PARTITION BY currency -- 2. 定义 currency 为主键ORDER BY update_time DESC -- 3. ORDER BY 中必须是时间戳列) AS rownum FROM currency_rates) WHERE rownum 1;**Temporal Join ⽀持的时间语义**事件时间、处理时间 **实际案例**汇率计算以 事件时间 任务举例 -- 1. 定义⼀个输⼊订单表 CREATE TABLE orders (order_id BIGINT,price BIGINT,currency STRING,order_time TIMESTAMP(3),WATERMARK FOR order_time AS order_time ) WITH (connector filesystem, path file:///Users/hhx/Desktop/orders.csv,format csv );1,100,a,2023-11-01 10:10:10.100 2,200,a,2023-11-02 10:10:10.100 3,300,a,2023-11-03 10:10:10.100 4,300,a,2023-11-04 10:10:10.100 5,300,a,2023-11-05 10:10:10.100 6,300,a,2023-11-06 10:10:10.100-- 2. 定义⼀个汇率 versioned 表其中 versioned 表的概念下⽂会介绍到 CREATE TABLE currency_rates (currency STRING,conversion_rate BIGINT,update_time TIMESTAMP(3),WATERMARK FOR update_time AS update_time,PRIMARY KEY(currency) NOT ENFORCED ) WITH (connector filesystem, path file:///Users/hhx/Desktop/currency_rates.csv,format csv );a,10,2023-11-01 09:10:10.100 a,11,2023-11-01 10:00:10.100 a,12,2023-11-01 10:10:10.100 a,13,2023-11-01 10:20:10.100 a,14,2023-11-02 10:20:10.100 a,15,2023-11-03 10:20:10.100 a,16,2023-11-04 10:20:10.100 a,17,2023-11-05 10:20:10.100 a,18,2023-11-06 10:00:10.100 a,19,2023-11-06 10:11:10.100SELECTorder_id,price,orders.currency,conversion_rate,order_time,update_time FROM orders -- 3. Temporal Join 逻辑 -- SQL 语法为FOR SYSTEM_TIME AS OF LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time ON orders.currency currency_rates.currency;可以看到相同的货币汇率会根据具体数据的事件时间不同 Join 到对应时间的汇率【Join 到最近可用的汇率】 注意 事件时间的 Temporal Join ⼀定要给左右两张表都设置 Watermark。 事件时间的 Temporal Join ⼀定要把 Versioned Table 的主键包含在 Join on 的条件中。 **实际案例**汇率计算以 处理时间 任务举例 10:15 SELECT * FROM LatestRates;currency rateUS Dollar 102 Euro 114 Yen 110:30 SELECT * FROM LatestRates;currency rateUS Dollar 102 Euro 114 Yen 1-- 10:42 时Euro 的汇率从 114 变为 116 10:52 SELECT * FROM LatestRates;currency rateUS Dollar 102 Euro 116 Yen 1-- 从 Orders 表查询数据 SELECT * FROM Orders;amount currency2 Euro 在处理时间 10:15 到达的⼀条数据1 US Dollar 在处理时间 10:30 到达的⼀条数据2 Euro 在处理时间 10:52 到达的⼀条数据-- 执⾏关联查询 SELECTo.amount,o.currency,r.rate, o.amount * r.rate FROMOrders AS oJOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS rON r.currency o.currency-- 结果如下 amount currency rate amount*rate2 Euro 114 228 在处理时间 10:15 到达的⼀条数据1 US Dollar 102 102 在处理时间 10:30 到达的⼀条数据2 Euro 116 232 在处理时间 10:52 到达的⼀条数据处理时间语义中是根据左流数据到达的时间决定拿到的汇率值Flink 就只为 LatestRates 维护了最新的状态数据不需要关⼼历史版本的数据。 注意 Processing-time temporal join is not supported yet.4.Lookup Join维表 Join **Lookup Join 定义⽀持 Batch\Streaming**Lookup Join 是维表 Join实时数仓场景中实时获取外部缓存。 **应⽤场景**Regular JoinInterval Join 等上⾯说的 Join 都是流与流之间的 Join⽽ Lookup Join 是流与 RedisMysqlHBase 这种存储介质的 JoinLookup 的意思是实时查找。 **实际案例**使⽤曝光⽤户⽇志流show_log关联⽤户画像维表user_profile关联到⽤户的维度之后提供给下游计算分性别年龄段的曝光⽤户数使⽤。 输⼊数据 曝光⽤户⽇志流show_log数据数据存储在 kafka 中 log_id timestamp user_id 1 2021-11-01 00:01:03 a 2 2021-11-01 00:03:00 b 3 2021-11-01 00:05:00 c 4 2021-11-01 00:06:00 b 5 2021-11-01 00:07:00 c⽤户画像维表user_profile数据数据存储在 redis 中 user_id(主键) age sex a 12-18 男 b 18-24 ⼥ c 18-24 男**注意**redis 中的数据结构是按照 keyvalue 存储的其中 key 为 user_idvalue 为 agesex 的 json。 CREATE TABLE show_log (log_id BIGINT,timestamp TIMESTAMP(3),user_id STRING,proctime AS PROCTIME() ) WITH (connector filesystem, path file:///Users/hhx/Desktop/show_log.csv,format csv );1 2021-11-01 00:01:03 a 2 2021-11-01 00:03:00 b 3 2021-11-01 00:05:00 c 4 2021-11-01 00:06:00 b 5 2021-11-01 00:07:00 cCREATE TABLE user_profile (user_id STRING,age STRING,sex STRING,proctime AS PROCTIME(),PRIMARY KEY(user_id) NOT ENFORCED ) WITH (connector filesystem, path file:///Users/hhx/Desktop/currency_rates.csv,format csv );a 12-18 男 b 18-24 ⼥ c 18-24 男CREATE TABLE sink_table (log_id BIGINT,timestamp TIMESTAMP(3),user_id STRING,proctime TIMESTAMP(3),age STRING,sex STRING ) WITH (connector print );-- Processing-time temporal join is not supported yet. -- lookup join 的 query 逻辑 INSERT INTO sink_table SELECTs.log_id as log_id, s.timestamp as timestamp, s.user_id as user_id, s.proctime as proctime, u.sex as sex, u.age as age FROM show_log AS s LEFT JOIN user_profile FOR SYSTEM_TIME AS OF s.proctime AS u ON s.user_id u.user_id输出数据如下 log_id timestamp user_id age sex 1 2021-11-01 00:01:03 a 12-18 男 2 2021-11-01 00:03:00 b 18-24 ⼥ 3 2021-11-01 00:05:00 c 18-24 男 4 2021-11-01 00:06:00 b 18-24 ⼥ 5 2021-11-01 00:07:00 c 18-24 男实时的 lookup 维表关联能使⽤ 处理时间 去做关联。 注意 a同⼀条数据关联到的维度数据可能不同 实时数仓中常⽤的实时维表是不断变化的当前流表数据关联完维表数据后如果同⼀个 key 的维表的数据发⽣了变化已关联到的维表的结果数据不会再同步更新。 举个例⼦维表中 user_id 为 1 的数据在 0800 时 age 由 12-18 变为了 18-24那么当任务在 0801 failover 之后从 0759 开始回溯数据时原本应该关联到 12-18 的数据会关联到 18-24 的 age 数据有可能会影响数据质量。 b会发⽣实时的新建及更新的维表应该建⽴起数据延迟的监控防⽌流表数据先于维表数据到达关联不到维表数据 c维表常⻅的性能问题及优化思路 维表性能问题 ⾼ qps 下访问维表存储引擎产⽣的任务背压数据产出延迟问题。 举个例⼦ **在没有使⽤维表的情况下**⼀条数据从输⼊ Flink 任务到输出 Flink 任务的时延假如为 0.1 ms 那么并⾏度为 1 的任务的吞吐可以达到 1 query / 0.1 ms 1w qps 。 **在使⽤维表之后**每条数据访问维表的外部存储的时⻓为 2 ms 那么⼀条数据从输⼊ Flink 任务到输出 Flink 任务的时延就会变成 2.1 ms 那么同样并⾏度为 1 的任务的吞吐只能达到 1 query / 2.1 ms 476 qps 两者的吞吐量相差 21 倍导致维表 join 的算⼦会产⽣背压任务产出会延迟。 常⽤的优化⽅案-DataStream **按照 redis 维表的 key 分桶 local cache**通过按照 key 分桶的⽅式让⼤多数据的维表关联的数据访问⾛之前访问过得 local cache 即可把访问外部存储 2.1 ms 处理⼀个 query 变为访问内存的 0.1 ms 处理⼀个 query 的时⻓。**异步访问外存**DataStream api 有异步算⼦可以利⽤线程池去同时多次请求维表外部存储把 2.1 ms 处理 1 个 query 变为 2.1 ms 处理 10 个 query吞吐可变优化到 10 / 2.1 ms 4761 qps。**批量访问外存**除了异步访问之外还可以批量访问外部存储举例在访问 redis 维表的 1 query 占⽤ 2.1 ms 时⻓中其中可能有 2 ms 都是在⽹络请求上⾯的耗时 其中只有 0.1 ms 是 redis server 处理请求的时⻓可以使⽤ redis 提供的 pipeline 能⼒在客户端也就是 flink 任务 lookup join 算⼦中攒⼀批数据使⽤ pipeline 去同时访问 redis sever把 2.1 ms 处理 1 个 query 变为 7ms2ms 50 * 0.1ms 处理 50 个 query吞吐可变为 50 query / 7 ms 7143 qps。 **实测**上述优化效果中最好⽤的是 1 32 相⽐ 3 还是⼀条⼀条发请求性能会差⼀些。 常⽤的优化⽅案-Flink SQL **按照 redis 维表的 key 分桶 local cache**sql 中做分桶得先做 group by如果做了 group by 的聚合就只能在 udaf 中做访问 redis 处理并且 UDAF 产出的结果只能是⼀条实现复杂因此选择不做 keyby 分桶直接使⽤ local cache 做本地缓存虽然【直接缓存】的效果⽐【先按照 key 分桶再做缓存】的效果差但是也能减少访问 redis 压⼒。 **异步访问外存**官⽅实现的 hbase connector ⽀持异步访问搜索 lookup.async。 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/hbase/ **批量访问外存**基于 redis 的批量访问外存优化功能参考下⽂。 https://mp.weixin.qq.com/s/ku11tCZp7CAFzpkqd4J1cQ5.Regular Join 、Interval Join、Temporal Join、Lookup Join 总结 aFlinkSQL 的 Join 按照流的性质分为 流与流的 JoinRegular JoinInterval JoinTemporal Join流于外部存储的 JoinLookup Join bInner Join 与 Outer Join 区别 Inner Join只有两条流 Join 上才会发出不涉及回撤流 Outer JoinJoin 不上会发出 null如果是 Regular Outer Join 涉及回撤流Interval Outer Join 不涉及回撤流 cRegular Join 、Interval Join、Temporal Join 区别 Regular Join如果不设置状态的 TTL两条流的所有数据都会暂存进行 Join涉及回撤流 Interval Join可以选定 一条流 的 指定时间区间内 的 数据 进行 Join不涉及回撤流 Temporal Join根据 一条流的时间字段 选择 另一条流的历史时间区间 进行 Join不涉及回撤流
http://www.hkea.cn/news/14290212/

相关文章:

  • 2017响应式网站 全站天津seo培训
  • 做公司网站怎么删除图片义乌万物网络科技 网站建设
  • 高中学校网站模板网页微信登录
  • 网站开发 0755优秀营销策划方案
  • 北京建站公司兴田德润很好热搜关键词
  • 手机如何做api网站中国企业500强
  • 公司网站维护一般需要做什么商机互联网站建设
  • 上海发乐门网站建设公司遵义房产信息网
  • 网站格式有哪些爱站seo
  • 统计二级域名的网站流量有什么用广东手机网站建设价格
  • 做网站公司昆明自助建设视频网站
  • 北京朝阳网站戴尔的网站建设有哪些主要特色
  • 专业行业网站建站报价lol做视频那个网站好
  • 用ps怎么做网站导航条wordpress 文章表格
  • 茶叶网站源码 下载兰州专业做网站的公司
  • 建设银行观澜支行网站广州商城网站建设地址
  • .net网站吃内存少儿编程加盟哪个品牌好
  • 群晖nas做网站高校网站群管理系统
  • 北京网站推广技巧交换链接的例子
  • 游戏网站开发设计报告联派网站建设
  • 自己做的网站竞价优化自己建一个简单的网站
  • 唐山网址建站p2p借贷网站开发
  • 网站的制作电工培训内容
  • 中小型网站建设与管理设计总结网站建设分为
  • 十大知名平面设计公司廊坊优化网站排名
  • 怎么做网站建设作业河南建设集团
  • 网站快速收录工具企业信息化管理系统有哪些
  • 成都地区网站建设一站式平台网站开发技术
  • 做个英文网站多少钱网络营销策略的概念有哪些
  • 宁波企业网站seo嘉定营销型 网站制作