summaryrefslogtreecommitdiff
path: root/examples/redis-unstable/src/kvstore.c
diff options
context:
space:
mode:
Diffstat (limited to 'examples/redis-unstable/src/kvstore.c')
-rw-r--r--examples/redis-unstable/src/kvstore.c1171
1 files changed, 1171 insertions, 0 deletions
diff --git a/examples/redis-unstable/src/kvstore.c b/examples/redis-unstable/src/kvstore.c
new file mode 100644
index 0000000..fb9e7d5
--- /dev/null
+++ b/examples/redis-unstable/src/kvstore.c
@@ -0,0 +1,1171 @@
+/*
+ * Copyright (c) 2011-Present, Redis Ltd. and contributors.
+ * All rights reserved.
+ *
+ * Copyright (c) 2024-present, Valkey contributors.
+ * All rights reserved.
+ *
+ * Licensed under your choice of (a) the Redis Source Available License 2.0
+ * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
+ * GNU Affero General Public License v3 (AGPLv3).
+ *
+ * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
+ */
+
+#include "fmacros.h"
+
+#include <string.h>
+#include <stddef.h>
+
+#include "zmalloc.h"
+#include "kvstore.h"
+#include "fwtree.h"
+#include "redisassert.h"
+#include "monotonic.h"
+
+#define UNUSED(V) ((void) V)
+
+struct _kvstore {
+ int flags;
+ kvstoreType *type;
+ dictType dtype;
+ dict **dicts;
+ long long num_dicts;
+ long long num_dicts_bits;
+ list *rehashing; /* List of dictionaries in this kvstore that are currently rehashing. */
+ int resize_cursor; /* Cron job uses this cursor to gradually resize dictionaries (only used if num_dicts > 1). */
+ int allocated_dicts; /* The number of allocated dicts. */
+ int non_empty_dicts; /* The number of non-empty dicts. */
+ unsigned long long key_count; /* Total number of keys in this kvstore. */
+ unsigned long long bucket_count; /* Total number of buckets in this kvstore across dictionaries. */
+ fenwickTree *dict_sizes; /* Binary indexed tree (BIT) that describes cumulative key frequencies up until given dict-index. */
+ size_t overhead_hashtable_rehashing; /* The overhead of dictionaries rehashing. */
+ void *metadata[]; /* conditionally allocated based on "flags" */
+};
+
+/**********************************/
+/*** Helpers **********************/
+/**********************************/
+
+/* Get the dictionary pointer based on dict-index. */
+dict *kvstoreGetDict(kvstore *kvs, int didx) {
+ return kvs->dicts[didx];
+}
+
+static dict **kvstoreGetDictRef(kvstore *kvs, int didx) {
+ return &kvs->dicts[didx];
+}
+
+static int kvstoreDictIsRehashingPaused(kvstore *kvs, int didx)
+{
+ dict *d = kvstoreGetDict(kvs, didx);
+ return d ? dictIsRehashingPaused(d) : 0;
+}
+
+static void addDictIndexToCursor(kvstore *kvs, int didx, unsigned long long *cursor) {
+ if (kvs->num_dicts == 1)
+ return;
+ /* didx can be -1 when iteration is over and there are no more dicts to visit. */
+ if (didx < 0)
+ return;
+ *cursor = (*cursor << kvs->num_dicts_bits) | didx;
+}
+
+static int getAndClearDictIndexFromCursor(kvstore *kvs, unsigned long long *cursor) {
+ if (kvs->num_dicts == 1)
+ return 0;
+ int didx = (int) (*cursor & (kvs->num_dicts-1));
+ *cursor = *cursor >> kvs->num_dicts_bits;
+ return didx;
+}
+
+/* Updates binary index tree (Fenwick tree), updates key count for a given dict */
+static void cumulativeKeyCountAdd(kvstore *kvs, int didx, long delta) {
+ kvs->key_count += delta;
+
+ dict *d = kvstoreGetDict(kvs, didx);
+ size_t dsize = dictSize(d);
+ /* Increment if dsize is 1 and delta is positive (first element inserted, dict becomes non-empty).
+ * Decrement if dsize is 0 (dict becomes empty). */
+ int non_empty_dicts_delta = (dsize == 1 && delta > 0) ? 1 : (dsize == 0) ? -1 : 0;
+ kvs->non_empty_dicts += non_empty_dicts_delta;
+
+ /* BIT does not need to be calculated when there's only one dict. */
+ if (kvs->num_dicts == 1)
+ return;
+
+ /* Update the BIT */
+ fwTreeUpdate(kvs->dict_sizes, didx, delta);
+}
+
+/* Create the dict if it does not exist and return it. */
+static dict *createDictIfNeeded(kvstore *kvs, int didx) {
+ dict *d = kvstoreGetDict(kvs, didx);
+ if (d) return d;
+
+ kvs->dicts[didx] = dictCreate(&kvs->dtype);
+ kvs->allocated_dicts++;
+ return kvs->dicts[didx];
+}
+
+/* Called when the dict will delete entries, the function will check
+ * KVSTORE_FREE_EMPTY_DICTS to determine whether the empty dict needs
+ * to be freed.
+ *
+ * Note that for rehashing dicts, that is, in the case of safe iterators
+ * and Scan, we won't delete the dict. We will check whether it needs
+ * to be deleted when we're releasing the iterator. */
+static void freeDictIfNeeded(kvstore *kvs, int didx) {
+ if (!(kvs->flags & KVSTORE_FREE_EMPTY_DICTS) ||
+ !kvstoreGetDict(kvs, didx) ||
+ kvstoreDictSize(kvs, didx) != 0 ||
+ kvstoreDictIsRehashingPaused(kvs, didx))
+ return;
+
+ /* Use callback if provided to check if dict can be freed */
+ if (kvs->type->canFreeDict && !kvs->type->canFreeDict(kvs, didx))
+ return;
+
+ dictRelease(kvs->dicts[didx]);
+ kvs->dicts[didx] = NULL;
+ kvs->allocated_dicts--;
+}
+
+void kvstoreFreeDictIfNeeded(kvstore *kvs, int didx) {
+ freeDictIfNeeded(kvs, didx);
+}
+
+/**********************************/
+/*** dict callbacks ***************/
+/**********************************/
+
+/* Adds dictionary to the rehashing list, which allows us
+ * to quickly find rehash targets during incremental rehashing.
+ *
+ * If there are multiple dicts, updates the bucket count for the given dictionary
+ * in a DB, bucket count incremented with the new ht size during the rehashing phase.
+ * If there's one dict, bucket count can be retrieved directly from single dict bucket. */
+static void kvstoreDictRehashingStarted(dict *d) {
+ kvstore *kvs = d->type->userdata;
+ kvstoreDictMetaBase *metadata = (kvstoreDictMetaBase *)dictMetadata(d);
+ listAddNodeTail(kvs->rehashing, d);
+ metadata->rehashing_node = listLast(kvs->rehashing);
+
+ unsigned long long from, to;
+ dictRehashingInfo(d, &from, &to);
+ kvs->overhead_hashtable_rehashing += from;
+}
+
+/* Remove dictionary from the rehashing list.
+ *
+ * Updates the bucket count for the given dictionary in a DB. It removes
+ * the old ht size of the dictionary from the total sum of buckets for a DB. */
+static void kvstoreDictRehashingCompleted(dict *d) {
+ kvstore *kvs = d->type->userdata;
+ kvstoreDictMetaBase *metadata = (kvstoreDictMetaBase *)dictMetadata(d);
+ if (metadata->rehashing_node) {
+ listDelNode(kvs->rehashing, metadata->rehashing_node);
+ metadata->rehashing_node = NULL;
+ }
+
+ unsigned long long from, to;
+ dictRehashingInfo(d, &from, &to);
+ kvs->overhead_hashtable_rehashing -= from;
+}
+
+/* Updates the bucket count for the given dictionary in a DB. It adds the new ht size
+ * of the dictionary or removes the old ht size of the dictionary from the total
+ * sum of buckets for a DB. */
+static void kvstoreDictBucketChanged(dict *d, long long delta) {
+ kvstore *kvs = d->type->userdata;
+ kvs->bucket_count += delta;
+}
+
+/* Returns the size of the DB dict extended metadata in bytes. */
+static size_t kvstoreDictBaseMetaSize(dict *d) {
+ UNUSED(d);
+ return sizeof(kvstoreDictMetaBase);
+}
+
+/**********************************/
+/*** API **************************/
+/**********************************/
+
+/* Create an array of dictionaries
+ * num_dicts_bits is the log2 of the amount of dictionaries needed (e.g. 0 for 1 dict,
+ * 3 for 8 dicts, etc.) */
+kvstore *kvstoreCreate(kvstoreType *type, dictType *dtype, int num_dicts_bits, int flags) {
+ /* We can't support more than 2^16 dicts because we want to save 48 bits
+ * for the dict cursor, see kvstoreScan */
+ assert(num_dicts_bits <= 16);
+ assert(!type->dictMetadataBytes || type->dictMetadataBytes(NULL) >= sizeof(kvstoreDictMetaBase));
+
+ /* Calc kvstore size */
+ size_t kvsize = sizeof(kvstore);
+ /* Conditionally calc also histogram size */
+ if (type->kvstoreMetadataBytes)
+ kvsize += type->kvstoreMetadataBytes(NULL);
+
+ kvstore *kvs = zcalloc(kvsize);
+ memcpy(&kvs->dtype, dtype, sizeof(kvs->dtype));
+ kvs->flags = flags;
+ kvs->type = type;
+
+ /* kvstore must be the one to set these callbacks, so we make sure the
+ * caller didn't do it */
+ assert(!dtype->userdata);
+ assert(!dtype->dictMetadataBytes);
+ assert(!dtype->rehashingStarted);
+ assert(!dtype->rehashingCompleted);
+ kvs->dtype.userdata = kvs;
+ kvs->dtype.dictMetadataBytes = type->dictMetadataBytes ?
+ type->dictMetadataBytes : kvstoreDictBaseMetaSize;
+ kvs->dtype.rehashingStarted = kvstoreDictRehashingStarted;
+ kvs->dtype.rehashingCompleted = kvstoreDictRehashingCompleted;
+ kvs->dtype.bucketChanged = kvstoreDictBucketChanged;
+
+ kvs->num_dicts_bits = num_dicts_bits;
+ kvs->num_dicts = 1 << kvs->num_dicts_bits;
+ kvs->dicts = zcalloc(sizeof(dict*) * kvs->num_dicts);
+ if (!(kvs->flags & KVSTORE_ALLOCATE_DICTS_ON_DEMAND)) {
+ for (int i = 0; i < kvs->num_dicts; i++)
+ createDictIfNeeded(kvs, i);
+ }
+
+ kvs->rehashing = listCreate();
+ kvs->key_count = 0;
+ kvs->non_empty_dicts = 0;
+ kvs->resize_cursor = 0;
+ kvs->dict_sizes = kvs->num_dicts > 1 ? fwTreeCreate(kvs->num_dicts_bits) : NULL;
+ kvs->bucket_count = 0;
+ kvs->overhead_hashtable_rehashing = 0;
+ return kvs;
+}
+
+void kvstoreEmpty(kvstore *kvs, void(callback)(dict*)) {
+ for (int didx = 0; didx < kvs->num_dicts; didx++) {
+ dict *d = kvstoreGetDict(kvs, didx);
+ if (!d)
+ continue;
+ kvstoreDictMetaBase *metadata = (kvstoreDictMetaBase *)dictMetadata(d);
+ if (metadata->rehashing_node)
+ metadata->rehashing_node = NULL;
+ dictEmpty(d, callback);
+ if (kvs->type->onDictEmpty) kvs->type->onDictEmpty(kvs, didx);
+ freeDictIfNeeded(kvs, didx);
+ }
+
+ if (kvs->type->onKvstoreEmpty) kvs->type->onKvstoreEmpty(kvs);
+
+ listEmpty(kvs->rehashing);
+
+ kvs->key_count = 0;
+ kvs->non_empty_dicts = 0;
+ kvs->resize_cursor = 0;
+ kvs->bucket_count = 0;
+ if (kvs->dict_sizes)
+ fwTreeClear(kvs->dict_sizes);
+ kvs->overhead_hashtable_rehashing = 0;
+}
+
+void kvstoreRelease(kvstore *kvs) {
+ for (int didx = 0; didx < kvs->num_dicts; didx++) {
+ dict *d = kvstoreGetDict(kvs, didx);
+ if (!d)
+ continue;
+ kvstoreDictMetaBase *metadata = (kvstoreDictMetaBase *)dictMetadata(d);
+ if (metadata->rehashing_node)
+ metadata->rehashing_node = NULL;
+ if (kvs->type->onDictEmpty) kvs->type->onDictEmpty(kvs, didx);
+ dictRelease(d);
+ }
+ zfree(kvs->dicts);
+
+ listRelease(kvs->rehashing);
+ if (kvs->dict_sizes)
+ fwTreeDestroy(kvs->dict_sizes);
+
+ zfree(kvs);
+}
+
+unsigned long long int kvstoreSize(kvstore *kvs) {
+ return kvs->key_count;
+}
+
+/* This method provides the cumulative sum of all the dictionary buckets
+ * across dictionaries in a database. */
+unsigned long kvstoreBuckets(kvstore *kvs) {
+ if (kvs->num_dicts != 1) {
+ return kvs->bucket_count;
+ } else {
+ return kvs->dicts[0]? dictBuckets(kvs->dicts[0]) : 0;
+ }
+}
+
+size_t kvstoreMemUsage(kvstore *kvs) {
+ size_t mem = sizeof(*kvs);
+ size_t metaSize = kvs->dtype.dictMetadataBytes(NULL);
+ unsigned long long keys_count = kvstoreSize(kvs);
+ mem += keys_count * dictEntryMemUsage(kvs->dtype.no_value) +
+ kvstoreBuckets(kvs) * sizeof(dictEntry*) +
+ kvs->allocated_dicts * (sizeof(dict) + metaSize);
+
+ /* Values are dict* shared with kvs->dicts */
+ mem += listLength(kvs->rehashing) * sizeof(listNode);
+
+ return mem;
+}
+
+/*
+ * This method is used to iterate over the elements of the entire kvstore specifically across dicts.
+ * It's a three pronged approach.
+ *
+ * 1. It uses the provided cursor `cursor` to retrieve the dict index from it.
+ * 2. If the dictionary is in a valid state checked through the provided callback `dictScanValidFunction`,
+ * it performs a dictScan over the appropriate `keyType` dictionary of `db`.
+ * 3. If the dict is entirely scanned i.e. the cursor has reached 0, the next non empty dict is discovered.
+ * The dict information is embedded into the cursor and returned.
+ *
+ * To restrict the scan to a single dict, pass a valid dict index as
+ * 'onlydidx', otherwise pass -1.
+ */
+unsigned long long kvstoreScan(kvstore *kvs, unsigned long long cursor,
+ int onlydidx, dictScanFunction *scan_cb,
+ kvstoreScanShouldSkipDict *skip_cb,
+ void *privdata)
+{
+ unsigned long long _cursor = 0;
+ /* During dictionary traversal, 48 upper bits in the cursor are used for positioning in the HT.
+ * Following lower bits are used for the dict index number, ranging from 0 to 2^num_dicts_bits-1.
+ * Dict index is always 0 at the start of iteration and can be incremented only if there are
+ * multiple dicts. */
+ int didx = getAndClearDictIndexFromCursor(kvs, &cursor);
+ if (onlydidx >= 0) {
+ if (didx < onlydidx) {
+ /* Fast-forward to onlydidx. */
+ assert(onlydidx < kvs->num_dicts);
+ didx = onlydidx;
+ cursor = 0;
+ } else if (didx > onlydidx) {
+ /* The cursor is already past onlydidx. */
+ return 0;
+ }
+ }
+
+ dict *d = kvstoreGetDict(kvs, didx);
+
+ int skip = !d || (skip_cb && skip_cb(d, didx));
+ if (!skip) {
+ _cursor = dictScan(d, cursor, scan_cb, privdata);
+ /* In dictScan, scan_cb may delete entries (e.g., in active expire case). */
+ freeDictIfNeeded(kvs, didx);
+ }
+ /* scanning done for the current dictionary or if the scanning wasn't possible, move to the next dict index. */
+ if (_cursor == 0 || skip) {
+ if (onlydidx >= 0)
+ return 0;
+ didx = kvstoreGetNextNonEmptyDictIndex(kvs, didx);
+ }
+ if (didx == -1) {
+ return 0;
+ }
+ addDictIndexToCursor(kvs, didx, &_cursor);
+ return _cursor;
+}
+
+/*
+ * This functions increases size of kvstore to match desired number.
+ * It resizes all individual dictionaries, unless skip_cb indicates otherwise.
+ *
+ * Based on the parameter `try_expand`, appropriate dict expand API is invoked.
+ * if try_expand is set to 1, `dictTryExpand` is used else `dictExpand`.
+ * The return code is either `DICT_OK`/`DICT_ERR` for both the API(s).
+ * `DICT_OK` response is for successful expansion. However, `DICT_ERR` response signifies failure in allocation in
+ * `dictTryExpand` call and in case of `dictExpand` call it signifies no expansion was performed.
+ */
+int kvstoreExpand(kvstore *kvs, uint64_t newsize, int try_expand, kvstoreExpandShouldSkipDictIndex *skip_cb) {
+ for (int i = 0; i < kvs->num_dicts; i++) {
+ dict *d = kvstoreGetDict(kvs, i);
+ if (!d || (skip_cb && skip_cb(i)))
+ continue;
+ int result = try_expand ? dictTryExpand(d, newsize) : dictExpand(d, newsize);
+ if (try_expand && result == DICT_ERR)
+ return 0;
+ }
+
+ return 1;
+}
+
+/* Returns fair random dict index, probability of each dict being returned is
+ * proportional to the number of elements that dictionary holds.
+ * This function guarantees that it returns a dict-index of a non-empty dict,
+ * unless the entire kvstore is empty or all dicts are skipped.
+ *
+ * Parameters:
+ * - kvs: the kvstore instance
+ * - skip_cb: callback to determine if a dict should be skipped (NULL means no skipping)
+ * - fair_attempts: number of fair selection attempts before falling back
+ * - slow_fallback: if 1, uses systematic search when fair attempts fail
+ *
+ * Returns:
+ * - Valid dict index (>= 0) on success
+ * - -1 if no valid dict found (either slow_fallback is 0 or all dicts are skipped)
+ *
+ * Time complexity: O(fair_attempts * log(kvs->num_dicts)) for fair attempts,
+ * plus O(kvs->num_dicts) for systematic fallback if enabled.
+ */
+int kvstoreGetFairRandomDictIndex(kvstore *kvs, kvstoreRandomShouldSkipDictIndex *skip_cb,
+ int fair_attempts, int slow_fallback)
+{
+ if (kvs->num_dicts == 1 || kvstoreSize(kvs) == 0)
+ return 0;
+
+ unsigned long long total_size = kvstoreSize(kvs);
+
+ /* Try fair attempts first. If skip_cb is not applicable, execute only once. */
+ for (int attempt = 0; attempt < fair_attempts; attempt++) {
+ unsigned long target = (randomULong() % total_size) + 1;
+ int didx = kvstoreFindDictIndexByKeyIndex(kvs, target);
+ if (!skip_cb || !skip_cb(didx)) {
+ return didx;
+ }
+ }
+
+ /* If fair attempts failed and slow fallback is allowed */
+ if (slow_fallback) {
+ /* systematic check from random start */
+ int start = randomULong() % kvs->num_dicts;
+ for (int i = 0; i < kvs->num_dicts; i++) {
+ int didx = (start + i) % kvs->num_dicts;
+ dict *d = kvstoreGetDict(kvs, didx);
+ if (d && (!skip_cb || !skip_cb(didx))) {
+ return didx;
+ }
+ }
+ }
+
+ /* Failed to find valid dict that has elements */
+ return -1;
+}
+
+void kvstoreGetStats(kvstore *kvs, char *buf, size_t bufsize, int full) {
+ buf[0] = '\0';
+
+ size_t l;
+ char *orig_buf = buf;
+ size_t orig_bufsize = bufsize;
+ dictStats *mainHtStats = NULL;
+ dictStats *rehashHtStats = NULL;
+ dict *d;
+ kvstoreIterator kvs_it;
+
+ kvstoreIteratorInit(&kvs_it, kvs);
+ while ((d = kvstoreIteratorNextDict(&kvs_it))) {
+ dictStats *stats = dictGetStatsHt(d, 0, full);
+ if (!mainHtStats) {
+ mainHtStats = stats;
+ } else {
+ dictCombineStats(stats, mainHtStats);
+ dictFreeStats(stats);
+ }
+ if (dictIsRehashing(d)) {
+ stats = dictGetStatsHt(d, 1, full);
+ if (!rehashHtStats) {
+ rehashHtStats = stats;
+ } else {
+ dictCombineStats(stats, rehashHtStats);
+ dictFreeStats(stats);
+ }
+ }
+ }
+ kvstoreIteratorReset(&kvs_it);
+
+ if (mainHtStats && bufsize > 0) {
+ l = dictGetStatsMsg(buf, bufsize, mainHtStats, full);
+ dictFreeStats(mainHtStats);
+ buf += l;
+ bufsize -= l;
+ }
+
+ if (rehashHtStats && bufsize > 0) {
+ l = dictGetStatsMsg(buf, bufsize, rehashHtStats, full);
+ dictFreeStats(rehashHtStats);
+ buf += l;
+ bufsize -= l;
+ }
+ /* Make sure there is a NULL term at the end. */
+ if (orig_bufsize) orig_buf[orig_bufsize - 1] = '\0';
+}
+
+/* Finds a dict containing target element in a key space ordered by dict index.
+ * Consider this example. Dictionaries are represented by brackets and keys by dots:
+ * #0 #1 #2 #3 #4
+ * [..][....][...][.......][.]
+ * ^
+ * target
+ *
+ * In this case dict #3 contains key that we are trying to find.
+ *
+ * The return value is 0 based dict-index, and the range of the target is [1..kvstoreSize], kvstoreSize inclusive.
+ *
+ * To find the dict, we start with the root node of the binary index tree and search through its children
+ * from the highest index (2^num_dicts_bits in our case) to the lowest index. At each node, we check if the target
+ * value is greater than the node's value. If it is, we remove the node's value from the target and recursively
+ * search for the new target using the current node as the parent.
+ * Time complexity of this function is O(log(kvs->num_dicts))
+ */
+int kvstoreFindDictIndexByKeyIndex(kvstore *kvs, unsigned long target) {
+ if (kvs->num_dicts == 1 || kvstoreSize(kvs) == 0)
+ return 0;
+ assert(target <= kvstoreSize(kvs));
+
+ return fwTreeFindIndex(kvs->dict_sizes, target);
+}
+
+/* Get the first non-empty dict index in the kvstore. Returns -1 if kvstore is empty. */
+int kvstoreGetFirstNonEmptyDictIndex(kvstore *kvs) {
+ if (kvstoreSize(kvs) == 0)
+ return -1;
+ if (kvs->num_dicts == 1)
+ return 0;
+ return fwTreeFindFirstNonEmpty(kvs->dict_sizes);
+}
+
+/* Returns next non-empty dict index strictly after given one, or -1 if provided didx is the last one. */
+int kvstoreGetNextNonEmptyDictIndex(kvstore *kvs, int didx) {
+ if (kvs->num_dicts == 1) {
+ assert(didx == 0);
+ return -1;
+ }
+ return fwTreeFindNextNonEmpty(kvs->dict_sizes, didx);
+}
+
+int kvstoreNumNonEmptyDicts(kvstore *kvs) {
+ return kvs->non_empty_dicts;
+}
+
+int kvstoreNumAllocatedDicts(kvstore *kvs) {
+ return kvs->allocated_dicts;
+}
+
+int kvstoreNumDicts(kvstore *kvs) {
+ return kvs->num_dicts;
+}
+
+/* Move dict from one kvstore to another. */
+void kvstoreMoveDict(kvstore *kvs, kvstore *dst, int didx) {
+ assert(kvs->num_dicts > didx);
+ assert(kvs->num_dicts == dst->num_dicts);
+ assert(dst->dicts[didx] == NULL);
+
+ dict *d = kvs->dicts[didx];
+ if (d == NULL) return;
+
+ /* Adjust source kvstore */
+ kvs->allocated_dicts -= 1;
+ cumulativeKeyCountAdd(kvs, didx, -((long long)dictSize(d)));
+ kvstoreDictBucketChanged(d, -((long long) dictBuckets(d)));
+ /* If rehashing, stop it. */
+ if (dictIsRehashing(d))
+ kvstoreDictRehashingCompleted(d);
+ /* Clear dict from source kvstore and create a new one if needed */
+ kvs->dicts[didx] = NULL;
+ if (!(kvs->flags & (KVSTORE_ALLOCATE_DICTS_ON_DEMAND | KVSTORE_FREE_EMPTY_DICTS)))
+ createDictIfNeeded(kvs, didx);
+
+ /* Move dict to destination kvstore */
+ dst->dicts[didx] = d;
+ dst->dicts[didx]->type = &dst->dtype;
+ dst->allocated_dicts += 1;
+ cumulativeKeyCountAdd(dst, didx, dictSize(d));
+ kvstoreDictBucketChanged(d, dictBuckets(d));
+ if (dictIsRehashing(dst->dicts[didx]))
+ kvstoreDictRehashingStarted(dst->dicts[didx]);
+}
+
+/* Returns kvstore iterator that can be used to iterate through sub-dictionaries.
+ *
+ * The caller should reset kvs_it with kvstoreIteratorReset. */
+void kvstoreIteratorInit(kvstoreIterator *kvs_it, kvstore *kvs) {
+ kvs_it->kvs = kvs;
+ kvs_it->didx = -1;
+ kvs_it->next_didx = kvstoreGetFirstNonEmptyDictIndex(kvs_it->kvs); /* Finds first non-empty dict index. */
+ dictInitSafeIterator(&kvs_it->di, NULL);
+}
+
+/* Free the kvs_it returned by kvstoreIteratorInit. */
+void kvstoreIteratorReset(kvstoreIterator *kvs_it) {
+ dictIterator *iter = &kvs_it->di;
+ dictResetIterator(iter);
+ /* In the safe iterator context, we may delete entries. */
+ if (kvs_it->didx != -1)
+ freeDictIfNeeded(kvs_it->kvs, kvs_it->didx);
+}
+
+/* Returns next dictionary from the iterator, or NULL if iteration is complete.
+ *
+ * - Takes care to reset the iter of the previous dict before moved to the next dict.
+ */
+dict *kvstoreIteratorNextDict(kvstoreIterator *kvs_it) {
+ if (kvs_it->next_didx == -1)
+ return NULL;
+
+ /* The dict may be deleted during the iteration process, so here need to check for NULL. */
+ if (kvs_it->didx != -1 && kvstoreGetDict(kvs_it->kvs, kvs_it->didx)) {
+ /* Before we move to the next dict, reset the iter of the previous dict. */
+ dictIterator *iter = &kvs_it->di;
+ dictResetIterator(iter);
+ /* In the safe iterator context, we may delete entries. */
+ freeDictIfNeeded(kvs_it->kvs, kvs_it->didx);
+ }
+
+ kvs_it->didx = kvs_it->next_didx;
+ kvs_it->next_didx = kvstoreGetNextNonEmptyDictIndex(kvs_it->kvs, kvs_it->didx);
+ return kvs_it->kvs->dicts[kvs_it->didx];
+}
+
+int kvstoreIteratorGetCurrentDictIndex(kvstoreIterator *kvs_it) {
+ assert(kvs_it->didx >= 0 && kvs_it->didx < kvs_it->kvs->num_dicts);
+ return kvs_it->didx;
+}
+
+/* Returns next entry. */
+dictEntry *kvstoreIteratorNext(kvstoreIterator *kvs_it) {
+ dictEntry *de = kvs_it->di.d ? dictNext(&kvs_it->di) : NULL;
+ if (!de) { /* No current dict or reached the end of the dictionary. */
+
+ /* Before we move to the next dict, function kvstoreIteratorNextDict()
+ * reset the iter of the previous dict & freeDictIfNeeded(). */
+ dict *d = kvstoreIteratorNextDict(kvs_it);
+
+ if (!d)
+ return NULL;
+
+ dictInitSafeIterator(&kvs_it->di, d);
+ de = dictNext(&kvs_it->di);
+ }
+ return de;
+}
+
+/* This method traverses through kvstore dictionaries and triggers a resize.
+ * It first tries to shrink if needed, and if it isn't, it tries to expand. */
+void kvstoreTryResizeDicts(kvstore *kvs, int limit) {
+ if (limit > kvs->num_dicts)
+ limit = kvs->num_dicts;
+
+ for (int i = 0; i < limit; i++) {
+ int didx = kvs->resize_cursor;
+ dict *d = kvstoreGetDict(kvs, didx);
+ if (d && dictShrinkIfNeeded(d) == DICT_ERR) {
+ dictExpandIfNeeded(d);
+ }
+ kvs->resize_cursor = (didx + 1) % kvs->num_dicts;
+ }
+}
+
+/* Our hash table implementation performs rehashing incrementally while
+ * we write/read from the hash table. Still if the server is idle, the hash
+ * table will use two tables for a long time. So we try to use threshold_us
+ * of CPU time at every call of this function to perform some rehashing.
+ *
+ * The function returns the amount of microsecs spent if some rehashing was
+ * performed, otherwise 0 is returned. */
+uint64_t kvstoreIncrementallyRehash(kvstore *kvs, uint64_t threshold_us) {
+ if (listLength(kvs->rehashing) == 0)
+ return 0;
+
+ /* Our goal is to rehash as many dictionaries as we can before reaching threshold_us,
+ * after each dictionary completes rehashing, it removes itself from the list. */
+ listNode *node;
+ monotime timer;
+ uint64_t elapsed_us = 0;
+ elapsedStart(&timer);
+ while ((node = listFirst(kvs->rehashing))) {
+ dictRehashMicroseconds(listNodeValue(node), threshold_us - elapsed_us);
+
+ elapsed_us = elapsedUs(timer);
+ if (elapsed_us >= threshold_us) {
+ break; /* Reached the time limit. */
+ }
+ }
+ return elapsed_us;
+}
+
+size_t kvstoreOverheadHashtableLut(kvstore *kvs) {
+ return kvs->bucket_count * sizeof(dictEntry *);
+}
+
+size_t kvstoreOverheadHashtableRehashing(kvstore *kvs) {
+ return kvs->overhead_hashtable_rehashing * sizeof(dictEntry *);
+}
+
+unsigned long kvstoreDictRehashingCount(kvstore *kvs) {
+ return listLength(kvs->rehashing);
+}
+
+unsigned long kvstoreDictSize(kvstore *kvs, int didx)
+{
+ dict *d = kvstoreGetDict(kvs, didx);
+ if (!d)
+ return 0;
+ return dictSize(d);
+}
+
+void kvstoreInitDictIterator(kvstoreDictIterator *kvs_di, kvstore *kvs, int didx)
+{
+ kvs_di->kvs = kvs;
+ kvs_di->didx = didx;
+ dictInitIterator(&kvs_di->di, kvstoreGetDict(kvs, didx));
+}
+
+void kvstoreInitDictSafeIterator(kvstoreDictIterator *kvs_di, kvstore *kvs, int didx)
+{
+ kvs_di->kvs = kvs;
+ kvs_di->didx = didx;
+ dictInitSafeIterator(&kvs_di->di, kvstoreGetDict(kvs, didx));
+}
+
+/* Free the kvs_di returned by kvstoreGetDictIterator and kvstoreGetDictSafeIterator. */
+void kvstoreResetDictIterator(kvstoreDictIterator *kvs_di)
+{
+ /* The dict may be deleted during the iteration process, so here need to check for NULL. */
+ if (kvstoreGetDict(kvs_di->kvs, kvs_di->didx)) {
+ dictResetIterator(&kvs_di->di);
+ /* In the safe iterator context, we may delete entries. */
+ freeDictIfNeeded(kvs_di->kvs, kvs_di->didx);
+ }
+}
+
+/* Get the next element of the dict through kvstoreDictIterator and dictNext. */
+dictEntry *kvstoreDictIteratorNext(kvstoreDictIterator *kvs_di)
+{
+ /* The dict may be deleted during the iteration process, so here need to check for NULL. */
+ dict *d = kvstoreGetDict(kvs_di->kvs, kvs_di->didx);
+ if (!d) return NULL;
+
+ return dictNext(&kvs_di->di);
+}
+
+dictEntry *kvstoreDictGetRandomKey(kvstore *kvs, int didx)
+{
+ dict *d = kvstoreGetDict(kvs, didx);
+ if (!d)
+ return NULL;
+ return dictGetRandomKey(d);
+}
+
+dictEntry *kvstoreDictGetFairRandomKey(kvstore *kvs, int didx)
+{
+ dict *d = kvstoreGetDict(kvs, didx);
+ if (!d)
+ return NULL;
+ return dictGetFairRandomKey(d);
+}
+
+unsigned int kvstoreDictGetSomeKeys(kvstore *kvs, int didx, dictEntry **des, unsigned int count)
+{
+ dict *d = kvstoreGetDict(kvs, didx);
+ if (!d)
+ return 0;
+ return dictGetSomeKeys(d, des, count);
+}
+
+int kvstoreDictExpand(kvstore *kvs, int didx, unsigned long size)
+{
+ dict *d = createDictIfNeeded(kvs, didx);
+ if (!d)
+ return DICT_ERR;
+ return dictExpand(d, size);
+}
+
+unsigned long kvstoreDictScanDefrag(kvstore *kvs, int didx, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata)
+{
+ dict *d = kvstoreGetDict(kvs, didx);
+ if (!d)
+ return 0;
+ return dictScanDefrag(d, v, fn, defragfns, privdata);
+}
+
+/* Unlike kvstoreDictScanDefrag(), this method doesn't defrag the data(keys and values)
+ * within dict, it only reallocates the memory used by the dict structure itself using
+ * the provided allocation function. This feature was added for the active defrag feature.
+ *
+ * With 16k dictionaries for cluster mode with 1 shard, this operation may require substantial time
+ * to execute. A "cursor" is used to perform the operation iteratively. When first called, a
+ * cursor value of 0 should be provided. The return value is an updated cursor which should be
+ * provided on the next iteration. The operation is complete when 0 is returned.
+ *
+ * The 'defragfn' callback is called with a reference to the dict that callback can reallocate. */
+unsigned long kvstoreDictLUTDefrag(kvstore *kvs, unsigned long cursor, kvstoreDictLUTDefragFunction *defragfn) {
+ for (int didx = cursor; didx < kvs->num_dicts; didx++) {
+ dict **d = kvstoreGetDictRef(kvs, didx), *newd;
+ if (!*d)
+ continue;
+ if ((newd = defragfn(*d))) {
+ *d = newd;
+
+ /* After defragmenting the dict, update its corresponding
+ * rehashing node in the kvstore's rehashing list. */
+ kvstoreDictMetaBase *metadata = (kvstoreDictMetaBase *)dictMetadata(*d);
+ if (metadata->rehashing_node)
+ metadata->rehashing_node->value = *d;
+ }
+ return (didx + 1);
+ }
+ return 0;
+}
+
+void *kvstoreDictFetchValue(kvstore *kvs, int didx, const void *key)
+{
+ dict *d = kvstoreGetDict(kvs, didx);
+ if (!d)
+ return NULL;
+ assert(d->type->no_value == 0);
+ return dictFetchValue(d, key);
+}
+
+dictEntry *kvstoreDictFind(kvstore *kvs, int didx, void *key) {
+ dict *d = kvstoreGetDict(kvs, didx);
+ if (!d)
+ return NULL;
+ return dictFind(d, key);
+}
+
+/* Find a link to a key in the specified kvstore. If not found return NULL.
+ *
+ * This function is a wrapper around dictFindLink(), used to locate a key in a dict
+ * from a kvstore.
+ *
+ * The caller may provide a bucket pointer to receive the reference to the bucket
+ * where the key is stored or need to be added.
+ *
+ * Returns:
+ * A reference to the dictEntry if found, otherwise NULL.
+ *
+ * Important:
+ * After calling kvstoreDictFindLink(), any necessary updates based on returned
+ * link or bucket must be made immediately after, commonly by kvstoreDictSetAtLink()
+ * without any operations in between that might modify the dict. Otherwise,
+ * the link or bucket may become invalid. Example usage:
+ *
+ * link = kvstoreDictFindLink(kvs, didx, key, &bucket);
+ * ... Do something, but don't modify kvs->dicts[didx] ...
+ * if (link)
+ * kvstoreDictSetAtLink(kvs, didx, kv, &link, 0); // Update existing entry
+ * else
+ * kvstoreDictSetAtLink(kvs, didx, kv, &bucket, 1); // Insert new entry
+ */
+dictEntryLink kvstoreDictFindLink(kvstore *kvs, int didx, void *key, dictEntryLink *bucket) {
+ if (bucket) *bucket = NULL;
+ dict *d = kvstoreGetDict(kvs, didx);
+ if (!d) return NULL;
+ return dictFindLink(d, key, bucket);
+}
+
+/* Set a key (or key-value) in the specified kvstore.
+ *
+ * This function inserts a new key or updates an existing one, depending on
+ * the `newItem` flag.
+ *
+ * Parameters:
+ * link: - When `newItem` is set, `link` points to the bucket of the key.
+ * - When `newItem` is not set, `link` points to the link of the key.
+ * - If link is NULL, dictFindLink() will be called to locate the link.
+ *
+ * newItem: - If set, add a new key with a new dictEntry.
+ * - If not set, update the key of an existing dictEntry.
+ */
+void kvstoreDictSetAtLink(kvstore *kvs, int didx, void *kv, dictEntryLink *link, int newItem) {
+ dict *d;
+ if (newItem) {
+ d = createDictIfNeeded(kvs, didx);
+ dictSetKeyAtLink(d, kv, link, newItem);
+ cumulativeKeyCountAdd(kvs, didx, 1); /* must be called only after updating dict */
+ } else {
+ d = kvstoreGetDict(kvs, didx);
+ dictSetKeyAtLink(d, kv, link, newItem);
+ }
+}
+
+dictEntry *kvstoreDictAddRaw(kvstore *kvs, int didx, void *key, dictEntry **existing) {
+ dict *d = createDictIfNeeded(kvs, didx);
+ dictEntry *ret = dictAddRaw(d, key, existing);
+ if (ret)
+ cumulativeKeyCountAdd(kvs, didx, 1);
+ return ret;
+}
+
+void kvstoreDictSetKey(kvstore *kvs, int didx, dictEntry* de, void *key) {
+ dict *d = kvstoreGetDict(kvs, didx);
+ dictSetKey(d, de, key);
+}
+
+void kvstoreDictSetVal(kvstore *kvs, int didx, dictEntry *de, void *val) {
+ dict *d = kvstoreGetDict(kvs, didx);
+ assert(d->type->no_value == 0);
+ dictSetVal(d, de, val);
+}
+
+dictEntryLink kvstoreDictTwoPhaseUnlinkFind(kvstore *kvs, int didx, const void *key, int *table_index) {
+ dict *d = kvstoreGetDict(kvs, didx);
+ if (!d)
+ return NULL;
+ return dictTwoPhaseUnlinkFind(kvstoreGetDict(kvs, didx), key, table_index);
+}
+
+void kvstoreDictTwoPhaseUnlinkFree(kvstore *kvs, int didx, dictEntryLink link, int table_index) {
+ dict *d = kvstoreGetDict(kvs, didx);
+ dictTwoPhaseUnlinkFree(d, link, table_index);
+ cumulativeKeyCountAdd(kvs, didx, -1);
+ freeDictIfNeeded(kvs, didx);
+}
+
+int kvstoreDictDelete(kvstore *kvs, int didx, const void *key) {
+ dict *d = kvstoreGetDict(kvs, didx);
+ if (!d)
+ return DICT_ERR;
+ int ret = dictDelete(d, key);
+ if (ret == DICT_OK) {
+ cumulativeKeyCountAdd(kvs, didx, -1);
+ freeDictIfNeeded(kvs, didx);
+ }
+ return ret;
+}
+
+void *kvstoreGetDictMeta(kvstore *kvs, int didx, int createIfNeeded) {
+ dict *d = kvstoreGetDict(kvs, didx);
+ if (!d) {
+ if (!createIfNeeded) return NULL;
+ d = createDictIfNeeded(kvs, didx);
+ }
+ return dictMetadata(d);
+}
+
+void *kvstoreGetMetadata(kvstore *kvs) {
+ return &kvs->metadata;
+}
+
+#ifdef REDIS_TEST
+#include <stdio.h>
+#include "testhelp.h"
+
+#define TEST(name) printf("test — %s\n", name);
+
+uint64_t hashTestCallback(const void *key) {
+ return dictGenHashFunction((unsigned char*)key, strlen((char*)key));
+}
+
+void freeTestCallback(dict *d, void *val) {
+ UNUSED(d);
+ zfree(val);
+}
+
+void *defragAllocTest(void *ptr) {
+ size_t size = zmalloc_usable_size(ptr);
+ void *newptr = zmalloc(size);
+ memcpy(newptr, ptr, size);
+ zfree(ptr);
+ return newptr;
+}
+
+dict *defragLUTTestCallback(dict *d) {
+ /* handle the dict struct */
+ d = defragAllocTest(d);
+ /* handle the first hash table */
+ d->ht_table[0] = defragAllocTest(d->ht_table[0]);
+ /* handle the second hash table */
+ if (d->ht_table[1])
+ d->ht_table[1] = defragAllocTest(d->ht_table[1]);
+ return d;
+}
+
+dictType KvstoreDictTestType = {
+ hashTestCallback,
+ NULL,
+ NULL,
+ NULL,
+ freeTestCallback,
+ NULL,
+ NULL
+};
+
+kvstoreType KvstoreTestType = {
+ NULL, /* kvstore metadata size */
+ NULL, /* dict metadata size */
+ NULL, /* can free dict */
+ NULL, /* on kvstore empty */
+ NULL, /* on dict empty */
+};
+
+char *stringFromInt(int value) {
+ char buf[32];
+ int len;
+ char *s;
+
+ len = snprintf(buf, sizeof(buf), "%d",value);
+ s = zmalloc(len+1);
+ memcpy(s, buf, len);
+ s[len] = '\0';
+ return s;
+}
+
+/* ./redis-server test kvstore */
+int kvstoreTest(int argc, char **argv, int flags) {
+ UNUSED(argc);
+ UNUSED(argv);
+ UNUSED(flags);
+
+ int i;
+ void *key;
+ dictEntry *de;
+ kvstoreIterator kvs_it;
+ kvstoreDictIterator kvs_di;
+
+ /* Test also dictType with no_value=1 */
+ dictType KvstoreDictNovalTestType = KvstoreDictTestType;
+ KvstoreDictNovalTestType.no_value = 1;
+
+ int didx = 0;
+ int curr_slot = 0;
+ kvstore *kvs1 = kvstoreCreate(&KvstoreTestType, &KvstoreDictTestType, 0, KVSTORE_ALLOCATE_DICTS_ON_DEMAND);
+ kvstore *kvs2 = kvstoreCreate(&KvstoreTestType, &KvstoreDictNovalTestType, 0, KVSTORE_ALLOCATE_DICTS_ON_DEMAND | KVSTORE_FREE_EMPTY_DICTS);
+
+ TEST("Add 16 keys") {
+ for (i = 0; i < 16; i++) {
+ de = kvstoreDictAddRaw(kvs1, didx, stringFromInt(i), NULL);
+ assert(de != NULL);
+ de = kvstoreDictAddRaw(kvs2, didx, stringFromInt(i), NULL);
+ assert(de != NULL);
+ }
+ assert(kvstoreDictSize(kvs1, didx) == 16);
+ assert(kvstoreSize(kvs1) == 16);
+ assert(kvstoreDictSize(kvs2, didx) == 16);
+ assert(kvstoreSize(kvs2) == 16);
+ }
+
+ TEST("kvstoreIterator creating and releasing without kvstoreIteratorNextDict()") {
+ kvstore *kvs = kvstoreCreate(&KvstoreTestType, &KvstoreDictNovalTestType, 0, KVSTORE_ALLOCATE_DICTS_ON_DEMAND | KVSTORE_FREE_EMPTY_DICTS);
+ kvstoreIterator kvs_iter;
+ kvstoreIteratorInit(&kvs_iter, kvs);
+ kvstoreIteratorReset(&kvs_iter);
+ kvstoreRelease(kvs);
+ }
+
+ TEST("kvstoreIterator case 1: removing all keys does not delete the empty dict") {
+ kvstoreIteratorInit(&kvs_it, kvs1);
+ while((de = kvstoreIteratorNext(&kvs_it)) != NULL) {
+ curr_slot = kvstoreIteratorGetCurrentDictIndex(&kvs_it);
+ key = dictGetKey(de);
+ assert(kvstoreDictDelete(kvs1, curr_slot, key) == DICT_OK);
+ }
+ kvstoreIteratorReset(&kvs_it);
+
+ dict *d = kvstoreGetDict(kvs1, didx);
+ assert(d != NULL);
+ assert(kvstoreDictSize(kvs1, didx) == 0);
+ assert(kvstoreSize(kvs1) == 0);
+ }
+
+ TEST("kvstoreIterator case 2: removing all keys will delete the empty dict") {
+ kvstoreIteratorInit(&kvs_it, kvs2);
+ while((de = kvstoreIteratorNext(&kvs_it)) != NULL) {
+ curr_slot = kvstoreIteratorGetCurrentDictIndex(&kvs_it);
+ key = dictGetKey(de);
+ assert(kvstoreDictDelete(kvs2, curr_slot, key) == DICT_OK);
+ }
+ kvstoreIteratorReset(&kvs_it);
+
+ /* Make sure the dict was removed from the rehashing list. */
+ while (kvstoreIncrementallyRehash(kvs2, 1000)) {}
+
+ dict *d = kvstoreGetDict(kvs2, didx);
+ assert(d == NULL);
+ assert(kvstoreDictSize(kvs2, didx) == 0);
+ assert(kvstoreSize(kvs2) == 0);
+ }
+
+ TEST("Add 16 keys again") {
+ for (i = 0; i < 16; i++) {
+ de = kvstoreDictAddRaw(kvs1, didx, stringFromInt(i), NULL);
+ assert(de != NULL);
+ de = kvstoreDictAddRaw(kvs2, didx, stringFromInt(i), NULL);
+ assert(de != NULL);
+ }
+ assert(kvstoreDictSize(kvs1, didx) == 16);
+ assert(kvstoreSize(kvs1) == 16);
+ assert(kvstoreDictSize(kvs2, didx) == 16);
+ assert(kvstoreSize(kvs2) == 16);
+ }
+
+ TEST("kvstoreDictIterator case 1: removing all keys does not delete the empty dict") {
+ kvstoreInitDictSafeIterator(&kvs_di, kvs1, didx);
+ while((de = kvstoreDictIteratorNext(&kvs_di)) != NULL) {
+ key = dictGetKey(de);
+ assert(kvstoreDictDelete(kvs1, didx, key) == DICT_OK);
+ }
+ kvstoreResetDictIterator(&kvs_di);
+
+ dict *d = kvstoreGetDict(kvs1, didx);
+ assert(d != NULL);
+ assert(kvstoreDictSize(kvs1, didx) == 0);
+ assert(kvstoreSize(kvs1) == 0);
+ }
+
+ TEST("kvstoreDictIterator case 2: removing all keys will delete the empty dict") {
+ kvstoreInitDictSafeIterator(&kvs_di, kvs2, didx);
+ while((de = kvstoreDictIteratorNext(&kvs_di)) != NULL) {
+ key = dictGetKey(de);
+ assert(kvstoreDictDelete(kvs2, didx, key) == DICT_OK);
+ }
+ kvstoreResetDictIterator(&kvs_di);
+
+ dict *d = kvstoreGetDict(kvs2, didx);
+ assert(d == NULL);
+ assert(kvstoreDictSize(kvs2, didx) == 0);
+ assert(kvstoreSize(kvs2) == 0);
+ }
+
+ TEST("Verify that a rehashing dict's node in the rehashing list is correctly updated after defragmentation") {
+ unsigned long cursor = 0;
+ kvstore *kvs = kvstoreCreate(&KvstoreTestType, &KvstoreDictTestType, 0, KVSTORE_ALLOCATE_DICTS_ON_DEMAND);
+ for (i = 0; i < 256; i++) {
+ de = kvstoreDictAddRaw(kvs, 0, stringFromInt(i), NULL);
+ if (listLength(kvs->rehashing)) break;
+ }
+ assert(listLength(kvs->rehashing));
+ while ((cursor = kvstoreDictLUTDefrag(kvs, cursor, defragLUTTestCallback)) != 0) {}
+ while (kvstoreIncrementallyRehash(kvs, 1000)) {}
+ kvstoreRelease(kvs);
+ }
+
+ TEST("Verify non-empty dict count is correctly updated") {
+ kvstore *kvs = kvstoreCreate(&KvstoreTestType, &KvstoreDictTestType, 2,
+ KVSTORE_ALLOCATE_DICTS_ON_DEMAND);
+ for (int idx = 0; idx < 4; idx++) {
+ for (i = 0; i < 16; i++) {
+ de = kvstoreDictAddRaw(kvs, idx, stringFromInt(i), NULL);
+ assert(de != NULL);
+ /* When the first element is inserted, the number of non-empty dictionaries is increased by 1. */
+ if (i == 0) assert(kvstoreNumNonEmptyDicts(kvs) == idx + 1);
+ }
+ }
+
+ /* Step by step, clear all dictionaries and ensure non-empty dict count is updated */
+ for (int idx = 0; idx < 4; idx++) {
+ kvstoreInitDictSafeIterator(&kvs_di, kvs, idx);
+ while((de = kvstoreDictIteratorNext(&kvs_di)) != NULL) {
+ key = dictGetKey(de);
+ assert(kvstoreDictDelete(kvs, idx, key) == DICT_OK);
+ /* When the dictionary is emptied, the number of non-empty dictionaries is reduced by 1. */
+ if (kvstoreDictSize(kvs, idx) == 0) assert(kvstoreNumNonEmptyDicts(kvs) == 3 - idx);
+ }
+ kvstoreResetDictIterator(&kvs_di);
+ }
+ kvstoreRelease(kvs);
+ }
+
+ kvstoreRelease(kvs1);
+ kvstoreRelease(kvs2);
+ return 0;
+}
+#endif