做面包的公司网站,容桂网站开发,英语做美食网站,推荐上海网站建设Flink 系列文章
1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接
13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…Flink 系列文章
1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接
13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置如何处理更新结果、时态表、流上的join、流上的确定性以及查询配置 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例1 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例2 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例3 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例4 16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例6 17、Flink 之Table API: Table API 支持的操作1 17、Flink 之Table API: Table API 支持的操作2 18、Flink的SQL 支持的操作和语法 19、Flink 的Table API 和 SQL 中的内置函数及示例1 19、Flink 的Table API 和 SQL 中的自定义函数及示例2 19、Flink 的Table API 和 SQL 中的自定义函数及示例3 19、Flink 的Table API 和 SQL 中的自定义函数及示例4 20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL可以直接提交 SQL 任务到集群上
22、Flink 的table api与sql之创建表的DDL 24、Flink 的table api与sql之Catalogs介绍、类型、java api和sql实现ddl、java api和sql操作catalog-1 24、Flink 的table api与sql之Catalogsjava api操作数据库、表-2 24、Flink 的table api与sql之Catalogsjava api操作视图-3 24、Flink 的table api与sql之Catalogsjava api操作分区与函数-4 25、Flink 的table api与sql之函数(自定义函数示例) 26、Flink 的SQL之概览与入门示例 27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例1 27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例2 27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例3 27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例4 27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例5 27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介绍及详细示例6 27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例7 28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 语句 29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE1 29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE2 30、Flink SQL之SQL 客户端通过kafka和filesystem的例子介绍了配置文件使用-表、视图等 32、Flink table api和SQL 之用户自定义 Sources Sinks实现及详细示例 41、Flink之Hive 方言介绍及详细示例 42、Flink 的table api与sql之Hive Catalog 43、Flink之Hive 读写及详细验证示例 44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的 文章目录 Flink 系列文章一、自定义函数1、概述2、标量函数-自定义函数说明及示例3、表值函数-自定义函数说明及示例4、聚合函数5、表值聚合函数1、示例1- 计算topN2、示例2 - emitUpdateWithRetract 方法使用老版本Planner可用 本文介绍了自定义函数的分类以及四种自定义函数实现的例子。 本文依赖flink、kafka集群能正常使用。 本文分为5个部分即自定义函数介绍、标量自定义函数、表值自定义函数、标量聚合函数和表值聚合函数的实现示例。 本文的示例如无特殊说明则是在Flink 1.17版本中运行。
一、自定义函数
自定义函数UDF是一种扩展开发机制可以用来在查询语句里调用难以用其他方式表达的频繁使用或自定义的逻辑。
自定义函数可以用 JVM 语言例如 Java 或 Scala或 Python 实现实现者可以在 UDF 中使用任意第三方库本文聚焦于使用 JVM 语言开发自定义函数。
1、概述
当前 Flink 有如下几种函数
标量函数将标量值转换成一个新标量值表值函数将标量值转换成新的行数据聚合函数将多行数据里的标量值转换成一个新标量值表值聚合函数将多行数据里的标量值转换成新的行数据异步表值函数是异步查询外部数据系统的特殊函数。 标量和表值函数已经使用了新的基于数据类型的类型系统聚合函数仍然使用基于 TypeInformation 的旧类型系统。 2、标量函数-自定义函数说明及示例
自定义标量函数可以把 0 到多个标量值映射成 1 个标量值数据类型里列出的任何数据类型都可作为求值方法的参数和返回值类型。
想要实现自定义标量函数你需要扩展 org.apache.flink.table.functions 里面的 ScalarFunction 并且实现一个或者多个求值方法。标量函数的行为取决于你写的求值方法。 求值方法必须是 public 的而且名字必须是 eval。 下面自定义函数是将balance加上万元以及求balance/age仅仅示例如何使用其运行结果在每次输出的代码后面注释的行。
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;import java.util.Arrays;
import java.util.List;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** author alanchan**/
public class TestUDScalarFunctionDemo {DataNoArgsConstructorAllArgsConstructorpublic static class User {private long id;private String name;private int age;private int balance;private Long rowtime;}final static ListUser userList Arrays.asList(new User(1L, alan, 18, 20,1698742358391L), new User(2L, alan, 19, 25,1698742359396L), new User(3L, alan, 25, 30,1698742360407L),new User(4L, alanchan, 28,35, 1698742361409L), new User(5L, alanchan, 29, 35,1698742362424L));public static class TestScalarFunction extends ScalarFunction {// 接受任意类型输入返回 String 型输出public String eval(DataTypeHint(inputGroup InputGroup.ANY) Object o) {return o.toString() (万元);}public double eval(Integer age, Integer balance) {return balance / age *1.0;}}/*** param args* throws Exception*/public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);DataStreamUser users env.fromCollection(userList);Table usersTable tenv.fromDataStream(users, $(id), $(name), $(age),$(balance), $(rowtime));//1、 在 Table API 里不经注册直接“内联”调用函数Table result usersTable.select($(id), $(name), call(TestScalarFunction.class, $(balance)));DataStreamTuple2Boolean, Row resultDS tenv.toRetractStream(result, Row.class);
// resultDS.print();
// 11 (true,I[2, alan, 25 (万元)])
// 12 (true,I[3, alan, 30 (万元)])
// 13 (true,I[4, alanchan, 35 (万元)])
// 10 (true,I[1, alan, 20 (万元)])
// 14 (true,I[5, alanchan, 35 (万元)])Table result2 usersTable.select($(id), $(name), $(age), call(TestScalarFunction.class, $(balance)), call(TestScalarFunction.class, $(age), $(balance)));DataStreamTuple2Boolean, Row result2DS tenv.toRetractStream(result2, Row.class);
// result2DS.print();
// 9 (true,I[2, alan, 19, 25 (万元), 1.0])
// 10 (true,I[3, alan, 25, 30 (万元), 1.0])
// 12 (true,I[5, alanchan, 29, 35 (万元), 1.0])
// 11 (true,I[4, alanchan, 28, 35 (万元), 1.0])
// 8 (true,I[1, alan, 18, 20 (万元), 1.0])//2、 注册函数tenv.createTemporarySystemFunction(TestScalarFunction, TestScalarFunction.class);// 在 Table API 里调用注册好的函数Table result3 usersTable.select($(id), $(name),call(TestScalarFunction, $(balance)));DataStreamTuple2Boolean, Row result3DS tenv.toRetractStream(result3, Row.class);
// result3DS.print();
// 2 (true,I[4, alanchan, 35 (万元)])
// 3 (true,I[5, alanchan, 35 (万元)])
// 15 (true,I[1, alan, 20 (万元)])
// 16 (true,I[2, alan, 25 (万元)])
// 1 (true,I[3, alan, 30 (万元)])// 在 SQL 里调用注册好的函数tenv.createTemporaryView(user_view, users);Table result4 tenv.sqlQuery(SELECT id,name,TestScalarFunction(balance) ,TestScalarFunction(age,balance) FROM user_view);DataStreamTuple2Boolean, Row result4DS tenv.toRetractStream(result4, Row.class);result4DS.print();
// 14 (true,I[1, alan, 20 (万元), 1.0])
// 1 (true,I[4, alanchan, 35 (万元), 1.0])
// 2 (true,I[5, alanchan, 35 (万元), 1.0])
// 15 (true,I[2, alan, 25 (万元), 1.0])
// 16 (true,I[3, alan, 30 (万元), 1.0])env.execute();}}3、表值函数-自定义函数说明及示例
跟自定义标量函数一样自定义表值函数的输入参数也可以是 0 到多个标量。但是跟标量函数只能返回一个值不同的是它可以返回任意多行。返回的每一行可以包含 1 到多列如果输出行只包含 1 列会省略结构化信息并生成标量值这个标量值在运行阶段会隐式地包装进行里。
要定义一个表值函数你需要扩展 org.apache.flink.table.functions 下的 TableFunction可以通过实现多个名为 eval 的方法对求值方法进行重载。像其他函数一样输入和输出类型也可以通过反射自动提取出来。表值函数返回的表的类型取决于 TableFunction 类的泛型参数 T不同于标量函数表值函数的求值方法本身不包含返回类型而是通过 collect(T) 方法来发送要输出的行。
在 Table API 中表值函数是通过 .joinLateral(…) 或者 .leftOuterJoinLateral(…) 来使用的。joinLateral 算子会把外表算子左侧的表的每一行跟跟表值函数返回的所有行位于算子右侧进行 crossjoin。leftOuterJoinLateral 算子也是把外表算子左侧的表的每一行跟表值函数返回的所有行位于算子右侧进行crossjoin并且如果表值函数返回 0 行也会保留外表的这一行。
在 SQL 里面用 JOIN 或者 以 ON TRUE 为条件的 LEFT JOIN 来配合 LATERAL TABLE() 的使用。
下面示例中包含表值函数的四种应用方式。
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;import java.util.Arrays;
import java.util.List;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** author alanchan**/
public class TestUDTableFunctionDemo {DataNoArgsConstructorAllArgsConstructorpublic static class User {private long id;private String name;private int age;private int balance;private Long rowtime;}final static ListUser userList Arrays.asList(new User(1L, alan,chen, 18, 20,1698742358391L), new User(2L, alan,chen, 19, 25,1698742359396L), new User(3L, alan,chen, 25, 30,1698742360407L),new User(4L, alan,chan, 28,35, 1698742361409L), new User(5L, alan,chan, 29, 35,1698742362424L));FunctionHint(output DataTypeHint(ROWfirstName STRING, lastName String))public static class SplitFunction extends TableFunctionRow {public void eval(String str) {String[] names str.split(,);collect(Row.of(names[0],names[1]));
// for (String s : str.split(, )) {
// // use collect(...) to emit a row
// collect(Row.of(s, s.length()));
// }}}FunctionHint(output DataTypeHint(ROWid int, name String, age int, balance int, rowtime string))public static class OverloadedFunction extends TableFunctionRow {public void eval(String str) {String[] user str.split(,);collect(Row.of(Integer.valueOf(user[0]),user[1],Integer.valueOf(user[2]),Integer.valueOf(user[3]),user[4]));}}/*** param args* throws Exception */public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);DataStreamUser users env.fromCollection(userList);Table usersTable tenv.fromDataStream(users, $(id), $(name), $(age), $(balance), $(rowtime));// 1、 在 Table API 里不经注册直接“内联”调用函数Table result usersTable.joinLateral(call(SplitFunction.class, $(name))).select($(id), $(name),$(firstName),$(lastName));DataStreamTuple2Boolean, Row resultDS tenv.toRetractStream(result, Row.class);
// resultDS.print();
// 11 (true,I[5, alan,chan, alan, chan])
// 7 (true,I[1, alan,chen, alan, chen])
// 9 (true,I[3, alan,chen, alan, chen])
// 10 (true,I[4, alan,chan, alan, chan])
// 8 (true,I[2, alan,chen, alan, chen])DataStreamString row env.fromCollection(//id name age balance rowtimeArrays.asList(11,alan,18,20,1699341167461,12,alan,19,25,1699341168464,13,alan,20,30,1699341169472,14,alanchan,18,22,1699341170479,15,alanchan,19,25,1699341171482));Table usersTable2 tenv.fromDataStream(row, $(userString));Table result2 usersTable2.joinLateral(call(OverloadedFunction.class, $(userString))).select($(userString),$(id),$(name),$(age),$(balance),$(rowtime)) ; DataStreamTuple2Boolean, Row result2DS tenv.toRetractStream(result2, Row.class);
// result2DS.print();
// 15 (true,I[15,alanchan,19,25,1699341171482, 15, alanchan, 19, 25, 1699341171482])
// 13 (true,I[13,alan,20,30,1699341169472, 13, alan, 20, 30, 1699341169472])
// 14 (true,I[14,alanchan,18,22,1699341170479, 14, alanchan, 18, 22, 1699341170479])
// 11 (true,I[11,alan,18,20,1699341167461, 11, alan, 18, 20, 1699341167461])
// 12 (true,I[12,alan,19,25,1699341168464, 12, alan, 19, 25, 1699341168464])Table result3 usersTable2.leftOuterJoinLateral(call(OverloadedFunction.class, $(userString))).select($(userString),$(id),$(name),$(age),$(balance),$(rowtime)) ; DataStreamTuple2Boolean, Row result3DS tenv.toRetractStream(result3, Row.class);
// result3DS.print();
// 5 (true,I[13,alan,20,30,1699341169472, 13, alan, 20, 30, 1699341169472])
// 6 (true,I[14,alanchan,18,22,1699341170479, 14, alanchan, 18, 22, 1699341170479])
// 3 (true,I[11,alan,18,20,1699341167461, 11, alan, 18, 20, 1699341167461])
// 4 (true,I[12,alan,19,25,1699341168464, 12, alan, 19, 25, 1699341168464])
// 7 (true,I[15,alanchan,19,25,1699341171482, 15, alanchan, 19, 25, 1699341171482])// 在 Table API 里重命名函数字段Table result4 usersTable2.leftOuterJoinLateral(call(OverloadedFunction.class, $(userString)).as(t_id,t_name,t_age,t_balance,t_rowtime)).select($(userString),$(t_id),$(t_name),$(t_age),$(t_balance),$(t_rowtime)) ; DataStreamTuple2Boolean, Row result4DS tenv.toRetractStream(result4, Row.class);
// result4DS.print();
// 10 (true,I[11,alan,18,20,1699341167461, 11, alan, 18, 20, 1699341167461])
// 13 (true,I[14,alanchan,18,22,1699341170479, 14, alanchan, 18, 22, 1699341170479])
// 14 (true,I[15,alanchan,19,25,1699341171482, 15, alanchan, 19, 25, 1699341171482])
// 12 (true,I[13,alan,20,30,1699341169472, 13, alan, 20, 30, 1699341169472])
// 11 (true,I[12,alan,19,25,1699341168464, 12, alan, 19, 25, 1699341168464])//2、 注册函数tenv.createTemporarySystemFunction(OverloadedFunction, OverloadedFunction.class);// 在 Table API 里调用注册好的函数Table result5 usersTable2.leftOuterJoinLateral(call(OverloadedFunction, $(userString)).as(t_id,t_name,t_age,t_balance,t_rowtime)).select($(userString),$(t_id),$(t_name),$(t_age),$(t_balance),$(t_rowtime)) ; DataStreamTuple2Boolean, Row result5DS tenv.toRetractStream(result5, Row.class);
// result5DS.print();
// 11 (true,I[11,alan,18,20,1699341167461, 11, alan, 18, 20, 1699341167461])
// 14 (true,I[14,alanchan,18,22,1699341170479, 14, alanchan, 18, 22, 1699341170479])
// 15 (true,I[15,alanchan,19,25,1699341171482, 15, alanchan, 19, 25, 1699341171482])
// 13 (true,I[13,alan,20,30,1699341169472, 13, alan, 20, 30, 1699341169472])
// 12 (true,I[12,alan,19,25,1699341168464, 12, alan, 19, 25, 1699341168464])Table result6 usersTable2.joinLateral(call(OverloadedFunction, $(userString)).as(t_id,t_name,t_age,t_balance,t_rowtime)).select($(userString),$(t_id),$(t_name),$(t_age),$(t_balance),$(t_rowtime)) ; DataStreamTuple2Boolean, Row result6DS tenv.toRetractStream(result6, Row.class);
// result6DS.print();
// 8 (true,I[14,alanchan,18,22,1699341170479, 14, alanchan, 18, 22, 1699341170479])
// 9 (true,I[15,alanchan,19,25,1699341171482, 15, alanchan, 19, 25, 1699341171482])
// 5 (true,I[11,alan,18,20,1699341167461, 11, alan, 18, 20, 1699341167461])
// 7 (true,I[13,alan,20,30,1699341169472, 13, alan, 20, 30, 1699341169472])
// 6 (true,I[12,alan,19,25,1699341168464, 12, alan, 19, 25, 1699341168464])//3、 在 SQL 里调用注册好的函数tenv.createTemporaryView(user_view, usersTable2);Table result7 tenv.sqlQuery(SELECT userString, id,name,age,balance,rowtime FROM user_view, LATERAL TABLE(OverloadedFunction(userString)));DataStreamTuple2Boolean, Row result7DS tenv.toRetractStream(result7, Row.class);
// result7DS.print();
// 15 (true,I[13,alan,20,30,1699341169472, 13, alan, 20, 30, 1699341169472])
// 13 (true,I[11,alan,18,20,1699341167461, 11, alan, 18, 20, 1699341167461])
// 1 (true,I[15,alanchan,19,25,1699341171482, 15, alanchan, 19, 25, 1699341171482])
// 14 (true,I[12,alan,19,25,1699341168464, 12, alan, 19, 25, 1699341168464])
// 16 (true,I[14,alanchan,18,22,1699341170479, 14, alanchan, 18, 22, 1699341170479])Table result8 tenv.sqlQuery(SELECT userString, id,name,age,balance,rowtime FROM user_view LEFT JOIN LATERAL TABLE( OverloadedFunction(userString)) ON TRUE );DataStreamTuple2Boolean, Row result8DS tenv.toRetractStream(result8, Row.class);
// result8DS.print();
// 13 (true,I[11,alan,18,20,1699341167461, 11, alan, 18, 20, 1699341167461])
// 1 (true,I[15,alanchan,19,25,1699341171482, 15, alanchan, 19, 25, 1699341171482])
// 15 (true,I[13,alan,20,30,1699341169472, 13, alan, 20, 30, 1699341169472])
// 14 (true,I[12,alan,19,25,1699341168464, 12, alan, 19, 25, 1699341168464])
// 16 (true,I[14,alanchan,18,22,1699341170479, 14, alanchan, 18, 22, 1699341170479])//4、 在 SQL 里重命名函数字段Table result9 tenv.sqlQuery(SELECT userString, t_id, t_name,t_age,t_balance,t_rowtime FROM user_view LEFT JOIN LATERAL TABLE(OverloadedFunction(userString)) AS T(t_id, t_name,t_age,t_balance,t_rowtime) ON TRUE);DataStreamTuple2Boolean, Row result9DS tenv.toRetractStream(result9, Row.class);result9DS.print();
// 7 (true,I[12,alan,19,25,1699341168464, 12, alan, 19, 25, 1699341168464])
// 10 (true,I[15,alanchan,19,25,1699341171482, 15, alanchan, 19, 25, 1699341171482])
// 9 (true,I[14,alanchan,18,22,1699341170479, 14, alanchan, 18, 22, 1699341170479])
// 8 (true,I[13,alan,20,30,1699341169472, 13, alan, 20, 30, 1699341169472])
// 6 (true,I[11,alan,18,20,1699341167461, 11, alan, 18, 20, 1699341167461])env.execute();}}4、聚合函数
自定义聚合函数UDAGG是把一个表一行或者多行每行可以有一列或者多列聚合成一个标量值。
该示例包含以下三个功能
定义一个聚合函数来计算某一列的加权平均在 TableEnvironment 中注册函数在查询中使用函数
为了计算加权平均值accumulator 需要存储加权总和以及数据的条数。 在例子里定义了一个类 Aalan_WeightedAvgAccum 来作为 accumulator。Flink 的 checkpoint 机制会自动保存 accumulator在失败时进行恢复以此来保证精确一次的语义。
例子的WeightedAvgAggregateFunction自定义聚合函数的 accumulate 方法有三个输入参数。 第一个是 Aalan_WeightedAvgAccum accumulator 另外两个是用户自定义的输入输入的值 ivaluebalance 和 输入的权重 iweightage。
尽管 retract()、merge()、resetAccumulator() 这几个方法在大多数聚合类型中都不是必须实现的样例中提供了他们的实现。 在 Scala 样例中也是用的是 Java 的基础类型并且定义了 getResultType() 和 getAccumulatorType()因为 Flink 的类型推导对于 Scala 的类型推导做的不是很好。 import static org.apache.flink.table.api.Expressions.$;import java.util.Arrays;
import java.util.Iterator;
import java.util.List;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** author alanchan**/
public class TestUDAGGDemo {// 加权平均累加器bean加上名称以示区别避免混淆public static class Aalan_WeightedAvgAccum {public long sum 0;public int count 0;}// 聚合函数的自定义实现计算加权平均public static class WeightedAvgAggregateFunction extends AggregateFunctionLong, Aalan_WeightedAvgAccum {/*** 创建和初始化aggregate function 的Accumulator 方法*/Overridepublic Aalan_WeightedAvgAccum createAccumulator() {return new Aalan_WeightedAvgAccum();}/*** 每次应该具体化materialized聚合结果时调用。 返回的值可能是早期且不完整的结果随着数据的到达而定期发出也可能是聚合的最终结果。*/Overridepublic Long getValue(Aalan_WeightedAvgAccum acc) {if (acc.count 0) {return null;} else {return acc.sum / acc.count;}}/*** 处理输入值并更新提供的累加器实例。方法accumulate 可以用不同的自定义类型和参数重载。聚合函数至少需要一个accumulate方法。* * param acc 累加器bean,包含当前汇总结果的累加器* param iValue 输入的需要的累加值* param iWeight 输入的需要累加的值的权重*/public void accumulate(Aalan_WeightedAvgAccum acc, long iValue, int iWeight) {acc.sum iValue * iWeight;acc.count iWeight;}/*** 收回累加器实例中的输入值。当前设计假设输入是先前累积的值。收回方法可以是重载了不同的自定义类型和参数。 此功能在datastream的有界流基于over* aggregate必须被实现。* * param acc 累加器bean,包含当前汇总结果的累加器* param iValue 输入的需要的累加值* param iWeight 输入的需要累加的值的权重*/public void retract(Aalan_WeightedAvgAccum acc, long iValue, int iWeight) {acc.sum - iValue * iWeight;acc.count - iWeight;}/*** 将一组accumulator 实例合并为一个accumulator 实例。 该函数在datastream session window的分组聚合 和* 有界流的分组聚合必须实现。* * param acc 累加器用于保存合并后的聚合结果。 应该注意的是累加器可以包含先前的聚合结果。 因此用户不应在自定义合并方法中替换或清除此实例。* param it 指向将被合并的一组累加器的Iterable。*/public void merge(Aalan_WeightedAvgAccum acc, IterableAalan_WeightedAvgAccum it) {IteratorAalan_WeightedAvgAccum iter it.iterator();while (iter.hasNext()) {Aalan_WeightedAvgAccum a iter.next();acc.count a.count;acc.sum a.sum;}}/*** 重置此[[AggregateFunction]]的累加器。必须为有界分组聚合实现此函数。* * param acc*/public void resetAccumulator(Aalan_WeightedAvgAccum acc) {acc.count 0;acc.sum 0L;}}DataNoArgsConstructorAllArgsConstructorpublic static class User {private long id;private String name;private int age;private long balance;private Long rowtime;}final static ListUser userList Arrays.asList(new User(1L, alan, 18, 20, 1698742358391L), new User(2L, alan, 19, 25, 1698742359396L),new User(3L, alan, 25, 30, 1698742360407L), new User(4L, alanchan, 28, 35, 1698742361409L), new User(5L, alanchan, 29, 35, 1698742362424L));public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);// 将聚合函数注册为函数tenv.registerFunction(alan_weightavgAF, new WeightedAvgAggregateFunction());DataStreamUser users env.fromCollection(userList);Table usersTable tenv.fromDataStream(users, $(id), $(name), $(age), $(balance), $(rowtime));tenv.createTemporaryView(user_view, users);// 使用函数String sql SELECT name, alan_weightavgAF(balance, age) AS avgPoints FROM user_view GROUP BY name;Table result tenv.sqlQuery(sql);DataStreamTuple2Boolean, Row resultDS tenv.toRetractStream(result, Row.class);resultDS.print();
// 16 (true,I[alanchan, 35])
// 2 (true,I[alan, 20])
// 2 (false,-U[alan, 20])
// 2 (true,U[alan, 22])
// 2 (false,-U[alan, 22])
// 2 (true,U[alan, 25])env.execute();}}5、表值聚合函数
自定义表值聚合函数UDTAGG可以把一个表一行或者多行每行有一列或者多列聚合成另一张表结果中可以有多行多列。
1、示例1- 计算topN
下面的例子展示了如何
定义一个 TableAggregateFunction 来计算给定列的最大的 3 个值在 TableEnvironment 中注册函数在 Table API 查询中使用函数当前只在 Table API 中支持 TableAggregateFunction
为了计算最大的 3 个值accumulator 需要保存当前看到的最大的 3 个值。 在例子中定义了类 TopAccum 来作为 accumulator。Flink 的 checkpoint 机制会自动保存 accumulator并且在失败时进行恢复来保证精确一次的语义。
TopTableAggregateFunction 表值聚合函数TableAggregateFunction的 accumulate() 方法有两个输入 第一个是 TopAccum accumulator 另一个是用户定义的输入输入的值 v。
尽管 merge() 方法在大多数聚合类型中不是必须的也在样例中提供了它的实现。 在 Scala 样例中也使用的是 Java 的基础类型并且定义了 getResultType() 和 getAccumulatorType() 方法因为 Flink 的类型推导对于 Scala 的类型推导支持的不是很好。 import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;import java.util.Arrays;
import java.util.List;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** author alanchan**/
public class TestUDTAGGDemo {/*** Accumulator for Top3**/Datapublic static class TopAccum {public Integer first;public Integer second;public Integer third;}public static class TopTableAggregateFunction extends TableAggregateFunctionTuple2Integer, Integer, TopAccum {Overridepublic TopAccum createAccumulator() {TopAccum acc new TopAccum();acc.first Integer.MIN_VALUE;acc.second Integer.MIN_VALUE;acc.third Integer.MIN_VALUE;return acc;}public void accumulate(TopAccum acc, Integer v) {if (v acc.first) {acc.third acc.second;acc.second acc.first;acc.first v;} else if (v acc.second) {acc.third acc.second;acc.second v;} else if (v acc.third) {acc.third v;}}public void merge(TopAccum acc, java.lang.IterableTopAccum iterable) {for (TopAccum otherAcc : iterable) {accumulate(acc, otherAcc.first);accumulate(acc, otherAcc.second);accumulate(acc, otherAcc.third);}}public void emitValue(TopAccum acc, CollectorTuple2Integer, Integer out) {
// System.out.println(acc:acc);// emit the value and rankif (acc.first ! Integer.MIN_VALUE) {out.collect(Tuple2.of(acc.first, 1));}if (acc.second ! Integer.MIN_VALUE) {out.collect(Tuple2.of(acc.second, 2));}if (acc.third ! Integer.MIN_VALUE) {out.collect(Tuple2.of(acc.third, 3));}}}DataNoArgsConstructorAllArgsConstructorpublic static class User {private long id;private String name;private int age;private int balance;private Long rowtime;}final static ListUser userList Arrays.asList(new User(1L, alan, 18, 20, 1698742358391L), new User(2L, alan, 19, 25, 1698742359396L),new User(3L, alan, 25, 30, 1698742360407L), new User(4L, alanchan, 28, 35, 1698742361409L), new User(5L, alanchan, 29, 35, 1698742362424L));public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tenv StreamTableEnvironment.create(env);// 将聚合函数注册为函数tenv.registerFunction(top, new TopTableAggregateFunction());DataStreamUser users env.fromCollection(userList);Table usersTable tenv.fromDataStream(users, $(id), $(name), $(age), $(balance), $(rowtime));// 使用函数Table result usersTable.groupBy($(name)).flatAggregate(call(top, $(balance))).select($(name), $(f0).as(balance), $(f1).as(rank));DataStreamTuple2Boolean, Row resultDS tenv.toRetractStream(result, Row.class);resultDS.print();
// 2 (true,I[alan, 20, 1])
// 16 (true,I[alanchan, 35, 1])
// 2 (false,-D[alan, 20, 1])
// 16 (false,-D[alanchan, 35, 1])
// 2 (true,I[alan, 25, 1])
// 16 (true,I[alanchan, 35, 1])
// 2 (true,I[alan, 20, 2])
// 16 (true,I[alanchan, 35, 2])
// 2 (false,-D[alan, 25, 1])
// 2 (false,-D[alan, 20, 2])
// 2 (true,I[alan, 30, 1])
// 2 (true,I[alan, 25, 2])
// 2 (true,I[alan, 20, 3])env.execute();}}2、示例2 - emitUpdateWithRetract 方法使用老版本Planner可用
下面的例子展示了如何使用 emitUpdateWithRetract 方法来只发送更新的数据。 为了只发送更新的结果accumulator 保存了上一次的最大的3个值也保存了当前最大的3个值。 如果 TopN 中的 n 非常大这种既保存上次的结果也保存当前的结果的方式不太高效。 一种解决这种问题的方式是把输入数据直接存储到 accumulator 中然后在调用 emitUpdateWithRetract 方法时再进行计算。 需要特别说明的是 下面的示例需要使用到useOldPlanner对应的planner的maven依赖见下文
!-- flink执行计划,这是1.9版本之前的--dependencygroupIdorg.apache.flink/groupIdartifactIdflink-table-planner_2.11/artifactIdversion1.13.6/version/dependency如果flink的版本比较高的话下面的示例将不能运行因为新版本的Builder没有useOldPlanner()方法了已经移除。不能构造EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build(); //新版本该方法已经被移除Deprecatedpublic Builder useOldPlanner() {this.plannerClass OLD_PLANNER_FACTORY;this.executorClass OLD_EXECUTOR_FACTORY;return this;}如果使用OldPlanner的话emitValue和emitUpdateWithRetract仅需定义一个就可以了并且emitUpdateWithRetract的优先级大于emitValue。但是在Blink Planner里只看有没有定义emitValue。 也即在Blink Planner中只能使用emitValue不能使用emitUpdateWithRetract。 否则会报如下异常 Exception in thread “main” org.apache.flink.table.api.ValidationException: Could not find an implementation method ‘emitValue’ in class ‘org.tablesql.udf.TestUDTAGGDemo2$TopNTableAggregateFunction’ for function ‘TopNTableAggregateFunction’ that matches the following signature: void emitValue(org.tablesql.udf.TestUDTAGGDemo2.TopNAccum, org.apache.flink.util.Collector) 具体示例如下 import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;import java.util.Arrays;
import java.util.List;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.types.Row;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;/*** author alanchan**/
public class TestUDTAGGDemo2 {Datapublic static class TopNAccum {public Integer first;public Integer second;public Integer third;public Integer oldFirst;public Integer oldSecond;public Integer oldThird;}/*** 自定义聚合函数实现* * author alanchan**/public static class TopNTableAggregateFunction extends TableAggregateFunctionTuple2Integer, Integer, TopNAccum {Overridepublic TopNAccum createAccumulator() {TopNAccum topNAccum new TopNAccum();topNAccum.first Integer.MIN_VALUE;topNAccum.second Integer.MIN_VALUE;topNAccum.third Integer.MIN_VALUE;topNAccum.oldFirst Integer.MIN_VALUE;topNAccum.oldSecond Integer.MIN_VALUE;topNAccum.oldThird Integer.MIN_VALUE;return topNAccum;}public void accumulate(TopNAccum acc, Integer v) {if (v acc.first) {acc.third acc.second;acc.second acc.first;acc.first v;} else if (v acc.second) {acc.third acc.second;acc.second v;} else if (v acc.third) {acc.third v;}}public void emitUpdateWithRetract(TopNAccum acc, RetractableCollectorTuple2Integer, Integer out) {System.out.println(emitUpdateWithRetract----acc: acc);if (!acc.first.equals(acc.oldFirst)) {// if there is an update, retract old value then emit new value.if (acc.oldFirst ! Integer.MIN_VALUE) {out.retract(Tuple2.of(acc.oldFirst, 1));}out.collect(Tuple2.of(acc.first, 1));acc.oldFirst acc.first;}if (!acc.second.equals(acc.oldSecond)) {// if there is an update, retract old value then emit new value.if (acc.oldSecond ! Integer.MIN_VALUE) {out.retract(Tuple2.of(acc.oldSecond, 2));}out.collect(Tuple2.of(acc.second, 2));acc.oldSecond acc.second;}if (!acc.third.equals(acc.oldThird)) {// if there is an update, retract old value then emit new value.if (acc.oldThird ! Integer.MIN_VALUE) {out.retract(Tuple2.of(acc.oldThird, 3));}out.collect(Tuple2.of(acc.third, 3));acc.oldThird acc.third;}}}DataNoArgsConstructorAllArgsConstructorpublic static class User {private long id;private String name;private int age;private int balance;private Long rowtime;}final static ListUser userList Arrays.asList(new User(1L, alan, 18, 20, 1698742358391L), new User(2L, alan, 19, 25, 1698742359396L),new User(3L, alan, 25, 30, 1698742360407L), new User(11L, alan, 28, 31, 1698742358391L), new User(12L, alan, 29, 32, 1698742359396L),new User(13L, alan, 35, 35, 1698742360407L), new User(23L, alan, 45, 36, 1698742360407L), new User(14L, alanchan, 28, 15, 1698742361409L), new User(15L, alanchan, 29, 16, 1698742362424L), new User(24L, alanchan, 30, 20, 1698742361409L),new User(25L, alanchan, 31, 22, 1698742362424L), new User(34L, alanchan, 32, 24, 1698742361409L), new User(35L, alanchan, 33, 26, 1698742362424L),new User(44L, alanchan, 34, 28, 1698742361409L), new User(55L, alanchan, 35, 35, 1698742362424L));public static void main(String[] args) throws Exception {StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
// env.setParallelism(1);EnvironmentSettings settings EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();StreamTableEnvironment tenv StreamTableEnvironment.create(env, settings);// 将聚合函数注册为函数tenv.registerFunction(topN, new TopNTableAggregateFunction());DataStreamUser users env.fromCollection(userList);Table usersTable tenv.fromDataStream(users, $(id), $(name), $(age), $(balance), $(rowtime));// 使用函数Table result usersTable.groupBy($(name)).flatAggregate(call(topN, $(balance))).select($(name), $(f0).as(balance), $(f1).as(rank));DataStreamTuple2Boolean, Row resultDS tenv.toRetractStream(result, Row.class);resultDS.print();env.execute();}}
运行结果如下
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first20, second-2147483648, third-2147483648, oldFirst-2147483648, oldSecond-2147483648, oldThird-2147483648)
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first15, second-2147483648, third-2147483648, oldFirst-2147483648, oldSecond-2147483648, oldThird-2147483648)
14 (true,I[alan, 20, 1])
9 (true,I[alanchan, 15, 1])
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first25, second20, third-2147483648, oldFirst20, oldSecond-2147483648, oldThird-2147483648)
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first16, second15, third-2147483648, oldFirst15, oldSecond-2147483648, oldThird-2147483648)
14 (false,I[alan, 20, 1])
14 (true,I[alan, 25, 1])
14 (true,I[alan, 20, 2])
9 (false,I[alanchan, 15, 1])
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first30, second25, third20, oldFirst25, oldSecond20, oldThird-2147483648)
14 (false,I[alan, 25, 1])
9 (true,I[alanchan, 16, 1])
14 (true,I[alan, 30, 1])
14 (false,I[alan, 20, 2])
9 (true,I[alanchan, 15, 2])
14 (true,I[alan, 25, 2])
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first20, second16, third15, oldFirst16, oldSecond15, oldThird-2147483648)
14 (true,I[alan, 20, 3])
9 (false,I[alanchan, 16, 1])
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first31, second30, third25, oldFirst30, oldSecond25, oldThird20)
9 (true,I[alanchan, 20, 1])
9 (false,I[alanchan, 15, 2])
14 (false,I[alan, 30, 1])
9 (true,I[alanchan, 16, 2])
14 (true,I[alan, 31, 1])
9 (true,I[alanchan, 15, 3])
14 (false,I[alan, 25, 2])
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first22, second20, third16, oldFirst20, oldSecond16, oldThird15)
14 (true,I[alan, 30, 2])
9 (false,I[alanchan, 20, 1])
14 (false,I[alan, 20, 3])
9 (true,I[alanchan, 22, 1])
14 (true,I[alan, 25, 3])
9 (false,I[alanchan, 16, 2])
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first32, second31, third30, oldFirst31, oldSecond30, oldThird25)
9 (true,I[alanchan, 20, 2])
9 (false,I[alanchan, 15, 3])
9 (true,I[alanchan, 16, 3])
14 (false,I[alan, 31, 1])
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first24, second22, third20, oldFirst22, oldSecond20, oldThird16)
14 (true,I[alan, 32, 1])
9 (false,I[alanchan, 22, 1])
14 (false,I[alan, 30, 2])
9 (true,I[alanchan, 24, 1])
9 (false,I[alanchan, 20, 2])
14 (true,I[alan, 31, 2])
9 (true,I[alanchan, 22, 2])
14 (false,I[alan, 25, 3])
9 (false,I[alanchan, 16, 3])
14 (true,I[alan, 30, 3])
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first35, second32, third31, oldFirst32, oldSecond31, oldThird30)
9 (true,I[alanchan, 20, 3])
14 (false,I[alan, 32, 1])
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first26, second24, third22, oldFirst24, oldSecond22, oldThird20)
14 (true,I[alan, 35, 1])
9 (false,I[alanchan, 24, 1])
14 (false,I[alan, 31, 2])
9 (true,I[alanchan, 26, 1])
14 (true,I[alan, 32, 2])
9 (false,I[alanchan, 22, 2])
14 (false,I[alan, 30, 3])
9 (true,I[alanchan, 24, 2])
14 (true,I[alan, 31, 3])
9 (false,I[alanchan, 20, 3])
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first36, second35, third32, oldFirst35, oldSecond32, oldThird31)
9 (true,I[alanchan, 22, 3])
14 (false,I[alan, 35, 1])
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first28, second26, third24, oldFirst26, oldSecond24, oldThird22)
14 (true,I[alan, 36, 1])
9 (false,I[alanchan, 26, 1])
14 (false,I[alan, 32, 2])
9 (true,I[alanchan, 28, 1])
14 (true,I[alan, 35, 2])
9 (false,I[alanchan, 24, 2])
9 (true,I[alanchan, 26, 2])
14 (false,I[alan, 31, 3])
9 (false,I[alanchan, 22, 3])
14 (true,I[alan, 32, 3])
9 (true,I[alanchan, 24, 3])
emitUpdateWithRetract----acc:TestUDTAGGDemo2.TopNAccum(first35, second28, third26, oldFirst28, oldSecond26, oldThird24)
9 (false,I[alanchan, 28, 1])
9 (true,I[alanchan, 35, 1])
9 (false,I[alanchan, 26, 2])
9 (true,I[alanchan, 28, 2])
9 (false,I[alanchan, 24, 3])
9 (true,I[alanchan, 26, 3])
以上介绍了自定义函数的分类以及四种自定义函数实现的例子。