rocketmq消息消费流程源码分析(三)

本篇我们主要关注定时消费的流程。

定时消息示例

Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class  {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();

for (int i = 0; i < 10; i++) {
try {
Message msg = new Message("TopicTest_1" ,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//设置定时level
msg.setDelayTimeLevel(3);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}

Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
//设置NameServer的地址
consumer.setNamesrvAddr("127.0.0.1:9876");
//设置从哪开始消息,CONSUME_FROM_FIRST_OFFSET:从队列当前最小偏移量开始消费,CONSUME_FROM_LAST_OFFSET:从队列当前最大偏移量开始消费,CONSUME_FROM_TIMESTAMP:从消费者启动时间戳开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅主题,subExpression:消息过滤表达式,TAG或者是SQL92表达式
consumer.subscribe("TopicTest_1", "*");
//注册并发消费消息的事件监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
//消费消息
for (int i = 0; i < msgs.size(); i++) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs.get(i));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//启动consumer
consumer.start();
System.out.printf("Consumer Started.%n");
}
}

目前RocketMQ支持的延迟时间有

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

对应的延迟级别有,从1到18共18个延迟级别

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18

RocketMQ不支持任意时间精度的延迟消息,只能使用上面列出的固定延迟级别。

实现原理

延迟消息的核心思路是:当消息设置了DELAY属性(即调用了setDelayTimeLevel())后,由producer发出,在broker端会统一存到同一个Topic(即SCHEDULE_TOPIC_XXXX)下,不同的延迟级别对应不同的队列。当延迟时间到了,由定时线程读取该队列中的消息,恢复其真实的Topic及队列后重新写入,相应的consumer才能消费到。下面我们来看具体实现。

消息存储

由于在消息存储流程中我们已经具体分析过,这里不再赘述,可以参看《RocketMQ消息存储流程源码分析(一)》,我们直接看和定时消息相关的代码。

CommitLog

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * Excerpt of CommitLog limited to the part of putMessage that deals with delayed messages.
 * Code unrelated to delayed messages is elided, so this listing is not complete on its own.
 */
public class CommitLog {
/**
 * Stores a message. In the part shown here, a message carrying a positive delay level is
 * redirected to the schedule topic, and its original topic and queue id are saved as
 * message properties.
 *
 * @param msg the message to store; its topic, queue id and properties are modified in place
 *            when it is a delayed message
 */
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
// (code omitted)
// Determine the transaction type of the message.
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
// Only non-transactional messages and transaction COMMIT messages are considered for delaying.
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// A delay level greater than 0 marks a delayed message.
if (msg.getDelayTimeLevel() > 0) {
// Clamp the requested level to the maximum level known to ScheduleMessageService.
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
}
// Target the schedule topic (SCHEDULE_TOPIC_XXXX per the article); the queue id is derived
// from the delay level (delay level - 1 according to the original note; helper not shown here).
// NOTE(review): topic and queueId are presumably locals declared in the omitted code above.
topic = ScheduleMessageService.SCHEDULE_TOPIC;
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

// Save the real topic and queue id as properties before they are overwritten below.
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

// From here on the message is stored under the schedule topic and queue.
msg.setTopic(topic);
msg.setQueueId(queueId);
}
}
// (code omitted)
}
}

定时消费

