新乡市封丘县建设局网站,响应式网页设计简单,seo管理系统易语言,网站背景图片代码Producer是Kakfa模型中生产者组件#xff0c;也就是Kafka架构中数据的生产来源#xff0c;虽然其整体是比较简单的组件#xff0c;但依然有很多细节需要细品一番。比如Kafka的Producer实现原理是什么#xff0c;怎么发送的消息#xff1f;IO通讯模型是什么#xff1f;在实…Producer是Kakfa模型中生产者组件也就是Kafka架构中数据的生产来源虽然其整体是比较简单的组件但依然有很多细节需要细品一番。比如Kafka的Producer实现原理是什么怎么发送的消息IO通讯模型是什么在实际工作中怎么调优来实现高效性
简单的生产者程序 一、客户端初始化 KafkaProducer
new KafkaProducer() 是Producer初始化过程比如Interceptor、Serializer、Partitioner、RecordAccumulator等。当我们使用KafkaProducer发送消息的时候消息会经过拦截器Interceptor、序列化器Serializer和分区器Partitioner最后会暂存到消息收集器RecordAccumulator中最终读取按批次发送。
以下跟踪比较核心的机制流程
1、 初始化RecordAccumulator记录累加器
简单介绍RecordAccumulator可以理解为Producer发送数据缓冲区Producer数据发送时并不会直接连接Broker后一条一条的发送而是会将数据Record放入RecordAccumulator中按批次发送。
2、初始化Sender的Iothread在Producer在初始化过程中会额外的创建一个ioThread。 二、Send方法
到此位置Kafka只是做了一些初始化的工作没有与kafka集群建立连接更没有相关元数据信息。那继续看send中的doSend方法。
private FutureRecordMetadata doSend(ProducerRecordK, V record, Callback callback) {TopicPartition tp null;try {try {// waitOnMetadata更新元数据clusterAndWaitTime this.waitOnMetadata(record.topic(),record.partition(), this.maxBlockTimeMs);} catch (KafkaException var19) {Cluster cluster clusterAndWaitTime.cluster;........byte[] serializedKey;try {// 序列化serializedKey this.keySerializer.serialize(record.topic(), record.headers(), record.key());} catch (ClassCastException var18) {........byte[] serializedValue;try {serializedValue this.valueSerializer.serialize(record.topic(), record.headers(), record.value());} // 获取元数据中partition信息int partition this.partition(record, serializedKey, serializedValue, cluster);// ..........// 数据append到accumulator中RecordAppendResult result this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs);..........return result.future;
1、WaitonMetadat元数据更新
方法内部会优先判断当前Cluster是否存在元数据partitions如果不存在意味着还没有建立连接获取元数据此时它会wakeup唤醒sender线程。 注意此时Cluster并不是完全时空的它已经有指定的Node列表信息。 在早期版本的时候元数据是存储在zookeeper中的 元数据指的是集群中分区信息、节点信息、以及节点、主题、分区的映射关系等。在生产者启动的时候没有元数据的支撑是无法进行数据的发送的等于瞎子。但是zookeeper存储元数据在并发场景下会对zookeeper产生网卡压力那就意味着要保障Kakfa可靠性的前提就要保障zookeeper的可靠性。
所以在1.0版本之后Kafka将元数据维护在了Broker节点中。Producer可以通过Borker获取元数据减少对zookeeper的依赖。只有一些核心的内容交给zookeeper做分布式协调。
2、Sender线程run方法
Sender线程中run方法一个while(runing)这是一中Loop过程一种常见的响应式编程方式比如Redis服务中也是一种EventLoop事件轮询过程。 其内部核心方法NetWorkClient.poll实现了客户端连接、数据发送、事件处理工作。 metadataUpdater.maybeUpdate方法在第一次被执行时因为没有元数据节点信息会执行this.maybeUpdate(now, node)方法方法内部实现了initiateConnect方法用于客户端建立连接其底层就是使用的Java Nio的Selector多路复用器。 建立连接之后,nioSelector.select()等待事件响应。
之后触发handleCompletedReceives处理器进行元数据同步过程。
注意: 在完成元数据更新以后metadata.update会调用 this.notifyAll()唤醒阻塞的main线程进行数据发送工作。 到此为止主线程waitOnMetadata方法完成元数据的更新。
之后main就开始处理Serializer序列化获取partition元数据信息以及数据发送工作。
3、RecordAccumulator 记录累加器
生产者在发送数据时并不是建立连接后每消息发送的而是会将消息按批次发送。RecordAccumulator 对象中batches会为每一个TopicPartition维护一个双端队列。用于缓存record数据。
ConcurrentMapTopicPartition, DequeProducerBatch batches
TopicPartition 主题分区缓冲区按照主题分为不同的双端队列
DequeProducerBatch 双端队列ProducerBatch一批次的数据多个数据默认容量16k 结构如下 生产者在往batches中添加数据时使用了Sychronized所以Producer在多线程场景下是线程安全的。 为什么要有RecordAccumulator
RecordAccumulator的主要作用是暂存Main Thread发送过来的消息然后Sender Thread就可以从RecordAccumulator中批量的获取到消息减少单个消息获取的请求次数减少网卡IO压力提升性能效率。
相关参数配置以及调优点
1、RecordAccumulator buffer.memory默认大小32mb
指每一个new KafkaProducer中RecordAccumulator的batches所有承载的最大buffer.memory32mb。 设置方式properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 3210241024);
如果RecordAccumulator 缓存空间满时会进行阻塞等待数据被消费如果指定时间内消息没有发送除去即仍然是满状态则抛出异常默认60s。
设置方式properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60*1000);
优化点 根据业务需求如果TopicPartition较多而数据量很大时这是及时单个TopicPartition中batche很少可能总的容量也超过32mb这时可以扩大buffer_memery大小。
2、Kafka中单个batche大小默认为16k。
指每个batche大小为16k。batche用于存储数据Record。
如果record 16k则batche可以存储多个数据此时batche空间是会被重复利用的。
如果record 16k则当前record会额外申请存储空间使用完后销毁。 优化点batche大小需要根据业务评估不要有过多大record存在确保每一个batche可以容纳record尽量减少内存空间的频繁申请和销毁以及内存碎片化。 3、同步阻塞和非阻塞的选择
RecordAccumulator用于支持分批次发送数据。在KafkaProducer中send方法是异步接口通过 send.get()方法可以使其阻塞等待数据返回。
实现同步的发送数据需要等待kafka接收了record后响应producer才会进行下一个record发送。此时虽然会有更高一致性但RecordAccumulator就失去了意义。
非阻塞send情况下当生产和消费端IO不对称时可以通过LINGRE_MS_CONFG 30 来要求sender线程每次拉取RecordAccumulator中数据时等待一段时间再拉取尽量确保按批次拉取减少更多的网络IO。
设置方式properties.put(ProducerConfig.LINGRE_MS_C0NFG , 0);
继续内容分析
到此当Main线程将数据append到RecordAccumulator容器后其核心的工作就结束了此时它也会调用sender.wakeup告知已经有数据需要处理了并确保sender线程不会select阻塞住。
Sender线程是一个Loop过程在发送数据过程中会从RecordAccumulator中拉取批次数据进行打包发送并不是一个个batche发送。默认封装的包大小为1mb。
设置方式pp.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG , String.valueOf(1 * 1024 * 1024))
Sender线程在真正发送数据前还额外存储了Request数据到InFilgntRequest飞行中的包InFilgntRequest 默认大小为5意思是指生产者向kafka发送5个包request后都没有回应时则停止发送变成阻塞状态。
这种设计在同步发送过程没有作用的因为同步过程是每请求返回的。 SEND_BUFFER_CONFIG 发送缓冲区配置、RECEIVE_BUFFER_CONFIG 接收缓冲区配置这两个就是IO层的缓冲区配置了不同的操作系统可能不一样。设置成-1代表默认使用系统分配大小。
pp.setProperty(ProducerConfig.SEND_BUFFER_CONFIG , String.valueOf(32 * 1024));
pp.setProperty(ProducerConfig.RECEIVE_BUFFER_CONFIG , String.valueOf(32 * 1024));
查看内核默认配置 到此Producer整个数据发送流程机制就清楚了Ack的设定涉及到Broker数据同步和Consumer消费状态这块单独再进行分析。
总结一下
1、Producer的实现是由Main线程和Sender线程组合完成的。
Main线程核心完成了数据的输入、Producer初始化和数据append到RecordAccumulator工作具体的元数据的更新、数据发送等IO操作都是都Sender线程完成。
Sender线程工作模式是中间件中比较常见的响应式编程模式。其在Loop过程中进行客户端连接、元数据更新、数据打包发送等工作。
2、Kakfa中IO操作封装了Java中Nio的实现Selector底层是多路复用器的实现而不是netty。
3、Producer发送数据过程并不是简单的一条一条数据发送其内部封装RecordAccumulator、Batche、Request包可以实现按批次发送数据减少IO次数。同时结合FilghtRequest飞行中请求大小限制确保kafka未正常响应时抛出异常防止数据丢失。在开发过程中可以通过调整参数来达到优化目的。