网站的源代码有什么用,个人承包工程需要什么资质,wordpress管理员名,搜索企业的软件本文转载自 linkedkeeper.com 消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能#xff0c;成为异步RPC的主要手段之一。 当今市面上有很多主流的消息中间件#xff0c;如老牌的ActiveMQ、RabbitMQ#… 本文转载自 linkedkeeper.com 消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能成为异步RPC的主要手段之一。 当今市面上有很多主流的消息中间件如老牌的ActiveMQ、RabbitMQ炙手可热的Kafka阿里巴巴自主开发的Notify、MetaQ、RocketMQ等。 本文不会一一介绍这些消息队列的所有特性而是探讨一下自主开发设计一个消息队列时你需要思考和设计的重要方面。过程中我们会参考这些成熟消息队列的很多重要思想。 本文首先会阐述什么时候你需要一个消息队列然后以Push模型为主从零开始分析设计一个消息队列时需要考虑到的问题如RPC、高可用、顺序和重复消息、可靠投递、消费关系解析等。 也会分析以Kafka为代表的pull模型所具备的优点。最后是一些高级主题如用批量/异步提高性能、pull模型的系统设计理念、存储子系统的设计、流量控制的设计、公平调度的实现等。其中最后四个方面会放在下篇讲解。
何时需要消息队列
当你需要使用消息队列时首先需要考虑它的必要性。可以使用mq的场景有很多最常用的几种是做业务解耦/最终一致性/广播/错峰流控等。反之如果需要强一致性关注业务逻辑的处理结果则RPC显得更为合适。
解耦
解耦是消息队列要解决的最本质问题。所谓解耦简单点讲就是一个事务只关心核心的流程。而需要依赖其他系统但不那么重要的事情有通知即可无需等待结果。换句话说基于消息的模型关心的是“通知”而非“处理”。 比如在美团旅游我们有一个产品中心产品中心上游对接的是主站、移动后台、旅游供应链等各个数据源下游对接的是筛选系统、API系统等展示系统。当上游的数据发生变更的时候如果不使用消息系统势必要调用我们的接口来更新数据就特别依赖产品中心接口的稳定性和处理能力。但其实作为旅游的产品中心也许只有对于旅游自建供应链产品中心更新成功才是他们关心的事情。而对于团购等外部系统产品中心更新成功也好、失败也罢并不是他们的职责所在。他们只需要保证在信息变更的时候通知到我们就好了。 而我们的下游可能有更新索引、刷新缓存等一系列需求。对于产品中心来说这也不是我们的职责所在。说白了如果他们定时来拉取数据也能保证数据的更新只是实时性没有那么强。但使用接口方式去更新他们的数据显然对于产品中心来说太过于“重量级”了只需要发布一个产品ID变更的通知由下游系统来处理可能更为合理。 再举一个例子对于我们的订单系统订单最终支付成功之后可能需要给用户发送短信积分什么的但其实这已经不是我们系统的核心流程了。如果外部系统速度偏慢比如短信网关速度不好那么主流程的时间会加长很多用户肯定不希望点击支付过好几分钟才看到结果。那么我们只需要通知短信系统“我们支付成功了”不一定非要等待它处理完成。
最终一致性
最终一致性指的是两个系统的状态保持一致要么都成功要么都失败。当然有个时间限制理论上越快越好但实际上在各种异常的情况下可能会有一定延迟达到最终一致状态但最后两个系统的状态是一样的。 业界有一些为“最终一致性”而生的消息队列如Notify阿里、QMQ去哪儿等其设计初衷就是为了交易系统中的高可靠通知。 以一个银行的转账过程来理解最终一致性转账的需求很简单如果A系统扣钱成功则B系统加钱一定成功。反之则一起回滚像什么都没发生一样。 然而这个过程中存在很多可能的意外
A扣钱成功调用B加钱接口失败。A扣钱成功调用B加钱接口虽然成功但获取最终结果时网络异常引起超时。A扣钱成功B加钱失败A想回滚扣的钱但A机器down机。
可见想把这件看似简单的事真正做成真的不那么容易。所有跨VM的一致性问题从技术的角度讲通用的解决方案是
强一致性分布式事务但落地太难且成本太高后文会具体提到。最终一致性主要是用“记录”和“补偿”的方式。在做所有的不确定的事情之前先把事情记录下来然后去做不确定的事情结果可能是成功、失败或是不确定“不确定”例如超时等可以等价为失败。成功就可以把记录的东西清理掉了对于失败和不确定可以依靠定时任务等方式把所有失败的事情重新搞一遍直到成功为止。 回到刚才的例子系统在A扣钱成功的情况下把要给B“通知”这件事记录在库里为了保证最高的可靠性可以把通知B系统加钱和扣钱成功这两件事维护在一个本地事务里通知成功则删除这条记录通知失败或不确定则依靠定时任务补偿性地通知我们直到我们把状态更新成正确的为止。 整个这个模型依然可以基于RPC来做但可以抽象成一个统一的模型基于消息队列来做一个“企业总线”。 具体来说本地事务维护业务变化和通知消息一起落地失败则一起回滚然后RPC到达broker在broker成功落地后RPC返回成功本地消息可以删除。否则本地消息一直靠定时任务轮询不断重发这样就保证了消息可靠落地broker。 broker往consumer发送消息的过程类似一直发送消息直到consumer发送消费成功确认。 我们先不理会重复消息的问题通过两次消息落地加补偿下游是一定可以收到消息的。然后依赖状态机版本号等方式做判重更新自己的业务就实现了最终一致性。
最终一致性不是消息队列的必备特性但确实可以依靠消息队列来做最终一致性的事情。另外所有不保证100%不丢消息的消息队列理论上无法实现最终一致性。好吧应该说理论上的100%排除系统严重故障和bug。 像Kafka一类的设计在设计层面上就有丢消息的可能比如定时刷盘如果掉电就会丢消息。哪怕只丢千分之一的消息业务也必须用其他的手段来保证结果正确。
广播
消息队列的基本功能之一是进行广播。如果没有消息队列每当一个新的业务方接入我们都要联调一次新接口。有了消息队列我们只需要关心消息是否送达了队列至于谁希望订阅是下游的事情无疑极大地减少了开发和联调的工作量。 比如本文开始提到的产品中心发布产品变更的消息以及景点库很多去重更新的消息可能“关心”方有很多个但产品中心和景点库只需要发布变更消息即可谁关心谁接入。
错峰与流控
试想上下游对于事情的处理能力是不同的。比如Web前端每秒承受上千万的请求并不是什么神奇的事情只需要加多一点机器再搭建一些LVS负载均衡设备和Nginx等即可。但数据库的处理能力却十分有限即使使用SSD加分库分表单机的处理能力仍然在万级。由于成本的考虑我们不能奢求数据库的机器数量追上前端。 这种问题同样存在于系统和系统之间如短信系统可能由于短板效应速度卡在网关上每秒几百次请求跟前端的并发量不是一个数量级。但用户晚上个半分钟左右收到短信一般是不会有太大问题的。如果没有消息队列两个系统之间通过协商、滑动窗口等复杂的方案也不是说不能实现。但系统复杂性指数级增长势必在上游或者下游做存储并且要处理定时、拥塞等一系列问题。而且每当有处理能力有差距的时候都需要单独开发一套逻辑来维护这套逻辑。所以利用中间系统转储两个系统的通信内容并在下游系统有能力处理这些消息的时候再处理这些消息是一套相对较通用的方式。
总而言之消息队列不是万能的。对于需要强事务保证而且延迟敏感的RPC是优于消息队列的。 对于一些无关痛痒或者对于别人非常重要但是对于自己不是那么关心的事情可以利用消息队列去做。 支持最终一致性的消息队列能够用来处理延迟不那么敏感的“分布式事务”场景而且相对于笨重的分布式事务可能是更优的处理方式。 当上下游系统处理能力存在差距的时候利用消息队列做一个通用的“漏斗”。在下游有能力处理的时候再进行分发。
如果下游有很多系统关心你的系统发出的通知的时候果断地使用消息队列吧。
消息队列的流派之争
这篇文章的标题很难起网上一翻全是各种MQ的性能比较很容易让人以为我也是这么“粗俗”的人o(╯□╰)o。我这篇文章想要表达的是——它们根本不是一个东西有毛的性能好比较
MQ是什么
Message QueueMQ消息队列中间件。很多人都说MQ通过将消息的发送和接收分离来实现应用程序的异步和解偶这个给人的直觉是——MQ是异步的用来解耦的但是这个只是MQ的效果而不是目的。MQ真正的目的是为了通讯屏蔽底层复杂的通讯协议定义了一套应用层的、更加简单的通讯协议。一个分布式系统中两个模块之间通讯要么是HTTP要么是自己开发的TCP但是这两种协议其实都是原始的协议。HTTP协议很难实现两端通讯——模块A可以调用BB也可以主动调用A如果要做到这个两端都要背上WebServer而且还不支持长连接HTTP 2.0的库根本找不到。TCP就更加原始了粘包、心跳、私有的协议想一想头皮就发麻。MQ所要做的就是在这些协议之上构建一个简单的“协议”——生产者/消费者模型。MQ带给我的“协议”不是具体的通讯协议而是更高层次通讯模型。它定义了两个对象——发送数据的叫生产者消费数据的叫消费者 提供一个SDK让我们可以定义自己的生产者和消费者实现消息通讯而无视底层通讯协议。
MQ的流派
列出功能表来比较MQ差异或者来一场“MQ性能大比武”的做法都是比较扯的首先要做的事情应该是分类。我理解的MQ分为两个流派
有broker
这个流派通常有一台服务器作为Broker所有的消息都通过它中转。生产者把消息发送给它就结束自己的任务了Broker则把消息主动推送给消费者或者消费者主动轮询。
重Topic流 kafka、JMS就属于这个流派生产者会发送key和数据到Broker由Broker比较key之后决定给那个消费者。这种模式是我们最常见的模式是我们对MQ最多的印象。在这种模式下一个topic往往是一个比较大的概念甚至一个系统中就可能只有一个topictopic某种意义上就是queue生产者发送key相当于说“hi把数据放到key的队列中”。 如上图所示Broker定义了三个队列key1key2key3生产者发送数据的时候会发送key1和dataBroker在推送数据的时候则推送data也可能把key带上。虽然架构一样但是kafka的性能要比jms的性能不知道高到多少倍所以基本这种类型的MQ只有kafka一种备选方案。如果你需要一条暴力的数据流在乎性能而非灵活性)那么kafka是最好的选择。
轻Topic流
这种的代表是RabbitMQ或者说是AMQP。生产者发送key和数据消费者定义订阅的队列Broker收到数据之后会通过一定的逻辑计算出key对应的队列然后把数据交给队列。 注意到了吗这种模式下解耦了key和queue在这种架构中queue是非常轻量级的在RabbitMQ中它的上限取决于你的内存消费者关心的只是自己的queue生产者不必关心数据最终给谁只要指定key就行了中间的那层映射在AMQP中叫exchange交换机。AMQP中有四种种exchange——Direct exchangekey就等于queueFanout exchange无视key给所有的queue都来一份Topic exchangekey可以用“宽字符”模糊匹配queue最后一个厉害了Headers exchange无视key通过查看消息的头部元数据来决定发给那个queueAMQP头部元数据非常丰富而且可以自定义。这种结构的架构给通讯带来了很大的灵活性我们能想到的通讯方式都可以用这四种exchange表达出来。如果你需要一个企业数据总线在乎灵活性那么RabbitMQ绝对的值得一用。
无broker
此门派是AMQP的“叛徒”某位道友嫌弃AMQP太“重”那是他没看到用Erlang实现的时候是多么的行云流水 所以设计了zeromq。这位道友非常睿智他非常敏锐的意识到——MQ是更高级的Socket它是解决通讯问题的。所以ZeroMQ被设计成了一个“库”而不是一个中间件这种实现也可以达到——没有broker的目的。 节点之间通讯的消息都是发送到彼此的队列中每个节点都既是生产者又是消费者。ZeroMQ做的事情就是封装出一套类似于scoket的API可以完成发送数据读取数据。如果你仔细想一下其实ZeroMQ是这样的 顿悟了吗Actor模型ZeroMQ其实就是一个跨语言的、重量级的Actor模型邮箱库。你可以把自己的程序想象成一个actorzeromq就是提供邮箱功能的库zeromq可以实现同一台机器的IPC通讯也可以实现不同机器的TCP、UDP通讯。如果你需要一个强大的、灵活、野蛮的通讯能力别犹豫zeromq。
MQ只能异步吗
答案是否定了首先ZeroMQ支持请求-应答模式其次RabbitMQ提供了RPC是地地道道的同步通讯只有JMS、kafka这种架构才只能做异步。我们很多人第一次接触MQ都是JMS之类的这种所以才会产生这种错觉。
总结
kafkazeromqrabbitmq代表了三种完全不同风格的MQ架构关注点完全不同
kafka在乎的是性能速度 rabbitmq追求的是灵活 zeromq追求的是轻量级、分布式
如果你拿zeromq来做大数据量的传输功能不是生产者的内存“爆掉”就是消费者被“压死”如果你用kafka做通讯总线那绝对的不会快只能更慢你想要rabbitmq实现分布式那真的是难为它。
如何设计一个消息队列
综述
我们现在明确了消息队列的使用场景下一步就是如何设计实现一个消息队列了。 基于消息的系统模型不一定需要broker(消息队列服务端)。市面上的的Akkaactor模型)、ZeroMQ等其实都是基于消息的系统设计范式但是没有broker。 我们之所以要设计一个消息队列并且配备broker无外乎要做两件事情
消息的转储在更合适的时间点投递或者通过一系列手段辅助消息最终能送达消费机。规范一种范式和通用的模式以满足解耦、最终一致性、错峰等需求。 掰开了揉碎了看最简单的消息队列可以做成一个消息转发器把一次RPC做成两次RPC。发送者把消息投递到服务端以下简称broker服务端再将消息转发一手到接收端就是这么简单。
一般来讲设计消息队列的整体思路是先build一个整体的数据流,例如producer发送给broker,broker发送给consumer,consumer回复消费确认broker删除/备份消息等。 利用RPC将数据流串起来。然后考虑RPC的高可用性尽量做到无状态方便水平扩展。 之后考虑如何承载消息堆积然后在合适的时机投递消息而处理堆积的最佳方式就是存储存储的选型需要综合考虑性能/可靠性和开发维护成本等诸多因素。 为了实现广播功能我们必须要维护消费关系可以利用zk/config server等保存消费关系。 在完成了上述几个功能后消息队列基本就实现了。然后我们可以考虑一些高级特性如可靠投递事务特性性能优化等。 下面我们会以设计消息队列时重点考虑的模块为主线穿插灌输一些消息队列的特性实现方法来具体分析设计实现一个消息队列时的方方面面。
实现队列基本功能
RPC通信协议
刚才讲到所谓消息队列无外乎两次RPC加一次转储当然需要消费端最终做消费确认的情况是三次RPC。既然是RPC就必然牵扯出一系列话题什么负载均衡啊、服务发现啊、通信协议啊、序列化协议啊等等。在这一块我的强烈建议是不要重复造轮子。利用公司现有的RPC框架Thrift也好Dubbo也好或者是其他自定义的框架也好。因为消息队列的RPC和普通的RPC没有本质区别。当然了自主利用Memchached或者Redis协议重新写一套RPC框架并非不可如MetaQ使用了自己封装的Gecko NIO框架卡夫卡也用了类似的协议。但实现成本和难度无疑倍增。排除对效率的极端要求都可以使用现成的RPC框架。 简单来讲服务端提供两个RPC服务一个用来接收消息一个用来确认消息收到。并且做到不管哪个server收到消息和确认消息结果一致即可。当然这中间可能还涉及跨IDC的服务的问题。这里和RPC的原则是一致的尽量优先选择本机房投递。你可能会问如果producer和consumer本身就在两个机房了怎么办首先broker必须保证感知的到所有consumer的存在。其次producer尽量选择就近的机房就好了。
高可用
其实所有的高可用是依赖于RPC和存储的高可用来做的。先来看RPC的高可用美团的基于MTThrift的RPC框架阿里的Dubbo等其本身就具有服务自动发现负载均衡等功能。而消息队列的高可用只要保证broker接受消息和确认消息的接口是幂等的并且consumer的几台机器处理消息是幂等的这样就把消息队列的可用性转交给RPC框架来处理了。 那么怎么保证幂等呢最简单的方式莫过于共享存储。broker多机器共享一个DB或者一个分布式文件/kv系统则处理消息自然是幂等的。就算有单点故障其他节点可以立刻顶上。另外failover可以依赖定时任务的补偿这是消息队列本身天然就可以支持的功能。存储系统本身的可用性我们不需要操太多心放心大胆的交给DBA们吧 对于不共享存储的队列如Kafka使用分区加主备模式就略微麻烦一些。需要保证每一个分区内的高可用性也就是每一个分区至少要有一个主备且需要做数据的同步关于这块HA的细节可以参考下篇pull模型消息系统设计。
服务端承载消息堆积的能力
消息到达服务端如果不经过任何处理就到接收者了broker就失去了它的意义。为了满足我们错峰/流控/最终可达等一系列需求把消息存储下来然后选择时机投递就显得是顺理成章的了。 只是这个存储可以做成很多方式。比如存储在内存里存储在分布式KV里存储在磁盘里存储在数据库里等等。但归结起来主要有持久化和非持久化两种。 持久化的形式能更大程度地保证消息的可靠性如断电等不可抗外力并且理论上能承载更大限度的消息堆积外存的空间远大于内存。 但并不是每种消息都需要持久化存储。很多消息对于投递性能的要求大于可靠性的要求且数量极大如日志。这时候消息不落地直接暂存内存尝试几次failover最终投递出去也未尝不可。 市面上的消息队列普遍两种形式都支持。当然具体的场景还要具体结合公司的业务来看。
存储子系统的选择
我们来看看如果需要数据落地的情况下各种存储子系统的选择。理论上从速度来看文件系统分布式KV持久化分布式文件系统数据库而可靠性却截然相反。还是要从支持的业务场景出发作出最合理的选择如果你们的消息队列是用来支持支付/交易等对可靠性要求非常高但对性能和量的要求没有这么高而且没有时间精力专门做文件存储系统的研究DB是最好的选择。 但是DB受制于IOPS如果要求单broker 5位数以上的QPS性能基于文件的存储是比较好的解决方案。整体上可以采用数据文件索引文件的方式处理具体这块的设计比较复杂可以参考下篇的存储子系统设计。 分布式KV如MongoDBHBase等或者持久化的Redis由于其编程接口较友好性能也比较可观如果在可靠性要求不是那么高的场景也不失为一个不错的选择。
消费关系解析
现在我们的消息队列初步具备了转储消息的能力。下面一个重要的事情就是解析发送接收关系进行正确的消息投递了。 市面上的消息队列定义了一堆让人晕头转向的名词如JMS 规范中的Topic/QueueKafka里面的Topic/Partition/ConsumerGroupRabbitMQ里面的Exchange等等。抛开现象看本质无外乎是单播与广播的区别。所谓单播就是点到点而广播是一点对多点。当然对于互联网的大部分应用来说组间广播、组内单播是最常见的情形。 消息需要通知到多个业务集群而一个业务集群内有很多台机器只要一台机器消费这个消息就可以了。 当然这不是绝对的很多时候组内的广播也是有适用场景的如本地缓存的更新等等。另外消费关系除了组内组间可能会有多级树状关系。这种情况太过于复杂一般不列入考虑范围。所以一般比较通用的设计是支持组间广播不同的组注册不同的订阅。组内的不同机器如果注册一个相同的ID则单播如果注册不同的ID(如IP地址端口)则广播。 至于广播关系的维护一般由于消息队列本身都是集群所以都维护在公共存储上如config server、zookeeper等。维护广播关系所要做的事情基本是一致的:
发送关系的维护。发送关系变更时的通知。
队列高级特性设计
上面都是些消息队列基本功能的实现下面来看一些关于消息队列特性相关的内容不管可靠投递/消息丢失与重复以及事务乃至于性能不是每个消息队列都会照顾到所以要依照业务的需求来仔细衡量各种特性实现的成本利弊最终做出最为合理的设计。
可靠投递最终一致性
这是个激动人心的话题完全不丢消息究竟可不可能答案是完全可能前提是消息可能会重复并且在异常情况下要接受消息的延迟。 方案说简单也简单就是每当要发生不可靠的事情RPC等之前先将消息落地然后发送。当失败或者不知道成功失败比如超时时消息状态是待发送定时任务不停轮询所有待发送消息最终一定可以送达。 具体来说
producer往broker发送消息之前需要做一次落地。请求到server后server确保数据落地后再告诉客户端发送成功。支持广播的消息队列需要对每个待发送的endpoint持久化一个发送状态直到所有endpoint状态都OK才可删除消息。
对于各种不确定超时、down机、消息没有送达、送达后数据没落地、数据落地了回复没收到其实对于发送方来说都是一件事情就是消息没有送达。 重推消息所面临的问题就是消息重复。重复和丢失就像两个噩梦你必须要面对一个。好在消息重复还有处理的机会消息丢失再想找回就难了。 Anyway作为一个成熟的消息队列应该尽量在各个环节减少重复投递的可能性不能因为重复有解决方案就放纵的乱投递。 最后说一句不是所有的系统都要求最终一致性或者可靠投递比如一个论坛系统、一个招聘系统。一个重复的简历或话题被发布可能比丢失了一个发布显得更让用户无法接受。不断重复一句话任何基础组件要服务于业务场景。
消费确认
当broker把消息投递给消费者后消费者可以立即响应我收到了这个消息。但收到了这个消息只是第一步我能不能处理这个消息却不一定。或许因为消费能力的问题系统的负荷已经不能处理这个消息或者是刚才状态机里面提到的消息不是我想要接收的消息主动要求重发。 把消息的送达和消息的处理分开这样才真正的实现了消息队列的本质-解耦。所以允许消费者主动进行消费确认是必要的。当然对于没有特殊逻辑的消息默认Auto Ack也是可以的但一定要允许消费方主动ack。 对于正确消费ack的没什么特殊的。但是对于reject和error需要特别说明。reject这件事情往往业务方是无法感知到的系统的流量和健康状况的评估以及处理能力的评估是一件非常复杂的事情。举个极端的例子收到一个消息开始build索引可能这个消息要处理半个小时但消息量却是非常的小。所以reject这块建议做成滑动窗口/线程池类似的模型来控制 消费能力不匹配的时候直接拒绝过一段时间重发减少业务的负担。 但业务出错这件事情是只有业务方自己知道的就像上文提到的状态机等等。这时应该允许业务方主动ack error并可以与broker约定下次投递的时间。
重复消息和顺序消息
上文谈到重复消息是不可能100%避免的除非可以允许丢失那么顺序消息能否100%满足呢? 答案是可以但条件更为苛刻
允许消息丢失。从发送方到服务方到接受者都是单点单线程。
所以绝对的顺序消息基本上是不能实现的当然在METAQ/Kafka等pull模型的消息队列中单线程生产/消费排除消息丢失也是一种顺序消息的解决方案。 一般来讲一个主流消息队列的设计范式里应该是不丢消息的前提下尽量减少重复消息不保证消息的投递顺序。 谈到重复消息主要是两个话题
如何鉴别消息重复并幂等的处理重复消息。一个消息队列如何尽量减少重复消息的投递。
先来看看第一个话题每一个消息应该有它的唯一身份。不管是业务方自定义的还是根据IP/PID/时间戳生成的MessageId如果有地方记录这个MessageId消息到来是能够进行比对就 能完成重复的鉴定。数据库的唯一键/bloom filter/分布式KV中的key都是不错的选择。由于消息不能被永久存储所以理论上都存在消息从持久化存储移除的瞬间上游还在投递的可能上游因种种原因投递失败不停重试都到了下游清理消息的时间。这种事情都是异常情况下才会发生的毕竟是小众情况。两分钟消息都还没送达多送一次又能怎样呢幂等的处理消息是一门艺术因为种种原因重复消息或者错乱的消息还是来到了说两种通用的解决方案
版本号。 状态机。
事务
持久性是事务的一个特性然而只满足持久性却不一定能满足事务的特性。还是拿扣钱/加钱的例子讲。满足事务的一致性特征则必须要么都不进行要么都能成功。 解决方案从大方向上有两种
两阶段提交分布式事务。本地事务本地落地补偿发送。
分布式事务存在的最大问题是成本太高两阶段提交协议对于仲裁down机或者单点故障几乎是一个无解的黑洞。对于交易密集型或者I/O密集型的应用没有办法承受这么高的网络延迟系统复杂性。 并且成熟的分布式事务一定构建与比较靠谱的商用DB和商用中间件上成本也太高。 那如何使用本地事务解决分布式事务的问题呢以本地和业务在一个数据库实例中建表为例子与扣钱的业务操作同一个事务里将消息插入本地数据库。如果消息入库失败则业务回滚如果消息入库成功事务提交。 然后发送消息注意这里可以实时发送不需要等定时任务检出以提高消息实时性。以后的问题就是前文的最终一致性问题所提到的了只要消息没有发送成功就一直靠定时任务重试。 这里有一个关键的点本地事务做的是业务落地和消息落地的事务而不是业务落地和RPC成功的事务。这里很多人容易混淆如果是后者无疑是事务嵌套RPC是大忌会有长事务死锁等各种风险。 而消息只要成功落地很大程度上就没有丢失的风险磁盘物理损坏除外。而消息只要投递到服务端确认后本地才做删除就完成了producer-broker的可靠投递并且当消息存储异常时业务也是可以回滚的。 本地事务存在两个最大的使用障碍
配置较为复杂“绑架”业务方必须本地数据库实例提供一个库表。对于消息延迟高敏感的业务不适用。
话说回来不是每个业务都需要强事务的。扣钱和加钱需要事务保证但下单和生成短信却不需要事务不能因为要求发短信的消息存储投递失败而要求下单业务回滚。所以一个完整的消息队列应该定义清楚自己可以投递的消息类型如事务型消息本地非持久型消息以及服务端不落地的非可靠消息等。对不同的业务场景做不同的选择。另外事务的使用应该尽量低成本、透明化可以依托于现有的成熟框架如Spring的声明式事务做扩展。业务方只需要使用Transactional标签即可。
性能相关
异步/同步
首先澄清一个概念异步同步和oneway是三件事。异步归根结底你还是需要关心结果的但可能不是当时的时间点关心可以用轮询或者回调等方式处理结果同步是需要当时关心 的结果的而oneway是发出去就不管死活的方式这种对于某些完全对可靠性没有要求的场景还是适用的但不是我们重点讨论的范畴。 回归来看任何的RPC都是存在客户端异步与服务端异步的而且是可以任意组合的客户端同步对服务端异步客户端异步对服务端异步客户端同步对服务端同步客户端异步对服务端同步。 对于客户端来说同步与异步主要是拿到一个Result还是Future(Listenable)的区别。实现方式可以是线程池NIO或者其他事件机制这里先不展开讲。 服务端异步可能稍微难理解一点这个是需要RPC协议支持的。参考servlet 3.0规范服务端可以吐一个future给客户端并且在future done的时候通知客户端。 整个过程可以参考下面的代码
客户端同步服务端异步。
FutureResult future request(server);//server立刻返回futuresynchronized(future){while(!future.isDone()){ future.wait();//server处理结束后会notify这个future并修改isdone标志}}return future.get();
客户端同步服务端同步。
Result result request(server);客户端异步服务端同步(这里用线程池的方式)。
FutureResult future executor.submit(new Callable(){public void callResult(){ result request(server);}})return future;
客户端异步服务端异步。
FutureResult future request(server);//server立刻返回future return future
上面说了这么多其实是想让大家脱离两个误区
RPC只有客户端能做异步服务端不能。异步只能通过线程池。
那么服务端使用异步最大的好处是什么呢说到底是解放了线程和I/O。试想服务端有一堆I/O等待处理如果每个请求都需要同步响应每条消息都需要结果立刻返回那么就几乎没法做I/O合并 当然接口可以设计成batch的但可能batch发过来的仍然数量较少。而如果用异步的方式返回给客户端future就可以有机会进行I/O的合并把几个批次发过来的消息一起落地这种合并对于MySQL等允许batch insert的数据库效果尤其明显并且彻底释放了线程。不至于说来多少请求开多少线程能够支持的并发量直线提高。 来看第二个误区返回future的方式不一定只有线程池。换句话说可以在线程池里面进行同步操作也可以进行异步操作也可以不使用线程池使用异步操作NIO、事件。 回到消息队列的议题上我们当然不希望消息的发送阻塞主流程前面提到了server端如果使用异步模型则可能因消息合并带来一定程度上的消息延迟所以可以先使用线程池提交一个发送请求主流程继续往下走。 但是线程池中的请求关心结果吗Of course必须等待服务端消息成功落地才算是消息发送成功。所以这里的模型准确地说事客户端半同步半异步使用线程池不阻塞主流程但线程池中的任务需要等待server端的返回server端是纯异步。客户端的线程池wait在server端吐回的future上直到server端处理完毕才解除阻塞继续进行。
总结一句同步能够保证结果异步能够保证效率要合理的结合才能做到最好的效率。
push还是pull
上文提到的消息队列大多是针对push模型的设计。现在市面上有很多经典的也比较成熟的pull模型的消息队列如Kafka、MetaQ等。这跟JMS中传统的push方式有很大的区别可谓另辟蹊径。 我们简要分析下push和pull模型各自存在的利弊。
慢消费
慢消费无疑是push模型最大的致命伤穿成流水线来看如果消费者的速度比发送者的速度慢很多势必造成消息在broker的堆积。假设这些消息都是有用的无法丢弃的消息就要一直在broker端保存。当然这还不是最致命的最致命的是broker给consumer推送一堆consumer无法处理的消息consumer不是reject就是error然后来回踢皮球。 反观pull模式consumer可以按需消费不用担心自己处理不了的消息来骚扰自己而broker堆积消息也会相对简单无需记录每一个要发送消息的状态只需要维护所有消息的队列和偏移量就可以了。所以对于建立索引等慢消费消息量有限且到来的速度不均匀的情况pull模式比较合适。
消息延迟与忙等
这是pull模式最大的短板。由于主动权在消费方消费方无法准确地决定何时去拉取最新的消息。如果一次pull取到消息了还可以继续去pull如果没有pull取到则需要等待一段时间重新pull。 但等待多久就很难判定了。你可能会说我可以有xx动态pull取时间调整算法但问题的本质在于有没有消息到来这件事情决定权不在消费方。也许1分钟内连续来了1000条消息然后半个小时没有新消息产生 可能你的算法算出下次最有可能到来的时间点是31分钟之后或者60分钟之后结果下条消息10分钟后到了是不是很让人沮丧 当然也不是说延迟就没有解决方案了业界较成熟的做法是从短时间开始不会对broker有太大负担然后指数级增长等待。比如开始等5ms然后10ms然后20ms然后40ms……直到有消息到来然后再回到5ms。 即使这样依然存在延迟问题假设40ms到80ms之间的50ms消息到来消息就延迟了30ms而且对于半个小时来一次的消息这些开销就是白白浪费的。 在阿里的RocketMq里有一种优化的做法-长轮询来平衡推拉模型各自的缺点。基本思路是:消费者如果尝试拉取失败不是直接return,而是把连接挂在那里wait,服务端如果有新的消息到来把连接notify起来这也是不错的思路。但海量的长连接block对系统的开销还是不容小觑的还是要合理的评估时间间隔给wait加一个时间上限比较好~
总结
本文从为何使用消息队列开始讲起然后主要介绍了如何从零开始设计一个消息队列包括RPC、事务、最终一致性、广播、消息确认等关键问题。并对消息队列的push、pull模型做了简要分析最后从批量和异步角度分析了消息队列性能优化的思路。下篇会着重介绍一些高级话题如存储系统的设计、流控和错峰的设计、公平调度等。希望通过这些让大家对消息队列有个提纲挈领的整体认识并给自主开发消息队列提供思路。另外本文主要是源自自己在开发消息队列中的思考和读源码时的体会比较不官方也难免会存在一些漏洞欢迎大家多多交流。 本文由博客一文多发平台 OpenWrite 发布