diff options
| author | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:40:55 +0100 |
|---|---|---|
| committer | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:40:55 +0100 |
| commit | 5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda (patch) | |
| tree | 1acdfa5220cd13b7be43a2a01368e80d306473ca /examples/redis-unstable/src/lazyfree.c | |
| parent | c7ab12bba64d9c20ccd79b132dac475f7bc3923e (diff) | |
| download | crep-5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda.tar.gz | |
Add Redis source code for testing
Diffstat (limited to 'examples/redis-unstable/src/lazyfree.c')
| -rw-r--r-- | examples/redis-unstable/src/lazyfree.c | 362 |
1 file changed, 362 insertions, 0 deletions
diff --git a/examples/redis-unstable/src/lazyfree.c b/examples/redis-unstable/src/lazyfree.c new file mode 100644 index 0000000..1960be1 --- /dev/null +++ b/examples/redis-unstable/src/lazyfree.c | |||
| @@ -0,0 +1,362 @@ | |||
| 1 | #include "server.h" | ||
| 2 | #include "bio.h" | ||
| 3 | #include "atomicvar.h" | ||
| 4 | #include "functions.h" | ||
| 5 | #include "cluster.h" | ||
| 6 | #include "ebuckets.h" | ||
| 7 | |||
/* Bookkeeping counters shared between the main thread and the bio
 * lazy-free threads, hence atomic:
 *  - lazyfree_objects: objects queued for async release and not yet freed.
 *  - lazyfreed_objects: total objects released so far (see lazyfreeResetStats). */
