当前位置: 首页 > news >正文

网站模板制作流程哪些平台可以打小广告

网站模板制作流程,哪些平台可以打小广告,做网站会什么软件,一条龙 有哪些服务Kafka 消费组位移消费者 API命令行Kafka : 基于日志结构(log-based)的消息引擎 消费消息时,只是从磁盘文件上读取数据,不会删除消息数据位移数据能由消费者控制,能很容易修改位移的值,实现重复消费历史数据…

Kafka 消费组位移

  • 消费者 API
  • 命令行

Kafka : 基于日志结构(log-based)的消息引擎

  • 消费消息时,只是从磁盘文件上读取数据,不会删除消息数据
  • 位移数据能由消费者控制,能很容易修改位移的值,实现重复消费历史数据

重设位移的两个维度 :

  • 位移维度 : 根据位移值重设 : 把消费者的位移值重设成自定义位移值
  • 时间维度 : 将位移调整成 > 该时间的最小位移

7 种重设策略 :

维度策略含义
位移维度Earliest位移调整当前最早位移处
Latest位移调整到当前最新位移处
Current位移调整到当前最新提交位移处
Specified-Offset位移调整成指定位移
Shift-By-N位移调整到当前位移 + N 处
时间维度DataTime位移调整到大于给定时间的最小位移处
Duration位移调整到离当前时间指定间隔的位移处

Earliest 策略 : 将位移调整到主题当前最早位移处

  • 不一定是 0,很久的消息会被 Kafka 自动删除,所以当前最早位移可能是 > 0 的值
  • 当要重新消费主题的所有消息,就用 Earliest 策略

Latest 策略 : 把位移重设成最新末端位移

  • 当向某个主题发送 15 条消息,最新末端位移是 15
  • 当要跳过所有历史消息,从最新的消息处开始消费,就用 Latest 策略

Current 策略 : 将位移调整成消费者当前提交的最新位移

  • 修改消费者代码,并重启消费者,结果发现代码有问题,回滚前的代码变更,还把位移重设为消费者重启时的位置

Specified-Offset 策略 : 消费者把位移值调整到你指定的位移处

  • 消费者处理某条错误消息时 , 能手动跳过此消息的处理
  • 生产中 , 出现 corrupted 消息无法被消费时,消费者会抛出异常,无法继续工作

Specified-Offset 策略 : 指定位移的绝对数值

Shift-By-N 策略 : 指定位移的相对数值 : 跳过一段消息的距离

  • 把位移重设成当前位移的前 100 条位移处,指定 N 为 -100

DateTime : 指定一个时间,将位移重置到该时间之后的最早位移处

  • 使用场景 : 重新消费昨天的数据,并使用该策略重设位移到昨天 0 点

Duration 策略 : 相对的时间间隔,将位移调整到距离当前给定时间间隔的位移处,格式是 PnDTnHnMnS

  • Java 8 引入 Duration : 以字母 P 开头,后面由 4 部分组成 (D、H、M、S : 天、小时、分钟、秒)
  • 将位移调回到 15 分钟前,就指定 PT0H15M0S

消费者 API

Earliest 策略 :

Properties consumerProperties = new Properties();
// 禁止自动提交位移
consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
// 重设的消费者组的组 ID
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);String topic = "test";  // 要重设位移的 Kafka 主题 
try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {consumer.subscribe(Collections.singleton(topic));consumer.poll(0);// 一次性构造主题的所有分区对象consumer.seekToBeginning( consumer.partitionsFor(topic).stream().map(partitionInfo ->new TopicPartition(topic, partitionInfo.partition())).collect(Collectors.toList()));
} 

Latest 策略 :

consumer.seekToEnd(consumer.partitionsFor(topic).stream().map(partitionInfo -> new TopicPartition(topic, partitionInfo.partition())).collect(Collectors.toList()));

Current 策略 : 获取当前提交的最新位移 :

consumer.partitionsFor(topic).stream().map(info -> new TopicPartition(topic, info.partition())).forEach(tp -> {long committedOffset = consumer.committed(tp).offset();consumer.seek(tp, committedOffset);
});

