aboutsummaryrefslogtreecommitdiff
path: root/examples/redis-unstable/src/cluster.c
diff options
context:
space:
mode:
authorMitja Felicijan <mitja.felicijan@gmail.com>2026-01-21 22:52:54 +0100
committerMitja Felicijan <mitja.felicijan@gmail.com>2026-01-21 22:52:54 +0100
commitdcacc00e3750300617ba6e16eb346713f91a783a (patch)
tree38e2d4fb5ed9d119711d4295c6eda4b014af73fd /examples/redis-unstable/src/cluster.c
parent58dac10aeb8f5a041c46bddbeaf4c7966a99b998 (diff)
downloadcrep-dcacc00e3750300617ba6e16eb346713f91a783a.tar.gz
Remove testing data
Diffstat (limited to 'examples/redis-unstable/src/cluster.c')
-rw-r--r--examples/redis-unstable/src/cluster.c2263
1 file changed, 0 insertions(+), 2263 deletions(-)
diff --git a/examples/redis-unstable/src/cluster.c b/examples/redis-unstable/src/cluster.c
deleted file mode 100644
index d07c31c..0000000
--- a/examples/redis-unstable/src/cluster.c
+++ /dev/null
@@ -1,2263 +0,0 @@
1/*
2 * Copyright (c) 2009-Present, Redis Ltd.
3 * All rights reserved.
4 *
5 * Copyright (c) 2024-present, Valkey contributors.
6 * All rights reserved.
7 *
8 * Licensed under your choice of (a) the Redis Source Available License 2.0
9 * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
10 * GNU Affero General Public License v3 (AGPLv3).
11 *
12 * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
13 */
14
15/*
16 * cluster.c contains the common parts of a clustering
17 * implementation, the parts that are shared between
18 * any implementation of Redis clustering.
19 */
20
21#include "server.h"
22#include "cluster.h"
23#include "cluster_asm.h"
24#include "cluster_slot_stats.h"
25
26#include <ctype.h>
27
28/* -----------------------------------------------------------------------------
29 * Key space handling
30 * -------------------------------------------------------------------------- */
31
/* If it can be inferred that the given glob-style pattern, as implemented in
 * stringmatchlen() in util.c, only can match keys belonging to a single slot,
 * that slot is returned. Otherwise -1 is returned. */
int patternHashSlot(char *pattern, int length) {
    int brace = -1; /* Index of first '{'; -2 once an empty '{}' tag is seen. */

    for (int idx = 0; idx < length; idx++) {
        char ch = pattern[idx];

        /* Wildcards and character classes can match keys in any slot. */
        if (ch == '*' || ch == '?' || ch == '[') return -1;

        /* Resolving escapes would need a temporary buffer; not implemented. */
        if (ch == '\\') return -1;

        if (brace == -1 && ch == '{') {
            /* Remember the first opening brace. */
            brace = idx;
        } else if (brace >= 0 && ch == '}') {
            if (idx == brace + 1) {
                /* Empty '{}' tag: the whole key is hashed, ignore braces. */
                brace = -2;
            } else {
                /* Non-empty '{...}' tag: hash only the tag content. */
                return crc16(pattern + brace + 1, idx - brace - 1) & 0x3FFF;
            }
        }
    }

    /* No wildcards at all: the pattern matches exactly one key, hash it. */
    return crc16(pattern, length) & 0x3FFF;
}
61
62int getSlotOrReply(client *c, robj *o) {
63 long long slot;
64
65 if (getLongLongFromObject(o,&slot) != C_OK ||
66 slot < 0 || slot >= CLUSTER_SLOTS)
67 {
68 addReplyError(c,"Invalid or out of range slot");
69 return -1;
70 }
71 return (int) slot;
72}
73
74ConnectionType *connTypeOfCluster(void) {
75 if (server.tls_cluster) {
76 return connectionTypeTls();
77 }
78
79 return connectionTypeTcp();
80}
81
82/* -----------------------------------------------------------------------------
83 * DUMP, RESTORE and MIGRATE commands
84 * -------------------------------------------------------------------------- */
85
/* Generates a DUMP-format representation of the object 'o', adding it to the
 * io stream pointed by 'rio'. This function can't fail.
 *
 * The caller takes ownership of the resulting payload->io.buffer.ptr sds
 * buffer and must free it (or hand it to a reply function that does). */
void createDumpPayload(rio *payload, robj *o, robj *key, int dbid, int skip_checksum) {
    unsigned char buf[2];
    uint64_t crc = 0;

    /* Serialize the object in an RDB-like format. It consist of an object type
     * byte followed by the serialized object. This is understood by RESTORE. */
    rioInitWithBuffer(payload,sdsempty());

    /* Save key metadata when the object carries metadata bits. TTL is NOT
     * serialized here: RESTORE receives it as a separate command argument. */
    if (getModuleMetaBits(o->metabits))
        serverAssert(rdbSaveKeyMetadata(payload, key, o, dbid) != -1);
    serverAssert(rdbSaveObjectType(payload,o));
    serverAssert(rdbSaveObject(payload,o,key,dbid));

    /* Write the footer, this is how it looks like:
     * ----------------+---------------------+---------------+
     * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
     * ----------------+---------------------+---------------+
     * RDB version and CRC are both in little endian.
     */

    /* RDB version, little endian. */
    buf[0] = RDB_VERSION & 0xff;
    buf[1] = (RDB_VERSION >> 8) & 0xff;
    payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,buf,2);

    /* If crc checksum is disabled, crc is left as 0 and no checksum validation
     * will be performed on RESTORE (verifyDumpPayload() treats 0 as "none"). */
    if (!skip_checksum) {
        /* CRC64 over everything written so far (payload + version bytes). */
        crc = crc64(0,(unsigned char*)payload->io.buffer.ptr,
                    sdslen(payload->io.buffer.ptr));
        memrev64ifbe(&crc);
    }
    payload->io.buffer.ptr = sdscatlen(payload->io.buffer.ptr,&crc,8);
}
124
125/* Verify that the RDB version of the dump payload matches the one of this Redis
126 * instance and that the checksum is ok.
127 * If the DUMP payload looks valid C_OK is returned, otherwise C_ERR
128 * is returned. If rdbver_ptr is not NULL, its populated with the value read
129 * from the input buffer. */
130int verifyDumpPayload(unsigned char *p, size_t len, uint16_t *rdbver_ptr) {
131 unsigned char *footer;
132 uint16_t rdbver;
133 uint64_t crc;
134
135 /* At least 2 bytes of RDB version and 8 of CRC64 should be present. */
136 if (len < 10) return C_ERR;
137 footer = p+(len-10);
138
139 /* Set and verify RDB version. */
140 rdbver = (footer[1] << 8) | footer[0];
141 if (rdbver_ptr) {
142 *rdbver_ptr = rdbver;
143 }
144 if (rdbver > RDB_VERSION) return C_ERR;
145
146 if (server.skip_checksum_validation)
147 return C_OK;
148
149 uint64_t crc_payload;
150 memcpy(&crc_payload, footer+2, 8);
151 if (crc_payload == 0) /* No checksum. */
152 return C_OK;
153
154 /* Verify CRC64 */
155 crc = crc64(0,p,len-8);
156 memrev64ifbe(&crc);
157 return crc == crc_payload ? C_OK : C_ERR;
158}
159
160/* DUMP keyname
161 * DUMP is actually not used by Redis Cluster but it is the obvious
162 * complement of RESTORE and can be useful for different applications. */
163void dumpCommand(client *c) {
164 kvobj *o;
165 rio payload;
166
167 /* Check if the key is here. */
168 if ((o = lookupKeyRead(c->db,c->argv[1])) == NULL) {
169 addReplyNull(c);
170 return;
171 }
172
173 /* Create the DUMP encoded representation. */
174 createDumpPayload(&payload,o,c->argv[1],c->db->id,0);
175
176 /* Transfer to the client */
177 addReplyBulkSds(c,payload.io.buffer.ptr);
178 return;
179}
180
/* RESTORE key ttl serialized-value [REPLACE] [ABSTTL] [IDLETIME seconds] [FREQ frequency]
 *
 * Recreates a key from its DUMP-format payload. IDLETIME and FREQ are
 * mutually exclusive (they target LRU vs LFU eviction policies). */
void restoreCommand(client *c) {
    long long ttl, lfu_freq = -1, lru_idle = -1, lru_clock = -1;
    rio payload;
    int j, type, replace = 0, absttl = 0;
    robj *obj;

    /* Parse additional options */
    for (j = 4; j < c->argc; j++) {
        int additional = c->argc-j-1;
        if (!strcasecmp(c->argv[j]->ptr,"replace")) {
            replace = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"absttl")) {
            absttl = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"idletime") && additional >= 1 &&
                   lfu_freq == -1) /* Reject IDLETIME after FREQ. */
        {
            if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lru_idle,NULL)
                    != C_OK) return;
            if (lru_idle < 0) {
                addReplyError(c,"Invalid IDLETIME value, must be >= 0");
                return;
            }
            lru_clock = LRU_CLOCK();
            j++; /* Consume additional arg. */
        } else if (!strcasecmp(c->argv[j]->ptr,"freq") && additional >= 1 &&
                   lru_idle == -1) /* Reject FREQ after IDLETIME. */
        {
            if (getLongLongFromObjectOrReply(c,c->argv[j+1],&lfu_freq,NULL)
                    != C_OK) return;
            if (lfu_freq < 0 || lfu_freq > 255) {
                addReplyError(c,"Invalid FREQ value, must be >= 0 and <= 255");
                return;
            }
            j++; /* Consume additional arg. */
        } else {
            addReplyErrorObject(c,shared.syntaxerr);
            return;
        }
    }

    /* Make sure this key does not already exist here... */
    robj *key = c->argv[1];
    kvobj *oldval = lookupKeyWrite(c->db,key);
    int oldtype = oldval ? oldval->type : -1; /* For type_changed event below. */
    if (!replace && oldval) {
        addReplyErrorObject(c,shared.busykeyerr);
        return;
    }

    /* Check if the TTL value makes sense */
    if (getLongLongFromObjectOrReply(c,c->argv[2],&ttl,NULL) != C_OK) {
        return;
    } else if (ttl < 0) {
        addReplyError(c,"Invalid TTL value, must be >= 0");
        return;
    }

    /* Verify RDB version and data checksum. */
    if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr),NULL) == C_ERR)
    {
        addReplyError(c,"DUMP payload version or checksum are wrong");
        return;
    }

    rioInitWithBuffer(&payload,c->argv[3]->ptr);

    /* Initialize metadata spec to collect metadata+expiry from payload. */
    KeyMetaSpec keymeta;
    keyMetaSpecInit(&keymeta);

    /* Compute TTL early so we can add it to metadata spec in correct order.
     * A relative TTL is converted to an absolute unix timestamp here. */
    if (ttl) {
        if (!absttl) ttl+=commandTimeSnapshot();
        keyMetaSpecAdd(&keymeta, KEY_META_ID_EXPIRE, ttl);
    }

    /* With metadata, type = RDB_OPCODE_KEY_META. Layout: [<META>,]<TYPE>,<KEY>,<VALUE> */
    type = rdbLoadType(&payload);
    if (rdbResolveKeyType(&payload, &type, c->db->id, &keymeta) == -1) {
        addReplyError(c,"Bad data format");
        return;
    }

    /* Load the object */
    if ((obj = rdbLoadObject(type,&payload,key->ptr,c->db->id,NULL)) == NULL)
    {
        keyMetaSpecCleanup(&keymeta);
        addReplyError(c,"Bad data format");
        return;
    }

    /* Remove the old key if needed. */
    int deleted = 0;
    if (replace)
        deleted = dbDelete(c->db,key);

    /* If the computed absolute TTL is already in the past, don't add the key
     * at all: just account for the deletion (propagating a DEL/UNLINK) and
     * reply OK, exactly as if the key had been restored and then expired. */
    if (ttl && checkAlreadyExpired(ttl)) {
        if (deleted) {
            robj *aux = server.lazyfree_lazy_server_del ? shared.unlink : shared.del;
            rewriteClientCommandVector(c, 2, aux, key);
            keyModified(c,c->db,key,NULL,1);
            notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
            server.dirty++;
        }
        keyMetaSpecCleanup(&keymeta);
        decrRefCount(obj);
        addReply(c, shared.ok);
        return;
    }

    /* Create the key and set the TTL if any */
    kvobj *kv = dbAddInternal(c->db, key, &obj, NULL, &keymeta);

    /* If minExpiredField was set, then the object is hash with expiration
     * on fields and need to register it in global HFE DS */
    if (kv->type == OBJ_HASH) {
        uint64_t minExpiredField = hashTypeGetMinExpire(kv, 1);
        if (minExpiredField != EB_EXPIRE_TIME_INVALID)
            estoreAdd(c->db->subexpires, getKeySlot(key->ptr), kv, minExpiredField);
    }

    if (ttl) {
        if (!absttl) {
            /* Propagate TTL as absolute timestamp, so replicas/AOF do not
             * re-apply the relative offset at a later time. */
            robj *ttl_obj = createStringObjectFromLongLong(ttl);
            rewriteClientCommandArgument(c,2,ttl_obj);
            decrRefCount(ttl_obj);
            rewriteClientCommandArgument(c,c->argc,shared.absttl);
        }
    }
    objectSetLRUOrLFU(kv, lfu_freq, lru_idle, lru_clock, 1000);
    keyModified(c,c->db,key,NULL,1);
    notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id);

    /* If we deleted a key that means REPLACE parameter was passed and the
     * destination key existed. */
    if (deleted) {
        notifyKeyspaceEvent(NOTIFY_OVERWRITTEN, "overwritten", key, c->db->id);
        if (oldtype != kv->type) {
            notifyKeyspaceEvent(NOTIFY_TYPE_CHANGED, "type_changed", key, c->db->id);
        }
    }
    addReply(c,shared.ok);
    server.dirty++;
}
/* MIGRATE socket cache implementation.
 *
 * We keep a map between "host:port" strings and TCP sockets that we used
 * to connect to that instance in recent time.
 * These sockets are closed when the max number we cache is reached, and also
 * in serverCron() when they are around for more than a few seconds. */
