当前位置: 首页 > news >正文

毕业设计网站登陆建设银行官方网站

毕业设计网站,登陆建设银行官方网站,2023智慧树网络营销答案,wordpress对接公众号开发者目录 前言一、情景介绍二、问题分析三、代码实现 前言 之前接到一个需求#xff0c;我们项目的技术负责人希望通过配置的形式#xff0c;在项目启动的时候自动根据配置生成对应的消费者 觉得还有点意思#xff0c;随即记录一下~ 一、情景介绍 比如我这里有一个消费者 Mes… 目录 前言一、情景介绍二、问题分析三、代码实现 前言 之前接到一个需求我们项目的技术负责人希望通过配置的形式在项目启动的时候自动根据配置生成对应的消费者 觉得还有点意思随即记录一下~ 一、情景介绍 比如我这里有一个消费者 MessageConsumer Slf4j Service RocketMQMessageListener(consumerGroup mike-group,topic mike-message,selectorExpression TAG_MESSAGE_CONSUMER,consumeThreadMax 6,consumeTimeout 60L) public class MessageConsumer implements RocketMQListenerNotifyMessage {Overridepublic void onMessage(NotifyMessage notifyMessage) {System.err.println(我收到啦~~);System.err.println(message notifyMessage);} }在项目启动的时候会根据 RocketMQMessageListener 注解上的配置生成一个消费者 假如我还需要一个 MessageConsumer 消费者其 selectorExpression 的配置为 TAG_MESSAGE_CONSUMER_01consumeThreadMax 要设置为 8 通常情况下我们会再复制一个 MessageConsumer 命名为 MessageConsumer_01然后在新的消费者上改对应的配置例如 Slf4j Service RocketMQMessageListener(consumerGroup mike-group-01,topic mike-message,selectorExpression TAG_MESSAGE_CONSUMER_01,consumeThreadMax 8,consumeTimeout 60L) public class MessageConsumer_01 implements RocketMQListenerNotifyMessage {Overridepublic void onMessage(NotifyMessage notifyMessage) {System.err.println(我收到啦~~);System.err.println(message notifyMessage);} }这样做虽然没啥问题只是这两个类除了配置不一样其它的代码都是一摸一样的倘若之后还要有一个 selectorExpression TAG_MESSAGE_CONSUMER_02 的消费者那我又得再复制一个 MessageConsumer这样就造成了大量的代码冗余 所以就希望通过读取配置文件生成对应配置的消费者 二、问题分析 要如何实现这个功能可以去看下 RocketMQ 的源码看 Spring 是如何创建 RocketMQ 的消费者的 源码如下 org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer#initRocketMQPushConsumer 在该方法中可以看到 Spring 是如何初始化消费者参照这个方法只需要在项目启动完成后将初始化从注解上获取消费者配置的地方换成从配置文件上获取就可以了 通过实现 ApplicationListenerApplicationReadyEvent 可以监听项目是否启动完成 三、代码实现 因为消费者是需要通过配置文件的配置来自动生成那么可以将需要自动生成的消费者比如 MessageConsumer其 RocketMQMessageListener 的配置注释掉 Slf4j Service //RocketMQMessageListener( // consumerGroup mike-group, // topic mike-message, // selectorExpression TAG_MESSAGE_CONSUMER, // consumeThreadMax 6, // consumeTimeout 60L) public class MessageConsumer implements RocketMQListenerNotifyMessage {Overridepublic void onMessage(NotifyMessage notifyMessage) {System.err.println(我收到啦~~);System.err.println(message notifyMessage);} }配置文件上自动注入消费者的配置最好和 RocketMQMessageListener 的属性相同并且可以配置多个自动注入的消费者那么对应的映射文件可以这么写 AutoConsumerProperties.java import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.cloud.context.config.annotation.RefreshScope; import org.springframework.context.annotation.Configuration;import java.util.List;Data RefreshScope Configuration ConfigurationProperties(prefix auto-consumer) public class AutoConsumerProperties {private ListAutoConsumer messageConsumer; }AutoConsumer.java import lombok.Data; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.SelectorType;Data public class AutoConsumer {private String consumerGroup;private String topic;private SelectorType selectorType SelectorType.TAG;private String selectorExpression *;private ConsumeMode consumeMode ConsumeMode.CONCURRENTLY;private MessageModel messageModel MessageModel.CLUSTERING;private int consumeThreadMin 64;private int consumeThreadMax 64;private long consumeTimeout 15L;private String accessKey;private String secretKey;private boolean enableMsgTrace;private String customizedTraceTopic;private String nameServer;private String accessChannel; }核心代码 ConsumerStarted.java import cn.hutool.core.collection.CollUtil; import com.mike.common.core.utils.JacksonUtil; import com.mike.server.message.config.properties.AutoConsumer; import com.mike.server.message.config.properties.AutoConsumerProperties; import com.mike.server.message.consumer.MessageConsumer; import com.mike.server.message.domain.entity.NotifyMessage; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.*; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.support.RocketMQUtil; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; import org.springframework.util.Assert;import javax.annotation.Resource; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Objects;Slf4j Component public class ConsumerStarted implements ApplicationContextAware, ApplicationListenerApplicationReadyEvent /* , InitializingBean, SmartLifecycle */ {Value(${rocketmq.name-server:})private String nameServer;Value(${rocketmq.consumer.topic:})private String topic;Value(${rocketmq.consumer.access-key:})private String accessKey;Value(${rocketmq.consumer.secret-key:})private String secretKey;Resourceprivate AutoConsumerProperties autoConsumerProperties;Resourceprivate MessageConsumer messageConsumer;private ApplicationContext applicationContext;private final static boolean enableMsgTrace true;private final static String customizedTraceTopic null;OverrideSuppressWarnings(all)public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {// 需要等到程序启动完全之后再去启动initConsumer();}public void initConsumer() {ListAutoConsumer messageConsumers autoConsumerProperties.getMessageConsumer();if (CollUtil.isEmpty(messageConsumers)) return;final RocketMQListenerNotifyMessage messageConsumerListener messageConsumer;this.autoGenerateConsumer(messageConsumers, messageConsumerListener, NotifyMessage.class);}SuppressWarnings(all)private R void autoGenerateConsumer(ListAutoConsumer autoConsumers, RocketMQListenerR rocketMQListener, ClassR objClass) {// 根据 tag 自动生成对应的消费者for (AutoConsumer autoConsumer : autoConsumers) {String consumerGroup autoConsumer.getConsumerGroup();String nameServer getValueOrDefault(autoConsumer.getNameServer(), this.nameServer);String topic getValueOrDefault(autoConsumer.getTopic(), this.topic);String accessKey getValueOrDefault(autoConsumer.getAccessKey(), this.accessKey);String secretKey getValueOrDefault(autoConsumer.getSecretKey(), this.secretKey);try {Assert.notNull(consumerGroup, Property consumerGroup is required);Assert.notNull(nameServer, Property nameServer is required);Assert.notNull(topic, Property topic is required);DefaultMQPushConsumer consumer;RPCHook rpcHook RocketMQUtil.getRPCHookByAkSk(this.applicationContext.getEnvironment(), accessKey, secretKey);if (Objects.nonNull(rpcHook)) {consumer new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);consumer.setVipChannelEnabled(false);} else {consumer new DefaultMQPushConsumer(consumerGroup, enableMsgTrace, customizedTraceTopic);}consumer.setInstanceName(RocketMQUtil.getInstanceName(this.nameServer));consumer.setNamesrvAddr(this.nameServer);consumer.setAccessChannel(AccessChannel.LOCAL);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.setConsumeThreadMin(autoConsumer.getConsumeThreadMin());consumer.setConsumeThreadMax(autoConsumer.getConsumeThreadMax());if (consumer.getConsumeThreadMax() consumer.getConsumeThreadMin()) {consumer.setConsumeThreadMin(consumer.getConsumeThreadMax());}switch (autoConsumer.getMessageModel()) {case BROADCASTING:consumer.setMessageModel(MessageModel.BROADCASTING);break;case CLUSTERING:consumer.setMessageModel(MessageModel.CLUSTERING);break;default:throw new IllegalArgumentException(Property messageModel was wrong.);}switch (autoConsumer.getSelectorType()) {case TAG:consumer.subscribe(topic, autoConsumer.getSelectorExpression());break;case SQL92:consumer.subscribe(topic, MessageSelector.bySql(autoConsumer.getSelectorExpression()));break;default:throw new IllegalArgumentException(Property selectorType was wrong.);}switch (autoConsumer.getConsumeMode()) {case ORDERLY:consumer.setMessageListener(new DefaultMessageListenerOrderly(autoConsumer, rocketMQListener, objClass));break;case CONCURRENTLY:consumer.setMessageListener(new DefaultMessageListenerConcurrently(autoConsumer, rocketMQListener, objClass));break;default:throw new IllegalArgumentException(Property consumeMode was wrong.);}consumer.start();log.info(Consumer Start Success: {}:{}, topic, autoConsumer.getSelectorExpression());} catch (MQClientException e) {e.printStackTrace();log.info(Consumer Start Failed: {}:{}, topic, autoConsumer.getSelectorExpression());}}}private String getValueOrDefault(String value, String defaultValue) {return StringUtils.isNotBlank(value)? value: defaultValue;}OverrideSuppressWarnings(all)public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext applicationContext;}public class DefaultMessageListenerOrderlyT implements MessageListenerOrderly {private final AutoConsumer autoConsumer;private final RocketMQListenerT rocketMQListener;private final ClassT objClass;public DefaultMessageListenerOrderly(AutoConsumer autoConsumer, RocketMQListenerT rocketMQListener, ClassT objClass) {this.autoConsumer autoConsumer;this.rocketMQListener rocketMQListener;this.objClass objClass;}public ConsumeOrderlyStatus consumeMessage(ListMessageExt msgList, ConsumeOrderlyContext context) {for (MessageExt messageExt : msgList) {log.info(group[{}]-tag[{}] consume start -, autoConsumer.getConsumerGroup(), autoConsumer.getSelectorExpression());log.debug(received msg: {}, messageExt);try {long now System.currentTimeMillis();this.rocketMQListener.onMessage(doConvertMessage(messageExt, this.objClass));long costTime System.currentTimeMillis() - now;log.debug(consume {} cost: {} ms, messageExt.getMsgId(), costTime);} catch (Exception var9) {log.warn(consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}, messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), var9);final long suspendCurrentQueueTimeMillis 1000L;context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}}return ConsumeOrderlyStatus.SUCCESS;}}public class DefaultMessageListenerConcurrently T implements MessageListenerConcurrently {private final AutoConsumer autoConsumer;private final RocketMQListenerT rocketMQListener;private final ClassT objClass;public DefaultMessageListenerConcurrently(AutoConsumer autoConsumer, RocketMQListenerT rocketMQListener, ClassT objClass) {this.autoConsumer autoConsumer;this.rocketMQListener rocketMQListener;this.objClass objClass;}public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgList, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : msgList) {log.info(group[{}]-tag[{}] consume start -, autoConsumer.getConsumerGroup(), autoConsumer.getSelectorExpression());log.debug(received msg: {}, messageExt);try {long now System.currentTimeMillis();this.rocketMQListener.onMessage(doConvertMessage(messageExt, objClass));long costTime System.currentTimeMillis() - now;log.debug(consume {} cost: {} ms, messageExt.getMsgId(), costTime);} catch (Exception var9) {log.warn(consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}, messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), var9);final int delayLevelWhenNextConsume 0;context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}SuppressWarnings(unchecked)private T T doConvertMessage(MessageExt messageExt, ClassT objClass) {if (Objects.equals(objClass, MessageExt.class)) {return (T)messageExt;} else {String str new String(messageExt.getBody(), StandardCharsets.UTF_8);if (Objects.equals(objClass, String.class)) {return (T)str;} else {if (objClass ! null) {return JacksonUtil.fromJson(str, objClass);} else {log.info(convert failed. str:{}, msgType:{}, str, null);throw new RuntimeException(cannot convert message to null);}}}} }配置文件 yml 新增自动注入消费者的配置 auto-consumer:message-consumer:- consumer-group: mico-grouptopic: mike-messageselector-expression: TAG_MESSAGE_CONSUMERconsume-thread-max: 6- consumer-group: mike-group-01topic: mike-messageselector-expression: TAG_MESSAGE_CONSUMER_01consume-thread-max: 8- consumer-group: mike-group-02topic: mike-messageselector-expression: TAG_MESSAGE_CONSUMER_02consume-thread-max: 10如果是配置在 properties 文件中配置如下 auto-consumer.message-consumer[0].consumer-group mico-group auto-consumer.message-consumer[0].topic mike-message auto-consumer.message-consumer[0].selector-expression TAG_MESSAGE_CONSUMER auto-consumer.message-consumer[0].consume-thread-max 6auto-consumer.message-consumer[1].consumer-group mico-group-01 auto-consumer.message-consumer[1].topic mike-message auto-consumer.message-consumer[1].selector-expression TAG_MESSAGE_CONSUMER_01 auto-consumer.message-consumer[1].consume-thread-max 8auto-consumer.message-consumer[2].consumer-group mico-group-02 auto-consumer.message-consumer[2].topic mike-message auto-consumer.message-consumer[2].selector-expression TAG_MESSAGE_CONSUMER_02 auto-consumer.message-consumer[2].consume-thread-max 10启动项目进行验证观察是否有三个消费者被创建 从日志上看确实根据配置文件自动创建了三个不同的消费者
http://www.hkea.cn/news/14473436/

