当前位置: 首页 > news > 正文

网站设计 济南wordpress hta

网站设计 济南,wordpress hta,php网站上线,口碑好网站建设资源

文章目录 前言 1、application.yml 2、RabbitMqConfig 3、MqMessage 4、MqMessageItem 5、DirectMode 6、StateConsumer：消费者 7、InfoConsumer：消费者 前言 本文是工作之余的随手记，记录在工作期间使用 RabbitMQ 的笔记。 1、application.yml 使…

# 文章目录

- 前言
- 1、application.yml
- 2、RabbitMqConfig
- 3、MqMessage
- 4、MqMessageItem
- 5、DirectMode
- 6、StateConsumer：消费者
- 7、InfoConsumer：消费者

## 前言

本文是工作之余的随手记，记录在工作期间使用 RabbitMQ 的笔记。

## 1、application.yml

使用 use 属性，方便随时打开和关闭使用 MQ，并且可以做到细化控制。

```yaml
spring:
  rabbitmq:
    use: true
    host: 10.100.10.100
    port: 5672
    username: wen
    password: 123456
    exchangeSubPush: exWen
    queueSubPush: ha.queue.SubPush
    routeSubPush: 1000
    exchangeState: sync.ex.State
    queueState: ha.q.Server
    queueStateSync: ha.q.StateServer
    routeState: state
    exchangeOnlineMonitor: sync.ex.State
    routeOnlineMonitor: state
    queueOnlineMonitor: ha.q.Online
```

pom.xml 文件中，使用的是 SpringBoot 项目，使用 spring-boot-starter-amqp 依赖。

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wen</groupId>
    <artifactId>springboot-mybatis</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.5.3</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.18</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.8.1</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
        </dependency>
    </dependencies>
</project>
```

## 2、RabbitMqConfig

配置类，将可配置的参数使用 @Value 做好配置，与 application.yml 相互对应。

```java
package com.wen.mq;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;

@Slf4j
@Configuration
@Data
public class RabbitMqConfig {

    @Value("${spring.rabbitmq.use:true}")
    private boolean use;

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.virtual-host:}")
    private String virtualHost;

    @Value("${spring.rabbitmq.exchangeState}")
    private String exchangeState;

    @Value("${spring.rabbitmq.queueState}")
    private String queueState;

    @Value("${spring.rabbitmq.routeState}")
    private String routeState;

    @Value("${spring.rabbitmq.queueStateSync}")
    private String queueStateSync;

    @Value("${spring.rabbitmq.exchangeOnlineInfo}")
    private String exchangeOnlineInfo;

    @Value("${spring.rabbitmq.routeOnlineInfo}")
    private String routeOnlineInfo;

    @Value("${spring.rabbitmq.queueOnlineInfo}")
    private String queueOnlineInfo;

    @PostConstruct
    private void init() {
    }
}
```

## 3、MqMessage

MQ 消息实体类

```java
package com.wen.mq;

import lombok.Data;

@Data
public class MqMessage<T> {
    private String msgType;
    private String msgOrigin;
    private long time;
    private T data;
}
```

## 4、MqMessageItem

MQ 消息实体类

```java
package com.wen.mq;

import lombok.Data;

