From dcacc00e3750300617ba6e16eb346713f91a783a Mon Sep 17 00:00:00 2001 From: Mitja Felicijan Date: Wed, 21 Jan 2026 22:52:54 +0100 Subject: Remove testing data --- examples/redis-unstable/src/replication.c | 5387 ----------------------------- 1 file changed, 5387 deletions(-) delete mode 100644 examples/redis-unstable/src/replication.c (limited to 'examples/redis-unstable/src/replication.c') diff --git a/examples/redis-unstable/src/replication.c b/examples/redis-unstable/src/replication.c deleted file mode 100644 index 309d6c4..0000000 --- a/examples/redis-unstable/src/replication.c +++ /dev/null @@ -1,5387 +0,0 @@ -/* Asynchronous replication implementation. - * - * Copyright (c) 2009-Present, Redis Ltd. - * All rights reserved. - * - * Copyright (c) 2024-present, Valkey contributors. - * All rights reserved. - * - * Licensed under your choice of (a) the Redis Source Available License 2.0 - * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the - * GNU Affero General Public License v3 (AGPLv3). - * - * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information. - */ - -/* - * replication.c - Replication Management - * - * This file contains the implementation of Redis's replication logic, which - * enables data synchronization between master and replica instances. - * It handles: - * - Master-to-replica synchronization - * - Full and partial resynchronizations - * - Replication backlog management - * - State machines for replica operations - * - RDB Channel for Full Sync (lookup "rdb channel for full sync") - */ - -#include "server.h" -#include "cluster.h" -#include "cluster_slot_stats.h" -#include "bio.h" -#include "functions.h" -#include "connection.h" -#include "cluster_asm.h" - -#include -#include -#include -#include -#include -#include - -void replicationDiscardCachedMaster(void); -void replicationResurrectCachedMaster(connection *conn); -void replicationSendAck(void); -int replicaPutOnline(client *slave); -void replicaStartCommandStream(client *slave); -int cancelReplicationHandshake(int reconnect); -static void rdbChannelFullSyncWithMaster(connection *conn); -static int rdbChannelAbort(void); -static void rdbChannelBufferReplData(connection *conn); -static void rdbChannelReplDataBufInit(void); -static void rdbChannelStreamReplDataToDb(void); -static void rdbChannelCleanup(void); - -/* We take a global flag to remember if this instance generated an RDB - * because of replication, so that we can remove the RDB file in case - * the instance is configured to have no persistence. */ -int RDBGeneratedByReplication = 0; - - -/* A reference to diskless loading rio to abort it asynchronously. It's needed - * for rdbchannel replication. While loading from rdbchannel connection, we may - * yield back to eventloop. If main channel connection detects a network problem - * we want to abort loading. It calls rioAbort() in this case, so next rioRead() - * from rdbchannel connection will return error to cancel loading safely. */ -static rio *disklessLoadingRio = NULL; - -/* --------------------------- Utility functions ---------------------------- */ - -/* Returns 1 if the replica is rdbchannel and there is an associated main - * channel slave with that. */ -int replicationCheckHasMainChannel(client *replica) { - if (!(replica->flags & CLIENT_REPL_RDB_CHANNEL) || - !replica->main_ch_client_id || - lookupClientByID(replica->main_ch_client_id) == NULL) - { - return 0; - } - return 1; -} - -/* During rdb channel replication, replica opens two connections. From master - * POV, these connections are distinct replicas in server.slaves. This function - * counts associated replicas as one and returns logical replica count. */ -unsigned long replicationLogicalReplicaCount(void) { - unsigned long count = 0; - listNode *ln; - listIter li; - - listRewind(server.slaves,&li); - while ((ln = listNext(&li))) { - client *replica = listNodeValue(ln); - if (!replicationCheckHasMainChannel(replica)) - count++; - } - return count; -} - -int replicaFromIOThreadHasPendingRead(client *c) { - serverAssert(c->tid != IOTHREAD_MAIN_THREAD_ID); - - int pending_read; - atomicGetWithSync(c->pending_read, pending_read); - return pending_read; -} - -/* Send replicas to their respective IO threads if it has pending reads or - * writes. Otherwise it remains in main thread so it can check for new data in - * the replication buffer ASAP. */ -void putReplicasInPendingClientsToIOThreads(void) { - if (server.io_threads_num <= 1) return; - - serverAssert(pthread_equal(pthread_self(), server.main_thread_id)); - - listIter li; - listNode *ln; - listRewind(server.slaves,&li); - while((ln = listNext(&li))) { - client *replica = listNodeValue(ln); - - /* We only care about replicas that need to run on IO thread but are - * currently in main */ - if (replica->tid == IOTHREAD_MAIN_THREAD_ID || - replica->running_tid != IOTHREAD_MAIN_THREAD_ID) - { - continue; - } - - /* Skip the replica if it's scheduled for close */ - if (replica->flags & CLIENT_CLOSE_ASAP) continue; - - /* The call to clientHasPendingReplies may seem redundant but in the - * case of replica being in IO thread we can have the following case: - * replica gets back to main thread after sending the repl buffer it - * knows about. In the mean time main thread has accumulated new repl - * data. In that case the replica's client wouldn't have been put in - * the pending write queue but will still have new repl data it needs to - * send, so we make sure to check for that and send it back to IO thread - * if so. On the other hand if replica gets back to main thread before - * any new repl data has accumulated then after a new cmd is propagated - * the replica will be put in the pending write queue as usual so we - * need to check for that also. - * In addition, if the replica client has pending read events, we should - * also send them to the IO thread. */ - if (replica->flags & CLIENT_PENDING_WRITE || - clientHasPendingReplies(replica) || - replicaFromIOThreadHasPendingRead(replica)) - { - enqueuePendingClienstToIOThreads(replica); - } - } -} - -/* Run some cron tasks for a connected master client. Return 1 when the client - * is freed, 0 otherwise. */ -int replicationCronRunMasterClient(void) { - if (!server.masterhost || !server.master) return 0; - - if (server.master->running_tid != IOTHREAD_MAIN_THREAD_ID) return 0; - - /* Timed out master when we are an already connected slave? */ - if (server.repl_state == REPL_STATE_CONNECTED && - (time(NULL)-server.master->lastinteraction) > server.repl_timeout) - { - serverLog(LL_WARNING,"MASTER timeout: no data nor PING received..."); - freeClient(server.master); - return 1; - } - - /* Send ACK to master from time to time. - * Note that we do not send periodic acks to masters that don't - * support PSYNC and replication offsets. */ - if (!(server.master->flags & CLIENT_PRE_PSYNC)) - replicationSendAck(); - - return 0; -} - -ConnectionType *connTypeOfReplication(void) { - if (server.tls_replication) { - return connectionTypeTls(); - } - - return connectionTypeTcp(); -} - -/* Return the pointer to a string representing the slave ip:listening_port - * pair. Mostly useful for logging, since we want to log a slave using its - * IP address and its listening port which is more clear for the user, for - * example: "Closing connection with replica 10.1.2.3:6380". */ -char *replicationGetSlaveName(client *c) { - static char buf[NET_HOST_PORT_STR_LEN]; - char ip[NET_IP_STR_LEN]; - - ip[0] = '\0'; - buf[0] = '\0'; - if (c->slave_addr || - connAddrPeerName(c->conn,ip,sizeof(ip),NULL) != -1) - { - char *addr = c->slave_addr ? c->slave_addr : ip; - if (c->slave_listening_port) - formatAddr(buf,sizeof(buf),addr,c->slave_listening_port); - else - snprintf(buf,sizeof(buf),"%s:",addr); - } else { - snprintf(buf,sizeof(buf),"client id #%llu", - (unsigned long long) c->id); - } - return buf; -} - -/* Plain unlink() can block for quite some time in order to actually apply - * the file deletion to the filesystem. This call removes the file in a - * background thread instead. We actually just do close() in the thread, - * by using the fact that if there is another instance of the same file open, - * the foreground unlink() will only remove the fs name, and deleting the - * file's storage space will only happen once the last reference is lost. */ -int bg_unlink(const char *filename) { - int fd = open(filename,O_RDONLY|O_NONBLOCK); - if (fd == -1) { - /* Can't open the file? Fall back to unlinking in the main thread. */ - return unlink(filename); - } else { - /* The following unlink() removes the name but doesn't free the - * file contents because a process still has it open. */ - int retval = unlink(filename); - if (retval == -1) { - /* If we got an unlink error, we just return it, closing the - * new reference we have to the file. */ - int old_errno = errno; - close(fd); /* This would overwrite our errno. So we saved it. */ - errno = old_errno; - return -1; - } - bioCreateCloseJob(fd, 0, 0); - return 0; /* Success. */ - } -} - -/* ---------------------------------- MASTER -------------------------------- */ - -void createReplicationBacklog(void) { - serverAssert(server.repl_backlog == NULL); - server.repl_backlog = zmalloc(sizeof(replBacklog)); - server.repl_backlog->ref_repl_buf_node = NULL; - server.repl_backlog->unindexed_count = 0; - server.repl_backlog->blocks_index = raxNew(); - server.repl_backlog->histlen = 0; - /* We don't have any data inside our buffer, but virtually the first - * byte we have is the next byte that will be generated for the - * replication stream. */ - server.repl_backlog->offset = server.master_repl_offset+1; -} - -/* This function is called when the user modifies the replication backlog - * size at runtime. It is up to the function to resize the buffer and setup it - * so that it contains the same data as the previous one (possibly less data, - * but the most recent bytes, or the same data and more free space in case the - * buffer is enlarged). */ -void resizeReplicationBacklog(void) { - if (server.repl_backlog_size < CONFIG_REPL_BACKLOG_MIN_SIZE) - server.repl_backlog_size = CONFIG_REPL_BACKLOG_MIN_SIZE; - if (server.repl_backlog) - incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); -} - -void freeReplicationBacklog(void) { - serverAssert(listLength(server.slaves) == 0); - if (server.repl_backlog == NULL) return; - - /* Decrease the start buffer node reference count. */ - if (server.repl_backlog->ref_repl_buf_node) { - replBufBlock *o = listNodeValue( - server.repl_backlog->ref_repl_buf_node); - serverAssert(o->refcount == 1); /* Last reference. */ - o->refcount--; - } - - /* Replication buffer blocks are completely released when we free the - * backlog, since the backlog is released only when there are no replicas - * and the backlog keeps the last reference of all blocks. */ - freeReplicationBacklogRefMemAsync(server.repl_buffer_blocks, - server.repl_backlog->blocks_index); - resetReplicationBuffer(); - zfree(server.repl_backlog); - server.repl_backlog = NULL; -} - -/* To make search offset from replication buffer blocks quickly - * when replicas ask partial resynchronization, we create one index - * block every REPL_BACKLOG_INDEX_PER_BLOCKS blocks. */ -void createReplicationBacklogIndex(listNode *ln) { - server.repl_backlog->unindexed_count++; - if (server.repl_backlog->unindexed_count >= REPL_BACKLOG_INDEX_PER_BLOCKS) { - replBufBlock *o = listNodeValue(ln); - uint64_t encoded_offset = htonu64(o->repl_offset); - raxInsert(server.repl_backlog->blocks_index, - (unsigned char*)&encoded_offset, sizeof(uint64_t), - ln, NULL); - server.repl_backlog->unindexed_count = 0; - } -} - -/* Rebase replication buffer blocks' offset since the initial - * setting offset starts from 0 when master restart. */ -void rebaseReplicationBuffer(long long base_repl_offset) { - raxFree(server.repl_backlog->blocks_index); - server.repl_backlog->blocks_index = raxNew(); - server.repl_backlog->unindexed_count = 0; - - listIter li; - listNode *ln; - listRewind(server.repl_buffer_blocks, &li); - while ((ln = listNext(&li))) { - replBufBlock *o = listNodeValue(ln); - o->repl_offset += base_repl_offset; - createReplicationBacklogIndex(ln); - } -} - -void resetReplicationBuffer(void) { - server.repl_buffer_mem = 0; - server.repl_buffer_blocks = listCreate(); - listSetFreeMethod(server.repl_buffer_blocks, zfree); -} - -int canFeedReplicaReplBuffer(client *replica) { - /* Don't feed replicas that only want the RDB or main channels of migration - * destinations which need filtered stream for migrating slot ranges. */ - if (replica->flags & CLIENT_REPL_RDBONLY || - replica->flags & CLIENT_ASM_MIGRATING) return 0; - - /* Don't feed replicas that are still waiting for BGSAVE to start. */ - if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START || - replica->replstate == SLAVE_STATE_WAIT_RDB_CHANNEL) return 0; - - /* Don't feed replicas that are going to be closed ASAP. */ - if (replica->flags & CLIENT_CLOSE_ASAP) return 0; - - return 1; -} - -/* Create the replication backlog if needed. */ -void createReplicationBacklogIfNeeded(void) { - if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) { - /* When we create the backlog from scratch, we always use a new - * replication ID and clear the ID2, since there is no valid - * past history. */ - changeReplicationId(); - clearReplicationId2(); - createReplicationBacklog(); - serverLog(LL_NOTICE,"Replication backlog created, my new " - "replication IDs are '%s' and '%s'", - server.replid, server.replid2); - } -} -/* Similar with 'prepareClientToWrite', note that we must call this function - * before feeding replication stream into global replication buffer, since - * clientHasPendingReplies in prepareClientToWrite will access the global - * replication buffer to make judgements. */ -int prepareReplicasToWrite(void) { - listIter li; - listNode *ln; - int prepared = 0; - - listRewind(server.slaves,&li); - while((ln = listNext(&li))) { - client *slave = ln->value; - if (!canFeedReplicaReplBuffer(slave)) continue; - if (prepareClientToWrite(slave) == C_ERR) continue; - prepared++; - } - - return prepared; -} - -/* Wrapper for feedReplicationBuffer() that takes Redis string objects - * as input. */ -void feedReplicationBufferWithObject(robj *o) { - char llstr[LONG_STR_SIZE]; - void *p; - size_t len; - - if (o->encoding == OBJ_ENCODING_INT) { - len = ll2string(llstr,sizeof(llstr),(long)o->ptr); - p = llstr; - } else { - len = sdslen(o->ptr); - p = o->ptr; - } - feedReplicationBuffer(p,len); -} - -/* Generally, we only have one replication buffer block to trim when replication - * backlog size exceeds our setting and no replica reference it. But if replica - * clients disconnect, we need to free many replication buffer blocks that are - * referenced. It would cost much time if there are a lots blocks to free, that - * will freeze server, so we trim replication backlog incrementally. */ -void incrementalTrimReplicationBacklog(size_t max_blocks) { - serverAssert(server.repl_backlog != NULL); - - size_t trimmed_blocks = 0; - while (server.repl_backlog->histlen > server.repl_backlog_size && - trimmed_blocks < max_blocks) - { - /* We never trim backlog to less than one block. */ - if (listLength(server.repl_buffer_blocks) <= 1) break; - - /* Replicas increment the refcount of the first replication buffer block - * they refer to, in that case, we don't trim the backlog even if - * backlog_histlen exceeds backlog_size. This implicitly makes backlog - * bigger than our setting, but makes the master accept partial resync as - * much as possible. So that backlog must be the last reference of - * replication buffer blocks. */ - listNode *first = listFirst(server.repl_buffer_blocks); - serverAssert(first == server.repl_backlog->ref_repl_buf_node); - replBufBlock *fo = listNodeValue(first); - if (fo->refcount != 1) break; - - /* We don't try trim backlog if backlog valid size will be lessen than - * setting backlog size once we release the first repl buffer block. */ - if (server.repl_backlog->histlen - (long long)fo->size <= - server.repl_backlog_size) break; - - /* Decr refcount and release the first block later. */ - fo->refcount--; - trimmed_blocks++; - server.repl_backlog->histlen -= fo->size; - - /* Go to use next replication buffer block node. */ - listNode *next = listNextNode(first); - server.repl_backlog->ref_repl_buf_node = next; - serverAssert(server.repl_backlog->ref_repl_buf_node != NULL); - /* Incr reference count to keep the new head node. */ - ((replBufBlock *)listNodeValue(next))->refcount++; - - /* Remove the node in recorded blocks. */ - uint64_t encoded_offset = htonu64(fo->repl_offset); - raxRemove(server.repl_backlog->blocks_index, - (unsigned char*)&encoded_offset, sizeof(uint64_t), NULL); - - /* Delete the first node from global replication buffer. */ - serverAssert(fo->refcount == 0 && fo->used == fo->size); - server.repl_buffer_mem -= (fo->size + - sizeof(listNode) + sizeof(replBufBlock)); - listDelNode(server.repl_buffer_blocks, first); - } - - /* Set the offset of the first byte we have in the backlog. */ - server.repl_backlog->offset = server.master_repl_offset - - server.repl_backlog->histlen + 1; -} - -/* Free replication buffer blocks that are referenced by this client. */ -void freeReplicaReferencedReplBuffer(client *replica) { - serverAssert(replica->running_tid == IOTHREAD_MAIN_THREAD_ID); - - if (replica->ref_repl_buf_node != NULL) { - /* Decrease the start buffer node reference count. */ - replBufBlock *o = listNodeValue(replica->ref_repl_buf_node); - serverAssert(o->refcount > 0); - o->refcount--; - incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); - } - replica->ref_repl_buf_node = NULL; - replica->ref_block_pos = 0; -} - -/* Append bytes into the global replication buffer list, replication backlog and - * all replica clients use replication buffers collectively, this function replace - * 'addReply*', 'feedReplicationBacklog' for replicas and replication backlog, - * First we add buffer into global replication buffer block list, and then - * update replica / replication-backlog referenced node and block position. */ -void feedReplicationBuffer(char *s, size_t len) { - static long long repl_block_id = 0; - - if (server.repl_backlog == NULL) return; - - clusterSlotStatsIncrNetworkBytesOutForReplication(len); - - /* Update the current cmd's keys with the commands replication bytes*/ - hotkeyMetrics metrics = {0, len}; - hotkeyStatsUpdateCurrentCmd(server.hotkeys, metrics); - - while(len > 0) { - size_t start_pos = 0; /* The position of referenced block to start sending. */ - listNode *start_node = NULL; /* Replica/backlog starts referenced node. */ - int add_new_block = 0; /* Create new block if current block is total used. */ - listNode *ln = listLast(server.repl_buffer_blocks); - replBufBlock *tail = ln ? listNodeValue(ln) : NULL; - - /* Append to tail string when possible. */ - if (tail && tail->size > tail->used) { - start_node = listLast(server.repl_buffer_blocks); - start_pos = tail->used; - /* Copy the part we can fit into the tail, and leave the rest for a - * new node */ - size_t avail = tail->size - tail->used; - size_t copy = (avail >= len) ? len : avail; - memcpy(tail->buf + tail->used, s, copy); - tail->used += copy; - s += copy; - len -= copy; - server.master_repl_offset += copy; - server.repl_backlog->histlen += copy; - } - if (len) { - /* Create a new node, make sure it is allocated to at - * least PROTO_REPLY_CHUNK_BYTES */ - size_t usable_size; - /* Avoid creating nodes smaller than PROTO_REPLY_CHUNK_BYTES, so that we can append more data into them, - * and also avoid creating nodes bigger than repl_backlog_size / 16, so that we won't have huge nodes that can't - * trim when we only still need to hold a small portion from them. */ - size_t limit = max((size_t)server.repl_backlog_size / 16, (size_t)PROTO_REPLY_CHUNK_BYTES); - size_t size = min(max(len, (size_t)PROTO_REPLY_CHUNK_BYTES), limit); - tail = zmalloc_usable(size + sizeof(replBufBlock), &usable_size); - /* Take over the allocation's internal fragmentation */ - tail->size = usable_size - sizeof(replBufBlock); - size_t copy = (tail->size >= len) ? len : tail->size; - tail->used = copy; - tail->refcount = 0; - tail->repl_offset = server.master_repl_offset + 1; - tail->id = repl_block_id++; - memcpy(tail->buf, s, copy); - listAddNodeTail(server.repl_buffer_blocks, tail); - /* We also count the list node memory into replication buffer memory. */ - server.repl_buffer_mem += (usable_size + sizeof(listNode)); - add_new_block = 1; - if (start_node == NULL) { - start_node = listLast(server.repl_buffer_blocks); - start_pos = 0; - } - s += copy; - len -= copy; - server.master_repl_offset += copy; - server.repl_backlog->histlen += copy; - } - - /* For output buffer of replicas. */ - listIter li; - listRewind(server.slaves,&li); - while((ln = listNext(&li))) { - client *slave = ln->value; - if (!canFeedReplicaReplBuffer(slave)) continue; - - /* Update shared replication buffer start position. */ - if (slave->ref_repl_buf_node == NULL) { - slave->ref_repl_buf_node = start_node; - slave->ref_block_pos = start_pos; - /* Only increase the start block reference count. */ - ((replBufBlock *)listNodeValue(start_node))->refcount++; - } - - /* Check output buffer limit only when add new block. */ - if (add_new_block) closeClientOnOutputBufferLimitReached(slave, 1); - } - - /* For replication backlog */ - if (server.repl_backlog->ref_repl_buf_node == NULL) { - server.repl_backlog->ref_repl_buf_node = start_node; - /* Only increase the start block reference count. */ - ((replBufBlock *)listNodeValue(start_node))->refcount++; - - /* Replication buffer must be empty before adding replication stream - * into replication backlog. */ - serverAssert(add_new_block == 1 && start_pos == 0); - } - if (add_new_block) { - createReplicationBacklogIndex(listLast(server.repl_buffer_blocks)); - - /* It is important to trim after adding replication data to keep the backlog size close to - * repl_backlog_size in the common case. We wait until we add a new block to avoid repeated - * unnecessary trimming attempts when small amounts of data are added. See comments in - * freeMemoryGetNotCountedMemory() for details on replication backlog memory tracking. */ - incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); - } - } -} - -/* Propagate write commands to replication stream. - * - * This function is used if the instance is a master: we use the commands - * received by our clients in order to create the replication stream. - * Instead if the instance is a replica and has sub-replicas attached, we use - * replicationFeedStreamFromMasterStream() */ -void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { - int j, len; - char llstr[LONG_STR_SIZE]; - - /* In case we propagate a command that doesn't touch keys (PING, REPLCONF) we - * pass dbid=-1 that indicate there is no need to replicate `select` command. */ - serverAssert(dictid == -1 || (dictid >= 0 && dictid < server.dbnum)); - - /* If the instance is not a top level master, return ASAP: we'll just proxy - * the stream of data we receive from our master instead, in order to - * propagate *identical* replication stream. In this way this slave can - * advertise the same replication ID as the master (since it shares the - * master replication history and has the same backlog and offsets). */ - if (server.masterhost != NULL) return; - - /* If current client is marked as master, we will proxy the command stream - * to our slaves instead of replicating them, that also happens when being - * in atomic slot migration. */ - if (server.current_client && server.current_client->flags & CLIENT_MASTER) return; - - /* If there aren't slaves, and there is no backlog buffer to populate, - * we can return ASAP. */ - if (server.repl_backlog == NULL && listLength(slaves) == 0) { - /* We increment the repl_offset anyway, since we use that for tracking AOF fsyncs - * even when there's no replication active. This code will not be reached if AOF - * is also disabled. */ - server.master_repl_offset += 1; - return; - } - - /* We can't have slaves attached and no backlog. */ - serverAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL)); - - /* Update the time of sending replication stream to replicas. */ - server.repl_stream_lastio = server.unixtime; - - /* Must install write handler for all replicas first before feeding - * replication stream. */ - prepareReplicasToWrite(); - - /* Send SELECT command to every slave if needed. */ - if (dictid != -1 && server.slaveseldb != dictid) { - robj *selectcmd; - - /* For a few DBs we have pre-computed SELECT command. */ - if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) { - selectcmd = shared.select[dictid]; - } else { - int dictid_len; - - dictid_len = ll2string(llstr,sizeof(llstr),dictid); - selectcmd = createObject(OBJ_STRING, - sdscatprintf(sdsempty(), - "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", - dictid_len, llstr)); - } - - feedReplicationBufferWithObject(selectcmd); - - /* Although the SELECT command is not associated with any slot, - * its per-slot network-bytes-out accumulation is made by the above function call. - * To cancel-out this accumulation, below adjustment is made. */ - clusterSlotStatsDecrNetworkBytesOutForReplication(sdslen(selectcmd->ptr)); - - if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) - decrRefCount(selectcmd); - - server.slaveseldb = dictid; - } - - /* Write the command to the replication buffer if any. */ - char aux[LONG_STR_SIZE+3]; - - /* Add the multi bulk reply length. */ - aux[0] = '*'; - len = ll2string(aux+1,sizeof(aux)-1,argc); - aux[len+1] = '\r'; - aux[len+2] = '\n'; - feedReplicationBuffer(aux,len+3); - - for (j = 0; j < argc; j++) { - long objlen = stringObjectLen(argv[j]); - - /* We need to feed the buffer with the object as a bulk reply - * not just as a plain string, so create the $..CRLF payload len - * and add the final CRLF */ - aux[0] = '$'; - len = ll2string(aux+1,sizeof(aux)-1,objlen); - aux[len+1] = '\r'; - aux[len+2] = '\n'; - feedReplicationBuffer(aux,len+3); - feedReplicationBufferWithObject(argv[j]); - feedReplicationBuffer(aux+len+1,2); - } -} - -/* This is a debugging function that gets called when we detect something - * wrong with the replication protocol: the goal is to peek into the - * replication backlog and show a few final bytes to make simpler to - * guess what kind of bug it could be. */ -void showLatestBacklog(void) { - if (server.repl_backlog == NULL) return; - if (listLength(server.repl_buffer_blocks) == 0) return; - if (server.hide_user_data_from_log) { - serverLog(LL_NOTICE,"hide-user-data-from-log is on, skip logging backlog content to avoid spilling PII."); - return; - } - - size_t dumplen = 256; - if (server.repl_backlog->histlen < (long long)dumplen) - dumplen = server.repl_backlog->histlen; - - sds dump = sdsempty(); - listNode *node = listLast(server.repl_buffer_blocks); - while(dumplen) { - if (node == NULL) break; - replBufBlock *o = listNodeValue(node); - size_t thislen = o->used >= dumplen ? dumplen : o->used; - sds head = sdscatrepr(sdsempty(), o->buf+o->used-thislen, thislen); - sds tmp = sdscatsds(head, dump); - sdsfree(dump); - dump = tmp; - dumplen -= thislen; - node = listPrevNode(node); - } - - /* Finally log such bytes: this is vital debugging info to - * understand what happened. */ - serverLog(LL_NOTICE,"Latest backlog is: '%s'", dump); - sdsfree(dump); -} - -/* This function is used in order to proxy what we receive from our master - * to our sub-slaves. Besides, we also proxy the replication stream from - * the source node when being in atomic slot migration. */ -void replicationFeedStreamFromMasterStream(char *buf, size_t buflen) { - /* There must be replication backlog if having attached slaves. */ - if (listLength(server.slaves)) serverAssert(server.repl_backlog != NULL); - if (server.repl_backlog) { - /* Must install write handler for all replicas first before feeding - * replication stream. */ - prepareReplicasToWrite(); - feedReplicationBuffer(buf,buflen); - } else if (server.masterhost == NULL && server.aof_enabled) { - /* We increment the repl_offset anyway, since we use that for tracking - * AOF fsyncs even when there's no replication active. This code will - * not be reached if AOF is also disabled. - * - * As we skip feeding the replication buffer in atomic slot migration, - * so here we need to update the replication offset manually. */ - server.master_repl_offset += 1; - } -} - -void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) { - /* Fast path to return if the monitors list is empty or the server is in loading. */ - if (monitors == NULL || listLength(monitors) == 0 || server.loading) return; - listNode *ln; - listIter li; - int j; - sds cmdrepr = sdsnew("+"); - robj *cmdobj; - struct timeval tv; - - gettimeofday(&tv,NULL); - cmdrepr = sdscatprintf(cmdrepr,"%ld.%06ld ",(long)tv.tv_sec,(long)tv.tv_usec); - if (c->flags & CLIENT_SCRIPT) { - cmdrepr = sdscatprintf(cmdrepr,"[%d lua] ",dictid); - } else if (c->flags & CLIENT_UNIX_SOCKET) { - cmdrepr = sdscatprintf(cmdrepr,"[%d unix:%s] ",dictid,server.unixsocket); - } else { - cmdrepr = sdscatprintf(cmdrepr,"[%d %s] ",dictid,getClientPeerId(c)); - } - - for (j = 0; j < argc; j++) { - if (argv[j]->encoding == OBJ_ENCODING_INT) { - cmdrepr = sdscatprintf(cmdrepr, "\"%ld\"", (long)argv[j]->ptr); - } else { - cmdrepr = sdscatrepr(cmdrepr,(char*)argv[j]->ptr, - sdslen(argv[j]->ptr)); - } - if (j != argc-1) - cmdrepr = sdscatlen(cmdrepr," ",1); - } - cmdrepr = sdscatlen(cmdrepr,"\r\n",2); - cmdobj = createObject(OBJ_STRING,cmdrepr); - - listRewind(monitors,&li); - while((ln = listNext(&li))) { - client *monitor = ln->value; - /* Do not show internal commands to non-internal clients. */ - if (c->realcmd && (c->realcmd->flags & CMD_INTERNAL) && !(monitor->flags & CLIENT_INTERNAL)) { - continue; - } - addReply(monitor,cmdobj); - updateClientMemUsageAndBucket(monitor); - } - decrRefCount(cmdobj); -} - -/* Feed the slave 'c' with the replication backlog starting from the - * specified 'offset' up to the end of the backlog. */ -long long addReplyReplicationBacklog(client *c, long long offset) { - serverAssert(c->running_tid == IOTHREAD_MAIN_THREAD_ID); - - long long skip; - - serverLog(LL_DEBUG, "[PSYNC] Replica request offset: %lld", offset); - - if (server.repl_backlog->histlen == 0) { - serverLog(LL_DEBUG, "[PSYNC] Backlog history len is zero"); - return 0; - } - - serverLog(LL_DEBUG, "[PSYNC] Backlog size: %lld", - server.repl_backlog_size); - serverLog(LL_DEBUG, "[PSYNC] First byte: %lld", - server.repl_backlog->offset); - serverLog(LL_DEBUG, "[PSYNC] History len: %lld", - server.repl_backlog->histlen); - - /* Compute the amount of bytes we need to discard. */ - skip = offset - server.repl_backlog->offset; - serverLog(LL_DEBUG, "[PSYNC] Skipping: %lld", skip); - - /* Iterate recorded blocks, quickly search the approximate node. */ - listNode *node = NULL; - if (raxSize(server.repl_backlog->blocks_index) > 0) { - uint64_t encoded_offset = htonu64(offset); - raxIterator ri; - raxStart(&ri, server.repl_backlog->blocks_index); - raxSeek(&ri, ">", (unsigned char*)&encoded_offset, sizeof(uint64_t)); - if (raxEOF(&ri)) { - /* No found, so search from the last recorded node. */ - raxSeek(&ri, "$", NULL, 0); - raxPrev(&ri); - node = (listNode *)ri.data; - } else { - raxPrev(&ri); /* Skip the sought node. */ - /* We should search from the prev node since the offset of current - * sought node exceeds searching offset. */ - if (raxPrev(&ri)) - node = (listNode *)ri.data; - else - node = server.repl_backlog->ref_repl_buf_node; - } - raxStop(&ri); - } else { - /* No recorded blocks, just from the start node to search. */ - node = server.repl_backlog->ref_repl_buf_node; - } - - /* Search the exact node. */ - while (node != NULL) { - replBufBlock *o = listNodeValue(node); - if (o->repl_offset + (long long)o->used >= offset) break; - node = listNextNode(node); - } - serverAssert(node != NULL); - - /* Install a writer handler first.*/ - prepareClientToWrite(c); - /* Setting output buffer of the replica. */ - replBufBlock *o = listNodeValue(node); - o->refcount++; - c->ref_repl_buf_node = node; - c->ref_block_pos = offset - o->repl_offset; - - return server.repl_backlog->histlen - skip; -} - -/* Return the offset to provide as reply to the PSYNC command received - * from the slave. The returned value is only valid immediately after - * the BGSAVE process started and before executing any other command - * from clients. */ -long long getPsyncInitialOffset(void) { - return server.master_repl_offset; -} - -/* Send a FULLRESYNC reply in the specific case of a full resynchronization, - * as a side effect setup the slave for a full sync in different ways: - * - * 1) Remember, into the slave client structure, the replication offset - * we sent here, so that if new slaves will later attach to the same - * background RDB saving process (by duplicating this client output - * buffer), we can get the right offset from this slave. - * 2) Set the replication state of the slave to WAIT_BGSAVE_END so that - * we start accumulating differences from this point. - * 3) Force the replication stream to re-emit a SELECT statement so - * the new slave incremental differences will start selecting the - * right database number. - * - * Normally this function should be called immediately after a successful - * BGSAVE for replication was started, or when there is one already in - * progress that we attached our slave to. */ -int replicationSetupSlaveForFullResync(client *slave, long long offset) { - char buf[128]; - int buflen; - - slave->psync_initial_offset = offset; - slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END; - /* We are going to accumulate the incremental changes for this - * slave as well. Set slaveseldb to -1 in order to force to re-emit - * a SELECT statement in the replication stream. */ - server.slaveseldb = -1; - - /* Slots snapshot. */ - if (slave->flags & CLIENT_REPL_RDB_CHANNEL && - slave->slave_req & SLAVE_REQ_SLOTS_SNAPSHOT) - { - /* Start to deliver the commands stream on migrating slots. */ - asmSlotSnapshotAndStreamStart(slave->task); - - buflen = snprintf(buf, sizeof(buf), "+SLOTSSNAPSHOT\r\n"); - if (connWrite(slave->conn, buf, buflen) != buflen) { - freeClientAsync(slave); - return C_ERR; - } - return C_OK; - } - - /* Don't send this reply to slaves that approached us with - * the old SYNC command. */ - if (!(slave->flags & CLIENT_PRE_PSYNC)) { - if (slave->flags & CLIENT_REPL_RDB_CHANNEL) { - /* This slave is rdbchannel. Find its associated main channel and - * change its state so we can deliver replication stream from now - * on, in parallel to rdb. */ - uint64_t id = slave->main_ch_client_id; - client *c = lookupClientByID(id); - if (c && c->replstate == SLAVE_STATE_WAIT_RDB_CHANNEL) { - c->replstate = SLAVE_STATE_SEND_BULK_AND_STREAM; - serverLog(LL_NOTICE, "Starting to deliver RDB and replication stream to replica: %s", - replicationGetSlaveName(c)); - } else { - serverLog(LL_WARNING, "Starting to deliver RDB to replica %s" - " but it has no associated main channel", - replicationGetSlaveName(slave)); - } - } - buflen = snprintf(buf,sizeof(buf),"+FULLRESYNC %s %lld\r\n", - server.replid,offset); - if (connWrite(slave->conn,buf,buflen) != buflen) { - freeClientAsync(slave); - return C_ERR; - } - } - return C_OK; -} - -/* This function handles the PSYNC command from the point of view of a - * master receiving a request for partial resynchronization. - * - * On success return C_OK, otherwise C_ERR is returned and we proceed - * with the usual full resync. */ -int masterTryPartialResynchronization(client *c, long long psync_offset) { - long long psync_len; - char *master_replid = c->argv[1]->ptr; - char buf[128]; - int buflen; - - /* Is the replication ID of this master the same advertised by the wannabe - * slave via PSYNC? If the replication ID changed this master has a - * different replication history, and there is no way to continue. - * - * Note that there are two potentially valid replication IDs: the ID1 - * and the ID2. The ID2 however is only valid up to a specific offset. */ - if (strcasecmp(master_replid, server.replid) && - (strcasecmp(master_replid, server.replid2) || - psync_offset > server.second_replid_offset)) - { - /* Replid "?" is used by slaves that want to force a full resync. */ - if (master_replid[0] != '?') { - if (strcasecmp(master_replid, server.replid) && - strcasecmp(master_replid, server.replid2)) - { - serverLog(LL_NOTICE,"Partial resynchronization not accepted: " - "Replication ID mismatch (Replica asked for '%s', my " - "replication IDs are '%s' and '%s')", - master_replid, server.replid, server.replid2); - } else { - serverLog(LL_NOTICE,"Partial resynchronization not accepted: " - "Requested offset for second ID was %lld, but I can reply " - "up to %lld", psync_offset, server.second_replid_offset); - } - } else { - serverLog(LL_NOTICE,"Full resync requested by replica %s %s", - replicationGetSlaveName(c), - c->flags & CLIENT_REPL_RDB_CHANNEL ? "(rdb-channel)" : ""); - } - goto need_full_resync; - } - - /* We still have the data our slave is asking for? */ - if (!server.repl_backlog || - psync_offset < server.repl_backlog->offset || - psync_offset > (server.repl_backlog->offset + server.repl_backlog->histlen)) - { - serverLog(LL_NOTICE, - "Unable to partial resync with replica %s for lack of backlog (Replica request was: %lld).", replicationGetSlaveName(c), psync_offset); - if (psync_offset > server.master_repl_offset) { - serverLog(LL_WARNING, - "Warning: replica %s tried to PSYNC with an offset that is greater than the master replication offset.", replicationGetSlaveName(c)); - } - goto need_full_resync; - } - - /* If we reached this point, we are able to perform a partial resync: - * 1) Set client state to make it a slave. - * 2) Inform the client we can continue with +CONTINUE - * 3) Send the backlog data (from the offset to the end) to the slave. */ - c->flags |= CLIENT_SLAVE; - c->replstate = SLAVE_STATE_ONLINE; - c->repl_ack_time = server.unixtime; - c->repl_start_cmd_stream_on_ack = 0; - listAddNodeTail(server.slaves,c); - /* We can't use the connection buffers since they are used to accumulate - * new commands at this stage. But we are sure the socket send buffer is - * empty so this write will never fail actually. */ - if (c->slave_capa & SLAVE_CAPA_PSYNC2) { - buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid); - } else { - buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n"); - } - if (connWrite(c->conn,buf,buflen) != buflen) { - freeClientAsync(c); - return C_OK; - } - psync_len = addReplyReplicationBacklog(c,psync_offset); - serverLog(LL_NOTICE, - "Partial resynchronization request from %s accepted. Sending %lld bytes of backlog starting from offset %lld.", - replicationGetSlaveName(c), - psync_len, psync_offset); - /* Note that we don't need to set the selected DB at server.slaveseldb - * to -1 to force the master to emit SELECT, since the slave already - * has this state from the previous connection with the master. */ - - refreshGoodSlavesCount(); - - /* Fire the replica change modules event. */ - moduleFireServerEvent(REDISMODULE_EVENT_REPLICA_CHANGE, - REDISMODULE_SUBEVENT_REPLICA_CHANGE_ONLINE, - NULL); - - return C_OK; /* The caller can return, no full resync needed. */ - -need_full_resync: - /* We need a full resync for some reason... Note that we can't - * reply to PSYNC right now if a full SYNC is needed. The reply - * must include the master offset at the time the RDB file we transfer - * is generated, so we need to delay the reply to that moment. */ - return C_ERR; -} - -/* Start a BGSAVE for replication goals, which is, selecting the disk or - * socket target depending on the configuration, and making sure that - * the script cache is flushed before to start. - * - * The mincapa argument is the bitwise AND among all the slaves capabilities - * of the slaves waiting for this BGSAVE, so represents the slave capabilities - * all the slaves support. Can be tested via SLAVE_CAPA_* macros. - * - * Side effects, other than starting a BGSAVE: - * - * 1) Handle the slaves in WAIT_START state, by preparing them for a full - * sync if the BGSAVE was successfully started, or sending them an error - * and dropping them from the list of slaves. - * - * 2) Flush the Lua scripting script cache if the BGSAVE was actually - * started. - * - * Returns C_OK on success or C_ERR otherwise. */ -int startBgsaveForReplication(int mincapa, int req) { - int retval; - int socket_target = 0; - listIter li; - listNode *ln; - - /* We use a socket target if slave can handle the EOF marker and we're configured to do diskless syncs. - * Note that in case we're creating a "filtered" RDB (functions-only, for example) we also force socket replication - * to avoid overwriting the snapshot RDB file with filtered data. */ - socket_target = (server.repl_diskless_sync || req & SLAVE_REQ_RDB_MASK) && (mincapa & SLAVE_CAPA_EOF); - /* `SYNC` should have failed with error if we don't support socket and require a filter, assert this here */ - serverAssert(socket_target || !(req & SLAVE_REQ_RDB_MASK)); - - int slots_req = req & SLAVE_REQ_SLOTS_SNAPSHOT; - serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s%s", - socket_target ? (slots_req ? "slot migration destination socket" : "replicas sockets") : "disk", - (req & SLAVE_REQ_RDB_CHANNEL) ? " (rdb-channel)" : ""); - - rdbSaveInfo rsi, *rsiptr; - rsiptr = rdbPopulateSaveInfo(&rsi); - /* Only do rdbSave* when rsiptr is not NULL, - * otherwise slave will miss repl-stream-db. */ - if (rsiptr) { - if (socket_target) - retval = rdbSaveToSlavesSockets(req,rsiptr); - else { - /* Keep the page cache since it'll get used soon */ - retval = rdbSaveBackground(req, server.rdb_filename, rsiptr, RDBFLAGS_REPLICATION | RDBFLAGS_KEEP_CACHE); - } - if (server.repl_debug_pause & REPL_DEBUG_AFTER_FORK) - debugPauseProcess(); - } else { - serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. Try later."); - retval = C_ERR; - } - - /* If we succeeded to start a BGSAVE with disk target, let's remember - * this fact, so that we can later delete the file if needed. Note - * that we don't set the flag to 1 if the feature is disabled, otherwise - * it would never be cleared: the file is not deleted. This way if - * the user enables it later with CONFIG SET, we are fine. */ - if (retval == C_OK && !socket_target && server.rdb_del_sync_files) - RDBGeneratedByReplication = 1; - - /* If we failed to BGSAVE, remove the slaves waiting for a full - * resynchronization from the list of slaves, inform them with - * an error about what happened, close the connection ASAP. */ - if (retval == C_ERR) { - serverLog(LL_WARNING,"BGSAVE for replication failed"); - listRewind(server.slaves,&li); - while((ln = listNext(&li))) { - client *slave = ln->value; - - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { - slave->replstate = REPL_STATE_NONE; - slave->flags &= ~CLIENT_SLAVE; - listDelNode(server.slaves,ln); - addReplyError(slave, - "BGSAVE failed, replication can't continue"); - slave->flags |= CLIENT_CLOSE_AFTER_REPLY; - } - } - return retval; - } - - /* If the target is socket, rdbSaveToSlavesSockets() already setup - * the slaves for a full resync. Otherwise for disk target do it now.*/ - if (!socket_target) { - listRewind(server.slaves,&li); - while((ln = listNext(&li))) { - client *slave = ln->value; - - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { - /* Check slave has the exact requirements */ - if (slave->slave_req != req) - continue; - replicationSetupSlaveForFullResync(slave, getPsyncInitialOffset()); - } - } - } - - return retval; -} - -/* SYNC and PSYNC command implementation. */ -void syncCommand(client *c) { - /* ignore SYNC if already slave or in monitor mode */ - if (c->flags & CLIENT_SLAVE) return; - - /* Check if this is a failover request to a replica with the same replid and - * become a master if so. */ - if (c->argc > 3 && !strcasecmp(c->argv[0]->ptr,"psync") && - !strcasecmp(c->argv[3]->ptr,"failover")) - { - serverLog(LL_NOTICE, "Failover request received for replid %s.", - (unsigned char *)c->argv[1]->ptr); - if (!server.masterhost) { - addReplyError(c, "PSYNC FAILOVER can't be sent to a master."); - return; - } - - if (!strcasecmp(c->argv[1]->ptr,server.replid)) { - if (server.cluster_enabled) { - clusterPromoteSelfToMaster(); - } else { - replicationUnsetMaster(); - } - sds client = catClientInfoString(sdsempty(),c); - serverLog(LL_NOTICE, - "MASTER MODE enabled (failover request from '%s')",client); - sdsfree(client); - } else { - addReplyError(c, "PSYNC FAILOVER replid must match my replid."); - return; - } - } - - /* Don't let replicas sync with us while we're failing over */ - if (server.failover_state != NO_FAILOVER) { - addReplyError(c,"-NOMASTERLINK Can't SYNC while failing over"); - return; - } - - /* Refuse SYNC requests if we are a slave but the link with our master - * is not ok... */ - if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) { - addReplyError(c,"-NOMASTERLINK Can't SYNC while not connected with my master"); - return; - } - - /* SYNC can't be issued when the server has pending data to send to - * the client about already issued commands. We need a fresh reply - * buffer registering the differences between the BGSAVE and the current - * dataset, so that we can copy to other slaves if needed. */ - if (clientHasPendingReplies(c)) { - addReplyError(c,"SYNC and PSYNC are invalid with pending output"); - return; - } - - /* Fail sync if slave doesn't support EOF capability but wants a filtered RDB. This is because we force filtered - * RDB's to be generated over a socket and not through a file to avoid conflicts with the snapshot files. Forcing - * use of a socket is handled, if needed, in `startBgsaveForReplication`. */ - if (c->slave_req & SLAVE_REQ_RDB_MASK && !(c->slave_capa & SLAVE_CAPA_EOF)) { - addReplyError(c,"Filtered replica requires EOF capability"); - return; - } - - serverLog(LL_NOTICE,"Replica %s asks for synchronization", - replicationGetSlaveName(c)); - - /* Try a partial resynchronization if this is a PSYNC command. - * If it fails, we continue with usual full resynchronization, however - * when this happens replicationSetupSlaveForFullResync will replied - * with: - * - * +FULLRESYNC - * - * So the slave knows the new replid and offset to try a PSYNC later - * if the connection with the master is lost. */ - if (!strcasecmp(c->argv[0]->ptr,"psync")) { - long long psync_offset; - if (getLongLongFromObjectOrReply(c, c->argv[2], &psync_offset, NULL) != C_OK) { - serverLog(LL_WARNING, "Replica %s asks for synchronization but with a wrong offset", - replicationGetSlaveName(c)); - return; - } - - if (masterTryPartialResynchronization(c, psync_offset) == C_OK) { - server.stat_sync_partial_ok++; - return; /* No full resync needed, return. */ - } else { - char *master_replid = c->argv[1]->ptr; - - /* Increment stats for failed PSYNCs, but only if the - * replid is not "?", as this is used by slaves to force a full - * resync on purpose when they are not able to partially - * resync. */ - if (master_replid[0] != '?') server.stat_sync_partial_err++; - if (c->slave_capa & SLAVE_CAPA_RDB_CHANNEL_REPL) { - int len; - char buf[128]; - /* Replica is capable of rdbchannel replication. This is - * replica's main channel. Let replica know full sync is needed. - * Replica will open another connection (rdbchannel). Once rdb - * delivery starts, we'll stream repl data to the main channel.*/ - c->flags |= CLIENT_SLAVE; - c->replstate = SLAVE_STATE_WAIT_RDB_CHANNEL; - c->repl_ack_time = server.unixtime; - listAddNodeTail(server.slaves, c); - createReplicationBacklogIfNeeded(); - - serverLog(LL_NOTICE, - "Replica %s is capable of rdb channel synchronization, and partial sync isn't possible. " - "Full sync will continue with dedicated rdb channel.", - replicationGetSlaveName(c)); - - /* Send +RDBCHANNELSYNC with client id so we can associate replica connections on master.*/ - len = snprintf(buf, sizeof(buf), "+RDBCHANNELSYNC %llu\r\n", - (unsigned long long) c->id); - if (connWrite(c->conn, buf, strlen(buf)) != len) - freeClientAsync(c); - - return; - } - } - } else { - /* If a slave uses SYNC, we are dealing with an old implementation - * of the replication protocol (like redis-cli --slave). Flag the client - * so that we don't expect to receive REPLCONF ACK feedbacks. */ - c->flags |= CLIENT_PRE_PSYNC; - } - - /* Full resynchronization. */ - server.stat_sync_full++; - - /* Setup the slave as one waiting for BGSAVE to start. The following code - * paths will change the state if we handle the slave differently. */ - c->replstate = SLAVE_STATE_WAIT_BGSAVE_START; - if (server.repl_disable_tcp_nodelay) - connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */ - c->repldbfd = -1; - c->flags |= CLIENT_SLAVE; - listAddNodeTail(server.slaves,c); - - /* Create the replication backlog if needed. */ - createReplicationBacklogIfNeeded(); - - /* Keep the client in the main thread to avoid data races between the - * connWrite call in startBgsaveForReplication and the client's event - * handler in IO threads. */ - if (c->tid != IOTHREAD_MAIN_THREAD_ID) keepClientInMainThread(c); - - /* CASE 1: BGSAVE is in progress, with disk target. */ - if (server.child_type == CHILD_TYPE_RDB && - server.rdb_child_type == RDB_CHILD_TYPE_DISK) - { - /* Ok a background save is in progress. Let's check if it is a good - * one for replication, i.e. if there is another slave that is - * registering differences since the server forked to save. */ - client *slave; - listNode *ln; - listIter li; - - listRewind(server.slaves,&li); - while((ln = listNext(&li))) { - slave = ln->value; - /* If the client needs a buffer of commands, we can't use - * a replica without replication buffer. */ - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && - (!(slave->flags & CLIENT_REPL_RDBONLY) || - (c->flags & CLIENT_REPL_RDBONLY))) - break; - } - /* To attach this slave, we check that it has at least all the - * capabilities of the slave that triggered the current BGSAVE - * and its exact requirements. */ - if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa) && - c->slave_req == slave->slave_req) { - /* Perfect, the server is already registering differences for - * another slave. Set the right state, and copy the buffer. - * We don't copy buffer if clients don't want. */ - if (!(c->flags & CLIENT_REPL_RDBONLY)) - copyReplicaOutputBuffer(c,slave); - replicationSetupSlaveForFullResync(c,slave->psync_initial_offset); - serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC"); - } else { - /* No way, we need to wait for the next BGSAVE in order to - * register differences. */ - serverLog(LL_NOTICE,"Can't attach the replica to the current BGSAVE. Waiting for next BGSAVE for SYNC"); - } - - /* CASE 2: BGSAVE is in progress, with socket target. */ - } else if (server.child_type == CHILD_TYPE_RDB && - server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) - { - /* There is an RDB child process but it is writing directly to - * children sockets. We need to wait for the next BGSAVE - * in order to synchronize. */ - serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC"); - - /* CASE 3: There is no BGSAVE is in progress. */ - } else { - if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF) && - server.repl_diskless_sync_delay) - { - /* Diskless replication RDB child is created inside - * replicationCron() since we want to delay its start a - * few seconds to wait for more slaves to arrive. */ - serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC"); - } else { - /* We don't have a BGSAVE in progress, let's start one. Diskless - * or disk-based mode is determined by replica's capacity. */ - if (!hasActiveChildProcess()) { - startBgsaveForReplication(c->slave_capa, c->slave_req); - } else { - serverLog(LL_NOTICE, - "No BGSAVE in progress, but another BG operation is active. " - "BGSAVE for replication delayed"); - } - } - } - return; -} - -/* REPLCONF