diff options
| author | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:52:54 +0100 |
|---|---|---|
| committer | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:52:54 +0100 |
| commit | dcacc00e3750300617ba6e16eb346713f91a783a (patch) | |
| tree | 38e2d4fb5ed9d119711d4295c6eda4b014af73fd /examples/redis-unstable/src/kvstore.c | |
| parent | 58dac10aeb8f5a041c46bddbeaf4c7966a99b998 (diff) | |
| download | crep-dcacc00e3750300617ba6e16eb346713f91a783a.tar.gz | |
Remove testing data
Diffstat (limited to 'examples/redis-unstable/src/kvstore.c')
| -rw-r--r-- | examples/redis-unstable/src/kvstore.c | 1171 |
1 files changed, 0 insertions, 1171 deletions
diff --git a/examples/redis-unstable/src/kvstore.c b/examples/redis-unstable/src/kvstore.c deleted file mode 100644 index fb9e7d5..0000000 --- a/examples/redis-unstable/src/kvstore.c +++ /dev/null | |||
| @@ -1,1171 +0,0 @@ | |||
| 1 | /* | ||
| 2 | * Copyright (c) 2011-Present, Redis Ltd. and contributors. | ||
| 3 | * All rights reserved. | ||
| 4 | * | ||
| 5 | * Copyright (c) 2024-present, Valkey contributors. | ||
| 6 | * All rights reserved. | ||
| 7 | * | ||
| 8 | * Licensed under your choice of (a) the Redis Source Available License 2.0 | ||
| 9 | * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the | ||
| 10 | * GNU Affero General Public License v3 (AGPLv3). | ||
| 11 | * | ||
| 12 | * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. | ||
| 13 | */ | ||
| 14 | |||
| 15 | #include "fmacros.h" | ||
| 16 | |||
| 17 | #include <string.h> | ||
| 18 | #include <stddef.h> | ||
| 19 | |||
| 20 | #include "zmalloc.h" | ||
| 21 | #include "kvstore.h" | ||
| 22 | #include "fwtree.h" | ||
| 23 | #include "redisassert.h" | ||
| 24 | #include "monotonic.h" | ||
| 25 | |||
| 26 | #define UNUSED(V) ((void) V) | ||
| 27 | |||
/* A kvstore is a set of dictionaries (always a power-of-two count) that
 * together form one logical key space. Aggregate counters below are kept in
 * sync by the dict callbacks registered in kvstoreCreate(). */
struct _kvstore {
    int flags;                           /* KVSTORE_* behavior flags (e.g. on-demand allocation, free-empty). */
    kvstoreType *type;                   /* Caller-provided hooks (metadata sizing, empty/free callbacks). */
    dictType dtype;                      /* Copy of the caller's dictType with kvstore callbacks installed. */
    dict **dicts;                        /* Array of num_dicts dict pointers; entries may be NULL. */
    long long num_dicts;                 /* Always 1 << num_dicts_bits. */
    long long num_dicts_bits;            /* log2 of num_dicts; also the cursor's dict-index width. */
    list *rehashing; /* List of dictionaries in this kvstore that are currently rehashing. */
    int resize_cursor; /* Cron job uses this cursor to gradually resize dictionaries (only used if num_dicts > 1). */
    int allocated_dicts; /* The number of allocated dicts. */
    int non_empty_dicts; /* The number of non-empty dicts. */
    unsigned long long key_count; /* Total number of keys in this kvstore. */
    unsigned long long bucket_count; /* Total number of buckets in this kvstore across dictionaries. */
    fenwickTree *dict_sizes; /* Binary indexed tree (BIT) that describes cumulative key frequencies up until given dict-index. NULL when num_dicts == 1. */
    size_t overhead_hashtable_rehashing; /* The overhead of dictionaries rehashing (buckets of the tables being rehashed away). */
    void *metadata[]; /* conditionally allocated based on "flags" */
};
| 45 | |||
| 46 | /**********************************/ | ||
| 47 | /*** Helpers **********************/ | ||
| 48 | /**********************************/ | ||
| 49 | |||
| 50 | /* Get the dictionary pointer based on dict-index. */ | ||
| 51 | dict *kvstoreGetDict(kvstore *kvs, int didx) { | ||
| 52 | return kvs->dicts[didx]; | ||
| 53 | } | ||
| 54 | |||
| 55 | static dict **kvstoreGetDictRef(kvstore *kvs, int didx) { | ||
| 56 | return &kvs->dicts[didx]; | ||
| 57 | } | ||
| 58 | |||
| 59 | static int kvstoreDictIsRehashingPaused(kvstore *kvs, int didx) | ||
| 60 | { | ||
| 61 | dict *d = kvstoreGetDict(kvs, didx); | ||
| 62 | return d ? dictIsRehashingPaused(d) : 0; | ||
| 63 | } | ||
| 64 | |||
| 65 | static void addDictIndexToCursor(kvstore *kvs, int didx, unsigned long long *cursor) { | ||
| 66 | if (kvs->num_dicts == 1) | ||
| 67 | return; | ||
| 68 | /* didx can be -1 when iteration is over and there are no more dicts to visit. */ | ||
| 69 | if (didx < 0) | ||
| 70 | return; | ||
| 71 | *cursor = (*cursor << kvs->num_dicts_bits) | didx; | ||
| 72 | } | ||
| 73 | |||
| 74 | static int getAndClearDictIndexFromCursor(kvstore *kvs, unsigned long long *cursor) { | ||
| 75 | if (kvs->num_dicts == 1) | ||
| 76 | return 0; | ||
| 77 | int didx = (int) (*cursor & (kvs->num_dicts-1)); | ||
| 78 | *cursor = *cursor >> kvs->num_dicts_bits; | ||
| 79 | return didx; | ||
| 80 | } | ||
| 81 | |||
| 82 | /* Updates binary index tree (Fenwick tree), updates key count for a given dict */ | ||
| 83 | static void cumulativeKeyCountAdd(kvstore *kvs, int didx, long delta) { | ||
| 84 | kvs->key_count += delta; | ||
| 85 | |||
| 86 | dict *d = kvstoreGetDict(kvs, didx); | ||
| 87 | size_t dsize = dictSize(d); | ||
| 88 | /* Increment if dsize is 1 and delta is positive (first element inserted, dict becomes non-empty). | ||
| 89 | * Decrement if dsize is 0 (dict becomes empty). */ | ||
| 90 | int non_empty_dicts_delta = (dsize == 1 && delta > 0) ? 1 : (dsize == 0) ? -1 : 0; | ||
| 91 | kvs->non_empty_dicts += non_empty_dicts_delta; | ||
| 92 | |||
| 93 | /* BIT does not need to be calculated when there's only one dict. */ | ||
| 94 | if (kvs->num_dicts == 1) | ||
| 95 | return; | ||
| 96 | |||
| 97 | /* Update the BIT */ | ||
| 98 | fwTreeUpdate(kvs->dict_sizes, didx, delta); | ||
| 99 | } | ||
| 100 | |||
| 101 | /* Create the dict if it does not exist and return it. */ | ||
| 102 | static dict *createDictIfNeeded(kvstore *kvs, int didx) { | ||
| 103 | dict *d = kvstoreGetDict(kvs, didx); | ||
| 104 | if (d) return d; | ||
| 105 | |||
| 106 | kvs->dicts[didx] = dictCreate(&kvs->dtype); | ||
| 107 | kvs->allocated_dicts++; | ||
| 108 | return kvs->dicts[didx]; | ||
| 109 | } | ||
| 110 | |||
| 111 | /* Called when the dict will delete entries, the function will check | ||
| 112 | * KVSTORE_FREE_EMPTY_DICTS to determine whether the empty dict needs | ||
| 113 | * to be freed. | ||
| 114 | * | ||
| 115 | * Note that for rehashing dicts, that is, in the case of safe iterators | ||
| 116 | * and Scan, we won't delete the dict. We will check whether it needs | ||
| 117 | * to be deleted when we're releasing the iterator. */ | ||
| 118 | static void freeDictIfNeeded(kvstore *kvs, int didx) { | ||
| 119 | if (!(kvs->flags & KVSTORE_FREE_EMPTY_DICTS) || | ||
| 120 | !kvstoreGetDict(kvs, didx) || | ||
| 121 | kvstoreDictSize(kvs, didx) != 0 || | ||
| 122 | kvstoreDictIsRehashingPaused(kvs, didx)) | ||
| 123 | return; | ||
| 124 | |||
| 125 | /* Use callback if provided to check if dict can be freed */ | ||
| 126 | if (kvs->type->canFreeDict && !kvs->type->canFreeDict(kvs, didx)) | ||
| 127 | return; | ||
| 128 | |||
| 129 | dictRelease(kvs->dicts[didx]); | ||
| 130 | kvs->dicts[didx] = NULL; | ||
| 131 | kvs->allocated_dicts--; | ||
| 132 | } | ||
| 133 | |||
| 134 | void kvstoreFreeDictIfNeeded(kvstore *kvs, int didx) { | ||
| 135 | freeDictIfNeeded(kvs, didx); | ||
| 136 | } | ||
| 137 | |||
| 138 | /**********************************/ | ||
| 139 | /*** dict callbacks ***************/ | ||
| 140 | /**********************************/ | ||
| 141 | |||
| 142 | /* Adds dictionary to the rehashing list, which allows us | ||
| 143 | * to quickly find rehash targets during incremental rehashing. | ||
| 144 | * | ||
| 145 | * If there are multiple dicts, updates the bucket count for the given dictionary | ||
| 146 | * in a DB, bucket count incremented with the new ht size during the rehashing phase. | ||
| 147 | * If there's one dict, bucket count can be retrieved directly from single dict bucket. */ | ||
| 148 | static void kvstoreDictRehashingStarted(dict *d) { | ||
| 149 | kvstore *kvs = d->type->userdata; | ||
| 150 | kvstoreDictMetaBase *metadata = (kvstoreDictMetaBase *)dictMetadata(d); | ||
| 151 | listAddNodeTail(kvs->rehashing, d); | ||
| 152 | metadata->rehashing_node = listLast(kvs->rehashing); | ||
| 153 | |||
| 154 | unsigned long long from, to; | ||
| 155 | dictRehashingInfo(d, &from, &to); | ||
| 156 | kvs->overhead_hashtable_rehashing += from; | ||
| 157 | } | ||
| 158 | |||
| 159 | /* Remove dictionary from the rehashing list. | ||
| 160 | * | ||
| 161 | * Updates the bucket count for the given dictionary in a DB. It removes | ||
| 162 | * the old ht size of the dictionary from the total sum of buckets for a DB. */ | ||
| 163 | static void kvstoreDictRehashingCompleted(dict *d) { | ||
| 164 | kvstore *kvs = d->type->userdata; | ||
| 165 | kvstoreDictMetaBase *metadata = (kvstoreDictMetaBase *)dictMetadata(d); | ||
| 166 | if (metadata->rehashing_node) { | ||
| 167 | listDelNode(kvs->rehashing, metadata->rehashing_node); | ||
| 168 | metadata->rehashing_node = NULL; | ||
| 169 | } | ||
| 170 | |||
| 171 | unsigned long long from, to; | ||
| 172 | dictRehashingInfo(d, &from, &to); | ||
| 173 | kvs->overhead_hashtable_rehashing -= from; | ||
| 174 | } | ||
| 175 | |||
| 176 | /* Updates the bucket count for the given dictionary in a DB. It adds the new ht size | ||
| 177 | * of the dictionary or removes the old ht size of the dictionary from the total | ||
| 178 | * sum of buckets for a DB. */ | ||
| 179 | static void kvstoreDictBucketChanged(dict *d, long long delta) { | ||
| 180 | kvstore *kvs = d->type->userdata; | ||
| 181 | kvs->bucket_count += delta; | ||
| 182 | } | ||
| 183 | |||
| 184 | /* Returns the size of the DB dict extended metadata in bytes. */ | ||
| 185 | static size_t kvstoreDictBaseMetaSize(dict *d) { | ||
| 186 | UNUSED(d); | ||
| 187 | return sizeof(kvstoreDictMetaBase); | ||
| 188 | } | ||
| 189 | |||
| 190 | /**********************************/ | ||
| 191 | /*** API **************************/ | ||
| 192 | /**********************************/ | ||
| 193 | |||
/* Create an array of dictionaries
 * num_dicts_bits is the log2 of the amount of dictionaries needed (e.g. 0 for 1 dict,
 * 3 for 8 dicts, etc.)
 *
 * `type` supplies kvstore-level hooks; `dtype` is copied and then patched with
 * the kvstore's own dict callbacks, so the caller must leave those slots empty.
 * `flags` is a bitmask of KVSTORE_* options (on-demand allocation, free-empty). */
