path: root/examples/redis-unstable/src/cluster_legacy.c
author    Mitja Felicijan <mitja.felicijan@gmail.com>  2026-01-21 22:52:54 +0100
committer Mitja Felicijan <mitja.felicijan@gmail.com>  2026-01-21 22:52:54 +0100
commit    dcacc00e3750300617ba6e16eb346713f91a783a (patch)
tree      38e2d4fb5ed9d119711d4295c6eda4b014af73fd /examples/redis-unstable/src/cluster_legacy.c
parent    58dac10aeb8f5a041c46bddbeaf4c7966a99b998 (diff)
download  crep-dcacc00e3750300617ba6e16eb346713f91a783a.tar.gz
Remove testing data
Diffstat (limited to 'examples/redis-unstable/src/cluster_legacy.c')
-rw-r--r--  examples/redis-unstable/src/cluster_legacy.c  6581
1 file changed, 0 insertions, 6581 deletions
diff --git a/examples/redis-unstable/src/cluster_legacy.c b/examples/redis-unstable/src/cluster_legacy.c
deleted file mode 100644
index c93aea2..0000000
--- a/examples/redis-unstable/src/cluster_legacy.c
+++ /dev/null
@@ -1,6581 +0,0 @@
1/*
2 * Copyright (c) 2009-Present, Redis Ltd.
3 * All rights reserved.
4 *
5 * Copyright (c) 2024-present, Valkey contributors.
6 * All rights reserved.
7 *
8 * Licensed under your choice of (a) the Redis Source Available License 2.0
9 * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
10 * GNU Affero General Public License v3 (AGPLv3).
11 *
12 * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
13 */
14
15/*
16 * cluster_legacy.c contains the implementation of the cluster API that is
17 * specific to the standard, Redis cluster-bus based clustering mechanism.
18 */
19
20#include "server.h"
21#include "cluster.h"
22#include "cluster_legacy.h"
23#include "cluster_asm.h"
24#include "cluster_slot_stats.h"
25#include "endianconv.h"
26#include "connection.h"
27
28#include <sys/types.h>
29#include <sys/socket.h>
30#include <arpa/inet.h>
31#include <fcntl.h>
32#include <unistd.h>
33#include <sys/stat.h>
34#include <math.h>
35#include <sys/file.h>
36
37/* A global reference to myself is handy to make code more clear.
38 * Myself always points to server.cluster->myself, that is, the clusterNode
39 * that represents this node. */
40clusterNode *myself = NULL;
41
42clusterNode *createClusterNode(char *nodename, int flags);
43void clusterAddNode(clusterNode *node);
44void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask);
45void clusterReadHandler(connection *conn);
46void clusterSendPing(clusterLink *link, int type);
47void clusterSendFail(char *nodename);
48void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request);
49void clusterUpdateState(void);
50int clusterNodeCoversSlot(clusterNode *n, int slot);
51list *clusterGetNodesInMyShard(clusterNode *node);
52int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
53int clusterAddSlot(clusterNode *n, int slot);
54int clusterDelSlot(int slot);
55int clusterMoveNodeSlots(clusterNode *from_node, clusterNode *to_node);
56int clusterDelNodeSlots(clusterNode *node);
57int clusterNodeSetSlotBit(clusterNode *n, int slot);
58void clusterSetMaster(clusterNode *n);
59void clusterHandleSlaveFailover(void);
60void clusterHandleSlaveMigration(int max_slaves);
61int bitmapTestBit(unsigned char *bitmap, int pos);
62void bitmapSetBit(unsigned char *bitmap, int pos);
63void bitmapClearBit(unsigned char *bitmap, int pos);
64void clusterDoBeforeSleep(int flags);
65void clusterSendUpdate(clusterLink *link, clusterNode *node);
66void resetManualFailover(void);
67void clusterCloseAllSlots(void);
68void clusterSetNodeAsMaster(clusterNode *n);
69void clusterDelNode(clusterNode *delnode);
70sds representClusterNodeFlags(sds ci, uint16_t flags);
71sds representSlotInfo(sds ci, uint16_t *slot_info_pairs, int slot_info_pairs_count);
72void clusterFreeNodesSlotsInfo(clusterNode *n);
73uint64_t clusterGetMaxEpoch(void);
74int clusterBumpConfigEpochWithoutConsensus(void);
75void moduleCallClusterReceivers(const char *sender_id, uint64_t module_id, uint8_t type, const unsigned char *payload, uint32_t len);
76const char *clusterGetMessageTypeString(int type);
77void removeChannelsInSlot(unsigned int slot);
78unsigned int countKeysInSlot(unsigned int hashslot);
79unsigned int countChannelsInSlot(unsigned int hashslot);
80unsigned int clusterDelKeysInSlot(unsigned int hashslot, int flags);
81void clusterAddNodeToShard(const char *shard_id, clusterNode *node);
82list *clusterLookupNodeListByShardId(const char *shard_id);
83void clusterRemoveNodeFromShard(clusterNode *node);
84int auxShardIdSetter(clusterNode *n, void *value, int length);
85sds auxShardIdGetter(clusterNode *n, sds s);
86int auxShardIdPresent(clusterNode *n);
87int auxHumanNodenameSetter(clusterNode *n, void *value, int length);
88sds auxHumanNodenameGetter(clusterNode *n, sds s);
89int auxHumanNodenamePresent(clusterNode *n);
90int auxTcpPortSetter(clusterNode *n, void *value, int length);
91sds auxTcpPortGetter(clusterNode *n, sds s);
92int auxTcpPortPresent(clusterNode *n);
93int auxTlsPortSetter(clusterNode *n, void *value, int length);
94sds auxTlsPortGetter(clusterNode *n, sds s);
95int auxTlsPortPresent(clusterNode *n);
96static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen);
97void freeClusterLink(clusterLink *link);
98int verifyClusterNodeId(const char *name, int length);
99static void updateShardId(clusterNode *node, const char *shard_id);
100
101int getNodeDefaultClientPort(clusterNode *n) {
102 return server.tls_cluster ? n->tls_port : n->tcp_port;
103}
104
105static inline int getNodeDefaultReplicationPort(clusterNode *n) {
106 return server.tls_replication ? n->tls_port : n->tcp_port;
107}
108
109int clusterNodeClientPort(clusterNode *n, int use_tls) {
110 return use_tls ? n->tls_port : n->tcp_port;
111}
112
113static inline int defaultClientPort(void) {
114 return server.tls_cluster ? server.tls_port : server.port;
115}
116
117#define isSlotUnclaimed(slot) \
118 (server.cluster->slots[slot] == NULL || \
119 bitmapTestBit(server.cluster->owner_not_claiming_slot, slot))
120
121#define RCVBUF_INIT_LEN 1024
122#define RCVBUF_MAX_PREALLOC (1<<20) /* 1MB */
123
124/* Cluster nodes hash table, mapping node names (IDs) to
125 * clusterNode structures. */
126dictType clusterNodesDictType = {
127 dictSdsHash, /* hash function */
128 NULL, /* key dup */
129 NULL, /* val dup */
130 dictSdsKeyCompare, /* key compare */
131 dictSdsDestructor, /* key destructor */
132 NULL, /* val destructor */
133 NULL /* allow to expand */
134};
135
136/* Cluster re-addition blacklist. This maps node IDs to the time
137 * we can re-add this node. The goal is to avoid re-adding a removed
138 * node for some time. */
139dictType clusterNodesBlackListDictType = {
140 dictSdsCaseHash, /* hash function */
141 NULL, /* key dup */
142 NULL, /* val dup */
143 dictSdsKeyCaseCompare, /* key compare */
144 dictSdsDestructor, /* key destructor */
145 NULL, /* val destructor */
146 NULL /* allow to expand */
147};
148
149/* Cluster shards hash table, mapping shard id to list of nodes */
150dictType clusterSdsToListType = {
151 dictSdsHash, /* hash function */
152 NULL, /* key dup */
153 NULL, /* val dup */
154 dictSdsKeyCompare, /* key compare */
155 dictSdsDestructor, /* key destructor */
156 dictListDestructor, /* val destructor */
157 NULL /* allow to expand */
158};
159
160/* Aux fields were introduced in Redis 7.2 to support the persistence
161 * of various important node properties, such as shard id, in nodes.conf.
162 * Aux fields take an explicit format of name=value pairs and have no
163 * intrinsic order among them. Aux fields are always grouped together
164 * at the end of the second column of each row after the node's IP
165 * address/port/cluster_port and the optional hostname. Aux fields
166 * are separated by ','. */
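/* For illustration only (hypothetical node IDs, timestamps and ports), a
 * nodes.conf line for a primary carrying aux fields might look like:
 *
 *   <40-hex-node-id> 127.0.0.1:7000@17000,host-1,shard-id=<40-hex-shard-id>,tls-port=0 master - 0 1700000000000 1 connected 0-5460
 *
 * i.e. the aux name=value pairs follow the ip:port@cport and the optional
 * hostname inside the second column, separated by ','. */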
167
168/* Aux field setter function prototype
169 * return C_OK when the update is successful; C_ERR otherwise */
170typedef int (aux_value_setter) (clusterNode* n, void *value, int length);
171/* Aux field getter function prototype
172 * return an sds that is a concatenation of the input sds string and
173 * the aux value */
174typedef sds (aux_value_getter) (clusterNode* n, sds s);
175
176typedef int (aux_value_present) (clusterNode* n);
177
178typedef struct {
179 char *field;
180 aux_value_setter *setter;
181 aux_value_getter *getter;
182 aux_value_present *isPresent;
183} auxFieldHandler;
184
185/* Assign index to each aux field */
186typedef enum {
187 af_shard_id,
188 af_human_nodename,
189 af_tcp_port,
190 af_tls_port,
191 af_count,
192} auxFieldIndex;
193
194/* Note that
195 * 1. the order of the elements below must match that of their
196 * indices as defined in auxFieldIndex
197 * 2. aux name can contain characters that pass the isValidAuxChar check only */
198auxFieldHandler auxFieldHandlers[] = {
199 {"shard-id", auxShardIdSetter, auxShardIdGetter, auxShardIdPresent},
200 {"nodename", auxHumanNodenameSetter, auxHumanNodenameGetter, auxHumanNodenamePresent},
201 {"tcp-port", auxTcpPortSetter, auxTcpPortGetter, auxTcpPortPresent},
202 {"tls-port", auxTlsPortSetter, auxTlsPortGetter, auxTlsPortPresent},
203};
204
205int auxShardIdSetter(clusterNode *n, void *value, int length) {
206 if (verifyClusterNodeId(value, length) == C_ERR) {
207 return C_ERR;
208 }
209 memcpy(n->shard_id, value, CLUSTER_NAMELEN);
210 /* if n already has replicas, make sure they all use
211 * the primary shard id */
212 for (int i = 0; i < n->numslaves; i++) {
213 if (memcmp(n->slaves[i]->shard_id, n->shard_id, CLUSTER_NAMELEN) != 0)
214 updateShardId(n->slaves[i], n->shard_id);
215 }
216 clusterAddNodeToShard(value, n);
217 return C_OK;
218}
219
220sds auxShardIdGetter(clusterNode *n, sds s) {
221 return sdscatprintf(s, "%.40s", n->shard_id);
222}
223
224int auxShardIdPresent(clusterNode *n) {
225 return strlen(n->shard_id);
226}
227
228int auxHumanNodenameSetter(clusterNode *n, void *value, int length) {
229 if (n && !strncmp(value, n->human_nodename, length)) {
230 return C_OK;
231 } else if (!n && (length == 0)) {
232 return C_OK;
233 }
234 if (n) {
235 n->human_nodename = sdscpylen(n->human_nodename, value, length);
236 } else if (sdslen(n->human_nodename) != 0) {
237 sdsclear(n->human_nodename);
238 } else {
239 return C_ERR;
240 }
241 return C_OK;
242}
243
244sds auxHumanNodenameGetter(clusterNode *n, sds s) {
245 return sdscatprintf(s, "%s", n->human_nodename);
246}
247
248int auxHumanNodenamePresent(clusterNode *n) {
249 return sdslen(n->human_nodename);
250}
251
252int auxTcpPortSetter(clusterNode *n, void *value, int length) {
253 if (length > 5 || length < 1) {
254 return C_ERR;
255 }
256 char buf[length + 1];
257 memcpy(buf, (char*)value, length);
258 buf[length] = '\0';
259 n->tcp_port = atoi(buf);
260 return (n->tcp_port < 0 || n->tcp_port >= 65536) ? C_ERR : C_OK;
261}
262
263sds auxTcpPortGetter(clusterNode *n, sds s) {
264 return sdscatprintf(s, "%d", n->tcp_port);
265}
266
267int auxTcpPortPresent(clusterNode *n) {
268 return n->tcp_port >= 0 && n->tcp_port < 65536;
269}
270
271int auxTlsPortSetter(clusterNode *n, void *value, int length) {
272 if (length > 5 || length < 1) {
273 return C_ERR;
274 }
275 char buf[length + 1];
276 memcpy(buf, (char*)value, length);
277 buf[length] = '\0';
278 n->tls_port = atoi(buf);
279 return (n->tls_port < 0 || n->tls_port >= 65536) ? C_ERR : C_OK;
280}
281
282sds auxTlsPortGetter(clusterNode *n, sds s) {
283 return sdscatprintf(s, "%d", n->tls_port);
284}
285
286int auxTlsPortPresent(clusterNode *n) {
287 return n->tls_port >= 0 && n->tls_port < 65536;
288}
289
290/* clusterLink send queue blocks */
291typedef struct {
292 size_t totlen; /* Total length of this block including the message */
293 int refcount; /* Number of cluster link send msg queues containing the message */
294 clusterMsg msg[];
295} clusterMsgSendBlock;
296
297/* Helper function to extract a normal message from a send block. */
298static clusterMsg *getMessageFromSendBlock(clusterMsgSendBlock *msgblock) {
299 return &msgblock->msg[0];
300}
301
302/* -----------------------------------------------------------------------------
303 * Initialization
304 * -------------------------------------------------------------------------- */
305
306/* Load the cluster config from 'filename'.
307 *
308 * If the file does not exist or is zero-length (this may happen because
309 * when we lock the nodes.conf file, we create a zero-length one for the
310 * sake of locking if it does not already exist), C_ERR is returned.
311 * If the configuration was loaded from the file, C_OK is returned. */
312int clusterLoadConfig(char *filename) {
313 FILE *fp = fopen(filename,"r");
314 struct stat sb;
315 char *line;
316 int maxline, j;
317
318 if (fp == NULL) {
319 if (errno == ENOENT) {
320 return C_ERR;
321 } else {
322 serverLog(LL_WARNING,
323 "Loading the cluster node config from %s: %s",
324 filename, strerror(errno));
325 exit(1);
326 }
327 }
328
329 if (redis_fstat(fileno(fp),&sb) == -1) {
330 serverLog(LL_WARNING,
331 "Unable to obtain the cluster node config file stat %s: %s",
332 filename, strerror(errno));
333 exit(1);
334 }
335 /* Check if the file is zero-length: if so return C_ERR to signal
336 * we have to write the config. */
337 if (sb.st_size == 0) {
338 fclose(fp);
339 return C_ERR;
340 }
341
342 /* Parse the file. Note that single lines of the cluster config file can
343 * be really long as they include all the hash slots of the node.
344 * This means in the worst possible case, half of the Redis slots will be
345 * present in a single line, possibly in importing or migrating state,
346 * together with the node ID of the sender/receiver.
347 *
348 * To simplify we allocate 1024+CLUSTER_SLOTS*128 bytes per line. */
349 maxline = 1024+CLUSTER_SLOTS*128;
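    /* For concreteness: with CLUSTER_SLOTS = 16384 this works out to
     * 1024 + 16384*128 = 2,098,176 bytes, i.e. roughly 2 MB per line buffer. */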
350 line = zmalloc(maxline);
351 while(fgets(line,maxline,fp) != NULL) {
352 int argc, aux_argc;
353 sds *argv, *aux_argv;
354 clusterNode *n, *master;
355 char *p, *s;
356
357 /* Skip blank lines, they can be created either by users manually
358 * editing nodes.conf or by the config writing process if stopped
359 * before the truncate() call. */
360 if (line[0] == '\n' || line[0] == '\0') continue;
361
362 /* Split the line into arguments for processing. */
363 argv = sdssplitargs(line,&argc);
364 if (argv == NULL) goto fmterr;
365
366 /* Handle the special "vars" line. Don't pretend it is the last
367 * line even if it actually is when generated by Redis. */
368 if (strcasecmp(argv[0],"vars") == 0) {
369 if (!(argc % 2)) goto fmterr;
370 for (j = 1; j < argc; j += 2) {
371 if (strcasecmp(argv[j],"currentEpoch") == 0) {
372 server.cluster->currentEpoch =
373 strtoull(argv[j+1],NULL,10);
374 } else if (strcasecmp(argv[j],"lastVoteEpoch") == 0) {
375 server.cluster->lastVoteEpoch =
376 strtoull(argv[j+1],NULL,10);
377 } else {
378 serverLog(LL_NOTICE,
379 "Skipping unknown cluster config variable '%s'",
380 argv[j]);
381 }
382 }
383 sdsfreesplitres(argv,argc);
384 continue;
385 }
386
387 /* Regular config lines have at least eight fields */
388 if (argc < 8) {
389 sdsfreesplitres(argv,argc);
390 goto fmterr;
391 }
392
393 /* Create this node if it does not exist */
394 if (verifyClusterNodeId(argv[0], sdslen(argv[0])) == C_ERR) {
395 sdsfreesplitres(argv, argc);
396 goto fmterr;
397 }
398 n = clusterLookupNode(argv[0], sdslen(argv[0]));
399 if (!n) {
400 n = createClusterNode(argv[0],0);
401 clusterAddNode(n);
402 }
403 /* Format for the node address and auxiliary argument information:
404 * ip:port[@cport][,hostname][,aux=val]* */
405
406 aux_argv = sdssplitlen(argv[1], sdslen(argv[1]), ",", 1, &aux_argc);
407 if (aux_argv == NULL) {
408 sdsfreesplitres(argv,argc);
409 goto fmterr;
410 }
411
412 /* Hostname is an optional argument that defines the endpoint
413 * that can be reported to clients instead of IP. */
414 if (aux_argc > 1 && sdslen(aux_argv[1]) > 0) {
415 n->hostname = sdscpy(n->hostname, aux_argv[1]);
416 } else if (sdslen(n->hostname) != 0) {
417 sdsclear(n->hostname);
418 }
419
420 /* All fields after hostname are auxiliary and they take on
421 * the format of "aux=val" where both aux and val can contain
422 * characters that pass the isValidAuxChar check only. The order
423 * of the aux fields is insignificant. */
424 int aux_tcp_port = 0;
425 int aux_tls_port = 0;
426 int aux_shard_id = 0;
427 for (int i = 2; i < aux_argc; i++) {
428 int field_argc;
429 sds *field_argv;
430 field_argv = sdssplitlen(aux_argv[i], sdslen(aux_argv[i]), "=", 1, &field_argc);
431 if (field_argv == NULL || field_argc != 2) {
432 /* Invalid aux field format */
433 if (field_argv != NULL) sdsfreesplitres(field_argv, field_argc);
434 sdsfreesplitres(aux_argv, aux_argc);
435 sdsfreesplitres(argv,argc);
436 goto fmterr;
437 }
438
439 /* Validate that both aux and value contain valid characters only */
440 for (unsigned j = 0; j < 2; j++) {
441 if (!isValidAuxString(field_argv[j],sdslen(field_argv[j]))){
442 /* Invalid aux field format */
443 sdsfreesplitres(field_argv, field_argc);
444 sdsfreesplitres(aux_argv, aux_argc);
445 sdsfreesplitres(argv,argc);
446 goto fmterr;
447 }
448 }
449
450 /* Note that we don't expect lots of aux fields in the foreseeable
451 * future so a linear search is completely fine. */
452 int field_found = 0;
453 for (unsigned j = 0; j < numElements(auxFieldHandlers); j++) {
454 if (sdslen(field_argv[0]) != strlen(auxFieldHandlers[j].field) ||
455 memcmp(field_argv[0], auxFieldHandlers[j].field, sdslen(field_argv[0])) != 0) {
456 continue;
457 }
458 field_found = 1;
459 aux_shard_id |= j == af_shard_id;
460 aux_tcp_port |= j == af_tcp_port;
461 aux_tls_port |= j == af_tls_port;
462 if (auxFieldHandlers[j].setter(n, field_argv[1], sdslen(field_argv[1])) != C_OK) {
463 /* Invalid aux field format */
464 sdsfreesplitres(field_argv, field_argc);
465 sdsfreesplitres(aux_argv, aux_argc);
466 sdsfreesplitres(argv,argc);
467 goto fmterr;
468 }
469 }
470
471 if (field_found == 0) {
472 /* Invalid aux field format */
473 sdsfreesplitres(field_argv, field_argc);
474 sdsfreesplitres(aux_argv, aux_argc);
475 sdsfreesplitres(argv,argc);
476 goto fmterr;
477 }
478
479 sdsfreesplitres(field_argv, field_argc);
480 }
481 /* Address and port */
482 if ((p = strrchr(aux_argv[0],':')) == NULL) {
483 sdsfreesplitres(aux_argv, aux_argc);
484 sdsfreesplitres(argv,argc);
485 goto fmterr;
486 }
487 *p = '\0';
488 memcpy(n->ip,aux_argv[0],strlen(aux_argv[0])+1);
489 char *port = p+1;
490 char *busp = strchr(port,'@');
491 if (busp) {
492 *busp = '\0';
493 busp++;
494 }
495 /* If neither a TCP nor a TLS port is found in the aux fields, this is
496 * considered an old-format nodes.conf file. */
497 if (!aux_tcp_port && !aux_tls_port) {
498 if (server.tls_cluster) {
499 n->tls_port = atoi(port);
500 } else {
501 n->tcp_port = atoi(port);
502 }
503 } else if (!aux_tcp_port) {
504 n->tcp_port = atoi(port);
505 } else if (!aux_tls_port) {
506 n->tls_port = atoi(port);
507 }
508 /* In older versions of nodes.conf the "@busport" part is missing.
509 * In this case we set it to the default offset of 10000 from the
510 * base port. */
511 n->cport = busp ? atoi(busp) : (getNodeDefaultClientPort(n) + CLUSTER_PORT_INCR);
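        /* For example (hypothetical address), an old-format line with
         * "127.0.0.1:6379" and no "@busport" suffix yields
         * cport = 6379 + 10000 = 16379, CLUSTER_PORT_INCR being the
         * conventional 10000 offset. */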
512
513 /* The plaintext port for clients in a TLS cluster (n->pport) is not
514 * stored in nodes.conf. It is received later over the bus protocol. */
515
516 sdsfreesplitres(aux_argv, aux_argc);
517
518 /* Parse flags */
519 p = s = argv[2];
520 while(p) {
521 p = strchr(s,',');
522 if (p) *p = '\0';
523 if (!strcasecmp(s,"myself")) {
524 serverAssert(server.cluster->myself == NULL);
525 myself = server.cluster->myself = n;
526 n->flags |= CLUSTER_NODE_MYSELF;
527 } else if (!strcasecmp(s,"master")) {
528 n->flags |= CLUSTER_NODE_MASTER;
529 } else if (!strcasecmp(s,"slave")) {
530 n->flags |= CLUSTER_NODE_SLAVE;
531 } else if (!strcasecmp(s,"fail?")) {
532 n->flags |= CLUSTER_NODE_PFAIL;
533 } else if (!strcasecmp(s,"fail")) {
534 n->flags |= CLUSTER_NODE_FAIL;
535 n->fail_time = mstime();
536 } else if (!strcasecmp(s,"handshake")) {
537 n->flags |= CLUSTER_NODE_HANDSHAKE;
538 } else if (!strcasecmp(s,"noaddr")) {
539 n->flags |= CLUSTER_NODE_NOADDR;
540 } else if (!strcasecmp(s,"nofailover")) {
541 n->flags |= CLUSTER_NODE_NOFAILOVER;
542 } else if (!strcasecmp(s,"noflags")) {
543 /* nothing to do */
544 } else {
545 serverPanic("Unknown flag in redis cluster config file");
546 }
547 if (p) s = p+1;
548 }
549
550 /* Get master if any. Set the master and populate master's
551 * slave list. */
552 if (argv[3][0] != '-') {
553 if (verifyClusterNodeId(argv[3], sdslen(argv[3])) == C_ERR) {
554 sdsfreesplitres(argv, argc);
555 goto fmterr;
556 }
557 master = clusterLookupNode(argv[3], sdslen(argv[3]));
558 if (!master) {
559 master = createClusterNode(argv[3],0);
560 clusterAddNode(master);
561 }
562 /* shard_id can be absent if we are loading a nodes.conf generated
563 * by an older version of Redis;
564 * ignore replica's shard_id in the file, only use the primary's.
565 * If replica precedes primary in file, it will be corrected
566 * later by the auxShardIdSetter.
567 * Remove node from its old shard before adding it to the new one. */
568 if (aux_shard_id == 1) clusterRemoveNodeFromShard(n);
569 memcpy(n->shard_id, master->shard_id, CLUSTER_NAMELEN);
570 clusterAddNodeToShard(master->shard_id, n);
571 n->slaveof = master;
572 clusterNodeAddSlave(master,n);
573 } else if (aux_shard_id == 0) {
574 /* n is a primary but it does not have a persisted shard_id.
575 * This happens if we are loading a nodes.conf generated by
576 * an older version of Redis. We should manually update the
577 * shard membership in this case */
578 clusterAddNodeToShard(n->shard_id, n);
579 }
580
581 /* Set ping sent / pong received timestamps */
582 if (atoi(argv[4])) n->ping_sent = mstime();
583 if (atoi(argv[5])) n->pong_received = mstime();
584
585 /* Set configEpoch for this node.
586 * If the node is a replica, set its config epoch to 0.
587 * If it's a primary, load the config epoch from the configuration file. */
588 n->configEpoch = (nodeIsSlave(n) && n->slaveof) ? 0 : strtoull(argv[6],NULL,10);
589
590 /* Populate hash slots served by this instance. */
591 for (j = 8; j < argc; j++) {
592 int start, stop;
593
594 if (argv[j][0] == '[') {
595 /* Here we handle migrating / importing slots */
596 int slot;
597 char direction;
598 clusterNode *cn;
599
600 p = strchr(argv[j],'-');
601 serverAssert(p != NULL);
602 *p = '\0';
603 direction = p[1]; /* Either '>' or '<' */
604 slot = atoi(argv[j]+1);
605 if (slot < 0 || slot >= CLUSTER_SLOTS) {
606 sdsfreesplitres(argv,argc);
607 goto fmterr;
608 }
609 p += 3;
610
611 char *pr = strchr(p, ']');
612 size_t node_len = pr - p;
613 if (pr == NULL || verifyClusterNodeId(p, node_len) == C_ERR) {
614 sdsfreesplitres(argv, argc);
615 goto fmterr;
616 }
617 cn = clusterLookupNode(p, CLUSTER_NAMELEN);
618 if (!cn) {
619 cn = createClusterNode(p,0);
620 clusterAddNode(cn);
621 }
622 if (direction == '>') {
623 server.cluster->migrating_slots_to[slot] = cn;
624 } else {
625 server.cluster->importing_slots_from[slot] = cn;
626 }
627 continue;
628 } else if ((p = strchr(argv[j],'-')) != NULL) {
629 *p = '\0';
630 start = atoi(argv[j]);
631 stop = atoi(p+1);
632 } else {
633 start = stop = atoi(argv[j]);
634 }
635 if (start < 0 || start >= CLUSTER_SLOTS ||
636 stop < 0 || stop >= CLUSTER_SLOTS)
637 {
638 sdsfreesplitres(argv,argc);
639 goto fmterr;
640 }
641 while(start <= stop) clusterAddSlot(n, start++);
642 }
643
644 sdsfreesplitres(argv,argc);
645 }
646 /* Config sanity check */
647 if (server.cluster->myself == NULL) goto fmterr;
648 if (!(myself->flags & (CLUSTER_NODE_MASTER | CLUSTER_NODE_SLAVE))) goto fmterr;
649 if (nodeIsSlave(myself) && myself->slaveof == NULL) goto fmterr;
650
651 zfree(line);
652 fclose(fp);
653
654 serverLog(LL_NOTICE,"Node configuration loaded, I'm %.40s", myself->name);
655
656 /* Something that should never happen: currentEpoch smaller than
657 * the max epoch found in the nodes configuration. However we handle this
658 * as some form of protection against manual editing of critical files. */
659 if (clusterGetMaxEpoch() > server.cluster->currentEpoch) {
660 server.cluster->currentEpoch = clusterGetMaxEpoch();
661 }
662 return C_OK;
663
664fmterr:
665 serverLog(LL_WARNING,
666 "Unrecoverable error: corrupted cluster config file \"%s\".", line);
667 zfree(line);
668 if (fp) fclose(fp);
669 exit(1);
670}
671
672/* Cluster node configuration is exactly the same as CLUSTER NODES output.
673 *
674 * This function writes the node config and returns 0; on error -1
675 * is returned.
676 *
677 * Note: we need to write the file in an atomic way from the point of view
678 * of the POSIX filesystem semantics, so that if the server is stopped
679 * or crashes during the write, we'll end with either the old file or the
680 * new one. To achieve this we write the full payload to a temporary
681 * file, optionally fsync it, and then rename() it over the existing
682 * nodes.conf, so readers never observe a partially written
683 * configuration file. */
684int clusterSaveConfig(int do_fsync) {
685 sds ci,tmpfilename;
686 size_t content_size,offset = 0;
687 ssize_t written_bytes;
688 int fd = -1;
689 int retval = C_ERR;
690
691 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_SAVE_CONFIG;
692
693 /* Get the nodes description and concatenate our "vars" directive to
694 * save currentEpoch and lastVoteEpoch. */
695 ci = clusterGenNodesDescription(NULL, CLUSTER_NODE_HANDSHAKE, 0);
696 ci = sdscatprintf(ci,"vars currentEpoch %llu lastVoteEpoch %llu\n",
697 (unsigned long long) server.cluster->currentEpoch,
698 (unsigned long long) server.cluster->lastVoteEpoch);
699 content_size = sdslen(ci);
700
701 /* Create a temp file with the new content. */
702 tmpfilename = sdscatfmt(sdsempty(),"%s.tmp-%i-%I",
703 server.cluster_configfile,(int) getpid(),mstime());
704 if ((fd = open(tmpfilename,O_WRONLY|O_CREAT,0644)) == -1) {
705 serverLog(LL_WARNING,"Could not open temp cluster config file: %s",strerror(errno));
706 goto cleanup;
707 }
708
709 while (offset < content_size) {
710 written_bytes = write(fd,ci + offset,content_size - offset);
711 if (written_bytes <= 0) {
712 if (errno == EINTR) continue;
713 serverLog(LL_WARNING,"Failed after writing (%zd) bytes to tmp cluster config file: %s",
714 offset,strerror(errno));
715 goto cleanup;
716 }
717 offset += written_bytes;
718 }
719
720 if (do_fsync) {
721 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_FSYNC_CONFIG;
722 if (redis_fsync(fd) == -1) {
723 serverLog(LL_WARNING,"Could not sync tmp cluster config file: %s",strerror(errno));
724 goto cleanup;
725 }
726 }
727
728 if (rename(tmpfilename, server.cluster_configfile) == -1) {
729 serverLog(LL_WARNING,"Could not rename tmp cluster config file: %s",strerror(errno));
730 goto cleanup;
731 }
732
733 if (do_fsync) {
734 if (fsyncFileDir(server.cluster_configfile) == -1) {
735 serverLog(LL_WARNING,"Could not sync cluster config file dir: %s",strerror(errno));
736 goto cleanup;
737 }
738 }
739 retval = C_OK; /* If we reached this point, everything is fine. */
740
741cleanup:
742 if (fd != -1) close(fd);
743 if (retval) unlink(tmpfilename);
744 sdsfree(tmpfilename);
745 sdsfree(ci);
746 return retval;
747}
748
749void clusterSaveConfigOrDie(int do_fsync) {
750 if (clusterSaveConfig(do_fsync) == -1) {
751 serverLog(LL_WARNING,"Fatal: can't update cluster config file.");
752 exit(1);
753 }
754}
755
756/* Lock the cluster config using flock(), and retain the file descriptor used to
757 * acquire the lock so that the file will be locked as long as the process is up.
758 *
759 * This works because we always update nodes.conf with a new version
760 * in-place, reopening the file, and writing to it in place (later adjusting
761 * the length with ftruncate()).
762 *
763 * On success C_OK is returned, otherwise an error is logged and
764 * the function returns C_ERR to signal a lock was not acquired. */
765int clusterLockConfig(char *filename) {
766/* flock() does not exist on Solaris
767 * and a fcntl-based solution won't help, as we constantly re-open that file,
768 * which will release _all_ locks anyway
769 */
770#if !defined(__sun)
771 /* To lock it, we need to open the file in a way it is created if
772 * it does not exist, otherwise there is a race condition with other
773 * processes. */
774 int fd = open(filename,O_WRONLY|O_CREAT|O_CLOEXEC,0644);
775 if (fd == -1) {
776 serverLog(LL_WARNING,
777 "Can't open %s in order to acquire a lock: %s",
778 filename, strerror(errno));
779 return C_ERR;
780 }
781
782 if (flock(fd,LOCK_EX|LOCK_NB) == -1) {
783 if (errno == EWOULDBLOCK) {
784 serverLog(LL_WARNING,
785 "Sorry, the cluster configuration file %s is already used "
786 "by a different Redis Cluster node. Please make sure that "
787 "different nodes use different cluster configuration "
788 "files.", filename);
789 } else {
790 serverLog(LL_WARNING,
791 "Impossible to lock %s: %s", filename, strerror(errno));
792 }
793 close(fd);
794 return C_ERR;
795 }
796 /* Lock acquired: leak the 'fd' by not closing it until shutdown time, so that
797 * we'll retain the lock on the file as long as the process exists.
798 *
799 * After fork, the child process will get the fd opened by the parent process;
800 * we need to save `fd` to `cluster_config_file_lock_fd`, so that in redisFork(),
801 * it will be closed in the child process.
802 * If it is not closed, and the main process is killed with -9 while the child
803 * process (redis-aof-rewrite) is still alive, the child will keep holding the
804 * fd (and the lock), so a restarted main process cannot acquire the lock and will fail to start. */
805 server.cluster_config_file_lock_fd = fd;
806#else
807 UNUSED(filename);
808#endif /* __sun */
809
810 return C_OK;
811}
812
813/* Derives our ports to be announced in the cluster bus. */
814void deriveAnnouncedPorts(int *announced_tcp_port, int *announced_tls_port,
815 int *announced_cport) {
816 /* Config overriding announced ports. */
817 *announced_tcp_port = server.cluster_announce_port ?
818 server.cluster_announce_port : server.port;
819 *announced_tls_port = server.cluster_announce_tls_port ?
820 server.cluster_announce_tls_port : server.tls_port;
821 /* Derive cluster bus port. */
822 if (server.cluster_announce_bus_port) {
823 *announced_cport = server.cluster_announce_bus_port;
824 } else if (server.cluster_port) {
825 *announced_cport = server.cluster_port;
826 } else {
827 *announced_cport = defaultClientPort() + CLUSTER_PORT_INCR;
828 }
829}
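/* For example (assuming a non-TLS setup listening on port 6379 with no
 * cluster-announce-* overrides and no explicit cluster-port), this derives
 * an announced tcp_port of 6379 and a cport of 6379 + 10000 = 16379; setting
 * cluster-announce-bus-port 17000 would announce cport 17000 instead. */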
830
831/* Some flags (currently just the NOFAILOVER flag) may need to be updated
832 * in the "myself" node based on the current configuration of the node,
833 * that may change at runtime via CONFIG SET. This function changes the
834 * set of flags in myself->flags accordingly. */
835void clusterUpdateMyselfFlags(void) {
836 if (!myself) return;
837 int oldflags = myself->flags;
838 int nofailover = server.cluster_slave_no_failover ?
839 CLUSTER_NODE_NOFAILOVER : 0;
840 myself->flags &= ~CLUSTER_NODE_NOFAILOVER;
841 myself->flags |= nofailover;
842 if (myself->flags != oldflags) {
843 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
844 CLUSTER_TODO_UPDATE_STATE);
845 }
846}
847
848
849/* We want to keep myself->port/cport/pport in sync with the
850* cluster-announce-port/cluster-announce-bus-port/cluster-announce-tls-port option.
851* The option can be set at runtime via CONFIG SET. */
852void clusterUpdateMyselfAnnouncedPorts(void) {
853 if (!myself) return;
854 deriveAnnouncedPorts(&myself->tcp_port,&myself->tls_port,&myself->cport);
855}
856
857/* We want to keep myself->ip in sync with the cluster-announce-ip option.
858* The option can be set at runtime via CONFIG SET. */
859void clusterUpdateMyselfIp(void) {
860 if (!myself) return;
861 static char *prev_ip = NULL;
862 char *curr_ip = server.cluster_announce_ip;
863 int changed = 0;
864
865 if (prev_ip == NULL && curr_ip != NULL) changed = 1;
866 else if (prev_ip != NULL && curr_ip == NULL) changed = 1;
867 else if (prev_ip && curr_ip && strcmp(prev_ip,curr_ip)) changed = 1;
868
869 if (changed) {
870 if (prev_ip) zfree(prev_ip);
871 prev_ip = curr_ip;
872
873 if (curr_ip) {
874 /* We always take a copy of the previous IP address, by
875 * duplicating the string. This way later we can check if
876 * the address really changed. */
877 prev_ip = zstrdup(prev_ip);
878 redis_strlcpy(myself->ip,server.cluster_announce_ip,NET_IP_STR_LEN);
879 } else {
880 myself->ip[0] = '\0'; /* Force autodetection. */
881 }
882 }
883}
884
885/* Update the hostname for the specified node with the provided C string. */
886static void updateAnnouncedHostname(clusterNode *node, char *new) {
887 /* Previous and new hostname are the same, no need to update. */
888 if (new && !strcmp(new, node->hostname)) {
889 return;
890 } else if (!new && (sdslen(node->hostname) == 0)) {
891 return;
892 }
893
894 if (new) {
895 node->hostname = sdscpy(node->hostname, new);
896 } else if (sdslen(node->hostname) != 0) {
897 sdsclear(node->hostname);
898 }
899 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
900}
901
902static void updateAnnouncedHumanNodename(clusterNode *node, char *new) {
903 if (new && !strcmp(new, node->human_nodename)) {
904 return;
905 } else if (!new && (sdslen(node->human_nodename) == 0)) {
906 return;
907 }
908
909 if (new) {
910 node->human_nodename = sdscpy(node->human_nodename, new);
911 } else if (sdslen(node->human_nodename) != 0) {
912 sdsclear(node->human_nodename);
913 }
914 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
915}
916
917static void assignShardIdToNode(clusterNode *node, const char *shard_id, int flag) {
918 clusterRemoveNodeFromShard(node);
919 memcpy(node->shard_id, shard_id, CLUSTER_NAMELEN);
920 clusterAddNodeToShard(shard_id, node);
921 clusterDoBeforeSleep(flag);
922}
923
924static void updateShardId(clusterNode *node, const char *shard_id) {
925 if (shard_id && memcmp(node->shard_id, shard_id, CLUSTER_NAMELEN) != 0) {
926 /* We always make our best effort to keep the shard-id consistent
927 * between the master and its replicas:
928 *
929 * 1. When updating the master's shard-id, we simultaneously update the
930 * shard-id of all its replicas to ensure consistency.
931 * 2. When updating replica's shard-id, if it differs from its master's shard-id,
932 * we discard this replica's shard-id and continue using master's shard-id.
933 * This applies even if the master does not support shard-id, in which
934 * case we rely on the master's randomly generated shard-id. */
935 if (node->slaveof == NULL) {
936 assignShardIdToNode(node, shard_id, CLUSTER_TODO_SAVE_CONFIG);
937 for (int i = 0; i < clusterNodeNumSlaves(node); i++) {
938 clusterNode *slavenode = clusterNodeGetSlave(node, i);
939 if (memcmp(slavenode->shard_id, shard_id, CLUSTER_NAMELEN) != 0)
940 assignShardIdToNode(slavenode, shard_id, CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG);
941 }
942 } else if (memcmp(node->slaveof->shard_id, shard_id, CLUSTER_NAMELEN) == 0) {
943 assignShardIdToNode(node, shard_id, CLUSTER_TODO_SAVE_CONFIG);
944 }
945 }
946}
947
948/* Update my hostname based on server configuration values */
949void clusterUpdateMyselfHostname(void) {
950 if (!myself) return;
951 updateAnnouncedHostname(myself, server.cluster_announce_hostname);
952}
953
954void clusterUpdateMyselfHumanNodename(void) {
955 if (!myself) return;
956 updateAnnouncedHumanNodename(myself, server.cluster_announce_human_nodename);
957}
958
959void clusterInit(void) {
960 int saveconf = 0;
961
962 server.cluster = zmalloc(sizeof(struct clusterState));
963 server.cluster->myself = NULL;
964 server.cluster->currentEpoch = 0;
965 server.cluster->state = CLUSTER_FAIL;
966 server.cluster->size = 0;
967 server.cluster->todo_before_sleep = 0;
968 server.cluster->nodes = dictCreate(&clusterNodesDictType);
969 server.cluster->shards = dictCreate(&clusterSdsToListType);
970 server.cluster->nodes_black_list =
971 dictCreate(&clusterNodesBlackListDictType);
972 server.cluster->failover_auth_time = 0;
973 server.cluster->failover_auth_count = 0;
974 server.cluster->failover_auth_rank = 0;
975 server.cluster->failover_auth_epoch = 0;
976 server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
977 server.cluster->lastVoteEpoch = 0;
978
979 /* Initialize stats */
980 for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
981 server.cluster->stats_bus_messages_sent[i] = 0;
982 server.cluster->stats_bus_messages_received[i] = 0;
983 }
984 server.cluster->stats_pfail_nodes = 0;
985 server.cluster->stat_cluster_links_buffer_limit_exceeded = 0;
986
987 memset(server.cluster->slots,0, sizeof(server.cluster->slots));
988 clusterCloseAllSlots();
989
990 memset(server.cluster->owner_not_claiming_slot, 0, sizeof(server.cluster->owner_not_claiming_slot));
991
992 /* Lock the cluster config file to make sure every node uses
993 * its own nodes.conf. */
994 server.cluster_config_file_lock_fd = -1;
995 if (clusterLockConfig(server.cluster_configfile) == C_ERR)
996 exit(1);
997
998 /* Load or create a new nodes configuration. */
999 if (clusterLoadConfig(server.cluster_configfile) == C_ERR) {
1000 /* No configuration found. We will just use the random name provided
1001 * by the createClusterNode() function. */
1002 myself = server.cluster->myself =
1003 createClusterNode(NULL,CLUSTER_NODE_MYSELF|CLUSTER_NODE_MASTER);
1004 serverLog(LL_NOTICE,"No cluster configuration found, I'm %.40s",
1005 myself->name);
1006 clusterAddNode(myself);
1007 clusterAddNodeToShard(myself->shard_id, myself);
1008 saveconf = 1;
1009 }
1010 if (saveconf) clusterSaveConfigOrDie(1);
1011
1012 /* Port sanity check II
1013 * The other handshake port check is triggered too late to stop
1014 * us from trying to use a too-high cluster port number. */
1015 int port = defaultClientPort();
1016 if (!server.cluster_port && port > (65535-CLUSTER_PORT_INCR)) {
1017 serverLog(LL_WARNING, "Redis port number too high. "
1018 "Cluster communication port is 10,000 port "
1019 "numbers higher than your Redis port. "
1020 "Your Redis port number must be 55535 or less.");
1021 exit(1);
1022 }
1023 if (!server.bindaddr_count) {
1024 serverLog(LL_WARNING, "No bind address is configured, but it is required for the Cluster bus.");
1025 exit(1);
1026 }
1027
1028 /* Set myself->port/cport/pport to my listening ports, we'll just need to
1029 * discover the IP address via MEET messages. */
1030 deriveAnnouncedPorts(&myself->tcp_port, &myself->tls_port, &myself->cport);
1031
1032 server.cluster->mf_end = 0;
1033 server.cluster->mf_slave = NULL;
1034 resetManualFailover();
1035 clusterUpdateMyselfFlags();
1036 clusterUpdateMyselfIp();
1037 clusterUpdateMyselfHostname();
1038 clusterUpdateMyselfHumanNodename();
1039
1040 getRandomHexChars(server.cluster->internal_secret, CLUSTER_INTERNALSECRETLEN);
1041}
1042
1043void clusterInitLast(void) {
1044 if (connectionIndexByType(connTypeOfCluster()->get_type(NULL)) < 0) {
1045 serverLog(LL_WARNING, "Missing connection type %s, but it is required for the Cluster bus.", connTypeOfCluster()->get_type(NULL));
1046 exit(1);
1047 }
1048
1049 int port = defaultClientPort();
1050 connListener *listener = &server.clistener;
1051 listener->count = 0;
1052 listener->bindaddr = server.bindaddr;
1053 listener->bindaddr_count = server.bindaddr_count;
1054 listener->port = server.cluster_port ? server.cluster_port : port + CLUSTER_PORT_INCR;
1055 listener->ct = connTypeOfCluster();
1056 if (connListen(listener) == C_ERR ) {
1057 /* Note: the following log text is matched by the test suite. */
1058 serverLog(LL_WARNING, "Failed listening on port %u (cluster), aborting.", listener->port);
1059 exit(1);
1060 }
1061
1062 if (createSocketAcceptHandler(&server.clistener, clusterAcceptHandler) != C_OK) {
1063 serverPanic("Unrecoverable error creating Redis Cluster socket accept handler.");
1064 }
1065}
1066
1067/* Reset a node performing a soft or hard reset:
1068 *
1069 * 1) All other nodes are forgotten.
1070 * 2) All the assigned / open slots are released.
1071 * 3) If the node is a slave, it turns into a master.
1072 * 4) Only for hard reset: a new Node ID is generated.
1073 * 5) Only for hard reset: currentEpoch and configEpoch are set to 0.
1074 * 6) The new configuration is saved and the cluster state updated.
1075 * 7) If the node was a slave, the whole data set is flushed away. */
1076void clusterReset(int hard) {
1077 dictIterator di;
1078 dictEntry *de;
1079 int j;
1080
1081 /* Turn into master. */
1082 if (nodeIsSlave(myself)) {
1083 asmFinalizeMasterTask();
1084 clusterSetNodeAsMaster(myself);
1085 replicationUnsetMaster();
1086 emptyData(-1,EMPTYDB_NO_FLAGS,NULL);
1087 }
1088
1089 /* Close slots, reset manual failover state. */
1090 clusterCloseAllSlots();
1091 resetManualFailover();
1092
1093 /* Cancel all ASM tasks */
1094 clusterAsmCancel(NULL, "CLUSTER RESET");
1095 asmCancelTrimJobs();
1096
1097 /* Unassign all the slots. */
1098 for (j = 0; j < CLUSTER_SLOTS; j++) clusterDelSlot(j);
1099
1100 /* Recreate shards dict */
1101 dictEmpty(server.cluster->shards, NULL);
1102
1103 /* Forget all the nodes, but myself. */
1104 dictInitSafeIterator(&di, server.cluster->nodes);
1105 while((de = dictNext(&di)) != NULL) {
1106 clusterNode *node = dictGetVal(de);
1107
1108 if (node == myself) continue;
1109 clusterDelNode(node);
1110 }
1111 dictResetIterator(&di);
1112
1113 /* Empty the nodes blacklist. */
1114 dictEmpty(server.cluster->nodes_black_list, NULL);
1115
1116 /* Hard reset only: set epochs to 0, change node ID. */
1117 if (hard) {
1118 sds oldname;
1119
1120 server.cluster->currentEpoch = 0;
1121 server.cluster->lastVoteEpoch = 0;
1122 myself->configEpoch = 0;
1123 serverLog(LL_NOTICE, "configEpoch set to 0 via CLUSTER RESET HARD");
1124
1125 /* To change the Node ID we need to remove the old name from the
1126 * nodes table, change the ID, and re-add back with new name. */
1127 oldname = sdsnewlen(myself->name, CLUSTER_NAMELEN);
1128 dictDelete(server.cluster->nodes,oldname);
1129 sdsfree(oldname);
1130 getRandomHexChars(myself->name, CLUSTER_NAMELEN);
1131 getRandomHexChars(myself->shard_id, CLUSTER_NAMELEN);
1132 clusterAddNode(myself);
1133 serverLog(LL_NOTICE,"Node hard reset, now I'm %.40s", myself->name);
1134 }
1135
1136 /* Re-populate shards */
1137 clusterAddNodeToShard(myself->shard_id, myself);
1138
1139 /* Make sure to persist the new config and update the state. */
1140 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1141 CLUSTER_TODO_UPDATE_STATE|
1142 CLUSTER_TODO_FSYNC_CONFIG);
1143}
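/* For illustration, this function is reached via the CLUSTER RESET command,
 * e.g.:
 *
 *   redis-cli -p 6379 CLUSTER RESET SOFT    (steps 1-3 and 6-7 above)
 *   redis-cli -p 6379 CLUSTER RESET HARD    (additionally steps 4-5: new node
 *                                            ID, epochs reset to 0)
 */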
1144
1145/* -----------------------------------------------------------------------------
1146 * CLUSTER communication link
1147 * -------------------------------------------------------------------------- */
1148static clusterMsgSendBlock *createClusterMsgSendBlock(int type, uint32_t msglen) {
1149 uint32_t blocklen = msglen + sizeof(clusterMsgSendBlock);
1150 clusterMsgSendBlock *msgblock = zcalloc(blocklen);
1151 msgblock->refcount = 1;
1152 msgblock->totlen = blocklen;
1153 server.stat_cluster_links_memory += blocklen;
1154 clusterBuildMessageHdr(getMessageFromSendBlock(msgblock),type,msglen);
1155 return msgblock;
1156}
1157
1158static void clusterMsgSendBlockDecrRefCount(void *node) {
1159 clusterMsgSendBlock *msgblock = (clusterMsgSendBlock*)node;
1160 msgblock->refcount--;
1161 serverAssert(msgblock->refcount >= 0);
1162 if (msgblock->refcount == 0) {
1163 server.stat_cluster_links_memory -= msgblock->totlen;
1164 zfree(msgblock);
1165 }
1166}
1167
1168clusterLink *createClusterLink(clusterNode *node) {
1169 clusterLink *link = zmalloc(sizeof(*link));
1170 link->ctime = mstime();
1171 link->send_msg_queue = listCreate();
1172 listSetFreeMethod(link->send_msg_queue, clusterMsgSendBlockDecrRefCount);
1173 link->head_msg_send_offset = 0;
1174 link->send_msg_queue_mem = sizeof(list);
1175 link->rcvbuf = zmalloc(link->rcvbuf_alloc = RCVBUF_INIT_LEN);
1176 link->rcvbuf_len = 0;
1177 server.stat_cluster_links_memory += link->rcvbuf_alloc + link->send_msg_queue_mem;
1178 link->conn = NULL;
1179 link->node = node;
1180 /* Related node can only possibly be known at link creation time if this is an outbound link */
1181 link->inbound = (node == NULL);
1182 if (!link->inbound) {
1183 node->link = link;
1184 }
1185 return link;
1186}
1187
1188/* Free a cluster link; this does not, of course, free the associated node.
1189 * This function will just make sure that the original node associated
1190 * with this link will have the 'link' field set to NULL. */
1191void freeClusterLink(clusterLink *link) {
1192 if (link->conn) {
1193 connClose(link->conn);
1194 link->conn = NULL;
1195 }
1196 server.stat_cluster_links_memory -= sizeof(list) + listLength(link->send_msg_queue)*sizeof(listNode);
1197 listRelease(link->send_msg_queue);
1198 server.stat_cluster_links_memory -= link->rcvbuf_alloc;
1199 zfree(link->rcvbuf);
1200 if (link->node) {
1201 if (link->node->link == link) {
1202 serverAssert(!link->inbound);
1203 link->node->link = NULL;
1204 } else if (link->node->inbound_link == link) {
1205 serverAssert(link->inbound);
1206 link->node->inbound_link = NULL;
1207 }
1208 }
1209 zfree(link);
1210}
1211
1212void setClusterNodeToInboundClusterLink(clusterNode *node, clusterLink *link) {
1213 serverAssert(!link->node);
1214 serverAssert(link->inbound);
1215 if (node->inbound_link) {
1216 /* A peer may disconnect and then reconnect with us, and it's not guaranteed that
1217 * we would always process the disconnection of the existing inbound link before
1218 * accepting a new inbound link. Therefore, it's possible to have more than
1219 * one inbound link from the same node at the same time. Our cleanup logic assumes
1220 * a one to one relationship between nodes and inbound links, so we need to kill
1221 * one of the links. The existing link is more likely the outdated one, but it's
1222 * possible the other node may need to open another link. */
1223 serverLog(LL_DEBUG, "Replacing inbound link fd %d from node %.40s with fd %d",
1224 node->inbound_link->conn->fd, node->name, link->conn->fd);
1225 freeClusterLink(node->inbound_link);
1226 }
1227 serverAssert(!node->inbound_link);
1228 node->inbound_link = link;
1229 link->node = node;
1230}
1231
1232static void clusterConnAcceptHandler(connection *conn) {
1233 clusterLink *link;
1234
1235 if (connGetState(conn) != CONN_STATE_CONNECTED) {
1236 serverLog(LL_VERBOSE,
1237 "Error accepting cluster node connection: %s", connGetLastError(conn));
1238 connClose(conn);
1239 return;
1240 }
1241
1242 /* Create a link object we use to handle the connection.
1243 * It gets passed to the readable handler when data is available.
1244 * Initially the link->node pointer is set to NULL as we don't know
1245 * which node it is, but the right node is referenced once we know the
1246 * node identity. */
1247 link = createClusterLink(NULL);
1248 link->conn = conn;
1249 connSetPrivateData(conn, link);
1250
1251 /* Register read handler */
1252 connSetReadHandler(conn, clusterReadHandler);
1253}
1254
1255#define MAX_CLUSTER_ACCEPTS_PER_CALL 1000
1256void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1257 int cport, cfd;
1258 int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
1259 char cip[NET_IP_STR_LEN];
1260 int require_auth = TLS_CLIENT_AUTH_YES;
1261 UNUSED(el);
1262 UNUSED(mask);
1263 UNUSED(privdata);
1264
1265 /* If the server is starting up, don't accept cluster connections:
1266 * UPDATE messages may interact with the database content. */
1267 if (server.masterhost == NULL && server.loading) return;
1268
1269 while(max--) {
1270 cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
1271 if (cfd == ANET_ERR) {
1272 if (anetAcceptFailureNeedsRetry(errno))
1273 continue;
1274 if (errno != EWOULDBLOCK)
1275 serverLog(LL_VERBOSE,
1276 "Error accepting cluster node: %s", server.neterr);
1277 return;
1278 }
1279
1280 connection *conn = connCreateAccepted(server.el, connTypeOfCluster(), cfd, &require_auth);
1281
1282 /* Make sure connection is not in an error state */
1283 if (connGetState(conn) != CONN_STATE_ACCEPTING) {
1284 serverLog(LL_VERBOSE,
1285 "Error creating an accepting connection for cluster node: %s",
1286 connGetLastError(conn));
1287 connClose(conn);
1288 return;
1289 }
1290 connEnableTcpNoDelay(conn);
1291 connKeepAlive(conn,server.cluster_node_timeout / 1000 * 2);
1292
1293 /* Use non-blocking I/O for cluster messages. */
1294 serverLog(LL_VERBOSE,"Accepting cluster node connection from %s:%d", cip, cport);
1295
1296 /* Accept the connection now. connAccept() may call our handler directly
1297 * or schedule it for later depending on connection implementation.
1298 */
1299 if (connAccept(conn, clusterConnAcceptHandler) == C_ERR) {
1300 if (connGetState(conn) == CONN_STATE_ERROR)
1301 serverLog(LL_VERBOSE,
1302 "Error accepting cluster node connection: %s",
1303 connGetLastError(conn));
1304 connClose(conn);
1305 return;
1306 }
1307 }
1308}
1309
1310/* Return the approximate number of sockets we are using for the
1311 * cluster bus connections. */
1312unsigned long getClusterConnectionsCount(void) {
1313 /* We decrement the number of nodes by one, since there is the
1314 * "myself" node too in the list. Each node uses two file descriptors,
1315 * one incoming and one outgoing, thus the multiplication by 2. */
1316 return server.cluster_enabled ?
1317 ((dictSize(server.cluster->nodes)-1)*2) : 0;
1318}
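/* For example, in a 6-node cluster this reports (6 - 1) * 2 = 10 cluster bus
 * sockets on each node. */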
1319
1320/* -----------------------------------------------------------------------------
1321 * CLUSTER node API
1322 * -------------------------------------------------------------------------- */
1323
1324/* Create a new cluster node, with the specified flags.
1325 * If "nodename" is NULL this is considered a first handshake and a random
1326 * node name is assigned to this node (it will be fixed later when we
1327 * receive the first pong).
1328 *
1329 * The node is created and returned to the user, but it is not automatically
1330 * added to the nodes hash table. */
1331clusterNode *createClusterNode(char *nodename, int flags) {
1332 clusterNode *node = zmalloc(sizeof(*node));
1333
1334 if (nodename)
1335 memcpy(node->name, nodename, CLUSTER_NAMELEN);
1336 else
1337 getRandomHexChars(node->name, CLUSTER_NAMELEN);
1338 getRandomHexChars(node->shard_id, CLUSTER_NAMELEN);
1339 node->ctime = mstime();
1340 node->configEpoch = 0;
1341 node->flags = flags;
1342 memset(node->slots,0,sizeof(node->slots));
1343 node->slot_info_pairs = NULL;
1344 node->slot_info_pairs_count = 0;
1345 node->numslots = 0;
1346 node->numslaves = 0;
1347 node->slaves = NULL;
1348 node->slaveof = NULL;
1349 node->last_in_ping_gossip = 0;
1350 node->ping_sent = node->pong_received = 0;
1351 node->data_received = 0;
1352 node->fail_time = 0;
1353 node->link = NULL;
1354 node->inbound_link = NULL;
1355 memset(node->ip,0,sizeof(node->ip));
1356 node->hostname = sdsempty();
1357 node->human_nodename = sdsempty();
1358 node->tcp_port = 0;
1359 node->cport = 0;
1360 node->tls_port = 0;
1361 node->fail_reports = listCreate();
1362 node->voted_time = 0;
1363 node->orphaned_time = 0;
1364 node->repl_offset_time = 0;
1365 node->repl_offset = 0;
1366 listSetFreeMethod(node->fail_reports,zfree);
1367 return node;
1368}
1369
1370/* This function is called every time we get a failure report from a node.
1371 * The side effect is to populate the fail_reports list (or to update
1372 * the timestamp of an existing report).
1373 *
1374 * 'failing' is the node that is in failure state according to the
1375 * 'sender' node.
1376 *
1377 * The function returns 0 if it just updates a timestamp of an existing
1378 * failure report from the same sender. 1 is returned if a new failure
1379 * report is created. */
1380int clusterNodeAddFailureReport(clusterNode *failing, clusterNode *sender) {
1381 list *l = failing->fail_reports;
1382 listNode *ln;
1383 listIter li;
1384 clusterNodeFailReport *fr;
1385
1386 /* If a failure report from the same sender already exists, just update
1387 * the timestamp. */
1388 listRewind(l,&li);
1389 while ((ln = listNext(&li)) != NULL) {
1390 fr = ln->value;
1391 if (fr->node == sender) {
1392 fr->time = mstime();
1393 return 0;
1394 }
1395 }
1396
1397 /* Otherwise create a new report. */
1398 fr = zmalloc(sizeof(*fr));
1399 fr->node = sender;
1400 fr->time = mstime();
1401 listAddNodeTail(l,fr);
1402 return 1;
1403}
1404
1405/* Remove failure reports that are too old, where too old means reasonably
1406 * older than the global node timeout. Note that anyway for a node to be
1407 * flagged as FAIL we need to have a local PFAIL state that is at least
1408 * older than the global node timeout, so we don't just trust the number
1409 * of failure reports from other nodes. */
1410void clusterNodeCleanupFailureReports(clusterNode *node) {
1411 list *l = node->fail_reports;
1412 listNode *ln;
1413 listIter li;
1414 clusterNodeFailReport *fr;
1415 mstime_t maxtime = server.cluster_node_timeout *
1416 CLUSTER_FAIL_REPORT_VALIDITY_MULT;
1417 mstime_t now = mstime();
1418
1419 listRewind(l,&li);
1420 while ((ln = listNext(&li)) != NULL) {
1421 fr = ln->value;
1422 if (now - fr->time > maxtime) listDelNode(l,ln);
1423 }
1424}
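/* For a concrete sense of scale: assuming the default cluster-node-timeout of
 * 15000 ms and a CLUSTER_FAIL_REPORT_VALIDITY_MULT of 2 (see cluster_legacy.h),
 * clusterNodeCleanupFailureReports() purges reports older than 30 seconds. */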
1425
1426/* Remove the failing report for 'node' if it was previously considered
1427 * failing by 'sender'. This function is called when a node informs us via
1428 * gossip that a node is OK from its point of view (no FAIL or PFAIL flags).
1429 *
1430 * Note that this function is called relatively often as it gets called even
1431 * when there are no nodes failing, and is O(N), however when the cluster is
1432 * fine the failure reports list is empty so the function runs in constant
1433 * time.
1434 *
1435 * The function returns 1 if the failure report was found and removed.
1436 * Otherwise 0 is returned. */
1437int clusterNodeDelFailureReport(clusterNode *node, clusterNode *sender) {
1438 list *l = node->fail_reports;
1439 listNode *ln;
1440 listIter li;
1441 clusterNodeFailReport *fr;
1442
1443 /* Search for a failure report from this sender. */
1444 listRewind(l,&li);
1445 while ((ln = listNext(&li)) != NULL) {
1446 fr = ln->value;
1447 if (fr->node == sender) break;
1448 }
1449 if (!ln) return 0; /* No failure report from this sender. */
1450
1451 /* Remove the failure report. */
1452 listDelNode(l,ln);
1453 clusterNodeCleanupFailureReports(node);
1454 return 1;
1455}
1456
1457/* Return the number of external nodes that believe 'node' is failing.
1458 * The count does not include this node itself, which may also have a
1459 * PFAIL or FAIL state for 'node'. */
1460int clusterNodeFailureReportsCount(clusterNode *node) {
1461 clusterNodeCleanupFailureReports(node);
1462 return listLength(node->fail_reports);
1463}
1464
1465int clusterNodeRemoveSlave(clusterNode *master, clusterNode *slave) {
1466 int j;
1467
1468 for (j = 0; j < master->numslaves; j++) {
1469 if (master->slaves[j] == slave) {
1470 if ((j+1) < master->numslaves) {
1471 int remaining_slaves = (master->numslaves - j) - 1;
1472 memmove(master->slaves+j,master->slaves+(j+1),
1473 (sizeof(*master->slaves) * remaining_slaves));
1474 }
1475 master->numslaves--;
1476 if (master->numslaves == 0)
1477 master->flags &= ~CLUSTER_NODE_MIGRATE_TO;
1478 return C_OK;
1479 }
1480 }
1481 return C_ERR;
1482}
1483
1484int clusterNodeAddSlave(clusterNode *master, clusterNode *slave) {
1485 int j;
1486
1487 /* If it's already a slave, don't add it again. */
1488 for (j = 0; j < master->numslaves; j++)
1489 if (master->slaves[j] == slave) return C_ERR;
1490 master->slaves = zrealloc(master->slaves,
1491 sizeof(clusterNode*)*(master->numslaves+1));
1492 master->slaves[master->numslaves] = slave;
1493 master->numslaves++;
1494 master->flags |= CLUSTER_NODE_MIGRATE_TO;
1495 return C_OK;
1496}
1497
1498int clusterCountNonFailingSlaves(clusterNode *n) {
1499 int j, okslaves = 0;
1500
1501 for (j = 0; j < n->numslaves; j++)
1502 if (!nodeFailed(n->slaves[j])) okslaves++;
1503 return okslaves;
1504}
1505
1506/* Low level cleanup of the node structure. Only called by clusterDelNode(). */
1507void freeClusterNode(clusterNode *n) {
1508 sds nodename;
1509 int j;
1510
1511 /* If the node has associated slaves, we have to set
1512 * all the slaves->slaveof fields to NULL (unknown). */
1513 for (j = 0; j < n->numslaves; j++)
1514 n->slaves[j]->slaveof = NULL;
1515
1516 /* Remove this node from the list of slaves of its master. */
1517 if (nodeIsSlave(n) && n->slaveof) clusterNodeRemoveSlave(n->slaveof,n);
1518
1519 /* Unlink from the set of nodes. */
1520 nodename = sdsnewlen(n->name, CLUSTER_NAMELEN);
1521 serverAssert(dictDelete(server.cluster->nodes,nodename) == DICT_OK);
1522 sdsfree(nodename);
1523 sdsfree(n->hostname);
1524 sdsfree(n->human_nodename);
1525
1526 /* Release links and associated data structures. */
1527 if (n->link) freeClusterLink(n->link);
1528 if (n->inbound_link) freeClusterLink(n->inbound_link);
1529 listRelease(n->fail_reports);
1530 zfree(n->slaves);
1531 zfree(n);
1532}
1533
1534/* Add a node to the nodes hash table */
1535void clusterAddNode(clusterNode *node) {
1536 int retval;
1537
1538 retval = dictAdd(server.cluster->nodes,
1539 sdsnewlen(node->name,CLUSTER_NAMELEN), node);
1540 serverAssert(retval == DICT_OK);
1541}
1542
1543/* Remove a node from the cluster. The function performs the high level
1544 * cleanup, calling freeClusterNode() for the low level cleanup.
1545 * Here we do the following:
1546 *
1547 * 1) Mark all the slots handled by it as unassigned.
1548 * 2) Remove all the failure reports sent by this node and referenced by
1549 * other nodes.
1550 * 3) Remove the node from the owning shard
1551 * 4) Cancel all ASM tasks that involve the node.
1552 * 5) Free the node with freeClusterNode() that will in turn remove it
1553 * from the hash table and from the list of slaves of its master, if
1554 * it is a slave node.
1555 */
1556void clusterDelNode(clusterNode *delnode) {
1557 int j;
1558 dictIterator di;
1559 dictEntry *de;
1560
1561 /* 1) Mark slots as unassigned. */
1562 for (j = 0; j < CLUSTER_SLOTS; j++) {
1563 if (server.cluster->importing_slots_from[j] == delnode)
1564 server.cluster->importing_slots_from[j] = NULL;
1565 if (server.cluster->migrating_slots_to[j] == delnode)
1566 server.cluster->migrating_slots_to[j] = NULL;
1567 if (server.cluster->slots[j] == delnode)
1568 clusterDelSlot(j);
1569 }
1570
1571 /* 2) Remove failure reports. */
1572 dictInitSafeIterator(&di, server.cluster->nodes);
1573 while((de = dictNext(&di)) != NULL) {
1574 clusterNode *node = dictGetVal(de);
1575
1576 if (node == delnode) continue;
1577 clusterNodeDelFailureReport(node,delnode);
1578 }
1579 dictResetIterator(&di);
1580
1581 /* 3) Remove the node from the owning shard */
1582 clusterRemoveNodeFromShard(delnode);
1583
1584 /* 4) Cancel all ASM tasks that involve the node. */
1585 clusterAsmCancelByNode(delnode, "node deleted");
1586
1587 /* 5) Free the node, unlinking it from the cluster. */
1588 freeClusterNode(delnode);
1589}
1590
1591/* Node lookup by name */
1592clusterNode *clusterLookupNode(const char *name, int length) {
1593 if (verifyClusterNodeId(name, length) != C_OK) return NULL;
1594 sds s = sdsnewlen(name, length);
1595 dictEntry *de = dictFind(server.cluster->nodes, s);
1596 sdsfree(s);
1597 if (de == NULL) return NULL;
1598 return dictGetVal(de);
1599}
1600
1601const char *clusterGetSecret(size_t *len) {
1602 if (!server.cluster) {
1603 return NULL;
1604 }
1605 *len = CLUSTER_INTERNALSECRETLEN;
1606 return server.cluster->internal_secret;
1607}
1608
1609/* Get all the nodes in my shard.
1610 * Note that the list returned is not computed on the fly
1611 * via slaveof; rather, it is maintained permanently to
1612 * track the shard membership and its life cycle is tied
1613 * to this Redis process. Therefore, the caller must not
1614 * release the list. */
1615list *clusterGetNodesInMyShard(clusterNode *node) {
1616 sds s = sdsnewlen(node->shard_id, CLUSTER_NAMELEN);
1617 dictEntry *de = dictFind(server.cluster->shards,s);
1618 sdsfree(s);
1619 return (de != NULL) ? dictGetVal(de) : NULL;
1620}
1621
1622/* This is only used after the handshake. When we connect a given IP/PORT
1623 * as a result of CLUSTER MEET we don't have the node name yet, so we
1624 * pick a random one, and fix it when we receive the PONG reply, using
1625 * this function. */
1626void clusterRenameNode(clusterNode *node, char *newname) {
1627 int retval;
1628 sds s = sdsnewlen(node->name, CLUSTER_NAMELEN);
1629
1630 serverLog(LL_DEBUG,"Renaming node %.40s into %.40s",
1631 node->name, newname);
1632 retval = dictDelete(server.cluster->nodes, s);
1633 sdsfree(s);
1634 serverAssert(retval == DICT_OK);
1635 memcpy(node->name, newname, CLUSTER_NAMELEN);
1636 clusterAddNode(node);
1637 clusterAddNodeToShard(node->shard_id, node);
1638}
1639
1640void clusterAddNodeToShard(const char *shard_id, clusterNode *node) {
1641 sds s = sdsnewlen(shard_id, CLUSTER_NAMELEN);
1642 dictEntry *de = dictFind(server.cluster->shards,s);
1643 if (de == NULL) {
1644 list *l = listCreate();
1645 listAddNodeTail(l, node);
1646 serverAssert(dictAdd(server.cluster->shards, s, l) == DICT_OK);
1647 } else {
1648 list *l = dictGetVal(de);
1649 if (listSearchKey(l, node) == NULL) {
1650 listAddNodeTail(l, node);
1651 }
1652 sdsfree(s);
1653 }
1654}
1655
1656void clusterRemoveNodeFromShard(clusterNode *node) {
1657 sds s = sdsnewlen(node->shard_id, CLUSTER_NAMELEN);
1658 dictEntry *de = dictFind(server.cluster->shards, s);
1659 if (de != NULL) {
1660 list *l = dictGetVal(de);
1661 listNode *ln = listSearchKey(l, node);
1662 if (ln != NULL) {
1663 listDelNode(l, ln);
1664 }
1665 if (listLength(l) == 0) {
1666 dictDelete(server.cluster->shards, s);
1667 }
1668 }
1669 sdsfree(s);
1670}
1671
1672/* -----------------------------------------------------------------------------
1673 * CLUSTER config epoch handling
1674 * -------------------------------------------------------------------------- */
1675
1676/* Return the greatest configEpoch found in the cluster, or the current
1677 * epoch if greater than any node configEpoch. */
1678uint64_t clusterGetMaxEpoch(void) {
1679 uint64_t max = 0;
1680 dictIterator di;
1681 dictEntry *de;
1682
1683 dictInitSafeIterator(&di, server.cluster->nodes);
1684 while((de = dictNext(&di)) != NULL) {
1685 clusterNode *node = dictGetVal(de);
1686 if (node->configEpoch > max) max = node->configEpoch;
1687 }
1688 dictResetIterator(&di);
1689 if (max < server.cluster->currentEpoch) max = server.cluster->currentEpoch;
1690 return max;
1691}
1692
1693/* If this node epoch is zero or is not already the greatest across the
1694 * cluster (from the POV of the local configuration), this function will:
1695 *
1696 * 1) Generate a new config epoch, incrementing the current epoch.
1697 * 2) Assign the new epoch to this node, WITHOUT any consensus.
1698 * 3) Persist the configuration on disk before sending packets with the
1699 * new configuration.
1700 *
1701 * If the new config epoch is generated and assigned, C_OK is returned,
1702 * otherwise C_ERR is returned (since the node already has the greatest
1703 * configuration around) and no operation is performed.
1704 *
1705 * Important note: this function violates the principle that config epochs
1706 * should be generated with consensus and should be unique across the cluster.
1707 * However Redis Cluster uses these auto-generated config epochs in two
1708 * cases:
1709 *
1710 * 1) When slots are closed after importing. Otherwise resharding would be
1711 * too expensive.
1712 * 2) When CLUSTER FAILOVER is called with options that force a slave to
1713 * failover its master even if there is no master majority able to
1714 * create a new configuration epoch.
1715 *
1716 * Redis Cluster will not explode using this function, even in the case of
1717 * a collision between this node and another node, generating the same
1718 * configuration epoch unilaterally, because the config epoch conflict
1719 * resolution algorithm will eventually move colliding nodes to different
1720 * config epochs. However using this function may violate the "last failover
1721 * wins" rule, so it should only be used with care. */
1722int clusterBumpConfigEpochWithoutConsensus(void) {
1723 uint64_t maxEpoch = clusterGetMaxEpoch();
1724
1725 if (myself->configEpoch == 0 ||
1726 myself->configEpoch != maxEpoch)
1727 {
1728 server.cluster->currentEpoch++;
1729 myself->configEpoch = server.cluster->currentEpoch;
1730 /* Save the new config epoch and broadcast it to the other nodes. */
1731 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
1732 CLUSTER_TODO_FSYNC_CONFIG|
1733 CLUSTER_TODO_BROADCAST_PONG);
1734 serverLog(LL_NOTICE,
1735 "New configEpoch set to %llu",
1736 (unsigned long long) myself->configEpoch);
1737 return C_OK;
1738 } else {
1739 return C_ERR;
1740 }
1741}
1742
1743/* This function is called when this node is a master, and we receive from
1744 * another master a configuration epoch that is equal to our configuration
1745 * epoch.
1746 *
1747 * BACKGROUND
1748 *
1749 * It is not possible that different slaves get the same config
1750 * epoch during a failover election, because the slaves need to be voted in
1751 * by a majority. However when we perform a manual resharding of the cluster
1752 * the node will assign a configuration epoch to itself without asking
1753 * for agreement. Usually resharding happens when the cluster is working well
1754 * and is supervised by the sysadmin, however it is possible for a failover
1755 * to happen exactly while the node we are resharding a slot to assigns itself
1756 * a new configuration epoch, but before it is able to propagate it.
1757 *
1758 * So technically it is possible in this condition that two nodes end with
1759 * the same configuration epoch.
1760 *
1761 * Another possibility is that there are bugs in the implementation causing
1762 * this to happen.
1763 *
1764 * Moreover when a new cluster is created, all the nodes start with the same
1765 * configEpoch. This collision resolution code allows nodes to automatically
1766 * end up with a different configEpoch at startup.
1767 *
1768 * In all the cases, we want a mechanism that resolves this issue automatically
1769 * as a safeguard. The same configuration epoch for masters serving different
1770 * sets of slots is not harmful, but it is harmful if nodes end up serving the
1771 * same slots for some reason (manual errors or software bugs) without a proper
1772 * failover procedure.
1773 *
1774 * In general we want a system that eventually always ends with different
1775 * masters having different configuration epochs whatever happened, since
1776 * nothing is worse than a split-brain condition in a distributed system.
1777 *
1778 * BEHAVIOR
1779 *
1780 * When this function gets called, if this node has the lexicographically
1781 * smaller Node ID compared to the other node with the conflicting epoch
1782 * (the 'sender' node), it will assign itself the greatest configuration
1783 * epoch currently detected among nodes, plus 1.
1784 *
1785 * This means that even if there are multiple nodes colliding, the node
1786 * with the greatest Node ID never moves forward, so eventually all the nodes
1787 * end with a different configuration epoch.
1788 */
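/* Illustrative example (assumed values, for clarity only): masters A and B
 * both end up with configEpoch 5, and A's Node ID sorts lexicographically
 * before B's. When A processes a packet from B it detects the collision,
 * bumps currentEpoch and takes the new value (e.g. 6) as its own configEpoch,
 * while B keeps 5, so the tie is broken. */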
1789void clusterHandleConfigEpochCollision(clusterNode *sender) {
1790 /* Prerequisites: nodes have the same configEpoch and are both masters. */
1791 if (sender->configEpoch != myself->configEpoch ||
1792 !clusterNodeIsMaster(sender) || !clusterNodeIsMaster(myself)) return;
1793 /* Don't act if the colliding node has a smaller Node ID. */
1794 if (memcmp(sender->name,myself->name,CLUSTER_NAMELEN) <= 0) return;
1795 /* Get the next ID available at the best of this node knowledge. */
1796 server.cluster->currentEpoch++;
1797 myself->configEpoch = server.cluster->currentEpoch;
1798 clusterSaveConfigOrDie(1);
1799 /* Broadcast new config epoch to the other nodes. */
1800 clusterDoBeforeSleep(CLUSTER_TODO_BROADCAST_PONG);
1801 serverLog(LL_VERBOSE,
1802 "WARNING: configEpoch collision with node %.40s (%s)."
1803 " configEpoch set to %llu",
1804 sender->name,sender->human_nodename,
1805 (unsigned long long) myself->configEpoch);
1806}
1807
1808/* -----------------------------------------------------------------------------
1809 * CLUSTER nodes blacklist
1810 *
1811 * The nodes blacklist is just a way to ensure that a given node with a given
1812 * Node ID is not re-added before some time elapsed (this time is specified
1813 * in seconds in CLUSTER_BLACKLIST_TTL).
1814 *
1815 * This is useful when we want to remove a node from the cluster completely:
1816 * when CLUSTER FORGET is called, it also puts the node into the blacklist so
1817 * that even if we receive gossip messages from other nodes that still remember
1818 * the node we want to remove, we don't re-add it before some time has elapsed.
1819 *
1820 * Currently the CLUSTER_BLACKLIST_TTL is set to 1 minute. This means
1821 * that redis-cli has 60 seconds to send CLUSTER FORGET messages to nodes
1822 * in the cluster without dealing with the problem of other nodes re-introducing
1823 * the node to nodes we already sent the FORGET command to.
1824 *
1825 * The data structure used is a hash table with an sds string representing
1826 * the node ID as key, and the time when it is ok to re-add the node as
1827 * value.
1828 * -------------------------------------------------------------------------- */
1829
1830#define CLUSTER_BLACKLIST_TTL 60 /* 1 minute. */
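/* Illustrative timeline: CLUSTER FORGET <node-id> received at time T adds
 * <node-id> to the blacklist with expire time T+CLUSTER_BLACKLIST_TTL. Until
 * that time, gossip entries mentioning <node-id> are ignored (see the
 * blacklist check in clusterProcessGossipSection()), so the forgotten node
 * is not re-added while the admin is still contacting the remaining nodes. */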
1831
1832
1833/* Before the addNode() or Exists() operations we always remove expired
1834 * entries from the black list. This is an O(N) operation but it is not a
1835 * problem since add / exists operations are called very infrequently and
1836 * the hash table is supposed to contain very few elements at most.
1837 * However, without this cleanup, entries could accumulate over a long uptime
1838 * when automated node add/removal procedures are in use. */
1839void clusterBlacklistCleanup(void) {
1840 dictIterator di;
1841 dictEntry *de;
1842
1843 dictInitSafeIterator(&di, server.cluster->nodes_black_list);
1844 while((de = dictNext(&di)) != NULL) {
1845 int64_t expire = dictGetUnsignedIntegerVal(de);
1846
1847 if (expire < server.unixtime)
1848 dictDelete(server.cluster->nodes_black_list,dictGetKey(de));
1849 }
1850 dictResetIterator(&di);
1851}
1852
1853/* Cleanup the blacklist and add a new node ID to the black list. */
1854void clusterBlacklistAddNode(clusterNode *node) {
1855 dictEntry *de;
1856 sds id = sdsnewlen(node->name,CLUSTER_NAMELEN);
1857
1858 clusterBlacklistCleanup();
1859 if (dictAdd(server.cluster->nodes_black_list,id,NULL) == DICT_OK) {
1860 /* If the key was added, duplicate the sds string representation of
1861 * the key for the next lookup. We'll free it at the end. */
1862 id = sdsdup(id);
1863 }
1864 de = dictFind(server.cluster->nodes_black_list,id);
1865 dictSetUnsignedIntegerVal(de,time(NULL)+CLUSTER_BLACKLIST_TTL);
1866 sdsfree(id);
1867}
1868
1869/* Return non-zero if the specified node ID exists in the blacklist.
1870 * You don't need to pass an sds string here, any pointer to 40 bytes
1871 * will work. */
1872int clusterBlacklistExists(char *nodeid, size_t len) {
1873 sds id = sdsnewlen(nodeid,len);
1874 int retval;
1875
1876 clusterBlacklistCleanup();
1877 retval = dictFind(server.cluster->nodes_black_list,id) != NULL;
1878 sdsfree(id);
1879 return retval;
1880}
1881
1882/* -----------------------------------------------------------------------------
1883 * CLUSTER messages exchange - PING/PONG and gossip
1884 * -------------------------------------------------------------------------- */
1885
1886/* This function checks if a given node should be marked as FAIL.
1887 * It happens if the following conditions are met:
1888 *
1889 * 1) We received enough failure reports from other master nodes via gossip.
1890 * Enough means that the majority of the masters signaled the node is
1891 * down recently.
1892 * 2) We believe this node is in PFAIL state.
1893 *
1894 * If a failure is detected we also inform the whole cluster about this
1895 * event trying to force every other node to set the FAIL flag for the node.
1896 *
1897 * Note that the form of agreement used here is weak: we collect the state of
1898 * a majority of masters over a window of time, and even if we force agreement
1899 * by propagating the FAIL message, partitions may prevent us from reaching
1900 * every node. However:
1901 *
1902 * 1) Either we reach the majority and eventually the FAIL state will propagate
1903 * to all the cluster.
1904 * 2) Or there is no majority so no slave promotion will be authorized and the
1905 * FAIL flag will be cleared after some time.
1906 */
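/* Illustrative example (simple arithmetic, not taken from the code below): in
 * a cluster whose 'size' is 5 masters, needed_quorum is 5/2+1 = 3. If we are
 * a master and see node X as PFAIL, two fresh failure reports from other
 * masters plus our own vote reach 3, and X is switched from PFAIL to FAIL. */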
1907void markNodeAsFailingIfNeeded(clusterNode *node) {
1908 int failures;
1909 int needed_quorum = (server.cluster->size / 2) + 1;
1910
1911 if (!nodeTimedOut(node)) return; /* We can reach it. */
1912 if (nodeFailed(node)) return; /* Already FAILing. */
1913
1914 failures = clusterNodeFailureReportsCount(node);
1915 /* Also count myself as a voter if I'm a master. */
1916 if (clusterNodeIsMaster(myself)) failures++;
1917 if (failures < needed_quorum) return; /* No weak agreement from masters. */
1918
1919 serverLog(LL_NOTICE,
1920 "Marking node %.40s (%s) as failing (quorum reached).", node->name, node->human_nodename);
1921
1922 /* Mark the node as failing. */
1923 node->flags &= ~CLUSTER_NODE_PFAIL;
1924 node->flags |= CLUSTER_NODE_FAIL;
1925 node->fail_time = mstime();
1926
1927 /* Broadcast the failing node name to everybody, forcing all the other
1928 * reachable nodes to flag the node as FAIL.
1929 * We do that even if this node is a replica and not a master: anyway
1930 * the failing state is triggered collecting failure reports from masters,
1931 * so here the replica is only helping propagating this status. */
1932 clusterSendFail(node->name);
1933 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
1934}
1935
1936/* This function is called only if a node is marked as FAIL, but we are able
1937 * to reach it again. It checks if there are the conditions to undo the FAIL
1938 * state. */
1939void clearNodeFailureIfNeeded(clusterNode *node) {
1940 mstime_t now = mstime();
1941
1942 serverAssert(nodeFailed(node));
1943
1944 /* For slaves we always clear the FAIL flag if we can contact the
1945 * node again. */
1946 if (nodeIsSlave(node) || node->numslots == 0) {
1947 serverLog(LL_NOTICE,
1948 "Clear FAIL state for node %.40s (%s):%s is reachable again.",
1949 node->name,node->human_nodename,
1950 nodeIsSlave(node) ? "replica" : "master without slots");
1951 node->flags &= ~CLUSTER_NODE_FAIL;
1952 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
1953 }
1954
1955 /* If it is a master and...
1956 * 1) The FAIL state is old enough.
1957 * 2) It is still serving slots from our point of view (not failed over).
1958 * Apparently no one is going to fix these slots, so clear the FAIL flag. */
1959 if (clusterNodeIsMaster(node) && node->numslots > 0 &&
1960 (now - node->fail_time) >
1961 (server.cluster_node_timeout * CLUSTER_FAIL_UNDO_TIME_MULT))
1962 {
1963 serverLog(LL_NOTICE,
1964 "Clear FAIL state for node %.40s (%s): is reachable again and nobody is serving its slots after some time.",
1965 node->name, node->human_nodename);
1966 node->flags &= ~CLUSTER_NODE_FAIL;
1967 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
1968 }
1969}
1970
1971/* Return true if we already have a node in HANDSHAKE state matching the
1972 * specified ip address and port number. This function is used in order to
1973 * avoid adding a new handshake node for the same address multiple times. */
1974int clusterHandshakeInProgress(char *ip, int port, int cport) {
1975 dictIterator di;
1976 dictEntry *de;
1977
1978 dictInitSafeIterator(&di, server.cluster->nodes);
1979 while((de = dictNext(&di)) != NULL) {
1980 clusterNode *node = dictGetVal(de);
1981
1982 if (!nodeInHandshake(node)) continue;
1983 if (!strcasecmp(node->ip,ip) &&
1984 getNodeDefaultClientPort(node) == port &&
1985 node->cport == cport) break;
1986 }
1987 dictResetIterator(&di);
1988 return de != NULL;
1989}
1990
1991/* Start a handshake with the specified address if there is not one
1992 * already in progress. Returns non-zero if the handshake was actually
1993 * started. On error zero is returned and errno is set to one of the
1994 * following values:
1995 *
1996 * EAGAIN - There is already a handshake in progress for this address.
1997 * EINVAL - IP or port are not valid. */
1998int clusterStartHandshake(char *ip, int port, int cport) {
1999 clusterNode *n;
2000 char norm_ip[NET_IP_STR_LEN];
2001 struct sockaddr_storage sa;
2002
2003 /* IP sanity check */
2004 if (inet_pton(AF_INET,ip,
2005 &(((struct sockaddr_in *)&sa)->sin_addr)))
2006 {
2007 sa.ss_family = AF_INET;
2008 } else if (inet_pton(AF_INET6,ip,
2009 &(((struct sockaddr_in6 *)&sa)->sin6_addr)))
2010 {
2011 sa.ss_family = AF_INET6;
2012 } else {
2013 errno = EINVAL;
2014 return 0;
2015 }
2016
2017 /* Port sanity check */
2018 if (port <= 0 || port > 65535 || cport <= 0 || cport > 65535) {
2019 errno = EINVAL;
2020 return 0;
2021 }
2022
2023 /* Set norm_ip as the normalized string representation of the node
2024 * IP address. */
2025 memset(norm_ip,0,NET_IP_STR_LEN);
2026 if (sa.ss_family == AF_INET)
2027 inet_ntop(AF_INET,
2028 (void*)&(((struct sockaddr_in *)&sa)->sin_addr),
2029 norm_ip,NET_IP_STR_LEN);
2030 else
2031 inet_ntop(AF_INET6,
2032 (void*)&(((struct sockaddr_in6 *)&sa)->sin6_addr),
2033 norm_ip,NET_IP_STR_LEN);
2034
2035 if (clusterHandshakeInProgress(norm_ip,port,cport)) {
2036 errno = EAGAIN;
2037 return 0;
2038 }
2039
2040 /* Add the node with a random address (NULL as first argument to
2041 * createClusterNode()). Everything will be fixed during the
2042 * handshake. */
2043 n = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
2044 memcpy(n->ip,norm_ip,sizeof(n->ip));
2045 if (server.tls_cluster) {
2046 n->tls_port = port;
2047 } else {
2048 n->tcp_port = port;
2049 }
2050 n->cport = cport;
2051 clusterAddNode(n);
2052 return 1;
2053}
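/* Usage sketch (illustrative only; how a caller such as the CLUSTER MEET
 * handler may interpret the result of clusterStartHandshake()):
 *
 *   if (clusterStartHandshake(ip, port, cport) == 0 && errno == EINVAL) {
 *       // Reject the request: the address is malformed.
 *   }
 *   // EAGAIN is not treated as an error: a handshake is already in progress.
 */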
2054
2055static void getClientPortFromClusterMsg(clusterMsg *hdr, int *tls_port, int *tcp_port) {
2056 if (server.tls_cluster) {
2057 *tls_port = ntohs(hdr->port);
2058 *tcp_port = ntohs(hdr->pport);
2059 } else {
2060 *tls_port = ntohs(hdr->pport);
2061 *tcp_port = ntohs(hdr->port);
2062 }
2063}
2064
2065static void getClientPortFromGossip(clusterMsgDataGossip *g, int *tls_port, int *tcp_port) {
2066 if (server.tls_cluster) {
2067 *tls_port = ntohs(g->port);
2068 *tcp_port = ntohs(g->pport);
2069 } else {
2070 *tls_port = ntohs(g->pport);
2071 *tcp_port = ntohs(g->port);
2072 }
2073}
2074
2075/* Returns a string with the byte representation of the node ID (i.e. nodename)
2076 * along with 8 trailing bytes for debugging purposes. */
2077char *getCorruptedNodeIdByteString(clusterMsgDataGossip *gossip_msg) {
2078 const int num_bytes = CLUSTER_NAMELEN + 8;
2079 /* Allocate enough room for 4 chars per byte + null terminator */
2080 char *byte_string = (char*) zmalloc((num_bytes*4) + 1);
2081 const char *name_ptr = gossip_msg->nodename;
2082
2083 /* Ensure we won't print beyond the bounds of the message */
2084 serverAssert(name_ptr + num_bytes <= (char*)gossip_msg + sizeof(clusterMsgDataGossip));
2085
2086 for (int i = 0; i < num_bytes; i++) {
2087 snprintf(byte_string + 4*i, 5, "\\x%02hhX", name_ptr[i]);
2088 }
2089 return byte_string;
2090}
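/* For example, a nodename starting with the raw bytes 0x00 0xC3 would be
 * rendered by the function above as "\x00\xC3..." (4 characters per byte),
 * which is the form that ends up in the warning log. */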
2091
2092/* Returns the number of nodes in the gossip with invalid IDs. */
2093int verifyGossipSectionNodeIds(clusterMsgDataGossip *g, uint16_t count) {
2094 int invalid_ids = 0;
2095 for (int i = 0; i < count; i++) {
2096 const char *nodename = g[i].nodename;
2097 if (verifyClusterNodeId(nodename, CLUSTER_NAMELEN) != C_OK) {
2098 invalid_ids++;
2099 char *raw_node_id = getCorruptedNodeIdByteString(&g[i]);
2100 serverLog(LL_WARNING,
2101 "Received gossip about a node with invalid ID %.40s. For debugging purposes, "
2102 "the 48 bytes including the invalid ID and 8 trailing bytes are: %s",
2103 nodename, raw_node_id);
2104 zfree(raw_node_id);
2105 }
2106 }
2107 return invalid_ids;
2108}
2109
2110/* Process the gossip section of PING or PONG packets.
2111 * Note that this function assumes that the packet is already sanity-checked
2112 * by the caller with respect to its length, not with respect to the
2113 * content of the gossip section. */
2114void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) {
2115 uint16_t count = ntohs(hdr->count);
2116 clusterMsgDataGossip *g = (clusterMsgDataGossip*) hdr->data.ping.gossip;
2117 clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender, CLUSTER_NAMELEN);
2118
2119 /* Abort if the gossip contains invalid node IDs to avoid adding incorrect information to
2120 * the nodes dictionary. An invalid ID indicates memory corruption on the sender side. */
2121 int invalid_ids = verifyGossipSectionNodeIds(g, count);
2122 if (invalid_ids) {
2123 if (sender) {
2124 serverLog(LL_WARNING, "Node %.40s (%s) gossiped %d nodes with invalid IDs.", sender->name, sender->human_nodename, invalid_ids);
2125 } else {
2126 serverLog(LL_WARNING, "Unknown node gossiped %d nodes with invalid IDs.", invalid_ids);
2127 }
2128 return;
2129 }
2130
2131 while(count--) {
2132 uint16_t flags = ntohs(g->flags);
2133 clusterNode *node;
2134 sds ci;
2135
2136 if (server.verbosity == LL_DEBUG) {
2137 ci = representClusterNodeFlags(sdsempty(), flags);
2138 serverLog(LL_DEBUG,"GOSSIP %.40s %s:%d@%d %s",
2139 g->nodename,
2140 g->ip,
2141 ntohs(g->port),
2142 ntohs(g->cport),
2143 ci);
2144 sdsfree(ci);
2145 }
2146
2147 /* Convert port and pport into TCP port and TLS port. */
2148 int msg_tls_port, msg_tcp_port;
2149 getClientPortFromGossip(g, &msg_tls_port, &msg_tcp_port);
2150
2151 /* Update our state accordingly to the gossip sections */
2152 node = clusterLookupNode(g->nodename, CLUSTER_NAMELEN);
2153 /* Ignore gossips about self. */
2154 if (node && node != myself) {
2155 /* We already know this node.
2156 Handle failure reports, only when the sender is a master. */
2157 if (sender && clusterNodeIsMaster(sender)) {
2158 if (flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) {
2159 if (clusterNodeAddFailureReport(node,sender)) {
2160 serverLog(LL_VERBOSE,
2161 "Node %.40s (%s) reported node %.40s (%s) as not reachable.",
2162 sender->name, sender->human_nodename, node->name, node->human_nodename);
2163 }
2164 markNodeAsFailingIfNeeded(node);
2165 } else {
2166 if (clusterNodeDelFailureReport(node,sender)) {
2167 serverLog(LL_VERBOSE,
2168 "Node %.40s (%s) reported node %.40s (%s) is back online.",
2169 sender->name, sender->human_nodename, node->name, node->human_nodename);
2170 }
2171 }
2172 }
2173
2174 /* If from our POV the node is up (no failure flags are set),
2175 * we have no pending ping for the node, and we have no failure
2176 * reports for this node, update the last pong time with the
2177 * one we see from the other nodes. */
2178 if (!(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
2179 node->ping_sent == 0 &&
2180 clusterNodeFailureReportsCount(node) == 0)
2181 {
2182 mstime_t pongtime = ntohl(g->pong_received);
2183 pongtime *= 1000; /* Convert back to milliseconds. */
2184
2185 /* Replace the pong time with the received one only if
2186 * it's greater than our view but is not in the future
2187 * (with 500 milliseconds tolerance) from the POV of our
2188 * clock. */
2189 if (pongtime <= (server.mstime+500) &&
2190 pongtime > node->pong_received)
2191 {
2192 node->pong_received = pongtime;
2193 }
2194 }
2195
2196 /* If we already know this node, but it is not reachable, and
2197 * we see a different address in the gossip section of a node that
2198 * can talk with this other node, update the address, disconnect
2199 * the old link if any, so that we'll attempt to connect with the
2200 * new address. */
2201 if (node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL) &&
2202 !(flags & CLUSTER_NODE_NOADDR) &&
2203 !(flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) &&
2204 (strcasecmp(node->ip,g->ip) ||
2205 node->tls_port != (server.tls_cluster ? ntohs(g->port) : ntohs(g->pport)) ||
2206 node->tcp_port != (server.tls_cluster ? ntohs(g->pport) : ntohs(g->port)) ||
2207 node->cport != ntohs(g->cport)))
2208 {
2209 if (node->link) freeClusterLink(node->link);
2210 memcpy(node->ip,g->ip,NET_IP_STR_LEN);
2211 node->tcp_port = msg_tcp_port;
2212 node->tls_port = msg_tls_port;
2213 node->cport = ntohs(g->cport);
2214 node->flags &= ~CLUSTER_NODE_NOADDR;
2215 }
2216 } else if (!node) {
2217 /* If it's not in NOADDR state and we don't have it, we
2218 * add it to our trusted dict with the exact node ID and flags.
2219 * Note that we cannot simply start a handshake against
2220 * this IP/PORT pair, since the IP/PORT may already have been
2221 * reused; otherwise we risk joining another cluster.
2222 *
2223 * Note that we require that the sender of this gossip message
2224 * is a well known node in our cluster, otherwise we risk
2225 * joining another cluster. */
2226 if (sender &&
2227 !(flags & CLUSTER_NODE_NOADDR) &&
2228 !clusterBlacklistExists(g->nodename, CLUSTER_NAMELEN))
2229 {
2230 clusterNode *node;
2231 node = createClusterNode(g->nodename, flags);
2232 memcpy(node->ip,g->ip,NET_IP_STR_LEN);
2233 node->tcp_port = msg_tcp_port;
2234 node->tls_port = msg_tls_port;
2235 node->cport = ntohs(g->cport);
2236 clusterAddNode(node);
2237 clusterAddNodeToShard(node->shard_id, node);
2238 }
2239 }
2240
2241 /* Next node */
2242 g++;
2243 }
2244}
2245
2246/* IP -> string conversion. 'buf' is supposed to be at least 46 bytes.
2247 * If 'announced_ip' length is non-zero, it is used instead of extracting
2248 * the IP from the socket peer address. */
2249int nodeIp2String(char *buf, clusterLink *link, char *announced_ip) {
2250 if (announced_ip[0] != '\0') {
2251 memcpy(buf,announced_ip,NET_IP_STR_LEN);
2252 buf[NET_IP_STR_LEN-1] = '\0'; /* We are not sure the input is sane. */
2253 return C_OK;
2254 } else {
2255 if (connAddrPeerName(link->conn, buf, NET_IP_STR_LEN, NULL) == -1) {
2256 serverLog(LL_NOTICE, "Error converting peer IP to string: %s",
2257 link->conn ? connGetLastError(link->conn) : "no link");
2258 return C_ERR;
2259 }
2260 return C_OK;
2261 }
2262}
2263
2264/* Update the node address to the IP address that can be extracted
2265 * from link->fd, or if hdr->myip is non-empty, to the address the node
2266 * is announcing to us. The port is taken from the packet header as well.
2267 *
2268 * If the address or port changed, disconnect the node link so that we'll
2269 * connect again to the new address.
2270 *
2271 * If the ip/port pair is already correct, no operation is performed at
2272 * all.
2273 *
2274 * The function returns 0 if the node address is still the same,
2275 * otherwise 1 is returned. */
2276int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link,
2277 clusterMsg *hdr)
2278{
2279 char ip[NET_IP_STR_LEN] = {0};
2280 int cport = ntohs(hdr->cport);
2281 int tcp_port, tls_port;
2282 getClientPortFromClusterMsg(hdr, &tls_port, &tcp_port);
2283
2284 /* We don't proceed if the link is the same as the sender link, as this
2285 * function is designed to see if the node link is consistent with the
2286 * symmetric link that is used to receive PINGs from the node.
2287 *
2288 * As a side effect this function never frees the passed 'link', so
2289 * it is safe to call during packet processing. */
2290 if (link == node->link) return 0;
2291
2292 /* If the peer IP is unavailable for some reason, like an invalid fd or a
2293 * closed link, just give up on the update this time; it will be retried
2294 * in the next round of PINGs. */
2295 if (nodeIp2String(ip,link,hdr->myip) == C_ERR) return 0;
2296
2297 if (node->tcp_port == tcp_port && node->cport == cport && node->tls_port == tls_port &&
2298 strcmp(ip,node->ip) == 0) return 0;
2299
2300 /* IP / port is different, update it. */
2301 memcpy(node->ip,ip,sizeof(ip));
2302 node->tcp_port = tcp_port;
2303 node->tls_port = tls_port;
2304 node->cport = cport;
2305 if (node->link) freeClusterLink(node->link);
2306 node->flags &= ~CLUSTER_NODE_NOADDR;
2307 serverLog(LL_NOTICE,"Address updated for node %.40s (%s), now %s:%d",
2308 node->name, node->human_nodename, node->ip, getNodeDefaultClientPort(node));
2309
2310 /* Check if this is our master and we have to change the
2311 * replication target as well. */
2312 if (nodeIsSlave(myself) && myself->slaveof == node)
2313 replicationSetMaster(node->ip, getNodeDefaultReplicationPort(node));
2314 return 1;
2315}
2316
2317/* Reconfigure the specified node 'n' as a master. This function is called when
2318 * a node that we believed to be a slave is now acting as master in order to
2319 * update the state of the node. */
2320void clusterSetNodeAsMaster(clusterNode *n) {
2321 if (clusterNodeIsMaster(n)) return;
2322
2323 if (n->slaveof) {
2324 clusterNodeRemoveSlave(n->slaveof,n);
2325 if (n != myself) n->flags |= CLUSTER_NODE_MIGRATE_TO;
2326 }
2327 n->flags &= ~CLUSTER_NODE_SLAVE;
2328 n->flags |= CLUSTER_NODE_MASTER;
2329 n->slaveof = NULL;
2330
2331 /* Update config and state. */
2332 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2333 CLUSTER_TODO_UPDATE_STATE);
2334}
2335
2336/* This function is called when we receive a master configuration via a
2337 * PING, PONG or UPDATE packet. What we receive is a node, a configEpoch of the
2338 * node, and the set of slots claimed under this configEpoch.
2339 *
2340 * What we do is rebind the slots whose configuration is newer than our
2341 * local configuration, and if needed, turn ourselves into a replica of the
2342 * node (see the function comments for more info).
2343 *
2344 * The 'sender' is the node for which we received a configuration update.
2345 * Sometimes it is not actually the "Sender" of the information, like in the
2346 * case we receive the info via an UPDATE packet. */
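/* Illustrative example: master A owns slots 0-100 with configEpoch 5 and we
 * are one of its replicas. A fails, replica B wins the election and starts
 * claiming slots 0-100 with configEpoch 6. When we receive B's PONG/UPDATE,
 * every slot still bound to A is rebound to B (since 6 > 5) and, as our
 * master is left without slots, we reconfigure ourselves as a replica of B. */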
2347void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots) {
2348 int j;
2349 clusterNode *curmaster = NULL, *newmaster = NULL;
2350 /* The dirty slots list is a list of slots for which we lose ownership
2351 * while still having keys inside. This usually happens after a failover
2352 * or after a manual cluster reconfiguration operated by the admin.
2353 *
2354 * If the update message is not able to demote a master to slave (in this
2355 * case we'll resync with the master, updating the whole key space), we
2356 * need to delete all the keys in the slots we lost ownership of. */
2357 uint16_t dirty_slots[CLUSTER_SLOTS];
2358 int dirty_slots_count = 0;
2359
2360 /* We should detect whether the sender is the new master of our shard.
2361 * We will know this if all our slots were migrated to the sender, and the
2362 * sender has no slots except ours. */
2363 int sender_slots = 0;
2364 int migrated_our_slots = 0;
2365
2366 /* Here we set curmaster to this node or the node this node
2367 * replicates to if it's a slave. In the for loop we are
2368 * interested to check if slots are taken away from curmaster. */
2369 curmaster = clusterNodeIsMaster(myself) ? myself : myself->slaveof;
2370
2371 if (sender == myself) {
2372 serverLog(LL_NOTICE,"Discarding UPDATE message about myself.");
2373 return;
2374 }
2375
2376 slotRangeArray *sra = NULL;
2377 for (j = 0; j < CLUSTER_SLOTS; j++) {
2378 if (bitmapTestBit(slots,j)) {
2379 sender_slots++;
2380
2381 /* The slot is already bound to the sender of this message. */
2382 if (server.cluster->slots[j] == sender) {
2383 bitmapClearBit(server.cluster->owner_not_claiming_slot, j);
2384 continue;
2385 }
2386
2387 /* The slot is in importing state; it should be modified only
2388 * manually via redis-cli (example: a resharding is in progress
2389 * and the migrating side slot was already closed and is advertising
2390 * a new config. We still want the slot to be closed manually). */
2391 if (server.cluster->importing_slots_from[j]) continue;
2392
2393 /* We rebind the slot to the new node claiming it if:
2394 * 1) The slot was unassigned or the previous owner no longer owns the slot or
2395 * the new node claims it with a greater configEpoch.
2396 * 2) We are not currently importing the slot. */
2397 if (isSlotUnclaimed(j) ||
2398 server.cluster->slots[j]->configEpoch < senderConfigEpoch)
2399 {
2400 /* After completing slot ranges migration, the destination node
2401 * will broadcast a PONG message to all the nodes. We need to
2402 * detect that the slot was moved from us to the sender, and
2403 * call asmNotifyConfigUpdated() to notify the ASM state machine. */
2404 if (server.cluster->slots[j] == myself && sender != myself)
2405 sra = slotRangeArrayAppend(sra, j);
2406
2407 /* Was this slot mine, and does it still contain keys? If so,
2408 * mark it as a dirty slot. */
2409 if (server.cluster->slots[j] == myself &&
2410 countKeysInSlot(j) &&
2411 sender != myself)
2412 {
2413 dirty_slots[dirty_slots_count] = j;
2414 dirty_slots_count++;
2415 }
2416
2417 if (server.cluster->slots[j] == curmaster) {
2418 newmaster = sender;
2419 migrated_our_slots++;
2420 }
2421 clusterDelSlot(j);
2422 clusterAddSlot(sender,j);
2423 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2424 CLUSTER_TODO_UPDATE_STATE|
2425 CLUSTER_TODO_FSYNC_CONFIG);
2426 }
2427 } else if (server.cluster->slots[j] == sender) {
2428 /* The slot is currently bound to the sender but the sender is no longer
2429 * claiming it. We don't want to unbind the slot yet as it can cause the cluster
2430 * to move to FAIL state and also throw client errors. Keeping the slot bound to
2431 * the previous owner will cause a few client side redirects, but won't throw
2432 * any errors. We will keep track of the uncertainty in ownership to avoid
2433 * propagating misinformation about this slot's ownership using UPDATE
2434 * messages. */
2435 bitmapSetBit(server.cluster->owner_not_claiming_slot, j);
2436 }
2437 }
2438
2439 /* Notify ASM about the config update */
2440 struct asmTask *asm_task = NULL;
2441 if (sra && sra->num_ranges > 0 && server.masterhost == NULL) {
2442 sds err = NULL;
2443 asm_task = asmLookupTaskBySlotRangeArray(sra);
2444 if (!asm_task) {
2445 /* If no task was found, it means the config update is not related
2446 * to the current ASM task, but this node learned about the config
2447 * update from the cluster protocol, and we need to cancel any
2448 * conflicting tasks that overlap with the slot ranges. */
2449 clusterAsmCancelBySlotRangeArray(sra, "slots configuration updated");
2450 } else if (asmNotifyConfigUpdated(asm_task, &err) != C_OK) {
2451 serverLog(LL_WARNING, "ASM config update failed: %s", err);
2452 sdsfree(err);
2453 }
2454 }
2455 slotRangeArrayFree(sra);
2456
2457 /* After updating the slots configuration, don't do any actual change
2458 * in the state of the server if a module disabled Redis Cluster
2459 * keys redirections. */
2460 if (server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_REDIRECTION)
2461 return;
2462
2463 /* If at least one slot was reassigned from a node to another node
2464 * with a greater configEpoch, it is possible that:
2465 * 1) We are a master left without slots. This means that we were
2466 * failed over and we should turn into a replica of the new
2467 * master.
2468 * 2) We are a slave and our master is left without slots. We need
2469 * to replicate to the new slots owner. */
2470 if (newmaster && curmaster->numslots == 0 &&
2471 (server.cluster_allow_replica_migration ||
2472 sender_slots == migrated_our_slots)) {
2473 serverLog(LL_NOTICE,
2474 "Configuration change detected. Reconfiguring myself "
2475 "as a replica of %.40s (%s)", sender->name, sender->human_nodename);
2476 clusterSetMaster(sender);
2477 /* Save the new config and broadcast it to the other nodes. */
2478 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2479 CLUSTER_TODO_UPDATE_STATE|
2480 CLUSTER_TODO_FSYNC_CONFIG|
2481 CLUSTER_TODO_BROADCAST_PONG);
2482 } else if (myself->slaveof && myself->slaveof->slaveof &&
2483 /* In some rare cases, when CLUSTER FAILOVER TAKEOVER is used, it
2484 * can happen that myself is a replica of a replica of myself. If
2485 * this happens, we do nothing to avoid a crash and wait for the
2486 * admin to repair the cluster. */
2487 myself->slaveof->slaveof != myself)
2488 {
2489 /* Safeguard against sub-replicas. A replica's master can turn itself
2490 * into a replica if its last slot is removed. If no other node takes
2491 * over the slot, there is nothing else to trigger replica migration. */
2492 serverLog(LL_NOTICE,
2493 "I'm a sub-replica! Reconfiguring myself as a replica of grandmaster %.40s (%s)",
2494 myself->slaveof->slaveof->name, myself->slaveof->slaveof->human_nodename);
2495 clusterSetMaster(myself->slaveof->slaveof);
2496 /* Save the new config and broadcast to the other nodes. */
2497 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2498 CLUSTER_TODO_UPDATE_STATE|
2499 CLUSTER_TODO_FSYNC_CONFIG|
2500 CLUSTER_TODO_BROADCAST_PONG);
2501 } else if (dirty_slots_count && !asm_task) {
2502 /* If we are here, we received an update message which removed
2503 * ownership for certain slots for which we still have keys, but we are
2504 * still serving some slots, so this master node was not demoted to
2505 * a slave.
2506 *
2507 * In order to maintain a consistent state between keys and slots
2508 * we need to remove all the keys from the slots we lost. */
2509 for (j = 0; j < dirty_slots_count; j++)
2510 clusterDelKeysInSlot(dirty_slots[j], 0);
2511 }
2512}
2513
2514/* Cluster ping extensions.
2515 *
2516 * The ping/pong/meet messages support arbitrary extensions to add additional
2517 * metadata to the messages that are sent between the various nodes in the
2518 * cluster. The extensions take the form:
2519 * [ Header length + type (8 bytes) ]
2520 * [ Extension information (Arbitrary length, but must be 8 byte padded) ]
2521 */
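/* For example, a receiver walks the extensions by reading the 8-byte header,
 * using the 'length' field (which covers the header plus the padded payload)
 * to know how many bytes this extension occupies, and then jumping to the
 * next extension at that offset; see getPingExtLength() and getNextPingExt()
 * below. */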
2522
2523
2524/* Returns the length of a given extension */
2525static uint32_t getPingExtLength(clusterMsgPingExt *ext) {
2526 return ntohl(ext->length);
2527}
2528
2529/* Returns the initial position of ping extensions. May return an invalid
2530 * address if there are no ping extensions. */
2531static clusterMsgPingExt *getInitialPingExt(clusterMsg *hdr, int count) {
2532 clusterMsgPingExt *initial = (clusterMsgPingExt*) &(hdr->data.ping.gossip[count]);
2533 return initial;
2534}
2535
2536/* Given a current ping extension, returns the start of the next extension. May return
2537 * an invalid address if there are no further ping extensions. */
2538static clusterMsgPingExt *getNextPingExt(clusterMsgPingExt *ext) {
2539 clusterMsgPingExt *next = (clusterMsgPingExt *) (((char *) ext) + getPingExtLength(ext));
2540 return next;
2541}
2542
2543/* All PING extensions must be 8-byte aligned */
2544uint32_t getAlignedPingExtSize(uint32_t dataSize) {
2545
2546 return sizeof(clusterMsgPingExt) + EIGHT_BYTE_ALIGN(dataSize);
2547}
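/* Worked example (illustrative): for a 6-character hostname the payload is
 * 6 + 1 (NUL terminator) = 7 bytes, EIGHT_BYTE_ALIGN(7) = 8, so the extension
 * occupies sizeof(clusterMsgPingExt) + 8 bytes on the wire. */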
2548
2549uint32_t getHostnamePingExtSize(void) {
2550 if (sdslen(myself->hostname) == 0) {
2551 return 0;
2552 }
2553 return getAlignedPingExtSize(sdslen(myself->hostname) + 1);
2554}
2555
2556uint32_t getHumanNodenamePingExtSize(void) {
2557 if (sdslen(myself->human_nodename) == 0) {
2558 return 0;
2559 }
2560 return getAlignedPingExtSize(sdslen(myself->human_nodename) + 1);
2561}
2562
2563uint32_t getShardIdPingExtSize(void) {
2564 return getAlignedPingExtSize(sizeof(clusterMsgPingExtShardId));
2565}
2566
2567uint32_t getInternalSecretPingExtSize(void) {
2568 return getAlignedPingExtSize(sizeof(clusterMsgPingExtInternalSecret));
2569}
2570
2571uint32_t getForgottenNodeExtSize(void) {
2572 return getAlignedPingExtSize(sizeof(clusterMsgPingExtForgottenNode));
2573}
2574
2575void *preparePingExt(clusterMsgPingExt *ext, uint16_t type, uint32_t length) {
2576 ext->type = htons(type);
2577 ext->length = htonl(length);
2578 return &ext->ext[0];
2579}
2580
2581clusterMsgPingExt *nextPingExt(clusterMsgPingExt *ext) {
2582 return (clusterMsgPingExt *)((char*)ext + ntohl(ext->length));
2583}
2584
2585/* 1. If a NULL hdr is provided, compute the total size of the ping
2586 * extensions without writing anything;
2587 * 2. If a non-NULL hdr is provided, write the ping extensions starting
2588 * at the extension cursor. This function will update the cursor to
2589 * point past each written extension and will return the amount of
2590 * bytes written. */
2591uint32_t writePingExt(clusterMsg *hdr, int gossipcount) {
2592 uint16_t extensions = 0;
2593 uint32_t totlen = 0;
2594 clusterMsgPingExt *cursor = NULL;
2595 /* Set the initial extension position */
2596 if (hdr != NULL) {
2597 cursor = getInitialPingExt(hdr, gossipcount);
2598 }
2599
2600 /* hostname is optional */
2601 if (sdslen(myself->hostname) != 0) {
2602 if (cursor != NULL) {
2603 /* Populate hostname */
2604 clusterMsgPingExtHostname *ext = preparePingExt(cursor, CLUSTERMSG_EXT_TYPE_HOSTNAME, getHostnamePingExtSize());
2605 memcpy(ext->hostname, myself->hostname, sdslen(myself->hostname));
2606
2607 /* Move the write cursor */
2608 cursor = nextPingExt(cursor);
2609 }
2610
2611 totlen += getHostnamePingExtSize();
2612 extensions++;
2613 }
2614
2615 if (sdslen(myself->human_nodename) != 0) {
2616 if (cursor != NULL) {
2617 /* Populate human_nodename */
2618 clusterMsgPingExtHumanNodename *ext = preparePingExt(cursor, CLUSTERMSG_EXT_TYPE_HUMAN_NODENAME, getHumanNodenamePingExtSize());
2619 memcpy(ext->human_nodename, myself->human_nodename, sdslen(myself->human_nodename));
2620
2621 /* Move the write cursor */
2622 cursor = nextPingExt(cursor);
2623 }
2624
2625 totlen += getHumanNodenamePingExtSize();
2626 extensions++;
2627 }
2628
2629 /* Gossip forgotten nodes */
2630 if (dictSize(server.cluster->nodes_black_list) > 0) {
2631 dictIterator di;
2632 dictEntry *de;
2633
2634 dictInitIterator(&di, server.cluster->nodes_black_list);
2635 while ((de = dictNext(&di)) != NULL) {
2636 if (cursor != NULL) {
2637 uint64_t expire = dictGetUnsignedIntegerVal(de);
2638 if ((time_t)expire < server.unixtime) continue; /* already expired */
2639 uint64_t ttl = expire - server.unixtime;
2640 clusterMsgPingExtForgottenNode *ext = preparePingExt(cursor, CLUSTERMSG_EXT_TYPE_FORGOTTEN_NODE, getForgottenNodeExtSize());
2641 memcpy(ext->name, dictGetKey(de), CLUSTER_NAMELEN);
2642 ext->ttl = htonu64(ttl);
2643
2644 /* Move the write cursor */
2645 cursor = nextPingExt(cursor);
2646 }
2647 totlen += getForgottenNodeExtSize();
2648 extensions++;
2649 }
2650 dictResetIterator(&di);
2651 }
2652
2653 /* Populate shard_id */
2654 if (cursor != NULL) {
2655 clusterMsgPingExtShardId *ext = preparePingExt(cursor, CLUSTERMSG_EXT_TYPE_SHARDID, getShardIdPingExtSize());
2656 memcpy(ext->shard_id, myself->shard_id, CLUSTER_NAMELEN);
2657
2658 /* Move the write cursor */
2659 cursor = nextPingExt(cursor);
2660 }
2661 totlen += getShardIdPingExtSize();
2662 extensions++;
2663
2664 /* Populate internal secret */
2665 if (cursor != NULL) {
2666 clusterMsgPingExtInternalSecret *ext = preparePingExt(cursor, CLUSTERMSG_EXT_TYPE_INTERNALSECRET, getInternalSecretPingExtSize());
2667 memcpy(ext->internal_secret, server.cluster->internal_secret, CLUSTER_INTERNALSECRETLEN);
2668
2669 /* Move the write cursor */
2670 cursor = nextPingExt(cursor);
2671 }
2672 totlen += getInternalSecretPingExtSize();
2673 extensions++;
2674
2675 if (hdr != NULL) {
2676 hdr->extensions = htons(extensions);
2677 }
2678
2679 return totlen;
2680}
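/* Typical usage (as described in the comment above writePingExt()): call
 * writePingExt(NULL, count) first to learn how many bytes the extensions
 * will need when sizing the message buffer, then call it again with the
 * real 'hdr' to actually serialize them and set hdr->extensions. */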
2681
2682/* We previously validated the extensions, so this function just needs to
2683 * process them. */
2684void clusterProcessPingExtensions(clusterMsg *hdr, clusterLink *link) {
2685 clusterNode *sender = link->node ? link->node : clusterLookupNode(hdr->sender, CLUSTER_NAMELEN);
2686 char *ext_hostname = NULL;
2687 char *ext_humannodename = NULL;
2688 char *ext_shardid = NULL;
2689 uint16_t extensions = ntohs(hdr->extensions);
2690 /* Loop through all the extensions and process them */
2691 clusterMsgPingExt *ext = getInitialPingExt(hdr, ntohs(hdr->count));
2692 while (extensions--) {
2693 uint16_t type = ntohs(ext->type);
2694 if (type == CLUSTERMSG_EXT_TYPE_HOSTNAME) {
2695 clusterMsgPingExtHostname *hostname_ext = (clusterMsgPingExtHostname *) &(ext->ext[0].hostname);
2696 ext_hostname = hostname_ext->hostname;
2697 } else if (type == CLUSTERMSG_EXT_TYPE_HUMAN_NODENAME) {
2698 clusterMsgPingExtHumanNodename *humannodename_ext = (clusterMsgPingExtHumanNodename *) &(ext->ext[0].human_nodename);
2699 ext_humannodename = humannodename_ext->human_nodename;
2700 } else if (type == CLUSTERMSG_EXT_TYPE_FORGOTTEN_NODE) {
2701 clusterMsgPingExtForgottenNode *forgotten_node_ext = &(ext->ext[0].forgotten_node);
2702 clusterNode *n = clusterLookupNode(forgotten_node_ext->name, CLUSTER_NAMELEN);
2703 if (n && n != myself && !(nodeIsSlave(myself) && myself->slaveof == n)) {
2704 sds id = sdsnewlen(forgotten_node_ext->name, CLUSTER_NAMELEN);
2705 dictEntry *de = dictAddOrFind(server.cluster->nodes_black_list, id);
2706 if (dictGetKey(de) != id) sdsfree(id);
2707 uint64_t expire = server.unixtime + ntohu64(forgotten_node_ext->ttl);
2708 dictSetUnsignedIntegerVal(de, expire);
2709 clusterDelNode(n);
2710 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
2711 CLUSTER_TODO_SAVE_CONFIG);
2712 }
2713 } else if (type == CLUSTERMSG_EXT_TYPE_SHARDID) {
2714 clusterMsgPingExtShardId *shardid_ext = (clusterMsgPingExtShardId *) &(ext->ext[0].shard_id);
2715 ext_shardid = shardid_ext->shard_id;
2716 } else if (type == CLUSTERMSG_EXT_TYPE_INTERNALSECRET) {
2717 clusterMsgPingExtInternalSecret *internal_secret_ext = (clusterMsgPingExtInternalSecret *) &(ext->ext[0].internal_secret);
2718 if (memcmp(server.cluster->internal_secret, internal_secret_ext->internal_secret, CLUSTER_INTERNALSECRETLEN) > 0 ) {
2719 memcpy(server.cluster->internal_secret, internal_secret_ext->internal_secret, CLUSTER_INTERNALSECRETLEN);
2720 }
2721 } else {
2722 /* Unknown type, we will ignore it but log what happened. */
2723 serverLog(LL_VERBOSE, "Received unknown extension type %d", type);
2724 }
2725
2726 /* We know this will be valid since we validated it ahead of time */
2727 ext = getNextPingExt(ext);
2728 }
2729
2730 /* If the node did not send us a hostname extension, assume
2731 * they don't have an announced hostname. Otherwise, we'll
2732 * set it now. */
2733 updateAnnouncedHostname(sender, ext_hostname);
2734 updateAnnouncedHumanNodename(sender, ext_humannodename);
2735 /* If the node did not send us a shard-id extension, it means the sender
2736 * does not support it (old version), so node->shard_id stays randomly generated.
2737 * A cluster-wide consensus for the node's shard_id is not necessary.
2738 * The key is maintaining consistency of the shard_id on each individual 7.2 node.
2739 * As the cluster progressively upgrades to version 7.2, we can expect the shard_ids
2740 * across all nodes to naturally converge and align.
2741 *
2742 * If sender is a replica, set the shard_id to the shard_id of its master.
2743 * Otherwise, we'll set it now. */
2744 if (ext_shardid == NULL) ext_shardid = clusterNodeGetMaster(sender)->shard_id;
2745
2746 updateShardId(sender, ext_shardid);
2747}
2748
2749static clusterNode *getNodeFromLinkAndMsg(clusterLink *link, clusterMsg *hdr) {
2750 clusterNode *sender;
2751 if (link->node && !nodeInHandshake(link->node)) {
2752 /* If the link has an associated node, use that so that we don't have to look it
2753 * up every time, except when the node is still in handshake, the node still has
2754 * a random name thus not truly "known". */
2755 sender = link->node;
2756 } else {
2757 /* Otherwise, fetch sender based on the message */
2758 sender = clusterLookupNode(hdr->sender, CLUSTER_NAMELEN);
2759 /* We know the sender node but haven't associated it with the link yet.
2760 * This must be an inbound link, because only for inbound links we don't
2761 * know which node to associate with them when they are created. */
2762 if (sender && !link->node) {
2763 setClusterNodeToInboundClusterLink(sender, link);
2764 }
2765 }
2766 return sender;
2767}
2768
2769/* When this function is called, there is a packet to process starting
2770 * at link->rcvbuf. Releasing the buffer is up to the caller, so this
2771 * function should just handle the higher level stuff of processing the
2772 * packet, modifying the cluster state if needed.
2773 *
2774 * The function returns 1 if the link is still valid after the packet
2775 * was processed, otherwise 0 if the link was freed since the packet
2776 * processing led to some inconsistency error (for instance a PONG
2777 * received from the wrong sender ID). */
2778int clusterProcessPacket(clusterLink *link) {
2779 clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
2780 uint32_t totlen = ntohl(hdr->totlen);
2781 uint16_t type = ntohs(hdr->type);
2782 mstime_t now = mstime();
2783
2784 if (type < CLUSTERMSG_TYPE_COUNT)
2785 server.cluster->stats_bus_messages_received[type]++;
2786 serverLog(LL_DEBUG,"--- Processing packet of type %s, %lu bytes",
2787 clusterGetMessageTypeString(type), (unsigned long) totlen);
2788
2789 /* Perform sanity checks */
2790 if (totlen < 16) return 1; /* At least signature, version, totlen, count. */
2791 if (totlen > link->rcvbuf_len) return 1;
2792
2793 if (ntohs(hdr->ver) != CLUSTER_PROTO_VER) {
2794 /* Can't handle messages of different versions. */
2795 return 1;
2796 }
2797
2798 if (type == server.cluster_drop_packet_filter) {
2799 serverLog(LL_WARNING, "Dropping packet that matches debug drop filter");
2800 return 1;
2801 }
2802
2803 uint16_t flags = ntohs(hdr->flags);
2804 uint16_t extensions = ntohs(hdr->extensions);
2805 uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
2806 uint32_t explen; /* expected length of this packet */
2807 clusterNode *sender;
2808
2809 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
2810 type == CLUSTERMSG_TYPE_MEET)
2811 {
2812 uint16_t count = ntohs(hdr->count);
2813
2814 explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2815 explen += (sizeof(clusterMsgDataGossip)*count);
2816
2817 /* If there is extension data, which doesn't have a fixed length,
2818 * loop through the extensions and validate their lengths now. */
2819 if (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA) {
2820 clusterMsgPingExt *ext = getInitialPingExt(hdr, count);
2821 while (extensions--) {
2822 uint16_t extlen = getPingExtLength(ext);
2823 if (extlen % 8 != 0) {
2824 serverLog(LL_WARNING, "Received a %s packet without proper padding (%d bytes)",
2825 clusterGetMessageTypeString(type), (int) extlen);
2826 return 1;
2827 }
2828 if ((totlen - explen) < extlen) {
2829 serverLog(LL_WARNING, "Received invalid %s packet with extension data that exceeds "
2830 "total packet length (%lld)", clusterGetMessageTypeString(type),
2831 (unsigned long long) totlen);
2832 return 1;
2833 }
2834 explen += extlen;
2835 ext = getNextPingExt(ext);
2836 }
2837 }
2838 } else if (type == CLUSTERMSG_TYPE_FAIL) {
2839 explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2840 explen += sizeof(clusterMsgDataFail);
2841 } else if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
2842 explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2843 explen += sizeof(clusterMsgDataPublish) -
2844 8 +
2845 ntohl(hdr->data.publish.msg.channel_len) +
2846 ntohl(hdr->data.publish.msg.message_len);
2847 } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST ||
2848 type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK ||
2849 type == CLUSTERMSG_TYPE_MFSTART)
2850 {
2851 explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2852 } else if (type == CLUSTERMSG_TYPE_UPDATE) {
2853 explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2854 explen += sizeof(clusterMsgDataUpdate);
2855 } else if (type == CLUSTERMSG_TYPE_MODULE) {
2856 explen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
2857 explen += sizeof(clusterMsgModule) -
2858 3 + ntohl(hdr->data.module.msg.len);
2859 } else {
2860 /* We don't know this type of packet, so we assume it's well formed. */
2861 explen = totlen;
2862 }
2863
2864 if (totlen != explen) {
2865 serverLog(LL_WARNING, "Received invalid %s packet of length %lld but expected length %lld",
2866 clusterGetMessageTypeString(type), (unsigned long long) totlen, (unsigned long long) explen);
2867 return 1;
2868 }
2869
2870 sender = getNodeFromLinkAndMsg(link, hdr);
2871 if (sender && (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA)) {
2872 sender->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED;
2873 }
2874
2875 /* Update the last time we saw any data from this node. We
2876 * use this in order to avoid detecting a timeout from a node that
2877 * is just sending a lot of data in the cluster bus, for instance
2878 * because of Pub/Sub. */
2879 if (sender) sender->data_received = now;
2880
2881 if (sender && !nodeInHandshake(sender)) {
2882 /* Update our currentEpoch if we see a newer epoch in the cluster. */
2883 senderCurrentEpoch = ntohu64(hdr->currentEpoch);
2884 senderConfigEpoch = ntohu64(hdr->configEpoch);
2885 if (senderCurrentEpoch > server.cluster->currentEpoch)
2886 server.cluster->currentEpoch = senderCurrentEpoch;
2887 /* Update the sender configEpoch if it is publishing a newer one. */
2888 if (senderConfigEpoch > sender->configEpoch) {
2889 sender->configEpoch = senderConfigEpoch;
2890 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2891 CLUSTER_TODO_FSYNC_CONFIG);
2892 }
2893 /* Update the replication offset info for this node. */
2894 sender->repl_offset = ntohu64(hdr->offset);
2895 sender->repl_offset_time = now;
2896 /* If we are a slave performing a manual failover and our master
2897 * sent its offset while already paused, populate the MF state. */
2898 if (server.cluster->mf_end &&
2899 nodeIsSlave(myself) &&
2900 myself->slaveof == sender &&
2901 hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
2902 server.cluster->mf_master_offset == -1)
2903 {
2904 server.cluster->mf_master_offset = sender->repl_offset;
2905 clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_MANUALFAILOVER);
2906 serverLog(LL_NOTICE,
2907 "Received replication offset for paused "
2908 "master manual failover: %lld",
2909 server.cluster->mf_master_offset);
2910 }
2911 }
2912
2913 /* Initial processing of PING and MEET requests replying with a PONG. */
2914 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) {
2915 /* We use incoming MEET messages in order to set the address
2916 * for 'myself', since only other cluster nodes will send us
2917 * MEET messages on handshakes, when the cluster joins, or
2918 * later if we changed address, and those nodes will use our
2919         * official address to connect to us. So obtaining this address
2920 * from the socket is a simple way to discover / update our own
2921 * address in the cluster without it being hardcoded in the config.
2922 *
2923 * However if we don't have an address at all, we update the address
2924 * even with a normal PING packet. If it's wrong it will be fixed
2925 * by MEET later. */
2926 if ((type == CLUSTERMSG_TYPE_MEET || myself->ip[0] == '\0') &&
2927 server.cluster_announce_ip == NULL)
2928 {
2929 char ip[NET_IP_STR_LEN];
2930
2931 if (connAddrSockName(link->conn,ip,sizeof(ip),NULL) != -1 &&
2932 strcmp(ip,myself->ip))
2933 {
2934 memcpy(myself->ip,ip,NET_IP_STR_LEN);
2935 serverLog(LL_NOTICE,"IP address for this node updated to %s",
2936 myself->ip);
2937 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
2938 }
2939 }
2940
2941        /* Add this node if it is new to us and the msg type is MEET.
2942         * In this stage we don't try to add the node with the right
2943         * flags, slaveof pointer, and so forth, as these details will be
2944         * resolved when we receive PONGs from the node. */
2945 if (!sender && type == CLUSTERMSG_TYPE_MEET) {
2946 clusterNode *node;
2947 char ip[NET_IP_STR_LEN] = {0};
2948 if (nodeIp2String(ip, link, hdr->myip) != C_OK) {
2949 /* Unable to retrieve the node's IP address from the connection. Without a
2950 * valid IP, the node becomes unusable in the cluster. This failure might be
2951 * due to the connection being closed. */
2952 serverLog(LL_NOTICE, "Closing link even though we received a MEET packet on it, "
2953 "because the connection has an error");
2954 freeClusterLink(link);
2955 return 0;
2956 }
2957
2958 node = createClusterNode(NULL,CLUSTER_NODE_HANDSHAKE);
2959 memcpy(node->ip, ip, sizeof(ip));
2960 getClientPortFromClusterMsg(hdr, &node->tls_port, &node->tcp_port);
2961 node->cport = ntohs(hdr->cport);
2962 clusterAddNode(node);
2963 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
2964 }
2965
2966 /* If this is a MEET packet from an unknown node, we still process
2967 * the gossip section here since we have to trust the sender because
2968 * of the message type. */
2969 if (!sender && type == CLUSTERMSG_TYPE_MEET)
2970 clusterProcessGossipSection(hdr,link);
2971
2972        /* In any case, reply with a PONG. */
2973 clusterSendPing(link,CLUSTERMSG_TYPE_PONG);
2974 }
2975
2976 /* PING, PONG, MEET: process config information. */
2977 if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
2978 type == CLUSTERMSG_TYPE_MEET)
2979 {
2980 serverLog(LL_DEBUG,"%s packet received: %.40s",
2981 clusterGetMessageTypeString(type),
2982 link->node ? link->node->name : "NULL");
2983 if (!link->inbound) {
2984 if (nodeInHandshake(link->node)) {
2985 /* If we already have this node, try to change the
2986 * IP/port of the node with the new one. */
2987 if (sender) {
2988 serverLog(LL_VERBOSE,
2989 "Handshake: we already know node %.40s (%s), "
2990 "updating the address if needed.", sender->name, sender->human_nodename);
2991 if (nodeUpdateAddressIfNeeded(sender,link,hdr))
2992 {
2993 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
2994 CLUSTER_TODO_UPDATE_STATE);
2995 }
2996 /* Free this node as we already have it. This will
2997 * cause the link to be freed as well. */
2998 clusterDelNode(link->node);
2999 return 0;
3000 }
3001
3002                /* The first thing to do is to replace the random name with the
3003                 * right node name if this was a handshake stage. */
3004 clusterRenameNode(link->node, hdr->sender);
3005 serverLog(LL_DEBUG,"Handshake with node %.40s completed.",
3006 link->node->name);
3007 link->node->flags &= ~CLUSTER_NODE_HANDSHAKE;
3008 link->node->flags |= flags&(CLUSTER_NODE_MASTER|CLUSTER_NODE_SLAVE);
3009 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
3010 } else if (memcmp(link->node->name,hdr->sender,
3011 CLUSTER_NAMELEN) != 0)
3012 {
3013 /* If the reply has a non matching node ID we
3014 * disconnect this node and set it as not having an associated
3015 * address. */
3016 serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d",
3017 link->node->name,
3018 (int)(now-(link->node->ctime)),
3019 link->node->flags);
3020 link->node->flags |= CLUSTER_NODE_NOADDR;
3021 link->node->ip[0] = '\0';
3022 link->node->tcp_port = 0;
3023 link->node->tls_port = 0;
3024 link->node->cport = 0;
3025 freeClusterLink(link);
3026 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
3027 return 0;
3028 }
3029 }
3030
3031 /* Copy the CLUSTER_NODE_NOFAILOVER flag from what the sender
3032 * announced. This is a dynamic flag that we receive from the
3033 * sender, and the latest status must be trusted. We need it to
3034         * be propagated because the slave ranking, used to compute the
3035         * delay of each slave in the voting process, needs to know
3036         * which instances are really competing. */
3037 if (sender) {
3038 int nofailover = flags & CLUSTER_NODE_NOFAILOVER;
3039 sender->flags &= ~CLUSTER_NODE_NOFAILOVER;
3040 sender->flags |= nofailover;
3041 }
3042
3043 /* Update the node address if it changed. */
3044 if (sender && type == CLUSTERMSG_TYPE_PING &&
3045 !nodeInHandshake(sender) &&
3046 nodeUpdateAddressIfNeeded(sender,link,hdr))
3047 {
3048 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
3049 CLUSTER_TODO_UPDATE_STATE);
3050 }
3051
3052 /* Update our info about the node */
3053 if (!link->inbound && type == CLUSTERMSG_TYPE_PONG) {
3054 link->node->pong_received = now;
3055 link->node->ping_sent = 0;
3056
3057 /* The PFAIL condition can be reversed without external
3058 * help if it is momentary (that is, if it does not
3059 * turn into a FAIL state).
3060 *
3061 * The FAIL condition is also reversible under specific
3062 * conditions detected by clearNodeFailureIfNeeded(). */
3063 if (nodeTimedOut(link->node)) {
3064 link->node->flags &= ~CLUSTER_NODE_PFAIL;
3065 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
3066 CLUSTER_TODO_UPDATE_STATE);
3067 } else if (nodeFailed(link->node)) {
3068 clearNodeFailureIfNeeded(link->node);
3069 }
3070 }
3071
3072 /* Check for role switch: slave -> master or master -> slave. */
3073 if (sender) {
3074 if (!memcmp(hdr->slaveof,CLUSTER_NODE_NULL_NAME,
3075 sizeof(hdr->slaveof)))
3076 {
3077 /* Node is a master. */
3078 clusterSetNodeAsMaster(sender);
3079 } else {
3080 /* Node is a slave. */
3081 clusterNode *master = clusterLookupNode(hdr->slaveof, CLUSTER_NAMELEN);
3082
3083 if (clusterNodeIsMaster(sender)) {
3084 /* Master turned into a slave! Reconfigure the node. */
3085 if (master && !memcmp(master->shard_id, sender->shard_id, CLUSTER_NAMELEN)) {
3086 /* `sender` was a primary and was in the same shard as `master`, its new primary */
3087 if (sender->configEpoch > senderConfigEpoch) {
3088 serverLog(LL_NOTICE,
3089 "Ignore stale message from %.40s (%s) in shard %.40s;"
3090 " gossip config epoch: %llu, current config epoch: %llu",
3091 sender->name,
3092 sender->human_nodename,
3093 sender->shard_id,
3094 (unsigned long long)senderConfigEpoch,
3095 (unsigned long long)sender->configEpoch);
3096 } else {
3097                            /* A failover occurred in the shard that `sender` belongs to and `sender` is no longer
3098                             * a primary. Update slot assignment to `master`, which is the new primary in the shard. */
3099 int slots = clusterMoveNodeSlots(sender, master);
3100 /* `master` is still a `slave` in this observer node's view; update its role and configEpoch */
3101 clusterSetNodeAsMaster(master);
3102 master->configEpoch = senderConfigEpoch;
3103 serverLog(LL_NOTICE, "A failover occurred in shard %.40s; node %.40s (%s)"
3104 " lost %d slot(s) to node %.40s (%s) with a config epoch of %llu",
3105 sender->shard_id,
3106 sender->name,
3107 sender->human_nodename,
3108 slots,
3109 master->name,
3110 master->human_nodename,
3111 (unsigned long long) master->configEpoch);
3112 }
3113 } else {
3114 /* `sender` was moved to another shard and has become a replica, remove its slot assignment */
3115 int slots = clusterDelNodeSlots(sender);
3116 serverLog(LL_NOTICE, "Node %.40s (%s) is no longer master of shard %.40s;"
3117 " removed all %d slot(s) it used to own",
3118 sender->name,
3119 sender->human_nodename,
3120 sender->shard_id,
3121 slots);
3122 if (master != NULL) {
3123 serverLog(LL_NOTICE, "Node %.40s (%s) is now part of shard %.40s",
3124 sender->name,
3125 sender->human_nodename,
3126 master->shard_id);
3127 }
3128 }
3129 sender->flags &= ~(CLUSTER_NODE_MASTER|
3130 CLUSTER_NODE_MIGRATE_TO);
3131 sender->flags |= CLUSTER_NODE_SLAVE;
3132
3133 /* Update config and state. */
3134 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
3135 CLUSTER_TODO_UPDATE_STATE);
3136 }
3137
3138 /* Master node changed for this slave? */
3139 if (master && sender->slaveof != master) {
3140 if (sender->slaveof)
3141 clusterNodeRemoveSlave(sender->slaveof,sender);
3142 clusterNodeAddSlave(master,sender);
3143 sender->slaveof = master;
3144
3145 /* Update the shard_id when a replica is connected to its
3146                     * primary for the very first time. */
3147 updateShardId(sender, master->shard_id);
3148
3149 /* Update config. */
3150 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
3151 }
3152 }
3153 }
3154
3155 /* Update our info about served slots.
3156 *
3157 * Note: this MUST happen after we update the master/slave state
3158 * so that CLUSTER_NODE_MASTER flag will be set. */
3159
3160 /* Many checks are only needed if the set of served slots this
3161         * instance claims is different from the set of slots we have
3162         * for it. Check this ASAP to avoid other computationally expensive
3163         * checks later. */
3164 clusterNode *sender_master = NULL; /* Sender or its master if slave. */
3165 int dirty_slots = 0; /* Sender claimed slots don't match my view? */
3166
3167 if (sender) {
3168 sender_master = clusterNodeIsMaster(sender) ? sender : sender->slaveof;
3169 if (sender_master) {
3170 dirty_slots = memcmp(sender_master->slots,
3171 hdr->myslots,sizeof(hdr->myslots)) != 0;
3172 }
3173 }
3174
3175 /* 1) If the sender of the message is a master, and we detected that
3176 * the set of slots it claims changed, scan the slots to see if we
3177 * need to update our configuration. */
3178 if (sender && clusterNodeIsMaster(sender) && dirty_slots)
3179 clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);
3180
3181 /* 2) We also check for the reverse condition, that is, the sender
3182 * claims to serve slots we know are served by a master with a
3183 * greater configEpoch. If this happens we inform the sender.
3184 *
3185 * This is useful because sometimes after a partition heals, a
3186 * reappearing master may be the last one to claim a given set of
3187 * hash slots, but with a configuration that other instances know to
3188 * be deprecated. Example:
3189 *
3190 * A and B are master and slave for slots 1,2,3.
3191 * A is partitioned away, B gets promoted.
3192 * B is partitioned away, and A returns available.
3193 *
3194 * Usually B would PING A publishing its set of served slots and its
3195 * configEpoch, but because of the partition B can't inform A of the
3196 * new configuration, so other nodes that have an updated table must
3197     * do it. In this way A will stop acting as a master (or can try to
3198     * failover if the conditions to win the election are met). */
3199 if (sender && dirty_slots) {
3200 int j;
3201
3202 for (j = 0; j < CLUSTER_SLOTS; j++) {
3203 if (bitmapTestBit(hdr->myslots,j)) {
3204 if (server.cluster->slots[j] == sender ||
3205 isSlotUnclaimed(j)) continue;
3206 if (server.cluster->slots[j]->configEpoch >
3207 senderConfigEpoch)
3208 {
3209 serverLog(LL_VERBOSE,
3210 "Node %.40s has old slots configuration, sending "
3211 "an UPDATE message about %.40s",
3212 sender->name, server.cluster->slots[j]->name);
3213 clusterSendUpdate(sender->link,
3214 server.cluster->slots[j]);
3215
3216 /* TODO: instead of exiting the loop send every other
3217 * UPDATE packet for other nodes that are the new owner
3218 * of sender's slots. */
3219 break;
3220 }
3221 }
3222 }
3223 }
3224
3225 /* If our config epoch collides with the sender's try to fix
3226 * the problem. */
3227 if (sender && clusterNodeIsMaster(myself) && clusterNodeIsMaster(sender) &&
3228 senderConfigEpoch == myself->configEpoch)
3229 {
3230 clusterHandleConfigEpochCollision(sender);
3231 }
3232
3233 /* Get info from the gossip section */
3234 if (sender) {
3235 clusterProcessGossipSection(hdr,link);
3236 clusterProcessPingExtensions(hdr,link);
3237 }
3238 } else if (type == CLUSTERMSG_TYPE_FAIL) {
3239 clusterNode *failing;
3240
3241 if (sender) {
3242 failing = clusterLookupNode(hdr->data.fail.about.nodename, CLUSTER_NAMELEN);
3243 if (failing &&
3244 !(failing->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_MYSELF)))
3245 {
3246 serverLog(LL_NOTICE,
3247 "FAIL message received from %.40s (%s) about %.40s (%s)",
3248 hdr->sender, sender->human_nodename, hdr->data.fail.about.nodename, failing->human_nodename);
3249 failing->flags |= CLUSTER_NODE_FAIL;
3250 failing->fail_time = now;
3251 failing->flags &= ~CLUSTER_NODE_PFAIL;
3252 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
3253 CLUSTER_TODO_UPDATE_STATE);
3254 }
3255 } else {
3256 serverLog(LL_NOTICE,
3257 "Ignoring FAIL message from unknown node %.40s about %.40s",
3258 hdr->sender, hdr->data.fail.about.nodename);
3259 }
3260 } else if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
3261 if (!sender) return 1; /* We don't know that node. */
3262
3263 robj *channel, *message;
3264 uint32_t channel_len, message_len;
3265
3266 /* Don't bother creating useless objects if there are no
3267 * Pub/Sub subscribers. */
3268 if ((type == CLUSTERMSG_TYPE_PUBLISH
3269 && serverPubsubSubscriptionCount() > 0)
3270 || (type == CLUSTERMSG_TYPE_PUBLISHSHARD
3271 && serverPubsubShardSubscriptionCount() > 0))
3272 {
3273 channel_len = ntohl(hdr->data.publish.msg.channel_len);
3274 message_len = ntohl(hdr->data.publish.msg.message_len);
3275 channel = createStringObject(
3276 (char*)hdr->data.publish.msg.bulk_data,channel_len);
3277 message = createStringObject(
3278 (char*)hdr->data.publish.msg.bulk_data+channel_len,
3279 message_len);
3280 pubsubPublishMessage(channel, message, type == CLUSTERMSG_TYPE_PUBLISHSHARD);
3281 decrRefCount(channel);
3282 decrRefCount(message);
3283 }
3284 } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
3285 if (!sender) return 1; /* We don't know that node. */
3286 clusterSendFailoverAuthIfNeeded(sender,hdr);
3287 } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
3288 if (!sender) return 1; /* We don't know that node. */
3289 /* We consider this vote only if the sender is a master serving
3290         * a non-zero number of slots, and its currentEpoch is greater than or
3291         * equal to the epoch in which this node started the election. */
3292 if (clusterNodeIsMaster(sender) && sender->numslots > 0 &&
3293 senderCurrentEpoch >= server.cluster->failover_auth_epoch)
3294 {
3295 server.cluster->failover_auth_count++;
3296 /* Maybe we reached a quorum here, set a flag to make sure
3297 * we check ASAP. */
3298 clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
3299 }
3300 } else if (type == CLUSTERMSG_TYPE_MFSTART) {
3301 /* This message is acceptable only if I'm a master and the sender
3302 * is one of my slaves. */
3303 if (!sender || sender->slaveof != myself) return 1;
3304 /* Cancel all ASM tasks when starting manual failover */
3305 clusterAsmCancel(NULL, "manual failover");
3306 /* Manual failover requested from slaves. Initialize the state
3307 * accordingly. */
3308 resetManualFailover();
3309 server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT;
3310 server.cluster->mf_slave = sender;
3311 pauseActions(PAUSE_DURING_FAILOVER,
3312 now + (CLUSTER_MF_TIMEOUT * CLUSTER_MF_PAUSE_MULT),
3313 PAUSE_ACTIONS_CLIENT_WRITE_SET);
3314 serverLog(LL_NOTICE,"Manual failover requested by replica %.40s (%s).",
3315 sender->name, sender->human_nodename);
3316        /* We need to send a ping message to the replica, as it carries
3317         * `server.cluster->mf_master_offset`, the offset at which the master
3318         * paused clients. Once the replica receives it, it knows it is safe to
3319         * set its `server.cluster->mf_can_start` to 1 and complete the
3320         * failover as quickly as possible. */
3321 clusterSendPing(link, CLUSTERMSG_TYPE_PING);
3322 } else if (type == CLUSTERMSG_TYPE_UPDATE) {
3323 clusterNode *n; /* The node the update is about. */
3324 uint64_t reportedConfigEpoch =
3325 ntohu64(hdr->data.update.nodecfg.configEpoch);
3326
3327 if (!sender) return 1; /* We don't know the sender. */
3328 n = clusterLookupNode(hdr->data.update.nodecfg.nodename, CLUSTER_NAMELEN);
3329 if (!n) return 1; /* We don't know the reported node. */
3330 if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */
3331
3332 /* If in our current config the node is a slave, set it as a master. */
3333 if (nodeIsSlave(n)) clusterSetNodeAsMaster(n);
3334
3335 /* Update the node's configEpoch. */
3336 n->configEpoch = reportedConfigEpoch;
3337 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
3338 CLUSTER_TODO_FSYNC_CONFIG);
3339
3340 /* Check the bitmap of served slots and update our
3341 * config accordingly. */
3342 clusterUpdateSlotsConfigWith(n,reportedConfigEpoch,
3343 hdr->data.update.nodecfg.slots);
3344 } else if (type == CLUSTERMSG_TYPE_MODULE) {
3345 if (!sender) return 1; /* Protect the module from unknown nodes. */
3346 /* We need to route this message back to the right module subscribed
3347 * for the right message type. */
3348 uint64_t module_id = hdr->data.module.msg.module_id; /* Endian-safe ID */
3349 uint32_t len = ntohl(hdr->data.module.msg.len);
3350 uint8_t type = hdr->data.module.msg.type;
3351 unsigned char *payload = hdr->data.module.msg.bulk_data;
3352 moduleCallClusterReceivers(sender->name,module_id,type,payload,len);
3353 } else {
3354 serverLog(LL_WARNING,"Received unknown packet type: %d", type);
3355 }
3356 return 1;
3357}
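
/* Illustrative sketch (not part of the original source): the extension walk at
 * the top of clusterProcessPacket() requires every extension length to be a
 * multiple of 8 and keeps a running expected length, so the extensions can
 * never claim more bytes than the declared packet size. The standalone helper
 * below mirrors that validation; the parameter names are hypothetical and
 * base_len is assumed to be <= totlen, as the caller's earlier checks
 * guarantee. */
static inline int exampleExtensionsFit(uint32_t totlen, uint32_t base_len,
                                       const uint32_t *ext_len, int n) {
    uint32_t explen = base_len;
    for (int i = 0; i < n; i++) {
        if (ext_len[i] % 8 != 0) return 0;          /* Not padded to 8 bytes. */
        if (totlen - explen < ext_len[i]) return 0; /* Would exceed the packet. */
        explen += ext_len[i];
    }
    return 1; /* All extensions fit within the declared total length. */
}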
3358
3359/* This function is called when we detect the link with this node is lost.
3360   We set the node as no longer connected. The Cluster Cron will detect
3361   the missing connection and will try to reconnect.
3362
3363   If instead the node is a temporary node used to accept a query, we
3364   completely free the node on error. */
3365void handleLinkIOError(clusterLink *link) {
3366 freeClusterLink(link);
3367}
3368
3369/* Send the messages queued for the link. */
3370void clusterWriteHandler(connection *conn) {
3371 clusterLink *link = connGetPrivateData(conn);
3372 ssize_t nwritten;
3373 size_t totwritten = 0;
3374
3375 while (totwritten < NET_MAX_WRITES_PER_EVENT && listLength(link->send_msg_queue) > 0) {
3376 listNode *head = listFirst(link->send_msg_queue);
3377 clusterMsgSendBlock *msgblock = (clusterMsgSendBlock*)head->value;
3378 clusterMsg *msg = getMessageFromSendBlock(msgblock);
3379 size_t msg_offset = link->head_msg_send_offset;
3380 size_t msg_len = ntohl(msg->totlen);
3381
3382 nwritten = connWrite(conn, (char*)msg + msg_offset, msg_len - msg_offset);
3383 if (nwritten <= 0) {
3384 serverLog(LL_DEBUG,"I/O error writing to node link: %s",
3385 (nwritten == -1) ? connGetLastError(conn) : "short write");
3386 handleLinkIOError(link);
3387 return;
3388 }
3389 if (msg_offset + nwritten < msg_len) {
3390            /* If the full message wasn't written, record the offset
3391             * and continue sending from this point next time. */
3392 link->head_msg_send_offset += nwritten;
3393 return;
3394 }
3395 serverAssert((msg_offset + nwritten) == msg_len);
3396 link->head_msg_send_offset = 0;
3397
3398 /* Delete the node and update our memory tracking */
3399 uint32_t blocklen = msgblock->totlen;
3400 listDelNode(link->send_msg_queue, head);
3401 server.stat_cluster_links_memory -= sizeof(listNode);
3402 link->send_msg_queue_mem -= sizeof(listNode) + blocklen;
3403
3404 totwritten += nwritten;
3405 }
3406
3407 if (listLength(link->send_msg_queue) == 0)
3408 connSetWriteHandler(link->conn, NULL);
3409}
3410
3411/* A connect handler that gets called when a connection to another node
3412 * gets established.
3413 */
3414void clusterLinkConnectHandler(connection *conn) {
3415 clusterLink *link = connGetPrivateData(conn);
3416 clusterNode *node = link->node;
3417
3418 /* Check if connection succeeded */
3419 if (connGetState(conn) != CONN_STATE_CONNECTED) {
3420 serverLog(LL_VERBOSE, "Connection with Node %.40s at %s:%d failed: %s",
3421 node->name, node->ip, node->cport,
3422 connGetLastError(conn));
3423 freeClusterLink(link);
3424 return;
3425 }
3426
3427 /* Register a read handler from now on */
3428 connSetReadHandler(conn, clusterReadHandler);
3429
3430 /* Queue a PING in the new connection ASAP: this is crucial
3431 * to avoid false positives in failure detection.
3432 *
3433 * If the node is flagged as MEET, we send a MEET message instead
3434 * of a PING one, to force the receiver to add us in its node
3435 * table. */
3436 mstime_t old_ping_sent = node->ping_sent;
3437 clusterSendPing(link, node->flags & CLUSTER_NODE_MEET ?
3438 CLUSTERMSG_TYPE_MEET : CLUSTERMSG_TYPE_PING);
3439 if (old_ping_sent) {
3440 /* If there was an active ping before the link was
3441     * disconnected, we want to restore the ping time, which would otherwise
3442     * be overwritten by the clusterSendPing() call. */
3443 node->ping_sent = old_ping_sent;
3444 }
3445 /* We can clear the flag after the first packet is sent.
3446     * If we never receive a PONG, we'll never send new packets
3447     * to this node. Once the PONG is received and we
3448     * are no longer in meet/handshake status, we want to send
3449 * normal PING packets. */
3450 node->flags &= ~CLUSTER_NODE_MEET;
3451
3452 serverLog(LL_DEBUG,"Connecting with Node %.40s at %s:%d",
3453 node->name, node->ip, node->cport);
3454}
3455
3456/* Read data. Try to read the first field of the header first to check the
3457 * full length of the packet. When a whole packet is in memory, this function
3458 * calls the function that processes the packet, and so forth. */
3459void clusterReadHandler(connection *conn) {
3460 clusterMsg buf[1];
3461 ssize_t nread;
3462 clusterMsg *hdr;
3463 clusterLink *link = connGetPrivateData(conn);
3464 unsigned int readlen, rcvbuflen;
3465
3466 while(1) { /* Read as long as there is data to read. */
3467 rcvbuflen = link->rcvbuf_len;
3468 if (rcvbuflen < 8) {
3469 /* First, obtain the first 8 bytes to get the full message
3470 * length. */
3471 readlen = 8 - rcvbuflen;
3472 } else {
3473 /* Finally read the full message. */
3474 hdr = (clusterMsg*) link->rcvbuf;
3475 if (rcvbuflen == 8) {
3476 /* Perform some sanity check on the message signature
3477 * and length. */
3478 if (memcmp(hdr->sig,"RCmb",4) != 0 ||
3479 ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN)
3480 {
3481 char ip[NET_IP_STR_LEN];
3482 int port;
3483 if (connAddrPeerName(conn, ip, sizeof(ip), &port) == -1) {
3484 serverLog(LL_WARNING,
3485 "Bad message length or signature received "
3486 "on the Cluster bus.");
3487 } else {
3488 serverLog(LL_WARNING,
3489 "Bad message length or signature received "
3490 "on the Cluster bus from %s:%d", ip, port);
3491 }
3492 handleLinkIOError(link);
3493 return;
3494 }
3495 }
3496 readlen = ntohl(hdr->totlen) - rcvbuflen;
3497 if (readlen > sizeof(buf)) readlen = sizeof(buf);
3498 }
3499
3500 nread = connRead(conn,buf,readlen);
3501 if (nread == -1 && (connGetState(conn) == CONN_STATE_CONNECTED)) return; /* No more data ready. */
3502
3503 if (nread <= 0) {
3504 /* I/O error... */
3505 serverLog(LL_DEBUG,"I/O error reading from node link: %s",
3506 (nread == 0) ? "connection closed" : connGetLastError(conn));
3507 handleLinkIOError(link);
3508 return;
3509 } else {
3510 /* Read data and recast the pointer to the new buffer. */
3511 size_t unused = link->rcvbuf_alloc - link->rcvbuf_len;
3512 if ((size_t)nread > unused) {
3513 size_t required = link->rcvbuf_len + nread;
3514 size_t prev_rcvbuf_alloc = link->rcvbuf_alloc;
3515 /* If less than 1mb, grow to twice the needed size, if larger grow by 1mb. */
3516 link->rcvbuf_alloc = required < RCVBUF_MAX_PREALLOC ? required * 2: required + RCVBUF_MAX_PREALLOC;
3517 link->rcvbuf = zrealloc(link->rcvbuf, link->rcvbuf_alloc);
3518 server.stat_cluster_links_memory += link->rcvbuf_alloc - prev_rcvbuf_alloc;
3519 }
3520 memcpy(link->rcvbuf + link->rcvbuf_len, buf, nread);
3521 link->rcvbuf_len += nread;
3522 hdr = (clusterMsg*) link->rcvbuf;
3523 rcvbuflen += nread;
3524 }
3525
3526 /* Total length obtained? Process this packet. */
3527 if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
3528 if (clusterProcessPacket(link)) {
3529 if (link->rcvbuf_alloc > RCVBUF_INIT_LEN) {
3530 size_t prev_rcvbuf_alloc = link->rcvbuf_alloc;
3531 zfree(link->rcvbuf);
3532 link->rcvbuf = zmalloc(link->rcvbuf_alloc = RCVBUF_INIT_LEN);
3533 server.stat_cluster_links_memory += link->rcvbuf_alloc - prev_rcvbuf_alloc;
3534 }
3535 link->rcvbuf_len = 0;
3536 } else {
3537 return; /* Link no longer valid. */
3538 }
3539 }
3540 }
3541}
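
/* Illustrative sketch (not part of the original source): the receive buffer
 * growth policy used in clusterReadHandler() above, expressed as a standalone
 * helper. Below RCVBUF_MAX_PREALLOC the allocation doubles the required size;
 * above it, it grows linearly by RCVBUF_MAX_PREALLOC to avoid unbounded
 * doubling for very large packets. */
static inline size_t exampleRcvbufGrow(size_t required) {
    return required < RCVBUF_MAX_PREALLOC ? required * 2
                                          : required + RCVBUF_MAX_PREALLOC;
}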
3542
3543/* Put the message block into the link's send queue.
3544 *
3545 * It is guaranteed that this function never invalidates the link as a side
3546 * effect, so it is safe to call it from event handlers that will do stuff
3547 * with the same link later. */
3548void clusterSendMessage(clusterLink *link, clusterMsgSendBlock *msgblock) {
3549 if (!link) {
3550 return;
3551 }
3552 if (listLength(link->send_msg_queue) == 0 && getMessageFromSendBlock(msgblock)->totlen != 0)
3553 connSetWriteHandlerWithBarrier(link->conn, clusterWriteHandler, 1);
3554
3555 listAddNodeTail(link->send_msg_queue, msgblock);
3556 msgblock->refcount++;
3557
3558 /* Update memory tracking */
3559 link->send_msg_queue_mem += sizeof(listNode) + msgblock->totlen;
3560 server.stat_cluster_links_memory += sizeof(listNode);
3561
3562 /* Populate sent messages stats. */
3563 uint16_t type = ntohs(getMessageFromSendBlock(msgblock)->type);
3564 if (type < CLUSTERMSG_TYPE_COUNT)
3565 server.cluster->stats_bus_messages_sent[type]++;
3566}
3567
3568/* Send a message to all the nodes that are part of the cluster having
3569 * a connected link.
3570 *
3571 * It is guaranteed that this function never invalidates any node->link as a
3572 * side effect, so it is safe to call it from event handlers that will do
3573 * stuff with node links later. */
3574void clusterBroadcastMessage(clusterMsgSendBlock *msgblock) {
3575 dictIterator di;
3576 dictEntry *de;
3577
3578 dictInitSafeIterator(&di, server.cluster->nodes);
3579 while((de = dictNext(&di)) != NULL) {
3580 clusterNode *node = dictGetVal(de);
3581
3582 if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
3583 continue;
3584 clusterSendMessage(node->link,msgblock);
3585 }
3586 dictResetIterator(&di);
3587}
3588
3589/* Build the message header. hdr must point to a buffer at least
3590 * sizeof(clusterMsg) in bytes. */
3591static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen) {
3592 uint64_t offset;
3593 clusterNode *master;
3594
3595 /* If this node is a master, we send its slots bitmap and configEpoch.
3596 * If this node is a slave we send the master's information instead (the
3597 * node is flagged as slave so the receiver knows that it is NOT really
3598     * in charge of these slots). */
3599 master = (nodeIsSlave(myself) && myself->slaveof) ?
3600 myself->slaveof : myself;
3601
3602 hdr->ver = htons(CLUSTER_PROTO_VER);
3603 hdr->sig[0] = 'R';
3604 hdr->sig[1] = 'C';
3605 hdr->sig[2] = 'm';
3606 hdr->sig[3] = 'b';
3607 hdr->type = htons(type);
3608 memcpy(hdr->sender,myself->name,CLUSTER_NAMELEN);
3609
3610 /* If cluster-announce-ip option is enabled, force the receivers of our
3611 * packets to use the specified address for this node. Otherwise if the
3612 * first byte is zero, they'll do auto discovery. */
3613 memset(hdr->myip,0,NET_IP_STR_LEN);
3614 if (server.cluster_announce_ip) {
3615 redis_strlcpy(hdr->myip,server.cluster_announce_ip,NET_IP_STR_LEN);
3616 }
3617
3618 /* Handle cluster-announce-[tls-|bus-]port. */
3619 int announced_tcp_port, announced_tls_port, announced_cport;
3620 deriveAnnouncedPorts(&announced_tcp_port, &announced_tls_port, &announced_cport);
3621
3622 memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots));
3623 memset(hdr->slaveof,0,CLUSTER_NAMELEN);
3624 if (myself->slaveof != NULL)
3625 memcpy(hdr->slaveof,myself->slaveof->name, CLUSTER_NAMELEN);
3626 if (server.tls_cluster) {
3627 hdr->port = htons(announced_tls_port);
3628 hdr->pport = htons(announced_tcp_port);
3629 } else {
3630 hdr->port = htons(announced_tcp_port);
3631 hdr->pport = htons(announced_tls_port);
3632 }
3633 hdr->cport = htons(announced_cport);
3634 hdr->flags = htons(myself->flags);
3635 hdr->state = server.cluster->state;
3636
3637 /* Set the currentEpoch and configEpochs. */
3638 hdr->currentEpoch = htonu64(server.cluster->currentEpoch);
3639 hdr->configEpoch = htonu64(master->configEpoch);
3640
3641 /* Set the replication offset. */
3642 if (nodeIsSlave(myself))
3643 offset = replicationGetSlaveOffset();
3644 else
3645 offset = server.master_repl_offset;
3646 hdr->offset = htonu64(offset);
3647
3648 /* Set the message flags. */
3649 if (clusterNodeIsMaster(myself) && server.cluster->mf_end)
3650 hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;
3651 hdr->mflags[0] |= CLUSTERMSG_FLAG0_EXT_DATA; /* Always make other nodes know that
3652 * this node supports extension data. */
3653
3654 hdr->totlen = htonl(msglen);
3655}
3656
3657/* Set the i-th entry of the gossip section in the message pointed by 'hdr'
3658 * to the info of the specified node 'n'. */
3659void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
3660 clusterMsgDataGossip *gossip;
3661 gossip = &(hdr->data.ping.gossip[i]);
3662 memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN);
3663 gossip->ping_sent = htonl(n->ping_sent/1000);
3664 gossip->pong_received = htonl(n->pong_received/1000);
3665 memcpy(gossip->ip,n->ip,sizeof(n->ip));
3666 if (server.tls_cluster) {
3667 gossip->port = htons(n->tls_port);
3668 gossip->pport = htons(n->tcp_port);
3669 } else {
3670 gossip->port = htons(n->tcp_port);
3671 gossip->pport = htons(n->tls_port);
3672 }
3673 gossip->cport = htons(n->cport);
3674 gossip->flags = htons(n->flags);
3675 gossip->notused1 = 0;
3676}
3677
3678/* Send a PING or PONG packet to the specified node, making sure to add enough
3679 * gossip information. */
3680void clusterSendPing(clusterLink *link, int type) {
3681 static unsigned long long cluster_pings_sent = 0;
3682 cluster_pings_sent++;
3683 int gossipcount = 0; /* Number of gossip sections added so far. */
3684 int wanted; /* Number of gossip sections we want to append if possible. */
3685 int estlen; /* Upper bound on estimated packet length */
3686 /* freshnodes is the max number of nodes we can hope to append at all:
3687 * nodes available minus two (ourself and the node we are sending the
3688     * message to). However, in practice there may be fewer valid nodes,
3689     * since nodes in handshake state or disconnected nodes are not considered. */
3690 int freshnodes = dictSize(server.cluster->nodes)-2;
3691
3692 /* How many gossip sections we want to add? 1/10 of the number of nodes
3693 * and anyway at least 3. Why 1/10?
3694 *
3695 * If we have N masters, with N/10 entries, and we consider that in
3696 * node_timeout we exchange with each other node at least 4 packets
3697 * (we ping in the worst case in node_timeout/2 time, and we also
3698 * receive two pings from the host), we have a total of 8 packets
3699 * in the node_timeout*2 failure reports validity time. So we have
3700 * that, for a single PFAIL node, we can expect to receive the following
3701 * number of failure reports (in the specified window of time):
3702 *
3703 * PROB * GOSSIP_ENTRIES_PER_PACKET * TOTAL_PACKETS:
3704 *
3705 * PROB = probability of being featured in a single gossip entry,
3706 * which is 1 / NUM_OF_NODES.
3707 * ENTRIES = 10.
3708 * TOTAL_PACKETS = 2 * 4 * NUM_OF_MASTERS.
3709 *
3710 * If we assume we have just masters (so num of nodes and num of masters
3711 * is the same), with 1/10 we always get over the majority, and specifically
3712 * 80% of the number of nodes, to account for many masters failing at the
3713 * same time.
3714 *
3715 * Since we have non-voting slaves that lower the probability of an entry
3716 * to feature our node, we set the number of entries per packet as
3717 * 10% of the total nodes we have. */
3718 wanted = floor(dictSize(server.cluster->nodes)/10);
3719 if (wanted < 3) wanted = 3;
3720 if (wanted > freshnodes) wanted = freshnodes;
3721
3722    /* Include all the nodes in PFAIL state, so that failure reports
3723     * propagate faster and nodes can go from PFAIL to FAIL state sooner. */
3724 int pfail_wanted = server.cluster->stats_pfail_nodes;
3725
3726 /* Compute the maximum estlen to allocate our buffer. We'll fix the estlen
3727 * later according to the number of gossip sections we really were able
3728 * to put inside the packet. */
3729 estlen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
3730 estlen += (sizeof(clusterMsgDataGossip)*(wanted + pfail_wanted));
3731 if (link->node && nodeSupportsExtensions(link->node)) {
3732 estlen += writePingExt(NULL, 0);
3733 }
3734 /* Note: clusterBuildMessageHdr() expects the buffer to be always at least
3735 * sizeof(clusterMsg) or more. */
3736 if (estlen < (int)sizeof(clusterMsg)) estlen = sizeof(clusterMsg);
3737 clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(type, estlen);
3738 clusterMsg *hdr = getMessageFromSendBlock(msgblock);
3739
3740 if (!link->inbound && type == CLUSTERMSG_TYPE_PING)
3741 link->node->ping_sent = mstime();
3742
3743 /* Populate the gossip fields */
3744 int maxiterations = wanted*3;
3745 while(freshnodes > 0 && gossipcount < wanted && maxiterations--) {
3746 dictEntry *de = dictGetRandomKey(server.cluster->nodes);
3747 clusterNode *this = dictGetVal(de);
3748
3749 /* Don't include this node: the whole packet header is about us
3750 * already, so we just gossip about other nodes.
3751 * Also, don't include the receiver. Receiver will not update its state
3752 * based on gossips about itself. */
3753 if (this == myself || this == link->node) continue;
3754
3755 /* PFAIL nodes will be added later. */
3756 if (this->flags & CLUSTER_NODE_PFAIL) continue;
3757
3758 /* In the gossip section don't include:
3759 * 1) Nodes in HANDSHAKE state.
3760         * 2) Nodes with the NOADDR flag set.
3761         * 3) Disconnected nodes if they don't have configured slots.
3762 */
3763 if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||
3764 (this->link == NULL && this->numslots == 0))
3765 {
3766 freshnodes--; /* Technically not correct, but saves CPU. */
3767 continue;
3768 }
3769
3770 /* Do not add a node we already have. */
3771 if (this->last_in_ping_gossip == cluster_pings_sent) continue;
3772
3773 /* Add it */
3774 clusterSetGossipEntry(hdr,gossipcount,this);
3775 this->last_in_ping_gossip = cluster_pings_sent;
3776 freshnodes--;
3777 gossipcount++;
3778 }
3779
3780 /* If there are PFAIL nodes, add them at the end. */
3781 if (pfail_wanted) {
3782 dictIterator di;
3783 dictEntry *de;
3784
3785 dictInitSafeIterator(&di, server.cluster->nodes);
3786 while((de = dictNext(&di)) != NULL && pfail_wanted > 0) {
3787 clusterNode *node = dictGetVal(de);
3788 if (node->flags & CLUSTER_NODE_HANDSHAKE) continue;
3789 if (node->flags & CLUSTER_NODE_NOADDR) continue;
3790 if (!(node->flags & CLUSTER_NODE_PFAIL)) continue;
3791 clusterSetGossipEntry(hdr,gossipcount,node);
3792 gossipcount++;
3793            /* We count this against the gossip entries we allocated, since
3794             * the PFAIL stats may not match perfectly with the current number
3795             * of PFAIL nodes. */
3796 pfail_wanted--;
3797 }
3798 dictResetIterator(&di);
3799 }
3800
3801 /* Compute the actual total length and send! */
3802 uint32_t totlen = 0;
3803 if (link->node && nodeSupportsExtensions(link->node)) {
3804 totlen += writePingExt(hdr, gossipcount);
3805 }
3806 totlen += sizeof(clusterMsg)-sizeof(union clusterMsgData);
3807 totlen += (sizeof(clusterMsgDataGossip)*gossipcount);
3808 serverAssert(gossipcount < USHRT_MAX);
3809 hdr->count = htons(gossipcount);
3810 hdr->totlen = htonl(totlen);
3811
3812 clusterSendMessage(link,msgblock);
3813 clusterMsgSendBlockDecrRefCount(msgblock);
3814}
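
/* Illustrative sketch (not part of the original source): the gossip fan-out
 * sizing described in the long comment above, as a standalone helper. With N
 * known nodes we aim for roughly N/10 gossip entries per ping, never fewer
 * than 3, and never more than the nodes actually available (N minus ourself
 * and the receiver), mirroring the math in clusterSendPing(). */
static inline int exampleGossipWanted(int num_nodes) {
    int freshnodes = num_nodes - 2; /* Exclude myself and the receiver. */
    int wanted = num_nodes / 10;    /* ~10% of the cluster per packet. */
    if (wanted < 3) wanted = 3;     /* Keep small clusters chatty enough. */
    if (wanted > freshnodes) wanted = freshnodes;
    return wanted;
}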
3815
3816/* Send a PONG packet to every connected node that's not in handshake state
3817 * and for which we have a valid link.
3818 *
3819 * In Redis Cluster pongs are not used just for failure detection, but also
3820 * to carry important configuration information. So broadcasting a pong is
3821 * useful when something changes in the configuration and we want to make
3822 * the cluster aware ASAP (for instance after a slave promotion).
3823 *
3824 * The 'target' argument specifies the receiving instances using the
3825 * defines below:
3826 *
3827 * CLUSTER_BROADCAST_ALL -> All known instances.
3828 * CLUSTER_BROADCAST_LOCAL_SLAVES -> All slaves in my master-slaves ring.
3829 */
3830#define CLUSTER_BROADCAST_ALL 0
3831#define CLUSTER_BROADCAST_LOCAL_SLAVES 1
3832void clusterBroadcastPong(int target) {
3833 dictIterator di;
3834 dictEntry *de;
3835
3836 dictInitSafeIterator(&di, server.cluster->nodes);
3837 while((de = dictNext(&di)) != NULL) {
3838 clusterNode *node = dictGetVal(de);
3839
3840 if (!node->link) continue;
3841 if (node == myself || nodeInHandshake(node)) continue;
3842 if (target == CLUSTER_BROADCAST_LOCAL_SLAVES) {
3843 int local_slave =
3844 nodeIsSlave(node) && node->slaveof &&
3845 (node->slaveof == myself || node->slaveof == myself->slaveof);
3846 if (!local_slave) continue;
3847 }
3848 clusterSendPing(node->link,CLUSTERMSG_TYPE_PONG);
3849 }
3850 dictResetIterator(&di);
3851}
3852
3853/* Create a PUBLISH message block.
3854 *
3855 * Sanitizer suppression: In clusterMsgDataPublish, sizeof(bulk_data) is 8.
3856 * As the whole struct is used as a buffer, when more than 8 bytes are copied into
3857 * the 'bulk_data', the sanitizer generates an out-of-bounds error which is a false
3858 * positive in this context. */
3859REDIS_NO_SANITIZE("bounds")
3860clusterMsgSendBlock *clusterCreatePublishMsgBlock(robj *channel, robj *message, uint16_t type) {
3861
3862 uint32_t channel_len, message_len;
3863
3864 channel = getDecodedObject(channel);
3865 message = getDecodedObject(message);
3866 channel_len = sdslen(channel->ptr);
3867 message_len = sdslen(message->ptr);
3868
3869 size_t msglen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
3870 msglen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;
3871 clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(type, msglen);
3872
3873 clusterMsg *hdr = getMessageFromSendBlock(msgblock);
3874 hdr->data.publish.msg.channel_len = htonl(channel_len);
3875 hdr->data.publish.msg.message_len = htonl(message_len);
3876 memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
3877 memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
3878 message->ptr,sdslen(message->ptr));
3879
3880 decrRefCount(channel);
3881 decrRefCount(message);
3882
3883 return msgblock;
3884}
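
/* Illustrative sketch (not part of the original source): why the PUBLISH
 * length math above (and in the length validation of clusterProcessPacket())
 * subtracts 8. clusterMsgDataPublish declares an 8-byte bulk_data placeholder,
 * so the real payload size is the struct size minus that placeholder plus the
 * actual channel and message lengths. */
static inline uint32_t examplePublishDataLen(uint32_t channel_len,
                                             uint32_t message_len) {
    return (uint32_t)(sizeof(clusterMsgDataPublish) - 8) + channel_len + message_len;
}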
3885
3886/* Send a FAIL message to all the nodes we are able to contact.
3887 * The FAIL message is sent when we detect that a node is failing
3888 * (CLUSTER_NODE_PFAIL) and we also receive a gossip confirmation of this:
3889 * we switch the node state to CLUSTER_NODE_FAIL and ask all the other
3890 * nodes to do the same ASAP. */
3891void clusterSendFail(char *nodename) {
3892 uint32_t msglen = sizeof(clusterMsg) - sizeof(union clusterMsgData)
3893 + sizeof(clusterMsgDataFail);
3894 clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(CLUSTERMSG_TYPE_FAIL, msglen);
3895
3896 clusterMsg *hdr = getMessageFromSendBlock(msgblock);
3897 memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN);
3898
3899 clusterBroadcastMessage(msgblock);
3900 clusterMsgSendBlockDecrRefCount(msgblock);
3901}
3902
3903/* Send an UPDATE message to the specified link carrying the specified 'node'
3904 * slots configuration. The node name, slots bitmap, and configEpoch info
3905 * are included. */
3906void clusterSendUpdate(clusterLink *link, clusterNode *node) {
3907 if (link == NULL) return;
3908
3909 uint32_t msglen = sizeof(clusterMsg) - sizeof(union clusterMsgData)
3910 + sizeof(clusterMsgDataUpdate);
3911 clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(CLUSTERMSG_TYPE_UPDATE, msglen);
3912
3913 clusterMsg *hdr = getMessageFromSendBlock(msgblock);
3914 memcpy(hdr->data.update.nodecfg.nodename,node->name,CLUSTER_NAMELEN);
3915 hdr->data.update.nodecfg.configEpoch = htonu64(node->configEpoch);
3916 memcpy(hdr->data.update.nodecfg.slots,node->slots,sizeof(node->slots));
3917 for (unsigned int i = 0; i < sizeof(node->slots); i++) {
3918 /* Don't advertise slots that the node stopped claiming */
3919 hdr->data.update.nodecfg.slots[i] = hdr->data.update.nodecfg.slots[i] & (~server.cluster->owner_not_claiming_slot[i]);
3920 }
3921
3922 clusterSendMessage(link,msgblock);
3923 clusterMsgSendBlockDecrRefCount(msgblock);
3924}
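
/* Illustrative sketch (not part of the original source): clearing one slot in
 * a slots bitmap, which is what the byte-wise masking above effectively does
 * for slots the owner stopped claiming. The LSB-first bit order shown here is
 * an assumption for illustration; the file's own bitmapSetBit()/bitmapClearBit()
 * helpers define the canonical layout. */
static inline void exampleClearSlotBit(unsigned char *slots, int pos) {
    slots[pos / 8] &= ~(1 << (pos & 7)); /* Assumed layout: bit (pos & 7) of byte pos/8. */
}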
3925
3926/* Send a MODULE message.
3927 *
3928 * If link is NULL, then the message is broadcasted to the whole cluster. */
3929void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type,
3930 const char *payload, uint32_t len) {
3931 uint32_t msglen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
3932 msglen += sizeof(clusterMsgModule) - 3 + len;
3933 clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(CLUSTERMSG_TYPE_MODULE, msglen);
3934
3935 clusterMsg *hdr = getMessageFromSendBlock(msgblock);
3936 hdr->data.module.msg.module_id = module_id; /* Already endian adjusted. */
3937 hdr->data.module.msg.type = type;
3938 hdr->data.module.msg.len = htonl(len);
3939 memcpy(hdr->data.module.msg.bulk_data,payload,len);
3940
3941 if (link)
3942 clusterSendMessage(link,msgblock);
3943 else
3944 clusterBroadcastMessage(msgblock);
3945
3946 clusterMsgSendBlockDecrRefCount(msgblock);
3947}
3948
3949/* This function gets a cluster node ID string as target, the same way node
3950 * addresses are represented on the module side, resolves the node, and sends
3951 * the message. If the target is NULL the message is broadcast.
3952 *
3953 * The function returns C_OK if the target is valid, otherwise C_ERR is
3954 * returned. */
3955int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, const char *payload, uint32_t len) {
3956 clusterNode *node = NULL;
3957
3958 if (target != NULL) {
3959 node = clusterLookupNode(target, strlen(target));
3960 if (node == NULL || node->link == NULL) return C_ERR;
3961 }
3962
3963 clusterSendModule(target ? node->link : NULL,
3964 module_id, type, payload, len);
3965 return C_OK;
3966}
3967
3968/* -----------------------------------------------------------------------------
3969 * CLUSTER Pub/Sub support
3970 *
3971 * If `sharded` is 0:
3972 * For now we do very little, just propagating [S]PUBLISH messages across the whole
3973 * cluster. In the future we'll try to get smarter and avoid propagating those
3974 * messages to hosts without receivers for a given channel.
3975 * Otherwise:
3976 * Publish this message across the slot (primary/replica).
3977 * -------------------------------------------------------------------------- */
3978void clusterPropagatePublish(robj *channel, robj *message, int sharded) {
3979 clusterMsgSendBlock *msgblock;
3980
3981 if (!sharded) {
3982 msgblock = clusterCreatePublishMsgBlock(channel, message, CLUSTERMSG_TYPE_PUBLISH);
3983 clusterBroadcastMessage(msgblock);
3984 clusterMsgSendBlockDecrRefCount(msgblock);
3985 return;
3986 }
3987
3988 listIter li;
3989 listNode *ln;
3990 list *nodes_for_slot = clusterGetNodesInMyShard(server.cluster->myself);
3991 serverAssert(nodes_for_slot != NULL);
3992 listRewind(nodes_for_slot, &li);
3993 msgblock = clusterCreatePublishMsgBlock(channel, message, CLUSTERMSG_TYPE_PUBLISHSHARD);
3994 while((ln = listNext(&li))) {
3995 clusterNode *node = listNodeValue(ln);
3996 if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
3997 continue;
3998 clusterSendMessage(node->link,msgblock);
3999 }
4000 clusterMsgSendBlockDecrRefCount(msgblock);
4001}
4002
4003/* -----------------------------------------------------------------------------
4004 * SLAVE node specific functions
4005 * -------------------------------------------------------------------------- */
4006
4007/* This function sends a FAILOVER_AUTH_REQUEST message to every node in order to
4008 * see if there is the quorum for this slave instance to failover its failing
4009 * master.
4010 *
4011 * Note that we send the failover request to everybody, master and slave nodes,
4012 * but only the masters are supposed to reply to our query. */
4013void clusterRequestFailoverAuth(void) {
4014 uint32_t msglen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
4015 clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST, msglen);
4016
4017 /* If this is a manual failover, set the CLUSTERMSG_FLAG0_FORCEACK bit
4018     * in the header to communicate to the nodes receiving the message that
4019     * they should authorize the failover even if the master is working. */
4020 if (server.cluster->mf_end) msgblock->msg[0].mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;
4021 clusterBroadcastMessage(msgblock);
4022 clusterMsgSendBlockDecrRefCount(msgblock);
4023}
4024
4025/* Send a FAILOVER_AUTH_ACK message to the specified node. */
4026void clusterSendFailoverAuth(clusterNode *node) {
4027 if (!node->link) return;
4028
4029 uint32_t msglen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
4030 clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK, msglen);
4031
4032 clusterSendMessage(node->link,msgblock);
4033 clusterMsgSendBlockDecrRefCount(msgblock);
4034}
4035
4036/* Send a MFSTART message to the specified node. */
4037void clusterSendMFStart(clusterNode *node) {
4038 if (!node->link) return;
4039
4040 uint32_t msglen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
4041 clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(CLUSTERMSG_TYPE_MFSTART, msglen);
4042
4043 clusterSendMessage(node->link,msgblock);
4044 clusterMsgSendBlockDecrRefCount(msgblock);
4045}
4046
4047/* Vote for the node asking for our vote if there are the conditions. */
4048void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
4049 clusterNode *master = node->slaveof;
4050 uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch);
4051 uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
4052 unsigned char *claimed_slots = request->myslots;
4053 int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
4054 int j;
4055
4056    /* If we are not a master serving at least 1 slot, we don't have the
4057     * right to vote, as the cluster size in Redis Cluster is the number
4058     * of masters serving at least one slot, and the quorum is half the
4059     * cluster size plus one. */
4060 if (nodeIsSlave(myself) || myself->numslots == 0) return;
4061
4062 /* Request epoch must be >= our currentEpoch.
4063 * Note that it is impossible for it to actually be greater since
4064 * our currentEpoch was updated as a side effect of receiving this
4065 * request, if the request epoch was greater. */
4066 if (requestCurrentEpoch < server.cluster->currentEpoch) {
4067 serverLog(LL_WARNING,
4068 "Failover auth denied to %.40s (%s): reqEpoch (%llu) < curEpoch(%llu)",
4069 node->name, node->human_nodename,
4070 (unsigned long long) requestCurrentEpoch,
4071 (unsigned long long) server.cluster->currentEpoch);
4072 return;
4073 }
4074
4075 /* I already voted for this epoch? Return ASAP. */
4076 if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
4077 serverLog(LL_WARNING,
4078 "Failover auth denied to %.40s (%s): already voted for epoch %llu",
4079 node->name, node->human_nodename,
4080 (unsigned long long) server.cluster->currentEpoch);
4081 return;
4082 }
4083
4084 /* Node must be a slave and its master down.
4085     * The master can be non-failing if the request is flagged
4086 * with CLUSTERMSG_FLAG0_FORCEACK (manual failover). */
4087 if (clusterNodeIsMaster(node) || master == NULL ||
4088 (!nodeFailed(master) && !force_ack))
4089 {
4090 if (clusterNodeIsMaster(node)) {
4091 serverLog(LL_WARNING,
4092 "Failover auth denied to %.40s (%s): it is a master node",
4093 node->name, node->human_nodename);
4094 } else if (master == NULL) {
4095 serverLog(LL_WARNING,
4096 "Failover auth denied to %.40s (%s): I don't know its master",
4097 node->name, node->human_nodename);
4098 } else if (!nodeFailed(master)) {
4099 serverLog(LL_WARNING,
4100 "Failover auth denied to %.40s (%s): its master is up",
4101 node->name, node->human_nodename);
4102 }
4103 return;
4104 }
4105
4106    /* We must not have voted for a slave of this master within the last
4107     * two times the node timeout. This is not strictly needed for correctness
4108     * of the algorithm but makes the base case more linear. */
4109 if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2)
4110 {
4111 serverLog(LL_WARNING,
4112 "Failover auth denied to %.40s %s: "
4113 "can't vote about this master before %lld milliseconds",
4114 node->name, node->human_nodename,
4115 (long long) ((server.cluster_node_timeout*2)-
4116 (mstime() - node->slaveof->voted_time)));
4117 return;
4118 }
4119
4120 /* The slave requesting the vote must have a configEpoch for the claimed
4121 * slots that is >= the one of the masters currently serving the same
4122 * slots in the current configuration. */
4123 for (j = 0; j < CLUSTER_SLOTS; j++) {
4124 if (bitmapTestBit(claimed_slots, j) == 0) continue;
4125 if (isSlotUnclaimed(j) ||
4126 server.cluster->slots[j]->configEpoch <= requestConfigEpoch)
4127 {
4128 continue;
4129 }
4130 /* If we reached this point we found a slot that in our current slots
4131 * is served by a master with a greater configEpoch than the one claimed
4132 * by the slave requesting our vote. Refuse to vote for this slave. */
4133 serverLog(LL_WARNING,
4134 "Failover auth denied to %.40s (%s): "
4135 "slot %d epoch (%llu) > reqEpoch (%llu)",
4136 node->name, node->human_nodename, j,
4137 (unsigned long long) server.cluster->slots[j]->configEpoch,
4138 (unsigned long long) requestConfigEpoch);
4139 return;
4140 }
4141
4142 /* We can vote for this slave. */
4143 server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
4144 node->slaveof->voted_time = mstime();
4145 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG);
4146 clusterSendFailoverAuth(node);
4147 serverLog(LL_NOTICE, "Failover auth granted to %.40s (%s) for epoch %llu",
4148 node->name, node->human_nodename, (unsigned long long) server.cluster->currentEpoch);
4149}
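
/* Illustrative sketch (not part of the original source): the majority a
 * replica must collect to win a failover election. The cluster size counted
 * here is the number of masters serving at least one slot, and the candidate
 * needs strictly more than half of them, as computed elsewhere in this file
 * as (server.cluster->size / 2) + 1. */
static inline int exampleNeededQuorum(int cluster_size) {
    return cluster_size / 2 + 1; /* e.g. 3 masters -> 2 votes, 5 masters -> 3 votes. */
}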
4150
4151/* This function returns the "rank" of this instance, a slave, in the context
4152 * of its master-slaves ring. The rank of the slave is given by the number of
4153 * other slaves for the same master that have a better replication offset
4154 * compared to the local one (better means, greater, so they claim more data).
4155 *
4156 * A slave with rank 0 is the one with the greatest (most up to date)
4157 * replication offset, and so forth. Note that because of how the rank is
4158 * computed, multiple slaves may have the same rank if they have the same offset.
4159 *
4160 * The slave rank is used to add a delay to start an election in order to
4161 * get voted and replace a failing master. Slaves with better replication
4162 * offsets are more likely to win. */
4163int clusterGetSlaveRank(void) {
4164 long long myoffset;
4165 int j, rank = 0;
4166 clusterNode *master;
4167
4168 serverAssert(nodeIsSlave(myself));
4169 master = myself->slaveof;
4170 if (master == NULL) return 0; /* Never called by slaves without master. */
4171
4172 myoffset = replicationGetSlaveOffset();
4173 for (j = 0; j < master->numslaves; j++)
4174 if (master->slaves[j] != myself &&
4175 !nodeCantFailover(master->slaves[j]) &&
4176 master->slaves[j]->repl_offset > myoffset) rank++;
4177 return rank;
4178}
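
/* Illustrative sketch (not part of the original source): one way the rank
 * returned above can be turned into an election start delay, so that replicas
 * with fresher data ask for votes first. The constants here are hypothetical;
 * only the idea that the delay grows with the rank reflects the comment
 * above. */
static inline mstime_t exampleElectionDelay(int rank) {
    const mstime_t base_delay = 500;      /* Hypothetical fixed delay in ms. */
    const mstime_t per_rank_delay = 1000; /* Hypothetical extra ms per rank step. */
    return base_delay + (mstime_t)rank * per_rank_delay;
}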
4179
4180/* This function is called by clusterHandleSlaveFailover() in order to
4181 * let the slave log why it is not able to failover. Sometimes the conditions
4182 * are not met, but since the failover function is called again and
4183 * again, we can't log the same things continuously.
4184 *
4185 * This function works by logging only if a given set of conditions are
4186 * true:
4187 *
4188 * 1) The reason for which the failover can't be initiated changed.
4189 *    The reasons also include a NONE reason, which we reset the state to
4190 *    when the slave finds that its master is fine (no FAIL flag).
4191 * 2) Also, the log is emitted again if the master is still down and
4192 * the reason for not failing over is still the same, but more than
4193 * CLUSTER_CANT_FAILOVER_RELOG_PERIOD seconds elapsed.
4194 * 3) Finally, the function only logs if the master has been failing for
4195 *    more than five seconds + NODE_TIMEOUT. This way nothing is logged when a
4196 * failover starts in a reasonable time.
4197 *
4198 * The function is called with the reason why the slave can't failover
4199 * which is one of the integer macros CLUSTER_CANT_FAILOVER_*.
4200 *
4201 * The function is guaranteed to be called only if 'myself' is a slave. */
4202void clusterLogCantFailover(int reason) {
4203 char *msg;
4204 static time_t lastlog_time = 0;
4205 mstime_t nolog_fail_time = server.cluster_node_timeout + 5000;
4206
4207 /* Don't log if we have the same reason for some time. */
4208 if (reason == server.cluster->cant_failover_reason &&
4209 time(NULL)-lastlog_time < CLUSTER_CANT_FAILOVER_RELOG_PERIOD)
4210 return;
4211
4212 server.cluster->cant_failover_reason = reason;
4213
4214    /* We also don't emit any log if the master failed only a short time ago;
4215     * the goal of this function is to log slaves in a stalled condition for
4216     * a long time. */
4217 if (myself->slaveof &&
4218 nodeFailed(myself->slaveof) &&
4219 (mstime() - myself->slaveof->fail_time) < nolog_fail_time) return;
4220
4221 switch(reason) {
4222 case CLUSTER_CANT_FAILOVER_DATA_AGE:
4223 msg = "Disconnected from master for longer than allowed. "
4224 "Please check the 'cluster-replica-validity-factor' configuration "
4225 "option.";
4226 break;
4227 case CLUSTER_CANT_FAILOVER_WAITING_DELAY:
4228 msg = "Waiting the delay before I can start a new failover.";
4229 break;
4230 case CLUSTER_CANT_FAILOVER_EXPIRED:
4231 msg = "Failover attempt expired.";
4232 break;
4233 case CLUSTER_CANT_FAILOVER_WAITING_VOTES:
4234 msg = "Waiting for votes, but majority still not reached.";
4235 break;
4236 default:
4237 msg = "Unknown reason code.";
4238 break;
4239 }
4240 lastlog_time = time(NULL);
4241 serverLog(LL_NOTICE,"Currently unable to failover: %s", msg);
4242
4243 int cur_vote = server.cluster->failover_auth_count;
4244 int cur_quorum = (server.cluster->size / 2) + 1;
4245 /* Emits a log when an election is in progress and waiting for votes or when the failover attempt expired. */
4246 if (reason == CLUSTER_CANT_FAILOVER_WAITING_VOTES || reason == CLUSTER_CANT_FAILOVER_EXPIRED) {
4247 serverLog(LL_NOTICE, "Needed quorum: %d. Number of votes received so far: %d", cur_quorum, cur_vote);
4248 }
4249}
4250
4251/* This function implements the final part of automatic and manual failovers,
4252 * where the slave grabs its master's hash slots, and propagates the new
4253 * configuration.
4254 *
4255 * Note that it's up to the caller to be sure that the node got a new
4256 * configuration epoch already. */
4257void clusterFailoverReplaceYourMaster(void) {
4258 int j;
4259 clusterNode *oldmaster = myself->slaveof;
4260
4261 if (clusterNodeIsMaster(myself) || oldmaster == NULL) return;
4262
4263 /* 1) Turn this node into a master. */
4264 clusterSetNodeAsMaster(myself);
4265 replicationUnsetMaster();
4266
4267 /* 2) Claim all the slots assigned to our master. */
4268 for (j = 0; j < CLUSTER_SLOTS; j++) {
4269 if (clusterNodeCoversSlot(oldmaster, j)) {
4270 clusterDelSlot(j);
4271 clusterAddSlot(myself,j);
4272 }
4273 }
4274
4275 /* 3) Update state and save config. */
4276 clusterUpdateState();
4277 clusterSaveConfigOrDie(1);
4278
4279 /* 4) Pong all the other nodes so that they can update the state
4280 * accordingly and detect that we switched to master role. */
4281 clusterDoBeforeSleep(CLUSTER_TODO_BROADCAST_PONG);
4282
4283 /* 5) If there was a manual failover in progress, clear the state. */
4284 resetManualFailover();
4285
4286 /* 6) Handle the ASM task from previous master. */
4287 asmFinalizeMasterTask();
4288}
4289
4290/* This function is called if we are a slave node and our master serving
4291 * a non-zero amount of hash slots is in FAIL state.
4292 *
4293 * The goal of this function is:
4294 * 1) To check if we are able to perform a failover: is our data recent enough?
4295 * 2) Try to get elected by masters.
4296 * 3) Perform the failover informing all the other nodes.
4297 */
4298void clusterHandleSlaveFailover(void) {
4299 mstime_t data_age;
4300 mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
4301 int needed_quorum = (server.cluster->size / 2) + 1;
4302 int manual_failover = server.cluster->mf_end != 0 &&
4303 server.cluster->mf_can_start;
4304 mstime_t auth_timeout, auth_retry_time;
4305
4306 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;
4307
4308 /* Compute the failover timeout (the max time we have to send votes
4309 * and wait for replies), and the failover retry time (the time to wait
4310 * before trying to get voted again).
4311 *
4312 * Timeout is MAX(NODE_TIMEOUT*2,2000) milliseconds.
4313 * Retry is two times the Timeout.
4314 */
4315 auth_timeout = server.cluster_node_timeout*2;
4316 if (auth_timeout < 2000) auth_timeout = 2000;
4317 auth_retry_time = auth_timeout*2;
4318
4319 /* Preconditions to run the function, that must be met both in case
4320 * of an automatic and of a manual failover:
4321 * 1) We are a slave.
4322 * 2) Our master is flagged as FAIL, or this is a manual failover.
4323 * 3) The no-failover configuration is not set, or this is a manual
4324 * failover (which overrides that setting).
4325 * 4) Our master is serving slots. */
4326 if (clusterNodeIsMaster(myself) ||
4327 myself->slaveof == NULL ||
4328 (!nodeFailed(myself->slaveof) && !manual_failover) ||
4329 (server.cluster_slave_no_failover && !manual_failover) ||
4330 myself->slaveof->numslots == 0)
4331 {
4332 /* There are no reasons to failover, so we set the reason why we
4333 * are returning without failing over to NONE. */
4334 server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
4335 return;
4336 }
4337
4338 /* Set data_age to the number of milliseconds we are disconnected from
4339 * the master. */
4340 if (server.repl_state == REPL_STATE_CONNECTED) {
4341 data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
4342 * 1000;
4343 } else {
4344 data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
4345 }
4346
4347 /* Remove the node timeout from the data age, since it is expected that
4348 * we are disconnected from our master at least for the time it took for
4349 * it to be flagged as FAIL: that's the baseline. */
4350 if (data_age > server.cluster_node_timeout)
4351 data_age -= server.cluster_node_timeout;
4352
4353 /* Check if our data is recent enough according to the slave validity
4354 * factor configured by the user.
4355 *
4356 * Check bypassed for manual failovers. */
4357 if (server.cluster_slave_validity_factor &&
4358 data_age >
4359 (((mstime_t)server.repl_ping_slave_period * 1000) +
4360 (server.cluster_node_timeout * server.cluster_slave_validity_factor)))
4361 {
4362 if (!manual_failover) {
4363 clusterLogCantFailover(CLUSTER_CANT_FAILOVER_DATA_AGE);
4364 return;
4365 }
4366 }
4367
4368 /* If the previous failover attempt timed out and the retry time has
4369 * elapsed, we can set up a new one. */
4370 if (auth_age > auth_retry_time) {
4371 server.cluster->failover_auth_time = mstime() +
4372 500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
4373 random() % 500; /* Random delay between 0 and 500 milliseconds. */
4374 server.cluster->failover_auth_count = 0;
4375 server.cluster->failover_auth_sent = 0;
4376 server.cluster->failover_auth_rank = clusterGetSlaveRank();
4377 /* We add another delay that is proportional to the slave rank,
4378 * specifically 1 second * rank. This way slaves that probably have a
4379 * less up-to-date replication offset are penalized. */
4380 server.cluster->failover_auth_time +=
4381 server.cluster->failover_auth_rank * 1000;
4382 /* However if this is a manual failover, no delay is needed. */
4383 if (server.cluster->mf_end) {
4384 server.cluster->failover_auth_time = mstime();
4385 server.cluster->failover_auth_rank = 0;
4386 clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
4387 }
4388 serverLog(LL_NOTICE,
4389 "Start of election delayed for %lld milliseconds "
4390 "(rank #%d, offset %lld).",
4391 server.cluster->failover_auth_time - mstime(),
4392 server.cluster->failover_auth_rank,
4393 replicationGetSlaveOffset());
4394 /* Now that we have a scheduled election, broadcast our offset
4395 * to all the other slaves so that they'll update their offsets
4396 * if our offset is better. */
4397 clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
4398 return;
4399 }
4400
4401 /* It is possible that we received more up-to-date offsets from other
4402 * slaves for the same master since we computed our election delay.
4403 * Update the delay if our rank changed.
4404 *
4405 * Not performed if this is a manual failover. */
4406 if (server.cluster->failover_auth_sent == 0 &&
4407 server.cluster->mf_end == 0)
4408 {
4409 int newrank = clusterGetSlaveRank();
4410 if (newrank > server.cluster->failover_auth_rank) {
4411 long long added_delay =
4412 (newrank - server.cluster->failover_auth_rank) * 1000;
4413 server.cluster->failover_auth_time += added_delay;
4414 server.cluster->failover_auth_rank = newrank;
4415 serverLog(LL_NOTICE,
4416 "Replica rank updated to #%d, added %lld milliseconds of delay.",
4417 newrank, added_delay);
4418 }
4419 }
4420
4421 /* Return ASAP if we still can't start the election. */
4422 if (mstime() < server.cluster->failover_auth_time) {
4423 clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_DELAY);
4424 return;
4425 }
4426
4427 /* Return ASAP if the election is too old to be valid. */
4428 if (auth_age > auth_timeout) {
4429 clusterLogCantFailover(CLUSTER_CANT_FAILOVER_EXPIRED);
4430 return;
4431 }
4432
4433 /* Ask for votes if needed. */
4434 if (server.cluster->failover_auth_sent == 0) {
4435 server.cluster->currentEpoch++;
4436 server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
4437 serverLog(LL_NOTICE,"Starting a failover election for epoch %llu.",
4438 (unsigned long long) server.cluster->currentEpoch);
4439 clusterRequestFailoverAuth();
4440 server.cluster->failover_auth_sent = 1;
4441 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
4442 CLUSTER_TODO_UPDATE_STATE|
4443 CLUSTER_TODO_FSYNC_CONFIG);
4444 return; /* Wait for replies. */
4445 }
4446
4447 /* Check if we reached the quorum. */
4448 if (server.cluster->failover_auth_count >= needed_quorum) {
4449 /* We have the quorum, we can finally failover the master. */
4450
4451 serverLog(LL_NOTICE,
4452 "Failover election won: I'm the new master.");
4453
4454 /* Update my configEpoch to the epoch of the election. */
4455 if (myself->configEpoch < server.cluster->failover_auth_epoch) {
4456 myself->configEpoch = server.cluster->failover_auth_epoch;
4457 serverLog(LL_NOTICE,
4458 "configEpoch set to %llu after successful failover",
4459 (unsigned long long) myself->configEpoch);
4460 }
4461
4462 /* Take responsibility for the cluster slots. */
4463 clusterFailoverReplaceYourMaster();
4464 } else {
4465 clusterLogCantFailover(CLUSTER_CANT_FAILOVER_WAITING_VOTES);
4466 }
4467}
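
/* Illustrative sketch (not part of the original file): how the election start
 * time scheduled above is composed, assuming mstime() and random() as used in
 * this file and a precomputed rank. The helper name is hypothetical. A rank-2
 * replica waits 2500..2999 ms before requesting votes; a manual failover
 * skips the delay entirely. */
static mstime_t exampleFailoverAuthTime(mstime_t now, int rank, int manual_failover) {
    if (manual_failover) return now;        /* No delay for manual failovers. */
    return now + 500                        /* Fixed delay to let FAIL propagate. */
               + random() % 500             /* Random delay to desynchronize replicas. */
               + (mstime_t)rank * 1000;     /* 1 second per rank position. */
}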
4468
4469/* -----------------------------------------------------------------------------
4470 * CLUSTER slave migration
4471 *
4472 * Slave migration is the process that allows a slave of a master that is
4473 * already covered by at least another slave, to "migrate" to a master that
4474 * is orphaned, that is, left with no working slaves.
4475 * ------------------------------------------------------------------------- */
4476
4477 /* This function is responsible for deciding whether this replica should be
4478 * migrated to a different (orphaned) master. It is called by the clusterCron() function
4479 * only if:
4480 *
4481 * 1) We are a slave node.
4482 * 2) It was detected that there is at least one orphaned master in
4483 * the cluster.
4484 * 3) We are a slave of one of the masters with the greatest number of
4485 * slaves.
4486 *
4487 * These checks are performed by the caller, since it needs to iterate over
4488 * the nodes anyway, so we only spend time in clusterHandleSlaveMigration()
4489 * when definitely needed.
4490 *
4491 * The function is called with a pre-computed max_slaves, that is the max
4492 * number of working (not in FAIL state) slaves for a single master.
4493 *
4494 * Additional conditions for migration are examined inside the function.
4495 */
4496void clusterHandleSlaveMigration(int max_slaves) {
4497 int j, okslaves = 0;
4498 clusterNode *mymaster = myself->slaveof, *target = NULL, *candidate = NULL;
4499 dictIterator di;
4500 dictEntry *de;
4501
4502 /* Step 1: Don't migrate if the cluster state is not ok. */
4503 if (server.cluster->state != CLUSTER_OK) return;
4504
4505 /* Step 2: Don't migrate if my master will not be left with at least
4506 * 'migration-barrier' slaves after my migration. */
4507 if (mymaster == NULL) return;
4508 for (j = 0; j < mymaster->numslaves; j++)
4509 if (!nodeFailed(mymaster->slaves[j]) &&
4510 !nodeTimedOut(mymaster->slaves[j])) okslaves++;
4511 if (okslaves <= server.cluster_migration_barrier) return;
4512
4513 /* Step 3: Identify a candidate for migration, and check if among the
4514 * masters with the greatest number of ok slaves, I'm the one with the
4515 * smallest node ID (the "candidate slave").
4516 *
4517 * Note: this means that eventually a replica migration will occur
4518 * since slaves that are reachable again always have their FAIL flag
4519 * cleared, so eventually there must be a candidate.
4520 * There is a possible race condition causing multiple
4521 * slaves to migrate at the same time, but this is unlikely to
4522 * happen and relatively harmless when it does. */
4523 candidate = myself;
4524 dictInitSafeIterator(&di, server.cluster->nodes);
4525 while((de = dictNext(&di)) != NULL) {
4526 clusterNode *node = dictGetVal(de);
4527 int okslaves = 0, is_orphaned = 1;
4528
4529 /* We want to migrate only if this master is working, orphaned, and
4530 * either used to have slaves or failed over a master that had slaves
4531 * (MIGRATE_TO flag). This way we only migrate to instances that were
4532 * supposed to have replicas. */
4533 if (nodeIsSlave(node) || nodeFailed(node)) is_orphaned = 0;
4534 if (!(node->flags & CLUSTER_NODE_MIGRATE_TO)) is_orphaned = 0;
4535
4536 /* Check number of working slaves. */
4537 if (clusterNodeIsMaster(node)) okslaves = clusterCountNonFailingSlaves(node);
4538 if (okslaves > 0) is_orphaned = 0;
4539
4540 if (is_orphaned) {
4541 if (!target && node->numslots > 0) target = node;
4542
4543 /* Track the starting time of the orphaned condition for this
4544 * master. */
4545 if (!node->orphaned_time) node->orphaned_time = mstime();
4546 } else {
4547 node->orphaned_time = 0;
4548 }
4549
4550 /* Check if I'm the slave candidate for the migration: attached
4551 * to a master with the maximum number of slaves and with the smallest
4552 * node ID. */
4553 if (okslaves == max_slaves) {
4554 for (j = 0; j < node->numslaves; j++) {
4555 if (memcmp(node->slaves[j]->name,
4556 candidate->name,
4557 CLUSTER_NAMELEN) < 0)
4558 {
4559 candidate = node->slaves[j];
4560 }
4561 }
4562 }
4563 }
4564 dictResetIterator(&di);
4565
4566 /* Step 4: perform the migration if there is a target, and if I'm the
4567 * candidate, but only if the master is continuously orphaned for a
4568 * couple of seconds, so that during failovers, we give some time to
4569 * the natural slaves of this instance to advertise their switch from
4570 * the old master to the new one. */
4571 if (target && candidate == myself &&
4572 (mstime()-target->orphaned_time) > CLUSTER_SLAVE_MIGRATION_DELAY &&
4573 !(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
4574 {
4575 serverLog(LL_NOTICE,"Migrating to orphaned master %.40s",
4576 target->name);
4577 clusterSetMaster(target);
4578 /* Save the new config and broadcast it to the other nodes. */
4579 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
4580 CLUSTER_TODO_FSYNC_CONFIG|
4581 CLUSTER_TODO_BROADCAST_PONG);
4582 }
4583}
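
/* Illustrative sketch (not part of the original file): the "candidate slave"
 * selected above is simply the replica with the smallest node ID, comparing
 * the fixed-length names byte-wise with memcmp() as done in the loop. The
 * helper name is hypothetical. */
static int exampleIsBetterCandidate(const char *name_a, const char *name_b) {
    /* Returns non-zero if name_a sorts before name_b over CLUSTER_NAMELEN bytes. */
    return memcmp(name_a, name_b, CLUSTER_NAMELEN) < 0;
}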
4584
4585/* -----------------------------------------------------------------------------
4586 * CLUSTER manual failover
4587 *
4588 * These are the important steps performed by slaves during a manual failover:
4589 * 1) The user sends the CLUSTER FAILOVER command. The failover state is
4590 * initialized by setting mf_end to the millisecond unix time at which
4591 * we'll abort the attempt.
4592 * 2) Slave sends a MFSTART message to the master requesting to pause clients
4593 * for two times the manual failover timeout CLUSTER_MF_TIMEOUT.
4594 * When master is paused for manual failover, it also starts to flag
4595 * packets with CLUSTERMSG_FLAG0_PAUSED.
4596 * 3) Slave waits for master to send its replication offset flagged as PAUSED.
4597 * 4) If the slave received the offset from the master, and its own offset
4598 * matches, mf_can_start is set to 1, and clusterHandleSlaveFailover() will
4599 * perform the failover as usual, with the difference that the vote request
4600 * will be modified to force masters to vote for a slave that has a
4601 * working master.
4602 *
4603 * From the point of view of the master things are simpler: when a
4604 * PAUSE_CLIENTS packet is received the master sets mf_end as well and
4605 * the sender in mf_slave. During the time limit for the manual failover
4606 * the master will just send PINGs more often to this slave, flagged with
4607 * the PAUSED flag, so that the slave will set mf_master_offset when receiving
4608 * a packet from the master with this flag set.
4609 *
4610 * The goal of the manual failover is to perform a fast failover without
4611 * data loss due to the asynchronous master-slave replication.
4612 * -------------------------------------------------------------------------- */
4613
4614/* Reset the manual failover state. This works for both masters and slaves
4615 * as all the state about manual failover is cleared.
4616 *
4617 * The function can be used both to initialize the manual failover state at
4618 * startup or to abort a manual failover in progress. */
4619void resetManualFailover(void) {
4620 if (server.cluster->mf_slave) {
4621 /* We were a master failing over, so we paused clients and related actions.
4622 * Regardless of the outcome we unpause now to allow traffic again. */
4623 unpauseActions(PAUSE_DURING_FAILOVER);
4624 }
4625 server.cluster->mf_end = 0; /* No manual failover in progress. */
4626 server.cluster->mf_can_start = 0;
4627 server.cluster->mf_slave = NULL;
4628 server.cluster->mf_master_offset = -1;
4629}
4630
4631/* If a manual failover timed out, abort it. */
4632void manualFailoverCheckTimeout(void) {
4633 if (server.cluster->mf_end && server.cluster->mf_end < mstime()) {
4634 serverLog(LL_WARNING,"Manual failover timed out.");
4635 resetManualFailover();
4636 }
4637}
4638
4639/* This function is called from the cluster cron function in order to go
4640 * forward with a manual failover state machine. */
4641void clusterHandleManualFailover(void) {
4642 /* Return ASAP if no manual failover is in progress. */
4643 if (server.cluster->mf_end == 0) return;
4644
4645 /* If mf_can_start is non-zero, the failover was already triggered so the
4646 * next steps are performed by clusterHandleSlaveFailover(). */
4647 if (server.cluster->mf_can_start) return;
4648
4649 if (server.cluster->mf_master_offset == -1) return; /* Wait for offset... */
4650
4651 if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) {
4652 /* Our replication offset matches the master replication offset
4653 * announced after clients were paused. We can start the failover. */
4654 server.cluster->mf_can_start = 1;
4655 serverLog(LL_NOTICE,
4656 "All master replication stream processed, "
4657 "manual failover can start.");
4658 clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
4659 return;
4660 }
4661 clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_MANUALFAILOVER);
4662}
4663
4664/* -----------------------------------------------------------------------------
4665 * CLUSTER cron job
4666 * -------------------------------------------------------------------------- */
4667
4668/* Check if the node is disconnected and re-establish the connection.
4669 * Also update a few stats while we are here, that can be used to make
4670 * better decisions in other parts of the code. */
4671static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_timeout, mstime_t now) {
4672 /* Not interested in reconnecting the link with myself or nodes
4673 * for which we have no address. */
4674 if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) return 1;
4675
4676 if (node->flags & CLUSTER_NODE_PFAIL)
4677 server.cluster->stats_pfail_nodes++;
4678
4679 /* A Node in HANDSHAKE state has a limited lifespan equal to the
4680 * configured node timeout. */
4681 if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
4682 clusterDelNode(node);
4683 return 1;
4684 }
4685
4686 if (node->link == NULL) {
4687 clusterLink *link = createClusterLink(node);
4688 link->conn = connCreate(server.el, connTypeOfCluster());
4689 connSetPrivateData(link->conn, link);
4690 if (connConnect(link->conn, node->ip, node->cport, server.bind_source_addr,
4691 clusterLinkConnectHandler) == C_ERR) {
4692 /* We got a synchronous error from connect before
4693 * clusterSendPing() had a chance to be called.
4694 * If node->ping_sent is zero, failure detection can't work,
4695 * so we claim we actually sent a ping now (that will
4696 * be really sent as soon as the link is obtained). */
4697 if (node->ping_sent == 0) node->ping_sent = mstime();
4698 serverLog(LL_DEBUG, "Unable to connect to "
4699 "Cluster Node [%s]:%d -> %s", node->ip,
4700 node->cport, server.neterr);
4701
4702 freeClusterLink(link);
4703 return 0;
4704 }
4705 }
4706 return 0;
4707}
4708
4709static void freeClusterLinkOnBufferLimitReached(clusterLink *link) {
4710 if (link == NULL || server.cluster_link_msg_queue_limit_bytes == 0) {
4711 return;
4712 }
4713
4714 unsigned long long mem_link = link->send_msg_queue_mem;
4715 if (mem_link > server.cluster_link_msg_queue_limit_bytes) {
4716 serverLog(LL_WARNING, "Freeing cluster link(%s node %.40s, used memory: %llu) due to "
4717 "exceeding send buffer memory limit.", link->inbound ? "from" : "to",
4718 link->node ? link->node->name : "", mem_link);
4719 freeClusterLink(link);
4720 server.cluster->stat_cluster_links_buffer_limit_exceeded++;
4721 }
4722}
4723
4724/* Free outbound link to a node if its send buffer size exceeded limit. */
4725static void clusterNodeCronFreeLinkOnBufferLimitReached(clusterNode *node) {
4726 freeClusterLinkOnBufferLimitReached(node->link);
4727 freeClusterLinkOnBufferLimitReached(node->inbound_link);
4728}
4729
4730/* This is executed 10 times every second */
4731void clusterCron(void) {
4732 dictIterator di;
4733 dictEntry *de;
4734 int update_state = 0;
4735 int orphaned_masters; /* How many masters there are without ok slaves. */
4736 int max_slaves; /* Max number of ok slaves for a single master. */
4737 int this_slaves; /* Number of ok slaves for our master (if we are slave). */
4738 mstime_t min_pong = 0, now = mstime();
4739 clusterNode *min_pong_node = NULL;
4740 static unsigned long long iteration = 0;
4741 mstime_t handshake_timeout;
4742
4743 iteration++; /* Number of times this function was called so far. */
4744
4745 clusterUpdateMyselfHostname();
4746
4747 /* The handshake timeout is the time after which a handshake node that was
4748 * not turned into a normal node is removed from the nodes. Usually it is
4749 * just the NODE_TIMEOUT value, but when NODE_TIMEOUT is too small we use
4750 * the value of 1 second. */
4751 handshake_timeout = server.cluster_node_timeout;
4752 if (handshake_timeout < 1000) handshake_timeout = 1000;
4753
4754 /* Clear so clusterNodeCronHandleReconnect can count the number of nodes in PFAIL. */
4755 server.cluster->stats_pfail_nodes = 0;
4756 /* Run through some of the operations we want to do on each cluster node. */
4757 dictInitSafeIterator(&di, server.cluster->nodes);
4758 while((de = dictNext(&di)) != NULL) {
4759 clusterNode *node = dictGetVal(de);
4760 /* We free the inbound or outbound link to the node if the link has an
4761 * oversized message send queue and immediately try reconnecting. */
4762 clusterNodeCronFreeLinkOnBufferLimitReached(node);
4763 /* The protocol is that function(s) below return non-zero if the node was
4764 * terminated.
4765 */
4766 if(clusterNodeCronHandleReconnect(node, handshake_timeout, now)) continue;
4767 }
4768 dictResetIterator(&di);
4769
4770 /* Ping some random node 1 time every 10 iterations, so that we usually ping
4771 * one random node every second. */
4772 if (!(iteration % 10)) {
4773 int j;
4774
4775 /* Check a few random nodes and ping the one with the oldest
4776 * pong_received time. */
4777 for (j = 0; j < 5; j++) {
4778 de = dictGetRandomKey(server.cluster->nodes);
4779 clusterNode *this = dictGetVal(de);
4780
4781 /* Don't ping nodes disconnected or with a ping currently active. */
4782 if (this->link == NULL || this->ping_sent != 0) continue;
4783 if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
4784 continue;
4785 if (min_pong_node == NULL || min_pong > this->pong_received) {
4786 min_pong_node = this;
4787 min_pong = this->pong_received;
4788 }
4789 }
4790 if (min_pong_node) {
4791 serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
4792 clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
4793 }
4794 }
4795
4796 /* Iterate nodes to check if we need to flag something as failing.
4797 * This loop is also responsible to:
4798 * 1) Check if there are orphaned masters (masters without non failing
4799 * slaves).
4800 * 2) Count the max number of non failing slaves for a single master.
4801 * 3) Count the number of slaves for our master, if we are a slave. */
4802 orphaned_masters = 0;
4803 max_slaves = 0;
4804 this_slaves = 0;
4805 dictInitSafeIterator(&di, server.cluster->nodes);
4806 while((de = dictNext(&di)) != NULL) {
4807 clusterNode *node = dictGetVal(de);
4808 now = mstime(); /* Use an updated time at every iteration. */
4809
4810 if (node->flags &
4811 (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
4812 continue;
4813
4814 /* Orphaned master check, useful only if the current instance
4815 * is a slave that may migrate to another master. */
4816 if (nodeIsSlave(myself) && clusterNodeIsMaster(node) && !nodeFailed(node)) {
4817 int okslaves = clusterCountNonFailingSlaves(node);
4818
4819 /* A master is orphaned if it is serving a non-zero number of
4820 * slots, has no working slaves, but used to have at least one
4821 * slave, or failed over a master that used to have slaves. */
4822 if (okslaves == 0 && node->numslots > 0 &&
4823 node->flags & CLUSTER_NODE_MIGRATE_TO)
4824 {
4825 orphaned_masters++;
4826 }
4827 if (okslaves > max_slaves) max_slaves = okslaves;
4828 if (myself->slaveof == node)
4829 this_slaves = okslaves;
4830 }
4831
4832 /* If we are not receiving any data for more than half the cluster
4833 * timeout, reconnect the link: maybe there is a connection
4834 * issue even if the node is alive. */
4835 mstime_t ping_delay = now - node->ping_sent;
4836 mstime_t data_delay = now - node->data_received;
4837 if (node->link && /* is connected */
4838 now - node->link->ctime >
4839 server.cluster_node_timeout && /* was not already reconnected */
4840 node->ping_sent && /* we already sent a ping */
4841 /* and we are waiting for the pong more than timeout/2 */
4842 ping_delay > server.cluster_node_timeout/2 &&
4843 /* and in such interval we are not seeing any traffic at all. */
4844 data_delay > server.cluster_node_timeout/2)
4845 {
4846 /* Disconnect the link, it will be reconnected automatically. */
4847 freeClusterLink(node->link);
4848 }
4849
4850 /* If we have currently no active ping in this instance, and the
4851 * received PONG is older than half the cluster timeout, send
4852 * a new ping now, to ensure all the nodes are pinged without
4853 * a too big delay. */
4854 mstime_t ping_interval = server.cluster_ping_interval ?
4855 server.cluster_ping_interval : server.cluster_node_timeout/2;
4856 if (node->link &&
4857 node->ping_sent == 0 &&
4858 (now - node->pong_received) > ping_interval)
4859 {
4860 clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
4861 continue;
4862 }
4863
4864 /* If we are a master and one of the slaves requested a manual
4865 * failover, ping it continuously. */
4866 if (server.cluster->mf_end &&
4867 clusterNodeIsMaster(myself) &&
4868 server.cluster->mf_slave == node &&
4869 node->link)
4870 {
4871 clusterSendPing(node->link, CLUSTERMSG_TYPE_PING);
4872 continue;
4873 }
4874
4875 /* Check only if we have an active ping for this instance. */
4876 if (node->ping_sent == 0) continue;
4877
4878 /* Check if this node looks unreachable.
4879 * Note that if we already received the PONG, then node->ping_sent
4880 * is zero, so we can't reach this code at all; there is no risk of
4881 * checking for a PONG delay for a PING we never sent.
4882 *
4883 * We also consider any incoming data as proof of liveness, since
4884 * our cluster bus link is also used for data: under heavy data
4885 * load pong delays are possible. */
4886 mstime_t node_delay = (ping_delay < data_delay) ? ping_delay :
4887 data_delay;
4888
4889 if (node_delay > server.cluster_node_timeout) {
4890 /* Timeout reached. Set the node as possibly failing if it is
4891 * not already in this state. */
4892 if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {
4893 node->flags |= CLUSTER_NODE_PFAIL;
4894 update_state = 1;
4895 if (clusterNodeIsMaster(myself) && server.cluster->size == 1) {
4896 markNodeAsFailingIfNeeded(node);
4897 } else {
4898 serverLog(LL_DEBUG,"*** NODE %.40s possibly failing", node->name);
4899 }
4900 }
4901 }
4902 }
4903 dictResetIterator(&di);
4904
4905 /* If we are a slave node but the replication is still turned off,
4906 * enable it if we know the address of our master and it appears to
4907 * be up. */
4908 if (nodeIsSlave(myself) &&
4909 server.masterhost == NULL &&
4910 myself->slaveof &&
4911 nodeHasAddr(myself->slaveof))
4912 {
4913 replicationSetMaster(myself->slaveof->ip, getNodeDefaultReplicationPort(myself->slaveof));
4914 }
4915
4916 /* Abort a manual failover if the timeout is reached. */
4917 manualFailoverCheckTimeout();
4918
4919 if (nodeIsSlave(myself)) {
4920 clusterHandleManualFailover();
4921 if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
4922 clusterHandleSlaveFailover();
4923 /* If there are orphaned masters, and we are a slave among the masters
4924 * with the max number of non-failing slaves, consider migrating to
4925 * the orphaned masters. Note that it does not make sense to try
4926 * a migration if there is no master with at least *two* working
4927 * slaves. */
4928 if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves &&
4929 server.cluster_allow_replica_migration)
4930 clusterHandleSlaveMigration(max_slaves);
4931 }
4932
4933 if (update_state || server.cluster->state == CLUSTER_FAIL)
4934 clusterUpdateState();
4935}
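
/* Illustrative sketch (not part of the original file): clusterCron() runs 10
 * times per second, so the `!(iteration % 10)` check above fires roughly once
 * per second, while the per-node ping interval used later in the loop falls
 * back to half the node timeout when cluster-ping-interval is not set. The
 * helper name is hypothetical. */
static mstime_t examplePingInterval(mstime_t configured_interval, mstime_t node_timeout) {
    return configured_interval ? configured_interval : node_timeout / 2;
}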
4936
4937/* This function is called before the event handler returns to sleep for
4938 * events. It is useful to perform operations that must be done ASAP in
4939 * reaction to events fired but that are not safe to perform inside event
4940 * handlers, or to perform potentially expensive tasks that we need to do
4941 * a single time before replying to clients. */
4942void clusterBeforeSleep(void) {
4943 int flags = server.cluster->todo_before_sleep;
4944
4945 /* Reset our flags (not strictly needed since every single function
4946 * called for flags set should be able to clear its flag). */
4947 server.cluster->todo_before_sleep = 0;
4948
4949 if (flags & CLUSTER_TODO_HANDLE_MANUALFAILOVER) {
4950 /* Handle the manual failover as soon as possible, so we don't pay the
4951 * up-to-100ms delay we would have if it were handled only in clusterCron(). */
4952 if(nodeIsSlave(myself)) {
4953 clusterHandleManualFailover();
4954 if (!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER))
4955 clusterHandleSlaveFailover();
4956 }
4957 } else if (flags & CLUSTER_TODO_HANDLE_FAILOVER) {
4958 /* Handle failover, this is needed when it is likely that there is already
4959 * the quorum from masters in order to react fast. */
4960 clusterHandleSlaveFailover();
4961 }
4962
4963 /* Update the cluster state. */
4964 if (flags & CLUSTER_TODO_UPDATE_STATE)
4965 clusterUpdateState();
4966
4967 /* Save the config, possibly using fsync. */
4968 if (flags & CLUSTER_TODO_SAVE_CONFIG) {
4969 int fsync = flags & CLUSTER_TODO_FSYNC_CONFIG;
4970 clusterSaveConfigOrDie(fsync);
4971 }
4972
4973 /* Broadcast a PONG to all the nodes. */
4974 if (flags & CLUSTER_TODO_BROADCAST_PONG)
4975 clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
4976}
4977
4978void clusterDoBeforeSleep(int flags) {
4979 server.cluster->todo_before_sleep |= flags;
4980}
4981
4982/* -----------------------------------------------------------------------------
4983 * Slots management
4984 * -------------------------------------------------------------------------- */
4985
4986/* Test bit 'pos' in a generic bitmap. Return 1 if the bit is set,
4987 * otherwise 0. */
4988int bitmapTestBit(unsigned char *bitmap, int pos) {
4989 off_t byte = pos/8;
4990 int bit = pos&7;
4991 return (bitmap[byte] & (1<<bit)) != 0;
4992}
4993
4994/* Set the bit at position 'pos' in a bitmap. */
4995void bitmapSetBit(unsigned char *bitmap, int pos) {
4996 off_t byte = pos/8;
4997 int bit = pos&7;
4998 bitmap[byte] |= 1<<bit;
4999}
5000
5001/* Clear the bit at position 'pos' in a bitmap. */
5002void bitmapClearBit(unsigned char *bitmap, int pos) {
5003 off_t byte = pos/8;
5004 int bit = pos&7;
5005 bitmap[byte] &= ~(1<<bit);
5006}
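
/* Illustrative sketch (not part of the original file): how the bitmap helpers
 * above map a slot to a byte and a bit within the 16384-bit (2048-byte) slots
 * bitmap. Slot 11 lives in byte 1, bit 3. The helper name is hypothetical. */
static void exampleBitmapUsage(void) {
    unsigned char slots[CLUSTER_SLOTS/8] = {0};
    bitmapSetBit(slots, 11);
    serverAssert(bitmapTestBit(slots, 11) == 1);
    bitmapClearBit(slots, 11);
    serverAssert(bitmapTestBit(slots, 11) == 0);
}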
5007
5008/* Return non-zero if there is at least one master with slaves in the cluster.
5009 * Otherwise zero is returned. Used by clusterNodeSetSlotBit() to set the
5010 * MIGRATE_TO flag when a master gets its first slot. */
5011int clusterMastersHaveSlaves(void) {
5012 dictIterator di;
5013 dictEntry *de;
5014 int slaves = 0;
5015
5016 dictInitSafeIterator(&di, server.cluster->nodes);
5017 while((de = dictNext(&di)) != NULL) {
5018 clusterNode *node = dictGetVal(de);
5019
5020 if (nodeIsSlave(node)) continue;
5021 slaves += node->numslaves;
5022 }
5023 dictResetIterator(&di);
5024 return slaves != 0;
5025}
5026
5027/* Set the slot bit and return the old value. */
5028int clusterNodeSetSlotBit(clusterNode *n, int slot) {
5029 int old = bitmapTestBit(n->slots,slot);
5030 if (!old) {
5031 bitmapSetBit(n->slots,slot);
5032 n->numslots++;
5033 /* When a master gets its first slot, even if it has no slaves,
5034 * it gets flagged with MIGRATE_TO, that is, the master is a valid
5035 * target for replica migration, if and only if at least one of
5036 * the other masters has slaves right now.
5037 *
5038 * Normally masters are valid targets of replica migration if:
5039 * 1. They used to have slaves (but no longer have them).
5040 * 2. They are slaves failing over a master that used to have slaves.
5041 *
5042 * However new masters with slots assigned are considered valid
5043 * migration targets as long as the rest of the cluster is not entirely slave-less.
5044 *
5045 * See https://github.com/redis/redis/issues/3043 for more info. */
5046 if (n->numslots == 1 && clusterMastersHaveSlaves())
5047 n->flags |= CLUSTER_NODE_MIGRATE_TO;
5048 }
5049 return old;
5050}
5051
5052/* Clear the slot bit and return the old value. */
5053int clusterNodeClearSlotBit(clusterNode *n, int slot) {
5054 int old = bitmapTestBit(n->slots,slot);
5055 if (old) {
5056 bitmapClearBit(n->slots,slot);
5057 n->numslots--;
5058 }
5059 return old;
5060}
5061
5062/* Return the slot bit from the cluster node structure. */
5063int clusterNodeCoversSlot(clusterNode *n, int slot) {
5064 return bitmapTestBit(n->slots,slot);
5065}
5066
5067/* Add the specified slot to the list of slots that node 'n' will
5068 * serve. Return C_OK if the operation ended with success.
5069 * If the slot is already assigned to another instance this is considered
5070 * an error and C_ERR is returned. */
5071int clusterAddSlot(clusterNode *n, int slot) {
5072 if (server.cluster->slots[slot]) return C_ERR;
5073 clusterNodeSetSlotBit(n,slot);
5074 server.cluster->slots[slot] = n;
5075 /* Make owner_not_claiming_slot flag consistent with slot ownership information. */
5076 bitmapClearBit(server.cluster->owner_not_claiming_slot, slot);
5077 clusterSlotStatReset(slot);
5078 return C_OK;
5079}
5080
5081/* Delete the specified slot marking it as unassigned.
5082 * Returns C_OK if the slot was assigned, otherwise if the slot was
5083 * already unassigned C_ERR is returned. */
5084int clusterDelSlot(int slot) {
5085 clusterNode *n = server.cluster->slots[slot];
5086
5087 if (!n) return C_ERR;
5088
5089 /* Cleanup the channels in master/replica as part of slot deletion. */
5090 removeChannelsInSlot(slot);
5091 /* Clear the slot bit. */
5092 serverAssert(clusterNodeClearSlotBit(n,slot) == 1);
5093 server.cluster->slots[slot] = NULL;
5094 /* Make owner_not_claiming_slot flag consistent with slot ownership information. */
5095 bitmapClearBit(server.cluster->owner_not_claiming_slot, slot);
5096 clusterSlotStatReset(slot);
5097 return C_OK;
5098}
5099
5100/* Transfer slots from `from_node` to `to_node`.
5101 * Iterates over all cluster slots, transferring each slot covered by `from_node` to `to_node`.
5102 * Counts and returns the number of slots transferred. */
5103int clusterMoveNodeSlots(clusterNode *from_node, clusterNode *to_node) {
5104 int processed = 0;
5105
5106 for (int j = 0; j < CLUSTER_SLOTS; j++) {
5107 if (clusterNodeCoversSlot(from_node, j)) {
5108 clusterDelSlot(j);
5109 clusterAddSlot(to_node, j);
5110 processed++;
5111 }
5112 }
5113 return processed;
5114}
5115
5116/* Delete all the slots associated with the specified node.
5117 * The number of deleted slots is returned. */
5118int clusterDelNodeSlots(clusterNode *node) {
5119 int deleted = 0, j;
5120
5121 for (j = 0; j < CLUSTER_SLOTS; j++) {
5122 if (clusterNodeCoversSlot(node, j)) {
5123 clusterDelSlot(j);
5124 deleted++;
5125 }
5126 }
5127 return deleted;
5128}
5129
5130/* Clear the migrating / importing state for all the slots.
5131 * This is useful at initialization and when turning a master into slave. */
5132void clusterCloseAllSlots(void) {
5133 memset(server.cluster->migrating_slots_to,0,
5134 sizeof(server.cluster->migrating_slots_to));
5135 memset(server.cluster->importing_slots_from,0,
5136 sizeof(server.cluster->importing_slots_from));
5137}
5138
5139/* -----------------------------------------------------------------------------
5140 * Cluster state evaluation function
5141 * -------------------------------------------------------------------------- */
5142
5143/* The following are defines that are only used in the evaluation function
5144 * and are based on heuristics. Actually the main point about the rejoin and
5145 * writable delay is that they should be a few orders of magnitude larger
5146 * than the network latency. */
5147#define CLUSTER_MAX_REJOIN_DELAY 5000
5148#define CLUSTER_MIN_REJOIN_DELAY 500
5149#define CLUSTER_WRITABLE_DELAY 2000
5150
5151void clusterUpdateState(void) {
5152 int j, new_state;
5153 int reachable_masters = 0;
5154 static mstime_t among_minority_time;
5155 static mstime_t first_call_time = 0;
5156
5157 server.cluster->todo_before_sleep &= ~CLUSTER_TODO_UPDATE_STATE;
5158
5159 /* If this is a master node, wait some time before turning the state
5160 * into OK, since it is not a good idea to rejoin the cluster as a writable
5161 * master, after a reboot, without giving the cluster a chance to
5162 * reconfigure this node. Note that the delay is calculated starting from
5163 * the first call to this function and not since the server start, in order
5164 * to not count the DB loading time. */
5165 if (first_call_time == 0) first_call_time = mstime();
5166 if (clusterNodeIsMaster(myself) &&
5167 server.cluster->state == CLUSTER_FAIL &&
5168 mstime() - first_call_time < CLUSTER_WRITABLE_DELAY) return;
5169
5170 /* Start assuming the state is OK. We'll turn it into FAIL if there
5171 * are the right conditions. */
5172 new_state = CLUSTER_OK;
5173
5174 /* Check if all the slots are covered. */
5175 if (server.cluster_require_full_coverage) {
5176 for (j = 0; j < CLUSTER_SLOTS; j++) {
5177 if (server.cluster->slots[j] == NULL ||
5178 server.cluster->slots[j]->flags & (CLUSTER_NODE_FAIL))
5179 {
5180 new_state = CLUSTER_FAIL;
5181 break;
5182 }
5183 }
5184 }
5185
5186 /* Compute the cluster size, that is the number of master nodes
5187 * serving at least a single slot.
5188 *
5189 * At the same time count the number of reachable masters having
5190 * at least one slot. */
5191 {
5192 dictIterator di;
5193 dictEntry *de;
5194
5195 server.cluster->size = 0;
5196 dictInitSafeIterator(&di, server.cluster->nodes);
5197 while((de = dictNext(&di)) != NULL) {
5198 clusterNode *node = dictGetVal(de);
5199
5200 if (clusterNodeIsMaster(node) && node->numslots) {
5201 server.cluster->size++;
5202 if ((node->flags & (CLUSTER_NODE_FAIL|CLUSTER_NODE_PFAIL)) == 0)
5203 reachable_masters++;
5204 }
5205 }
5206 dictResetIterator(&di);
5207 }
5208
5209 /* If we are in a minority partition, change the cluster state
5210 * to FAIL. */
5211 {
5212 int needed_quorum = (server.cluster->size / 2) + 1;
5213
5214 if (reachable_masters < needed_quorum) {
5215 new_state = CLUSTER_FAIL;
5216 among_minority_time = mstime();
5217 }
5218 }
5219
5220 /* Log a state change */
5221 if (new_state != server.cluster->state) {
5222 mstime_t rejoin_delay = server.cluster_node_timeout;
5223
5224 /* If the instance is a master and was partitioned away with the
5225 * minority, don't let it accept queries for some time after the
5226 * partition heals, to make sure there is enough time to receive
5227 * a configuration update. */
5228 if (rejoin_delay > CLUSTER_MAX_REJOIN_DELAY)
5229 rejoin_delay = CLUSTER_MAX_REJOIN_DELAY;
5230 if (rejoin_delay < CLUSTER_MIN_REJOIN_DELAY)
5231 rejoin_delay = CLUSTER_MIN_REJOIN_DELAY;
5232
5233 if (new_state == CLUSTER_OK &&
5234 clusterNodeIsMaster(myself) &&
5235 mstime() - among_minority_time < rejoin_delay)
5236 {
5237 return;
5238 }
5239
5240 /* Change the state and log the event. */
5241 serverLog(new_state == CLUSTER_OK ? LL_NOTICE : LL_WARNING,
5242 "Cluster state changed: %s",
5243 new_state == CLUSTER_OK ? "ok" : "fail");
5244 server.cluster->state = new_state;
5245 }
5246}
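
/* Illustrative sketch (not part of the original file): the rejoin delay used
 * above is just the node timeout clamped to the [CLUSTER_MIN_REJOIN_DELAY,
 * CLUSTER_MAX_REJOIN_DELAY] range, e.g. a 15000 ms node timeout gives a
 * 5000 ms delay and a 100 ms node timeout gives a 500 ms delay. The helper
 * name is hypothetical. */
static mstime_t exampleRejoinDelay(mstime_t node_timeout) {
    mstime_t delay = node_timeout;
    if (delay > CLUSTER_MAX_REJOIN_DELAY) delay = CLUSTER_MAX_REJOIN_DELAY;
    if (delay < CLUSTER_MIN_REJOIN_DELAY) delay = CLUSTER_MIN_REJOIN_DELAY;
    return delay;
}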
5247
5248/* Remove all the shard channel related information not owned by the current shard. */
5249static inline void removeAllNotOwnedShardChannelSubscriptions(void) {
5250 if (!kvstoreSize(server.pubsubshard_channels)) return;
5251 clusterNode *currmaster = clusterNodeIsMaster(myself) ? myself : myself->slaveof;
5252 for (int j = 0; j < CLUSTER_SLOTS; j++) {
5253 if (server.cluster->slots[j] != currmaster) {
5254 removeChannelsInSlot(j);
5255 }
5256 }
5257}
5258
5259/* This function is called after the node startup in order to check if there
5260 * are any slots that we have keys for, but are assigned to no one. If so,
5261 * we take ownership of them. */
5262void clusterClaimUnassignedSlots(void) {
5263 if (nodeIsSlave(myself)) return;
5264
5265 int update_config = 0;
5266 for (int i = 0; i < CLUSTER_SLOTS; i++) {
5267 /* Skip if: no keys, already has an owner, or we are importing it. */
5268 if (!countKeysInSlot(i) ||
5269 server.cluster->slots[i] != NULL ||
5270 server.cluster->importing_slots_from[i] != NULL)
5271 {
5272 continue;
5273 }
5274
5275 /* If we are here, data and cluster config don't agree: we have keys in
5276 * slot 'i' even though we are not importing it and nobody else is
5277 * assigned to it. Fix this condition by taking ownership. */
5278 update_config++;
5279 serverLog(LL_NOTICE, "I have keys for unassigned slot %d. "
5280 "Taking responsibility for it.", i);
5281 clusterAddSlot(myself, i);
5282 }
5283 if (update_config) clusterSaveConfigOrDie(1);
5284}
5285
5286/* -----------------------------------------------------------------------------
5287 * SLAVE nodes handling
5288 * -------------------------------------------------------------------------- */
5289
5290/* Set the specified node 'n' as master for this node.
5291 * If this node is currently a master, it is turned into a slave. */
5292void clusterSetMaster(clusterNode *n) {
5293 serverAssert(n != myself);
5294 serverAssert(myself->numslots == 0);
5295
5296 int was_master = clusterNodeIsMaster(myself);
5297 if (was_master) {
5298 myself->flags &= ~(CLUSTER_NODE_MASTER|CLUSTER_NODE_MIGRATE_TO);
5299 myself->flags |= CLUSTER_NODE_SLAVE;
5300 clusterCloseAllSlots();
5301 } else {
5302 if (myself->slaveof)
5303 clusterNodeRemoveSlave(myself->slaveof,myself);
5304 }
5305 myself->slaveof = n;
5306 updateShardId(myself, n->shard_id);
5307 clusterNodeAddSlave(n,myself);
5308 replicationSetMaster(n->ip, getNodeDefaultReplicationPort(n));
5309 removeAllNotOwnedShardChannelSubscriptions();
5310 resetManualFailover();
5311
5312 /* Cancel all ASM tasks when switching into slave */
5313 if (was_master) clusterAsmCancel(NULL, "switching to replica");
5314}
5315
5316/* -----------------------------------------------------------------------------
5317 * Nodes to string representation functions.
5318 * -------------------------------------------------------------------------- */
5319
5320struct redisNodeFlags {
5321 uint16_t flag;
5322 char *name;
5323};
5324
5325static struct redisNodeFlags redisNodeFlagsTable[] = {
5326 {CLUSTER_NODE_MYSELF, "myself,"},
5327 {CLUSTER_NODE_MASTER, "master,"},
5328 {CLUSTER_NODE_SLAVE, "slave,"},
5329 {CLUSTER_NODE_PFAIL, "fail?,"},
5330 {CLUSTER_NODE_FAIL, "fail,"},
5331 {CLUSTER_NODE_HANDSHAKE, "handshake,"},
5332 {CLUSTER_NODE_NOADDR, "noaddr,"},
5333 {CLUSTER_NODE_NOFAILOVER, "nofailover,"}
5334};
5335
5336/* Concatenate the comma separated list of node flags to the given SDS
5337 * string 'ci'. */
5338sds representClusterNodeFlags(sds ci, uint16_t flags) {
5339 size_t orig_len = sdslen(ci);
5340 int i, size = sizeof(redisNodeFlagsTable)/sizeof(struct redisNodeFlags);
5341 for (i = 0; i < size; i++) {
5342 struct redisNodeFlags *nodeflag = redisNodeFlagsTable + i;
5343 if (flags & nodeflag->flag) ci = sdscat(ci, nodeflag->name);
5344 }
5345 /* If no flag was added, add the "noflags" special flag. */
5346 if (sdslen(ci) == orig_len) ci = sdscat(ci,"noflags,");
5347 sdsIncrLen(ci,-1); /* Remove trailing comma. */
5348 return ci;
5349}
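
/* Illustrative sketch (not part of the original file): for a node flagged as
 * both MYSELF and MASTER, the table above produces "myself,master" (the
 * trailing comma is removed by the sdsIncrLen(ci,-1) call). The helper name
 * is hypothetical. */
static sds exampleMyselfMasterFlags(void) {
    return representClusterNodeFlags(sdsempty(),
                                     CLUSTER_NODE_MYSELF | CLUSTER_NODE_MASTER);
}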
5350
5351/* Concatenate the slot ownership information to the given SDS string 'ci'.
5352 * If the slot ownership is in a contiguous block, it's represented as a start-end pair;
5353 * otherwise each slot is added separately. */
5354sds representSlotInfo(sds ci, uint16_t *slot_info_pairs, int slot_info_pairs_count) {
5355 for (int i = 0; i< slot_info_pairs_count; i+=2) {
5356 unsigned long start = slot_info_pairs[i];
5357 unsigned long end = slot_info_pairs[i+1];
5358 if (start == end) {
5359 ci = sdscatfmt(ci, " %i", start);
5360 } else {
5361 ci = sdscatfmt(ci, " %i-%i", start, end);
5362 }
5363 }
5364 return ci;
5365}
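
/* Illustrative sketch (not part of the original file): a pair array of
 * {0, 5460, 7000, 7000} is rendered by representSlotInfo() above as
 * " 0-5460 7000". The helper name is hypothetical. */
static sds exampleSlotInfo(void) {
    uint16_t pairs[] = {0, 5460, 7000, 7000};
    return representSlotInfo(sdsempty(), pairs, 4);
}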
5366
5367/* Generate a csv-alike representation of the specified cluster node.
5368 * See clusterGenNodesDescription() top comment for more information.
5369 *
5370 * The function returns the string representation as an SDS string. */
5371sds clusterGenNodeDescription(client *c, clusterNode *node, int tls_primary) {
5372 int j, start;
5373 sds ci;
5374 int port = clusterNodeClientPort(node, tls_primary);
5375
5376 /* Node coordinates */
5377 ci = sdscatlen(sdsempty(),node->name,CLUSTER_NAMELEN);
5378 ci = sdscatfmt(ci," %s:%i@%i",
5379 node->ip,
5380 port,
5381 node->cport);
5382 if (sdslen(node->hostname) != 0) {
5383 ci = sdscatfmt(ci,",%s", node->hostname);
5384 }
5385 /* Don't expose aux fields to any clients yet but do allow them
5386 * to be persisted to nodes.conf */
5387 if (c == NULL) {
5388 if (sdslen(node->hostname) == 0) {
5389 ci = sdscatfmt(ci,",", 1);
5390 }
5391 for (int i = af_count-1; i >=0; i--) {
5392 if ((tls_primary && i == af_tls_port) || (!tls_primary && i == af_tcp_port)) {
5393 continue;
5394 }
5395 if (auxFieldHandlers[i].isPresent(node)) {
5396 ci = sdscatprintf(ci, ",%s=", auxFieldHandlers[i].field);
5397 ci = auxFieldHandlers[i].getter(node, ci);
5398 }
5399 }
5400 }
5401
5402 /* Flags */
5403 ci = sdscatlen(ci," ",1);
5404 ci = representClusterNodeFlags(ci, node->flags);
5405
5406 /* Slave of... or just "-" */
5407 ci = sdscatlen(ci," ",1);
5408 if (node->slaveof)
5409 ci = sdscatlen(ci,node->slaveof->name,CLUSTER_NAMELEN);
5410 else
5411 ci = sdscatlen(ci,"-",1);
5412
5413 unsigned long long nodeEpoch = node->configEpoch;
5414 if (nodeIsSlave(node) && node->slaveof) {
5415 nodeEpoch = node->slaveof->configEpoch;
5416 }
5417 /* Latency from the POV of this node, config epoch, link status */
5418 ci = sdscatfmt(ci," %I %I %U %s",
5419 (long long) node->ping_sent,
5420 (long long) node->pong_received,
5421 nodeEpoch,
5422 (node->link || node->flags & CLUSTER_NODE_MYSELF) ?
5423 "connected" : "disconnected");
5424
5425 /* Slots served by this instance. If we already have the slot info pairs,
5426 * append them directly; otherwise generate the slot ranges only if the node has slots. */
5427 if (node->slot_info_pairs) {
5428 ci = representSlotInfo(ci, node->slot_info_pairs, node->slot_info_pairs_count);
5429 } else if (node->numslots > 0) {
5430 start = -1;
5431 for (j = 0; j < CLUSTER_SLOTS; j++) {
5432 int bit;
5433
5434 if ((bit = clusterNodeCoversSlot(node, j)) != 0) {
5435 if (start == -1) start = j;
5436 }
5437 if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
5438 if (bit && j == CLUSTER_SLOTS-1) j++;
5439
5440 if (start == j-1) {
5441 ci = sdscatfmt(ci," %i",start);
5442 } else {
5443 ci = sdscatfmt(ci," %i-%i",start,j-1);
5444 }
5445 start = -1;
5446 }
5447 }
5448 }
5449
5450 /* Just for MYSELF node we also dump info about slots that
5451 * we are migrating to other instances or importing from other
5452 * instances. */
5453 if (node->flags & CLUSTER_NODE_MYSELF) {
5454 for (j = 0; j < CLUSTER_SLOTS; j++) {
5455 if (server.cluster->migrating_slots_to[j]) {
5456 ci = sdscatprintf(ci," [%d->-%.40s]",j,
5457 server.cluster->migrating_slots_to[j]->name);
5458 } else if (server.cluster->importing_slots_from[j]) {
5459 ci = sdscatprintf(ci," [%d-<-%.40s]",j,
5460 server.cluster->importing_slots_from[j]->name);
5461 }
5462 }
5463 }
5464 return ci;
5465}
5466
5467/* Generate the slot topology for all nodes and store the string representation
5468 * in the slots_info struct on the node. This is used to improve the efficiency
5469 * of clusterGenNodesDescription() because it removes looping of the slot space
5470 * for generating the slot info for each node individually. */
5471void clusterGenNodesSlotsInfo(int filter) {
5472 clusterNode *n = NULL;
5473 int start = -1;
5474
5475 for (int i = 0; i <= CLUSTER_SLOTS; i++) {
5476 /* Find start node and slot id. */
5477 if (n == NULL) {
5478 if (i == CLUSTER_SLOTS) break;
5479 n = server.cluster->slots[i];
5480 start = i;
5481 continue;
5482 }
5483
5484 /* Generate the slot info when we reach a node different from the one the
5485 * current range started with, or the end of the slot space. */
5486 if (i == CLUSTER_SLOTS || n != server.cluster->slots[i]) {
5487 if (!(n->flags & filter)) {
5488 if (!n->slot_info_pairs) {
5489 n->slot_info_pairs = zmalloc(2 * n->numslots * sizeof(uint16_t));
5490 }
5491 serverAssert((n->slot_info_pairs_count + 1) < (2 * n->numslots));
5492 n->slot_info_pairs[n->slot_info_pairs_count++] = start;
5493 n->slot_info_pairs[n->slot_info_pairs_count++] = i-1;
5494 }
5495 if (i == CLUSTER_SLOTS) break;
5496 n = server.cluster->slots[i];
5497 start = i;
5498 }
5499 }
5500}
5501
5502void clusterFreeNodesSlotsInfo(clusterNode *n) {
5503 zfree(n->slot_info_pairs);
5504 n->slot_info_pairs = NULL;
5505 n->slot_info_pairs_count = 0;
5506}
5507
5508/* Generate a csv-alike representation of the nodes we are aware of,
5509 * including the "myself" node, and return an SDS string containing the
5510 * representation (it is up to the caller to free it).
5511 *
5512 * All the nodes matching at least one of the node flags specified in
5513 * "filter" are excluded from the output, so using zero as a filter will
5514 * include all the known nodes in the representation, including nodes in
5515 * the HANDSHAKE state.
5516 *
5517 * Set tls_primary to 1 to put the TLS port in the main <ip>:<port>
5518 * field and the TCP port in the aux field, instead of the opposite way.
5519 *
5520 * The representation obtained using this function is used for the output
5521 * of the CLUSTER NODES function, and as format for the cluster
5522 * configuration file (nodes.conf) for a given node. */
5523sds clusterGenNodesDescription(client *c, int filter, int tls_primary) {
5524 sds ci = sdsempty(), ni;
5525 dictIterator di;
5526 dictEntry *de;
5527
5528 /* Generate all nodes slots info firstly. */
5529 clusterGenNodesSlotsInfo(filter);
5530
5531 dictInitSafeIterator(&di, server.cluster->nodes);
5532 while((de = dictNext(&di)) != NULL) {
5533 clusterNode *node = dictGetVal(de);
5534
5535 if (node->flags & filter) continue;
5536 ni = clusterGenNodeDescription(c, node, tls_primary);
5537 ci = sdscatsds(ci,ni);
5538 sdsfree(ni);
5539 ci = sdscatlen(ci,"\n",1);
5540
5541 /* Release slots info. */
5542 clusterFreeNodesSlotsInfo(node);
5543 }
5544 dictResetIterator(&di);
5545 return ci;
5546}
5547
5548/* Add to the output buffer of the given client the description of the given cluster link.
5549 * The description is a map with each entry being an attribute of the link. */
5550void addReplyClusterLinkDescription(client *c, clusterLink *link) {
5551 addReplyMapLen(c, 6);
5552
5553 addReplyBulkCString(c, "direction");
5554 addReplyBulkCString(c, link->inbound ? "from" : "to");
5555
5556 /* addReplyClusterLinkDescription is only called for links that have been
5557 * associated with nodes. The association is always bi-directional, so
5558 * in addReplyClusterLinkDescription, link->node should never be NULL. */
5559 serverAssert(link->node);
5560 sds node_name = sdsnewlen(link->node->name, CLUSTER_NAMELEN);
5561 addReplyBulkCString(c, "node");
5562 addReplyBulkCString(c, node_name);
5563 sdsfree(node_name);
5564
5565 addReplyBulkCString(c, "create-time");
5566 addReplyLongLong(c, link->ctime);
5567
5568 char events[3], *p;
5569 p = events;
5570 if (link->conn) {
5571 if (connHasReadHandler(link->conn)) *p++ = 'r';
5572 if (connHasWriteHandler(link->conn)) *p++ = 'w';
5573 }
5574 *p = '\0';
5575 addReplyBulkCString(c, "events");
5576 addReplyBulkCString(c, events);
5577
5578 addReplyBulkCString(c, "send-buffer-allocated");
5579 addReplyLongLong(c, link->send_msg_queue_mem);
5580
5581 addReplyBulkCString(c, "send-buffer-used");
5582 addReplyLongLong(c, link->send_msg_queue_mem);
5583}
5584
5585/* Add to the output buffer of the given client an array of cluster link descriptions,
5586 * with array entry being a description of a single current cluster link. */
5587void addReplyClusterLinksDescription(client *c) {
5588 dictIterator di;
5589 dictEntry *de;
5590 void *arraylen_ptr = NULL;
5591 int num_links = 0;
5592
5593 arraylen_ptr = addReplyDeferredLen(c);
5594
5595 dictInitSafeIterator(&di, server.cluster->nodes);
5596 while((de = dictNext(&di)) != NULL) {
5597 clusterNode *node = dictGetVal(de);
5598 if (node->link) {
5599 num_links++;
5600 addReplyClusterLinkDescription(c, node->link);
5601 }
5602 if (node->inbound_link) {
5603 num_links++;
5604 addReplyClusterLinkDescription(c, node->inbound_link);
5605 }
5606 }
5607 dictResetIterator(&di);
5608
5609 setDeferredArrayLen(c, arraylen_ptr, num_links);
5610}
5611
5612/* -----------------------------------------------------------------------------
5613 * CLUSTER command
5614 * -------------------------------------------------------------------------- */
5615
5616const char *clusterGetMessageTypeString(int type) {
5617 switch(type) {
5618 case CLUSTERMSG_TYPE_PING: return "ping";
5619 case CLUSTERMSG_TYPE_PONG: return "pong";
5620 case CLUSTERMSG_TYPE_MEET: return "meet";
5621 case CLUSTERMSG_TYPE_FAIL: return "fail";
5622 case CLUSTERMSG_TYPE_PUBLISH: return "publish";
5623 case CLUSTERMSG_TYPE_PUBLISHSHARD: return "publishshard";
5624 case CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST: return "auth-req";
5625 case CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK: return "auth-ack";
5626 case CLUSTERMSG_TYPE_UPDATE: return "update";
5627 case CLUSTERMSG_TYPE_MFSTART: return "mfstart";
5628 case CLUSTERMSG_TYPE_MODULE: return "module";
5629 }
5630 return "unknown";
5631}
5632
5633int checkSlotAssignmentsOrReply(client *c, unsigned char *slots, int del, int start_slot, int end_slot) {
5634 int slot;
5635 for (slot = start_slot; slot <= end_slot; slot++) {
5636 if (del && server.cluster->slots[slot] == NULL) {
5637 addReplyErrorFormat(c,"Slot %d is already unassigned", slot);
5638 return C_ERR;
5639 } else if (!del && server.cluster->slots[slot]) {
5640 addReplyErrorFormat(c,"Slot %d is already busy", slot);
5641 return C_ERR;
5642 }
5643 if (slots[slot]++ == 1) {
5644 addReplyErrorFormat(c,"Slot %d specified multiple times",(int)slot);
5645 return C_ERR;
5646 }
5647 }
5648 return C_OK;
5649}
5650
5651void clusterUpdateSlots(client *c, unsigned char *slots, int del) {
5652 int j;
5653 for (j = 0; j < CLUSTER_SLOTS; j++) {
5654 if (slots[j]) {
5655 int retval;
5656
5657 /* If this slot was set as importing we can clear this
5658 * state as now we are the real owner of the slot. */
5659 if (server.cluster->importing_slots_from[j])
5660 server.cluster->importing_slots_from[j] = NULL;
5661
5662 /* Cancel any ASM task that overlaps with the slot. */
5663 clusterAsmCancelBySlot(j, "slots configuration updated");
5664
5665 retval = del ? clusterDelSlot(j) :
5666 clusterAddSlot(myself,j);
5667 serverAssertWithInfo(c,NULL,retval == C_OK);
5668 }
5669 }
5670}
5671
5672int clusterGetShardCount(void) {
5673 return dictSize(server.cluster->shards);
5674}
5675
5676void *clusterGetShardIterator(void) {
5677 return dictGetSafeIterator(server.cluster->shards);
5678}
5679
5680void *clusterNextShardHandle(void *shard_iterator) {
5681 dictEntry *de = dictNext(shard_iterator);
5682 if(de == NULL) return NULL;
5683 return dictGetVal(de);
5684}
5685
5686void clusterFreeShardIterator(void *shard_iterator) {
5687 dictReleaseIterator(shard_iterator);
5688}
5689
5690int clusterNodeHasSlotInfo(clusterNode *n) {
5691 return n->slot_info_pairs != NULL;
5692}
5693
5694int clusterNodeSlotInfoCount(clusterNode *n) {
5695 return n->slot_info_pairs_count;
5696}
5697
5698uint16_t clusterNodeSlotInfoEntry(clusterNode *n, int idx) {
5699 return n->slot_info_pairs[idx];
5700}
5701
5702int clusterGetShardNodeCount(void *shard) {
5703 return listLength((list*)shard);
5704}
5705
5706void *clusterShardHandleGetNodeIterator(void *shard) {
5707 listIter *li = zmalloc(sizeof(listIter));
5708 listRewind((list*)shard, li);
5709 return li;
5710}
5711
5712void clusterShardNodeIteratorFree(void *node_iterator) {
5713 zfree(node_iterator);
5714}
5715
5716clusterNode *clusterShardNodeIteratorNext(void *node_iterator) {
5717 listNode *item = listNext((listIter*)node_iterator);
5718 if (item == NULL) return NULL;
5719 return listNodeValue(item);
5720}
5721
5722clusterNode *clusterShardNodeFirst(void *shard) {
5723 listNode *item = listFirst((list*)shard);
5724 if (item == NULL) return NULL;
5725 return listNodeValue(item);
5726}
5727
5728int clusterNodeTcpPort(clusterNode *node) {
5729 return node->tcp_port;
5730}
5731
5732int clusterNodeTlsPort(clusterNode *node) {
5733 return node->tls_port;
5734}
5735
5736sds genClusterInfoString(void) {
5737 sds info = sdsempty();
5738 char *statestr[] = {"ok","fail"};
5739 int slots_assigned = 0, slots_ok = 0, slots_pfail = 0, slots_fail = 0;
5740 uint64_t myepoch;
5741 int j;
5742
5743 for (j = 0; j < CLUSTER_SLOTS; j++) {
5744 clusterNode *n = server.cluster->slots[j];
5745
5746 if (n == NULL) continue;
5747 slots_assigned++;
5748 if (nodeFailed(n)) {
5749 slots_fail++;
5750 } else if (nodeTimedOut(n)) {
5751 slots_pfail++;
5752 } else {
5753 slots_ok++;
5754 }
5755 }
5756
5757 myepoch = (nodeIsSlave(myself) && myself->slaveof) ?
5758 myself->slaveof->configEpoch : myself->configEpoch;
5759
5760 info = sdscatprintf(info,
5761 "cluster_state:%s\r\n"
5762 "cluster_slots_assigned:%d\r\n"
5763 "cluster_slots_ok:%d\r\n"
5764 "cluster_slots_pfail:%d\r\n"
5765 "cluster_slots_fail:%d\r\n"
5766 "cluster_known_nodes:%lu\r\n"
5767 "cluster_size:%d\r\n"
5768 "cluster_current_epoch:%llu\r\n"
5769 "cluster_my_epoch:%llu\r\n"
5770 , statestr[server.cluster->state],
5771 slots_assigned,
5772 slots_ok,
5773 slots_pfail,
5774 slots_fail,
5775 dictSize(server.cluster->nodes),
5776 server.cluster->size,
5777 (unsigned long long) server.cluster->currentEpoch,
5778 (unsigned long long) myepoch
5779 );
5780
5781 /* Show stats about messages sent and received. */
5782 long long tot_msg_sent = 0;
5783 long long tot_msg_received = 0;
5784
5785 for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
5786 if (server.cluster->stats_bus_messages_sent[i] == 0) continue;
5787 tot_msg_sent += server.cluster->stats_bus_messages_sent[i];
5788 info = sdscatprintf(info,
5789 "cluster_stats_messages_%s_sent:%lld\r\n",
5790 clusterGetMessageTypeString(i),
5791 server.cluster->stats_bus_messages_sent[i]);
5792 }
5793 info = sdscatprintf(info,
5794 "cluster_stats_messages_sent:%lld\r\n", tot_msg_sent);
5795
5796 for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
5797 if (server.cluster->stats_bus_messages_received[i] == 0) continue;
5798 tot_msg_received += server.cluster->stats_bus_messages_received[i];
5799 info = sdscatprintf(info,
5800 "cluster_stats_messages_%s_received:%lld\r\n",
5801 clusterGetMessageTypeString(i),
5802 server.cluster->stats_bus_messages_received[i]);
5803 }
5804 info = sdscatprintf(info,
5805 "cluster_stats_messages_received:%lld\r\n", tot_msg_received);
5806
5807 info = sdscatprintf(info,
5808 "total_cluster_links_buffer_limit_exceeded:%llu\r\n",
5809 server.cluster->stat_cluster_links_buffer_limit_exceeded);
5810
5811 info = asmCatInfoString(info);
5812
5813 return info;
5814}
5815
5816
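/* Drop all shard pub/sub channels that hash to the given slot, unsubscribing
 * their clients. No-op when the slot has no channels. */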
5817void removeChannelsInSlot(unsigned int slot) {
5818 if (countChannelsInSlot(slot) == 0) return;
5819
5820 pubsubShardUnsubscribeAllChannelsInSlot(slot);
5821}
5822
5823/* Get the count of the channels for a given slot. */
5824unsigned int countChannelsInSlot(unsigned int hashslot) {
5825 return kvstoreDictSize(server.pubsubshard_channels, hashslot);
5826}
5827
5828int clusterNodeIsMyself(clusterNode *n) {
5829 return n == server.cluster->myself;
5830}
5831
5832clusterNode *getMyClusterNode(void) {
5833 return server.cluster->myself;
5834}
5835
5836int clusterManualFailoverTimeLimit(void) {
5837 return server.cluster->mf_end;
5838}
5839
5840int getClusterSize(void) {
5841 return dictSize(server.cluster->nodes);
5842}
5843
5844int getMyShardSlotCount(void) {
5845 if (!nodeIsSlave(server.cluster->myself)) {
5846 return server.cluster->myself->numslots;
5847 } else if (server.cluster->myself->slaveof) {
5848 return server.cluster->myself->slaveof->numslots;
5849 } else {
5850 return 0;
5851 }
5852}
5853
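/* Return a NULL-terminated, heap-allocated array of node IDs (each entry is
 * CLUSTER_NAMELEN bytes), skipping nodes still in NOADDR or HANDSHAKE state.
 * The number of returned IDs is stored in *numnodes. */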
5854char **getClusterNodesList(size_t *numnodes) {
5855 size_t count = dictSize(server.cluster->nodes);
5856 char **ids = zmalloc((count+1)*CLUSTER_NAMELEN);
5857 dictIterator di;
5858 dictEntry *de;
5859 int j = 0;
5860
5861 dictInitIterator(&di, server.cluster->nodes);
5862 while((de = dictNext(&di)) != NULL) {
5863 clusterNode *node = dictGetVal(de);
5864 if (node->flags & (CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE)) continue;
5865 ids[j] = zmalloc(CLUSTER_NAMELEN);
5866 memcpy(ids[j],node->name,CLUSTER_NAMELEN);
5867 j++;
5868 }
5869 *numnodes = j;
5870 ids[j] = NULL; /* NULL terminator so that FreeClusterNodesList does not
5871 * also need the count argument. */
5872 dictResetIterator(&di);
5873 return ids;
5874}
5875
5876int clusterNodeIsMaster(clusterNode *n) {
5877 return n->flags & CLUSTER_NODE_MASTER;
5878}
5879
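/* Handle DEBUG CLUSTERLINK KILL <to|from|all> <node-id>. Returns 0 when the
 * arguments do not match this subcommand (so the generic DEBUG handler can
 * take over) and 1 when a reply has been produced here. */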
5880int handleDebugClusterCommand(client *c) {
5881 if (c->argc != 5 ||
5882 strcasecmp(c->argv[1]->ptr, "CLUSTERLINK") ||
5883 strcasecmp(c->argv[2]->ptr, "KILL")) {
5884 return 0;
5885 }
5886
5887 if (!server.cluster_enabled) {
5888 addReplyError(c, "Debug option only available for cluster mode enabled setup!");
5889 return 1;
5890 }
5891
5892 /* Find the node. */
5893 clusterNode *n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr));
5894 if (!n) {
5895 addReplyErrorFormat(c, "Unknown node %s", (char *) c->argv[4]->ptr);
5896 return 1;
5897 }
5898
5899 /* Terminate the link based on the direction or all. */
5900 if (!strcasecmp(c->argv[3]->ptr, "from")) {
5901 if (n->inbound_link) freeClusterLink(n->inbound_link);
5902 } else if (!strcasecmp(c->argv[3]->ptr, "to")) {
5903 if (n->link) freeClusterLink(n->link);
5904 } else if (!strcasecmp(c->argv[3]->ptr, "all")) {
5905 if (n->link) freeClusterLink(n->link);
5906 if (n->inbound_link) freeClusterLink(n->inbound_link);
5907 } else {
5908 addReplyErrorFormat(c, "Unknown direction %s", (char *) c->argv[3]->ptr);
 return 1; /* Error already sent; don't fall through to the +OK reply. */
5909 }
5910 addReply(c, shared.ok);
5911
5912 return 1;
5913}
5914
5915int clusterNodePending(clusterNode *node) {
5916 return node->flags & (CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE);
5917}
5918
5919char *clusterNodeIp(clusterNode *node) {
5920 return node->ip;
5921}
5922
5923int clusterNodeIsSlave(clusterNode *node) {
5924 return node->flags & CLUSTER_NODE_SLAVE;
5925}
5926
5927clusterNode *clusterNodeGetSlaveof(clusterNode *node) {
5928 return node->slaveof;
5929}
5930
5931clusterNode *clusterNodeGetMaster(clusterNode *node) {
5932 while (node->slaveof != NULL) node = node->slaveof;
5933 return node;
5934}
5935
5936char *clusterNodeGetName(clusterNode *node) {
5937 return node->name;
5938}
5939
5940int clusterNodeTimedOut(clusterNode *node) {
5941 return nodeTimedOut(node);
5942}
5943
5944int clusterNodeIsFailing(clusterNode *node) {
5945 return nodeFailed(node);
5946}
5947
5948int clusterNodeIsNoFailover(clusterNode *node) {
5949 return node->flags & CLUSTER_NODE_NOFAILOVER;
5950}
5951
5952const char **clusterDebugCommandExtendedHelp(void) {
5953 static const char *help[] = {
5954 "CLUSTERLINK KILL <to|from|all> <node-id>",
5955 " Kills the link based on the direction to/from (both) with the provided node.",
5956 NULL
5957 };
5958
5959 return help;
5960}
5961
5962char *clusterNodeGetShardId(clusterNode *node) {
5963 return node->shard_id;
5964}
5965
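/* Handle the CLUSTER subcommands that are specific to this cluster-bus based
 * implementation (MEET, slot assignment, SETSLOT, FORGET, REPLICATE, FAILOVER,
 * RESET, LINKS, ...). Returns 1 if the subcommand was handled (a reply was
 * produced), 0 if it is unknown so the caller can report an error. */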
5966int clusterCommandSpecial(client *c) {
5967 if (!strcasecmp(c->argv[1]->ptr,"meet") && (c->argc == 4 || c->argc == 5)) {
5968 /* CLUSTER MEET <ip> <port> [cport] */
5969 long long port, cport;
5970
5971 if (getLongLongFromObject(c->argv[3], &port) != C_OK) {
5972 addReplyErrorFormat(c,"Invalid base port specified: %s",
5973 (char*)c->argv[3]->ptr);
5974 return 1;
5975 }
5976
5977 if (c->argc == 5) {
5978 if (getLongLongFromObject(c->argv[4], &cport) != C_OK) {
5979 addReplyErrorFormat(c,"Invalid bus port specified: %s",
5980 (char*)c->argv[4]->ptr);
5981 return 1;
5982 }
5983 } else {
5984 cport = port + CLUSTER_PORT_INCR;
5985 }
5986
5987 if (clusterStartHandshake(c->argv[2]->ptr,port,cport) == 0 &&
5988 errno == EINVAL)
5989 {
5990 addReplyErrorFormat(c,"Invalid node address specified: %s:%s",
5991 (char*)c->argv[2]->ptr, (char*)c->argv[3]->ptr);
5992 } else {
5993 addReply(c,shared.ok);
5994 }
5995 } else if (!strcasecmp(c->argv[1]->ptr,"flushslots") && c->argc == 2) {
5996 /* CLUSTER FLUSHSLOTS */
5997 if (kvstoreSize(server.db[0].keys) != 0) {
5998 addReplyError(c,"DB must be empty to perform CLUSTER FLUSHSLOTS.");
5999 return 1;
6000 }
6001 clusterDelNodeSlots(myself);
6002 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
6003 addReply(c,shared.ok);
6004 } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
6005 !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3) {
6006 /* CLUSTER ADDSLOTS <slot> [slot] ... */
6007 /* CLUSTER DELSLOTS <slot> [slot] ... */
6008 int j, slot;
6009 unsigned char *slots = zmalloc(CLUSTER_SLOTS);
6010 int del = !strcasecmp(c->argv[1]->ptr,"delslots");
6011
6012 memset(slots,0,CLUSTER_SLOTS);
6013 /* Check that all the arguments are parseable. */
6014 for (j = 2; j < c->argc; j++) {
6015 if ((slot = getSlotOrReply(c,c->argv[j])) == C_ERR) {
6016 zfree(slots);
6017 return 1;
6018 }
6019 }
6020 /* Check that the slots are not already busy (or, for DELSLOTS, not already unassigned). */
6021 for (j = 2; j < c->argc; j++) {
6022 slot = getSlotOrReply(c,c->argv[j]);
6023 if (checkSlotAssignmentsOrReply(c, slots, del, slot, slot) == C_ERR) {
6024 zfree(slots);
6025 return 1;
6026 }
6027 }
6028 clusterUpdateSlots(c, slots, del);
6029 zfree(slots);
6030 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
6031 addReply(c,shared.ok);
6032 } else if ((!strcasecmp(c->argv[1]->ptr,"addslotsrange") ||
6033 !strcasecmp(c->argv[1]->ptr,"delslotsrange")) && c->argc >= 4) {
6034 if (c->argc % 2 == 1) {
6035 addReplyErrorArity(c);
6036 return 1;
6037 }
6038 /* CLUSTER ADDSLOTSRANGE <start slot> <end slot> [<start slot> <end slot> ...] */
6039 /* CLUSTER DELSLOTSRANGE <start slot> <end slot> [<start slot> <end slot> ...] */
6040 int j, startslot, endslot;
6041 unsigned char *slots = zmalloc(CLUSTER_SLOTS);
6042 int del = !strcasecmp(c->argv[1]->ptr,"delslotsrange");
6043
6044 memset(slots,0,CLUSTER_SLOTS);
6045 /* Check that all the arguments are parseable and that all the
6046 * slots are not already busy. */
6047 for (j = 2; j < c->argc; j += 2) {
6048 if ((startslot = getSlotOrReply(c,c->argv[j])) == C_ERR) {
6049 zfree(slots);
6050 return 1;
6051 }
6052 if ((endslot = getSlotOrReply(c,c->argv[j+1])) == C_ERR) {
6053 zfree(slots);
6054 return 1;
6055 }
6056 if (startslot > endslot) {
6057 addReplyErrorFormat(c,"start slot number %d is greater than end slot number %d", startslot, endslot);
6058 zfree(slots);
6059 return 1;
6060 }
6061
6062 if (checkSlotAssignmentsOrReply(c, slots, del, startslot, endslot) == C_ERR) {
6063 zfree(slots);
6064 return 1;
6065 }
6066 }
6067 clusterUpdateSlots(c, slots, del);
6068 zfree(slots);
6069 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
6070 addReply(c,shared.ok);
6071 } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) {
6072 /* SETSLOT 10 MIGRATING <node ID> */
6073 /* SETSLOT 10 IMPORTING <node ID> */
6074 /* SETSLOT 10 STABLE */
6075 /* SETSLOT 10 NODE <node ID> */
6076 int slot;
6077 clusterNode *n;
6078
6079 if (nodeIsSlave(myself)) {
6080 addReplyError(c,"Please use SETSLOT only with masters.");
6081 return 1;
6082 }
6083
6084 if ((slot = getSlotOrReply(c, c->argv[2])) == -1) return 1;
6085
6086 /* Don't allow legacy slot migration if the slot is in an ASM task. */
6087 if (isSlotInAsmTask(slot)) {
6088 addReplyErrorFormat(c, "Slot %d is currently in an active atomic slot migration. "
6089 "CLUSTER SETSLOT cannot be used at this time. To perform a legacy slot migration "
6090 "instead, first cancel the ongoing task with CLUSTER MIGRATION CANCEL", slot);
6091 return 1;
6092 }
6093
6094 if (isSlotInTrimJob(slot)) {
6095 addReplyErrorFormat(c, "There is a pending trim job for slot %d. "
6096 "Most probably, this is due to a failed atomic slot migration. "
6097 "CLUSTER SETSLOT cannot be used at this time. "
6098 "Please retry later once the trim job is completed.", slot);
6099 return 1;
6100 }
6101
6102 if (!strcasecmp(c->argv[3]->ptr,"migrating") && c->argc == 5) {
6103 if (server.cluster->slots[slot] != myself) {
6104 addReplyErrorFormat(c,"I'm not the owner of hash slot %u",slot);
6105 return 1;
6106 }
6107 n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr));
6108 if (n == NULL) {
6109 addReplyErrorFormat(c,"I don't know about node %s",
6110 (char*)c->argv[4]->ptr);
6111 return 1;
6112 }
6113 if (nodeIsSlave(n)) {
6114 addReplyError(c,"Target node is not a master");
6115 return 1;
6116 }
6117 server.cluster->migrating_slots_to[slot] = n;
6118 } else if (!strcasecmp(c->argv[3]->ptr,"importing") && c->argc == 5) {
6119 if (server.cluster->slots[slot] == myself) {
6120 addReplyErrorFormat(c,
6121 "I'm already the owner of hash slot %u",slot);
6122 return 1;
6123 }
6124 n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr));
6125 if (n == NULL) {
6126 addReplyErrorFormat(c,"I don't know about node %s",
6127 (char*)c->argv[4]->ptr);
6128 return 1;
6129 }
6130 if (nodeIsSlave(n)) {
6131 addReplyError(c,"Target node is not a master");
6132 return 1;
6133 }
6134 server.cluster->importing_slots_from[slot] = n;
6135 } else if (!strcasecmp(c->argv[3]->ptr,"stable") && c->argc == 4) {
6136 /* CLUSTER SETSLOT <SLOT> STABLE */
6137 server.cluster->importing_slots_from[slot] = NULL;
6138 server.cluster->migrating_slots_to[slot] = NULL;
6139 } else if (!strcasecmp(c->argv[3]->ptr,"node") && c->argc == 5) {
6140 /* CLUSTER SETSLOT <SLOT> NODE <NODE ID> */
6141 n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr));
6142 if (!n) {
6143 addReplyErrorFormat(c,"Unknown node %s",
6144 (char*)c->argv[4]->ptr);
6145 return 1;
6146 }
6147 if (nodeIsSlave(n)) {
6148 addReplyError(c,"Target node is not a master");
6149 return 1;
6150 }
6151 /* If this hash slot was served by 'myself' before the switch,
6152 * make sure there are no longer local keys for this hash slot. */
6153 if (server.cluster->slots[slot] == myself && n != myself) {
6154 if (countKeysInSlot(slot) != 0) {
6155 addReplyErrorFormat(c,
6156 "Can't assign hashslot %d to a different node "
6157 "while I still hold keys for this hash slot.", slot);
6158 return 1;
6159 }
6160 }
6161 /* If this slot is in migrating status but we have no keys
6162 * for it, assigning the slot to another node also clears
6163 * the migrating status. */
6164 if (countKeysInSlot(slot) == 0 &&
6165 server.cluster->migrating_slots_to[slot])
6166 server.cluster->migrating_slots_to[slot] = NULL;
6167
6168 int slot_was_mine = server.cluster->slots[slot] == myself;
6169 clusterDelSlot(slot);
6170 clusterAddSlot(n,slot);
6171
6172 /* If we are a master left without slots, we should turn into a
6173 * replica of the new master. */
6174 if (slot_was_mine &&
6175 n != myself &&
6176 myself->numslots == 0 &&
6177 server.cluster_allow_replica_migration) {
6178 serverLog(LL_NOTICE,
6179 "Configuration change detected. Reconfiguring myself "
6180 "as a replica of %.40s (%s)", n->name, n->human_nodename);
6181 clusterSetMaster(n);
6182 /* Save the new config and broadcast it to the other nodes. */
6183 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG |
6184 CLUSTER_TODO_UPDATE_STATE |
6185 CLUSTER_TODO_FSYNC_CONFIG |
6186 CLUSTER_TODO_BROADCAST_PONG);
6187 }
6188
6189 /* If this node was importing this slot, assigning the slot to
6190 * itself also clears the importing status. */
6191 if (n == myself &&
6192 server.cluster->importing_slots_from[slot]) {
6193 /* This slot was manually migrated: set this node's configEpoch
6194 * to a new epoch so that the new configuration can be propagated
6195 * across the cluster.
6196 *
6197 * Note that if this ever results in a collision with another
6198 * node getting the same configEpoch, for example because a
6199 * failover happens at the same time we close the slot, the
6200 * configEpoch collision resolution will fix it assigning
6201 * a different epoch to each node. */
6202 if (clusterBumpConfigEpochWithoutConsensus() == C_OK) {
6203 serverLog(LL_NOTICE,
6204 "configEpoch updated after importing slot %d", slot);
6205 }
6206 server.cluster->importing_slots_from[slot] = NULL;
6207 /* After importing this slot, let the other nodes know as
6208 * soon as possible. */
6209 clusterDoBeforeSleep(CLUSTER_TODO_BROADCAST_PONG);
6210 }
6211 } else {
6212 addReplyError(c,
6213 "Invalid CLUSTER SETSLOT action or number of arguments. Try CLUSTER HELP");
6214 return 1;
6215 }
6216 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE);
6217 addReply(c,shared.ok);
6218 } else if (!strcasecmp(c->argv[1]->ptr,"bumpepoch") && c->argc == 2) {
6219 /* CLUSTER BUMPEPOCH */
6220 int retval = clusterBumpConfigEpochWithoutConsensus();
6221 sds reply = sdscatprintf(sdsempty(),"+%s %llu\r\n",
6222 (retval == C_OK) ? "BUMPED" : "STILL",
6223 (unsigned long long) myself->configEpoch);
6224 addReplySds(c,reply);
6225 } else if (!strcasecmp(c->argv[1]->ptr,"saveconfig") && c->argc == 2) {
6226 int retval = clusterSaveConfig(1);
6227
6228 if (retval == 0)
6229 addReply(c,shared.ok);
6230 else
6231 addReplyErrorFormat(c,"error saving the cluster node config: %s",
6232 strerror(errno));
6233 } else if (!strcasecmp(c->argv[1]->ptr,"forget") && c->argc == 3) {
6234 /* CLUSTER FORGET <NODE ID> */
6235 clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
6236 if (!n) {
6237 if (clusterBlacklistExists((char*)c->argv[2]->ptr, sdslen(c->argv[2]->ptr)))
6238 /* Already forgotten. The deletion may have been gossiped by
6239 * another node, so we pretend it succeeded. */
6240 addReply(c,shared.ok);
6241 else
6242 addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
6243 return 1;
6244 } else if (n == myself) {
6245 addReplyError(c,"I tried hard but I can't forget myself...");
6246 return 1;
6247 } else if (nodeIsSlave(myself) && myself->slaveof == n) {
6248 addReplyError(c,"Can't forget my master!");
6249 return 1;
6250 }
6251 clusterBlacklistAddNode(n);
6252 clusterDelNode(n);
6253 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
6254 CLUSTER_TODO_SAVE_CONFIG);
6255 addReply(c,shared.ok);
6256 } else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc == 3) {
6257 /* CLUSTER REPLICATE <NODE ID> */
6258 /* Lookup the specified node in our table. */
6259 clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
6260 if (!n) {
6261 addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
6262 return 1;
6263 }
6264
6265 /* I can't replicate myself. */
6266 if (n == myself) {
6267 addReplyError(c,"Can't replicate myself");
6268 return 1;
6269 }
6270
6271 /* Can't replicate a slave. */
6272 if (nodeIsSlave(n)) {
6273 addReplyError(c,"I can only replicate a master, not a replica.");
6274 return 1;
6275 }
6276
6277 /* If the instance is currently a master, it must have no assigned
6278 * slots and no keys in order to start replicating some other node.
6279 * Slaves can switch to another master without issues. */
6280 if (clusterNodeIsMaster(myself) &&
6281 (myself->numslots != 0 || kvstoreSize(server.db[0].keys) != 0)) {
6282 addReplyError(c,
6283 "To set a master the node must be empty and "
6284 "without assigned slots.");
6285 return 1;
6286 }
6287
6288 /* Set the master. */
6289 clusterSetMaster(n);
6290 /* Save the new config and broadcast it to the other nodes. */
6291 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
6292 CLUSTER_TODO_SAVE_CONFIG|
6293 CLUSTER_TODO_BROADCAST_PONG);
6294 addReply(c,shared.ok);
6295 } else if (!strcasecmp(c->argv[1]->ptr,"count-failure-reports") &&
6296 c->argc == 3)
6297 {
6298 /* CLUSTER COUNT-FAILURE-REPORTS <NODE ID> */
6299 clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
6300
6301 if (!n) {
6302 addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
6303 return 1;
6304 } else {
6305 addReplyLongLong(c,clusterNodeFailureReportsCount(n));
6306 }
6307 } else if (!strcasecmp(c->argv[1]->ptr,"failover") &&
6308 (c->argc == 2 || c->argc == 3))
6309 {
6310 /* CLUSTER FAILOVER [FORCE|TAKEOVER] */
6311 int force = 0, takeover = 0;
6312
6313 if (c->argc == 3) {
6314 if (!strcasecmp(c->argv[2]->ptr,"force")) {
6315 force = 1;
6316 } else if (!strcasecmp(c->argv[2]->ptr,"takeover")) {
6317 takeover = 1;
6318 force = 1; /* Takeover also implies force. */
6319 } else {
6320 addReplyErrorObject(c,shared.syntaxerr);
6321 return 1;
6322 }
6323 }
6324
6325 /* Check preconditions. */
6326 if (clusterNodeIsMaster(myself)) {
6327 addReplyError(c,"You should send CLUSTER FAILOVER to a replica");
6328 return 1;
6329 } else if (myself->slaveof == NULL) {
6330 addReplyError(c,"I'm a replica but my master is unknown to me");
6331 return 1;
6332 } else if (!force &&
6333 (nodeFailed(myself->slaveof) ||
6334 myself->slaveof->link == NULL))
6335 {
6336 addReplyError(c,"Master is down or failed, "
6337 "please use CLUSTER FAILOVER FORCE");
6338 return 1;
6339 }
6340 resetManualFailover();
6341 server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;
6342
6343 if (takeover) {
6344 /* A takeover does not perform any initial check. It just
6345 * generates a new configuration epoch for this node without
6346 * consensus, claims the master's slots, and broadcasts the new
6347 * configuration. */
6348 serverLog(LL_NOTICE,"Taking over the master (user request).");
6349 clusterBumpConfigEpochWithoutConsensus();
6350 clusterFailoverReplaceYourMaster();
6351 } else if (force) {
6352 /* If this is a forced failover, we don't need to talk with our
6353 * master to agree about the offset. We just fail over, taking over
6354 * the master without coordination. */
6355 serverLog(LL_NOTICE,"Forced failover user request accepted.");
6356 server.cluster->mf_can_start = 1;
6357 } else {
6358 serverLog(LL_NOTICE,"Manual failover user request accepted.");
6359 clusterSendMFStart(myself->slaveof);
6360 }
6361 addReply(c,shared.ok);
6362 } else if (!strcasecmp(c->argv[1]->ptr,"set-config-epoch") && c->argc == 3)
6363 {
6364 /* CLUSTER SET-CONFIG-EPOCH <epoch>
6365 *
6366 * The user is allowed to set the config epoch only when a node is
6367 * totally fresh: no config epoch, no other known node, and so forth.
6368 * This happens at cluster creation time to start with a cluster where
6369 * every node has a different node ID, without relying on the conflict
6370 * resolution system, which is too slow when a big cluster is created. */
6371 long long epoch;
6372
6373 if (getLongLongFromObjectOrReply(c,c->argv[2],&epoch,NULL) != C_OK)
6374 return 1;
6375
6376 if (epoch < 0) {
6377 addReplyErrorFormat(c,"Invalid config epoch specified: %lld",epoch);
6378 } else if (dictSize(server.cluster->nodes) > 1) {
6379 addReplyError(c,"The user can assign a config epoch only when the "
6380 "node does not know any other node.");
6381 } else if (myself->configEpoch != 0) {
6382 addReplyError(c,"Node config epoch is already non-zero");
6383 } else {
6384 myself->configEpoch = epoch;
6385 serverLog(LL_NOTICE,
6386 "configEpoch set to %llu via CLUSTER SET-CONFIG-EPOCH",
6387 (unsigned long long) myself->configEpoch);
6388
6389 if (server.cluster->currentEpoch < (uint64_t)epoch)
6390 server.cluster->currentEpoch = epoch;
6391 /* No need to fsync the config here since in the unlucky event
6392 * of a failure to persist the config, the conflict resolution code
6393 * will assign a unique config to this node. */
6394 clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|
6395 CLUSTER_TODO_SAVE_CONFIG);
6396 addReply(c,shared.ok);
6397 }
6398 } else if (!strcasecmp(c->argv[1]->ptr,"reset") &&
6399 (c->argc == 2 || c->argc == 3))
6400 {
6401 /* CLUSTER RESET [SOFT|HARD] */
6402 int hard = 0;
6403
6404 /* Parse soft/hard argument. Default is soft. */
6405 if (c->argc == 3) {
6406 if (!strcasecmp(c->argv[2]->ptr,"hard")) {
6407 hard = 1;
6408 } else if (!strcasecmp(c->argv[2]->ptr,"soft")) {
6409 hard = 0;
6410 } else {
6411 addReplyErrorObject(c,shared.syntaxerr);
6412 return 1;
6413 }
6414 }
6415
6416 /* Slaves can be reset even while holding data, but master nodes
6417 * must be empty. */
6418 if (clusterNodeIsMaster(myself) && kvstoreSize(c->db->keys) != 0) {
6419 addReplyError(c,"CLUSTER RESET can't be called with "
6420 "master nodes containing keys");
6421 return 1;
6422 }
6423 clusterReset(hard);
6424 addReply(c,shared.ok);
6425 } else if (!strcasecmp(c->argv[1]->ptr,"links") && c->argc == 2) {
6426 /* CLUSTER LINKS */
6427 addReplyClusterLinksDescription(c);
6428 } else {
6429 return 0;
6430 }
6431
6432 return 1;
6433}
6434
6435const char **clusterCommandExtendedHelp(void) {
6436 static const char *help[] = {
6437 "ADDSLOTS <slot> [<slot> ...]",
6438 " Assign slots to current node.",
6439 "ADDSLOTSRANGE <start slot> <end slot> [<start slot> <end slot> ...]",
6440 " Assign slots which are between <start-slot> and <end-slot> to current node.",
6441 "BUMPEPOCH",
6442 " Advance the cluster config epoch.",
6443 "COUNT-FAILURE-REPORTS <node-id>",
6444 " Return number of failure reports for <node-id>.",
6445 "DELSLOTS <slot> [<slot> ...]",
6446 " Delete slots information from current node.",
6447 "DELSLOTSRANGE <start slot> <end slot> [<start slot> <end slot> ...]",
6448 " Delete slots information which are between <start-slot> and <end-slot> from current node.",
6449 "FAILOVER [FORCE|TAKEOVER]",
6450 " Promote current replica node to being a master.",
6451 "FORGET <node-id>",
6452 " Remove a node from the cluster.",
6453 "FLUSHSLOTS",
6454 " Delete current node own slots information.",
6455 "MEET <ip> <port> [<bus-port>]",
6456 " Connect nodes into a working cluster.",
6457 "REPLICATE <node-id>",
6458 " Configure current node as replica to <node-id>.",
6459 "RESET [HARD|SOFT]",
6460 " Reset current node (default: soft).",
6461 "SET-CONFIG-EPOCH <epoch>",
6462 " Set config epoch of current node.",
6463 "SETSLOT <slot> (IMPORTING <node-id>|MIGRATING <node-id>|STABLE|NODE <node-id>)",
6464 " Set slot state.",
6465 "SAVECONFIG",
6466 " Force saving cluster configuration on disk.",
6467 "LINKS",
6468 " Return information about all network links between this node and its peers.",
6469 " Output format is an array where each array element is a map containing attributes of a link",
6470 "MIGRATION IMPORT <start-slot end-slot [start-slot end-slot ...]> |",
6471 " STATUS [ID <task-id> | ALL] | CANCEL [ID <task-id> | ALL]",
6472 " Start, monitor and cancel slot migration.",
6473 NULL
6474 };
6475
6476 return help;
6477}
6478
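/* Simple getters used by code outside this file to query per-node replication
 * links, slot migration state and overall cluster health. */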
6479int clusterNodeNumSlaves(clusterNode *node) {
6480 return node->numslaves;
6481}
6482
6483clusterNode *clusterNodeGetSlave(clusterNode *node, int slave_idx) {
6484 return node->slaves[slave_idx];
6485}
6486
6487clusterNode *getMigratingSlotDest(int slot) {
6488 return server.cluster->migrating_slots_to[slot];
6489}
6490
6491clusterNode *getImportingSlotSource(int slot) {
6492 return server.cluster->importing_slots_from[slot];
6493}
6494
6495int isClusterHealthy(void) {
6496 return server.cluster->state == CLUSTER_OK;
6497}
6498
6499clusterNode *getNodeBySlot(int slot) {
6500 return server.cluster->slots[slot];
6501}
6502
6503char *clusterNodeHostname(clusterNode *node) {
6504 return node->hostname;
6505}
6506
6507long long clusterNodeReplOffset(clusterNode *node) {
6508 return node->repl_offset;
6509}
6510
6511const char *clusterNodePreferredEndpoint(clusterNode *n) {
6512 char *hostname = clusterNodeHostname(n);
6513 switch (server.cluster_preferred_endpoint_type) {
6514 case CLUSTER_ENDPOINT_TYPE_IP:
6515 return clusterNodeIp(n);
6516 case CLUSTER_ENDPOINT_TYPE_HOSTNAME:
6517 return (hostname != NULL && hostname[0] != '\0') ? hostname : "?";
6518 case CLUSTER_ENDPOINT_TYPE_UNKNOWN_ENDPOINT:
6519 return "";
6520 }
6521 return "unknown";
6522}
6523
6524int clusterAllowFailoverCmd(client *c) {
6525 if (!server.cluster_enabled) {
6526 return 1;
6527 }
6528 addReplyError(c,"FAILOVER not allowed in cluster mode. "
6529 "Use CLUSTER FAILOVER command instead.");
6530 return 0;
6531}
6532
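/* Turn this node into a master at the replication level; the ASM layer is then
 * given a chance to finalize any master-side migration task (assumed role of
 * asmFinalizeMasterTask()). */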
6533void clusterPromoteSelfToMaster(void) {
6534 replicationUnsetMaster();
6535 asmFinalizeMasterTask();
6536}
6537
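/* Event callback for atomic slot migration (ASM) tasks. On ASM_EVENT_TAKEOVER
 * the node claims every slot in the task's ranges, bumps its config epoch
 * without consensus and broadcasts the new configuration; around the handoff
 * it pauses client writes and resumes them once the migration completes or
 * fails. */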
6538int clusterAsmOnEvent(const char *task_id, int event, void *arg) {
6539 sds str = NULL;
6540
6541 slotRangeArray *slots = asmTaskGetSlotRanges(task_id);
6542 if (slots) str = slotRangeArrayToString(slots);
6543 else if (arg) str = slotRangeArrayToString(arg);
6544
6545 serverLog(LL_VERBOSE, "Slot migration task %s received event %d for slots: %s",
6546 task_id, event, str ? str : "unknown");
6547
6548 switch (event) {
6549 case ASM_EVENT_TAKEOVER:
6550 for (int i = 0; i < slots->num_ranges; i++) {
6551 slotRange *sr = &slots->ranges[i];
6552 for (int j = sr->start; j <= sr->end; j++) {
6553 clusterDelSlot(j);
6554 clusterAddSlot(myself, j);
6555 }
6556 }
6557 /* Bump config epoch and broadcast the new config to the other nodes. */
6558 clusterBumpConfigEpochWithoutConsensus();
6559 clusterSaveConfigOrDie(1);
6560 clusterDoBeforeSleep(CLUSTER_TODO_BROADCAST_PONG);
6561 clusterAsmProcess(task_id, ASM_EVENT_DONE, NULL, NULL);
6562 break;
6563 case ASM_EVENT_MIGRATE_FAILED:
6564 unpauseActions(PAUSE_DURING_SLOT_HANDOFF);
6565 break;
6566 case ASM_EVENT_HANDOFF_PREP:
6567 pauseActions(PAUSE_DURING_SLOT_HANDOFF,
6568 LLONG_MAX,
6569 PAUSE_ACTIONS_CLIENT_WRITE_SET);
6570 clusterAsmProcess(task_id, ASM_EVENT_HANDOFF, NULL, NULL);
6571 break;
6572 case ASM_EVENT_MIGRATE_COMPLETED:
6573 unpauseActions(PAUSE_DURING_SLOT_HANDOFF);
6574 break;
6575 default:
6576 break;
6577 }
6578
6579 sdsfree(str);
6580 return C_OK;
6581}