summaryrefslogtreecommitdiff
path: root/examples/redis-unstable/src/cluster.c
diff options
context:
space:
mode:
authorMitja Felicijan <mitja.felicijan@gmail.com>2026-01-21 22:40:55 +0100
committerMitja Felicijan <mitja.felicijan@gmail.com>2026-01-21 22:40:55 +0100
commit5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda (patch)
tree1acdfa5220cd13b7be43a2a01368e80d306473ca /examples/redis-unstable/src/cluster.c
parentc7ab12bba64d9c20ccd79b132dac475f7bc3923e (diff)
downloadcrep-5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda.tar.gz
Add Redis source code for testing
Diffstat (limited to 'examples/redis-unstable/src/cluster.c')
-rw-r--r--examples/redis-unstable/src/cluster.c2263
1 files changed, 2263 insertions, 0 deletions
diff --git a/examples/redis-unstable/src/cluster.c b/examples/redis-unstable/src/cluster.c
new file mode 100644
index 0000000..d07c31c
--- /dev/null
+++ b/examples/redis-unstable/src/cluster.c
@@ -0,0 +1,2263 @@
+/*
+ * Copyright (c) 2009-Present, Redis Ltd.
+ * All rights reserved.
+ *
+ * Copyright (c) 2024-present, Valkey contributors.
+ * All rights reserved.
+ *
+ * Licensed under your choice of (a) the Redis Source Available License 2.0
+ * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
+ * GNU Affero General Public License v3 (AGPLv3).
+ *
+ * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
+ */
+
+/*
+ * cluster.c contains the common parts of a clustering
+ * implementation, the parts that are shared between
+ * any implementation of Redis clustering.
+ */
+
+#include "server.h"
+#include "cluster.h"
+#include "cluster_asm.h"
+#include "cluster_slot_stats.h"
+
+#include <ctype.h>
+
+/* -----------------------------------------------------------------------------
+ * Key space handling
+ * -------------------------------------------------------------------------- */
+
/* Given a glob-style pattern (as implemented by stringmatchlen() in util.c),
 * return the hash slot that every matching key necessarily belongs to, or -1
 * when the pattern may match keys in more than one slot (or when we cannot
 * tell, e.g. because of escape sequences). */
int patternHashSlot(char *pattern, int length) {
    int brace = -1; /* Index of the first '{'; -2 once an empty '{}' is seen. */

    for (int idx = 0; idx < length; idx++) {
        char ch = pattern[idx];
        if (ch == '*' || ch == '?' || ch == '[') {
            /* Wildcards and character classes may match anything. */
            return -1;
        }
        if (ch == '\\') {
            /* Resolving escapes would require a temporary buffer; this is
             * not implemented, so just report "any slot". */
            return -1;
        }
        if (brace == -1) {
            if (ch == '{') brace = idx; /* Hash tag opens here. */
        } else if (brace >= 0 && ch == '}') {
            if (idx == brace + 1) {
                /* "{}" is an empty tag: hash the whole key, ignore braces. */
                brace = -2;
            } else {
                /* Non-empty "{...}" tag: only its content is hashed. */
                return crc16(pattern + brace + 1, idx - brace - 1) & 0x3FFF;
            }
        }
    }

    /* No wildcard and no usable tag: the pattern names a single key, so the
     * whole pattern is hashed. */
    return crc16(pattern, length) & 0x3FFF;
}
+
+int getSlotOrReply(client *c, robj *o) {
+ long long slot;
+
+ if (getLongLongFromObject(o,&slot) != C_OK ||
+ slot < 0 || slot >= CLUSTER_SLOTS)
+ {
+ addReplyError(c,"Invalid or out of range slot");
+ return -1;
+ }
+ return (int) slot;
+}
+
+ConnectionType *connTypeOfCluster(void) {
+ if (server.tls_cluster) {
+ return connectionTypeTls();
+ }
+
+ return connectionTypeTcp();
+}
+
+/* -----------------------------------------------------------------------------
+ * DUMP, RESTORE and MIGRATE commands
+ * -------------------------------------------------------------------------- */
+
/* Generates a DUMP-format representation of the object 'o', adding it to the
 * io stream pointed by 'rio'. This function can't fail. */
void createDumpPayload(rio *payload, robj *o, robj *key, int dbid, int skip_checksum) {
    unsigned char buf[2];
    uint64_t crc = 0;

    /* Serialize the object in an RDB-like format. It consist of an object type
     * byte followed by the serialized object. This is understood by RESTORE. */
    rioInitWithBuffer(payload,sdsempty());

    /* Save key metadata, if present. Expiry (TTL) is not serialized here:
     * it travels separately as a RESTORE command argument. */
    if (getModuleMetaBits(o->metabits))
        serverAssert(rdbSaveKeyMetadata(payload, key, o, dbid) != -1);
    serverAssert(rdbSaveObjectType(payload,o));
    serverAssert(rdbSaveObject(payload,o,key,dbid));

    /* Write the footer, this is how it looks like:
     * ----------------+---------------------+---------------+
     * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
     * ----------------+---------------------+---------------+
     * RDB version and CRC are both in little endian.
     */

    /* RDB version, little endian. */
    buf[0] = RDB_VERSION & 0xff;
    buf[1] = (RDB_VERSION >> 8) & 0xff;
    payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);

    /* If crc checksum is disabled, crc stays 0 and no checksum validation
     * will be performed on RESTORE (see verifyDumpPayload()). */
    if (!skip_checksum) {
        /* CRC64 over everything emitted so far (payload + version bytes). */
        crc = crc64(0,(unsigned char*)payload->io.buffer.ptr,
                    sdslen(payload->io.buffer.ptr));
        memrev64ifbe(&crc);
    }
    payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
}
+
+/* Verify that the RDB version of the dump payload matches the one of this Redis
+ * instance and that the checksum is ok.
+ * If the DUMP payload looks valid C_OK is returned, otherwise C_ERR
+ * is returned. If rdbver_ptr is not NULL, its populated with the value read
+ * from the input buffer. */
+int verifyDumpPayload(unsigned char *p, size_t len, uint16_t *rdbver_ptr) {
+ unsigned char *footer;
+ uint16_t rdbver;
+ uint64_t crc;
+
+ /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */
+ if (len < 10) return C_ERR;
+ footer = p+(len-10);
+
+ /* Set and verify RDB version. */
+ rdbver = (footer[1] << 8) | footer[0];
+ if (rdbver_ptr) {
+ *rdbver_ptr = rdbver;
+ }
+ if (rdbver > RDB_VERSION) return C_ERR;
+
+ if (server.skip_checksum_validation)
+ return C_OK;
+
+ uint64_t crc_payload;
+ memcpy(&crc_payload, footer+2, 8);
+ if (crc_payload == 0) /* No checksum. */
+ return C_OK;
+
+ /* Verify CRC64 */
+ crc = crc64(0,p,len-8);
+ memrev64ifbe(&crc);
+ return crc == crc_payload ? C_OK : C_ERR;
+}
+
+/* DUMP keyname
+ * DUMP is actually not used by Redis Cluster but it is the obvious
+ * complement of RESTORE and can be useful for different applications. */
+void dumpCommand(client *c) {
+ kvobj *o;
+ rio payload;
+
+ /* Check if the key is here. */
+ if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
+ addReplyNull(c);
+ return;
+ }
+
+ /* Create the DUMP encoded representation. */
+ createDumpPayload(&payload,o,c->argv[1],c->db->id,0);
+
+ /* Transfer to the client */
+ addReplyBulkSds(c,payload.io.buffer.ptr);
+ return;
+}
+
+/* RESTORE key ttl serialized-value [REPLACE] [ABSTTL] [IDLETIME seconds] [FREQ frequency] */
+void restoreCommand(client *c) {
+ long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1;
+ rio payload;
+ int j, type, replace = 0, absttl = 0;
+ robj *obj;
+
+ /* Parse additional options */
+ for (j = 4; j < c->argc; j++) {
+ int additional = c->argc-j-1;
+ if (!strcasecmp(c->argv[j]->ptr,"replace")) {
+ replace = 1;
+ } else if (!strcasecmp(c->argv[j]->ptr,"absttl")) {
+ absttl = 1;
+ } else if (!strcasecmp(c->argv[j]->ptr,"idletime") && additional >= 1 &&
+ lfu_freq == -1)
+ {
+ if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lru_idle,NULL)
+ != C_OK) return;
+ if (lru_idle < 0) {
+ addReplyError(c,"Invalid IDLETIME value, must be >= 0");
+ return;
+ }
+ lru_clock = LRU_CLOCK();
+ j++; /* Consume additional arg. */
+ } else if (!strcasecmp(c->argv[j]->ptr,"freq") && additional >= 1 &&
+ lru_idle == -1)
+ {
+ if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lfu_freq,NULL)
+ != C_OK) return;
+ if (lfu_freq < 0 || lfu_freq > 255) {
+ addReplyError(c,"Invalid FREQ value, must be >= 0 and <= 255");
+ return;
+ }
+ j++; /* Consume additional arg. */
+ } else {
+ addReplyErrorObject(c,shared.syntaxerr);
+ return;
+ }
+ }
+
+ /* Make sure this key does not already exist here... */
+ robj *key = c->argv[1];
+ kvobj *oldval = lookupKeyWrite(c->db,key);
+ int oldtype = oldval ? oldval->type : -1;
+ if (!replace && oldval) {
+ addReplyErrorObject(c,shared.busykeyerr);
+ return;
+ }
+
+ /* Check if the TTL value makes sense */
+ if (getLongLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != C_OK) {
+ return;
+ } else if (ttl < 0) {
+ addReplyError(c,"Invalid TTL value, must be >= 0");
+ return;
+ }
+
+ /* Verify RDB version and data checksum. */
+ if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr),NULL) == C_ERR)
+ {
+ addReplyError(c,"DUMP payload version or checksum are wrong");
+ return;
+ }
+
+ rioInitWithBuffer(&payload,c->argv[3]->ptr);
+
+ /* Initialize metadata spec to collect metadata+expiry from payload. */
+ KeyMetaSpec keymeta;
+ keyMetaSpecInit(&keymeta);
+
+ /* Compute TTL early so we can add it to metadata spec in correct order */
+ if (ttl) {
+ if (!absttl) ttl+=commandTimeSnapshot();
+ keyMetaSpecAdd(&keymeta, KEY_META_ID_EXPIRE, ttl);
+ }
+
+ /* With metadata, type = RDB_OPCODE_KEY_META. Layout: [<META>,]<TYPE>,<KEY>,<VALUE> */
+ type = rdbLoadType(&payload);
+ if (rdbResolveKeyType(&payload, &type, c->db->id, &keymeta) == -1) {
+ addReplyError(c,"Bad data format");
+ return;
+ }
+
+ /* Load the object */
+ if ((obj = rdbLoadObject(type,&payload,key->ptr,c->db->id,NULL)) == NULL)
+ {
+ keyMetaSpecCleanup(&keymeta);
+ addReplyError(c,"Bad data format");
+ return;
+ }
+
+ /* Remove the old key if needed. */
+ int deleted = 0;
+ if (replace)
+ deleted = dbDelete(c->db,key);
+
+ if (ttl && checkAlreadyExpired(ttl)) {
+ if (deleted) {
+ robj *aux = server.lazyfree_lazy_server_del ? shared.unlink : shared.del;
+ rewriteClientCommandVector(c, 2, aux, key);
+ keyModified(c,c->db,key,NULL,1);
+ notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
+ server.dirty++;
+ }
+ keyMetaSpecCleanup(&keymeta);
+ decrRefCount(obj);
+ addReply(c, shared.ok);
+ return;
+ }
+
+ /* Create the key and set the TTL if any */
+ kvobj *kv = dbAddInternal(c->db, key, &obj, NULL, &keymeta);
+
+ /* If minExpiredField was set, then the object is hash with expiration
+ * on fields and need to register it in global HFE DS */
+ if (kv->type == OBJ_HASH) {
+ uint64_t minExpiredField = hashTypeGetMinExpire(kv, 1);
+ if (minExpiredField != EB_EXPIRE_TIME_INVALID)
+ estoreAdd(c->db->subexpires, getKeySlot(key->ptr), kv, minExpiredField);
+ }
+
+ if (ttl) {
+ if (!absttl) {
+ /* Propagate TTL as absolute timestamp */
+ robj *ttl_obj = createStringObjectFromLongLong(ttl);
+ rewriteClientCommandArgument(c,2,ttl_obj);
+ decrRefCount(ttl_obj);
+ rewriteClientCommandArgument(c,c->argc,shared.absttl);
+ }
+ }
+ objectSetLRUOrLFU(kv, lfu_freq, lru_idle, lru_clock, 1000);
+ keyModified(c,c->db,key,NULL,1);
+ notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id);
+
+ /* If we deleted a key that means REPLACE parameter was passed and the
+ * destination key existed. */
+ if (deleted) {
+ notifyKeyspaceEvent(NOTIFY_OVERWRITTEN, "overwritten", key, c->db->id);
+ if (oldtype != kv->type) {
+ notifyKeyspaceEvent(NOTIFY_TYPE_CHANGED, "type_changed", key, c->db->id);
+ }
+ }
+ addReply(c,shared.ok);
+ server.dirty++;
+}
/* MIGRATE socket cache implementation.
 *
 * We take a map between host:ip and a TCP socket that we used to connect
 * to this instance in recent time.
 * This sockets are closed when the max number we cache is reached, and also
 * in serverCron() when they are around for more than a few seconds. */
