Redis源码分析之字典dict

一、dict的使用场景

redis数据结构set, zset, hash, string类型的会用到dict。zset, hash类型在数据量少的时候会在ziplist, 数据量大时,zset使用dict + skiplist的数据结构,dict来存储value和score的映射关系。set在集合元素全部为int这种特殊情况下会使用intset, 通过二叉查找来查找数据。

其他场景例如集群模式中用来存储ip:port与clusterNode的映射关系。

二、dict的数据结构

核心数据结构定义如下有:

2.1 dictEntry

typedef struct dictEntry {
    void *key;
    union {
        void *val;
        uint64_t u64;
        int64_t s64;
        double d;
    } v;
    struct dictEntry *next;
} dictEntry;

2.2 dictht

typedef struct dictht {
    dictEntry **table;
    // table size
    unsigned long size;  
    //用于将哈希值映射到table的位置索引 = size - 1, hash(key) % sizemask 来计算落到table的那个bucket上    
    unsigned long sizemask;  
    //dict中数据个数,used / size的比值就是装载因子
  } dictht;

2.3 dict

typedef struct dict {
    dictType *type;
    void *privdata;
    dictht ht[2];
    /* 表示rehash的步骤,rehashidx == -1 当前没有进行rehash */
    long rehashidx; 
    /* number of iterators currently running */
    unsigned long iterators; 
} dict;

2.4 dictType

typedef struct dictType {
    uint64_t (*hashFunction)(const void *key);
    void *(*keyDup)(void *privdata, const void *key);
    void *(*valDup)(void *privdata, const void *obj);
    int (*keyCompare)(void *privdata, const void *key1, const void *key2);
    void (*keyDestructor)(void *privdata, void *key);
    void (*valDestructor)(void *privdata, void *obj);
} dictType;

层属关系代码看着不太直观,我们换成图:

redis dict核心数据结构模型

三、dict的核心API

dict在需要扩展内存时避免一次性对所有key进行rehash,而是将rehash操作分散到对于dict的各个增删改查的操作中去。这种方法能做到每次只对一小部分key进行重哈希,而每次重哈希之间不影响dict的操作。这是为了避免rehash期间单个请求的响应时间剧烈增加。

判断是否进行rehash

static int _dictExpandIfNeeded(dict *d)
{
    /* 判定是否当前处于rehash的状态. */
    if (dictIsRehashing(d)) return DICT_OK;


    /* 初始设置,table size = 4 */
    if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);
    /* If we reached the 1:1 ratio, and we are allowed to resize the hash
     * table (global setting) or we should avoid it but the ratio between
     * elements/buckets is over the "safe" threshold, we resize doubling
     * the number of buckets. */
    if (d->ht[0].used >= d->ht[0].size &&
        (dict_can_resize ||
         //数据量超过table size的5倍
         d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
    {
        //根据当前dict元素数量的2倍进行扩展
       return dictExpand(d, d->ht[0].used*2);
    }
    return DICT_OK;
}

dict扩展逻辑

int dictExpand(dict *d, unsigned long size)
{
    /* the size is invalid if it is smaller than the number of
     * elements already inside the hash table */
    if (dictIsRehashing(d) || d->ht[0].used > size)
        return DICT_ERR;
    dictht n; /* the new hash table */
    unsigned long realsize = _dictNextPower(size);
    /* Rehashing to the same table size is not useful. */
    if (realsize == d->ht[0].size) return DICT_ERR;
    /* 根据新的size重新分配一个hashtable */
    n.size = realsize;
    n.sizemask = realsize-1;
    n.table = zcalloc(realsize*sizeof(dictEntry*));
    n.used = 0;


    /* Is this the first initialization? If so it's not really a rehashing
     * we just set the first hash table so that it can accept keys. */
    if (d->ht[0].table == NULL) {
        d->ht[0] = n;
        return DICT_OK;
    }


    /* 重新分配的hashtable置于ht[1] */
    d->ht[1] = n;
    //开始渐进rehash的标记
    d->rehashidx = 0;
    return DICT_OK;
}


//如果2^n >= used * 2 返回 2^n, hashtable的size一定是2^n
static unsigned long _dictNextPower(unsigned long size)
{
    unsigned long i = DICT_HT_INITIAL_SIZE;
    if (size >= LONG_MAX) return LONG_MAX + 1LU;
    while(1) {
        if (i >= size)
            return i;
        i *= 2;
    }
}

开启渐进式rehash

什么时候开启渐进式rehash, 在每次从hashtable中定位key的时候,代码如下

static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)
{
    unsigned long idx, table;
    dictEntry *he;
    if (existing) *existing = NULL;


    /* 判断是否需要扩展 */
    if (_dictExpandIfNeeded(d) == DICT_ERR)
        return -1;
        
    /* 拉链法遍历hashtable,遍历两个hashtable */
    /* ht[0], ht[1], 如果当前没有在rehash,则只遍历ht[0] */
    for (table = 0; table <= 1; table++) {
        //计算数据所在bucket位置,找到链表第一个entry
        idx = hash & d->ht[table].sizemask;


        /* Search if this slot does not already contain the given key */
        he = d->ht[table].table[idx];
        //遍历链表, 比较key值
        while(he) {
            if (key==he->key || dictCompareKeys(d, key, he->key)) {
                if (existing) *existing = he;
                return -1;
            }
            he = he->next;
        }
        if (!dictIsRehashing(d)) break;
    }
    return idx;
}

