diff options
| author | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:52:54 +0100 |
|---|---|---|
| committer | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:52:54 +0100 |
| commit | dcacc00e3750300617ba6e16eb346713f91a783a (patch) | |
| tree | 38e2d4fb5ed9d119711d4295c6eda4b014af73fd /examples/redis-unstable/src/defrag.c | |
| parent | 58dac10aeb8f5a041c46bddbeaf4c7966a99b998 (diff) | |
| download | crep-dcacc00e3750300617ba6e16eb346713f91a783a.tar.gz | |
Remove testing data
Diffstat (limited to 'examples/redis-unstable/src/defrag.c')
| -rw-r--r-- | examples/redis-unstable/src/defrag.c | 1985 |
1 files changed, 0 insertions, 1985 deletions
diff --git a/examples/redis-unstable/src/defrag.c b/examples/redis-unstable/src/defrag.c deleted file mode 100644 index f0bff15..0000000 --- a/examples/redis-unstable/src/defrag.c +++ /dev/null @@ -1,1985 +0,0 @@ -/* - * Active memory defragmentation - * Try to find key / value allocations that need to be re-allocated in order - * to reduce external fragmentation. - * We do that by scanning the keyspace and for each pointer we have, we can try to - * ask the allocator if moving it to a new address will help reduce fragmentation. - * - * Copyright (c) 2020-Present, Redis Ltd. - * All rights reserved. - * - * Copyright (c) 2024-present, Valkey contributors. - * All rights reserved. - * - * Licensed under your choice of (a) the Redis Source Available License 2.0 - * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the - * GNU Affero General Public License v3 (AGPLv3). - * - * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. - */ - -#include "server.h" -#include <stddef.h> -#include <math.h> - -#ifdef HAVE_DEFRAG - -#define DEFRAG_CYCLE_US 500 /* Standard duration of defrag cycle (in microseconds) */ - -typedef enum { DEFRAG_NOT_DONE = 0, - DEFRAG_DONE = 1 } doneStatus; - -/* - * Defragmentation is performed in stages. Each stage is serviced by a stage function - * (defragStageFn). The stage function is passed a context (void*) to defrag. The contents of that - * context are unique to the particular stage - and may even be NULL for some stage functions. The - * same stage function can be used multiple times (for different stages) each having a different - * context. - * - * Parameters: - * endtime - This is the monotonic time that the function should end and return. This ensures - * a bounded latency due to defrag. - * ctx - A pointer to context which is unique to the stage function. - * - * Returns: - * - DEFRAG_DONE if the stage is complete - * - DEFRAG_NOT_DONE if there is more work to do - */ -typedef doneStatus (*defragStageFn)(void *ctx, monotime endtime); - -/* Function pointer type for freeing context in defragmentation stages. */ -typedef void (*defragStageContextFreeFn)(void *ctx); -typedef struct { - defragStageFn stage_fn; /* The function to be invoked for the stage */ - defragStageContextFreeFn ctx_free_fn; /* Function to free the context */ - void *ctx; /* Context, unique to the stage function */ -} StageDescriptor; - -/* Globals needed for the main defrag processing logic. - * Doesn't include variables specific to a stage or type of data. */ -struct DefragContext { - monotime start_cycle; /* Time of beginning of defrag cycle */ - long long start_defrag_hits; /* server.stat_active_defrag_hits captured at beginning of cycle */ - long long start_defrag_misses; /* server.stat_active_defrag_misses captured at beginning of cycle */ - float start_frag_pct; /* Fragmention percent of beginning of defrag cycle */ - float decay_rate; /* Defrag speed decay rate */ - - list *remaining_stages; /* List of stages which remain to be processed */ - listNode *current_stage; /* The list node of stage that's currently being processed */ - - long long timeproc_id; /* Eventloop ID of the timerproc (or AE_DELETED_EVENT_ID) */ - monotime timeproc_end_time; /* Ending time of previous timerproc execution */ - long timeproc_overage_us; /* A correction value if over target CPU percent */ -}; -static struct DefragContext defrag = {0, 0, 0, 0, 1.0f}; - -#define ITER_SLOT_DEFRAG_LUT (-2) -#define ITER_SLOT_UNASSIGNED (-1) - -/* There are a number of stages which process a kvstore. To simplify this, a stage helper function - * `defragStageKvstoreHelper()` is defined. This function aids in iterating over the kvstore. It - * uses these definitions. - */ -/* State of the kvstore helper. The context passed to the kvstore helper MUST BEGIN - * with a kvstoreIterState (or be passed as NULL). */ -typedef struct { - kvstore *kvs; - int slot; /* Consider defines ITER_SLOT_XXX for special values. */ - unsigned long cursor; -} kvstoreIterState; -#define INIT_KVSTORE_STATE(kvs) ((kvstoreIterState){(kvs), ITER_SLOT_DEFRAG_LUT, 0}) - -/* The kvstore helper uses this function to perform tasks before continuing the iteration. For the - * main dictionary, large items are set aside and processed by this function before continuing with - * iteration over the kvstore. - * endtime - This is the monotonic time that the function should end and return. - * ctx - Context for functions invoked by the helper. If provided in the call to - * `defragStageKvstoreHelper()`, the `kvstoreIterState` portion (at the beginning) - * will be updated with the current kvstore iteration status. - * - * Returns: - * - DEFRAG_DONE if the pre-continue work is complete - * - DEFRAG_NOT_DONE if there is more work to do - */ -typedef doneStatus (*kvstoreHelperPreContinueFn)(void *ctx, monotime endtime); - -typedef struct { - kvstoreIterState kvstate; - int dbid; - - /* When scanning a main kvstore, large elements are queued for later handling rather than - * causing a large latency spike while processing a hash table bucket. This list is only used - * for stage: "defragStageDbKeys". It will only contain values for the current kvstore being - * defragged. - * Note that this is a list of key names. It's possible that the key may be deleted or modified - * before "later" and we will search by key name to find the entry when we defrag the item later. */ - list *defrag_later; - unsigned long defrag_later_cursor; -} defragKeysCtx; -static_assert(offsetof(defragKeysCtx, kvstate) == 0, "defragStageKvstoreHelper requires this"); - -/* Context for subexpires */ -typedef struct { - estore *subexpires; - int slot; /* Consider defines ITER_SLOT_XXX for special values. */ - int dbid; - unsigned long cursor; -} defragSubexpiresCtx; - -/* Context for pubsub kvstores */ -typedef dict *(*getClientChannelsFn)(client *); -typedef struct { - kvstoreIterState kvstate; - getClientChannelsFn getPubSubChannels; -} defragPubSubCtx; -static_assert(offsetof(defragPubSubCtx, kvstate) == 0, "defragStageKvstoreHelper requires this"); - -typedef struct { - sds module_name; - unsigned long cursor; -} defragModuleCtx; - -/* this method was added to jemalloc in order to help us understand which - * pointers are worthwhile moving and which aren't */ -int je_get_defrag_hint(void* ptr); - -#if !defined(DEBUG_DEFRAG_FORCE) -/* Defrag helper for generic allocations without freeing old pointer. - * - * Note: The caller is responsible for freeing the old pointer if this function - * returns a non-NULL value. */ -void* activeDefragAllocWithoutFree(void *ptr) { - size_t size; - void *newptr; - if(!je_get_defrag_hint(ptr)) { - server.stat_active_defrag_misses++; - return NULL; - } - /* move this allocation to a new allocation. - * make sure not to use the thread cache. so that we don't get back the same - * pointers we try to free */ - size = zmalloc_usable_size(ptr); - newptr = zmalloc_no_tcache(size); - memcpy(newptr, ptr, size); - server.stat_active_defrag_hits++; - return newptr; -} - -void activeDefragFree(void *ptr) { - zfree_no_tcache(ptr); -} - -/* Defrag helper for generic allocations. - * - * returns NULL in case the allocation wasn't moved. - * when it returns a non-null value, the old pointer was already released - * and should NOT be accessed. */ -void* activeDefragAlloc(void *ptr) { - void *newptr = activeDefragAllocWithoutFree(ptr); - if (newptr) - activeDefragFree(ptr); - return newptr; -} - -/* Raw memory allocation for defrag, avoid using tcache. */ -void *activeDefragAllocRaw(size_t size) { - return zmalloc_no_tcache(size); -} - -/* Raw memory free for defrag, avoid using tcache. */ -void activeDefragFreeRaw(void *ptr) { - activeDefragFree(ptr); - server.stat_active_defrag_hits++; -} -#else -void *activeDefragAllocWithoutFree(void *ptr) { - size_t size; - void *newptr; - size = zmalloc_usable_size(ptr); - newptr = zmalloc(size); - memcpy(newptr, ptr, size); - server.stat_active_defrag_hits++; - return newptr; -} - -void activeDefragFree(void *ptr) { - zfree(ptr); -} - -void *activeDefragAlloc(void *ptr) { - void *newptr = activeDefragAllocWithoutFree(ptr); - if (newptr) - activeDefragFree(ptr); - return newptr; -} - -void *activeDefragAllocRaw(size_t size) { - return zmalloc(size); -} - -void activeDefragFreeRaw(void *ptr) { - zfree(ptr); - server.stat_active_defrag_hits++; -} -#endif - -/*Defrag helper for sds strings - * - * returns NULL in case the allocation wasn't moved. - * when it returns a non-null value, the old pointer was already released - * and should NOT be accessed. */ -sds activeDefragSds(sds sdsptr) { - void* ptr = sdsAllocPtr(sdsptr); - void* newptr = activeDefragAlloc(ptr); - if (newptr) { - size_t offset = sdsptr - (char*)ptr; - sdsptr = (char*)newptr + offset; - return sdsptr; - } - return NULL; -} - -/* Defrag helper for hfield (entry) strings - * - * returns NULL in case the allocation wasn't moved. - * when it returns a non-null value, the old pointer was already released - * and should NOT be accessed. */ -Entry *activeDefragEntry(Entry *entry) { - Entry *ret = NULL; - - /* First, defrag the entry allocation itself */ - void *ptr = entryGetAllocPtr(entry); - void *newptr = activeDefragAlloc(ptr); - if (newptr) { - size_t offset = (char*)entry - (char*)ptr; - entry = (Entry *)((char*)newptr + offset); - ret = entry; - } - - /* Then defrag the value if it's not embedded (using the potentially new entry) */ - sds *valuePtr = entryGetValuePtrRef(entry); - if (valuePtr) { - sds new_value = activeDefragSds(*valuePtr); - if (new_value) *valuePtr = new_value; - } - - return ret; -} - -/* Defrag helper for hfield strings and update the reference in the dict. - * - * returns NULL in case the allocation wasn't moved. - * when it returns a non-null value, the old pointer was already released - * and should NOT be accessed. */ -void *activeDefragHfieldAndUpdateRef(void *ptr, void *privdata) { - dict *d = privdata; - dictEntryLink link; - - /* Before the key is released, obtain the link to - * ensure we can safely access and update the key. */ - link = dictFindLink(d, ptr, NULL); - serverAssert(link); - - Entry *newEntry = activeDefragEntry(ptr); - if (newEntry) - dictSetKeyAtLink(d, newEntry, &link, 0); - return newEntry; -} - -/* Defrag helper for robj and/or string objects with expected refcount. - * - * Like activeDefragStringOb, but it requires the caller to pass in the expected - * reference count. In some cases, the caller needs to update a robj whose - * reference count is not 1, in these cases, the caller must explicitly pass - * in the reference count, otherwise defragmentation will not be performed. - * Note that the caller is responsible for updating any other references to the robj. */ -robj *activeDefragStringObEx(robj* ob, int expected_refcount) { - robj *ret = NULL; - if (ob->refcount!=expected_refcount) - return NULL; - - /* try to defrag robj (only if not an EMBSTR type (handled below). */ - if (ob->type!=OBJ_STRING || ob->encoding!=OBJ_ENCODING_EMBSTR) { - if ((ret = activeDefragAlloc(ob))) { - ob = ret; - } - } - - /* try to defrag string object */ - if (ob->type == OBJ_STRING) { - if(ob->encoding==OBJ_ENCODING_RAW) { - sds newsds = activeDefragSds((sds)ob->ptr); - if (newsds) { - ob->ptr = newsds; - } - } else if (ob->encoding==OBJ_ENCODING_EMBSTR) { - /* The sds is embedded in the object allocation, calculate the - * offset and update the pointer in the new allocation. */ - long ofs = (intptr_t)ob->ptr - (intptr_t)ob; - if ((ret = activeDefragAlloc(ob))) { - ret->ptr = (void*)((intptr_t)ret + ofs); - } - } else if (ob->encoding!=OBJ_ENCODING_INT) { - serverPanic("Unknown string encoding"); - } - } - return ret; -} - -/* Defrag helper for robj and/or string objects - * - * returns NULL in case the allocation wasn't moved. - * when it returns a non-null value, the old pointer was already released - * and should NOT be accessed. */ -robj *activeDefragStringOb(robj* ob) { - return activeDefragStringObEx(ob, 1); -} - -/* Defrag helper for lua scripts - * - * returns NULL in case the allocation wasn't moved. - * when it returns a non-null value, the old pointer was already released - * and should NOT be accessed. */ -luaScript *activeDefragLuaScript(luaScript *script) { - luaScript *ret = NULL; - - /* try to defrag script struct */ - if ((ret = activeDefragAlloc(script))) { - script = ret; - } - - /* try to defrag actual script object */ - robj *ob = activeDefragStringOb(script->body); - if (ob) script->body = ob; - - return ret; -} - -/* Defrag helper for dict main allocations (dict struct, and hash tables). - * Receives a pointer to the dict* and return a new dict* when the dict - * struct itself was moved. - * - * Returns NULL in case the allocation wasn't moved. - * When it returns a non-null value, the old pointer was already released - * and should NOT be accessed. */ -dict *dictDefragTables(dict *d) { - dict *ret = NULL; - dictEntry **newtable; - /* handle the dict struct */ - if ((ret = activeDefragAlloc(d))) - d = ret; - /* handle the first hash table */ - if (!d->ht_table[0]) return ret; /* created but unused */ - newtable = activeDefragAlloc(d->ht_table[0]); - if (newtable) - d->ht_table[0] = newtable; - /* handle the second hash table */ - if (d->ht_table[1]) { - newtable = activeDefragAlloc(d->ht_table[1]); - if (newtable) - d->ht_table[1] = newtable; - } - return ret; -} - -/* Internal function used by activeDefragZsetNode */ -void zslUpdateNode(zskiplist *zsl, zskiplistNode *oldnode, zskiplistNode *newnode, zskiplistNode **update) { - int i; - for (i = 0; i < zsl->level; i++) { - if (update[i]->level[i].forward == oldnode) - update[i]->level[i].forward = newnode; - } - serverAssert(zsl->header!=oldnode); - if (newnode->level[0].forward) { - serverAssert(newnode->level[0].forward->backward==oldnode); - newnode->level[0].forward->backward = newnode; - } else { - serverAssert(zsl->tail==oldnode); - zsl->tail = newnode; - } -} - -/* Defrag a single zset node, update dictEntry and skiplist struct */ -void activeDefragZsetNode(zset *zs, dictEntry *de, dictEntryLink plink) { - zskiplistNode *znode = dictGetKey(de); - - /* Try to defrag the skiplist node first */ - zskiplistNode *newnode = activeDefragAllocWithoutFree(znode); - if (!newnode) return; /* No defrag needed */ - - /* Node was defragged, now we need to update all skiplist pointers */ - zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *iter; - int i; - double score = newnode->score; - sds ele = zslGetNodeElement(newnode); - - /* Find all pointers that need to be updated */ - iter = zs->zsl->header; - for (i = zs->zsl->level-1; i >= 0; i--) { - while (iter->level[i].forward && - iter->level[i].forward != znode && - zslCompareWithNode(score, ele, iter->level[i].forward) > 0) - iter = iter->level[i].forward; - update[i] = iter; - } - - /* Verify we found the right node */ - iter = iter->level[0].forward; - serverAssert(iter && iter == znode); - - /* Update all skiplist pointers and dict key */ - zslUpdateNode(zs->zsl, znode, newnode, update); - dictSetKeyAtLink(zs->dict, newnode, &plink, 0); - - /* Free the old node now that all pointers have been updated */ - activeDefragFree(znode); -} - -#define DEFRAG_SDS_DICT_NO_VAL 0 -#define DEFRAG_SDS_DICT_VAL_IS_SDS 1 -#define DEFRAG_SDS_DICT_VAL_IS_STROB 2 -#define DEFRAG_SDS_DICT_VAL_VOID_PTR 3 -#define DEFRAG_SDS_DICT_VAL_LUA_SCRIPT 4 - -void activeDefragSdsDictCallback(void *privdata, const dictEntry *de, dictEntryLink plink) { - UNUSED(plink); - UNUSED(privdata); - UNUSED(de); -} - -void activeDefragLuaScriptDictCallback(void *privdata, const dictEntry *de, dictEntryLink plink) { - UNUSED(plink); - UNUSED(privdata); - - /* If this luaScript is in the LRU list, unconditionally update the node's - * value pointer to the current dict key (regardless of reallocation). */ - luaScript *script = dictGetVal(de); - if (script->node) - script->node->value = dictGetKey(de); -} - -void activeDefragHfieldDictCallback(void *privdata, const dictEntry *de, dictEntryLink plink) { - dict *d = privdata; - Entry *newEntry = NULL, *entry = dictGetKey(de); - - /* If the hfield does not have TTL, we directly defrag it. - * Fields with TTL are skipped here and will be defragmented later - * during the hash expiry ebuckets defragmentation phase. */ - if (entryGetExpiry(entry) == EB_EXPIRE_TIME_INVALID) { - if ((newEntry = activeDefragEntry(entry))) { - /* Hash dicts use no_value=1, so we must use dictSetKeyAtLink */ - dictSetKeyAtLink(d, newEntry, &plink, 0); - } - } -} - -/* Defrag a dict with sds key and optional value (either ptr, sds or robj string) */ -void activeDefragSdsDict(dict* d, int val_type) { - unsigned long cursor = 0; - dictDefragFunctions defragfns = { - .defragAlloc = activeDefragAlloc, - .defragKey = (dictDefragAllocFunction *)activeDefragSds, - .defragVal = (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS ? (dictDefragAllocFunction *)activeDefragSds : - val_type == DEFRAG_SDS_DICT_VAL_IS_STROB ? (dictDefragAllocFunction *)activeDefragStringOb : - val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR ? (dictDefragAllocFunction *)activeDefragAlloc : - val_type == DEFRAG_SDS_DICT_VAL_LUA_SCRIPT ? (dictDefragAllocFunction *)activeDefragLuaScript : - NULL) - }; - dictScanFunction *fn = (val_type == DEFRAG_SDS_DICT_VAL_LUA_SCRIPT ? - activeDefragLuaScriptDictCallback : activeDefragSdsDictCallback); - do { - cursor = dictScanDefrag(d, cursor, fn, - &defragfns, NULL); - } while (cursor != 0); -} - -/* Defrag a dict with hfield key (no separate value - value is part of entry). */ -void activeDefragHfieldDict(dict *d) { - unsigned long cursor = 0; - dictDefragFunctions defragfns = { - .defragAlloc = activeDefragAlloc, /* Only defrag dictEntry */ - .defragKey = NULL, /* Will be defragmented in activeDefragHfieldDictCallback. */ - .defragVal = NULL /* No separate value - value is part of the entry (hfield). */ - }; - do { - cursor = dictScanDefrag(d, cursor, activeDefragHfieldDictCallback, - &defragfns, d); - } while (cursor != 0); - - /* Continue with defragmentation of hash fields that have with TTL. - * During the dictionary defragmentaion above, we skipped fields with TTL, - * Now we continue to defrag those fields by using the expiry buckets. */ - if (d->type == &entryHashDictTypeWithHFE) { - cursor = 0; - ebDefragFunctions eb_defragfns = { - .defragAlloc = activeDefragAlloc, - .defragItem = activeDefragHfieldAndUpdateRef - }; - ebuckets *eb = hashTypeGetDictMetaHFE(d); - while (ebScanDefrag(eb, &hashFieldExpireBucketsType, &cursor, &eb_defragfns, d)) {} - } -} - -/* Defrag a list of ptr, sds or robj string values */ -void activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) { - quicklistNode *newnode, *node = *node_ref; - unsigned char *newzl; - if ((newnode = activeDefragAlloc(node))) { - if (newnode->prev) - newnode->prev->next = newnode; - else - ql->head = newnode; - if (newnode->next) - newnode->next->prev = newnode; - else - ql->tail = newnode; - *node_ref = node = newnode; - } - if ((newzl = activeDefragAlloc(node->entry))) - node->entry = newzl; -} - -void activeDefragQuickListNodes(quicklist *ql) { - quicklistNode *node = ql->head; - while (node) { - activeDefragQuickListNode(ql, &node); - node = node->next; - } -} - -/* when the value has lots of elements, we want to handle it later and not as - * part of the main dictionary scan. this is needed in order to prevent latency - * spikes when handling large items */ -void defragLater(defragKeysCtx *ctx, kvobj *kv) { - if (!ctx->defrag_later) { - ctx->defrag_later = listCreate(); - listSetFreeMethod(ctx->defrag_later, sdsfreegeneric); - ctx->defrag_later_cursor = 0; - } - sds key = sdsdup(kvobjGetKey(kv)); - listAddNodeTail(ctx->defrag_later, key); -} - -/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */ -long scanLaterList(robj *ob, unsigned long *cursor, monotime endtime) { - quicklist *ql = ob->ptr; - quicklistNode *node; - long iterations = 0; - int bookmark_failed = 0; - serverAssert(ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST); - - if (*cursor == 0) { - /* if cursor is 0, we start new iteration */ - node = ql->head; - } else { - node = quicklistBookmarkFind(ql, "_AD"); - if (!node) { - /* if the bookmark was deleted, it means we reached the end. */ - *cursor = 0; - return 0; - } - node = node->next; - } - - (*cursor)++; - while (node) { - activeDefragQuickListNode(ql, &node); - server.stat_active_defrag_scanned++; - if (++iterations > 128 && !bookmark_failed) { - if (getMonotonicUs() > endtime) { - if (!quicklistBookmarkCreate(&ql, "_AD", node)) { - bookmark_failed = 1; - } else { - ob->ptr = ql; /* bookmark creation may have re-allocated the quicklist */ - return 1; - } - } - iterations = 0; - } - node = node->next; - } - quicklistBookmarkDelete(ql, "_AD"); - *cursor = 0; - return bookmark_failed? 1: 0; -} - -typedef struct { - zset *zs; -} scanLaterZsetData; - -void scanZsetCallback(void *privdata, const dictEntry *_de, dictEntryLink plink) { - dictEntry *de = (dictEntry*)_de; - scanLaterZsetData *data = privdata; - activeDefragZsetNode(data->zs, de, plink); - server.stat_active_defrag_scanned++; -} - -void scanLaterZset(robj *ob, unsigned long *cursor) { - serverAssert(ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST); - zset *zs = (zset*)ob->ptr; - dict *d = zs->dict; - scanLaterZsetData data = {zs}; - dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc}; - *cursor = dictScanDefrag(d, *cursor, scanZsetCallback, &defragfns, &data); -} - -/* Used as scan callback when all the work is done in the dictDefragFunctions. */ -void scanCallbackCountScanned(void *privdata, const dictEntry *de, dictEntryLink plink) { - UNUSED(plink); - UNUSED(privdata); - UNUSED(de); - server.stat_active_defrag_scanned++; -} - -void scanLaterSet(robj *ob, unsigned long *cursor) { - serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT); - dict *d = ob->ptr; - dictDefragFunctions defragfns = { - .defragAlloc = activeDefragAlloc, - .defragKey = (dictDefragAllocFunction *)activeDefragSds - }; - *cursor = dictScanDefrag(d, *cursor, scanCallbackCountScanned, &defragfns, NULL); -} - -void scanLaterHash(robj *ob, unsigned long *cursor) { - serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT); - dict *d = ob->ptr; - - typedef enum { - HASH_DEFRAG_NONE = 0, - HASH_DEFRAG_DICT = 1, - HASH_DEFRAG_EBUCKETS = 2 - } hashDefragPhase; - static hashDefragPhase defrag_phase = HASH_DEFRAG_NONE; - - /* Start a new hash defrag. */ - if (!*cursor || defrag_phase == HASH_DEFRAG_NONE) - defrag_phase = HASH_DEFRAG_DICT; - - /* Defrag hash dictionary but skip TTL fields. */ - if (defrag_phase == HASH_DEFRAG_DICT) { - dictDefragFunctions defragfns = { - .defragAlloc = activeDefragAlloc, - .defragKey = NULL, /* Will be defragmented in activeDefragHfieldDictCallback. */ - .defragVal = NULL /* value stored along with key as part of Entry */ - }; - *cursor = dictScanDefrag(d, *cursor, activeDefragHfieldDictCallback, &defragfns, d); - - /* Move to next phase. */ - if (!*cursor) defrag_phase = HASH_DEFRAG_EBUCKETS; - } - - /* Defrag ebuckets and TTL fields. */ - if (defrag_phase == HASH_DEFRAG_EBUCKETS) { - if (d->type == &entryHashDictTypeWithHFE) { - ebDefragFunctions eb_defragfns = { - .defragAlloc = activeDefragAlloc, - .defragItem = activeDefragHfieldAndUpdateRef - }; - ebuckets *eb = hashTypeGetDictMetaHFE(d); - ebScanDefrag(eb, &hashFieldExpireBucketsType, cursor, &eb_defragfns, d); - } else { - /* Finish defragmentation if this dict doesn't have expired fields. */ - *cursor = 0; - } - if (!*cursor) defrag_phase = HASH_DEFRAG_NONE; - } -} - -void defragQuicklist(defragKeysCtx *ctx, kvobj *kv) { - quicklist *ql = kv->ptr, *newql; - serverAssert(kv->type == OBJ_LIST && kv->encoding == OBJ_ENCODING_QUICKLIST); - if ((newql = activeDefragAlloc(ql))) - kv->ptr = ql = newql; - if (ql->len > server.active_defrag_max_scan_fields) - defragLater(ctx, kv); - else - activeDefragQuickListNodes(ql); -} - -void defragZsetSkiplist(defragKeysCtx *ctx, kvobj *ob) { - zset *zs = (zset*)ob->ptr; - zset *newzs; - zskiplist *newzsl; - dict *newdict; - struct zskiplistNode *newheader; - serverAssert(ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST); - if ((newzs = activeDefragAlloc(zs))) - ob->ptr = zs = newzs; - if ((newzsl = activeDefragAlloc(zs->zsl))) - zs->zsl = newzsl; - if ((newheader = activeDefragAlloc(zs->zsl->header))) - zs->zsl->header = newheader; - if (dictSize(zs->dict) > server.active_defrag_max_scan_fields) - defragLater(ctx, ob); - else { - /* Use dictScanDefrag to iterate and defrag both dictEntry structures and skiplist nodes. - * dictScanDefrag handles defragging dictEntry/dictEntryNoValue structures via defragfns, - * and calls our callback with plink for each entry so we can defrag skiplist nodes. */ - scanLaterZsetData data = {zs}; - dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc}; - unsigned long cursor = 0; - do { - cursor = dictScanDefrag(zs->dict, cursor, scanZsetCallback, &defragfns, &data); - } while (cursor != 0); - } - /* defrag the dict struct and tables */ - if ((newdict = dictDefragTables(zs->dict))) - zs->dict = newdict; -} - -void defragHash(defragKeysCtx *ctx, kvobj *ob) { - dict *d, *newd; - serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT); - d = ob->ptr; - if (dictSize(d) > server.active_defrag_max_scan_fields) - defragLater(ctx, ob); - else - activeDefragHfieldDict(d); - /* defrag the dict struct and tables */ - if ((newd = dictDefragTables(ob->ptr))) - ob->ptr = newd; -} - -void defragSet(defragKeysCtx *ctx, kvobj *ob) { - dict *d, *newd; - serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT); - d = ob->ptr; - if (dictSize(d) > server.active_defrag_max_scan_fields) - defragLater(ctx, ob); - else - activeDefragSdsDict(d, DEFRAG_SDS_DICT_NO_VAL); - /* defrag the dict struct and tables */ - if ((newd = dictDefragTables(ob->ptr))) - ob->ptr = newd; -} - -/* Defrag callback for radix tree iterator, called for each node, - * used in order to defrag the nodes allocations. */ -int defragRaxNode(raxNode **noderef, void *privdata) { - UNUSED(privdata); - raxNode *newnode = activeDefragAlloc(*noderef); - if (newnode) { - *noderef = newnode; - return 1; - } - return 0; -} - -/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */ -int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, monotime endtime) { - static unsigned char next[sizeof(streamID)]; - raxIterator ri; - long iterations = 0; - serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM); - - stream *s = ob->ptr; - raxStart(&ri,s->rax); - if (*cursor == 0) { - /* if cursor is 0, we start new iteration */ - defragRaxNode(&s->rax->head, NULL); - /* assign the iterator node callback before the seek, so that the - * initial nodes that are processed till the first item are covered */ - ri.node_cb = defragRaxNode; - raxSeek(&ri,"^",NULL,0); - } else { - /* if cursor is non-zero, we seek to the static 'next'. - * Since node_cb is set after seek operation, any node traversed during seek wouldn't - * be defragmented. To prevent this, we advance to next node before exiting previous - * run, ensuring it gets defragmented instead of being skipped during current seek. */ - if (!raxSeek(&ri,">=", next, sizeof(next))) { - *cursor = 0; - raxStop(&ri); - return 0; - } - /* assign the iterator node callback after the seek, so that the - * initial nodes that are processed till now aren't covered */ - ri.node_cb = defragRaxNode; - } - - (*cursor)++; - while (raxNext(&ri)) { - void *newdata = activeDefragAlloc(ri.data); - if (newdata) - raxSetData(ri.node, ri.data=newdata); - server.stat_active_defrag_scanned++; - if (++iterations > 128) { - if (getMonotonicUs() > endtime) { - /* Move to next node. */ - if (!raxNext(&ri)) { - /* If we reached the end, we can stop */ - *cursor = 0; - raxStop(&ri); - return 0; - } - serverAssert(ri.key_len==sizeof(next)); - memcpy(next,ri.key,ri.key_len); - raxStop(&ri); - return 1; - } - iterations = 0; - } - } - raxStop(&ri); - *cursor = 0; - return 0; -} - -/* optional callback used defrag each rax element (not including the element pointer itself) */ -typedef void *(raxDefragFunction)(raxIterator *ri, void *privdata); - -/* defrag radix tree including: - * 1) rax struct - * 2) rax nodes - * 3) rax entry data (only if defrag_data is specified) - * 4) call a callback per element, and allow the callback to return a new pointer for the element */ -void defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_cb, void *element_cb_data) { - raxIterator ri; - rax* rax; - if ((rax = activeDefragAlloc(*raxref))) - *raxref = rax; - rax = *raxref; - raxStart(&ri,rax); - ri.node_cb = defragRaxNode; - defragRaxNode(&rax->head, NULL); - raxSeek(&ri,"^",NULL,0); - while (raxNext(&ri)) { - void *newdata = NULL; - if (element_cb) - newdata = element_cb(&ri, element_cb_data); - if (defrag_data && !newdata) - newdata = activeDefragAlloc(ri.data); - if (newdata) - raxSetData(ri.node, ri.data=newdata); - } - raxStop(&ri); -} - -typedef struct { - streamCG *cg; - streamConsumer *c; -} PendingEntryContext; - -void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata) { - PendingEntryContext *ctx = privdata; - streamNACK *nack = ri->data, *newnack; - nack->consumer = ctx->c; /* update nack pointer to consumer */ - nack->cgroup_ref_node->value = ctx->cg; /* Update the value of cgroups_ref node to the consumer group. */ - newnack = activeDefragAlloc(nack); - if (newnack) { - /* Update consumer group pointer to the nack. - * pel_by_time doesn't need updating since delivery time is unchanged. */ - void *prev; - raxInsert(ctx->cg->pel, ri->key, ri->key_len, newnack, &prev); - serverAssert(prev==nack); - } - return newnack; -} - -typedef struct { - stream *s; - streamCG *cg; -} StreamConsumerContext; - -void* defragStreamConsumer(raxIterator *ri, void *privdata) { - StreamConsumerContext *ctx = privdata; - stream *s = ctx->s; - streamCG *cg = ctx->cg; - streamConsumer *c = ri->data; - void *newc = activeDefragAlloc(c); - if (newc) { - c = newc; - } - sds newsds = activeDefragSds(c->name); - if (newsds) - c->name = newsds; - if (c->pel) { - /* Update pel back-pointer to new stream */ - c->pel->alloc_size = &s->alloc_size; - PendingEntryContext pel_ctx = {cg, c}; - defragRadixTree(&c->pel, 0, defragStreamConsumerPendingEntry, &pel_ctx); - } - return newc; /* returns NULL if c was not defragged */ -} - -void* defragStreamConsumerGroup(raxIterator *ri, void *privdata) { - stream *s = privdata; - streamCG *newcg, *cg = ri->data; - if ((newcg = activeDefragAlloc(cg))) - cg = newcg; - if (cg->pel) { - /* Update pel back-pointer to new stream */ - cg->pel->alloc_size = &s->alloc_size; - defragRadixTree(&cg->pel, 0, NULL, NULL); - } - if (cg->pel_by_time) { - /* Update pel_by_time back-pointer to new stream */ - cg->pel_by_time->alloc_size = &s->alloc_size; - defragRadixTree(&cg->pel_by_time, 0, NULL, NULL); - } - if (cg->consumers) { - /* Update consumers back-pointer to new stream */ - cg->consumers->alloc_size = &s->alloc_size; - StreamConsumerContext consumer_ctx = {s, cg}; - defragRadixTree(&cg->consumers, 0, defragStreamConsumer, &consumer_ctx); - } - return cg; -} - -/* Defrag a single idmpProducer's dict and linked list entries. */ -static void defragIdmpProducer(idmpProducer *producer) { - if (producer->idmp_dict == NULL) return; - - dict *newdict = dictDefragTables(producer->idmp_dict); - if (newdict) - producer->idmp_dict = newdict; - - idmpEntry *prev = NULL; - idmpEntry *entry = producer->idmp_head; - while (entry != NULL) { - idmpEntry *next = entry->next; - idmpEntry *newentry = activeDefragAllocWithoutFree(entry); - if (newentry) { - dictEntry *de = dictFind(producer->idmp_dict, entry); - serverAssert(de); - dictSetKey(producer->idmp_dict, de, newentry); - if (prev) - prev->next = newentry; - else - producer->idmp_head = newentry; - if (producer->idmp_tail == entry) - producer->idmp_tail = newentry; - activeDefragFree(entry); - entry = newentry; - } - prev = entry; - entry = next; - } -} - -/* Defrag all IDMP producers and their dict/linked list entries. */ -void defragStreamIdmpProducers(stream *s) { - if (s->idmp_producers == NULL) return; - - /* Defrag the producers rax tree itself */ - rax *newrax = activeDefragAlloc(s->idmp_producers); - if (newrax) - s->idmp_producers = newrax; - - /* Defrag the rax head node */ - defragRaxNode(&s->idmp_producers->head, NULL); - - /* Iterate through all producers and defrag each one */ - raxIterator ri; - raxStart(&ri, s->idmp_producers); - /* Set the node callback to defrag internal rax nodes */ - ri.node_cb = defragRaxNode; - raxSeek(&ri, "^", NULL, 0); - while (raxNext(&ri)) { - idmpProducer *producer = ri.data; - idmpProducer *newproducer = activeDefragAlloc(producer); - if (newproducer) { - raxSetData(ri.node, ri.data=newproducer); - producer = newproducer; - } - defragIdmpProducer(producer); - } - raxStop(&ri); -} - -void defragStream(defragKeysCtx *ctx, kvobj *ob) { - serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM); - stream *s = ob->ptr, *news; - - /* handle the main struct */ - if ((news = activeDefragAlloc(s))) - ob->ptr = s = news; - - /* Update rax back-pointer to new stream */ - s->rax->alloc_size = &s->alloc_size; - if (raxSize(s->rax) > server.active_defrag_max_scan_fields) { - rax *newrax = activeDefragAlloc(s->rax); - if (newrax) - s->rax = newrax; - defragLater(ctx, ob); - } else - defragRadixTree(&s->rax, 1, NULL, NULL); - - if (s->cgroups) { - /* Update cgroups back-pointer to new stream */ - s->cgroups->alloc_size = &s->alloc_size; - defragRadixTree(&s->cgroups, 0, defragStreamConsumerGroup, s); - } - - if (s->cgroups_ref) { - /* Update cgroups_ref back-pointer to new stream */ - s->cgroups_ref->alloc_size = &s->alloc_size; - } - - if (s->idmp_producers) { - /* Defrag the producers and all idmpEntry structures in their linked lists */ - defragStreamIdmpProducers(s); - } -} - -/* Defrag a module key. This is either done immediately or scheduled - * for later. Returns then number of pointers defragged. - */ -void defragModule(defragKeysCtx *ctx, redisDb *db, kvobj *kv) { - serverAssert(kv->type == OBJ_MODULE); - robj keyobj; - initStaticStringObject(keyobj, kvobjGetKey(kv)); - if (!moduleDefragValue(&keyobj, kv, db->id)) - defragLater(ctx, kv); -} - -/* Defrag a kvobj structure, taking into account optional preceding metadata. - * For EMBSTR strings, also defrags the embedded string value in the same allocation. - * For RAW strings and other types, only the kvobj wrapper is defragged here; - * the value's internal data structures are defragged separately in defragKey(). - * - * Returns NULL if the allocation wasn't moved. - * When it returns a non-null value, the old pointer was already released - * (unless without_free is set) and should NOT be accessed. */ -robj *activeDefragKvobj(kvobj* kv, int without_free) { - void *alloc, *newalloc; - kvobj *kvNew = NULL; - /* Use LONG_MIN as sentinel to detect if we have an EMBSTR string */ - long offsetEmbstr = LONG_MIN; - - /* Don't defrag kvobj's with multiple references (refcount > 1) */ - if (kv->refcount != 1) - return NULL; - - /* Calculate offset for EMBSTR strings */ - if ((kv->type == OBJ_STRING) && (kv->encoding == OBJ_ENCODING_EMBSTR)) - offsetEmbstr = (intptr_t)kv->ptr - (intptr_t)kv; - - /* Defrag the kvobj allocation (including optional metadata prefix). - * For EMBSTR strings, this allocation also contains the embedded string data, - * so we'll need to recalculate the ptr offset after defragmentation (see below). */ - - alloc = kvobjGetAllocPtr(kv); - size_t metaBytes = (char *)kv - (char *)alloc; - if (without_free) - newalloc = activeDefragAllocWithoutFree(alloc); - else - newalloc = activeDefragAlloc(alloc); - - if (!newalloc) - return NULL; - - /* Update kv pointer to new allocation */ - kvNew = (kvobj *)((char *)newalloc + metaBytes); - - /* For EMBSTR strings, recalculate ptr to point to the embedded string data - * at the same offset within the new allocation */ - if (offsetEmbstr != LONG_MIN) - kvNew->ptr = (void*)((intptr_t)kvNew + offsetEmbstr); - - return kvNew; -} - -/* for each key we scan in the main dict, this function will attempt to defrag - * all the various pointers it has. */ -void defragKey(defragKeysCtx *ctx, dictEntry *de, dictEntryLink link) { - UNUSED(link); - dictEntryLink exlink = NULL; - kvobj *kvnew = NULL, *ob = dictGetKV(de); - size_t oldsize = 0; - redisDb *db = &server.db[ctx->dbid]; - int slot = ctx->kvstate.slot; - unsigned char *newzl; - - if (server.memory_tracking_per_slot) - oldsize = kvobjAllocSize(ob); - - long long expire = kvobjGetExpire(ob); - /* We can't search in db->expires for that KV after we've released - * the pointer it holds, since it won't be able to do the string - * compare. Search it before, if needed. */ - if (expire != -1) { - exlink = kvstoreDictFindLink(db->expires, slot, kvobjGetKey(ob), NULL); - serverAssert(exlink != NULL); - } - - /* Try to defrag robj. For hash objects with HFEs, - * defer defragmentation until processing db's subexpires. */ - if (!(ob->type == OBJ_HASH && hashTypeGetMinExpire(ob, 0) != EB_EXPIRE_TIME_INVALID)) { - /* If the dict doesn't have metadata, we directly defrag it. */ - kvnew = activeDefragKvobj(ob, 0); - } - if (kvnew) { - kvstoreDictSetAtLink(db->keys, slot, kvnew, &link, 0); - if (expire != -1) - kvstoreDictSetAtLink(db->expires, slot, kvnew, &exlink, 0); - ob = kvnew; - } - - if (ob->type == OBJ_STRING) { - /* Only defrag strings with refcount==1 (String might be shared as dict - * keys, e.g. pub/sub channels, and may be accessed by IO threads. Other - * types are never used as dict keys) */ - if ((ob->refcount==1) && (ob->encoding == OBJ_ENCODING_RAW)) { - /* For RAW strings, defrag the separate SDS allocation */ - sds newsds = activeDefragSds((sds)ob->ptr); - if (newsds) ob->ptr = newsds; - } - } else if (ob->type == OBJ_LIST) { - if (ob->encoding == OBJ_ENCODING_QUICKLIST) { - defragQuicklist(ctx, ob); - } else if (ob->encoding == OBJ_ENCODING_LISTPACK) { - if ((newzl = activeDefragAlloc(ob->ptr))) - ob->ptr = newzl; - } else { - serverPanic("Unknown list encoding"); - } - } else if (ob->type == OBJ_SET) { - if (ob->encoding == OBJ_ENCODING_HT) { - defragSet(ctx, ob); - } else if (ob->encoding == OBJ_ENCODING_INTSET || - ob->encoding == OBJ_ENCODING_LISTPACK) - { - void *newptr, *ptr = ob->ptr; - if ((newptr = activeDefragAlloc(ptr))) - ob->ptr = newptr; - } else { - serverPanic("Unknown set encoding"); - } - } else if (ob->type == OBJ_ZSET) { - if (ob->encoding == OBJ_ENCODING_LISTPACK) { - if ((newzl = activeDefragAlloc(ob->ptr))) - ob->ptr = newzl; - } else if (ob->encoding == OBJ_ENCODING_SKIPLIST) { - defragZsetSkiplist(ctx, ob); - } else { - serverPanic("Unknown sorted set encoding"); - } - } else if (ob->type == OBJ_HASH) { - if (ob->encoding == OBJ_ENCODING_LISTPACK) { - if ((newzl = activeDefragAlloc(ob->ptr))) - ob->ptr = newzl; - } else if (ob->encoding == OBJ_ENCODING_LISTPACK_EX) { - listpackEx *newlpt, *lpt = (listpackEx*)ob->ptr; - if ((newlpt = activeDefragAlloc(lpt))) - ob->ptr = lpt = newlpt; - if ((newzl = activeDefragAlloc(lpt->lp))) - lpt->lp = newzl; - } else if (ob->encoding == OBJ_ENCODING_HT) { - defragHash(ctx, ob); - } else { - serverPanic("Unknown hash encoding"); - } - } else if (ob->type == OBJ_STREAM) { - defragStream(ctx, ob); - } else if (ob->type == OBJ_MODULE) { - defragModule(ctx,db, ob); - } else { - serverPanic("Unknown object type"); - } - if (server.memory_tracking_per_slot) - updateSlotAllocSize(db, slot, oldsize, kvobjAllocSize(ob)); -} - -/* Defrag scan callback for the main db dictionary. */ -static void dbKeysScanCallback(void *privdata, const dictEntry *de, dictEntryLink plink) { - long long hits_before = server.stat_active_defrag_hits; - defragKey((defragKeysCtx *)privdata, (dictEntry *)de, plink); - if (server.stat_active_defrag_hits != hits_before) - server.stat_active_defrag_key_hits++; - else - server.stat_active_defrag_key_misses++; - server.stat_active_defrag_scanned++; -} - -#if !defined(DEBUG_DEFRAG_FORCE) -/* Utility function to get the fragmentation ratio from jemalloc. - * It is critical to do that by comparing only heap maps that belong to - * jemalloc, and skip ones the jemalloc keeps as spare. Since we use this - * fragmentation ratio in order to decide if a defrag action should be taken - * or not, a false detection can cause the defragmenter to waste a lot of CPU - * without the possibility of getting any results. */ -float getAllocatorFragmentation(size_t *out_frag_bytes) { - size_t resident, active, allocated, frag_smallbins_bytes; - zmalloc_get_allocator_info(1, &allocated, &active, &resident, NULL, NULL, &frag_smallbins_bytes); - - if (server.lua_arena != UINT_MAX) { - size_t lua_resident, lua_active, lua_allocated, lua_frag_smallbins_bytes; - zmalloc_get_allocator_info_by_arena(server.lua_arena, 0, &lua_allocated, &lua_active, &lua_resident, &lua_frag_smallbins_bytes); - resident -= lua_resident; - active -= lua_active; - allocated -= lua_allocated; - frag_smallbins_bytes -= lua_frag_smallbins_bytes; - } - - /* Calculate the fragmentation ratio as the proportion of wasted memory in small - * bins (which are defraggable) relative to the total allocated memory (including large bins). - * This is because otherwise, if most of the memory usage is large bins, we may show high percentage, - * despite the fact it's not a lot of memory for the user. */ - float frag_pct = (float)frag_smallbins_bytes / allocated * 100; - float rss_pct = ((float)resident / allocated)*100 - 100; - size_t rss_bytes = resident - allocated; - if(out_frag_bytes) - *out_frag_bytes = frag_smallbins_bytes; - serverLog(LL_DEBUG, - "allocated=%zu, active=%zu, resident=%zu, frag=%.2f%% (%.2f%% rss), frag_bytes=%zu (%zu rss)", - allocated, active, resident, frag_pct, rss_pct, frag_smallbins_bytes, rss_bytes); - return frag_pct; -} -#else -float getAllocatorFragmentation(size_t *out_frag_bytes) { - if (out_frag_bytes) - *out_frag_bytes = SIZE_MAX; - return 99; /* The maximum percentage of fragmentation */ -} -#endif - -/* Defrag scan callback for the pubsub dictionary. */ -void defragPubsubScanCallback(void *privdata, const dictEntry *de, dictEntryLink plink) { - UNUSED(plink); - defragPubSubCtx *ctx = privdata; - kvstore *pubsub_channels = ctx->kvstate.kvs; - robj *newchannel, *channel = dictGetKey(de); - dict *newclients, *clients = dictGetVal(de); - - /* Try to defrag the channel name. */ - serverAssert(channel->refcount == (int)dictSize(clients) + 1); - newchannel = activeDefragStringObEx(channel, dictSize(clients) + 1); - if (newchannel) { - kvstoreDictSetKey(pubsub_channels, ctx->kvstate.slot, (dictEntry*)de, newchannel); - - /* The channel name is shared by the client's pubsub(shard) and server's - * pubsub(shard), after defraging the channel name, we need to update - * the reference in the clients' dictionary. */ - dictIterator di; - dictEntry *clientde; - dictInitIterator(&di, clients); - while((clientde = dictNext(&di)) != NULL) { - client *c = dictGetKey(clientde); - dict *client_channels = ctx->getPubSubChannels(c); - uint64_t hash = dictGetHash(client_channels, newchannel); - dictEntry *pubsub_channel = dictFindByHashAndPtr(client_channels, channel, hash); - serverAssert(pubsub_channel); - dictSetKey(ctx->getPubSubChannels(c), pubsub_channel, newchannel); - } - dictResetIterator(&di); - } - - /* Try to defrag the dictionary of clients that is stored as the value part. */ - if ((newclients = dictDefragTables(clients))) - kvstoreDictSetVal(pubsub_channels, ctx->kvstate.slot, (dictEntry *)de, newclients); - - server.stat_active_defrag_scanned++; -} - -/* returns 0 more work may or may not be needed (see non-zero cursor), - * and 1 if time is up and more work is needed. */ -int defragLaterItem(kvobj *ob, unsigned long *cursor, monotime endtime, int dbid) { - if (ob) { - if (ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST) { - return scanLaterList(ob, cursor, endtime); - } else if (ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT) { - scanLaterSet(ob, cursor); - } else if (ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST) { - scanLaterZset(ob, cursor); - } else if (ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT) { - scanLaterHash(ob, cursor); - } else if (ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM) { - return scanLaterStreamListpacks(ob, cursor, endtime); - } else if (ob->type == OBJ_MODULE) { - robj keyobj; - initStaticStringObject(keyobj, kvobjGetKey(ob)); - return moduleLateDefrag(&keyobj, ob, cursor, endtime, dbid); - } else { - *cursor = 0; /* object type/encoding may have changed since we schedule it for later */ - } - } else { - *cursor = 0; /* object may have been deleted already */ - } - return 0; -} - -static int defragIsRunning(void) { - return (defrag.timeproc_id > 0); -} - -/* A kvstoreHelperPreContinueFn */ -static doneStatus defragLaterStep(void *ctx, monotime endtime) { - defragKeysCtx *defrag_keys_ctx = ctx; - redisDb *db = &server.db[defrag_keys_ctx->dbid]; - int slot = defrag_keys_ctx->kvstate.slot; - size_t oldsize = 0; - - unsigned int iterations = 0; - unsigned long long prev_defragged = server.stat_active_defrag_hits; - unsigned long long prev_scanned = server.stat_active_defrag_scanned; - - while (defrag_keys_ctx->defrag_later && listLength(defrag_keys_ctx->defrag_later) > 0) { - listNode *head = listFirst(defrag_keys_ctx->defrag_later); - sds key = head->value; - dictEntry *de = kvstoreDictFind(defrag_keys_ctx->kvstate.kvs, defrag_keys_ctx->kvstate.slot, key); - kvobj *kv = de ? dictGetKV(de) : NULL; - - long long key_defragged = server.stat_active_defrag_hits; - if (server.memory_tracking_per_slot && kv) - oldsize = kvobjAllocSize(kv); - int timeout = (defragLaterItem(kv, &defrag_keys_ctx->defrag_later_cursor, endtime, defrag_keys_ctx->dbid) == 1); - if (server.memory_tracking_per_slot && kv) - updateSlotAllocSize(db, slot, oldsize, kvobjAllocSize(kv)); - if (key_defragged != server.stat_active_defrag_hits) { - server.stat_active_defrag_key_hits++; - } else { - server.stat_active_defrag_key_misses++; - } - - if (timeout) break; - - if (defrag_keys_ctx->defrag_later_cursor == 0) { - /* the item is finished, move on */ - listDelNode(defrag_keys_ctx->defrag_later, head); - } - - if (++iterations > 16 || server.stat_active_defrag_hits - prev_defragged > 512 || - server.stat_active_defrag_scanned - prev_scanned > 64) { - if (getMonotonicUs() > endtime) break; - iterations = 0; - prev_defragged = server.stat_active_defrag_hits; - prev_scanned = server.stat_active_defrag_scanned; - } - } - - return (!defrag_keys_ctx->defrag_later || listLength(defrag_keys_ctx->defrag_later) == 0) ? DEFRAG_DONE : DEFRAG_NOT_DONE; -} - -#define INTERPOLATE(x, x1, x2, y1, y2) ( (y1) + ((x)-(x1)) * ((y2)-(y1)) / ((x2)-(x1)) ) -#define LIMIT(y, min, max) ((y)<(min)? min: ((y)>(max)? max: (y))) - -/* decide if defrag is needed, and at what CPU effort to invest in it */ -void computeDefragCycles(void) { - size_t frag_bytes; - float frag_pct = getAllocatorFragmentation(&frag_bytes); - /* If we're not already running, and below the threshold, exit. */ - if (!server.active_defrag_running) { - if(frag_pct < server.active_defrag_threshold_lower || frag_bytes < server.active_defrag_ignore_bytes) - return; - } - - /* Calculate the adaptive aggressiveness of the defrag based on the current - * fragmentation and configurations. */ - int cpu_pct = INTERPOLATE(frag_pct, - server.active_defrag_threshold_lower, - server.active_defrag_threshold_upper, - server.active_defrag_cycle_min, - server.active_defrag_cycle_max); - cpu_pct *= defrag.decay_rate; - cpu_pct = LIMIT(cpu_pct, - server.active_defrag_cycle_min, - server.active_defrag_cycle_max); - - /* Normally we allow increasing the aggressiveness during a scan, but don't - * reduce it, since we should not lower the aggressiveness when fragmentation - * drops. But when a configuration is made, we should reconsider it. */ - if (cpu_pct > server.active_defrag_running || - server.active_defrag_configuration_changed) - { - server.active_defrag_configuration_changed = 0; - if (defragIsRunning()) { - serverLog(LL_VERBOSE, "Changing active defrag CPU, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%", - frag_pct, frag_bytes, cpu_pct); - } else { - serverLog(LL_VERBOSE, - "Starting active defrag, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%", - frag_pct, frag_bytes, cpu_pct); - } - server.active_defrag_running = cpu_pct; - } -} - -/* This helper function handles most of the work for iterating over a kvstore. 'privdata', if - * provided, MUST begin with 'kvstoreIterState' and this part is automatically updated by this - * function during the iteration. */ -static doneStatus defragStageKvstoreHelper(monotime endtime, - void *ctx, - dictScanFunction scan_fn, - kvstoreHelperPreContinueFn precontinue_fn, - dictDefragFunctions *defragfns) -{ - unsigned int iterations = 0; - unsigned long long prev_defragged = server.stat_active_defrag_hits; - unsigned long long prev_scanned = server.stat_active_defrag_scanned; - kvstoreIterState *state = (kvstoreIterState*)ctx; - - if (state->slot == ITER_SLOT_DEFRAG_LUT) { - /* Before we start scanning the kvstore, handle the main structures */ - do { - state->cursor = kvstoreDictLUTDefrag(state->kvs, state->cursor, dictDefragTables); - if (getMonotonicUs() >= endtime) return DEFRAG_NOT_DONE; - } while (state->cursor != 0); - state->slot = ITER_SLOT_UNASSIGNED; - } - - while (1) { - if (++iterations > 16 || server.stat_active_defrag_hits - prev_defragged > 512 || server.stat_active_defrag_scanned - prev_scanned > 64) { - if (getMonotonicUs() >= endtime) break; - iterations = 0; - prev_defragged = server.stat_active_defrag_hits; - prev_scanned = server.stat_active_defrag_scanned; - } - - if (precontinue_fn) { - if (precontinue_fn(ctx, endtime) == DEFRAG_NOT_DONE) return DEFRAG_NOT_DONE; - } - - if (!state->cursor) { - /* If there's no cursor, we're ready to begin a new kvstore slot. */ - if (state->slot == ITER_SLOT_UNASSIGNED) { - state->slot = kvstoreGetFirstNonEmptyDictIndex(state->kvs); - } else { - state->slot = kvstoreGetNextNonEmptyDictIndex(state->kvs, state->slot); - } - - if (state->slot == ITER_SLOT_UNASSIGNED) return DEFRAG_DONE; - } - - /* Whatever privdata's actual type, this function requires that it begins with kvstoreIterState. */ - state->cursor = kvstoreDictScanDefrag(state->kvs, state->slot, state->cursor, - scan_fn, defragfns, ctx); - } - - return DEFRAG_NOT_DONE; -} - -static doneStatus defragStageDbKeys(void *ctx, monotime endtime) { - defragKeysCtx *defrag_keys_ctx = ctx; - redisDb *db = &server.db[defrag_keys_ctx->dbid]; - if (db->keys != defrag_keys_ctx->kvstate.kvs) { - /* There has been a change of the kvs (flushdb, swapdb, etc.). Just complete the stage. */ - return DEFRAG_DONE; - } - - /* Note: for DB keys, we use the start/finish callback to fix an expires table entry if - * the main DB entry has been moved. */ - static dictDefragFunctions defragfns = { - .defragAlloc = activeDefragAlloc, - .defragKey = NULL, /* Handled by dbKeysScanCallback */ - .defragVal = NULL, /* Handled by dbKeysScanCallback */ - }; - - return defragStageKvstoreHelper(endtime, ctx, - dbKeysScanCallback, defragLaterStep, &defragfns); -} - -static doneStatus defragStageExpiresKvstore(void *ctx, monotime endtime) { - defragKeysCtx *defrag_keys_ctx = ctx; - redisDb *db = &server.db[defrag_keys_ctx->dbid]; - if (db->expires != defrag_keys_ctx->kvstate.kvs) { - /* There has been a change of the kvs (flushdb, swapdb, etc.). Just complete the stage. */ - return DEFRAG_DONE; - } - - static dictDefragFunctions defragfns = { - .defragAlloc = activeDefragAlloc, - .defragKey = NULL, /* Not needed for expires (just a ref) */ - .defragVal = NULL, /* Not needed for expires (no value) */ - }; - return defragStageKvstoreHelper(endtime, ctx, - scanCallbackCountScanned, NULL, &defragfns); -} - -/* Defrag (hash) object with subexpiry and update its reference in the DB keys. */ -void *activeDefragSubexpiresOB(void *ptr, void *privdata) { - redisDb *db = privdata; - dictEntryLink link, exlink = NULL; - kvobj *newkv, *kv = ptr; - sds keystr = kvobjGetKey(kv); - unsigned int slot = calculateKeySlot(keystr); - - serverAssert(kv->type == OBJ_HASH); /* Currently relevant only for hashes */ - - long long expire = kvobjGetExpire(kv); - /* We can't search in db->expires for that KV after we've released - * the pointer it holds, since it won't be able to do the string - * compare. Search it before, if needed. */ - if (expire != -1) { - exlink = kvstoreDictFindLink(db->expires, slot, keystr, NULL); - serverAssert(exlink != NULL); - } - - if ((newkv = activeDefragKvobj(kv, 1))) { - /* Update its reference in the DB keys. */ - link = kvstoreDictFindLink(db->keys, slot, keystr, NULL); - serverAssert(link != NULL); - kvstoreDictSetAtLink(db->keys, slot, newkv, &link, 0); - if (expire != -1) - kvstoreDictSetAtLink(db->expires, slot, newkv, &exlink, 0); - activeDefragFree(kvobjGetAllocPtr(kv)); - } - return newkv; -} - -static doneStatus defragStageSubexpires(void *ctx, monotime endtime) { - unsigned int iterations = 0; - unsigned long long prev_defragged = server.stat_active_defrag_hits; - unsigned long long prev_scanned = server.stat_active_defrag_scanned; - defragSubexpiresCtx *subctx = ctx; - redisDb *db = &server.db[subctx->dbid]; - estore *subexpires = db->subexpires; - - /* If estore changed (flushdb, swapdb, etc.), Just complete the stage. */ - if (db->subexpires != subctx->subexpires) { - return DEFRAG_DONE; - } - - ebDefragFunctions eb_defragfns = { - .defragAlloc = activeDefragAlloc, - .defragItem = activeDefragSubexpiresOB - }; - - while (1) { - if (++iterations > 16 || - server.stat_active_defrag_hits - prev_defragged > 512 || - server.stat_active_defrag_scanned - prev_scanned > 64) - { - if (getMonotonicUs() >= endtime) break; - iterations = 0; - prev_defragged = server.stat_active_defrag_hits; - prev_scanned = server.stat_active_defrag_scanned; - } - - /* If there's no cursor, we're ready to begin a new estore slot. */ - if (!subctx->cursor) { - if (subctx->slot == ITER_SLOT_UNASSIGNED) { - subctx->slot = estoreGetFirstNonEmptyBucket(subexpires); - } else { - subctx->slot = estoreGetNextNonEmptyBucket(subexpires, subctx->slot); - } - - if (subctx->slot == ITER_SLOT_UNASSIGNED) return DEFRAG_DONE; - } - - /* Get the ebuckets for the current slot and scan it */ - ebuckets *bucket = estoreGetBuckets(subexpires, subctx->slot); - if (!ebScanDefrag(bucket, &subexpiresBucketsType, &subctx->cursor, &eb_defragfns, db)) - subctx->cursor = 0; /* Reset cursor to move to next slot */ - } - - return DEFRAG_NOT_DONE; -} - -static doneStatus defragStagePubsubKvstore(void *ctx, monotime endtime) { - static dictDefragFunctions defragfns = { - .defragAlloc = activeDefragAlloc, - .defragKey = NULL, /* Handled by defragPubsubScanCallback */ - .defragVal = NULL, /* Not needed for expires (no value) */ - }; - - return defragStageKvstoreHelper(endtime, ctx, - defragPubsubScanCallback, NULL, &defragfns); -} - -static doneStatus defragLuaScripts(void *ctx, monotime endtime) { - UNUSED(endtime); - UNUSED(ctx); - activeDefragSdsDict(evalScriptsDict(), DEFRAG_SDS_DICT_VAL_LUA_SCRIPT); - return DEFRAG_DONE; -} - -/* Handles defragmentation of module global data. This is a stage function - * that gets called periodically during the active defragmentation process. */ -static doneStatus defragModuleGlobals(void *ctx, monotime endtime) { - defragModuleCtx *defrag_module_ctx = ctx; - - RedisModule *module = moduleGetHandleByName(defrag_module_ctx->module_name); - if (!module) { - /* Module has been unloaded, nothing to defrag. */ - return DEFRAG_DONE; - } - /* Interval shouldn't exceed 1 hour */ - serverAssert(!endtime || llabs((long long)endtime - (long long)getMonotonicUs()) < 60*60*1000*1000LL); - - /* Call appropriate version of module's defrag callback: - * 1. Version 2 (defrag_cb_2): Supports incremental defrag and returns whether more work is needed - * 2. Version 1 (defrag_cb): Legacy version, performs all work in one call. - * Note: V1 doesn't support incremental defragmentation, may block for longer periods. */ - RedisModuleDefragCtx defrag_ctx = { endtime, &defrag_module_ctx->cursor, NULL, -1, -1, -1 }; - if (module->defrag_cb_2) { - return module->defrag_cb_2(&defrag_ctx) ? DEFRAG_NOT_DONE : DEFRAG_DONE; - } else if (module->defrag_cb) { - module->defrag_cb(&defrag_ctx); - return DEFRAG_DONE; - } else { - redis_unreachable(); - } -} - -static void freeDefragKeysContext(void *ctx) { - defragKeysCtx *defrag_keys_ctx = ctx; - if (defrag_keys_ctx->defrag_later) { - listRelease(defrag_keys_ctx->defrag_later); - } - zfree(defrag_keys_ctx); -} - -static void freeDefragModelContext(void *ctx) { - defragModuleCtx *defrag_model_ctx = ctx; - sdsfree(defrag_model_ctx->module_name); - zfree(defrag_model_ctx); -} - -static void freeDefragContext(void *ptr) { - StageDescriptor *stage = ptr; - if (stage->ctx_free_fn) - stage->ctx_free_fn(stage->ctx); - zfree(stage); -} - -static void addDefragStage(defragStageFn stage_fn, defragStageContextFreeFn ctx_free_fn, void *ctx) { - StageDescriptor *stage = zmalloc(sizeof(StageDescriptor)); - stage->stage_fn = stage_fn; - stage->ctx_free_fn = ctx_free_fn; - stage->ctx = ctx; - listAddNodeTail(defrag.remaining_stages, stage); -} - -/* Updates the defrag decay rate based on the observed effectiveness of the defrag process. - * The decay rate is used to gradually slow down defrag when it's not being effective. */ -static void updateDefragDecayRate(float frag_pct) { - long long last_hits = server.stat_active_defrag_hits - defrag.start_defrag_hits; - long long last_misses = server.stat_active_defrag_misses - defrag.start_defrag_misses; - float last_frag_pct_change = defrag.start_frag_pct - frag_pct; - /* When defragmentation efficiency is low, we gradually reduce the - * speed for the next cycle to avoid CPU waste. However, in the - * following two cases, we keep the normal speed: - * 1) If the fragmentation percentage has increased or decreased by more than 2%. - * 2) If the fragmentation percentage decrease is small, but hits are above 1%, - * we still keep the normal speed. */ - if (fabs(last_frag_pct_change) > 2 || - (last_frag_pct_change < 0 && last_hits >= (last_hits + last_misses) * 0.01)) - { - defrag.decay_rate = 1.0f; - } else { - defrag.decay_rate *= 0.9; - } -} - -/* Called at the end of a complete defrag cycle, or when defrag is terminated */ -static void endDefragCycle(int normal_termination) { - if (normal_termination) { - /* For normal termination, we expect... */ - serverAssert(!defrag.current_stage); - serverAssert(listLength(defrag.remaining_stages) == 0); - } else { - /* Defrag is being terminated abnormally */ - aeDeleteTimeEvent(server.el, defrag.timeproc_id); - - if (defrag.current_stage) { - listDelNode(defrag.remaining_stages, defrag.current_stage); - defrag.current_stage = NULL; - } - } - defrag.timeproc_id = AE_DELETED_EVENT_ID; - - listRelease(defrag.remaining_stages); - defrag.remaining_stages = NULL; - - size_t frag_bytes; - float frag_pct = getAllocatorFragmentation(&frag_bytes); - serverLog(LL_VERBOSE, "Active defrag done in %dms, reallocated=%d, frag=%.0f%%, frag_bytes=%zu", - (int)elapsedMs(defrag.start_cycle), (int)(server.stat_active_defrag_hits - defrag.start_defrag_hits), - frag_pct, frag_bytes); - - server.stat_total_active_defrag_time += elapsedUs(server.stat_last_active_defrag_time); - server.stat_last_active_defrag_time = 0; - server.active_defrag_running = 0; - - updateDefragDecayRate(frag_pct); - moduleDefragEnd(); - - /* Immediately check to see if we should start another defrag cycle. */ - activeDefragCycle(); -} - -/* Must be called at the start of the timeProc as it measures the delay from the end of the previous - * timeProc invocation when performing the computation. */ -static int computeDefragCycleUs(void) { - long dutyCycleUs; - - int targetCpuPercent = server.active_defrag_running; - serverAssert(targetCpuPercent > 0 && targetCpuPercent < 100); - - static int prevCpuPercent = 0; /* STATIC - this persists */ - if (targetCpuPercent != prevCpuPercent) { - /* If the targetCpuPercent changes, the value might be different from when the last wait - * time was computed. In this case, don't consider wait time. (This is really only an - * issue in crazy tests that dramatically increase CPU while defrag is running.) */ - defrag.timeproc_end_time = 0; - prevCpuPercent = targetCpuPercent; - } - - /* Given when the last duty cycle ended, compute time needed to achieve the desired percentage. */ - if (defrag.timeproc_end_time == 0) { - /* Either the first call to the timeProc, or we were paused for some reason. */ - defrag.timeproc_overage_us = 0; - dutyCycleUs = DEFRAG_CYCLE_US; - } else { - long waitedUs = getMonotonicUs() - defrag.timeproc_end_time; - /* Given the elapsed wait time between calls, compute the necessary duty time needed to - * achieve the desired CPU percentage. - * With: D = duty time, W = wait time, P = percent - * Solve: D P - * ----- = ----- - * D + W 100 - * Solving for D: - * D = P * W / (100 - P) - * - * Note that dutyCycleUs addresses starvation. If the wait time was long, we will compensate - * with a proportionately long duty-cycle. This won't significantly affect perceived - * latency, because clients are already being impacted by the long cycle time which caused - * the starvation of the timer. */ - dutyCycleUs = targetCpuPercent * waitedUs / (100 - targetCpuPercent); - - /* Also adjust for any accumulated overage. */ - dutyCycleUs -= defrag.timeproc_overage_us; - defrag.timeproc_overage_us = 0; - - if (dutyCycleUs < DEFRAG_CYCLE_US) { - /* We never reduce our cycle time, that would increase overhead. Instead, we track this - * as part of the overage, and increase wait time between cycles. */ - defrag.timeproc_overage_us = DEFRAG_CYCLE_US - dutyCycleUs; - dutyCycleUs = DEFRAG_CYCLE_US; - } else if (dutyCycleUs > DEFRAG_CYCLE_US * 10) { - /* Add a time limit for the defrag duty cycle to prevent excessive latency. - * When latency is already high (indicated by a long time between calls), - * we don't want to make it worse by running defrag for too long. */ - dutyCycleUs = DEFRAG_CYCLE_US * 10; - } - } - return dutyCycleUs; -} - -/* Must be called at the end of the timeProc as it records the timeproc_end_time for use in the next - * computeDefragCycleUs computation. */ -static int computeDelayMs(monotime intendedEndtime) { - defrag.timeproc_end_time = getMonotonicUs(); - long overage = defrag.timeproc_end_time - intendedEndtime; - defrag.timeproc_overage_us += overage; /* track over/under desired CPU */ - /* Allow negative overage (underage) to count against existing overage, but don't allow - * underage (from short stages) to be accumulated. */ - if (defrag.timeproc_overage_us < 0) defrag.timeproc_overage_us = 0; - - int targetCpuPercent = server.active_defrag_running; - serverAssert(targetCpuPercent > 0 && targetCpuPercent < 100); - - /* Given the desired duty cycle, what inter-cycle delay do we need to achieve that? */ - /* We want to achieve a specific CPU percent. To do that, we can't use a skewed computation. */ - /* Example, if we run for 1ms and delay 10ms, that's NOT 10%, because the total cycle time is 11ms. */ - /* Instead, if we rum for 1ms, our total time should be 10ms. So the delay is only 9ms. */ - long totalCycleTimeUs = DEFRAG_CYCLE_US * 100 / targetCpuPercent; - long delayUs = totalCycleTimeUs - DEFRAG_CYCLE_US; - /* Only increase delay by the fraction of the overage that would be non-duty-cycle */ - delayUs += defrag.timeproc_overage_us * (100 - targetCpuPercent) / 100; - if (delayUs < 0) delayUs = 0; - long delayMs = delayUs / 1000; /* round down */ - return delayMs; -} - -/* An independent time proc for defrag. While defrag is running, this is called much more often - * than the server cron. Frequent short calls provides low latency impact. */ -static int activeDefragTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData) { - UNUSED(eventLoop); - UNUSED(id); - UNUSED(clientData); - - /* This timer shouldn't be registered unless there's work to do. */ - serverAssert(defrag.current_stage || listLength(defrag.remaining_stages) > 0); - - if (!server.active_defrag_enabled) { - /* Defrag has been disabled while running */ - endDefragCycle(0); - return AE_NOMORE; - } - - if (hasActiveChildProcess()) { - /* If there's a child process, pause the defrag, polling until the child completes. */ - defrag.timeproc_end_time = 0; /* prevent starvation recovery */ - return 100; - } - - monotime starttime = getMonotonicUs(); - int dutyCycleUs = computeDefragCycleUs(); -#if defined(DEBUG_DEFRAG_FULLY) - dutyCycleUs = 30*1000*1000LL; /* 30 seconds */ -#endif - monotime endtime = starttime + dutyCycleUs; - int haveMoreWork = 1; - - mstime_t latency; - latencyStartMonitor(latency); - - do { - if (!defrag.current_stage) { - defrag.current_stage = listFirst(defrag.remaining_stages); - } - - StageDescriptor *stage = listNodeValue(defrag.current_stage); - doneStatus status = stage->stage_fn(stage->ctx, endtime); - if (status == DEFRAG_DONE) { - listDelNode(defrag.remaining_stages, defrag.current_stage); - defrag.current_stage = NULL; - } - - haveMoreWork = (defrag.current_stage || listLength(defrag.remaining_stages) > 0); - /* If we've completed a stage early, and still have a standard time allotment remaining, - * we'll start another stage. This can happen when defrag is running infrequently, and - * starvation protection has increased the duty-cycle. */ - } while (haveMoreWork && getMonotonicUs() <= endtime - DEFRAG_CYCLE_US); - - latencyEndMonitor(latency); - latencyAddSampleIfNeeded("active-defrag-cycle", latency); - - if (haveMoreWork) { - return computeDelayMs(endtime); - } else { - endDefragCycle(1); - return AE_NOMORE; /* Ends the timer proc */ - } -} - -/* During long running scripts, or while loading, there is a periodic function for handling other - * actions. This interface allows defrag to continue running, avoiding a single long defrag step - * after the long operation completes. */ -void defragWhileBlocked(void) { - /* This is called infrequently, while timers are not active. We might need to start defrag. */ - if (!defragIsRunning()) activeDefragCycle(); - - if (!defragIsRunning()) return; - - /* Save off the timeproc_id. If we have a normal termination, it will be cleared. */ - long long timeproc_id = defrag.timeproc_id; - - /* Simulate a single call of the timer proc */ - long long reschedule_delay = activeDefragTimeProc(NULL, 0, NULL); - if (reschedule_delay == AE_NOMORE) { - /* If it's done, deregister the timer */ - aeDeleteTimeEvent(server.el, timeproc_id); - } - /* Otherwise, just ignore the reschedule_delay, the timer will pop the next time that the - * event loop can process timers again. */ -} - -static void beginDefragCycle(void) { - serverAssert(!defragIsRunning()); - - moduleDefragStart(); - - serverAssert(defrag.remaining_stages == NULL); - defrag.remaining_stages = listCreate(); - listSetFreeMethod(defrag.remaining_stages, freeDefragContext); - - for (int dbid = 0; dbid < server.dbnum; dbid++) { - redisDb *db = &server.db[dbid]; - - /* Add stage for keys. */ - defragKeysCtx *defrag_keys_ctx = zcalloc(sizeof(defragKeysCtx)); - defrag_keys_ctx->kvstate = INIT_KVSTORE_STATE(db->keys); - defrag_keys_ctx->dbid = dbid; - addDefragStage(defragStageDbKeys, freeDefragKeysContext, defrag_keys_ctx); - - /* Add stage for expires. */ - defragKeysCtx *defrag_expires_ctx = zcalloc(sizeof(defragKeysCtx)); - defrag_expires_ctx->kvstate = INIT_KVSTORE_STATE(db->expires); - defrag_expires_ctx->dbid = dbid; - addDefragStage(defragStageExpiresKvstore, freeDefragKeysContext, defrag_expires_ctx); - - /* Add stage for subexpires. */ - defragSubexpiresCtx *defrag_subexpires_ctx = zcalloc(sizeof(defragSubexpiresCtx)); - defrag_subexpires_ctx->subexpires = db->subexpires; - defrag_subexpires_ctx->slot = ITER_SLOT_UNASSIGNED; - defrag_subexpires_ctx->cursor = 0; - defrag_subexpires_ctx->dbid = dbid; - addDefragStage(defragStageSubexpires, zfree, defrag_subexpires_ctx); - } - - /* Add stage for pubsub channels. */ - defragPubSubCtx *defrag_pubsub_ctx = zmalloc(sizeof(defragPubSubCtx)); - defrag_pubsub_ctx->kvstate = INIT_KVSTORE_STATE(server.pubsub_channels); - defrag_pubsub_ctx->getPubSubChannels = getClientPubSubChannels; - addDefragStage(defragStagePubsubKvstore, zfree, defrag_pubsub_ctx); - - /* Add stage for pubsubshard channels. */ - defragPubSubCtx *defrag_pubsubshard_ctx = zmalloc(sizeof(defragPubSubCtx)); - defrag_pubsubshard_ctx->kvstate = INIT_KVSTORE_STATE(server.pubsubshard_channels); - defrag_pubsubshard_ctx->getPubSubChannels = getClientPubSubShardChannels; - addDefragStage(defragStagePubsubKvstore, zfree, defrag_pubsubshard_ctx); - - addDefragStage(defragLuaScripts, NULL, NULL); - - /* Add stages for modules. */ - dictIterator di; - dictEntry *de; - dictInitIterator(&di, modules); - while ((de = dictNext(&di)) != NULL) { - struct RedisModule *module = dictGetVal(de); - if (module->defrag_cb || module->defrag_cb_2) { - defragModuleCtx *ctx = zmalloc(sizeof(defragModuleCtx)); - ctx->cursor = 0; - ctx->module_name = sdsnew(module->name); - addDefragStage(defragModuleGlobals, freeDefragModelContext, ctx); - } - } - dictResetIterator(&di); - - defrag.current_stage = NULL; - defrag.start_cycle = getMonotonicUs(); - defrag.start_defrag_hits = server.stat_active_defrag_hits; - defrag.start_defrag_misses = server.stat_active_defrag_misses; - defrag.start_frag_pct = getAllocatorFragmentation(NULL); - defrag.timeproc_end_time = 0; - defrag.timeproc_overage_us = 0; - defrag.timeproc_id = aeCreateTimeEvent(server.el, 0, activeDefragTimeProc, NULL, NULL); - - elapsedStart(&server.stat_last_active_defrag_time); -} - -void activeDefragCycle(void) { - if (!server.active_defrag_enabled) return; - - /* Defrag gets paused while a child process is active. So there's no point in starting a new - * cycle or adjusting the CPU percentage for an existing cycle. */ - if (hasActiveChildProcess()) return; - - computeDefragCycles(); - - if (server.active_defrag_running > 0 && !defragIsRunning()) beginDefragCycle(); -} - -#else /* HAVE_DEFRAG */ - -void activeDefragCycle(void) { - /* Not implemented yet. */ -} - -void *activeDefragAlloc(void *ptr) { - UNUSED(ptr); - return NULL; -} - -void *activeDefragAllocRaw(size_t size) { - /* fallback to regular allocation */ - return zmalloc(size); -} - -void activeDefragFreeRaw(void *ptr) { - /* fallback to regular free */ - zfree(ptr); -} - -robj *activeDefragStringOb(robj *ob) { - UNUSED(ob); - return NULL; -} - -void defragWhileBlocked(void) { -} - -#endif |
