传统消息队列在 AI 场景的局限性

在传统互联网应用中,消息队列广泛用于服务解耦、异步通信和削峰填谷等场景。它通过异步化的事件驱动方式,提升系统可扩展性与稳定性。典型场景如订单处理、日志收集、通知推送等。传统消息队列(如 Apache RocketMO、Kafka)强调高并发写入、顺序消费和基本负载均衡,已形成成熟的技术范式。

然而,随着生成式 AI 的兴起,AI 应用呈现出截然不同的业务特征:推理耗时长达分钟级、上下文数据高达上百 MB、多轮对话需长期维护状态、多 Agent 协同依赖复杂异步编排,且严重依赖昂贵的 GPU 资源。

在此背景下,传统消息队列暴露出明显局限:无法高效支持百万级长会话隔离、缺乏对大消息的优化传输、难以实现消费速度的精细控制,更不具备优先级调度与资源导向的智能负载均衡能力。简单的异步模型已无法一步满足 AI 场景下对稳定性、成本控制与任务优先级的严苛要求。

因此在A1云原生架构下的消息队列必须具备以下特点:

  • 支持长会话与大消息体的消息中枢。
  • 实现削峰填谷、定速消费的智能调度能力。
  • 提供优先级、权重控制的分级事件驱动机制。
  • 构建高可靠、可恢复的 Agent 编排引擎。

消息模型提升 AI 通信效率

AI 应用的交互通常具有长耗时、多轮次和高算力成本的特点。当依赖 SSE或 WebSocket 等长连接时,一旦连接中断(如网关重启、超时或网络波动),不仅会话上下文可能丢失,已执行的 AI 任务也会被迫中断,导致昂贵的计算资源被浪费。因此,构建一个可靠的会话管理机制,确保在长时间对话中上下文的连续性与完整性,减少因重连或重试带来的资源消耗,同时降低应用逻辑的复杂性,成为该场景下的关键技术挑战。

针对这一挑战,RocketMQ 提出了一种创新的轻量化架构——其核心理念是:为每个会话或问题动态创建一个独立的轻量级主题(Lite-Topic)。以客户端与 AI 服务建立会话为例,系统自动创建一个以 SessionID 命名的专属队列(如 chatbot/{sessionlD}或 chatbot/{questionlD}),所有会话历史、上下文和中间结果均以消息形式在该主题中有序流转。通过将每个会话隔离在独立的消息通道中,不仅实现了上下文的持久化与顺序保障,也彻底解耦了会话生命周期与长连接状态,为构建高可靠、可恢复的 AI对话系统提供了底层支撑。

image-20251017145753452

这一创新架构的实现,依托于 RocketMQ为 AI场景深度优化的四大核心能力:

  • 百万级 Lite-Topic 支持:单集群可管理百万级轻量主题,为每个会话独立分配 Topic,实现高并发下的会话隔离,性能无损。
  • 全自动轻量管理:Lite-Topic 按需动态创建,连接断开后自动回收,彻底杜绝资源泄漏,运维零干预。
  • 大消息体传输能力:支持数十 MB乃至更大消息,轻松承载长 Prompt、图像、文档等 AIGC典型数据负载。
  • 严格顺序消息保障:在单队列内保证消息有序,确保LLM 流式输出的token 顺序不乱,支撑连贯流畅的交互体验。

从业务模型上来看,轻量级消息模型包括了轻量级发送、轻量级订阅以及全新的消费分发策略。

  • 轻量级发送:
    • 基于百万队列的方案,本质上是一个个 Queue。
    • 从全局上来看,一个轻量级Topic不会存在于每一个 Broker 上,在分配和发送时像顺序 Topic 的发送一样要做 Queue 的 Hash。
    • Queue 的消息是某个 Broker专属的,一个轻量级 Topic的发送在只会到一台Broker,而不是轮询发送。
  • 轻量级订阅:
    • 消费组 Group 的概念被弱化。
    • 订阅关系、消费进度管理粒度更细,以 clientID 维度维护。
    • 新增互斥(Exclusive)消费模式。
    • TTL到期后自动删除订阅关系。
  • 消费分发策略:
    • 客户端发起读请求不再指定 Topic,而是 Broker 根据 client ID 识别订阅关系,并返回多
      个 Topic 的多条消息。
    • 引入类似 Epoll 机制的 Topic ready set,在 POP 请求处理时直接访问就绪的topic。
    • 当订阅上线、新消息发送、消息 ACK(Acknowledgement,确认)后仍有消息、order Lock 释放时往 topic ready set 进行 add 操作。

image-20251017150418687

轻量化消息模型突破了传统消息队列订阅关系单一、隔离粒度粗、管理复杂等局限,通过精细化的资源隔离机制,实现了海量 Lite-Topic 的高效生命周期管理与低延迟消息投递。该模型为 AI 场景下的会话管理、上下文持久化以及多 Agent 间的异步协同,提供了高可靠、易扩展的全新架构解决方案。

应用案例:阿里巴巴安全团队“安全小蜜”智能助手

