1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
|
#include "server.h"
#include "bio.h"
#include "atomicvar.h"
#include "functions.h"
#include "cluster.h"
#include "ebuckets.h"
/* Number of objects currently handed to lazy free jobs and not yet released.
 * Incremented when a job is scheduled, decremented by the job callback. */
static redisAtomic size_t lazyfree_objects = 0;
/* Number of objects released by lazy free job callbacks so far. Reset to
 * zero by lazyfreeResetStats(). */
static redisAtomic size_t lazyfreed_objects = 0;
/* Lazy free job callback for a single object.
 *
 * args[0] is the robj to release. One reference is dropped with
 * decrRefCount(), then the object is moved from the pending counter to the
 * freed counter. */
void lazyfreeFreeObject(void *args[]) {
    robj *obj = args[0];

    decrRefCount(obj);
    atomicDecr(lazyfree_objects, 1);
    atomicIncr(lazyfreed_objects, 1);
}
/* Lazy free job callback that releases the storage of a detached database.
 *
 * args[0] and args[1] are the two kvstores and args[2] is the sub-expires
 * estore, in the order they are passed by emptyDbDataAsync(). The pending and
 * freed counters are adjusted by the number of entries args[0] held right
 * before it was released. */
void lazyfreeFreeDatabase(void *args[]) {
    kvstore *keys = args[0];
    kvstore *expires = args[1];
    estore *subexpires = args[2];

    estoreRelease(subexpires);

    /* Sample the size before the store is released. */
    size_t released = kvstoreSize(keys);
    kvstoreRelease(keys);
    kvstoreRelease(expires);

    atomicDecr(lazyfree_objects, released);
    atomicIncr(lazyfreed_objects, released);

#if defined(USE_JEMALLOC)
    /* Flush only the calling thread's cache. The return value is ignored on
     * purpose: the call fails when the tcache is disabled. */
    je_mallctl("thread.tcache.flush", NULL, NULL, NULL, 0);
    jemalloc_purge();
#endif
}
/* Lazy free job callback: release the key tracking radix tree in args[0] and
 * account for its elements in the pending/freed counters. */
void lazyFreeTrackingTable(void *args[]) {
    rax *table = args[0];
    size_t count = table->numele; /* Read before the tree is destroyed. */

    freeTrackingRadixTree(table);
    atomicDecr(lazyfree_objects, count);
    atomicIncr(lazyfreed_objects, count);
}
/* Lazy free job callback: release the error stats radix tree in args[0],
 * freeing each stored value with zfree, and account for its elements in the
 * pending/freed counters. */
void lazyFreeErrors(void *args[]) {
    rax *tree = args[0];
    size_t count = tree->numele; /* Read before the tree is destroyed. */

    raxFreeWithCallback(tree, zfree);
    atomicDecr(lazyfree_objects, count);
    atomicIncr(lazyfreed_objects, count);
}
/* Lazy free job callback for the Lua scripting state.
 *
 * args[0] is the scripts dict, args[1] the scripts LRU list and args[2] the
 * Lua interpreter; all three are handed to freeLuaScriptsSync(). The counters
 * are adjusted by the number of entries the dict held beforehand. */
void lazyFreeLuaScripts(void *args[]) {
    dict *scripts = args[0];
    list *lru = args[1];
    lua_State *interp = args[2];
    long long count = dictSize(scripts);

    freeLuaScriptsSync(scripts, lru, interp);
    atomicDecr(lazyfree_objects, count);
    atomicIncr(lazyfreed_objects, count);
}
/* Lazy free job callback for the functions subsystem.
 *
 * args[0] is the functions library context and args[1] the engines dict.
 * The counters are adjusted by the number of functions in the context plus
 * the number of entries in the engines dict. */
void lazyFreeFunctionsCtx(void *args[]) {
    functionsLibCtx *ctx = args[0];
    dict *engines = args[1];

    /* Each size is sampled right before its container is released. */
    size_t count = functionsLibCtxFunctionsLen(ctx);
    functionsLibCtxFree(ctx);
    count += dictSize(engines);
    dictRelease(engines);

    atomicDecr(lazyfree_objects, count);
    atomicIncr(lazyfreed_objects, count);
}
/* Lazy free job callback for the memory referenced by the replication
 * backlog.
 *
 * args[0] is the list of buffer blocks and args[1] the rax index. The
 * counters are adjusted by the list length plus the rax size, both sampled
 * before anything is released. */
void lazyFreeReplicationBacklogRefMem(void *args[]) {
    list *buf_blocks = args[0];
    rax *blocks_index = args[1];
    long long count = listLength(buf_blocks);

    count += raxSize(blocks_index);
    listRelease(buf_blocks);
    raxFree(blocks_index);

    atomicDecr(lazyfree_objects, count);
    atomicIncr(lazyfreed_objects, count);
}
/* Return a snapshot of the number of objects scheduled for lazy freeing that
 * have not been released yet. */
