rocketmq消息消费流程源码分析(二)

本篇我们主要关注顺序消费的流程,Rocket MQ支持局部消息顺序消费,可以确保同一个消费队列中的消息被顺序消费。如果要做到全局有序,则可以将Topic配置成一个订阅,但这牺牲了高可用性。

顺序消费示例

Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class  {
public static void main(String[] args) {
try {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC"};

List<OrderDemo> orderList = new Producer().buildOrders();

Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);

for (int i = 0; i < 10; i++) {
String body = dateStr + " Hello RocketMQ " + orderList.get(i);
Message msg = new Message("TopicTestjjjj", tags[i % tags.length], "KEY" + i, body.getBytes());

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {

public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg;
long index = id % mqs.size();
return mqs.get((int)index);
}
}, orderList.get(i).getOrderId());//订单id

System.out.println("Message: " + body);
}
producer.shutdown();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
}
}

发送结果如下,可以看到orderId为15103111039的消息都发到了broker-a的第3个队列上:

1
2
3
4
5
6
7
8
9
10
Broker: broker-a, queueId: 3, Message: 2019-06-28 22:57:18 Hello RocketMQ OrderDemo{orderId=15103111039, desc='创建'}
Broker: broker-a, queueId: 1, Message: 2019-06-28 22:57:18 Hello RocketMQ OrderDemo{orderId=15103111065, desc='创建'}
Broker: broker-a, queueId: 3, Message: 2019-06-28 22:57:18 Hello RocketMQ OrderDemo{orderId=15103111039, desc='付款'}
Broker: broker-a, queueId: 3, Message: 2019-06-28 22:57:18 Hello RocketMQ OrderDemo{orderId=15103117235, desc='创建'}
Broker: broker-a, queueId: 1, Message: 2019-06-28 22:57:18 Hello RocketMQ OrderDemo{orderId=15103111065, desc='付款'}
Broker: broker-a, queueId: 3, Message: 2019-06-28 22:57:18 Hello RocketMQ OrderDemo{orderId=15103117235, desc='付款'}
Broker: broker-a, queueId: 1, Message: 2019-06-28 22:57:18 Hello RocketMQ OrderDemo{orderId=15103111065, desc='完成'}
Broker: broker-a, queueId: 3, Message: 2019-06-28 22:57:18 Hello RocketMQ OrderDemo{orderId=15103111039, desc='推送'}
Broker: broker-a, queueId: 3, Message: 2019-06-28 22:57:18 Hello RocketMQ OrderDemo{orderId=15103117235, desc='完成'}
Broker: broker-a, queueId: 3, Message: 2019-06-28 22:57:18 Hello RocketMQ OrderDemo{orderId=15103111039, desc='完成'}

Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

consumer.subscribe("TopicTestjjjj", "TagA || TagB || TagC");

consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();

public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
System.out.print(Thread.currentThread().getName() + " Receive New Messages: " );
for (MessageExt msg: msgs) {
System.out.println("content: " + new String(msg.getBody()));
}
try {
//模拟业务逻辑处理中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});

consumer.start();
System.out.println("Consumer Started.");
}
}

消费结果如下,我们可以看到订单号为15103111039的消息是按照创建-付款-推送-完成的顺序进行消费的,保证了局部的顺序性,其他两个订单号也都保证了消费的顺序:

