【阅读笔记】rocketmq 特性实现 —— 消息跟踪

介绍 rocketmq 消息跟踪功能

消息跟踪,可以跟踪消息由哪个 client 产生,保存在哪个 broker , 是否被消费以及由哪个 client 消费等一系统消息的属性。 这个功能可以方便消息的调试的故障排查。

示例

producer 打开消息跟踪的方法

1
2
3


DefaultMQProducer producer = new DefaultMQProducer("producerGroup", true);

consumer 打开消息跟踪的方法

1
2
3


DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup", true);

消息跟踪会用消息的形式保存到 RMQ_SYS_TRACE_TOPIC 这个 topic, 可以用 console 查看,也可以自行解析 topic 里的消息查看。

实现

producer 消息跟踪

当传入的 enableMsgTrace 参数为 true 时,注册一个 message hook, 进行消息跟踪

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

public class extends ClientConfig implements MQProducer {

private TraceDispatcher traceDispatcher = null;

public DefaultMQProducer(final String producerGroup, RPCHook rpcHook, boolean enableMsgTrace,final String customizedTraceTopic) {

......

if (enableMsgTrace) {
try {
// 如果 customizedTraceTopic 为空, 就用 RMQ_SYS_TRACE_TOPIC
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
traceDispatcher = dispatcher;
// 注册 message hook
this.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}
}

SendMessageTraceHookImpl 获取产生的消息的信息,然后通过 AsyncTraceDispatcher 发送到 RMQ_SYS_TRACE_TOPIC

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93

public class SendMessageTraceHookImpl implements SendMessageHook {

private TraceDispatcher localDispatcher;

public SendMessageTraceHookImpl(TraceDispatcher localDispatcher) {
this.localDispatcher = localDispatcher;
}


public String hookName() {
return "SendMessageTraceHook";
}

@Override
public void sendMessageBefore(SendMessageContext context) {
// 如果是跟踪消息本身就跳过
if (context == null ||
context.getMessage().getTopic().startsWith(
((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())
) {

return;
}

//build the context content of TuxeTraceContext
TraceContext tuxeContext = new TraceContext();
tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));
// 这里设置了 MqTraceContext, 在 sendMessageAfter 会用到
context.setMqTraceContext(tuxeContext);

// 跟踪类型
tuxeContext.setTraceType(TraceType.Pub);

// producer group
tuxeContext.setGroupName(context.getProducerGroup());

//build the data bean object of message trace
TraceBean traceBean = new TraceBean();
traceBean.setTopic(context.getMessage().getTopic());
traceBean.setTags(context.getMessage().getTags());
traceBean.setKeys(context.getMessage().getKeys());
traceBean.setStoreHost(context.getBrokerAddr());
traceBean.setBodyLength(context.getMessage().getBody().length);
traceBean.setMsgType(context.getMsgType());

tuxeContext.getTraceBeans().add(traceBean);
}

@Override
public void sendMessageAfter(SendMessageContext context) {
// 如果是跟踪消息本身就跳过
// MqTraceContext 在 sendMessageBefore 中有设置
if (context == null ||
context.getMessage().getTopic().startsWith(
((AsyncTraceDispatcher) localDispatcher).getTraceTopicName()
) ||
context.getMqTraceContext() == null) {
return;
}

if (context.getSendResult() == null) {
return;
}

// broker 可以设置关闭或打开消息跟踪
if (context.getSendResult().getRegionId() == null || !context.getSendResult().isTraceOn()) {
// if switch is false,skip it
return;
}

TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();
TraceBean traceBean = tuxeContext.getTraceBeans().get(0);

// 发送消息所花的时间
int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());
tuxeContext.setCostTime(costTime);

// 是否发送成功
if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
tuxeContext.setSuccess(true);
} else {
tuxeContext.setSuccess(false);
}
tuxeContext.setRegionId(context.getSendResult().getRegionId());
traceBean.setMsgId(context.getSendResult().getMsgId());
traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);

// append 到 dispatcher
localDispatcher.append(tuxeContext);
}
}

consumer 消息跟踪

当传入的 enableMsgTrace 参数为 true 时,注册一个 message hook, 进行消息跟踪

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

private TraceDispatcher traceDispatcher = null;

public DefaultMQPushConsumer(final String consumerGroup, RPCHook rpcHook,
AllocateMessageQueueStrategy allocateMessageQueueStrategy, boolean enableMsgTrace, final String customizedTraceTopic) {

......

// 同 producer 一样
if (enableMsgTrace) {
try {
AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic, rpcHook);
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
traceDispatcher = dispatcher;
// 注册 message hook
this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageTraceHookImpl(traceDispatcher));
} catch (Throwable e) {
log.error("system mqtrace hook init failed ,maybe can't send msg trace data");
}
}
}
}