#define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */
#define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached sockets after 10 sec. */

typedef struct migrateCachedSocket {
    connection *conn;      /* Connection to the target instance. */
    long last_dbid;        /* Last SELECTed db; -1 forces a new SELECT. */
    time_t last_use_time;  /* For TTL expiry in migrateCloseTimedoutSockets(). */
} migrateCachedSocket;
+
/* Return a migrateCachedSocket containing a TCP socket connected with the
 * target instance, possibly returning a cached one.
 *
 * This function is responsible of sending errors to the client if a
 * connection can't be established. In this case NULL is returned.
 * Otherwise on success the socket is returned, and the caller should not
 * attempt to free it after usage.
 *
 * If the caller detects an error while using the socket, migrateCloseSocket()
 * should be called so that the connection will be created from scratch
 * the next time. */
migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long timeout) {
    connection *conn;
    sds name = sdsempty();
    migrateCachedSocket *cs;

    /* Check if we have an already cached socket for this ip:port pair. */
    name = sdscatlen(name,host->ptr,sdslen(host->ptr));
    name = sdscatlen(name,":",1);
    name = sdscatlen(name,port->ptr,sdslen(port->ptr));
    cs = dictFetchValue(server.migrate_cached_sockets,name);
    if (cs) {
        sdsfree(name);
        /* Refresh the last-use time so the TTL sweep does not close it. */
        cs->last_use_time = server.unixtime;
        return cs;
    }

    /* No cached socket, create one. */
    if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
        /* Too many items, drop one at random. */
        dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
        cs = dictGetVal(de);
        connClose(cs->conn);
        zfree(cs);
        dictDelete(server.migrate_cached_sockets,dictGetKey(de));
    }

    /* Create the connection */
    conn = connCreate(server.el, connTypeOfCluster());
    if (connBlockingConnect(conn, host->ptr, atoi(port->ptr), timeout)
        != C_OK) {
        addReplyError(c,"-IOERR error or timeout connecting to the client");
        connClose(conn);
        sdsfree(name);
        return NULL;
    }
    connEnableTcpNoDelay(conn);

    /* Add to the cache and return it to the caller. */
    cs = zmalloc(sizeof(*cs));
    cs->conn = conn;

    /* last_dbid of -1 forces a SELECT before the first RESTORE. */
    cs->last_dbid = -1;
    cs->last_use_time = server.unixtime;
    dictAdd(server.migrate_cached_sockets,name,cs);
    return cs;
}
+
+/* Free a migrate cached connection. */
+void migrateCloseSocket(robj *host, robj *port) {
+ sds name = sdsempty();
+ migrateCachedSocket *cs;
+
+ name = sdscatlen(name,host->ptr,sdslen(host->ptr));
+ name = sdscatlen(name,":",1);
+ name = sdscatlen(name,port->ptr,sdslen(port->ptr));
+ cs = dictFetchValue(server.migrate_cached_sockets,name);
+ if (!cs) {
+ sdsfree(name);
+ return;
+ }
+
+ connClose(cs->conn);
+ zfree(cs);
+ dictDelete(server.migrate_cached_sockets,name);
+ sdsfree(name);
+}
+
/* Close cached MIGRATE sockets that have not been used for more than
 * MIGRATE_SOCKET_CACHE_TTL seconds. Called periodically (see the cache
 * comment above referencing serverCron()). */
void migrateCloseTimedoutSockets(void) {
    dictIterator di;
    dictEntry *de;

    /* A safe iterator is required since entries are deleted while
     * iterating. */
    dictInitSafeIterator(&di, server.migrate_cached_sockets);
    while((de = dictNext(&di)) != NULL) {
        migrateCachedSocket *cs = dictGetVal(de);

        if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) {
            connClose(cs->conn);
            zfree(cs);
            dictDelete(server.migrate_cached_sockets,dictGetKey(de));
        }
    }
    dictResetIterator(&di);
}
+
/* MIGRATE host port key dbid timeout [COPY | REPLACE | AUTH password |
 * AUTH2 username password]
 *
 * Or, in the multiple keys form:
 *
 * MIGRATE host port "" dbid timeout [COPY | REPLACE | AUTH password |
 * AUTH2 username password] KEYS key1 key2 ... keyN */