1
2
3
4
5
6
7
8
9
10
ConsumeMessageThread_3 Receive New Messages: content: 2019-06-28 22:57:18 Hello RocketMQ OrderDemo{orderId=15103111039, desc='创建'}
ConsumeMessageThread_4 Receive New Messages: content: 2019-06-28 22:57:18 Hello RocketMQ OrderDemo{orderId=15103111065, desc='创建'}
ConsumeMessageThread_4 Receive New Messages: content: 2019-06-28 22:57:18 Hello RocketMQ OrderDemo{orderId=15103111065, desc='付款'}
ConsumeMessageThread_3 Receive New Messages: content: 2019-06-28 22:57:18 Hello RocketMQ OrderDemo{orderId=15103111039, desc='付款'}
ConsumeMessageThread_4 Receive New Messages: content: 2019-06-28 22:57:18 Hello RocketMQ OrderDemo{orderId=15103111065, desc='完成'}
ConsumeMessageThread_3 Receive New Messages: content: 2019-06-28 22:57:18 Hello RocketMQ OrderDemo{orderId=15103117235, desc='创建'}
ConsumeMessageThread_3 Receive New Messages: content: 2019-06-28 22:57:18 Hello RocketMQ OrderDemo{orderId=15103117235, desc='付款'}
ConsumeMessageThread_3 Receive New Messages: content: 2019-06-28 22:57:18 Hello RocketMQ OrderDemo{orderId=15103111039, desc='推送'}
ConsumeMessageThread_3 Receive New Messages: content: 2019-06-28 22:57:18 Hello RocketMQ OrderDemo{orderId=15103117235, desc='完成'}
ConsumeMessageThread_3 Receive New Messages: content: 2019-06-28 22:57:18 Hello RocketMQ OrderDemo{orderId=15103111039, desc='完成'}

综上所述,Rocket MQ支持局部消息顺序消费,在Producer端要保证需要顺序消费的消息发送到同一个消息队列(上例就是将同一个订单号的消息发送到同一个队列),在Consumer端用MessageListenerOrderly这个类来保证同一个消息队列的消息是顺序消费的。接下来我们就来分析一下实现原理。

消息发送

DefaultMQProducerImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class DefaultMQProducerImpl implements MQProducerInner {
private SendResult sendSelectImpl(
Message msg,
MessageQueueSelector selector,
Object arg,
final CommunicationMode communicationMode,
final SendCallback sendCallback, final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
//验证状态是否是RUNNING状态
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
//获取路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
try {
//调用传入的MessageQueueSelector的select方法获取队列ID,此处是根据orderId获取队列ID
mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
} catch (Throwable e) {
throw new MQClientException("select message queue throwed exception.", e);
}

long costTime = System.currentTimeMillis() - beginStartTime;
if (timeout < costTime) {
throw new RemotingTooMuchRequestException("sendSelectImpl call timeout");
}
if (mq != null) {
//发送消息逻辑,此处和普通消息的发送逻辑一致,请参看RocketMQ消息发送流程源码分析
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, null, timeout - costTime);
} else {
throw new MQClientException("select message queue return null.", null);
}
}

throw new MQClientException("No route info for this topic, " + msg.getTopic(), null);
}
}

消息消费

ConsumeMessageOrderlyService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
public class ConsumeMessageOrderlyService implements ConsumeMessageService {
//构造函数
public ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl defaultMQPushConsumerImpl,
MessageListenerOrderly messageListener) {
this.defaultMQPushConsumerImpl = defaultMQPushConsumerImpl;
this.messageListener = messageListener;

this.defaultMQPushConsumer = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer();
this.consumerGroup = this.defaultMQPushConsumer.getConsumerGroup();
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();

this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_"));

this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));
}

public void start() {
//启动定时任务,每隔20秒执行一次锁定分配给自己的消息消费队列
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

public void run() {
ConsumeMessageOrderlyService.this.lockMQPeriodically();
}
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
}
}

public synchronized void lockMQPeriodically() {
if (!this.stopped) {
//调用RebalanceImpl的lockAll方法锁定消费队列,见下面分析
this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
}
}

//构造ConsumeRequest,提交到线程池里进行消费,ConsumeRequest如何消费,看下面分析