#define MIGRATE_SOCKET_CACHE_ITEMS 64 /* max num of items in the cache. */
#define MIGRATE_SOCKET_CACHE_TTL 10 /* close cached sockets after 10 sec. */

/* One cached connection, stored as the value of
 * server.migrate_cached_sockets keyed by "host:port". */
typedef struct migrateCachedSocket {
    connection *conn;     /* The established connection. */
    long last_dbid;       /* Last SELECTed db; -1 forces a SELECT next use. */
    time_t last_use_time; /* Unix time of last use, for TTL-based eviction. */
} migrateCachedSocket;
341
/* Return a migrateCachedSocket containing a TCP socket connected with the
 * target instance, possibly returning a cached one.
 *
 * This function is responsible of sending errors to the client if a
 * connection can't be established. In this case NULL is returned.
 * Otherwise on success the socket is returned, and the caller should not
 * attempt to free it after usage.
 *
 * If the caller detects an error while using the socket, migrateCloseSocket()
 * should be called so that the connection will be created from scratch
 * the next time. */
migrateCachedSocket* migrateGetSocket(client *c, robj *host, robj *port, long timeout) {
    connection *conn;
    sds name = sdsempty();
    migrateCachedSocket *cs;

    /* Check if we have an already cached socket for this ip:port pair,
     * using a "host:port" string as the cache key. */
    name = sdscatlen(name,host->ptr,sdslen(host->ptr));
    name = sdscatlen(name,":",1);
    name = sdscatlen(name,port->ptr,sdslen(port->ptr));
    cs = dictFetchValue(server.migrate_cached_sockets,name);
    if (cs) {
        sdsfree(name);
        cs->last_use_time = server.unixtime; /* Refresh the TTL. */
        return cs;
    }

    /* No cached socket, create one. */
    if (dictSize(server.migrate_cached_sockets) == MIGRATE_SOCKET_CACHE_ITEMS) {
        /* Too many items, drop one at random. */
        dictEntry *de = dictGetRandomKey(server.migrate_cached_sockets);
        cs = dictGetVal(de);
        connClose(cs->conn);
        zfree(cs);
        dictDelete(server.migrate_cached_sockets,dictGetKey(de));
    }

    /* Create the connection (TLS or TCP depending on tls-cluster). */
    conn = connCreate(server.el, connTypeOfCluster());
    if (connBlockingConnect(conn, host->ptr, atoi(port->ptr), timeout)
            != C_OK) {
        addReplyError(c,"-IOERR error or timeout connecting to the client");
        connClose(conn);
        sdsfree(name);
        return NULL;
    }
    connEnableTcpNoDelay(conn);

    /* Add to the cache and return it to the caller. Note that 'name'
     * ownership is transferred to the dict on success. */
    cs = zmalloc(sizeof(*cs));
    cs->conn = conn;

    /* -1 forces a SELECT the first time the socket is used. */
    cs->last_dbid = -1;
    cs->last_use_time = server.unixtime;
    dictAdd(server.migrate_cached_sockets,name,cs);
    return cs;
}
399
400/* Free a migrate cached connection. */
401void migrateCloseSocket(robj *host, robj *port) {
402 sds name = sdsempty();
403 migrateCachedSocket *cs;
404
405 name = sdscatlen(name,host->ptr,sdslen(host->ptr));
406 name = sdscatlen(name,":",1);
407 name = sdscatlen(name,port->ptr,sdslen(port->ptr));
408 cs = dictFetchValue(server.migrate_cached_sockets,name);
409 if (!cs) {
410 sdsfree(name);
411 return;
412 }
413
414 connClose(cs->conn);
415 zfree(cs);
416 dictDelete(server.migrate_cached_sockets,name);
417 sdsfree(name);
418}
419
420void migrateCloseTimedoutSockets(void) {
421 dictIterator di;
422 dictEntry *de;
423
424 dictInitSafeIterator(&di, server.migrate_cached_sockets);
425 while((de = dictNext(&di)) != NULL) {
426 migrateCachedSocket *cs = dictGetVal(de);
427
428 if ((server.unixtime - cs->last_use_time) > MIGRATE_SOCKET_CACHE_TTL) {
429 connClose(cs->conn);
430 zfree(cs);
431 dictDelete(server.migrate_cached_sockets,dictGetKey(de));
432 }
433 }
434 dictResetIterator(&di);
435}
436
/* MIGRATE host port key dbid timeout [COPY | REPLACE | AUTH password |
 * AUTH2 username password]
 *
 * In the multiple keys form:
 *
 * MIGRATE host port "" dbid timeout [COPY | REPLACE | AUTH password |
 * AUTH2 username password] KEYS key1 key2 ... keyN
 *
 * The command serializes the selected keys into RESTORE (or RESTORE-ASKING)
 * commands, sends them to the target instance over a cached blocking
 * connection, and on success deletes the local copies (unless COPY). */
void migrateCommand(client *c) {
    migrateCachedSocket *cs;
    int copy = 0, replace = 0, j;
    char *username = NULL;
    char *password = NULL;
    long timeout;
    long dbid;
    robj **kvArray = NULL; /* Objects to migrate. */
    robj **keyArray = NULL; /* Key names. */
    robj **newargv = NULL; /* Used to rewrite the command as DEL ... keys ... */
    rio cmd, payload;
    int may_retry = 1;      /* At most one retry on socket errors. */
    int write_error = 0;    /* Distinguishes write vs read IOERR message. */
    int argv_rewritten = 0; /* Set once argv is replaced with DEL. */

    /* To support the KEYS option we need the following additional state. */
    int first_key = 3; /* Argument index of the first key. */
    int num_keys = 1; /* By default only migrate the 'key' argument. */

    /* Parse additional options */
    for (j = 6; j < c->argc; j++) {
        int moreargs = (c->argc-1) - j;
        if (!strcasecmp(c->argv[j]->ptr,"copy")) {
            copy = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"replace")) {
            replace = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"auth")) {
            if (!moreargs) {
                addReplyErrorObject(c,shared.syntaxerr);
                return;
            }
            j++;
            password = c->argv[j]->ptr;
            /* Hide credentials from MONITOR / slowlog. */
            redactClientCommandArgument(c,j);
        } else if (!strcasecmp(c->argv[j]->ptr,"auth2")) {
            if (moreargs < 2) {
                addReplyErrorObject(c,shared.syntaxerr);
                return;
            }
            username = c->argv[++j]->ptr;
            redactClientCommandArgument(c,j);
            password = c->argv[++j]->ptr;
            redactClientCommandArgument(c,j);
        } else if (!strcasecmp(c->argv[j]->ptr,"keys")) {
            if (sdslen(c->argv[3]->ptr) != 0) {
                addReplyError(c,
                    "When using MIGRATE KEYS option, the key argument"
                    " must be set to the empty string");
                return;
            }
            first_key = j+1;
            num_keys = c->argc - j - 1;
            break; /* All the remaining args are keys. */
        } else {
            addReplyErrorObject(c,shared.syntaxerr);
            return;
        }
    }

    /* Sanity check */
    if (getLongFromObjectOrReply(c,c->argv[5],&timeout,NULL) != C_OK ||
        getLongFromObjectOrReply(c,c->argv[4],&dbid,NULL) != C_OK)
    {
        return;
    }
    if (timeout <= 0) timeout = 1000;

    /* Check if the keys are here. If at least one key is to migrate, do it
     * otherwise if all the keys are missing reply with "NOKEY" to signal
     * the caller there was nothing to migrate. We don't return an error in
     * this case, since often this is due to a normal condition like the key
     * expiring in the meantime. */
    kvArray = zrealloc(kvArray,sizeof(kvobj*)*num_keys);
    keyArray = zrealloc(keyArray,sizeof(robj*)*num_keys);
    int num_exists = 0;

    /* Compact existing keys to the front of both arrays. */
    for (j = 0; j < num_keys; j++) {
        if ((kvArray[num_exists] = lookupKeyRead(c->db,c->argv[first_key+j])) != NULL) {
            keyArray[num_exists] = c->argv[first_key+j];
            num_exists++;
        }
    }
    num_keys = num_exists;
    if (num_keys == 0) {
        zfree(kvArray); zfree(keyArray);
        addReplySds(c,sdsnew("+NOKEY\r\n"));
        return;
    }

