/* cluster_asm.c -- Atomic slot migration implementation for cluster
 *
 * Copyright (c) 2025-Present, Redis Ltd.
 * All rights reserved.
 *
 * Licensed under your choice of (a) the Redis Source Available License 2.0
 * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
 * GNU Affero General Public License v3 (AGPLv3).
 */

#include "server.h"
#include "cluster.h"
#include "functions.h"
#include "cluster_asm.h"
#include "cluster_slot_stats.h"

#define ASM_IMPORT (1 << 1)
#define ASM_MIGRATE (1 << 2)

#define ASM_DEBUG_TRIM_DEFAULT 0
#define ASM_DEBUG_TRIM_NONE 1
#define ASM_DEBUG_TRIM_BG 2
#define ASM_DEBUG_TRIM_ACTIVE 3

#define ASM_AOF_MIN_ITEMS_PER_KEY 512 /* Minimum number of items per key to use AOF format encoding */

/* A single slot migration task, either importing (this node is the
 * destination) or migrating (this node is the source). */
typedef struct asmTask {
    sds id;                                 /* Task ID */
    int operation;                          /* Either ASM_IMPORT or ASM_MIGRATE */
    slotRangeArray *slots;                  /* List of slot ranges for this migration task */
    int state;                              /* Current state of the task */
    int dest_state;                         /* Destination node's main state (approximate) */
    char source[CLUSTER_NAMELEN];           /* Source node name */
    char dest[CLUSTER_NAMELEN];             /* Destination node name */
    clusterNode *source_node;               /* Source node */
    connection *main_channel_conn;          /* Main channel connection */
    connection *rdb_channel_conn;           /* RDB channel connection */
    int rdb_channel_state;                  /* State of the RDB channel */
    unsigned long long dest_offset;         /* Destination offset */
    unsigned long long source_offset;       /* Source offset */
    int cross_slot_during_propagating;      /* If cross-slot commands are encountered during propagating */
    int stream_eof_during_streaming;        /* If STREAM-EOF is received during streaming buffer */
    replDataBuf sync_buffer;                /* Buffer for the stream */
    client *main_channel_client;            /* Client for the main channel on the source side */
    client *rdb_channel_client;             /* Client for the RDB channel on the source side */
    long long retry_count;                  /* Number of retries for this task */
    mstime_t create_time;                   /* Task creation time */
    mstime_t start_time;                    /* Task start time */
    mstime_t end_time;                      /* Task end time */
    mstime_t paused_time;                   /* The time when the slot writes were paused */
    mstime_t dest_slots_snapshot_time;      /* The time when the destination starts applying the slot snapshot */
    mstime_t dest_accum_applied_time;       /* The time when the destination finishes applying the accumulated buffer */
    sds error;                              /* Error message for this task */
    redisOpArray *pre_snapshot_module_cmds; /* Module commands to be propagated at the beginning of slot migration */
} asmTask;

/* Process-wide bookkeeping for slot migration tasks and slot trimming. */
struct asmManager {
    list *tasks;                        /* List of asmTask to be processed */
    list *archived_tasks;               /* List of archived asmTask */
    list *pending_trim_jobs;            /* List of pending trim jobs (due to write pause) */
    list *active_trim_jobs;             /* List of active trim jobs */
    slotRangeArrayIter *active_trim_it; /* Iterator of the current active trim job */
    size_t sync_buffer_peak;            /* Peak size of sync buffer */
    asmTask *master_task;               /* The task that is currently active on the master */

    /* Fail point injection for debugging */
    int debug_fail_channel;      /* Channel where the task will fail */
    int debug_fail_state;        /* State where the task will fail */
    int debug_trim_method;       /* Method to trim the buffer */
    int debug_active_trim_delay; /* Sleep before trimming each key */

    /* Active trim stats */
    unsigned long long active_trim_started;             /* Number of times active trim was started */
    unsigned long long active_trim_completed;           /* Number of times active trim was completed */
    unsigned long long active_trim_cancelled;           /* Number of times active trim was cancelled */
    unsigned long long active_trim_current_job_keys;    /* Total number of keys to trim in the current job */
    unsigned long long active_trim_current_job_trimmed; /* Number of keys trimmed in the current job */
};

/* Task / channel states. The numeric order matters: several loops iterate
 * from ASM_NONE to ASM_RDBCHANNEL_TRANSFER to map names back to values. */
enum asmState {
    /* Common state */
    ASM_NONE = 0,
    ASM_CONNECTING,
    ASM_AUTH_REPLY,
    ASM_CANCELED,
    ASM_FAILED,
    ASM_COMPLETED,

    /* Import state */
    ASM_SEND_HANDSHAKE,
    ASM_HANDSHAKE_REPLY,
    ASM_SEND_SYNCSLOTS,
    ASM_SYNCSLOTS_REPLY,
    ASM_INIT_RDBCHANNEL,
    ASM_ACCUMULATE_BUF,
    ASM_READY_TO_STREAM,
    ASM_STREAMING_BUF,
    ASM_WAIT_STREAM_EOF,
    ASM_TAKEOVER,

    /* Migrate state */
    ASM_WAIT_RDBCHANNEL,
    ASM_WAIT_BGSAVE_START,
    ASM_SEND_BULK_AND_STREAM,
    ASM_SEND_STREAM,
    ASM_HANDOFF_PREP,
    ASM_HANDOFF,
    ASM_STREAM_EOF,

    /* RDB channel state */
    ASM_RDBCHANNEL_REQUEST,
    ASM_RDBCHANNEL_REPLY,
    ASM_RDBCHANNEL_TRANSFER,
};

enum asmChannel {
    ASM_IMPORT_MAIN_CHANNEL = 1, /* Main channel for the import task */
    ASM_IMPORT_RDB_CHANNEL,      /* RDB channel for the import task */
    ASM_MIGRATE_MAIN_CHANNEL,    /* Main channel for the migrate task */
    ASM_MIGRATE_RDB_CHANNEL      /* RDB channel for the migrate task */
};

/* Global ASM manager */
struct asmManager *asmManager = NULL;

/* replication.c */
char *sendCommand(connection *conn, ...);
char *sendCommandArgv(connection *conn, int argc, char **argv, size_t *argv_lens);
char *receiveSynchronousResponse(connection *conn);
ConnectionType *connTypeOfReplication(void);
int startBgsaveForReplication(int mincapa, int req);
void createReplicationBacklogIfNeeded(void);

/* cluster.c */
void createDumpPayload(rio *payload, robj *o, robj *key, int dbid, int skip_checksum);

/* cluster_asm.c */
static void asmStartImportTask(asmTask *task);
static void asmTaskCancel(asmTask *task, const char *reason);
static void asmSyncBufferReadFromConn(connection *conn);
static void propagateTrimSlots(slotRangeArray *slots);
void asmTrimJobSchedule(slotRangeArray *slots);
void asmTrimJobProcessPending(void);
void asmCancelPendingTrimJobs(void);
void asmTriggerActiveTrim(slotRangeArray *slots);
void asmActiveTrimEnd(void);
int asmIsAnyTrimJobOverlaps(slotRangeArray *slots);
void asmTrimSlotsIfNotOwned(slotRangeArray *slots);
void asmNotifyStateChange(asmTask *task, int event);

/* Allocate and initialize the global ASM manager. */
void asmInit(void) {
    asmManager = zcalloc(sizeof(*asmManager));
    asmManager->tasks = listCreate();
    asmManager->archived_tasks = listCreate();
    asmManager->pending_trim_jobs = listCreate();
    asmManager->sync_buffer_peak = 0;
    asmManager->master_task = NULL;
    asmManager->debug_fail_channel = -1;
    asmManager->debug_fail_state = -1;
    asmManager->debug_trim_method = ASM_DEBUG_TRIM_DEFAULT;
    asmManager->debug_active_trim_delay = 0;
    asmManager->active_trim_jobs = listCreate();
    /* zcalloc already zeroed these; spelled out so every field's initial
     * value is visible in one place. */
    asmManager->active_trim_it = NULL;
    asmManager->active_trim_started = 0;
    asmManager->active_trim_completed = 0;
    asmManager->active_trim_cancelled = 0;
    asmManager->active_trim_current_job_keys = 0;
    asmManager->active_trim_current_job_trimmed = 0;
    listSetFreeMethod(asmManager->active_trim_jobs, slotRangeArrayFreeGeneric);
}

/* Return the user-visible name of an asmState value ("unknown" otherwise).
 * These names are also used by asmTaskSerialize()/asmTaskDeserialize() and
 * by the debug fail-point parser. */
char *asmTaskStateToString(int state) {
    switch (state) {
    case ASM_NONE: return "none";
    case ASM_CONNECTING: return "connecting";
    case ASM_AUTH_REPLY: return "auth-reply";
    case ASM_CANCELED: return "canceled";
    case ASM_FAILED: return "failed";
    case ASM_COMPLETED: return "completed";

    /* Import state */
    case ASM_SEND_HANDSHAKE: return "send-handshake";
    case ASM_HANDSHAKE_REPLY: return "handshake-reply";
    case ASM_SEND_SYNCSLOTS: return "send-syncslots";
    case ASM_SYNCSLOTS_REPLY: return "syncslots-reply";
    case ASM_INIT_RDBCHANNEL: return "init-rdbchannel";
    case ASM_ACCUMULATE_BUF: return "accumulate-buffer";
    case ASM_READY_TO_STREAM: return "ready-to-stream";
    case ASM_STREAMING_BUF: return "streaming-buffer";
    case ASM_WAIT_STREAM_EOF: return "wait-stream-eof";
    case ASM_TAKEOVER: return "takeover";

    /* Migrate state */
    case ASM_WAIT_RDBCHANNEL: return "wait-rdbchannel";
    case ASM_WAIT_BGSAVE_START: return "wait-bgsave-start";
    case ASM_SEND_BULK_AND_STREAM: return "send-bulk-and-stream";
    case ASM_SEND_STREAM: return "send-stream";
    case ASM_HANDOFF_PREP: return "handoff-prep";
    case ASM_HANDOFF: return "handoff";
    case ASM_STREAM_EOF: return "stream-eof";

    /* RDB channel state */
    case ASM_RDBCHANNEL_REQUEST: return "rdbchannel-request";
    case ASM_RDBCHANNEL_REPLY: return "rdbchannel-reply";
    case ASM_RDBCHANNEL_TRANSFER: return "rdbchannel-transfer";
    default: return "unknown";
    }
    serverAssert(0); /* Unreachable */
}

