当前位置: 首页 > news >正文

网站推送怎么做的网站推广流程是

网站推送怎么做的,网站推广流程是,网站开发流程介绍,做正版电子书下载网站目录 前言一、情景介绍二、问题分析三、代码实现 前言 之前接到一个需求#xff0c;我们项目的技术负责人希望通过配置的形式#xff0c;在项目启动的时候自动根据配置生成对应的消费者 觉得还有点意思#xff0c;随即记录一下~ 一、情景介绍 比如我这里有一个消费者 Mes… 目录 前言一、情景介绍二、问题分析三、代码实现 前言 之前接到一个需求我们项目的技术负责人希望通过配置的形式在项目启动的时候自动根据配置生成对应的消费者 觉得还有点意思随即记录一下~ 一、情景介绍 比如我这里有一个消费者 MessageConsumer Slf4j Service RocketMQMessageListener(consumerGroup mike-group,topic mike-message,selectorExpression TAG_MESSAGE_CONSUMER,consumeThreadMax 6,consumeTimeout 60L) public class MessageConsumer implements RocketMQListenerNotifyMessage {Overridepublic void onMessage(NotifyMessage notifyMessage) {System.err.println(我收到啦~~);System.err.println(message notifyMessage);} }在项目启动的时候会根据 RocketMQMessageListener 注解上的配置生成一个消费者 假如我还需要一个 MessageConsumer 消费者其 selectorExpression 的配置为 TAG_MESSAGE_CONSUMER_01consumeThreadMax 要设置为 8 通常情况下我们会再复制一个 MessageConsumer 命名为 MessageConsumer_01然后在新的消费者上改对应的配置例如 Slf4j Service RocketMQMessageListener(consumerGroup mike-group-01,topic mike-message,selectorExpression TAG_MESSAGE_CONSUMER_01,consumeThreadMax 8,consumeTimeout 60L) public class MessageConsumer_01 implements RocketMQListenerNotifyMessage {Overridepublic void onMessage(NotifyMessage notifyMessage) {System.err.println(我收到啦~~);System.err.println(message notifyMessage);} }这样做虽然没啥问题只是这两个类除了配置不一样其它的代码都是一摸一样的倘若之后还要有一个 selectorExpression TAG_MESSAGE_CONSUMER_02 的消费者那我又得再复制一个 MessageConsumer这样就造成了大量的代码冗余 所以就希望通过读取配置文件生成对应配置的消费者 二、问题分析 要如何实现这个功能可以去看下 RocketMQ 的源码看 Spring 是如何创建 RocketMQ 的消费者的 源码如下 org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer#initRocketMQPushConsumer 在该方法中可以看到 Spring 是如何初始化消费者参照这个方法只需要在项目启动完成后将初始化从注解上获取消费者配置的地方换成从配置文件上获取就可以了 通过实现 ApplicationListenerApplicationReadyEvent 可以监听项目是否启动完成 三、代码实现 因为消费者是需要通过配置文件的配置来自动生成那么可以将需要自动生成的消费者比如 MessageConsumer其 RocketMQMessageListener 的配置注释掉 Slf4j Service //RocketMQMessageListener( // consumerGroup mike-group, // topic mike-message, // selectorExpression TAG_MESSAGE_CONSUMER, // consumeThreadMax 6, // consumeTimeout 60L) public class MessageConsumer implements RocketMQListenerNotifyMessage {Overridepublic void onMessage(NotifyMessage notifyMessage) {System.err.println(我收到啦~~);System.err.println(message notifyMessage);} }配置文件上自动注入消费者的配置最好和 RocketMQMessageListener 的属性相同并且可以配置多个自动注入的消费者那么对应的映射文件可以这么写 AutoConsumerProperties.java import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.cloud.context.config.annotation.RefreshScope; import org.springframework.context.annotation.Configuration;import java.util.List;Data RefreshScope Configuration ConfigurationProperties(prefix auto-consumer) public class AutoConsumerProperties {private ListAutoConsumer messageConsumer; }AutoConsumer.java import lombok.Data; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.MessageModel; import org.apache.rocketmq.spring.annotation.SelectorType;Data public class AutoConsumer {private String consumerGroup;private String topic;private SelectorType selectorType SelectorType.TAG;private String selectorExpression *;private ConsumeMode consumeMode ConsumeMode.CONCURRENTLY;private MessageModel messageModel MessageModel.CLUSTERING;private int consumeThreadMin 64;private int consumeThreadMax 64;private long consumeTimeout 15L;private String accessKey;private String secretKey;private boolean enableMsgTrace;private String customizedTraceTopic;private String nameServer;private String accessChannel; }核心代码 ConsumerStarted.java import cn.hutool.core.collection.CollUtil; import com.mike.common.core.utils.JacksonUtil; import com.mike.server.message.config.properties.AutoConsumer; import com.mike.server.message.config.properties.AutoConsumerProperties; import com.mike.server.message.consumer.MessageConsumer; import com.mike.server.message.domain.entity.NotifyMessage; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.client.AccessChannel; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.MessageSelector; import org.apache.rocketmq.client.consumer.listener.*; import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.spring.core.RocketMQListener; import org.apache.rocketmq.spring.support.RocketMQUtil; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.context.event.ApplicationReadyEvent; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; import org.springframework.util.Assert;import javax.annotation.Resource; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Objects;Slf4j Component public class ConsumerStarted implements ApplicationContextAware, ApplicationListenerApplicationReadyEvent /* , InitializingBean, SmartLifecycle */ {Value(${rocketmq.name-server:})private String nameServer;Value(${rocketmq.consumer.topic:})private String topic;Value(${rocketmq.consumer.access-key:})private String accessKey;Value(${rocketmq.consumer.secret-key:})private String secretKey;Resourceprivate AutoConsumerProperties autoConsumerProperties;Resourceprivate MessageConsumer messageConsumer;private ApplicationContext applicationContext;private final static boolean enableMsgTrace true;private final static String customizedTraceTopic null;OverrideSuppressWarnings(all)public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {// 需要等到程序启动完全之后再去启动initConsumer();}public void initConsumer() {ListAutoConsumer messageConsumers autoConsumerProperties.getMessageConsumer();if (CollUtil.isEmpty(messageConsumers)) return;final RocketMQListenerNotifyMessage messageConsumerListener messageConsumer;this.autoGenerateConsumer(messageConsumers, messageConsumerListener, NotifyMessage.class);}SuppressWarnings(all)private R void autoGenerateConsumer(ListAutoConsumer autoConsumers, RocketMQListenerR rocketMQListener, ClassR objClass) {// 根据 tag 自动生成对应的消费者for (AutoConsumer autoConsumer : autoConsumers) {String consumerGroup autoConsumer.getConsumerGroup();String nameServer getValueOrDefault(autoConsumer.getNameServer(), this.nameServer);String topic getValueOrDefault(autoConsumer.getTopic(), this.topic);String accessKey getValueOrDefault(autoConsumer.getAccessKey(), this.accessKey);String secretKey getValueOrDefault(autoConsumer.getSecretKey(), this.secretKey);try {Assert.notNull(consumerGroup, Property consumerGroup is required);Assert.notNull(nameServer, Property nameServer is required);Assert.notNull(topic, Property topic is required);DefaultMQPushConsumer consumer;RPCHook rpcHook RocketMQUtil.getRPCHookByAkSk(this.applicationContext.getEnvironment(), accessKey, secretKey);if (Objects.nonNull(rpcHook)) {consumer new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);consumer.setVipChannelEnabled(false);} else {consumer new DefaultMQPushConsumer(consumerGroup, enableMsgTrace, customizedTraceTopic);}consumer.setInstanceName(RocketMQUtil.getInstanceName(this.nameServer));consumer.setNamesrvAddr(this.nameServer);consumer.setAccessChannel(AccessChannel.LOCAL);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.setConsumeThreadMin(autoConsumer.getConsumeThreadMin());consumer.setConsumeThreadMax(autoConsumer.getConsumeThreadMax());if (consumer.getConsumeThreadMax() consumer.getConsumeThreadMin()) {consumer.setConsumeThreadMin(consumer.getConsumeThreadMax());}switch (autoConsumer.getMessageModel()) {case BROADCASTING:consumer.setMessageModel(MessageModel.BROADCASTING);break;case CLUSTERING:consumer.setMessageModel(MessageModel.CLUSTERING);break;default:throw new IllegalArgumentException(Property messageModel was wrong.);}switch (autoConsumer.getSelectorType()) {case TAG:consumer.subscribe(topic, autoConsumer.getSelectorExpression());break;case SQL92:consumer.subscribe(topic, MessageSelector.bySql(autoConsumer.getSelectorExpression()));break;default:throw new IllegalArgumentException(Property selectorType was wrong.);}switch (autoConsumer.getConsumeMode()) {case ORDERLY:consumer.setMessageListener(new DefaultMessageListenerOrderly(autoConsumer, rocketMQListener, objClass));break;case CONCURRENTLY:consumer.setMessageListener(new DefaultMessageListenerConcurrently(autoConsumer, rocketMQListener, objClass));break;default:throw new IllegalArgumentException(Property consumeMode was wrong.);}consumer.start();log.info(Consumer Start Success: {}:{}, topic, autoConsumer.getSelectorExpression());} catch (MQClientException e) {e.printStackTrace();log.info(Consumer Start Failed: {}:{}, topic, autoConsumer.getSelectorExpression());}}}private String getValueOrDefault(String value, String defaultValue) {return StringUtils.isNotBlank(value)? value: defaultValue;}OverrideSuppressWarnings(all)public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext applicationContext;}public class DefaultMessageListenerOrderlyT implements MessageListenerOrderly {private final AutoConsumer autoConsumer;private final RocketMQListenerT rocketMQListener;private final ClassT objClass;public DefaultMessageListenerOrderly(AutoConsumer autoConsumer, RocketMQListenerT rocketMQListener, ClassT objClass) {this.autoConsumer autoConsumer;this.rocketMQListener rocketMQListener;this.objClass objClass;}public ConsumeOrderlyStatus consumeMessage(ListMessageExt msgList, ConsumeOrderlyContext context) {for (MessageExt messageExt : msgList) {log.info(group[{}]-tag[{}] consume start -, autoConsumer.getConsumerGroup(), autoConsumer.getSelectorExpression());log.debug(received msg: {}, messageExt);try {long now System.currentTimeMillis();this.rocketMQListener.onMessage(doConvertMessage(messageExt, this.objClass));long costTime System.currentTimeMillis() - now;log.debug(consume {} cost: {} ms, messageExt.getMsgId(), costTime);} catch (Exception var9) {log.warn(consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}, messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), var9);final long suspendCurrentQueueTimeMillis 1000L;context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}}return ConsumeOrderlyStatus.SUCCESS;}}public class DefaultMessageListenerConcurrently T implements MessageListenerConcurrently {private final AutoConsumer autoConsumer;private final RocketMQListenerT rocketMQListener;private final ClassT objClass;public DefaultMessageListenerConcurrently(AutoConsumer autoConsumer, RocketMQListenerT rocketMQListener, ClassT objClass) {this.autoConsumer autoConsumer;this.rocketMQListener rocketMQListener;this.objClass objClass;}public ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgList, ConsumeConcurrentlyContext context) {for (MessageExt messageExt : msgList) {log.info(group[{}]-tag[{}] consume start -, autoConsumer.getConsumerGroup(), autoConsumer.getSelectorExpression());log.debug(received msg: {}, messageExt);try {long now System.currentTimeMillis();this.rocketMQListener.onMessage(doConvertMessage(messageExt, objClass));long costTime System.currentTimeMillis() - now;log.debug(consume {} cost: {} ms, messageExt.getMsgId(), costTime);} catch (Exception var9) {log.warn(consume message failed. messageId:{}, topic:{}, reconsumeTimes:{}, messageExt.getMsgId(), messageExt.getTopic(), messageExt.getReconsumeTimes(), var9);final int delayLevelWhenNextConsume 0;context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}}SuppressWarnings(unchecked)private T T doConvertMessage(MessageExt messageExt, ClassT objClass) {if (Objects.equals(objClass, MessageExt.class)) {return (T)messageExt;} else {String str new String(messageExt.getBody(), StandardCharsets.UTF_8);if (Objects.equals(objClass, String.class)) {return (T)str;} else {if (objClass ! null) {return JacksonUtil.fromJson(str, objClass);} else {log.info(convert failed. str:{}, msgType:{}, str, null);throw new RuntimeException(cannot convert message to null);}}}} }配置文件 yml 新增自动注入消费者的配置 auto-consumer:message-consumer:- consumer-group: mico-grouptopic: mike-messageselector-expression: TAG_MESSAGE_CONSUMERconsume-thread-max: 6- consumer-group: mike-group-01topic: mike-messageselector-expression: TAG_MESSAGE_CONSUMER_01consume-thread-max: 8- consumer-group: mike-group-02topic: mike-messageselector-expression: TAG_MESSAGE_CONSUMER_02consume-thread-max: 10如果是配置在 properties 文件中配置如下 auto-consumer.message-consumer[0].consumer-group mico-group auto-consumer.message-consumer[0].topic mike-message auto-consumer.message-consumer[0].selector-expression TAG_MESSAGE_CONSUMER auto-consumer.message-consumer[0].consume-thread-max 6auto-consumer.message-consumer[1].consumer-group mico-group-01 auto-consumer.message-consumer[1].topic mike-message auto-consumer.message-consumer[1].selector-expression TAG_MESSAGE_CONSUMER_01 auto-consumer.message-consumer[1].consume-thread-max 8auto-consumer.message-consumer[2].consumer-group mico-group-02 auto-consumer.message-consumer[2].topic mike-message auto-consumer.message-consumer[2].selector-expression TAG_MESSAGE_CONSUMER_02 auto-consumer.message-consumer[2].consume-thread-max 10启动项目进行验证观察是否有三个消费者被创建 从日志上看确实根据配置文件自动创建了三个不同的消费者
http://www.hkea.cn/news/14500915/

