领域模型

  1. 消息由生产者初始化并发送到云消息队列 RocketMQ 版服务端。
  2. 消息按照到达云消息队列 RocketMQ 版服务端的顺序存储到主题的指定队列中。
  3. 消费者按照指定的订阅关系从云消息队列 RocketMQ 版服务端中获取消息并消费

image-20241106164027902

RocketMO架构上主要分为四部分,如上图所示:

  • Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。

  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费
    同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。

  • NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。NameServer 互相独立,彼此没有通信关系,无状态。

    主要包括两个功能:

    • Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;
    • 路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。

    消息客户端与NameServer、Broker的交互设计要点如下。

    • Broker每隔30s向NameServer集群的每一台机器发送心跳包,包含自身创建的topic路由等信息。
    • 消息客户端每隔30s向NameServer更新对应topic的路由信息。
    • NameServer收到Broker发送的心跳包时会记录时间戳。
    • NameServer每隔10s会扫描一次brokerLiveTable(存放心跳包的时间戳信息),如果在120s内没有收到心跳包,则认为Broker失效,更新topic的路由信息,将失效的Broker信息移除。
    • 更新上述路由表(HashTable)使用了锁粒度较少的读写锁,允许多个消息发送者并发读操作,保证消息发送时的高并发。同一时刻NameServer只处理一个Broker心跳包,多个心跳包请求串行执行。
  • BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子模块。

    • Remoting Module:整个Broker的实体,负责处理来自clients端的请求
    • Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息
    • Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
    • HA Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
    • HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
    • Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。

主题

主题是用于标识同一类业务的消息。主题是一个逻辑概念,并不是真实存在的消息容器。

在拆分主题时,需要考虑拆分粒度:

  • 消息类型:顺序消息和普通消息要使用不同的主题
  • 消息业务:业务上没有关联性的要使用不同的主题
  • 消息量级:数量级或时效性不同的业务建议使用不同的主题

创建和管理主题会占用一定系统资源,生产环境下,主题管理应尽量避免自动化机制。

队列

队列是消息存储和传输的实际容器,主题中有多个队列组成,以此实现队列数量的水平拆分和队列内部的流式存储。

队列的主要作用

  • 存储顺序性:先进先出,通过offset对消息进行标记管理
  • 流式操作语义:可确保消息从任意offset读取任意数量的消息,实现类似聚合读取、回溯读取等特性

常见队列增加场景

  • 增加队列实现物理节点负载均衡
  • 增加队列实现顺序消息性能扩展
  • 非顺序消息消费的负载均衡与队列数无关

顺序消息

异构系统间需要维持强一致的同步,上游的事件变更需要按照顺序传递到下游进行处理,这类场景下需要使用顺序消息保证数据传输的顺序。

应用场景

数据实时增量同步:当涉及到系统同步数据库中数据时,往往需要使用顺序消息。否则,在查询过程中易出现读取到已删除的数据的情况。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
//顺序消息发送。
MessageBuilder messageBuilder = null;
Message message = messageBuilder.setTopic("topic")
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
//设置顺序消息的排序分组,该分组尽量保持离散,避免热点排序分组。
.setMessageGroup("fifoGroup001")
//消息体。
.setBody("messageBody".getBytes())
.build();
try {
//发送消息,需要关注发送结果,并捕获失败等异常
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}
//消费顺序消息时,需要确保当前消费者分组是顺序投递模式,否则仍然按并发乱序投递。
//消费示例一:使用PushConsumer消费顺序消息,只需要在消费监听器处理即可。
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView);
//根据消费结果返回状态。
return ConsumeResult.SUCCESS;
}
};
//消费示例二:使用SimpleConsumer消费顺序消息,主动获取消息进行消费处理并提交消费结果。
//需要注意的是,同一个MessageGroup的消息,如果前序消息没有消费完成,再次调用Receive是获取不到后续消息的。
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
//消费处理完成后,需要主动调用ACK提交消费结果。
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
//如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
e.printStackTrace();
}