static redisAtomic size_t lazyfree_objects = 0;
static redisAtomic size_t lazyfreed_objects = 0;
| 10 | |||
| 11 | /* Release objects from the lazyfree thread. It's just decrRefCount() | ||
| 12 | * updating the count of objects to release. */ | ||
| 13 | void lazyfreeFreeObject(void *args[]) { | ||
| 14 | robj *o = (robj *) args[0]; | ||
| 15 | decrRefCount(o); | ||
| 16 | atomicDecr(lazyfree_objects,1); | ||
| 17 | atomicIncr(lazyfreed_objects,1); | ||
| 18 | } | ||
| 19 | |||
| 20 | /* Release a database from the lazyfree thread. The 'db' pointer is the | ||
| 21 | * database which was substituted with a fresh one in the main thread | ||
| 22 | * when the database was logically deleted. */ | ||
| 23 | void lazyfreeFreeDatabase(void *args[]) { | ||
| 24 | kvstore *da1 = args[0]; | ||
| 25 | kvstore *da2 = args[1]; | ||
| 26 | estore *subexpires = args[2]; | ||
| 27 | estoreRelease(subexpires); | ||
| 28 | size_t numkeys = kvstoreSize(da1); | ||
| 29 | kvstoreRelease(da1); | ||
| 30 | kvstoreRelease(da2); | ||
| 31 | atomicDecr(lazyfree_objects,numkeys); | ||
| 32 | atomicIncr(lazyfreed_objects,numkeys); | ||
| 33 | |||
| 34 | #if defined(USE_JEMALLOC) | ||
| 35 | /* Only clear the current thread cache. | ||
| 36 | * Ignore the return call since this will fail if the tcache is disabled. */ | ||
| 37 | je_mallctl("thread.tcache.flush", NULL, NULL, NULL, 0); | ||
| 38 | |||
| 39 | jemalloc_purge(); | ||
| 40 | #endif | ||
| 41 | } | ||
| 42 | |||
| 43 | /* Release the key tracking table. */ | ||
| 44 | void lazyFreeTrackingTable(void *args[]) { | ||
| 45 | rax *rt = args[0]; | ||
| 46 | size_t len = rt->numele; | ||
| 47 | freeTrackingRadixTree(rt); | ||
| 48 | atomicDecr(lazyfree_objects,len); | ||
| 49 | atomicIncr(lazyfreed_objects,len); | ||
| 50 | } | ||
| 51 | |||
| 52 | /* Release the error stats rax tree. */ | ||
| 53 | void lazyFreeErrors(void *args[]) { | ||
| 54 | rax *errors = args[0]; | ||
| 55 | size_t len = errors->numele; | ||
| 56 | raxFreeWithCallback(errors, zfree); | ||
| 57 | atomicDecr(lazyfree_objects,len); | ||
| 58 | atomicIncr(lazyfreed_objects,len); | ||
| 59 | } | ||
| 60 | |||
| 61 | /* Release the lua_scripts dict. */ | ||
| 62 | void lazyFreeLuaScripts(void *args[]) { | ||
| 63 | dict *lua_scripts = args[0]; | ||
| 64 | list *lua_scripts_lru_list = args[1]; | ||
| 65 | lua_State *lua = args[2]; | ||
| 66 | long long len = dictSize(lua_scripts); | ||
| 67 | freeLuaScriptsSync(lua_scripts, lua_scripts_lru_list, lua); | ||
| 68 | atomicDecr(lazyfree_objects,len); | ||
| 69 | atomicIncr(lazyfreed_objects,len); | ||
| 70 | } | ||
| 71 | |||
| 72 | /* Release the functions ctx. */ | ||
| 73 | void lazyFreeFunctionsCtx(void *args[]) { | ||
| 74 | functionsLibCtx *functions_lib_ctx = args[0]; | ||
| 75 | dict *engs = args[1]; | ||
| 76 | size_t len = functionsLibCtxFunctionsLen(functions_lib_ctx); | ||
| 77 | functionsLibCtxFree(functions_lib_ctx); | ||
| 78 | len += dictSize(engs); | ||
| 79 | dictRelease(engs); | ||
| 80 | atomicDecr(lazyfree_objects,len); | ||
| 81 | atomicIncr(lazyfreed_objects,len); | ||
| 82 | } | ||
| 83 | |||
| 84 | /* Release replication backlog referencing memory. */ | ||
| 85 | void lazyFreeReplicationBacklogRefMem(void *args[]) { | ||
| 86 | list *blocks = args[0]; | ||
| 87 | rax *index = args[1]; | ||
| 88 | long long len = listLength(blocks); | ||
| 89 | len += raxSize(index); | ||
| 90 | listRelease(blocks); | ||
| 91 | raxFree(index); | ||
| 92 | atomicDecr(lazyfree_objects,len); | ||
| 93 | atomicIncr(lazyfreed_objects,len); | ||
| 94 | } | ||
| 95 | |||
| 96 | /* Return the number of currently pending objects to free. */ | ||
| 97 | size_t lazyfreeGetPendingObjectsCount(void) { | ||
| 98 | size_t aux; | ||
| 99 | atomicGet(lazyfree_objects,aux); | ||
| 100 | return aux; | ||
| 101 | } | ||
| 102 | |||
| 103 | /* Return the number of objects that have been freed. */ | ||
| 104 | size_t lazyfreeGetFreedObjectsCount(void) { | ||
| 105 | size_t aux; | ||
| 106 | atomicGet(lazyfreed_objects,aux); | ||
| 107 | return aux; | ||
| 108 | } | ||
| 109 | |||
/* Reset the "objects freed so far" statistic. The pending counter
 * (lazyfree_objects) is deliberately left alone: it tracks work still
 * queued in bio threads and is decremented by the free callbacks. */
