当前位置: 首页 > news >正文

苏州做网站优化的erp教学零基础入门

苏州做网站优化的,erp教学零基础入门,临沂吧网站建设,最好网站建设公司哪家好消息监听容器 1、KafkaMessageListenerContainer 由spring提供用于监听以及拉取消息#xff0c;并将这些消息按指定格式转换后交给由KafkaListener注解的方法处理#xff0c;相当于一个消费者#xff1b; 看看其整体代码结构#xff1a; 可以发现其入口方法为doStart(),…消息监听容器 1、KafkaMessageListenerContainer 由spring提供用于监听以及拉取消息并将这些消息按指定格式转换后交给由KafkaListener注解的方法处理相当于一个消费者 看看其整体代码结构 可以发现其入口方法为doStart(), 往上追溯到实现了SmartLifecycle接口很明显由spring管理其start和stop操作 ListenerConsumer, 内部真正拉取消息消费的是这个结构其 实现了Runable接口简言之它就是一个后台线程轮训拉取并处理消息while true死循环拉取消息。 在doStart方法中会创建ListenerConsumer并交给线程池处理 以上步骤就开启了消息监听过程。 KafkaMessageListenerContainer#doStart protected void doStart() {if (isRunning()) {return;}ContainerProperties containerProperties getContainerProperties();if (!this.consumerFactory.isAutoCommit()) {AckMode ackMode containerProperties.getAckMode();if (ackMode.equals(AckMode.COUNT) || ackMode.equals(AckMode.COUNT_TIME)) {Assert.state(containerProperties.getAckCount() 0, ackCount must be 0);}if ((ackMode.equals(AckMode.TIME) || ackMode.equals(AckMode.COUNT_TIME)) containerProperties.getAckTime() 0) {containerProperties.setAckTime(5000);}}Object messageListener containerProperties.getMessageListener();Assert.state(messageListener ! null, A MessageListener is required);if (containerProperties.getConsumerTaskExecutor() null) {SimpleAsyncTaskExecutor consumerExecutor new SimpleAsyncTaskExecutor((getBeanName() null ? : getBeanName()) -C-);containerProperties.setConsumerTaskExecutor(consumerExecutor);}Assert.state(messageListener instanceof GenericMessageListener, Listener must be a GenericListener);this.listener (GenericMessageListener?) messageListener;ListenerType listenerType ListenerUtils.determineListenerType(this.listener);if (this.listener instanceof DelegatingMessageListener) {Object delegating this.listener;while (delegating instanceof DelegatingMessageListener) {delegating ((DelegatingMessageListener?) delegating).getDelegate();}listenerType ListenerUtils.determineListenerType(delegating);}// 这里创建了监听消费者对象this.listenerConsumer new ListenerConsumer(this.listener, listenerType);setRunning(true);// 将消费者对象放入到线程池中执行this.listenerConsumerFuture containerProperties.getConsumerTaskExecutor().submitListenable(this.listenerConsumer);} KafkaMessageListenerContainer.ListenerConsumer#run public void run() {this.consumerThread Thread.currentThread();if (this.genericListener instanceof ConsumerSeekAware) {((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);}if (this.transactionManager ! null) {ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId);}this.count 0;this.last System.currentTimeMillis();if (isRunning() this.definedPartitions ! null) {try {initPartitionsIfNeeded();}catch (Exception e) {this.logger.error(Failed to set initial offsets, e);}}long lastReceive System.currentTimeMillis();long lastAlertAt lastReceive;while (isRunning()) {try {if (!this.autoCommit !this.isRecordAck) {processCommits();}processSeeks();if (!this.consumerPaused isPaused()) {this.consumer.pause(this.consumer.assignment());this.consumerPaused true;if (this.logger.isDebugEnabled()) {this.logger.debug(Paused consumption from: this.consumer.paused());}publishConsumerPausedEvent(this.consumer.assignment());}// 拉取信息ConsumerRecordsK, V records this.consumer.poll(this.containerProperties.getPollTimeout());this.lastPoll System.currentTimeMillis();if (this.consumerPaused !isPaused()) {if (this.logger.isDebugEnabled()) {this.logger.debug(Resuming consumption from: this.consumer.paused());}SetTopicPartition paused this.consumer.paused();this.consumer.resume(paused);this.consumerPaused false;publishConsumerResumedEvent(paused);}if (records ! null this.logger.isDebugEnabled()) {this.logger.debug(Received: records.count() records);if (records.count() 0 this.logger.isTraceEnabled()) {this.logger.trace(records.partitions().stream().flatMap(p - records.records(p).stream())// map to same format as send metadata toString().map(r - r.topic() - r.partition() r.offset()).collect(Collectors.toList()));}}if (records ! null records.count() 0) {if (this.containerProperties.getIdleEventInterval() ! null) {lastReceive System.currentTimeMillis();}invokeListener(records);}else {if (this.containerProperties.getIdleEventInterval() ! null) {long now System.currentTimeMillis();if (now lastReceive this.containerProperties.getIdleEventInterval() now lastAlertAt this.containerProperties.getIdleEventInterval()) {publishIdleContainerEvent(now - lastReceive, this.isConsumerAwareListener? this.consumer : null, this.consumerPaused);lastAlertAt now;if (this.genericListener instanceof ConsumerSeekAware) {seekPartitions(getAssignedPartitions(), true);}}}}}catch (WakeupException e) {// Ignore, were stopping}catch (NoOffsetForPartitionException nofpe) {this.fatalError true;ListenerConsumer.this.logger.error(No offset and no reset policy, nofpe);break;}catch (Exception e) {handleConsumerException(e);}}ProducerFactoryUtils.clearConsumerGroupId();if (!this.fatalError) {if (this.kafkaTxManager null) {commitPendingAcks();try {this.consumer.unsubscribe();}catch (WakeupException e) {// No-op. Continue process}}}else {ListenerConsumer.this.logger.error(No offset and no reset policy; stopping container);KafkaMessageListenerContainer.this.stop();}this.monitorTask.cancel(true);if (!this.taskSchedulerExplicitlySet) {((ThreadPoolTaskScheduler) this.taskScheduler).destroy();}this.consumer.close();this.logger.info(Consumer stopped);} 2、ConcurrentMessageListenerContainer 并发消息监听相当于创建消费者其底层逻辑仍然是通过KafkaMessageListenerContainer实现处理从实现上看就是在KafkaMessageListenerContainer上做了层包装有多少的concurrency就创建多个KafkaMessageListenerContainer也就是concurrency个消费者。 protected void doStart() {if (!isRunning()) {ContainerProperties containerProperties getContainerProperties();TopicPartitionInitialOffset[] topicPartitions containerProperties.getTopicPartitions();if (topicPartitions ! null this.concurrency topicPartitions.length) {this.logger.warn(When specific partitions are provided, the concurrency must be less than or equal to the number of partitions; reduced from this.concurrency to topicPartitions.length);this.concurrency topicPartitions.length;}setRunning(true);// 创建多个消费者for (int i 0; i this.concurrency; i) {KafkaMessageListenerContainerK, V container;if (topicPartitions null) {container new KafkaMessageListenerContainer(this, this.consumerFactory,containerProperties);}else {container new KafkaMessageListenerContainer(this, this.consumerFactory,containerProperties, partitionSubset(containerProperties, i));}String beanName getBeanName();container.setBeanName((beanName ! null ? beanName : consumer) - i);if (getApplicationEventPublisher() ! null) {container.setApplicationEventPublisher(getApplicationEventPublisher());}container.setClientIdSuffix(- i);container.setAfterRollbackProcessor(getAfterRollbackProcessor());container.start();this.containers.add(container);}}} 3、KafkaListener底层监听原理 上面已经介绍了KafkaMessageListenerContainer的作用是拉取并处理消息但还缺少关键的一步即 如何将我们的业务逻辑与KafkaMessageListenerContainer的处理逻辑联系起来 那么这个桥梁就是KafkaListener注解 KafkaListenerAnnotationBeanPostProcessor, 从后缀BeanPostProcessor就可以知道这是Spring IOC初始化bean相关的操作当然这里也是此类会扫描带KafkaListener注解的类或者方法通过 KafkaListenerContainerFactory工厂创建对应的KafkaMessageListenerContainer并调用start方法启动监听也就是这样打通了这条路… 4、Spring Boot 自动加载kafka相关配置 1、KafkaAutoConfiguration 自动生成kafka相关配置比如当缺少这些bean的时候KafkaTemplate、ProducerListener、ConsumerFactory、ProducerFactory等默认创建bean实例 2、KafkaAnnotationDrivenConfiguration 主要是针对于spring-kafka提供的注解背后的相关操作比如 KafkaListener; 在开启了EnableKafka注解后spring会扫描到此配置并创建缺少的bean实例比如当配置的工厂beanName不是kafkaListenerContainerFactory的时候就会默认创建一个beanName为kafkaListenerContainerFactory的实例这也是为什么在springboot中不用定义consumer的相关配置也可以通过KafkaListener正常的处理消息 5、消息处理 1、单条消息处理 Configuration public class KafkaConsumerConfiguration {BeanKafkaListenerContainerFactoryConcurrentMessageListenerContainerInteger, String kafkaCustomizeContainerFactory() {ConcurrentKafkaListenerContainerFactoryInteger, String factory new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(2);factory.getContainerProperties().setPollTimeout(3000);return factory;}private ConsumerFactoryInteger, String consumerFactory() {return new DefaultKafkaConsumerFactory(consumerConfigs());}private MapString, Object consumerConfigs() {MapString, Object props new HashMap();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bizConfig.getReconciliationInstanceKafkaServers());props.put(ConsumerConfig.GROUP_ID_CONFIG, bizConfig.getReconciliationInstanceKafkaConsumerGroupId());props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, latest);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 300);// poll 一次拉取的阻塞的最大时长单位毫秒。这里指的是阻塞拉取需要满足至少 fetch-min-size 大小的消息props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 10000);return props;}} 这种方式的KafkaLisener中的参数是单条的。 2、批量处理 Configuration EnableKafka public class KafkaConfig {Bean public KafkaListenerContainerFactory?, ? batchFactory() {ConcurrentKafkaListenerContainerFactoryInteger, String factory new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(consumerFactory());// 增加开启批量处理factory.setBatchListener(true); // return factory; }Beanpublic ConsumerFactoryInteger, String consumerFactory() {return new DefaultKafkaConsumerFactory(consumerConfigs());}Beanpublic MapString, Object consumerConfigs() {MapString, Object props new HashMap();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());...return props;} }// 注意这里接受的是集合类型 KafkaListener(id list, topics myTopic, containerFactory batchFactory) public void listen(ListString list) {... } 这种方式的KafkaLisener中的参数是多条的。 6、线程池相关 如果没有额外给Kafka指定线程池底层默认用的是SimpleAsyncTaskExecutor类它不使用线程池而是为每个任务创建新线程。相当于一个消费者用一个独立的线程来跑。 总结 spring为了将kafka融入其生态方便在spring大环境下使用kafka开发了spring-kafa这一模块本质上是为了帮助开发者更好的以spring的方式使用kafka KafkaListener就是这么一个工具在同一个项目中既可以有单条的消息处理也可以配置多条的消息处理稍微改变下配置即可实现很是方便 当然KafkaListener单条或者多条消息处理仍然是spring自行封装处理与kafka-client客户端的拉取机制无关比如一次性拉取50条消息对于单条处理来说就是循环50次处理而多条消息处理则可以一次性处理50条本质上来说这套逻辑都是spring处理的并不是说单条消费就是通过kafka-client一次只拉取一条消息 在使用过程中需要注意spring自动的创建的一些bean实例当然也可以覆盖其自动创建的实例以满足特定的需求场景。                          原文链接https://blog.csdn.net/yuechuzhixing/article/details/124725713
http://www.hkea.cn/news/14327092/