阿里巴巴安全团队推出的“安全小蜜”智能助手,在应对大规模并发会话时,曾面临会话上下文丢失、任务中断导致资源浪费等挑战。

通过引入 RocketMQ 的 Lite-Topic 能力重构会话保持机制,“安全小蜜”成功实现了会话状态的自动持久化与快速恢复。这不仅能够在多轮对话中,对用户的安全问题进行快速、精准的理解和响应,还大幅简化了工程实现复杂度,有效降低了因任务中断引发的资源浪费,整体提升了用户体验与业务处理效率。

目前,阿里云多个产品线的 AI 答疑机器人也已采用该方案完成升级,进一步验证了该架构在多样化 AI 场景下的通用性与有效性。

基于消息驱动的智能化资源调度

大模型服务普遍面临两大核心资源调度难题:

  • 负载不匹配:前端请求常突发波动,而后端算力资源有限且稳定,直接对接易引发服务过载或利用率不足,难以实现稳定服务与资源效率的平衡。
  • 资源分配无差别:在流量被平滑后,仍需解决关键问题——如何优先保障高价值任务(如 VIP 请求、核心业务)的资源获取,以最大化算力的服务价值。

RocketMQ 不仅实现了流量的平滑缓冲,更通过优先级与配额机制,赋予系统智能调度与资源优化的能力,推动消息系统从被动队列向主动控制中枢演进。开发者无需自研复杂调度中间件,即可实现对 AI流量的精细化管控。其核心能力包括:

  • 天然削峰填谷,保护 AI算力:消息队列作为“流量水库”,可缓存突发请求,使后端 AI 服务按自身处理能力自适应消费,实现负载均衡,避免因瞬时高峰导致服务崩溃或资源闲置。
  • 定速消费,精准控制算力使用:支持为消费者组(ConsumerGroup)设置消费配额(quota),实现稳定速率消费。开发者可精确设定每秒调用次数,在保障模型服务稳定的前提下,最大化GPU 利用率与系统吞吐。
  • 优先级调度,实现智能资源分配:在资源竞争场景下,支持多维度调度策略:
    • 抢占式优先级:将 VIP 请求、关键任务标记为高优先级消息,确保其优先被消费,保障核心业务响应质量。
    • 权重动态分配:在多租户共享算力池场景中,可根据业务重要性或执行状态动态调整消息优先级,平衡吞吐效率与资源公平性,防止个别租户“资源饥饿”

image-20251017150939670

通过 RocketMQ 的综合调度能力,可以高效稳定的实现资源管理。用户将请求统一写入RocketMQ突发流量被暂存为“待处理会话”。AI 推理服务按自身处理能力设置定速平滑消费,避免雪崩或空转,保障服务稳定性。当有更高优先级的用户消息进入时会被标记,系统优先调度处理,确保高价值客户获得毫秒级响应体验。而当多个业务线共享算力池时,根据 SLA和执行状态动态调整消息优先级,保障核心业务的同时,避免低优先级租户长期得不到资源。

应用案例:阿里云大模型服务平台百炼、通义灵码

阿里云大模型服务平台百炼的网关系统通过引入 RocketMQ 实现了对请求流量的削峰填谷,有效将前端不规则的访问压力转化为平稳、可控的后端算力调度。同时,借助 RocketMQ 的消息优先级功能,根据用户的请求流量设置合理的优先级,避免了大流量用户请求导致小流量用户分配不到算力资源,显著提升了资源利用率和服务公平性。

通义灵码通过 RocketMQ 将其 codebase RAG 架构从原有的同步流程升级为异步流程,实现代码向量化与流量削峰填谷,保障了系统全链路的稳定性。

异步通信枢纽:AI 工作流

Google 提出的 A2A 协议推荐采用异步通信机制来解决 AI 任务长耗时带来的同步阻塞问题。其核心机制是将一次请求 - 响应(Request-Reply)调用,解耦为一个初始请求和一个异步通知(pushNotificationConfig)。在各类 Agentic AI 平台的工作流中,每个节点执行完任务后都需要向下游节点通知执行结果,而异步通信正是支撑这种复杂协作的关键。

由于 AI 任务普遍运行时间长,工作流场景同样需要解决“同步调用导致级联阻塞”的问题。无论是 Agent 之间的外部通信,还是工作流内部的任务流转,都面临一个共同挑战:如何优雅地处理长耗时任务,避免系统阻塞?核心解决方案是采用统一的架构模式——将长耗时、有状态的交互,转化为由无状态、事件驱动的可靠异步通知机制来连接