相关文章:

  • 网络推广外包费用做seo需要哪些知识
  • p2p网站建设多少钱泰州快速建站模板
  • 网站改版 方案重庆网站建设公司怎么做
  • 二级域名做网站有哪些缺点营销推广渠道有哪些
  • wordpress建立php站点地图云浮北京网站建设
  • 网站开发最新技术百度一下免费下载安装
  • 沈阳网站制作的公司哪家好网站排名优化公司
  • 有什的自学做网站淘宝联盟网站推广位怎么做
  • 可以自己买个服务器做网站吗百度免费优化
  • 给人家做的网站想改怎么改整合营销理论
  • 网站分站加盟宣传片制作标准参数
  • 建设文明网站包括哪些内容专门做拼花网站
  • 如何把网站放在根目录兰州网站建设招聘信息
  • 网站流量攻击软件wordpress垂直模板
  • 网站建设与维护试题手机与电脑网站制作
  • wordpress网站代码优化搜狗推广优化
  • 哪家公司提供专业的网站建设微信wordpress小程序
  • 手机网站建设价位阳朔到桂林游船时间表
  • 写网站论文怎么做帮做网站一般多少钱
  • 网站图片上的水印怎么做企业网页建设公司哪家比较好
  • 大连网络工程谷歌seo优化是什么
  • 网站建设群标签好写什么中国室内设计联盟图片
  • 移动开发主要学什么百度seo综合查询
  • 网站开发设计需要什么证书企业管理十大系统
  • 关于医院网站建设的通知俄文网站推广
  • 上海企业免费建站网页升级紧急通知自动跳转
  • 网站运营 策划 推广 维护房子简装修效果图片
  • 江苏建设部官方网站ppt模板下载免费版百度云
  • 网站开发工程师应聘书700字网站开发技术服务合同
  • 锡盟建设局网站让人做网站 需要准备什么条件