江苏金安建设公司网站,如何创建网站挣钱,小时的网站建设,怎么在百度发布免费广告文章目录 01. Kafka 分区的作用02. PartitionInfo 分区源码03. Partitioner 分区器接口源码04. 自定义分区器05. 默认分区器 DefaultPartitioner06. 随机分区分配 RoundRobinPartitioner07. 黏性随机分区分配 UniformStickyPartitioner08. 为什么Kafka 2.4 版本后引入黏性分区策… 文章目录 01. Kafka 分区的作用02. PartitionInfo 分区源码03. Partitioner 分区器接口源码04. 自定义分区器05. 默认分区器 DefaultPartitioner06. 随机分区分配 RoundRobinPartitioner07. 黏性随机分区分配 UniformStickyPartitioner08. 为什么Kafka 2.4 版本后引入黏性分区策略 01. Kafka 分区的作用
分区的作用就是提供负载均衡的能力或者说对数据进行分区的主要原因就是为了实现系统的高伸缩性。不同的分区能够被放置到不同节点的机器上而数据的读写操作也都是针对分区这个粒度而进行的这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且我们还可以通过添加新的节点机器来增加整体系统的吞吐量。
除了提供负载均衡这种最核心的功能之外利用分区也可以实现其他一些业务级别的需求比如实现业务级别的消息顺序的问题。
生产者发送的消息实体 ProducerRecord 的构造方法
public class ProducerRecordK, V {// ....public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {this(topic, partition, timestamp, key, value, null);}public ProducerRecord(String topic, Integer partition, K key, V value, IterableHeader headers) {this(topic, partition, null, key, value, headers);}public ProducerRecord(String topic, Integer partition, K key, V value) {this(topic, partition, null, key, value, null);}public ProducerRecord(String topic, K key, V value) {this(topic, null, null, key, value, null);}public ProducerRecord(String topic, V value) {this(topic, null, null, null, value, null);}}我们发送消息时可以指定分区号如果不指定那就需要分区器这个很重要一条消息该发往哪一个分区关系到顺序消息问题。下面我们说说 Kafka 生产者的分区策略。所谓分区策略是决定生产者将消息发送到哪个分区的算法。Kafka 为我们提供了默认的分区策略同时它也支持你自定义分区策略。
02. PartitionInfo 分区源码
/*** This is used to describe per-partition state in the MetadataResponse.*/
public class PartitionInfo {// 表示该分区所属的主题名称。private final String topic;// 表示该分区的编号。private final int partition;// 表示该分区的领导者节点。private final Node leader;// 表示该分区的所有副本节点。private final Node[] replicas;// 表示该分区的所有同步副本节点。private final Node[] inSyncReplicas;// 表示该分区的所有离线副本节点。private final Node[] offlineReplicas;public PartitionInfo(String topic, int partition, Node leader, Node[] replicas, Node[] inSyncReplicas) {this(topic, partition, leader, replicas, inSyncReplicas, new Node[0]);}public PartitionInfo(String topic,int partition,Node leader,Node[] replicas,Node[] inSyncReplicas,Node[] offlineReplicas) {this.topic topic;this.partition partition;this.leader leader;this.replicas replicas;this.inSyncReplicas inSyncReplicas;this.offlineReplicas offlineReplicas;}// ....
}03. Partitioner 分区器接口源码
Kafka的Partitioner接口是用来决定消息被分配到哪个分区的。它定义了一个方法partition该方法接收三个参数topic、key和value返回一个int类型的分区号表示消息应该被分配到哪个分区。
public interface Partitioner extends Configurable {/*** Compute the partition for the given record.** param topic The topic name* param key The key to partition on (or null if no key)* param keyBytes The serialized key to partition on( or null if no key)* param value The value to partition on or null* param valueBytes The serialized value to partition on or null* param cluster The current cluster metadata*/int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);/*** This is called when partitioner is closed.*/default void close() {}
}Partitioner接口的实现类可以根据不同的业务需求来实现不同的分区策略例如根据消息的键、值、时间戳等信息来决定分区。
这里的topic、key、keyBytes、value和valueBytes都属于消息数据cluster则是集群信息。Kafka 给你这么多信息就是希望让你能够充分地利用这些信息对消息进行分区计算出它要被发送到哪个分区中。
04. 自定义分区器
只要你自己的实现类定义好了 partition 方法同时设置partitioner.class 参数为你自己实现类的 Full Qualified Name那么生产者程序就会按照你的代码逻辑对消息进行分区。
① 实现自定义分区策略 MyPartitioner
public class MyPartitioner implements Partitioner {private final AtomicInteger counter new AtomicInteger(0);Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取该 topic 可用的所有分区信息ListPartitionInfo partitionInfos cluster.availablePartitionsForTopic(topic);int size partitionInfos.size();if(keyBytesnull){// 如果 keyBytes 为 null表示该消息没有 key此时采用 round-robin 的方式将消息均匀地分配到不同的分区中。// 每次调用 getAndIncrement() 方法获取计数器的当前值并自增然后对可用分区数取模得到该消息应该被分配到的分区编号。return counter.getAndIncrement() % size;}else{// 如果 keyBytes 不为 null表示该消息有 key此时采用 murmur2 哈希算法将 key 转换为一个整数值并对可用分区数取模得到该消息应该被分配到的分区编号。return Utils.toPositive(Utils.murmur2(keyBytes) % size);}}Overridepublic void close() {}Overridepublic void configure(MapString, ? map) {}
}② 显式地配置生产者端的参数 partitioner.class
public class CustomProducer01 {private static final String brokerList 10.65.132.2:9093;private static final String topic test;public static Properties initConfig(){Properties properties new Properties();properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerList);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 使用自定义分区器properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());return properties;}public static void main(String[] args) {// kafka生产者属性配置Properties properties initConfig();// kafka生产者发送消息默认是异步发送方式KafkaProducerString, String kafkaProducer new KafkaProducerString, String(properties);ProducerRecordString, String producerRecord new ProducerRecord(topic, 你好kafka,使用自定义分区器);kafkaProducer.send(producerRecord, new Callback() {Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(enull){System.out.println(recordMetadata发送的分区为recordMetadata.partition());}}});// 关闭资源kafkaProducer.close();}
}05. 默认分区器 DefaultPartitioner
ProducerRecord对象包含了主题名称、分区、记录的键和值。
① 如果发送的消息ProducerRecord指定了分区就直接使用该分区不会使用分区器
ProducerRecordString, String record new ProducerRecord(test,2,hh,你好);对应源码
public class KafkaProducerK, V implements ProducerK, V {// 如果消息记录具有分区则返回该值否则调用配置的分区器类来计算分区。private int partition(ProducerRecordK, V record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {// 如果记录中指定了分区则使用该分区,否则调用分区器计算分区 Integer partition record.partition();return partition ! null ?partition :partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);}
}② Kafka消息就是一个个的键–值对ProducerRecord对象可以只包含主题名称和值键默认情况下是null。不过大多数应用程序还是会用键来发送消息。键有两种用途一是作为消息的附加信息与消息保存在一起二是用来确定消息应该被写入主题的哪个分区。具有相同键的消息将被写入同一个分区。如果一个进程只从主题的某些分区读取数据那么具有相同键的所有记录都会被这个进程读取。要创建一个包含键和值的记录只需像下面这样创建一个ProducerRecord即可
ProducerRecordString, String record new ProducerRecord(test,hh,你好);如果键不为空并且使用了默认的分区器那么Kafka会对键进行哈希使用Kafka自己的哈希算法即使升级Java版本哈希值也不会发生变化然后根据哈希值把消息映射到特定的分区。这里的关键在于同一个键总是被映射到同一个分区所以在进行映射时会用到主题所有的分区而不只是可用的分区。这也意味着如果在写入数据时目标分区不可用那么就会出错。不过这种情况很少发生。
③ 如果要创建键为null的消息那么不指定键就可以了
ProducerRecordString, String record new ProducerRecord(test,你好);如果键为null并且使用了默认的分区器那么记录将被随机发送给主题的分区。分区器使用轮询调度(round-robin)算法将消息均衡地分布到各个分区中。从Kafka 2.4开始在处理键为null的记录时默认分区器使用的轮询调度算法具备了黏性。也就是说在切换到下一个分区之前它会将同一个批次的消息全部写入当前分区。这样就可以使用更少的请求发送相同数量的消息既降低了延迟又减少了broker占用CPU的时间。
如果使用了默认的分区器那么只有在不改变主题分区数量的情况下键与分区之间的映射才能保持一致。例如只要分区数量保持不变就可以保证用户045189的记录总是被写到分区34。这样就可以在从分区读取数据时做各种优化。但是一旦主题增加了新分区这个就无法保证了——旧数据仍然留在分区34但新记录可能被写到了其他分区。如果要使用键来映射分区那么最好在创建主题时就把分区规划好而且永远不要增加新分区。
/**默认的分区策略如下如果记录中指定了分区则使用该分区。如果未指定分区但存在键则根据键的哈希值选择一个分区。如果既没有指定分区也没有键则选择一个“粘性分区”当批处理满时更改。kafka 2.4 版本以后*/
public class DefaultPartitioner implements Partitioner {private final StickyPartitionCache stickyPartitionCache new StickyPartitionCache();public void configure(MapString, ? configs) {}public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());}public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,int numPartitions) {// 如果键为null则随机选择一个粘性分区 if (keyBytes null) {return stickyPartitionCache.partition(topic, cluster);}// 如果键不为空那么Kafka会对键进行哈希然后根据哈希值把消息映射到特定的分区。// 使用 MurmurHash2 算法计算给定字节数组的哈希值。return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}public void close() {}/*** 如果当前的粘性分区已经完成了一个批次更改粘性分区。 * 如果尚未确定粘性分区设置一个粘性分区。*/public void onNewBatch(String topic, Cluster cluster, int prevPartition) {stickyPartitionCache.nextPartition(topic, cluster, prevPartition);}
}用于粘性分区行为的缓存
/*** 一个内部类实现了用于粘性分区行为的缓存。该缓存跟踪任何给定主题的当前粘性分区。 */
public class StickyPartitionCache {// ConcurrentMap类型的indexCache成员变量用于存储主题和其对应的粘性分区。private final ConcurrentMapString, Integer indexCache;public StickyPartitionCache() {this.indexCache new ConcurrentHashMap();}// 获取给定主题的当前粘性分区。如果该主题的粘性分区尚未设置则返回下一个分区。public int partition(String topic, Cluster cluster) {Integer part indexCache.get(topic);if (part null) {return nextPartition(topic, cluster, -1);}return part;}// 获取给定主题的下一个粘性分区。 public int nextPartition(String topic, Cluster cluster, int prevPartition) {ListPartitionInfo partitions cluster.partitionsForTopic(topic);// 获取给定主题的粘性分区Integer oldPart indexCache.get(topic);Integer newPart oldPart;if (oldPart null || oldPart prevPartition) {// 如果没有可用分区则从所有分区列表中随机选择一个可用分区ListPartitionInfo availablePartitions cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() 1) {Integer random Utils.toPositive(ThreadLocalRandom.current().nextInt());newPart random % partitions.size();// 如果只有一个可用分区则选择该分区} else if (availablePartitions.size() 1) {newPart availablePartitions.get(0).partition();// 从可用分区列表中随机选择一个分区} else {while (newPart null || newPart.equals(oldPart)) {int random Utils.toPositive(ThreadLocalRandom.current().nextInt());newPart availablePartitions.get(random % availablePartitions.size()).partition();}}if (oldPart null) {indexCache.putIfAbsent(topic, newPart);} else {indexCache.replace(topic, prevPartition, newPart);}return indexCache.get(topic);}return indexCache.get(topic);}
}除了默认的分区器Kafka客户端还提供了RoundRobinPartitioner和UniformStickyPartitioner。在消息不包含键的情况下可以用它们来实现随机分区分配和黏性随机分区分配。对某些应用程序例如ETL应用程序会将数据从Kafka加载到关系数据库中并使用Kafka记录的键作为数据库的主键来说键很重要但如果负载出现了倾斜那么其中某些键就会对应较大的负载。这个时候可以用UniformStickyPartitioner将负载均衡地分布到所有分区。
06. 随机分区分配 RoundRobinPartitioner
RoundRobinPartitioner 分区器使用轮询调度(round-robin)算法将消息均衡地分布到各个分区中。在Kafka 2.4版本之前在处理键为null的记录时默认分区器使用的便是轮询调度算法。
轮询调度(round-robin)算法即顺序分配。比如一个主题下有 3 个分区那么第一条消息被发送到分区 0第二条被发送到分区 1第三条被发送到分区 2以此类推。当生产第 4 条消息时又会重新开始即将其分配到分区 0就像下面这张图展示的那样。 轮询策略有非常优秀的负载均衡表现它总是能保证消息最大限度地被平均分配到所有分区上。RoundRobinPartitioner源码
/*** The Round-Robin partitioner:当用户希望将写操作平均分配到所有分区时可以使用此分区策略。 */
public class RoundRobinPartitioner implements Partitioner {private final ConcurrentMapString, AtomicInteger topicCounterMap new ConcurrentHashMap();public void configure(MapString, ? configs) {}Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取该 topic 所有的分区ListPartitionInfo partitions cluster.partitionsForTopic(topic);int numPartitions partitions.size();int nextValue nextValue(topic);// 获取该 topic 所有可用的分区ListPartitionInfo availablePartitions cluster.availablePartitionsForTopic(topic);if (!availablePartitions.isEmpty()) {// 取模从可用的分区列表中获取分区// Utils.toPositive(nextValue) 的作用是将传入的参数 nextValue 转换为正数。// 如果 nextValue 是负数则返回 0否则返回 nextValue 的值。int part Utils.toPositive(nextValue) % availablePartitions.size();return availablePartitions.get(part).partition();} else {// no partitions are available, give a non-available partition// 取模从分区列表中获取分区return Utils.toPositive(nextValue) % numPartitions;}}private int nextValue(String topic) {// 在ConcurrentMap中插入一个键值对如果该键不存在则使用AtomicInteger的默认值0初始化值// 如果该键已经存在则返回与该键关联的AtomicInteger对象。AtomicInteger counter topicCounterMap.computeIfAbsent(topic, k - {return new AtomicInteger(0);});// 使用返回的AtomicInteger对象对值进行原子操作增加值return counter.getAndIncrement();}public void close() {}
}07. 黏性随机分区分配 UniformStickyPartitioner
黏性分区策略会随机选择一个分区并尽可能一直使用该分区待该分区的batch已满或者已完成时切换分区。
UniformStickyPartitioner 实现源码
/*** The partitioning strategy:如果记录中指定了分区则使用该分区否则选择粘性分区当批处理满时会更改该分区。*/
public class UniformStickyPartitioner implements Partitioner {private final StickyPartitionCache stickyPartitionCache new StickyPartitionCache();public void configure(MapString, ? configs) {}public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// StickyPartitionCache 获取粘性分区return stickyPartitionCache.partition(topic, cluster);}public void close() {}/*** 如果当前的粘性分区已经完成了一个批次请更改粘性分区。* 如果尚未确定粘性分区设置一个粘性分区。*/public void onNewBatch(String topic, Cluster cluster, int prevPartition) {stickyPartitionCache.nextPartition(topic, cluster, prevPartition);}
}08. 为什么Kafka 2.4 版本后引入黏性分区策略
① 提升发送性能减少碎片化发送请求
一般情况下一个Kafka Topic会有多个分区。Kafka Producer客户端在向服务端发送消息时需要先确认往哪个Topic的哪个分区发送。我们给同一个分区发送多条消息时Producer客户端将相关消息打包成一个Batch批量发送到服务端。Producer客户端在处理Batch时是有额外开销的。一般情况下小Batch会导致Producer客户端产生大量请求造成请求队列在客户端和服务端的排队并造成相关机器的CPU升高从而整体推高了消息发送和消费延迟。一个合适的Batch大小可以减少发送消息时客户端向服务端发起的请求次数在整体上提高消息发送的吞吐和延迟。
Batch机制Kafka Producer端主要通过两个参数进行控制
batch.size : 发往每个分区Partition的消息缓存量消息内容的字节数之和不是条数。达到设置的数值时就会触发一次网络请求然后Producer客户端把消息批量发往服务器。如果batch.size设置过小有可能影响发送性能和稳定性。建议保持默认值16384。单位字节。linger.ms : 每条消息在缓存中的最长时间。若超过这个时间Producer客户端就会忽略batch.size的限制立即把消息发往服务器。建议根据业务场景 设置linger.ms在100~1000之间。单位毫秒。
因此Kafka Producer客户端什么时候把消息批量发送至服务器是由batch.size和linger.ms共同决定的。您可以根据具体业务需求进行调整。为了提升发送的性能保障服务的稳定性 建议您设置batch.size16384和linger.ms1000。
② 只有发送到相同分区的消息才会被放到同一个Batch中因此决定一个Batch如何形成的一个因素是Kafka Producer端设置的分区策略。 Kafka Producer允许通过设置Partitioner的实现类来选择适合自己业务的分区。
在消息指定Key的情况下Kafka Producer的默认策略是对消息的Key进行哈希然后根据哈希结果选择分区保证相同Key的消息会发送到同一个分区。
在消息没有指定Key的情况下Kafka 版2.4版本之前的默认策略是循环使用主题的所有分区将消息以轮询的方式发送到每一个分区上。但是这种默认策略Batch的效果会比较差在实际使用中可能会产生大量的小Batch从而使得生产者延迟增加。鉴于该默认策略对无Key消息的分区效率低问题Kafka 在2.4版本引入了黏性分区策略Sticky Partitioning Strategy。
黏性分区策略主要解决无Key消息分散到不同分区造成小Batch问题。其主要策略是如果一个分区的Batch完成后就随机选择另一个分区然后后续的消息尽可能地使用该分区。这种策略在短时间内看会将消息发送到同一个分区如果拉长整个运行时间消息还是可以均匀地发布到各个分区上的。这样可以避免消息出现分区倾斜同时还可以降低延迟提升服务整体性能。
如果您使用的 KafkaProducer客户端是2.4及以上版本在未指定分区和消息键的情况下默认的分区策略就采用黏性分区策略。如果您使用的Producer客户端版本小于2.4可以根据黏性分区策略原理自行实现分区策略然后通过参数partitioner.class设置指定的分区策略。
关于黏性分区策略实现可以参考如下代码实现该代码的实现逻辑主要是根据一定的时间间隔切换一次分区。
public class MyStickyPartitioner implements Partitioner {// 记录上一次切换分区时间。private long lastPartitionChangeTimeMillis 0L;// 记录当前分区。private int currentPartition -1;// 分区切换时间间隔可以根据实际业务选择切换分区的时间间隔。private long partitionChangeTimeGap 100L;public void configure(MapString, ? configs) {}public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 获取所有分区信息。ListPartitionInfo partitions cluster.partitionsForTopic(topic);int numPartitions partitions.size();if (keyBytes null) {ListPartitionInfo availablePartitions cluster.availablePartitionsForTopic(topic);int availablePartitionSize availablePartitions.size();// 判断当前可用分区。if (availablePartitionSize 0) {handlePartitionChange(availablePartitionSize);return availablePartitions.get(currentPartition).partition();} else {handlePartitionChange(numPartitions);return currentPartition;}} else {// 对于有key的消息根据key的哈希值选择分区。return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}}private void handlePartitionChange(int partitionNum) {long currentTimeMillis System.currentTimeMillis();// 如果超过分区切换时间间隔则切换下一个分区否则还是选择之前的分区。if (currentTimeMillis - lastPartitionChangeTimeMillis partitionChangeTimeGap|| currentPartition 0 || currentPartition partitionNum) {lastPartitionChangeTimeMillis currentTimeMillis;currentPartition Utils.toPositive(ThreadLocalRandom.current().nextInt()) % partitionNum;}}public void close() {}
}