当前位置: 首页 > news >正文

深度网网站建设排超联赛积分榜

深度网网站建设,排超联赛积分榜,网站目录链接怎么做,湖南厦门网站优化消费者位移 消费者位移这一节介绍了消费者位移的基本概念和消息格式,本节我们来聊聊消费位移的提交。 Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费…
消费者位移

消费者位移这一节介绍了消费者位移的基本概念和消息格式,本节我们来聊聊消费位移的提交。

Consumer 需要向 Kafka 汇报自己的位移数据,这个汇报过程被称为提交位移(Committing Offsets)。因为 Consumer 能够同时消费多个分区的数据,所以位移的提交实际上是在分区粒度上进行的,即 Consumer 需要为分配给它的每个分区提交各自的位移数据。

提交位移主要是为了记录Consumer 的消费进度,这样当 Consumer 发生重启之后,就能够从 Kafka 中读取之前提交的位移,从而继续消费,避免以避免重复消费,或消息丢失等。换句话说,位移提交是 Kafka 提供给你的一个工具或语义保障,你负责维持这个语义保障,即如果你提交了位移 X,那么 Kafka 会认为所有位移值小于 X 的消息你都已经成功消费了。

因为位移提交非常灵活,你完全可以提交任何位移值。假设你的 Consumer 消费了 10 条消息,你提交的位移值却是 20,那么从理论上讲就丢失了10条数据;相反地,如果你提交的位移值是 5,那么就重复消费5条数据。所以你对位移提交的管理直接影响了你的 Consumer 所能提供的消息语义保障。

位移提交

从使用角度来说位移提交分为自动提交和手动提交;从 Consumer 的角度来说,位移提交分为同步提交和异步提交。

自动提交

默认情况下就是自动提交,你根本无需关心位移提交的事情,Consumer 端有个参数 enable.auto.commit默认值是 true,即 Consumer 默认自动提交位移的。还有个参数auto.commit.interval.ms,默认值是 5 秒,即每 5 秒会为你自动提交一次位移。

这里我们用一段简单的代码来看看这两个参数怎么使用

Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "kafka_test");// 自动提交props.put("enable.auto.commit", "true");// 间隔2秒  props.put("auto.commit.interval.ms", "2000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("topic"));while (true) {ConsumerRecords<String, String> records = consumer.poll(100);for (ConsumerRecord<String, String> record : records) {// process}}
手动提交

设置 enable.auto.commit 为 false,还需要调用相应的 API 手动提交位移,KafkaConsumer.commitSync()。

// props.put("enable.auto.commit", "false");
while (true) {ConsumerRecords<String, String> records =consumer.poll(Duration.ofSeconds(1));// 处理消息process(records); try {// 同步提交consumer.commitSync();} catch (CommitFailedException e) {handle(e); // 处理提交失败异常}
}

commitSync()有一个缺陷,提交时Consumer 程序会处于阻塞状态,在生产系统中,因为程序而非资源限制而导致的阻塞都可能是系统的瓶颈,会影响整个应用程序的 TPS。虽然也可以选择拉长提交间隔,但这样做的后果是 Consumer 的提交频率下降,在下次 Consumer 重启回来后,会有更多的消息被重新消费。鉴于这个问题,Kafka 提供了另一个 异步API 方法:KafkaConsumer.commitAsync()。

不过commitAsync 的问题在于,出现问题时它不会自动重试。因为它是异步操作,倘若提交失败后自动重试,那么它重试时提交的位移值可能早已经“过期”或不是最新值了。因此,异步提交的重试其实没有意义,所以 commitAsync 是不会重试的。

我们可以将 commitSync 和 commitAsync 组合使用以规避这样的问题:

   try {while(true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));process(records); // 异步提交规避阻塞commitAysnc(); }
} catch(Exception e) {} finally {try {// 使用同步阻塞式提交兜底consumer.commitSync(); } finally {consumer.close();
}
}

同时使用了 commitSync() 和 commitAsync()。对于常规性、阶段性的手动提交,我们调用 commitAsync() 避免程序阻塞,而在 Consumer 要关闭前,我们调用 commitSync() 方法执行同步阻塞式的位移提交,以确保 Consumer 关闭前能够保存正确的位移数据。将两者结合后,我们既实现了异步无阻塞式的位移管理,也确保了 Consumer 位移的正确性,如果你自行编写代码开发一套 Kafka Consumer 应用,可以尝试使用上面的代码范例来实现手动的位移提交。

其实还有一种更高级的提交方式,就是分批量提交,就不再这里展开,留给大家查资料学习,也欢迎各位同学在评论区交流讨论!

http://www.hkea.cn/news/922183/

相关文章:

  • 广州网站建设兼职网站为什么要做seo
  • 中企动力官网 网站怎么在平台上做推广
  • 教育培训网站建设方案广告宣传费用一般多少
  • 计算机网站设计论文营销排名seo
  • 源码资源国内专业seo公司
  • 丽水微信网站建设报价免费精准客源
  • 广东建设工程中标公示网站google搜索引擎优化
  • 南宁老牌网站建设公司正版google下载
  • 网站做信用认证有必要吗微信朋友圈推广平台
  • 电子政务网站建设要求百度关键词规划师
  • 博客网站开发毕设免费大数据分析网站
  • 深圳教育平台网站建设好消息疫情要结束了
  • 国外设计文章的网站淘宝代运营靠谱吗
  • 市桥网站建设sem论坛
  • 猎头公司是做什么的可靠吗排名优化外包公司
  • 扶贫网站建设关键词查询神器
  • 沈阳酒店企业网站制作公司2023年9月疫情又开始了吗
  • 厦门专业网站建设如何快速推广一个新产品
  • 帮人做传销网站违法吗seo网站排名助手
  • 如何做优品快报下的子网站营销型网站建设目标
  • 用织梦做网站调用乱码营业推广是什么意思
  • 做走私网站北京口碑最好的it培训机构
  • 网站建设OA系统开发it培训机构哪家好
  • 网站运维可以做哪些域名查询网站入口
  • 网站开发的基本语言外贸平台自建站
  • 女生自己做网站营销方法有哪些
  • 怎么自己做网站吓别人金融网站推广圳seo公司
  • 彩票网站的客服有做吗海淀seo搜索优化多少钱
  • 河源哪有做网站网页模板设计
  • 手机网站可以做英文版本吗近三天时政热点