当前位置: 首页 > news >正文

新乡市封丘县建设局网站响应式网页设计简单

新乡市封丘县建设局网站,响应式网页设计简单,seo管理系统易语言,网站背景图片代码Producer是Kakfa模型中生产者组件#xff0c;也就是Kafka架构中数据的生产来源#xff0c;虽然其整体是比较简单的组件#xff0c;但依然有很多细节需要细品一番。比如Kafka的Producer实现原理是什么#xff0c;怎么发送的消息#xff1f;IO通讯模型是什么#xff1f;在实…Producer是Kakfa模型中生产者组件也就是Kafka架构中数据的生产来源虽然其整体是比较简单的组件但依然有很多细节需要细品一番。比如Kafka的Producer实现原理是什么怎么发送的消息IO通讯模型是什么在实际工作中怎么调优来实现高效性 简单的生产者程序 一、客户端初始化  KafkaProducer new KafkaProducer() 是Producer初始化过程比如Interceptor、Serializer、Partitioner、RecordAccumulator等。当我们使用KafkaProducer发送消息的时候消息会经过拦截器Interceptor、序列化器Serializer和分区器Partitioner最后会暂存到消息收集器RecordAccumulator中最终读取按批次发送。 以下跟踪比较核心的机制流程 1、 初始化RecordAccumulator记录累加器 简单介绍RecordAccumulator可以理解为Producer发送数据缓冲区Producer数据发送时并不会直接连接Broker后一条一条的发送而是会将数据Record放入RecordAccumulator中按批次发送。 2、初始化Sender的Iothread在Producer在初始化过程中会额外的创建一个ioThread。 二、Send方法 到此位置Kafka只是做了一些初始化的工作没有与kafka集群建立连接更没有相关元数据信息。那继续看send中的doSend方法。 private FutureRecordMetadata doSend(ProducerRecordK, V record, Callback callback) {TopicPartition tp null;try {try {// waitOnMetadata更新元数据clusterAndWaitTime this.waitOnMetadata(record.topic(),record.partition(), this.maxBlockTimeMs);} catch (KafkaException var19) {Cluster cluster clusterAndWaitTime.cluster;........byte[] serializedKey;try {// 序列化serializedKey this.keySerializer.serialize(record.topic(), record.headers(), record.key());} catch (ClassCastException var18) {........byte[] serializedValue;try {serializedValue this.valueSerializer.serialize(record.topic(), record.headers(), record.value());} // 获取元数据中partition信息int partition this.partition(record, serializedKey, serializedValue, cluster);// ..........// 数据append到accumulator中RecordAppendResult result this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs);..........return result.future; 1、WaitonMetadat元数据更新 方法内部会优先判断当前Cluster是否存在元数据partitions如果不存在意味着还没有建立连接获取元数据此时它会wakeup唤醒sender线程。 注意此时Cluster并不是完全时空的它已经有指定的Node列表信息。 在早期版本的时候元数据是存储在zookeeper中的 元数据指的是集群中分区信息、节点信息、以及节点、主题、分区的映射关系等。在生产者启动的时候没有元数据的支撑是无法进行数据的发送的等于瞎子。但是zookeeper存储元数据在并发场景下会对zookeeper产生网卡压力那就意味着要保障Kakfa可靠性的前提就要保障zookeeper的可靠性。 所以在1.0版本之后Kafka将元数据维护在了Broker节点中。Producer可以通过Borker获取元数据减少对zookeeper的依赖。只有一些核心的内容交给zookeeper做分布式协调。 2、Sender线程run方法 Sender线程中run方法一个while(runing)这是一中Loop过程一种常见的响应式编程方式比如Redis服务中也是一种EventLoop事件轮询过程。 其内部核心方法NetWorkClient.poll实现了客户端连接、数据发送、事件处理工作。 metadataUpdater.maybeUpdate方法在第一次被执行时因为没有元数据节点信息会执行this.maybeUpdate(now, node)方法方法内部实现了initiateConnect方法用于客户端建立连接其底层就是使用的Java Nio的Selector多路复用器。 建立连接之后,nioSelector.select()等待事件响应。 之后触发handleCompletedReceives处理器进行元数据同步过程。 注意: 在完成元数据更新以后metadata.update会调用 this.notifyAll()唤醒阻塞的main线程进行数据发送工作。 到此为止主线程waitOnMetadata方法完成元数据的更新。 之后main就开始处理Serializer序列化获取partition元数据信息以及数据发送工作。 3、RecordAccumulator 记录累加器 生产者在发送数据时并不是建立连接后每消息发送的而是会将消息按批次发送。RecordAccumulator 对象中batches会为每一个TopicPartition维护一个双端队列。用于缓存record数据。 ConcurrentMapTopicPartition, DequeProducerBatch batches TopicPartition 主题分区缓冲区按照主题分为不同的双端队列 DequeProducerBatch 双端队列ProducerBatch一批次的数据多个数据默认容量16k 结构如下 生产者在往batches中添加数据时使用了Sychronized所以Producer在多线程场景下是线程安全的。 为什么要有RecordAccumulator RecordAccumulator的主要作用是暂存Main Thread发送过来的消息然后Sender Thread就可以从RecordAccumulator中批量的获取到消息减少单个消息获取的请求次数减少网卡IO压力提升性能效率。 相关参数配置以及调优点 1、RecordAccumulator buffer.memory默认大小32mb 指每一个new KafkaProducer中RecordAccumulator的batches所有承载的最大buffer.memory32mb。 设置方式properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 3210241024); 如果RecordAccumulator 缓存空间满时会进行阻塞等待数据被消费如果指定时间内消息没有发送除去即仍然是满状态则抛出异常默认60s。 设置方式properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60*1000); 优化点 根据业务需求如果TopicPartition较多而数据量很大时这是及时单个TopicPartition中batche很少可能总的容量也超过32mb这时可以扩大buffer_memery大小。 2、Kafka中单个batche大小默认为16k。 指每个batche大小为16k。batche用于存储数据Record。 如果record 16k则batche可以存储多个数据此时batche空间是会被重复利用的。 如果record 16k则当前record会额外申请存储空间使用完后销毁。 优化点batche大小需要根据业务评估不要有过多大record存在确保每一个batche可以容纳record尽量减少内存空间的频繁申请和销毁以及内存碎片化。 3、同步阻塞和非阻塞的选择 RecordAccumulator用于支持分批次发送数据。在KafkaProducer中send方法是异步接口通过 send.get()方法可以使其阻塞等待数据返回。 实现同步的发送数据需要等待kafka接收了record后响应producer才会进行下一个record发送。此时虽然会有更高一致性但RecordAccumulator就失去了意义。 非阻塞send情况下当生产和消费端IO不对称时可以通过LINGRE_MS_CONFG 30 来要求sender线程每次拉取RecordAccumulator中数据时等待一段时间再拉取尽量确保按批次拉取减少更多的网络IO。 设置方式properties.put(ProducerConfig.LINGRE_MS_C0NFG , 0); 继续内容分析 到此当Main线程将数据append到RecordAccumulator容器后其核心的工作就结束了此时它也会调用sender.wakeup告知已经有数据需要处理了并确保sender线程不会select阻塞住。 Sender线程是一个Loop过程在发送数据过程中会从RecordAccumulator中拉取批次数据进行打包发送并不是一个个batche发送。默认封装的包大小为1mb。 设置方式pp.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG , String.valueOf(1 * 1024 * 1024)) Sender线程在真正发送数据前还额外存储了Request数据到InFilgntRequest飞行中的包InFilgntRequest 默认大小为5意思是指生产者向kafka发送5个包request后都没有回应时则停止发送变成阻塞状态。 这种设计在同步发送过程没有作用的因为同步过程是每请求返回的。 SEND_BUFFER_CONFIG 发送缓冲区配置、RECEIVE_BUFFER_CONFIG 接收缓冲区配置这两个就是IO层的缓冲区配置了不同的操作系统可能不一样。设置成-1代表默认使用系统分配大小。 pp.setProperty(ProducerConfig.SEND_BUFFER_CONFIG , String.valueOf(32 * 1024)); pp.setProperty(ProducerConfig.RECEIVE_BUFFER_CONFIG , String.valueOf(32 * 1024)); 查看内核默认配置 到此Producer整个数据发送流程机制就清楚了Ack的设定涉及到Broker数据同步和Consumer消费状态这块单独再进行分析。 总结一下 1、Producer的实现是由Main线程和Sender线程组合完成的。 Main线程核心完成了数据的输入、Producer初始化和数据append到RecordAccumulator工作具体的元数据的更新、数据发送等IO操作都是都Sender线程完成。 Sender线程工作模式是中间件中比较常见的响应式编程模式。其在Loop过程中进行客户端连接、元数据更新、数据打包发送等工作。 2、Kakfa中IO操作封装了Java中Nio的实现Selector底层是多路复用器的实现而不是netty。 3、Producer发送数据过程并不是简单的一条一条数据发送其内部封装RecordAccumulator、Batche、Request包可以实现按批次发送数据减少IO次数。同时结合FilghtRequest飞行中请求大小限制确保kafka未正常响应时抛出异常防止数据丢失。在开发过程中可以通过调整参数来达到优化目的。
http://www.hkea.cn/news/14486641/