kvstore *kvstoreCreate(kvstoreType *type, dictType *dtype, int num_dicts_bits, int flags) {
    /* We can't support more than 2^16 dicts because we want to save 48 bits
     * for the dict cursor, see kvstoreScan */
    assert(num_dicts_bits <= 16);
    /* Any custom dict metadata must at least embed kvstoreDictMetaBase. */
    assert(!type->dictMetadataBytes || type->dictMetadataBytes(NULL) >= sizeof(kvstoreDictMetaBase));

    /* Calc kvstore size */
    size_t kvsize = sizeof(kvstore);
    /* Conditionally calc also histogram size */
    if (type->kvstoreMetadataBytes)
        kvsize += type->kvstoreMetadataBytes(NULL);

    kvstore *kvs = zcalloc(kvsize);
    memcpy(&kvs->dtype, dtype, sizeof(kvs->dtype));
    kvs->flags = flags;
    kvs->type = type;

    /* kvstore must be the one to set these callbacks, so we make sure the
     * caller didn't do it */
    assert(!dtype->userdata);
    assert(!dtype->dictMetadataBytes);
    assert(!dtype->rehashingStarted);
    assert(!dtype->rehashingCompleted);
    kvs->dtype.userdata = kvs;
    kvs->dtype.dictMetadataBytes = type->dictMetadataBytes ?
        type->dictMetadataBytes : kvstoreDictBaseMetaSize;
    kvs->dtype.rehashingStarted = kvstoreDictRehashingStarted;
    kvs->dtype.rehashingCompleted = kvstoreDictRehashingCompleted;
    kvs->dtype.bucketChanged = kvstoreDictBucketChanged;

    kvs->num_dicts_bits = num_dicts_bits;
    kvs->num_dicts = 1 << kvs->num_dicts_bits;
    kvs->dicts = zcalloc(sizeof(dict*) * kvs->num_dicts);
    /* Eagerly allocate all dicts unless on-demand allocation was requested. */
    if (!(kvs->flags & KVSTORE_ALLOCATE_DICTS_ON_DEMAND)) {
        for (int i = 0; i < kvs->num_dicts; i++)
            createDictIfNeeded(kvs, i);
    }

    kvs->rehashing = listCreate();
    kvs->key_count = 0;
    kvs->non_empty_dicts = 0;
    kvs->resize_cursor = 0;
    /* The Fenwick tree is only needed to map key index -> dict index. */
    kvs->dict_sizes = kvs->num_dicts > 1 ? fwTreeCreate(kvs->num_dicts_bits) : NULL;
    kvs->bucket_count = 0;
    kvs->overhead_hashtable_rehashing = 0;
    return kvs;
}
| 244 | |||
/* Remove every key from the kvstore, keeping the kvstore itself usable.
 *
 * `callback` (may be NULL) is forwarded to dictEmpty for progress callbacks.
 * Dict-level and kvstore-level onEmpty hooks fire, empty dicts may be freed
 * (per KVSTORE_FREE_EMPTY_DICTS), and all aggregate counters are reset. */
