介绍
由来
为了节约内存, 如果有很多节点, Redis 将这些节点紧凑地分配在一起, 在以前使用的是ziplist
, 但由于其复杂度高, 现在转而使用新的结构listpack
. 简而言之, 就是一个紧凑使用内存的列表, 支持双向访问的专用 (adhoc) 结构
整体结构
在内存中的布局是这样的:

其中, 前 2 个是 header:
total-bytes
32 位无符号整数, 表示 listpack 占用的总字节数 (包括 total-bytes 自身和结尾字节), 有了该字段, 我们就可以从一个 listpack 的头部跳到尾部, 以便从后向前访问
num-entries
16 位无符号整数, 表示 listpack 的节点数. 但是, 当它的值是 65535 时, 也就是 16 位无符号整数能表示的最大值, 就说明节点数未知, 需要扫描一遍才知道. 当扫描之后发现节点数小于 65535 时, 又会更新成实际的节点数
结尾end-byte
是一个特殊的 EOF 字节, 0xFF
entry 结构
如果单独看节点的话, 每个节点有以下部分:

encoding-type
数据的编码方式
entry-data
保存的数据 (可能没有)
entry-len
保存了 encoding-type 和 entry-data 共占了多少字节
为了简洁, 下文统一将 encoding-type 与 entry-data 一共占用的字节称为: entry 大小, entry-len 里面保存的值就是 entry 大小 (只不过格式是特殊设计的)
encoding-type
小的 integer
对于一些比较小的数, 比如 1, 2, 66 是非常常见的, 为此, 将 0 到 127 范围内的数和 encoding-type 一起编码, 省去了 entry-data 部分, 也就是0|xxxxxxx
形式, 例如:
1 2
|
"x03" --> 0|0000011 --> 3 "x12" --> 0|0010010 --> 18
|
短的 string
同样的, 短的字符串也很常见, 所以为长度不超过 63 的字符串如下编码:
其中, xxxxxx
是 6 位无符号整数, 表示字符串的长度, 例子:
1 2
|
"x40" --> 10|000000 --> 空字符串 "x45hello" --> 10|001001 hello --> 字符串"hello"
|
更大的编码方式
当 encoding-type 的最高 2 位都是 1 时, 此时就是另一种编码方式:
1 2
|
110|xxxxx yyyyyyyy --> 13位有符号整数 1110|xxxx yyyyyyyy <entry-data> --> 长度不超过4095的字符串
|
当以上都不能满足时, 才用到下面这些编码方式:
1 2 3 4 5 6
|
1111|0000 <4 bytes len> <entry-data> --> 非常大的字符串 1111|0001 <entry-data> --> 16位有符号整数 1111|0010 <entry-data> --> 24位有符号整数 1111|0011 <entry-data> --> 32位有符号整数 1111|0100 <entry-data> --> 64位有符号整数 1111|0101 - 1111|1110 --> 未使用
|
总结
integer |
说明 |
0|xxxxxxx |
0-127 |
110|xxxxx yyyyyyyy |
13 位有符号整数 |
1111|0001 <entry-data> |
16 位有符号整数 |
1111|0010 <entry-data> |
24 位有符号整数 |
1111|0011 <entry-data> |
32 位有符号整数 |
1111|0100 <entry-data> |
64 位有符号整数 |
注: 没有特殊说明的话, 整数都是以小端方式保存的
string |
说明 |
10|xxxxxx <entry-data> |
长度不超过 63 的字符串 |
1110|xxxx yyyyyyyy <entry-data> |
长度不超过 4095 的字符串 |
1111|0000 <4 bytes len> <entry-data> |
大字符串 |
entry-len
entry-len 有以下特点:
有几个问题:
考虑从右向左迭代时, 我们假定 p 指向当前节点, 那怎么把 p 指向上一个节点呢, 应该将 p 减多少距离? 当前节点是没有上个节点的信息, 但是如果我们将 p–, p 就指向上个节点的 entry-len 域的最后一个字节了, 就可以开始解析上一个节点的 entry 大小了, 于是就知道 p 要减多少, 就是上个节点, 没有 entry-len 的话就无法到上个节点了
- 我们从右向左解析, 又是变长的, 什么时候是个头呢?
为此, 对于 entry-len 中每个字节, 把最高 1 位作为标志位, 为 1 表示左边还有要解析, 否则就是解析完了, 这样每个字节实际只使用了 7 位
还有一点要注意的是, entry-len 是以大端模式保存的, 最高有效位字节在内存最低地址处, 例子:
1
|
"x20" --> 00100000 --> 32字节
|
如果长度是 500 字节呢? 500 变成二进制是111110100
, 再每 7 位划分, 实际的编码就变成了:
1 2 3
|
111110100 --> 00000011 11110100 ----------> 地址增长
|
下面是 entry 大小与 entry-len 需要的字节数的关系:
entry 大小 |
entry-len 需要字节数 |
0 - 127 |
1 字节 |
128 - 16383 |
2 字节 |
16383 - 2097151 |
3 字节 |
2097151 - 268435455 |
4 字节 |
268435455 - 34359738367 |
5 字节 |
- 只能从右向左解析, 那在从左往右迭代的时候, 无法读取 entry-len 的值, 怎么知道指针要加多少到下一个节点呢?
其实, 在从左往右时, 我们已经知道 entry 大小了, 只要再知道 entry-len 占了多少字节就行了, 而这直接查上表就行了, 无需解析 entry-len 中的值了
一个 entry 的例子
结合上面的内容, 我们看一个保存” hello” 的节点的完整内容是怎样的:
首先, “hello” 长度是 5, 小于 63, 编码方式选择10|xxxxxx <entry-data>
, 所以前半部分是 x45 hello
, entry 大小是 6 个字节, entry-len 需要 1 个字节, 合在一起就是:
源码分析
创建
1 2 3 4 5 6 7 8
|
unsigned char *(void) { unsigned char *lp = lp_malloc(LP_HDR_SIZE+1); if (lp == NULL) return NULL; lpSetTotalBytes(lp,LP_HDR_SIZE+1); lpSetNumElements(lp,0); lp[LP_HDR_SIZE] = LP_EOF; return lp; }
|
编码数据
对于 ele 指向的数据, 首先尝试以整数的方式编码, 如果成功, 就编码并保存到 intenc 中, 同时将 entry 大小保存到 enclen 中. 不能以整数编码的话, 就只保存 entry 大小
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
int (unsigned char *ele, uint32_t size, unsigned char *intenc, uint64_t *enclen) { int64_t v; if (lpStringToInt64((const char*)ele, size, &v)) { if (v >= 0 && v <= 127) { intenc[0] = v; *enclen = 1; } else if (v >= -4096 && v <= 4095) { if (v < 0) v = ((int64_t)1<<13) + v; intenc[0] = (v>>8) | 0xC0; intenc[1] = v&0xff; *enclen = 2; } else if (v >= -32768 && v <= 32767) { ... } else if (v >= -8388608 && v <= 8388607) { ... } else if (v >= -2147483648 && v <= 2147483647) { ... } else { ... } return LP_ENCODING_INT; } else { if (size < 64) *enclen = 1 + size; else if (size < 4096) *enclen = 2 + size; else *enclen = 5 + size; return LP_ENCODING_STRING; } }
|
上面函数只针对整数编码, 字符串的编码用的是下面这个函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
void lpEncodeString(unsigned char *buf, unsigned char *s, uint32_t len) { if (len < 64) { buf[0] = len | 0x80; memcpy(buf+1,s,len); } else if (len < 4096) { buf[0] = (len >> 8) | 0xE0; buf[1] = len & 0xff; memcpy(buf+2,s,len); } else { buf[0] = 0xF0; buf[1] = len & 0xff; buf[2] = (len >> 8) & 0xff; buf[3] = (len >> 16) & 0xff; buf[4] = (len >> 24) & 0xff; memcpy(buf+5,s,len); } }
|
编/解码 entry-len
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
|
unsigned long lpEncodeBacklen(unsigned char *buf, uint64_t l) { if (l <= 127) { if (buf) buf[0] = l; return 1; } else if (l < 16383) { if (buf) { buf[0] = l>>7; buf[1] = (l&127)|128; } return 2; } else if (l < 2097151) { if (buf) { buf[0] = l>>14; buf[1] = ((l>>7)&127)|128; buf[2] = (l&127)|128; } return 3; } else if (l < 268435455) { ... } else { ... } }
uint64_t lpDecodeBacklen(unsigned char *p) { uint64_t val = 0; uint64_t shift = 0; do { val |= (uint64_t)(p[0] & 127) << shift; if ((p[0] & 128) == 0) break; shift += 7; p--; if (shift > 28) return UINT64_MAX; } while(1); return val; }
|
迭代
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
unsigned char lpFirst(unsigned char *lp) { lp += LP_HDR_SIZE; if (lp[0] == LP_EOF) return NULL; return lp; }
unsigned char *lpNext(unsigned char *lp, unsigned char *p) { ((void) lp); p = lpSkip(p); if (p[0] == LP_EOF) return NULL; return p; }
unsigned char *lpLast(unsigned char *lp) { unsigned char *p = lp + lpGetTotalBytes(lp) - 1; return lpPrev(lp,p); }
unsigned char *lpPrev(unsigned char *lp, unsigned char *p) { if (p - lp == LP_HDR_SIZE) return NULL; p--; uint64_t prevlen = lpDecodeBacklen(p); prevlen += lpEncodeBacklen(NULL,prevlen); return p - prevlen + 1; }
|
插入
1 2 3 4 5 6
|
unsigned char *lpAppend(unsigned char *lp, unsigned char *ele, uint32_t size) { uint64_t listpack_bytes = lpGetTotalBytes(lp); unsigned char *eofptr = lp + listpack_bytes - 1; return lpInsert(lp, ele, size, eofptr, LP_BEFORE, NULL); }
|
删除
1 2 3 4
|
unsigned char *lpDelete(unsigned char *lp, unsigned char *p, unsigned char **newp) { return lpInsert(lp, NULL, 0, p, LP_REPLACE, newp); }
|
插入和删除操作都是对lpInsert()
的包装:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103
|
unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, unsigned char *p, int where, unsigned char **newp) { unsigned char intenc[9]; unsigned char backlen[5]; uint64_t enclen;
if (ele == NULL) where = LP_REPLACE;
if (where == LP_AFTER) { p = lpSkip(p); where = LP_BEFORE; }
unsigned long poff = p - lp;
int enctype; if (ele) { enctype = lpEncodeGetType(ele,size,intenc,&enclen); } else { enctype = -1; enclen = 0; }
unsigned long backlen_size = ele ? lpEncodeBacklen(backlen,enclen) : 0;
uint64_t old_listpack_bytes = lpGetTotalBytes(lp);
uint32_t replaced_len = 0; if (where == LP_REPLACE) { replaced_len = lpCurrentEncodedSize(p); replaced_len += lpEncodeBacklen(NULL,replaced_len); }
uint64_t new_listpack_bytes = old_listpack_bytes + enclen + backlen_size - replaced_len; if (new_listpack_bytes > UINT32_MAX) return NULL;
unsigned char *dst = lp + poff;
if (new_listpack_bytes > old_listpack_bytes) { if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL; dst = lp + poff; }
if (where == LP_BEFORE) { memmove(dst+enclen+backlen_size,dst,old_listpack_bytes-poff); } else { long lendiff = (enclen+backlen_size)-replaced_len; memmove(dst+replaced_len+lendiff, dst+replaced_len, old_listpack_bytes-poff-replaced_len); }
if (new_listpack_bytes < old_listpack_bytes) { if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL; dst = lp + poff; }
if (newp) { *newp = dst; if (ele == NULL && dst[0] == LP_EOF) *newp = NULL; } if (ele) { if (enctype == LP_ENCODING_INT) { memcpy(dst,intenc,enclen); } else { lpEncodeString(dst,ele,size); } dst += enclen; memcpy(dst,backlen,backlen_size); dst += backlen_size; }
if (where != LP_REPLACE || ele == NULL) { uint32_t num_elements = lpGetNumElements(lp); if (num_elements != LP_HDR_NUMELE_UNKNOWN) { if (ele) lpSetNumElements(lp,num_elements+1); else lpSetNumElements(lp,num_elements-1); } } lpSetTotalBytes(lp,new_listpack_bytes);
return lp; }
|
下面 2 个图是对上面函数中memmove
部分的图解:


查找
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
|
unsigned char *lpSeek(unsigned char *lp, long index) { int forward = 1;
uint32_t numele = lpGetNumElements(lp); if (numele != 65535) { if (index < 0) index = (long)numele + index; if (index < 0) return NULL; if (index >= numele) return NULL;
if (index > numele/2) { forward = 0; index -= numele; } } else { if (index < 0) forward = 0; }
if (forward) { unsigned char *ele = lpFirst(lp); while (index > 0 && ele) { ele = lpNext(lp,ele); index--; } return ele; } else { unsigned char *ele = lpLast(lp); while (index < -1 && ele) { ele = lpPrev(lp,ele); index++; } return ele; } }
|
取数据
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
|
unsigned char *lpGet(unsigned char *p, int64_t *count, unsigned char *intbuf) { int64_t val; uint64_t uval, negstart, negmax;
if (LP_ENCODING_IS_7BIT_UINT(p[0])) { negstart = UINT64_MAX; negmax = 0; uval = p[0] & 0x7f; } else if (LP_ENCODING_IS_6BIT_STR(p[0])) { *count = LP_ENCODING_6BIT_STR_LEN(p); return p+1; } else if (LP_ENCODING_IS_13BIT_INT(p[0])) { uval = ((p[0]&0x1f)<<8) | p[1]; negstart = (uint64_t)1<<12; negmax = 8191; } else if (LP_ENCODING_IS_16BIT_INT(p[0])) { uval = (uint64_t)p[1] | (uint64_t)p[2]<<8; negstart = (uint64_t)1<<15; negmax = UINT16_MAX; } else if (LP_ENCODING_IS_24BIT_INT(p[0])) { ... } else if (LP_ENCODING_IS_32BIT_INT(p[0])) { ... } else if (LP_ENCODING_IS_64BIT_INT(p[0])) { ... } else if (LP_ENCODING_IS_12BIT_STR(p[0])) { *count = LP_ENCODING_12BIT_STR_LEN(p); return p+2; } else if (LP_ENCODING_IS_32BIT_STR(p[0])) { *count = LP_ENCODING_32BIT_STR_LEN(p); return p+5; } else { uval = 12345678900000000ULL + p[0]; negstart = UINT64_MAX; negmax = 0; }
if (uval >= negstart) { uval = negmax - uval; val = uval; val = -val - 1; } else { val = uval; }
if (intbuf) { *count = snprintf((char)intbuf,LP_INTBUF_SIZE,"%lld",(long long)val); return intbuf; } else { *count = val; return NULL; } }
|
补充
有一个疑问就是代码里面通篇都是用的unsigned char *
, 而不是char *
. 推测原因可能是, 保存的数据有可能是二进制的, 而且传入的数据也都是有size
指示长度的. 而且如果用的是char *
的话, 看下面这个 (具体参见 ref) :
char c = 0xFF;
if (c == 0xFF) {
}
ref
Listpack specification
字节序
补码
Signed to unsigned conversion in C - is it always safe?
Using the == operator to compare a char to 0x80 always results in false?