diff options
| author | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:52:54 +0100 |
|---|---|---|
| committer | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:52:54 +0100 |
| commit | dcacc00e3750300617ba6e16eb346713f91a783a (patch) | |
| tree | 38e2d4fb5ed9d119711d4295c6eda4b014af73fd /examples/redis-unstable/src/cluster.c | |
| parent | 58dac10aeb8f5a041c46bddbeaf4c7966a99b998 (diff) | |
| download | crep-dcacc00e3750300617ba6e16eb346713f91a783a.tar.gz | |
Remove testing data
Diffstat (limited to 'examples/redis-unstable/src/cluster.c')
| -rw-r--r-- | examples/redis-unstable/src/cluster.c | 2263 |
1 files changed, 0 insertions, 2263 deletions
diff --git a/examples/redis-unstable/src/cluster.c b/examples/redis-unstable/src/cluster.c deleted file mode 100644 index d07c31c..0000000 --- a/examples/redis-unstable/src/cluster.c +++ /dev/null | |||
| @@ -1,2263 +0,0 @@ | |||
| 1 | /* | ||
| 2 | * Copyright (c) 2009-Present, Redis Ltd. | ||
| 3 | * All rights reserved. | ||
| 4 | * | ||
| 5 | * Copyright (c) 2024-present, Valkey contributors. | ||
| 6 | * All rights reserved. | ||
| 7 | * | ||
| 8 | * Licensed under your choice of (a) the Redis Source Available License 2.0 | ||
| 9 | * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the | ||
| 10 | * GNU Affero General Public License v3 (AGPLv3). | ||
| 11 | * | ||
| 12 | * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. | ||
| 13 | */ | ||
| 14 | |||
| 15 | /* | ||
| 16 | * cluster.c contains the common parts of a clustering | ||
| 17 | * implementation, the parts that are shared between | ||
| 18 | * any implementation of Redis clustering. | ||
| 19 | */ | ||
| 20 | |||
| 21 | #include "server.h" | ||
| 22 | #include "cluster.h" | ||
| 23 | #include "cluster_asm.h" | ||
| 24 | #include "cluster_slot_stats.h" | ||
| 25 | |||
| 26 | #include <ctype.h> | ||
| 27 | |||
| 28 | /* ----------------------------------------------------------------------------- | ||
| 29 | * Key space handling | ||
| 30 | * -------------------------------------------------------------------------- */ | ||
| 31 | |||
| 32 | /* If it can be inferred that the given glob-style pattern, as implemented in | ||
| 33 | * stringmatchlen() in util.c, only can match keys belonging to a single slot, | ||
| 34 | * that slot is returned. Otherwise -1 is returned. */ | ||
| 35 | int patternHashSlot(char *pattern, int length) { | ||
| 36 | int s = -1; /* index of the first '{' */ | ||
| 37 | |||
| 38 | for (int i = 0; i < length; i++) { | ||
| 39 | if (pattern[i] == '*' || pattern[i] == '?' || pattern[i] == '[') { | ||
| 40 | /* Wildcard or character class found. Keys can be in any slot. */ | ||
| 41 | return -1; | ||
| 42 | } else if (pattern[i] == '\\') { | ||
| 43 | /* Escaped character. Computing slot in this case is not | ||
| 44 | * implemented. We would need a temp buffer. */ | ||
| 45 | return -1; | ||
| 46 | } else if (s == -1 && pattern[i] == '{') { | ||
| 47 | /* Opening brace '{' found. */ | ||
| 48 | s = i; | ||
| 49 | } else if (s >= 0 && pattern[i] == '}' && i == s + 1) { | ||
| 50 | /* Empty tag '{}' found. The whole key is hashed. Ignore braces. */ | ||
| 51 | s = -2; | ||
| 52 | } else if (s >= 0 && pattern[i] == '}') { | ||
| 53 | /* Non-empty tag '{...}' found. Hash what's between braces. */ | ||
| 54 | return crc16(pattern + s + 1, i - s - 1) & 0x3FFF; | ||
| 55 | } | ||
| 56 | } | ||
| 57 | |||
| 58 | /* The pattern matches a single key. Hash the whole pattern. */ | ||
| 59 | return crc16(pattern, length) & 0x3FFF; | ||
| 60 | } | ||
| 61 | |||
| 62 | int getSlotOrReply(client *c, robj *o) { | ||
| 63 | long long slot; | ||
| 64 | |||
| 65 | if (getLongLongFromObject(o,&slot) != C_OK || | ||
| 66 | slot < 0 || slot >= CLUSTER_SLOTS) | ||
| 67 | { | ||
| 68 | addReplyError(c,"Invalid or out of range slot"); | ||
| 69 | return -1; | ||
| 70 | } | ||
| 71 | return (int) slot; | ||
| 72 | } | ||
| 73 | |||
| 74 | ConnectionType *connTypeOfCluster(void) { | ||
| 75 | if (server.tls_cluster) { | ||
| 76 | return connectionTypeTls(); | ||
| 77 | } | ||
| 78 | |||
| 79 | return connectionTypeTcp(); | ||
| 80 | } | ||
| 81 | |||
| 82 | /* ----------------------------------------------------------------------------- | ||
| 83 | * DUMP, RESTORE and MIGRATE commands | ||
| 84 | * -------------------------------------------------------------------------- */ | ||
| 85 | |||
| 86 | /* Generates a DUMP-format representation of the object 'o', adding it to the | ||
| 87 | * io stream pointed by 'rio'. This function can't fail. */ | ||
| 88 | void createDumpPayload(rio *payload, robj *o, robj *key, int dbid, int skip_checksum) { | ||
| 89 | unsigned char buf[2]; | ||
| 90 | uint64_t crc = 0; | ||
| 91 | |||
| 92 | /* Serialize the object in an RDB-like format. It consist of an object type | ||
| 93 | * byte followed by the serialized object. This is understood by RESTORE. */ | ||
| 94 | rioInitWithBuffer(payload,sdsempty()); | ||
| 95 | |||
| 96 | /* Save key metadata if present without (handles TTL separately via command args) */ | ||
| 97 | if (getModuleMetaBits(o->metabits)) | ||
| 98 | serverAssert(rdbSaveKeyMetadata(payload, key, o, dbid) != -1); | ||
| 99 | serverAssert(rdbSaveObjectType(payload,o)); | ||
| 100 | serverAssert(rdbSaveObject(payload,o,key,dbid)); | ||
| 101 | |||
| 102 | /* Write the footer, this is how it looks like: | ||
| 103 | * ----------------+---------------------+---------------+ | ||
| 104 | * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 | | ||
| 105 | * ----------------+---------------------+---------------+ | ||
| 106 | * RDB version and CRC are both in little endian. | ||
| 107 | */ | ||
| 108 | |||
| 109 | /* RDB version */ | ||
| 110 | buf[0] = RDB_VERSION & 0xff; | ||
| 111 | buf[1] = (RDB_VERSION >> 8) & 0xff; | ||
| 112 | payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2); | ||
| 113 | |||
| 114 | /* If crc checksum is disabled, crc is set to 0 and no checksum validation | ||
| 115 | * will be performed on RESTORE. */ | ||
| 116 | if (!skip_checksum) { | ||
| 117 | /* CRC64 */ | ||
| 118 | crc = crc64(0,(unsigned char*)payload->io.buffer.ptr, | ||
| 119 | sdslen(payload->io.buffer.ptr)); | ||
| 120 | memrev64ifbe(&crc); | ||
| 121 | } | ||
| 122 | payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8); | ||
| 123 | } | ||
| 124 | |||
| 125 | /* Verify that the RDB version of the dump payload matches the one of this Redis | ||
| 126 | * instance and that the checksum is ok. | ||
| 127 | * If the DUMP payload looks valid C_OK is returned, otherwise C_ERR | ||
| 128 | * is returned. If rdbver_ptr is not NULL, its populated with the value read | ||
| 129 | * from the input buffer. */ | ||
| 130 | int verifyDumpPayload(unsigned char *p, size_t len, uint16_t *rdbver_ptr) { | ||
| 131 | unsigned char *footer; | ||
| 132 | uint16_t rdbver; | ||
| 133 | uint64_t crc; | ||
| 134 | |||
| 135 | /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */ | ||
| 136 | if (len < 10) return C_ERR; | ||
| 137 | footer = p+(len-10); | ||
| 138 | |||
| 139 | /* Set and verify RDB version. */ | ||
| 140 | rdbver = (footer[1] << 8) | footer[0]; | ||
| 141 | if (rdbver_ptr) { | ||
| 142 | *rdbver_ptr = rdbver; | ||
| 143 | } | ||
| 144 | if (rdbver > RDB_VERSION) return C_ERR; | ||
| 145 | |||
| 146 | if (server.skip_checksum_validation) | ||
| 147 | return C_OK; | ||
| 148 | |||
| 149 | uint64_t crc_payload; | ||
| 150 | memcpy(&crc_payload, footer+2, 8); | ||
| 151 | if (crc_payload == 0) /* No checksum. */ | ||
| 152 | return C_OK; | ||
| 153 | |||
| 154 | /* Verify CRC64 */ | ||
| 155 | crc = crc64(0,p,len-8); | ||
| 156 | memrev64ifbe(&crc); | ||
| 157 | return crc == crc_payload ? C_OK : C_ERR; | ||
| 158 | } | ||
| 159 | |||
| 160 | /* DUMP keyname | ||
| 161 | * DUMP is actually not used by Redis Cluster but it is the obvious | ||
| 162 | * complement of RESTORE and can be useful for different applications. */ | ||
| 163 | void dumpCommand(client *c) { | ||
| 164 | kvobj *o; | ||
| 165 | rio payload; | ||
| 166 | |||
| 167 | /* Check if the key is here. */ | ||
| 168 | if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) { | ||
| 169 | addReplyNull(c); | ||
| 170 | return; | ||
| 171 | } | ||
| 172 | |||
| 173 | /* Create the DUMP encoded representation. */ | ||
| 174 | createDumpPayload(&payload,o,c->argv[1],c->db->id,0); | ||
| 175 | |||
| 176 | /* Transfer to the client */ | ||
| 177 | addReplyBulkSds(c,payload.io.buffer.ptr); | ||
| 178 | return; | ||
| 179 | } | ||
| 180 | |||
| 181 | /* RESTORE key ttl serialized-value [REPLACE] [ABSTTL] [IDLETIME seconds] [FREQ frequency] */ | ||
| 182 | void restoreCommand(client *c) { | ||
| 183 | long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1; | ||
| 184 | rio payload; | ||
| 185 | int j, type, replace = 0, absttl = 0; | ||
| 186 | robj *obj; | ||
| 187 | |||
| 188 | /* Parse additional options */ | ||
| 189 | for (j = 4; j < c->argc; j++) { | ||
| 190 | int additional = c->argc-j-1; | ||
| 191 | if (!strcasecmp(c->argv[j]->ptr,"replace")) { | ||
| 192 | replace = 1; | ||
| 193 | } else if (!strcasecmp(c->argv[j]->ptr,"absttl")) { | ||
| 194 | absttl = 1; | ||
| 195 | } else if (!strcasecmp(c->argv[j]->ptr,"idletime") && additional >= 1 && | ||
| 196 | lfu_freq == -1) | ||
| 197 | { | ||
| 198 | if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lru_idle,NULL) | ||
| 199 | != C_OK) return; | ||
| 200 | if (lru_idle < 0) { | ||
| 201 | addReplyError(c,"Invalid IDLETIME value, must be >= 0"); | ||
| 202 | return; | ||
| 203 | } | ||
| 204 | lru_clock = LRU_CLOCK(); | ||
| 205 | j++; /* Consume additional arg. */ | ||
| 206 | } else if (!strcasecmp(c->argv[j]->ptr,"freq") && additional >= 1 && | ||
| 207 | lru_idle == -1) | ||
| 208 | { | ||
| 209 | if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lfu_freq,NULL) | ||
| 210 | != C_OK) return; | ||
| 211 | if (lfu_freq < 0 || lfu_freq > 255) { | ||
| 212 | addReplyError(c,"Invalid FREQ value, must be >= 0 and <= 255"); | ||
| 213 | return; | ||
| 214 | } | ||
| 215 | j++; /* Consume additional arg. */ | ||
| 216 | } else { | ||
| 217 | addReplyErrorObject(c,shared.syntaxerr); | ||
| 218 | return; | ||
| 219 | } | ||
| 220 | } | ||
| 221 | |||
| 222 | /* Make sure this key does not already exist here... */ | ||
| 223 | robj *key = c->argv[1]; | ||
| 224 | kvobj *oldval = lookupKeyWrite(c->db,key); | ||
| 225 | int oldtype = oldval ? oldval->type : -1; | ||
| 226 | if (!replace && oldval) { | ||
| 227 | addReplyErrorObject(c,shared.busykeyerr); | ||
| 228 | return; | ||
| 229 | } | ||
| 230 | |||
| 231 | /* Check if the TTL value makes sense */ | ||
| 232 | if (getLongLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != C_OK) { | ||
| 233 | return; | ||
| 234 | } else if (ttl < 0) { | ||
| 235 | addReplyError(c,"Invalid TTL value, must be >= 0"); | ||
| 236 | return; | ||
| 237 | } | ||
| 238 | |||
| 239 | /* Verify RDB version and data checksum. */ | ||
| 240 | if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr),NULL) == C_ERR) | ||
| 241 | { | ||
| 242 | addReplyError(c,"DUMP payload version or checksum are wrong"); | ||
| 243 | return; | ||
| 244 | } | ||
| 245 | |||
| 246 | rioInitWithBuffer(&payload,c->argv[3]->ptr); | ||
| 247 | |||
| 248 | /* Initialize metadata spec to collect metadata+expiry from payload. */ | ||
| 249 | KeyMetaSpec keymeta; | ||
| 250 | keyMetaSpecInit(&keymeta); | ||
| 251 | |||
| 252 | /* Compute TTL early so we can add it to metadata spec in correct order */ | ||
| 253 | if (ttl) { | ||
| 254 | if (!absttl) ttl+=commandTimeSnapshot(); | ||
| 255 | keyMetaSpecAdd(&keymeta, KEY_META_ID_EXPIRE, ttl); | ||
| 256 | } | ||
| 257 | |||
| 258 | /* With metadata, type = RDB_OPCODE_KEY_META. Layout: [<META>,]<TYPE>,<KEY>,<VALUE> */ | ||
| 259 | type = rdbLoadType(&payload); | ||
| 260 | if (rdbResolveKeyType(&payload, &type, c->db->id, &keymeta) == -1) { | ||
| 261 | addReplyError(c,"Bad data format"); | ||
| 262 | return; | ||
| 263 | } | ||
| 264 | |||
| 265 | /* Load the object */ | ||
| 266 | if ((obj = rdbLoadObject(type,&payload,key->ptr,c->db->id,NULL)) == NULL) | ||
| 267 | { | ||
| 268 | keyMetaSpecCleanup(&keymeta); | ||
| 269 | addReplyError(c,"Bad data format"); | ||
| 270 | return; | ||
| 271 | } | ||
| 272 | |||
| 273 | /* Remove the old key if needed. */ | ||
| 274 | int deleted = 0; | ||
| 275 | if (replace) | ||
| 276 | deleted = dbDelete(c->db,key); | ||
| 277 | |||
| 278 | if (ttl && checkAlreadyExpired(ttl)) { | ||
| 279 | if (deleted) { | ||
| 280 | robj *aux = server.lazyfree_lazy_server_del ? shared.unlink : shared.del; | ||
| 281 | rewriteClientCommandVector(c, 2, aux, key); | ||
| 282 | keyModified(c,c->db,key,NULL,1); | ||
| 283 | notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id); | ||
| 284 | server.dirty++; | ||
| 285 | } | ||
| 286 | keyMetaSpecCleanup(&keymeta); | ||
| 287 | decrRefCount(obj); | ||
| 288 | addReply(c, shared.ok); | ||
| 289 | return; | ||
| 290 | } | ||
| 291 | |||
| 292 | /* Create the key and set the TTL if any */ | ||
| 293 | kvobj *kv = dbAddInternal(c->db, key, &obj, NULL, &keymeta); | ||
| 294 | |||
| 295 | /* If minExpiredField was set, then the object is hash with expiration | ||
| 296 | * on fields and need to register it in global HFE DS */ | ||
| 297 | if (kv->type == OBJ_HASH) { | ||
| 298 | uint64_t minExpiredField = hashTypeGetMinExpire(kv, 1); | ||
| 299 | if (minExpiredField != EB_EXPIRE_TIME_INVALID) | ||
| 300 | estoreAdd(c->db->subexpires, getKeySlot(key->ptr), kv, minExpiredField); | ||
| 301 | } | ||
| 302 | |||
| 303 | if (ttl) { | ||
| 304 | if (!absttl) { | ||
| 305 | /* Propagate TTL as absolute timestamp */ | ||
| 306 | robj *ttl_obj = createStringObjectFromLongLong(ttl); | ||
| 307 | rewriteClientCommandArgument(c,2,ttl_obj); | ||
| 308 | decrRefCount(ttl_obj); | ||
| 309 | rewriteClientCommandArgument(c,c->argc,shared.absttl); | ||
| 310 | } | ||
| 311 | } | ||
| 312 | objectSetLRUOrLFU(kv, lfu_freq, lru_idle, lru_clock, 1000); | ||
| 313 | keyModified(c,c->db,key,NULL,1); | ||
| 314 | notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id); | ||
| 315 | |||
| 316 | /* If we deleted a key that means REPLACE parameter was passed and the | ||
| 317 | * destination key existed. */ | ||
| 318 | if (deleted) { | ||
| 319 | notifyKeyspaceEvent(NOTIFY_OVERWRITTEN, "overwritten", key, c->db->id); | ||
| 320 | if (oldtype != kv->type) { | ||
| 321 | notifyKeyspaceEvent(NOTIFY_TYPE_CHANGED, "type_changed", key, c->db->id); | ||
| 322 | } | ||
| 323 | } | ||
| 324 | addReply(c,shared.ok); | ||
| 325 | server.dirty++; | ||
| 326 | } | ||
| 327 | /* MIGRATE socket cache implementation. | ||
| 328 | * | ||
| 329 | * We take a map between host:ip and a TCP socket that we used to connect | ||
| 330 | * to this instance in recent time. | ||
| 331 | * This sockets are closed when the max number we cache is reached, and also | ||
| 332 | * in serverCron() when they are around for more than a few seconds. */ | ||
| 333 | #define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */ | ||
| 334 | #define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached sockets after 10 sec. */ | ||
| 335 | |||
| 336 | typedef struct migrateCachedSocket { | ||
| 337 | connection *conn; | ||
| 338 | long last_dbid; | ||
| 339 | time_t last_use_time; | ||
| 340 | } migrateCachedSocket; | ||
| 341 | |||
| 342 | /* Return a migrateCachedSocket containing a TCP socket connected with the | ||
| 343 | * target instance, possibly returning a cached one. | ||
| 344 | * | ||
| 345 | * This function is responsible of sending errors to the client if a | ||
| 346 | * connection can't be established. In this case -1 is returned. | ||
| 347 | * Otherwise on success the socket is returned, and the caller should not | ||
| 348 | * attempt to free it after usage. | ||
| 349 | * | ||
| 350 | * If the caller detects an error while using the socket, migrateCloseSocket() | ||
| 351 | * should be called so that the connection will be created from scratch | ||
| 352 | * the next time. */ | ||
| 353 | migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long timeout) { | ||
| 354 | connection *conn; | ||
| 355 | sds name = sdsempty(); | ||
| 356 | migrateCachedSocket *cs; | ||
| 357 | |||
| 358 | /* Check if we have an already cached socket for this ip:port pair. */ | ||
| 359 | name = sdscatlen(name,host->ptr,sdslen(host->ptr)); | ||
| 360 | name = sdscatlen(name,":",1); | ||
| 361 | name = sdscatlen(name,port->ptr,sdslen(port->ptr)); | ||
| 362 | cs = dictFetchValue(server.migrate_cached_sockets,name); | ||
| 363 | if (cs) { | ||
| 364 | sdsfree(name); | ||
| 365 | cs->last_use_time = server.unixtime; | ||
| 366 | return cs; | ||
| 367 | } | ||
| 368 | |||
| 369 | /* No cached socket, create one. */ | ||
| 370 | if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) { | ||
| 371 | /* Too many items, drop one at random. */ | ||
| 372 | dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets); | ||
| 373 | cs = dictGetVal(de); | ||
| 374 | connClose(cs->conn); | ||
| 375 | zfree(cs); | ||
| 376 | dictDelete(server.migrate_cached_sockets,dictGetKey(de)); | ||
| 377 | } | ||
| 378 | |||
| 379 | /* Create the connection */ | ||
| 380 | conn = connCreate(server.el, connTypeOfCluster()); | ||
| 381 | if (connBlockingConnect(conn, host->ptr, atoi(port->ptr), timeout) | ||
| 382 | != C_OK) { | ||
| 383 | addReplyError(c,"-IOERR error or timeout connecting to the client"); | ||
| 384 | connClose(conn); | ||
| 385 | sdsfree(name); | ||
| 386 | return NULL; | ||
| 387 | } | ||
| 388 | connEnableTcpNoDelay(conn); | ||
| 389 | |||
| 390 | /* Add to the cache and return it to the caller. */ | ||
| 391 | cs = zmalloc(sizeof(*cs)); | ||
| 392 | cs->conn = conn; | ||
| 393 | |||
| 394 | cs->last_dbid = -1; | ||
| 395 | cs->last_use_time = server.unixtime; | ||
| 396 | dictAdd(server.migrate_cached_sockets,name,cs); | ||
| 397 | return cs; | ||
| 398 | } | ||
| 399 | |||
| 400 | /* Free a migrate cached connection. */ | ||
| 401 | void migrateCloseSocket(robj *host, robj *port) { | ||
| 402 | sds name = sdsempty(); | ||
| 403 | migrateCachedSocket *cs; | ||
| 404 | |||
| 405 | name = sdscatlen(name,host->ptr,sdslen(host->ptr)); | ||
| 406 | name = sdscatlen(name,":",1); | ||
| 407 | name = sdscatlen(name,port->ptr,sdslen(port->ptr)); | ||
| 408 | cs = dictFetchValue(server.migrate_cached_sockets,name); | ||
| 409 | if (!cs) { | ||
| 410 | sdsfree(name); | ||
| 411 | return; | ||
| 412 | } | ||
| 413 | |||
| 414 | connClose(cs->conn); | ||
| 415 | zfree(cs); | ||
| 416 | dictDelete(server.migrate_cached_sockets,name); | ||
| 417 | sdsfree(name); | ||
| 418 | } | ||
| 419 | |||
| 420 | void migrateCloseTimedoutSockets(void) { | ||
| 421 | dictIterator di; | ||
| 422 | dictEntry *de; | ||
| 423 | |||
| 424 | dictInitSafeIterator(&di, server.migrate_cached_sockets); | ||
| 425 | while((de = dictNext(&di)) != NULL) { | ||
| 426 | migrateCachedSocket *cs = dictGetVal(de); | ||
| 427 | |||
| 428 | if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) { | ||
| 429 | connClose(cs->conn); | ||
| 430 | zfree(cs); | ||
| 431 | dictDelete(server.migrate_cached_sockets,dictGetKey(de)); | ||
| 432 | } | ||
| 433 | } | ||
| 434 | dictResetIterator(&di); | ||
| 435 | } | ||
| 436 | |||
| 437 | /* MIGRATE host port key dbid timeout [COPY | REPLACE | AUTH password | | ||
| 438 | * AUTH2 username password] | ||
| 439 | * | ||
| 440 | * On in the multiple keys form: | ||
| 441 | * | ||
| 442 | * MIGRATE host port "" dbid timeout [COPY | REPLACE | AUTH password | | ||
| 443 | * AUTH2 username password] KEYS key1 key2 ... keyN */ | ||
| 444 | void migrateCommand(client *c) { | ||
| 445 | migrateCachedSocket *cs; | ||
| 446 | int copy = 0, replace = 0, j; | ||
| 447 | char *username = NULL; | ||
| 448 | char *password = NULL; | ||
| 449 | long timeout; | ||
| 450 | long dbid; | ||
| 451 | robj **kvArray = NULL; /* Objects to migrate. */ | ||
| 452 | robj **keyArray = NULL; /* Key names. */ | ||
| 453 | robj **newargv = NULL; /* Used to rewrite the command as DEL ... keys ... */ | ||
| 454 | rio cmd, payload; | ||
| 455 | int may_retry = 1; | ||
| 456 | int write_error = 0; | ||
| 457 | int argv_rewritten = 0; | ||
| 458 | |||
| 459 | /* To support the KEYS option we need the following additional state. */ | ||
| 460 | int first_key = 3; /* Argument index of the first key. */ | ||
| 461 | int num_keys = 1; /* By default only migrate the 'key' argument. */ | ||
| 462 | |||
| 463 | /* Parse additional options */ | ||
| 464 | for (j = 6; j < c->argc; j++) { | ||
| 465 | int moreargs = (c->argc-1) - j; | ||
| 466 | if (!strcasecmp(c->argv[j]->ptr,"copy")) { | ||
| 467 | copy = 1; | ||
| 468 | } else if (!strcasecmp(c->argv[j]->ptr,"replace")) { | ||
| 469 | replace = 1; | ||
| 470 | } else if (!strcasecmp(c->argv[j]->ptr,"auth")) { | ||
| 471 | if (!moreargs) { | ||
| 472 | addReplyErrorObject(c,shared.syntaxerr); | ||
| 473 | return; | ||
| 474 | } | ||
| 475 | j++; | ||
| 476 | password = c->argv[j]->ptr; | ||
| 477 | redactClientCommandArgument(c,j); | ||
| 478 | } else if (!strcasecmp(c->argv[j]->ptr,"auth2")) { | ||
| 479 | if (moreargs < 2) { | ||
| 480 | addReplyErrorObject(c,shared.syntaxerr); | ||
| 481 | return; | ||
| 482 | } | ||
| 483 | username = c->argv[++j]->ptr; | ||
| 484 | redactClientCommandArgument(c,j); | ||
| 485 | password = c->argv[++j]->ptr; | ||
| 486 | redactClientCommandArgument(c,j); | ||
| 487 | } else if (!strcasecmp(c->argv[j]->ptr,"keys")) { | ||
| 488 | if (sdslen(c->argv[3]->ptr) != 0) { | ||
| 489 | addReplyError(c, | ||
| 490 | "When using MIGRATE KEYS option, the key argument" | ||
| 491 | " must be set to the empty string"); | ||
| 492 | return; | ||
| 493 | } | ||
| 494 | first_key = j+1; | ||
| 495 | num_keys = c->argc - j - 1; | ||
| 496 | break; /* All the remaining args are keys. */ | ||
| 497 | } else { | ||
| 498 | addReplyErrorObject(c,shared.syntaxerr); | ||
| 499 | return; | ||
| 500 | } | ||
| 501 | } | ||
| 502 | |||
| 503 | /* Sanity check */ | ||
| 504 | if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK || | ||
| 505 | getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK) | ||
| 506 | { | ||
| 507 | return; | ||
| 508 | } | ||
| 509 | if (timeout <= 0) timeout = 1000; | ||
| 510 | |||
| 511 | /* Check if the keys are here. If at least one key is to migrate, do it | ||
| 512 | * otherwise if all the keys are missing reply with "NOKEY" to signal | ||
| 513 | * the caller there was nothing to migrate. We don't return an error in | ||
| 514 | * this case, since often this is due to a normal condition like the key | ||
| 515 | * expiring in the meantime. */ | ||
| 516 | kvArray = zrealloc(kvArray,sizeof(kvobj*)*num_keys); | ||
| 517 | keyArray = zrealloc(keyArray,sizeof(robj*)*num_keys); | ||
| 518 | int num_exists = 0; | ||
| 519 | |||
| 520 | for (j = 0; j < num_keys; j++) { | ||
| 521 | if ((kvArray[num_exists] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) { | ||
| 522 | keyArray[num_exists] = c->argv[first_key+j]; | ||
| 523 | num_exists++; | ||
| 524 | } | ||
| 525 | } | ||
| 526 | num_keys = num_exists; | ||
| 527 | if (num_keys == 0) { | ||
| 528 | zfree(kvArray); zfree(keyArray); | ||
| 529 | addReplySds(c,sdsnew("+NOKEY\r\n")); | ||
| 530 | return; | ||
| 531 | } | ||
| 532 | |||
| 533 | try_again: | ||
| 534 | write_error = 0; | ||
| 535 | |||
| 536 | /* Connect */ | ||
| 537 | cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout); | ||
| 538 | if (cs == NULL) { | ||
| 539 | zfree(kvArray); zfree(keyArray); | ||
| 540 | return; /* error sent to the client by migrateGetSocket() */ | ||
| 541 | } | ||
| 542 | |||
| 543 | rioInitWithBuffer(&cmd,sdsempty()); | ||
| 544 | |||
| 545 | /* Authentication */ | ||
| 546 | if (password) { | ||
| 547 | int arity = username ? 3 : 2; | ||
| 548 | serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',arity)); | ||
| 549 | serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4)); | ||
| 550 | if (username) { | ||
| 551 | serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,username, | ||
| 552 | sdslen(username))); | ||
| 553 | } | ||
| 554 | serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password, | ||
| 555 | sdslen(password))); | ||
| 556 | } | ||
| 557 | |||
| 558 | /* Send the SELECT command if the current DB is not already selected. */ | ||
| 559 | int select = cs->last_dbid != dbid; /* Should we emit SELECT? */ | ||
| 560 | if (select) { | ||
| 561 | serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2)); | ||
| 562 | serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6)); | ||
| 563 | serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid)); | ||
| 564 | } | ||
| 565 | |||
| 566 | int non_expired = 0; /* Number of keys that we'll find non expired. | ||
| 567 | Note that serializing large keys may take some time | ||
| 568 | so certain keys that were found non expired by the | ||
| 569 | lookupKey() function, may be expired later. */ | ||
| 570 | |||
| 571 | /* Create RESTORE payload and generate the protocol to call the command. */ | ||
| 572 | for (j = 0; j < num_keys; j++) { | ||
| 573 | long long ttl = 0; | ||
| 574 | long long expireat = kvobjGetExpire(kvArray[j]); | ||
| 575 | |||
| 576 | if (expireat != -1) { | ||
| 577 | ttl = expireat-commandTimeSnapshot(); | ||
| 578 | if (ttl < 0) { | ||
| 579 | continue; | ||
| 580 | } | ||
| 581 | if (ttl < 1) ttl = 1; | ||
| 582 | } | ||
| 583 | |||
| 584 | /* Relocate valid (non expired) keys and values into the array in successive | ||
| 585 | * positions to remove holes created by the keys that were present | ||
| 586 | * in the first lookup but are now expired after the second lookup. */ | ||
| 587 | kvArray[non_expired] = kvArray[j]; | ||
| 588 | keyArray[non_expired++] = keyArray[j]; | ||
| 589 | |||
| 590 | serverAssertWithInfo(c,NULL, | ||
| 591 | rioWriteBulkCount(&cmd,'*',replace ? 5 : 4)); | ||
| 592 | |||
| 593 | if (server.cluster_enabled) | ||
| 594 | serverAssertWithInfo(c,NULL, | ||
| 595 | rioWriteBulkString(&cmd,"RESTORE-ASKING",14)); | ||
| 596 | else | ||
| 597 | serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7)); | ||
| 598 | serverAssertWithInfo(c,NULL,sdsEncodedObject(keyArray[j])); | ||
| 599 | serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,keyArray[j]->ptr, | ||
| 600 | sdslen(keyArray[j]->ptr))); | ||
| 601 | serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl)); | ||
| 602 | |||
| 603 | /* Emit the payload argument, that is the serialized object using | ||
| 604 | * the DUMP format. */ | ||
| 605 | createDumpPayload(&payload,kvArray[j],keyArray[j],dbid,0); | ||
| 606 | serverAssertWithInfo(c,NULL, | ||
| 607 | rioWriteBulkString(&cmd,payload.io.buffer.ptr, | ||
| 608 | sdslen(payload.io.buffer.ptr))); | ||
| 609 | sdsfree(payload.io.buffer.ptr); | ||
| 610 | |||
| 611 | /* Add the REPLACE option to the RESTORE command if it was specified | ||
| 612 | * as a MIGRATE option. */ | ||
| 613 | if (replace) | ||
| 614 | serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7)); | ||
| 615 | } | ||
| 616 | |||
| 617 | /* Fix the actual number of keys we are migrating. */ | ||
| 618 | num_keys = non_expired; | ||
| 619 | |||
| 620 | /* Transfer the query to the other node in 64K chunks. */ | ||
| 621 | errno = 0; | ||
| 622 | { | ||
| 623 | sds buf = cmd.io.buffer.ptr; | ||
| 624 | size_t pos = 0, towrite; | ||
| 625 | int nwritten = 0; | ||
| 626 | |||
| 627 | while ((towrite = sdslen(buf)-pos) > 0) { | ||
| 628 | towrite = (towrite > (64*1024) ? (64*1024) : towrite); | ||
| 629 | nwritten = connSyncWrite(cs->conn,buf+pos,towrite,timeout); | ||
| 630 | if (nwritten != (signed)towrite) { | ||
| 631 | write_error = 1; | ||
| 632 | goto socket_err; | ||
| 633 | } | ||
| 634 | pos += nwritten; | ||
| 635 | } | ||
| 636 | } | ||
| 637 | |||
| 638 | char buf0[1024]; /* Auth reply. */ | ||
| 639 | char buf1[1024]; /* Select reply. */ | ||
| 640 | char buf2[1024]; /* Restore reply. */ | ||
| 641 | |||
| 642 | /* Read the AUTH reply if needed. */ | ||
| 643 | if (password && connSyncReadLine(cs->conn, buf0, sizeof(buf0), timeout) <= 0) | ||
| 644 | goto socket_err; | ||
| 645 | |||
| 646 | /* Read the SELECT reply if needed. */ | ||
| 647 | if (select && connSyncReadLine(cs->conn, buf1, sizeof(buf1), timeout) <= 0) | ||
| 648 | goto socket_err; | ||
| 649 | |||
| 650 | /* Read the RESTORE replies. */ | ||
| 651 | int error_from_target = 0; | ||
| 652 | int socket_error = 0; | ||
| 653 | int del_idx = 1; /* Index of the key argument for the replicated DEL op. */ | ||
| 654 | |||
| 655 | /* Allocate the new argument vector that will replace the current command, | ||
| 656 | * to propagate the MIGRATE as a DEL command (if no COPY option was given). | ||
| 657 | * We allocate num_keys+1 because the additional argument is for "DEL" | ||
| 658 | * command name itself. */ | ||
| 659 | if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1)); | ||
| 660 | |||
| 661 | for (j = 0; j < num_keys; j++) { | ||
| 662 | if (connSyncReadLine(cs->conn, buf2, sizeof(buf2), timeout) <= 0) { | ||
| 663 | socket_error = 1; | ||
| 664 | break; | ||
| 665 | } | ||
| 666 | if ((password && buf0[0] == '-') || | ||
| 667 | (select && buf1[0] == '-') || | ||
| 668 | buf2[0] == '-') | ||
| 669 | { | ||
| 670 | /* On error assume that last_dbid is no longer valid. */ | ||
| 671 | if (!error_from_target) { | ||
| 672 | cs->last_dbid = -1; | ||
| 673 | char *errbuf; | ||
| 674 | if (password && buf0[0] == '-') errbuf = buf0; | ||
| 675 | else if (select && buf1[0] == '-') errbuf = buf1; | ||
| 676 | else errbuf = buf2; | ||
| 677 | |||
| 678 | error_from_target = 1; | ||
| 679 | addReplyErrorFormat(c,"Target instance replied with error: %s", | ||
| 680 | errbuf+1); | ||
| 681 | } | ||
| 682 | } else { | ||
| 683 | if (!copy) { | ||
| 684 | /* No COPY option: remove the local key, signal the change. */ | ||
| 685 | dbDelete(c->db,keyArray[j]); | ||
| 686 | keyModified(c,c->db,keyArray[j],NULL,1); | ||
| 687 | notifyKeyspaceEvent(NOTIFY_GENERIC,"del",keyArray[j],c->db->id); | ||
| 688 | server.dirty++; | ||
| 689 | |||
| 690 | /* Populate the argument vector to replace the old one. */ | ||
| 691 | newargv[del_idx++] = keyArray[j]; | ||
| 692 | incrRefCount(keyArray[j]); | ||
| 693 | } | ||
| 694 | } | ||
| 695 | } | ||
| 696 | |||
| 697 | /* On socket error, if we want to retry, do it now before rewriting the | ||
| 698 | * command vector. We only retry if we are sure nothing was processed | ||
| 699 | * and we failed to read the first reply (j == 0 test). */ | ||
| 700 | if (!error_from_target && socket_error && j == 0 && may_retry && | ||
| 701 | errno != ETIMEDOUT) | ||
| 702 | { | ||
| 703 | goto socket_err; /* A retry is guaranteed because of tested conditions.*/ | ||
| 704 | } | ||
| 705 | |||
| 706 | /* On socket errors, close the migration socket now that we still have | ||
| 707 | * the original host/port in the ARGV. Later the original command may be | ||
| 708 | * rewritten to DEL and will be too later. */ | ||
| 709 | if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]); | ||
| 710 | |||
| 711 | if (!copy) { | ||
| 712 | /* Translate MIGRATE as DEL for replication/AOF. Note that we do | ||
| 713 | * this only for the keys for which we received an acknowledgement | ||
| 714 | * from the receiving Redis server, by using the del_idx index. */ | ||
| 715 | if (del_idx > 1) { | ||
| 716 | newargv[0] = createStringObject("DEL",3); | ||
| 717 | /* Note that the following call takes ownership of newargv. */ | ||
| 718 | replaceClientCommandVector(c,del_idx,newargv); | ||
| 719 | argv_rewritten = 1; | ||
| 720 | } else { | ||
| 721 | /* No key transfer acknowledged, no need to rewrite as DEL. */ | ||
| 722 | zfree(newargv); | ||
| 723 | } | ||
| 724 | newargv = NULL; /* Make it safe to call zfree() on it in the future. */ | ||
| 725 | } | ||
| 726 | |||
| 727 | /* If we are here and a socket error happened, we don't want to retry. | ||
| 728 | * Just signal the problem to the client, but only do it if we did not | ||
| 729 | * already queue a different error reported by the destination server. */ | ||
| 730 | if (!error_from_target && socket_error) { | ||
| 731 | may_retry = 0; | ||
| 732 | goto socket_err; | ||
| 733 | } | ||
| 734 | |||
| 735 | if (!error_from_target) { | ||
| 736 | /* Success! Update the last_dbid in migrateCachedSocket, so that we can | ||
| 737 | * avoid SELECT the next time if the target DB is the same. Reply +OK. | ||
| 738 | * | ||
| 739 | * Note: If we reached this point, even if socket_error is true | ||
| 740 | * still the SELECT command succeeded (otherwise the code jumps to | ||
| 741 | * socket_err label. */ | ||
| 742 | cs->last_dbid = dbid; | ||
| 743 | addReply(c,shared.ok); | ||
| 744 | } else { | ||
| 745 | /* On error we already sent it in the for loop above, and set | ||
| 746 | * the currently selected socket to -1 to force SELECT the next time. */ | ||
| 747 | } | ||
| 748 | |||
| 749 | sdsfree(cmd.io.buffer.ptr); | ||
| 750 | zfree(kvArray); zfree(keyArray); zfree(newargv); | ||
| 751 | return; | ||
| 752 | |||
| 753 | /* On socket errors we try to close the cached socket and try again. | ||
| 754 | * It is very common for the cached socket to get closed, if just reopening | ||
| 755 | * it works it's a shame to notify the error to the caller. */ | ||
| 756 | socket_err: | ||
| 757 | /* Cleanup we want to perform in both the retry and no retry case. | ||
| 758 | * Note: Closing the migrate socket will also force SELECT next time. */ | ||
| 759 | sdsfree(cmd.io.buffer.ptr); | ||
| 760 | |||
| 761 | /* If the command was rewritten as DEL and there was a socket error, | ||
| 762 | * we already closed the socket earlier. While migrateCloseSocket() | ||
| 763 | * is idempotent, the host/port arguments are now gone, so don't do it | ||
| 764 | * again. */ | ||
| 765 | if (!argv_rewritten) migrateCloseSocket(c->argv[1],c->argv[2]); | ||
| 766 | zfree(newargv); | ||
| 767 | newargv = NULL; /* This will get reallocated on retry. */ | ||
| 768 | |||
| 769 | /* Retry only if it's not a timeout and we never attempted a retry | ||
| 770 | * (or the code jumping here did not set may_retry to zero). */ | ||
| 771 | if (errno != ETIMEDOUT && may_retry) { | ||
| 772 | may_retry = 0; | ||
| 773 | goto try_again; | ||
| 774 | } | ||
| 775 | |||
| 776 | /* Cleanup we want to do if no retry is attempted. */ | ||
| 777 | zfree(kvArray); zfree(keyArray); | ||
| 778 | addReplyErrorSds(c, sdscatprintf(sdsempty(), | ||
| 779 | "-IOERR error or timeout %s to target instance", | ||
| 780 | write_error ? "writing" : "reading")); | ||
| 781 | return; | ||
| 782 | } | ||
| 783 | |||
| 784 | /* Cluster node sanity check. Returns C_OK if the node id | ||
| 785 | * is valid an C_ERR otherwise. */ | ||
| 786 | int verifyClusterNodeId(const char *name, int length) { | ||
| 787 | if (length != CLUSTER_NAMELEN) return C_ERR; | ||
| 788 | for (int i = 0; i < length; i++) { | ||
| 789 | if (name[i] >= 'a' && name[i] <= 'z') continue; | ||
| 790 | if (name[i] >= '0' && name[i] <= '9') continue; | ||
| 791 | return C_ERR; | ||
| 792 | } | ||
| 793 | return C_OK; | ||
| 794 | } | ||
| 795 | |||
/* Return non-zero if 'c' is an acceptable character for a cluster
 * auxiliary field: any alphanumeric character, or any character that is
 * NOT in the blacklist of reserved punctuation below (strchr() returning
 * NULL means the character is not blacklisted).
 *
 * Fix: cast to unsigned char before calling isalnum(). Passing a value
 * that is negative and not EOF (possible when a plain 'char' byte >= 0x80
 * is sign-extended by the caller) is undefined behavior for the <ctype.h>
 * functions. */
