## Preface

A recent project needed to support multiple Kafka data sources. We could simply hard-code several sets of Kafka cluster logic, but what if we want to add clusters without touching the code? Kafka itself carries no extra business logic; it only does peak shaving and data transfer. So some architectural design is needed.
## Test setup
Kafka itself is easy to get started with. For unit testing, add the following dependencies (JDK 1.8 is used here; JDK 17 also works):

```xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <version>2.7.17</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <version>2.7.17</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.9.13</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <version>2.9.13</version>
        <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.testcontainers/kafka -->
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>kafka</artifactId>
        <version>1.20.1</version>
        <scope>test</scope>
    </dependency>
</dependencies>
```
## The producer and consumer
```java
@Component
public class KafkaProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String payload) {
        LOGGER.info("sending payload='{}' to topic='{}'", payload, topic);
        kafkaTemplate.send(topic, payload);
    }
}
```

```java
@Component
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    private String payload;

    @KafkaListener(topics = "${test.topic}")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("----------------received payload='{}'", consumerRecord.toString());
        payload = consumerRecord.toString();
    }

    public String getPayload() {
        return payload;
    }

    public void setPayload(String payload) {
        this.payload = payload;
    }
}
```
Then write a main method (any trivial Spring Boot entry point will do). The configuration:

```yaml
spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      group-id: mytest

test:
  topic: embedded-test-topic
```
Write a unit test:

```java
@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class DemoMainTest {

    @Autowired
    private KafkaConsumer consumer;

    @Autowired
    private KafkaProducer producer;

    @Value("${test.topic}")
    private String topic;

    @Test
    void embedKafka() throws InterruptedException {
        String data = "Sending with our own simple KafkaProducer";
        producer.send(topic, data);
        Thread.sleep(3000);
        assertThat(consumer.getPayload(), containsString(data));
        Thread.sleep(10000);
    }
}
```
The annotation

```java
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
```

mocks a Kafka broker directly. Its attributes can configure the number of brokers, the broker port, the ZooKeeper port, topics, partition counts, and so on. Under the hood it mocks a Kafka server using an embedded ZooKeeper and Kafka.
The unit test runs successfully.

## Approach
With the Kafka unit test in place, and given that Spring Boot can bind multiple configurations through a Map, multi-data-source Kafka looks trivial. But what if code must not change when a cluster is added? The producer can be created by hand; the consumer, however, is annotation-driven: the topic and other settings live in annotation attributes, and annotation attribute values are compile-time constants. So we need to either:

1. stop Spring Boot from auto-scanning, and instead scan and register the consumer beans ourselves according to configuration; or
2. generate the consumer beans via bytecode, so the annotation attributes can be derived from configuration.

This article does not consider putting the producers' and consumers' extra business logic here; the point is to handle Kafka uniformly, like a Kafka gateway (similar to an SLB or a Feign layer), because there is usually more than one Kafka cluster and more than one topic to dispatch.
## How the spring-kafka consumer works
The Kafka producer and consumer actually follow similar logic, but spring-kafka implements the consumer through annotations. If we used the native client's `KafkaConsumer`, we could accept parameters through a Map and implement the consuming loop ourselves. But spring-kafka already provides a lot of common plumbing we would otherwise have to write: the fetch/poll parameter handling, thread-pool management, and so on. Let's look at how a spring-kafka consumer is initialized.
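As a reference point for the raw-client route just mentioned: building per-route consumer settings is nothing more than filling a Map with standard Kafka consumer config keys. A minimal sketch (the broker address and group id below are made up for illustration):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of per-route consumer settings built by hand, as one would do with the
// plain Kafka client. The keys are standard Kafka consumer config names; the
// values are hypothetical. No Spring is involved here -- which is exactly why
// the annotation machinery analysed next is the harder part.
public class RawConsumerConfigSketch {

    public static Map<String, Object> consumerConfig(String bootstrapServers, String groupId) {
        Map<String, Object> config = new HashMap<>();
        config.put("bootstrap.servers", bootstrapServers);
        config.put("group.id", groupId);
        config.put("auto.offset.reset", "earliest");
        return config;
    }

    public static void main(String[] args) {
        Map<String, Object> cfg = consumerConfig("kafka-a:9092", "mytest");
        System.out.println(cfg.get("group.id")); // mytest
    }
}
```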
### The Kafka BeanPostProcessor

The relevant class is `org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor`. Its before-initialization hook does nothing, so all the work happens in the after-initialization hook:
```java
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
    if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
        Class<?> targetClass = AopUtils.getTargetClass(bean);
        // Look for @KafkaListener on the class itself (the less common usage)
        Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
        // flag for a class-level @KafkaListener
        final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
        final List<Method> multiMethods = new ArrayList<>();
        // Find consumer methods: look for @KafkaListener on every method
        Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                (MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
                    Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
                    return (!listenerMethods.isEmpty() ? listenerMethods : null);
                });
        if (hasClassLevelListeners) {
            // With a class-level @KafkaListener, consumer methods are found via @KafkaHandler
            Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
                    (ReflectionUtils.MethodFilter) method ->
                            AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
            multiMethods.addAll(methodsWithHandler);
        }
        // Most classes have no Kafka consumer annotations, so this is not very efficient,
        // but the log level is trace, so the log is normally invisible
        // Case: @KafkaListener on a method
        if (annotatedMethods.isEmpty() && !hasClassLevelListeners) {
            this.nonAnnotatedClasses.add(bean.getClass());
            this.logger.trace(() -> "No @KafkaListener annotations found on bean type: " + bean.getClass());
        }
        else {
            // Non-empty set of methods
            for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
                Method method = entry.getKey();
                for (KafkaListener listener : entry.getValue()) {
                    // core logic
                    processKafkaListener(listener, method, bean, beanName);
                }
            }
            this.logger.debug(() -> annotatedMethods.size() + " @KafkaListener methods processed on bean '"
                    + beanName + "': " + annotatedMethods);
        }
        // The class-level @KafkaListener case is handled much like the method-level one
        if (hasClassLevelListeners) {
            processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
        }
    }
    return bean;
}
```
The class-level annotation path is handled similarly; spring-kafka does not write the same logic twice, only the parameters it reads differ slightly. The example in this article puts `@KafkaListener` on a method, so we analyze `processKafkaListener`:
```java
protected synchronized void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean,
        String beanName) {
    // check for a proxy
    Method methodToUse = checkProxy(method, bean);
    // The "endpoint" design idea: Spring uses this in many places, notably Swagger
    MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
    endpoint.setMethod(methodToUse);
    // The bean name here must be customized to be globally unique,
    // otherwise multiple listeners will conflict
    String beanRef = kafkaListener.beanRef();
    this.listenerScope.addListener(beanRef, bean);
    String[] topics = resolveTopics(kafkaListener);
    TopicPartitionOffset[] tps = resolveTopicPartitions(kafkaListener);
    if (!processMainAndRetryListeners(kafkaListener, bean, beanName, methodToUse, endpoint, topics, tps)) {
        // core logic
        processListener(endpoint, kafkaListener, bean, beanName, topics, tps);
    }
    this.listenerScope.removeListener(beanRef);
}
```
Continuing into `processListener`:
```java
protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
        Object bean, String beanName, String[] topics, TopicPartitionOffset[] tps) {
    // Populates the MethodKafkaListenerEndpoint -- this is the key step
    processKafkaListenerAnnotation(endpoint, kafkaListener, bean, topics, tps);
    // container factory
    String containerFactory = resolve(kafkaListener.containerFactory());
    KafkaListenerContainerFactory<?> listenerContainerFactory = resolveContainerFactory(kafkaListener,
            containerFactory, beanName);
    // Register the endpoint; this is what finally makes it take effect
    this.registrar.registerEndpoint(endpoint, listenerContainerFactory);
}
```
And then `processKafkaListenerAnnotation`:
```java
private void processKafkaListenerAnnotation(MethodKafkaListenerEndpoint<?, ?> endpoint,
        KafkaListener kafkaListener, Object bean, String[] topics, TopicPartitionOffset[] tps) {
    endpoint.setBean(bean);
    endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
    endpoint.setId(getEndpointId(kafkaListener));
    endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
    endpoint.setTopicPartitions(tps);
    endpoint.setTopics(topics);
    endpoint.setTopicPattern(resolvePattern(kafkaListener));
    endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
    endpoint.setListenerInfo(resolveExpressionAsBytes(kafkaListener.info(), "info"));
    String group = kafkaListener.containerGroup();
    if (StringUtils.hasText(group)) {
        Object resolvedGroup = resolveExpression(group);
        if (resolvedGroup instanceof String) {
            endpoint.setGroup((String) resolvedGroup);
        }
    }
    String concurrency = kafkaListener.concurrency();
    if (StringUtils.hasText(concurrency)) {
        endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));
    }
    String autoStartup = kafkaListener.autoStartup();
    if (StringUtils.hasText(autoStartup)) {
        endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
    }
    resolveKafkaProperties(endpoint, kafkaListener.properties());
    endpoint.setSplitIterables(kafkaListener.splitIterables());
    if (StringUtils.hasText(kafkaListener.batch())) {
        endpoint.setBatchListener(Boolean.parseBoolean(kafkaListener.batch()));
    }
    endpoint.setBeanFactory(this.beanFactory);
    resolveErrorHandler(endpoint, kafkaListener);
    resolveContentTypeConverter(endpoint, kafkaListener);
    resolveFilter(endpoint, kafkaListener);
}
```
All the annotation attributes are bound to the endpoint here; in particular the id and the handler are mandatory and cannot be left unregistered. I tried to populate an endpoint by hand and ran into the various handler registrations it depends on.
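The endpoint/registrar pattern the processor relies on can be reduced to a stdlib-only sketch (the class and field names below are hypothetical simplifications, not spring-kafka's real classes): annotation attributes are copied onto an endpoint object, and a registrar collects the endpoints until the listener-container infrastructure is ready to start them.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the endpoint/registrar idea (hypothetical names).
public class EndpointSketch {

    static class ListenerEndpoint {
        String id;
        String[] topics;
        String containerFactory;
    }

    static class Registrar {
        private final List<ListenerEndpoint> endpoints = new ArrayList<>();

        void registerEndpoint(ListenerEndpoint endpoint) {
            // the real registrar also pairs each endpoint with its container factory
            endpoints.add(endpoint);
        }

        List<ListenerEndpoint> getEndpoints() {
            return endpoints;
        }
    }

    public static Registrar demo() {
        ListenerEndpoint ep = new ListenerEndpoint();
        ep.id = "xxx_listener";
        ep.topics = new String[] { "embedded-test-topic" };
        ep.containerFactory = "xxx_containerFactory";
        Registrar registrar = new Registrar();
        registrar.registerEndpoint(ep);
        return registrar;
    }

    public static void main(String[] args) {
        System.out.println(demo().getEndpoints().get(0).id); // xxx_listener
    }
}
```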
## The solution
First, a utility class that creates the key beans: it defines producer creation and the consumer container factory. The consumers themselves are created via annotation scanning and reference the container-factory bean built here.
```java
public class KafkaConfigUtil {

    // static so the initializer below can call KafkaConfigUtil.initKafkaTemplate(...) directly
    private static DefaultKafkaProducerFactory<String, String> initProducerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaProducerFactory<>(kafkaProperties.buildProducerProperties());
    }

    public static KafkaTemplate<String, String> initKafkaTemplate(KafkaProperties kafkaProperties) {
        return new KafkaTemplate<>(initProducerFactory(kafkaProperties));
    }

    private static ConsumerFactory<? super Integer, ? super String> initConsumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
    }

    public static KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
            initKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(initConsumerFactory(kafkaProperties));
        return factory;
    }
}
```

### 1. Receiving multiple data sources via a Map
Define a configuration holder, modeled on Zuul's routes:

```java
@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaMultiProperties {

    private Map<String, KafkaProperties> routes;

    public Map<String, KafkaProperties> getRoutes() {
        return routes;
    }

    public void setRoutes(Map<String, KafkaProperties> routes) {
        this.routes = routes;
    }
}
```
Each route is effectively one Kafka cluster. Then write a `@Configuration` class to enable the properties binding:

```java
@Configuration
@EnableConfigurationProperties(KafkaMultiProperties.class)
public class KafkaConfiguration {
}
```
With this, the custom properties are bound into Spring Boot's placeholder mechanism, and different Kafka cluster clients can be initialized from configuration alone.
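For instance, binding two clusters into the `routes` map could look like the following (the route names and broker addresses are made up for illustration; each route accepts the usual `KafkaProperties` keys):

```yaml
spring:
  kafka:
    routes:
      clusterA:
        bootstrap-servers: kafka-a:9092
        consumer:
          group-id: groupA
      clusterB:
        bootstrap-servers: kafka-b:9092
        consumer:
          group-id: groupB
```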
### 2. Supporting consumers via custom scanning
If the consumer or producer logic must live inside this Kafka gateway application, then differing configurations can only be supported by custom scanning: the producers and consumers for every configured route are implemented in code, and the beans are loaded and registered according to configuration. We take the consumer as the example; the producer involves no annotations and sending is comparatively simple.
```java
public class KafkaConfigInit {

    private KafkaMultiProperties kafkaMultiProperties;

    private ConfigurableApplicationContext applicationContext;

    public KafkaConfigInit(KafkaMultiProperties kafkaMultiProperties,
                           ConfigurableApplicationContext applicationContext) {
        this.kafkaMultiProperties = kafkaMultiProperties;
        this.applicationContext = applicationContext;
    }

    @PostConstruct
    public void initConfig() {
        if (kafkaMultiProperties == null || kafkaMultiProperties.getRoutes() == null) {
            return;
        }
        kafkaMultiProperties.getRoutes().forEach((k, v) -> {
            // register producer by config
            ConfigurableListableBeanFactory beanFactory = applicationContext.getBeanFactory();
            beanFactory.registerSingleton(k + "_producer", KafkaConfigUtil.initKafkaTemplate(v));
            // register consumer container factory
            KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory =
                    KafkaConfigUtil.initKafkaListenerContainerFactory(v);
            beanFactory.registerSingleton(k + "_consumerFactory", kafkaListenerContainerFactory);
        });
    }
}
```
This initialization bean loads beans from configuration, but there are two problems:

1. Consumers are discovered by annotation scanning, and their beans must be loaded according to configuration rather than written in code.
2. The code above merely registers the beans; they are never handled by the `BeanPostProcessor`.

On the first point: since loading must follow configuration, we cannot hard-code the bean-loading logic; we have to scan ourselves, driven by configuration, using a custom annotation and a narrowed package name to keep the scan small and efficient.

On the second point: we just need to invoke the `BeanPostProcessor` logic manually.
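Why `registerSingleton` is not enough can be shown with a stdlib-only sketch (all names below are hypothetical simplifications of Spring's behavior): post-processors run only for beans the factory *creates*; a fully built singleton handed to the factory skips that pipeline, so annotation-driven wiring such as `@KafkaListener` scanning never sees it unless invoked by hand.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal model of the create-vs-register distinction (hypothetical names).
public class PostProcessorSketch {

    interface BeanPostProcessor {
        Object postProcessAfterInitialization(Object bean, String beanName);
    }

    static class MiniBeanFactory {
        final Map<String, Object> singletons = new HashMap<>();
        final List<BeanPostProcessor> processors = new ArrayList<>();
        final List<String> processed = new ArrayList<>();

        // Mimics factory-driven creation: post-processors run automatically.
        Object createBean(String name, Object bean) {
            for (BeanPostProcessor p : processors) {
                bean = p.postProcessAfterInitialization(bean, name);
            }
            singletons.put(name, bean);
            return bean;
        }

        // Mimics registerSingleton: the bean is stored as-is, no processing.
        void registerSingleton(String name, Object bean) {
            singletons.put(name, bean);
        }
    }

    public static MiniBeanFactory demo() {
        MiniBeanFactory factory = new MiniBeanFactory();
        factory.processors.add((bean, name) -> {
            factory.processed.add(name); // stand-in for @KafkaListener endpoint registration
            return bean;
        });
        factory.createBean("created_consumer", new Object());
        factory.registerSingleton("registered_consumer", new Object());
        return factory;
    }

    public static void main(String[] args) {
        // only the bean that went through createBean was post-processed
        System.out.println(demo().processed); // [created_consumer]
    }
}
```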
### Show me the code

Let's complete the code written so far. First, an annotation:
写一个注解
Target({ ElementType.TYPE, ElementType.ANNOTATION_TYPE })
Retention(RetentionPolicy.RUNTIME)
Documented
public interface KafkaConfigConsumer {String beanId() default ;
}
The `beanId` distinguishes the keys in the configuration file; `<key>_consumer` serves as the unique identifier, a convention we define ourselves. We can use Spring's `PathMatchingResourcePatternResolver` to read the classpath resources, find the classes carrying our custom annotation, instantiate them, and register the objects into Spring:
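The reflection step behind that scan, isolated from the Spring classpath machinery, can be sketched self-contained (class and bean names below are hypothetical): once a candidate class is found, the `beanId` is read from the marker annotation and the class is instantiated.

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Self-contained sketch of the reflection part of scanConsumer().
public class AnnotationScanSketch {

    @Target({ ElementType.TYPE, ElementType.ANNOTATION_TYPE })
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @interface KafkaConfigConsumer {
        String beanId() default "";
    }

    @KafkaConfigConsumer(beanId = "xxx_consumer")
    public static class DemoConsumer {
    }

    // Reads the beanId and instantiates the class, as the article's scan does.
    public static String resolveBeanId(Class<?> clazz) throws ReflectiveOperationException {
        KafkaConfigConsumer ann = clazz.getDeclaredAnnotation(KafkaConfigConsumer.class);
        Object bean = clazz.getDeclaredConstructor().newInstance();
        return ann.beanId() + ":" + bean.getClass().getSimpleName();
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        System.out.println(resolveBeanId(DemoConsumer.class)); // xxx_consumer:DemoConsumer
    }
}
```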
```java
public class KafkaConfigInit {

    private KafkaMultiProperties kafkaMultiProperties;

    private ConfigurableApplicationContext applicationContext;

    private KafkaListenerAnnotationBeanPostProcessor<?, ?> kafkaListenerAnnotationBeanPostProcessor;

    private static final Map<String, Object> consumerMap = new ConcurrentHashMap<>();

    public KafkaConfigInit(KafkaMultiProperties kafkaMultiProperties,
                           ConfigurableApplicationContext applicationContext,
                           KafkaListenerAnnotationBeanPostProcessor<?, ?> kafkaListenerAnnotationBeanPostProcessor) {
        this.kafkaMultiProperties = kafkaMultiProperties;
        this.applicationContext = applicationContext;
        this.kafkaListenerAnnotationBeanPostProcessor = kafkaListenerAnnotationBeanPostProcessor;
    }

    @PostConstruct
    public void initConfig() throws IOException {
        scanConsumer();
        if (kafkaMultiProperties == null || kafkaMultiProperties.getRoutes() == null) {
            return;
        }
        kafkaMultiProperties.getRoutes().forEach((k, v) -> {
            // register producer by config
            ConfigurableListableBeanFactory beanFactory = applicationContext.getBeanFactory();
            beanFactory.registerSingleton(k + "_producer", KafkaConfigUtil.initKafkaTemplate(v));
            // register consumer container factory
            KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory =
                    KafkaConfigUtil.initKafkaListenerContainerFactory(v);
            beanFactory.registerSingleton(k + "_containerFactory", kafkaListenerContainerFactory);
            beanFactory.registerSingleton(k + "_consumer", consumerMap.get(k + "_consumer"));
            // manually run the BeanPostProcessor logic for the consumer bean
            kafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(
                    consumerMap.get(k + "_consumer"), k + "_consumer");
        });
    }

    private void scanConsumer() throws IOException {
        SimpleMetadataReaderFactory register = new SimpleMetadataReaderFactory();
        PathMatchingResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        Resource[] resources = resolver.getResources(
                ResourcePatternResolver.CLASSPATH_ALL_URL_PREFIX + "com/feng/kafka/demo/init/*");
        Arrays.stream(resources).forEach((resource) -> {
            try {
                MetadataReader metadataReader = register.getMetadataReader(resource);
                if (metadataReader.getAnnotationMetadata()
                        .hasAnnotatedMethods("org.springframework.kafka.annotation.KafkaListener")) {
                    String className = metadataReader.getClassMetadata().getClassName();
                    Class<?> clazz = Class.forName(className);
                    KafkaConfigConsumer kafkaConfigConsumer = clazz.getDeclaredAnnotation(KafkaConfigConsumer.class);
                    Object obj = clazz.newInstance();
                    consumerMap.put(kafkaConfigConsumer.beanId(), obj);
                }
            } catch (IOException | ClassNotFoundException | InstantiationException | IllegalAccessException e) {
                throw new RuntimeException(e);
            }
        });
    }
}
```
Note that we manually invoke the `kafkaListenerAnnotationBeanPostProcessor` logic (analyzed in the source walkthrough above). And because multiple data sources must be supported, the consumer's annotation attributes change as well:
```java
//@KafkaListener(topics = "${test.topic}")
//@Component
@KafkaConfigConsumer(beanId = "xxx_consumer")
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    private String payload;

    // @KafkaHandler
    @KafkaListener(topics = "${test.topic}", beanRef = "xxx_listener", containerFactory = "xxx_containerFactory")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("----------------received payload='{}'", consumerRecord.toString());
        payload = consumerRecord.toString();
    }

    // other getters
    public String getPayload() {
        return payload;
    }

    public void setPayload(String payload) {
        this.payload = payload;
    }
}
```
We add the `beanRef` attribute plus our own annotation, then wire it up through the `@Configuration` class:

```java
@Configuration
@EnableConfigurationProperties(KafkaMultiProperties.class)
public class KafkaConfiguration {

    @Bean
    public KafkaConfigInit initKafka(KafkaMultiProperties kafkaMultiProperties,
                                     ConfigurableApplicationContext applicationContext,
                                     KafkaListenerAnnotationBeanPostProcessor<?, ?> kafkaListenerAnnotationBeanPostProcessor) {
        return new KafkaConfigInit(kafkaMultiProperties, applicationContext, kafkaListenerAnnotationBeanPostProcessor);
    }
}
```
Then modify the configuration file:

```yaml
spring:
  kafka:
    routes:
      xxx:
        producer:
          batchSize: 1
        consumer:
          auto-offset-reset: earliest
          group-id: xxx
```
And the unit test:

```java
@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class DemoMainTest {

    @Lazy
    @Autowired
    private KafkaConsumer consumer;

    @Autowired
    private ApplicationContext applicationContext;

    @Value("${test.topic}")
    private String topic;

    @Test
    void embedKafka() throws InterruptedException {
        String data = "Sending with our own simple KafkaProducer";
        applicationContext.getBean("xxx_producer", KafkaTemplate.class).send(topic, data);
        Thread.sleep(3000);
        assertThat(consumer.getPayload(), containsString(data));
        Thread.sleep(10000);
    }
}
```
The unit test succeeds: the data is sent correctly, consumed, and the assertion passes.
### 3. Supporting consumers via bytecode generation
The approach above is still inconvenient. In general, processing a message and consuming it are asynchronous; even when synchronous, the work is not done on the consuming thread but dispatched elsewhere (an interface call, for example). So why write any consumer code at all? A single default consumer would do, except that the annotation attributes are constants, so we generate a uniquely named class via bytecode instead.

If the gateway application does not process producer/consumer business logic itself but only forwards blindly, Zuul-style, then the uniform logic can be implemented by bytecode generation. This mainly concerns the consumer, which relies on annotations; the producer has none and can simply be instantiated and registered as a bean. We use javassist here for simplicity; ASM would also work.
### Show me the code

Essentially, the scanned consumer class is replaced by one fixed template class that does the consuming:
```java
//@KafkaListener(topics = "${test.topic}")
//@Component
//@KafkaConfigConsumer(beanId = "xxx_consumer")
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    private String payload;

    // @KafkaHandler
    // @KafkaListener(topics = "${test.topic}", beanRef = "xxx_listener", containerFactory = "xxx_containerFactory")
    public void receive(ConsumerRecord<?, ?> consumerRecord) {
        LOGGER.info("----------------received payload='{}'", consumerRecord.toString());
        payload = consumerRecord.toString();
    }
}
```
The annotations are removed because we will attach them dynamically. The next step is to modify the bean-creation flow:

```java
@PostConstruct
public void initConfig() throws IOException {
    // scanConsumer();
    if (kafkaMultiProperties == null || kafkaMultiProperties.getRoutes() == null) {
        return;
    }
    kafkaMultiProperties.getRoutes().forEach((k, v) -> {
        // register producer by config
        ConfigurableListableBeanFactory beanFactory = applicationContext.getBeanFactory();
        beanFactory.registerSingleton(k + "_producer", KafkaConfigUtil.initKafkaTemplate(v));
        // register consumer container factory
        KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory =
                KafkaConfigUtil.initKafkaListenerContainerFactory(v);
        beanFactory.registerSingleton(k + "_containerFactory", kafkaListenerContainerFactory);
        // beanFactory.registerSingleton(k + "_consumer", consumerMap.get(k + "_consumer"));
        Object obj = initConsumerBean(k);
        beanFactory.registerSingleton(k + "_consumer", obj);
        kafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization(obj, k + "_consumer");
    });
}

private Object initConsumerBean(String key) {
    try {
        ClassPool pool = ClassPool.getDefault();
        CtClass ct = pool.getCtClass("com.feng.kafka.demo.init.KafkaConsumer");
        // rename the class to avoid duplicates
        ct.setName("com.feng.kafka.demo.init.KafkaConsumer" + key);
        // fetch the target method of the class
        CtMethod ctMethod = ct.getDeclaredMethod("receive");
        MethodInfo methodInfo = ctMethod.getMethodInfo();
        ConstPool cp = methodInfo.getConstPool();
        // build the @KafkaListener annotation and its attributes
        AnnotationsAttribute attribute = new AnnotationsAttribute(cp, AnnotationsAttribute.visibleTag);
        Annotation annotation = new Annotation("org.springframework.kafka.annotation.KafkaListener", cp);
        ArrayMemberValue arrayMemberValue = new ArrayMemberValue(cp);
        arrayMemberValue.setValue(new MemberValue[]{ new StringMemberValue("embedded-test-topic", cp) });
        annotation.addMemberValue("topics", arrayMemberValue);
        annotation.addMemberValue("beanRef", new StringMemberValue(key + "_listener", cp));
        annotation.addMemberValue("containerFactory", new StringMemberValue(key + "_containerFactory", cp));
        attribute.addAnnotation(annotation);
        methodInfo.addAttribute(attribute);
        byte[] bytes = ct.toBytecode();
        Class<?> clazz = ReflectUtils.defineClass("com.feng.kafka.demo.init.KafkaConsumer" + key,
                bytes, Thread.currentThread().getContextClassLoader());
        return clazz.newInstance();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
```
By generating bytecode and loading the class dynamically, we mint one unique object per route and support multiple data sources purely through configuration, without writing a single line of consumer code.
In the unit test, the assertion is dropped, since the consumer class is now generated dynamically.
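The same idea, one generic handler reused for every route, can also be sketched without a bytecode library by routing through a JDK dynamic proxy (a simplified alternative illustration, not the article's javassist approach; all names are hypothetical). A proxy cannot attach `@KafkaListener` metadata the way bytecode generation does, but it shows the "one template, many runtime instances" pattern with the stdlib only:

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// One distinct listener object per route key, minted at runtime.
public class ProxyConsumerSketch {

    public interface GenericConsumer {
        String receive(String record);
    }

    public static GenericConsumer consumerFor(String routeKey) {
        // generic per-route handling closed over the route key
        InvocationHandler handler = (proxy, method, args) ->
                routeKey + " handled: " + args[0];
        return (GenericConsumer) Proxy.newProxyInstance(
                ProxyConsumerSketch.class.getClassLoader(),
                new Class<?>[] { GenericConsumer.class },
                handler);
    }

    public static void main(String[] args) {
        GenericConsumer xxx = consumerFor("xxx");
        System.out.println(xxx.receive("payload")); // xxx handled: payload
    }
}
```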
## Summary

spring-kafka is already very complete, and its plugin support is excellent: the consuming process needs no attention, only configuration. But that convenience buries flexibility. Admittedly, most projects never face multiple Kafka clusters, nor build a Kafka gateway application. When the business does need it, though, a Kafka gateway application that dispatches Kafka messages can act as a traffic gateway, decoupling business applications and keeping the architecture loosely coupled.