From 5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda Mon Sep 17 00:00:00 2001 From: Mitja Felicijan Date: Wed, 21 Jan 2026 22:40:55 +0100 Subject: Add Redis source code for testing --- examples/redis-unstable/src/replication.c | 5387 +++++++++++++++++++++++++++++ 1 file changed, 5387 insertions(+) create mode 100644 examples/redis-unstable/src/replication.c (limited to 'examples/redis-unstable/src/replication.c') diff --git a/examples/redis-unstable/src/replication.c b/examples/redis-unstable/src/replication.c new file mode 100644 index 0000000..309d6c4 --- /dev/null +++ b/examples/redis-unstable/src/replication.c @@ -0,0 +1,5387 @@ +/* Asynchronous replication implementation. + * + * Copyright (c) 2009-Present, Redis Ltd. + * All rights reserved. + * + * Copyright (c) 2024-present, Valkey contributors. + * All rights reserved. + * + * Licensed under your choice of (a) the Redis Source Available License 2.0 + * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the + * GNU Affero General Public License v3 (AGPLv3). + * + * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. + */ + +/* + * replication.c - Replication Management + * + * This file contains the implementation of Redis's replication logic, which + * enables data synchronization between master and replica instances. + * It handles: + * - Master-to-replica synchronization + * - Full and partial resynchronizations + * - Replication backlog management + * - State machines for replica operations + * - RDB Channel for Full Sync (lookup "rdb channel for full sync") + */ + +#include "server.h" +#include "cluster.h" +#include "cluster_slot_stats.h" +#include "bio.h" +#include "functions.h" +#include "connection.h" +#include "cluster_asm.h" + +#include +#include +#include +#include +#include +#include + +void replicationDiscardCachedMaster(void); +void replicationResurrectCachedMaster(connection *conn); +void replicationSendAck(void); +int replicaPutOnline(client *slave); +void replicaStartCommandStream(client *slave); +int cancelReplicationHandshake(int reconnect); +static void rdbChannelFullSyncWithMaster(connection *conn); +static int rdbChannelAbort(void); +static void rdbChannelBufferReplData(connection *conn); +static void rdbChannelReplDataBufInit(void); +static void rdbChannelStreamReplDataToDb(void); +static void rdbChannelCleanup(void); + +/* We take a global flag to remember if this instance generated an RDB + * because of replication, so that we can remove the RDB file in case + * the instance is configured to have no persistence. */ +int RDBGeneratedByReplication = 0; + + +/* A reference to diskless loading rio to abort it asynchronously. It's needed + * for rdbchannel replication. While loading from rdbchannel connection, we may + * yield back to eventloop. If main channel connection detects a network problem + * we want to abort loading. It calls rioAbort() in this case, so next rioRead() + * from rdbchannel connection will return error to cancel loading safely. */ +static rio *disklessLoadingRio = NULL; + +/* --------------------------- Utility functions ---------------------------- */ + +/* Returns 1 if the replica is rdbchannel and there is an associated main + * channel slave with that. */ +int replicationCheckHasMainChannel(client *replica) { + if (!(replica->flags & CLIENT_REPL_RDB_CHANNEL) || + !replica->main_ch_client_id || + lookupClientByID(replica->main_ch_client_id) == NULL) + { + return 0; + } + return 1; +} + +/* During rdb channel replication, replica opens two connections. From master + * POV, these connections are distinct replicas in server.slaves. This function + * counts associated replicas as one and returns logical replica count. */ +unsigned long replicationLogicalReplicaCount(void) { + unsigned long count = 0; + listNode *ln; + listIter li; + + listRewind(server.slaves,&li); + while ((ln = listNext(&li))) { + client *replica = listNodeValue(ln); + if (!replicationCheckHasMainChannel(replica)) + count++; + } + return count; +} + +int replicaFromIOThreadHasPendingRead(client *c) { + serverAssert(c->tid != IOTHREAD_MAIN_THREAD_ID); + + int pending_read; + atomicGetWithSync(c->pending_read, pending_read); + return pending_read; +} + +/* Send replicas to their respective IO threads if it has pending reads or + * writes. Otherwise it remains in main thread so it can check for new data in + * the replication buffer ASAP. */ +void putReplicasInPendingClientsToIOThreads(void) { + if (server.io_threads_num <= 1) return; + + serverAssert(pthread_equal(pthread_self(), server.main_thread_id)); + + listIter li; + listNode *ln; + listRewind(server.slaves,&li); + while((ln = listNext(&li))) { + client *replica = listNodeValue(ln); + + /* We only care about replicas that need to run on IO thread but are + * currently in main */ + if (replica->tid == IOTHREAD_MAIN_THREAD_ID || + replica->running_tid != IOTHREAD_MAIN_THREAD_ID) + { + continue; + } + + /* Skip the replica if it's scheduled for close */ + if (replica->flags & CLIENT_CLOSE_ASAP) continue; + + /* The call to clientHasPendingReplies may seem redundant but in the + * case of replica being in IO thread we can have the following case: + * replica gets back to main thread after sending the repl buffer it + * knows about. In the mean time main thread has accumulated new repl + * data. In that case the replica's client wouldn't have been put in + * the pending write queue but will still have new repl data it needs to + * send, so we make sure to check for that and send it back to IO thread + * if so. On the other hand if replica gets back to main thread before + * any new repl data has accumulated then after a new cmd is propagated + * the replica will be put in the pending write queue as usual so we + * need to check for that also. + * In addition, if the replica client has pending read events, we should + * also send them to the IO thread. */ + if (replica->flags & CLIENT_PENDING_WRITE || + clientHasPendingReplies(replica) || + replicaFromIOThreadHasPendingRead(replica)) + { + enqueuePendingClienstToIOThreads(replica); + } + } +} + +/* Run some cron tasks for a connected master client. Return 1 when the client + * is freed, 0 otherwise. */ +int replicationCronRunMasterClient(void) { + if (!server.masterhost || !server.master) return 0; + + if (server.master->running_tid != IOTHREAD_MAIN_THREAD_ID) return 0; + + /* Timed out master when we are an already connected slave? */ + if (server.repl_state == REPL_STATE_CONNECTED && + (time(NULL)-server.master->lastinteraction) > server.repl_timeout) + { + serverLog(LL_WARNING,"MASTER timeout: no data nor PING received..."); + freeClient(server.master); + return 1; + } + + /* Send ACK to master from time to time. + * Note that we do not send periodic acks to masters that don't + * support PSYNC and replication offsets. */ + if (!(server.master->flags & CLIENT_PRE_PSYNC)) + replicationSendAck(); + + return 0; +} + +ConnectionType *connTypeOfReplication(void) { + if (server.tls_replication) { + return connectionTypeTls(); + } + + return connectionTypeTcp(); +} + +/* Return the pointer to a string representing the slave ip:listening_port + * pair. Mostly useful for logging, since we want to log a slave using its + * IP address and its listening port which is more clear for the user, for + * example: "Closing connection with replica 10.1.2.3:6380". */ +char *replicationGetSlaveName(client *c) { + static char buf[NET_HOST_PORT_STR_LEN]; + char ip[NET_IP_STR_LEN]; + + ip[0] = '\0'; + buf[0] = '\0'; + if (c->slave_addr || + connAddrPeerName(c->conn,ip,sizeof(ip),NULL) != -1) + { + char *addr = c->slave_addr ? c->slave_addr : ip; + if (c->slave_listening_port) + formatAddr(buf,sizeof(buf),addr,c->slave_listening_port); + else + snprintf(buf,sizeof(buf),"%s:",addr); + } else { + snprintf(buf,sizeof(buf),"client id #%llu", + (unsigned long long) c->id); + } + return buf; +} + +/* Plain unlink() can block for quite some time in order to actually apply + * the file deletion to the filesystem. This call removes the file in a + * background thread instead. We actually just do close() in the thread, + * by using the fact that if there is another instance of the same file open, + * the foreground unlink() will only remove the fs name, and deleting the + * file's storage space will only happen once the last reference is lost. */ +int bg_unlink(const char *filename) { + int fd = open(filename,O_RDONLY|O_NONBLOCK); + if (fd == -1) { + /* Can't open the file? Fall back to unlinking in the main thread. */ + return unlink(filename); + } else { + /* The following unlink() removes the name but doesn't free the + * file contents because a process still has it open. */ + int retval = unlink(filename); + if (retval == -1) { + /* If we got an unlink error, we just return it, closing the + * new reference we have to the file. */ + int old_errno = errno; + close(fd); /* This would overwrite our errno. So we saved it. */ + errno = old_errno; + return -1; + } + bioCreateCloseJob(fd, 0, 0); + return 0; /* Success. */ + } +} + +/* ---------------------------------- MASTER -------------------------------- */ + +void createReplicationBacklog(void) { + serverAssert(server.repl_backlog == NULL); + server.repl_backlog = zmalloc(sizeof(replBacklog)); + server.repl_backlog->ref_repl_buf_node = NULL; + server.repl_backlog->unindexed_count = 0; + server.repl_backlog->blocks_index = raxNew(); + server.repl_backlog->histlen = 0; + /* We don't have any data inside our buffer, but virtually the first + * byte we have is the next byte that will be generated for the + * replication stream. */ + server.repl_backlog->offset = server.master_repl_offset+1; +} + +/* This function is called when the user modifies the replication backlog + * size at runtime. It is up to the function to resize the buffer and setup it + * so that it contains the same data as the previous one (possibly less data, + * but the most recent bytes, or the same data and more free space in case the + * buffer is enlarged). */ +void resizeReplicationBacklog(void) { + if (server.repl_backlog_size < CONFIG_REPL_BACKLOG_MIN_SIZE) + server.repl_backlog_size = CONFIG_REPL_BACKLOG_MIN_SIZE; + if (server.repl_backlog) + incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); +} + +void freeReplicationBacklog(void) { + serverAssert(listLength(server.slaves) == 0); + if (server.repl_backlog == NULL) return; + + /* Decrease the start buffer node reference count. */ + if (server.repl_backlog->ref_repl_buf_node) { + replBufBlock *o = listNodeValue( + server.repl_backlog->ref_repl_buf_node); + serverAssert(o->refcount == 1); /* Last reference. */ + o->refcount--; + } + + /* Replication buffer blocks are completely released when we free the + * backlog, since the backlog is released only when there are no replicas + * and the backlog keeps the last reference of all blocks. */ + freeReplicationBacklogRefMemAsync(server.repl_buffer_blocks, + server.repl_backlog->blocks_index); + resetReplicationBuffer(); + zfree(server.repl_backlog); + server.repl_backlog = NULL; +} + +/* To make search offset from replication buffer blocks quickly + * when replicas ask partial resynchronization, we create one index + * block every REPL_BACKLOG_INDEX_PER_BLOCKS blocks. */ +void createReplicationBacklogIndex(listNode *ln) { + server.repl_backlog->unindexed_count++; + if (server.repl_backlog->unindexed_count >= REPL_BACKLOG_INDEX_PER_BLOCKS) { + replBufBlock *o = listNodeValue(ln); + uint64_t encoded_offset = htonu64(o->repl_offset); + raxInsert(server.repl_backlog->blocks_index, + (unsigned char*)&encoded_offset, sizeof(uint64_t), + ln, NULL); + server.repl_backlog->unindexed_count = 0; + } +} + +/* Rebase replication buffer blocks' offset since the initial + * setting offset starts from 0 when master restart. */ +void rebaseReplicationBuffer(long long base_repl_offset) { + raxFree(server.repl_backlog->blocks_index); + server.repl_backlog->blocks_index = raxNew(); + server.repl_backlog->unindexed_count = 0; + + listIter li; + listNode *ln; + listRewind(server.repl_buffer_blocks, &li); + while ((ln = listNext(&li))) { + replBufBlock *o = listNodeValue(ln); + o->repl_offset += base_repl_offset; + createReplicationBacklogIndex(ln); + } +} + +void resetReplicationBuffer(void) { + server.repl_buffer_mem = 0; + server.repl_buffer_blocks = listCreate(); + listSetFreeMethod(server.repl_buffer_blocks, zfree); +} + +int canFeedReplicaReplBuffer(client *replica) { + /* Don't feed replicas that only want the RDB or main channels of migration + * destinations which need filtered stream for migrating slot ranges. */ + if (replica->flags & CLIENT_REPL_RDBONLY || + replica->flags & CLIENT_ASM_MIGRATING) return 0; + + /* Don't feed replicas that are still waiting for BGSAVE to start. */ + if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START || + replica->replstate == SLAVE_STATE_WAIT_RDB_CHANNEL) return 0; + + /* Don't feed replicas that are going to be closed ASAP. */ + if (replica->flags & CLIENT_CLOSE_ASAP) return 0; + + return 1; +} + +/* Create the replication backlog if needed. */ +void createReplicationBacklogIfNeeded(void) { + if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) { + /* When we create the backlog from scratch, we always use a new + * replication ID and clear the ID2, since there is no valid + * past history. */ + changeReplicationId(); + clearReplicationId2(); + createReplicationBacklog(); + serverLog(LL_NOTICE,"Replication backlog created, my new " + "replication IDs are '%s' and '%s'", + server.replid, server.replid2); + } +} +/* Similar with 'prepareClientToWrite', note that we must call this function + * before feeding replication stream into global replication buffer, since + * clientHasPendingReplies in prepareClientToWrite will access the global + * replication buffer to make judgements. */ +int prepareReplicasToWrite(void) { + listIter li; + listNode *ln; + int prepared = 0; + + listRewind(server.slaves,&li); + while((ln = listNext(&li))) { + client *slave = ln->value; + if (!canFeedReplicaReplBuffer(slave)) continue; + if (prepareClientToWrite(slave) == C_ERR) continue; + prepared++; + } + + return prepared; +} + +/* Wrapper for feedReplicationBuffer() that takes Redis string objects + * as input. */ +void feedReplicationBufferWithObject(robj *o) { + char llstr[LONG_STR_SIZE]; + void *p; + size_t len; + + if (o->encoding == OBJ_ENCODING_INT) { + len = ll2string(llstr,sizeof(llstr),(long)o->ptr); + p = llstr; + } else { + len = sdslen(o->ptr); + p = o->ptr; + } + feedReplicationBuffer(p,len); +} + +/* Generally, we only have one replication buffer block to trim when replication + * backlog size exceeds our setting and no replica reference it. But if replica + * clients disconnect, we need to free many replication buffer blocks that are + * referenced. It would cost much time if there are a lots blocks to free, that + * will freeze server, so we trim replication backlog incrementally. */ +void incrementalTrimReplicationBacklog(size_t max_blocks) { + serverAssert(server.repl_backlog != NULL); + + size_t trimmed_blocks = 0; + while (server.repl_backlog->histlen > server.repl_backlog_size && + trimmed_blocks < max_blocks) + { + /* We never trim backlog to less than one block. */ + if (listLength(server.repl_buffer_blocks) <= 1) break; + + /* Replicas increment the refcount of the first replication buffer block + * they refer to, in that case, we don't trim the backlog even if + * backlog_histlen exceeds backlog_size. This implicitly makes backlog + * bigger than our setting, but makes the master accept partial resync as + * much as possible. So that backlog must be the last reference of + * replication buffer blocks. */ + listNode *first = listFirst(server.repl_buffer_blocks); + serverAssert(first == server.repl_backlog->ref_repl_buf_node); + replBufBlock *fo = listNodeValue(first); + if (fo->refcount != 1) break; + + /* We don't try trim backlog if backlog valid size will be lessen than + * setting backlog size once we release the first repl buffer block. */ + if (server.repl_backlog->histlen - (long long)fo->size <= + server.repl_backlog_size) break; + + /* Decr refcount and release the first block later. */ + fo->refcount--; + trimmed_blocks++; + server.repl_backlog->histlen -= fo->size; + + /* Go to use next replication buffer block node. */ + listNode *next = listNextNode(first); + server.repl_backlog->ref_repl_buf_node = next; + serverAssert(server.repl_backlog->ref_repl_buf_node != NULL); + /* Incr reference count to keep the new head node. */ + ((replBufBlock *)listNodeValue(next))->refcount++; + + /* Remove the node in recorded blocks. */ + uint64_t encoded_offset = htonu64(fo->repl_offset); + raxRemove(server.repl_backlog->blocks_index, + (unsigned char*)&encoded_offset, sizeof(uint64_t), NULL); + + /* Delete the first node from global replication buffer. */ + serverAssert(fo->refcount == 0 && fo->used == fo->size); + server.repl_buffer_mem -= (fo->size + + sizeof(listNode) + sizeof(replBufBlock)); + listDelNode(server.repl_buffer_blocks, first); + } + + /* Set the offset of the first byte we have in the backlog. */ + server.repl_backlog->offset = server.master_repl_offset - + server.repl_backlog->histlen + 1; +} + +/* Free replication buffer blocks that are referenced by this client. */ +void freeReplicaReferencedReplBuffer(client *replica) { + serverAssert(replica->running_tid == IOTHREAD_MAIN_THREAD_ID); + + if (replica->ref_repl_buf_node != NULL) { + /* Decrease the start buffer node reference count. */ + replBufBlock *o = listNodeValue(replica->ref_repl_buf_node); + serverAssert(o->refcount > 0); + o->refcount--; + incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); + } + replica->ref_repl_buf_node = NULL; + replica->ref_block_pos = 0; +} + +/* Append bytes into the global replication buffer list, replication backlog and + * all replica clients use replication buffers collectively, this function replace + * 'addReply*', 'feedReplicationBacklog' for replicas and replication backlog, + * First we add buffer into global replication buffer block list, and then + * update replica / replication-backlog referenced node and block position. */ +void feedReplicationBuffer(char *s, size_t len) { + static long long repl_block_id = 0; + + if (server.repl_backlog == NULL) return; + + clusterSlotStatsIncrNetworkBytesOutForReplication(len); + + /* Update the current cmd's keys with the commands replication bytes*/ + hotkeyMetrics metrics = {0, len}; + hotkeyStatsUpdateCurrentCmd(server.hotkeys, metrics); + + while(len > 0) { + size_t start_pos = 0; /* The position of referenced block to start sending. */ + listNode *start_node = NULL; /* Replica/backlog starts referenced node. */ + int add_new_block = 0; /* Create new block if current block is total used. */ + listNode *ln = listLast(server.repl_buffer_blocks); + replBufBlock *tail = ln ? listNodeValue(ln) : NULL; + + /* Append to tail string when possible. */ + if (tail && tail->size > tail->used) { + start_node = listLast(server.repl_buffer_blocks); + start_pos = tail->used; + /* Copy the part we can fit into the tail, and leave the rest for a + * new node */ + size_t avail = tail->size - tail->used; + size_t copy = (avail >= len) ? len : avail; + memcpy(tail->buf + tail->used, s, copy); + tail->used += copy; + s += copy; + len -= copy; + server.master_repl_offset += copy; + server.repl_backlog->histlen += copy; + } + if (len) { + /* Create a new node, make sure it is allocated to at + * least PROTO_REPLY_CHUNK_BYTES */ + size_t usable_size; + /* Avoid creating nodes smaller than PROTO_REPLY_CHUNK_BYTES, so that we can append more data into them, + * and also avoid creating nodes bigger than repl_backlog_size / 16, so that we won't have huge nodes that can't + * trim when we only still need to hold a small portion from them. */ + size_t limit = max((size_t)server.repl_backlog_size / 16, (size_t)PROTO_REPLY_CHUNK_BYTES); + size_t size = min(max(len, (size_t)PROTO_REPLY_CHUNK_BYTES), limit); + tail = zmalloc_usable(size + sizeof(replBufBlock), &usable_size); + /* Take over the allocation's internal fragmentation */ + tail->size = usable_size - sizeof(replBufBlock); + size_t copy = (tail->size >= len) ? len : tail->size; + tail->used = copy; + tail->refcount = 0; + tail->repl_offset = server.master_repl_offset + 1; + tail->id = repl_block_id++; + memcpy(tail->buf, s, copy); + listAddNodeTail(server.repl_buffer_blocks, tail); + /* We also count the list node memory into replication buffer memory. */ + server.repl_buffer_mem += (usable_size + sizeof(listNode)); + add_new_block = 1; + if (start_node == NULL) { + start_node = listLast(server.repl_buffer_blocks); + start_pos = 0; + } + s += copy; + len -= copy; + server.master_repl_offset += copy; + server.repl_backlog->histlen += copy; + } + + /* For output buffer of replicas. */ + listIter li; + listRewind(server.slaves,&li); + while((ln = listNext(&li))) { + client *slave = ln->value; + if (!canFeedReplicaReplBuffer(slave)) continue; + + /* Update shared replication buffer start position. */ + if (slave->ref_repl_buf_node == NULL) { + slave->ref_repl_buf_node = start_node; + slave->ref_block_pos = start_pos; + /* Only increase the start block reference count. */ + ((replBufBlock *)listNodeValue(start_node))->refcount++; + } + + /* Check output buffer limit only when add new block. */ + if (add_new_block) closeClientOnOutputBufferLimitReached(slave, 1); + } + + /* For replication backlog */ + if (server.repl_backlog->ref_repl_buf_node == NULL) { + server.repl_backlog->ref_repl_buf_node = start_node; + /* Only increase the start block reference count. */ + ((replBufBlock *)listNodeValue(start_node))->refcount++; + + /* Replication buffer must be empty before adding replication stream + * into replication backlog. */ + serverAssert(add_new_block == 1 && start_pos == 0); + } + if (add_new_block) { + createReplicationBacklogIndex(listLast(server.repl_buffer_blocks)); + + /* It is important to trim after adding replication data to keep the backlog size close to + * repl_backlog_size in the common case. We wait until we add a new block to avoid repeated + * unnecessary trimming attempts when small amounts of data are added. See comments in + * freeMemoryGetNotCountedMemory() for details on replication backlog memory tracking. */ + incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); + } + } +} + +/* Propagate write commands to replication stream. + * + * This function is used if the instance is a master: we use the commands + * received by our clients in order to create the replication stream. + * Instead if the instance is a replica and has sub-replicas attached, we use + * replicationFeedStreamFromMasterStream() */ +void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { + int j, len; + char llstr[LONG_STR_SIZE]; + + /* In case we propagate a command that doesn't touch keys (PING, REPLCONF) we + * pass dbid=-1 that indicate there is no need to replicate `select` command. */ + serverAssert(dictid == -1 || (dictid >= 0 && dictid < server.dbnum)); + + /* If the instance is not a top level master, return ASAP: we'll just proxy + * the stream of data we receive from our master instead, in order to + * propagate *identical* replication stream. In this way this slave can + * advertise the same replication ID as the master (since it shares the + * master replication history and has the same backlog and offsets). */ + if (server.masterhost != NULL) return; + + /* If current client is marked as master, we will proxy the command stream + * to our slaves instead of replicating them, that also happens when being + * in atomic slot migration. */ + if (server.current_client && server.current_client->flags & CLIENT_MASTER) return; + + /* If there aren't slaves, and there is no backlog buffer to populate, + * we can return ASAP. */ + if (server.repl_backlog == NULL && listLength(slaves) == 0) { + /* We increment the repl_offset anyway, since we use that for tracking AOF fsyncs + * even when there's no replication active. This code will not be reached if AOF + * is also disabled. */ + server.master_repl_offset += 1; + return; + } + + /* We can't have slaves attached and no backlog. */ + serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL)); + + /* Update the time of sending replication stream to replicas. */ + server.repl_stream_lastio = server.unixtime; + + /* Must install write handler for all replicas first before feeding + * replication stream. */ + prepareReplicasToWrite(); + + /* Send SELECT command to every slave if needed. */ + if (dictid != -1 && server.slaveseldb != dictid) { + robj *selectcmd; + + /* For a few DBs we have pre-computed SELECT command. */ + if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) { + selectcmd = shared.select[dictid]; + } else { + int dictid_len; + + dictid_len = ll2string(llstr,sizeof(llstr),dictid); + selectcmd = createObject(OBJ_STRING, + sdscatprintf(sdsempty(), + "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", + dictid_len, llstr)); + } + + feedReplicationBufferWithObject(selectcmd); + + /* Although the SELECT command is not associated with any slot, + * its per-slot network-bytes-out accumulation is made by the above function call. + * To cancel-out this accumulation, below adjustment is made. */ + clusterSlotStatsDecrNetworkBytesOutForReplication(sdslen(selectcmd->ptr)); + + if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) + decrRefCount(selectcmd); + + server.slaveseldb = dictid; + } + + /* Write the command to the replication buffer if any. */ + char aux[LONG_STR_SIZE+3]; + + /* Add the multi bulk reply length. */ + aux[0] = '*'; + len = ll2string(aux+1,sizeof(aux)-1,argc); + aux[len+1] = '\r'; + aux[len+2] = '\n'; + feedReplicationBuffer(aux,len+3); + + for (j = 0; j < argc; j++) { + long objlen = stringObjectLen(argv[j]); + + /* We need to feed the buffer with the object as a bulk reply + * not just as a plain string, so create the $..CRLF payload len + * and add the final CRLF */ + aux[0] = '$'; + len = ll2string(aux+1,sizeof(aux)-1,objlen); + aux[len+1] = '\r'; + aux[len+2] = '\n'; + feedReplicationBuffer(aux,len+3); + feedReplicationBufferWithObject(argv[j]); + feedReplicationBuffer(aux+len+1,2); + } +} + +/* This is a debugging function that gets called when we detect something + * wrong with the replication protocol: the goal is to peek into the + * replication backlog and show a few final bytes to make simpler to + * guess what kind of bug it could be. */ +void showLatestBacklog(void) { + if (server.repl_backlog == NULL) return; + if (listLength(server.repl_buffer_blocks) == 0) return; + if (server.hide_user_data_from_log) { + serverLog(LL_NOTICE,"hide-user-data-from-log is on, skip logging backlog content to avoid spilling PII."); + return; + } + + size_t dumplen = 256; + if (server.repl_backlog->histlen < (long long)dumplen) + dumplen = server.repl_backlog->histlen; + + sds dump = sdsempty(); + listNode *node = listLast(server.repl_buffer_blocks); + while(dumplen) { + if (node == NULL) break; + replBufBlock *o = listNodeValue(node); + size_t thislen = o->used >= dumplen ? dumplen : o->used; + sds head = sdscatrepr(sdsempty(), o->buf+o->used-thislen, thislen); + sds tmp = sdscatsds(head, dump); + sdsfree(dump); + dump = tmp; + dumplen -= thislen; + node = listPrevNode(node); + } + + /* Finally log such bytes: this is vital debugging info to + * understand what happened. */ + serverLog(LL_NOTICE,"Latest backlog is: '%s'", dump); + sdsfree(dump); +} + +/* This function is used in order to proxy what we receive from our master + * to our sub-slaves. Besides, we also proxy the replication stream from + * the source node when being in atomic slot migration. */ +void replicationFeedStreamFromMasterStream(char *buf, size_t buflen) { + /* There must be replication backlog if having attached slaves. */ + if (listLength(server.slaves)) serverAssert(server.repl_backlog != NULL); + if (server.repl_backlog) { + /* Must install write handler for all replicas first before feeding + * replication stream. */ + prepareReplicasToWrite(); + feedReplicationBuffer(buf,buflen); + } else if (server.masterhost == NULL && server.aof_enabled) { + /* We increment the repl_offset anyway, since we use that for tracking + * AOF fsyncs even when there's no replication active. This code will + * not be reached if AOF is also disabled. + * + * As we skip feeding the replication buffer in atomic slot migration, + * so here we need to update the replication offset manually. */ + server.master_repl_offset += 1; + } +} + +void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) { + /* Fast path to return if the monitors list is empty or the server is in loading. */ + if (monitors == NULL || listLength(monitors) == 0 || server.loading) return; + listNode *ln; + listIter li; + int j; + sds cmdrepr = sdsnew("+"); + robj *cmdobj; + struct timeval tv; + + gettimeofday(&tv,NULL); + cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec); + if (c->flags & CLIENT_SCRIPT) { + cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid); + } else if (c->flags & CLIENT_UNIX_SOCKET) { + cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket); + } else { + cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c)); + } + + for (j = 0; j < argc; j++) { + if (argv[j]->encoding == OBJ_ENCODING_INT) { + cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr); + } else { + cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr, + sdslen(argv[j]->ptr)); + } + if (j != argc-1) + cmdrepr = sdscatlen(cmdrepr," ",1); + } + cmdrepr = sdscatlen(cmdrepr,"\r\n",2); + cmdobj = createObject(OBJ_STRING,cmdrepr); + + listRewind(monitors,&li); + while((ln = listNext(&li))) { + client *monitor = ln->value; + /* Do not show internal commands to non-internal clients. */ + if (c->realcmd && (c->realcmd->flags & CMD_INTERNAL) && !(monitor->flags & CLIENT_INTERNAL)) { + continue; + } + addReply(monitor,cmdobj); + updateClientMemUsageAndBucket(monitor); + } + decrRefCount(cmdobj); +} + +/* Feed the slave 'c' with the replication backlog starting from the + * specified 'offset' up to the end of the backlog. */ +long long addReplyReplicationBacklog(client *c, long long offset) { + serverAssert(c->running_tid == IOTHREAD_MAIN_THREAD_ID); + + long long skip; + + serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset); + + if (server.repl_backlog->histlen == 0) { + serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero"); + return 0; + } + + serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld", + server.repl_backlog_size); + serverLog(LL_DEBUG, "[PSYNC] First byte: %lld", + server.repl_backlog->offset); + serverLog(LL_DEBUG, "[PSYNC] History len: %lld", + server.repl_backlog->histlen); + + /* Compute the amount of bytes we need to discard. */ + skip = offset - server.repl_backlog->offset; + serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip); + + /* Iterate recorded blocks, quickly search the approximate node. */ + listNode *node = NULL; + if (raxSize(server.repl_backlog->blocks_index) > 0) { + uint64_t encoded_offset = htonu64(offset); + raxIterator ri; + raxStart(&ri, server.repl_backlog->blocks_index); + raxSeek(&ri, ">", (unsigned char*)&encoded_offset, sizeof(uint64_t)); + if (raxEOF(&ri)) { + /* No found, so search from the last recorded node. */ + raxSeek(&ri, "$", NULL, 0); + raxPrev(&ri); + node = (listNode *)ri.data; + } else { + raxPrev(&ri); /* Skip the sought node. */ + /* We should search from the prev node since the offset of current + * sought node exceeds searching offset. */ + if (raxPrev(&ri)) + node = (listNode *)ri.data; + else + node = server.repl_backlog->ref_repl_buf_node; + } + raxStop(&ri); + } else { + /* No recorded blocks, just from the start node to search. */ + node = server.repl_backlog->ref_repl_buf_node; + } + + /* Search the exact node. */ + while (node != NULL) { + replBufBlock *o = listNodeValue(node); + if (o->repl_offset + (long long)o->used >= offset) break; + node = listNextNode(node); + } + serverAssert(node != NULL); + + /* Install a writer handler first.*/ + prepareClientToWrite(c); + /* Setting output buffer of the replica. */ + replBufBlock *o = listNodeValue(node); + o->refcount++; + c->ref_repl_buf_node = node; + c->ref_block_pos = offset - o->repl_offset; + + return server.repl_backlog->histlen - skip; +} + +/* Return the offset to provide as reply to the PSYNC command received + * from the slave. The returned value is only valid immediately after + * the BGSAVE process started and before executing any other command + * from clients. */ +long long getPsyncInitialOffset(void) { + return server.master_repl_offset; +} + +/* Send a FULLRESYNC reply in the specific case of a full resynchronization, + * as a side effect setup the slave for a full sync in different ways: + * + * 1) Remember, into the slave client structure, the replication offset + * we sent here, so that if new slaves will later attach to the same + * background RDB saving process (by duplicating this client output + * buffer), we can get the right offset from this slave. + * 2) Set the replication state of the slave to WAIT_BGSAVE_END so that + * we start accumulating differences from this point. + * 3) Force the replication stream to re-emit a SELECT statement so + * the new slave incremental differences will start selecting the + * right database number. + * + * Normally this function should be called immediately after a successful + * BGSAVE for replication was started, or when there is one already in + * progress that we attached our slave to. */ +int replicationSetupSlaveForFullResync(client *slave, long long offset) { + char buf[128]; + int buflen; + + slave->psync_initial_offset = offset; + slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END; + /* We are going to accumulate the incremental changes for this + * slave as well. Set slaveseldb to -1 in order to force to re-emit + * a SELECT statement in the replication stream. */ + server.slaveseldb = -1; + + /* Slots snapshot. */ + if (slave->flags & CLIENT_REPL_RDB_CHANNEL && + slave->slave_req & SLAVE_REQ_SLOTS_SNAPSHOT) + { + /* Start to deliver the commands stream on migrating slots. */ + asmSlotSnapshotAndStreamStart(slave->task); + + buflen = snprintf(buf, sizeof(buf), "+SLOTSSNAPSHOT\r\n"); + if (connWrite(slave->conn, buf, buflen) != buflen) { + freeClientAsync(slave); + return C_ERR; + } + return C_OK; + } + + /* Don't send this reply to slaves that approached us with + * the old SYNC command. */ + if (!(slave->flags & CLIENT_PRE_PSYNC)) { + if (slave->flags & CLIENT_REPL_RDB_CHANNEL) { + /* This slave is rdbchannel. Find its associated main channel and + * change its state so we can deliver replication stream from now + * on, in parallel to rdb. */ + uint64_t id = slave->main_ch_client_id; + client *c = lookupClientByID(id); + if (c && c->replstate == SLAVE_STATE_WAIT_RDB_CHANNEL) { + c->replstate = SLAVE_STATE_SEND_BULK_AND_STREAM; + serverLog(LL_NOTICE, "Starting to deliver RDB and replication stream to replica: %s", + replicationGetSlaveName(c)); + } else { + serverLog(LL_WARNING, "Starting to deliver RDB to replica %s" + " but it has no associated main channel", + replicationGetSlaveName(slave)); + } + } + buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n", + server.replid,offset); + if (connWrite(slave->conn,buf,buflen) != buflen) { + freeClientAsync(slave); + return C_ERR; + } + } + return C_OK; +} + +/* This function handles the PSYNC command from the point of view of a + * master receiving a request for partial resynchronization. + * + * On success return C_OK, otherwise C_ERR is returned and we proceed + * with the usual full resync. */ +int masterTryPartialResynchronization(client *c, long long psync_offset) { + long long psync_len; + char *master_replid = c->argv[1]->ptr; + char buf[128]; + int buflen; + + /* Is the replication ID of this master the same advertised by the wannabe + * slave via PSYNC? If the replication ID changed this master has a + * different replication history, and there is no way to continue. + * + * Note that there are two potentially valid replication IDs: the ID1 + * and the ID2. The ID2 however is only valid up to a specific offset. */ + if (strcasecmp(master_replid, server.replid) && + (strcasecmp(master_replid, server.replid2) || + psync_offset > server.second_replid_offset)) + { + /* Replid "?" is used by slaves that want to force a full resync. */ + if (master_replid[0] != '?') { + if (strcasecmp(master_replid, server.replid) && + strcasecmp(master_replid, server.replid2)) + { + serverLog(LL_NOTICE,"Partial resynchronization not accepted: " + "Replication ID mismatch (Replica asked for '%s', my " + "replication IDs are '%s' and '%s')", + master_replid, server.replid, server.replid2); + } else { + serverLog(LL_NOTICE,"Partial resynchronization not accepted: " + "Requested offset for second ID was %lld, but I can reply " + "up to %lld", psync_offset, server.second_replid_offset); + } + } else { + serverLog(LL_NOTICE,"Full resync requested by replica %s %s", + replicationGetSlaveName(c), + c->flags & CLIENT_REPL_RDB_CHANNEL ? "(rdb-channel)" : ""); + } + goto need_full_resync; + } + + /* We still have the data our slave is asking for? */ + if (!server.repl_backlog || + psync_offset < server.repl_backlog->offset || + psync_offset > (server.repl_backlog->offset + server.repl_backlog->histlen)) + { + serverLog(LL_NOTICE, + "Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld).", replicationGetSlaveName(c), psync_offset); + if (psync_offset > server.master_repl_offset) { + serverLog(LL_WARNING, + "Warning: replica %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c)); + } + goto need_full_resync; + } + + /* If we reached this point, we are able to perform a partial resync: + * 1) Set client state to make it a slave. + * 2) Inform the client we can continue with +CONTINUE + * 3) Send the backlog data (from the offset to the end) to the slave. */ + c->flags |= CLIENT_SLAVE; + c->replstate = SLAVE_STATE_ONLINE; + c->repl_ack_time = server.unixtime; + c->repl_start_cmd_stream_on_ack = 0; + listAddNodeTail(server.slaves,c); + /* We can't use the connection buffers since they are used to accumulate + * new commands at this stage. But we are sure the socket send buffer is + * empty so this write will never fail actually. */ + if (c->slave_capa & SLAVE_CAPA_PSYNC2) { + buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid); + } else { + buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n"); + } + if (connWrite(c->conn,buf,buflen) != buflen) { + freeClientAsync(c); + return C_OK; + } + psync_len = addReplyReplicationBacklog(c,psync_offset); + serverLog(LL_NOTICE, + "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.", + replicationGetSlaveName(c), + psync_len, psync_offset); + /* Note that we don't need to set the selected DB at server.slaveseldb + * to -1 to force the master to emit SELECT, since the slave already + * has this state from the previous connection with the master. */ + + refreshGoodSlavesCount(); + + /* Fire the replica change modules event. */ + moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE, + REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE, + NULL); + + return C_OK; /* The caller can return, no full resync needed. */ + +need_full_resync: + /* We need a full resync for some reason... Note that we can't + * reply to PSYNC right now if a full SYNC is needed. The reply + * must include the master offset at the time the RDB file we transfer + * is generated, so we need to delay the reply to that moment. */ + return C_ERR; +} + +/* Start a BGSAVE for replication goals, which is, selecting the disk or + * socket target depending on the configuration, and making sure that + * the script cache is flushed before to start. + * + * The mincapa argument is the bitwise AND among all the slaves capabilities + * of the slaves waiting for this BGSAVE, so represents the slave capabilities + * all the slaves support. Can be tested via SLAVE_CAPA_* macros. + * + * Side effects, other than starting a BGSAVE: + * + * 1) Handle the slaves in WAIT_START state, by preparing them for a full + * sync if the BGSAVE was successfully started, or sending them an error + * and dropping them from the list of slaves. + * + * 2) Flush the Lua scripting script cache if the BGSAVE was actually + * started. + * + * Returns C_OK on success or C_ERR otherwise. */ +int startBgsaveForReplication(int mincapa, int req) { + int retval; + int socket_target = 0; + listIter li; + listNode *ln; + + /* We use a socket target if slave can handle the EOF marker and we're configured to do diskless syncs. + * Note that in case we're creating a "filtered" RDB (functions-only, for example) we also force socket replication + * to avoid overwriting the snapshot RDB file with filtered data. */ + socket_target = (server.repl_diskless_sync || req & SLAVE_REQ_RDB_MASK) && (mincapa & SLAVE_CAPA_EOF); + /* `SYNC` should have failed with error if we don't support socket and require a filter, assert this here */ + serverAssert(socket_target || !(req & SLAVE_REQ_RDB_MASK)); + + int slots_req = req & SLAVE_REQ_SLOTS_SNAPSHOT; + serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s%s", + socket_target ? (slots_req ? "slot migration destination socket" : "replicas sockets") : "disk", + (req & SLAVE_REQ_RDB_CHANNEL) ? " (rdb-channel)" : ""); + + rdbSaveInfo rsi, *rsiptr; + rsiptr = rdbPopulateSaveInfo(&rsi); + /* Only do rdbSave* when rsiptr is not NULL, + * otherwise slave will miss repl-stream-db. */ + if (rsiptr) { + if (socket_target) + retval = rdbSaveToSlavesSockets(req,rsiptr); + else { + /* Keep the page cache since it'll get used soon */ + retval = rdbSaveBackground(req, server.rdb_filename, rsiptr, RDBFLAGS_REPLICATION | RDBFLAGS_KEEP_CACHE); + } + if (server.repl_debug_pause & REPL_DEBUG_AFTER_FORK) + debugPauseProcess(); + } else { + serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. Try later."); + retval = C_ERR; + } + + /* If we succeeded to start a BGSAVE with disk target, let's remember + * this fact, so that we can later delete the file if needed. Note + * that we don't set the flag to 1 if the feature is disabled, otherwise + * it would never be cleared: the file is not deleted. This way if + * the user enables it later with CONFIG SET, we are fine. */ + if (retval == C_OK && !socket_target && server.rdb_del_sync_files) + RDBGeneratedByReplication = 1; + + /* If we failed to BGSAVE, remove the slaves waiting for a full + * resynchronization from the list of slaves, inform them with + * an error about what happened, close the connection ASAP. */ + if (retval == C_ERR) { + serverLog(LL_WARNING,"BGSAVE for replication failed"); + listRewind(server.slaves,&li); + while((ln = listNext(&li))) { + client *slave = ln->value; + + if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { + slave->replstate = REPL_STATE_NONE; + slave->flags &= ~CLIENT_SLAVE; + listDelNode(server.slaves,ln); + addReplyError(slave, + "BGSAVE failed, replication can't continue"); + slave->flags |= CLIENT_CLOSE_AFTER_REPLY; + } + } + return retval; + } + + /* If the target is socket, rdbSaveToSlavesSockets() already setup + * the slaves for a full resync. Otherwise for disk target do it now.*/ + if (!socket_target) { + listRewind(server.slaves,&li); + while((ln = listNext(&li))) { + client *slave = ln->value; + + if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { + /* Check slave has the exact requirements */ + if (slave->slave_req != req) + continue; + replicationSetupSlaveForFullResync(slave, getPsyncInitialOffset()); + } + } + } + + return retval; +} + +/* SYNC and PSYNC command implementation. */ +void syncCommand(client *c) { + /* ignore SYNC if already slave or in monitor mode */ + if (c->flags & CLIENT_SLAVE) return; + + /* Check if this is a failover request to a replica with the same replid and + * become a master if so. */ + if (c->argc > 3 && !strcasecmp(c->argv[0]->ptr,"psync") && + !strcasecmp(c->argv[3]->ptr,"failover")) + { + serverLog(LL_NOTICE, "Failover request received for replid %s.", + (unsigned char *)c->argv[1]->ptr); + if (!server.masterhost) { + addReplyError(c, "PSYNC FAILOVER can't be sent to a master."); + return; + } + + if (!strcasecmp(c->argv[1]->ptr,server.replid)) { + if (server.cluster_enabled) { + clusterPromoteSelfToMaster(); + } else { + replicationUnsetMaster(); + } + sds client = catClientInfoString(sdsempty(),c); + serverLog(LL_NOTICE, + "MASTER MODE enabled (failover request from '%s')",client); + sdsfree(client); + } else { + addReplyError(c, "PSYNC FAILOVER replid must match my replid."); + return; + } + } + + /* Don't let replicas sync with us while we're failing over */ + if (server.failover_state != NO_FAILOVER) { + addReplyError(c,"-NOMASTERLINK Can't SYNC while failing over"); + return; + } + + /* Refuse SYNC requests if we are a slave but the link with our master + * is not ok... */ + if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) { + addReplyError(c,"-NOMASTERLINK Can't SYNC while not connected with my master"); + return; + } + + /* SYNC can't be issued when the server has pending data to send to + * the client about already issued commands. We need a fresh reply + * buffer registering the differences between the BGSAVE and the current + * dataset, so that we can copy to other slaves if needed. */ + if (clientHasPendingReplies(c)) { + addReplyError(c,"SYNC and PSYNC are invalid with pending output"); + return; + } + + /* Fail sync if slave doesn't support EOF capability but wants a filtered RDB. This is because we force filtered + * RDB's to be generated over a socket and not through a file to avoid conflicts with the snapshot files. Forcing + * use of a socket is handled, if needed, in `startBgsaveForReplication`. */ + if (c->slave_req & SLAVE_REQ_RDB_MASK && !(c->slave_capa & SLAVE_CAPA_EOF)) { + addReplyError(c,"Filtered replica requires EOF capability"); + return; + } + + serverLog(LL_NOTICE,"Replica %s asks for synchronization", + replicationGetSlaveName(c)); + + /* Try a partial resynchronization if this is a PSYNC command. + * If it fails, we continue with usual full resynchronization, however + * when this happens replicationSetupSlaveForFullResync will replied + * with: + * + * +FULLRESYNC + * + * So the slave knows the new replid and offset to try a PSYNC later + * if the connection with the master is lost. */ + if (!strcasecmp(c->argv[0]->ptr,"psync")) { + long long psync_offset; + if (getLongLongFromObjectOrReply(c, c->argv[2], &psync_offset, NULL) != C_OK) { + serverLog(LL_WARNING, "Replica %s asks for synchronization but with a wrong offset", + replicationGetSlaveName(c)); + return; + } + + if (masterTryPartialResynchronization(c, psync_offset) == C_OK) { + server.stat_sync_partial_ok++; + return; /* No full resync needed, return. */ + } else { + char *master_replid = c->argv[1]->ptr; + + /* Increment stats for failed PSYNCs, but only if the + * replid is not "?", as this is used by slaves to force a full + * resync on purpose when they are not able to partially + * resync. */ + if (master_replid[0] != '?') server.stat_sync_partial_err++; + if (c->slave_capa & SLAVE_CAPA_RDB_CHANNEL_REPL) { + int len; + char buf[128]; + /* Replica is capable of rdbchannel replication. This is + * replica's main channel. Let replica know full sync is needed. + * Replica will open another connection (rdbchannel). Once rdb + * delivery starts, we'll stream repl data to the main channel.*/ + c->flags |= CLIENT_SLAVE; + c->replstate = SLAVE_STATE_WAIT_RDB_CHANNEL; + c->repl_ack_time = server.unixtime; + listAddNodeTail(server.slaves, c); + createReplicationBacklogIfNeeded(); + + serverLog(LL_NOTICE, + "Replica %s is capable of rdb channel synchronization, and partial sync isn't possible. " + "Full sync will continue with dedicated rdb channel.", + replicationGetSlaveName(c)); + + /* Send +RDBCHANNELSYNC with client id so we can associate replica connections on master.*/ + len = snprintf(buf, sizeof(buf), "+RDBCHANNELSYNC %llu\r\n", + (unsigned long long) c->id); + if (connWrite(c->conn, buf, strlen(buf)) != len) + freeClientAsync(c); + + return; + } + } + } else { + /* If a slave uses SYNC, we are dealing with an old implementation + * of the replication protocol (like redis-cli --slave). Flag the client + * so that we don't expect to receive REPLCONF ACK feedbacks. */ + c->flags |= CLIENT_PRE_PSYNC; + } + + /* Full resynchronization. */ + server.stat_sync_full++; + + /* Setup the slave as one waiting for BGSAVE to start. The following code + * paths will change the state if we handle the slave differently. */ + c->replstate = SLAVE_STATE_WAIT_BGSAVE_START; + if (server.repl_disable_tcp_nodelay) + connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */ + c->repldbfd = -1; + c->flags |= CLIENT_SLAVE; + listAddNodeTail(server.slaves,c); + + /* Create the replication backlog if needed. */ + createReplicationBacklogIfNeeded(); + + /* Keep the client in the main thread to avoid data races between the + * connWrite call in startBgsaveForReplication and the client's event + * handler in IO threads. */ + if (c->tid != IOTHREAD_MAIN_THREAD_ID) keepClientInMainThread(c); + + /* CASE 1: BGSAVE is in progress, with disk target. */ + if (server.child_type == CHILD_TYPE_RDB && + server.rdb_child_type == RDB_CHILD_TYPE_DISK) + { + /* Ok a background save is in progress. Let's check if it is a good + * one for replication, i.e. if there is another slave that is + * registering differences since the server forked to save. */ + client *slave; + listNode *ln; + listIter li; + + listRewind(server.slaves,&li); + while((ln = listNext(&li))) { + slave = ln->value; + /* If the client needs a buffer of commands, we can't use + * a replica without replication buffer. */ + if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && + (!(slave->flags & CLIENT_REPL_RDBONLY) || + (c->flags & CLIENT_REPL_RDBONLY))) + break; + } + /* To attach this slave, we check that it has at least all the + * capabilities of the slave that triggered the current BGSAVE + * and its exact requirements. */ + if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa) && + c->slave_req == slave->slave_req) { + /* Perfect, the server is already registering differences for + * another slave. Set the right state, and copy the buffer. + * We don't copy buffer if clients don't want. */ + if (!(c->flags & CLIENT_REPL_RDBONLY)) + copyReplicaOutputBuffer(c,slave); + replicationSetupSlaveForFullResync(c,slave->psync_initial_offset); + serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC"); + } else { + /* No way, we need to wait for the next BGSAVE in order to + * register differences. */ + serverLog(LL_NOTICE,"Can't attach the replica to the current BGSAVE. Waiting for next BGSAVE for SYNC"); + } + + /* CASE 2: BGSAVE is in progress, with socket target. */ + } else if (server.child_type == CHILD_TYPE_RDB && + server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) + { + /* There is an RDB child process but it is writing directly to + * children sockets. We need to wait for the next BGSAVE + * in order to synchronize. */ + serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC"); + + /* CASE 3: There is no BGSAVE is in progress. */ + } else { + if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF) && + server.repl_diskless_sync_delay) + { + /* Diskless replication RDB child is created inside + * replicationCron() since we want to delay its start a + * few seconds to wait for more slaves to arrive. */ + serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC"); + } else { + /* We don't have a BGSAVE in progress, let's start one. Diskless + * or disk-based mode is determined by replica's capacity. */ + if (!hasActiveChildProcess()) { + startBgsaveForReplication(c->slave_capa, c->slave_req); + } else { + serverLog(LL_NOTICE, + "No BGSAVE in progress, but another BG operation is active. " + "BGSAVE for replication delayed"); + } + } + } + return; +} + +/* REPLCONF