From 5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda Mon Sep 17 00:00:00 2001 From: Mitja Felicijan Date: Wed, 21 Jan 2026 22:40:55 +0100 Subject: Add Redis source code for testing --- examples/redis-unstable/src/cluster_asm.c | 3602 +++++++++++++++++++++++++++++ 1 file changed, 3602 insertions(+) create mode 100644 examples/redis-unstable/src/cluster_asm.c (limited to 'examples/redis-unstable/src/cluster_asm.c') diff --git a/examples/redis-unstable/src/cluster_asm.c b/examples/redis-unstable/src/cluster_asm.c new file mode 100644 index 0000000..a090453 --- /dev/null +++ b/examples/redis-unstable/src/cluster_asm.c @@ -0,0 +1,3602 @@ +/* cluster_asm.c -- Atomic slot migration implementation for cluster + * + * Copyright (c) 2025-Present, Redis Ltd. + * All rights reserved. + * + * Licensed under your choice of (a) the Redis Source Available License 2.0 + * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the + * GNU Affero General Public License v3 (AGPLv3). + */ + +#include "server.h" +#include "cluster.h" +#include "functions.h" +#include "cluster_asm.h" +#include "cluster_slot_stats.h" + +#define ASM_IMPORT (1 << 1) +#define ASM_MIGRATE (1 << 2) + +#define ASM_DEBUG_TRIM_DEFAULT 0 +#define ASM_DEBUG_TRIM_NONE 1 +#define ASM_DEBUG_TRIM_BG 2 +#define ASM_DEBUG_TRIM_ACTIVE 3 + +#define ASM_AOF_MIN_ITEMS_PER_KEY 512 /* Minimum number of items per key to use AOF format encoding */ + +typedef struct asmTask { + sds id; /* Task ID */ + int operation; /* Either ASM_IMPORT or ASM_MIGRATE */ + slotRangeArray *slots; /* List of slot ranges for this migration task */ + int state; /* Current state of the task */ + int dest_state; /* Destination node's main state (approximate) */ + char source[CLUSTER_NAMELEN]; /* Source node name */ + char dest[CLUSTER_NAMELEN]; /* Destination node name */ + clusterNode *source_node; /* Source node */ + connection *main_channel_conn; /* Main channel connection */ + connection *rdb_channel_conn; /* RDB channel connection */ + int rdb_channel_state; /* State of the RDB channel */ + unsigned long long dest_offset; /* Destination offset */ + unsigned long long source_offset; /* Source offset */ + int cross_slot_during_propagating; /* If cross-slot commands are encountered during propagating */ + int stream_eof_during_streaming; /* If STREAM-EOF is received during streaming buffer */ + replDataBuf sync_buffer; /* Buffer for the stream */ + client *main_channel_client; /* Client for the main channel on the source side */ + client *rdb_channel_client; /* Client for the RDB channel on the source side */ + long long retry_count; /* Number of retries for this task */ + mstime_t create_time; /* Task creation time */ + mstime_t start_time; /* Task start time */ + mstime_t end_time; /* Task end time */ + mstime_t paused_time; /* The time when the slot writes were paused */ + mstime_t dest_slots_snapshot_time; /* The time when the destination starts applying the slot snapshot */ + mstime_t dest_accum_applied_time; /* The time when the destination finishes applying the accumulated buffer */ + sds error; /* Error message for this task */ + redisOpArray *pre_snapshot_module_cmds; /* Module commands to be propagated at the beginning of slot migration */ +} asmTask; + +struct asmManager { + list *tasks; /* List of asmTask to be processed */ + list *archived_tasks; /* List of archived asmTask */ + list *pending_trim_jobs; /* List of pending trim jobs (due to write pause) */ + list *active_trim_jobs; /* List of active trim jobs */ + slotRangeArrayIter *active_trim_it; /* Iterator of the current active trim job */ + size_t sync_buffer_peak; /* Peak size of sync buffer */ + asmTask *master_task; /* The task that is currently active on the master */ + + /* Fail point injection for debugging */ + int debug_fail_channel; /* Channel where the task will fail */ + int debug_fail_state; /* State where the task will fail */ + int debug_trim_method; /* Method to trim the buffer */ + int debug_active_trim_delay; /* Sleep before trimming each key */ + + /* Active trim stats */ + unsigned long long active_trim_started; /* Number of times active trim was started */ + unsigned long long active_trim_completed; /* Number of times active trim was completed */ + unsigned long long active_trim_cancelled; /* Number of times active trim was cancelled */ + unsigned long long active_trim_current_job_keys; /* Total number of keys to trim in the current job */ + unsigned long long active_trim_current_job_trimmed; /* Number of keys trimmed in the current job */ +}; + +enum asmState { + /* Common state */ + ASM_NONE = 0, + ASM_CONNECTING, + ASM_AUTH_REPLY, + ASM_CANCELED, + ASM_FAILED, + ASM_COMPLETED, + + /* Import state */ + ASM_SEND_HANDSHAKE, + ASM_HANDSHAKE_REPLY, + ASM_SEND_SYNCSLOTS, + ASM_SYNCSLOTS_REPLY, + ASM_INIT_RDBCHANNEL, + ASM_ACCUMULATE_BUF, + ASM_READY_TO_STREAM, + ASM_STREAMING_BUF, + ASM_WAIT_STREAM_EOF, + ASM_TAKEOVER, + + /* Migrate state */ + ASM_WAIT_RDBCHANNEL, + ASM_WAIT_BGSAVE_START, + ASM_SEND_BULK_AND_STREAM, + ASM_SEND_STREAM, + ASM_HANDOFF_PREP, + ASM_HANDOFF, + ASM_STREAM_EOF, + + /* RDB channel state */ + ASM_RDBCHANNEL_REQUEST, + ASM_RDBCHANNEL_REPLY, + ASM_RDBCHANNEL_TRANSFER, +}; + +enum asmChannel { + ASM_IMPORT_MAIN_CHANNEL = 1, /* Main channel for the import task */ + ASM_IMPORT_RDB_CHANNEL, /* RDB channel for the import task */ + ASM_MIGRATE_MAIN_CHANNEL, /* Main channel for the migrate task */ + ASM_MIGRATE_RDB_CHANNEL /* RDB channel for the migrate task */ +}; + +/* Global ASM manager */ +struct asmManager *asmManager = NULL; + +/* replication.c */ +char *sendCommand(connection *conn, ...); +char *sendCommandArgv(connection *conn, int argc, char **argv, size_t *argv_lens); +char *receiveSynchronousResponse(connection *conn); +ConnectionType *connTypeOfReplication(void); +int startBgsaveForReplication(int mincapa, int req); +void createReplicationBacklogIfNeeded(void); +/* cluster.c */ +void createDumpPayload(rio *payload, robj *o, robj *key, int dbid, int skip_checksum); +/* cluster_asm.c */ +static void asmStartImportTask(asmTask *task); +static void asmTaskCancel(asmTask *task, const char *reason); +static void asmSyncBufferReadFromConn(connection *conn); +static void propagateTrimSlots(slotRangeArray *slots); +void asmTrimJobSchedule(slotRangeArray *slots); +void asmTrimJobProcessPending(void); +void asmCancelPendingTrimJobs(void); +void asmTriggerActiveTrim(slotRangeArray *slots); +void asmActiveTrimEnd(void); +int asmIsAnyTrimJobOverlaps(slotRangeArray *slots); +void asmTrimSlotsIfNotOwned(slotRangeArray *slots); +void asmNotifyStateChange(asmTask *task, int event); + +void asmInit(void) { + asmManager = zcalloc(sizeof(*asmManager)); + asmManager->tasks = listCreate(); + asmManager->archived_tasks = listCreate(); + asmManager->pending_trim_jobs = listCreate(); + asmManager->sync_buffer_peak = 0; + asmManager->master_task = NULL; + asmManager->debug_fail_channel = -1; + asmManager->debug_fail_state = -1; + asmManager->debug_trim_method = ASM_DEBUG_TRIM_DEFAULT; + asmManager->debug_active_trim_delay = 0; + asmManager->active_trim_jobs = listCreate(); + asmManager->active_trim_started = 0; + asmManager->active_trim_completed = 0; + asmManager->active_trim_cancelled = 0; + listSetFreeMethod(asmManager->active_trim_jobs, slotRangeArrayFreeGeneric); +} + +char *asmTaskStateToString(int state) { + switch (state) { + case ASM_NONE: return "none"; + case ASM_CONNECTING: return "connecting"; + case ASM_AUTH_REPLY: return "auth-reply"; + case ASM_CANCELED: return "canceled"; + case ASM_FAILED: return "failed"; + case ASM_COMPLETED: return "completed"; + + /* Import state */ + case ASM_SEND_HANDSHAKE: return "send-handshake"; + case ASM_HANDSHAKE_REPLY: return "handshake-reply"; + case ASM_SEND_SYNCSLOTS: return "send-syncslots"; + case ASM_SYNCSLOTS_REPLY: return "syncslots-reply"; + case ASM_INIT_RDBCHANNEL: return "init-rdbchannel"; + case ASM_ACCUMULATE_BUF: return "accumulate-buffer"; + case ASM_READY_TO_STREAM: return "ready-to-stream"; + case ASM_STREAMING_BUF: return "streaming-buffer"; + case ASM_WAIT_STREAM_EOF: return "wait-stream-eof"; + case ASM_TAKEOVER: return "takeover"; + + /* Migrate state */ + case ASM_WAIT_RDBCHANNEL: return "wait-rdbchannel"; + case ASM_WAIT_BGSAVE_START: return "wait-bgsave-start"; + case ASM_SEND_BULK_AND_STREAM: return "send-bulk-and-stream"; + case ASM_SEND_STREAM: return "send-stream"; + case ASM_HANDOFF_PREP: return "handoff-prep"; + case ASM_HANDOFF: return "handoff"; + case ASM_STREAM_EOF: return "stream-eof"; + + /* RDB channel state */ + case ASM_RDBCHANNEL_REQUEST: return "rdbchannel-request"; + case ASM_RDBCHANNEL_REPLY: return "rdbchannel-reply"; + case ASM_RDBCHANNEL_TRANSFER: return "rdbchannel-transfer"; + + default: return "unknown"; + } + serverAssert(0); /* Unreachable */ +} + +const char *asmChannelToString(int channel) { + switch (channel) { + case ASM_IMPORT_MAIN_CHANNEL: return "import-main-channel"; + case ASM_IMPORT_RDB_CHANNEL: return "import-rdb-channel"; + case ASM_MIGRATE_MAIN_CHANNEL: return "migrate-main-channel"; + case ASM_MIGRATE_RDB_CHANNEL: return "migrate-rdb-channel"; + default: return "unknown"; + } +} + +int asmDebugSetFailPoint(char *channel, char *state) { + if (!asmManager) { + serverLog(LL_WARNING, "ASM manager is not initialized"); + return C_ERR; + } + asmManager->debug_fail_channel = -1; + asmManager->debug_fail_state = -1; + if (!channel && !state) return C_ERR; + if (sdslen(channel) == 0 && sdslen(state) == 0) { + serverLog(LL_WARNING, "ASM fail point is cleared"); + return C_OK; + } + + for (int i = ASM_IMPORT_MAIN_CHANNEL; i <= ASM_MIGRATE_RDB_CHANNEL; i++) { + if (!strcasecmp(channel, asmChannelToString(i))) { + asmManager->debug_fail_channel = i; + break; + } + } + if (asmManager->debug_fail_channel == -1) return C_ERR; + + for (int i = ASM_NONE; i <= ASM_RDBCHANNEL_TRANSFER; i++) { + if (!strcasecmp(state, asmTaskStateToString(i))) { + asmManager->debug_fail_state = i; + break; + } + } + if (asmManager->debug_fail_state == -1) return C_ERR; + + serverLog(LL_NOTICE, "ASM fail point set: channel=%s, state=%s", channel, state); + return C_OK; +} + +int asmDebugSetTrimMethod(const char *method, int active_trim_delay) { + if (!asmManager) { + serverLog(LL_WARNING, "ASM manager is not initialized"); + return C_ERR; + } + int prev = asmManager->debug_trim_method; + if (!strcasecmp(method, "default")) asmManager->debug_trim_method = ASM_DEBUG_TRIM_DEFAULT; + else if (!strcasecmp(method, "none")) asmManager->debug_trim_method = ASM_DEBUG_TRIM_NONE; + else if (!strcasecmp(method, "bg")) asmManager->debug_trim_method = ASM_DEBUG_TRIM_BG; + else if (!strcasecmp(method, "active")) asmManager->debug_trim_method = ASM_DEBUG_TRIM_ACTIVE; + else return C_ERR; + + /* If we are switching from none to default, delete all the keys in the + * slots we don't own */ + if (prev == ASM_DEBUG_TRIM_NONE && asmManager->debug_trim_method != ASM_DEBUG_TRIM_NONE) { + for (int i = 0; i < CLUSTER_SLOTS; i++) + if (!clusterIsMySlot(i)) + clusterDelKeysInSlot(i, 0); + } + asmManager->debug_active_trim_delay = active_trim_delay; + serverLog(LL_NOTICE, "ASM trim method was set=%s, active_trim_delay=%d", method, active_trim_delay); + return C_OK; +} + +int asmDebugIsFailPointActive(int channel, int state) { + if (!asmManager) return 0; /* ASM manager not initialized */ + if (asmManager->debug_fail_channel == channel && asmManager->debug_fail_state == state) { + serverLog(LL_NOTICE, "ASM fail point active: channel=%s, state=%s", + asmChannelToString(channel), asmTaskStateToString(state)); + return 1; + } + return 0; +} + +sds asmCatInfoString(sds info) { + int active_tasks = 0; + + listIter li; + listNode *ln; + listRewind(asmManager->tasks, &li); + while ((ln = listNext(&li)) != NULL) { + asmTask *task = listNodeValue(ln); + if (task->operation == ASM_IMPORT || + (task->operation == ASM_MIGRATE && task->state != ASM_FAILED)) + { + active_tasks++; + } + } + + return sdscatprintf(info ? info : sdsempty(), + "cluster_slot_migration_active_tasks:%d\r\n" + "cluster_slot_migration_active_trim_running:%lu\r\n" + "cluster_slot_migration_active_trim_current_job_keys:%llu\r\n" + "cluster_slot_migration_active_trim_current_job_trimmed:%llu\r\n" + "cluster_slot_migration_stats_active_trim_started:%llu\r\n" + "cluster_slot_migration_stats_active_trim_completed:%llu\r\n" + "cluster_slot_migration_stats_active_trim_cancelled:%llu\r\n", + active_tasks, + listLength(asmManager->active_trim_jobs), + asmManager->active_trim_current_job_keys, + asmManager->active_trim_current_job_trimmed, + asmManager->active_trim_started, + asmManager->active_trim_completed, + asmManager->active_trim_cancelled); +} + +void asmTaskReset(asmTask *task) { + task->state = ASM_NONE; + task->dest_state = ASM_NONE; + task->rdb_channel_state = ASM_NONE; + task->main_channel_conn = NULL; + task->rdb_channel_conn = NULL; + task->dest_offset = 0; + task->source_offset = 0; + task->stream_eof_during_streaming = 0; + task->cross_slot_during_propagating = 0; + replDataBufInit(&task->sync_buffer); + task->main_channel_client = NULL; + task->rdb_channel_client = NULL; + task->paused_time = 0; + task->dest_slots_snapshot_time = 0; + task->dest_accum_applied_time = 0; + task->pre_snapshot_module_cmds = NULL; +} + +asmTask *asmTaskCreate(const char *task_id) { + asmTask *task = zcalloc(sizeof(*task)); + task->error = sdsempty(); + asmTaskReset(task); + task->slots = NULL; + task->source_node = NULL; + task->retry_count = 0; + task->create_time = server.mstime; + task->start_time = -1; + task->end_time = -1; + if (task_id) { + task->id = sdsnew(task_id); + } else { + task->id = sdsnewlen(NULL, CLUSTER_NAMELEN); + getRandomHexChars(task->id, CLUSTER_NAMELEN); + } + + return task; +} + +void asmTaskFree(asmTask *task) { + replDataBufClear(&task->sync_buffer); + sdsfree(task->id); + slotRangeArrayFree(task->slots); + sdsfree(task->error); + zfree(task); +} + +/* Convert the task state to the corresponding event. */ +int asmTaskStateToEvent(asmTask *task) { + if (task->operation == ASM_IMPORT) { + if (task->state == ASM_COMPLETED) return ASM_EVENT_IMPORT_COMPLETED; + else if (task->state == ASM_FAILED) return ASM_EVENT_IMPORT_FAILED; + else return ASM_EVENT_IMPORT_STARTED; + } else { + if (task->state == ASM_COMPLETED) return ASM_EVENT_MIGRATE_COMPLETED; + else if (task->state == ASM_FAILED) return ASM_EVENT_MIGRATE_FAILED; + else return ASM_EVENT_MIGRATE_STARTED; + } +} + +/* Serialize ASM task information into a string for transmission to replicas. + * Format: "task_id:source_node:dest_node:operation:state:slot_ranges" + * Where slot_ranges is in the format "1000-2000 3000-4000 ..." */ +sds asmTaskSerialize(asmTask *task) { + sds serialized = sdsempty(); + + /* Add task ID */ + serialized = sdscatprintf(serialized, "%s:", task->id); + + /* Add source node ID (40 chars) */ + serialized = sdscatlen(serialized, task->source, CLUSTER_NAMELEN); + serialized = sdscat(serialized, ":"); + + /* Add destination node ID (40 chars) */ + serialized = sdscatlen(serialized, task->dest, CLUSTER_NAMELEN); + serialized = sdscat(serialized, ":"); + + /* Add operation type */ + serialized = sdscatprintf(serialized, "%s:", task->operation == ASM_IMPORT ? + "import" : "migrate"); + + /* Add current state */ + serialized = sdscatprintf(serialized, "%s:", asmTaskStateToString(task->state)); + + /* Add slot ranges sds */ + sds slots_str = slotRangeArrayToString(task->slots); + serialized = sdscatprintf(serialized, "%s", slots_str); + sdsfree(slots_str); + + return serialized; +} + +/* Deserialize ASM task information from a string and create a complete asmTask. + * Format: "task_id:source_node:dest_node:operation:state:slot_ranges" + * Returns a new asmTask on success, NULL on failure. */ +asmTask *asmTaskDeserialize(sds data) { + int count, idx = 0; + asmTask *task = NULL; + if (!data || sdslen(data) == 0) return NULL; + + sds *parts = sdssplitlen(data, sdslen(data), ":", 1, &count); + if (count < 6) goto err; + + /* Parse task ID */ + if (sdslen(parts[idx]) == 0) goto err; + task = asmTaskCreate(parts[idx]); + if (!task) goto err; + idx++; + + /* Parse source node ID */ + if (sdslen(parts[idx]) != CLUSTER_NAMELEN) goto err; + memcpy(task->source, parts[idx], CLUSTER_NAMELEN); + idx++; + + /* Parse destination node ID */ + if (sdslen(parts[idx]) != CLUSTER_NAMELEN) goto err; + memcpy(task->dest, parts[idx], CLUSTER_NAMELEN); + idx++; + + /* Parse operation type */ + if (!strcasecmp(parts[idx], "import")) { + task->operation = ASM_IMPORT; + } else if (!strcasecmp(parts[idx], "migrate")) { + task->operation = ASM_MIGRATE; + } else { + goto err; + } + idx++; + + /* Parse state */ + task->state = ASM_NONE; /* Default state */ + for (int state = ASM_NONE; state <= ASM_RDBCHANNEL_TRANSFER; state++) { + if (!strcasecmp(parts[idx], asmTaskStateToString(state))) { + task->state = state; + break; + } + } + idx++; + + /* Parse slot ranges */ + task->slots = slotRangeArrayFromString(parts[idx]); + if (!task->slots) goto err; + idx++; + + /* Ignore any extra fields for future compatibility */ + + sdsfreesplitres(parts, count); + return task; + +err: + if (task) asmTaskFree(task); + sdsfreesplitres(parts, count); + return NULL; +} + +/* Notify replicas about ASM task information to maintain consistency during + * slot migration. This function sends a CLUSTER SYNCSLOTS CONF ASM-TASK command + * to all connected replicas with the serialized task information. */ +void asmNotifyReplicasStateChange(struct asmTask *task) { + if (!server.cluster_enabled || !clusterNodeIsMaster(getMyClusterNode())) return; + + /* Create command arguments for CLUSTER SYNCSLOTS CONF ASM-TASK */ + robj *argv[5]; + argv[0] = createStringObject("CLUSTER", 7); + argv[1] = createStringObject("SYNCSLOTS", 9); + argv[2] = createStringObject("CONF", 4); + argv[3] = createStringObject("ASM-TASK", 8); + argv[4] = createObject(OBJ_STRING, asmTaskSerialize(task)); + + /* Send the command to all replicas */ + replicationFeedSlaves(server.slaves, -1, argv, 5); + + /* Clean up command objects */ + for (int i = 0; i < 5; i++) { + decrRefCount(argv[i]); + } +} + +/* Dump the active import ASM task information. */ +sds asmDumpActiveImportTask(void) { + if (!server.cluster_enabled) return NULL; + + /* For replica, dump the master active task. */ + if (clusterNodeIsSlave(getMyClusterNode()) && + asmManager->master_task && + asmManager->master_task->state != ASM_FAILED && + asmManager->master_task->state != ASM_COMPLETED) + { + return asmTaskSerialize(asmManager->master_task); + } + + /* For master, dump the first active task. */ + if (!asmManager || listLength(asmManager->tasks) == 0) return NULL; + asmTask *task = listNodeValue(listFirst(asmManager->tasks)); + if (task->state == ASM_NONE || task->state == ASM_FAILED || + task->state == ASM_COMPLETED) return NULL; + + return asmTaskSerialize(task); +} + +size_t asmGetPeakSyncBufferSize(void) { + if (!asmManager) return 0; + /* Compute peak sync buffer usage. The current task's peak may not + * reflect in asmManager->sync_buffer_peak immediately. */ + size_t peak = asmManager->sync_buffer_peak; + asmTask *task = listFirst(asmManager->tasks) ? + listNodeValue(listFirst(asmManager->tasks)) : NULL; + if (task && task->operation == ASM_IMPORT) + peak = max(task->sync_buffer.peak, asmManager->sync_buffer_peak); + + return peak; +} + +size_t asmGetImportInputBufferSize(void) { + if (!asmManager || listLength(asmManager->tasks) == 0) return 0; + + asmTask *task = listNodeValue(listFirst(asmManager->tasks)); + if (task->operation == ASM_IMPORT) + return task->sync_buffer.mem_used; + + return 0; +} + +size_t asmGetMigrateOutputBufferSize(void) { + if (!asmManager || listLength(asmManager->tasks) == 0) return 0; + + asmTask *task = listNodeValue(listFirst(asmManager->tasks)); + if (task->operation == ASM_MIGRATE && task->main_channel_client) + return getClientOutputBufferMemoryUsage(task->main_channel_client); + + return 0; +} + +/* Returns the ASM task with the given ID, or NULL if no such task exists. */ +static asmTask *asmLookupTaskAt(list *tasks, const char *id) { + listIter li; + listNode *ln; + + listRewind(tasks, &li); + while ((ln = listNext(&li)) != NULL) { + asmTask *task = listNodeValue(ln); + if (!strcmp(task->id, id)) return task; + } + return NULL; +} + +/* Returns the ASM task with the given ID, or NULL if no such task exists. */ +asmTask *asmLookupTaskById(const char *id) { + return asmLookupTaskAt(asmManager->tasks, id); +} + +/* Returns the ASM task that is identical to the given slot range array, or NULL + * if no such task exists. */ +asmTask *asmLookupTaskBySlotRangeArray(slotRangeArray *slots) { + listIter li; + listNode *ln; + + listRewind(asmManager->tasks, &li); + while ((ln = listNext(&li)) != NULL) { + asmTask *task = listNodeValue(ln); + if (slotRangeArrayIsEqual(task->slots, slots)) + return task; + } + return NULL; +} + +/* Returns the slot range array for the given task ID */ +slotRangeArray *asmTaskGetSlotRanges(const char *task_id) { + asmTask *task = NULL; + if (!task_id || (task = asmLookupTaskById(task_id)) == NULL) return NULL; + + return task->slots; +} + +/* Returns 1 if the slot range array overlaps with the given slot range. */ +static int slotRangeArrayOverlaps(slotRangeArray *slots, slotRange *req) { + for (int i = 0; i < slots->num_ranges; i++) { + slotRange *sr = &slots->ranges[i]; + if (sr->start <= req->end && sr->end >= req->start) + return 1; + } + return 0; +} + +/* Returns 1 if the two slot range arrays overlap, 0 otherwise. */ +static int slotRangeArraysOverlap(slotRangeArray *slots1, slotRangeArray *slots2) { + for (int i = 0; i < slots1->num_ranges; i++) { + slotRange *sr1 = &slots1->ranges[i]; + if (slotRangeArrayOverlaps(slots2, sr1)) return 1; + } + return 0; +} + +/* Returns the ASM task that overlaps with the given slot range, or NULL if + * no such task exists. */ +static asmTask *lookupAsmTaskBySlotRange(slotRange *req) { + listIter li; + listNode *ln; + + listRewind(asmManager->tasks, &li); + while ((ln = listNext(&li)) != NULL) { + asmTask *task = listNodeValue(ln); + if (slotRangeArrayOverlaps(task->slots, req)) + return task; + } + return NULL; +} + +/* Validates the given slot ranges for a migration task: + * - Ensures the current node is a master. + * - Verifies all slots are in a STABLE state. + * - Confirms all slots belong to a single source node. + * - Confirms no ongoing import task that overlaps with the slot ranges. + * + * Returns the source node if validation succeeds. + * Otherwise, returns NULL and sets 'err' variable. */ +static clusterNode *validateImportSlotRanges(slotRangeArray *slots, sds *err, asmTask *current) { + clusterNode *source = NULL; + + *err = NULL; + + /* Ensure this is a master node */ + if (!clusterNodeIsMaster(getMyClusterNode())) { + *err = sdsnew("slot migration not allowed on replica."); + goto out; + } + + /* Ensure no manual migration is in progress. */ + for (int i = 0; i < CLUSTER_SLOTS; i++) { + if (getImportingSlotSource(i) != NULL || + getMigratingSlotDest(i) != NULL) + { + *err = sdsnew("all slot states must be STABLE to start a slot migration task."); + goto out; + } + } + + for (int i = 0; i < slots->num_ranges; i++) { + slotRange *sr = &slots->ranges[i]; + + /* Ensure no import task overlaps with this slot range. + * Skip check current task that is running for this slot range. */ + asmTask *task = lookupAsmTaskBySlotRange(sr); + if (task && task != current && task->operation == ASM_IMPORT) { + *err = sdscatprintf(sdsempty(), + "overlapping import exists for slot range: %d-%d", + sr->start, sr->end); + goto out; + } + + /* Validate if we can start migration task for this slot range. */ + for (int j = sr->start; j <= sr->end; j++) { + clusterNode *node = getNodeBySlot(j); + if (node == NULL) { + *err = sdscatprintf(sdsempty(), "slot has no owner: %d", j); + goto out; + } + + if (!source) { + source = node; + } else if (source != node) { + *err = sdsnew("slots belong to different source nodes"); + goto out; + } + } + } + +out: + return *err ? NULL : source; +} + +/* Returns 1 if a task with the specified operation is in progress, 0 otherwise. */ +static int asmTaskInProgress(int operation) { + listIter li; + listNode *ln; + + if (!asmManager || listLength(asmManager->tasks) == 0) return 0; + + listRewind(asmManager->tasks, &li); + while ((ln = listNext(&li)) != NULL) { + asmTask *task = listNodeValue(ln); + if (task->operation == operation) return 1; + } + return 0; +} + +/* Returns 1 if a migrate task is in progress, 0 otherwise. */ +int asmMigrateInProgress(void) { + return asmTaskInProgress(ASM_MIGRATE); +} + +/* Returns 1 if an import task is in progress, 0 otherwise. */ +int asmImportInProgress(void) { + return asmTaskInProgress(ASM_IMPORT); +} + +/* Returns 1 if the task is in a state where it can receive replication stream +* for the slot range, 0 otherwise. */ +inline static int asmCanFeedMigrationClient(asmTask *task) { + return task->operation == ASM_MIGRATE && + !task->cross_slot_during_propagating && + (task->state == ASM_SEND_BULK_AND_STREAM || + task->state == ASM_SEND_STREAM || + task->state == ASM_HANDOFF_PREP); +} + +/* Feed the migration client with the replication stream for the slot range. */ +void asmFeedMigrationClient(robj **argv, int argc) { + asmTask *task = NULL; + + if (server.cluster_enabled == 0 || listLength(asmManager->tasks) == 0) + return; + + /* Check if there is a migrate task that can receive replication stream. */ + task = listNodeValue(listFirst(asmManager->tasks)); + if (!asmCanFeedMigrationClient(task)) return; + + /* Ensure all arguments are converted to string encoding if necessary, + * since getSlotFromCommand expects them to be string-encoded. + * Generally the arguments are string-encoded, but we may rewrite + * the command arguments to integer encoding. */ + for (int i = 0; i < argc; i++) { + if (!sdsEncodedObject(argv[i])) { + serverAssert(argv[i]->encoding == OBJ_ENCODING_INT); + robj *old = argv[i]; + argv[i] = createStringObjectFromLongLongWithSds((long)old->ptr); + decrRefCount(old); + } + } + + /* Check if the command belongs to the slot range. */ + struct redisCommand *cmd = lookupCommand(argv, argc); + serverAssert(cmd); + + int slot = getSlotFromCommand(cmd, argv, argc); + + /* If the command does not have keys, skip it now. + * SELECT is not propagated, since we only support a single db in cluster mode. + * MULTI/EXEC is not needed, since transaction semantics are unnecessary + * before the slot handoff. + * FUNCTION subcommands should be executed on all nodes, so here we skip it, + * and even propagating them may cause an error when executing. + * + * NOTICE: if some keyless commands should be propagated to the destination, + * we should identify them here and send. */ + if (slot == INVALID_CLUSTER_SLOT) return; + + /* Generally we reject cross-slot commands before executing, but module may + * replicate this kind of command, so we check again. To guarantee data + * consistency, we cancel the task if we encounter a cross-slot command. */ + if (slot == CLUSTER_CROSSSLOT) { + /* We cannot cancel the task directly here, since it may lead to a recursive + * call: asmTaskCancel() --> moduleFireServerEvent() --> moduleFreeContext() + * --> postExecutionUnitOperations() --> propagateNow(). Even worse, this + * could result in propagating pending commands to the replication stream twice. + * To avoid this, we simply set a flag here, cancel the task in beforeSleep. */ + task->cross_slot_during_propagating = 1; + return; + } + + /* Check if the slot belongs to the task's slot range. */ + slotRange sr = {slot, slot}; + if (!slotRangeArrayOverlaps(task->slots, &sr)) return; + + if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, task->state))) + freeClientAsync(task->main_channel_client); + + /* Feed main channel with the command. */ + client *c = task->main_channel_client; + size_t prev_bytes = getNormalClientPendingReplyBytes(c); + + addReplyArrayLen(c, argc); + for (int i = 0; i < argc; i++) + addReplyBulk(c, argv[i]); + + /* Update the task's source offset to reflect the bytes sent. */ + task->source_offset += (getNormalClientPendingReplyBytes(c) - prev_bytes); +} + +asmTask *asmCreateImportTask(const char *task_id, slotRangeArray *slots, sds *err) { + clusterNode *source; + + *err = NULL; + /* Validate that the slot ranges are valid and that migration can be + * initiated for them. */ + source = validateImportSlotRanges(slots, err, NULL); + if (!source) + goto err; + + if (source == getMyClusterNode()) { + *err = sdsnew("this node is already the owner of the slot range"); + goto err; + } + + /* Only support a single task at a time now. */ + if (listLength(asmManager->tasks) != 0) { + asmTask *current = listNodeValue(listFirst(asmManager->tasks)); + if (current->state == ASM_FAILED) { + /* We can create a new import task only if the current one is failed, + * cancel the failed task to create a new one. */ + asmTaskCancel(current, "new import requested"); + } else { + *err = sdsnew("another ASM task is already in progress"); + goto err; + } + } + /* There should be no task in progress. */ + serverAssert(listLength(asmManager->tasks) == 0); + + /* Create a slot migration task */ + asmTask *task = asmTaskCreate(task_id); + task->slots = slots; + task->state = ASM_NONE; + task->operation = ASM_IMPORT; + task->source_node = source; + memcpy(task->source, clusterNodeGetName(source), CLUSTER_NAMELEN); + memcpy(task->dest, getMyClusterId(), CLUSTER_NAMELEN); + + listAddNodeTail(asmManager->tasks, task); + sds slots_str = slotRangeArrayToString(slots); + serverLog(LL_NOTICE, "Import task %s created: src=%.40s, dest=%.40s, slots=%s", + task->id, task->source, task->dest, slots_str); + sdsfree(slots_str); + + return task; + +err: + slotRangeArrayFree(slots); + return NULL; +} + +/* CLUSTER MIGRATION IMPORT + * + * Sent by operator to the destination node to start the migration. */ +static void clusterMigrationCommandImport(client *c) { + /* Validate slot range arg count */ + int remaining = c->argc - 3; + if (remaining == 0 || remaining % 2 != 0) { + addReplyErrorArity(c); + return; + } + + slotRangeArray *slots = parseSlotRangesOrReply(c, c->argc, 3); + if (!slots) return; + + sds err = NULL; + asmTask *task = asmCreateImportTask(NULL, slots, &err); + if (!task) { + addReplyErrorSds(c, err); + return; + } + + addReplyBulkCString(c, task->id); +} + +/* CLUSTER MIGRATION CANCEL [ID | ALL] + * - Reply: Number of cancelled tasks + * + * Cancels import tasks that overlap with the specified slot ranges. + * Multiple tasks may be cancelled. */ +static void clusterMigrationCommandCancel(client *c) { + sds task_id = NULL; + int num_cancelled = 0; + + /* Validate slot range arg count */ + if (c->argc != 4 && c->argc != 5) { + addReplyErrorArity(c); + return; + } + + if (!strcasecmp(c->argv[3]->ptr, "id")) { + if (c->argc != 5) { + addReplyErrorArity(c); + return; + } + task_id = c->argv[4]->ptr; + } else if (!strcasecmp(c->argv[3]->ptr, "all")) { + if (c->argc != 4) { + addReplyErrorArity(c); + return; + } + } else { + addReplyError(c, "unknown argument"); + return; + } + + num_cancelled = clusterAsmCancel(task_id, "user request"); + addReplyLongLong(c, num_cancelled); +} + +/* Reply with the status of the task. */ +static void replyTaskStatus(client *c, asmTask *task) { + mstime_t p = 0; + + addReplyMapLen(c, 12); + addReplyBulkCString(c, "id"); + addReplyBulkCString(c, task->id); + addReplyBulkCString(c, "slots"); + addReplyBulkSds(c, slotRangeArrayToString(task->slots)); + addReplyBulkCString(c, "source"); + addReplyBulkCBuffer(c, task->source, CLUSTER_NAMELEN); + addReplyBulkCString(c, "dest"); + addReplyBulkCBuffer(c, task->dest, CLUSTER_NAMELEN); + addReplyBulkCString(c, "operation"); + addReplyBulkCString(c, task->operation == ASM_IMPORT ? "import" : "migrate"); + addReplyBulkCString(c, "state"); + addReplyBulkCString(c, asmTaskStateToString(task->state)); + addReplyBulkCString(c, "last_error"); + addReplyBulkCBuffer(c, task->error, sdslen(task->error)); + addReplyBulkCString(c, "retries"); + addReplyLongLong(c, task->retry_count); + addReplyBulkCString(c, "create_time"); + addReplyLongLong(c, task->create_time); + addReplyBulkCString(c, "start_time"); + addReplyLongLong(c, task->start_time); + addReplyBulkCString(c, "end_time"); + addReplyLongLong(c, task->end_time); + + if (task->operation == ASM_MIGRATE && task->state == ASM_COMPLETED) + p = task->end_time - task->paused_time; + addReplyBulkCString(c, "write_pause_ms"); + addReplyLongLong(c, p); +} + +/* CLUSTER MIGRATION STATUS [ID | ALL] + * - Reply: Array of atomic slot migration tasks */ +static void clusterMigrationCommandStatus(client *c) { + listIter li; + listNode *ln; + + if (c->argc != 4 && c->argc != 5) { + addReplyErrorArity(c); + return; + } + + if (!strcasecmp(c->argv[3]->ptr, "id")) { + if (c->argc != 5) { + addReplyErrorArity(c); + return; + } + sds id = c->argv[4]->ptr; + asmTask *task = asmLookupTaskAt(asmManager->tasks, id); + if (!task) task = asmLookupTaskAt(asmManager->archived_tasks, id); + if (!task) { + addReplyArrayLen(c, 0); + return; + } + + addReplyArrayLen(c, 1); + replyTaskStatus(c, task); + } else if (!strcasecmp(c->argv[3]->ptr, "all")) { + if (c->argc != 4) { + addReplyErrorArity(c); + return; + } + addReplyArrayLen(c, listLength(asmManager->tasks) + + listLength(asmManager->archived_tasks)); + listRewind(asmManager->tasks, &li); + while ((ln = listNext(&li)) != NULL) + replyTaskStatus(c, listNodeValue(ln)); + + listRewind(asmManager->archived_tasks, &li); + while ((ln = listNext(&li)) != NULL) + replyTaskStatus(c, listNodeValue(ln)); + } else { + addReplyError(c, "unknown argument"); + return; + } +} + +/* CLUSTER MIGRATION + * | + * STATUS [ID | ALL] | + * CANCEL [ID | ALL]> +*/ +void clusterMigrationCommand(client *c) { + if (c->argc < 4) { + addReplyErrorArity(c); + return; + } + + if (strcasecmp(c->argv[2]->ptr, "import") == 0) { + clusterMigrationCommandImport(c); + } else if (strcasecmp(c->argv[2]->ptr, "status") == 0) { + clusterMigrationCommandStatus(c); + } else if (strcasecmp(c->argv[2]->ptr, "cancel") == 0) { + clusterMigrationCommandCancel(c); + } else { + addReplyError(c, "unknown argument"); + } +} + +/* Return the number of keys in the specified slot ranges. */ +unsigned long long asmCountKeysInSlots(slotRangeArray *slots) { + if (!slots) return 0; + + unsigned long long key_count = 0; + for (int i = 0; i < slots->num_ranges; i++) { + for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) { + key_count += kvstoreDictSize(server.db[0].keys, j); + } + } + return key_count; +} + +/* Log a human-readable message for ASM task lifecycle events. */ +void asmLogTaskEvent(asmTask *task, int event) { + sds str = slotRangeArrayToString(task->slots); + + switch (event) { + case ASM_EVENT_IMPORT_STARTED: + serverLog(LL_NOTICE, "Import task %s started for slots: %s", task->id, str); + break; + case ASM_EVENT_IMPORT_FAILED: + serverLog(LL_NOTICE, "Import task %s failed for slots: %s", task->id, str); + break; + case ASM_EVENT_TAKEOVER: + serverLog(LL_NOTICE, "Import task %s is ready to takeover slots: %s", task->id, str); + break; + case ASM_EVENT_IMPORT_COMPLETED: + serverLog(LL_NOTICE, "Import task %s completed for slots: %s (imported %llu keys)", + task->id, str, asmCountKeysInSlots(task->slots)); + break; + case ASM_EVENT_MIGRATE_STARTED: + serverLog(LL_NOTICE, "Migrate task %s started for slots: %s (keys at start: %llu)", + task->id, str, asmCountKeysInSlots(task->slots)); + break; + case ASM_EVENT_MIGRATE_FAILED: + serverLog(LL_NOTICE, "Migrate task %s failed for slots: %s", task->id, str); + break; + case ASM_EVENT_HANDOFF_PREP: + serverLog(LL_NOTICE, "Migrate task %s preparing to handoff for slots: %s", task->id, str); + break; + case ASM_EVENT_MIGRATE_COMPLETED: + serverLog(LL_NOTICE, "Migrate task %s completed for slots: %s (migrated %llu keys)", + task->id, str, asmCountKeysInSlots(task->slots)); + break; + default: + break; + } + + sdsfree(str); +} + +/* Notify the state change to the module and the cluster implementation. */ +void asmNotifyStateChange(asmTask *task, int event) { + RedisModuleClusterSlotMigrationInfo info = { + .version = REDISMODULE_CLUSTER_SLOT_MIGRATION_INFO_VERSION, + .task_id = task->id, + .slots = (RedisModuleSlotRangeArray *) task->slots + }; + memcpy(info.source_node_id, task->source, CLUSTER_NAMELEN); + memcpy(info.destination_node_id, task->dest, CLUSTER_NAMELEN); + + int module_event = -1; + if (event == ASM_EVENT_IMPORT_STARTED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_STARTED; + else if (event == ASM_EVENT_IMPORT_COMPLETED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_COMPLETED; + else if (event == ASM_EVENT_IMPORT_FAILED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_FAILED; + else if (event == ASM_EVENT_MIGRATE_STARTED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_STARTED; + else if (event == ASM_EVENT_MIGRATE_COMPLETED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_COMPLETED; + else if (event == ASM_EVENT_MIGRATE_FAILED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_FAILED; + serverAssert(module_event != -1); + + moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION, module_event, &info); + serverLog(LL_DEBUG, "Fire cluster asm module event, task %s: state=%s", + task->id, asmTaskStateToString(task->state)); + + if (clusterNodeIsMaster(getMyClusterNode())) { + /* Notify the cluster impl only if it is a real active import task. */ + if (task != asmManager->master_task) { + asmLogTaskEvent(task, event); + clusterAsmOnEvent(task->id, event, task->slots); + } + asmNotifyReplicasStateChange(task); /* Propagate state change to replicas */ + } +} + +void asmImportSetFailed(asmTask *task) { + serverAssert(task->operation == ASM_IMPORT); + if (task->state == ASM_FAILED) return; + + /* If we are in the RDB channel transfer state, we need to + * close the client that was created for the RDB channel. */ + if (task->rdb_channel_conn && task->rdb_channel_state == ASM_RDBCHANNEL_TRANSFER) { + client *c = connGetPrivateData(task->rdb_channel_conn); + serverAssert(c->task == task); + task->rdb_channel_conn = NULL; + c->task = NULL; + c->flags &= ~CLIENT_MASTER; + freeClientAsync(c); + } + + /* If in the wait stream EOF or streaming buffer state, we need to close the + * client that was created for the main channel. */ + if (task->main_channel_conn && + (task->state == ASM_STREAMING_BUF || task->state == ASM_WAIT_STREAM_EOF)) + { + client *c = connGetPrivateData(task->main_channel_conn); + serverAssert(c->task == task); + task->main_channel_conn = NULL; + c->task = NULL; + c->flags &= ~CLIENT_MASTER; + freeClientAsync(c); + } + + /* Close the connections */ + if (task->rdb_channel_conn) connClose(task->rdb_channel_conn); + if (task->main_channel_conn) connClose(task->main_channel_conn); + task->rdb_channel_conn = NULL; + task->main_channel_conn = NULL; + + /* Clear the replication data buffer */ + asmManager->sync_buffer_peak = max(asmManager->sync_buffer_peak, task->sync_buffer.peak); + replDataBufClear(&task->sync_buffer); + + /* Mark the task as failed and notify the cluster */ + task->state = ASM_FAILED; + asmNotifyStateChange(task, ASM_EVENT_IMPORT_FAILED); + /* This node may become replica, only master can setup new slot trimming jobs. */ + if (clusterNodeIsMaster(getMyClusterNode())) + asmTrimJobSchedule(task->slots); +} + +void asmMigrateSetFailed(asmTask *task) { + serverAssert(task->operation == ASM_MIGRATE); + if (task->state == ASM_FAILED) return; + + /* Close the RDB and main channel clients*/ + if (task->rdb_channel_client) { + task->rdb_channel_client->task = NULL; + freeClientAsync(task->rdb_channel_client); + task->rdb_channel_client = NULL; + } + if (task->main_channel_client) { + task->main_channel_client->task = NULL; + freeClientAsync(task->main_channel_client); + task->main_channel_client = NULL; + } + + /* Actually it is not necessary to clear the sync buffer here, + * to make asmTaskReset work properly after migrate task failed */ + replDataBufClear(&task->sync_buffer); + + /* Mark the task as failed and notify the cluster */ + task->state = ASM_FAILED; + asmNotifyStateChange(task, ASM_EVENT_MIGRATE_FAILED); +} + +void asmTaskSetFailed(asmTask *task, const char *fmt, ...) { + va_list ap; + sds error = sdsempty(); + + /* Set the error message */ + va_start(ap, fmt); + error = sdscatvprintf(error, fmt, ap); + va_end(ap); + error = sdscatprintf(error, " (state: %s, rdb_channel_state: %s)", + asmTaskStateToString(task->state), + asmTaskStateToString(task->rdb_channel_state)); + sdsfree(task->error); + task->error = error; + + /* Log the error */ + sds slots_str = slotRangeArrayToString(task->slots); + serverLog(LL_WARNING, "%s task %s failed: slots=%s, err=%s", + task->operation == ASM_IMPORT ? "Import" : "Migrate", + task->id, slots_str, task->error); + sdsfree(slots_str); + + if (task->operation == ASM_IMPORT) + asmImportSetFailed(task); + else + asmMigrateSetFailed(task); +} + +/* The task is completed or canceled. Update stats and move it to + * the archived list. */ +void asmTaskFinalize(asmTask *task) { + listNode *ln = listFirst(asmManager->tasks); + serverAssert(ln->value == task); + + task->source_node = NULL; /* Should never access it */ + task->end_time = server.mstime; + + if (task->operation == ASM_IMPORT) { + asmManager->sync_buffer_peak = max(asmManager->sync_buffer_peak, + task->sync_buffer.peak); + replDataBufClear(&task->sync_buffer); /* Not used, so save memory */ + } + + /* Move the task to the archived list */ + listUnlinkNode(asmManager->tasks, ln); + listLinkNodeHead(asmManager->archived_tasks, ln); +} + +static void asmTaskCancel(asmTask *task, const char *reason) { + if (task->state == ASM_CANCELED) return; + + asmTaskSetFailed(task, "Cancelled due to %s", reason); + task->state = ASM_CANCELED; + asmTaskFinalize(task); +} + +void asmImportTakeover(asmTask *task) { + serverAssert(task->state == ASM_WAIT_STREAM_EOF || + task->state == ASM_STREAMING_BUF); + + /* Free the main channel connection since it is no longer needed. */ + serverAssert(task->main_channel_conn != NULL); + client *c = connGetPrivateData(task->main_channel_conn); + c->task = NULL; + c->flags &= ~CLIENT_MASTER; + freeClientAsync(c); + task->main_channel_conn = NULL; + + task->state = ASM_TAKEOVER; + asmLogTaskEvent(task, ASM_EVENT_TAKEOVER); + clusterAsmOnEvent(task->id, ASM_EVENT_TAKEOVER, task->slots); +} + +void asmCallbackOnFreeClient(client *c) { + asmTask *task = c->task; + if (!task) return; + + /* If the RDB channel connection is closed, mark the task as failed. */ + if (c->conn && task->rdb_channel_conn == c->conn) { + /* We create the client only when transferring data on the RDB channel */ + serverAssert(task->rdb_channel_state == ASM_RDBCHANNEL_TRANSFER); + task->rdb_channel_conn = NULL; /* Will be freed by freeClient */ + c->flags &= ~CLIENT_MASTER; + asmTaskSetFailed(task, "RDB channel - Connection is closed"); + return; + } + + if (c->conn && task->main_channel_conn == c->conn) { + /* After or in the process of streaming buffer to DB, a client will be + * created based on the main channel connection. */ + serverAssert(task->state == ASM_STREAMING_BUF || + task->state == ASM_WAIT_STREAM_EOF); + task->main_channel_conn = NULL; /* Will be freed by freeClient */ + c->flags &= ~CLIENT_MASTER; + asmTaskSetFailed(task, "Main channel - Connection is closed"); + return; + } + + if (c == task->rdb_channel_client) { + /* TODO: Detect whether the bgsave is completed successfully and + * update the state properly. */ + task->rdb_channel_state = ASM_COMPLETED; + /* We may not have detected whether the child process has exited yet, + * so we can't determine whether the client has completed the slots + * snapshot transfer. If the RDB channel is interrupted unexpectedly, + * the destination side will also close the main channel. + * So here we just reset the RDB channel client of task. */ + task->rdb_channel_client = NULL; + return; + } + + /* If the main channel client is closed, we need to mark the task as failed + * and clean up the RDB channel client if it exists. */ + if (c == task->main_channel_client) { + task->main_channel_client = NULL; + /* The rdb channel client will be cleaned up */ + asmTaskSetFailed(task, "Main and RDB channel clients are disconnected."); + return; + } +} + +/* Sends an AUTH command to the source node using the internal secret. + * Returns an error string if the command fails, or NULL on success. */ +char *asmSendInternalAuth(connection *conn) { + size_t len = 0; + const char *internal_secret = clusterGetSecret(&len); + serverAssert(internal_secret != NULL); + + sds secret = sdsnewlen(internal_secret, len); + char *err = sendCommand(conn, "AUTH", "internal connection", secret, NULL); + sdsfree(secret); + return err; +} + +/* Handles the RDB channel sync with the source node. + * This function is called when the RDB channel is established + * and ready to sync with the source node. */ +void asmRdbChannelSyncWithSource(connection *conn) { + asmTask *task = connGetPrivateData(conn); + char *err = NULL; + sds task_error_msg = NULL; + + /* Check for errors in the socket: after a non blocking connect() we + * may find that the socket is in error state. */ + if (connGetState(conn) != CONN_STATE_CONNECTED) + goto error; + + /* Check if the task is in a fail point state */ + if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_RDB_CHANNEL, task->rdb_channel_state))) { + char buf[1]; + /* Simulate a failure by shutting down the connection. On some operating + * systems (e.g. Linux), the socket's receive buffer is not flushed + * immediately, so we issue a dummy read to drain any pending data and + * surface the error condition. + * using shutdown() instead of connShutdown() because connTLSShutdown() + * will free the connection directly, which is not what we want. */ + shutdown(conn->fd, SHUT_RDWR); + connRead(conn, buf, 1); + } + + if (task->rdb_channel_state == ASM_CONNECTING) { + connSetReadHandler(conn, asmRdbChannelSyncWithSource); + connSetWriteHandler(conn, NULL); + + /* Send AUTH command to source node using internal auth */ + err = asmSendInternalAuth(conn); + if (err) goto write_error; + task->rdb_channel_state = ASM_AUTH_REPLY; + return; + } + + if (task->rdb_channel_state == ASM_AUTH_REPLY) { + err = receiveSynchronousResponse(conn); + /* The source node did not reply */ + if (err == NULL) goto no_response_error; + + /* Check `+OK` reply */ + if (!strcmp(err, "+OK")) { + sdsfree(err); + err = NULL; + task->rdb_channel_state = ASM_RDBCHANNEL_REQUEST; + serverLog(LL_NOTICE, "Source node replied to AUTH command, syncslots rdb channel operation can continue..."); + } else { + task_error_msg = sdscatprintf(sdsempty(), + "Error reply to AUTH from source: %s", err); + sdsfree(err); + goto error; + } + } + + if (task->rdb_channel_state == ASM_RDBCHANNEL_REQUEST) { + err = sendCommand(conn, "CLUSTER", "SYNCSLOTS", "RDBCHANNEL", task->id, NULL); + if (err) goto write_error; + task->rdb_channel_state = ASM_RDBCHANNEL_REPLY; + return; + } + + if (task->rdb_channel_state == ASM_RDBCHANNEL_REPLY) { + err = receiveSynchronousResponse(conn); + /* The source node did not reply */ + if (err == NULL) goto no_response_error; + + /* Ignore ‘\n' sent from the source node to keep the connection alive. */ + if (sdslen(err) == 0) { + serverLog(LL_DEBUG, "Received an empty line in RDBCHANNEL reply, slots snapshot delivery will start later"); + sdsfree(err); + return; + } + + /* Check `+SLOTSSNAPSHOT` reply */ + if (!strncmp(err, "+SLOTSSNAPSHOT", strlen("+SLOTSSNAPSHOT"))) { + sdsfree(err); + err = NULL; + task->state = ASM_ACCUMULATE_BUF; + /* The main channel buffers pending commands. */ + connSetReadHandler(task->main_channel_conn, asmSyncBufferReadFromConn); + + task->rdb_channel_state = ASM_RDBCHANNEL_TRANSFER; + client *c = createClient(conn); + c->flags |= (CLIENT_MASTER | CLIENT_INTERNAL | CLIENT_ASM_IMPORTING); + c->querybuf = sdsempty(); + c->authenticated = 1; + c->user = NULL; + c->task = task; + serverLog(LL_NOTICE, + "Source node replied to SLOTSSNAPSHOT, syncing slots snapshot can continue..."); + } else { + task_error_msg = sdscatprintf(sdsempty(), + "Error reply to CLUSTER SYNCSLOTS RDBCHANNEL from the source: %s", err); + sdsfree(err); + goto error; + } + return; + } + return; + +no_response_error: + task_error_msg = sdsnew("Source node did not respond to command during RDBCHANNELSYNCSLOTS handshake"); + /* Fall through to regular error handling */ + +error: + asmTaskSetFailed(task, "RDB channel - Failed to sync with the source node: %s", + task_error_msg ? task_error_msg : connGetLastError(conn)); + sdsfree(task_error_msg); + return; + +write_error: /* Handle sendCommand() errors. */ + task_error_msg = sdscatprintf(sdsempty(), "Failed to send command to the source node: %s", err); + sdsfree(err); + goto error; +} + +char *asmSendSlotRangesSync(connection *conn, asmTask *task) { + /* Prepare CLUSTER SYNCSLOTS SYNC command */ + serverAssert(task->slots->num_ranges <= CLUSTER_SLOTS); + int argc = task->slots->num_ranges * 2 + 4; + char **args = zcalloc(sizeof(char*) * argc); + size_t *lens = zcalloc(sizeof(size_t) * argc); + + args[0] = "CLUSTER"; + args[1] = "SYNCSLOTS"; + args[2] = "SYNC"; + args[3] = task->id; + lens[0] = strlen("CLUSTER"); + lens[1] = strlen("SYNCSLOTS"); + lens[2] = strlen("SYNC"); + lens[3] = sdslen(task->id); + + int i = 4; + for (int j = 0; j < task->slots->num_ranges; j++) { + slotRange *sr = &task->slots->ranges[j]; + args[i] = sdscatprintf(sdsempty(), "%d", sr->start); + lens[i] = sdslen(args[i]); + args[i+1] = sdscatprintf(sdsempty(), "%d", sr->end); + lens[i+1] = sdslen(args[i+1]); + i += 2; + } + serverAssert(i == argc); + + /* Send command to source node */ + char *err = sendCommandArgv(conn, argc, args, lens); + + /* Free allocated memory */ + for (int j = 4; j < argc; j++) { + sdsfree(args[j]); + } + zfree(args); + zfree(lens); + + return err; +} + +void asmSyncWithSource(connection *conn) { + asmTask *task = connGetPrivateData(conn); + char *err = NULL; + + /* Some task errors are not network issues, we record them explicitly. */ + sds task_error_msg = NULL; + + /* Check for errors in the socket: after a non blocking connect() we + * may find that the socket is in error state. */ + if (connGetState(conn) != CONN_STATE_CONNECTED) + goto error; + + /* Check if the fail point is active for this channel and state */ + if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_MAIN_CHANNEL, task->state))) { + char buf[1]; + shutdown(conn->fd, SHUT_RDWR); + connRead(conn, buf, 1); + } + + if (task->state == ASM_CONNECTING) { + connSetReadHandler(conn, asmSyncWithSource); + connSetWriteHandler(conn, NULL); + /* Send AUTH command to source node using internal auth */ + err = asmSendInternalAuth(conn); + if (err) goto write_error; + task->state = ASM_AUTH_REPLY; + return; + } + + if (task->state == ASM_AUTH_REPLY) { + err = receiveSynchronousResponse(conn); + /* The source node did not reply */ + if (err == NULL) goto no_response_error; + + /* Check `+OK` reply */ + if (!strcmp(err, "+OK")) { + sdsfree(err); + err = NULL; + task->state = ASM_SEND_HANDSHAKE; + serverLog(LL_NOTICE, "Source node replied to AUTH command, syncslots can continue..."); + } else { + task_error_msg = sdscatprintf(sdsempty(), + "Error reply to AUTH from the source: %s", err); + sdsfree(err); + goto error; + } + } + + if (task->state == ASM_SEND_HANDSHAKE) { + sds node_id = sdsnewlen(clusterNodeGetName(getMyClusterNode()), CLUSTER_NAMELEN); + err = sendCommand(conn, "CLUSTER", "SYNCSLOTS", "CONF", "NODE-ID", node_id, NULL); + sdsfree(node_id); + if (err) goto write_error; + task->state = ASM_HANDSHAKE_REPLY; + return; + } + + if (task->state == ASM_HANDSHAKE_REPLY) { + err = receiveSynchronousResponse(conn); + /* The source node did not reply */ + if (err == NULL) goto no_response_error; + + /* Check `+OK` reply */ + if (!strcmp(err, "+OK")) { + sdsfree(err); + err = NULL; + task->state = ASM_SEND_SYNCSLOTS; + serverLog(LL_NOTICE, "Source node replied to SYNCSLOTS CONF command, syncslots can continue..."); + } else { + task_error_msg = sdscatprintf(sdsempty(), + "Error reply to CLUSTER SYNCSLOTS CONF from the source: %s", err); + sdsfree(err); + goto error; + } + } + + if (task->state == ASM_SEND_SYNCSLOTS) { + err = asmSendSlotRangesSync(conn, task); + if (err) goto write_error; + + task->state = ASM_SYNCSLOTS_REPLY; + return; + } + + if (task->state == ASM_SYNCSLOTS_REPLY) { + err = receiveSynchronousResponse(conn); + /* The source node did not reply */ + if (err == NULL) goto no_response_error; + + /* Check `+RDBCHANNELSYNCSLOTS` reply */ + if (!strncmp(err, "+RDBCHANNELSYNCSLOTS", strlen("+RDBCHANNELSYNCSLOTS"))) { + sdsfree(err); + err = NULL; + task->state = ASM_INIT_RDBCHANNEL; + serverLog(LL_NOTICE, + "Source node replied to SYNCSLOTS SYNC, syncslots can continue..."); + } else if (!strncmp(err, "-NOTREADY", strlen("-NOTREADY"))) { + /* The source-side cluster is temporarily not ready to start a + * migration and replied -NOTREADY. We could fail this attempt and + * let the import task start another attempt later but that could + * trigger unnecessary cleanup in the cluster implementation. + * Instead, we'll retry sending SYNCSLOTS later in asmCron(). */ + sdsfree(err); + task->state = ASM_SEND_SYNCSLOTS; + serverLog(LL_NOTICE, + "Source node replied to SYNCSLOTS SYNC with -NOTREADY, will retry later..."); + return; + } else { + task_error_msg = sdscatprintf(sdsempty(), + "Error reply to CLUSTER SYNCSLOTS SYNC from the source: %s", err); + sdsfree(err); + goto error; + } + } + + if (task->state == ASM_INIT_RDBCHANNEL) { + /* Create RDB channel connection */ + char *ip = clusterNodeIp(task->source_node); + int port = server.tls_replication ? clusterNodeTlsPort(task->source_node) : + clusterNodeTcpPort(task->source_node); + task->rdb_channel_conn = connCreate(server.el, connTypeOfReplication()); + if (connConnect(task->rdb_channel_conn, ip, port, + server.bind_source_addr, asmRdbChannelSyncWithSource) == C_ERR) + { + serverLog(LL_WARNING, "Unable to connect to the source node: %s", + connGetLastError(task->rdb_channel_conn)); + goto error; + } + task->rdb_channel_state = ASM_CONNECTING; + connSetPrivateData(task->rdb_channel_conn, task); + serverLog(LL_NOTICE, + "RDB channel connection to source node %.40s established, waiting for AUTH reply...", + task->source); + + /* Main channel waits for the new event */ + connSetReadHandler(conn, NULL); + return; + } + return; + +no_response_error: + serverLog(LL_WARNING, "Source node did not respond to command during SYNCSLOTS handshake"); + /* Fall through to regular error handling */ + +error: + asmTaskSetFailed(task, "Main channel - Failed to sync with source node: %s", + task_error_msg ? task_error_msg : connGetLastError(conn)); + sdsfree(task_error_msg); + return; + +write_error: /* Handle sendCommand() errors. */ + serverLog(LL_WARNING, "Failed to send command to source node: %s", err); + sdsfree(err); + goto error; +} + +int asmImportSendACK(asmTask *task) { + serverAssert(task->operation == ASM_IMPORT && task->state == ASM_WAIT_STREAM_EOF); + serverLog(LL_DEBUG, "Destination node applied offset is %lld", task->dest_offset); + + char offset[64]; + ull2string(offset, sizeof(offset), task->dest_offset); + + char *err = sendCommand(task->main_channel_conn, "CLUSTER", "SYNCSLOTS", "ACK", + asmTaskStateToString(task->state), offset, NULL); + if (err) { + asmTaskSetFailed(task, "Main channel - Failed to send ACK: %s", err); + sdsfree(err); + return C_ERR; + } + return C_OK; +} + +/* Called when the RDB channel begins sending the snapshot. + * From this point on, the main channel also starts sending incremental streams. */ +void asmSlotSnapshotAndStreamStart(struct asmTask *task) { + if (task == NULL || task->state != ASM_WAIT_BGSAVE_START) return; + + if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_RDB_CHANNEL, task->state))) { + shutdown(task->rdb_channel_client->conn->fd, SHUT_RDWR); + return; + } + task->main_channel_client->replstate = SLAVE_STATE_SEND_BULK_AND_STREAM; + + task->state = ASM_SEND_BULK_AND_STREAM; + task->rdb_channel_state = ASM_RDBCHANNEL_TRANSFER; + + /* From the source node's perspective, the destination node begins to accumulate + * the buffer while the RDB channel starts applying the slot snapshot data. */ + task->dest_state = ASM_ACCUMULATE_BUF; + task->dest_slots_snapshot_time = server.mstime; +} + +/* Called when the RDB channel has succeeded in sending the snapshot. */ +void asmSlotSnapshotSucceed(struct asmTask *task) { + if (task == NULL || task->state != ASM_SEND_BULK_AND_STREAM) return; + + /* The destination starts sending ACKs to keep the main channel alive after + * receiving the snapshot, so here we need to update the last interaction + * time to avoid false timeout. */ + task->main_channel_client->lastinteraction = server.unixtime; + + task->state = ASM_SEND_STREAM; + task->rdb_channel_state = ASM_COMPLETED; +} + +/* Called when the RDB channel fails to send the snapshot. */ +void asmSlotSnapshotFailed(struct asmTask *task) { + if (task == NULL || task->state != ASM_SEND_BULK_AND_STREAM) return; + + asmTaskSetFailed(task, "RDB channel - Failed to send slots snapshot"); +} + +/* CLUSTER SYNCSLOTS SNAPSHOT-EOF + * + * This command is sent by the source node to the destination node to indicate + * that the slots snapshot has ended. */ +void clusterSyncSlotsSnapshotEOF(client *c) { + /* This client is RDB channel connection. */ + asmTask *task = c->task; + if (!task || task->rdb_channel_state != ASM_RDBCHANNEL_TRANSFER || + c->conn != task->rdb_channel_conn) + { + /* Unexpected SNAPSHOT-EOF command */ + serverLog(LL_WARNING, "Unexpected CLUSTER SYNCSLOTS SNAPSHOT-EOF command: " + "rdb_channel_state=%s", + asmTaskStateToString(task ? task->rdb_channel_state : ASM_NONE)); + freeClientAsync(c); + return; + } + + /* RDB channel state: ASM_RDBCHANNEL_TRANSFER */ + if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_RDB_CHANNEL, task->rdb_channel_state))) { + freeClientAsync(c); /* Simulate a failure */ + return; + } + + /* Clear the RDB channel connection */ + task->rdb_channel_conn = NULL; + task->rdb_channel_state = ASM_COMPLETED; + serverLog(LL_NOTICE, "RDB channel snapshot transfer completed for the import task."); + + /* Free the RDB channel connection. */ + c->task = NULL; + c->flags &= ~CLIENT_MASTER; + freeClientAsync(c); + + /* Will start streaming the buffer to DB, don't start here since now + * we are in the context of executing command, otherwise, redis will + * generate a big MULTI-EXEC including all the commands in the buffer. + * just update the state here, and do it in beforeSleep(). */ + task->state = ASM_READY_TO_STREAM; + connSetReadHandler(task->main_channel_conn, NULL); +} + +/* CLUSTER SYNCSLOTS STREAM-EOF + * + * This command is sent by the source node to the destination node to indicate + * that the slot sync stream has ended and the slots can be handed off. */ +void clusterSyncSlotsStreamEOF(client *c) { + asmTask *task = c->task; + + if (!task || task->operation != ASM_IMPORT) { + serverLog(LL_WARNING, "Unexpected CLUSTER SYNCSLOTS STREAM-EOF command"); + freeClientAsync(c); + return; + } + + if (task->state == ASM_STREAMING_BUF) { + /* We are still streaming the buffer to DB, mark the EOF received, and we + * can take over after streaming is EOF. Since we may release the context + * in asmImportTakeover, this breaks the context for streaming buffer. */ + task->stream_eof_during_streaming = 1; + serverLog(LL_NOTICE, "CLUSTER SYNCSLOTS STREAM-EOF received during streaming buffer"); + return; + } + + if (task->state != ASM_WAIT_STREAM_EOF) { + serverLog(LL_WARNING, "Unexpected CLUSTER SYNCSLOTS STREAM-EOF state: %s", + asmTaskStateToString(task->state)); + freeClientAsync(c); + return; + } + serverLog(LL_NOTICE, "CLUSTER SYNCSLOTS STREAM-EOF received when waiting for STREAM-EOF"); + + /* STREAM-EOF received, the source is ready to handoff, takeover now. */ + asmImportTakeover(task); +} + +/* Start the import task. */ +static void asmStartImportTask(asmTask *task) { + if (task->operation != ASM_IMPORT || task->state != ASM_NONE) return; + sds slots_str = slotRangeArrayToString(task->slots); + + /* Sanity check: Clean up any keys that exist in slots not owned by this node. + * This handles cases where users previously migrated slots using legacy method + * but left behind orphaned keys, or maybe cluster missed cleaning up during + * previous operations, which could interfere with the ASM import process. */ + asmTrimSlotsIfNotOwned(task->slots); + + /* Check if there is any trim job in progress for the slot ranges. + * We can't start the import task since the trim job will modify the data.*/ + int trim_in_progress = asmIsAnyTrimJobOverlaps(task->slots); + + /* Notify the cluster implementation to prepare for the import task. */ + int impl_ret = clusterAsmOnEvent(task->id, ASM_EVENT_IMPORT_PREP, task->slots); + + /* We do not start the import task if trim is disabled by module. */ + int disabled_by_module = server.cluster_module_trim_disablers > 0; + + static int start_blocked_logged = 0; + /* Cannot start import task since pause action is performed. Otherwise, we + * will break the promise that no writes are performed during the pause. */ + if (isPausedActions(PAUSE_ACTION_CLIENT_ALL) || + isPausedActions(PAUSE_ACTION_CLIENT_WRITE) || + trim_in_progress || + impl_ret != C_OK || + disabled_by_module) + { + const char *reason = disabled_by_module ? "trim is disabled by module" : + impl_ret != C_OK ? "cluster is not ready" : + trim_in_progress ? "trim in progress for some of the slots" : + "server paused"; + if (start_blocked_logged == 0) { + serverLog(LL_WARNING, "Can not start import task %s for slots: %s due to %s", + task->id, slots_str, reason); + start_blocked_logged = 1; + } + sdsfree(slots_str); + return; + } + start_blocked_logged = 0; /* Reset the log flag */ + + /* Detect if the cluster topology is changed. We should cancel the task if + * we can not schedule it, and update the source node if needed. */ + sds err = NULL; + clusterNode *source = validateImportSlotRanges(task->slots, &err, task); + if (!source) { + asmTaskCancel(task, err); + sdsfree(slots_str); + sdsfree(err); + return; + } + /* Now I'm the owner of the slot range, cancel the import task. */ + if (source == getMyClusterNode()) { + asmTaskCancel(task, "slots owned by myself now"); + sdsfree(slots_str); + return; + } + /* Change the source node if needed. */ + if (source != task->source_node) { + task->source_node = source; + memcpy(task->source, clusterNodeGetName(source), CLUSTER_NAMELEN); + serverLog(LL_NOTICE, "Import task %s source node changed: slots=%s, " + "new_source=%.40s", task->id, slots_str, clusterNodeGetName(source)); + } + sdsfree(slots_str); + + task->state = ASM_CONNECTING; + task->start_time = server.mstime; + asmNotifyStateChange(task, ASM_EVENT_IMPORT_STARTED); + + task->main_channel_conn = connCreate(server.el, connTypeOfReplication()); + char *ip = clusterNodeIp(task->source_node); + int port = server.tls_replication ? clusterNodeTlsPort(task->source_node) : + clusterNodeTcpPort(task->source_node); + if (connConnect(task->main_channel_conn, ip, port, server.bind_source_addr, + asmSyncWithSource) == C_ERR) + { + asmTaskSetFailed(task, "Main channel - Failed to connect to source node: %s", + connGetLastError(task->main_channel_conn)); + return; + } + connSetPrivateData(task->main_channel_conn, task); +} + +void clusterSyncSlotsCommand(client *c) { + /* Only internal clients are allowed to execute this command to avoid + * potential attack, since some state changes are not well protected, + * external clients may damage the slot migration state. */ + if (!(c->flags & (CLIENT_INTERNAL | CLIENT_MASTER))) { + addReplyError(c, "CLUSTER SYNCSLOTS subcommands are only allowed for internal clients"); + c->flags |= CLIENT_CLOSE_AFTER_REPLY; + return; + } + + /* On replica, only allow master client to execute CONF subcommand. */ + if (!clusterNodeIsMaster(getMyClusterNode())) { + if (!(c->flags & CLIENT_MASTER)) { + /* Not master client, reject all subcommands and close the connection. */ + addReplyError(c, "CLUSTER SYNCSLOTS subcommands are only allowed for master"); + c->flags |= CLIENT_CLOSE_AFTER_REPLY; + return; + } else { + /* Only allow CONF subcommand on replica. */ + if (strcasecmp(c->argv[2]->ptr, "conf")) return; + } + } + + if (!strcasecmp(c->argv[2]->ptr, "sync") && c->argc >= 6) { + /* CLUSTER SYNCSLOTS SYNC [ ] */ + if (c->argc % 2 == 1) { + addReplyErrorArity(c); + return; + } + + slotRangeArray *slots = parseSlotRangesOrReply(c, c->argc, 4); + if (!slots) return; + + /* Validate that the slot ranges are valid and that migration can be + * initiated for them. */ + sds err = NULL; + clusterNode *source = validateImportSlotRanges(slots, &err, NULL); + if (!source) { + addReplyErrorSds(c, err); + slotRangeArrayFree(slots); + return; + } + + /* Check if the source node is the same as the current node. */ + if (source != getMyClusterNode()) { + addReplyError(c, "This node is not the owner of the slots"); + slotRangeArrayFree(slots); + return; + } + + /* Verify the destination node is known and is a master. */ + if (c->node_id) { + clusterNode *dest = clusterLookupNode(c->node_id, CLUSTER_NAMELEN); + if (dest == NULL || !clusterNodeIsMaster(dest)) { + addReplyErrorFormat(c, "Destination node %.40s is not a master", c->node_id); + slotRangeArrayFree(slots); + return; + } + } + + sds task_id = c->argv[3]->ptr; + /* Notify the cluster implementation to prepare for the migrate task. */ + if (clusterAsmOnEvent(task_id, ASM_EVENT_MIGRATE_PREP, slots) != C_OK || + asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, ASM_NONE)) + { + addReplyError(c, "-NOTREADY Cluster is not ready to migrate slots"); + slotRangeArrayFree(slots); + return; + } + + /* We do not start the migrate task if trim is disabled by module. */ + int disabled_by_module = server.cluster_module_trim_disablers > 0; + if (disabled_by_module) { + addReplyError(c, "Trim is disabled by module"); + slotRangeArrayFree(slots); + return; + } + + asmTask *task = listLength(asmManager->tasks) == 0 ? NULL : + listNodeValue(listFirst(asmManager->tasks)); + if (task && !strcmp(task->id, task_id) && + task->operation == ASM_MIGRATE && task->state == ASM_FAILED && + slotRangeArrayIsEqual(slots, task->slots) && + memcmp(task->dest, c->node_id, CLUSTER_NAMELEN) == 0) + { + /* Reuse the failed task */ + asmTaskReset(task); + slotRangeArrayFree(task->slots); /* Will be set again later */ + task->retry_count++; + } else if (task) { + if (task->state == ASM_FAILED) { + /* We can create a new migrate task only if the current one is + * failed, cancel the failed task to create a new one. */ + asmTaskCancel(task, "new migration requested"); + task = NULL; + } else { + addReplyError(c, "Another ASM task is already in progress"); + slotRangeArrayFree(slots); + return; + } + } + + /* Create the migrate slots task and add it to the list, + * otherwise reuse the existing one */ + if (task == NULL) { + task = asmTaskCreate(task_id); + task->start_time = server.mstime; /* Start immediately */ + serverAssert(listLength(asmManager->tasks) == 0); + listAddNodeTail(asmManager->tasks, task); + } + + task->slots = slots; + task->operation = ASM_MIGRATE; + memcpy(task->source, clusterNodeGetName(getMyClusterNode()), CLUSTER_NAMELEN); + if (c->node_id) memcpy(task->dest, c->node_id, CLUSTER_NAMELEN); + + task->main_channel_client = c; + c->task = task; + + /* We mark the main channel client as a replica, so this client is limited + * by the client output buffer settings for replicas. The replstate has + * no real significance, just to prevent it from going online. */ + c->flags |= (CLIENT_SLAVE | CLIENT_ASM_MIGRATING); + c->replstate = SLAVE_STATE_WAIT_RDB_CHANNEL; + if (server.repl_disable_tcp_nodelay) + connDisableTcpNoDelay(c->conn); /* Non-critical if it fails. */ + listAddNodeTail(server.slaves, c); + createReplicationBacklogIfNeeded(); + + /* Wait for RDB channel to be ready */ + task->state = ASM_WAIT_RDBCHANNEL; + + sds slots_str = slotRangeArrayToString(slots); + serverLog(LL_NOTICE, "Migrate task %s created: src=%.40s, dest=%.40s, slots=%s", + task->id, task->source, task->dest, slots_str); + sdsfree(slots_str); + + asmNotifyStateChange(task, ASM_EVENT_MIGRATE_STARTED); + + /* Keep the client in the main thread to avoid data races between the + * connWrite call below and the client's event handler in IO threads. */ + if (c->tid != IOTHREAD_MAIN_THREAD_ID) keepClientInMainThread(c); + + /* addReply*() is not suitable for clients in SLAVE_STATE_WAIT_RDB_CHANNEL state. */ + if (connWrite(c->conn, "+RDBCHANNELSYNCSLOTS\r\n", 22) != 22) + freeClientAsync(c); + } else if (!strcasecmp(c->argv[2]->ptr, "rdbchannel") && c->argc == 4) { + /* CLUSTER SYNCSLOTS RDBCHANNEL */ + sds task_id = c->argv[3]->ptr; + if (sdslen(task_id) != CLUSTER_NAMELEN) { + addReplyError(c, "Invalid task id"); + return; + } + + if (listLength(asmManager->tasks) == 0) { + addReplyError(c, "No slot migration task in progress"); + return; + } + + asmTask *task = listNodeValue(listFirst(asmManager->tasks)); + if (task->operation != ASM_MIGRATE || task->state != ASM_WAIT_RDBCHANNEL || + strcmp(task->id, task_id) != 0) + { + addReplyError(c, "Another migration task is already in progress"); + return; + } + + if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, task->state))) { + /* Close the main channel client before rdb channel client connects */ + if (task->main_channel_client) + freeClient(task->main_channel_client); + } + + /* The main channel client must be present when setting RDB channel client */ + if (task->main_channel_client == NULL) { + /* Maybe the main channel connection is closed. */ + addReplyError(c, "Main channel connection is not established"); + return; + } + + /* Mark the client as a slave to generate slots snapshot */ + c->flags |= (CLIENT_SLAVE | CLIENT_REPL_RDB_CHANNEL | CLIENT_REPL_RDBONLY | CLIENT_ASM_MIGRATING); + c->slave_capa |= SLAVE_CAPA_EOF; + c->slave_req |= (SLAVE_REQ_SLOTS_SNAPSHOT | SLAVE_REQ_RDB_CHANNEL); + c->replstate = SLAVE_STATE_WAIT_BGSAVE_START; + c->repldbfd = -1; + if (server.repl_disable_tcp_nodelay) + connDisableTcpNoDelay(c->conn); /* Non-critical if it fails. */ + listAddNodeTail(server.slaves, c); + + /* Wait for bgsave to start for slots sync */ + task->state = ASM_WAIT_BGSAVE_START; + task->rdb_channel_state = ASM_WAIT_BGSAVE_START; + task->rdb_channel_client = c; + c->task = task; + + /* Keep the client in the main thread to avoid data races between the + * connWrite call in startBgsaveForReplication and the client's event + * handler in IO threads. */ + if (c->tid != IOTHREAD_MAIN_THREAD_ID) keepClientInMainThread(c); + + if (!hasActiveChildProcess()) { + startBgsaveForReplication(c->slave_capa, c->slave_req); + } else { + serverLog(LL_NOTICE, "BGSAVE for slots snapshot sync delayed"); + } + } else if (!strcasecmp(c->argv[2]->ptr, "snapshot-eof") && c->argc == 3) { + /* CLUSTER SYNCSLOTS SNAPSHOT-EOF */ + clusterSyncSlotsSnapshotEOF(c); + } else if (!strcasecmp(c->argv[2]->ptr, "stream-eof") && c->argc == 3) { + /* CLUSTER SYNCSLOTS STREAM-EOF */ + clusterSyncSlotsStreamEOF(c); + } else if (!strcasecmp(c->argv[2]->ptr, "ack") && c->argc == 5) { + /* CLUSTER SYNCSLOTS ACK */ + long long offset; + int dest_state; + + if (!strcasecmp(c->argv[3]->ptr, asmTaskStateToString(ASM_STREAMING_BUF))) { + dest_state = ASM_STREAMING_BUF; + } else if (!strcasecmp(c->argv[3]->ptr, asmTaskStateToString(ASM_WAIT_STREAM_EOF))) { + dest_state = ASM_WAIT_STREAM_EOF; + } else { + return; /* Not support now. */ + } + + if ((getLongLongFromObject(c->argv[4], &offset) != C_OK)) + return; + + if (c->task && c->task->operation == ASM_MIGRATE) { + /* Update the state and ACKed offset from destination. */ + asmTask *task = c->task; + task->dest_state = dest_state; + if (task->dest_offset > (unsigned long long) offset) { + serverLog(LL_WARNING, "CLUSTER SYNCSLOTS ACK received, dest state: %s, " + "but offset %lld is less than the current dest offset %lld", + asmTaskStateToString(dest_state), offset, task->dest_offset); + return; + } + task->dest_offset = offset; + serverLog(LL_DEBUG, "CLUSTER SYNCSLOTS ACK received, dest state: %s, " + "updated dest offset to %lld, source offset: %lld", + asmTaskStateToString(dest_state), task->dest_offset, task->source_offset); + + /* Record the time when the destination finishes applying the accumulated buffer */ + if (task->dest_state == ASM_WAIT_STREAM_EOF && task->dest_accum_applied_time == 0) + task->dest_accum_applied_time = server.mstime; + + /* Pause write if needed */ + if (task->state == ASM_SEND_BULK_AND_STREAM || task->state == ASM_SEND_STREAM) { + /* Pause writes on the main channel if the lag is less than the threshold. */ + if (task->dest_offset + server.asm_handoff_max_lag_bytes >= task->source_offset) { + if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, ASM_HANDOFF_PREP))) + return; /* Do not enter handoff prep state for testing buffer drain timeout. */ + + serverLog(LL_NOTICE, "The applied offset lag %lld is less than the threshold %lld, " + "pausing writes for slot handoff", + task->source_offset - task->dest_offset, + server.asm_handoff_max_lag_bytes); + task->state = ASM_HANDOFF_PREP; + asmLogTaskEvent(task, ASM_EVENT_HANDOFF_PREP); + clusterAsmOnEvent(task->id, ASM_EVENT_HANDOFF_PREP, task->slots); + } + } + } + } else if (!strcasecmp(c->argv[2]->ptr, "fail") && c->argc == 4) { + /* CLUSTER SYNCSLOTS FAIL */ + return; /* This is a no-op, just to handle the command syntax. */ + } else if (!strcasecmp(c->argv[2]->ptr, "conf") && c->argc >= 5) { + /* CLUSTER SYNCSLOTS CONF