try_again:
    write_error = 0;

    /* Connect */
    cs = migrateGetSocket(c,c->argv[1],c->argv[2],timeout);
    if (cs == NULL) {
        zfree(kvArray); zfree(keyArray);
        return; /* error sent to the client by migrateGetSocket() */
    }

    rioInitWithBuffer(&cmd,sdsempty());

    /* Authentication */
    if (password) {
        int arity = username ? 3 : 2;
        serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',arity));
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"AUTH",4));
        if (username) {
            serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,username,
                                 sdslen(username)));
        }
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,password,
            sdslen(password)));
    }

    /* Send the SELECT command if the current DB is not already selected. */
    int select = cs->last_dbid != dbid; /* Should we emit SELECT? */
    if (select) {
        serverAssertWithInfo(c,NULL,rioWriteBulkCount(&cmd,'*',2));
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"SELECT",6));
        serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,dbid));
    }

    int non_expired = 0; /* Number of keys that we'll find non expired.
                            Note that serializing large keys may take some time
                            so certain keys that were found non expired by the
                            lookupKey() function, may be expired later. */

    /* Create RESTORE payload and generate the protocol to call the command. */
    for (j = 0; j < num_keys; j++) {
        long long ttl = 0;
        long long expireat = kvobjGetExpire(kvArray[j]);

        if (expireat != -1) {
            ttl = expireat-commandTimeSnapshot();
            if (ttl < 0) {
                continue; /* Key expired since the lookup above: skip it. */
            }
            if (ttl < 1) ttl = 1; /* RESTORE treats 0 as "no TTL". */
        }

        /* Relocate valid (non expired) keys and values into the array in successive
         * positions to remove holes created by the keys that were present
         * in the first lookup but are now expired after the second lookup. */
        kvArray[non_expired] = kvArray[j];
        keyArray[non_expired++] = keyArray[j];

        serverAssertWithInfo(c,NULL,
            rioWriteBulkCount(&cmd,'*',replace ? 5 : 4));

        /* In cluster mode use RESTORE-ASKING so the importing node accepts
         * the write for a slot it does not own yet. */
        if (server.cluster_enabled)
            serverAssertWithInfo(c,NULL,
                rioWriteBulkString(&cmd,"RESTORE-ASKING",14));
        else
            serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"RESTORE",7));
        serverAssertWithInfo(c,NULL,sdsEncodedObject(keyArray[j]));
        serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,keyArray[j]->ptr,
                sdslen(keyArray[j]->ptr)));
        serverAssertWithInfo(c,NULL,rioWriteBulkLongLong(&cmd,ttl));

        /* Emit the payload argument, that is the serialized object using
         * the DUMP format. */
        createDumpPayload(&payload,kvArray[j],keyArray[j],dbid,0);
        serverAssertWithInfo(c,NULL,
            rioWriteBulkString(&cmd,payload.io.buffer.ptr,
                               sdslen(payload.io.buffer.ptr)));
        sdsfree(payload.io.buffer.ptr);

        /* Add the REPLACE option to the RESTORE command if it was specified
         * as a MIGRATE option. */
        if (replace)
            serverAssertWithInfo(c,NULL,rioWriteBulkString(&cmd,"REPLACE",7));
    }

    /* Fix the actual number of keys we are migrating. */
    num_keys = non_expired;

    /* Transfer the query to the other node in 64K chunks. */
    errno = 0;
    {
        sds buf = cmd.io.buffer.ptr;
        size_t pos = 0, towrite;
        int nwritten = 0;

        while ((towrite = sdslen(buf)-pos) > 0) {
            towrite = (towrite > (64*1024) ? (64*1024) : towrite);
            nwritten = connSyncWrite(cs->conn,buf+pos,towrite,timeout);
            if (nwritten != (signed)towrite) {
                write_error = 1;
                goto socket_err;
            }
            pos += nwritten;
        }
    }

    char buf0[1024]; /* Auth reply. */
    char buf1[1024]; /* Select reply. */
    char buf2[1024]; /* Restore reply. */

    /* Read the AUTH reply if needed. */
    if (password && connSyncReadLine(cs->conn, buf0, sizeof(buf0), timeout) <= 0)
        goto socket_err;

    /* Read the SELECT reply if needed. */
    if (select && connSyncReadLine(cs->conn, buf1, sizeof(buf1), timeout) <= 0)
        goto socket_err;

    /* Read the RESTORE replies. */
    int error_from_target = 0;
    int socket_error = 0;
    int del_idx = 1; /* Index of the key argument for the replicated DEL op. */

    /* Allocate the new argument vector that will replace the current command,
     * to propagate the MIGRATE as a DEL command (if no COPY option was given).
     * We allocate num_keys+1 because the additional argument is for "DEL"
     * command name itself. */
    if (!copy) newargv = zmalloc(sizeof(robj*)*(num_keys+1));

    for (j = 0; j < num_keys; j++) {
        if (connSyncReadLine(cs->conn, buf2, sizeof(buf2), timeout) <= 0) {
            socket_error = 1;
            break;
        }
        if ((password && buf0[0] == '-') ||
            (select && buf1[0] == '-') ||
            buf2[0] == '-')
        {
            /* On error assume that last_dbid is no longer valid. */
            if (!error_from_target) {
                cs->last_dbid = -1;
                char *errbuf;
                if (password && buf0[0] == '-') errbuf = buf0;
                else if (select && buf1[0] == '-') errbuf = buf1;
                else errbuf = buf2;

                error_from_target = 1;
                addReplyErrorFormat(c,"Target instance replied with error: %s",
                    errbuf+1);
            }
        } else {
            if (!copy) {
                /* No COPY option: remove the local key, signal the change. */
                dbDelete(c->db,keyArray[j]);
                keyModified(c,c->db,keyArray[j],NULL,1);
                notifyKeyspaceEvent(NOTIFY_GENERIC,"del",keyArray[j],c->db->id);
                server.dirty++;

                /* Populate the argument vector to replace the old one. */
                newargv[del_idx++] = keyArray[j];
                incrRefCount(keyArray[j]);
            }
        }
    }

    /* On socket error, if we want to retry, do it now before rewriting the
     * command vector. We only retry if we are sure nothing was processed
     * and we failed to read the first reply (j == 0 test). */
    if (!error_from_target && socket_error && j == 0 && may_retry &&
        errno != ETIMEDOUT)
    {
        goto socket_err; /* A retry is guaranteed because of tested conditions.*/
    }

    /* On socket errors, close the migration socket now that we still have
     * the original host/port in the ARGV. Later the command may be rewritten
     * as a DEL, after which those arguments are no longer available. */
    if (socket_error) migrateCloseSocket(c->argv[1],c->argv[2]);

    if (!copy) {
        /* Translate MIGRATE as DEL for replication/AOF. Note that we do
         * this only for the keys for which we received an acknowledgement
         * from the receiving Redis server, by using the del_idx index. */
        if (del_idx > 1) {
            newargv[0] = createStringObject("DEL",3);
            /* Note that the following call takes ownership of newargv. */
            replaceClientCommandVector(c,del_idx,newargv);
            argv_rewritten = 1;
        } else {
            /* No key transfer acknowledged, no need to rewrite as DEL. */
            zfree(newargv);
        }
        newargv = NULL; /* Make it safe to call zfree() on it in the future. */
    }

    /* If we are here and a socket error happened, we don't want to retry.
     * Just signal the problem to the client, but only do it if we did not
     * already queue a different error reported by the destination server. */
    if (!error_from_target && socket_error) {
        may_retry = 0;
        goto socket_err;
    }

    if (!error_from_target) {
        /* Success! Update the last_dbid in migrateCachedSocket, so that we can
         * avoid SELECT the next time if the target DB is the same. Reply +OK.
         *
         * Note: If we reached this point, even if socket_error is true
         * still the SELECT command succeeded (otherwise the code jumps to
         * socket_err label. */
        cs->last_dbid = dbid;
        addReply(c,shared.ok);
    } else {
        /* On error we already sent it in the for loop above, and set
         * the currently selected socket to -1 to force SELECT the next time. */
    }

    sdsfree(cmd.io.buffer.ptr);
    zfree(kvArray); zfree(keyArray); zfree(newargv);
    return;

/* On socket errors we try to close the cached socket and try again.
 * It is very common for the cached socket to get closed, if just reopening
 * it works it's a shame to notify the error to the caller. */
socket_err:
    /* Cleanup we want to perform in both the retry and no retry case.
     * Note: Closing the migrate socket will also force SELECT next time. */
    sdsfree(cmd.io.buffer.ptr);

    /* If the command was rewritten as DEL and there was a socket error,
     * we already closed the socket earlier. While migrateCloseSocket()
     * is idempotent, the host/port arguments are now gone, so don't do it
     * again. */
    if (!argv_rewritten) migrateCloseSocket(c->argv[1],c->argv[2]);
    zfree(newargv);
    newargv = NULL; /* This will get reallocated on retry. */

    /* Retry only if it's not a timeout and we never attempted a retry
     * (or the code jumping here did not set may_retry to zero). */
    if (errno != ETIMEDOUT && may_retry) {
        may_retry = 0;
        goto try_again;
    }

    /* Cleanup we want to do if no retry is attempted. */
    zfree(kvArray); zfree(keyArray);
    addReplyErrorSds(c, sdscatprintf(sdsempty(),
                        "-IOERR error or timeout %s to target instance",
                        write_error ? "writing" : "reading"));
    return;
}
783
784/* Cluster node sanity check. Returns C_OK if the node id
785 * is valid an C_ERR otherwise. */
786int verifyClusterNodeId(const char *name, int length) {
787 if (length != CLUSTER_NAMELEN) return C_ERR;
788 for (int i = 0; i < length; i++) {
789 if (name[i] >= 'a' && name[i] <= 'z') continue;
790 if (name[i] >= '0' && name[i] <= '9') continue;
791 return C_ERR;
792 }
793 return C_OK;
794}
795
/* Return non-zero if 'c' is acceptable inside a cluster aux field value:
 * any alphanumeric character, or any character not in the small set of
 * explicitly forbidden punctuation below. */
int isValidAuxChar(int c) {
    static const char forbidden[] = "!#$%&()*+:;<>?@[]^{|}~";
    if (isalnum(c)) return 1;
    return strchr(forbidden, c) == NULL;
}
799
/* Return 1 if all 'length' bytes at 's' are valid aux characters
 * (see isValidAuxChar()), 0 otherwise. */
int isValidAuxString(char *s, unsigned int length) {
    for (unsigned i = 0; i < length; i++) {
        /* Cast to unsigned char first: passing a negative plain-char value
         * into the <ctype.h> classifier used by isValidAuxChar() is
         * undefined behavior for bytes >= 0x80 (CERT STR37-C). */
        if (!isValidAuxChar((unsigned char)s[i])) return 0;
    }
    return 1;
}
806
807void clusterCommandMyId(client *c) {
808 char *name = clusterNodeGetName(getMyClusterNode());
809 if (name) {
810 addReplyBulkCBuffer(c,name, CLUSTER_NAMELEN);
811 } else {
812 addReplyError(c, "No ID yet");
813 }
814}
815
/* Return this node's cluster ID (its node name buffer), or NULL if no
 * name has been assigned yet. The returned buffer is not owned by the
 * caller. */
char* getMyClusterId(void) {
    return clusterNodeGetName(getMyClusterNode());
}
819
820void clusterCommandMyShardId(client *c) {
821 char *sid = clusterNodeGetShardId(getMyClusterNode());
822 if (sid) {
823 addReplyBulkCBuffer(c,sid, CLUSTER_NAMELEN);
824 } else {
825 addReplyError(c, "No shard ID yet");
826 }
827}
828
829/* When a cluster command is called, we need to decide whether to return TLS info or
830 * non-TLS info by the client's connection type. However if the command is called by
831 * a Lua script or RM_call, there is no connection in the fake client, so we use
832 * server.current_client here to get the real client if available. And if it is not
833 * available (modules may call commands without a real client), we return the default
834 * info, which is determined by server.tls_cluster. */
835static int shouldReturnTlsInfo(void) {
836 if (server.current_client && server.current_client->conn) {
837 return connIsTLS(server.current_client->conn);
838 } else {
839 return server.tls_cluster;
840 }
841}
842
/* Return the number of keys stored in the per-slot dictionary of the
 * given hash slot. NOTE(review): reads server.db (the first DB) only —
 * presumably cluster mode restricts keys to db 0; confirm against callers. */
unsigned int countKeysInSlot(unsigned int slot) {
    return kvstoreDictSize(server.db->keys, slot);
}
846
/* Add detailed information of a node to the output buffer of the given client.
 *
 * Emits a RESP map whose length is deferred: reply_count must be bumped
 * once per key/value pair added, and is patched in at the end via
 * setDeferredMapLen(). */
