【阅读笔记】rocketmq 远程通信 (二)

分析 rocketmq 如何实现基本的远程通信

rocketmq-remoting 工程实现 rocketmq 的远程通信,组件间大部分的通信都是通过这个模块实现。

rocketmq-remoting 使用 netty 实现的,基于 TCP 的应用层协议。协议格式在 RemotingCommand 定义,一个数据包由数据包长度,header 长度,header,body 四部分组成,如下:

+--------+---------------+------+------+
|        |               |      |      |
| length | header length | head | body |
|        |               |      |      |
+--------+---------------+------+------+

header 包含几个部分

作用
code 表示命令的类型,在 RequestCode 定义
language 表示编写的语言,如 java、C、go
version rocketmq 的版本
opaque 在一个 tcp channel 中唯一,用于标识 request 和 response 的对应关系
flag 保存多个信息;RPC类型,如 1 表示 RPC_ONEWAY;命令类型, 包含两类数据包 REQUEST_COMMAND (请求命令)和 RESPONSE_COMMAND(应答命令)
remark 描述信息
extFields 用户定义部分,长度可变

body 就是要发送的命令数据

实现

概述

rocketmq-remoting 类图 部署图

remoting 主要由 NettyRemotingServer 和 NettyRemotingClient 分别实现 server, clinet。

相同点

  1. server 可以向 clinet 发送 request 命令, client 也可以向 server 发送命令
  2. 都有 3 种调用方法 sync, async 和 one way,但 server 只能用已经有的 channel 发送, client 可以创建新的 channel 然后发送
  3. 都可以通过 registerProcessor 注册一个命令处理器,用于命令特定命令
  4. 可以更新 namesrv 列表

不同点

  1. server 有侦听端口(这个当然了)
  2. server 可以设定默认的 processor, clinet 不能

NettyRemotingAbstract

server 和 client 的公共部分由 NettyRemotingAbstract 实现,包括:

  • request processor 的注册以及命令的处理, response 命令的处理
  • sync, async, one way 三种调用方法的实现
  • NettyEvent 的处理。 NettyEvent 是 remoting 定义的,在 netty 连接有变化时触发的事件
    NettyEventExecutor 可以执行注册了的 ChannelEventListener
  • async 和 one way 调用的流量控制

request & response 的处理

当子类收到 RemotingCommand 数据时,调用 processMessageReceived 方法,processMessageReceived 根据命令的类型是(type)决定如果是 REQUEST_COMMAND,则调用 processRequestCommand,如果是 RESPONSE_COMMAND 则调用 processResponseCommand

request

1
HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>> processorTable

成员 processorTable 保存着注册的 request code 和 processor 的对应关系,ExecutorService 是执行的线程池
当 processRequestCommand 就是用 processorTable 中找到对应的 processor,然后构造一个 Runnable 提交到指定的线程池

这个 Runnable 调用 processRequest 方法得到 response 类型的 RemotingCommand,如果是 sync 或 async 调用方法,就
调用 writeAndFlush 方法将 response 写回调用方

response

1
ConcurrentMap<Integer , ResponseFuture> responseTable

成员 responseTable 保存返回的 response 类型 RemotingCommand

当向远端发送一个 request 时,NettyRemotingAbstract 生成一个 opaque 和一个 ResponseFuture 保存到 responseTable
当远端发送的 response 到达时,由 processResponseCommand 方法处理,processResponseCommand 方法根据返回的 opaque
找到对应的 ResponseFuture,设置返回结果,然后调用回调(async)或唤醒等待 ResponseFuture 的线程(sync)

sync, async, one way 三种调用方法的实现

  • sync 同步调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public RemotingCommand (final Channel channel,
final RemotingCommand request,
final long timeoutMillis) {

...

// 生成 future 并放到 responseTable
ResponseFuture responseFuture = new ResponseFuture
responseTable.put(opaque, responseFuture)

// 向远端发送命令
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
...
});

// 等待写 responseFuture,
// 等收到对应的 Response 时,唤醒这个线程
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);

...
}
  • async 异步调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

public void invokeAsyncImpl(final Channel channel,
final RemotingCommand request,
final long timeoutMillis,
// 多了个 callback
final InvokeCallback invokeCallback)

...

// 调用次数控制
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)
if (acquired) {

// 生成 future 并放到 responseTable
// 这个 future 设置了 callback
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
this.responseTable.put(opaque, responseFuture);

// 向远端发送命令
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
...
});