size_t lazyfreeGetPendingObjectsCount(void) {
    size_t pending;

    atomicGet(lazyfree_objects, pending);
    return pending;
}
/* Return a snapshot of the number of objects released by lazy free jobs
 * since the counter was last reset. */
size_t lazyfreeGetFreedObjectsCount(void) {
    size_t freed;

    atomicGet(lazyfreed_objects, freed);
    return freed;
}
/* Reset the lazy free statistics: the freed objects counter goes back to
 * zero. The pending objects counter is left untouched. */
void lazyfreeResetStats(void) {
    atomicSet(lazyfreed_objects, 0);
}
/* Estimate how much work is needed to free 'obj'.
 *
 * The result is not necessarily the exact number of allocations the object
 * is made of, only a number proportional to it:
 *
 * - Quicklist encoded lists: the length of the quicklist.
 * - Hash table encoded sets and hashes: the number of dict entries.
 * - Skiplist encoded sorted sets: the skiplist length.
 * - Streams: a constant time estimate, see the comments in the body.
 * - Module types: whatever the module reports, with 0 mapped to ULONG_MAX.
 * - Anything else (strings and the compact single-allocation encodings): 1.
 *
 * 'key' and 'dbid' are only forwarded to moduleGetFreeEffort(). */
size_t lazyfreeGetFreeEffort(robj *key, robj *obj, int dbid) {
    switch (obj->type) {
    case OBJ_LIST:
        if (obj->encoding == OBJ_ENCODING_QUICKLIST) {
            quicklist *list = obj->ptr;
            return list->len;
        }
        break;
    case OBJ_SET:
    case OBJ_HASH:
        if (obj->encoding == OBJ_ENCODING_HT) {
            dict *table = obj->ptr;
            return dictSize(table);
        }
        break;
    case OBJ_ZSET:
        if (obj->encoding == OBJ_ENCODING_SKIPLIST) {
            zset *sorted = obj->ptr;
            return sorted->zsl->length;
        }
        break;
    case OBJ_STREAM: {
        stream *s = obj->ptr;

        /* Best effort estimate that keeps the runtime constant: every macro
         * node of the stream counts as one allocation. */
        size_t effort = s->rax->numnodes;

        /* Each consumer group is one allocation, and so is each entry of its
         * PEL. The PEL size of the first group stands in for all groups. */
        if (s->cgroups && raxSize(s->cgroups)) {
            raxIterator ri;

            raxStart(&ri, s->cgroups);
            raxSeek(&ri, "^", NULL, 0);
            /* The tree is not empty, so stepping onto the first group is
             * expected to succeed. */
            serverAssert(raxNext(&ri));
            streamCG *first = ri.data;
            effort += raxSize(s->cgroups) * (1 + raxSize(first->pel));
            raxStop(&ri);
        }
        return effort;
    }
    case OBJ_MODULE: {
        size_t effort = moduleGetFreeEffort(key, obj, dbid);

        /* A module reporting 0 is treated as maximally expensive, so that
         * asynchronous freeing is the default for it. */
        if (effort == 0) return ULONG_MAX;
        return effort;
    }
    default:
        break;
    }

    /* Everything else is a single allocation. */
    return 1;
}
/* Size limit above which the code in this file schedules a lazy free job
 * instead of releasing a value synchronously. A value made of enough
 * allocations is handed to a bio.c thread for reclaiming; for a value made
 * of only a few allocations, lazy freeing is actually just slower, so it is
 * freed inline. The comparisons in this file are strict ('>'), so a size of
 * exactly 64 is still freed synchronously. */
#define LAZYFREE_THRESHOLD 64
/* Release 'obj', using a lazy free job when it is expensive enough.
 *
 * The free effort is estimated with lazyfreeGetFreeEffort(); 'key' and 'dbid'
 * are only used for that estimate. The object goes to the lazy free thread
 * when the effort exceeds LAZYFREE_THRESHOLD and the caller holds the only
 * reference. Otherwise a reference is dropped synchronously. */
