diff options
Diffstat (limited to 'examples/redis-unstable/src/db.c')
| -rw-r--r-- | examples/redis-unstable/src/db.c | 3793 |
1 files changed, 0 insertions, 3793 deletions
diff --git a/examples/redis-unstable/src/db.c b/examples/redis-unstable/src/db.c deleted file mode 100644 index 2a64147..0000000 --- a/examples/redis-unstable/src/db.c +++ /dev/null @@ -1,3793 +0,0 @@ -/* - * Copyright (c) 2009-Present, Redis Ltd. - * All rights reserved. - * - * Copyright (c) 2024-present, Valkey contributors. - * All rights reserved. - * - * Licensed under your choice of (a) the Redis Source Available License 2.0 - * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the - * GNU Affero General Public License v3 (AGPLv3). - * - * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. - */ - -#include "server.h" -#include "cluster.h" -#include "atomicvar.h" -#include "latency.h" -#include "script.h" -#include "functions.h" -#include "cluster_asm.h" -#include "redisassert.h" - -#include <signal.h> -#include <ctype.h> -#include "bio.h" -#include "keymeta.h" - -/*----------------------------------------------------------------------------- - * C-level DB API - *----------------------------------------------------------------------------*/ - -static_assert(MAX_KEYSIZES_TYPES == OBJ_TYPE_BASIC_MAX, "Must be equal"); - -/* Flags for expireIfNeeded */ -#define EXPIRE_FORCE_DELETE_EXPIRED 1 -#define EXPIRE_AVOID_DELETE_EXPIRED 2 -#define EXPIRE_ALLOW_ACCESS_EXPIRED 4 -#define EXPIRE_ALLOW_ACCESS_TRIMMED 8 - -/* Return values for expireIfNeeded */ -typedef enum { - KEY_VALID = 0, /* Could be volatile and not yet expired, non-volatile, or even non-existing key. */ - KEY_EXPIRED, /* Logically expired but not yet deleted. */ - KEY_DELETED, /* The key was deleted now. */ - KEY_TRIMMED /* Logically trimmed but not yet deleted. */ -} keyStatus; - -static keyStatus expireIfNeeded(redisDb *db, robj *key, kvobj *kv, int flags); - -/* Update LFU when an object is accessed. - * Firstly, decrement the counter if the decrement time is reached. - * Then logarithmically increment the counter, and update the access time. */ -void updateLFU(robj *val) { - unsigned long counter = LFUDecrAndReturn(val); - counter = LFULogIncr(counter); - val->lru = (LFUGetTimeInMinutes()<<8) | counter; -} - -/* Update LRM when an object is modified. */ -void updateLRM(robj *o) { - if (o->refcount == OBJ_SHARED_REFCOUNT) - return; - if (server.maxmemory_policy & MAXMEMORY_FLAG_LRM) { - o->lru = LRU_CLOCK(); - } -} - -/* - * Update histogram of keys-sizes - * - * It is used to track the distribution of key sizes in the dataset. It is updated - * every time key's length is modified. Available to user via INFO command. - * - * The histogram is a base-2 logarithmic histogram, with 64 bins. The i'th bin - * represents the number of keys with a size in the range 2^i and 2^(i+1) - * exclusive. oldLen/newLen must be smaller than 2^48, and if their value - * equals -1, it means that the key is being created/deleted, respectively. Each - * data type has its own histogram and it is per database (In addition, there is - * histogram per slot for future cluster use). - * - * Example mapping of key lengths to bins: - * [1,2)->1 [2,4)->2 [4,8)->3 [8,16)->4 ... - * - * Since strings can be zero length, the histogram also tracks: - * [0,1)->0 - */ -void updateKeysizesHist(redisDb *db, int didx, uint32_t type, int64_t oldLen, int64_t newLen) { - if(unlikely(type >= OBJ_TYPE_BASIC_MAX)) - return; - - kvstoreDictMetadata *dictMeta = kvstoreGetDictMeta(db->keys, didx, 0); - kvstoreMetadata *kvstoreMeta = kvstoreGetMetadata(db->keys); - - if (oldLen > 0) { - int old_bin = log2ceil(oldLen) + 1; - debugServerAssert(old_bin < MAX_KEYSIZES_BINS); - /* If following a key deletion it is last one in slot's dict, then - * slot's dict might get released as well. Verify if metadata is not NULL. */ - if(dictMeta) { - dictMeta->keysizes_hist[type][old_bin]--; - debugServerAssert(dictMeta->keysizes_hist[type][old_bin] >= 0); - } - kvstoreMeta->keysizes_hist[type][old_bin]--; - debugServerAssert(kvstoreMeta->keysizes_hist[type][old_bin] >= 0); - } else { - /* here, oldLen can be either 0 or -1 */ - if (oldLen == 0) { - /* Only strings can be empty. Yet, a command flow might temporarily - * dbAdd() empty collection, and only after add elements. */ - - if (dictMeta) { - dictMeta->keysizes_hist[type][0]--; - debugServerAssert(dictMeta->keysizes_hist[type][0] >= 0); - } - kvstoreMeta->keysizes_hist[type][0]--; - debugServerAssert(kvstoreMeta->keysizes_hist[type][0] >= 0); - } - } - - if (newLen > 0) { - int new_bin = log2ceil(newLen) + 1; - debugServerAssert(new_bin < MAX_KEYSIZES_BINS); - /* If following a key deletion it is last one in slot's dict, then - * slot's dict might get released as well. Verify if metadata is not NULL. */ - if(dictMeta) dictMeta->keysizes_hist[type][new_bin]++; - kvstoreMeta->keysizes_hist[type][new_bin]++; - } else { - /* here, newLen can be either 0 or -1 */ - if (newLen == 0) { - /* Only strings can be empty. Yet, a command flow might temporarily - * dbAdd() empty collection, and only after add elements. */ - - if (dictMeta) dictMeta->keysizes_hist[type][0]++; - kvstoreMeta->keysizes_hist[type][0]++; - } - } -} - -void updateSlotAllocSize(redisDb *db, int didx, size_t oldsize, size_t newsize) { - debugServerAssert(server.memory_tracking_per_slot); - kvstoreDictMetadata *dictMeta = kvstoreGetDictMeta(db->keys, didx, 0); - if (!dictMeta) return; - debugServerAssert(oldsize <= dictMeta->alloc_size); - dictMeta->alloc_size -= oldsize; - dictMeta->alloc_size += newsize; -} - -/* Assert keysizes histogram (For debugging only) - * - * Triggered by DEBUG KEYSIZES-HIST-ASSERT 1 and tested after each command. - */ -void dbgAssertKeysizesHist(redisDb *db) { - /* Scan DB and build expected histogram by scanning all keys */ - int64_t scanHist[MAX_KEYSIZES_TYPES][MAX_KEYSIZES_BINS] = {{0}}; - dictEntry *de; - kvstoreIterator kvs_it; - kvstoreIteratorInit(&kvs_it, db->keys); - while ((de = kvstoreIteratorNext(&kvs_it)) != NULL) { - kvobj *kv = dictGetKV(de); - if (kv->type < OBJ_TYPE_BASIC_MAX) { - int64_t len = getObjectLength(kv); - scanHist[kv->type][(len == 0) ? 0 : log2ceil(len) + 1]++; - } - } - kvstoreIteratorReset(&kvs_it); - for (int type = 0; type < OBJ_TYPE_BASIC_MAX; type++) { - kvstoreMetadata *meta = kvstoreGetMetadata(db->keys); - volatile int64_t *keysizesHist = meta->keysizes_hist[type]; - for (int i = 0; i < MAX_KEYSIZES_BINS; i++) { - if (scanHist[type][i] == keysizesHist[i]) - continue; - - /* print scanStr vs. expected histograms for debugging */ - char scanStr[500], keysizesStr[500]; - int l1 = 0, l2 = 0; - for (int j = 0; (j < MAX_KEYSIZES_BINS) && (l1 < 500) && (l2 < 500); j++) { - if (scanHist[type][j]) - l1 += snprintf(scanStr + l1, sizeof(scanStr) - l1, - "[%d]=%"PRId64" ", j, scanHist[type][j]); - if (keysizesHist[j]) - l2 += snprintf(keysizesStr + l2, sizeof(keysizesStr) - l2, - "[%d]=%"PRId64" ", j, keysizesHist[j]); - } - serverPanic("dbgAssertKeysizesHist: type=%d\nscanStr=%s\nkeysizes=%s\n", - type, scanStr, keysizesStr); - } - } -} - -/* Assert per-slot alloc_size (For debugging only) - * - * Triggered by DEBUG ALLOCSIZE-SLOTS-ASSERT 1 and tested after each command. - */ -void dbgAssertAllocSizePerSlot(redisDb *db) { - if (!server.memory_tracking_per_slot) return; - size_t slot_sizes[CLUSTER_SLOTS] = {0}; - dictEntry *de; - kvstoreIterator kvs_it; - kvstoreIteratorInit(&kvs_it, db->keys); - while ((de = kvstoreIteratorNext(&kvs_it)) != NULL) { - int slot = kvstoreIteratorGetCurrentDictIndex(&kvs_it); - kvobj *kv = dictGetKV(de); - slot_sizes[slot] += kvobjAllocSize(kv); - } - kvstoreIteratorReset(&kvs_it); - - int num_slots = kvstoreNumDicts(db->keys); - for (int slot = 0; slot < num_slots; slot++) { - kvstoreDictMetadata *dictMeta = kvstoreGetDictMeta(db->keys, slot, 0); - size_t want = slot_sizes[slot]; - size_t have = dictMeta ? dictMeta->alloc_size : 0; - if (have == want) continue; - serverPanic("dbgAssertAllocSizePerSlot: slot=%d expected=%zu actual=%zu", - slot, want, have); - } -} - -/* Lookup a kvobj for read or write operations, or return NULL if the it is not - * found in the specified DB. This function implements the functionality of - * lookupKeyRead(), lookupKeyWrite() and their ...WithFlags() variants. - * - * link - If key found, return the link of the key. - * If key not found, return the bucket link, where the key should be added. - * Or NULL if dict wasn't allocated yet. - * - * Side-effects of calling this function: - * - * 1. A key gets expired if it reached it's TTL. - * 2. The key's last access time is updated. - * 3. The global keys hits/misses stats are updated (reported in INFO). - * 4. If keyspace notifications are enabled, a "keymiss" notification is fired. - * - * Flags change the behavior of this command: - * - * LOOKUP_NONE (or zero): No special flags are passed. - * LOOKUP_NOTOUCH: Don't alter the last access time of the key. - * LOOKUP_NONOTIFY: Don't trigger keyspace event on key miss. - * LOOKUP_NOSTATS: Don't increment key hits/misses counters. - * LOOKUP_WRITE: Prepare the key for writing (delete expired keys even on - * replicas, use separate keyspace stats and events (TODO)). - * LOOKUP_NOEXPIRE: Perform expiration check, but avoid deleting the key, - * so that we don't have to propagate the deletion. - * - * Note: this function also returns NULL if the key is logically expired but - * still existing, in case this is a replica and the LOOKUP_WRITE is not set. - * Even if the key expiry is master-driven, we can correctly report a key is - * expired on replicas even if the master is lagging expiring our key via DELs - * in the replication link. */ -kvobj *lookupKey(redisDb *db, robj *key, int flags, dictEntryLink *link) { - - kvobj *val = dbFindByLink(db, key->ptr, link); - - if (val) { - /* Forcing deletion of expired keys on a replica makes the replica - * inconsistent with the master. We forbid it on readonly replicas, but - * we have to allow it on writable replicas to make write commands - * behave consistently. - * - * It's possible that the WRITE flag is set even during a readonly - * command, since the command may trigger events that cause modules to - * perform additional writes. */ - int is_ro_replica = server.masterhost && server.repl_slave_ro; - int expire_flags = 0; - if (flags & LOOKUP_WRITE && !is_ro_replica) - expire_flags |= EXPIRE_FORCE_DELETE_EXPIRED; - if (flags & LOOKUP_NOEXPIRE) - expire_flags |= EXPIRE_AVOID_DELETE_EXPIRED; - if (flags & LOOKUP_ACCESS_EXPIRED) - expire_flags |= EXPIRE_ALLOW_ACCESS_EXPIRED; - if (flags & LOOKUP_ACCESS_TRIMMED) - expire_flags |= EXPIRE_ALLOW_ACCESS_TRIMMED; - if (expireIfNeeded(db, key, val, expire_flags) != KEY_VALID) { - /* The key is no longer valid. */ - val = NULL; - if (link) *link = NULL; - } - } - - if (val) { - /* Update the access time for the ageing algorithm. - * Don't do it if we have a saving child, as this will trigger - * a copy on write madness. */ - if (((flags & LOOKUP_NOTOUCH) == 0) && - (server.current_client && server.current_client->flags & CLIENT_NO_TOUCH) && - (server.executing_client && server.executing_client->cmd->proc != touchCommand)) - flags |= LOOKUP_NOTOUCH; - if (!hasActiveChildProcess() && !(flags & LOOKUP_NOTOUCH)){ - if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { - updateLFU(val); - } else if (!(server.maxmemory_policy & MAXMEMORY_FLAG_LRM)) { - /* LRM policy should NOT update timestamp on reads. */ - val->lru = LRU_CLOCK(); - } - } - - if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE))) - server.stat_keyspace_hits++; - /* TODO: Use separate hits stats for WRITE */ - } else { - if (!(flags & (LOOKUP_NONOTIFY | LOOKUP_WRITE))) - notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id); - if (!(flags & (LOOKUP_NOSTATS | LOOKUP_WRITE))) - server.stat_keyspace_misses++; - /* TODO: Use separate misses stats and notify event for WRITE */ - } - - return val; -} - -/* Lookup a key for read operations, or return NULL if the key is not found - * in the specified DB. - * - * This API should not be used when we write to the key after obtaining - * the object linked to the key, but only for read only operations. - * - * This function is equivalent to lookupKey(). The point of using this function - * rather than lookupKey() directly is to indicate that the purpose is to read - * the key. */ -kvobj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) { - serverAssert(!(flags & LOOKUP_WRITE)); - return lookupKey(db, key, flags, NULL); -} - -/* Like lookupKeyReadWithFlags(), but does not use any flag, which is the - * common case. */ -kvobj *lookupKeyRead(redisDb *db, robj *key) { - return lookupKeyReadWithFlags(db,key,LOOKUP_NONE); -} - -/* Lookup a key for write operations, and as a side effect, if needed, expires - * the key if its TTL is reached. It's equivalent to lookupKey() with the - * LOOKUP_WRITE flag added. - * - * Returns the linked value object if the key exists or NULL if the key - * does not exist in the specified DB. */ -kvobj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags) { - return lookupKey(db, key, flags | LOOKUP_WRITE, NULL); -} - -kvobj *lookupKeyWrite(redisDb *db, robj *key) { - return lookupKeyWriteWithFlags(db, key, LOOKUP_NONE); -} - -/* Like lookupKeyWrite(), but accepts ref to optional `link` - * - * link - If key found, updated to link the key. - * If key not found, updated to the bucket where the key should be added. - * If key not found and dict is empty, it is set to NULL - */ -kvobj *lookupKeyWriteWithLink(redisDb *db, robj *key, dictEntryLink *link) { - return lookupKey(db, key, LOOKUP_NONE | LOOKUP_WRITE, link); -} - -kvobj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) { - kvobj *kv = lookupKeyRead(c->db, key); - if (!kv) addReplyOrErrorObject(c, reply); - return kv; -} - -kvobj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) { - kvobj *kv = lookupKeyWrite(c->db, key); - if (!kv) addReplyOrErrorObject(c, reply); - return kv; -} - -/* Add a key-value entry to the DB. - * - * A copy of 'key' is stored in the database. The caller must ensure the - * `key` is properly freed by calling decrRefcount(key). - * - * The value may (if its reference counter == 1) be reallocated and become - * invalid after a call to this function. The (possibly reallocated) value is - * stored in the database and the 'valref' pointer is updated to point to the - * new allocation. - * - * The reference counter of the value pointed to by valref is not incremented, - * so the caller should not free the value using decrRefcount after calling this - * function. - * - * link - Optional link to bucket where the key should be added. - * On return, get updated, by need, to the inserted key. - * - * keymeta - Defines metadata to be attached to the key. Including optional - * expiration and modules metadata to be copied (REQUIRED). - */ -kvobj *dbAddInternal(redisDb *db, robj *key, robj **valref, dictEntryLink *link, - const KeyMetaSpec *keymeta) -{ - int slot = getKeySlot(key->ptr); - dictEntryLink tmp = NULL; - if (link == NULL) link = &tmp; - robj *val = *valref; - kvobj *kv = kvobjSet(key->ptr, val, keymeta->metabits); - initObjectLRUOrLFU(kv); - kvstoreDictSetAtLink(db->keys, slot, kv, link, 1); - - /* Handle metadata (expiration and modules metadata) */ - if (keymeta->metabits) { - if (keymeta->metabits & KEY_META_MASK_EXPIRE) { - /* Expiry is always the first meta (from last) */ - long long expire = keymeta->meta[KEY_META_ID_MAX - 1]; - kvobj *newkv = setExpireByLink(NULL, db, key->ptr, expire, *link); - serverAssert(newkv == kv); - } - - /* memcpy modules metadata to beginning of kvobj */ - if (keymeta->metabits & KEY_META_MASK_MODULES) - /* Also trivial overwrite expire */ - memcpy(kvobjGetAllocPtr(kv), - keymeta->meta + KEY_META_ID_MAX - keymeta->numMeta, - keymeta->numMeta * sizeof(uint64_t)); - } - - signalKeyAsReady(db, key, kv->type); - notifyKeyspaceEvent(NOTIFY_NEW,"new",key,db->id); - updateKeysizesHist(db, slot, kv->type, -1, getObjectLength(kv)); /* add hist */ - if (server.memory_tracking_per_slot) - updateSlotAllocSize(db, slot, 0, kvobjAllocSize(kv)); - *valref = kv; - return kv; -} - -/* Read dbAddInternal() comment */ -kvobj *dbAdd(redisDb *db, robj *key, robj **valref) { - KeyMetaSpec keyMetaEmpty; /* No metadata added */ - keyMetaSpecInit(&keyMetaEmpty); - return dbAddInternal(db, key, valref, NULL, &keyMetaEmpty); -} - -kvobj *dbAddByLink(redisDb *db, robj *key, robj **valref, dictEntryLink *link) { - KeyMetaSpec keyMetaEmpty; /* No metadata added */ - keyMetaSpecInit(&keyMetaEmpty); - return dbAddInternal(db, key, valref, link, &keyMetaEmpty); -} - -/* Returns key's hash slot when cluster mode is enabled, or 0 when disabled. - * The only difference between this function and getKeySlot, is that it's not using cached key slot from the current_client - * and always calculates CRC hash. - * This is useful when slot needs to be calculated for a key that user didn't request for, such as in case of eviction. */ -int calculateKeySlot(sds key) { - return server.cluster_enabled ? keyHashSlot(key, (int) sdslen(key)) : 0; -} - -/* Return slot-specific dictionary for key based on key's hash slot when cluster mode is enabled, else 0.*/ -int getKeySlot(sds key) { - if (!server.cluster_enabled) return 0; - /* This is performance optimization that uses pre-set slot id from the current command, - * in order to avoid calculation of the key hash. - * - * This optimization is only used when current_client flag `CLIENT_EXECUTING_COMMAND` is set. - * It only gets set during the execution of command under `call` method. Other flows requesting - * the key slot would fallback to calculateKeySlot. - */ - if (server.current_client && server.current_client->slot >= 0 && server.current_client->flags & CLIENT_EXECUTING_COMMAND) { - debugServerAssertWithInfo(server.current_client, NULL, - (int)keyHashSlot(key, (int)sdslen(key)) == server.current_client->slot); - return server.current_client->slot; - } - int slot = keyHashSlot(key, (int)sdslen(key)); - return slot; -} - -/* Return the slot of the key in the command. - * INVALID_CLUSTER_SLOT if no keys, CLUSTER_CROSSSLOT if cross slot, otherwise the slot number. */ -int getSlotFromCommand(struct redisCommand *cmd, robj **argv, int argc) { - if (!cmd || !server.cluster_enabled) return INVALID_CLUSTER_SLOT; - - /* Get the keys from the command */ - getKeysResult result = GETKEYS_RESULT_INIT; - getKeysFromCommand(cmd, argv, argc, &result); - - /* Extract slot from the keys result. */ - int slot = extractSlotFromKeysResult(argv, &result); - getKeysFreeResult(&result); - return slot; -} - -/* This is a special version of dbAdd() that is used only when loading - * keys from the RDB file: the key is passed as an SDS string that is - * copied by the function and freed by the caller. - * - * Moreover this function will not abort if the key is already busy, to - * give more control to the caller, nor will signal the key as ready - * since it is not useful in this context. - * - * If added to db, returns pointer to the object, Otherwise NULL is returned. - */ -kvobj *dbAddRDBLoad(redisDb *db, sds key, robj **valref, const KeyMetaSpec *keyMetaSpec) { - /* Add new kvobj to the db. */ - int slot = getKeySlot(key); - - dictEntryLink link, bucket; - link = kvstoreDictFindLink(db->keys, slot, key, &bucket); - - /* If already exists, return NULL */ - if (link != NULL) - return NULL; - - /* Create kvobj with metadata bits from KeyMetaSpec */ - robj *val = *valref; - kvobj *kv = kvobjSet(key, val, keyMetaSpec->metabits); - initObjectLRUOrLFU(kv); - kvstoreDictSetAtLink(db->keys, slot, kv, &bucket, 1); - - /* Handle metadata (expiration and modules metadata) */ - if (keyMetaSpec->metabits) { - if (keyMetaSpec->metabits & KEY_META_MASK_EXPIRE) { - /* Expiry is always the first meta (from last) */ - long long expire = keyMetaSpec->meta[KEY_META_ID_MAX - 1]; - kvobj *newkv = setExpireByLink(NULL, db, key, expire, bucket); - serverAssert(newkv == kv); - } - - /* memcpy modules metadata to beginning of kvobj */ - if (keyMetaSpec->metabits & KEY_META_MASK_MODULES) - memcpy(kvobjGetAllocPtr(kv), - keyMetaSpec->meta + KEY_META_ID_MAX - keyMetaSpec->numMeta, - keyMetaSpec->numMeta * sizeof(uint64_t)); - } - - updateKeysizesHist(db, slot, kv->type, -1, (int64_t) getObjectLength(kv)); - if (server.memory_tracking_per_slot) - updateSlotAllocSize(db, slot, 0, kvobjAllocSize(kv)); - return *valref = kv; -} - -/** - * Overwrite an existing key's value in db with a new value. - * - * - If the reference count of 'valref' is 1 the ownership of the value is - * transferred to this function. The value may be reallocated, potentially - * invalidating any external references to it. The (potentially reallocated) - * value is stored in the database, and the 'valref' pointer is updated to - * reflect the new allocation, if one occurs. - * - The reference counter of the value referenced by 'valref' is not incremented - * so the caller must refrain from releasing it using decrRefCount after this - * function is called. - * - This function does not modify the expire time of the existing key. - * - The 'overwrite' flag is an indication whether this is done as part of a - * complete replacement of their key, which can be thought as a deletion and - * replacement (in which case we need to emit deletion signals), or just an - * update of a value of an existing key (when false). - * - The `link` is optional, can save lookup, if provided. - */ -static void dbSetValue(redisDb *db, robj *key, robj **valref, dictEntryLink link, - int overwrite, int updateKeySizes, int keepTTL) { - int freeModuleMeta = 0; - robj *val = *valref; - int slot = getKeySlot(key->ptr); - size_t oldsize = 0; - if (!link) { - link = kvstoreDictFindLink(db->keys, slot, key->ptr, NULL); - serverAssertWithInfo(NULL, key, link != NULL); /* expected to exist */ - } - kvobj *old = dictGetKV(*link); - kvobj *kvNew; - - int64_t oldlen = (int64_t) getObjectLength(old); - int oldtype = old->type; - - /* if hash with HFEs, take care to remove from global HFE DS before attempting - * to manipulate and maybe free kvOld object */ - if (old->type == OBJ_HASH) - estoreRemove(db->subexpires, slot, old); - - long long oldExpire = getExpire(db, key->ptr, old); - - /* All metadata will be kept if not `overwrite` for the new object */ - uint32_t newKeyMetaBits = old->metabits; - /* clear expire if not keepTTL or no old expire */ - if ((!keepTTL) || (oldExpire == -1)) - newKeyMetaBits &= ~KEY_META_MASK_EXPIRE; - - if (overwrite) { - /* On overwrite, discard module metadata excluding expire if set */ - newKeyMetaBits &= KEY_META_MASK_EXPIRE; - /* RM_StringDMA may call dbUnshareStringValue which may free val, so we - * need to incr to retain old */ - incrRefCount(old); - - /* Free related metadata. Ignore builtin metadata (currently only expire) */ - if (getModuleMetaBits(old->metabits)) { - keyMetaOnUnlink(db, key, old); - freeModuleMeta = 1; - } - - /* Although the key is not really deleted from the database, we regard - * overwrite as two steps of unlink+add, so we still need to call the unlink - * callback of the module. */ - moduleNotifyKeyUnlink(key,old,db->id,DB_FLAG_KEY_OVERWRITE); - /* We want to try to unblock any module clients or clients using a blocking XREADGROUP */ - signalDeletedKeyAsReady(db,key,old->type); - decrRefCount(old); - /* Because of RM_StringDMA, old may be changed, so we need get old again */ - old = dictGetKV(*link); - } - if (server.memory_tracking_per_slot) - oldsize = kvobjAllocSize(old); - - if ((old->refcount == 1 && old->encoding != OBJ_ENCODING_EMBSTR) && - (val->refcount == 1 && val->encoding != OBJ_ENCODING_EMBSTR) && (!freeModuleMeta)) - { - /* Keep old object in the database. Just swap it's ptr, type and - * encoding with the content of val. */ - robj tmp = *old; - old->type = val->type; - old->encoding = val->encoding; - old->ptr = val->ptr; - val->type = tmp.type; - val->encoding = tmp.encoding; - val->ptr = tmp.ptr; - /* Set new to old to keep the old object. Set old to val to be freed below. */ - kvNew = old; - old = val; - - /* Handle TTL in the optimization path */ - if ((!keepTTL) && (oldExpire >= 0)) - removeExpire(db, key); - } else { - /* Replace the old value at its location in the key space. */ - val->lru = old->lru; - - kvNew = kvobjSet(key->ptr, val, newKeyMetaBits); - kvstoreDictSetAtLink(db->keys, slot, kvNew, &link, 0); - - /* if expiry replace the old value at its location in the expire space. */ - if (oldExpire != -1) { - if (keepTTL) { - kvobjSetExpire(kvNew, oldExpire); /* kvNew not reallocated here */ - dictEntryLink exLink = kvstoreDictFindLink(db->expires, slot, - key->ptr, NULL); - serverAssertWithInfo(NULL, key, exLink != NULL); - kvstoreDictSetAtLink(db->expires, slot, kvNew, &exLink, 0); - } else { - kvstoreDictDelete(db->expires, slot, key->ptr); - } - } - - if (newKeyMetaBits & KEY_META_MASK_MODULES) - keyMetaTransition(old, kvNew); - } - - /* Remove old key and add new key to KEYSIZES histogram */ - int64_t newlen = (int64_t) getObjectLength(kvNew); - if (updateKeySizes) { - /* Save one call if old and new are the same type */ - if (oldtype == kvNew->type) { - updateKeysizesHist(db, slot, oldtype, oldlen, newlen); - } else { - updateKeysizesHist(db, slot, oldtype, oldlen, -1); - updateKeysizesHist(db, slot, kvNew->type, -1, newlen); - } - } - - if (server.memory_tracking_per_slot) - updateSlotAllocSize(db, slot, oldsize, kvobjAllocSize(kvNew)); - - if (server.io_threads_num > 1 && old->encoding == OBJ_ENCODING_RAW) { - /* In multi-threaded mode, the OBJ_ENCODING_RAW string object usually is - * allocated in the IO thread, so we defer the free to the IO thread. - * Besides, we never free a string object in BIO threads, so, even with - * lazyfree-lazy-server-del enabled, a fallback to main thread freeing - * due to defer free failure doesn't go against the config intention. */ - tryDeferFreeClientObject(server.current_client, DEFERRED_OBJECT_TYPE_ROBJ, old); - } else if (server.lazyfree_lazy_server_del) { - freeObjAsync(key, old, db->id); - } else { - decrRefCount(old); - } - *valref = kvNew; -} - -/* Replace an existing key with a new value, we just replace value and don't - * emit any events */ -void dbReplaceValue(redisDb *db, robj *key, robj **valref, int updateKeySizes) { - dbSetValue(db, key, valref, NULL, 0, updateKeySizes, 1); -} - -/* Replace an existing key with a new value (don't emit any events) - * - * parameter 'link' is optional. If provided, saves lookup. - */ -void dbReplaceValueWithLink(redisDb *db, robj *key, robj **val, dictEntryLink link) { - dbSetValue(db, key, val, link, 0, 1, 1); -} - -/* High level Set operation. This function can be used in order to set - * a key, whatever it was existing or not, to a new object. - * - * 1) The value may be reallocated when adding it to the database. The value - * pointer 'valref' is updated to point to the reallocated object. The - * reference count of the value object is *not* incremented. - * 2) clients WATCHing for the destination key notified. - * 3) The expire time of the key is reset (the key is made persistent), - * unless 'SETKEY_KEEPTTL' is enabled in flags. - * 4) The key lookup can take place outside this interface outcome will be - * delivered with 'SETKEY_ALREADY_EXIST' or 'SETKEY_DOESNT_EXIST' - * - * All the new keys in the database should be created via this interface. - * The client 'c' argument may be set to NULL if the operation is performed - * in a context where there is no clear client performing the operation. */ -void setKey(client *c, redisDb *db, robj *key, robj **valref, int flags) { - setKeyByLink(c, db, key, valref, flags, NULL); -} - -/* Like setKey(), but accepts an optional link - * - * - If flags is set with SETKEY_ALREADY_EXIST, then `link` must be provided - * - If flags is set with SETKEY_DOESNT_EXIST, then `link` is optional. If - * provided, it will point to the bucket where the key should be added. - * - If flag is not set (0) then add or update key, and `link` must be NULL - * On return, link get updated, by need, to the inserted kvobj. - */ -void setKeyByLink(client *c, redisDb *db, robj *key, robj **valref, int flags, dictEntryLink *plink) { - dictEntryLink dummy = NULL, *link = plink ? plink : &dummy; - int exists; - kvobj *oldval = NULL; - - if (flags & SETKEY_ALREADY_EXIST) { - debugServerAssert((*link) != NULL); - oldval = dictGetKV(**link); - exists = 1; - } else if (flags & SETKEY_DOESNT_EXIST) { - /* link is optional */ - exists = 0; - } else { - /* Add or update key */ - oldval = lookupKeyWriteWithLink(db, key, link); - exists = oldval != NULL; - } - - if (exists) { - int oldtype = oldval->type; - int newtype = (*valref)->type; - - /* Update the value of an existing key */ - dbSetValue(db, key, valref, *link, 1, 1, flags & SETKEY_KEEPTTL); - - /* Notify keyspace events for override and type change */ - notifyKeyspaceEvent(NOTIFY_OVERWRITTEN, "overwritten", key, db->id); - if (oldtype != newtype) - notifyKeyspaceEvent(NOTIFY_TYPE_CHANGED, "type_changed", key, db->id); - } else { - /* Add the new key to the database */ - dbAddByLink(db, key, valref, link); - } - - /* Signal key modification and update LRM timestamp. */ - keyModified(c,db,key,*valref,!(flags & SETKEY_NO_SIGNAL)); -} - -/* During atomic slot migration, keys that are being imported are in an - * intermediate state. we cannot access them and therefore skip them. - * - * This callback function now is used by: - * - dbRandomKey - * - keysCommand - * - scanCommand - */ -static int accessKeysShouldSkipDictIndex(int didx) { - return !clusterCanAccessKeysInSlot(didx); -} - -/* Return a random key, in form of a Redis object. - * If there are no keys, NULL is returned. - * - * The function makes sure to return keys not already expired. */ -robj *dbRandomKey(redisDb *db) { - dictEntry *de; - int maxtries = 100; - int allvolatile = kvstoreSize(db->keys) == kvstoreSize(db->expires); - - while(1) { - robj *keyobj; - int randomSlot = kvstoreGetFairRandomDictIndex(db->keys, accessKeysShouldSkipDictIndex, 16, 1); - if (randomSlot == -1) return NULL; - de = kvstoreDictGetFairRandomKey(db->keys, randomSlot); - if (de == NULL) return NULL; - - kvobj *kv = dictGetKV(de); - sds key = kvobjGetKey(kv); - keyobj = createStringObject(key,sdslen(key)); - if (allvolatile && (server.masterhost || isPausedActions(PAUSE_ACTION_EXPIRE)) && --maxtries == 0) { - /* If the DB is composed only of keys with an expire set, - * it could happen that all the keys are already logically - * expired in the slave, so the function cannot stop because - * expireIfNeeded() is false, nor it can stop because - * dictGetFairRandomKey() returns NULL (there are keys to return). - * To prevent the infinite loop we do some tries, but if there - * are the conditions for an infinite loop, eventually we - * return a key name that may be already expired. */ - return keyobj; - } - if (expireIfNeeded(db, keyobj, kv, 0) != KEY_VALID) { - decrRefCount(keyobj); - continue; /* search for another key. This expired. */ - } - - return keyobj; - } -} - -/* Helper for sync and async delete. */ -int dbGenericDelete(redisDb *db, robj *key, int async, int flags) { - dictEntryLink link; - int table; - int slot = getKeySlot(key->ptr); - link = kvstoreDictTwoPhaseUnlinkFind(db->keys, slot, key->ptr, &table); - - if (link) { - kvobj *kv = dictGetKV(*link); - - int64_t oldlen = (int64_t) getObjectLength(kv); - int type = kv->type; - - /* If hash object with expiry on fields, remove it from HFE DS of DB */ - if (type == OBJ_HASH) - estoreRemove(db->subexpires, slot, kv); - - /* RM_StringDMA may call dbUnshareStringValue which may free kv, so we - * need to incr to retain kv */ - incrRefCount(kv); /* refcnt=1->2 */ - /* Metadata hook: notify unlink for key metadata cleanup. */ - if (getModuleMetaBits(kv->metabits)) keyMetaOnUnlink(db, key, kv); - /* Tells the module that the key has been unlinked from the database. */ - moduleNotifyKeyUnlink(key, kv, db->id, flags); - /* We want to try to unblock any module clients or clients using a blocking XREADGROUP */ - signalDeletedKeyAsReady(db,key,type); - /* We should call decr before freeObjAsync. If not, the refcount may be - * greater than 1, so freeObjAsync doesn't work */ - decrRefCount(kv); - - /* Because of dbUnshareStringValue, the val in db may change. */ - kv = dictGetKV(*link); - - /* if expirable, delete an entry from the expires dict is not decrRefCount of kvobj */ - if (kvobjGetExpire(kv) != -1) - kvstoreDictDelete(db->expires, slot, key->ptr); - - if (async) { - if (server.memory_tracking_per_slot) - updateSlotAllocSize(db, slot, kvobjAllocSize(kv), 0); - freeObjAsync(key, kv, db->id); - /* Set the key to NULL in the main dictionary. */ - kvstoreDictSetAtLink(db->keys, slot, NULL, &link, 0); - } - kvstoreDictTwoPhaseUnlinkFree(db->keys, slot, link, table); - - /* remove key from histogram */ - if(!(flags & DB_FLAG_NO_UPDATE_KEYSIZES)) - updateKeysizesHist(db, slot, type, oldlen, -1); - return 1; - } else { - return 0; - } -} - -/* Delete a key, value, and associated expiration entry if any, from the DB */ -int dbSyncDelete(redisDb *db, robj *key) { - return dbGenericDelete(db, key, 0, DB_FLAG_KEY_DELETED); -} - -/* Delete a key, value, and associated expiration entry if any, from the DB. If - * the value consists of many allocations, it may be freed asynchronously. */ -int dbAsyncDelete(redisDb *db, robj *key) { - return dbGenericDelete(db, key, 1, DB_FLAG_KEY_DELETED); -} - -/* This is a wrapper whose behavior depends on the Redis lazy free - * configuration. Deletes the key synchronously or asynchronously. */ -int dbDelete(redisDb *db, robj *key) { - return dbGenericDelete(db, key, server.lazyfree_lazy_server_del, DB_FLAG_KEY_DELETED); -} - -/* Similar to dbDelete(), but does not update the keysizes histogram. - * This is used when we want to delete a key without affecting the histogram, - * typically in cases where a command flow deletes elements from a collection - * and then deletes the collection itself. In such cases, using dbDelete() - * would incorrectly decrement bin #0. A corresponding test should be added - * to `info-keysizes.tcl`. */ -int dbDeleteSkipKeysizesUpdate(redisDb *db, robj *key) { - return dbGenericDelete(db, key, server.lazyfree_lazy_server_del, - DB_FLAG_KEY_DELETED | DB_FLAG_NO_UPDATE_KEYSIZES); -} - -/* Prepare the string object stored at 'key' to be modified destructively - * to implement commands like SETBIT or APPEND. - * - * An object is usually ready to be modified unless one of the two conditions - * are true: - * - * 1) The object 'o' is shared (refcount > 1), we don't want to affect - * other users. - * 2) The object encoding is not "RAW". - * - * If the object is found in one of the above conditions (or both) by the - * function, an unshared / not-encoded copy of the string object is stored - * at 'key' in the specified 'db'. Otherwise the object 'o' itself is - * returned. - * - * USAGE: - * - * The object 'o' is what the caller already obtained by looking up 'key' - * in 'db', the usage pattern looks like this: - * - * o = lookupKeyWrite(db,key); - * if (checkType(c,o,OBJ_STRING)) return; - * o = dbUnshareStringValue(db,key,o); - * - * At this point the caller is ready to modify the object, for example - * using an sdscat() call to append some data, or anything else. - */ -kvobj *dbUnshareStringValue(redisDb *db, robj *key, kvobj *kv) { - return dbUnshareStringValueByLink(db,key,kv,NULL); -} - -/* Like dbUnshareStringValue(), but accepts a optional link, - * which can be used if we already have one, thus saving the dbFind call. */ -kvobj *dbUnshareStringValueByLink(redisDb *db, robj *key, kvobj *o, dictEntryLink link) { - serverAssert(o->type == OBJ_STRING); - if (o->refcount != 1 || o->encoding != OBJ_ENCODING_RAW) { - robj *decoded = getDecodedObject(o); - o = createRawStringObject(decoded->ptr, sdslen(decoded->ptr)); - decrRefCount(decoded); - dbReplaceValueWithLink(db, key, &o, link); - } - return o; -} - -/* Remove all keys from the database(s) structure. The dbarray argument - * may not be the server main DBs (could be a temporary DB). - * - * The dbnum can be -1 if all the DBs should be emptied, or the specified - * DB index if we want to empty only a single database. - * The function returns the number of keys removed from the database(s). */ -long long emptyDbStructure(redisDb *dbarray, int dbnum, int async, - void(callback)(dict*)) -{ - long long removed = 0; - int startdb, enddb; - - if (dbnum == -1) { - startdb = 0; - enddb = server.dbnum-1; - } else { - startdb = enddb = dbnum; - } - - for (int j = startdb; j <= enddb; j++) { - removed += kvstoreSize(dbarray[j].keys); - if (async) { - emptyDbAsync(&dbarray[j]); - } else { - /* Destroy sub-expires before deleting the kv-objects since ebuckets - * data structure is embedded in the stored kv-objects. */ - estoreEmpty(dbarray[j].subexpires); - kvstoreEmpty(dbarray[j].keys, callback); - kvstoreEmpty(dbarray[j].expires, callback); - } - /* Because all keys of database are removed, reset average ttl. */ - dbarray[j].avg_ttl = 0; - dbarray[j].expires_cursor = 0; - } - - return removed; -} - -/* Remove all data (keys and functions) from all the databases in a - * Redis server. If callback is given the function is called from - * time to time to signal that work is in progress. - * - * The dbnum can be -1 if all the DBs should be flushed, or the specified - * DB number if we want to flush only a single Redis database number. - * - * Flags are be EMPTYDB_NO_FLAGS if no special flags are specified or - * EMPTYDB_ASYNC if we want the memory to be freed in a different thread - * and the function to return ASAP. EMPTYDB_NOFUNCTIONS can also be set - * to specify that we do not want to delete the functions. - * - * On success the function returns the number of keys removed from the - * database(s). Otherwise -1 is returned in the specific case the - * DB number is out of range, and errno is set to EINVAL. */ -long long emptyData(int dbnum, int flags, void(callback)(dict*)) { - int async = (flags & EMPTYDB_ASYNC); - int with_functions = !(flags & EMPTYDB_NOFUNCTIONS); - RedisModuleFlushInfoV1 fi = {REDISMODULE_FLUSHINFO_VERSION,!async,dbnum}; - long long removed = 0; - - if (dbnum < -1 || dbnum >= server.dbnum) { - errno = EINVAL; - return -1; - } - - if (dbnum == -1 || dbnum == 0) - asmCancelTrimJobs(); - - /* Fire the flushdb modules event. */ - moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB, - REDISMODULE_SUBEVENT_FLUSHDB_START, - &fi); - - /* Make sure the WATCHed keys are affected by the FLUSH* commands. - * Note that we need to call the function while the keys are still - * there. */ - signalFlushedDb(dbnum, async, NULL); - - /* Empty redis database structure. */ - removed = emptyDbStructure(server.db, dbnum, async, callback); - - if (dbnum == -1) flushSlaveKeysWithExpireList(); - - if (with_functions) { - serverAssert(dbnum == -1); - functionsLibCtxClearCurrent(async); - } - - /* Also fire the end event. Note that this event will fire almost - * immediately after the start event if the flush is asynchronous. */ - moduleFireServerEvent(REDISMODULE_EVENT_FLUSHDB, - REDISMODULE_SUBEVENT_FLUSHDB_END, - &fi); - - return removed; -} - -/* Initialize temporary db on replica for use during diskless replication. */ -redisDb *initTempDb(void) { - int slot_count_bits = 0; - int flags = KVSTORE_ALLOCATE_DICTS_ON_DEMAND; - if (server.cluster_enabled) { - slot_count_bits = CLUSTER_SLOT_MASK_BITS; - flags |= KVSTORE_FREE_EMPTY_DICTS; - } - redisDb *tempDb = zcalloc(sizeof(redisDb)*server.dbnum); - for (int i=0; i<server.dbnum; i++) { - tempDb[i].id = i; - tempDb[i].keys = kvstoreCreate(&kvstoreExType, &dbDictType, slot_count_bits, - flags); - tempDb[i].expires = kvstoreCreate(&kvstoreBaseType, &dbExpiresDictType, - slot_count_bits, flags); - tempDb[i].subexpires = estoreCreate(&subexpiresBucketsType, slot_count_bits); - } - - return tempDb; -} - -/* Discard tempDb, this can be slow (similar to FLUSHALL), but it's always async. */ -void discardTempDb(redisDb *tempDb) { - int async = 1; - - /* Release temp DBs. */ - emptyDbStructure(tempDb, -1, async, NULL); - for (int i=0; i<server.dbnum; i++) { - /* Destroy sub-expires before deleting the kv-objects since ebuckets - * data structure is embedded in the stored kv-objects. */ - estoreRelease(tempDb[i].subexpires); - kvstoreRelease(tempDb[i].keys); - kvstoreRelease(tempDb[i].expires); - } - - zfree(tempDb); -} - -int selectDb(client *c, int id) { - if (id < 0 || id >= server.dbnum) - return C_ERR; - c->db = &server.db[id]; - return C_OK; -} - -long long dbTotalServerKeyCount(void) { - long long total = 0; - int j; - for (j = 0; j < server.dbnum; j++) { - total += kvstoreSize(server.db[j].keys); - } - return total; -} - -/*----------------------------------------------------------------------------- - * Hooks for key space changes. - * - * Every time a key in the database is modified the function - * keyModified() is called. - * - * Every time a DB is flushed the function signalFlushDb() is called. - *----------------------------------------------------------------------------*/ - -/* Called when a key is modified to update LRM timestamp - * and optionally signal watchers/tracking clients. - * - * Arguments: - * - c: client (may be NULL if the key was modified out of a context of a client) - * - db: database containing the key - * - key: the key that was modified - * - val: the value object (if NULL, LRM won't be updated, e.g., for deleted keys) - * - signal: if true, trigger WATCH and client-side tracking invalidation - */ -void keyModified(client *c, redisDb *db, robj *key, robj *val, int signal) { - if (val) updateLRM(val); - if (signal) { - touchWatchedKey(db,key); - trackingInvalidateKey(c,key,1); - } -} - -void signalFlushedDb(int dbid, int async, slotRangeArray *slots) { - int startdb, enddb; - if (dbid == -1) { - startdb = 0; - enddb = server.dbnum-1; - } else { - startdb = enddb = dbid; - } - - for (int j = startdb; j <= enddb; j++) { - scanDatabaseForDeletedKeys(&server.db[j], NULL, slots); - touchAllWatchedKeysInDb(&server.db[j], NULL, slots); - } - - trackingInvalidateKeysOnFlush(async); - - /* Changes in this method may take place in swapMainDbWithTempDb as well, - * where we execute similar calls, but with subtle differences as it's - * not simply flushing db. */ -} - -/*----------------------------------------------------------------------------- - * Type agnostic commands operating on the key space - *----------------------------------------------------------------------------*/ - -/* Return the set of flags to use for the emptyData() call for FLUSHALL - * and FLUSHDB commands. - * - * sync: flushes the database in an sync manner. - * async: flushes the database in an async manner. - * no option: determine sync or async according to the value of lazyfree-lazy-user-flush. - * - * On success C_OK is returned and the flags are stored in *flags, otherwise - * C_ERR is returned and the function sends an error to the client. */ -int getFlushCommandFlags(client *c, int *flags) { - /* Parse the optional ASYNC option. */ - if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"sync")) { - *flags = EMPTYDB_NO_FLAGS; - } else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"async")) { - *flags = EMPTYDB_ASYNC; - } else if (c->argc == 1) { - *flags = server.lazyfree_lazy_user_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS; - } else { - addReplyErrorObject(c,shared.syntaxerr); - return C_ERR; - } - return C_OK; -} - -/* Flushes the whole server data set. */ -void flushAllDataAndResetRDB(int flags) { - server.dirty += emptyData(-1,flags,NULL); - if (server.child_type == CHILD_TYPE_RDB) killRDBChild(); - if (server.saveparamslen > 0) { - rdbSaveInfo rsi, *rsiptr; - rsiptr = rdbPopulateSaveInfo(&rsi); - rdbSave(SLAVE_REQ_NONE,server.rdb_filename,rsiptr,RDBFLAGS_NONE); - } - -#if defined(USE_JEMALLOC) - /* jemalloc 5 doesn't release pages back to the OS when there's no traffic. - * for large databases, flushdb blocks for long anyway, so a bit more won't - * harm and this way the flush and purge will be synchronous. */ - if (!(flags & EMPTYDB_ASYNC)) { - /* Only clear the current thread cache. - * Ignore the return call since this will fail if the tcache is disabled. */ - je_mallctl("thread.tcache.flush", NULL, NULL, NULL, 0); - - jemalloc_purge(); - } -#endif -} - -/* CB function on blocking ASYNC FLUSH completion - * - * Utilized by commands SFLUSH, FLUSHALL and FLUSHDB. - */ -void flushallSyncBgDone(uint64_t client_id, void *userdata) { - slotRangeArray *slots = userdata; - client *c = lookupClientByID(client_id); - - /* Verify that client still exists and being blocked. */ - if (!(c && c->flags & CLIENT_BLOCKED)) { - slotRangeArrayFree(slots); - return; - } - - /* Update current_client (Called functions might rely on it) */ - client *old_client = server.current_client; - server.current_client = c; - - /* Don't update blocked_us since command was processed in bg by lazy_free thread */ - updateStatsOnUnblock(c, 0 /*blocked_us*/, elapsedUs(c->bstate.lazyfreeStartTime), 0); - - /* Only SFLUSH command pass user data pointer. */ - if (slots) - replySlotsFlushAndFree(c, slots); - else - addReply(c, shared.ok); - - /* mark client as unblocked */ - unblockClient(c, 1); - - if (c->flags & CLIENT_PENDING_COMMAND) { - c->flags &= ~CLIENT_PENDING_COMMAND; - /* The FLUSH command won't be reprocessed, FLUSH command is finished, but - * we still need to complete its full processing flow, including updating - * the replication offset. */ - commandProcessed(c); - } - - /* On flush completion, update the client's memory */ - updateClientMemUsageAndBucket(c); - - /* restore current_client */ - server.current_client = old_client; -} - -/* Common flush command implementation for FLUSHALL, FLUSHDB and SFLUSH. - * - * Return 1 indicates that flush SYNC is actually running in bg as blocking ASYNC - * Return 0 otherwise - * - * slots - provided only by SFLUSH command, otherwise NULL. Will be used on - * completion to reply with the slots flush result. Ownership is passed - * to the completion job in case of `blocking_async`. - */ -int flushCommandCommon(client *c, int type, int flags, slotRangeArray *slots) { - int blocking_async = 0; /* Flush SYNC option to run as blocking ASYNC */ - - /* in case of SYNC, check if we can optimize and run it in bg as blocking ASYNC */ - if ((!(flags & EMPTYDB_ASYNC)) && (!(c->flags & CLIENT_AVOID_BLOCKING_ASYNC_FLUSH))) { - /* Run as ASYNC */ - flags |= EMPTYDB_ASYNC; - blocking_async = 1; - } - - /* Cancel all ASM tasks that overlap with the given slot ranges. */ - clusterAsmCancelBySlotRangeArray(slots, c->argv[0]->ptr); - - if (type == FLUSH_TYPE_ALL) - flushAllDataAndResetRDB(flags | EMPTYDB_NOFUNCTIONS); - else - server.dirty += emptyData(c->db->id,flags | EMPTYDB_NOFUNCTIONS,NULL); - - /* Without the forceCommandPropagation, when DB(s) was already empty, - * FLUSHALL\FLUSHDB will not be replicated nor put into the AOF. */ - forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF); - - /* if blocking ASYNC, block client and add completion job request to BIO lazyfree - * worker's queue. To be called and reply with OK only after all preceding pending - * lazyfree jobs in queue were processed */ - if (blocking_async) { - /* measure bg job till completion as elapsed time of flush command */ - elapsedStart(&c->bstate.lazyfreeStartTime); - - c->bstate.timeout = 0; - /* We still need to perform cleanup operations for the command, including - * updating the replication offset, so mark this command as pending to - * avoid command from being reset during unblock. */ - c->flags |= CLIENT_PENDING_COMMAND; - blockClient(c,BLOCKED_LAZYFREE); - bioCreateCompRq(BIO_WORKER_LAZY_FREE, flushallSyncBgDone, c->id, slots); - } - -#if defined(USE_JEMALLOC) - /* jemalloc 5 doesn't release pages back to the OS when there's no traffic. - * for large databases, flushdb blocks for long anyway, so a bit more won't - * harm and this way the flush and purge will be synchronous. - * - * Take care purge only FLUSHDB for sync flow. FLUSHALL sync flow already - * applied at flushAllDataAndResetRDB. Async flow will apply only later on */ - if ((type != FLUSH_TYPE_ALL) && (!(flags & EMPTYDB_ASYNC))) { - /* Only clear the current thread cache. - * Ignore the return call since this will fail if the tcache is disabled. */ - je_mallctl("thread.tcache.flush", NULL, NULL, NULL, 0); - - jemalloc_purge(); - } -#endif - return blocking_async; -} - -/* FLUSHALL [SYNC|ASYNC] - * - * Flushes the whole server data set. */ -void flushallCommand(client *c) { - int flags; - if (getFlushCommandFlags(c,&flags) == C_ERR) return; - - /* If FLUSH SYNC isn't running as blocking async, then reply */ - if (flushCommandCommon(c, FLUSH_TYPE_ALL, flags, NULL) == 0) - addReply(c, shared.ok); -} - -/* FLUSHDB [SYNC|ASYNC] - * - * Flushes the currently SELECTed Redis DB. */ -void flushdbCommand(client *c) { - int flags; - if (getFlushCommandFlags(c,&flags) == C_ERR) return; - - /* If FLUSH SYNC isn't running as blocking async, then reply */ - if (flushCommandCommon(c, FLUSH_TYPE_DB,flags, NULL) == 0) - addReply(c, shared.ok); - -} - -/* This command implements DEL and UNLINK. */ -void delGenericCommand(client *c, int lazy) { - int numdel = 0, j; - - for (j = 1; j < c->argc; j++) { - if (expireIfNeeded(c->db, c->argv[j], NULL, 0) == KEY_DELETED) - continue; - int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) : - dbSyncDelete(c->db,c->argv[j]); - if (deleted) { - keyModified(c,c->db,c->argv[j],NULL,1); - notifyKeyspaceEvent(NOTIFY_GENERIC, - "del",c->argv[j],c->db->id); - server.dirty++; - numdel++; - } - } - addReplyLongLong(c,numdel); -} - -void delCommand(client *c) { - delGenericCommand(c,server.lazyfree_lazy_user_del); -} - -/* DELEX key [IFEQ match-value|IFNE match-value|IFDEQ match-digest|IFDNE match-digest] - * - * Conditionally removes the specified key. A key is ignored if it does not - * exist. - * If no condition is specified the behavior is the same as DEL command. - * If condition is specified the key must be of STRING type. - * - * IFEQ/IFNE conditions check the match-value against the value of the key - * IFDEQ/IFDNE conditions check the match-digest against the digest of the key's value.*/ -void delexCommand(client *c) { - kvobj *o; - int deleted = 0, should_delete = 0; - - /* If there are no conditions specified we just delete the key */ - if (c->argc == 2) { - delGenericCommand(c, server.lazyfree_lazy_server_del); - return; - } - - /* If we have more than two arguments the next two are condition and - * match-value */ - if (c->argc != 4) { - addReplyErrorArity(c); - return; - } - - robj *key = c->argv[1]; - o = lookupKeyRead(c->db, key); - if (o == NULL) { - addReplyLongLong(c, 0); - return; - } - - /* If any conditions are specified the only supported key type for now is - * string */ - if (o->type != OBJ_STRING) { - addReplyError(c, "Key should be of string type if conditions are specified"); - return; - } - - char *condition = c->argv[2]->ptr; - if (!strcasecmp("ifeq", condition)) { - robj *valueobj = getDecodedObject(o); - sds match_value = c->argv[3]->ptr; - if (sdscmp(valueobj->ptr, match_value) == 0) - should_delete = 1; - - decrRefCount(valueobj); - } else if (!strcasecmp("ifne", condition)) { - robj *valueobj = getDecodedObject(o); - sds match_value = c->argv[3]->ptr; - if (sdscmp(valueobj->ptr, match_value) != 0) - should_delete = 1; - - decrRefCount(valueobj); - } else if (!strcasecmp("ifdeq", condition)) { - if (validateHexDigest(c, c->argv[3]->ptr) != C_OK) - return; - - sds current_digest = stringDigest(o); - if (strcasecmp(current_digest, c->argv[3]->ptr) == 0) - should_delete = 1; - - sdsfree(current_digest); - } else if (!strcasecmp("ifdne", condition)) { - if (validateHexDigest(c, c->argv[3]->ptr) != C_OK) - return; - - sds current_digest = stringDigest(o); - if (strcasecmp(current_digest, c->argv[3]->ptr) != 0) - should_delete = 1; - - sdsfree(current_digest); - } else { - addReplyError(c, "Invalid condition. Use IFEQ, IFNE, IFDEQ, or IFDNE"); - return; - } - - if (should_delete) { - deleted = server.lazyfree_lazy_server_del ? - dbAsyncDelete(c->db, key) : - dbSyncDelete(c->db, key); - } - - if (deleted) { - rewriteClientCommandVector(c, 2, shared.del, key); - keyModified(c, c->db, key, NULL, 1); - notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, c->db->id); - server.dirty++; - } - - addReplyLongLong(c, deleted); -} - -void unlinkCommand(client *c) { - delGenericCommand(c,1); -} - -/* EXISTS key1 key2 ... key_N. - * Return value is the number of keys existing. */ -void existsCommand(client *c) { - long long count = 0; - int j; - - for (j = 1; j < c->argc; j++) { - if (lookupKeyReadWithFlags(c->db,c->argv[j],LOOKUP_NOTOUCH)) count++; - } - addReplyLongLong(c,count); -} - -void selectCommand(client *c) { - int id; - - if (getIntFromObjectOrReply(c, c->argv[1], &id, NULL) != C_OK) - return; - - if (server.cluster_enabled && id != 0) { - addReplyError(c,"SELECT is not allowed in cluster mode"); - return; - } - - if (id != 0) { - server.stat_cluster_incompatible_ops++; - } - - if (selectDb(c,id) == C_ERR) { - addReplyError(c,"DB index is out of range"); - } else { - addReply(c,shared.ok); - } -} - -void randomkeyCommand(client *c) { - robj *key; - - if ((key = dbRandomKey(c->db)) == NULL) { - addReplyNull(c); - return; - } - - addReplyBulk(c,key); - decrRefCount(key); -} - -void keysCommand(client *c) { - dictEntry *de; - sds pattern = c->argv[1]->ptr; - int plen = sdslen(pattern), allkeys, pslot = -1; - unsigned long numkeys = 0; - void *replylen = addReplyDeferredLen(c); - allkeys = (pattern[0] == '*' && plen == 1); - if (server.cluster_enabled && !allkeys) { - pslot = patternHashSlot(pattern, plen); - } - int has_slot = pslot != -1; - union { - kvstoreDictIterator kvs_di; - kvstoreIterator kvs_it; - } it; - if (has_slot) { - if (!kvstoreDictSize(c->db->keys, pslot) || accessKeysShouldSkipDictIndex(pslot)) { - /* Requested slot is empty */ - setDeferredArrayLen(c,replylen,0); - return; - } - kvstoreInitDictSafeIterator(&it.kvs_di, c->db->keys, pslot); - } else { - kvstoreIteratorInit(&it.kvs_it, c->db->keys); - } - - while ((de = has_slot ? kvstoreDictIteratorNext(&it.kvs_di) : kvstoreIteratorNext(&it.kvs_it)) != NULL) { - if (!has_slot && accessKeysShouldSkipDictIndex(kvstoreIteratorGetCurrentDictIndex(&it.kvs_it))) { - continue; - } - - kvobj *kv = dictGetKV(de); - sds key = kvobjGetKey(kv); - - if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) { - if (!keyIsExpired(c->db, NULL, kv)) { - addReplyBulkCBuffer(c, key, sdslen(key)); - numkeys++; - } - } - if (c->flags & CLIENT_CLOSE_ASAP) - break; - } - if (has_slot) - kvstoreResetDictIterator(&it.kvs_di); - else - kvstoreIteratorReset(&it.kvs_it); - setDeferredArrayLen(c,replylen,numkeys); -} - -/* Data used by the dict scan callback. */ -typedef struct { - list *keys; /* elements that collect from dict */ - robj *o; /* o must be a hash/set/zset object, NULL means current db */ - long long type; /* the particular type when scan the db */ - sds pattern; /* pattern string, NULL means no pattern */ - long sampled; /* cumulative number of keys sampled */ - int no_values; /* set to 1 means to return keys only */ - sds typename; /* typename string, NULL means no type filter */ - redisDb *db; /* database reference for expiration checks */ -} scanData; - -/* Helper function to compare key type in scan commands */ -int objectTypeCompare(robj *o, long long target) { - if (o->type != OBJ_MODULE) { - if (o->type != target) - return 0; - else - return 1; - } - /* module type compare */ - moduleType *type = ((moduleValue *)o->ptr)->type; - long long mt = (long long)REDISMODULE_TYPE_SIGN(type->entity.id); - if (target != -mt) - return 0; - else - return 1; -} -/* This callback is used by scanGenericCommand in order to collect elements - * returned by the dictionary iterator into a list. */ -void scanCallback(void *privdata, const dictEntry *de, dictEntryLink plink) { - UNUSED(plink); - Entry *hashEntry = NULL; - scanData *data = (scanData *)privdata; - list *keys = data->keys; - robj *o = data->o; - sds val = NULL; - void *key = NULL; /* if OBJ_HASH then key is of type `hfield`. Otherwise, `sds` */ - void *keyStr; - data->sampled++; - - /* o and typename can not have values at the same time. */ - serverAssert(!((data->type != LLONG_MAX) && o)); - - kvobj *kv = NULL; - zskiplistNode *znode = NULL; - if (!o) { /* If scanning keyspace */ - kv = dictGetKV(de); - keyStr = kvobjGetKey(kv); - } else if (o->type == OBJ_HASH) { - hashEntry = dictGetKey(de); - keyStr = entryGetField(hashEntry); - } else if (o->type == OBJ_ZSET) { - znode = dictGetKey(de); - keyStr = zslGetNodeElement(znode); - } else { - keyStr = dictGetKey(de); - } - - /* Filter element if it does not match the pattern. */ - if (data->pattern) { - if (!stringmatchlen(data->pattern, sdslen(data->pattern), keyStr, sdslen(keyStr), 0)) { - return; - } - } - - if (!o) { - /* Expiration check first - only for database keyspace scanning. - * Use kv obj to avoid robj creation. */ - if (expireIfNeeded(data->db, NULL, kv, 0) != KEY_VALID) - return; - - /* Type filtering - only for database keyspace scanning */ - if (data->typename) { - /* For unknown types (LLONG_MAX), skip all keys */ - if (data->type == LLONG_MAX) - return; - /* For known types, skip keys that don't match */ - if (!objectTypeCompare(kv, data->type)) - return; - } - } - - if (o == NULL) { - key = keyStr; - } else if (o->type == OBJ_SET) { - key = keyStr; - } else if (o->type == OBJ_HASH) { - key = keyStr; - val = entryGetValue(hashEntry); - - /* If field is expired, then ignore */ - if (entryIsExpired(hashEntry)) - return; - - } else if (o->type == OBJ_ZSET) { - char buf[MAX_LONG_DOUBLE_CHARS]; - int len = ld2string(buf, sizeof(buf), znode->score, LD_STR_AUTO); - key = sdsdup(keyStr); - val = sdsnewlen(buf, len); - } else { - serverPanic("Type not handled in SCAN callback."); - } - - listAddNodeTail(keys, key); - if (val && !data->no_values) listAddNodeTail(keys, val); -} - -/* Try to parse a SCAN cursor stored at object 'o': - * if the cursor is valid, store it as unsigned integer into *cursor and - * returns C_OK. Otherwise return C_ERR and send an error to the - * client. */ -int parseScanCursorOrReply(client *c, robj *o, unsigned long long *cursor) { - if (!string2ull(o->ptr, cursor)) { - addReplyError(c, "invalid cursor"); - return C_ERR; - } - return C_OK; -} - -char *obj_type_name[OBJ_TYPE_MAX] = { - "string", - "list", - "set", - "zset", - "hash", - NULL, /* module type is special */ - "stream" -}; - -/* Helper function to get type from a string in scan commands */ -long long getObjectTypeByName(char *name) { - - for (long long i = 0; i < OBJ_TYPE_MAX; i++) { - if (obj_type_name[i] && !strcasecmp(name, obj_type_name[i])) { - return i; - } - } - - moduleType *mt = moduleTypeLookupModuleByNameIgnoreCase(name); - if (mt != NULL) return -(REDISMODULE_TYPE_SIGN(mt->entity.id)); - - return LLONG_MAX; -} - -char *getObjectTypeName(robj *o) { - if (o == NULL) { - return "none"; - } - - serverAssert(o->type >= 0 && o->type < OBJ_TYPE_MAX); - - if (o->type == OBJ_MODULE) { - moduleValue *mv = o->ptr; - return mv->type->entity.name; - } else { - return obj_type_name[o->type]; - } -} - -static int scanShouldSkipDict(dict *d, int didx) { - UNUSED(d); - return accessKeysShouldSkipDictIndex(didx); -} - -/* This command implements SCAN, HSCAN and SSCAN commands. - * If object 'o' is passed, then it must be a Hash, Set or Zset object, otherwise - * if 'o' is NULL the command will operate on the dictionary associated with - * the current database. - * - * When 'o' is not NULL the function assumes that the first argument in - * the client arguments vector is a key so it skips it before iterating - * in order to parse options. - * - * In the case of a Hash object the function returns both the field and value - * of every element on the Hash. */ -void scanGenericCommand(client *c, robj *o, unsigned long long cursor) { - int i, j; - listNode *node; - long count = 10; - sds pat = NULL; - sds typename = NULL; - long long type = LLONG_MAX; - int patlen = 0, use_pattern = 0, no_values = 0; - dict *ht; - - /* Object must be NULL (to iterate keys names), or the type of the object - * must be Set, Sorted Set, or Hash. */ - serverAssert(o == NULL || o->type == OBJ_SET || o->type == OBJ_HASH || - o->type == OBJ_ZSET); - - /* Set i to the first option argument. The previous one is the cursor. */ - i = (o == NULL) ? 2 : 3; /* Skip the key argument if needed. */ - - /* Step 1: Parse options. */ - while (i < c->argc) { - j = c->argc - i; - if (!strcasecmp(c->argv[i]->ptr, "count") && j >= 2) { - if (getLongFromObjectOrReply(c, c->argv[i+1], &count, NULL) - != C_OK) - { - return; - } - - if (count < 1) { - addReplyErrorObject(c,shared.syntaxerr); - return; - } - - i += 2; - } else if (!strcasecmp(c->argv[i]->ptr, "match") && j >= 2) { - pat = c->argv[i+1]->ptr; - patlen = sdslen(pat); - - /* The pattern always matches if it is exactly "*", so it is - * equivalent to disabling it. */ - use_pattern = !(patlen == 1 && pat[0] == '*'); - - i += 2; - } else if (!strcasecmp(c->argv[i]->ptr, "type") && o == NULL && j >= 2) { - /* SCAN for a particular type only applies to the db dict */ - typename = c->argv[i+1]->ptr; - type = getObjectTypeByName(typename); - if (type == LLONG_MAX) { - /* TODO: uncomment in redis 8.0 - addReplyErrorFormat(c, "unknown type name '%s'", typename); - return; */ - } - i+= 2; - } else if (!strcasecmp(c->argv[i]->ptr, "novalues")) { - if (!o || o->type != OBJ_HASH) { - addReplyError(c, "NOVALUES option can only be used in HSCAN"); - return; - } - no_values = 1; - i++; - } else { - addReplyErrorObject(c,shared.syntaxerr); - return; - } - } - - /* Step 2: Iterate the collection. - * - * Note that if the object is encoded with a listpack, intset, or any other - * representation that is not a hash table, we are sure that it is also - * composed of a small number of elements. So to avoid taking state we - * just return everything inside the object in a single call, setting the - * cursor to zero to signal the end of the iteration. */ - - /* Handle the case of a hash table. */ - ht = NULL; - if (o == NULL) { - ht = NULL; - } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) { - ht = o->ptr; - } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) { - ht = o->ptr; - } else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) { - zset *zs = o->ptr; - ht = zs->dict; - } - - list *keys = listCreate(); - /* Set a free callback for the contents of the collected keys list. - * For the main keyspace dict, and when we scan a key that's dict encoded - * (we have 'ht'), we don't need to define free method because the strings - * in the list are just a shallow copy from the pointer in the dictEntry. - * When scanning a key with other encodings (e.g. listpack), we need to - * free the temporary strings we add to that list. - * The exception to the above is ZSET, where we do allocate temporary - * strings even when scanning a dict. */ - if (o && (!ht || o->type == OBJ_ZSET)) { - listSetFreeMethod(keys, sdsfreegeneric); - } - - /* For main dictionary scan or data structure using hashtable. */ - if (!o || ht) { - /* We set the max number of iterations to ten times the specified - * COUNT, so if the hash table is in a pathological state (very - * sparsely populated) we avoid to block too much time at the cost - * of returning no or very few elements. */ - long maxiterations = count*10; - - /* We pass scanData which have three pointers to the callback: - * 1. data.keys: the list to which it will add new elements; - * 2. data.o: the object containing the dictionary so that - * it is possible to fetch more data in a type-dependent way; - * 3. data.type: the specified type scan in the db, LLONG_MAX means - * type matching is no needed; - * 4. data.pattern: the pattern string; - * 5. data.sampled: the maxiteration limit is there in case we're - * working on an empty dict, one with a lot of empty buckets, and - * for the buckets are not empty, we need to limit the spampled number - * to prevent a long hang time caused by filtering too many keys; - * 6. data.no_values: to control whether values will be returned or - * only keys are returned. */ - scanData data = { - .keys = keys, - .o = o, - .type = type, - .pattern = use_pattern ? pat : NULL, - .sampled = 0, - .no_values = no_values, - .typename = typename, - .db = c->db, - }; - - /* A pattern may restrict all matching keys to one cluster slot. */ - int onlydidx = -1; - if (o == NULL && use_pattern && server.cluster_enabled) { - onlydidx = patternHashSlot(pat, patlen); - } - do { - /* In cluster mode there is a separate dictionary for each slot. - * If cursor is empty, we should try exploring next non-empty slot. */ - if (o == NULL) { - cursor = kvstoreScan(c->db->keys, cursor, onlydidx, scanCallback, scanShouldSkipDict, &data); - } else { - cursor = dictScan(ht, cursor, scanCallback, &data); - } - } while (cursor && maxiterations-- && data.sampled < count); - } else if (o->type == OBJ_SET) { - unsigned long array_reply_len = 0; - void *replylen = NULL; - listRelease(keys); - char *str; - char buf[LONG_STR_SIZE]; - size_t len; - int64_t llele; - /* Reply to the client. */ - addReplyArrayLen(c, 2); - /* Cursor is always 0 given we iterate over all set */ - addReplyBulkLongLong(c,0); - /* If there is no pattern the length is the entire set size, otherwise we defer the reply size */ - if (use_pattern) - replylen = addReplyDeferredLen(c); - else { - array_reply_len = setTypeSize(o); - addReplyArrayLen(c, array_reply_len); - } - - setTypeIterator si; - unsigned long cur_length = 0; - setTypeInitIterator(&si, o); - while (setTypeNext(&si, &str, &len, &llele) != -1) { - if (str == NULL) { - len = ll2string(buf, sizeof(buf), llele); - } - char *key = str ? str : buf; - if (use_pattern && !stringmatchlen(pat, patlen, key, len, 0)) { - continue; - } - addReplyBulkCBuffer(c, key, len); - cur_length++; - } - setTypeResetIterator(&si); - if (use_pattern) - setDeferredArrayLen(c,replylen,cur_length); - else - serverAssert(cur_length == array_reply_len); /* fail on corrupt data */ - return; - } else if ((o->type == OBJ_HASH || o->type == OBJ_ZSET) && - o->encoding == OBJ_ENCODING_LISTPACK) - { - unsigned char *p = lpFirst(o->ptr); - unsigned char *str; - int64_t len; - unsigned long array_reply_len = 0; - unsigned char intbuf[LP_INTBUF_SIZE]; - void *replylen = NULL; - listRelease(keys); - - /* Reply to the client. */ - addReplyArrayLen(c, 2); - /* Cursor is always 0 given we iterate over all set */ - addReplyBulkLongLong(c,0); - /* If there is no pattern the length is the entire set size, otherwise we defer the reply size */ - if (use_pattern) - replylen = addReplyDeferredLen(c); - else { - array_reply_len = o->type == OBJ_HASH ? hashTypeLength(o, 0) : zsetLength(o); - if (!no_values) { - array_reply_len *= 2; - } - addReplyArrayLen(c, array_reply_len); - } - unsigned long cur_length = 0; - while(p) { - str = lpGet(p, &len, intbuf); - /* point to the value */ - p = lpNext(o->ptr, p); - if (use_pattern && !stringmatchlen(pat, patlen, (char *)str, len, 0)) { - /* jump to the next key/val pair */ - p = lpNext(o->ptr, p); - continue; - } - /* add key object */ - addReplyBulkCBuffer(c, str, len); - cur_length++; - /* add value object */ - if (!no_values) { - str = lpGet(p, &len, intbuf); - addReplyBulkCBuffer(c, str, len); - cur_length++; - } - p = lpNext(o->ptr, p); - } - if (use_pattern) - setDeferredArrayLen(c,replylen,cur_length); - else - serverAssert(cur_length == array_reply_len); /* fail on corrupt data */ - return; - } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_LISTPACK_EX) { - int64_t len; - long long expire_at; - unsigned char *lp = hashTypeListpackGetLp(o); - unsigned char *p = lpFirst(lp); - unsigned char *str, *val; - unsigned char intbuf[LP_INTBUF_SIZE]; - void *replylen = NULL; - - listRelease(keys); - /* Reply to the client. */ - addReplyArrayLen(c, 2); - /* Cursor is always 0 given we iterate over all set */ - addReplyBulkLongLong(c,0); - /* In the case of OBJ_ENCODING_LISTPACK_EX we always defer the reply size given some fields might be expired */ - replylen = addReplyDeferredLen(c); - unsigned long cur_length = 0; - - while (p) { - str = lpGet(p, &len, intbuf); - p = lpNext(lp, p); - val = p; /* Keep pointer to value */ - - p = lpNext(lp, p); - serverAssert(p && lpGetIntegerValue(p, &expire_at)); - - if (hashTypeIsExpired(o, expire_at) || - (use_pattern && !stringmatchlen(pat, patlen, (char *)str, len, 0))) - { - /* jump to the next key/val pair */ - p = lpNext(lp, p); - continue; - } - - /* add key object */ - addReplyBulkCBuffer(c, str, len); - cur_length++; - /* add value object */ - if (!no_values) { - str = lpGet(val, &len, intbuf); - addReplyBulkCBuffer(c, str, len); - cur_length++; - } - p = lpNext(lp, p); - } - setDeferredArrayLen(c,replylen,cur_length); - return; - } else { - serverPanic("Not handled encoding in SCAN."); - } - - /* Step 3: Reply to the client. */ - addReplyArrayLen(c, 2); - addReplyBulkLongLong(c,cursor); - - addReplyArrayLen(c, listLength(keys)); - while ((node = listFirst(keys)) != NULL) { - void *key = listNodeValue(node); - addReplyBulkCBuffer(c, key, sdslen(key)); - listDelNode(keys, node); - } - - listRelease(keys); -} - -/* The SCAN command completely relies on scanGenericCommand. */ -void scanCommand(client *c) { - unsigned long long cursor; - if (parseScanCursorOrReply(c,c->argv[1],&cursor) == C_ERR) return; - scanGenericCommand(c,NULL,cursor); -} - -void dbsizeCommand(client *c) { - addReplyLongLong(c,dbSize(c->db)); -} - -void lastsaveCommand(client *c) { - addReplyLongLong(c,server.lastsave); -} - -void typeCommand(client *c) { - kvobj *kv = lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH); - addReplyStatus(c, getObjectTypeName(kv)); -} - -void shutdownCommand(client *c) { - int flags = SHUTDOWN_NOFLAGS; - int abort = 0; - for (int i = 1; i < c->argc; i++) { - if (!strcasecmp(c->argv[i]->ptr,"nosave")) { - flags |= SHUTDOWN_NOSAVE; - } else if (!strcasecmp(c->argv[i]->ptr,"save")) { - flags |= SHUTDOWN_SAVE; - } else if (!strcasecmp(c->argv[i]->ptr, "now")) { - flags |= SHUTDOWN_NOW; - } else if (!strcasecmp(c->argv[i]->ptr, "force")) { - flags |= SHUTDOWN_FORCE; - } else if (!strcasecmp(c->argv[i]->ptr, "abort")) { - abort = 1; - } else { - addReplyErrorObject(c,shared.syntaxerr); - return; - } - } - if ((abort && flags != SHUTDOWN_NOFLAGS) || - (flags & SHUTDOWN_NOSAVE && flags & SHUTDOWN_SAVE)) - { - /* Illegal combo. */ - addReplyErrorObject(c,shared.syntaxerr); - return; - } - - if (abort) { - if (abortShutdown() == C_OK) - addReply(c, shared.ok); - else - addReplyError(c, "No shutdown in progress."); - return; - } - - if (!(flags & SHUTDOWN_NOW) && c->flags & CLIENT_DENY_BLOCKING) { - addReplyError(c, "SHUTDOWN without NOW or ABORT isn't allowed for DENY BLOCKING client"); - return; - } - - if (!(flags & SHUTDOWN_NOSAVE) && isInsideYieldingLongCommand()) { - /* Script timed out. Shutdown allowed only with the NOSAVE flag. See - * also processCommand where these errors are returned. */ - if (server.busy_module_yield_flags && server.busy_module_yield_reply) { - addReplyErrorFormat(c, "-BUSY %s", server.busy_module_yield_reply); - } else if (server.busy_module_yield_flags) { - addReplyErrorObject(c, shared.slowmoduleerr); - } else if (scriptIsEval()) { - addReplyErrorObject(c, shared.slowevalerr); - } else { - addReplyErrorObject(c, shared.slowscripterr); - } - return; - } - - blockClientShutdown(c); - if (prepareForShutdown(flags) == C_OK) exit(0); - /* If we're here, then shutdown is ongoing (the client is still blocked) or - * failed (the client has received an error). */ -} - -void renameGenericCommand(client *c, int nx) { - kvobj *o; - int samekey = 0; - uint64_t minHashExpireTime = EB_EXPIRE_TIME_INVALID; - - /* When source and dest key is the same, no operation is performed, - * if the key exists, however we still return an error on unexisting key. */ - if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) samekey = 1; - - if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL) - return; - - if (samekey) { - addReply(c,nx ? shared.czero : shared.ok); - return; - } - - incrRefCount(o); - kvobj *destval = lookupKeyWrite(c->db,c->argv[2]); - int overwritten = 0; - int desttype = -1; - if (destval != NULL) { - if (nx) { - decrRefCount(o); - addReply(c,shared.czero); - return; - } - - /* Overwrite: delete the old key before creating the new one - * with the same name. */ - desttype = destval->type; - dbDelete(c->db,c->argv[2]); - overwritten = 1; - } - - /* If hash with expiration on fields then remove it from global HFE DS and - * keep next expiration time. Otherwise, dbDelete() will remove it from the - * global HFE DS and we will lose the expiration time. */ - int srctype = o->type; - if (srctype == OBJ_HASH) - minHashExpireTime = estoreRemove(c->db->subexpires, getKeySlot(c->argv[1]->ptr), o); - - /* Prepare metadata for the renamed key */ - KeyMetaSpec keymeta; - keyMetaSpecInit(&keymeta); - if (o->metabits) keyMetaOnRename(c->db, o, c->argv[1], c->argv[2], &keymeta); - - dbDelete(c->db,c->argv[1]); - - dbAddInternal(c->db, c->argv[2], &o, NULL, &keymeta); - - /* If hash with HFEs, register in DB subexpires */ - if (minHashExpireTime != EB_EXPIRE_TIME_INVALID) - estoreAdd(c->db->subexpires, getKeySlot(c->argv[2]->ptr), o, minHashExpireTime); - - keyModified(c,c->db,c->argv[1],NULL,1); - keyModified(c,c->db,c->argv[2],NULL,1); /* LRM already updated by dbAddInternal */ - notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from", - c->argv[1],c->db->id); - notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_to", - c->argv[2],c->db->id); - if (overwritten) { - notifyKeyspaceEvent(NOTIFY_OVERWRITTEN, "overwritten", c->argv[2], c->db->id); - if (desttype != srctype) - notifyKeyspaceEvent(NOTIFY_TYPE_CHANGED, "type_changed", c->argv[2], c->db->id); - } - server.dirty++; - addReply(c,nx ? shared.cone : shared.ok); -} - -void renameCommand(client *c) { - renameGenericCommand(c,0); -} - -void renamenxCommand(client *c) { - renameGenericCommand(c,1); -} - -void moveCommand(client *c) { - redisDb *src, *dst; - int srcid, dbid; - uint64_t hashExpireTime = EB_EXPIRE_TIME_INVALID; - - if (server.cluster_enabled) { - addReplyError(c,"MOVE is not allowed in cluster mode"); - return; - } - - /* Obtain source and target DB pointers */ - src = c->db; - srcid = c->db->id; - - if (getIntFromObjectOrReply(c, c->argv[2], &dbid, NULL) != C_OK) - return; - - if (selectDb(c,dbid) == C_ERR) { - addReplyError(c,"DB index is out of range"); - return; - } - dst = c->db; - selectDb(c,srcid); /* Back to the source DB */ - - /* If the user is moving using as target the same - * DB as the source DB it is probably an error. */ - if (src == dst) { - addReplyErrorObject(c,shared.sameobjecterr); - return; - } - - /* Record incompatible operations in cluster mode */ - server.stat_cluster_incompatible_ops++; - - /* Check if the element exists and get a reference */ - kvobj *kv = lookupKeyWrite(c->db,c->argv[1]); - if (!kv) { - addReply(c,shared.czero); - return; - } - - /* Return zero if the key already exists in the target DB */ - dictEntryLink dstBucket; - if (lookupKey(dst, c->argv[1], LOOKUP_WRITE, &dstBucket) != NULL) { - addReply(c,shared.czero); - return; - } - - int slot = getKeySlot(c->argv[1]->ptr); - - /* If hash with expiration on fields, remove it from DB subexpires and keep - * aside registered expiration time. Must be before removal of the - * object since it embeds ExpireMeta that is used by subexpires */ - if (kv->type == OBJ_HASH) - hashExpireTime = estoreRemove(src->subexpires, slot, kv); - - /* Move a side metadata before dbDelete() */ - KeyMetaSpec keymeta; - keyMetaSpecInit(&keymeta); - keyMetaOnMove(kv, c->argv[1], srcid, dbid, &keymeta); - - incrRefCount(kv); /* ref counter = 1->2 */ - dbDelete(src,c->argv[1]); /* ref counter = 2->1 */ - - dbAddInternal(dst, c->argv[1], &kv, &dstBucket, &keymeta); - - /* If object of type hash with expiration on fields. Taken care to add the - * hash to subexpires of `dst` only after dbDelete(). */ - if (hashExpireTime != EB_EXPIRE_TIME_INVALID) - estoreAdd(dst->subexpires, slot, kv, hashExpireTime); - - keyModified(c,src,c->argv[1],NULL,1); - keyModified(c,dst,c->argv[1],NULL,1); /* LRM already updated by dbAddInternal */ - notifyKeyspaceEvent(NOTIFY_GENERIC, - "move_from",c->argv[1],src->id); - notifyKeyspaceEvent(NOTIFY_GENERIC, - "move_to",c->argv[1],dst->id); - - server.dirty++; - addReply(c,shared.cone); -} - -void copyCommand(client *c) { - kvobj *o; - redisDb *src, *dst; - int srcid, dbid; - int j, replace = 0, delete = 0; - - /* Obtain source and target DB pointers - * Default target DB is the same as the source DB - * Parse the REPLACE option and targetDB option. */ - src = c->db; - dst = c->db; - srcid = c->db->id; - dbid = c->db->id; - for (j = 3; j < c->argc; j++) { - int additional = c->argc - j - 1; - if (!strcasecmp(c->argv[j]->ptr,"replace")) { - replace = 1; - } else if (!strcasecmp(c->argv[j]->ptr, "db") && additional >= 1) { - if (getIntFromObjectOrReply(c, c->argv[j+1], &dbid, NULL) != C_OK) - return; - - if (selectDb(c, dbid) == C_ERR) { - addReplyError(c,"DB index is out of range"); - return; - } - dst = c->db; - selectDb(c,srcid); /* Back to the source DB */ - j++; /* Consume additional arg. */ - } else { - addReplyErrorObject(c,shared.syntaxerr); - return; - } - } - - if ((server.cluster_enabled == 1) && (srcid != 0 || dbid != 0)) { - addReplyError(c,"Copying to another database is not allowed in cluster mode"); - return; - } - - /* If the user select the same DB as - * the source DB and using newkey as the same key - * it is probably an error. */ - robj *key = c->argv[1]; - robj *newkey = c->argv[2]; - if (src == dst && (sdscmp(key->ptr, newkey->ptr) == 0)) { - addReplyErrorObject(c,shared.sameobjecterr); - return; - } - - if (srcid != 0 || dbid != 0) { - server.stat_cluster_incompatible_ops++; - } - - /* Check if the element exists and get a reference */ - o = lookupKeyRead(c->db, key); - if (!o) { - addReply(c,shared.czero); - return; - } - - /* Return zero if the key already exists in the target DB. - * If REPLACE option is selected, delete newkey from targetDB. */ - kvobj *destval = lookupKeyWrite(dst,newkey); - if (destval != NULL) { - if (replace) { - delete = 1; - } else { - addReply(c,shared.czero); - return; - } - } - int destoldtype = destval ? destval->type : -1; - int destnewtype = o->type; - - /* Duplicate object according to object's type. */ - robj *newobj; - uint64_t minHashExpire = EB_EXPIRE_TIME_INVALID; /* HFE feature */ - switch(o->type) { - case OBJ_STRING: newobj = dupStringObject(o); break; - case OBJ_LIST: newobj = listTypeDup(o); break; - case OBJ_SET: newobj = setTypeDup(o); break; - case OBJ_ZSET: newobj = zsetDup(o); break; - case OBJ_HASH: newobj = hashTypeDup(o, &minHashExpire); break; - case OBJ_STREAM: newobj = streamDup(o); break; - case OBJ_MODULE: - newobj = moduleTypeDupOrReply(c, key, newkey, dst->id, o); - if (!newobj) return; - break; - default: - addReplyError(c, "unknown type object"); - return; - } - - if (delete) { - dbDelete(dst,newkey); - } - - /* Prepare metadata for the new key */ - KeyMetaSpec keymeta; - keyMetaSpecInit(&keymeta); - if (o->metabits) keyMetaOnCopy(o, key, newkey, c->db->id, dst->id, &keymeta); - - kvobj *kvCopy = dbAddInternal(dst, newkey, &newobj, NULL, &keymeta); - - /* If minExpiredField was set, then the object is hash with expiration - * on fields and need to register it in global HFE DS */ - if (minHashExpire != EB_EXPIRE_TIME_INVALID) - estoreAdd(dst->subexpires, getKeySlot(newkey->ptr), kvCopy, minHashExpire); - - /* OK! key copied. Signal modification (LRM already updated by dbAddInternal) */ - keyModified(c,dst,c->argv[2],NULL,1); - notifyKeyspaceEvent(NOTIFY_GENERIC,"copy_to",c->argv[2],dst->id); - - /* `delete` implies the destination key was overwritten */ - if (delete) { - notifyKeyspaceEvent(NOTIFY_OVERWRITTEN, "overwritten", c->argv[2], dst->id); - if (destoldtype != destnewtype) - notifyKeyspaceEvent(NOTIFY_TYPE_CHANGED, "type_changed", c->argv[2], dst->id); - } - - server.dirty++; - addReply(c,shared.cone); -} - -/* Helper function for dbSwapDatabases(): scans the list of keys that have - * one or more blocked clients for B[LR]POP or other blocking commands - * and signal the keys as ready if they are of the right type. See the comment - * where the function is used for more info. */ -void scanDatabaseForReadyKeys(redisDb *db) { - dictEntry *de; - dictIterator di; - dictInitSafeIterator(&di, db->blocking_keys); - while((de = dictNext(&di)) != NULL) { - robj *key = dictGetKey(de); - kvobj *kv = dbFind(db, key->ptr); - if (kv) - signalKeyAsReady(db, key, kv->type); - } - dictResetIterator(&di); -} - -/* Since we are unblocking XREADGROUP clients in the event the key was - * deleted/overwritten we must do the same in case the database was - * flushed/swapped. If 'slots' is not NULL, only keys in the specified slot - * range are considered. */ -void scanDatabaseForDeletedKeys(redisDb *emptied, redisDb *replaced_with, slotRangeArray *slots) { - dictEntry *de; - dictIterator di; - - dictInitSafeIterator(&di, emptied->blocking_keys); - while((de = dictNext(&di)) != NULL) { - robj *key = dictGetKey(de); - /* Check if key belongs to the slot range. */ - if (slots && !slotRangeArrayContains(slots, keyHashSlot(key->ptr, sdslen(key->ptr)))) - continue; - int existed = 0, exists = 0; - int original_type = -1, curr_type = -1; - - kvobj *kv = dbFind(emptied, key->ptr); - if (kv) { - original_type = kv->type; - existed = 1; - } - - if (replaced_with) { - kv = dbFind(replaced_with, key->ptr); - if (kv) { - curr_type = kv->type; - exists = 1; - } - } - /* We want to try to unblock any client using a blocking XREADGROUP */ - if ((existed && !exists) || original_type != curr_type) - signalDeletedKeyAsReady(emptied, key, original_type); - } - dictResetIterator(&di); -} - -/* Swap two databases at runtime so that all clients will magically see - * the new database even if already connected. Note that the client - * structure c->db points to a given DB, so we need to be smarter and - * swap the underlying referenced structures, otherwise we would need - * to fix all the references to the Redis DB structure. - * - * Returns C_ERR if at least one of the DB ids are out of range, otherwise - * C_OK is returned. */ -int dbSwapDatabases(int id1, int id2) { - if (id1 < 0 || id1 >= server.dbnum || - id2 < 0 || id2 >= server.dbnum) return C_ERR; - if (id1 == id2) return C_OK; - redisDb aux = server.db[id1]; - redisDb *db1 = &server.db[id1], *db2 = &server.db[id2]; - - /* Swapdb should make transaction fail if there is any - * client watching keys */ - touchAllWatchedKeysInDb(db1, db2, NULL); - touchAllWatchedKeysInDb(db2, db1, NULL); - - /* Try to unblock any XREADGROUP clients if the key no longer exists. */ - scanDatabaseForDeletedKeys(db1, db2, NULL); - scanDatabaseForDeletedKeys(db2, db1, NULL); - - /* Swap hash tables. Note that we don't swap blocking_keys, - * ready_keys and watched_keys, since we want clients to - * remain in the same DB they were. */ - db1->keys = db2->keys; - db1->expires = db2->expires; - db1->subexpires = db2->subexpires; - db1->avg_ttl = db2->avg_ttl; - db1->expires_cursor = db2->expires_cursor; - - db2->keys = aux.keys; - db2->expires = aux.expires; - db2->subexpires = aux.subexpires; - db2->avg_ttl = aux.avg_ttl; - db2->expires_cursor = aux.expires_cursor; - - /* Now we need to handle clients blocked on lists: as an effect - * of swapping the two DBs, a client that was waiting for list - * X in a given DB, may now actually be unblocked if X happens - * to exist in the new version of the DB, after the swap. - * - * However normally we only do this check for efficiency reasons - * in dbAdd() when a list is created. So here we need to rescan - * the list of clients blocked on lists and signal lists as ready - * if needed. */ - scanDatabaseForReadyKeys(db1); - scanDatabaseForReadyKeys(db2); - return C_OK; -} - -/* Logically, this discards (flushes) the old main database, and apply the newly loaded - * database (temp) as the main (active) database, the actual freeing of old database - * (which will now be placed in the temp one) is done later. */ -void swapMainDbWithTempDb(redisDb *tempDb) { - for (int i=0; i<server.dbnum; i++) { - redisDb aux = server.db[i]; - redisDb *activedb = &server.db[i], *newdb = &tempDb[i]; - - /* Swapping databases should make transaction fail if there is any - * client watching keys. */ - touchAllWatchedKeysInDb(activedb, newdb, NULL); - - /* Try to unblock any XREADGROUP clients if the key no longer exists. */ - scanDatabaseForDeletedKeys(activedb, newdb, NULL); - - /* Swap hash tables. Note that we don't swap blocking_keys, - * ready_keys and watched_keys, since clients - * remain in the same DB they were. */ - activedb->keys = newdb->keys; - activedb->expires = newdb->expires; - activedb->subexpires = newdb->subexpires; - activedb->avg_ttl = newdb->avg_ttl; - activedb->expires_cursor = newdb->expires_cursor; - - newdb->keys = aux.keys; - newdb->expires = aux.expires; - newdb->subexpires = aux.subexpires; - newdb->avg_ttl = aux.avg_ttl; - newdb->expires_cursor = aux.expires_cursor; - - /* Now we need to handle clients blocked on lists: as an effect - * of swapping the two DBs, a client that was waiting for list - * X in a given DB, may now actually be unblocked if X happens - * to exist in the new version of the DB, after the swap. - * - * However normally we only do this check for efficiency reasons - * in dbAdd() when a list is created. So here we need to rescan - * the list of clients blocked on lists and signal lists as ready - * if needed. */ - scanDatabaseForReadyKeys(activedb); - } - - trackingInvalidateKeysOnFlush(1); - flushSlaveKeysWithExpireList(); -} - -/* SWAPDB db1 db2 */ -void swapdbCommand(client *c) { - int id1, id2; - - /* Not allowed in cluster mode: we have just DB 0 there. */ - if (server.cluster_enabled) { - addReplyError(c,"SWAPDB is not allowed in cluster mode"); - return; - } - - /* Get the two DBs indexes. */ - if (getIntFromObjectOrReply(c, c->argv[1], &id1, - "invalid first DB index") != C_OK) - return; - - if (getIntFromObjectOrReply(c, c->argv[2], &id2, - "invalid second DB index") != C_OK) - return; - - /* Swap... */ - if (dbSwapDatabases(id1,id2) == C_ERR) { - addReplyError(c,"DB index is out of range"); - return; - } else { - RedisModuleSwapDbInfo si = {REDISMODULE_SWAPDBINFO_VERSION,id1,id2}; - moduleFireServerEvent(REDISMODULE_EVENT_SWAPDB,0,&si); - server.dirty++; - server.stat_cluster_incompatible_ops++; - addReply(c,shared.ok); - } -} - -/*----------------------------------------------------------------------------- - * Expires API - *----------------------------------------------------------------------------*/ - -/* Remove expiry from key - * - * Remove the object from db->expires and set to -1 attached TTL to KV - */ -int removeExpire(redisDb *db, robj *key) { - int table; - int slot = getKeySlot(key->ptr); - dictEntryLink link = kvstoreDictTwoPhaseUnlinkFind(db->expires, slot, key->ptr, &table); - - if (link == NULL) return 0; - dictEntry *de = *link; - kvobj *kv = dictGetKV(de); - kvobj *newkv = kvobjSetExpire(kv, -1); - serverAssert(newkv == kv); - kvstoreDictTwoPhaseUnlinkFree(db->expires, slot, link, table); - return 1; -} - - -/* Set an expire to the specified key. If the expire is set in the context - * of an user calling a command 'c' is the client, otherwise 'c' is set - * to NULL. The 'when' parameter is the absolute unix time in milliseconds - * after which the key will no longer be considered valid. - * - * Note: It may reallocate kvobj. The returned ref may point to a new object. */ -kvobj *setExpire(client *c, redisDb *db, robj *key, long long when) { - return setExpireByLink(c,db,key->ptr,when,NULL); -} - -/* Like setExpire(), but accepts an optional `keyLink` to save lookup */ -kvobj *setExpireByLink(client *c, redisDb *db, sds key, long long when, dictEntryLink keyLink) { - /* Reuse the sds from the main dict in the expire dict */ - int slot = getKeySlot(key); - size_t oldsize = 0; - if (!keyLink) { - keyLink = kvstoreDictFindLink(db->keys, slot, key, NULL); - serverAssert(keyLink != NULL); - } - kvobj *kv = dictGetKV(*keyLink); - long long old_when = kvobjGetExpire(kv); - - if (old_when != -1) { /* old expire */ - kvobj *kvnew = kvobjSetExpire(kv, when); /* release kv if reallocated */ - /* Val already had an expire field, so it was not reallocated. */ - serverAssert(kv == kvnew); - } else { /* No old expire */ - if (server.memory_tracking_per_slot) - oldsize = kvobjAllocSize(kv); - uint64_t subexpiry = EB_EXPIRE_TIME_INVALID; - /* If hash with HFEs, take care to remove from global HFE DS before attempting - * to manipulate and maybe free kv object */ - if (kv->type == OBJ_HASH) - subexpiry = estoreRemove(db->subexpires, slot, kv); - - kvobj *kvnew = kvobjSetExpire(kv, when); /* release kv if reallocated */ - /* if kvobj was reallocated, update dict */ - if (kv != kvnew) { - kvstoreDictSetAtLink(db->keys, slot, kvnew, &keyLink, 0); - if (server.memory_tracking_per_slot) - updateSlotAllocSize(db, slot, oldsize, kvobjAllocSize(kvnew)); - kv = kvnew; - } - /* Now add to expires */ - dictEntry *de = kvstoreDictAddRaw(db->expires, slot, kv, NULL); - serverAssert(de != NULL); - - if (subexpiry != EB_EXPIRE_TIME_INVALID) - estoreAdd(db->subexpires, slot, kv, subexpiry); - } - - int writable_slave = server.masterhost && server.repl_slave_ro == 0; - if (c && writable_slave && !(c->flags & CLIENT_MASTER)) - rememberSlaveKeyWithExpire(db,key); - return kv; -} - -/* Retrieve the expiration time for the specified key. - * Returns -1 if the key has no expiration set or doesn't exists - * - * To avoid lookup, pass key-value object (`kv`) instead of `key`. - */ -long long getExpire(redisDb *db, sds key, kvobj *kv) { - if (kv == NULL) kv = dbFindExpires(db, key); - if (kv == NULL) return -1; - return kvobjGetExpire(kv); -} - -/* Delete the specified expired or evicted key and propagate to replicas. - * Currently notify_type can only be NOTIFY_EXPIRED or NOTIFY_EVICTED, - * and it affects other aspects like the latency monitor event name and, - * which config to look for lazy free, stats var to increment, and so on. - * - * key_mem_freed is an out parameter which contains the estimated - * amount of memory freed due to the trimming (may be NULL) */ -static void deleteKeyAndPropagate(redisDb *db, robj *keyobj, int notify_type, long long *key_mem_freed) { - mstime_t latency; - int del_flag = notify_type == NOTIFY_EXPIRED ? DB_FLAG_KEY_EXPIRED : DB_FLAG_KEY_EVICTED; - int lazy_flag = notify_type == NOTIFY_EXPIRED ? server.lazyfree_lazy_expire : server.lazyfree_lazy_eviction; - char *latency_name = notify_type == NOTIFY_EXPIRED ? "expire-del" : "evict-del"; - char *notify_name = notify_type == NOTIFY_EXPIRED ? "expired" : "evicted"; - - /* The key needs to be converted from static to heap before deleted */ - int static_key = keyobj->refcount == OBJ_STATIC_REFCOUNT; - if (static_key) { - keyobj = createStringObject(keyobj->ptr, sdslen(keyobj->ptr)); - } - - serverLog(LL_DEBUG,"key %s %s: deleting it", (char*)keyobj->ptr, notify_type == NOTIFY_EXPIRED ? "expired" : "evicted"); - - /* We compute the amount of memory freed by db*Delete() alone. - * It is possible that actually the memory needed to propagate - * the DEL in AOF and replication link is greater than the one - * we are freeing removing the key, but we can't account for - * that otherwise we would never exit the loop. - * - * Same for CSC invalidation messages generated by keyModified. - * - * AOF and Output buffer memory will be freed eventually so - * we only care about memory used by the key space. - * - * The code here used to first propagate and then record delta - * using only zmalloc_used_memory but in CRDT we can't do that - * so we use freeMemoryGetNotCountedMemory to avoid counting - * AOF and slave buffers */ - if (key_mem_freed) *key_mem_freed = (long long) zmalloc_used_memory() - freeMemoryGetNotCountedMemory(); - latencyStartMonitor(latency); - dbGenericDelete(db, keyobj, lazy_flag, del_flag); - latencyEndMonitor(latency); - latencyAddSampleIfNeeded(latency_name, latency); - if (key_mem_freed) *key_mem_freed -= (long long) zmalloc_used_memory() - freeMemoryGetNotCountedMemory(); - - notifyKeyspaceEvent(notify_type, notify_name,keyobj, db->id); - keyModified(NULL, db, keyobj, NULL, 1); - propagateDeletion(db, keyobj, lazy_flag); - - if (notify_type == NOTIFY_EXPIRED) - server.stat_expiredkeys++; - else - server.stat_evictedkeys++; - - if (static_key) - decrRefCount(keyobj); -} - -/* Delete the specified expired key and propagate. */ -void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) { - deleteKeyAndPropagate(db, keyobj, NOTIFY_EXPIRED, NULL); -} - -/* Delete the specified evicted key and propagate. */ -void deleteEvictedKeyAndPropagate(redisDb *db, robj *keyobj, long long *key_mem_freed) { - deleteKeyAndPropagate(db, keyobj, NOTIFY_EVICTED, key_mem_freed); -} - -/* Propagate an implicit key deletion into replicas and the AOF file. - * When a key was deleted in the master by eviction, expiration or a similar - * mechanism a DEL/UNLINK operation for this key is sent - * to all the replicas and the AOF file if enabled. - * - * This way the key deletion is centralized in one place, and since both - * AOF and the replication link guarantee operation ordering, everything - * will be consistent even if we allow write operations against deleted - * keys. - * - * This function may be called from: - * 1. Within call(): Example: Lazy-expire on key access. - * In this case the caller doesn't have to do anything - * because call() handles server.also_propagate(); or - * 2. Outside of call(): Example: Active-expire, eviction, slot ownership changed. - * In this the caller must remember to call - * postExecutionUnitOperations, preferably just after a - * single deletion batch, so that DEL/UNLINK will NOT be wrapped - * in MULTI/EXEC */ -void propagateDeletion(redisDb *db, robj *key, int lazy) { - robj *argv[2]; - - argv[0] = lazy ? shared.unlink : shared.del; - argv[1] = key; - incrRefCount(argv[0]); - incrRefCount(argv[1]); - - /* If the master decided to delete a key we must propagate it to replicas no matter what. - * Even if module executed a command without asking for propagation. */ - int prev_replication_allowed = server.replication_allowed; - server.replication_allowed = 1; - alsoPropagate(db->id,argv,2,PROPAGATE_AOF|PROPAGATE_REPL); - server.replication_allowed = prev_replication_allowed; - - decrRefCount(argv[0]); - decrRefCount(argv[1]); -} - -/* Check if the key is expired - * - * Provide either the key name for a lookup or KV object (to save lookup) - */ -int keyIsExpired(redisDb *db, sds key, kvobj *kv) { - /* Don't expire anything while loading. It will be done later. */ - if (server.loading || server.allow_access_expired) return 0; - mstime_t when = getExpire(db, key, kv); - if (when < 0) return 0; /* No expire for this key */ - const mstime_t now = commandTimeSnapshot(); - /* The key expired if the current (virtual or real) time is greater - * than the expire time of the key. */ - return now > when; -} - -/* Check if user configuration allows key to be deleted due to expiary */ -int confAllowsExpireDel(void) { - if (server.lazyexpire_nested_arbitrary_keys) - return 1; - - /* This configuration specifically targets nested commands, to align with RE's feature of replication between dbs. - * transactions (from scripts or multi-exec) containing commands like SCAN and RANDOMKEY will execute locally, but their - * lazy-expiration DELs may induce CROSS-SLOT on remote proxy in mode replica-of (RED-161574) */ - return !(server.execution_nesting > 1 && server.executing_client->cmd->flags & CMD_TOUCHES_ARBITRARY_KEYS); -} - -/* This function is called when we are going to perform some operation - * in a given key, but such key may be already logically expired even if - * it still exists in the database. The main way this function is called - * is via lookupKey*() family of functions. - * - * The behavior of the function depends on the replication role of the - * instance, because by default replicas do not delete expired keys. They - * wait for DELs from the master for consistency matters. However even - * replicas will try to have a coherent return value for the function, - * so that read commands executed in the replica side will be able to - * behave like if the key is expired even if still present (because the - * master has yet to propagate the DEL). - * - * In masters as a side effect of finding a key which is expired, such - * key will be evicted from the database. Also this may trigger the - * propagation of a DEL/UNLINK command in AOF / replication stream. - * - * On replicas, this function does not delete expired keys by default, but - * it still returns KEY_EXPIRED if the key is logically expired. To force deletion - * of logically expired keys even on replicas, use the EXPIRE_FORCE_DELETE_EXPIRED - * flag. Note though that if the current client is executing - * replicated commands from the master, keys are never considered expired. - * - * On the other hand, if you just want expiration check, but need to avoid - * the actual key deletion and propagation of the deletion, use the - * EXPIRE_AVOID_DELETE_EXPIRED flag. If also needed to read expired key (that - * hasn't being deleted yet) then use EXPIRE_ALLOW_ACCESS_EXPIRED. - * - * The return value of the function is KEY_VALID if the key is still valid. - * The function returns KEY_EXPIRED if the key is expired BUT not deleted, - * or returns KEY_DELETED if the key is expired and deleted. If the key is in a - * trim job due to slot migration, the function returns KEY_TRIMMED, unless - * EXPIRE_ALLOW_ACCESS_TRIMMED is set, in which case it returns KEY_VALID. - * - * You can optionally pass `kv` to save a lookup. - */ -keyStatus expireIfNeeded(redisDb *db, robj *key, kvobj *kv, int flags) { - debugAssert(key != NULL || kv != NULL); - - /* NOTE: Keys in slots scheduled for trimming can still exist for a while. - * We don't delete it here, return KEY_VALID if allowing access to trimmed - * keys, and return KEY_TRIMMED otherwise. */ - sds key_name = key ? key->ptr : kvobjGetKey(kv); - if (asmIsKeyInTrimJob(key_name)) { - if (server.allow_access_trimmed || (flags & EXPIRE_ALLOW_ACCESS_TRIMMED)) - return KEY_VALID; - - return KEY_TRIMMED; - } - - if ((flags & EXPIRE_ALLOW_ACCESS_EXPIRED) || - (!keyIsExpired(db, key ? key->ptr : NULL, kv))) - return KEY_VALID; - - /* If we are running in the context of a replica, instead of - * evicting the expired key from the database, we return ASAP: - * the replica key expiration is controlled by the master that will - * send us synthesized DEL operations for expired keys. The - * exception is when write operations are performed on writable - * replicas. - * - * In cluster mode, we also return ASAP if we are importing data - * from the source, to avoid deleting keys that are still in use. - * We create a fake master client for data import, which can be - * identified using the CLIENT_MASTER flag. - * - * Still we try to return the right information to the caller, - * that is, KEY_VALID if we think the key should still be valid, - * KEY_EXPIRED if we think the key is expired but don't want to delete it at this time. - * - * When replicating commands from the master, keys are never considered - * expired. */ - if (server.masterhost != NULL || server.cluster_enabled) { - if (server.current_client && (server.current_client->flags & CLIENT_MASTER)) return KEY_VALID; - if (server.masterhost != NULL && !(flags & EXPIRE_FORCE_DELETE_EXPIRED)) return KEY_EXPIRED; - } - - /* Check if user configuration disables lazy-expire deletions in current state. - * This will only apply if the server doesn't mandate key deletion to operate correctly (write commands). */ - if (!(flags & EXPIRE_FORCE_DELETE_EXPIRED) && !confAllowsExpireDel()) - return KEY_EXPIRED; - - /* In some cases we're explicitly instructed to return an indication of a - * missing key without actually deleting it, even on masters. */ - if (flags & EXPIRE_AVOID_DELETE_EXPIRED) - return KEY_EXPIRED; - - /* If 'expire' action is paused, for whatever reason, then don't expire any key. - * Typically, at the end of the pause we will properly expire the key OR we - * will have failed over and the new primary will send us the expire. */ - if (isPausedActionsWithUpdate(PAUSE_ACTION_EXPIRE)) return KEY_EXPIRED; - - /* Perform deletion */ - if (key) { - deleteExpiredKeyAndPropagate(db, key); - } else { - sds keyname = kvobjGetKey(kv); - robj *tmpkey = createStringObject(keyname, sdslen(keyname)); - deleteExpiredKeyAndPropagate(db, tmpkey); - decrRefCount(tmpkey); - } - return KEY_DELETED; -} - -/* CB passed to kvstoreExpand. - * The purpose is to skip expansion of unused dicts in cluster mode (all - * dicts not mapped to *my* slots) */ -static int dbExpandSkipSlot(int slot) { - return !clusterNodeCoversSlot(getMyClusterNode(), slot); -} - -/* - * This functions increases size of the main/expires db to match desired number. - * In cluster mode resizes all individual dictionaries for slots that this node owns. - * - * Based on the parameter `try_expand`, appropriate dict expand API is invoked. - * if try_expand is set to 1, `dictTryExpand` is used else `dictExpand`. - * The return code is either `DICT_OK`/`DICT_ERR` for both the API(s). - * `DICT_OK` response is for successful expansion. However ,`DICT_ERR` response signifies failure in allocation in - * `dictTryExpand` call and in case of `dictExpand` call it signifies no expansion was performed. - */ -static int dbExpandGeneric(kvstore *kvs, uint64_t db_size, int try_expand) { - int ret; - if (server.cluster_enabled) { - /* We don't know exact number of keys that would fall into each slot, but we can - * approximate it, assuming even distribution, divide it by the number of slots. */ - int slots = getMyShardSlotCount(); - if (slots == 0) return C_OK; - db_size = db_size / slots; - ret = kvstoreExpand(kvs, db_size, try_expand, dbExpandSkipSlot); - } else { - ret = kvstoreExpand(kvs, db_size, try_expand, NULL); - } - - return ret? C_OK : C_ERR; -} - -int dbExpand(redisDb *db, uint64_t db_size, int try_expand) { - return dbExpandGeneric(db->keys, db_size, try_expand); -} - -int dbExpandExpires(redisDb *db, uint64_t db_size, int try_expand) { - return dbExpandGeneric(db->expires, db_size, try_expand); -} - -static kvobj *dbFindGeneric(kvstore *kvs, sds key) { - dictEntry *res = kvstoreDictFind(kvs, getKeySlot(key), key); - return (res) ? dictGetKey(res) : NULL; -} - -kvobj *dbFind(redisDb *db, sds key) { - return dbFindGeneric(db->keys, key); -} - -/* Find a KV in the main db. Return also link to it. - * - * plink - If found, set to the link of the key in the dict. - * If not found, set to the bucket where the key should be added. - * If set to NULL, then HT of dict not allocated yet. - */ -kvobj *dbFindByLink(redisDb *db, sds key, dictEntryLink *plink) { - int slot = getKeySlot(key); - dictEntryLink link, bucket; - - link = kvstoreDictFindLink(db->keys, slot, key, &bucket); - if (link == NULL) { - if (plink) *plink = bucket; - return NULL; - } else { - if (plink) *plink = link; - return dictGetKV(*link); - } -} - -kvobj *dbFindExpires(redisDb *db, sds key) { - return dbFindGeneric(db->expires, key); -} - -unsigned long long dbSize(redisDb *db) { - unsigned long long total = kvstoreSize(db->keys); - - if (server.cluster_enabled) { - /* If we are the master and there is no import or trim in progress, - * then we can return the total count. If not, we need to subtract - * the number of keys in slots that are not accessible, as below. */ - if (clusterNodeIsMaster(getMyClusterNode()) && - !asmImportInProgress() && - !asmIsTrimInProgress()) - { - return total; - } - - /* Besides, we don't know the slot migration states on replicas, so we - * need to check each slot to see if it's accessible. */ - for (int i = 0; i < CLUSTER_SLOTS; i++) { - dict *d = kvstoreGetDict(db->keys, i); - if (d && !clusterCanAccessKeysInSlot(i)) { - total -= kvstoreDictSize(db->keys, i); - } - } - } - - return total; -} - -unsigned long long dbScan(redisDb *db, unsigned long long cursor, dictScanFunction *scan_cb, void *privdata) { - return kvstoreScan(db->keys, cursor, -1, scan_cb, scanShouldSkipDict, privdata); -} - -/* ----------------------------------------------------------------------------- - * API to get key arguments from commands - * ---------------------------------------------------------------------------*/ - -/* Prepare the getKeysResult struct to hold numkeys, either by using the - * pre-allocated keysbuf or by allocating a new array on the heap. - * - * This function must be called at least once before starting to populate - * the result, and can be called repeatedly to enlarge the result array. - */ -keyReference *getKeysPrepareResult(getKeysResult *result, int numkeys) { - /* GETKEYS_RESULT_INIT initializes keys to NULL, point it to the pre-allocated stack - * buffer here. */ - if (!result->keys) { - serverAssert(!result->numkeys); - result->keys = result->keysbuf; - } - - /* Resize if necessary */ - if (numkeys > result->size) { - if (result->keys != result->keysbuf) { - /* We're not using a static buffer, just (re)alloc */ - result->keys = zrealloc(result->keys, numkeys * sizeof(keyReference)); - } else { - /* We are using a static buffer, copy its contents */ - result->keys = zmalloc(numkeys * sizeof(keyReference)); - if (result->numkeys) - memcpy(result->keys, result->keysbuf, result->numkeys * sizeof(keyReference)); - } - result->size = numkeys; - } - - return result->keys; -} - -/* Returns a bitmask with all the flags found in any of the key specs of the command. - * The 'inv' argument means we'll return a mask with all flags that are missing in at least one spec. */ -int64_t getAllKeySpecsFlags(struct redisCommand *cmd, int inv) { - int64_t flags = 0; - for (int j = 0; j < cmd->key_specs_num; j++) { - keySpec *spec = cmd->key_specs + j; - flags |= inv? ~spec->flags : spec->flags; - } - return flags; -} - -/* Fetch the keys based of the provided key specs. Returns the number of keys found, or -1 on error. - * There are several flags that can be used to modify how this function finds keys in a command. - * - * GET_KEYSPEC_INCLUDE_NOT_KEYS: Return 'fake' keys as if they were keys. - * GET_KEYSPEC_RETURN_PARTIAL: Skips invalid and incomplete keyspecs but returns the keys - * found in other valid keyspecs. - */ -int getKeysUsingKeySpecs(struct redisCommand *cmd, robj **argv, int argc, int search_flags, getKeysResult *result) { - long j, i, last, first, step; - keyReference *keys; - serverAssert(result->numkeys == 0); /* caller should initialize or reset it */ - - for (j = 0; j < cmd->key_specs_num; j++) { - keySpec *spec = cmd->key_specs + j; - serverAssert(spec->begin_search_type != KSPEC_BS_INVALID); - /* Skip specs that represent 'fake' keys */ - if ((spec->flags & CMD_KEY_NOT_KEY) && !(search_flags & GET_KEYSPEC_INCLUDE_NOT_KEYS)) { - continue; - } - - first = 0; - if (spec->begin_search_type == KSPEC_BS_INDEX) { - first = spec->bs.index.pos; - } else if (spec->begin_search_type == KSPEC_BS_KEYWORD) { - int start_index = spec->bs.keyword.startfrom > 0 ? spec->bs.keyword.startfrom : argc+spec->bs.keyword.startfrom; - int end_index = spec->bs.keyword.startfrom > 0 ? argc-1: 1; - for (i = start_index; i != end_index; i = start_index <= end_index ? i + 1 : i - 1) { - if (i >= argc || i < 1) - break; - if (!strcasecmp((char*)argv[i]->ptr,spec->bs.keyword.keyword)) { - first = i+1; - break; - } - } - /* keyword not found */ - if (!first) { - continue; - } - } else { - /* unknown spec */ - goto invalid_spec; - } - - if (spec->find_keys_type == KSPEC_FK_RANGE) { - step = spec->fk.range.keystep; - if (spec->fk.range.lastkey >= 0) { - last = first + spec->fk.range.lastkey; - } else { - if (!spec->fk.range.limit) { - last = argc + spec->fk.range.lastkey; - } else { - serverAssert(spec->fk.range.lastkey == -1); - last = first + ((argc-first)/spec->fk.range.limit + spec->fk.range.lastkey); - } - } - } else if (spec->find_keys_type == KSPEC_FK_KEYNUM) { - step = spec->fk.keynum.keystep; - long long numkeys; - if (spec->fk.keynum.keynumidx >= argc) - goto invalid_spec; - - sds keynum_str = argv[first + spec->fk.keynum.keynumidx]->ptr; - if (!string2ll(keynum_str,sdslen(keynum_str),&numkeys) || numkeys < 0) { - /* Unable to parse the numkeys argument or it was invalid */ - goto invalid_spec; - } - - first += spec->fk.keynum.firstkey; - last = first + ((long)numkeys - 1) * step; - } else { - /* unknown spec */ - goto invalid_spec; - } - - /* First or last is out of bounds, which indicates a syntax error */ - if (last >= argc || last < first || first >= argc) { - goto invalid_spec; - } - - int count = ((last - first)+1); - keys = getKeysPrepareResult(result, result->numkeys + count); - - for (i = first; i <= last; i += step) { - if (i >= argc || i < first) { - /* Modules commands, and standard commands with a not fixed number - * of arguments (negative arity parameter) do not have dispatch - * time arity checks, so we need to handle the case where the user - * passed an invalid number of arguments here. In this case we - * return no keys and expect the command implementation to report - * an arity or syntax error. */ - if (cmd->flags & CMD_MODULE || cmd->arity < 0) { - continue; - } else { - serverPanic("Redis built-in command declared keys positions not matching the arity requirements."); - } - } - keys[result->numkeys].pos = i; - keys[result->numkeys].flags = spec->flags; - result->numkeys++; - } - - /* Handle incomplete specs (only after we added the current spec - * to `keys`, just in case GET_KEYSPEC_RETURN_PARTIAL was given) */ - if (spec->flags & CMD_KEY_INCOMPLETE) { - goto invalid_spec; - } - - /* Done with this spec */ - continue; - -invalid_spec: - if (search_flags & GET_KEYSPEC_RETURN_PARTIAL) { - continue; - } else { - result->numkeys = 0; - return -1; - } - } - - return result->numkeys; -} - -/* Return all the arguments that are keys in the command passed via argc / argv. - * This function will eventually replace getKeysFromCommand. - * - * The command returns the positions of all the key arguments inside the array, - * so the actual return value is a heap allocated array of integers. The - * length of the array is returned by reference into *numkeys. - * - * Along with the position, this command also returns the flags that are - * associated with how Redis will access the key. - * - * 'cmd' must be point to the corresponding entry into the redisCommand - * table, according to the command name in argv[0]. */ -int getKeysFromCommandWithSpecs(struct redisCommand *cmd, robj **argv, int argc, int search_flags, getKeysResult *result) { - /* The command has at least one key-spec not marked as NOT_KEY */ - int has_keyspec = (getAllKeySpecsFlags(cmd, 1) & CMD_KEY_NOT_KEY); - /* The command has at least one key-spec marked as VARIABLE_FLAGS */ - int has_varflags = (getAllKeySpecsFlags(cmd, 0) & CMD_KEY_VARIABLE_FLAGS); - - /* We prefer key-specs if there are any, and their flags are reliable. */ - if (has_keyspec && !has_varflags) { - int ret = getKeysUsingKeySpecs(cmd,argv,argc,search_flags,result); - if (ret >= 0) - return ret; - /* If the specs returned with an error (probably an INVALID or INCOMPLETE spec), - * fallback to the callback method. */ - } - - /* Resort to getkeys callback methods. */ - if (cmd->flags & CMD_MODULE_GETKEYS) - return moduleGetCommandKeysViaAPI(cmd,argv,argc,result); - - /* We use native getkeys as a last resort, since not all these native getkeys provide - * flags properly (only the ones that correspond to INVALID, INCOMPLETE or VARIABLE_FLAGS do.*/ - if (cmd->getkeys_proc) - return cmd->getkeys_proc(cmd,argv,argc,result); - return 0; -} - -/* This function returns a sanity check if the command may have keys. */ -int doesCommandHaveKeys(struct redisCommand *cmd) { - return cmd->getkeys_proc || /* has getkeys_proc (non modules) */ - (cmd->flags & CMD_MODULE_GETKEYS) || /* module with GETKEYS */ - (getAllKeySpecsFlags(cmd, 1) & CMD_KEY_NOT_KEY); /* has at least one key-spec not marked as NOT_KEY */ -} - -/* A simplified channel spec table that contains all of the redis commands - * and which channels they have and how they are accessed. */ -typedef struct ChannelSpecs { - redisCommandProc *proc; /* Command procedure to match against */ - uint64_t flags; /* CMD_CHANNEL_* flags for this command */ - int start; /* The initial position of the first channel */ - int count; /* The number of channels, or -1 if all remaining - * arguments are channels. */ -} ChannelSpecs; - -ChannelSpecs commands_with_channels[] = { - {subscribeCommand, CMD_CHANNEL_SUBSCRIBE, 1, -1}, - {ssubscribeCommand, CMD_CHANNEL_SUBSCRIBE, 1, -1}, - {unsubscribeCommand, CMD_CHANNEL_UNSUBSCRIBE, 1, -1}, - {sunsubscribeCommand, CMD_CHANNEL_UNSUBSCRIBE, 1, -1}, - {psubscribeCommand, CMD_CHANNEL_PATTERN | CMD_CHANNEL_SUBSCRIBE, 1, -1}, - {punsubscribeCommand, CMD_CHANNEL_PATTERN | CMD_CHANNEL_UNSUBSCRIBE, 1, -1}, - {publishCommand, CMD_CHANNEL_PUBLISH, 1, 1}, - {spublishCommand, CMD_CHANNEL_PUBLISH, 1, 1}, - {NULL,0} /* Terminator. */ -}; - -/* Returns 1 if the command may access any channels matched by the flags - * argument. */ -int doesCommandHaveChannelsWithFlags(struct redisCommand *cmd, int flags) { - /* If a module declares get channels, we are just going to assume - * has channels. This API is allowed to return false positives. */ - if (cmd->flags & CMD_MODULE_GETCHANNELS) { - return 1; - } - for (ChannelSpecs *spec = commands_with_channels; spec->proc != NULL; spec += 1) { - if (cmd->proc == spec->proc) { - return !!(spec->flags & flags); - } - } - return 0; -} - -/* Return all the arguments that are channels in the command passed via argc / argv. - * This function behaves similar to getKeysFromCommandWithSpecs, but with channels - * instead of keys. - * - * The command returns the positions of all the channel arguments inside the array, - * so the actual return value is a heap allocated array of integers. The - * length of the array is returned by reference into *numkeys. - * - * Along with the position, this command also returns the flags that are - * associated with how Redis will access the channel. - * - * 'cmd' must be point to the corresponding entry into the redisCommand - * table, according to the command name in argv[0]. */ -int getChannelsFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { - keyReference *keys; - /* If a module declares get channels, use that. */ - if (cmd->flags & CMD_MODULE_GETCHANNELS) { - return moduleGetCommandChannelsViaAPI(cmd, argv, argc, result); - } - /* Otherwise check the channel spec table */ - for (ChannelSpecs *spec = commands_with_channels; spec != NULL; spec += 1) { - if (cmd->proc == spec->proc) { - int start = spec->start; - int stop = (spec->count == -1) ? argc : start + spec->count; - if (stop > argc) stop = argc; - int count = 0; - keys = getKeysPrepareResult(result, stop - start); - for (int i = start; i < stop; i++ ) { - keys[count].pos = i; - keys[count++].flags = spec->flags; - } - result->numkeys = count; - return count; - } - } - return 0; -} - -/* Extract keys/channels from a command and calculate the cluster slot. - * Returns the number of keys/channels extracted. - * The slot number is returned by reference into *slot. - * If is_incomplete is not NULL, it will be set for key extraction. - * - * This function handles both regular commands (keys) and sharded pubsub - * commands (channels), but excludes regular pubsub commands which don't - * have slots. - */ -int extractKeysAndSlot(struct redisCommand *cmd, robj **argv, int argc, - getKeysResult *result, int *slot) { - int num_keys = -1; - - if (!doesCommandHaveChannelsWithFlags(cmd, CMD_CHANNEL_PUBLISH | CMD_CHANNEL_SUBSCRIBE)) { - num_keys = getKeysFromCommandWithSpecs(cmd, argv, argc, GET_KEYSPEC_DEFAULT, result); - } else { - /* Only extract channels for commands that have key_specs (sharded pubsub). - * Regular pubsub commands (PUBLISH, SUBSCRIBE) don't have slots. */ - if (cmd->key_specs_num > 0) { - num_keys = getChannelsFromCommand(cmd, argv, argc, result); - } else { - num_keys = 0; - } - } - - *slot = extractSlotFromKeysResult(argv, result); - return num_keys; -} - -/* The base case is to use the keys position as given in the command table - * (firstkey, lastkey, step). - * This function works only on command with the legacy_range_key_spec, - * all other commands should be handled by getkeys_proc. - * - * If the commands keyspec is incomplete, no keys will be returned, and the provided - * keys function should be called instead. - * - * NOTE: This function does not guarantee populating the flags for - * the keys, in order to get flags you should use getKeysUsingKeySpecs. */ -int getKeysUsingLegacyRangeSpec(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { - int j, i = 0, last, first, step; - keyReference *keys; - UNUSED(argv); - - if (cmd->legacy_range_key_spec.begin_search_type == KSPEC_BS_INVALID) { - result->numkeys = 0; - return 0; - } - - first = cmd->legacy_range_key_spec.bs.index.pos; - last = cmd->legacy_range_key_spec.fk.range.lastkey; - if (last >= 0) - last += first; - step = cmd->legacy_range_key_spec.fk.range.keystep; - - if (last < 0) last = argc+last; - - int count = ((last - first)+1); - keys = getKeysPrepareResult(result, count); - - for (j = first; j <= last; j += step) { - if (j >= argc || j < first) { - /* Modules commands, and standard commands with a not fixed number - * of arguments (negative arity parameter) do not have dispatch - * time arity checks, so we need to handle the case where the user - * passed an invalid number of arguments here. In this case we - * return no keys and expect the command implementation to report - * an arity or syntax error. */ - if (cmd->flags & CMD_MODULE || cmd->arity < 0) { - result->numkeys = 0; - return 0; - } else { - serverPanic("Redis built-in command declared keys positions not matching the arity requirements."); - } - } - keys[i].pos = j; - /* Flags are omitted from legacy key specs */ - keys[i++].flags = 0; - } - result->numkeys = i; - return i; -} - -/* Return all the arguments that are keys in the command passed via argc / argv. - * - * The command returns the positions of all the key arguments inside the array, - * so the actual return value is a heap allocated array of integers. The - * length of the array is returned by reference into *numkeys. - * - * 'cmd' must be point to the corresponding entry into the redisCommand - * table, according to the command name in argv[0]. - * - * This function uses the command table if a command-specific helper function - * is not required, otherwise it calls the command-specific function. */ -int getKeysFromCommand(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { - if (cmd->flags & CMD_MODULE_GETKEYS) { - return moduleGetCommandKeysViaAPI(cmd,argv,argc,result); - } else if (cmd->getkeys_proc) { - return cmd->getkeys_proc(cmd,argv,argc,result); - } else { - return getKeysUsingLegacyRangeSpec(cmd,argv,argc,result); - } -} - -/* Free the result of getKeysFromCommand. */ -void getKeysFreeResult(getKeysResult *result) { - if (result && result->keys != result->keysbuf) - zfree(result->keys); -} - -/* Helper function to extract keys from following commands: - * COMMAND [destkey] <num-keys> <key> [...] <key> [...] ... <options> - * - * eg: - * ZUNION <num-keys> <key> <key> ... <key> <options> - * ZUNIONSTORE <destkey> <num-keys> <key> <key> ... <key> <options> - * - * 'storeKeyOfs': destkey index, 0 means destkey not exists. - * 'keyCountOfs': num-keys index. - * 'firstKeyOfs': firstkey index. - * 'keyStep': the interval of each key, usually this value is 1. - * - * The commands using this function have a fully defined keyspec, so returning flags isn't needed. */ -int genericGetKeys(int storeKeyOfs, int keyCountOfs, int firstKeyOfs, int keyStep, - robj **argv, int argc, getKeysResult *result) { - int i, num; - keyReference *keys; - - num = atoi(argv[keyCountOfs]->ptr); - /* Sanity check. Don't return any key if the command is going to - * reply with syntax error. (no input keys). */ - if (num < 1 || num > (argc - firstKeyOfs)/keyStep) { - result->numkeys = 0; - return 0; - } - - int numkeys = storeKeyOfs ? num + 1 : num; - keys = getKeysPrepareResult(result, numkeys); - result->numkeys = numkeys; - - /* Add all key positions for argv[firstKeyOfs...n] to keys[] */ - for (i = 0; i < num; i++) { - keys[i].pos = firstKeyOfs+(i*keyStep); - keys[i].flags = 0; - } - - if (storeKeyOfs) { - keys[num].pos = storeKeyOfs; - keys[num].flags = 0; - } - return result->numkeys; -} - -int sintercardGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { - UNUSED(cmd); - return genericGetKeys(0, 1, 2, 1, argv, argc, result); -} - -int zunionInterDiffStoreGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { - UNUSED(cmd); - return genericGetKeys(1, 2, 3, 1, argv, argc, result); -} - -int zunionInterDiffGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { - UNUSED(cmd); - return genericGetKeys(0, 1, 2, 1, argv, argc, result); -} - -int evalGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { - UNUSED(cmd); - return genericGetKeys(0, 2, 3, 1, argv, argc, result); -} - -int functionGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { - UNUSED(cmd); - return genericGetKeys(0, 2, 3, 1, argv, argc, result); -} - -int lmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { - UNUSED(cmd); - return genericGetKeys(0, 1, 2, 1, argv, argc, result); -} - -int blmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { - UNUSED(cmd); - return genericGetKeys(0, 2, 3, 1, argv, argc, result); -} - -int zmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { - UNUSED(cmd); - return genericGetKeys(0, 1, 2, 1, argv, argc, result); -} - -int bzmpopGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { - UNUSED(cmd); - return genericGetKeys(0, 2, 3, 1, argv, argc, result); -} - -/* Helper function to extract keys from the SORT RO command. - * - * SORT <sort-key> - * - * The second argument of SORT is always a key, however an arbitrary number of - * keys may be accessed while doing the sort (the BY and GET args), so the - * key-spec declares incomplete keys which is why we have to provide a concrete - * implementation to fetch the keys. - * - * This command declares incomplete keys, so the flags are correctly set for this function */ -int sortROGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { - keyReference *keys; - UNUSED(cmd); - UNUSED(argv); - UNUSED(argc); - - keys = getKeysPrepareResult(result, 1); - keys[0].pos = 1; /* <sort-key> is always present. */ - keys[0].flags = CMD_KEY_RO | CMD_KEY_ACCESS; - result->numkeys = 1; - return result->numkeys; -} - -/* Helper function to extract keys from the SORT command. - * - * SORT <sort-key> ... STORE <store-key> ... - * - * The first argument of SORT is always a key, however a list of options - * follow in SQL-alike style. Here we parse just the minimum in order to - * correctly identify keys in the "STORE" option. - * - * This command declares incomplete keys, so the flags are correctly set for this function */ -int sortGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { - int i, j, num, found_store = 0; - keyReference *keys; - UNUSED(cmd); - - num = 0; - keys = getKeysPrepareResult(result, 2); /* Alloc 2 places for the worst case. */ - keys[num].pos = 1; /* <sort-key> is always present. */ - keys[num++].flags = CMD_KEY_RO | CMD_KEY_ACCESS; - - /* Search for STORE option. By default we consider options to don't - * have arguments, so if we find an unknown option name we scan the - * next. However there are options with 1 or 2 arguments, so we - * provide a list here in order to skip the right number of args. */ - struct { - char *name; - int skip; - } skiplist[] = { - {"limit", 2}, - {"get", 1}, - {"by", 1}, - {NULL, 0} /* End of elements. */ - }; - - for (i = 2; i < argc; i++) { - for (j = 0; skiplist[j].name != NULL; j++) { - if (!strcasecmp(argv[i]->ptr,skiplist[j].name)) { - i += skiplist[j].skip; - break; - } else if (!strcasecmp(argv[i]->ptr,"store") && i+1 < argc) { - /* Note: we don't increment "num" here and continue the loop - * to be sure to process the *last* "STORE" option if multiple - * ones are provided. This is same behavior as SORT. */ - found_store = 1; - keys[num].pos = i+1; /* <store-key> */ - keys[num].flags = CMD_KEY_OW | CMD_KEY_UPDATE; - break; - } - } - } - result->numkeys = num + found_store; - return result->numkeys; -} - -/* This command declares incomplete keys, so the flags are correctly set for this function */ -int migrateGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { - int i, j, num, first; - keyReference *keys; - UNUSED(cmd); - - /* Assume the obvious form. */ - first = 3; - num = 1; - - /* But check for the extended one with the KEYS option. */ - struct { - char* name; - int skip; - } skip_keywords[] = { - {"copy", 0}, - {"replace", 0}, - {"auth", 1}, - {"auth2", 2}, - {NULL, 0} - }; - if (argc > 6) { - for (i = 6; i < argc; i++) { - if (!strcasecmp(argv[i]->ptr, "keys")) { - if (sdslen(argv[3]->ptr) > 0) { - /* This is a syntax error. So ignore the keys and leave - * the syntax error to be handled by migrateCommand. */ - num = 0; - } else { - first = i + 1; - num = argc - first; - } - break; - } - for (j = 0; skip_keywords[j].name != NULL; j++) { - if (!strcasecmp(argv[i]->ptr, skip_keywords[j].name)) { - i += skip_keywords[j].skip; - break; - } - } - } - } - - keys = getKeysPrepareResult(result, num); - for (i = 0; i < num; i++) { - keys[i].pos = first+i; - keys[i].flags = CMD_KEY_RW | CMD_KEY_ACCESS | CMD_KEY_DELETE; - } - result->numkeys = num; - return num; -} - -/* Helper function to extract keys from following commands: - * GEORADIUS key x y radius unit [WITHDIST] [WITHHASH] [WITHCOORD] [ASC|DESC] - * [COUNT count] [STORE key|STOREDIST key] - * GEORADIUSBYMEMBER key member radius unit ... options ... - * - * This command has a fully defined keyspec, so returning flags isn't needed. */ -int georadiusGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { - int i, num; - keyReference *keys; - UNUSED(cmd); - - /* Check for the presence of the stored key in the command */ - int stored_key = -1; - for (i = 5; i < argc; i++) { - char *arg = argv[i]->ptr; - /* For the case when user specifies both "store" and "storedist" options, the - * second key specified would override the first key. This behavior is kept - * the same as in georadiusCommand method. - */ - if ((!strcasecmp(arg, "store") || !strcasecmp(arg, "storedist")) && ((i+1) < argc)) { - stored_key = i+1; - i++; - } - } - num = 1 + (stored_key == -1 ? 0 : 1); - - /* Keys in the command come from two places: - * argv[1] = key, - * argv[5...n] = stored key if present - */ - keys = getKeysPrepareResult(result, num); - - /* Add all key positions to keys[] */ - keys[0].pos = 1; - keys[0].flags = 0; - if(num > 1) { - keys[1].pos = stored_key; - keys[1].flags = 0; - } - result->numkeys = num; - return num; -} - -/* XREAD [BLOCK <milliseconds>] [COUNT <count>] [GROUP <groupname> <ttl>] - * STREAMS key_1 key_2 ... key_N ID_1 ID_2 ... ID_N - * - * This command has a fully defined keyspec, so returning flags isn't needed. */ -int xreadGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { - int i, num = 0; - keyReference *keys; - UNUSED(cmd); - - /* We need to parse the options of the command in order to seek the first - * "STREAMS" string which is actually the option. This is needed because - * "STREAMS" could also be the name of the consumer group and even the - * name of the stream key. */ - int streams_pos = -1; - for (i = 1; i < argc; i++) { - char *arg = argv[i]->ptr; - if (!strcasecmp(arg, "block")) { - i++; /* Skip option argument. */ - } else if (!strcasecmp(arg, "count")) { - i++; /* Skip option argument. */ - } else if (!strcasecmp(arg, "group")) { - i += 2; /* Skip option argument. */ - } else if (!strcasecmp(arg, "noack")) { - /* Nothing to do. */ - } else if (!strcasecmp(arg, "streams")) { - streams_pos = i; - break; - } else { - break; /* Syntax error. */ - } - } - if (streams_pos != -1) num = argc - streams_pos - 1; - - /* Syntax error. */ - if (streams_pos == -1 || num == 0 || num % 2 != 0) { - result->numkeys = 0; - return 0; - } - num /= 2; /* We have half the keys as there are arguments because - there are also the IDs, one per key. */ - - keys = getKeysPrepareResult(result, num); - for (i = streams_pos+1; i < argc-num; i++) { - keys[i-streams_pos-1].pos = i; - keys[i-streams_pos-1].flags = 0; - } - result->numkeys = num; - return num; -} - -/* Helper function to extract keys from the SET command, which may have - * an RW flag if the GET, IF* arguments are present, OW otherwise. */ -int setGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { - keyReference *keys; - UNUSED(cmd); - - keys = getKeysPrepareResult(result, 1); - keys[0].pos = 1; /* We always know the position */ - result->numkeys = 1; - int actual = CMD_KEY_OW; - int logical = CMD_KEY_UPDATE; - - for (int i = 3; i < argc; i++) { - char *arg = argv[i]->ptr; - if ((arg[0] == 'g' || arg[0] == 'G') && - (arg[1] == 'e' || arg[1] == 'E') && - (arg[2] == 't' || arg[2] == 'T') && arg[3] == '\0') - { - actual = CMD_KEY_RW; - logical |= CMD_KEY_ACCESS; - } else if (!strcasecmp(arg, "ifeq") || !strcasecmp(arg, "ifne") || - !strcasecmp(arg, "ifdeq") || !strcasecmp(arg, "ifdne")) - { - actual = CMD_KEY_RW; - } - } - - keys[0].flags = actual | logical; - - return 1; -} - -/* Helper function to extract keys from the DELEX command, which may have - * an RW flag if the IF* arguments are present, RM otherwise. */ -int delexGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { - keyReference *keys; - UNUSED(cmd); - - keys = getKeysPrepareResult(result, 1); - keys[0].pos = 1; /* We always know the position */ - result->numkeys = 1; - int actual = CMD_KEY_RM; - int logical = CMD_KEY_DELETE; - - for (int i = 2; i < argc; i++) { - char *arg = argv[i]->ptr; - if (!strcasecmp(arg, "ifeq") || !strcasecmp(arg, "ifne") || - !strcasecmp(arg, "ifdeq") || !strcasecmp(arg, "ifdne")) - { - actual = CMD_KEY_RW; - } - } - - keys[0].flags = actual | logical; - - return 1; -} - -/* Helper function to extract keys from the BITFIELD command, which may be - * read-only if the BITFIELD GET subcommand is used. */ -int bitfieldGetKeys(struct redisCommand *cmd, robj **argv, int argc, getKeysResult *result) { - keyReference *keys; - int readonly = 1; - UNUSED(cmd); - - keys = getKeysPrepareResult(result, 1); - keys[0].pos = 1; /* We always know the position */ - result->numkeys = 1; - - for (int i = 2; i < argc; i++) { - int remargs = argc - i - 1; /* Remaining args other than current. */ - char *arg = argv[i]->ptr; - if (!strcasecmp(arg, "get") && remargs >= 2) { - i += 2; - } else if ((!strcasecmp(arg, "set") || !strcasecmp(arg, "incrby")) && remargs >= 3) { - readonly = 0; - i += 3; - break; - } else if (!strcasecmp(arg, "overflow") && remargs >= 1) { - i += 1; - } else { - readonly = 0; /* Syntax error. safer to assume non-RO. */ - break; - } - } - - if (readonly) { - keys[0].flags = CMD_KEY_RO | CMD_KEY_ACCESS; - } else { - keys[0].flags = CMD_KEY_RW | CMD_KEY_ACCESS | CMD_KEY_UPDATE; - } - return 1; -} |
