summaryrefslogtreecommitdiff
path: root/examples/redis-unstable/src/lazyfree.c
diff options
context:
space:
mode:
Diffstat (limited to 'examples/redis-unstable/src/lazyfree.c')
-rw-r--r--examples/redis-unstable/src/lazyfree.c362
1 files changed, 0 insertions, 362 deletions
diff --git a/examples/redis-unstable/src/lazyfree.c b/examples/redis-unstable/src/lazyfree.c
deleted file mode 100644
index 1960be1..0000000
--- a/examples/redis-unstable/src/lazyfree.c
+++ /dev/null
@@ -1,362 +0,0 @@
-#include "server.h"
-#include "bio.h"
-#include "atomicvar.h"
-#include "functions.h"
-#include "cluster.h"
-#include "ebuckets.h"
-
-static redisAtomic size_t lazyfree_objects = 0;
-static redisAtomic size_t lazyfreed_objects = 0;
-
-/* Release objects from the lazyfree thread. It's just decrRefCount()
- * updating the count of objects to release. */
-void lazyfreeFreeObject(void *args[]) {
- robj *o = (robj *) args[0];
- decrRefCount(o);
- atomicDecr(lazyfree_objects,1);
- atomicIncr(lazyfreed_objects,1);
-}
-
-/* Release a database from the lazyfree thread. The 'db' pointer is the
- * database which was substituted with a fresh one in the main thread
- * when the database was logically deleted. */
-void lazyfreeFreeDatabase(void *args[]) {
- kvstore *da1 = args[0];
- kvstore *da2 = args[1];
- estore *subexpires = args[2];
- estoreRelease(subexpires);
- size_t numkeys = kvstoreSize(da1);
- kvstoreRelease(da1);
- kvstoreRelease(da2);
- atomicDecr(lazyfree_objects,numkeys);
- atomicIncr(lazyfreed_objects,numkeys);
-
-#if defined(USE_JEMALLOC)
- /* Only clear the current thread cache.
- * Ignore the return call since this will fail if the tcache is disabled. */
- je_mallctl("thread.tcache.flush", NULL, NULL, NULL, 0);
-
- jemalloc_purge();
-#endif
-}
-
-/* Release the key tracking table. */
-void lazyFreeTrackingTable(void *args[]) {
- rax *rt = args[0];
- size_t len = rt->numele;
- freeTrackingRadixTree(rt);
- atomicDecr(lazyfree_objects,len);
- atomicIncr(lazyfreed_objects,len);
-}
-
-/* Release the error stats rax tree. */
-void lazyFreeErrors(void *args[]) {
- rax *errors = args[0];
- size_t len = errors->numele;
- raxFreeWithCallback(errors, zfree);
- atomicDecr(lazyfree_objects,len);
- atomicIncr(lazyfreed_objects,len);
-}
-
-/* Release the lua_scripts dict. */
-void lazyFreeLuaScripts(void *args[]) {
- dict *lua_scripts = args[0];
- list *lua_scripts_lru_list = args[1];
- lua_State *lua = args[2];
- long long len = dictSize(lua_scripts);
- freeLuaScriptsSync(lua_scripts, lua_scripts_lru_list, lua);
- atomicDecr(lazyfree_objects,len);
- atomicIncr(lazyfreed_objects,len);
-}
-
-/* Release the functions ctx. */
-void lazyFreeFunctionsCtx(void *args[]) {
- functionsLibCtx *functions_lib_ctx = args[0];
- dict *engs = args[1];
- size_t len = functionsLibCtxFunctionsLen(functions_lib_ctx);
- functionsLibCtxFree(functions_lib_ctx);
- len += dictSize(engs);
- dictRelease(engs);
- atomicDecr(lazyfree_objects,len);
- atomicIncr(lazyfreed_objects,len);
-}
-
-/* Release replication backlog referencing memory. */
-void lazyFreeReplicationBacklogRefMem(void *args[]) {
- list *blocks = args[0];
- rax *index = args[1];
- long long len = listLength(blocks);
- len += raxSize(index);
- listRelease(blocks);
- raxFree(index);
- atomicDecr(lazyfree_objects,len);
- atomicIncr(lazyfreed_objects,len);
-}
-
-/* Return the number of currently pending objects to free. */
-size_t lazyfreeGetPendingObjectsCount(void) {
- size_t aux;
- atomicGet(lazyfree_objects,aux);
- return aux;
-}
-
-/* Return the number of objects that have been freed. */
-size_t lazyfreeGetFreedObjectsCount(void) {
- size_t aux;
- atomicGet(lazyfreed_objects,aux);
- return aux;
-}
-
-void lazyfreeResetStats(void) {
- atomicSet(lazyfreed_objects,0);
-}
-
-/* Return the amount of work needed in order to free an object.
- * The return value is not always the actual number of allocations the
- * object is composed of, but a number proportional to it.
- *
- * For strings the function always returns 1.
- *
- * For aggregated objects represented by hash tables or other data structures
- * the function just returns the number of elements the object is composed of.
- *
- * Objects composed of single allocations are always reported as having a
- * single item even if they are actually logical composed of multiple
- * elements.
- *
- * For lists the function returns the number of elements in the quicklist
- * representing the list. */
-size_t lazyfreeGetFreeEffort(robj *key, robj *obj, int dbid) {
- if (obj->type == OBJ_LIST && obj->encoding == OBJ_ENCODING_QUICKLIST) {
- quicklist *ql = obj->ptr;
- return ql->len;
- } else if (obj->type == OBJ_SET && obj->encoding == OBJ_ENCODING_HT) {
- dict *ht = obj->ptr;
- return dictSize(ht);
- } else if (obj->type == OBJ_ZSET && obj->encoding == OBJ_ENCODING_SKIPLIST){
- zset *zs = obj->ptr;
- return zs->zsl->length;
- } else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HT) {
- dict *ht = obj->ptr;
- return dictSize(ht);
- } else if (obj->type == OBJ_STREAM) {
- size_t effort = 0;
- stream *s = obj->ptr;
-
- /* Make a best effort estimate to maintain constant runtime. Every macro
- * node in the Stream is one allocation. */
- effort += s->rax->numnodes;
-
- /* Every consumer group is an allocation and so are the entries in its
- * PEL. We use size of the first group's PEL as an estimate for all
- * others. */
- if (s->cgroups && raxSize(s->cgroups)) {
- raxIterator ri;
- streamCG *cg;
- raxStart(&ri,s->cgroups);
- raxSeek(&ri,"^",NULL,0);
- /* There must be at least one group so the following should always
- * work. */
- serverAssert(raxNext(&ri));
- cg = ri.data;
- effort += raxSize(s->cgroups)*(1+raxSize(cg->pel));
- raxStop(&ri);
- }
- return effort;
- } else if (obj->type == OBJ_MODULE) {
- size_t effort = moduleGetFreeEffort(key, obj, dbid);
- /* If the module's free_effort returns 0, we will use asynchronous free
- * memory by default. */
- return effort == 0 ? ULONG_MAX : effort;
- } else {
- return 1; /* Everything else is a single allocation. */
- }
-}
-
-/* If there are enough allocations to free the value object asynchronously, it
- * may be put into a lazy free list instead of being freed synchronously. The
- * lazy free list will be reclaimed in a different bio.c thread. If the value is
- * composed of a few allocations, to free in a lazy way is actually just
- * slower... So under a certain limit we just free the object synchronously. */
-#define LAZYFREE_THRESHOLD 64
-
-/* Free an object, if the object is huge enough, free it in async way. */
-void freeObjAsync(robj *key, robj *obj, int dbid) {
- size_t free_effort = lazyfreeGetFreeEffort(key,obj,dbid);
- /* Note that if the object is shared, to reclaim it now it is not
- * possible. This rarely happens, however sometimes the implementation
- * of parts of the Redis core may call incrRefCount() to protect
- * objects, and then call dbDelete(). */
- if (free_effort > LAZYFREE_THRESHOLD && obj->refcount == 1) {
- atomicIncr(lazyfree_objects,1);
- bioCreateLazyFreeJob(lazyfreeFreeObject,1,obj);
- } else {
- decrRefCount(obj);
- }
-}
-
-/* Duplicate client reply objects that reference database objects to avoid race
- * conditions with bio threads during async flushdb.
- *
- * Since incrRefCount/decrRefCount are not thread-safe, and bio thread may
- * free database objects while main thread/IO threads send client replies, we need to
- * create independent copies of the string objects to avoid concurrent access. */
-static void protectClientReplyObjects(void) {
- /* If there are no clients with pending ref replies, exit ASAP. */
- if (!listLength(server.clients_with_pending_ref_reply))
- return;
-
- /* Pause all IO threads to safely duplicate string objects. */
- int allpaused = 0;
- if (server.io_threads_num > 1) {
- serverAssert(pthread_equal(server.main_thread_id, pthread_self()));
- allpaused = 1;
- pauseAllIOThreads();
- }
-
- listNode *ln;
- listIter li;
- listRewind(server.clients_with_pending_ref_reply, &li);
- while ((ln = listNext(&li)) != NULL) {
- client *c = listNodeValue(ln);
-
- /* Process c->buf if it's encoded */
- if (c->buf_encoded && c->bufpos > 0) {
- char *ptr = c->buf;
- while (ptr < c->buf + c->bufpos) {
- payloadHeader *header = (payloadHeader *)ptr;
- ptr += sizeof(payloadHeader);
-
- if (header->payload_type == BULK_STR_REF) {
- bulkStrRef *str_ref = (bulkStrRef *)ptr;
- if (str_ref->obj != NULL) {
- /* Duplicate the string object */
- robj *new_obj = dupStringObject(str_ref->obj);
- decrRefCount(str_ref->obj);
- str_ref->obj = new_obj;
- }
- }
- ptr += header->payload_len;
- }
- }
-
- /* Process reply list */
- if (c->reply && listLength(c->reply)) {
- listIter reply_li;
- listNode *reply_ln;
- listRewind(c->reply, &reply_li);
- while ((reply_ln = listNext(&reply_li))) {
- clientReplyBlock *block = listNodeValue(reply_ln);
- if (block && block->buf_encoded) {
- char *ptr = block->buf;
- while (ptr < block->buf + block->used) {
- payloadHeader *header = (payloadHeader *)ptr;
- ptr += sizeof(payloadHeader);
-
- if (header->payload_type == BULK_STR_REF) {
- bulkStrRef *str_ref = (bulkStrRef *)ptr;
- if (str_ref->obj != NULL) {
- /* Duplicate the string object */
- robj *new_obj = dupStringObject(str_ref->obj);
- decrRefCount(str_ref->obj);
- str_ref->obj = new_obj;
- }
- }
- ptr += header->payload_len;
- }
- }
- }
- }
-
- /* Process references in IO deferred objects and remove client from
- * pending ref list since all refs have been duplicated above. */
- freeClientIODeferredObjects(c, 0);
- tryUnlinkClientFromPendingRefReply(c, 1);
- }
-
- if (allpaused) resumeAllIOThreads();
-}
-
-/* Empty a Redis DB asynchronously. What the function does actually is to
- * create a new empty set of hash tables and scheduling the old ones for
- * lazy freeing. */
-void emptyDbAsync(redisDb *db) {
- int slot_count_bits = 0;
- int flags = KVSTORE_ALLOCATE_DICTS_ON_DEMAND;
- if (server.cluster_enabled) {
- slot_count_bits = CLUSTER_SLOT_MASK_BITS;
- flags |= KVSTORE_FREE_EMPTY_DICTS;
- }
- kvstore *oldkeys = db->keys, *oldexpires = db->expires;
- estore *oldsubexpires = db->subexpires;
- db->keys = kvstoreCreate(&kvstoreExType, &dbDictType, slot_count_bits, flags);
- db->expires = kvstoreCreate(&kvstoreBaseType, &dbExpiresDictType, slot_count_bits, flags);
- db->subexpires = estoreCreate(&subexpiresBucketsType, slot_count_bits);
- protectClientReplyObjects(); /* Protect client reply objects before async free. */
- emptyDbDataAsync(oldkeys, oldexpires, oldsubexpires);
-}
-
-/* Empty a Redis DB data asynchronously. */
-void emptyDbDataAsync(kvstore *keys, kvstore *expires, ebuckets hexpires) {
- atomicIncr(lazyfree_objects, kvstoreSize(keys));
- bioCreateLazyFreeJob(lazyfreeFreeDatabase, 3, keys, expires, hexpires);
-}
-
-/* Free the key tracking table.
- * If the table is huge enough, free it in async way. */
-void freeTrackingRadixTreeAsync(rax *tracking) {
- /* Because this rax has only keys and no values so we use numnodes. */
- if (tracking->numnodes > LAZYFREE_THRESHOLD) {
- atomicIncr(lazyfree_objects,tracking->numele);
- bioCreateLazyFreeJob(lazyFreeTrackingTable,1,tracking);
- } else {
- freeTrackingRadixTree(tracking);
- }
-}
-
-/* Free the error stats rax tree.
- * If the rax tree is huge enough, free it in async way. */
-void freeErrorsRadixTreeAsync(rax *errors) {
- /* Because this rax has only keys and no values so we use numnodes. */
- if (errors->numnodes > LAZYFREE_THRESHOLD) {
- atomicIncr(lazyfree_objects,errors->numele);
- bioCreateLazyFreeJob(lazyFreeErrors,1,errors);
- } else {
- raxFreeWithCallback(errors, zfree);
- }
-}
-
-/* Free lua_scripts dict and lru list, if the dict is huge enough, free them in async way.
- * Close lua interpreter, if there are a lot of lua scripts, close it in async way. */
-void freeLuaScriptsAsync(dict *lua_scripts, list *lua_scripts_lru_list, lua_State *lua) {
- if (dictSize(lua_scripts) > LAZYFREE_THRESHOLD) {
- atomicIncr(lazyfree_objects,dictSize(lua_scripts));
- bioCreateLazyFreeJob(lazyFreeLuaScripts,3,lua_scripts,lua_scripts_lru_list,lua);
- } else {
- freeLuaScriptsSync(lua_scripts, lua_scripts_lru_list, lua);
- }
-}
-
-/* Free functions ctx, if the functions ctx contains enough functions, free it in async way. */
-void freeFunctionsAsync(functionsLibCtx *functions_lib_ctx, dict *engs) {
- if (functionsLibCtxFunctionsLen(functions_lib_ctx) > LAZYFREE_THRESHOLD) {
- atomicIncr(lazyfree_objects,functionsLibCtxFunctionsLen(functions_lib_ctx)+dictSize(engs));
- bioCreateLazyFreeJob(lazyFreeFunctionsCtx,2,functions_lib_ctx,engs);
- } else {
- functionsLibCtxFree(functions_lib_ctx);
- dictRelease(engs);
- }
-}
-
-/* Free replication backlog referencing buffer blocks and rax index. */
-void freeReplicationBacklogRefMemAsync(list *blocks, rax *index) {
- if (listLength(blocks) > LAZYFREE_THRESHOLD ||
- raxSize(index) > LAZYFREE_THRESHOLD)
- {
- atomicIncr(lazyfree_objects,listLength(blocks)+raxSize(index));
- bioCreateLazyFreeJob(lazyFreeReplicationBacklogRefMem,2,blocks,index);
- } else {
- listRelease(blocks);
- raxFree(index);
- }
-}