@Data
public class MqMessageItem {
    private long userId;
    private String userName;
    private int userAge;
    private String userSex;
    private String userPhone;
    private String op;
}
```

## 5、DirectMode

配置中心，使用 SimpleMessageListenerContainer 进行配置。新加一个消费者队列，就要在这里进行配置。

```java
package com.wen.mq;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class DirectMode {

    @Autowired
    RabbitMqConfig rabbitMqConfig;

    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Autowired
    private StateConsumer stateConsumer;

    @Autowired
    private InfoConsumer infoConsumer;

    @Bean
    public SimpleMessageListenerContainer initMQ() {
        if (!rabbitMqConfig.isUse()) {
            return null;
        }
        log.info("begin!");
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认，这里改为手动确认
        // 设置一个队列
        container.setQueueNames(rabbitMqConfig.getQueueStateSync());
        //如果同时设置多个队列如下： 前提是队列都是必须已经创建存在的
        //container.setQueueNames("TestDirectQueue","TestDirectQueue2","TestDirectQueue3");
        //另一种设置队列的方法,如果使用这种情况,那么要设置多个,就使用addQueues
        //container.setQueues(new Queue("TestDirectQueue",true));
        //container.addQueues(new Queue("TestDirectQueue2",true));
        //container.addQueues(new Queue("TestDirectQueue3",true));
        container.setMessageListener(stateConsumer);
        log.info("end");
        return container;
    }

    @Bean
    public SimpleMessageListenerContainer contactSyncContainer() {
        if (!rabbitMqConfig.isUse()) {
            return null;
        }
        log.info("contact begin");
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // RabbitMQ默认是自动确认，这里改为手动确认消息
        //设置一个队列
        container.setQueueNames(rabbitMqConfig.getQueueOnlineInfo());
        container.setMessageListener(infoConsumer);
        log.info("contact end");
        return container;
    }

    @Bean
    public Queue queueState() {
        if (!rabbitMqConfig.isUse()) {
            return null;
        }
        return new Queue(rabbitMqConfig.getQueueState());
    }

    @Bean
    public Queue queueStateSync() {
        if (!rabbitMqConfig.isUse()) {
            return null;
        }
        return new Queue(rabbitMqConfig.getQueueStateSync());
    }

    @Bean
    DirectExchange exchangeState() {
        if (!rabbitMqConfig.isUse()) {
            return null;
        }
        return new DirectExchange(rabbitMqConfig.getExchangeState());
    }

    @Bean
    Binding bindingState() {
        if (!rabbitMqConfig.isUse()) {
            return null;
        }
        return BindingBuilder.bind(queueState()).to(exchangeState()).with(rabbitMqConfig.getRouteState());
    }

    @Bean
    Binding bindingStateSync() {
        if (!rabbitMqConfig.isUse()) {
            return null;
        }
        return BindingBuilder.bind(queueStateSync()).to(exchangeState()).with(rabbitMqConfig.getRouteState());
    }

    // 新加一个消费者
    @Bean
    public Queue queueOnlineMonitor() {
        if (!rabbitMqConfig.isUse()) {
            return null;
        }
        return new Queue(rabbitMqConfig.getQueueOnlineInfo());
    }

    @Bean
    DirectExchange exchangeOnlineMonitor() {
        if (!rabbitMqConfig.isUse()) {
            return null;
        }
        return new DirectExchange(rabbitMqConfig.getExchangeOnlineInfo());
    }

    @Bean
    Binding bindingExchangeOnlineMonitor() {
        if (!rabbitMqConfig.isUse()) {
            return null;
        }
        return BindingBuilder.bind(queueOnlineMonitor()).to(exchangeOnlineMonitor()).with(rabbitMqConfig.getRouteOnlineInfo());
    }
}
```

## 6、StateConsumer：消费者

实现 ChannelAwareMessageListener 接口，可以在这里面做相应的操作，例如存缓存、存库等。

```java
package com.wen.mq;

import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.TypeReference;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.stream.Collectors;

@Slf4j
@Component
public class StateConsumer implements ChannelAwareMessageListener {

    @Autowired
    RabbitMqConfig rabbitMqConfig;

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        String queueName = message.getMessageProperties().getConsumerQueue();
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        if (!rabbitMqConfig.getQueueStateSync().equals(queueName)) {
            String bodyStr = new String(message.getBody(), StandardCharsets.UTF_8);
            try {
                MqMessage<List<MqMessageItem>> mqMessage = JSON.parseObject(bodyStr, new TypeReference<MqMessage<List<MqMessageItem>>>() {});
                // 这里可以对消息做其他处理，例如存储到缓存中
                List<MqMessageItem> items = mqMessage.getData();
                if (CollectionUtil.isNotEmpty(items)) {
                    applyToRedis(mqMessage);
                }
                log.info("consume mq msg ok, queue:{}, deliveryTag:{}, msg:{}", queueName, deliveryTag, mqMessage);
                channel.basicAck(deliveryTag, false);
            } catch (JSONException e) {
                log.error("parse mq msg exception, queue:{}, deliveryTag:{}", queueName, deliveryTag, e);
                channel.basicReject(deliveryTag, false);
            } catch (Exception e) {
                log.error("consume mq msg exception, queue:{}, deliveryTag:{}", queueName, deliveryTag, e);
                channel.basicReject(deliveryTag, true); //为true会重新放回队列
            }
        }
    }

    public static final String MQ_STATE_OP_REMOVE_STATE = "REMOVE_STATE";
    public static final String MQ_STATE_OP_CHANGE_STATE = "CHANGE_STATE";

    private void applyToRedis(MqMessage<List<MqMessageItem>> mqMessage) {
        List<MqMessageItem> data = mqMessage.getData();
        Map<String, List<MqMessageItem>> itemGroupByOp = data.stream().collect(Collectors.groupingBy(item -> item.getOp()));
        List<MqMessageItem> stateToRemove = itemGroupByOp.get(MQ_STATE_OP_REMOVE_STATE);
        List<MqMessageItem> stateToChange = itemGroupByOp.get(MQ_STATE_OP_CHANGE_STATE);
        if (CollectionUtil.isNotEmpty(stateToRemove)) {
            Map<Long, Set<String>> map = new HashMap<>();
            for (MqMessageItem item : stateToRemove) {
                map.computeIfAbsent(item.getUserId(), u -> new HashSet<>()).add(String.valueOf(item.getUserAge()));
            }
            // cacheService.removeUserState(map);
        }
        if (CollectionUtil.isNotEmpty(stateToChange)) {
            List<MqMessageItem> list = stateToChange.stream().map(u -> {
                MqMessageItem dto = new MqMessageItem();
                dto.setUserId(u.getUserId());
                dto.setUserAge(u.getUserAge());
                dto.setUserName(u.getUserName());
                dto.setUserSex(u.getUserSex());
                dto.setUserPhone(u.getUserPhone());
                return dto;
            }).collect(Collectors.toList());
            // cacheService.saveUserState(list);
        }
    }
}
```

## 7、InfoConsumer：消费者

实现 ChannelAwareMessageListener 接口，可以在这里面做相应的操作，例如存缓存、存库等。

```java
package com.wen.mq;