void migrateCommand(client *c) {
    migrateCachedSocket *cs;
    int copy = 0, replace = 0, j;
    char *username = NULL;
    char *password = NULL;
    long timeout;
    long dbid;
    robj **kvArray = NULL; /* Objects to migrate. */
    robj **keyArray = NULL; /* Key names. */
    robj **newargv = NULL; /* Used to rewrite the command as DEL ... keys ... */
    rio cmd, payload;
    int may_retry = 1;
    int write_error = 0;
    int argv_rewritten = 0;

    /* To support the KEYS option we need the following additional state. */
    int first_key = 3; /* Argument index of the first key. */
    int num_keys = 1; /* By default only migrate the 'key' argument. */

    /* Parse additional options */
    for (j = 6; j < c->argc; j++) {
        int moreargs = (c->argc-1) - j;
        if (!strcasecmp(c->argv[j]->ptr,"copy")) {
            copy = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
            replace = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"auth")) {
            if (!moreargs) {
                addReplyErrorObject(c,shared.syntaxerr);
                return;
            }
            j++;
            password = c->argv[j]->ptr;
            /* Redact the secret from command introspection. */
            redactClientCommandArgument(c,j);
        } else if (!strcasecmp(c->argv[j]->ptr,"auth2")) {
            if (moreargs < 2) {
                addReplyErrorObject(c,shared.syntaxerr);
                return;
            }
            username = c->argv[++j]->ptr;
            redactClientCommandArgument(c,j);
            password = c->argv[++j]->ptr;
            redactClientCommandArgument(c,j);
        } else if (!strcasecmp(c->argv[j]->ptr,"keys")) {
            if (sdslen(c->argv[3]->ptr) != 0) {
                addReplyError(c,
                    "When using MIGRATE KEYS option, the key argument"
                    " must be set to the empty string");
                return;
            }
            first_key = j+1;
            num_keys = c->argc - j - 1;
            break; /* All the remaining args are keys. */
        } else {
            addReplyErrorObject(c,shared.syntaxerr);
            return;
        }
    }

    /* Sanity check */
    if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK ||
        getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK)
    {
        return;
    }
    if (timeout <= 0) timeout = 1000;

    /* Check if the keys are here. If at least one key is to migrate, do it
     * otherwise if all the keys are missing reply with "NOKEY" to signal
     * the caller there was nothing to migrate. We don't return an error in
     * this case, since often this is due to a normal condition like the key
     * expiring in the meantime. */
    kvArray = zrealloc(kvArray,sizeof(kvobj*)*num_keys);
    keyArray = zrealloc(keyArray,sizeof(robj*)*num_keys);
    int num_exists = 0;

    /* Compact existing keys into the front of both arrays. */
    for (j = 0; j < num_keys; j++) {
        if ((kvArray[num_exists] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
            keyArray[num_exists] = c->argv[first_key+j];
            num_exists++;
        }
    }
    num_keys = num_exists;
    if (num_keys == 0) {
        zfree(kvArray); zfree(keyArray);
        addReplySds(c,sdsnew("+NOKEY\r\n"));
        return;
    }

try_again:
    write_error = 0;

    /* Connect */
    cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
    if (cs == NULL) {
        zfree(kvArray); zfree(keyArray);
        return; /* error sent to the client by migrateGetSocket() */
    }

    rioInitWithBuffer(&cmd,sdsempty());

    /* Authentication */
    if (password) {
        int arity = username ? 3 : 2;
        serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',arity));
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4));
        if (username) {
            serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,username,
                                 sdslen(username)));
        }
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password,
            sdslen(password)));
    }

    /* Send the SELECT command if the current DB is not already selected. */
    int select = cs->last_dbid != dbid; /* Should we emit SELECT? */
    if (select) {
        serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
        serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
    }

    int non_expired = 0; /* Number of keys that we'll find non expired.
                            Note that serializing large keys may take some time
                            so certain keys that were found non expired by the
                            lookupKey() function, may be expired later. */

    /* Create RESTORE payload and generate the protocol to call the command. */
    for (j = 0; j < num_keys; j++) {
        long long ttl = 0;
        long long expireat = kvobjGetExpire(kvArray[j]);

        if (expireat != -1) {
            ttl = expireat-commandTimeSnapshot();
            if (ttl < 0) {
                continue; /* Key expired while we were serializing. */
            }
            if (ttl < 1) ttl = 1;
        }

        /* Relocate valid (non expired) keys and values into the array in successive
         * positions to remove holes created by the keys that were present
         * in the first lookup but are now expired after the second lookup. */
        kvArray[non_expired] = kvArray[j];
        keyArray[non_expired++] = keyArray[j];

        serverAssertWithInfo(c,NULL,
            rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));

        if (server.cluster_enabled)
            serverAssertWithInfo(c,NULL,
                rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
        else
            serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
        serverAssertWithInfo(c,NULL,sdsEncodedObject(keyArray[j]));
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,keyArray[j]->ptr,
                sdslen(keyArray[j]->ptr)));
        serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));

        /* Emit the payload argument, that is the serialized object using
         * the DUMP format. */
        createDumpPayload(&payload,kvArray[j],keyArray[j],dbid,0);
        serverAssertWithInfo(c,NULL,
            rioWriteBulkString(&cmd,payload.io.buffer.ptr,
                               sdslen(payload.io.buffer.ptr)));
        sdsfree(payload.io.buffer.ptr);

        /* Add the REPLACE option to the RESTORE command if it was specified
         * as a MIGRATE option. */
        if (replace)
            serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
    }

    /* Fix the actual number of keys we are migrating. */
    num_keys = non_expired;

    /* Transfer the query to the other node in 64K chunks. */
    errno = 0;
    {
        sds buf = cmd.io.buffer.ptr;
        size_t pos = 0, towrite;
        int nwritten = 0;

        while ((towrite = sdslen(buf)-pos) > 0) {
            towrite = (towrite > (64*1024) ? (64*1024) : towrite);
            nwritten = connSyncWrite(cs->conn,buf+pos,towrite,timeout);
            if (nwritten != (signed)towrite) {
                write_error = 1;
                goto socket_err;
            }
            pos += nwritten;
        }
    }

    char buf0[1024]; /* Auth reply. */
    char buf1[1024]; /* Select reply. */
    char buf2[1024]; /* Restore reply. */

    /* Read the AUTH reply if needed. */
    if (password && connSyncReadLine(cs->conn, buf0, sizeof(buf0), timeout) <= 0)
        goto socket_err;

    /* Read the SELECT reply if needed. */
    if (select && connSyncReadLine(cs->conn, buf1, sizeof(buf1), timeout) <= 0)
        goto socket_err;

    /* Read the RESTORE replies. */
    int error_from_target = 0;
    int socket_error = 0;
    int del_idx = 1; /* Index of the key argument for the replicated DEL op. */

    /* Allocate the new argument vector that will replace the current command,
     * to propagate the MIGRATE as a DEL command (if no COPY option was given).
     * We allocate num_keys+1 because the additional argument is for "DEL"
     * command name itself. */
    if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));

    for (j = 0; j < num_keys; j++) {
        if (connSyncReadLine(cs->conn, buf2, sizeof(buf2), timeout) <= 0) {
            socket_error = 1;
            break;
        }
        if ((password && buf0[0] == '-') ||
            (select && buf1[0] == '-') ||
            buf2[0] == '-')
        {
            /* On error assume that last_dbid is no longer valid. */
            if (!error_from_target) {
                cs->last_dbid = -1;
                char *errbuf;
                if (password && buf0[0] == '-') errbuf = buf0;
                else if (select && buf1[0] == '-') errbuf = buf1;
                else errbuf = buf2;

                error_from_target = 1;
                /* Report only the first error to the client; errbuf+1 skips
                 * the leading '-' of the RESP error reply. */
                addReplyErrorFormat(c,"Target instance replied with error: %s",
                    errbuf+1);
            }
        } else {
            if (!copy) {
                /* No COPY option: remove the local key, signal the change. */
                dbDelete(c->db,keyArray[j]);
                keyModified(c,c->db,keyArray[j],NULL,1);
                notifyKeyspaceEvent(NOTIFY_GENERIC,"del",keyArray[j],c->db->id);
                server.dirty++;

                /* Populate the argument vector to replace the old one. */
                newargv[del_idx++] = keyArray[j];
                incrRefCount(keyArray[j]);
            }
        }
    }

    /* On socket error, if we want to retry, do it now before rewriting the
     * command vector. We only retry if we are sure nothing was processed
     * and we failed to read the first reply (j == 0 test). */
    if (!error_from_target && socket_error && j == 0 && may_retry &&
        errno != ETIMEDOUT)
    {
        goto socket_err; /* A retry is guaranteed because of tested conditions.*/
    }

    /* On socket errors, close the migration socket now that we still have
     * the original host/port in the ARGV. Later the original command may be
     * rewritten to DEL and will be too late. */
    if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]);

    if (!copy) {
        /* Translate MIGRATE as DEL for replication/AOF. Note that we do
         * this only for the keys for which we received an acknowledgement
         * from the receiving Redis server, by using the del_idx index. */
        if (del_idx > 1) {
            newargv[0] = createStringObject("DEL",3);
            /* Note that the following call takes ownership of newargv. */
            replaceClientCommandVector(c,del_idx,newargv);
            argv_rewritten = 1;
        } else {
            /* No key transfer acknowledged, no need to rewrite as DEL. */
            zfree(newargv);
        }
        newargv = NULL; /* Make it safe to call zfree() on it in the future. */
    }

    /* If we are here and a socket error happened, we don't want to retry.
     * Just signal the problem to the client, but only do it if we did not
     * already queue a different error reported by the destination server. */
    if (!error_from_target && socket_error) {
        may_retry = 0;
        goto socket_err;
    }

    if (!error_from_target) {
        /* Success! Update the last_dbid in migrateCachedSocket, so that we can
         * avoid SELECT the next time if the target DB is the same. Reply +OK.
         *
         * Note: If we reached this point, even if socket_error is true
         * still the SELECT command succeeded (otherwise the code jumps to
         * socket_err label. */
        cs->last_dbid = dbid;
        addReply(c,shared.ok);
    } else {
        /* On error we already sent it in the for loop above, and set
         * the currently selected socket to -1 to force SELECT the next time. */
    }

    sdsfree(cmd.io.buffer.ptr);
    zfree(kvArray); zfree(keyArray); zfree(newargv);
    return;

/* On socket errors we try to close the cached socket and try again.
 * It is very common for the cached socket to get closed, if just reopening
 * it works it's a shame to notify the error to the caller. */
socket_err:
    /* Cleanup we want to perform in both the retry and no retry case.
     * Note: Closing the migrate socket will also force SELECT next time. */
    sdsfree(cmd.io.buffer.ptr);

    /* If the command was rewritten as DEL and there was a socket error,
     * we already closed the socket earlier. While migrateCloseSocket()
     * is idempotent, the host/port arguments are now gone, so don't do it
     * again. */
    if (!argv_rewritten) migrateCloseSocket(c->argv[1],c->argv[2]);
    zfree(newargv);
    newargv = NULL; /* This will get reallocated on retry. */

    /* Retry only if it's not a timeout and we never attempted a retry
     * (or the code jumping here did not set may_retry to zero). */
    if (errno != ETIMEDOUT && may_retry) {
        may_retry = 0;
        goto try_again;
    }

    /* Cleanup we want to do if no retry is attempted. */
    zfree(kvArray); zfree(keyArray);
    addReplyErrorSds(c, sdscatprintf(sdsempty(),
                                     "-IOERR error or timeout %s to target instance",
                                     write_error ? "writing" : "reading"));
    return;
}
+
+/* Cluster node sanity check. Returns C_OK if the node id
+ * is valid an C_ERR otherwise. */
+int verifyClusterNodeId(const char *name, int length) {
+ if (length != CLUSTER_NAMELEN) return C_ERR;
+ for (int i = 0; i < length; i++) {
+ if (name[i] >= 'a' && name[i] <= 'z') continue;
+ if (name[i] >= '0' && name[i] <= '9') continue;
+ return C_ERR;
+ }
+ return C_OK;
+}
+
/* Return non-zero if 'c' is an acceptable character for a cluster aux
 * field: any alphanumeric character, or any other character that is NOT in
 * the reserved punctuation set below.
 *
 * The argument is cast to unsigned char before calling isalnum(): passing
 * a negative value (possible when a signed plain char holding a byte
 * >= 0x80 is promoted to int) is undefined behavior for the <ctype.h>
 * classification functions (CERT STR37-C). */
int isValidAuxChar(int c) {
    return isalnum((unsigned char)c) || (strchr("!#$%&()*+:;<>?@[]^{|}~", c) == NULL);
}
+
/* Return 1 if the 'length' bytes at 's' are all acceptable aux-field
 * characters (see isValidAuxChar()), 0 otherwise.
 *
 * Each byte is forwarded as unsigned char so that bytes >= 0x80 do not
 * become negative ints when plain char is signed, which would be undefined
 * behavior once they reach the <ctype.h> functions (CERT STR37-C). */
int isValidAuxString(char *s, unsigned int length) {
    for (unsigned i = 0; i < length; i++) {
        if (!isValidAuxChar((unsigned char)s[i])) return 0;
    }
    return 1;
}
+
+void clusterCommandMyId(client *c) {
+ char *name = clusterNodeGetName(getMyClusterNode());
+ if (name) {
+ addReplyBulkCBuffer(c,name, CLUSTER_NAMELEN);
+ } else {
+ addReplyError(c, "No ID yet");
+ }
+}
+
/* Return this node's cluster name as reported by clusterNodeGetName(),
 * or NULL when the node has no name yet (see clusterCommandMyId()). */
char* getMyClusterId(void) {
    return clusterNodeGetName(getMyClusterNode());
}
+
+void clusterCommandMyShardId(client *c) {
+ char *sid = clusterNodeGetShardId(getMyClusterNode());
+ if (sid) {
+ addReplyBulkCBuffer(c,sid, CLUSTER_NAMELEN);
+ } else {
+ addReplyError(c, "No shard ID yet");
+ }
+}
+
+/* When a cluster command is called, we need to decide whether to return TLS info or
+ * non-TLS info by the client's connection type. However if the command is called by
+ * a Lua script or RM_call, there is no connection in the fake client, so we use
+ * server.current_client here to get the real client if available. And if it is not
+ * available (modules may call commands without a real client), we return the default
+ * info, which is determined by server.tls_cluster. */
+static int shouldReturnTlsInfo(void) {
+ if (server.current_client && server.current_client->conn) {
+ return connIsTLS(server.current_client->conn);
+ } else {
+ return server.tls_cluster;
+ }
+}
+
/* Return the number of keys stored by this node in the given hash slot,
 * as counted by the per-slot keyspace dictionary. */
