summaryrefslogtreecommitdiff
path: root/examples/redis-unstable/src/kvstore.c
diff options
context:
space:
mode:
Diffstat (limited to 'examples/redis-unstable/src/kvstore.c')
-rw-r--r--examples/redis-unstable/src/kvstore.c1171
1 files changed, 0 insertions, 1171 deletions
diff --git a/examples/redis-unstable/src/kvstore.c b/examples/redis-unstable/src/kvstore.c
deleted file mode 100644
index fb9e7d5..0000000
--- a/examples/redis-unstable/src/kvstore.c
+++ /dev/null
@@ -1,1171 +0,0 @@
-/*
- * Copyright (c) 2011-Present, Redis Ltd. and contributors.
- * All rights reserved.
- *
- * Copyright (c) 2024-present, Valkey contributors.
- * All rights reserved.
- *
- * Licensed under your choice of (a) the Redis Source Available License 2.0
- * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
- * GNU Affero General Public License v3 (AGPLv3).
- *
- * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
- */
-
-#include "fmacros.h"
-
-#include <string.h>
-#include <stddef.h>
-
-#include "zmalloc.h"
-#include "kvstore.h"
-#include "fwtree.h"
-#include "redisassert.h"
-#include "monotonic.h"
-
-#define UNUSED(V) ((void) V)
-
-struct _kvstore {
- int flags;
- kvstoreType *type;
- dictType dtype;
- dict **dicts;
- long long num_dicts;
- long long num_dicts_bits;
- list *rehashing; /* List of dictionaries in this kvstore that are currently rehashing. */
- int resize_cursor; /* Cron job uses this cursor to gradually resize dictionaries (only used if num_dicts > 1). */
- int allocated_dicts; /* The number of allocated dicts. */
- int non_empty_dicts; /* The number of non-empty dicts. */
- unsigned long long key_count; /* Total number of keys in this kvstore. */
- unsigned long long bucket_count; /* Total number of buckets in this kvstore across dictionaries. */
- fenwickTree *dict_sizes; /* Binary indexed tree (BIT) that describes cumulative key frequencies up until given dict-index. */
- size_t overhead_hashtable_rehashing; /* The overhead of dictionaries rehashing. */
- void *metadata[]; /* conditionally allocated based on "flags" */
-};
-
-/**********************************/
-/*** Helpers **********************/
-/**********************************/
-
-/* Get the dictionary pointer based on dict-index. */
-dict *kvstoreGetDict(kvstore *kvs, int didx) {
- return kvs->dicts[didx];
-}
-
-static dict **kvstoreGetDictRef(kvstore *kvs, int didx) {
- return &kvs->dicts[didx];
-}
-
-static int kvstoreDictIsRehashingPaused(kvstore *kvs, int didx)
-{
- dict *d = kvstoreGetDict(kvs, didx);
- return d ? dictIsRehashingPaused(d) : 0;
-}
-
-static void addDictIndexToCursor(kvstore *kvs, int didx, unsigned long long *cursor) {
- if (kvs->num_dicts == 1)
- return;
- /* didx can be -1 when iteration is over and there are no more dicts to visit. */
- if (didx < 0)
- return;
- *cursor = (*cursor << kvs->num_dicts_bits) | didx;
-}
-
-static int getAndClearDictIndexFromCursor(kvstore *kvs, unsigned long long *cursor) {
- if (kvs->num_dicts == 1)
- return 0;
- int didx = (int) (*cursor & (kvs->num_dicts-1));
- *cursor = *cursor >> kvs->num_dicts_bits;
- return didx;
-}
-
-/* Updates binary index tree (Fenwick tree), updates key count for a given dict */
-static void cumulativeKeyCountAdd(kvstore *kvs, int didx, long delta) {
- kvs->key_count += delta;
-
- dict *d = kvstoreGetDict(kvs, didx);
- size_t dsize = dictSize(d);
- /* Increment if dsize is 1 and delta is positive (first element inserted, dict becomes non-empty).
- * Decrement if dsize is 0 (dict becomes empty). */
- int non_empty_dicts_delta = (dsize == 1 && delta > 0) ? 1 : (dsize == 0) ? -1 : 0;
- kvs->non_empty_dicts += non_empty_dicts_delta;
-
- /* BIT does not need to be calculated when there's only one dict. */
- if (kvs->num_dicts == 1)
- return;
-
- /* Update the BIT */
- fwTreeUpdate(kvs->dict_sizes, didx, delta);
-}
-
-/* Create the dict if it does not exist and return it. */
-static dict *createDictIfNeeded(kvstore *kvs, int didx) {
- dict *d = kvstoreGetDict(kvs, didx);
- if (d) return d;
-
- kvs->dicts[didx] = dictCreate(&kvs->dtype);
- kvs->allocated_dicts++;
- return kvs->dicts[didx];
-}
-
-/* Called when the dict will delete entries, the function will check
- * KVSTORE_FREE_EMPTY_DICTS to determine whether the empty dict needs
- * to be freed.
- *
- * Note that for rehashing dicts, that is, in the case of safe iterators
- * and Scan, we won't delete the dict. We will check whether it needs
- * to be deleted when we're releasing the iterator. */
-static void freeDictIfNeeded(kvstore *kvs, int didx) {
- if (!(kvs->flags & KVSTORE_FREE_EMPTY_DICTS) ||
- !kvstoreGetDict(kvs, didx) ||
- kvstoreDictSize(kvs, didx) != 0 ||
- kvstoreDictIsRehashingPaused(kvs, didx))
- return;
-
- /* Use callback if provided to check if dict can be freed */
- if (kvs->type->canFreeDict && !kvs->type->canFreeDict(kvs, didx))
- return;
-
- dictRelease(kvs->dicts[didx]);
- kvs->dicts[didx] = NULL;
- kvs->allocated_dicts--;
-}
-
-void kvstoreFreeDictIfNeeded(kvstore *kvs, int didx) {
- freeDictIfNeeded(kvs, didx);
-}
-
-/**********************************/
-/*** dict callbacks ***************/
-/**********************************/
-
-/* Adds dictionary to the rehashing list, which allows us
- * to quickly find rehash targets during incremental rehashing.
- *
- * If there are multiple dicts, updates the bucket count for the given dictionary
- * in a DB, bucket count incremented with the new ht size during the rehashing phase.
- * If there's one dict, bucket count can be retrieved directly from single dict bucket. */
-static void kvstoreDictRehashingStarted(dict *d) {
- kvstore *kvs = d->type->userdata;
- kvstoreDictMetaBase *metadata = (kvstoreDictMetaBase *)dictMetadata(d);
- listAddNodeTail(kvs->rehashing, d);
- metadata->rehashing_node = listLast(kvs->rehashing);
-
- unsigned long long from, to;
- dictRehashingInfo(d, &from, &to);
- kvs->overhead_hashtable_rehashing += from;
-}
-
-/* Remove dictionary from the rehashing list.
- *
- * Updates the bucket count for the given dictionary in a DB. It removes
- * the old ht size of the dictionary from the total sum of buckets for a DB. */
-static void kvstoreDictRehashingCompleted(dict *d) {
- kvstore *kvs = d->type->userdata;
- kvstoreDictMetaBase *metadata = (kvstoreDictMetaBase *)dictMetadata(d);
- if (metadata->rehashing_node) {
- listDelNode(kvs->rehashing, metadata->rehashing_node);
- metadata->rehashing_node = NULL;
- }
-
- unsigned long long from, to;
- dictRehashingInfo(d, &from, &to);
- kvs->overhead_hashtable_rehashing -= from;
-}
-
-/* Updates the bucket count for the given dictionary in a DB. It adds the new ht size
- * of the dictionary or removes the old ht size of the dictionary from the total
- * sum of buckets for a DB. */
-static void kvstoreDictBucketChanged(dict *d, long long delta) {
- kvstore *kvs = d->type->userdata;
- kvs->bucket_count += delta;
-}
-
-/* Returns the size of the DB dict extended metadata in bytes. */
-static size_t kvstoreDictBaseMetaSize(dict *d) {
- UNUSED(d);
- return sizeof(kvstoreDictMetaBase);
-}
-
-/**********************************/
-/*** API **************************/
-/**********************************/
-
-/* Create an array of dictionaries
- * num_dicts_bits is the log2 of the amount of dictionaries needed (e.g. 0 for 1 dict,
- * 3 for 8 dicts, etc.) */
-kvstore *kvstoreCreate(kvstoreType *type, dictType *dtype, int num_dicts_bits, int flags) {
- /* We can't support more than 2^16 dicts because we want to save 48 bits
- * for the dict cursor, see kvstoreScan */
- assert(num_dicts_bits <= 16);
- assert(!type->dictMetadataBytes || type->dictMetadataBytes(NULL) >= sizeof(kvstoreDictMetaBase));
-
- /* Calc kvstore size */
- size_t kvsize = sizeof(kvstore);
- /* Conditionally calc also histogram size */
- if (type->kvstoreMetadataBytes)
- kvsize += type->kvstoreMetadataBytes(NULL);
-
- kvstore *kvs = zcalloc(kvsize);
- memcpy(&kvs->dtype, dtype, sizeof(kvs->dtype));
- kvs->flags = flags;
- kvs->type = type;
-
- /* kvstore must be the one to set these callbacks, so we make sure the
- * caller didn't do it */
- assert(!dtype->userdata);
- assert(!dtype->dictMetadataBytes);
- assert(!dtype->rehashingStarted);
- assert(!dtype->rehashingCompleted);
- kvs->dtype.userdata = kvs;
- kvs->dtype.dictMetadataBytes = type->dictMetadataBytes ?
- type->dictMetadataBytes : kvstoreDictBaseMetaSize;
- kvs->dtype.rehashingStarted = kvstoreDictRehashingStarted;
- kvs->dtype.rehashingCompleted = kvstoreDictRehashingCompleted;
- kvs->dtype.bucketChanged = kvstoreDictBucketChanged;
-
- kvs->num_dicts_bits = num_dicts_bits;
- kvs->num_dicts = 1 << kvs->num_dicts_bits;
- kvs->dicts = zcalloc(sizeof(dict*) * kvs->num_dicts);
- if (!(kvs->flags & KVSTORE_ALLOCATE_DICTS_ON_DEMAND)) {
- for (int i = 0; i < kvs->num_dicts; i++)
- createDictIfNeeded(kvs, i);
- }
-
- kvs->rehashing = listCreate();
- kvs->key_count = 0;
- kvs->non_empty_dicts = 0;
- kvs->resize_cursor = 0;
- kvs->dict_sizes = kvs->num_dicts > 1 ? fwTreeCreate(kvs->num_dicts_bits) : NULL;
- kvs->bucket_count = 0;
- kvs->overhead_hashtable_rehashing = 0;
- return kvs;
-}
-
-void kvstoreEmpty(kvstore *kvs, void(callback)(dict*)) {
- for (int didx = 0; didx < kvs->num_dicts; didx++) {
- dict *d = kvstoreGetDict(kvs, didx);
- if (!d)
- continue;
- kvstoreDictMetaBase *metadata = (kvstoreDictMetaBase *)dictMetadata(d);
- if (metadata->rehashing_node)
- metadata->rehashing_node = NULL;
- dictEmpty(d, callback);
- if (kvs->type->onDictEmpty) kvs->type->onDictEmpty(kvs, didx);
- freeDictIfNeeded(kvs, didx);
- }
-
- if (kvs->type->onKvstoreEmpty) kvs->type->onKvstoreEmpty(kvs);
-
- listEmpty(kvs->rehashing);
-
- kvs->key_count = 0;
- kvs->non_empty_dicts = 0;
- kvs->resize_cursor = 0;
- kvs->bucket_count = 0;
- if (kvs->dict_sizes)
- fwTreeClear(kvs->dict_sizes);
- kvs->overhead_hashtable_rehashing = 0;
-}
-
-void kvstoreRelease(kvstore *kvs) {
- for (int didx = 0; didx < kvs->num_dicts; didx++) {
- dict *d = kvstoreGetDict(kvs, didx);
- if (!d)
- continue;
- kvstoreDictMetaBase *metadata = (kvstoreDictMetaBase *)dictMetadata(d);
- if (metadata->rehashing_node)
- metadata->rehashing_node = NULL;
- if (kvs->type->onDictEmpty) kvs->type->onDictEmpty(kvs, didx);
- dictRelease(d);
- }
- zfree(kvs->dicts);
-
- listRelease(kvs->rehashing);
- if (kvs->dict_sizes)
- fwTreeDestroy(kvs->dict_sizes);
-
- zfree(kvs);
-}
-
-unsigned long long int kvstoreSize(kvstore *kvs) {
- return kvs->key_count;
-}
-
-/* This method provides the cumulative sum of all the dictionary buckets
- * across dictionaries in a database. */
-unsigned long kvstoreBuckets(kvstore *kvs) {
- if (kvs->num_dicts != 1) {
- return kvs->bucket_count;
- } else {
- return kvs->dicts[0]? dictBuckets(kvs->dicts[0]) : 0;
- }
-}
-
-size_t kvstoreMemUsage(kvstore *kvs) {
- size_t mem = sizeof(*kvs);
- size_t metaSize = kvs->dtype.dictMetadataBytes(NULL);
- unsigned long long keys_count = kvstoreSize(kvs);
- mem += keys_count * dictEntryMemUsage(kvs->dtype.no_value) +
- kvstoreBuckets(kvs) * sizeof(dictEntry*) +
- kvs->allocated_dicts * (sizeof(dict) + metaSize);
-
- /* Values are dict* shared with kvs->dicts */
- mem += listLength(kvs->rehashing) * sizeof(listNode);
-
- return mem;
-}
-
-/*
- * This method is used to iterate over the elements of the entire kvstore specifically across dicts.
- * It's a three pronged approach.
- *
- * 1. It uses the provided cursor `cursor` to retrieve the dict index from it.
- * 2. If the dictionary is in a valid state checked through the provided callback `dictScanValidFunction`,
- * it performs a dictScan over the appropriate `keyType` dictionary of `db`.
- * 3. If the dict is entirely scanned i.e. the cursor has reached 0, the next non empty dict is discovered.
- * The dict information is embedded into the cursor and returned.
- *
- * To restrict the scan to a single dict, pass a valid dict index as
- * 'onlydidx', otherwise pass -1.
- */
-unsigned long long kvstoreScan(kvstore *kvs, unsigned long long cursor,
- int onlydidx, dictScanFunction *scan_cb,
- kvstoreScanShouldSkipDict *skip_cb,
- void *privdata)
-{
- unsigned long long _cursor = 0;
- /* During dictionary traversal, 48 upper bits in the cursor are used for positioning in the HT.
- * Following lower bits are used for the dict index number, ranging from 0 to 2^num_dicts_bits-1.
- * Dict index is always 0 at the start of iteration and can be incremented only if there are
- * multiple dicts. */
- int didx = getAndClearDictIndexFromCursor(kvs, &cursor);
- if (onlydidx >= 0) {
- if (didx < onlydidx) {
- /* Fast-forward to onlydidx. */
- assert(onlydidx < kvs->num_dicts);
- didx = onlydidx;
- cursor = 0;
- } else if (didx > onlydidx) {
- /* The cursor is already past onlydidx. */
- return 0;
- }
- }
-
- dict *d = kvstoreGetDict(kvs, didx);
-
- int skip = !d || (skip_cb && skip_cb(d, didx));
- if (!skip) {
- _cursor = dictScan(d, cursor, scan_cb, privdata);
- /* In dictScan, scan_cb may delete entries (e.g., in active expire case). */
- freeDictIfNeeded(kvs, didx);
- }
- /* scanning done for the current dictionary or if the scanning wasn't possible, move to the next dict index. */
- if (_cursor == 0 || skip) {
- if (onlydidx >= 0)
- return 0;
- didx = kvstoreGetNextNonEmptyDictIndex(kvs, didx);
- }
- if (didx == -1) {
- return 0;
- }
- addDictIndexToCursor(kvs, didx, &_cursor);
- return _cursor;
-}
-
-/*
- * This functions increases size of kvstore to match desired number.
- * It resizes all individual dictionaries, unless skip_cb indicates otherwise.
- *
- * Based on the parameter `try_expand`, appropriate dict expand API is invoked.
- * if try_expand is set to 1, `dictTryExpand` is used else `dictExpand`.
- * The return code is either `DICT_OK`/`DICT_ERR` for both the API(s).
- * `DICT_OK` response is for successful expansion. However, `DICT_ERR` response signifies failure in allocation in
- * `dictTryExpand` call and in case of `dictExpand` call it signifies no expansion was performed.
- */
-int kvstoreExpand(kvstore *kvs, uint64_t newsize, int try_expand, kvstoreExpandShouldSkipDictIndex *skip_cb) {
- for (int i = 0; i < kvs->num_dicts; i++) {
- dict *d = kvstoreGetDict(kvs, i);
- if (!d || (skip_cb && skip_cb(i)))
- continue;
- int result = try_expand ? dictTryExpand(d, newsize) : dictExpand(d, newsize);
- if (try_expand && result == DICT_ERR)
- return 0;
- }
-
- return 1;
-}
-
-/* Returns fair random dict index, probability of each dict being returned is
- * proportional to the number of elements that dictionary holds.
- * This function guarantees that it returns a dict-index of a non-empty dict,
- * unless the entire kvstore is empty or all dicts are skipped.
- *
- * Parameters:
- * - kvs: the kvstore instance
- * - skip_cb: callback to determine if a dict should be skipped (NULL means no skipping)
- * - fair_attempts: number of fair selection attempts before falling back
- * - slow_fallback: if 1, uses systematic search when fair attempts fail
- *
- * Returns:
- * - Valid dict index (>= 0) on success
- * - -1 if no valid dict found (either slow_fallback is 0 or all dicts are skipped)
- *
- * Time complexity: O(fair_attempts * log(kvs->num_dicts)) for fair attempts,
- * plus O(kvs->num_dicts) for systematic fallback if enabled.
- */
-int kvstoreGetFairRandomDictIndex(kvstore *kvs, kvstoreRandomShouldSkipDictIndex *skip_cb,
- int fair_attempts, int slow_fallback)
-{
- if (kvs->num_dicts == 1 || kvstoreSize(kvs) == 0)
- return 0;
-
- unsigned long long total_size = kvstoreSize(kvs);
-
- /* Try fair attempts first. If skip_cb is not applicable, execute only once. */
- for (int attempt = 0; attempt < fair_attempts; attempt++) {
- unsigned long target = (randomULong() % total_size) + 1;
- int didx = kvstoreFindDictIndexByKeyIndex(kvs, target);
- if (!skip_cb || !skip_cb(didx)) {
- return didx;
- }
- }
-
- /* If fair attempts failed and slow fallback is allowed */
- if (slow_fallback) {
- /* systematic check from random start */
- int start = randomULong() % kvs->num_dicts;
- for (int i = 0; i < kvs->num_dicts; i++) {
- int didx = (start + i) % kvs->num_dicts;
- dict *d = kvstoreGetDict(kvs, didx);
- if (d && (!skip_cb || !skip_cb(didx))) {
- return didx;
- }
- }
- }
-
- /* Failed to find valid dict that has elements */
- return -1;
-}
-
-void kvstoreGetStats(kvstore *kvs, char *buf, size_t bufsize, int full) {
- buf[0] = '\0';
-
- size_t l;
- char *orig_buf = buf;
- size_t orig_bufsize = bufsize;
- dictStats *mainHtStats = NULL;
- dictStats *rehashHtStats = NULL;
- dict *d;
- kvstoreIterator kvs_it;
-
- kvstoreIteratorInit(&kvs_it, kvs);
- while ((d = kvstoreIteratorNextDict(&kvs_it))) {
- dictStats *stats = dictGetStatsHt(d, 0, full);
- if (!mainHtStats) {
- mainHtStats = stats;
- } else {
- dictCombineStats(stats, mainHtStats);
- dictFreeStats(stats);
- }
- if (dictIsRehashing(d)) {
- stats = dictGetStatsHt(d, 1, full);
- if (!rehashHtStats) {
- rehashHtStats = stats;
- } else {
- dictCombineStats(stats, rehashHtStats);
- dictFreeStats(stats);
- }
- }
- }
- kvstoreIteratorReset(&kvs_it);
-
- if (mainHtStats && bufsize > 0) {
- l = dictGetStatsMsg(buf, bufsize, mainHtStats, full);
- dictFreeStats(mainHtStats);
- buf += l;
- bufsize -= l;
- }
-
- if (rehashHtStats && bufsize > 0) {
- l = dictGetStatsMsg(buf, bufsize, rehashHtStats, full);
- dictFreeStats(rehashHtStats);
- buf += l;
- bufsize -= l;
- }
- /* Make sure there is a NULL term at the end. */
- if (orig_bufsize) orig_buf[orig_bufsize - 1] = '\0';
-}
-
-/* Finds a dict containing target element in a key space ordered by dict index.
- * Consider this example. Dictionaries are represented by brackets and keys by dots:
- * #0 #1 #2 #3 #4
- * [..][....][...][.......][.]
- * ^
- * target
- *
- * In this case dict #3 contains key that we are trying to find.
- *
- * The return value is 0 based dict-index, and the range of the target is [1..kvstoreSize], kvstoreSize inclusive.
- *
- * To find the dict, we start with the root node of the binary index tree and search through its children
- * from the highest index (2^num_dicts_bits in our case) to the lowest index. At each node, we check if the target
- * value is greater than the node's value. If it is, we remove the node's value from the target and recursively
- * search for the new target using the current node as the parent.
- * Time complexity of this function is O(log(kvs->num_dicts))
- */
-int kvstoreFindDictIndexByKeyIndex(kvstore *kvs, unsigned long target) {
- if (kvs->num_dicts == 1 || kvstoreSize(kvs) == 0)
- return 0;
- assert(target <= kvstoreSize(kvs));
-
- return fwTreeFindIndex(kvs->dict_sizes, target);
-}
-
-/* Get the first non-empty dict index in the kvstore. Returns -1 if kvstore is empty. */
-int kvstoreGetFirstNonEmptyDictIndex(kvstore *kvs) {
- if (kvstoreSize(kvs) == 0)
- return -1;
- if (kvs->num_dicts == 1)
- return 0;
- return fwTreeFindFirstNonEmpty(kvs->dict_sizes);
-}
-
-/* Returns next non-empty dict index strictly after given one, or -1 if provided didx is the last one. */
-int kvstoreGetNextNonEmptyDictIndex(kvstore *kvs, int didx) {
- if (kvs->num_dicts == 1) {
- assert(didx == 0);
- return -1;
- }
- return fwTreeFindNextNonEmpty(kvs->dict_sizes, didx);
-}
-
-int kvstoreNumNonEmptyDicts(kvstore *kvs) {
- return kvs->non_empty_dicts;
-}
-
-int kvstoreNumAllocatedDicts(kvstore *kvs) {
- return kvs->allocated_dicts;
-}
-
-int kvstoreNumDicts(kvstore *kvs) {
- return kvs->num_dicts;
-}
-
-/* Move dict from one kvstore to another. */
-void kvstoreMoveDict(kvstore *kvs, kvstore *dst, int didx) {
- assert(kvs->num_dicts > didx);
- assert(kvs->num_dicts == dst->num_dicts);
- assert(dst->dicts[didx] == NULL);
-
- dict *d = kvs->dicts[didx];
- if (d == NULL) return;
-
- /* Adjust source kvstore */
- kvs->allocated_dicts -= 1;
- cumulativeKeyCountAdd(kvs, didx, -((long long)dictSize(d)));
- kvstoreDictBucketChanged(d, -((long long) dictBuckets(d)));
- /* If rehashing, stop it. */
- if (dictIsRehashing(d))
- kvstoreDictRehashingCompleted(d);
- /* Clear dict from source kvstore and create a new one if needed */
- kvs->dicts[didx] = NULL;
- if (!(kvs->flags & (KVSTORE_ALLOCATE_DICTS_ON_DEMAND | KVSTORE_FREE_EMPTY_DICTS)))
- createDictIfNeeded(kvs, didx);
-
- /* Move dict to destination kvstore */
- dst->dicts[didx] = d;
- dst->dicts[didx]->type = &dst->dtype;
- dst->allocated_dicts += 1;
- cumulativeKeyCountAdd(dst, didx, dictSize(d));
- kvstoreDictBucketChanged(d, dictBuckets(d));
- if (dictIsRehashing(dst->dicts[didx]))
- kvstoreDictRehashingStarted(dst->dicts[didx]);
-}
-
-/* Returns kvstore iterator that can be used to iterate through sub-dictionaries.
- *
- * The caller should reset kvs_it with kvstoreIteratorReset. */
-void kvstoreIteratorInit(kvstoreIterator *kvs_it, kvstore *kvs) {
- kvs_it->kvs = kvs;
- kvs_it->didx = -1;
- kvs_it->next_didx = kvstoreGetFirstNonEmptyDictIndex(kvs_it->kvs); /* Finds first non-empty dict index. */
- dictInitSafeIterator(&kvs_it->di, NULL);
-}
-
-/* Free the kvs_it returned by kvstoreIteratorInit. */
-void kvstoreIteratorReset(kvstoreIterator *kvs_it) {
- dictIterator *iter = &kvs_it->di;
- dictResetIterator(iter);
- /* In the safe iterator context, we may delete entries. */
- if (kvs_it->didx != -1)
- freeDictIfNeeded(kvs_it->kvs, kvs_it->didx);
-}
-
-/* Returns next dictionary from the iterator, or NULL if iteration is complete.
- *
- * - Takes care to reset the iter of the previous dict before moved to the next dict.
- */
-dict *kvstoreIteratorNextDict(kvstoreIterator *kvs_it) {
- if (kvs_it->next_didx == -1)
- return NULL;
-
- /* The dict may be deleted during the iteration process, so here need to check for NULL. */
- if (kvs_it->didx != -1 && kvstoreGetDict(kvs_it->kvs, kvs_it->didx)) {
- /* Before we move to the next dict, reset the iter of the previous dict. */
- dictIterator *iter = &kvs_it->di;
- dictResetIterator(iter);
- /* In the safe iterator context, we may delete entries. */
- freeDictIfNeeded(kvs_it->kvs, kvs_it->didx);
- }
-
- kvs_it->didx = kvs_it->next_didx;
- kvs_it->next_didx = kvstoreGetNextNonEmptyDictIndex(kvs_it->kvs, kvs_it->didx);
- return kvs_it->kvs->dicts[kvs_it->didx];
-}
-
-int kvstoreIteratorGetCurrentDictIndex(kvstoreIterator *kvs_it) {
- assert(kvs_it->didx >= 0 && kvs_it->didx < kvs_it->kvs->num_dicts);
- return kvs_it->didx;
-}
-
-/* Returns next entry. */
-dictEntry *kvstoreIteratorNext(kvstoreIterator *kvs_it) {
- dictEntry *de = kvs_it->di.d ? dictNext(&kvs_it->di) : NULL;
- if (!de) { /* No current dict or reached the end of the dictionary. */
-
- /* Before we move to the next dict, function kvstoreIteratorNextDict()
- * reset the iter of the previous dict & freeDictIfNeeded(). */
- dict *d = kvstoreIteratorNextDict(kvs_it);
-
- if (!d)
- return NULL;
-
- dictInitSafeIterator(&kvs_it->di, d);
- de = dictNext(&kvs_it->di);
- }
- return de;
-}
-
-/* This method traverses through kvstore dictionaries and triggers a resize.
- * It first tries to shrink if needed, and if it isn't, it tries to expand. */
-void kvstoreTryResizeDicts(kvstore *kvs, int limit) {
- if (limit > kvs->num_dicts)
- limit = kvs->num_dicts;
-
- for (int i = 0; i < limit; i++) {
- int didx = kvs->resize_cursor;
- dict *d = kvstoreGetDict(kvs, didx);
- if (d && dictShrinkIfNeeded(d) == DICT_ERR) {
- dictExpandIfNeeded(d);
- }
- kvs->resize_cursor = (didx + 1) % kvs->num_dicts;
- }
-}
-
-/* Our hash table implementation performs rehashing incrementally while
- * we write/read from the hash table. Still if the server is idle, the hash
- * table will use two tables for a long time. So we try to use threshold_us
- * of CPU time at every call of this function to perform some rehashing.
- *
- * The function returns the amount of microsecs spent if some rehashing was
- * performed, otherwise 0 is returned. */
-uint64_t kvstoreIncrementallyRehash(kvstore *kvs, uint64_t threshold_us) {
- if (listLength(kvs->rehashing) == 0)
- return 0;
-
- /* Our goal is to rehash as many dictionaries as we can before reaching threshold_us,
- * after each dictionary completes rehashing, it removes itself from the list. */
- listNode *node;
- monotime timer;
- uint64_t elapsed_us = 0;
- elapsedStart(&timer);
- while ((node = listFirst(kvs->rehashing))) {
- dictRehashMicroseconds(listNodeValue(node), threshold_us - elapsed_us);
-
- elapsed_us = elapsedUs(timer);
- if (elapsed_us >= threshold_us) {
- break; /* Reached the time limit. */
- }
- }
- return elapsed_us;
-}
-
-size_t kvstoreOverheadHashtableLut(kvstore *kvs) {
- return kvs->bucket_count * sizeof(dictEntry *);
-}
-
-size_t kvstoreOverheadHashtableRehashing(kvstore *kvs) {
- return kvs->overhead_hashtable_rehashing * sizeof(dictEntry *);
-}
-
-unsigned long kvstoreDictRehashingCount(kvstore *kvs) {
- return listLength(kvs->rehashing);
-}
-
-unsigned long kvstoreDictSize(kvstore *kvs, int didx)
-{
- dict *d = kvstoreGetDict(kvs, didx);
- if (!d)
- return 0;
- return dictSize(d);
-}
-
-void kvstoreInitDictIterator(kvstoreDictIterator *kvs_di, kvstore *kvs, int didx)
-{
- kvs_di->kvs = kvs;
- kvs_di->didx = didx;
- dictInitIterator(&kvs_di->di, kvstoreGetDict(kvs, didx));
-}
-
-void kvstoreInitDictSafeIterator(kvstoreDictIterator *kvs_di, kvstore *kvs, int didx)
-{
- kvs_di->kvs = kvs;
- kvs_di->didx = didx;
- dictInitSafeIterator(&kvs_di->di, kvstoreGetDict(kvs, didx));
-}
-
-/* Free the kvs_di returned by kvstoreGetDictIterator and kvstoreGetDictSafeIterator. */
-void kvstoreResetDictIterator(kvstoreDictIterator *kvs_di)
-{
- /* The dict may be deleted during the iteration process, so here need to check for NULL. */
- if (kvstoreGetDict(kvs_di->kvs, kvs_di->didx)) {
- dictResetIterator(&kvs_di->di);
- /* In the safe iterator context, we may delete entries. */
- freeDictIfNeeded(kvs_di->kvs, kvs_di->didx);
- }
-}
-
-/* Get the next element of the dict through kvstoreDictIterator and dictNext. */
-dictEntry *kvstoreDictIteratorNext(kvstoreDictIterator *kvs_di)
-{
- /* The dict may be deleted during the iteration process, so here need to check for NULL. */
- dict *d = kvstoreGetDict(kvs_di->kvs, kvs_di->didx);
- if (!d) return NULL;
-
- return dictNext(&kvs_di->di);
-}
-
-dictEntry *kvstoreDictGetRandomKey(kvstore *kvs, int didx)
-{
- dict *d = kvstoreGetDict(kvs, didx);
- if (!d)
- return NULL;
- return dictGetRandomKey(d);
-}
-
-dictEntry *kvstoreDictGetFairRandomKey(kvstore *kvs, int didx)
-{
- dict *d = kvstoreGetDict(kvs, didx);
- if (!d)
- return NULL;
- return dictGetFairRandomKey(d);
-}
-
-unsigned int kvstoreDictGetSomeKeys(kvstore *kvs, int didx, dictEntry **des, unsigned int count)
-{
- dict *d = kvstoreGetDict(kvs, didx);
- if (!d)
- return 0;
- return dictGetSomeKeys(d, des, count);
-}
-
-int kvstoreDictExpand(kvstore *kvs, int didx, unsigned long size)
-{
- dict *d = createDictIfNeeded(kvs, didx);
- if (!d)
- return DICT_ERR;
- return dictExpand(d, size);
-}
-
-unsigned long kvstoreDictScanDefrag(kvstore *kvs, int didx, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata)
-{
- dict *d = kvstoreGetDict(kvs, didx);
- if (!d)
- return 0;
- return dictScanDefrag(d, v, fn, defragfns, privdata);
-}
-
-/* Unlike kvstoreDictScanDefrag(), this method doesn't defrag the data(keys and values)
- * within dict, it only reallocates the memory used by the dict structure itself using
- * the provided allocation function. This feature was added for the active defrag feature.
- *
- * With 16k dictionaries for cluster mode with 1 shard, this operation may require substantial time
- * to execute. A "cursor" is used to perform the operation iteratively. When first called, a
- * cursor value of 0 should be provided. The return value is an updated cursor which should be
- * provided on the next iteration. The operation is complete when 0 is returned.
- *
- * The 'defragfn' callback is called with a reference to the dict that callback can reallocate. */
-unsigned long kvstoreDictLUTDefrag(kvstore *kvs, unsigned long cursor, kvstoreDictLUTDefragFunction *defragfn) {
- for (int didx = cursor; didx < kvs->num_dicts; didx++) {
- dict **d = kvstoreGetDictRef(kvs, didx), *newd;
- if (!*d)
- continue;
- if ((newd = defragfn(*d))) {
- *d = newd;
-
- /* After defragmenting the dict, update its corresponding
- * rehashing node in the kvstore's rehashing list. */
- kvstoreDictMetaBase *metadata = (kvstoreDictMetaBase *)dictMetadata(*d);
- if (metadata->rehashing_node)
- metadata->rehashing_node->value = *d;
- }
- return (didx + 1);
- }
- return 0;
-}
-
-void *kvstoreDictFetchValue(kvstore *kvs, int didx, const void *key)
-{
- dict *d = kvstoreGetDict(kvs, didx);
- if (!d)
- return NULL;
- assert(d->type->no_value == 0);
- return dictFetchValue(d, key);
-}
-
-dictEntry *kvstoreDictFind(kvstore *kvs, int didx, void *key) {
- dict *d = kvstoreGetDict(kvs, didx);
- if (!d)
- return NULL;
- return dictFind(d, key);
-}
-
-/* Find a link to a key in the specified kvstore. If not found return NULL.
- *
- * This function is a wrapper around dictFindLink(), used to locate a key in a dict
- * from a kvstore.
- *
- * The caller may provide a bucket pointer to receive the reference to the bucket
- * where the key is stored or need to be added.
- *
- * Returns:
- * A reference to the dictEntry if found, otherwise NULL.
- *
- * Important:
- * After calling kvstoreDictFindLink(), any necessary updates based on returned
- * link or bucket must be made immediately after, commonly by kvstoreDictSetAtLink()
- * without any operations in between that might modify the dict. Otherwise,
- * the link or bucket may become invalid. Example usage:
- *
- * link = kvstoreDictFindLink(kvs, didx, key, &bucket);
- * ... Do something, but don't modify kvs->dicts[didx] ...
- * if (link)
- * kvstoreDictSetAtLink(kvs, didx, kv, &link, 0); // Update existing entry
- * else
- * kvstoreDictSetAtLink(kvs, didx, kv, &bucket, 1); // Insert new entry
- */
-dictEntryLink kvstoreDictFindLink(kvstore *kvs, int didx, void *key, dictEntryLink *bucket) {
- if (bucket) *bucket = NULL;
- dict *d = kvstoreGetDict(kvs, didx);
- if (!d) return NULL;
- return dictFindLink(d, key, bucket);
-}
-
-/* Set a key (or key-value) in the specified kvstore.
- *
- * This function inserts a new key or updates an existing one, depending on
- * the `newItem` flag.
- *
- * Parameters:
- * link: - When `newItem` is set, `link` points to the bucket of the key.
- * - When `newItem` is not set, `link` points to the link of the key.
- * - If link is NULL, dictFindLink() will be called to locate the link.
- *
- * newItem: - If set, add a new key with a new dictEntry.
- * - If not set, update the key of an existing dictEntry.
- */
-void kvstoreDictSetAtLink(kvstore *kvs, int didx, void *kv, dictEntryLink *link, int newItem) {
- dict *d;
- if (newItem) {
- d = createDictIfNeeded(kvs, didx);
- dictSetKeyAtLink(d, kv, link, newItem);
- cumulativeKeyCountAdd(kvs, didx, 1); /* must be called only after updating dict */
- } else {
- d = kvstoreGetDict(kvs, didx);
- dictSetKeyAtLink(d, kv, link, newItem);
- }
-}
-
-dictEntry *kvstoreDictAddRaw(kvstore *kvs, int didx, void *key, dictEntry **existing) {
- dict *d = createDictIfNeeded(kvs, didx);
- dictEntry *ret = dictAddRaw(d, key, existing);
- if (ret)
- cumulativeKeyCountAdd(kvs, didx, 1);
- return ret;
-}
-
-void kvstoreDictSetKey(kvstore *kvs, int didx, dictEntry* de, void *key) {
- dict *d = kvstoreGetDict(kvs, didx);
- dictSetKey(d, de, key);
-}
-
-void kvstoreDictSetVal(kvstore *kvs, int didx, dictEntry *de, void *val) {
- dict *d = kvstoreGetDict(kvs, didx);
- assert(d->type->no_value == 0);
- dictSetVal(d, de, val);
-}
-
-dictEntryLink kvstoreDictTwoPhaseUnlinkFind(kvstore *kvs, int didx, const void *key, int *table_index) {
- dict *d = kvstoreGetDict(kvs, didx);
- if (!d)
- return NULL;
- return dictTwoPhaseUnlinkFind(kvstoreGetDict(kvs, didx), key, table_index);
-}
-
-void kvstoreDictTwoPhaseUnlinkFree(kvstore *kvs, int didx, dictEntryLink link, int table_index) {
- dict *d = kvstoreGetDict(kvs, didx);
- dictTwoPhaseUnlinkFree(d, link, table_index);
- cumulativeKeyCountAdd(kvs, didx, -1);
- freeDictIfNeeded(kvs, didx);
-}
-
-int kvstoreDictDelete(kvstore *kvs, int didx, const void *key) {
- dict *d = kvstoreGetDict(kvs, didx);
- if (!d)
- return DICT_ERR;
- int ret = dictDelete(d, key);
- if (ret == DICT_OK) {
- cumulativeKeyCountAdd(kvs, didx, -1);
- freeDictIfNeeded(kvs, didx);
- }
- return ret;
-}
-
-void *kvstoreGetDictMeta(kvstore *kvs, int didx, int createIfNeeded) {
- dict *d = kvstoreGetDict(kvs, didx);
- if (!d) {
- if (!createIfNeeded) return NULL;
- d = createDictIfNeeded(kvs, didx);
- }
- return dictMetadata(d);
-}
-
-void *kvstoreGetMetadata(kvstore *kvs) {
- return &kvs->metadata;
-}
-
-#ifdef REDIS_TEST
-#include <stdio.h>
-#include "testhelp.h"
-
-#define TEST(name) printf("test — %s\n", name);
-
-uint64_t hashTestCallback(const void *key) {
- return dictGenHashFunction((unsigned char*)key, strlen((char*)key));
-}
-
-void freeTestCallback(dict *d, void *val) {
- UNUSED(d);
- zfree(val);
-}
-
-void *defragAllocTest(void *ptr) {
- size_t size = zmalloc_usable_size(ptr);
- void *newptr = zmalloc(size);
- memcpy(newptr, ptr, size);
- zfree(ptr);
- return newptr;
-}
-
-dict *defragLUTTestCallback(dict *d) {
- /* handle the dict struct */
- d = defragAllocTest(d);
- /* handle the first hash table */
- d->ht_table[0] = defragAllocTest(d->ht_table[0]);
- /* handle the second hash table */
- if (d->ht_table[1])
- d->ht_table[1] = defragAllocTest(d->ht_table[1]);
- return d;
-}
-
-dictType KvstoreDictTestType = {
- hashTestCallback,
- NULL,
- NULL,
- NULL,
- freeTestCallback,
- NULL,
- NULL
-};
-
-kvstoreType KvstoreTestType = {
- NULL, /* kvstore metadata size */
- NULL, /* dict metadata size */
- NULL, /* can free dict */
- NULL, /* on kvstore empty */
- NULL, /* on dict empty */
-};
-
-char *stringFromInt(int value) {
- char buf[32];
- int len;
- char *s;
-
- len = snprintf(buf, sizeof(buf), "%d",value);
- s = zmalloc(len+1);
- memcpy(s, buf, len);
- s[len] = '\0';
- return s;
-}
-
-/* ./redis-server test kvstore */
-int kvstoreTest(int argc, char **argv, int flags) {
- UNUSED(argc);
- UNUSED(argv);
- UNUSED(flags);
-
- int i;
- void *key;
- dictEntry *de;
- kvstoreIterator kvs_it;
- kvstoreDictIterator kvs_di;
-
- /* Test also dictType with no_value=1 */
- dictType KvstoreDictNovalTestType = KvstoreDictTestType;
- KvstoreDictNovalTestType.no_value = 1;
-
- int didx = 0;
- int curr_slot = 0;
- kvstore *kvs1 = kvstoreCreate(&KvstoreTestType, &KvstoreDictTestType, 0, KVSTORE_ALLOCATE_DICTS_ON_DEMAND);
- kvstore *kvs2 = kvstoreCreate(&KvstoreTestType, &KvstoreDictNovalTestType, 0, KVSTORE_ALLOCATE_DICTS_ON_DEMAND | KVSTORE_FREE_EMPTY_DICTS);
-
- TEST("Add 16 keys") {
- for (i = 0; i < 16; i++) {
- de = kvstoreDictAddRaw(kvs1, didx, stringFromInt(i), NULL);
- assert(de != NULL);
- de = kvstoreDictAddRaw(kvs2, didx, stringFromInt(i), NULL);
- assert(de != NULL);
- }
- assert(kvstoreDictSize(kvs1, didx) == 16);
- assert(kvstoreSize(kvs1) == 16);
- assert(kvstoreDictSize(kvs2, didx) == 16);
- assert(kvstoreSize(kvs2) == 16);
- }
-
- TEST("kvstoreIterator creating and releasing without kvstoreIteratorNextDict()") {
- kvstore *kvs = kvstoreCreate(&KvstoreTestType, &KvstoreDictNovalTestType, 0, KVSTORE_ALLOCATE_DICTS_ON_DEMAND | KVSTORE_FREE_EMPTY_DICTS);
- kvstoreIterator kvs_iter;
- kvstoreIteratorInit(&kvs_iter, kvs);
- kvstoreIteratorReset(&kvs_iter);
- kvstoreRelease(kvs);
- }
-
- TEST("kvstoreIterator case 1: removing all keys does not delete the empty dict") {
- kvstoreIteratorInit(&kvs_it, kvs1);
- while((de = kvstoreIteratorNext(&kvs_it)) != NULL) {
- curr_slot = kvstoreIteratorGetCurrentDictIndex(&kvs_it);
- key = dictGetKey(de);
- assert(kvstoreDictDelete(kvs1, curr_slot, key) == DICT_OK);
- }
- kvstoreIteratorReset(&kvs_it);
-
- dict *d = kvstoreGetDict(kvs1, didx);
- assert(d != NULL);
- assert(kvstoreDictSize(kvs1, didx) == 0);
- assert(kvstoreSize(kvs1) == 0);
- }
-
- TEST("kvstoreIterator case 2: removing all keys will delete the empty dict") {
- kvstoreIteratorInit(&kvs_it, kvs2);
- while((de = kvstoreIteratorNext(&kvs_it)) != NULL) {
- curr_slot = kvstoreIteratorGetCurrentDictIndex(&kvs_it);
- key = dictGetKey(de);
- assert(kvstoreDictDelete(kvs2, curr_slot, key) == DICT_OK);
- }
- kvstoreIteratorReset(&kvs_it);
-
- /* Make sure the dict was removed from the rehashing list. */
- while (kvstoreIncrementallyRehash(kvs2, 1000)) {}
-
- dict *d = kvstoreGetDict(kvs2, didx);
- assert(d == NULL);
- assert(kvstoreDictSize(kvs2, didx) == 0);
- assert(kvstoreSize(kvs2) == 0);
- }
-
- TEST("Add 16 keys again") {
- for (i = 0; i < 16; i++) {
- de = kvstoreDictAddRaw(kvs1, didx, stringFromInt(i), NULL);
- assert(de != NULL);
- de = kvstoreDictAddRaw(kvs2, didx, stringFromInt(i), NULL);
- assert(de != NULL);
- }
- assert(kvstoreDictSize(kvs1, didx) == 16);
- assert(kvstoreSize(kvs1) == 16);
- assert(kvstoreDictSize(kvs2, didx) == 16);
- assert(kvstoreSize(kvs2) == 16);
- }
-
- TEST("kvstoreDictIterator case 1: removing all keys does not delete the empty dict") {
- kvstoreInitDictSafeIterator(&kvs_di, kvs1, didx);
- while((de = kvstoreDictIteratorNext(&kvs_di)) != NULL) {
- key = dictGetKey(de);
- assert(kvstoreDictDelete(kvs1, didx, key) == DICT_OK);
- }
- kvstoreResetDictIterator(&kvs_di);
-
- dict *d = kvstoreGetDict(kvs1, didx);
- assert(d != NULL);
- assert(kvstoreDictSize(kvs1, didx) == 0);
- assert(kvstoreSize(kvs1) == 0);
- }
-
- TEST("kvstoreDictIterator case 2: removing all keys will delete the empty dict") {
- kvstoreInitDictSafeIterator(&kvs_di, kvs2, didx);
- while((de = kvstoreDictIteratorNext(&kvs_di)) != NULL) {
- key = dictGetKey(de);
- assert(kvstoreDictDelete(kvs2, didx, key) == DICT_OK);
- }
- kvstoreResetDictIterator(&kvs_di);
-
- dict *d = kvstoreGetDict(kvs2, didx);
- assert(d == NULL);
- assert(kvstoreDictSize(kvs2, didx) == 0);
- assert(kvstoreSize(kvs2) == 0);
- }
-
- TEST("Verify that a rehashing dict's node in the rehashing list is correctly updated after defragmentation") {
- unsigned long cursor = 0;
- kvstore *kvs = kvstoreCreate(&KvstoreTestType, &KvstoreDictTestType, 0, KVSTORE_ALLOCATE_DICTS_ON_DEMAND);
- for (i = 0; i < 256; i++) {
- de = kvstoreDictAddRaw(kvs, 0, stringFromInt(i), NULL);
- if (listLength(kvs->rehashing)) break;
- }
- assert(listLength(kvs->rehashing));
- while ((cursor = kvstoreDictLUTDefrag(kvs, cursor, defragLUTTestCallback)) != 0) {}
- while (kvstoreIncrementallyRehash(kvs, 1000)) {}
- kvstoreRelease(kvs);
- }
-
- TEST("Verify non-empty dict count is correctly updated") {
- kvstore *kvs = kvstoreCreate(&KvstoreTestType, &KvstoreDictTestType, 2,
- KVSTORE_ALLOCATE_DICTS_ON_DEMAND);
- for (int idx = 0; idx < 4; idx++) {
- for (i = 0; i < 16; i++) {
- de = kvstoreDictAddRaw(kvs, idx, stringFromInt(i), NULL);
- assert(de != NULL);
- /* When the first element is inserted, the number of non-empty dictionaries is increased by 1. */
- if (i == 0) assert(kvstoreNumNonEmptyDicts(kvs) == idx + 1);
- }
- }
-
- /* Step by step, clear all dictionaries and ensure non-empty dict count is updated */
- for (int idx = 0; idx < 4; idx++) {
- kvstoreInitDictSafeIterator(&kvs_di, kvs, idx);
- while((de = kvstoreDictIteratorNext(&kvs_di)) != NULL) {
- key = dictGetKey(de);
- assert(kvstoreDictDelete(kvs, idx, key) == DICT_OK);
- /* When the dictionary is emptied, the number of non-empty dictionaries is reduced by 1. */
- if (kvstoreDictSize(kvs, idx) == 0) assert(kvstoreNumNonEmptyDicts(kvs) == 3 - idx);
- }
- kvstoreResetDictIterator(&kvs_di);
- }
- kvstoreRelease(kvs);
- }
-
- kvstoreRelease(kvs1);
- kvstoreRelease(kvs2);
- return 0;
-}
-#endif