// 直接返回
// 等收到对应的 Response 时,会调用对应的 callback
}

...
}
  • one way
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void invokeOnewayImpl(final Channel channel,
final RemotingCommand request,
final long timeoutMillis) {

...

boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);

// 调用次数控制
if (acquired) {

// 向远端发送命令
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
...
});

// 直接返回,连 response future 都不需要
}
...
}

ResponseFuture

保存异步调用的结果。异步调用时远程命令时,先 new 一个 ResponseFuture 对象保存到 responseTable
待远程调用返回时,将结果写到 ResponseFuture

ResponseFuture 通过成员 opaque 和 processChannel 标识出唯一的请求,
countDownLatch 成员用于实现同步调用,waitResponse 时 await,有结果时 countDown
responseCommand 和 cause 保存调用的结果

NettyLogger

桥接 netty 的 log 到 rocketmq。

netty 没有默认支持一种日志框架,而是自动检测用户使用什么日志框架,它就用什么日志
https://segmentfault.com/a/1190000005797595

netty 使用 InternalLoggerFactory 实现日志框架检测并生成 logger,而 NettyLogger 就是将 InternalLoggerFactory 的 logger 工厂设置成 NettyBridgeLoggerFactory

这个工厂生成的 Logger 就是 InternalLogger,即 rocketmq-logging 模块的 logger

NettyRemotingServer

用 netty 实现一个 server

server 创建时会检测能不能用 epool,如果可以就用 EpollEventLoopGroup, 否则用就 NioEventLoopGroup

start 方法启动 netty

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public void start()  {
...

ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
// 设定 tcp 连接队列
.option(ChannelOption.SO_BACKLOG, 1024)
// server 的惯用选项,就侦听的端口可重复侦听
.option(ChannelOption.SO_REUSEADDR, true)
// 关闭长连接
.option(ChannelOption.SO_KEEPALIVE, false)
// 关闭 Nagle 算法,一有数据立马发送
.childOption(ChannelOption.TCP_NODELAY, true)
// TCP 读写窗口大小
.childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
.childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())
// 设置侦听哪个网卡和端口
.localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
.childHandler(new ChannelInitializer<SocketChannel>() {

public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
.addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, new HandshakeHandler(TlsSystemConfig.tlsMode))
.addLast(defaultEventExecutorGroup,
// 继承 MessageToByteEncoder<RemotingCommand>,将 RemotingCommand 写到 ByteBuffer
new NettyEncoder(),
// 继承 LengthFieldBasedFrameDecoder,从字节流解析出 RemotingCommand
new NettyDecoder(),
// 空闲连接处理器
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
// netty 连接管理,NettyEvent 就是通过这个发出
new NettyConnectManageHandler(),
// SimpleChannelInboundHandler<RemotingCommand>
// 调用基类 processMessageReceived 方法
new NettyServerHandler()
);
}
});

...
}

server 有一个 Timer 定时调用 scanResponseTable 清理过期的 response future

NettyRemotingClient

用 netty 实现一个 client

start 方法启动 netty

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void start() {

Bootstrap handler =
this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, false)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
.option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
.option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
// 设置的 handler 基本与 server 一样
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (nettyClientConfig.isUseTLS()) {
if (null != sslContext) {
pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
log.info("Prepend SSL handler");
} else {
log.warn("Connections are insecure as SSLContext is null!");
}
}

pipeline.addLast(
defaultEventExecutorGroup,
new NettyEncoder(),
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
new NettyConnectManageHandler(),
new NettyClientHandler());
}
});

...
}

与 server 一样, client 也有一个 Timer 定时调用 scanResponseTable 清理过期的 response future

序列化

远程通信需要只能传送字节流,所以在发送 request 或 response 前,需要先将对象转换成字节流,而接受到远端的数据时,需要将字节流转换成数据,这就是序列化和反序列化的职责

所有的需要传输的实体类都在 rocketmq-common 工程的 org.apache.rocketmq.common.protocol 的子包中定义,他们之中需要序列化的都会继承 rocketmq-remoting 工程下的 RemotingSerializable

示例 Register Broker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

// 生成 header
RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
... // set fields

// 生成 body
RegisterBrokerBody requestBody = new RegisterBrokerBody();
... // set fields

final byte[] body = requestBody.encode(compressed); // 将对象转换成 json 形式 byte []

// 生成 RemotingCommand
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
request.setBody(body);

// 发送
remotingClient.invokeSync(addr, request, timeoutMillis);