网站网上商城建设方案,软件设计师教程,网站怎么快速收录,北京王府井在哪个区1 前言
在Spring Kafka中#xff0c;可以通过配置拦截器来实现对生产者和消费者消息的拦截。拦截器可以用来记录日志、修改消息等等。
2 基于Kafka管理的拦截器
Kafka原生提供的拦截器接口是org.apache.kafka.clients.producer.ProducerInterceptor和 org.apache.kafka.cli…1 前言
在Spring Kafka中可以通过配置拦截器来实现对生产者和消费者消息的拦截。拦截器可以用来记录日志、修改消息等等。
2 基于Kafka管理的拦截器
Kafka原生提供的拦截器接口是org.apache.kafka.clients.producer.ProducerInterceptor和 org.apache.kafka.clients.consumer.ConsumerInterceptor, 示例如下
2.1 定义拦截器
生产者拦截器
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;public class CustomProducerInterceptor implements ProducerInterceptorString, String {Overridepublic ProducerRecordString, String onSend(ProducerRecordString, String record) {// 在发送消息之前操作System.out.println(Sending message: record.value());return record; // 继续发送}Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}Overridepublic void close() {// 资源清理}Overridepublic void configure(MapString, ? configs) {// 可以在这里获取配置}
}2.2 定义消费者拦截器
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;public class CustomConsumerInterceptor implements ConsumerInterceptorString, String {Overridepublic void configure(MapString, ? configs) {// 配置拦截器}Overridepublic ConsumerRecordsString, String onConsume(ConsumerRecordsString, String records) {// 处理接收到的消息records.forEach(record - {System.out.println(Consumed message: record.value());});return records;}Overridepublic void onCommit(MapTopicPartition, OffsetAndMetadata offsets) {}Overridepublic void close() {// 资源清理}
}2.3 添加拦截器
方式一通过工厂自定义器设置拦截器
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;Component
public class KafkaProducerCustomizer implements DefaultKafkaProducerFactoryCustomizer, DefaultKafkaConsumerFactoryCustomizer {Overridepublic void customize(DefaultKafkaProducerFactory?, ? producerFactory) {producerFactory.updateConfigs(Map.of(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName()));}Overridepublic void customize(DefaultKafkaConsumerFactory?, ? consumerFactory) {consumerFactory.updateConfigs(Map.of(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomConsumerInterceptor.class.getName()));}
}方式二通过配置设置拦截器
spring:kafka:producer:properties:interceptor.classes: org.example.kafka.CustomProducerInterceptorconsumer:properties:interceptor.classes: org.example.kafka.CustomConsumerInterceptor2.4 拦截器使用Spring容器中的Bean
上面的方法可以看到拦截器由于没有在Spring容器中管理则无法使用容器中其他Bean来做业务处理那么可以另外一种策略达到让拦截器受Spring容器管理的需求, 已消息生产者拦截器为例 Bean定义
Component
public class MyComponent {public void checkMessage(String message) {System.out.println(Sending message: message);}
}生产者拦截器
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;public class CustomProducerInterceptor implements ProducerInterceptorString, String {private MyComponent myComponent;Overridepublic ProducerRecordString, String onSend(ProducerRecordString, String record) {myComponent.checkMessage(record.value());return record; // 继续发送}Overridepublic void onAcknowledgement(RecordMetadata metadata, Exception exception) {}Overridepublic void close() {// 资源清理}Overridepublic void configure(MapString, ? configs) {myComponent configs.get(myComponent);}
}设置拦截器
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaProducerFactoryCustomizer;
import org.springframework.boot.autoconfigure.kafka.DefaultKafkaConsumerFactoryCustomizer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;Component
public class KafkaProducerCustomizer implements DefaultKafkaProducerFactoryCustomizer {Autowiredprivate MyComponent myComponent;Overridepublic void customize(DefaultKafkaProducerFactory?, ? producerFactory) {producerFactory.updateConfigs(Map.of(myComponent, myComponent,ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName()));}
}3 基于Spring-Kafka管理的拦截器
基于Kafka管理的拦截器对于消费消息的拦截只能做到批量消费级别ConsumerRecords如果要对单条消息拦截可以使用Spring-Kafka提供的org.springframework.kafka.listener.RecordInterceptor接口。
3.1 单条消息拦截接口定义
由于此拦截器是受Spring容器管理的所以可以通过Component注解自动注入到容器中进行自动拦截。
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.RecordInterceptor;Component
public class CustomRecordInterceptor implements RecordInterceptorObject, Object {Overridepublic ConsumerRecordObject, Object intercept(ConsumerRecordObject, Object record) {System.out.println(record.topic());return record;}
}