企业网站模板 简洁,南京市互联网平台公司,北京微信网站搭建多少钱,东莞室内设计学校Kafka#xff1a;分布式消息系统的核心原理与安装部署-CSDN博客
自定义 Kafka 脚本 kf-use.sh 的解析与功能与应用示例-CSDN博客
Kafka 生产者全面解析#xff1a;从基础原理到高级实践-CSDN博客
Kafka 生产者优化与数据处理经验-CSDN博客
Kafka 工作流程解析#xff1a…
Kafka分布式消息系统的核心原理与安装部署-CSDN博客
自定义 Kafka 脚本 kf-use.sh 的解析与功能与应用示例-CSDN博客
Kafka 生产者全面解析从基础原理到高级实践-CSDN博客
Kafka 生产者优化与数据处理经验-CSDN博客
Kafka 工作流程解析从 Broker 工作原理、节点的服役、退役、副本的生成到数据存储与读写优化-CSDN博客
Kafka 消费者全面解析原理、消费者 API 与Offset 位移-CSDN博客
Kafka 分区分配及再平衡策略深度解析与消费者事务和数据积压的简单介绍-CSDN博客
Kafka 数据倾斜原因、影响与解决方案-CSDN博客
Kafka 核心要点解析_kafka mirrok-CSDN博客
Kafka 核心问题深度解析全面理解分布式消息队列的关键要点_kafka队列日志-CSDN博客 目录
一、分区分配策略基础
二、Range 分区分配策略
一原理
二案例
三Range 分区分配再平衡案例
三、RoundRobin 分区分配策略
一原理
二案例
三RoundRobin 分区分配再平衡案例
四、Sticky 分区分配策略
一原理
二案例
三Sticky 分区分配再平衡案例
五、CooperativeSticky 分区分配策略
六、消费者事务
七、数据积压消费者如何提高吞吐量
八、总结 在 Kafka 的消费任务处理中分区的分配以及再平衡是至关重要的环节。合理的分区分配策略能够确保消费者高效地处理消息而理解再平衡机制则有助于应对消费者组在运行过程中的动态变化。本文将深入探讨 Kafka 中不同的分区分配策略包括 Range、RoundRobin、Sticky 和 CooperativeSticky以及它们在各种场景下的再平衡表现并结合实际案例进行详细分析并对消费者事务和数据积压进行简单介绍。 一、分区分配策略基础 在一个 Kafka 消费者组中包含多个消费者而一个主题则由多个分区组成。关键问题在于确定哪个消费者来消费哪个分区的数据。Kafka 提供了四种主流的分区分配策略并且可以通过配置参数 partition.assignment.strategy 来修改分区的分配策略默认策略是 Range CooperativeSticky。同时还有一些相关的重要参数 参数名称 描述 heartbeat.interval.ms Kafka 消费者和 coordinator 之间的心跳时间默认 3s。 该条目的值必须小于session.timeout.ms也不应该高于 session.timeout.ms 的 1/3。 session.timeout.ms Kafka 消费者和 coordinator 之间连接超时时间默认 45s。超 过该值该消费者被移除消费者组执行再平衡。 max.poll.interval.ms 消费者处理消息的最大时长默认是 5 分钟。超过该值该 消费者被移除消费者组执行再平衡 partition.assignment.strategy 消 费 者 分 区 分 配 策 略 默 认 策 略 是 Range CooperativeSticky。Kafka 可以同时使用多个分区分配策略。 可 以 选 择 的 策 略 包 括 Range 、 RoundRobin 、 Sticky 、CooperativeSticky 二、Range 分区分配策略 一原理 Range 分区分配策略是基于主题的分区数量和消费者数量进行分配。它会按照顺序将连续的分区分配给每个消费者尽可能平均地分配分区但可能会导致不同消费者分配到的分区数量不一致。 二案例 首先将主题 first 修改为 7 个分区 bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --alter --topic first --partitions 7注意分区数可增加但不能减少主题的副本数修改需要制定计划执行不能直接修改。 由三个消费者 CustomConsumer、CustomConsumer1、CustomConsumer2 组成消费者组组名都为 “test”同时启动这 3 个消费者。 启动 CustomProducer 生产者发送 500 条消息随机发送到不同的分区修改发送次数为 500 次。
package com.bigdata.kafka.producer;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) throws InterruptedException {// 1. 创建 kafka 生产者的配置对象Properties properties new Properties();// 2. 给 kafka 配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.235.128:9092);// key,value 序列化必须key.serializervalue.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());// 3. 创建 kafka 生产者对象KafkaProducerString, String kafkaProducer newKafkaProducerString, String(properties);// 4. 调用 send 方法,发送消息for (int i 0; i 500; i) {// 添加回调kafkaProducer.send(new ProducerRecord(first,bigdata i), new Callback() {// 该方法在 Producer 收到 ack 时调用为异步调用Overridepublic void onCompletion(RecordMetadata metadata,Exception exception) {if (exception null) {// 没有异常,输出信息到控制台System.out.println( 主题 metadata.topic() - 分区 metadata.partition());} else {// 出现异常打印exception.printStackTrace();}}});// 延迟一会会看到数据发往不同分区Thread.sleep(20);}// 5. 关闭资源kafkaProducer.close();}
} 说明Kafka 默认的分区分配策略就是 Range CooperativeSticky所以不需要修改策略。 默认是Range,但是在经过一次升级之后会自动变为CooperativeSticky。这个是官方给出的解释。 默认的分配器是[RangeAssignor, CooperativeStickyAssignor]默认情况下将使用RangeAssignor但允许通过一次滚动反弹升级到CooperativeStickyAssignor该滚动反弹会将RangeAssignor从列表中删除。 观察消费情况发现一个消费者消费了 56 分区一个消费了 0,1,2 分区一个消费了 34 分区。这是按照 Range 策略分配的结果。 此时并没有修改分区策略原因是默认是Range. 三Range 分区分配再平衡案例 停止掉 0 号消费者快速重新发送消息45s 以内此时 1 号消费者消费到 3、4 号分区数据2 号消费者消费到 5、6 号分区数据0 号的数据无人消费。 说明0 号消费者挂掉后消费者组需要按照超时时间 45s 来判断它是否退出所以需要等待时间到了 45s 后判断它真的退出就会把任务分配给其他 broker 执行。 再次重新发送消息45s 以后1 号消费者消费到 0、1、2、3 号分区数据2 号消费者消费到 4、5、6 号分区数据。 说明消费者 0 已经被踢出消费者组所以重新按照 range 方式分配。 三、RoundRobin 分区分配策略 一原理 RoundRobin 分区分配策略以轮询的方式将分区分配给消费者确保每个消费者尽可能均衡地获取分区不考虑主题的因素只要是消费者组内的分区都会按照轮询顺序分配。 二案例 在 CustomConsumer、CustomConsumer1、CustomConsumer2 三个消费者代码中修改分区分配策略为 RoundRobin指定 org.apache.kafka.clients.consumer.RoundRobinAssignor并修改消费者组为 test2。
package com.bigdata.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumerWithFenPei {public static void main(String[] args) {Properties properties new Properties();// 连接kafkaproperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,hadoop11:9092);// 字段反序列化 key 和 valueproperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());// 配置消费者组组名任意起名 必须properties.put(ConsumerConfig.GROUP_ID_CONFIG, test2);// 指定分区的分配方案properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, org.apache.kafka.clients.consumer.RoundRobinAssignor);KafkaConsumerString, String kafkaConsumer new KafkaConsumerString, String(properties);// 消费者订阅主题主题有数据就会拉取数据// 指定消费的主题ArrayListString topics new ArrayList();topics.add(first);// 一个消费者可以订阅多个主题kafkaConsumer.subscribe(topics);while(true){//1 秒中向kafka拉取一批数据ConsumerRecordsString, String records kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecordString,String record :records) {// 打印一条数据System.out.println(record);// 可以打印记录中的很多内容比如 key value offset topic 等信息System.out.println(record.value());}}}
}修改一下消费者组为test2 重启 3 个消费者重复发送消息步骤并观察分区结果。 三RoundRobin 分区分配再平衡案例 停止掉 0 号消费者快速重新发送消息45s 以内1 号消费者消费到 2、5 号分区数据2 号消费者消费到 4、1 号分区数据0 号消费者以前对应的数据无人消费。 说明0 号消费者挂掉后消费者组需要按照超时时间 45s 来判断它是否退出所以需要等待时间到了 45s 后判断它真的退出就会把任务分配给其他 broker 执行。 再次重新发送消息45s 以后1 号消费者消费到 0、2、4、6 号分区数据2 号消费者消费到 1、3、5 号分区数据。 说明消费者 0 已经被踢出消费者组所以重新按照 RoundRobin 方式分配。 四、Sticky 分区分配策略 一原理 粘性分区定义可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前 考虑上一次分配的结果尽量少的调整分配的变动可以节省大量的开销。 粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略首先会尽量均衡的放置分区 到消费者上面在出现同一消费者组内消费者出现问题的时候会尽量保持原有分配的分区不变化。 二案例 1需求
设置主题为 first7 个分区准备 3 个消费者采用粘性分区策略并进行消费观察
消费分配情况。然后再停止其中一个消费者再次观察消费分配情况。
2步骤
1修改分区分配策略为粘性。
注意3 个消费者都应该注释掉之后重启 3 个消费者如果出现报错全部停止等
会再重启或者修改为全新的消费者组。
// 修改分区分配策略
ArrayListString startegys new ArrayList();
startegys.add(org.apache.kafka.clients.consumer.StickyAssignor);
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);
3使用同样的生产者发送 500 条消息。
可以看到会尽量保持分区的个数近似划分分区。 三Sticky 分区分配再平衡案例 停止掉 0 号消费者快速重新发送消息45s 以内1 号消费者消费到 2、5、3 号分区数据2 号消费者消费到 4、6 号分区数据0 号消费者的任务无人顶替。 说明0 号消费者挂掉后消费者组需要按照超时时间 45s 来判断它是否退出所以需 要等待时间到了 45s 后判断它真的退出就会把任务分配给其他 broker 执行。 再次重新发送消息45s 以后1 号消费者消费到 2、3、5 号分区数据2 号消费者消费到 0、1、4、6 号分区数据。 说明消费者 0 已经被踢出消费者组所以重新按照粘性方式分配。 五、CooperativeSticky 分区分配策略 CooperativeSticky 是新添加的策略。在消费过程中会根据消费的偏移量情况进行重新再平衡也就是粘性分区并且在运行过程中还会根据消费的实际情况重新分配消费者直到平衡为止。其好处是实现负载均衡但多次平衡会浪费性能它采用动态平衡在消费过程中实施再平衡而不是等到某个消费者退出再平衡。 六、消费者事务 若要实现 Kafka 消费端的精准一次性消费需要将消费过程和提交 offset 过程做原子绑定。此时可将 Kafka 的 offset 保存到支持事务的自定义介质如 MySQL这部分知识将在后续项目中深入涉及事务具有 ACID 四大特征例如转账场景张三 -- 李四就需要事务的保障来确保数据的准确性和完整性。 七、数据积压消费者如何提高吞吐量 当面临数据积压问题时消费者可以采取多种方式提高吞吐量例如增加消费者数量、优化消费者代码处理逻辑、调整相关参数如 max.poll.interval.ms 等以适应更高的处理负载等。后续将深入探讨数据积压场景下的优化策略。 八、总结 通过对 Kafka 分区分配以及再平衡策略的深入理解和实践可以更好地构建和优化 Kafka 消费任务处理流程确保系统的高效稳定运行。在实际应用中需要根据具体的业务需求和场景特点选择合适的分区分配策略并合理处理再平衡过程中的各种情况。 消费者事务方面为实现精准一次性消费需将消费与提交 offset 原子绑定可将 offset 存于支持事务的自定义介质如 MySQL 中。在数据积压场景下消费者可通过增加数量、优化代码处理逻辑、调整参数等方式提高吞吐量后续会深入探讨相关优化策略。这些知识对于深入理解和优化 Kafka 消费者的性能、可靠性和数据处理准确性具有极为重要的意义有助于在实际应用中更好地构建和管理基于 Kafka 的系统架构