Redis listpack


8种机械键盘轴体对比
本人程序员,要买一个写代码的键盘,请问红轴和茶轴怎么选?

介绍

由来

为了节约内存, 如果有很多节点, Redis将这些节点紧凑地分配在一起, 在以前使用的是ziplist, 但由于其复杂度高, 现在转而使用新的结构listpack. 简而言之, 就是一个紧凑使用内存的列表, 支持双向访问的专用(adhoc)结构

整体结构

在内存中的布局是这样的:

其中, 前2个是header:

total-bytes 32位无符号整数, 表示listpack占用的总字节数(包括total-bytes自身和结尾字节), 有了该字段, 我们就可以从一个listpack的头部跳到尾部, 以便从后向前访问

num-entries 16位无符号整数, 表示listpack的节点数. 但是, 当它的值是65535时, 也就是16位无符号整数能表示的最大值, 就说明节点数未知, 需要扫描一遍才知道. 当扫描之后发现节点数小于65535时, 又会更新成实际的节点数

结尾end-byte是一个特殊的EOF字节, 0xFF

entry结构

如果单独看节点的话, 每个节点有以下部分:

encoding-type 数据的编码方式

entry-data 保存的数据(可能没有)

entry-len 保存了encoding-type和entry-data共占了多少字节

为了简洁, 下文统一将encoding-type与entry-data一共占用的字节称为: entry大小, entry-len里面保存的值就是entry大小(只不过格式是特殊设计的)

encoding-type

小的integer

对于一些比较小的数, 比如1, 2, 66是非常常见的, 为此, 将0到127范围内的数和encoding-type一起编码, 省去了entry-data部分, 也就是0|xxxxxxx形式, 例如:

1
2
"x03" --> 0|0000011 --> 3
"x12" --> 0|0010010 --> 18

短的string

同样的, 短的字符串也很常见, 所以为长度不超过63的字符串如下编码:

1
10|xxxxxx <entry-data>

其中, xxxxxx是6位无符号整数, 表示字符串的长度, 例子:

1
2
"x40" --> 10|000000 --> 空字符串
"x45hello" --> 10|001001 hello --> 字符串"hello"

更大的编码方式

当encoding-type的最高2位都是1时, 此时就是另一种编码方式:

1
2
110|xxxxx yyyyyyyy --> 13位有符号整数
1110|xxxx yyyyyyyy <entry-data> --> 长度不超过4095的字符串

当以上都不能满足时, 才用到下面这些编码方式:

1
2
3
4
5
6
1111|0000 <4 bytes len> <entry-data>  --> 非常大的字符串
1111|0001 <entry-data> --> 16位有符号整数
1111|0010 <entry-data> --> 24位有符号整数
1111|0011 <entry-data> --> 32位有符号整数
1111|0100 <entry-data> --> 64位有符号整数
1111|0101 - 1111|1110 --> 未使用

总结

integer说明
0|xxxxxxx0-127
110|xxxxx yyyyyyyy13位有符号整数
1111|0001 <entry-data>16位有符号整数
1111|0010 <entry-data>24位有符号整数
1111|0011 <entry-data>32位有符号整数
1111|0100 <entry-data>64位有符号整数

注: 没有特殊说明的话, 整数都是以小端方式保存的

string说明
10|xxxxxx <entry-data>长度不超过63的字符串
1110|xxxx yyyyyyyy <entry-data>长度不超过4095的字符串
1111|0000 <4 bytes len> <entry-data>大字符串

entry-len

entry-len有以下特点:

  • 是变长的, 以节省空间
  • 只能从右向左解析

有几个问题:

  • 为什么要有entry-len域?

考虑从右向左迭代时, 我们假定p指向当前节点, 那怎么把p指向上一个节点呢, 应该将p减多少距离? 当前节点是没有上个节点的信息, 但是如果我们将p–, p就指向上个节点的entry-len域的最后一个字节了, 就可以开始解析上一个节点的entry大小了, 于是就知道p要减多少, 就是上个节点, 没有entry-len的话就无法到上个节点了

  • 我们从右向左解析, 又是变长的, 什么时候是个头呢?

为此, 对于entry-len中每个字节, 把最高1位作为标志位, 为1表示左边还有要解析, 否则就是解析完了, 这样每个字节实际只使用了7位

还有一点要注意的是, entry-len是以大端模式保存的, 最高有效位字节在内存最低地址处, 例子:

1
"x20" --> 00100000 --> 32字节

如果长度是500字节呢? 500变成二进制是111110100, 再每7位划分, 实际的编码就变成了:

1
2
3
111110100 --> 00000011 11110100
---------->
地址增长

下面是entry大小与entry-len需要的字节数的关系:

