01|业界的主流消息队列是如何发展起来的?
作为一个消息队列的老兵,我经常被团队的新同学或者客户的研发人员问:我希望在我们的业务中引入消息队列,应该选择哪一款合适?是不是用最近很火的Pulsar就好了,但是业务团队推荐用RocketMQ或RabbitMQ,而大数据团队推荐用Kafka,有没有什么选择的标准可以参考呢?
这个问题你之前一定疑惑过,可能也有了一套自己的选择标准,不过遇到这种情况,我都会先反问:你觉得消息队列是做什么的?
自从负责消息队列云服务之后,我接触了很多客户,发现基本90%以上的用户(大概率你也在其中),业务的数据量都不大,场景也不复杂。只用到了消息队列最基本的生产和消费功能,把消息队列当做缓冲分发的管道,基本不会用到消息队列的高级功能,比如死信队列、事务消息、延时消息等等。
所以,很多情况下,我们不需要很复杂的消息队列,有些时候甚至都不需要引入业界标准的消息队列产品。是不是和你之前的想法不太一致?今天我们就从这个有点反常识的观点开始讨论。
你真的需要标准消息队列吗?
我第一次用消息队列是在我负责游戏论坛开发的时候,有一个功能是需要把用户对帖子的评论和点赞数据发给下游分析。你可以想一下,如果你接到这个需求,会怎么做呢?
我的第一个想法就是在流程里面加一个函数,封装一下,再把数据通过 HTTP API 发送给下游,就完成了。
不过我上级选择了另外一套方案,引入Kafka,在主流程里面先把数据发送到Kafka,再让下游去消费数据。那时候Kafka还是0.8.*的版本,公司用得很少,我们团队也是第一次用,还特意找运维搭了套集群。
Apache Kafka是 LinkedIn 在2011 年贡献给了 Apache 软件基金会的分布式消息队列。主要满足大数据领域中的高吞吐量、低延迟场景。核心功能较为简单,只提供生产和消费,后来加了幂等和事务。采用通用分布式架构实现发布-订阅模型,能高效地处理海量数据。
当时我不太理解加Kafka的意义是什么。
因为我们的需求非常简单,只涉及生产和消费两个动作,而且现网一直在用Redis。当时我认为使用Redis的List实现Push/Pop就可以满足需求,效果是一样的,还省去Kafka的维护和学习成本。进一步,如果担心Redis的数据没做持久化丢失了,也可以通过MySQL的表,把数据insert进去,下游去select出来,也能实现Push/Pop的效果。
那为什么还是用Kafka呢?
当时有个业务背景:用户的回复会包含图片和表情等复杂结构数据,导致内容可能会特别长。另外遇到高峰时,回帖数量会特别多,短时间内可能有几千万的回帖,回帖的总内容很大。
Redis的问题在于数据都存在内存里,如果数据没有及时消费,就会打爆内存。而且因为回帖内容可能很大,在Redis的Value里存大的数据会有性能和稳定性问题。而MySQL在insert上是有性能瓶颈的,短时间内大量回帖,插入会特别频繁,性能就扛不住。
而Kafka就没有这个问题,作为一个消息队列,它的特点就是高吞吐、大消息、高并发、持久化,不会存在性能、功能、稳定性方面的问题。这就是我们选择Kafka的理由。
其实如果没有大消息和大流量等复杂场景,是可以选用非标准消息队列产品的。比如在用户状态审核的场景中,只需要向下游传递用户ID和审核结果,结构简单,数量有限。这时候选择非标准消息队列比如Redis和MySQL也是可以的。
所以,是否选择使用标准消息队列产品,取决于你的数据和业务场景的需求。 当数据量大、场景复杂后,才必须引入标准消息队列,因为它有高吞吐、持久化、长久堆积的特性。
既然用非标准消息队列也可以满足相应需求,那到底什么是消息队列呢?
从宏观上来讲,我会认为具有缓冲作用、具备类发布和订阅能力的存储引擎都可以称做消息队列。因为消息队列的最基本功能就是生产和消费,在发布订阅之上,扩展如死信队列、顺序消息、延时消息等高阶能力,并实现高吞吐、低延时、高可靠等特性,就成为了我们所熟知的功能齐全的标准消息队列。
现在业界都有哪些标准的消息队列呢?我们一起来梳理下。
业界都有哪些消息队列?
还是跟随我的时间线来看。后来我负责的广告业务用到了另一款消息队列 RabbitMQ,当时我最直观的感觉就是:它功能比Kafka多好多,支持延时消息、死信队列、优先级队列、事务消息等等,业务上也是因为要用到这些功能才选择的RabbitMQ。
RabbitMQ是2007年由国外一家叫做Rabbit的公司开源的,用Erlang 写成的消息队列。主要满足业务中消息总线的场景。特点是功能丰富,低流量下稳定性较高,基本具备消息队列所应该具备的所有功能。缺点是在大流量的情况下会有明显的瓶颈和稳定性风险。
当时开源的消息队列并不丰富。基于JMS协议发展出来的ActiveMQ 因为功能和稳定性问题,用的人比较少。Kafka刚开源不久,功能只有生产和消费。AMQP协议的 Erlang 实现 RabbitMQ,因为功能丰富,稳定性较高,成为主流选择。
ActiveMQ 项目最初是由 LogicBlaze 的创始人于 2004 年创建的,支持完成JMS规范的消息队列。因为生态、功能定位方面的原因,在国内用的人并不多。
AMQP是一个消息队列协议规范,它不是一款具体的消息队列。因为不同消息队列的访问协议是不一样的,导致不同的消息队列需要用不同的SDK访问,客户的切换成本很高。2003年,多个金融服务机构希望制定一个消息队列的协议规范,希望不同的消息队列的协议都根据这个标准实现,这样就可以不需要重复开发SDK,不同的应用程序之间的交互和切换可以更简单、更方便。这就是AMQP的由来。
不过开源选择少,并不意味着那时候没有好的消息队列可用,商业化的闭源的消息队列一直在蓬勃发展,但需要购买才能使用,而且不便宜。
比如微软1997年推出的MSMQ、AWS 2004年推出的SQS,而消息队列领域最牛的,某种意义上(从收入维度看)一直都是 IBM MQ,在金融、证券行业用得特别多,几乎一统天下。数据显示,到了2020 年 IBM MQ 每年在全球仍有近 10 亿美金的营收。现在国内各大云厂商消息队列的收入加起来,可能都不够IBM MQ的零头。
后来开源社区逐渐完善,阿里的 RocketMQ在2017年从Apache基金会毕业,Pulsar也在2018成为了Apache的顶级项目,开源社区生态逐渐繁荣,选择慢慢变多。
RocketMQ 在定位上和 RabbitMQ 很像,功能丰富,在业务消息中经常会用到。不过RocketMQ是在移动互联网浪潮下发展起来的,业务场景更加复杂,也支持更多功能,比如消息Tag、消息轨迹、消息查询等等。
除了功能层面,在架构和性能层面,RabbitMQ开发设计早,当时分布式的设计理念还不成熟,导致它在架构层面的设计存在较大的缺陷,遇到大流量、高并发的时候,容易出现集群不可用、网络分区等情况无法解决。而RocketMQ在分布式架构上实现得更合理优雅,在大流量、高并发的场景下表现优秀稳定。
RocketMQ是2013年阿里研发,2016年开源,可满足大规模微服务场景的消息队列产品。可以理解为是RabbitMQ的高可用、分布式升级版。功能丰富,基本可以满足业务消息场景下的所有需求。稳定性、数据可靠性方面的表现都较好。性能介于RabbitMQ和Kafka之间。
Pulsar和Kafka很像,主要定位在流领域,主打大吞吐的流式计算。但Kafka的功能比较简单,支持基本的发布订阅、幂等、事务消息。Pulsar在满足这些功能的基础上,也希望支持RocketMQ和RabbitMQ的功能,所以功能最丰富。
除了功能层面,在架构和性能层面上,Pulsar的架构设计比Kafka更符合当前云原生架构,它的定位是Kafka的升级版,主要解决Kafka当前的一些痛点问题,比如集群扩缩容慢、分区迁移需要Rebalance、无法支持超多分区等。性能目前没有特别大的差距。
不过Pulsar发展时间较短,架构较复杂,功能支持较多,当前阶段在稳定性上Kafka会比Pulsar好非常多。
Pulsar 2017年由 Yahoo 开发的消息队列系统。一开始定位是流计算领域,可以理解为Kafka的升级版,近期希望同时发展消息和流两个方向。其架构上的设计理念较为优秀,比如计算存储分离、弹性、多租户。在功能上目前正在追赶RabbitMQ/RocketMQ。性能层面,和Kafka没有明显的差异。但当前阶段的稳定性还需要提升。
这里总结一下,当前开源社区用的较多的消息队列主要有RabbitMQ、Kafka,RocketMQ和Pulsar四款。下面我整理了一张消息队列发展的时间脉络图,供你参考。
在开源社区发展的这段时间,国内大厂也一直在自研消息队列,比如阿里的RocketMQ、腾讯的CMQ和TubeMQ、京东的JMQ、字节的BMQ。只是发展程度不一样,有的开源了成为顶级项目,有的慢慢消亡了,有的仅限在公司内部使用。
虽然有这么多的消息队列,但它们发展的方向其实是一致的。
消息队列的发展脉络
结合我的个人经历,我们可以从消息队列的历史中抽象出两条交织的发展脉络:上层的需求变化和下层的架构演进。
从需求发展路径上看,消息队列的发展趋势是: 消息 -> 流 -> 消息和流融合。
从架构发展的角度来看,消息队列的发展趋势是: 单机 -> 分布式 -> 云原生/Serverless。
在90年代到21世纪初,以IBM MQ和AMQP为代表的消息队列主要满足业务上对消息的需求,即异步通讯、架构解耦。
2010年左右,移动互联网发展,大数据兴起,传统的消息队列在架构上无法满足大流量的吞吐需求,就发展出了以Kafka为代表的消息队列,主打大吞吐、大流量。我们也进入了分布式的时代,现在大家熟知的消息队列都是分布式的架构,所以会有分区、副本、一致性概念。
随着业务场景越来越复杂,业务消息的数据量也越来越大。基于开源AMQP的RabbitMQ在性能和架构上已经无法满足消息场景的需求,从而发展出了RocketMQ。
近几年随着云计算的发展、云原生和Serverless的理念兴起,在弹性、成本的驱动下,消息队列的架构往云原生/Serverless方向演变,简单来说,就是利用云上的弹性计算、存储等基础设施去实现架构的Serverless,按需使用、按量付费,最终达到使用端感受到的免运维、低成本。
基于云原生架构设计的Pulsar开始走向成熟,业界的MQ也出现了 计算存储分离、分层存储、多租户、弹性计算等概念。
云原生/Serverless的发展趋势,你应该听得比较多了,“消息和流融合”的趋势可能有些陌生,我稍微解释一下。
什么是消息和流?
消息、流分开来看都比较好理解。
消息就是业务消息,在业务架构(比如微服务架构)中用来作消息传递,做系统的消息总线,比如用户提交订单的流程。
流,就是在大数据架构中用来做大流量时的数据削峰,比如日志的投递流转。
消息和流融合就是这两个事情都能做。不过为什么会有消息和流融合的这个趋势呢?
其实都是钱的原因。虽然消息队列是基础组件,但是功能比较单一,主要是缓冲作用,在消息、流的方向上,功能需求一直是相对固定的,细分的市场也都有领头组件,流领域目前是Kafka一家独大,消息领域的头部玩家,国外是RabbitMQ,国内是RocketMQ。
对MQ厂商来说,如果希望扩大产品份额或者新品抢占市场,就需要有 独特竞争力,核心就是自己产品的功能和成本, 即功能更多更丰富、成本更低。这里的成本指的是能为客户,也就是消息队列使用者,节省多少资源、人力成本。
因为对我们使用者来说,业务和大数据都不陌生,日常工作都需要进行业务和数据系统的开发。但是麻烦的问题是:在业务消息中,我们经常需要使用RocketMQ或RabbitMQ,在大数据系统中,我们又需要使用Kafka或Pulsar。
所以, 运维侧需要运维多款 MQ,研发侧需要学习使用多款MQ,这在一定程度上拉升了研发和运维的成本。如果有一款消息队列满足所有场景,只需要部署一款消息队列,就能满足所有业务的需求,这种设计思想是非常有商业价值的。
现在我们也可以看到业界主流的4款消息队列,在消息和流的融合上各有动作。
RabbitMQ因为开发语言、架构和社区的活跃度、定位的原因,基本不会走这条路。
Kafka虽然也强调云原生,但目前主要工作在自身的架构优化上,比如去ZooKeeper,暂时在消息方向没有提出明确概念。但在我看来,未来Kafka应该会往这个方向转变,因为流的场景始终会有瓶颈,打通一个新方向在商业上肯定是有价值的。
RocketMQ在消息领域已经非常成熟,社区也希望打通流的场景,扩展使用范围,提升竞争力,抢占市场,也在往这个方向努力。
Pulsar是一个新兴架构,没有历史包袱,主打的就是云原生的消息和流的融合架构,希望满足更多场景,解决更多业务需求。
总结
现在你应该能解答我们开头提出的问题了,广义上讲,消息队列是有缓冲作用、具备类发布和订阅能力的存储引擎。而技术方案如何选择消息队列,来源于业务场景和数据量。如果只需要最基本的生产和消费功能,可以不用标准消息队列产品,大部分公司或业务的场景下,一款消息队列就能满足所有需求了。
技术的演进都是商业驱动的,消息队列的演进,无论是从需求发展路径上看是消息 -> 流 -> 消息和流融合,还是从架构发展角度的单机 -> 分布式 -> 云原生/Serverless,本质其实都是在思考如何降低成本和吸引客户。
为了降低成本,弹性是最基础的要求。所以消息队列在技术上,对计算弹性的需求提出了计算存储分离架构,对低存储成本的需求提出了分层存储的概念,对资源复用的需求提出了多租户的概念。
为了吸引客户,各个消息队列都在尽量提高自己的竞争力,围绕着功能、容灾、多架构、生态建设展开。
不过要注意,消息和流只是业界的趋势,不是我们作为使用者必然的非此即彼的选择。在开发者实际使用的时候,我也发现很多人会将Kafka当做一个业务消息总线在用,也有人使用RocketMQ传递大流量的日志,当做大数据架构中的管道在用。
02|消息队列在架构和功能层面都包含哪些概念?
这节课我们来了解一下消息队列在架构和功能层面的基本概念,也是想有针对性地对齐一些通用基础概念,同时让你对消息队列有一个整体认识,从而让后面的学习过程更加顺利。
什么时候会用到消息队列?
首先我们从使用者的角度,来聊聊什么情况下我们会用到消息队列。
在系统架构中,消息队列的定位就是 总线和管道,主要起到解耦上下游系统、数据缓存的作用。它不像数据库,会有很多计算、聚合、查询的逻辑,它的主要操作就是 生产和消费。所以,我们在业务中不管是使用哪款消息队列,我们的核心操作永远是生产和消费数据。
一般情况下,我们会在需要解耦上下游系统、对数据有缓冲缓存需求或者需要用到消息队列的某些功能(比如延时消息、优先级消息)的时候选择使用消息队列,然后再根据实际需求选型。
下面我们就用经典的订单下单流程,来简要概括下对消息队列的使用情况。
下单流程是一个典型的 系统解耦、 消息分发 的场景,一份数据需要被多个下游系统处理。另外一个经典场景就是日志采集流程,一般日志数据都很大,直接发到下游,下游系统可能会扛不住崩溃,所以会把数据先缓存到消息队列中。所以消息队列的基本特性就是高性能、高吞吐、低延时。
架构层面的基本概念
接下来我们将通过一张图示,来了解一下消息队列架构层面常见的一些基本概念。
Broker : Broker 本质上是一个进程,比如 RocketMQ 的 Broker 就是指RocketMQ Server 启动成功后的一个进程。在实际部署过程中,通常一个物理节点只会起一个进程,所以大部分情况下我们认为 Broker 就表示一个节点,但是在一些特殊场景下,一个物理节点中也可以起多个进程,就表示一台节点有多个Broker。
Topic(主题) : 在大部分消息队列中,Topic 都是指用来组织分区关系的一个逻辑概念。通常情况下,一个 Topic 会包含多个分区。但是 RabbitMQ 是一个例外,Topic 是指具体某一种主题模式。
Partition/Queue/MessageQueue(分区/分片): 在消息队列中,分区、分片、Partiton、Queue、MessageQueue 是一个概念,后面统一用分区来称呼,都是用来表示数据存储的最小单位。一般可以直接将消息写入到一个分区中,也可以将消息写入到Topic,再分发到具体某个分区。一个Topic 通常会包含一个或多个分区。
Producer(生产者): 生产者指消息的发送方,即发送消息的客户端,也叫生产端。
Consumer(消费者):消费者指消息的接收方,即接收消息的客户端,也叫消费端。
ConsumerGroup/Subscription(消费分组/订阅):一般情况下,消息队列中消费分组和订阅是同一个概念,后面统一用消费分组来称呼。它是用来组织消费者和分区关系的逻辑概念,也有保存消费进度的作用。
Message(消息):指一条真实的业务数据,消息队列的每条数据一般都叫做一条消息。
Offset/ConsumerOffset/Cursor(位点/消费位点/游标): 指消费者消费分区的进度,即每个消费者都会去消费分区,为了避免重复消费进度,都会保存消费者消费分区的进度信息。
ACK/OffsetCommit(确认/位点提交):确认和位点提交一般都是指提交消费进度的操作,即数据消费成功后,提交当前的消费位点,确保不重复消费。
Leader/Follower(领导者/追随者,主副本/从副本):Leader 和 Follower一般是分区维度副本的概念,即集群中的分区一般会有多个副本。此时就会有主从副本的概念,一般是一个主副本配上一个或多个从副本。
Segment(段/数据分段):段是指消息数据在底层具体存储时,分为多个文件存储时的文件,这个文件就叫做分区的数据段。即比如每超过 1G 的文件就新起一个文件来存储,这个文件就是Segment。基本所有的消息队列都有段的概念,比如Kakfa的Segment、Pulsar的Ledger等等。
StartOffset/EndOffset(起始位点/结束位点):起始位点和结束位点是分区维度的概念。即数据是顺序写入到分区的,一般从0的位置开始往后写,此时起始位点就是0。因为数据有过期的概念,分区维度较早的数据会被清理。此时起始位点就会往后移,表示当前阶段最早那条有效消息的位点。结束位点是指最新的那条数据的写入位置。因为数据一直在写入分区,所以起始位点和结束位点是一直动态变化的。
ACL(访问控制技术):ACL 全称是Access Control List,用来对集群中的资源进行权限控制,比如控制分区或Topic的读和写等。
功能层面的基本概念
讲完了架构层面的基本概念,我们来看看功能层面的基本概念。
相比于数据库的基本操作是增删改查,消息队列的基本操作就是生产和消费,即读和写。消息队列一般是不支持客户端修改和删除单条数据的。接下来我们就从功能的角度,来了解一些常见的基本概念。
顺序消息: 是指从生产者和消费者的视角来看,生产者按顺序写入Topic的消息,在消费者这边能不能按生产者写入的顺序消费到消息,如果能就是顺序消息。
延时消息/定时消息:都是指生产者发送消息到 Broker 时,可以设置这条消息在多久后能被消费到,当时间到了后,消息就会被消费到。延时的意思就是指以 Broker 收到消息的时间为准,多久后消息能被消费者消费,比如消息发送成功后的30分钟才能被消费。定时是指可以指定消息在设置的时间才能被看到,比如设置明天的20:00才能被消费。从技术上来看,两者是一样的;从客户端的角度,功能上稍微有细微的差别;从内核的角度,一般两种消息是以同一个概念出现的。
事务消息:消息队列的事务因为在不同的消息队列中的实现方式不一样,所以定义也不太一样。正常情况下,事务表示多个操作的原子性,即一批操作要么一起成功,要么一起失败。在消息队列中,一般指发送一批消息,要么同时成功,要么同时失败。
消息重试:消息重试分为生产者重试和消费者重试。生产者重试是指当消息发送失败后,可以设置重试逻辑,比如重试几次、多久后重试、重试间隔多少。消费者重试是指当消费的消息处理失败后,会自动重试消费消息。
消息回溯:是指当允许消息被多次消费,即某条消息消费成功后,这条消息不会被删除,还能再重复到这条消息。
广播消费:广播听起来是一个主动的,即 Broker 将一条消息广播发送给多个消费者。但是在消息队列中,广播本质上是指一条消息能不能被很多个消费者消费到。只要能被多个消费者消费到,就能起到广播消费的效果,就可以叫做广播消费。
死信队列:死信队列是一个功能,不是一个像分区一样的实体概念。它是指当某条消息无法处理成功时,则把这条消息写入到死信队列,将这条消息保存起来,从而可以处理后续的消息的功能。大部分情况下,死信队列在消费端使用得比较多,即消费到的消息无法处理成功,则将数据先保存到死信队列,然后可以继续处理其他消息。当然,在生产的时候也会有死信队列的概念,即某条消息无法写入Topic,则可以先写入到死信队列。从功能上来看,死信队列的功能业务也可以自己去实现。消息队列中死信队列的意思是,消息队列的SDK已经集成了这部分功能,从而让业务使用起来就很简单。
优先级队列:优先级队列是指可以给在一个分区或队列中的消息设置权重,权重大的消息能够被优先消费到。大部分情况下,消息队列的消息处理是FIFO先进先出的规则。此时如果某些消息需要被优先处理,基于这个规则就无法实现。所以就有了优先级队列的概念,优先级是消息维度设置的。
消息过滤:是指可以给每条消息打上标签,在消费的时候可以根据标签信息去消费消息。可以理解为一个简单的查询消息的功能,即通过标签去查询过滤消息。消息过滤主要在消费端生效。
消息过期/删除(TTL):是指消息队列中的消息会在一定时间或者超过一定大小后会被删除。因为消息队列主要是缓冲作用,所以一般会要求消息在一定的策略后会自动被清理。
消息轨迹:是指记录一条消息从生产端发送、服务端保存、消费端消费的全生命周期的流程信息。用来追溯消息什么时候被发送、是否发送成功、什么时候发送成功、服务端是否保存成功、什么时候保存成功、被哪些消费者消费、是否消费成功、什么时候被消费等等信息。
消息查询:是指能够根据某些信息查询到消息队列中的信息。比如根据消息ID或根据消费位点来查询消息,可以理解为数据库里面的固定条件的select操作。
消息压缩:是指生产端发送消息的时候,是否支持将消息进行压缩,以节省物理资源(比如网卡、硬盘)。压缩可以在SDK完成,也可以在Broker完成,并没有严格限制。通常来看,压缩在客户端完成会比较合理。
多租户:是指同一个集群是否有逻辑隔离,比如一个物理集群能否创建两个名称都为test的主题。此时一般会有一个逻辑概念 Namespace(命名空间)和 Tenant(租户)来做隔离,一般有这两个概念的就是支持多租户。
消息持久化:是指消息发送到Broker后,会不会持久化存储,比如存储到硬盘。有些消息队列为了保证性能,只会把消息存储在内存,此时节点重启后数据就会丢失。
消息流控:是指能否对写入集群的消息进行限制。一般会支持Topic、分区、消费分组、集群等维度的限流。
总结
到这儿,我们预习篇的内容就结束了,这个起步是不是非常简单。概念就不过多总结了,最后我们来总结一下 4 款主流消息队列的区别以及选型建议, 第01讲 也提到过,它们分别是RabbitMQ、RocketMQ、Kafka、Pulsar。
RabbitMQ和RocektMQ属于业务消息类的消息队列,它们的特点是功能丰富、低延时、数据高可靠性、消息可追踪等等,同时也支持延时消息、优先级队列、消息过滤等功能特性。RabbitMQ发展较早,RocketMQ则是新生的消息类的消息队列,从功能、集群化、稳定性、性能来看,RocketMQ 都是比 RabbitMQ 表现要好的。所以从某种意义上说,RocketMQ是可以替代RabbitMQ的,但是因为RabbitMQ发展悠久、内核稳定以及能满足大部分的业务消息场景,所以目前用户群体也很大。国内的业务消息类的选型一般以RocketMQ优先,然后才是RabbitMQ,而国外的业务消息类选型一般优先的是RabbitMQ。
Kakfa 属于主打流场景的消息队列。它的特点是追求高吞吐、大流量,在功能上相对简单。不支持太多消息队列的功能,比如死信队列、延时消息、消息过滤等等。但它的核心竞争力就是非常稳定、吞吐性能非常高,能承担超大流量的业务场景。所以它是流场景下的消息管道的不二选择。
Pulsar 从定位上是消息和流一体的。目标就是满足所有消息和流的场景,希望同时满足功能和性能两方面的需求。所以Pulsar的内核会支持很多功能,在性能和吞吐方面也经常拿来与Kakfa做比较。但是因为其发展时间较短,目前还不是那么稳定,正处于快速发展阶段。
从个人选择来看,我也给你一些建议。
业务消息类的场景,我会推荐你优先选择RocketMQ。主要原因是RocketMQ的性能高、社区活跃、集群化架构稳定、功能也非常丰富。而RabbitMQ当前架构存在缺点,单机存在瓶颈,在高QPS场景表现不是那么好,并且可能出现网络分区。所以从功能、性能、稳定性出发,我会优先推荐你使用RocketMQ。
流方向的场景,我会推荐你优先选择Kafka。主要原因是Kafka本身的性能和吞吐表现非常优越,延时和可靠性表现也不错。而Pulsar虽然主打的是替换Kafka,并且功能丰富,架构设计理念先进,但是因为发展周期较短,很多功能还不稳定,当前阶段的现网运营表现并不是那么好。所以虽然Kafka存在扩容、Rebalance方面的缺陷,但是从稳定性、性能出发,我还是会优先推荐你使用Kafka。
在日常使用中,我们也可能会根据业务需求同时运营多款消息队列,比如RocketMQ/RabbitMQ+Kafka。
更多细节总结如下,你可以再详细看看。
03|通信协议:如何设计一个好的通信协议?
今天我们正式进入基础篇的学习,我会带你构建最基础的消息队列。
从功能上来看,一个最基础的消息队列应该具备生产、存储、消费的能力,也就是能完成“生产者把数据发送到Broker,Broker收到数据后,持久化存储数据,最后消费者从Broker消费数据”的整个流程。
我们从这个流程来拆解技术架构,如下图所示,最基础的消息队列应该具备五个模块。
通信协议:用来完成客户端(生产者和消费者)和Broker之间的通信,比如生产或消费。
网络模块:客户端用来发送数据,服务端用来接收数据。
存储模块:服务端用来完成持久化数据存储。
生产者:完成生产相关的功能。
消费者:完成消费相关的功能。
我们知道,消息队列本质上讲是个CS模型,通过客户端和服务端之间的交互完成生产、消费等行为。 不知道你在日常的开发过程中,是否会好奇客户端和服务端之间的通信流程是怎么实现的呢?
那今天我们就开始学习基础篇的第一讲——通信协议。为了完成交互,我们第一步就需要确定服务端和客户端是如何通信的。而通信的第一步就是确定使用哪种通信协议进行通信。
说到协议,我们开发者最熟悉的可能就是HTTP协议了,HTTP作为一个标准协议,有很多优点。那能否用HTTP协议作为消息队列的通信协议呢?带着你的思考,我们开始学习。
通信协议基础
所有协议的选择和设计都是根据需求来的,我们知道消息队列的核心特性是高吞吐、低延时、高可靠,所以在协议上至少需要满足:
协议可靠性要高,不能丢数据。
协议的性能要高,通信的延时要低。
协议的内容要精简,带宽的利用率要高。
协议需要具备可扩展能力,方便功能的增减。
那有没有现成的满足这四个要求的协议呢?
目前业界的通信协议可以分为 公有协议和私有协议 两种。公有协议指公开的受到认可的具有规范的协议,比如JMS、HTTP、STOMP等。私有协议是指根据自身的功能和需求设计的协议,一般不具备通用性,比如Kafka、RocketMQ、Puslar的协议都是私有协议。
其实消息队列领域是存在公有的、可直接使用的标准协议的,比如AMQP、MQTT、OpenMessaging,它们设计的初衷就是为了解决因各个消息队列的协议不一样导致的组件互通、用户使用成本高、重复设计、重复开发成本等问题。但是,公有的标准协议讨论制定需要较长时间,往往无法及时赶上需求的变化,灵活性不足。
因此大多数消息队列为了自身的功能支持、迭代速度、灵活性考虑,在核心通信协议的选择上不会选择公有协议,都会选择自定义私有协议。
那私有协议要怎么设计实现呢?
从技术上来看,私有协议设计一般需要包含三个步骤。
网络通信协议选型,指计算机七层网络模型中的协议选择。比如传输层的TCP/UDP、应用层的HTTP/WebSocket等。
应用通信协议设计,指如何约定客户端和服务端之间的通信规则。比如如何识别请求内容、如何确定请求字段信息等。
编解码(序列化/反序列化)实现,用于将二进制的消息内容解析为程序可识别的数据格式。
每一步具体如何选择和实现呢?我们先看网络通信协议。
网络通信协议选型
从功能需求出发,为了保证性能和可靠性,几乎所有主流消息队列在核心生产、消费链路的协议选择上, 都是基于可靠性高、长连接的TCP协议。
下面是一张当前业界主要的协议类型图:
四层的UDP虽然也是长连接,性能更高,但是因为其不可靠传输的特性,业界几乎没有消息队列用它通信。
七层的HTTP协议每次通信都需要经历三次握手、四次关闭等步骤,并且协议结构也不够精简。从而在性能(比如耗时)上的表现较差,不适合高吞吐、大流量、低延时的场景。所以主流协议在核心链路上很少使用HTTP。
但是,很少并不代表没有,HTTP协议的优点是客户端库非常丰富,协议成熟,非常容易和第三方集成,用户使用起来成本非常低。
所以一些主打轻量、简单的消息队列,比如AWS SQS、Tencent CMQ,它们主链路的协议就是用的HTTP协议。核心考虑是满足 多场景的需求,即支持多种接入方式并降低接入门槛。七层协议虽然在性能上有一些降低,但是在一些特殊场景或者某些对耗时不敏感的业务中,降低接入成本是收益很高的事情。
接下来看应用通信协议的设计,如何构成?设计的时候我们应该关注什么呢?
四层的UDP虽然也是长连接,性能更高,但是因为其不可靠传输的特性,业界几乎没有消息队列用它通信。
七层的HTTP协议每次通信都需要经历三次握手、四次关闭等步骤,并且协议结构也不够精简。从而在性能(比如耗时)上的表现较差,不适合高吞吐、大流量、低延时的场景。所以主流协议在核心链路上很少使用HTTP。
但是,很少并不代表没有,HTTP协议的优点是客户端库非常丰富,协议成熟,非常容易和第三方集成,用户使用起来成本非常低。
所以一些主打轻量、简单的消息队列,比如AWS SQS、Tencent CMQ,它们主链路的协议就是用的HTTP协议。核心考虑是满足 多场景的需求,即支持多种接入方式并降低接入门槛。七层协议虽然在性能上有一些降低,但是在一些特殊场景或者某些对耗时不敏感的业务中,降低接入成本是收益很高的事情。
接下来看应用通信协议的设计,如何构成?设计的时候我们应该关注什么呢?
应用通信协议设计
从应用通信协议构成的角度,协议一般会包含协议头和协议体两部分。
协议头 包含一些通用信息和数据源信息,比如协议版本、请求标识、请求的ID、客户端ID等等。
协议体 主要包含本次通信的业务数据,比如一串字符串、一段JSON格式的数据或者原始二进制数据等等。
从编解码协议的设计角度来看,需要分别针对“请求”和“返回”设计协议,请求协议结构和返回协议结构一般长这样
设计的原则是: 请求维度的通用信息放在协议头,消息维度的信息就放在协议体。那具体怎么设计呢?我们结合Kafka协议来分析。
协议头的设计
协议头的设计,首先要确认协议中需要携带哪些通用的信息。一般情况下,请求头要携带本次请求以及源端的一些信息,返回头要携带请求唯一标识来表示对应哪个请求,这样就可以了。
所以,请求头一般需要携带协议版本、请求标识、请求的ID、客户端ID等信息。而返回头,一般只需要携带本次请求的ID、本次请求的处理结果(成功或失败)等几个信息。
接下来,我们分析一下Kafka协议的请求头和返回头的内容,让你对协议头的设计有个更直观认识。如下图所示,Kafka V2 协议的请求头中携带了四个信息
- 用来标识请求类型的api\_key,如生产、消费、获取元数据。
- 用来标识请求协议版本的api\_version,如V0、V1、V2。
- 用来唯一标识该请求correlation\_id,可以理解为请求ID。
- 用来标识客户端的client\_id。
Kafka V0协议的返回头中只携带了一个信息,即该请求的correlation\_id,用来标识这个返回是哪个请求的。
这里有个细节你可能注意到了,请求协议头是V2版本,返回协议头是V0版本,会不会有点问题呢?
其实是没有的。因为从协议的角度,一般业务需求的变化(增加或删除)都会涉及请求内容的修改,所以请求的协议变化是比较频繁的,而返回头只要能标识本次对应的请求即可,所以协议的变化比较少。所以, 请求头和返回头的协议版本制定,是建议分开定义的,这样在后期的维护升级中会更加灵活。
协议体的设计
协议体的设计就和业务功能密切相关了。因为协议体是携带本次请求/返回的具体内容的,不同接口是不一样的,比如生产、消费、确认,每个接口的功能不一样,结构基本千差万别。
不过设计上还是有共性的,注意三个点: 极简、向后兼容、协议版本管理。如何理解呢?
协议在实现上首先需要具备向后兼容的能力,后续的变更(如增加或删除)不会影响新老客户端的使用;然后协议内容上要尽量精简(比如字段和数据类型),这样可以降低编解码和传输过程中的带宽的开销,以及其他物理资源的开销;最后需要协议版本管理,方便后续的变更。
同样为了让你直观感受协议体的设计,我们看Kafka生产请求和返回的协议内容,你可以先自己分析一下。
Kafka 生产请求协议体如下:
Kafka 生产返回协议体如下:
Kafka 生产请求的协议体包含了事务ID、acks信息、请求超时时间、Topic相关的数据,都是和生产操作相关的。生产返回的协议体包含了限流信息、分区维度的范围信息等。这些字段中的每个字段都是经过多轮迭代、重复设计定下来的,每个字段都有用处。
所以在协议体的设计上, 最核心的就是要遵循“极简”原则,在满足业务要求的基础上,尽量压缩协议的大小。
接下来我想讨论一下数据类型,在协议设计里,我们很容易忽略的一个事就是数据类型,比如上面 throttle_time_ms 是 INT32,error_code 是 INT16。
数据类型很简单,用来标识每个字段的类型,不过为什么会有这个东西呢,不能直接用int、string、char 等基础类型吗?这里有两个原因。
消息队列是多语言通信的。不同语言对于同一类型的定义和实现是不一样的,如果使用同一种基础类型在不同的语言进行解析,可能会出现解析错乱等错误。
需要尽量精简消息的长度。比如只需要1个byte就可以表示的内容,如果用4个byte来表示,就会导致消息的内容更长,消耗更多的物理带宽。
所以一般在协议设计的时候,我们也需要设计相关的基础数据类型(如何设计你可以参考 Kafka 的协议数据类型 或者 Protobuf 的数据类型)。
接下来我们看看编解码的实现吧。
编解码实现
编解码也称为序列化和反序列,就是数据发送的时候编码,收到数据的时候解码。
为什么要编解码呢? 如下图所示,因为数据在网络中传输时是二进制的形式,所以在客户端发送数据的时候就要将原始的格式数据编码为二进制数据,以便在TCP协议中传输,这一步就是序列化。然后在服务端将收到的二进制数据根据约定好的规范解析成为原始的格式数据,这就是反序列化。
在序列化和反序列化中,最重要的就是TCP的粘包和拆包。我们知道TCP是一个“流”协议,是一串数据,没有明显的界限,TCP层面不知道这段流数据的意义,只负责传输。所以应用层就要根据某个规则从流数据中拆出完整的包,解析出有意义的数据,这就是粘包和拆包的作用。
粘包/拆包的几个思路就是:
消息定长。
在包尾增加回车换行符进行分割,例如FTP协议。
将消息分为消息头和消息体,消息头中包含消息总长度,然后根据长度从流中解析出数据。
更加复杂的应用层协议,比如HTTP、WebSocket等。
早期,消息队列的协议设计几乎都是自定义实现编解码,如RabbitMQ、RocektMQ 4.0、Kafka等。
但从0实现编解码器比较复杂,随着业界主流编解码框架和编解码协议的成熟,一些消息队列(如Pulsar和RocketMQ 5.0)开始使用业界成熟的编解码框架,如Google的Protobuf。
Protobuf是一个灵活、高效、结构化的编解码框架,业界非常流行,很多商业产品都会用,它支持多语言,编解码性能较高,可扩展性强,产品成熟度高。这些优点,都是我们在设计协议的时候需要重点考虑和实现的,并且我们自定义实现编解码的效果不一定有Protobuf好。所以新的消息队列产品或者新架构可以考虑选择Protobuf作为编解码框架。
为了加深你对“自定义实现编解码”和“使用现成的编解码框架”两个路径的选择判断,我们来结合RocketMQ的通信协议分析一下。
从RocketMQ看编解码的实现
RocketMQ是业界唯一一个既支持自定义编解码,又支持成熟编解码框架的消息队列产品。RocketMQ 5.0之前支持的Remoting协议是自定义编解码,5.0之后支持的gRPC协议是基于Protobuf 编解码框架。
用Protobuf的主要原因是它选择gRPC框架作为通信框架。而gRPC框架中默认编解码器为Protobuf,编解码操作已经在gRPC的库中正确地定义和实现了,不需要单独开发。所以RocketMQ可以把重点放在Rocket消息队列本身的逻辑上,不需要在协议方面上花费太多精力。
接下来,我们看一下RocketMQ的“生产请求”在Remoting协议和gRPC协议中的协议结构。
如下图所示,自定义的Remoting协议的整体结构,包括协议头(消息头)和协议体(消息体)两部分。消息头包含请求操作码、版本、标记、扩展信息等通用信息。消息体包含的就是各个请求的具体内容,比如生产请求就是包含生产请求的请求数据,是一个复合的结构
生产请求接口的消息体中的具体内容,包含生产组、Topic、标记等生产请求需要携带的信息,服务端需要根据这些信息完成对应操作。
public class SendMessageRequestHeader......{
private String producerGroup;
private String topic;
private String defaultTopic;
private Integer defaultTopicQueueNums;
private Integer queueId;
private Integer sysFlag;
private Long bornTimestamp;
private Integer flag;
private String properties;
private Integer reconsumeTimes;
private boolean unitMode = false;
private boolean batch = false;
private Integer maxReconsumeTimes;
......
private transient byte[] body;
}
gRPC的生产消息的请求结构,内容简单,只需要定义生产请求需要携带的相关信息,比如Topic、用户属性、系统属性等。
message Message {
Resource topic = 1;
map<string, string> user_properties = 2;
SystemProperties system_properties = 3;
bytes body = 4;
}
message SendMessageRequest {
repeated Message messages = 1;
}
service MessagingService {
rpc SendMessage(SendMessageRequest) returns (SendMessageResponse) {}
}
对比上面两段代码,我们可以很直观地看到区别,gRPC的协议结构比Remoting的协议结构简单很多,比如不需要定义消息头,定义方式也更加简单。
有这样的区别主要有以下两点原因:
Remoting需要先设计协议的整体结构,协议头和协议体,而gRPC只需要关注协议体。因为gRPC已经完成了整体结构和协议体的设计。
Remoting需要在各个语言自定义实现相关代码,而gRPC基于Protobuf编解码框架,只要根据Protobuf语法定义好协议体的内容,就可以使用工具生成各语言的代码。
这也佐证了我们的看法,使用现成的编解码框架比自定义编解码更加方便和高效。
总结
到这里,今天的主要内容——消息队列的通信协议设计我们就学完了。
从功能支持、迭代速度、灵活性上考虑,大多数消息队列的核心通信协议都会优先考虑自定义的私有协议。
私有协议的设计主要考虑网络通信协议选择、应用通信协议设计、编解码实现三个方面。
网络通信协议选型,基于可靠、低延时的需求,大部分情况下应该选择TCP。
应用通信协议设计,分为请求协议和返回协议两方面。协议应该包含协议头和协议体两部分。协议头主要包含一些通用的信息,协议体包含请求维度的信息。
编解码,也叫序列化和反序列化。在实现上分为自定义实现和使用现成的编解码框架两个路径。
其中最重要的是应用通信协议部分的设计选型,这部分需要设计协议头和协议体。重要的是要思考协议头和协议体里面分别要放什么,放多了浪费带宽影响传输性能,放少了无法满足业务需求,需要频繁修改协议内容。另外,每个字段的类型也有讲究,需要尽量降低每次通信的数据大小。
所以应用通信协议的内容设计是非常考验技术功底或者经验的。有一个技巧是,如果需要实现自定义的协议,可以去参考一下业界主流的协议实现,看看都包含哪些元素,各自踩过什么坑。总结分析后,这样一般能设计出一个相对较好的消息队列。
另外,我们不可能一下子设计出完美的协议,所以核心是保证协议的向前兼容和向后兼容的能力,以便后续的升级和改造。
因为历史发展原因,业界大部分的消息队列的编解码都是自己实现的,只有近年兴起的Pulsar和RocketMQ的新版本选择了Protobuf作为编解码框架。从编解码框架的选择来看,如果是一个全新的项目或架构,使用现成的编解码框架比如Protobuf,是比较好的选择。
思考题
为什么业界的消息队列有多种标准的协议呢?
期待你的分享。今天我们没有展开讲各个消息队列的协议细节,如果你感兴趣也可以留言讨论。欢迎你把这节课分享给身边的朋友一起学习。我们下节课再见!
04|网络:如何设计高性能的网络模块?
今天我们讲消息队列的第二个基础知识点——网络模块。对消息队列来说,网络模块是核心组件之一,网络模块的性能很大程度上决定了消息传输的能力和整体性能。
如果你是Java技术栈的开发人员,讲到网络模块的开发,大概率第一反应就是Netty。Netty作为Java网络编程中最出名的类库,几乎主宰了Java的网络编程。 那消息队列网络模块 的选型, 是不是直接用 Netty 就可以了呢?
带着你的思考,我们开始今天的课程。
选型之前,我们得先知道要解决什么问题。消息队列是需要满足高吞吐、高可靠、低延时,并支持多语言访问的基础软件,网络模块最需要解决的是 性能、 稳定性、开发成本 三个问题。接下来我们就围绕这三点来思考消息队列网络模块应该怎样设计。首先我们先来分析一下网络模块的性能瓶颈可能在哪里。
网络模块的性能瓶颈分析
我们基于最基础的消息队列访问链路图分析。
对于 单个请求 来说,请求流程是:客户端(生产者/消费者)构建请求后,向服务端发送请求包 -> 服务端接收包后,将包交给业务线程处理 -> 业务线程处理完成后,将结果返回给客户端。其中可能消耗性能的有三个点。
编解码的速度。上节课我们详细讲过。
网络延迟。也就是客户端到服务端的网络延迟,这一点在软件层面几乎无法优化,取决于网络链路的性能,跟网络模块无关。
服务端/客户端网络模块的处理速度。发送/接收请求包后,包是否能及时被处理,比如当逻辑线程处理完成后,网络模块是否及时回包。这一点属于性能优化,是网络模块设计的核心工作,我们后续会细讲。
对于 并发请求 来说,在单个请求维度的问题的基础上,还需要处理高并发、高QPS、高流量等场景带来的性能问题。主要包含三个方面。
高效的连接管理:当客户端和服务端之间的TCP连接数很多,如何高效处理、管理连接。
快速处理高并发请求:当客户端和服务端之间的QPS很高,如何快速处理(接收、返回)请求。
大流量场景:当客户端和服务端之间的流量很高,如何快速吞吐(读、写)数据。
大流量场景,某种意义上是高并发处理的一种子场景。因为大流量分为单个请求包大并发小、单个请求包小并发大两种场景。第一种的瓶颈主要在于数据拷贝、垃圾回收、CPU占用等方面,主要依赖语言层面的编码技巧来解决,一般问题不大。第二种场景是我们需要主要解决的。
知道了瓶颈在哪里,接下来我们来具体看一下如何设计出一个高性能的网络模块。
高性能网络模块的设计实现
从技术上来看,高性能网络模块的设计可以分为如何高效管理大量的TCP连接、如何快速处理高并发的请求、如何提高稳定性和降低开发成本等三个方面。
基于多路复用技术管理 TCP 连接
从技术原理来看,高效处理大量TCP连接,在消息队列中主要有单条TCP连接的复用和多路复用两种技术思路。
1. 单条TCP连接的复用
这是在一条真实的TCP连接中,创建信道(channel,可以理解为虚拟连接)的概念。通过编程手段,我们把信道当做一条TCP连接使用,做到TCP连接的复用,避免创建大量TCP连接导致系统资源消耗过多。缺点是在协议设计和编码实现的时候有额外开发工作量,而且近年随着异步IO、IO多路复用技术的发展,这种方案有点多余。
因为语言特性、历史背景原因,RabbitMQ用的就是这种方案。
2. IO多路复用技术
主流的消息队列Kakfa、RocketMQ、Pulsar的网络模块都是基于IO多路复用的思路开发的。
IO多路复用技术,是指通过把多个IO的阻塞复用到同一个selector的阻塞上,让系统在单线程的情况下可以同时处理多个客户端请求。最大的优势是系统开销小,系统不需要创建额外的进程或者线程,降低了维护的工作量,也节省了资源。
目前支持IO多路复用的系统调用有Select、Poll、Epoll等,Java NIO库底层就是基于Epoll机制实现的。
不过,即使用了这两种技术, 单机能处理的连接数还是有上限的。
第一个上限是操作系统的FD上限,如果连接数超过了FD的数量,连接会创建失败。第二个限制是系统资源的限制,主要是CPU和内存。频繁创建、删除或者创建过多连接会消耗大量的物理资源,导致系统负载过高。
所以你会发现, 每个消息队列的配置中都会提到连接数的限制和系统 FD 上限调整。Linux中可以通过命令查看系统的FD信息。
//查看能打开FD的数量
ulimit -n //用户级限制
cat /proc/sys/fs/file-max //系统级限制
//临时修改最大数量
ulimit -n 100000 //将最大值改为100000
解决了第一个问题连接处理,我们看第二个问题:如何快速处理高并发请求。
基于Reactor模型处理高并发请求
先看单个请求的处理。
我们知道,两点之间直线最短。对于单个请求来说,最快的处理方式就是客户端直接发出请求,服务端接收到包后,直接丢给后面的业务线程处理,当业务线程处理成功后,直接返回给客户端。
这种处理模式是最快的,但是这里有两个问题需要解决。
如何第一时间拿到包交给后端的业务逻辑处理?
当业务逻辑处理完成后,如何立即拿到返回值返回给客户端?
我们最直观的思路就是阻塞等待模型,不断轮询等待请求拿到包,业务逻辑处理完,直接返回结果给客户端。这种处理是最快的。但是阻塞等待模型因为是串行的处理机制,每个请求需要等待上一个请求处理完才能处理,处理效率会很低。所以,单个请求,最合理的方式就是 异步的事件驱动模型,可以通过Epoll和异步编程来解决。
再看高并发请求的情况。
在高并发的情况下会有很多连接、请求需要处理,核心思路就是并行、多线程处理。那如何并行处理呢?这时候就需要用到 Reactor 模型了。
Reactor 模型是一种处理并发服务请求的事件设计模式,当主流程收到请求后,通过多路分离处理的方式,把请求分发给相应的请求处理器处理。如下图所示,Reactor 模式包含Reactor、Acceptor、Handler三个角色。
Reactor:负责监听和分配事件。收到事件后分派给对应的 Handler处理,事件包括连接建立就绪、读就绪、写就绪等。
Acceptor:负责处理客户端新连接。Reactor 接收到客户端的连接事件后,会转发给 Acceptor,Acceptor接收客户端的连接,然后创建对应的Handler,并向Reactor注册此 Handler。
Handler:请求处理器,负责业务逻辑的处理,即业务处理线程。
从技术上看,Reactor模型一般有三种实现模式。
单 Reactor 单线程模型(单 Reactor 单线程)
单 Reactor 多线程模型 (单 Reactor 多线程)
主从 Reactor 多线程模型 (多 Reactor 多线程)
我们具体分析一下,看消息队列更适合哪一种。
单 Reactor 单线程模型,特点是Reactor和Handler都是单线程的串行处理。
优点是所有处理逻辑放在单线程中实现,没有上下文切换、线程竞争、进程通信等问题。缺点是在性能与可靠性方面存在比较严重的问题。
性能上,因为是单线程处理,无法充分利用 CPU 资源,并且业务逻辑Handler的处理是同步的,容易造成阻塞,出现性能瓶颈。可靠性主要是因为单Reactor是单线程的,如果出现异常不能处理请求,会导致整个系统通信模块不可用。
所以单 Reactor 单进程模型不适用于计算密集型的场景,只适用于业务处理非常快速的场景。
相比起来,单 Reactor 多线程模型,业务逻辑处理Handler 变成了多线程,也就是说,获取到 IO读写事件之后,业务逻辑是一批线程在处理。
优点是 Handler 收到响应后通过 send 把响应结果返回给客户端,降低 Reactor 的性能开销,提升整个应用的吞吐。而且 Handler 使用多线程模式,可以充分利用 CPU 的性能,提高了业务逻辑的处理速度。
缺点是 Handler 使用多线程模式,带来了多线程竞争资源的开销,同时涉及共享数据的互斥和保护机制,实现比较复杂。另外,单个 Reactor 承担所有事件的监听、分发和响应,对于高并发场景,容易造成性能瓶颈。
在此基础上,主从 Reactor 多线程模型,是让Reactor也变为了多线程。
当前业界消息队列的网络模型,比如Pulsar、Kafka、RocketMQ,为了保证性能,都是基于主从 Reactor 多线程模型开发的。
这种方案,优点是Reactor的主线程和子线程分工明确。主线程只负责接收新连接,子线程负责完成后续的业务处理。同时主线程和子线程的交互也很简单,子线程接收主线程的连接后,只管业务处理即可,无须关注主线程,可以直接在子线程把处理结果返回给客户端。所以,主从Reactor 多线程模型适用于高并发场景,Netty 网络通信框架也采用了这种实现。
缺点是如果基于NIO从零开始开发,开发的复杂度和成本较高。另外,Acceptor是一个单线程,如果挂了,如何处理客户端新连接是一个风险点。
为了解决Acceptor的单点问题,有些组件为了保证高可用性,会对主从 Reactor 多线程做一些优化,把Acceptor也变为多线程的形态。我们在公有云上商业化版本的 Kafka 就是使用的这种模型。
讲到这里,基于IO多路复用技术和Reactor模型,我们已经可以解决网络模块的性能问题了。接下来我们来看如何提高网络模块的稳定性和降低开发成本。
基于成熟网络框架提高稳定性并降低开发成本
这里的“稳定性”主要指代码的稳定性。因为网络模块的特点是编码非常复杂,要考虑的细节和边界条件非常多,一些异常情况的处理也很细节,需要经过长时间的打磨。但是一旦开发完成,稳定后,代码几乎不需要再改动,因为需求是相对固定的。
在Java中,网络编程的核心是一个基础的类库——Java NIO库,它的底层是基于Linux/Unix IO复用模型Epoll实现的。
如果我们要基于Java NIO库开发一个Server,需要处理网络的闪断、客户端的重复接入、连接管理、安全认证、编解码、心跳保持、半包读写、异常处理等等细节,工作量非常大。所以在消息队列的网络编程模型中, 为了提高稳定性或者降低成本,选择现成的、成熟的NIO框架是一个更好的方案。
而Netty就是这样一个基于Java NIO封装的成熟框架。所以我们一提到Java的网络编程,最先想到的就是Netty。当前业界主流消息队列RocketMQ、Pulsar也都是基于Netty开发的网络模块,Kafka 因为历史原因是基于Java NIO实现的。
接下来我们以RocketMQ和Kafka的网络模型为例,来分析一下主流消息队列的网络模型的设计实现。
主流消息队列的网络模型实现
Kafka 网络模型
Kafka的网络层没有用Netty作为底层的通信库,而是直接采用Java NIO实现网络通信。在网络模型中,也是参照Reactor多线程模型,采用多线程、多Selector的设计。
看整个网络层的结构图。Processor线程和Handler线程之间通过RequestChannel传递数据,RequestChannel中包含一个RequestQueue队列和多个ResponseQueues队列。每个Processor线程对应一个ResponseQueue。
具体流程上:
一个Acceptor接收客户端建立连接的请求,创建Socket连接并分配给Processor处理。
Processor线程把读取到的请求存入RequestQueue中,Handler线程从RequestQueue队列中取出请求进行处理。
Handler线程处理请求产生的响应,会存放到Processor对应的ResponseQueue中,Processor 线程从其对应的ResponseQueue中取出响应信息,并返回给客户端。
RocketMQ 网络模型
RocketMQ 采用Netty组件作为底层通信库,遵循Reactor多线程模型,同时又在Reactor模型上做了一些扩展和优化。所以它的网络模型是Netty的网络模型,Netty底层采用的是主从Reactor多线程模型,模型的原理逻辑跟前面讲到的主从Reactor多线程模型是一样的。
在主从Reactor多线程模型的理论基础上,我们来分析一下RocketMQ中NettyRemotingServer 的具体实现形式。
具体流程上:
一个 Reactor 主线程负责监听 TCP网络连接请求,建立好连接,创建SocketChannel,并注册到Selector上。RocketMQ的源码中会自动根据OS的类型选择NIO和Epoll,也可以通过参数配置,监听真正的网络数据。
接收到网络数据后,会把数据传递给Reactor线程池处理。
真正执行业务逻辑之前,会进行SSL验证、编解码、空闲检查、网络连接管理,这些工作在Worker线程池处理(defaultEventExecutorGroup)。
处理业务操作,放在业务Processor线程池中执行。
从Kafka和RocketMQ的网络模型的实现来看,网络模块既可以基于原生的Java NIO,也可以基于NIO的框架(如Netty)来完成开发,不过基本思想都是基于IO多路复用技术和Reactor模型来提高处理性能、完成具体的编码实现。
但是到这里还没有结束,NIO编程属于TCP层网络编程,我们还需要进行协议设计、编解码、链路的建立/关闭等工作,才算完成一个完整的网络模块的开发。有没有更好的方案可以解决这些问题,减少我们的工作量呢?
NIO 编程和 RPC 框架
要想不关心底层的调用细节(如底层的网络协议和传输协议等),我们可以调用远端机器上的函数或方法来实现,也就是RPC(Remote Procedure Call)远程过程调用。
因为RPC调用的是一个远端对象,调用者和被调用者处于不同的节点上,想完成调用,必须实现4个能力。
网络传输协议:远端调用底层需要经过网络传输,所以需要选择网络通信协议,比如TCP。
应用通信协议:网络传输需要设计好应用层的通信协议,比如HTTP2或自定义协议。
服务发现:调用的是远端对象,需要可以定位到调用的服务器地址以及调用的具体方法。
序列化和反序列化: 网络传输的是二进制数据,因此RPC框架需要自带序列化和反序列化的能力。
讲到这里,不知道你有没有发现,RPC框架完成的工作等于上节课通信协议和前面讲的网络模块设计两部分的工作。在当前的微服务架构中,RPC已经是我们很熟悉、很常用且很成熟的技术了。
那RPC框架作为消息队列中的网络模块会有哪些优缺点呢?
我们以gRPC框架举例分析。gRPC是Google推出的一个RPC框架,可以说是RPC框架中的典型代表。主要有以下三个优点:
gRPC 内核已经很好地实现了服务发现、连接管理、编解码器等公共部分,我们可以把开发精力集中在消息队列本身,不需要在网络模块消耗太多精力。
gRPC 几乎支持所有主流编程语言,开发各个消息队列的SDK可以节省很多开发成本。
很多云原生系统,比如Service Mesh都集成了gRPC协议,基于HTTP2的gRPC的消息队列很容易被云原生系统中的其他组件所访问,组件间的集成成本很低。
但是当前主流的消息队列都不支持gRPC框架,这是因为如果支持就要做很大的架构改动。而且,gRPC底层默认是七层的HTTP2协议,在性能上,可能比直接基于TCP协议实现的方式差一些。但是HTTP2本身在性能上做了一些优化,从实际表现来看,性能损耗在大部分场景下是可以接受的。
所以如果是一个新设计的消息队列或者消息队列的新架构,通过成熟的RPC框架来实现网络模块是一个蛮不错的方案。比如RocketMQ 5.0中的Proxy就使用gRPC框架实现了网络模块。
总结
消息队列的网络模块主要解决的是性能、稳定性、成本三个方面的问题。
性能问题,核心是通过 Reactor 模型、IO 多路复用技术解决的。Reactor模式在Java网络编程中用得非常广泛,比如 Netty 就实现了 Reactor 多线程模型。即使不用Netty进行网络编程(比如Kafka 直接基于Java NIO编程)的情况下,网络模块也大多是参考或基于Reactor模式实现的。因为Reactor模式可以结合多路复用、异步调用、多线程等技术解决高并发、大流量场景下的网络模块的性能问题。
在Java技术栈下,网络编程的核心是Java NIO。但为了解决稳定性和开发成本的问题,建议选择业界成熟的网络框架来实现网络模块,而不是基于原生的Java NIO来实现。成熟的框架分为成熟的NIO框架(如Netty)和成熟的RPC框架(如gRPC)。
目前业界主流的消息队列都是基于Java NIO和Netty实现的。Netty是我们网络模块编程的常用选型,大部分情况下,可能还是我们的最终选择。但是Netty好用并不意味着所有的Java网络编程都必须选择Java NIO和Netty。
当你需要构建一个组件的网络模块的时候,你要先知道这个组件的业务特点是什么,需要解决哪些问题,再来考虑使用什么技术。比如在客户端连接数不多、并发不高,流量也很小的场景,只需要一个简单的网络Server就够了,完全没必要选择Java NIO或Netty来实现你的网络模块。随着技术架构的迭代,基于RPC框架的方案也是一个不错的选择。
思考题
假如你的团队需要开发一款新的消息队列,你需要完成网络模块的选型开发设计,你的思考路径是什么?
欢迎分享你的思考,如果觉得有收获,也欢迎你把这节课分享给身边的朋友。我们下节课再见!
上节课思考闭环
为什么业界的消息队列有多种标准的协议呢?
业界的消息队列有多种标准的协议,如MQTT、AMQP、OpenMessaging。主要是因为业务场景不一样,一套协议标准无法满足多种场景需要。
MQTT是为了满足物联网领域的通信而设计的,背景是网络环境不稳定、网络带宽小,从而需要极精简的协议结构,并允许可能的数据丢失。
AMQP是主要面向业务消息的协议,因为要承载复杂的业务逻辑,所以协议设计上要尽可能丰富,包含多种场景,并且在传输过程中不允许出现数据丢失。因为AMQP协议本身的设计具有很多局限,比如功能太简单,所以不太符合移动互联网、云原生架构下的消息需求。
OpenMessaging的设计初衷是设计一个符合更多场景的消息队列协议。