void lazyfreeResetStats(void) {
    atomicSet(lazyfreed_objects,0);
}
| 113 | |||
| 114 | /* Return the amount of work needed in order to free an object. | ||
| 115 | * The return value is not always the actual number of allocations the | ||
| 116 | * object is composed of, but a number proportional to it. | ||
| 117 | * | ||
| 118 | * For strings the function always returns 1. | ||
| 119 | * | ||
| 120 | * For aggregated objects represented by hash tables or other data structures | ||
| 121 | * the function just returns the number of elements the object is composed of. | ||
| 122 | * | ||
| 123 | * Objects composed of single allocations are always reported as having a | ||
| 124 | * single item even if they are actually logical composed of multiple | ||
| 125 | * elements. | ||
| 126 | * | ||
| 127 | * For lists the function returns the number of elements in the quicklist | ||
| 128 | * representing the list. */ | ||
| 129 | size_t lazyfreeGetFreeEffort(robj *key, robj *obj, int dbid) { | ||
| 130 | if (obj->type == OBJ_LIST && obj->encoding == OBJ_ENCODING_QUICKLIST) { | ||
| 131 | quicklist *ql = obj->ptr; | ||
| 132 | return ql->len; | ||
| 133 | } else if (obj->type == OBJ_SET && obj->encoding == OBJ_ENCODING_HT) { | ||
| 134 | dict *ht = obj->ptr; | ||
| 135 | return dictSize(ht); | ||
| 136 | } else if (obj->type == OBJ_ZSET && obj->encoding == OBJ_ENCODING_SKIPLIST){ | ||
| 137 | zset *zs = obj->ptr; | ||
| 138 | return zs->zsl->length; | ||
| 139 | } else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HT) { | ||
| 140 | dict *ht = obj->ptr; | ||
| 141 | return dictSize(ht); | ||
| 142 | } else if (obj->type == OBJ_STREAM) { | ||
| 143 | size_t effort = 0; | ||
| 144 | stream *s = obj->ptr; | ||
| 145 | |||
| 146 | /* Make a best effort estimate to maintain constant runtime. Every macro | ||
| 147 | * node in the Stream is one allocation. */ | ||
| 148 | effort += s->rax->numnodes; | ||
| 149 | |||
| 150 | /* Every consumer group is an allocation and so are the entries in its | ||
| 151 | * PEL. We use size of the first group's PEL as an estimate for all | ||
| 152 | * others. */ | ||
| 153 | if (s->cgroups && raxSize(s->cgroups)) { | ||
| 154 | raxIterator ri; | ||
| 155 | streamCG *cg; | ||
| 156 | raxStart(&ri,s->cgroups); | ||
| 157 | raxSeek(&ri,"^",NULL,0); | ||
| 158 | /* There must be at least one group so the following should always | ||
| 159 | * work. */ | ||
| 160 | serverAssert(raxNext(&ri)); | ||
| 161 | cg = ri.data; | ||
| 162 | effort += raxSize(s->cgroups)*(1+raxSize(cg->pel)); | ||
| 163 | raxStop(&ri); | ||
| 164 | } | ||
| 165 | return effort; | ||
| 166 | } else if (obj->type == OBJ_MODULE) { | ||
| 167 | size_t effort = moduleGetFreeEffort(key, obj, dbid); | ||
| 168 | /* If the module's free_effort returns 0, we will use asynchronous free | ||
| 169 | * memory by default. */ | ||
| 170 | return effort == 0 ? ULONG_MAX : effort; | ||
| 171 | } else { | ||
| 172 | return 1; /* Everything else is a single allocation. */ | ||
| 173 | } | ||
| 174 | } | ||
| 175 | |||
| 176 | /* If there are enough allocations to free the value object asynchronously, it | ||
| 177 | * may be put into a lazy free list instead of being freed synchronously. The | ||
| 178 | * lazy free list will be reclaimed in a different bio.c thread. If the value is | ||
| 179 | * composed of a few allocations, to free in a lazy way is actually just | ||
| 180 | * slower... So under a certain limit we just free the object synchronously. */ | ||
| 181 | #define LAZYFREE_THRESHOLD 64 | ||
| 182 | |||
| 183 | /* Free an object, if the object is huge enough, free it in async way. */ | ||
| 184 | void freeObjAsync(robj *key, robj *obj, int dbid) { | ||
| 185 | size_t free_effort = lazyfreeGetFreeEffort(key,obj,dbid); | ||
| 186 | /* Note that if the object is shared, to reclaim it now it is not | ||
| 187 | * possible. This rarely happens, however sometimes the implementation | ||
| 188 | * of parts of the Redis core may call incrRefCount() to protect | ||
| 189 | * objects, and then call dbDelete(). */ | ||
| 190 | if (free_effort > LAZYFREE_THRESHOLD && obj->refcount == 1) { | ||
| 191 | atomicIncr(lazyfree_objects,1); | ||
| 192 | bioCreateLazyFreeJob(lazyfreeFreeObject,1,obj); | ||
| 193 | } else { | ||
| 194 | decrRefCount(obj); | ||
| 195 | } | ||
| 196 | } | ||
| 197 | |||
| 198 | /* Duplicate client reply objects that reference database objects to avoid race | ||
| 199 | * conditions with bio threads during async flushdb. | ||
| 200 | * | ||
| 201 | * Since incrRefCount/decrRefCount are not thread-safe, and bio thread may | ||
| 202 | * free database objects while main thread/IO threads send client replies, we need to | ||
| 203 | * create independent copies of the string objects to avoid concurrent access. */ | ||
| 204 | static void protectClientReplyObjects(void) { | ||
| 205 | /* If there are no clients with pending ref replies, exit ASAP. */ | ||
| 206 | if (!listLength(server.clients_with_pending_ref_reply)) | ||
| 207 | return; | ||
| 208 | |||
| 209 | /* Pause all IO threads to safely duplicate string objects. */ | ||
| 210 | int allpaused = 0; | ||
| 211 | if (server.io_threads_num > 1) { | ||
| 212 | serverAssert(pthread_equal(server.main_thread_id, pthread_self())); | ||
| 213 | allpaused = 1; | ||
| 214 | pauseAllIOThreads(); | ||
| 215 | } | ||
| 216 | |||
| 217 | listNode *ln; | ||
| 218 | listIter li; | ||
| 219 | listRewind(server.clients_with_pending_ref_reply, &li); | ||
| 220 | while ((ln = listNext(&li)) != NULL) { | ||
| 221 | client *c = listNodeValue(ln); | ||
| 222 | |||
| 223 | /* Process c->buf if it's encoded */ | ||
| 224 | if (c->buf_encoded && c->bufpos > 0) { | ||
| 225 | char *ptr = c->buf; | ||
| 226 | while (ptr < c->buf + c->bufpos) { | ||
| 227 | payloadHeader *header = (payloadHeader *)ptr; | ||
| 228 | ptr += sizeof(payloadHeader); | ||
| 229 | |||
| 230 | if (header->payload_type == BULK_STR_REF) { | ||
| 231 | bulkStrRef *str_ref = (bulkStrRef *)ptr; | ||
| 232 | if (str_ref->obj != NULL) { | ||
| 233 | /* Duplicate the string object */ | ||
| 234 | robj *new_obj = dupStringObject(str_ref->obj); | ||
| 235 | decrRefCount(str_ref->obj); | ||
| 236 | str_ref->obj = new_obj; | ||
| 237 | } | ||
| 238 | } | ||
| 239 | ptr += header->payload_len; | ||
| 240 | } | ||
| 241 | } | ||
| 242 | |||
| 243 | /* Process reply list */ | ||
| 244 | if (c->reply && listLength(c->reply)) { | ||
| 245 | listIter reply_li; | ||
| 246 | listNode *reply_ln; | ||
| 247 | listRewind(c->reply, &reply_li); | ||
| 248 | while ((reply_ln = listNext(&reply_li))) { | ||
| 249 | clientReplyBlock *block = listNodeValue(reply_ln); | ||
| 250 | if (block && block->buf_encoded) { | ||
| 251 | char *ptr = block->buf; | ||
| 252 | while (ptr < block->buf + block->used) { | ||
| 253 | payloadHeader *header = (payloadHeader *)ptr; | ||
| 254 | ptr += sizeof(payloadHeader); | ||
| 255 | |||
| 256 | if (header->payload_type == BULK_STR_REF) { | ||
| 257 | bulkStrRef *str_ref = (bulkStrRef *)ptr; | ||
| 258 | if (str_ref->obj != NULL) { | ||
| 259 | /* Duplicate the string object */ | ||
| 260 | robj *new_obj = dupStringObject(str_ref->obj); | ||
| 261 | decrRefCount(str_ref->obj); | ||
| 262 | str_ref->obj = new_obj; | ||
| 263 | } | ||
| 264 | } | ||
| 265 | ptr += header->payload_len; | ||
| 266 | } | ||
| 267 | } | ||
| 268 | } | ||
| 269 | } | ||
| 270 | |||
| 271 | /* Process references in IO deferred objects and remove client from | ||
| 272 | * pending ref list since all refs have been duplicated above. */ | ||
| 273 | freeClientIODeferredObjects(c, 0); | ||
| 274 | tryUnlinkClientFromPendingRefReply(c, 1); | ||
| 275 | } | ||
| 276 | |||
| 277 | if (allpaused) resumeAllIOThreads(); | ||
| 278 | } | ||
| 279 | |||
/* Empty a Redis DB asynchronously. What the function does actually is to
 * create a new empty set of hash tables and scheduling the old ones for
 * lazy freeing.
 *
 * Ordering matters here: the fresh stores are installed in 'db' BEFORE the
 * old ones are handed to the bio thread, so the main thread never observes
 * a database whose stores are being concurrently destroyed. Client reply
 * buffers are also scrubbed of shared object references first, since the
 * bio thread will drop those objects' refcounts non-atomically. */