public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume) {
if (dispathToConsume) {
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
}

//处理消费结果
public boolean processConsumeResult(
final List<MessageExt> msgs,
final ConsumeOrderlyStatus status,
final ConsumeOrderlyContext context,
final ConsumeRequest consumeRequest
) {
boolean continueConsume = true;
long commitOffset = -1L;
if (context.isAutoCommit()) {
switch (status) {
case COMMIT:
case ROLLBACK:
log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
consumeRequest.getMessageQueue());
//消费成功,执行processQueue的commit方法,会移除这批消息
case SUCCESS:
commitOffset = consumeRequest.getProcessQueue().commit();
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
//检查消息重试次数大于或者等于最大重试次数,将该消息发送到broker端,broker端会进入到DLQ,进入DLQ后,将直接调用processQueue的commit方法,会移除这批消息
if (checkReconsumeTimes(msgs)) {
//进行消费重试,将consumingMsgOrderlyTreeMap的数据再放回msgTreeMap,延迟进行消费
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
this.submitConsumeRequestLater(
consumeRequest.getProcessQueue(),
consumeRequest.getMessageQueue(),
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
} else {
commitOffset = consumeRequest.getProcessQueue().commit();
}
break;
default:
break;
}
} else {
//代码省略

if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
}

return continueConsume;
}
}

RebalanceImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public abstract class RebalanceImpl {
public void lockAll() {
//将消息队列按照brokerId阻止成HashMap<String, Set<MessageQueue>>的形式
HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();

Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();
//遍历brokerId
while (it.hasNext()) {
Entry<String, Set<MessageQueue>> entry = it.next();
final String brokerName = entry.getKey();
final Set<MessageQueue> mqs = entry.getValue();

if (mqs.isEmpty())
continue;

FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
if (findBrokerResult != null) {
LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(this.consumerGroup);
requestBody.setClientId(this.mQClientFactory.getClientId());
requestBody.setMqSet(mqs);

try {
//返回被锁定的消息队列
Set<MessageQueue> lockOKMQSet =
this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);

//根据MessageQueue获取ProcessQueue,置为锁定状态,并更新锁定时间
for (MessageQueue mq : lockOKMQSet) {
ProcessQueue processQueue = this.processQueueTable.get(mq);
if (processQueue != null) {
if (!processQueue.isLocked()) {
log.info("the message queue locked OK, Group: {} {}", this.consumerGroup, mq);
}

processQueue.setLocked(true);
processQueue.setLastLockTimestamp(System.currentTimeMillis());
}
}
//遍历当前处理中的消息消费队列(即messageQueue),如果不持有该消费队列的锁,则将处理队列(即pocessQueue)的锁状态置为false,暂停该消费队列的消费拉取与消费
for (MessageQueue mq : mqs) {
if (!lockOKMQSet.contains(mq)) {
ProcessQueue processQueue = this.processQueueTable.get(mq);
if (processQueue != null) {
processQueue.setLocked(false);
log.warn("the message queue locked Failed, Group: {} {}", this.consumerGroup, mq);
}
}
}
} catch (Exception e) {
log.error("lockBatchMQ exception, " + mqs, e);
}
}
}
}
}

