DefaultMQPushConsumer 核心参数与工作原理

Push模型消息拉取机制

image-20250126143819864

其核心关键点如下:

  1. 经过队列负载机制后,会分配给当前消费者一些队列,注意一个消费组可以订阅多个主题,正如上面 pullRequestQueue 中所示,topic_test、topic_test1 这两个主题都分配了一个队列。
  2. 轮流从 pullRequestQueue 中取出一个 PullRequest 对象,根据该对象中的拉取偏移量向 Broker 发起拉取请求,默认拉取 32 条,可通过上文中提到的 pullBatchSize 参数进行改变,该方法不仅会返回消息列表,还会返更改 PullRequest 对象中的下一次拉取的偏移量。
  3. 接收到 Broker 返回的消息后,会首先放入 ProccessQueue(处理队列),该队列的内部结构为 TreeMap,key 存放的是消息在消息消费队列(consumequeue)中的偏移量,而 value 为具体的消息对象。
  4. 然后将拉取到的消息提交到消费组内部的线程池,并立即返回,并将 PullRequest 对象放入到 pullRequestQueue 中,然后取出下一个 PullRequest 对象继续重复消息拉取的流程,从这里可以看出,消息拉取与消息消费是不同的线程。
  5. 消息消费组线程池处理完一条消息后,会将消息从 ProccessQueue 中,然后会向 Broker 汇报消息消费进度,以便下次重启时能从上一次消费的位置开始消费。

消息消费进度提交

通过上面的介绍,想必读者应该对消息消费进度有了一个比较直观的认识,接下来我们再来介绍一下 RocketMQ PUSH 模式的消息消费进度提交机制。

通过上文的消息消费拉取模型可以看出,消息消费组线程池在处理完一条消息后,会将消息从 ProccessQueue 中移除,并向 Broker 汇报消息消费进度,那请大家思考一下下面这个问题:

image-20250126143838943

例如现在处理队列中有 5 条消息,并且是线程池并发消费,那如果消息偏移量为 3 的消息(3:msg3)先于偏移量为 0、1、2 的消息处理完,那向 Broker 如何汇报消息消费进度呢?

有读者朋友说,消息 msg3 处理完,当然是向 Broker 汇报 msg3 的偏移量作为消息消费进度呀。但细心思考一下,发现如果提交 msg3 的偏移量为消息消费进度,那汇报完毕后如果消费者发生内存溢出等问题导致 JVM 异常退出,msg1 的消息还未处理,然后重启消费者,由于消息消费进度文件中存储的是 msg3 的消息偏移量,会继续从 msg3 开始消费,会造成消息丢失。显然这种方式并不可取。

RocketMQ 采取的方式是处理完 msg3 之后,会将 msg3 从消息处理队列中移除,但在向 Broker 汇报消息消费进度时是取 ProceeQueue 中最小的偏移量为消息消费进度,即汇报的消息消费进度是 0。

image-20250126143931458

即如果处理队列如上图所示,那提交的消息进度为 2。但这种方案也并非完美,有可能会造成消息重复消费,例如如果发生内存溢出等异常情况,消费者重新启动,会继续从消息偏移量为 2 的消息开始消费,msg3 就会被消费多次,故RocketMQ 不保证消息重复消费

消息消费进度具体的提交流程如下图所示:

image-20250126144455180

从这里也可以看成,为了减少消费者与 Broker 的网络交互,提高性能,提交消息消费进度时会首先存入到本地缓存表中,然后定时上报到 Broker,同样 Broker 也会首先存储本地缓存表,然后定时刷写到磁盘。

Push模型参数注意事项

ConsumeFromWhere 注意事项

下面首先先看一段 RokcetMQ PUSH 模式消费者的常见使用方式:

1

