重试机制

触发消息发送重试机制的条件如下:

  • 客户端消息发送请求调用失败或请求超时
  • 网络异常造成连接失败或请求超时。
  • 服务端节点处于重启或下线等状态造成连接失败。
  • 服务端运行慢造成请求超时。
  • 服务端返回失败错误码
    • 系统逻辑错误:因运行逻辑不正确造成的错误。
    • 系统流控错误:因容量超限造成的流控错误。

消息重试时间间隔如下:

  • 无序消息:重试时间为阶梯时间
    消息重试时间间隔

  • 顺序消息:重试时间为固定时间,默认为3s

RocketMQ的重试机制涉及发送端重试和消费端重试,消费端重试关联死信队列

发送端重试

RocketMQ 在客户端中内置了请求重试逻辑,支持在初始化时配置消息发送最大重试次数(默认为 2 次),失败时会按照设置的重试次数重新发送。直到消息发送成功,或者达到最大重试次数时结束,并在最后一次失败后返回调用错误的响应。对于同步发送和异步发送,均支持消息发送重试

  • 同步发送:调用线程会一直阻塞,直到某次重试成功或最终重试失败(返回错误码或抛出异常)。
  • 异步发送:调用线程不会阻塞,但调用结果会通过回调的形式,以异常事件或者成功事件返回。

我们也手动设置重试次数。代码示例如下:

1
2
3
4
5
6
public class DefaultMQProducer  {
//设置消息发送失败时的最大重试次数
public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
}

1
2
3
4
DefaultMQProducer producer = new DefaultMQProducer();
producer.setNamesrvAddr("0.0.0.0:9876;0.0.0.1:9876");
producer.setRetryTimesWhenSendFailed(3);
producer.start();

或者在配置文件 application.yaml 中配置

1
2
3
4
5
6
7
8
9
10
rocketmq:  
name-server: 0.0.0.0:9876
producer:
group: producer-group
enable-msg-trace: false
retry-times-when-send-async-failed: 3
retry-times-when-send-failed: 3
send-message-timeout: 60000
consumer:
group: consumer-group

消费端重试

RocketMQ会为每个消费组都设置一个Topic名称为“%RETRY%+ consumerGroup “的重试队列(这里需要注意的是,这个Topic的重试队列是针对消费组,而不是针对每个Topic设置的),用于暂时保存因为各种异常而导致Consumer端无法消费的消息。

考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ对于重试消息的处理是先保存至Topic名称为“SCHEDULE_TOPIC XXXX”的延迟队列中,后台定时任务按照对应的时间进行Delay后重新保存至**”%RETRY%+consumerGroup”**的重试队列中。

  • 只有在消息模式为MessageModel.CLUSTERING集群模式时,Broker才会自动进行重试,广播消息是不会重试的。

分析了ConsumeMessageConcurrentlyService的源码,Consumer消费完成后会返回一个状态码,ConsumeConcurrentlyStatus是一个记录了并发消费状态的枚举类,共有两种状态

1
2
3
4
5
6
7
8
public enum ConsumeConcurrentlyStatus {
//消费成功
ConsumeConcurrentlyStatus,

//消费失败,一段时间后重试
RECONSUME_LATER;
}