/* Return the user-visible name of an asmChannel value ("unknown" otherwise). */
const char *asmChannelToString(int channel) {
    switch (channel) {
    case ASM_IMPORT_MAIN_CHANNEL: return "import-main-channel";
    case ASM_IMPORT_RDB_CHANNEL: return "import-rdb-channel";
    case ASM_MIGRATE_MAIN_CHANNEL: return "migrate-main-channel";
    case ASM_MIGRATE_RDB_CHANNEL: return "migrate-rdb-channel";
    default: return "unknown";
    }
}

/* Set (or clear) the debug fail point.
 *
 * 'channel' and 'state' are matched case-insensitively against the names
 * returned by asmChannelToString() and asmTaskStateToString(). Passing two
 * empty strings clears the fail point. Any previous fail point is cleared
 * first, so an invalid request leaves no fail point active.
 *
 * Returns C_OK on success, C_ERR on invalid/missing arguments. */
int asmDebugSetFailPoint(char *channel, char *state) {
    if (!asmManager) {
        serverLog(LL_WARNING, "ASM manager is not initialized");
        return C_ERR;
    }

    asmManager->debug_fail_channel = -1;
    asmManager->debug_fail_state = -1;

    /* Both arguments are required: sdslen() below must never see NULL. */
    if (!channel || !state) return C_ERR;

    if (sdslen(channel) == 0 && sdslen(state) == 0) {
        serverLog(LL_WARNING, "ASM fail point is cleared");
        return C_OK;
    }

    for (int i = ASM_IMPORT_MAIN_CHANNEL; i <= ASM_MIGRATE_RDB_CHANNEL; i++) {
        if (!strcasecmp(channel, asmChannelToString(i))) {
            asmManager->debug_fail_channel = i;
            break;
        }
    }
    if (asmManager->debug_fail_channel == -1) return C_ERR;

    for (int i = ASM_NONE; i <= ASM_RDBCHANNEL_TRANSFER; i++) {
        if (!strcasecmp(state, asmTaskStateToString(i))) {
            asmManager->debug_fail_state = i;
            break;
        }
    }
    if (asmManager->debug_fail_state == -1) {
        /* Do not leave a half-configured fail point behind. */
        asmManager->debug_fail_channel = -1;
        return C_ERR;
    }

    serverLog(LL_NOTICE, "ASM fail point set: channel=%s, state=%s", channel, state);
    return C_OK;
}

/* Select the debug trim method ("default", "none", "bg" or "active",
 * case-insensitive) and the per-key active trim delay.
 * Returns C_ERR for an unknown method or an uninitialized manager. */
int asmDebugSetTrimMethod(const char *method, int active_trim_delay) {
    if (!asmManager) {
        serverLog(LL_WARNING, "ASM manager is not initialized");
        return C_ERR;
    }

    int prev = asmManager->debug_trim_method;
    if (!strcasecmp(method, "default"))
        asmManager->debug_trim_method = ASM_DEBUG_TRIM_DEFAULT;
    else if (!strcasecmp(method, "none"))
        asmManager->debug_trim_method = ASM_DEBUG_TRIM_NONE;
    else if (!strcasecmp(method, "bg"))
        asmManager->debug_trim_method = ASM_DEBUG_TRIM_BG;
    else if (!strcasecmp(method, "active"))
        asmManager->debug_trim_method = ASM_DEBUG_TRIM_ACTIVE;
    else
        return C_ERR;

    /* If we are switching away from "none" to any trimming method, delete all
     * the keys in the slots we don't own (they were kept while trimming was
     * disabled). */
    if (prev == ASM_DEBUG_TRIM_NONE && asmManager->debug_trim_method != ASM_DEBUG_TRIM_NONE) {
        for (int i = 0; i < CLUSTER_SLOTS; i++)
            if (!clusterIsMySlot(i)) clusterDelKeysInSlot(i, 0);
    }
    asmManager->debug_active_trim_delay = active_trim_delay;
    serverLog(LL_NOTICE, "ASM trim method was set=%s, active_trim_delay=%d", method, active_trim_delay);
    return C_OK;
}

/* Return 1 (and log) if the configured fail point matches 'channel' and
 * 'state', 0 otherwise. */
int asmDebugIsFailPointActive(int channel, int state) {
    if (!asmManager) return 0; /* ASM manager not initialized */

    if (asmManager->debug_fail_channel == channel && asmManager->debug_fail_state == state) {
        serverLog(LL_NOTICE, "ASM fail point active: channel=%s, state=%s",
                  asmChannelToString(channel), asmTaskStateToString(state));
        return 1;
    }
    return 0;
}

/* Append the slot-migration INFO fields to 'info' (a new sds is created when
 * 'info' is NULL) and return the resulting sds. Import tasks are always
 * counted as active; migrate tasks only when not in ASM_FAILED. */
sds asmCatInfoString(sds info) {
    int active_tasks = 0;
    listIter li;
    listNode *ln;

    listRewind(asmManager->tasks, &li);
    while ((ln = listNext(&li)) != NULL) {
        asmTask *task = listNodeValue(ln);
        if (task->operation == ASM_IMPORT || (task->operation == ASM_MIGRATE && task->state != ASM_FAILED)) {
            active_tasks++;
        }
    }

    return sdscatprintf(info ? info : sdsempty(),
                        "cluster_slot_migration_active_tasks:%d\r\n"
                        "cluster_slot_migration_active_trim_running:%lu\r\n"
                        "cluster_slot_migration_active_trim_current_job_keys:%llu\r\n"
                        "cluster_slot_migration_active_trim_current_job_trimmed:%llu\r\n"
                        "cluster_slot_migration_stats_active_trim_started:%llu\r\n"
                        "cluster_slot_migration_stats_active_trim_completed:%llu\r\n"
                        "cluster_slot_migration_stats_active_trim_cancelled:%llu\r\n",
                        active_tasks,
                        listLength(asmManager->active_trim_jobs),
                        asmManager->active_trim_current_job_keys,
                        asmManager->active_trim_current_job_trimmed,
                        asmManager->active_trim_started,
                        asmManager->active_trim_completed,
                        asmManager->active_trim_cancelled);
}

/* Reset the per-attempt runtime fields of a task (states, connections,
 * offsets, sync buffer, clients, timestamps). Identity fields such as id,
 * slots, source/dest and retry_count are left untouched. */
void asmTaskReset(asmTask *task) {
    task->state = ASM_NONE;
    task->dest_state = ASM_NONE;
    task->rdb_channel_state = ASM_NONE;
    task->main_channel_conn = NULL;
    task->rdb_channel_conn = NULL;
    task->dest_offset = 0;
    task->source_offset = 0;
    task->stream_eof_during_streaming = 0;
    task->cross_slot_during_propagating = 0;
    replDataBufInit(&task->sync_buffer);
    task->main_channel_client = NULL;
    task->rdb_channel_client = NULL;
    task->paused_time = 0;
    task->dest_slots_snapshot_time = 0;
    task->dest_accum_applied_time = 0;
    task->pre_snapshot_module_cmds = NULL;
}

/* Allocate a new task. When 'task_id' is NULL a random CLUSTER_NAMELEN-char
 * hex ID is generated; otherwise the given ID is copied. */
asmTask *asmTaskCreate(const char *task_id) {
    asmTask *task = zcalloc(sizeof(*task));
    task->error = sdsempty();
    asmTaskReset(task);
    task->slots = NULL;
    task->source_node = NULL;
    task->retry_count = 0;
    task->create_time = server.mstime;
    task->start_time = -1;
    task->end_time = -1;

    if (task_id) {
        task->id = sdsnew(task_id);
    } else {
        task->id = sdsnewlen(NULL, CLUSTER_NAMELEN);
        getRandomHexChars(task->id, CLUSTER_NAMELEN);
    }
    return task;
}

/* Release a task together with its sync buffer, id, slot ranges and error. */
void asmTaskFree(asmTask *task) {
    replDataBufClear(&task->sync_buffer);
    sdsfree(task->id);
    slotRangeArrayFree(task->slots);
    sdsfree(task->error);
    zfree(task);
}

/* Convert the task state to the corresponding event. */
int asmTaskStateToEvent(asmTask *task) {
    if (task->operation == ASM_IMPORT) {
        if (task->state == ASM_COMPLETED) return ASM_EVENT_IMPORT_COMPLETED;
        else if (task->state == ASM_FAILED) return ASM_EVENT_IMPORT_FAILED;
        else return ASM_EVENT_IMPORT_STARTED;
    } else {
        if (task->state == ASM_COMPLETED) return ASM_EVENT_MIGRATE_COMPLETED;
        else if (task->state == ASM_FAILED) return ASM_EVENT_MIGRATE_FAILED;
        else return ASM_EVENT_MIGRATE_STARTED;
    }
}

/* Serialize ASM task information into a string for transmission to replicas.
 * Format: "task_id:source_node:dest_node:operation:state:slot_ranges"
 * Where slot_ranges is in the format "1000-2000 3000-4000 ..." */
sds asmTaskSerialize(asmTask *task) {
    sds serialized = sdsempty();

    /* Add task ID */
    serialized = sdscatprintf(serialized, "%s:", task->id);

    /* Add source node ID (40 chars) */
    serialized = sdscatlen(serialized, task->source, CLUSTER_NAMELEN);
    serialized = sdscat(serialized, ":");

    /* Add destination node ID (40 chars) */
    serialized = sdscatlen(serialized, task->dest, CLUSTER_NAMELEN);
    serialized = sdscat(serialized, ":");

    /* Add operation type */
    serialized = sdscatprintf(serialized, "%s:", task->operation == ASM_IMPORT ? "import" : "migrate");

    /* Add current state */
    serialized = sdscatprintf(serialized, "%s:", asmTaskStateToString(task->state));

    /* Add slot ranges sds */
    sds slots_str = slotRangeArrayToString(task->slots);
    serialized = sdscatprintf(serialized, "%s", slots_str);
    sdsfree(slots_str);

    return serialized;
}