void addNodeDetailsToShardReply(client *c, clusterNode *node) {

    int reply_count = 0;
    char *hostname;
    void *node_replylen = addReplyDeferredLen(c);

    addReplyBulkCString(c, "id");
    addReplyBulkCBuffer(c, clusterNodeGetName(node), CLUSTER_NAMELEN);
    reply_count++;

    /* "port" and "tls-port" are each emitted only when non-zero. */
    if (clusterNodeTcpPort(node)) {
        addReplyBulkCString(c, "port");
        addReplyLongLong(c, clusterNodeTcpPort(node));
        reply_count++;
    }

    if (clusterNodeTlsPort(node)) {
        addReplyBulkCString(c, "tls-port");
        addReplyLongLong(c, clusterNodeTlsPort(node));
        reply_count++;
    }

    addReplyBulkCString(c, "ip");
    addReplyBulkCString(c, clusterNodeIp(node));
    reply_count++;

    addReplyBulkCString(c, "endpoint");
    addReplyBulkCString(c, clusterNodePreferredEndpoint(node));
    reply_count++;

    /* "hostname" is emitted only when configured (non-empty). */
    hostname = clusterNodeHostname(node);
    if (hostname != NULL && *hostname != '\0') {
        addReplyBulkCString(c, "hostname");
        addReplyBulkCString(c, hostname);
        reply_count++;
    }

    /* For ourselves we can report a fresher offset than the gossiped one. */
    long long node_offset;
    if (clusterNodeIsMyself(node)) {
        node_offset = clusterNodeIsSlave(node) ? replicationGetSlaveOffset() : server.master_repl_offset;
    } else {
        node_offset = clusterNodeReplOffset(node);
    }

    addReplyBulkCString(c, "role");
    addReplyBulkCString(c, clusterNodeIsSlave(node) ? "replica" : "master");
    reply_count++;

    addReplyBulkCString(c, "replication-offset");
    addReplyLongLong(c, node_offset);
    reply_count++;

    /* Health: "fail" dominates; a replica with a zero offset is considered
     * still "loading"; everything else is "online". */
    addReplyBulkCString(c, "health");
    const char *health_msg = NULL;
    if (clusterNodeIsFailing(node)) {
        health_msg = "fail";
    } else if (clusterNodeIsSlave(node) && node_offset == 0) {
        health_msg = "loading";
    } else {
        health_msg = "online";
    }
    addReplyBulkCString(c, health_msg);
    reply_count++;

    setDeferredMapLen(c, node_replylen, reply_count);
}
914
915static clusterNode *clusterGetMasterFromShard(void *shard_handle) {
916 clusterNode *n = NULL;
917 void *node_it = clusterShardHandleGetNodeIterator(shard_handle);
918 while((n = clusterShardNodeIteratorNext(node_it)) != NULL) {
919 if (!clusterNodeIsFailing(n)) {
920 break;
921 }
922 }
923 clusterShardNodeIteratorFree(node_it);
924 if (!n) return NULL;
925 return clusterNodeGetMaster(n);
926}
927
928/* Add the shard reply of a single shard based off the given primary node. */
929void addShardReplyForClusterShards(client *c, void *shard_handle) {
930 serverAssert(clusterGetShardNodeCount(shard_handle) > 0);
931 addReplyMapLen(c, 2);
932 addReplyBulkCString(c, "slots");
933
934 /* Use slot_info_pairs from the primary only */
935 clusterNode *master_node = clusterGetMasterFromShard(shard_handle);
936
937 if (master_node && clusterNodeHasSlotInfo(master_node)) {
938 serverAssert((clusterNodeSlotInfoCount(master_node) % 2) == 0);
939 addReplyArrayLen(c, clusterNodeSlotInfoCount(master_node));
940 for (int i = 0; i < clusterNodeSlotInfoCount(master_node); i++)
941 addReplyLongLong(c, (unsigned long)clusterNodeSlotInfoEntry(master_node, i));
942 } else {
943 /* If no slot info pair is provided, the node owns no slots */
944 addReplyArrayLen(c, 0);
945 }
946
947 addReplyBulkCString(c, "nodes");
948 addReplyArrayLen(c, clusterGetShardNodeCount(shard_handle));
949 void *node_it = clusterShardHandleGetNodeIterator(shard_handle);
950 for (clusterNode *n = clusterShardNodeIteratorNext(node_it); n != NULL; n = clusterShardNodeIteratorNext(node_it)) {
951 addNodeDetailsToShardReply(c, n);
952 clusterFreeNodesSlotsInfo(n);
953 }
954 clusterShardNodeIteratorFree(node_it);
955}
956
957/* Add to the output buffer of the given client, an array of slot (start, end)
958 * pair owned by the shard, also the primary and set of replica(s) along with
959 * information about each node. */
960void clusterCommandShards(client *c) {
961 addReplyArrayLen(c, clusterGetShardCount());
962 /* This call will add slot_info_pairs to all nodes */
963 clusterGenNodesSlotsInfo(0);
964 dictIterator *shard_it = clusterGetShardIterator();
965 for(void *shard_handle = clusterNextShardHandle(shard_it); shard_handle != NULL; shard_handle = clusterNextShardHandle(shard_it)) {
966 addShardReplyForClusterShards(c, shard_handle);
967 }
968 clusterFreeShardIterator(shard_it);
969}
970
/* CLUSTER HELP: reply with the generic subcommand table below, extended
 * with implementation-specific entries from clusterCommandExtendedHelp(). */
void clusterCommandHelp(client *c) {
    const char *help[] = {
        "COUNTKEYSINSLOT <slot>",
        "    Return the number of keys in <slot>.",
        "GETKEYSINSLOT <slot> <count>",
        "    Return key names stored by current node in a slot.",
        "INFO",
        "    Return information about the cluster.",
        "KEYSLOT <key>",
        "    Return the hash slot for <key>.",
        "MYID",
        "    Return the node id.",
        "MYSHARDID",
        "    Return the node's shard id.",
        "NODES",
        "    Return cluster configuration seen by node. Output format:",
        "    <id> <ip:port@bus-port[,hostname]> <flags> <master> <pings> <pongs> <epoch> <link> <slot> ...",
        "REPLICAS <node-id>",
        "    Return <node-id> replicas.",
        "SLOTS",
        "    Return information about slots range mappings. Each range is made of:",
        "    start, end, master and replicas IP addresses, ports and ids",
        "SLOT-STATS",
        "    Return an array of slot usage statistics for slots assigned to the current node.",
        "SHARDS",
        "    Return information about slot range mappings and the nodes associated with them.",
        NULL
    };

    addExtendedReplyHelp(c, help, clusterCommandExtendedHelp());
}
1002
/* CLUSTER command dispatcher. Handles the subcommands common to every
 * clustering implementation; anything not matched here is forwarded to
 * clusterCommandSpecial() before raising a syntax error. */