void kvstoreEmpty(kvstore *kvs, void(callback)(dict*)) {
    for (int didx = 0; didx < kvs->num_dicts; didx++) {
        dict *d = kvstoreGetDict(kvs, didx);
        if (!d)
            continue;
        /* dictEmpty discards the tables, so the stale rehashing-list node
         * pointer must be cleared first (the list itself is emptied below). */
        kvstoreDictMetaBase *metadata = (kvstoreDictMetaBase *)dictMetadata(d);
        if (metadata->rehashing_node)
            metadata->rehashing_node = NULL;
        dictEmpty(d, callback);
        if (kvs->type->onDictEmpty) kvs->type->onDictEmpty(kvs, didx);
        freeDictIfNeeded(kvs, didx);
    }

    if (kvs->type->onKvstoreEmpty) kvs->type->onKvstoreEmpty(kvs);

    listEmpty(kvs->rehashing);

    /* Reset all aggregate accounting to the pristine state. */
    kvs->key_count = 0;
    kvs->non_empty_dicts = 0;
    kvs->resize_cursor = 0;
    kvs->bucket_count = 0;
    if (kvs->dict_sizes)
        fwTreeClear(kvs->dict_sizes);
    kvs->overhead_hashtable_rehashing = 0;
}
| 270 | |||
/* Destroy the kvstore and everything it owns: all dicts, the rehashing
 * list, the Fenwick tree, and the kvstore struct itself. */
void kvstoreRelease(kvstore *kvs) {
    for (int didx = 0; didx < kvs->num_dicts; didx++) {
        dict *d = kvstoreGetDict(kvs, didx);
        if (!d)
            continue;
        /* The rehashing list is released wholesale below, so the per-dict
         * node pointer is simply cleared. */
        kvstoreDictMetaBase *metadata = (kvstoreDictMetaBase *)dictMetadata(d);
        if (metadata->rehashing_node)
            metadata->rehashing_node = NULL;
        if (kvs->type->onDictEmpty) kvs->type->onDictEmpty(kvs, didx);
        dictRelease(d);
    }
    zfree(kvs->dicts);

    listRelease(kvs->rehashing);
    if (kvs->dict_sizes)
        fwTreeDestroy(kvs->dict_sizes);

    zfree(kvs);
}
| 290 | |||
| 291 | unsigned long long int kvstoreSize(kvstore *kvs) { | ||
| 292 | return kvs->key_count; | ||
| 293 | } | ||
| 294 | |||
| 295 | /* This method provides the cumulative sum of all the dictionary buckets | ||
| 296 | * across dictionaries in a database. */ | ||
| 297 | unsigned long kvstoreBuckets(kvstore *kvs) { | ||
| 298 | if (kvs->num_dicts != 1) { | ||
| 299 | return kvs->bucket_count; | ||
| 300 | } else { | ||
| 301 | return kvs->dicts[0]? dictBuckets(kvs->dicts[0]) : 0; | ||
| 302 | } | ||
| 303 | } | ||
| 304 | |||
/* Estimate the memory used by the kvstore itself: entries, buckets, dict
 * structs plus per-dict metadata, and the rehashing-list nodes. Value
 * payloads are not included. */
size_t kvstoreMemUsage(kvstore *kvs) {
    size_t mem = sizeof(*kvs);
    size_t metaSize = kvs->dtype.dictMetadataBytes(NULL);
    unsigned long long keys_count = kvstoreSize(kvs);
    mem += keys_count * dictEntryMemUsage(kvs->dtype.no_value) +
        kvstoreBuckets(kvs) * sizeof(dictEntry*) +
        kvs->allocated_dicts * (sizeof(dict) + metaSize);

    /* Values are dict* shared with kvs->dicts */
    mem += listLength(kvs->rehashing) * sizeof(listNode);

    return mem;
}
| 318 | |||
/*
 * This method is used to iterate over the elements of the entire kvstore specifically across dicts.
 * It's a three pronged approach.
 *
 * 1. It uses the provided cursor `cursor` to retrieve the dict index from it.
 * 2. If the dictionary is in a valid state checked through the provided callback `dictScanValidFunction`,
 *    it performs a dictScan over the appropriate `keyType` dictionary of `db`.
 * 3. If the dict is entirely scanned i.e. the cursor has reached 0, the next non empty dict is discovered.
 *    The dict information is embedded into the cursor and returned.
 *
 * To restrict the scan to a single dict, pass a valid dict index as
 * 'onlydidx', otherwise pass -1.
 *
 * Returns the next cursor to pass back in, or 0 when iteration is complete.
 */
unsigned long long kvstoreScan(kvstore *kvs, unsigned long long cursor,
                               int onlydidx, dictScanFunction *scan_cb,
                               kvstoreScanShouldSkipDict *skip_cb,
                               void *privdata)
{
    unsigned long long _cursor = 0;
    /* During dictionary traversal, 48 upper bits in the cursor are used for positioning in the HT.
     * Following lower bits are used for the dict index number, ranging from 0 to 2^num_dicts_bits-1.
     * Dict index is always 0 at the start of iteration and can be incremented only if there are
     * multiple dicts. */
    int didx = getAndClearDictIndexFromCursor(kvs, &cursor);
    if (onlydidx >= 0) {
        if (didx < onlydidx) {
            /* Fast-forward to onlydidx. */
            assert(onlydidx < kvs->num_dicts);
            didx = onlydidx;
            cursor = 0;
        } else if (didx > onlydidx) {
            /* The cursor is already past onlydidx. */
            return 0;
        }
    }

    dict *d = kvstoreGetDict(kvs, didx);

    /* Skip missing dicts and dicts the caller's predicate rejects. */
    int skip = !d || (skip_cb && skip_cb(d, didx));
    if (!skip) {
        _cursor = dictScan(d, cursor, scan_cb, privdata);
        /* In dictScan, scan_cb may delete entries (e.g., in active expire case). */
        freeDictIfNeeded(kvs, didx);
    }
    /* scanning done for the current dictionary or if the scanning wasn't possible, move to the next dict index. */
    if (_cursor == 0 || skip) {
        if (onlydidx >= 0)
            return 0;
        didx = kvstoreGetNextNonEmptyDictIndex(kvs, didx);
    }
    /* didx == -1 means there are no more dicts: iteration is complete. */
    if (didx == -1) {
        return 0;
    }
    addDictIndexToCursor(kvs, didx, &_cursor);
    return _cursor;
}
| 375 | |||
| 376 | /* | ||
| 377 | * This functions increases size of kvstore to match desired number. | ||
| 378 | * It resizes all individual dictionaries, unless skip_cb indicates otherwise. | ||
| 379 | * | ||
| 380 | * Based on the parameter `try_expand`, appropriate dict expand API is invoked. | ||
| 381 | * if try_expand is set to 1, `dictTryExpand` is used else `dictExpand`. | ||
| 382 | * The return code is either `DICT_OK`/`DICT_ERR` for both the API(s). | ||
| 383 | * `DICT_OK` response is for successful expansion. However, `DICT_ERR` response signifies failure in allocation in | ||
| 384 | * `dictTryExpand` call and in case of `dictExpand` call it signifies no expansion was performed. | ||
| 385 | */ | ||
| 386 | int kvstoreExpand(kvstore *kvs, uint64_t newsize, int try_expand, kvstoreExpandShouldSkipDictIndex *skip_cb) { | ||
| 387 | for (int i = 0; i < kvs->num_dicts; i++) { | ||
| 388 | dict *d = kvstoreGetDict(kvs, i); | ||
| 389 | if (!d || (skip_cb && skip_cb(i))) | ||
| 390 | continue; | ||
| 391 | int result = try_expand ? dictTryExpand(d, newsize) : dictExpand(d, newsize); | ||
| 392 | if (try_expand && result == DICT_ERR) | ||
| 393 | return 0; | ||
| 394 | } | ||
| 395 | |||
| 396 | return 1; | ||
| 397 | } | ||
| 398 | |||
/* Returns fair random dict index, probability of each dict being returned is
 * proportional to the number of elements that dictionary holds.
 * This function guarantees that it returns a dict-index of a non-empty dict,
 * unless the entire kvstore is empty or all dicts are skipped.
 *
 * Parameters:
 * - kvs: the kvstore instance
 * - skip_cb: callback to determine if a dict should be skipped (NULL means no skipping)
 * - fair_attempts: number of fair selection attempts before falling back
 * - slow_fallback: if 1, uses systematic search when fair attempts fail
 *
 * Returns:
 * - Valid dict index (>= 0) on success
 * - -1 if no valid dict found (either slow_fallback is 0 or all dicts are skipped)
 *
 * Time complexity: O(fair_attempts * log(kvs->num_dicts)) for fair attempts,
 * plus O(kvs->num_dicts) for systematic fallback if enabled.
 */
