aboutsummaryrefslogtreecommitdiff
path: root/examples/redis-unstable/src/lazyfree.c
diff options
context:
space:
mode:
authorMitja Felicijan <mitja.felicijan@gmail.com>2026-01-21 22:40:55 +0100
committerMitja Felicijan <mitja.felicijan@gmail.com>2026-01-21 22:40:55 +0100
commit5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda (patch)
tree1acdfa5220cd13b7be43a2a01368e80d306473ca /examples/redis-unstable/src/lazyfree.c
parentc7ab12bba64d9c20ccd79b132dac475f7bc3923e (diff)
downloadcrep-5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda.tar.gz
Add Redis source code for testing
Diffstat (limited to 'examples/redis-unstable/src/lazyfree.c')
-rw-r--r--examples/redis-unstable/src/lazyfree.c362
1 files changed, 362 insertions, 0 deletions
diff --git a/examples/redis-unstable/src/lazyfree.c b/examples/redis-unstable/src/lazyfree.c
new file mode 100644
index 0000000..1960be1
--- /dev/null
+++ b/examples/redis-unstable/src/lazyfree.c
@@ -0,0 +1,362 @@
1#include "server.h"
2#include "bio.h"
3#include "atomicvar.h"
4#include "functions.h"
5#include "cluster.h"
6#include "ebuckets.h"
7
/* Number of objects currently queued for release on the bio lazy-free
 * threads: incremented when a job is scheduled, decremented as objects
 * are actually freed. */
static redisAtomic size_t lazyfree_objects = 0;
/* Running total of objects freed by the lazy-free machinery; reset by
 * lazyfreeResetStats(). */
static redisAtomic size_t lazyfreed_objects = 0;
10
11/* Release objects from the lazyfree thread. It's just decrRefCount()
12 * updating the count of objects to release. */
13void lazyfreeFreeObject(void *args[]) {
14 robj *o = (robj *) args[0];
15 decrRefCount(o);
16 atomicDecr(lazyfree_objects,1);
17 atomicIncr(lazyfreed_objects,1);
18}
19
20/* Release a database from the lazyfree thread. The 'db' pointer is the
21 * database which was substituted with a fresh one in the main thread
22 * when the database was logically deleted. */
23void lazyfreeFreeDatabase(void *args[]) {
24 kvstore *da1 = args[0];
25 kvstore *da2 = args[1];
26 estore *subexpires = args[2];
27 estoreRelease(subexpires);
28 size_t numkeys = kvstoreSize(da1);
29 kvstoreRelease(da1);
30 kvstoreRelease(da2);
31 atomicDecr(lazyfree_objects,numkeys);
32 atomicIncr(lazyfreed_objects,numkeys);
33
34#if defined(USE_JEMALLOC)
35 /* Only clear the current thread cache.
36 * Ignore the return call since this will fail if the tcache is disabled. */
37 je_mallctl("thread.tcache.flush", NULL, NULL, NULL, 0);
38
39 jemalloc_purge();
40#endif
41}
42
43/* Release the key tracking table. */
44void lazyFreeTrackingTable(void *args[]) {
45 rax *rt = args[0];
46 size_t len = rt->numele;
47 freeTrackingRadixTree(rt);
48 atomicDecr(lazyfree_objects,len);
49 atomicIncr(lazyfreed_objects,len);
50}
51
52/* Release the error stats rax tree. */
53void lazyFreeErrors(void *args[]) {
54 rax *errors = args[0];
55 size_t len = errors->numele;
56 raxFreeWithCallback(errors, zfree);
57 atomicDecr(lazyfree_objects,len);
58 atomicIncr(lazyfreed_objects,len);
59}
60
61/* Release the lua_scripts dict. */
62void lazyFreeLuaScripts(void *args[]) {
63 dict *lua_scripts = args[0];
64 list *lua_scripts_lru_list = args[1];
65 lua_State *lua = args[2];
66 long long len = dictSize(lua_scripts);
67 freeLuaScriptsSync(lua_scripts, lua_scripts_lru_list, lua);
68 atomicDecr(lazyfree_objects,len);
69 atomicIncr(lazyfreed_objects,len);
70}
71
72/* Release the functions ctx. */
73void lazyFreeFunctionsCtx(void *args[]) {
74 functionsLibCtx *functions_lib_ctx = args[0];
75 dict *engs = args[1];
76 size_t len = functionsLibCtxFunctionsLen(functions_lib_ctx);
77 functionsLibCtxFree(functions_lib_ctx);
78 len += dictSize(engs);
79 dictRelease(engs);
80 atomicDecr(lazyfree_objects,len);
81 atomicIncr(lazyfreed_objects,len);
82}
83
84/* Release replication backlog referencing memory. */
85void lazyFreeReplicationBacklogRefMem(void *args[]) {
86 list *blocks = args[0];
87 rax *index = args[1];
88 long long len = listLength(blocks);
89 len += raxSize(index);
90 listRelease(blocks);
91 raxFree(index);
92 atomicDecr(lazyfree_objects,len);
93 atomicIncr(lazyfreed_objects,len);
94}
95
96/* Return the number of currently pending objects to free. */
97size_t lazyfreeGetPendingObjectsCount(void) {
98 size_t aux;
99 atomicGet(lazyfree_objects,aux);
100 return aux;
101}
102
103/* Return the number of objects that have been freed. */
104size_t lazyfreeGetFreedObjectsCount(void) {
105 size_t aux;
106 atomicGet(lazyfreed_objects,aux);
107 return aux;
108}
109
/* Reset the freed-objects statistic (presumably invoked on a stats reset —
 * confirm with callers). Note that lazyfree_objects is deliberately left
 * untouched: it tracks objects still pending release and must stay accurate. */
