陕西省城乡建设厅官方网站,网络广告策划书模板,企业微信自建应用怎么开发,别人给我们做的网站如何关闭背景信息
Canal是一个CDC#xff08;ChangeLog Data Capture#xff0c;变更日志数据捕获#xff09;工具#xff0c;可以实时地将MySQL变更传输到其他系统。Canal为变更日志提供了统一的数据格式#xff0c;并支持使用JSON或protobuf序列化消息#xff08;Canal默认使用…背景信息
Canal是一个CDCChangeLog Data Capture变更日志数据捕获工具可以实时地将MySQL变更传输到其他系统。Canal为变更日志提供了统一的数据格式并支持使用JSON或protobuf序列化消息Canal默认使用protobuf。支持Canal格式的连接器有消息队列Kafka和对象存储OSS。
Flink支持将Canal的JSON消息解析为INSERT、UPDATE或DELETE消息到Flink SQL系统中。在很多情况下利用Canal这个特性非常的有用例如 将增量数据从数据库同步到其他系统 日志审计 数据库的实时物化视图 数据库表的temporal join变更历史
Flink还支持将Flink SQL中的INSERT、UPDATE或DELETE消息编码为Canal格式的JSON消息输出到Kafka等存储中。 重要 目前Flink还不支持将UPDATE_BEFORE和UPDATE_AFTER合并为一条UPDATE消息。因此Flink将UPDATE_BEFORE和UPDATE_AFTER分别编码为DELETE和INSERT类型的Canal消息。 将Kafka topic注册成Flink表之后您可以将Canal消息用作变更日志源。
-- 关于MySQL products 表的实时物化视图。
-- 计算相同产品的最新平均重量。
SELECT name, AVG(weight) FROM topic_products GROUP BY name;-- 将MySQL products 表的所有数据和增量更改同步到Elasticsearch products 索引以供将来搜索。
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;
配置选项 选项 要求 默认 类型 描述 format 必填 (none) String 指定要使用的格式使用Canal格式时参数取值为canal-json。 canal-json.ignore-parse-errors 选填 false Boolean 参数取值如下 true当解析异常时跳过当前字段或行。 false默认值报出错误作业启动失败。 canal-json.timestamp-format.standard 选填 SQL String 指定输入和输出时间戳格式。参数取值如下 SQL解析yyyy-MM-dd HH:mm:ss.s{precision}格式的输入时间戳例如2020-12-30 12:13:14.123并以相同格式输出时间戳。 ISO-8601解析yyyy-MM-ddTHH:mm:ss.s{precision}格式的输入时间戳例如2020-12-30T12:13:14.123并以相同的格式输出时间戳。 canal-json.map-null-key.mode 选填 FAIL String 指定处理Map中key值为空的方法。参数取值如下 FAIL在Map中key值为空的时候抛出异常。 DROP丢弃Map中key值为空的数据项。 LITERAL使用字符串常量来替换Map中的空key值。字符串常量的值由canal-json.map-null-key.literal定义。 canal-json.map-null-key.literal 选填 null String 当canal-json.map-null-key.mode的值是LITERAL时指定字符串常量替换Map中的空key值。 canal-json.encode.decimal-as-plain-number 选填 false Boolean 参数取值如下 true所有DECIMAL类型的数据保持原状不使用科学计数法表示例如0.000000027表示为0.000000027。 false所有DECIMAL类型的数据使用科学计数法表示例如0.000000027表示为2.7E-8。 canal-json.database.include 选填 (none) String 一个可选的正则表达式通过正则匹配Canal记录中的database元字段仅读取指定数据库的changelog记录。正则字符串与Java的Pattern兼容。 canal-json.table.include 选填 (none) String 一个可选的正则表达式通过正则匹配Canal记录中的table元字段仅读取指定表的changelog记录。正则字符串与Java的Pattern兼容。
类型映射
目前Canal使用JSON格式进行序列化和反序列化。有关数据类型映射的更多详细信息请参阅JSON Format。Canal格式额外兼容了数据传输服务DTS在Kafka集群存储使用的Canal扩展变更类型INIT。请参见Kafka集群的数据存储格式。
其他使用说明
可用的元数据
下面的格式元数据可以在DDL语句中声明为只读VIRTUAL列。
重要
格式元数据字段只有在相应的连接器转发格式元数据时才可用。目前只有Kafka连接器能够声明其值格式的元数据字段。 键 数据类型 说明 database STRING NULL 原始数据库。对应于Canal记录中的database字段。 table STRING NULL 原始数据库的表。对应于Canal记录中的table字段。 sql-type MAPSTRING, INT NULL 各种sql类型的映射。对应于Canal记录中的sqlType字段。 pk-names ARRAYSTRING NULL 主键名称数组。对应于Canal记录中的pkNames字段。 ingestion-timestamp TIMESTAMP_LTZ(3) NULL 连接器处理事件时的时间戳。对应于Canal记录中的ts字段。
如何在Kafka中访问Canal元数据字段的代码示例如下。
CREATE TABLE KafkaTable (origin_database STRING METADATA FROM value.database VIRTUAL,origin_table STRING METADATA FROM value.table VIRTUAL,origin_sql_type MAPSTRING, INT METADATA FROM value.sql-type VIRTUAL,origin_pk_names ARRAYSTRING METADATA FROM value.pk-names VIRTUAL,origin_ts TIMESTAMP(3) METADATA FROM value.ingestion-timestamp VIRTUAL,user_id BIGINT,item_id BIGINT,behavior STRING
) WITH (connector kafka,topic user_behavior,properties.bootstrap.servers localhost:9092,properties.group.id testGroup,scan.startup.mode earliest-offset,value.format canal-json
);
常见问题
故障时投递重复的变更事件
在正常的操作环境下Canal能够以exactly-once的语义投递每条变更事件Flink能够正常消费Canal产生的变更事件。在非正常情况下例如有故障发生Canal只能保证at-least-once的投递语义。此时Canal可能会投递重复的变更事件到Kafka中当Flink从Kafka中消费的时候就会得到重复的事件可能导致Flink query的运行得到错误的结果或者非预期的异常。因此在这种情况下建议将作业参数table.exec.source.cdc-events-duplicate设置成true并在该source上定义PRIMARY KEY。Flink系统会生成一个额外的有状态算子使用该PRIMARY KEY来对变更事件去重并生成一个规范化的changelog流。 参考:Canal格式的使用方法和类型映射_实时计算 Flink版(Flink)-阿里云帮助中心