分步渐进rehash

什么时候进行分步渐进rehash, 在每一个add, get操作中:

dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
    long index;
    dictEntry *entry;
    dictht *ht;
    //进行分步rehash
    if (dictIsRehashing(d))
       _dictRehashStep(d);
       
    /* Get the index of the new element, or -1 if
     * the element already exists. */
    if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
        return NULL;
    /* Allocate the memory and store the new entry.
     * Insert the element in top, with the assumption that in a database
     * system it is more likely that recently added entries are accessed
     * more frequently. */
    /** rehash时, 数据存入ht[1] */
    ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
    entry = zmalloc(sizeof(*entry));
    entry->next = ht->table[index];
    ht->table[index] = entry;
    ht->used++;
    /* 设置key, 返回entry*/
    dictSetKey(d, entry, key);
    return entry;
}

分步rehash过程

static void _dictRehashStep(dict *d) {
    if (d->iterators == 0) dictRehash(d,1);
}


int dictRehash(dict *d, int n) {
    int empty_visits = n*10; /* Max number of empty buckets to visit. */
    if (!dictIsRehashing(d)) return 0;


    while(n-- && d->ht[0].used != 0) {
        dictEntry *de, *nextde;


        /* Note that rehashidx can't overflow as we are sure there are more
         * elements because ht[0].used != 0 */
        assert(d->ht[0].size > (unsigned long)d->rehashidx);


        /* 如果table中存在bucket为空时,检查10个bucket后都为空则返回,等下一次再处理 */
        while(d->ht[0].table[d->rehashidx] == NULL) {
            d->rehashidx++;
            if (--empty_visits == 0) return 1;
        }
        
        de = d->ht[0].table[d->rehashidx];
        
        /* 一次处理table中的一个bucket */
        /* Move all the keys in this bucket from the old to the new hash HT */
        while(de) {
            uint64_t h;
            nextde = de->next;
            /* Get the index in the new hash table */
            h = dictHashKey(d, de->key) & d->ht[1].sizemask;
            de->next = d->ht[1].table[h];
            d->ht[1].table[h] = de;
            d->ht[0].used--;
            d->ht[1].used++;
            de = nextde;
        }
        d->ht[0].table[d->rehashidx] = NULL;
        d->rehashidx++;
    }
    /* Check if we already rehashed the whole table... */
    if (d->ht[0].used == 0) {
        zfree(d->ht[0].table);
        d->ht[0] = d->ht[1];
        _dictReset(&d->ht[1]);
        d->rehashidx = -1;
        return 0;
    }
    /* More to rehash... */
    return 1;
}

redis dict的缩小的触发存在两种情况

尝试触发的代码如下

//当used / size < 10%时进行开启缩容rehash。
int htNeedsResize(dict *dict) {
    long long size, used;
    size = dictSlots(dict);
    used = dictSize(dict);
    return (size > DICT_HT_INITIAL_SIZE &&
        (used*100/size < HASHTABLE_MIN_FILL));
}


/* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL
 * we resize the hash table to save memory */
void tryResizeHashTables(int dbid) {
    if (htNeedsResize(server.db[dbid].dict))
        dictResize(server.db[dbid].dict);
        
    if (htNeedsResize(server.db[dbid].expires))
        dictResize(server.db[dbid].expires);
}

由于rehash时需要分配内存,此部分内存是从redis maxmemory中的一部分,因此当bucket比较大时,rehash生成ht[1]时会使用大量内存。如果ht[1]占用的内存 + 原来的内存超过maxmeory,则会发生key逐出。由于sizeof(dictEntry*) = 8byte,因此当ht[0] size为4时,ht[1]扩展2倍,ht[1]占用内存为8 * 4 * 2 = 64byte,当size = 2^n 时,ht[1]占用的内存为8 * 2 * 2^n。

三、dict scan

redis中dict是有状态的,dict存在四种状态:

1 正常状态

2 正在rehash状态

3 rehash扩容完成状态

4 rehash缩容完成状态

由于redis中dict是有状态的,只有在正常状态下可以完整的扫描字典中的所有的key。当dict在进行扩容和缩容时可能会存在扫描key的遗漏或者重复扫描。由于redis的dict中存在两个hashtable,分别为ht[0], ht[1]。rehash的过程是将ht[0]的key重新hash到ht[1]。这时redis的所有key其实存在于两个hashtable中。redis针对四种情况的方案如下:

    1. Dict tablesize保持不变,稳定的状态下,直接顺序遍历即可
    2. Dict Resize,dict扩大了,如果还是按照顺序遍历,就会导致扫描大量重复Key。比如tablesize从8变成了16,假设之前访问的是3号桶,那么表扩展后则是继续访问415号桶;但是,原先的03号桶中的数据在Dict长度变大后被迁移到811号桶中,因此,遍历811号桶的时候会有大量的重复Key被返回。
    3. Dict Resize,dict缩小了,如果还是按照顺序遍历,就会导致大量的Key被遗漏。比如tablesize从8变成了4,假设当前访问的是3号桶,那么下一次则会直接返回遍历结束了;但是之前47号桶中的数据在缩容后迁移带 可03号桶中,因此这部分Key就无法扫描到。
    4. 字典正在rehash,这种情况如(2)和(3)情况一下,要么大量重复扫描、要么遗漏很多Key。