void emptyDbAsync(redisDb *db) {
    int slot_count_bits = 0;
    int flags = KVSTORE_ALLOCATE_DICTS_ON_DEMAND;
    if (server.cluster_enabled) {
        /* In cluster mode the keyspace is partitioned per slot. */
        slot_count_bits = CLUSTER_SLOT_MASK_BITS;
        flags |= KVSTORE_FREE_EMPTY_DICTS;
    }
    kvstore *oldkeys = db->keys, *oldexpires = db->expires;
    estore *oldsubexpires = db->subexpires;
    db->keys = kvstoreCreate(&kvstoreExType, &dbDictType, slot_count_bits, flags);
    db->expires = kvstoreCreate(&kvstoreBaseType, &dbExpiresDictType, slot_count_bits, flags);
    db->subexpires = estoreCreate(&subexpiresBucketsType, slot_count_bits);
    protectClientReplyObjects(); /* Protect client reply objects before async free. */
    emptyDbDataAsync(oldkeys, oldexpires, oldsubexpires);
}
| 298 | |||
/* Empty a Redis DB data asynchronously: account for the pending keys and
 * enqueue a bio job that releases all three structures off-thread.
 * NOTE(review): the third parameter is declared 'ebuckets hexpires' but the
 * lazyfreeFreeDatabase callback reads it as 'estore *' and the only caller
 * (emptyDbAsync) passes db->subexpires (an estore *) — confirm the declared
 * type/name is just stale. */