我们拿并发消费时举例,MessageListenerConcurrently 中当捕获到异常时会设置下一级别的消费时间间隔并返回“再次消费”的状态码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override  
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt messageExt : msgs) {
log.debug("received msg: {}", messageExt);
try {
long now = System.currentTimeMillis();
handleMessage(messageExt);
long costTime = System.currentTimeMillis() - now;
log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
} catch (Exception e) { // 捕获到异常
log.warn("consume message failed. messageExt:{}, error:{}", messageExt, e);
//设置下一个超时时间间隔
context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
// 返回状态码
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}

异常重试

RocketMQ可在broker.conf文件中配置Consumer端的重试次数和重试时间间隔,如下:

1
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

我们可以在代码中指定最大重试次数,利用RocketMQConstant.MAX_RETRY_TIMES判断,如果超过最大重试次数,则提前返回“消费成功”,让 MQ 停止重试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
/**

* @Description:RocketMQ消息消费者
*/
@Slf4j
@Service
public class MessageConsumer implements MessageListenerConcurrently {
@Value("${spring.rocketmq.namesrvAddr}")
private String namesrvAddr;

private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");


@PostConstruct
public void start() {
try {
consumer.setNamesrvAddr(namesrvAddr);

//从消息队列头部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

//设置集群消费模式
consumer.setMessageModel(MessageModel.CLUSTERING);

//订阅主题
consumer.subscribe("DefaultCluster", "*");

//注册消息监听器
consumer.registerMessageListener(this);

//启动消费端
consumer.start();

log.info("Message Consumer Start...");
System.err.println("Message Consumer Start...");
} catch (MQClientException e) {
log.error("Message Consumer Start Error!!",e);
}

}

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (CollectionUtils.isEmpty(msgs)) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

MessageExt message = msgs.get(0);
try {
//逐条消费
String messageBody = new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.err.println("Message Consumer: Handle New Message: messageId: " + message.getMsgId() + ",topic: " +
message.getTopic() + ",tags: " + message.getTags() + ",messageBody: " + messageBody);

//模拟业务异常
int i = 1 / 0;
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
log.error("Consume Message Error!!", e);
//抛出异常时,返回ConsumeConcurrentlyStatus.RECONSUME_LATER,尝试重试。当重试指定次数后返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS
int reconsumeTimes = message.getReconsumeTimes();
System.err.println("Now Retry Times: " + reconsumeTimes);
if (reconsumeTimes >= RocketMQConstant.MAX_RETRY_TIMES) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}

}

使用监听器监听时,可以在 service 层里抛出异常,RocketMQ 检测到异常后会自动返回 ConsumeConcurrentlyStatus.RECONSUME_LATER 的状态码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Slf4j  
@Component
@RocketMQMessageListener(topic = TopicConstant.RPA_CALLBACK_RESULT_TASK,
consumerGroup = "${rocketmq.consumer.group}"+ "-test-mq", consumeThreadMax = 2)
public class Consumer implements RocketMQListener<T> {
@Resource
TestService testService;

@SneakyThrows
@Override
public void onMessage(String msg) {
log.info("收到信息:msg:{}", msg);
testService.receiveMsgTest(msg);
}
}

超时重试

当 Consumer 处理时间过长,在一定时间内没有返回 Broker 消费状态,Broker 也会自动重试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66

@Slf4j
@Service
public class MessageConsumer implements MessageListenerConcurrently {
@Value("${spring.rocketmq.namesrvAddr}")
private String namesrvAddr;

private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");


@PostConstruct
public void start() {
try {
consumer.setNamesrvAddr(namesrvAddr);

//从消息队列头部开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

//设置集群消费模式
consumer.setMessageModel(MessageModel.CLUSTERING);

//设置消费超时时间(分钟)
consumer.setConsumeTimeout(RocketMQConstant.CONSUMER_TIMEOUT_MINUTES);

//订阅主题
consumer.subscribe("DefaultCluster", "*");

//注册消息监听器
consumer.registerMessageListener(this);

//启动消费端
consumer.start();

log.info("Message Consumer Start...");
System.err.println("Message Consumer Start...");
} catch (MQClientException e) {
log.error("Message Consumer Start Error!!",e);
}

}

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
if (CollectionUtils.isEmpty(msgs)) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

MessageExt message = msgs.get(0);
try {
//逐条消费
String messageBody = new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.err.println("Message Consumer: Handle New Message: messageId: " + message.getMsgId() + ",topic: " +
message.getTopic() + ",tags: " + message.getTags() + ",messageBody: " + messageBody);

//模拟耗时操作2分钟,大于设置的消费超时时间
Thread.sleep(1000L * 60 * 2);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
log.error("Consume Message Error!!", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}

}