unsigned int countKeysInSlot(unsigned int slot) {
    return kvstoreDictSize(server.db->keys, slot);
}
+
/* Add detailed information of a node to the output buffer of the given client. */
void addNodeDetailsToShardReply(client *c, clusterNode *node) {

    /* Some fields are optional (ports, hostname), so the map length is not
     * known up front: emit a deferred length and count fields as we go. */
    int reply_count = 0;
    char *hostname;
    void *node_replylen = addReplyDeferredLen(c);

    addReplyBulkCString(c, "id");
    addReplyBulkCBuffer(c, clusterNodeGetName(node), CLUSTER_NAMELEN);
    reply_count++;

    /* Plaintext port, only emitted when non-zero. */
    if (clusterNodeTcpPort(node)) {
        addReplyBulkCString(c, "port");
        addReplyLongLong(c, clusterNodeTcpPort(node));
        reply_count++;
    }

    /* TLS port, only emitted when non-zero. */
    if (clusterNodeTlsPort(node)) {
        addReplyBulkCString(c, "tls-port");
        addReplyLongLong(c, clusterNodeTlsPort(node));
        reply_count++;
    }

    addReplyBulkCString(c, "ip");
    addReplyBulkCString(c, clusterNodeIp(node));
    reply_count++;

    addReplyBulkCString(c, "endpoint");
    addReplyBulkCString(c, clusterNodePreferredEndpoint(node));
    reply_count++;

    /* Hostname is optional: only emitted when set and non-empty. */
    hostname = clusterNodeHostname(node);
    if (hostname != NULL && *hostname != '\0') {
        addReplyBulkCString(c, "hostname");
        addReplyBulkCString(c, hostname);
        reply_count++;
    }

    /* For ourselves the replication offset is read live; for other nodes we
     * use the last offset learned over the cluster bus. */
    long long node_offset;
    if (clusterNodeIsMyself(node)) {
        node_offset = clusterNodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset;
    } else {
        node_offset = clusterNodeReplOffset(node);
    }

    addReplyBulkCString(c, "role");
    addReplyBulkCString(c, clusterNodeIsSlave(node) ? "replica" : "master");
    reply_count++;

    addReplyBulkCString(c, "replication-offset");
    addReplyLongLong(c, node_offset);
    reply_count++;

    /* Health: "fail" for failing nodes, "loading" for replicas with a zero
     * offset, "online" otherwise. */
    addReplyBulkCString(c, "health");
    const char *health_msg = NULL;
    if (clusterNodeIsFailing(node)) {
        health_msg = "fail";
    } else if (clusterNodeIsSlave(node) && node_offset == 0) {
        health_msg = "loading";
    } else {
        health_msg = "online";
    }
    addReplyBulkCString(c, health_msg);
    reply_count++;

    /* Resolve the deferred map length now that all fields are emitted. */
    setDeferredMapLen(c, node_replylen, reply_count);
}
+
+static clusterNode *clusterGetMasterFromShard(void *shard_handle) {
+ clusterNode *n = NULL;
+ void *node_it = clusterShardHandleGetNodeIterator(shard_handle);
+ while((n = clusterShardNodeIteratorNext(node_it)) != NULL) {
+ if (!clusterNodeIsFailing(n)) {
+ break;
+ }
+ }
+ clusterShardNodeIteratorFree(node_it);
+ if (!n) return NULL;
+ return clusterNodeGetMaster(n);
+}
+
/* Add the shard reply of a single shard based off the given primary node. */
void addShardReplyForClusterShards(client *c, void *shard_handle) {
    serverAssert(clusterGetShardNodeCount(shard_handle) > 0);
    addReplyMapLen(c, 2);
    addReplyBulkCString(c, "slots");

    /* Use slot_info_pairs from the primary only */
    clusterNode *master_node = clusterGetMasterFromShard(shard_handle);

    if (master_node && clusterNodeHasSlotInfo(master_node)) {
        /* Slot info is a flat list of (start, end) pairs, hence the even
         * count invariant. */
        serverAssert((clusterNodeSlotInfoCount(master_node) % 2) == 0);
        addReplyArrayLen(c, clusterNodeSlotInfoCount(master_node));
        for (int i = 0; i < clusterNodeSlotInfoCount(master_node); i++)
            addReplyLongLong(c, (unsigned long)clusterNodeSlotInfoEntry(master_node, i));
    } else {
        /* If no slot info pair is provided, the node owns no slots */
        addReplyArrayLen(c, 0);
    }

    addReplyBulkCString(c, "nodes");
    addReplyArrayLen(c, clusterGetShardNodeCount(shard_handle));
    void *node_it = clusterShardHandleGetNodeIterator(shard_handle);
    for (clusterNode *n = clusterShardNodeIteratorNext(node_it); n != NULL; n = clusterShardNodeIteratorNext(node_it)) {
        addNodeDetailsToShardReply(c, n);
        /* Release per-node slot info attached by clusterGenNodesSlotsInfo()
         * in clusterCommandShards(). */
        clusterFreeNodesSlotsInfo(n);
    }
    clusterShardNodeIteratorFree(node_it);
}
+
+/* Add to the output buffer of the given client, an array of slot (start, end)
+ * pair owned by the shard, also the primary and set of replica(s) along with
+ * information about each node. */
+void clusterCommandShards(client *c) {
+ addReplyArrayLen(c, clusterGetShardCount());
+ /* This call will add slot_info_pairs to all nodes */
+ clusterGenNodesSlotsInfo(0);
+ dictIterator *shard_it = clusterGetShardIterator();
+ for(void *shard_handle = clusterNextShardHandle(shard_it); shard_handle != NULL; shard_handle = clusterNextShardHandle(shard_it)) {
+ addShardReplyForClusterShards(c, shard_handle);
+ }
+ clusterFreeShardIterator(shard_it);
+}
+
+void clusterCommandHelp(client *c) {
+ const char *help[] = {
+ "COUNTKEYSINSLOT <slot>",
+ " Return the number of keys in <slot>.",
+ "GETKEYSINSLOT <slot> <count>",
+ " Return key names stored by current node in a slot.",
+ "INFO",
+ " Return information about the cluster.",
+ "KEYSLOT <key>",
+ " Return the hash slot for <key>.",
+ "MYID",
+ " Return the node id.",
+ "MYSHARDID",
+ " Return the node's shard id.",
+ "NODES",
+ " Return cluster configuration seen by node. Output format:",
+ " <id> <ip:port@bus-port[,hostname]> <flags> <master> <pings> <pongs> <epoch> <link> <slot> ...",
+ "REPLICAS <node-id>",
+ " Return <node-id> replicas.",
+ "SLOTS",
+ " Return information about slots range mappings. Each range is made of:",
+ " start, end, master and replicas IP addresses, ports and ids",
+ "SLOT-STATS",
+ " Return an array of slot usage statistics for slots assigned to the current node.",
+ "SHARDS",
+ " Return information about slot range mappings and the nodes associated with them.",
+ NULL
+ };
+
+ addExtendedReplyHelp(c, help, clusterCommandExtendedHelp());
+}
+
/* CLUSTER command dispatcher. Parses the subcommand in argv[1] and routes
 * to the matching handler; subcommands not recognized here are forwarded to
 * clusterCommandSpecial() (implementation-specific subcommands), and a
 * syntax error is emitted when nothing matches. */