定时/延时消息

应用场景

1 分布式定时调度

在分布式定时调度场景下,需要实现精确的定时任务,比如每晚22:00执行一次数据同步,每隔5分钟触发一次任务推送。
RocketMQ在其中就充当一个定时任务触发器,我们发送到MQ上的消息会根据固定的时刻推送。

2 任务超时处理

电商支付场景下,订单下单后暂未支付,不可以直接关闭订单,而需要等待一段事件后才能关闭订单。
RocketMQ 可以实现超时任务的检查触发。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//定时/延时消息发送
MessageBuilder messageBuilder = new MessageBuilder();
//以下示例表示:延迟时间为10分钟之后的Unix时间戳。
Long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000;
Message message = messageBuilder.setTopic("topic")
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
.setDeliveryTimestamp(deliverTimeStamp)
//消息体
.setBody("messageBody".getBytes())
.build();
try {
//发送消息,需要关注发送结果,并捕获失败等异常。
SendReceipt sendReceipt = producer.send(message);
System.out.println(sendReceipt.getMessageId());
} catch (ClientException e) {
e.printStackTrace();
}

//消费示例一:使用PushConsumer消费定时消息,只需要在消费监听器处理即可。
MessageListener messageListener = new MessageListener() {
@Override
public ConsumeResult consume(MessageView messageView) {
System.out.println(messageView.getDeliveryTimestamp());
//根据消费结果返回状态。
return ConsumeResult.SUCCESS;
}
};

//消费示例二:使用SimpleConsumer消费定时消息,主动获取消息进行消费处理并提交消费结果。
List<MessageView> messageViewList = null;
try {
messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
messageViewList.forEach(messageView -> {
System.out.println(messageView);
//消费处理完成后,需要主动调用ACK提交消费结果。
try {
simpleConsumer.ack(messageView);
} catch (ClientException e) {
e.printStackTrace();
}
});
} catch (ClientException e) {
//如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
e.printStackTrace();
}

事务消息

应用场景

分布式系统调用的特点是一个核心业务逻辑的执行,同时需要调用多个下游业务进行处理。因此,为了保证核心业务和多个下游业务的执行结果一致,需要用到事务消息。

以电商场景为例,用户支付订单后,会修改物流发货、积分变更、购物车状态清空等多个子系统的变更。

事务消息处理流程

事务消息

  1. 生产者将消息发送至云消息队列 RocketMQ 版服务端。
  2. 云消息队列 RocketMQ 版服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为“暂不能投递”,这种状态下的消息即为半事务消息。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
  6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

使用示例

事务消息相比普通消息发送时需要修改以下几点:

  • 发送事务消息前,需要开启事务并关联本地的事务执行。
  • 为保证事务一致性,在构建生产者时,必须设置事务检查器和预绑定事务消息发送的主题列表,客户端内置的事务检查器会对绑定的事务主题做异常状态恢复。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
//演示demo,模拟订单表查询服务,用来确认订单事务是否提交成功。
private static boolean checkOrderById(String orderId) {
return true;
}

//演示demo,模拟本地事务的执行结果。
private static boolean doLocalTransaction() {
return true;
}