void freeObjAsync(robj *key, robj *obj, int dbid) {
    size_t effort = lazyfreeGetFreeEffort(key, obj, dbid);

    /* A shared object cannot be reclaimed right now. This is rare, but parts
     * of the Redis core sometimes call incrRefCount() to protect an object
     * and then call dbDelete(). */
    if (effort <= LAZYFREE_THRESHOLD || obj->refcount != 1) {
        decrRefCount(obj);
        return;
    }

    atomicIncr(lazyfree_objects, 1);
    bioCreateLazyFreeJob(lazyfreeFreeObject, 1, obj);
}
/* Walk an encoded reply buffer and give it private copies of the string
 * objects it references.
 *
 * 'buf' holds 'len' bytes laid out as consecutive records: a payloadHeader
 * followed by 'payload_len' bytes of payload. For a BULK_STR_REF payload the
 * bulkStrRef at the start of the payload is inspected. When it points to an
 * object, the object is duplicated, the reference held on the original is
 * dropped, and the copy is stored in its place.
 *
 * NOTE(review): only the first bulkStrRef of each payload is examined, exactly
 * as the inline loops this helper replaces did. Confirm that a BULK_STR_REF
 * payload never packs more than one reference. */
static void dupBulkStrRefsInBuffer(char *buf, size_t len) {
    char *end = buf + len;
    char *ptr = buf;

    while (ptr < end) {
        payloadHeader *header = (payloadHeader *)ptr;
        ptr += sizeof(payloadHeader);

        if (header->payload_type == BULK_STR_REF) {
            bulkStrRef *str_ref = (bulkStrRef *)ptr;
            if (str_ref->obj != NULL) {
                /* Duplicate the string object. */
                robj *copy = dupStringObject(str_ref->obj);
                decrRefCount(str_ref->obj);
                str_ref->obj = copy;
            }
        }

        ptr += header->payload_len;
    }
}

/* Duplicate client reply objects that reference database objects to avoid race
 * conditions with bio threads during async flushdb.
 *
 * Since incrRefCount/decrRefCount are not thread-safe, and a bio thread may
 * free database objects while the main thread or IO threads send client
 * replies, independent copies of the string objects are created so the same
 * object is never accessed concurrently.
 *
 * Every client in server.clients_with_pending_ref_reply is processed: its
 * static buffer and all of its reply list blocks are scanned when they are
 * flagged as encoded. When more than one IO thread is configured, all IO
 * threads are paused for the duration of the scan. */
static void protectClientReplyObjects(void) {
    /* If there are no clients with pending ref replies, exit ASAP. */
    if (!listLength(server.clients_with_pending_ref_reply)) return;

    /* Pause all IO threads to safely duplicate string objects. */
    int allpaused = 0;
    if (server.io_threads_num > 1) {
        serverAssert(pthread_equal(server.main_thread_id, pthread_self()));
        allpaused = 1;
        pauseAllIOThreads();
    }

    listNode *ln;
    listIter li;
    listRewind(server.clients_with_pending_ref_reply, &li);
    while ((ln = listNext(&li)) != NULL) {
        client *c = listNodeValue(ln);

        /* Static reply buffer, only when it is encoded and not empty. */
        if (c->buf_encoded && c->bufpos > 0)
            dupBulkStrRefsInBuffer(c->buf, c->bufpos);

        /* Reply list: every encoded block. */
        if (c->reply && listLength(c->reply)) {
            listIter reply_li;
            listNode *reply_ln;

            listRewind(c->reply, &reply_li);
            while ((reply_ln = listNext(&reply_li))) {
                clientReplyBlock *block = listNodeValue(reply_ln);
                if (block && block->buf_encoded)
                    dupBulkStrRefsInBuffer(block->buf, block->used);
            }
        }

        /* Process references in IO deferred objects and remove the client
         * from the pending ref list, since all refs have been duplicated
         * above. */
        freeClientIODeferredObjects(c, 0);
        tryUnlinkClientFromPendingRefReply(c, 1);
    }

    if (allpaused) resumeAllIOThreads();
}
/* Empty a Redis DB asynchronously.
 *
 * The keys and expires kvstores and the sub-expires estore of 'db' are
 * replaced with freshly created empty ones, and the old ones are scheduled
 * for lazy freeing. In cluster mode the new stores are created with
 * CLUSTER_SLOT_MASK_BITS slot bits and the kvstores also get
 * KVSTORE_FREE_EMPTY_DICTS. */
void emptyDbAsync(redisDb *db) {
    /* Detach the current storage first. */
    kvstore *oldkeys = db->keys;
    kvstore *oldexpires = db->expires;
    estore *oldsubexpires = db->subexpires;

    int slot_count_bits = server.cluster_enabled ? CLUSTER_SLOT_MASK_BITS : 0;
    int flags = KVSTORE_ALLOCATE_DICTS_ON_DEMAND;
    if (server.cluster_enabled) flags |= KVSTORE_FREE_EMPTY_DICTS;

    db->keys = kvstoreCreate(&kvstoreExType, &dbDictType, slot_count_bits, flags);
    db->expires = kvstoreCreate(&kvstoreBaseType, &dbExpiresDictType, slot_count_bits, flags);
    db->subexpires = estoreCreate(&subexpiresBucketsType, slot_count_bits);

    /* Client replies must stop referencing database objects before the old
     * storage is handed to the lazy free thread. */
    protectClientReplyObjects();
    emptyDbDataAsync(oldkeys, oldexpires, oldsubexpires);
}
/* Schedule the storage of a database for lazy freeing.
 *
 * The pending objects counter is raised by the size of 'keys', then a job
 * running lazyfreeFreeDatabase() is created with 'keys', 'expires' and
 * 'hexpires' as its three arguments.
 *
 * NOTE(review): lazyfreeFreeDatabase() reads the third argument back as an
 * estore pointer, so the ebuckets parameter type looks like a leftover.
 * Confirm against the prototype in the header before changing it. */