相关文章:

  • 高端网站建设公司排行呼市做网站建设的公司哪家好
  • 网站流量数据什么网站做推广效果好
  • 建设网站思路科技公司 网站 石家庄
  • 北京网站开发公司前十名昆明网页制作设计
  • 个人网站建设计划表2345网址导航手机版下载
  • 龙口建网站价格最少的钱怎么做网站
  • 在后台怎么做网站内链嵌入式软件开发工程师工作内容
  • 网站的设计开发免费建站系统下载
  • 网站开发进程报告wordpress主题和预览不同
  • 东莞公司想建网站网页制作学什么东西
  • 杨家平网站建设做废品回收哪个网站好点
  • 嘉峪关做网站做外国人生意的网站有哪些
  • 太原网站建设制作公司哪家好wordpress h5
  • 做网站软件A开头的wordpress主题是用什么开发出来的
  • 关于建设网站的情况说明书网络规划设计师学历低
  • 邢台做移动网站费用大连房地产网站建设
  • 网站设计ai公司邮箱签名模板
  • 桂林市临桂区城乡建设局网站县级部门和乡镇不能建网站建设
  • 商城网站建设运营合同书西安高校定制网站建设公司推荐
  • 杭州公司注销网站备案WordPress的MySQL宕
  • 厦门网站优化建设河南网站建设工作室
  • 无锡滨湖区建设局网站wordpress single page
  • 服务器怎样做网站呢深圳短视频seo搜索排名如何做
  • 网站标准尺寸做电脑网站宽度
  • 子目录创建网站网站建设实训心得php
  • 商务网站设计做h5网站
  • 装修公司网站php源码南宁建网站必荐云尚网络
  • html网站编辑器网站开发设计框图
  • 驾校网站模版如何提升网站搜索排名
  • win7下asp网站搭建创建网站的详细步骤