int kvstoreGetFairRandomDictIndex(kvstore *kvs, kvstoreRandomShouldSkipDictIndex *skip_cb,
                                  int fair_attempts, int slow_fallback)
{
    /* Trivial cases: one dict, or nothing stored anywhere. */
    if (kvs->num_dicts == 1 || kvstoreSize(kvs) == 0)
        return 0;

    unsigned long long total_size = kvstoreSize(kvs);

    /* Try fair attempts first. If skip_cb is not applicable, execute only once. */
    for (int attempt = 0; attempt < fair_attempts; attempt++) {
        /* Pick a key index in [1..total_size] and map it to its dict; this
         * weights each dict by its element count. */
        unsigned long target = (randomULong() % total_size) + 1;
        int didx = kvstoreFindDictIndexByKeyIndex(kvs, target);
        if (!skip_cb || !skip_cb(didx)) {
            return didx;
        }
    }

    /* If fair attempts failed and slow fallback is allowed */
    if (slow_fallback) {
        /* systematic check from random start; note this only requires the
         * dict to be allocated, not non-empty */
        int start = randomULong() % kvs->num_dicts;
        for (int i = 0; i < kvs->num_dicts; i++) {
            int didx = (start + i) % kvs->num_dicts;
            dict *d = kvstoreGetDict(kvs, didx);
            if (d && (!skip_cb || !skip_cb(didx))) {
                return didx;
            }
        }
    }

    /* Failed to find valid dict that has elements */
    return -1;
}
| 450 | |||
/* Render combined hash-table statistics for all dicts of the kvstore into
 * `buf` (size `bufsize`). Stats for the main tables and, where applicable,
 * the rehashing-target tables are aggregated separately and appended one
 * after the other. `full` is forwarded to dictGetStatsHt for verbose output.
 * The buffer is always NUL-terminated if bufsize > 0. */
void kvstoreGetStats(kvstore *kvs, char *buf, size_t bufsize, int full) {
    buf[0] = '\0';

    size_t l;
    char *orig_buf = buf;
    size_t orig_bufsize = bufsize;
    dictStats *mainHtStats = NULL;
    dictStats *rehashHtStats = NULL;
    dict *d;
    kvstoreIterator kvs_it;

    /* Walk every non-empty dict, folding its per-table stats into the
     * running aggregates. */
    kvstoreIteratorInit(&kvs_it, kvs);
    while ((d = kvstoreIteratorNextDict(&kvs_it))) {
        dictStats *stats = dictGetStatsHt(d, 0, full);
        if (!mainHtStats) {
            mainHtStats = stats;
        } else {
            dictCombineStats(stats, mainHtStats);
            dictFreeStats(stats);
        }
        /* Dicts mid-rehash have a second table worth reporting. */
        if (dictIsRehashing(d)) {
            stats = dictGetStatsHt(d, 1, full);
            if (!rehashHtStats) {
                rehashHtStats = stats;
            } else {
                dictCombineStats(stats, rehashHtStats);
                dictFreeStats(stats);
            }
        }
    }
    kvstoreIteratorReset(&kvs_it);

    if (mainHtStats && bufsize > 0) {
        l = dictGetStatsMsg(buf, bufsize, mainHtStats, full);
        dictFreeStats(mainHtStats);
        buf += l;
        bufsize -= l;
    }

    if (rehashHtStats && bufsize > 0) {
        l = dictGetStatsMsg(buf, bufsize, rehashHtStats, full);
        dictFreeStats(rehashHtStats);
        buf += l;
        bufsize -= l;
    }
    /* Make sure there is a NULL term at the end. */
    if (orig_bufsize) orig_buf[orig_bufsize - 1] = '\0';
}
| 499 | |||
/* Finds a dict containing target element in a key space ordered by dict index.
 * Consider this example. Dictionaries are represented by brackets and keys by dots:
 *  #0   #1     #2     #3    #4
 * [..][....][...][.......][.]
 *                    ^
 *                 target
 *
 * In this case dict #3 contains key that we are trying to find.
 *
 * The return value is 0 based dict-index, and the range of the target is [1..kvstoreSize], kvstoreSize inclusive.
 *
 * To find the dict, we start with the root node of the binary index tree and search through its children
 * from the highest index (2^num_dicts_bits in our case) to the lowest index. At each node, we check if the target
 * value is greater than the node's value. If it is, we remove the node's value from the target and recursively
 * search for the new target using the current node as the parent.
 * Time complexity of this function is O(log(kvs->num_dicts))
 */
int kvstoreFindDictIndexByKeyIndex(kvstore *kvs, unsigned long target) {
    /* With one dict, or nothing stored, the answer is trivially dict 0. */
    if (kvs->num_dicts == 1 || kvstoreSize(kvs) == 0)
        return 0;
    assert(target <= kvstoreSize(kvs));

    /* Delegate the prefix-sum search to the Fenwick tree. */
    return fwTreeFindIndex(kvs->dict_sizes, target);
}
| 524 | |||
| 525 | /* Get the first non-empty dict index in the kvstore. Returns -1 if kvstore is empty. */ | ||
| 526 | int kvstoreGetFirstNonEmptyDictIndex(kvstore *kvs) { | ||
| 527 | if (kvstoreSize(kvs) == 0) | ||
| 528 | return -1; | ||
| 529 | if (kvs->num_dicts == 1) | ||
| 530 | return 0; | ||
| 531 | return fwTreeFindFirstNonEmpty(kvs->dict_sizes); | ||
| 532 | } | ||
| 533 | |||
| 534 | /* Returns next non-empty dict index strictly after given one, or -1 if provided didx is the last one. */ | ||
| 535 | int kvstoreGetNextNonEmptyDictIndex(kvstore *kvs, int didx) { | ||
| 536 | if (kvs->num_dicts == 1) { | ||
| 537 | assert(didx == 0); | ||
| 538 | return -1; | ||
| 539 | } | ||
| 540 | return fwTreeFindNextNonEmpty(kvs->dict_sizes, didx); | ||
| 541 | } | ||
| 542 | |||
| 543 | int kvstoreNumNonEmptyDicts(kvstore *kvs) { | ||
| 544 | return kvs->non_empty_dicts; | ||
| 545 | } | ||
| 546 | |||
| 547 | int kvstoreNumAllocatedDicts(kvstore *kvs) { | ||
| 548 | return kvs->allocated_dicts; | ||
| 549 | } | ||
| 550 | |||
| 551 | int kvstoreNumDicts(kvstore *kvs) { | ||
| 552 | return kvs->num_dicts; | ||
| 553 | } | ||
| 554 | |||
/* Move dict from one kvstore to another.
 *
 * Both kvstores must have the same number of dicts and the destination slot
 * must be empty. A NULL source slot is a no-op. All accounting (key counts,
 * bucket counts, rehashing state) is transferred along with the dict. */