ScheduleMessageService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
public class ScheduleMessageService extends ConfigManager {

//在DefaultMessageStore的load方法中调用
/**
 * Loads the persisted state through the superclass and then parses the configured delay levels.
 *
 * @return {@code true} only if both the superclass load and {@link #parseDelayLevel()} succeed;
 *         the delay levels are not parsed when the superclass load fails
 */
public boolean load() {
    // Short-circuits: parseDelayLevel() runs only after a successful super.load().
    return super.load() && this.parseDelayLevel();
}

/**
 * Parses the space-separated delay level string from the message store configuration
 * (for example {@code "1s 5s 10s"}) and fills {@code delayLevelTable} with
 * {@code level -> delay in milliseconds}, where the level is the 1-based position of the entry.
 * {@code maxDelayLevel} is raised to the highest level seen.
 *
 * @return {@code true} if every entry was parsed; {@code false} if any exception occurred
 *         (the exception and the offending string are logged)
 */
public boolean parseDelayLevel() {
    // Milliseconds per supported unit suffix.
    HashMap<String, Long> unitMillis = new HashMap<String, Long>();
    unitMillis.put("s", 1000L);
    unitMillis.put("m", 1000L * 60);
    unitMillis.put("h", 1000L * 60 * 60);
    unitMillis.put("d", 1000L * 60 * 60 * 24);

    String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
    try {
        int level = 0;
        for (String token : levelString.split(" ")) {
            level++;
            // The last character is the unit suffix; everything before it is the amount.
            int unitIndex = token.length() - 1;
            Long unit = unitMillis.get(token.substring(unitIndex));

            if (level > this.maxDelayLevel) {
                this.maxDelayLevel = level;
            }
            long amount = Long.parseLong(token.substring(0, unitIndex));
            // An unknown suffix leaves unit null; unboxing then throws and is handled below.
            long delayTimeMillis = unit * amount;
            this.delayLevelTable.put(level, delayTimeMillis);
        }
    } catch (Exception e) {
        log.error("parseDelayLevel exception", e);
        log.info("levelString String = {}", levelString);
        return false;
    }

    return true;
}

//定时任务启动
/**
 * Starts the scheduled delivery: one {@code DeliverDelayedMessageTimerTask} is scheduled per
 * entry of {@code delayLevelTable}, followed by a fixed-rate task that persists this service.
 */
public void start() {
    for (Map.Entry<Integer, Long> levelEntry : this.delayLevelTable.entrySet()) {
        Integer level = levelEntry.getKey();
        Long delayMillis = levelEntry.getValue();
        // Resume from the recorded offset of this level, or from 0 when none is recorded.
        Long savedOffset = this.offsetTable.get(level);
        long startOffset = (savedOffset == null) ? 0L : savedOffset;

        // Levels without a delay value get no delivery task.
        if (delayMillis == null) {
            continue;
        }
        this.timer.schedule(new DeliverDelayedMessageTimerTask(level, startOffset), FIRST_DELAY_TIME);
    }

    // Persist task: first run after 10000 ms, then at the configured flush interval.
    TimerTask persistTask = new TimerTask() {
        @Override
        public void run() {
            try {
                ScheduleMessageService.this.persist();
            } catch (Throwable e) {
                log.error("scheduleAtFixedRate flush exception", e);
            }
        }
    };
    long flushInterval = this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval();
    this.timer.scheduleAtFixedRate(persistTask, 10000, flushInterval);
}

/**
 * Timer task that scans the schedule consume queue of one delay level, starting at a given
 * consume-queue offset, and re-puts the messages whose deliver time has been reached.
 *
 * <p>An instance runs once. Every normal return path of executeOnTimeup, as well as the
 * exception handler in run, schedules a new instance on the enclosing service's timer, so the
 * scan keeps going with an updated offset.
 */
class DeliverDelayedMessageTimerTask extends TimerTask {
// Delay level handled by this task; translated to a queue id with delayLevel2QueueId.
private final int delayLevel;
// Consume-queue offset at which this run starts scanning.
private final long offset;

/**
 * @param delayLevel delay level whose schedule queue is scanned
 * @param offset     consume-queue offset to start scanning from
 */
public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
this.delayLevel = delayLevel;
this.offset = offset;
}


/**
 * Runs one scan. If the scan throws an Exception, the error is logged and the same level and
 * offset are rescheduled after DELAY_FOR_A_PERIOD.
 */
public void run() {
try {
this.executeOnTimeup();
} catch (Exception e) {
// XXX: warn and notify me
log.error("ScheduleMessageService, executeOnTimeup exception", e);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
}
}

/**
 * Sanity-checks a deliver timestamp: when it lies further in the future than "now plus the
 * configured delay of this level", it is replaced by now, so the message is treated as due.
 *
 * @param now              current time in milliseconds
 * @param deliverTimestamp deliver time read from the consume queue entry
 * @return deliverTimestamp, or now when deliverTimestamp exceeds the maximum plausible value
 */
private long correctDeliverTimestamp(final long now, final long deliverTimestamp) {
long result = deliverTimestamp;
long maxTimestamp = now + ScheduleMessageService.this.delayLevelTable.get(this.delayLevel);
if (deliverTimestamp > maxTimestamp) {
result = now;
}
return result;
}

/**
 * Scans the schedule consume queue of this level from offset. Entries that are due are looked
 * up in the store, converted with messageTimeup and put back through putMessage. The scan
 * stops at the first entry that is not yet due, or at the first failed put, and reschedules
 * itself accordingly.
 */
public void executeOnTimeup() {
// Consume queue of the schedule topic for this delay level.
ConsumeQueue cq =
ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
// Offset used by the fallback reschedule at the end of this method.
long failScheduleOffset = offset;
if (cq != null) {
// Buffer of consume-queue entries starting at offset; it may hold several entries.
SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
if (bufferCQ != null) {
try {
long nextOffset = offset;
int i = 0;
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
// Walk the buffer one entry (CQ_STORE_UNIT_SIZE bytes) at a time; i is the byte index.
for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
// Each entry is read as a long, an int and a long, in this order.
long offsetPy = bufferCQ.getByteBuffer().getLong();
int sizePy = bufferCQ.getByteBuffer().getInt();
long tagsCode = bufferCQ.getByteBuffer().getLong();
// tagsCode is used below as the deliver timestamp. Per the article author, it is set to the
// expiry timestamp when the consume queue is built for a delayed message (not shown here).
if (cq.isExtAddr(tagsCode)) {
if (cq.getExt(tagsCode, cqExtUnit)) {
tagsCode = cqExtUnit.getTagsCode();
} else {
//can't find ext content.So re compute tags code.
log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
tagsCode, offsetPy, sizePy);
long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
}
}

long now = System.currentTimeMillis();
long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);

// Offset of the entry currently being processed.
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long countdown = deliverTimestamp - now;
// Due: look the message up and put it back through putMessage.
// Not due: reschedule this level after countdown milliseconds.
if (countdown <= 0) {
// Look up the stored message.
MessageExt msgExt =
ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
offsetPy, sizePy);
// If found, convert it to a MessageExtBrokerInner (per the article, this restores the
// original topic and queue id; messageTimeup is not shown here) and put it again.
if (msgExt != null) {
try {
MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
PutMessageResult putMessageResult =
ScheduleMessageService.this.defaultMessageStore
.putMessage(msgInner);
// Put succeeded: move on to the next entry.
if (putMessageResult != null
&& putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
continue;
// Put failed: retry from the current entry after DELAY_FOR_A_PERIOD
// (10s according to the original note; the constant is defined outside this excerpt).
} else {
// XXX: warn and notify me
log.error(
"ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
msgExt.getTopic(), msgExt.getMsgId());
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel,
nextOffset), DELAY_FOR_A_PERIOD);
ScheduleMessageService.this.updateOffset(this.delayLevel,
nextOffset);
return;
}
} catch (Exception e) {
/*
 * XXX: warn and notify me
 */
// The exception is only logged; the loop then proceeds with the next entry.
log.error(
"ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
+ msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
+ offsetPy + ",sizePy=" + sizePy, e);
}
}
} else {
// Not due yet: come back to this entry once the remaining time has elapsed.
ScheduleMessageService.this.timer.schedule(
new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
countdown);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
}
} // end of for
// Whole buffer processed: continue behind the last entry after DELAY_FOR_A_WHILE
// (100ms according to the original note; the constant is defined outside this excerpt).
nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
return;
} finally {

// Always release the buffer, including on the early returns above.
bufferCQ.release();
}
} // end of if (bufferCQ != null)
// No buffer for this offset: if the offset is below the queue's minimum offset,
// restart from that minimum on the next run.
else {

long cqMinOffset = cq.getMinOffsetInQueue();
if (offset < cqMinOffset) {
failScheduleOffset = cqMinOffset;
log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
+ cqMinOffset + ", queueId=" + cq.getQueueId());
}
}
} // end of if (cq != null)

// Fallback when there is no consume queue or no buffer: try again after DELAY_FOR_A_WHILE.
ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
failScheduleOffset), DELAY_FOR_A_WHILE);
}
}

消息重新写入CommitLog之后,就会走正常的流程(如构建ConsumeQueue等),消费端就可以进行消费了。