public static void main(String[] args) throws ClientException {
ClientServiceProvider provider = new ClientServiceProvider();
MessageBuilder messageBuilder = new MessageBuilder();
//构造事务生产者:事务消息需要生产者构建一个事务检查器,用于检查确认异常半事务的中间状态。
Producer producer = provider.newProducerBuilder()
.setTransactionChecker(messageView -> {
/**
* 事务检查器一般是根据业务的ID去检查本地事务是否正确提交还是回滚,此处以订单ID属性为例。
* 在订单表找到了这个订单,说明本地事务插入订单的操作已经正确提交;如果订单表没有订单,说明本地事务已经回滚。
*/
final String orderId = messageView.getProperties().get("OrderId");
if (Strings.isNullOrEmpty(orderId)) {
// 错误的消息,直接返回Rollback。
return TransactionResolution.ROLLBACK;
}
return checkOrderById(orderId) ? TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
})
.build();
//开启事务分支。
final Transaction transaction;
try {
transaction = producer.beginTransaction();
} catch (ClientException e) {
e.printStackTrace();
//事务分支开启失败,直接退出。
return;
}
Message message = messageBuilder.setTopic("topic")
//设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
//设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
//一般事务消息都会设置一个本地事务关联的唯一ID,用来做本地事务回查的校验。
.addProperty("OrderId", "xxx")
//消息体。
.setBody("messageBody".getBytes())
.build();
//发送半事务消息
final SendReceipt sendReceipt;
try {
sendReceipt = producer.send(message, transaction);
} catch (ClientException e) {
//半事务消息发送失败,事务可以直接退出并回滚。
return;
}
/**
* 执行本地事务,并确定本地事务结果。
* 1. 如果本地事务提交成功,则提交消息事务。
* 2. 如果本地事务提交失败,则回滚消息事务。
* 3. 如果本地事务未知异常,则不处理,等待事务消息回查。
*
*/
boolean localTransactionOk = doLocalTransaction();
if (localTransactionOk) {
try {
transaction.commit();
} catch (ClientException e) {
// 业务可以自身对实时性的要求选择是否重试,如果放弃重试,可以依赖事务消息回查机制进行事务状态的提交。
e.printStackTrace();
}
} else {
try {
transaction.rollback();
} catch (ClientException e) {
// 建议记录异常信息,回滚异常时可以无需重试,依赖事务消息回查机制进行事务状态的提交。
e.printStackTrace();
}
}
}

死信消息

消息消费异常进行消费重试时,达到最大重试次数后会转为死信状态,RocketMQ 支持将这些死信消息(Dead-Letter Message)保存至死信队列(Dead-Letter Queue),方便后续进行业务恢复或回溯。

应用场景

消息重试失败后,可以选择将死信消息存储到指定的死信 topic 中,创建另一个消费者组消费死信消息来处理异常链路或分析死信消息。

使用限制

  • 死信Topic只支持普通消息和顺序消息类型的Topic,事务消息类型和定时消息类型的Topic不能作为死信Topic。
  • 不支持将生产原消息的Topic作为死信Topic(避免出现循环雪崩的问题)。在死信消息转存流程中,若系统发现死信Topic和生产消息的原Topic相同,则该条消息将被丢弃。
  • 不同Topic的死信消息可以保存到同一个Topic中。
  • 删除某个ConsumerGroup时,对应的死信Topic不会被删除。
  • 若某个Topic被死信策略引用,删除该Topic前,您必须先解除该Topic的死信策略关系

死信队列具备以下特点:

  • RocketMQ会自动为需要死信队列的ConsumerGroup创建死信队列。
  • 死信队列与ConsumerGroup对应,死信队列中包含该ConsumerGroup所有相关topic的死信消息。
  • 死信队列中消息的有效期与正常消息相同,默认48小时。
  • 若要消费死信队列中的消息,需在控制台将死信队列的权限设置为6,即可读可写。

Kafka、RabbitMQ和RocketMQ

RabbitMQ Kafka RocketMQ
吞吐量 万级(5.95w/s) 10万级(17.3w/s) 10万级(11.6w/s)
时效性 微秒级 毫秒级 毫秒级
可用性 基于主从架构的高可用性。但集群扩展麻烦。 非常高。支持集群部署 非常高。分布式架构
消息可靠性 经过参数优化配置,可以保证数据不丢失 kafka收到消息后会写入磁盘缓冲区,没有直接落到物理磁盘上,机器故障可能会导致磁盘缓冲区内数据消失 经过参数优化配置,可以保证数据不丢失
性能稳定性 消息堆积时,性能不稳定 分区多时性能不稳定 队列较多、消息堆积时性能稳定