创建个人主页网站,长沙工程建设管理中心网站,app网站建设源码,如何开电商平台一、ACL权限控制
应用场景#xff1a;
RocketMQ提供了针对队列、用户等不同维度的非常全面的权限管理机制。通常来说#xff0c;RocketMQ作为一个内部服务#xff0c;是不需要进行权限控制的#xff0c;但是#xff0c;如果要通过RocketMQ进行跨部门甚至跨公司的合作
RocketMQ提供了针对队列、用户等不同维度的非常全面的权限管理机制。通常来说RocketMQ作为一个内部服务是不需要进行权限控制的但是如果要通过RocketMQ进行跨部门甚至跨公司的合作权限控制的重要性就显现出来了。
应用场景
RocketMQ提供了针对队列、用户等不同维度的非常全面的权限管理机制。通常来说RocketMQ作为一个内部服务是不需要进行权限控制的但是如果要通过RocketMQ进行跨部门甚至跨公司的合作权限控制的重要性就显现出来了。
权限控制体系
1、RocketMQ针对每个Topic就有完整的权限控制。比如在控制平台中就可以很方便的给每个Topic配置权限。 perm字段表示Topic的权限。有三个可选项。 2禁写禁订阅4可订阅不能写6可写可订阅
2、在Broker端还提供了更详细的权限控制机制。主要是在broker.conf中打开acl的标志aclEnabletrue。然后就可以用他提供的plain_acl.yml来进行权限配置了。并且这个配置文件是热加载的也就是说要修改配置时只要修改配置文件就可以了不用重启Broker服务。文件的配置方式也非常简单一目了然。
#全局白名单不受ACL控制
#通常需要将主从架构中的所有节点加进来
globalWhiteRemoteAddresses:
- 10.10.103.*
- 192.168.0.*accounts:
#第一个账户
- accessKey: RocketMQsecretKey: 12345678whiteRemoteAddress:admin: false defaultTopicPerm: DENY #默认Topic访问策略是拒绝defaultGroupPerm: SUB #默认Group访问策略是只允许订阅topicPerms:- topicADENY #topicA拒绝- topicBPUB|SUB #topicB允许发布和订阅消息- topicCSUB #topicC只允许订阅groupPerms:# the group should convert to retry topic- groupADENY- groupBPUB|SUB- groupCSUB
#第二个账户只要是来自192.168.1.*的IP就可以访问所有资源
- accessKey: rocketmq2secretKey: 12345678whiteRemoteAddress: 192.168.1.*# if it is admin, it could access all resourcesadmin: true
接下来在客户端就可以通过accessKey和secretKey提交身份信息了。客户端在使用时需要先引入一个Maven依赖包。
dependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-acl/artifactIdversion4.9.1/version
/dependency 然后在声明客户端时传入一个RPCHook。
//声明时传入RPCHookDefaultMQProducer producer new DefaultMQProducer(ProducerGroupName, getAclRPCHook());private static final String ACL_ACCESS_KEY RocketMQ;private static final String ACL_SECRET_KEY 1234567;static RPCHook getAclRPCHook() {return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));}
二、springboot整合RocketMQ
1、快速实战
快速创建RocketMQ的客户端。创建Maven工程引入关键依赖
dependenciesdependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-spring-boot-starter/artifactIdversion2.2.2/versionexclusionsexclusiongroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client/artifactId/exclusion/exclusions/dependencydependencygroupIdorg.apache.rocketmq/groupIdartifactIdrocketmq-client/artifactIdversion4.9.5/version/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactIdversion2.5.9/version/dependencydependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-test/artifactIdversion2.5.9/version/dependencydependencygroupIdjunit/groupIdartifactIdjunit/artifactIdversion4.13.2/versionscopetest/scope/dependencydependencygroupIdio.springfox/groupIdartifactIdspringfox-swagger-ui/artifactIdversion2.9.2/version/dependencydependencygroupIdio.springfox/groupIdartifactIdspringfox-swagger2/artifactIdversion2.9.2/version/dependency/dependencies
使用SpringBoot集成时要非常注意版本
启动类
SpringBootApplication
public class RocketMQSBApplication {public static void main(String[] args) {SpringApplication.run(RocketMQSBApplication.class,args);}
} 配置文件
rocketmq.name-server192.168.65.112:9876
rocketmq.producer.groupspringBootGroup#如果这里不配那就需要在消费者的注解中配。
#rocketmq.consumer.topic
rocketmq.consumer.grouptestGroup
server.port9000
接下来就可以声明生产者直接使用RocketMQTemplate进行消息发送。
package com.roy.rocketmq.basic;import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;import javax.annotation.Resource;Component
public class SpringProducer {Resourceprivate RocketMQTemplate rocketMQTemplate;public void sendMessage(String topic,String msg){this.rocketMQTemplate.convertAndSend(topic,msg);}
} 另外这个rocketMQTemplate不光可以发消息还可以主动拉消息。 消费者的声明也很简单。所有属性通过RocketMQMessageListener注解声明。
Component
RocketMQMessageListener(consumerGroup MyConsumerGroup, topic TestTopic,consumeMode ConsumeMode.CONCURRENTLY,messageModel MessageModel.BROADCASTING)
public class SpringConsumer implements RocketMQListenerString {Overridepublic void onMessage(String message) {System.out.println(Received message : message);}
}
这里唯一需要注意下的就是消息了。SpringBoot框架中对消息的封装与原生API的消息封装是不一样的。
2、如何处理各种消息类型
1、各种基础的消息发送机制参见单元测试类com.roy.rocketmq.SpringRocketTest
2、一个RocketMQTemplate实例只能包含一个生产者也就只能往一个Topic下发送消息。如果需要往另外一个Topic下发送消息就需要通过ExtRocketMQTemplateConfiguration()注解另外声明一个子类实例。
3、对于事务消息机制最关键的事务监听器需要通过RocketMQTransactionListener注解注入到Spring容器当中。在这个注解当中可以通过rocketMQTemplateBeanName属性指向具体的RocketMQTemplate子类。
3、实现原理
Push模式
Push模式对于RocketMQMessageListener注解的处理方式入口在rocketmq-spring-boot-2.2.2.jar中的org.apache.rocketmq.spring.autoconfigure.ListenerContainerConfiguration类中。 怎么找到的评经验猜以及碰运气。 这个ListenerContainerConfiguration类继承了Spring当中的SmartInitializingSingleton接口当Spring容器当中所有非懒加载的实例加载完成后就会触发他的afterSingletonsInstantiated方法进行初始化。在这个方法中会去扫描所有带有注解RocketMQMessageListener注解的类将他注册到内部一个Container容器当中。
public void afterSingletonsInstantiated() {MapString, Object beans this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class).entrySet().stream().filter(entry - !ScopedProxyUtils.isScopedTarget(entry.getKey())).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));beans.forEach(this::registerContainer);}
这里这个Container可以认为是客户端实例的一个容器通过这个容器来封装RocketMQ的原生API。
registerContainer的方法挺长的我这里截取出跟今天的主题相关的几行重要的源码
private void registerContainer(String beanName, Object bean) {.....//获取Bean上面的注解RocketMQMessageListener annotation clazz.getAnnotation(RocketMQMessageListener.class);...//检查注解的配置情况validate(annotation);String containerBeanName String.format(%s_%s, DefaultRocketMQListenerContainer.class.getName(),counter.incrementAndGet());GenericApplicationContext genericApplicationContext (GenericApplicationContext) applicationContext;//将扫描到的注解转化成为Container并注册到上下文中。genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,() - createRocketMQListenerContainer(containerBeanName, bean, annotation));DefaultRocketMQListenerContainer container genericApplicationContext.getBean(containerBeanName,DefaultRocketMQListenerContainer.class);//启动容器这里就相当于是启动了消费者if (!container.isRunning()) {try {container.start();} catch (Exception e) {log.error(Started container failed. {}, container, e);throw new RuntimeException(e);}}log.info(Register the listener to container, listenerBeanName:{}, containerBeanName:{}, beanName, containerBeanName);}
这其中最关注的当然是创建容器的createRocketMQListenerContainer方法中。而在这个方法中你基本看不到RocketMQ的原生API都是在创建并维护一个DefaultRocketMQListenerContainer对象。而这个DefaultRocketMQListenerContainer类就是我们今天关注的重点。
DefaultRocketMQListenerContainer类实现了InitializingBean接口自然要先关注他的afterPropertiesSet方法。这是Spring提供的对象初始化的扩展机制。
public void afterPropertiesSet() throws Exception {initRocketMQPushConsumer();this.messageType getMessageType();this.methodParameter getMethodParameter();log.debug(RocketMQ messageType: {}, messageType);}这个方法就是用来初始化RocketMQ消费者的。在这个方法里就会创建一个RocketMQ原生的DefaultMQPushConsumer消费者。同样方法很长抽取出比较关注的重点源码。
private void initRocketMQPushConsumer() throws MQClientException {.....//检查并创建consumer对象。if (Objects.nonNull(rpcHook)) {consumer new DefaultMQPushConsumer(consumerGroup, rpcHook, new AllocateMessageQueueAveragely(),enableMsgTrace, this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));consumer.setVipChannelEnabled(false);} else {log.debug(Access-key or secret-key not configure in this .);consumer new DefaultMQPushConsumer(consumerGroup, enableMsgTrace,this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic()));}// 定制instanceName有没有很熟悉consumer.setInstanceName(RocketMQUtil.getInstanceName(nameServer));.....//设定广播消费还是集群消费。switch (messageModel) {case BROADCASTING:consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.BROADCASTING);break;case CLUSTERING:consumer.setMessageModel(org.apache.rocketmq.common.protocol.heartbeat.MessageModel.CLUSTERING);break;default:throw new IllegalArgumentException(Property messageModel was wrong.);}//维护消费者的其他属性。 ...//指定Consumer的消费监听 --》在消费监听中就会去调用onMessage方法。switch (consumeMode) {case ORDERLY:consumer.setMessageListener(new DefaultMessageListenerOrderly());break;case CONCURRENTLY:consumer.setMessageListener(new DefaultMessageListenerConcurrently());break;default:throw new IllegalArgumentException(Property consumeMode was wrong.);}}
这整个就是在维护RocketMQ的原生消费者对象。其中的使用方式其实有很多地方是很值得借鉴的尤其是消费监听的处理。
Pull模式
Pull模式的实现其实是通过在RocketMQTemplate实例中注入一个DefaultLitePullConsumer实例来实现的。只要注入并启动了这个DefaultLitePullConsumer示例后后续就可以通过template实例的receive方法来调用DefaultLitePullConsumer的poll方法主动去Pull获取消息了。
初始化DefaultLitePullConsumer的代码依然是在rocketmq-spring-boot-2.2.2.jar包中。不过处理类是org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration。这个配置类会配置在jar包中的spring.factories文件中通过SpringBoot的自动装载机制加载进来。
Bean(CONSUMER_BEAN_NAME)ConditionalOnMissingBean(DefaultLitePullConsumer.class)ConditionalOnProperty(prefix rocketmq, value {name-server, consumer.group, consumer.topic}) //解析的springboot配置属性。public DefaultLitePullConsumer defaultLitePullConsumer(RocketMQProperties rocketMQProperties)throws MQClientException {RocketMQProperties.Consumer consumerConfig rocketMQProperties.getConsumer();String nameServer rocketMQProperties.getNameServer();String groupName consumerConfig.getGroup();String topicName consumerConfig.getTopic();Assert.hasText(nameServer, [rocketmq.name-server] must not be null);Assert.hasText(groupName, [rocketmq.consumer.group] must not be null);Assert.hasText(topicName, [rocketmq.consumer.topic] must not be null);...//创建消费者 DefaultLitePullConsumer litePullConsumer RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,groupName, topicName, messageModel, selectorType, selectorExpression, ak, sk, pullBatchSize, useTLS);litePullConsumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace());litePullConsumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic());litePullConsumer.setNamespace(consumerConfig.getNamespace());return litePullConsumer;}
RocketMQUtil.createDefaultLitePullConsumer方法中就是在维护一个DefaultLitePullConsumer实例。这个实例就是RocketMQ的原生API当中提供的拉模式客户端。 实际开发中拉模式用得比较少。但是其实RocketMQ针对拉模式也做了非常多的优化。原本提供了一个DefaultMQPullConsumer类进行拉模式消息消费DefaultLitePullConsumer在此基础上做了很多优化。有兴趣可以自己研究一下。 三、RocketMQ最佳实践
1、合理分配Topic、Tag
一个应用尽可能用一个Topic而消息子类型则可以用tags来标识。tags可以由应用自由设置只有生产者在发送消息设置了tags消费方在订阅消息时才可以利用tags通过broker做消息过滤message.setTags(TagA)。
2、使用Key加快消息索引
每个消息在业务层面的唯一标识码要设置到keys字段方便将来定位消息丢失问题。服务器会为每个消息创建索引哈希索引应用可以通过topic、key来查询这条消息内容以及消息被谁消费。由于是哈希索引请务必保证key尽可能唯一这样可以避免潜在的哈希冲突。
3、关注错误消息重试
我们已经知道RocketMQ的消费者端如果处理消息失败了Broker是会将消息重新进行投送的。而在重试时RocketMQ实际上会为每个消费者组创建一个对应的重试队列。重试的消息会进入一个 “%RETRY%”ConsumeGroup 的队列中。
如果重试次数超过设置的次数会进入死信队列到时候需要手动处理所以要检查消费者端执行失败的代码
4、手动处理死信队列
死信队列需要人工进行干预而死信队列的默认权限是不可读并且不可写权限perm被设置成了2:禁读(这个权限有三种 2:禁读4:禁写,6:可读可写)需要手动将死信队列的权限配置成6才能被消费(可以通过mqadmin指定或者web控制台)。死信队列和重试队列都只与消费者组有关和topic和消费者终端无关
5、消费者端进行幂等控制
在MQ系统中对于消息幂等有三种实现语义
at most once 最多一次每条消息最多只会被消费一次at least once 至少一次每条消息至少会被消费一次exactly once 刚刚好一次每条消息都只会确定的消费一次
这三种语义都有他适用的业务场景。
其中at most once是最好保证的。RocketMQ中可以直接用异步发送、sendOneWay等方式就可以保证。
为保证消息消费只有一次生产者发送消息时最好设置一个全局标识性id消费者端根据标识判断消费一次就行
四、RocketMQ基本源码分析 1、nameserver启动流程 kvConfigManagerkey、value的配置读取routeInfoManager组件路由重定位到broker上NettyRemotingServer接收远端请求的服务器接收broker请求注册和客户端发来的请求
2、broker启动流程 1、关注重点
Broker是整个RocketMQ的业务核心。所有消息存储、转发这些重要的业务都是Broker进行处理。
这里重点梳理Broker有哪些内部服务。这些内部服务将是整理Broker核心业务流程的起点。
2、源码重点
Broker启动的入口在BrokerStartup这个类可以从他的main方法开始调试。
启动过程关键点重点也是围绕一个BrokerController对象先创建然后再启动。
首先 在BrokerStartup.createBrokerController方法中可以看到Broker的几个核心配置
BrokerConfig Broker服务配置MessageStoreConfig 消息存储配置。 这两个配置参数都可以在broker.conf文件中进行配置NettyServerConfig Netty服务端占用了10911端口。同样也可以在配置文件中覆盖。NettyClientConfig Broker既要作为Netty服务端向客户端提供核心业务能力又要作为Netty客户端向NameServer注册心跳。
这些配置是我们了解如何优化 RocketMQ 使用的关键。