summaryrefslogtreecommitdiff
path: root/examples/redis-unstable/src/defrag.c
diff options
context:
space:
mode:
Diffstat (limited to 'examples/redis-unstable/src/defrag.c')
-rw-r--r--examples/redis-unstable/src/defrag.c1985
1 files changed, 1985 insertions, 0 deletions
diff --git a/examples/redis-unstable/src/defrag.c b/examples/redis-unstable/src/defrag.c
new file mode 100644
index 0000000..f0bff15
--- /dev/null
+++ b/examples/redis-unstable/src/defrag.c
@@ -0,0 +1,1985 @@
+/*
+ * Active memory defragmentation
+ * Try to find key / value allocations that need to be re-allocated in order
+ * to reduce external fragmentation.
+ * We do that by scanning the keyspace and for each pointer we have, we can try to
+ * ask the allocator if moving it to a new address will help reduce fragmentation.
+ *
+ * Copyright (c) 2020-Present, Redis Ltd.
+ * All rights reserved.
+ *
+ * Copyright (c) 2024-present, Valkey contributors.
+ * All rights reserved.
+ *
+ * Licensed under your choice of (a) the Redis Source Available License 2.0
+ * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
+ * GNU Affero General Public License v3 (AGPLv3).
+ *
+ * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
+ */
+
+#include "server.h"
+#include <stddef.h>
+#include <math.h>
+
+#ifdef HAVE_DEFRAG
+
+#define DEFRAG_CYCLE_US 500 /* Standard duration of defrag cycle (in microseconds) */
+
+typedef enum { DEFRAG_NOT_DONE = 0,
+ DEFRAG_DONE = 1 } doneStatus;
+
+/*
+ * Defragmentation is performed in stages. Each stage is serviced by a stage function
+ * (defragStageFn). The stage function is passed a context (void*) to defrag. The contents of that
+ * context are unique to the particular stage - and may even be NULL for some stage functions. The
+ * same stage function can be used multiple times (for different stages) each having a different
+ * context.
+ *
+ * Parameters:
+ * endtime - This is the monotonic time that the function should end and return. This ensures
+ * a bounded latency due to defrag.
+ * ctx - A pointer to context which is unique to the stage function.
+ *
+ * Returns:
+ * - DEFRAG_DONE if the stage is complete
+ * - DEFRAG_NOT_DONE if there is more work to do
+ */
+typedef doneStatus (*defragStageFn)(void *ctx, monotime endtime);
+
+/* Function pointer type for freeing context in defragmentation stages. */
+typedef void (*defragStageContextFreeFn)(void *ctx);
+typedef struct {
+ defragStageFn stage_fn; /* The function to be invoked for the stage */
+ defragStageContextFreeFn ctx_free_fn; /* Function to free the context */
+ void *ctx; /* Context, unique to the stage function */
+} StageDescriptor;
+
+/* Globals needed for the main defrag processing logic.
+ * Doesn't include variables specific to a stage or type of data. */
+struct DefragContext {
+ monotime start_cycle; /* Time of beginning of defrag cycle */
+ long long start_defrag_hits; /* server.stat_active_defrag_hits captured at beginning of cycle */
+ long long start_defrag_misses; /* server.stat_active_defrag_misses captured at beginning of cycle */
+ float start_frag_pct; /* Fragmention percent of beginning of defrag cycle */
+ float decay_rate; /* Defrag speed decay rate */
+
+ list *remaining_stages; /* List of stages which remain to be processed */
+ listNode *current_stage; /* The list node of stage that's currently being processed */
+
+ long long timeproc_id; /* Eventloop ID of the timerproc (or AE_DELETED_EVENT_ID) */
+ monotime timeproc_end_time; /* Ending time of previous timerproc execution */
+ long timeproc_overage_us; /* A correction value if over target CPU percent */
+};
+static struct DefragContext defrag = {0, 0, 0, 0, 1.0f};
+
+#define ITER_SLOT_DEFRAG_LUT (-2)
+#define ITER_SLOT_UNASSIGNED (-1)
+
+/* There are a number of stages which process a kvstore. To simplify this, a stage helper function
+ * `defragStageKvstoreHelper()` is defined. This function aids in iterating over the kvstore. It
+ * uses these definitions.
+ */
+/* State of the kvstore helper. The context passed to the kvstore helper MUST BEGIN
+ * with a kvstoreIterState (or be passed as NULL). */
+typedef struct {
+ kvstore *kvs;
+ int slot; /* Consider defines ITER_SLOT_XXX for special values. */
+ unsigned long cursor;
+} kvstoreIterState;
+#define INIT_KVSTORE_STATE(kvs) ((kvstoreIterState){(kvs), ITER_SLOT_DEFRAG_LUT, 0})
+
+/* The kvstore helper uses this function to perform tasks before continuing the iteration. For the
+ * main dictionary, large items are set aside and processed by this function before continuing with
+ * iteration over the kvstore.
+ * endtime - This is the monotonic time that the function should end and return.
+ * ctx - Context for functions invoked by the helper. If provided in the call to
+ * `defragStageKvstoreHelper()`, the `kvstoreIterState` portion (at the beginning)
+ * will be updated with the current kvstore iteration status.
+ *
+ * Returns:
+ * - DEFRAG_DONE if the pre-continue work is complete
+ * - DEFRAG_NOT_DONE if there is more work to do
+ */
+typedef doneStatus (*kvstoreHelperPreContinueFn)(void *ctx, monotime endtime);
+
+typedef struct {
+ kvstoreIterState kvstate;
+ int dbid;
+
+ /* When scanning a main kvstore, large elements are queued for later handling rather than
+ * causing a large latency spike while processing a hash table bucket. This list is only used
+ * for stage: "defragStageDbKeys". It will only contain values for the current kvstore being
+ * defragged.
+ * Note that this is a list of key names. It's possible that the key may be deleted or modified
+ * before "later" and we will search by key name to find the entry when we defrag the item later. */
+ list *defrag_later;
+ unsigned long defrag_later_cursor;
+} defragKeysCtx;
+static_assert(offsetof(defragKeysCtx, kvstate) == 0, "defragStageKvstoreHelper requires this");
+
+/* Context for subexpires */
+typedef struct {
+ estore *subexpires;
+ int slot; /* Consider defines ITER_SLOT_XXX for special values. */
+ int dbid;
+ unsigned long cursor;
+} defragSubexpiresCtx;
+
+/* Context for pubsub kvstores */
+typedef dict *(*getClientChannelsFn)(client *);
+typedef struct {
+ kvstoreIterState kvstate;
+ getClientChannelsFn getPubSubChannels;
+} defragPubSubCtx;
+static_assert(offsetof(defragPubSubCtx, kvstate) == 0, "defragStageKvstoreHelper requires this");
+
+typedef struct {
+ sds module_name;
+ unsigned long cursor;
+} defragModuleCtx;
+
+/* this method was added to jemalloc in order to help us understand which
+ * pointers are worthwhile moving and which aren't */
+int je_get_defrag_hint(void* ptr);
+
+#if !defined(DEBUG_DEFRAG_FORCE)
+/* Defrag helper for generic allocations without freeing old pointer.
+ *
+ * Note: The caller is responsible for freeing the old pointer if this function
+ * returns a non-NULL value. */
+void* activeDefragAllocWithoutFree(void *ptr) {
+ size_t size;
+ void *newptr;
+ if(!je_get_defrag_hint(ptr)) {
+ server.stat_active_defrag_misses++;
+ return NULL;
+ }
+ /* move this allocation to a new allocation.
+ * make sure not to use the thread cache. so that we don't get back the same
+ * pointers we try to free */
+ size = zmalloc_usable_size(ptr);
+ newptr = zmalloc_no_tcache(size);
+ memcpy(newptr, ptr, size);
+ server.stat_active_defrag_hits++;
+ return newptr;
+}
+
+void activeDefragFree(void *ptr) {
+ zfree_no_tcache(ptr);
+}
+
+/* Defrag helper for generic allocations.
+ *
+ * returns NULL in case the allocation wasn't moved.
+ * when it returns a non-null value, the old pointer was already released
+ * and should NOT be accessed. */
+void* activeDefragAlloc(void *ptr) {
+ void *newptr = activeDefragAllocWithoutFree(ptr);
+ if (newptr)
+ activeDefragFree(ptr);
+ return newptr;
+}
+
+/* Raw memory allocation for defrag, avoid using tcache. */
+void *activeDefragAllocRaw(size_t size) {
+ return zmalloc_no_tcache(size);
+}
+
+/* Raw memory free for defrag, avoid using tcache. */
+void activeDefragFreeRaw(void *ptr) {
+ activeDefragFree(ptr);
+ server.stat_active_defrag_hits++;
+}
+#else
+void *activeDefragAllocWithoutFree(void *ptr) {
+ size_t size;
+ void *newptr;
+ size = zmalloc_usable_size(ptr);
+ newptr = zmalloc(size);
+ memcpy(newptr, ptr, size);
+ server.stat_active_defrag_hits++;
+ return newptr;
+}
+
+void activeDefragFree(void *ptr) {
+ zfree(ptr);
+}
+
+void *activeDefragAlloc(void *ptr) {
+ void *newptr = activeDefragAllocWithoutFree(ptr);
+ if (newptr)
+ activeDefragFree(ptr);
+ return newptr;
+}
+
+void *activeDefragAllocRaw(size_t size) {
+ return zmalloc(size);
+}
+
+void activeDefragFreeRaw(void *ptr) {
+ zfree(ptr);
+ server.stat_active_defrag_hits++;
+}
+#endif
+
+/*Defrag helper for sds strings
+ *
+ * returns NULL in case the allocation wasn't moved.
+ * when it returns a non-null value, the old pointer was already released
+ * and should NOT be accessed. */
+sds activeDefragSds(sds sdsptr) {
+ void* ptr = sdsAllocPtr(sdsptr);
+ void* newptr = activeDefragAlloc(ptr);
+ if (newptr) {
+ size_t offset = sdsptr - (char*)ptr;
+ sdsptr = (char*)newptr + offset;
+ return sdsptr;
+ }
+ return NULL;
+}
+
+/* Defrag helper for hfield (entry) strings
+ *
+ * returns NULL in case the allocation wasn't moved.
+ * when it returns a non-null value, the old pointer was already released
+ * and should NOT be accessed. */
+Entry *activeDefragEntry(Entry *entry) {
+ Entry *ret = NULL;
+
+ /* First, defrag the entry allocation itself */
+ void *ptr = entryGetAllocPtr(entry);
+ void *newptr = activeDefragAlloc(ptr);
+ if (newptr) {
+ size_t offset = (char*)entry - (char*)ptr;
+ entry = (Entry *)((char*)newptr + offset);
+ ret = entry;
+ }
+
+ /* Then defrag the value if it's not embedded (using the potentially new entry) */
+ sds *valuePtr = entryGetValuePtrRef(entry);
+ if (valuePtr) {
+ sds new_value = activeDefragSds(*valuePtr);
+ if (new_value) *valuePtr = new_value;
+ }
+
+ return ret;
+}
+
+/* Defrag helper for hfield strings and update the reference in the dict.
+ *
+ * returns NULL in case the allocation wasn't moved.
+ * when it returns a non-null value, the old pointer was already released
+ * and should NOT be accessed. */
+void *activeDefragHfieldAndUpdateRef(void *ptr, void *privdata) {
+ dict *d = privdata;
+ dictEntryLink link;
+
+ /* Before the key is released, obtain the link to
+ * ensure we can safely access and update the key. */
+ link = dictFindLink(d, ptr, NULL);
+ serverAssert(link);
+
+ Entry *newEntry = activeDefragEntry(ptr);
+ if (newEntry)
+ dictSetKeyAtLink(d, newEntry, &link, 0);
+ return newEntry;
+}
+
+/* Defrag helper for robj and/or string objects with expected refcount.
+ *
+ * Like activeDefragStringOb, but it requires the caller to pass in the expected
+ * reference count. In some cases, the caller needs to update a robj whose
+ * reference count is not 1, in these cases, the caller must explicitly pass
+ * in the reference count, otherwise defragmentation will not be performed.
+ * Note that the caller is responsible for updating any other references to the robj. */
+robj *activeDefragStringObEx(robj* ob, int expected_refcount) {
+ robj *ret = NULL;
+ if (ob->refcount!=expected_refcount)
+ return NULL;
+
+ /* try to defrag robj (only if not an EMBSTR type (handled below). */
+ if (ob->type!=OBJ_STRING || ob->encoding!=OBJ_ENCODING_EMBSTR) {
+ if ((ret = activeDefragAlloc(ob))) {
+ ob = ret;
+ }
+ }
+
+ /* try to defrag string object */
+ if (ob->type == OBJ_STRING) {
+ if(ob->encoding==OBJ_ENCODING_RAW) {
+ sds newsds = activeDefragSds((sds)ob->ptr);
+ if (newsds) {
+ ob->ptr = newsds;
+ }
+ } else if (ob->encoding==OBJ_ENCODING_EMBSTR) {
+ /* The sds is embedded in the object allocation, calculate the
+ * offset and update the pointer in the new allocation. */
+ long ofs = (intptr_t)ob->ptr - (intptr_t)ob;
+ if ((ret = activeDefragAlloc(ob))) {
+ ret->ptr = (void*)((intptr_t)ret + ofs);
+ }
+ } else if (ob->encoding!=OBJ_ENCODING_INT) {
+ serverPanic("Unknown string encoding");
+ }
+ }
+ return ret;
+}
+
+/* Defrag helper for robj and/or string objects
+ *
+ * returns NULL in case the allocation wasn't moved.
+ * when it returns a non-null value, the old pointer was already released
+ * and should NOT be accessed. */
+robj *activeDefragStringOb(robj* ob) {
+ return activeDefragStringObEx(ob, 1);
+}
+
+/* Defrag helper for lua scripts
+ *
+ * returns NULL in case the allocation wasn't moved.
+ * when it returns a non-null value, the old pointer was already released
+ * and should NOT be accessed. */
+luaScript *activeDefragLuaScript(luaScript *script) {
+ luaScript *ret = NULL;
+
+ /* try to defrag script struct */
+ if ((ret = activeDefragAlloc(script))) {
+ script = ret;
+ }
+
+ /* try to defrag actual script object */
+ robj *ob = activeDefragStringOb(script->body);
+ if (ob) script->body = ob;
+
+ return ret;
+}
+
+/* Defrag helper for dict main allocations (dict struct, and hash tables).
+ * Receives a pointer to the dict* and return a new dict* when the dict
+ * struct itself was moved.
+ *
+ * Returns NULL in case the allocation wasn't moved.
+ * When it returns a non-null value, the old pointer was already released
+ * and should NOT be accessed. */
+dict *dictDefragTables(dict *d) {
+ dict *ret = NULL;
+ dictEntry **newtable;
+ /* handle the dict struct */
+ if ((ret = activeDefragAlloc(d)))
+ d = ret;
+ /* handle the first hash table */
+ if (!d->ht_table[0]) return ret; /* created but unused */
+ newtable = activeDefragAlloc(d->ht_table[0]);
+ if (newtable)
+ d->ht_table[0] = newtable;
+ /* handle the second hash table */
+ if (d->ht_table[1]) {
+ newtable = activeDefragAlloc(d->ht_table[1]);
+ if (newtable)
+ d->ht_table[1] = newtable;
+ }
+ return ret;
+}
+
+/* Internal function used by activeDefragZsetNode */
+void zslUpdateNode(zskiplist *zsl, zskiplistNode *oldnode, zskiplistNode *newnode, zskiplistNode **update) {
+ int i;
+ for (i = 0; i < zsl->level; i++) {
+ if (update[i]->level[i].forward == oldnode)
+ update[i]->level[i].forward = newnode;
+ }
+ serverAssert(zsl->header!=oldnode);
+ if (newnode->level[0].forward) {
+ serverAssert(newnode->level[0].forward->backward==oldnode);
+ newnode->level[0].forward->backward = newnode;
+ } else {
+ serverAssert(zsl->tail==oldnode);
+ zsl->tail = newnode;
+ }
+}
+
+/* Defrag a single zset node, update dictEntry and skiplist struct */
+void activeDefragZsetNode(zset *zs, dictEntry *de, dictEntryLink plink) {
+ zskiplistNode *znode = dictGetKey(de);
+
+ /* Try to defrag the skiplist node first */
+ zskiplistNode *newnode = activeDefragAllocWithoutFree(znode);
+ if (!newnode) return; /* No defrag needed */
+
+ /* Node was defragged, now we need to update all skiplist pointers */
+ zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *iter;
+ int i;
+ double score = newnode->score;
+ sds ele = zslGetNodeElement(newnode);
+
+ /* Find all pointers that need to be updated */
+ iter = zs->zsl->header;
+ for (i = zs->zsl->level-1; i >= 0; i--) {
+ while (iter->level[i].forward &&
+ iter->level[i].forward != znode &&
+ zslCompareWithNode(score, ele, iter->level[i].forward) > 0)
+ iter = iter->level[i].forward;
+ update[i] = iter;
+ }
+
+ /* Verify we found the right node */
+ iter = iter->level[0].forward;
+ serverAssert(iter && iter == znode);
+
+ /* Update all skiplist pointers and dict key */
+ zslUpdateNode(zs->zsl, znode, newnode, update);
+ dictSetKeyAtLink(zs->dict, newnode, &plink, 0);
+
+ /* Free the old node now that all pointers have been updated */
+ activeDefragFree(znode);
+}
+
+#define DEFRAG_SDS_DICT_NO_VAL 0
+#define DEFRAG_SDS_DICT_VAL_IS_SDS 1
+#define DEFRAG_SDS_DICT_VAL_IS_STROB 2
+#define DEFRAG_SDS_DICT_VAL_VOID_PTR 3
+#define DEFRAG_SDS_DICT_VAL_LUA_SCRIPT 4
+
+void activeDefragSdsDictCallback(void *privdata, const dictEntry *de, dictEntryLink plink) {
+ UNUSED(plink);
+ UNUSED(privdata);
+ UNUSED(de);
+}
+
+void activeDefragLuaScriptDictCallback(void *privdata, const dictEntry *de, dictEntryLink plink) {
+ UNUSED(plink);
+ UNUSED(privdata);
+
+ /* If this luaScript is in the LRU list, unconditionally update the node's
+ * value pointer to the current dict key (regardless of reallocation). */
+ luaScript *script = dictGetVal(de);
+ if (script->node)
+ script->node->value = dictGetKey(de);
+}
+
+void activeDefragHfieldDictCallback(void *privdata, const dictEntry *de, dictEntryLink plink) {
+ dict *d = privdata;
+ Entry *newEntry = NULL, *entry = dictGetKey(de);
+
+ /* If the hfield does not have TTL, we directly defrag it.
+ * Fields with TTL are skipped here and will be defragmented later
+ * during the hash expiry ebuckets defragmentation phase. */
+ if (entryGetExpiry(entry) == EB_EXPIRE_TIME_INVALID) {
+ if ((newEntry = activeDefragEntry(entry))) {
+ /* Hash dicts use no_value=1, so we must use dictSetKeyAtLink */
+ dictSetKeyAtLink(d, newEntry, &plink, 0);
+ }
+ }
+}
+
+/* Defrag a dict with sds key and optional value (either ptr, sds or robj string) */
+void activeDefragSdsDict(dict* d, int val_type) {
+ unsigned long cursor = 0;
+ dictDefragFunctions defragfns = {
+ .defragAlloc = activeDefragAlloc,
+ .defragKey = (dictDefragAllocFunction *)activeDefragSds,
+ .defragVal = (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS ? (dictDefragAllocFunction *)activeDefragSds :
+ val_type == DEFRAG_SDS_DICT_VAL_IS_STROB ? (dictDefragAllocFunction *)activeDefragStringOb :
+ val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR ? (dictDefragAllocFunction *)activeDefragAlloc :
+ val_type == DEFRAG_SDS_DICT_VAL_LUA_SCRIPT ? (dictDefragAllocFunction *)activeDefragLuaScript :
+ NULL)
+ };
+ dictScanFunction *fn = (val_type == DEFRAG_SDS_DICT_VAL_LUA_SCRIPT ?
+ activeDefragLuaScriptDictCallback : activeDefragSdsDictCallback);
+ do {
+ cursor = dictScanDefrag(d, cursor, fn,
+ &defragfns, NULL);
+ } while (cursor != 0);
+}
+
+/* Defrag a dict with hfield key (no separate value - value is part of entry). */
+void activeDefragHfieldDict(dict *d) {
+ unsigned long cursor = 0;
+ dictDefragFunctions defragfns = {
+ .defragAlloc = activeDefragAlloc, /* Only defrag dictEntry */
+ .defragKey = NULL, /* Will be defragmented in activeDefragHfieldDictCallback. */
+ .defragVal = NULL /* No separate value - value is part of the entry (hfield). */
+ };
+ do {
+ cursor = dictScanDefrag(d, cursor, activeDefragHfieldDictCallback,
+ &defragfns, d);
+ } while (cursor != 0);
+
+ /* Continue with defragmentation of hash fields that have with TTL.
+ * During the dictionary defragmentaion above, we skipped fields with TTL,
+ * Now we continue to defrag those fields by using the expiry buckets. */
+ if (d->type == &entryHashDictTypeWithHFE) {
+ cursor = 0;
+ ebDefragFunctions eb_defragfns = {
+ .defragAlloc = activeDefragAlloc,
+ .defragItem = activeDefragHfieldAndUpdateRef
+ };
+ ebuckets *eb = hashTypeGetDictMetaHFE(d);
+ while (ebScanDefrag(eb, &hashFieldExpireBucketsType, &cursor, &eb_defragfns, d)) {}
+ }
+}
+
+/* Defrag a list of ptr, sds or robj string values */
+void activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) {
+ quicklistNode *newnode, *node = *node_ref;
+ unsigned char *newzl;
+ if ((newnode = activeDefragAlloc(node))) {
+ if (newnode->prev)
+ newnode->prev->next = newnode;
+ else
+ ql->head = newnode;
+ if (newnode->next)
+ newnode->next->prev = newnode;
+ else
+ ql->tail = newnode;
+ *node_ref = node = newnode;
+ }
+ if ((newzl = activeDefragAlloc(node->entry)))
+ node->entry = newzl;
+}
+
+void activeDefragQuickListNodes(quicklist *ql) {
+ quicklistNode *node = ql->head;
+ while (node) {
+ activeDefragQuickListNode(ql, &node);
+ node = node->next;
+ }
+}
+
+/* when the value has lots of elements, we want to handle it later and not as
+ * part of the main dictionary scan. this is needed in order to prevent latency
+ * spikes when handling large items */
+void defragLater(defragKeysCtx *ctx, kvobj *kv) {
+ if (!ctx->defrag_later) {
+ ctx->defrag_later = listCreate();
+ listSetFreeMethod(ctx->defrag_later, sdsfreegeneric);
+ ctx->defrag_later_cursor = 0;
+ }
+ sds key = sdsdup(kvobjGetKey(kv));
+ listAddNodeTail(ctx->defrag_later, key);
+}
+
+/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */
+long scanLaterList(robj *ob, unsigned long *cursor, monotime endtime) {
+ quicklist *ql = ob->ptr;
+ quicklistNode *node;
+ long iterations = 0;
+ int bookmark_failed = 0;
+ serverAssert(ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST);
+
+ if (*cursor == 0) {
+ /* if cursor is 0, we start new iteration */
+ node = ql->head;
+ } else {
+ node = quicklistBookmarkFind(ql, "_AD");
+ if (!node) {
+ /* if the bookmark was deleted, it means we reached the end. */
+ *cursor = 0;
+ return 0;
+ }
+ node = node->next;
+ }
+
+ (*cursor)++;
+ while (node) {
+ activeDefragQuickListNode(ql, &node);
+ server.stat_active_defrag_scanned++;
+ if (++iterations > 128 && !bookmark_failed) {
+ if (getMonotonicUs() > endtime) {
+ if (!quicklistBookmarkCreate(&ql, "_AD", node)) {
+ bookmark_failed = 1;
+ } else {
+ ob->ptr = ql; /* bookmark creation may have re-allocated the quicklist */
+ return 1;
+ }
+ }
+ iterations = 0;
+ }
+ node = node->next;
+ }
+ quicklistBookmarkDelete(ql, "_AD");
+ *cursor = 0;
+ return bookmark_failed? 1: 0;
+}
+
+typedef struct {
+ zset *zs;
+} scanLaterZsetData;
+
+void scanZsetCallback(void *privdata, const dictEntry *_de, dictEntryLink plink) {
+ dictEntry *de = (dictEntry*)_de;
+ scanLaterZsetData *data = privdata;
+ activeDefragZsetNode(data->zs, de, plink);
+ server.stat_active_defrag_scanned++;
+}
+
+void scanLaterZset(robj *ob, unsigned long *cursor) {
+ serverAssert(ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST);
+ zset *zs = (zset*)ob->ptr;
+ dict *d = zs->dict;
+ scanLaterZsetData data = {zs};
+ dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc};
+ *cursor = dictScanDefrag(d, *cursor, scanZsetCallback, &defragfns, &data);
+}
+
+/* Used as scan callback when all the work is done in the dictDefragFunctions. */
+void scanCallbackCountScanned(void *privdata, const dictEntry *de, dictEntryLink plink) {
+ UNUSED(plink);
+ UNUSED(privdata);
+ UNUSED(de);
+ server.stat_active_defrag_scanned++;
+}
+
+void scanLaterSet(robj *ob, unsigned long *cursor) {
+ serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT);
+ dict *d = ob->ptr;
+ dictDefragFunctions defragfns = {
+ .defragAlloc = activeDefragAlloc,
+ .defragKey = (dictDefragAllocFunction *)activeDefragSds
+ };
+ *cursor = dictScanDefrag(d, *cursor, scanCallbackCountScanned, &defragfns, NULL);
+}
+
+void scanLaterHash(robj *ob, unsigned long *cursor) {
+ serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT);
+ dict *d = ob->ptr;
+
+ typedef enum {
+ HASH_DEFRAG_NONE = 0,
+ HASH_DEFRAG_DICT = 1,
+ HASH_DEFRAG_EBUCKETS = 2
+ } hashDefragPhase;
+ static hashDefragPhase defrag_phase = HASH_DEFRAG_NONE;
+
+ /* Start a new hash defrag. */
+ if (!*cursor || defrag_phase == HASH_DEFRAG_NONE)
+ defrag_phase = HASH_DEFRAG_DICT;
+
+ /* Defrag hash dictionary but skip TTL fields. */
+ if (defrag_phase == HASH_DEFRAG_DICT) {
+ dictDefragFunctions defragfns = {
+ .defragAlloc = activeDefragAlloc,
+ .defragKey = NULL, /* Will be defragmented in activeDefragHfieldDictCallback. */
+ .defragVal = NULL /* value stored along with key as part of Entry */
+ };
+ *cursor = dictScanDefrag(d, *cursor, activeDefragHfieldDictCallback, &defragfns, d);
+
+ /* Move to next phase. */
+ if (!*cursor) defrag_phase = HASH_DEFRAG_EBUCKETS;
+ }
+
+ /* Defrag ebuckets and TTL fields. */
+ if (defrag_phase == HASH_DEFRAG_EBUCKETS) {
+ if (d->type == &entryHashDictTypeWithHFE) {
+ ebDefragFunctions eb_defragfns = {
+ .defragAlloc = activeDefragAlloc,
+ .defragItem = activeDefragHfieldAndUpdateRef
+ };
+ ebuckets *eb = hashTypeGetDictMetaHFE(d);
+ ebScanDefrag(eb, &hashFieldExpireBucketsType, cursor, &eb_defragfns, d);
+ } else {
+ /* Finish defragmentation if this dict doesn't have expired fields. */
+ *cursor = 0;
+ }
+ if (!*cursor) defrag_phase = HASH_DEFRAG_NONE;
+ }
+}
+
+void defragQuicklist(defragKeysCtx *ctx, kvobj *kv) {
+ quicklist *ql = kv->ptr, *newql;
+ serverAssert(kv->type == OBJ_LIST && kv->encoding == OBJ_ENCODING_QUICKLIST);
+ if ((newql = activeDefragAlloc(ql)))
+ kv->ptr = ql = newql;
+ if (ql->len > server.active_defrag_max_scan_fields)
+ defragLater(ctx, kv);
+ else
+ activeDefragQuickListNodes(ql);
+}
+
+void defragZsetSkiplist(defragKeysCtx *ctx, kvobj *ob) {
+ zset *zs = (zset*)ob->ptr;
+ zset *newzs;
+ zskiplist *newzsl;
+ dict *newdict;
+ struct zskiplistNode *newheader;
+ serverAssert(ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST);
+ if ((newzs = activeDefragAlloc(zs)))
+ ob->ptr = zs = newzs;
+ if ((newzsl = activeDefragAlloc(zs->zsl)))
+ zs->zsl = newzsl;
+ if ((newheader = activeDefragAlloc(zs->zsl->header)))
+ zs->zsl->header = newheader;
+ if (dictSize(zs->dict) > server.active_defrag_max_scan_fields)
+ defragLater(ctx, ob);
+ else {
+ /* Use dictScanDefrag to iterate and defrag both dictEntry structures and skiplist nodes.
+ * dictScanDefrag handles defragging dictEntry/dictEntryNoValue structures via defragfns,
+ * and calls our callback with plink for each entry so we can defrag skiplist nodes. */
+ scanLaterZsetData data = {zs};
+ dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc};
+ unsigned long cursor = 0;
+ do {
+ cursor = dictScanDefrag(zs->dict, cursor, scanZsetCallback, &defragfns, &data);
+ } while (cursor != 0);
+ }
+ /* defrag the dict struct and tables */
+ if ((newdict = dictDefragTables(zs->dict)))
+ zs->dict = newdict;
+}
+
+void defragHash(defragKeysCtx *ctx, kvobj *ob) {
+ dict *d, *newd;
+ serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT);
+ d = ob->ptr;
+ if (dictSize(d) > server.active_defrag_max_scan_fields)
+ defragLater(ctx, ob);
+ else
+ activeDefragHfieldDict(d);
+ /* defrag the dict struct and tables */
+ if ((newd = dictDefragTables(ob->ptr)))
+ ob->ptr = newd;
+}
+
+void defragSet(defragKeysCtx *ctx, kvobj *ob) {
+ dict *d, *newd;
+ serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT);
+ d = ob->ptr;
+ if (dictSize(d) > server.active_defrag_max_scan_fields)
+ defragLater(ctx, ob);
+ else
+ activeDefragSdsDict(d, DEFRAG_SDS_DICT_NO_VAL);
+ /* defrag the dict struct and tables */
+ if ((newd = dictDefragTables(ob->ptr)))
+ ob->ptr = newd;
+}
+
+/* Defrag callback for radix tree iterator, called for each node,
+ * used in order to defrag the nodes allocations. */
+int defragRaxNode(raxNode **noderef, void *privdata) {
+ UNUSED(privdata);
+ raxNode *newnode = activeDefragAlloc(*noderef);
+ if (newnode) {
+ *noderef = newnode;
+ return 1;
+ }
+ return 0;
+}
+
+/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */
+int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, monotime endtime) {
+ static unsigned char next[sizeof(streamID)];
+ raxIterator ri;
+ long iterations = 0;
+ serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM);
+
+ stream *s = ob->ptr;
+ raxStart(&ri,s->rax);
+ if (*cursor == 0) {
+ /* if cursor is 0, we start new iteration */
+ defragRaxNode(&s->rax->head, NULL);
+ /* assign the iterator node callback before the seek, so that the
+ * initial nodes that are processed till the first item are covered */
+ ri.node_cb = defragRaxNode;
+ raxSeek(&ri,"^",NULL,0);
+ } else {
+ /* if cursor is non-zero, we seek to the static 'next'.
+ * Since node_cb is set after seek operation, any node traversed during seek wouldn't
+ * be defragmented. To prevent this, we advance to next node before exiting previous
+ * run, ensuring it gets defragmented instead of being skipped during current seek. */
+ if (!raxSeek(&ri,">=", next, sizeof(next))) {
+ *cursor = 0;
+ raxStop(&ri);
+ return 0;
+ }
+ /* assign the iterator node callback after the seek, so that the
+ * initial nodes that are processed till now aren't covered */
+ ri.node_cb = defragRaxNode;
+ }
+
+ (*cursor)++;
+ while (raxNext(&ri)) {
+ void *newdata = activeDefragAlloc(ri.data);
+ if (newdata)
+ raxSetData(ri.node, ri.data=newdata);
+ server.stat_active_defrag_scanned++;
+ if (++iterations > 128) {
+ if (getMonotonicUs() > endtime) {
+ /* Move to next node. */
+ if (!raxNext(&ri)) {
+ /* If we reached the end, we can stop */
+ *cursor = 0;
+ raxStop(&ri);
+ return 0;
+ }
+ serverAssert(ri.key_len==sizeof(next));
+ memcpy(next,ri.key,ri.key_len);
+ raxStop(&ri);
+ return 1;
+ }
+ iterations = 0;
+ }
+ }
+ raxStop(&ri);
+ *cursor = 0;
+ return 0;
+}
+
+/* optional callback used defrag each rax element (not including the element pointer itself) */
+typedef void *(raxDefragFunction)(raxIterator *ri, void *privdata);
+
+/* defrag radix tree including:
+ * 1) rax struct
+ * 2) rax nodes
+ * 3) rax entry data (only if defrag_data is specified)
+ * 4) call a callback per element, and allow the callback to return a new pointer for the element */
+void defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_cb, void *element_cb_data) {
+ raxIterator ri;
+ rax* rax;
+ if ((rax = activeDefragAlloc(*raxref)))
+ *raxref = rax;
+ rax = *raxref;
+ raxStart(&ri,rax);
+ ri.node_cb = defragRaxNode;
+ defragRaxNode(&rax->head, NULL);
+ raxSeek(&ri,"^",NULL,0);
+ while (raxNext(&ri)) {
+ void *newdata = NULL;
+ if (element_cb)
+ newdata = element_cb(&ri, element_cb_data);
+ if (defrag_data && !newdata)
+ newdata = activeDefragAlloc(ri.data);
+ if (newdata)
+ raxSetData(ri.node, ri.data=newdata);
+ }
+ raxStop(&ri);
+}
+
+typedef struct {
+ streamCG *cg;
+ streamConsumer *c;
+} PendingEntryContext;
+
+void* defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata) {
+ PendingEntryContext *ctx = privdata;
+ streamNACK *nack = ri->data, *newnack;
+ nack->consumer = ctx->c; /* update nack pointer to consumer */
+ nack->cgroup_ref_node->value = ctx->cg; /* Update the value of cgroups_ref node to the consumer group. */
+ newnack = activeDefragAlloc(nack);
+ if (newnack) {
+ /* Update consumer group pointer to the nack.
+ * pel_by_time doesn't need updating since delivery time is unchanged. */
+ void *prev;
+ raxInsert(ctx->cg->pel, ri->key, ri->key_len, newnack, &prev);
+ serverAssert(prev==nack);
+ }
+ return newnack;
+}
+
+typedef struct {
+ stream *s;
+ streamCG *cg;
+} StreamConsumerContext;
+
+void* defragStreamConsumer(raxIterator *ri, void *privdata) {
+ StreamConsumerContext *ctx = privdata;
+ stream *s = ctx->s;
+ streamCG *cg = ctx->cg;
+ streamConsumer *c = ri->data;
+ void *newc = activeDefragAlloc(c);
+ if (newc) {
+ c = newc;
+ }
+ sds newsds = activeDefragSds(c->name);
+ if (newsds)
+ c->name = newsds;
+ if (c->pel) {
+ /* Update pel back-pointer to new stream */
+ c->pel->alloc_size = &s->alloc_size;
+ PendingEntryContext pel_ctx = {cg, c};
+ defragRadixTree(&c->pel, 0, defragStreamConsumerPendingEntry, &pel_ctx);
+ }
+ return newc; /* returns NULL if c was not defragged */
+}
+
+void* defragStreamConsumerGroup(raxIterator *ri, void *privdata) {
+ stream *s = privdata;
+ streamCG *newcg, *cg = ri->data;
+ if ((newcg = activeDefragAlloc(cg)))
+ cg = newcg;
+ if (cg->pel) {
+ /* Update pel back-pointer to new stream */
+ cg->pel->alloc_size = &s->alloc_size;
+ defragRadixTree(&cg->pel, 0, NULL, NULL);
+ }
+ if (cg->pel_by_time) {
+ /* Update pel_by_time back-pointer to new stream */
+ cg->pel_by_time->alloc_size = &s->alloc_size;
+ defragRadixTree(&cg->pel_by_time, 0, NULL, NULL);
+ }
+ if (cg->consumers) {
+ /* Update consumers back-pointer to new stream */
+ cg->consumers->alloc_size = &s->alloc_size;
+ StreamConsumerContext consumer_ctx = {s, cg};
+ defragRadixTree(&cg->consumers, 0, defragStreamConsumer, &consumer_ctx);
+ }
+ return cg;
+}
+
+/* Defrag a single idmpProducer's dict and linked list entries. */
+static void defragIdmpProducer(idmpProducer *producer) {
+ if (producer->idmp_dict == NULL) return;
+
+ dict *newdict = dictDefragTables(producer->idmp_dict);
+ if (newdict)
+ producer->idmp_dict = newdict;
+
+ idmpEntry *prev = NULL;
+ idmpEntry *entry = producer->idmp_head;
+ while (entry != NULL) {
+ idmpEntry *next = entry->next;
+ idmpEntry *newentry = activeDefragAllocWithoutFree(entry);
+ if (newentry) {
+ dictEntry *de = dictFind(producer->idmp_dict, entry);
+ serverAssert(de);
+ dictSetKey(producer->idmp_dict, de, newentry);
+ if (prev)
+ prev->next = newentry;
+ else
+ producer->idmp_head = newentry;
+ if (producer->idmp_tail == entry)
+ producer->idmp_tail = newentry;
+ activeDefragFree(entry);
+ entry = newentry;
+ }
+ prev = entry;
+ entry = next;
+ }
+}
+
+/* Defrag all IDMP producers and their dict/linked list entries. */
+void defragStreamIdmpProducers(stream *s) {
+ if (s->idmp_producers == NULL) return;
+
+ /* Defrag the producers rax tree itself */
+ rax *newrax = activeDefragAlloc(s->idmp_producers);
+ if (newrax)
+ s->idmp_producers = newrax;
+
+ /* Defrag the rax head node */
+ defragRaxNode(&s->idmp_producers->head, NULL);
+
+ /* Iterate through all producers and defrag each one */
+ raxIterator ri;
+ raxStart(&ri, s->idmp_producers);
+ /* Set the node callback to defrag internal rax nodes */
+ ri.node_cb = defragRaxNode;
+ raxSeek(&ri, "^", NULL, 0);
+ while (raxNext(&ri)) {
+ idmpProducer *producer = ri.data;
+ idmpProducer *newproducer = activeDefragAlloc(producer);
+ if (newproducer) {
+ raxSetData(ri.node, ri.data=newproducer);
+ producer = newproducer;
+ }
+ defragIdmpProducer(producer);
+ }
+ raxStop(&ri);
+}
+
+void defragStream(defragKeysCtx *ctx, kvobj *ob) {
+ serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM);
+ stream *s = ob->ptr, *news;
+
+ /* handle the main struct */
+ if ((news = activeDefragAlloc(s)))
+ ob->ptr = s = news;
+
+ /* Update rax back-pointer to new stream */
+ s->rax->alloc_size = &s->alloc_size;
+ if (raxSize(s->rax) > server.active_defrag_max_scan_fields) {
+ rax *newrax = activeDefragAlloc(s->rax);
+ if (newrax)
+ s->rax = newrax;
+ defragLater(ctx, ob);
+ } else
+ defragRadixTree(&s->rax, 1, NULL, NULL);
+
+ if (s->cgroups) {
+ /* Update cgroups back-pointer to new stream */
+ s->cgroups->alloc_size = &s->alloc_size;
+ defragRadixTree(&s->cgroups, 0, defragStreamConsumerGroup, s);
+ }
+
+ if (s->cgroups_ref) {
+ /* Update cgroups_ref back-pointer to new stream */
+ s->cgroups_ref->alloc_size = &s->alloc_size;
+ }
+
+ if (s->idmp_producers) {
+ /* Defrag the producers and all idmpEntry structures in their linked lists */
+ defragStreamIdmpProducers(s);
+ }
+}
+
+/* Defrag a module key. This is either done immediately or scheduled
+ * for later. Returns then number of pointers defragged.
+ */
+void defragModule(defragKeysCtx *ctx, redisDb *db, kvobj *kv) {
+ serverAssert(kv->type == OBJ_MODULE);
+ robj keyobj;
+ initStaticStringObject(keyobj, kvobjGetKey(kv));
+ if (!moduleDefragValue(&keyobj, kv, db->id))
+ defragLater(ctx, kv);
+}
+
+/* Defrag a kvobj structure, taking into account optional preceding metadata.
+ * For EMBSTR strings, also defrags the embedded string value in the same allocation.
+ * For RAW strings and other types, only the kvobj wrapper is defragged here;
+ * the value's internal data structures are defragged separately in defragKey().
+ *
+ * Returns NULL if the allocation wasn't moved.
+ * When it returns a non-null value, the old pointer was already released
+ * (unless without_free is set) and should NOT be accessed. */
+robj *activeDefragKvobj(kvobj* kv, int without_free) {
+ void *alloc, *newalloc;
+ kvobj *kvNew = NULL;
+ /* Use LONG_MIN as sentinel to detect if we have an EMBSTR string */
+ long offsetEmbstr = LONG_MIN;
+
+ /* Don't defrag kvobj's with multiple references (refcount > 1) */
+ if (kv->refcount != 1)
+ return NULL;
+
+ /* Calculate offset for EMBSTR strings */
+ if ((kv->type == OBJ_STRING) && (kv->encoding == OBJ_ENCODING_EMBSTR))
+ offsetEmbstr = (intptr_t)kv->ptr - (intptr_t)kv;
+
+ /* Defrag the kvobj allocation (including optional metadata prefix).
+ * For EMBSTR strings, this allocation also contains the embedded string data,
+ * so we'll need to recalculate the ptr offset after defragmentation (see below). */
+
+ alloc = kvobjGetAllocPtr(kv);
+ size_t metaBytes = (char *)kv - (char *)alloc;
+ if (without_free)
+ newalloc = activeDefragAllocWithoutFree(alloc);
+ else
+ newalloc = activeDefragAlloc(alloc);
+
+ if (!newalloc)
+ return NULL;
+
+ /* Update kv pointer to new allocation */
+ kvNew = (kvobj *)((char *)newalloc + metaBytes);
+
+ /* For EMBSTR strings, recalculate ptr to point to the embedded string data
+ * at the same offset within the new allocation */
+ if (offsetEmbstr != LONG_MIN)
+ kvNew->ptr = (void*)((intptr_t)kvNew + offsetEmbstr);
+
+ return kvNew;
+}
+
+/* for each key we scan in the main dict, this function will attempt to defrag
+ * all the various pointers it has. */
+void defragKey(defragKeysCtx *ctx, dictEntry *de, dictEntryLink link) {
+ UNUSED(link);
+ dictEntryLink exlink = NULL;
+ kvobj *kvnew = NULL, *ob = dictGetKV(de);
+ size_t oldsize = 0;
+ redisDb *db = &server.db[ctx->dbid];
+ int slot = ctx->kvstate.slot;
+ unsigned char *newzl;
+
+ if (server.memory_tracking_per_slot)
+ oldsize = kvobjAllocSize(ob);
+
+ long long expire = kvobjGetExpire(ob);
+ /* We can't search in db->expires for that KV after we've released
+ * the pointer it holds, since it won't be able to do the string
+ * compare. Search it before, if needed. */
+ if (expire != -1) {
+ exlink = kvstoreDictFindLink(db->expires, slot, kvobjGetKey(ob), NULL);
+ serverAssert(exlink != NULL);
+ }
+
+ /* Try to defrag robj. For hash objects with HFEs,
+ * defer defragmentation until processing db's subexpires. */
+ if (!(ob->type == OBJ_HASH && hashTypeGetMinExpire(ob, 0) != EB_EXPIRE_TIME_INVALID)) {
+ /* If the dict doesn't have metadata, we directly defrag it. */
+ kvnew = activeDefragKvobj(ob, 0);
+ }
+ if (kvnew) {
+ kvstoreDictSetAtLink(db->keys, slot, kvnew, &link, 0);
+ if (expire != -1)
+ kvstoreDictSetAtLink(db->expires, slot, kvnew, &exlink, 0);
+ ob = kvnew;
+ }
+
+ if (ob->type == OBJ_STRING) {
+ /* Only defrag strings with refcount==1 (String might be shared as dict
+ * keys, e.g. pub/sub channels, and may be accessed by IO threads. Other
+ * types are never used as dict keys) */
+ if ((ob->refcount==1) && (ob->encoding == OBJ_ENCODING_RAW)) {
+ /* For RAW strings, defrag the separate SDS allocation */
+ sds newsds = activeDefragSds((sds)ob->ptr);
+ if (newsds) ob->ptr = newsds;
+ }
+ } else if (ob->type == OBJ_LIST) {
+ if (ob->encoding == OBJ_ENCODING_QUICKLIST) {
+ defragQuicklist(ctx, ob);
+ } else if (ob->encoding == OBJ_ENCODING_LISTPACK) {
+ if ((newzl = activeDefragAlloc(ob->ptr)))
+ ob->ptr = newzl;
+ } else {
+ serverPanic("Unknown list encoding");
+ }
+ } else if (ob->type == OBJ_SET) {
+ if (ob->encoding == OBJ_ENCODING_HT) {
+ defragSet(ctx, ob);
+ } else if (ob->encoding == OBJ_ENCODING_INTSET ||
+ ob->encoding == OBJ_ENCODING_LISTPACK)
+ {
+ void *newptr, *ptr = ob->ptr;
+ if ((newptr = activeDefragAlloc(ptr)))
+ ob->ptr = newptr;
+ } else {
+ serverPanic("Unknown set encoding");
+ }
+ } else if (ob->type == OBJ_ZSET) {
+ if (ob->encoding == OBJ_ENCODING_LISTPACK) {
+ if ((newzl = activeDefragAlloc(ob->ptr)))
+ ob->ptr = newzl;
+ } else if (ob->encoding == OBJ_ENCODING_SKIPLIST) {
+ defragZsetSkiplist(ctx, ob);
+ } else {
+ serverPanic("Unknown sorted set encoding");
+ }
+ } else if (ob->type == OBJ_HASH) {
+ if (ob->encoding == OBJ_ENCODING_LISTPACK) {
+ if ((newzl = activeDefragAlloc(ob->ptr)))
+ ob->ptr = newzl;
+ } else if (ob->encoding == OBJ_ENCODING_LISTPACK_EX) {
+ listpackEx *newlpt, *lpt = (listpackEx*)ob->ptr;
+ if ((newlpt = activeDefragAlloc(lpt)))
+ ob->ptr = lpt = newlpt;
+ if ((newzl = activeDefragAlloc(lpt->lp)))
+ lpt->lp = newzl;
+ } else if (ob->encoding == OBJ_ENCODING_HT) {
+ defragHash(ctx, ob);
+ } else {
+ serverPanic("Unknown hash encoding");
+ }
+ } else if (ob->type == OBJ_STREAM) {
+ defragStream(ctx, ob);
+ } else if (ob->type == OBJ_MODULE) {
+ defragModule(ctx,db, ob);
+ } else {
+ serverPanic("Unknown object type");
+ }
+ if (server.memory_tracking_per_slot)
+ updateSlotAllocSize(db, slot, oldsize, kvobjAllocSize(ob));
+}
+
+/* Defrag scan callback for the main db dictionary. */
+static void dbKeysScanCallback(void *privdata, const dictEntry *de, dictEntryLink plink) {
+ long long hits_before = server.stat_active_defrag_hits;
+ defragKey((defragKeysCtx *)privdata, (dictEntry *)de, plink);
+ if (server.stat_active_defrag_hits != hits_before)
+ server.stat_active_defrag_key_hits++;
+ else
+ server.stat_active_defrag_key_misses++;
+ server.stat_active_defrag_scanned++;
+}
+
+#if !defined(DEBUG_DEFRAG_FORCE)
+/* Utility function to get the fragmentation ratio from jemalloc.
+ * It is critical to do that by comparing only heap maps that belong to
+ * jemalloc, and skip ones the jemalloc keeps as spare. Since we use this
+ * fragmentation ratio in order to decide if a defrag action should be taken
+ * or not, a false detection can cause the defragmenter to waste a lot of CPU
+ * without the possibility of getting any results. */
+float getAllocatorFragmentation(size_t *out_frag_bytes) {
+ size_t resident, active, allocated, frag_smallbins_bytes;
+ zmalloc_get_allocator_info(1, &allocated, &active, &resident, NULL, NULL, &frag_smallbins_bytes);
+
+ if (server.lua_arena != UINT_MAX) {
+ size_t lua_resident, lua_active, lua_allocated, lua_frag_smallbins_bytes;
+ zmalloc_get_allocator_info_by_arena(server.lua_arena, 0, &lua_allocated, &lua_active, &lua_resident, &lua_frag_smallbins_bytes);
+ resident -= lua_resident;
+ active -= lua_active;
+ allocated -= lua_allocated;
+ frag_smallbins_bytes -= lua_frag_smallbins_bytes;
+ }
+
+ /* Calculate the fragmentation ratio as the proportion of wasted memory in small
+ * bins (which are defraggable) relative to the total allocated memory (including large bins).
+ * This is because otherwise, if most of the memory usage is large bins, we may show high percentage,
+ * despite the fact it's not a lot of memory for the user. */
+ float frag_pct = (float)frag_smallbins_bytes / allocated * 100;
+ float rss_pct = ((float)resident / allocated)*100 - 100;
+ size_t rss_bytes = resident - allocated;
+ if(out_frag_bytes)
+ *out_frag_bytes = frag_smallbins_bytes;
+ serverLog(LL_DEBUG,
+ "allocated=%zu, active=%zu, resident=%zu, frag=%.2f%% (%.2f%% rss), frag_bytes=%zu (%zu rss)",
+ allocated, active, resident, frag_pct, rss_pct, frag_smallbins_bytes, rss_bytes);
+ return frag_pct;
+}
+#else
+float getAllocatorFragmentation(size_t *out_frag_bytes) {
+ if (out_frag_bytes)
+ *out_frag_bytes = SIZE_MAX;
+ return 99; /* The maximum percentage of fragmentation */
+}
+#endif
+
+/* Defrag scan callback for the pubsub dictionary. */
+void defragPubsubScanCallback(void *privdata, const dictEntry *de, dictEntryLink plink) {
+ UNUSED(plink);
+ defragPubSubCtx *ctx = privdata;
+ kvstore *pubsub_channels = ctx->kvstate.kvs;
+ robj *newchannel, *channel = dictGetKey(de);
+ dict *newclients, *clients = dictGetVal(de);
+
+ /* Try to defrag the channel name. */
+ serverAssert(channel->refcount == (int)dictSize(clients) + 1);
+ newchannel = activeDefragStringObEx(channel, dictSize(clients) + 1);
+ if (newchannel) {
+ kvstoreDictSetKey(pubsub_channels, ctx->kvstate.slot, (dictEntry*)de, newchannel);
+
+ /* The channel name is shared by the client's pubsub(shard) and server's
+ * pubsub(shard), after defraging the channel name, we need to update
+ * the reference in the clients' dictionary. */
+ dictIterator di;
+ dictEntry *clientde;
+ dictInitIterator(&di, clients);
+ while((clientde = dictNext(&di)) != NULL) {
+ client *c = dictGetKey(clientde);
+ dict *client_channels = ctx->getPubSubChannels(c);
+ uint64_t hash = dictGetHash(client_channels, newchannel);
+ dictEntry *pubsub_channel = dictFindByHashAndPtr(client_channels, channel, hash);
+ serverAssert(pubsub_channel);
+ dictSetKey(ctx->getPubSubChannels(c), pubsub_channel, newchannel);
+ }
+ dictResetIterator(&di);
+ }
+
+ /* Try to defrag the dictionary of clients that is stored as the value part. */
+ if ((newclients = dictDefragTables(clients)))
+ kvstoreDictSetVal(pubsub_channels, ctx->kvstate.slot, (dictEntry *)de, newclients);
+
+ server.stat_active_defrag_scanned++;
+}
+
+/* returns 0 more work may or may not be needed (see non-zero cursor),
+ * and 1 if time is up and more work is needed. */
+int defragLaterItem(kvobj *ob, unsigned long *cursor, monotime endtime, int dbid) {
+ if (ob) {
+ if (ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST) {
+ return scanLaterList(ob, cursor, endtime);
+ } else if (ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT) {
+ scanLaterSet(ob, cursor);
+ } else if (ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST) {
+ scanLaterZset(ob, cursor);
+ } else if (ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT) {
+ scanLaterHash(ob, cursor);
+ } else if (ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM) {
+ return scanLaterStreamListpacks(ob, cursor, endtime);
+ } else if (ob->type == OBJ_MODULE) {
+ robj keyobj;
+ initStaticStringObject(keyobj, kvobjGetKey(ob));
+ return moduleLateDefrag(&keyobj, ob, cursor, endtime, dbid);
+ } else {
+ *cursor = 0; /* object type/encoding may have changed since we schedule it for later */
+ }
+ } else {
+ *cursor = 0; /* object may have been deleted already */
+ }
+ return 0;
+}
+
+static int defragIsRunning(void) {
+ return (defrag.timeproc_id > 0);
+}
+
+/* A kvstoreHelperPreContinueFn */
+static doneStatus defragLaterStep(void *ctx, monotime endtime) {
+ defragKeysCtx *defrag_keys_ctx = ctx;
+ redisDb *db = &server.db[defrag_keys_ctx->dbid];
+ int slot = defrag_keys_ctx->kvstate.slot;
+ size_t oldsize = 0;
+
+ unsigned int iterations = 0;
+ unsigned long long prev_defragged = server.stat_active_defrag_hits;
+ unsigned long long prev_scanned = server.stat_active_defrag_scanned;
+
+ while (defrag_keys_ctx->defrag_later && listLength(defrag_keys_ctx->defrag_later) > 0) {
+ listNode *head = listFirst(defrag_keys_ctx->defrag_later);
+ sds key = head->value;
+ dictEntry *de = kvstoreDictFind(defrag_keys_ctx->kvstate.kvs, defrag_keys_ctx->kvstate.slot, key);
+ kvobj *kv = de ? dictGetKV(de) : NULL;
+
+ long long key_defragged = server.stat_active_defrag_hits;
+ if (server.memory_tracking_per_slot && kv)
+ oldsize = kvobjAllocSize(kv);
+ int timeout = (defragLaterItem(kv, &defrag_keys_ctx->defrag_later_cursor, endtime, defrag_keys_ctx->dbid) == 1);
+ if (server.memory_tracking_per_slot && kv)
+ updateSlotAllocSize(db, slot, oldsize, kvobjAllocSize(kv));
+ if (key_defragged != server.stat_active_defrag_hits) {
+ server.stat_active_defrag_key_hits++;
+ } else {
+ server.stat_active_defrag_key_misses++;
+ }
+
+ if (timeout) break;
+
+ if (defrag_keys_ctx->defrag_later_cursor == 0) {
+ /* the item is finished, move on */
+ listDelNode(defrag_keys_ctx->defrag_later, head);
+ }
+
+ if (++iterations > 16 || server.stat_active_defrag_hits - prev_defragged > 512 ||
+ server.stat_active_defrag_scanned - prev_scanned > 64) {
+ if (getMonotonicUs() > endtime) break;
+ iterations = 0;
+ prev_defragged = server.stat_active_defrag_hits;
+ prev_scanned = server.stat_active_defrag_scanned;
+ }
+ }
+
+ return (!defrag_keys_ctx->defrag_later || listLength(defrag_keys_ctx->defrag_later) == 0) ? DEFRAG_DONE : DEFRAG_NOT_DONE;
+}
+
+#define INTERPOLATE(x, x1, x2, y1, y2) ( (y1) + ((x)-(x1)) * ((y2)-(y1)) / ((x2)-(x1)) )
+#define LIMIT(y, min, max) ((y)<(min)? min: ((y)>(max)? max: (y)))
+
+/* decide if defrag is needed, and at what CPU effort to invest in it */
+void computeDefragCycles(void) {
+ size_t frag_bytes;
+ float frag_pct = getAllocatorFragmentation(&frag_bytes);
+ /* If we're not already running, and below the threshold, exit. */
+ if (!server.active_defrag_running) {
+ if(frag_pct < server.active_defrag_threshold_lower || frag_bytes < server.active_defrag_ignore_bytes)
+ return;
+ }
+
+ /* Calculate the adaptive aggressiveness of the defrag based on the current
+ * fragmentation and configurations. */
+ int cpu_pct = INTERPOLATE(frag_pct,
+ server.active_defrag_threshold_lower,
+ server.active_defrag_threshold_upper,
+ server.active_defrag_cycle_min,
+ server.active_defrag_cycle_max);
+ cpu_pct *= defrag.decay_rate;
+ cpu_pct = LIMIT(cpu_pct,
+ server.active_defrag_cycle_min,
+ server.active_defrag_cycle_max);
+
+ /* Normally we allow increasing the aggressiveness during a scan, but don't
+ * reduce it, since we should not lower the aggressiveness when fragmentation
+ * drops. But when a configuration is made, we should reconsider it. */
+ if (cpu_pct > server.active_defrag_running ||
+ server.active_defrag_configuration_changed)
+ {
+ server.active_defrag_configuration_changed = 0;
+ if (defragIsRunning()) {
+ serverLog(LL_VERBOSE, "Changing active defrag CPU, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%",
+ frag_pct, frag_bytes, cpu_pct);
+ } else {
+ serverLog(LL_VERBOSE,
+ "Starting active defrag, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%",
+ frag_pct, frag_bytes, cpu_pct);
+ }
+ server.active_defrag_running = cpu_pct;
+ }
+}
+
+/* This helper function handles most of the work for iterating over a kvstore. 'privdata', if
+ * provided, MUST begin with 'kvstoreIterState' and this part is automatically updated by this
+ * function during the iteration. */
+static doneStatus defragStageKvstoreHelper(monotime endtime,
+ void *ctx,
+ dictScanFunction scan_fn,
+ kvstoreHelperPreContinueFn precontinue_fn,
+ dictDefragFunctions *defragfns)
+{
+ unsigned int iterations = 0;
+ unsigned long long prev_defragged = server.stat_active_defrag_hits;
+ unsigned long long prev_scanned = server.stat_active_defrag_scanned;
+ kvstoreIterState *state = (kvstoreIterState*)ctx;
+
+ if (state->slot == ITER_SLOT_DEFRAG_LUT) {
+ /* Before we start scanning the kvstore, handle the main structures */
+ do {
+ state->cursor = kvstoreDictLUTDefrag(state->kvs, state->cursor, dictDefragTables);
+ if (getMonotonicUs() >= endtime) return DEFRAG_NOT_DONE;
+ } while (state->cursor != 0);
+ state->slot = ITER_SLOT_UNASSIGNED;
+ }
+
+ while (1) {
+ if (++iterations > 16 || server.stat_active_defrag_hits - prev_defragged > 512 || server.stat_active_defrag_scanned - prev_scanned > 64) {
+ if (getMonotonicUs() >= endtime) break;
+ iterations = 0;
+ prev_defragged = server.stat_active_defrag_hits;
+ prev_scanned = server.stat_active_defrag_scanned;
+ }
+
+ if (precontinue_fn) {
+ if (precontinue_fn(ctx, endtime) == DEFRAG_NOT_DONE) return DEFRAG_NOT_DONE;
+ }
+
+ if (!state->cursor) {
+ /* If there's no cursor, we're ready to begin a new kvstore slot. */
+ if (state->slot == ITER_SLOT_UNASSIGNED) {
+ state->slot = kvstoreGetFirstNonEmptyDictIndex(state->kvs);
+ } else {
+ state->slot = kvstoreGetNextNonEmptyDictIndex(state->kvs, state->slot);
+ }
+
+ if (state->slot == ITER_SLOT_UNASSIGNED) return DEFRAG_DONE;
+ }
+
+ /* Whatever privdata's actual type, this function requires that it begins with kvstoreIterState. */
+ state->cursor = kvstoreDictScanDefrag(state->kvs, state->slot, state->cursor,
+ scan_fn, defragfns, ctx);
+ }
+
+ return DEFRAG_NOT_DONE;
+}
+
+static doneStatus defragStageDbKeys(void *ctx, monotime endtime) {
+ defragKeysCtx *defrag_keys_ctx = ctx;
+ redisDb *db = &server.db[defrag_keys_ctx->dbid];
+ if (db->keys != defrag_keys_ctx->kvstate.kvs) {
+ /* There has been a change of the kvs (flushdb, swapdb, etc.). Just complete the stage. */
+ return DEFRAG_DONE;
+ }
+
+ /* Note: for DB keys, we use the start/finish callback to fix an expires table entry if
+ * the main DB entry has been moved. */
+ static dictDefragFunctions defragfns = {
+ .defragAlloc = activeDefragAlloc,
+ .defragKey = NULL, /* Handled by dbKeysScanCallback */
+ .defragVal = NULL, /* Handled by dbKeysScanCallback */
+ };
+
+ return defragStageKvstoreHelper(endtime, ctx,
+ dbKeysScanCallback, defragLaterStep, &defragfns);
+}
+
+static doneStatus defragStageExpiresKvstore(void *ctx, monotime endtime) {
+ defragKeysCtx *defrag_keys_ctx = ctx;
+ redisDb *db = &server.db[defrag_keys_ctx->dbid];
+ if (db->expires != defrag_keys_ctx->kvstate.kvs) {
+ /* There has been a change of the kvs (flushdb, swapdb, etc.). Just complete the stage. */
+ return DEFRAG_DONE;
+ }
+
+ static dictDefragFunctions defragfns = {
+ .defragAlloc = activeDefragAlloc,
+ .defragKey = NULL, /* Not needed for expires (just a ref) */
+ .defragVal = NULL, /* Not needed for expires (no value) */
+ };
+ return defragStageKvstoreHelper(endtime, ctx,
+ scanCallbackCountScanned, NULL, &defragfns);
+}
+
+/* Defrag (hash) object with subexpiry and update its reference in the DB keys. */
+void *activeDefragSubexpiresOB(void *ptr, void *privdata) {
+ redisDb *db = privdata;
+ dictEntryLink link, exlink = NULL;
+ kvobj *newkv, *kv = ptr;
+ sds keystr = kvobjGetKey(kv);
+ unsigned int slot = calculateKeySlot(keystr);
+
+ serverAssert(kv->type == OBJ_HASH); /* Currently relevant only for hashes */
+
+ long long expire = kvobjGetExpire(kv);
+ /* We can't search in db->expires for that KV after we've released
+ * the pointer it holds, since it won't be able to do the string
+ * compare. Search it before, if needed. */
+ if (expire != -1) {
+ exlink = kvstoreDictFindLink(db->expires, slot, keystr, NULL);
+ serverAssert(exlink != NULL);
+ }
+
+ if ((newkv = activeDefragKvobj(kv, 1))) {
+ /* Update its reference in the DB keys. */
+ link = kvstoreDictFindLink(db->keys, slot, keystr, NULL);
+ serverAssert(link != NULL);
+ kvstoreDictSetAtLink(db->keys, slot, newkv, &link, 0);
+ if (expire != -1)
+ kvstoreDictSetAtLink(db->expires, slot, newkv, &exlink, 0);
+ activeDefragFree(kvobjGetAllocPtr(kv));
+ }
+ return newkv;
+}
+
+static doneStatus defragStageSubexpires(void *ctx, monotime endtime) {
+ unsigned int iterations = 0;
+ unsigned long long prev_defragged = server.stat_active_defrag_hits;
+ unsigned long long prev_scanned = server.stat_active_defrag_scanned;
+ defragSubexpiresCtx *subctx = ctx;
+ redisDb *db = &server.db[subctx->dbid];
+ estore *subexpires = db->subexpires;
+
+ /* If estore changed (flushdb, swapdb, etc.), Just complete the stage. */
+ if (db->subexpires != subctx->subexpires) {
+ return DEFRAG_DONE;
+ }
+
+ ebDefragFunctions eb_defragfns = {
+ .defragAlloc = activeDefragAlloc,
+ .defragItem = activeDefragSubexpiresOB
+ };
+
+ while (1) {
+ if (++iterations > 16 ||
+ server.stat_active_defrag_hits - prev_defragged > 512 ||
+ server.stat_active_defrag_scanned - prev_scanned > 64)
+ {
+ if (getMonotonicUs() >= endtime) break;
+ iterations = 0;
+ prev_defragged = server.stat_active_defrag_hits;
+ prev_scanned = server.stat_active_defrag_scanned;
+ }
+
+ /* If there's no cursor, we're ready to begin a new estore slot. */
+ if (!subctx->cursor) {
+ if (subctx->slot == ITER_SLOT_UNASSIGNED) {
+ subctx->slot = estoreGetFirstNonEmptyBucket(subexpires);
+ } else {
+ subctx->slot = estoreGetNextNonEmptyBucket(subexpires, subctx->slot);
+ }
+
+ if (subctx->slot == ITER_SLOT_UNASSIGNED) return DEFRAG_DONE;
+ }
+
+ /* Get the ebuckets for the current slot and scan it */
+ ebuckets *bucket = estoreGetBuckets(subexpires, subctx->slot);
+ if (!ebScanDefrag(bucket, &subexpiresBucketsType, &subctx->cursor, &eb_defragfns, db))
+ subctx->cursor = 0; /* Reset cursor to move to next slot */
+ }
+
+ return DEFRAG_NOT_DONE;
+}
+
+static doneStatus defragStagePubsubKvstore(void *ctx, monotime endtime) {
+ static dictDefragFunctions defragfns = {
+ .defragAlloc = activeDefragAlloc,
+ .defragKey = NULL, /* Handled by defragPubsubScanCallback */
+ .defragVal = NULL, /* Not needed for expires (no value) */
+ };
+
+ return defragStageKvstoreHelper(endtime, ctx,
+ defragPubsubScanCallback, NULL, &defragfns);
+}
+
+static doneStatus defragLuaScripts(void *ctx, monotime endtime) {
+ UNUSED(endtime);
+ UNUSED(ctx);
+ activeDefragSdsDict(evalScriptsDict(), DEFRAG_SDS_DICT_VAL_LUA_SCRIPT);
+ return DEFRAG_DONE;
+}
+
+/* Handles defragmentation of module global data. This is a stage function
+ * that gets called periodically during the active defragmentation process. */
+static doneStatus defragModuleGlobals(void *ctx, monotime endtime) {
+ defragModuleCtx *defrag_module_ctx = ctx;
+
+ RedisModule *module = moduleGetHandleByName(defrag_module_ctx->module_name);
+ if (!module) {
+ /* Module has been unloaded, nothing to defrag. */
+ return DEFRAG_DONE;
+ }
+ /* Interval shouldn't exceed 1 hour */
+ serverAssert(!endtime || llabs((long long)endtime - (long long)getMonotonicUs()) < 60*60*1000*1000LL);
+
+ /* Call appropriate version of module's defrag callback:
+ * 1. Version 2 (defrag_cb_2): Supports incremental defrag and returns whether more work is needed
+ * 2. Version 1 (defrag_cb): Legacy version, performs all work in one call.
+ * Note: V1 doesn't support incremental defragmentation, may block for longer periods. */
+ RedisModuleDefragCtx defrag_ctx = { endtime, &defrag_module_ctx->cursor, NULL, -1, -1, -1 };
+ if (module->defrag_cb_2) {
+ return module->defrag_cb_2(&defrag_ctx) ? DEFRAG_NOT_DONE : DEFRAG_DONE;
+ } else if (module->defrag_cb) {
+ module->defrag_cb(&defrag_ctx);
+ return DEFRAG_DONE;
+ } else {
+ redis_unreachable();
+ }
+}
+
+static void freeDefragKeysContext(void *ctx) {
+ defragKeysCtx *defrag_keys_ctx = ctx;
+ if (defrag_keys_ctx->defrag_later) {
+ listRelease(defrag_keys_ctx->defrag_later);
+ }
+ zfree(defrag_keys_ctx);
+}
+
+static void freeDefragModelContext(void *ctx) {
+ defragModuleCtx *defrag_model_ctx = ctx;
+ sdsfree(defrag_model_ctx->module_name);
+ zfree(defrag_model_ctx);
+}
+
+static void freeDefragContext(void *ptr) {
+ StageDescriptor *stage = ptr;
+ if (stage->ctx_free_fn)
+ stage->ctx_free_fn(stage->ctx);
+ zfree(stage);
+}
+
+static void addDefragStage(defragStageFn stage_fn, defragStageContextFreeFn ctx_free_fn, void *ctx) {
+ StageDescriptor *stage = zmalloc(sizeof(StageDescriptor));
+ stage->stage_fn = stage_fn;
+ stage->ctx_free_fn = ctx_free_fn;
+ stage->ctx = ctx;
+ listAddNodeTail(defrag.remaining_stages, stage);
+}
+
+/* Updates the defrag decay rate based on the observed effectiveness of the defrag process.
+ * The decay rate is used to gradually slow down defrag when it's not being effective. */
+static void updateDefragDecayRate(float frag_pct) {
+ long long last_hits = server.stat_active_defrag_hits - defrag.start_defrag_hits;
+ long long last_misses = server.stat_active_defrag_misses - defrag.start_defrag_misses;
+ float last_frag_pct_change = defrag.start_frag_pct - frag_pct;
+ /* When defragmentation efficiency is low, we gradually reduce the
+ * speed for the next cycle to avoid CPU waste. However, in the
+ * following two cases, we keep the normal speed:
+ * 1) If the fragmentation percentage has increased or decreased by more than 2%.
+ * 2) If the fragmentation percentage decrease is small, but hits are above 1%,
+ * we still keep the normal speed. */
+ if (fabs(last_frag_pct_change) > 2 ||
+ (last_frag_pct_change < 0 && last_hits >= (last_hits + last_misses) * 0.01))
+ {
+ defrag.decay_rate = 1.0f;
+ } else {
+ defrag.decay_rate *= 0.9;
+ }
+}
+
+/* Called at the end of a complete defrag cycle, or when defrag is terminated */
+static void endDefragCycle(int normal_termination) {
+ if (normal_termination) {
+ /* For normal termination, we expect... */
+ serverAssert(!defrag.current_stage);
+ serverAssert(listLength(defrag.remaining_stages) == 0);
+ } else {
+ /* Defrag is being terminated abnormally */
+ aeDeleteTimeEvent(server.el, defrag.timeproc_id);
+
+ if (defrag.current_stage) {
+ listDelNode(defrag.remaining_stages, defrag.current_stage);
+ defrag.current_stage = NULL;
+ }
+ }
+ defrag.timeproc_id = AE_DELETED_EVENT_ID;
+
+ listRelease(defrag.remaining_stages);
+ defrag.remaining_stages = NULL;
+
+ size_t frag_bytes;
+ float frag_pct = getAllocatorFragmentation(&frag_bytes);
+ serverLog(LL_VERBOSE, "Active defrag done in %dms, reallocated=%d, frag=%.0f%%, frag_bytes=%zu",
+ (int)elapsedMs(defrag.start_cycle), (int)(server.stat_active_defrag_hits - defrag.start_defrag_hits),
+ frag_pct, frag_bytes);
+
+ server.stat_total_active_defrag_time += elapsedUs(server.stat_last_active_defrag_time);
+ server.stat_last_active_defrag_time = 0;
+ server.active_defrag_running = 0;
+
+ updateDefragDecayRate(frag_pct);
+ moduleDefragEnd();
+
+ /* Immediately check to see if we should start another defrag cycle. */
+ activeDefragCycle();
+}
+
+/* Must be called at the start of the timeProc as it measures the delay from the end of the previous
+ * timeProc invocation when performing the computation. */
+static int computeDefragCycleUs(void) {
+ long dutyCycleUs;
+
+ int targetCpuPercent = server.active_defrag_running;
+ serverAssert(targetCpuPercent > 0 && targetCpuPercent < 100);
+
+ static int prevCpuPercent = 0; /* STATIC - this persists */
+ if (targetCpuPercent != prevCpuPercent) {
+ /* If the targetCpuPercent changes, the value might be different from when the last wait
+ * time was computed. In this case, don't consider wait time. (This is really only an
+ * issue in crazy tests that dramatically increase CPU while defrag is running.) */
+ defrag.timeproc_end_time = 0;
+ prevCpuPercent = targetCpuPercent;
+ }
+
+ /* Given when the last duty cycle ended, compute time needed to achieve the desired percentage. */
+ if (defrag.timeproc_end_time == 0) {
+ /* Either the first call to the timeProc, or we were paused for some reason. */
+ defrag.timeproc_overage_us = 0;
+ dutyCycleUs = DEFRAG_CYCLE_US;
+ } else {
+ long waitedUs = getMonotonicUs() - defrag.timeproc_end_time;
+ /* Given the elapsed wait time between calls, compute the necessary duty time needed to
+ * achieve the desired CPU percentage.
+ * With: D = duty time, W = wait time, P = percent
+ * Solve: D P
+ * ----- = -----
+ * D + W 100
+ * Solving for D:
+ * D = P * W / (100 - P)
+ *
+ * Note that dutyCycleUs addresses starvation. If the wait time was long, we will compensate
+ * with a proportionately long duty-cycle. This won't significantly affect perceived
+ * latency, because clients are already being impacted by the long cycle time which caused
+ * the starvation of the timer. */
+ dutyCycleUs = targetCpuPercent * waitedUs / (100 - targetCpuPercent);
+
+ /* Also adjust for any accumulated overage. */
+ dutyCycleUs -= defrag.timeproc_overage_us;
+ defrag.timeproc_overage_us = 0;
+
+ if (dutyCycleUs < DEFRAG_CYCLE_US) {
+ /* We never reduce our cycle time, that would increase overhead. Instead, we track this
+ * as part of the overage, and increase wait time between cycles. */
+ defrag.timeproc_overage_us = DEFRAG_CYCLE_US - dutyCycleUs;
+ dutyCycleUs = DEFRAG_CYCLE_US;
+ } else if (dutyCycleUs > DEFRAG_CYCLE_US * 10) {
+ /* Add a time limit for the defrag duty cycle to prevent excessive latency.
+ * When latency is already high (indicated by a long time between calls),
+ * we don't want to make it worse by running defrag for too long. */
+ dutyCycleUs = DEFRAG_CYCLE_US * 10;
+ }
+ }
+ return dutyCycleUs;
+}
+
+/* Must be called at the end of the timeProc as it records the timeproc_end_time for use in the next
+ * computeDefragCycleUs computation. */
+static int computeDelayMs(monotime intendedEndtime) {
+ defrag.timeproc_end_time = getMonotonicUs();
+ long overage = defrag.timeproc_end_time - intendedEndtime;
+ defrag.timeproc_overage_us += overage; /* track over/under desired CPU */
+ /* Allow negative overage (underage) to count against existing overage, but don't allow
+ * underage (from short stages) to be accumulated. */
+ if (defrag.timeproc_overage_us < 0) defrag.timeproc_overage_us = 0;
+
+ int targetCpuPercent = server.active_defrag_running;
+ serverAssert(targetCpuPercent > 0 && targetCpuPercent < 100);
+
+ /* Given the desired duty cycle, what inter-cycle delay do we need to achieve that? */
+ /* We want to achieve a specific CPU percent. To do that, we can't use a skewed computation. */
+ /* Example, if we run for 1ms and delay 10ms, that's NOT 10%, because the total cycle time is 11ms. */
+ /* Instead, if we rum for 1ms, our total time should be 10ms. So the delay is only 9ms. */
+ long totalCycleTimeUs = DEFRAG_CYCLE_US * 100 / targetCpuPercent;
+ long delayUs = totalCycleTimeUs - DEFRAG_CYCLE_US;
+ /* Only increase delay by the fraction of the overage that would be non-duty-cycle */
+ delayUs += defrag.timeproc_overage_us * (100 - targetCpuPercent) / 100;
+ if (delayUs < 0) delayUs = 0;
+ long delayMs = delayUs / 1000; /* round down */
+ return delayMs;
+}
+
+/* An independent time proc for defrag. While defrag is running, this is called much more often
+ * than the server cron. Frequent short calls provides low latency impact. */
+static int activeDefragTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData) {
+ UNUSED(eventLoop);
+ UNUSED(id);
+ UNUSED(clientData);
+
+ /* This timer shouldn't be registered unless there's work to do. */
+ serverAssert(defrag.current_stage || listLength(defrag.remaining_stages) > 0);
+
+ if (!server.active_defrag_enabled) {
+ /* Defrag has been disabled while running */
+ endDefragCycle(0);
+ return AE_NOMORE;
+ }
+
+ if (hasActiveChildProcess()) {
+ /* If there's a child process, pause the defrag, polling until the child completes. */
+ defrag.timeproc_end_time = 0; /* prevent starvation recovery */
+ return 100;
+ }
+
+ monotime starttime = getMonotonicUs();
+ int dutyCycleUs = computeDefragCycleUs();
+#if defined(DEBUG_DEFRAG_FULLY)
+ dutyCycleUs = 30*1000*1000LL; /* 30 seconds */
+#endif
+ monotime endtime = starttime + dutyCycleUs;
+ int haveMoreWork = 1;
+
+ mstime_t latency;
+ latencyStartMonitor(latency);
+
+ do {
+ if (!defrag.current_stage) {
+ defrag.current_stage = listFirst(defrag.remaining_stages);
+ }
+
+ StageDescriptor *stage = listNodeValue(defrag.current_stage);
+ doneStatus status = stage->stage_fn(stage->ctx, endtime);
+ if (status == DEFRAG_DONE) {
+ listDelNode(defrag.remaining_stages, defrag.current_stage);
+ defrag.current_stage = NULL;
+ }
+
+ haveMoreWork = (defrag.current_stage || listLength(defrag.remaining_stages) > 0);
+ /* If we've completed a stage early, and still have a standard time allotment remaining,
+ * we'll start another stage. This can happen when defrag is running infrequently, and
+ * starvation protection has increased the duty-cycle. */
+ } while (haveMoreWork && getMonotonicUs() <= endtime - DEFRAG_CYCLE_US);
+
+ latencyEndMonitor(latency);
+ latencyAddSampleIfNeeded("active-defrag-cycle", latency);
+
+ if (haveMoreWork) {
+ return computeDelayMs(endtime);
+ } else {
+ endDefragCycle(1);
+ return AE_NOMORE; /* Ends the timer proc */
+ }
+}
+
+/* During long running scripts, or while loading, there is a periodic function for handling other
+ * actions. This interface allows defrag to continue running, avoiding a single long defrag step
+ * after the long operation completes. */
+void defragWhileBlocked(void) {
+ /* This is called infrequently, while timers are not active. We might need to start defrag. */
+ if (!defragIsRunning()) activeDefragCycle();
+
+ if (!defragIsRunning()) return;
+
+ /* Save off the timeproc_id. If we have a normal termination, it will be cleared. */
+ long long timeproc_id = defrag.timeproc_id;
+
+ /* Simulate a single call of the timer proc */
+ long long reschedule_delay = activeDefragTimeProc(NULL, 0, NULL);
+ if (reschedule_delay == AE_NOMORE) {
+ /* If it's done, deregister the timer */
+ aeDeleteTimeEvent(server.el, timeproc_id);
+ }
+ /* Otherwise, just ignore the reschedule_delay, the timer will pop the next time that the
+ * event loop can process timers again. */
+}
+
+static void beginDefragCycle(void) {
+ serverAssert(!defragIsRunning());
+
+ moduleDefragStart();
+
+ serverAssert(defrag.remaining_stages == NULL);
+ defrag.remaining_stages = listCreate();
+ listSetFreeMethod(defrag.remaining_stages, freeDefragContext);
+
+ for (int dbid = 0; dbid < server.dbnum; dbid++) {
+ redisDb *db = &server.db[dbid];
+
+ /* Add stage for keys. */
+ defragKeysCtx *defrag_keys_ctx = zcalloc(sizeof(defragKeysCtx));
+ defrag_keys_ctx->kvstate = INIT_KVSTORE_STATE(db->keys);
+ defrag_keys_ctx->dbid = dbid;
+ addDefragStage(defragStageDbKeys, freeDefragKeysContext, defrag_keys_ctx);
+
+ /* Add stage for expires. */
+ defragKeysCtx *defrag_expires_ctx = zcalloc(sizeof(defragKeysCtx));
+ defrag_expires_ctx->kvstate = INIT_KVSTORE_STATE(db->expires);
+ defrag_expires_ctx->dbid = dbid;
+ addDefragStage(defragStageExpiresKvstore, freeDefragKeysContext, defrag_expires_ctx);
+
+ /* Add stage for subexpires. */
+ defragSubexpiresCtx *defrag_subexpires_ctx = zcalloc(sizeof(defragSubexpiresCtx));
+ defrag_subexpires_ctx->subexpires = db->subexpires;
+ defrag_subexpires_ctx->slot = ITER_SLOT_UNASSIGNED;
+ defrag_subexpires_ctx->cursor = 0;
+ defrag_subexpires_ctx->dbid = dbid;
+ addDefragStage(defragStageSubexpires, zfree, defrag_subexpires_ctx);
+ }
+
+ /* Add stage for pubsub channels. */
+ defragPubSubCtx *defrag_pubsub_ctx = zmalloc(sizeof(defragPubSubCtx));
+ defrag_pubsub_ctx->kvstate = INIT_KVSTORE_STATE(server.pubsub_channels);
+ defrag_pubsub_ctx->getPubSubChannels = getClientPubSubChannels;
+ addDefragStage(defragStagePubsubKvstore, zfree, defrag_pubsub_ctx);
+
+ /* Add stage for pubsubshard channels. */
+ defragPubSubCtx *defrag_pubsubshard_ctx = zmalloc(sizeof(defragPubSubCtx));
+ defrag_pubsubshard_ctx->kvstate = INIT_KVSTORE_STATE(server.pubsubshard_channels);
+ defrag_pubsubshard_ctx->getPubSubChannels = getClientPubSubShardChannels;
+ addDefragStage(defragStagePubsubKvstore, zfree, defrag_pubsubshard_ctx);
+
+ addDefragStage(defragLuaScripts, NULL, NULL);
+
+ /* Add stages for modules. */
+ dictIterator di;
+ dictEntry *de;
+ dictInitIterator(&di, modules);
+ while ((de = dictNext(&di)) != NULL) {
+ struct RedisModule *module = dictGetVal(de);
+ if (module->defrag_cb || module->defrag_cb_2) {
+ defragModuleCtx *ctx = zmalloc(sizeof(defragModuleCtx));
+ ctx->cursor = 0;
+ ctx->module_name = sdsnew(module->name);
+ addDefragStage(defragModuleGlobals, freeDefragModelContext, ctx);
+ }
+ }
+ dictResetIterator(&di);
+
+ defrag.current_stage = NULL;
+ defrag.start_cycle = getMonotonicUs();
+ defrag.start_defrag_hits = server.stat_active_defrag_hits;
+ defrag.start_defrag_misses = server.stat_active_defrag_misses;
+ defrag.start_frag_pct = getAllocatorFragmentation(NULL);
+ defrag.timeproc_end_time = 0;
+ defrag.timeproc_overage_us = 0;
+ defrag.timeproc_id = aeCreateTimeEvent(server.el, 0, activeDefragTimeProc, NULL, NULL);
+
+ elapsedStart(&server.stat_last_active_defrag_time);
+}
+
+void activeDefragCycle(void) {
+ if (!server.active_defrag_enabled) return;
+
+ /* Defrag gets paused while a child process is active. So there's no point in starting a new
+ * cycle or adjusting the CPU percentage for an existing cycle. */
+ if (hasActiveChildProcess()) return;
+
+ computeDefragCycles();
+
+ if (server.active_defrag_running > 0 && !defragIsRunning()) beginDefragCycle();
+}
+
+#else /* HAVE_DEFRAG */
+
+void activeDefragCycle(void) {
+ /* Not implemented yet. */
+}
+
+void *activeDefragAlloc(void *ptr) {
+ UNUSED(ptr);
+ return NULL;
+}
+
+void *activeDefragAllocRaw(size_t size) {
+ /* fallback to regular allocation */
+ return zmalloc(size);
+}
+
+void activeDefragFreeRaw(void *ptr) {
+ /* fallback to regular free */
+ zfree(ptr);
+}
+
+robj *activeDefragStringOb(robj *ob) {
+ UNUSED(ob);
+ return NULL;
+}
+
+void defragWhileBlocked(void) {
+}
+
+#endif