int isValidAuxChar(int c) {
    return isalnum((unsigned char)c) ||
           (strchr("!#$%&()*+:;<>?@[]^{|}~", c) == NULL);
}
| 799 | |||
/* Return 1 if all 'length' bytes of 's' are acceptable auxiliary field
 * characters according to isValidAuxChar(), 0 as soon as one is not. */
int isValidAuxString(char *s, unsigned int length) {
    unsigned int idx = 0;
    while (idx < length) {
        if (!isValidAuxChar(s[idx])) return 0;
        idx++;
    }
    return 1;
}
| 806 | |||
| 807 | void clusterCommandMyId(client *c) { | ||
| 808 | char *name = clusterNodeGetName(getMyClusterNode()); | ||
| 809 | if (name) { | ||
| 810 | addReplyBulkCBuffer(c,name, CLUSTER_NAMELEN); | ||
| 811 | } else { | ||
| 812 | addReplyError(c, "No ID yet"); | ||
| 813 | } | ||
| 814 | } | ||
| 815 | |||
| 816 | char* getMyClusterId(void) { | ||
| 817 | return clusterNodeGetName(getMyClusterNode()); | ||
| 818 | } | ||
| 819 | |||
| 820 | void clusterCommandMyShardId(client *c) { | ||
| 821 | char *sid = clusterNodeGetShardId(getMyClusterNode()); | ||
| 822 | if (sid) { | ||
| 823 | addReplyBulkCBuffer(c,sid, CLUSTER_NAMELEN); | ||
| 824 | } else { | ||
| 825 | addReplyError(c, "No shard ID yet"); | ||
| 826 | } | ||
| 827 | } | ||
| 828 | |||
| 829 | /* When a cluster command is called, we need to decide whether to return TLS info or | ||
| 830 | * non-TLS info by the client's connection type. However if the command is called by | ||
| 831 | * a Lua script or RM_call, there is no connection in the fake client, so we use | ||
| 832 | * server.current_client here to get the real client if available. And if it is not | ||
| 833 | * available (modules may call commands without a real client), we return the default | ||
| 834 | * info, which is determined by server.tls_cluster. */ | ||
| 835 | static int shouldReturnTlsInfo(void) { | ||
| 836 | if (server.current_client && server.current_client->conn) { | ||
| 837 | return connIsTLS(server.current_client->conn); | ||
| 838 | } else { | ||
| 839 | return server.tls_cluster; | ||
| 840 | } | ||
| 841 | } | ||
| 842 | |||
/* Return the number of keys this node stores in hash slot 'slot'.
 * The kvstore keeps one dict per slot, so the count is just the size of
 * the slot's dict — no scan is needed. */