void clusterCommand(client *c) {
    if (server.cluster_enabled == 0) {
        addReplyError(c,"This instance has cluster support disabled");
        return;
    }

    if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
        clusterCommandHelp(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"nodes") && c->argc == 2) {
        /* CLUSTER NODES */
        /* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
        sds nodes = clusterGenNodesDescription(c, 0, shouldReturnTlsInfo());
        addReplyVerbatim(c,nodes,sdslen(nodes),"txt");
        sdsfree(nodes);
    } else if (!strcasecmp(c->argv[1]->ptr,"myid") && c->argc == 2) {
        /* CLUSTER MYID */
        clusterCommandMyId(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"myshardid") && c->argc == 2) {
        /* CLUSTER MYSHARDID */
        clusterCommandMyShardId(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"slots") && c->argc == 2) {
        /* CLUSTER SLOTS */
        clusterCommandSlots(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"shards") && c->argc == 2) {
        /* CLUSTER SHARDS */
        clusterCommandShards(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) {
        /* CLUSTER INFO */

        sds info = genClusterInfoString();

        /* Produce the reply protocol. */
        addReplyVerbatim(c,info,sdslen(info),"txt");
        sdsfree(info);
    } else if (!strcasecmp(c->argv[1]->ptr,"keyslot") && c->argc == 3) {
        /* CLUSTER KEYSLOT <key> */
        sds key = c->argv[2]->ptr;

        addReplyLongLong(c,keyHashSlot(key,sdslen(key)));
    } else if (!strcasecmp(c->argv[1]->ptr,"countkeysinslot") && c->argc == 3) {
        /* CLUSTER COUNTKEYSINSLOT <slot> */
        long long slot;

        if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
            return;
        if (slot < 0 || slot >= CLUSTER_SLOTS) {
            addReplyError(c,"Invalid slot");
            return;
        }

        /* Slots this node cannot access report zero keys. */
        if (!clusterCanAccessKeysInSlot(slot)) {
            addReplyLongLong(c, 0);
            return;
        }
        addReplyLongLong(c,countKeysInSlot(slot));
    } else if (!strcasecmp(c->argv[1]->ptr,"getkeysinslot") && c->argc == 4) {
        /* CLUSTER GETKEYSINSLOT <slot> <count> */
        long long maxkeys, slot;

        if (getLongLongFromObjectOrReply(c,c->argv[2],&slot,NULL) != C_OK)
            return;
        if (getLongLongFromObjectOrReply(c,c->argv[3],&maxkeys,NULL)
            != C_OK)
            return;
        if (slot < 0 || slot >= CLUSTER_SLOTS || maxkeys < 0) {
            addReplyError(c,"Invalid slot or number of keys");
            return;
        }

        if (!clusterCanAccessKeysInSlot(slot)) {
            addReplyArrayLen(c, 0);
            return;
        }

        /* Reply with at most 'maxkeys' key names from the slot's dict. */
        unsigned int keys_in_slot = countKeysInSlot(slot);
        unsigned int numkeys = maxkeys > keys_in_slot ? keys_in_slot : maxkeys;
        addReplyArrayLen(c,numkeys);
        kvstoreDictIterator kvs_di;
        dictEntry *de = NULL;
        kvstoreInitDictIterator(&kvs_di, server.db->keys, slot);
        for (unsigned int i = 0; i < numkeys; i++) {
            de = kvstoreDictIteratorNext(&kvs_di);
            serverAssert(de != NULL);
            sds sdskey = kvobjGetKey(dictGetKV(de));
            addReplyBulkCBuffer(c, sdskey, sdslen(sdskey));
        }
        kvstoreResetDictIterator(&kvs_di);
    } else if ((!strcasecmp(c->argv[1]->ptr,"slaves") ||
                !strcasecmp(c->argv[1]->ptr,"replicas")) && c->argc == 3) {
        /* CLUSTER SLAVES <NODE ID> */
        /* CLUSTER REPLICAS <NODE ID> */
        clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
        int j;

        /* Lookup the specified node in our table. */
        if (!n) {
            addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
            return;
        }

        if (clusterNodeIsSlave(n)) {
            addReplyError(c,"The specified node is not a master");
            return;
        }

        /* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
        addReplyArrayLen(c, clusterNodeNumSlaves(n));
        for (j = 0; j < clusterNodeNumSlaves(n); j++) {
            sds ni = clusterGenNodeDescription(c, clusterNodeGetSlave(n, j), shouldReturnTlsInfo());
            addReplyBulkCString(c,ni);
            sdsfree(ni);
        }
    } else if (!strcasecmp(c->argv[1]->ptr, "migration")) {
        clusterMigrationCommand(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"syncslots") && c->argc >= 3) {
        clusterSyncSlotsCommand(c);
    } else if(!clusterCommandSpecial(c)) {
        /* Unknown subcommand or wrong arity not claimed by the
         * implementation-specific handler. */
        addReplySubcommandSyntaxError(c);
        return;
    }
}
1124
1125/* Extract slot number from keys in a keys_result structure and return to caller.
1126 * Returns:
1127 * - The slot number if all keys belong to the same slot
1128 * - INVALID_CLUSTER_SLOT if there are no keys or cluster is disabled
1129 * - CLUSTER_CROSSSLOT if keys belong to different slots (cross-slot error) */
1130int extractSlotFromKeysResult(robj **argv, getKeysResult *keys_result) {
1131 if (keys_result->numkeys == 0 || !server.cluster_enabled)
1132 return INVALID_CLUSTER_SLOT;
1133
1134 int first_slot = INVALID_CLUSTER_SLOT;
1135 for (int j = 0; j < keys_result->numkeys; j++) {
1136 robj *this_key = argv[keys_result->keys[j].pos];
1137 int this_slot = (int)keyHashSlot((char*)this_key->ptr, sdslen(this_key->ptr));
1138
1139 if (first_slot == INVALID_CLUSTER_SLOT)
1140 first_slot = this_slot;
1141 else if (first_slot != this_slot) {
1142 return CLUSTER_CROSSSLOT;
1143 }
1144 }
1145 return first_slot;
1146}
1147
1148/* Return the pointer to the cluster node that is able to serve the command.
1149 * For the function to succeed the command should only target either:
1150 *
1151 * 1) A single key (even multiple times like RPOPLPUSH mylist mylist).
1152 * 2) Multiple keys in the same hash slot, while the slot is stable (no
1153 * resharding in progress).
1154 *
1155 * On success the function returns the node that is able to serve the request.
1156 * If the node is not 'myself' a redirection must be performed. The kind of
1157 * redirection is specified setting the integer passed by reference
1158 * 'error_code', which will be set to CLUSTER_REDIR_ASK or
1159 * CLUSTER_REDIR_MOVED.
1160 *
1161 * When the node is 'myself' 'error_code' is set to CLUSTER_REDIR_NONE.
1162 *
1163 * If the command fails NULL is returned, and the reason of the failure is
1164 * provided via 'error_code', which will be set to:
1165 *
1166 * CLUSTER_REDIR_CROSS_SLOT if the request contains multiple keys that
1167 * don't belong to the same hash slot.
1168 *
1169 * CLUSTER_REDIR_UNSTABLE if the request contains multiple keys
1170 * belonging to the same slot, but the slot is not stable (in migration or
1171 * importing state, likely because a resharding is in progress).
1172 *
1173 * CLUSTER_REDIR_DOWN_UNBOUND if the request addresses a slot which is
1174 * not bound to any node. In this case the cluster global state should be
1175 * already "down" but it is fragile to rely on the update of the global state,
1176 * so we also handle it here.
1177 *
1178 * CLUSTER_REDIR_DOWN_STATE and CLUSTER_REDIR_DOWN_RO_STATE if the cluster is
1179 * down but the user attempts to execute a command that addresses one or more keys. */
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot,
                            getKeysResult *keys_result, uint8_t read_error, uint64_t cmd_flags, int *error_code)
{
    clusterNode *myself = getMyClusterNode();
    clusterNode *n = NULL;
    robj *firstkey = NULL;
    int multiple_keys = 0;
    multiState *ms, _ms;
    pendingCommand mc;
    pendingCommand *mcp = &mc;
    int i, slot = 0, migrating_slot = 0, importing_slot = 0, missing_keys = 0,
        existing_keys = 0;
    int pubsubshard_included = 0; /* Flag to indicate if a pubsub shard cmd is included. */

    /* Allow any key to be set if a module disabled cluster redirections. */
    if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
        return myself;

    /* Set error code optimistically for the base case. */
    if (error_code) *error_code = CLUSTER_REDIR_NONE;

    /* Modules can turn off Redis Cluster redirection: this is useful
     * when writing a module that implements a completely different
     * distributed system. */

    /* We handle all the cases as if they were EXEC commands, so we have
     * a common code path for everything */
    if (cmd->proc == execCommand) {
        /* If CLIENT_MULTI flag is not set EXEC is just going to return an
         * error. */
        if (!(c->flags & CLIENT_MULTI)) return myself;
        ms = &c->mstate;
    } else {
        /* In order to have a single codepath create a fake Multi State
         * structure if the client is not in MULTI/EXEC state, this way
         * we have a single codepath below. */
        ms = &_ms;
        _ms.commands = &mcp;
        _ms.count = 1;

        /* Properly initialize the fake pendingCommand */
        initPendingCommand(&mc);
        mc.argv = argv;
        mc.argc = argc;
        mc.cmd = cmd;
        mc.slot = hashslot ? *hashslot : INVALID_CLUSTER_SLOT;
        mc.read_error = read_error;
        if (keys_result) {
            mc.keys_result = *keys_result;
            mc.flags |= PENDING_CMD_KEYS_RESULT_VALID;
        }
    }

    /* Check that all the keys are in the same hash slot, and obtain this
     * slot and the node associated. */
    for (i = 0; i < ms->count; i++) {
        struct redisCommand *mcmd;
        robj **margv;
        int margc, j;
        keyReference *keyindex;

        pendingCommand *pcmd = ms->commands[i];

        mcmd = pcmd->cmd;
        margc = pcmd->argc;
        margv = pcmd->argv;

        /* Only valid for sharded pubsub as regular pubsub can operate on any node and bypasses this layer. */
        if (!pubsubshard_included &&
            doesCommandHaveChannelsWithFlags(mcmd, CMD_CHANNEL_PUBLISH | CMD_CHANNEL_SUBSCRIBE))
        {
            pubsubshard_included = 1;
        }

        /* If we have a cached keys result from preprocessCommand(), use it.
         * Otherwise, extract keys result. */
        int use_cache_keys_result = pcmd->flags & PENDING_CMD_KEYS_RESULT_VALID;
        getKeysResult result = GETKEYS_RESULT_INIT;
        if (use_cache_keys_result)
            result = pcmd->keys_result;
        else
            getKeysFromCommand(mcmd,margv,margc,&result);
        keyindex = result.keys;

        for (j = 0; j < result.numkeys; j++) {
            /* The command has keys and was checked for cross-slot between its keys in preprocessCommand() */
            if (pcmd->read_error == CLIENT_READ_CROSS_SLOT) {
                /* Error: multiple keys from different slots. */
                if (error_code)
                    *error_code = CLUSTER_REDIR_CROSS_SLOT;
                return NULL;
            }

            /* Use the precomputed slot when available, otherwise hash
             * the key here. */
            robj *thiskey = margv[keyindex[j].pos];
            int thisslot = pcmd->slot;
            if (thisslot == INVALID_CLUSTER_SLOT)
                thisslot = keyHashSlot((char*)thiskey->ptr, sdslen(thiskey->ptr));

            if (firstkey == NULL) {
                /* This is the first key we see. Check what is the slot
                 * and node. */
                firstkey = thiskey;
                slot = thisslot;
                n = getNodeBySlot(slot);

                /* Error: If a slot is not served, we are in "cluster down"
                 * state. However the state is yet to be updated, so this was
                 * not trapped earlier in processCommand(). Report the same
                 * error to the client. */
                if (n == NULL) {
                    if (!use_cache_keys_result) getKeysFreeResult(&result);
                    if (error_code)
                        *error_code = CLUSTER_REDIR_DOWN_UNBOUND;
                    return NULL;
                }

                /* If we are migrating or importing this slot, we need to check
                 * if we have all the keys in the request (the only way we
                 * can safely serve the request, otherwise we return a TRYAGAIN
                 * error). To do so we set the importing/migrating state and
                 * increment a counter for every missing key. */
                if (n == myself &&
                    getMigratingSlotDest(slot) != NULL)
                {
                    migrating_slot = 1;
                } else if (getImportingSlotSource(slot) != NULL) {
                    importing_slot = 1;
                }
            } else {
                /* If it is not the first key/channel, make sure it is exactly
                 * the same key/channel as the first we saw. */
                if (slot != thisslot) {
                    /* Error: multiple keys from different slots. */
                    if (!use_cache_keys_result) getKeysFreeResult(&result);
                    if (error_code)
                        *error_code = CLUSTER_REDIR_CROSS_SLOT;
                    return NULL;
                }
                if (importing_slot && !multiple_keys && !equalStringObjects(firstkey,thiskey)) {
                    /* Flag this request as one with multiple different
                     * keys/channels when the slot is in importing state. */
                    multiple_keys = 1;
                }
            }

            /* Migrating / Importing slot? Count keys we don't have.
             * If it is pubsubshard command, it isn't required to check
             * the channel being present or not in the node during the
             * slot migration, the channel will be served from the source
             * node until the migration completes with CLUSTER SETSLOT <slot>
             * NODE <node-id>. */
            int flags = LOOKUP_NOTOUCH | LOOKUP_NOSTATS | LOOKUP_NONOTIFY | LOOKUP_NOEXPIRE;
            if ((migrating_slot || importing_slot) && !pubsubshard_included)
            {
                /* Key existence is checked on DB 0 only. */
                if (lookupKeyReadWithFlags(&server.db[0], thiskey, flags) == NULL) missing_keys++;
                else existing_keys++;
            }
        }
        if (!use_cache_keys_result) getKeysFreeResult(&result);
    }

    /* No key at all in command? then we can serve the request
     * without redirections or errors in all the cases. */
    if (n == NULL) return myself;

    /* Cluster is globally down but we got keys? We only serve the request
     * if it is a read command and when allow_reads_when_down is enabled. */
    if (!isClusterHealthy()) {
        if (pubsubshard_included) {
            if (!server.cluster_allow_pubsubshard_when_down) {
                if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
                return NULL;
            }
        } else if (!server.cluster_allow_reads_when_down) {
            /* The cluster is configured to block commands when the
             * cluster is down. */
            if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE;
            return NULL;
        } else if (cmd_flags & CMD_WRITE) {
            /* The cluster is configured to allow read only commands */
            if (error_code) *error_code = CLUSTER_REDIR_DOWN_RO_STATE;
            return NULL;
        } else {
            /* Fall through and allow the command to be executed:
             * this happens when server.cluster_allow_reads_when_down is
             * true and the command is not a write command */
        }
    }

    /* Return the hashslot by reference. */
    if (hashslot) *hashslot = slot;

    /* MIGRATE always works in the context of the local node if the slot
     * is open (migrating or importing state). We need to be able to freely
     * move keys among instances in this case. */
    if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
        return myself;

    /* If we don't have all the keys and we are migrating the slot, send
     * an ASK redirection or TRYAGAIN. */
    if (migrating_slot && missing_keys) {
        /* If we have keys but we don't have all keys, we return TRYAGAIN */
        if (existing_keys) {
            if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
            return NULL;
        } else {
            if (error_code) *error_code = CLUSTER_REDIR_ASK;
            return getMigratingSlotDest(slot);
        }
    }

    /* If we are receiving the slot, and the client correctly flagged the
     * request as "ASKING", we can serve the request. However if the request
     * involves multiple keys and we don't have them all, the only option is
     * to send a TRYAGAIN error. */
    if (importing_slot &&
        (c->flags & CLIENT_ASKING || cmd_flags & CMD_ASKING))
    {
        if (multiple_keys && missing_keys) {
            if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
            return NULL;
        } else {
            return myself;
        }
    }

    /* Handle the read-only client case reading from a slave: if this
     * node is a slave and the request is about a hash slot our master
     * is serving, we can reply without redirection. */
    int is_write_command = (cmd_flags & CMD_WRITE) ||
                           (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
    if (((c->flags & CLIENT_READONLY) || pubsubshard_included) &&
        !is_write_command &&
        clusterNodeIsSlave(myself) &&
        clusterNodeGetSlaveof(myself) == n)
    {
        return myself;
    }

    /* Base case: just return the right node. However, if this node is not
     * myself, set error_code to MOVED since we need to issue a redirection. */
    if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
    return n;
}
1424
1425/* Send the client the right redirection code, according to error_code
1426 * that should be set to one of CLUSTER_REDIR_* macros.
1427 *
1428 * If CLUSTER_REDIR_ASK or CLUSTER_REDIR_MOVED error codes
1429 * are used, then the node 'n' should not be NULL, but should be the
1430 * node we want to mention in the redirection. Moreover hashslot should
1431 * be set to the hash slot that caused the redirection. */
1432void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
1433 if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
1434 addReplyError(c,"-CROSSSLOT Keys in request don't hash to the same slot");
1435 } else if (error_code == CLUSTER_REDIR_UNSTABLE) {
1436 /* The request spawns multiple keys in the same slot,
1437 * but the slot is not "stable" currently as there is
1438 * a migration or import in progress. */
1439 addReplyError(c,"-TRYAGAIN Multiple keys request during rehashing of slot");
1440 } else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
1441 addReplyError(c,"-CLUSTERDOWN The cluster is down");
1442 } else if (error_code == CLUSTER_REDIR_DOWN_RO_STATE) {
1443 addReplyError(c,"-CLUSTERDOWN The cluster is down and only accepts read commands");
1444 } else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) {
1445 addReplyError(c,"-CLUSTERDOWN Hash slot not served");
1446 } else if (error_code == CLUSTER_REDIR_MOVED ||
1447 error_code == CLUSTER_REDIR_ASK)
1448 {
1449 /* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
1450 int port = clusterNodeClientPort(n, shouldReturnTlsInfo());
1451 addReplyErrorSds(c,sdscatprintf(sdsempty(),
1452 "-%s %d %s:%d",
1453 (error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
1454 hashslot, clusterNodePreferredEndpoint(n), port));
1455 } else {
1456 serverPanic("getNodeByQuery() unknown error.");
1457 }
1458}
1459
/* This function is called by the function processing clients incrementally
 * to detect timeouts, in order to handle the following case:
 *
 * 1) A client blocks with BLPOP or similar blocking operation.
 * 2) The master migrates the hash slot elsewhere or turns into a slave.
 * 3) The client may remain blocked forever (or up to the max timeout time)
 *    waiting for a key change that will never happen.
 *
 * If the client is found to be blocked into a hash slot this node no
 * longer handles, the client is sent a redirection error, and the function
 * returns 1. Otherwise 0 is returned and no operation is performed. */
int clusterRedirectBlockedClientIfNeeded(client *c) {
    clusterNode *myself = getMyClusterNode();
    if (c->flags & CLIENT_BLOCKED &&
        (c->bstate.btype == BLOCKED_LIST ||
         c->bstate.btype == BLOCKED_ZSET ||
         c->bstate.btype == BLOCKED_STREAM ||
         c->bstate.btype == BLOCKED_MODULE))
    {
        dictEntry *de;
        dictIterator di;

        /* If the cluster is down, unblock the client with the right error.
         * If the cluster is configured to allow reads on cluster down, we
         * still want to emit this error since a write will be required
         * to unblock them which may never come. */
        if (!isClusterHealthy()) {
            clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
            return 1;
        }

        /* If the client is blocked on module, but not on a specific key,
         * don't unblock it (except for the CLUSTER_FAIL case above). */
        if (c->bstate.btype == BLOCKED_MODULE && !moduleClientIsBlockedOnKeys(c))
            return 0;

        /* All keys must belong to the same slot, so check first key only. */
        dictInitIterator(&di, c->bstate.keys);
        if ((de = dictNext(&di)) != NULL) {
            robj *key = dictGetKey(de);
            int slot = keyHashSlot((char*)key->ptr, sdslen(key->ptr));
            clusterNode *node = getNodeBySlot(slot);

            /* if the client is read-only and attempting to access key that our
             * replica can handle, allow it. */
            if ((c->flags & CLIENT_READONLY) &&
                !(c->lastcmd->flags & CMD_WRITE) &&
                clusterNodeIsSlave(myself) && clusterNodeGetSlaveof(myself) == node)
            {
                node = myself;
            }

            /* We send an error and unblock the client if:
             * 1) The slot is unassigned, emitting a cluster down error.
             * 2) The slot is not handled by this node, nor being imported. */
            if (node != myself && getImportingSlotSource(slot) == NULL)
            {
                if (node == NULL) {
                    clusterRedirectClient(c,NULL,0,
                                          CLUSTER_REDIR_DOWN_UNBOUND);
                } else {
                    clusterRedirectClient(c,node,slot,
                                          CLUSTER_REDIR_MOVED);
                }
                /* Release the iterator before unblocking via return. */
                dictResetIterator(&di);
                return 1;
            }
        }
        dictResetIterator(&di);
    }
    return 0;
}
1532
1533/* Returns an indication if the replica node is fully available
1534 * and should be listed in CLUSTER SLOTS response.
1535 * Returns 1 for available nodes, 0 for nodes that have
1536 * not finished their initial sync, in failed state, or are
1537 * otherwise considered not available to serve read commands. */
1538static int isReplicaAvailable(clusterNode *node) {
1539 if (clusterNodeIsFailing(node)) {
1540 return 0;
1541 }
1542 long long repl_offset = clusterNodeReplOffset(node);
1543 if (clusterNodeIsMyself(node)) {
1544 /* Nodes do not update their own information
1545 * in the cluster node list. */
1546 repl_offset = replicationGetSlaveOffset();
1547 }
1548 return (repl_offset != 0);
1549}
1550
/* Emit the 4-element node description used by CLUSTER SLOTS:
 * preferred endpoint, client port, node name, and a map of additional
 * networking metadata (the endpoints that are not the preferred one). */
void addNodeToNodeReply(client *c, clusterNode *node) {
    char* hostname = clusterNodeHostname(node);
    addReplyArrayLen(c, 4);
    if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_IP) {
        addReplyBulkCString(c, clusterNodeIp(node));
    } else if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_HOSTNAME) {
        /* "?" is emitted when the hostname is unknown. */
        if (hostname != NULL && hostname[0] != '\0') {
            addReplyBulkCString(c, hostname);
        } else {
            addReplyBulkCString(c, "?");
        }
    } else if (server.cluster_preferred_endpoint_type == CLUSTER_ENDPOINT_TYPE_UNKNOWN_ENDPOINT) {
        addReplyNull(c);
    } else {
        serverPanic("Unrecognized preferred endpoint type");
    }

    /* Report TLS ports to TLS client, and report non-TLS port to non-TLS client. */
    addReplyLongLong(c, clusterNodeClientPort(node, shouldReturnTlsInfo()));
    addReplyBulkCBuffer(c, clusterNodeGetName(node), CLUSTER_NAMELEN);

    /* Add the additional endpoint information, this is all the known networking information
     * that is not the preferred endpoint. Note the logic is evaluated twice so we can
     * correctly report the number of additional network arguments without using a deferred
     * map, an assertion is made at the end to check we set the right length. */
    int length = 0;
    if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_IP) {
        length++;
    }
    if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_HOSTNAME
        && hostname != NULL && hostname[0] != '\0')
    {
        length++;
    }
    addReplyMapLen(c, length);

    /* Second pass: emit exactly the entries counted above. */
    if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_IP) {
        addReplyBulkCString(c, "ip");
        addReplyBulkCString(c, clusterNodeIp(node));
        length--;
    }
    if (server.cluster_preferred_endpoint_type != CLUSTER_ENDPOINT_TYPE_HOSTNAME
        && hostname != NULL && hostname[0] != '\0')
    {
        addReplyBulkCString(c, "hostname");
        addReplyBulkCString(c, hostname);
        length--;
    }
    serverAssert(length == 0);
}
1601
1602void addNodeReplyForClusterSlot(client *c, clusterNode *node, int start_slot, int end_slot) {
1603 int i, nested_elements = 3; /* slots (2) + master addr (1) */
1604 for (i = 0; i < clusterNodeNumSlaves(node); i++) {
1605 if (!isReplicaAvailable(clusterNodeGetSlave(node, i))) continue;
1606 nested_elements++;
1607 }
1608 addReplyArrayLen(c, nested_elements);
1609 addReplyLongLong(c, start_slot);
1610 addReplyLongLong(c, end_slot);
1611 addNodeToNodeReply(c, node);
1612
1613 /* Remaining nodes in reply are replicas for slot range */
1614 for (i = 0; i < clusterNodeNumSlaves(node); i++) {
1615 /* This loop is copy/pasted from clusterGenNodeDescription()
1616 * with modifications for per-slot node aggregation. */
1617 if (!isReplicaAvailable(clusterNodeGetSlave(node, i))) continue;
1618 addNodeToNodeReply(c, clusterNodeGetSlave(node, i));
1619 nested_elements--;
1620 }
1621 serverAssert(nested_elements == 3); /* Original 3 elements */
1622}
1623
/* CLUSTER SLOTS: reply with one entry per contiguous slot range owned by
 * a single node. */
