当前位置: 首页 > news >正文

网站产品怎么改顺序网站导航栏有哪些

网站产品怎么改顺序,网站导航栏有哪些,站长网网站模板,fixed wordpress4.SpringBoot集成Kafka 文章目录 4.SpringBoot集成Kafka1.入门示例2.yml完整配置3.关键配置注释说明1. 生产者优化参数2. 消费者可靠性配置3. 监听器高级特性4. 安全认证配置 4.配置验证方法5.不同场景配置模板场景1#xff1a;高吞吐日志收集场景2#xff1a;金融级事务消息…4.SpringBoot集成Kafka 文章目录 4.SpringBoot集成Kafka1.入门示例2.yml完整配置3.关键配置注释说明1. 生产者优化参数2. 消费者可靠性配置3. 监听器高级特性4. 安全认证配置 4.配置验证方法5.不同场景配置模板场景1高吞吐日志收集场景2金融级事务消息场景3跨数据中心同步 5.高级配置1.事务支持2.消息重试与死信队列 来源参考的deepseek如有侵权联系立删 1.入门示例 1.pom依赖 dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId /dependency 2.KafkaProducer消息生产者配置 Component Slf4j public class KafkaProducer {private HashMap mapnew HashMap();Autowiredprivate KafkaTemplateInteger,String kafkaTemplate;public void send(String topic,String msg){log.info(开始发送消息topic{};message{},topic,msg);ListenableFutureSendResultInteger,String sendkafkaTemplate.send(topic, msg);//消息确认机制send.addCallback(new ListenableFutureCallbackSendResultInteger,String(){Overridepublic void onSuccess(SendResultInteger, String result) {log.info(消息发送成功topic{};message{},topic,msg);}Overridepublic void onFailure(Throwable ex) {//落库操作map.put(topic,msg);}});} }springboot3.x写法 import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Service; import lombok.RequiredArgsConstructor;Service RequiredArgsConstructor public class KafkaProducerService {private final KafkaTemplateString, String kafkaTemplate;// 同步发送阻塞等待确认public void sendMessageSync(String topic, String key, String value) {kafkaTemplate.send(topic, key, value).whenComplete((result, ex) - {if (ex null) {System.out.printf(发送成功topic%s, partition%d, offset%d%n,result.getRecordMetadata().topic(),result.getRecordMetadata().partition(),result.getRecordMetadata().offset());} else {System.err.println(发送失败 ex.getMessage());}});}// 异步发送默认方式public void sendMessageAsync(String topic, String message) {kafkaTemplate.send(topic, message);} } Spring Boot 2.xsend() 返回 ListenableFutureSendResult支持 addCallback() 回调。Spring Boot 3.xsend() 返回 CompletableFutureSendResult弃用 ListenableFuture因此需要使用 CompletableFuture 的 API如 whenComplete。 3.KafkaConsumer消息消费 Component Slf4j public class KafkaConsumer {private ListString existnew ArrayList();KafkaListener(topics {lx},groupId lx)public void consumer(ConsumerRecordInteger,String record){if (exist.contains(record.value())){log.error(不满足幂等校验);}log.info(消息消费成功topic{}message{}, record.topic(), record.value());exist.add(record.value());} }import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.stereotype.Component;Component public class KafkaConsumerService {// 单个消息消费手动提交偏移量KafkaListener(topics test-topic, groupId spring-group)public void listenMessage(String message, Acknowledgment ack) {System.out.println(收到消息 message);ack.acknowledge(); // 手动提交}// 批量消费需配置 listener.typebatchKafkaListener(topics batch-topic, groupId spring-group)public void listenBatch(ListString messages, Acknowledgment ack) {messages.forEach(msg - System.out.println(批量消息 msg));ack.acknowledge();} } 4.yml配置文件 生产者配置 #kafka配置 spring:kafka:#kafka集群地址# bootstraps-server: 192.168.25.100:9092bootstrap-servers: 47.122.26.22:9092producer:#批量发送的数据量大小batch-size: 1#可用发送数量的最大缓存buffer-memory: 33554432#key序列化器key-serializer: org.apache.kafka.common.serialization.StringSerializer#value序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer#达到多少时间后会发送properties:linger.ms: 1# 禁止生产者触发 Topic 创建请求allow.auto.create.topics: false#代表集群中从节点都持久化后才认为发送成功acks: -1 消费者配置 spring:kafka:#kafka集群地址bootstraps-server: 192.168.25.100:9092consumer:enable-auto-commit: falsekey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:session.timeout.ms: 15000# 禁用生产者触发 Topic 元数据请求时自动创建allow.auto.create.topics: falsegroup-id: testauto-offset-reset: earliestlistener:ack-mode: manual_immediate # 精准控制offset提交concurrency: 3 # 并发消费者数type: batch 5.实体类 Data public class KafkaRequest {/*** 主题*/private String topic;/*** 消息*/private String message; }6.消息发送 RestController Slf4j public class KafkaController {private final String topiclx;private int temp1;Autowiredprivate KafkaProducer producer;/*** 下单** param kafkaRequest* return null*/RequestMapping(/test01)public void test01(KafkaRequest kafkaRequest){log.info(test01测试成功topic:{};message:{},kafkaRequest.getTopic(), kafkaRequest.getMessage());producer.send(kafkaRequest.getTopic(), kafkaRequest.getMessage());}RequestMapping(/test02)public void test02(KafkaRequest kafkaRequest){log.info(test02测试成功topic:{};message:{},topic, temp);producer.send(topic, String.valueOf(temp));temp;} } kafka启动方式 ./kafka-server-start.sh ../config/server.properties2.yml完整配置 spring:kafka:# 基础配置必填项bootstrap-servers: localhost:9092 # Kafka集群地址多节点用逗号分隔 client-id: spring-boot-app # 客户端标识日志追踪用# 生产者配置 producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer # 键序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer # 值序列化器acks: all # 消息确认机制all表示所有副本确认最高可靠性retries: 5 # 发送失败重试次数需配合幂等性使用batch-size: 16384 # 批量发送缓冲区大小单位字节linger-ms: 50 # 发送延迟等待时间毫秒提高吞吐量buffer-memory: 33554432 # 生产者内存缓冲区大小默认32MBcompression-type: snappy # 消息压缩算法可选gzip/lz4/zstdtransaction-id-prefix: tx- # 开启事务时需配置前缀需配合Transactional# 消费者配置 consumer:group-id: app-consumer-group # 消费者组ID同一组共享分区auto-offset-reset: earliest # 无Offset时策略earliest从头/latest最新enable-auto-commit: false # 关闭自动提交Offset推荐手动提交key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializermax-poll-records: 500 # 单次poll最大消息数避免OOMfetch-max-wait-ms: 500 # 消费者等待broker返回数据的最长时间isolation-level: read_committed # 事务消息隔离级别read_committed/read_uncommitted# 监听器配置高级优化listener:type: single # 监听器类型single单条/batch批量ack-mode: manual # Offset提交模式manual手动/batch批量提交concurrency: 3 # 消费者线程数建议等于分区数poll-timeout: 3000 # poll方法超时时间毫秒# 消息重试与死信队列容错机制retry:topic:attempts: 3 # 最大重试次数initial-interval: 1000 # 初始重试间隔毫秒multiplier: 2.0 # 重试间隔倍数指数退避dead-letter-topic: dlq-${topic} # 死信队列命名规则自动创建# 安全协议企业级场景properties:security.protocol: SASL_PLAINTEXT # 安全协议如PLAINTEXT/SASL_SSLsasl.mechanism: PLAIN # SASL认证机制ssl.truststore.location: /path/to/truststore.jks# 自定义业务配置非Kafka标准参数app:kafka:topics:input-topic: user-events # 业务输入Topicoutput-topic: processed-events # 业务输出Topic 3.关键配置注释说明 1. 生产者优化参数 参数说明推荐值acksall确保所有ISR副本写入成功防止数据丢失高可靠性场景必选compression-typesnappy减少网络带宽占用提升吞吐量消息体1KB时启用transaction-id-prefix支持跨分区原子性写入需配合Transactional注解金融交易类业务必配 2. 消费者可靠性配置 参数说明注意事项enable-auto-commitfalse避免消息处理失败但Offset已提交导致数据丢失需手动调用ack.acknowledge()isolation-levelread_committed只消费已提交的事务消息需与生产者事务配置联动 3. 监听器高级特性 参数使用场景示例typebatch批量消费提升吞吐量适用于日志处理等实时性要求低的场景concurrency3并发消费者数需与Topic分区数一致避免资源浪费 4. 安全认证配置 spring:kafka:properties:security.protocol: SASL_SSLsasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required usernameadmin passwordsecret;企业级必配生产环境需启用SSL加密SASL认证 4.配置验证方法 启动检查添加ConfigurationProperties(prefix spring.kafka)绑定配置到Bean通过单元测试验证注入值日志监控开启DEBUG日志观察生产者/消费者连接状态 logging:level:org.springframework.kafka: DEBUGAdminClient 工具通过编程方式检查Topic元数据 Autowired private KafkaAdminClient adminClient;public void checkTopic() {MapString, TopicDescription topics adminClient.describeTopics(user-events);topics.values().forEach(topic - System.out.println(topic)); }5.不同场景配置模板 场景1高吞吐日志收集 producer:compression-type: lz4batch-size: 65536linger-ms: 100 consumer:auto-offset-reset: latestenable-auto-commit: true # 允许少量数据丢失以换取性能场景2金融级事务消息 producer:acks: allretries: 10transaction-id-prefix: fin-tx- consumer:isolation-level: read_committedenable-auto-commit: false场景3跨数据中心同步 spring:kafka:bootstrap-servers: kafka-dc1:9092,kafka-dc2:9092properties:client.dns.lookup: use_all_dns_ips # 支持多IP解析reconnect.backoff.ms: 1000 # 断线重连策略5.高级配置 1.事务支持 // 配置事务管理器 Bean public KafkaTransactionManagerString, String transactionManager(ProducerFactoryString, String producerFactory) {return new KafkaTransactionManager(producerFactory); }// 使用事务发送 Transactional public void sendWithTransaction() {kafkaTemplate.send(topic1, msg1);kafkaTemplate.send(topic2, msg2); }2.消息重试与死信队列 spring:kafka:listener:retry:max-attempts: 3backoff:initial-interval: 1000multiplier: 2.0dead-letter-topic: my-dlt-topic # 死信队列
http://www.hkea.cn/news/14532430/

