网站不排名一切等于零,电子商务网站预算,湖北网站建设公司,杭州网站seo外包Debezium SchemaNameAdjuster 分析
目录 1. 概述2. 核心功能3. 实现原理4. 应用场景5. 扩展示例6. 总结1. 概述
SchemaNameAdjuster 是 Debezium 中的一个工具类,主要用于确保 Schema 名称符合 Avro 命名规范。在数据库变更事件被转换为 Kafka 消息时,需要为每个表和字段创…Debezium SchemaNameAdjuster 分析
目录
1. 概述2. 核心功能3. 实现原理4. 应用场景5. 扩展示例6. 总结1. 概述
SchemaNameAdjuster 是 Debezium 中的一个工具类,主要用于确保 Schema 名称符合 Avro 命名规范。在数据库变更事件被转换为 Kafka 消息时,需要为每个表和字段创建相应的 Avro Schema,而这些名称必须符合 Avro 的命名规则。
2. 核心功能 名称校验: 检查 Schema 名称是否符合 Avro 命名规范验证首字符和非首字符的合法性 名称调整: 将不合法字符替换为合法字符(默认使用下划线’_')保持名称的语义性和可读性 冲突处理: 检测并处理名称冲突支持自定义冲突处理策略 3. 实现原理
3.1 核心接口
public interface SchemaNameAdjuster {/*** 调整提议的名称使其符合 Avro 命名规范*/String adjust(String proposedName);
}3.2 名称验证规则
首字符规则:public static boolean isValidFullnameFirstCharacter(char c) {return (c = 'a' c = 'z') || (c = 'A' c = 'Z') || c == '_';
}非首字符规则:public static boolean isValidFullnameNonFirstCharacter(char c) {return c == '.' || isValidFullnameFirstCharacter(c) || (c = '0' c = '9');
}3.3 调整策略
默认策略:SchemaNameAdjuster adjuster = SchemaNameAdjuster.create("_", (original, replacement, conflict) - {LOGGER.warn("Schema name '{}' is invalid, using '{}' instead", original, replacement);
});自定义替换:SchemaNameAdjuster customAdjuster = SchemaNameAdjuster.create(c - c == '-' ? "_" : String.valueOf(c),(original, replacement, conflict) - {// 自定义冲突处理逻辑}
);4. 应用场景
4.1 表 Schema 构建
在构建数据库表的 Schema 时,需要为 Value Schema 和 Key Schema 生成合法的 Avro 名称:
SchemaBuilder valSchemaBuilder = SchemaBuilder.struct().name(schemaNameAdjuster.adjust(schemaNamePrefix + ".Value"));
SchemaBuilder keySchemaBuilder = SchemaBuilder.struct().name(schemaNameAdjuster.adjust(schemaNamePrefix + ".Key"));4.2 CloudEvents 格式转换
在将 Debezium 事件转换为 CloudEvents 格式时,需要调整 Schema 名称:
CESchemaBuilder ceSchemaBuilder = defineSchema().withName(schemaNameAdjuster.adjust(maker.ceEnvelopeSchemaName()))4.3 逻辑表路由
在进行逻辑表路由时,需要为新的目标主题生成合法的 Schema 名称:
valueBuilder.name(schemaNameAdjuster.adjust(newTopicName + ".Value"));4.4 心跳机制
在配置心跳机制时,需要确保心跳消息的 Schema 名称符合规范:
return new HeartbeatImpl(interval,topic,logicalName,schemaNameAdjuster);4.5 基本用例
表名转换:String tableName = "my-table";
String adjustedName = adjuster.adjust(tableName); // 结果: "my_table"复杂Schema名称:String complexName = "com.example.my-schema.v2";
String adjusted = adjuster.adjust(complexName); // 结果: "com.example.my_schema.v2"特殊字符处理:String specialChars = "table$name@2.0";
String adjusted = adjuster.adjust(specialChars); // 结果: "table_name_2.0"4.6 具体Schema生成示例
让我们以一个具体的表结构为例,展示 SchemaNameAdjuster 如何处理 Schema 名称:
-- 原始表结构
CREATE TABLE inventory.products (id INT PRIMARY KEY,name VARCHAR(255),description TEXT,weight DECIMAL(5,