void emptyDbDataAsync(kvstore *keys, kvstore *expires, ebuckets hexpires) {
    size_t pending = kvstoreSize(keys);

    atomicIncr(lazyfree_objects, pending);
    bioCreateLazyFreeJob(lazyfreeFreeDatabase, 3, keys, expires, hexpires);
}
/* Free the key tracking table, in the background when it is large enough.
 *
 * The decision is based on the number of rax nodes, since this tree holds
 * keys only and no values. The pending objects counter, however, is raised
 * by the number of elements. */
void freeTrackingRadixTreeAsync(rax *tracking) {
    if (tracking->numnodes <= LAZYFREE_THRESHOLD) {
        freeTrackingRadixTree(tracking);
        return;
    }

    atomicIncr(lazyfree_objects, tracking->numele);
    bioCreateLazyFreeJob(lazyFreeTrackingTable, 1, tracking);
}
/* Free the error stats rax tree, in the background when it is large enough.
 *
 * The decision is based on the number of rax nodes, while the pending
 * objects counter is raised by the number of elements.
 *
 * NOTE(review): values are released with zfree on both paths, so this tree
 * does appear to carry a value per key. Confirm that the node count is still
 * the intended size proxy here. */
void freeErrorsRadixTreeAsync(rax *errors) {
    if (errors->numnodes <= LAZYFREE_THRESHOLD) {
        raxFreeWithCallback(errors, zfree);
        return;
    }

    atomicIncr(lazyfree_objects, errors->numele);
    bioCreateLazyFreeJob(lazyFreeErrors, 1, errors);
}
/* Free the Lua scripts dict, its LRU list and the Lua interpreter.
 *
 * When the dict holds more than LAZYFREE_THRESHOLD scripts, all three are
 * handed to a lazy free job. Otherwise they are released synchronously with
 * freeLuaScriptsSync(). */
void freeLuaScriptsAsync(dict *lua_scripts, list *lua_scripts_lru_list, lua_State *lua) {
    if (dictSize(lua_scripts) <= LAZYFREE_THRESHOLD) {
        freeLuaScriptsSync(lua_scripts, lua_scripts_lru_list, lua);
        return;
    }

    atomicIncr(lazyfree_objects, dictSize(lua_scripts));
    bioCreateLazyFreeJob(lazyFreeLuaScripts, 3, lua_scripts, lua_scripts_lru_list, lua);
}
/* Free the functions library context and the engines dict.
 *
 * When the context holds more than LAZYFREE_THRESHOLD functions, both are
 * handed to a lazy free job and the pending objects counter is raised by the
 * number of functions plus the number of engines. Otherwise both are
 * released synchronously. */
void freeFunctionsAsync(functionsLibCtx *functions_lib_ctx, dict *engs) {
    size_t functions = functionsLibCtxFunctionsLen(functions_lib_ctx);

    if (functions <= LAZYFREE_THRESHOLD) {
        functionsLibCtxFree(functions_lib_ctx);
        dictRelease(engs);
        return;
    }

    atomicIncr(lazyfree_objects, functions + dictSize(engs));
    bioCreateLazyFreeJob(lazyFreeFunctionsCtx, 2, functions_lib_ctx, engs);
}
/* Free the buffer blocks list and the rax index referenced by the
 * replication backlog.
 *
 * When either holds more than LAZYFREE_THRESHOLD items, both are handed to a
 * lazy free job and the pending objects counter is raised by their combined
 * size. Otherwise both are released synchronously. */
void freeReplicationBacklogRefMemAsync(list *blocks, rax *index) {
    if (listLength(blocks) <= LAZYFREE_THRESHOLD &&
        raxSize(index) <= LAZYFREE_THRESHOLD)
    {
        listRelease(blocks);
        raxFree(index);
        return;
    }

    atomicIncr(lazyfree_objects, listLength(blocks) + raxSize(index));
    bioCreateLazyFreeJob(lazyFreeReplicationBacklogRefMem, 2, blocks, index);
}
|