NSQ源码学习笔记(三)

上一篇的最后一段代码中,channel中的消息在发送至客户端时,也同步了一份消息发送到了inFight队列中

1
subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)

这里其实一开始不是很明白,在上网查阅了资料后,了解到inFlight队列是NSQ用来实现消息至少投递一次的。知道了功能后,再来看就很明了了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (c *Channel) (msg *Message, clientID int64, timeout time.Duration) error {
now := time.Now()
msg.clientID = clientID
msg.deliveryTS = now
msg.pri = now.Add(timeout).UnixNano()
err := c.pushInFlightMessage(msg)
if err != nil {
return err

}
c.addToInFlightPQ(msg)
return nil

}

上述代码中,首先初始化消息的过期时间timeout+now,通过将msg加入到InFlight队列中,InFlight其实是一个堆排序队列,优先级是按照超时时间来排序的,越靠近过期时间,将会越靠前。这里只是将消息存入队列,那么在哪里消费呢?我们在第一篇笔记中的末尾,Nsqd在完成监听部分的初始化后,有四个自启动的goroutine,第一个通过Wrap启动的n.queueScanLoop()就是用来执行消费的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
func (n *NSQD) queueScanLoop() {

workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)

//任务结果 队列
responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)

// 用来优雅关闭
closeCh := make(chan int)
// 利用Ticket来定期开始任务和调整worker
workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)

channels := n.channels()
// 调整worker
n.resizePool(len(channels), workCh, responseCh, closeCh)

for {
select {
case <-workTicker.C: // 开始一次任务的派发
if len(channels) == 0 {
continue

}
case <-refreshTicker.C: // 重新调整 worker 数量
channels = n.channels()
n.resizePool(len(channels), workCh, responseCh, closeCh)
continue
case <-n.exitChan:exitChan// 退出
goto exit

}

// num最大为nsqd的所有channel总数
num := n.getOpts().QueueScanSelectionCount
if num > len(channels) {
num = len(channels)

}

loop:
// 随机取出num个channel, 派发给 worker 进行 扫描
for _, i := range util.UniqRands(num, len(channels)) {
workCh <- channels[i]

}

// 接收 扫描结果, 统一 有多少 channel 是 "脏" 的
numDirty := 0
for i := 0; i < num; i++ {
if <-responseCh {
numDirty++

}

}

// 假如 "脏" 的 "比例" 大于阀值, 则不等待 workTicker
// 马上进行下一轮 扫描
if float64(numDirty) / float64(num) > n.getOpts().QueueScanDirtyPercent {
goto loop

}

}

exit:
n.logf("QUEUESCAN: closing")
close(closeCh)
workTicker.Stop()
refreshTicker.Stop()

}

// resizePool adjusts the size of the pool of queueScanWorker goroutines
//
// 1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax)
//
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
// 校验启动的worker数量,最大为nsqd的所有channel数 * 1/4,
idealPoolSize := int(float64(num) * 0.25)
if idealPoolSize < 1 {
idealPoolSize = 1

} else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
idealPoolSize = n.getOpts().QueueScanWorkerPoolMax

}
for {
// 当前启动的worker数等于设定的idealPoolSize,那么直接返回,
// 如果大于了idealPoolSize,通过closeCh关闭一个worker
// 如果未达到idealPoolSize,启动worker的goroutine
if idealPoolSize == n.poolSize {
break

} else if idealPoolSize < n.poolSize {
// contract
closeCh <- 1
n.poolSize--

} else {
// expand
n.waitGroup.Wrap(func() {
n.queueScanWorker(workCh, responseCh, closeCh)

})
n.poolSize++

}

}

}

worker的具体实现是queueScanWorker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
// queueScanWorker receives work (in the form of a channel) from queueScanLoop
// and processes the deferred and in-flight queues
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
for {
select {
case c := <-workCh:
now := time.Now().UnixNano()
dirty := false
// 实现消息至少被投递一次
if c.processInFlightQueue(now) {
dirty = true

}
// 实现延迟消息队列
if c.processDeferredQueue(now) {
dirty = true

}
// 如果有过期消息的存在,则dirty
responseCh <- dirty
case <-closeCh:
return

}

}

}

func (c *Channel) processInFlightQueue(t int64) bool {
c.exitMutex.RLock()
defer c.exitMutex.RUnlock()

if c.Exiting() {
return false

}

dirty := false
for {
c.inFlightMutex.Lock()
// 从队列中获取已经过期的消息
msg, _ := c.inFlightPQ.PeekAndShift(t)
c.inFlightMutex.Unlock()

if msg == nil {
goto exit

}
dirty = true
// 如果获取到了符合条件的msg,按msg.ID将msg在infight队列中删除
_, err := c.popInFlightMessage(msg.clientID, msg.ID)
if err != nil {
goto exit

}
atomic.AddUint64(&c.timeoutCount, 1)
c.RLock()
client, ok := c.clients[msg.clientID]
c.RUnlock()
if ok {
client.TimedOutMessage()

}
// 消息在channel中发起重新投递
c.doRequeue(msg)

}

exit:
return dirty

}

// 延迟消息队列的实现
func (c *Channel) processDeferredQueue(t int64) bool {
c.exitMutex.RLock()
defer c.exitMutex.RUnlock()

if c.Exiting() {
return false

}

dirty := false
for {
c.deferredMutex.Lock()
item, _ := c.deferredPQ.PeekAndShift(t)
c.deferredMutex.Unlock()

if item == nil {
goto exit

}
dirty = true

msg := item.Value.(*Message)
_, err := c.popDeferredMessage(msg.ID)
if err != nil {
goto exit

}
c.doRequeue(msg)

}

exit:
return dirty

}

  上面的两个函数processDeferredQueueprocessInFlightQueue的实现基本一致,那为什么相同的逻辑要实现两次呢。两个队列,DeferredQueue 用 head 包实现, InFlightQueue 自己又实现了一次heap, 其实跟 DeferredQueue 不是一样的么?

  之前两个就真是是一样的, 后来有一个提交,里面的注释是: this eliminates the use of container/heap and the associated cost of boxing and interface type assertions.

https://github.com/nsqio/nsq/commit/74bfde101934700cb0cd980d01b6dfe2fe5a6a53”

  意思就是说, 这些 队列里 存的是 Message 这个类型, 如果使用 heap, 需要存到 heap.Item 的 Value 里,而这个value 是一个 interface{} , 赋值 和 取值 都需要做类型推断 和 包装,那么作为 InFlightQueue 这个 “高负荷” 的队列, 减少这种 “类型推断和包装” , 有利于提高性能

测试一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
type Item struct {
d1 int
d2 int

}

func BenchmarkT1(b *testing.B) {
q := make([]*Item, 0)Item// 不需要类型推断的 slice
for i := 0; i < b.N; i++ {
q = append(q, &Item{i, i})

}
for _, hero := range q {
hero.d1++

}

}

func BenchmarkT2(b *testing.B) {
q := make([]interface{}, 0)
for i := 0; i < b.N; i++ {
q = append(q, &Item{i, i})

}
for _, hero := range q {
hero.(*Item).d1++d1// 需要做类型推断

}

}

测试结果:

1
2
BenchmarkT1-8           10000000               241 ns/op
BenchmarkT2-8 5000000 332 ns/op