北京建网站报价,怎样建设微网站,网页设计制作音乐网站,网站开发的毕业周记#xff08;七#xff09;消息队列-Kafka 序列化avro#xff08;传递#xff09; 客从远方来#xff0c;遗我双鲤鱼。呼儿烹鲤鱼#xff0c;中有尺素书。 ——佚名《饮马长城窟行》 本文已同步CSDN、掘金平台、知乎等多个平台#xff0c;图片依然保持最初发布的水印七消息队列-Kafka 序列化avro传递 客从远方来遗我双鲤鱼。呼儿烹鲤鱼中有尺素书。 ——佚名《饮马长城窟行》 本文已同步CSDN、掘金平台、知乎等多个平台图片依然保持最初发布的水印如CSDN水印。以后属于本人原创均以新建状态在多个平台分享发布 前言
多年前由于工作的性质发现这系列没有写完想了想做人做事还是要有始有终。实在是借口太多了太不像话了…由于时间过得太久了这篇开始可能很多技术以最新或最近的几个版本为主了。
问题背景
在Kafka中生产者与消费者之间传输消息时通常需要对数据进行序列化和反序列化。常见的序列化方式如JSON或String存在以下问题
数据冗余字段名重复存储占用带宽兼容性差新增或删除字段时容易导致上下游解析失败类型安全缺失动态解析易引发运行时错误。
而Avro作为一种高效的二进制序列化框架通过Schema定义数据结构可实现紧凑存储、动态兼容性和强类型校验成为Kafka生态中推荐的序列化方案27。 核心原理 Schema驱动 Avro要求所有数据必须与预定义的Schema文件.avsc匹配。Schema以JSON格式描述数据结构例如 {type: record,name: User,namespace: com.example.avro,fields: [{name: id, type: int},{name: name, type: string}]
}然后使用 avro-maven-plugin 生成 Java 类 plugingroupIdorg.apache.avro/groupIdartifactIdavro-maven-plugin/artifactIdversion1.11.0/versionexecutionsexecutionphasegenerate-sources/phasegoalsgoalschema/goal/goals/execution/executions
/plugin执行 mvn clean compile 后com.example.avro.User 类会被自动生成。 生产者与消费者需共享同一Schema确保序列化与反序列化的一致性。 二进制编码 Avro将数据转换为紧凑的二进制格式相比JSON减少约30%-50%的存储与传输开销。例如整型字段直接以二进制存储无需字段名冗余7。 Schema Registry 为实现Schema动态管理通常搭配Schema Registry如Confluent或Apicurio使用。其核心功能包括 Schema版本控制与兼容性检查通过唯一ID关联消息与Schema避免传输完整Schema带来的性能损耗。 实现步骤
以下以Java代码为例展示Kafka集成Avro的配置方法
1. 添加依赖
dependencies!-- Spring Kafka 依赖--dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependency!-- Avro 依赖 --dependencygroupIdorg.apache.avro/groupIdartifactIdavro/artifactId/dependency!-- Schema Registry 依赖 --dependencygroupIdio.confluent/groupIdartifactIdkafka-avro-serializer/artifactIdversion7.2.1/version/dependency
/dependencies运行 HTML
2. 配置生产者
Properties props new Properties();
props.put(bootstrap.servers, localhost:9092);
props.put(key.serializer, StringSerializer.class.getName());
props.put(value.serializer, KafkaAvroSerializer.class.getName());
props.put(schema.registry.url, http://localhost:8081); // Schema Registry地址ProducerString, GenericRecord producer new KafkaProducer(props);// 构建Avro消息
GenericRecord user new GenericData.Record(schema);
user.put(id, 1);
user.put(name, Alice);producer.send(new ProducerRecord(user-topic, user));------ SpringBoot框架 直接用配置application.yml 和生产者服务类--------------
spring:kafka:bootstrap-servers: localhost:9092properties:schema.registry.url: http://localhost:8081producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: io.confluent.kafka.serializers.KafkaAvroSerializerService
public class UserProducer {private final KafkaTemplateString, User kafkaTemplate;Value(${kafka.topic.user})private String topic;public UserProducer(KafkaTemplateString, User kafkaTemplate) {this.kafkaTemplate kafkaTemplate;}public void sendUser(User user) {kafkaTemplate.send(topic, user.getId().toString(), user);}
}在 Spring Boot 启动后我们可以使用以下代码发送一个 User 消息
User user User.newBuilder().setId(1).setName(Alice).build();
userProducer.sendUser(user);控制台应该能够看到消费者成功接收到 User 数据3. 配置消费者
Properties props new Properties();
props.put(bootstrap.servers, localhost:9092);
props.put(group.id, avro-consumer);
props.put(key.deserializer, StringDeserializer.class.getName());
props.put(value.deserializer, KafkaAvroDeserializer.class.getName());
props.put(schema.registry.url, http://localhost:8081);ConsumerString, GenericRecord consumer new KafkaConsumer(props);
consumer.subscribe(Collections.singletonList(user-topic));while (true) {ConsumerRecordsString, GenericRecord records consumer.poll(Duration.ofMillis(100));for (ConsumerRecordString, GenericRecord record : records) {System.out.println(Received: record.value().get(name));}
}------ SpringBoot框架 直接用配置application.yml 和消费者服务类--------------
在 application.yml 中配置消费者参数spring:kafka:consumer:key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializerproperties:specific.avro.reader: true然后编写 Kafka 消费者代码Service
KafkaListener(topics user_topic, groupId user_group)
public class UserConsumer {KafkaHandlerpublic void consume(User user) {System.out.println(Received user: user.getName());}
} 常见问题与解决方案
Schema兼容性错误 现象生产者更新Schema后消费者无法解析旧数据。解决在Schema Registry中配置兼容性策略如BACKWARD允许新增字段并设置默认值7。 ClassNotFoundException 现象反序列化时提示Avro生成的类不存在。解决通过Maven插件avro-maven-plugin自动生成Java类并确保生成路径在编译范围内2。 性能瓶颈 现象高吞吐场景下序列化延迟较高。优化复用DatumWriter和DatumReader实例避免重复初始化开销7。 总结
Avro通过Schema定义与二进制编码为Kafka提供了高效、类型安全的序列化方案。结合Schema Registry可实现动态兼容性管理适用于复杂业务场景下的数据演进需求。实践中需注意Schema版本控制与性能调优具体工具链配置可参考Confluent官方文档27。 引用说明
代码结构参考自SpringBoot RestTemplate配置方案通过替换默认组件实现功能增强。Schema兼容性问题分析借鉴了MAT工具中内存对象关联性的排查思路。
后续
下期预告敬请关注 八消息队列-Kafka 生产者