void clusterCommandSlots(client * c) {
    /* Format: 1) 1) start slot
     *            2) end slot
     *            3) 1) master IP
     *               2) master port
     *               3) node ID
     *            4) 1) replica IP
     *               2) replica port
     *               3) node ID
     *           ... continued until done
     */
    clusterNode *n = NULL;
    int num_masters = 0, start = -1;
    void *slot_replylen = addReplyDeferredLen(c);

    /* Iterate one past the last slot (i == CLUSTER_SLOTS) so the final
     * open range is flushed before the loop exits. */
    for (int i = 0; i <= CLUSTER_SLOTS; i++) {
        /* Find start node and slot id. */
        if (n == NULL) {
            if (i == CLUSTER_SLOTS) break;
            n = getNodeBySlot(i);
            start = i;
            continue;
        }

        /* Add cluster slots info when occur different node with start
         * or end of slot. */
        if (i == CLUSTER_SLOTS || n != getNodeBySlot(i)) {
            addNodeReplyForClusterSlot(c, n, start, i-1);
            num_masters++;
            if (i == CLUSTER_SLOTS) break;
            n = getNodeBySlot(i);
            start = i;
        }
    }
    setDeferredArrayLen(c, slot_replylen, num_masters);
}
1660
1661/* -----------------------------------------------------------------------------
1662 * Cluster functions related to serving / redirecting clients
1663 * -------------------------------------------------------------------------- */
1664
1665/* The ASKING command is required after a -ASK redirection.
1666 * The client should issue ASKING before to actually send the command to
1667 * the target instance. See the Redis Cluster specification for more
1668 * information. */
1669void askingCommand(client *c) {
1670 if (server.cluster_enabled == 0) {
1671 addReplyError(c,"This instance has cluster support disabled");
1672 return;
1673 }
1674 c->flags |= CLIENT_ASKING;
1675 addReply(c,shared.ok);
1676}
1677
1678/* The READONLY command is used by clients to enter the read-only mode.
1679 * In this mode slaves will not redirect clients as long as clients access
1680 * with read-only commands to keys that are served by the slave's master. */
1681void readonlyCommand(client *c) {
1682 if (server.cluster_enabled == 0) {
1683 addReplyError(c,"This instance has cluster support disabled");
1684 return;
1685 }
1686 c->flags |= CLIENT_READONLY;
1687 addReply(c,shared.ok);
1688}
1689
/* Remove all the keys in the specified hash slot.
 * The number of removed items is returned.
 *
 * 'by_command' selects how the deletions are surfaced: when non-zero the
 * caller is a command (trimslots) that will be propagated itself, so only
 * a keyspace notification is emitted; when zero a DEL is propagated to
 * replicas/AOF and only modules are notified. */
