RocketMQ Message Storage Flow: Source Code Analysis (Part 2)

This post analyzes how the ConsumeQueue and the IndexFile are built. The ConsumeQueue is mainly used by message consumers, while the IndexFile is mainly used for message queries.

Building the ConsumeQueue

Let's start with ReputMessageService, which is started when DefaultMessageStore starts.
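For context, this is roughly how DefaultMessageStore#start wires the thread up. This is a simplified sketch; the exact offset initialization differs between RocketMQ versions:

// Simplified sketch of the relevant part of DefaultMessageStore#start (version-dependent)
public void start() throws Exception {
    // ... other store services (flush, clean, index, ...) are started here as well ...

    // Start dispatching from the current end of the CommitLog; recovery/HA logic
    // may instead rewind this to an earlier, already-confirmed offset
    this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());
    this.reputMessageService.start();
}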

ReputMessageService


/**
 * Thread that builds the ConsumeQueue and the IndexFile
 */
class ReputMessageService extends ServiceThread {

private volatile long reputFromOffset = 0;

public long getReputFromOffset() {
return reputFromOffset;
}

public void setReputFromOffset(long reputFromOffset) {
this.reputFromOffset = reputFromOffset;
}


public void shutdown() {
for (int i = 0; i < 50 && this.isCommitLogAvailable(); i++) {
try {
Thread.sleep(100);
} catch (InterruptedException ignored) {
}
}

if (this.isCommitLogAvailable()) {
log.warn("shutdown ReputMessageService, but commitlog have not finish to be dispatched, CL: {} reputFromOffset: {}",
DefaultMessageStore.this.commitLog.getMaxOffset(), this.reputFromOffset);
}

super.shutdown();
}

public long behind() {
return DefaultMessageStore.this.commitLog.getMaxOffset() - this.reputFromOffset;
}

private boolean isCommitLogAvailable() {
return this.reputFromOffset < DefaultMessageStore.this.commitLog.getMaxOffset();
}

private void doReput() {
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {

if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
break;
}

//Get from the CommitLog the MappedByteBuffer of the MappedFile that reputFromOffset falls into; its size is the MappedFile's wrotePosition (or commitPosition) minus reputFromOffset, i.e. the messages already written to the buffer but not yet dispatched, possibly more than one
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
this.reputFromOffset = result.getStartOffset();

//Read the messages in a loop
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
//Read one message from the MappedByteBuffer and wrap it into a DispatchRequest
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
int size = dispatchRequest.getMsgSize();

if (dispatchRequest.isSuccess()) {
if (size > 0) {
//Invoke CommitLogDispatcherBuildConsumeQueue and CommitLogDispatcherBuildIndex to build the ConsumeQueue and the index
DefaultMessageStore.this.doDispatch(dispatchRequest);

//TODO Long-polling related, to be analyzed in a later post
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
}
//Advance reputFromOffset
this.reputFromOffset += size;
readSize += size;
if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
DefaultMessageStore.this.storeStatsService
.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
.addAndGet(dispatchRequest.getMsgSize());
}
//TODO In what case does this branch run, i.e. when is the message size 0? (Presumably when the reader hits the blank magic code at the end of a CommitLog file, hence the rollNextFile below)
} else if (size == 0) {
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
readSize = result.getSize();
}
} else if (!dispatchRequest.isSuccess()) {
if (size > 0) {
log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
this.reputFromOffset += size;
} else {
doNext = false;
if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",
this.reputFromOffset);

this.reputFromOffset += result.getSize() - readSize;
}
}
}
}
} finally {
result.release();
}
} else {
doNext = false;
}
}
}


public void run() {
DefaultMessageStore.log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
//Run once every millisecond
Thread.sleep(1);
this.doReput();
} catch (Exception e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
DefaultMessageStore.log.info(this.getServiceName() + " service end");
}


public String getServiceName() {
return ReputMessageService.class.getSimpleName();
}
}
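doDispatch, called in doReput above, simply hands the request to every registered CommitLogDispatcher; with the default setup that list contains CommitLogDispatcherBuildConsumeQueue and CommitLogDispatcherBuildIndex. A simplified excerpt:

// DefaultMessageStore#doDispatch (simplified): fan the request out to all dispatchers
public void doDispatch(DispatchRequest req) {
    for (CommitLogDispatcher dispatcher : this.dispatcherList) {
        dispatcher.dispatch(req);
    }
}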

CommitLogDispatcherBuildConsumeQueue

class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

public void dispatch(DispatchRequest request) {
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
//For non-transactional messages and transaction commit messages, run putMessagePositionInfo, i.e. the ConsumeQueue building logic
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
DefaultMessageStore.this.putMessagePositionInfo(request);
break;
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
break;
}
}
}

putMessagePositionInfo in DefaultMessageStore first looks up the ConsumeQueue by topic and queue id, then calls the ConsumeQueue's putMessagePositionInfoWrapper to do the write. I won't paste that part in full; a short sketch follows, and then we go straight to ConsumeQueue.
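The lookup-and-delegate step is only a couple of lines; a simplified sketch:

// DefaultMessageStore#putMessagePositionInfo (simplified)
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    // Look up (or lazily create) the ConsumeQueue for this topic and queueId
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    cq.putMessagePositionInfoWrapper(dispatchRequest);
}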

ConsumeQueue

public class ConsumeQueue {
public void putMessagePositionInfoWrapper(DispatchRequest request) {
final int maxRetries = 30;
//Whether the ConsumeQueue is writable
boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
//Retry up to 30 times
for (int i = 0; i < maxRetries && canWrite; i++) {
long tagsCode = request.getTagsCode();
//TODO ConsumeQueueExt.CqExtUnit is related to message filtering, not analyzed here
if (isExtWriteEnable()) {
ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
cqExtUnit.setFilterBitMap(request.getBitMap());
cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
cqExtUnit.setTagsCode(request.getTagsCode());

long extAddr = this.consumeQueueExt.put(cqExtUnit);
if (isExtAddr(extAddr)) {
tagsCode = extAddr;
} else {
log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
topic, queueId, request.getCommitLogOffset());
}
}
boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
if (result) {
this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
return;
} else {
// XXX: warn and notify me
log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
+ " failed, retry " + i + " times");

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
log.warn("", e);
}
}
}
log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}

private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset) {

if (offset <= this.maxPhysicOffset) {
return true;
}

//Write the entry into the ByteBuffer, which was allocated via ByteBuffer.allocate(CQ_STORE_UNIT_SIZE)
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);

//Compute the offset to write into the ConsumeQueue: cqOffset * 20, since each ConsumeQueue entry takes 20 bytes and cqOffset is this message's position in the ConsumeQueue
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;

//Get the last MappedFile based on expectLogicOffset
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile != null) {
if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
this.minLogicOffset = expectLogicOffset;
this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
this.fillPreBlank(mappedFile, expectLogicOffset);
log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
+ mappedFile.getWrotePosition());
}

if (cqOffset != 0) {
//Compare the expected ConsumeQueue write offset with the current write offset derived from the MappedFile: if they are equal, write; if the expected offset is smaller, the entry has already been written; if it is larger, something is wrong
long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();

if (expectLogicOffset < currentLogicOffset) {
log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
return true;
}

if (expectLogicOffset != currentLogicOffset) {
LOG_ERROR.warn(
"[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset,
currentLogicOffset,
this.topic,
this.queueId,
expectLogicOffset - currentLogicOffset
);
}
}
this.maxPhysicOffset = offset;
//Append the entry to the FileChannel of the ConsumeQueue's MappedFile
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
return false;
}

//Flush the ConsumeQueue's data to disk
public boolean flush(final int flushLeastPages) {
boolean result = this.mappedFileQueue.flush(flushLeastPages);
if (isExtReadEnable()) {
result = result & this.consumeQueueExt.flush(flushLeastPages);
}

return result;
}
}
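Because every entry is exactly 20 bytes, mapping a ConsumeQueue offset to a file and a position inside it is plain arithmetic. A worked example, assuming the default ConsumeQueue file size of 300,000 entries (6,000,000 bytes):

// Worked example, assuming the default ConsumeQueue file size of 300000 entries * 20 bytes
long cqOffset = 300001;                      // logical position of the message in this queue
long expectLogicOffset = cqOffset * 20;      // 6,000,020
long fileSize = 300000L * 20;                // 6,000,000 bytes per ConsumeQueue file
long fileStartOffset = expectLogicOffset / fileSize * fileSize; // 6,000,000 -> the second file
long positionInFile = expectLogicOffset % fileSize;             // 20 bytes into that file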

The above only covers how ConsumeQueue entries are written into the buffer. The ConsumeQueue is also backed by files on disk, so when is its data actually flushed? When DefaultMessageStore is created it also creates FlushConsumeQueueService, a thread that flushes every flushIntervalConsumeQueue milliseconds; see FlushConsumeQueueService for the details.
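Its core loop looks roughly like the following; a simplified sketch that omits the retry counter and the checkpoint update the real service also performs:

// FlushConsumeQueueService (simplified sketch): periodically flush every ConsumeQueue
class FlushConsumeQueueService extends ServiceThread {
    public void run() {
        while (!this.isStopped()) {
            int interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue();
            this.waitForRunning(interval);

            int flushLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
            for (ConcurrentMap<Integer, ConsumeQueue> maps : DefaultMessageStore.this.consumeQueueTable.values()) {
                for (ConsumeQueue cq : maps.values()) {
                    cq.flush(flushLeastPages);
                }
            }
        }
    }
}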

Building the IndexFile

CommitLogDispatcherBuildIndex

class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {

public void dispatch(DispatchRequest request) {
//Build the index file
if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
DefaultMessageStore.this.indexService.buildIndex(request);
}
}
}

IndexService

public class IndexService {
public void buildIndex(DispatchRequest req) {
//Get or create an index file, retrying up to 3 times
IndexFile indexFile = retryGetAndCreateIndexFile();
if (indexFile != null) {
long endPhyOffset = indexFile.getEndPhyOffset();
DispatchRequest msg = req;
String topic = msg.getTopic();
String keys = msg.getKeys();
if (msg.getCommitLogOffset() < endPhyOffset) {
return;
}

final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
break;
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
return;
}

//If uniqKey (the message's unique key) is not null, write it into the IndexFile
if (req.getUniqKey() != null) {
indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}

//Build an index entry for each key; RocketMQ supports multiple index entries for a single message
if (keys != null && keys.length() > 0) {
String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
for (int i = 0; i < keyset.length; i++) {
String key = keyset[i];
if (key.length() > 0) {
indexFile = putKey(indexFile, msg, buildKey(topic, key));
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
}
}
} else {
log.error("build index error, stop building index");
}
}

private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
//Call indexFile.putKey to write into the IndexFile; see the analysis of the IndexFile class below
for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");
//If the file is full, call retryGetAndCreateIndexFile to create a new IndexFile and write into it
indexFile = retryGetAndCreateIndexFile();
if (null == indexFile) {
return null;
}

ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
}
return indexFile;
}

public IndexFile retryGetAndCreateIndexFile() {
IndexFile indexFile = null;
//Retry up to 3 times
for (int times = 0; null == indexFile && times < MAX_TRY_IDX_CREATE; times++) {
//Get the last IndexFile, creating a new one if necessary
indexFile = this.getAndCreateLastIndexFile();
if (null != indexFile)
break;

try {
log.info("Tried to create index file " + times + " times");
Thread.sleep(1000);
} catch (InterruptedException e) {
log.error("Interrupted", e);
}
}

if (null == indexFile) {
this.defaultMessageStore.getAccessRights().makeIndexFileError();
log.error("Mark index file cannot build flag");
}

return indexFile;
}

//Get the last IndexFile, or create a new one and add it to indexFileList
public IndexFile getAndCreateLastIndexFile() {
IndexFile indexFile = null;
IndexFile prevIndexFile = null;
long lastUpdateEndPhyOffset = 0;
long lastUpdateIndexTimestamp = 0;

{
this.readWriteLock.readLock().lock();
if (!this.indexFileList.isEmpty()) {
//Take the last file in indexFileList
IndexFile tmp = this.indexFileList.get(this.indexFileList.size() - 1);
//If the last file is not full, return it
if (!tmp.isWriteFull()) {
indexFile = tmp;
//If it is full, a new IndexFile will be created below; the full one becomes prevIndexFile and is flushed by a new thread, see @1
} else {
lastUpdateEndPhyOffset = tmp.getEndPhyOffset();
lastUpdateIndexTimestamp = tmp.getEndTimestamp();
prevIndexFile = tmp;
}
}

this.readWriteLock.readLock().unlock();
}

//Create a new IndexFile
if (indexFile == null) {
try {
String fileName =
this.storePath + File.separator
+ UtilAll.timeMillisToHumanString(System.currentTimeMillis());
indexFile =
new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
lastUpdateIndexTimestamp);
this.readWriteLock.writeLock().lock();
this.indexFileList.add(indexFile);
} catch (Exception e) {
log.error("getLastIndexFile exception ", e);
} finally {
this.readWriteLock.writeLock().unlock();
}

if (indexFile != null) {
//@1 Flush the previous IndexFile
final IndexFile flushThisFile = prevIndexFile;
Thread flushThread = new Thread(new Runnable() {
@Override
public void run() {
IndexService.this.flush(flushThisFile);
}
}, "FlushIndexFileThread");

flushThread.setDaemon(true);
flushThread.start();
}
}

return indexFile;
}
}
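The flush(IndexFile) call made by that @1 thread roughly does the following. A simplified sketch; the checkpoint handling in particular may differ slightly between versions:

// IndexService#flush (simplified sketch), invoked by the FlushIndexFileThread above
public void flush(final IndexFile f) {
    if (null == f) {
        return;
    }

    long indexMsgTimestamp = 0;
    if (f.isWriteFull()) {
        indexMsgTimestamp = f.getEndTimestamp();
    }

    // Force this index file's mapped buffer to disk (see IndexFile#flush below)
    f.flush();

    if (indexMsgTimestamp > 0) {
        this.defaultMessageStore.getStoreCheckpoint().setIndexMsgTimestamp(indexMsgTimestamp);
        this.defaultMessageStore.getStoreCheckpoint().flush();
    }
}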

IndexFile

public class IndexFile {

/**
 * Flush logic, called from the point marked @1 in IndexService
 */
public void flush() {
long beginTime = System.currentTimeMillis();
if (this.mappedFile.hold()) {
this.indexHeader.updateByteBuffer();
this.mappedByteBuffer.force();
this.mappedFile.release();
log.info("flush index file eclipse time(ms) " + (System.currentTimeMillis() - beginTime));
}
}

public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
//indexCount is the number of index entries currently in use; indexNum is the total number of entries the file can hold, 20 million by default
if (this.indexHeader.getIndexCount() < this.indexNum) {
//Compute the key's hash
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
//Compute the position of the hash slot for this key
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

FileLock fileLock = null;

try {
//Read the index-entry number stored in this hash slot; on hash collisions the entries form a linked list
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}

//Difference between this message's store timestamp and the first message's timestamp
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}

//Compute the position where the index entry will be stored
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;

//Store the index entry: the key's hash code, the message's physical offset, the time difference from the first message, and the number of the previous entry in this slot's chain
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);

//Write this entry's number into the hash slot
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}

//Update the index header
this.indexHeader.incHashSlotCount();
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);

return true;
} catch (Exception e) {
log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
}
} else {
log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
+ "; index max num = " + this.indexNum);
}

return false;
}
}

The index file is laid out as shown below; it helps to read the code above alongside this diagram.

(figure: IndexFile layout)
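In numbers, an index file is laid out as a 40-byte IndexHeader, followed by hashSlotNum 4-byte hash slots, followed by indexNum 20-byte index entries. With the default sizes (5,000,000 slots and 20,000,000 entries, assumed here) that works out to about 400 MB per file:

// IndexFile layout arithmetic, assuming the default 5,000,000 hash slots and 20,000,000 index entries
int headerSize = 40;           // IndexHeader.INDEX_HEADER_SIZE
int hashSlotNum = 5_000_000;   // default maxHashSlotNum
int indexNum = 20_000_000;     // default maxIndexNum
int hashSlotSize = 4;          // each slot stores the int number of the latest entry for that hash
int indexSize = 20;            // keyHash(4) + phyOffset(8) + timeDiff(4) + prevIndexNo(4)

long fileSize = headerSize + (long) hashSlotNum * hashSlotSize + (long) indexNum * indexSize;
// = 40 + 20,000,000 + 400,000,000 = 420,000,040 bytes, roughly 400 MB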