/* Deserialize ASM task information from a string and create a complete asmTask.
 * Format: "task_id:source_node:dest_node:operation:state:slot_ranges"
 * An unrecognized state name falls back to ASM_NONE.
 * Returns a new asmTask on success, NULL on failure. */
asmTask *asmTaskDeserialize(sds data) {
    int count, idx = 0;
    asmTask *task = NULL;

    if (!data || sdslen(data) == 0) return NULL;

    sds *parts = sdssplitlen(data, sdslen(data), ":", 1, &count);
    if (count < 6) goto err;

    /* Parse task ID */
    if (sdslen(parts[idx]) == 0) goto err;
    task = asmTaskCreate(parts[idx]);
    if (!task) goto err;
    idx++;

    /* Parse source node ID */
    if (sdslen(parts[idx]) != CLUSTER_NAMELEN) goto err;
    memcpy(task->source, parts[idx], CLUSTER_NAMELEN);
    idx++;

    /* Parse destination node ID */
    if (sdslen(parts[idx]) != CLUSTER_NAMELEN) goto err;
    memcpy(task->dest, parts[idx], CLUSTER_NAMELEN);
    idx++;

    /* Parse operation type */
    if (!strcasecmp(parts[idx], "import")) {
        task->operation = ASM_IMPORT;
    } else if (!strcasecmp(parts[idx], "migrate")) {
        task->operation = ASM_MIGRATE;
    } else {
        goto err;
    }
    idx++;

    /* Parse state */
    task->state = ASM_NONE; /* Default state */
    for (int state = ASM_NONE; state <= ASM_RDBCHANNEL_TRANSFER; state++) {
        if (!strcasecmp(parts[idx], asmTaskStateToString(state))) {
            task->state = state;
            break;
        }
    }
    idx++;

    /* Parse slot ranges */
    task->slots = slotRangeArrayFromString(parts[idx]);
    if (!task->slots) goto err;
    idx++;

    /* Ignore any extra fields for future compatibility */
    sdsfreesplitres(parts, count);
    return task;

err:
    if (task) asmTaskFree(task);
    sdsfreesplitres(parts, count);
    return NULL;
}