void emptyDbDataAsync(kvstore *keys, kvstore *expires, ebuckets hexpires) {
    /* Increment pending count BEFORE enqueueing, so the counter can never
     * be observed going negative by lazyfreeGetPendingObjectsCount(). */
    atomicIncr(lazyfree_objects, kvstoreSize(keys));
    bioCreateLazyFreeJob(lazyfreeFreeDatabase, 3, keys, expires, hexpires);
}
| 304 | |||
| 305 | /* Free the key tracking table. | ||
| 306 | * If the table is huge enough, free it in async way. */ | ||
| 307 | void freeTrackingRadixTreeAsync(rax *tracking) { | ||
| 308 | /* Because this rax has only keys and no values so we use numnodes. */ | ||
| 309 | if (tracking->numnodes > LAZYFREE_THRESHOLD) { | ||
| 310 | atomicIncr(lazyfree_objects,tracking->numele); | ||
| 311 | bioCreateLazyFreeJob(lazyFreeTrackingTable,1,tracking); | ||
| 312 | } else { | ||
| 313 | freeTrackingRadixTree(tracking); | ||
| 314 | } | ||
| 315 | } | ||
| 316 | |||
| 317 | /* Free the error stats rax tree. | ||
| 318 | * If the rax tree is huge enough, free it in async way. */ | ||
| 319 | void freeErrorsRadixTreeAsync(rax *errors) { | ||
| 320 | /* Because this rax has only keys and no values so we use numnodes. */ | ||
| 321 | if (errors->numnodes > LAZYFREE_THRESHOLD) { | ||
| 322 | atomicIncr(lazyfree_objects,errors->numele); | ||
| 323 | bioCreateLazyFreeJob(lazyFreeErrors,1,errors); | ||
| 324 | } else { | ||
| 325 | raxFreeWithCallback(errors, zfree); | ||
| 326 | } | ||
| 327 | } | ||
| 328 | |||
| 329 | /* Free lua_scripts dict and lru list, if the dict is huge enough, free them in async way. | ||
| 330 | * Close lua interpreter, if there are a lot of lua scripts, close it in async way. */ | ||
| 331 | void freeLuaScriptsAsync(dict *lua_scripts, list *lua_scripts_lru_list, lua_State *lua) { | ||
| 332 | if (dictSize(lua_scripts) > LAZYFREE_THRESHOLD) { | ||
| 333 | atomicIncr(lazyfree_objects,dictSize(lua_scripts)); | ||
| 334 | bioCreateLazyFreeJob(lazyFreeLuaScripts,3,lua_scripts,lua_scripts_lru_list,lua); | ||
| 335 | } else { | ||
| 336 | freeLuaScriptsSync(lua_scripts, lua_scripts_lru_list, lua); | ||
| 337 | } | ||
| 338 | } | ||
| 339 | |||
| 340 | /* Free functions ctx, if the functions ctx contains enough functions, free it in async way. */ | ||
| 341 | void freeFunctionsAsync(functionsLibCtx *functions_lib_ctx, dict *engs) { | ||
| 342 | if (functionsLibCtxFunctionsLen(functions_lib_ctx) > LAZYFREE_THRESHOLD) { | ||
| 343 | atomicIncr(lazyfree_objects,functionsLibCtxFunctionsLen(functions_lib_ctx)+dictSize(engs)); | ||
| 344 | bioCreateLazyFreeJob(lazyFreeFunctionsCtx,2,functions_lib_ctx,engs); | ||
| 345 | } else { | ||
| 346 | functionsLibCtxFree(functions_lib_ctx); | ||
| 347 | dictRelease(engs); | ||
| 348 | } | ||
| 349 | } | ||
| 350 | |||
| 351 | /* Free replication backlog referencing buffer blocks and rax index. */ | ||
| 352 | void freeReplicationBacklogRefMemAsync(list *blocks, rax *index) { | ||
| 353 | if (listLength(blocks) > LAZYFREE_THRESHOLD || | ||
| 354 | raxSize(index) > LAZYFREE_THRESHOLD) | ||
| 355 | { | ||
| 356 | atomicIncr(lazyfree_objects,listLength(blocks)+raxSize(index)); | ||
| 357 | bioCreateLazyFreeJob(lazyFreeReplicationBacklogRefMem,2,blocks,index); | ||
| 358 | } else { | ||
| 359 | listRelease(blocks); | ||
| 360 | raxFree(index); | ||
| 361 | } | ||
| 362 | } | ||