void kvstoreMoveDict(kvstore *kvs, kvstore *dst, int didx) {
    assert(kvs->num_dicts > didx);
    assert(kvs->num_dicts == dst->num_dicts);
    assert(dst->dicts[didx] == NULL);

    dict *d = kvs->dicts[didx];
    if (d == NULL) return;

    /* Adjust source kvstore */
    kvs->allocated_dicts -= 1;
    cumulativeKeyCountAdd(kvs, didx, -((long long)dictSize(d)));
    kvstoreDictBucketChanged(d, -((long long) dictBuckets(d)));
    /* If rehashing, stop it. Note d->type still points at the source kvstore
     * here, so the callback updates the source's accounting. */
    if (dictIsRehashing(d))
        kvstoreDictRehashingCompleted(d);
    /* Clear dict from source kvstore and create a new one if needed
     * (kvstores that expect all slots populated get a fresh empty dict). */
    kvs->dicts[didx] = NULL;
    if (!(kvs->flags & (KVSTORE_ALLOCATE_DICTS_ON_DEMAND | KVSTORE_FREE_EMPTY_DICTS)))
        createDictIfNeeded(kvs, didx);

    /* Move dict to destination kvstore; re-pointing d->type makes the
     * callbacks below account against the destination. */
    dst->dicts[didx] = d;
    dst->dicts[didx]->type = &dst->dtype;
    dst->allocated_dicts += 1;
    cumulativeKeyCountAdd(dst, didx, dictSize(d));
    kvstoreDictBucketChanged(d, dictBuckets(d));
    if (dictIsRehashing(dst->dicts[didx]))
        kvstoreDictRehashingStarted(dst->dicts[didx]);
}
| 585 | |||
| 586 | /* Returns kvstore iterator that can be used to iterate through sub-dictionaries. | ||
| 587 | * | ||
| 588 | * The caller should reset kvs_it with kvstoreIteratorReset. */ | ||
| 589 | void kvstoreIteratorInit(kvstoreIterator *kvs_it, kvstore *kvs) { | ||
| 590 | kvs_it->kvs = kvs; | ||
| 591 | kvs_it->didx = -1; | ||
| 592 | kvs_it->next_didx = kvstoreGetFirstNonEmptyDictIndex(kvs_it->kvs); /* Finds first non-empty dict index. */ | ||
| 593 | dictInitSafeIterator(&kvs_it->di, NULL); | ||
| 594 | } | ||
| 595 | |||
| 596 | /* Free the kvs_it returned by kvstoreIteratorInit. */ | ||
| 597 | void kvstoreIteratorReset(kvstoreIterator *kvs_it) { | ||
| 598 | dictIterator *iter = &kvs_it->di; | ||
| 599 | dictResetIterator(iter); | ||
| 600 | /* In the safe iterator context, we may delete entries. */ | ||
| 601 | if (kvs_it->didx != -1) | ||
| 602 | freeDictIfNeeded(kvs_it->kvs, kvs_it->didx); | ||
| 603 | } | ||
| 604 | |||
/* Returns next dictionary from the iterator, or NULL if iteration is complete.
 *
 * - Takes care to reset the iter of the previous dict before moved to the next dict.
 */
dict *kvstoreIteratorNextDict(kvstoreIterator *kvs_it) {
    if (kvs_it->next_didx == -1)
        return NULL;

    /* The dict may be deleted during the iteration process, so here need to check for NULL. */
    if (kvs_it->didx != -1 && kvstoreGetDict(kvs_it->kvs, kvs_it->didx)) {
        /* Before we move to the next dict, reset the iter of the previous dict. */
        dictIterator *iter = &kvs_it->di;
        dictResetIterator(iter);
        /* In the safe iterator context, we may delete entries. */
        freeDictIfNeeded(kvs_it->kvs, kvs_it->didx);
    }

    /* Advance: current becomes next_didx, and next_didx is recomputed so a
     * subsequent call knows where to go. */
    kvs_it->didx = kvs_it->next_didx;
    kvs_it->next_didx = kvstoreGetNextNonEmptyDictIndex(kvs_it->kvs, kvs_it->didx);
    return kvs_it->kvs->dicts[kvs_it->didx];
}
| 626 | |||
| 627 | int kvstoreIteratorGetCurrentDictIndex(kvstoreIterator *kvs_it) { | ||
| 628 | assert(kvs_it->didx >= 0 && kvs_it->didx < kvs_it->kvs->num_dicts); | ||
| 629 | return kvs_it->didx; | ||
| 630 | } | ||
| 631 | |||
/* Returns next entry across all dicts, or NULL when iteration is complete. */
dictEntry *kvstoreIteratorNext(kvstoreIterator *kvs_it) {
    /* di.d is NULL before the first dict is entered. */
    dictEntry *de = kvs_it->di.d ? dictNext(&kvs_it->di) : NULL;
    if (!de) { /* No current dict or reached the end of the dictionary. */

        /* Before we move to the next dict, function kvstoreIteratorNextDict()
         * reset the iter of the previous dict & freeDictIfNeeded(). */
        dict *d = kvstoreIteratorNextDict(kvs_it);

        if (!d)
            return NULL;

        /* A safe iterator is required since callers may delete entries. */
        dictInitSafeIterator(&kvs_it->di, d);
        de = dictNext(&kvs_it->di);
    }
    return de;
}
| 649 | |||
| 650 | /* This method traverses through kvstore dictionaries and triggers a resize. | ||
| 651 | * It first tries to shrink if needed, and if it isn't, it tries to expand. */ | ||
| 652 | void kvstoreTryResizeDicts(kvstore *kvs, int limit) { | ||
| 653 | if (limit > kvs->num_dicts) | ||
| 654 | limit = kvs->num_dicts; | ||
| 655 | |||
| 656 | for (int i = 0; i < limit; i++) { | ||
| 657 | int didx = kvs->resize_cursor; | ||
| 658 | dict *d = kvstoreGetDict(kvs, didx); | ||
| 659 | if (d && dictShrinkIfNeeded(d) == DICT_ERR) { | ||
| 660 | dictExpandIfNeeded(d); | ||
| 661 | } | ||
| 662 | kvs->resize_cursor = (didx + 1) % kvs->num_dicts; | ||
| 663 | } | ||
| 664 | } | ||
| 665 | |||
/* Our hash table implementation performs rehashing incrementally while
 * we write/read from the hash table. Still if the server is idle, the hash
 * table will use two tables for a long time. So we try to use threshold_us
 * of CPU time at every call of this function to perform some rehashing.
 *
 * The function returns the amount of microsecs spent if some rehashing was
 * performed, otherwise 0 is returned. */
uint64_t kvstoreIncrementallyRehash(kvstore *kvs, uint64_t threshold_us) {
    if (listLength(kvs->rehashing) == 0)
        return 0;

    /* Our goal is to rehash as many dictionaries as we can before reaching threshold_us,
     * after each dictionary completes rehashing, it removes itself from the list
     * (via the rehashingCompleted callback), so listFirst() always yields a
     * fresh target. */
    listNode *node;
    monotime timer;
    uint64_t elapsed_us = 0;
    elapsedStart(&timer);
    while ((node = listFirst(kvs->rehashing))) {
        /* Spend the remaining budget on the head of the list. */
        dictRehashMicroseconds(listNodeValue(node), threshold_us - elapsed_us);

        elapsed_us = elapsedUs(timer);
        if (elapsed_us >= threshold_us) {
            break; /* Reached the time limit. */
        }
    }
    return elapsed_us;
}
| 693 | |||
| 694 | size_t kvstoreOverheadHashtableLut(kvstore *kvs) { | ||
| 695 | return kvs->bucket_count * sizeof(dictEntry *); | ||
| 696 | } | ||
| 697 | |||
| 698 | size_t kvstoreOverheadHashtableRehashing(kvstore *kvs) { | ||
| 699 | return kvs->overhead_hashtable_rehashing * sizeof(dictEntry *); | ||
| 700 | } | ||
| 701 | |||
/* Number of dicts in this kvstore that are currently rehashing
 * (length of the kvstore's internal rehashing list). */
unsigned long kvstoreDictRehashingCount(kvstore *kvs) {
    return listLength(kvs->rehashing);
}
| 705 | |||
| 706 | unsigned long kvstoreDictSize(kvstore *kvs, int didx) | ||
| 707 | { | ||
| 708 | dict *d = kvstoreGetDict(kvs, didx); | ||
| 709 | if (!d) | ||
| 710 | return 0; | ||
| 711 | return dictSize(d); | ||
| 712 | } | ||
| 713 | |||
| 714 | void kvstoreInitDictIterator(kvstoreDictIterator *kvs_di, kvstore *kvs, int didx) | ||
| 715 | { | ||
| 716 | kvs_di->kvs = kvs; | ||
| 717 | kvs_di->didx = didx; | ||
| 718 | dictInitIterator(&kvs_di->di, kvstoreGetDict(kvs, didx)); | ||
| 719 | } | ||
| 720 | |||
| 721 | void kvstoreInitDictSafeIterator(kvstoreDictIterator *kvs_di, kvstore *kvs, int didx) | ||
| 722 | { | ||
| 723 | kvs_di->kvs = kvs; | ||
| 724 | kvs_di->didx = didx; | ||
| 725 | dictInitSafeIterator(&kvs_di->di, kvstoreGetDict(kvs, didx)); | ||
| 726 | } | ||
| 727 | |||
/* Reset a kvstoreDictIterator set up with kvstoreInitDictIterator() or
 * kvstoreInitDictSafeIterator(), releasing the underlying dict iterator. */