Specified-Offset 策略 :

long targetOffset = 1234L;for (PartitionInfo info : consumer.partitionsFor(topic)) {TopicPartition tp = new TopicPartition(topic, info.partition());consumer.seek(tp, targetOffset);
}

实现 Shift-By-N 策略

for (PartitionInfo info : consumer.partitionsFor(topic)) {TopicPartition tp = new TopicPartition(topic, info.partition());// 假设向前跳 123 条消息long targetOffset = consumer.committed(tp).offset() + 123L; consumer.seek(tp, targetOffset);
}

DateTime 策略 : 重设位移到 2022 年 12 月 11 日晚上 8 点

long ts = LocalDateTime.of(2022, 12, 11, 20, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();Map<TopicPartition, Long> timeToSearch = consumer.partitionsFor(topic).stream().map(info -> new TopicPartition(topic, info.partition())).collect(Collectors.toMap(Function.identity(), tp -> ts));for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : consumer.offsetsForTimes(timeToSearch).entrySet()) {consumer.seek(entry.getKey(), entry.getValue().offset());
}

Duration 策略 : 将位移调回 30 分钟前

Map<TopicPartition, Long> timeToSearch = consumer.partitionsFor(topic).stream().map(info -> new TopicPartition(topic, info.partition())).collect(Collectors.toMap(Function.identity(), tp -> System.currentTimeMillis() - 30 * 1000  * 60));for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : consumer.offsetsForTimes(timeToSearch).entrySet()) {consumer.seek(entry.getKey(), entry.getValue().offset());
}

命令行

Earliest 策略 :

bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--all-topics --to-earliest –execute

Latest 策略 :

bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--all-topics --to-latest --execute

Current 策略 :

bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--all-topics --to-current --execute

Specified-Offset 策略 :

bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--all-topics --to-offset <offset> --execute

Shift-By-N 策略 :

bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--shift-by <offset_N> --execute

DateTime 策略 :

bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--to-datetime 2019-06-20T20:00:00.000 --execute

Duration 策略 :

bin/kafka-consumer-groups.sh \
--bootstrap-server kafka-host:port \
--group test-group --reset-offsets \
--by-duration PT0H30M0S --execute
http://www.hkea.cn/news/580363/

相关文章:

  • 广东海外建设监理有限公司官方网站2345网址导航是病毒吗
  • 深圳网站制作培训宁波网络营销公司
  • 网站建设方案书 模板长清区seo网络优化软件
  • 简述网站的推广策略产品设计
  • 商贸有限公司网站建设此网站服务器不在国内维护
  • 常州个人做网站制作小程序的软件
  • 郑州做网站公司dz论坛如何seo
  • 十堰商城网站建设网络营销seo优化
  • 小欢喜林磊儿什么网站做家教福州seo推广外包
  • 许昌网站开发博客营销
  • 做网站用jquery爱站网关键词挖掘
  • wordpress手动裁剪seo营销推广服务公司
  • 英文网站建设网站海南网站制作公司
  • 网页设计与网站建设主要内容软文营销的特点
  • 一起做网站17广州最新小组排名
  • 最专业的网站设计公司有哪些论坛企业推广
  • 单页网站怎么做外链个人网页
  • 宁波城乡住房建设局网站有效的网络推广
  • 网站建设 深圳销售crm客户管理系统
  • 高端网站开发设计站长之家字体
  • 免费网站建站工具购买域名的网站
  • 淘宝联盟怎么做网站百度网站提交
  • 前端做用vue做后台多还是做网站多青岛网站快速排名优化
  • 岳阳网站开发公司海淀区seo多少钱
  • 2017年做网站维护总结百度搜索软件
  • 南京网站建设公司点击器原理
  • 网站怎么编辑搜狗网站提交入口
  • 自建网站做外贸的流程广告推广方式
  • 警告欺骗网站模板免费注册
  • 获取网站访客信息seo分析师招聘