构建需要通过 setConsumeFromWhere(…) 指定从哪消费,正如上篇提到的,RocketMQ 支持从最新消息、最早消息、指定时间戳这三种方式进行消费。大家可以思考一下,如果一个消费者启动运行了一段时间,由于版本发布等原因需要先停掉消费者,代码更新后,再启动消费者时消费者还能使用上面这三种策略,从新的一条消息消费吗?如果是这样,在发版期间新发送的消息将全部丢失,这显然是不可接受的,要从上一次开始消费的时候消费,才能保证消息不丢失。

故 ConsumeFromWhere 这个参数的含义是,初次启动从何处开始消费。更准确的表述是,如果查询不到消息消费进度时,从什么地方开始消费

所以在实际使用过程中,如果对于一个设置为 CONSUME_FROM_FIRST_OFFSET 的运行良久的消费者,当前版本的业务逻辑进行了重大重构,而且业务希望是从最新的消息开始消费,想通过如下代码来实现其业务意图,则显然是不成功的。

1
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

上面做法是错误的,要达到业务目标,需要使用 RocketMQ 提供的重置位点,其命令如下:

1
sh ./mqadmin resetOffsetByTime -n 127.0.0.1:9876  -g CID_CONSUMER_TEST -t TopicTest -s now

其中参数说明如下:

  • -n:NameServer 地址
  • -g:消费组名称
  • -t:主题名称
  • -s:时间戳,可选值为 now、时间戳(毫秒)、yyyy-MM-dd#HH:mm:ss:SSS

当然也可以通过 RocketMQ-Console 重置位点,操作如下图所示:

2

消费组线程数设置注意事项

在 RocketMQ 中,每一个消费组都会启动一个线程池用来实现消费端在消费组的隔离,RocketMQ 也提供了 consumeThreadMin、consumeThreadMax 两个参数来设置线程池中的线程个数,但是由于线程池内部持有的队列为一个无界队列,导致 consumeThreadMax 大于 consumeThreadMin,线程个数最大也只能 consumeThreadMin 个线程数量,故在实践中,往往会将这两个值设置为相同,避免给大家带来一种误解,在消息端消息很多的情况,会创建更多的线程来提高消息的处理速率。

小技巧:RocketMQ 中的消费组线程的名称会以 ConsumeMessageThread_ 开头,例如下图。

9

批量消费注意事项

RocketMQ 支持消息批量消费,在消费端与批量消费相关的两个参数分别为:

  • pullBatchSize:消息客户端一次向 Broker 发送拉取消息每批返回最大的消息条数,默认为 32。
  • consumeMessageBatchMaxSize:提交到消息消费监听器中的消息条数,默认为 1。

consumeMessageBatchMaxSize

默认情况下一次消息会拉取 32 条消息,但业务监听器收到的消息默认一条,为了更直观对其了解,现给出如下示例代码:

10

如果将 consumeMessageBatchMaxSize 设置 10,其运行效果如下图所示:

11

可以看到该参数生效了,consumeMessageBatchMaxSize 这个参数非常适合批处理,例如结合数据库的批处理,能显著提高性能。

pullBatchSize

大家发现了一个问题,如果单条消息的处理时间较快,通过增加消费组线程个数无法显著提高消息的消费 TPS,并且通过 jstack 命令,看到几乎所有的线程都处于等待处理任务,其截图类似如下:

12

此种情况说明线程都“无所事事”,应该增大其工作量,自然而然地需要增大每一批次消息拉取的数量。故尝试每一次消息拉取 100 条,每批消费 50 条。即通过如下代码进行设置:

1
2
consumer.setPullBatchSize(100);
consumer.setConsumeMessageBatchMaxSize(200);

这里设置 consumeMessageBatchMaxSize 的值大于 pullBatchSize 的主要目的,就是验证每一次拉取的消息,因为如果 consumeMessageBatchMaxSize 大于 pullBatchSize,那每次批处理的消息条数等于 pullBatchSize,如果 consumeMessageBatchMaxSize 小于 pullBatchSize,会在客户端分页,然后尽最大可能一次传入 consumeMessageBatchMaxSize 条消息。

为了确保有足够的消息,在消息拉取之前,我建议先使用生产者压入大量消息。

13

发现每批拉取的条数最多不会超过 32,显然服务端有足够的消息够拉取。

