aboutsummaryrefslogtreecommitdiff
path: root/examples/redis-unstable/src/replication.c
diff options
context:
space:
mode:
Diffstat (limited to 'examples/redis-unstable/src/replication.c')
-rw-r--r--examples/redis-unstable/src/replication.c5387
1 files changed, 0 insertions, 5387 deletions
diff --git a/examples/redis-unstable/src/replication.c b/examples/redis-unstable/src/replication.c
deleted file mode 100644
index 309d6c4..0000000
--- a/examples/redis-unstable/src/replication.c
+++ /dev/null
@@ -1,5387 +0,0 @@
1/* Asynchronous replication implementation.
2 *
3 * Copyright (c) 2009-Present, Redis Ltd.
4 * All rights reserved.
5 *
6 * Copyright (c) 2024-present, Valkey contributors.
7 * All rights reserved.
8 *
9 * Licensed under your choice of (a) the Redis Source Available License 2.0
10 * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
11 * GNU Affero General Public License v3 (AGPLv3).
12 *
13 * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
14 */
15
16/*
17 * replication.c - Replication Management
18 *
19 * This file contains the implementation of Redis's replication logic, which
20 * enables data synchronization between master and replica instances.
21 * It handles:
22 * - Master-to-replica synchronization
23 * - Full and partial resynchronizations
24 * - Replication backlog management
25 * - State machines for replica operations
26 * - RDB Channel for Full Sync (lookup "rdb channel for full sync")
27 */
28
29#include "server.h"
30#include "cluster.h"
31#include "cluster_slot_stats.h"
32#include "bio.h"
33#include "functions.h"
34#include "connection.h"
35#include "cluster_asm.h"
36
37#include <memory.h>
38#include <sys/time.h>
39#include <unistd.h>
40#include <fcntl.h>
41#include <sys/socket.h>
42#include <sys/stat.h>
43
44void replicationDiscardCachedMaster(void);
45void replicationResurrectCachedMaster(connection *conn);
46void replicationSendAck(void);
47int replicaPutOnline(client *slave);
48void replicaStartCommandStream(client *slave);
49int cancelReplicationHandshake(int reconnect);
50static void rdbChannelFullSyncWithMaster(connection *conn);
51static int rdbChannelAbort(void);
52static void rdbChannelBufferReplData(connection *conn);
53static void rdbChannelReplDataBufInit(void);
54static void rdbChannelStreamReplDataToDb(void);
55static void rdbChannelCleanup(void);
56
57/* We take a global flag to remember if this instance generated an RDB
58 * because of replication, so that we can remove the RDB file in case
59 * the instance is configured to have no persistence. */
60int RDBGeneratedByReplication = 0;
61
62
63/* A reference to diskless loading rio to abort it asynchronously. It's needed
64 * for rdbchannel replication. While loading from rdbchannel connection, we may
65 * yield back to eventloop. If main channel connection detects a network problem
66 * we want to abort loading. It calls rioAbort() in this case, so next rioRead()
67 * from rdbchannel connection will return error to cancel loading safely. */
68static rio *disklessLoadingRio = NULL;
69
70/* --------------------------- Utility functions ---------------------------- */
71
72/* Returns 1 if the replica is rdbchannel and there is an associated main
73 * channel slave with that. */
74int replicationCheckHasMainChannel(client *replica) {
75 if (!(replica->flags & CLIENT_REPL_RDB_CHANNEL) ||
76 !replica->main_ch_client_id ||
77 lookupClientByID(replica->main_ch_client_id) == NULL)
78 {
79 return 0;
80 }
81 return 1;
82}
83
84/* During rdb channel replication, replica opens two connections. From master
85 * POV, these connections are distinct replicas in server.slaves. This function
86 * counts associated replicas as one and returns logical replica count. */
87unsigned long replicationLogicalReplicaCount(void) {
88 unsigned long count = 0;
89 listNode *ln;
90 listIter li;
91
92 listRewind(server.slaves,&li);
93 while ((ln = listNext(&li))) {
94 client *replica = listNodeValue(ln);
95 if (!replicationCheckHasMainChannel(replica))
96 count++;
97 }
98 return count;
99}
100
101int replicaFromIOThreadHasPendingRead(client *c) {
102 serverAssert(c->tid != IOTHREAD_MAIN_THREAD_ID);
103
104 int pending_read;
105 atomicGetWithSync(c->pending_read, pending_read);
106 return pending_read;
107}
108
109/* Send replicas to their respective IO threads if it has pending reads or
110 * writes. Otherwise it remains in main thread so it can check for new data in
111 * the replication buffer ASAP. */
112void putReplicasInPendingClientsToIOThreads(void) {
113 if (server.io_threads_num <= 1) return;
114
115 serverAssert(pthread_equal(pthread_self(), server.main_thread_id));
116
117 listIter li;
118 listNode *ln;
119 listRewind(server.slaves,&li);
120 while((ln = listNext(&li))) {
121 client *replica = listNodeValue(ln);
122
123 /* We only care about replicas that need to run on IO thread but are
124 * currently in main */
125 if (replica->tid == IOTHREAD_MAIN_THREAD_ID ||
126 replica->running_tid != IOTHREAD_MAIN_THREAD_ID)
127 {
128 continue;
129 }
130
131 /* Skip the replica if it's scheduled for close */
132 if (replica->flags & CLIENT_CLOSE_ASAP) continue;
133
134 /* The call to clientHasPendingReplies may seem redundant but in the
135 * case of replica being in IO thread we can have the following case:
136 * replica gets back to main thread after sending the repl buffer it
137 * knows about. In the mean time main thread has accumulated new repl
138 * data. In that case the replica's client wouldn't have been put in
139 * the pending write queue but will still have new repl data it needs to
140 * send, so we make sure to check for that and send it back to IO thread
141 * if so. On the other hand if replica gets back to main thread before
142 * any new repl data has accumulated then after a new cmd is propagated
143 * the replica will be put in the pending write queue as usual so we
144 * need to check for that also.
145 * In addition, if the replica client has pending read events, we should
146 * also send them to the IO thread. */
147 if (replica->flags & CLIENT_PENDING_WRITE ||
148 clientHasPendingReplies(replica) ||
149 replicaFromIOThreadHasPendingRead(replica))
150 {
151 enqueuePendingClienstToIOThreads(replica);
152 }
153 }
154}
155
156/* Run some cron tasks for a connected master client. Return 1 when the client
157 * is freed, 0 otherwise. */
158int replicationCronRunMasterClient(void) {
159 if (!server.masterhost || !server.master) return 0;
160
161 if (server.master->running_tid != IOTHREAD_MAIN_THREAD_ID) return 0;
162
163 /* Timed out master when we are an already connected slave? */
164 if (server.repl_state == REPL_STATE_CONNECTED &&
165 (time(NULL)-server.master->lastinteraction) > server.repl_timeout)
166 {
167 serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
168 freeClient(server.master);
169 return 1;
170 }
171
172 /* Send ACK to master from time to time.
173 * Note that we do not send periodic acks to masters that don't
174 * support PSYNC and replication offsets. */
175 if (!(server.master->flags & CLIENT_PRE_PSYNC))
176 replicationSendAck();
177
178 return 0;
179}
180
181ConnectionType *connTypeOfReplication(void) {
182 if (server.tls_replication) {
183 return connectionTypeTls();
184 }
185
186 return connectionTypeTcp();
187}
188
189/* Return the pointer to a string representing the slave ip:listening_port
190 * pair. Mostly useful for logging, since we want to log a slave using its
191 * IP address and its listening port which is more clear for the user, for
192 * example: "Closing connection with replica 10.1.2.3:6380". */
193char *replicationGetSlaveName(client *c) {
194 static char buf[NET_HOST_PORT_STR_LEN];
195 char ip[NET_IP_STR_LEN];
196
197 ip[0] = '\0';
198 buf[0] = '\0';
199 if (c->slave_addr ||
200 connAddrPeerName(c->conn,ip,sizeof(ip),NULL) != -1)
201 {
202 char *addr = c->slave_addr ? c->slave_addr : ip;
203 if (c->slave_listening_port)
204 formatAddr(buf,sizeof(buf),addr,c->slave_listening_port);
205 else
206 snprintf(buf,sizeof(buf),"%s:<unknown-replica-port>",addr);
207 } else {
208 snprintf(buf,sizeof(buf),"client id #%llu",
209 (unsigned long long) c->id);
210 }
211 return buf;
212}
213
/* Remove 'filename' without paying the cost of reclaiming its storage in the
 * calling thread.
 *
 * A plain unlink() may block for a long time while the filesystem actually
 * frees the file. To avoid that, the file is opened first: with a descriptor
 * still referencing it, unlink() only removes the name, and the storage is
 * released when the last reference goes away. The close() of that descriptor
 * is then delegated to a background job.
 *
 * Returns 0 on success and -1 on error with errno describing the unlink()
 * failure. If the file cannot be opened, a regular foreground unlink() is
 * performed and its result is returned. */
