网站风格化设计方案,企业品牌网站建设方案,张家明做网站,网站高级?O置1. 数据结构
我们自底向上来描述redis散列涉及的数据结构。
首先是负责存储键值的结构#xff0c;Java 中存储的结构叫 Entry#xff0c;redis里也差不多#xff0c;叫dictEntry:
typedef struct dictEntry {void *key; // 键#xff0c;它是一个指针类型…1. 数据结构
我们自底向上来描述redis散列涉及的数据结构。
首先是负责存储键值的结构Java 中存储的结构叫 Entryredis里也差不多叫dictEntry:
typedef struct dictEntry {void *key; // 键它是一个指针类型所以我们可以将其指向sds的指针union { // 这是一个联合类型也就是你可以选择任意一个字段来存储void *val; // 因为散列也是redis数据库的底层结构它是使用val字段uint64_t u64; // redis还用散列保存键过期时间此时就用u64存储过期时间int64_t s64;double d;} v;struct dictEntry *next; // Java中为了处理哈希冲突是采用拉链法redis也是一样
} dictEntry; 我们向上就是 dictEntry 的存储容器也叫 dictht 我们就叫他哈希
typedef struct dictht {dictEntry **table; // 指针数组 unsigned long size; // 底层数组长度或者会所是slot数量unsigned long sizemask; // 它比size小1位运算比取模快用它加速定位hash值索引unsigned long used; // 这个是当前哈希表里所有数据的总和哈希冲突的也算
} dictht;
然后其实到这里基本可以用了。但是redis依然在 dicthst 之上抽象了 dict 结构我们叫他散列:
typedef struct dict {dictType *type; // redis将数据和函数分离dictType 是 API 抽象void *privdata; // 网上说是和 type 一起用的我们初始化时都是NULLdictht ht[2]; // len2的dictht数组主要用0当rehast时会开放1long rehashidx; // rehashidx -1 则处于rehashingint16_t pauserehash; /* If 0 rehashing is paused (0 indicates coding error) */
} dict;
下面是作为数据库键值对存储时候用到的 dictType
/* Db-dict, keys are sds strings, vals are Redis objects. */
dictType dbDictType {dictSdsHash, /* hash function */NULL, /* key dup */NULL, /* val dup */dictSdsKeyCompare, /* key compare */dictSdsDestructor, /* key destructor */dictObjectDestructor, /* val destructor */dictExpandAllowed /* allow to expand */
};
下面是作为失效控制存储时候用到的 dictType
/* Db-expires */
dictType dbExpiresDictType {dictSdsHash, /* hash function */NULL, /* key dup */NULL, /* val dup */dictSdsKeyCompare, /* key compare */NULL, /* key destructor */NULL, /* val destructor */dictExpandAllowed /* allow to expand */
};
下面还有针对事务watch专用的 dictType
/* Keylist hash table type has unencoded redis objects as keys and* lists as values. Its used for blocking operations (BLPOP) and to* map swapped keys to a list of clients waiting for this keys to be loaded. */
dictType keylistDictType {dictObjHash, /* hash function */NULL, /* key dup */NULL, /* val dup */dictObjKeyCompare, /* key compare */dictObjectDestructor, /* key destructor */dictListDestructor, /* val destructor */NULL /* allow to expand */
}; 最后这部分是redis服务器上的DB字段我们关注散列就是因为它可以用作数据库的实现所以这里也列举一下
struct redisServer {redisDb *db;
}
/* Redis database representation. There are multiple databases identified* by integers from 0 (the default database) up to the max configured* database. The database number is the id field in the structure. */
typedef struct redisDb {dict *dict; // DB 的 keyspace键空间当然不是只存储键dict *expires; // DB 中键的过期时间dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/dict *ready_keys; /* Blocked keys that received a PUSH */dict *watched_keys; /* 事务中通过watch观察的键int id; // 0-15redis支持16个数据库long long avg_ttl; // 平均ttl。没啥用unsigned long expires_cursor; // Cursor of the active expire cycle. */list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb; 需要注意的是redis 支持的map结构底层的实现可能是ziplist即压缩列表也可能是dict即散列。我们将名词规范化。 2.生命周期初始化
我们先看作为数据库键值对存储时的初始化这是在redis服务器启动时的一部分DB 里直接使用dict他压根没想用ziplist
for (j 0; j server.dbnum; j) {server.db[j].dict dictCreate(dbDictType,NULL);server.db[j].expires dictCreate(dbExpiresDictType,NULL);server.db[j].expires_cursor 0;server.db[j].blocking_keys dictCreate(keylistDictType,NULL);server.db[j].ready_keys dictCreate(objectKeyPointerValueDictType,NULL);server.db[j].watched_keys dictCreate(keylistDictType,NULL);server.db[j].id j;server.db[j].avg_ttl 0;server.db[j].defrag_later listCreate();listSetFreeMethod(server.db[j].defrag_later,(void (*)(void*))sdsfree);
}
我们知道redis默认支持16个库这里就是for循环生成16个数据库然后我们进入 dictCreate 方法
/* Create a new hash table */
dict *dictCreate(dictType *type,void *privDataPtr)
{dict *d zmalloc(sizeof(*d)); // 使用malloc 分配内存返回指针_dictInit(d,type,privDataPtr); // 委托其他方法return d;
}
官方注释也很清晰这就是创建一个新的哈希表的方法我们传入了两个参数一个就是 dictType 后续针对该哈希表的操作的一些可以自定义的 API都是由这个结构体来处理的。
我理解这个非常像 Java 里的抽象类里未实现的方法交给子类去自定义。
/* Initialize the hash table */
int _dictInit(dict *d, dictType *type,void *privDataPtr)
{_dictReset(d-ht[0]); // 初始化 dict.ht[0]_dictReset(d-ht[1]); // 初始化 dict.ht[1]d-type type; // 这个玩意就是 dictTyped-privdata privDataPtr; // 这个玩意是个NULLd-rehashidx -1; // rehash索引-1表示没有发生rehashd-pauserehash 0;return DICT_OK;
}
/* Reset a hash table already initialized with ht_init().* NOTE: This function should only be called by ht_destroy(). */
static void _dictReset(dictht *ht)
{ht-table NULL; // dictEntry **table 哈希里的字段声明ht-size 0;ht-sizemask 0;ht-used 0;
}
dictCreate 委托给 _dictInit 方法来初始化我们刚生成的 dict这里的工作其实很简答就是初始化需要注意的是此时 table 数组是 NULL等真正 set 值时才会初始化。 3.生命周期请求
3.1 hset 命令
Redis 负责处理 hset 命令的函数就叫 hsetCommand 而且redis里的命令都是这个起名法 typedef struct redisObject {unsigned type:4;unsigned encoding:4;unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or* LFU data (least significant 8 bits frequency* and most significant 16 bits access time). */int refcount; // 这个字段不用太关注void *ptr; // 指向真实结构体的指针比如我们现在讲散列那么它可能指向// ziplist也可能指向 dict
} robj; robj 是贯穿 redis 的一个容器类型你将其想象成 Java 里的 ListOptional。
主要关注typeencoding以及ptr
type该对象的类型字符串列表集合有序集合mapstreamencoding每种type会因为各种性能考虑在不同场景下有多重编码方式啥是编码就是底层数据结构的类型。打个比方Java 里 List 在底层可以用数组也可以用链表实现ptr指针结构typeencoding选择出的结构体的指针 void hsetCommand(client *c) {int i, created 0;robj *o;// 如果参数数量是奇数那么命令输入错误这是hset的格式hset key field valueif ((c-argc % 2) 1) {addReplyErrorFormat(c,wrong number of arguments for %s command,c-cmd-name);return;}// 查询传入的key对应的散列对象的robj如果不存在则创建一个合适的散列对象并返回// 我们提前讲如果该key不存在那么会创建一个ziplist他其实是个char数组的指针// 然后用robj对ziplist做封装if ((o hashTypeLookupWriteOrCreate(c,c-argv[1])) NULL) return;hashTypeTryConversion(o,c-argv,2,c-argc-1);for (i 2; i c-argc; i 2)created !hashTypeSet(o,c-argv[i]-ptr,c-argv[i1]-ptr,HASH_SET_COPY);/* HMSET (deprecated) and HSET return value is different. */char *cmdname c-argv[0]-ptr;if (cmdname[1] s || cmdname[1] S) {/* HSET */addReplyLongLong(c, created);} else {/* HMSET */addReply(c, shared.ok);}signalModifiedKey(c,c-db,c-argv[1]);notifyKeyspaceEvent(NOTIFY_HASH,hset,c-argv[1],c-db-id);server.dirty (c-argc - 2)/2;
}
3.1.1 hashTypeLookupWriteOrCreate
我们先看第一个内部函数就是根据 hset 传入的 key 查得对应的哈希散列
robj *hashTypeLookupWriteOrCreate(client *c, robj *key) {// 根据传入的key【robj】查回对应的val【robj】传入的key就是map的keyrobj *o lookupKeyWrite(c-db,key);// 由于key对应的应该是一个散列所以这里强制匹配即robj.type HASHif (checkType(c,o,OBJ_HASH)) return NULL;// 如果返回的robj是NULL则说明不存在该散列那么就创建一个散列。否则直接返回if (o NULL) {// 当在db里没有查到这个key那么创建一个map对象默认初始化的都是ziplist编码的o createHashObject();// 将新创建的robj以及key设置到db中key用于定位插槽【slot】的// 而o也就是新创建的这个map对象会和key一起再组成一个dictEntry对象// 存储到db这个大dict中db.dict[0]dbAdd(c-db,key,o);}return o;
}
里面比较核心的方法是 createHashObject中这个方法只生成一个空的map对象【robj】不会向里面设置任何键值对这里会涉及到散列的编码选择
robj *createHashObject(void) {unsigned char *zl ziplistNew();robj *o createObject(OBJ_HASH, zl);o-encoding OBJ_ENCODING_ZIPLIST;return o;
}
/* Create a new empty ziplist. */
unsigned char *ziplistNew(void) {unsigned int bytes ZIPLIST_HEADER_SIZEZIPLIST_END_SIZE;unsigned char *zl zmalloc(bytes);ZIPLIST_BYTES(zl) intrev32ifbe(bytes);ZIPLIST_TAIL_OFFSET(zl) intrev32ifbe(ZIPLIST_HEADER_SIZE);ZIPLIST_LENGTH(zl) 0;zl[bytes-1] ZIP_END;return zl;
}
可以看到默认redis初始化的map就是 ziplist然后等真正插入键值对时再看是否需要转换为哈希。 然后由于可能是新生成了一个map对象所以还涉及将其挂到db上的动作
void dbAdd(redisDb *db, robj *key, robj *val) {// key是hset命令的keyval则是刚生成的map对象【robj】sds copy sdsdup(key-ptr);// 该函数内部会讲keyval构造为一个dictEntry挂到db.dict.ht[0]中// 按照顺序看DB - dict 散列 - dictha 哈希 - dictEntry int retval dictAdd(db-dict, copy, val);serverAssertWithInfo(NULL,key,retval DICT_OK);signalKeyAsReady(db, key, val-type);if (server.cluster_enabled) slotToKeyAdd(key-ptr);
}
这部分代码其实与前面讲的 dictAdd 是基本相同的实现毕竟db也就是个散列
3.1.2 hashTypeTryConversion
然后我们来看第二个函数 hashTypeTryConversion 简单的将就是判断插入新的值后是否可以将原本是 ziplist 存储的散列结构转换为哈希存储的散列。
散列支持两种编码即 ziplist 和 hash当满足下面两个条件时redis会使用ziplist
key-value 结构的所有键值对的字符串长度都小于 hash-max-ziplist-value默认值64可以通过配置文件修改散列对象保存的键值对的个数1个键值对记为1小于 hash-max-ziplist-entries默认值512,可以通过配置文件修改
redis 只支持 ziplist 到 hash 的转换不支持反向。
3.1.3 hashTypeSet
然后我们来看第三个函数 hashTypeSet 他就是插入键值对的。
#define HASH_SET_TAKE_FIELD (10)
#define HASH_SET_TAKE_VALUE (11)
#define HASH_SET_COPY 0
int hashTypeSet(robj *o, sds field, sds value, int flags) {int update 0;// 如果值【robj】的编码方式是 ziplist 时if (o-encoding OBJ_ENCODING_ZIPLIST) {unsigned char *zl, *fptr, *vptr;zl o-ptr;fptr ziplistIndex(zl, ZIPLIST_HEAD);if (fptr ! NULL) {fptr ziplistFind(zl, fptr, (unsigned char*)field, sdslen(field), 1);if (fptr ! NULL) {/* Grab pointer to the value (fptr points to the field) */vptr ziplistNext(zl, fptr);serverAssert(vptr ! NULL);update 1;/* Replace value */zl ziplistReplace(zl, vptr, (unsigned char*)value,sdslen(value));}}if (!update) {/* Push new field/value pair onto the tail of the ziplist */zl ziplistPush(zl, (unsigned char*)field, sdslen(field),ZIPLIST_TAIL);zl ziplistPush(zl, (unsigned char*)value, sdslen(value),ZIPLIST_TAIL);}o-ptr zl;/* Check if the ziplist needs to be converted to a hash table */if (hashTypeLength(o) server.hash_max_ziplist_entries)hashTypeConvert(o, OBJ_ENCODING_HT);} else if (o-encoding OBJ_ENCODING_HT) {// 如果值【robj】的编码方式为哈希时传入命令里的 field 查回匹配的entry// 如果没匹配就设置NULL需要注意dictEntry 保存了键【key】和值【val】dictEntry *de dictFind(o-ptr,field);if (de) {// 如果map里有该field则进入这段代码首先获取目前field对应的值并释放内存sdsfree(dictGetVal(de));if (flags HASH_SET_TAKE_VALUE) {// 然后将新的val设置到指针上dictGetVal(de)返回的是指针dictGetVal(de) value;value NULL;} else {// 这个是将value复制了一遍然后设置到val指针上dictGetVal(de) sdsdup(value);}update 1;} else {// 如果map中未查得fieldsds f,v;// 构建field结构体if (flags HASH_SET_TAKE_FIELD) {f field;field NULL;} else {f sdsdup(field);}// 构建value结构体if (flags HASH_SET_TAKE_VALUE) {v value;value NULL;} else {v sdsdup(value);}// 将field-value设置到map中dictAdd(o-ptr,f,v);}} else {serverPanic(Unknown hash encoding);}/* Free SDS strings we did not referenced elsewhere if the flags* want this function to be responsible. */if (flags HASH_SET_TAKE_FIELD field) sdsfree(field);if (flags HASH_SET_TAKE_VALUE value) sdsfree(value);return update;
} 这里的 dictAdd 负责将键值设到 map 上
int dictAdd(dict *d, void *key, void *val)
{ // 设置entry如果entry已存在这里返回NULLdictEntry *entry dictAddRaw(d,key,NULL);if (!entry) return DICT_ERR;// 设置值dictSetVal(d, entry, val);return DICT_OK;
}
dictAddRaw 此函数添加条目但不是设置值而是向用户返回 dictEntry 结构这将确保按用户的意愿填充值字段。 这个函数也直接暴露给用户API主要是为了在哈希值中存储非指针前面我看到过 dictEntry 里有个联合体即可以放 *val也可以放数字等。 dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{long index;dictEntry *entry;dictht *ht;if (dictIsRehashing(d)) _dictRehashStep(d);/* Get the index of the new element, or -1 if* the element already exists. */if ((index _dictKeyIndex(d, key, dictHashKey(d,key), existing)) -1)return NULL;/* Allocate the memory and store the new entry.* Insert the element in top, with the assumption that in a database* system it is more likely that recently added entries are accessed* more frequently. */ht dictIsRehashing(d) ? d-ht[1] : d-ht[0];entry zmalloc(sizeof(*entry));entry-next ht-table[index];ht-table[index] entry;ht-used;/* Set the hash entry fields. */dictSetKey(d, entry, key);return entry;
} 3.1.4 signalModifiedKey
我们来看第四个函数signalModifiedKey 这个函数是后置处理涉及事物 watck key 和失效。 void signalModifiedKey(client *c, redisDb *db, robj *key) {touchWatchedKey(db,key);trackingInvalidateKey(c,key,1);
}
/* Touch a key, so that if this key is being WATCHed by some client the* next EXEC will fail. */
// watch key 检查这样客户端在执行 exec 时就失败
void touchWatchedKey(redisDb *db, robj *key) {list *clients;listIter li;listNode *ln;// 如果db的watched_keys它也是一个散列为空则返回没有事物在watchif (dictSize(db-watched_keys) 0) return;// 查询watch了该key的客户端这里返回的是个list这里查询很简单就是哈希取模再比较keyclients dictFetchValue(db-watched_keys, key);if (!clients) return;// 将所有watch了该key的客户端的flag标签都或等 CLIENT_DIRTY_CAS/* Mark all the clients watching this key as CLIENT_DIRTY_CAS *//* Check if we are already watching for this key */listRewind(clients,li);while((ln listNext(li))) {client *c listNodeValue(ln);c-flags | CLIENT_DIRTY_CAS;}
}
// 我也不知道这个函数是干嘛的
void trackingInvalidateKey(client *c, robj *keyobj, int bcast) {if (TrackingTable NULL) return;unsigned char *key (unsigned char*)keyobj-ptr;size_t keylen sdslen(keyobj-ptr);if (bcast raxSize(PrefixTable) 0)trackingRememberKeyToBroadcast(c,(char *)key,keylen);rax *ids raxFind(TrackingTable,key,keylen);if (ids raxNotFound) return;raxIterator ri;raxStart(ri,ids);raxSeek(ri,^,NULL,0);while(raxNext(ri)) {uint64_t id;memcpy(id,ri.key,sizeof(id));client *target lookupClientByID(id);/* Note that if the client is in BCAST mode, we dont want to* send invalidation messages that were pending in the case* previously the client was not in BCAST mode. This can happen if* TRACKING is enabled normally, and then the client switches to* BCAST mode. */if (target NULL ||!(target-flags CLIENT_TRACKING)||target-flags CLIENT_TRACKING_BCAST){continue;}/* If the client enabled the NOLOOP mode, dont send notifications* about keys changed by the client itself. */if (target-flags CLIENT_TRACKING_NOLOOP target server.current_client){continue;}/* If target is current client and its executing a command, we need schedule key invalidation.* As the invalidation messages may be interleaved with command* response and should after command response. */if (target server.current_client server.fixed_time_expire) {incrRefCount(keyobj);listAddNodeTail(server.tracking_pending_keys, keyobj);} else {sendTrackingMessage(target,(char *)keyobj-ptr,sdslen(keyobj-ptr),0);}}raxStop(ri);/* Free the tracking table: well create the radix tree and populate it* again if more keys will be modified in this caching slot. */TrackingTableTotalItems - raxSize(ids);raxFree(ids);raxRemove(TrackingTable,(unsigned char*)key,keylen,NULL);
}
3.2 hlen 命令 hlen 命令是由下面这个函数来处理的
void hlenCommand(client *c) {robj *o;// 这行很简单就是从数据库里查询key对应的robj对象其中因为这map结构的方法// 所以检查robj头里的type是否是HASH否则命令就用错了if ((o lookupKeyReadOrReply(c,c-argv[1],shared.czero)) NULL ||checkType(c,o,OBJ_HASH)) return;// hashTypeLength 返回一个长度然后addReplyLongLong将结果写到buf中addReplyLongLong(c,hashTypeLength(o));
}
所以核心的方法都是在下面这个函数内
/* Return the number of elements in a hash. */
unsigned long hashTypeLength(const robj *o) {unsigned long length ULONG_MAX;// 如果是ziplist编码的那么这里有优点特殊了// 当 zllen 小于65535时那么ziplist的lenzllen// 当 zllen 等于 65535,那么就得一个个算直到END标记处if (o-encoding OBJ_ENCODING_ZIPLIST) {// 这里使用ziplistLen函数该函数就是直接拿ziplist数据结构的zllen字段// 这里除以2是因为这是保存的键值对是成对出现的length ziplistLen(o-ptr) / 2;} else if (o-encoding OBJ_ENCODING_HT) {// 如果时hash编码其dictht里的used字段记录了当前有多少键值对length dictSize((const dict*)o-ptr);} else {serverPanic(Unknown hash encoding);}return length;
}
下面引用ziplist的内存布局图
area |---- ziplist header ----|----------- entries -------------|-end-|size 4 bytes 4 bytes 2 bytes ? ? ? ? 1 byte---------------------------------------------------------------
component | zlbytes | zltail | zllen | entry1 | entry2 | ... | entryN | zlend |---------------------------------------------------------------^ ^ ^
address | | |ZIPLIST_ENTRY_HEAD | ZIPLIST_ENTRY_END|ZIPLIST_ENTRY_TAIL 域长度/类型域的值zlbytesuint32_t整个 ziplist 占用的内存字节数对 ziplist 进行内存重分配或者计算末端时使用。zltailuint32_t到达 ziplist 表尾节点的偏移量。 通过这个偏移量可以在不遍历整个 ziplist 的前提下弹出表尾节点。zllenuint16_tziplist 中节点的数量。 当这个值小于 UINT16_MAX 65535时这个值就是 ziplist 中节点的数量 当这个值等于 UINT16_MAX 时节点的数量需要遍历整个 ziplist 才能计算得出。entryX?ziplist 所保存的节点各个节点的长度根据内容而定。zlenduint8_t255 的二进制值 1111 1111 UINT8_MAX 用于标记 ziplist 的末端。
3.3 增量rehash 增量式rehash是redis时间事件处理函数中的一部分
/* Rehash */
if (server.activerehashing) {// dbs_per_call 这个值是min16你设置的数据库数量for (j 0; j dbs_per_call; j) {// 增量式rehashint work_done incrementallyRehash(rehash_db);// work_done 标识该在rehash过程中是否真的干活了if (work_done) {/* If the function did some work, stop here, well do* more at the next cron loop. */break;} else {/* If this db didnt need rehash, well try the next one. */// 如果该库没有做任何工作那说明该库无需rehash那处理下一个数据// 这里取了个模好实现环型处理,0,1,2,3...15,0,1,2...rehash_db;rehash_db % server.dbnum;}}
}
我们主要关注下面这个增量式rehash核心方法
int incrementallyRehash(int dbid) {/* Keys dictionary */// 如果键空间在rehash那么就执行1ms的if (dictIsRehashing(server.db[dbid].dict)) {dictRehashMilliseconds(server.db[dbid].dict,1);return 1; /* already used our millisecond for this loop... */}/* Expires */// 如果键空间没有rehash则检查失效空间是否在rehash是的话也执行1ms的if (dictIsRehashing(server.db[dbid].expires)) {dictRehashMilliseconds(server.db[dbid].expires,1);return 1; /* already used our millisecond for this loop... */}return 0;
}
如果检测到该db处于散列中那么就调用下面这个方法执行1ms的任务
/* Rehash in msdelta milliseconds. The value of delta is larger * than 0, and is smaller than 1 in most cases. The exact upper bound * depends on the running time of dictRehash(d,100).*/
int dictRehashMilliseconds(dict *d, int ms) {// 如果 dict 的 pauserehash0那么就暂停rehashif (d-pauserehash 0) return 0;long long start timeInMilliseconds();int rehashes 0;// while循环调用 dictRehash,每次执行100任务量执行完后就计算下剩余时间不够就结束while(dictRehash(d,100)) {rehashes 100;if (timeInMilliseconds()-start ms) break;}return rehashes;
}核心方法rehash我们送进来的n100
int dictRehash(dict *d, int n) {// n * 100,这是最大允许访问的空插槽int empty_visits n*10; /* Max number of empty buckets to visit. */unsigned long s0 d-ht[0].size;unsigned long s1 d-ht[1].size;if (dict_can_resize DICT_RESIZE_FORBID || !dictIsRehashing(d)) return 0;if (dict_can_resize DICT_RESIZE_AVOID ((s1 s0 s1 / s0 dict_force_resize_ratio) ||(s1 s0 s0 / s1 dict_force_resize_ratio))){return 0;}// while循环迭代次数最大为n同时确保ht[0]剩余简键值对0while(n-- d-ht[0].used ! 0) {dictEntry *de, *nextde;assert(d-ht[0].size (unsigned long)d-rehashidx);// 需要注意的是rehashidx是插槽索引简单说就是dictht结构体数组指针这个数组的索引while(d-ht[0].table[d-rehashidx] NULL) {// 当插槽位置为NULL那么rehashidxd-rehashidx;// 如果空插槽碰到的太多超过100*n也不做了怕时间花销太大if (--empty_visits 0) return 1;}// 取出rehashidx处的头部dictEntryde d-ht[0].table[d-rehashidx];// 将该插槽下所有的dictEntry迁移到ht[1]中while(de) {uint64_t h;nextde de-next;// 从dictEntry中取出key然后与ht[1]的sizemask做取模得到索引h dictHashKey(d, de-key) d-ht[1].sizemask;// 这里怕迷糊de是当前entrynextde是下一个entry// 首先将de设置到当前entry的next指针设置为ht[1]中插槽头entryde-next d-ht[1].table[h];// 然后将ht[1]的头entry换成当前entryd-ht[1].table[h] de;// used 统计计算d-ht[0].used--;d-ht[1].used;// 然后重用de变量存储下一个entry然后循环起来直刀最后一个冲突键为止de nextde;}// 将当前rehashidx位置设置NULL毕竟都迁移完了然后 rehashidxd-ht[0].table[d-rehashidx] NULL;d-rehashidx;}// 如果整个表都被rehash完了那么释放内存然后ht[0]和ht[1]对掉并下掉rehashidx标志if (d-ht[0].used 0) {zfree(d-ht[0].table);d-ht[0] d-ht[1];_dictReset(d-ht[1]);d-rehashidx -1;return 0;}/* More to rehash... */return 1;
}
3.4 单步rehash
其实除了定时时间任务里的增量rehash之外在正常命令处理时也会随时做一点rehash我称呼它为单步rehahs主要是为了贴合它起的名字 /* This function performs just a step of rehashing, and only if hashing has* not been paused for our hash table. When we have iterators in the* middle of a rehashing we cant mess with the two hash tables otherwise* some element can be missed or duplicated.** This function is called by common lookup or update operations in the* dictionary so that the hash table automatically migrates from H1 to H2* while it is actively used. */
static void _dictRehashStep(dict *d) {if (d-pauserehash 0) dictRehash(d,1);
}
该函数与前面增量rehahs不同的点在于他n 不是100次而是 1次。除此之外没啥任何区别。
我们下面大概列举一下会出发单步rehash的场景
向db中插入新的键值对向db删除一个键向db发起查询操作 3.5 迭代器
Jababoy 应该比较数据首先就是获取迭代器Redis 中有好多种 iterator我们首先介绍散列类型的就是db内的某个map上的hkeys hashTypeIterator *hashTypeInitIterator(robj *subject) {// 函数传入的subject就是我们的迭代目标比方我们这里时在处理散列结构// 那么送入的就可能是ziplist也可能是hash编码的hashTypeIterator *hi zmalloc(sizeof(hashTypeIterator));hi-subject subject;hi-encoding subject-encoding;if (hi-encoding OBJ_ENCODING_ZIPLIST) {// 如果是ziplist编码hi-fptr NULL;hi-vptr NULL;} else if (hi-encoding OBJ_ENCODING_HT) {// 如果是hash编码hi-di dictGetIterator(subject-ptr);} else {serverPanic(Unknown hash encoding);}return hi;
}
dictIterator *dictGetIterator(dict *d)
{// 初始化一个dictIterator结构dictIterator *iter zmalloc(sizeof(*iter));iter-d d;iter-table 0;iter-index -1;iter-safe 0;iter-entry NULL;iter-nextEntry NULL;return iter;
}
我们看一下迭代器的数据结构hashTypeIterator 内部嵌套了一个 dictIterator结构原因是因为map有两种编码所以为了给使用者一个公共API就出现了hashTypeIterator由它来负责调度是走ziplist的迭代器还是hash的迭代器。
typedef struct {robj *subject;int encoding;unsigned char *fptr, *vptr;dictIterator *di;dictEntry *de;
} hashTypeIterator;typedef struct dictIterator {dict *d;long index;int table, safe;dictEntry *entry, *nextEntry;/* unsafe iterator fingerprint for misuse detection. */unsigned long long fingerprint;
} dictIterator; 初始化完我们继续看下一个函数 hashTypeNext对应 Java 的就是 next() 方法 int hashTypeNext(hashTypeIterator *hi) {// 如果是ziplist编码的迭代器if (hi-encoding OBJ_ENCODING_ZIPLIST) {unsigned char *zl;unsigned char *fptr, *vptr;// hi 是迭代器subject是ziplist的robj用ptr可以直接指针到ziplistzl hi-subject-ptr;// 下面这俩玩意吃实话时就是NULLfptr hi-fptr;vptr hi-vptr;if (fptr NULL) {// 初始化cursorserverAssert(vptr NULL);fptr ziplistIndex(zl, 0);} else {/* Advance cursor */serverAssert(vptr ! NULL);fptr ziplistNext(zl, vptr);}if (fptr NULL) return C_ERR;/* Grab pointer to the value (fptr points to the field) */vptr ziplistNext(zl, fptr);serverAssert(vptr ! NULL);/* fptr, vptr now point to the first or next pair */// hi-fptr fptr;hi-vptr vptr;} else if (hi-encoding OBJ_ENCODING_HT) {// 如果是哈希编码的迭代器if ((hi-de dictNext(hi-di)) NULL) return C_ERR;} else {serverPanic(Unknown hash encoding);}return C_OK;
}
由于ziplist本身就是有序的它是按照插入顺序存储的所以如果是ziplis编码的话迭代器就是直接使用了ziplist本身的特性进行迭代。
而对于hash编码的case我们需要重点研究dictNex函数的实现
dictEntry *dictNext(dictIterator *iter)
{while (1) {if (iter-entry NULL) {// 初始化时iter-table 就是0表示选择ht[0]开始dictht *ht iter-d-ht[iter-table];// 其实下面这俩判断就是index和table的初始值当首次执行时会根据// 迭代饿模式来区别设置如果是安全模式则设置dict的pasuerRehash// 如果是非安全模式则计算一个词是dict各个状态字段汇聚出来的一个指纹哈希值if (iter-index -1 iter-table 0) {if (iter-safe)dictPauseRehashing(iter-d);elseiter-fingerprint dictFingerprint(iter-d);}// index开始扫描哈希它就像rehash里的rehashidxiter-index;// 这个条件是判断是否当前index已经超过当前扫描的dictht的size最大插槽数了if (iter-index (long) ht-size) {// 当dict处于rehash中且现在是扫描0号表那么就切换到1号表index归零if (dictIsRehashing(iter-d) iter-table 0) {iter-table;iter-index 0;ht iter-d-ht[1];} else {break;}}// 将该插槽处的dictEntry设置到entry上然后iter-entry ht-table[iter-index];} else {// 这种case是因为之前设置过entry这里又进入循环了iter-entry iter-nextEntry;}// 如果entry非null则设置好nextEntry就可以返回了// 如果entry位null会进入while循环继续查找if (iter-entry) {/* We need to save the next here, the iterator user* may delete the entry we are returning. */iter-nextEntry iter-entry-next;return iter-entry;}}return NULL;
}
我们需要理解 dictIterator 的内部字段的含义
d 就是本迭代的 dict 对象【robj】entry 当前迭代到的元素nextEntry 下一个元素safe 是否安全模式迭代安全模式会暂停rehashfingerprint 指纹当不是安全模式时redis将dict里的usedsize等字段进行哈希计算得到一个指纹
我理解这个迭代器很容易理解线循环哈希插槽然后在插槽内再循环链表。