entry大小entry-len需要字节数
0 - 1271字节
128 - 163832字节
16383 - 20971513字节
2097151 - 2684354554字节
268435455 - 343597383675字节
  • 只能从右向左解析, 那在从左往右迭代的时候, 无法读取entry-len的值, 怎么知道指针要加多少到下一个节点呢?

其实, 在从左往右时, 我们已经知道entry大小了, 只要再知道entry-len占了多少字节就行了, 而这直接查上表就行了, 无需解析entry-len中的值了

一个entry的例子

结合上面的内容, 我们看一个保存”hello”的节点的完整内容是怎样的:

首先, “hello”长度是5, 小于63, 编码方式选择10|xxxxxx <entry-data>, 所以前半部分是 x45 hello, entry大小是6个字节, entry-len需要1个字节, 合在一起就是:

1
x45hellox06

源码分析

创建

1
2
3
4
5
6
7
8
unsigned char *(void) {
unsigned char *lp = lp_malloc(LP_HDR_SIZE+1);
if (lp == NULL) return NULL;
lpSetTotalBytes(lp,LP_HDR_SIZE+1);
lpSetNumElements(lp,0);
lp[LP_HDR_SIZE] = LP_EOF;
return lp;
}

编码数据

对于ele指向的数据, 首先尝试以整数的方式编码, 如果成功, 就编码并保存到intenc中, 同时将entry大小保存到enclen中. 不能以整数编码的话, 就只保存entry大小

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
int (unsigned char *ele, uint32_t size, unsigned char *intenc, uint64_t *enclen) {
int64_t v;

if (lpStringToInt64((const char*)ele, size, &v)) {
if (v >= 0 && v <= 127) {
intenc[0] = v;
*enclen = 1;
} else if (v >= -4096 && v <= 4095) {
if (v < 0) v = ((int64_t)1<<13) + v;
intenc[0] = (v>>8) | 0xC0;
intenc[1] = v&0xff;
*enclen = 2;
} else if (v >= -32768 && v <= 32767) {
...
} else if (v >= -8388608 && v <= 8388607) { // 24位有符号整数
...
} else if (v >= -2147483648 && v <= 2147483647) { // 32位有符号整数
...
} else { // 64位有符号整数
...
}
return LP_ENCODING_INT;
} else { // 不能以整数编码
if (size < 64) *enclen = 1 + size;
else if (size < 4096) *enclen = 2 + size;
else *enclen = 5 + size;
return LP_ENCODING_STRING;
}
}

上面函数只针对整数编码, 字符串的编码用的是下面这个函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 编码字符串
void lpEncodeString(unsigned char *buf, unsigned char *s, uint32_t len) {
if (len < 64) {
buf[0] = len | 0x80;
memcpy(buf+1,s,len);
} else if (len < 4096) {
buf[0] = (len >> 8) | 0xE0;
buf[1] = len & 0xff;
memcpy(buf+2,s,len);
} else {
buf[0] = 0xF0;
buf[1] = len & 0xff;
buf[2] = (len >> 8) & 0xff;
buf[3] = (len >> 16) & 0xff;
buf[4] = (len >> 24) & 0xff;
memcpy(buf+5,s,len);
}
}

编/解码entry-len

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

// 编码
unsigned long lpEncodeBacklen(unsigned char *buf, uint64_t l) {
if (l <= 127) {
if (buf) buf[0] = l;
return 1;
} else if (l < 16383) {
if (buf) {
buf[0] = l>>7; // 每7位分割
buf[1] = (l&127)|128; // 低有效位在高地址
}
return 2;
} else if (l < 2097151) {
if (buf) {
buf[0] = l>>14;
buf[1] = ((l>>7)&127)|128;
buf[2] = (l&127)|128;
}
return 3;
} else if (l < 268435455) {
...
} else {
...
}
}

// 解码
uint64_t lpDecodeBacklen(unsigned char *p) {
uint64_t val = 0;
uint64_t shift = 0;
do {
val |= (uint64_t)(p[0] & 127) << shift; // 只取低7位
if ((p[0] & 128) == 0) break; // 最高位为0表示结束
shift += 7;
p--;
if (shift > 28) return UINT64_MAX; // entry-len最多只有5字节
} while(1);
return val;
}

迭代

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
unsigned char *lpFirst(unsigned char *lp) {
lp += LP_HDR_SIZE; // 跳过header
if (lp[0] == LP_EOF)
return NULL;
return lp;
}

unsigned char *lpNext(unsigned char *lp, unsigned char *p) {
((void) lp); /* lp is not used for now. However lpPrev() uses it. */
p = lpSkip(p); // 跳过entry大小 + entry-len占用字节数
if (p[0] == LP_EOF)
return NULL;
return p;
}