int bg_unlink(const char *filename) {
    int fd = open(filename,O_RDONLY|O_NONBLOCK);

    /* No descriptor available: delete the file in this thread. */
    if (fd == -1) return unlink(filename);

    /* Drop the name. The contents survive as long as 'fd' stays open. */
    if (unlink(filename) == -1) {
        /* Report the unlink() error, not whatever close() may set. */
        int saved_errno = errno;
        close(fd);
        errno = saved_errno;
        return -1;
    }

    /* The expensive part happens when the descriptor is closed in the
     * background. */
    bioCreateCloseJob(fd, 0, 0);
    return 0;
}
241
242/* ---------------------------------- MASTER -------------------------------- */
243
244void createReplicationBacklog(void) {
245 serverAssert(server.repl_backlog == NULL);
246 server.repl_backlog = zmalloc(sizeof(replBacklog));
247 server.repl_backlog->ref_repl_buf_node = NULL;
248 server.repl_backlog->unindexed_count = 0;
249 server.repl_backlog->blocks_index = raxNew();
250 server.repl_backlog->histlen = 0;
251 /* We don't have any data inside our buffer, but virtually the first
252 * byte we have is the next byte that will be generated for the
253 * replication stream. */
254 server.repl_backlog->offset = server.master_repl_offset+1;
255}
256
257/* This function is called when the user modifies the replication backlog
258 * size at runtime. It is up to the function to resize the buffer and setup it
259 * so that it contains the same data as the previous one (possibly less data,
260 * but the most recent bytes, or the same data and more free space in case the
261 * buffer is enlarged). */
262void resizeReplicationBacklog(void) {
263 if (server.repl_backlog_size < CONFIG_REPL_BACKLOG_MIN_SIZE)
264 server.repl_backlog_size = CONFIG_REPL_BACKLOG_MIN_SIZE;
265 if (server.repl_backlog)
266 incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
267}
268
269void freeReplicationBacklog(void) {
270 serverAssert(listLength(server.slaves) == 0);
271 if (server.repl_backlog == NULL) return;
272
273 /* Decrease the start buffer node reference count. */
274 if (server.repl_backlog->ref_repl_buf_node) {
275 replBufBlock *o = listNodeValue(
276 server.repl_backlog->ref_repl_buf_node);
277 serverAssert(o->refcount == 1); /* Last reference. */
278 o->refcount--;
279 }
280
281 /* Replication buffer blocks are completely released when we free the
282 * backlog, since the backlog is released only when there are no replicas
283 * and the backlog keeps the last reference of all blocks. */
284 freeReplicationBacklogRefMemAsync(server.repl_buffer_blocks,
285 server.repl_backlog->blocks_index);
286 resetReplicationBuffer();
287 zfree(server.repl_backlog);
288 server.repl_backlog = NULL;
289}
290
291/* To make search offset from replication buffer blocks quickly
292 * when replicas ask partial resynchronization, we create one index
293 * block every REPL_BACKLOG_INDEX_PER_BLOCKS blocks. */
294void createReplicationBacklogIndex(listNode *ln) {
295 server.repl_backlog->unindexed_count++;
296 if (server.repl_backlog->unindexed_count >= REPL_BACKLOG_INDEX_PER_BLOCKS) {
297 replBufBlock *o = listNodeValue(ln);
298 uint64_t encoded_offset = htonu64(o->repl_offset);
299 raxInsert(server.repl_backlog->blocks_index,
300 (unsigned char*)&encoded_offset, sizeof(uint64_t),
301 ln, NULL);
302 server.repl_backlog->unindexed_count = 0;
303 }
304}
305
306/* Rebase replication buffer blocks' offset since the initial
307 * setting offset starts from 0 when master restart. */
308void rebaseReplicationBuffer(long long base_repl_offset) {
309 raxFree(server.repl_backlog->blocks_index);
310 server.repl_backlog->blocks_index = raxNew();
311 server.repl_backlog->unindexed_count = 0;
312
313 listIter li;
314 listNode *ln;
315 listRewind(server.repl_buffer_blocks, &li);
316 while ((ln = listNext(&li))) {
317 replBufBlock *o = listNodeValue(ln);
318 o->repl_offset += base_repl_offset;
319 createReplicationBacklogIndex(ln);
320 }
321}
322
323void resetReplicationBuffer(void) {
324 server.repl_buffer_mem = 0;
325 server.repl_buffer_blocks = listCreate();
326 listSetFreeMethod(server.repl_buffer_blocks, zfree);
327}
328
329int canFeedReplicaReplBuffer(client *replica) {
330 /* Don't feed replicas that only want the RDB or main channels of migration
331 * destinations which need filtered stream for migrating slot ranges. */
332 if (replica->flags & CLIENT_REPL_RDBONLY ||
333 replica->flags & CLIENT_ASM_MIGRATING) return 0;
334
335 /* Don't feed replicas that are still waiting for BGSAVE to start. */
336 if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
337 replica->replstate == SLAVE_STATE_WAIT_RDB_CHANNEL) return 0;
338
339 /* Don't feed replicas that are going to be closed ASAP. */
340 if (replica->flags & CLIENT_CLOSE_ASAP) return 0;
341
342 return 1;
343}
344
345/* Create the replication backlog if needed. */
346void createReplicationBacklogIfNeeded(void) {
347 if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
348 /* When we create the backlog from scratch, we always use a new
349 * replication ID and clear the ID2, since there is no valid
350 * past history. */
351 changeReplicationId();
352 clearReplicationId2();
353 createReplicationBacklog();
354 serverLog(LL_NOTICE,"Replication backlog created, my new "
355 "replication IDs are '%s' and '%s'",
356 server.replid, server.replid2);
357 }
358}
359/* Similar with 'prepareClientToWrite', note that we must call this function
360 * before feeding replication stream into global replication buffer, since
361 * clientHasPendingReplies in prepareClientToWrite will access the global
362 * replication buffer to make judgements. */
363int prepareReplicasToWrite(void) {
364 listIter li;
365 listNode *ln;
366 int prepared = 0;
367
368 listRewind(server.slaves,&li);
369 while((ln = listNext(&li))) {
370 client *slave = ln->value;
371 if (!canFeedReplicaReplBuffer(slave)) continue;
372 if (prepareClientToWrite(slave) == C_ERR) continue;
373 prepared++;
374 }
375
376 return prepared;
377}
378
379/* Wrapper for feedReplicationBuffer() that takes Redis string objects
380 * as input. */
381void feedReplicationBufferWithObject(robj *o) {
382 char llstr[LONG_STR_SIZE];
383 void *p;
384 size_t len;
385
386 if (o->encoding == OBJ_ENCODING_INT) {
387 len = ll2string(llstr,sizeof(llstr),(long)o->ptr);
388 p = llstr;
389 } else {
390 len = sdslen(o->ptr);
391 p = o->ptr;
392 }
393 feedReplicationBuffer(p,len);
394}
395
396/* Generally, we only have one replication buffer block to trim when replication
397 * backlog size exceeds our setting and no replica reference it. But if replica
398 * clients disconnect, we need to free many replication buffer blocks that are
399 * referenced. It would cost much time if there are a lots blocks to free, that
400 * will freeze server, so we trim replication backlog incrementally. */
401void incrementalTrimReplicationBacklog(size_t max_blocks) {
402 serverAssert(server.repl_backlog != NULL);
403
404 size_t trimmed_blocks = 0;
405 while (server.repl_backlog->histlen > server.repl_backlog_size &&
406 trimmed_blocks < max_blocks)
407 {
408 /* We never trim backlog to less than one block. */
409 if (listLength(server.repl_buffer_blocks) <= 1) break;
410
411 /* Replicas increment the refcount of the first replication buffer block
412 * they refer to, in that case, we don't trim the backlog even if
413 * backlog_histlen exceeds backlog_size. This implicitly makes backlog
414 * bigger than our setting, but makes the master accept partial resync as
415 * much as possible. So that backlog must be the last reference of
416 * replication buffer blocks. */
417 listNode *first = listFirst(server.repl_buffer_blocks);
418 serverAssert(first == server.repl_backlog->ref_repl_buf_node);
419 replBufBlock *fo = listNodeValue(first);
420 if (fo->refcount != 1) break;
421
422 /* We don't try trim backlog if backlog valid size will be lessen than
423 * setting backlog size once we release the first repl buffer block. */
424 if (server.repl_backlog->histlen - (long long)fo->size <=
425 server.repl_backlog_size) break;
426
427 /* Decr refcount and release the first block later. */
428 fo->refcount--;
429 trimmed_blocks++;
430 server.repl_backlog->histlen -= fo->size;
431
432 /* Go to use next replication buffer block node. */
433 listNode *next = listNextNode(first);
434 server.repl_backlog->ref_repl_buf_node = next;
435 serverAssert(server.repl_backlog->ref_repl_buf_node != NULL);
436 /* Incr reference count to keep the new head node. */
437 ((replBufBlock *)listNodeValue(next))->refcount++;
438
439 /* Remove the node in recorded blocks. */
440 uint64_t encoded_offset = htonu64(fo->repl_offset);
441 raxRemove(server.repl_backlog->blocks_index,
442 (unsigned char*)&encoded_offset, sizeof(uint64_t), NULL);
443
444 /* Delete the first node from global replication buffer. */
445 serverAssert(fo->refcount == 0 && fo->used == fo->size);
446 server.repl_buffer_mem -= (fo->size +
447 sizeof(listNode) + sizeof(replBufBlock));
448 listDelNode(server.repl_buffer_blocks, first);
449 }
450
451 /* Set the offset of the first byte we have in the backlog. */
452 server.repl_backlog->offset = server.master_repl_offset -
453 server.repl_backlog->histlen + 1;
454}
455
456/* Free replication buffer blocks that are referenced by this client. */
457void freeReplicaReferencedReplBuffer(client *replica) {
458 serverAssert(replica->running_tid == IOTHREAD_MAIN_THREAD_ID);
459
460 if (replica->ref_repl_buf_node != NULL) {
461 /* Decrease the start buffer node reference count. */
462 replBufBlock *o = listNodeValue(replica->ref_repl_buf_node);
463 serverAssert(o->refcount > 0);
464 o->refcount--;
465 incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
466 }
467 replica->ref_repl_buf_node = NULL;
468 replica->ref_block_pos = 0;
469}
470
471/* Append bytes into the global replication buffer list, replication backlog and
472 * all replica clients use replication buffers collectively, this function replace
473 * 'addReply*', 'feedReplicationBacklog' for replicas and replication backlog,
474 * First we add buffer into global replication buffer block list, and then
475 * update replica / replication-backlog referenced node and block position. */
476void feedReplicationBuffer(char *s, size_t len) {
477 static long long repl_block_id = 0;
478
479 if (server.repl_backlog == NULL) return;
480
481 clusterSlotStatsIncrNetworkBytesOutForReplication(len);
482
483 /* Update the current cmd's keys with the commands replication bytes*/
484 hotkeyMetrics metrics = {0, len};
485 hotkeyStatsUpdateCurrentCmd(server.hotkeys, metrics);
486
487 while(len > 0) {
488 size_t start_pos = 0; /* The position of referenced block to start sending. */
489 listNode *start_node = NULL; /* Replica/backlog starts referenced node. */
490 int add_new_block = 0; /* Create new block if current block is total used. */
491 listNode *ln = listLast(server.repl_buffer_blocks);
492 replBufBlock *tail = ln ? listNodeValue(ln) : NULL;
493
494 /* Append to tail string when possible. */
495 if (tail && tail->size > tail->used) {
496 start_node = listLast(server.repl_buffer_blocks);
497 start_pos = tail->used;
498 /* Copy the part we can fit into the tail, and leave the rest for a
499 * new node */
500 size_t avail = tail->size - tail->used;
501 size_t copy = (avail >= len) ? len : avail;
502 memcpy(tail->buf + tail->used, s, copy);
503 tail->used += copy;
504 s += copy;
505 len -= copy;
506 server.master_repl_offset += copy;
507 server.repl_backlog->histlen += copy;
508 }
509 if (len) {
510 /* Create a new node, make sure it is allocated to at
511 * least PROTO_REPLY_CHUNK_BYTES */
512 size_t usable_size;
513 /* Avoid creating nodes smaller than PROTO_REPLY_CHUNK_BYTES, so that we can append more data into them,
514 * and also avoid creating nodes bigger than repl_backlog_size / 16, so that we won't have huge nodes that can't
515 * trim when we only still need to hold a small portion from them. */
516 size_t limit = max((size_t)server.repl_backlog_size / 16, (size_t)PROTO_REPLY_CHUNK_BYTES);
517 size_t size = min(max(len, (size_t)PROTO_REPLY_CHUNK_BYTES), limit);
518 tail = zmalloc_usable(size + sizeof(replBufBlock), &usable_size);
519 /* Take over the allocation's internal fragmentation */
520 tail->size = usable_size - sizeof(replBufBlock);
521 size_t copy = (tail->size >= len) ? len : tail->size;
522 tail->used = copy;
523 tail->refcount = 0;
524 tail->repl_offset = server.master_repl_offset + 1;
525 tail->id = repl_block_id++;
526 memcpy(tail->buf, s, copy);
527 listAddNodeTail(server.repl_buffer_blocks, tail);
528 /* We also count the list node memory into replication buffer memory. */
529 server.repl_buffer_mem += (usable_size + sizeof(listNode));
530 add_new_block = 1;
531 if (start_node == NULL) {
532 start_node = listLast(server.repl_buffer_blocks);
533 start_pos = 0;
534 }
535 s += copy;
536 len -= copy;
537 server.master_repl_offset += copy;
538 server.repl_backlog->histlen += copy;
539 }
540
541 /* For output buffer of replicas. */
542 listIter li;
543 listRewind(server.slaves,&li);
544 while((ln = listNext(&li))) {
545 client *slave = ln->value;
546 if (!canFeedReplicaReplBuffer(slave)) continue;
547
548 /* Update shared replication buffer start position. */
549 if (slave->ref_repl_buf_node == NULL) {
550 slave->ref_repl_buf_node = start_node;
551 slave->ref_block_pos = start_pos;
552 /* Only increase the start block reference count. */
553 ((replBufBlock *)listNodeValue(start_node))->refcount++;
554 }
555
556 /* Check output buffer limit only when add new block. */
557 if (add_new_block) closeClientOnOutputBufferLimitReached(slave, 1);
558 }
559
560 /* For replication backlog */
561 if (server.repl_backlog->ref_repl_buf_node == NULL) {
562 server.repl_backlog->ref_repl_buf_node = start_node;
563 /* Only increase the start block reference count. */
564 ((replBufBlock *)listNodeValue(start_node))->refcount++;
565
566 /* Replication buffer must be empty before adding replication stream
567 * into replication backlog. */
568 serverAssert(add_new_block == 1 && start_pos == 0);
569 }
570 if (add_new_block) {
571 createReplicationBacklogIndex(listLast(server.repl_buffer_blocks));
572
573 /* It is important to trim after adding replication data to keep the backlog size close to
574 * repl_backlog_size in the common case. We wait until we add a new block to avoid repeated
575 * unnecessary trimming attempts when small amounts of data are added. See comments in
576 * freeMemoryGetNotCountedMemory() for details on replication backlog memory tracking. */
577 incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
578 }
579 }
580}
581
582/* Propagate write commands to replication stream.
583 *
584 * This function is used if the instance is a master: we use the commands
585 * received by our clients in order to create the replication stream.
586 * Instead if the instance is a replica and has sub-replicas attached, we use
587 * replicationFeedStreamFromMasterStream() */
588void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
589 int j, len;
590 char llstr[LONG_STR_SIZE];
591
592 /* In case we propagate a command that doesn't touch keys (PING, REPLCONF) we
593 * pass dbid=-1 that indicate there is no need to replicate `select` command. */
594 serverAssert(dictid == -1 || (dictid >= 0 && dictid < server.dbnum));
595
596 /* If the instance is not a top level master, return ASAP: we'll just proxy
597 * the stream of data we receive from our master instead, in order to
598 * propagate *identical* replication stream. In this way this slave can
599 * advertise the same replication ID as the master (since it shares the
600 * master replication history and has the same backlog and offsets). */
601 if (server.masterhost != NULL) return;
602
603 /* If current client is marked as master, we will proxy the command stream
604 * to our slaves instead of replicating them, that also happens when being
605 * in atomic slot migration. */
606 if (server.current_client && server.current_client->flags & CLIENT_MASTER) return;
607
608 /* If there aren't slaves, and there is no backlog buffer to populate,
609 * we can return ASAP. */
610 if (server.repl_backlog == NULL && listLength(slaves) == 0) {
611 /* We increment the repl_offset anyway, since we use that for tracking AOF fsyncs
612 * even when there's no replication active. This code will not be reached if AOF
613 * is also disabled. */
614 server.master_repl_offset += 1;
615 return;
616 }
617
618 /* We can't have slaves attached and no backlog. */
619 serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));
620
621 /* Update the time of sending replication stream to replicas. */
622 server.repl_stream_lastio = server.unixtime;
623
624 /* Must install write handler for all replicas first before feeding
625 * replication stream. */
626 prepareReplicasToWrite();
627
628 /* Send SELECT command to every slave if needed. */
629 if (dictid != -1 && server.slaveseldb != dictid) {
630 robj *selectcmd;
631
632 /* For a few DBs we have pre-computed SELECT command. */
633 if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) {
634 selectcmd = shared.select[dictid];
635 } else {
636 int dictid_len;
637
638 dictid_len = ll2string(llstr,sizeof(llstr),dictid);
639 selectcmd = createObject(OBJ_STRING,
640 sdscatprintf(sdsempty(),
641 "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
642 dictid_len, llstr));
643 }
644
645 feedReplicationBufferWithObject(selectcmd);
646
647 /* Although the SELECT command is not associated with any slot,
648 * its per-slot network-bytes-out accumulation is made by the above function call.
649 * To cancel-out this accumulation, below adjustment is made. */
650 clusterSlotStatsDecrNetworkBytesOutForReplication(sdslen(selectcmd->ptr));
651
652 if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
653 decrRefCount(selectcmd);
654
655 server.slaveseldb = dictid;
656 }
657
658 /* Write the command to the replication buffer if any. */
659 char aux[LONG_STR_SIZE+3];
660
661 /* Add the multi bulk reply length. */
662 aux[0] = '*';
663 len = ll2string(aux+1,sizeof(aux)-1,argc);
664 aux[len+1] = '\r';
665 aux[len+2] = '\n';
666 feedReplicationBuffer(aux,len+3);
667
668 for (j = 0; j < argc; j++) {
669 long objlen = stringObjectLen(argv[j]);
670
671 /* We need to feed the buffer with the object as a bulk reply
672 * not just as a plain string, so create the $..CRLF payload len
673 * and add the final CRLF */
674 aux[0] = '$';
675 len = ll2string(aux+1,sizeof(aux)-1,objlen);
676 aux[len+1] = '\r';
677 aux[len+2] = '\n';
678 feedReplicationBuffer(aux,len+3);
679 feedReplicationBufferWithObject(argv[j]);
680 feedReplicationBuffer(aux+len+1,2);
681 }
682}
683
684/* This is a debugging function that gets called when we detect something
685 * wrong with the replication protocol: the goal is to peek into the
686 * replication backlog and show a few final bytes to make simpler to
687 * guess what kind of bug it could be. */
688void showLatestBacklog(void) {
689 if (server.repl_backlog == NULL) return;
690 if (listLength(server.repl_buffer_blocks) == 0) return;
691 if (server.hide_user_data_from_log) {
692 serverLog(LL_NOTICE,"hide-user-data-from-log is on, skip logging backlog content to avoid spilling PII.");
693 return;
694 }
695
696 size_t dumplen = 256;
697 if (server.repl_backlog->histlen < (long long)dumplen)
698 dumplen = server.repl_backlog->histlen;
699
700 sds dump = sdsempty();
701 listNode *node = listLast(server.repl_buffer_blocks);
702 while(dumplen) {
703 if (node == NULL) break;
704 replBufBlock *o = listNodeValue(node);
705 size_t thislen = o->used >= dumplen ? dumplen : o->used;
706 sds head = sdscatrepr(sdsempty(), o->buf+o->used-thislen, thislen);
707 sds tmp = sdscatsds(head, dump);
708 sdsfree(dump);
709 dump = tmp;
710 dumplen -= thislen;
711 node = listPrevNode(node);
712 }
713
714 /* Finally log such bytes: this is vital debugging info to
715 * understand what happened. */
716 serverLog(LL_NOTICE,"Latest backlog is: '%s'", dump);
717 sdsfree(dump);
718}
719
720/* This function is used in order to proxy what we receive from our master
721 * to our sub-slaves. Besides, we also proxy the replication stream from
722 * the source node when being in atomic slot migration. */
723void replicationFeedStreamFromMasterStream(char *buf, size_t buflen) {
724 /* There must be replication backlog if having attached slaves. */
725 if (listLength(server.slaves)) serverAssert(server.repl_backlog != NULL);
726 if (server.repl_backlog) {
727 /* Must install write handler for all replicas first before feeding
728 * replication stream. */
729 prepareReplicasToWrite();
730 feedReplicationBuffer(buf,buflen);
731 } else if (server.masterhost == NULL && server.aof_enabled) {
732 /* We increment the repl_offset anyway, since we use that for tracking
733 * AOF fsyncs even when there's no replication active. This code will
734 * not be reached if AOF is also disabled.
735 *
736 * As we skip feeding the replication buffer in atomic slot migration,
737 * so here we need to update the replication offset manually. */
738 server.master_repl_offset += 1;
739 }
740}
741
742void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) {
743 /* Fast path to return if the monitors list is empty or the server is in loading. */
744 if (monitors == NULL || listLength(monitors) == 0 || server.loading) return;
745 listNode *ln;
746 listIter li;
747 int j;
748 sds cmdrepr = sdsnew("+");
749 robj *cmdobj;
750 struct timeval tv;
751
752 gettimeofday(&tv,NULL);
753 cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec);
754 if (c->flags & CLIENT_SCRIPT) {
755 cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid);
756 } else if (c->flags & CLIENT_UNIX_SOCKET) {
757 cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket);
758 } else {
759 cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c));
760 }
761
762 for (j = 0; j < argc; j++) {
763 if (argv[j]->encoding == OBJ_ENCODING_INT) {
764 cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr);
765 } else {
766 cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr,
767 sdslen(argv[j]->ptr));
768 }
769 if (j != argc-1)
770 cmdrepr = sdscatlen(cmdrepr," ",1);
771 }
772 cmdrepr = sdscatlen(cmdrepr,"\r\n",2);
773 cmdobj = createObject(OBJ_STRING,cmdrepr);
774
775 listRewind(monitors,&li);
776 while((ln = listNext(&li))) {
777 client *monitor = ln->value;
778 /* Do not show internal commands to non-internal clients. */
779 if (c->realcmd && (c->realcmd->flags & CMD_INTERNAL) && !(monitor->flags & CLIENT_INTERNAL)) {
780 continue;
781 }
782 addReply(monitor,cmdobj);
783 updateClientMemUsageAndBucket(monitor);
784 }
785 decrRefCount(cmdobj);
786}
787
788/* Feed the slave 'c' with the replication backlog starting from the
789 * specified 'offset' up to the end of the backlog. */
790long long addReplyReplicationBacklog(client *c, long long offset) {
791 serverAssert(c->running_tid == IOTHREAD_MAIN_THREAD_ID);
792
793 long long skip;
794
795 serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset);
796
797 if (server.repl_backlog->histlen == 0) {
798 serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero");
799 return 0;
800 }
801
802 serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld",
803 server.repl_backlog_size);
804 serverLog(LL_DEBUG, "[PSYNC] First byte: %lld",
805 server.repl_backlog->offset);
806 serverLog(LL_DEBUG, "[PSYNC] History len: %lld",
807 server.repl_backlog->histlen);
808
809 /* Compute the amount of bytes we need to discard. */
810 skip = offset - server.repl_backlog->offset;
811 serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip);
812
813 /* Iterate recorded blocks, quickly search the approximate node. */
814 listNode *node = NULL;
815 if (raxSize(server.repl_backlog->blocks_index) > 0) {
816 uint64_t encoded_offset = htonu64(offset);
817 raxIterator ri;
818 raxStart(&ri, server.repl_backlog->blocks_index);
819 raxSeek(&ri, ">", (unsigned char*)&encoded_offset, sizeof(uint64_t));
820 if (raxEOF(&ri)) {
821 /* No found, so search from the last recorded node. */
822 raxSeek(&ri, "$", NULL, 0);
823 raxPrev(&ri);
824 node = (listNode *)ri.data;
825 } else {
826 raxPrev(&ri); /* Skip the sought node. */
827 /* We should search from the prev node since the offset of current
828 * sought node exceeds searching offset. */
829 if (raxPrev(&ri))
830 node = (listNode *)ri.data;
831 else
832 node = server.repl_backlog->ref_repl_buf_node;
833 }
834 raxStop(&ri);
835 } else {
836 /* No recorded blocks, just from the start node to search. */
837 node = server.repl_backlog->ref_repl_buf_node;
838 }
839
840 /* Search the exact node. */
841 while (node != NULL) {
842 replBufBlock *o = listNodeValue(node);
843 if (o->repl_offset + (long long)o->used >= offset) break;
844 node = listNextNode(node);
845 }
846 serverAssert(node != NULL);
847
848 /* Install a writer handler first.*/
849 prepareClientToWrite(c);
850 /* Setting output buffer of the replica. */
851 replBufBlock *o = listNodeValue(node);
852 o->refcount++;
853 c->ref_repl_buf_node = node;
854 c->ref_block_pos = offset - o->repl_offset;
855
856 return server.repl_backlog->histlen - skip;
857}
858
859/* Return the offset to provide as reply to the PSYNC command received
860 * from the slave. The returned value is only valid immediately after
861 * the BGSAVE process started and before executing any other command
862 * from clients. */
863long long getPsyncInitialOffset(void) {
864 return server.master_repl_offset;
865}
866
867/* Send a FULLRESYNC reply in the specific case of a full resynchronization,
868 * as a side effect setup the slave for a full sync in different ways:
869 *
870 * 1) Remember, into the slave client structure, the replication offset
871 * we sent here, so that if new slaves will later attach to the same
872 * background RDB saving process (by duplicating this client output
873 * buffer), we can get the right offset from this slave.
874 * 2) Set the replication state of the slave to WAIT_BGSAVE_END so that
875 * we start accumulating differences from this point.
876 * 3) Force the replication stream to re-emit a SELECT statement so
877 * the new slave incremental differences will start selecting the
878 * right database number.
879 *
880 * Normally this function should be called immediately after a successful
881 * BGSAVE for replication was started, or when there is one already in
882 * progress that we attached our slave to. */
883int replicationSetupSlaveForFullResync(client *slave, long long offset) {
884 char buf[128];
885 int buflen;
886
887 slave->psync_initial_offset = offset;
888 slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
889 /* We are going to accumulate the incremental changes for this
890 * slave as well. Set slaveseldb to -1 in order to force to re-emit
891 * a SELECT statement in the replication stream. */
892 server.slaveseldb = -1;
893
894 /* Slots snapshot. */
895 if (slave->flags & CLIENT_REPL_RDB_CHANNEL &&
896 slave->slave_req & SLAVE_REQ_SLOTS_SNAPSHOT)
897 {
898 /* Start to deliver the commands stream on migrating slots. */
899 asmSlotSnapshotAndStreamStart(slave->task);
900
901 buflen = snprintf(buf, sizeof(buf), "+SLOTSSNAPSHOT\r\n");
902 if (connWrite(slave->conn, buf, buflen) != buflen) {
903 freeClientAsync(slave);
904 return C_ERR;
905 }
906 return C_OK;
907 }
908
909 /* Don't send this reply to slaves that approached us with
910 * the old SYNC command. */
911 if (!(slave->flags & CLIENT_PRE_PSYNC)) {
912 if (slave->flags & CLIENT_REPL_RDB_CHANNEL) {
913 /* This slave is rdbchannel. Find its associated main channel and
914 * change its state so we can deliver replication stream from now
915 * on, in parallel to rdb. */
916 uint64_t id = slave->main_ch_client_id;
917 client *c = lookupClientByID(id);
918 if (c && c->replstate == SLAVE_STATE_WAIT_RDB_CHANNEL) {
919 c->replstate = SLAVE_STATE_SEND_BULK_AND_STREAM;
920 serverLog(LL_NOTICE, "Starting to deliver RDB and replication stream to replica: %s",
921 replicationGetSlaveName(c));
922 } else {
923 serverLog(LL_WARNING, "Starting to deliver RDB to replica %s"
924 " but it has no associated main channel",
925 replicationGetSlaveName(slave));
926 }
927 }
928 buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n",
929 server.replid,offset);
930 if (connWrite(slave->conn,buf,buflen) != buflen) {
931 freeClientAsync(slave);
932 return C_ERR;
933 }
934 }
935 return C_OK;
936}
937
938/* This function handles the PSYNC command from the point of view of a
939 * master receiving a request for partial resynchronization.
940 *
941 * On success return C_OK, otherwise C_ERR is returned and we proceed
942 * with the usual full resync. */
943int masterTryPartialResynchronization(client *c, long long psync_offset) {
944 long long psync_len;
945 char *master_replid = c->argv[1]->ptr;
946 char buf[128];
947 int buflen;
948
949 /* Is the replication ID of this master the same advertised by the wannabe
950 * slave via PSYNC? If the replication ID changed this master has a
951 * different replication history, and there is no way to continue.
952 *
953 * Note that there are two potentially valid replication IDs: the ID1
954 * and the ID2. The ID2 however is only valid up to a specific offset. */
955 if (strcasecmp(master_replid, server.replid) &&
956 (strcasecmp(master_replid, server.replid2) ||
957 psync_offset > server.second_replid_offset))
958 {
959 /* Replid "?" is used by slaves that want to force a full resync. */
960 if (master_replid[0] != '?') {
961 if (strcasecmp(master_replid, server.replid) &&
962 strcasecmp(master_replid, server.replid2))
963 {
964 serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
965 "Replication ID mismatch (Replica asked for '%s', my "
966 "replication IDs are '%s' and '%s')",
967 master_replid, server.replid, server.replid2);
968 } else {
969 serverLog(LL_NOTICE,"Partial resynchronization not accepted: "
970 "Requested offset for second ID was %lld, but I can reply "
971 "up to %lld", psync_offset, server.second_replid_offset);
972 }
973 } else {
974 serverLog(LL_NOTICE,"Full resync requested by replica %s %s",
975 replicationGetSlaveName(c),
976 c->flags & CLIENT_REPL_RDB_CHANNEL ? "(rdb-channel)" : "");
977 }
978 goto need_full_resync;
979 }
980
981 /* We still have the data our slave is asking for? */
982 if (!server.repl_backlog ||
983 psync_offset < server.repl_backlog->offset ||
984 psync_offset > (server.repl_backlog->offset + server.repl_backlog->histlen))
985 {
986 serverLog(LL_NOTICE,
987 "Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld).", replicationGetSlaveName(c), psync_offset);
988 if (psync_offset > server.master_repl_offset) {
989 serverLog(LL_WARNING,
990 "Warning: replica %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c));
991 }
992 goto need_full_resync;
993 }
994
995 /* If we reached this point, we are able to perform a partial resync:
996 * 1) Set client state to make it a slave.
997 * 2) Inform the client we can continue with +CONTINUE
998 * 3) Send the backlog data (from the offset to the end) to the slave. */
999 c->flags |= CLIENT_SLAVE;
1000 c->replstate = SLAVE_STATE_ONLINE;
1001 c->repl_ack_time = server.unixtime;
1002 c->repl_start_cmd_stream_on_ack = 0;
1003 listAddNodeTail(server.slaves,c);
1004 /* We can't use the connection buffers since they are used to accumulate
1005 * new commands at this stage. But we are sure the socket send buffer is
1006 * empty so this write will never fail actually. */
1007 if (c->slave_capa & SLAVE_CAPA_PSYNC2) {
1008 buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid);
1009 } else {
1010 buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
1011 }
1012 if (connWrite(c->conn,buf,buflen) != buflen) {
1013 freeClientAsync(c);
1014 return C_OK;
1015 }
1016 psync_len = addReplyReplicationBacklog(c,psync_offset);
1017 serverLog(LL_NOTICE,
1018 "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.",
1019 replicationGetSlaveName(c),
1020 psync_len, psync_offset);
1021 /* Note that we don't need to set the selected DB at server.slaveseldb
1022 * to -1 to force the master to emit SELECT, since the slave already
1023 * has this state from the previous connection with the master. */
1024
1025 refreshGoodSlavesCount();
1026
1027 /* Fire the replica change modules event. */
1028 moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
1029 REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE,
1030 NULL);
1031
1032 return C_OK; /* The caller can return, no full resync needed. */
1033
1034need_full_resync:
1035 /* We need a full resync for some reason... Note that we can't
1036 * reply to PSYNC right now if a full SYNC is needed. The reply
1037 * must include the master offset at the time the RDB file we transfer
1038 * is generated, so we need to delay the reply to that moment. */
1039 return C_ERR;
1040}
1041
1042/* Start a BGSAVE for replication goals, which is, selecting the disk or
1043 * socket target depending on the configuration, and making sure that
1044 * the script cache is flushed before to start.
1045 *
1046 * The mincapa argument is the bitwise AND among all the slaves capabilities
1047 * of the slaves waiting for this BGSAVE, so represents the slave capabilities
1048 * all the slaves support. Can be tested via SLAVE_CAPA_* macros.
1049 *
1050 * Side effects, other than starting a BGSAVE:
1051 *
1052 * 1) Handle the slaves in WAIT_START state, by preparing them for a full
1053 * sync if the BGSAVE was successfully started, or sending them an error
1054 * and dropping them from the list of slaves.
1055 *
1056 * 2) Flush the Lua scripting script cache if the BGSAVE was actually
1057 * started.
1058 *
1059 * Returns C_OK on success or C_ERR otherwise. */
1060int startBgsaveForReplication(int mincapa, int req) {
1061 int retval;
1062 int socket_target = 0;
1063 listIter li;
1064 listNode *ln;
1065
1066 /* We use a socket target if slave can handle the EOF marker and we're configured to do diskless syncs.
1067 * Note that in case we're creating a "filtered" RDB (functions-only, for example) we also force socket replication
1068 * to avoid overwriting the snapshot RDB file with filtered data. */
1069 socket_target = (server.repl_diskless_sync || req & SLAVE_REQ_RDB_MASK) && (mincapa & SLAVE_CAPA_EOF);
1070 /* `SYNC` should have failed with error if we don't support socket and require a filter, assert this here */
1071 serverAssert(socket_target || !(req & SLAVE_REQ_RDB_MASK));
1072
1073 int slots_req = req & SLAVE_REQ_SLOTS_SNAPSHOT;
1074 serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s%s",
1075 socket_target ? (slots_req ? "slot migration destination socket" : "replicas sockets") : "disk",
1076 (req & SLAVE_REQ_RDB_CHANNEL) ? " (rdb-channel)" : "");
1077
1078 rdbSaveInfo rsi, *rsiptr;
1079 rsiptr = rdbPopulateSaveInfo(&rsi);
1080 /* Only do rdbSave* when rsiptr is not NULL,
1081 * otherwise slave will miss repl-stream-db. */
1082 if (rsiptr) {
1083 if (socket_target)
1084 retval = rdbSaveToSlavesSockets(req,rsiptr);
1085 else {
1086 /* Keep the page cache since it'll get used soon */
1087 retval = rdbSaveBackground(req, server.rdb_filename, rsiptr, RDBFLAGS_REPLICATION | RDBFLAGS_KEEP_CACHE);
1088 }
1089 if (server.repl_debug_pause & REPL_DEBUG_AFTER_FORK)
1090 debugPauseProcess();
1091 } else {
1092 serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. Try later.");
1093 retval = C_ERR;
1094 }
1095
1096 /* If we succeeded to start a BGSAVE with disk target, let's remember
1097 * this fact, so that we can later delete the file if needed. Note
1098 * that we don't set the flag to 1 if the feature is disabled, otherwise
1099 * it would never be cleared: the file is not deleted. This way if
1100 * the user enables it later with CONFIG SET, we are fine. */
1101 if (retval == C_OK && !socket_target && server.rdb_del_sync_files)
1102 RDBGeneratedByReplication = 1;
1103
1104 /* If we failed to BGSAVE, remove the slaves waiting for a full
1105 * resynchronization from the list of slaves, inform them with
1106 * an error about what happened, close the connection ASAP. */
1107 if (retval == C_ERR) {
1108 serverLog(LL_WARNING,"BGSAVE for replication failed");
1109 listRewind(server.slaves,&li);
1110 while((ln = listNext(&li))) {
1111 client *slave = ln->value;
1112
1113 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
1114 slave->replstate = REPL_STATE_NONE;
1115 slave->flags &= ~CLIENT_SLAVE;
1116 listDelNode(server.slaves,ln);
1117 addReplyError(slave,
1118 "BGSAVE failed, replication can't continue");
1119 slave->flags |= CLIENT_CLOSE_AFTER_REPLY;
1120 }
1121 }
1122 return retval;
1123 }
1124
1125 /* If the target is socket, rdbSaveToSlavesSockets() already setup
1126 * the slaves for a full resync. Otherwise for disk target do it now.*/
1127 if (!socket_target) {
1128 listRewind(server.slaves,&li);
1129 while((ln = listNext(&li))) {
1130 client *slave = ln->value;
1131
1132 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
1133 /* Check slave has the exact requirements */
1134 if (slave->slave_req != req)
1135 continue;
1136 replicationSetupSlaveForFullResync(slave, getPsyncInitialOffset());
1137 }
1138 }
1139 }
1140
1141 return retval;
1142}
1143
1144/* SYNC and PSYNC command implementation. */
1145void syncCommand(client *c) {
1146 /* ignore SYNC if already slave or in monitor mode */
1147 if (c->flags & CLIENT_SLAVE) return;
1148
1149 /* Check if this is a failover request to a replica with the same replid and
1150 * become a master if so. */
1151 if (c->argc > 3 && !strcasecmp(c->argv[0]->ptr,"psync") &&
1152 !strcasecmp(c->argv[3]->ptr,"failover"))
1153 {
1154 serverLog(LL_NOTICE, "Failover request received for replid %s.",
1155 (unsigned char *)c->argv[1]->ptr);
1156 if (!server.masterhost) {
1157 addReplyError(c, "PSYNC FAILOVER can't be sent to a master.");
1158 return;
1159 }
1160
1161 if (!strcasecmp(c->argv[1]->ptr,server.replid)) {
1162 if (server.cluster_enabled) {
1163 clusterPromoteSelfToMaster();
1164 } else {
1165 replicationUnsetMaster();
1166 }
1167 sds client = catClientInfoString(sdsempty(),c);
1168 serverLog(LL_NOTICE,
1169 "MASTER MODE enabled (failover request from '%s')",client);
1170 sdsfree(client);
1171 } else {
1172 addReplyError(c, "PSYNC FAILOVER replid must match my replid.");
1173 return;
1174 }
1175 }
1176
1177 /* Don't let replicas sync with us while we're failing over */
1178 if (server.failover_state != NO_FAILOVER) {
1179 addReplyError(c,"-NOMASTERLINK Can't SYNC while failing over");
1180 return;
1181 }
1182
1183 /* Refuse SYNC requests if we are a slave but the link with our master
1184 * is not ok... */
1185 if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
1186 addReplyError(c,"-NOMASTERLINK Can't SYNC while not connected with my master");
1187 return;
1188 }
1189
1190 /* SYNC can't be issued when the server has pending data to send to
1191 * the client about already issued commands. We need a fresh reply
1192 * buffer registering the differences between the BGSAVE and the current
1193 * dataset, so that we can copy to other slaves if needed. */
1194 if (clientHasPendingReplies(c)) {
1195 addReplyError(c,"SYNC and PSYNC are invalid with pending output");
1196 return;
1197 }
1198
1199 /* Fail sync if slave doesn't support EOF capability but wants a filtered RDB. This is because we force filtered
1200 * RDB's to be generated over a socket and not through a file to avoid conflicts with the snapshot files. Forcing
1201 * use of a socket is handled, if needed, in `startBgsaveForReplication`. */
1202 if (c->slave_req & SLAVE_REQ_RDB_MASK && !(c->slave_capa & SLAVE_CAPA_EOF)) {
1203 addReplyError(c,"Filtered replica requires EOF capability");
1204 return;
1205 }
1206
1207 serverLog(LL_NOTICE,"Replica %s asks for synchronization",
1208 replicationGetSlaveName(c));
1209
1210 /* Try a partial resynchronization if this is a PSYNC command.
1211 * If it fails, we continue with usual full resynchronization, however
1212 * when this happens replicationSetupSlaveForFullResync will replied
1213 * with:
1214 *
1215 * +FULLRESYNC <replid> <offset>
1216 *
1217 * So the slave knows the new replid and offset to try a PSYNC later
1218 * if the connection with the master is lost. */
1219 if (!strcasecmp(c->argv[0]->ptr,"psync")) {
1220 long long psync_offset;
1221 if (getLongLongFromObjectOrReply(c, c->argv[2], &psync_offset, NULL) != C_OK) {
1222 serverLog(LL_WARNING, "Replica %s asks for synchronization but with a wrong offset",
1223 replicationGetSlaveName(c));
1224 return;
1225 }
1226
1227 if (masterTryPartialResynchronization(c, psync_offset) == C_OK) {
1228 server.stat_sync_partial_ok++;
1229 return; /* No full resync needed, return. */
1230 } else {
1231 char *master_replid = c->argv[1]->ptr;
1232
1233 /* Increment stats for failed PSYNCs, but only if the
1234 * replid is not "?", as this is used by slaves to force a full
1235 * resync on purpose when they are not able to partially
1236 * resync. */
1237 if (master_replid[0] != '?') server.stat_sync_partial_err++;
1238 if (c->slave_capa & SLAVE_CAPA_RDB_CHANNEL_REPL) {
1239 int len;
1240 char buf[128];
1241 /* Replica is capable of rdbchannel replication. This is
1242 * replica's main channel. Let replica know full sync is needed.
1243 * Replica will open another connection (rdbchannel). Once rdb
1244 * delivery starts, we'll stream repl data to the main channel.*/
1245 c->flags |= CLIENT_SLAVE;
1246 c->replstate = SLAVE_STATE_WAIT_RDB_CHANNEL;
1247 c->repl_ack_time = server.unixtime;
1248 listAddNodeTail(server.slaves, c);
1249 createReplicationBacklogIfNeeded();
1250
1251 serverLog(LL_NOTICE,
1252 "Replica %s is capable of rdb channel synchronization, and partial sync isn't possible. "
1253 "Full sync will continue with dedicated rdb channel.",
1254 replicationGetSlaveName(c));
1255
1256 /* Send +RDBCHANNELSYNC with client id so we can associate replica connections on master.*/
1257 len = snprintf(buf, sizeof(buf), "+RDBCHANNELSYNC %llu\r\n",
1258 (unsigned long long) c->id);
1259 if (connWrite(c->conn, buf, strlen(buf)) != len)
1260 freeClientAsync(c);
1261
1262 return;
1263 }
1264 }
1265 } else {
1266 /* If a slave uses SYNC, we are dealing with an old implementation
1267 * of the replication protocol (like redis-cli --slave). Flag the client
1268 * so that we don't expect to receive REPLCONF ACK feedbacks. */
1269 c->flags |= CLIENT_PRE_PSYNC;
1270 }
1271
1272 /* Full resynchronization. */
1273 server.stat_sync_full++;
1274
1275 /* Setup the slave as one waiting for BGSAVE to start. The following code
1276 * paths will change the state if we handle the slave differently. */
1277 c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
1278 if (server.repl_disable_tcp_nodelay)
1279 connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */
1280 c->repldbfd = -1;
1281 c->flags |= CLIENT_SLAVE;
1282 listAddNodeTail(server.slaves,c);
1283
1284 /* Create the replication backlog if needed. */
1285 createReplicationBacklogIfNeeded();
1286
1287 /* Keep the client in the main thread to avoid data races between the
1288 * connWrite call in startBgsaveForReplication and the client's event
1289 * handler in IO threads. */
1290 if (c->tid != IOTHREAD_MAIN_THREAD_ID) keepClientInMainThread(c);
1291
1292 /* CASE 1: BGSAVE is in progress, with disk target. */
1293 if (server.child_type == CHILD_TYPE_RDB &&
1294 server.rdb_child_type == RDB_CHILD_TYPE_DISK)
1295 {
1296 /* Ok a background save is in progress. Let's check if it is a good
1297 * one for replication, i.e. if there is another slave that is
1298 * registering differences since the server forked to save. */
1299 client *slave;
1300 listNode *ln;
1301 listIter li;
1302
1303 listRewind(server.slaves,&li);
1304 while((ln = listNext(&li))) {
1305 slave = ln->value;
1306 /* If the client needs a buffer of commands, we can't use
1307 * a replica without replication buffer. */
1308 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
1309 (!(slave->flags & CLIENT_REPL_RDBONLY) ||
1310 (c->flags & CLIENT_REPL_RDBONLY)))
1311 break;
1312 }
1313 /* To attach this slave, we check that it has at least all the
1314 * capabilities of the slave that triggered the current BGSAVE
1315 * and its exact requirements. */
1316 if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa) &&
1317 c->slave_req == slave->slave_req) {
1318 /* Perfect, the server is already registering differences for
1319 * another slave. Set the right state, and copy the buffer.
1320 * We don't copy buffer if clients don't want. */
1321 if (!(c->flags & CLIENT_REPL_RDBONLY))
1322 copyReplicaOutputBuffer(c,slave);
1323 replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
1324 serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
1325 } else {
1326 /* No way, we need to wait for the next BGSAVE in order to
1327 * register differences. */
1328 serverLog(LL_NOTICE,"Can't attach the replica to the current BGSAVE. Waiting for next BGSAVE for SYNC");
1329 }
1330
1331 /* CASE 2: BGSAVE is in progress, with socket target. */
1332 } else if (server.child_type == CHILD_TYPE_RDB &&
1333 server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
1334 {
1335 /* There is an RDB child process but it is writing directly to
1336 * children sockets. We need to wait for the next BGSAVE
1337 * in order to synchronize. */
1338 serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");
1339
1340 /* CASE 3: There is no BGSAVE is in progress. */
1341 } else {
1342 if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF) &&
1343 server.repl_diskless_sync_delay)
1344 {
1345 /* Diskless replication RDB child is created inside
1346 * replicationCron() since we want to delay its start a
1347 * few seconds to wait for more slaves to arrive. */
1348 serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
1349 } else {
1350 /* We don't have a BGSAVE in progress, let's start one. Diskless
1351 * or disk-based mode is determined by replica's capacity. */
1352 if (!hasActiveChildProcess()) {
1353 startBgsaveForReplication(c->slave_capa, c->slave_req);
1354 } else {
1355 serverLog(LL_NOTICE,
1356 "No BGSAVE in progress, but another BG operation is active. "
1357 "BGSAVE for replication delayed");
1358 }
1359 }
1360 }
1361 return;
1362}
1363
1364/* REPLCONF <option> <value> <option> <value> ...
1365 * This command is used by a replica in order to configure the replication
1366 * process before starting it with the SYNC command.
1367 * This command is also used by a master in order to get the replication
1368 * offset from a replica.
1369 *
1370 * Currently we support these options:
1371 *
1372 * - listening-port <port>
1373 * - ip-address <ip>
1374 * What is the listening ip and port of the Replica redis instance, so that
1375 * the master can accurately lists replicas and their listening ports in the
1376 * INFO output.
1377 *
1378 * - capa <eof|psync2|rdb-channel-repl>
1379 * What is the capabilities of this instance.
1380 * eof: supports EOF-style RDB transfer for diskless replication.
1381 * psync2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
1382 *
1383 * - ack <offset> [fack <aofofs>]
1384 * Replica informs the master the amount of replication stream that it
1385 * processed so far, and optionally the replication offset fsynced to the AOF file.
1386 * This special pattern doesn't reply to the caller.
1387 *
1388 * - getack <dummy>
1389 * Unlike other subcommands, this is used by master to get the replication
1390 * offset from a replica.
1391 *
1392 * - rdb-only <0|1>
1393 * Only wants RDB snapshot without replication buffer.
1394 *
1395 * - rdb-filter-only <include-filters>
1396 * Define "include" filters for the RDB snapshot. Currently we only support
1397 * a single include filter: "functions". Passing an empty string "" will
1398 * result in an empty RDB.
1399 *
1400 * - main-ch-client-id <client-id>
1401 * Replica's main channel informs master that this is the main channel of the
1402 * rdb channel identified by the client-id. */
1403void replconfCommand(client *c) {
1404 int j;
1405
1406 if ((c->argc % 2) == 0) {
1407 /* Number of arguments must be odd to make sure that every
1408 * option has a corresponding value. */
1409 addReplyErrorObject(c,shared.syntaxerr);
1410 return;
1411 }
1412
1413 /* Process every option-value pair. */
1414 for (j = 1; j < c->argc; j+=2) {
1415 if (!strcasecmp(c->argv[j]->ptr,"listening-port")) {
1416 long port;
1417
1418 if ((getLongFromObjectOrReply(c,c->argv[j+1],
1419 &port,NULL) != C_OK))
1420 return;
1421 c->slave_listening_port = port;
1422 } else if (!strcasecmp(c->argv[j]->ptr,"ip-address")) {
1423 sds addr = c->argv[j+1]->ptr;
1424 if (sdslen(addr) < NET_HOST_STR_LEN) {
1425 if (c->slave_addr) sdsfree(c->slave_addr);
1426 c->slave_addr = sdsdup(addr);
1427 } else {
1428 addReplyErrorFormat(c,"REPLCONF ip-address provided by "
1429 "replica instance is too long: %zd bytes", sdslen(addr));
1430 return;
1431 }
1432 } else if (!strcasecmp(c->argv[j]->ptr,"capa")) {
1433 /* Ignore capabilities not understood by this master. */
1434 if (!strcasecmp(c->argv[j+1]->ptr,"eof"))
1435 c->slave_capa |= SLAVE_CAPA_EOF;
1436 else if (!strcasecmp(c->argv[j+1]->ptr,"psync2"))
1437 c->slave_capa |= SLAVE_CAPA_PSYNC2;
1438 else if (!strcasecmp(c->argv[j+1]->ptr,"rdb-channel-repl") && server.repl_rdb_channel &&
1439 server.repl_diskless_sync) {
1440 c->slave_capa |= SLAVE_CAPA_RDB_CHANNEL_REPL;
1441 }
1442 } else if (!strcasecmp(c->argv[j]->ptr,"ack")) {
1443 /* REPLCONF ACK is used by slave to inform the master the amount
1444 * of replication stream that it processed so far. It is an
1445 * internal only command that normal clients should never use. */
1446 long long offset;
1447
1448 if (!(c->flags & CLIENT_SLAVE)) return;
1449 if ((getLongLongFromObject(c->argv[j+1], &offset) != C_OK))
1450 return;
1451 if (offset > c->repl_ack_off)
1452 c->repl_ack_off = offset;
1453 if (c->argc > j+3 && !strcasecmp(c->argv[j+2]->ptr,"fack")) {
1454 if ((getLongLongFromObject(c->argv[j+3], &offset) != C_OK))
1455 return;
1456 if (offset > c->repl_aof_off)
1457 c->repl_aof_off = offset;
1458 }
1459 c->repl_ack_time = server.unixtime;
1460 /* If this was a diskless replication, we need to really put
1461 * the slave online when the first ACK is received (which
1462 * confirms slave is online and ready to get more data). This
1463 * allows for simpler and less CPU intensive EOF detection
1464 * when streaming RDB files.
1465 * There's a chance the ACK got to us before we detected that the
1466 * bgsave is done (since that depends on cron ticks), so run a
1467 * quick check first (instead of waiting for the next ACK. */
1468 if (server.child_type == CHILD_TYPE_RDB && c->replstate == SLAVE_STATE_WAIT_BGSAVE_END)
1469 checkChildrenDone();
1470 if (c->repl_start_cmd_stream_on_ack && c->replstate == SLAVE_STATE_ONLINE)
1471 replicaStartCommandStream(c);
1472 /* If state is send_bulk_and_stream, it means this is the main
1473 * channel of the slave in rdbchannel replication. Normally, slave
1474 * will be put online after rdb fork is completed. There is chance
1475 * that 'ack' might be received before we detect bgsave is done. */
1476 if (c->replstate == SLAVE_STATE_SEND_BULK_AND_STREAM)
1477 replicaPutOnline(c);
1478 /* Note: this command does not reply anything! */
1479 return;
1480 } else if (!strcasecmp(c->argv[j]->ptr,"getack")) {
1481 /* REPLCONF GETACK is used in order to request an ACK ASAP
1482 * to the slave. */
1483 if (server.masterhost && server.master) replicationSendAck();
1484 return;
1485 } else if (!strcasecmp(c->argv[j]->ptr,"rdb-only")) {
1486 /* REPLCONF RDB-ONLY is used to identify the client only wants
1487 * RDB snapshot without replication buffer. */
1488 long rdb_only = 0;
1489 if (getRangeLongFromObjectOrReply(c,c->argv[j+1],
1490 0,1,&rdb_only,NULL) != C_OK)
1491 return;
1492 if (rdb_only == 1) {
1493 c->flags |= CLIENT_REPL_RDBONLY;
1494 /* If replicas ask for RDB only, We can apply the background
1495 * RDB transfer optimization based on the configurations. */
1496 if (server.repl_rdb_channel && server.repl_diskless_sync)
1497 c->slave_req |= SLAVE_REQ_RDB_CHANNEL;
1498 } else {
1499 c->flags &= ~CLIENT_REPL_RDBONLY;
1500 c->slave_req &= ~SLAVE_REQ_RDB_CHANNEL;
1501 }
1502 } else if (!strcasecmp(c->argv[j]->ptr,"rdb-filter-only")) {
1503 /* REPLCONFG RDB-FILTER-ONLY is used to define "include" filters
1504 * for the RDB snapshot. Currently we only support a single
1505 * include filter: "functions". In the future we may want to add
1506 * other filters like key patterns, key types, non-volatile, module
1507 * aux fields, ...
1508 * We might want to add the complementing "RDB-FILTER-EXCLUDE" to
1509 * filter out certain data. */
1510 int filter_count, i;
1511 sds *filters;
1512 if (!(filters = sdssplitargs(c->argv[j+1]->ptr, &filter_count))) {
1513 addReplyError(c, "Missing rdb-filter-only values");
1514 return;
1515 }
1516 /* By default filter out all parts of the rdb */
1517 c->slave_req |= SLAVE_REQ_RDB_EXCLUDE_DATA;
1518 c->slave_req |= SLAVE_REQ_RDB_EXCLUDE_FUNCTIONS;
1519 for (i = 0; i < filter_count; i++) {
1520 if (!strcasecmp(filters[i], "functions"))
1521 c->slave_req &= ~SLAVE_REQ_RDB_EXCLUDE_FUNCTIONS;
1522 else {
1523 addReplyErrorFormat(c, "Unsupported rdb-filter-only option: %s", (char*)filters[i]);
1524 sdsfreesplitres(filters, filter_count);
1525 return;
1526 }
1527 }
1528 sdsfreesplitres(filters, filter_count);
1529 } else if (!strcasecmp(c->argv[j]->ptr, "rdb-channel")) {
1530 long rdb_channel = 0;
1531 if (getRangeLongFromObjectOrReply(c, c->argv[j + 1], 0, 1, &rdb_channel, NULL) != C_OK)
1532 return;
1533 if (rdb_channel == 1) {
1534 c->flags |= CLIENT_REPL_RDB_CHANNEL;
1535 } else {
1536 c->flags &= ~CLIENT_REPL_RDB_CHANNEL;
1537 }
1538 } else if (!strcasecmp(c->argv[j]->ptr, "main-ch-client-id")) {
1539 /* REPLCONF main-ch-client-id <client-id> is used to identify
1540 * the current replica rdb channel with existing main channel
1541 * connection. */
1542 long long client_id = 0;
1543 client *main_ch;
1544 if (getLongLongFromObjectOrReply(c, c->argv[j + 1], &client_id, NULL) != C_OK)
1545 return;
1546 main_ch = lookupClientByID(client_id);
1547 if (!main_ch || main_ch->replstate != SLAVE_STATE_WAIT_RDB_CHANNEL) {
1548 addReplyErrorFormat(c, "Unrecognized RDB client id: %lld", client_id);
1549 return;
1550 }
1551 c->main_ch_client_id = (uint64_t)client_id;
1552 /* Inherit the rdb-no-compress request from the main channel. */
1553 if (main_ch->slave_req & SLAVE_REQ_RDB_NO_COMPRESS)
1554 c->slave_req |= SLAVE_REQ_RDB_NO_COMPRESS;
1555 } else if (!strcasecmp(c->argv[j]->ptr, "rdb-no-compress")) {
1556 long rdb_no_compress = 0;
1557 if (getRangeLongFromObjectOrReply(c, c->argv[j + 1], 0, 1, &rdb_no_compress, NULL) != C_OK)
1558 return;
1559 if (rdb_no_compress == 1) {
1560 c->slave_req |= SLAVE_REQ_RDB_NO_COMPRESS;
1561 } else {
1562 c->slave_req &= ~SLAVE_REQ_RDB_NO_COMPRESS;
1563 }
1564 } else {
1565 addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
1566 (char*)c->argv[j]->ptr);
1567 return;
1568 }
1569 }
1570 addReply(c,shared.ok);
1571}
1572
1573/* This function puts a replica in the online state, and should be called just
1574 * after a replica received the RDB file for the initial synchronization.
1575 *
1576 * It does a few things:
1577 * 1) Put the slave in ONLINE state.
1578 * 2) Update the count of "good replicas".
1579 * 3) Trigger the module event.
1580 *
1581 * the return value indicates that the replica should be disconnected.
1582 * */
1583int replicaPutOnline(client *slave) {
1584 if (slave->flags & CLIENT_REPL_RDBONLY) {
1585 slave->replstate = SLAVE_STATE_RDB_TRANSMITTED;
1586 /* The client asked for RDB only so we should close it ASAP */
1587 serverLog(LL_NOTICE,
1588 "RDB transfer completed, rdb only replica (%s) should be disconnected asap",
1589 replicationGetSlaveName(slave));
1590 return 0;
1591 }
1592
1593 /* Don't put migration destination client online. */
1594 if (slave->flags & CLIENT_ASM_MIGRATING) return 0;
1595
1596 slave->replstate = SLAVE_STATE_ONLINE;
1597 slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */
1598
1599 refreshGoodSlavesCount();
1600 /* Fire the replica change modules event. */
1601 moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE,
1602 REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE,
1603 NULL);
1604 serverLog(LL_NOTICE,"Synchronization with replica %s succeeded",
1605 replicationGetSlaveName(slave));
1606 return 1;
1607}
1608
1609/* This function should be called just after a replica received the RDB file
1610 * for the initial synchronization, and we are finally ready to send the
1611 * incremental stream of commands.
1612 *
1613 * It does a few things:
1614 * 1) Close the replica's connection async if it doesn't need replication
1615 * commands buffer stream, since it actually isn't a valid replica.
1616 * 2) Make sure the writable event is re-installed, since when calling the SYNC
1617 * command we had no replies and it was disabled, and then we could
1618 * accumulate output buffer data without sending it to the replica so it
1619 * won't get mixed with the RDB stream. */
1620void replicaStartCommandStream(client *slave) {
1621 serverAssert(!(slave->flags & CLIENT_REPL_RDBONLY));
1622 slave->repl_start_cmd_stream_on_ack = 0;
1623
1624 putClientInPendingWriteQueue(slave);
1625}
1626
1627/* We call this function periodically to remove an RDB file that was
1628 * generated because of replication, in an instance that is otherwise
1629 * without any persistence. We don't want instances without persistence
1630 * to take RDB files around, this violates certain policies in certain
1631 * environments. */
1632void removeRDBUsedToSyncReplicas(void) {
1633 /* If the feature is disabled, return ASAP but also clear the
1634 * RDBGeneratedByReplication flag in case it was set. Otherwise if the
1635 * feature was enabled, but gets disabled later with CONFIG SET, the
1636 * flag may remain set to one: then next time the feature is re-enabled
1637 * via CONFIG SET we have it set even if no RDB was generated
1638 * because of replication recently. */
1639 if (!server.rdb_del_sync_files) {
1640 RDBGeneratedByReplication = 0;
1641 return;
1642 }
1643
1644 if (allPersistenceDisabled() && RDBGeneratedByReplication) {
1645 client *slave;
1646 listNode *ln;
1647 listIter li;
1648
1649 int delrdb = 1;
1650 listRewind(server.slaves,&li);
1651 while((ln = listNext(&li))) {
1652 slave = ln->value;
1653 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
1654 slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END ||
1655 slave->replstate == SLAVE_STATE_SEND_BULK)
1656 {
1657 delrdb = 0;
1658 break; /* No need to check the other replicas. */
1659 }
1660 }
1661 if (delrdb) {
1662 struct stat sb;
1663 if (lstat(server.rdb_filename,&sb) != -1) {
1664 RDBGeneratedByReplication = 0;
1665 serverLog(LL_NOTICE,
1666 "Removing the RDB file used to feed replicas "
1667 "in a persistence-less instance");
1668 bg_unlink(server.rdb_filename);
1669 }
1670 }
1671 }
1672}
1673
1674/* Close the repldbfd and reclaim the page cache if the client hold
1675 * the last reference to replication DB */
1676void closeRepldbfd(client *myself) {
1677 listNode *ln;
1678 listIter li;
1679 int reclaim = 1;
1680 listRewind(server.slaves,&li);
1681 while((ln = listNext(&li))) {
1682 client *slave = ln->value;
1683 if (slave != myself && slave->replstate == SLAVE_STATE_SEND_BULK) {
1684 reclaim = 0;
1685 break;
1686 }
1687 }
1688
1689 if (reclaim) {
1690 bioCreateCloseJob(myself->repldbfd, 0, 1);
1691 } else {
1692 close(myself->repldbfd);
1693 }
1694 myself->repldbfd = -1;
1695}
1696
1697void sendBulkToSlave(connection *conn) {
1698 client *slave = connGetPrivateData(conn);
1699 char buf[PROTO_IOBUF_LEN];
1700 ssize_t nwritten, buflen;
1701
1702 /* Before sending the RDB file, we send the preamble as configured by the
1703 * replication process. Currently the preamble is just the bulk count of
1704 * the file in the form "$<length>\r\n". */
1705 if (slave->replpreamble) {
1706 nwritten = connWrite(conn,slave->replpreamble,sdslen(slave->replpreamble));
1707 if (nwritten == -1) {
1708 serverLog(LL_WARNING,
1709 "Write error sending RDB preamble to replica: %s",
1710 connGetLastError(conn));
1711 freeClient(slave);
1712 return;
1713 }
1714 atomicIncr(server.stat_net_repl_output_bytes, nwritten);
1715 sdsrange(slave->replpreamble,nwritten,-1);
1716 if (sdslen(slave->replpreamble) == 0) {
1717 sdsfree(slave->replpreamble);
1718 slave->replpreamble = NULL;
1719 /* fall through sending data. */
1720 } else {
1721 return;
1722 }
1723 }
1724
1725 /* If the preamble was already transferred, send the RDB bulk data. */
1726 if (lseek(slave->repldbfd,slave->repldboff,SEEK_SET) == -1) {
1727 serverLog(LL_WARNING,"Failed to lseek the RDB file to offset %lld for replica %s: %s",
1728 (long long)slave->repldboff, replicationGetSlaveName(slave), strerror(errno));
1729 freeClient(slave);
1730 return;
1731 }
1732 buflen = read(slave->repldbfd,buf,PROTO_IOBUF_LEN);
1733 if (buflen <= 0) {
1734 serverLog(LL_WARNING,"Read error sending DB to replica: %s",
1735 (buflen == 0) ? "premature EOF" : strerror(errno));
1736 freeClient(slave);
1737 return;
1738 }
1739 if ((nwritten = connWrite(conn,buf,buflen)) == -1) {
1740 if (connGetState(conn) != CONN_STATE_CONNECTED) {
1741 serverLog(LL_WARNING,"Write error sending DB to replica: %s",
1742 connGetLastError(conn));
1743 freeClient(slave);
1744 }
1745 return;
1746 }
1747 slave->repldboff += nwritten;
1748 atomicIncr(server.stat_net_repl_output_bytes, nwritten);
1749 if (slave->repldboff == slave->repldbsize) {
1750 closeRepldbfd(slave);
1751 connSetWriteHandler(slave->conn,NULL);
1752 if (!replicaPutOnline(slave)) {
1753 freeClient(slave);
1754 return;
1755 }
1756 replicaStartCommandStream(slave);
1757 }
1758}
1759
1760/* Remove one write handler from the list of connections waiting to be writable
1761 * during rdb pipe transfer. */
1762void rdbPipeWriteHandlerConnRemoved(struct connection *conn) {
1763 if (!connHasWriteHandler(conn))
1764 return;
1765 connSetWriteHandler(conn, NULL);
1766 client *slave = connGetPrivateData(conn);
1767 slave->repl_last_partial_write = 0;
1768 server.rdb_pipe_numconns_writing--;
1769 /* if there are no more writes for now for this conn, or write error: */
1770 if (server.rdb_pipe_numconns_writing == 0) {
1771 if (aeCreateFileEvent(server.el, server.rdb_pipe_read, AE_READABLE, rdbPipeReadHandler,NULL) == AE_ERR) {
1772 serverPanic("Unrecoverable error creating server.rdb_pipe_read file event.");
1773 }
1774 }
1775}
1776
1777/* Called in diskless master during transfer of data from the rdb pipe, when
1778 * the replica becomes writable again. */
1779void rdbPipeWriteHandler(struct connection *conn) {
1780 serverAssert(server.rdb_pipe_bufflen>0);
1781 client *slave = connGetPrivateData(conn);
1782 ssize_t nwritten;
1783 if ((nwritten = connWrite(conn, server.rdb_pipe_buff + slave->repldboff,
1784 server.rdb_pipe_bufflen - slave->repldboff)) == -1)
1785 {
1786 if (connGetState(conn) == CONN_STATE_CONNECTED)
1787 return; /* equivalent to EAGAIN */
1788 serverLog(LL_WARNING,"Write error sending DB to replica: %s",
1789 connGetLastError(conn));
1790 freeClient(slave);
1791 return;
1792 } else {
1793 slave->repldboff += nwritten;
1794 atomicIncr(server.stat_net_repl_output_bytes, nwritten);
1795 if (slave->repldboff < server.rdb_pipe_bufflen) {
1796 slave->repl_last_partial_write = server.unixtime;
1797 return; /* more data to write.. */
1798 }
1799 }
1800 rdbPipeWriteHandlerConnRemoved(conn);
1801}
1802
1803/* Called in diskless master, when there's data to read from the child's rdb pipe */
1804void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask) {
1805 UNUSED(mask);
1806 UNUSED(clientData);
1807 UNUSED(eventLoop);
1808 int i;
1809 if (!server.rdb_pipe_buff)
1810 server.rdb_pipe_buff = zmalloc(PROTO_IOBUF_LEN);
1811 serverAssert(server.rdb_pipe_numconns_writing==0);
1812
1813 while (1) {
1814 server.rdb_pipe_bufflen = read(fd, server.rdb_pipe_buff, PROTO_IOBUF_LEN);
1815 if (server.rdb_pipe_bufflen < 0) {
1816 if (errno == EAGAIN || errno == EWOULDBLOCK)
1817 return;
1818 serverLog(LL_WARNING,"Diskless rdb transfer, read error sending DB to replicas: %s", strerror(errno));
1819 for (i=0; i < server.rdb_pipe_numconns; i++) {
1820 connection *conn = server.rdb_pipe_conns[i];
1821 if (!conn)
1822 continue;
1823 client *slave = connGetPrivateData(conn);
1824 freeClient(slave);
1825 server.rdb_pipe_conns[i] = NULL;
1826 }
1827 killRDBChild();
1828 return;
1829 }
1830
1831 if (server.rdb_pipe_bufflen == 0) {
1832 /* EOF - write end was closed. */
1833 int stillUp = 0;
1834 aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
1835 for (i=0; i < server.rdb_pipe_numconns; i++)
1836 {
1837 connection *conn = server.rdb_pipe_conns[i];
1838 if (!conn)
1839 continue;
1840 stillUp++;
1841 }
1842 serverLog(LL_NOTICE,"Diskless rdb transfer, done reading from pipe, %d replicas still up.", stillUp);
1843 /* Now that the replicas have finished reading, notify the child that it's safe to exit.
1844 * When the server detects the child has exited, it can mark the replica as online, and
1845 * start streaming the replication buffers. */
1846 close(server.rdb_child_exit_pipe);
1847 server.rdb_child_exit_pipe = -1;
1848 return;
1849 }
1850
1851 int stillAlive = 0;
1852 for (i=0; i < server.rdb_pipe_numconns; i++)
1853 {
1854 ssize_t nwritten;
1855 connection *conn = server.rdb_pipe_conns[i];
1856 if (!conn)
1857 continue;
1858
1859 client *slave = connGetPrivateData(conn);
1860 if ((nwritten = connWrite(conn, server.rdb_pipe_buff, server.rdb_pipe_bufflen)) == -1) {
1861 if (connGetState(conn) != CONN_STATE_CONNECTED) {
1862 serverLog(LL_WARNING,"Diskless rdb transfer, write error sending DB to replica: %s",
1863 connGetLastError(conn));
1864 freeClient(slave);
1865 server.rdb_pipe_conns[i] = NULL;
1866 continue;
1867 }
1868 /* An error and still in connected state, is equivalent to EAGAIN */
1869 slave->repldboff = 0;
1870 } else {
1871 /* Note: when use diskless replication, 'repldboff' is the offset
1872 * of 'rdb_pipe_buff' sent rather than the offset of entire RDB. */
1873 slave->repldboff = nwritten;
1874 atomicIncr(server.stat_net_repl_output_bytes, nwritten);
1875 }
1876 /* If we were unable to write all the data to one of the replicas,
1877 * setup write handler (and disable pipe read handler, below) */
1878 if (nwritten != server.rdb_pipe_bufflen) {
1879 slave->repl_last_partial_write = server.unixtime;
1880 server.rdb_pipe_numconns_writing++;
1881 connSetWriteHandler(conn, rdbPipeWriteHandler);
1882 }
1883 stillAlive++;
1884 }
1885
1886 if (stillAlive == 0) {
1887 serverLog(LL_WARNING,"Diskless rdb transfer, last replica dropped, killing fork child.");
1888 /* Avoid deleting events after killRDBChild as it may trigger new bgsaves for other replicas. */
1889 aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
1890 killRDBChild();
1891 break;
1892 }
1893 /* Remove the pipe read handler if at least one write handler was set. */
1894 else if (server.rdb_pipe_numconns_writing) {
1895 aeDeleteFileEvent(server.el, server.rdb_pipe_read, AE_READABLE);
1896 break;
1897 }
1898 }
1899}
1900
1901/* This function is called at the end of every background saving.
1902 *
1903 * The argument bgsaveerr is C_OK if the background saving succeeded
1904 * otherwise C_ERR is passed to the function.
1905 * The 'type' argument is the type of the child that terminated
1906 * (if it had a disk or socket target). */
1907void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
1908 listNode *ln;
1909 listIter li;
1910
1911 /* Note: there's a chance we got here from within the REPLCONF ACK command
1912 * so we must avoid using freeClient, otherwise we'll crash on our way up. */
1913
1914 listRewind(server.slaves,&li);
1915 while((ln = listNext(&li))) {
1916 client *slave = ln->value;
1917
1918 /* We can get here via freeClient()->killRDBChild()->checkChildrenDone(). skip disconnected slaves. */
1919 if (!slave->conn) continue;
1920
1921 if (slave->replstate == SLAVE_STATE_SEND_BULK_AND_STREAM) {
1922 /* This is the main channel of the slave that received the RDB.
1923 * Put it online if RDB delivery is successful. */
1924 if (bgsaveerr == C_OK) {
1925 /* Notify the task that the snapshot bulk delivery is done */
1926 if (slave->flags & CLIENT_ASM_MIGRATING)
1927 asmSlotSnapshotSucceed(slave->task);
1928 replicaPutOnline(slave);
1929 } else {
1930 freeClientAsync(slave);
1931 }
1932 } else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
1933 struct redis_stat buf;
1934
1935 if (bgsaveerr != C_OK) {
1936 /* Notify the task that the snapshot bulk delivery failed */
1937 if (slave->flags & CLIENT_ASM_MIGRATING)
1938 asmSlotSnapshotFailed(slave->task);
1939 freeClientAsync(slave);
1940 serverLog(LL_WARNING,"SYNC failed. BGSAVE child returned an error");
1941 continue;
1942 }
1943
1944 /* If this was an RDB on disk save, we have to prepare to send
1945 * the RDB from disk to the slave socket. Otherwise if this was
1946 * already an RDB -> Slaves socket transfer, used in the case of
1947 * diskless replication, our work is trivial, we can just put
1948 * the slave online. */
1949 if (type == RDB_CHILD_TYPE_SOCKET) {
1950 /* Slots snapshot */
1951 if (slave->slave_req & SLAVE_REQ_SLOTS_SNAPSHOT) {
1952 serverLog(LL_NOTICE, "Streamed slots snapshot transfer succeeded");
1953 freeClientAsync(slave);
1954 continue;
1955 }
1956
1957 serverLog(LL_NOTICE,
1958 "Streamed RDB transfer with replica %s succeeded (socket). Waiting for REPLCONF ACK from replica to enable streaming",
1959 replicationGetSlaveName(slave));
1960 /* Note: we wait for a REPLCONF ACK message from the replica in
1961 * order to really put it online (install the write handler
1962 * so that the accumulated data can be transferred). However
1963 * we change the replication state ASAP, since our slave
1964 * is technically online now.
1965 *
1966 * So things work like that:
1967 *
1968 * 1. We end transferring the RDB file via socket.
1969 * 2. The replica is put ONLINE but the write handler
1970 * is not installed.
1971 * 3. The replica however goes really online, and pings us
1972 * back via REPLCONF ACK commands.
1973 * 4. Now we finally install the write handler, and send
1974 * the buffers accumulated so far to the replica.
1975 *
1976 * But why we do that? Because the replica, when we stream
1977 * the RDB directly via the socket, must detect the RDB
1978 * EOF (end of file), that is a special random string at the
1979 * end of the RDB (for streamed RDBs we don't know the length
1980 * in advance). Detecting such final EOF string is much
1981 * simpler and less CPU intensive if no more data is sent
1982 * after such final EOF. So we don't want to glue the end of
1983 * the RDB transfer with the start of the other replication
1984 * data. */
1985 if (!replicaPutOnline(slave)) {
1986 freeClientAsync(slave);
1987 continue;
1988 }
1989 slave->repl_start_cmd_stream_on_ack = 1;
1990 } else {
1991 if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
1992 redis_fstat(slave->repldbfd,&buf) == -1) {
1993 freeClientAsync(slave);
1994 serverLog(LL_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
1995 continue;
1996 }
1997 slave->repldboff = 0;
1998 slave->repldbsize = buf.st_size;
1999 slave->replstate = SLAVE_STATE_SEND_BULK;
2000 slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n",
2001 (unsigned long long) slave->repldbsize);
2002
2003 connSetWriteHandler(slave->conn,NULL);
2004 if (connSetWriteHandler(slave->conn,sendBulkToSlave) == C_ERR) {
2005 freeClientAsync(slave);
2006 continue;
2007 }
2008 }
2009 }
2010 }
2011}
2012
2013/* Change the current instance replication ID with a new, random one.
2014 * This will prevent successful PSYNCs between this master and other
2015 * slaves, so the command should be called when something happens that
2016 * alters the current story of the dataset. */
2017void changeReplicationId(void) {
2018 getRandomHexChars(server.replid,CONFIG_RUN_ID_SIZE);
2019 server.replid[CONFIG_RUN_ID_SIZE] = '\0';
2020}
2021
2022/* Clear (invalidate) the secondary replication ID. This happens, for
2023 * example, after a full resynchronization, when we start a new replication
2024 * history. */
2025void clearReplicationId2(void) {
2026 memset(server.replid2,'0',sizeof(server.replid));
2027 server.replid2[CONFIG_RUN_ID_SIZE] = '\0';
2028 server.second_replid_offset = -1;
2029}
2030
2031/* Use the current replication ID / offset as secondary replication
2032 * ID, and change the current one in order to start a new history.
2033 * This should be used when an instance is switched from slave to master
2034 * so that it can serve PSYNC requests performed using the master
2035 * replication ID. */
2036void shiftReplicationId(void) {
2037 memcpy(server.replid2,server.replid,sizeof(server.replid));
2038 /* We set the second replid offset to the master offset + 1, since
2039 * the slave will ask for the first byte it has not yet received, so
2040 * we need to add one to the offset: for example if, as a slave, we are
2041 * sure we have the same history as the master for 50 bytes, after we
2042 * are turned into a master, we can accept a PSYNC request with offset
2043 * 51, since the slave asking has the same history up to the 50th
2044 * byte, and is asking for the new bytes starting at offset 51. */
2045 server.second_replid_offset = server.master_repl_offset+1;
2046 changeReplicationId();
2047 serverLog(LL_NOTICE,"Setting secondary replication ID to %s, valid up to offset: %lld. New replication ID is %s", server.replid2, server.second_replid_offset, server.replid);
2048}
2049
2050/* ----------------------------------- SLAVE -------------------------------- */
2051
2052/* Replication: Replica side. */
2053void slaveGetPortStr(char *buf, size_t size) {
2054 long long port;
2055 if (server.slave_announce_port) {
2056 port = server.slave_announce_port;
2057 } else if (server.tls_replication && server.tls_port) {
2058 port = server.tls_port;
2059 } else {
2060 port = server.port;
2061 }
2062 ll2string(buf, size, port);
2063}
2064
2065/* Returns 1 if the given replication state is a handshake state,
2066 * 0 otherwise. */
2067int slaveIsInHandshakeState(void) {
2068 return server.repl_state >= REPL_STATE_RECEIVE_PING_REPLY &&
2069 server.repl_state <= REPL_STATE_RECEIVE_PSYNC_REPLY;
2070}
2071
2072/* Avoid the master to detect the slave is timing out while loading the
2073 * RDB file in initial synchronization. We send a single newline character
2074 * that is valid protocol but is guaranteed to either be sent entirely or
2075 * not, since the byte is indivisible.
2076 *
2077 * The function is called in two contexts: while we flush the current
2078 * data with emptyData(), and while we load the new data received as an
2079 * RDB file from the master. */
2080void replicationSendNewlineToMaster(void) {
2081 static time_t newline_sent;
2082 if (time(NULL) != newline_sent) {
2083 newline_sent = time(NULL);
2084 /* Pinging back in this stage is best-effort. */
2085 if (server.repl_transfer_s) connWrite(server.repl_transfer_s, "\n", 1);
2086 }
2087}
2088
2089/* Callback used by emptyData() while flushing away old data to load
2090 * the new dataset received by the master or to clear partial db if loading
2091 * fails. */
2092void replicationEmptyDbCallback(dict *d) {
2093 UNUSED(d);
2094 if (server.repl_state == REPL_STATE_TRANSFER)
2095 replicationSendNewlineToMaster();
2096
2097 processEventsWhileBlocked();
2098}
2099
2100/* Function to flush old db or the partial db on error. */
2101static void rdbLoadEmptyDbFunc(void) {
2102 serverAssert(server.loading);
2103
2104 serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
2105 int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC :
2106 EMPTYDB_NO_FLAGS;
2107
2108 emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
2109}
2110
2111/* Once we have a link with the master and the synchronization was
2112 * performed, this function materializes the master client we store
2113 * at server.master, starting from the specified file descriptor. */
2114void replicationCreateMasterClient(connection *conn, int dbid) {
2115 server.master = createClient(conn);
2116 if (conn)
2117 connSetReadHandler(server.master->conn, readQueryFromClient);
2118
2119 /**
2120 * Important note:
2121 * The CLIENT_DENY_BLOCKING flag is not, and should not, be set here.
2122 * For commands like BLPOP, it makes no sense to block the master
2123 * connection, and such blocking attempt will probably cause deadlock and
2124 * break the replication. We consider such a thing as a bug because
2125 * commands as BLPOP should never be sent on the replication link.
2126 * A possible use-case for blocking the replication link is if a module wants
2127 * to pass the execution to a background thread and unblock after the
2128 * execution is done. This is the reason why we allow blocking the replication
2129 * connection. */
2130 server.master->flags |= CLIENT_MASTER;
2131
2132 /* Allocate a private query buffer for the master client instead of using the reusable query buffer.
2133 * This is done because the master's query buffer data needs to be preserved for my sub-replicas to use. */
2134 server.master->querybuf = sdsempty();
2135 server.master->authenticated = 1;
2136 server.master->reploff = server.master_initial_offset;
2137 server.master->read_reploff = server.master->reploff;
2138 server.master->user = NULL; /* This client can do everything. */
2139 memcpy(server.master->replid, server.master_replid,
2140 sizeof(server.master_replid));
2141 /* If master offset is set to -1, this master is old and is not
2142 * PSYNC capable, so we flag it accordingly. */
2143 if (server.master->reploff == -1)
2144 server.master->flags |= CLIENT_PRE_PSYNC;
2145 if (dbid != -1) selectDb(server.master,dbid);
2146}
2147
2148static int useDisklessLoad(void) {
2149 /* compute boolean decision to use diskless load */
2150 int enabled = server.repl_diskless_load == REPL_DISKLESS_LOAD_ALWAYS || server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB ||
2151 (server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && dbTotalServerKeyCount()==0);
2152
2153 if (enabled) {
2154 /* Check all modules handle read errors, otherwise it's not safe to use diskless load. */
2155 if (server.repl_diskless_load != REPL_DISKLESS_LOAD_ALWAYS && !moduleAllDatatypesHandleErrors()) {
2156 serverLog(LL_NOTICE,
2157 "Skipping diskless-load because there are modules that don't handle read errors.");
2158 enabled = 0;
2159 }
2160 /* Check all modules handle async replication, otherwise it's not safe to use diskless load. */
2161 else if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB && !moduleAllModulesHandleReplAsyncLoad()) {
2162 serverLog(LL_NOTICE,
2163 "Skipping diskless-load because there are modules that are not aware of async replication.");
2164 enabled = 0;
2165 }
2166 }
2167 return enabled;
2168}
2169
2170/* Helper function for readSyncBulkPayload() to initialize tempDb
2171 * before socket-loading the new db from master. The tempDb may be populated
2172 * by swapMainDbWithTempDb or freed by disklessLoadDiscardTempDb later. */
2173redisDb *disklessLoadInitTempDb(void) {
2174 return initTempDb();
2175}
2176
2177/* Helper function for readSyncBulkPayload() to discard our tempDb
2178 * when the loading succeeded or failed. */
2179void disklessLoadDiscardTempDb(redisDb *tempDb) {
2180 discardTempDb(tempDb);
2181}
2182
2183/* If we know we got an entirely different data set from our master
2184 * we have no way to incrementally feed our replicas after that.
2185 * We want our replicas to resync with us as well, if we have any sub-replicas.
2186 * This is useful on readSyncBulkPayload in places where we just finished transferring db. */
2187void replicationAttachToNewMaster(void) {
2188 /* Replica starts to apply data from new master, we must discard the cached
2189 * master structure. */
2190 serverAssert(server.master == NULL);
2191 replicationDiscardCachedMaster();
2192
2193 disconnectSlaves(); /* Force our replicas to resync with us as well. */
2194 freeReplicationBacklog(); /* Don't allow our chained replicas to PSYNC. */
2195}
2196
2197/* Asynchronously read the SYNC payload we receive from a master */
2198#define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */
2199void readSyncBulkPayload(connection *conn) {
2200 char buf[PROTO_IOBUF_LEN];
2201 ssize_t nread, readlen, nwritten;
2202 int use_diskless_load = useDisklessLoad();
2203 int rdbchannel = (conn == server.repl_rdb_transfer_s);
2204 int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC :
2205 EMPTYDB_NO_FLAGS;
2206 off_t left;
2207
2208 /* Static vars used to hold the EOF mark, and the last bytes received
2209 * from the server: when they match, we reached the end of the transfer. */
2210 static char eofmark[CONFIG_RUN_ID_SIZE];
2211 static char lastbytes[CONFIG_RUN_ID_SIZE];
2212 static int usemark = 0;
2213
2214 /* If repl_transfer_size == -1 we still have to read the bulk length
2215 * from the master reply. */
2216 if (server.repl_transfer_size == -1) {
2217 nread = connSyncReadLine(conn,buf,1024,server.repl_syncio_timeout*1000);
2218 if (nread == -1) {
2219 serverLog(LL_WARNING,
2220 "I/O error reading bulk count from MASTER: %s",
2221 connGetLastError(conn));
2222 goto error;
2223 } else {
2224 /* nread here is returned by connSyncReadLine(), which calls syncReadLine() and
2225 * convert "\r\n" to '\0' so 1 byte is lost. */
2226 atomicIncr(server.stat_net_repl_input_bytes, nread+1);
2227 }
2228
2229 if (buf[0] == '-') {
2230 serverLog(LL_WARNING,
2231 "MASTER aborted replication with an error: %s",
2232 buf+1);
2233 goto error;
2234 } else if (buf[0] == '\0') {
2235 /* At this stage just a newline works as a PING in order to take
2236 * the connection live. So we refresh our last interaction
2237 * timestamp. */
2238 server.repl_transfer_lastio = server.unixtime;
2239 return;
2240 } else if (buf[0] != '$') {
2241 serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
2242 goto error;
2243 }
2244
2245 /* There are two possible forms for the bulk payload. One is the
2246 * usual $<count> bulk format. The other is used for diskless transfers
2247 * when the master does not know beforehand the size of the file to
2248 * transfer. In the latter case, the following format is used:
2249 *
2250 * $EOF:<40 bytes delimiter>
2251 *
2252 * At the end of the file the announced delimiter is transmitted. The
2253 * delimiter is long and random enough that the probability of a
2254 * collision with the actual file content can be ignored. */
2255 if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= CONFIG_RUN_ID_SIZE) {
2256 usemark = 1;
2257 memcpy(eofmark,buf+5,CONFIG_RUN_ID_SIZE);
2258 memset(lastbytes,0,CONFIG_RUN_ID_SIZE);
2259 /* Set any repl_transfer_size to avoid entering this code path
2260 * at the next call. */
2261 server.repl_transfer_size = 0;
2262 serverLog(LL_NOTICE,
2263 "MASTER <-> REPLICA sync: receiving streamed RDB from master with EOF %s",
2264 use_diskless_load? "to parser":"to disk");
2265 } else {
2266 usemark = 0;
2267 server.repl_transfer_size = strtol(buf+1,NULL,10);
2268 serverLog(LL_NOTICE,
2269 "MASTER <-> REPLICA sync: receiving %lld bytes from master %s",
2270 (long long) server.repl_transfer_size,
2271 use_diskless_load? "to parser":"to disk");
2272 }
2273 return;
2274 }
2275
2276 if (!use_diskless_load) {
2277 /* Read the data from the socket, store it to a file and search
2278 * for the EOF. */
2279 if (usemark) {
2280 readlen = sizeof(buf);
2281 } else {
2282 left = server.repl_transfer_size - server.repl_transfer_read;
2283 readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
2284 }
2285
2286 nread = connRead(conn,buf,readlen);
2287 if (nread <= 0) {
2288 if (connGetState(conn) == CONN_STATE_CONNECTED) {
2289 /* equivalent to EAGAIN */
2290 return;
2291 }
2292 serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
2293 (nread == -1) ? connGetLastError(conn) : "connection lost");
2294 cancelReplicationHandshake(1);
2295 return;
2296 }
2297 atomicIncr(server.stat_net_repl_input_bytes, nread);
2298
2299 /* When a mark is used, we want to detect EOF asap in order to avoid
2300 * writing the EOF mark into the file... */
2301 int eof_reached = 0;
2302
2303 if (usemark) {
2304 /* Update the last bytes array, and check if it matches our
2305 * delimiter. */
2306 if (nread >= CONFIG_RUN_ID_SIZE) {
2307 memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,
2308 CONFIG_RUN_ID_SIZE);
2309 } else {
2310 int rem = CONFIG_RUN_ID_SIZE-nread;
2311 memmove(lastbytes,lastbytes+nread,rem);
2312 memcpy(lastbytes+rem,buf,nread);
2313 }
2314 if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0)
2315 eof_reached = 1;
2316 }
2317
2318 /* Update the last I/O time for the replication transfer (used in
2319 * order to detect timeouts during replication), and write what we
2320 * got from the socket to the dump file on disk. */
2321 server.repl_transfer_lastio = server.unixtime;
2322 if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) {
2323 serverLog(LL_WARNING,
2324 "Write error or short write writing to the DB dump file "
2325 "needed for MASTER <-> REPLICA synchronization: %s",
2326 (nwritten == -1) ? strerror(errno) : "short write");
2327 goto error;
2328 }
2329 server.repl_transfer_read += nread;
2330
2331 /* Delete the last 40 bytes from the file if we reached EOF. */
2332 if (usemark && eof_reached) {
2333 if (ftruncate(server.repl_transfer_fd,
2334 server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1)
2335 {
2336 serverLog(LL_WARNING,
2337 "Error truncating the RDB file received from the master "
2338 "for SYNC: %s", strerror(errno));
2339 goto error;
2340 }
2341 }
2342
2343 /* Sync data on disk from time to time, otherwise at the end of the
2344 * transfer we may suffer a big delay as the memory buffers are copied
2345 * into the actual disk. */
2346 if (server.repl_transfer_read >=
2347 server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
2348 {
2349 off_t sync_size = server.repl_transfer_read -
2350 server.repl_transfer_last_fsync_off;
2351 rdb_fsync_range(server.repl_transfer_fd,
2352 server.repl_transfer_last_fsync_off, sync_size);
2353 server.repl_transfer_last_fsync_off += sync_size;
2354 }
2355
2356 /* Check if the transfer is now complete */
2357 if (!usemark) {
2358 if (server.repl_transfer_read == server.repl_transfer_size)
2359 eof_reached = 1;
2360 }
2361
2362 /* If the transfer is yet not complete, we need to read more, so
2363 * return ASAP and wait for the handler to be called again. */
2364 if (!eof_reached) return;
2365 }
2366
2367 /* We reach this point in one of the following cases:
2368 *
2369 * 1. The replica is using diskless replication, that is, it reads data
2370 * directly from the socket to the Redis memory, without using
2371 * a temporary RDB file on disk. In that case we just block and
2372 * read everything from the socket.
2373 *
2374 * 2. Or when we are done reading from the socket to the RDB file, in
2375 * such case we want just to read the RDB file in memory. */
2376
2377 /* We need to stop any AOF rewriting child before flushing and parsing
2378 * the RDB, otherwise we'll create a copy-on-write disaster. */
2379 if (server.aof_state != AOF_OFF) stopAppendOnly();
2380 /* Also try to stop save RDB child before flushing and parsing the RDB:
2381 * 1. Ensure background save doesn't overwrite synced data after being loaded.
2382 * 2. Avoid copy-on-write disaster. */
2383 if (server.child_type == CHILD_TYPE_RDB) {
2384 if (!use_diskless_load) {
2385 serverLog(LL_NOTICE,
2386 "Replica is about to load the RDB file received from the "
2387 "master, but there is a pending RDB child running. "
2388 "Killing process %ld and removing its temp file to avoid "
2389 "any race",
2390 (long) server.child_pid);
2391 }
2392 killRDBChild();
2393 }
2394
2395 /* Attach to the new master immediately if we are not using swapdb. */
2396 if (!use_diskless_load || server.repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB)
2397 replicationAttachToNewMaster();
2398
2399 /* Before loading the DB into memory we need to delete the readable
2400 * handler, otherwise it will get called recursively since
2401 * rdbLoad() will call the event loop to process events from time to
2402 * time for non blocking loading. */
2403 connSetReadHandler(conn, NULL);
2404
2405 serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
2406 rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
2407 if (use_diskless_load) {
2408 rio rdb;
2409 redisDb *dbarray;
2410 functionsLibCtx* functions_lib_ctx;
2411 int asyncLoading = 0;
2412
2413 if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
2414 moduleFireServerEvent(REDISMODULE_EVENT_REPL_ASYNC_LOAD,
2415 REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED,
2416 NULL);
2417 /* Async loading means we continue serving read commands during full resync, and
2418 * "swap" the new db with the old db only when loading is done.
2419 * It is enabled only on SWAPDB diskless replication when master replication ID hasn't changed,
2420 * because in that state the old content of the db represents a different point in time of the same
2421 * data set we're currently receiving from the master. */
2422 if (memcmp(server.replid, server.master_replid, CONFIG_RUN_ID_SIZE) == 0) {
2423 asyncLoading = 1;
2424 }
2425 }
2426
2427 /* Set disklessLoadingRio before calling emptyData() which may yield
2428 * back to networking. */
2429 rioInitWithConn(&rdb,conn,server.repl_transfer_size);
2430 disklessLoadingRio = &rdb;
2431
2432 /* Empty db */
2433 loadingSetFlags(NULL, server.repl_transfer_size, asyncLoading);
2434 if (server.repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB) {
2435 serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
2436 /* Note that inside loadingSetFlags(), server.loading is set.
2437 * replicationEmptyDbCallback() may yield back to event-loop to
2438 * reply -LOADING. */
2439 emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
2440 }
2441 loadingFireEvent(RDBFLAGS_REPLICATION);
2442
2443 if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
2444 dbarray = disklessLoadInitTempDb();
2445 functions_lib_ctx = functionsLibCtxCreate();
2446 } else {
2447 dbarray = server.db;
2448 functions_lib_ctx = functionsLibCtxGetCurrent();
2449 functionsLibCtxClear(functions_lib_ctx);
2450 }
2451
2452 /* Put the socket in blocking mode to simplify RDB transfer.
2453 * We'll restore it when the RDB is received. */
2454 connBlock(conn);
2455 connRecvTimeout(conn, server.repl_timeout*1000);
2456
2457 int loadingFailed = 0;
2458 rdbLoadingCtx loadingCtx = { .dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx };
2459 if (rdbLoadRioWithLoadingCtx(&rdb,RDBFLAGS_REPLICATION,&rsi,&loadingCtx) != C_OK) {
2460 /* RDB loading failed. */
2461 serverLog(LL_WARNING,
2462 "Failed trying to load the MASTER synchronization DB "
2463 "from socket, check server logs.");
2464 loadingFailed = 1;
2465 } else if (usemark) {
2466 /* Verify the end mark is correct. */
2467 if (!rioRead(&rdb, buf, CONFIG_RUN_ID_SIZE) ||
2468 memcmp(buf, eofmark, CONFIG_RUN_ID_SIZE) != 0)
2469 {
2470 serverLog(LL_WARNING, "Replication stream EOF marker is broken");
2471 loadingFailed = 1;
2472 }
2473 }
2474 disklessLoadingRio = NULL;
2475
2476 if (loadingFailed) {
2477 rioFreeConn(&rdb, NULL);
2478
2479 if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
2480 /* Discard potentially partially loaded tempDb. */
2481 moduleFireServerEvent(REDISMODULE_EVENT_REPL_ASYNC_LOAD,
2482 REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_ABORTED,
2483 NULL);
2484
2485 disklessLoadDiscardTempDb(dbarray);
2486 functionsLibCtxFree(functions_lib_ctx);
2487 serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Discarding temporary DB in background");
2488 } else {
2489 /* Remove the half-loaded data in case we started with an empty replica. */
2490 emptyData(-1,empty_db_flags,replicationEmptyDbCallback);
2491 }
2492
2493 /* Note that replicationEmptyDbCallback() may yield back to event
2494 * loop to reply -LOADING if flushing the db takes a long time. So,
2495 * stopLoading() must be called after emptyData() above. */
2496 stopLoading(0);
2497
2498 /* This must be called after stopLoading(0) as it checks loading
2499 * flag in case of rdbchannel replication. */
2500 cancelReplicationHandshake(1);
2501
2502 /* Note that there's no point in restarting the AOF on SYNC
2503 * failure, it'll be restarted when sync succeeds or the replica
2504 * gets promoted. */
2505 return;
2506 }
2507
2508 /* RDB loading succeeded if we reach this point. */
2509 if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
2510 /* Cancel all ASM trim jobs as we are about to swap the main db. */
2511 asmCancelTrimJobs();
2512 /* We will soon swap main db with tempDb and replicas will start
2513 * to apply data from new master, we must discard the cached
2514 * master structure and force resync of sub-replicas. */
2515 replicationAttachToNewMaster();
2516
2517 serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Swapping active DB with loaded DB");
2518 swapMainDbWithTempDb(dbarray);
2519
2520 /* swap existing functions ctx with the temporary one */
2521 functionsLibCtxSwapWithCurrent(functions_lib_ctx);
2522
2523 moduleFireServerEvent(REDISMODULE_EVENT_REPL_ASYNC_LOAD,
2524 REDISMODULE_SUBEVENT_REPL_ASYNC_LOAD_COMPLETED,
2525 NULL);
2526
2527 /* Delete the old db as it's useless now. */
2528 disklessLoadDiscardTempDb(dbarray);
2529 serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Discarding old DB in background");
2530 }
2531
2532 /* Inform about db change, as replication was diskless and didn't cause a save. */
2533 server.dirty++;
2534
2535 stopLoading(1);
2536
2537 /* Cleanup and restore the socket to the original state to continue
2538 * with the normal replication. */
2539 rioFreeConn(&rdb, NULL);
2540 connNonBlock(conn);
2541 connRecvTimeout(conn,0);
2542 } else {
2543
2544 /* Make sure the new file (also used for persistence) is fully synced
2545 * (not covered by earlier calls to rdb_fsync_range). */
2546 if (fsync(server.repl_transfer_fd) == -1) {
2547 serverLog(LL_WARNING,
2548 "Failed trying to sync the temp DB to disk in "
2549 "MASTER <-> REPLICA synchronization: %s",
2550 strerror(errno));
2551 cancelReplicationHandshake(1);
2552 return;
2553 }
2554
2555 /* Rename rdb like renaming rewrite aof asynchronously. */
2556 int old_rdb_fd = open(server.rdb_filename,O_RDONLY|O_NONBLOCK);
2557 if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
2558 serverLog(LL_WARNING,
2559 "Failed trying to rename the temp DB into %s in "
2560 "MASTER <-> REPLICA synchronization: %s",
2561 server.rdb_filename, strerror(errno));
2562 cancelReplicationHandshake(1);
2563 if (old_rdb_fd != -1) close(old_rdb_fd);
2564 return;
2565 }
2566 /* Close old rdb asynchronously. */
2567 if (old_rdb_fd != -1) bioCreateCloseJob(old_rdb_fd, 0, 0);
2568
2569 /* Sync the directory to ensure rename is persisted */
2570 if (fsyncFileDir(server.rdb_filename) == -1) {
2571 serverLog(LL_WARNING,
2572 "Failed trying to sync DB directory %s in "
2573 "MASTER <-> REPLICA synchronization: %s",
2574 server.rdb_filename, strerror(errno));
2575 cancelReplicationHandshake(1);
2576 return;
2577 }
2578
2579 if (rdbLoadWithEmptyFunc(server.rdb_filename,&rsi,RDBFLAGS_REPLICATION,rdbLoadEmptyDbFunc) != RDB_OK) {
2580 serverLog(LL_WARNING,
2581 "Failed trying to load the MASTER synchronization "
2582 "DB from disk, check server logs.");
2583 cancelReplicationHandshake(1);
2584 if (server.rdb_del_sync_files && allPersistenceDisabled()) {
2585 serverLog(LL_NOTICE,"Removing the RDB file obtained from "
2586 "the master. This replica has persistence "
2587 "disabled");
2588 bg_unlink(server.rdb_filename);
2589 }
2590
2591 /* Note that there's no point in restarting the AOF on sync failure,
2592 it'll be restarted when sync succeeds or replica promoted. */
2593 return;
2594 }
2595
2596 /* Cleanup. */
2597 if (server.rdb_del_sync_files && allPersistenceDisabled()) {
2598 serverLog(LL_NOTICE,"Removing the RDB file obtained from "
2599 "the master. This replica has persistence "
2600 "disabled");
2601 bg_unlink(server.rdb_filename);
2602 }
2603
2604 zfree(server.repl_transfer_tmpfile);
2605 close(server.repl_transfer_fd);
2606 server.repl_transfer_fd = -1;
2607 server.repl_transfer_tmpfile = NULL;
2608 }
2609
2610 /* Final setup of the connected slave <- master link */
2611 replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
2612 server.repl_state = REPL_STATE_CONNECTED;
2613 server.repl_down_since = 0;
2614 server.repl_up_since = server.unixtime;
2615
2616 if (server.repl_disconnect_start_time != 0) {
2617 server.repl_total_disconnect_time += server.unixtime - server.repl_disconnect_start_time;
2618 server.repl_disconnect_start_time = 0;
2619 }
2620 /* Fire the master link modules event. */
2621 moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
2622 REDISMODULE_SUBEVENT_MASTER_LINK_UP,
2623 NULL);
2624
2625 /* After a full resynchronization we use the replication ID and
2626 * offset of the master. The secondary ID / offset are cleared since
2627 * we are starting a new history. */
2628 memcpy(server.replid,server.master->replid,sizeof(server.replid));
2629 server.master_repl_offset = server.master->reploff;
2630 clearReplicationId2();
2631
2632 /* Let's create the replication backlog if needed. Slaves need to
2633 * accumulate the backlog regardless of the fact they have sub-slaves
2634 * or not, in order to behave correctly if they are promoted to
2635 * masters after a failover. */
2636 if (server.repl_backlog == NULL) createReplicationBacklog();
2637 serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success");
2638
2639 if (server.supervised_mode == SUPERVISED_SYSTEMD) {
2640 redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Finished with success. Ready to accept connections in read-write mode.\n");
2641 }
2642
2643 /* Send the initial ACK immediately to put this replica in online state. */
2644 if (usemark) replicationSendAck();
2645
2646 /* Restart the AOF subsystem now that we finished the sync. This
2647 * will trigger an AOF rewrite, and when done will start appending
2648 * to the new file. */
2649 if (server.aof_enabled) {
2650 serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Starting AOF after a successful sync");
2651 startAppendOnlyWithRetry();
2652 }
2653
2654 /* Stream accumulated replication buffer to the db and finalize fullsync */
2655 if (rdbchannel) {
2656 if (server.repl_rdb_transfer_s) {
2657 connClose(server.repl_rdb_transfer_s);
2658 server.repl_rdb_transfer_s = NULL;
2659 }
2660 rdbChannelStreamReplDataToDb();
2661 }
2662
2663 return;
2664
2665error:
2666 cancelReplicationHandshake(1);
2667 return;
2668}
2669
2670char *receiveSynchronousResponse(connection *conn) {
2671 char buf[256];
2672 /* Read the reply from the server. */
2673 if (connSyncReadLine(conn,buf,sizeof(buf),server.repl_syncio_timeout*1000) == -1)
2674 {
2675 serverLog(LL_WARNING, "Failed to read response from the server: %s", connGetLastError(conn));
2676 return NULL;
2677 }
2678 server.repl_transfer_lastio = server.unixtime;
2679 return sdsnew(buf);
2680}
2681
2682/* Send a pre-formatted multi-bulk command to the connection. */
2683char* sendCommandRaw(connection *conn, sds cmd) {
2684 if (connSyncWrite(conn,cmd,sdslen(cmd),server.repl_syncio_timeout*1000) == -1) {
2685 return sdscatprintf(sdsempty(),"-Writing to master: %s",
2686 connGetLastError(conn));
2687 }
2688 return NULL;
2689}
2690
2691/* Compose a multi-bulk command and send it to the connection.
2692 * Used to send AUTH and REPLCONF commands to the master before starting the
2693 * replication.
2694 *
2695 * Takes a list of char* arguments, terminated by a NULL argument.
2696 *
2697 * The command returns an sds string representing the result of the
2698 * operation. On error the first byte is a "-".
2699 */
2700char *sendCommand(connection *conn, ...) {
2701 va_list ap;
2702 sds cmd = sdsempty();
2703 sds cmdargs = sdsempty();
2704 size_t argslen = 0;
2705 char *arg;
2706
2707 /* Create the command to send to the master, we use redis binary
2708 * protocol to make sure correct arguments are sent. This function
2709 * is not safe for all binary data. */
2710 va_start(ap,conn);
2711 while(1) {
2712 arg = va_arg(ap, char*);
2713 if (arg == NULL) break;
2714 cmdargs = sdscatprintf(cmdargs,"$%zu\r\n%s\r\n",strlen(arg),arg);
2715 argslen++;
2716 }
2717
2718 cmd = sdscatprintf(cmd,"*%zu\r\n",argslen);
2719 cmd = sdscatsds(cmd,cmdargs);
2720 sdsfree(cmdargs);
2721
2722 va_end(ap);
2723 char* err = sendCommandRaw(conn, cmd);
2724 sdsfree(cmd);
2725 if(err)
2726 return err;
2727 return NULL;
2728}
2729
2730/* Compose a multi-bulk command and send it to the connection.
2731 * Used to send AUTH and REPLCONF commands to the master before starting the
2732 * replication.
2733 *
2734 * argv_lens is optional, when NULL, strlen is used.
2735 *
2736 * The command returns an sds string representing the result of the
2737 * operation. On error the first byte is a "-".
2738 */
2739char *sendCommandArgv(connection *conn, int argc, char **argv, size_t *argv_lens) {
2740 sds cmd = sdsempty();
2741 char *arg;
2742 int i;
2743
2744 /* Create the command to send to the master. */
2745 cmd = sdscatfmt(cmd,"*%i\r\n",argc);
2746 for (i=0; i<argc; i++) {
2747 int len;
2748 arg = argv[i];
2749 len = argv_lens ? argv_lens[i] : strlen(arg);
2750 cmd = sdscatfmt(cmd,"$%i\r\n",len);
2751 cmd = sdscatlen(cmd,arg,len);
2752 cmd = sdscatlen(cmd,"\r\n",2);
2753 }
2754 char* err = sendCommandRaw(conn, cmd);
2755 sdsfree(cmd);
2756 if (err)
2757 return err;
2758 return NULL;
2759}
2760
2761/* Try a partial resynchronization with the master if we are about to reconnect.
2762 * If there is no cached master structure, at least try to issue a
2763 * "PSYNC ? -1" command in order to trigger a full resync using the PSYNC
2764 * command in order to obtain the master replid and the master replication
2765 * global offset.
2766 *
2767 * This function is designed to be called from syncWithMaster(), so the
2768 * following assumptions are made:
2769 *
2770 * 1) We pass the function an already connected socket "fd".
2771 * 2) This function does not close the file descriptor "fd". However in case
2772 * of successful partial resynchronization, the function will reuse
2773 * 'fd' as file descriptor of the server.master client structure.
2774 *
2775 * The function is split in two halves: if read_reply is 0, the function
2776 * writes the PSYNC command on the socket, and a new function call is
2777 * needed, with read_reply set to 1, in order to read the reply of the
2778 * command. This is useful in order to support non blocking operations, so
2779 * that we write, return into the event loop, and read when there are data.
2780 *
2781 * When read_reply is 0 the function returns PSYNC_WRITE_ERR if there
2782 * was a write error, or PSYNC_WAIT_REPLY to signal we need another call
2783 * with read_reply set to 1. However even when read_reply is set to 1
2784 * the function may return PSYNC_WAIT_REPLY again to signal there were
2785 * insufficient data to read to complete its work. We should re-enter
2786 * into the event loop and wait in such a case.
2787 *
2788 * The function returns:
2789 *
2790 * PSYNC_CONTINUE: If the PSYNC command succeeded and we can continue.
2791 * PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed.
2792 * In this case the master replid and global replication
2793 * offset is saved.
2794 * PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and
2795 * the caller should fall back to SYNC.
2796 * PSYNC_WRITE_ERROR: There was an error writing the command to the socket.
2797 * PSYNC_WAIT_REPLY: Call again the function with read_reply set to 1.
2798 * PSYNC_TRY_LATER: Master is currently in a transient error condition.
2799 *
2800 * Notable side effects:
2801 *
2802 * 1) As a side effect of the function call the function removes the readable
2803 * event handler from "fd", unless the return value is PSYNC_WAIT_REPLY.
2804 * 2) server.master_initial_offset is set to the right value according
2805 * to the master reply. This will be used to populate the 'server.master'
2806 * structure replication offset.
2807 */
2808
2809#define PSYNC_WRITE_ERROR 0
2810#define PSYNC_WAIT_REPLY 1
2811#define PSYNC_CONTINUE 2
2812#define PSYNC_FULLRESYNC 3
2813#define PSYNC_NOT_SUPPORTED 4
2814#define PSYNC_TRY_LATER 5
2815#define PSYNC_FULLRESYNC_RDBCHANNEL 6
2816int slaveTryPartialResynchronization(connection *conn, int read_reply) {
2817 char *psync_replid;
2818 char psync_offset[32];
2819 sds reply;
2820
2821 /* Writing half */
2822 if (!read_reply) {
2823 /* Initially set master_initial_offset to -1 to mark the current
2824 * master replid and offset as not valid. Later if we'll be able to do
2825 * a FULL resync using the PSYNC command we'll set the offset at the
2826 * right value, so that this information will be propagated to the
2827 * client structure representing the master into server.master. */
2828 server.master_initial_offset = -1;
2829
2830 if (server.cached_master) {
2831 psync_replid = server.cached_master->replid;
2832 snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
2833 serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
2834 } else {
2835 serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
2836 psync_replid = "?";
2837 memcpy(psync_offset,"-1",3);
2838 }
2839
2840 /* Issue the PSYNC command, if this is a master with a failover in
2841 * progress then send the failover argument to the replica to cause it
2842 * to become a master */
2843 if (server.failover_state == FAILOVER_IN_PROGRESS) {
2844 reply = sendCommand(conn,"PSYNC",psync_replid,psync_offset,"FAILOVER",NULL);
2845 } else {
2846 reply = sendCommand(conn,"PSYNC",psync_replid,psync_offset,NULL);
2847 }
2848
2849 if (reply != NULL) {
2850 serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
2851 sdsfree(reply);
2852 connSetReadHandler(conn, NULL);
2853 return PSYNC_WRITE_ERROR;
2854 }
2855 return PSYNC_WAIT_REPLY;
2856 }
2857
2858 /* Reading half */
2859 reply = receiveSynchronousResponse(conn);
2860 /* Master did not reply to PSYNC */
2861 if (reply == NULL) {
2862 connSetReadHandler(conn, NULL);
2863 serverLog(LL_WARNING, "Master did not reply to PSYNC, will try later");
2864 return PSYNC_TRY_LATER;
2865 }
2866
2867 if (sdslen(reply) == 0) {
2868 /* The master may send empty newlines after it receives PSYNC
2869 * and before to reply, just to keep the connection alive. */
2870 sdsfree(reply);
2871 return PSYNC_WAIT_REPLY;
2872 }
2873
2874 connSetReadHandler(conn, NULL);
2875
2876 if (!strncmp(reply,"+FULLRESYNC",11)) {
2877 char *replid = NULL, *offset = NULL;
2878
2879 /* FULL RESYNC, parse the reply in order to extract the replid
2880 * and the replication offset. */
2881 replid = strchr(reply,' ');
2882 if (replid) {
2883 replid++;
2884 offset = strchr(replid,' ');
2885 if (offset) offset++;
2886 }
2887 if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) {
2888 serverLog(LL_WARNING,
2889 "Master replied with wrong +FULLRESYNC syntax.");
2890 /* This is an unexpected condition, actually the +FULLRESYNC
2891 * reply means that the master supports PSYNC, but the reply
2892 * format seems wrong. To stay safe we blank the master
2893 * replid to make sure next PSYNCs will fail. */
2894 memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1);
2895 } else {
2896 memcpy(server.master_replid, replid, offset-replid-1);
2897 server.master_replid[CONFIG_RUN_ID_SIZE] = '\0';
2898 server.master_initial_offset = strtoll(offset,NULL,10);
2899 serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
2900 server.master_replid,
2901 server.master_initial_offset);
2902 }
2903 sdsfree(reply);
2904 return PSYNC_FULLRESYNC;
2905 }
2906
2907 if (!strncmp(reply, "+RDBCHANNELSYNC", strlen("+RDBCHANNELSYNC"))) {
2908 char *client_id = strchr(reply,' ');
2909 if (client_id)
2910 client_id++;
2911
2912 if (!client_id) {
2913 serverLog(LL_WARNING,
2914 "Master replied with wrong +RDBCHANNELSYNC syntax: %s", reply);
2915 sdsfree(reply);
2916 return PSYNC_NOT_SUPPORTED;
2917 }
2918 server.repl_main_ch_client_id = strtoll(client_id, NULL, 10);;
2919 /* A response of +RDBCHANNELSYNC from the master implies that partial
2920 * synchronization is not possible and that the master supports full
2921 * sync using dedicated RDB channel. Full sync will continue that way.*/
2922 serverLog(LL_NOTICE, "PSYNC is not possible, initialize RDB channel.");
2923 sdsfree(reply);
2924 return PSYNC_FULLRESYNC_RDBCHANNEL;
2925 }
2926
2927 if (!strncmp(reply,"+CONTINUE",9)) {
2928 /* Partial resync was accepted. */
2929 serverLog(LL_NOTICE,
2930 "Successful partial resynchronization with master.");
2931
2932 /* Check the new replication ID advertised by the master. If it
2933 * changed, we need to set the new ID as primary ID, and set
2934 * secondary ID as the old master ID up to the current offset, so
2935 * that our sub-slaves will be able to PSYNC with us after a
2936 * disconnection. */
2937 char *start = reply+10;
2938 char *end = reply+9;
2939 while(end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++;
2940 if (end-start == CONFIG_RUN_ID_SIZE) {
2941 char new[CONFIG_RUN_ID_SIZE+1];
2942 memcpy(new,start,CONFIG_RUN_ID_SIZE);
2943 new[CONFIG_RUN_ID_SIZE] = '\0';
2944
2945 if (strcmp(new,server.cached_master->replid)) {
2946 /* Master ID changed. */
2947 serverLog(LL_NOTICE,"Master replication ID changed to %s",new);
2948
2949 /* Set the old ID as our ID2, up to the current offset+1. */
2950 memcpy(server.replid2,server.cached_master->replid,
2951 sizeof(server.replid2));
2952 server.second_replid_offset = server.master_repl_offset+1;
2953
2954 /* Update the cached master ID and our own primary ID to the
2955 * new one. */
2956 memcpy(server.replid,new,sizeof(server.replid));
2957 memcpy(server.cached_master->replid,new,sizeof(server.replid));
2958
2959 /* Disconnect all the sub-slaves: they need to be notified. */
2960 disconnectSlaves();
2961 }
2962 }
2963
2964 /* Setup the replication to continue. */
2965 sdsfree(reply);
2966 replicationResurrectCachedMaster(conn);
2967
2968 /* If this instance was restarted and we read the metadata to
2969 * PSYNC from the persistence file, our replication backlog could
2970 * be still not initialized. Create it. */
2971 if (server.repl_backlog == NULL) createReplicationBacklog();
2972 return PSYNC_CONTINUE;
2973 }
2974
2975 /* If we reach this point we received either an error (since the master does
2976 * not understand PSYNC or because it is in a special state and cannot
2977 * serve our request), or an unexpected reply from the master.
2978 *
2979 * Return PSYNC_NOT_SUPPORTED on errors we don't understand, otherwise
2980 * return PSYNC_TRY_LATER if we believe this is a transient error. */
2981
2982 if (!strncmp(reply,"-NOMASTERLINK",13) ||
2983 !strncmp(reply,"-LOADING",8))
2984 {
2985 serverLog(LL_NOTICE,
2986 "Master is currently unable to PSYNC "
2987 "but should be in the future: %s", reply);
2988 sdsfree(reply);
2989 return PSYNC_TRY_LATER;
2990 }
2991
2992 if (strncmp(reply,"-ERR",4)) {
2993 /* If it's not an error, log the unexpected event. */
2994 serverLog(LL_WARNING,
2995 "Unexpected reply to PSYNC from master: %s", reply);
2996 } else {
2997 serverLog(LL_NOTICE,
2998 "Master does not support PSYNC or is in "
2999 "error state (reply: %s)", reply);
3000 }
3001 sdsfree(reply);
3002 return PSYNC_NOT_SUPPORTED;
3003}
3004
3005/* This handler fires when the non blocking connect was able to
3006 * establish a connection with the master. */
3007void syncWithMaster(connection *conn) {
3008 char tmpfile[256], *err = NULL;
3009 int dfd = -1, maxtries = 5;
3010 int psync_result;
3011 static int replconf_rdb_no_compress = 0;
3012
3013 /* If this event fired after the user turned the instance into a master
3014 * with SLAVEOF NO ONE we must just return ASAP. */
3015 if (server.repl_state == REPL_STATE_NONE) {
3016 connClose(conn);
3017 return;
3018 }
3019
3020 /* Check for errors in the socket: after a non blocking connect() we
3021 * may find that the socket is in error state. */
3022 if (connGetState(conn) != CONN_STATE_CONNECTED) {
3023 serverLog(LL_WARNING,"Error condition on socket for SYNC: %s",
3024 connGetLastError(conn));
3025 goto error;
3026 }
3027
3028 /* Send a PING to check the master is able to reply without errors. */
3029 if (server.repl_state == REPL_STATE_CONNECTING) {
3030 serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event.");
3031 /* Delete the writable event so that the readable event remains
3032 * registered and we can wait for the PONG reply. */
3033 connSetReadHandler(conn, syncWithMaster);
3034 connSetWriteHandler(conn, NULL);
3035 server.repl_state = REPL_STATE_RECEIVE_PING_REPLY;
3036 /* Send the PING, don't check for errors at all, we have the timeout
3037 * that will take care about this. */
3038 err = sendCommand(conn,"PING",NULL);
3039 if (err) goto write_error;
3040 return;
3041 }
3042
3043 /* Receive the PONG command. */
3044 if (server.repl_state == REPL_STATE_RECEIVE_PING_REPLY) {
3045 err = receiveSynchronousResponse(conn);
3046
3047 /* The master did not reply */
3048 if (err == NULL) goto no_response_error;
3049
3050 /* We accept only two replies as valid, a positive +PONG reply
3051 * (we just check for "+") or an authentication error.
3052 * Note that older versions of Redis replied with "operation not
3053 * permitted" instead of using a proper error code, so we test
3054 * both. */
3055 if (err[0] != '+' &&
3056 strncmp(err,"-NOAUTH",7) != 0 &&
3057 strncmp(err,"-NOPERM",7) != 0 &&
3058 strncmp(err,"-ERR operation not permitted",28) != 0)
3059 {
3060 serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err);
3061 sdsfree(err);
3062 goto error;
3063 } else {
3064 serverLog(LL_NOTICE,
3065 "Master replied to PING, replication can continue...");
3066 }
3067 sdsfree(err);
3068 err = NULL;
3069 server.repl_state = REPL_STATE_SEND_HANDSHAKE;
3070 }
3071
3072 if (server.repl_state == REPL_STATE_SEND_HANDSHAKE) {
3073 /* AUTH with the master if required. */
3074 if (server.masterauth) {
3075 char *args[3] = {"AUTH",NULL,NULL};
3076 size_t lens[3] = {4,0,0};
3077 int argc = 1;
3078 if (server.masteruser) {
3079 args[argc] = server.masteruser;
3080 lens[argc] = strlen(server.masteruser);
3081 argc++;
3082 }
3083 args[argc] = server.masterauth;
3084 lens[argc] = sdslen(server.masterauth);
3085 argc++;
3086 err = sendCommandArgv(conn, argc, args, lens);
3087 if (err) goto write_error;
3088 }
3089
3090 /* Set the slave port, so that Master's INFO command can list the
3091 * slave listening port correctly. */
3092 {
3093 char buf[LONG_STR_SIZE];
3094
3095 slaveGetPortStr(buf, sizeof(buf));
3096 err = sendCommand(conn,"REPLCONF",
3097 "listening-port",buf, NULL);
3098 if (err) goto write_error;
3099 }
3100
3101 /* Set the slave ip, so that Master's INFO command can list the
3102 * slave IP address port correctly in case of port forwarding or NAT.
3103 * Skip REPLCONF ip-address if there is no slave-announce-ip option set. */
3104 if (server.slave_announce_ip) {
3105 err = sendCommand(conn,"REPLCONF",
3106 "ip-address",server.slave_announce_ip, NULL);
3107 if (err) goto write_error;
3108 }
3109
3110 /* If we are not going to save the RDB to disk, request that RDB
3111 * compression be disabled, which speeds up RDB delivery. */
3112 replconf_rdb_no_compress = 0;
3113 if (useDisklessLoad()) {
3114 replconf_rdb_no_compress = 1;
3115 err = sendCommand(conn, "REPLCONF", "rdb-no-compress", "1", NULL);
3116 if (err) goto write_error;
3117 }
3118
3119 /* Inform the master of our (slave) capabilities.
3120 *
3121 * EOF: supports EOF-style RDB transfer for diskless replication.
3122 * PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
3123 *
3124 * The master will ignore capabilities it does not understand. */
3125 err = sendCommand(conn,"REPLCONF",
3126 "capa","eof","capa","psync2",
3127 server.repl_rdb_channel ? "capa" : NULL, "rdb-channel-repl", NULL);
3128
3129 if (err) goto write_error;
3130
3131 server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY;
3132 return;
3133 }
3134
3135 if (server.repl_state == REPL_STATE_RECEIVE_AUTH_REPLY && !server.masterauth)
3136 server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY;
3137
3138 /* Receive AUTH reply. */
3139 if (server.repl_state == REPL_STATE_RECEIVE_AUTH_REPLY) {
3140 err = receiveSynchronousResponse(conn);
3141 if (err == NULL) goto no_response_error;
3142 if (err[0] == '-') {
3143 serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err);
3144 sdsfree(err);
3145 goto error;
3146 }
3147 sdsfree(err);
3148 err = NULL;
3149 server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY;
3150 return;
3151 }
3152
3153 /* Receive REPLCONF listening-port reply. */
3154 if (server.repl_state == REPL_STATE_RECEIVE_PORT_REPLY) {
3155 err = receiveSynchronousResponse(conn);
3156 if (err == NULL) goto no_response_error;
3157 /* Ignore the error if any, not all the Redis versions support
3158 * REPLCONF listening-port. */
3159 if (err[0] == '-') {
3160 serverLog(LL_NOTICE,"(Non critical) Master does not understand "
3161 "REPLCONF listening-port: %s", err);
3162 }
3163 sdsfree(err);
3164 server.repl_state = REPL_STATE_RECEIVE_IP_REPLY;
3165 return;
3166 }
3167
3168 if (server.repl_state == REPL_STATE_RECEIVE_IP_REPLY && !server.slave_announce_ip)
3169 server.repl_state = REPL_STATE_RECEIVE_COMP_REPLY;
3170
3171 /* Receive REPLCONF ip-address reply. */
3172 if (server.repl_state == REPL_STATE_RECEIVE_IP_REPLY) {
3173 err = receiveSynchronousResponse(conn);
3174 if (err == NULL) goto no_response_error;
3175 /* Ignore the error if any, not all the Redis versions support
3176 * REPLCONF ip-address. */
3177 if (err[0] == '-') {
3178 serverLog(LL_NOTICE,"(Non critical) Master does not understand "
3179 "REPLCONF ip-address: %s", err);
3180 }
3181 sdsfree(err);
3182 server.repl_state = REPL_STATE_RECEIVE_COMP_REPLY;
3183 return;
3184 }
3185
3186 if (server.repl_state == REPL_STATE_RECEIVE_COMP_REPLY && !replconf_rdb_no_compress)
3187 server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY;
3188
3189 /* Receive REPLCONF rdb-no-compress reply. */
3190 if (server.repl_state == REPL_STATE_RECEIVE_COMP_REPLY) {
3191 err = receiveSynchronousResponse(conn);
3192 if (err == NULL) goto no_response_error;
3193 /* Ignore the error if any, not all the Redis versions support
3194 * REPLCONF rdb-no-compress. */
3195 if (err[0] == '-') {
3196 serverLog(LL_NOTICE,"(Non critical) Master does not understand "
3197 "REPLCONF rdb-no-compress: %s", err);
3198 }
3199 sdsfree(err);
3200 server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY;
3201 return;
3202 }
3203
3204 /* Receive CAPA reply. */
3205 if (server.repl_state == REPL_STATE_RECEIVE_CAPA_REPLY) {
3206 err = receiveSynchronousResponse(conn);
3207 if (err == NULL) goto no_response_error;
3208 /* Ignore the error if any, not all the Redis versions support
3209 * REPLCONF capa. */
3210 if (err[0] == '-') {
3211 serverLog(LL_NOTICE,"(Non critical) Master does not understand "
3212 "REPLCONF capa: %s", err);
3213 }
3214 sdsfree(err);
3215 err = NULL;
3216 server.repl_state = REPL_STATE_SEND_PSYNC;
3217 }
3218
3219 /* Try a partial resynchronization. If we don't have a cached master
3220 * slaveTryPartialResynchronization() will at least try to use PSYNC
3221 * to start a full resynchronization so that we get the master replid
3222 * and the global offset, to try a partial resync at the next
3223 * reconnection attempt. */
3224 if (server.repl_state == REPL_STATE_SEND_PSYNC) {
3225 if (slaveTryPartialResynchronization(conn,0) == PSYNC_WRITE_ERROR) {
3226 err = sdsnew("Write error sending the PSYNC command.");
3227 abortFailover("Write error to failover target");
3228 goto write_error;
3229 }
3230 server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY;
3231 return;
3232 }
3233
3234 /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC_REPLY. */
3235 if (server.repl_state != REPL_STATE_RECEIVE_PSYNC_REPLY) {
3236 serverLog(LL_WARNING,"syncWithMaster(): state machine error, "
3237 "state should be RECEIVE_PSYNC_REPLY but is %d",
3238 server.repl_state);
3239 goto error;
3240 }
3241
3242 psync_result = slaveTryPartialResynchronization(conn,1);
3243 if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */
3244
3245 /* Check the status of the planned failover. We expect PSYNC_CONTINUE,
3246 * but there is nothing technically wrong with a full resync which
3247 * could happen in edge cases. */
3248 if (server.failover_state == FAILOVER_IN_PROGRESS) {
3249 if (psync_result == PSYNC_CONTINUE ||
3250 psync_result == PSYNC_FULLRESYNC ||
3251 psync_result == PSYNC_FULLRESYNC_RDBCHANNEL)
3252 {
3253 clearFailoverState();
3254 } else {
3255 abortFailover("Failover target rejected psync request");
3256 return;
3257 }
3258 }
3259
3260 /* If the master is in a transient error, we should try to PSYNC
3261 * from scratch later, so go to the error path. This happens when
3262 * the server is loading the dataset or is not connected with its
3263 * master and so forth. */
3264 if (psync_result == PSYNC_TRY_LATER) goto error;
3265
3266 /* Note: if PSYNC does not return WAIT_REPLY, it will take care of
3267 * uninstalling the read handler from the file descriptor. */
3268
3269 if (psync_result == PSYNC_CONTINUE) {
3270 serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization.");
3271 if (server.supervised_mode == SUPERVISED_SYSTEMD) {
3272 redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Partial Resynchronization accepted. Ready to accept connections in read-write mode.\n");
3273 }
3274 return;
3275 }
3276
3277 /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
3278 * and the server.master_replid and master_initial_offset are
3279 * already populated. */
3280 if (psync_result == PSYNC_NOT_SUPPORTED) {
3281 serverLog(LL_NOTICE,"Retrying with SYNC...");
3282 if (connSyncWrite(conn,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
3283 serverLog(LL_WARNING,"I/O error writing to MASTER: %s",
3284 connGetLastError(conn));
3285 goto error;
3286 }
3287 }
3288
3289 /* Prepare a suitable temp file for bulk transfer */
3290 if (!useDisklessLoad()) {
3291 while(maxtries--) {
3292 snprintf(tmpfile,256,
3293 "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
3294 dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
3295 if (dfd != -1) break;
3296 sleep(1);
3297 }
3298 if (dfd == -1) {
3299 serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno));
3300 goto error;
3301 }
3302 server.repl_transfer_tmpfile = zstrdup(tmpfile);
3303 server.repl_transfer_fd = dfd;
3304 }
3305
3306 server.repl_transfer_size = -1;
3307 server.repl_transfer_read = 0;
3308 server.repl_transfer_last_fsync_off = 0;
3309 server.repl_transfer_lastio = server.unixtime;
3310
3311 /* Using rdb channel replication, the master responded +RDBCHANNELSYNC.
3312 * We need to initialize the RDB channel. */
3313 if (psync_result == PSYNC_FULLRESYNC_RDBCHANNEL) {
3314 /* Create RDB connection */
3315 server.repl_rdb_transfer_s = connCreate(server.el, connTypeOfReplication());
3316 if (connConnect(server.repl_rdb_transfer_s, server.masterhost,
3317 server.masterport, server.bind_source_addr,
3318 rdbChannelFullSyncWithMaster) == C_ERR) {
3319 serverLog(LL_WARNING, "Unable to connect to master: %s", connGetLastError(server.repl_rdb_transfer_s));
3320 goto error;
3321 }
3322 server.repl_rdb_ch_state = REPL_RDB_CH_SEND_HANDSHAKE;
3323 connSetReadHandler(server.repl_transfer_s, NULL);
3324 return;
3325 }
3326
3327 /* Setup the non blocking download of the bulk file. */
3328 if (connSetReadHandler(conn, readSyncBulkPayload)
3329 == C_ERR)
3330 {
3331 char conninfo[CONN_INFO_LEN];
3332 serverLog(LL_WARNING,
3333 "Can't create readable event for SYNC: %s (%s)",
3334 strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo)));
3335 goto error;
3336 }
3337
3338 server.repl_state = REPL_STATE_TRANSFER;
3339 return;
3340
3341no_response_error: /* Handle receiveSynchronousResponse() error when master has no reply */
3342 serverLog(LL_WARNING, "Master did not respond to command during SYNC handshake");
3343 /* Fall through to regular error handling */
3344
3345error:
3346 if (dfd != -1) close(dfd);
3347 connClose(conn);
3348 if (server.repl_rdb_transfer_s)
3349 connClose(server.repl_rdb_transfer_s);
3350 server.repl_rdb_transfer_s = NULL;
3351 server.repl_transfer_s = NULL;
3352 if (server.repl_transfer_fd != -1)
3353 close(server.repl_transfer_fd);
3354 if (server.repl_transfer_tmpfile)
3355 zfree(server.repl_transfer_tmpfile);
3356 server.repl_transfer_tmpfile = NULL;
3357 server.repl_transfer_fd = -1;
3358 server.repl_state = REPL_STATE_CONNECT;
3359 return;
3360
3361write_error: /* Handle sendCommand() errors. */
3362 serverLog(LL_WARNING,"Sending command to master in replication handshake: %s", err);
3363 sdsfree(err);
3364 goto error;
3365}
3366
3367int connectWithMaster(void) {
3368 server.repl_current_sync_attempts++;
3369 server.repl_total_sync_attempts++;
3370 server.repl_transfer_s = connCreate(server.el, connTypeOfReplication());
3371 if (connConnect(server.repl_transfer_s, server.masterhost, server.masterport,
3372 server.bind_source_addr, syncWithMaster) == C_ERR) {
3373 serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
3374 connGetLastError(server.repl_transfer_s));
3375 connClose(server.repl_transfer_s);
3376 server.repl_transfer_s = NULL;
3377 return C_ERR;
3378 }
3379
3380
3381 server.repl_transfer_lastio = server.unixtime;
3382 server.repl_state = REPL_STATE_CONNECTING;
3383 serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
3384 return C_OK;
3385}
3386
3387/* This function can be called when a non blocking connection is currently
3388 * in progress to undo it.
3389 * Never call this function directly, use cancelReplicationHandshake() instead.
3390 */
3391void undoConnectWithMaster(void) {
3392 connClose(server.repl_transfer_s);
3393 server.repl_transfer_s = NULL;
3394}
3395
3396/* Abort the async download of the bulk dataset while SYNC-ing with master.
3397 * Never call this function directly, use cancelReplicationHandshake() instead.
3398 */
3399void replicationAbortSyncTransfer(void) {
3400 serverAssert(server.repl_state == REPL_STATE_TRANSFER);
3401 undoConnectWithMaster();
3402 if (server.repl_disconnect_start_time == 0)
3403 server.repl_disconnect_start_time = server.unixtime;
3404 if (server.repl_transfer_fd!=-1) {
3405 close(server.repl_transfer_fd);
3406 bg_unlink(server.repl_transfer_tmpfile);
3407 zfree(server.repl_transfer_tmpfile);
3408 server.repl_transfer_tmpfile = NULL;
3409 server.repl_transfer_fd = -1;
3410 }
3411}
3412
3413/* This function aborts a non blocking replication attempt if there is one
3414 * in progress, by canceling the non-blocking connect attempt or
3415 * the initial bulk transfer.
3416 *
3417 * If there was a replication handshake in progress 1 is returned and
3418 * the replication state (server.repl_state) set to REPL_STATE_CONNECT.
3419 *
3420 * Otherwise zero is returned and no operation is performed at all. */
3421int cancelReplicationHandshake(int reconnect) {
3422 if (rdbChannelAbort() != C_OK)
3423 return 1;
3424
3425 if (server.repl_state == REPL_STATE_TRANSFER) {
3426 replicationAbortSyncTransfer();
3427 server.repl_state = REPL_STATE_CONNECT;
3428 } else if (server.repl_state == REPL_STATE_CONNECTING ||
3429 slaveIsInHandshakeState())
3430 {
3431 undoConnectWithMaster();
3432 server.repl_state = REPL_STATE_CONNECT;
3433 } else {
3434 return 0;
3435 }
3436
3437 if (!reconnect)
3438 return 1;
3439
3440 /* try to re-connect without waiting for replicationCron, this is needed
3441 * for the "diskless loading short read" test. */
3442 serverLog(LL_NOTICE,"Reconnecting to MASTER %s:%d after failure",
3443 server.masterhost, server.masterport);
3444 connectWithMaster();
3445
3446 return 1;
3447}
3448
3449/* Set replication to the specified master address and port. */
3450void replicationSetMaster(char *ip, int port) {
3451 int was_master = server.masterhost == NULL;
3452
3453 sdsfree(server.masterhost);
3454 server.masterhost = NULL;
3455 if (server.master) {
3456 freeClient(server.master);
3457 }
3458 disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */
3459
3460 /* Setting masterhost only after the call to freeClient since it calls
3461 * replicationHandleMasterDisconnection which can trigger a re-connect
3462 * directly from within that call. */
3463 server.masterhost = sdsnew(ip);
3464 server.masterport = port;
3465
3466 /* Update oom_score_adj */
3467 setOOMScoreAdj(-1);
3468
3469 /* Here we don't disconnect with replicas, since they may hopefully be able
3470 * to partially resync with us. We will disconnect with replicas and force
3471 * them to resync with us when changing replid on partially resync with new
3472 * master, or finishing transferring RDB and preparing loading DB on full
3473 * sync with new master. */
3474
3475 cancelReplicationHandshake(0);
3476 /* Before destroying our master state, create a cached master using
3477 * our own parameters, to later PSYNC with the new master. */
3478 if (was_master) {
3479 replicationDiscardCachedMaster();
3480 replicationCacheMasterUsingMyself();
3481 }
3482
3483 /* Fire the role change modules event. */
3484 moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
3485 REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA,
3486 NULL);
3487
3488 /* Fire the master link modules event. */
3489 if (server.repl_state == REPL_STATE_CONNECTED)
3490 moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
3491 REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
3492 NULL);
3493
3494 server.repl_state = REPL_STATE_CONNECT;
3495 server.repl_current_sync_attempts = 0;
3496 server.repl_total_sync_attempts = 0;
3497 serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
3498 server.masterhost, server.masterport);
3499 connectWithMaster();
3500}
3501
3502/* Cancel replication, setting the instance as a master itself. */
3503void replicationUnsetMaster(void) {
3504 if (server.masterhost == NULL) return; /* Nothing to do. */
3505
3506 /* Fire the master link modules event. */
3507 if (server.repl_state == REPL_STATE_CONNECTED)
3508 moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
3509 REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
3510 NULL);
3511
3512 /* Clear masterhost first, since the freeClient calls
3513 * replicationHandleMasterDisconnection which can attempt to re-connect. */
3514 sdsfree(server.masterhost);
3515 server.masterhost = NULL;
3516 if (server.master) freeClient(server.master);
3517 replicationDiscardCachedMaster();
3518 cancelReplicationHandshake(0);
3519 /* When a slave is turned into a master, the current replication ID
3520 * (that was inherited from the master at synchronization time) is
3521 * used as secondary ID up to the current offset, and a new replication
3522 * ID is created to continue with a new replication history. */
3523 shiftReplicationId();
3524 /* Disconnecting all the slaves is required: we need to inform slaves
3525 * of the replication ID change (see shiftReplicationId() call). However
3526 * the slaves will be able to partially resync with us, so it will be
3527 * a very fast reconnection. */
3528 disconnectSlaves();
3529 server.repl_state = REPL_STATE_NONE;
3530 /* Reset the attempts number. */
3531 server.repl_current_sync_attempts = 0;
3532 server.repl_total_sync_attempts = 0;
3533 /* We need to make sure the new master will start the replication stream
3534 * with a SELECT statement. This is forced after a full resync, but
3535 * with PSYNC version 2, there is no need for full resync after a
3536 * master switch. */
3537 server.slaveseldb = -1;
3538
3539 /* Update oom_score_adj */
3540 setOOMScoreAdj(-1);
3541
3542 /* Once we turn from slave to master, we consider the starting time without
3543 * slaves (that is used to count the replication backlog time to live) as
3544 * starting from now. Otherwise the backlog will be freed after a
3545 * failover if slaves do not connect immediately. */
3546 server.repl_no_slaves_since = server.unixtime;
3547
3548 /* Reset up and down time so it'll be ready for when we turn into replica again. */
3549 server.repl_down_since = 0;
3550 server.repl_up_since = 0;
3551 /* Fire the role change modules event. */
3552 moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
3553 REDISMODULE_EVENT_REPLROLECHANGED_NOW_MASTER,
3554 NULL);
3555
3556 /* Restart the AOF subsystem in case we shut it down during a sync when
3557 * we were still a slave. */
3558 if (server.aof_enabled && server.aof_state == AOF_OFF) {
3559 serverLog(LL_NOTICE, "Restarting AOF after becoming master");
3560 startAppendOnlyWithRetry();
3561 }
3562}
3563
3564/* This function is called when the slave lose the connection with the
3565 * master into an unexpected way. */
3566void replicationHandleMasterDisconnection(void) {
3567 /* Fire the master link modules event. */
3568 if (server.repl_state == REPL_STATE_CONNECTED)
3569 moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
3570 REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
3571 NULL);
3572
3573 server.master = NULL;
3574 if (server.repl_state == REPL_STATE_CONNECTED)
3575 server.repl_current_sync_attempts = 0;
3576 server.repl_state = REPL_STATE_CONNECT;
3577 server.repl_down_since = server.unixtime;
3578 server.repl_up_since = 0;
3579 server.repl_num_master_disconnection++;
3580
3581 /* If we are in the loop of streaming accumulated buffers, discard the
3582 * buffer and clean up the rdbchannel state. The outer loop will abort once
3583 * it detects that the master client has been disconnected. For details,
3584 * see rdbChannelStreamReplDataToDb() */
3585 if (server.repl_main_ch_state & REPL_MAIN_CH_STREAMING_BUF)
3586 rdbChannelCleanup();
3587
3588 if (server.repl_disconnect_start_time == 0)
3589 server.repl_disconnect_start_time = server.unixtime;
3590 /* We lost connection with our master, don't disconnect slaves yet,
3591 * maybe we'll be able to PSYNC with our master later. We'll disconnect
3592 * the slaves only if we'll have to do a full resync with our master. */
3593
3594 /* Try to re-connect immediately rather than wait for replicationCron
3595 * waiting 1 second may risk backlog being recycled. */
3596 if (server.masterhost) {
3597 serverLog(LL_NOTICE,"Reconnecting to MASTER %s:%d",
3598 server.masterhost, server.masterport);
3599 connectWithMaster();
3600 }
3601}
3602
3603/* Rdb channel for full sync
3604 *
3605 * - During a full sync, when master is delivering RDB to the replica, incoming
3606 * write commands are kept in a replication buffer in order to be sent to the
3607 * replica once RDB delivery is completed. If RDB delivery takes a long time,
3608 * it might create memory pressure on master. Also, once a replica connection
3609 * accumulates replication data which is larger than output buffer limits,
3610 * master will kill replica connection. This may cause a replication failure.
3611 *
3612 * The main benefit of the rdb channel replication is streaming incoming
3613 * commands in parallel to the RDB delivery. This approach shifts replication
3614 * stream buffering to the replica and reduces load on master. We do this by
3615 * opening another connection for RDB delivery. The main channel on replica
3616 * will be receiving replication stream while rdb channel is receiving the RDB.
3617 *
3618 * This feature also helps to reduce master's main process CPU load. By
3619 * opening a dedicated connection for the RDB transfer, the bgsave process has
3620 * direct access to the new connection and it will stream RDB directly to the
3621 * replicas. Before this change, due to TLS connection restriction, the bgsave
3622 * process was writing RDB bytes to a pipe and the main process was forwarding
3623 * it to the replica. This is no longer necessary, the main process can avoid
3624 * these expensive socket read/write syscalls.
3625 *
3626 * Implementation
3627 * - When replica connects to the master, it sends 'rdb-channel-repl' as part
3628 * of capability exchange to let the master know the replica supports rdb channel.
3629 * - When replica lacks sufficient data for PSYNC, master sends +RDBCHANNELSYNC
3630 * reply with replica's client id. As the next step, the replica opens a new
3631 * connection (rdb-channel) and configures it against the master with the
3632 * appropriate capabilities and requirements. It also sends given client id
3633 * back to master over rdbchannel so that master can associate these
3634 * channels (initial replica connection will be referred as main-channel)
3635 * Then, replica requests fullsync using the RDB channel.
3636 * - Prior to forking, master attaches the replica's main channel to the
3637 * replication backlog to deliver replication stream starting at the snapshot
3638 * end offset.
3639 * - The master main process sends replication stream via the main channel,
3640 * while the bgsave process sends the RDB directly to the replica via the
3641 * rdb-channel. Replica accumulates replication stream in a local buffer,
3642 * while the RDB is being loaded into the memory.
3643 * - Once the replica completes loading the rdb, it drops the rdb channel and
3644 * streams the accumulated replication stream into the db. Sync is completed.
3645 *
3646 * * Replica state machine *
3647 *
3648 * Main channel state
3649 * ┌───────────────────┐
3650 * │RECEIVE_PING_REPLY │
3651 * └────────┬──────────┘
3652 * │ +PONG
3653 * ┌────────▼──────────┐
3654 * │SEND_HANDSHAKE │ RDB channel state
3655 * └────────┬──────────┘ ┌───────────────────────────────┐
3656 * │+OK ┌───► RDB_CH_SEND_HANDSHAKE │
3657 * ┌────────▼──────────┐ │ └──────────────┬────────────────┘
3658 * │RECEIVE_AUTH_REPLY │ │ REPLCONF main-ch-client-id <clientid>
3659 * └────────┬──────────┘ │ ┌──────────────▼────────────────┐
3660 * │+OK │ │ RDB_CH_RECEIVE_AUTH_REPLY │
3661 * ┌────────▼──────────┐ │ └──────────────┬────────────────┘
3662 * │RECEIVE_PORT_REPLY │ │ │ +OK
3663 * └────────┬──────────┘ │ ┌──────────────▼────────────────┐
3664 * │+OK │ │ RDB_CH_RECEIVE_REPLCONF_REPLY│
3665 * ┌────────▼──────────┐ │ └──────────────┬────────────────┘
3666 * │RECEIVE_IP_REPLY │ │ │ +OK
3667 * └────────┬──────────┘ │ ┌──────────────▼────────────────┐
3668 * │+OK │ │ RDB_CH_RECEIVE_FULLRESYNC │
3669 * ┌────────▼──────────┐ │ └──────────────┬────────────────┘
3670 * │RECEIVE_CAPA_REPLY │ │ │+FULLRESYNC
3671 * └────────┬──────────┘ │ │Rdb delivery
3672 * │ │ ┌──────────────▼────────────────┐
3673 * ┌────────▼──────────┐ │ │ RDB_CH_RDB_LOADING │
3674 * │SEND_PSYNC │ │ └──────────────┬────────────────┘
3675 * └─┬─────────────────┘ │ │ Done loading
3676 * │PSYNC (use cached-master) │ │
3677 * ┌─▼─────────────────┐ │ │
3678 * │RECEIVE_PSYNC_REPLY│ │ ┌────────────►│ Replica streams replication
3679 * └─┬─────────────────┘ │ │ │ buffer into memory
3680 * │ │ │ │
3681 * │+RDBCHANNELSYNC client-id │ │ │
3682 * ├──────┬───────────────────┘ │ │
3683 * │ │ Main channel │ │
3684 * │ │ accumulates repl data │ │
3685 * │ ┌──▼────────────────┐ │ ┌───────▼───────────┐
3686 * │ │ REPL_TRANSFER ├───────┘ │ CONNECTED │
3687 * │ └───────────────────┘ └────▲───▲──────────┘
3688 * │ │ │
3689 * │ │ │
3690 * │ +FULLRESYNC ┌───────────────────┐ │ │
3691 * ├────────────────► REPL_TRANSFER ├────┘ │
3692 * │ └───────────────────┘ │
3693 * │ +CONTINUE │
3694 * └──────────────────────────────────────────────┘
3695 */
3696
3697/* Replication: Replica side. */
3698static int rdbChannelSendHandshake(connection *conn, sds *err) {
3699 /* AUTH with the master if required. */
3700 if (server.masterauth) {
3701 char *args[] = {"AUTH", NULL, NULL};
3702 size_t lens[] = {4, 0, 0};
3703 int argc = 1;
3704 if (server.masteruser) {
3705 args[argc] = server.masteruser;
3706 lens[argc] = strlen(server.masteruser);
3707 argc++;
3708 }
3709 args[argc] = server.masterauth;
3710 lens[argc] = sdslen(server.masterauth);
3711 argc++;
3712 *err = sendCommandArgv(conn, argc, args, lens);
3713 if (*err) {
3714 serverLog(LL_WARNING, "Error sending AUTH to master in rdb channel replication handshake: %s", *err);
3715 return C_ERR;
3716 }
3717 }
3718
3719 char buf[LONG_STR_SIZE];
3720 slaveGetPortStr(buf, sizeof(buf));
3721
3722 char cid[LONG_STR_SIZE];
3723 ull2string(cid, sizeof(cid), server.repl_main_ch_client_id);
3724
3725 *err = sendCommand(conn, "REPLCONF", "capa", "eof", "rdb-only", "1",
3726 "rdb-channel", "1", "main-ch-client-id", cid,
3727 "listening-port", buf, NULL);
3728 if (*err) {
3729 serverLog(LL_WARNING, "Error sending REPLCONF command to master in rdb channel handshake: %s", *err);
3730 return C_ERR;
3731 }
3732
3733 if (connSetReadHandler(conn, rdbChannelFullSyncWithMaster) == C_ERR) {
3734 char conninfo[CONN_INFO_LEN];
3735 serverLog(LL_WARNING, "Can't create readable event for SYNC: %s (%s)",
3736 strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo)));
3737 return C_ERR;
3738 }
3739 return C_OK;
3740}
3741
3742/* Replication: Replica side. */
3743static int rdbChannelHandleAuthReply(connection *conn, sds *err) {
3744 *err = receiveSynchronousResponse(conn);
3745 if (*err == NULL) {
3746 serverLog(LL_WARNING, "Master did not respond to auth command during rdb channel handshake");
3747 return C_ERR;
3748 }
3749 if ((*err)[0] == '-') {
3750 serverLog(LL_WARNING, "Unable to AUTH to master: %s", *err);
3751 return C_ERR;
3752 }
3753 server.repl_rdb_ch_state = REPL_RDB_CH_RECEIVE_REPLCONF_REPLY;
3754 return C_OK;
3755}
3756
3757/* Replication: Replica side. */
3758static int rdbChannelHandleReplconfReply(connection *conn, sds *err) {
3759 *err = receiveSynchronousResponse(conn);
3760 if (*err == NULL) {
3761 serverLog(LL_WARNING, "Master did not respond to replconf command during rdb channel handshake");
3762 return C_ERR;
3763 }
3764 if (*err[0] == '-') {
3765 serverLog(LL_WARNING, "Master replied error to replconf: %s", *err);
3766 return C_ERR;
3767 }
3768 sdsfree(*err);
3769
3770 if (server.repl_debug_pause & REPL_DEBUG_BEFORE_RDB_CHANNEL)
3771 debugPauseProcess();
3772
3773 /* Request rdb from master */
3774 *err = sendCommand(conn, "PSYNC", "?", "-1", NULL);
3775 if (*err) {
3776 serverLog(LL_WARNING, "I/O error writing to Master: %s", *err);
3777 return C_ERR;
3778 }
3779
3780 return C_OK;
3781}
3782
3783/* Replication: Replica side. */
3784static int rdbChannelHandleFullresyncReply(connection *conn, sds *err) {
3785 char *replid = NULL, *offset = NULL;
3786
3787 *err = receiveSynchronousResponse(conn);
3788 if (*err == NULL)
3789 return C_ERR;
3790
3791 if (*err[0] == '\0') {
3792 /* Retry again later */
3793 serverLog(LL_DEBUG, "Received empty psync reply");
3794 return C_RETRY;
3795 }
3796
3797 /* FULL RESYNC, parse the reply in order to extract the replid
3798 * and the replication offset. */
3799 replid = strchr(*err,' ');
3800 if (replid) {
3801 replid++;
3802 offset = strchr(replid, ' ');
3803 if (offset) offset++;
3804 }
3805 if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) {
3806 serverLog(LL_WARNING, "Received unexpected psync reply: %s", *err);
3807 return C_ERR;
3808 }
3809 memcpy(server.master_replid, replid, offset-replid-1);
3810 server.master_replid[CONFIG_RUN_ID_SIZE] = '\0';
3811 server.master_initial_offset = strtoll(offset,NULL,10);
3812
3813 /* Prepare the main and rdb channels for rdb and repl stream delivery.*/
3814 server.repl_state = REPL_STATE_TRANSFER;
3815 rdbChannelReplDataBufInit();
3816
3817 serverLog(LL_NOTICE, "Starting to receive RDB and replication stream in parallel.");
3818
3819 /* Setup connection to accumulate repl data. */
3820 server.repl_main_ch_state = REPL_MAIN_CH_ACCUMULATE_BUF;
3821 if (connSetReadHandler(server.repl_transfer_s,
3822 rdbChannelBufferReplData) != C_OK)
3823 {
3824 serverLog(LL_WARNING, "Can't set read handler for main channel: %s",
3825 strerror(errno));
3826 return C_ERR;
3827 }
3828
3829 /* Prepare RDB channel connection for RDB download. */
3830 if (connSetReadHandler(server.repl_rdb_transfer_s,
3831 readSyncBulkPayload) != C_OK)
3832 {
3833 char inf[CONN_INFO_LEN];
3834 serverLog(LL_WARNING,
3835 "Can't create readable event for rdb channel connection: %s (%s)",
3836 strerror(errno),
3837 connGetInfo(server.repl_rdb_transfer_s, inf, sizeof(inf)));
3838 return C_ERR;
3839 }
3840
3841 return C_OK;
3842}
3843
3844/* Replication: Replica side.
3845 * This connection handler is used to initialize the RDB channel connection.*/
3846static void rdbChannelFullSyncWithMaster(connection *conn) {
3847 int ret = 0;
3848 char *err = NULL;
3849 serverAssert(conn == server.repl_rdb_transfer_s);
3850
3851 /* Check for errors in the socket: after a non blocking connect() we
3852 * may find that the socket is in error state. */
3853 if (connGetState(conn) != CONN_STATE_CONNECTED) {
3854 serverLog(LL_WARNING, "Error condition on socket for rdb channel replication: %s",
3855 connGetLastError(conn));
3856 goto error;
3857 }
3858 switch (server.repl_rdb_ch_state) {
3859 case REPL_RDB_CH_SEND_HANDSHAKE:
3860 ret = rdbChannelSendHandshake(conn, &err);
3861 if (ret == C_OK)
3862 server.repl_rdb_ch_state = REPL_RDB_CH_RECEIVE_AUTH_REPLY;
3863 break;
3864 case REPL_RDB_CH_RECEIVE_AUTH_REPLY:
3865 if (server.masterauth) {
3866 ret = rdbChannelHandleAuthReply(conn, &err);
3867 if (ret == C_OK)
3868 server.repl_rdb_ch_state = REPL_RDB_CH_RECEIVE_REPLCONF_REPLY;
3869 /* Wait for next bulk before trying to read replconf reply. */
3870 break;
3871 }
3872 server.repl_rdb_ch_state = REPL_RDB_CH_RECEIVE_REPLCONF_REPLY;
3873 /* fall through */
3874 case REPL_RDB_CH_RECEIVE_REPLCONF_REPLY:
3875 ret = rdbChannelHandleReplconfReply(conn, &err);
3876 if (ret == C_OK)
3877 server.repl_rdb_ch_state = REPL_RDB_CH_RECEIVE_FULLRESYNC;
3878 break;
3879 case REPL_RDB_CH_RECEIVE_FULLRESYNC:
3880 ret = rdbChannelHandleFullresyncReply(conn, &err);
3881 if (ret == C_OK)
3882 server.repl_rdb_ch_state = REPL_RDB_CH_RDB_LOADING;
3883 break;
3884 default:
3885 serverPanic("Unknown rdb channel state: %d", server.repl_rdb_ch_state);
3886 }
3887
3888 if (ret == C_ERR)
3889 goto error;
3890
3891 sdsfree(err);
3892 return;
3893
3894error:
3895 if (err) {
3896 serverLog(LL_WARNING, "rdb channel sync failed with error: %s", err);
3897 sdsfree(err);
3898 }
3899 if (server.repl_transfer_s) {
3900 connClose(server.repl_transfer_s);
3901 server.repl_transfer_s = NULL;
3902 }
3903 server.repl_state = REPL_STATE_CONNECT;
3904 rdbChannelAbort();
3905}
3906
3907void replDataBufInit(replDataBuf *buf) {
3908 serverAssert(buf->blocks == NULL);
3909 buf->size = 0;
3910 buf->used = 0;
3911 buf->last_num_blocks = 0;
3912 buf->mem_used = 0;
3913 buf->blocks = listCreate();
3914 buf->blocks->free = zfree;
3915}
3916
3917void replDataBufClear(replDataBuf *buf) {
3918 if (buf->blocks) listRelease(buf->blocks);
3919 buf->blocks = NULL;
3920 buf->size = 0;
3921 buf->used = 0;
3922 buf->last_num_blocks = 0;
3923 buf->mem_used = 0;
3924}
3925
3926/* Replication: Replica side.
3927 * Initialize replica's local replication buffer to accumulate repl stream
3928 * during rdb channel sync. */
3929static void rdbChannelReplDataBufInit(void) {
3930 replDataBufInit(&server.repl_full_sync_buffer);
3931}
3932
3933/* Replication: Replica side.
3934 * Clear replica's local replication buffer */
3935static void rdbChannelReplDataBufClear(void) {
3936 replDataBufClear(&server.repl_full_sync_buffer);
3937}
3938
3939/* Generic function to read data from connection into the last block. */
3940static int replDataBufReadIntoLastBlock(connection *conn, replDataBuf *buf,
3941 void (*error_handler)(connection *conn))
3942{
3943 atomicIncr(server.stat_io_reads_processed[IOTHREAD_MAIN_THREAD_ID], 1);
3944
3945 replDataBufBlock *block = listNodeValue(listLast(buf->blocks));
3946 serverAssert(block && block->size > block->used);
3947
3948 int nread = connRead(conn, block->buf + block->used, block->size - block->used);
3949 if (nread <= 0) {
3950 if (nread == 0 || connGetState(conn) != CONN_STATE_CONNECTED) {
3951 error_handler(conn);
3952 }
3953 return -1;
3954 }
3955
3956 block->used += nread;
3957 if (buf) buf->used += nread;
3958 atomicIncr(server.stat_net_repl_input_bytes, nread);
3959
3960 return nread;
3961}
3962
3963/* Generic function to read data from connection into a buffer. */
3964void replDataBufReadFromConn(connection *conn, replDataBuf *buf, void (*error_handler)(connection *conn)) {
3965 const int buflen = 1024 * 1024;
3966 const int minread = 16 * 1024;
3967 int nread = 0;
3968 int needs_read = 1;
3969
3970 listNode *ln = listLast(buf->blocks);
3971 replDataBufBlock *tail = ln ? listNodeValue(ln) : NULL;
3972
3973 /* Try to append last node. */
3974 if (tail && tail->size > tail->used) {
3975 nread = replDataBufReadIntoLastBlock(conn, buf, error_handler);
3976 if (nread <= 0)
3977 return;
3978
3979 /* If buffer is filled fully, there might be more data in socket buffer.
3980 * Only read again if we've read small amount (less than minread). */
3981 needs_read = (tail->size == tail->used) && nread < minread;
3982 }
3983
3984 if (needs_read) {
3985 unsigned long long limit;
3986 size_t usable_size;
3987
3988 /* For accumulation limit, if 'replica-full-sync-buffer-limit' is set,
3989 * we'll use it. Otherwise, 'client-output-buffer-limit <replica>' is
3990 * the limit.*/
3991 limit = server.repl_full_sync_buffer_limit;
3992 if (limit == 0)
3993 limit = server.client_obuf_limits[CLIENT_TYPE_SLAVE].hard_limit_bytes;
3994
3995 if (limit != 0 && buf->size > limit) {
3996 /* Currently this function is only used for replication and slots sync.
3997 * Log accordingly, maybe should be extendable in the future. */
3998 if (server.masterhost)
3999 serverLog(LL_NOTICE, "Replication buffer limit has been reached (%llu bytes), "
4000 "stopped buffering replication stream. Further accumulation may occur on master side.", limit);
4001 else
4002 serverLog(LL_NOTICE, "Slots sync buffer limit has been reached (%llu bytes), "
4003 "stopped buffering slots sync stream. Further accumulation may occur on source side.", limit);
4004
4005 connSetReadHandler(conn, NULL);
4006 return;
4007 }
4008
4009 tail = zmalloc_usable(buflen, &usable_size);
4010 tail->size = usable_size - sizeof(replDataBufBlock);
4011 tail->used = 0;
4012
4013 listAddNodeTail(buf->blocks, tail);
4014 buf->size += tail->size;
4015 buf->mem_used += usable_size + sizeof(listNode);
4016
4017 /* Update buffer's peak */
4018 if (buf->peak < buf->size)
4019 buf->peak = buf->size;
4020
4021 replDataBufReadIntoLastBlock(conn, buf, error_handler);
4022 }
4023}
4024
4025/* Replication: Replica side.
4026 * Main channel read error handler */
4027static void readReplBufferErrorHandler(connection *conn) {
4028 serverLog(LL_WARNING, "Main channel error while reading from master: %s",
4029 connGetLastError(conn));
4030 cancelReplicationHandshake(1);
4031}
4032
4033/* Replication: Replica side.
4034 * Read handler for buffering incoming repl data during RDB download/loading. */
4035static void rdbChannelBufferReplData(connection *conn) {
4036 replDataBuf *buf = &server.repl_full_sync_buffer;
4037
4038 if (server.repl_main_ch_state & REPL_MAIN_CH_STREAMING_BUF) {
4039 /* While streaming accumulated buffers, we continue reading from the
4040 * master to prevent accumulation on master side as much as possible.
4041 * However, we aim to drain buffer eventually. To ensure we consume more
4042 * than we read, we'll read at most one block after two blocks of
4043 * buffers are consumed. */
4044 if (listLength(buf->blocks) + 1 >= buf->last_num_blocks)
4045 return;
4046 buf->last_num_blocks = listLength(buf->blocks);
4047 }
4048
4049 replDataBufReadFromConn(conn, buf, readReplBufferErrorHandler);
4050}
4051
4052/* Generic function to stream replDataBuf data into database
4053 * Returns C_OK on success, C_ERR on error */
4054int replDataBufStreamToDb(replDataBuf *buf, replDataBufToDbCtx *ctx) {
4055 listNode *n;
4056 int ret = C_OK;
4057 client *c = ctx->client;
4058
4059 blockingOperationStarts();
4060 while ((n = listFirst(buf->blocks))) {
4061 replDataBufBlock *o = listNodeValue(n);
4062 listUnlinkNode(buf->blocks, n);
4063 zfree(n);
4064
4065 size_t processed = 0;
4066 while (processed < o->used) {
4067 size_t bytes = min(PROTO_IOBUF_LEN, o->used - processed);
4068 c->querybuf = sdscatlen(c->querybuf, &o->buf[processed], bytes);
4069 c->read_reploff += (long long int) bytes;
4070 c->lastinteraction = server.unixtime;
4071
4072 /* We don't expect error return value but just in case. */
4073 ret = processInputBuffer(c);
4074 if (ret != C_OK) break;
4075
4076 processed += bytes;
4077 buf->used -= bytes;
4078
4079 if (server.repl_debug_pause & REPL_DEBUG_ON_STREAMING_REPL_BUF)
4080 debugPauseProcess();
4081
4082 /* Check if we should yield back to the event loop */
4083 if (server.loading_process_events_interval_bytes &&
4084 ((ctx->applied_offset + bytes) / server.loading_process_events_interval_bytes >
4085 ctx->applied_offset / server.loading_process_events_interval_bytes))
4086 {
4087 ctx->yield_callback(ctx);
4088 processEventsWhileBlocked();
4089 }
4090 ctx->applied_offset += bytes;
4091
4092 /* Check if we should continue processing */
4093 if (!ctx->should_continue(ctx)) {
4094 ret = C_ERR;
4095 break;
4096 }
4097
4098 /* Streaming buffer into the database more slowly is useful in order
4099 * to test certain edge cases. */
4100 if (server.key_load_delay) debugDelay(server.key_load_delay);
4101 }
4102 size_t size = o->size;
4103 zfree(o);
4104
4105 /* Break the loop if there is an error. */
4106 if (ret != C_OK) break;
4107
4108 /* Update stats */
4109 buf->size -= size;
4110 buf->mem_used -= (size + sizeof(listNode) + sizeof(replDataBufBlock));
4111 }
4112 blockingOperationEnds();
4113
4114 return ret;
4115}
4116
/* Replication: Replica side.
 * Yield callback used while streaming the replDataBuf into the database:
 * sends a newline to the master. The context argument is not used. */