/* Notify replicas about ASM task information to maintain consistency during
 * slot migration.
This function sends a CLUSTER SYNCSLOTS CONF ASM-TASK command * to all connected replicas with the serialized task information. */ void asmNotifyReplicasStateChange(struct asmTask *task) { if (!server.cluster_enabled || !clusterNodeIsMaster(getMyClusterNode())) return; /* Create command arguments for CLUSTER SYNCSLOTS CONF ASM-TASK */ robj *argv[5]; argv[0] = createStringObject("CLUSTER", 7); argv[1] = createStringObject("SYNCSLOTS", 9); argv[2] = createStringObject("CONF", 4); argv[3] = createStringObject("ASM-TASK", 8); argv[4] = createObject(OBJ_STRING, asmTaskSerialize(task)); /* Send the command to all replicas */ replicationFeedSlaves(server.slaves, -1, argv, 5); /* Clean up command objects */ for (int i = 0; i < 5; i++) { decrRefCount(argv[i]); } } /* Dump the active import ASM task information. */ sds asmDumpActiveImportTask(void) { if (!server.cluster_enabled) return NULL; /* For replica, dump the master active task. */ if (clusterNodeIsSlave(getMyClusterNode()) && asmManager->master_task && asmManager->master_task->state != ASM_FAILED && asmManager->master_task->state != ASM_COMPLETED) { return asmTaskSerialize(asmManager->master_task); } /* For master, dump the first active task. */ if (!asmManager || listLength(asmManager->tasks) == 0) return NULL; asmTask *task = listNodeValue(listFirst(asmManager->tasks)); if (task->state == ASM_NONE || task->state == ASM_FAILED || task->state == ASM_COMPLETED) return NULL; return asmTaskSerialize(task); } size_t asmGetPeakSyncBufferSize(void) { if (!asmManager) return 0; /* Compute peak sync buffer usage. The current task's peak may not * reflect in asmManager->sync_buffer_peak immediately. */ size_t peak = asmManager->sync_buffer_peak; asmTask *task = listFirst(asmManager->tasks) ? 
listNodeValue(listFirst(asmManager->tasks)) : NULL; if (task && task->operation == ASM_IMPORT) peak = max(task->sync_buffer.peak, asmManager->sync_buffer_peak); return peak; } size_t asmGetImportInputBufferSize(void) { if (!asmManager || listLength(asmManager->tasks) == 0) return 0; asmTask *task = listNodeValue(listFirst(asmManager->tasks)); if (task->operation == ASM_IMPORT) return task->sync_buffer.mem_used; return 0; } size_t asmGetMigrateOutputBufferSize(void) { if (!asmManager || listLength(asmManager->tasks) == 0) return 0; asmTask *task = listNodeValue(listFirst(asmManager->tasks)); if (task->operation == ASM_MIGRATE && task->main_channel_client) return getClientOutputBufferMemoryUsage(task->main_channel_client); return 0; } /* Returns the ASM task with the given ID, or NULL if no such task exists. */ static asmTask *asmLookupTaskAt(list *tasks, const char *id) { listIter li; listNode *ln; listRewind(tasks, &li); while ((ln = listNext(&li)) != NULL) { asmTask *task = listNodeValue(ln); if (!strcmp(task->id, id)) return task; } return NULL; } /* Returns the ASM task with the given ID, or NULL if no such task exists. */ asmTask *asmLookupTaskById(const char *id) { return asmLookupTaskAt(asmManager->tasks, id); } /* Returns the ASM task that is identical to the given slot range array, or NULL * if no such task exists. */ asmTask *asmLookupTaskBySlotRangeArray(slotRangeArray *slots) { listIter li; listNode *ln; listRewind(asmManager->tasks, &li); while ((ln = listNext(&li)) != NULL) { asmTask *task = listNodeValue(ln); if (slotRangeArrayIsEqual(task->slots, slots)) return task; } return NULL; } /* Returns the slot range array for the given task ID */ slotRangeArray *asmTaskGetSlotRanges(const char *task_id) { asmTask *task = NULL; if (!task_id || (task = asmLookupTaskById(task_id)) == NULL) return NULL; return task->slots; } /* Returns 1 if the slot range array overlaps with the given slot range. 
*/ static int slotRangeArrayOverlaps(slotRangeArray *slots, slotRange *req) { for (int i = 0; i < slots->num_ranges; i++) { slotRange *sr = &slots->ranges[i]; if (sr->start <= req->end && sr->end >= req->start) return 1; } return 0; } /* Returns 1 if the two slot range arrays overlap, 0 otherwise. */ static int slotRangeArraysOverlap(slotRangeArray *slots1, slotRangeArray *slots2) { for (int i = 0; i < slots1->num_ranges; i++) { slotRange *sr1 = &slots1->ranges[i]; if (slotRangeArrayOverlaps(slots2, sr1)) return 1; } return 0; } /* Returns the ASM task that overlaps with the given slot range, or NULL if * no such task exists. */ static asmTask *lookupAsmTaskBySlotRange(slotRange *req) { listIter li; listNode *ln; listRewind(asmManager->tasks, &li); while ((ln = listNext(&li)) != NULL) { asmTask *task = listNodeValue(ln); if (slotRangeArrayOverlaps(task->slots, req)) return task; } return NULL; } /* Validates the given slot ranges for a migration task: * - Ensures the current node is a master. * - Verifies all slots are in a STABLE state. * - Confirms all slots belong to a single source node. * - Confirms no ongoing import task that overlaps with the slot ranges. * * Returns the source node if validation succeeds. * Otherwise, returns NULL and sets 'err' variable. */ static clusterNode *validateImportSlotRanges(slotRangeArray *slots, sds *err, asmTask *current) { clusterNode *source = NULL; *err = NULL; /* Ensure this is a master node */ if (!clusterNodeIsMaster(getMyClusterNode())) { *err = sdsnew("slot migration not allowed on replica."); goto out; } /* Ensure no manual migration is in progress. */ for (int i = 0; i < CLUSTER_SLOTS; i++) { if (getImportingSlotSource(i) != NULL || getMigratingSlotDest(i) != NULL) { *err = sdsnew("all slot states must be STABLE to start a slot migration task."); goto out; } } for (int i = 0; i < slots->num_ranges; i++) { slotRange *sr = &slots->ranges[i]; /* Ensure no import task overlaps with this slot range. 
* Skip check current task that is running for this slot range. */ asmTask *task = lookupAsmTaskBySlotRange(sr); if (task && task != current && task->operation == ASM_IMPORT) { *err = sdscatprintf(sdsempty(), "overlapping import exists for slot range: %d-%d", sr->start, sr->end); goto out; } /* Validate if we can start migration task for this slot range. */ for (int j = sr->start; j <= sr->end; j++) { clusterNode *node = getNodeBySlot(j); if (node == NULL) { *err = sdscatprintf(sdsempty(), "slot has no owner: %d", j); goto out; } if (!source) { source = node; } else if (source != node) { *err = sdsnew("slots belong to different source nodes"); goto out; } } } out: return *err ? NULL : source; } /* Returns 1 if a task with the specified operation is in progress, 0 otherwise. */ static int asmTaskInProgress(int operation) { listIter li; listNode *ln; if (!asmManager || listLength(asmManager->tasks) == 0) return 0; listRewind(asmManager->tasks, &li); while ((ln = listNext(&li)) != NULL) { asmTask *task = listNodeValue(ln); if (task->operation == operation) return 1; } return 0; } /* Returns 1 if a migrate task is in progress, 0 otherwise. */ int asmMigrateInProgress(void) { return asmTaskInProgress(ASM_MIGRATE); } /* Returns 1 if an import task is in progress, 0 otherwise. */ int asmImportInProgress(void) { return asmTaskInProgress(ASM_IMPORT); } /* Returns 1 if the task is in a state where it can receive replication stream * for the slot range, 0 otherwise. */ inline static int asmCanFeedMigrationClient(asmTask *task) { return task->operation == ASM_MIGRATE && !task->cross_slot_during_propagating && (task->state == ASM_SEND_BULK_AND_STREAM || task->state == ASM_SEND_STREAM || task->state == ASM_HANDOFF_PREP); } /* Feed the migration client with the replication stream for the slot range. 
*/ void asmFeedMigrationClient(robj **argv, int argc) { asmTask *task = NULL; if (server.cluster_enabled == 0 || listLength(asmManager->tasks) == 0) return; /* Check if there is a migrate task that can receive replication stream. */ task = listNodeValue(listFirst(asmManager->tasks)); if (!asmCanFeedMigrationClient(task)) return; /* Ensure all arguments are converted to string encoding if necessary, * since getSlotFromCommand expects them to be string-encoded. * Generally the arguments are string-encoded, but we may rewrite * the command arguments to integer encoding. */ for (int i = 0; i < argc; i++) { if (!sdsEncodedObject(argv[i])) { serverAssert(argv[i]->encoding == OBJ_ENCODING_INT); robj *old = argv[i]; argv[i] = createStringObjectFromLongLongWithSds((long)old->ptr); decrRefCount(old); } } /* Check if the command belongs to the slot range. */ struct redisCommand *cmd = lookupCommand(argv, argc); serverAssert(cmd); int slot = getSlotFromCommand(cmd, argv, argc); /* If the command does not have keys, skip it now. * SELECT is not propagated, since we only support a single db in cluster mode. * MULTI/EXEC is not needed, since transaction semantics are unnecessary * before the slot handoff. * FUNCTION subcommands should be executed on all nodes, so here we skip it, * and even propagating them may cause an error when executing. * * NOTICE: if some keyless commands should be propagated to the destination, * we should identify them here and send. */ if (slot == INVALID_CLUSTER_SLOT) return; /* Generally we reject cross-slot commands before executing, but module may * replicate this kind of command, so we check again. To guarantee data * consistency, we cancel the task if we encounter a cross-slot command. */ if (slot == CLUSTER_CROSSSLOT) { /* We cannot cancel the task directly here, since it may lead to a recursive * call: asmTaskCancel() --> moduleFireServerEvent() --> moduleFreeContext() * --> postExecutionUnitOperations() --> propagateNow(). 
Even worse, this * could result in propagating pending commands to the replication stream twice. * To avoid this, we simply set a flag here, cancel the task in beforeSleep. */ task->cross_slot_during_propagating = 1; return; } /* Check if the slot belongs to the task's slot range. */ slotRange sr = {slot, slot}; if (!slotRangeArrayOverlaps(task->slots, &sr)) return; if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, task->state))) freeClientAsync(task->main_channel_client); /* Feed main channel with the command. */ client *c = task->main_channel_client; size_t prev_bytes = getNormalClientPendingReplyBytes(c); addReplyArrayLen(c, argc); for (int i = 0; i < argc; i++) addReplyBulk(c, argv[i]); /* Update the task's source offset to reflect the bytes sent. */ task->source_offset += (getNormalClientPendingReplyBytes(c) - prev_bytes); } asmTask *asmCreateImportTask(const char *task_id, slotRangeArray *slots, sds *err) { clusterNode *source; *err = NULL; /* Validate that the slot ranges are valid and that migration can be * initiated for them. */ source = validateImportSlotRanges(slots, err, NULL); if (!source) goto err; if (source == getMyClusterNode()) { *err = sdsnew("this node is already the owner of the slot range"); goto err; } /* Only support a single task at a time now. */ if (listLength(asmManager->tasks) != 0) { asmTask *current = listNodeValue(listFirst(asmManager->tasks)); if (current->state == ASM_FAILED) { /* We can create a new import task only if the current one is failed, * cancel the failed task to create a new one. */ asmTaskCancel(current, "new import requested"); } else { *err = sdsnew("another ASM task is already in progress"); goto err; } } /* There should be no task in progress. 
*/ serverAssert(listLength(asmManager->tasks) == 0); /* Create a slot migration task */ asmTask *task = asmTaskCreate(task_id); task->slots = slots; task->state = ASM_NONE; task->operation = ASM_IMPORT; task->source_node = source; memcpy(task->source, clusterNodeGetName(source), CLUSTER_NAMELEN); memcpy(task->dest, getMyClusterId(), CLUSTER_NAMELEN); listAddNodeTail(asmManager->tasks, task); sds slots_str = slotRangeArrayToString(slots); serverLog(LL_NOTICE, "Import task %s created: src=%.40s, dest=%.40s, slots=%s", task->id, task->source, task->dest, slots_str); sdsfree(slots_str); return task; err: slotRangeArrayFree(slots); return NULL; } /* CLUSTER MIGRATION IMPORT * * Sent by operator to the destination node to start the migration. */ static void clusterMigrationCommandImport(client *c) { /* Validate slot range arg count */ int remaining = c->argc - 3; if (remaining == 0 || remaining % 2 != 0) { addReplyErrorArity(c); return; } slotRangeArray *slots = parseSlotRangesOrReply(c, c->argc, 3); if (!slots) return; sds err = NULL; asmTask *task = asmCreateImportTask(NULL, slots, &err); if (!task) { addReplyErrorSds(c, err); return; } addReplyBulkCString(c, task->id); } /* CLUSTER MIGRATION CANCEL [ID | ALL] * - Reply: Number of cancelled tasks * * Cancels import tasks that overlap with the specified slot ranges. * Multiple tasks may be cancelled. */ static void clusterMigrationCommandCancel(client *c) { sds task_id = NULL; int num_cancelled = 0; /* Validate slot range arg count */ if (c->argc != 4 && c->argc != 5) { addReplyErrorArity(c); return; } if (!strcasecmp(c->argv[3]->ptr, "id")) { if (c->argc != 5) { addReplyErrorArity(c); return; } task_id = c->argv[4]->ptr; } else if (!strcasecmp(c->argv[3]->ptr, "all")) { if (c->argc != 4) { addReplyErrorArity(c); return; } } else { addReplyError(c, "unknown argument"); return; } num_cancelled = clusterAsmCancel(task_id, "user request"); addReplyLongLong(c, num_cancelled); } /* Reply with the status of the task. 
*/ static void replyTaskStatus(client *c, asmTask *task) { mstime_t p = 0; addReplyMapLen(c, 12); addReplyBulkCString(c, "id"); addReplyBulkCString(c, task->id); addReplyBulkCString(c, "slots"); addReplyBulkSds(c, slotRangeArrayToString(task->slots)); addReplyBulkCString(c, "source"); addReplyBulkCBuffer(c, task->source, CLUSTER_NAMELEN); addReplyBulkCString(c, "dest"); addReplyBulkCBuffer(c, task->dest, CLUSTER_NAMELEN); addReplyBulkCString(c, "operation"); addReplyBulkCString(c, task->operation == ASM_IMPORT ? "import" : "migrate"); addReplyBulkCString(c, "state"); addReplyBulkCString(c, asmTaskStateToString(task->state)); addReplyBulkCString(c, "last_error"); addReplyBulkCBuffer(c, task->error, sdslen(task->error)); addReplyBulkCString(c, "retries"); addReplyLongLong(c, task->retry_count); addReplyBulkCString(c, "create_time"); addReplyLongLong(c, task->create_time); addReplyBulkCString(c, "start_time"); addReplyLongLong(c, task->start_time); addReplyBulkCString(c, "end_time"); addReplyLongLong(c, task->end_time); if (task->operation == ASM_MIGRATE && task->state == ASM_COMPLETED) p = task->end_time - task->paused_time; addReplyBulkCString(c, "write_pause_ms"); addReplyLongLong(c, p); } /* CLUSTER MIGRATION STATUS [ID | ALL] * - Reply: Array of atomic slot migration tasks */ static void clusterMigrationCommandStatus(client *c) { listIter li; listNode *ln; if (c->argc != 4 && c->argc != 5) { addReplyErrorArity(c); return; } if (!strcasecmp(c->argv[3]->ptr, "id")) { if (c->argc != 5) { addReplyErrorArity(c); return; } sds id = c->argv[4]->ptr; asmTask *task = asmLookupTaskAt(asmManager->tasks, id); if (!task) task = asmLookupTaskAt(asmManager->archived_tasks, id); if (!task) { addReplyArrayLen(c, 0); return; } addReplyArrayLen(c, 1); replyTaskStatus(c, task); } else if (!strcasecmp(c->argv[3]->ptr, "all")) { if (c->argc != 4) { addReplyErrorArity(c); return; } addReplyArrayLen(c, listLength(asmManager->tasks) + listLength(asmManager->archived_tasks)); 
listRewind(asmManager->tasks, &li); while ((ln = listNext(&li)) != NULL) replyTaskStatus(c, listNodeValue(ln)); listRewind(asmManager->archived_tasks, &li); while ((ln = listNext(&li)) != NULL) replyTaskStatus(c, listNodeValue(ln)); } else { addReplyError(c, "unknown argument"); return; } } /* CLUSTER MIGRATION * | * STATUS [ID | ALL] | * CANCEL [ID | ALL]> */ void clusterMigrationCommand(client *c) { if (c->argc < 4) { addReplyErrorArity(c); return; } if (strcasecmp(c->argv[2]->ptr, "import") == 0) { clusterMigrationCommandImport(c); } else if (strcasecmp(c->argv[2]->ptr, "status") == 0) { clusterMigrationCommandStatus(c); } else if (strcasecmp(c->argv[2]->ptr, "cancel") == 0) { clusterMigrationCommandCancel(c); } else { addReplyError(c, "unknown argument"); } } /* Return the number of keys in the specified slot ranges. */ unsigned long long asmCountKeysInSlots(slotRangeArray *slots) { if (!slots) return 0; unsigned long long key_count = 0; for (int i = 0; i < slots->num_ranges; i++) { for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) { key_count += kvstoreDictSize(server.db[0].keys, j); } } return key_count; } /* Log a human-readable message for ASM task lifecycle events. 
*/
void asmLogTaskEvent(asmTask *task, int event) {
    /* One NOTICE line per recognised lifecycle event; the "completed" and
     * "migrate started" lines also report the key count of the task's slots.
     * Unrecognised events produce no output. */
    sds slots_desc = slotRangeArrayToString(task->slots);

    if (event == ASM_EVENT_IMPORT_STARTED) {
        serverLog(LL_NOTICE, "Import task %s started for slots: %s",
                  task->id, slots_desc);
    } else if (event == ASM_EVENT_IMPORT_FAILED) {
        serverLog(LL_NOTICE, "Import task %s failed for slots: %s",
                  task->id, slots_desc);
    } else if (event == ASM_EVENT_TAKEOVER) {
        serverLog(LL_NOTICE, "Import task %s is ready to takeover slots: %s",
                  task->id, slots_desc);
    } else if (event == ASM_EVENT_IMPORT_COMPLETED) {
        serverLog(LL_NOTICE, "Import task %s completed for slots: %s (imported %llu keys)",
                  task->id, slots_desc, asmCountKeysInSlots(task->slots));
    } else if (event == ASM_EVENT_MIGRATE_STARTED) {
        serverLog(LL_NOTICE, "Migrate task %s started for slots: %s (keys at start: %llu)",
                  task->id, slots_desc, asmCountKeysInSlots(task->slots));
    } else if (event == ASM_EVENT_MIGRATE_FAILED) {
        serverLog(LL_NOTICE, "Migrate task %s failed for slots: %s",
                  task->id, slots_desc);
    } else if (event == ASM_EVENT_HANDOFF_PREP) {
        serverLog(LL_NOTICE, "Migrate task %s preparing to handoff for slots: %s",
                  task->id, slots_desc);
    } else if (event == ASM_EVENT_MIGRATE_COMPLETED) {
        serverLog(LL_NOTICE, "Migrate task %s completed for slots: %s (migrated %llu keys)",
                  task->id, slots_desc, asmCountKeysInSlots(task->slots));
    }

    sdsfree(slots_desc);
}

