summaryrefslogtreecommitdiff
path: root/examples/redis-unstable/src/estore.c
diff options
context:
space:
mode:
Diffstat (limited to 'examples/redis-unstable/src/estore.c')
-rw-r--r--examples/redis-unstable/src/estore.c496
1 files changed, 496 insertions, 0 deletions
diff --git a/examples/redis-unstable/src/estore.c b/examples/redis-unstable/src/estore.c
new file mode 100644
index 0000000..6335bd5
--- /dev/null
+++ b/examples/redis-unstable/src/estore.c
@@ -0,0 +1,496 @@
+/*
+ * estore.c -- Expiration Store implementation
+ *
+ * Copyright (c) 2011-Present, Redis Ltd.
+ * All rights reserved.
+ *
+ * Licensed under your choice of (a) the Redis Source Available License 2.0
+ * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
+ * GNU Affero General Public License v3 (AGPLv3).
+ */
+
+#include "fmacros.h"
+#include "estore.h"
+#include "zmalloc.h"
+#include "redisassert.h"
+#include "server.h"
+#include <string.h>
+
+/* Forward declaration of the estore structure */
+struct _estore {
+ int flags; /* Flags for configuration options */
+ EbucketsType *bucket_type; /* Type of buckets used in this store */
+ ebuckets *ebArray; /* Array of ebuckets (one per slot in cluster mode, or just one) */
+ int num_buckets_bits; /* Log2 of the number of buckets */
+ int num_buckets; /* Number of buckets (1 << num_buckets_bits) */
+ unsigned long long count; /* Total number of items in this estore */
+ fenwickTree *buckets_sizes; /* Binary indexed tree (BIT) that describes cumulative key frequencies */
+};
+
+/* Get the appropriate bucket for a given eidx */
+ebuckets *estoreGetBuckets(estore *es, int eidx) {
+ debugAssert(eidx < es->num_buckets);
+ return &(es->ebArray[eidx]);
+}
+
+/* Create a new expiration store
+ * type - Pointer to a static EbucketsType defining the bucket behavior.
+ * num_buckets_bits - The log2 of the number of buckets (0 for 1 bucket,
+ * CLUSTER_SLOT_MASK_BITS for CLUSTER_SLOTS buckets)
+ * flags - Configuration flags
+ */
+estore *estoreCreate(EbucketsType *type, int num_buckets_bits) {
+ /* We can't support more than 2^16 buckets to be consistent with kvstore */
+ assert(num_buckets_bits <= 16);
+
+ estore *es = zmalloc(sizeof(estore));
+ /* Store the bucket type */
+ es->bucket_type = type;
+
+ /* Calculate number of buckets based on num_buckets_bits */
+ es->num_buckets_bits = num_buckets_bits;
+ es->num_buckets = 1 << num_buckets_bits;
+ es->buckets_sizes = es->num_buckets > 1 ? fwTreeCreate(num_buckets_bits) : NULL;
+
+ /* Allocate the buckets array */
+ es->ebArray = zcalloc(sizeof(ebuckets) * es->num_buckets);
+
+ /* Initialize all buckets */
+ for (int i = 0; i < es->num_buckets; i++) {
+ es->ebArray[i] = ebCreate();
+ }
+
+ es->count = 0;
+ return es;
+}
+
+/* Empty an expiration store (clear all entries but keep the structure) */
+void estoreEmpty(estore *es) {
+ if (es == NULL) return;
+
+ for (int i = 0; i < es->num_buckets; i++) {
+ ebDestroy(&es->ebArray[i], es->bucket_type, NULL);
+ es->ebArray[i] = ebCreate();
+ }
+
+ if (es->buckets_sizes) fwTreeClear(es->buckets_sizes);
+ es->count = 0;
+}
+
+/* Check if the expiration store is empty */
+int estoreIsEmpty(estore *es) {
+ return es->count == 0;
+}
+
+/* Get the first non-empty bucket index in the estore */
+int estoreGetFirstNonEmptyBucket(estore *es) {
+ if (es->num_buckets == 1 || estoreSize(es) == 0)
+ return 0;
+ return fwTreeFindFirstNonEmpty(es->buckets_sizes);
+}
+
+/* Get the next non-empty bucket index after the given index */
+int estoreGetNextNonEmptyBucket(estore *es, int eidx) {
+ if (es->num_buckets == 1) {
+ assert(eidx == 0);
+ return -1;
+ }
+ return fwTreeFindNextNonEmpty(es->buckets_sizes, eidx);
+}
+
+/* Release an expiration store (free all memory) */
+void estoreRelease(estore *es) {
+ if (es == NULL) return;
+
+ for (int i = 0; i < es->num_buckets; i++) {
+ if (es->ebArray[i])
+ ebDestroy(&es->ebArray[i], es->bucket_type, NULL);
+ }
+ fwTreeDestroy(es->buckets_sizes);
+ zfree(es->ebArray);
+ zfree(es);
+}
+
+/* Perform active expiration on a specific bucket */
+void estoreActiveExpire(estore *es, int eidx, ExpireInfo *info) {
+ ebuckets *eb = estoreGetBuckets(es, eidx);
+ uint64_t before = ebGetTotalItems(*eb, es->bucket_type);
+ ebExpire(eb, es->bucket_type, info);
+ /* If items expired (or updated), update the BIT and estore count */
+ if (info->itemsExpired) {
+ uint64_t diff = before - ebGetTotalItems(*eb, es->bucket_type);
+ fwTreeUpdate(es->buckets_sizes, eidx, (long long) diff);
+ es->count -= diff;
+ }
+}
+
+/* Add item to estore with the given expiration time. The item must has
+ * expireMeta already allocated. */
+void estoreAdd(estore *es, int eidx, eItem item, uint64_t when) {
+ debugAssert(es != NULL && item != NULL);
+
+ /* currently only used by hash field expiration. Verify it has expireMeta */
+ debugAssert((((robj *)item)->encoding == OBJ_ENCODING_LISTPACK_EX) ||
+ ((((robj *)item)->encoding == OBJ_ENCODING_HT) &&
+ ((dict *) ((robj *)item)->ptr)->type == &entryHashDictTypeWithHFE));
+
+ ebuckets *bucket = estoreGetBuckets(es, eidx);
+ if (ebAdd(bucket, es->bucket_type, item, when) == 0) {
+ es->count++;
+ fwTreeUpdate(es->buckets_sizes, eidx, 1);
+ }
+}
+
+/* Remove an item from the expiration store. Returns the expire time or EB_EXPIRE_TIME_INVALID */
+uint64_t estoreRemove(estore *es, int eidx, eItem item) {
+ uint64_t expireTime;
+ debugAssert(es != NULL && item != NULL);
+
+ /* Currently only used by hash field expiration. gracefully ignore otherwise */
+ kvobj *kv = (kvobj *) item;
+ if ( (kv->type != OBJ_HASH) ||
+ (kv->encoding == OBJ_ENCODING_LISTPACK) ||
+ ((kv->encoding == OBJ_ENCODING_HT) && (((dict *)kv->ptr)->type != &entryHashDictTypeWithHFE)))
+ return EB_EXPIRE_TIME_INVALID;
+
+ /* If (ExpireMeta of kv) marked as trash, then it is already removed */
+ if ((expireTime = ebGetExpireTime(es->bucket_type, item)) == EB_EXPIRE_TIME_INVALID)
+ return EB_EXPIRE_TIME_INVALID;
+
+ ebuckets *bucket = estoreGetBuckets(es, eidx);
+ serverAssert(ebRemove(bucket, es->bucket_type, item)==1);
+ es->count--;
+ fwTreeUpdate(es->buckets_sizes, eidx, -1);
+
+ return expireTime;
+}
+
+/* Update an item's expiration time in the store */
+void estoreUpdate(estore *es, int eidx, eItem item, uint64_t when) {
+ debugAssert(es != NULL && item != NULL);
+
+ /* currently only used by hash field expiration. Verify it has expireMeta */
+ debugAssert((((robj *)item)->encoding == OBJ_ENCODING_LISTPACK_EX) ||
+ ((((robj *)item)->encoding == OBJ_ENCODING_HT) &&
+ ((dict *) ((robj *)item)->ptr)->type == &entryHashDictTypeWithHFE));
+
+ debugAssert(ebGetExpireTime(es->bucket_type, item) != EB_EXPIRE_TIME_INVALID);
+
+ ebuckets *bucket = estoreGetBuckets(es, eidx);
+
+ /* Remove the item from its current position */
+ serverAssert(ebRemove(bucket, es->bucket_type, item) != 0);
+
+ /* Add the item back with the new expiration time */
+ serverAssert(ebAdd(bucket, es->bucket_type, item, when) == 0);
+
+ /* Note that estore count remain unchanged */
+}
+
+/* Get the total number of items in the expiration store */
+uint64_t estoreSize(estore *es) {
+ return es->count;
+}
+
+/* Move ebuckets from one estore to another */
+void estoreMoveEbuckets(estore *src, estore *dst, int eidx) {
+ serverAssert(src->num_buckets > eidx);
+ serverAssert(src->num_buckets == dst->num_buckets);
+ serverAssert(ebIsEmpty(dst->ebArray[eidx])); /* If it is NULL */
+
+ /* Adjust source estore */
+ ebuckets eb = src->ebArray[eidx];
+ if (ebIsEmpty(eb)) return;
+ int64_t count = (int64_t)ebGetTotalItems(eb, src->bucket_type);
+ src->count -= count;
+ fwTreeUpdate(src->buckets_sizes, eidx, -count);
+ src->ebArray[eidx] = ebCreate(); /* Set to NULL actually.*/
+
+ /* Move ebuckets to destination estore */
+ dst->ebArray[eidx] = eb;
+ dst->count += count;
+ fwTreeUpdate(dst->buckets_sizes, eidx, count);
+}
+
+#ifdef REDIS_TEST
+#include <stdio.h>
+#include "testhelp.h"
+
+#define TEST(name) printf("test — %s\n", name);
+
+/* Test item structure for estore testing */
+typedef struct TestItem {
+ kvobj kv; /* mimic kvobj of type HASH to pass checks in estore */
+ ExpireMeta mexpire;
+ int index;
+} TestItem;
+
+/* Test EbucketsType for estore testing */
+ExpireMeta *getTestItemExpireMeta(const eItem item) {
+ return &((TestItem *)item)->mexpire;
+}
+
+void deleteTestItemCb(eItem item, void *ctx) {
+ UNUSED(ctx);
+ zfree(item);
+}
+
+EbucketsType testEbucketsType = {
+ .getExpireMeta = getTestItemExpireMeta,
+ .onDeleteItem = deleteTestItemCb,
+ .itemsAddrAreOdd = 0,
+};
+
+/* Helper function to create a test item */
+static TestItem *createTestItem(int index) {
+ TestItem *item = zmalloc(sizeof(TestItem));
+ item->index = index;
+ item->mexpire.trash = 1;
+ /* mimic kvobj of type HASH to pass checks in estore */
+ item->kv.type = OBJ_HASH;
+ item->kv.encoding = OBJ_ENCODING_LISTPACK_EX;
+ return item;
+}
+
+static ExpireAction activeExpireTestCb(eItem item, void *ctx) {
+ UNUSED(ctx);
+ zfree(item);
+ return 0;
+}
+
+int estoreTest(int argc, char **argv, int flags) {
+ UNUSED(argc);
+ UNUSED(argv);
+ UNUSED(flags);
+
+ /* Initialize minimal server state needed for testing */
+ server.hz = 10;
+ server.unixtime = time(NULL);
+
+ TEST("Create and destroy estore") {
+ estore *es = estoreCreate(&testEbucketsType, 0);
+ assert(es != NULL);
+ assert(estoreIsEmpty(es));
+ assert(estoreSize(es) == 0);
+ estoreRelease(es);
+ }
+
+ TEST("Create estore with multiple buckets") {
+ estore *es = estoreCreate(&testEbucketsType, 2); /* 4 buckets */
+ assert(es != NULL);
+ assert(estoreIsEmpty(es));
+ assert(estoreSize(es) == 0);
+
+ /* Test bucket access */
+ ebuckets *bucket0 = estoreGetBuckets(es, 0);
+ ebuckets *bucket1 = estoreGetBuckets(es, 1);
+ ebuckets *bucket2 = estoreGetBuckets(es, 2);
+ ebuckets *bucket3 = estoreGetBuckets(es, 3);
+
+ assert(bucket0 != NULL);
+ assert(bucket1 != NULL);
+ assert(bucket2 != NULL);
+ assert(bucket3 != NULL);
+
+ /* All buckets should be different */
+ assert(bucket0 != bucket1);
+ assert(bucket0 != bucket2);
+ assert(bucket1 != bucket3);
+
+ estoreRelease(es);
+ }
+
+ TEST("Add and remove items") {
+ estore *es = estoreCreate(&testEbucketsType, 1); /* 2 buckets */
+
+ /* Test initial state */
+ assert(estoreSize(es) == 0);
+ assert(estoreIsEmpty(es));
+
+ /* Create test items */
+ TestItem *item1 = createTestItem(1);
+ TestItem *item2 = createTestItem(2);
+ TestItem *item3 = createTestItem(3);
+
+ /* Add items to different buckets */
+ estoreAdd(es, 0, item1, 1000);
+ assert(estoreSize(es) == 1);
+ assert(!estoreIsEmpty(es));
+
+ estoreAdd(es, 1, item2, 2000);
+ assert(estoreSize(es) == 2);
+
+ estoreAdd(es, 0, item3, 3000); /* Add another item to bucket 0 */
+ assert(estoreSize(es) == 3);
+
+ /* Verify expiration times are set correctly */
+ assert(ebGetMetaExpTime(&item1->mexpire) == 1000);
+ assert(ebGetMetaExpTime(&item2->mexpire) == 2000);
+ assert(ebGetMetaExpTime(&item3->mexpire) == 3000);
+
+ /* Remove items */
+ uint64_t expireTime1 = estoreRemove(es, 0, item1);
+ assert(expireTime1 == 1000);
+ assert(estoreSize(es) == 2);
+ zfree(item1);
+
+ uint64_t expireTime2 = estoreRemove(es, 1, item2);
+ assert(expireTime2 == 2000);
+ assert(estoreSize(es) == 1);
+ zfree(item2);
+
+ uint64_t expireTime3 = estoreRemove(es, 0, item3);
+ assert(expireTime3 == 3000);
+ assert(estoreSize(es) == 0);
+ assert(estoreIsEmpty(es));
+ zfree(item3);
+
+ /* Clean up - items are freed by the onDeleteItem callback */
+ estoreRelease(es);
+ }
+
+ TEST("Update item expiration") {
+ estore *es = estoreCreate(&testEbucketsType, 0); /* 1 bucket */
+
+ /* Create and add a test item */
+ TestItem *item = createTestItem(1);
+ estoreAdd(es, 0, item, 1000);
+ assert(estoreSize(es) == 1);
+
+ /* Verify initial expiration time */
+ assert(ebGetMetaExpTime(&item->mexpire) == 1000);
+
+ /* Update expiration time */
+ estoreUpdate(es, 0, item, 2000);
+ assert(estoreSize(es) == 1); /* Size should remain the same */
+ assert(ebGetMetaExpTime(&item->mexpire) == 2000);
+
+ /* Update again to a different time */
+ estoreUpdate(es, 0, item, 500);
+ assert(estoreSize(es) == 1);
+ assert(ebGetMetaExpTime(&item->mexpire) == 500);
+
+ /* Clean up */
+ estoreRemove(es, 0, item);
+ assert(estoreSize(es) == 0);
+ zfree(item);
+
+ estoreRelease(es);
+ }
+
+ TEST("Non-empty bucket iteration") {
+ estore *es = estoreCreate(&testEbucketsType, 2); /* 4 buckets */
+
+ /* Test bucket iteration on empty store */
+ assert(estoreGetFirstNonEmptyBucket(es) == 0); /* Returns 0 when empty */
+ assert(estoreGetNextNonEmptyBucket(es, 0) == -1); /* No next bucket when empty */
+
+ /* Create test items and add to specific buckets */
+ TestItem *item1 = createTestItem(1);
+ TestItem *item2 = createTestItem(2);
+ TestItem *item3 = createTestItem(3);
+
+ /* Add to bucket 1 and 3 (skip 0 and 2) */
+ estoreAdd(es, 1, item1, 1000);
+ estoreAdd(es, 3, item2, 2000);
+ estoreAdd(es, 3, item3, 3000); /* Add another item to bucket 3 */
+
+ assert(estoreSize(es) == 3);
+
+ /* Test iteration through non-empty buckets */
+ int firstBucket = estoreGetFirstNonEmptyBucket(es);
+ assert(firstBucket == 1);
+
+ int nextBucket = estoreGetNextNonEmptyBucket(es, firstBucket);
+ assert(nextBucket == 3);
+
+ int lastBucket = estoreGetNextNonEmptyBucket(es, nextBucket);
+ assert(lastBucket == -1); /* No more non-empty buckets */
+
+ /* Test iteration from different starting points */
+ assert(estoreGetNextNonEmptyBucket(es, 0) == 1);
+ assert(estoreGetNextNonEmptyBucket(es, 2) == 3);
+
+ /* Clean up */
+ estoreRemove(es, 1, item1);
+ zfree(item1);
+ estoreRemove(es, 3, item2);
+ zfree(item2);
+ estoreRemove(es, 3, item3);
+ zfree(item3);
+ assert(estoreSize(es) == 0);
+
+ estoreRelease(es);
+ }
+
+ TEST("Empty estore") {
+ estore *es = estoreCreate(&testEbucketsType, 1); /* 2 buckets */
+
+ /* Add some items */
+ TestItem *item1 = createTestItem(1);
+ TestItem *item2 = createTestItem(2);
+ TestItem *item3 = createTestItem(3);
+
+ estoreAdd(es, 0, item1, 1000);
+ estoreAdd(es, 1, item2, 2000);
+ estoreAdd(es, 0, item3, 3000);
+ assert(estoreSize(es) == 3);
+ assert(!estoreIsEmpty(es));
+
+ /* Empty the store - this should call onDeleteItem for all items */
+ estoreEmpty(es);
+ assert(estoreSize(es) == 0);
+ assert(estoreIsEmpty(es));
+
+ /* Verify buckets are empty */
+ assert(estoreGetFirstNonEmptyBucket(es) == 0); /* Returns 0 when empty */
+ assert(estoreGetNextNonEmptyBucket(es, 0) == -1);
+
+ estoreRelease(es);
+ }
+
+ TEST("Active expiration") {
+ estore *es = estoreCreate(&testEbucketsType, 14); /* 2^14 buckets */
+
+ /* Create items with different expiration times */
+ TestItem *expiredItem1 = createTestItem(1);
+ TestItem *expiredItem2 = createTestItem(2);
+ TestItem *expiredItem3 = createTestItem(3);
+ TestItem *futureItem = createTestItem(4);
+
+ estoreAdd(es, 0, expiredItem1, 1023);
+ estoreAdd(es, 0, expiredItem2, 2047);
+ estoreAdd(es, 1, expiredItem3, 127);
+ estoreAdd(es, 0, futureItem, 4095);
+ assert(estoreSize(es) == 4);
+
+ /* Perform active expiration */
+ ExpireInfo expireInfo = {
+ .maxToExpire = UINT64_MAX,
+ .onExpireItem = activeExpireTestCb,
+ .ctx = NULL,
+ .now = 2048, /* Current time in milliseconds */
+ .itemsExpired = 0
+ };
+
+ estoreActiveExpire(es, 0, &expireInfo);
+
+ /* The expired items should be removed, future item should remain */
+ assert(expireInfo.itemsExpired == 2);
+ assert(estoreSize(es) == 2);
+
+ estoreActiveExpire(es, 1, &expireInfo);
+ assert(expireInfo.itemsExpired == 1);
+ assert(estoreSize(es) == 1);
+
+ /* Clean up remaining item */
+ estoreRemove(es, 0, futureItem);
+ zfree(futureItem);
+ assert(estoreSize(es) == 0);
+
+ estoreRelease(es);
+ }
+
+ return 0;
+}
+#endif