diff options
Diffstat (limited to 'examples/redis-unstable/src/kvstore.c')
| -rw-r--r-- | examples/redis-unstable/src/kvstore.c | 1171 |
1 files changed, 0 insertions, 1171 deletions
diff --git a/examples/redis-unstable/src/kvstore.c b/examples/redis-unstable/src/kvstore.c deleted file mode 100644 index fb9e7d5..0000000 --- a/examples/redis-unstable/src/kvstore.c +++ /dev/null @@ -1,1171 +0,0 @@ -/* - * Copyright (c) 2011-Present, Redis Ltd. and contributors. - * All rights reserved. - * - * Copyright (c) 2024-present, Valkey contributors. - * All rights reserved. - * - * Licensed under your choice of (a) the Redis Source Available License 2.0 - * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the - * GNU Affero General Public License v3 (AGPLv3). - * - * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. - */ - -#include "fmacros.h" - -#include <string.h> -#include <stddef.h> - -#include "zmalloc.h" -#include "kvstore.h" -#include "fwtree.h" -#include "redisassert.h" -#include "monotonic.h" - -#define UNUSED(V) ((void) V) - -struct _kvstore { - int flags; - kvstoreType *type; - dictType dtype; - dict **dicts; - long long num_dicts; - long long num_dicts_bits; - list *rehashing; /* List of dictionaries in this kvstore that are currently rehashing. */ - int resize_cursor; /* Cron job uses this cursor to gradually resize dictionaries (only used if num_dicts > 1). */ - int allocated_dicts; /* The number of allocated dicts. */ - int non_empty_dicts; /* The number of non-empty dicts. */ - unsigned long long key_count; /* Total number of keys in this kvstore. */ - unsigned long long bucket_count; /* Total number of buckets in this kvstore across dictionaries. */ - fenwickTree *dict_sizes; /* Binary indexed tree (BIT) that describes cumulative key frequencies up until given dict-index. */ - size_t overhead_hashtable_rehashing; /* The overhead of dictionaries rehashing. */ - void *metadata[]; /* conditionally allocated based on "flags" */ -}; - -/**********************************/ -/*** Helpers **********************/ -/**********************************/ - -/* Get the dictionary pointer based on dict-index. */ -dict *kvstoreGetDict(kvstore *kvs, int didx) { - return kvs->dicts[didx]; -} - -static dict **kvstoreGetDictRef(kvstore *kvs, int didx) { - return &kvs->dicts[didx]; -} - -static int kvstoreDictIsRehashingPaused(kvstore *kvs, int didx) -{ - dict *d = kvstoreGetDict(kvs, didx); - return d ? dictIsRehashingPaused(d) : 0; -} - -static void addDictIndexToCursor(kvstore *kvs, int didx, unsigned long long *cursor) { - if (kvs->num_dicts == 1) - return; - /* didx can be -1 when iteration is over and there are no more dicts to visit. */ - if (didx < 0) - return; - *cursor = (*cursor << kvs->num_dicts_bits) | didx; -} - -static int getAndClearDictIndexFromCursor(kvstore *kvs, unsigned long long *cursor) { - if (kvs->num_dicts == 1) - return 0; - int didx = (int) (*cursor & (kvs->num_dicts-1)); - *cursor = *cursor >> kvs->num_dicts_bits; - return didx; -} - -/* Updates binary index tree (Fenwick tree), updates key count for a given dict */ -static void cumulativeKeyCountAdd(kvstore *kvs, int didx, long delta) { - kvs->key_count += delta; - - dict *d = kvstoreGetDict(kvs, didx); - size_t dsize = dictSize(d); - /* Increment if dsize is 1 and delta is positive (first element inserted, dict becomes non-empty). - * Decrement if dsize is 0 (dict becomes empty). */ - int non_empty_dicts_delta = (dsize == 1 && delta > 0) ? 1 : (dsize == 0) ? -1 : 0; - kvs->non_empty_dicts += non_empty_dicts_delta; - - /* BIT does not need to be calculated when there's only one dict. */ - if (kvs->num_dicts == 1) - return; - - /* Update the BIT */ - fwTreeUpdate(kvs->dict_sizes, didx, delta); -} - -/* Create the dict if it does not exist and return it. */ -static dict *createDictIfNeeded(kvstore *kvs, int didx) { - dict *d = kvstoreGetDict(kvs, didx); - if (d) return d; - - kvs->dicts[didx] = dictCreate(&kvs->dtype); - kvs->allocated_dicts++; - return kvs->dicts[didx]; -} - -/* Called when the dict will delete entries, the function will check - * KVSTORE_FREE_EMPTY_DICTS to determine whether the empty dict needs - * to be freed. - * - * Note that for rehashing dicts, that is, in the case of safe iterators - * and Scan, we won't delete the dict. We will check whether it needs - * to be deleted when we're releasing the iterator. */ -static void freeDictIfNeeded(kvstore *kvs, int didx) { - if (!(kvs->flags & KVSTORE_FREE_EMPTY_DICTS) || - !kvstoreGetDict(kvs, didx) || - kvstoreDictSize(kvs, didx) != 0 || - kvstoreDictIsRehashingPaused(kvs, didx)) - return; - - /* Use callback if provided to check if dict can be freed */ - if (kvs->type->canFreeDict && !kvs->type->canFreeDict(kvs, didx)) - return; - - dictRelease(kvs->dicts[didx]); - kvs->dicts[didx] = NULL; - kvs->allocated_dicts--; -} - -void kvstoreFreeDictIfNeeded(kvstore *kvs, int didx) { - freeDictIfNeeded(kvs, didx); -} - -/**********************************/ -/*** dict callbacks ***************/ -/**********************************/ - -/* Adds dictionary to the rehashing list, which allows us - * to quickly find rehash targets during incremental rehashing. - * - * If there are multiple dicts, updates the bucket count for the given dictionary - * in a DB, bucket count incremented with the new ht size during the rehashing phase. - * If there's one dict, bucket count can be retrieved directly from single dict bucket. */ -static void kvstoreDictRehashingStarted(dict *d) { - kvstore *kvs = d->type->userdata; - kvstoreDictMetaBase *metadata = (kvstoreDictMetaBase *)dictMetadata(d); - listAddNodeTail(kvs->rehashing, d); - metadata->rehashing_node = listLast(kvs->rehashing); - - unsigned long long from, to; - dictRehashingInfo(d, &from, &to); - kvs->overhead_hashtable_rehashing += from; -} - -/* Remove dictionary from the rehashing list. - * - * Updates the bucket count for the given dictionary in a DB. It removes - * the old ht size of the dictionary from the total sum of buckets for a DB. */ -static void kvstoreDictRehashingCompleted(dict *d) { - kvstore *kvs = d->type->userdata; - kvstoreDictMetaBase *metadata = (kvstoreDictMetaBase *)dictMetadata(d); - if (metadata->rehashing_node) { - listDelNode(kvs->rehashing, metadata->rehashing_node); - metadata->rehashing_node = NULL; - } - - unsigned long long from, to; - dictRehashingInfo(d, &from, &to); - kvs->overhead_hashtable_rehashing -= from; -} - -/* Updates the bucket count for the given dictionary in a DB. It adds the new ht size - * of the dictionary or removes the old ht size of the dictionary from the total - * sum of buckets for a DB. */ -static void kvstoreDictBucketChanged(dict *d, long long delta) { - kvstore *kvs = d->type->userdata; - kvs->bucket_count += delta; -} - -/* Returns the size of the DB dict extended metadata in bytes. */ -static size_t kvstoreDictBaseMetaSize(dict *d) { - UNUSED(d); - return sizeof(kvstoreDictMetaBase); -} - -/**********************************/ -/*** API **************************/ -/**********************************/ - -/* Create an array of dictionaries - * num_dicts_bits is the log2 of the amount of dictionaries needed (e.g. 0 for 1 dict, - * 3 for 8 dicts, etc.) */ -kvstore *kvstoreCreate(kvstoreType *type, dictType *dtype, int num_dicts_bits, int flags) { - /* We can't support more than 2^16 dicts because we want to save 48 bits - * for the dict cursor, see kvstoreScan */ - assert(num_dicts_bits <= 16); - assert(!type->dictMetadataBytes || type->dictMetadataBytes(NULL) >= sizeof(kvstoreDictMetaBase)); - - /* Calc kvstore size */ - size_t kvsize = sizeof(kvstore); - /* Conditionally calc also histogram size */ - if (type->kvstoreMetadataBytes) - kvsize += type->kvstoreMetadataBytes(NULL); - - kvstore *kvs = zcalloc(kvsize); - memcpy(&kvs->dtype, dtype, sizeof(kvs->dtype)); - kvs->flags = flags; - kvs->type = type; - - /* kvstore must be the one to set these callbacks, so we make sure the - * caller didn't do it */ - assert(!dtype->userdata); - assert(!dtype->dictMetadataBytes); - assert(!dtype->rehashingStarted); - assert(!dtype->rehashingCompleted); - kvs->dtype.userdata = kvs; - kvs->dtype.dictMetadataBytes = type->dictMetadataBytes ? - type->dictMetadataBytes : kvstoreDictBaseMetaSize; - kvs->dtype.rehashingStarted = kvstoreDictRehashingStarted; - kvs->dtype.rehashingCompleted = kvstoreDictRehashingCompleted; - kvs->dtype.bucketChanged = kvstoreDictBucketChanged; - - kvs->num_dicts_bits = num_dicts_bits; - kvs->num_dicts = 1 << kvs->num_dicts_bits; - kvs->dicts = zcalloc(sizeof(dict*) * kvs->num_dicts); - if (!(kvs->flags & KVSTORE_ALLOCATE_DICTS_ON_DEMAND)) { - for (int i = 0; i < kvs->num_dicts; i++) - createDictIfNeeded(kvs, i); - } - - kvs->rehashing = listCreate(); - kvs->key_count = 0; - kvs->non_empty_dicts = 0; - kvs->resize_cursor = 0; - kvs->dict_sizes = kvs->num_dicts > 1 ? fwTreeCreate(kvs->num_dicts_bits) : NULL; - kvs->bucket_count = 0; - kvs->overhead_hashtable_rehashing = 0; - return kvs; -} - -void kvstoreEmpty(kvstore *kvs, void(callback)(dict*)) { - for (int didx = 0; didx < kvs->num_dicts; didx++) { - dict *d = kvstoreGetDict(kvs, didx); - if (!d) - continue; - kvstoreDictMetaBase *metadata = (kvstoreDictMetaBase *)dictMetadata(d); - if (metadata->rehashing_node) - metadata->rehashing_node = NULL; - dictEmpty(d, callback); - if (kvs->type->onDictEmpty) kvs->type->onDictEmpty(kvs, didx); - freeDictIfNeeded(kvs, didx); - } - - if (kvs->type->onKvstoreEmpty) kvs->type->onKvstoreEmpty(kvs); - - listEmpty(kvs->rehashing); - - kvs->key_count = 0; - kvs->non_empty_dicts = 0; - kvs->resize_cursor = 0; - kvs->bucket_count = 0; - if (kvs->dict_sizes) - fwTreeClear(kvs->dict_sizes); - kvs->overhead_hashtable_rehashing = 0; -} - -void kvstoreRelease(kvstore *kvs) { - for (int didx = 0; didx < kvs->num_dicts; didx++) { - dict *d = kvstoreGetDict(kvs, didx); - if (!d) - continue; - kvstoreDictMetaBase *metadata = (kvstoreDictMetaBase *)dictMetadata(d); - if (metadata->rehashing_node) - metadata->rehashing_node = NULL; - if (kvs->type->onDictEmpty) kvs->type->onDictEmpty(kvs, didx); - dictRelease(d); - } - zfree(kvs->dicts); - - listRelease(kvs->rehashing); - if (kvs->dict_sizes) - fwTreeDestroy(kvs->dict_sizes); - - zfree(kvs); -} - -unsigned long long int kvstoreSize(kvstore *kvs) { - return kvs->key_count; -} - -/* This method provides the cumulative sum of all the dictionary buckets - * across dictionaries in a database. */ -unsigned long kvstoreBuckets(kvstore *kvs) { - if (kvs->num_dicts != 1) { - return kvs->bucket_count; - } else { - return kvs->dicts[0]? dictBuckets(kvs->dicts[0]) : 0; - } -} - -size_t kvstoreMemUsage(kvstore *kvs) { - size_t mem = sizeof(*kvs); - size_t metaSize = kvs->dtype.dictMetadataBytes(NULL); - unsigned long long keys_count = kvstoreSize(kvs); - mem += keys_count * dictEntryMemUsage(kvs->dtype.no_value) + - kvstoreBuckets(kvs) * sizeof(dictEntry*) + - kvs->allocated_dicts * (sizeof(dict) + metaSize); - - /* Values are dict* shared with kvs->dicts */ - mem += listLength(kvs->rehashing) * sizeof(listNode); - - return mem; -} - -/* - * This method is used to iterate over the elements of the entire kvstore specifically across dicts. - * It's a three pronged approach. - * - * 1. It uses the provided cursor `cursor` to retrieve the dict index from it. - * 2. If the dictionary is in a valid state checked through the provided callback `dictScanValidFunction`, - * it performs a dictScan over the appropriate `keyType` dictionary of `db`. - * 3. If the dict is entirely scanned i.e. the cursor has reached 0, the next non empty dict is discovered. - * The dict information is embedded into the cursor and returned. - * - * To restrict the scan to a single dict, pass a valid dict index as - * 'onlydidx', otherwise pass -1. - */ -unsigned long long kvstoreScan(kvstore *kvs, unsigned long long cursor, - int onlydidx, dictScanFunction *scan_cb, - kvstoreScanShouldSkipDict *skip_cb, - void *privdata) -{ - unsigned long long _cursor = 0; - /* During dictionary traversal, 48 upper bits in the cursor are used for positioning in the HT. - * Following lower bits are used for the dict index number, ranging from 0 to 2^num_dicts_bits-1. - * Dict index is always 0 at the start of iteration and can be incremented only if there are - * multiple dicts. */ - int didx = getAndClearDictIndexFromCursor(kvs, &cursor); - if (onlydidx >= 0) { - if (didx < onlydidx) { - /* Fast-forward to onlydidx. */ - assert(onlydidx < kvs->num_dicts); - didx = onlydidx; - cursor = 0; - } else if (didx > onlydidx) { - /* The cursor is already past onlydidx. */ - return 0; - } - } - - dict *d = kvstoreGetDict(kvs, didx); - - int skip = !d || (skip_cb && skip_cb(d, didx)); - if (!skip) { - _cursor = dictScan(d, cursor, scan_cb, privdata); - /* In dictScan, scan_cb may delete entries (e.g., in active expire case). */ - freeDictIfNeeded(kvs, didx); - } - /* scanning done for the current dictionary or if the scanning wasn't possible, move to the next dict index. */ - if (_cursor == 0 || skip) { - if (onlydidx >= 0) - return 0; - didx = kvstoreGetNextNonEmptyDictIndex(kvs, didx); - } - if (didx == -1) { - return 0; - } - addDictIndexToCursor(kvs, didx, &_cursor); - return _cursor; -} - -/* - * This functions increases size of kvstore to match desired number. - * It resizes all individual dictionaries, unless skip_cb indicates otherwise. - * - * Based on the parameter `try_expand`, appropriate dict expand API is invoked. - * if try_expand is set to 1, `dictTryExpand` is used else `dictExpand`. - * The return code is either `DICT_OK`/`DICT_ERR` for both the API(s). - * `DICT_OK` response is for successful expansion. However, `DICT_ERR` response signifies failure in allocation in - * `dictTryExpand` call and in case of `dictExpand` call it signifies no expansion was performed. - */ -int kvstoreExpand(kvstore *kvs, uint64_t newsize, int try_expand, kvstoreExpandShouldSkipDictIndex *skip_cb) { - for (int i = 0; i < kvs->num_dicts; i++) { - dict *d = kvstoreGetDict(kvs, i); - if (!d || (skip_cb && skip_cb(i))) - continue; - int result = try_expand ? dictTryExpand(d, newsize) : dictExpand(d, newsize); - if (try_expand && result == DICT_ERR) - return 0; - } - - return 1; -} - -/* Returns fair random dict index, probability of each dict being returned is - * proportional to the number of elements that dictionary holds. - * This function guarantees that it returns a dict-index of a non-empty dict, - * unless the entire kvstore is empty or all dicts are skipped. - * - * Parameters: - * - kvs: the kvstore instance - * - skip_cb: callback to determine if a dict should be skipped (NULL means no skipping) - * - fair_attempts: number of fair selection attempts before falling back - * - slow_fallback: if 1, uses systematic search when fair attempts fail - * - * Returns: - * - Valid dict index (>= 0) on success - * - -1 if no valid dict found (either slow_fallback is 0 or all dicts are skipped) - * - * Time complexity: O(fair_attempts * log(kvs->num_dicts)) for fair attempts, - * plus O(kvs->num_dicts) for systematic fallback if enabled. - */ -int kvstoreGetFairRandomDictIndex(kvstore *kvs, kvstoreRandomShouldSkipDictIndex *skip_cb, - int fair_attempts, int slow_fallback) -{ - if (kvs->num_dicts == 1 || kvstoreSize(kvs) == 0) - return 0; - - unsigned long long total_size = kvstoreSize(kvs); - - /* Try fair attempts first. If skip_cb is not applicable, execute only once. */ - for (int attempt = 0; attempt < fair_attempts; attempt++) { - unsigned long target = (randomULong() % total_size) + 1; - int didx = kvstoreFindDictIndexByKeyIndex(kvs, target); - if (!skip_cb || !skip_cb(didx)) { - return didx; - } - } - - /* If fair attempts failed and slow fallback is allowed */ - if (slow_fallback) { - /* systematic check from random start */ - int start = randomULong() % kvs->num_dicts; - for (int i = 0; i < kvs->num_dicts; i++) { - int didx = (start + i) % kvs->num_dicts; - dict *d = kvstoreGetDict(kvs, didx); - if (d && (!skip_cb || !skip_cb(didx))) { - return didx; - } - } - } - - /* Failed to find valid dict that has elements */ - return -1; -} - -void kvstoreGetStats(kvstore *kvs, char *buf, size_t bufsize, int full) { - buf[0] = '\0'; - - size_t l; - char *orig_buf = buf; - size_t orig_bufsize = bufsize; - dictStats *mainHtStats = NULL; - dictStats *rehashHtStats = NULL; - dict *d; - kvstoreIterator kvs_it; - - kvstoreIteratorInit(&kvs_it, kvs); - while ((d = kvstoreIteratorNextDict(&kvs_it))) { - dictStats *stats = dictGetStatsHt(d, 0, full); - if (!mainHtStats) { - mainHtStats = stats; - } else { - dictCombineStats(stats, mainHtStats); - dictFreeStats(stats); - } - if (dictIsRehashing(d)) { - stats = dictGetStatsHt(d, 1, full); - if (!rehashHtStats) { - rehashHtStats = stats; - } else { - dictCombineStats(stats, rehashHtStats); - dictFreeStats(stats); - } - } - } - kvstoreIteratorReset(&kvs_it); - - if (mainHtStats && bufsize > 0) { - l = dictGetStatsMsg(buf, bufsize, mainHtStats, full); - dictFreeStats(mainHtStats); - buf += l; - bufsize -= l; - } - - if (rehashHtStats && bufsize > 0) { - l = dictGetStatsMsg(buf, bufsize, rehashHtStats, full); - dictFreeStats(rehashHtStats); - buf += l; - bufsize -= l; - } - /* Make sure there is a NULL term at the end. */ - if (orig_bufsize) orig_buf[orig_bufsize - 1] = '\0'; -} - -/* Finds a dict containing target element in a key space ordered by dict index. - * Consider this example. Dictionaries are represented by brackets and keys by dots: - * #0 #1 #2 #3 #4 - * [..][....][...][.......][.] - * ^ - * target - * - * In this case dict #3 contains key that we are trying to find. - * - * The return value is 0 based dict-index, and the range of the target is [1..kvstoreSize], kvstoreSize inclusive. - * - * To find the dict, we start with the root node of the binary index tree and search through its children - * from the highest index (2^num_dicts_bits in our case) to the lowest index. At each node, we check if the target - * value is greater than the node's value. If it is, we remove the node's value from the target and recursively - * search for the new target using the current node as the parent. - * Time complexity of this function is O(log(kvs->num_dicts)) - */ -int kvstoreFindDictIndexByKeyIndex(kvstore *kvs, unsigned long target) { - if (kvs->num_dicts == 1 || kvstoreSize(kvs) == 0) - return 0; - assert(target <= kvstoreSize(kvs)); - - return fwTreeFindIndex(kvs->dict_sizes, target); -} - -/* Get the first non-empty dict index in the kvstore. Returns -1 if kvstore is empty. */ -int kvstoreGetFirstNonEmptyDictIndex(kvstore *kvs) { - if (kvstoreSize(kvs) == 0) - return -1; - if (kvs->num_dicts == 1) - return 0; - return fwTreeFindFirstNonEmpty(kvs->dict_sizes); -} - -/* Returns next non-empty dict index strictly after given one, or -1 if provided didx is the last one. */ -int kvstoreGetNextNonEmptyDictIndex(kvstore *kvs, int didx) { - if (kvs->num_dicts == 1) { - assert(didx == 0); - return -1; - } - return fwTreeFindNextNonEmpty(kvs->dict_sizes, didx); -} - -int kvstoreNumNonEmptyDicts(kvstore *kvs) { - return kvs->non_empty_dicts; -} - -int kvstoreNumAllocatedDicts(kvstore *kvs) { - return kvs->allocated_dicts; -} - -int kvstoreNumDicts(kvstore *kvs) { - return kvs->num_dicts; -} - -/* Move dict from one kvstore to another. */ -void kvstoreMoveDict(kvstore *kvs, kvstore *dst, int didx) { - assert(kvs->num_dicts > didx); - assert(kvs->num_dicts == dst->num_dicts); - assert(dst->dicts[didx] == NULL); - - dict *d = kvs->dicts[didx]; - if (d == NULL) return; - - /* Adjust source kvstore */ - kvs->allocated_dicts -= 1; - cumulativeKeyCountAdd(kvs, didx, -((long long)dictSize(d))); - kvstoreDictBucketChanged(d, -((long long) dictBuckets(d))); - /* If rehashing, stop it. */ - if (dictIsRehashing(d)) - kvstoreDictRehashingCompleted(d); - /* Clear dict from source kvstore and create a new one if needed */ - kvs->dicts[didx] = NULL; - if (!(kvs->flags & (KVSTORE_ALLOCATE_DICTS_ON_DEMAND | KVSTORE_FREE_EMPTY_DICTS))) - createDictIfNeeded(kvs, didx); - - /* Move dict to destination kvstore */ - dst->dicts[didx] = d; - dst->dicts[didx]->type = &dst->dtype; - dst->allocated_dicts += 1; - cumulativeKeyCountAdd(dst, didx, dictSize(d)); - kvstoreDictBucketChanged(d, dictBuckets(d)); - if (dictIsRehashing(dst->dicts[didx])) - kvstoreDictRehashingStarted(dst->dicts[didx]); -} - -/* Returns kvstore iterator that can be used to iterate through sub-dictionaries. - * - * The caller should reset kvs_it with kvstoreIteratorReset. */ -void kvstoreIteratorInit(kvstoreIterator *kvs_it, kvstore *kvs) { - kvs_it->kvs = kvs; - kvs_it->didx = -1; - kvs_it->next_didx = kvstoreGetFirstNonEmptyDictIndex(kvs_it->kvs); /* Finds first non-empty dict index. */ - dictInitSafeIterator(&kvs_it->di, NULL); -} - -/* Free the kvs_it returned by kvstoreIteratorInit. */ -void kvstoreIteratorReset(kvstoreIterator *kvs_it) { - dictIterator *iter = &kvs_it->di; - dictResetIterator(iter); - /* In the safe iterator context, we may delete entries. */ - if (kvs_it->didx != -1) - freeDictIfNeeded(kvs_it->kvs, kvs_it->didx); -} - -/* Returns next dictionary from the iterator, or NULL if iteration is complete. - * - * - Takes care to reset the iter of the previous dict before moved to the next dict. - */ -dict *kvstoreIteratorNextDict(kvstoreIterator *kvs_it) { - if (kvs_it->next_didx == -1) - return NULL; - - /* The dict may be deleted during the iteration process, so here need to check for NULL. */ - if (kvs_it->didx != -1 && kvstoreGetDict(kvs_it->kvs, kvs_it->didx)) { - /* Before we move to the next dict, reset the iter of the previous dict. */ - dictIterator *iter = &kvs_it->di; - dictResetIterator(iter); - /* In the safe iterator context, we may delete entries. */ - freeDictIfNeeded(kvs_it->kvs, kvs_it->didx); - } - - kvs_it->didx = kvs_it->next_didx; - kvs_it->next_didx = kvstoreGetNextNonEmptyDictIndex(kvs_it->kvs, kvs_it->didx); - return kvs_it->kvs->dicts[kvs_it->didx]; -} - -int kvstoreIteratorGetCurrentDictIndex(kvstoreIterator *kvs_it) { - assert(kvs_it->didx >= 0 && kvs_it->didx < kvs_it->kvs->num_dicts); - return kvs_it->didx; -} - -/* Returns next entry. */ -dictEntry *kvstoreIteratorNext(kvstoreIterator *kvs_it) { - dictEntry *de = kvs_it->di.d ? dictNext(&kvs_it->di) : NULL; - if (!de) { /* No current dict or reached the end of the dictionary. */ - - /* Before we move to the next dict, function kvstoreIteratorNextDict() - * reset the iter of the previous dict & freeDictIfNeeded(). */ - dict *d = kvstoreIteratorNextDict(kvs_it); - - if (!d) - return NULL; - - dictInitSafeIterator(&kvs_it->di, d); - de = dictNext(&kvs_it->di); - } - return de; -} - -/* This method traverses through kvstore dictionaries and triggers a resize. - * It first tries to shrink if needed, and if it isn't, it tries to expand. */ -void kvstoreTryResizeDicts(kvstore *kvs, int limit) { - if (limit > kvs->num_dicts) - limit = kvs->num_dicts; - - for (int i = 0; i < limit; i++) { - int didx = kvs->resize_cursor; - dict *d = kvstoreGetDict(kvs, didx); - if (d && dictShrinkIfNeeded(d) == DICT_ERR) { - dictExpandIfNeeded(d); - } - kvs->resize_cursor = (didx + 1) % kvs->num_dicts; - } -} - -/* Our hash table implementation performs rehashing incrementally while - * we write/read from the hash table. Still if the server is idle, the hash - * table will use two tables for a long time. So we try to use threshold_us - * of CPU time at every call of this function to perform some rehashing. - * - * The function returns the amount of microsecs spent if some rehashing was - * performed, otherwise 0 is returned. */ -uint64_t kvstoreIncrementallyRehash(kvstore *kvs, uint64_t threshold_us) { - if (listLength(kvs->rehashing) == 0) - return 0; - - /* Our goal is to rehash as many dictionaries as we can before reaching threshold_us, - * after each dictionary completes rehashing, it removes itself from the list. */ - listNode *node; - monotime timer; - uint64_t elapsed_us = 0; - elapsedStart(&timer); - while ((node = listFirst(kvs->rehashing))) { - dictRehashMicroseconds(listNodeValue(node), threshold_us - elapsed_us); - - elapsed_us = elapsedUs(timer); - if (elapsed_us >= threshold_us) { - break; /* Reached the time limit. */ - } - } - return elapsed_us; -} - -size_t kvstoreOverheadHashtableLut(kvstore *kvs) { - return kvs->bucket_count * sizeof(dictEntry *); -} - -size_t kvstoreOverheadHashtableRehashing(kvstore *kvs) { - return kvs->overhead_hashtable_rehashing * sizeof(dictEntry *); -} - -unsigned long kvstoreDictRehashingCount(kvstore *kvs) { - return listLength(kvs->rehashing); -} - -unsigned long kvstoreDictSize(kvstore *kvs, int didx) -{ - dict *d = kvstoreGetDict(kvs, didx); - if (!d) - return 0; - return dictSize(d); -} - -void kvstoreInitDictIterator(kvstoreDictIterator *kvs_di, kvstore *kvs, int didx) -{ - kvs_di->kvs = kvs; - kvs_di->didx = didx; - dictInitIterator(&kvs_di->di, kvstoreGetDict(kvs, didx)); -} - -void kvstoreInitDictSafeIterator(kvstoreDictIterator *kvs_di, kvstore *kvs, int didx) -{ - kvs_di->kvs = kvs; - kvs_di->didx = didx; - dictInitSafeIterator(&kvs_di->di, kvstoreGetDict(kvs, didx)); -} - -/* Free the kvs_di returned by kvstoreGetDictIterator and kvstoreGetDictSafeIterator. */ -void kvstoreResetDictIterator(kvstoreDictIterator *kvs_di) -{ - /* The dict may be deleted during the iteration process, so here need to check for NULL. */ - if (kvstoreGetDict(kvs_di->kvs, kvs_di->didx)) { - dictResetIterator(&kvs_di->di); - /* In the safe iterator context, we may delete entries. */ - freeDictIfNeeded(kvs_di->kvs, kvs_di->didx); - } -} - -/* Get the next element of the dict through kvstoreDictIterator and dictNext. */ -dictEntry *kvstoreDictIteratorNext(kvstoreDictIterator *kvs_di) -{ - /* The dict may be deleted during the iteration process, so here need to check for NULL. */ - dict *d = kvstoreGetDict(kvs_di->kvs, kvs_di->didx); - if (!d) return NULL; - - return dictNext(&kvs_di->di); -} - -dictEntry *kvstoreDictGetRandomKey(kvstore *kvs, int didx) -{ - dict *d = kvstoreGetDict(kvs, didx); - if (!d) - return NULL; - return dictGetRandomKey(d); -} - -dictEntry *kvstoreDictGetFairRandomKey(kvstore *kvs, int didx) -{ - dict *d = kvstoreGetDict(kvs, didx); - if (!d) - return NULL; - return dictGetFairRandomKey(d); -} - -unsigned int kvstoreDictGetSomeKeys(kvstore *kvs, int didx, dictEntry **des, unsigned int count) -{ - dict *d = kvstoreGetDict(kvs, didx); - if (!d) - return 0; - return dictGetSomeKeys(d, des, count); -} - -int kvstoreDictExpand(kvstore *kvs, int didx, unsigned long size) -{ - dict *d = createDictIfNeeded(kvs, didx); - if (!d) - return DICT_ERR; - return dictExpand(d, size); -} - -unsigned long kvstoreDictScanDefrag(kvstore *kvs, int didx, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata) -{ - dict *d = kvstoreGetDict(kvs, didx); - if (!d) - return 0; - return dictScanDefrag(d, v, fn, defragfns, privdata); -} - -/* Unlike kvstoreDictScanDefrag(), this method doesn't defrag the data(keys and values) - * within dict, it only reallocates the memory used by the dict structure itself using - * the provided allocation function. This feature was added for the active defrag feature. - * - * With 16k dictionaries for cluster mode with 1 shard, this operation may require substantial time - * to execute. A "cursor" is used to perform the operation iteratively. When first called, a - * cursor value of 0 should be provided. The return value is an updated cursor which should be - * provided on the next iteration. The operation is complete when 0 is returned. - * - * The 'defragfn' callback is called with a reference to the dict that callback can reallocate. */ -unsigned long kvstoreDictLUTDefrag(kvstore *kvs, unsigned long cursor, kvstoreDictLUTDefragFunction *defragfn) { - for (int didx = cursor; didx < kvs->num_dicts; didx++) { - dict **d = kvstoreGetDictRef(kvs, didx), *newd; - if (!*d) - continue; - if ((newd = defragfn(*d))) { - *d = newd; - - /* After defragmenting the dict, update its corresponding - * rehashing node in the kvstore's rehashing list. */ - kvstoreDictMetaBase *metadata = (kvstoreDictMetaBase *)dictMetadata(*d); - if (metadata->rehashing_node) - metadata->rehashing_node->value = *d; - } - return (didx + 1); - } - return 0; -} - -void *kvstoreDictFetchValue(kvstore *kvs, int didx, const void *key) -{ - dict *d = kvstoreGetDict(kvs, didx); - if (!d) - return NULL; - assert(d->type->no_value == 0); - return dictFetchValue(d, key); -} - -dictEntry *kvstoreDictFind(kvstore *kvs, int didx, void *key) { - dict *d = kvstoreGetDict(kvs, didx); - if (!d) - return NULL; - return dictFind(d, key); -} - -/* Find a link to a key in the specified kvstore. If not found return NULL. - * - * This function is a wrapper around dictFindLink(), used to locate a key in a dict - * from a kvstore. - * - * The caller may provide a bucket pointer to receive the reference to the bucket - * where the key is stored or need to be added. - * - * Returns: - * A reference to the dictEntry if found, otherwise NULL. - * - * Important: - * After calling kvstoreDictFindLink(), any necessary updates based on returned - * link or bucket must be made immediately after, commonly by kvstoreDictSetAtLink() - * without any operations in between that might modify the dict. Otherwise, - * the link or bucket may become invalid. Example usage: - * - * link = kvstoreDictFindLink(kvs, didx, key, &bucket); - * ... Do something, but don't modify kvs->dicts[didx] ... - * if (link) - * kvstoreDictSetAtLink(kvs, didx, kv, &link, 0); // Update existing entry - * else - * kvstoreDictSetAtLink(kvs, didx, kv, &bucket, 1); // Insert new entry - */ -dictEntryLink kvstoreDictFindLink(kvstore *kvs, int didx, void *key, dictEntryLink *bucket) { - if (bucket) *bucket = NULL; - dict *d = kvstoreGetDict(kvs, didx); - if (!d) return NULL; - return dictFindLink(d, key, bucket); -} - -/* Set a key (or key-value) in the specified kvstore. - * - * This function inserts a new key or updates an existing one, depending on - * the `newItem` flag. - * - * Parameters: - * link: - When `newItem` is set, `link` points to the bucket of the key. - * - When `newItem` is not set, `link` points to the link of the key. - * - If link is NULL, dictFindLink() will be called to locate the link. - * - * newItem: - If set, add a new key with a new dictEntry. - * - If not set, update the key of an existing dictEntry. - */ -void kvstoreDictSetAtLink(kvstore *kvs, int didx, void *kv, dictEntryLink *link, int newItem) { - dict *d; - if (newItem) { - d = createDictIfNeeded(kvs, didx); - dictSetKeyAtLink(d, kv, link, newItem); - cumulativeKeyCountAdd(kvs, didx, 1); /* must be called only after updating dict */ - } else { - d = kvstoreGetDict(kvs, didx); - dictSetKeyAtLink(d, kv, link, newItem); - } -} - -dictEntry *kvstoreDictAddRaw(kvstore *kvs, int didx, void *key, dictEntry **existing) { - dict *d = createDictIfNeeded(kvs, didx); - dictEntry *ret = dictAddRaw(d, key, existing); - if (ret) - cumulativeKeyCountAdd(kvs, didx, 1); - return ret; -} - -void kvstoreDictSetKey(kvstore *kvs, int didx, dictEntry* de, void *key) { - dict *d = kvstoreGetDict(kvs, didx); - dictSetKey(d, de, key); -} - -void kvstoreDictSetVal(kvstore *kvs, int didx, dictEntry *de, void *val) { - dict *d = kvstoreGetDict(kvs, didx); - assert(d->type->no_value == 0); - dictSetVal(d, de, val); -} - -dictEntryLink kvstoreDictTwoPhaseUnlinkFind(kvstore *kvs, int didx, const void *key, int *table_index) { - dict *d = kvstoreGetDict(kvs, didx); - if (!d) - return NULL; - return dictTwoPhaseUnlinkFind(kvstoreGetDict(kvs, didx), key, table_index); -} - -void kvstoreDictTwoPhaseUnlinkFree(kvstore *kvs, int didx, dictEntryLink link, int table_index) { - dict *d = kvstoreGetDict(kvs, didx); - dictTwoPhaseUnlinkFree(d, link, table_index); - cumulativeKeyCountAdd(kvs, didx, -1); - freeDictIfNeeded(kvs, didx); -} - -int kvstoreDictDelete(kvstore *kvs, int didx, const void *key) { - dict *d = kvstoreGetDict(kvs, didx); - if (!d) - return DICT_ERR; - int ret = dictDelete(d, key); - if (ret == DICT_OK) { - cumulativeKeyCountAdd(kvs, didx, -1); - freeDictIfNeeded(kvs, didx); - } - return ret; -} - -void *kvstoreGetDictMeta(kvstore *kvs, int didx, int createIfNeeded) { - dict *d = kvstoreGetDict(kvs, didx); - if (!d) { - if (!createIfNeeded) return NULL; - d = createDictIfNeeded(kvs, didx); - } - return dictMetadata(d); -} - -void *kvstoreGetMetadata(kvstore *kvs) { - return &kvs->metadata; -} - -#ifdef REDIS_TEST -#include <stdio.h> -#include "testhelp.h" - -#define TEST(name) printf("test — %s\n", name); - -uint64_t hashTestCallback(const void *key) { - return dictGenHashFunction((unsigned char*)key, strlen((char*)key)); -} - -void freeTestCallback(dict *d, void *val) { - UNUSED(d); - zfree(val); -} - -void *defragAllocTest(void *ptr) { - size_t size = zmalloc_usable_size(ptr); - void *newptr = zmalloc(size); - memcpy(newptr, ptr, size); - zfree(ptr); - return newptr; -} - -dict *defragLUTTestCallback(dict *d) { - /* handle the dict struct */ - d = defragAllocTest(d); - /* handle the first hash table */ - d->ht_table[0] = defragAllocTest(d->ht_table[0]); - /* handle the second hash table */ - if (d->ht_table[1]) - d->ht_table[1] = defragAllocTest(d->ht_table[1]); - return d; -} - -dictType KvstoreDictTestType = { - hashTestCallback, - NULL, - NULL, - NULL, - freeTestCallback, - NULL, - NULL -}; - -kvstoreType KvstoreTestType = { - NULL, /* kvstore metadata size */ - NULL, /* dict metadata size */ - NULL, /* can free dict */ - NULL, /* on kvstore empty */ - NULL, /* on dict empty */ -}; - -char *stringFromInt(int value) { - char buf[32]; - int len; - char *s; - - len = snprintf(buf, sizeof(buf), "%d",value); - s = zmalloc(len+1); - memcpy(s, buf, len); - s[len] = '\0'; - return s; -} - -/* ./redis-server test kvstore */ -int kvstoreTest(int argc, char **argv, int flags) { - UNUSED(argc); - UNUSED(argv); - UNUSED(flags); - - int i; - void *key; - dictEntry *de; - kvstoreIterator kvs_it; - kvstoreDictIterator kvs_di; - - /* Test also dictType with no_value=1 */ - dictType KvstoreDictNovalTestType = KvstoreDictTestType; - KvstoreDictNovalTestType.no_value = 1; - - int didx = 0; - int curr_slot = 0; - kvstore *kvs1 = kvstoreCreate(&KvstoreTestType, &KvstoreDictTestType, 0, KVSTORE_ALLOCATE_DICTS_ON_DEMAND); - kvstore *kvs2 = kvstoreCreate(&KvstoreTestType, &KvstoreDictNovalTestType, 0, KVSTORE_ALLOCATE_DICTS_ON_DEMAND | KVSTORE_FREE_EMPTY_DICTS); - - TEST("Add 16 keys") { - for (i = 0; i < 16; i++) { - de = kvstoreDictAddRaw(kvs1, didx, stringFromInt(i), NULL); - assert(de != NULL); - de = kvstoreDictAddRaw(kvs2, didx, stringFromInt(i), NULL); - assert(de != NULL); - } - assert(kvstoreDictSize(kvs1, didx) == 16); - assert(kvstoreSize(kvs1) == 16); - assert(kvstoreDictSize(kvs2, didx) == 16); - assert(kvstoreSize(kvs2) == 16); - } - - TEST("kvstoreIterator creating and releasing without kvstoreIteratorNextDict()") { - kvstore *kvs = kvstoreCreate(&KvstoreTestType, &KvstoreDictNovalTestType, 0, KVSTORE_ALLOCATE_DICTS_ON_DEMAND | KVSTORE_FREE_EMPTY_DICTS); - kvstoreIterator kvs_iter; - kvstoreIteratorInit(&kvs_iter, kvs); - kvstoreIteratorReset(&kvs_iter); - kvstoreRelease(kvs); - } - - TEST("kvstoreIterator case 1: removing all keys does not delete the empty dict") { - kvstoreIteratorInit(&kvs_it, kvs1); - while((de = kvstoreIteratorNext(&kvs_it)) != NULL) { - curr_slot = kvstoreIteratorGetCurrentDictIndex(&kvs_it); - key = dictGetKey(de); - assert(kvstoreDictDelete(kvs1, curr_slot, key) == DICT_OK); - } - kvstoreIteratorReset(&kvs_it); - - dict *d = kvstoreGetDict(kvs1, didx); - assert(d != NULL); - assert(kvstoreDictSize(kvs1, didx) == 0); - assert(kvstoreSize(kvs1) == 0); - } - - TEST("kvstoreIterator case 2: removing all keys will delete the empty dict") { - kvstoreIteratorInit(&kvs_it, kvs2); - while((de = kvstoreIteratorNext(&kvs_it)) != NULL) { - curr_slot = kvstoreIteratorGetCurrentDictIndex(&kvs_it); - key = dictGetKey(de); - assert(kvstoreDictDelete(kvs2, curr_slot, key) == DICT_OK); - } - kvstoreIteratorReset(&kvs_it); - - /* Make sure the dict was removed from the rehashing list. */ - while (kvstoreIncrementallyRehash(kvs2, 1000)) {} - - dict *d = kvstoreGetDict(kvs2, didx); - assert(d == NULL); - assert(kvstoreDictSize(kvs2, didx) == 0); - assert(kvstoreSize(kvs2) == 0); - } - - TEST("Add 16 keys again") { - for (i = 0; i < 16; i++) { - de = kvstoreDictAddRaw(kvs1, didx, stringFromInt(i), NULL); - assert(de != NULL); - de = kvstoreDictAddRaw(kvs2, didx, stringFromInt(i), NULL); - assert(de != NULL); - } - assert(kvstoreDictSize(kvs1, didx) == 16); - assert(kvstoreSize(kvs1) == 16); - assert(kvstoreDictSize(kvs2, didx) == 16); - assert(kvstoreSize(kvs2) == 16); - } - - TEST("kvstoreDictIterator case 1: removing all keys does not delete the empty dict") { - kvstoreInitDictSafeIterator(&kvs_di, kvs1, didx); - while((de = kvstoreDictIteratorNext(&kvs_di)) != NULL) { - key = dictGetKey(de); - assert(kvstoreDictDelete(kvs1, didx, key) == DICT_OK); - } - kvstoreResetDictIterator(&kvs_di); - - dict *d = kvstoreGetDict(kvs1, didx); - assert(d != NULL); - assert(kvstoreDictSize(kvs1, didx) == 0); - assert(kvstoreSize(kvs1) == 0); - } - - TEST("kvstoreDictIterator case 2: removing all keys will delete the empty dict") { - kvstoreInitDictSafeIterator(&kvs_di, kvs2, didx); - while((de = kvstoreDictIteratorNext(&kvs_di)) != NULL) { - key = dictGetKey(de); - assert(kvstoreDictDelete(kvs2, didx, key) == DICT_OK); - } - kvstoreResetDictIterator(&kvs_di); - - dict *d = kvstoreGetDict(kvs2, didx); - assert(d == NULL); - assert(kvstoreDictSize(kvs2, didx) == 0); - assert(kvstoreSize(kvs2) == 0); - } - - TEST("Verify that a rehashing dict's node in the rehashing list is correctly updated after defragmentation") { - unsigned long cursor = 0; - kvstore *kvs = kvstoreCreate(&KvstoreTestType, &KvstoreDictTestType, 0, KVSTORE_ALLOCATE_DICTS_ON_DEMAND); - for (i = 0; i < 256; i++) { - de = kvstoreDictAddRaw(kvs, 0, stringFromInt(i), NULL); - if (listLength(kvs->rehashing)) break; - } - assert(listLength(kvs->rehashing)); - while ((cursor = kvstoreDictLUTDefrag(kvs, cursor, defragLUTTestCallback)) != 0) {} - while (kvstoreIncrementallyRehash(kvs, 1000)) {} - kvstoreRelease(kvs); - } - - TEST("Verify non-empty dict count is correctly updated") { - kvstore *kvs = kvstoreCreate(&KvstoreTestType, &KvstoreDictTestType, 2, - KVSTORE_ALLOCATE_DICTS_ON_DEMAND); - for (int idx = 0; idx < 4; idx++) { - for (i = 0; i < 16; i++) { - de = kvstoreDictAddRaw(kvs, idx, stringFromInt(i), NULL); - assert(de != NULL); - /* When the first element is inserted, the number of non-empty dictionaries is increased by 1. */ - if (i == 0) assert(kvstoreNumNonEmptyDicts(kvs) == idx + 1); - } - } - - /* Step by step, clear all dictionaries and ensure non-empty dict count is updated */ - for (int idx = 0; idx < 4; idx++) { - kvstoreInitDictSafeIterator(&kvs_di, kvs, idx); - while((de = kvstoreDictIteratorNext(&kvs_di)) != NULL) { - key = dictGetKey(de); - assert(kvstoreDictDelete(kvs, idx, key) == DICT_OK); - /* When the dictionary is emptied, the number of non-empty dictionaries is reduced by 1. */ - if (kvstoreDictSize(kvs, idx) == 0) assert(kvstoreNumNonEmptyDicts(kvs) == 3 - idx); - } - kvstoreResetDictIterator(&kvs_di); - } - kvstoreRelease(kvs); - } - - kvstoreRelease(kvs1); - kvstoreRelease(kvs2); - return 0; -} -#endif |