前文提到,Apache RocketMQ 全新推出的 Lite-Topic 机制,凭借其轻量化、自动化的动态管理能力,可高效实现 Request-Reply 模式的异步通信。核心流程如下:

  • 动态创建回复通道:当 Agent A 向 Agent B 发起请求时(如 message/send),无需同步等待响应。而是在请求中嵌入唯一的动态回复地址,例如 a2a-topic/{taskID}。同时,Agent A 订阅该地址,RocketMQ 会在首次连接时自动创建这个轻量化的 Sub-Topic,相当于为本次任务开辟了一个专属的异步通信通道。

  • 异步投递执行结果:Agent B 按照自己的节奏处理任务。在任务完成后,它将结果封装为消息,直接发布到请求中指定的回复地址 a2a-topic/{taskID}。

  • 自动回收通信资源:当 Agent A 成功接收并处理完结果后,会断开与该 Lite-Topic 的连接。RocketMQ 的智能资源管理机制会检测到该 Topic 已无消费者,并在设定的 TTL(Time-To-Live)后自动清理该 Topic 资源。整个过程完全自动化,无需人工干预,杜绝了资源泄露的风险。

RocketMQ 的 Lite-Topic 方案优势在于其系统性的设计:百万级 Lite-Topic 的海量并发能力,结合按需创建、用后即焚的零开销资源管理,从根本上解决了大规模 Agent 协作场景下的扩展性与易用性问题。同时,顺序消息保障机制确保了流式或多步任务的逻辑正确,而内置的持久化与高可用机制则保障了异步通信的最终一致性与可靠性。这些能力共同为 A2A 场景构建了一个真正健壮、高效且可扩展的异步通信基础设施。

应用案例:阿里 AI 实验室

阿里 AI 实验室在其多 AI Agent 工作流中,基于 RocketMQ 构建了一套高效、可靠的 Agent 编排体系。工作流中的每个节点均采用事件驱动架构,实现可靠、持久化的通信。借助 Lite-Topic 机制,还能实现 Agent 之间的节点级通信,从而实现任务流程的精细化编排。

在多 Agent 协同执行 AI 任务的过程中,即使遇到 Agent 发布重启、调用超时等情况导致完整任务链中断,也能通过持久化事件流的可靠重试,继续推进中断的 AI 任务,既有效避免了资源浪费,又显著提升了用户体验。

架构解析:RocketMQ for AI 的关键技术升级

为实现前文所述的创新模型,Apache RocketMQ 需具备在单个集群中高效管理百万级 Lite-Topic 的能力,但原有架构在支持该能力时面临两大核心挑战:在存储层面,原先基于文件的索引和元数据管理机制已难以支撑如此量级的 Topic;在消息分发投递过程中,当单个消费者订阅大量的 Lite-Topic 时,旧有的长轮询通知机制在延迟和并发性能上也显得捉襟见肘。

因此,要实现海量 Lite-Topic 的高效管理,必须攻克以下两个关键技术难题:

  • 百万级 Lite-Topic 的元数据存储与索引结构的技术方案;
  • 面向海量 Lite-Topic 订阅场景的高效消息分发与投递机制。

image-20251017152009567

百万级 Lite-Topic 的数量级跃升,意味着索引和元数据无法沿用之前的模型。若为每个主题维护一个或者多个基于物理文件的索引结构,将带来巨大的系统开销和运维负担。

为此,Apache RocketMQ 基于其 LMQ 存储引擎 和 KV Store 能力,重新设计了元数据管理和索引存储:

  • 统一存储、多路分发:所有消息在底层的 CommitLog 文件中仅存储一份,但通过多路分发机制,可以为不同的 Lite-Topic 生成各自的消费索引(ConsumerQueue,简称 CQ)。

  • 索引存储引擎升级:摒弃了传统的文件型 CQ 结构,替换为高性能的 KV 存储引擎 RocksDB。通过将队列索引信息和消息物理偏移量(Physical Offset)作为键值对存储,充分发挥 RocksDB 在顺序写入方面的高性能优势,从而实现对百万级队列的高效管理。

在 Lite-Topic 存储模型的基础上,RocketMQ 进一步对消息分发与投递机制进行优化,针对单个消费者订阅上万个 Lite-Topic 的场景,重新设计了一套创新的事件驱动拉取(Event-Driven Pull)机制,如图 3 所示:

  • 订阅关系(Subscription Set)管理:Broker 负责管理消费者订阅关系 Subscription 的 Lite-Topic Set,并支持增量更新,从而能够实时、主动地感知消息与订阅的匹配状态。

  • 事件驱动与就绪集(Ready Set)维护:每当有新消息写入,Broker 会立即根据其维护的 Subscription Set 进行匹配,并将符合条件的消息(或其索引)添加到为消费者维护的 Ready Set 中。

  • 高效 Poll Ready Set:消费者只需对 Ready Set 发起 poll 请求,即可从 Ready Set 中获取所有匹配的消息。这种方式允许 Broker 将来自不同主题、不同流量的消息进行合并与攒批,在一次响应中高效地返回给消费者,显著降低了网络交互频率,从而提升整体性能。

通过在存储层与分发机制的创新升级,Apache RocketMQ 有效解决了 Lite-Topic 模型的关键挑战:在存储层面,采用高性能的 RocksDB 替代传统文件索引,实现了对百万级元数据的高效管理;在消息分发层面,通过创新的“事件驱动拉取”模型,由 Broker 主动维护订阅集与就绪集,将消费者的海量轮询转变为对聚合消息的单次高效拉取,确保了在海量订阅场景下的低延迟与高吞吐。