在redis正在rehash时,采用hash桶掩码的高位序访问来解决。如下图

hash桶掩码的高位序访问

高位序访问即按照dict sizemask(掩码),在有效位(上图中dict sizemask为00000111)上从高位开始加一枚举(右边图的);低位则按照有效位的低位逐步加一枚举(左边的图)。

高位序遍历路径 : 0->4->2->6->1->5->3->7

低位序遍历路径:0->1->2->3->4->5->6->7

redis采用掩码高位序访问的原因是在rehash时尽量少的重复扫描key。为什么从高位掩码访问会减少重复扫描。由于rehash最终计算bucket位置时是取模操作(v & size_mask),举个例子:

针对 v = 00101011,原来size_mask为00000111,取模

      00000111
       00101011
=      00000011

当size_mask为00001111,取模

       00001111
       00101011
       00001011

可以看出取模后的两个值存在相同的后缀011。size_mask变大后,生成的结果只是高位发生变化。

原来落到bucket[00000011]的key,table size从8->16时,size_mask从00000111->00001111时
会落到        [00000011]
和           [00001011]中,高位变化,低位不变,此时再看一遍从高位序访问的图

看一个redis进行scan操作的例子,如下图。

redis从左边hashtable bucket0开始遍历,到遍历bucket6时发生了rehash。bucket长度为8扩展到16,redis的scan路径就会按照 6→14→1→9→5→13→3→11→7→15来完成,这里存在一个隐藏逻辑,一定是先遍历小hashtable,再变了大的hashtable。

hashtable bucket

从图上可以看出高位序scan在dict rehash时即可以避免重复遍历,又能完整返回原始的所有Key。同理,字典缩容时也一样,字典缩容可以看出是反向扩容。

来看一下redis scan源码

unsigned long dictScan(dict *d,
                       unsigned long v,
                       dictScanFunction *fn,
                       dictScanBucketFunction* bucketfn,
                       void *privdata)
{
    dictht *t0, *t1;
    const dictEntry *de, *next;
    unsigned long m0, m1;


    if (dictSize(d) == 0) return 0;
    //没有进行rehash的情况
    if (!dictIsRehashing(d)) {
        t0 = &(d->ht[0]);
        m0= t0->sizemask;
        /* Emit entries at cursor */
        if (bucketfn) 
          bucketfn(privdata, &t0->table[v & m0]);
        
        de = t0->table[v & m0];
        
        while (de) {
            next = de->next;
            fn(privdata, de);
            de = next;
        }
        /* 高位序遍历计算出下一个cursor */
        v |= ~m0;
        /* Increment the reverse cursor */
        v = rev(v);
        v++;
        v = rev(v);
    } else {
        //进行rehash的情况
        t0 = &d->ht[0];
        t1 = &d->ht[1];
        /* 确保t0为小hashtable,t1为大的hashtable */
        if (t0->size > t1->size) {
            t0 = &d->ht[1];
            t1 = &d->ht[0];
        }
        m0 = t0->sizemask;
        m1 = t1->sizemask;
        /* 先根据cursor遍历小的hashtable */
        if (bucketfn) bucketfn(privdata, &t0->table[v & m0]);
        de = t0->table[v & m0];
        while (de) {
            next = de->next;
            fn(privdata, de);
            de = next;
        }
        /* 再变了大的hashtable,大的hashtable会遍历多个bucket */
        do {
            /* Emit entries at cursor */
            if (bucketfn) bucketfn(privdata, &t1->table[v & m1]);
            de = t1->table[v & m1];
            while (de) {
                next = de->next;
                fn(privdata, de);
                de = next;
            }
            /* 反向二进制迭代算法来计算出下一个cursor */
            v |= ~m1;
            v = rev(v);
            v++;
            v = rev(v);
            /* Continue while bits covered by mask difference is non-zero */
        } while (v & (m0 ^ m1));
    }
    return v;
}


// 反向二进制迭代算法
1. 对游标进行二进制翻转(原来的高位变成低位)
2. 低位+1 (对原来的高位进行+1,进位等操作)
3. 再进行一次二进制翻转 (恢复原来的高位,此时完成高位+1迭代)


作者介绍:互联网十年交易/支付/搜索等架构和研发经验,擅长架构设计百亿级流量系统和高并发性能调优,目前专注做卫星通信/卫星遥感的研发工作,努力拥抱商业航天的星辰大海。 公众号:无量云颠

欢迎一起交流技术 !~

展开阅读全文

页面更新:2024-03-12

标签:遍历   低位   数据结构   高位   字典   源码   元素   核心   情况   操作   关系   数据

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top