import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class InfoConsumer implements ChannelAwareMessageListener {

    @Autowired
    RabbitMqConfig rabbitMqConfig;

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        String queueName = message.getMessageProperties().getConsumerQueue();
        log.info("queueName: {}", queueName);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            byte[] body = message.getBody();
            String content = new String(body);
            MqMessage msg = JSONObject.parseObject(content, MqMessage.class);
            if (rabbitMqConfig.getQueueOnlineInfo().equals(queueName)) {
                // 订阅到的消息就是变更的消息
                // 这里可使用service对消息进行消费，返回一个boolean
                log.info("用户监控数据写入失败，数据：{}", msg);
            }
            log.info("consume mq msg ok, queue:{}, deliveryTag:{}, msg:{}", queueName, deliveryTag, msg);
            channel.basicAck(deliveryTag, false);
        } catch (JSONException e) {
            log.error("parse mq msg exception, queue:{}, deliveryTag:{}", queueName, deliveryTag, e);
            channel.basicReject(deliveryTag, false); //为true会重新放回队列
        } catch (Exception e) {
            log.error("consume mq msg exception, queue:{}, deliveryTag:{}", queueName, deliveryTag, e);
            channel.basicReject(deliveryTag, true); //为true会重新放回队列
        }
    }
}
```
http://www.hkea.cn/news/14257941/

相关文章:

  • 易语言怎么做网站自动登录wordpress导入数据库后出现乱码
  • 建设购物网站的目的个人养老保险缴费档次
  • 怎么做全民夺宝网站做智慧教室的网站
  • 做海报的素材那个网站比较好海兴做网站
  • 音乐网站建设程序html语言的特点
  • 备案的网站必须打开吗枣强网站建设
  • 网站可以给pdf做笔记c 网站开发案例
  • 湖南张家界建设局网站义乌论坛
  • 郑州建立一个网站需要哪些wordpress无法编辑器
  • 网站专题报道页面怎么做的阿里云添加网站
  • 烟台seo网站诊断全屋定制营销
  • 金融类的网站怎么做怎么联系企业的网站建设
  • 邵阳网站制作建设软件大全安卓版下载
  • 百度多久收录一次网站商标注册网上
  • 搭建企业网站的步骤福建平潭建设局网站
  • thinkphp网站后台模板网站建设需要用到什么
  • 福州有什么做网站的公司wordpress 医院模板下载
  • 淅川微网站开发wordpress手机单页面模板
  • 网站点击按钮回到页面顶部怎么做政务网站建设存在的问题
  • 学校网站建设情况说明宝塔配置wordpress和dz伪静态
  • 编程网站免费中文版wish跨境电商平台官网
  • 17zwd一起做业网站灰色行业老域名做网站不收录
  • 家具设计网站推荐什么做电子书下载网站好
  • 赣州制作网站百度织梦网站图片不显示图片
  • 网站运营知识晚上睡不着想看点正能量
  • 石家庄新华区网站建设wordpress模板2018
  • 温州本地网站网站商城前台模板
  • 金属材料网站建设无锡网站建设上海韵茵
  • 网站建设+开源wordpress 小米主题
  • 网站建设 排名宝下拉wordpress媒体文件隔离