【阅读笔记】rocketmq 特性实现 —— 消息重试及死信队列

描述 rocketmq 消息重试和死信队列机制

当消息消费失败时,RocketMQ 支持对消费失败的消息进行重新消费(重试)。重试达到一定次数后仍然失败的消息会被放入死信队列;待系统恢复正常后,可以从死信队列中读取原消息重新进行消费。

重试实现

不同类型的 consumer 重试的机制不一样,这里只分析 PUSH 模型

Concurrently / CLUSTERING

这个场景,消息消费失败后, consumer 向 broker 发送 CONSUMER_SEND_MSG_BACK 命令将消息存到 %RETRY% topic,同组的 consumer 会再从 %RETRY% topic 拉取消息进行重新消费。

消息重试一定次数后,将会发到死信队列 %DLQ% topic, 不再进行重试,需要手工重新消费。

consumer 代码:

// Demo listener used to trigger the retry path: it fails on every delivery.
consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
    // Integer division by zero throws ArithmeticException, so the success
    // status below is never actually returned.
    int zero = 0;
    int ignored = 1 / zero;
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

/**
 * Excerpt of the consumer-side service for concurrent (non-ordered) PUSH consumption.
 * Lines consisting of "......" mark code that the article elided.
 */
public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {

    /** Task that hands one batch of messages to the user listener and processes the outcome. */
    class ConsumeRequest implements Runnable {

        @Override
        public void run() {

            ......
            try {
                // Call the user-registered listener with a read-only view of the batch
                status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
            } catch (Throwable e) {
                // The listener's exception is caught here, not rethrown
                // (the logging call is elided in this excerpt); only the flag is recorded
                hasException = true;
            }
            ......

            // Results are only processed while the process queue has not been dropped
            if (!processQueue.isDropped()) {
                ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
            }

            ......
        }
    }

    /**
     * Handles the outcome of one consume request according to the message model.
     *
     * @param status         status produced by the listener call
     * @param context        consume context of the batch
     * @param consumeRequest the request whose messages were consumed
     */
    public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest) {

        // ackIndex is computed in the elided code
        ......

        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                // Only a log line is printed here,
                // which means the BROADCASTING model does not retry
                break;
            case CLUSTERING:
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());

                // Messages from ackIndex + 1 onwards failed; everything before that was consumed successfully
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    // Send the failed message back to the broker
                    boolean result = this.sendMessageBack(msg, context);

                    // If sendMessageBack failed, collect the message in msgBackFailed
                    if (!result) {
                        // ReconsumeTimes is incremented here, so this local path also counts as a retry
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }

                // Messages that could not be sent back are removed from the request
                // and submitted again later for local re-consumption
                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);
                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
        }
    }

    /**
     * Sends a failed message back to the broker it came from.
     *
     * @param msg     the message whose consumption failed
     * @param context consume context supplying the delay level and the message queue
     * @return true if the send-back call completed, false if it threw
     */
    public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
        // The message sent back is stored as a delayed message; the delay level comes from the context
        // (the article notes it is derived from the retry count)
        int delayLevel = context.getDelayLevelWhenNextConsume();

        try {
            this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
            return true;
        } catch (Exception e) {
            // Failure is logged and reported as false so the caller can retry locally
            log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
        }

        return false;
    }
}

// Excerpt of the push-consumer implementation: how a failed message is sent back to the broker.
public class DefaultMQPushConsumerImpl implements MQConsumerInner {

// Sends msg back to the broker with the given delay level.
// First tries the dedicated send-back call; if that throws, falls back to
// publishing a copy of the message to the group's retry topic with the default producer.
public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {

try {
// Resolve the broker address: by broker name when one is given, otherwise from the
// host that stored the message (the original note says this picks a master)
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());

// Send the CONSUMER_SEND_MSG_BACK command to the broker (synchronously, per the original note);
// NOTE(review): 5000 is presumably a timeout in milliseconds - confirm against the API
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
this.defaultMQPushConsumer.getConsumerGroup(),
delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);

// Fallback when the send-back call fails: build a new message addressed to the
// group's retry topic and send it with the ordinary producer
Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());

// Keep the first message id as the origin id across retries
String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);

newMsg.setFlag(msg.getFlag());
// Copy all properties first; the calls below then overwrite individual ones
MessageAccessor.setProperties(newMsg, msg.getProperties());
// Record the original topic in the RETRY_TOPIC property
// (the new message itself is already addressed to the retry topic above)
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());

// This send counts as one more consume attempt
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));

// Carry the maximum retry count along with the message
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));

// Set the delay level (3 + times already retried) so delivery to the retry topic is delayed
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

this.mQClientFactory.getDefaultMQProducer().send(newMsg);
}
}
}

broker 代码