unsigned int countKeysInSlot(unsigned int slot) {
    return kvstoreDictSize(server.db->keys, slot);
}
| 846 | |||
| 847 | /* Add detailed information of a node to the output buffer of the given client. */ | ||
| 848 | void addNodeDetailsToShardReply(client *c, clusterNode *node) { | ||
| 849 | |||
| 850 | int reply_count = 0; | ||
| 851 | char *hostname; | ||
| 852 | void *node_replylen = addReplyDeferredLen(c); | ||
| 853 | |||
| 854 | addReplyBulkCString(c, "id"); | ||
| 855 | addReplyBulkCBuffer(c, clusterNodeGetName(node), CLUSTER_NAMELEN); | ||
| 856 | reply_count++; | ||
| 857 | |||
| 858 | if (clusterNodeTcpPort(node)) { | ||
| 859 | addReplyBulkCString(c, "port"); | ||
| 860 | addReplyLongLong(c, clusterNodeTcpPort(node)); | ||
| 861 | reply_count++; | ||
| 862 | } | ||
| 863 | |||
| 864 | if (clusterNodeTlsPort(node)) { | ||
| 865 | addReplyBulkCString(c, "tls-port"); | ||
| 866 | addReplyLongLong(c, clusterNodeTlsPort(node)); | ||
| 867 | reply_count++; | ||
| 868 | } | ||
| 869 | |||
| 870 | addReplyBulkCString(c, "ip"); | ||
| 871 | addReplyBulkCString(c, clusterNodeIp(node)); | ||
| 872 | reply_count++; | ||
| 873 | |||
| 874 | addReplyBulkCString(c, "endpoint"); | ||
| 875 | addReplyBulkCString(c, clusterNodePreferredEndpoint(node)); | ||
| 876 | reply_count++; | ||
| 877 | |||
| 878 | hostname = clusterNodeHostname(node); | ||
| 879 | if (hostname != NULL && *hostname != '\0') { | ||
| 880 | addReplyBulkCString(c, "hostname"); | ||
| 881 | addReplyBulkCString(c, hostname); | ||
| 882 | reply_count++; | ||
| 883 | } | ||
| 884 | |||
| 885 | long long node_offset; | ||
| 886 | if (clusterNodeIsMyself(node)) { | ||
| 887 | node_offset = clusterNodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset; | ||
| 888 | } else { | ||
| 889 | node_offset = clusterNodeReplOffset(node); | ||
| 890 | } | ||
| 891 | |||
| 892 | addReplyBulkCString(c, "role"); | ||
| 893 | addReplyBulkCString(c, clusterNodeIsSlave(node) ? "replica" : "master"); | ||
| 894 | reply_count++; | ||
| 895 | |||
| 896 | addReplyBulkCString(c, "replication-offset"); | ||
| 897 | addReplyLongLong(c, node_offset); | ||
| 898 | reply_count++; | ||
| 899 | |||
| 900 | addReplyBulkCString(c, "health"); | ||
| 901 | const char *health_msg = NULL; | ||
| 902 | if (clusterNodeIsFailing(node)) { | ||
| 903 | health_msg = "fail"; | ||
| 904 | } else if (clusterNodeIsSlave(node) && node_offset == 0) { | ||
| 905 | health_msg = "loading"; | ||
| 906 | } else { | ||
| 907 | health_msg = "online"; | ||
| 908 | } | ||
| 909 | addReplyBulkCString(c, health_msg); | ||
| 910 | reply_count++; | ||
| 911 | |||
| 912 | setDeferredMapLen(c, node_replylen, reply_count); | ||
| 913 | } | ||
| 914 | |||
| 915 | static clusterNode *clusterGetMasterFromShard(void *shard_handle) { | ||
| 916 | clusterNode *n = NULL; | ||
| 917 | void *node_it = clusterShardHandleGetNodeIterator(shard_handle); | ||
| 918 | while((n = clusterShardNodeIteratorNext(node_it)) != NULL) { | ||
| 919 | if (!clusterNodeIsFailing(n)) { | ||
| 920 | break; | ||
| 921 | } | ||
| 922 | } | ||
| 923 | clusterShardNodeIteratorFree(node_it); | ||
| 924 | if (!n) return NULL; | ||
| 925 | return clusterNodeGetMaster(n); | ||
| 926 | } | ||
| 927 | |||
| 928 | /* Add the shard reply of a single shard based off the given primary node. */ | ||
| 929 | void addShardReplyForClusterShards(client *c, void *shard_handle) { | ||
| 930 | serverAssert(clusterGetShardNodeCount(shard_handle) > 0); | ||
| 931 | addReplyMapLen(c, 2); | ||
| 932 | addReplyBulkCString(c, "slots"); | ||
| 933 | |||
| 934 | /* Use slot_info_pairs from the primary only */ | ||
| 935 | clusterNode *master_node = clusterGetMasterFromShard(shard_handle); | ||
| 936 | |||
| 937 | if (master_node && clusterNodeHasSlotInfo(master_node)) { | ||
| 938 | serverAssert((clusterNodeSlotInfoCount(master_node) % 2) == 0); | ||
| 939 | addReplyArrayLen(c, clusterNodeSlotInfoCount(master_node)); | ||
| 940 | for (int i = 0; i < clusterNodeSlotInfoCount(master_node); i++) | ||
| 941 | addReplyLongLong(c, (unsigned long)clusterNodeSlotInfoEntry(master_node, i)); | ||
| 942 | } else { | ||
| 943 | /* If no slot info pair is provided, the node owns no slots */ | ||
| 944 | addReplyArrayLen(c, 0); | ||
| 945 | } | ||
| 946 | |||
| 947 | addReplyBulkCString(c, "nodes"); | ||
| 948 | addReplyArrayLen(c, clusterGetShardNodeCount(shard_handle)); | ||
| 949 | void *node_it = clusterShardHandleGetNodeIterator(shard_handle); | ||
| 950 | for (clusterNode *n = clusterShardNodeIteratorNext(node_it); n != NULL; n = clusterShardNodeIteratorNext(node_it)) { | ||
| 951 | addNodeDetailsToShardReply(c, n); | ||
| 952 | clusterFreeNodesSlotsInfo(n); | ||
| 953 | } | ||
| 954 | clusterShardNodeIteratorFree(node_it); | ||
| 955 | } | ||
| 956 | |||
| 957 | /* Add to the output buffer of the given client, an array of slot (start, end) | ||
| 958 | * pair owned by the shard, also the primary and set of replica(s) along with | ||
| 959 | * information about each node. */ | ||
| 960 | void clusterCommandShards(client *c) { | ||
| 961 | addReplyArrayLen(c, clusterGetShardCount()); | ||
| 962 | /* This call will add slot_info_pairs to all nodes */ | ||
| 963 | clusterGenNodesSlotsInfo(0); | ||
| 964 | dictIterator *shard_it = clusterGetShardIterator(); | ||
| 965 | for(void *shard_handle = clusterNextShardHandle(shard_it); shard_handle != NULL; shard_handle = clusterNextShardHandle(shard_it)) { | ||
| 966 | addShardReplyForClusterShards(c, shard_handle); | ||
| 967 | } | ||
| 968 | clusterFreeShardIterator(shard_it); | ||
| 969 | } | ||
| 970 | |||
/* CLUSTER HELP: reply with the help text for the generic CLUSTER
 * subcommands listed below, extended with the implementation-specific
 * subcommands provided by clusterCommandExtendedHelp(). */