这是因为 Broker 端对消息拉取也提供了保护机制,同样有参数可以控制一次拉取最多返回消息的条数,其参数主要如下:

1
int maxTransferCountOnMessageInMemory

如果此次消息拉取能全部命中,内存允许一次消息拉取的最大条数,默认值为 32 条。

1
int maxTransferBytesOnMessageInMemory

如果此次消息拉取能全部命中,内存允许一次消息拉取的最大消息大小,默认为 256K。

1
int maxTransferCountOnMessageInDisk

如果此次消息无法命中,内存需要从磁盘读取消息,则每一次拉取允许的最大条数,默认为 8。

1
int maxTransferBytesOnMessageInDisk

如果此次消息无法命中,内存需要从磁盘读取消息,则每一次拉取允许的消息总大小,默认为 64K。

故如果需要一次拉取 100 条消息,还需要修改 broker 端相关的配置信息,通常建议修只修改命中内存相关的,如果要从磁盘拉取,为了包含 Broker,maxTransferCountOnMessageInDisk、maxTransferBytesOnMessageInDisk 保持默认值。

如果使用场景是大数据领域,建议的配置如下:

1
2
maxTransferCountOnMessageInMemory=5000
maxTransferBytesOnMessageInMemory = 5000 * 1024

如果是业务类场景,建议配置如下:

1
2
maxTransferCountOnMessageInMemory=2000
maxTransferBytesOnMessageInMemory = 2000 * 1024

修改 Broker 相关配置后,再运行上面的程序,其返回结果如下:

14

订阅关系不一致导致消息丢失

在 RocketMQ 中,一个消费组能订阅多个主题,也能订阅多个 Tag,多个 Tag 用 || 分割,但同一个消费组中的所有消费者的订阅关系必须一致,不能一个订阅 TAGA,另外一个消费者却订阅 TAGB

一条消息的 Tag 为 TAGA,并且消费组 dw_tag_test 其中一个消费者有订阅 TAGA,那为什么还会显示 CONSUMED_BUT_FILTERED,这个状态代表的含义是,该条消息不符合消息过滤规则被过滤了,其原理图如下所示:

image-20250126150301779

其本质原因是,一个队列同一时间只会分配给一个消费者,这样队列上不符合的消息消费会被过滤,并且消息消费进度会向前移动,这样就会造成消息丢失。

DefaultLitePullConsumer 核心参数与实战

DefaultMQPullConsumer(PULL 模式)的 API 太底层,使用起来及其不方便,RocketMQ 官方设计者也注意到这个问题,为此在 RocketMQ 4.6.0 版本中引入了 PULL 模式的另外一个实现类 DefaultLitePullConsumer,即从 4.6.0 版本后,DefaultMQPullConsumer 已经被标记为废弃,故接下来将重点介绍 DefaultLitePullConsumer,并探究如何在实际中运用它解决相关问题。

Lite Pull 与 PUSH 模式之对比

从上面的示例可以看出 Lite PULL 相关的 API 比 4.6.0 之前的 DefaultMQPullConsumer 的使用上要简便不少,从编程风格上已非常接近了 PUSH 模式,其底层的实现原理是否也一致呢?显然不是的,请听我我慢慢道来。

不知大家是否注意到,Lite PULL 模式下只是通过 poll() 方法拉取一批消息,然后提交给应用程序处理,采取自动提交模式下位点的提交与消费结果并没有直接挂钩,即消息如果处理失败,其消费位点还是继续向前继续推进,缺乏消息的重试机制。为了论证笔者的观点,这里给出 DefaultLitePullConsumer 的 poll() 方法执行流程图,请大家重点关注位点提交所处的位置。

image-20250126151333790

Lite Pull 模式的自动提交位点,一个非常重要的特征是 poll() 方法一返回,这批消息就默认是消费成功了,一旦没有处理好,就会造成消息丢失,那有没有方法解决上述这个问题呢,seek 方法就闪亮登场了,在业务方法处理过程中,如果处理失败,可以通过 seek 方法重置消费位点,即在捕获到消息业务处理后,需要根据返回的第一条消息中(MessageExt)信息构建一个 MessageQueue 对象以及需要重置的位点。

