| author | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:40:55 +0100 |
|---|---|---|
| committer | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:40:55 +0100 |
| commit | 5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda (patch) | |
| tree | 1acdfa5220cd13b7be43a2a01368e80d306473ca /examples/redis-unstable/src/defrag.c | |
| parent | c7ab12bba64d9c20ccd79b132dac475f7bc3923e (diff) | |
| download | crep-5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda.tar.gz | |
Add Redis source code for testing
Diffstat (limited to 'examples/redis-unstable/src/defrag.c')
| -rw-r--r-- | examples/redis-unstable/src/defrag.c | 1985 |
1 file changed, 1985 insertions, 0 deletions
diff --git a/examples/redis-unstable/src/defrag.c b/examples/redis-unstable/src/defrag.c
new file mode 100644
index 0000000..f0bff15
--- /dev/null
+++ b/examples/redis-unstable/src/defrag.c
@@ -0,0 +1,1985 @@
+/*
+ * Active memory defragmentation
+ * Try to find key / value allocations that need to be re-allocated in order
+ * to reduce external fragmentation.
+ * We do that by scanning the keyspace and for each pointer we have, we can try to
+ * ask the allocator if moving it to a new address will help reduce fragmentation.
+ *
+ * Copyright (c) 2020-Present, Redis Ltd.
+ * All rights reserved.
+ *
+ * Copyright (c) 2024-present, Valkey contributors.
+ * All rights reserved.
+ *
+ * Licensed under your choice of (a) the Redis Source Available License 2.0
+ * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
+ * GNU Affero General Public License v3 (AGPLv3).
+ *
+ * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
+ */
+
+#include "server.h"
+#include <stddef.h>
+#include <math.h>
+
+#ifdef HAVE_DEFRAG
+
+#define DEFRAG_CYCLE_US 500 /* Standard duration of defrag cycle (in microseconds) */
+
+typedef enum { DEFRAG_NOT_DONE = 0,
+               DEFRAG_DONE = 1 } doneStatus;
+
+/*
+ * Defragmentation is performed in stages. Each stage is serviced by a stage function
+ * (defragStageFn). The stage function is passed a context (void*) to defrag. The contents of that
+ * context are unique to the particular stage - and may even be NULL for some stage functions. The
+ * same stage function can be used multiple times (for different stages) each having a different
+ * context.
+ *
+ * Parameters:
+ *  endtime - This is the monotonic time that the function should end and return. This ensures
+ *            a bounded latency due to defrag.
+ *  ctx     - A pointer to context which is unique to the stage function.
+ *
+ * Returns:
+ *  - DEFRAG_DONE if the stage is complete
+ *  - DEFRAG_NOT_DONE if there is more work to do
+ */
+typedef doneStatus (*defragStageFn)(void *ctx, monotime endtime);
+
+/* Function pointer type for freeing context in defragmentation stages. */
+typedef void (*defragStageContextFreeFn)(void *ctx);
+typedef struct {
+    defragStageFn stage_fn;               /* The function to be invoked for the stage */
+    defragStageContextFreeFn ctx_free_fn; /* Function to free the context */
+    void *ctx;                            /* Context, unique to the stage function */
+} StageDescriptor;
+
+/* Globals needed for the main defrag processing logic.
+ * Doesn't include variables specific to a stage or type of data. */
+struct DefragContext {
+    monotime start_cycle;          /* Time of beginning of defrag cycle */
+    long long start_defrag_hits;   /* server.stat_active_defrag_hits captured at beginning of cycle */
+    long long start_defrag_misses; /* server.stat_active_defrag_misses captured at beginning of cycle */
+    float start_frag_pct;          /* Fragmentation percent at the beginning of the defrag cycle */
+    float decay_rate;              /* Defrag speed decay rate */
+
+    list *remaining_stages;        /* List of stages which remain to be processed */
+    listNode *current_stage;       /* The list node of the stage that's currently being processed */
+
+    long long timeproc_id;         /* Eventloop ID of the timerproc (or AE_DELETED_EVENT_ID) */
+    monotime timeproc_end_time;    /* Ending time of previous timerproc execution */
+    long timeproc_overage_us;      /* A correction value if over target CPU percent */
+};
+static struct DefragContext defrag = {0, 0, 0, 0, 1.0f};
+
+#define ITER_SLOT_DEFRAG_LUT (-2)
+#define ITER_SLOT_UNASSIGNED (-1)
+
+/* There are a number of stages which process a kvstore. To simplify this, a stage helper function
+ * `defragStageKvstoreHelper()` is defined. This function aids in iterating over the kvstore. It
+ * uses these definitions. */
+/* State of the kvstore helper. The context passed to the kvstore helper MUST BEGIN
+ * with a kvstoreIterState (or be passed as NULL). */
+typedef struct {
+    kvstore *kvs;
+    int slot; /* See the ITER_SLOT_XXX defines for special values. */
+    unsigned long cursor;
+} kvstoreIterState;
+#define INIT_KVSTORE_STATE(kvs) ((kvstoreIterState){(kvs), ITER_SLOT_DEFRAG_LUT, 0})
+
+/* The kvstore helper uses this function to perform tasks before continuing the iteration. For the
+ * main dictionary, large items are set aside and processed by this function before continuing with
+ * iteration over the kvstore.
+ *  endtime - This is the monotonic time that the function should end and return.
+ *  ctx     - Context for functions invoked by the helper. If provided in the call to
+ *            `defragStageKvstoreHelper()`, the `kvstoreIterState` portion (at the beginning)
+ *            will be updated with the current kvstore iteration status.
+ *
+ * Returns:
+ *  - DEFRAG_DONE if the pre-continue work is complete
+ *  - DEFRAG_NOT_DONE if there is more work to do
+ */
+typedef doneStatus (*kvstoreHelperPreContinueFn)(void *ctx, monotime endtime);
+
+typedef struct {
+    kvstoreIterState kvstate;
+    int dbid;
+
+    /* When scanning a main kvstore, large elements are queued for later handling rather than
+     * causing a large latency spike while processing a hash table bucket. This list is only used
+     * for stage: "defragStageDbKeys". It will only contain values for the current kvstore being
+     * defragged.
+     * Note that this is a list of key names. It's possible that the key may be deleted or modified
+     * before "later" and we will search by key name to find the entry when we defrag the item later. */
+    list *defrag_later;
+    unsigned long defrag_later_cursor;
+} defragKeysCtx;
+static_assert(offsetof(defragKeysCtx, kvstate) == 0, "defragStageKvstoreHelper requires this");
+
+/* Context for subexpires */
+typedef struct {
+    estore *subexpires;
+    int slot; /* See the ITER_SLOT_XXX defines for special values. */
+    int dbid;
+    unsigned long cursor;
+} defragSubexpiresCtx;
+
+/* Context for pubsub kvstores */
+typedef dict *(*getClientChannelsFn)(client *);
+typedef struct {
+    kvstoreIterState kvstate;
+    getClientChannelsFn getPubSubChannels;
+} defragPubSubCtx;
+static_assert(offsetof(defragPubSubCtx, kvstate) == 0, "defragStageKvstoreHelper requires this");
+
+typedef struct {
+    sds module_name;
+    unsigned long cursor;
+} defragModuleCtx;
+
+/* this method was added to jemalloc in order to help us understand which
+ * pointers are worthwhile moving and which aren't */
+int je_get_defrag_hint(void* ptr);
+
+#if !defined(DEBUG_DEFRAG_FORCE)
+/* Defrag helper for generic allocations without freeing the old pointer.
+ *
+ * Note: The caller is responsible for freeing the old pointer if this function
+ * returns a non-NULL value. */
+void* activeDefragAllocWithoutFree(void *ptr) {
+    size_t size;
+    void *newptr;
+    if (!je_get_defrag_hint(ptr)) {
+        server.stat_active_defrag_misses++;
+        return NULL;
+    }
+    /* move this allocation to a new allocation.
+     * make sure not to use the thread cache, so that we don't get back the same
+     * pointers we try to free */
+    size = zmalloc_usable_size(ptr);
+    newptr = zmalloc_no_tcache(size);
+    memcpy(newptr, ptr, size);
+    server.stat_active_defrag_hits++;
+    return newptr;
+}
+
+void activeDefragFree(void *ptr) {
+    zfree_no_tcache(ptr);
+}
+
+/* Defrag helper for generic allocations.
+ *
+ * returns NULL in case the allocation wasn't moved.
+ * when it returns a non-null value, the old pointer was already released
+ * and should NOT be accessed. */
+void* activeDefragAlloc(void *ptr) {
+    void *newptr = activeDefragAllocWithoutFree(ptr);
+    if (newptr)
+        activeDefragFree(ptr);
+    return newptr;
+}
+
+/* Raw memory allocation for defrag, avoid using tcache. */
+void *activeDefragAllocRaw(size_t size) {
+    return zmalloc_no_tcache(size);
+}
+
+/* Raw memory free for defrag, avoid using tcache. */
+void activeDefragFreeRaw(void *ptr) {
+    activeDefragFree(ptr);
+    server.stat_active_defrag_hits++;
+}
+#else
+void *activeDefragAllocWithoutFree(void *ptr) {
+    size_t size;
+    void *newptr;
+    size = zmalloc_usable_size(ptr);
+    newptr = zmalloc(size);
+    memcpy(newptr, ptr, size);
+    server.stat_active_defrag_hits++;
+    return newptr;
+}
+
+void activeDefragFree(void *ptr) {
+    zfree(ptr);
+}
+
+void *activeDefragAlloc(void *ptr) {
+    void *newptr = activeDefragAllocWithoutFree(ptr);
+    if (newptr)
+        activeDefragFree(ptr);
+    return newptr;
+}
+
+void *activeDefragAllocRaw(size_t size) {
+    return zmalloc(size);
+}
+
+void activeDefragFreeRaw(void *ptr) {
+    zfree(ptr);
+    server.stat_active_defrag_hits++;
+}
+#endif
+
+/* Defrag helper for sds strings
+ *
+ * returns NULL in case the allocation wasn't moved.
+ * when it returns a non-null value, the old pointer was already released
+ * and should NOT be accessed. */
+sds activeDefragSds(sds sdsptr) {
+    void* ptr = sdsAllocPtr(sdsptr);
+    void* newptr = activeDefragAlloc(ptr);
+    if (newptr) {
+        size_t offset = sdsptr - (char*)ptr;
+        sdsptr = (char*)newptr + offset;
+        return sdsptr;
+    }
+    return NULL;
+}
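+
+/* Worked example of the offset math above, with purely hypothetical addresses:
+ * if the sds allocation starts at 0x1000 and sdsptr points at 0x1008 (8 bytes
+ * past the sds header), then after the allocation moves to 0x2000 the computed
+ * offset is 8 and the rebased sdsptr is 0x2008. */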
+
+/* Defrag helper for hfield (entry) strings
+ *
+ * returns NULL in case the allocation wasn't moved.
+ * when it returns a non-null value, the old pointer was already released
+ * and should NOT be accessed. */
+Entry *activeDefragEntry(Entry *entry) {
+    Entry *ret = NULL;
+
+    /* First, defrag the entry allocation itself */
+    void *ptr = entryGetAllocPtr(entry);
+    void *newptr = activeDefragAlloc(ptr);
+    if (newptr) {
+        size_t offset = (char*)entry - (char*)ptr;
+        entry = (Entry *)((char*)newptr + offset);
+        ret = entry;
+    }
+
+    /* Then defrag the value if it's not embedded (using the potentially new entry) */
+    sds *valuePtr = entryGetValuePtrRef(entry);
+    if (valuePtr) {
+        sds new_value = activeDefragSds(*valuePtr);
+        if (new_value) *valuePtr = new_value;
+    }
+
+    return ret;
+}
+
+/* Defrag helper for hfield strings and update the reference in the dict.
+ *
+ * returns NULL in case the allocation wasn't moved.
+ * when it returns a non-null value, the old pointer was already released
+ * and should NOT be accessed. */
+void *activeDefragHfieldAndUpdateRef(void *ptr, void *privdata) {
+    dict *d = privdata;
+    dictEntryLink link;
+
+    /* Before the key is released, obtain the link to
+     * ensure we can safely access and update the key. */
+    link = dictFindLink(d, ptr, NULL);
+    serverAssert(link);
+
+    Entry *newEntry = activeDefragEntry(ptr);
+    if (newEntry)
+        dictSetKeyAtLink(d, newEntry, &link, 0);
+    return newEntry;
+}
+
+/* Defrag helper for robj and/or string objects with expected refcount.
+ *
+ * Like activeDefragStringOb, but it requires the caller to pass in the expected
+ * reference count. In some cases, the caller needs to update a robj whose
+ * reference count is not 1; in these cases, the caller must explicitly pass
+ * in the reference count, otherwise defragmentation will not be performed.
+ * Note that the caller is responsible for updating any other references to the robj. */
+robj *activeDefragStringObEx(robj* ob, int expected_refcount) {
+    robj *ret = NULL;
+    if (ob->refcount!=expected_refcount)
+        return NULL;
+
+    /* try to defrag robj (only if not an EMBSTR type, which is handled below). */
+    if (ob->type!=OBJ_STRING || ob->encoding!=OBJ_ENCODING_EMBSTR) {
+        if ((ret = activeDefragAlloc(ob))) {
+            ob = ret;
+        }
+    }
+
+    /* try to defrag string object */
+    if (ob->type == OBJ_STRING) {
+        if (ob->encoding==OBJ_ENCODING_RAW) {
+            sds newsds = activeDefragSds((sds)ob->ptr);
+            if (newsds) {
+                ob->ptr = newsds;
+            }
+        } else if (ob->encoding==OBJ_ENCODING_EMBSTR) {
+            /* The sds is embedded in the object allocation, calculate the
+             * offset and update the pointer in the new allocation. */
+            long ofs = (intptr_t)ob->ptr - (intptr_t)ob;
+            if ((ret = activeDefragAlloc(ob))) {
+                ret->ptr = (void*)((intptr_t)ret + ofs);
+            }
+        } else if (ob->encoding!=OBJ_ENCODING_INT) {
+            serverPanic("Unknown string encoding");
+        }
+    }
+    return ret;
+}
+
+/* Defrag helper for robj and/or string objects
+ *
+ * returns NULL in case the allocation wasn't moved.
+ * when it returns a non-null value, the old pointer was already released
+ * and should NOT be accessed. */
+robj *activeDefragStringOb(robj* ob) {
+    return activeDefragStringObEx(ob, 1);
+}
+
+/* Defrag helper for lua scripts
+ *
+ * returns NULL in case the allocation wasn't moved.
+ * when it returns a non-null value, the old pointer was already released
+ * and should NOT be accessed. */
+luaScript *activeDefragLuaScript(luaScript *script) {
+    luaScript *ret = NULL;
+
+    /* try to defrag script struct */
+    if ((ret = activeDefragAlloc(script))) {
+        script = ret;
+    }
+
+    /* try to defrag actual script object */
+    robj *ob = activeDefragStringOb(script->body);
+    if (ob) script->body = ob;
+
+    return ret;
+}
+
+/* Defrag helper for dict main allocations (dict struct, and hash tables).
+ * Receives a pointer to the dict* and returns a new dict* when the dict
+ * struct itself was moved.
+ *
+ * Returns NULL in case the allocation wasn't moved.
+ * When it returns a non-null value, the old pointer was already released
+ * and should NOT be accessed. */
+dict *dictDefragTables(dict *d) {
+    dict *ret = NULL;
+    dictEntry **newtable;
+    /* handle the dict struct */
+    if ((ret = activeDefragAlloc(d)))
+        d = ret;
+    /* handle the first hash table */
+    if (!d->ht_table[0]) return ret; /* created but unused */
+    newtable = activeDefragAlloc(d->ht_table[0]);
+    if (newtable)
+        d->ht_table[0] = newtable;
+    /* handle the second hash table */
+    if (d->ht_table[1]) {
+        newtable = activeDefragAlloc(d->ht_table[1]);
+        if (newtable)
+            d->ht_table[1] = newtable;
+    }
+    return ret;
+}
+
+/* Internal function used by activeDefragZsetNode */
+void zslUpdateNode(zskiplist *zsl, zskiplistNode *oldnode, zskiplistNode *newnode, zskiplistNode **update) {
+    int i;
+    for (i = 0; i < zsl->level; i++) {
+        if (update[i]->level[i].forward == oldnode)
+            update[i]->level[i].forward = newnode;
+    }
+    serverAssert(zsl->header!=oldnode);
+    if (newnode->level[0].forward) {
+        serverAssert(newnode->level[0].forward->backward==oldnode);
+        newnode->level[0].forward->backward = newnode;
+    } else {
+        serverAssert(zsl->tail==oldnode);
+        zsl->tail = newnode;
+    }
+}
+
+/* Defrag a single zset node, update dictEntry and skiplist struct */
+void activeDefragZsetNode(zset *zs, dictEntry *de, dictEntryLink plink) {
+    zskiplistNode *znode = dictGetKey(de);
+
+    /* Try to defrag the skiplist node first */
+    zskiplistNode *newnode = activeDefragAllocWithoutFree(znode);
+    if (!newnode) return; /* No defrag needed */
+
+    /* Node was defragged, now we need to update all skiplist pointers */
+    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *iter;
+    int i;
+    double score = newnode->score;
+    sds ele = zslGetNodeElement(newnode);
+
+    /* Find all pointers that need to be updated */
+    iter = zs->zsl->header;
+    for (i = zs->zsl->level-1; i >= 0; i--) {
+        while (iter->level[i].forward &&
+               iter->level[i].forward != znode &&
+               zslCompareWithNode(score, ele, iter->level[i].forward) > 0)
+            iter = iter->level[i].forward;
+        update[i] = iter;
+    }
+
+    /* Verify we found the right node */
+    iter = iter->level[0].forward;
+    serverAssert(iter && iter == znode);
+
+    /* Update all skiplist pointers and dict key */
+    zslUpdateNode(zs->zsl, znode, newnode, update);
+    dictSetKeyAtLink(zs->dict, newnode, &plink, 0);
+
+    /* Free the old node now that all pointers have been updated */
+    activeDefragFree(znode);
+}
+
+#define DEFRAG_SDS_DICT_NO_VAL 0
+#define DEFRAG_SDS_DICT_VAL_IS_SDS 1
+#define DEFRAG_SDS_DICT_VAL_IS_STROB 2
+#define DEFRAG_SDS_DICT_VAL_VOID_PTR 3
+#define DEFRAG_SDS_DICT_VAL_LUA_SCRIPT 4
+
+void activeDefragSdsDictCallback(void *privdata, const dictEntry *de, dictEntryLink plink) {
+    UNUSED(plink);
+    UNUSED(privdata);
+    UNUSED(de);
+}
+
+void activeDefragLuaScriptDictCallback(void *privdata, const dictEntry *de, dictEntryLink plink) {
+    UNUSED(plink);
+    UNUSED(privdata);
+
+    /* If this luaScript is in the LRU list, unconditionally update the node's
+     * value pointer to the current dict key (regardless of reallocation). */
+    luaScript *script = dictGetVal(de);
+    if (script->node)
+        script->node->value = dictGetKey(de);
+}
+
+void activeDefragHfieldDictCallback(void *privdata, const dictEntry *de, dictEntryLink plink) {
+    dict *d = privdata;
+    Entry *newEntry = NULL, *entry = dictGetKey(de);
+
+    /* If the hfield does not have TTL, we directly defrag it.
+     * Fields with TTL are skipped here and will be defragmented later
+     * during the hash expiry ebuckets defragmentation phase. */
+    if (entryGetExpiry(entry) == EB_EXPIRE_TIME_INVALID) {
+        if ((newEntry = activeDefragEntry(entry))) {
+            /* Hash dicts use no_value=1, so we must use dictSetKeyAtLink */
+            dictSetKeyAtLink(d, newEntry, &plink, 0);
+        }
+    }
+}
+
+/* Defrag a dict with sds key and optional value (either ptr, sds or robj string) */
+void activeDefragSdsDict(dict* d, int val_type) {
+    unsigned long cursor = 0;
+    dictDefragFunctions defragfns = {
+        .defragAlloc = activeDefragAlloc,
+        .defragKey = (dictDefragAllocFunction *)activeDefragSds,
+        .defragVal = (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS ? (dictDefragAllocFunction *)activeDefragSds :
+                      val_type == DEFRAG_SDS_DICT_VAL_IS_STROB ? (dictDefragAllocFunction *)activeDefragStringOb :
+                      val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR ? (dictDefragAllocFunction *)activeDefragAlloc :
+                      val_type == DEFRAG_SDS_DICT_VAL_LUA_SCRIPT ? (dictDefragAllocFunction *)activeDefragLuaScript :
+                      NULL)
+    };
+    dictScanFunction *fn = (val_type == DEFRAG_SDS_DICT_VAL_LUA_SCRIPT ?
+                            activeDefragLuaScriptDictCallback : activeDefragSdsDictCallback);
+    do {
+        cursor = dictScanDefrag(d, cursor, fn,
+                                &defragfns, NULL);
+    } while (cursor != 0);
+}
+
+/* Defrag a dict with hfield key (no separate value - value is part of entry). */
+void activeDefragHfieldDict(dict *d) {
+    unsigned long cursor = 0;
+    dictDefragFunctions defragfns = {
+        .defragAlloc = activeDefragAlloc, /* Only defrag dictEntry */
+        .defragKey = NULL,                /* Will be defragmented in activeDefragHfieldDictCallback. */
+        .defragVal = NULL                 /* No separate value - value is part of the entry (hfield). */
+    };
+    do {
+        cursor = dictScanDefrag(d, cursor, activeDefragHfieldDictCallback,
+                                &defragfns, d);
+    } while (cursor != 0);
+
+    /* Continue with defragmentation of hash fields that have TTL.
+     * During the dictionary defragmentation above, we skipped fields with TTL.
+     * Now we continue to defrag those fields by using the expiry buckets. */
+    if (d->type == &entryHashDictTypeWithHFE) {
+        cursor = 0;
+        ebDefragFunctions eb_defragfns = {
+            .defragAlloc = activeDefragAlloc,
+            .defragItem = activeDefragHfieldAndUpdateRef
+        };
+        ebuckets *eb = hashTypeGetDictMetaHFE(d);
+        while (ebScanDefrag(eb, &hashFieldExpireBucketsType, &cursor, &eb_defragfns, d)) {}
+    }
+}
+
+/* Defrag a quicklist node and its listpack entry, fixing the list head/tail
+ * and neighbor pointers if the node moves. */
+void activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) {
+    quicklistNode *newnode, *node = *node_ref;
+    unsigned char *newzl;
+    if ((newnode = activeDefragAlloc(node))) {
+        if (newnode->prev)
+            newnode->prev->next = newnode;
+        else
+            ql->head = newnode;
+        if (newnode->next)
+            newnode->next->prev = newnode;
+        else
+            ql->tail = newnode;
+        *node_ref = node = newnode;
+    }
+    if ((newzl = activeDefragAlloc(node->entry)))
+        node->entry = newzl;
+}
+
+void activeDefragQuickListNodes(quicklist *ql) {
+    quicklistNode *node = ql->head;
+    while (node) {
+        activeDefragQuickListNode(ql, &node);
+        node = node->next;
+    }
+}
+
+/* when the value has lots of elements, we want to handle it later and not as
+ * part of the main dictionary scan. this is needed in order to prevent latency
+ * spikes when handling large items */
+void defragLater(defragKeysCtx *ctx, kvobj *kv) {
+    if (!ctx->defrag_later) {
+        ctx->defrag_later = listCreate();
+        listSetFreeMethod(ctx->defrag_later, sdsfreegeneric);
+        ctx->defrag_later_cursor = 0;
+    }
+    sds key = sdsdup(kvobjGetKey(kv));
+    listAddNodeTail(ctx->defrag_later, key);
+}
+
+/* returns 0 if no more work needs to be done, and 1 if time is up and more work is needed. */
+long scanLaterList(robj *ob, unsigned long *cursor, monotime endtime) {
+    quicklist *ql = ob->ptr;
+    quicklistNode *node;
+    long iterations = 0;
+    int bookmark_failed = 0;
+    serverAssert(ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST);
+
+    if (*cursor == 0) {
+        /* if cursor is 0, we start new iteration */
+        node = ql->head;
+    } else {
+        node = quicklistBookmarkFind(ql, "_AD");
+        if (!node) {
+            /* if the bookmark was deleted, it means we reached the end. */
+            *cursor = 0;
+            return 0;
+        }
+        node = node->next;
+    }
+
+    (*cursor)++;
+    while (node) {
+        activeDefragQuickListNode(ql, &node);
+        server.stat_active_defrag_scanned++;
+        if (++iterations > 128 && !bookmark_failed) {
+            if (getMonotonicUs() > endtime) {
+                if (!quicklistBookmarkCreate(&ql, "_AD", node)) {
+                    bookmark_failed = 1;
+                } else {
+                    ob->ptr = ql; /* bookmark creation may have re-allocated the quicklist */
+                    return 1;
+                }
+            }
+            iterations = 0;
+        }
+        node = node->next;
+    }
+    quicklistBookmarkDelete(ql, "_AD");
+    *cursor = 0;
+    return bookmark_failed? 1: 0;
+}
+
+typedef struct {
+    zset *zs;
+} scanLaterZsetData;
+
+void scanZsetCallback(void *privdata, const dictEntry *_de, dictEntryLink plink) {
+    dictEntry *de = (dictEntry*)_de;
+    scanLaterZsetData *data = privdata;
+    activeDefragZsetNode(data->zs, de, plink);
+    server.stat_active_defrag_scanned++;
+}
+
+void scanLaterZset(robj *ob, unsigned long *cursor) {
+    serverAssert(ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST);
+    zset *zs = (zset*)ob->ptr;
+    dict *d = zs->dict;
+    scanLaterZsetData data = {zs};
+    dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc};
+    *cursor = dictScanDefrag(d, *cursor, scanZsetCallback, &defragfns, &data);
+}
+
+/* Used as scan callback when all the work is done in the dictDefragFunctions. */
+void scanCallbackCountScanned(void *privdata, const dictEntry *de, dictEntryLink plink) {
+    UNUSED(plink);
+    UNUSED(privdata);
+    UNUSED(de);
+    server.stat_active_defrag_scanned++;
+}
+
+void scanLaterSet(robj *ob, unsigned long *cursor) {
+    serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT);
+    dict *d = ob->ptr;
+    dictDefragFunctions defragfns = {
+        .defragAlloc = activeDefragAlloc,
+        .defragKey = (dictDefragAllocFunction *)activeDefragSds
+    };
+    *cursor = dictScanDefrag(d, *cursor, scanCallbackCountScanned, &defragfns, NULL);
+}
+
+void scanLaterHash(robj *ob, unsigned long *cursor) {
+    serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT);
+    dict *d = ob->ptr;
+
+    typedef enum {
+        HASH_DEFRAG_NONE = 0,
+        HASH_DEFRAG_DICT = 1,
+        HASH_DEFRAG_EBUCKETS = 2
+    } hashDefragPhase;
+    static hashDefragPhase defrag_phase = HASH_DEFRAG_NONE;
+
+    /* Start a new hash defrag. */
+    if (!*cursor || defrag_phase == HASH_DEFRAG_NONE)
+        defrag_phase = HASH_DEFRAG_DICT;
+
+    /* Defrag hash dictionary but skip TTL fields. */
+    if (defrag_phase == HASH_DEFRAG_DICT) {
+        dictDefragFunctions defragfns = {
+            .defragAlloc = activeDefragAlloc,
+            .defragKey = NULL, /* Will be defragmented in activeDefragHfieldDictCallback. */
+            .defragVal = NULL  /* value stored along with key as part of Entry */
+        };
+        *cursor = dictScanDefrag(d, *cursor, activeDefragHfieldDictCallback, &defragfns, d);
+
+        /* Move to next phase. */
+        if (!*cursor) defrag_phase = HASH_DEFRAG_EBUCKETS;
+    }
+
+    /* Defrag ebuckets and TTL fields. */
+    if (defrag_phase == HASH_DEFRAG_EBUCKETS) {
+        if (d->type == &entryHashDictTypeWithHFE) {
+            ebDefragFunctions eb_defragfns = {
+                .defragAlloc = activeDefragAlloc,
+                .defragItem = activeDefragHfieldAndUpdateRef
+            };
+            ebuckets *eb = hashTypeGetDictMetaHFE(d);
+            ebScanDefrag(eb, &hashFieldExpireBucketsType, cursor, &eb_defragfns, d);
+        } else {
+            /* Finish defragmentation if this dict doesn't have expired fields. */
+            *cursor = 0;
+        }
+        if (!*cursor) defrag_phase = HASH_DEFRAG_NONE;
+    }
+}
+
+void defragQuicklist(defragKeysCtx *ctx, kvobj *kv) {
+    quicklist *ql = kv->ptr, *newql;
+    serverAssert(kv->type == OBJ_LIST && kv->encoding == OBJ_ENCODING_QUICKLIST);
+    if ((newql = activeDefragAlloc(ql)))
+        kv->ptr = ql = newql;
+    if (ql->len > server.active_defrag_max_scan_fields)
+        defragLater(ctx, kv);
+    else
+        activeDefragQuickListNodes(ql);
+}
+
+void defragZsetSkiplist(defragKeysCtx *ctx, kvobj *ob) {
+    zset *zs = (zset*)ob->ptr;
+    zset *newzs;
+    zskiplist *newzsl;
+    dict *newdict;
+    struct zskiplistNode *newheader;
+    serverAssert(ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST);
+    if ((newzs = activeDefragAlloc(zs)))
+        ob->ptr = zs = newzs;
+    if ((newzsl = activeDefragAlloc(zs->zsl)))
+        zs->zsl = newzsl;
+    if ((newheader = activeDefragAlloc(zs->zsl->header)))
+        zs->zsl->header = newheader;
+    if (dictSize(zs->dict) > server.active_defrag_max_scan_fields)
+        defragLater(ctx, ob);
+    else {
+        /* Use dictScanDefrag to iterate and defrag both dictEntry structures and skiplist nodes.
+         * dictScanDefrag handles defragging dictEntry/dictEntryNoValue structures via defragfns,
+         * and calls our callback with plink for each entry so we can defrag skiplist nodes. */
+        scanLaterZsetData data = {zs};
+        dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc};
+        unsigned long cursor = 0;
+        do {
+            cursor = dictScanDefrag(zs->dict, cursor, scanZsetCallback, &defragfns, &data);
+        } while (cursor != 0);
+    }
+    /* defrag the dict struct and tables */
+    if ((newdict = dictDefragTables(zs->dict)))
+        zs->dict = newdict;
+}
+
+void defragHash(defragKeysCtx *ctx, kvobj *ob) {
+    dict *d, *newd;
+    serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT);
+    d = ob->ptr;
+    if (dictSize(d) > server.active_defrag_max_scan_fields)
+        defragLater(ctx, ob);
+    else
+        activeDefragHfieldDict(d);
+    /* defrag the dict struct and tables */
+    if ((newd = dictDefragTables(ob->ptr)))
+        ob->ptr = newd;
+}
+
+void defragSet(defragKeysCtx *ctx, kvobj *ob) {
+    dict *d, *newd;
+    serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT);
+    d = ob->ptr;
+    if (dictSize(d) > server.active_defrag_max_scan_fields)
+        defragLater(ctx, ob);
+    else
+        activeDefragSdsDict(d, DEFRAG_SDS_DICT_NO_VAL);
+    /* defrag the dict struct and tables */
+    if ((newd = dictDefragTables(ob->ptr)))
+        ob->ptr = newd;
+}
+
+/* Defrag callback for radix tree iterator, called for each node,
+ * used in order to defrag the nodes allocations. */
+int defragRaxNode(raxNode **noderef, void *privdata) {
+    UNUSED(privdata);
+    raxNode *newnode = activeDefragAlloc(*noderef);
+    if (newnode) {
+        *noderef = newnode;
+        return 1;
+    }
+    return 0;
+}
+
+/* returns 0 if no more work needs to be done, and 1 if time is up and more work is needed. */
+int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, monotime endtime) {
+    static unsigned char next[sizeof(streamID)];
+    raxIterator ri;
+    long iterations = 0;
+    serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM);
+
+    stream *s = ob->ptr;
+    raxStart(&ri,s->rax);
+    if (*cursor == 0) {
+        /* if cursor is 0, we start new iteration */
+        defragRaxNode(&s->rax->head, NULL);
+        /* assign the iterator node callback before the seek, so that the
+         * initial nodes that are processed till the first item are covered */
+        ri.node_cb = defragRaxNode;
+        raxSeek(&ri,"^",NULL,0);
+    } else {
+        /* if cursor is non-zero, we seek to the static 'next'.
+         * Since node_cb is set after seek operation, any node traversed during seek wouldn't
+         * be defragmented. To prevent this, we advance to next node before exiting previous
+         * run, ensuring it gets defragmented instead of being skipped during current seek. */
+        if (!raxSeek(&ri,">=", next, sizeof(next))) {
+            *cursor = 0;
+            raxStop(&ri);
+            return 0;
+        }
+        /* assign the iterator node callback after the seek, so that the
+         * initial nodes that are processed till now aren't covered */
+        ri.node_cb = defragRaxNode;
+    }
+
+    (*cursor)++;
+    while (raxNext(&ri)) {
+        void *newdata = activeDefragAlloc(ri.data);
+        if (newdata)
+            raxSetData(ri.node, ri.data=newdata);
+        server.stat_active_defrag_scanned++;
+        if (++iterations > 128) {
+            if (getMonotonicUs() > endtime) {
+                /* Move to next node. */
+                if (!raxNext(&ri)) {
+                    /* If we reached the end, we can stop */
+                    *cursor = 0;
+                    raxStop(&ri);
+                    return 0;
+                }
+                serverAssert(ri.key_len==sizeof(next));
+                memcpy(next,ri.key,ri.key_len);
+                raxStop(&ri);
+                return 1;
+            }
+            iterations = 0;
+        }
+    }
+    raxStop(&ri);
+    *cursor = 0;
+    return 0;
+}
+
+/* optional callback used to defrag each rax element (not including the element pointer itself) */
+typedef void *(raxDefragFunction)(raxIterator *ri, void *privdata);
+
+/* defrag radix tree including:
+ * 1) rax struct
+ * 2) rax nodes
+ * 3) rax entry data (only if defrag_data is specified)
+ * 4) call a callback per element, and allow the callback to return a new pointer for the element */
+void defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_cb, void *element_cb_data) {
+    raxIterator ri;
+    rax* rax;
+    if ((rax = activeDefragAlloc(*raxref)))
+        *raxref = rax;
+    rax = *raxref;
+    raxStart(&ri,rax);
+    ri.node_cb = defragRaxNode;
+    defragRaxNode(&rax->head, NULL);
+    raxSeek(&ri,"^",NULL,0);
+    while (raxNext(&ri)) {
+        void *newdata = NULL;
+        if (element_cb)
+            newdata = element_cb(&ri, element_cb_data);
+        if (defrag_data && !newdata)
+            newdata = activeDefragAlloc(ri.data);
+        if (newdata)
+            raxSetData(ri.node, ri.data=newdata);
+    }
+    raxStop(&ri);
+}
+
+typedef struct {
+    streamCG *cg;
+    streamConsumer *c;
+} PendingEntryContext;
+
+void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata) {
+    PendingEntryContext *ctx = privdata;
+    streamNACK *nack = ri->data, *newnack;
+    nack->consumer = ctx->c; /* update nack pointer to consumer */
+    nack->cgroup_ref_node->value = ctx->cg; /* Update the value of cgroups_ref node to the consumer group. */
+    newnack = activeDefragAlloc(nack);
+    if (newnack) {
+        /* Update consumer group pointer to the nack.
+         * pel_by_time doesn't need updating since delivery time is unchanged. */
+        void *prev;
+        raxInsert(ctx->cg->pel, ri->key, ri->key_len, newnack, &prev);
+        serverAssert(prev==nack);
+    }
+    return newnack;
+}
+
+typedef struct {
+    stream *s;
+    streamCG *cg;
+} StreamConsumerContext;
+
+void* defragStreamConsumer(raxIterator *ri, void *privdata) {
+    StreamConsumerContext *ctx = privdata;
+    stream *s = ctx->s;
+    streamCG *cg = ctx->cg;
+    streamConsumer *c = ri->data;
+    void *newc = activeDefragAlloc(c);
+    if (newc) {
+        c = newc;
+    }
+    sds newsds = activeDefragSds(c->name);
+    if (newsds)
+        c->name = newsds;
+    if (c->pel) {
+        /* Update pel back-pointer to new stream */
+        c->pel->alloc_size = &s->alloc_size;
+        PendingEntryContext pel_ctx = {cg, c};
+        defragRadixTree(&c->pel, 0, defragStreamConsumerPendingEntry, &pel_ctx);
+    }
+    return newc; /* returns NULL if c was not defragged */
+}
+
+void* defragStreamConsumerGroup(raxIterator *ri, void *privdata) {
+    stream *s = privdata;
+    streamCG *newcg, *cg = ri->data;
+    if ((newcg = activeDefragAlloc(cg)))
+        cg = newcg;
+    if (cg->pel) {
+        /* Update pel back-pointer to new stream */
+        cg->pel->alloc_size = &s->alloc_size;
+        defragRadixTree(&cg->pel, 0, NULL, NULL);
+    }
+    if (cg->pel_by_time) {
+        /* Update pel_by_time back-pointer to new stream */
+        cg->pel_by_time->alloc_size = &s->alloc_size;
+        defragRadixTree(&cg->pel_by_time, 0, NULL, NULL);
+    }
+    if (cg->consumers) {
+        /* Update consumers back-pointer to new stream */
+        cg->consumers->alloc_size = &s->alloc_size;
+        StreamConsumerContext consumer_ctx = {s, cg};
+        defragRadixTree(&cg->consumers, 0, defragStreamConsumer, &consumer_ctx);
+    }
+    return cg;
+}
+
+/* Defrag a single idmpProducer's dict and linked list entries. */
+static void defragIdmpProducer(idmpProducer *producer) {
+    if (producer->idmp_dict == NULL) return;
+
+    dict *newdict = dictDefragTables(producer->idmp_dict);
+    if (newdict)
+        producer->idmp_dict = newdict;
+
+    idmpEntry *prev = NULL;
+    idmpEntry *entry = producer->idmp_head;
+    while (entry != NULL) {
+        idmpEntry *next = entry->next;
+        idmpEntry *newentry = activeDefragAllocWithoutFree(entry);
+        if (newentry) {
+            dictEntry *de = dictFind(producer->idmp_dict, entry);
+            serverAssert(de);
+            dictSetKey(producer->idmp_dict, de, newentry);
+            if (prev)
+                prev->next = newentry;
+            else
+                producer->idmp_head = newentry;
+            if (producer->idmp_tail == entry)
+                producer->idmp_tail = newentry;
+            activeDefragFree(entry);
+            entry = newentry;
+        }
+        prev = entry;
+        entry = next;
+    }
+}
+
+/* Defrag all IDMP producers and their dict/linked list entries. */
+void defragStreamIdmpProducers(stream *s) {
+    if (s->idmp_producers == NULL) return;
+
+    /* Defrag the producers rax tree itself */
+    rax *newrax = activeDefragAlloc(s->idmp_producers);
+    if (newrax)
+        s->idmp_producers = newrax;
+
+    /* Defrag the rax head node */
+    defragRaxNode(&s->idmp_producers->head, NULL);
+
+    /* Iterate through all producers and defrag each one */
+    raxIterator ri;
+    raxStart(&ri, s->idmp_producers);
+    /* Set the node callback to defrag internal rax nodes */
+    ri.node_cb = defragRaxNode;
+    raxSeek(&ri, "^", NULL, 0);
+    while (raxNext(&ri)) {
+        idmpProducer *producer = ri.data;
+        idmpProducer *newproducer = activeDefragAlloc(producer);
+        if (newproducer) {
+            raxSetData(ri.node, ri.data=newproducer);
+            producer = newproducer;
+        }
+        defragIdmpProducer(producer);
+    }
+    raxStop(&ri);
+}
+
+void defragStream(defragKeysCtx *ctx, kvobj *ob) {
+    serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM);
+    stream *s = ob->ptr, *news;
+
+    /* handle the main struct */
+    if ((news = activeDefragAlloc(s)))
+        ob->ptr = s = news;
+
+    /* Update rax back-pointer to new stream */
+    s->rax->alloc_size = &s->alloc_size;
+    if (raxSize(s->rax) > server.active_defrag_max_scan_fields) {
+        rax *newrax = activeDefragAlloc(s->rax);
+        if (newrax)
+            s->rax = newrax;
+        defragLater(ctx, ob);
+    } else
+        defragRadixTree(&s->rax, 1, NULL, NULL);
+
+    if (s->cgroups) {
+        /* Update cgroups back-pointer to new stream */
+        s->cgroups->alloc_size = &s->alloc_size;
+        defragRadixTree(&s->cgroups, 0, defragStreamConsumerGroup, s);
+    }
+
+    if (s->cgroups_ref) {
+        /* Update cgroups_ref back-pointer to new stream */
+        s->cgroups_ref->alloc_size = &s->alloc_size;
+    }
+
+    if (s->idmp_producers) {
+        /* Defrag the producers and all idmpEntry structures in their linked lists */
+        defragStreamIdmpProducers(s);
+    }
+}
+
+/* Defrag a module key. This is either done immediately or scheduled
+ * for later. */
+void defragModule(defragKeysCtx *ctx, redisDb *db, kvobj *kv) {
+    serverAssert(kv->type == OBJ_MODULE);
+    robj keyobj;
+    initStaticStringObject(keyobj, kvobjGetKey(kv));
+    if (!moduleDefragValue(&keyobj, kv, db->id))
+        defragLater(ctx, kv);
+}
+
+/* Defrag a kvobj structure, taking into account optional preceding metadata.
+ * For EMBSTR strings, also defrags the embedded string value in the same allocation.
+ * For RAW strings and other types, only the kvobj wrapper is defragged here;
+ * the value's internal data structures are defragged separately in defragKey().
+ *
+ * Returns NULL if the allocation wasn't moved.
+ * When it returns a non-null value, the old pointer was already released
+ * (unless without_free is set) and should NOT be accessed. */
+robj *activeDefragKvobj(kvobj* kv, int without_free) {
+    void *alloc, *newalloc;
+    kvobj *kvNew = NULL;
+    /* Use LONG_MIN as sentinel to detect if we have an EMBSTR string */
+    long offsetEmbstr = LONG_MIN;
+
+    /* Don't defrag kvobj's with multiple references (refcount > 1) */
+    if (kv->refcount != 1)
+        return NULL;
+
+    /* Calculate offset for EMBSTR strings */
+    if ((kv->type == OBJ_STRING) && (kv->encoding == OBJ_ENCODING_EMBSTR))
+        offsetEmbstr = (intptr_t)kv->ptr - (intptr_t)kv;
+
+    /* Defrag the kvobj allocation (including optional metadata prefix).
+     * For EMBSTR strings, this allocation also contains the embedded string data,
+     * so we'll need to recalculate the ptr offset after defragmentation (see below). */
+
+    alloc = kvobjGetAllocPtr(kv);
+    size_t metaBytes = (char *)kv - (char *)alloc;
+    if (without_free)
+        newalloc = activeDefragAllocWithoutFree(alloc);
+    else
+        newalloc = activeDefragAlloc(alloc);
+
+    if (!newalloc)
+        return NULL;
+
+    /* Update kv pointer to new allocation */
+    kvNew = (kvobj *)((char *)newalloc + metaBytes);
+
+    /* For EMBSTR strings, recalculate ptr to point to the embedded string data
+     * at the same offset within the new allocation */
+    if (offsetEmbstr != LONG_MIN)
+        kvNew->ptr = (void*)((intptr_t)kvNew + offsetEmbstr);
+
+    return kvNew;
+}
+
+/* for each key we scan in the main dict, this function will attempt to defrag
+ * all the various pointers it has. */
+void defragKey(defragKeysCtx *ctx, dictEntry *de, dictEntryLink link) {
+    UNUSED(link);
+    dictEntryLink exlink = NULL;
+    kvobj *kvnew = NULL, *ob = dictGetKV(de);
+    size_t oldsize = 0;
+    redisDb *db = &server.db[ctx->dbid];
+    int slot = ctx->kvstate.slot;
+    unsigned char *newzl;
+
+    if (server.memory_tracking_per_slot)
+        oldsize = kvobjAllocSize(ob);
+
+    long long expire = kvobjGetExpire(ob);
+    /* We can't search in db->expires for that KV after we've released
+     * the pointer it holds, since it won't be able to do the string
+     * compare. Search it before, if needed. */
+    if (expire != -1) {
+        exlink = kvstoreDictFindLink(db->expires, slot, kvobjGetKey(ob), NULL);
+        serverAssert(exlink != NULL);
+    }
+
+    /* Try to defrag robj. For hash objects with HFEs,
+     * defer defragmentation until processing db's subexpires. */
+    if (!(ob->type == OBJ_HASH && hashTypeGetMinExpire(ob, 0) != EB_EXPIRE_TIME_INVALID)) {
+        /* If the dict doesn't have metadata, we directly defrag it. */
+        kvnew = activeDefragKvobj(ob, 0);
+    }
+    if (kvnew) {
+        kvstoreDictSetAtLink(db->keys, slot, kvnew, &link, 0);
+        if (expire != -1)
+            kvstoreDictSetAtLink(db->expires, slot, kvnew, &exlink, 0);
+        ob = kvnew;
+    }
+
+    if (ob->type == OBJ_STRING) {
+        /* Only defrag strings with refcount==1 (String might be shared as dict
+         * keys, e.g. pub/sub channels, and may be accessed by IO threads. Other
+         * types are never used as dict keys) */
+        if ((ob->refcount==1) && (ob->encoding == OBJ_ENCODING_RAW)) {
+            /* For RAW strings, defrag the separate SDS allocation */
+            sds newsds = activeDefragSds((sds)ob->ptr);
+            if (newsds) ob->ptr = newsds;
+        }
+    } else if (ob->type == OBJ_LIST) {
+        if (ob->encoding == OBJ_ENCODING_QUICKLIST) {
+            defragQuicklist(ctx, ob);
+        } else if (ob->encoding == OBJ_ENCODING_LISTPACK) {
+            if ((newzl = activeDefragAlloc(ob->ptr)))
+                ob->ptr = newzl;
+        } else {
+            serverPanic("Unknown list encoding");
+        }
+    } else if (ob->type == OBJ_SET) {
+        if (ob->encoding == OBJ_ENCODING_HT) {
+            defragSet(ctx, ob);
+        } else if (ob->encoding == OBJ_ENCODING_INTSET ||
+                   ob->encoding == OBJ_ENCODING_LISTPACK)
+        {
+            void *newptr, *ptr = ob->ptr;
+            if ((newptr = activeDefragAlloc(ptr)))
+                ob->ptr = newptr;
+        } else {
+            serverPanic("Unknown set encoding");
+        }
+    } else if (ob->type == OBJ_ZSET) {
+        if (ob->encoding == OBJ_ENCODING_LISTPACK) {
+            if ((newzl = activeDefragAlloc(ob->ptr)))
+                ob->ptr = newzl;
+        } else if (ob->encoding == OBJ_ENCODING_SKIPLIST) {
+            defragZsetSkiplist(ctx, ob);
+        } else {
+            serverPanic("Unknown sorted set encoding");
+        }
+    } else if (ob->type == OBJ_HASH) {
+        if (ob->encoding == OBJ_ENCODING_LISTPACK) {
+            if ((newzl = activeDefragAlloc(ob->ptr)))
+                ob->ptr = newzl;
+        } else if (ob->encoding == OBJ_ENCODING_LISTPACK_EX) {
+            listpackEx *newlpt, *lpt = (listpackEx*)ob->ptr;
+            if ((newlpt = activeDefragAlloc(lpt)))
+                ob->ptr = lpt = newlpt;
+            if ((newzl = activeDefragAlloc(lpt->lp)))
+                lpt->lp = newzl;
+        } else if (ob->encoding == OBJ_ENCODING_HT) {
+            defragHash(ctx, ob);
+        } else {
+            serverPanic("Unknown hash encoding");
+        }
+    } else if (ob->type == OBJ_STREAM) {
+        defragStream(ctx, ob);
+    } else if (ob->type == OBJ_MODULE) {
+        defragModule(ctx, db, ob);
+    } else {
+        serverPanic("Unknown object type");
+    }
+    if (server.memory_tracking_per_slot)
+        updateSlotAllocSize(db, slot, oldsize, kvobjAllocSize(ob));
+}
+
+/* Defrag scan callback for the main db dictionary. */
+static void dbKeysScanCallback(void *privdata, const dictEntry *de, dictEntryLink plink) {
+    long long hits_before = server.stat_active_defrag_hits;
+    defragKey((defragKeysCtx *)privdata, (dictEntry *)de, plink);
+    if (server.stat_active_defrag_hits != hits_before)
+        server.stat_active_defrag_key_hits++;
+    else
+        server.stat_active_defrag_key_misses++;
+    server.stat_active_defrag_scanned++;
+}
+
+#if !defined(DEBUG_DEFRAG_FORCE)
+/* Utility function to get the fragmentation ratio from jemalloc.
+ * It is critical to do that by comparing only heap maps that belong to
+ * jemalloc, and skip ones the jemalloc keeps as spare. Since we use this
+ * fragmentation ratio in order to decide if a defrag action should be taken
+ * or not, a false detection can cause the defragmenter to waste a lot of CPU
+ * without the possibility of getting any results. */
+float getAllocatorFragmentation(size_t *out_frag_bytes) {
+    size_t resident, active, allocated, frag_smallbins_bytes;
+    zmalloc_get_allocator_info(1, &allocated, &active, &resident, NULL, NULL, &frag_smallbins_bytes);
+
+    if (server.lua_arena != UINT_MAX) {
+        size_t lua_resident, lua_active, lua_allocated, lua_frag_smallbins_bytes;
+        zmalloc_get_allocator_info_by_arena(server.lua_arena, 0, &lua_allocated, &lua_active, &lua_resident, &lua_frag_smallbins_bytes);
+        resident -= lua_resident;
+        active -= lua_active;
+        allocated -= lua_allocated;
+        frag_smallbins_bytes -= lua_frag_smallbins_bytes;
+    }
+
+    /* Calculate the fragmentation ratio as the proportion of wasted memory in small
+     * bins (which are defraggable) relative to the total allocated memory (including large bins).
+     * This is because otherwise, if most of the memory usage is large bins, we may show high percentage,
+     * despite the fact it's not a lot of memory for the user. */
+    float frag_pct = (float)frag_smallbins_bytes / allocated * 100;
+    float rss_pct = ((float)resident / allocated)*100 - 100;
+    size_t rss_bytes = resident - allocated;
+    if (out_frag_bytes)
+        *out_frag_bytes = frag_smallbins_bytes;
+    serverLog(LL_DEBUG,
+              "allocated=%zu, active=%zu, resident=%zu, frag=%.2f%% (%.2f%% rss), frag_bytes=%zu (%zu rss)",
+              allocated, active, resident, frag_pct, rss_pct, frag_smallbins_bytes, rss_bytes);
+    return frag_pct;
+}
+#else
+float getAllocatorFragmentation(size_t *out_frag_bytes) {
+    if (out_frag_bytes)
+        *out_frag_bytes = SIZE_MAX;
+    return 99; /* The maximum percentage of fragmentation */
+}
+#endif
+
+/* Defrag scan callback for the pubsub dictionary. */
+void defragPubsubScanCallback(void *privdata, const dictEntry *de, dictEntryLink plink) {
+    UNUSED(plink);
+    defragPubSubCtx *ctx = privdata;
+    kvstore *pubsub_channels = ctx->kvstate.kvs;
+    robj *newchannel, *channel = dictGetKey(de);
+    dict *newclients, *clients = dictGetVal(de);
+
+    /* Try to defrag the channel name. */
+    serverAssert(channel->refcount == (int)dictSize(clients) + 1);
+    newchannel = activeDefragStringObEx(channel, dictSize(clients) + 1);
+    if (newchannel) {
+        kvstoreDictSetKey(pubsub_channels, ctx->kvstate.slot, (dictEntry*)de, newchannel);
+
+        /* The channel name is shared by the client's pubsub(shard) and server's
+         * pubsub(shard), after defragging the channel name, we need to update
+         * the reference in the clients' dictionary. */
+        dictIterator di;
+        dictEntry *clientde;
+        dictInitIterator(&di, clients);
+        while ((clientde = dictNext(&di)) != NULL) {
+            client *c = dictGetKey(clientde);
+            dict *client_channels = ctx->getPubSubChannels(c);
+            uint64_t hash = dictGetHash(client_channels, newchannel);
+            dictEntry *pubsub_channel = dictFindByHashAndPtr(client_channels, channel, hash);
+            serverAssert(pubsub_channel);
+            dictSetKey(ctx->getPubSubChannels(c), pubsub_channel, newchannel);
+        }
+        dictResetIterator(&di);
+    }
+
+    /* Try to defrag the dictionary of clients that is stored as the value part. */
+    if ((newclients = dictDefragTables(clients)))
+        kvstoreDictSetVal(pubsub_channels, ctx->kvstate.slot, (dictEntry *)de, newclients);
+
+    server.stat_active_defrag_scanned++;
+}
+
+/* returns 0 if more work may or may not be needed (check the cursor; non-zero
+ * means more work), and 1 if time is up and more work is needed. */
+int defragLaterItem(kvobj *ob, unsigned long *cursor, monotime endtime, int dbid) {
+    if (ob) {
+        if (ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST) {
+            return scanLaterList(ob, cursor, endtime);
+        } else if (ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT) {
+            scanLaterSet(ob, cursor);
+        } else if (ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST) {
+            scanLaterZset(ob, cursor);
+        } else if (ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT) {
+            scanLaterHash(ob, cursor);
+        } else if (ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM) {
+            return scanLaterStreamListpacks(ob, cursor, endtime);
+        } else if (ob->type == OBJ_MODULE) {
+            robj keyobj;
+            initStaticStringObject(keyobj, kvobjGetKey(ob));
+            return moduleLateDefrag(&keyobj, ob, cursor, endtime, dbid);
+        } else {
+            *cursor = 0; /* object type/encoding may have changed since we scheduled it for later */
+        }
+    } else {
+        *cursor = 0; /* object may have been deleted already */
+    }
+    return 0;
+}
+
+static int defragIsRunning(void) {
+    return (defrag.timeproc_id > 0);
+}
+
+/* A kvstoreHelperPreContinueFn */
+static doneStatus defragLaterStep(void *ctx, monotime endtime) {
+    defragKeysCtx *defrag_keys_ctx = ctx;
+    redisDb *db = &server.db[defrag_keys_ctx->dbid];
+    int slot = defrag_keys_ctx->kvstate.slot;
+    size_t oldsize = 0;
+
+    unsigned int iterations = 0;
+    unsigned long long prev_defragged = server.stat_active_defrag_hits;
+    unsigned long long prev_scanned = server.stat_active_defrag_scanned;
+
+    while (defrag_keys_ctx->defrag_later && listLength(defrag_keys_ctx->defrag_later) > 0) {
+        listNode *head = listFirst(defrag_keys_ctx->defrag_later);
+        sds key = head->value;
+        dictEntry *de = kvstoreDictFind(defrag_keys_ctx->kvstate.kvs, defrag_keys_ctx->kvstate.slot, key);
+        kvobj *kv = de ? dictGetKV(de) : NULL;
+
+        long long key_defragged = server.stat_active_defrag_hits;
+        if (server.memory_tracking_per_slot && kv)
+            oldsize = kvobjAllocSize(kv);
+        int timeout = (defragLaterItem(kv, &defrag_keys_ctx->defrag_later_cursor, endtime, defrag_keys_ctx->dbid) == 1);
+        if (server.memory_tracking_per_slot && kv)
+            updateSlotAllocSize(db, slot, oldsize, kvobjAllocSize(kv));
+        if (key_defragged != server.stat_active_defrag_hits) {
+            server.stat_active_defrag_key_hits++;
+        } else {
+            server.stat_active_defrag_key_misses++;
+        }
+
+        if (timeout) break;
+
+        if (defrag_keys_ctx->defrag_later_cursor == 0) {
+            /* the item is finished, move on */
+            listDelNode(defrag_keys_ctx->defrag_later, head);
+        }
+
+        if (++iterations > 16 || server.stat_active_defrag_hits - prev_defragged > 512 ||
+            server.stat_active_defrag_scanned - prev_scanned > 64) {
+            if (getMonotonicUs() > endtime) break;
+            iterations = 0;
+            prev_defragged = server.stat_active_defrag_hits;
+            prev_scanned = server.stat_active_defrag_scanned;
+        }
+    }
+
+    return (!defrag_keys_ctx->defrag_later || listLength(defrag_keys_ctx->defrag_later) == 0) ? DEFRAG_DONE : DEFRAG_NOT_DONE;
+}
+
+#define INTERPOLATE(x, x1, x2, y1, y2) ((y1) + ((x)-(x1)) * ((y2)-(y1)) / ((x2)-(x1)))
+#define LIMIT(y, min, max) ((y)<(min)? min: ((y)>(max)? max: (y)))
+
+/* decide if defrag is needed, and at what CPU effort to invest in it */
+void computeDefragCycles(void) {
+    size_t frag_bytes;
+    float frag_pct = getAllocatorFragmentation(&frag_bytes);
+    /* If we're not already running, and below the threshold, exit. */
+    if (!server.active_defrag_running) {
+        if (frag_pct < server.active_defrag_threshold_lower || frag_bytes < server.active_defrag_ignore_bytes)
+            return;
+    }
+
+    /* Calculate the adaptive aggressiveness of the defrag based on the current
+     * fragmentation and configurations. */
+    int cpu_pct = INTERPOLATE(frag_pct,
+                              server.active_defrag_threshold_lower,
+                              server.active_defrag_threshold_upper,
+                              server.active_defrag_cycle_min,
+                              server.active_defrag_cycle_max);
+    cpu_pct *= defrag.decay_rate;
+    cpu_pct = LIMIT(cpu_pct,
+                    server.active_defrag_cycle_min,
+                    server.active_defrag_cycle_max);
+
+    /* Normally we allow increasing the aggressiveness during a scan, but don't
+     * reduce it, since we should not lower the aggressiveness when fragmentation
+     * drops. But when a configuration change is made, we should reconsider it. */
+    if (cpu_pct > server.active_defrag_running ||
+        server.active_defrag_configuration_changed)
+    {
+        server.active_defrag_configuration_changed = 0;
+        if (defragIsRunning()) {
+            serverLog(LL_VERBOSE, "Changing active defrag CPU, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%",
+                      frag_pct, frag_bytes, cpu_pct);
+        } else {
+            serverLog(LL_VERBOSE,
+                      "Starting active defrag, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%",
+                      frag_pct, frag_bytes, cpu_pct);
+        }
+        server.active_defrag_running = cpu_pct;
+    }
+}
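+
+/* Worked example of the interpolation above (illustrative values only): with
+ * active-defrag-threshold-lower=10, active-defrag-threshold-upper=100,
+ * active-defrag-cycle-min=1 and active-defrag-cycle-max=25, a fragmentation of
+ * 40% maps to cpu_pct = 1 + (40-10)*(25-1)/(100-10) = 9, before the decay rate
+ * and the LIMIT() clamp are applied. */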
*/ + state->cursor = kvstoreDictScanDefrag(state->kvs, state->slot, state->cursor, + scan_fn, defragfns, ctx); + } + + return DEFRAG_NOT_DONE; +} + +static doneStatus defragStageDbKeys(void *ctx, monotime endtime) { + defragKeysCtx *defrag_keys_ctx = ctx; + redisDb *db = &server.db[defrag_keys_ctx->dbid]; + if (db->keys != defrag_keys_ctx->kvstate.kvs) { + /* There has been a change of the kvs (flushdb, swapdb, etc.). Just complete the stage. */ + return DEFRAG_DONE; + } + + /* Note: for DB keys, we use the start/finish callback to fix an expires table entry if + * the main DB entry has been moved. */ + static dictDefragFunctions defragfns = { + .defragAlloc = activeDefragAlloc, + .defragKey = NULL, /* Handled by dbKeysScanCallback */ + .defragVal = NULL, /* Handled by dbKeysScanCallback */ + }; + + return defragStageKvstoreHelper(endtime, ctx, + dbKeysScanCallback, defragLaterStep, &defragfns); +} + +static doneStatus defragStageExpiresKvstore(void *ctx, monotime endtime) { + defragKeysCtx *defrag_keys_ctx = ctx; + redisDb *db = &server.db[defrag_keys_ctx->dbid]; + if (db->expires != defrag_keys_ctx->kvstate.kvs) { + /* There has been a change of the kvs (flushdb, swapdb, etc.). Just complete the stage. */ + return DEFRAG_DONE; + } + + static dictDefragFunctions defragfns = { + .defragAlloc = activeDefragAlloc, + .defragKey = NULL, /* Not needed for expires (just a ref) */ + .defragVal = NULL, /* Not needed for expires (no value) */ + }; + return defragStageKvstoreHelper(endtime, ctx, + scanCallbackCountScanned, NULL, &defragfns); +} + +/* Defrag (hash) object with subexpiry and update its reference in the DB keys. */ +void *activeDefragSubexpiresOB(void *ptr, void *privdata) { + redisDb *db = privdata; + dictEntryLink link, exlink = NULL; + kvobj *newkv, *kv = ptr; + sds keystr = kvobjGetKey(kv); + unsigned int slot = calculateKeySlot(keystr); + + serverAssert(kv->type == OBJ_HASH); /* Currently relevant only for hashes */ + + long long expire = kvobjGetExpire(kv); + /* We can't search in db->expires for that KV after we've released + * the pointer it holds, since it won't be able to do the string + * compare. Search it before, if needed. */ + if (expire != -1) { + exlink = kvstoreDictFindLink(db->expires, slot, keystr, NULL); + serverAssert(exlink != NULL); + } + + if ((newkv = activeDefragKvobj(kv, 1))) { + /* Update its reference in the DB keys. */ + link = kvstoreDictFindLink(db->keys, slot, keystr, NULL); + serverAssert(link != NULL); + kvstoreDictSetAtLink(db->keys, slot, newkv, &link, 0); + if (expire != -1) + kvstoreDictSetAtLink(db->expires, slot, newkv, &exlink, 0); + activeDefragFree(kvobjGetAllocPtr(kv)); + } + return newkv; +} + +static doneStatus defragStageSubexpires(void *ctx, monotime endtime) { + unsigned int iterations = 0; + unsigned long long prev_defragged = server.stat_active_defrag_hits; + unsigned long long prev_scanned = server.stat_active_defrag_scanned; + defragSubexpiresCtx *subctx = ctx; + redisDb *db = &server.db[subctx->dbid]; + estore *subexpires = db->subexpires; + + /* If estore changed (flushdb, swapdb, etc.), Just complete the stage. 
*/ + if (db->subexpires != subctx->subexpires) { + return DEFRAG_DONE; + } + + ebDefragFunctions eb_defragfns = { + .defragAlloc = activeDefragAlloc, + .defragItem = activeDefragSubexpiresOB + }; + + while (1) { + if (++iterations > 16 || + server.stat_active_defrag_hits - prev_defragged > 512 || + server.stat_active_defrag_scanned - prev_scanned > 64) + { + if (getMonotonicUs() >= endtime) break; + iterations = 0; + prev_defragged = server.stat_active_defrag_hits; + prev_scanned = server.stat_active_defrag_scanned; + } + + /* If there's no cursor, we're ready to begin a new estore slot. */ + if (!subctx->cursor) { + if (subctx->slot == ITER_SLOT_UNASSIGNED) { + subctx->slot = estoreGetFirstNonEmptyBucket(subexpires); + } else { + subctx->slot = estoreGetNextNonEmptyBucket(subexpires, subctx->slot); + } + + if (subctx->slot == ITER_SLOT_UNASSIGNED) return DEFRAG_DONE; + } + + /* Get the ebuckets for the current slot and scan it */ + ebuckets *bucket = estoreGetBuckets(subexpires, subctx->slot); + if (!ebScanDefrag(bucket, &subexpiresBucketsType, &subctx->cursor, &eb_defragfns, db)) + subctx->cursor = 0; /* Reset cursor to move to next slot */ + } + + return DEFRAG_NOT_DONE; +} + +static doneStatus defragStagePubsubKvstore(void *ctx, monotime endtime) { + static dictDefragFunctions defragfns = { + .defragAlloc = activeDefragAlloc, + .defragKey = NULL, /* Handled by defragPubsubScanCallback */ + .defragVal = NULL, /* Not needed for expires (no value) */ + }; + + return defragStageKvstoreHelper(endtime, ctx, + defragPubsubScanCallback, NULL, &defragfns); +} + +static doneStatus defragLuaScripts(void *ctx, monotime endtime) { + UNUSED(endtime); + UNUSED(ctx); + activeDefragSdsDict(evalScriptsDict(), DEFRAG_SDS_DICT_VAL_LUA_SCRIPT); + return DEFRAG_DONE; +} + +/* Handles defragmentation of module global data. This is a stage function + * that gets called periodically during the active defragmentation process. */ +static doneStatus defragModuleGlobals(void *ctx, monotime endtime) { + defragModuleCtx *defrag_module_ctx = ctx; + + RedisModule *module = moduleGetHandleByName(defrag_module_ctx->module_name); + if (!module) { + /* Module has been unloaded, nothing to defrag. */ + return DEFRAG_DONE; + } + /* Interval shouldn't exceed 1 hour */ + serverAssert(!endtime || llabs((long long)endtime - (long long)getMonotonicUs()) < 60*60*1000*1000LL); + + /* Call appropriate version of module's defrag callback: + * 1. Version 2 (defrag_cb_2): Supports incremental defrag and returns whether more work is needed + * 2. Version 1 (defrag_cb): Legacy version, performs all work in one call. + * Note: V1 doesn't support incremental defragmentation, may block for longer periods. */ + RedisModuleDefragCtx defrag_ctx = { endtime, &defrag_module_ctx->cursor, NULL, -1, -1, -1 }; + if (module->defrag_cb_2) { + return module->defrag_cb_2(&defrag_ctx) ? 
+
+static void freeDefragKeysContext(void *ctx) {
+    defragKeysCtx *defrag_keys_ctx = ctx;
+    if (defrag_keys_ctx->defrag_later) {
+        listRelease(defrag_keys_ctx->defrag_later);
+    }
+    zfree(defrag_keys_ctx);
+}
+
+static void freeDefragModuleContext(void *ctx) {
+    defragModuleCtx *defrag_module_ctx = ctx;
+    sdsfree(defrag_module_ctx->module_name);
+    zfree(defrag_module_ctx);
+}
+
+static void freeDefragContext(void *ptr) {
+    StageDescriptor *stage = ptr;
+    if (stage->ctx_free_fn)
+        stage->ctx_free_fn(stage->ctx);
+    zfree(stage);
+}
+
+static void addDefragStage(defragStageFn stage_fn, defragStageContextFreeFn ctx_free_fn, void *ctx) {
+    StageDescriptor *stage = zmalloc(sizeof(StageDescriptor));
+    stage->stage_fn = stage_fn;
+    stage->ctx_free_fn = ctx_free_fn;
+    stage->ctx = ctx;
+    listAddNodeTail(defrag.remaining_stages, stage);
+}
+
+/* Updates the defrag decay rate based on the observed effectiveness of the defrag process.
+ * The decay rate is used to gradually slow down defrag when it's not being effective. */
+static void updateDefragDecayRate(float frag_pct) {
+    long long last_hits = server.stat_active_defrag_hits - defrag.start_defrag_hits;
+    long long last_misses = server.stat_active_defrag_misses - defrag.start_defrag_misses;
+    float last_frag_pct_change = defrag.start_frag_pct - frag_pct;
+    /* When defragmentation efficiency is low, we gradually reduce the
+     * speed for the next cycle to avoid wasting CPU. However, in the
+     * following two cases, we keep the normal speed:
+     * 1) The fragmentation percentage has increased or decreased by more than 2%.
+     * 2) The fragmentation change is small, but the hit ratio is above 1%. */
+    if (fabs(last_frag_pct_change) > 2 ||
+        (last_frag_pct_change < 0 && last_hits >= (last_hits + last_misses) * 0.01))
+    {
+        defrag.decay_rate = 1.0f;
+    } else {
+        defrag.decay_rate *= 0.9f;
+    }
+}
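+
+/* Editor's note: a worked example of the decay above, assuming (as elsewhere
+ * in this file) that decay_rate is applied as a multiplier on the configured
+ * defrag effort when the next cycle's speed is computed: after one
+ * ineffective cycle the effort is 0.9x, after five it is 0.9^5 ~= 0.59x, and
+ * after ten it is 0.9^10 ~= 0.35x. A single effective cycle (more than a 2%
+ * swing in fragmentation, or a hit ratio above 1%) snaps it back to 1.0. */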
+
+/* Called at the end of a complete defrag cycle, or when defrag is terminated */
+static void endDefragCycle(int normal_termination) {
+    if (normal_termination) {
+        /* For normal termination, we expect... */
+        serverAssert(!defrag.current_stage);
+        serverAssert(listLength(defrag.remaining_stages) == 0);
+    } else {
+        /* Defrag is being terminated abnormally */
+        aeDeleteTimeEvent(server.el, defrag.timeproc_id);
+
+        if (defrag.current_stage) {
+            listDelNode(defrag.remaining_stages, defrag.current_stage);
+            defrag.current_stage = NULL;
+        }
+    }
+    defrag.timeproc_id = AE_DELETED_EVENT_ID;
+
+    listRelease(defrag.remaining_stages);
+    defrag.remaining_stages = NULL;
+
+    size_t frag_bytes;
+    float frag_pct = getAllocatorFragmentation(&frag_bytes);
+    serverLog(LL_VERBOSE, "Active defrag done in %dms, reallocated=%d, frag=%.0f%%, frag_bytes=%zu",
+              (int)elapsedMs(defrag.start_cycle), (int)(server.stat_active_defrag_hits - defrag.start_defrag_hits),
+              frag_pct, frag_bytes);
+
+    server.stat_total_active_defrag_time += elapsedUs(server.stat_last_active_defrag_time);
+    server.stat_last_active_defrag_time = 0;
+    server.active_defrag_running = 0;
+
+    updateDefragDecayRate(frag_pct);
+    moduleDefragEnd();
+
+    /* Immediately check to see if we should start another defrag cycle. */
+    activeDefragCycle();
+}
+
+/* Must be called at the start of the timeProc, as it measures the delay from the end of the
+ * previous timeProc invocation when performing the computation. */
+static int computeDefragCycleUs(void) {
+    long dutyCycleUs;
+
+    int targetCpuPercent = server.active_defrag_running;
+    serverAssert(targetCpuPercent > 0 && targetCpuPercent < 100);
+
+    static int prevCpuPercent = 0; /* STATIC - this value persists across calls */
+    if (targetCpuPercent != prevCpuPercent) {
+        /* If targetCpuPercent changes, the value might be different from when the last wait
+         * time was computed. In this case, don't consider wait time. (This is really only an
+         * issue in crazy tests that dramatically increase CPU while defrag is running.) */
+        defrag.timeproc_end_time = 0;
+        prevCpuPercent = targetCpuPercent;
+    }
+
+    /* Given when the last duty cycle ended, compute the time needed to achieve the desired percentage. */
+    if (defrag.timeproc_end_time == 0) {
+        /* Either the first call to the timeProc, or we were paused for some reason. */
+        defrag.timeproc_overage_us = 0;
+        dutyCycleUs = DEFRAG_CYCLE_US;
+    } else {
+        long waitedUs = getMonotonicUs() - defrag.timeproc_end_time;
+        /* Given the elapsed wait time between calls, compute the duty time needed to
+         * achieve the desired CPU percentage.
+         * With: D = duty time, W = wait time, P = percent
+         * Solve:    D          P
+         *         -----   =  -----
+         *         D + W       100
+         * Solving for D:
+         *         D = P * W / (100 - P)
+         *
+         * Note that dutyCycleUs addresses starvation. If the wait time was long, we will compensate
+         * with a proportionately long duty-cycle. This won't significantly affect perceived
+         * latency, because clients are already being impacted by the long cycle time which caused
+         * the starvation of the timer. */
+        dutyCycleUs = targetCpuPercent * waitedUs / (100 - targetCpuPercent);
+
+        /* Also adjust for any accumulated overage. */
+        dutyCycleUs -= defrag.timeproc_overage_us;
+        defrag.timeproc_overage_us = 0;
+
+        if (dutyCycleUs < DEFRAG_CYCLE_US) {
+            /* We never reduce our cycle time; that would increase overhead. Instead, we track this
+             * as part of the overage, and increase the wait time between cycles. */
+            defrag.timeproc_overage_us = DEFRAG_CYCLE_US - dutyCycleUs;
+            dutyCycleUs = DEFRAG_CYCLE_US;
+        } else if (dutyCycleUs > DEFRAG_CYCLE_US * 10) {
+            /* Cap the defrag duty cycle to prevent excessive latency. When latency is already
+             * high (indicated by a long time between calls), we don't want to make it worse
+             * by running defrag for too long. */
+            dutyCycleUs = DEFRAG_CYCLE_US * 10;
+        }
+    }
+    return dutyCycleUs;
+}
+
+/* Must be called at the end of the timeProc, as it records the timeproc_end_time for use in the
+ * next computeDefragCycleUs computation. */
+static int computeDelayMs(monotime intendedEndtime) {
+    defrag.timeproc_end_time = getMonotonicUs();
+    long overage = defrag.timeproc_end_time - intendedEndtime;
+    defrag.timeproc_overage_us += overage; /* track over/under desired CPU */
+    /* Allow negative overage (underage) to count against existing overage, but don't allow
+     * underage (from short stages) to be accumulated. */
+    if (defrag.timeproc_overage_us < 0) defrag.timeproc_overage_us = 0;
+
+    int targetCpuPercent = server.active_defrag_running;
+    serverAssert(targetCpuPercent > 0 && targetCpuPercent < 100);
+
+    /* Given the desired duty cycle, what inter-cycle delay do we need to achieve that?
+     * We want to achieve a specific CPU percent, so we can't use a skewed computation.
+     * For example, if we run for 1ms and delay 10ms, that's NOT 10%, because the total
+     * cycle time is 11ms. Instead, if we run for 1ms, our total cycle time should be
+     * 10ms, so the delay is only 9ms. */
+    long totalCycleTimeUs = DEFRAG_CYCLE_US * 100 / targetCpuPercent;
+    long delayUs = totalCycleTimeUs - DEFRAG_CYCLE_US;
+    /* Only increase the delay by the fraction of the overage that would be non-duty-cycle */
+    delayUs += defrag.timeproc_overage_us * (100 - targetCpuPercent) / 100;
+    if (delayUs < 0) delayUs = 0;
+    long delayMs = delayUs / 1000; /* round down */
+    return delayMs;
+}
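+
+/* Editor's note: a worked example of the two computations above, using the
+ * file's own constant (DEFRAG_CYCLE_US = 500) and an assumed target of
+ * P = 10%. Steady state: each 500us duty cycle is followed by a delay of
+ * 500 * 100/10 - 500 = 4500us, i.e. 500us of work per 5000us of wall time.
+ * Starvation recovery: if the timer was starved and W = 45000us elapsed
+ * since the last cycle, the next duty cycle grows to
+ * D = 10 * 45000 / (100 - 10) = 5000us, which both repays the missed work
+ * and stays at the configured 10% of CPU (and sits exactly at the
+ * DEFRAG_CYCLE_US * 10 latency cap). */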
+
+/* An independent time proc for defrag. While defrag is running, this is called much more often
+ * than the server cron. Frequent short calls keep the latency impact low. */
+static int activeDefragTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData) {
+    UNUSED(eventLoop);
+    UNUSED(id);
+    UNUSED(clientData);
+
+    /* This timer shouldn't be registered unless there's work to do. */
+    serverAssert(defrag.current_stage || listLength(defrag.remaining_stages) > 0);
+
+    if (!server.active_defrag_enabled) {
+        /* Defrag has been disabled while running */
+        endDefragCycle(0);
+        return AE_NOMORE;
+    }
+
+    if (hasActiveChildProcess()) {
+        /* If there's a child process, pause the defrag, polling until the child completes. */
+        defrag.timeproc_end_time = 0; /* prevent starvation recovery */
+        return 100;
+    }
+
+    monotime starttime = getMonotonicUs();
+    int dutyCycleUs = computeDefragCycleUs();
+#if defined(DEBUG_DEFRAG_FULLY)
+    dutyCycleUs = 30*1000*1000LL; /* 30 seconds */
+#endif
+    monotime endtime = starttime + dutyCycleUs;
+    int haveMoreWork = 1;
+
+    mstime_t latency;
+    latencyStartMonitor(latency);
+
+    do {
+        if (!defrag.current_stage) {
+            defrag.current_stage = listFirst(defrag.remaining_stages);
+        }
+
+        StageDescriptor *stage = listNodeValue(defrag.current_stage);
+        doneStatus status = stage->stage_fn(stage->ctx, endtime);
+        if (status == DEFRAG_DONE) {
+            listDelNode(defrag.remaining_stages, defrag.current_stage);
+            defrag.current_stage = NULL;
+        }
+
+        haveMoreWork = (defrag.current_stage || listLength(defrag.remaining_stages) > 0);
+        /* If we've completed a stage early, and still have a standard time allotment remaining,
+         * we'll start another stage. This can happen when defrag is running infrequently, and
+         * starvation protection has increased the duty-cycle. */
+    } while (haveMoreWork && getMonotonicUs() <= endtime - DEFRAG_CYCLE_US);
+
+    latencyEndMonitor(latency);
+    latencyAddSampleIfNeeded("active-defrag-cycle", latency);
+
+    if (haveMoreWork) {
+        return computeDelayMs(endtime);
+    } else {
+        endDefragCycle(1);
+        return AE_NOMORE; /* Ends the timer proc */
+    }
+}
+
+/* During long running scripts, or while loading, there is a periodic function for handling other
+ * actions. This interface allows defrag to continue running, avoiding a single long defrag step
+ * after the long operation completes. */
+void defragWhileBlocked(void) {
+    /* This is called infrequently, while timers are not active. We might need to start defrag. */
+    if (!defragIsRunning()) activeDefragCycle();
+
+    if (!defragIsRunning()) return;
+
+    /* Save off the timeproc_id. If we have a normal termination, it will be cleared. */
+    long long timeproc_id = defrag.timeproc_id;
+
+    /* Simulate a single call of the timer proc */
+    long long reschedule_delay = activeDefragTimeProc(NULL, 0, NULL);
+    if (reschedule_delay == AE_NOMORE) {
+        /* If it's done, deregister the timer */
+        aeDeleteTimeEvent(server.el, timeproc_id);
+    }
+    /* Otherwise, just ignore the reschedule_delay; the timer will pop the next time the
+     * event loop can process timers again. */
+}
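+
+/* Editor's note (context, hedged): in the Redis tree this entry point is
+ * expected to be driven from the whileBlockedCron() path in server.c, which
+ * runs periodically while a long script or an RDB/AOF load keeps the event
+ * loop from serving timers; that caller is not part of this diff. */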
+
+static void beginDefragCycle(void) {
+    serverAssert(!defragIsRunning());
+
+    moduleDefragStart();
+
+    serverAssert(defrag.remaining_stages == NULL);
+    defrag.remaining_stages = listCreate();
+    listSetFreeMethod(defrag.remaining_stages, freeDefragContext);
+
+    for (int dbid = 0; dbid < server.dbnum; dbid++) {
+        redisDb *db = &server.db[dbid];
+
+        /* Add stage for keys. */
+        defragKeysCtx *defrag_keys_ctx = zcalloc(sizeof(defragKeysCtx));
+        defrag_keys_ctx->kvstate = INIT_KVSTORE_STATE(db->keys);
+        defrag_keys_ctx->dbid = dbid;
+        addDefragStage(defragStageDbKeys, freeDefragKeysContext, defrag_keys_ctx);
+
+        /* Add stage for expires. */
+        defragKeysCtx *defrag_expires_ctx = zcalloc(sizeof(defragKeysCtx));
+        defrag_expires_ctx->kvstate = INIT_KVSTORE_STATE(db->expires);
+        defrag_expires_ctx->dbid = dbid;
+        addDefragStage(defragStageExpiresKvstore, freeDefragKeysContext, defrag_expires_ctx);
+
+        /* Add stage for subexpires. */
+        defragSubexpiresCtx *defrag_subexpires_ctx = zcalloc(sizeof(defragSubexpiresCtx));
+        defrag_subexpires_ctx->subexpires = db->subexpires;
+        defrag_subexpires_ctx->slot = ITER_SLOT_UNASSIGNED;
+        defrag_subexpires_ctx->cursor = 0;
+        defrag_subexpires_ctx->dbid = dbid;
+        addDefragStage(defragStageSubexpires, zfree, defrag_subexpires_ctx);
+    }
+
+    /* Add stage for pubsub channels. */
+    defragPubSubCtx *defrag_pubsub_ctx = zmalloc(sizeof(defragPubSubCtx));
+    defrag_pubsub_ctx->kvstate = INIT_KVSTORE_STATE(server.pubsub_channels);
+    defrag_pubsub_ctx->getPubSubChannels = getClientPubSubChannels;
+    addDefragStage(defragStagePubsubKvstore, zfree, defrag_pubsub_ctx);
+
+    /* Add stage for pubsubshard channels. */
+    defragPubSubCtx *defrag_pubsubshard_ctx = zmalloc(sizeof(defragPubSubCtx));
+    defrag_pubsubshard_ctx->kvstate = INIT_KVSTORE_STATE(server.pubsubshard_channels);
+    defrag_pubsubshard_ctx->getPubSubChannels = getClientPubSubShardChannels;
+    addDefragStage(defragStagePubsubKvstore, zfree, defrag_pubsubshard_ctx);
+
+    /* Add stage for Lua scripts. */
+    addDefragStage(defragLuaScripts, NULL, NULL);
+
+    /* Add stages for modules. */
+    dictIterator di;
+    dictEntry *de;
+    dictInitIterator(&di, modules);
+    while ((de = dictNext(&di)) != NULL) {
+        struct RedisModule *module = dictGetVal(de);
+        if (module->defrag_cb || module->defrag_cb_2) {
+            defragModuleCtx *ctx = zmalloc(sizeof(defragModuleCtx));
+            ctx->cursor = 0;
+            ctx->module_name = sdsnew(module->name);
+            addDefragStage(defragModuleGlobals, freeDefragModuleContext, ctx);
+        }
+    }
+    dictResetIterator(&di);
+
+    defrag.current_stage = NULL;
+    defrag.start_cycle = getMonotonicUs();
+    defrag.start_defrag_hits = server.stat_active_defrag_hits;
+    defrag.start_defrag_misses = server.stat_active_defrag_misses;
+    defrag.start_frag_pct = getAllocatorFragmentation(NULL);
+    defrag.timeproc_end_time = 0;
+    defrag.timeproc_overage_us = 0;
+    defrag.timeproc_id = aeCreateTimeEvent(server.el, 0, activeDefragTimeProc, NULL, NULL);
+
+    elapsedStart(&server.stat_last_active_defrag_time);
+}
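+
+/* Editor's note: a hypothetical sketch of how one more stage would be wired
+ * into beginDefragCycle() above; defragStageMyThing and myStageCtx do not
+ * exist in this file. A heap-allocated context needs a matching free
+ * function, or plain zfree when it owns no nested allocations; either way
+ * it is invoked via freeDefragContext() when the stage list is released. */
+#if 0 /* illustration only */
+    myStageCtx *my_ctx = zcalloc(sizeof(myStageCtx));
+    my_ctx->cursor = 0;
+    addDefragStage(defragStageMyThing, zfree, my_ctx);
+#endif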
+
+void activeDefragCycle(void) {
+    if (!server.active_defrag_enabled) return;
+
+    /* Defrag gets paused while a child process is active. So there's no point in starting a new
+     * cycle or adjusting the CPU percentage for an existing cycle. */
+    if (hasActiveChildProcess()) return;
+
+    computeDefragCycles();
+
+    if (server.active_defrag_running > 0 && !defragIsRunning()) beginDefragCycle();
+}
+
+#else /* HAVE_DEFRAG */
+
+void activeDefragCycle(void) {
+    /* Not implemented yet. */
+}
+
+void *activeDefragAlloc(void *ptr) {
+    UNUSED(ptr);
+    return NULL;
+}
+
+void *activeDefragAllocRaw(size_t size) {
+    /* fallback to regular allocation */
+    return zmalloc(size);
+}
+
+void activeDefragFreeRaw(void *ptr) {
+    /* fallback to regular free */
+    zfree(ptr);
+}
+
+robj *activeDefragStringOb(robj *ob) {
+    UNUSED(ob);
+    return NULL;
+}
+
+void defragWhileBlocked(void) {
+}
+
+#endif
