当前位置: 首页 > news >正文

广州天美展览公司网站受欢迎的网站建设案例

广州天美展览公司网站,受欢迎的网站建设案例,上海互联网公司,做网站优化的价格Kafka事务 消息中间件的消息保障的3个级别 At most once 至多一次。数据丢失。At last once 至少一次。数据冗余Exactly one 精准一次。好#xff01;#xff01;#xff01; 如何区分只要盯准提交位移、消费消息这两个动作的时机就可以了。 当#xff1a;先消费消息、…Kafka事务 消息中间件的消息保障的3个级别 At most once 至多一次。数据丢失。At last once 至少一次。数据冗余Exactly one 精准一次。好 如何区分只要盯准提交位移、消费消息这两个动作的时机就可以了。 当先消费消息、再提交位移。 如果提交位移这一步挂了就会再消费一遍消息。重复消费》〉》至少一次 当先提交位移、再消费消息。 提议位移成功、消费消息失败那么数据就丢失了》〉》至多一次 如何精准一次呢 幂等和事务 幂等 对接口的多次调用所产生的结果和一次调用的结果是一样的。 即第一次调用中途挂了再次调用一次调用 为true 如何实现 在v2版本的消息存储格式用有两个字段。produce_id简称pid 、first sequence 每个新的生产者实例在初始化的时候都会被分配一个pid每个pid消息发送到每一个分区都有序列号 sequence序列号会从0开始递增每发送一条消息PID,分区 对应的序列号的值会➕1。这个序列号值SN在broker的内存中维护。只有当SN_newSN_old1. broker才会接收这个消息。 如SN_new SN_old1 说明消息重复了这个消息可以直接丢掉。 如SN_newSN_old1 说明消息丢失了有数据还没有卸写入。抛乱序异常OutOforderSequenceException。 即用序列号来保证消息的顺序消费。 注意 所记录的这个序列号是针对 每一对PID,分区 所以这个幂等实现的是单会话、单分区的。 如何保证多个分区之间的幂等性呢 事务 保证对多个分区写入操作的原子性要么全部成功、要么全部失败。将应用程序的生产消息、消费消息、提交消费位移当作原子操作来处理。 用户显示指定一个事务id transactionalId。这个事务id是唯一的 从生产者角度来考虑事务保证了生产者会话消息的幂等发送 和跨生产者会话的事务恢复. 生产者会话消息的幂等发送如有有两个相同事务id的生产者新的创建了 旧的就会被kill某个生产者实例宕机了新的生产者实例可以保证未完成的旧事务要么被提交 要没被中断 实现过程以consume-transform-produce为例。 package com.hzbank.yjj.transaction;import com.hzbank.yjj.producer.CustomerPartitioner; import com.hzbank.yjj.producer.ProducerlnterceptorPrefix; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer;import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Properties;public class TransactionConsumeTransformProduce {public static final String brokerList localhost:9092;public static Properties getConsumerProps(){Properties props new Properties();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());props.put(ConsumerConfig.GROUP_ID_CONFIG,groupId);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);return props;}public static Properties getProducerProps(){Properties props new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,transactionalId);return props;}public static void main(String[] args) {//初始化生产者和消费者KafkaConsumerString, String consumer new KafkaConsumer(getConsumerProps());consumer.subscribe(Collections.singletonList(topic-source));KafkaProducerString, String producer new KafkaProducer(getProducerProps());//初始化事务producer.initTransactions();while (true){ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000));if(!records.isEmpty()){HashMapTopicPartition, OffsetAndMetadata offsets new HashMap();//开启事务producer.beginTransaction();try {for (TopicPartition partition : records.partitions()) {ListConsumerRecordString, String partitionRecords records.records(partition);for (ConsumerRecordString, String record : partitionRecords) {System.out.println(获取到了topic-source发送过来的数据record.value());System.out.println(do some );ProducerRecordString, String producerRecord new ProducerRecord(topic-sink, record.key(), record.value());producer.send(producerRecord);}// 获取最近一次的消费位移long lastConsumedOffset partitionRecords.get(partitionRecords.size() - 1).offset();offsets.put(partition,new OffsetAndMetadata(lastConsumedOffset1));}//提交消费位移producer.sendOffsetsToTransaction(offsets,groupId);//提交事务producer.commitTransaction();} catch (ProducerFencedException e) {System.out.println(异常了);producer.abortTransaction();}}}}} 1. 找到TransactionCoordinator。 TransactionCoordinator负责分配和管理事务。 FindCoordinatorRequest 发送请求找到TransactionCoordinator所在的broker节点。返回其对应的node_id、 host、 port 信息 transactionalId 的哈希值计算主题_transaction_state 中的分区编号 根据分区leader副本找到所在的broker节点极为Transaction Coordinator节点 2. 获取pid 通过InitProducerIdRequest向TransactionCoordinator 获取pid 为当前生产者分配一个pid。 String transactionalId; 事务id int transactionTimeoutMs; 事务状态更新超时时间 3. 保存pid TransactionCoordinator 第一次收到事务id会和对应pid保存下来以消息事务日志消息的形式保存到主题_transaction_state中实现持久化 InitProducerIdRequest还会出发一下任务 - 增加pid对应的producer_epoch.具有相同 PID 但 producer_epoch 小 于该 producer_叩och 的其他生产者新开启的事务将被拒绝 。 - 恢复( Commit)或中止( Ab。此)之前的生 产 者未完成的 事务4. 开启事务 通过 KafkaProduc町的 beginTransaction方法。调用该方法后生产者本 地会标记己经开启了 一个新的事务 只有在生产者发送第一条消息之后 TransactionCoordinator 才会认为该事务 己经开启 。 5. Consume-Transform-Produce 整个事务处理数据。 AddPartitionsToTxnRequest让 TransactionCoordinator 将transactionld, TopicPartition的对应关系存储在主题 transaction state 中 ProduceRequest生产者通过 ProduceRequest 请求发送消息( ProducerBatch)到用户 自定义主题中 AddOffsetsToTxnRequestTransactionCoordinator 收到这个AddOffsetsToTxnRequest请求通过 groupId 来推导出在一consumer_offsets 中的分区 TxnOffsetCommitRequest发送 TxnOffsetCommitRequest 请求给 GroupCoordinator从而将本次事务中 包含的消费位移信息 offsets 存储到主题 consumer offsets 中 6. 提交或者终止事务 KafkaProducer 的 commitTransaction()方法或 abortTransaction()方法。 写不下去了暂时就先理解这么多了后面再多结合源码去看看。 参考书籍《深入理解 Kafka:核心设计与实践原理》
http://www.hkea.cn/news/14332901/