void kvstoreResetDictIterator(kvstoreDictIterator *kvs_di)
{
    /* The dict may be deleted during the iteration process, so here need to check for NULL. */
    if (kvstoreGetDict(kvs_di->kvs, kvs_di->didx)) {
        dictResetIterator(&kvs_di->di);
        /* In the safe iterator context, we may delete entries; the dict may
         * now be empty, so give the kvstore a chance to free it. */
        freeDictIfNeeded(kvs_di->kvs, kvs_di->didx);
    }
}
| 738 | |||
| 739 | /* Get the next element of the dict through kvstoreDictIterator and dictNext. */ | ||
| 740 | dictEntry *kvstoreDictIteratorNext(kvstoreDictIterator *kvs_di) | ||
| 741 | { | ||
| 742 | /* The dict may be deleted during the iteration process, so here need to check for NULL. */ | ||
| 743 | dict *d = kvstoreGetDict(kvs_di->kvs, kvs_di->didx); | ||
| 744 | if (!d) return NULL; | ||
| 745 | |||
| 746 | return dictNext(&kvs_di->di); | ||
| 747 | } | ||
| 748 | |||
| 749 | dictEntry *kvstoreDictGetRandomKey(kvstore *kvs, int didx) | ||
| 750 | { | ||
| 751 | dict *d = kvstoreGetDict(kvs, didx); | ||
| 752 | if (!d) | ||
| 753 | return NULL; | ||
| 754 | return dictGetRandomKey(d); | ||
| 755 | } | ||
| 756 | |||
| 757 | dictEntry *kvstoreDictGetFairRandomKey(kvstore *kvs, int didx) | ||
| 758 | { | ||
| 759 | dict *d = kvstoreGetDict(kvs, didx); | ||
| 760 | if (!d) | ||
| 761 | return NULL; | ||
| 762 | return dictGetFairRandomKey(d); | ||
| 763 | } | ||
| 764 | |||
| 765 | unsigned int kvstoreDictGetSomeKeys(kvstore *kvs, int didx, dictEntry **des, unsigned int count) | ||
| 766 | { | ||
| 767 | dict *d = kvstoreGetDict(kvs, didx); | ||
| 768 | if (!d) | ||
| 769 | return 0; | ||
| 770 | return dictGetSomeKeys(d, des, count); | ||
| 771 | } | ||
| 772 | |||
| 773 | int kvstoreDictExpand(kvstore *kvs, int didx, unsigned long size) | ||
| 774 | { | ||
| 775 | dict *d = createDictIfNeeded(kvs, didx); | ||
| 776 | if (!d) | ||
| 777 | return DICT_ERR; | ||
| 778 | return dictExpand(d, size); | ||
| 779 | } | ||
| 780 | |||
| 781 | unsigned long kvstoreDictScanDefrag(kvstore *kvs, int didx, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata) | ||
| 782 | { | ||
| 783 | dict *d = kvstoreGetDict(kvs, didx); | ||
| 784 | if (!d) | ||
| 785 | return 0; | ||
| 786 | return dictScanDefrag(d, v, fn, defragfns, privdata); | ||
| 787 | } | ||
| 788 | |||
/* Unlike kvstoreDictScanDefrag(), this method doesn't defrag the data(keys and values)
 * within dict, it only reallocates the memory used by the dict structure itself using
 * the provided allocation function. This feature was added for the active defrag feature.
 *
 * With 16k dictionaries for cluster mode with 1 shard, this operation may require substantial time
 * to execute. A "cursor" is used to perform the operation iteratively. When first called, a
 * cursor value of 0 should be provided. The return value is an updated cursor which should be
 * provided on the next iteration. The operation is complete when 0 is returned.
 *
 * The 'defragfn' callback is called with a reference to the dict that callback can reallocate. */