ConsumeRequest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
class ConsumeRequest implements Runnable {
private final ProcessQueue processQueue;
private final MessageQueue messageQueue;

public ConsumeRequest(ProcessQueue processQueue, MessageQueue messageQueue) {
this.processQueue = processQueue;
this.messageQueue = messageQueue;
}

public ProcessQueue getProcessQueue() {
return processQueue;
}

public MessageQueue getMessageQueue() {
return messageQueue;
}


public void run() {
//如果消息处理队列为丢弃,则停止本次消费
if (this.processQueue.isDropped()) {
log.warn("run, the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}

//根据消费队列获取一个对象
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
//加锁,含义就是针对这个消费队列messageQueue,同一个时刻只能有一个线程进行消费
synchronized (objLock) {
//如果是广播模式,直接进入消费;如果是集群模式,则判断processQueue被锁定且锁未超时
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
final long beginTime = System.currentTimeMillis();
for (boolean continueConsume = true; continueConsume; ) {
//再次判断消息处理队列是否丢弃,则停止本次消费
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
break;
}
//TODO 以下两个判断为什么?
if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& !this.processQueue.isLocked()) {
log.warn("the message queue not locked, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}

if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
&& this.processQueue.isLockExpired()) {
log.warn("the message queue lock expired, so consume later, {}", this.messageQueue);
ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 10);
break;
}

//当消费时长大于60s(默认),则重新提交到线程池里进行消费
long interval = System.currentTimeMillis() - beginTime;
if (interval > MAX_TIME_CONSUME_CONTINUOUSLY) {
ConsumeMessageOrderlyService.this.submitConsumeRequestLater(processQueue, messageQueue, 10);
break;
}

final int consumeBatchSize =
ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
//从处理队列中顺序取出consumeBatchSize条消息,从processQueue的msgTreeMap取出并删除,且加入到consumingMsgOrderlyTreeMap中
List<MessageExt> msgs = this.processQueue.takeMessags(consumeBatchSize);
if (!msgs.isEmpty()) {
final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);

ConsumeOrderlyStatus status = null;
//执行钩子函数
ConsumeMessageContext consumeMessageContext = null;
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext = new ConsumeMessageContext();
consumeMessageContext
.setConsumerGroup(ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumerGroup());
consumeMessageContext.setMq(messageQueue);
consumeMessageContext.setMsgList(msgs);
consumeMessageContext.setSuccess(false);
// init the consume context type
consumeMessageContext.setProps(new HashMap<String, String>());
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookBefore(consumeMessageContext);
}

long beginTimestamp = System.currentTimeMillis();
ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
boolean hasException = false;
try {
//对处理队列进行加锁
this.processQueue.getLockConsume().lock();
if (this.processQueue.isDropped()) {
log.warn("consumeMessage, the message queue not be able to consume, because it's dropped. {}",
this.messageQueue);
break;
}
//执行真正的消费逻辑
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
RemotingHelper.exceptionSimpleDesc(e),
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue);
hasException = true;
} finally {
this.processQueue.getLockConsume().unlock();
}

if (null == status
|| ConsumeOrderlyStatus.ROLLBACK == status
|| ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
log.warn("consumeMessage Orderly return not OK, Group: {} Msgs: {} MQ: {}",
ConsumeMessageOrderlyService.this.consumerGroup,
msgs,
messageQueue);
}

long consumeRT = System.currentTimeMillis() - beginTimestamp;
//如果有异常,则设置returnType为EXCEPTION,且
if (null == status) {
if (hasException) {
returnType = ConsumeReturnType.EXCEPTION;
} else {
returnType = ConsumeReturnType.RETURNNULL;
}
} else if (consumeRT >= defaultMQPushConsumer.getConsumeTimeout() * 60 * 1000) {
returnType = ConsumeReturnType.TIME_OUT;
} else if (ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT == status) {
returnType = ConsumeReturnType.FAILED;
} else if (ConsumeOrderlyStatus.SUCCESS == status) {
returnType = ConsumeReturnType.SUCCESS;
}

if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.getProps().put(MixAll.CONSUME_CONTEXT_TYPE, returnType.name());
}

if (null == status) {
status = ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
//执行消费后的钩子函数
if (ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.hasHook()) {
consumeMessageContext.setStatus(status.toString());
consumeMessageContext
.setSuccess(ConsumeOrderlyStatus.SUCCESS == status || ConsumeOrderlyStatus.COMMIT == status);
ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.executeHookAfter(consumeMessageContext);
}

ConsumeMessageOrderlyService.this.getConsumerStatsManager()
.incConsumeRT(ConsumeMessageOrderlyService.this.consumerGroup, messageQueue.getTopic(), consumeRT);
//调用ConsumeMessageOrderlyService的processConsumeResult方法处理消费结果,参看ConsumeMessageOrderlyService
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
} else {
if (this.processQueue.isDropped()) {
log.warn("the message queue not be able to consume, because it's dropped. {}", this.messageQueue);
return;
}

ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);
}
}
}
}