Lite Pull 模式的消费者相比 PUSH 模式的另外一个不同点是 Lite Pull 模式没有消息消费重试机制,PUSH 模式在并发消费模式下默认提供了 16 次重试,并且每一次重试的间隔不一致,极大的简化了编程模型。在这方面 Lite Pull 模型还是会稍显复杂。

Lite Pull 模式针对 PUSH 模式一个非常大亮点是消息拉取线程是以消息消费组为维度的,而且一个消费者默认会创建 20 个拉取任务,在消息拉取效率方面比 PUSH 模型具有无可比拟的优势,特别适合大数据领域的批处理任务,即每隔多久运行一次的拉取任务。

消息位点

消息是按到达服务端的先后顺序存储在指定主题的多个队列中,每条消息在队列中都有一个唯一的Long类型坐标,这个坐标被定义为消息位点。

队列中最早一条消息的位点为最小消息位点(MinOffset);最新一条消息的位点为最大消息位点(MaxOffset)。虽然消息队列逻辑上是无限存储,但由于服务端物理节点的存储空间有限,云消息队列 RocketMQ 版会滚动删除队列中存储最早的消息。因此,消息的最小消费位点和最大消费位点会一直递增变化。

消费位点

每个主题的队列都可以被多个消费者分组订阅。若某条消息被某个消费者消费后直接被删除,则其他订阅了该主题的消费者将无法消费该消息。

因此,云消息队列 RocketMQ 版通过消费位点管理消息的消费进度。每条消息被某个消费者消费完成后不会立即在队列中删除,云消息队列 RocketMQ 版会基于每个消费者分组维护一份消费记录,该记录指定消费者分组消费某一个队列时,消费过的最新一条消息的位点,即消费位点。

当消费者客户端离线,又再次重新上线时,会严格按照服务端保存的消费进度继续处理消息。如果服务端保存的历史位点信息已过期被删除,此时消费位点向前移动至服务端存储的最小位点。

因此,不同应用需要设置不同的消费者组,每个应用都能独立地接收和处理该 Topic 下的所有消息,互不干扰。而且消息队列会为每个消费者组独立记录其在 Topic 中的消费位点(offset)。这意味着每个消费者组都可以从自己上次消费的位置继续消费,互不影响。

长轮询实现原理

PULL 模式通常适合大数据领域的批处理操作,对消息的实时性要求不高,更加看重的是消息的拉取效率,即一次消息需要拉取尽可能多的消息,这样方便一次性对大量数据进行处理,提高数据的处理效率,特别是希望一次消息拉取再不济也要拉取点消息,不要出现太多无效的拉取请求(没有返回消息的拉取请求)。

首先大家来看一下如下这个场景:

5

即 Broker 端没有新消息时,Broker 端采取何种措施呢?我想基本有如下两种策略进行选择:

  • Broker 端没有新消息,立即返回,拉取结果中不包含任何消息。
  • 当前拉取请求在 Broker 端挂起,在 Broker 端挂起,并且轮询 Broker 端是否有新消息,即轮询机制。

上面说的第二种方式,有一个“高大上”的名字——轮询,根据轮询的方式又可以分为长轮询、短轮询

  • 短轮询:第一次未拉取到消息后等待一个时间间隔后再试,默认为 1s,可以在 Broker 的配置文件中设置 shortPollingTimeMills 改变默认值,即轮询一次,注意:只轮询一次
  • 长轮询:可以由 PULL 客户端设置在 Broker 端挂起的超时时间,默认为 20s,然后在 Broker 端没有拉取到消息后默认每隔 5s 一次轮询,并且在 Broker 端获取到新消息后,会唤醒拉取线程,结束轮询,尝试一次消息拉取,然后返回一批消息到客户端,长轮询的时序图如下所示:

6