``` java

// Broker-side excerpt: processing of a message that a consumer sent back after a failed consume.
// Lines consisting only of dots mark code that the article elided.
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {

// Re-stores the original message either in the group's retry topic (with a delay level)
// or, once the retry limit is reached, in the group's dead-letter topic.
private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
throws RemotingCommandException {

......

// The actual retry topic name is derived from the consumer group
String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());

.....

// Create the retry topic (readable and writable)
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);

......

// Look up the original message by the offset carried in the request header;
// answer with SYSTEM_ERROR if it cannot be found
MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
if (null == msgExt) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("look message by offset failed, " + requestHeader.getOffset());
return response;
}

// Mark the message as a retry by adding the RETRY_TOPIC property (holding its current topic)
// unless the property is already present
final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
if (null == retryTopic) {
MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
}

int delayLevel = requestHeader.getDelayLevel();

// Retry limit: the subscription group config by default; for requests whose version is
// at least V3_4_9 the value carried in the request header is used instead
int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
}

// Move to the dead-letter queue when the retry limit is reached or the delay level is negative.
// The article notes: if the consumer configured a limit, maxReconsumeTimes is that value,
// otherwise it is the hard-coded default of 16 (not visible in this excerpt)
if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) {
// Dead-letter topic name for this consumer group
newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
// Pick a random queue id in [0, DLQ_NUMS_PER_GROUP)
queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;

// Create the dead-letter topic (write permission only)
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
newTopic,
DLQ_NUMS_PER_GROUP,
PermName.PERM_WRITE, 0
);
} else {
// Otherwise set the delay level; a level of 0 is replaced by 3 + times already retried
if (0 == delayLevel) {
delayLevel = 3 + msgExt.getReconsumeTimes();
}

msgExt.setDelayTimeLevel(delayLevel);
}

......

MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(newTopic);
// Copy the attributes of the old message (elided)

......

// Store the message; the article notes that the store handles delayed messages
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

......
}
}

```

Concurrently / BROADCASTING

这种情况不会重试:consumer 端的 processConsumeResult 在 BROADCASTING 分支中只打印日志,不会把消息发回 broker。

Orderly

消费代码抛出异常或返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT 的效果是一样的

由于顺序消息需要保证消息在同一个 consumer 下进行消费,所以重试并不需要将消息发回 broker, 而是在 consumer 本地重新消费

只有当本地重试超出最大重试次数时,才会把消息发回 broker(发送到 %RETRY% topic);如果之后仍然消费失败,就将消息放入死信队列,这一点与 Concurrently 相同。

consumer 代码:

// Excerpt of the consumer-side service for ordered consumption: failed messages are retried
// locally and only sent to the retry topic once the retry limit is reached.
// Lines consisting of "......" mark code that the article elided.
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
......

// Handles the outcome of one ordered consume call; only the
// SUSPEND_CURRENT_QUEUE_A_MOMENT branch is shown in this excerpt.
public boolean processConsumeResult(
final List<MessageExt> msgs,
final ConsumeOrderlyStatus status,
final ConsumeOrderlyContext context,
final ConsumeRequest consumeRequest
) {

if (context.isAutoCommit()) {
switch (status) {
......
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
// Record the failure in the consumer statistics
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());

// Check the retry limit; true means at least one message must be retried locally
if (checkReconsumeTimes(msgs)) {
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
// Consume again locally after the suspend interval taken from the context
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
} else {
// No message needs a local retry, so the offset is committed
commitOffset = consumeRequest.getProcessQueue().commit();
}
}
} else {
switch (status) {
......

case SUSPEND_CURRENT_QUEUE_A_MOMENT:
// Same handling as in the auto-commit branch above
}
}
}

// Returns true when at least one message still has to be retried locally.
// A message that reached the retry limit is sent to the retry topic instead; it only
// forces a local retry if that send fails. Every message kept for local retry gets
// its reconsume counter incremented.
private boolean checkReconsumeTimes(List<MessageExt> msgs) {
boolean suspend = false;
if (msgs != null && !msgs.isEmpty()) {
for (MessageExt msg : msgs) {
if (msg.getReconsumeTimes() >= getMaxReconsumeTimes()) {
// Limit reached: store the current count on the message and try to send it back
MessageAccessor.setReconsumeTime(msg, String.valueOf(msg.getReconsumeTimes()));
if (!sendMessageBack(msg)) {
// Send-back failed, so keep retrying locally
suspend = true;
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
}
} else {
// Still below the limit: retry locally
suspend = true;
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
}
}
}
return suspend;
}

// After repeated local retries have failed, publish a copy of the message to the
// group's retry topic. Returns true if the send completed, false if it threw.
public boolean sendMessageBack(final MessageExt msg) {
try {
// The new message is addressed to the consumer group's retry topic
Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());
// Keep the first message id as the origin id
String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
newMsg.setFlag(msg.getFlag());
// Copy all properties first, then overwrite the retry-related ones
MessageAccessor.setProperties(newMsg, msg.getProperties());
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes()));
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
// Delay level is 3 + times already retried
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);
return true;
} catch (Exception e) {
// Failure is logged and reported as false so the caller keeps retrying locally
log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
}

return false;
}

......
}