public class ConsumeMessageTraceHookImpl implements ConsumeMessageHook {

private TraceDispatcher localDispatcher;

public ConsumeMessageTraceHookImpl(TraceDispatcher localDispatcher) {
this.localDispatcher = localDispatcher;
}

@Override
public String hookName() {
return "ConsumeMessageTraceHook";
}

@Override
public void consumeMessageBefore(ConsumeMessageContext context) {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}

TraceContext traceContext = new TraceContext();
// 在 consumeMessageAfter 里会用到
context.setMqTraceContext(traceContext);
traceContext.setTraceType(TraceType.SubBefore);
traceContext.setGroupName(context.getConsumerGroup());

// 每个消息构造一个 TraceBean 放到 beans
List<TraceBean> beans = new ArrayList<TraceBean>();
for (MessageExt msg : context.getMsgList()) {
if (msg == null) {
continue;
}
String regionId = msg.getProperty(MessageConst.PROPERTY_MSG_REGION);
String traceOn = msg.getProperty(MessageConst.PROPERTY_TRACE_SWITCH);

if (traceOn != null && traceOn.equals("false")) {
// If trace switch is false ,skip it
continue;
}

TraceBean traceBean = new TraceBean();
traceBean.setTopic(msg.getTopic());
traceBean.setMsgId(msg.getMsgId());
traceBean.setTags(msg.getTags());
traceBean.setKeys(msg.getKeys());
traceBean.setStoreTime(msg.getStoreTimestamp());
traceBean.setBodyLength(msg.getStoreSize());
traceBean.setRetryTimes(msg.getReconsumeTimes());
traceContext.setRegionId(regionId);
beans.add(traceBean);
} // End of for

// append 到 dispatcher
if (beans.size() > 0) {
traceContext.setTraceBeans(beans);
traceContext.setTimeStamp(System.currentTimeMillis());
localDispatcher.append(traceContext);
}
}

@Override
public void consumeMessageAfter(ConsumeMessageContext context) {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
TraceContext subBeforeContext = (TraceContext) context.getMqTraceContext();

if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) {
// If subbefore bean is null ,skip it
return;
}

TraceContext subAfterContext = new TraceContext();
subAfterContext.setTraceType(TraceType.SubAfter);
subAfterContext.setRegionId(subBeforeContext.getRegionId());
subAfterContext.setGroupName(subBeforeContext.getGroupName());
subAfterContext.setRequestId(subBeforeContext.getRequestId());
subAfterContext.setSuccess(context.isSuccess());

// Caculate the cost time for processing messages
int costTime = (int) ((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size());
subAfterContext.setCostTime(costTime);

subAfterContext.setTraceBeans(subBeforeContext.getTraceBeans());
String contextType = context.getProps().get(MixAll.CONSUME_CONTEXT_TYPE);
if (contextType != null) {
subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal());
}

// append 到 dispatcher
localDispatcher.append(subAfterContext);
}
}

AsyncTraceDispatcher

写跟踪消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
public class AsyncTraceDispatcher implements TraceDispatcher {

private ArrayBlockingQueue<TraceContext> traceContextQueue;

private ArrayBlockingQueue<Runnable> appenderQueue;

public void start(String nameSrvAddr) throws MQClientException {
// 设置为已启动
if (isStarted.compareAndSet(false, true)) {
traceProducer.setNamesrvAddr(nameSrvAddr);
traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
traceProducer.start();
}

// 创建一个写消息的线程
this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
this.worker.setDaemon(true);
this.worker.start();

// 注册 shutdown hook, 进程退出时执行 flush 方法
this.registerShutDownHook();
}

// 将 TraceContext 写到 Queue
@Override
public boolean append(final Object ctx) {
boolean result = traceContextQueue.offer((TraceContext) ctx);
if (!result) {
log.info("buffer full" + discardCount.incrementAndGet() + " ,context is " + ctx);
}
return result;
}

// 等待 traceContextQueue 被清空
@Override
public void flush() throws IOException {
// The maximum waiting time for refresh,avoid being written all the time, resulting in failure to return.
long end = System.currentTimeMillis() + 500;
while (traceContextQueue.size() > 0 || appenderQueue.size() > 0 && System.currentTimeMillis() <= end) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
break;
}
}
log.info("------end trace send " + traceContextQueue.size() + " " + appenderQueue.size());
}