void lazyfreeResetStats(void) {
    atomicSet(lazyfreed_objects,0);
}
113
114/* Return the amount of work needed in order to free an object.
115 * The return value is not always the actual number of allocations the
116 * object is composed of, but a number proportional to it.
117 *
118 * For strings the function always returns 1.
119 *
120 * For aggregated objects represented by hash tables or other data structures
121 * the function just returns the number of elements the object is composed of.
122 *
123 * Objects composed of single allocations are always reported as having a
124 * single item even if they are actually logical composed of multiple
125 * elements.
126 *
127 * For lists the function returns the number of elements in the quicklist
128 * representing the list. */
129size_t lazyfreeGetFreeEffort(robj *key, robj *obj, int dbid) {
130 if (obj->type == OBJ_LIST && obj->encoding == OBJ_ENCODING_QUICKLIST) {
131 quicklist *ql = obj->ptr;
132 return ql->len;
133 } else if (obj->type == OBJ_SET && obj->encoding == OBJ_ENCODING_HT) {
134 dict *ht = obj->ptr;
135 return dictSize(ht);
136 } else if (obj->type == OBJ_ZSET && obj->encoding == OBJ_ENCODING_SKIPLIST){
137 zset *zs = obj->ptr;
138 return zs->zsl->length;
139 } else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HT) {
140 dict *ht = obj->ptr;
141 return dictSize(ht);
142 } else if (obj->type == OBJ_STREAM) {
143 size_t effort = 0;
144 stream *s = obj->ptr;
145
146 /* Make a best effort estimate to maintain constant runtime. Every macro
147 * node in the Stream is one allocation. */
148 effort += s->rax->numnodes;
149
150 /* Every consumer group is an allocation and so are the entries in its
151 * PEL. We use size of the first group's PEL as an estimate for all
152 * others. */
153 if (s->cgroups && raxSize(s->cgroups)) {
154 raxIterator ri;
155 streamCG *cg;
156 raxStart(&ri,s->cgroups);
157 raxSeek(&ri,"^",NULL,0);
158 /* There must be at least one group so the following should always
159 * work. */
160 serverAssert(raxNext(&ri));
161 cg = ri.data;
162 effort += raxSize(s->cgroups)*(1+raxSize(cg->pel));
163 raxStop(&ri);
164 }
165 return effort;
166 } else if (obj->type == OBJ_MODULE) {
167 size_t effort = moduleGetFreeEffort(key, obj, dbid);
168 /* If the module's free_effort returns 0, we will use asynchronous free
169 * memory by default. */
170 return effort == 0 ? ULONG_MAX : effort;
171 } else {
172 return 1; /* Everything else is a single allocation. */
173 }
174}
175
176/* If there are enough allocations to free the value object asynchronously, it
177 * may be put into a lazy free list instead of being freed synchronously. The
178 * lazy free list will be reclaimed in a different bio.c thread. If the value is
179 * composed of a few allocations, to free in a lazy way is actually just
180 * slower... So under a certain limit we just free the object synchronously. */
181#define LAZYFREE_THRESHOLD 64
182
183/* Free an object, if the object is huge enough, free it in async way. */
184void freeObjAsync(robj *key, robj *obj, int dbid) {
185 size_t free_effort = lazyfreeGetFreeEffort(key,obj,dbid);
186 /* Note that if the object is shared, to reclaim it now it is not
187 * possible. This rarely happens, however sometimes the implementation
188 * of parts of the Redis core may call incrRefCount() to protect
189 * objects, and then call dbDelete(). */
190 if (free_effort > LAZYFREE_THRESHOLD && obj->refcount == 1) {
191 atomicIncr(lazyfree_objects,1);
192 bioCreateLazyFreeJob(lazyfreeFreeObject,1,obj);
193 } else {
194 decrRefCount(obj);
195 }
196}
197
198/* Duplicate client reply objects that reference database objects to avoid race
199 * conditions with bio threads during async flushdb.
200 *
201 * Since incrRefCount/decrRefCount are not thread-safe, and bio thread may
202 * free database objects while main thread/IO threads send client replies, we need to
203 * create independent copies of the string objects to avoid concurrent access. */
204static void protectClientReplyObjects(void) {
205 /* If there are no clients with pending ref replies, exit ASAP. */
206 if (!listLength(server.clients_with_pending_ref_reply))
207 return;
208
209 /* Pause all IO threads to safely duplicate string objects. */
210 int allpaused = 0;
211 if (server.io_threads_num > 1) {
212 serverAssert(pthread_equal(server.main_thread_id, pthread_self()));
213 allpaused = 1;
214 pauseAllIOThreads();
215 }
216
217 listNode *ln;
218 listIter li;
219 listRewind(server.clients_with_pending_ref_reply, &li);
220 while ((ln = listNext(&li)) != NULL) {
221 client *c = listNodeValue(ln);
222
223 /* Process c->buf if it's encoded */
224 if (c->buf_encoded && c->bufpos > 0) {
225 char *ptr = c->buf;
226 while (ptr < c->buf + c->bufpos) {
227 payloadHeader *header = (payloadHeader *)ptr;
228 ptr += sizeof(payloadHeader);
229
230 if (header->payload_type == BULK_STR_REF) {
231 bulkStrRef *str_ref = (bulkStrRef *)ptr;
232 if (str_ref->obj != NULL) {
233 /* Duplicate the string object */
234 robj *new_obj = dupStringObject(str_ref->obj);
235 decrRefCount(str_ref->obj);
236 str_ref->obj = new_obj;
237 }
238 }
239 ptr += header->payload_len;
240 }
241 }
242
243 /* Process reply list */
244 if (c->reply && listLength(c->reply)) {
245 listIter reply_li;
246 listNode *reply_ln;
247 listRewind(c->reply, &reply_li);
248 while ((reply_ln = listNext(&reply_li))) {
249 clientReplyBlock *block = listNodeValue(reply_ln);
250 if (block && block->buf_encoded) {
251 char *ptr = block->buf;
252 while (ptr < block->buf + block->used) {
253 payloadHeader *header = (payloadHeader *)ptr;
254 ptr += sizeof(payloadHeader);
255
256 if (header->payload_type == BULK_STR_REF) {
257 bulkStrRef *str_ref = (bulkStrRef *)ptr;
258 if (str_ref->obj != NULL) {
259 /* Duplicate the string object */
260 robj *new_obj = dupStringObject(str_ref->obj);
261 decrRefCount(str_ref->obj);
262 str_ref->obj = new_obj;
263 }
264 }
265 ptr += header->payload_len;
266 }
267 }
268 }
269 }
270
271 /* Process references in IO deferred objects and remove client from
272 * pending ref list since all refs have been duplicated above. */
273 freeClientIODeferredObjects(c, 0);
274 tryUnlinkClientFromPendingRefReply(c, 1);
275 }
276
277 if (allpaused) resumeAllIOThreads();
278}
279
280/* Empty a Redis DB asynchronously. What the function does actually is to
281 * create a new empty set of hash tables and scheduling the old ones for
282 * lazy freeing. */
283void emptyDbAsync(redisDb *db) {
284 int slot_count_bits = 0;
285 int flags = KVSTORE_ALLOCATE_DICTS_ON_DEMAND;
286 if (server.cluster_enabled) {
287 slot_count_bits = CLUSTER_SLOT_MASK_BITS;
288 flags |= KVSTORE_FREE_EMPTY_DICTS;
289 }
290 kvstore *oldkeys = db->keys, *oldexpires = db->expires;
291 estore *oldsubexpires = db->subexpires;
292 db->keys = kvstoreCreate(&kvstoreExType, &dbDictType, slot_count_bits, flags);
293 db->expires = kvstoreCreate(&kvstoreBaseType, &dbExpiresDictType, slot_count_bits, flags);
294 db->subexpires = estoreCreate(&subexpiresBucketsType, slot_count_bits);
295 protectClientReplyObjects(); /* Protect client reply objects before async free. */
296 emptyDbDataAsync(oldkeys, oldexpires, oldsubexpires);
297}
298
/* Empty a Redis DB data asynchronously: account for the pending keys and
 * hand the old stores to a bio lazy-free job.
 * NOTE(review): the third parameter is declared as 'ebuckets hexpires', but
 * emptyDbAsync() passes an estore* and lazyfreeFreeDatabase() reads args[2]
 * as estore* — confirm the declared type against the header. */