unsigned long kvstoreDictLUTDefrag(kvstore *kvs, unsigned long cursor, kvstoreDictLUTDefragFunction *defragfn) {
    /* NOTE(review): 'cursor' (unsigned long) is narrowed to int here —
     * assumes num_dicts fits in an int; confirm against kvstore creation. */
    for (int didx = cursor; didx < kvs->num_dicts; didx++) {
        dict **d = kvstoreGetDictRef(kvs, didx), *newd;
        if (!*d)
            continue; /* Empty slot: skip without ending this call. */
        if ((newd = defragfn(*d))) {
            *d = newd;

            /* After defragmenting the dict, update its corresponding
             * rehashing node in the kvstore's rehashing list. */
            kvstoreDictMetaBase *metadata = (kvstoreDictMetaBase *)dictMetadata(*d);
            if (metadata->rehashing_node)
                metadata->rehashing_node->value = *d;
        }
        /* Exactly one allocated dict is processed per call; resume at didx+1. */
        return (didx + 1);
    }
    return 0; /* All dicts visited: defrag pass complete. */
}
| 817 | |||
| 818 | void *kvstoreDictFetchValue(kvstore *kvs, int didx, const void *key) | ||
| 819 | { | ||
| 820 | dict *d = kvstoreGetDict(kvs, didx); | ||
| 821 | if (!d) | ||
| 822 | return NULL; | ||
| 823 | assert(d->type->no_value == 0); | ||
| 824 | return dictFetchValue(d, key); | ||
| 825 | } | ||
| 826 | |||
| 827 | dictEntry *kvstoreDictFind(kvstore *kvs, int didx, void *key) { | ||
| 828 | dict *d = kvstoreGetDict(kvs, didx); | ||
| 829 | if (!d) | ||
| 830 | return NULL; | ||
| 831 | return dictFind(d, key); | ||
| 832 | } | ||
| 833 | |||
| 834 | /* Find a link to a key in the specified kvstore. If not found return NULL. | ||
| 835 | * | ||
| 836 | * This function is a wrapper around dictFindLink(), used to locate a key in a dict | ||
| 837 | * from a kvstore. | ||
| 838 | * | ||
| 839 | * The caller may provide a bucket pointer to receive the reference to the bucket | ||
| 840 | * where the key is stored or need to be added. | ||
| 841 | * | ||
| 842 | * Returns: | ||
| 843 | * A reference to the dictEntry if found, otherwise NULL. | ||
| 844 | * | ||
| 845 | * Important: | ||
| 846 | * After calling kvstoreDictFindLink(), any necessary updates based on returned | ||
| 847 | * link or bucket must be made immediately after, commonly by kvstoreDictSetAtLink() | ||
| 848 | * without any operations in between that might modify the dict. Otherwise, | ||
| 849 | * the link or bucket may become invalid. Example usage: | ||
| 850 | * | ||
| 851 | * link = kvstoreDictFindLink(kvs, didx, key, &bucket); | ||
| 852 | * ... Do something, but don't modify kvs->dicts[didx] ... | ||
| 853 | * if (link) | ||
| 854 | * kvstoreDictSetAtLink(kvs, didx, kv, &link, 0); // Update existing entry | ||
| 855 | * else | ||
| 856 | * kvstoreDictSetAtLink(kvs, didx, kv, &bucket, 1); // Insert new entry | ||
| 857 | */ | ||
| 858 | dictEntryLink kvstoreDictFindLink(kvstore *kvs, int didx, void *key, dictEntryLink *bucket) { | ||
| 859 | if (bucket) *bucket = NULL; | ||
| 860 | dict *d = kvstoreGetDict(kvs, didx); | ||
| 861 | if (!d) return NULL; | ||
| 862 | return dictFindLink(d, key, bucket); | ||
| 863 | } | ||
| 864 | |||
| 865 | /* Set a key (or key-value) in the specified kvstore. | ||
| 866 | * | ||
| 867 | * This function inserts a new key or updates an existing one, depending on | ||
| 868 | * the `newItem` flag. | ||
| 869 | * | ||
| 870 | * Parameters: | ||
| 871 | * link: - When `newItem` is set, `link` points to the bucket of the key. | ||
| 872 | * - When `newItem` is not set, `link` points to the link of the key. | ||
| 873 | * - If link is NULL, dictFindLink() will be called to locate the link. | ||
| 874 | * | ||
| 875 | * newItem: - If set, add a new key with a new dictEntry. | ||
| 876 | * - If not set, update the key of an existing dictEntry. | ||
| 877 | */ | ||
| 878 | void kvstoreDictSetAtLink(kvstore *kvs, int didx, void *kv, dictEntryLink *link, int newItem) { | ||
| 879 | dict *d; | ||
| 880 | if (newItem) { | ||
| 881 | d = createDictIfNeeded(kvs, didx); | ||
| 882 | dictSetKeyAtLink(d, kv, link, newItem); | ||
| 883 | cumulativeKeyCountAdd(kvs, didx, 1); /* must be called only after updating dict */ | ||
| 884 | } else { | ||
| 885 | d = kvstoreGetDict(kvs, didx); | ||
| 886 | dictSetKeyAtLink(d, kv, link, newItem); | ||
| 887 | } | ||
| 888 | } | ||
| 889 | |||
| 890 | dictEntry *kvstoreDictAddRaw(kvstore *kvs, int didx, void *key, dictEntry **existing) { | ||
| 891 | dict *d = createDictIfNeeded(kvs, didx); | ||
| 892 | dictEntry *ret = dictAddRaw(d, key, existing); | ||
| 893 | if (ret) | ||
| 894 | cumulativeKeyCountAdd(kvs, didx, 1); | ||
| 895 | return ret; | ||
| 896 | } | ||
| 897 | |||
| 898 | void kvstoreDictSetKey(kvstore *kvs, int didx, dictEntry* de, void *key) { | ||
| 899 | dict *d = kvstoreGetDict(kvs, didx); | ||
| 900 | dictSetKey(d, de, key); | ||
| 901 | } | ||
| 902 | |||
| 903 | void kvstoreDictSetVal(kvstore *kvs, int didx, dictEntry *de, void *val) { | ||
| 904 | dict *d = kvstoreGetDict(kvs, didx); | ||
| 905 | assert(d->type->no_value == 0); | ||
| 906 | dictSetVal(d, de, val); | ||
| 907 | } | ||
| 908 | |||
| 909 | dictEntryLink kvstoreDictTwoPhaseUnlinkFind(kvstore *kvs, int didx, const void *key, int *table_index) { | ||
| 910 | dict *d = kvstoreGetDict(kvs, didx); | ||
| 911 | if (!d) | ||
| 912 | return NULL; | ||
| 913 | return dictTwoPhaseUnlinkFind(kvstoreGetDict(kvs, didx), key, table_index); | ||
| 914 | } | ||
| 915 | |||
| 916 | void kvstoreDictTwoPhaseUnlinkFree(kvstore *kvs, int didx, dictEntryLink link, int table_index) { | ||
| 917 | dict *d = kvstoreGetDict(kvs, didx); | ||
| 918 | dictTwoPhaseUnlinkFree(d, link, table_index); | ||
| 919 | cumulativeKeyCountAdd(kvs, didx, -1); | ||
| 920 | freeDictIfNeeded(kvs, didx); | ||
| 921 | } | ||
| 922 | |||
| 923 | int kvstoreDictDelete(kvstore *kvs, int didx, const void *key) { | ||
| 924 | dict *d = kvstoreGetDict(kvs, didx); | ||
| 925 | if (!d) | ||
| 926 | return DICT_ERR; | ||
| 927 | int ret = dictDelete(d, key); | ||
| 928 | if (ret == DICT_OK) { | ||
| 929 | cumulativeKeyCountAdd(kvs, didx, -1); | ||
| 930 | freeDictIfNeeded(kvs, didx); | ||
| 931 | } | ||
| 932 | return ret; | ||
| 933 | } | ||
| 934 | |||
| 935 | void *kvstoreGetDictMeta(kvstore *kvs, int didx, int createIfNeeded) { | ||
| 936 | dict *d = kvstoreGetDict(kvs, didx); | ||
| 937 | if (!d) { | ||
| 938 | if (!createIfNeeded) return NULL; | ||
| 939 | d = createDictIfNeeded(kvs, didx); | ||
| 940 | } | ||
| 941 | return dictMetadata(d); | ||
| 942 | } | ||
| 943 | |||
/* Pointer to the kvstore-level metadata area embedded in the kvstore struct
 * (its size is presumably set via the kvstoreType's "kvstore metadata size"
 * hook — see KvstoreTestType below; confirm against kvstoreCreate()). */
void *kvstoreGetMetadata(kvstore *kvs) {
    return &kvs->metadata;
}
| 947 | |||
| 948 | #ifdef REDIS_TEST | ||
| 949 | #include <stdio.h> | ||
| 950 | #include "testhelp.h" | ||
| 951 | |||
| 952 | #define TEST(name) printf("test — %s\n", name); | ||
| 953 | |||
/* Test dict hash callback: hash the key as a NUL-terminated string. */
uint64_t hashTestCallback(const void *key) {
    const char *s = key;
    return dictGenHashFunction((unsigned char *)s, strlen(s));
}
| 957 | |||
/* Test dict destructor callback: keys/values are zmalloc'd strings
 * produced by stringFromInt(), so just release them. */
void freeTestCallback(dict *d, void *val) {
    UNUSED(d);
    zfree(val);
}
| 962 | |||
/* Simulate defragmentation by moving an allocation to a fresh block of the
 * same usable size, copying its contents, and freeing the original. */
void *defragAllocTest(void *ptr) {
    size_t bytes = zmalloc_usable_size(ptr);
    void *moved = zmalloc(bytes);
    memcpy(moved, ptr, bytes);
    zfree(ptr);
    return moved;
}
| 970 | |||
| 971 | dict *defragLUTTestCallback(dict *d) { | ||
| 972 | /* handle the dict struct */ | ||
| 973 | d = defragAllocTest(d); | ||
| 974 | /* handle the first hash table */ | ||
| 975 | d->ht_table[0] = defragAllocTest(d->ht_table[0]); | ||
| 976 | /* handle the second hash table */ | ||
| 977 | if (d->ht_table[1]) | ||
| 978 | d->ht_table[1] = defragAllocTest(d->ht_table[1]); | ||
| 979 | return d; | ||
| 980 | } | ||
| 981 | |||
/* dict callbacks for the kvstore tests: only hashing and key release are
 * provided. NOTE(review): positional initializer — slot meanings inferred
 * from the callback names (1st: hash function, 5th: key destructor);
 * confirm against dictType's field declaration order. */
dictType KvstoreDictTestType = {
    hashTestCallback,
    NULL,
    NULL,
    NULL,
    freeTestCallback,
    NULL,
    NULL
};
| 991 | |||
/* kvstore callbacks for the tests: every optional hook is disabled. */
kvstoreType KvstoreTestType = {
    NULL, /* kvstore metadata size */
    NULL, /* dict metadata size */
    NULL, /* can free dict */
    NULL, /* on kvstore empty */
    NULL, /* on dict empty */
};
| 999 | |||
/* Heap-allocated (zmalloc) decimal string for 'value'; caller frees. */
char *stringFromInt(int value) {
    char tmp[32];

    int n = snprintf(tmp, sizeof(tmp), "%d", value);
    char *copy = zmalloc(n + 1);
    memcpy(copy, tmp, n);
    copy[n] = '\0';
    return copy;
}
| 1011 | |||
/* Self-test entry point for the kvstore module.
 * Run with: ./redis-server test kvstore */