void clusterCommand(client *c) {
    if (server.cluster_enabled == 0) {
        addReplyError(c,"This instance has cluster support disabled");
        return;
    }

    if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
        clusterCommandHelp(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
        /* CLUSTER NODES */
        /* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
        sds nodes = clusterGenNodesDescription(c, 0, shouldReturnTlsInfo());
        addReplyVerbatim(c,nodes,sdslen(nodes),"txt");
        sdsfree(nodes);
    } else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) {
        /* CLUSTER MYID */
        clusterCommandMyId(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"myshardid") && c->argc == 2) {
        /* CLUSTER MYSHARDID */
        clusterCommandMyShardId(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"slots") && c->argc == 2) {
        /* CLUSTER SLOTS */
        clusterCommandSlots(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"shards") && c->argc == 2) {
        /* CLUSTER SHARDS */
        clusterCommandShards(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
        /* CLUSTER INFO */

        sds info = genClusterInfoString();

        /* Produce the reply protocol. */
        addReplyVerbatim(c,info,sdslen(info),"txt");
        sdsfree(info);
    } else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) {
        /* CLUSTER KEYSLOT <key> */
        sds key = c->argv[2]->ptr;

        addReplyLongLong(c,keyHashSlot(key,sdslen(key)));
    } else if (!strcasecmp(c->argv[1]->ptr,"countkeysinslot") && c->argc == 3) {
        /* CLUSTER COUNTKEYSINSLOT <slot> */
        long long slot;

        if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
            return;
        if (slot < 0 || slot >= CLUSTER_SLOTS) {
            addReplyError(c,"Invalid slot");
            return;
        }

        /* Report zero keys for slots this node cannot access. */
        if (!clusterCanAccessKeysInSlot(slot)) {
            addReplyLongLong(c, 0);
            return;
        }
        addReplyLongLong(c,countKeysInSlot(slot));
    } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
        /* CLUSTER GETKEYSINSLOT <slot> <count> */
        long long maxkeys, slot;

        if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
            return;
        if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL)
            != C_OK)
            return;
        if (slot < 0 || slot >= CLUSTER_SLOTS || maxkeys < 0) {
            addReplyError(c,"Invalid slot or number of keys");
            return;
        }

        /* Report an empty key list for slots this node cannot access. */
        if (!clusterCanAccessKeysInSlot(slot)) {
            addReplyArrayLen(c, 0);
            return;
        }

        /* Reply with at most min(count, keys-in-slot) key names, iterating
         * the per-slot keyspace dict. */
        unsigned int keys_in_slot = countKeysInSlot(slot);
        unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys;
        addReplyArrayLen(c,numkeys);
        kvstoreDictIterator kvs_di;
        dictEntry *de = NULL;
        kvstoreInitDictIterator(&kvs_di, server.db->keys, slot);
        for (unsigned int i = 0; i < numkeys; i++) {
            de = kvstoreDictIteratorNext(&kvs_di);
            serverAssert(de != NULL);
            sds sdskey = kvobjGetKey(dictGetKV(de));
            addReplyBulkCBuffer(c, sdskey, sdslen(sdskey));
        }
        kvstoreResetDictIterator(&kvs_di);
    } else if ((!strcasecmp(c->argv[1]->ptr,"slaves") ||
               !strcasecmp(c->argv[1]->ptr,"replicas")) && c->argc == 3) {
        /* CLUSTER SLAVES <NODE ID> */
        /* CLUSTER REPLICAS <NODE ID> */
        clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
        int j;

        /* Lookup the specified node in our table. */
        if (!n) {
            addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
            return;
        }

        if (clusterNodeIsSlave(n)) {
            addReplyError(c,"The specified node is not a master");
            return;
        }

        /* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
        addReplyArrayLen(c, clusterNodeNumSlaves(n));
        for (j = 0; j < clusterNodeNumSlaves(n); j++) {
            sds ni = clusterGenNodeDescription(c, clusterNodeGetSlave(n, j), shouldReturnTlsInfo());
            addReplyBulkCString(c,ni);
            sdsfree(ni);
        }
    } else if (!strcasecmp(c->argv[1]->ptr, "migration")) {
        /* CLUSTER MIGRATION ... (arity is validated by the handler). */
        clusterMigrationCommand(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"syncslots") && c->argc >= 3) {
        /* CLUSTER SYNCSLOTS ... */
        clusterSyncSlotsCommand(c);
    } else if(!clusterCommandSpecial(c)) {
        /* Neither a generic nor an implementation-specific subcommand. */
        addReplySubcommandSyntaxError(c);
        return;
    }
}
+
+/* Extract slot number from keys in a keys_result structure and return to caller.
+ * Returns:
+ * - The slot number if all keys belong to the same slot
+ * - INVALID_CLUSTER_SLOT if there are no keys or cluster is disabled
+ * - CLUSTER_CROSSSLOT if keys belong to different slots (cross-slot error) */
+int extractSlotFromKeysResult(robj **argv, getKeysResult *keys_result) {
+ if (keys_result->numkeys == 0 || !server.cluster_enabled)
+ return INVALID_CLUSTER_SLOT;
+
+ int first_slot = INVALID_CLUSTER_SLOT;
+ for (int j = 0; j < keys_result->numkeys; j++) {
+ robj *this_key = argv[keys_result->keys[j].pos];
+ int this_slot = (int)keyHashSlot((char*)this_key->ptr, sdslen(this_key->ptr));
+
+ if (first_slot == INVALID_CLUSTER_SLOT)
+ first_slot = this_slot;
+ else if (first_slot != this_slot) {
+ return CLUSTER_CROSSSLOT;
+ }
+ }
+ return first_slot;
+}
+
+/* Return the pointer to the cluster node that is able to serve the command.
+ * For the function to succeed the command should only target either:
+ *
+ * 1) A single key (even multiple times like RPOPLPUSH mylist mylist).
+ * 2) Multiple keys in the same hash slot, while the slot is stable (no
+ * resharding in progress).
+ *
+ * On success the function returns the node that is able to serve the request.
+ * If the node is not 'myself' a redirection must be performed. The kind of
+ * redirection is specified setting the integer passed by reference
+ * 'error_code', which will be set to CLUSTER_REDIR_ASK or
+ * CLUSTER_REDIR_MOVED.
+ *
+ * When the node is 'myself' 'error_code' is set to CLUSTER_REDIR_NONE.
+ *
+ * If the command fails NULL is returned, and the reason of the failure is
+ * provided via 'error_code', which will be set to:
+ *
+ * CLUSTER_REDIR_CROSS_SLOT if the request contains multiple keys that
+ * don't belong to the same hash slot.
+ *
+ * CLUSTER_REDIR_UNSTABLE if the request contains multiple keys
+ * belonging to the same slot, but the slot is not stable (in migration or
+ * importing state, likely because a resharding is in progress).
+ *
+ * CLUSTER_REDIR_DOWN_UNBOUND if the request addresses a slot which is
+ * not bound to any node. In this case the cluster global state should be
+ * already "down" but it is fragile to rely on the update of the global state,
+ * so we also handle it here.
+ *
+ * CLUSTER_REDIR_DOWN_STATE and CLUSTER_REDIR_DOWN_RO_STATE if the cluster is
+ * down but the user attempts to execute a command that addresses one or more keys. */
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot,
                            getKeysResult *keys_result, uint8_t read_error, uint64_t cmd_flags, int *error_code)
{
    clusterNode *myself = getMyClusterNode();
    clusterNode *n = NULL;          /* Node serving the slot of the keys seen so far. */
    robj *firstkey = NULL;          /* First key encountered, NULL while no key seen. */
    int multiple_keys = 0;          /* Set when an importing slot sees >1 distinct key. */
    multiState *ms, _ms;            /* Real MULTI state, or the fake one built below. */
    pendingCommand mc;
    pendingCommand *mcp = &mc;
    int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0,
        existing_keys = 0;
    int pubsubshard_included = 0; /* Flag to indicate if a pubsub shard cmd is included. */

    /* Allow any key to be set if a module disabled cluster redirections. */
    if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
        return myself;

    /* Set error code optimistically for the base case. */
    if (error_code) *error_code = CLUSTER_REDIR_NONE;

    /* Modules can turn off Redis Cluster redirection: this is useful
     * when writing a module that implements a completely different
     * distributed system. */

    /* We handle all the cases as if they were EXEC commands, so we have
     * a common code path for everything */
    if (cmd->proc == execCommand) {
        /* If CLIENT_MULTI flag is not set EXEC is just going to return an
         * error. */
        if (!(c->flags & CLIENT_MULTI)) return myself;
        ms = &c->mstate;
    } else {
        /* In order to have a single codepath create a fake Multi State
         * structure if the client is not in MULTI/EXEC state, this way
         * we have a single codepath below. */
        ms = &_ms;
        _ms.commands = &mcp;
        _ms.count = 1;

        /* Properly initialize the fake pendingCommand */
        initPendingCommand(&mc);
        mc.argv = argv;
        mc.argc = argc;
        mc.cmd = cmd;
        /* Carry over a pre-computed slot / keys result from the caller, if
         * available, so key extraction below can be skipped. */
        mc.slot = hashslot ? *hashslot : INVALID_CLUSTER_SLOT;
        mc.read_error = read_error;
        if (keys_result) {
            mc.keys_result = *keys_result;
            mc.flags |= PENDING_CMD_KEYS_RESULT_VALID;
        }
    }

    /* Check that all the keys are in the same hash slot, and obtain this
     * slot and the node associated. */
    for (i = 0; i < ms->count; i++) {
        struct redisCommand *mcmd;
        robj **margv;
        int margc, j;
        keyReference *keyindex;

        pendingCommand *pcmd = ms->commands[i];

        mcmd = pcmd->cmd;
        margc = pcmd->argc;
        margv = pcmd->argv;

        /* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */
        if (!pubsubshard_included &&
            doesCommandHaveChannelsWithFlags(mcmd, CMD_CHANNEL_PUBLISH | CMD_CHANNEL_SUBSCRIBE))
        {
            pubsubshard_included = 1;
        }

        /* If we have a cached keys result from preprocessCommand(), use it.
         * Otherwise, extract keys result. */
        int use_cache_keys_result = pcmd->flags & PENDING_CMD_KEYS_RESULT_VALID;
        getKeysResult result = GETKEYS_RESULT_INIT;
        if (use_cache_keys_result)
            result = pcmd->keys_result;
        else
            getKeysFromCommand(mcmd,margv,margc,&result);
        keyindex = result.keys;

        for (j = 0; j < result.numkeys; j++) {
            /* The command has keys and was checked for cross-slot between its keys in preprocessCommand() */
            if (pcmd->read_error == CLIENT_READ_CROSS_SLOT) {
                /* Error: multiple keys from different slots. */
                if (error_code)
                    *error_code = CLUSTER_REDIR_CROSS_SLOT;
                return NULL;
            }

            robj *thiskey = margv[keyindex[j].pos];
            /* Prefer the pre-computed slot; hash the key only if absent. */
            int thisslot = pcmd->slot;
            if (thisslot == INVALID_CLUSTER_SLOT)
                thisslot = keyHashSlot((char*)thiskey->ptr, sdslen(thiskey->ptr));

            if (firstkey == NULL) {
                /* This is the first key we see. Check what is the slot
                 * and node. */
                firstkey = thiskey;
                slot = thisslot;
                n = getNodeBySlot(slot);

                /* Error: If a slot is not served, we are in "cluster down"
                 * state. However the state is yet to be updated, so this was
                 * not trapped earlier in processCommand(). Report the same
                 * error to the client. */
                if (n == NULL) {
                    if (!use_cache_keys_result) getKeysFreeResult(&result);
                    if (error_code)
                        *error_code = CLUSTER_REDIR_DOWN_UNBOUND;
                    return NULL;
                }

                /* If we are migrating or importing this slot, we need to check
                 * if we have all the keys in the request (the only way we
                 * can safely serve the request, otherwise we return a TRYAGAIN
                 * error). To do so we set the importing/migrating state and
                 * increment a counter for every missing key. */
                if (n == myself &&
                    getMigratingSlotDest(slot) != NULL)
                {
                    migrating_slot = 1;
                } else if (getImportingSlotSource(slot) != NULL) {
                    importing_slot = 1;
                }
            } else {
                /* If it is not the first key/channel, make sure it is exactly
                 * the same key/channel as the first we saw. */
                if (slot != thisslot) {
                    /* Error: multiple keys from different slots. */
                    if (!use_cache_keys_result) getKeysFreeResult(&result);
                    if (error_code)
                        *error_code = CLUSTER_REDIR_CROSS_SLOT;
                    return NULL;
                }
                if (importing_slot && !multiple_keys && !equalStringObjects(firstkey,thiskey)) {
                    /* Flag this request as one with multiple different
                     * keys/channels when the slot is in importing state. */
                    multiple_keys = 1;
                }
            }

            /* Migrating / Importing slot? Count keys we don't have.
             * If it is pubsubshard command, it isn't required to check
             * the channel being present or not in the node during the
             * slot migration, the channel will be served from the source
             * node until the migration completes with CLUSTER SETSLOT <slot>
             * NODE <node-id>. */
            int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE;
            if ((migrating_slot || importing_slot) && !pubsubshard_included)
            {
                if (lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL) missing_keys++;
                else existing_keys++;
            }
        }
        if (!use_cache_keys_result) getKeysFreeResult(&result);
    }

    /* No key at all in command? then we can serve the request
     * without redirections or errors in all the cases. */
    if (n == NULL) return myself;

    /* Cluster is globally down but we got keys? We only serve the request
     * if it is a read command and when allow_reads_when_down is enabled. */
    if (!isClusterHealthy()) {
        if (pubsubshard_included) {
            if (!server.cluster_allow_pubsubshard_when_down) {
                if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
                return NULL;
            }
        } else if (!server.cluster_allow_reads_when_down) {
            /* The cluster is configured to block commands when the
             * cluster is down. */
            if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
            return NULL;
        } else if (cmd_flags & CMD_WRITE) {
            /* The cluster is configured to allow read only commands */
            if (error_code) *error_code = CLUSTER_REDIR_DOWN_RO_STATE;
            return NULL;
        } else {
            /* Fall through and allow the command to be executed:
             * this happens when server.cluster_allow_reads_when_down is
             * true and the command is not a write command */
        }
    }

    /* Return the hashslot by reference. */
    if (hashslot) *hashslot = slot;

    /* MIGRATE always works in the context of the local node if the slot
     * is open (migrating or importing state). We need to be able to freely
     * move keys among instances in this case. */
    if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
        return myself;

    /* If we don't have all the keys and we are migrating the slot, send
     * an ASK redirection or TRYAGAIN. */
    if (migrating_slot && missing_keys) {
        /* If we have keys but we don't have all keys, we return TRYAGAIN */
        if (existing_keys) {
            if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
            return NULL;
        } else {
            if (error_code) *error_code = CLUSTER_REDIR_ASK;
            return getMigratingSlotDest(slot);
        }
    }

    /* If we are receiving the slot, and the client correctly flagged the
     * request as "ASKING", we can serve the request. However if the request
     * involves multiple keys and we don't have them all, the only option is
     * to send a TRYAGAIN error. */
    if (importing_slot &&
        (c->flags & CLIENT_ASKING || cmd_flags & CMD_ASKING))
    {
        if (multiple_keys && missing_keys) {
            if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
            return NULL;
        } else {
            return myself;
        }
    }

    /* Handle the read-only client case reading from a slave: if this
     * node is a slave and the request is about a hash slot our master
     * is serving, we can reply without redirection. */
    int is_write_command = (cmd_flags & CMD_WRITE) ||
                           (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
    if (((c->flags & CLIENT_READONLY) || pubsubshard_included) &&
        !is_write_command &&
        clusterNodeIsSlave(myself) &&
        clusterNodeGetSlaveof(myself) == n)
    {
        return myself;
    }

    /* Base case: just return the right node. However, if this node is not
     * myself, set error_code to MOVED since we need to issue a redirection. */
    if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
    return n;
}
+
+/* Send the client the right redirection code, according to error_code
+ * that should be set to one of CLUSTER_REDIR_* macros.
+ *
+ * If CLUSTER_REDIR_ASK or CLUSTER_REDIR_MOVED error codes
+ * are used, then the node 'n' should not be NULL, but should be the
+ * node we want to mention in the redirection. Moreover hashslot should
+ * be set to the hash slot that caused the redirection. */
+void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
+ if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
+ addReplyError(c,"-CROSSSLOT Keys in request don't hash to the same slot");
+ } else if (error_code == CLUSTER_REDIR_UNSTABLE) {
+ /* The request spawns multiple keys in the same slot,
+ * but the slot is not "stable" currently as there is
+ * a migration or import in progress. */
+ addReplyError(c,"-TRYAGAIN Multiple keys request during rehashing of slot");
+ } else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
+ addReplyError(c,"-CLUSTERDOWN The cluster is down");
+ } else if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) {
+ addReplyError(c,"-CLUSTERDOWN The cluster is down and only accepts read commands");
+ } else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) {
+ addReplyError(c,"-CLUSTERDOWN Hash slot not served");
+ } else if (error_code == CLUSTER_REDIR_MOVED ||
+ error_code == CLUSTER_REDIR_ASK)
+ {
+ /* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
+ int port = clusterNodeClientPort(n, shouldReturnTlsInfo());
+ addReplyErrorSds(c,sdscatprintf(sdsempty(),
+ "-%s %d %s:%d",
+ (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
+ hashslot, clusterNodePreferredEndpoint(n), port));
+ } else {
+ serverPanic("getNodeByQuery() unknown error.");
+ }
+}
+
+/* This function is called by the function processing clients incrementally
+ * to detect timeouts, in order to handle the following case:
+ *
+ * 1) A client blocks with BLPOP or similar blocking operation.
+ * 2) The master migrates the hash slot elsewhere or turns into a slave.
+ * 3) The client may remain blocked forever (or up to the max timeout time)
+ * waiting for a key change that will never happen.
+ *
+ * If the client is found to be blocked into a hash slot this node no
+ * longer handles, the client is sent a redirection error, and the function
+ * returns 1. Otherwise 0 is returned and no operation is performed. */
int clusterRedirectBlockedClientIfNeeded(client *c) {
    clusterNode *myself = getMyClusterNode();
    /* Only key-based blocking types can go stale on a slot move; other
     * blocked clients (e.g. WAIT) are left alone. */
    if (c->flags & CLIENT_BLOCKED &&
        (c->bstate.btype == BLOCKED_LIST ||
         c->bstate.btype == BLOCKED_ZSET ||
         c->bstate.btype == BLOCKED_STREAM ||
         c->bstate.btype == BLOCKED_MODULE))
    {
        dictEntry *de;
        dictIterator di;

        /* If the cluster is down, unblock the client with the right error.
         * If the cluster is configured to allow reads on cluster down, we
         * still want to emit this error since a write will be required
         * to unblock them which may never come. */
        if (!isClusterHealthy()) {
            clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
            return 1;
        }

        /* If the client is blocked on module, but not on a specific key,
         * don't unblock it (except for the CLUSTER_FAIL case above). */
        if (c->bstate.btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c))
            return 0;

        /* All keys must belong to the same slot, so check first key only. */
        dictInitIterator(&di, c->bstate.keys);
        if ((de = dictNext(&di)) != NULL) {
            robj *key = dictGetKey(de);
            int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr));
            clusterNode *node = getNodeBySlot(slot);

            /* if the client is read-only and attempting to access key that our
             * replica can handle, allow it. */
            if ((c->flags & CLIENT_READONLY) &&
                !(c->lastcmd->flags & CMD_WRITE) &&
                clusterNodeIsSlave(myself) && clusterNodeGetSlaveof(myself) == node)
            {
                node = myself;
            }

            /* We send an error and unblock the client if:
             * 1) The slot is unassigned, emitting a cluster down error.
             * 2) The slot is not handled by this node, nor being imported. */
            if (node != myself && getImportingSlotSource(slot) == NULL)
            {
                if (node == NULL) {
                    clusterRedirectClient(c,NULL,0,
                                          CLUSTER_REDIR_DOWN_UNBOUND);
                } else {
                    clusterRedirectClient(c,node,slot,
                                          CLUSTER_REDIR_MOVED);
                }
                /* Release the iterator before returning. */
                dictResetIterator(&di);
                return 1;
            }
        }
        dictResetIterator(&di);
    }
    return 0;
}
+
+/* Returns an indication if the replica node is fully available
+ * and should be listed in CLUSTER SLOTS response.
+ * Returns 1 for available nodes, 0 for nodes that have
+ * not finished their initial sync, in failed state, or are
+ * otherwise considered not available to serve read commands. */
+static int isReplicaAvailable(clusterNode *node) {
+ if (clusterNodeIsFailing(node)) {
+ return 0;
+ }
+ long long repl_offset = clusterNodeReplOffset(node);
+ if (clusterNodeIsMyself(node)) {
+ /* Nodes do not update their own information
+ * in the cluster node list. */
+ repl_offset = replicationGetSlaveOffset();
+ }
+ return (repl_offset != 0);
+}
+
/* Emit the node description used by CLUSTER SLOTS: a 4-element array of
 * [preferred-endpoint, port, node-id, metadata-map]. The endpoint field
 * depends on cluster_preferred_endpoint_type; the metadata map carries the
 * networking information not already used as the preferred endpoint. */