void clusterCommandHelp(client *c) {
    const char *help[] = {
"COUNTKEYSINSLOT <slot>",
"    Return the number of keys in <slot>.",
"GETKEYSINSLOT <slot> <count>",
"    Return key names stored by current node in a slot.",
"INFO",
"    Return information about the cluster.",
"KEYSLOT <key>",
"    Return the hash slot for <key>.",
"MYID",
"    Return the node id.",
"MYSHARDID",
"    Return the node's shard id.",
"NODES",
"    Return cluster configuration seen by node. Output format:",
"    <id> <ip:port@bus-port[,hostname]> <flags> <master> <pings> <pongs> <epoch> <link> <slot> ...",
"REPLICAS <node-id>",
"    Return <node-id> replicas.",
"SLOTS",
"    Return information about slots range mappings. Each range is made of:",
"    start, end, master and replicas IP addresses, ports and ids",
"SLOT-STATS",
"    Return an array of slot usage statistics for slots assigned to the current node.",
"SHARDS",
"    Return information about slot range mappings and the nodes associated with them.",
NULL
    };

    addExtendedReplyHelp(c, help, clusterCommandExtendedHelp());
}
| 1002 | |||
/* CLUSTER command dispatcher. Handles the subcommands that are common to
 * every clustering implementation; anything unrecognized here is handed
 * to clusterCommandSpecial(), which implements the implementation-specific
 * subcommands. Fails immediately when cluster support is disabled. */