相关文章:

  • asp.net 4.0网站开...荥阳郑州网站建设
  • 网站建设有什么出路系统开发师
  • 全国分类信息网站排名网站备案许可证号查询
  • 女子医院网站设计怎么做哈尔滨网站建设丿薇
  • 有哪些做婚品的网站开一个网站建设公司需要什么软件
  • 北京做商铺的网站抖音代运营带货费用怎么收费
  • 网站素材模板 站长新加坡网站域名
  • 南宁快速建站模板福建网站开发
  • 嘉兴网站建设需要多少钱vue做的网站影响收录么
  • 做视频广告在哪个网站能够赚钱开发商
  • 网站建设中遇到的问题免费网站空间10g
  • 昆山网站维护北京做彩右影影视公司网站
  • 做网站策划案手工制作月饼
  • 常州做网站价位客户管理系统排行榜
  • 软件论坛网站有哪些微山网站建设哪家便宜
  • 做网站如何容易被百度抓取百度查询网
  • 可以做淘宝店铺开关灯网站如何做网站电话
  • 百家号seo怎么做seo教程技术整站优化
  • 盛泽建设局网站建立免费个人网站
  • 找人做网站属于了解些什么呢合肥seo管理
  • 哪个网站可以做c 的项目怎样退订3d卫星街景会员费用
  • 人才网网站建设方案wordpress mce
  • 广州市哪有做网站的广州网站备案
  • 用模板做的网站多少钱如何做网站将数据上传
  • 网站开发语言选择服装设计公司排行榜
  • 表格如何做网站wordpress内容函数
  • 长沙雨花区建设局网站做网站网站代理怎么找客源
  • 鹤壁做网站公司电话wordpress 恋月
  • 四川鸿业建设集团网站成都专业做网站的公司
  • gudao网站建设建筑网站 法人签字