void addNodeToNodeReply(client *c, clusterNode *node) {
    char* hostname = clusterNodeHostname(node);
    addReplyArrayLen(c, 4);
    if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_IP) {
        addReplyBulkCString(c, clusterNodeIp(node));
    } else if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_HOSTNAME) {
        /* "?" is the placeholder when the node has no hostname announced. */
        if (hostname != NULL && hostname[0] != '\0') {
            addReplyBulkCString(c, hostname);
        } else {
            addReplyBulkCString(c, "?");
        }
    } else if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_UNKNOWN_ENDPOINT) {
        addReplyNull(c);
    } else {
        serverPanic("Unrecognized preferred endpoint type");
    }

    /* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
    addReplyLongLong(c, clusterNodeClientPort(node, shouldReturnTlsInfo()));
    addReplyBulkCBuffer(c, clusterNodeGetName(node), CLUSTER_NAMELEN);

    /* Add the additional endpoint information, this is all the known networking information
     * that is not the preferred endpoint. Note the logic is evaluated twice so we can
     * correctly report the number of additional network arguments without using a deferred
     * map, an assertion is made at the end to check we set the right length. */
    int length = 0;
    if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_IP) {
        length++;
    }
    if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_HOSTNAME
        && hostname != NULL && hostname[0] != '\0')
    {
        length++;
    }
    addReplyMapLen(c, length);

    /* Second pass: emit the entries counted above. The conditions here must
     * mirror the counting pass exactly, or the final assertion fires. */
    if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_IP) {
        addReplyBulkCString(c, "ip");
        addReplyBulkCString(c, clusterNodeIp(node));
        length--;
    }
    if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_HOSTNAME
        && hostname != NULL && hostname[0] != '\0')
    {
        addReplyBulkCString(c, "hostname");
        addReplyBulkCString(c, hostname);
        length--;
    }
    serverAssert(length == 0);
}
+
+void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, int end_slot) {
+ int i, nested_elements = 3; /* slots (2) + master addr (1) */
+ for (i = 0; i < clusterNodeNumSlaves(node); i++) {
+ if (!isReplicaAvailable(clusterNodeGetSlave(node, i))) continue;
+ nested_elements++;
+ }
+ addReplyArrayLen(c, nested_elements);
+ addReplyLongLong(c, start_slot);
+ addReplyLongLong(c, end_slot);
+ addNodeToNodeReply(c, node);
+
+ /* Remaining nodes in reply are replicas for slot range */
+ for (i = 0; i < clusterNodeNumSlaves(node); i++) {
+ /* This loop is copy/pasted from clusterGenNodeDescription()
+ * with modifications for per-slot node aggregation. */
+ if (!isReplicaAvailable(clusterNodeGetSlave(node, i))) continue;
+ addNodeToNodeReply(c, clusterNodeGetSlave(node, i));
+ nested_elements--;
+ }
+ serverAssert(nested_elements == 3); /* Original 3 elements */
+}
+
/* CLUSTER SLOTS: emit one entry per contiguous slot range with the same
 * owner. Format: 1) 1) start slot
 *                   2) end slot
 *                   3) 1) master IP
 *                      2) master port
 *                      3) node ID
 *                   4) 1) replica IP
 *                      2) replica port
 *                      3) node ID
 *                   ... continued until done
 */
void clusterCommandSlots(client * c) {
    clusterNode *n = NULL;          /* Owner of the range currently being scanned. */
    int num_masters = 0, start = -1;
    void *slot_replylen = addReplyDeferredLen(c);

    /* Iterate one past the last slot: i == CLUSTER_SLOTS acts as a sentinel
     * that flushes the final open range. */
    for (int i = 0; i <= CLUSTER_SLOTS; i++) {
        /* Find start node and slot id. */
        if (n == NULL) {
            if (i == CLUSTER_SLOTS) break;
            n = getNodeBySlot(i);
            start = i;
            continue;
        }

        /* Add cluster slots info when occur different node with start
         * or end of slot. */
        if (i == CLUSTER_SLOTS || n != getNodeBySlot(i)) {
            addNodeReplyForClusterSlot(c, n, start, i-1);
            num_masters++;
            if (i == CLUSTER_SLOTS) break;
            /* Begin a new range at the current slot. */
            n = getNodeBySlot(i);
            start = i;
        }
    }
    setDeferredArrayLen(c, slot_replylen, num_masters);
}
+
+/* -----------------------------------------------------------------------------
+ * Cluster functions related to serving / redirecting clients
+ * -------------------------------------------------------------------------- */
+
+/* The ASKING command is required after a -ASK redirection.
+ * The client should issue ASKING before to actually send the command to
+ * the target instance. See the Redis Cluster specification for more
+ * information. */
+void askingCommand(client *c) {
+ if (server.cluster_enabled == 0) {
+ addReplyError(c,"This instance has cluster support disabled");
+ return;
+ }
+ c->flags |= CLIENT_ASKING;
+ addReply(c,shared.ok);
+}
+
+/* The READONLY command is used by clients to enter the read-only mode.
+ * In this mode slaves will not redirect clients as long as clients access
+ * with read-only commands to keys that are served by the slave's master. */
+void readonlyCommand(client *c) {
+ if (server.cluster_enabled == 0) {
+ addReplyError(c,"This instance has cluster support disabled");
+ return;
+ }
+ c->flags |= CLIENT_READONLY;
+ addReply(c,shared.ok);
+}
+
+/* Remove all the keys in the specified hash slot.
+ * The number of removed items is returned. */
/* Remove all the keys in the specified hash slot.
 * The number of removed items is returned.
 *
 * 'by_command' selects the propagation strategy: when set, the deletion was
 * triggered by a command (trimslots) that is itself propagated, so only a
 * keyspace event is emitted; when clear, a DEL is propagated explicitly and
 * only modules are notified (the keys were moved, not logically deleted). */