static void rdbChannelStreamYieldCallback(void *ctx) {
    (void) ctx;
    replicationSendNewlineToMaster();
}
4123
4124/* Replication: Replica side.
4125 * Global variable to track number of master disconnection.
4126 * Used to detect master disconnection when streaming replDataBuf to database */
4127static uint64_t ReplNumMasterDisconnection = 0;
4128
4129/* Replication: Replica side.
4130 * Check if we should continue streaming replDataBuf to database */
4131static int rdbChannelStreamShouldContinue(void *ctx) {
4132 replDataBufToDbCtx *context = ctx;
4133
4134 /* Check if master client was freed in processEventsWhileBlocked().
4135 * It can happen if we receive 'replicaof' command or 'client kill'
4136 * command for the master. */
4137 if (ReplNumMasterDisconnection != server.repl_num_master_disconnection ||
4138 !server.repl_full_sync_buffer.blocks ||
4139 context->client->flags & CLIENT_CLOSE_ASAP)
4140 {
4141 return 0;
4142 }
4143 return 1;
4144}
4145
4146/* Replication: Replica side.
4147 * Streams accumulated replication data into the database. */
4148static void rdbChannelStreamReplDataToDb(void) {
4149 int ret = C_OK, close_asap = 0;
4150 client *c = server.master;
4151
4152 /* Save repl_num_master_disconnection to figure out if master gets
4153 * disconnected when we yield back to processEventsWhileBlocked() */
4154 ReplNumMasterDisconnection = server.repl_num_master_disconnection;
4155
4156 server.repl_main_ch_state |= REPL_MAIN_CH_STREAMING_BUF;
4157 serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Starting to stream replication buffer into the db"
4158 " (%zu bytes).", server.repl_full_sync_buffer.used);
4159 if (!server.repl_full_sync_buffer.blocks)
4160 goto out;
4161
4162 /* Mark the peek buffer block count. We'll use it to verify we consume
4163 * faster than we read from the master. */
4164 server.repl_full_sync_buffer.last_num_blocks = listLength(server.repl_full_sync_buffer.blocks);
4165 /* Set read handler to continue accumulating during streaming */
4166 connSetReadHandler(c->conn, rdbChannelBufferReplData);
4167
4168 replDataBufToDbCtx ctx = {
4169 .client = c,
4170 .applied_offset = 0,
4171 .should_continue = rdbChannelStreamShouldContinue,
4172 .yield_callback = rdbChannelStreamYieldCallback,
4173 };
4174
4175 ret = replDataBufStreamToDb(&server.repl_full_sync_buffer, &ctx);
4176
4177out:
4178 /* If main channel state is CLOSE_ASAP, it means main channel faced a
4179 * problem while RDB is being loaded or while we are applying the
4180 * accumulated buffer. It stopped replication stream buffering. It's okay
4181 * though. We streamed whatever we have into the db, now we can free master
4182 * client and replica can try psync. */
4183 close_asap = (server.repl_main_ch_state & REPL_MAIN_CH_CLOSE_ASAP);
4184
4185 if (ret == C_OK) {
4186 serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Successfully streamed replication buffer into the db (%zu bytes in total)",
4187 ctx.applied_offset);
4188 /* Revert the read handler */
4189 if (!close_asap && connSetReadHandler(c->conn, readQueryFromClient) != C_OK) {
4190 serverLog(LL_WARNING,
4191 "Can't create readable event for master client: %s",
4192 strerror(errno));
4193 close_asap = 1;
4194 }
4195 } else {
4196 serverLog(LL_WARNING, "Master client was freed while streaming accumulated replication data to db.");
4197 close_asap = 1;
4198 }
4199
4200 /* If master is disconnected, state should have been cleaned up
4201 * already. Otherwise, we do it here. */
4202 if (ReplNumMasterDisconnection == server.repl_num_master_disconnection) {
4203 rdbChannelCleanup();
4204 if (server.master && close_asap)
4205 freeClient(server.master);
4206 }
4207}
4208
4209static void rdbChannelCleanup(void) {
4210 server.repl_rdb_ch_state = REPL_RDB_CH_STATE_NONE;
4211 server.repl_main_ch_state = REPL_MAIN_CH_NONE;
4212 rdbChannelReplDataBufClear();
4213}
4214
4215/* Replication: Replica side.
4216 * On rdb channel failure, close rdb-connection and reset state.
4217 * Return C_OK if cleanup is done. Otherwise, returns C_ERR which means cleanup
4218 * will be done asynchronously. */
4219static int rdbChannelAbort(void) {
4220 if (server.repl_rdb_ch_state == REPL_RDB_CH_STATE_NONE)
4221 return C_OK;
4222
4223 /* This function may also be called if a problem is detected on the main
4224 * channel. In this case, we handle the situation differently based on
4225 * the current state:
4226 * - If we started loading the RDB file and the RDB is disk-based, we mark
4227 * the main channel's state as CLOSE_ASAP and defer the failure handling
4228 * until after the RDB has been loaded. This way we allow the replica to
4229 * retry psync after the RDB is loaded.
4230 * - For diskless loading, we cannot safely free the rdb channel connection
4231 * object. Instead, we mark the RIO object as aborted so the next
4232 * rioRead() will fail safely.
4233 * - If the RDB has already been loaded, and we are streaming the
4234 * accumulated buffer to the database, we mark the main connection
4235 * as CLOSE_ASAP and wait until the accumulated buffer is drained.
4236 * Once done, the replica can attempt psync with the offset it has. */
4237 int async_cleanup = (server.repl_rdb_transfer_s && server.loading) ||
4238 (server.repl_main_ch_state & REPL_MAIN_CH_STREAMING_BUF);
4239 if (async_cleanup) {
4240 if (server.repl_rdb_transfer_s && server.loading) {
4241 serverLog(LL_NOTICE, "Aborting rdb channel sync while loading the RDB.");
4242
4243 if (disklessLoadingRio)
4244 /* Mark rio with abort flag, next rioRead() will return error.*/
4245 rioAbort(disklessLoadingRio);
4246 else {
4247 /* For disk based loading, we can wait until loading is done.
4248 * This way, replica will have a chance for a successful psync
4249 * later.*/
4250 serverLog(LL_NOTICE, "After loading RDB, replica will try psync with master.");
4251 }
4252 }
4253
4254 if (server.repl_transfer_s)
4255 connSetReadHandler(server.repl_transfer_s, NULL);
4256
4257 server.repl_main_ch_state |= REPL_MAIN_CH_CLOSE_ASAP;
4258 return C_ERR;
4259 }
4260
4261 serverLog(LL_NOTICE, "Aborting rdb channel sync");
4262
4263 if (server.repl_rdb_transfer_s) {
4264 connClose(server.repl_rdb_transfer_s);
4265 server.repl_rdb_transfer_s = NULL;
4266 }
4267 if (server.repl_transfer_fd != -1) {
4268 close(server.repl_transfer_fd);
4269 server.repl_transfer_fd = -1;
4270 }
4271 if (server.repl_transfer_tmpfile) {
4272 bg_unlink(server.repl_transfer_tmpfile);
4273 zfree(server.repl_transfer_tmpfile);
4274 server.repl_transfer_tmpfile = NULL;
4275 }
4276 rdbChannelCleanup();
4277 return C_OK;
4278}
4279
4280void replicaofCommand(client *c) {
4281 /* SLAVEOF is not allowed in cluster mode as replication is automatically
4282 * configured using the current address of the master node. */
4283 if (server.cluster_enabled) {
4284 addReplyError(c,"REPLICAOF not allowed in cluster mode.");
4285 return;
4286 }
4287
4288 if (server.failover_state != NO_FAILOVER) {
4289 addReplyError(c,"REPLICAOF not allowed while failing over.");
4290 return;
4291 }
4292
4293 /* The special host/port combination "NO" "ONE" turns the instance
4294 * into a master. Otherwise the new master address is set. */
4295 if (!strcasecmp(c->argv[1]->ptr,"no") &&
4296 !strcasecmp(c->argv[2]->ptr,"one")) {
4297 if (server.masterhost) {
4298 replicationUnsetMaster();
4299 sds client = catClientInfoString(sdsempty(),c);
4300 serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')",
4301 client);
4302 sdsfree(client);
4303 }
4304 } else {
4305 long port;
4306
4307 if (c->flags & CLIENT_SLAVE)
4308 {
4309 /* If a client is already a replica they cannot run this command,
4310 * because it involves flushing all replicas (including this
4311 * client) */
4312 addReplyError(c, "Command is not valid when client is a replica.");
4313 return;
4314 }
4315
4316 if (getRangeLongFromObjectOrReply(c, c->argv[2], 0, 65535, &port,
4317 "Invalid master port") != C_OK)
4318 return;
4319
4320 /* Check if we are already attached to the specified master */
4321 if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)
4322 && server.masterport == port) {
4323 serverLog(LL_NOTICE,"REPLICAOF would result into synchronization "
4324 "with the master we are already connected "
4325 "with. No operation performed.");
4326 addReplySds(c,sdsnew("+OK Already connected to specified "
4327 "master\r\n"));
4328 return;
4329 }
4330 /* There was no previous master or the user specified a different one,
4331 * we can continue. */
4332 replicationSetMaster(c->argv[1]->ptr, port);
4333 sds client = catClientInfoString(sdsempty(),c);
4334 serverLog(LL_NOTICE,"REPLICAOF %s:%d enabled (user request from '%s')",
4335 server.masterhost, server.masterport, client);
4336 sdsfree(client);
4337 }
4338 addReply(c,shared.ok);
4339}
4340
4341/* ROLE command: provide information about the role of the instance
4342 * (master or slave) and additional information related to replication
4343 * in an easy to process format. */
4344void roleCommand(client *c) {
4345 if (server.sentinel_mode) {
4346 sentinelRoleCommand(c);
4347 return;
4348 }
4349
4350 if (server.masterhost == NULL) {
4351 listIter li;
4352 listNode *ln;
4353 void *mbcount;
4354 int slaves = 0;
4355
4356 addReplyArrayLen(c,3);
4357 addReplyBulkCBuffer(c,"master",6);
4358 addReplyLongLong(c,server.master_repl_offset);
4359 mbcount = addReplyDeferredLen(c);
4360 listRewind(server.slaves,&li);
4361 while((ln = listNext(&li))) {
4362 client *slave = ln->value;
4363 char ip[NET_IP_STR_LEN], *slaveaddr = slave->slave_addr;
4364
4365 if (!slaveaddr) {
4366 if (connAddrPeerName(slave->conn,ip,sizeof(ip),NULL) == -1)
4367 continue;
4368 slaveaddr = ip;
4369 }
4370 if (slave->replstate != SLAVE_STATE_ONLINE) continue;
4371 addReplyArrayLen(c,3);
4372 addReplyBulkCString(c,slaveaddr);
4373 addReplyBulkLongLong(c,slave->slave_listening_port);
4374 addReplyBulkLongLong(c,slave->repl_ack_off);
4375 slaves++;
4376 }
4377 setDeferredArrayLen(c,mbcount,slaves);
4378 } else {
4379 char *slavestate = NULL;
4380
4381 addReplyArrayLen(c,5);
4382 addReplyBulkCBuffer(c,"slave",5);
4383 addReplyBulkCString(c,server.masterhost);
4384 addReplyLongLong(c,server.masterport);
4385 if (slaveIsInHandshakeState()) {
4386 slavestate = "handshake";
4387 } else {
4388 switch(server.repl_state) {
4389 case REPL_STATE_NONE: slavestate = "none"; break;
4390 case REPL_STATE_CONNECT: slavestate = "connect"; break;
4391 case REPL_STATE_CONNECTING: slavestate = "connecting"; break;
4392 case REPL_STATE_TRANSFER: slavestate = "sync"; break;
4393 case REPL_STATE_CONNECTED: slavestate = "connected"; break;
4394 default: slavestate = "unknown"; break;
4395 }
4396 }
4397 addReplyBulkCString(c,slavestate);
4398 addReplyLongLong(c,server.master ? server.master->reploff : -1);
4399 }
4400}
4401
4402/* Send a REPLCONF ACK command to the master to inform it about the current
4403 * processed offset. If we are not connected with a master, the command has
4404 * no effects. */
4405void replicationSendAck(void) {
4406 client *c = server.master;
4407
4408 if (c != NULL) {
4409 int send_fack = server.fsynced_reploff != -1;
4410 c->flags |= CLIENT_MASTER_FORCE_REPLY;
4411 addReplyArrayLen(c,send_fack ? 5 : 3);
4412 addReplyBulkCString(c,"REPLCONF");
4413 addReplyBulkCString(c,"ACK");
4414 addReplyBulkLongLong(c,c->reploff);
4415 if (send_fack) {
4416 addReplyBulkCString(c,"FACK");
4417 addReplyBulkLongLong(c,server.fsynced_reploff);
4418 }
4419 c->flags &= ~CLIENT_MASTER_FORCE_REPLY;
4420 /* Accumulation from above replies must be reset back to 0 manually,
4421 * as this subroutine does not invoke resetClient(). */
4422 c->net_output_bytes_curr_cmd = 0;
4423 }
4424}
4425
4426/* ---------------------- MASTER CACHING FOR PSYNC -------------------------- */
4427
4428/* In order to implement partial synchronization we need to be able to cache
4429 * our master's client structure after a transient disconnection.
4430 * It is cached into server.cached_master and flushed away using the following
4431 * functions. */
4432
4433/* This function is called by freeClient() in order to cache the master
4434 * client structure instead of destroying it. freeClient() will return
4435 * ASAP after this function returns, so every action needed to avoid problems
4436 * with a client that is really "suspended" has to be done by this function.
4437 *
4438 * The other functions that will deal with the cached master are:
4439 *
4440 * replicationDiscardCachedMaster() that will make sure to kill the client
4441 * as for some reason we don't want to use it in the future.
4442 *
4443 * replicationResurrectCachedMaster() that is used after a successful PSYNC
4444 * handshake in order to reactivate the cached master.
4445 */
4446void replicationCacheMaster(client *c) {
4447 serverAssert(server.master != NULL && server.cached_master == NULL);
4448 serverAssert(server.master->tid == IOTHREAD_MAIN_THREAD_ID);
4449 serverLog(LL_NOTICE,"Caching the disconnected master state.");
4450
4451 /* Unlink the client from the server structures. */
4452 unlinkClient(c);
4453
4454 /* Reset the master client so that's ready to accept new commands:
4455 * we want to discard the non processed query buffers and non processed
4456 * offsets, including pending transactions, already populated arguments,
4457 * pending outputs to the master. */
4458 sdsclear(server.master->querybuf);
4459 server.master->qb_pos = 0;
4460 server.master->repl_applied = 0;
4461 server.master->read_reploff = server.master->reploff;
4462 server.master->reploff_next = 0;
4463 if (c->flags & CLIENT_MULTI) discardTransaction(c);
4464 listEmpty(c->reply);
4465 c->sentlen = 0;
4466 c->reply_bytes = 0;
4467 c->bufpos = 0;
4468 resetClient(c, -1);
4469 resetClientQbufState(c);
4470
4471 /* Save the master. Server.master will be set to null later by
4472 * replicationHandleMasterDisconnection(). */
4473 server.cached_master = server.master;
4474
4475 /* Invalidate the Peer ID cache. */
4476 if (c->peerid) {
4477 sdsfree(c->peerid);
4478 c->peerid = NULL;
4479 }
4480 /* Invalidate the Sock Name cache. */
4481 if (c->sockname) {
4482 sdsfree(c->sockname);
4483 c->sockname = NULL;
4484 }
4485
4486 /* Caching the master happens instead of the actual freeClient() call,
4487 * so make sure to adjust the replication state. This function will
4488 * also set server.master to NULL. */
4489 replicationHandleMasterDisconnection();
4490}
4491
4492/* This function is called when a master is turned into a slave, in order to
4493 * create from scratch a cached master for the new client, that will allow
4494 * to PSYNC with the slave that was promoted as the new master after a
4495 * failover.
4496 *
4497 * Assuming this instance was previously the master instance of the new master,
4498 * the new master will accept its replication ID, and potential also the
4499 * current offset if no data was lost during the failover. So we use our
4500 * current replication ID and offset in order to synthesize a cached master. */
4501void replicationCacheMasterUsingMyself(void) {
4502 serverLog(LL_NOTICE,
4503 "Before turning into a replica, using my own master parameters "
4504 "to synthesize a cached master: I may be able to synchronize with "
4505 "the new master with just a partial transfer.");
4506
4507 /* This will be used to populate the field server.master->reploff
4508 * by replicationCreateMasterClient(). We'll later set the created
4509 * master as server.cached_master, so the replica will use such
4510 * offset for PSYNC. */
4511 server.master_initial_offset = server.master_repl_offset;
4512
4513 /* The master client we create can be set to any DBID, because
4514 * the new master will start its replication stream with SELECT. */
4515 replicationCreateMasterClient(NULL,-1);
4516
4517 /* Use our own ID / offset. */
4518 memcpy(server.master->replid, server.replid, sizeof(server.replid));
4519
4520 /* Set as cached master. */
4521 unlinkClient(server.master);
4522 server.cached_master = server.master;
4523 server.master = NULL;
4524}
4525
4526/* Free a cached master, called when there are no longer the conditions for
4527 * a partial resync on reconnection. */
4528void replicationDiscardCachedMaster(void) {
4529 if (server.cached_master == NULL) return;
4530
4531 serverLog(LL_NOTICE,"Discarding previously cached master state.");
4532 server.cached_master->flags &= ~CLIENT_MASTER;
4533 freeClient(server.cached_master);
4534 server.cached_master = NULL;
4535}
4536
4537/* Turn the cached master into the current master, using the file descriptor
4538 * passed as argument as the socket for the new master.
4539 *
4540 * This function is called when successfully setup a partial resynchronization
4541 * so the stream of data that we'll receive will start from where this
4542 * master left. */
4543void replicationResurrectCachedMaster(connection *conn) {
4544 serverAssert(server.cached_master->tid == IOTHREAD_MAIN_THREAD_ID);
4545
4546 server.master = server.cached_master;
4547 server.cached_master = NULL;
4548 server.master->conn = conn;
4549 connSetPrivateData(server.master->conn, server.master);
4550 server.master->flags &= ~(CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP);
4551 server.master->authenticated = 1;
4552 server.master->lastinteraction = server.unixtime;
4553 server.repl_state = REPL_STATE_CONNECTED;
4554 server.repl_down_since = 0;
4555 server.repl_up_since = server.unixtime;
4556 if (server.repl_disconnect_start_time != 0) {
4557 server.repl_total_disconnect_time += server.unixtime - server.repl_disconnect_start_time;
4558 server.repl_disconnect_start_time = 0;
4559 }
4560 /* Fire the master link modules event. */
4561 moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
4562 REDISMODULE_SUBEVENT_MASTER_LINK_UP,
4563 NULL);
4564
4565 /* Re-add to the list of clients. */
4566 linkClient(server.master);
4567 if (connSetReadHandler(server.master->conn, readQueryFromClient)) {
4568 serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno));
4569 freeClientAsync(server.master); /* Close ASAP. */
4570 }
4571
4572 /* We may also need to install the write handler as well if there is
4573 * pending data in the write buffers. */
4574 if (clientHasPendingReplies(server.master)) {
4575 if (connSetWriteHandler(server.master->conn, sendReplyToClient)) {
4576 serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno));
4577 freeClientAsync(server.master); /* Close ASAP. */
4578 }
4579 }
4580}
4581
4582/* ------------------------- MIN-SLAVES-TO-WRITE --------------------------- */
4583
4584/* This function counts the number of slaves with lag <= min-slaves-max-lag.
4585 * If the option is active, the server will prevent writes if there are not
4586 * enough connected slaves with the specified lag (or less). */
4587void refreshGoodSlavesCount(void) {
4588 listIter li;
4589 listNode *ln;
4590 int good = 0;
4591
4592 if (!server.repl_min_slaves_to_write ||
4593 !server.repl_min_slaves_max_lag) return;
4594
4595 listRewind(server.slaves,&li);
4596 while((ln = listNext(&li))) {
4597 client *slave = ln->value;
4598 time_t lag = server.unixtime - slave->repl_ack_time;
4599
4600 if (slave->replstate == SLAVE_STATE_ONLINE &&
4601 lag <= server.repl_min_slaves_max_lag) good++;
4602 }
4603 server.repl_good_slaves_count = good;
4604}
4605
4606/* return true if status of good replicas is OK. otherwise false */
4607int checkGoodReplicasStatus(void) {
4608 return server.masterhost || /* not a primary status should be OK */
4609 !server.repl_min_slaves_max_lag || /* Min slave max lag not configured */
4610 !server.repl_min_slaves_to_write || /* Min slave to write not configured */
4611 server.repl_good_slaves_count >= server.repl_min_slaves_to_write; /* check if we have enough slaves */
4612}
4613
4614/* ----------------------- SYNCHRONOUS REPLICATION --------------------------
4615 * Redis synchronous replication design can be summarized in points:
4616 *
4617 * - Redis masters have a global replication offset, used by PSYNC.
4618 * - Master increment the offset every time new commands are sent to slaves.
4619 * - Slaves ping back masters with the offset processed so far.
4620 *
4621 * So synchronous replication adds a new WAIT command in the form:
4622 *
4623 * WAIT <num_replicas> <milliseconds_timeout>
4624 *
4625 * That returns the number of replicas that processed the query when
4626 * we finally have at least num_replicas, or when the timeout was
4627 * reached.
4628 *
4629 * The command is implemented in this way:
4630 *
4631 * - Every time a client processes a command, we remember the replication
4632 * offset after sending that command to the slaves.
4633 * - When WAIT is called, we ask slaves to send an acknowledgement ASAP.
4634 * The client is blocked at the same time (see blocked.c).
4635 * - Once we receive enough ACKs for a given offset or when the timeout
4636 * is reached, the WAIT command is unblocked and the reply sent to the
4637 * client.
4638 */
4639
4640/* This just set a flag so that we broadcast a REPLCONF GETACK command
4641 * to all the slaves in the beforeSleep() function. Note that this way
4642 * we "group" all the clients that want to wait for synchronous replication
4643 * in a given event loop iteration, and send a single GETACK for them all. */
4644void replicationRequestAckFromSlaves(void) {
4645 server.get_ack_from_slaves = 1;
4646}
4647
4648/* Return the number of slaves that already acknowledged the specified
4649 * replication offset. */
4650int replicationCountAcksByOffset(long long offset) {
4651 listIter li;
4652 listNode *ln;
4653 int count = 0;
4654
4655 listRewind(server.slaves,&li);
4656 while((ln = listNext(&li))) {
4657 client *slave = ln->value;
4658
4659 if (slave->replstate != SLAVE_STATE_ONLINE) continue;
4660 if (slave->repl_ack_off >= offset) count++;
4661 }
4662 return count;
4663}
4664
4665/* Return the number of replicas that already acknowledged the specified
4666 * replication offset being AOF fsynced. */
4667int replicationCountAOFAcksByOffset(long long offset) {
4668 listIter li;
4669 listNode *ln;
4670 int count = 0;
4671
4672 listRewind(server.slaves,&li);
4673 while((ln = listNext(&li))) {
4674 client *slave = ln->value;
4675
4676 if (slave->replstate != SLAVE_STATE_ONLINE) continue;
4677 if (slave->repl_aof_off >= offset) count++;
4678 }
4679 return count;
4680}
4681
4682/* WAIT for N replicas to acknowledge the processing of our latest
4683 * write command (and all the previous commands). */
4684void waitCommand(client *c) {
4685 mstime_t timeout;
4686 long numreplicas, ackreplicas;
4687 long long offset = c->woff;
4688
4689 if (server.masterhost) {
4690 addReplyError(c,"WAIT cannot be used with replica instances. Please also note that since Redis 4.0 if a replica is configured to be writable (which is not the default) writes to replicas are just local and are not propagated.");
4691 return;
4692 }
4693
4694 /* Argument parsing. */
4695 if (getLongFromObjectOrReply(c,c->argv[1],&numreplicas,NULL) != C_OK)
4696 return;
4697 if (getTimeoutFromObjectOrReply(c,c->argv[2],&timeout,UNIT_MILLISECONDS)
4698 != C_OK) return;
4699
4700 /* First try without blocking at all. */
4701 ackreplicas = replicationCountAcksByOffset(c->woff);
4702 if (ackreplicas >= numreplicas || c->flags & CLIENT_DENY_BLOCKING) {
4703 addReplyLongLong(c,ackreplicas);
4704 return;
4705 }
4706
4707 /* Otherwise block the client and put it into our list of clients
4708 * waiting for ack from slaves. */
4709 blockForReplication(c,timeout,offset,numreplicas);
4710
4711 /* Make sure that the server will send an ACK request to all the slaves
4712 * before returning to the event loop. */
4713 replicationRequestAckFromSlaves();
4714}
4715
4716/* WAIT for N replicas and / or local master to acknowledge our latest
4717 * write command got synced to the disk. */
4718void waitaofCommand(client *c) {
4719 mstime_t timeout;
4720 long numreplicas, numlocal, ackreplicas, acklocal;
4721
4722 /* Argument parsing. */
4723 if (getRangeLongFromObjectOrReply(c,c->argv[1],0,1,&numlocal,NULL) != C_OK)
4724 return;
4725 if (getPositiveLongFromObjectOrReply(c,c->argv[2],&numreplicas,NULL) != C_OK)
4726 return;
4727 if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout,UNIT_MILLISECONDS) != C_OK)
4728 return;
4729
4730 if (server.masterhost) {
4731 addReplyError(c,"WAITAOF cannot be used with replica instances. Please also note that writes to replicas are just local and are not propagated.");
4732 return;
4733 }
4734 if (numlocal && !server.aof_enabled) {
4735 addReplyError(c, "WAITAOF cannot be used when numlocal is set but appendonly is disabled.");
4736 return;
4737 }
4738
4739 /* First try without blocking at all. */
4740 ackreplicas = replicationCountAOFAcksByOffset(c->woff);
4741 acklocal = server.fsynced_reploff >= c->woff;
4742 if ((ackreplicas >= numreplicas && acklocal >= numlocal) || c->flags & CLIENT_DENY_BLOCKING) {
4743 addReplyArrayLen(c,2);
4744 addReplyLongLong(c,acklocal);
4745 addReplyLongLong(c,ackreplicas);
4746 return;
4747 }
4748
4749 /* Otherwise block the client and put it into our list of clients
4750 * waiting for ack from slaves. */
4751 blockForAofFsync(c,timeout,c->woff,numlocal,numreplicas);
4752
4753 /* Make sure that the server will send an ACK request to all the slaves
4754 * before returning to the event loop. */
4755 replicationRequestAckFromSlaves();
4756}
4757
4758/* This is called by unblockClient() to perform the blocking op type
4759 * specific cleanup. We just remove the client from the list of clients
4760 * waiting for replica acks. Never call it directly, call unblockClient()
4761 * instead. */
4762void unblockClientWaitingReplicas(client *c) {
4763 listNode *ln = listSearchKey(server.clients_waiting_acks,c);
4764 serverAssert(ln != NULL);
4765 listDelNode(server.clients_waiting_acks,ln);
4766 updateStatsOnUnblock(c, 0, 0, 0);
4767}
4768
4769/* Check if there are clients blocked in WAIT or WAITAOF that can be unblocked
4770 * since we received enough ACKs from slaves. */
4771void processClientsWaitingReplicas(void) {
4772 long long last_offset = 0;
4773 long long last_aof_offset = 0;
4774 int last_numreplicas = 0;
4775 int last_aof_numreplicas = 0;
4776
4777 listIter li;
4778 listNode *ln;
4779
4780 listRewind(server.clients_waiting_acks,&li);
4781 while((ln = listNext(&li))) {
4782 int numlocal = 0;
4783 int numreplicas = 0;
4784
4785 client *c = ln->value;
4786 int is_wait_aof = c->bstate.btype == BLOCKED_WAITAOF;
4787
4788 if (is_wait_aof && c->bstate.numlocal && !server.aof_enabled) {
4789 addReplyError(c, "WAITAOF cannot be used when numlocal is set but appendonly is disabled.");
4790 unblockClient(c, 1);
4791 continue;
4792 }
4793
4794 /* Every time we find a client that is satisfied for a given
4795 * offset and number of replicas, we remember it so the next client
4796 * may be unblocked without calling replicationCountAcksByOffset()
4797 * or calling replicationCountAOFAcksByOffset()
4798 * if the requested offset / replicas were equal or less. */
4799 if (!is_wait_aof && last_offset && last_offset >= c->bstate.reploffset &&
4800 last_numreplicas >= c->bstate.numreplicas)
4801 {
4802 numreplicas = last_numreplicas;
4803 } else if (is_wait_aof && last_aof_offset && last_aof_offset >= c->bstate.reploffset &&
4804 last_aof_numreplicas >= c->bstate.numreplicas)
4805 {
4806 numreplicas = last_aof_numreplicas;
4807 } else {
4808 numreplicas = is_wait_aof ?
4809 replicationCountAOFAcksByOffset(c->bstate.reploffset) :
4810 replicationCountAcksByOffset(c->bstate.reploffset);
4811
4812 /* Check if the number of replicas is satisfied. */
4813 if (numreplicas < c->bstate.numreplicas) continue;
4814
4815 if (is_wait_aof) {
4816 last_aof_offset = c->bstate.reploffset;
4817 last_aof_numreplicas = numreplicas;
4818 } else {
4819 last_offset = c->bstate.reploffset;
4820 last_numreplicas = numreplicas;
4821 }
4822 }
4823
4824 /* Check if the local constraint of WAITAOF is served */
4825 if (is_wait_aof) {
4826 numlocal = server.fsynced_reploff >= c->bstate.reploffset;
4827 if (numlocal < c->bstate.numlocal) continue;
4828 }
4829
4830 /* Reply before unblocking, because unblock client calls reqresAppendResponse */
4831 if (is_wait_aof) {
4832 /* WAITAOF has an array reply */
4833 addReplyArrayLen(c, 2);
4834 addReplyLongLong(c, numlocal);
4835 addReplyLongLong(c, numreplicas);
4836 } else {
4837 addReplyLongLong(c, numreplicas);
4838 }
4839
4840 unblockClient(c, 1);
4841 }
4842}
4843
4844/* Return the slave replication offset for this instance, that is
4845 * the offset for which we already processed the master replication stream. */
4846long long replicationGetSlaveOffset(void) {
4847 long long offset = 0;
4848
4849 if (server.masterhost != NULL) {
4850 if (server.master) {
4851 offset = server.master->reploff;
4852 } else if (server.cached_master) {
4853 offset = server.cached_master->reploff;
4854 }
4855 }
4856 /* offset may be -1 when the master does not support it at all, however
4857 * this function is designed to return an offset that can express the
4858 * amount of data processed by the master, so we return a positive
4859 * integer. */
4860 if (offset < 0) offset = 0;
4861 return offset;
4862}
4863
4864/* --------------------------- REPLICATION CRON ---------------------------- */
4865
4866/* Replication cron function, called 1 time per second. */
4867void replicationCron(void) {
4868 /* Check failover status first, to see if we need to start
4869 * handling the failover. */
4870 updateFailoverStatus();
4871
4872 /* Non blocking connection timeout? */
4873 if (server.masterhost &&
4874 (server.repl_state == REPL_STATE_CONNECTING ||
4875 slaveIsInHandshakeState()) &&
4876 (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
4877 {
4878 serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
4879 cancelReplicationHandshake(1);
4880 }
4881
4882 /* Bulk transfer I/O timeout? */
4883 if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER &&
4884 (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
4885 {
4886 serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value.");
4887 cancelReplicationHandshake(1);
4888 }
4889
4890 /* Check if we should connect to a MASTER */
4891 if (server.repl_state == REPL_STATE_CONNECT) {
4892 serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
4893 server.masterhost, server.masterport);
4894 connectWithMaster();
4895 }
4896
4897 replicationCronRunMasterClient();
4898
4899 /* If we have attached slaves, PING them from time to time.
4900 * So slaves can implement an explicit timeout to masters, and will
4901 * be able to detect a link disconnection even if the TCP connection
4902 * will not actually go down. */
4903 listIter li;
4904 listNode *ln;
4905 robj *ping_argv[1];
4906
4907 /* First, send PING according to ping_slave_period. The reason why master
4908 * sends PING is to keep the connection with replica active, so master need
4909 * not send PING to replicas if already sent replication stream in the past
4910 * repl_ping_slave_period time. */
4911 if (server.masterhost == NULL && listLength(server.slaves) &&
4912 server.unixtime >= server.repl_stream_lastio + server.repl_ping_slave_period)
4913 {
4914 /* Note that we don't send the PING if the clients are paused during
4915 * a Redis Cluster manual failover: the PING we send will otherwise
4916 * alter the replication offsets of master and slave, and will no longer
4917 * match the one stored into 'mf_master_offset' state. */
4918 int manual_failover_in_progress =
4919 ((server.cluster_enabled &&
4920 clusterManualFailoverTimeLimit()) ||
4921 server.failover_end_time) &&
4922 isPausedActionsWithUpdate(PAUSE_ACTION_REPLICA);
4923
4924 if (!manual_failover_in_progress) {
4925 ping_argv[0] = shared.ping;
4926 replicationFeedSlaves(server.slaves, -1,
4927 ping_argv, 1);
4928 }
4929 }
4930
4931 /* Second, send a newline to all the slaves in pre-synchronization
4932 * stage, that is, slaves waiting for the master to create the RDB file.
4933 *
4934 * Also send the a newline to all the chained slaves we have, if we lost
4935 * connection from our master, to keep the slaves aware that their
4936 * master is online. This is needed since sub-slaves only receive proxied
4937 * data from top-level masters, so there is no explicit pinging in order
4938 * to avoid altering the replication offsets. This special out of band
4939 * pings (newlines) can be sent, they will have no effect in the offset.
4940 *
4941 * The newline will be ignored by the slave but will refresh the
4942 * last interaction timer preventing a timeout. In this case we ignore the
4943 * ping period and refresh the connection once per second since certain
4944 * timeouts are set at a few seconds (example: PSYNC response). */
4945 listRewind(server.slaves,&li);
4946 while((ln = listNext(&li))) {
4947 client *slave = ln->value;
4948
4949 int is_presync =
4950 (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START ||
4951 (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
4952 server.rdb_child_type != RDB_CHILD_TYPE_SOCKET));
4953
4954 if (is_presync && !(slave->flags & CLIENT_CLOSE_ASAP)) {
4955 connWrite(slave->conn, "\n", 1);
4956 }
4957 }
4958
4959 /* Disconnect timedout slaves. */
4960 if (listLength(server.slaves)) {
4961 listIter li;
4962 listNode *ln;
4963
4964 listRewind(server.slaves,&li);
4965 while((ln = listNext(&li))) {
4966 client *slave = ln->value;
4967
4968 if (slave->replstate == SLAVE_STATE_ONLINE) {
4969 if (slave->flags & CLIENT_PRE_PSYNC)
4970 continue;
4971 if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) {
4972 serverLog(LL_WARNING, "Disconnecting timedout replica (streaming sync): %s",
4973 replicationGetSlaveName(slave));
4974 freeClient(slave);
4975 continue;
4976 }
4977 }
4978 /* We consider disconnecting only diskless replicas because disk-based replicas aren't fed
4979 * by the fork child so if a disk-based replica is stuck it doesn't prevent the fork child
4980 * from terminating. */
4981 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) {
4982 if (slave->repl_last_partial_write != 0 &&
4983 (server.unixtime - slave->repl_last_partial_write) > server.repl_timeout)
4984 {
4985 serverLog(LL_WARNING, "Disconnecting timedout replica (full sync): %s",
4986 replicationGetSlaveName(slave));
4987 freeClient(slave);
4988 continue;
4989 }
4990 }
4991 }
4992 }
4993
4994 /* If this is a master without attached slaves and there is a replication
4995 * backlog active, in order to reclaim memory we can free it after some
4996 * (configured) time. Note that this cannot be done for slaves: slaves
4997 * without sub-slaves attached should still accumulate data into the
4998 * backlog, in order to reply to PSYNC queries if they are turned into
4999 * masters after a failover. */
5000 if (listLength(server.slaves) == 0 && server.repl_backlog_time_limit &&
5001 server.repl_backlog && server.masterhost == NULL)
5002 {
5003 time_t idle = server.unixtime - server.repl_no_slaves_since;
5004
5005 if (idle > server.repl_backlog_time_limit) {
5006 /* When we free the backlog, we always use a new
5007 * replication ID and clear the ID2. This is needed
5008 * because when there is no backlog, the master_repl_offset
5009 * is not updated, but we would still retain our replication
5010 * ID, leading to the following problem:
5011 *
5012 * 1. We are a master instance.
5013 * 2. Our slave is promoted to master. It's repl-id-2 will
5014 * be the same as our repl-id.
5015 * 3. We, yet as master, receive some updates, that will not
5016 * increment the master_repl_offset.
5017 * 4. Later we are turned into a slave, connect to the new
5018 * master that will accept our PSYNC request by second
5019 * replication ID, but there will be data inconsistency
5020 * because we received writes. */
5021 changeReplicationId();
5022 clearReplicationId2();
5023 freeReplicationBacklog();
5024 serverLog(LL_NOTICE,
5025 "Replication backlog freed after %d seconds "
5026 "without connected replicas.",
5027 (int) server.repl_backlog_time_limit);
5028 }
5029 }
5030
5031 replicationStartPendingFork();
5032
5033 /* Remove the RDB file used for replication if Redis is not running
5034 * with any persistence. */
5035 removeRDBUsedToSyncReplicas();
5036
5037 /* Sanity check replication buffer, the first block of replication buffer blocks
5038 * must be referenced by someone, since it will be freed when not referenced,
5039 * otherwise, server will OOM. also, its refcount must not be more than
5040 * replicas number + 1(replication backlog). */
5041 if (listLength(server.repl_buffer_blocks) > 0) {
5042 replBufBlock *o = listNodeValue(listFirst(server.repl_buffer_blocks));
5043 serverAssert(o->refcount > 0 &&
5044 o->refcount <= (int)listLength(server.slaves)+1);
5045 }
5046
5047 /* Refresh the number of slaves with lag <= min-slaves-max-lag. */
5048 refreshGoodSlavesCount();
5049}
5050
5051int shouldStartChildReplication(int *mincapa_out, int *req_out) {
5052 /* We should start a BGSAVE good for replication if we have slaves in
5053 * WAIT_BGSAVE_START state.
5054 *
5055 * In case of diskless replication, we make sure to wait the specified
5056 * number of seconds (according to configuration) so that other slaves
5057 * have the time to arrive before we start streaming. */
5058 if (!hasActiveChildProcess()) {
5059 time_t idle, max_idle = 0;
5060 int slaves_waiting = 0;
5061 int mincapa;
5062 int req;
5063 int first = 1;
5064 listNode *ln;
5065 listIter li;
5066
5067 listRewind(server.slaves,&li);
5068 while((ln = listNext(&li))) {
5069 client *slave = ln->value;
5070 if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
5071 if (first) {
5072 /* Get first slave's requirements */
5073 req = slave->slave_req;
5074 } else if (req != slave->slave_req) {
5075 /* Skip slaves that don't match */
5076 continue;
5077 }
5078 idle = server.unixtime - slave->lastinteraction;
5079 /* If the slave requests a slots snapshot, we should start BGSAVE
5080 * immediately since it can't share the RDB with other slaves. */
5081 if (slave->slave_req & SLAVE_REQ_SLOTS_SNAPSHOT)
5082 idle = server.repl_diskless_sync_delay; /* Threshold for BGSAVE */
5083 if (idle > max_idle) max_idle = idle;
5084 slaves_waiting++;
5085 mincapa = first ? slave->slave_capa : (mincapa & slave->slave_capa);
5086 first = 0;
5087 }
5088 }
5089
5090 if (slaves_waiting &&
5091 (!server.repl_diskless_sync ||
5092 (server.repl_diskless_sync_max_replicas > 0 &&
5093 slaves_waiting >= server.repl_diskless_sync_max_replicas) ||
5094 max_idle >= server.repl_diskless_sync_delay))
5095 {
5096 if (mincapa_out)
5097 *mincapa_out = mincapa;
5098 if (req_out)
5099 *req_out = req;
5100 return 1;
5101 }
5102 }
5103
5104 return 0;
5105}
5106
/* Start a BGSAVE for replication if shouldStartChildReplication() reports
 * that one is due. The called function may start a BGSAVE with socket
 * target or disk target depending on the configuration and on the replicas'
 * capabilities and requirements. */