相关文章:

  • 网站定制开发一般多久wordpress中文附件乱码
  • 艺术网站建设dede大气企业网站模板(一屏展示)
  • 网站网址模板企业手机网站建设公司
  • 网页设计最牛的网站建设wap网站开发公司
  • 宿迁网站网站建设西安短视频培训
  • 网站ftp上传工具哪个好用vps lnmp wordpress
  • 深圳企业社保网站官网黄页是什么东西
  • 大眼睛网站建设长沙模板网站长沙网站建设
  • 网站建设有必要做手机端吗外贸快车
  • 北京著名网站建设公司网站桌面图标怎么做
  • 便宜的广州网站建设服务上杭建设局网站
  • 吴江区经济开发区建设工程网站网页设计需求分析
  • 关于单位网站建设的免费购物平台都有哪些
  • 新视网站建设联系qq玉屏网络推广公司
  • 化妆品品牌网站建设学做网站培训
  • 专业网站设计的公司wordpress做企业网站
  • 网站 app 哪个先做网站建设与维护学什么科目
  • 如何建设网站子页零基础学做网站教程
  • 番禺网站开发哪家好铜川公司做网站
  • 网站设计的主题wordpress 评论后
  • 网络推广建立站点如何建立wordpress 添加下载按钮
  • 东莞专业做网站的公司行业数据分析网站
  • 白石洲附近做网站公司免费咨询骨科医生回答在线
  • akm建站系统宁乡网站建设公司
  • 海南网站建设设计海口网站模板系统
  • 简单网站建设山东临沂网站推广
  • 潜江公司做网站网站开发环境包括什么
  • 网站开发文件结构组成十大新零售公司
  • 自己做的网站 打开了没有图片深圳市宝安区网站建设
  • 福州市高速公路建设指挥部网站建设125摩托车价格及图片