unsigned int clusterDelKeysInSlot(unsigned int hashslot, int by_command) {
    unsigned int j = 0;

    /* Fast path: nothing stored under this slot. */
    if (!kvstoreDictSize(server.db->keys, (int) hashslot))
        return 0;

    /* Safe iterator: entries are deleted while iterating. */
    kvstoreDictIterator kvs_di;
    dictEntry *de = NULL;
    kvstoreInitDictSafeIterator(&kvs_di, server.db->keys, (int) hashslot);
    while((de = kvstoreDictIteratorNext(&kvs_di)) != NULL) {
        enterExecutionUnit(1, 0);
        sds sdskey = kvobjGetKey(dictGetKV(de));
        /* Copy the key: the dict entry is about to be freed by dbDelete(). */
        robj *key = createStringObject(sdskey, sdslen(sdskey));
        dbDelete(&server.db[0], key);

        keyModified(NULL, &server.db[0], key, NULL, 1);
        if (by_command) {
            /* Keys are deleted by a command (trimslots), we need to notify the
             * keyspace event. Though, we don't need to propagate the DEL
             * command, as the command (trimslots) will be propagated. */
            notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id);
        } else {
            /* Propagate the DEL command */
            propagateDeletion(&server.db[0], key, server.lazyfree_lazy_server_del);
            /* The keys are not actually logically deleted from the database,
             * just moved to another node. The modules needs to know that these
             * keys are no longer available locally, so just send the keyspace
             * notification to the modules, but not to clients. */
            moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id);
        }
        exitExecutionUnit();
        postExecutionUnitOperations();
        decrRefCount(key);
        j++;
        server.dirty++;
    }
    kvstoreResetDictIterator(&kvs_di);
    return j;
}
+
+/* Delete the keys in the slot ranges. Returns the number of deleted items */
+unsigned int clusterDelKeysInSlotRangeArray(slotRangeArray *slots, int by_command) {
+ unsigned int j = 0;
+ for (int i = 0; i < slots->num_ranges; i++) {
+ for (int slot = slots->ranges[i].start; slot <= slots->ranges[i].end; slot++) {
+ j += clusterDelKeysInSlot(slot, by_command);
+ }
+ }
+ return j;
+}
+
+int clusterIsMySlot(int slot) {
+ return getMyClusterNode() == getNodeBySlot(slot);
+}
+
+void replySlotsFlushAndFree(client *c, slotRangeArray *slots) {
+ addReplyArrayLen(c, slots->num_ranges);
+ for (int i = 0 ; i < slots->num_ranges ; i++) {
+ addReplyArrayLen(c, 2);
+ addReplyLongLong(c, slots->ranges[i].start);
+ addReplyLongLong(c, slots->ranges[i].end);
+ }
+ slotRangeArrayFree(slots);
+}
+
+/* Normalizes (sorts and merges adjacent ranges), checks that slot ranges are
+ * well-formed and non-overlapping. */
+int slotRangeArrayNormalizeAndValidate(slotRangeArray *slots, sds *err) {
+ unsigned char used_slots[CLUSTER_SLOTS] = {0};
+
+ if (slots->num_ranges <= 0 || slots->num_ranges >= CLUSTER_SLOTS) {
+ *err = sdscatprintf(sdsempty(), "invalid number of slot ranges: %d", slots->num_ranges);
+ return C_ERR;
+ }
+
+ /* Sort and merge adjacent slot ranges. */
+ slotRangeArraySortAndMerge(slots);
+
+ for (int i = 0; i < slots->num_ranges; i++) {
+ if (slots->ranges[i].start >= CLUSTER_SLOTS ||
+ slots->ranges[i].end >= CLUSTER_SLOTS)
+ {
+ *err = sdscatprintf(sdsempty(), "slot range is out of range: %d-%d",
+ slots->ranges[i].start, slots->ranges[i].end);
+ return C_ERR;
+ }
+
+ if (slots->ranges[i].start > slots->ranges[i].end) {
+ *err = sdscatprintf(sdsempty(), "start slot number %d is greater than end slot number %d",
+ slots->ranges[i].start, slots->ranges[i].end);
+ return C_ERR;
+ }
+
+ for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) {
+ if (used_slots[j]) {
+ *err = sdscatprintf(sdsempty(), "Slot %d specified multiple times", j);
+ return C_ERR;
+ }
+ used_slots[j]++;
+ }
+ }
+ return C_OK;
+}
+
+/* Create a slot range array with the specified number of ranges. */
+slotRangeArray *slotRangeArrayCreate(int num_ranges) {
+ slotRangeArray *slots = zcalloc(sizeof(slotRangeArray) + num_ranges * sizeof(slotRange));
+ slots->num_ranges = num_ranges;
+ return slots;
+}
+
+/* Duplicate the slot range array. */
+slotRangeArray *slotRangeArrayDup(slotRangeArray *slots) {
+ slotRangeArray *dup = slotRangeArrayCreate(slots->num_ranges);
+ memcpy(dup->ranges, slots->ranges, sizeof(slotRange) * slots->num_ranges);
+ return dup;
+}
+
+/* Set the slot range at the specified index. */
+void slotRangeArraySet(slotRangeArray *slots, int idx, int start, int end) {
+ slots->ranges[idx].start = start;
+ slots->ranges[idx].end = end;
+}
+
+/* Create a slot range string in the format of: "1000-2000 3000-4000 ..." */
+sds slotRangeArrayToString(slotRangeArray *slots) {
+ sds s = sdsempty();
+ if (slots == NULL || slots->num_ranges == 0) return s;
+
+ for (int i = 0; i < slots->num_ranges; i++) {
+ slotRange *sr = &slots->ranges[i];
+ s = sdscatprintf(s, "%d-%d ", sr->start, sr->end);
+ }
+ sdssetlen(s, sdslen(s) - 1);
+ s[sdslen(s)] = '\0';
+
+ return s;
+}
+
+/* Parse a slot range string in the format "1000-2000 3000-4000 ..." into a slotRangeArray.
+ * Returns a new slotRangeArray on success, NULL on failure. */
+slotRangeArray *slotRangeArrayFromString(sds data) {
+ int num_ranges;
+ long long start, end;
+ slotRangeArray *slots = NULL;
+ if (!data || sdslen(data) == 0) return NULL;
+
+ sds *parts = sdssplitlen(data, sdslen(data), " ", 1, &num_ranges);
+ if (num_ranges <= 0) goto err;
+
+ slots = slotRangeArrayCreate(num_ranges);
+
+ /* Parse each slot range */
+ for (int i = 0; i < num_ranges; i++) {
+ char *dash = strchr(parts[i], '-');
+ if (!dash) goto err;
+
+ if (string2ll(parts[i], dash - parts[i], &start) == 0 ||
+ string2ll(dash + 1, sdslen(parts[i]) - (dash - parts[i]) - 1, &end) == 0)
+ goto err;
+ slotRangeArraySet(slots, i, start, end);
+ }
+
+ /* Validate all ranges */
+ sds err_msg = NULL;
+ if (slotRangeArrayNormalizeAndValidate(slots, &err_msg) != C_OK) {
+ if (err_msg) sdsfree(err_msg);
+ goto err;
+ }
+ sdsfreesplitres(parts, num_ranges);
+ return slots;
+
+err:
+ if (slots) slotRangeArrayFree(slots);
+ sdsfreesplitres(parts, num_ranges);
+ return NULL;
+}
+
+static int compareSlotRange(const void *a, const void *b) {
+ const slotRange *sa = a;
+ const slotRange *sb = b;
+ if (sa->start < sb->start) return -1;
+ if (sa->start > sb->start) return 1;
+ return 0;
+}
+
+/* Sort slot ranges by start slot and merge adjacent ranges.
+ * Adjacent means: prev.end + 1 == next.start.
+ * e.g. 1000-2000 2001-3000 0-100 => 0-100 1000-3000
+ *
+ * Note: Overlapping ranges are not merged.*/
+void slotRangeArraySortAndMerge(slotRangeArray *slots) {
+ if (!slots || slots->num_ranges <= 1) return;
+
+ qsort(slots->ranges, slots->num_ranges, sizeof(slotRange), compareSlotRange);
+
+ int idx = 0;
+ for (int i = 1; i < slots->num_ranges; i++) {
+ if (slots->ranges[idx].end + 1 == slots->ranges[i].start)
+ slots->ranges[idx].end = slots->ranges[i].end;
+ else
+ slots->ranges[++idx] = slots->ranges[i];
+ }
+ slots->num_ranges = idx + 1;
+}
+
+/* Compare two slot range arrays, return 1 if equal, 0 otherwise */
+int slotRangeArrayIsEqual(slotRangeArray *slots1, slotRangeArray *slots2) {
+ slotRangeArraySortAndMerge(slots1);
+ slotRangeArraySortAndMerge(slots2);
+
+ if (slots1->num_ranges != slots2->num_ranges) return 0;
+
+ for (int i = 0; i < slots1->num_ranges; i++) {
+ if (slots1->ranges[i].start != slots2->ranges[i].start ||
+ slots1->ranges[i].end != slots2->ranges[i].end) {
+ return 0;
+ }
+ }
+ return 1;
+}
+
+/* Add a slot to the slot range array.
+ * Usage:
+ * slotRangeArray *slots = NULL
+ * slots = slotRangeArrayAppend(slots, 1000);
+ * slots = slotRangeArrayAppend(slots, 1001);
+ * slots = slotRangeArrayAppend(slots, 1003);
+ * slots = slotRangeArrayAppend(slots, 1004);
+ * slots = slotRangeArrayAppend(slots, 1005);
+ *
+ * Result: 1000-1001, 1003-1005
+ * Note: `slot` must be greater than the previous slot.
+ * */
+slotRangeArray *slotRangeArrayAppend(slotRangeArray *slots, int slot) {
+ if (slots == NULL) {
+ slots = slotRangeArrayCreate(4);
+ slots->ranges[0].start = slot;
+ slots->ranges[0].end = slot;
+ slots->num_ranges = 1;
+ return slots;
+ }
+
+ serverAssert(slots->num_ranges >= 0 && slots->num_ranges <= CLUSTER_SLOTS);
+ serverAssert(slot > slots->ranges[slots->num_ranges - 1].end);
+
+ /* Check if we can extend the last range */
+ slotRange *last = &slots->ranges[slots->num_ranges - 1];
+ if (slot == last->end + 1) {
+ last->end = slot;
+ return slots;
+ }
+
+ /* Calculate current capacity and reallocate if needed */
+ int cap = (int) ((zmalloc_size(slots) - sizeof(slotRangeArray)) / sizeof(slotRange));
+ if (slots->num_ranges >= cap)
+ slots = zrealloc(slots, sizeof(slotRangeArray) + sizeof(slotRange) * cap * 2);
+
+ /* Add new single-slot range */
+ slots->ranges[slots->num_ranges].start = slot;
+ slots->ranges[slots->num_ranges].end = slot;
+ slots->num_ranges++;
+
+ return slots;
+}
+
+/* Returns 1 if the slot range array contains the given slot, 0 otherwise. */
+int slotRangeArrayContains(slotRangeArray *slots, unsigned int slot) {
+ for (int i = 0; i < slots->num_ranges; i++)
+ if (slots->ranges[i].start <= slot && slots->ranges[i].end >= slot)
+ return 1;
+ return 0;
+}
+
+/* Free the slot range array. */
+void slotRangeArrayFree(slotRangeArray *slots) {
+ zfree(slots);
+}
+
+/* Generic version of slotRangeArrayFree(). */
+void slotRangeArrayFreeGeneric(void *slots) {
+ slotRangeArrayFree(slots);
+}
+
+/* Slot range array iterator */
+slotRangeArrayIter *slotRangeArrayGetIterator(slotRangeArray *slots) {
+ slotRangeArrayIter *it = zmalloc(sizeof(*it));
+ it->slots = slots;
+ it->range_index = 0;
+ it->cur_slot = slots->num_ranges > 0 ? slots->ranges[0].start : -1;
+ return it;
+}
+
+/* Returns the next slot in the array, or -1 if there are no more slots. */
+int slotRangeArrayNext(slotRangeArrayIter *it) {
+ if (it->range_index >= it->slots->num_ranges) return -1;
+
+ if (it->cur_slot < it->slots->ranges[it->range_index].end) {
+ it->cur_slot++;
+ } else {
+ it->range_index++;
+ if (it->range_index < it->slots->num_ranges)
+ it->cur_slot = it->slots->ranges[it->range_index].start;
+ else
+ it->cur_slot = -1; /* finished */
+ }
+ return it->cur_slot;
+}
+
+int slotRangeArrayGetCurrentSlot(slotRangeArrayIter *it) {
+ return it->cur_slot;
+}
+
+void slotRangeArrayIteratorFree(slotRangeArrayIter *it) {
+ zfree(it);
+}
+
+/* Parse slot range pairs from argv starting at `pos`.
+ * `argc` is the argument count, `pos` is the first slot argument index.
+ * Returns a slotRangeArray or NULL on error. */
+slotRangeArray *parseSlotRangesOrReply(client *c, int argc, int pos) {
+ int start, end, count;
+ slotRangeArray *slots;
+
+ /* Ensure there is at least one (start,end) slot range pairs. */
+ if (argc < 0 || pos < 0 || pos >= argc || (argc - pos) < 2 || ((argc - pos) % 2) != 0) {
+ addReplyErrorArity(c);
+ return NULL;
+ }
+
+ count = (argc - pos) / 2;
+ slots = slotRangeArrayCreate(count);
+ slots->num_ranges = 0;
+
+ for (int j = pos; j < argc; j += 2) {
+ if ((start = getSlotOrReply(c, c->argv[j])) == -1 ||
+ (end = getSlotOrReply(c, c->argv[j + 1])) == -1)
+ {
+ slotRangeArrayFree(slots);
+ return NULL;
+ }
+ slotRangeArraySet(slots, slots->num_ranges, start, end);
+ slots->num_ranges++;
+ }
+
+ sds err = NULL;
+ if (slotRangeArrayNormalizeAndValidate(slots, &err) != C_OK) {
+ addReplyErrorSds(c, err);
+ slotRangeArrayFree(slots);
+ return NULL;
+ }
+ return slots;
+}
+
+/* Return 1 if the keys in the slot can be accessed, 0 otherwise. */
+int clusterCanAccessKeysInSlot(int slot) {
+ /* If not in cluster mode, all keys are accessible */
+ if (server.cluster_enabled == 0) return 1;
+
+ /* If the slot is being imported under old slot migration approach, we should
+ * allow to list keys from the slot as previously. */
+ if (getImportingSlotSource(slot)) return 1;
+
+ /* If using atomic slot migration, check if the slot belongs to the current
+ * node or its master, return 1 if so. */
+ clusterNode *myself = getMyClusterNode();
+ if (clusterNodeIsSlave(myself)) {
+ clusterNode *master = clusterNodeGetMaster(myself);
+ if (master && clusterNodeCoversSlot(master, slot))
+ return 1;
+ } else {
+ if (clusterNodeCoversSlot(myself, slot))
+ return 1;
+ }
+ return 0;
+}
+
+/* Return the slot ranges that belong to the current node or its master. */
+slotRangeArray *clusterGetLocalSlotRanges(void) {
+ slotRangeArray *slots = NULL;
+
+ if (!server.cluster_enabled) {
+ slots = slotRangeArrayCreate(1);
+ slotRangeArraySet(slots, 0, 0, CLUSTER_SLOTS - 1);
+ return slots;
+ }
+
+ clusterNode *master = clusterNodeGetMaster(getMyClusterNode());
+ if (master) {
+ for (int i = 0; i < CLUSTER_SLOTS; i++) {
+ if (clusterNodeCoversSlot(master, i))
+ slots = slotRangeArrayAppend(slots, i);
+ }
+ }
+ return slots ? slots : slotRangeArrayCreate(0);
+}
+
+/* Partially flush destination DB in a cluster node, based on the slot range.
+ *
+ * Usage: SFLUSH <start-slot> <end slot> [<start-slot> <end slot>]* [SYNC|ASYNC]
+ *
+ * This is an initial implementation of SFLUSH (slots flush) which is limited to
+ * flushing a single shard as a whole, but in the future the same command may be
+ * used to partially flush a shard based on hash slots. Currently only if provided
+ * slots cover entirely the slots of a node, the node will be flushed and the
+ * return value will be pairs of slot ranges. Otherwise, a single empty set will
+ * be returned. If possible, SFLUSH SYNC will be run as blocking ASYNC as an
+ * optimization.
+ */
void sflushCommand(client *c) {
    int flags = EMPTYDB_NO_FLAGS, argc = c->argc;

    if (server.cluster_enabled == 0) {
        addReplyError(c,"This instance has cluster support disabled");
        return;
    }

    /* check if last argument is SYNC or ASYNC. When one of them is present,
     * argc is decremented so the slot-range parser below only sees the
     * <start> <end> pairs. With neither modifier, the lazyfree-lazy-user-flush
     * config decides between sync and async. */
    if (!strcasecmp(c->argv[c->argc-1]->ptr,"sync")) {
        flags = EMPTYDB_NO_FLAGS;
        argc--;
    } else if (!strcasecmp(c->argv[c->argc-1]->ptr,"async")) {
        flags = EMPTYDB_ASYNC;
        argc--;
    } else if (server.lazyfree_lazy_user_flush) {
        flags = EMPTYDB_ASYNC;
    }

    /* parse the slot range: after the command name (argv[0]) an even number
     * of arguments must remain, i.e. complete <start> <end> pairs, so the
     * total count (command included) must be odd. */
    if (argc % 2 == 0) {
        addReplyErrorArity(c);
        return;
    }

    /* Parse slot ranges from the command arguments. On error a reply has
     * already been sent to the client. */
    slotRangeArray *slots = parseSlotRangesOrReply(c, argc, 1);
    if (!slots) return;

    /* Iterate and find the slot ranges that belong to this node. Save them in
     * a new slotRangeArray. It is allocated on heap since there is a chance
     * that FLUSH SYNC will be running as blocking ASYNC and only later reply
     * with slot ranges */
    unsigned char slots_to_flush[CLUSTER_SLOTS] = {0}; /* Requested slots to flush */
    slotRangeArray *myslots = NULL;
    for (int i = 0; i < slots->num_ranges; i++) {
        for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) {
            if (clusterIsMySlot(j)) {
                myslots = slotRangeArrayAppend(myslots, j);
                slots_to_flush[j] = 1;
            }
        }
    }

    /* Verify that all slots of mynode got covered. See sflushCommand() comment.
     * If any owned slot was not requested, the command degrades to a no-op
     * and replies with an empty array. */
    int all_slots_covered = 1;
    for (int i = 0; i < CLUSTER_SLOTS; i++) {
        if (clusterIsMySlot(i) && !slots_to_flush[i]) {
            all_slots_covered = 0;
            break;
        }
    }
    if (myslots == NULL || !all_slots_covered) {
        addReplyArrayLen(c, 0);
        slotRangeArrayFree(slots);
        slotRangeArrayFree(myslots); /* Safe: free of NULL is a no-op. */
        return;
    }
    slotRangeArrayFree(slots);

    /* Flush selected slots. If not flush as blocking async, then reply
     * immediately; otherwise the reply (and the free of myslots) happens
     * later, when the blocking flush completes. */
    if (flushCommandCommon(c, FLUSH_TYPE_SLOTS, flags, myslots) == 0)
        replySlotsFlushAndFree(c, myslots);
}
+
+/* The READWRITE command just clears the READONLY command state. */
+void readwriteCommand(client *c) {
+ if (server.cluster_enabled == 0) {
+ addReplyError(c,"This instance has cluster support disabled");
+ return;
+ }
+ c->flags &= ~CLIENT_READONLY;
+ addReply(c,shared.ok);
+}
+
+/* Resets transient cluster stats that we expose via INFO or other means that we want
+ * to reset via CONFIG RESETSTAT. The function is also used in order to
+ * initialize these fields in clusterInit() at server startup. */
+void resetClusterStats(void) {
+ if (!server.cluster_enabled) return;
+
+ clusterSlotStatResetAll();
+}
+
/* This function is called at server startup in order to initialize cluster data
 * structures that are shared between the different cluster implementations. */