从这里可以看出,长轮询比短轮询,轮询等待的时间长,短轮询只轮询一次,并且默认等待时间为 1s,而长轮询默认一次阻塞 5s,但支持被唤醒。

在 broker 端与长轮询相关的参数如下:

  • longPollingEnable:是否开启长轮询,默认为 true。
  • shortPollingTimeMills:短轮询等待的时间,默认为 1000,表示 1s。

结合实际场景顺序消费、消息过滤实战

顺序消费

业务场景描述

现在开发一个银行类项目,对用户的每一笔余额变更都需要发送短信通知到用户。如果用户同时在电商平台下单,转账两个渠道在同一时间进行了余额变更,此时用户收到的短信必须顺序的,例如先网上购物,消费了 128,余额 1000,再转账给朋友 200,剩余余额 800,如果这两条短信的发送顺序颠倒,给用户会带来很大的困扰,故在该场景下必须保证顺序。这里所谓的顺序,是针对同一个账号的,不同的账号无需保证顺序性,例如用户 A 的余额发送变更,用户 B 的余额发生变更,这两条短信的发送其实相互不干扰的,故不同账号之间无需保证顺序。

代码实现

本篇代码主要采用截图的方式展示其关键代码,并对其进行一些简单的解读。

1

首先这里的主业务是操作账户的余额,然后是余额变更后需要发短信通知给用户,但由于发送短信与账户转载是两个相对独立但又紧密的操作,故这里可以引入消息中间件来解耦这两个操作。但由于发送短信业务,其顺序一定要与扣款的顺序保证一致,故需要使用顺序消费。

由于 RocketMQ 只提供了消息队列的局部有序,故如果要实现某一类消息的顺序执行,就必须将这类消息发送到同一个队列,故这里在消息发送时使用了 MessageQueueSelector,并且使用用户账户进行队列负载,这样同一个账户的消息就会账号余额变更的顺序到达队列,然后队列中的消息就能被顺序消费。

2

顺序消费的事件监听器为 MessageListenerOrderly,表示顺序消费。

顺序消费在使用上比较简单,那 RocketMQ 顺序消费是如何实现的?队列重新负载时还能保持顺序消费吗?顺序消费会重复消费吗?

RocketMQ 顺序消费原理简述

在 RocketMQ 中,PUSH 模式的消息拉取模型如下图所示:

image-20250126143819864

上述流程在前面的章节中已做了详述,这里不再累述,这里想重点突出线程池。

RocketMQ 消息消费端按照消费组进行的线程隔离,即每一个消费组都会创建已线程池,由一个线程池负责分配的所有队列中的消息。

所以要保证消费端对单队列中的消息顺序处理,故多线程处理,需要按照消息消费队列进行加锁。故顺序消费在消费端的并发度并不取决消费端线程池的大小,而是取决于分给给消费者的队列数量,故如果一个 Topic 是用在顺序消费场景中,建议消费者的队列数设置增多,可以适当为非顺序消费的 2~3 倍,这样有利于提高消费端的并发度,方便横向扩容。

消费端的横向扩容或 Broker 端队列个数的变更都会触发消息消费队列的重新负载,在并发消息时在队列负载的时候一个消费队列有可能被多个消费者同时消息,但顺序消费时并不会出现这种情况,因为顺序消息不仅仅在消费消息时会锁定消息消费队列,在分配到消息队列时,能从该队列拉取消息还需要在 Broker 端申请该消费队列的锁,即同一个时间只有一个消费者能拉取该队列中的消息,确保顺序消费的语义。

从前面的文章中也介绍到并发消费模式在消费失败是有重试机制,默认重试 16 次,而且重试时是先将消息发送到 Broker,然后再次拉取到消息,这种机制就会丧失其消费的顺序性,故如果是顺序消费模式,消息重试时在消费端不停的重试,重试次数为 Integer.MAX_VALUE,即如果一条消息如果一直不能消费成功,其消息消费进度就会一直无法向前推进,即会造成消息积压现象。

