4. Spring Boot Kafka Integration

Contents

1. Getting Started Example
2. Complete YAML Configuration
3. Key Configuration Notes
   1. Producer optimization parameters
   2. Consumer reliability configuration
   3. Advanced listener features
   4. Security authentication configuration
4. Configuration Verification Methods
5. Configuration Templates for Different Scenarios
   Scenario 1: High-throughput log collection
   Scenario 2: Financial-grade transactional messages
   Scenario 3: Cross-data-center synchronization
6. Advanced Configuration
   1. Transaction support
   2. Message retry and dead-letter queue
1. Getting Started Example
1. Maven dependency
```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```
2. KafkaProducer (message producer)
```java
import java.util.HashMap;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class KafkaProducer {

    // Fallback store for messages that failed to send
    private HashMap<String, String> map = new HashMap<>();

    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate;

    public void send(String topic, String msg) {
        log.info("Sending message, topic: {}; message: {}", topic, msg);
        ListenableFuture<SendResult<Integer, String>> send = kafkaTemplate.send(topic, msg);
        // Delivery acknowledgment callback
        send.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                log.info("Message sent successfully, topic: {}; message: {}", topic, msg);
            }

            @Override
            public void onFailure(Throwable ex) {
                // Persist the failed message (e.g. to a database) for later retry
                map.put(topic, msg);
            }
        });
    }
}
```

Spring Boot 3.x style:
```java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import lombok.RequiredArgsConstructor;

@Service
@RequiredArgsConstructor
public class KafkaProducerService {

    private final KafkaTemplate<String, String> kafkaTemplate;

    // Send with a confirmation callback (whenComplete registers a callback; it does not block)
    public void sendMessageSync(String topic, String key, String value) {
        kafkaTemplate.send(topic, key, value).whenComplete((result, ex) -> {
            if (ex == null) {
                System.out.printf("Send succeeded: topic=%s, partition=%d, offset=%d%n",
                        result.getRecordMetadata().topic(),
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
            } else {
                System.err.println("Send failed: " + ex.getMessage());
            }
        });
    }

    // Asynchronous fire-and-forget send (the default behavior)
    public void sendMessageAsync(String topic, String message) {
        kafkaTemplate.send(topic, message);
    }
}
```
Spring Boot 2.x: send() returns ListenableFuture&lt;SendResult&gt; and supports addCallback() callbacks. Spring Boot 3.x: send() returns CompletableFuture&lt;SendResult&gt; (ListenableFuture is deprecated), so you use the CompletableFuture API instead, e.g. whenComplete().
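If the caller genuinely needs to block until the broker acknowledges the write, the CompletableFuture can be awaited explicitly. A minimal sketch for Spring Boot 3.x; the timeout value and class name are illustrative:

```java
import java.util.concurrent.TimeUnit;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;

public class BlockingSendExample {

    // Blocks the calling thread until the send is acknowledged or the timeout expires
    public static SendResult<String, String> sendBlocking(
            KafkaTemplate<String, String> template, String topic, String msg) throws Exception {
        return template.send(topic, msg).get(10, TimeUnit.SECONDS);
    }
}
```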
3. KafkaConsumer (message consumption)
```java
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class KafkaConsumer {

    private List<String> exist = new ArrayList<>();

    @KafkaListener(topics = {"lx"}, groupId = "lx")
    public void consumer(ConsumerRecord<Integer, String> record) {
        // Naive idempotency check against an in-memory list
        if (exist.contains(record.value())) {
            log.error("Idempotency check failed: duplicate message");
            return;
        }
        log.info("Message consumed: topic={}, message={}", record.topic(), record.value());
        exist.add(record.value());
    }
}
```

A consumer using manual offset commits:

```java
import java.util.List;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumerService {

    // Single-record consumption with manual offset commit
    @KafkaListener(topics = "test-topic", groupId = "spring-group")
    public void listenMessage(String message, Acknowledgment ack) {
        System.out.println("Received message: " + message);
        ack.acknowledge(); // manual commit
    }

    // Batch consumption; requires listener.type=batch (see the sketch after this block)
    @KafkaListener(topics = "batch-topic", groupId = "spring-group")
    public void listenBatch(List<String> messages, Acknowledgment ack) {
        messages.forEach(msg -> System.out.println("Batch message: " + msg));
        ack.acknowledge();
    }
}
```
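Besides the YAML switch (spring.kafka.listener.type: batch), batch delivery can also be enabled with a container factory bean. A sketch, where the factory name batchFactory is illustrative and would be referenced via @KafkaListener(containerFactory = "batchFactory"):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class KafkaListenerConfig {

    // Container factory that delivers records to listeners in batches
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> batchFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true); // equivalent to spring.kafka.listener.type=batch
        // Manual, immediate offset commits to match the Acknowledgment parameter above
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
```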
4. YAML configuration
Producer configuration:
```yaml
# Kafka configuration
spring:
  kafka:
    # Kafka cluster address
    # bootstrap-servers: 192.168.25.100:9092
    bootstrap-servers: 47.122.26.22:9092
    producer:
      # Batch size for batched sends (bytes)
      batch-size: 1
      # Maximum buffer memory available for sends
      buffer-memory: 33554432
      # Key serializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value serializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      properties:
        # How long to wait before a batch is sent (ms)
        linger.ms: 1
        # Prevent the producer from triggering topic auto-creation
        allow.auto.create.topics: false
      # -1 (= all): a send counts as successful only after all replicas have persisted it
      acks: -1
```
Consumer configuration:
```yaml
spring:
  kafka:
    # Kafka cluster address
    bootstrap-servers: 192.168.25.100:9092
    consumer:
      enable-auto-commit: false
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        session.timeout.ms: 15000
        # Prevent topic auto-creation on metadata requests
        allow.auto.create.topics: false
      group-id: test
      auto-offset-reset: earliest
    listener:
      ack-mode: manual_immediate # precise control over offset commits
      concurrency: 3 # number of concurrent consumers
      type: batch
```
5. Entity class
```java
import lombok.Data;

@Data
public class KafkaRequest {
    /** Topic */
    private String topic;
    /** Message */
    private String message;
}
```

6. Sending messages
```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import lombok.extern.slf4j.Slf4j;

@RestController
@Slf4j
public class KafkaController {

    private final String topic = "lx";
    private int temp = 1;

    @Autowired
    private KafkaProducer producer;

    /**
     * Place an order
     *
     * @param kafkaRequest carries the topic and message
     */
    @RequestMapping("/test01")
    public void test01(KafkaRequest kafkaRequest) {
        log.info("test01 succeeded, topic: {}; message: {}",
                kafkaRequest.getTopic(), kafkaRequest.getMessage());
        producer.send(kafkaRequest.getTopic(), kafkaRequest.getMessage());
    }

    @RequestMapping("/test02")
    public void test02(KafkaRequest kafkaRequest) {
        log.info("test02 succeeded, topic: {}; message: {}", topic, temp);
        producer.send(topic, String.valueOf(temp));
        temp++;
    }
}
```
Kafka startup command:
```bash
./kafka-server-start.sh ../config/server.properties
```

2. Complete YAML Configuration
```yaml
spring:
  kafka:
    # Basic configuration (required)
    bootstrap-servers: localhost:9092 # Kafka cluster address; separate multiple nodes with commas
    client-id: spring-boot-app # client identifier, useful for log tracing

    # Producer configuration
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # key serializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer # value serializer
      acks: all # acknowledgment mode; all = every replica confirms (highest reliability)
      retries: 5 # retries on send failure; pair with idempotence
      batch-size: 16384 # batch buffer size in bytes
      buffer-memory: 33554432 # producer memory buffer (default 32MB)
      compression-type: snappy # compression algorithm (alternatives: gzip/lz4/zstd)
      transaction-id-prefix: tx- # prefix required for transactions; pair with @Transactional
      properties:
        linger.ms: 50 # send delay (ms) to improve throughput

    # Consumer configuration
    consumer:
      group-id: app-consumer-group # consumer group id; members of a group share partitions
      auto-offset-reset: earliest # policy when no offset exists: earliest (from start) / latest (newest)
      enable-auto-commit: false # disable auto offset commit (manual commit recommended)
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 500 # max records per poll (avoids OOM)
      fetch-max-wait: 500 # max time (ms) the consumer waits for the broker to return data
      isolation-level: read_committed # transactional isolation: read_committed / read_uncommitted

    # Listener configuration (advanced tuning)
    listener:
      type: single # listener type: single (one record) / batch
      ack-mode: manual # offset commit mode: manual / batch
      concurrency: 3 # consumer threads; ideally equal to the partition count
      poll-timeout: 3000 # poll timeout (ms)

    # Message retry and dead-letter queue (fault tolerance)
    retry:
      topic:
        attempts: 3 # maximum retry attempts
        initial-interval: 1000 # initial retry interval (ms)
        multiplier: 2.0 # backoff multiplier (exponential backoff)
        dead-letter-topic: dlq-${topic} # dead-letter topic naming rule (auto-created)

    # Security protocol (enterprise scenarios)
    properties:
      security.protocol: SASL_PLAINTEXT # e.g. PLAINTEXT / SASL_SSL
      sasl.mechanism: PLAIN # SASL mechanism
      ssl.truststore.location: /path/to/truststore.jks

# Custom business configuration (not standard Kafka properties)
app:
  kafka:
    topics:
      input-topic: user-events # business input topic
      output-topic: processed-events # business output topic
```
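The app.kafka.topics block at the end is not read by Kafka itself; it only becomes usable if the application binds it. A minimal sketch of such a binding, where the class name KafkaTopicsProperties is illustrative:

```java
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import lombok.Data;

// Binds the custom app.kafka.topics block from the YAML above
@Data
@Component
@ConfigurationProperties(prefix = "app.kafka.topics")
public class KafkaTopicsProperties {
    private String inputTopic;  // app.kafka.topics.input-topic
    private String outputTopic; // app.kafka.topics.output-topic
}
```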
3. Key Configuration Notes
1. Producer optimization parameters
| Parameter | Description | Recommendation |
| --- | --- | --- |
| `acks=all` | Ensures every ISR replica persists the write, preventing data loss | Required for high-reliability scenarios |
| `compression-type=snappy` | Cuts network bandwidth usage and raises throughput | Enable when message bodies exceed about 1KB |
| `transaction-id-prefix` | Enables atomic writes across partitions; pair with the @Transactional annotation | Required for financial/transactional business |
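Since retries is only safe when the producer is idempotent (otherwise a retry can write duplicates), idempotence can be enabled explicitly when building the producer factory. A sketch with illustrative connection settings:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class ProducerFactoryConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");              // required for idempotence
        props.put(ProducerConfig.RETRIES_CONFIG, 5);
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // retries cannot create duplicates
        return new DefaultKafkaProducerFactory<>(props);
    }
}
```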
2. Consumer reliability configuration
| Parameter | Description | Notes |
| --- | --- | --- |
| `enable-auto-commit=false` | Prevents data loss in the case where processing fails but the offset was already committed | Requires calling ack.acknowledge() manually |
| `isolation-level=read_committed` | Consumes only committed transactional messages | Must be paired with the producer's transaction configuration |
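With enable-auto-commit=false and a manual ack mode, the commit point becomes the listener's decision. A sketch of the resulting pattern, where the topic, group, and process() method are illustrative:

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class ReliableConsumer {

    @KafkaListener(topics = "orders", groupId = "order-group")
    public void onMessage(String message, Acknowledgment ack) {
        try {
            process(message);
            ack.acknowledge(); // commit the offset only after successful processing
        } catch (Exception e) {
            // Offset stays uncommitted: the record is re-consumed after a restart
            // or rebalance; for immediate retries configure an error handler instead.
        }
    }

    private void process(String message) { /* hypothetical business logic */ }
}
```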
3. Advanced listener features
| Parameter | Use case | Notes |
| --- | --- | --- |
| `type=batch` | Batch consumption for higher throughput | Suited to log processing and other workloads with relaxed real-time requirements |
| `concurrency=3` | Number of concurrent consumers | Keep equal to the topic's partition count to avoid wasted resources (see the sketch below) |
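Because concurrency beyond the partition count leaves listener threads idle, it helps to declare the topic with a matching partition count. A sketch using spring-kafka's TopicBuilder; the topic name is illustrative:

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class TopicConfig {

    // Declares the topic so its partition count matches listener.concurrency (3)
    @Bean
    public NewTopic processedEventsTopic() {
        return TopicBuilder.name("processed-events")
                .partitions(3) // matches listener.concurrency above
                .replicas(1)
                .build();
    }
}
```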
4. Security authentication configuration
```yaml
spring:
  kafka:
    properties:
      security.protocol: SASL_SSL
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="secret";
```

A must for enterprise use: production environments should enable SSL encryption and SASL authentication.

4. Configuration Verification Methods
- Startup check: bind the configuration to a bean with @ConfigurationProperties(prefix = "spring.kafka") and verify the injected values in a unit test (see the sketch after this list).
- Log monitoring: enable DEBUG logging and watch the producer/consumer connection state:

```yaml
logging:
  level:
    org.springframework.kafka: DEBUG
```

- AdminClient tooling: inspect topic metadata programmatically (see the code at the end of this section).
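A minimal sketch of the first check, asserting against Spring Boot's auto-configured KafkaProperties; the test class name and asserted value are illustrative:

```java
import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class KafkaConfigBindingTest {

    @Autowired
    private KafkaProperties kafkaProperties;

    @Test
    void producerAcksIsBoundFromYaml() {
        // Matches "acks: all" from the YAML above
        assertThat(kafkaProperties.getProducer().getAcks()).isEqualTo("all");
    }
}
```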
```java
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaAdmin;

@Autowired
private KafkaAdmin kafkaAdmin;

public void checkTopic() throws Exception {
    // Build an AdminClient from the auto-configured KafkaAdmin settings
    try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
        Map<String, TopicDescription> topics = adminClient
                .describeTopics(List.of("user-events"))
                .allTopicNames().get(); // use .all() on kafka-clients 2.x
        topics.values().forEach(System.out::println);
    }
}
```

5. Configuration Templates for Different Scenarios
Scenario 1: High-throughput log collection
```yaml
producer:
  compression-type: lz4
  batch-size: 65536
  properties:
    linger.ms: 100
consumer:
  auto-offset-reset: latest
  enable-auto-commit: true # trade a small risk of data loss for performance
```

Scenario 2: Financial-grade transactional messages
```yaml
producer:
  acks: all
  retries: 10
  transaction-id-prefix: fin-tx-
consumer:
  isolation-level: read_committed
  enable-auto-commit: false
```

Scenario 3: Cross-data-center synchronization
```yaml
spring:
  kafka:
    bootstrap-servers: kafka-dc1:9092,kafka-dc2:9092
    properties:
      client.dns.lookup: use_all_dns_ips # resolve all IPs behind a DNS name
      reconnect.backoff.ms: 1000 # reconnect backoff policy
```

6. Advanced Configuration
1. Transaction support
```java
// Configure the transaction manager
@Bean
public KafkaTransactionManager<String, String> transactionManager(
        ProducerFactory<String, String> producerFactory) {
    return new KafkaTransactionManager<>(producerFactory);
}

// Send within a transaction: both records commit or roll back together
@Transactional
public void sendWithTransaction() {
    kafkaTemplate.send("topic1", "msg1");
    kafkaTemplate.send("topic2", "msg2");
}
```

2. Message retry and dead-letter queue
```yaml
spring:
  kafka:
    listener:
      retry:
        max-attempts: 3
        backoff:
          initial-interval: 1000
          multiplier: 2.0
        dead-letter-topic: my-dlt-topic # dead-letter queue
```

Note that these retry keys are not standard spring.kafka.listener properties; in current spring-kafka versions, retry and dead-lettering are usually declared per listener, as sketched below.
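Declarative, non-blocking retry with automatic dead-letter routing is available through spring-kafka's @RetryableTopic annotation. A sketch, where the topic, group, and process() method are illustrative:

```java
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;

@Component
public class RetryingConsumer {

    // 3 attempts with exponential backoff; failures flow through
    // auto-created retry topics, then to a dead-letter topic
    @RetryableTopic(attempts = "3", backoff = @Backoff(delay = 1000, multiplier = 2.0))
    @KafkaListener(topics = "orders", groupId = "order-group")
    public void listen(String message) {
        process(message); // throwing here triggers a retry
    }

    @DltHandler
    public void handleDlt(String message) {
        System.err.println("Dead-lettered: " + message);
    }

    private void process(String message) { /* hypothetical business logic */ }
}
```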