void clusterCommonInit(void) {
    resetClusterStats(); /* Zero the transient cluster stats (slot stats). */
    asmInit(); /* Initialize the cluster_asm module state — presumably
                * atomic slot migration; see cluster_asm.h. */
}
+
+/* This function is called after the node startup in order to check if there
+ * are any slots that we have keys for, but are not assigned to us. If so,
+ * we delete the keys. */
+void clusterDeleteKeysInUnownedSlots(void) {
+ if (clusterNodeIsSlave(getMyClusterNode())) return;
+
+ /* Check that all the slots we have keys for are assigned to us. Otherwise,
+ * delete the keys. */
+ for (int i = 0; i < CLUSTER_SLOTS; i++) {
+ /* Skip if: no keys in the slot, it's our slot, or we are importing it. */
+ if (!countKeysInSlot(i) ||
+ clusterIsMySlot(i) ||
+ getImportingSlotSource(i))
+ {
+ continue;
+ }
+
+ serverLog(LL_NOTICE, "I have keys for slot %d, but the slot is "
+ "assigned to another node. "
+ "Deleting keys in the slot.", i);
+ /* With atomic slot migration, it is safe to drop keys from slots
+ * that are not owned. This will not result in data loss under the
+ * legacy slot migration approach either, since the importing state
+ * has already been persisted in node.conf. */
+ clusterDelKeysInSlot(i, 0);
+ }
+}
+
+
+/* This function is called after the node startup in order to verify that data
+ * loaded from disk is in agreement with the cluster configuration:
+ *
+ * 1) If we find keys about hash slots we have no responsibility for, the
+ * following happens:
+ * A) If no other node is in charge according to the current cluster
+ * configuration, we add these slots to our node.
+ * B) If according to our config other nodes are already in charge for
+ * this slots, we set the slots as IMPORTING from our point of view
+ * in order to justify we have those slots, and in order to make
+ * redis-cli aware of the issue, so that it can try to fix it.
+ * 2) If we find data in a DB different than DB0 we return C_ERR to
+ * signal the caller it should quit the server with an error message
+ * or take other actions.
+ *
+ * The function always returns C_OK even if it will try to correct
+ * the error described in "1". However if data is found in DB different
+ * from DB0, C_ERR is returned.
+ *
+ * The function also uses the logging facility in order to warn the user
+ * about desynchronizations between the data we have in memory and the
+ * cluster configuration. */
+int verifyClusterConfigWithData(void) {
+ /* Return ASAP if a module disabled cluster redirections. In that case
+ * every master can store keys about every possible hash slot. */
+ if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
+ return C_OK;
+
+ /* If this node is a slave, don't perform the check at all as we
+ * completely depend on the replication stream. */
+ if (clusterNodeIsSlave(getMyClusterNode())) return C_OK;
+
+ /* Make sure we only have keys in DB0. */
+ for (int i = 1; i < server.dbnum; i++) {
+ if (kvstoreSize(server.db[i].keys)) return C_ERR;
+ }
+
+ /* Take over slots that we have keys for, but are assigned to no one. */
+ clusterClaimUnassignedSlots();
+ /* Delete keys in unowned slots */
+ clusterDeleteKeysInUnownedSlots();
+ return C_OK;
+}