网站建设 管理规范,比较还做的调查网站,怎么制作学校网站,黑马程序员教程目录 一、自动提交offset的相关参数二、消费者#xff08;自动提交 offset#xff09;代码示例 一、自动提交offset的相关参数 官网文档 参数解释 参数描述enable.auto.commi默认值为 true#xff0c;消费者会自动周期性地向服务器提交偏移量。auto.commit.interval.ms如果… 目录 一、自动提交offset的相关参数二、消费者自动提交 offset代码示例 一、自动提交offset的相关参数 官网文档 参数解释 参数描述enable.auto.commi默认值为 true消费者会自动周期性地向服务器提交偏移量。auto.commit.interval.ms如果设置了 enable.auto.commit 的值为 true 则该值定义了消费者偏移量向 Kafka 提交的频率默认 5s。 图解分析
二、消费者自动提交 offset代码示例 消费者自动提交 offset代码 // 自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
// 提交时间间隔 1秒
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);消费者自动提交 offset代码完整代码 package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumerAutoOffset {public static void main(String[] args) {// 配置Properties properties new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092);// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,test3);// 自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);// 提交时间间隔 1秒properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);// 1 创建一个消费者 , helloKafkaConsumerString, String kafkaConsumer new KafkaConsumer(properties);// 2 订阅主题 firstArrayListString topics new ArrayList();topics.add(sevenTopic);kafkaConsumer.subscribe(topics);// 3 消费数据while (true){ConsumerRecordsString, String consumerRecords kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecordString, String consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}