在线支付网站建设,青海省安建设管理部门网站,凡客建站手机版下载,开一个网站需要多少钱Broker启动
入口#xff1a; org.apache.rocketmq.broker.BrokerStartup#main broker的启动主要分为两部分#xff1a;1.创建brokerController 2.启动brokerController。与平时进行业务开发时不同的是#xff0c;这里的BrokerController相当于Broker的一个中央控制器类 org.apache.rocketmq.broker.BrokerStartup#main broker的启动主要分为两部分1.创建brokerController 2.启动brokerController。与平时进行业务开发时不同的是这里的BrokerController相当于Broker的一个中央控制器类并不是编写http接口的类 创建brokerController设置一些属性参数读取外部配置文件实例化brokerController并调用其初始化方法。 实例化brokerController主要是会实例化很多的配置类和线程池的阻塞队列。初始化方法 会加载如消费者消费进度等的文件会实例化消息存储的服务类并调用其加载方法加载储存消息的相关文件到内存中并进行数据恢复。还会创建各类线程池并注册netty服务的消息处理器之后创建各种定时任务如持久化消费者消费进度的任务、定时打印各种日志的任务等。 启动brokerController: 内部会启动消息储存服务、netty服务、长轮询拉取消息挂起服务、向nameserver心跳定时任务等 public static void main(String[] args) {// broker启动start(createBrokerController(args));
}创建brokerController
创建brokerControllerorg.apache.rocketmq.broker.BrokerStartup#createBrokerController
public static BrokerController createBrokerController(String[] args) {// 设置属性rocketmq.remoting.version即当前rocketmq版本System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));if (null System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE)) {NettySystemConfig.socketSndbufSize 131072;}if (null System.getProperty(NettySystemConfig.COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE)) {NettySystemConfig.socketRcvbufSize 131072;}try {//PackageConflictDetect.detectFastjson();Options options ServerUtil.buildCommandlineOptions(new Options());commandLine ServerUtil.parseCmdLine(mqbroker, args, buildCommandlineOptions(options),new PosixParser());if (null commandLine) {System.exit(-1);}// 创建Broker的配置类包含Broker的各种配置比如 ROCKETMQ_HOMEfinal BrokerConfig brokerConfig new BrokerConfig();// NettyServer的配置类Broker接收来自客户端的消息的时候作为服务端final NettyServerConfig nettyServerConfig new NettyServerConfig();// NettyClient的配置类Broker连接NameServer的时候还会作为客户端final NettyClientConfig nettyClientConfig new NettyClientConfig();nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,String.valueOf(TlsSystemConfig.tlsMode TlsMode.ENFORCING))));// 设置作为NettyServer时的监听端口为10911nettyServerConfig.setListenPort(10911);// Broker的消息存储配置例如各种文件大小等final MessageStoreConfig messageStoreConfig new MessageStoreConfig();if (BrokerRole.SLAVE messageStoreConfig.getBrokerRole()) {int ratio messageStoreConfig.getAccessMessageInMemoryMaxRatio() - 10;messageStoreConfig.setAccessMessageInMemoryMaxRatio(ratio);}/** 判断命令行启动是否包含 -c 命令用于指定配置文件* 如果包含则解析指定的配置文件* 启动Broker的时候使用命令参数 -c /xxx/rocketmq/config/conf/broker.conf*/if (commandLine.hasOption(c)) {String file commandLine.getOptionValue(c);if (file ! null) {configFile file;InputStream in new BufferedInputStream(new FileInputStream(file));properties new Properties();properties.load(in);properties2SystemEnv(properties);MixAll.properties2Object(properties, brokerConfig);MixAll.properties2Object(properties, nettyServerConfig);MixAll.properties2Object(properties, nettyClientConfig);MixAll.properties2Object(properties, messageStoreConfig);BrokerPathConfigHelper.setBrokerConfigPath(file);in.close();}}MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);if (null brokerConfig.getRocketmqHome()) {System.out.printf(Please set the %s variable in your environment to match the location of the RocketMQ installation, MixAll.ROCKETMQ_HOME_ENV);System.exit(-2);}// 获取namesrv地址String namesrvAddr brokerConfig.getNamesrvAddr();if (null ! namesrvAddr) {try {// 可以指定多个地址以 ; 隔开这里进行拆分String[] addrArray namesrvAddr.split(;);for (String addr : addrArray) {RemotingUtil.string2SocketAddress(addr);}} catch (Exception e) {System.out.printf(The Name Server Address[%s] illegal, please set it as follows, \127.0.0.1:9876;192.168.0.1:9876\%n,namesrvAddr);System.exit(-3);}}// 设置、校验brokerIdBrokerId为0表示Master非0表示Slaveswitch (messageStoreConfig.getBrokerRole()) {case ASYNC_MASTER:case SYNC_MASTER:brokerConfig.setBrokerId(MixAll.MASTER_ID);break;case SLAVE:if (brokerConfig.getBrokerId() 0) {System.out.printf(Slaves brokerId must be 0);System.exit(-3);}break;default:break;}// 是否开启DLeger即多副本主从切换集群if (messageStoreConfig.isEnableDLegerCommitLog()) {brokerConfig.setBrokerId(-1);}// 设置高可用通信监听端口为监听端口1默认就是10912// 该端口主要用于 如 主从同步之类的高可用操作messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() 1);LoggerContext lc (LoggerContext) LoggerFactory.getILoggerFactory();JoranConfigurator configurator new JoranConfigurator();configurator.setContext(lc);lc.reset();configurator.doConfigure(brokerConfig.getRocketmqHome() /conf/logback_broker.xml);// 判断命令行中是否包含字符pprintConfigItem和m如果存在则打印配置信息并结束jvm运行没有的话就不用管if (commandLine.hasOption(p)) {InternalLogger console InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);MixAll.printObjectProperties(console, brokerConfig);MixAll.printObjectProperties(console, nettyServerConfig);MixAll.printObjectProperties(console, nettyClientConfig);MixAll.printObjectProperties(console, messageStoreConfig);System.exit(0);} else if (commandLine.hasOption(m)) {InternalLogger console InternalLoggerFactory.getLogger(LoggerName.BROKER_CONSOLE_NAME);MixAll.printObjectProperties(console, brokerConfig, true);MixAll.printObjectProperties(console, nettyServerConfig, true);MixAll.printObjectProperties(console, nettyClientConfig, true);MixAll.printObjectProperties(console, messageStoreConfig, true);System.exit(0);}// 打印当前broker的配置日志log InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);MixAll.printObjectProperties(log, brokerConfig);MixAll.printObjectProperties(log, nettyServerConfig);MixAll.printObjectProperties(log, nettyClientConfig);MixAll.printObjectProperties(log, messageStoreConfig);// 实例化 BrokerController内部主要初始化了一些配置类、manager类、处理器、线程池等final BrokerController controller new BrokerController(brokerConfig,nettyServerConfig,nettyClientConfig,messageStoreConfig);// remember all configs to prevent discard// 将所有的-c的外部配置信息保存到BrokerController中的Configuration对象属性的allConfigs属性中controller.getConfiguration().registerConfig(properties);// 初始化BrokerControllerboolean initResult controller.initialize();if (!initResult) {controller.shutdown();System.exit(-3);}// 添加关闭钩子方法在Broker关闭之前执行进行一些内存清理、对象销毁等操作Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {private volatile boolean hasShutdown false;private AtomicInteger shutdownTimes new AtomicInteger(0);Overridepublic void run() {synchronized (this) {log.info(Shutdown hook was invoked, {}, this.shutdownTimes.incrementAndGet());if (!this.hasShutdown) {this.hasShutdown true;long beginTime System.currentTimeMillis();controller.shutdown();long consumingTimeTotal System.currentTimeMillis() - beginTime;log.info(Shutdown hook over, consuming total time(ms): {}, consumingTimeTotal);}}}}, ShutdownHook));return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null;
}实例化brokerController
实例化brokerController: org.apache.rocketmq.broker.BrokerController#BrokerController
public BrokerController(final BrokerConfig brokerConfig,final NettyServerConfig nettyServerConfig,final NettyClientConfig nettyClientConfig,final MessageStoreConfig messageStoreConfig
) {// broker的配置this.brokerConfig brokerConfig;// 作为netty服务端与客户端交互的配置this.nettyServerConfig nettyServerConfig;// 作为netty客户端与服务端交互的配置this.nettyClientConfig nettyClientConfig;// 消息存储的配置this.messageStoreConfig messageStoreConfig;// 消费者偏移量管理器维护offset进度信息this.consumerOffsetManager new ConsumerOffsetManager(this);//topic配置管理器管理broker中存储的所有topic的配置this.topicConfigManager new TopicConfigManager(this);// 处理消费者拉取消息请求的处理器this.pullMessageProcessor new PullMessageProcessor(this);//拉取请求挂起服务处理无消息时push长轮询消费者的挂起等待机制this.pullRequestHoldService new PullRequestHoldService(this);//消息送达的监听器生产者消息到达时通过该监听器触发pullRequestHoldService通知pullRequestHoldServicethis.messageArrivingListener new NotifyMessageArrivingListener(this.pullRequestHoldService);//消费者id变化监听器this.consumerIdsChangeListener new DefaultConsumerIdsChangeListener(this);//消费者管理类维护消费者组的注册实例信息以及topic的订阅信息并对消费者id变化进行监听this.consumerManager new ConsumerManager(this.consumerIdsChangeListener);//消费者过滤管理器配置文件为xx/config/consumerFilter.jsonthis.consumerFilterManager new ConsumerFilterManager(this);//生产者管理器包含生产者的注册信息通过groupName分组this.producerManager new ProducerManager();//客户端连接心跳服务用于定时扫描生产者和消费者客户端并将不活跃的客户端通道及相关信息移除this.clientHousekeepingService new ClientHousekeepingService(this);//处理某些broker到客户端的请求例如检查生产者的事务状态重置offsetthis.broker2Client new Broker2Client(this);//订阅分组关系管理器维护消费者组的一些附加运维信息this.subscriptionGroupManager new SubscriptionGroupManager(this);//broker对方访问的API处理broker对外的发起请求比如向nameServer注册向master、slave发起的请求this.brokerOuterAPI new BrokerOuterAPI(nettyClientConfig);//过滤服务管理器拉取消息过滤this.filterServerManager new FilterServerManager(this);//用于从节点定时向主节点发起请求同步数据例如topic配置、消费位移等this.slaveSynchronize new SlaveSynchronize(this);/*初始化各种阻塞队列。将会被设置到对应的处理不同客户端请求的线程池执行器中*///处理来自生产者的发送消息的请求的队列this.sendThreadPoolQueue new LinkedBlockingQueueRunnable(this.brokerConfig.getSendThreadPoolQueueCapacity());this.pullThreadPoolQueue new LinkedBlockingQueueRunnable(this.brokerConfig.getPullThreadPoolQueueCapacity());//处理reply消息的请求的队列RocketMQ4.7.0版本中增加了request-reply新特性该特性允许producer在发送消息后同步或者异步等待consumer消费完消息并返回响应消息类似rpc调用效果。//即生产者发送了消息之后可以同步或者异步的收到消费了这条消息的消费者的响应this.replyThreadPoolQueue new LinkedBlockingQueueRunnable(this.brokerConfig.getReplyThreadPoolQueueCapacity());//处理查询请求的队列this.queryThreadPoolQueue new LinkedBlockingQueueRunnable(this.brokerConfig.getQueryThreadPoolQueueCapacity());//客户端管理器的队列this.clientManagerThreadPoolQueue new LinkedBlockingQueueRunnable(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());//消费者管理器的队列目前没用到this.consumerManagerThreadPoolQueue new LinkedBlockingQueueRunnable(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());//心跳处理的队列this.heartbeatThreadPoolQueue new LinkedBlockingQueueRunnable(this.brokerConfig.getHeartbeatThreadPoolQueueCapacity());//事务消息相关处理的队列this.endTransactionThreadPoolQueue new LinkedBlockingQueueRunnable(this.brokerConfig.getEndTransactionPoolQueueCapacity());//broker状态管理器保存Broker运行时状态this.brokerStatsManager new BrokerStatsManager(this.brokerConfig.getBrokerClusterName());//目前没用到this.setStoreHost(new InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), this.getNettyServerConfig().getListenPort()));//broker快速失败服务this.brokerFastFailure new BrokerFastFailure(this);//配置类this.configuration new Configuration(log,BrokerPathConfigHelper.getBrokerConfigPath(),this.brokerConfig, this.nettyServerConfig, this.nettyClientConfig, this.messageStoreConfig);
}初始化brokerController
初始化brokerController: org.apache.rocketmq.broker.BrokerController#initialize
public boolean initialize() throws CloneNotSupportedException {//topic配置文件加载路径为 {user.home}/store/config/topics.jsonboolean result this.topicConfigManager.load();// 消费者消费偏移量配置文件加载路径为 {user.home}/store/config/consumerOffset.jsonresult result this.consumerOffsetManager.load();// 订阅分组配置文件加载路径为 {user.home}/store/config/subscriptionGroup.jsonresult result this.subscriptionGroupManager.load();// 消费者过滤配置文件加载路径为 {user.home}/store/config/consumerFilter.jsonresult result this.consumerFilterManager.load();if (result) {// 实例化和初始化消息存储服务相关类 DefaultMessageStoretry {// 实例化消息存储类DefaultMessageStorethis.messageStore new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,this.brokerConfig);/*** enableDLegerCommitLog 为 true默认为false则创建DLedgerRoleChangeHandler。* 在启用enableDLegerCommitLog情况下broker通过raft协议选主可以实现主从角色自动切换*/if (messageStoreConfig.isEnableDLegerCommitLog()) {DLedgerRoleChangeHandler roleChangeHandler new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);}this.brokerStats new BrokerStats((DefaultMessageStore) this.messageStore);//load pluginMessageStorePluginContext context new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);this.messageStore MessageStoreFactory.build(context, this.messageStore);this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));} catch (IOException e) {result false;log.error(Failed to initialize, e);}}/** 通过消息存储服务 加载 储存消息的相关文件到内存中并进行数据恢复此步骤是broker启动的核心步骤* 文件commitLog文件储存消息的、consumequeue文件消费者依据此文件消费储存指向commitLog中消息的偏移量、indexFile文件* 数据恢复broker可能会异常关闭导致消息在文件中储存不完整 或 已储存到commitLog但未存储到consumequeue和indexFile*/result result this.messageStore.load();if (result) {// 创建netty远程服务remotingServer和fastRemotingServerthis.remotingServer new NettyRemotingServer(this.nettyServerConfig, this.clientHousekeepingService);NettyServerConfig fastConfig (NettyServerConfig) this.nettyServerConfig.clone();fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);this.fastRemotingServer new NettyRemotingServer(fastConfig, this.clientHousekeepingService);/********************* 创建各种执行器线程池*********************/// 处理发送消息的请求的线程池this.sendMessageExecutor new BrokerFixedThreadPoolExecutor(this.brokerConfig.getSendMessageThreadPoolNums(),this.brokerConfig.getSendMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.sendThreadPoolQueue,new ThreadFactoryImpl(SendMessageThread_));//处理拉取消息的请求的线程池this.pullMessageExecutor new BrokerFixedThreadPoolExecutor(this.brokerConfig.getPullMessageThreadPoolNums(),this.brokerConfig.getPullMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.pullThreadPoolQueue,new ThreadFactoryImpl(PullMessageThread_));// 处理reply消息的请求的线程池Reply模式允许Producer发出消息后以同步或异步的形式等Consumer消费并返回一个响应消息达到类似RPC的调用过程this.replyMessageExecutor new BrokerFixedThreadPoolExecutor(this.brokerConfig.getProcessReplyMessageThreadPoolNums(),this.brokerConfig.getProcessReplyMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.replyThreadPoolQueue,new ThreadFactoryImpl(ProcessReplyMessageThread_));// 处理查询请求的线程池this.queryMessageExecutor new BrokerFixedThreadPoolExecutor(this.brokerConfig.getQueryMessageThreadPoolNums(),this.brokerConfig.getQueryMessageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.queryThreadPoolQueue,new ThreadFactoryImpl(QueryMessageThread_));// broker 管理线程池作为默认处理器的线程池this.adminBrokerExecutor Executors.newFixedThreadPool(this.brokerConfig.getAdminBrokerThreadPoolNums(), new ThreadFactoryImpl(AdminBrokerThread_));// 客户端管理器的线程池this.clientManageExecutor new ThreadPoolExecutor(this.brokerConfig.getClientManageThreadPoolNums(),this.brokerConfig.getClientManageThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.clientManagerThreadPoolQueue,new ThreadFactoryImpl(ClientManageThread_));// 心跳处理的线程池this.heartbeatExecutor new BrokerFixedThreadPoolExecutor(this.brokerConfig.getHeartbeatThreadPoolNums(),this.brokerConfig.getHeartbeatThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.heartbeatThreadPoolQueue,new ThreadFactoryImpl(HeartbeatThread_, true));// 事务消息相关处理的线程池this.endTransactionExecutor new BrokerFixedThreadPoolExecutor(this.brokerConfig.getEndTransactionThreadPoolNums(),this.brokerConfig.getEndTransactionThreadPoolNums(),1000 * 60,TimeUnit.MILLISECONDS,this.endTransactionThreadPoolQueue,new ThreadFactoryImpl(EndTransactionThread_));//消费者管理的线程池this.consumerManageExecutor Executors.newFixedThreadPool(this.brokerConfig.getConsumerManageThreadPoolNums(), new ThreadFactoryImpl(ConsumerManageThread_));// 注册netty服务的消息处理器this.registerProcessor();/********************** 启动各种定时任务 **********************/final long initialDelay UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();final long period 1000 * 60 * 60 * 24;// 每隔24h打印昨天生产和消费的消息数量this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {BrokerController.this.getBrokerStats().record();} catch (Throwable e) {log.error(schedule record error., e);}}}, initialDelay, period, TimeUnit.MILLISECONDS);//每隔5s將消费者offset进行持久化存入consumerOffset.json文件中this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {BrokerController.this.consumerOffsetManager.persist();} catch (Throwable e) {log.error(schedule persist consumerOffset error., e);}}}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);//每隔10s將消费过滤信息进行持久化存入consumerFilter.json文件中this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {BrokerController.this.consumerFilterManager.persist();} catch (Throwable e) {log.error(schedule persist consumer filter error., e);}}}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);/*** 每3分钟检查消费进度若消费进度落后超过 consumerFallbehindThreshold 1024L * 1024 * 1024 * 16\* 且disableConsumeIfConsumerReadSlowlytrue(默认false)则剔除掉该订阅组该消费者组停止消费消息来保护broker* 因为存储消息的commitLog一个文件大小才为1024L * 1024 * 1024。* ps不清楚原因*/this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {BrokerController.this.protectBroker();} catch (Throwable e) {log.error(protectBroker error., e);}}}, 3, 3, TimeUnit.MINUTES);//每隔1s將打印发送消息线程池队列、拉取消息线程池队列、查询消息线程池队列、结束事务线程池队列的大小以及队列头部元素存在时间this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {BrokerController.this.printWaterMark();} catch (Throwable e) {log.error(printWaterMark error., e);}}}, 10, 1, TimeUnit.SECONDS);//每隔1min將打印已存储在commitlog提交日志中但尚未分派到consumequeue消费队列的字节数。this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {log.info(dispatch behind commit log {} bytes, BrokerController.this.getMessageStore().dispatchBehindBytes());} catch (Throwable e) {log.error(schedule dispatchBehindBytes error., e);}}}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);if (this.brokerConfig.getNamesrvAddr() ! null) {// 更新NamesrvAddrthis.brokerOuterAPI.updateNameServerAddressList(this.brokerConfig.getNamesrvAddr());log.info(Set user specified name server address: {}, this.brokerConfig.getNamesrvAddr());} else if (this.brokerConfig.isFetchNamesrvAddrByAddressServer()) {/*** 如果未指定nameSvr地址且开启从地址服务器获取* 则每2min从nameSvr地址服务器获取最新的地址并更新* 若希望动态更新nameSvr地址则需要指定地址服务器url和fetchNamesrvAddrByAddressServer设置为true*/this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {BrokerController.this.brokerOuterAPI.fetchNameServerAddr();} catch (Throwable e) {log.error(ScheduledTask fetchNameServerAddr exception, e);}}}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);}//如果没有开启DLeger服务DLeger开启后表示支持高可用的主从自动切换if (!messageStoreConfig.isEnableDLegerCommitLog()) {if (BrokerRole.SLAVE this.messageStoreConfig.getBrokerRole()) {// 如果是从节点更新master地址if (this.messageStoreConfig.getHaMasterAddress() ! null this.messageStoreConfig.getHaMasterAddress().length() 6) {this.messageStore.updateHaMasterAddress(this.messageStoreConfig.getHaMasterAddress());this.updateMasterHAServerAddrPeriodically false;} else {this.updateMasterHAServerAddrPeriodically true;}} else {//如果是主节点每隔60s將打印主从节点的差异this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {BrokerController.this.printMasterAndSlaveDiff();} catch (Throwable e) {log.error(schedule printMasterAndSlaveDiff error., e);}}}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);}}// ....省略// 初始化事务消息相关服务initialTransaction();// 初始化权限相关服务initialAcl();// 初始化RPC调用的钩子函数initialRpcHooks();}return result;
}启动brokerController
入口: org.apache.rocketmq.broker.BrokerStartup#start
public static BrokerController start(BrokerController controller) {try {// BrokerController启动controller.start();String tip The broker[ controller.getBrokerConfig().getBrokerName() , controller.getBrokerAddr() ] boot success. serializeType RemotingCommand.getSerializeTypeConfigInThisServer();if (null ! controller.getBrokerConfig().getNamesrvAddr()) {tip and name server is controller.getBrokerConfig().getNamesrvAddr();}log.info(tip);System.out.printf(%s%n, tip);return controller;} catch (Throwable e) {e.printStackTrace();System.exit(-1);}return null;
}public void start() throws Exception {//启动消息存储服务if (this.messageStore ! null) {this.messageStore.start();}//启动netty远程服务if (this.remotingServer ! null) {this.remotingServer.start();}//启动netty远程服务VIP通道if (this.fastRemotingServer ! null) {this.fastRemotingServer.start();}if (this.fileWatchService ! null) {this.fileWatchService.start();}if (this.brokerOuterAPI ! null) {this.brokerOuterAPI.start();}//长轮询拉取消息挂起服务启动if (this.pullRequestHoldService ! null) {this.pullRequestHoldService.start();}//客户端连接心跳服务启动if (this.clientHousekeepingService ! null) {this.clientHousekeepingService.start();}if (this.filterServerManager ! null) {this.filterServerManager.start();}// 如果没有开启DLegerif (!messageStoreConfig.isEnableDLegerCommitLog()) {//如果不是SLAVE那么启动事务消息检查服务startProcessorByHa(messageStoreConfig.getBrokerRole());//如果是SLAVE则启动主从同步服务, 定时任务每隔10s与master机器同步数据采用slave主动拉取的方法//同步的内容包括topic配置消费者消费位移、延迟消息偏移量、订阅组信息等handleSlaveSynchronize(messageStoreConfig.getBrokerRole());this.registerBrokerAll(true, false, true);}// 定时任务默认每30s向namesvr发起一次注册即心跳包。时间间隔可配置registerNameServerPeriod1万到6万毫秒间。this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {Overridepublic void run() {try {BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error(registerBrokerAll Exception, e);}}}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);if (this.brokerStatsManager ! null) {this.brokerStatsManager.start();}if (this.brokerFastFailure ! null) {this.brokerFastFailure.start();}}