unsigned int clusterDelKeysInSlot(unsigned int hashslot, int by_command) {
    unsigned int j = 0;

    /* Fast path: nothing stored in this slot. */
    if (!kvstoreDictSize(server.db->keys, (int) hashslot))
        return 0;

    kvstoreDictIterator kvs_di;
    dictEntry *de = NULL;
    /* Safe iterator: entries are deleted while iterating. */
    kvstoreInitDictSafeIterator(&kvs_di, server.db->keys, (int) hashslot);
    while((de = kvstoreDictIteratorNext(&kvs_di)) != NULL) {
        /* Each deletion is its own execution unit so that notifications
         * and post-unit operations fire per key. */
        enterExecutionUnit(1, 0);
        sds sdskey = kvobjGetKey(dictGetKV(de));
        robj *key = createStringObject(sdskey, sdslen(sdskey));
        dbDelete(&server.db[0], key);

        keyModified(NULL, &server.db[0], key, NULL, 1);
        if (by_command) {
            /* Keys are deleted by a command (trimslots), we need to notify the
             * keyspace event. Though, we don't need to propagate the DEL
             * command, as the command (trimslots) will be propagated. */
            notifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id);
        } else {
            /* Propagate the DEL command */
            propagateDeletion(&server.db[0], key, server.lazyfree_lazy_server_del);
            /* The keys are not actually logically deleted from the database,
             * just moved to another node. The modules needs to know that these
             * keys are no longer available locally, so just send the keyspace
             * notification to the modules, but not to clients. */
            moduleNotifyKeyspaceEvent(NOTIFY_GENERIC, "del", key, server.db[0].id);
        }
        exitExecutionUnit();
        postExecutionUnitOperations();
        decrRefCount(key);
        j++;
        server.dirty++;
    }
    kvstoreResetDictIterator(&kvs_di);
    return j;
}
1731
1732/* Delete the keys in the slot ranges. Returns the number of deleted items */
1733unsigned int clusterDelKeysInSlotRangeArray(slotRangeArray *slots, int by_command) {
1734 unsigned int j = 0;
1735 for (int i = 0; i < slots->num_ranges; i++) {
1736 for (int slot = slots->ranges[i].start; slot <= slots->ranges[i].end; slot++) {
1737 j += clusterDelKeysInSlot(slot, by_command);
1738 }
1739 }
1740 return j;
1741}
1742
1743int clusterIsMySlot(int slot) {
1744 return getMyClusterNode() == getNodeBySlot(slot);
1745}
1746
1747void replySlotsFlushAndFree(client *c, slotRangeArray *slots) {
1748 addReplyArrayLen(c, slots->num_ranges);
1749 for (int i = 0 ; i < slots->num_ranges ; i++) {
1750 addReplyArrayLen(c, 2);
1751 addReplyLongLong(c, slots->ranges[i].start);
1752 addReplyLongLong(c, slots->ranges[i].end);
1753 }
1754 slotRangeArrayFree(slots);
1755}
1756
1757/* Normalizes (sorts and merges adjacent ranges), checks that slot ranges are
1758 * well-formed and non-overlapping. */
1759int slotRangeArrayNormalizeAndValidate(slotRangeArray *slots, sds *err) {
1760 unsigned char used_slots[CLUSTER_SLOTS] = {0};
1761
1762 if (slots->num_ranges <= 0 || slots->num_ranges >= CLUSTER_SLOTS) {
1763 *err = sdscatprintf(sdsempty(), "invalid number of slot ranges: %d", slots->num_ranges);
1764 return C_ERR;
1765 }
1766
1767 /* Sort and merge adjacent slot ranges. */
1768 slotRangeArraySortAndMerge(slots);
1769
1770 for (int i = 0; i < slots->num_ranges; i++) {
1771 if (slots->ranges[i].start >= CLUSTER_SLOTS ||
1772 slots->ranges[i].end >= CLUSTER_SLOTS)
1773 {
1774 *err = sdscatprintf(sdsempty(), "slot range is out of range: %d-%d",
1775 slots->ranges[i].start, slots->ranges[i].end);
1776 return C_ERR;
1777 }
1778
1779 if (slots->ranges[i].start > slots->ranges[i].end) {
1780 *err = sdscatprintf(sdsempty(), "start slot number %d is greater than end slot number %d",
1781 slots->ranges[i].start, slots->ranges[i].end);
1782 return C_ERR;
1783 }
1784
1785 for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) {
1786 if (used_slots[j]) {
1787 *err = sdscatprintf(sdsempty(), "Slot %d specified multiple times", j);
1788 return C_ERR;
1789 }
1790 used_slots[j]++;
1791 }
1792 }
1793 return C_OK;
1794}
1795
1796/* Create a slot range array with the specified number of ranges. */
1797slotRangeArray *slotRangeArrayCreate(int num_ranges) {
1798 slotRangeArray *slots = zcalloc(sizeof(slotRangeArray) + num_ranges * sizeof(slotRange));
1799 slots->num_ranges = num_ranges;
1800 return slots;
1801}
1802
1803/* Duplicate the slot range array. */
1804slotRangeArray *slotRangeArrayDup(slotRangeArray *slots) {
1805 slotRangeArray *dup = slotRangeArrayCreate(slots->num_ranges);
1806 memcpy(dup->ranges, slots->ranges, sizeof(slotRange) * slots->num_ranges);
1807 return dup;
1808}
1809
1810/* Set the slot range at the specified index. */
1811void slotRangeArraySet(slotRangeArray *slots, int idx, int start, int end) {
1812 slots->ranges[idx].start = start;
1813 slots->ranges[idx].end = end;
1814}
1815
1816/* Create a slot range string in the format of: "1000-2000 3000-4000 ..." */
1817sds slotRangeArrayToString(slotRangeArray *slots) {
1818 sds s = sdsempty();
1819 if (slots == NULL || slots->num_ranges == 0) return s;
1820
1821 for (int i = 0; i < slots->num_ranges; i++) {
1822 slotRange *sr = &slots->ranges[i];
1823 s = sdscatprintf(s, "%d-%d ", sr->start, sr->end);
1824 }
1825 sdssetlen(s, sdslen(s) - 1);
1826 s[sdslen(s)] = '\0';
1827
1828 return s;
1829}
1830
/* Parse a slot range string in the format "1000-2000 3000-4000 ..." into a slotRangeArray.
 * Returns a new slotRangeArray on success, NULL on failure. */
slotRangeArray *slotRangeArrayFromString(sds data) {
    int num_ranges;
    long long start, end;
    slotRangeArray *slots = NULL;
    if (!data || sdslen(data) == 0) return NULL;

    /* Split on single spaces; every token must look like "<start>-<end>". */
    sds *parts = sdssplitlen(data, sdslen(data), " ", 1, &num_ranges);
    if (num_ranges <= 0) goto err;

    slots = slotRangeArrayCreate(num_ranges);

    /* Parse each slot range */
    for (int i = 0; i < num_ranges; i++) {
        char *dash = strchr(parts[i], '-');
        if (!dash) goto err;

        /* Parse the digits on each side of the first '-' separator. */
        if (string2ll(parts[i], dash - parts[i], &start) == 0 ||
            string2ll(dash + 1, sdslen(parts[i]) - (dash - parts[i]) - 1, &end) == 0)
            goto err;
        slotRangeArraySet(slots, i, start, end);
    }

    /* Validate all ranges */
    sds err_msg = NULL;
    if (slotRangeArrayNormalizeAndValidate(slots, &err_msg) != C_OK) {
        /* The error text is not surfaced to the caller; just drop it. */
        if (err_msg) sdsfree(err_msg);
        goto err;
    }
    sdsfreesplitres(parts, num_ranges);
    return slots;

err:
    if (slots) slotRangeArrayFree(slots);
    sdsfreesplitres(parts, num_ranges);
    return NULL;
}
1869
1870static int compareSlotRange(const void *a, const void *b) {
1871 const slotRange *sa = a;
1872 const slotRange *sb = b;
1873 if (sa->start < sb->start) return -1;
1874 if (sa->start > sb->start) return 1;
1875 return 0;
1876}
1877
1878/* Sort slot ranges by start slot and merge adjacent ranges.
1879 * Adjacent means: prev.end + 1 == next.start.
1880 * e.g. 1000-2000 2001-3000 0-100 => 0-100 1000-3000
1881 *
1882 * Note: Overlapping ranges are not merged.*/
1883void slotRangeArraySortAndMerge(slotRangeArray *slots) {
1884 if (!slots || slots->num_ranges <= 1) return;
1885
1886 qsort(slots->ranges, slots->num_ranges, sizeof(slotRange), compareSlotRange);
1887
1888 int idx = 0;
1889 for (int i = 1; i < slots->num_ranges; i++) {
1890 if (slots->ranges[idx].end + 1 == slots->ranges[i].start)
1891 slots->ranges[idx].end = slots->ranges[i].end;
1892 else
1893 slots->ranges[++idx] = slots->ranges[i];
1894 }
1895 slots->num_ranges = idx + 1;
1896}
1897
1898/* Compare two slot range arrays, return 1 if equal, 0 otherwise */
1899int slotRangeArrayIsEqual(slotRangeArray *slots1, slotRangeArray *slots2) {
1900 slotRangeArraySortAndMerge(slots1);
1901 slotRangeArraySortAndMerge(slots2);
1902
1903 if (slots1->num_ranges != slots2->num_ranges) return 0;
1904
1905 for (int i = 0; i < slots1->num_ranges; i++) {
1906 if (slots1->ranges[i].start != slots2->ranges[i].start ||
1907 slots1->ranges[i].end != slots2->ranges[i].end) {
1908 return 0;
1909 }
1910 }
1911 return 1;
1912}
1913
1914/* Add a slot to the slot range array.
1915 * Usage:
1916 * slotRangeArray *slots = NULL
1917 * slots = slotRangeArrayAppend(slots, 1000);
1918 * slots = slotRangeArrayAppend(slots, 1001);
1919 * slots = slotRangeArrayAppend(slots, 1003);
1920 * slots = slotRangeArrayAppend(slots, 1004);
1921 * slots = slotRangeArrayAppend(slots, 1005);
1922 *
1923 * Result: 1000-1001, 1003-1005
1924 * Note: `slot` must be greater than the previous slot.
1925 * */
1926slotRangeArray *slotRangeArrayAppend(slotRangeArray *slots, int slot) {
1927 if (slots == NULL) {
1928 slots = slotRangeArrayCreate(4);
1929 slots->ranges[0].start = slot;
1930 slots->ranges[0].end = slot;
1931 slots->num_ranges = 1;
1932 return slots;
1933 }
1934
1935 serverAssert(slots->num_ranges >= 0 && slots->num_ranges <= CLUSTER_SLOTS);
1936 serverAssert(slot > slots->ranges[slots->num_ranges - 1].end);
1937
1938 /* Check if we can extend the last range */
1939 slotRange *last = &slots->ranges[slots->num_ranges - 1];
1940 if (slot == last->end + 1) {
1941 last->end = slot;
1942 return slots;
1943 }
1944
1945 /* Calculate current capacity and reallocate if needed */
1946 int cap = (int) ((zmalloc_size(slots) - sizeof(slotRangeArray)) / sizeof(slotRange));
1947 if (slots->num_ranges >= cap)
1948 slots = zrealloc(slots, sizeof(slotRangeArray) + sizeof(slotRange) * cap * 2);
1949
1950 /* Add new single-slot range */
1951 slots->ranges[slots->num_ranges].start = slot;
1952 slots->ranges[slots->num_ranges].end = slot;
1953 slots->num_ranges++;
1954
1955 return slots;
1956}
1957
1958/* Returns 1 if the slot range array contains the given slot, 0 otherwise. */
1959int slotRangeArrayContains(slotRangeArray *slots, unsigned int slot) {
1960 for (int i = 0; i < slots->num_ranges; i++)
1961 if (slots->ranges[i].start <= slot && slots->ranges[i].end >= slot)
1962 return 1;
1963 return 0;
1964}
1965
/* Free the slot range array. Safe to call with NULL (zfree handles it). */
void slotRangeArrayFree(slotRangeArray *slots) {
    zfree(slots);
}
1970
/* Generic version of slotRangeArrayFree(), usable as a void(void*)
 * destructor callback. */
