redis数据结构set, zset, hash, string类型的会用到dict。zset, hash类型在数据量少的时候会在ziplist, 数据量大时,zset使用dict + skiplist的数据结构,dict来存储value和score的映射关系。set在集合元素全部为int这种特殊情况下会使用intset, 通过二叉查找来查找数据。
其他场景例如集群模式中用来存储ip:port与clusterNode的映射关系。
核心数据结构定义如下有:
2.1 dictEntry
typedef struct dictEntry {
void *key;
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
struct dictEntry *next;
} dictEntry;
2.2 dictht
typedef struct dictht {
dictEntry **table;
// table size
unsigned long size;
//用于将哈希值映射到table的位置索引 = size - 1, hash(key) % sizemask 来计算落到table的那个bucket上
unsigned long sizemask;
//dict中数据个数,used / size的比值就是装载因子
} dictht;
2.3 dict
typedef struct dict {
dictType *type;
void *privdata;
dictht ht[2];
/* 表示rehash的步骤,rehashidx == -1 当前没有进行rehash */
long rehashidx;
/* number of iterators currently running */
unsigned long iterators;
} dict;
2.4 dictType
typedef struct dictType {
uint64_t (*hashFunction)(const void *key);
void *(*keyDup)(void *privdata, const void *key);
void *(*valDup)(void *privdata, const void *obj);
int (*keyCompare)(void *privdata, const void *key1, const void *key2);
void (*keyDestructor)(void *privdata, void *key);
void (*valDestructor)(void *privdata, void *obj);
} dictType;
层属关系代码看着不太直观,我们换成图:
dict在需要扩展内存时避免一次性对所有key进行rehash,而是将rehash操作分散到对于dict的各个增删改查的操作中去。这种方法能做到每次只对一小部分key进行重哈希,而每次重哈希之间不影响dict的操作。这是为了避免rehash期间单个请求的响应时间剧烈增加。
判断是否进行rehash
static int _dictExpandIfNeeded(dict *d)
{
/* 判定是否当前处于rehash的状态. */
if (dictIsRehashing(d)) return DICT_OK;
/* 初始设置,table size = 4 */
if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);
/* If we reached the 1:1 ratio, and we are allowed to resize the hash
* table (global setting) or we should avoid it but the ratio between
* elements/buckets is over the "safe" threshold, we resize doubling
* the number of buckets. */
if (d->ht[0].used >= d->ht[0].size &&
(dict_can_resize ||
//数据量超过table size的5倍
d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
{
//根据当前dict元素数量的2倍进行扩展
return dictExpand(d, d->ht[0].used*2);
}
return DICT_OK;
}
dict扩展逻辑
int dictExpand(dict *d, unsigned long size)
{
/* the size is invalid if it is smaller than the number of
* elements already inside the hash table */
if (dictIsRehashing(d) || d->ht[0].used > size)
return DICT_ERR;
dictht n; /* the new hash table */
unsigned long realsize = _dictNextPower(size);
/* Rehashing to the same table size is not useful. */
if (realsize == d->ht[0].size) return DICT_ERR;
/* 根据新的size重新分配一个hashtable */
n.size = realsize;
n.sizemask = realsize-1;
n.table = zcalloc(realsize*sizeof(dictEntry*));
n.used = 0;
/* Is this the first initialization? If so it's not really a rehashing
* we just set the first hash table so that it can accept keys. */
if (d->ht[0].table == NULL) {
d->ht[0] = n;
return DICT_OK;
}
/* 重新分配的hashtable置于ht[1] */
d->ht[1] = n;
//开始渐进rehash的标记
d->rehashidx = 0;
return DICT_OK;
}
//如果2^n >= used * 2 返回 2^n, hashtable的size一定是2^n
static unsigned long _dictNextPower(unsigned long size)
{
unsigned long i = DICT_HT_INITIAL_SIZE;
if (size >= LONG_MAX) return LONG_MAX + 1LU;
while(1) {
if (i >= size)
return i;
i *= 2;
}
}
开启渐进式rehash
什么时候开启渐进式rehash, 在每次从hashtable中定位key的时候,代码如下
static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)
{
unsigned long idx, table;
dictEntry *he;
if (existing) *existing = NULL;
/* 判断是否需要扩展 */
if (_dictExpandIfNeeded(d) == DICT_ERR)
return -1;
/* 拉链法遍历hashtable,遍历两个hashtable */
/* ht[0], ht[1], 如果当前没有在rehash,则只遍历ht[0] */
for (table = 0; table <= 1; table++) {
//计算数据所在bucket位置,找到链表第一个entry
idx = hash & d->ht[table].sizemask;
/* Search if this slot does not already contain the given key */
he = d->ht[table].table[idx];
//遍历链表, 比较key值
while(he) {
if (key==he->key || dictCompareKeys(d, key, he->key)) {
if (existing) *existing = he;
return -1;
}
he = he->next;
}
if (!dictIsRehashing(d)) break;
}
return idx;
}
分步渐进rehash
什么时候进行分步渐进rehash, 在每一个add, get操作中:
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
long index;
dictEntry *entry;
dictht *ht;
//进行分步rehash
if (dictIsRehashing(d))
_dictRehashStep(d);
/* Get the index of the new element, or -1 if
* the element already exists. */
if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
return NULL;
/* Allocate the memory and store the new entry.
* Insert the element in top, with the assumption that in a database
* system it is more likely that recently added entries are accessed
* more frequently. */
/** rehash时, 数据存入ht[1] */
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
entry = zmalloc(sizeof(*entry));
entry->next = ht->table[index];
ht->table[index] = entry;
ht->used++;
/* 设置key, 返回entry*/
dictSetKey(d, entry, key);
return entry;
}
分步rehash过程
static void _dictRehashStep(dict *d) {
if (d->iterators == 0) dictRehash(d,1);
}
int dictRehash(dict *d, int n) {
int empty_visits = n*10; /* Max number of empty buckets to visit. */
if (!dictIsRehashing(d)) return 0;
while(n-- && d->ht[0].used != 0) {
dictEntry *de, *nextde;
/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
assert(d->ht[0].size > (unsigned long)d->rehashidx);
/* 如果table中存在bucket为空时,检查10个bucket后都为空则返回,等下一次再处理 */
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
de = d->ht[0].table[d->rehashidx];
/* 一次处理table中的一个bucket */
/* Move all the keys in this bucket from the old to the new hash HT */
while(de) {
uint64_t h;
nextde = de->next;
/* Get the index in the new hash table */
h = dictHashKey(d, de->key) & d->ht[1].sizemask;
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
}
d->ht[0].table[d->rehashidx] = NULL;
d->rehashidx++;
}
/* Check if we already rehashed the whole table... */
if (d->ht[0].used == 0) {
zfree(d->ht[0].table);
d->ht[0] = d->ht[1];
_dictReset(&d->ht[1]);
d->rehashidx = -1;
return 0;
}
/* More to rehash... */
return 1;
}
redis dict的缩小的触发存在两种情况
尝试触发的代码如下
//当used / size < 10%时进行开启缩容rehash。
int htNeedsResize(dict *dict) {
long long size, used;
size = dictSlots(dict);
used = dictSize(dict);
return (size > DICT_HT_INITIAL_SIZE &&
(used*100/size < HASHTABLE_MIN_FILL));
}
/* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL
* we resize the hash table to save memory */
void tryResizeHashTables(int dbid) {
if (htNeedsResize(server.db[dbid].dict))
dictResize(server.db[dbid].dict);
if (htNeedsResize(server.db[dbid].expires))
dictResize(server.db[dbid].expires);
}
由于rehash时需要分配内存,此部分内存是从redis maxmemory中的一部分,因此当bucket比较大时,rehash生成ht[1]时会使用大量内存。如果ht[1]占用的内存 + 原来的内存超过maxmeory,则会发生key逐出。由于sizeof(dictEntry*) = 8byte,因此当ht[0] size为4时,ht[1]扩展2倍,ht[1]占用内存为8 * 4 * 2 = 64byte,当size = 2^n 时,ht[1]占用的内存为8 * 2 * 2^n。
三、dict scan
redis中dict是有状态的,dict存在四种状态:
1 正常状态
2 正在rehash状态
3 rehash扩容完成状态
4 rehash缩容完成状态
由于redis中dict是有状态的,只有在正常状态下可以完整的扫描字典中的所有的key。当dict在进行扩容和缩容时可能会存在扫描key的遗漏或者重复扫描。由于redis的dict中存在两个hashtable,分别为ht[0], ht[1]。rehash的过程是将ht[0]的key重新hash到ht[1]。这时redis的所有key其实存在于两个hashtable中。redis针对四种情况的方案如下:
在redis正在rehash时,采用hash桶掩码的高位序访问来解决。如下图
高位序访问即按照dict sizemask(掩码),在有效位(上图中dict sizemask为00000111)上从高位开始加一枚举(右边图的);低位则按照有效位的低位逐步加一枚举(左边的图)。
高位序遍历路径 : 0->4->2->6->1->5->3->7
低位序遍历路径:0->1->2->3->4->5->6->7
redis采用掩码高位序访问的原因是在rehash时尽量少的重复扫描key。为什么从高位掩码访问会减少重复扫描。由于rehash最终计算bucket位置时是取模操作(v & size_mask),举个例子:
针对 v = 00101011,原来size_mask为00000111,取模
00000111
00101011
= 00000011
当size_mask为00001111,取模
00001111
00101011
00001011
可以看出取模后的两个值存在相同的后缀011。size_mask变大后,生成的结果只是高位发生变化。
原来落到bucket[00000011]的key,table size从8->16时,size_mask从00000111->00001111时
会落到 [00000011]
和 [00001011]中,高位变化,低位不变,此时再看一遍从高位序访问的图
看一个redis进行scan操作的例子,如下图。
redis从左边hashtable bucket0开始遍历,到遍历bucket6时发生了rehash。bucket长度为8扩展到16,redis的scan路径就会按照 6→14→1→9→5→13→3→11→7→15来完成,这里存在一个隐藏逻辑,一定是先遍历小hashtable,再变了大的hashtable。
从图上可以看出高位序scan在dict rehash时即可以避免重复遍历,又能完整返回原始的所有Key。同理,字典缩容时也一样,字典缩容可以看出是反向扩容。
来看一下redis scan源码
unsigned long dictScan(dict *d,
unsigned long v,
dictScanFunction *fn,
dictScanBucketFunction* bucketfn,
void *privdata)
{
dictht *t0, *t1;
const dictEntry *de, *next;
unsigned long m0, m1;
if (dictSize(d) == 0) return 0;
//没有进行rehash的情况
if (!dictIsRehashing(d)) {
t0 = &(d->ht[0]);
m0= t0->sizemask;
/* Emit entries at cursor */
if (bucketfn)
bucketfn(privdata, &t0->table[v & m0]);
de = t0->table[v & m0];
while (de) {
next = de->next;
fn(privdata, de);
de = next;
}
/* 高位序遍历计算出下一个cursor */
v |= ~m0;
/* Increment the reverse cursor */
v = rev(v);
v++;
v = rev(v);
} else {
//进行rehash的情况
t0 = &d->ht[0];
t1 = &d->ht[1];
/* 确保t0为小hashtable,t1为大的hashtable */
if (t0->size > t1->size) {
t0 = &d->ht[1];
t1 = &d->ht[0];
}
m0 = t0->sizemask;
m1 = t1->sizemask;
/* 先根据cursor遍历小的hashtable */
if (bucketfn) bucketfn(privdata, &t0->table[v & m0]);
de = t0->table[v & m0];
while (de) {
next = de->next;
fn(privdata, de);
de = next;
}
/* 再变了大的hashtable,大的hashtable会遍历多个bucket */
do {
/* Emit entries at cursor */
if (bucketfn) bucketfn(privdata, &t1->table[v & m1]);
de = t1->table[v & m1];
while (de) {
next = de->next;
fn(privdata, de);
de = next;
}
/* 反向二进制迭代算法来计算出下一个cursor */
v |= ~m1;
v = rev(v);
v++;
v = rev(v);
/* Continue while bits covered by mask difference is non-zero */
} while (v & (m0 ^ m1));
}
return v;
}
// 反向二进制迭代算法
1. 对游标进行二进制翻转(原来的高位变成低位)
2. 低位+1 (对原来的高位进行+1,进位等操作)
3. 再进行一次二进制翻转 (恢复原来的高位,此时完成高位+1迭代)
作者介绍:互联网十年交易/支付/搜索等架构和研发经验,擅长架构设计百亿级流量系统和高并发性能调优,目前专注做卫星通信/卫星遥感的研发工作,努力拥抱商业航天的星辰大海。 公众号:无量云颠
欢迎一起交流技术 !~
页面更新:2024-03-12
本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828
© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号