温馨提示:顺序消息时一定要捕捉异常,必须能区分是系统异常还是业务异常,更加准确的要能区分哪些异常是通过重试能恢复的,哪些是通过重试无法恢复的。无法恢复的一定要尽量在发送到 MQ 之前就要拦截,并且需要提高告警功能。

消息过滤实战

业务场景描述

例如公司采用的是微服务架构,分为如下几个子系统,基础数据、订单模块、商家模块,各个模块的数据库都是独立的。微服务带来的架构伸缩性不容质疑,但数据库的相互独立,对于基础数据的 join 操作就不那么方便了,即在订单模块需要使用基础数据,还需要通过 Dubbo 服务的方式去请求接口,为了避免接口的调用,基础数据的数据量又不是特别多的情况,项目组更倾向于将基础数据的数据同步到各个业务模块的数据库,然后基础数据发生变化及时通知订单模块,这样与基础数据的表 join 操作就可以在本库完成。

技术方案

4

上述方案的关键思路:

  1. 基础数据一旦数据发生变化,就向 MQ 的 base_data_topic 发送一条消息。
  2. 下游系统例如订单模块、商家模块订阅 base_data_topic 完成数据的同步。

问题,如果订单模块出现一些不可预知的错误,导致数据同步出现异常,并且发现的时候,存储在 MQ 中的消息已经被删除,此时需要上游(基础数据)重推数据,这个时候,如果基础数据重推的消息直接发送到 base_data_topic,那该 Topic 的所有消费者都会消费到,这显然是不合适的。怎么解决呢?

通常有两种办法:

  • 为各个子模块创建另外一个主题,例如 retry_ods_base_data_topic,这样需要向哪个子系统就向哪个 Topic 发送。
  • 引入 Tag 机制。

本节主要来介绍一下 Tag 的思路。

首先,正常情况下,基础模块将数据变更发送到 base_data_topic,并且消息的 Tag 为 all。然后为每一个子系统定义一个单独的重推 Tag,例如 ods、shop。

消费端同时订阅 all 和各自的重推 Tag,完美解决问题。

代码实现

在消息发送时需要按照需求指定消息的 Tag,其示例代码如下:

5

然后在消息消费时订阅时,更加各自的模块订阅各自关注的 Tag,其示例代码如下:

6

在消息订阅时一个消费组可以订阅多个 Tag,多个 Tag 使用双竖线分隔。

Topic 与 Tag 之争

用 Tag 对同一个主题进行区分会引来一个“副作用”,就是在重置消息消费位点时该消费组需要“处理”的是所有标签的数据,虽然在 Broker 端、消息消费端最终会过滤,不符合 Tag 的消息并不会执行业务逻辑,但在消息拉取时还是需要将消息读取到 PageCache 中并进行过滤,会有一定的性能损耗,但这个不是什么大问题。

在数据推送这个场景,除了使用 Tag 机制来区分重推数据外,也可以为重推的数据再申请一个额外的主题,即通过主题来区分不同的数据,这种方案倒不说不可以,但这个在运维管理层面需要申请众多的 Topic,而这类 Topic 存储的其实是一类数据,使用不同的 Topic 存储同类数据,会显得较为松散。当然如果是不同的业务场景,就建议使用 Topic 来隔离。

消息过滤常见问题

  1. 多个消费者订阅同一个Topic下的不同Tag,出现消息丢失情况。

可能原因:若多个消费者是通过同一个消费者分组(Group ID)订阅的指定Topic,则所有消费者的过滤条件即订阅的Tag要一致,否则会出现订阅关系不一致,导致部分消息丢失。

  1. 消费者在线无消费消息,但Group有堆积。

采用SQL/TAG消费过滤的方式,未被过滤条件命中的消息会计算为消息堆积,堆积量的计算如下所示。

image

  • SQL消费方式:堆积量 = 已就绪的消息量 + 处理中的消息量 - 未被SQL命中的消息数量。
  • TAG消费方式:堆积量 =(已就绪的消息量 + 处理中的消息量)* TAG标签消息百分比。