void replicationStartPendingFork(void) {
    int capa = -1, requirements = -1;

    if (!shouldStartChildReplication(&capa, &requirements)) return;
    startBgsaveForReplication(capa, requirements);
}
5118
5119/* Find replica at IP:PORT from replica list */
5120static client *findReplica(char *host, int port) {
5121 listIter li;
5122 listNode *ln;
5123 client *replica;
5124
5125 listRewind(server.slaves,&li);
5126 while((ln = listNext(&li))) {
5127 replica = ln->value;
5128 char ip[NET_IP_STR_LEN], *replicaip = replica->slave_addr;
5129
5130 if (!replicaip) {
5131 if (connAddrPeerName(replica->conn, ip, sizeof(ip), NULL) == -1)
5132 continue;
5133 replicaip = ip;
5134 }
5135
5136 if (!strcasecmp(host, replicaip) &&
5137 (port == replica->slave_listening_port))
5138 return replica;
5139 }
5140
5141 return NULL;
5142}
5143
5144const char *getFailoverStateString(void) {
5145 switch(server.failover_state) {
5146 case NO_FAILOVER: return "no-failover";
5147 case FAILOVER_IN_PROGRESS: return "failover-in-progress";
5148 case FAILOVER_WAIT_FOR_SYNC: return "waiting-for-sync";
5149 default: return "unknown";
5150 }
5151}
5152
5153/* Resets the internal failover configuration, this needs
5154 * to be called after a failover either succeeds or fails
5155 * as it includes the client unpause. */
5156void clearFailoverState(void) {
5157 server.failover_end_time = 0;
5158 server.force_failover = 0;
5159 zfree(server.target_replica_host);
5160 server.target_replica_host = NULL;
5161 server.target_replica_port = 0;
5162 server.failover_state = NO_FAILOVER;
5163 unpauseActions(PAUSE_DURING_FAILOVER);
5164}
5165
5166/* Abort an ongoing failover if one is going on. */
5167void abortFailover(const char *err) {
5168 if (server.failover_state == NO_FAILOVER) return;
5169
5170 if (server.target_replica_host) {
5171 serverLog(LL_NOTICE,"FAILOVER to %s:%d aborted: %s",
5172 server.target_replica_host,server.target_replica_port,err);
5173 } else {
5174 serverLog(LL_NOTICE,"FAILOVER to any replica aborted: %s",err);
5175 }
5176 if (server.failover_state == FAILOVER_IN_PROGRESS) {
5177 replicationUnsetMaster();
5178 }
5179 clearFailoverState();
5180}
5181
5182/*
5183 * FAILOVER [TO <HOST> <PORT> [FORCE]] [ABORT] [TIMEOUT <timeout>]
5184 *
5185 * This command will coordinate a failover between the master and one
5186 * of its replicas. The happy path contains the following steps:
5187 * 1) The master will initiate a client pause write, to stop replication
5188 * traffic.
5189 * 2) The master will periodically check if any of its replicas has
5190 * consumed the entire replication stream through acks.
5191 * 3) Once any replica has caught up, the master will itself become a replica.
5192 * 4) The master will send a PSYNC FAILOVER request to the target replica, which
5193 * if accepted will cause the replica to become the new master and start a sync.
5194 *
5195 * FAILOVER ABORT is the only way to abort a failover command, as replicaof
5196 * will be disabled. This may be needed if the failover is unable to progress.
5197 *
5198 * The optional arguments [TO <HOST> <IP>] allows designating a specific replica
5199 * to be failed over to.
5200 *
5201 * FORCE flag indicates that even if the target replica is not caught up,
5202 * failover to it anyway. This must be specified with a timeout and a target
5203 * HOST and IP.
5204 *
5205 * TIMEOUT <timeout> indicates how long should the primary wait for
5206 * a replica to sync up before aborting. If not specified, the failover
5207 * will attempt forever and must be manually aborted.
5208 */
5209void failoverCommand(client *c) {
5210 if (!clusterAllowFailoverCmd(c)) {
5211 return;
5212 }
5213
5214 /* Handle special case for abort */
5215 if ((c->argc == 2) && !strcasecmp(c->argv[1]->ptr,"abort")) {
5216 if (server.failover_state == NO_FAILOVER) {
5217 addReplyError(c, "No failover in progress.");
5218 return;
5219 }
5220
5221 abortFailover("Failover manually aborted");
5222 addReply(c,shared.ok);
5223 return;
5224 }
5225
5226 long timeout_in_ms = 0;
5227 int force_flag = 0;
5228 long port = 0;
5229 char *host = NULL;
5230
5231 /* Parse the command for syntax and arguments. */
5232 for (int j = 1; j < c->argc; j++) {
5233 if (!strcasecmp(c->argv[j]->ptr,"timeout") && (j + 1 < c->argc) &&
5234 timeout_in_ms == 0)
5235 {
5236 if (getLongFromObjectOrReply(c,c->argv[j + 1],
5237 &timeout_in_ms,NULL) != C_OK) return;
5238 if (timeout_in_ms <= 0) {
5239 addReplyError(c,"FAILOVER timeout must be greater than 0");
5240 return;
5241 }
5242 j++;
5243 } else if (!strcasecmp(c->argv[j]->ptr,"to") && (j + 2 < c->argc) &&
5244 !host)
5245 {
5246 if (getLongFromObjectOrReply(c,c->argv[j + 2],&port,NULL) != C_OK)
5247 return;
5248 host = c->argv[j + 1]->ptr;
5249 j += 2;
5250 } else if (!strcasecmp(c->argv[j]->ptr,"force") && !force_flag) {
5251 force_flag = 1;
5252 } else {
5253 addReplyErrorObject(c,shared.syntaxerr);
5254 return;
5255 }
5256 }
5257
5258 if (server.failover_state != NO_FAILOVER) {
5259 addReplyError(c,"FAILOVER already in progress.");
5260 return;
5261 }
5262
5263 if (server.masterhost) {
5264 addReplyError(c,"FAILOVER is not valid when server is a replica.");
5265 return;
5266 }
5267
5268 if (listLength(server.slaves) == 0) {
5269 addReplyError(c,"FAILOVER requires connected replicas.");
5270 return;
5271 }
5272
5273 if (force_flag && (!timeout_in_ms || !host)) {
5274 addReplyError(c,"FAILOVER with force option requires both a timeout "
5275 "and target HOST and IP.");
5276 return;
5277 }
5278
5279 /* If a replica address was provided, validate that it is connected. */
5280 if (host) {
5281 client *replica = findReplica(host, port);
5282
5283 if (replica == NULL) {
5284 addReplyError(c,"FAILOVER target HOST and PORT is not "
5285 "a replica.");
5286 return;
5287 }
5288
5289 /* Check if requested replica is online */
5290 if (replica->replstate != SLAVE_STATE_ONLINE) {
5291 addReplyError(c,"FAILOVER target replica is not online.");
5292 return;
5293 }
5294
5295 server.target_replica_host = zstrdup(host);
5296 server.target_replica_port = port;
5297 serverLog(LL_NOTICE,"FAILOVER requested to %s:%ld.",host,port);
5298 } else {
5299 serverLog(LL_NOTICE,"FAILOVER requested to any replica.");
5300 }
5301
5302 mstime_t now = commandTimeSnapshot();
5303 if (timeout_in_ms) {
5304 server.failover_end_time = now + timeout_in_ms;
5305 }
5306
5307 server.force_failover = force_flag;
5308 server.failover_state = FAILOVER_WAIT_FOR_SYNC;
5309 /* Cancel all ASM tasks when starting failover */
5310 clusterAsmCancel(NULL, "failover requested");
5311 /* Cluster failover will unpause eventually */
5312 pauseActions(PAUSE_DURING_FAILOVER,
5313 LLONG_MAX,
5314 PAUSE_ACTIONS_CLIENT_WRITE_SET);
5315 addReply(c,shared.ok);
5316}
5317
5318/* Failover cron function, checks coordinated failover state.
5319 *
5320 * Implementation note: The current implementation calls replicationSetMaster()
5321 * to start the failover request, this has some unintended side effects if the
5322 * failover doesn't work like blocked clients will be unblocked and replicas will
5323 * be disconnected. This could be optimized further.
5324 */
5325void updateFailoverStatus(void) {
5326 if (server.failover_state != FAILOVER_WAIT_FOR_SYNC) return;
5327 mstime_t now = server.mstime;
5328
5329 /* Check if failover operation has timed out */
5330 if (server.failover_end_time && server.failover_end_time <= now) {
5331 if (server.force_failover) {
5332 serverLog(LL_NOTICE,
5333 "FAILOVER to %s:%d time out exceeded, failing over.",
5334 server.target_replica_host, server.target_replica_port);
5335 server.failover_state = FAILOVER_IN_PROGRESS;
5336 /* If timeout has expired force a failover if requested. */
5337 replicationSetMaster(server.target_replica_host,
5338 server.target_replica_port);
5339 return;
5340 } else {
5341 /* Force was not requested, so timeout. */
5342 abortFailover("Replica never caught up before timeout");
5343 return;
5344 }
5345 }
5346
5347 /* Check to see if the replica has caught up so failover can start */
5348 client *replica = NULL;
5349 if (server.target_replica_host) {
5350 replica = findReplica(server.target_replica_host,
5351 server.target_replica_port);
5352 } else {
5353 listIter li;
5354 listNode *ln;
5355
5356 listRewind(server.slaves,&li);
5357 /* Find any replica that has matched our repl_offset */
5358 while((ln = listNext(&li))) {
5359 replica = ln->value;
5360 if (replica->repl_ack_off == server.master_repl_offset) {
5361 char ip[NET_IP_STR_LEN], *replicaaddr = replica->slave_addr;
5362
5363 if (!replicaaddr) {
5364 if (connAddrPeerName(replica->conn,ip,sizeof(ip),NULL) == -1)
5365 continue;
5366 replicaaddr = ip;
5367 }
5368
5369 /* We are now failing over to this specific node */
5370 server.target_replica_host = zstrdup(replicaaddr);
5371 server.target_replica_port = replica->slave_listening_port;
5372 break;
5373 }
5374 }
5375 }
5376
5377 /* We've found a replica that is caught up */
5378 if (replica && (replica->repl_ack_off == server.master_repl_offset)) {
5379 server.failover_state = FAILOVER_IN_PROGRESS;
5380 serverLog(LL_NOTICE,
5381 "Failover target %s:%d is synced, failing over.",
5382 server.target_replica_host, server.target_replica_port);
5383 /* Designated replica is caught up, failover to it. */
5384 replicationSetMaster(server.target_replica_host,
5385 server.target_replica_port);
5386 }
5387}