Retry Mechanism
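A message send is retried when any of the following occurs:
- The client's send request fails or times out:
  - A network exception causes a connection failure or a request timeout.
  - The server node is restarting or going offline, which causes a connection failure.
  - The server is running slowly, which causes a request timeout.
- The server returns a failure error code:
  - System logic error: an error caused by incorrect runtime logic.
  - System flow-control error: throttling caused by exceeding capacity limits.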
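Retry intervals are as follows:
- Unordered messages: the interval grows step by step (stepped backoff).
- Ordered messages: the interval is fixed, 3s by default.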
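The sketch below makes the two schedules concrete. It returns the wait before the n-th redelivery and is built on two assumptions. Unordered messages follow the stepped schedule of RocketMQ's default delay levels, which starts at 10s and ends at 2h; attempts past the 16th reuse the last 2h step. Ordered messages use the 3s fixed interval stated above. `RetryIntervals` and `intervalFor` are hypothetical names and are not part of the RocketMQ API.

```java
import java.time.Duration;

// Hypothetical helper illustrating the two retry schedules described above.
final class RetryIntervals {
    // Stepped schedule for unordered messages. Assumption: it matches RocketMQ's
    // default delay levels, starting at level 3 (10s) for the first retry.
    private static final Duration[] STEPPED = {
        Duration.ofSeconds(10), Duration.ofSeconds(30),
        Duration.ofMinutes(1), Duration.ofMinutes(2), Duration.ofMinutes(3),
        Duration.ofMinutes(4), Duration.ofMinutes(5), Duration.ofMinutes(6),
        Duration.ofMinutes(7), Duration.ofMinutes(8), Duration.ofMinutes(9),
        Duration.ofMinutes(10), Duration.ofMinutes(20), Duration.ofMinutes(30),
        Duration.ofHours(1), Duration.ofHours(2)
    };

    // Fixed interval for ordered messages (3s by default, as stated above).
    private static final Duration ORDERED_FIXED = Duration.ofSeconds(3);

    private RetryIntervals() {}

    /** Returns the wait before the given 1-based retry attempt. */
    static Duration intervalFor(boolean ordered, int attempt) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt must be >= 1");
        }
        if (ordered) {
            return ORDERED_FIXED;
        }
        // Assumption: attempts past the end of the table reuse the last step (2h).
        int index = Math.min(attempt, STEPPED.length) - 1;
        return STEPPED[index];
    }
}
```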
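RocketMQ's retry mechanism covers both producer-side retries and consumer-side retries. Consumer-side retries are tied to the dead-letter queue.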
Producer-Side Retries
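The RocketMQ client has built-in request retry logic. You can configure the maximum number of send retries at initialization (2 by default), and a failed send is retried up to that many times. Retrying stops when the send succeeds or the maximum retry count is reached. After the last failed attempt, the call returns an error response. Both synchronous and asynchronous sends support retries:
- Synchronous send: the calling thread blocks until an attempt succeeds or the final retry fails (an error code is returned or an exception is thrown).
- Asynchronous send: the calling thread does not block. The result is delivered through a callback, either as an exception event or as a success event.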
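The synchronous behavior above boils down to a bounded retry loop: one initial attempt plus up to `retryTimes` retries, with the last failure surfaced to the caller. The sketch below models that loop in plain Java. `sendOnce` is a hypothetical stand-in for a single send request, not a RocketMQ method, and the real client also handles broker selection and timeouts.

```java
import java.util.function.Supplier;

// Hypothetical model of the producer-side retry loop described above.
final class SendRetry {
    private SendRetry() {}

    static <T> T sendWithRetry(Supplier<T> sendOnce, int retryTimes) {
        int totalAttempts = 1 + Math.max(0, retryTimes); // initial attempt + retries
        RuntimeException lastError = null;
        for (int attempt = 1; attempt <= totalAttempts; attempt++) {
            try {
                return sendOnce.get(); // success ends the loop immediately
            } catch (RuntimeException e) {
                lastError = e; // remember the failure and try again
            }
        }
        // Every attempt failed: report the last error to the caller.
        throw lastError;
    }
}
```

With the default `retryTimesWhenSendFailed` of 2, a synchronous send therefore makes at most three attempts.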
You can also set the retry count manually. DefaultMQProducer exposes a setter for it:
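```java
// Excerpt from DefaultMQProducer (unrelated members omitted)
public class DefaultMQProducer {
    // Maximum number of retries for a failed synchronous send; defaults to 2
    private int retryTimesWhenSendFailed = 2;

    public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
        this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
    }
}
```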
For example:

```java
DefaultMQProducer producer = new DefaultMQProducer();
producer.setNamesrvAddr("0.0.0.0:9876;0.0.0.1:9876");
// Retry a failed synchronous send up to 3 times
producer.setRetryTimesWhenSendFailed(3);
producer.start();
```
Alternatively, when using rocketmq-spring-boot-starter, configure it in application.yaml:
```yaml
rocketmq:
  name-server: 0.0.0.0:9876
  producer:
    group: producer-group
    enable-msg-trace: false
    retry-times-when-send-async-failed: 3
    retry-times-when-send-failed: 3
    send-message-timeout: 60000
  consumer:
    group: consumer-group
```
Consumer-Side Retries
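RocketMQ creates a retry topic named `%RETRY%` + consumerGroup for every consumer group (for example `%RETRY%consumer-group`). Note that this retry topic is created per consumer group, not per topic. It temporarily holds messages that the consumer could not consume because of various exceptions.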
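The naming rule is plain string concatenation. The sketch below mirrors it and also covers the `%DLQ%` prefix that RocketMQ uses for the per-group dead-letter topic. `GroupTopics` is a hypothetical helper. RocketMQ's own `MixAll` class has equivalent methods (`getRetryTopic`, `getDLQTopic`).

```java
// Hypothetical helper mirroring RocketMQ's per-consumer-group topic naming.
final class GroupTopics {
    static final String RETRY_PREFIX = "%RETRY%";
    static final String DLQ_PREFIX = "%DLQ%";

    private GroupTopics() {}

    // Retry topic shared by every topic the group subscribes to.
    static String retryTopic(String consumerGroup) {
        return RETRY_PREFIX + consumerGroup;
    }

    // Dead-letter topic for messages that exhaust their retries.
    static String dlqTopic(String consumerGroup) {
        return DLQ_PREFIX + consumerGroup;
    }
}
```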
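Recovering from an exception takes time, so the retry queue uses multiple retry levels, each with its own redelivery delay. The more times a message has been retried, the longer its delay. RocketMQ first stores a message to be retried in the delay topic `SCHEDULE_TOPIC_XXXX`. A background scheduled task waits out the corresponding delay and then writes the message to the `%RETRY%` + consumerGroup retry topic. By default, a concurrently consumed message is retried up to 16 times, after which it is moved to the dead-letter topic `%DLQ%` + consumerGroup.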
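The delay-then-redeliver path can be modeled as follows. This assumes the default RocketMQ 4.x broker behavior:
- A failed message with `reconsumeTimes` previous retries gets delay level `3 + reconsumeTimes`, capped at the highest configured level (18 with the default `messageDelayLevel`).
- It waits in queue `delayLevel - 1` of `SCHEDULE_TOPIC_XXXX` before being written to the group's retry topic.

`DelayRoute` and `forRetry` are hypothetical names.

```java
// Hypothetical model of where a failed message waits before redelivery.
final class DelayRoute {
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    static final int MAX_DELAY_LEVEL = 18; // the default messageDelayLevel has 18 entries

    final int delayLevel;
    final int scheduleQueueId;
    final String targetTopic;

    private DelayRoute(int delayLevel, int scheduleQueueId, String targetTopic) {
        this.delayLevel = delayLevel;
        this.scheduleQueueId = scheduleQueueId;
        this.targetTopic = targetTopic;
    }

    static DelayRoute forRetry(String consumerGroup, int reconsumeTimes) {
        // Broker-controlled retries start at level 3 (10s) and step up each time.
        int level = Math.min(3 + reconsumeTimes, MAX_DELAY_LEVEL);
        // Each delay level has its own queue in the schedule topic.
        int queueId = level - 1;
        // After the delay elapses, the message is re-stored in the retry topic.
        return new DelayRoute(level, queueId, "%RETRY%" + consumerGroup);
    }
}
```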
- The broker retries automatically only when the message model is MessageModel.CLUSTERING (clustering mode). Broadcast messages are never retried.
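The source of ConsumeMessageConcurrentlyService shows that the consumer returns a status code after consuming a message. ConsumeConcurrentlyStatus is the enum that records the result of concurrent consumption, and it has two values: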
```java
public enum ConsumeConcurrentlyStatus {
    // Consumption succeeded
    CONSUME_SUCCESS,
    // Consumption failed; consume again later
    RECONSUME_LATER;
}
```
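How the returned status is acted on depends on the message model. The sketch below is a simplified version of what ConsumeMessageConcurrentlyService does after a batch is processed:
- In clustering mode, failed messages are sent back to the broker for retry.
- In broadcasting mode, failed messages are only logged and dropped.

The enums here are local, hypothetical stand-ins rather than the RocketMQ types. The real client also supports partial acknowledgement of a batch, which this sketch ignores.

```java
// Simplified, hypothetical model of post-consumption handling.
final class ConsumeResultHandler {
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }
    enum Model { CLUSTERING, BROADCASTING }
    enum Action { ACK, SEND_BACK_FOR_RETRY, DROP }

    private ConsumeResultHandler() {}

    static Action handle(Model model, Status status) {
        if (status == Status.CONSUME_SUCCESS) {
            return Action.ACK; // offset advances, nothing to retry
        }
        // Failure: only clustering mode gets broker-side retries.
        return model == Model.CLUSTERING ? Action.SEND_BACK_FOR_RETRY : Action.DROP;
    }
}
```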
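Take concurrent consumption as an example. When the MessageListenerConcurrently implementation catches an exception, it sets the delay level for the next consumption attempt and returns the "consume again later" status code (RECONSUME_LATER):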
```java
// log, handleMessage and delayLevelWhenNextConsume belong to the enclosing listener class
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    for (MessageExt messageExt : msgs) {
        log.debug("received msg: {}", messageExt);
        try {
            long now = System.currentTimeMillis();
            handleMessage(messageExt);
            long costTime = System.currentTimeMillis() - now;
            log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
        } catch (Exception e) {
            log.warn("consume message failed. messageExt:{}, error:{}", messageExt, e);
            // Tell the broker which delay level to use for the next attempt
            context.setDelayLevelWhenNextConsume(delayLevelWhenNextConsume);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
```
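The broker interprets the value passed to setDelayLevelWhenNextConsume together with the retry count. The rules below describe RocketMQ 4.x broker behavior, so verify them against your version:
- A negative level sends the message straight to the dead-letter topic.
- A level of `0` lets the broker choose `3 + reconsumeTimes`.
- A positive level is used as-is.
- Once `reconsumeTimes` reaches the maximum (16 by default), the message goes to the DLQ regardless of the level.

The sketch encodes that decision using hypothetical names.

```java
// Hypothetical encoding of the broker's send-back decision described above.
final class SendBackDecision {
    static final int DLQ = -1; // sentinel meaning "route to the dead-letter topic"

    private SendBackDecision() {}

    /** Returns the delay level to use, or DLQ if the message should be dead-lettered. */
    static int resolve(int requestedDelayLevel, int reconsumeTimes, int maxReconsumeTimes) {
        if (reconsumeTimes >= maxReconsumeTimes || requestedDelayLevel < 0) {
            return DLQ; // retries exhausted, or the client asked for the DLQ directly
        }
        if (requestedDelayLevel == 0) {
            return 3 + reconsumeTimes; // broker-controlled stepped backoff
        }
        return requestedDelayLevel; // client-controlled delay level
    }
}
```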
Retry on Exception
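The delay levels used for consumer retries (that is, the retry intervals) are configured in broker.conf. The maximum retry count itself is set on the consumer, for example with DefaultMQPushConsumer.setMaxReconsumeTimes.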
```properties
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
```
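Each entry in `messageDelayLevel` is a number followed by a unit, and level N (1-based) refers to the N-th entry. The sketch below parses such a string into milliseconds. It is an illustrative reimplementation, not the broker's own parser, and it assumes the supported units are `s`, `m`, `h` and `d`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative parser for a messageDelayLevel-style string (hypothetical helper).
final class DelayLevels {
    private static final Map<Character, Long> UNIT_MILLIS = Map.of(
        's', 1_000L,
        'm', 60_000L,
        'h', 3_600_000L,
        'd', 86_400_000L);

    private DelayLevels() {}

    /** Element i holds the delay for level i + 1, in milliseconds. */
    static List<Long> parse(String config) {
        List<Long> delays = new ArrayList<>();
        for (String token : config.trim().split("\\s+")) {
            char unit = token.charAt(token.length() - 1);
            Long factor = UNIT_MILLIS.get(unit);
            if (factor == null) {
                throw new IllegalArgumentException("Unknown unit in: " + token);
            }
            long amount = Long.parseLong(token.substring(0, token.length() - 1));
            delays.add(amount * factor);
        }
        return delays;
    }
}
```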
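You can also cap the retry count in code. The example below compares the message's reconsume count against RocketMQConstant.MAX_RETRY_TIMES, an application-defined constant. Once the count reaches the limit, the listener returns "consume success" early so that RocketMQ stops retrying.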
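```java
// RocketMQ 4.x package names; RocketMQConstant is an application-defined class
import java.util.List;
import javax.annotation.PostConstruct;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

@Slf4j
@Service
public class MessageConsumer implements MessageListenerConcurrently {

    @Value("${spring.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");

    @PostConstruct
    public void start() {
        try {
            consumer.setNamesrvAddr(namesrvAddr);
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // Broker-side retries only apply in clustering mode
            consumer.setMessageModel(MessageModel.CLUSTERING);
            consumer.subscribe("DefaultCluster", "*");
            consumer.registerMessageListener(this);
            consumer.start();
            log.info("Message Consumer Start...");
            System.err.println("Message Consumer Start...");
        } catch (MQClientException e) {
            log.error("Message Consumer Start Error!!", e);
        }
    }

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        if (CollectionUtils.isEmpty(msgs)) {
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        // The batch size defaults to 1, so only the first message is handled
        MessageExt message = msgs.get(0);
        try {
            String messageBody = new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET);
            System.err.println("Message Consumer: Handle New Message: messageId: " + message.getMsgId()
                    + ",topic: " + message.getTopic() + ",tags: " + message.getTags()
                    + ",messageBody: " + messageBody);
            // Deliberately throw an ArithmeticException to simulate a consumption failure
            int i = 1 / 0;
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            log.error("Consume Message Error!!", e);
            int reconsumeTimes = message.getReconsumeTimes();
            System.err.println("Now Retry Times: " + reconsumeTimes);
            if (reconsumeTimes >= RocketMQConstant.MAX_RETRY_TIMES) {
                // Give up: report success so the broker stops redelivering
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
}
```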
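When you consume through a listener (@RocketMQMessageListener in rocketmq-spring), you can simply throw an exception from the service layer. When RocketMQ detects the exception, it automatically returns the ConsumeConcurrentlyStatus.RECONSUME_LATER status code.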
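```java
// TopicConstant and TestService are application-defined classes
import javax.annotation.Resource;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RocketMQMessageListener(topic = TopicConstant.RPA_CALLBACK_RESULT_TASK,
        consumerGroup = "${rocketmq.consumer.group}" + "-test-mq",
        consumeThreadMax = 2)
public class Consumer implements RocketMQListener<String> {

    @Resource
    TestService testService;

    // Any exception thrown here (checked ones via @SneakyThrows) is turned into RECONSUME_LATER
    @SneakyThrows
    @Override
    public void onMessage(String msg) {
        log.info("Received message: msg: {}", msg);
        testService.receiveMsgTest(msg);
    }
}
```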
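The automatic conversion works because the framework wraps your onMessage in a concurrent listener that catches exceptions, much like the consumeMessage method shown earlier. Below is a stripped-down, hypothetical version of that adapter. It uses `java.util.function.Consumer` in place of `RocketMQListener` and a local `Status` enum in place of `ConsumeConcurrentlyStatus`.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical adapter: turns a plain message handler into a status-returning listener.
final class ListenerAdapter {
    enum Status { CONSUME_SUCCESS, RECONSUME_LATER }

    private ListenerAdapter() {}

    static Status consume(List<String> msgs, Consumer<String> onMessage) {
        for (String msg : msgs) {
            try {
                onMessage.accept(msg); // user code, e.g. a service-layer call
            } catch (Exception e) {
                // Any exception from user code (including sneaky-thrown checked ones)
                // means "retry this batch later".
                return Status.RECONSUME_LATER;
            }
        }
        return Status.CONSUME_SUCCESS;
    }
}
```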
Retry on Timeout
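If the consumer takes too long and does not report a consumption status within the allowed time, RocketMQ also retries the message automatically. The allowed time is set with setConsumeTimeout, in minutes (15 by default). The example below sleeps for two minutes to simulate slow processing.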
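```java
// RocketMQ 4.x package names; RocketMQConstant is an application-defined class
import java.util.List;
import javax.annotation.PostConstruct;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

@Slf4j
@Service
public class MessageConsumer implements MessageListenerConcurrently {

    @Value("${spring.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");

    @PostConstruct
    public void start() {
        try {
            consumer.setNamesrvAddr(namesrvAddr);
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.setMessageModel(MessageModel.CLUSTERING);
            // Consume timeout in minutes; keep it below 2 so the sleep below exceeds it
            consumer.setConsumeTimeout(RocketMQConstant.CONSUMER_TIMEOUT_MINUTES);
            consumer.subscribe("DefaultCluster", "*");
            consumer.registerMessageListener(this);
            consumer.start();
            log.info("Message Consumer Start...");
            System.err.println("Message Consumer Start...");
        } catch (MQClientException e) {
            log.error("Message Consumer Start Error!!", e);
        }
    }

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        if (CollectionUtils.isEmpty(msgs)) {
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        MessageExt message = msgs.get(0);
        try {
            String messageBody = new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET);
            System.err.println("Message Consumer: Handle New Message: messageId: " + message.getMsgId()
                    + ",topic: " + message.getTopic() + ",tags: " + message.getTags()
                    + ",messageBody: " + messageBody);
            // Simulate slow processing: block this consumer thread for 2 minutes
            Thread.sleep(1000L * 60 * 2);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            log.error("Consume Message Error!!", e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }
}
```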
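The timeout check can be modeled as a periodic scan over in-flight messages. This sketch assumes RocketMQ 4.x client behavior, where the timeout is measured in minutes from each message's consume start time and expired messages are sent back for redelivery. The class and method names are hypothetical, and the real client runs this scan on a scheduled thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the client-side consume-timeout scan.
final class ConsumeTimeoutScanner {
    private ConsumeTimeoutScanner() {}

    /**
     * @param startTimes message id -> consume start timestamp in milliseconds
     * @return ids of messages whose processing has exceeded the timeout
     */
    static List<String> expired(Map<String, Long> startTimes, long nowMillis, long consumeTimeoutMinutes) {
        long timeoutMillis = TimeUnit.MINUTES.toMillis(consumeTimeoutMinutes);
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> e : startTimes.entrySet()) {
            if (nowMillis - e.getValue() > timeoutMillis) {
                result.add(e.getKey()); // candidate for send-back and redelivery
            }
        }
        return result;
    }
}
```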