相关文章:

  • idc销售网站源码章丘网站开发培训
  • 郑州建网站十大wordpress 3无法上传rar zip
  • 京东网站难做吗珠海网站建站模板
  • 做科研交流常用的网站上海市人才服务中心官网
  • 重庆网站建设小能手在线图片生成器
  • 网站开发使用软件环境东莞搭建网站要多少钱
  • 电影网站如何做长尾关键词到那个网站做翻译接单
  • 单位建设网站的请示国内最好的网站建设公司
  • 别人网站 自己的二级域名烟台网站网站建设
  • 电商网站国内外需求分析中国进出口贸易网
  • 大型网站制作需要什么设备上海通信管理局网站
  • wordpress5.21开启多站点做PPT素材图片网站 知乎
  • 常见的建站工具企业营销网站建设公司
  • 嘉兴微信网站在线免费货源网站
  • 赫章网站建设网站建设套餐是什么
  • pc响应式网站设计贵阳网站开发工作室
  • 免费搭建个人业务网站企业邮箱服务
  • 高端网站建设南京池州专业网站建设公司
  • 移动商务网站开发课程东莞百度推广教程
  • 个人网站建设的过程重庆响应式网站制作
  • 在线识别图片找原图温岭新站seo
  • 图书馆登录系统网站建设代码网站源码地址怎么看
  • 怎么找网站做推广网站与手机app是一体吗
  • 自动全屏网站模板如何制作局域网站
  • 四川中成煤炭建设集团网站无锡百姓网免费发布信息网
  • 专业网站建设制作价格公司外贸网站怎么做
  • 长沙营销网站建设网站建设设计制作外包
  • 制作响应式网站报价如何做广告推广赚钱
  • wordpress站点前台请求数过多小程序免费制作平台小程序
  • 注册深圳公司恒诚信价格网站优化前景