void slotRangeArrayFreeGeneric(void *slots) {
    slotRangeArrayFree(slots);
}
1975
1976/* Slot range array iterator */
1977slotRangeArrayIter *slotRangeArrayGetIterator(slotRangeArray *slots) {
1978 slotRangeArrayIter *it = zmalloc(sizeof(*it));
1979 it->slots = slots;
1980 it->range_index = 0;
1981 it->cur_slot = slots->num_ranges > 0 ? slots->ranges[0].start : -1;
1982 return it;
1983}
1984
1985/* Returns the next slot in the array, or -1 if there are no more slots. */
1986int slotRangeArrayNext(slotRangeArrayIter *it) {
1987 if (it->range_index >= it->slots->num_ranges) return -1;
1988
1989 if (it->cur_slot < it->slots->ranges[it->range_index].end) {
1990 it->cur_slot++;
1991 } else {
1992 it->range_index++;
1993 if (it->range_index < it->slots->num_ranges)
1994 it->cur_slot = it->slots->ranges[it->range_index].start;
1995 else
1996 it->cur_slot = -1; /* finished */
1997 }
1998 return it->cur_slot;
1999}
2000
/* Return the slot the iterator is currently positioned on (-1 when the
 * iterator is exhausted or the array is empty). */
int slotRangeArrayGetCurrentSlot(slotRangeArrayIter *it) {
    return it->cur_slot;
}
2004
/* Release an iterator obtained from slotRangeArrayGetIterator().
 * The underlying slot range array is not freed. */
void slotRangeArrayIteratorFree(slotRangeArrayIter *it) {
    zfree(it);
}
2008
2009/* Parse slot range pairs from argv starting at `pos`.
2010 * `argc` is the argument count, `pos` is the first slot argument index.
2011 * Returns a slotRangeArray or NULL on error. */
2012slotRangeArray *parseSlotRangesOrReply(client *c, int argc, int pos) {
2013 int start, end, count;
2014 slotRangeArray *slots;
2015
2016 /* Ensure there is at least one (start,end) slot range pairs. */
2017 if (argc < 0 || pos < 0 || pos >= argc || (argc - pos) < 2 || ((argc - pos) % 2) != 0) {
2018 addReplyErrorArity(c);
2019 return NULL;
2020 }
2021
2022 count = (argc - pos) / 2;
2023 slots = slotRangeArrayCreate(count);
2024 slots->num_ranges = 0;
2025
2026 for (int j = pos; j < argc; j += 2) {
2027 if ((start = getSlotOrReply(c, c->argv[j])) == -1 ||
2028 (end = getSlotOrReply(c, c->argv[j + 1])) == -1)
2029 {
2030 slotRangeArrayFree(slots);
2031 return NULL;
2032 }
2033 slotRangeArraySet(slots, slots->num_ranges, start, end);
2034 slots->num_ranges++;
2035 }
2036
2037 sds err = NULL;
2038 if (slotRangeArrayNormalizeAndValidate(slots, &err) != C_OK) {
2039 addReplyErrorSds(c, err);
2040 slotRangeArrayFree(slots);
2041 return NULL;
2042 }
2043 return slots;
2044}
2045
2046/* Return 1 if the keys in the slot can be accessed, 0 otherwise. */
2047int clusterCanAccessKeysInSlot(int slot) {
2048 /* If not in cluster mode, all keys are accessible */
2049 if (server.cluster_enabled == 0) return 1;
2050
2051 /* If the slot is being imported under old slot migration approach, we should
2052 * allow to list keys from the slot as previously. */
2053 if (getImportingSlotSource(slot)) return 1;
2054
2055 /* If using atomic slot migration, check if the slot belongs to the current
2056 * node or its master, return 1 if so. */
2057 clusterNode *myself = getMyClusterNode();
2058 if (clusterNodeIsSlave(myself)) {
2059 clusterNode *master = clusterNodeGetMaster(myself);
2060 if (master && clusterNodeCoversSlot(master, slot))
2061 return 1;
2062 } else {
2063 if (clusterNodeCoversSlot(myself, slot))
2064 return 1;
2065 }
2066 return 0;
2067}
2068
2069/* Return the slot ranges that belong to the current node or its master. */
2070slotRangeArray *clusterGetLocalSlotRanges(void) {
2071 slotRangeArray *slots = NULL;
2072
2073 if (!server.cluster_enabled) {
2074 slots = slotRangeArrayCreate(1);
2075 slotRangeArraySet(slots, 0, 0, CLUSTER_SLOTS - 1);
2076 return slots;
2077 }
2078
2079 clusterNode *master = clusterNodeGetMaster(getMyClusterNode());
2080 if (master) {
2081 for (int i = 0; i < CLUSTER_SLOTS; i++) {
2082 if (clusterNodeCoversSlot(master, i))
2083 slots = slotRangeArrayAppend(slots, i);
2084 }
2085 }
2086 return slots ? slots : slotRangeArrayCreate(0);
2087}
2088
/* Partially flush destination DB in a cluster node, based on the slot range.
 *
 * Usage: SFLUSH <start-slot> <end slot> [<start-slot> <end slot>]* [SYNC|ASYNC]
 *
 * This is an initial implementation of SFLUSH (slots flush) which is limited to
 * flushing a single shard as a whole, but in the future the same command may be
 * used to partially flush a shard based on hash slots. Currently only if provided
 * slots cover entirely the slots of a node, the node will be flushed and the
 * return value will be pairs of slot ranges. Otherwise, a single empty set will
 * be returned. If possible, SFLUSH SYNC will be run as blocking ASYNC as an
 * optimization.
 */
void sflushCommand(client *c) {
    int flags = EMPTYDB_NO_FLAGS, argc = c->argc;

    if (server.cluster_enabled == 0) {
        addReplyError(c,"This instance has cluster support disabled");
        return;
    }

    /* check if last argument is SYNC or ASYNC */
    if (!strcasecmp(c->argv[c->argc-1]->ptr,"sync")) {
        flags = EMPTYDB_NO_FLAGS;
        argc--;
    } else if (!strcasecmp(c->argv[c->argc-1]->ptr,"async")) {
        flags = EMPTYDB_ASYNC;
        argc--;
    } else if (server.lazyfree_lazy_user_flush) {
        /* No explicit mode: fall back to the server's lazy-flush config. */
        flags = EMPTYDB_ASYNC;
    }

    /* parse the slot range: after stripping the optional SYNC/ASYNC the
     * remaining args must be the command name plus (start,end) pairs, i.e.
     * an odd count. */
    if (argc % 2 == 0) {
        addReplyErrorArity(c);
        return;
    }

    /* Parse slot ranges from the command arguments. */
    slotRangeArray *slots = parseSlotRangesOrReply(c, argc, 1);
    if (!slots) return;

    /* Iterate and find the slot ranges that belong to this node. Save them in
     * a new slotRangeArray. It is allocated on heap since there is a chance
     * that FLUSH SYNC will be running as blocking ASYNC and only later reply
     * with slot ranges */
    unsigned char slots_to_flush[CLUSTER_SLOTS] = {0}; /* Requested slots to flush */
    slotRangeArray *myslots = NULL;
    for (int i = 0; i < slots->num_ranges; i++) {
        for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) {
            if (clusterIsMySlot(j)) {
                myslots = slotRangeArrayAppend(myslots, j);
                slots_to_flush[j] = 1;
            }
        }
    }

    /* Verify that all slots of mynode got covered. See sflushCommand() comment. */
    int all_slots_covered = 1;
    for (int i = 0; i < CLUSTER_SLOTS; i++) {
        if (clusterIsMySlot(i) && !slots_to_flush[i]) {
            all_slots_covered = 0;
            break;
        }
    }
    /* Partial coverage (or no local slots at all): reply with an empty set
     * and flush nothing. */
    if (myslots == NULL || !all_slots_covered) {
        addReplyArrayLen(c, 0);
        slotRangeArrayFree(slots);
        slotRangeArrayFree(myslots);
        return;
    }
    slotRangeArrayFree(slots);

    /* Flush selected slots. If not flush as blocking async, then reply immediately */
    if (flushCommandCommon(c, FLUSH_TYPE_SLOTS, flags, myslots) == 0)
        replySlotsFlushAndFree(c, myslots);
}
2165
2166/* The READWRITE command just clears the READONLY command state. */
2167void readwriteCommand(client *c) {
2168 if (server.cluster_enabled == 0) {
2169 addReplyError(c,"This instance has cluster support disabled");
2170 return;
2171 }
2172 c->flags &= ~CLIENT_READONLY;
2173 addReply(c,shared.ok);
2174}
2175
/* Resets transient cluster stats that we expose via INFO or other means that we want
 * to reset via CONFIG RESETSTAT. The function is also used in order to
 * initialize these fields in clusterInit() at server startup. */
void resetClusterStats(void) {
    /* No cluster stats exist outside cluster mode. */
    if (!server.cluster_enabled) return;

    clusterSlotStatResetAll();
}
2184
/* This function is called at server startup in order to initialize cluster data
 * structures that are shared between the different cluster implementations. */
void clusterCommonInit(void) {
    resetClusterStats();
    /* Initialize atomic slot migration state. */
    asmInit();
}
2191
2192/* This function is called after the node startup in order to check if there
2193 * are any slots that we have keys for, but are not assigned to us. If so,
2194 * we delete the keys. */
2195void clusterDeleteKeysInUnownedSlots(void) {
2196 if (clusterNodeIsSlave(getMyClusterNode())) return;
2197
2198 /* Check that all the slots we have keys for are assigned to us. Otherwise,
2199 * delete the keys. */
2200 for (int i = 0; i < CLUSTER_SLOTS; i++) {
2201 /* Skip if: no keys in the slot, it's our slot, or we are importing it. */
2202 if (!countKeysInSlot(i) ||
2203 clusterIsMySlot(i) ||
2204 getImportingSlotSource(i))
2205 {
2206 continue;
2207 }
2208
2209 serverLog(LL_NOTICE, "I have keys for slot %d, but the slot is "
2210 "assigned to another node. "
2211 "Deleting keys in the slot.", i);
2212 /* With atomic slot migration, it is safe to drop keys from slots
2213 * that are not owned. This will not result in data loss under the
2214 * legacy slot migration approach either, since the importing state
2215 * has already been persisted in node.conf. */
2216 clusterDelKeysInSlot(i, 0);
2217 }
2218}
2219
2220
2221/* This function is called after the node startup in order to verify that data
2222 * loaded from disk is in agreement with the cluster configuration:
2223 *
2224 * 1) If we find keys about hash slots we have no responsibility for, the
2225 * following happens:
2226 * A) If no other node is in charge according to the current cluster
2227 * configuration, we add these slots to our node.
2228 * B) If according to our config other nodes are already in charge for
2229 * this slots, we set the slots as IMPORTING from our point of view
2230 * in order to justify we have those slots, and in order to make
2231 * redis-cli aware of the issue, so that it can try to fix it.
2232 * 2) If we find data in a DB different than DB0 we return C_ERR to
2233 * signal the caller it should quit the server with an error message
2234 * or take other actions.
2235 *
2236 * The function always returns C_OK even if it will try to correct
2237 * the error described in "1". However if data is found in DB different
2238 * from DB0, C_ERR is returned.
2239 *
2240 * The function also uses the logging facility in order to warn the user
2241 * about desynchronizations between the data we have in memory and the
2242 * cluster configuration. */
2243int verifyClusterConfigWithData(void) {
2244 /* Return ASAP if a module disabled cluster redirections. In that case
2245 * every master can store keys about every possible hash slot. */
2246 if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
2247 return C_OK;
2248
2249 /* If this node is a slave, don't perform the check at all as we
2250 * completely depend on the replication stream. */
2251 if (clusterNodeIsSlave(getMyClusterNode())) return C_OK;
2252
2253 /* Make sure we only have keys in DB0. */
2254 for (int i = 1; i < server.dbnum; i++) {
2255 if (kvstoreSize(server.db[i].keys)) return C_ERR;
2256 }
2257
2258 /* Take over slots that we have keys for, but are assigned to no one. */
2259 clusterClaimUnassignedSlots();
2260 /* Delete keys in unowned slots */
2261 clusterDeleteKeysInUnownedSlots();
2262 return C_OK;
2263}