aboutsummaryrefslogtreecommitdiff
path: root/examples/redis-unstable/src/db.c
diff options
context:
space:
mode:
Diffstat (limited to 'examples/redis-unstable/src/db.c')
-rw-r--r--examples/redis-unstable/src/db.c3793
1 files changed, 3793 insertions, 0 deletions
diff --git a/examples/redis-unstable/src/db.c b/examples/redis-unstable/src/db.c
new file mode 100644
index 0000000..2a64147
--- /dev/null
+++ b/examples/redis-unstable/src/db.c
@@ -0,0 +1,3793 @@
1/*
2 * Copyright (c) 2009-Present, Redis Ltd.
3 * All rights reserved.
4 *
5 * Copyright (c) 2024-present, Valkey contributors.
6 * All rights reserved.
7 *
8 * Licensed under your choice of (a) the Redis Source Available License 2.0
9 * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
10 * GNU Affero General Public License v3 (AGPLv3).
11 *
12 * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
13 */
14
15#include "server.h"
16#include "cluster.h"
17#include "atomicvar.h"
18#include "latency.h"
19#include "script.h"
20#include "functions.h"
21#include "cluster_asm.h"
22#include "redisassert.h"
23
24#include <signal.h>
25#include <ctype.h>
26#include "bio.h"
27#include "keymeta.h"
28
29/*-----------------------------------------------------------------------------
30 * C-level DB API
31 *----------------------------------------------------------------------------*/
32
33static_assert(MAX_KEYSIZES_TYPES == OBJ_TYPE_BASIC_MAX, "Must be equal");
34
35/* Flags for expireIfNeeded */
36#define EXPIRE_FORCE_DELETE_EXPIRED 1
37#define EXPIRE_AVOID_DELETE_EXPIRED 2
38#define EXPIRE_ALLOW_ACCESS_EXPIRED 4
39#define EXPIRE_ALLOW_ACCESS_TRIMMED 8
40
41/* Return values for expireIfNeeded */
42typedef enum {
43 KEY_VALID = 0, /* Could be volatile and not yet expired, non-volatile, or even non-existing key. */
44 KEY_EXPIRED, /* Logically expired but not yet deleted. */
45 KEY_DELETED, /* The key was deleted now. */
46 KEY_TRIMMED /* Logically trimmed but not yet deleted. */
47} keyStatus;
48
49static keyStatus expireIfNeeded(redisDb *db, robj *key, kvobj *kv, int flags);
50
51/* Update LFU when an object is accessed.
52 * Firstly, decrement the counter if the decrement time is reached.
53 * Then logarithmically increment the counter, and update the access time. */
54void updateLFU(robj *val) {
55 unsigned long counter = LFUDecrAndReturn(val);
56 counter = LFULogIncr(counter);
57 val->lru = (LFUGetTimeInMinutes()<<8) | counter;
58}
59
60/* Update LRM when an object is modified. */
61void updateLRM(robj *o) {
62 if (o->refcount == OBJ_SHARED_REFCOUNT)
63 return;
64 if (server.maxmemory_policy & MAXMEMORY_FLAG_LRM) {
65 o->lru = LRU_CLOCK();
66 }
67}
68
69/*
70 * Update histogram of keys-sizes
71 *
72 * It is used to track the distribution of key sizes in the dataset. It is updated
73 * every time key's length is modified. Available to user via INFO command.
74 *
75 * The histogram is a base-2 logarithmic histogram, with 64 bins. The i'th bin
76 * represents the number of keys with a size in the range 2^i and 2^(i+1)
77 * exclusive. oldLen/newLen must be smaller than 2^48, and if their value
78 * equals -1, it means that the key is being created/deleted, respectively. Each
79 * data type has its own histogram and it is per database (In addition, there is
80 * histogram per slot for future cluster use).
81 *
82 * Example mapping of key lengths to bins:
83 * [1,2)->1 [2,4)->2 [4,8)->3 [8,16)->4 ...
84 *
85 * Since strings can be zero length, the histogram also tracks:
86 * [0,1)->0
87 */
88void updateKeysizesHist(redisDb *db, int didx, uint32_t type, int64_t oldLen, int64_t newLen) {
89 if(unlikely(type >= OBJ_TYPE_BASIC_MAX))
90 return;
91
92 kvstoreDictMetadata *dictMeta = kvstoreGetDictMeta(db->keys, didx, 0);
93 kvstoreMetadata *kvstoreMeta = kvstoreGetMetadata(db->keys);
94
95 if (oldLen > 0) {
96 int old_bin = log2ceil(oldLen) + 1;
97 debugServerAssert(old_bin < MAX_KEYSIZES_BINS);
98 /* If following a key deletion it is last one in slot's dict, then
99 * slot's dict might get released as well. Verify if metadata is not NULL. */
100 if(dictMeta) {
101 dictMeta->keysizes_hist[type][old_bin]--;
102 debugServerAssert(dictMeta->keysizes_hist[type][old_bin] >= 0);
103 }
104 kvstoreMeta->keysizes_hist[type][old_bin]--;
105 debugServerAssert(kvstoreMeta->keysizes_hist[type][old_bin] >= 0);
106 } else {
107 /* here, oldLen can be either 0 or -1 */
108 if (oldLen == 0) {
109 /* Only strings can be empty. Yet, a command flow might temporarily
110 * dbAdd() empty collection, and only after add elements. */
111
112 if (dictMeta) {
113 dictMeta->keysizes_hist[type][0]--;
114 debugServerAssert(dictMeta->keysizes_hist[type][0] >= 0);
115 }
116 kvstoreMeta->keysizes_hist[type][0]--;
117 debugServerAssert(kvstoreMeta->keysizes_hist[type][0] >= 0);
118 }
119 }
120
121 if (newLen > 0) {
122 int new_bin = log2ceil(newLen) + 1;
123 debugServerAssert(new_bin < MAX_KEYSIZES_BINS);
124 /* If following a key deletion it is last one in slot's dict, then
125 * slot's dict might get released as well. Verify if metadata is not NULL. */
126 if(dictMeta) dictMeta->keysizes_hist[type][new_bin]++;
127 kvstoreMeta->keysizes_hist[type][new_bin]++;
128 } else {
129 /* here, newLen can be either 0 or -1 */
130 if (newLen == 0) {
131 /* Only strings can be empty. Yet, a command flow might temporarily
132 * dbAdd() empty collection, and only after add elements. */
133
134 if (dictMeta) dictMeta->keysizes_hist[type][0]++;
135 kvstoreMeta->keysizes_hist[type][0]++;
136 }
137 }
138}
139
140void updateSlotAllocSize(redisDb *db, int didx, size_t oldsize, size_t newsize) {
141 debugServerAssert(server.memory_tracking_per_slot);
142 kvstoreDictMetadata *dictMeta = kvstoreGetDictMeta(db->keys, didx, 0);
143 if (!dictMeta) return;
144 debugServerAssert(oldsize <= dictMeta->alloc_size);
145 dictMeta->alloc_size -= oldsize;
146 dictMeta->alloc_size += newsize;
147}
148
149/* Assert keysizes histogram (For debugging only)
150 *
151 * Triggered by DEBUG KEYSIZES-HIST-ASSERT 1 and tested after each command.
152 */
153void dbgAssertKeysizesHist(redisDb *db) {
154 /* Scan DB and build expected histogram by scanning all keys */
155 int64_t scanHist[MAX_KEYSIZES_TYPES][MAX_KEYSIZES_BINS] = {{0}};
156 dictEntry *de;
157 kvstoreIterator kvs_it;
158 kvstoreIteratorInit(&kvs_it, db->keys);
159 while ((de = kvstoreIteratorNext(&kvs_it)) != NULL) {
160 kvobj *kv = dictGetKV(de);
161 if (kv->type < OBJ_TYPE_BASIC_MAX) {
162 int64_t len = getObjectLength(kv);
163 scanHist[kv->type][(len == 0) ? 0 : log2ceil(len) + 1]++;
164 }
165 }
166 kvstoreIteratorReset(&kvs_it);
167 for (int type = 0; type < OBJ_TYPE_BASIC_MAX; type++) {
168 kvstoreMetadata *meta = kvstoreGetMetadata(db->keys);
169 volatile int64_t *keysizesHist = meta->keysizes_hist[type];
170 for (int i = 0; i < MAX_KEYSIZES_BINS; i++) {
171 if (scanHist[type][i] == keysizesHist[i])
172 continue;
173
174 /* print scanStr vs. expected histograms for debugging */
175 char scanStr[500], keysizesStr[500];
176 int l1 = 0, l2 = 0;
177 for (int j = 0; (j < MAX_KEYSIZES_BINS) && (l1 < 500) && (l2 < 500); j++) {
178 if (scanHist[type][j])
179 l1 += snprintf(scanStr + l1, sizeof(scanStr) - l1,
180 "[%d]=%"PRId64" ", j, scanHist[type][j]);
181 if (keysizesHist[j])
182 l2 += snprintf(keysizesStr + l2, sizeof(keysizesStr) - l2,
183 "[%d]=%"PRId64" ", j, keysizesHist[j]);
184 }
185 serverPanic("dbgAssertKeysizesHist: type=%d\nscanStr=%s\nkeysizes=%s\n",
186 type, scanStr, keysizesStr);
187 }
188 }
189}
190
191/* Assert per-slot alloc_size (For debugging only)
192 *
193 * Triggered by DEBUG ALLOCSIZE-SLOTS-ASSERT 1 and tested after each command.
194 */
195void dbgAssertAllocSizePerSlot(redisDb *db) {
196 if (!server.memory_tracking_per_slot) return;
197 size_t slot_sizes[CLUSTER_SLOTS] = {0};
198 dictEntry *de;
199 kvstoreIterator kvs_it;
200 kvstoreIteratorInit(&kvs_it, db->keys);
201 while ((de = kvstoreIteratorNext(&kvs_it)) != NULL) {
202 int slot = kvstoreIteratorGetCurrentDictIndex(&kvs_it);
203 kvobj *kv = dictGetKV(de);
204 slot_sizes[slot] += kvobjAllocSize(kv);
205 }
206 kvstoreIteratorReset(&kvs_it);
207
208 int num_slots = kvstoreNumDicts(db->keys);
209 for (int slot = 0; slot < num_slots; slot++) {
210 kvstoreDictMetadata *dictMeta = kvstoreGetDictMeta(db->keys, slot, 0);
211 size_t want = slot_sizes[slot];
212 size_t have = dictMeta ? dictMeta->alloc_size : 0;
213 if (have == want) continue;
214 serverPanic("dbgAssertAllocSizePerSlot: slot=%d expected=%zu actual=%zu",
215 slot, want, have);
216 }
217}
218
219/* Lookup a kvobj for read or write operations, or return NULL if the it is not
220 * found in the specified DB. This function implements the functionality of
221 * lookupKeyRead(), lookupKeyWrite() and their ...WithFlags() variants.
222 *
223 * link - If key found, return the link of the key.
224 * If key not found, return the bucket link, where the key should be added.
225 * Or NULL if dict wasn't allocated yet.
226 *
227 * Side-effects of calling this function:
228 *
229 * 1. A key gets expired if it reached it's TTL.
230 * 2. The key's last access time is updated.
231 * 3. The global keys hits/misses stats are updated (reported in INFO).
232 * 4. If keyspace notifications are enabled, a "keymiss" notification is fired.
233 *
234 * Flags change the behavior of this command:
235 *
236 * LOOKUP_NONE (or zero): No special flags are passed.
237 * LOOKUP_NOTOUCH: Don't alter the last access time of the key.
238 * LOOKUP_NONOTIFY: Don't trigger keyspace event on key miss.
239 * LOOKUP_NOSTATS: Don't increment key hits/misses counters.
240 * LOOKUP_WRITE: Prepare the key for writing (delete expired keys even on
241 * replicas, use separate keyspace stats and events (TODO)).
242 * LOOKUP_NOEXPIRE: Perform expiration check, but avoid deleting the key,
243 * so that we don't have to propagate the deletion.
244 *
245 * Note: this function also returns NULL if the key is logically expired but
246 * still existing, in case this is a replica and the LOOKUP_WRITE is not set.
247 * Even if the key expiry is master-driven, we can correctly report a key is
248 * expired on replicas even if the master is lagging expiring our key via DELs
249 * in the replication link. */
250kvobj *lookupKey(redisDb *db, robj *key, int flags, dictEntryLink *link) {
251
252 kvobj *val = dbFindByLink(db, key->ptr, link);
253
254 if (val) {
255 /* Forcing deletion of expired keys on a replica makes the replica
256 * inconsistent with the master. We forbid it on readonly replicas, but
257 * we have to allow it on writable replicas to make write commands
258 * behave consistently.
259 *
260 * It's possible that the WRITE flag is set even during a readonly
261 * command, since the command may trigger events that cause modules to
262 * perform additional writes. */
263 int is_ro_replica = server.masterhost && server.repl_slave_ro;
264 int expire_flags = 0;
265 if (flags & LOOKUP_WRITE && !is_ro_replica)
266 expire_flags |= EXPIRE_FORCE_DELETE_EXPIRED;
267 if (flags & LOOKUP_NOEXPIRE)
268 expire_flags |= EXPIRE_AVOID_DELETE_EXPIRED;
269 if (flags & LOOKUP_ACCESS_EXPIRED)
270 expire_flags |= EXPIRE_ALLOW_ACCESS_EXPIRED;
271 if (flags & LOOKUP_ACCESS_TRIMMED)
272 expire_flags |= EXPIRE_ALLOW_ACCESS_TRIMMED;
273 if (expireIfNeeded(db, key, val, expire_flags) != KEY_VALID) {
274 /* The key is no longer valid. */
275 val = NULL;
276 if (link) *link = NULL;
277 }
278 }
279
280 if (val) {
281 /* Update the access time for the ageing algorithm.
282 * Don't do it if we have a saving child, as this will trigger
283 * a copy on write madness. */
284 if (((flags & LOOKUP_NOTOUCH) == 0) &&
285 (server.current_client && server.current_client->flags & CLIENT_NO_TOUCH) &&
286 (server.executing_client && server.executing_client->cmd->proc != touchCommand))
287 flags |= LOOKUP_NOTOUCH;
288 if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)){
289 if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
290 updateLFU(val);
291 } else if (!(server.maxmemory_policy & MAXMEMORY_FLAG_LRM)) {
292 /* LRM policy should NOT update timestamp on reads. */
293 val->lru = LRU_CLOCK();
294 }
295 }
296
297 if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE)))
298 server.stat_keyspace_hits++;
299 /* TODO: Use separate hits stats for WRITE */
300 } else {
301 if (!(flags & (LOOKUP_NONOTIFY | LOOKUP_WRITE)))
302 notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
303 if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE)))
304 server.stat_keyspace_misses++;
305 /* TODO: Use separate misses stats and notify event for WRITE */
306 }
307
308 return val;
309}
310
311/* Lookup a key for read operations, or return NULL if the key is not found
312 * in the specified DB.
313 *
314 * This API should not be used when we write to the key after obtaining
315 * the object linked to the key, but only for read only operations.
316 *
317 * This function is equivalent to lookupKey(). The point of using this function
318 * rather than lookupKey() directly is to indicate that the purpose is to read
319 * the key. */
320kvobj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
321 serverAssert(!(flags & LOOKUP_WRITE));
322 return lookupKey(db, key, flags, NULL);
323}
324
325/* Like lookupKeyReadWithFlags(), but does not use any flag, which is the
326 * common case. */
327kvobj *lookupKeyRead(redisDb *db, robj *key) {
328 return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
329}
330
331/* Lookup a key for write operations, and as a side effect, if needed, expires
332 * the key if its TTL is reached. It's equivalent to lookupKey() with the
333 * LOOKUP_WRITE flag added.
334 *
335 * Returns the linked value object if the key exists or NULL if the key
336 * does not exist in the specified DB. */
337kvobj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) {
338 return lookupKey(db, key, flags | LOOKUP_WRITE, NULL);
339}
340
341kvobj *lookupKeyWrite(redisDb *db, robj *key) {
342 return lookupKeyWriteWithFlags(db, key, LOOKUP_NONE);
343}
344
345/* Like lookupKeyWrite(), but accepts ref to optional `link`
346 *
347 * link - If key found, updated to link the key.
348 * If key not found, updated to the bucket where the key should be added.
349 * If key not found and dict is empty, it is set to NULL
350 */
351kvobj *lookupKeyWriteWithLink(redisDb *db, robj *key, dictEntryLink *link) {
352 return lookupKey(db, key, LOOKUP_NONE | LOOKUP_WRITE, link);
353}
354
355kvobj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
356 kvobj *kv = lookupKeyRead(c->db, key);
357 if (!kv) addReplyOrErrorObject(c, reply);
358 return kv;
359}
360
361kvobj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
362 kvobj *kv = lookupKeyWrite(c->db, key);
363 if (!kv) addReplyOrErrorObject(c, reply);
364 return kv;
365}
366
367/* Add a key-value entry to the DB.
368 *
369 * A copy of 'key' is stored in the database. The caller must ensure the
370 * `key` is properly freed by calling decrRefcount(key).
371 *
372 * The value may (if its reference counter == 1) be reallocated and become
373 * invalid after a call to this function. The (possibly reallocated) value is
374 * stored in the database and the 'valref' pointer is updated to point to the
375 * new allocation.
376 *
377 * The reference counter of the value pointed to by valref is not incremented,
378 * so the caller should not free the value using decrRefcount after calling this
379 * function.
380 *
381 * link - Optional link to bucket where the key should be added.
382 * On return, get updated, by need, to the inserted key.
383 *
384 * keymeta - Defines metadata to be attached to the key. Including optional
385 * expiration and modules metadata to be copied (REQUIRED).
386 */
387kvobj *dbAddInternal(redisDb *db, robj *key, robj **valref, dictEntryLink *link,
388 const KeyMetaSpec *keymeta)
389{
390 int slot = getKeySlot(key->ptr);
391 dictEntryLink tmp = NULL;
392 if (link == NULL) link = &tmp;
393 robj *val = *valref;
394 kvobj *kv = kvobjSet(key->ptr, val, keymeta->metabits);
395 initObjectLRUOrLFU(kv);
396 kvstoreDictSetAtLink(db->keys, slot, kv, link, 1);
397
398 /* Handle metadata (expiration and modules metadata) */
399 if (keymeta->metabits) {
400 if (keymeta->metabits & KEY_META_MASK_EXPIRE) {
401 /* Expiry is always the first meta (from last) */
402 long long expire = keymeta->meta[KEY_META_ID_MAX - 1];
403 kvobj *newkv = setExpireByLink(NULL, db, key->ptr, expire, *link);
404 serverAssert(newkv == kv);
405 }
406
407 /* memcpy modules metadata to beginning of kvobj */
408 if (keymeta->metabits & KEY_META_MASK_MODULES)
409 /* Also trivial overwrite expire */
410 memcpy(kvobjGetAllocPtr(kv),
411 keymeta->meta + KEY_META_ID_MAX - keymeta->numMeta,
412 keymeta->numMeta * sizeof(uint64_t));
413 }
414
415 signalKeyAsReady(db, key, kv->type);
416 notifyKeyspaceEvent(NOTIFY_NEW,"new",key,db->id);
417 updateKeysizesHist(db, slot, kv->type, -1, getObjectLength(kv)); /* add hist */
418 if (server.memory_tracking_per_slot)
419 updateSlotAllocSize(db, slot, 0, kvobjAllocSize(kv));
420 *valref = kv;
421 return kv;
422}
423
424/* Read dbAddInternal() comment */
425kvobj *dbAdd(redisDb *db, robj *key, robj **valref) {
426 KeyMetaSpec keyMetaEmpty; /* No metadata added */
427 keyMetaSpecInit(&keyMetaEmpty);
428 return dbAddInternal(db, key, valref, NULL, &keyMetaEmpty);
429}
430
431kvobj *dbAddByLink(redisDb *db, robj *key, robj **valref, dictEntryLink *link) {
432 KeyMetaSpec keyMetaEmpty; /* No metadata added */
433 keyMetaSpecInit(&keyMetaEmpty);
434 return dbAddInternal(db, key, valref, link, &keyMetaEmpty);
435}
436
437/* Returns key's hash slot when cluster mode is enabled, or 0 when disabled.
438 * The only difference between this function and getKeySlot, is that it's not using cached key slot from the current_client
439 * and always calculates CRC hash.
440 * This is useful when slot needs to be calculated for a key that user didn't request for, such as in case of eviction. */
441int calculateKeySlot(sds key) {
442 return server.cluster_enabled ? keyHashSlot(key, (int) sdslen(key)) : 0;
443}
444
445/* Return slot-specific dictionary for key based on key's hash slot when cluster mode is enabled, else 0.*/
446int getKeySlot(sds key) {
447 if (!server.cluster_enabled) return 0;
448 /* This is performance optimization that uses pre-set slot id from the current command,
449 * in order to avoid calculation of the key hash.
450 *
451 * This optimization is only used when current_client flag `CLIENT_EXECUTING_COMMAND` is set.
452 * It only gets set during the execution of command under `call` method. Other flows requesting
453 * the key slot would fallback to calculateKeySlot.
454 */
455 if (server.current_client && server.current_client->slot >= 0 && server.current_client->flags & CLIENT_EXECUTING_COMMAND) {
456 debugServerAssertWithInfo(server.current_client, NULL,
457 (int)keyHashSlot(key, (int)sdslen(key)) == server.current_client->slot);
458 return server.current_client->slot;
459 }
460 int slot = keyHashSlot(key, (int)sdslen(key));
461 return slot;
462}
463
464/* Return the slot of the key in the command.
465 * INVALID_CLUSTER_SLOT if no keys, CLUSTER_CROSSSLOT if cross slot, otherwise the slot number. */
466int getSlotFromCommand(struct redisCommand *cmd, robj **argv, int argc) {
467 if (!cmd || !server.cluster_enabled) return INVALID_CLUSTER_SLOT;
468
469 /* Get the keys from the command */
470 getKeysResult result = GETKEYS_RESULT_INIT;
471 getKeysFromCommand(cmd, argv, argc, &result);
472
473 /* Extract slot from the keys result. */
474 int slot = extractSlotFromKeysResult(argv, &result);
475 getKeysFreeResult(&result);
476 return slot;
477}
478
479/* This is a special version of dbAdd() that is used only when loading
480 * keys from the RDB file: the key is passed as an SDS string that is
481 * copied by the function and freed by the caller.
482 *
483 * Moreover this function will not abort if the key is already busy, to
484 * give more control to the caller, nor will signal the key as ready
485 * since it is not useful in this context.
486 *
487 * If added to db, returns pointer to the object, Otherwise NULL is returned.
488 */
489kvobj *dbAddRDBLoad(redisDb *db, sds key, robj **valref, const KeyMetaSpec *keyMetaSpec) {
490 /* Add new kvobj to the db. */
491 int slot = getKeySlot(key);
492
493 dictEntryLink link, bucket;
494 link = kvstoreDictFindLink(db->keys, slot, key, &bucket);
495
496 /* If already exists, return NULL */
497 if (link != NULL)
498 return NULL;
499
500 /* Create kvobj with metadata bits from KeyMetaSpec */
501 robj *val = *valref;
502 kvobj *kv = kvobjSet(key, val, keyMetaSpec->metabits);
503 initObjectLRUOrLFU(kv);
504 kvstoreDictSetAtLink(db->keys, slot, kv, &bucket, 1);
505
506 /* Handle metadata (expiration and modules metadata) */
507 if (keyMetaSpec->metabits) {
508 if (keyMetaSpec->metabits & KEY_META_MASK_EXPIRE) {
509 /* Expiry is always the first meta (from last) */
510 long long expire = keyMetaSpec->meta[KEY_META_ID_MAX - 1];
511 kvobj *newkv = setExpireByLink(NULL, db, key, expire, bucket);
512 serverAssert(newkv == kv);
513 }
514
515 /* memcpy modules metadata to beginning of kvobj */
516 if (keyMetaSpec->metabits & KEY_META_MASK_MODULES)
517 memcpy(kvobjGetAllocPtr(kv),
518 keyMetaSpec->meta + KEY_META_ID_MAX - keyMetaSpec->numMeta,
519 keyMetaSpec->numMeta * sizeof(uint64_t));
520 }
521
522 updateKeysizesHist(db, slot, kv->type, -1, (int64_t) getObjectLength(kv));
523 if (server.memory_tracking_per_slot)
524 updateSlotAllocSize(db, slot, 0, kvobjAllocSize(kv));
525 return *valref = kv;
526}
527
528/**
529 * Overwrite an existing key's value in db with a new value.
530 *
531 * - If the reference count of 'valref' is 1 the ownership of the value is
532 * transferred to this function. The value may be reallocated, potentially
533 * invalidating any external references to it. The (potentially reallocated)
534 * value is stored in the database, and the 'valref' pointer is updated to
535 * reflect the new allocation, if one occurs.
536 * - The reference counter of the value referenced by 'valref' is not incremented
537 * so the caller must refrain from releasing it using decrRefCount after this
538 * function is called.
539 * - This function does not modify the expire time of the existing key.
540 * - The 'overwrite' flag is an indication whether this is done as part of a
541 * complete replacement of their key, which can be thought as a deletion and
542 * replacement (in which case we need to emit deletion signals), or just an
543 * update of a value of an existing key (when false).
544 * - The `link` is optional, can save lookup, if provided.
545 */
546static void dbSetValue(redisDb *db, robj *key, robj **valref, dictEntryLink link,
547 int overwrite, int updateKeySizes, int keepTTL) {
548 int freeModuleMeta = 0;
549 robj *val = *valref;
550 int slot = getKeySlot(key->ptr);
551 size_t oldsize = 0;
552 if (!link) {
553 link = kvstoreDictFindLink(db->keys, slot, key->ptr, NULL);
554 serverAssertWithInfo(NULL, key, link != NULL); /* expected to exist */
555 }
556 kvobj *old = dictGetKV(*link);
557 kvobj *kvNew;
558
559 int64_t oldlen = (int64_t) getObjectLength(old);
560 int oldtype = old->type;
561
562 /* if hash with HFEs, take care to remove from global HFE DS before attempting
563 * to manipulate and maybe free kvOld object */
564 if (old->type == OBJ_HASH)
565 estoreRemove(db->subexpires, slot, old);
566
567 long long oldExpire = getExpire(db, key->ptr, old);
568
569 /* All metadata will be kept if not `overwrite` for the new object */
570 uint32_t newKeyMetaBits = old->metabits;
571 /* clear expire if not keepTTL or no old expire */
572 if ((!keepTTL) || (oldExpire == -1))
573 newKeyMetaBits &= ~KEY_META_MASK_EXPIRE;
574
575 if (overwrite) {
576 /* On overwrite, discard module metadata excluding expire if set */
577 newKeyMetaBits &= KEY_META_MASK_EXPIRE;
578 /* RM_StringDMA may call dbUnshareStringValue which may free val, so we
579 * need to incr to retain old */
580 incrRefCount(old);
581
582 /* Free related metadata. Ignore builtin metadata (currently only expire) */
583 if (getModuleMetaBits(old->metabits)) {
584 keyMetaOnUnlink(db, key, old);
585 freeModuleMeta = 1;
586 }
587
588 /* Although the key is not really deleted from the database, we regard
589 * overwrite as two steps of unlink+add, so we still need to call the unlink
590 * callback of the module. */
591 moduleNotifyKeyUnlink(key,old,db->id,DB_FLAG_KEY_OVERWRITE);
592 /* We want to try to unblock any module clients or clients using a blocking XREADGROUP */
593 signalDeletedKeyAsReady(db,key,old->type);
594 decrRefCount(old);
595 /* Because of RM_StringDMA, old may be changed, so we need get old again */
596 old = dictGetKV(*link);
597 }
598 if (server.memory_tracking_per_slot)
599 oldsize = kvobjAllocSize(old);
600
601 if ((old->refcount == 1 && old->encoding != OBJ_ENCODING_EMBSTR) &&
602 (val->refcount == 1 && val->encoding != OBJ_ENCODING_EMBSTR) && (!freeModuleMeta))
603 {
604 /* Keep old object in the database. Just swap it's ptr, type and
605 * encoding with the content of val. */
606 robj tmp = *old;
607 old->type = val->type;
608 old->encoding = val->encoding;
609 old->ptr = val->ptr;
610 val->type = tmp.type;
611 val->encoding = tmp.encoding;
612 val->ptr = tmp.ptr;
613 /* Set new to old to keep the old object. Set old to val to be freed below. */
614 kvNew = old;
615 old = val;
616
617 /* Handle TTL in the optimization path */
618 if ((!keepTTL) && (oldExpire >= 0))
619 removeExpire(db, key);
620 } else {
621 /* Replace the old value at its location in the key space. */
622 val->lru = old->lru;
623
624 kvNew = kvobjSet(key->ptr, val, newKeyMetaBits);
625 kvstoreDictSetAtLink(db->keys, slot, kvNew, &link, 0);
626
627 /* if expiry replace the old value at its location in the expire space. */
628 if (oldExpire != -1) {
629 if (keepTTL) {
630 kvobjSetExpire(kvNew, oldExpire); /* kvNew not reallocated here */
631 dictEntryLink exLink = kvstoreDictFindLink(db->expires, slot,
632 key->ptr, NULL);
633 serverAssertWithInfo(NULL, key, exLink != NULL);
634 kvstoreDictSetAtLink(db->expires, slot, kvNew, &exLink, 0);
635 } else {
636 kvstoreDictDelete(db->expires, slot, key->ptr);
637 }
638 }
639
640 if (newKeyMetaBits & KEY_META_MASK_MODULES)
641 keyMetaTransition(old, kvNew);
642 }
643
644 /* Remove old key and add new key to KEYSIZES histogram */
645 int64_t newlen = (int64_t) getObjectLength(kvNew);
646 if (updateKeySizes) {
647 /* Save one call if old and new are the same type */
648 if (oldtype == kvNew->type) {
649 updateKeysizesHist(db, slot, oldtype, oldlen, newlen);
650 } else {
651 updateKeysizesHist(db, slot, oldtype, oldlen, -1);
652 updateKeysizesHist(db, slot, kvNew->type, -1, newlen);
653 }
654 }
655
656 if (server.memory_tracking_per_slot)
657 updateSlotAllocSize(db, slot, oldsize, kvobjAllocSize(kvNew));
658
659 if (server.io_threads_num > 1 && old->encoding == OBJ_ENCODING_RAW) {
660 /* In multi-threaded mode, the OBJ_ENCODING_RAW string object usually is
661 * allocated in the IO thread, so we defer the free to the IO thread.
662 * Besides, we never free a string object in BIO threads, so, even with
663 * lazyfree-lazy-server-del enabled, a fallback to main thread freeing
664 * due to defer free failure doesn't go against the config intention. */
665 tryDeferFreeClientObject(server.current_client, DEFERRED_OBJECT_TYPE_ROBJ, old);
666 } else if (server.lazyfree_lazy_server_del) {
667 freeObjAsync(key, old, db->id);
668 } else {
669 decrRefCount(old);
670 }
671 *valref = kvNew;
672}
673
674/* Replace an existing key with a new value, we just replace value and don't
675 * emit any events */
676void dbReplaceValue(redisDb *db, robj *key, robj **valref, int updateKeySizes) {
677 dbSetValue(db, key, valref, NULL, 0, updateKeySizes, 1);
678}
679
680/* Replace an existing key with a new value (don't emit any events)
681 *
682 * parameter 'link' is optional. If provided, saves lookup.
683 */
684void dbReplaceValueWithLink(redisDb *db, robj *key, robj **val, dictEntryLink link) {
685 dbSetValue(db, key, val, link, 0, 1, 1);
686}
687
688/* High level Set operation. This function can be used in order to set
689 * a key, whatever it was existing or not, to a new object.
690 *
691 * 1) The value may be reallocated when adding it to the database. The value
692 * pointer 'valref' is updated to point to the reallocated object. The
693 * reference count of the value object is *not* incremented.
694 * 2) clients WATCHing for the destination key notified.
695 * 3) The expire time of the key is reset (the key is made persistent),
696 * unless 'SETKEY_KEEPTTL' is enabled in flags.
697 * 4) The key lookup can take place outside this interface outcome will be
698 * delivered with 'SETKEY_ALREADY_EXIST' or 'SETKEY_DOESNT_EXIST'
699 *
700 * All the new keys in the database should be created via this interface.
701 * The client 'c' argument may be set to NULL if the operation is performed
702 * in a context where there is no clear client performing the operation. */
703void setKey(client *c, redisDb *db, robj *key, robj **valref, int flags) {
704 setKeyByLink(c, db, key, valref, flags, NULL);
705}
706
707/* Like setKey(), but accepts an optional link
708 *
709 * - If flags is set with SETKEY_ALREADY_EXIST, then `link` must be provided
710 * - If flags is set with SETKEY_DOESNT_EXIST, then `link` is optional. If
711 * provided, it will point to the bucket where the key should be added.
712 * - If flag is not set (0) then add or update key, and `link` must be NULL
713 * On return, link get updated, by need, to the inserted kvobj.
714 */
715void setKeyByLink(client *c, redisDb *db, robj *key, robj **valref, int flags, dictEntryLink *plink) {
716 dictEntryLink dummy = NULL, *link = plink ? plink : &dummy;
717 int exists;
718 kvobj *oldval = NULL;
719
720 if (flags & SETKEY_ALREADY_EXIST) {
721 debugServerAssert((*link) != NULL);
722 oldval = dictGetKV(**link);
723 exists = 1;
724 } else if (flags & SETKEY_DOESNT_EXIST) {
725 /* link is optional */
726 exists = 0;
727 } else {
728 /* Add or update key */
729 oldval = lookupKeyWriteWithLink(db, key, link);
730 exists = oldval != NULL;
731 }
732
733 if (exists) {
734 int oldtype = oldval->type;
735 int newtype = (*valref)->type;
736
737 /* Update the value of an existing key */
738 dbSetValue(db, key, valref, *link, 1, 1, flags & SETKEY_KEEPTTL);
739
740 /* Notify keyspace events for override and type change */
741 notifyKeyspaceEvent(NOTIFY_OVERWRITTEN, "overwritten", key, db->id);
742 if (oldtype != newtype)
743 notifyKeyspaceEvent(NOTIFY_TYPE_CHANGED, "type_changed", key, db->id);
744 } else {
745 /* Add the new key to the database */
746 dbAddByLink(db, key, valref, link);
747 }
748
749 /* Signal key modification and update LRM timestamp. */
750 keyModified(c,db,key,*valref,!(flags & SETKEY_NO_SIGNAL));
751}
752
753/* During atomic slot migration, keys that are being imported are in an
754 * intermediate state. we cannot access them and therefore skip them.
755 *
756 * This callback function now is used by:
757 * - dbRandomKey
758 * - keysCommand
759 * - scanCommand
760 */
761static int accessKeysShouldSkipDictIndex(int didx) {
762 return !clusterCanAccessKeysInSlot(didx);
763}
764
765/* Return a random key, in form of a Redis object.
766 * If there are no keys, NULL is returned.
767 *
768 * The function makes sure to return keys not already expired. */
769robj *dbRandomKey(redisDb *db) {
770 dictEntry *de;
771 int maxtries = 100;
772 int allvolatile = kvstoreSize(db->keys) == kvstoreSize(db->expires);
773
774 while(1) {
775 robj *keyobj;
776 int randomSlot = kvstoreGetFairRandomDictIndex(db->keys, accessKeysShouldSkipDictIndex, 16, 1);
777 if (randomSlot == -1) return NULL;
778 de = kvstoreDictGetFairRandomKey(db->keys, randomSlot);
779 if (de == NULL) return NULL;
780
781 kvobj *kv = dictGetKV(de);
782 sds key = kvobjGetKey(kv);
783 keyobj = createStringObject(key,sdslen(key));
784 if (allvolatile && (server.masterhost || isPausedActions(PAUSE_ACTION_EXPIRE)) && --maxtries == 0) {
785 /* If the DB is composed only of keys with an expire set,
786 * it could happen that all the keys are already logically
787 * expired in the slave, so the function cannot stop because
788 * expireIfNeeded() is false, nor it can stop because
789 * dictGetFairRandomKey() returns NULL (there are keys to return).
790 * To prevent the infinite loop we do some tries, but if there
791 * are the conditions for an infinite loop, eventually we
792 * return a key name that may be already expired. */
793 return keyobj;
794 }
795 if (expireIfNeeded(db, keyobj, kv, 0) != KEY_VALID) {
796 decrRefCount(keyobj);
797 continue; /* search for another key. This expired. */
798 }
799
800 return keyobj;
801 }
802}
803
804/* Helper for sync and async delete. */
805int dbGenericDelete(redisDb *db, robj *key, int async, int flags) {
806 dictEntryLink link;
807 int table;
808 int slot = getKeySlot(key->ptr);
809 link = kvstoreDictTwoPhaseUnlinkFind(db->keys, slot, key->ptr, &table);
810
811 if (link) {
812 kvobj *kv = dictGetKV(*link);
813
814 int64_t oldlen = (int64_t) getObjectLength(kv);
815 int type = kv->type;
816
817 /* If hash object with expiry on fields, remove it from HFE DS of DB */
818 if (type == OBJ_HASH)
819 estoreRemove(db->subexpires, slot, kv);
820
821 /* RM_StringDMA may call dbUnshareStringValue which may free kv, so we
822 * need to incr to retain kv */
823 incrRefCount(kv); /* refcnt=1->2 */
824 /* Metadata hook: notify unlink for key metadata cleanup. */
825 if (getModuleMetaBits(kv->metabits)) keyMetaOnUnlink(db, key, kv);
826 /* Tells the module that the key has been unlinked from the database. */
827 moduleNotifyKeyUnlink(key, kv, db->id, flags);
828 /* We want to try to unblock any module clients or clients using a blocking XREADGROUP */
829 signalDeletedKeyAsReady(db,key,type);
830 /* We should call decr before freeObjAsync. If not, the refcount may be
831 * greater than 1, so freeObjAsync doesn't work */
832 decrRefCount(kv);
833
834 /* Because of dbUnshareStringValue, the val in db may change. */
835 kv = dictGetKV(*link);
836
837 /* if expirable, delete an entry from the expires dict is not decrRefCount of kvobj */
838 if (kvobjGetExpire(kv) != -1)
839 kvstoreDictDelete(db->expires, slot, key->ptr);
840
841 if (async) {
842 if (server.memory_tracking_per_slot)
843 updateSlotAllocSize(db, slot, kvobjAllocSize(kv), 0);
844 freeObjAsync(key, kv, db->id);
845 /* Set the key to NULL in the main dictionary. */
846 kvstoreDictSetAtLink(db->keys, slot, NULL, &link, 0);
847 }
848 kvstoreDictTwoPhaseUnlinkFree(db->keys, slot, link, table);
849
850 /* remove key from histogram */
851 if(!(flags & DB_FLAG_NO_UPDATE_KEYSIZES))
852 updateKeysizesHist(db, slot, type, oldlen, -1);
853 return 1;
854 } else {
855 return 0;
856 }
857}
858
859/* Delete a key, value, and associated expiration entry if any, from the DB */
860int dbSyncDelete(redisDb *db, robj *key) {
861 return dbGenericDelete(db, key, 0, DB_FLAG_KEY_DELETED);
862}
863
864/* Delete a key, value, and associated expiration entry if any, from the DB. If
865 * the value consists of many allocations, it may be freed asynchronously. */
866int dbAsyncDelete(redisDb *db, robj *key) {
867 return dbGenericDelete(db, key, 1, DB_FLAG_KEY_DELETED);
868}
869
870/* This is a wrapper whose behavior depends on the Redis lazy free
871 * configuration. Deletes the key synchronously or asynchronously. */
872int dbDelete(redisDb *db, robj *key) {
873 return dbGenericDelete(db, key, server.lazyfree_lazy_server_del, DB_FLAG_KEY_DELETED);
874}
875
876/* Similar to dbDelete(), but does not update the keysizes histogram.
877 * This is used when we want to delete a key without affecting the histogram,
878 * typically in cases where a command flow deletes elements from a collection
879 * and then deletes the collection itself. In such cases, using dbDelete()
880 * would incorrectly decrement bin #0. A corresponding test should be added
881 * to `info-keysizes.tcl`. */
882int dbDeleteSkipKeysizesUpdate(redisDb *db, robj *key) {
883 return dbGenericDelete(db, key, server.lazyfree_lazy_server_del,
884 DB_FLAG_KEY_DELETED | DB_FLAG_NO_UPDATE_KEYSIZES);
885}
886
887/* Prepare the string object stored at 'key' to be modified destructively
888 * to implement commands like SETBIT or APPEND.
889 *
890 * An object is usually ready to be modified unless one of the two conditions
891 * are true:
892 *
893 * 1) The object 'o' is shared (refcount > 1), we don't want to affect
894 * other users.
895 * 2) The object encoding is not "RAW".
896 *
897 * If the object is found in one of the above conditions (or both) by the
898 * function, an unshared / not-encoded copy of the string object is stored
899 * at 'key' in the specified 'db'. Otherwise the object 'o' itself is
900 * returned.
901 *
902 * USAGE:
903 *
904 * The object 'o' is what the caller already obtained by looking up 'key'
905 * in 'db', the usage pattern looks like this:
906 *
907 * o = lookupKeyWrite(db,key);
908 * if (checkType(c,o,OBJ_STRING)) return;
909 * o = dbUnshareStringValue(db,key,o);
910 *
911 * At this point the caller is ready to modify the object, for example
912 * using an sdscat() call to append some data, or anything else.
913 */
914kvobj *dbUnshareStringValue(redisDb *db, robj *key, kvobj *kv) {
915 return dbUnshareStringValueByLink(db,key,kv,NULL);
916}
917
918/* Like dbUnshareStringValue(), but accepts a optional link,
919 * which can be used if we already have one, thus saving the dbFind call. */
920kvobj *dbUnshareStringValueByLink(redisDb *db, robj *key, kvobj *o, dictEntryLink link) {
921 serverAssert(o->type == OBJ_STRING);
922 if (o->refcount != 1 || o->encoding != OBJ_ENCODING_RAW) {
923 robj *decoded = getDecodedObject(o);
924 o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr));
925 decrRefCount(decoded);
926 dbReplaceValueWithLink(db, key, &o, link);
927 }
928 return o;
929}
930
931/* Remove all keys from the database(s) structure. The dbarray argument
932 * may not be the server main DBs (could be a temporary DB).
933 *
934 * The dbnum can be -1 if all the DBs should be emptied, or the specified
935 * DB index if we want to empty only a single database.
936 * The function returns the number of keys removed from the database(s). */
937long long emptyDbStructure(redisDb *dbarray, int dbnum, int async,
938 void(callback)(dict*))
939{
940 long long removed = 0;
941 int startdb, enddb;
942
943 if (dbnum == -1) {
944 startdb = 0;
945 enddb = server.dbnum-1;
946 } else {
947 startdb = enddb = dbnum;
948 }
949
950 for (int j = startdb; j <= enddb; j++) {
951 removed += kvstoreSize(dbarray[j].keys);
952 if (async) {
953 emptyDbAsync(&dbarray[j]);
954 } else {
955 /* Destroy sub-expires before deleting the kv-objects since ebuckets
956 * data structure is embedded in the stored kv-objects. */
957 estoreEmpty(dbarray[j].subexpires);
958 kvstoreEmpty(dbarray[j].keys, callback);
959 kvstoreEmpty(dbarray[j].expires, callback);
960 }
961 /* Because all keys of database are removed, reset average ttl. */
962 dbarray[j].avg_ttl = 0;
963 dbarray[j].expires_cursor = 0;
964 }
965
966 return removed;
967}
968
969/* Remove all data (keys and functions) from all the databases in a
970 * Redis server. If callback is given the function is called from
971 * time to time to signal that work is in progress.
972 *
973 * The dbnum can be -1 if all the DBs should be flushed, or the specified
974 * DB number if we want to flush only a single Redis database number.
975 *
976 * Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or
977 * EMPTYDB_ASYNC if we want the memory to be freed in a different thread
978 * and the function to return ASAP. EMPTYDB_NOFUNCTIONS can also be set
979 * to specify that we do not want to delete the functions.
980 *
981 * On success the function returns the number of keys removed from the
982 * database(s). Otherwise -1 is returned in the specific case the
983 * DB number is out of range, and errno is set to EINVAL. */
984long long emptyData(int dbnum, int flags, void(callback)(dict*)) {
985 int async = (flags & EMPTYDB_ASYNC);
986 int with_functions = !(flags & EMPTYDB_NOFUNCTIONS);
987 RedisModuleFlushInfoV1 fi = {REDISMODULE_FLUSHINFO_VERSION,!async,dbnum};
988 long long removed = 0;
989
990 if (dbnum < -1 || dbnum >= server.dbnum) {
991 errno = EINVAL;
992 return -1;
993 }
994
995 if (dbnum == -1 || dbnum == 0)
996 asmCancelTrimJobs();
997
998 /* Fire the flushdb modules event. */
999 moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
1000 REDISMODULE_SUBEVENT_FLUSHDB_START,
1001 &fi);
1002
1003 /* Make sure the WATCHed keys are affected by the FLUSH* commands.
1004 * Note that we need to call the function while the keys are still
1005 * there. */
1006 signalFlushedDb(dbnum, async, NULL);
1007
1008 /* Empty redis database structure. */
1009 removed = emptyDbStructure(server.db, dbnum, async, callback);
1010
1011 if (dbnum == -1) flushSlaveKeysWithExpireList();
1012
1013 if (with_functions) {
1014 serverAssert(dbnum == -1);
1015 functionsLibCtxClearCurrent(async);
1016 }
1017
1018 /* Also fire the end event. Note that this event will fire almost
1019 * immediately after the start event if the flush is asynchronous. */
1020 moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB,
1021 REDISMODULE_SUBEVENT_FLUSHDB_END,
1022 &fi);
1023
1024 return removed;
1025}
1026
1027/* Initialize temporary db on replica for use during diskless replication. */
1028redisDb *initTempDb(void) {
1029 int slot_count_bits = 0;
1030 int flags = KVSTORE_ALLOCATE_DICTS_ON_DEMAND;
1031 if (server.cluster_enabled) {
1032 slot_count_bits = CLUSTER_SLOT_MASK_BITS;
1033 flags |= KVSTORE_FREE_EMPTY_DICTS;
1034 }
1035 redisDb *tempDb = zcalloc(sizeof(redisDb)*server.dbnum);
1036 for (int i=0; i<server.dbnum; i++) {
1037 tempDb[i].id = i;
1038 tempDb[i].keys = kvstoreCreate(&kvstoreExType, &dbDictType, slot_count_bits,
1039 flags);
1040 tempDb[i].expires = kvstoreCreate(&kvstoreBaseType, &dbExpiresDictType,
1041 slot_count_bits, flags);
1042 tempDb[i].subexpires = estoreCreate(&subexpiresBucketsType, slot_count_bits);
1043 }
1044
1045 return tempDb;
1046}
1047
1048/* Discard tempDb, this can be slow (similar to FLUSHALL), but it's always async. */
1049void discardTempDb(redisDb *tempDb) {
1050 int async = 1;
1051
1052 /* Release temp DBs. */
1053 emptyDbStructure(tempDb, -1, async, NULL);
1054 for (int i=0; i<server.dbnum; i++) {
1055 /* Destroy sub-expires before deleting the kv-objects since ebuckets
1056 * data structure is embedded in the stored kv-objects. */
1057 estoreRelease(tempDb[i].subexpires);
1058 kvstoreRelease(tempDb[i].keys);
1059 kvstoreRelease(tempDb[i].expires);
1060 }
1061
1062 zfree(tempDb);
1063}
1064
1065int selectDb(client *c, int id) {
1066 if (id < 0 || id >= server.dbnum)
1067 return C_ERR;
1068 c->db = &server.db[id];
1069 return C_OK;
1070}
1071
1072long long dbTotalServerKeyCount(void) {
1073 long long total = 0;
1074 int j;
1075 for (j = 0; j < server.dbnum; j++) {
1076 total += kvstoreSize(server.db[j].keys);
1077 }
1078 return total;
1079}
1080
1081/*-----------------------------------------------------------------------------
1082 * Hooks for key space changes.
1083 *
1084 * Every time a key in the database is modified the function
1085 * keyModified() is called.
1086 *
1087 * Every time a DB is flushed the function signalFlushDb() is called.
1088 *----------------------------------------------------------------------------*/
1089
1090/* Called when a key is modified to update LRM timestamp
1091 * and optionally signal watchers/tracking clients.
1092 *
1093 * Arguments:
1094 * - c: client (may be NULL if the key was modified out of a context of a client)
1095 * - db: database containing the key
1096 * - key: the key that was modified
1097 * - val: the value object (if NULL, LRM won't be updated, e.g., for deleted keys)
1098 * - signal: if true, trigger WATCH and client-side tracking invalidation
1099 */
1100void keyModified(client *c, redisDb *db, robj *key, robj *val, int signal) {
1101 if (val) updateLRM(val);
1102 if (signal) {
1103 touchWatchedKey(db,key);
1104 trackingInvalidateKey(c,key,1);
1105 }
1106}
1107
1108void signalFlushedDb(int dbid, int async, slotRangeArray *slots) {
1109 int startdb, enddb;
1110 if (dbid == -1) {
1111 startdb = 0;
1112 enddb = server.dbnum-1;
1113 } else {
1114 startdb = enddb = dbid;
1115 }
1116
1117 for (int j = startdb; j <= enddb; j++) {
1118 scanDatabaseForDeletedKeys(&server.db[j], NULL, slots);
1119 touchAllWatchedKeysInDb(&server.db[j], NULL, slots);
1120 }
1121
1122 trackingInvalidateKeysOnFlush(async);
1123
1124 /* Changes in this method may take place in swapMainDbWithTempDb as well,
1125 * where we execute similar calls, but with subtle differences as it's
1126 * not simply flushing db. */
1127}
1128
1129/*-----------------------------------------------------------------------------
1130 * Type agnostic commands operating on the key space
1131 *----------------------------------------------------------------------------*/
1132
1133/* Return the set of flags to use for the emptyData() call for FLUSHALL
1134 * and FLUSHDB commands.
1135 *
1136 * sync: flushes the database in an sync manner.
1137 * async: flushes the database in an async manner.
1138 * no option: determine sync or async according to the value of lazyfree-lazy-user-flush.
1139 *
1140 * On success C_OK is returned and the flags are stored in *flags, otherwise
1141 * C_ERR is returned and the function sends an error to the client. */
1142int getFlushCommandFlags(client *c, int *flags) {
1143 /* Parse the optional ASYNC option. */
1144 if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"sync")) {
1145 *flags = EMPTYDB_NO_FLAGS;
1146 } else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"async")) {
1147 *flags = EMPTYDB_ASYNC;
1148 } else if (c->argc == 1) {
1149 *flags = server.lazyfree_lazy_user_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS;
1150 } else {
1151 addReplyErrorObject(c,shared.syntaxerr);
1152 return C_ERR;
1153 }
1154 return C_OK;
1155}
1156
1157/* Flushes the whole server data set. */
1158void flushAllDataAndResetRDB(int flags) {
1159 server.dirty += emptyData(-1,flags,NULL);
1160 if (server.child_type == CHILD_TYPE_RDB) killRDBChild();
1161 if (server.saveparamslen > 0) {
1162 rdbSaveInfo rsi, *rsiptr;
1163 rsiptr = rdbPopulateSaveInfo(&rsi);
1164 rdbSave(SLAVE_REQ_NONE,server.rdb_filename,rsiptr,RDBFLAGS_NONE);
1165 }
1166
1167#if defined(USE_JEMALLOC)
1168 /* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
1169 * for large databases, flushdb blocks for long anyway, so a bit more won't
1170 * harm and this way the flush and purge will be synchronous. */
1171 if (!(flags & EMPTYDB_ASYNC)) {
1172 /* Only clear the current thread cache.
1173 * Ignore the return call since this will fail if the tcache is disabled. */
1174 je_mallctl("thread.tcache.flush", NULL, NULL, NULL, 0);
1175
1176 jemalloc_purge();
1177 }
1178#endif
1179}
1180
1181/* CB function on blocking ASYNC FLUSH completion
1182 *
1183 * Utilized by commands SFLUSH, FLUSHALL and FLUSHDB.
1184 */
1185void flushallSyncBgDone(uint64_t client_id, void *userdata) {
1186 slotRangeArray *slots = userdata;
1187 client *c = lookupClientByID(client_id);
1188
1189 /* Verify that client still exists and being blocked. */
1190 if (!(c && c->flags & CLIENT_BLOCKED)) {
1191 slotRangeArrayFree(slots);
1192 return;
1193 }
1194
1195 /* Update current_client (Called functions might rely on it) */
1196 client *old_client = server.current_client;
1197 server.current_client = c;
1198
1199 /* Don't update blocked_us since command was processed in bg by lazy_free thread */
1200 updateStatsOnUnblock(c, 0 /*blocked_us*/, elapsedUs(c->bstate.lazyfreeStartTime), 0);
1201
1202 /* Only SFLUSH command pass user data pointer. */
1203 if (slots)
1204 replySlotsFlushAndFree(c, slots);
1205 else
1206 addReply(c, shared.ok);
1207
1208 /* mark client as unblocked */
1209 unblockClient(c, 1);
1210
1211 if (c->flags & CLIENT_PENDING_COMMAND) {
1212 c->flags &= ~CLIENT_PENDING_COMMAND;
1213 /* The FLUSH command won't be reprocessed, FLUSH command is finished, but
1214 * we still need to complete its full processing flow, including updating
1215 * the replication offset. */
1216 commandProcessed(c);
1217 }
1218
1219 /* On flush completion, update the client's memory */
1220 updateClientMemUsageAndBucket(c);
1221
1222 /* restore current_client */
1223 server.current_client = old_client;
1224}
1225
1226/* Common flush command implementation for FLUSHALL, FLUSHDB and SFLUSH.
1227 *
1228 * Return 1 indicates that flush SYNC is actually running in bg as blocking ASYNC
1229 * Return 0 otherwise
1230 *
1231 * slots - provided only by SFLUSH command, otherwise NULL. Will be used on
1232 * completion to reply with the slots flush result. Ownership is passed
1233 * to the completion job in case of `blocking_async`.
1234 */
1235int flushCommandCommon(client *c, int type, int flags, slotRangeArray *slots) {
1236 int blocking_async = 0; /* Flush SYNC option to run as blocking ASYNC */
1237
1238 /* in case of SYNC, check if we can optimize and run it in bg as blocking ASYNC */
1239 if ((!(flags & EMPTYDB_ASYNC)) && (!(c->flags & CLIENT_AVOID_BLOCKING_ASYNC_FLUSH))) {
1240 /* Run as ASYNC */
1241 flags |= EMPTYDB_ASYNC;
1242 blocking_async = 1;
1243 }
1244
1245 /* Cancel all ASM tasks that overlap with the given slot ranges. */
1246 clusterAsmCancelBySlotRangeArray(slots, c->argv[0]->ptr);
1247
1248 if (type == FLUSH_TYPE_ALL)
1249 flushAllDataAndResetRDB(flags | EMPTYDB_NOFUNCTIONS);
1250 else
1251 server.dirty += emptyData(c->db->id,flags | EMPTYDB_NOFUNCTIONS,NULL);
1252
1253 /* Without the forceCommandPropagation, when DB(s) was already empty,
1254 * FLUSHALL\FLUSHDB will not be replicated nor put into the AOF. */
1255 forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF);
1256
1257 /* if blocking ASYNC, block client and add completion job request to BIO lazyfree
1258 * worker's queue. To be called and reply with OK only after all preceding pending
1259 * lazyfree jobs in queue were processed */
1260 if (blocking_async) {
1261 /* measure bg job till completion as elapsed time of flush command */
1262 elapsedStart(&c->bstate.lazyfreeStartTime);
1263
1264 c->bstate.timeout = 0;
1265 /* We still need to perform cleanup operations for the command, including
1266 * updating the replication offset, so mark this command as pending to
1267 * avoid command from being reset during unblock. */
1268 c->flags |= CLIENT_PENDING_COMMAND;
1269 blockClient(c,BLOCKED_LAZYFREE);
1270 bioCreateCompRq(BIO_WORKER_LAZY_FREE, flushallSyncBgDone, c->id, slots);
1271 }
1272
1273#if defined(USE_JEMALLOC)
1274 /* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
1275 * for large databases, flushdb blocks for long anyway, so a bit more won't
1276 * harm and this way the flush and purge will be synchronous.
1277 *
1278 * Take care purge only FLUSHDB for sync flow. FLUSHALL sync flow already
1279 * applied at flushAllDataAndResetRDB. Async flow will apply only later on */
1280 if ((type != FLUSH_TYPE_ALL) && (!(flags & EMPTYDB_ASYNC))) {
1281 /* Only clear the current thread cache.
1282 * Ignore the return call since this will fail if the tcache is disabled. */
1283 je_mallctl("thread.tcache.flush", NULL, NULL, NULL, 0);
1284
1285 jemalloc_purge();
1286 }
1287#endif
1288 return blocking_async;
1289}
1290
1291/* FLUSHALL [SYNC|ASYNC]
1292 *
1293 * Flushes the whole server data set. */
1294void flushallCommand(client *c) {
1295 int flags;
1296 if (getFlushCommandFlags(c,&flags) == C_ERR) return;
1297
1298 /* If FLUSH SYNC isn't running as blocking async, then reply */
1299 if (flushCommandCommon(c, FLUSH_TYPE_ALL, flags, NULL) == 0)
1300 addReply(c, shared.ok);
1301}
1302
1303/* FLUSHDB [SYNC|ASYNC]
1304 *
1305 * Flushes the currently SELECTed Redis DB. */
1306void flushdbCommand(client *c) {
1307 int flags;
1308 if (getFlushCommandFlags(c,&flags) == C_ERR) return;
1309
1310 /* If FLUSH SYNC isn't running as blocking async, then reply */
1311 if (flushCommandCommon(c, FLUSH_TYPE_DB,flags, NULL) == 0)
1312 addReply(c, shared.ok);
1313
1314}
1315
1316/* This command implements DEL and UNLINK. */
1317void delGenericCommand(client *c, int lazy) {
1318 int numdel = 0, j;
1319
1320 for (j = 1; j < c->argc; j++) {
1321 if (expireIfNeeded(c->db, c->argv[j], NULL, 0) == KEY_DELETED)
1322 continue;
1323 int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
1324 dbSyncDelete(c->db,c->argv[j]);
1325 if (deleted) {
1326 keyModified(c,c->db,c->argv[j],NULL,1);
1327 notifyKeyspaceEvent(NOTIFY_GENERIC,
1328 "del",c->argv[j],c->db->id);
1329 server.dirty++;
1330 numdel++;
1331 }
1332 }
1333 addReplyLongLong(c,numdel);
1334}
1335
1336void delCommand(client *c) {
1337 delGenericCommand(c,server.lazyfree_lazy_user_del);
1338}
1339
1340/* DELEX key [IFEQ match-value|IFNE match-value|IFDEQ match-digest|IFDNE match-digest]
1341 *
1342 * Conditionally removes the specified key. A key is ignored if it does not
1343 * exist.
1344 * If no condition is specified the behavior is the same as DEL command.
1345 * If condition is specified the key must be of STRING type.
1346 *
1347 * IFEQ/IFNE conditions check the match-value against the value of the key
1348 * IFDEQ/IFDNE conditions check the match-digest against the digest of the key's value.*/
1349void delexCommand(client *c) {
1350 kvobj *o;
1351 int deleted = 0, should_delete = 0;
1352
1353 /* If there are no conditions specified we just delete the key */
1354 if (c->argc == 2) {
1355 delGenericCommand(c, server.lazyfree_lazy_server_del);
1356 return;
1357 }
1358
1359 /* If we have more than two arguments the next two are condition and
1360 * match-value */
1361 if (c->argc != 4) {
1362 addReplyErrorArity(c);
1363 return;
1364 }
1365
1366 robj *key = c->argv[1];
1367 o = lookupKeyRead(c->db, key);
1368 if (o == NULL) {
1369 addReplyLongLong(c, 0);
1370 return;
1371 }
1372
1373 /* If any conditions are specified the only supported key type for now is
1374 * string */
1375 if (o->type != OBJ_STRING) {
1376 addReplyError(c, "Key should be of string type if conditions are specified");
1377 return;
1378 }
1379
1380 char *condition = c->argv[2]->ptr;
1381 if (!strcasecmp("ifeq", condition)) {
1382 robj *valueobj = getDecodedObject(o);
1383 sds match_value = c->argv[3]->ptr;
1384 if (sdscmp(valueobj->ptr, match_value) == 0)
1385 should_delete = 1;
1386
1387 decrRefCount(valueobj);
1388 } else if (!strcasecmp("ifne", condition)) {
1389 robj *valueobj = getDecodedObject(o);
1390 sds match_value = c->argv[3]->ptr;
1391 if (sdscmp(valueobj->ptr, match_value) != 0)
1392 should_delete = 1;
1393
1394 decrRefCount(valueobj);
1395 } else if (!strcasecmp("ifdeq", condition)) {
1396 if (validateHexDigest(c, c->argv[3]->ptr) != C_OK)
1397 return;
1398
1399 sds current_digest = stringDigest(o);
1400 if (strcasecmp(current_digest, c->argv[3]->ptr) == 0)
1401 should_delete = 1;
1402
1403 sdsfree(current_digest);
1404 } else if (!strcasecmp("ifdne", condition)) {
1405 if (validateHexDigest(c, c->argv[3]->ptr) != C_OK)
1406 return;
1407
1408 sds current_digest = stringDigest(o);
1409 if (strcasecmp(current_digest, c->argv[3]->ptr) != 0)
1410 should_delete = 1;
1411
1412 sdsfree(current_digest);
1413 } else {
1414 addReplyError(c, "Invalid condition. Use IFEQ, IFNE, IFDEQ, or IFDNE");
1415 return;
1416 }
1417
1418 if (should_delete) {
1419 deleted = server.lazyfree_lazy_server_del ?
1420 dbAsyncDelete(c->db, key) :
1421 dbSyncDelete(c->db, key);
1422 }
1423
1424 if (deleted) {
1425 rewriteClientCommandVector(c, 2, shared.del, key);
1426 keyModified(c, c->db, key, NULL, 1);
1427 notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id);
1428 server.dirty++;
1429 }
1430
1431 addReplyLongLong(c, deleted);
1432}
1433
1434void unlinkCommand(client *c) {
1435 delGenericCommand(c,1);
1436}
1437
1438/* EXISTS key1 key2 ... key_N.
1439 * Return value is the number of keys existing. */
1440void existsCommand(client *c) {
1441 long long count = 0;
1442 int j;
1443
1444 for (j = 1; j < c->argc; j++) {
1445 if (lookupKeyReadWithFlags(c->db,c->argv[j],LOOKUP_NOTOUCH)) count++;
1446 }
1447 addReplyLongLong(c,count);
1448}
1449
1450void selectCommand(client *c) {
1451 int id;
1452
1453 if (getIntFromObjectOrReply(c, c->argv[1], &id, NULL) != C_OK)
1454 return;
1455
1456 if (server.cluster_enabled && id != 0) {
1457 addReplyError(c,"SELECT is not allowed in cluster mode");
1458 return;
1459 }
1460
1461 if (id != 0) {
1462 server.stat_cluster_incompatible_ops++;
1463 }
1464
1465 if (selectDb(c,id) == C_ERR) {
1466 addReplyError(c,"DB index is out of range");
1467 } else {
1468 addReply(c,shared.ok);
1469 }
1470}
1471
1472void randomkeyCommand(client *c) {
1473 robj *key;
1474
1475 if ((key = dbRandomKey(c->db)) == NULL) {
1476 addReplyNull(c);
1477 return;
1478 }
1479
1480 addReplyBulk(c,key);
1481 decrRefCount(key);
1482}
1483
1484void keysCommand(client *c) {
1485 dictEntry *de;
1486 sds pattern = c->argv[1]->ptr;
1487 int plen = sdslen(pattern), allkeys, pslot = -1;
1488 unsigned long numkeys = 0;
1489 void *replylen = addReplyDeferredLen(c);
1490 allkeys = (pattern[0] == '*' && plen == 1);
1491 if (server.cluster_enabled && !allkeys) {
1492 pslot = patternHashSlot(pattern, plen);
1493 }
1494 int has_slot = pslot != -1;
1495 union {
1496 kvstoreDictIterator kvs_di;
1497 kvstoreIterator kvs_it;
1498 } it;
1499 if (has_slot) {
1500 if (!kvstoreDictSize(c->db->keys, pslot) || accessKeysShouldSkipDictIndex(pslot)) {
1501 /* Requested slot is empty */
1502 setDeferredArrayLen(c,replylen,0);
1503 return;
1504 }
1505 kvstoreInitDictSafeIterator(&it.kvs_di, c->db->keys, pslot);
1506 } else {
1507 kvstoreIteratorInit(&it.kvs_it, c->db->keys);
1508 }
1509
1510 while ((de = has_slot ? kvstoreDictIteratorNext(&it.kvs_di) : kvstoreIteratorNext(&it.kvs_it)) != NULL) {
1511 if (!has_slot && accessKeysShouldSkipDictIndex(kvstoreIteratorGetCurrentDictIndex(&it.kvs_it))) {
1512 continue;
1513 }
1514
1515 kvobj *kv = dictGetKV(de);
1516 sds key = kvobjGetKey(kv);
1517
1518 if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
1519 if (!keyIsExpired(c->db, NULL, kv)) {
1520 addReplyBulkCBuffer(c, key, sdslen(key));
1521 numkeys++;
1522 }
1523 }
1524 if (c->flags & CLIENT_CLOSE_ASAP)
1525 break;
1526 }
1527 if (has_slot)
1528 kvstoreResetDictIterator(&it.kvs_di);
1529 else
1530 kvstoreIteratorReset(&it.kvs_it);
1531 setDeferredArrayLen(c,replylen,numkeys);
1532}
1533
1534/* Data used by the dict scan callback. */
1535typedef struct {
1536 list *keys; /* elements that collect from dict */
1537 robj *o; /* o must be a hash/set/zset object, NULL means current db */
1538 long long type; /* the particular type when scan the db */
1539 sds pattern; /* pattern string, NULL means no pattern */
1540 long sampled; /* cumulative number of keys sampled */
1541 int no_values; /* set to 1 means to return keys only */
1542 sds typename; /* typename string, NULL means no type filter */
1543 redisDb *db; /* database reference for expiration checks */
1544} scanData;
1545
1546/* Helper function to compare key type in scan commands */
1547int objectTypeCompare(robj *o, long long target) {
1548 if (o->type != OBJ_MODULE) {
1549 if (o->type != target)
1550 return 0;
1551 else
1552 return 1;
1553 }
1554 /* module type compare */
1555 moduleType *type = ((moduleValue *)o->ptr)->type;
1556 long long mt = (long long)REDISMODULE_TYPE_SIGN(type->entity.id);
1557 if (target != -mt)
1558 return 0;
1559 else
1560 return 1;
1561}
1562/* This callback is used by scanGenericCommand in order to collect elements
1563 * returned by the dictionary iterator into a list. */
1564void scanCallback(void *privdata, const dictEntry *de, dictEntryLink plink) {
1565 UNUSED(plink);
1566 Entry *hashEntry = NULL;
1567 scanData *data = (scanData *)privdata;
1568 list *keys = data->keys;
1569 robj *o = data->o;
1570 sds val = NULL;
1571 void *key = NULL; /* if OBJ_HASH then key is of type `hfield`. Otherwise, `sds` */
1572 void *keyStr;
1573 data->sampled++;
1574
1575 /* o and typename can not have values at the same time. */
1576 serverAssert(!((data->type != LLONG_MAX) && o));
1577
1578 kvobj *kv = NULL;
1579 zskiplistNode *znode = NULL;
1580 if (!o) { /* If scanning keyspace */
1581 kv = dictGetKV(de);
1582 keyStr = kvobjGetKey(kv);
1583 } else if (o->type == OBJ_HASH) {
1584 hashEntry = dictGetKey(de);
1585 keyStr = entryGetField(hashEntry);
1586 } else if (o->type == OBJ_ZSET) {
1587 znode = dictGetKey(de);
1588 keyStr = zslGetNodeElement(znode);
1589 } else {
1590 keyStr = dictGetKey(de);
1591 }
1592
1593 /* Filter element if it does not match the pattern. */
1594 if (data->pattern) {
1595 if (!stringmatchlen(data->pattern, sdslen(data->pattern), keyStr, sdslen(keyStr), 0)) {
1596 return;
1597 }
1598 }
1599
1600 if (!o) {
1601 /* Expiration check first - only for database keyspace scanning.
1602 * Use kv obj to avoid robj creation. */
1603 if (expireIfNeeded(data->db, NULL, kv, 0) != KEY_VALID)
1604 return;
1605
1606 /* Type filtering - only for database keyspace scanning */
1607 if (data->typename) {
1608 /* For unknown types (LLONG_MAX), skip all keys */
1609 if (data->type == LLONG_MAX)
1610 return;
1611 /* For known types, skip keys that don't match */
1612 if (!objectTypeCompare(kv, data->type))
1613 return;
1614 }
1615 }
1616
1617 if (o == NULL) {
1618 key = keyStr;
1619 } else if (o->type == OBJ_SET) {
1620 key = keyStr;
1621 } else if (o->type == OBJ_HASH) {
1622 key = keyStr;
1623 val = entryGetValue(hashEntry);
1624
1625 /* If field is expired, then ignore */
1626 if (entryIsExpired(hashEntry))
1627 return;
1628
1629 } else if (o->type == OBJ_ZSET) {
1630 char buf[MAX_LONG_DOUBLE_CHARS];
1631 int len = ld2string(buf, sizeof(buf), znode->score, LD_STR_AUTO);
1632 key = sdsdup(keyStr);
1633 val = sdsnewlen(buf, len);
1634 } else {
1635 serverPanic("Type not handled in SCAN callback.");
1636 }
1637
1638 listAddNodeTail(keys, key);
1639 if (val && !data->no_values) listAddNodeTail(keys, val);
1640}
1641
1642/* Try to parse a SCAN cursor stored at object 'o':
1643 * if the cursor is valid, store it as unsigned integer into *cursor and
1644 * returns C_OK. Otherwise return C_ERR and send an error to the
1645 * client. */
1646int parseScanCursorOrReply(client *c, robj *o, unsigned long long *cursor) {
1647 if (!string2ull(o->ptr, cursor)) {
1648 addReplyError(c, "invalid cursor");
1649 return C_ERR;
1650 }
1651 return C_OK;
1652}
1653
1654char *obj_type_name[OBJ_TYPE_MAX] = {
1655 "string",
1656 "list",
1657 "set",
1658 "zset",
1659 "hash",
1660 NULL, /* module type is special */
1661 "stream"
1662};
1663
1664/* Helper function to get type from a string in scan commands */
1665long long getObjectTypeByName(char *name) {
1666
1667 for (long long i = 0; i < OBJ_TYPE_MAX; i++) {
1668 if (obj_type_name[i] && !strcasecmp(name, obj_type_name[i])) {
1669 return i;
1670 }
1671 }
1672
1673 moduleType *mt = moduleTypeLookupModuleByNameIgnoreCase(name);
1674 if (mt != NULL) return -(REDISMODULE_TYPE_SIGN(mt->entity.id));
1675
1676 return LLONG_MAX;
1677}
1678
1679char *getObjectTypeName(robj *o) {
1680 if (o == NULL) {
1681 return "none";
1682 }
1683
1684 serverAssert(o->type >= 0 && o->type < OBJ_TYPE_MAX);
1685
1686 if (o->type == OBJ_MODULE) {
1687 moduleValue *mv = o->ptr;
1688 return mv->type->entity.name;
1689 } else {
1690 return obj_type_name[o->type];
1691 }
1692}
1693
1694static int scanShouldSkipDict(dict *d, int didx) {
1695 UNUSED(d);
1696 return accessKeysShouldSkipDictIndex(didx);
1697}
1698
1699/* This command implements SCAN, HSCAN and SSCAN commands.
1700 * If object 'o' is passed, then it must be a Hash, Set or Zset object, otherwise
1701 * if 'o' is NULL the command will operate on the dictionary associated with
1702 * the current database.
1703 *
1704 * When 'o' is not NULL the function assumes that the first argument in
1705 * the client arguments vector is a key so it skips it before iterating
1706 * in order to parse options.
1707 *
1708 * In the case of a Hash object the function returns both the field and value
1709 * of every element on the Hash. */
1710void scanGenericCommand(client *c, robj *o, unsigned long long cursor) {
1711 int i, j;
1712 listNode *node;
1713 long count = 10;
1714 sds pat = NULL;
1715 sds typename = NULL;
1716 long long type = LLONG_MAX;
1717 int patlen = 0, use_pattern = 0, no_values = 0;
1718 dict *ht;
1719
1720 /* Object must be NULL (to iterate keys names), or the type of the object
1721 * must be Set, Sorted Set, or Hash. */
1722 serverAssert(o == NULL || o->type == OBJ_SET || o->type == OBJ_HASH ||
1723 o->type == OBJ_ZSET);
1724
1725 /* Set i to the first option argument. The previous one is the cursor. */
1726 i = (o == NULL) ? 2 : 3; /* Skip the key argument if needed. */
1727
1728 /* Step 1: Parse options. */
1729 while (i < c->argc) {
1730 j = c->argc - i;
1731 if (!strcasecmp(c->argv[i]->ptr, "count") && j >= 2) {
1732 if (getLongFromObjectOrReply(c, c->argv[i+1], &count, NULL)
1733 != C_OK)
1734 {
1735 return;
1736 }
1737
1738 if (count < 1) {
1739 addReplyErrorObject(c,shared.syntaxerr);
1740 return;
1741 }
1742
1743 i += 2;
1744 } else if (!strcasecmp(c->argv[i]->ptr, "match") && j >= 2) {
1745 pat = c->argv[i+1]->ptr;
1746 patlen = sdslen(pat);
1747
1748 /* The pattern always matches if it is exactly "*", so it is
1749 * equivalent to disabling it. */
1750 use_pattern = !(patlen == 1 && pat[0] == '*');
1751
1752 i += 2;
1753 } else if (!strcasecmp(c->argv[i]->ptr, "type") && o == NULL && j >= 2) {
1754 /* SCAN for a particular type only applies to the db dict */
1755 typename = c->argv[i+1]->ptr;
1756 type = getObjectTypeByName(typename);
1757 if (type == LLONG_MAX) {
1758 /* TODO: uncomment in redis 8.0
1759 addReplyErrorFormat(c, "unknown type name '%s'", typename);
1760 return; */
1761 }
1762 i+= 2;
1763 } else if (!strcasecmp(c->argv[i]->ptr, "novalues")) {
1764 if (!o || o->type != OBJ_HASH) {
1765 addReplyError(c, "NOVALUES option can only be used in HSCAN");
1766 return;
1767 }
1768 no_values = 1;
1769 i++;
1770 } else {
1771 addReplyErrorObject(c,shared.syntaxerr);
1772 return;
1773 }
1774 }
1775
1776 /* Step 2: Iterate the collection.
1777 *
1778 * Note that if the object is encoded with a listpack, intset, or any other
1779 * representation that is not a hash table, we are sure that it is also
1780 * composed of a small number of elements. So to avoid taking state we
1781 * just return everything inside the object in a single call, setting the
1782 * cursor to zero to signal the end of the iteration. */
1783
1784 /* Handle the case of a hash table. */
1785 ht = NULL;
1786 if (o == NULL) {
1787 ht = NULL;
1788 } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) {
1789 ht = o->ptr;
1790 } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
1791 ht = o->ptr;
1792 } else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) {
1793 zset *zs = o->ptr;
1794 ht = zs->dict;
1795 }
1796
1797 list *keys = listCreate();
1798 /* Set a free callback for the contents of the collected keys list.
1799 * For the main keyspace dict, and when we scan a key that's dict encoded
1800 * (we have 'ht'), we don't need to define free method because the strings
1801 * in the list are just a shallow copy from the pointer in the dictEntry.
1802 * When scanning a key with other encodings (e.g. listpack), we need to
1803 * free the temporary strings we add to that list.
1804 * The exception to the above is ZSET, where we do allocate temporary
1805 * strings even when scanning a dict. */
1806 if (o && (!ht || o->type == OBJ_ZSET)) {
1807 listSetFreeMethod(keys, sdsfreegeneric);
1808 }
1809
1810 /* For main dictionary scan or data structure using hashtable. */
1811 if (!o || ht) {
1812 /* We set the max number of iterations to ten times the specified
1813 * COUNT, so if the hash table is in a pathological state (very
1814 * sparsely populated) we avoid to block too much time at the cost
1815 * of returning no or very few elements. */
1816 long maxiterations = count*10;
1817
1818 /* We pass scanData which have three pointers to the callback:
1819 * 1. data.keys: the list to which it will add new elements;
1820 * 2. data.o: the object containing the dictionary so that
1821 * it is possible to fetch more data in a type-dependent way;
1822 * 3. data.type: the specified type scan in the db, LLONG_MAX means
1823 * type matching is no needed;
1824 * 4. data.pattern: the pattern string;
1825 * 5. data.sampled: the maxiteration limit is there in case we're
1826 * working on an empty dict, one with a lot of empty buckets, and
1827 * for the buckets are not empty, we need to limit the spampled number
1828 * to prevent a long hang time caused by filtering too many keys;
1829 * 6. data.no_values: to control whether values will be returned or
1830 * only keys are returned. */
1831 scanData data = {
1832 .keys = keys,
1833 .o = o,
1834 .type = type,
1835 .pattern = use_pattern ? pat : NULL,
1836 .sampled = 0,
1837 .no_values = no_values,
1838 .typename = typename,
1839 .db = c->db,
1840 };
1841
1842 /* A pattern may restrict all matching keys to one cluster slot. */
1843 int onlydidx = -1;
1844 if (o == NULL && use_pattern && server.cluster_enabled) {
1845 onlydidx = patternHashSlot(pat, patlen);
1846 }
1847 do {
1848 /* In cluster mode there is a separate dictionary for each slot.
1849 * If cursor is empty, we should try exploring next non-empty slot. */
1850 if (o == NULL) {
1851 cursor = kvstoreScan(c->db->keys, cursor, onlydidx, scanCallback, scanShouldSkipDict, &data);
1852 } else {
1853 cursor = dictScan(ht, cursor, scanCallback, &data);
1854 }
1855 } while (cursor && maxiterations-- && data.sampled < count);
1856 } else if (o->type == OBJ_SET) {
1857 unsigned long array_reply_len = 0;
1858 void *replylen = NULL;
1859 listRelease(keys);
1860 char *str;
1861 char buf[LONG_STR_SIZE];
1862 size_t len;
1863 int64_t llele;
1864 /* Reply to the client. */
1865 addReplyArrayLen(c, 2);
1866 /* Cursor is always 0 given we iterate over all set */
1867 addReplyBulkLongLong(c,0);
1868 /* If there is no pattern the length is the entire set size, otherwise we defer the reply size */
1869 if (use_pattern)
1870 replylen = addReplyDeferredLen(c);
1871 else {
1872 array_reply_len = setTypeSize(o);
1873 addReplyArrayLen(c, array_reply_len);
1874 }
1875
1876 setTypeIterator si;
1877 unsigned long cur_length = 0;
1878 setTypeInitIterator(&si, o);
1879 while (setTypeNext(&si, &str, &len, &llele) != -1) {
1880 if (str == NULL) {
1881 len = ll2string(buf, sizeof(buf), llele);
1882 }
1883 char *key = str ? str : buf;
1884 if (use_pattern && !stringmatchlen(pat, patlen, key, len, 0)) {
1885 continue;
1886 }
1887 addReplyBulkCBuffer(c, key, len);
1888 cur_length++;
1889 }
1890 setTypeResetIterator(&si);
1891 if (use_pattern)
1892 setDeferredArrayLen(c,replylen,cur_length);
1893 else
1894 serverAssert(cur_length == array_reply_len); /* fail on corrupt data */
1895 return;
1896 } else if ((o->type == OBJ_HASH || o->type == OBJ_ZSET) &&
1897 o->encoding == OBJ_ENCODING_LISTPACK)
1898 {
1899 unsigned char *p = lpFirst(o->ptr);
1900 unsigned char *str;
1901 int64_t len;
1902 unsigned long array_reply_len = 0;
1903 unsigned char intbuf[LP_INTBUF_SIZE];
1904 void *replylen = NULL;
1905 listRelease(keys);
1906
1907 /* Reply to the client. */
1908 addReplyArrayLen(c, 2);
1909 /* Cursor is always 0 given we iterate over all set */
1910 addReplyBulkLongLong(c,0);
1911 /* If there is no pattern the length is the entire set size, otherwise we defer the reply size */
1912 if (use_pattern)
1913 replylen = addReplyDeferredLen(c);
1914 else {
1915 array_reply_len = o->type == OBJ_HASH ? hashTypeLength(o, 0) : zsetLength(o);
1916 if (!no_values) {
1917 array_reply_len *= 2;
1918 }
1919 addReplyArrayLen(c, array_reply_len);
1920 }
1921 unsigned long cur_length = 0;
1922 while(p) {
1923 str = lpGet(p, &len, intbuf);
1924 /* point to the value */
1925 p = lpNext(o->ptr, p);
1926 if (use_pattern && !stringmatchlen(pat, patlen, (char *)str, len, 0)) {
1927 /* jump to the next key/val pair */
1928 p = lpNext(o->ptr, p);
1929 continue;
1930 }
1931 /* add key object */
1932 addReplyBulkCBuffer(c, str, len);
1933 cur_length++;
1934 /* add value object */
1935 if (!no_values) {
1936 str = lpGet(p, &len, intbuf);
1937 addReplyBulkCBuffer(c, str, len);
1938 cur_length++;
1939 }
1940 p = lpNext(o->ptr, p);
1941 }
1942 if (use_pattern)
1943 setDeferredArrayLen(c,replylen,cur_length);
1944 else
1945 serverAssert(cur_length == array_reply_len); /* fail on corrupt data */
1946 return;
1947 } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_LISTPACK_EX) {
1948 int64_t len;
1949 long long expire_at;
1950 unsigned char *lp = hashTypeListpackGetLp(o);
1951 unsigned char *p = lpFirst(lp);
1952 unsigned char *str, *val;
1953 unsigned char intbuf[LP_INTBUF_SIZE];
1954 void *replylen = NULL;
1955
1956 listRelease(keys);
1957 /* Reply to the client. */
1958 addReplyArrayLen(c, 2);
1959 /* Cursor is always 0 given we iterate over all set */
1960 addReplyBulkLongLong(c,0);
1961 /* In the case of OBJ_ENCODING_LISTPACK_EX we always defer the reply size given some fields might be expired */
1962 replylen = addReplyDeferredLen(c);
1963 unsigned long cur_length = 0;
1964
1965 while (p) {
1966 str = lpGet(p, &len, intbuf);
1967 p = lpNext(lp, p);
1968 val = p; /* Keep pointer to value */
1969
1970 p = lpNext(lp, p);
1971 serverAssert(p && lpGetIntegerValue(p, &expire_at));
1972
1973 if (hashTypeIsExpired(o, expire_at) ||
1974 (use_pattern && !stringmatchlen(pat, patlen, (char *)str, len, 0)))
1975 {
1976 /* jump to the next key/val pair */
1977 p = lpNext(lp, p);
1978 continue;
1979 }
1980
1981 /* add key object */
1982 addReplyBulkCBuffer(c, str, len);
1983 cur_length++;
1984 /* add value object */
1985 if (!no_values) {
1986 str = lpGet(val, &len, intbuf);
1987 addReplyBulkCBuffer(c, str, len);
1988 cur_length++;
1989 }
1990 p = lpNext(lp, p);
1991 }
1992 setDeferredArrayLen(c,replylen,cur_length);
1993 return;
1994 } else {
1995 serverPanic("Not handled encoding in SCAN.");
1996 }
1997
1998 /* Step 3: Reply to the client. */
1999 addReplyArrayLen(c, 2);
2000 addReplyBulkLongLong(c,cursor);
2001
2002 addReplyArrayLen(c, listLength(keys));
2003 while ((node = listFirst(keys)) != NULL) {
2004 void *key = listNodeValue(node);
2005 addReplyBulkCBuffer(c, key, sdslen(key));
2006 listDelNode(keys, node);
2007 }
2008
2009 listRelease(keys);
2010}
2011
2012/* The SCAN command completely relies on scanGenericCommand. */
2013void scanCommand(client *c) {
2014 unsigned long long cursor;
2015 if (parseScanCursorOrReply(c,c->argv[1],&cursor) == C_ERR) return;
2016 scanGenericCommand(c,NULL,cursor);
2017}
2018
2019void dbsizeCommand(client *c) {
2020 addReplyLongLong(c,dbSize(c->db));
2021}
2022
2023void lastsaveCommand(client *c) {
2024 addReplyLongLong(c,server.lastsave);
2025}
2026
2027void typeCommand(client *c) {
2028 kvobj *kv = lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH);
2029 addReplyStatus(c, getObjectTypeName(kv));
2030}
2031
2032void shutdownCommand(client *c) {
2033 int flags = SHUTDOWN_NOFLAGS;
2034 int abort = 0;
2035 for (int i = 1; i < c->argc; i++) {
2036 if (!strcasecmp(c->argv[i]->ptr,"nosave")) {
2037 flags |= SHUTDOWN_NOSAVE;
2038 } else if (!strcasecmp(c->argv[i]->ptr,"save")) {
2039 flags |= SHUTDOWN_SAVE;
2040 } else if (!strcasecmp(c->argv[i]->ptr, "now")) {
2041 flags |= SHUTDOWN_NOW;
2042 } else if (!strcasecmp(c->argv[i]->ptr, "force")) {
2043 flags |= SHUTDOWN_FORCE;
2044 } else if (!strcasecmp(c->argv[i]->ptr, "abort")) {
2045 abort = 1;
2046 } else {
2047 addReplyErrorObject(c,shared.syntaxerr);
2048 return;
2049 }
2050 }
2051 if ((abort && flags != SHUTDOWN_NOFLAGS) ||
2052 (flags & SHUTDOWN_NOSAVE && flags & SHUTDOWN_SAVE))
2053 {
2054 /* Illegal combo. */
2055 addReplyErrorObject(c,shared.syntaxerr);
2056 return;
2057 }
2058
2059 if (abort) {
2060 if (abortShutdown() == C_OK)
2061 addReply(c, shared.ok);
2062 else
2063 addReplyError(c, "No shutdown in progress.");
2064 return;
2065 }
2066
2067 if (!(flags & SHUTDOWN_NOW) && c->flags & CLIENT_DENY_BLOCKING) {
2068 addReplyError(c, "SHUTDOWN without NOW or ABORT isn't allowed for DENY BLOCKING client");
2069 return;
2070 }
2071
2072 if (!(flags & SHUTDOWN_NOSAVE) && isInsideYieldingLongCommand()) {
2073 /* Script timed out. Shutdown allowed only with the NOSAVE flag. See
2074 * also processCommand where these errors are returned. */
2075 if (server.busy_module_yield_flags && server.busy_module_yield_reply) {
2076 addReplyErrorFormat(c, "-BUSY %s", server.busy_module_yield_reply);
2077 } else if (server.busy_module_yield_flags) {
2078 addReplyErrorObject(c, shared.slowmoduleerr);
2079 } else if (scriptIsEval()) {
2080 addReplyErrorObject(c, shared.slowevalerr);
2081 } else {
2082 addReplyErrorObject(c, shared.slowscripterr);
2083 }
2084 return;
2085 }
2086
2087 blockClientShutdown(c);
2088 if (prepareForShutdown(flags) == C_OK) exit(0);
2089 /* If we're here, then shutdown is ongoing (the client is still blocked) or
2090 * failed (the client has received an error). */
2091}
2092
2093void renameGenericCommand(client *c, int nx) {
2094 kvobj *o;
2095 int samekey = 0;
2096 uint64_t minHashExpireTime = EB_EXPIRE_TIME_INVALID;
2097
2098 /* When source and dest key is the same, no operation is performed,
2099 * if the key exists, however we still return an error on unexisting key. */
2100 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) samekey = 1;
2101
2102 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL)
2103 return;
2104
2105 if (samekey) {
2106 addReply(c,nx ? shared.czero : shared.ok);
2107 return;
2108 }
2109
2110 incrRefCount(o);
2111 kvobj *destval = lookupKeyWrite(c->db,c->argv[2]);
2112 int overwritten = 0;
2113 int desttype = -1;
2114 if (destval != NULL) {
2115 if (nx) {
2116 decrRefCount(o);
2117 addReply(c,shared.czero);
2118 return;
2119 }
2120
2121 /* Overwrite: delete the old key before creating the new one
2122 * with the same name. */
2123 desttype = destval->type;
2124 dbDelete(c->db,c->argv[2]);
2125 overwritten = 1;
2126 }
2127
2128 /* If hash with expiration on fields then remove it from global HFE DS and
2129 * keep next expiration time. Otherwise, dbDelete() will remove it from the
2130 * global HFE DS and we will lose the expiration time. */
2131 int srctype = o->type;
2132 if (srctype == OBJ_HASH)
2133 minHashExpireTime = estoreRemove(c->db->subexpires, getKeySlot(c->argv[1]->ptr), o);
2134
2135 /* Prepare metadata for the renamed key */
2136 KeyMetaSpec keymeta;
2137 keyMetaSpecInit(&keymeta);
2138 if (o->metabits) keyMetaOnRename(c->db, o, c->argv[1], c->argv[2], &keymeta);
2139
2140 dbDelete(c->db,c->argv[1]);
2141
2142 dbAddInternal(c->db, c->argv[2], &o, NULL, &keymeta);
2143
2144 /* If hash with HFEs, register in DB subexpires */
2145 if (minHashExpireTime != EB_EXPIRE_TIME_INVALID)
2146 estoreAdd(c->db->subexpires, getKeySlot(c->argv[2]->ptr), o, minHashExpireTime);
2147
2148 keyModified(c,c->db,c->argv[1],NULL,1);
2149 keyModified(c,c->db,c->argv[2],NULL,1); /* LRM already updated by dbAddInternal */
2150 notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from",
2151 c->argv[1],c->db->id);
2152 notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_to",
2153 c->argv[2],c->db->id);
2154 if (overwritten) {
2155 notifyKeyspaceEvent(NOTIFY_OVERWRITTEN, "overwritten", c->argv[2], c->db->id);
2156 if (desttype != srctype)
2157 notifyKeyspaceEvent(NOTIFY_TYPE_CHANGED, "type_changed", c->argv[2], c->db->id);
2158 }
2159 server.dirty++;
2160 addReply(c,nx ? shared.cone : shared.ok);
2161}
2162
2163void renameCommand(client *c) {
2164 renameGenericCommand(c,0);
2165}
2166
2167void renamenxCommand(client *c) {
2168 renameGenericCommand(c,1);
2169}
2170
2171void moveCommand(client *c) {
2172 redisDb *src, *dst;
2173 int srcid, dbid;
2174 uint64_t hashExpireTime = EB_EXPIRE_TIME_INVALID;
2175
2176 if (server.cluster_enabled) {
2177 addReplyError(c,"MOVE is not allowed in cluster mode");
2178 return;
2179 }
2180
2181 /* Obtain source and target DB pointers */
2182 src = c->db;
2183 srcid = c->db->id;
2184
2185 if (getIntFromObjectOrReply(c, c->argv[2], &dbid, NULL) != C_OK)
2186 return;
2187
2188 if (selectDb(c,dbid) == C_ERR) {
2189 addReplyError(c,"DB index is out of range");
2190 return;
2191 }
2192 dst = c->db;
2193 selectDb(c,srcid); /* Back to the source DB */
2194
2195 /* If the user is moving using as target the same
2196 * DB as the source DB it is probably an error. */
2197 if (src == dst) {
2198 addReplyErrorObject(c,shared.sameobjecterr);
2199 return;
2200 }
2201
2202 /* Record incompatible operations in cluster mode */
2203 server.stat_cluster_incompatible_ops++;
2204
2205 /* Check if the element exists and get a reference */
2206 kvobj *kv = lookupKeyWrite(c->db,c->argv[1]);
2207 if (!kv) {
2208 addReply(c,shared.czero);
2209 return;
2210 }
2211
2212 /* Return zero if the key already exists in the target DB */
2213 dictEntryLink dstBucket;
2214 if (lookupKey(dst, c->argv[1], LOOKUP_WRITE, &dstBucket) != NULL) {
2215 addReply(c,shared.czero);
2216 return;
2217 }
2218
2219 int slot = getKeySlot(c->argv[1]->ptr);
2220
2221 /* If hash with expiration on fields, remove it from DB subexpires and keep
2222 * aside registered expiration time. Must be before removal of the
2223 * object since it embeds ExpireMeta that is used by subexpires */
2224 if (kv->type == OBJ_HASH)
2225 hashExpireTime = estoreRemove(src->subexpires, slot, kv);
2226
2227 /* Move a side metadata before dbDelete() */
2228 KeyMetaSpec keymeta;
2229 keyMetaSpecInit(&keymeta);
2230 keyMetaOnMove(kv, c->argv[1], srcid, dbid, &keymeta);
2231
2232 incrRefCount(kv); /* ref counter = 1->2 */
2233 dbDelete(src,c->argv[1]); /* ref counter = 2->1 */
2234
2235 dbAddInternal(dst, c->argv[1], &kv, &dstBucket, &keymeta);
2236
2237 /* If object of type hash with expiration on fields. Taken care to add the
2238 * hash to subexpires of `dst` only after dbDelete(). */
2239 if (hashExpireTime != EB_EXPIRE_TIME_INVALID)
2240 estoreAdd(dst->subexpires, slot, kv, hashExpireTime);
2241
2242 keyModified(c,src,c->argv[1],NULL,1);
2243 keyModified(c,dst,c->argv[1],NULL,1); /* LRM already updated by dbAddInternal */
2244 notifyKeyspaceEvent(NOTIFY_GENERIC,
2245 "move_from",c->argv[1],src->id);
2246 notifyKeyspaceEvent(NOTIFY_GENERIC,
2247 "move_to",c->argv[1],dst->id);
2248
2249 server.dirty++;
2250 addReply(c,shared.cone);
2251}
2252
2253void copyCommand(client *c) {
2254 kvobj *o;
2255 redisDb *src, *dst;
2256 int srcid, dbid;
2257 int j, replace = 0, delete = 0;
2258
2259 /* Obtain source and target DB pointers
2260 * Default target DB is the same as the source DB
2261 * Parse the REPLACE option and targetDB option. */
2262 src = c->db;
2263 dst = c->db;
2264 srcid = c->db->id;
2265 dbid = c->db->id;
2266 for (j = 3; j < c->argc; j++) {
2267 int additional = c->argc - j - 1;
2268 if (!strcasecmp(c->argv[j]->ptr,"replace")) {
2269 replace = 1;
2270 } else if (!strcasecmp(c->argv[j]->ptr, "db") && additional >= 1) {
2271 if (getIntFromObjectOrReply(c, c->argv[j+1], &dbid, NULL) != C_OK)
2272 return;
2273
2274 if (selectDb(c, dbid) == C_ERR) {
2275 addReplyError(c,"DB index is out of range");
2276 return;
2277 }
2278 dst = c->db;
2279 selectDb(c,srcid); /* Back to the source DB */
2280 j++; /* Consume additional arg. */
2281 } else {
2282 addReplyErrorObject(c,shared.syntaxerr);
2283 return;
2284 }
2285 }
2286
2287 if ((server.cluster_enabled == 1) && (srcid != 0 || dbid != 0)) {
2288 addReplyError(c,"Copying to another database is not allowed in cluster mode");
2289 return;
2290 }
2291
2292 /* If the user select the same DB as
2293 * the source DB and using newkey as the same key
2294 * it is probably an error. */
2295 robj *key = c->argv[1];
2296 robj *newkey = c->argv[2];
2297 if (src == dst && (sdscmp(key->ptr, newkey->ptr) == 0)) {
2298 addReplyErrorObject(c,shared.sameobjecterr);
2299 return;
2300 }
2301
2302 if (srcid != 0 || dbid != 0) {
2303 server.stat_cluster_incompatible_ops++;
2304 }
2305
2306 /* Check if the element exists and get a reference */
2307 o = lookupKeyRead(c->db, key);
2308 if (!o) {
2309 addReply(c,shared.czero);
2310 return;
2311 }
2312
2313 /* Return zero if the key already exists in the target DB.
2314 * If REPLACE option is selected, delete newkey from targetDB. */
2315 kvobj *destval = lookupKeyWrite(dst,newkey);
2316 if (destval != NULL) {
2317 if (replace) {
2318 delete = 1;
2319 } else {
2320 addReply(c,shared.czero);
2321 return;
2322 }
2323 }
2324 int destoldtype = destval ? destval->type : -1;
2325 int destnewtype = o->type;
2326
2327 /* Duplicate object according to object's type. */
2328 robj *newobj;
2329 uint64_t minHashExpire = EB_EXPIRE_TIME_INVALID; /* HFE feature */
2330 switch(o->type) {
2331 case OBJ_STRING: newobj = dupStringObject(o); break;
2332 case OBJ_LIST: newobj = listTypeDup(o); break;
2333 case OBJ_SET: newobj = setTypeDup(o); break;
2334 case OBJ_ZSET: newobj = zsetDup(o); break;
2335 case OBJ_HASH: newobj = hashTypeDup(o, &minHashExpire); break;
2336 case OBJ_STREAM: newobj = streamDup(o); break;
2337 case OBJ_MODULE:
2338 newobj = moduleTypeDupOrReply(c, key, newkey, dst->id, o);
2339 if (!newobj) return;
2340 break;
2341 default:
2342 addReplyError(c, "unknown type object");
2343 return;
2344 }
2345
2346 if (delete) {
2347 dbDelete(dst,newkey);
2348 }
2349
2350 /* Prepare metadata for the new key */
2351 KeyMetaSpec keymeta;
2352 keyMetaSpecInit(&keymeta);
2353 if (o->metabits) keyMetaOnCopy(o, key, newkey, c->db->id, dst->id, &keymeta);
2354
2355 kvobj *kvCopy = dbAddInternal(dst, newkey, &newobj, NULL, &keymeta);
2356
2357 /* If minExpiredField was set, then the object is hash with expiration
2358 * on fields and need to register it in global HFE DS */
2359 if (minHashExpire != EB_EXPIRE_TIME_INVALID)
2360 estoreAdd(dst->subexpires, getKeySlot(newkey->ptr), kvCopy, minHashExpire);
2361
2362 /* OK! key copied. Signal modification (LRM already updated by dbAddInternal) */
2363 keyModified(c,dst,c->argv[2],NULL,1);
2364 notifyKeyspaceEvent(NOTIFY_GENERIC,"copy_to",c->argv[2],dst->id);
2365
2366 /* `delete` implies the destination key was overwritten */
2367 if (delete) {
2368 notifyKeyspaceEvent(NOTIFY_OVERWRITTEN, "overwritten", c->argv[2], dst->id);
2369 if (destoldtype != destnewtype)
2370 notifyKeyspaceEvent(NOTIFY_TYPE_CHANGED, "type_changed", c->argv[2], dst->id);
2371 }
2372
2373 server.dirty++;
2374 addReply(c,shared.cone);
2375}
2376
2377/* Helper function for dbSwapDatabases(): scans the list of keys that have
2378 * one or more blocked clients for B[LR]POP or other blocking commands
2379 * and signal the keys as ready if they are of the right type. See the comment
2380 * where the function is used for more info. */
2381void scanDatabaseForReadyKeys(redisDb *db) {
2382 dictEntry *de;
2383 dictIterator di;
2384 dictInitSafeIterator(&di, db->blocking_keys);
2385 while((de = dictNext(&di)) != NULL) {
2386 robj *key = dictGetKey(de);
2387 kvobj *kv = dbFind(db, key->ptr);
2388 if (kv)
2389 signalKeyAsReady(db, key, kv->type);
2390 }
2391 dictResetIterator(&di);
2392}
2393
2394/* Since we are unblocking XREADGROUP clients in the event the key was
2395 * deleted/overwritten we must do the same in case the database was
2396 * flushed/swapped. If 'slots' is not NULL, only keys in the specified slot
2397 * range are considered. */
2398void scanDatabaseForDeletedKeys(redisDb *emptied, redisDb *replaced_with, slotRangeArray *slots) {
2399 dictEntry *de;
2400 dictIterator di;
2401
2402 dictInitSafeIterator(&di, emptied->blocking_keys);
2403 while((de = dictNext(&di)) != NULL) {
2404 robj *key = dictGetKey(de);
2405 /* Check if key belongs to the slot range. */
2406 if (slots && !slotRangeArrayContains(slots, keyHashSlot(key->ptr, sdslen(key->ptr))))
2407 continue;
2408 int existed = 0, exists = 0;
2409 int original_type = -1, curr_type = -1;
2410
2411 kvobj *kv = dbFind(emptied, key->ptr);
2412 if (kv) {
2413 original_type = kv->type;
2414 existed = 1;
2415 }
2416
2417 if (replaced_with) {
2418 kv = dbFind(replaced_with, key->ptr);
2419 if (kv) {
2420 curr_type = kv->type;
2421 exists = 1;
2422 }
2423 }
2424 /* We want to try to unblock any client using a blocking XREADGROUP */
2425 if ((existed && !exists) || original_type != curr_type)
2426 signalDeletedKeyAsReady(emptied, key, original_type);
2427 }
2428 dictResetIterator(&di);
2429}
2430
2431/* Swap two databases at runtime so that all clients will magically see
2432 * the new database even if already connected. Note that the client
2433 * structure c->db points to a given DB, so we need to be smarter and
2434 * swap the underlying referenced structures, otherwise we would need
2435 * to fix all the references to the Redis DB structure.
2436 *
2437 * Returns C_ERR if at least one of the DB ids are out of range, otherwise
2438 * C_OK is returned. */
2439int dbSwapDatabases(int id1, int id2) {
2440 if (id1 < 0 || id1 >= server.dbnum ||
2441 id2 < 0 || id2 >= server.dbnum) return C_ERR;
2442 if (id1 == id2) return C_OK;
2443 redisDb aux = server.db[id1];
2444 redisDb *db1 = &server.db[id1], *db2 = &server.db[id2];
2445
2446 /* Swapdb should make transaction fail if there is any
2447 * client watching keys */
2448 touchAllWatchedKeysInDb(db1, db2, NULL);
2449 touchAllWatchedKeysInDb(db2, db1, NULL);
2450
2451 /* Try to unblock any XREADGROUP clients if the key no longer exists. */
2452 scanDatabaseForDeletedKeys(db1, db2, NULL);
2453 scanDatabaseForDeletedKeys(db2, db1, NULL);
2454
2455 /* Swap hash tables. Note that we don't swap blocking_keys,
2456 * ready_keys and watched_keys, since we want clients to
2457 * remain in the same DB they were. */
2458 db1->keys = db2->keys;
2459 db1->expires = db2->expires;
2460 db1->subexpires = db2->subexpires;
2461 db1->avg_ttl = db2->avg_ttl;
2462 db1->expires_cursor = db2->expires_cursor;
2463
2464 db2->keys = aux.keys;
2465 db2->expires = aux.expires;
2466 db2->subexpires = aux.subexpires;
2467 db2->avg_ttl = aux.avg_ttl;
2468 db2->expires_cursor = aux.expires_cursor;
2469
2470 /* Now we need to handle clients blocked on lists: as an effect
2471 * of swapping the two DBs, a client that was waiting for list
2472 * X in a given DB, may now actually be unblocked if X happens
2473 * to exist in the new version of the DB, after the swap.
2474 *
2475 * However normally we only do this check for efficiency reasons
2476 * in dbAdd() when a list is created. So here we need to rescan
2477 * the list of clients blocked on lists and signal lists as ready
2478 * if needed. */
2479 scanDatabaseForReadyKeys(db1);
2480 scanDatabaseForReadyKeys(db2);
2481 return C_OK;
2482}
2483
2484/* Logically, this discards (flushes) the old main database, and apply the newly loaded
2485 * database (temp) as the main (active) database, the actual freeing of old database
2486 * (which will now be placed in the temp one) is done later. */
2487void swapMainDbWithTempDb(redisDb *tempDb) {
2488 for (int i=0; i<server.dbnum; i++) {
2489 redisDb aux = server.db[i];
2490 redisDb *activedb = &server.db[i], *newdb = &tempDb[i];
2491
2492 /* Swapping databases should make transaction fail if there is any
2493 * client watching keys. */
2494 touchAllWatchedKeysInDb(activedb, newdb, NULL);
2495
2496 /* Try to unblock any XREADGROUP clients if the key no longer exists. */
2497 scanDatabaseForDeletedKeys(activedb, newdb, NULL);
2498
2499 /* Swap hash tables. Note that we don't swap blocking_keys,
2500 * ready_keys and watched_keys, since clients
2501 * remain in the same DB they were. */
2502 activedb->keys = newdb->keys;
2503 activedb->expires = newdb->expires;
2504 activedb->subexpires = newdb->subexpires;
2505 activedb->avg_ttl = newdb->avg_ttl;
2506 activedb->expires_cursor = newdb->expires_cursor;
2507
2508 newdb->keys = aux.keys;
2509 newdb->expires = aux.expires;
2510 newdb->subexpires = aux.subexpires;
2511 newdb->avg_ttl = aux.avg_ttl;
2512 newdb->expires_cursor = aux.expires_cursor;
2513
2514 /* Now we need to handle clients blocked on lists: as an effect
2515 * of swapping the two DBs, a client that was waiting for list
2516 * X in a given DB, may now actually be unblocked if X happens
2517 * to exist in the new version of the DB, after the swap.
2518 *
2519 * However normally we only do this check for efficiency reasons
2520 * in dbAdd() when a list is created. So here we need to rescan
2521 * the list of clients blocked on lists and signal lists as ready
2522 * if needed. */
2523 scanDatabaseForReadyKeys(activedb);
2524 }
2525
2526 trackingInvalidateKeysOnFlush(1);
2527 flushSlaveKeysWithExpireList();
2528}
2529
2530/* SWAPDB db1 db2 */
2531void swapdbCommand(client *c) {
2532 int id1, id2;
2533
2534 /* Not allowed in cluster mode: we have just DB 0 there. */
2535 if (server.cluster_enabled) {
2536 addReplyError(c,"SWAPDB is not allowed in cluster mode");
2537 return;
2538 }
2539
2540 /* Get the two DBs indexes. */
2541 if (getIntFromObjectOrReply(c, c->argv[1], &id1,
2542 "invalid first DB index") != C_OK)
2543 return;
2544
2545 if (getIntFromObjectOrReply(c, c->argv[2], &id2,
2546 "invalid second DB index") != C_OK)
2547 return;
2548
2549 /* Swap... */
2550 if (dbSwapDatabases(id1,id2) == C_ERR) {
2551 addReplyError(c,"DB index is out of range");
2552 return;
2553 } else {
2554 RedisModuleSwapDbInfo si = {REDISMODULE_SWAPDBINFO_VERSION,id1,id2};
2555 moduleFireServerEvent(REDISMODULE_EVENT_SWAPDB,0,&si);
2556 server.dirty++;
2557 server.stat_cluster_incompatible_ops++;
2558 addReply(c,shared.ok);
2559 }
2560}
2561
2562/*-----------------------------------------------------------------------------
2563 * Expires API
2564 *----------------------------------------------------------------------------*/
2565
2566/* Remove expiry from key
2567 *
2568 * Remove the object from db->expires and set to -1 attached TTL to KV
2569 */
2570int removeExpire(redisDb *db, robj *key) {
2571 int table;
2572 int slot = getKeySlot(key->ptr);
2573 dictEntryLink link = kvstoreDictTwoPhaseUnlinkFind(db->expires, slot, key->ptr, &table);
2574
2575 if (link == NULL) return 0;
2576 dictEntry *de = *link;
2577 kvobj *kv = dictGetKV(de);
2578 kvobj *newkv = kvobjSetExpire(kv, -1);
2579 serverAssert(newkv == kv);
2580 kvstoreDictTwoPhaseUnlinkFree(db->expires, slot, link, table);
2581 return 1;
2582}
2583
2584
2585/* Set an expire to the specified key. If the expire is set in the context
2586 * of an user calling a command 'c' is the client, otherwise 'c' is set
2587 * to NULL. The 'when' parameter is the absolute unix time in milliseconds
2588 * after which the key will no longer be considered valid.
2589 *
2590 * Note: It may reallocate kvobj. The returned ref may point to a new object. */
2591kvobj *setExpire(client *c, redisDb *db, robj *key, long long when) {
2592 return setExpireByLink(c,db,key->ptr,when,NULL);
2593}
2594
2595/* Like setExpire(), but accepts an optional `keyLink` to save lookup */
2596kvobj *setExpireByLink(client *c, redisDb *db, sds key, long long when, dictEntryLink keyLink) {
2597 /* Reuse the sds from the main dict in the expire dict */
2598 int slot = getKeySlot(key);
2599 size_t oldsize = 0;
2600 if (!keyLink) {
2601 keyLink = kvstoreDictFindLink(db->keys, slot, key, NULL);
2602 serverAssert(keyLink != NULL);
2603 }
2604 kvobj *kv = dictGetKV(*keyLink);
2605 long long old_when = kvobjGetExpire(kv);
2606
2607 if (old_when != -1) { /* old expire */
2608 kvobj *kvnew = kvobjSetExpire(kv, when); /* release kv if reallocated */
2609 /* Val already had an expire field, so it was not reallocated. */
2610 serverAssert(kv == kvnew);
2611 } else { /* No old expire */
2612 if (server.memory_tracking_per_slot)
2613 oldsize = kvobjAllocSize(kv);
2614 uint64_t subexpiry = EB_EXPIRE_TIME_INVALID;
2615 /* If hash with HFEs, take care to remove from global HFE DS before attempting
2616 * to manipulate and maybe free kv object */
2617 if (kv->type == OBJ_HASH)
2618 subexpiry = estoreRemove(db->subexpires, slot, kv);
2619
2620 kvobj *kvnew = kvobjSetExpire(kv, when); /* release kv if reallocated */
2621 /* if kvobj was reallocated, update dict */
2622 if (kv != kvnew) {
2623 kvstoreDictSetAtLink(db->keys, slot, kvnew, &keyLink, 0);
2624 if (server.memory_tracking_per_slot)
2625 updateSlotAllocSize(db, slot, oldsize, kvobjAllocSize(kvnew));
2626 kv = kvnew;
2627 }
2628 /* Now add to expires */
2629 dictEntry *de = kvstoreDictAddRaw(db->expires, slot, kv, NULL);
2630 serverAssert(de != NULL);
2631
2632 if (subexpiry != EB_EXPIRE_TIME_INVALID)
2633 estoreAdd(db->subexpires, slot, kv, subexpiry);
2634 }
2635
2636 int writable_slave = server.masterhost && server.repl_slave_ro == 0;
2637 if (c && writable_slave && !(c->flags & CLIENT_MASTER))
2638 rememberSlaveKeyWithExpire(db,key);
2639 return kv;
2640}
2641
2642/* Retrieve the expiration time for the specified key.
2643 * Returns -1 if the key has no expiration set or doesn't exists
2644 *
2645 * To avoid lookup, pass key-value object (`kv`) instead of `key`.
2646 */
2647long long getExpire(redisDb *db, sds key, kvobj *kv) {
2648 if (kv == NULL) kv = dbFindExpires(db, key);
2649 if (kv == NULL) return -1;
2650 return kvobjGetExpire(kv);
2651}
2652
2653/* Delete the specified expired or evicted key and propagate to replicas.
2654 * Currently notify_type can only be NOTIFY_EXPIRED or NOTIFY_EVICTED,
2655 * and it affects other aspects like the latency monitor event name and,
2656 * which config to look for lazy free, stats var to increment, and so on.
2657 *
2658 * key_mem_freed is an out parameter which contains the estimated
2659 * amount of memory freed due to the trimming (may be NULL) */
2660static void deleteKeyAndPropagate(redisDb *db, robj *keyobj, int notify_type, long long *key_mem_freed) {
2661 mstime_t latency;
2662 int del_flag = notify_type == NOTIFY_EXPIRED ? DB_FLAG_KEY_EXPIRED : DB_FLAG_KEY_EVICTED;
2663 int lazy_flag = notify_type == NOTIFY_EXPIRED ? server.lazyfree_lazy_expire : server.lazyfree_lazy_eviction;
2664 char *latency_name = notify_type == NOTIFY_EXPIRED ? "expire-del" : "evict-del";
2665 char *notify_name = notify_type == NOTIFY_EXPIRED ? "expired" : "evicted";
2666
2667 /* The key needs to be converted from static to heap before deleted */
2668 int static_key = keyobj->refcount == OBJ_STATIC_REFCOUNT;
2669 if (static_key) {
2670 keyobj = createStringObject(keyobj->ptr, sdslen(keyobj->ptr));
2671 }
2672
2673 serverLog(LL_DEBUG,"key %s %s: deleting it", (char*)keyobj->ptr, notify_type == NOTIFY_EXPIRED ? "expired" : "evicted");
2674
2675 /* We compute the amount of memory freed by db*Delete() alone.
2676 * It is possible that actually the memory needed to propagate
2677 * the DEL in AOF and replication link is greater than the one
2678 * we are freeing removing the key, but we can't account for
2679 * that otherwise we would never exit the loop.
2680 *
2681 * Same for CSC invalidation messages generated by keyModified.
2682 *
2683 * AOF and Output buffer memory will be freed eventually so
2684 * we only care about memory used by the key space.
2685 *
2686 * The code here used to first propagate and then record delta
2687 * using only zmalloc_used_memory but in CRDT we can't do that
2688 * so we use freeMemoryGetNotCountedMemory to avoid counting
2689 * AOF and slave buffers */
2690 if (key_mem_freed) *key_mem_freed = (long long) zmalloc_used_memory() - freeMemoryGetNotCountedMemory();
2691 latencyStartMonitor(latency);
2692 dbGenericDelete(db, keyobj, lazy_flag, del_flag);
2693 latencyEndMonitor(latency);
2694 latencyAddSampleIfNeeded(latency_name, latency);
2695 if (key_mem_freed) *key_mem_freed -= (long long) zmalloc_used_memory() - freeMemoryGetNotCountedMemory();
2696
2697 notifyKeyspaceEvent(notify_type, notify_name,keyobj, db->id);
2698 keyModified(NULL, db, keyobj, NULL, 1);
2699 propagateDeletion(db, keyobj, lazy_flag);
2700
2701 if (notify_type == NOTIFY_EXPIRED)
2702 server.stat_expiredkeys++;
2703 else
2704 server.stat_evictedkeys++;
2705
2706 if (static_key)
2707 decrRefCount(keyobj);
2708}
2709
2710/* Delete the specified expired key and propagate. */
2711void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) {
2712 deleteKeyAndPropagate(db, keyobj, NOTIFY_EXPIRED, NULL);
2713}
2714
2715/* Delete the specified evicted key and propagate. */
2716void deleteEvictedKeyAndPropagate(redisDb *db, robj *keyobj, long long *key_mem_freed) {
2717 deleteKeyAndPropagate(db, keyobj, NOTIFY_EVICTED, key_mem_freed);
2718}
2719
2720/* Propagate an implicit key deletion into replicas and the AOF file.
2721 * When a key was deleted in the master by eviction, expiration or a similar
2722 * mechanism a DEL/UNLINK operation for this key is sent
2723 * to all the replicas and the AOF file if enabled.
2724 *
2725 * This way the key deletion is centralized in one place, and since both
2726 * AOF and the replication link guarantee operation ordering, everything
2727 * will be consistent even if we allow write operations against deleted
2728 * keys.
2729 *
2730 * This function may be called from:
2731 * 1. Within call(): Example: Lazy-expire on key access.
2732 * In this case the caller doesn't have to do anything
2733 * because call() handles server.also_propagate(); or
2734 * 2. Outside of call(): Example: Active-expire, eviction, slot ownership changed.
2735 * In this the caller must remember to call
2736 * postExecutionUnitOperations, preferably just after a
2737 * single deletion batch, so that DEL/UNLINK will NOT be wrapped
2738 * in MULTI/EXEC */
2739void propagateDeletion(redisDb *db, robj *key, int lazy) {
2740 robj *argv[2];
2741
2742 argv[0] = lazy ? shared.unlink : shared.del;
2743 argv[1] = key;
2744 incrRefCount(argv[0]);
2745 incrRefCount(argv[1]);
2746
2747 /* If the master decided to delete a key we must propagate it to replicas no matter what.
2748 * Even if module executed a command without asking for propagation. */
2749 int prev_replication_allowed = server.replication_allowed;
2750 server.replication_allowed = 1;
2751 alsoPropagate(db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL);
2752 server.replication_allowed = prev_replication_allowed;
2753
2754 decrRefCount(argv[0]);
2755 decrRefCount(argv[1]);
2756}
2757
2758/* Check if the key is expired
2759 *
2760 * Provide either the key name for a lookup or KV object (to save lookup)
2761 */
2762int keyIsExpired(redisDb *db, sds key, kvobj *kv) {
2763 /* Don't expire anything while loading. It will be done later. */
2764 if (server.loading || server.allow_access_expired) return 0;
2765 mstime_t when = getExpire(db, key, kv);
2766 if (when < 0) return 0; /* No expire for this key */
2767 const mstime_t now = commandTimeSnapshot();
2768 /* The key expired if the current (virtual or real) time is greater
2769 * than the expire time of the key. */
2770 return now > when;
2771}
2772
2773/* Check if user configuration allows key to be deleted due to expiary */
2774int confAllowsExpireDel(void) {
2775 if (server.lazyexpire_nested_arbitrary_keys)
2776 return 1;
2777
2778 /* This configuration specifically targets nested commands, to align with RE's feature of replication between dbs.
2779 * transactions (from scripts or multi-exec) containing commands like SCAN and RANDOMKEY will execute locally, but their
2780 * lazy-expiration DELs may induce CROSS-SLOT on remote proxy in mode replica-of (RED-161574) */
2781 return !(server.execution_nesting > 1 && server.executing_client->cmd->flags & CMD_TOUCHES_ARBITRARY_KEYS);
2782}
2783
2784/* This function is called when we are going to perform some operation
2785 * in a given key, but such key may be already logically expired even if
2786 * it still exists in the database. The main way this function is called
2787 * is via lookupKey*() family of functions.
2788 *
2789 * The behavior of the function depends on the replication role of the
2790 * instance, because by default replicas do not delete expired keys. They
2791 * wait for DELs from the master for consistency matters. However even
2792 * replicas will try to have a coherent return value for the function,
2793 * so that read commands executed in the replica side will be able to
2794 * behave like if the key is expired even if still present (because the
2795 * master has yet to propagate the DEL).
2796 *
2797 * In masters as a side effect of finding a key which is expired, such
2798 * key will be evicted from the database. Also this may trigger the
2799 * propagation of a DEL/UNLINK command in AOF / replication stream.
2800 *
2801 * On replicas, this function does not delete expired keys by default, but
2802 * it still returns KEY_EXPIRED if the key is logically expired. To force deletion
2803 * of logically expired keys even on replicas, use the EXPIRE_FORCE_DELETE_EXPIRED
2804 * flag. Note though that if the current client is executing
2805 * replicated commands from the master, keys are never considered expired.
2806 *
2807 * On the other hand, if you just want expiration check, but need to avoid
2808 * the actual key deletion and propagation of the deletion, use the
2809 * EXPIRE_AVOID_DELETE_EXPIRED flag. If also needed to read expired key (that
2810 * hasn't being deleted yet) then use EXPIRE_ALLOW_ACCESS_EXPIRED.
2811 *
2812 * The return value of the function is KEY_VALID if the key is still valid.
2813 * The function returns KEY_EXPIRED if the key is expired BUT not deleted,
2814 * or returns KEY_DELETED if the key is expired and deleted. If the key is in a
2815 * trim job due to slot migration, the function returns KEY_TRIMMED, unless
2816 * EXPIRE_ALLOW_ACCESS_TRIMMED is set, in which case it returns KEY_VALID.
2817 *
2818 * You can optionally pass `kv` to save a lookup.
2819 */
2820keyStatus expireIfNeeded(redisDb *db, robj *key, kvobj *kv, int flags) {
2821 debugAssert(key != NULL || kv != NULL);
2822
2823 /* NOTE: Keys in slots scheduled for trimming can still exist for a while.
2824 * We don't delete it here, return KEY_VALID if allowing access to trimmed
2825 * keys, and return KEY_TRIMMED otherwise. */
2826 sds key_name = key ? key->ptr : kvobjGetKey(kv);
2827 if (asmIsKeyInTrimJob(key_name)) {
2828 if (server.allow_access_trimmed || (flags & EXPIRE_ALLOW_ACCESS_TRIMMED))
2829 return KEY_VALID;
2830
2831 return KEY_TRIMMED;
2832 }
2833
2834 if ((flags & EXPIRE_ALLOW_ACCESS_EXPIRED) ||
2835 (!keyIsExpired(db, key ? key->ptr : NULL, kv)))
2836 return KEY_VALID;
2837
2838 /* If we are running in the context of a replica, instead of
2839 * evicting the expired key from the database, we return ASAP:
2840 * the replica key expiration is controlled by the master that will
2841 * send us synthesized DEL operations for expired keys. The
2842 * exception is when write operations are performed on writable
2843 * replicas.
2844 *
2845 * In cluster mode, we also return ASAP if we are importing data
2846 * from the source, to avoid deleting keys that are still in use.
2847 * We create a fake master client for data import, which can be
2848 * identified using the CLIENT_MASTER flag.
2849 *
2850 * Still we try to return the right information to the caller,
2851 * that is, KEY_VALID if we think the key should still be valid,
2852 * KEY_EXPIRED if we think the key is expired but don't want to delete it at this time.
2853 *
2854 * When replicating commands from the master, keys are never considered
2855 * expired. */
2856 if (server.masterhost != NULL || server.cluster_enabled) {
2857 if (server.current_client && (server.current_client->flags & CLIENT_MASTER)) return KEY_VALID;
2858 if (server.masterhost != NULL && !(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return KEY_EXPIRED;
2859 }
2860
2861 /* Check if user configuration disables lazy-expire deletions in current state.
2862 * This will only apply if the server doesn't mandate key deletion to operate correctly (write commands). */
2863 if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED) && !confAllowsExpireDel())
2864 return KEY_EXPIRED;
2865
2866 /* In some cases we're explicitly instructed to return an indication of a
2867 * missing key without actually deleting it, even on masters. */
2868 if (flags & EXPIRE_AVOID_DELETE_EXPIRED)
2869 return KEY_EXPIRED;
2870
2871 /* If 'expire' action is paused, for whatever reason, then don't expire any key.
2872 * Typically, at the end of the pause we will properly expire the key OR we
2873 * will have failed over and the new primary will send us the expire. */
2874 if (isPausedActionsWithUpdate(PAUSE_ACTION_EXPIRE)) return KEY_EXPIRED;
2875
2876 /* Perform deletion */
2877 if (key) {
2878 deleteExpiredKeyAndPropagate(db, key);
2879 } else {
2880 sds keyname = kvobjGetKey(kv);
2881 robj *tmpkey = createStringObject(keyname, sdslen(keyname));
2882 deleteExpiredKeyAndPropagate(db, tmpkey);
2883 decrRefCount(tmpkey);
2884 }
2885 return KEY_DELETED;
2886}
2887
2888/* CB passed to kvstoreExpand.
2889 * The purpose is to skip expansion of unused dicts in cluster mode (all
2890 * dicts not mapped to *my* slots) */
2891static int dbExpandSkipSlot(int slot) {
2892 return !clusterNodeCoversSlot(getMyClusterNode(), slot);
2893}
2894
2895/*
2896 * This functions increases size of the main/expires db to match desired number.
2897 * In cluster mode resizes all individual dictionaries for slots that this node owns.
2898 *
2899 * Based on the parameter `try_expand`, appropriate dict expand API is invoked.
2900 * if try_expand is set to 1, `dictTryExpand` is used else `dictExpand`.
2901 * The return code is either `DICT_OK`/`DICT_ERR` for both the API(s).
2902 * `DICT_OK` response is for successful expansion. However ,`DICT_ERR` response signifies failure in allocation in
2903 * `dictTryExpand` call and in case of `dictExpand` call it signifies no expansion was performed.
2904 */
2905static int dbExpandGeneric(kvstore *kvs, uint64_t db_size, int try_expand) {
2906 int ret;
2907 if (server.cluster_enabled) {
2908 /* We don't know exact number of keys that would fall into each slot, but we can
2909 * approximate it, assuming even distribution, divide it by the number of slots. */
2910 int slots = getMyShardSlotCount();
2911 if (slots == 0) return C_OK;
2912 db_size = db_size / slots;
2913 ret = kvstoreExpand(kvs, db_size, try_expand, dbExpandSkipSlot);
2914 } else {
2915 ret = kvstoreExpand(kvs, db_size, try_expand, NULL);
2916 }
2917
2918 return ret? C_OK : C_ERR;
2919}
2920
2921int dbExpand(redisDb *db, uint64_t db_size, int try_expand) {
2922 return dbExpandGeneric(db->keys, db_size, try_expand);
2923}
2924
2925int dbExpandExpires(redisDb *db, uint64_t db_size, int try_expand) {
2926 return dbExpandGeneric(db->expires, db_size, try_expand);
2927}
2928
2929static kvobj *dbFindGeneric(kvstore *kvs, sds key) {
2930 dictEntry *res = kvstoreDictFind(kvs, getKeySlot(key), key);
2931 return (res) ? dictGetKey(res) : NULL;
2932}
2933
2934kvobj *dbFind(redisDb *db, sds key) {
2935 return dbFindGeneric(db->keys, key);
2936}
2937
2938/* Find a KV in the main db. Return also link to it.
2939 *
2940 * plink - If found, set to the link of the key in the dict.
2941 * If not found, set to the bucket where the key should be added.
2942 * If set to NULL, then HT of dict not allocated yet.
2943 */
2944kvobj *dbFindByLink(redisDb *db, sds key, dictEntryLink *plink) {
2945 int slot = getKeySlot(key);
2946 dictEntryLink link, bucket;
2947
2948 link = kvstoreDictFindLink(db->keys, slot, key, &bucket);
2949 if (link == NULL) {
2950 if (plink) *plink = bucket;
2951 return NULL;
2952 } else {
2953 if (plink) *plink = link;
2954 return dictGetKV(*link);
2955 }
2956}
2957
2958kvobj *dbFindExpires(redisDb *db, sds key) {
2959 return dbFindGeneric(db->expires, key);
2960}
2961
2962unsigned long long dbSize(redisDb *db) {
2963 unsigned long long total = kvstoreSize(db->keys);
2964
2965 if (server.cluster_enabled) {
2966 /* If we are the master and there is no import or trim in progress,
2967 * then we can return the total count. If not, we need to subtract
2968 * the number of keys in slots that are not accessible, as below. */
2969 if (clusterNodeIsMaster(getMyClusterNode()) &&
2970 !asmImportInProgress() &&
2971 !asmIsTrimInProgress())
2972 {
2973 return total;
2974 }
2975
2976 /* Besides, we don't know the slot migration states on replicas, so we
2977 * need to check each slot to see if it's accessible. */
2978 for (int i = 0; i < CLUSTER_SLOTS; i++) {
2979 dict *d = kvstoreGetDict(db->keys, i);
2980 if (d && !clusterCanAccessKeysInSlot(i)) {
2981 total -= kvstoreDictSize(db->keys, i);
2982 }
2983 }
2984 }
2985
2986 return total;
2987}
2988
2989unsigned long long dbScan(redisDb *db, unsigned long long cursor, dictScanFunction *scan_cb, void *privdata) {
2990 return kvstoreScan(db->keys, cursor, -1, scan_cb, scanShouldSkipDict, privdata);
2991}
2992
2993/* -----------------------------------------------------------------------------
2994 * API to get key arguments from commands
2995 * ---------------------------------------------------------------------------*/
2996
2997/* Prepare the getKeysResult struct to hold numkeys, either by using the
2998 * pre-allocated keysbuf or by allocating a new array on the heap.
2999 *
3000 * This function must be called at least once before starting to populate
3001 * the result, and can be called repeatedly to enlarge the result array.
3002 */
3003keyReference *getKeysPrepareResult(getKeysResult *result, int numkeys) {
3004 /* GETKEYS_RESULT_INIT initializes keys to NULL, point it to the pre-allocated stack
3005 * buffer here. */
3006 if (!result->keys) {
3007 serverAssert(!result->numkeys);
3008 result->keys = result->keysbuf;
3009 }
3010
3011 /* Resize if necessary */
3012 if (numkeys > result->size) {
3013 if (result->keys != result->keysbuf) {
3014 /* We're not using a static buffer, just (re)alloc */
3015 result->keys = zrealloc(result->keys, numkeys * sizeof(keyReference));
3016 } else {
3017 /* We are using a static buffer, copy its contents */
3018 result->keys = zmalloc(numkeys * sizeof(keyReference));
3019 if (result->numkeys)
3020 memcpy(result->keys, result->keysbuf, result->numkeys * sizeof(keyReference));
3021 }
3022 result->size = numkeys;
3023 }
3024
3025 return result->keys;
3026}
3027
3028/* Returns a bitmask with all the flags found in any of the key specs of the command.
3029 * The 'inv' argument means we'll return a mask with all flags that are missing in at least one spec. */
3030int64_t getAllKeySpecsFlags(struct redisCommand *cmd, int inv) {
3031 int64_t flags = 0;
3032 for (int j = 0; j < cmd->key_specs_num; j++) {
3033 keySpec *spec = cmd->key_specs + j;
3034 flags |= inv? ~spec->flags : spec->flags;
3035 }
3036 return flags;
3037}
3038
3039/* Fetch the keys based of the provided key specs. Returns the number of keys found, or -1 on error.
3040 * There are several flags that can be used to modify how this function finds keys in a command.
3041 *
3042 * GET_KEYSPEC_INCLUDE_NOT_KEYS: Return 'fake' keys as if they were keys.
3043 * GET_KEYSPEC_RETURN_PARTIAL: Skips invalid and incomplete keyspecs but returns the keys
3044 * found in other valid keyspecs.
3045 */
3046int getKeysUsingKeySpecs(struct redisCommand *cmd, robj **argv, int argc, int search_flags, getKeysResult *result) {
3047 long j, i, last, first, step;
3048 keyReference *keys;
3049 serverAssert(result->numkeys == 0); /* caller should initialize or reset it */
3050
3051 for (j = 0; j < cmd->key_specs_num; j++) {
3052 keySpec *spec = cmd->key_specs + j;
3053 serverAssert(spec->begin_search_type != KSPEC_BS_INVALID);
3054 /* Skip specs that represent 'fake' keys */
3055 if ((spec->flags & CMD_KEY_NOT_KEY) && !(search_flags & GET_KEYSPEC_INCLUDE_NOT_KEYS)) {
3056 continue;
3057 }
3058
3059 first = 0;
3060 if (spec->begin_search_type == KSPEC_BS_INDEX) {
3061 first = spec->bs.index.pos;
3062 } else if (spec->begin_search_type == KSPEC_BS_KEYWORD) {
3063 int start_index = spec->bs.keyword.startfrom > 0 ? spec->bs.keyword.startfrom : argc+spec->bs.keyword.startfrom;
3064 int end_index = spec->bs.keyword.startfrom > 0 ? argc-1: 1;
3065 for (i = start_index; i != end_index; i = start_index <= end_index ? i + 1 : i - 1) {
3066 if (i >= argc || i < 1)
3067 break;
3068 if (!strcasecmp((char*)argv[i]->ptr,spec->bs.keyword.keyword)) {
3069 first = i+1;
3070 break;
3071 }
3072 }
3073 /* keyword not found */
3074 if (!first) {
3075 continue;
3076 }
3077 } else {
3078 /* unknown spec */
3079 goto invalid_spec;
3080 }
3081
3082 if (spec->find_keys_type == KSPEC_FK_RANGE) {
3083 step = spec->fk.range.keystep;
3084 if (spec->fk.range.lastkey >= 0) {
3085 last = first + spec->fk.range.lastkey;
3086 } else {
3087 if (!spec->fk.range.limit) {
3088 last = argc + spec->fk.range.lastkey;
3089 } else {
3090 serverAssert(spec->fk.range.lastkey == -1);
3091 last = first + ((argc-first)/spec->fk.range.limit + spec->fk.range.lastkey);
3092 }
3093 }
3094 } else if (spec->find_keys_type == KSPEC_FK_KEYNUM) {
3095 step = spec->fk.keynum.keystep;
3096 long long numkeys;
3097 if (spec->fk.keynum.keynumidx >= argc)
3098 goto invalid_spec;
3099
3100 sds keynum_str = argv[first + spec->fk.keynum.keynumidx]->ptr;
3101 if (!string2ll(keynum_str,sdslen(keynum_str),&numkeys) || numkeys < 0) {
3102 /* Unable to parse the numkeys argument or it was invalid */
3103 goto invalid_spec;
3104 }
3105
3106 first += spec->fk.keynum.firstkey;
3107 last = first + ((long)numkeys - 1) * step;
3108 } else {
3109 /* unknown spec */
3110 goto invalid_spec;
3111 }
3112
3113 /* First or last is out of bounds, which indicates a syntax error */
3114 if (last >= argc || last < first || first >= argc) {
3115 goto invalid_spec;
3116 }
3117
3118 int count = ((last - first)+1);
3119 keys = getKeysPrepareResult(result, result->numkeys + count);
3120
3121 for (i = first; i <= last; i += step) {
3122 if (i >= argc || i < first) {
3123 /* Modules commands, and standard commands with a not fixed number
3124 * of arguments (negative arity parameter) do not have dispatch
3125 * time arity checks, so we need to handle the case where the user
3126 * passed an invalid number of arguments here. In this case we
3127 * return no keys and expect the command implementation to report
3128 * an arity or syntax error. */
3129 if (cmd->flags & CMD_MODULE || cmd->arity < 0) {
3130 continue;
3131 } else {
3132 serverPanic("Redis built-in command declared keys positions not matching the arity requirements.");
3133 }
3134 }
3135 keys[result->numkeys].pos = i;
3136 keys[result->numkeys].flags = spec->flags;
3137 result->numkeys++;
3138 }
3139
3140 /* Handle incomplete specs (only after we added the current spec
3141 * to `keys`, just in case GET_KEYSPEC_RETURN_PARTIAL was given) */
3142 if (spec->flags & CMD_KEY_INCOMPLETE) {
3143 goto invalid_spec;
3144 }
3145
3146 /* Done with this spec */
3147 continue;
3148
3149invalid_spec:
3150 if (search_flags & GET_KEYSPEC_RETURN_PARTIAL) {
3151 continue;
3152 } else {
3153 result->numkeys = 0;
3154 return -1;
3155 }
3156 }
3157
3158 return result->numkeys;
3159}
3160
3161/* Return all the arguments that are keys in the command passed via argc / argv.
3162 * This function will eventually replace getKeysFromCommand.
3163 *
3164 * The command returns the positions of all the key arguments inside the array,
3165 * so the actual return value is a heap allocated array of integers. The
3166 * length of the array is returned by reference into *numkeys.
3167 *
3168 * Along with the position, this command also returns the flags that are
3169 * associated with how Redis will access the key.
3170 *
3171 * 'cmd' must be point to the corresponding entry into the redisCommand
3172 * table, according to the command name in argv[0]. */
3173int getKeysFromCommandWithSpecs(struct redisCommand *cmd, robj **argv, int argc, int search_flags, getKeysResult *result) {
3174 /* The command has at least one key-spec not marked as NOT_KEY */
3175 int has_keyspec = (getAllKeySpecsFlags(cmd, 1) & CMD_KEY_NOT_KEY);
3176 /* The command has at least one key-spec marked as VARIABLE_FLAGS */
3177 int has_varflags = (getAllKeySpecsFlags(cmd, 0) & CMD_KEY_VARIABLE_FLAGS);
3178
3179 /* We prefer key-specs if there are any, and their flags are reliable. */
3180 if (has_keyspec && !has_varflags) {
3181 int ret = getKeysUsingKeySpecs(cmd,argv,argc,search_flags,result);
3182 if (ret >= 0)
3183 return ret;
3184 /* If the specs returned with an error (probably an INVALID or INCOMPLETE spec),
3185 * fallback to the callback method. */
3186 }
3187
3188 /* Resort to getkeys callback methods. */
3189 if (cmd->flags & CMD_MODULE_GETKEYS)
3190 return moduleGetCommandKeysViaAPI(cmd,argv,argc,result);
3191
3192 /* We use native getkeys as a last resort, since not all these native getkeys provide
3193 * flags properly (only the ones that correspond to INVALID, INCOMPLETE or VARIABLE_FLAGS do.*/
3194 if (cmd->getkeys_proc)
3195 return cmd->getkeys_proc(cmd,argv,argc,result);
3196 return 0;
3197}
3198
3199/* This function returns a sanity check if the command may have keys. */
3200int doesCommandHaveKeys(struct redisCommand *cmd) {
3201 return cmd->getkeys_proc || /* has getkeys_proc (non modules) */
3202 (cmd->flags & CMD_MODULE_GETKEYS) || /* module with GETKEYS */
3203 (getAllKeySpecsFlags(cmd, 1) & CMD_KEY_NOT_KEY); /* has at least one key-spec not marked as NOT_KEY */
3204}
3205
3206/* A simplified channel spec table that contains all of the redis commands
3207 * and which channels they have and how they are accessed. */
3208typedef struct ChannelSpecs {
3209 redisCommandProc *proc; /* Command procedure to match against */
3210 uint64_t flags; /* CMD_CHANNEL_* flags for this command */
3211 int start; /* The initial position of the first channel */
3212 int count; /* The number of channels, or -1 if all remaining
3213 * arguments are channels. */
3214} ChannelSpecs;
3215
3216ChannelSpecs commands_with_channels[] = {
3217 {subscribeCommand, CMD_CHANNEL_SUBSCRIBE, 1, -1},
3218 {ssubscribeCommand, CMD_CHANNEL_SUBSCRIBE, 1, -1},
3219 {unsubscribeCommand, CMD_CHANNEL_UNSUBSCRIBE, 1, -1},
3220 {sunsubscribeCommand, CMD_CHANNEL_UNSUBSCRIBE, 1, -1},
3221 {psubscribeCommand, CMD_CHANNEL_PATTERN | CMD_CHANNEL_SUBSCRIBE, 1, -1},
3222 {punsubscribeCommand, CMD_CHANNEL_PATTERN | CMD_CHANNEL_UNSUBSCRIBE, 1, -1},
3223 {publishCommand, CMD_CHANNEL_PUBLISH, 1, 1},
3224 {spublishCommand, CMD_CHANNEL_PUBLISH, 1, 1},
3225 {NULL,0} /* Terminator. */
3226};
3227
3228/* Returns 1 if the command may access any channels matched by the flags
3229 * argument. */
3230int doesCommandHaveChannelsWithFlags(struct redisCommand *cmd, int flags) {
3231 /* If a module declares get channels, we are just going to assume
3232 * has channels. This API is allowed to return false positives. */
3233 if (cmd->flags & CMD_MODULE_GETCHANNELS) {
3234 return 1;
3235 }
3236 for (ChannelSpecs *spec = commands_with_channels; spec->proc != NULL; spec += 1) {
3237 if (cmd->proc == spec->proc) {
3238 return !!(spec->flags & flags);
3239 }
3240 }
3241 return 0;
3242}
3243
3244/* Return all the arguments that are channels in the command passed via argc / argv.
3245 * This function behaves similar to getKeysFromCommandWithSpecs, but with channels
3246 * instead of keys.
3247 *
3248 * The command returns the positions of all the channel arguments inside the array,
3249 * so the actual return value is a heap allocated array of integers. The
3250 * length of the array is returned by reference into *numkeys.
3251 *
3252 * Along with the position, this command also returns the flags that are
3253 * associated with how Redis will access the channel.
3254 *
3255 * 'cmd' must be point to the corresponding entry into the redisCommand
3256 * table, according to the command name in argv[0]. */
3257int getChannelsFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
3258 keyReference *keys;
3259 /* If a module declares get channels, use that. */
3260 if (cmd->flags & CMD_MODULE_GETCHANNELS) {
3261 return moduleGetCommandChannelsViaAPI(cmd, argv, argc, result);
3262 }
3263 /* Otherwise check the channel spec table */
3264 for (ChannelSpecs *spec = commands_with_channels; spec != NULL; spec += 1) {
3265 if (cmd->proc == spec->proc) {
3266 int start = spec->start;
3267 int stop = (spec->count == -1) ? argc : start + spec->count;
3268 if (stop > argc) stop = argc;
3269 int count = 0;
3270 keys = getKeysPrepareResult(result, stop - start);
3271 for (int i = start; i < stop; i++ ) {
3272 keys[count].pos = i;
3273 keys[count++].flags = spec->flags;
3274 }
3275 result->numkeys = count;
3276 return count;
3277 }
3278 }
3279 return 0;
3280}
3281
3282/* Extract keys/channels from a command and calculate the cluster slot.
3283 * Returns the number of keys/channels extracted.
3284 * The slot number is returned by reference into *slot.
3285 * If is_incomplete is not NULL, it will be set for key extraction.
3286 *
3287 * This function handles both regular commands (keys) and sharded pubsub
3288 * commands (channels), but excludes regular pubsub commands which don't
3289 * have slots.
3290 */
3291int extractKeysAndSlot(struct redisCommand *cmd, robj **argv, int argc,
3292 getKeysResult *result, int *slot) {
3293 int num_keys = -1;
3294
3295 if (!doesCommandHaveChannelsWithFlags(cmd, CMD_CHANNEL_PUBLISH | CMD_CHANNEL_SUBSCRIBE)) {
3296 num_keys = getKeysFromCommandWithSpecs(cmd, argv, argc, GET_KEYSPEC_DEFAULT, result);
3297 } else {
3298 /* Only extract channels for commands that have key_specs (sharded pubsub).
3299 * Regular pubsub commands (PUBLISH, SUBSCRIBE) don't have slots. */
3300 if (cmd->key_specs_num > 0) {
3301 num_keys = getChannelsFromCommand(cmd, argv, argc, result);
3302 } else {
3303 num_keys = 0;
3304 }
3305 }
3306
3307 *slot = extractSlotFromKeysResult(argv, result);
3308 return num_keys;
3309}
3310
3311/* The base case is to use the keys position as given in the command table
3312 * (firstkey, lastkey, step).
3313 * This function works only on command with the legacy_range_key_spec,
3314 * all other commands should be handled by getkeys_proc.
3315 *
3316 * If the commands keyspec is incomplete, no keys will be returned, and the provided
3317 * keys function should be called instead.
3318 *
3319 * NOTE: This function does not guarantee populating the flags for
3320 * the keys, in order to get flags you should use getKeysUsingKeySpecs. */
3321int getKeysUsingLegacyRangeSpec(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
3322 int j, i = 0, last, first, step;
3323 keyReference *keys;
3324 UNUSED(argv);
3325
3326 if (cmd->legacy_range_key_spec.begin_search_type == KSPEC_BS_INVALID) {
3327 result->numkeys = 0;
3328 return 0;
3329 }
3330
3331 first = cmd->legacy_range_key_spec.bs.index.pos;
3332 last = cmd->legacy_range_key_spec.fk.range.lastkey;
3333 if (last >= 0)
3334 last += first;
3335 step = cmd->legacy_range_key_spec.fk.range.keystep;
3336
3337 if (last < 0) last = argc+last;
3338
3339 int count = ((last - first)+1);
3340 keys = getKeysPrepareResult(result, count);
3341
3342 for (j = first; j <= last; j += step) {
3343 if (j >= argc || j < first) {
3344 /* Modules commands, and standard commands with a not fixed number
3345 * of arguments (negative arity parameter) do not have dispatch
3346 * time arity checks, so we need to handle the case where the user
3347 * passed an invalid number of arguments here. In this case we
3348 * return no keys and expect the command implementation to report
3349 * an arity or syntax error. */
3350 if (cmd->flags & CMD_MODULE || cmd->arity < 0) {
3351 result->numkeys = 0;
3352 return 0;
3353 } else {
3354 serverPanic("Redis built-in command declared keys positions not matching the arity requirements.");
3355 }
3356 }
3357 keys[i].pos = j;
3358 /* Flags are omitted from legacy key specs */
3359 keys[i++].flags = 0;
3360 }
3361 result->numkeys = i;
3362 return i;
3363}
3364
3365/* Return all the arguments that are keys in the command passed via argc / argv.
3366 *
3367 * The command returns the positions of all the key arguments inside the array,
3368 * so the actual return value is a heap allocated array of integers. The
3369 * length of the array is returned by reference into *numkeys.
3370 *
3371 * 'cmd' must be point to the corresponding entry into the redisCommand
3372 * table, according to the command name in argv[0].
3373 *
3374 * This function uses the command table if a command-specific helper function
3375 * is not required, otherwise it calls the command-specific function. */
3376int getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
3377 if (cmd->flags & CMD_MODULE_GETKEYS) {
3378 return moduleGetCommandKeysViaAPI(cmd,argv,argc,result);
3379 } else if (cmd->getkeys_proc) {
3380 return cmd->getkeys_proc(cmd,argv,argc,result);
3381 } else {
3382 return getKeysUsingLegacyRangeSpec(cmd,argv,argc,result);
3383 }
3384}
3385
3386/* Free the result of getKeysFromCommand. */
3387void getKeysFreeResult(getKeysResult *result) {
3388 if (result && result->keys != result->keysbuf)
3389 zfree(result->keys);
3390}
3391
3392/* Helper function to extract keys from following commands:
3393 * COMMAND [destkey] <num-keys> <key> [...] <key> [...] ... <options>
3394 *
3395 * eg:
3396 * ZUNION <num-keys> <key> <key> ... <key> <options>
3397 * ZUNIONSTORE <destkey> <num-keys> <key> <key> ... <key> <options>
3398 *
3399 * 'storeKeyOfs': destkey index, 0 means destkey not exists.
3400 * 'keyCountOfs': num-keys index.
3401 * 'firstKeyOfs': firstkey index.
3402 * 'keyStep': the interval of each key, usually this value is 1.
3403 *
3404 * The commands using this function have a fully defined keyspec, so returning flags isn't needed. */
3405int genericGetKeys(int storeKeyOfs, int keyCountOfs, int firstKeyOfs, int keyStep,
3406 robj **argv, int argc, getKeysResult *result) {
3407 int i, num;
3408 keyReference *keys;
3409
3410 num = atoi(argv[keyCountOfs]->ptr);
3411 /* Sanity check. Don't return any key if the command is going to
3412 * reply with syntax error. (no input keys). */
3413 if (num < 1 || num > (argc - firstKeyOfs)/keyStep) {
3414 result->numkeys = 0;
3415 return 0;
3416 }
3417
3418 int numkeys = storeKeyOfs ? num + 1 : num;
3419 keys = getKeysPrepareResult(result, numkeys);
3420 result->numkeys = numkeys;
3421
3422 /* Add all key positions for argv[firstKeyOfs...n] to keys[] */
3423 for (i = 0; i < num; i++) {
3424 keys[i].pos = firstKeyOfs+(i*keyStep);
3425 keys[i].flags = 0;
3426 }
3427
3428 if (storeKeyOfs) {
3429 keys[num].pos = storeKeyOfs;
3430 keys[num].flags = 0;
3431 }
3432 return result->numkeys;
3433}
3434
3435int sintercardGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
3436 UNUSED(cmd);
3437 return genericGetKeys(0, 1, 2, 1, argv, argc, result);
3438}
3439
3440int zunionInterDiffStoreGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
3441 UNUSED(cmd);
3442 return genericGetKeys(1, 2, 3, 1, argv, argc, result);
3443}
3444
3445int zunionInterDiffGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
3446 UNUSED(cmd);
3447 return genericGetKeys(0, 1, 2, 1, argv, argc, result);
3448}
3449
3450int evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
3451 UNUSED(cmd);
3452 return genericGetKeys(0, 2, 3, 1, argv, argc, result);
3453}
3454
3455int functionGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
3456 UNUSED(cmd);
3457 return genericGetKeys(0, 2, 3, 1, argv, argc, result);
3458}
3459
3460int lmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
3461 UNUSED(cmd);
3462 return genericGetKeys(0, 1, 2, 1, argv, argc, result);
3463}
3464
3465int blmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
3466 UNUSED(cmd);
3467 return genericGetKeys(0, 2, 3, 1, argv, argc, result);
3468}
3469
3470int zmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
3471 UNUSED(cmd);
3472 return genericGetKeys(0, 1, 2, 1, argv, argc, result);
3473}
3474
3475int bzmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
3476 UNUSED(cmd);
3477 return genericGetKeys(0, 2, 3, 1, argv, argc, result);
3478}
3479
3480/* Helper function to extract keys from the SORT RO command.
3481 *
3482 * SORT <sort-key>
3483 *
3484 * The second argument of SORT is always a key, however an arbitrary number of
3485 * keys may be accessed while doing the sort (the BY and GET args), so the
3486 * key-spec declares incomplete keys which is why we have to provide a concrete
3487 * implementation to fetch the keys.
3488 *
3489 * This command declares incomplete keys, so the flags are correctly set for this function */
3490int sortROGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
3491 keyReference *keys;
3492 UNUSED(cmd);
3493 UNUSED(argv);
3494 UNUSED(argc);
3495
3496 keys = getKeysPrepareResult(result, 1);
3497 keys[0].pos = 1; /* <sort-key> is always present. */
3498 keys[0].flags = CMD_KEY_RO | CMD_KEY_ACCESS;
3499 result->numkeys = 1;
3500 return result->numkeys;
3501}
3502
3503/* Helper function to extract keys from the SORT command.
3504 *
3505 * SORT <sort-key> ... STORE <store-key> ...
3506 *
3507 * The first argument of SORT is always a key, however a list of options
3508 * follow in SQL-alike style. Here we parse just the minimum in order to
3509 * correctly identify keys in the "STORE" option.
3510 *
3511 * This command declares incomplete keys, so the flags are correctly set for this function */
3512int sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
3513 int i, j, num, found_store = 0;
3514 keyReference *keys;
3515 UNUSED(cmd);
3516
3517 num = 0;
3518 keys = getKeysPrepareResult(result, 2); /* Alloc 2 places for the worst case. */
3519 keys[num].pos = 1; /* <sort-key> is always present. */
3520 keys[num++].flags = CMD_KEY_RO | CMD_KEY_ACCESS;
3521
3522 /* Search for STORE option. By default we consider options to don't
3523 * have arguments, so if we find an unknown option name we scan the
3524 * next. However there are options with 1 or 2 arguments, so we
3525 * provide a list here in order to skip the right number of args. */
3526 struct {
3527 char *name;
3528 int skip;
3529 } skiplist[] = {
3530 {"limit", 2},
3531 {"get", 1},
3532 {"by", 1},
3533 {NULL, 0} /* End of elements. */
3534 };
3535
3536 for (i = 2; i < argc; i++) {
3537 for (j = 0; skiplist[j].name != NULL; j++) {
3538 if (!strcasecmp(argv[i]->ptr,skiplist[j].name)) {
3539 i += skiplist[j].skip;
3540 break;
3541 } else if (!strcasecmp(argv[i]->ptr,"store") && i+1 < argc) {
3542 /* Note: we don't increment "num" here and continue the loop
3543 * to be sure to process the *last* "STORE" option if multiple
3544 * ones are provided. This is same behavior as SORT. */
3545 found_store = 1;
3546 keys[num].pos = i+1; /* <store-key> */
3547 keys[num].flags = CMD_KEY_OW | CMD_KEY_UPDATE;
3548 break;
3549 }
3550 }
3551 }
3552 result->numkeys = num + found_store;
3553 return result->numkeys;
3554}
3555
3556/* This command declares incomplete keys, so the flags are correctly set for this function */
3557int migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
3558 int i, j, num, first;
3559 keyReference *keys;
3560 UNUSED(cmd);
3561
3562 /* Assume the obvious form. */
3563 first = 3;
3564 num = 1;
3565
3566 /* But check for the extended one with the KEYS option. */
3567 struct {
3568 char* name;
3569 int skip;
3570 } skip_keywords[] = {
3571 {"copy", 0},
3572 {"replace", 0},
3573 {"auth", 1},
3574 {"auth2", 2},
3575 {NULL, 0}
3576 };
3577 if (argc > 6) {
3578 for (i = 6; i < argc; i++) {
3579 if (!strcasecmp(argv[i]->ptr, "keys")) {
3580 if (sdslen(argv[3]->ptr) > 0) {
3581 /* This is a syntax error. So ignore the keys and leave
3582 * the syntax error to be handled by migrateCommand. */
3583 num = 0;
3584 } else {
3585 first = i + 1;
3586 num = argc - first;
3587 }
3588 break;
3589 }
3590 for (j = 0; skip_keywords[j].name != NULL; j++) {
3591 if (!strcasecmp(argv[i]->ptr, skip_keywords[j].name)) {
3592 i += skip_keywords[j].skip;
3593 break;
3594 }
3595 }
3596 }
3597 }
3598
3599 keys = getKeysPrepareResult(result, num);
3600 for (i = 0; i < num; i++) {
3601 keys[i].pos = first+i;
3602 keys[i].flags = CMD_KEY_RW | CMD_KEY_ACCESS | CMD_KEY_DELETE;
3603 }
3604 result->numkeys = num;
3605 return num;
3606}
3607
3608/* Helper function to extract keys from following commands:
3609 * GEORADIUS key x y radius unit [WITHDIST] [WITHHASH] [WITHCOORD] [ASC|DESC]
3610 * [COUNT count] [STORE key|STOREDIST key]
3611 * GEORADIUSBYMEMBER key member radius unit ... options ...
3612 *
3613 * This command has a fully defined keyspec, so returning flags isn't needed. */
3614int georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
3615 int i, num;
3616 keyReference *keys;
3617 UNUSED(cmd);
3618
3619 /* Check for the presence of the stored key in the command */
3620 int stored_key = -1;
3621 for (i = 5; i < argc; i++) {
3622 char *arg = argv[i]->ptr;
3623 /* For the case when user specifies both "store" and "storedist" options, the
3624 * second key specified would override the first key. This behavior is kept
3625 * the same as in georadiusCommand method.
3626 */
3627 if ((!strcasecmp(arg, "store") || !strcasecmp(arg, "storedist")) && ((i+1) < argc)) {
3628 stored_key = i+1;
3629 i++;
3630 }
3631 }
3632 num = 1 + (stored_key == -1 ? 0 : 1);
3633
3634 /* Keys in the command come from two places:
3635 * argv[1] = key,
3636 * argv[5...n] = stored key if present
3637 */
3638 keys = getKeysPrepareResult(result, num);
3639
3640 /* Add all key positions to keys[] */
3641 keys[0].pos = 1;
3642 keys[0].flags = 0;
3643 if(num > 1) {
3644 keys[1].pos = stored_key;
3645 keys[1].flags = 0;
3646 }
3647 result->numkeys = num;
3648 return num;
3649}
3650
3651/* XREAD [BLOCK <milliseconds>] [COUNT <count>] [GROUP <groupname> <ttl>]
3652 * STREAMS key_1 key_2 ... key_N ID_1 ID_2 ... ID_N
3653 *
3654 * This command has a fully defined keyspec, so returning flags isn't needed. */
3655int xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
3656 int i, num = 0;
3657 keyReference *keys;
3658 UNUSED(cmd);
3659
3660 /* We need to parse the options of the command in order to seek the first
3661 * "STREAMS" string which is actually the option. This is needed because
3662 * "STREAMS" could also be the name of the consumer group and even the
3663 * name of the stream key. */
3664 int streams_pos = -1;
3665 for (i = 1; i < argc; i++) {
3666 char *arg = argv[i]->ptr;
3667 if (!strcasecmp(arg, "block")) {
3668 i++; /* Skip option argument. */
3669 } else if (!strcasecmp(arg, "count")) {
3670 i++; /* Skip option argument. */
3671 } else if (!strcasecmp(arg, "group")) {
3672 i += 2; /* Skip option argument. */
3673 } else if (!strcasecmp(arg, "noack")) {
3674 /* Nothing to do. */
3675 } else if (!strcasecmp(arg, "streams")) {
3676 streams_pos = i;
3677 break;
3678 } else {
3679 break; /* Syntax error. */
3680 }
3681 }
3682 if (streams_pos != -1) num = argc - streams_pos - 1;
3683
3684 /* Syntax error. */
3685 if (streams_pos == -1 || num == 0 || num % 2 != 0) {
3686 result->numkeys = 0;
3687 return 0;
3688 }
3689 num /= 2; /* We have half the keys as there are arguments because
3690 there are also the IDs, one per key. */
3691
3692 keys = getKeysPrepareResult(result, num);
3693 for (i = streams_pos+1; i < argc-num; i++) {
3694 keys[i-streams_pos-1].pos = i;
3695 keys[i-streams_pos-1].flags = 0;
3696 }
3697 result->numkeys = num;
3698 return num;
3699}
3700
3701/* Helper function to extract keys from the SET command, which may have
3702 * an RW flag if the GET, IF* arguments are present, OW otherwise. */
3703int setGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
3704 keyReference *keys;
3705 UNUSED(cmd);
3706
3707 keys = getKeysPrepareResult(result, 1);
3708 keys[0].pos = 1; /* We always know the position */
3709 result->numkeys = 1;
3710 int actual = CMD_KEY_OW;
3711 int logical = CMD_KEY_UPDATE;
3712
3713 for (int i = 3; i < argc; i++) {
3714 char *arg = argv[i]->ptr;
3715 if ((arg[0] == 'g' || arg[0] == 'G') &&
3716 (arg[1] == 'e' || arg[1] == 'E') &&
3717 (arg[2] == 't' || arg[2] == 'T') && arg[3] == '\0')
3718 {
3719 actual = CMD_KEY_RW;
3720 logical |= CMD_KEY_ACCESS;
3721 } else if (!strcasecmp(arg, "ifeq") || !strcasecmp(arg, "ifne") ||
3722 !strcasecmp(arg, "ifdeq") || !strcasecmp(arg, "ifdne"))
3723 {
3724 actual = CMD_KEY_RW;
3725 }
3726 }
3727
3728 keys[0].flags = actual | logical;
3729
3730 return 1;
3731}
3732
3733/* Helper function to extract keys from the DELEX command, which may have
3734 * an RW flag if the IF* arguments are present, RM otherwise. */
3735int delexGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
3736 keyReference *keys;
3737 UNUSED(cmd);
3738
3739 keys = getKeysPrepareResult(result, 1);
3740 keys[0].pos = 1; /* We always know the position */
3741 result->numkeys = 1;
3742 int actual = CMD_KEY_RM;
3743 int logical = CMD_KEY_DELETE;
3744
3745 for (int i = 2; i < argc; i++) {
3746 char *arg = argv[i]->ptr;
3747 if (!strcasecmp(arg, "ifeq") || !strcasecmp(arg, "ifne") ||
3748 !strcasecmp(arg, "ifdeq") || !strcasecmp(arg, "ifdne"))
3749 {
3750 actual = CMD_KEY_RW;
3751 }
3752 }
3753
3754 keys[0].flags = actual | logical;
3755
3756 return 1;
3757}
3758
3759/* Helper function to extract keys from the BITFIELD command, which may be
3760 * read-only if the BITFIELD GET subcommand is used. */
3761int bitfieldGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) {
3762 keyReference *keys;
3763 int readonly = 1;
3764 UNUSED(cmd);
3765
3766 keys = getKeysPrepareResult(result, 1);
3767 keys[0].pos = 1; /* We always know the position */
3768 result->numkeys = 1;
3769
3770 for (int i = 2; i < argc; i++) {
3771 int remargs = argc - i - 1; /* Remaining args other than current. */
3772 char *arg = argv[i]->ptr;
3773 if (!strcasecmp(arg, "get") && remargs >= 2) {
3774 i += 2;
3775 } else if ((!strcasecmp(arg, "set") || !strcasecmp(arg, "incrby")) && remargs >= 3) {
3776 readonly = 0;
3777 i += 3;
3778 break;
3779 } else if (!strcasecmp(arg, "overflow") && remargs >= 1) {
3780 i += 1;
3781 } else {
3782 readonly = 0; /* Syntax error. safer to assume non-RO. */
3783 break;
3784 }
3785 }
3786
3787 if (readonly) {
3788 keys[0].flags = CMD_KEY_RO | CMD_KEY_ACCESS;
3789 } else {
3790 keys[0].flags = CMD_KEY_RW | CMD_KEY_ACCESS | CMD_KEY_UPDATE;
3791 }
3792 return 1;
3793}