/* Notify the state change to the module and the cluster implementation.
*/ void asmNotifyStateChange(asmTask *task, int event) { RedisModuleClusterSlotMigrationInfo info = { .version = REDISMODULE_CLUSTER_SLOT_MIGRATION_INFO_VERSION, .task_id = task->id, .slots = (RedisModuleSlotRangeArray *) task->slots }; memcpy(info.source_node_id, task->source, CLUSTER_NAMELEN); memcpy(info.destination_node_id, task->dest, CLUSTER_NAMELEN); int module_event = -1; if (event == ASM_EVENT_IMPORT_STARTED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_STARTED; else if (event == ASM_EVENT_IMPORT_COMPLETED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_COMPLETED; else if (event == ASM_EVENT_IMPORT_FAILED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_FAILED; else if (event == ASM_EVENT_MIGRATE_STARTED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_STARTED; else if (event == ASM_EVENT_MIGRATE_COMPLETED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_COMPLETED; else if (event == ASM_EVENT_MIGRATE_FAILED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_FAILED; serverAssert(module_event != -1); moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION, module_event, &info); serverLog(LL_DEBUG, "Fire cluster asm module event, task %s: state=%s", task->id, asmTaskStateToString(task->state)); if (clusterNodeIsMaster(getMyClusterNode())) { /* Notify the cluster impl only if it is a real active import task. */ if (task != asmManager->master_task) { asmLogTaskEvent(task, event); clusterAsmOnEvent(task->id, event, task->slots); } asmNotifyReplicasStateChange(task); /* Propagate state change to replicas */ } } void asmImportSetFailed(asmTask *task) { serverAssert(task->operation == ASM_IMPORT); if (task->state == ASM_FAILED) return; /* If we are in the RDB channel transfer state, we need to * close the client that was created for the RDB channel. 
*/ if (task->rdb_channel_conn && task->rdb_channel_state == ASM_RDBCHANNEL_TRANSFER) { client *c = connGetPrivateData(task->rdb_channel_conn); serverAssert(c->task == task); task->rdb_channel_conn = NULL; c->task = NULL; c->flags &= ~CLIENT_MASTER; freeClientAsync(c); } /* If in the wait stream EOF or streaming buffer state, we need to close the * client that was created for the main channel. */ if (task->main_channel_conn && (task->state == ASM_STREAMING_BUF || task->state == ASM_WAIT_STREAM_EOF)) { client *c = connGetPrivateData(task->main_channel_conn); serverAssert(c->task == task); task->main_channel_conn = NULL; c->task = NULL; c->flags &= ~CLIENT_MASTER; freeClientAsync(c); } /* Close the connections */ if (task->rdb_channel_conn) connClose(task->rdb_channel_conn); if (task->main_channel_conn) connClose(task->main_channel_conn); task->rdb_channel_conn = NULL; task->main_channel_conn = NULL; /* Clear the replication data buffer */ asmManager->sync_buffer_peak = max(asmManager->sync_buffer_peak, task->sync_buffer.peak); replDataBufClear(&task->sync_buffer); /* Mark the task as failed and notify the cluster */ task->state = ASM_FAILED; asmNotifyStateChange(task, ASM_EVENT_IMPORT_FAILED); /* This node may become replica, only master can setup new slot trimming jobs. 
*/ if (clusterNodeIsMaster(getMyClusterNode())) asmTrimJobSchedule(task->slots); } void asmMigrateSetFailed(asmTask *task) { serverAssert(task->operation == ASM_MIGRATE); if (task->state == ASM_FAILED) return; /* Close the RDB and main channel clients*/ if (task->rdb_channel_client) { task->rdb_channel_client->task = NULL; freeClientAsync(task->rdb_channel_client); task->rdb_channel_client = NULL; } if (task->main_channel_client) { task->main_channel_client->task = NULL; freeClientAsync(task->main_channel_client); task->main_channel_client = NULL; } /* Actually it is not necessary to clear the sync buffer here, * to make asmTaskReset work properly after migrate task failed */ replDataBufClear(&task->sync_buffer); /* Mark the task as failed and notify the cluster */ task->state = ASM_FAILED; asmNotifyStateChange(task, ASM_EVENT_MIGRATE_FAILED); } void asmTaskSetFailed(asmTask *task, const char *fmt, ...) { va_list ap; sds error = sdsempty(); /* Set the error message */ va_start(ap, fmt); error = sdscatvprintf(error, fmt, ap); va_end(ap); error = sdscatprintf(error, " (state: %s, rdb_channel_state: %s)", asmTaskStateToString(task->state), asmTaskStateToString(task->rdb_channel_state)); sdsfree(task->error); task->error = error; /* Log the error */ sds slots_str = slotRangeArrayToString(task->slots); serverLog(LL_WARNING, "%s task %s failed: slots=%s, err=%s", task->operation == ASM_IMPORT ? "Import" : "Migrate", task->id, slots_str, task->error); sdsfree(slots_str); if (task->operation == ASM_IMPORT) asmImportSetFailed(task); else asmMigrateSetFailed(task); } /* The task is completed or canceled. Update stats and move it to * the archived list. 
*/ void asmTaskFinalize(asmTask *task) { listNode *ln = listFirst(asmManager->tasks); serverAssert(ln->value == task); task->source_node = NULL; /* Should never access it */ task->end_time = server.mstime; if (task->operation == ASM_IMPORT) { asmManager->sync_buffer_peak = max(asmManager->sync_buffer_peak, task->sync_buffer.peak); replDataBufClear(&task->sync_buffer); /* Not used, so save memory */ } /* Move the task to the archived list */ listUnlinkNode(asmManager->tasks, ln); listLinkNodeHead(asmManager->archived_tasks, ln); } static void asmTaskCancel(asmTask *task, const char *reason) { if (task->state == ASM_CANCELED) return; asmTaskSetFailed(task, "Cancelled due to %s", reason); task->state = ASM_CANCELED; asmTaskFinalize(task); } void asmImportTakeover(asmTask *task) { serverAssert(task->state == ASM_WAIT_STREAM_EOF || task->state == ASM_STREAMING_BUF); /* Free the main channel connection since it is no longer needed. */ serverAssert(task->main_channel_conn != NULL); client *c = connGetPrivateData(task->main_channel_conn); c->task = NULL; c->flags &= ~CLIENT_MASTER; freeClientAsync(c); task->main_channel_conn = NULL; task->state = ASM_TAKEOVER; asmLogTaskEvent(task, ASM_EVENT_TAKEOVER); clusterAsmOnEvent(task->id, ASM_EVENT_TAKEOVER, task->slots); } void asmCallbackOnFreeClient(client *c) { asmTask *task = c->task; if (!task) return; /* If the RDB channel connection is closed, mark the task as failed. */ if (c->conn && task->rdb_channel_conn == c->conn) { /* We create the client only when transferring data on the RDB channel */ serverAssert(task->rdb_channel_state == ASM_RDBCHANNEL_TRANSFER); task->rdb_channel_conn = NULL; /* Will be freed by freeClient */ c->flags &= ~CLIENT_MASTER; asmTaskSetFailed(task, "RDB channel - Connection is closed"); return; } if (c->conn && task->main_channel_conn == c->conn) { /* After or in the process of streaming buffer to DB, a client will be * created based on the main channel connection. 
*/ serverAssert(task->state == ASM_STREAMING_BUF || task->state == ASM_WAIT_STREAM_EOF); task->main_channel_conn = NULL; /* Will be freed by freeClient */ c->flags &= ~CLIENT_MASTER; asmTaskSetFailed(task, "Main channel - Connection is closed"); return; } if (c == task->rdb_channel_client) { /* TODO: Detect whether the bgsave is completed successfully and * update the state properly. */ task->rdb_channel_state = ASM_COMPLETED; /* We may not have detected whether the child process has exited yet, * so we can't determine whether the client has completed the slots * snapshot transfer. If the RDB channel is interrupted unexpectedly, * the destination side will also close the main channel. * So here we just reset the RDB channel client of task. */ task->rdb_channel_client = NULL; return; } /* If the main channel client is closed, we need to mark the task as failed * and clean up the RDB channel client if it exists. */ if (c == task->main_channel_client) { task->main_channel_client = NULL; /* The rdb channel client will be cleaned up */ asmTaskSetFailed(task, "Main and RDB channel clients are disconnected."); return; } } /* Sends an AUTH command to the source node using the internal secret. * Returns an error string if the command fails, or NULL on success. */ char *asmSendInternalAuth(connection *conn) { size_t len = 0; const char *internal_secret = clusterGetSecret(&len); serverAssert(internal_secret != NULL); sds secret = sdsnewlen(internal_secret, len); char *err = sendCommand(conn, "AUTH", "internal connection", secret, NULL); sdsfree(secret); return err; } /* Handles the RDB channel sync with the source node. * This function is called when the RDB channel is established * and ready to sync with the source node. */ void asmRdbChannelSyncWithSource(connection *conn) { asmTask *task = connGetPrivateData(conn); char *err = NULL; sds task_error_msg = NULL; /* Check for errors in the socket: after a non blocking connect() we * may find that the socket is in error state. 
*/ if (connGetState(conn) != CONN_STATE_CONNECTED) goto error; /* Check if the task is in a fail point state */ if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_RDB_CHANNEL, task->rdb_channel_state))) { char buf[1]; /* Simulate a failure by shutting down the connection. On some operating * systems (e.g. Linux), the socket's receive buffer is not flushed * immediately, so we issue a dummy read to drain any pending data and * surface the error condition. * using shutdown() instead of connShutdown() because connTLSShutdown() * will free the connection directly, which is not what we want. */ shutdown(conn->fd, SHUT_RDWR); connRead(conn, buf, 1); } if (task->rdb_channel_state == ASM_CONNECTING) { connSetReadHandler(conn, asmRdbChannelSyncWithSource); connSetWriteHandler(conn, NULL); /* Send AUTH command to source node using internal auth */ err = asmSendInternalAuth(conn); if (err) goto write_error; task->rdb_channel_state = ASM_AUTH_REPLY; return; } if (task->rdb_channel_state == ASM_AUTH_REPLY) { err = receiveSynchronousResponse(conn); /* The source node did not reply */ if (err == NULL) goto no_response_error; /* Check `+OK` reply */ if (!strcmp(err, "+OK")) { sdsfree(err); err = NULL; task->rdb_channel_state = ASM_RDBCHANNEL_REQUEST; serverLog(LL_NOTICE, "Source node replied to AUTH command, syncslots rdb channel operation can continue..."); } else { task_error_msg = sdscatprintf(sdsempty(), "Error reply to AUTH from source: %s", err); sdsfree(err); goto error; } } if (task->rdb_channel_state == ASM_RDBCHANNEL_REQUEST) { err = sendCommand(conn, "CLUSTER", "SYNCSLOTS", "RDBCHANNEL", task->id, NULL); if (err) goto write_error; task->rdb_channel_state = ASM_RDBCHANNEL_REPLY; return; } if (task->rdb_channel_state == ASM_RDBCHANNEL_REPLY) { err = receiveSynchronousResponse(conn); /* The source node did not reply */ if (err == NULL) goto no_response_error; /* Ignore ‘\n' sent from the source node to keep the connection alive. 
*/ if (sdslen(err) == 0) { serverLog(LL_DEBUG, "Received an empty line in RDBCHANNEL reply, slots snapshot delivery will start later"); sdsfree(err); return; } /* Check `+SLOTSSNAPSHOT` reply */ if (!strncmp(err, "+SLOTSSNAPSHOT", strlen("+SLOTSSNAPSHOT"))) { sdsfree(err); err = NULL; task->state = ASM_ACCUMULATE_BUF; /* The main channel buffers pending commands. */ connSetReadHandler(task->main_channel_conn, asmSyncBufferReadFromConn); task->rdb_channel_state = ASM_RDBCHANNEL_TRANSFER; client *c = createClient(conn); c->flags |= (CLIENT_MASTER | CLIENT_INTERNAL | CLIENT_ASM_IMPORTING); c->querybuf = sdsempty(); c->authenticated = 1; c->user = NULL; c->task = task; serverLog(LL_NOTICE, "Source node replied to SLOTSSNAPSHOT, syncing slots snapshot can continue..."); } else { task_error_msg = sdscatprintf(sdsempty(), "Error reply to CLUSTER SYNCSLOTS RDBCHANNEL from the source: %s", err); sdsfree(err); goto error; } return; } return; no_response_error: task_error_msg = sdsnew("Source node did not respond to command during RDBCHANNELSYNCSLOTS handshake"); /* Fall through to regular error handling */ error: asmTaskSetFailed(task, "RDB channel - Failed to sync with the source node: %s", task_error_msg ? task_error_msg : connGetLastError(conn)); sdsfree(task_error_msg); return; write_error: /* Handle sendCommand() errors. 
*/ task_error_msg = sdscatprintf(sdsempty(), "Failed to send command to the source node: %s", err); sdsfree(err); goto error; } char *asmSendSlotRangesSync(connection *conn, asmTask *task) { /* Prepare CLUSTER SYNCSLOTS SYNC command */ serverAssert(task->slots->num_ranges <= CLUSTER_SLOTS); int argc = task->slots->num_ranges * 2 + 4; char **args = zcalloc(sizeof(char*) * argc); size_t *lens = zcalloc(sizeof(size_t) * argc); args[0] = "CLUSTER"; args[1] = "SYNCSLOTS"; args[2] = "SYNC"; args[3] = task->id; lens[0] = strlen("CLUSTER"); lens[1] = strlen("SYNCSLOTS"); lens[2] = strlen("SYNC"); lens[3] = sdslen(task->id); int i = 4; for (int j = 0; j < task->slots->num_ranges; j++) { slotRange *sr = &task->slots->ranges[j]; args[i] = sdscatprintf(sdsempty(), "%d", sr->start); lens[i] = sdslen(args[i]); args[i+1] = sdscatprintf(sdsempty(), "%d", sr->end); lens[i+1] = sdslen(args[i+1]); i += 2; } serverAssert(i == argc); /* Send command to source node */ char *err = sendCommandArgv(conn, argc, args, lens); /* Free allocated memory */ for (int j = 4; j < argc; j++) { sdsfree(args[j]); } zfree(args); zfree(lens); return err; } void asmSyncWithSource(connection *conn) { asmTask *task = connGetPrivateData(conn); char *err = NULL; /* Some task errors are not network issues, we record them explicitly. */ sds task_error_msg = NULL; /* Check for errors in the socket: after a non blocking connect() we * may find that the socket is in error state. 
*/ if (connGetState(conn) != CONN_STATE_CONNECTED) goto error; /* Check if the fail point is active for this channel and state */ if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_MAIN_CHANNEL, task->state))) { char buf[1]; shutdown(conn->fd, SHUT_RDWR); connRead(conn, buf, 1); } if (task->state == ASM_CONNECTING) { connSetReadHandler(conn, asmSyncWithSource); connSetWriteHandler(conn, NULL); /* Send AUTH command to source node using internal auth */ err = asmSendInternalAuth(conn); if (err) goto write_error; task->state = ASM_AUTH_REPLY; return; } if (task->state == ASM_AUTH_REPLY) { err = receiveSynchronousResponse(conn); /* The source node did not reply */ if (err == NULL) goto no_response_error; /* Check `+OK` reply */ if (!strcmp(err, "+OK")) { sdsfree(err); err = NULL; task->state = ASM_SEND_HANDSHAKE; serverLog(LL_NOTICE, "Source node replied to AUTH command, syncslots can continue..."); } else { task_error_msg = sdscatprintf(sdsempty(), "Error reply to AUTH from the source: %s", err); sdsfree(err); goto error; } } if (task->state == ASM_SEND_HANDSHAKE) { sds node_id = sdsnewlen(clusterNodeGetName(getMyClusterNode()), CLUSTER_NAMELEN); err = sendCommand(conn, "CLUSTER", "SYNCSLOTS", "CONF", "NODE-ID", node_id, NULL); sdsfree(node_id); if (err) goto write_error; task->state = ASM_HANDSHAKE_REPLY; return; } if (task->state == ASM_HANDSHAKE_REPLY) { err = receiveSynchronousResponse(conn); /* The source node did not reply */ if (err == NULL) goto no_response_error; /* Check `+OK` reply */ if (!strcmp(err, "+OK")) { sdsfree(err); err = NULL; task->state = ASM_SEND_SYNCSLOTS; serverLog(LL_NOTICE, "Source node replied to SYNCSLOTS CONF command, syncslots can continue..."); } else { task_error_msg = sdscatprintf(sdsempty(), "Error reply to CLUSTER SYNCSLOTS CONF from the source: %s", err); sdsfree(err); goto error; } } if (task->state == ASM_SEND_SYNCSLOTS) { err = asmSendSlotRangesSync(conn, task); if (err) goto write_error; task->state = ASM_SYNCSLOTS_REPLY; 
return; } if (task->state == ASM_SYNCSLOTS_REPLY) { err = receiveSynchronousResponse(conn); /* The source node did not reply */ if (err == NULL) goto no_response_error; /* Check `+RDBCHANNELSYNCSLOTS` reply */ if (!strncmp(err, "+RDBCHANNELSYNCSLOTS", strlen("+RDBCHANNELSYNCSLOTS"))) { sdsfree(err); err = NULL; task->state = ASM_INIT_RDBCHANNEL; serverLog(LL_NOTICE, "Source node replied to SYNCSLOTS SYNC, syncslots can continue..."); } else if (!strncmp(err, "-NOTREADY", strlen("-NOTREADY"))) { /* The source-side cluster is temporarily not ready to start a * migration and replied -NOTREADY. We could fail this attempt and * let the import task start another attempt later but that could * trigger unnecessary cleanup in the cluster implementation. * Instead, we'll retry sending SYNCSLOTS later in asmCron(). */ sdsfree(err); task->state = ASM_SEND_SYNCSLOTS; serverLog(LL_NOTICE, "Source node replied to SYNCSLOTS SYNC with -NOTREADY, will retry later..."); return; } else { task_error_msg = sdscatprintf(sdsempty(), "Error reply to CLUSTER SYNCSLOTS SYNC from the source: %s", err); sdsfree(err); goto error; } } if (task->state == ASM_INIT_RDBCHANNEL) { /* Create RDB channel connection */ char *ip = clusterNodeIp(task->source_node); int port = server.tls_replication ? 
clusterNodeTlsPort(task->source_node) : clusterNodeTcpPort(task->source_node); task->rdb_channel_conn = connCreate(server.el, connTypeOfReplication()); if (connConnect(task->rdb_channel_conn, ip, port, server.bind_source_addr, asmRdbChannelSyncWithSource) == C_ERR) { serverLog(LL_WARNING, "Unable to connect to the source node: %s", connGetLastError(task->rdb_channel_conn)); goto error; } task->rdb_channel_state = ASM_CONNECTING; connSetPrivateData(task->rdb_channel_conn, task); serverLog(LL_NOTICE, "RDB channel connection to source node %.40s established, waiting for AUTH reply...", task->source); /* Main channel waits for the new event */ connSetReadHandler(conn, NULL); return; } return; no_response_error: serverLog(LL_WARNING, "Source node did not respond to command during SYNCSLOTS handshake"); /* Fall through to regular error handling */ error: asmTaskSetFailed(task, "Main channel - Failed to sync with source node: %s", task_error_msg ? task_error_msg : connGetLastError(conn)); sdsfree(task_error_msg); return; write_error: /* Handle sendCommand() errors. */ serverLog(LL_WARNING, "Failed to send command to source node: %s", err); sdsfree(err); goto error; } int asmImportSendACK(asmTask *task) { serverAssert(task->operation == ASM_IMPORT && task->state == ASM_WAIT_STREAM_EOF); serverLog(LL_DEBUG, "Destination node applied offset is %lld", task->dest_offset); char offset[64]; ull2string(offset, sizeof(offset), task->dest_offset); char *err = sendCommand(task->main_channel_conn, "CLUSTER", "SYNCSLOTS", "ACK", asmTaskStateToString(task->state), offset, NULL); if (err) { asmTaskSetFailed(task, "Main channel - Failed to send ACK: %s", err); sdsfree(err); return C_ERR; } return C_OK; } /* Called when the RDB channel begins sending the snapshot. * From this point on, the main channel also starts sending incremental streams. 
*/ void asmSlotSnapshotAndStreamStart(struct asmTask *task) { if (task == NULL || task->state != ASM_WAIT_BGSAVE_START) return; if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_RDB_CHANNEL, task->state))) { shutdown(task->rdb_channel_client->conn->fd, SHUT_RDWR); return; } task->main_channel_client->replstate = SLAVE_STATE_SEND_BULK_AND_STREAM; task->state = ASM_SEND_BULK_AND_STREAM; task->rdb_channel_state = ASM_RDBCHANNEL_TRANSFER; /* From the source node's perspective, the destination node begins to accumulate * the buffer while the RDB channel starts applying the slot snapshot data. */ task->dest_state = ASM_ACCUMULATE_BUF; task->dest_slots_snapshot_time = server.mstime; } /* Called when the RDB channel has succeeded in sending the snapshot. */ void asmSlotSnapshotSucceed(struct asmTask *task) { if (task == NULL || task->state != ASM_SEND_BULK_AND_STREAM) return; /* The destination starts sending ACKs to keep the main channel alive after * receiving the snapshot, so here we need to update the last interaction * time to avoid false timeout. */ task->main_channel_client->lastinteraction = server.unixtime; task->state = ASM_SEND_STREAM; task->rdb_channel_state = ASM_COMPLETED; } /* Called when the RDB channel fails to send the snapshot. */ void asmSlotSnapshotFailed(struct asmTask *task) { if (task == NULL || task->state != ASM_SEND_BULK_AND_STREAM) return; asmTaskSetFailed(task, "RDB channel - Failed to send slots snapshot"); } /* CLUSTER SYNCSLOTS SNAPSHOT-EOF * * This command is sent by the source node to the destination node to indicate * that the slots snapshot has ended. */ void clusterSyncSlotsSnapshotEOF(client *c) { /* This client is RDB channel connection. */ asmTask *task = c->task; if (!task || task->rdb_channel_state != ASM_RDBCHANNEL_TRANSFER || c->conn != task->rdb_channel_conn) { /* Unexpected SNAPSHOT-EOF command */ serverLog(LL_WARNING, "Unexpected CLUSTER SYNCSLOTS SNAPSHOT-EOF command: " "rdb_channel_state=%s", asmTaskStateToString(task ? 
task->rdb_channel_state : ASM_NONE)); freeClientAsync(c); return; } /* RDB channel state: ASM_RDBCHANNEL_TRANSFER */ if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_RDB_CHANNEL, task->rdb_channel_state))) { freeClientAsync(c); /* Simulate a failure */ return; } /* Clear the RDB channel connection */ task->rdb_channel_conn = NULL; task->rdb_channel_state = ASM_COMPLETED; serverLog(LL_NOTICE, "RDB channel snapshot transfer completed for the import task."); /* Free the RDB channel connection. */ c->task = NULL; c->flags &= ~CLIENT_MASTER; freeClientAsync(c); /* Will start streaming the buffer to DB, don't start here since now * we are in the context of executing command, otherwise, redis will * generate a big MULTI-EXEC including all the commands in the buffer. * just update the state here, and do it in beforeSleep(). */ task->state = ASM_READY_TO_STREAM; connSetReadHandler(task->main_channel_conn, NULL); } /* CLUSTER SYNCSLOTS STREAM-EOF * * This command is sent by the source node to the destination node to indicate * that the slot sync stream has ended and the slots can be handed off. */ void clusterSyncSlotsStreamEOF(client *c) { asmTask *task = c->task; if (!task || task->operation != ASM_IMPORT) { serverLog(LL_WARNING, "Unexpected CLUSTER SYNCSLOTS STREAM-EOF command"); freeClientAsync(c); return; } if (task->state == ASM_STREAMING_BUF) { /* We are still streaming the buffer to DB, mark the EOF received, and we * can take over after streaming is EOF. Since we may release the context * in asmImportTakeover, this breaks the context for streaming buffer. 
*/ task->stream_eof_during_streaming = 1; serverLog(LL_NOTICE, "CLUSTER SYNCSLOTS STREAM-EOF received during streaming buffer"); return; } if (task->state != ASM_WAIT_STREAM_EOF) { serverLog(LL_WARNING, "Unexpected CLUSTER SYNCSLOTS STREAM-EOF state: %s", asmTaskStateToString(task->state)); freeClientAsync(c); return; } serverLog(LL_NOTICE, "CLUSTER SYNCSLOTS STREAM-EOF received when waiting for STREAM-EOF"); /* STREAM-EOF received, the source is ready to handoff, takeover now. */ asmImportTakeover(task); } /* Start the import task. */ static void asmStartImportTask(asmTask *task) { if (task->operation != ASM_IMPORT || task->state != ASM_NONE) return; sds slots_str = slotRangeArrayToString(task->slots); /* Sanity check: Clean up any keys that exist in slots not owned by this node. * This handles cases where users previously migrated slots using legacy method * but left behind orphaned keys, or maybe cluster missed cleaning up during * previous operations, which could interfere with the ASM import process. */ asmTrimSlotsIfNotOwned(task->slots); /* Check if there is any trim job in progress for the slot ranges. * We can't start the import task since the trim job will modify the data.*/ int trim_in_progress = asmIsAnyTrimJobOverlaps(task->slots); /* Notify the cluster implementation to prepare for the import task. */ int impl_ret = clusterAsmOnEvent(task->id, ASM_EVENT_IMPORT_PREP, task->slots); /* We do not start the import task if trim is disabled by module. */ int disabled_by_module = server.cluster_module_trim_disablers > 0; static int start_blocked_logged = 0; /* Cannot start import task since pause action is performed. Otherwise, we * will break the promise that no writes are performed during the pause. */ if (isPausedActions(PAUSE_ACTION_CLIENT_ALL) || isPausedActions(PAUSE_ACTION_CLIENT_WRITE) || trim_in_progress || impl_ret != C_OK || disabled_by_module) { const char *reason = disabled_by_module ? "trim is disabled by module" : impl_ret != C_OK ? 
"cluster is not ready" : trim_in_progress ? "trim in progress for some of the slots" : "server paused"; if (start_blocked_logged == 0) { serverLog(LL_WARNING, "Can not start import task %s for slots: %s due to %s", task->id, slots_str, reason); start_blocked_logged = 1; } sdsfree(slots_str); return; } start_blocked_logged = 0; /* Reset the log flag */ /* Detect if the cluster topology is changed. We should cancel the task if * we can not schedule it, and update the source node if needed. */ sds err = NULL; clusterNode *source = validateImportSlotRanges(task->slots, &err, task); if (!source) { asmTaskCancel(task, err); sdsfree(slots_str); sdsfree(err); return; } /* Now I'm the owner of the slot range, cancel the import task. */ if (source == getMyClusterNode()) { asmTaskCancel(task, "slots owned by myself now"); sdsfree(slots_str); return; } /* Change the source node if needed. */ if (source != task->source_node) { task->source_node = source; memcpy(task->source, clusterNodeGetName(source), CLUSTER_NAMELEN); serverLog(LL_NOTICE, "Import task %s source node changed: slots=%s, " "new_source=%.40s", task->id, slots_str, clusterNodeGetName(source)); } sdsfree(slots_str); task->state = ASM_CONNECTING; task->start_time = server.mstime; asmNotifyStateChange(task, ASM_EVENT_IMPORT_STARTED); task->main_channel_conn = connCreate(server.el, connTypeOfReplication()); char *ip = clusterNodeIp(task->source_node); int port = server.tls_replication ? 
clusterNodeTlsPort(task->source_node) : clusterNodeTcpPort(task->source_node); if (connConnect(task->main_channel_conn, ip, port, server.bind_source_addr, asmSyncWithSource) == C_ERR) { asmTaskSetFailed(task, "Main channel - Failed to connect to source node: %s", connGetLastError(task->main_channel_conn)); return; } connSetPrivateData(task->main_channel_conn, task); } void clusterSyncSlotsCommand(client *c) { /* Only internal clients are allowed to execute this command to avoid * potential attack, since some state changes are not well protected, * external clients may damage the slot migration state. */ if (!(c->flags & (CLIENT_INTERNAL | CLIENT_MASTER))) { addReplyError(c, "CLUSTER SYNCSLOTS subcommands are only allowed for internal clients"); c->flags |= CLIENT_CLOSE_AFTER_REPLY; return; } /* On replica, only allow master client to execute CONF subcommand. */ if (!clusterNodeIsMaster(getMyClusterNode())) { if (!(c->flags & CLIENT_MASTER)) { /* Not master client, reject all subcommands and close the connection. */ addReplyError(c, "CLUSTER SYNCSLOTS subcommands are only allowed for master"); c->flags |= CLIENT_CLOSE_AFTER_REPLY; return; } else { /* Only allow CONF subcommand on replica. */ if (strcasecmp(c->argv[2]->ptr, "conf")) return; } } if (!strcasecmp(c->argv[2]->ptr, "sync") && c->argc >= 6) { /* CLUSTER SYNCSLOTS SYNC [ ] */ if (c->argc % 2 == 1) { addReplyErrorArity(c); return; } slotRangeArray *slots = parseSlotRangesOrReply(c, c->argc, 4); if (!slots) return; /* Validate that the slot ranges are valid and that migration can be * initiated for them. */ sds err = NULL; clusterNode *source = validateImportSlotRanges(slots, &err, NULL); if (!source) { addReplyErrorSds(c, err); slotRangeArrayFree(slots); return; } /* Check if the source node is the same as the current node. 
*/ if (source != getMyClusterNode()) { addReplyError(c, "This node is not the owner of the slots"); slotRangeArrayFree(slots); return; } /* Verify the destination node is known and is a master. */ if (c->node_id) { clusterNode *dest = clusterLookupNode(c->node_id, CLUSTER_NAMELEN); if (dest == NULL || !clusterNodeIsMaster(dest)) { addReplyErrorFormat(c, "Destination node %.40s is not a master", c->node_id); slotRangeArrayFree(slots); return; } } sds task_id = c->argv[3]->ptr; /* Notify the cluster implementation to prepare for the migrate task. */ if (clusterAsmOnEvent(task_id, ASM_EVENT_MIGRATE_PREP, slots) != C_OK || asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, ASM_NONE)) { addReplyError(c, "-NOTREADY Cluster is not ready to migrate slots"); slotRangeArrayFree(slots); return; } /* We do not start the migrate task if trim is disabled by module. */ int disabled_by_module = server.cluster_module_trim_disablers > 0; if (disabled_by_module) { addReplyError(c, "Trim is disabled by module"); slotRangeArrayFree(slots); return; } asmTask *task = listLength(asmManager->tasks) == 0 ? NULL : listNodeValue(listFirst(asmManager->tasks)); if (task && !strcmp(task->id, task_id) && task->operation == ASM_MIGRATE && task->state == ASM_FAILED && slotRangeArrayIsEqual(slots, task->slots) && memcmp(task->dest, c->node_id, CLUSTER_NAMELEN) == 0) { /* Reuse the failed task */ asmTaskReset(task); slotRangeArrayFree(task->slots); /* Will be set again later */ task->retry_count++; } else if (task) { if (task->state == ASM_FAILED) { /* We can create a new migrate task only if the current one is * failed, cancel the failed task to create a new one. 
*/ asmTaskCancel(task, "new migration requested"); task = NULL; } else { addReplyError(c, "Another ASM task is already in progress"); slotRangeArrayFree(slots); return; } } /* Create the migrate slots task and add it to the list, * otherwise reuse the existing one */ if (task == NULL) { task = asmTaskCreate(task_id); task->start_time = server.mstime; /* Start immediately */ serverAssert(listLength(asmManager->tasks) == 0); listAddNodeTail(asmManager->tasks, task); } task->slots = slots; task->operation = ASM_MIGRATE; memcpy(task->source, clusterNodeGetName(getMyClusterNode()), CLUSTER_NAMELEN); if (c->node_id) memcpy(task->dest, c->node_id, CLUSTER_NAMELEN); task->main_channel_client = c; c->task = task; /* We mark the main channel client as a replica, so this client is limited * by the client output buffer settings for replicas. The replstate has * no real significance, just to prevent it from going online. */ c->flags |= (CLIENT_SLAVE | CLIENT_ASM_MIGRATING); c->replstate = SLAVE_STATE_WAIT_RDB_CHANNEL; if (server.repl_disable_tcp_nodelay) connDisableTcpNoDelay(c->conn); /* Non-critical if it fails. */ listAddNodeTail(server.slaves, c); createReplicationBacklogIfNeeded(); /* Wait for RDB channel to be ready */ task->state = ASM_WAIT_RDBCHANNEL; sds slots_str = slotRangeArrayToString(slots); serverLog(LL_NOTICE, "Migrate task %s created: src=%.40s, dest=%.40s, slots=%s", task->id, task->source, task->dest, slots_str); sdsfree(slots_str); asmNotifyStateChange(task, ASM_EVENT_MIGRATE_STARTED); /* Keep the client in the main thread to avoid data races between the * connWrite call below and the client's event handler in IO threads. */ if (c->tid != IOTHREAD_MAIN_THREAD_ID) keepClientInMainThread(c); /* addReply*() is not suitable for clients in SLAVE_STATE_WAIT_RDB_CHANNEL state. 
*/ if (connWrite(c->conn, "+RDBCHANNELSYNCSLOTS\r\n", 22) != 22) freeClientAsync(c); } else if (!strcasecmp(c->argv[2]->ptr, "rdbchannel") && c->argc == 4) { /* CLUSTER SYNCSLOTS RDBCHANNEL */ sds task_id = c->argv[3]->ptr; if (sdslen(task_id) != CLUSTER_NAMELEN) { addReplyError(c, "Invalid task id"); return; } if (listLength(asmManager->tasks) == 0) { addReplyError(c, "No slot migration task in progress"); return; } asmTask *task = listNodeValue(listFirst(asmManager->tasks)); if (task->operation != ASM_MIGRATE || task->state != ASM_WAIT_RDBCHANNEL || strcmp(task->id, task_id) != 0) { addReplyError(c, "Another migration task is already in progress"); return; } if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, task->state))) { /* Close the main channel client before rdb channel client connects */ if (task->main_channel_client) freeClient(task->main_channel_client); } /* The main channel client must be present when setting RDB channel client */ if (task->main_channel_client == NULL) { /* Maybe the main channel connection is closed. */ addReplyError(c, "Main channel connection is not established"); return; } /* Mark the client as a slave to generate slots snapshot */ c->flags |= (CLIENT_SLAVE | CLIENT_REPL_RDB_CHANNEL | CLIENT_REPL_RDBONLY | CLIENT_ASM_MIGRATING); c->slave_capa |= SLAVE_CAPA_EOF; c->slave_req |= (SLAVE_REQ_SLOTS_SNAPSHOT | SLAVE_REQ_RDB_CHANNEL); c->replstate = SLAVE_STATE_WAIT_BGSAVE_START; c->repldbfd = -1; if (server.repl_disable_tcp_nodelay) connDisableTcpNoDelay(c->conn); /* Non-critical if it fails. */ listAddNodeTail(server.slaves, c); /* Wait for bgsave to start for slots sync */ task->state = ASM_WAIT_BGSAVE_START; task->rdb_channel_state = ASM_WAIT_BGSAVE_START; task->rdb_channel_client = c; c->task = task; /* Keep the client in the main thread to avoid data races between the * connWrite call in startBgsaveForReplication and the client's event * handler in IO threads. 
*/ if (c->tid != IOTHREAD_MAIN_THREAD_ID) keepClientInMainThread(c); if (!hasActiveChildProcess()) { startBgsaveForReplication(c->slave_capa, c->slave_req); } else { serverLog(LL_NOTICE, "BGSAVE for slots snapshot sync delayed"); } } else if (!strcasecmp(c->argv[2]->ptr, "snapshot-eof") && c->argc == 3) { /* CLUSTER SYNCSLOTS SNAPSHOT-EOF */ clusterSyncSlotsSnapshotEOF(c); } else if (!strcasecmp(c->argv[2]->ptr, "stream-eof") && c->argc == 3) { /* CLUSTER SYNCSLOTS STREAM-EOF */ clusterSyncSlotsStreamEOF(c); } else if (!strcasecmp(c->argv[2]->ptr, "ack") && c->argc == 5) { /* CLUSTER SYNCSLOTS ACK */ long long offset; int dest_state; if (!strcasecmp(c->argv[3]->ptr, asmTaskStateToString(ASM_STREAMING_BUF))) { dest_state = ASM_STREAMING_BUF; } else if (!strcasecmp(c->argv[3]->ptr, asmTaskStateToString(ASM_WAIT_STREAM_EOF))) { dest_state = ASM_WAIT_STREAM_EOF; } else { return; /* Not support now. */ } if ((getLongLongFromObject(c->argv[4], &offset) != C_OK)) return; if (c->task && c->task->operation == ASM_MIGRATE) { /* Update the state and ACKed offset from destination. 
*/ asmTask *task = c->task; task->dest_state = dest_state; if (task->dest_offset > (unsigned long long) offset) { serverLog(LL_WARNING, "CLUSTER SYNCSLOTS ACK received, dest state: %s, " "but offset %lld is less than the current dest offset %lld", asmTaskStateToString(dest_state), offset, task->dest_offset); return; } task->dest_offset = offset; serverLog(LL_DEBUG, "CLUSTER SYNCSLOTS ACK received, dest state: %s, " "updated dest offset to %lld, source offset: %lld", asmTaskStateToString(dest_state), task->dest_offset, task->source_offset); /* Record the time when the destination finishes applying the accumulated buffer */ if (task->dest_state == ASM_WAIT_STREAM_EOF && task->dest_accum_applied_time == 0) task->dest_accum_applied_time = server.mstime; /* Pause write if needed */ if (task->state == ASM_SEND_BULK_AND_STREAM || task->state == ASM_SEND_STREAM) { /* Pause writes on the main channel if the lag is less than the threshold. */ if (task->dest_offset + server.asm_handoff_max_lag_bytes >= task->source_offset) { if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, ASM_HANDOFF_PREP))) return; /* Do not enter handoff prep state for testing buffer drain timeout. */ serverLog(LL_NOTICE, "The applied offset lag %lld is less than the threshold %lld, " "pausing writes for slot handoff", task->source_offset - task->dest_offset, server.asm_handoff_max_lag_bytes); task->state = ASM_HANDOFF_PREP; asmLogTaskEvent(task, ASM_EVENT_HANDOFF_PREP); clusterAsmOnEvent(task->id, ASM_EVENT_HANDOFF_PREP, task->slots); } } } } else if (!strcasecmp(c->argv[2]->ptr, "fail") && c->argc == 4) { /* CLUSTER SYNCSLOTS FAIL */ return; /* This is a no-op, just to handle the command syntax. */ } else if (!strcasecmp(c->argv[2]->ptr, "conf") && c->argc >= 5) { /* CLUSTER SYNCSLOTS CONF