// 收集 TraceContext, 然后批量发送
class AsyncRunnable implements Runnable {
private boolean stopped;

@Override
public void run() {
while (!stopped) {
// 一批 context
List<TraceContext> contexts = new ArrayList<TraceContext>(batchSize);
for (int i = 0; i < batchSize; i++) {
TraceContext context = null;
try {
// 从 traceContextQueue 获取数据,这里会阻塞
context = traceContextQueue.poll(5, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
if (context != null) {
contexts.add(context);
} else {
break;
}
}

if (contexts.size() > 0) {
// 发送消息
AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
traceExecuter.submit(request);
} else if (AsyncTraceDispatcher.this.stopped) {
this.stopped = true;
}
}
}
}


class AsyncAppenderRequest implements Runnable {

// 要发送的 TraceContext
List<TraceContext> contextList;

public AsyncAppenderRequest(final List<TraceContext> contextList) {
if (contextList != null) {
this.contextList = contextList;
} else {
this.contextList = new ArrayList<TraceContext>(1);
}
}

@Override
public void run() {
sendTraceData(contextList);
}

public void sendTraceData(List<TraceContext> contextList) {
Map<String, List<TraceTransferBean>> transBeanMap = new HashMap<String, List<TraceTransferBean>>();

for (TraceContext context : contextList) {
if (context.getTraceBeans().isEmpty()) {
continue;
}
// Topic value corresponding to original message entity content
String topic = context.getTraceBeans().get(0).getTopic();

// 用消息的原始 topic 进行分组
String key = topic;
List<TraceTransferBean> transBeanList = transBeanMap.get(key);
if (transBeanList == null) {
transBeanList = new ArrayList<TraceTransferBean>();
transBeanMap.put(key, transBeanList);
}

// 将 context 转换为 TraceTransferBean
TraceTransferBean traceData = TraceDataEncoder.encoderFromContextBean(context);
transBeanList.add(traceData);
}

for (Map.Entry<String, List<TraceTransferBean>> entry : transBeanMap.entrySet()) {
flushData(entry.getValue());
}
}

/**
* Batch sending data actually
*/
private void flushData(List<TraceTransferBean> transBeanList) {
if (transBeanList.size() == 0) {
return;
}

// Temporary buffer
StringBuilder buffer = new StringBuilder(1024);
int count = 0;
Set<String> keySet = new HashSet<String>();

for (TraceTransferBean bean : transBeanList) {
// Keyset of message trace includes msgId of or original message
keySet.addAll(bean.getTransKey());
buffer.append(bean.getTransData());
count++;

// Ensure that the size of the package should not exceed the upper limit.
if (buffer.length() >= traceProducer.getMaxMessageSize()) {
sendTraceDataByMQ(keySet, buffer.toString());
// Clear temporary buffer after finishing
buffer.delete(0, buffer.length());
keySet.clear();
count = 0;
}
}

// 处理最后的 bean
if (count > 0) {
sendTraceDataByMQ(keySet, buffer.toString());
}
transBeanList.clear();
}

// 发跟踪信息到 mq
private void sendTraceDataByMQ(Set<String> keySet, final String data) {
String topic = traceTopicName;

// 新建一个 message
final Message message = new Message(topic, data.getBytes());

// Keyset of message trace includes msgId of or original message
message.setKeys(keySet);
try {
// traceProducer 是在 AsyncTraceDispatcher 中 new 出来的 DefaultMQProducer
// traceBrokerSet 里是有效的 broker name,这些 broker 有 topic
Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), topic);

// 异步调用的 callback
SendCallback callback = new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}

@Override
public void onException(Throwable e) {
log.info("send trace data ,the traceData is " + data);
}
};

if (traceBrokerSet.isEmpty()) {
// No cross set
traceProducer.send(message, callback, 5000);
} else {
// 从 traceBrokerSet 里选择一个队列
traceProducer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// arg 就是 traceBrokerSet
Set<String> brokerSet = (Set<String>) arg;

List<MessageQueue> filterMqs = new ArrayList<MessageQueue>();
for (MessageQueue queue : mqs) {
if (brokerSet.contains(queue.getBrokerName())) {
filterMqs.add(queue);
}
}

int index = sendWhichQueue.getAndIncrement();
int pos = Math.abs(index) % filterMqs.size();
if (pos < 0) {
pos = 0;
}
return filterMqs.get(pos);
}
}, traceBrokerSet, callback);
}
} catch (Exception e) {
log.info("send trace data,the traceData is" + data);
}
}
} // end of AsyncAppenderRequest
}