相关文章:

  • 宁夏网站建设多少钱微信商城有哪些
  • 专题类网站成都建设网站
  • 发展历程 网站建设《电子商务网站开发》实验报告
  • 简单的网站类型有哪些内容济宁住房和城乡建设局网站首页
  • 重庆门户网站达濠市政建设有限公司网站
  • 微商城网站建设多少钱全国十大物联网平台公司
  • 泰安网站建设公司带网站开发报价评估
  • 网站建网站建设和优网页设计网站名字
  • 湖北高端网站建设公司邮箱名称
  • 网站的安全维护什么是网络营销基本思想
  • 深圳市网站制作怎么网站建设多少钱
  • 不建网站如何做淘宝客做米业的企业网站
  • 迅睿cms建站教程做货代用的网站
  • 贵阳公司网站建设网站代运营方案
  • 免费单页网站模板wordpress网站视频播放
  • 动态倒计时网站模板软件开发模型有几种各有什么特点
  • 做高端品牌网站建设上海到北京
  • 网站研发公司寻找定制型网站建设
  • 自己做网站兼职滨海网站建设
  • 手机版做网站建站工具搭建网站
  • 东莞专业做淘宝网站推广scala做网站
  • 网站互动推广前后端分离实现网站开发
  • 简单的管理系统有哪些长沙seo霜天博客
  • 定西网站建设公司网页设计模板图片素材下载
  • 利用php做直播网站wordpress 磁贴主题
  • 购物网站如何建设陕西找人做网站多少钱
  • 网站设计基础语言不包括这些内容商务门户网站怎么做
  • 个人介绍网站怎么做免费的个人网页
  • 东莞网站开发营销最近新闻内容
  • 宁波网站建设seo公司网站建设设计