diff options
| author | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:40:55 +0100 |
|---|---|---|
| committer | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:40:55 +0100 |
| commit | 5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda (patch) | |
| tree | 1acdfa5220cd13b7be43a2a01368e80d306473ca /examples/redis-unstable/tests/modules/defragtest.c | |
| parent | c7ab12bba64d9c20ccd79b132dac475f7bc3923e (diff) | |
| download | crep-5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda.tar.gz | |
Add Redis source code for testing
Diffstat (limited to 'examples/redis-unstable/tests/modules/defragtest.c')
| -rw-r--r-- | examples/redis-unstable/tests/modules/defragtest.c | 475 |
1 files changed, 475 insertions, 0 deletions
diff --git a/examples/redis-unstable/tests/modules/defragtest.c b/examples/redis-unstable/tests/modules/defragtest.c new file mode 100644 index 0000000..8b9e182 --- /dev/null +++ b/examples/redis-unstable/tests/modules/defragtest.c @@ -0,0 +1,475 @@ +/* A module that implements defrag callback mechanisms. + */ + +#include "redismodule.h" +#include <stdlib.h> +#include <string.h> + +#define UNUSED(V) ((void) V) + +static RedisModuleType *FragType; + +struct FragObject { + unsigned long len; + void **values; + int maxstep; +}; + +/* Make sure we get the expected cursor */ +unsigned long int last_set_cursor = 0; + +unsigned long int datatype_attempts = 0; +unsigned long int datatype_defragged = 0; +unsigned long int datatype_raw_defragged = 0; +unsigned long int datatype_resumes = 0; +unsigned long int datatype_wrong_cursor = 0; +unsigned long int defrag_started = 0; +unsigned long int defrag_ended = 0; +unsigned long int global_strings_attempts = 0; +unsigned long int global_strings_defragged = 0; +unsigned long int global_dicts_resumes = 0; /* Number of dict defragmentation resumed from a previous break */ +unsigned long int global_subdicts_resumes = 0; /* Number of subdict defragmentation resumed from a previous break */ +unsigned long int global_dicts_attempts = 0; /* Number of attempts to defragment dictionary */ +unsigned long int global_dicts_defragged = 0; /* Number of dictionaries successfully defragmented */ +unsigned long int global_dicts_items_defragged = 0; /* Number of dictionaries items successfully defragmented */ + +unsigned long global_strings_len = 0; +RedisModuleString **global_strings = NULL; + +unsigned long global_dicts_len = 0; +RedisModuleDict **global_dicts = NULL; + +static void createGlobalStrings(RedisModuleCtx *ctx, unsigned long count) +{ + global_strings_len = count; + global_strings = RedisModule_Alloc(sizeof(RedisModuleString *) * count); + + for (unsigned long i = 0; i < count; i++) { + global_strings[i] = RedisModule_CreateStringFromLongLong(ctx, i); + } +} + +static int defragGlobalStrings(RedisModuleDefragCtx *ctx) +{ + unsigned long cursor = 0; + RedisModule_DefragCursorGet(ctx, &cursor); + + if (!global_strings_len) return 0; /* strings is empty. */ + RedisModule_Assert(cursor < global_strings_len); + for (; cursor < global_strings_len; cursor++) { + RedisModuleString *str = global_strings[cursor]; + if (!str) continue; + RedisModuleString *new = RedisModule_DefragRedisModuleString(ctx, str); + global_strings_attempts++; + if (new != NULL) { + global_strings[cursor] = new; + global_strings_defragged++; + } + + if (RedisModule_DefragShouldStop(ctx)) { + RedisModule_DefragCursorSet(ctx, cursor); + return 1; + } + } + return 0; +} + +static void createFragGlobalStrings(RedisModuleCtx *ctx) { + for (unsigned long i = 0; i < global_strings_len; i++) { + if (i % 2 == 1) { + RedisModule_FreeString(ctx, global_strings[i]); + global_strings[i] = NULL; + } + } +} + +static void createGlobalDicts(RedisModuleCtx *ctx, unsigned long count) { + global_dicts_len = count; + global_dicts = RedisModule_Alloc(sizeof(RedisModuleDict *) * count); + + /* Create some nested dictionaries: + * - Each main dict contains some subdicts. + * - Each sub-dict contains some strings. */ + for (unsigned long i = 0; i < count; i++) { + RedisModuleDict *dict = RedisModule_CreateDict(ctx); + for (unsigned long j = 0; j < 10; j++) { + /* Create sub dict. */ + RedisModuleDict *subdict = RedisModule_CreateDict(ctx); + for (unsigned long k = 0; k < 10; k++) { + RedisModuleString *str = RedisModule_CreateStringFromULongLong(ctx, k); + RedisModule_DictSet(subdict, str, str); + } + + RedisModuleString *key = RedisModule_CreateStringFromULongLong(ctx, j); + RedisModule_DictSet(dict, key, subdict); + RedisModule_FreeString(ctx, key); + } + global_dicts[i] = dict; + } +} + +static void freeFragGlobalSubDict(RedisModuleCtx *ctx, RedisModuleDict *subdict) { + char *key; + size_t keylen; + RedisModuleString *str; + RedisModuleDictIter *iter = RedisModule_DictIteratorStartC(subdict, "^", NULL, 0); + while ((key = RedisModule_DictNextC(iter, &keylen, (void**)&str))) { + RedisModule_FreeString(ctx, str); + } + RedisModule_FreeDict(ctx, subdict); + RedisModule_DictIteratorStop(iter); +} + +static void createFragGlobalDicts(RedisModuleCtx *ctx) { + char *key; + size_t keylen; + RedisModuleDict *subdict; + + for (unsigned long i = 0; i < global_dicts_len; i++) { + RedisModuleDict *dict = global_dicts[i]; + if (!dict) continue; + + /* Handle dictionaries differently based on their index in global_dicts array: + * 1. For odd indices (i % 2 == 1): Remove the entire dictionary. + * 2. For even indices: Keep the dictionary but remove half of its items. */ + if (i % 2 == 1) { + RedisModuleDictIter *iter = RedisModule_DictIteratorStartC(dict, "^", NULL, 0); + while ((key = RedisModule_DictNextC(iter, &keylen, (void**)&subdict))) { + freeFragGlobalSubDict(ctx, subdict); + } + RedisModule_FreeDict(ctx, dict); + global_dicts[i] = NULL; + RedisModule_DictIteratorStop(iter); + } else { + int key_index = 0; + RedisModuleDictIter *iter = RedisModule_DictIteratorStartC(dict, "^", NULL, 0); + while ((key = RedisModule_DictNextC(iter, &keylen, (void**)&subdict))) { + if (key_index++ % 2 == 1) { + freeFragGlobalSubDict(ctx, subdict); + RedisModule_DictReplaceC(dict, key, keylen, NULL); + } + } + RedisModule_DictIteratorStop(iter); + } + } +} + +static int defragGlobalSubDictValueCB(RedisModuleDefragCtx *ctx, void *data, unsigned char *key, size_t keylen, void **newptr) { + REDISMODULE_NOT_USED(key); + REDISMODULE_NOT_USED(keylen); + if (!data) return 0; + *newptr = RedisModule_DefragAlloc(ctx, data); + return 0; +} + +static int defragGlobalDictValueCB(RedisModuleDefragCtx *ctx, void *data, unsigned char *key, size_t keylen, void **newptr) { + REDISMODULE_NOT_USED(key); + REDISMODULE_NOT_USED(keylen); + static RedisModuleString *seekTo = NULL; + RedisModuleDict *subdict = data; + if (!subdict) return 0; + if (seekTo != NULL) global_subdicts_resumes++; + + *newptr = RedisModule_DefragRedisModuleDict(ctx, subdict, defragGlobalSubDictValueCB, &seekTo); + if (*newptr) global_dicts_items_defragged++; + /* Return 1 if seekTo is not NULL, indicating this node needs more defrag work. */ + return seekTo != NULL; +} + +static int defragGlobalDicts(RedisModuleDefragCtx *ctx) { + static RedisModuleString *seekTo = NULL; + static unsigned long dict_index = 0; + unsigned long cursor = 0; + + RedisModule_DefragCursorGet(ctx, &cursor); + if (cursor == 0) { /* Start a new defrag. */ + if (seekTo) { + RedisModule_FreeString(NULL, seekTo); + seekTo = NULL; + } + dict_index = 0; + } else { + global_dicts_resumes++; + } + + if (!global_dicts_len) return 0; /* dicts is empty. */ + RedisModule_Assert(dict_index < global_dicts_len); + for (; dict_index < global_dicts_len; dict_index++) { + RedisModuleDict *dict = global_dicts[dict_index]; + if (!dict) continue; + RedisModuleDict *new = RedisModule_DefragRedisModuleDict(ctx, dict, defragGlobalDictValueCB, &seekTo); + global_dicts_attempts++; + if (new != NULL) { + global_dicts[dict_index] = new; + global_dicts_defragged++; + } + + if (seekTo != NULL) { + /* Set cursor to 1 to indicate defragmentation is not finished. */ + RedisModule_DefragCursorSet(ctx, 1); + return 1; + } + } + + /* Set cursor to 0 to indicate completion. */ + dict_index = 0; + RedisModule_DefragCursorSet(ctx, 0); + return 0; +} + +typedef enum { DEFRAG_NOT_START, DEFRAG_STRING, DEFRAG_DICT } defrag_module_stage; +static int defragGlobal(RedisModuleDefragCtx *ctx) { + static defrag_module_stage stage = DEFRAG_NOT_START; + if (stage == DEFRAG_NOT_START) { + stage = DEFRAG_STRING; /* Start a new global defrag. */ + } + + if (stage == DEFRAG_STRING) { + if (defragGlobalStrings(ctx) != 0) return 1; + stage = DEFRAG_DICT; + } + if (stage == DEFRAG_DICT) { + if (defragGlobalDicts(ctx) != 0) return 1; + stage = DEFRAG_NOT_START; + } + return 0; +} + +static void defragStart(RedisModuleDefragCtx *ctx) { + REDISMODULE_NOT_USED(ctx); + defrag_started++; +} + +static void defragEnd(RedisModuleDefragCtx *ctx) { + REDISMODULE_NOT_USED(ctx); + defrag_ended++; +} + +static void FragInfo(RedisModuleInfoCtx *ctx, int for_crash_report) { + REDISMODULE_NOT_USED(for_crash_report); + + RedisModule_InfoAddSection(ctx, "stats"); + RedisModule_InfoAddFieldLongLong(ctx, "datatype_attempts", datatype_attempts); + RedisModule_InfoAddFieldLongLong(ctx, "datatype_defragged", datatype_defragged); + RedisModule_InfoAddFieldLongLong(ctx, "datatype_raw_defragged", datatype_raw_defragged); + RedisModule_InfoAddFieldLongLong(ctx, "datatype_resumes", datatype_resumes); + RedisModule_InfoAddFieldLongLong(ctx, "datatype_wrong_cursor", datatype_wrong_cursor); + RedisModule_InfoAddFieldLongLong(ctx, "global_strings_attempts", global_strings_attempts); + RedisModule_InfoAddFieldLongLong(ctx, "global_strings_defragged", global_strings_defragged); + RedisModule_InfoAddFieldLongLong(ctx, "global_dicts_resumes", global_dicts_resumes); + RedisModule_InfoAddFieldLongLong(ctx, "global_subdicts_resumes", global_subdicts_resumes); + RedisModule_InfoAddFieldLongLong(ctx, "global_dicts_attempts", global_dicts_attempts); + RedisModule_InfoAddFieldLongLong(ctx, "global_dicts_defragged", global_dicts_defragged); + RedisModule_InfoAddFieldLongLong(ctx, "global_dicts_items_defragged", global_dicts_items_defragged); + RedisModule_InfoAddFieldLongLong(ctx, "defrag_started", defrag_started); + RedisModule_InfoAddFieldLongLong(ctx, "defrag_ended", defrag_ended); +} + +struct FragObject *createFragObject(unsigned long len, unsigned long size, int maxstep) { + struct FragObject *o = RedisModule_Alloc(sizeof(*o)); + o->len = len; + o->values = RedisModule_Alloc(sizeof(RedisModuleString*) * len); + o->maxstep = maxstep; + + for (unsigned long i = 0; i < len; i++) { + o->values[i] = RedisModule_Calloc(1, size); + } + + return o; +} + +/* FRAG.RESETSTATS */ +static int fragResetStatsCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + datatype_attempts = 0; + datatype_defragged = 0; + datatype_raw_defragged = 0; + datatype_resumes = 0; + datatype_wrong_cursor = 0; + global_strings_attempts = 0; + global_strings_defragged = 0; + global_dicts_resumes = 0; + global_subdicts_resumes = 0; + global_dicts_attempts = 0; + global_dicts_defragged = 0; + global_dicts_items_defragged = 0; + defrag_started = 0; + defrag_ended = 0; + + RedisModule_ReplyWithSimpleString(ctx, "OK"); + return REDISMODULE_OK; +} + +/* FRAG.CREATE key len size maxstep */ +static int fragCreateCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 5) + return RedisModule_WrongArity(ctx); + + RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1], + REDISMODULE_READ|REDISMODULE_WRITE); + int type = RedisModule_KeyType(key); + if (type != REDISMODULE_KEYTYPE_EMPTY) + { + return RedisModule_ReplyWithError(ctx, "ERR key exists"); + } + + long long len; + if ((RedisModule_StringToLongLong(argv[2], &len) != REDISMODULE_OK)) { + return RedisModule_ReplyWithError(ctx, "ERR invalid len"); + } + + long long size; + if ((RedisModule_StringToLongLong(argv[3], &size) != REDISMODULE_OK)) { + return RedisModule_ReplyWithError(ctx, "ERR invalid size"); + } + + long long maxstep; + if ((RedisModule_StringToLongLong(argv[4], &maxstep) != REDISMODULE_OK)) { + return RedisModule_ReplyWithError(ctx, "ERR invalid maxstep"); + } + + struct FragObject *o = createFragObject(len, size, maxstep); + RedisModule_ModuleTypeSetValue(key, FragType, o); + RedisModule_ReplyWithSimpleString(ctx, "OK"); + RedisModule_CloseKey(key); + + return REDISMODULE_OK; +} + +/* FRAG.create_frag_global len */ +static int fragCreateGlobalCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + UNUSED(argv); + if (argc != 2) + return RedisModule_WrongArity(ctx); + + long long glen; + if ((RedisModule_StringToLongLong(argv[1], &glen) != REDISMODULE_OK)) { + return RedisModule_ReplyWithError(ctx, "ERR invalid len"); + } + + createGlobalStrings(ctx, glen); + createGlobalDicts(ctx, glen); + createFragGlobalStrings(ctx); + createFragGlobalDicts(ctx); + RedisModule_ReplyWithSimpleString(ctx, "OK"); + return REDISMODULE_OK; +} + +void FragFree(void *value) { + struct FragObject *o = value; + + for (unsigned long i = 0; i < o->len; i++) + RedisModule_Free(o->values[i]); + RedisModule_Free(o->values); + RedisModule_Free(o); +} + +size_t FragFreeEffort(RedisModuleString *key, const void *value) { + REDISMODULE_NOT_USED(key); + + const struct FragObject *o = value; + return o->len; +} + +int FragDefrag(RedisModuleDefragCtx *ctx, RedisModuleString *key, void **value) { + unsigned long i = 0; + int steps = 0; + + int dbid = RedisModule_GetDbIdFromDefragCtx(ctx); + RedisModule_Assert(dbid != -1); + + RedisModule_Log(NULL, "notice", "Defrag key: %s", RedisModule_StringPtrLen(key, NULL)); + + /* Attempt to get cursor, validate it's what we're exepcting */ + if (RedisModule_DefragCursorGet(ctx, &i) == REDISMODULE_OK) { + if (i > 0) datatype_resumes++; + + /* Validate we're expecting this cursor */ + if (i != last_set_cursor) datatype_wrong_cursor++; + } else { + if (last_set_cursor != 0) datatype_wrong_cursor++; + } + + /* Attempt to defrag the object itself */ + datatype_attempts++; + struct FragObject *o = RedisModule_DefragAlloc(ctx, *value); + if (o == NULL) { + /* Not defragged */ + o = *value; + } else { + /* Defragged */ + *value = o; + datatype_defragged++; + } + + /* Deep defrag now */ + for (; i < o->len; i++) { + datatype_attempts++; + void *new = RedisModule_DefragAlloc(ctx, o->values[i]); + if (new) { + o->values[i] = new; + datatype_defragged++; + } + + if ((o->maxstep && ++steps > o->maxstep) || + ((i % 64 == 0) && RedisModule_DefragShouldStop(ctx))) + { + RedisModule_DefragCursorSet(ctx, i); + last_set_cursor = i; + return 1; + } + } + + /* Defrag the values array itself using RedisModule_DefragAllocRaw + * and RedisModule_DefragFreeRaw for testing purposes. */ + void *new_values = RedisModule_DefragAllocRaw(ctx, o->len * sizeof(void*)); + memcpy(new_values, o->values, o->len * sizeof(void*)); + RedisModule_DefragFreeRaw(ctx, o->values); + o->values = new_values; + datatype_raw_defragged++; + + last_set_cursor = 0; + return 0; +} + +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + if (RedisModule_Init(ctx, "defragtest", 1, REDISMODULE_APIVER_1) + == REDISMODULE_ERR) return REDISMODULE_ERR; + + if (RedisModule_GetTypeMethodVersion() < REDISMODULE_TYPE_METHOD_VERSION) { + return REDISMODULE_ERR; + } + + RedisModuleTypeMethods tm = { + .version = REDISMODULE_TYPE_METHOD_VERSION, + .free = FragFree, + .free_effort = FragFreeEffort, + .defrag = FragDefrag + }; + + FragType = RedisModule_CreateDataType(ctx, "frag_type", 0, &tm); + if (FragType == NULL) return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "frag.create", + fragCreateCommand, "write deny-oom", 1, 1, 1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "frag.create_frag_global", + fragCreateGlobalCommand, "write deny-oom", 1, 1, 1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "frag.resetstats", + fragResetStatsCommand, "write deny-oom", 1, 1, 1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + RedisModule_RegisterInfoFunc(ctx, FragInfo); + RedisModule_RegisterDefragFunc2(ctx, defragGlobal); + RedisModule_RegisterDefragCallbacks(ctx, defragStart, defragEnd); + + return REDISMODULE_OK; +} |