void clusterCommand(client *c) {
    if (server.cluster_enabled == 0) {
        addReplyError(c,"This instance has cluster support disabled");
        return;
    }

    if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
        clusterCommandHelp(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
        /* CLUSTER NODES */
        /* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
        sds nodes = clusterGenNodesDescription(c, 0, shouldReturnTlsInfo());
        addReplyVerbatim(c,nodes,sdslen(nodes),"txt");
        sdsfree(nodes);
    } else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) {
        /* CLUSTER MYID */
        clusterCommandMyId(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"myshardid") && c->argc == 2) {
        /* CLUSTER MYSHARDID */
        clusterCommandMyShardId(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"slots") && c->argc == 2) {
        /* CLUSTER SLOTS */
        clusterCommandSlots(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"shards") && c->argc == 2) {
        /* CLUSTER SHARDS */
        clusterCommandShards(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
        /* CLUSTER INFO */

        sds info = genClusterInfoString();

        /* Produce the reply protocol. */
        addReplyVerbatim(c,info,sdslen(info),"txt");
        sdsfree(info);
    } else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) {
        /* CLUSTER KEYSLOT <key> */
        sds key = c->argv[2]->ptr;

        addReplyLongLong(c,keyHashSlot(key,sdslen(key)));
    } else if (!strcasecmp(c->argv[1]->ptr,"countkeysinslot") && c->argc == 3) {
        /* CLUSTER COUNTKEYSINSLOT <slot> */
        long long slot;

        if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
            return;
        if (slot < 0 || slot >= CLUSTER_SLOTS) {
            addReplyError(c,"Invalid slot");
            return;
        }

        /* A slot this node cannot access is reported as empty rather
         * than as an error. */
        if (!clusterCanAccessKeysInSlot(slot)) {
            addReplyLongLong(c, 0);
            return;
        }
        addReplyLongLong(c,countKeysInSlot(slot));
    } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
        /* CLUSTER GETKEYSINSLOT <slot> <count> */
        long long maxkeys, slot;

        if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
            return;
        if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL)
            != C_OK)
            return;
        if (slot < 0 || slot >= CLUSTER_SLOTS || maxkeys < 0) {
            addReplyError(c,"Invalid slot or number of keys");
            return;
        }

        if (!clusterCanAccessKeysInSlot(slot)) {
            addReplyArrayLen(c, 0);
            return;
        }

        /* Reply with at most 'maxkeys' keys, iterating the slot's dict
         * directly instead of scanning the whole keyspace. */
        unsigned int keys_in_slot = countKeysInSlot(slot);
        unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys;
        addReplyArrayLen(c,numkeys);
        kvstoreDictIterator kvs_di;
        dictEntry *de = NULL;
        kvstoreInitDictIterator(&kvs_di, server.db->keys, slot);
        for (unsigned int i = 0; i < numkeys; i++) {
            de = kvstoreDictIteratorNext(&kvs_di);
            serverAssert(de != NULL);
            sds sdskey = kvobjGetKey(dictGetKV(de));
            addReplyBulkCBuffer(c, sdskey, sdslen(sdskey));
        }
        kvstoreResetDictIterator(&kvs_di);
    } else if ((!strcasecmp(c->argv[1]->ptr,"slaves") ||
                !strcasecmp(c->argv[1]->ptr,"replicas")) && c->argc == 3) {
        /* CLUSTER SLAVES <NODE ID> */
        /* CLUSTER REPLICAS <NODE ID> */
        clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
        int j;

        /* Lookup the specified node in our table. */
        if (!n) {
            addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
            return;
        }

        if (clusterNodeIsSlave(n)) {
            addReplyError(c,"The specified node is not a master");
            return;
        }

        /* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
        addReplyArrayLen(c, clusterNodeNumSlaves(n));
        for (j = 0; j < clusterNodeNumSlaves(n); j++) {
            sds ni = clusterGenNodeDescription(c, clusterNodeGetSlave(n, j), shouldReturnTlsInfo());
            addReplyBulkCString(c,ni);
            sdsfree(ni);
        }
    } else if (!strcasecmp(c->argv[1]->ptr, "migration")) {
        clusterMigrationCommand(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"syncslots") && c->argc >= 3) {
        clusterSyncSlotsCommand(c);
    } else if(!clusterCommandSpecial(c)) {
        /* Unknown subcommand for both the common and the special
         * (implementation-specific) dispatcher. */
        addReplySubcommandSyntaxError(c);
        return;
    }
}
| 1124 | |||
| 1125 | /* Extract slot number from keys in a keys_result structure and return to caller. | ||
| 1126 | * Returns: | ||
| 1127 | * - The slot number if all keys belong to the same slot | ||
| 1128 | * - INVALID_CLUSTER_SLOT if there are no keys or cluster is disabled | ||
| 1129 | * - CLUSTER_CROSSSLOT if keys belong to different slots (cross-slot error) */ | ||
| 1130 | int extractSlotFromKeysResult(robj **argv, getKeysResult *keys_result) { | ||
| 1131 | if (keys_result->numkeys == 0 || !server.cluster_enabled) | ||
| 1132 | return INVALID_CLUSTER_SLOT; | ||
| 1133 | |||
| 1134 | int first_slot = INVALID_CLUSTER_SLOT; | ||
| 1135 | for (int j = 0; j < keys_result->numkeys; j++) { | ||
| 1136 | robj *this_key = argv[keys_result->keys[j].pos]; | ||
| 1137 | int this_slot = (int)keyHashSlot((char*)this_key->ptr, sdslen(this_key->ptr)); | ||
| 1138 | |||
| 1139 | if (first_slot == INVALID_CLUSTER_SLOT) | ||
| 1140 | first_slot = this_slot; | ||
| 1141 | else if (first_slot != this_slot) { | ||
| 1142 | return CLUSTER_CROSSSLOT; | ||
| 1143 | } | ||
| 1144 | } | ||
| 1145 | return first_slot; | ||
| 1146 | } | ||
| 1147 | |||
/* Return the pointer to the cluster node that is able to serve the command.
 * For the function to succeed the command should only target either:
 *
 * 1) A single key (even multiple times like RPOPLPUSH mylist mylist).
 * 2) Multiple keys in the same hash slot, while the slot is stable (no
 *    resharding in progress).
 *
 * On success the function returns the node that is able to serve the request.
 * If the node is not 'myself' a redirection must be performed. The kind of
 * redirection is specified setting the integer passed by reference
 * 'error_code', which will be set to CLUSTER_REDIR_ASK or
 * CLUSTER_REDIR_MOVED.
 *
 * When the node is 'myself' 'error_code' is set to CLUSTER_REDIR_NONE.
 *
 * If the command fails NULL is returned, and the reason of the failure is
 * provided via 'error_code', which will be set to:
 *
 * CLUSTER_REDIR_CROSS_SLOT if the request contains multiple keys that
 * don't belong to the same hash slot.
 *
 * CLUSTER_REDIR_UNSTABLE if the request contains multiple keys
 * belonging to the same slot, but the slot is not stable (in migration or
 * importing state, likely because a resharding is in progress).
 *
 * CLUSTER_REDIR_DOWN_UNBOUND if the request addresses a slot which is
 * not bound to any node. In this case the cluster global state should be
 * already "down" but it is fragile to rely on the update of the global state,
 * so we also handle it here.
 *
 * CLUSTER_REDIR_DOWN_STATE and CLUSTER_REDIR_DOWN_RO_STATE if the cluster is
 * down but the user attempts to execute a command that addresses one or more keys.
 *
 * 'hashslot', 'keys_result', 'read_error' and 'error_code' may carry
 * precomputed state from an earlier parsing stage; 'hashslot' and
 * 'error_code' are also output parameters and may be NULL when the
 * caller is not interested in them. */
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot,
                            getKeysResult *keys_result, uint8_t read_error, uint64_t cmd_flags, int *error_code)
{
    clusterNode *myself = getMyClusterNode();
    clusterNode *n = NULL;
    robj *firstkey = NULL;
    int multiple_keys = 0;
    multiState *ms, _ms;
    pendingCommand mc;
    pendingCommand *mcp = &mc;
    int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0,
        existing_keys = 0;
    int pubsubshard_included = 0; /* Flag to indicate if a pubsub shard cmd is included. */

    /* Allow any key to be set if a module disabled cluster redirections. */
    if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
        return myself;

    /* Set error code optimistically for the base case. */
    if (error_code) *error_code = CLUSTER_REDIR_NONE;

    /* Modules can turn off Redis Cluster redirection: this is useful
     * when writing a module that implements a completely different
     * distributed system. */

    /* We handle all the cases as if they were EXEC commands, so we have
     * a common code path for everything */
    if (cmd->proc == execCommand) {
        /* If CLIENT_MULTI flag is not set EXEC is just going to return an
         * error. */
        if (!(c->flags & CLIENT_MULTI)) return myself;
        ms = &c->mstate;
    } else {
        /* In order to have a single codepath create a fake Multi State
         * structure if the client is not in MULTI/EXEC state, this way
         * we have a single codepath below. */
        ms = &_ms;
        _ms.commands = &mcp;
        _ms.count = 1;

        /* Properly initialize the fake pendingCommand */
        initPendingCommand(&mc);
        mc.argv = argv;
        mc.argc = argc;
        mc.cmd = cmd;
        mc.slot = hashslot ? *hashslot : INVALID_CLUSTER_SLOT;
        mc.read_error = read_error;
        if (keys_result) {
            mc.keys_result = *keys_result;
            mc.flags |= PENDING_CMD_KEYS_RESULT_VALID;
        }
    }

    /* Check that all the keys are in the same hash slot, and obtain this
     * slot and the node associated. */
    for (i = 0; i < ms->count; i++) {
        struct redisCommand *mcmd;
        robj **margv;
        int margc, j;
        keyReference *keyindex;

        pendingCommand *pcmd = ms->commands[i];

        mcmd = pcmd->cmd;
        margc = pcmd->argc;
        margv = pcmd->argv;

        /* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */
        if (!pubsubshard_included &&
            doesCommandHaveChannelsWithFlags(mcmd, CMD_CHANNEL_PUBLISH | CMD_CHANNEL_SUBSCRIBE))
        {
            pubsubshard_included = 1;
        }

        /* If we have a cached keys result from preprocessCommand(), use it.
         * Otherwise, extract keys result. */
        int use_cache_keys_result = pcmd->flags & PENDING_CMD_KEYS_RESULT_VALID;
        getKeysResult result = GETKEYS_RESULT_INIT;
        if (use_cache_keys_result)
            result = pcmd->keys_result;
        else
            getKeysFromCommand(mcmd,margv,margc,&result);
        keyindex = result.keys;

        for (j = 0; j < result.numkeys; j++) {
            /* The command has keys and was checked for cross-slot between its keys in preprocessCommand() */
            if (pcmd->read_error == CLIENT_READ_CROSS_SLOT) {
                /* Error: multiple keys from different slots. */
                if (error_code)
                    *error_code = CLUSTER_REDIR_CROSS_SLOT;
                return NULL;
            }

            /* Use the precomputed slot when available, otherwise hash
             * the key here. */
            robj *thiskey = margv[keyindex[j].pos];
            int thisslot = pcmd->slot;
            if (thisslot == INVALID_CLUSTER_SLOT)
                thisslot = keyHashSlot((char*)thiskey->ptr, sdslen(thiskey->ptr));

            if (firstkey == NULL) {
                /* This is the first key we see. Check what is the slot
                 * and node. */
                firstkey = thiskey;
                slot = thisslot;
                n = getNodeBySlot(slot);

                /* Error: If a slot is not served, we are in "cluster down"
                 * state. However the state is yet to be updated, so this was
                 * not trapped earlier in processCommand(). Report the same
                 * error to the client. */
                if (n == NULL) {
                    if (!use_cache_keys_result) getKeysFreeResult(&result);
                    if (error_code)
                        *error_code = CLUSTER_REDIR_DOWN_UNBOUND;
                    return NULL;
                }

                /* If we are migrating or importing this slot, we need to check
                 * if we have all the keys in the request (the only way we
                 * can safely serve the request, otherwise we return a TRYAGAIN
                 * error). To do so we set the importing/migrating state and
                 * increment a counter for every missing key. */
                if (n == myself &&
                    getMigratingSlotDest(slot) != NULL)
                {
                    migrating_slot = 1;
                } else if (getImportingSlotSource(slot) != NULL) {
                    importing_slot = 1;
                }
            } else {
                /* If it is not the first key/channel, make sure it is exactly
                 * the same key/channel as the first we saw. */
                if (slot != thisslot) {
                    /* Error: multiple keys from different slots. */
                    if (!use_cache_keys_result) getKeysFreeResult(&result);
                    if (error_code)
                        *error_code = CLUSTER_REDIR_CROSS_SLOT;
                    return NULL;
                }
                if (importing_slot && !multiple_keys && !equalStringObjects(firstkey,thiskey)) {
                    /* Flag this request as one with multiple different
                     * keys/channels when the slot is in importing state. */
                    multiple_keys = 1;
                }
            }

            /* Migrating / Importing slot? Count keys we don't have.
             * If it is pubsubshard command, it isn't required to check
             * the channel being present or not in the node during the
             * slot migration, the channel will be served from the source
             * node until the migration completes with CLUSTER SETSLOT <slot>
             * NODE <node-id>. */
            int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE;
            if ((migrating_slot || importing_slot) && !pubsubshard_included)
            {
                if (lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL) missing_keys++;
                else existing_keys++;
            }
        }
        /* Only free the keys result when we extracted it ourselves: a
         * cached result is owned by the pending command. */
        if (!use_cache_keys_result) getKeysFreeResult(&result);
    }

    /* No key at all in command? then we can serve the request
     * without redirections or errors in all the cases. */
    if (n == NULL) return myself;

    /* Cluster is globally down but we got keys? We only serve the request
     * if it is a read command and when allow_reads_when_down is enabled. */
    if (!isClusterHealthy()) {
        if (pubsubshard_included) {
            if (!server.cluster_allow_pubsubshard_when_down) {
                if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
                return NULL;
            }
        } else if (!server.cluster_allow_reads_when_down) {
            /* The cluster is configured to block commands when the
             * cluster is down. */
            if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
            return NULL;
        } else if (cmd_flags & CMD_WRITE) {
            /* The cluster is configured to allow read only commands */
            if (error_code) *error_code = CLUSTER_REDIR_DOWN_RO_STATE;
            return NULL;
        } else {
            /* Fall through and allow the command to be executed:
             * this happens when server.cluster_allow_reads_when_down is
             * true and the command is not a write command */
        }
    }

    /* Return the hashslot by reference. */
    if (hashslot) *hashslot = slot;

    /* MIGRATE always works in the context of the local node if the slot
     * is open (migrating or importing state). We need to be able to freely
     * move keys among instances in this case. */
    if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
        return myself;

    /* If we don't have all the keys and we are migrating the slot, send
     * an ASK redirection or TRYAGAIN. */
    if (migrating_slot && missing_keys) {
        /* If we have keys but we don't have all keys, we return TRYAGAIN */
        if (existing_keys) {
            if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
            return NULL;
        } else {
            if (error_code) *error_code = CLUSTER_REDIR_ASK;
            return getMigratingSlotDest(slot);
        }
    }

    /* If we are receiving the slot, and the client correctly flagged the
     * request as "ASKING", we can serve the request. However if the request
     * involves multiple keys and we don't have them all, the only option is
     * to send a TRYAGAIN error. */
    if (importing_slot &&
        (c->flags & CLIENT_ASKING || cmd_flags & CMD_ASKING))
    {
        if (multiple_keys && missing_keys) {
            if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
            return NULL;
        } else {
            return myself;
        }
    }

    /* Handle the read-only client case reading from a slave: if this
     * node is a slave and the request is about a hash slot our master
     * is serving, we can reply without redirection. */
    int is_write_command = (cmd_flags & CMD_WRITE) ||
                           (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
    if (((c->flags & CLIENT_READONLY) || pubsubshard_included) &&
        !is_write_command &&
        clusterNodeIsSlave(myself) &&
        clusterNodeGetSlaveof(myself) == n)
    {
        return myself;
    }

    /* Base case: just return the right node. However, if this node is not
     * myself, set error_code to MOVED since we need to issue a redirection. */
    if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
    return n;
}
| 1424 | |||
| 1425 | /* Send the client the right redirection code, according to error_code | ||
| 1426 | * that should be set to one of CLUSTER_REDIR_* macros. | ||
| 1427 | * | ||
| 1428 | * If CLUSTER_REDIR_ASK or CLUSTER_REDIR_MOVED error codes | ||
| 1429 | * are used, then the node 'n' should not be NULL, but should be the | ||
| 1430 | * node we want to mention in the redirection. Moreover hashslot should | ||
| 1431 | * be set to the hash slot that caused the redirection. */ | ||
| 1432 | void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) { | ||
| 1433 | if (error_code == CLUSTER_REDIR_CROSS_SLOT) { | ||
| 1434 | addReplyError(c,"-CROSSSLOT Keys in request don't hash to the same slot"); | ||
| 1435 | } else if (error_code == CLUSTER_REDIR_UNSTABLE) { | ||
| 1436 | /* The request spawns multiple keys in the same slot, | ||
| 1437 | * but the slot is not "stable" currently as there is | ||
| 1438 | * a migration or import in progress. */ | ||
| 1439 | addReplyError(c,"-TRYAGAIN Multiple keys request during rehashing of slot"); | ||
| 1440 | } else if (error_code == CLUSTER_REDIR_DOWN_STATE) { | ||
| 1441 | addReplyError(c,"-CLUSTERDOWN The cluster is down"); | ||
| 1442 | } else if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) { | ||
| 1443 | addReplyError(c,"-CLUSTERDOWN The cluster is down and only accepts read commands"); | ||
| 1444 | } else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) { | ||
| 1445 | addReplyError(c,"-CLUSTERDOWN Hash slot not served"); | ||
| 1446 | } else if (error_code == CLUSTER_REDIR_MOVED || | ||
| 1447 | error_code == CLUSTER_REDIR_ASK) | ||
| 1448 | { | ||
| 1449 | /* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */ | ||
| 1450 | int port = clusterNodeClientPort(n, shouldReturnTlsInfo()); | ||
| 1451 | addReplyErrorSds(c,sdscatprintf(sdsempty(), | ||
| 1452 | "-%s %d %s:%d", | ||
| 1453 | (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED", | ||
| 1454 | hashslot, clusterNodePreferredEndpoint(n), port)); | ||
| 1455 | } else { | ||
| 1456 | serverPanic("getNodeByQuery() unknown error."); | ||
| 1457 | } | ||
| 1458 | } | ||
| 1459 | |||
| 1460 | /* This function is called by the function processing clients incrementally | ||
| 1461 | * to detect timeouts, in order to handle the following case: | ||
| 1462 | * | ||
| 1463 | * 1) A client blocks with BLPOP or similar blocking operation. | ||
| 1464 | * 2) The master migrates the hash slot elsewhere or turns into a slave. | ||
| 1465 | * 3) The client may remain blocked forever (or up to the max timeout time) | ||
| 1466 | * waiting for a key change that will never happen. | ||
| 1467 | * | ||
| 1468 | * If the client is found to be blocked into a hash slot this node no | ||
| 1469 | * longer handles, the client is sent a redirection error, and the function | ||
| 1470 | * returns 1. Otherwise 0 is returned and no operation is performed. */ | ||
| 1471 | int clusterRedirectBlockedClientIfNeeded(client *c) { | ||
| 1472 | clusterNode *myself = getMyClusterNode(); | ||
| 1473 | if (c->flags & CLIENT_BLOCKED && | ||
| 1474 | (c->bstate.btype == BLOCKED_LIST || | ||
| 1475 | c->bstate.btype == BLOCKED_ZSET || | ||
| 1476 | c->bstate.btype == BLOCKED_STREAM || | ||
| 1477 | c->bstate.btype == BLOCKED_MODULE)) | ||
| 1478 | { | ||
| 1479 | dictEntry *de; | ||
| 1480 | dictIterator di; | ||
| 1481 | |||
| 1482 | /* If the cluster is down, unblock the client with the right error. | ||
| 1483 | * If the cluster is configured to allow reads on cluster down, we | ||
| 1484 | * still want to emit this error since a write will be required | ||
| 1485 | * to unblock them which may never come. */ | ||
| 1486 | if (!isClusterHealthy()) { | ||
| 1487 | clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE); | ||
| 1488 | return 1; | ||
| 1489 | } | ||
| 1490 | |||
| 1491 | /* If the client is blocked on module, but not on a specific key, | ||
| 1492 | * don't unblock it (except for the CLUSTER_FAIL case above). */ | ||
| 1493 | if (c->bstate.btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c)) | ||
| 1494 | return 0; | ||
| 1495 | |||
| 1496 | /* All keys must belong to the same slot, so check first key only. */ | ||
| 1497 | dictInitIterator(&di, c->bstate.keys); | ||
| 1498 | if ((de = dictNext(&di)) != NULL) { | ||
| 1499 | robj *key = dictGetKey(de); | ||
| 1500 | int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr)); | ||
| 1501 | clusterNode *node = getNodeBySlot(slot); | ||
| 1502 | |||
| 1503 | /* if the client is read-only and attempting to access key that our | ||
| 1504 | * replica can handle, allow it. */ | ||
| 1505 | if ((c->flags & CLIENT_READONLY) && | ||
| 1506 | !(c->lastcmd->flags & CMD_WRITE) && | ||
| 1507 | clusterNodeIsSlave(myself) && clusterNodeGetSlaveof(myself) == node) | ||
| 1508 | { | ||
| 1509 | node = myself; | ||
| 1510 | } | ||
| 1511 | |||
| 1512 | /* We send an error and unblock the client if: | ||
| 1513 | * 1) The slot is unassigned, emitting a cluster down error. | ||
| 1514 | * 2) The slot is not handled by this node, nor being imported. */ | ||
| 1515 | if (node != myself && getImportingSlotSource(slot) == NULL) | ||
| 1516 | { | ||
| 1517 | if (node == NULL) { | ||
| 1518 | clusterRedirectClient(c,NULL,0, | ||
| 1519 | CLUSTER_REDIR_DOWN_UNBOUND); | ||
| 1520 | } else { | ||
| 1521 | clusterRedirectClient(c,node,slot, | ||
| 1522 | CLUSTER_REDIR_MOVED); | ||
| 1523 | } | ||
| 1524 | dictResetIterator(&di); | ||
| 1525 | return 1; | ||
| 1526 | } | ||
| 1527 | } | ||
| 1528 | dictResetIterator(&di); | ||
| 1529 | } | ||
| 1530 | return 0; | ||
| 1531 | } | ||
| 1532 | |||
| 1533 | /* Returns an indication if the replica node is fully available | ||
| 1534 | * and should be listed in CLUSTER SLOTS response. | ||
| 1535 | * Returns 1 for available nodes, 0 for nodes that have | ||
| 1536 | * not finished their initial sync, in failed state, or are | ||
| 1537 | * otherwise considered not available to serve read commands. */ | ||
| 1538 | static int isReplicaAvailable(clusterNode *node) { | ||
| 1539 | if (clusterNodeIsFailing(node)) { | ||
| 1540 | return 0; | ||
| 1541 | } | ||
| 1542 | long long repl_offset = clusterNodeReplOffset(node); | ||
| 1543 | if (clusterNodeIsMyself(node)) { | ||
| 1544 | /* Nodes do not update their own information | ||
| 1545 | * in the cluster node list. */ | ||
| 1546 | repl_offset = replicationGetSlaveOffset(); | ||
| 1547 | } | ||
| 1548 | return (repl_offset != 0); | ||
| 1549 | } | ||
| 1550 | |||
| 1551 | void addNodeToNodeReply(client *c, clusterNode *node) { | ||
| 1552 | char* hostname = clusterNodeHostname(node); | ||
| 1553 | addReplyArrayLen(c, 4); | ||
| 1554 | if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_IP) { | ||
| 1555 | addReplyBulkCString(c, clusterNodeIp(node)); | ||
| 1556 | } else if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_HOSTNAME) { | ||
| 1557 | if (hostname != NULL && hostname[0] != '\0') { | ||
| 1558 | addReplyBulkCString(c, hostname); | ||
| 1559 | } else { | ||
| 1560 | addReplyBulkCString(c, "?"); | ||
| 1561 | } | ||
| 1562 | } else if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_UNKNOWN_ENDPOINT) { | ||
| 1563 | addReplyNull(c); | ||
| 1564 | } else { | ||
| 1565 | serverPanic("Unrecognized preferred endpoint type"); | ||
| 1566 | } | ||
| 1567 | |||
| 1568 | /* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */ | ||
| 1569 | addReplyLongLong(c, clusterNodeClientPort(node, shouldReturnTlsInfo())); | ||
| 1570 | addReplyBulkCBuffer(c, clusterNodeGetName(node), CLUSTER_NAMELEN); | ||
| 1571 | |||
| 1572 | /* Add the additional endpoint information, this is all the known networking information | ||
| 1573 | * that is not the preferred endpoint. Note the logic is evaluated twice so we can | ||
| 1574 | * correctly report the number of additional network arguments without using a deferred | ||
| 1575 | * map, an assertion is made at the end to check we set the right length. */ | ||
| 1576 | int length = 0; | ||
| 1577 | if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_IP) { | ||
| 1578 | length++; | ||
| 1579 | } | ||
| 1580 | if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_HOSTNAME | ||
| 1581 | && hostname != NULL && hostname[0] != '\0') | ||
| 1582 | { | ||
| 1583 | length++; | ||
| 1584 | } | ||
| 1585 | addReplyMapLen(c, length); | ||
| 1586 | |||
| 1587 | if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_IP) { | ||
| 1588 | addReplyBulkCString(c, "ip"); | ||
| 1589 | addReplyBulkCString(c, clusterNodeIp(node)); | ||
| 1590 | length--; | ||
| 1591 | } | ||
| 1592 | if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_HOSTNAME | ||
| 1593 | && hostname != NULL && hostname[0] != '\0') | ||
| 1594 | { | ||
| 1595 | addReplyBulkCString(c, "hostname"); | ||
| 1596 | addReplyBulkCString(c, hostname); | ||
| 1597 | length--; | ||
| 1598 | } | ||
| 1599 | serverAssert(length == 0); | ||
| 1600 | } | ||
| 1601 | |||
| 1602 | void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, int end_slot) { | ||
| 1603 | int i, nested_elements = 3; /* slots (2) + master addr (1) */ | ||
| 1604 | for (i = 0; i < clusterNodeNumSlaves(node); i++) { | ||
| 1605 | if (!isReplicaAvailable(clusterNodeGetSlave(node, i))) continue; | ||
| 1606 | nested_elements++; | ||
| 1607 | } | ||
| 1608 | addReplyArrayLen(c, nested_elements); | ||
| 1609 | addReplyLongLong(c, start_slot); | ||
| 1610 | addReplyLongLong(c, end_slot); | ||
| 1611 | addNodeToNodeReply(c, node); | ||
| 1612 | |||
| 1613 | /* Remaining nodes in reply are replicas for slot range */ | ||
| 1614 | for (i = 0; i < clusterNodeNumSlaves(node); i++) { | ||
| 1615 | /* This loop is copy/pasted from clusterGenNodeDescription() | ||
| 1616 | * with modifications for per-slot node aggregation. */ | ||
| 1617 | if (!isReplicaAvailable(clusterNodeGetSlave(node, i))) continue; | ||
| 1618 | addNodeToNodeReply(c, clusterNodeGetSlave(node, i)); | ||
| 1619 | nested_elements--; | ||
| 1620 | } | ||
| 1621 | serverAssert(nested_elements == 3); /* Original 3 elements */ | ||
| 1622 | } | ||
| 1623 | |||
| 1624 | void clusterCommandSlots(client * c) { | ||
| 1625 | /* Format: 1) 1) start slot | ||
| 1626 | * 2) end slot | ||
| 1627 | * 3) 1) master IP | ||
| 1628 | * 2) master port | ||
| 1629 | * 3) node ID | ||
| 1630 | * 4) 1) replica IP | ||
| 1631 | * 2) replica port | ||
| 1632 | * 3) node ID | ||
| 1633 | * ... continued until done | ||
| 1634 | */ | ||
| 1635 | clusterNode *n = NULL; | ||
| 1636 | int num_masters = 0, start = -1; | ||
| 1637 | void *slot_replylen = addReplyDeferredLen(c); | ||
| 1638 | |||
| 1639 | for (int i = 0; i <= CLUSTER_SLOTS; i++) { | ||
| 1640 | /* Find start node and slot id. */ | ||
| 1641 | if (n == NULL) { | ||
| 1642 | if (i == CLUSTER_SLOTS) break; | ||
| 1643 | n = getNodeBySlot(i); | ||
| 1644 | start = i; | ||
| 1645 | continue; | ||
| 1646 | } | ||
| 1647 | |||
| 1648 | /* Add cluster slots info when occur different node with start | ||
| 1649 | * or end of slot. */ | ||
| 1650 | if (i == CLUSTER_SLOTS || n != getNodeBySlot(i)) { | ||
| 1651 | addNodeReplyForClusterSlot(c, n, start, i-1); | ||
| 1652 | num_masters++; | ||
| 1653 | if (i == CLUSTER_SLOTS) break; | ||
| 1654 | n = getNodeBySlot(i); | ||
| 1655 | start = i; | ||
| 1656 | } | ||
| 1657 | } | ||
| 1658 | setDeferredArrayLen(c, slot_replylen, num_masters); | ||
| 1659 | } | ||
| 1660 | |||
| 1661 | /* ----------------------------------------------------------------------------- | ||
| 1662 | * Cluster functions related to serving / redirecting clients | ||
| 1663 | * -------------------------------------------------------------------------- */ | ||
| 1664 | |||
| 1665 | /* The ASKING command is required after a -ASK redirection. | ||
| 1666 | * The client should issue ASKING before to actually send the command to | ||
| 1667 | * the target instance. See the Redis Cluster specification for more | ||
| 1668 | * information. */ | ||
| 1669 | void askingCommand(client *c) { | ||
| 1670 | if (server.cluster_enabled == 0) { | ||
| 1671 | addReplyError(c,"This instance has cluster support disabled"); | ||
| 1672 | return; | ||
| 1673 | } | ||
| 1674 | c->flags |= CLIENT_ASKING; | ||
| 1675 | addReply(c,shared.ok); | ||
| 1676 | } | ||
| 1677 | |||
| 1678 | /* The READONLY command is used by clients to enter the read-only mode. | ||
| 1679 | * In this mode slaves will not redirect clients as long as clients access | ||
| 1680 | * with read-only commands to keys that are served by the slave's master. */ | ||
| 1681 | void readonlyCommand(client *c) { | ||
| 1682 | if (server.cluster_enabled == 0) { | ||
| 1683 | addReplyError(c,"This instance has cluster support disabled"); | ||
| 1684 | return; | ||
| 1685 | } | ||
| 1686 | c->flags |= CLIENT_READONLY; | ||
| 1687 | addReply(c,shared.ok); | ||
| 1688 | } | ||
| 1689 | |||
| 1690 | /* Remove all the keys in the specified hash slot. | ||
| 1691 | * The number of removed items is returned. */ | ||
| 1692 | unsigned int clusterDelKeysInSlot(unsigned int hashslot, int by_command) { | ||
| 1693 | unsigned int j = 0; | ||
| 1694 | |||
| 1695 | if (!kvstoreDictSize(server.db->keys, (int) hashslot)) | ||
| 1696 | return 0; | ||
| 1697 | |||
| 1698 | kvstoreDictIterator kvs_di; | ||
| 1699 | dictEntry *de = NULL; | ||
| 1700 | kvstoreInitDictSafeIterator(&kvs_di, server.db->keys, (int) hashslot); | ||
| 1701 | while((de = kvstoreDictIteratorNext(&kvs_di)) != NULL) { | ||
| 1702 | enterExecutionUnit(1, 0); | ||
| 1703 | sds sdskey = kvobjGetKey(dictGetKV(de)); | ||
| 1704 | robj *key = createStringObject(sdskey, sdslen(sdskey)); | ||
| 1705 | dbDelete(&server.db[0], key); | ||
| 1706 | |||
| 1707 | keyModified(NULL, &server.db[0], key, NULL, 1); | ||
| 1708 | if (by_command) { | ||
| 1709 | /* Keys are deleted by a command (trimslots), we need to notify the | ||
| 1710 | * keyspace event. Though, we don't need to propagate the DEL | ||
| 1711 | * command, as the command (trimslots) will be propagated. */ | ||
| 1712 | notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id); | ||
| 1713 | } else { | ||
| 1714 | /* Propagate the DEL command */ | ||
| 1715 | propagateDeletion(&server.db[0], key, server.lazyfree_lazy_server_del); | ||
| 1716 | /* The keys are not actually logically deleted from the database, | ||
| 1717 | * just moved to another node. The modules needs to know that these | ||
| 1718 | * keys are no longer available locally, so just send the keyspace | ||
| 1719 | * notification to the modules, but not to clients. */ | ||
| 1720 | moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id); | ||
| 1721 | } | ||
| 1722 | exitExecutionUnit(); | ||
| 1723 | postExecutionUnitOperations(); | ||
| 1724 | decrRefCount(key); | ||
| 1725 | j++; | ||
| 1726 | server.dirty++; | ||
| 1727 | } | ||
| 1728 | kvstoreResetDictIterator(&kvs_di); | ||
| 1729 | return j; | ||
| 1730 | } | ||
| 1731 | |||
| 1732 | /* Delete the keys in the slot ranges. Returns the number of deleted items */ | ||
| 1733 | unsigned int clusterDelKeysInSlotRangeArray(slotRangeArray *slots, int by_command) { | ||
| 1734 | unsigned int j = 0; | ||
| 1735 | for (int i = 0; i < slots->num_ranges; i++) { | ||
| 1736 | for (int slot = slots->ranges[i].start; slot <= slots->ranges[i].end; slot++) { | ||
| 1737 | j += clusterDelKeysInSlot(slot, by_command); | ||
| 1738 | } | ||
| 1739 | } | ||
| 1740 | return j; | ||
| 1741 | } | ||
| 1742 | |||
| 1743 | int clusterIsMySlot(int slot) { | ||
| 1744 | return getMyClusterNode() == getNodeBySlot(slot); | ||
| 1745 | } | ||
| 1746 | |||
| 1747 | void replySlotsFlushAndFree(client *c, slotRangeArray *slots) { | ||
| 1748 | addReplyArrayLen(c, slots->num_ranges); | ||
| 1749 | for (int i = 0 ; i < slots->num_ranges ; i++) { | ||
| 1750 | addReplyArrayLen(c, 2); | ||
| 1751 | addReplyLongLong(c, slots->ranges[i].start); | ||
| 1752 | addReplyLongLong(c, slots->ranges[i].end); | ||
| 1753 | } | ||
| 1754 | slotRangeArrayFree(slots); | ||
| 1755 | } | ||
| 1756 | |||
| 1757 | /* Normalizes (sorts and merges adjacent ranges), checks that slot ranges are | ||
| 1758 | * well-formed and non-overlapping. */ | ||
| 1759 | int slotRangeArrayNormalizeAndValidate(slotRangeArray *slots, sds *err) { | ||
| 1760 | unsigned char used_slots[CLUSTER_SLOTS] = {0}; | ||
| 1761 | |||
| 1762 | if (slots->num_ranges <= 0 || slots->num_ranges >= CLUSTER_SLOTS) { | ||
| 1763 | *err = sdscatprintf(sdsempty(), "invalid number of slot ranges: %d", slots->num_ranges); | ||
| 1764 | return C_ERR; | ||
| 1765 | } | ||
| 1766 | |||
| 1767 | /* Sort and merge adjacent slot ranges. */ | ||
| 1768 | slotRangeArraySortAndMerge(slots); | ||
| 1769 | |||
| 1770 | for (int i = 0; i < slots->num_ranges; i++) { | ||
| 1771 | if (slots->ranges[i].start >= CLUSTER_SLOTS || | ||
| 1772 | slots->ranges[i].end >= CLUSTER_SLOTS) | ||
| 1773 | { | ||
| 1774 | *err = sdscatprintf(sdsempty(), "slot range is out of range: %d-%d", | ||
| 1775 | slots->ranges[i].start, slots->ranges[i].end); | ||
| 1776 | return C_ERR; | ||
| 1777 | } | ||
| 1778 | |||
| 1779 | if (slots->ranges[i].start > slots->ranges[i].end) { | ||
| 1780 | *err = sdscatprintf(sdsempty(), "start slot number %d is greater than end slot number %d", | ||
| 1781 | slots->ranges[i].start, slots->ranges[i].end); | ||
| 1782 | return C_ERR; | ||
| 1783 | } | ||
| 1784 | |||
| 1785 | for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) { | ||
| 1786 | if (used_slots[j]) { | ||
| 1787 | *err = sdscatprintf(sdsempty(), "Slot %d specified multiple times", j); | ||
| 1788 | return C_ERR; | ||
| 1789 | } | ||
| 1790 | used_slots[j]++; | ||
| 1791 | } | ||
| 1792 | } | ||
| 1793 | return C_OK; | ||
| 1794 | } | ||
| 1795 | |||
| 1796 | /* Create a slot range array with the specified number of ranges. */ | ||
| 1797 | slotRangeArray *slotRangeArrayCreate(int num_ranges) { | ||
| 1798 | slotRangeArray *slots = zcalloc(sizeof(slotRangeArray) + num_ranges * sizeof(slotRange)); | ||
| 1799 | slots->num_ranges = num_ranges; | ||
| 1800 | return slots; | ||
| 1801 | } | ||
| 1802 | |||
| 1803 | /* Duplicate the slot range array. */ | ||
| 1804 | slotRangeArray *slotRangeArrayDup(slotRangeArray *slots) { | ||
| 1805 | slotRangeArray *dup = slotRangeArrayCreate(slots->num_ranges); | ||
| 1806 | memcpy(dup->ranges, slots->ranges, sizeof(slotRange) * slots->num_ranges); | ||
| 1807 | return dup; | ||
| 1808 | } | ||
| 1809 | |||
| 1810 | /* Set the slot range at the specified index. */ | ||
| 1811 | void slotRangeArraySet(slotRangeArray *slots, int idx, int start, int end) { | ||
| 1812 | slots->ranges[idx].start = start; | ||
| 1813 | slots->ranges[idx].end = end; | ||
| 1814 | } | ||
| 1815 | |||
| 1816 | /* Create a slot range string in the format of: "1000-2000 3000-4000 ..." */ | ||
| 1817 | sds slotRangeArrayToString(slotRangeArray *slots) { | ||
| 1818 | sds s = sdsempty(); | ||
| 1819 | if (slots == NULL || slots->num_ranges == 0) return s; | ||
| 1820 | |||
| 1821 | for (int i = 0; i < slots->num_ranges; i++) { | ||
| 1822 | slotRange *sr = &slots->ranges[i]; | ||
| 1823 | s = sdscatprintf(s, "%d-%d ", sr->start, sr->end); | ||
| 1824 | } | ||
| 1825 | sdssetlen(s, sdslen(s) - 1); | ||
| 1826 | s[sdslen(s)] = '\0'; | ||
| 1827 | |||
| 1828 | return s; | ||
| 1829 | } | ||
| 1830 | |||
| 1831 | /* Parse a slot range string in the format "1000-2000 3000-4000 ..." into a slotRangeArray. | ||
| 1832 | * Returns a new slotRangeArray on success, NULL on failure. */ | ||
| 1833 | slotRangeArray *slotRangeArrayFromString(sds data) { | ||
| 1834 | int num_ranges; | ||
| 1835 | long long start, end; | ||
| 1836 | slotRangeArray *slots = NULL; | ||
| 1837 | if (!data || sdslen(data) == 0) return NULL; | ||
| 1838 | |||
| 1839 | sds *parts = sdssplitlen(data, sdslen(data), " ", 1, &num_ranges); | ||
| 1840 | if (num_ranges <= 0) goto err; | ||
| 1841 | |||
| 1842 | slots = slotRangeArrayCreate(num_ranges); | ||
| 1843 | |||
| 1844 | /* Parse each slot range */ | ||
| 1845 | for (int i = 0; i < num_ranges; i++) { | ||
| 1846 | char *dash = strchr(parts[i], '-'); | ||
| 1847 | if (!dash) goto err; | ||
| 1848 | |||
| 1849 | if (string2ll(parts[i], dash - parts[i], &start) == 0 || | ||
| 1850 | string2ll(dash + 1, sdslen(parts[i]) - (dash - parts[i]) - 1, &end) == 0) | ||
| 1851 | goto err; | ||
| 1852 | slotRangeArraySet(slots, i, start, end); | ||
| 1853 | } | ||
| 1854 | |||
| 1855 | /* Validate all ranges */ | ||
| 1856 | sds err_msg = NULL; | ||
| 1857 | if (slotRangeArrayNormalizeAndValidate(slots, &err_msg) != C_OK) { | ||
| 1858 | if (err_msg) sdsfree(err_msg); | ||
| 1859 | goto err; | ||
| 1860 | } | ||
| 1861 | sdsfreesplitres(parts, num_ranges); | ||
| 1862 | return slots; | ||
| 1863 | |||
| 1864 | err: | ||
| 1865 | if (slots) slotRangeArrayFree(slots); | ||
| 1866 | sdsfreesplitres(parts, num_ranges); | ||
| 1867 | return NULL; | ||
| 1868 | } | ||
| 1869 | |||
| 1870 | static int compareSlotRange(const void *a, const void *b) { | ||
| 1871 | const slotRange *sa = a; | ||
| 1872 | const slotRange *sb = b; | ||
| 1873 | if (sa->start < sb->start) return -1; | ||
| 1874 | if (sa->start > sb->start) return 1; | ||
| 1875 | return 0; | ||
| 1876 | } | ||
| 1877 | |||
| 1878 | /* Sort slot ranges by start slot and merge adjacent ranges. | ||
| 1879 | * Adjacent means: prev.end + 1 == next.start. | ||
| 1880 | * e.g. 1000-2000 2001-3000 0-100 => 0-100 1000-3000 | ||
| 1881 | * | ||
| 1882 | * Note: Overlapping ranges are not merged.*/ | ||
| 1883 | void slotRangeArraySortAndMerge(slotRangeArray *slots) { | ||
| 1884 | if (!slots || slots->num_ranges <= 1) return; | ||
| 1885 | |||
| 1886 | qsort(slots->ranges, slots->num_ranges, sizeof(slotRange), compareSlotRange); | ||
| 1887 | |||
| 1888 | int idx = 0; | ||
| 1889 | for (int i = 1; i < slots->num_ranges; i++) { | ||
| 1890 | if (slots->ranges[idx].end + 1 == slots->ranges[i].start) | ||
| 1891 | slots->ranges[idx].end = slots->ranges[i].end; | ||
| 1892 | else | ||
| 1893 | slots->ranges[++idx] = slots->ranges[i]; | ||
| 1894 | } | ||
| 1895 | slots->num_ranges = idx + 1; | ||
| 1896 | } | ||
| 1897 | |||
| 1898 | /* Compare two slot range arrays, return 1 if equal, 0 otherwise */ | ||
| 1899 | int slotRangeArrayIsEqual(slotRangeArray *slots1, slotRangeArray *slots2) { | ||
| 1900 | slotRangeArraySortAndMerge(slots1); | ||
| 1901 | slotRangeArraySortAndMerge(slots2); | ||
| 1902 | |||
| 1903 | if (slots1->num_ranges != slots2->num_ranges) return 0; | ||
| 1904 | |||
| 1905 | for (int i = 0; i < slots1->num_ranges; i++) { | ||
| 1906 | if (slots1->ranges[i].start != slots2->ranges[i].start || | ||
| 1907 | slots1->ranges[i].end != slots2->ranges[i].end) { | ||
| 1908 | return 0; | ||
| 1909 | } | ||
| 1910 | } | ||
| 1911 | return 1; | ||
| 1912 | } | ||
| 1913 | |||
| 1914 | /* Add a slot to the slot range array. | ||
| 1915 | * Usage: | ||
| 1916 | * slotRangeArray *slots = NULL | ||
| 1917 | * slots = slotRangeArrayAppend(slots, 1000); | ||
| 1918 | * slots = slotRangeArrayAppend(slots, 1001); | ||
| 1919 | * slots = slotRangeArrayAppend(slots, 1003); | ||
| 1920 | * slots = slotRangeArrayAppend(slots, 1004); | ||
| 1921 | * slots = slotRangeArrayAppend(slots, 1005); | ||
| 1922 | * | ||
| 1923 | * Result: 1000-1001, 1003-1005 | ||
| 1924 | * Note: `slot` must be greater than the previous slot. | ||
| 1925 | * */ | ||
| 1926 | slotRangeArray *slotRangeArrayAppend(slotRangeArray *slots, int slot) { | ||
| 1927 | if (slots == NULL) { | ||
| 1928 | slots = slotRangeArrayCreate(4); | ||
| 1929 | slots->ranges[0].start = slot; | ||
| 1930 | slots->ranges[0].end = slot; | ||
| 1931 | slots->num_ranges = 1; | ||
| 1932 | return slots; | ||
| 1933 | } | ||
| 1934 | |||
| 1935 | serverAssert(slots->num_ranges >= 0 && slots->num_ranges <= CLUSTER_SLOTS); | ||
| 1936 | serverAssert(slot > slots->ranges[slots->num_ranges - 1].end); | ||
| 1937 | |||
| 1938 | /* Check if we can extend the last range */ | ||
| 1939 | slotRange *last = &slots->ranges[slots->num_ranges - 1]; | ||
| 1940 | if (slot == last->end + 1) { | ||
| 1941 | last->end = slot; | ||
| 1942 | return slots; | ||
| 1943 | } | ||
| 1944 | |||
| 1945 | /* Calculate current capacity and reallocate if needed */ | ||
| 1946 | int cap = (int) ((zmalloc_size(slots) - sizeof(slotRangeArray)) / sizeof(slotRange)); | ||
| 1947 | if (slots->num_ranges >= cap) | ||
| 1948 | slots = zrealloc(slots, sizeof(slotRangeArray) + sizeof(slotRange) * cap * 2); | ||
| 1949 | |||
| 1950 | /* Add new single-slot range */ | ||
| 1951 | slots->ranges[slots->num_ranges].start = slot; | ||
| 1952 | slots->ranges[slots->num_ranges].end = slot; | ||
| 1953 | slots->num_ranges++; | ||
| 1954 | |||
| 1955 | return slots; | ||
| 1956 | } | ||
| 1957 | |||
| 1958 | /* Returns 1 if the slot range array contains the given slot, 0 otherwise. */ | ||
| 1959 | int slotRangeArrayContains(slotRangeArray *slots, unsigned int slot) { | ||
| 1960 | for (int i = 0; i < slots->num_ranges; i++) | ||
| 1961 | if (slots->ranges[i].start <= slot && slots->ranges[i].end >= slot) | ||
| 1962 | return 1; | ||
| 1963 | return 0; | ||
| 1964 | } | ||
| 1965 | |||
| 1966 | /* Free the slot range array. */ | ||
| 1967 | void slotRangeArrayFree(slotRangeArray *slots) { | ||
| 1968 | zfree(slots); | ||
| 1969 | } | ||
| 1970 | |||
| 1971 | /* Generic version of slotRangeArrayFree(). */ | ||
| 1972 | void slotRangeArrayFreeGeneric(void *slots) { | ||
| 1973 | slotRangeArrayFree(slots); | ||
| 1974 | } | ||
| 1975 | |||
| 1976 | /* Slot range array iterator */ | ||
| 1977 | slotRangeArrayIter *slotRangeArrayGetIterator(slotRangeArray *slots) { | ||
| 1978 | slotRangeArrayIter *it = zmalloc(sizeof(*it)); | ||
| 1979 | it->slots = slots; | ||
| 1980 | it->range_index = 0; | ||
| 1981 | it->cur_slot = slots->num_ranges > 0 ? slots->ranges[0].start : -1; | ||
| 1982 | return it; | ||
| 1983 | } | ||
| 1984 | |||
| 1985 | /* Returns the next slot in the array, or -1 if there are no more slots. */ | ||
| 1986 | int slotRangeArrayNext(slotRangeArrayIter *it) { | ||
| 1987 | if (it->range_index >= it->slots->num_ranges) return -1; | ||
| 1988 | |||
| 1989 | if (it->cur_slot < it->slots->ranges[it->range_index].end) { | ||
| 1990 | it->cur_slot++; | ||
| 1991 | } else { | ||
| 1992 | it->range_index++; | ||
| 1993 | if (it->range_index < it->slots->num_ranges) | ||
| 1994 | it->cur_slot = it->slots->ranges[it->range_index].start; | ||
| 1995 | else | ||
| 1996 | it->cur_slot = -1; /* finished */ | ||
| 1997 | } | ||
| 1998 | return it->cur_slot; | ||
| 1999 | } | ||
| 2000 | |||
| 2001 | int slotRangeArrayGetCurrentSlot(slotRangeArrayIter *it) { | ||
| 2002 | return it->cur_slot; | ||
| 2003 | } | ||
| 2004 | |||
| 2005 | void slotRangeArrayIteratorFree(slotRangeArrayIter *it) { | ||
| 2006 | zfree(it); | ||
| 2007 | } | ||
| 2008 | |||
| 2009 | /* Parse slot range pairs from argv starting at `pos`. | ||
| 2010 | * `argc` is the argument count, `pos` is the first slot argument index. | ||
| 2011 | * Returns a slotRangeArray or NULL on error. */ | ||
| 2012 | slotRangeArray *parseSlotRangesOrReply(client *c, int argc, int pos) { | ||
| 2013 | int start, end, count; | ||
| 2014 | slotRangeArray *slots; | ||
| 2015 | |||
| 2016 | /* Ensure there is at least one (start,end) slot range pairs. */ | ||
| 2017 | if (argc < 0 || pos < 0 || pos >= argc || (argc - pos) < 2 || ((argc - pos) % 2) != 0) { | ||
| 2018 | addReplyErrorArity(c); | ||
| 2019 | return NULL; | ||
| 2020 | } | ||
| 2021 | |||
| 2022 | count = (argc - pos) / 2; | ||
| 2023 | slots = slotRangeArrayCreate(count); | ||
| 2024 | slots->num_ranges = 0; | ||
| 2025 | |||
| 2026 | for (int j = pos; j < argc; j += 2) { | ||
| 2027 | if ((start = getSlotOrReply(c, c->argv[j])) == -1 || | ||
| 2028 | (end = getSlotOrReply(c, c->argv[j + 1])) == -1) | ||
| 2029 | { | ||
| 2030 | slotRangeArrayFree(slots); | ||
| 2031 | return NULL; | ||
| 2032 | } | ||
| 2033 | slotRangeArraySet(slots, slots->num_ranges, start, end); | ||
| 2034 | slots->num_ranges++; | ||
| 2035 | } | ||
| 2036 | |||
| 2037 | sds err = NULL; | ||
| 2038 | if (slotRangeArrayNormalizeAndValidate(slots, &err) != C_OK) { | ||
| 2039 | addReplyErrorSds(c, err); | ||
| 2040 | slotRangeArrayFree(slots); | ||
| 2041 | return NULL; | ||
| 2042 | } | ||
| 2043 | return slots; | ||
| 2044 | } | ||
| 2045 | |||
| 2046 | /* Return 1 if the keys in the slot can be accessed, 0 otherwise. */ | ||
| 2047 | int clusterCanAccessKeysInSlot(int slot) { | ||
| 2048 | /* If not in cluster mode, all keys are accessible */ | ||
| 2049 | if (server.cluster_enabled == 0) return 1; | ||
| 2050 | |||
| 2051 | /* If the slot is being imported under old slot migration approach, we should | ||
| 2052 | * allow to list keys from the slot as previously. */ | ||
| 2053 | if (getImportingSlotSource(slot)) return 1; | ||
| 2054 | |||
| 2055 | /* If using atomic slot migration, check if the slot belongs to the current | ||
| 2056 | * node or its master, return 1 if so. */ | ||
| 2057 | clusterNode *myself = getMyClusterNode(); | ||
| 2058 | if (clusterNodeIsSlave(myself)) { | ||
| 2059 | clusterNode *master = clusterNodeGetMaster(myself); | ||
| 2060 | if (master && clusterNodeCoversSlot(master, slot)) | ||
| 2061 | return 1; | ||
| 2062 | } else { | ||
| 2063 | if (clusterNodeCoversSlot(myself, slot)) | ||
| 2064 | return 1; | ||
| 2065 | } | ||
| 2066 | return 0; | ||
| 2067 | } | ||
| 2068 | |||
| 2069 | /* Return the slot ranges that belong to the current node or its master. */ | ||
| 2070 | slotRangeArray *clusterGetLocalSlotRanges(void) { | ||
| 2071 | slotRangeArray *slots = NULL; | ||
| 2072 | |||
| 2073 | if (!server.cluster_enabled) { | ||
| 2074 | slots = slotRangeArrayCreate(1); | ||
| 2075 | slotRangeArraySet(slots, 0, 0, CLUSTER_SLOTS - 1); | ||
| 2076 | return slots; | ||
| 2077 | } | ||
| 2078 | |||
| 2079 | clusterNode *master = clusterNodeGetMaster(getMyClusterNode()); | ||
| 2080 | if (master) { | ||
| 2081 | for (int i = 0; i < CLUSTER_SLOTS; i++) { | ||
| 2082 | if (clusterNodeCoversSlot(master, i)) | ||
| 2083 | slots = slotRangeArrayAppend(slots, i); | ||
| 2084 | } | ||
| 2085 | } | ||
| 2086 | return slots ? slots : slotRangeArrayCreate(0); | ||
| 2087 | } | ||
| 2088 | |||
| 2089 | /* Partially flush destination DB in a cluster node, based on the slot range. | ||
| 2090 | * | ||
| 2091 | * Usage: SFLUSH <start-slot> <end slot> [<start-slot> <end slot>]* [SYNC|ASYNC] | ||
| 2092 | * | ||
| 2093 | * This is an initial implementation of SFLUSH (slots flush) which is limited to | ||
| 2094 | * flushing a single shard as a whole, but in the future the same command may be | ||
| 2095 | * used to partially flush a shard based on hash slots. Currently only if provided | ||
| 2096 | * slots cover entirely the slots of a node, the node will be flushed and the | ||
| 2097 | * return value will be pairs of slot ranges. Otherwise, a single empty set will | ||
| 2098 | * be returned. If possible, SFLUSH SYNC will be run as blocking ASYNC as an | ||
| 2099 | * optimization. | ||
| 2100 | */ | ||
| 2101 | void sflushCommand(client *c) { | ||
| 2102 | int flags = EMPTYDB_NO_FLAGS, argc = c->argc; | ||
| 2103 | |||
| 2104 | if (server.cluster_enabled == 0) { | ||
| 2105 | addReplyError(c,"This instance has cluster support disabled"); | ||
| 2106 | return; | ||
| 2107 | } | ||
| 2108 | |||
| 2109 | /* check if last argument is SYNC or ASYNC */ | ||
| 2110 | if (!strcasecmp(c->argv[c->argc-1]->ptr,"sync")) { | ||
| 2111 | flags = EMPTYDB_NO_FLAGS; | ||
| 2112 | argc--; | ||
| 2113 | } else if (!strcasecmp(c->argv[c->argc-1]->ptr,"async")) { | ||
| 2114 | flags = EMPTYDB_ASYNC; | ||
| 2115 | argc--; | ||
| 2116 | } else if (server.lazyfree_lazy_user_flush) { | ||
| 2117 | flags = EMPTYDB_ASYNC; | ||
| 2118 | } | ||
| 2119 | |||
| 2120 | /* parse the slot range */ | ||
| 2121 | if (argc % 2 == 0) { | ||
| 2122 | addReplyErrorArity(c); | ||
| 2123 | return; | ||
| 2124 | } | ||
| 2125 | |||
| 2126 | /* Parse slot ranges from the command arguments. */ | ||
| 2127 | slotRangeArray *slots = parseSlotRangesOrReply(c, argc, 1); | ||
| 2128 | if (!slots) return; | ||
| 2129 | |||
| 2130 | /* Iterate and find the slot ranges that belong to this node. Save them in | ||
| 2131 | * a new slotRangeArray. It is allocated on heap since there is a chance | ||
| 2132 | * that FLUSH SYNC will be running as blocking ASYNC and only later reply | ||
| 2133 | * with slot ranges */ | ||
| 2134 | unsigned char slots_to_flush[CLUSTER_SLOTS] = {0}; /* Requested slots to flush */ | ||
| 2135 | slotRangeArray *myslots = NULL; | ||
| 2136 | for (int i = 0; i < slots->num_ranges; i++) { | ||
| 2137 | for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) { | ||
| 2138 | if (clusterIsMySlot(j)) { | ||
| 2139 | myslots = slotRangeArrayAppend(myslots, j); | ||
| 2140 | slots_to_flush[j] = 1; | ||
| 2141 | } | ||
| 2142 | } | ||
| 2143 | } | ||
| 2144 | |||
| 2145 | /* Verify that all slots of mynode got covered. See sflushCommand() comment. */ | ||
| 2146 | int all_slots_covered = 1; | ||
| 2147 | for (int i = 0; i < CLUSTER_SLOTS; i++) { | ||
| 2148 | if (clusterIsMySlot(i) && !slots_to_flush[i]) { | ||
| 2149 | all_slots_covered = 0; | ||
| 2150 | break; | ||
| 2151 | } | ||
| 2152 | } | ||
| 2153 | if (myslots == NULL || !all_slots_covered) { | ||
| 2154 | addReplyArrayLen(c, 0); | ||
| 2155 | slotRangeArrayFree(slots); | ||
| 2156 | slotRangeArrayFree(myslots); | ||
| 2157 | return; | ||
| 2158 | } | ||
| 2159 | slotRangeArrayFree(slots); | ||
| 2160 | |||
| 2161 | /* Flush selected slots. If not flush as blocking async, then reply immediately */ | ||
| 2162 | if (flushCommandCommon(c, FLUSH_TYPE_SLOTS, flags, myslots) == 0) | ||
| 2163 | replySlotsFlushAndFree(c, myslots); | ||
| 2164 | } | ||
| 2165 | |||
| 2166 | /* The READWRITE command just clears the READONLY command state. */ | ||
| 2167 | void readwriteCommand(client *c) { | ||
| 2168 | if (server.cluster_enabled == 0) { | ||
| 2169 | addReplyError(c,"This instance has cluster support disabled"); | ||
| 2170 | return; | ||
| 2171 | } | ||
| 2172 | c->flags &= ~CLIENT_READONLY; | ||
| 2173 | addReply(c,shared.ok); | ||
| 2174 | } | ||
| 2175 | |||
| 2176 | /* Resets transient cluster stats that we expose via INFO or other means that we want | ||
| 2177 | * to reset via CONFIG RESETSTAT. The function is also used in order to | ||
| 2178 | * initialize these fields in clusterInit() at server startup. */ | ||
| 2179 | void resetClusterStats(void) { | ||
| 2180 | if (!server.cluster_enabled) return; | ||
| 2181 | |||
| 2182 | clusterSlotStatResetAll(); | ||
| 2183 | } | ||
| 2184 | |||
/* This function is called at server startup in order to initialize cluster data
 * structures that are shared between the different cluster implementations. */