unsigned char *lpLast(unsigned char *lp) {
unsigned char *p = lp + lpGetTotalBytes(lp) - 1; // EOF字节
return lpPrev(lp,p);
}

unsigned char *lpPrev(unsigned char *lp, unsigned char *p) {
if (p - lp == LP_HDR_SIZE) // 是第一个节点
return NULL;
p--; /* 上一个节点的最后一个字节(也即entry-len的第一个字节) */
uint64_t prevlen = lpDecodeBacklen(p); // 解析的上个entry大小
prevlen += lpEncodeBacklen(NULL,prevlen); // entry-len占用字节数
return p - prevlen + 1;
}

插入

1
2
3
4
5
6
// 编码ele指向的数据, 并追加到listpack的结尾
unsigned char *lpAppend(unsigned char *lp, unsigned char *ele, uint32_t size) {
uint64_t listpack_bytes = lpGetTotalBytes(lp);
unsigned char *eofptr = lp + listpack_bytes - 1;
return lpInsert(lp, ele, size, eofptr, LP_BEFORE, NULL); // c插入到EOF之前
}

删除

1
2
3
4
// 删除p指向的节点, newp保存的是p的下一个节点的地址, 或者是NULL
unsigned char *lpDelete(unsigned char *lp, unsigned char *p, unsigned char **newp) {
return lpInsert(lp, NULL, 0, p, LP_REPLACE, newp);
}

插入和删除操作都是对lpInsert()的包装:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, 
unsigned char *p, int where, unsigned char **newp) {
unsigned char intenc[9]; // 整数编码entry大小最多只有9字节
unsigned char backlen[5]; // entry-len最多只有5字节

uint64_t enclen; // 编码ele的内容后, entry大小

// ele为NULL表示删除, 也就是将原节点内容替换成空, 将where改成LP_REPLACE
if (ele == NULL) where = LP_REPLACE;

// 插入统一成插入当前节点之前
if (where == LP_AFTER) {
p = lpSkip(p);
where = LP_BEFORE;
}

/* 保存当前p指向节点的偏移量, realloc后地址可能变了,
* 后续获得该节点的地址就需要用新起始地址+偏移量了 */
unsigned long poff = p - lp;

int enctype; // ele编码方式
if (ele) {
enctype = lpEncodeGetType(ele,size,intenc,&enclen);
} else { // 删除
enctype = -1;
enclen = 0;
}

// 新节点的entry-len占几个字节
unsigned long backlen_size = ele ? lpEncodeBacklen(backlen,enclen) : 0;

uint64_t old_listpack_bytes = lpGetTotalBytes(lp);

uint32_t replaced_len = 0; // 被替换的节点原本占用的字节数(LP_REPLACE时才用到)
if (where == LP_REPLACE) {
replaced_len = lpCurrentEncodedSize(p);
replaced_len += lpEncodeBacklen(NULL,replaced_len);
}

// 原本listpack总空间 + 新节点字节数 - 老节点字节数
uint64_t new_listpack_bytes = old_listpack_bytes + enclen + backlen_size
- replaced_len;
if (new_listpack_bytes > UINT32_MAX) return NULL;

unsigned char *dst = lp + poff;

/* 如果空间变大了, 先realloc申请内存, 再移动内存
* 如果空间变小了, 先移动内存, 再realloc回收内存 */
if (new_listpack_bytes > old_listpack_bytes) {
if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
dst = lp + poff;
}

if (where == LP_BEFORE) {
// 在dst之前插入. 从dst开始整体全部后移, 移动距离就是新节点的总长度(enclen+backlen_size)
memmove(dst+enclen+backlen_size,dst,old_listpack_bytes-poff);
} else { // LP_REPLACE
// 替换dst指向的节点. 先计算前后空间变化, 把dst的下一个节点开始整体向左或向右移
long lendiff = (enclen+backlen_size)-replaced_len;
memmove(dst+replaced_len+lendiff,
dst+replaced_len,
old_listpack_bytes-poff-replaced_len);
}

// 空间收缩, 要回收尾部的内存
if (new_listpack_bytes < old_listpack_bytes) {
if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
dst = lp + poff;
}

// 保存下个节点的地址到newp
if (newp) {
*newp = dst;
if (ele == NULL && dst[0] == LP_EOF) // 删除的是最后一个节点
*newp = NULL;
}

// 保存新节点的数据
if (ele) {
if (enctype == LP_ENCODING_INT) {
memcpy(dst,intenc,enclen); // 整数已经编码好了, 直接复制
} else {
lpEncodeString(dst,ele,size);
}
dst += enclen;
memcpy(dst,backlen,backlen_size); // entry-len字段
dst += backlen_size;
}

// 更新header
if (where != LP_REPLACE || ele == NULL) {
uint32_t num_elements = lpGetNumElements(lp);
if (num_elements != LP_HDR_NUMELE_UNKNOWN) {
if (ele)
lpSetNumElements(lp,num_elements+1);
else
lpSetNumElements(lp,num_elements-1);
}
}
lpSetTotalBytes(lp,new_listpack_bytes);

return lp;
}

