diff options
| author | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:40:55 +0100 |
|---|---|---|
| committer | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:40:55 +0100 |
| commit | 5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda (patch) | |
| tree | 1acdfa5220cd13b7be43a2a01368e80d306473ca /examples/redis-unstable/src/chk.c | |
| parent | c7ab12bba64d9c20ccd79b132dac475f7bc3923e (diff) | |
| download | crep-5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda.tar.gz | |
Add Redis source code for testing
Diffstat (limited to 'examples/redis-unstable/src/chk.c')
| -rw-r--r-- | examples/redis-unstable/src/chk.c | 822 |
1 files changed, 822 insertions, 0 deletions
diff --git a/examples/redis-unstable/src/chk.c b/examples/redis-unstable/src/chk.c new file mode 100644 index 0000000..f15cfb1 --- /dev/null +++ b/examples/redis-unstable/src/chk.c @@ -0,0 +1,822 @@ +/* Implementation of a topK structure using CuckooHeavyKeeper algorithm + * + * Implementation is based on the paper "Cuckoo Heavy Keeper and the balancing + * act of maintaining heavy hitters in stream processing" by Vinh Quang Ngo and + * Marina Papatriantafilou. Also, the accompanying C++ implementation was used + * as a reference point: https://github.com/vinhqngo5/Cuckoo_Heavy_Keeper + * Main changes are addition of a min-heap so we can keep names of the top K + * elements - idea comes from RedisBloom's TopK structure. + * + * Copyright (c) 2026-Present, Redis Ltd. + * All rights reserved. + * + * Licensed under your choice of (a) the Redis Source Available License 2.0 + * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the + * GNU Affero General Public License v3 (AGPLv3). + */ + +#include "chk.h" +#include "redisassert.h" +#include "zmalloc.h" +#include "xxhash.h" + +#include <math.h> +#include <stdlib.h> +#include <string.h> + +/* Lobby to heavy item promotion threshold */ +#define LOBBY_PROMOTION_THRESHOLD 16 + +#ifndef static_assert +#define static_assert(expr, lit) extern char __static_assert_failure[(expr) ? 1:-1] +#endif + +static_assert(LOBBY_PROMOTION_THRESHOLD < CHK_LUT_SIZE, + "Lobby promotion threshold should be less then the LUT size to " + "ensure constant operations during decayCounter!"); + +/* After a heavy item is demoted is starts recursively kicking out other heavy + * items in the case it should stay heavy (defined by isHeavyHitter). In + * principle this process could go over all the items in the chkTopK's tables + * so it's artificially limited by this constant. */ +#define MAX_KICKS 16 + +/* An item is defined as heavy hitter if its count is more or equal to x * N + * where x is a threshold constant (HEAVY_RATIO) and N is the total count the + * chkTopK structure has accumulated. See the paper for more info. */ +#define HEAVY_RATIO 0.008 + +/* A unique seed for the items when storing them in the heap so it's not related + * to the cuckoo's hashes. Also, we don't need the less-bit hash here as the + * heap does not take much memory so we avoid needless possible collisions. */ +#define HEAP_SEED 1919 + +typedef struct { + size_t idx[CHK_NUM_TABLES]; + fingerprint_t fp; +} fpAndIdx; + +#define min(a, b) ((a) < (b) ? (a) : (b)) + +/* Heap operations */ +static chkHeapBucket *chkCheckExistInHeap(chkTopK *topk, const char *item, int itemlen, uint64_t fp) { + for (int32_t i = topk->k - 1; i >= 0; --i) { + chkHeapBucket *bucket = topk->heap + i; + if (bucket->fp == fp && bucket->item && + sdslen(bucket->item) == (size_t)itemlen && + memcmp(bucket->item, item, itemlen) == 0) + { + return bucket; + } + } + return NULL; +} + +void chkHeapifyDown(chkHeapBucket *array, size_t len, size_t start) { + size_t child = start; + + if (len < 2 || (len - 2) / 2 < child) { + return; + } + child = 2 * child + 1; + if ((child + 1) < len && (array[child].count > array[child + 1].count)) { + ++child; + } + if (array[child].count > array[start].count) { + return; + } + + chkHeapBucket top = {0}; + top = array[start]; + do { + memcpy(&array[start], &array[child], sizeof(chkHeapBucket)); + start = child; + + if ((len - 2) / 2 < child) { + break; + } + child = 2 * child + 1; + + if ((child + 1) < len && (array[child].count > array[child + 1].count)) { + ++child; + } + } while (array[child].count < top.count); + memcpy(&array[start], &top, sizeof(chkHeapBucket)); +} + +/*----------------------------------------------------------------------------- + * chkTopK operations + *----------------------------------------------------------------------------*/ + +/* Create the chkTopK structure. Note, CHK paper recommends decay=1.08. + * numbuckets must be a power of 2. Recommended size for numbuckets is at least + * 7 or 8 times k. */ +chkTopK *chkTopKCreate(int k, int numbuckets, double decay) { + /* Number of buckets need to be a power of 2 for better performance - we + * have better cache locality of the tables and faster table indices + * calculations. */ + assert(k > 0 && (numbuckets & (numbuckets - 1)) == 0); + + size_t usable = 0; + chkTopK *topk = zcalloc_usable(sizeof(chkTopK), &usable); + topk->alloc_size += usable; + + for (int i = 0; i < CHK_NUM_TABLES; ++i) { + topk->tables[i] = zcalloc_usable(sizeof(chkBucket) * numbuckets, &usable); + topk->alloc_size += usable; + } + + topk->heap = zcalloc_usable(sizeof(chkHeapBucket) * k, &usable); + topk->alloc_size += usable; + + topk->decay = decay; + topk->inv_decay = 1. / decay; + topk->k = k; + topk->numbuckets = numbuckets; + + topk->lut_decay_exp[0] = 0; + topk->lut_min_decay[0] = 0; + topk->lut_decay_prob[0] = 0; + for (int i = 1; i < CHK_LUT_SIZE + 1; ++i) { + topk->lut_decay_exp[i] = topk->lut_decay_exp[i - 1] + pow(topk->decay, i - 1); + topk->lut_min_decay[i] = topk->lut_decay_exp[i] - topk->lut_decay_exp[i - 1]; + topk->lut_decay_prob[i] = pow(topk->inv_decay, i); + } + + return topk; +} + +/* Release chkTopK resources */ +void chkTopKRelease(chkTopK *topk) { + size_t usable; + for (int i = 0; i < CHK_NUM_TABLES; ++i) { + zfree_usable(topk->tables[i], &usable); + topk->alloc_size -= usable; + } + for (int i = 0; i < topk->k; ++i) { + if (topk->heap[i].item) { + topk->alloc_size -= sdsAllocSize(topk->heap[i].item); + sdsfree(topk->heap[i].item); + } + } + zfree_usable(topk->heap, &usable); + topk->alloc_size -= usable; + debugAssert(topk->alloc_size == zmalloc_usable_size(topk)); + + zfree(topk); +} + +static inline int generateAltIdx(fingerprint_t fp, int idx, int numbuckets) { + return (idx ^ (0x5bd1e995 * (size_t)fp)) & (numbuckets - 1); +} + +fpAndIdx generateItemFpAndIdxs(chkTopK *topk, char *item, int itemlen) { + uint64_t hash = XXH3_64bits_withSeed(item, itemlen, 0); + + fpAndIdx res; + res.fp = (hash & 0xFFFF); /* Only use 16 bits for fingerprint */ + + /* Note numbuckets are a power of 2 so we don't use modulo for index calc */ + res.idx[0] = (hash >> 32) & (topk->numbuckets - 1); + for (int i = 1; i < CHK_NUM_TABLES; ++i) { + res.idx[i] = generateAltIdx(res.fp, res.idx[i-1], topk->numbuckets); + } + + return res; +} + +typedef struct { + int table_idx; + int pos; +} checkEntryRes; + +/* Check if `item` is a heavy entry. If so we bump its count. If not - we make + * it a heavy entry immediately if there is an empty spot, thus skipping the + * lobby as an optimization. */ +checkEntryRes checkHeavyEntries(chkTopK *topk, fpAndIdx item, counter_t weight) { + int empty_table_idx = -1; + int empty_pos = -1; + + for (int i = 0; i < CHK_NUM_TABLES; ++i) { + int idx = item.idx[i]; + + chkBucket *bucket = &topk->tables[i][idx]; + for (int j = 0; j < CHK_HEAVY_ENTRIES_PER_BUCKET; ++j) { + chkHeavyEntry *e = &bucket->heavy_entries[j]; + if (e->count > 0) { + if (e->fp == item.fp) { + e->count += weight; + + checkEntryRes res = { i, j }; + return res; + } + } else if (empty_table_idx == -1) { + empty_table_idx = i; + empty_pos = j; + } + } + } + + if (empty_table_idx == -1) { + checkEntryRes res = { -1, -1 }; + return res; + } + + /* If there is an empty slot in the heavy entries just put the item there + * instead of going through the lobby first (optimization as per the paper) */ + int idx = item.idx[empty_table_idx]; + chkHeavyEntry *e = &topk->tables[empty_table_idx][idx].heavy_entries[empty_pos]; + e->fp = item.fp; + e->count = weight; + + checkEntryRes res = {empty_table_idx, empty_pos}; + return res; +} + +/* A heavy hitter is defined by the paper as an item with counter more or equal + * to phi * N, where phi is a constant and N is the total count the structure + * has recorded up to that point */ +int isHeavyHitter(chkTopK *topk, counter_t cnt) { + return cnt >= (topk->total * HEAVY_RATIO); +} + +/* After a lobby item is promoted it may be placed on a heavy item's spot. The + * latter is kicked out, but it may recursively kick out another heavy item. + * The process is limited by MAX_KICKS and also by the fact that during updates + * one of the kicked out items may have its counter decayed so much - it's not + * passing the heavy item threshold (see isHeavyHitter). */ +void kickout(chkTopK *topk, chkHeavyEntry entry, int idx, int table_idx) { + for (int i = 0; i < MAX_KICKS; ++i) { + /* Do not try to swap with any entries if we don't reach the heavy + * hitter threshold */ + if (!isHeavyHitter(topk, entry.count)) return; + + /* Find the heavy entry in the alt bucket in the other table with + * minimum count. If there is empty entry there just occupy it, else + * recursively kick the minimal one out. + * To find the alt bucket we need to compute the alt index from the + * fingerprint of the kicked-out entry. */ + table_idx = 1 - table_idx; + idx = generateAltIdx(entry.fp, idx, topk->numbuckets); + + chkBucket *bucket = &topk->tables[table_idx][idx]; + counter_t min = (counter_t)-1; + int min_pos = -1; + for (int j = 0; j < CHK_HEAVY_ENTRIES_PER_BUCKET; ++j) { + chkHeavyEntry *e = &bucket->heavy_entries[j]; + if (e->count == 0) { + *e = entry; + return; + } + if (e->count < min) { + min = e->count; + min_pos = j; + } + } + + chkHeavyEntry old_entry = bucket->heavy_entries[min_pos]; + bucket->heavy_entries[min_pos] = entry; + entry = old_entry; + } +} + +/* When a lobby entry's counter passes the promotion threshold we try to promote + * it with some probability. See the paper for more details. If promotion is + * successful the lobby entry may kick out a heavy one - see kickout() */ +int tryPromoteAndKickout(chkTopK *topk, fpAndIdx item, counter_t new_count, + int table_idx) +{ + int idx = item.idx[table_idx]; + chkBucket *bucket = &topk->tables[table_idx][idx]; + counter_t min = (counter_t)-1; /* counter_t is unsigned */ + int min_idx = -1; + + /* We search for heavy item bucket of the promoted lobby entry. We may have + * an empty space which we immediately occupy. Otherwise we choose the + * bucket with lowest counter */ + for (int i = 0; i < CHK_HEAVY_ENTRIES_PER_BUCKET; ++i) { + if (bucket->heavy_entries[i].count == 0) { + bucket->heavy_entries[i].fp = item.fp; + bucket->heavy_entries[i].count = new_count; + return i; + } + if (bucket->heavy_entries[i].count < min) { + min = bucket->heavy_entries[i].count; + min_idx = i; + } + } + + /* If the heavy entry that is going to be kicked out has a counter lower + * than the lobby's one we always kick it out */ + if (min > new_count) { + double prob = (new_count - LOBBY_PROMOTION_THRESHOLD) / + (double)(min - LOBBY_PROMOTION_THRESHOLD); + + if ((rand() / (double)RAND_MAX) >= prob) return -1; + } + + chkHeavyEntry to_kickout = bucket->heavy_entries[min_idx]; + /* Note, that here the promoted item keeps the old count as per the paper */ + bucket->heavy_entries[min_idx].fp = bucket->lobby_entry.fp; + + bucket->lobby_entry.count = 0; + bucket->lobby_entry.fp = 0; + + kickout(topk, to_kickout, idx, table_idx); + + return min_idx; +} + +/* Check if an item is a lobby entry */ +checkEntryRes checkLobbyEntries(chkTopK *topk, fpAndIdx item, counter_t weight) { + for (int i = 0; i < CHK_NUM_TABLES; ++i) { + int idx = item.idx[i]; + + chkBucket *bucket = &topk->tables[i][idx]; + chkLobbyEntry *e = &bucket->lobby_entry; + + /* No match or empty lobby entry */ + if (e->fp != item.fp || e->count == 0) continue; + + /* If we don't cross the threshold just update the counter */ + uint64_t new_count = (uint64_t)e->count + weight; + if (new_count < LOBBY_PROMOTION_THRESHOLD) { + e->count = (uint16_t)new_count; + + checkEntryRes res = { i, -1 }; + return res; + } + + /* Try to promote the entry to heavy entry if we crossed the threshold. + * Else just set the counter to the value of the threshold */ + int kickout_pos = tryPromoteAndKickout(topk, item, new_count, i); + if (kickout_pos != -1) { + checkEntryRes res = {i, kickout_pos}; + return res; + } + + e->count = LOBBY_PROMOTION_THRESHOLD; + checkEntryRes res = { i, -1 }; + return res; + } + + checkEntryRes res = { -1, -1 }; + return res; +} + +/* Probability to decay cnt with 1. + * Equal to pow(decay, -cnt) */ +static inline double getDecayProb(chkTopK *topk, counter_t cnt) { + if (cnt < CHK_LUT_SIZE) { + return topk->lut_decay_prob[cnt]; + } + + return pow(topk->lut_decay_prob[CHK_LUT_SIZE], + ((double)cnt / (CHK_LUT_SIZE))) * + topk->lut_decay_prob[cnt % (CHK_LUT_SIZE)]; +} + +/* Expected decay steps to decay cnt to 0. + * Equal to sum(pow(decay, i)) for i in [0; cnt] */ +static inline double getExpDecayCount(chkTopK *topk, lobby_counter_t cnt) { + return topk->lut_decay_exp[cnt]; +} + +/* Expected minimum decay steps to decay cnt with 1. Since probability is + * pow(decay, -cnt) it's equal to pow(decay, cnt) */ +static inline double getMinDecayCount(chkTopK *topk, counter_t cnt) { + if (cnt < CHK_LUT_SIZE) { + return topk->lut_min_decay[cnt]; + } + + return pow(topk->lut_min_decay[CHK_LUT_SIZE], + ((double)cnt / (CHK_LUT_SIZE))) * + topk->lut_min_decay[cnt % (CHK_LUT_SIZE)]; +} + +/* When there is a hash-collission between lobby entries we decay the existing + * lobby entry with the weight of the new one. Return the counter after decaying. */ +lobby_counter_t chkDecayCounter(chkTopK *topk, lobby_counter_t cnt, counter_t weight) { + if (weight == 0) return cnt; + + /* Unweighted update - just decay with probability pow(decay, -cnt) */ + if (weight == 1) { + double prob = getDecayProb(topk, (counter_t)cnt); + if ((rand() / (double)RAND_MAX) < prob) { + return cnt - 1; + } + return cnt; + } + + /* For weighted updates we simulate multiple unweighted ones */ + + /* Weight is smaller than the minimum amount of decay steps required to + * decay the counter with probability of 100% so again we roll the dice */ + double min_decay = getMinDecayCount(topk, cnt); + if (weight < (counter_t)min_decay) { + double prob = weight / min_decay; + if ((rand() / (double)RAND_MAX) < prob) { + return cnt - 1; + } + return cnt; + } + + /* Weight is more than the expected amount of decay steps to decay the + * counter to 0. */ + double exp_decays = getExpDecayCount(topk, cnt); + if (weight >= (counter_t)exp_decays) + return 0; + + /* Weight is large enough to decay the counter to cnt - X where 0 < X < cnt. + * We binary search for the largest value `C` such that: + * + * (expected decay ops for `C`) >= (expected decay ops for `cnt`) - `weight` + * i.e lut_decay_exp[C] + weight >= lut_decay_exp[cnt] + * + * Note that since cnt is a lobby counter it will necessarily be less or + * equal than LOBBY_PROMOTION_THRESHOLD, so although we binary search this + * is a O(1) operation */ + int left = 0; + int right = cnt; + while (left < right) { + int mid = left + (right - left) / 2; + + if (topk->lut_decay_exp[mid] + weight >= topk->lut_decay_exp[cnt]) { + right = mid; + } else { + left = mid + 1; + } + } + + return left; +} + +/* Update weighted item. If another one was expelled from the topK list - + * return it. Caller is responsible for releasing it */ +sds chkTopKUpdate(chkTopK *topk, char *item, int itemlen, counter_t weight) +{ + if (weight == 0) return NULL; + + topk->total += weight; + + /* Generate a fingerprint and indices for both cuckoo tables. */ + fpAndIdx itemFpIdx = generateItemFpAndIdxs(topk, item, itemlen); + + /* Check if the item is amongst the heavy entries. If so we just update its + * counter. */ + checkEntryRes res = checkHeavyEntries(topk, itemFpIdx, weight); + if (res.table_idx != -1) { + goto update_heap; + } + + /* If the item is not already heavy it may be in the lobby. If so we'll + * increase its counter and promote it to a heavy entry if it passes the + * threshold */ + res = checkLobbyEntries(topk, itemFpIdx, weight); + if (res.table_idx != -1) { + goto update_heap; + } + + /* Item is not tracked at all. Check for empty lobby entries - if there is + * any - place the item there. The weight may be higher than the promotional + * threshold in which case we'll try to promote it. */ + for (int i = 0; i < CHK_NUM_TABLES; ++i) { + int idx = itemFpIdx.idx[i]; + chkBucket *bucket = &topk->tables[i][idx]; + if (bucket->lobby_entry.count == 0) { + bucket->lobby_entry.fp = itemFpIdx.fp; + + res.table_idx = i; + res.pos = -1; + + if (weight < LOBBY_PROMOTION_THRESHOLD) { + bucket->lobby_entry.count = weight; + } else { + int kickout_pos = tryPromoteAndKickout(topk, itemFpIdx, weight, i); + if (kickout_pos != -1) { + res.pos = kickout_pos; + } else { + bucket->lobby_entry.count = LOBBY_PROMOTION_THRESHOLD; + } + } + + goto update_heap; + } + } + + /* If there are no empty lobby entries choose a table deterministically, + * decay its lobby counter and update */ + int table_idx = itemFpIdx.fp & 1; + int idx = itemFpIdx.idx[table_idx]; + + chkLobbyEntry *e = &topk->tables[table_idx][idx].lobby_entry; + + /* new_count is the count of `e` after decaying it with weight */ + lobby_counter_t new_count = chkDecayCounter(topk, e->count, weight); + + /* if the chosen lobby entry has decayed its counter to 0, it's replaced by + * the new entry. Note, in that case the new entry has it's weight + * decreased by the approximate amount of decay operations needed to decay + * the old entry. */ + if (new_count == 0) { + e->fp = itemFpIdx.fp; + counter_t exp_decay_cnt = getExpDecayCount(topk, e->count); + e->count = exp_decay_cnt >= weight ? + 1 : (lobby_counter_t)min(255, weight - exp_decay_cnt); + } else { + e->count = new_count; + } + + if (e->count >= LOBBY_PROMOTION_THRESHOLD) { + int kickout_pos = tryPromoteAndKickout(topk, itemFpIdx, e->count, table_idx); + if (kickout_pos != -1) { + res.table_idx = table_idx; + res.pos = kickout_pos; + } + } + + /* After a change in the structure has occurred we check if we also need to + * update the heap - i.e bump a new item in it, or reorder an old item if + * it's counter went up. */ +update_heap: + if (res.table_idx == -1 || res.pos == -1) + return NULL; + + table_idx = res.table_idx; + idx = itemFpIdx.idx[table_idx]; + + counter_t heap_min = topk->heap[0].count; + chkHeavyEntry *entry = &topk->tables[table_idx][idx].heavy_entries[res.pos]; + + if (entry->count < heap_min) + return NULL; + + /* Heap uses different hash than the cuckoo tables */ + uint64_t fp = XXH3_64bits_withSeed(item, itemlen, HEAP_SEED); + chkHeapBucket *itemHeapPtr = chkCheckExistInHeap(topk, item, itemlen, fp); + if (itemHeapPtr != NULL) { + itemHeapPtr->count = entry->count; + chkHeapifyDown(topk->heap, topk->k, itemHeapPtr - topk->heap); + } else { + /* We know the new entry has bigger count than the min-element so it's + * safe to expel it. */ + sds expelled = topk->heap[0].item; + if (expelled) topk->alloc_size -= sdsAllocSize(expelled); + + topk->heap[0].count = entry->count; + topk->heap[0].fp = fp; + topk->heap[0].item = sdsnewlen(item, itemlen); + topk->alloc_size += sdsAllocSize(topk->heap[0].item); + + chkHeapifyDown(topk->heap, topk->k, 0); + return expelled; + } + + return NULL; +} + +int cmpchkHeapBucket(const void *tmp1, const void *tmp2) { + const chkHeapBucket *res1 = tmp1; + const chkHeapBucket *res2 = tmp2; + return res1->count < res2->count ? 1 : res1->count > res2->count ? -1 : 0; +} + +/* Get an ordered by count list of topk->k elements inside the topk object. + * + * NOTE, the returned array is a copy of the internal heap stored by `topk`. The + * caller is responsible for releasing it after use. The elements of the array + * share their `item` pointers with the internal topk->heap buckets so one must + * not use it after `topk` is released. */ +chkHeapBucket *chkTopKList(chkTopK *topk) { + chkHeapBucket *list = zmalloc(sizeof(chkHeapBucket) * topk->k); + memcpy(list, topk->heap, sizeof(chkHeapBucket) * topk->k); + qsort(list, topk->k, sizeof(*list), cmpchkHeapBucket); + return list; +} + +size_t chkTopKGetMemoryUsage(chkTopK *topk) { + if (!topk) return 0; + + return topk->alloc_size; +} + +#ifdef REDIS_TEST + +#include <stdio.h> +#include "testhelp.h" + +#define UNUSED(x) (void)(x) + +static int findItemInList(chkHeapBucket *list, int k, const char *item, int itemlen) { + for (int i = 0; i < k; i++) { + if (list[i].item != NULL && + sdslen(list[i].item) == (size_t)itemlen && + memcmp(list[i].item, item, itemlen) == 0) { + return i; + } + } + return -1; +} + +static int verifyListSorted(chkHeapBucket *list, int k) { + for (int i = 0; i < k - 1; i++) { + if (list[i].item == NULL) continue; + if (list[i + 1].item == NULL) continue; + if (list[i].count < list[i + 1].count) { + return 0; + } + } + return 1; +} + +static void chkTopKUpdateAndFreeExpelled(chkTopK *topk, const char *item, int itemlen, counter_t weight) { + sds expelled = chkTopKUpdate(topk, (char *)item, itemlen, weight); + if (expelled) sdsfree(expelled); +} + +static void testBasicTopK(void) { + int k = 5; + int numbuckets = 64; + double decay = 0.9; + + chkTopK *topk = chkTopKCreate(k, numbuckets, decay); + test_cond("Create topk structure", topk != NULL); + + if (topk == NULL) return; + + chkTopKUpdateAndFreeExpelled(topk, "item1", 5, 100); + chkTopKUpdateAndFreeExpelled(topk, "item2", 5, 200); + chkTopKUpdateAndFreeExpelled(topk, "item3", 5, 150); + chkTopKUpdateAndFreeExpelled(topk, "item4", 5, 50); + chkTopKUpdateAndFreeExpelled(topk, "item5", 5, 300); + chkTopKUpdateAndFreeExpelled(topk, "item6", 5, 75); + + chkHeapBucket *list = chkTopKList(topk); + test_cond("chkTopKList returns non-NULL", list != NULL); + + if (list == NULL) { + chkTopKRelease(topk); + return; + } + + test_cond("TopK list is sorted in descending order", verifyListSorted(list, k)); + + int idx1 = findItemInList(list, k, "item5", 5); + int idx2 = findItemInList(list, k, "item2", 5); + int idx3 = findItemInList(list, k, "item3", 5); + + test_cond("Heaviest items are in the list", idx1 != -1 && idx2 != -1 && idx3 != -1); + + test_cond("item5 has the highest count", idx1 == 0); + + zfree(list); + chkTopKRelease(topk); +} + +static void testHeavierElementsReplaceLighter(void) { + int k = 5; + int numbuckets = 64; + double decay = 0.9; + + chkTopK *topk = chkTopKCreate(k, numbuckets, decay); + test_cond("Create topk structure for replacement test", topk != NULL); + + if (topk == NULL) return; + + chkTopKUpdateAndFreeExpelled(topk, "light1", 6, 50); + chkTopKUpdateAndFreeExpelled(topk, "light2", 6, 60); + chkTopKUpdateAndFreeExpelled(topk, "light3", 6, 70); + chkTopKUpdateAndFreeExpelled(topk, "light4", 6, 80); + chkTopKUpdateAndFreeExpelled(topk, "light5", 6, 90); + + chkHeapBucket *list1 = chkTopKList(topk); + test_cond("Initial topk list is not NULL", list1 != NULL); + + if (list1 == NULL) { + chkTopKRelease(topk); + return; + } + + int light1_idx = findItemInList(list1, k, "light1", 6); + int light2_idx = findItemInList(list1, k, "light2", 6); + int light3_idx = findItemInList(list1, k, "light3", 6); + int light4_idx = findItemInList(list1, k, "light4", 6); + int light5_idx = findItemInList(list1, k, "light5", 6); + + test_cond("light1 is in initial topk list", light1_idx != -1); + test_cond("light2 is in initial topk list", light2_idx != -1); + test_cond("light3 is in initial topk list", light3_idx != -1); + test_cond("light4 is in initial topk list", light4_idx != -1); + test_cond("light5 is in initial topk list", light5_idx != -1); + + zfree(list1); + + chkTopKUpdateAndFreeExpelled(topk, "heavy1", 6, 500); + chkTopKUpdateAndFreeExpelled(topk, "heavy2", 6, 600); + + chkHeapBucket *list2 = chkTopKList(topk); + test_cond("Updated topk list is not NULL", list2 != NULL); + + if (list2 == NULL) { + chkTopKRelease(topk); + return; + } + + int heavy1_idx = findItemInList(list2, k, "heavy1", 6); + int heavy2_idx = findItemInList(list2, k, "heavy2", 6); + + test_cond("heavy1 is in updated topk list", heavy1_idx != -1); + test_cond("heavy2 is in updated topk list", heavy2_idx != -1); + + light1_idx = findItemInList(list2, k, "light1", 6); + light2_idx = findItemInList(list2, k, "light2", 6); + light3_idx = findItemInList(list2, k, "light3", 6); + light4_idx = findItemInList(list2, k, "light4", 6); + light5_idx = findItemInList(list2, k, "light5", 6); + + int light_items_remaining = (light1_idx != -1 ? 1 : 0) + + (light2_idx != -1 ? 1 : 0) + + (light3_idx != -1 ? 1 : 0) + + (light4_idx != -1 ? 1 : 0) + + (light5_idx != -1 ? 1 : 0); + + test_cond("Some lighter items remain in the list after adding heavier ones", + light_items_remaining > 0); + + zfree(list2); + chkTopKRelease(topk); +} + +static void testManySmallWeightUpdates(void) { + int k = 2; + int numbuckets = 64; + double decay = 0.9; + + chkTopK *topk = chkTopKCreate(k, numbuckets, decay); + test_cond("Create topk structure for small weight updates test", topk != NULL); + + if (topk == NULL) return; + + chkTopKUpdateAndFreeExpelled(topk, "item0", 5, 50); + chkTopKUpdateAndFreeExpelled(topk, "item1", 5, 100); + + chkHeapBucket *list1 = chkTopKList(topk); + test_cond("Topk list after adding item0 and item1 is not NULL", list1 != NULL); + + if (list1 == NULL) { + chkTopKRelease(topk); + return; + } + + int item0_idx1 = findItemInList(list1, k, "item0", 5); + int item1_idx1 = findItemInList(list1, k, "item1", 5); + + test_cond("item0 and item1 are in topk after initial updates", + item0_idx1 != -1 && item1_idx1 != -1); + + zfree(list1); + + for (int i = 0; i < 100; i++) { + chkTopKUpdateAndFreeExpelled(topk, "item2", 5, 1); + } + + chkHeapBucket *list2 = chkTopKList(topk); + test_cond("Topk list after many small updates is not NULL", list2 != NULL); + + if (list2 == NULL) { + chkTopKRelease(topk); + return; + } + + int item0_idx2 = findItemInList(list2, k, "item0", 5); + int item1_idx2 = findItemInList(list2, k, "item1", 5); + int item2_idx2 = findItemInList(list2, k, "item2", 5); + + test_cond("item1 and item2 are in topk, item0 is not", + item1_idx2 != -1 && item2_idx2 != -1 && item0_idx2 == -1); + + counter_t item1_count = 0; + counter_t item2_count = 0; + if (item1_idx2 != -1) item1_count = list2[item1_idx2].count; + if (item2_idx2 != -1) item2_count = list2[item2_idx2].count; + + test_cond("item1 and item2 have similar weights", item1_count > 0 && item2_count > 0 && + (item1_count > item2_count ? item1_count - item2_count : item2_count - item1_count) < 5); + + zfree(list2); + chkTopKRelease(topk); +} + +int chkTopKTest(int argc, char *argv[], int flags) { + UNUSED(argc); + UNUSED(argv); + UNUSED(flags); + + testBasicTopK(); + testHeavierElementsReplaceLighter(); + testManySmallWeightUpdates(); + + return 0; +} + +#endif /* REDIS_TEST */ |