void emptyDbDataAsync(kvstore *keys, kvstore *expires, ebuckets hexpires) {
    atomicIncr(lazyfree_objects, kvstoreSize(keys));
    bioCreateLazyFreeJob(lazyfreeFreeDatabase, 3, keys, expires, hexpires);
}
304
305/* Free the key tracking table.
306 * If the table is huge enough, free it in async way. */
307void freeTrackingRadixTreeAsync(rax *tracking) {
308 /* Because this rax has only keys and no values so we use numnodes. */
309 if (tracking->numnodes > LAZYFREE_THRESHOLD) {
310 atomicIncr(lazyfree_objects,tracking->numele);
311 bioCreateLazyFreeJob(lazyFreeTrackingTable,1,tracking);
312 } else {
313 freeTrackingRadixTree(tracking);
314 }
315}
316
317/* Free the error stats rax tree.
318 * If the rax tree is huge enough, free it in async way. */
319void freeErrorsRadixTreeAsync(rax *errors) {
320 /* Because this rax has only keys and no values so we use numnodes. */
321 if (errors->numnodes > LAZYFREE_THRESHOLD) {
322 atomicIncr(lazyfree_objects,errors->numele);
323 bioCreateLazyFreeJob(lazyFreeErrors,1,errors);
324 } else {
325 raxFreeWithCallback(errors, zfree);
326 }
327}
328
329/* Free lua_scripts dict and lru list, if the dict is huge enough, free them in async way.
330 * Close lua interpreter, if there are a lot of lua scripts, close it in async way. */
331void freeLuaScriptsAsync(dict *lua_scripts, list *lua_scripts_lru_list, lua_State *lua) {
332 if (dictSize(lua_scripts) > LAZYFREE_THRESHOLD) {
333 atomicIncr(lazyfree_objects,dictSize(lua_scripts));
334 bioCreateLazyFreeJob(lazyFreeLuaScripts,3,lua_scripts,lua_scripts_lru_list,lua);
335 } else {
336 freeLuaScriptsSync(lua_scripts, lua_scripts_lru_list, lua);
337 }
338}
339
340/* Free functions ctx, if the functions ctx contains enough functions, free it in async way. */
341void freeFunctionsAsync(functionsLibCtx *functions_lib_ctx, dict *engs) {
342 if (functionsLibCtxFunctionsLen(functions_lib_ctx) > LAZYFREE_THRESHOLD) {
343 atomicIncr(lazyfree_objects,functionsLibCtxFunctionsLen(functions_lib_ctx)+dictSize(engs));
344 bioCreateLazyFreeJob(lazyFreeFunctionsCtx,2,functions_lib_ctx,engs);
345 } else {
346 functionsLibCtxFree(functions_lib_ctx);
347 dictRelease(engs);
348 }
349}
350
351/* Free replication backlog referencing buffer blocks and rax index. */
352void freeReplicationBacklogRefMemAsync(list *blocks, rax *index) {
353 if (listLength(blocks) > LAZYFREE_THRESHOLD ||
354 raxSize(index) > LAZYFREE_THRESHOLD)
355 {
356 atomicIncr(lazyfree_objects,listLength(blocks)+raxSize(index));
357 bioCreateLazyFreeJob(lazyFreeReplicationBacklogRefMem,2,blocks,index);
358 } else {
359 listRelease(blocks);
360 raxFree(index);
361 }
362}