diff options
Diffstat (limited to 'examples/redis-unstable/src/db.c')
| -rw-r--r-- | examples/redis-unstable/src/db.c | 3793 |
1 files changed, 3793 insertions, 0 deletions
diff --git a/examples/redis-unstable/src/db.c b/examples/redis-unstable/src/db.c new file mode 100644 index 0000000..2a64147 --- /dev/null +++ b/examples/redis-unstable/src/db.c @@ -0,0 +1,3793 @@ +/* + * Copyright (c) 2009-Present, Redis Ltd. + * All rights reserved. + * + * Copyright (c) 2024-present, Valkey contributors. + * All rights reserved. + * + * Licensed under your choice of (a) the Redis Source Available License 2.0 + * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the + * GNU Affero General Public License v3 (AGPLv3). + * + * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. + */ + +#include "server.h" +#include "cluster.h" +#include "atomicvar.h" +#include "latency.h" +#include "script.h" +#include "functions.h" +#include "cluster_asm.h" +#include "redisassert.h" + +#include <signal.h> +#include <ctype.h> +#include "bio.h" +#include "keymeta.h" + +/*----------------------------------------------------------------------------- + * C-level DB API + *----------------------------------------------------------------------------*/ + +static_assert(MAX_KEYSIZES_TYPES == OBJ_TYPE_BASIC_MAX, "Must be equal"); + +/* Flags for expireIfNeeded */ +#define EXPIRE_FORCE_DELETE_EXPIRED 1 +#define EXPIRE_AVOID_DELETE_EXPIRED 2 +#define EXPIRE_ALLOW_ACCESS_EXPIRED 4 +#define EXPIRE_ALLOW_ACCESS_TRIMMED 8 + +/* Return values for expireIfNeeded */ +typedef enum { + KEY_VALID = 0, /* Could be volatile and not yet expired, non-volatile, or even non-existing key. */ + KEY_EXPIRED, /* Logically expired but not yet deleted. */ + KEY_DELETED, /* The key was deleted now. */ + KEY_TRIMMED /* Logically trimmed but not yet deleted. */ +} keyStatus; + +static keyStatus expireIfNeeded(redisDb *db, robj *key, kvobj *kv, int flags); + +/* Update LFU when an object is accessed. + * Firstly, decrement the counter if the decrement time is reached. + * Then logarithmically increment the counter, and update the access time. */ +void updateLFU(robj *val) { + unsigned long counter = LFUDecrAndReturn(val); + counter = LFULogIncr(counter); + val->lru = (LFUGetTimeInMinutes()<<8) | counter; +} + +/* Update LRM when an object is modified. */ +void updateLRM(robj *o) { + if (o->refcount == OBJ_SHARED_REFCOUNT) + return; + if (server.maxmemory_policy & MAXMEMORY_FLAG_LRM) { + o->lru = LRU_CLOCK(); + } +} + +/* + * Update histogram of keys-sizes + * + * It is used to track the distribution of key sizes in the dataset. It is updated + * every time key's length is modified. Available to user via INFO command. + * + * The histogram is a base-2 logarithmic histogram, with 64 bins. The i'th bin + * represents the number of keys with a size in the range 2^i and 2^(i+1) + * exclusive. oldLen/newLen must be smaller than 2^48, and if their value + * equals -1, it means that the key is being created/deleted, respectively. Each + * data type has its own histogram and it is per database (In addition, there is + * histogram per slot for future cluster use). + * + * Example mapping of key lengths to bins: + * [1,2)->1 [2,4)->2 [4,8)->3 [8,16)->4 ... + * + * Since strings can be zero length, the histogram also tracks: + * [0,1)->0 + */ +void updateKeysizesHist(redisDb *db, int didx, uint32_t type, int64_t oldLen, int64_t newLen) { + if(unlikely(type >= OBJ_TYPE_BASIC_MAX)) + return; + + kvstoreDictMetadata *dictMeta = kvstoreGetDictMeta(db->keys, didx, 0); + kvstoreMetadata *kvstoreMeta = kvstoreGetMetadata(db->keys); + + if (oldLen > 0) { + int old_bin = log2ceil(oldLen) + 1; + debugServerAssert(old_bin < MAX_KEYSIZES_BINS); + /* If following a key deletion it is last one in slot's dict, then + * slot's dict might get released as well. Verify if metadata is not NULL. */ + if(dictMeta) { + dictMeta->keysizes_hist[type][old_bin]--; + debugServerAssert(dictMeta->keysizes_hist[type][old_bin] >= 0); + } + kvstoreMeta->keysizes_hist[type][old_bin]--; + debugServerAssert(kvstoreMeta->keysizes_hist[type][old_bin] >= 0); + } else { + /* here, oldLen can be either 0 or -1 */ + if (oldLen == 0) { + /* Only strings can be empty. Yet, a command flow might temporarily + * dbAdd() empty collection, and only after add elements. */ + + if (dictMeta) { + dictMeta->keysizes_hist[type][0]--; + debugServerAssert(dictMeta->keysizes_hist[type][0] >= 0); + } + kvstoreMeta->keysizes_hist[type][0]--; + debugServerAssert(kvstoreMeta->keysizes_hist[type][0] >= 0); + } + } + + if (newLen > 0) { + int new_bin = log2ceil(newLen) + 1; + debugServerAssert(new_bin < MAX_KEYSIZES_BINS); + /* If following a key deletion it is last one in slot's dict, then + * slot's dict might get released as well. Verify if metadata is not NULL. */ + if(dictMeta) dictMeta->keysizes_hist[type][new_bin]++; + kvstoreMeta->keysizes_hist[type][new_bin]++; + } else { + /* here, newLen can be either 0 or -1 */ + if (newLen == 0) { + /* Only strings can be empty. Yet, a command flow might temporarily + * dbAdd() empty collection, and only after add elements. */ + + if (dictMeta) dictMeta->keysizes_hist[type][0]++; + kvstoreMeta->keysizes_hist[type][0]++; + } + } +} + +void updateSlotAllocSize(redisDb *db, int didx, size_t oldsize, size_t newsize) { + debugServerAssert(server.memory_tracking_per_slot); + kvstoreDictMetadata *dictMeta = kvstoreGetDictMeta(db->keys, didx, 0); + if (!dictMeta) return; + debugServerAssert(oldsize <= dictMeta->alloc_size); + dictMeta->alloc_size -= oldsize; + dictMeta->alloc_size += newsize; +} + +/* Assert keysizes histogram (For debugging only) + * + * Triggered by DEBUG KEYSIZES-HIST-ASSERT 1 and tested after each command. + */ +void dbgAssertKeysizesHist(redisDb *db) { + /* Scan DB and build expected histogram by scanning all keys */ + int64_t scanHist[MAX_KEYSIZES_TYPES][MAX_KEYSIZES_BINS] = {{0}}; + dictEntry *de; + kvstoreIterator kvs_it; + kvstoreIteratorInit(&kvs_it, db->keys); + while ((de = kvstoreIteratorNext(&kvs_it)) != NULL) { + kvobj *kv = dictGetKV(de); + if (kv->type < OBJ_TYPE_BASIC_MAX) { + int64_t len = getObjectLength(kv); + scanHist[kv->type][(len == 0) ? 0 : log2ceil(len) + 1]++; + } + } + kvstoreIteratorReset(&kvs_it); + for (int type = 0; type < OBJ_TYPE_BASIC_MAX; type++) { + kvstoreMetadata *meta = kvstoreGetMetadata(db->keys); + volatile int64_t *keysizesHist = meta->keysizes_hist[type]; + for (int i = 0; i < MAX_KEYSIZES_BINS; i++) { + if (scanHist[type][i] == keysizesHist[i]) + continue; + + /* print scanStr vs. expected histograms for debugging */ + char scanStr[500], keysizesStr[500]; + int l1 = 0, l2 = 0; + for (int j = 0; (j < MAX_KEYSIZES_BINS) && (l1 < 500) && (l2 < 500); j++) { + if (scanHist[type][j]) + l1 += snprintf(scanStr + l1, sizeof(scanStr) - l1, + "[%d]=%"PRId64" ", j, scanHist[type][j]); + if (keysizesHist[j]) + l2 += snprintf(keysizesStr + l2, sizeof(keysizesStr) - l2, + "[%d]=%"PRId64" ", j, keysizesHist[j]); + } + serverPanic("dbgAssertKeysizesHist: type=%d\nscanStr=%s\nkeysizes=%s\n", + type, scanStr, keysizesStr); + } + } +} + +/* Assert per-slot alloc_size (For debugging only) + * + * Triggered by DEBUG ALLOCSIZE-SLOTS-ASSERT 1 and tested after each command. + */ +void dbgAssertAllocSizePerSlot(redisDb *db) { + if (!server.memory_tracking_per_slot) return; + size_t slot_sizes[CLUSTER_SLOTS] = {0}; + dictEntry *de; + kvstoreIterator kvs_it; + kvstoreIteratorInit(&kvs_it, db->keys); + while ((de = kvstoreIteratorNext(&kvs_it)) != NULL) { + int slot = kvstoreIteratorGetCurrentDictIndex(&kvs_it); + kvobj *kv = dictGetKV(de); + slot_sizes[slot] += kvobjAllocSize(kv); + } + kvstoreIteratorReset(&kvs_it); + + int num_slots = kvstoreNumDicts(db->keys); + for (int slot = 0; slot < num_slots; slot++) { + kvstoreDictMetadata *dictMeta = kvstoreGetDictMeta(db->keys, slot, 0); + size_t want = slot_sizes[slot]; + size_t have = dictMeta ? dictMeta->alloc_size : 0; + if (have == want) continue; + serverPanic("dbgAssertAllocSizePerSlot: slot=%d expected=%zu actual=%zu", + slot, want, have); + } +} + +/* Lookup a kvobj for read or write operations, or return NULL if the it is not + * found in the specified DB. This function implements the functionality of + * lookupKeyRead(), lookupKeyWrite() and their ...WithFlags() variants. + * + * link - If key found, return the link of the key. + * If key not found, return the bucket link, where the key should be added. + * Or NULL if dict wasn't allocated yet. + * + * Side-effects of calling this function: + * + * 1. A key gets expired if it reached it's TTL. + * 2. The key's last access time is updated. + * 3. The global keys hits/misses stats are updated (reported in INFO). + * 4. If keyspace notifications are enabled, a "keymiss" notification is fired. + * + * Flags change the behavior of this command: + * + * LOOKUP_NONE (or zero): No special flags are passed. + * LOOKUP_NOTOUCH: Don't alter the last access time of the key. + * LOOKUP_NONOTIFY: Don't trigger keyspace event on key miss. + * LOOKUP_NOSTATS: Don't increment key hits/misses counters. + * LOOKUP_WRITE: Prepare the key for writing (delete expired keys even on + * replicas, use separate keyspace stats and events (TODO)). + * LOOKUP_NOEXPIRE: Perform expiration check, but avoid deleting the key, + * so that we don't have to propagate the deletion. + * + * Note: this function also returns NULL if the key is logically expired but + * still existing, in case this is a replica and the LOOKUP_WRITE is not set. + * Even if the key expiry is master-driven, we can correctly report a key is + * expired on replicas even if the master is lagging expiring our key via DELs + * in the replication link. */ +kvobj *lookupKey(redisDb *db, robj *key, int flags, dictEntryLink *link) { + + kvobj *val = dbFindByLink(db, key->ptr, link); + + if (val) { + /* Forcing deletion of expired keys on a replica makes the replica + * inconsistent with the master. We forbid it on readonly replicas, but + * we have to allow it on writable replicas to make write commands + * behave consistently. + * + * It's possible that the WRITE flag is set even during a readonly + * command, since the command may trigger events that cause modules to + * perform additional writes. */ + int is_ro_replica = server.masterhost && server.repl_slave_ro; + int expire_flags = 0; + if (flags & LOOKUP_WRITE && !is_ro_replica) + expire_flags |= EXPIRE_FORCE_DELETE_EXPIRED; + if (flags & LOOKUP_NOEXPIRE) + expire_flags |= EXPIRE_AVOID_DELETE_EXPIRED; + if (flags & LOOKUP_ACCESS_EXPIRED) + expire_flags |= EXPIRE_ALLOW_ACCESS_EXPIRED; + if (flags & LOOKUP_ACCESS_TRIMMED) + expire_flags |= EXPIRE_ALLOW_ACCESS_TRIMMED; + if (expireIfNeeded(db, key, val, expire_flags) != KEY_VALID) { + /* The key is no longer valid. */ + val = NULL; + if (link) *link = NULL; + } + } + + if (val) { + /* Update the access time for the ageing algorithm. + * Don't do it if we have a saving child, as this will trigger + * a copy on write madness. */ + if (((flags & LOOKUP_NOTOUCH) == 0) && + (server.current_client && server.current_client->flags & CLIENT_NO_TOUCH) && + (server.executing_client && server.executing_client->cmd->proc != touchCommand)) + flags |= LOOKUP_NOTOUCH; + if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)){ + if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { + updateLFU(val); + } else if (!(server.maxmemory_policy & MAXMEMORY_FLAG_LRM)) { + /* LRM policy should NOT update timestamp on reads. */ + val->lru = LRU_CLOCK(); + } + } + + if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE))) + server.stat_keyspace_hits++; + /* TODO: Use separate hits stats for WRITE */ + } else { + if (!(flags & (LOOKUP_NONOTIFY | LOOKUP_WRITE))) + notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id); + if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE))) + server.stat_keyspace_misses++; + /* TODO: Use separate misses stats and notify event for WRITE */ + } + + return val; +} + +/* Lookup a key for read operations, or return NULL if the key is not found + * in the specified DB. + * + * This API should not be used when we write to the key after obtaining + * the object linked to the key, but only for read only operations. + * + * This function is equivalent to lookupKey(). The point of using this function + * rather than lookupKey() directly is to indicate that the purpose is to read + * the key. */ +kvobj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) { + serverAssert(!(flags & LOOKUP_WRITE)); + return lookupKey(db, key, flags, NULL); +} + +/* Like lookupKeyReadWithFlags(), but does not use any flag, which is the + * common case. */ +kvobj *lookupKeyRead(redisDb *db, robj *key) { + return lookupKeyReadWithFlags(db,key,LOOKUP_NONE); +} + +/* Lookup a key for write operations, and as a side effect, if needed, expires + * the key if its TTL is reached. It's equivalent to lookupKey() with the + * LOOKUP_WRITE flag added. + * + * Returns the linked value object if the key exists or NULL if the key + * does not exist in the specified DB. */ +kvobj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) { + return lookupKey(db, key, flags | LOOKUP_WRITE, NULL); +} + +kvobj *lookupKeyWrite(redisDb *db, robj *key) { + return lookupKeyWriteWithFlags(db, key, LOOKUP_NONE); +} + +/* Like lookupKeyWrite(), but accepts ref to optional `link` + * + * link - If key found, updated to link the key. + * If key not found, updated to the bucket where the key should be added. + * If key not found and dict is empty, it is set to NULL + */ +kvobj *lookupKeyWriteWithLink(redisDb *db, robj *key, dictEntryLink *link) { + return lookupKey(db, key, LOOKUP_NONE | LOOKUP_WRITE, link); +} + +kvobj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) { + kvobj *kv = lookupKeyRead(c->db, key); + if (!kv) addReplyOrErrorObject(c, reply); + return kv; +} + +kvobj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) { + kvobj *kv = lookupKeyWrite(c->db, key); + if (!kv) addReplyOrErrorObject(c, reply); + return kv; +} + +/* Add a key-value entry to the DB. + * + * A copy of 'key' is stored in the database. The caller must ensure the + * `key` is properly freed by calling decrRefcount(key). + * + * The value may (if its reference counter == 1) be reallocated and become + * invalid after a call to this function. The (possibly reallocated) value is + * stored in the database and the 'valref' pointer is updated to point to the + * new allocation. + * + * The reference counter of the value pointed to by valref is not incremented, + * so the caller should not free the value using decrRefcount after calling this + * function. + * + * link - Optional link to bucket where the key should be added. + * On return, get updated, by need, to the inserted key. + * + * keymeta - Defines metadata to be attached to the key. Including optional + * expiration and modules metadata to be copied (REQUIRED). + */ +kvobj *dbAddInternal(redisDb *db, robj *key, robj **valref, dictEntryLink *link, + const KeyMetaSpec *keymeta) +{ + int slot = getKeySlot(key->ptr); + dictEntryLink tmp = NULL; + if (link == NULL) link = &tmp; + robj *val = *valref; + kvobj *kv = kvobjSet(key->ptr, val, keymeta->metabits); + initObjectLRUOrLFU(kv); + kvstoreDictSetAtLink(db->keys, slot, kv, link, 1); + + /* Handle metadata (expiration and modules metadata) */ + if (keymeta->metabits) { + if (keymeta->metabits & KEY_META_MASK_EXPIRE) { + /* Expiry is always the first meta (from last) */ + long long expire = keymeta->meta[KEY_META_ID_MAX - 1]; + kvobj *newkv = setExpireByLink(NULL, db, key->ptr, expire, *link); + serverAssert(newkv == kv); + } + + /* memcpy modules metadata to beginning of kvobj */ + if (keymeta->metabits & KEY_META_MASK_MODULES) + /* Also trivial overwrite expire */ + memcpy(kvobjGetAllocPtr(kv), + keymeta->meta + KEY_META_ID_MAX - keymeta->numMeta, + keymeta->numMeta * sizeof(uint64_t)); + } + + signalKeyAsReady(db, key, kv->type); + notifyKeyspaceEvent(NOTIFY_NEW,"new",key,db->id); + updateKeysizesHist(db, slot, kv->type, -1, getObjectLength(kv)); /* add hist */ + if (server.memory_tracking_per_slot) + updateSlotAllocSize(db, slot, 0, kvobjAllocSize(kv)); + *valref = kv; + return kv; +} + +/* Read dbAddInternal() comment */ +kvobj *dbAdd(redisDb *db, robj *key, robj **valref) { + KeyMetaSpec keyMetaEmpty; /* No metadata added */ + keyMetaSpecInit(&keyMetaEmpty); + return dbAddInternal(db, key, valref, NULL, &keyMetaEmpty); +} + +kvobj *dbAddByLink(redisDb *db, robj *key, robj **valref, dictEntryLink *link) { + KeyMetaSpec keyMetaEmpty; /* No metadata added */ + keyMetaSpecInit(&keyMetaEmpty); + return dbAddInternal(db, key, valref, link, &keyMetaEmpty); +} + +/* Returns key's hash slot when cluster mode is enabled, or 0 when disabled. + * The only difference between this function and getKeySlot, is that it's not using cached key slot from the current_client + * and always calculates CRC hash. + * This is useful when slot needs to be calculated for a key that user didn't request for, such as in case of eviction. */ +int calculateKeySlot(sds key) { + return server.cluster_enabled ? keyHashSlot(key, (int) sdslen(key)) : 0; +} + +/* Return slot-specific dictionary for key based on key's hash slot when cluster mode is enabled, else 0.*/ +int getKeySlot(sds key) { + if (!server.cluster_enabled) return 0; + /* This is performance optimization that uses pre-set slot id from the current command, + * in order to avoid calculation of the key hash. + * + * This optimization is only used when current_client flag `CLIENT_EXECUTING_COMMAND` is set. + * It only gets set during the execution of command under `call` method. Other flows requesting + * the key slot would fallback to calculateKeySlot. + */ + if (server.current_client && server.current_client->slot >= 0 && server.current_client->flags & CLIENT_EXECUTING_COMMAND) { + debugServerAssertWithInfo(server.current_client, NULL, + (int)keyHashSlot(key, (int)sdslen(key)) == server.current_client->slot); + return server.current_client->slot; + } + int slot = keyHashSlot(key, (int)sdslen(key)); + return slot; +} + +/* Return the slot of the key in the command. + * INVALID_CLUSTER_SLOT if no keys, CLUSTER_CROSSSLOT if cross slot, otherwise the slot number. */ +int getSlotFromCommand(struct redisCommand *cmd, robj **argv, int argc) { + if (!cmd || !server.cluster_enabled) return INVALID_CLUSTER_SLOT; + + /* Get the keys from the command */ + getKeysResult result = GETKEYS_RESULT_INIT; + getKeysFromCommand(cmd, argv, argc, &result); + + /* Extract slot from the keys result. */ + int slot = extractSlotFromKeysResult(argv, &result); + getKeysFreeResult(&result); + return slot; +} + +/* This is a special version of dbAdd() that is used only when loading + * keys from the RDB file: the key is passed as an SDS string that is + * copied by the function and freed by the caller. + * + * Moreover this function will not abort if the key is already busy, to + * give more control to the caller, nor will signal the key as ready + * since it is not useful in this context. + * + * If added to db, returns pointer to the object, Otherwise NULL is returned. + */ +kvobj *dbAddRDBLoad(redisDb *db, sds key, robj **valref, const KeyMetaSpec *keyMetaSpec) { + /* Add new kvobj to the db. */ + int slot = getKeySlot(key); + + dictEntryLink link, bucket; + link = kvstoreDictFindLink(db->keys, slot, key, &bucket); + + /* If already exists, return NULL */ + if (link != NULL) + return NULL; + + /* Create kvobj with metadata bits from KeyMetaSpec */ + robj *val = *valref; + kvobj *kv = kvobjSet(key, val, keyMetaSpec->metabits); + initObjectLRUOrLFU(kv); + kvstoreDictSetAtLink(db->keys, slot, kv, &bucket, 1); + + /* Handle metadata (expiration and modules metadata) */ + if (keyMetaSpec->metabits) { + if (keyMetaSpec->metabits & KEY_META_MASK_EXPIRE) { + /* Expiry is always the first meta (from last) */ + long long expire = keyMetaSpec->meta[KEY_META_ID_MAX - 1]; + kvobj *newkv = setExpireByLink(NULL, db, key, expire, bucket); + serverAssert(newkv == kv); + } + + /* memcpy modules metadata to beginning of kvobj */ + if (keyMetaSpec->metabits & KEY_META_MASK_MODULES) + memcpy(kvobjGetAllocPtr(kv), + keyMetaSpec->meta + KEY_META_ID_MAX - keyMetaSpec->numMeta, + keyMetaSpec->numMeta * sizeof(uint64_t)); + } + + updateKeysizesHist(db, slot, kv->type, -1, (int64_t) getObjectLength(kv)); + if (server.memory_tracking_per_slot) + updateSlotAllocSize(db, slot, 0, kvobjAllocSize(kv)); + return *valref = kv; +} + +/** + * Overwrite an existing key's value in db with a new value. + * + * - If the reference count of 'valref' is 1 the ownership of the value is + * transferred to this function. The value may be reallocated, potentially + * invalidating any external references to it. The (potentially reallocated) + * value is stored in the database, and the 'valref' pointer is updated to + * reflect the new allocation, if one occurs. + * - The reference counter of the value referenced by 'valref' is not incremented + * so the caller must refrain from releasing it using decrRefCount after this + * function is called. + * - This function does not modify the expire time of the existing key. + * - The 'overwrite' flag is an indication whether this is done as part of a + * complete replacement of their key, which can be thought as a deletion and + * replacement (in which case we need to emit deletion signals), or just an + * update of a value of an existing key (when false). + * - The `link` is optional, can save lookup, if provided. + */ +static void dbSetValue(redisDb *db, robj *key, robj **valref, dictEntryLink link, + int overwrite, int updateKeySizes, int keepTTL) { + int freeModuleMeta = 0; + robj *val = *valref; + int slot = getKeySlot(key->ptr); + size_t oldsize = 0; + if (!link) { + link = kvstoreDictFindLink(db->keys, slot, key->ptr, NULL); + serverAssertWithInfo(NULL, key, link != NULL); /* expected to exist */ + } + kvobj *old = dictGetKV(*link); + kvobj *kvNew; + + int64_t oldlen = (int64_t) getObjectLength(old); + int oldtype = old->type; + + /* if hash with HFEs, take care to remove from global HFE DS before attempting + * to manipulate and maybe free kvOld object */ + if (old->type == OBJ_HASH) + estoreRemove(db->subexpires, slot, old); + + long long oldExpire = getExpire(db, key->ptr, old); + + /* All metadata will be kept if not `overwrite` for the new object */ + uint32_t newKeyMetaBits = old->metabits; + /* clear expire if not keepTTL or no old expire */ + if ((!keepTTL) || (oldExpire == -1)) + newKeyMetaBits &= ~KEY_META_MASK_EXPIRE; + + if (overwrite) { + /* On overwrite, discard module metadata excluding expire if set */ + newKeyMetaBits &= KEY_META_MASK_EXPIRE; + /* RM_StringDMA may call dbUnshareStringValue which may free val, so we + * need to incr to retain old */ + incrRefCount(old); + + /* Free related metadata. Ignore builtin metadata (currently only expire) */ + if (getModuleMetaBits(old->metabits)) { + keyMetaOnUnlink(db, key, old); + freeModuleMeta = 1; + } + + /* Although the key is not really deleted from the database, we regard + * overwrite as two steps of unlink+add, so we still need to call the unlink + * callback of the module. */ + moduleNotifyKeyUnlink(key,old,db->id,DB_FLAG_KEY_OVERWRITE); + /* We want to try to unblock any module clients or clients using a blocking XREADGROUP */ + signalDeletedKeyAsReady(db,key,old->type); + decrRefCount(old); + /* Because of RM_StringDMA, old may be changed, so we need get old again */ + old = dictGetKV(*link); + } + if (server.memory_tracking_per_slot) + oldsize = kvobjAllocSize(old); + + if ((old->refcount == 1 && old->encoding != OBJ_ENCODING_EMBSTR) && + (val->refcount == 1 && val->encoding != OBJ_ENCODING_EMBSTR) && (!freeModuleMeta)) + { + /* Keep old object in the database. Just swap it's ptr, type and + * encoding with the content of val. */ + robj tmp = *old; + old->type = val->type; + old->encoding = val->encoding; + old->ptr = val->ptr; + val->type = tmp.type; + val->encoding = tmp.encoding; + val->ptr = tmp.ptr; + /* Set new to old to keep the old object. Set old to val to be freed below. */ + kvNew = old; + old = val; + + /* Handle TTL in the optimization path */ + if ((!keepTTL) && (oldExpire >= 0)) + removeExpire(db, key); + } else { + /* Replace the old value at its location in the key space. */ + val->lru = old->lru; + + kvNew = kvobjSet(key->ptr, val, newKeyMetaBits); + kvstoreDictSetAtLink(db->keys, slot, kvNew, &link, 0); + + /* if expiry replace the old value at its location in the expire space. */ + if (oldExpire != -1) { + if (keepTTL) { + kvobjSetExpire(kvNew, oldExpire); /* kvNew not reallocated here */ + dictEntryLink exLink = kvstoreDictFindLink(db->expires, slot, + key->ptr, NULL); + serverAssertWithInfo(NULL, key, exLink != NULL); + kvstoreDictSetAtLink(db->expires, slot, kvNew, &exLink, 0); + } else { + kvstoreDictDelete(db->expires, slot, key->ptr); + } + } + + if (newKeyMetaBits & KEY_META_MASK_MODULES) + keyMetaTransition(old, kvNew); + } + + /* Remove old key and add new key to KEYSIZES histogram */ + int64_t newlen = (int64_t) getObjectLength(kvNew); + if (updateKeySizes) { + /* Save one call if old and new are the same type */ + if (oldtype == kvNew->type) { + updateKeysizesHist(db, slot, oldtype, oldlen, newlen); + } else { + updateKeysizesHist(db, slot, oldtype, oldlen, -1); + updateKeysizesHist(db, slot, kvNew->type, -1, newlen); + } + } + + if (server.memory_tracking_per_slot) + updateSlotAllocSize(db, slot, oldsize, kvobjAllocSize(kvNew)); + + if (server.io_threads_num > 1 && old->encoding == OBJ_ENCODING_RAW) { + /* In multi-threaded mode, the OBJ_ENCODING_RAW string object usually is + * allocated in the IO thread, so we defer the free to the IO thread. + * Besides, we never free a string object in BIO threads, so, even with + * lazyfree-lazy-server-del enabled, a fallback to main thread freeing + * due to defer free failure doesn't go against the config intention. */ + tryDeferFreeClientObject(server.current_client, DEFERRED_OBJECT_TYPE_ROBJ, old); + } else if (server.lazyfree_lazy_server_del) { + freeObjAsync(key, old, db->id); + } else { + decrRefCount(old); + } + *valref = kvNew; +} + +/* Replace an existing key with a new value, we just replace value and don't + * emit any events */ +void dbReplaceValue(redisDb *db, robj *key, robj **valref, int updateKeySizes) { + dbSetValue(db, key, valref, NULL, 0, updateKeySizes, 1); +} + +/* Replace an existing key with a new value (don't emit any events) + * + * parameter 'link' is optional. If provided, saves lookup. + */ +void dbReplaceValueWithLink(redisDb *db, robj *key, robj **val, dictEntryLink link) { + dbSetValue(db, key, val, link, 0, 1, 1); +} + +/* High level Set operation. This function can be used in order to set + * a key, whatever it was existing or not, to a new object. + * + * 1) The value may be reallocated when adding it to the database. The value + * pointer 'valref' is updated to point to the reallocated object. The + * reference count of the value object is *not* incremented. + * 2) clients WATCHing for the destination key notified. + * 3) The expire time of the key is reset (the key is made persistent), + * unless 'SETKEY_KEEPTTL' is enabled in flags. + * 4) The key lookup can take place outside this interface outcome will be + * delivered with 'SETKEY_ALREADY_EXIST' or 'SETKEY_DOESNT_EXIST' + * + * All the new keys in the database should be created via this interface. + * The client 'c' argument may be set to NULL if the operation is performed + * in a context where there is no clear client performing the operation. */ +void setKey(client *c, redisDb *db, robj *key, robj **valref, int flags) { + setKeyByLink(c, db, key, valref, flags, NULL); +} + +/* Like setKey(), but accepts an optional link + * + * - If flags is set with SETKEY_ALREADY_EXIST, then `link` must be provided + * - If flags is set with SETKEY_DOESNT_EXIST, then `link` is optional. If + * provided, it will point to the bucket where the key should be added. + * - If flag is not set (0) then add or update key, and `link` must be NULL + * On return, link get updated, by need, to the inserted kvobj. + */ +void setKeyByLink(client *c, redisDb *db, robj *key, robj **valref, int flags, dictEntryLink *plink) { + dictEntryLink dummy = NULL, *link = plink ? plink : &dummy; + int exists; + kvobj *oldval = NULL; + + if (flags & SETKEY_ALREADY_EXIST) { + debugServerAssert((*link) != NULL); + oldval = dictGetKV(**link); + exists = 1; + } else if (flags & SETKEY_DOESNT_EXIST) { + /* link is optional */ + exists = 0; + } else { + /* Add or update key */ + oldval = lookupKeyWriteWithLink(db, key, link); + exists = oldval != NULL; + } + + if (exists) { + int oldtype = oldval->type; + int newtype = (*valref)->type; + + /* Update the value of an existing key */ + dbSetValue(db, key, valref, *link, 1, 1, flags & SETKEY_KEEPTTL); + + /* Notify keyspace events for override and type change */ + notifyKeyspaceEvent(NOTIFY_OVERWRITTEN, "overwritten", key, db->id); + if (oldtype != newtype) + notifyKeyspaceEvent(NOTIFY_TYPE_CHANGED, "type_changed", key, db->id); + } else { + /* Add the new key to the database */ + dbAddByLink(db, key, valref, link); + } + + /* Signal key modification and update LRM timestamp. */ + keyModified(c,db,key,*valref,!(flags & SETKEY_NO_SIGNAL)); +} + +/* During atomic slot migration, keys that are being imported are in an + * intermediate state. we cannot access them and therefore skip them. + * + * This callback function now is used by: + * - dbRandomKey + * - keysCommand + * - scanCommand + */ +static int accessKeysShouldSkipDictIndex(int didx) { + return !clusterCanAccessKeysInSlot(didx); +} + +/* Return a random key, in form of a Redis object. + * If there are no keys, NULL is returned. + * + * The function makes sure to return keys not already expired. */ +robj *dbRandomKey(redisDb *db) { + dictEntry *de; + int maxtries = 100; + int allvolatile = kvstoreSize(db->keys) == kvstoreSize(db->expires); + + while(1) { + robj *keyobj; + int randomSlot = kvstoreGetFairRandomDictIndex(db->keys, accessKeysShouldSkipDictIndex, 16, 1); + if (randomSlot == -1) return NULL; + de = kvstoreDictGetFairRandomKey(db->keys, randomSlot); + if (de == NULL) return NULL; + + kvobj *kv = dictGetKV(de); + sds key = kvobjGetKey(kv); + keyobj = createStringObject(key,sdslen(key)); + if (allvolatile && (server.masterhost || isPausedActions(PAUSE_ACTION_EXPIRE)) && --maxtries == 0) { + /* If the DB is composed only of keys with an expire set, + * it could happen that all the keys are already logically + * expired in the slave, so the function cannot stop because + * expireIfNeeded() is false, nor it can stop because + * dictGetFairRandomKey() returns NULL (there are keys to return). + * To prevent the infinite loop we do some tries, but if there + * are the conditions for an infinite loop, eventually we + * return a key name that may be already expired. */ + return keyobj; + } + if (expireIfNeeded(db, keyobj, kv, 0) != KEY_VALID) { + decrRefCount(keyobj); + continue; /* search for another key. This expired. */ + } + + return keyobj; + } +} + +/* Helper for sync and async delete. */ +int dbGenericDelete(redisDb *db, robj *key, int async, int flags) { + dictEntryLink link; + int table; + int slot = getKeySlot(key->ptr); + link = kvstoreDictTwoPhaseUnlinkFind(db->keys, slot, key->ptr, &table); + + if (link) { + kvobj *kv = dictGetKV(*link); + + int64_t oldlen = (int64_t) getObjectLength(kv); + int type = kv->type; + + /* If hash object with expiry on fields, remove it from HFE DS of DB */ + if (type == OBJ_HASH) + estoreRemove(db->subexpires, slot, kv); + + /* RM_StringDMA may call dbUnshareStringValue which may free kv, so we + * need to incr to retain kv */ + incrRefCount(kv); /* refcnt=1->2 */ + /* Metadata hook: notify unlink for key metadata cleanup. */ + if (getModuleMetaBits(kv->metabits)) keyMetaOnUnlink(db, key, kv); + /* Tells the module that the key has been unlinked from the database. */ + moduleNotifyKeyUnlink(key, kv, db->id, flags); + /* We want to try to unblock any module clients or clients using a blocking XREADGROUP */ + signalDeletedKeyAsReady(db,key,type); + /* We should call decr before freeObjAsync. If not, the refcount may be + * greater than 1, so freeObjAsync doesn't work */ + decrRefCount(kv); + + /* Because of dbUnshareStringValue, the val in db may change. */ + kv = dictGetKV(*link); + + /* if expirable, delete an entry from the expires dict is not decrRefCount of kvobj */ + if (kvobjGetExpire(kv) != -1) + kvstoreDictDelete(db->expires, slot, key->ptr); + + if (async) { + if (server.memory_tracking_per_slot) + updateSlotAllocSize(db, slot, kvobjAllocSize(kv), 0); + freeObjAsync(key, kv, db->id); + /* Set the key to NULL in the main dictionary. */ + kvstoreDictSetAtLink(db->keys, slot, NULL, &link, 0); + } + kvstoreDictTwoPhaseUnlinkFree(db->keys, slot, link, table); + + /* remove key from histogram */ + if(!(flags & DB_FLAG_NO_UPDATE_KEYSIZES)) + updateKeysizesHist(db, slot, type, oldlen, -1); + return 1; + } else { + return 0; + } +} + +/* Delete a key, value, and associated expiration entry if any, from the DB */ +int dbSyncDelete(redisDb *db, robj *key) { + return dbGenericDelete(db, key, 0, DB_FLAG_KEY_DELETED); +} + +/* Delete a key, value, and associated expiration entry if any, from the DB. If + * the value consists of many allocations, it may be freed asynchronously. */ +int dbAsyncDelete(redisDb *db, robj *key) { + return dbGenericDelete(db, key, 1, DB_FLAG_KEY_DELETED); +} + +/* This is a wrapper whose behavior depends on the Redis lazy free + * configuration. Deletes the key synchronously or asynchronously. */ +int dbDelete(redisDb *db, robj *key) { + return dbGenericDelete(db, key, server.lazyfree_lazy_server_del, DB_FLAG_KEY_DELETED); +} + +/* Similar to dbDelete(), but does not update the keysizes histogram. + * This is used when we want to delete a key without affecting the histogram, + * typically in cases where a command flow deletes elements from a collection + * and then deletes the collection itself. In such cases, using dbDelete() + * would incorrectly decrement bin #0. A corresponding test should be added + * to `info-keysizes.tcl`. */ +int dbDeleteSkipKeysizesUpdate(redisDb *db, robj *key) { + return dbGenericDelete(db, key, server.lazyfree_lazy_server_del, + DB_FLAG_KEY_DELETED | DB_FLAG_NO_UPDATE_KEYSIZES); +} + +/* Prepare the string object stored at 'key' to be modified destructively + * to implement commands like SETBIT or APPEND. + * + * An object is usually ready to be modified unless one of the two conditions + * are true: + * + * 1) The object 'o' is shared (refcount > 1), we don't want to affect + * other users. + * 2) The object encoding is not "RAW". + * + * If the object is found in one of the above conditions (or both) by the + * function, an unshared / not-encoded copy of the string object is stored + * at 'key' in the specified 'db'. Otherwise the object 'o' itself is + * returned. + * + * USAGE: + * + * The object 'o' is what the caller already obtained by looking up 'key' + * in 'db', the usage pattern looks like this: + * + * o = lookupKeyWrite(db,key); + * if (checkType(c,o,OBJ_STRING)) return; + * o = dbUnshareStringValue(db,key,o); + * + * At this point the caller is ready to modify the object, for example + * using an sdscat() call to append some data, or anything else. + */ +kvobj *dbUnshareStringValue(redisDb *db, robj *key, kvobj *kv) { + return dbUnshareStringValueByLink(db,key,kv,NULL); +} + +/* Like dbUnshareStringValue(), but accepts a optional link, + * which can be used if we already have one, thus saving the dbFind call. */ +kvobj *dbUnshareStringValueByLink(redisDb *db, robj *key, kvobj *o, dictEntryLink link) { + serverAssert(o->type == OBJ_STRING); + if (o->refcount != 1 || o->encoding != OBJ_ENCODING_RAW) { + robj *decoded = getDecodedObject(o); + o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr)); + decrRefCount(decoded); + dbReplaceValueWithLink(db, key, &o, link); + } + return o; +} + +/* Remove all keys from the database(s) structure. The dbarray argument + * may not be the server main DBs (could be a temporary DB). + * + * The dbnum can be -1 if all the DBs should be emptied, or the specified + * DB index if we want to empty only a single database. + * The function returns the number of keys removed from the database(s). */ +long long emptyDbStructure(redisDb *dbarray, int dbnum, int async, + void(callback)(dict*)) +{ + long long removed = 0; + int startdb, enddb; + + if (dbnum == -1) { + startdb = 0; + enddb = server.dbnum-1; + } else { + startdb = enddb = dbnum; + } + + for (int j = startdb; j <= enddb; j++) { + removed += kvstoreSize(dbarray[j].keys); + if (async) { + emptyDbAsync(&dbarray[j]); + } else { + /* Destroy sub-expires before deleting the kv-objects since ebuckets + * data structure is embedded in the stored kv-objects. */ + estoreEmpty(dbarray[j].subexpires); + kvstoreEmpty(dbarray[j].keys, callback); + kvstoreEmpty(dbarray[j].expires, callback); + } + /* Because all keys of database are removed, reset average ttl. */ + dbarray[j].avg_ttl = 0; + dbarray[j].expires_cursor = 0; + } + + return removed; +} + +/* Remove all data (keys and functions) from all the databases in a + * Redis server. If callback is given the function is called from + * time to time to signal that work is in progress. + * + * The dbnum can be -1 if all the DBs should be flushed, or the specified + * DB number if we want to flush only a single Redis database number. + * + * Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or + * EMPTYDB_ASYNC if we want the memory to be freed in a different thread + * and the function to return ASAP. EMPTYDB_NOFUNCTIONS can also be set + * to specify that we do not want to delete the functions. + * + * On success the function returns the number of keys removed from the + * database(s). Otherwise -1 is returned in the specific case the + * DB number is out of range, and errno is set to EINVAL. */ +long long emptyData(int dbnum, int flags, void(callback)(dict*)) { + int async = (flags & EMPTYDB_ASYNC); + int with_functions = !(flags & EMPTYDB_NOFUNCTIONS); + RedisModuleFlushInfoV1 fi = {REDISMODULE_FLUSHINFO_VERSION,!async,dbnum}; + long long removed = 0; + + if (dbnum < -1 || dbnum >= server.dbnum) { + errno = EINVAL; + return -1; + } + + if (dbnum == -1 || dbnum == 0) + asmCancelTrimJobs(); + + /* Fire the flushdb modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB, + REDISMODULE_SUBEVENT_FLUSHDB_START, + &fi); + + /* Make sure the WATCHed keys are affected by the FLUSH* commands. + * Note that we need to call the function while the keys are still + * there. */ + signalFlushedDb(dbnum, async, NULL); + + /* Empty redis database structure. */ + removed = emptyDbStructure(server.db, dbnum, async, callback); + + if (dbnum == -1) flushSlaveKeysWithExpireList(); + + if (with_functions) { + serverAssert(dbnum == -1); + functionsLibCtxClearCurrent(async); + } + + /* Also fire the end event. Note that this event will fire almost + * immediately after the start event if the flush is asynchronous. */ + moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB, + REDISMODULE_SUBEVENT_FLUSHDB_END, + &fi); + + return removed; +} + +/* Initialize temporary db on replica for use during diskless replication. */ +redisDb *initTempDb(void) { + int slot_count_bits = 0; + int flags = KVSTORE_ALLOCATE_DICTS_ON_DEMAND; + if (server.cluster_enabled) { + slot_count_bits = CLUSTER_SLOT_MASK_BITS; + flags |= KVSTORE_FREE_EMPTY_DICTS; + } + redisDb *tempDb = zcalloc(sizeof(redisDb)*server.dbnum); + for (int i=0; i<server.dbnum; i++) { + tempDb[i].id = i; + tempDb[i].keys = kvstoreCreate(&kvstoreExType, &dbDictType, slot_count_bits, + flags); + tempDb[i].expires = kvstoreCreate(&kvstoreBaseType, &dbExpiresDictType, + slot_count_bits, flags); + tempDb[i].subexpires = estoreCreate(&subexpiresBucketsType, slot_count_bits); + } + + return tempDb; +} + +/* Discard tempDb, this can be slow (similar to FLUSHALL), but it's always async. */ +void discardTempDb(redisDb *tempDb) { + int async = 1; + + /* Release temp DBs. */ + emptyDbStructure(tempDb, -1, async, NULL); + for (int i=0; i<server.dbnum; i++) { + /* Destroy sub-expires before deleting the kv-objects since ebuckets + * data structure is embedded in the stored kv-objects. */ + estoreRelease(tempDb[i].subexpires); + kvstoreRelease(tempDb[i].keys); + kvstoreRelease(tempDb[i].expires); + } + + zfree(tempDb); +} + +int selectDb(client *c, int id) { + if (id < 0 || id >= server.dbnum) + return C_ERR; + c->db = &server.db[id]; + return C_OK; +} + +long long dbTotalServerKeyCount(void) { + long long total = 0; + int j; + for (j = 0; j < server.dbnum; j++) { + total += kvstoreSize(server.db[j].keys); + } + return total; +} + +/*----------------------------------------------------------------------------- + * Hooks for key space changes. + * + * Every time a key in the database is modified the function + * keyModified() is called. + * + * Every time a DB is flushed the function signalFlushDb() is called. + *----------------------------------------------------------------------------*/ + +/* Called when a key is modified to update LRM timestamp + * and optionally signal watchers/tracking clients. + * + * Arguments: + * - c: client (may be NULL if the key was modified out of a context of a client) + * - db: database containing the key + * - key: the key that was modified + * - val: the value object (if NULL, LRM won't be updated, e.g., for deleted keys) + * - signal: if true, trigger WATCH and client-side tracking invalidation + */ +void keyModified(client *c, redisDb *db, robj *key, robj *val, int signal) { + if (val) updateLRM(val); + if (signal) { + touchWatchedKey(db,key); + trackingInvalidateKey(c,key,1); + } +} + +void signalFlushedDb(int dbid, int async, slotRangeArray *slots) { + int startdb, enddb; + if (dbid == -1) { + startdb = 0; + enddb = server.dbnum-1; + } else { + startdb = enddb = dbid; + } + + for (int j = startdb; j <= enddb; j++) { + scanDatabaseForDeletedKeys(&server.db[j], NULL, slots); + touchAllWatchedKeysInDb(&server.db[j], NULL, slots); + } + + trackingInvalidateKeysOnFlush(async); + + /* Changes in this method may take place in swapMainDbWithTempDb as well, + * where we execute similar calls, but with subtle differences as it's + * not simply flushing db. */ +} + +/*----------------------------------------------------------------------------- + * Type agnostic commands operating on the key space + *----------------------------------------------------------------------------*/ + +/* Return the set of flags to use for the emptyData() call for FLUSHALL + * and FLUSHDB commands. + * + * sync: flushes the database in an sync manner. + * async: flushes the database in an async manner. + * no option: determine sync or async according to the value of lazyfree-lazy-user-flush. + * + * On success C_OK is returned and the flags are stored in *flags, otherwise + * C_ERR is returned and the function sends an error to the client. */ +int getFlushCommandFlags(client *c, int *flags) { + /* Parse the optional ASYNC option. */ + if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"sync")) { + *flags = EMPTYDB_NO_FLAGS; + } else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"async")) { + *flags = EMPTYDB_ASYNC; + } else if (c->argc == 1) { + *flags = server.lazyfree_lazy_user_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS; + } else { + addReplyErrorObject(c,shared.syntaxerr); + return C_ERR; + } + return C_OK; +} + +/* Flushes the whole server data set. */ +void flushAllDataAndResetRDB(int flags) { + server.dirty += emptyData(-1,flags,NULL); + if (server.child_type == CHILD_TYPE_RDB) killRDBChild(); + if (server.saveparamslen > 0) { + rdbSaveInfo rsi, *rsiptr; + rsiptr = rdbPopulateSaveInfo(&rsi); + rdbSave(SLAVE_REQ_NONE,server.rdb_filename,rsiptr,RDBFLAGS_NONE); + } + +#if defined(USE_JEMALLOC) + /* jemalloc 5 doesn't release pages back to the OS when there's no traffic. + * for large databases, flushdb blocks for long anyway, so a bit more won't + * harm and this way the flush and purge will be synchronous. */ + if (!(flags & EMPTYDB_ASYNC)) { + /* Only clear the current thread cache. + * Ignore the return call since this will fail if the tcache is disabled. */ + je_mallctl("thread.tcache.flush", NULL, NULL, NULL, 0); + + jemalloc_purge(); + } +#endif +} + +/* CB function on blocking ASYNC FLUSH completion + * + * Utilized by commands SFLUSH, FLUSHALL and FLUSHDB. + */ +void flushallSyncBgDone(uint64_t client_id, void *userdata) { + slotRangeArray *slots = userdata; + client *c = lookupClientByID(client_id); + + /* Verify that client still exists and being blocked. */ + if (!(c && c->flags & CLIENT_BLOCKED)) { + slotRangeArrayFree(slots); + return; + } + + /* Update current_client (Called functions might rely on it) */ + client *old_client = server.current_client; + server.current_client = c; + + /* Don't update blocked_us since command was processed in bg by lazy_free thread */ + updateStatsOnUnblock(c, 0 /*blocked_us*/, elapsedUs(c->bstate.lazyfreeStartTime), 0); + + /* Only SFLUSH command pass user data pointer. */ + if (slots) + replySlotsFlushAndFree(c, slots); + else + addReply(c, shared.ok); + + /* mark client as unblocked */ + unblockClient(c, 1); + + if (c->flags & CLIENT_PENDING_COMMAND) { + c->flags &= ~CLIENT_PENDING_COMMAND; + /* The FLUSH command won't be reprocessed, FLUSH command is finished, but + * we still need to complete its full processing flow, including updating + * the replication offset. */ + commandProcessed(c); + } + + /* On flush completion, update the client's memory */ + updateClientMemUsageAndBucket(c); + + /* restore current_client */ + server.current_client = old_client; +} + +/* Common flush command implementation for FLUSHALL, FLUSHDB and SFLUSH. + * + * Return 1 indicates that flush SYNC is actually running in bg as blocking ASYNC + * Return 0 otherwise + * + * slots - provided only by SFLUSH command, otherwise NULL. Will be used on + * completion to reply with the slots flush result. Ownership is passed + * to the completion job in case of `blocking_async`. + */ +int flushCommandCommon(client *c, int type, int flags, slotRangeArray *slots) { + int blocking_async = 0; /* Flush SYNC option to run as blocking ASYNC */ + + /* in case of SYNC, check if we can optimize and run it in bg as blocking ASYNC */ + if ((!(flags & EMPTYDB_ASYNC)) && (!(c->flags & CLIENT_AVOID_BLOCKING_ASYNC_FLUSH))) { + /* Run as ASYNC */ + flags |= EMPTYDB_ASYNC; + blocking_async = 1; + } + + /* Cancel all ASM tasks that overlap with the given slot ranges. */ + clusterAsmCancelBySlotRangeArray(slots, c->argv[0]->ptr); + + if (type == FLUSH_TYPE_ALL) + flushAllDataAndResetRDB(flags | EMPTYDB_NOFUNCTIONS); + else + server.dirty += emptyData(c->db->id,flags | EMPTYDB_NOFUNCTIONS,NULL); + + /* Without the forceCommandPropagation, when DB(s) was already empty, + * FLUSHALL\FLUSHDB will not be replicated nor put into the AOF. */ + forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF); + + /* if blocking ASYNC, block client and add completion job request to BIO lazyfree + * worker's queue. To be called and reply with OK only after all preceding pending + * lazyfree jobs in queue were processed */ + if (blocking_async) { + /* measure bg job till completion as elapsed time of flush command */ + elapsedStart(&c->bstate.lazyfreeStartTime); + + c->bstate.timeout = 0; + /* We still need to perform cleanup operations for the command, including + * updating the replication offset, so mark this command as pending to + * avoid command from being reset during unblock. */ + c->flags |= CLIENT_PENDING_COMMAND; + blockClient(c,BLOCKED_LAZYFREE); + bioCreateCompRq(BIO_WORKER_LAZY_FREE, flushallSyncBgDone, c->id, slots); + } + +#if defined(USE_JEMALLOC) + /* jemalloc 5 doesn't release pages back to the OS when there's no traffic. + * for large databases, flushdb blocks for long anyway, so a bit more won't + * harm and this way the flush and purge will be synchronous. + * + * Take care purge only FLUSHDB for sync flow. FLUSHALL sync flow already + * applied at flushAllDataAndResetRDB. Async flow will apply only later on */ + if ((type != FLUSH_TYPE_ALL) && (!(flags & EMPTYDB_ASYNC))) { + /* Only clear the current thread cache. + * Ignore the return call since this will fail if the tcache is disabled. */ + je_mallctl("thread.tcache.flush", NULL, NULL, NULL, 0); + + jemalloc_purge(); + } +#endif + return blocking_async; +} + +/* FLUSHALL [SYNC|ASYNC] + * + * Flushes the whole server data set. */ +void flushallCommand(client *c) { + int flags; + if (getFlushCommandFlags(c,&flags) == C_ERR) return; + + /* If FLUSH SYNC isn't running as blocking async, then reply */ + if (flushCommandCommon(c, FLUSH_TYPE_ALL, flags, NULL) == 0) + addReply(c, shared.ok); +} + +/* FLUSHDB [SYNC|ASYNC] + * + * Flushes the currently SELECTed Redis DB. */ +void flushdbCommand(client *c) { + int flags; + if (getFlushCommandFlags(c,&flags) == C_ERR) return; + + /* If FLUSH SYNC isn't running as blocking async, then reply */ + if (flushCommandCommon(c, FLUSH_TYPE_DB,flags, NULL) == 0) + addReply(c, shared.ok); + +} + +/* This command implements DEL and UNLINK. */ +void delGenericCommand(client *c, int lazy) { + int numdel = 0, j; + + for (j = 1; j < c->argc; j++) { + if (expireIfNeeded(c->db, c->argv[j], NULL, 0) == KEY_DELETED) + continue; + int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) : + dbSyncDelete(c->db,c->argv[j]); + if (deleted) { + keyModified(c,c->db,c->argv[j],NULL,1); + notifyKeyspaceEvent(NOTIFY_GENERIC, + "del",c->argv[j],c->db->id); + server.dirty++; + numdel++; + } + } + addReplyLongLong(c,numdel); +} + +void delCommand(client *c) { + delGenericCommand(c,server.lazyfree_lazy_user_del); +} + +/* DELEX key [IFEQ match-value|IFNE match-value|IFDEQ match-digest|IFDNE match-digest] + * + * Conditionally removes the specified key. A key is ignored if it does not + * exist. + * If no condition is specified the behavior is the same as DEL command. + * If condition is specified the key must be of STRING type. + * + * IFEQ/IFNE conditions check the match-value against the value of the key + * IFDEQ/IFDNE conditions check the match-digest against the digest of the key's value.*/ +void delexCommand(client *c) { + kvobj *o; + int deleted = 0, should_delete = 0; + + /* If there are no conditions specified we just delete the key */ + if (c->argc == 2) { + delGenericCommand(c, server.lazyfree_lazy_server_del); + return; + } + + /* If we have more than two arguments the next two are condition and + * match-value */ + if (c->argc != 4) { + addReplyErrorArity(c); + return; + } + + robj *key = c->argv[1]; + o = lookupKeyRead(c->db, key); + if (o == NULL) { + addReplyLongLong(c, 0); + return; + } + + /* If any conditions are specified the only supported key type for now is + * string */ + if (o->type != OBJ_STRING) { + addReplyError(c, "Key should be of string type if conditions are specified"); + return; + } + + char *condition = c->argv[2]->ptr; + if (!strcasecmp("ifeq", condition)) { + robj *valueobj = getDecodedObject(o); + sds match_value = c->argv[3]->ptr; + if (sdscmp(valueobj->ptr, match_value) == 0) + should_delete = 1; + + decrRefCount(valueobj); + } else if (!strcasecmp("ifne", condition)) { + robj *valueobj = getDecodedObject(o); + sds match_value = c->argv[3]->ptr; + if (sdscmp(valueobj->ptr, match_value) != 0) + should_delete = 1; + + decrRefCount(valueobj); + } else if (!strcasecmp("ifdeq", condition)) { + if (validateHexDigest(c, c->argv[3]->ptr) != C_OK) + return; + + sds current_digest = stringDigest(o); + if (strcasecmp(current_digest, c->argv[3]->ptr) == 0) + should_delete = 1; + + sdsfree(current_digest); + } else if (!strcasecmp("ifdne", condition)) { + if (validateHexDigest(c, c->argv[3]->ptr) != C_OK) + return; + + sds current_digest = stringDigest(o); + if (strcasecmp(current_digest, c->argv[3]->ptr) != 0) + should_delete = 1; + + sdsfree(current_digest); + } else { + addReplyError(c, "Invalid condition. Use IFEQ, IFNE, IFDEQ, or IFDNE"); + return; + } + + if (should_delete) { + deleted = server.lazyfree_lazy_server_del ? + dbAsyncDelete(c->db, key) : + dbSyncDelete(c->db, key); + } + + if (deleted) { + rewriteClientCommandVector(c, 2, shared.del, key); + keyModified(c, c->db, key, NULL, 1); + notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id); + server.dirty++; + } + + addReplyLongLong(c, deleted); +} + +void unlinkCommand(client *c) { + delGenericCommand(c,1); +} + +/* EXISTS key1 key2 ... key_N. + * Return value is the number of keys existing. */ +void existsCommand(client *c) { + long long count = 0; + int j; + + for (j = 1; j < c->argc; j++) { + if (lookupKeyReadWithFlags(c->db,c->argv[j],LOOKUP_NOTOUCH)) count++; + } + addReplyLongLong(c,count); +} + +void selectCommand(client *c) { + int id; + + if (getIntFromObjectOrReply(c, c->argv[1], &id, NULL) != C_OK) + return; + + if (server.cluster_enabled && id != 0) { + addReplyError(c,"SELECT is not allowed in cluster mode"); + return; + } + + if (id != 0) { + server.stat_cluster_incompatible_ops++; + } + + if (selectDb(c,id) == C_ERR) { + addReplyError(c,"DB index is out of range"); + } else { + addReply(c,shared.ok); + } +} + +void randomkeyCommand(client *c) { + robj *key; + + if ((key = dbRandomKey(c->db)) == NULL) { + addReplyNull(c); + return; + } + + addReplyBulk(c,key); + decrRefCount(key); +} + +void keysCommand(client *c) { + dictEntry *de; + sds pattern = c->argv[1]->ptr; + int plen = sdslen(pattern), allkeys, pslot = -1; + unsigned long numkeys = 0; + void *replylen = addReplyDeferredLen(c); + allkeys = (pattern[0] == '*' && plen == 1); + if (server.cluster_enabled && !allkeys) { + pslot = patternHashSlot(pattern, plen); + } + int has_slot = pslot != -1; + union { + kvstoreDictIterator kvs_di; + kvstoreIterator kvs_it; + } it; + if (has_slot) { + if (!kvstoreDictSize(c->db->keys, pslot) || accessKeysShouldSkipDictIndex(pslot)) { + /* Requested slot is empty */ + setDeferredArrayLen(c,replylen,0); + return; + } + kvstoreInitDictSafeIterator(&it.kvs_di, c->db->keys, pslot); + } else { + kvstoreIteratorInit(&it.kvs_it, c->db->keys); + } + + while ((de = has_slot ? kvstoreDictIteratorNext(&it.kvs_di) : kvstoreIteratorNext(&it.kvs_it)) != NULL) { + if (!has_slot && accessKeysShouldSkipDictIndex(kvstoreIteratorGetCurrentDictIndex(&it.kvs_it))) { + continue; + } + + kvobj *kv = dictGetKV(de); + sds key = kvobjGetKey(kv); + + if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) { + if (!keyIsExpired(c->db, NULL, kv)) { + addReplyBulkCBuffer(c, key, sdslen(key)); + numkeys++; + } + } + if (c->flags & CLIENT_CLOSE_ASAP) + break; + } + if (has_slot) + kvstoreResetDictIterator(&it.kvs_di); + else + kvstoreIteratorReset(&it.kvs_it); + setDeferredArrayLen(c,replylen,numkeys); +} + +/* Data used by the dict scan callback. */ +typedef struct { + list *keys; /* elements that collect from dict */ + robj *o; /* o must be a hash/set/zset object, NULL means current db */ + long long type; /* the particular type when scan the db */ + sds pattern; /* pattern string, NULL means no pattern */ + long sampled; /* cumulative number of keys sampled */ + int no_values; /* set to 1 means to return keys only */ + sds typename; /* typename string, NULL means no type filter */ + redisDb *db; /* database reference for expiration checks */ +} scanData; + +/* Helper function to compare key type in scan commands */ +int objectTypeCompare(robj *o, long long target) { + if (o->type != OBJ_MODULE) { + if (o->type != target) + return 0; + else + return 1; + } + /* module type compare */ + moduleType *type = ((moduleValue *)o->ptr)->type; + long long mt = (long long)REDISMODULE_TYPE_SIGN(type->entity.id); + if (target != -mt) + return 0; + else + return 1; +} +/* This callback is used by scanGenericCommand in order to collect elements + * returned by the dictionary iterator into a list. */ +void scanCallback(void *privdata, const dictEntry *de, dictEntryLink plink) { + UNUSED(plink); + Entry *hashEntry = NULL; + scanData *data = (scanData *)privdata; + list *keys = data->keys; + robj *o = data->o; + sds val = NULL; + void *key = NULL; /* if OBJ_HASH then key is of type `hfield`. Otherwise, `sds` */ + void *keyStr; + data->sampled++; + + /* o and typename can not have values at the same time. */ + serverAssert(!((data->type != LLONG_MAX) && o)); + + kvobj *kv = NULL; + zskiplistNode *znode = NULL; + if (!o) { /* If scanning keyspace */ + kv = dictGetKV(de); + keyStr = kvobjGetKey(kv); + } else if (o->type == OBJ_HASH) { + hashEntry = dictGetKey(de); + keyStr = entryGetField(hashEntry); + } else if (o->type == OBJ_ZSET) { + znode = dictGetKey(de); + keyStr = zslGetNodeElement(znode); + } else { + keyStr = dictGetKey(de); + } + + /* Filter element if it does not match the pattern. */ + if (data->pattern) { + if (!stringmatchlen(data->pattern, sdslen(data->pattern), keyStr, sdslen(keyStr), 0)) { + return; + } + } + + if (!o) { + /* Expiration check first - only for database keyspace scanning. + * Use kv obj to avoid robj creation. */ + if (expireIfNeeded(data->db, NULL, kv, 0) != KEY_VALID) + return; + + /* Type filtering - only for database keyspace scanning */ + if (data->typename) { + /* For unknown types (LLONG_MAX), skip all keys */ + if (data->type == LLONG_MAX) + return; + /* For known types, skip keys that don't match */ + if (!objectTypeCompare(kv, data->type)) + return; + } + } + + if (o == NULL) { + key = keyStr; + } else if (o->type == OBJ_SET) { + key = keyStr; + } else if (o->type == OBJ_HASH) { + key = keyStr; + val = entryGetValue(hashEntry); + + /* If field is expired, then ignore */ + if (entryIsExpired(hashEntry)) + return; + + } else if (o->type == OBJ_ZSET) { + char buf[MAX_LONG_DOUBLE_CHARS]; + int len = ld2string(buf, sizeof(buf), znode->score, LD_STR_AUTO); + key = sdsdup(keyStr); + val = sdsnewlen(buf, len); + } else { + serverPanic("Type not handled in SCAN callback."); + } + + listAddNodeTail(keys, key); + if (val && !data->no_values) listAddNodeTail(keys, val); +} + +/* Try to parse a SCAN cursor stored at object 'o': + * if the cursor is valid, store it as unsigned integer into *cursor and + * returns C_OK. Otherwise return C_ERR and send an error to the + * client. */ +int parseScanCursorOrReply(client *c, robj *o, unsigned long long *cursor) { + if (!string2ull(o->ptr, cursor)) { + addReplyError(c, "invalid cursor"); + return C_ERR; + } + return C_OK; +} + +char *obj_type_name[OBJ_TYPE_MAX] = { + "string", + "list", + "set", + "zset", + "hash", + NULL, /* module type is special */ + "stream" +}; + +/* Helper function to get type from a string in scan commands */ +long long getObjectTypeByName(char *name) { + + for (long long i = 0; i < OBJ_TYPE_MAX; i++) { + if (obj_type_name[i] && !strcasecmp(name, obj_type_name[i])) { + return i; + } + } + + moduleType *mt = moduleTypeLookupModuleByNameIgnoreCase(name); + if (mt != NULL) return -(REDISMODULE_TYPE_SIGN(mt->entity.id)); + + return LLONG_MAX; +} + +char *getObjectTypeName(robj *o) { + if (o == NULL) { + return "none"; + } + + serverAssert(o->type >= 0 && o->type < OBJ_TYPE_MAX); + + if (o->type == OBJ_MODULE) { + moduleValue *mv = o->ptr; + return mv->type->entity.name; + } else { + return obj_type_name[o->type]; + } +} + +static int scanShouldSkipDict(dict *d, int didx) { + UNUSED(d); + return accessKeysShouldSkipDictIndex(didx); +} + +/* This command implements SCAN, HSCAN and SSCAN commands. + * If object 'o' is passed, then it must be a Hash, Set or Zset object, otherwise + * if 'o' is NULL the command will operate on the dictionary associated with + * the current database. + * + * When 'o' is not NULL the function assumes that the first argument in + * the client arguments vector is a key so it skips it before iterating + * in order to parse options. + * + * In the case of a Hash object the function returns both the field and value + * of every element on the Hash. */ +void scanGenericCommand(client *c, robj *o, unsigned long long cursor) { + int i, j; + listNode *node; + long count = 10; + sds pat = NULL; + sds typename = NULL; + long long type = LLONG_MAX; + int patlen = 0, use_pattern = 0, no_values = 0; + dict *ht; + + /* Object must be NULL (to iterate keys names), or the type of the object + * must be Set, Sorted Set, or Hash. */ + serverAssert(o == NULL || o->type == OBJ_SET || o->type == OBJ_HASH || + o->type == OBJ_ZSET); + + /* Set i to the first option argument. The previous one is the cursor. */ + i = (o == NULL) ? 2 : 3; /* Skip the key argument if needed. */ + + /* Step 1: Parse options. */ + while (i < c->argc) { + j = c->argc - i; + if (!strcasecmp(c->argv[i]->ptr, "count") && j >= 2) { + if (getLongFromObjectOrReply(c, c->argv[i+1], &count, NULL) + != C_OK) + { + return; + } + + if (count < 1) { + addReplyErrorObject(c,shared.syntaxerr); + return; + } + + i += 2; + } else if (!strcasecmp(c->argv[i]->ptr, "match") && j >= 2) { + pat = c->argv[i+1]->ptr; + patlen = sdslen(pat); + + /* The pattern always matches if it is exactly "*", so it is + * equivalent to disabling it. */ + use_pattern = !(patlen == 1 && pat[0] == '*'); + + i += 2; + } else if (!strcasecmp(c->argv[i]->ptr, "type") && o == NULL && j >= 2) { + /* SCAN for a particular type only applies to the db dict */ + typename = c->argv[i+1]->ptr; + type = getObjectTypeByName(typename); + if (type == LLONG_MAX) { + /* TODO: uncomment in redis 8.0 + addReplyErrorFormat(c, "unknown type name '%s'", typename); + return; */ + } + i+= 2; + } else if (!strcasecmp(c->argv[i]->ptr, "novalues")) { + if (!o || o->type != OBJ_HASH) { + addReplyError(c, "NOVALUES option can only be used in HSCAN"); + return; + } + no_values = 1; + i++; + } else { + addReplyErrorObject(c,shared.syntaxerr); + return; + } + } + + /* Step 2: Iterate the collection. + * + * Note that if the object is encoded with a listpack, intset, or any other + * representation that is not a hash table, we are sure that it is also + * composed of a small number of elements. So to avoid taking state we + * just return everything inside the object in a single call, setting the + * cursor to zero to signal the end of the iteration. */ + + /* Handle the case of a hash table. */ + ht = NULL; + if (o == NULL) { + ht = NULL; + } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) { + ht = o->ptr; + } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) { + ht = o->ptr; + } else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) { + zset *zs = o->ptr; + ht = zs->dict; + } + + list *keys = listCreate(); + /* Set a free callback for the contents of the collected keys list. + * For the main keyspace dict, and when we scan a key that's dict encoded + * (we have 'ht'), we don't need to define free method because the strings + * in the list are just a shallow copy from the pointer in the dictEntry. + * When scanning a key with other encodings (e.g. listpack), we need to + * free the temporary strings we add to that list. + * The exception to the above is ZSET, where we do allocate temporary + * strings even when scanning a dict. */ + if (o && (!ht || o->type == OBJ_ZSET)) { + listSetFreeMethod(keys, sdsfreegeneric); + } + + /* For main dictionary scan or data structure using hashtable. */ + if (!o || ht) { + /* We set the max number of iterations to ten times the specified + * COUNT, so if the hash table is in a pathological state (very + * sparsely populated) we avoid to block too much time at the cost + * of returning no or very few elements. */ + long maxiterations = count*10; + + /* We pass scanData which have three pointers to the callback: + * 1. data.keys: the list to which it will add new elements; + * 2. data.o: the object containing the dictionary so that + * it is possible to fetch more data in a type-dependent way; + * 3. data.type: the specified type scan in the db, LLONG_MAX means + * type matching is no needed; + * 4. data.pattern: the pattern string; + * 5. data.sampled: the maxiteration limit is there in case we're + * working on an empty dict, one with a lot of empty buckets, and + * for the buckets are not empty, we need to limit the spampled number + * to prevent a long hang time caused by filtering too many keys; + * 6. data.no_values: to control whether values will be returned or + * only keys are returned. */ + scanData data = { + .keys = keys, + .o = o, + .type = type, + .pattern = use_pattern ? pat : NULL, + .sampled = 0, + .no_values = no_values, + .typename = typename, + .db = c->db, + }; + + /* A pattern may restrict all matching keys to one cluster slot. */ + int onlydidx = -1; + if (o == NULL && use_pattern && server.cluster_enabled) { + onlydidx = patternHashSlot(pat, patlen); + } + do { + /* In cluster mode there is a separate dictionary for each slot. + * If cursor is empty, we should try exploring next non-empty slot. */ + if (o == NULL) { + cursor = kvstoreScan(c->db->keys, cursor, onlydidx, scanCallback, scanShouldSkipDict, &data); + } else { + cursor = dictScan(ht, cursor, scanCallback, &data); + } + } while (cursor && maxiterations-- && data.sampled < count); + } else if (o->type == OBJ_SET) { + unsigned long array_reply_len = 0; + void *replylen = NULL; + listRelease(keys); + char *str; + char buf[LONG_STR_SIZE]; + size_t len; + int64_t llele; + /* Reply to the client. */ + addReplyArrayLen(c, 2); + /* Cursor is always 0 given we iterate over all set */ + addReplyBulkLongLong(c,0); + /* If there is no pattern the length is the entire set size, otherwise we defer the reply size */ + if (use_pattern) + replylen = addReplyDeferredLen(c); + else { + array_reply_len = setTypeSize(o); + addReplyArrayLen(c, array_reply_len); + } + + setTypeIterator si; + unsigned long cur_length = 0; + setTypeInitIterator(&si, o); + while (setTypeNext(&si, &str, &len, &llele) != -1) { + if (str == NULL) { + len = ll2string(buf, sizeof(buf), llele); + } + char *key = str ? str : buf; + if (use_pattern && !stringmatchlen(pat, patlen, key, len, 0)) { + continue; + } + addReplyBulkCBuffer(c, key, len); + cur_length++; + } + setTypeResetIterator(&si); + if (use_pattern) + setDeferredArrayLen(c,replylen,cur_length); + else + serverAssert(cur_length == array_reply_len); /* fail on corrupt data */ + return; + } else if ((o->type == OBJ_HASH || o->type == OBJ_ZSET) && + o->encoding == OBJ_ENCODING_LISTPACK) + { + unsigned char *p = lpFirst(o->ptr); + unsigned char *str; + int64_t len; + unsigned long array_reply_len = 0; + unsigned char intbuf[LP_INTBUF_SIZE]; + void *replylen = NULL; + listRelease(keys); + + /* Reply to the client. */ + addReplyArrayLen(c, 2); + /* Cursor is always 0 given we iterate over all set */ + addReplyBulkLongLong(c,0); + /* If there is no pattern the length is the entire set size, otherwise we defer the reply size */ + if (use_pattern) + replylen = addReplyDeferredLen(c); + else { + array_reply_len = o->type == OBJ_HASH ? hashTypeLength(o, 0) : zsetLength(o); + if (!no_values) { + array_reply_len *= 2; + } + addReplyArrayLen(c, array_reply_len); + } + unsigned long cur_length = 0; + while(p) { + str = lpGet(p, &len, intbuf); + /* point to the value */ + p = lpNext(o->ptr, p); + if (use_pattern && !stringmatchlen(pat, patlen, (char *)str, len, 0)) { + /* jump to the next key/val pair */ + p = lpNext(o->ptr, p); + continue; + } + /* add key object */ + addReplyBulkCBuffer(c, str, len); + cur_length++; + /* add value object */ + if (!no_values) { + str = lpGet(p, &len, intbuf); + addReplyBulkCBuffer(c, str, len); + cur_length++; + } + p = lpNext(o->ptr, p); + } + if (use_pattern) + setDeferredArrayLen(c,replylen,cur_length); + else + serverAssert(cur_length == array_reply_len); /* fail on corrupt data */ + return; + } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_LISTPACK_EX) { + int64_t len; + long long expire_at; + unsigned char *lp = hashTypeListpackGetLp(o); + unsigned char *p = lpFirst(lp); + unsigned char *str, *val; + unsigned char intbuf[LP_INTBUF_SIZE]; + void *replylen = NULL; + + listRelease(keys); + /* Reply to the client. */ + addReplyArrayLen(c, 2); + /* Cursor is always 0 given we iterate over all set */ + addReplyBulkLongLong(c,0); + /* In the case of OBJ_ENCODING_LISTPACK_EX we always defer the reply size given some fields might be expired */ + replylen = addReplyDeferredLen(c); + unsigned long cur_length = 0; + + while (p) { + str = lpGet(p, &len, intbuf); + p = lpNext(lp, p); + val = p; /* Keep pointer to value */ + + p = lpNext(lp, p); + serverAssert(p && lpGetIntegerValue(p, &expire_at)); + + if (hashTypeIsExpired(o, expire_at) || + (use_pattern && !stringmatchlen(pat, patlen, (char *)str, len, 0))) + { + /* jump to the next key/val pair */ + p = lpNext(lp, p); + continue; + } + + /* add key object */ + addReplyBulkCBuffer(c, str, len); + cur_length++; + /* add value object */ + if (!no_values) { + str = lpGet(val, &len, intbuf); + addReplyBulkCBuffer(c, str, len); + cur_length++; + } + p = lpNext(lp, p); + } + setDeferredArrayLen(c,replylen,cur_length); + return; + } else { + serverPanic("Not handled encoding in SCAN."); + } + + /* Step 3: Reply to the client. */ + addReplyArrayLen(c, 2); + addReplyBulkLongLong(c,cursor); + + addReplyArrayLen(c, listLength(keys)); + while ((node = listFirst(keys)) != NULL) { + void *key = listNodeValue(node); + addReplyBulkCBuffer(c, key, sdslen(key)); + listDelNode(keys, node); + } + + listRelease(keys); +} + +/* The SCAN command completely relies on scanGenericCommand. */ +void scanCommand(client *c) { + unsigned long long cursor; + if (parseScanCursorOrReply(c,c->argv[1],&cursor) == C_ERR) return; + scanGenericCommand(c,NULL,cursor); +} + +void dbsizeCommand(client *c) { + addReplyLongLong(c,dbSize(c->db)); +} + +void lastsaveCommand(client *c) { + addReplyLongLong(c,server.lastsave); +} + +void typeCommand(client *c) { + kvobj *kv = lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH); + addReplyStatus(c, getObjectTypeName(kv)); +} + +void shutdownCommand(client *c) { + int flags = SHUTDOWN_NOFLAGS; + int abort = 0; + for (int i = 1; i < c->argc; i++) { + if (!strcasecmp(c->argv[i]->ptr,"nosave")) { + flags |= SHUTDOWN_NOSAVE; + } else if (!strcasecmp(c->argv[i]->ptr,"save")) { + flags |= SHUTDOWN_SAVE; + } else if (!strcasecmp(c->argv[i]->ptr, "now")) { + flags |= SHUTDOWN_NOW; + } else if (!strcasecmp(c->argv[i]->ptr, "force")) { + flags |= SHUTDOWN_FORCE; + } else if (!strcasecmp(c->argv[i]->ptr, "abort")) { + abort = 1; + } else { + addReplyErrorObject(c,shared.syntaxerr); + return; + } + } + if ((abort && flags != SHUTDOWN_NOFLAGS) || + (flags & SHUTDOWN_NOSAVE && flags & SHUTDOWN_SAVE)) + { + /* Illegal combo. */ + addReplyErrorObject(c,shared.syntaxerr); + return; + } + + if (abort) { + if (abortShutdown() == C_OK) + addReply(c, shared.ok); + else + addReplyError(c, "No shutdown in progress."); + return; + } + + if (!(flags & SHUTDOWN_NOW) && c->flags & CLIENT_DENY_BLOCKING) { + addReplyError(c, "SHUTDOWN without NOW or ABORT isn't allowed for DENY BLOCKING client"); + return; + } + + if (!(flags & SHUTDOWN_NOSAVE) && isInsideYieldingLongCommand()) { + /* Script timed out. Shutdown allowed only with the NOSAVE flag. See + * also processCommand where these errors are returned. */ + if (server.busy_module_yield_flags && server.busy_module_yield_reply) { + addReplyErrorFormat(c, "-BUSY %s", server.busy_module_yield_reply); + } else if (server.busy_module_yield_flags) { + addReplyErrorObject(c, shared.slowmoduleerr); + } else if (scriptIsEval()) { + addReplyErrorObject(c, shared.slowevalerr); + } else { + addReplyErrorObject(c, shared.slowscripterr); + } + return; + } + + blockClientShutdown(c); + if (prepareForShutdown(flags) == C_OK) exit(0); + /* If we're here, then shutdown is ongoing (the client is still blocked) or + * failed (the client has received an error). */ +} + +void renameGenericCommand(client *c, int nx) { + kvobj *o; + int samekey = 0; + uint64_t minHashExpireTime = EB_EXPIRE_TIME_INVALID; + + /* When source and dest key is the same, no operation is performed, + * if the key exists, however we still return an error on unexisting key. */ + if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) samekey = 1; + + if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL) + return; + + if (samekey) { + addReply(c,nx ? shared.czero : shared.ok); + return; + } + + incrRefCount(o); + kvobj *destval = lookupKeyWrite(c->db,c->argv[2]); + int overwritten = 0; + int desttype = -1; + if (destval != NULL) { + if (nx) { + decrRefCount(o); + addReply(c,shared.czero); + return; + } + + /* Overwrite: delete the old key before creating the new one + * with the same name. */ + desttype = destval->type; + dbDelete(c->db,c->argv[2]); + overwritten = 1; + } + + /* If hash with expiration on fields then remove it from global HFE DS and + * keep next expiration time. Otherwise, dbDelete() will remove it from the + * global HFE DS and we will lose the expiration time. */ + int srctype = o->type; + if (srctype == OBJ_HASH) + minHashExpireTime = estoreRemove(c->db->subexpires, getKeySlot(c->argv[1]->ptr), o); + + /* Prepare metadata for the renamed key */ + KeyMetaSpec keymeta; + keyMetaSpecInit(&keymeta); + if (o->metabits) keyMetaOnRename(c->db, o, c->argv[1], c->argv[2], &keymeta); + + dbDelete(c->db,c->argv[1]); + + dbAddInternal(c->db, c->argv[2], &o, NULL, &keymeta); + + /* If hash with HFEs, register in DB subexpires */ + if (minHashExpireTime != EB_EXPIRE_TIME_INVALID) + estoreAdd(c->db->subexpires, getKeySlot(c->argv[2]->ptr), o, minHashExpireTime); + + keyModified(c,c->db,c->argv[1],NULL,1); + keyModified(c,c->db,c->argv[2],NULL,1); /* LRM already updated by dbAddInternal */ + notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from", + c->argv[1],c->db->id); + notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_to", + c->argv[2],c->db->id); + if (overwritten) { + notifyKeyspaceEvent(NOTIFY_OVERWRITTEN, "overwritten", c->argv[2], c->db->id); + if (desttype != srctype) + notifyKeyspaceEvent(NOTIFY_TYPE_CHANGED, "type_changed", c->argv[2], c->db->id); + } + server.dirty++; + addReply(c,nx ? shared.cone : shared.ok); +} + +void renameCommand(client *c) { + renameGenericCommand(c,0); +} + +void renamenxCommand(client *c) { + renameGenericCommand(c,1); +} + +void moveCommand(client *c) { + redisDb *src, *dst; + int srcid, dbid; + uint64_t hashExpireTime = EB_EXPIRE_TIME_INVALID; + + if (server.cluster_enabled) { + addReplyError(c,"MOVE is not allowed in cluster mode"); + return; + } + + /* Obtain source and target DB pointers */ + src = c->db; + srcid = c->db->id; + + if (getIntFromObjectOrReply(c, c->argv[2], &dbid, NULL) != C_OK) + return; + + if (selectDb(c,dbid) == C_ERR) { + addReplyError(c,"DB index is out of range"); + return; + } + dst = c->db; + selectDb(c,srcid); /* Back to the source DB */ + + /* If the user is moving using as target the same + * DB as the source DB it is probably an error. */ + if (src == dst) { + addReplyErrorObject(c,shared.sameobjecterr); + return; + } + + /* Record incompatible operations in cluster mode */ + server.stat_cluster_incompatible_ops++; + + /* Check if the element exists and get a reference */ + kvobj *kv = lookupKeyWrite(c->db,c->argv[1]); + if (!kv) { + addReply(c,shared.czero); + return; + } + + /* Return zero if the key already exists in the target DB */ + dictEntryLink dstBucket; + if (lookupKey(dst, c->argv[1], LOOKUP_WRITE, &dstBucket) != NULL) { + addReply(c,shared.czero); + return; + } + + int slot = getKeySlot(c->argv[1]->ptr); + + /* If hash with expiration on fields, remove it from DB subexpires and keep + * aside registered expiration time. Must be before removal of the + * object since it embeds ExpireMeta that is used by subexpires */ + if (kv->type == OBJ_HASH) + hashExpireTime = estoreRemove(src->subexpires, slot, kv); + + /* Move a side metadata before dbDelete() */ + KeyMetaSpec keymeta; + keyMetaSpecInit(&keymeta); + keyMetaOnMove(kv, c->argv[1], srcid, dbid, &keymeta); + + incrRefCount(kv); /* ref counter = 1->2 */ + dbDelete(src,c->argv[1]); /* ref counter = 2->1 */ + + dbAddInternal(dst, c->argv[1], &kv, &dstBucket, &keymeta); + + /* If object of type hash with expiration on fields. Taken care to add the + * hash to subexpires of `dst` only after dbDelete(). */ + if (hashExpireTime != EB_EXPIRE_TIME_INVALID) + estoreAdd(dst->subexpires, slot, kv, hashExpireTime); + + keyModified(c,src,c->argv[1],NULL,1); + keyModified(c,dst,c->argv[1],NULL,1); /* LRM already updated by dbAddInternal */ + notifyKeyspaceEvent(NOTIFY_GENERIC, + "move_from",c->argv[1],src->id); + notifyKeyspaceEvent(NOTIFY_GENERIC, + "move_to",c->argv[1],dst->id); + + server.dirty++; + addReply(c,shared.cone); +} + +void copyCommand(client *c) { + kvobj *o; + redisDb *src, *dst; + int srcid, dbid; + int j, replace = 0, delete = 0; + + /* Obtain source and target DB pointers + * Default target DB is the same as the source DB + * Parse the REPLACE option and targetDB option. */ + src = c->db; + dst = c->db; + srcid = c->db->id; + dbid = c->db->id; + for (j = 3; j < c->argc; j++) { + int additional = c->argc - j - 1; + if (!strcasecmp(c->argv[j]->ptr,"replace")) { + replace = 1; + } else if (!strcasecmp(c->argv[j]->ptr, "db") && additional >= 1) { + if (getIntFromObjectOrReply(c, c->argv[j+1], &dbid, NULL) != C_OK) + return; + + if (selectDb(c, dbid) == C_ERR) { + addReplyError(c,"DB index is out of range"); + return; + } + dst = c->db; + selectDb(c,srcid); /* Back to the source DB */ + j++; /* Consume additional arg. */ + } else { + addReplyErrorObject(c,shared.syntaxerr); + return; + } + } + + if ((server.cluster_enabled == 1) && (srcid != 0 || dbid != 0)) { + addReplyError(c,"Copying to another database is not allowed in cluster mode"); + return; + } + + /* If the user select the same DB as + * the source DB and using newkey as the same key + * it is probably an error. */ + robj *key = c->argv[1]; + robj *newkey = c->argv[2]; + if (src == dst && (sdscmp(key->ptr, newkey->ptr) == 0)) { + addReplyErrorObject(c,shared.sameobjecterr); + return; + } + + if (srcid != 0 || dbid != 0) { + server.stat_cluster_incompatible_ops++; + } + + /* Check if the element exists and get a reference */ + o = lookupKeyRead(c->db, key); + if (!o) { + addReply(c,shared.czero); + return; + } + + /* Return zero if the key already exists in the target DB. + * If REPLACE option is selected, delete newkey from targetDB. */ + kvobj *destval = lookupKeyWrite(dst,newkey); + if (destval != NULL) { + if (replace) { + delete = 1; + } else { + addReply(c,shared.czero); + return; + } + } + int destoldtype = destval ? destval->type : -1; + int destnewtype = o->type; + + /* Duplicate object according to object's type. */ + robj *newobj; + uint64_t minHashExpire = EB_EXPIRE_TIME_INVALID; /* HFE feature */ + switch(o->type) { + case OBJ_STRING: newobj = dupStringObject(o); break; + case OBJ_LIST: newobj = listTypeDup(o); break; + case OBJ_SET: newobj = setTypeDup(o); break; + case OBJ_ZSET: newobj = zsetDup(o); break; + case OBJ_HASH: newobj = hashTypeDup(o, &minHashExpire); break; + case OBJ_STREAM: newobj = streamDup(o); break; + case OBJ_MODULE: + newobj = moduleTypeDupOrReply(c, key, newkey, dst->id, o); + if (!newobj) return; + break; + default: + addReplyError(c, "unknown type object"); + return; + } + + if (delete) { + dbDelete(dst,newkey); + } + + /* Prepare metadata for the new key */ + KeyMetaSpec keymeta; + keyMetaSpecInit(&keymeta); + if (o->metabits) keyMetaOnCopy(o, key, newkey, c->db->id, dst->id, &keymeta); + + kvobj *kvCopy = dbAddInternal(dst, newkey, &newobj, NULL, &keymeta); + + /* If minExpiredField was set, then the object is hash with expiration + * on fields and need to register it in global HFE DS */ + if (minHashExpire != EB_EXPIRE_TIME_INVALID) + estoreAdd(dst->subexpires, getKeySlot(newkey->ptr), kvCopy, minHashExpire); + + /* OK! key copied. Signal modification (LRM already updated by dbAddInternal) */ + keyModified(c,dst,c->argv[2],NULL,1); + notifyKeyspaceEvent(NOTIFY_GENERIC,"copy_to",c->argv[2],dst->id); + + /* `delete` implies the destination key was overwritten */ + if (delete) { + notifyKeyspaceEvent(NOTIFY_OVERWRITTEN, "overwritten", c->argv[2], dst->id); + if (destoldtype != destnewtype) + notifyKeyspaceEvent(NOTIFY_TYPE_CHANGED, "type_changed", c->argv[2], dst->id); + } + + server.dirty++; + addReply(c,shared.cone); +} + +/* Helper function for dbSwapDatabases(): scans the list of keys that have + * one or more blocked clients for B[LR]POP or other blocking commands + * and signal the keys as ready if they are of the right type. See the comment + * where the function is used for more info. */ +void scanDatabaseForReadyKeys(redisDb *db) { + dictEntry *de; + dictIterator di; + dictInitSafeIterator(&di, db->blocking_keys); + while((de = dictNext(&di)) != NULL) { + robj *key = dictGetKey(de); + kvobj *kv = dbFind(db, key->ptr); + if (kv) + signalKeyAsReady(db, key, kv->type); + } + dictResetIterator(&di); +} + +/* Since we are unblocking XREADGROUP clients in the event the key was + * deleted/overwritten we must do the same in case the database was + * flushed/swapped. If 'slots' is not NULL, only keys in the specified slot + * range are considered. */ +void scanDatabaseForDeletedKeys(redisDb *emptied, redisDb *replaced_with, slotRangeArray *slots) { + dictEntry *de; + dictIterator di; + + dictInitSafeIterator(&di, emptied->blocking_keys); + while((de = dictNext(&di)) != NULL) { + robj *key = dictGetKey(de); + /* Check if key belongs to the slot range. */ + if (slots && !slotRangeArrayContains(slots, keyHashSlot(key->ptr, sdslen(key->ptr)))) + continue; + int existed = 0, exists = 0; + int original_type = -1, curr_type = -1; + + kvobj *kv = dbFind(emptied, key->ptr); + if (kv) { + original_type = kv->type; + existed = 1; + } + + if (replaced_with) { + kv = dbFind(replaced_with, key->ptr); + if (kv) { + curr_type = kv->type; + exists = 1; + } + } + /* We want to try to unblock any client using a blocking XREADGROUP */ + if ((existed && !exists) || original_type != curr_type) + signalDeletedKeyAsReady(emptied, key, original_type); + } + dictResetIterator(&di); +} + +/* Swap two databases at runtime so that all clients will magically see + * the new database even if already connected. Note that the client + * structure c->db points to a given DB, so we need to be smarter and + * swap the underlying referenced structures, otherwise we would need + * to fix all the references to the Redis DB structure. + * + * Returns C_ERR if at least one of the DB ids are out of range, otherwise + * C_OK is returned. */ +int dbSwapDatabases(int id1, int id2) { + if (id1 < 0 || id1 >= server.dbnum || + id2 < 0 || id2 >= server.dbnum) return C_ERR; + if (id1 == id2) return C_OK; + redisDb aux = server.db[id1]; + redisDb *db1 = &server.db[id1], *db2 = &server.db[id2]; + + /* Swapdb should make transaction fail if there is any + * client watching keys */ + touchAllWatchedKeysInDb(db1, db2, NULL); + touchAllWatchedKeysInDb(db2, db1, NULL); + + /* Try to unblock any XREADGROUP clients if the key no longer exists. */ + scanDatabaseForDeletedKeys(db1, db2, NULL); + scanDatabaseForDeletedKeys(db2, db1, NULL); + + /* Swap hash tables. Note that we don't swap blocking_keys, + * ready_keys and watched_keys, since we want clients to + * remain in the same DB they were. */ + db1->keys = db2->keys; + db1->expires = db2->expires; + db1->subexpires = db2->subexpires; + db1->avg_ttl = db2->avg_ttl; + db1->expires_cursor = db2->expires_cursor; + + db2->keys = aux.keys; + db2->expires = aux.expires; + db2->subexpires = aux.subexpires; + db2->avg_ttl = aux.avg_ttl; + db2->expires_cursor = aux.expires_cursor; + + /* Now we need to handle clients blocked on lists: as an effect + * of swapping the two DBs, a client that was waiting for list + * X in a given DB, may now actually be unblocked if X happens + * to exist in the new version of the DB, after the swap. + * + * However normally we only do this check for efficiency reasons + * in dbAdd() when a list is created. So here we need to rescan + * the list of clients blocked on lists and signal lists as ready + * if needed. */ + scanDatabaseForReadyKeys(db1); + scanDatabaseForReadyKeys(db2); + return C_OK; +} + +/* Logically, this discards (flushes) the old main database, and apply the newly loaded + * database (temp) as the main (active) database, the actual freeing of old database + * (which will now be placed in the temp one) is done later. */ +void swapMainDbWithTempDb(redisDb *tempDb) { + for (int i=0; i<server.dbnum; i++) { + redisDb aux = server.db[i]; + redisDb *activedb = &server.db[i], *newdb = &tempDb[i]; + + /* Swapping databases should make transaction fail if there is any + * client watching keys. */ + touchAllWatchedKeysInDb(activedb, newdb, NULL); + + /* Try to unblock any XREADGROUP clients if the key no longer exists. */ + scanDatabaseForDeletedKeys(activedb, newdb, NULL); + + /* Swap hash tables. Note that we don't swap blocking_keys, + * ready_keys and watched_keys, since clients + * remain in the same DB they were. */ + activedb->keys = newdb->keys; + activedb->expires = newdb->expires; + activedb->subexpires = newdb->subexpires; + activedb->avg_ttl = newdb->avg_ttl; + activedb->expires_cursor = newdb->expires_cursor; + + newdb->keys = aux.keys; + newdb->expires = aux.expires; + newdb->subexpires = aux.subexpires; + newdb->avg_ttl = aux.avg_ttl; + newdb->expires_cursor = aux.expires_cursor; + + /* Now we need to handle clients blocked on lists: as an effect + * of swapping the two DBs, a client that was waiting for list + * X in a given DB, may now actually be unblocked if X happens + * to exist in the new version of the DB, after the swap. + * + * However normally we only do this check for efficiency reasons + * in dbAdd() when a list is created. So here we need to rescan + * the list of clients blocked on lists and signal lists as ready + * if needed. */ + scanDatabaseForReadyKeys(activedb); + } + + trackingInvalidateKeysOnFlush(1); + flushSlaveKeysWithExpireList(); +} + +/* SWAPDB db1 db2 */ +void swapdbCommand(client *c) { + int id1, id2; + + /* Not allowed in cluster mode: we have just DB 0 there. */ + if (server.cluster_enabled) { + addReplyError(c,"SWAPDB is not allowed in cluster mode"); + return; + } + + /* Get the two DBs indexes. */ + if (getIntFromObjectOrReply(c, c->argv[1], &id1, + "invalid first DB index") != C_OK) + return; + + if (getIntFromObjectOrReply(c, c->argv[2], &id2, + "invalid second DB index") != C_OK) + return; + + /* Swap... */ + if (dbSwapDatabases(id1,id2) == C_ERR) { + addReplyError(c,"DB index is out of range"); + return; + } else { + RedisModuleSwapDbInfo si = {REDISMODULE_SWAPDBINFO_VERSION,id1,id2}; + moduleFireServerEvent(REDISMODULE_EVENT_SWAPDB,0,&si); + server.dirty++; + server.stat_cluster_incompatible_ops++; + addReply(c,shared.ok); + } +} + +/*----------------------------------------------------------------------------- + * Expires API + *----------------------------------------------------------------------------*/ + +/* Remove expiry from key + * + * Remove the object from db->expires and set to -1 attached TTL to KV + */ +int removeExpire(redisDb *db, robj *key) { + int table; + int slot = getKeySlot(key->ptr); + dictEntryLink link = kvstoreDictTwoPhaseUnlinkFind(db->expires, slot, key->ptr, &table); + + if (link == NULL) return 0; + dictEntry *de = *link; + kvobj *kv = dictGetKV(de); + kvobj *newkv = kvobjSetExpire(kv, -1); + serverAssert(newkv == kv); + kvstoreDictTwoPhaseUnlinkFree(db->expires, slot, link, table); + return 1; +} + + +/* Set an expire to the specified key. If the expire is set in the context + * of an user calling a command 'c' is the client, otherwise 'c' is set + * to NULL. The 'when' parameter is the absolute unix time in milliseconds + * after which the key will no longer be considered valid. + * + * Note: It may reallocate kvobj. The returned ref may point to a new object. */ +kvobj *setExpire(client *c, redisDb *db, robj *key, long long when) { + return setExpireByLink(c,db,key->ptr,when,NULL); +} + +/* Like setExpire(), but accepts an optional `keyLink` to save lookup */ +kvobj *setExpireByLink(client *c, redisDb *db, sds key, long long when, dictEntryLink keyLink) { + /* Reuse the sds from the main dict in the expire dict */ + int slot = getKeySlot(key); + size_t oldsize = 0; + if (!keyLink) { + keyLink = kvstoreDictFindLink(db->keys, slot, key, NULL); + serverAssert(keyLink != NULL); + } + kvobj *kv = dictGetKV(*keyLink); + long long old_when = kvobjGetExpire(kv); + + if (old_when != -1) { /* old expire */ + kvobj *kvnew = kvobjSetExpire(kv, when); /* release kv if reallocated */ + /* Val already had an expire field, so it was not reallocated. */ + serverAssert(kv == kvnew); + } else { /* No old expire */ + if (server.memory_tracking_per_slot) + oldsize = kvobjAllocSize(kv); + uint64_t subexpiry = EB_EXPIRE_TIME_INVALID; + /* If hash with HFEs, take care to remove from global HFE DS before attempting + * to manipulate and maybe free kv object */ + if (kv->type == OBJ_HASH) + subexpiry = estoreRemove(db->subexpires, slot, kv); + + kvobj *kvnew = kvobjSetExpire(kv, when); /* release kv if reallocated */ + /* if kvobj was reallocated, update dict */ + if (kv != kvnew) { + kvstoreDictSetAtLink(db->keys, slot, kvnew, &keyLink, 0); + if (server.memory_tracking_per_slot) + updateSlotAllocSize(db, slot, oldsize, kvobjAllocSize(kvnew)); + kv = kvnew; + } + /* Now add to expires */ + dictEntry *de = kvstoreDictAddRaw(db->expires, slot, kv, NULL); + serverAssert(de != NULL); + + if (subexpiry != EB_EXPIRE_TIME_INVALID) + estoreAdd(db->subexpires, slot, kv, subexpiry); + } + + int writable_slave = server.masterhost && server.repl_slave_ro == 0; + if (c && writable_slave && !(c->flags & CLIENT_MASTER)) + rememberSlaveKeyWithExpire(db,key); + return kv; +} + +/* Retrieve the expiration time for the specified key. + * Returns -1 if the key has no expiration set or doesn't exists + * + * To avoid lookup, pass key-value object (`kv`) instead of `key`. + */ +long long getExpire(redisDb *db, sds key, kvobj *kv) { + if (kv == NULL) kv = dbFindExpires(db, key); + if (kv == NULL) return -1; + return kvobjGetExpire(kv); +} + +/* Delete the specified expired or evicted key and propagate to replicas. + * Currently notify_type can only be NOTIFY_EXPIRED or NOTIFY_EVICTED, + * and it affects other aspects like the latency monitor event name and, + * which config to look for lazy free, stats var to increment, and so on. + * + * key_mem_freed is an out parameter which contains the estimated + * amount of memory freed due to the trimming (may be NULL) */ +static void deleteKeyAndPropagate(redisDb *db, robj *keyobj, int notify_type, long long *key_mem_freed) { + mstime_t latency; + int del_flag = notify_type == NOTIFY_EXPIRED ? DB_FLAG_KEY_EXPIRED : DB_FLAG_KEY_EVICTED; + int lazy_flag = notify_type == NOTIFY_EXPIRED ? server.lazyfree_lazy_expire : server.lazyfree_lazy_eviction; + char *latency_name = notify_type == NOTIFY_EXPIRED ? "expire-del" : "evict-del"; + char *notify_name = notify_type == NOTIFY_EXPIRED ? "expired" : "evicted"; + + /* The key needs to be converted from static to heap before deleted */ + int static_key = keyobj->refcount == OBJ_STATIC_REFCOUNT; + if (static_key) { + keyobj = createStringObject(keyobj->ptr, sdslen(keyobj->ptr)); + } + + serverLog(LL_DEBUG,"key %s %s: deleting it", (char*)keyobj->ptr, notify_type == NOTIFY_EXPIRED ? "expired" : "evicted"); + + /* We compute the amount of memory freed by db*Delete() alone. + * It is possible that actually the memory needed to propagate + * the DEL in AOF and replication link is greater than the one + * we are freeing removing the key, but we can't account for + * that otherwise we would never exit the loop. + * + * Same for CSC invalidation messages generated by keyModified. + * + * AOF and Output buffer memory will be freed eventually so + * we only care about memory used by the key space. + * + * The code here used to first propagate and then record delta + * using only zmalloc_used_memory but in CRDT we can't do that + * so we use freeMemoryGetNotCountedMemory to avoid counting + * AOF and slave buffers */ + if (key_mem_freed) *key_mem_freed = (long long) zmalloc_used_memory() - freeMemoryGetNotCountedMemory(); + latencyStartMonitor(latency); + dbGenericDelete(db, keyobj, lazy_flag, del_flag); + latencyEndMonitor(latency); + latencyAddSampleIfNeeded(latency_name, latency); + if (key_mem_freed) *key_mem_freed -= (long long) zmalloc_used_memory() - freeMemoryGetNotCountedMemory(); + + notifyKeyspaceEvent(notify_type, notify_name,keyobj, db->id); + keyModified(NULL, db, keyobj, NULL, 1); + propagateDeletion(db, keyobj, lazy_flag); + + if (notify_type == NOTIFY_EXPIRED) + server.stat_expiredkeys++; + else + server.stat_evictedkeys++; + + if (static_key) + decrRefCount(keyobj); +} + +/* Delete the specified expired key and propagate. */ +void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) { + deleteKeyAndPropagate(db, keyobj, NOTIFY_EXPIRED, NULL); +} + +/* Delete the specified evicted key and propagate. */ +void deleteEvictedKeyAndPropagate(redisDb *db, robj *keyobj, long long *key_mem_freed) { + deleteKeyAndPropagate(db, keyobj, NOTIFY_EVICTED, key_mem_freed); +} + +/* Propagate an implicit key deletion into replicas and the AOF file. + * When a key was deleted in the master by eviction, expiration or a similar + * mechanism a DEL/UNLINK operation for this key is sent + * to all the replicas and the AOF file if enabled. + * + * This way the key deletion is centralized in one place, and since both + * AOF and the replication link guarantee operation ordering, everything + * will be consistent even if we allow write operations against deleted + * keys. + * + * This function may be called from: + * 1. Within call(): Example: Lazy-expire on key access. + * In this case the caller doesn't have to do anything + * because call() handles server.also_propagate(); or + * 2. Outside of call(): Example: Active-expire, eviction, slot ownership changed. + * In this the caller must remember to call + * postExecutionUnitOperations, preferably just after a + * single deletion batch, so that DEL/UNLINK will NOT be wrapped + * in MULTI/EXEC */ +void propagateDeletion(redisDb *db, robj *key, int lazy) { + robj *argv[2]; + + argv[0] = lazy ? shared.unlink : shared.del; + argv[1] = key; + incrRefCount(argv[0]); + incrRefCount(argv[1]); + + /* If the master decided to delete a key we must propagate it to replicas no matter what. + * Even if module executed a command without asking for propagation. */ + int prev_replication_allowed = server.replication_allowed; + server.replication_allowed = 1; + alsoPropagate(db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL); + server.replication_allowed = prev_replication_allowed; + + decrRefCount(argv[0]); + decrRefCount(argv[1]); +} + +/* Check if the key is expired + * + * Provide either the key name for a lookup or KV object (to save lookup) + */ +int keyIsExpired(redisDb *db, sds key, kvobj *kv) { + /* Don't expire anything while loading. It will be done later. */ + if (server.loading || server.allow_access_expired) return 0; + mstime_t when = getExpire(db, key, kv); + if (when < 0) return 0; /* No expire for this key */ + const mstime_t now = commandTimeSnapshot(); + /* The key expired if the current (virtual or real) time is greater + * than the expire time of the key. */ + return now > when; +} + +/* Check if user configuration allows key to be deleted due to expiary */ +int confAllowsExpireDel(void) { + if (server.lazyexpire_nested_arbitrary_keys) + return 1; + + /* This configuration specifically targets nested commands, to align with RE's feature of replication between dbs. + * transactions (from scripts or multi-exec) containing commands like SCAN and RANDOMKEY will execute locally, but their + * lazy-expiration DELs may induce CROSS-SLOT on remote proxy in mode replica-of (RED-161574) */ + return !(server.execution_nesting > 1 && server.executing_client->cmd->flags & CMD_TOUCHES_ARBITRARY_KEYS); +} + +/* This function is called when we are going to perform some operation + * in a given key, but such key may be already logically expired even if + * it still exists in the database. The main way this function is called + * is via lookupKey*() family of functions. + * + * The behavior of the function depends on the replication role of the + * instance, because by default replicas do not delete expired keys. They + * wait for DELs from the master for consistency matters. However even + * replicas will try to have a coherent return value for the function, + * so that read commands executed in the replica side will be able to + * behave like if the key is expired even if still present (because the + * master has yet to propagate the DEL). + * + * In masters as a side effect of finding a key which is expired, such + * key will be evicted from the database. Also this may trigger the + * propagation of a DEL/UNLINK command in AOF / replication stream. + * + * On replicas, this function does not delete expired keys by default, but + * it still returns KEY_EXPIRED if the key is logically expired. To force deletion + * of logically expired keys even on replicas, use the EXPIRE_FORCE_DELETE_EXPIRED + * flag. Note though that if the current client is executing + * replicated commands from the master, keys are never considered expired. + * + * On the other hand, if you just want expiration check, but need to avoid + * the actual key deletion and propagation of the deletion, use the + * EXPIRE_AVOID_DELETE_EXPIRED flag. If also needed to read expired key (that + * hasn't being deleted yet) then use EXPIRE_ALLOW_ACCESS_EXPIRED. + * + * The return value of the function is KEY_VALID if the key is still valid. + * The function returns KEY_EXPIRED if the key is expired BUT not deleted, + * or returns KEY_DELETED if the key is expired and deleted. If the key is in a + * trim job due to slot migration, the function returns KEY_TRIMMED, unless + * EXPIRE_ALLOW_ACCESS_TRIMMED is set, in which case it returns KEY_VALID. + * + * You can optionally pass `kv` to save a lookup. + */ +keyStatus expireIfNeeded(redisDb *db, robj *key, kvobj *kv, int flags) { + debugAssert(key != NULL || kv != NULL); + + /* NOTE: Keys in slots scheduled for trimming can still exist for a while. + * We don't delete it here, return KEY_VALID if allowing access to trimmed + * keys, and return KEY_TRIMMED otherwise. */ + sds key_name = key ? key->ptr : kvobjGetKey(kv); + if (asmIsKeyInTrimJob(key_name)) { + if (server.allow_access_trimmed || (flags & EXPIRE_ALLOW_ACCESS_TRIMMED)) + return KEY_VALID; + + return KEY_TRIMMED; + } + + if ((flags & EXPIRE_ALLOW_ACCESS_EXPIRED) || + (!keyIsExpired(db, key ? key->ptr : NULL, kv))) + return KEY_VALID; + + /* If we are running in the context of a replica, instead of + * evicting the expired key from the database, we return ASAP: + * the replica key expiration is controlled by the master that will + * send us synthesized DEL operations for expired keys. The + * exception is when write operations are performed on writable + * replicas. + * + * In cluster mode, we also return ASAP if we are importing data + * from the source, to avoid deleting keys that are still in use. + * We create a fake master client for data import, which can be + * identified using the CLIENT_MASTER flag. + * + * Still we try to return the right information to the caller, + * that is, KEY_VALID if we think the key should still be valid, + * KEY_EXPIRED if we think the key is expired but don't want to delete it at this time. + * + * When replicating commands from the master, keys are never considered + * expired. */ + if (server.masterhost != NULL || server.cluster_enabled) { + if (server.current_client && (server.current_client->flags & CLIENT_MASTER)) return KEY_VALID; + if (server.masterhost != NULL && !(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return KEY_EXPIRED; + } + + /* Check if user configuration disables lazy-expire deletions in current state. + * This will only apply if the server doesn't mandate key deletion to operate correctly (write commands). */ + if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED) && !confAllowsExpireDel()) + return KEY_EXPIRED; + + /* In some cases we're explicitly instructed to return an indication of a + * missing key without actually deleting it, even on masters. */ + if (flags & EXPIRE_AVOID_DELETE_EXPIRED) + return KEY_EXPIRED; + + /* If 'expire' action is paused, for whatever reason, then don't expire any key. + * Typically, at the end of the pause we will properly expire the key OR we + * will have failed over and the new primary will send us the expire. */ + if (isPausedActionsWithUpdate(PAUSE_ACTION_EXPIRE)) return KEY_EXPIRED; + + /* Perform deletion */ + if (key) { + deleteExpiredKeyAndPropagate(db, key); + } else { + sds keyname = kvobjGetKey(kv); + robj *tmpkey = createStringObject(keyname, sdslen(keyname)); + deleteExpiredKeyAndPropagate(db, tmpkey); + decrRefCount(tmpkey); + } + return KEY_DELETED; +} + +/* CB passed to kvstoreExpand. + * The purpose is to skip expansion of unused dicts in cluster mode (all + * dicts not mapped to *my* slots) */ +static int dbExpandSkipSlot(int slot) { + return !clusterNodeCoversSlot(getMyClusterNode(), slot); +} + +/* + * This functions increases size of the main/expires db to match desired number. + * In cluster mode resizes all individual dictionaries for slots that this node owns. + * + * Based on the parameter `try_expand`, appropriate dict expand API is invoked. + * if try_expand is set to 1, `dictTryExpand` is used else `dictExpand`. + * The return code is either `DICT_OK`/`DICT_ERR` for both the API(s). + * `DICT_OK` response is for successful expansion. However ,`DICT_ERR` response signifies failure in allocation in + * `dictTryExpand` call and in case of `dictExpand` call it signifies no expansion was performed. + */ +static int dbExpandGeneric(kvstore *kvs, uint64_t db_size, int try_expand) { + int ret; + if (server.cluster_enabled) { + /* We don't know exact number of keys that would fall into each slot, but we can + * approximate it, assuming even distribution, divide it by the number of slots. */ + int slots = getMyShardSlotCount(); + if (slots == 0) return C_OK; + db_size = db_size / slots; + ret = kvstoreExpand(kvs, db_size, try_expand, dbExpandSkipSlot); + } else { + ret = kvstoreExpand(kvs, db_size, try_expand, NULL); + } + + return ret? C_OK : C_ERR; +} + +int dbExpand(redisDb *db, uint64_t db_size, int try_expand) { + return dbExpandGeneric(db->keys, db_size, try_expand); +} + +int dbExpandExpires(redisDb *db, uint64_t db_size, int try_expand) { + return dbExpandGeneric(db->expires, db_size, try_expand); +} + +static kvobj *dbFindGeneric(kvstore *kvs, sds key) { + dictEntry *res = kvstoreDictFind(kvs, getKeySlot(key), key); + return (res) ? dictGetKey(res) : NULL; +} + +kvobj *dbFind(redisDb *db, sds key) { + return dbFindGeneric(db->keys, key); +} + +/* Find a KV in the main db. Return also link to it. + * + * plink - If found, set to the link of the key in the dict. + * If not found, set to the bucket where the key should be added. + * If set to NULL, then HT of dict not allocated yet. + */ +kvobj *dbFindByLink(redisDb *db, sds key, dictEntryLink *plink) { + int slot = getKeySlot(key); + dictEntryLink link, bucket; + + link = kvstoreDictFindLink(db->keys, slot, key, &bucket); + if (link == NULL) { + if (plink) *plink = bucket; + return NULL; + } else { + if (plink) *plink = link; + return dictGetKV(*link); + } +} + +kvobj *dbFindExpires(redisDb *db, sds key) { + return dbFindGeneric(db->expires, key); +} + +unsigned long long dbSize(redisDb *db) { + unsigned long long total = kvstoreSize(db->keys); + + if (server.cluster_enabled) { + /* If we are the master and there is no import or trim in progress, + * then we can return the total count. If not, we need to subtract + * the number of keys in slots that are not accessible, as below. */ + if (clusterNodeIsMaster(getMyClusterNode()) && + !asmImportInProgress() && + !asmIsTrimInProgress()) + { + return total; + } + + /* Besides, we don't know the slot migration states on replicas, so we + * need to check each slot to see if it's accessible. */ + for (int i = 0; i < CLUSTER_SLOTS; i++) { + dict *d = kvstoreGetDict(db->keys, i); + if (d && !clusterCanAccessKeysInSlot(i)) { + total -= kvstoreDictSize(db->keys, i); + } + } + } + + return total; +} + +unsigned long long dbScan(redisDb *db, unsigned long long cursor, dictScanFunction *scan_cb, void *privdata) { + return kvstoreScan(db->keys, cursor, -1, scan_cb, scanShouldSkipDict, privdata); +} + +/* ----------------------------------------------------------------------------- + * API to get key arguments from commands + * ---------------------------------------------------------------------------*/ + +/* Prepare the getKeysResult struct to hold numkeys, either by using the + * pre-allocated keysbuf or by allocating a new array on the heap. + * + * This function must be called at least once before starting to populate + * the result, and can be called repeatedly to enlarge the result array. + */ +keyReference *getKeysPrepareResult(getKeysResult *result, int numkeys) { + /* GETKEYS_RESULT_INIT initializes keys to NULL, point it to the pre-allocated stack + * buffer here. */ + if (!result->keys) { + serverAssert(!result->numkeys); + result->keys = result->keysbuf; + } + + /* Resize if necessary */ + if (numkeys > result->size) { + if (result->keys != result->keysbuf) { + /* We're not using a static buffer, just (re)alloc */ + result->keys = zrealloc(result->keys, numkeys * sizeof(keyReference)); + } else { + /* We are using a static buffer, copy its contents */ + result->keys = zmalloc(numkeys * sizeof(keyReference)); + if (result->numkeys) + memcpy(result->keys, result->keysbuf, result->numkeys * sizeof(keyReference)); + } + result->size = numkeys; + } + + return result->keys; +} + +/* Returns a bitmask with all the flags found in any of the key specs of the command. + * The 'inv' argument means we'll return a mask with all flags that are missing in at least one spec. */ +int64_t getAllKeySpecsFlags(struct redisCommand *cmd, int inv) { + int64_t flags = 0; + for (int j = 0; j < cmd->key_specs_num; j++) { + keySpec *spec = cmd->key_specs + j; + flags |= inv? ~spec->flags : spec->flags; + } + return flags; +} + +/* Fetch the keys based of the provided key specs. Returns the number of keys found, or -1 on error. + * There are several flags that can be used to modify how this function finds keys in a command. + * + * GET_KEYSPEC_INCLUDE_NOT_KEYS: Return 'fake' keys as if they were keys. + * GET_KEYSPEC_RETURN_PARTIAL: Skips invalid and incomplete keyspecs but returns the keys + * found in other valid keyspecs. + */ +int getKeysUsingKeySpecs(struct redisCommand *cmd, robj **argv, int argc, int search_flags, getKeysResult *result) { + long j, i, last, first, step; + keyReference *keys; + serverAssert(result->numkeys == 0); /* caller should initialize or reset it */ + + for (j = 0; j < cmd->key_specs_num; j++) { + keySpec *spec = cmd->key_specs + j; + serverAssert(spec->begin_search_type != KSPEC_BS_INVALID); + /* Skip specs that represent 'fake' keys */ + if ((spec->flags & CMD_KEY_NOT_KEY) && !(search_flags & GET_KEYSPEC_INCLUDE_NOT_KEYS)) { + continue; + } + + first = 0; + if (spec->begin_search_type == KSPEC_BS_INDEX) { + first = spec->bs.index.pos; + } else if (spec->begin_search_type == KSPEC_BS_KEYWORD) { + int start_index = spec->bs.keyword.startfrom > 0 ? spec->bs.keyword.startfrom : argc+spec->bs.keyword.startfrom; + int end_index = spec->bs.keyword.startfrom > 0 ? argc-1: 1; + for (i = start_index; i != end_index; i = start_index <= end_index ? i + 1 : i - 1) { + if (i >= argc || i < 1) + break; + if (!strcasecmp((char*)argv[i]->ptr,spec->bs.keyword.keyword)) { + first = i+1; + break; + } + } + /* keyword not found */ + if (!first) { + continue; + } + } else { + /* unknown spec */ + goto invalid_spec; + } + + if (spec->find_keys_type == KSPEC_FK_RANGE) { + step = spec->fk.range.keystep; + if (spec->fk.range.lastkey >= 0) { + last = first + spec->fk.range.lastkey; + } else { + if (!spec->fk.range.limit) { + last = argc + spec->fk.range.lastkey; + } else { + serverAssert(spec->fk.range.lastkey == -1); + last = first + ((argc-first)/spec->fk.range.limit + spec->fk.range.lastkey); + } + } + } else if (spec->find_keys_type == KSPEC_FK_KEYNUM) { + step = spec->fk.keynum.keystep; + long long numkeys; + if (spec->fk.keynum.keynumidx >= argc) + goto invalid_spec; + + sds keynum_str = argv[first + spec->fk.keynum.keynumidx]->ptr; + if (!string2ll(keynum_str,sdslen(keynum_str),&numkeys) || numkeys < 0) { + /* Unable to parse the numkeys argument or it was invalid */ + goto invalid_spec; + } + + first += spec->fk.keynum.firstkey; + last = first + ((long)numkeys - 1) * step; + } else { + /* unknown spec */ + goto invalid_spec; + } + + /* First or last is out of bounds, which indicates a syntax error */ + if (last >= argc || last < first || first >= argc) { + goto invalid_spec; + } + + int count = ((last - first)+1); + keys = getKeysPrepareResult(result, result->numkeys + count); + + for (i = first; i <= last; i += step) { + if (i >= argc || i < first) { + /* Modules commands, and standard commands with a not fixed number + * of arguments (negative arity parameter) do not have dispatch + * time arity checks, so we need to handle the case where the user + * passed an invalid number of arguments here. In this case we + * return no keys and expect the command implementation to report + * an arity or syntax error. */ + if (cmd->flags & CMD_MODULE || cmd->arity < 0) { + continue; + } else { + serverPanic("Redis built-in command declared keys positions not matching the arity requirements."); + } + } + keys[result->numkeys].pos = i; + keys[result->numkeys].flags = spec->flags; + result->numkeys++; + } + + /* Handle incomplete specs (only after we added the current spec + * to `keys`, just in case GET_KEYSPEC_RETURN_PARTIAL was given) */ + if (spec->flags & CMD_KEY_INCOMPLETE) { + goto invalid_spec; + } + + /* Done with this spec */ + continue; + +invalid_spec: + if (search_flags & GET_KEYSPEC_RETURN_PARTIAL) { + continue; + } else { + result->numkeys = 0; + return -1; + } + } + + return result->numkeys; +} + +/* Return all the arguments that are keys in the command passed via argc / argv. + * This function will eventually replace getKeysFromCommand. + * + * The command returns the positions of all the key arguments inside the array, + * so the actual return value is a heap allocated array of integers. The + * length of the array is returned by reference into *numkeys. + * + * Along with the position, this command also returns the flags that are + * associated with how Redis will access the key. + * + * 'cmd' must be point to the corresponding entry into the redisCommand + * table, according to the command name in argv[0]. */ +int getKeysFromCommandWithSpecs(struct redisCommand *cmd, robj **argv, int argc, int search_flags, getKeysResult *result) { + /* The command has at least one key-spec not marked as NOT_KEY */ + int has_keyspec = (getAllKeySpecsFlags(cmd, 1) & CMD_KEY_NOT_KEY); + /* The command has at least one key-spec marked as VARIABLE_FLAGS */ + int has_varflags = (getAllKeySpecsFlags(cmd, 0) & CMD_KEY_VARIABLE_FLAGS); + + /* We prefer key-specs if there are any, and their flags are reliable. */ + if (has_keyspec && !has_varflags) { + int ret = getKeysUsingKeySpecs(cmd,argv,argc,search_flags,result); + if (ret >= 0) + return ret; + /* If the specs returned with an error (probably an INVALID or INCOMPLETE spec), + * fallback to the callback method. */ + } + + /* Resort to getkeys callback methods. */ + if (cmd->flags & CMD_MODULE_GETKEYS) + return moduleGetCommandKeysViaAPI(cmd,argv,argc,result); + + /* We use native getkeys as a last resort, since not all these native getkeys provide + * flags properly (only the ones that correspond to INVALID, INCOMPLETE or VARIABLE_FLAGS do.*/ + if (cmd->getkeys_proc) + return cmd->getkeys_proc(cmd,argv,argc,result); + return 0; +} + +/* This function returns a sanity check if the command may have keys. */ +int doesCommandHaveKeys(struct redisCommand *cmd) { + return cmd->getkeys_proc || /* has getkeys_proc (non modules) */ + (cmd->flags & CMD_MODULE_GETKEYS) || /* module with GETKEYS */ + (getAllKeySpecsFlags(cmd, 1) & CMD_KEY_NOT_KEY); /* has at least one key-spec not marked as NOT_KEY */ +} + +/* A simplified channel spec table that contains all of the redis commands + * and which channels they have and how they are accessed. */ +typedef struct ChannelSpecs { + redisCommandProc *proc; /* Command procedure to match against */ + uint64_t flags; /* CMD_CHANNEL_* flags for this command */ + int start; /* The initial position of the first channel */ + int count; /* The number of channels, or -1 if all remaining + * arguments are channels. */ +} ChannelSpecs; + +ChannelSpecs commands_with_channels[] = { + {subscribeCommand, CMD_CHANNEL_SUBSCRIBE, 1, -1}, + {ssubscribeCommand, CMD_CHANNEL_SUBSCRIBE, 1, -1}, + {unsubscribeCommand, CMD_CHANNEL_UNSUBSCRIBE, 1, -1}, + {sunsubscribeCommand, CMD_CHANNEL_UNSUBSCRIBE, 1, -1}, + {psubscribeCommand, CMD_CHANNEL_PATTERN | CMD_CHANNEL_SUBSCRIBE, 1, -1}, + {punsubscribeCommand, CMD_CHANNEL_PATTERN | CMD_CHANNEL_UNSUBSCRIBE, 1, -1}, + {publishCommand, CMD_CHANNEL_PUBLISH, 1, 1}, + {spublishCommand, CMD_CHANNEL_PUBLISH, 1, 1}, + {NULL,0} /* Terminator. */ +}; + +/* Returns 1 if the command may access any channels matched by the flags + * argument. */ +int doesCommandHaveChannelsWithFlags(struct redisCommand *cmd, int flags) { + /* If a module declares get channels, we are just going to assume + * has channels. This API is allowed to return false positives. */ + if (cmd->flags & CMD_MODULE_GETCHANNELS) { + return 1; + } + for (ChannelSpecs *spec = commands_with_channels; spec->proc != NULL; spec += 1) { + if (cmd->proc == spec->proc) { + return !!(spec->flags & flags); + } + } + return 0; +} + +/* Return all the arguments that are channels in the command passed via argc / argv. + * This function behaves similar to getKeysFromCommandWithSpecs, but with channels + * instead of keys. + * + * The command returns the positions of all the channel arguments inside the array, + * so the actual return value is a heap allocated array of integers. The + * length of the array is returned by reference into *numkeys. + * + * Along with the position, this command also returns the flags that are + * associated with how Redis will access the channel. + * + * 'cmd' must be point to the corresponding entry into the redisCommand + * table, according to the command name in argv[0]. */ +int getChannelsFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { + keyReference *keys; + /* If a module declares get channels, use that. */ + if (cmd->flags & CMD_MODULE_GETCHANNELS) { + return moduleGetCommandChannelsViaAPI(cmd, argv, argc, result); + } + /* Otherwise check the channel spec table */ + for (ChannelSpecs *spec = commands_with_channels; spec != NULL; spec += 1) { + if (cmd->proc == spec->proc) { + int start = spec->start; + int stop = (spec->count == -1) ? argc : start + spec->count; + if (stop > argc) stop = argc; + int count = 0; + keys = getKeysPrepareResult(result, stop - start); + for (int i = start; i < stop; i++ ) { + keys[count].pos = i; + keys[count++].flags = spec->flags; + } + result->numkeys = count; + return count; + } + } + return 0; +} + +/* Extract keys/channels from a command and calculate the cluster slot. + * Returns the number of keys/channels extracted. + * The slot number is returned by reference into *slot. + * If is_incomplete is not NULL, it will be set for key extraction. + * + * This function handles both regular commands (keys) and sharded pubsub + * commands (channels), but excludes regular pubsub commands which don't + * have slots. + */ +int extractKeysAndSlot(struct redisCommand *cmd, robj **argv, int argc, + getKeysResult *result, int *slot) { + int num_keys = -1; + + if (!doesCommandHaveChannelsWithFlags(cmd, CMD_CHANNEL_PUBLISH | CMD_CHANNEL_SUBSCRIBE)) { + num_keys = getKeysFromCommandWithSpecs(cmd, argv, argc, GET_KEYSPEC_DEFAULT, result); + } else { + /* Only extract channels for commands that have key_specs (sharded pubsub). + * Regular pubsub commands (PUBLISH, SUBSCRIBE) don't have slots. */ + if (cmd->key_specs_num > 0) { + num_keys = getChannelsFromCommand(cmd, argv, argc, result); + } else { + num_keys = 0; + } + } + + *slot = extractSlotFromKeysResult(argv, result); + return num_keys; +} + +/* The base case is to use the keys position as given in the command table + * (firstkey, lastkey, step). + * This function works only on command with the legacy_range_key_spec, + * all other commands should be handled by getkeys_proc. + * + * If the commands keyspec is incomplete, no keys will be returned, and the provided + * keys function should be called instead. + * + * NOTE: This function does not guarantee populating the flags for + * the keys, in order to get flags you should use getKeysUsingKeySpecs. */ +int getKeysUsingLegacyRangeSpec(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { + int j, i = 0, last, first, step; + keyReference *keys; + UNUSED(argv); + + if (cmd->legacy_range_key_spec.begin_search_type == KSPEC_BS_INVALID) { + result->numkeys = 0; + return 0; + } + + first = cmd->legacy_range_key_spec.bs.index.pos; + last = cmd->legacy_range_key_spec.fk.range.lastkey; + if (last >= 0) + last += first; + step = cmd->legacy_range_key_spec.fk.range.keystep; + + if (last < 0) last = argc+last; + + int count = ((last - first)+1); + keys = getKeysPrepareResult(result, count); + + for (j = first; j <= last; j += step) { + if (j >= argc || j < first) { + /* Modules commands, and standard commands with a not fixed number + * of arguments (negative arity parameter) do not have dispatch + * time arity checks, so we need to handle the case where the user + * passed an invalid number of arguments here. In this case we + * return no keys and expect the command implementation to report + * an arity or syntax error. */ + if (cmd->flags & CMD_MODULE || cmd->arity < 0) { + result->numkeys = 0; + return 0; + } else { + serverPanic("Redis built-in command declared keys positions not matching the arity requirements."); + } + } + keys[i].pos = j; + /* Flags are omitted from legacy key specs */ + keys[i++].flags = 0; + } + result->numkeys = i; + return i; +} + +/* Return all the arguments that are keys in the command passed via argc / argv. + * + * The command returns the positions of all the key arguments inside the array, + * so the actual return value is a heap allocated array of integers. The + * length of the array is returned by reference into *numkeys. + * + * 'cmd' must be point to the corresponding entry into the redisCommand + * table, according to the command name in argv[0]. + * + * This function uses the command table if a command-specific helper function + * is not required, otherwise it calls the command-specific function. */ +int getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { + if (cmd->flags & CMD_MODULE_GETKEYS) { + return moduleGetCommandKeysViaAPI(cmd,argv,argc,result); + } else if (cmd->getkeys_proc) { + return cmd->getkeys_proc(cmd,argv,argc,result); + } else { + return getKeysUsingLegacyRangeSpec(cmd,argv,argc,result); + } +} + +/* Free the result of getKeysFromCommand. */ +void getKeysFreeResult(getKeysResult *result) { + if (result && result->keys != result->keysbuf) + zfree(result->keys); +} + +/* Helper function to extract keys from following commands: + * COMMAND [destkey] <num-keys> <key> [...] <key> [...] ... <options> + * + * eg: + * ZUNION <num-keys> <key> <key> ... <key> <options> + * ZUNIONSTORE <destkey> <num-keys> <key> <key> ... <key> <options> + * + * 'storeKeyOfs': destkey index, 0 means destkey not exists. + * 'keyCountOfs': num-keys index. + * 'firstKeyOfs': firstkey index. + * 'keyStep': the interval of each key, usually this value is 1. + * + * The commands using this function have a fully defined keyspec, so returning flags isn't needed. */ +int genericGetKeys(int storeKeyOfs, int keyCountOfs, int firstKeyOfs, int keyStep, + robj **argv, int argc, getKeysResult *result) { + int i, num; + keyReference *keys; + + num = atoi(argv[keyCountOfs]->ptr); + /* Sanity check. Don't return any key if the command is going to + * reply with syntax error. (no input keys). */ + if (num < 1 || num > (argc - firstKeyOfs)/keyStep) { + result->numkeys = 0; + return 0; + } + + int numkeys = storeKeyOfs ? num + 1 : num; + keys = getKeysPrepareResult(result, numkeys); + result->numkeys = numkeys; + + /* Add all key positions for argv[firstKeyOfs...n] to keys[] */ + for (i = 0; i < num; i++) { + keys[i].pos = firstKeyOfs+(i*keyStep); + keys[i].flags = 0; + } + + if (storeKeyOfs) { + keys[num].pos = storeKeyOfs; + keys[num].flags = 0; + } + return result->numkeys; +} + +int sintercardGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { + UNUSED(cmd); + return genericGetKeys(0, 1, 2, 1, argv, argc, result); +} + +int zunionInterDiffStoreGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { + UNUSED(cmd); + return genericGetKeys(1, 2, 3, 1, argv, argc, result); +} + +int zunionInterDiffGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { + UNUSED(cmd); + return genericGetKeys(0, 1, 2, 1, argv, argc, result); +} + +int evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { + UNUSED(cmd); + return genericGetKeys(0, 2, 3, 1, argv, argc, result); +} + +int functionGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { + UNUSED(cmd); + return genericGetKeys(0, 2, 3, 1, argv, argc, result); +} + +int lmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { + UNUSED(cmd); + return genericGetKeys(0, 1, 2, 1, argv, argc, result); +} + +int blmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { + UNUSED(cmd); + return genericGetKeys(0, 2, 3, 1, argv, argc, result); +} + +int zmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { + UNUSED(cmd); + return genericGetKeys(0, 1, 2, 1, argv, argc, result); +} + +int bzmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { + UNUSED(cmd); + return genericGetKeys(0, 2, 3, 1, argv, argc, result); +} + +/* Helper function to extract keys from the SORT RO command. + * + * SORT <sort-key> + * + * The second argument of SORT is always a key, however an arbitrary number of + * keys may be accessed while doing the sort (the BY and GET args), so the + * key-spec declares incomplete keys which is why we have to provide a concrete + * implementation to fetch the keys. + * + * This command declares incomplete keys, so the flags are correctly set for this function */ +int sortROGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { + keyReference *keys; + UNUSED(cmd); + UNUSED(argv); + UNUSED(argc); + + keys = getKeysPrepareResult(result, 1); + keys[0].pos = 1; /* <sort-key> is always present. */ + keys[0].flags = CMD_KEY_RO | CMD_KEY_ACCESS; + result->numkeys = 1; + return result->numkeys; +} + +/* Helper function to extract keys from the SORT command. + * + * SORT <sort-key> ... STORE <store-key> ... + * + * The first argument of SORT is always a key, however a list of options + * follow in SQL-alike style. Here we parse just the minimum in order to + * correctly identify keys in the "STORE" option. + * + * This command declares incomplete keys, so the flags are correctly set for this function */ +int sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { + int i, j, num, found_store = 0; + keyReference *keys; + UNUSED(cmd); + + num = 0; + keys = getKeysPrepareResult(result, 2); /* Alloc 2 places for the worst case. */ + keys[num].pos = 1; /* <sort-key> is always present. */ + keys[num++].flags = CMD_KEY_RO | CMD_KEY_ACCESS; + + /* Search for STORE option. By default we consider options to don't + * have arguments, so if we find an unknown option name we scan the + * next. However there are options with 1 or 2 arguments, so we + * provide a list here in order to skip the right number of args. */ + struct { + char *name; + int skip; + } skiplist[] = { + {"limit", 2}, + {"get", 1}, + {"by", 1}, + {NULL, 0} /* End of elements. */ + }; + + for (i = 2; i < argc; i++) { + for (j = 0; skiplist[j].name != NULL; j++) { + if (!strcasecmp(argv[i]->ptr,skiplist[j].name)) { + i += skiplist[j].skip; + break; + } else if (!strcasecmp(argv[i]->ptr,"store") && i+1 < argc) { + /* Note: we don't increment "num" here and continue the loop + * to be sure to process the *last* "STORE" option if multiple + * ones are provided. This is same behavior as SORT. */ + found_store = 1; + keys[num].pos = i+1; /* <store-key> */ + keys[num].flags = CMD_KEY_OW | CMD_KEY_UPDATE; + break; + } + } + } + result->numkeys = num + found_store; + return result->numkeys; +} + +/* This command declares incomplete keys, so the flags are correctly set for this function */ +int migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { + int i, j, num, first; + keyReference *keys; + UNUSED(cmd); + + /* Assume the obvious form. */ + first = 3; + num = 1; + + /* But check for the extended one with the KEYS option. */ + struct { + char* name; + int skip; + } skip_keywords[] = { + {"copy", 0}, + {"replace", 0}, + {"auth", 1}, + {"auth2", 2}, + {NULL, 0} + }; + if (argc > 6) { + for (i = 6; i < argc; i++) { + if (!strcasecmp(argv[i]->ptr, "keys")) { + if (sdslen(argv[3]->ptr) > 0) { + /* This is a syntax error. So ignore the keys and leave + * the syntax error to be handled by migrateCommand. */ + num = 0; + } else { + first = i + 1; + num = argc - first; + } + break; + } + for (j = 0; skip_keywords[j].name != NULL; j++) { + if (!strcasecmp(argv[i]->ptr, skip_keywords[j].name)) { + i += skip_keywords[j].skip; + break; + } + } + } + } + + keys = getKeysPrepareResult(result, num); + for (i = 0; i < num; i++) { + keys[i].pos = first+i; + keys[i].flags = CMD_KEY_RW | CMD_KEY_ACCESS | CMD_KEY_DELETE; + } + result->numkeys = num; + return num; +} + +/* Helper function to extract keys from following commands: + * GEORADIUS key x y radius unit [WITHDIST] [WITHHASH] [WITHCOORD] [ASC|DESC] + * [COUNT count] [STORE key|STOREDIST key] + * GEORADIUSBYMEMBER key member radius unit ... options ... + * + * This command has a fully defined keyspec, so returning flags isn't needed. */ +int georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { + int i, num; + keyReference *keys; + UNUSED(cmd); + + /* Check for the presence of the stored key in the command */ + int stored_key = -1; + for (i = 5; i < argc; i++) { + char *arg = argv[i]->ptr; + /* For the case when user specifies both "store" and "storedist" options, the + * second key specified would override the first key. This behavior is kept + * the same as in georadiusCommand method. + */ + if ((!strcasecmp(arg, "store") || !strcasecmp(arg, "storedist")) && ((i+1) < argc)) { + stored_key = i+1; + i++; + } + } + num = 1 + (stored_key == -1 ? 0 : 1); + + /* Keys in the command come from two places: + * argv[1] = key, + * argv[5...n] = stored key if present + */ + keys = getKeysPrepareResult(result, num); + + /* Add all key positions to keys[] */ + keys[0].pos = 1; + keys[0].flags = 0; + if(num > 1) { + keys[1].pos = stored_key; + keys[1].flags = 0; + } + result->numkeys = num; + return num; +} + +/* XREAD [BLOCK <milliseconds>] [COUNT <count>] [GROUP <groupname> <ttl>] + * STREAMS key_1 key_2 ... key_N ID_1 ID_2 ... ID_N + * + * This command has a fully defined keyspec, so returning flags isn't needed. */ +int xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { + int i, num = 0; + keyReference *keys; + UNUSED(cmd); + + /* We need to parse the options of the command in order to seek the first + * "STREAMS" string which is actually the option. This is needed because + * "STREAMS" could also be the name of the consumer group and even the + * name of the stream key. */ + int streams_pos = -1; + for (i = 1; i < argc; i++) { + char *arg = argv[i]->ptr; + if (!strcasecmp(arg, "block")) { + i++; /* Skip option argument. */ + } else if (!strcasecmp(arg, "count")) { + i++; /* Skip option argument. */ + } else if (!strcasecmp(arg, "group")) { + i += 2; /* Skip option argument. */ + } else if (!strcasecmp(arg, "noack")) { + /* Nothing to do. */ + } else if (!strcasecmp(arg, "streams")) { + streams_pos = i; + break; + } else { + break; /* Syntax error. */ + } + } + if (streams_pos != -1) num = argc - streams_pos - 1; + + /* Syntax error. */ + if (streams_pos == -1 || num == 0 || num % 2 != 0) { + result->numkeys = 0; + return 0; + } + num /= 2; /* We have half the keys as there are arguments because + there are also the IDs, one per key. */ + + keys = getKeysPrepareResult(result, num); + for (i = streams_pos+1; i < argc-num; i++) { + keys[i-streams_pos-1].pos = i; + keys[i-streams_pos-1].flags = 0; + } + result->numkeys = num; + return num; +} + +/* Helper function to extract keys from the SET command, which may have + * an RW flag if the GET, IF* arguments are present, OW otherwise. */ +int setGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { + keyReference *keys; + UNUSED(cmd); + + keys = getKeysPrepareResult(result, 1); + keys[0].pos = 1; /* We always know the position */ + result->numkeys = 1; + int actual = CMD_KEY_OW; + int logical = CMD_KEY_UPDATE; + + for (int i = 3; i < argc; i++) { + char *arg = argv[i]->ptr; + if ((arg[0] == 'g' || arg[0] == 'G') && + (arg[1] == 'e' || arg[1] == 'E') && + (arg[2] == 't' || arg[2] == 'T') && arg[3] == '\0') + { + actual = CMD_KEY_RW; + logical |= CMD_KEY_ACCESS; + } else if (!strcasecmp(arg, "ifeq") || !strcasecmp(arg, "ifne") || + !strcasecmp(arg, "ifdeq") || !strcasecmp(arg, "ifdne")) + { + actual = CMD_KEY_RW; + } + } + + keys[0].flags = actual | logical; + + return 1; +} + +/* Helper function to extract keys from the DELEX command, which may have + * an RW flag if the IF* arguments are present, RM otherwise. */ +int delexGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { + keyReference *keys; + UNUSED(cmd); + + keys = getKeysPrepareResult(result, 1); + keys[0].pos = 1; /* We always know the position */ + result->numkeys = 1; + int actual = CMD_KEY_RM; + int logical = CMD_KEY_DELETE; + + for (int i = 2; i < argc; i++) { + char *arg = argv[i]->ptr; + if (!strcasecmp(arg, "ifeq") || !strcasecmp(arg, "ifne") || + !strcasecmp(arg, "ifdeq") || !strcasecmp(arg, "ifdne")) + { + actual = CMD_KEY_RW; + } + } + + keys[0].flags = actual | logical; + + return 1; +} + +/* Helper function to extract keys from the BITFIELD command, which may be + * read-only if the BITFIELD GET subcommand is used. */ +int bitfieldGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { + keyReference *keys; + int readonly = 1; + UNUSED(cmd); + + keys = getKeysPrepareResult(result, 1); + keys[0].pos = 1; /* We always know the position */ + result->numkeys = 1; + + for (int i = 2; i < argc; i++) { + int remargs = argc - i - 1; /* Remaining args other than current. */ + char *arg = argv[i]->ptr; + if (!strcasecmp(arg, "get") && remargs >= 2) { + i += 2; + } else if ((!strcasecmp(arg, "set") || !strcasecmp(arg, "incrby")) && remargs >= 3) { + readonly = 0; + i += 3; + break; + } else if (!strcasecmp(arg, "overflow") && remargs >= 1) { + i += 1; + } else { + readonly = 0; /* Syntax error. safer to assume non-RO. */ + break; + } + } + + if (readonly) { + keys[0].flags = CMD_KEY_RO | CMD_KEY_ACCESS; + } else { + keys[0].flags = CMD_KEY_RW | CMD_KEY_ACCESS | CMD_KEY_UPDATE; + } + return 1; +} |