void clusterCommonInit(void) {
    /* Zero the transient cluster statistics exposed via INFO. */
    resetClusterStats();
    /* NOTE(review): presumably initializes the atomic-slot-migration state
     * declared in cluster_asm.h — confirm against that header. */
    asmInit();
}
| 2191 | |||
| 2192 | /* This function is called after the node startup in order to check if there | ||
| 2193 | * are any slots that we have keys for, but are not assigned to us. If so, | ||
| 2194 | * we delete the keys. */ | ||
| 2195 | void clusterDeleteKeysInUnownedSlots(void) { | ||
| 2196 | if (clusterNodeIsSlave(getMyClusterNode())) return; | ||
| 2197 | |||
| 2198 | /* Check that all the slots we have keys for are assigned to us. Otherwise, | ||
| 2199 | * delete the keys. */ | ||
| 2200 | for (int i = 0; i < CLUSTER_SLOTS; i++) { | ||
| 2201 | /* Skip if: no keys in the slot, it's our slot, or we are importing it. */ | ||
| 2202 | if (!countKeysInSlot(i) || | ||
| 2203 | clusterIsMySlot(i) || | ||
| 2204 | getImportingSlotSource(i)) | ||
| 2205 | { | ||
| 2206 | continue; | ||
| 2207 | } | ||
| 2208 | |||
| 2209 | serverLog(LL_NOTICE, "I have keys for slot %d, but the slot is " | ||
| 2210 | "assigned to another node. " | ||
| 2211 | "Deleting keys in the slot.", i); | ||
| 2212 | /* With atomic slot migration, it is safe to drop keys from slots | ||
| 2213 | * that are not owned. This will not result in data loss under the | ||
| 2214 | * legacy slot migration approach either, since the importing state | ||
| 2215 | * has already been persisted in node.conf. */ | ||
| 2216 | clusterDelKeysInSlot(i, 0); | ||
| 2217 | } | ||
| 2218 | } | ||
| 2219 | |||
| 2220 | |||
| 2221 | /* This function is called after the node startup in order to verify that data | ||
| 2222 | * loaded from disk is in agreement with the cluster configuration: | ||
| 2223 | * | ||
| 2224 | * 1) If we find keys about hash slots we have no responsibility for, the | ||
| 2225 | * following happens: | ||
| 2226 | * A) If no other node is in charge according to the current cluster | ||
| 2227 | * configuration, we add these slots to our node. | ||
| 2228 | * B) If according to our config other nodes are already in charge for | ||
| 2229 | * this slots, we set the slots as IMPORTING from our point of view | ||
| 2230 | * in order to justify we have those slots, and in order to make | ||
| 2231 | * redis-cli aware of the issue, so that it can try to fix it. | ||
| 2232 | * 2) If we find data in a DB different than DB0 we return C_ERR to | ||
| 2233 | * signal the caller it should quit the server with an error message | ||
| 2234 | * or take other actions. | ||
| 2235 | * | ||
| 2236 | * The function always returns C_OK even if it will try to correct | ||
| 2237 | * the error described in "1". However if data is found in DB different | ||
| 2238 | * from DB0, C_ERR is returned. | ||
| 2239 | * | ||
| 2240 | * The function also uses the logging facility in order to warn the user | ||
| 2241 | * about desynchronizations between the data we have in memory and the | ||
| 2242 | * cluster configuration. */ | ||
| 2243 | int verifyClusterConfigWithData(void) { | ||
| 2244 | /* Return ASAP if a module disabled cluster redirections. In that case | ||
| 2245 | * every master can store keys about every possible hash slot. */ | ||
| 2246 | if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION) | ||
| 2247 | return C_OK; | ||
| 2248 | |||
| 2249 | /* If this node is a slave, don't perform the check at all as we | ||
| 2250 | * completely depend on the replication stream. */ | ||
| 2251 | if (clusterNodeIsSlave(getMyClusterNode())) return C_OK; | ||
| 2252 | |||
| 2253 | /* Make sure we only have keys in DB0. */ | ||
| 2254 | for (int i = 1; i < server.dbnum; i++) { | ||
| 2255 | if (kvstoreSize(server.db[i].keys)) return C_ERR; | ||
| 2256 | } | ||
| 2257 | |||
| 2258 | /* Take over slots that we have keys for, but are assigned to no one. */ | ||
| 2259 | clusterClaimUnassignedSlots(); | ||
| 2260 | /* Delete keys in unowned slots */ | ||
| 2261 | clusterDeleteKeysInUnownedSlots(); | ||
| 2262 | return C_OK; | ||
| 2263 | } | ||
