网站流量建设,律师网站建设公司,html投票代码,怎么进入网站管理系统1、概述
Apache Flink提供了两个关系API——Table API和SQL——用于统一的流和批处理。Table API是一个用于Java、Scala和Python的语言集成查询API#xff0c;它允许以非常直观的方式组合来自关系操作符(如选择、过滤和连接)的查询。Flink的SQL支持基于Apache Calcite#x…1、概述
Apache Flink提供了两个关系API——Table API和SQL——用于统一的流和批处理。Table API是一个用于Java、Scala和Python的语言集成查询API它允许以非常直观的方式组合来自关系操作符(如选择、过滤和连接)的查询。Flink的SQL支持基于Apache Calcite它实现了SQL标准。无论输入是连续的(streaming流)还是有界的(batch批处理)在两个接口中指定的查询都具有相同的语义并指定相同的结果。
Table API和SQL接口与Flink的DataStream API无缝集成。您可以轻松地在所有api和构建在它们之上的库之间切换。例如您可以使用MATCH_RECOGNIZE子句从表中检测模式然后使用DataStream API基于匹配的模式构建警报。
1.1 表程序依赖关系
为了使用Table API和SQL定义数据管道您需要将Table API作为一个依赖项添加到项目中。
有关如何为Java和Scala配置这些依赖项的更多信息请参阅项目配置部分。
如果您正在使用Python请参考Python API的文档
1.2 下一步做什么
概念和公共API:Table API和SQL的共享概念和API。数据类型:列出预定义的数据类型及其属性。流式概念:针对表API或SQL的特定于流式的文档例如时间属性的配置和更新结果的处理。连接到外部系统:用于向外部系统读写数据的可用连接器和格式。Table API:支持的Table API的操作和API。SQL:支持的SQL操作和语法。内置函数: 表API和SQL中支持的函数。SQL Client:使用Flink SQL在没有编程知识的情况下向集群提交一个表程序。SQL网关:一种服务它允许多个客户端从远程并发地执行SQL。SQL JDBC驱动程序: 向sql-gateway提交SQL语句的JDBC驱动程序。
2、概念和通用API
Table API和SQL集成在一个联合API中。这个API的核心概念是一个表Table它作为查询的输入和输出。本文档展示了具有表API和SQL查询的程序的公共结构如何注册表如何查询表以及如何发出(emit)表。
2.1 Table API和SQL程序的结构
下面的代码示例展示了表API和SQL程序的公共结构。
import org.apache.flink.table.api.*;
import org.apache.flink.connector.datagen.table.DataGenConnectorOptions;// Create a TableEnvironment for batch or streaming execution.
// See the Create a TableEnvironment section for details.
TableEnvironment tableEnv TableEnvironment.create(/*…*/);// Create a source table
tableEnv.createTemporaryTable(SourceTable, TableDescriptor.forConnector(datagen).schema(Schema.newBuilder().column(f0, DataTypes.STRING()).build()).option(DataGenConnectorOptions.ROWS_PER_SECOND, 100L).build());// Create a sink table (using SQL DDL)
tableEnv.executeSql(CREATE TEMPORARY TABLE SinkTable WITH (connector blackhole) LIKE SourceTable (EXCLUDING OPTIONS) );// Create a Table object from a Table API query
Table table1 tableEnv.from(SourceTable);// Create a Table object from a SQL query
Table table2 tableEnv.sqlQuery(SELECT * FROM SourceTable);// Emit a Table API result Table to a TableSink, same for SQL result
TableResult tableResult table1.insertInto(SinkTable).execute();表API和SQL查询可以很容易地集成并嵌入到数据流程序(DataStream programs)中。请查看数据流API集成页面了解如何将数据流转换为表反之亦然。 2.2 创建一个表环境(TableEnvironment)
TableEnvironment 是表API和SQL集成的入口点它负责:
Registering a Table in the internal catalogRegistering catalogsLoading pluggable modulesExecuting SQL queriesRegistering a user-defined (scalar, table, or aggregation) functionConverting between DataStream and Table (in case of StreamTableEnvironment)
Table 总是绑定到一个特定的TableEnvironment。不可能在同一个查询中组合不同TableEnvironments中的表(tables)例如连接或联合它们。TableEnvironment是通过调用静态TableEnvironment.create()方法创建的。
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;EnvironmentSettings settings EnvironmentSettings.newInstance().inStreamingMode()//.inBatchMode().build();TableEnvironment tEnv TableEnvironment.create(settings);或者用户可以从现有的StreamExecutionEnvironment中创建一个StreamTableEnvironment来与DataStream API进行互操作。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv StreamTableEnvironment.create(env);2.3 在编目Catalog 中创建表
TableEnvironment维护用标识符创建的表的编目映射。每个标识符由3部分组成:编目名、数据库名和对象名。如果未指定编目或数据库则将使用当前默认值(请参阅表标识符展开部分中的示例)。
表可以是虚表(VIEWS)或常规表(TABLES)。VIEWS可以从现有的Table对象(通常是Table API或SQL查询的结果)创建。TABLES 描述外部数据如文件、数据库表或消息队列。
2.3.1 临时表与永久表
表可以是临时的绑定到单个Flink会话的生命周期也可以是永久的并且在多个Flink会话和集群中可见。
永久表需要一个编目(catalog)(比如Hive Metastore)来维护表的元数据。永久表创建后它对连接到编目的任何Flink会话都是可见的并且将继续存在直到表被显式删除。
另一方面临时表总是存储在内存中并且只在创建它们的Flink会话期间存在。这些表对其他会话是不可见的。它们不绑定到任何编目或数据库但可以在其中一个名称空间中创建。如果临时表对应的数据库被删除则临时表不会被删除。
遮蔽(Shadowing)
可以注册具有与现有永久表相同标识符的临时表。临时表会遮蔽永久表并且只要临时表存在永久表就无法访问。所有带有该标识符的查询都将对临时表执行。
这可能对实验有用。它允许首先对临时表运行完全相同的查询例如只有一个数据子集或者数据被混淆。一旦验证查询是正确的就可以针对实际的生产表运行它。
2.3.2 创建表
虚拟表
在SQL术语中Table API对象对应于VIEW(虚拟表)。它封装了一个逻辑查询计划。它可以在编目(catalog)中创建如下所示:
// get a TableEnvironment
TableEnvironment tableEnv ...; // see Create a TableEnvironment section// table is the result of a simple projection query
Table projTable tableEnv.from(X).select(...);// register the Table projTable as table projectedTable
tableEnv.createTemporaryView(projectedTable, projTable);注意:Table 对象类似于关系数据库系统中的VIEW对象也就是说定义表(Table)的查询没有被优化但是当另一个查询引用注册的表时将被内联。如果多个查询引用同一个已注册表则每个引用查询将被内联并执行多次即注册表的结果将不会被共享。
Connector Tables
还可以通过连接器声明创建关系数据库中已知的TABLE。连接器描述了存储表数据的外部系统。存储系统如Apache Kafka或常规文件系统可以在这里声明。
这样的表可以直接使用表API创建,也可以通过切换到SQL DDL来创建。
// Using table descriptors
final TableDescriptor sourceDescriptor TableDescriptor.forConnector(datagen).schema(Schema.newBuilder().column(f0, DataTypes.STRING()).build()).option(DataGenConnectorOptions.ROWS_PER_SECOND, 100L).build();tableEnv.createTable(SourceTableA, sourceDescriptor);
tableEnv.createTemporaryTable(SourceTableB, sourceDescriptor);// Using SQL DDL
tableEnv.executeSql(CREATE [TEMPORARY] TABLE MyTable (...) WITH (...));2.3.3 扩展表标识符
表总是使用由编目、数据库和表名组成的3部分标识符注册。
用户可以将其中的一个编目和一个数据库设置为“当前编目”和“当前数据库”。有了它们上面提到的3部分标识符中的前两个部分可以是可选的——如果没有提供它们将引用当前编目和当前数据库。用户可以通过表API或SQL切换当前编目和当前数据库。
标识符遵循SQL要求这意味着它们可以用反号字符()进行转义。
TableEnvironment tEnv ...;
tEnv.useCatalog(custom_catalog);
tEnv.useDatabase(custom_database);Table table ...;// register the view named exampleView in the catalog named custom_catalog
// in the database named custom_database
tableEnv.createTemporaryView(exampleView, table);// register the view named exampleView in the catalog named custom_catalog
// in the database named other_database
tableEnv.createTemporaryView(other_database.exampleView, table);// register the view named example.View in the catalog named custom_catalog
// in the database named custom_database
tableEnv.createTemporaryView(example.View, table);// register the view named exampleView in the catalog named other_catalog
// in the database named other_database
tableEnv.createTemporaryView(other_catalog.other_database.exampleView, table);2.4 查询表
Table API
Table API是用于Scala和Java的语言集成查询API。与SQL不同查询没有指定为Strings 而是在宿主语言中逐步组成。
这个API基于Table类它表示一个表(流或批处理)并提供应用关系操作的方法。这些方法返回一个新的Table对象它表示对输入Table应用关系操作的结果。一些关系操作由多个方法调用组成例如table.groupBy(...).select()其中groupBy(…)指定表的分组而select(…)是表分组上的投影。
Table API文档描述了流和批处理表上支持的所有Table API操作。
下面的例子展示了一个简单的Table API聚合查询:
// get a TableEnvironment
TableEnvironment tableEnv ...; // see Create a TableEnvironment section// register Orders table// scan registered Orders table
Table orders tableEnv.from(Orders);
// compute revenue for all customers from France
Table revenue orders.filter($(cCountry).isEqual(FRANCE)).groupBy($(cID), $(cName)).select($(cID), $(cName), $(revenue).sum().as(revSum));// emit or convert Table
// execute querySQL
Flink的SQL集成基于Apache Calcite它实现了SQL标准。SQL查询被指定为常规字符串。
SQL文档描述了Flink对流和批处理表的SQL支持。
下面的示例展示了如何指定查询并将结果作为Table返回。
// get a TableEnvironment
TableEnvironment tableEnv ...; // see Create a TableEnvironment section// register Orders table// compute revenue for all customers from France
Table revenue tableEnv.sqlQuery(SELECT cID, cName, SUM(revenue) AS revSum FROM Orders WHERE cCountry FRANCE GROUP BY cID, cName);// emit or convert Table
// execute query下面的示例展示了如何指定将其结果插入到已注册表中的更新查询。
// get a TableEnvironment
TableEnvironment tableEnv ...; // see Create a TableEnvironment section// register Orders table
// register RevenueFrance output table// compute revenue for all customers from France and emit to RevenueFrance
tableEnv.executeSql(INSERT INTO RevenueFrance SELECT cID, cName, SUM(revenue) AS revSum FROM Orders WHERE cCountry FRANCE GROUP BY cID, cName);混合表API和SQL
Table API和SQL查询可以很容易地混合因为两者都返回Table对象:
可以在SQL查询返回的Table对象上定义Table API查询。通过在TableEnvironment中注册结果表并在SQL查询的FROM子句中引用它可以根据表API查询的结果定义SQL查询。
2.5 Emit a Table
通过将表写入TableSink来发出(emitted)表。TableSink是一个通用接口支持多种文件格式(例如CSV, Apache Parquet, Apache Avro)存储系统(例如JDBC, Apache HBase, Apache Cassandra, Elasticsearch)或者消息系统(例如Apache Kafka, RabbitMQ)。
批处理Table 只能写入BatchTableSink而流Table 需要AppendStreamTableSink, RetractStreamTableSink或UpsertStreamTableSink。
请参阅有关表源和接收器的文档了解有关可用接收器的详细信息以及如何实现自定义DynamicTableSink的说明。
Table.insertInto(String tableName)方法定义了一个完整的端到端管道将源表发送到已注册的接收表。该方法根据名称从编目中查找表接收器并验证Table 的模式(Schema)是否与接收器的模式相同。管道可以用TablePipeline.explain()解释并调用TablePipeline.execute()执行。
下面的例子展示了如何发出(emit)一个Table:
// get a TableEnvironment
TableEnvironment tableEnv ...; // see Create a TableEnvironment section// create an output Table
final Schema schema Schema.newBuilder().column(a, DataTypes.INT()).column(b, DataTypes.STRING()).column(c, DataTypes.BIGINT()).build();tableEnv.createTemporaryTable(CsvSinkTable, TableDescriptor.forConnector(filesystem).schema(schema).option(path, /path/to/file).format(FormatDescriptor.forFormat(csv).option(field-delimiter, |).build()).build());// compute a result Table using Table API operators and/or SQL queries
Table result ...;// Prepare the insert into pipeline
TablePipeline pipeline result.insertInto(CsvSinkTable);// Print explain details
pipeline.printExplain();// emit the result Table to the registered TableSink
pipeline.execute();2.6 翻译并执行查询
表API和SQL查询被转换为数据流程序无论它们的输入是流还是批处理。查询在内部表示为逻辑查询计划并分两个阶段进行转换:
Optimization of the logical plan,Translation into a DataStream program.
表API或SQL查询在以下情况下被转换:
调用TableEnvironment.executeSql()。此方法用于执行给定的语句并且在调用此方法后立即转换sql查询。TablePipeline.execute()被调用。此方法用于执行源到接收器的管道并且在调用此方法后立即转换Table API程序。调用Table.execute()。此方法用于将表内容收集到本地客户机并且在调用此方法后立即转换table API。调用StatementSet.execute()。TablePipeline(通过StatementSet.add()发送到sink)或INSERT语句(通过StatementSet. addinsertsql()指定)将首先在StatementSet中进行缓冲。一旦StatementSet.execute()被调用它们就会被转换。所有的汇将被优化为一个DAG。Table 在转换为DataStream 时进行转换(请参阅与数据流的集成)。转换后它是一个常规的数据流程序并在调用StreamExecutionEnvironment.execute()时执行。
2.7 查询优化
Apache Flink利用并扩展了Apache Calcite来执行复杂的查询优化。这包括一系列基于规则和成本的优化例如:
Subquery decorrelation based on Apache CalciteProject pruningPartition pruningFilter push-downSub-plan deduplication to avoid duplicate computationSpecial subquery rewriting, including two parts: Converts IN and EXISTS into left semi-joins (将IN和EXISTS转换为左半连接)Converts NOT IN and NOT EXISTS into left anti-join (将NOT IN和NOT EXISTS转换为左反连接) Optional join reordering (可选的连接重排序) Enabled via table.optimizer.join-reorder-enabled 注意: IN/EXISTS/NOT IN/NOT EXISTS目前只支持子查询重写的合词条件。 优化器不仅根据计划还根据数据源提供的丰富统计信息和每个操作的细粒度成本(如io、cpu、网络和内存)做出智能决策。
高级用户可以通过CalciteConfig对象提供自定义优化该对象可以通过调用TableEnvironment#getConfig#setPlannerConfig提供给表环境。
2.8 Explaining a Table
Table API提供了一种机制来解释用于计算Table的逻辑和优化的查询计划。这是通过Table.explain()方法或StatementSet.explain()方法完成的。Table.explain()返回一个表的计划。StatementSet.explain() 返回多个接收点的计划。它返回一个字符串描述三个计划:
关系查询的抽象语法树即未优化的逻辑查询计划、优化的逻辑查询计划和物理执行计划。
TableEnvironment.explainSql()和TableEnvironment.executeSql()支持执行EXPLAIN语句来获取计划请参考EXPLAIN页面。
下面的代码显示了一个使用Table.explain()方法的示例和给定表的相应输出:
StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv StreamTableEnvironment.create(env);DataStreamTuple2Integer, String stream1 env.fromElements(new Tuple2(1, hello));
DataStreamTuple2Integer, String stream2 env.fromElements(new Tuple2(1, hello));// explain Table API
Table table1 tEnv.fromDataStream(stream1, $(count), $(word));
Table table2 tEnv.fromDataStream(stream2, $(count), $(word));
Table table table1.where($(word).like(F%)).unionAll(table2);System.out.println(table.explain());上面示例的结果是 Abstract Syntax Tree
LogicalUnion(all[true])
:- LogicalFilter(condition[LIKE($1, _UTF-16LEF%)])
: - LogicalTableScan(table[[Unregistered_DataStream_1]])
- LogicalTableScan(table[[Unregistered_DataStream_2]]) Optimized Physical Plan
Union(all[true], union[count, word])
:- Calc(select[count, word], where[LIKE(word, _UTF-16LEF%)])
: - DataStreamScan(table[[Unregistered_DataStream_1]], fields[count, word])
- DataStreamScan(table[[Unregistered_DataStream_2]], fields[count, word]) Optimized Execution Plan
Union(all[true], union[count, word])
:- Calc(select[count, word], where[LIKE(word, _UTF-16LEF%)])
: - DataStreamScan(table[[Unregistered_DataStream_1]], fields[count, word])
- DataStreamScan(table[[Unregistered_DataStream_2]], fields[count, word])下面的代码显示了使用StatementSet.explain()方法的多汇计划的示例和相应的输出:
EnvironmentSettings settings EnvironmentSettings.inStreamingMode();
TableEnvironment tEnv TableEnvironment.create(settings);final Schema schema Schema.newBuilder().column(count, DataTypes.INT()).column(word, DataTypes.STRING()).build();tEnv.createTemporaryTable(MySource1, TableDescriptor.forConnector(filesystem).schema(schema).option(path, /source/path1).format(csv).build());
tEnv.createTemporaryTable(MySource2, TableDescriptor.forConnector(filesystem).schema(schema).option(path, /source/path2).format(csv).build());
tEnv.createTemporaryTable(MySink1, TableDescriptor.forConnector(filesystem).schema(schema).option(path, /sink/path1).format(csv).build());
tEnv.createTemporaryTable(MySink2, TableDescriptor.forConnector(filesystem).schema(schema).option(path, /sink/path2).format(csv).build());StatementSet stmtSet tEnv.createStatementSet();Table table1 tEnv.from(MySource1).where($(word).like(F%));
stmtSet.add(table1.insertInto(MySink1));Table table2 table1.unionAll(tEnv.from(MySource2));
stmtSet.add(table2.insertInto(MySink2));String explanation stmtSet.explain();
System.out.println(explanation);the result of multiple-sinks plan is Abstract Syntax Tree
LogicalLegacySink(name[default_catalog.default_database.MySink1], fields[count, word])
- LogicalFilter(condition[LIKE($1, _UTF-16LEF%)])- LogicalTableScan(table[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])LogicalLegacySink(name[default_catalog.default_database.MySink2], fields[count, word])
- LogicalUnion(all[true]):- LogicalFilter(condition[LIKE($1, _UTF-16LEF%)]): - LogicalTableScan(table[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])- LogicalTableScan(table[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]]) Optimized Physical Plan
LegacySink(name[default_catalog.default_database.MySink1], fields[count, word])
- Calc(select[count, word], where[LIKE(word, _UTF-16LEF%)])- LegacyTableSourceScan(table[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields[count, word])LegacySink(name[default_catalog.default_database.MySink2], fields[count, word])
- Union(all[true], union[count, word]):- Calc(select[count, word], where[LIKE(word, _UTF-16LEF%)]): - LegacyTableSourceScan(table[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields[count, word])- LegacyTableSourceScan(table[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields[count, word]) Optimized Execution Plan
Calc(select[count, word], where[LIKE(word, _UTF-16LEF%)])(reuse_id[1])
- LegacyTableSourceScan(table[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields[count, word])LegacySink(name[default_catalog.default_database.MySink1], fields[count, word])
- Reused(reference_id[1])LegacySink(name[default_catalog.default_database.MySink2], fields[count, word])
- Union(all[true], union[count, word]):- Reused(reference_id[1])- LegacyTableSourceScan(table[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields[count, word])