下面2个图是对上面函数中memmove部分的图解:

插入节点

取代节点

查找

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 返回第index个节点
unsigned char *lpSeek(unsigned char *lp, long index) {
int forward = 1; // 默认是从左往右

// 根据entry个数和index再决定是从左到右还是从右向左
uint32_t numele = lpGetNumElements(lp);
if (numele != 65535) {
if (index < 0) index = (long)numele + index;
if (index < 0) return NULL; // 加了numele还是<0, 左越界
if (index >= numele) return NULL; // 右越界

// 如果index的位置在listpack的右半部分, 从右边开始, 可以少访问一些
if (index > numele/2) {
forward = 0;
index -= numele; // 从右向左时, index是<0的
}
} else {
// entry个数未知, 并且index<0的话, 只能从右开始
// 如果index>0的话, 从左边开始就好了, 不需要知道entry个数
if (index < 0) forward = 0;
}

// 下面就是利用lpNext() lpPrev()了
if (forward) {
unsigned char *ele = lpFirst(lp);
while (index > 0 && ele) {
ele = lpNext(lp,ele);
index--;
}
return ele;
} else {
unsigned char *ele = lpLast(lp);
while (index < -1 && ele) {
ele = lpPrev(lp,ele);
index++;
}
return ele;
}
}

取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/* 保存的是字符串: 返回字符串的起始地址, 其长度保存在count中
*
* 保存的是整数:
* intbuf == NULL : 返回NULL, 数保存在count中
* intbuf != NULL : 数以字符串形式保存在intbuf中, count是字符串的长度
*/
unsigned char *lpGet(unsigned char *p, int64_t *count, unsigned char *intbuf) {
int64_t val;
uint64_t uval, negstart, negmax;

if (LP_ENCODING_IS_7BIT_UINT(p[0])) {
negstart = UINT64_MAX; // 0-127总是正数
negmax = 0;
uval = p[0] & 0x7f;
} else if (LP_ENCODING_IS_6BIT_STR(p[0])) {
*count = LP_ENCODING_6BIT_STR_LEN(p);
return p+1;
} else if (LP_ENCODING_IS_13BIT_INT(p[0])) {
uval = ((p[0]&0x1f)<<8) | p[1];
negstart = (uint64_t)1<<12;
negmax = 8191;
} else if (LP_ENCODING_IS_16BIT_INT(p[0])) {
uval = (uint64_t)p[1] |
(uint64_t)p[2]<<8;
negstart = (uint64_t)1<<15;
negmax = UINT16_MAX;
} else if (LP_ENCODING_IS_24BIT_INT(p[0])) {
...
} else if (LP_ENCODING_IS_32BIT_INT(p[0])) {
...
} else if (LP_ENCODING_IS_64BIT_INT(p[0])) {
...
} else if (LP_ENCODING_IS_12BIT_STR(p[0])) {
*count = LP_ENCODING_12BIT_STR_LEN(p);
return p+2;
} else if (LP_ENCODING_IS_32BIT_STR(p[0])) {
*count = LP_ENCODING_32BIT_STR_LEN(p);
return p+5;
} else { // 有错误
uval = 12345678900000000ULL + p[0];
negstart = UINT64_MAX;
negmax = 0;
}

// 执行到这表示是整数, 要转换成有符号整数, 用的是补码的知识
if (uval >= negstart) { // 最高一位是1
// 防止unsigned -> signed转换时的UB(undefined behaviour) */
uval = negmax - uval;
val = uval;
val = -val - 1;
} else {
val = uval;
}

if (intbuf) {
*count = snprintf((char*)intbuf,LP_INTBUF_SIZE,"%lld",(long long)val);
return intbuf;
} else {
*count = val;
return NULL;
}
}

补充

有一个疑问就是代码里面通篇都是用的unsigned char *, 而不是char *. 推测原因可能是, 保存的数据有可能是二进制的, 而且传入的数据也都是有size指示长度的. 而且如果用的是char *的话, 看下面这个(具体参见ref) :

char c = 0xFF;
if (c == 0xFF) { // 永远为false, 如果是unsigned char就为true

}

ref

Listpack specification

字节序

补码

Signed to unsigned conversion in C - is it always safe?

Using the == operator to compare a char to 0x80 always results in false?