int kvstoreTest(int argc, char **argv, int flags) {
    UNUSED(argc);
    UNUSED(argv);
    UNUSED(flags);

    int i;
    void *key;
    dictEntry *de;
    kvstoreIterator kvs_it;
    kvstoreDictIterator kvs_di;

    /* Test also dictType with no_value=1 */
    dictType KvstoreDictNovalTestType = KvstoreDictTestType;
    KvstoreDictNovalTestType.no_value = 1;

    int didx = 0;
    int curr_slot = 0;
    /* kvs1 keeps empty dicts allocated; kvs2 additionally frees a dict as
     * soon as it becomes empty (KVSTORE_FREE_EMPTY_DICTS). */
    kvstore *kvs1 = kvstoreCreate(&KvstoreTestType, &KvstoreDictTestType, 0, KVSTORE_ALLOCATE_DICTS_ON_DEMAND);
    kvstore *kvs2 = kvstoreCreate(&KvstoreTestType, &KvstoreDictNovalTestType, 0, KVSTORE_ALLOCATE_DICTS_ON_DEMAND | KVSTORE_FREE_EMPTY_DICTS);

    TEST("Add 16 keys") {
        for (i = 0; i < 16; i++) {
            de = kvstoreDictAddRaw(kvs1, didx, stringFromInt(i), NULL);
            assert(de != NULL);
            de = kvstoreDictAddRaw(kvs2, didx, stringFromInt(i), NULL);
            assert(de != NULL);
        }
        assert(kvstoreDictSize(kvs1, didx) == 16);
        assert(kvstoreSize(kvs1) == 16);
        assert(kvstoreDictSize(kvs2, didx) == 16);
        assert(kvstoreSize(kvs2) == 16);
    }

    TEST("kvstoreIterator creating and releasing without kvstoreIteratorNextDict()") {
        kvstore *kvs = kvstoreCreate(&KvstoreTestType, &KvstoreDictNovalTestType, 0, KVSTORE_ALLOCATE_DICTS_ON_DEMAND | KVSTORE_FREE_EMPTY_DICTS);
        kvstoreIterator kvs_iter;
        kvstoreIteratorInit(&kvs_iter, kvs);
        kvstoreIteratorReset(&kvs_iter);
        kvstoreRelease(kvs);
    }

    TEST("kvstoreIterator case 1: removing all keys does not delete the empty dict") {
        kvstoreIteratorInit(&kvs_it, kvs1);
        while((de = kvstoreIteratorNext(&kvs_it)) != NULL) {
            curr_slot = kvstoreIteratorGetCurrentDictIndex(&kvs_it);
            key = dictGetKey(de);
            assert(kvstoreDictDelete(kvs1, curr_slot, key) == DICT_OK);
        }
        kvstoreIteratorReset(&kvs_it);

        /* Without KVSTORE_FREE_EMPTY_DICTS the now-empty dict must survive. */
        dict *d = kvstoreGetDict(kvs1, didx);
        assert(d != NULL);
        assert(kvstoreDictSize(kvs1, didx) == 0);
        assert(kvstoreSize(kvs1) == 0);
    }

    TEST("kvstoreIterator case 2: removing all keys will delete the empty dict") {
        kvstoreIteratorInit(&kvs_it, kvs2);
        while((de = kvstoreIteratorNext(&kvs_it)) != NULL) {
            curr_slot = kvstoreIteratorGetCurrentDictIndex(&kvs_it);
            key = dictGetKey(de);
            assert(kvstoreDictDelete(kvs2, curr_slot, key) == DICT_OK);
        }
        kvstoreIteratorReset(&kvs_it);

        /* Make sure the dict was removed from the rehashing list. */
        while (kvstoreIncrementallyRehash(kvs2, 1000)) {}

        /* With KVSTORE_FREE_EMPTY_DICTS the empty dict must be gone. */
        dict *d = kvstoreGetDict(kvs2, didx);
        assert(d == NULL);
        assert(kvstoreDictSize(kvs2, didx) == 0);
        assert(kvstoreSize(kvs2) == 0);
    }

    TEST("Add 16 keys again") {
        for (i = 0; i < 16; i++) {
            de = kvstoreDictAddRaw(kvs1, didx, stringFromInt(i), NULL);
            assert(de != NULL);
            de = kvstoreDictAddRaw(kvs2, didx, stringFromInt(i), NULL);
            assert(de != NULL);
        }
        assert(kvstoreDictSize(kvs1, didx) == 16);
        assert(kvstoreSize(kvs1) == 16);
        assert(kvstoreDictSize(kvs2, didx) == 16);
        assert(kvstoreSize(kvs2) == 16);
    }

    TEST("kvstoreDictIterator case 1: removing all keys does not delete the empty dict") {
        kvstoreInitDictSafeIterator(&kvs_di, kvs1, didx);
        while((de = kvstoreDictIteratorNext(&kvs_di)) != NULL) {
            key = dictGetKey(de);
            assert(kvstoreDictDelete(kvs1, didx, key) == DICT_OK);
        }
        kvstoreResetDictIterator(&kvs_di);

        dict *d = kvstoreGetDict(kvs1, didx);
        assert(d != NULL);
        assert(kvstoreDictSize(kvs1, didx) == 0);
        assert(kvstoreSize(kvs1) == 0);
    }

    TEST("kvstoreDictIterator case 2: removing all keys will delete the empty dict") {
        kvstoreInitDictSafeIterator(&kvs_di, kvs2, didx);
        while((de = kvstoreDictIteratorNext(&kvs_di)) != NULL) {
            key = dictGetKey(de);
            assert(kvstoreDictDelete(kvs2, didx, key) == DICT_OK);
        }
        kvstoreResetDictIterator(&kvs_di);

        dict *d = kvstoreGetDict(kvs2, didx);
        assert(d == NULL);
        assert(kvstoreDictSize(kvs2, didx) == 0);
        assert(kvstoreSize(kvs2) == 0);
    }

    TEST("Verify that a rehashing dict's node in the rehashing list is correctly updated after defragmentation") {
        unsigned long cursor = 0;
        kvstore *kvs = kvstoreCreate(&KvstoreTestType, &KvstoreDictTestType, 0, KVSTORE_ALLOCATE_DICTS_ON_DEMAND);
        /* Insert keys until the dict starts rehashing (enters the list). */
        for (i = 0; i < 256; i++) {
            de = kvstoreDictAddRaw(kvs, 0, stringFromInt(i), NULL);
            if (listLength(kvs->rehashing)) break;
        }
        assert(listLength(kvs->rehashing));
        /* Defrag the LUT while rehashing, then drain the rehashing list;
         * a stale rehashing node would crash or misbehave here. */
        while ((cursor = kvstoreDictLUTDefrag(kvs, cursor, defragLUTTestCallback)) != 0) {}
        while (kvstoreIncrementallyRehash(kvs, 1000)) {}
        kvstoreRelease(kvs);
    }

    TEST("Verify non-empty dict count is correctly updated") {
        kvstore *kvs = kvstoreCreate(&KvstoreTestType, &KvstoreDictTestType, 2,
                                     KVSTORE_ALLOCATE_DICTS_ON_DEMAND);
        for (int idx = 0; idx < 4; idx++) {
            for (i = 0; i < 16; i++) {
                de = kvstoreDictAddRaw(kvs, idx, stringFromInt(i), NULL);
                assert(de != NULL);
                /* When the first element is inserted, the number of non-empty dictionaries is increased by 1. */
                if (i == 0) assert(kvstoreNumNonEmptyDicts(kvs) == idx + 1);
            }
        }

        /* Step by step, clear all dictionaries and ensure non-empty dict count is updated */
        for (int idx = 0; idx < 4; idx++) {
            kvstoreInitDictSafeIterator(&kvs_di, kvs, idx);
            while((de = kvstoreDictIteratorNext(&kvs_di)) != NULL) {
                key = dictGetKey(de);
                assert(kvstoreDictDelete(kvs, idx, key) == DICT_OK);
                /* When the dictionary is emptied, the number of non-empty dictionaries is reduced by 1. */
                if (kvstoreDictSize(kvs, idx) == 0) assert(kvstoreNumNonEmptyDicts(kvs) == 3 - idx);
            }
            kvstoreResetDictIterator(&kvs_di);
        }
        kvstoreRelease(kvs);
    }

    kvstoreRelease(kvs1);
    kvstoreRelease(kvs2);
    return 0;
}
| 1171 | #endif | ||
