diff options
Diffstat (limited to 'examples/redis-unstable/src/cluster_asm.c')
| -rw-r--r-- | examples/redis-unstable/src/cluster_asm.c | 3602 |
1 files changed, 3602 insertions, 0 deletions
/* cluster_asm.c -- Atomic slot migration implementation for cluster
 *
 * Copyright (c) 2025-Present, Redis Ltd.
 * All rights reserved.
 *
 * Licensed under your choice of (a) the Redis Source Available License 2.0
 * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
 * GNU Affero General Public License v3 (AGPLv3).
 */

#include "server.h"
#include "cluster.h"
#include "functions.h"
#include "cluster_asm.h"
#include "cluster_slot_stats.h"

/* Task operation kinds. Bit flags (bits 1 and 2). */
#define ASM_IMPORT (1 << 1)
#define ASM_MIGRATE (1 << 2)

/* Debug trim methods, selectable via asmDebugSetTrimMethod(). */
#define ASM_DEBUG_TRIM_DEFAULT 0
#define ASM_DEBUG_TRIM_NONE 1
#define ASM_DEBUG_TRIM_BG 2
#define ASM_DEBUG_TRIM_ACTIVE 3

#define ASM_AOF_MIN_ITEMS_PER_KEY 512 /* Minimum number of items per key to use AOF format encoding */

/* A single atomic-slot-migration task. One node side of the migration:
 * an ASM_IMPORT task runs on the destination, an ASM_MIGRATE task runs
 * on the source. */
typedef struct asmTask {
    sds id;                             /* Task ID (40 random hex chars when auto-generated) */
    int operation;                      /* Either ASM_IMPORT or ASM_MIGRATE */
    slotRangeArray *slots;              /* List of slot ranges for this migration task */
    int state;                          /* Current state of the task (enum asmState) */
    int dest_state;                     /* Destination node's main state (approximate) */
    char source[CLUSTER_NAMELEN];       /* Source node name */
    char dest[CLUSTER_NAMELEN];         /* Destination node name */
    clusterNode *source_node;           /* Source node */
    connection *main_channel_conn;      /* Main channel connection */
    connection *rdb_channel_conn;       /* RDB channel connection */
    int rdb_channel_state;              /* State of the RDB channel (enum asmState) */
    unsigned long long dest_offset;     /* Destination offset */
    unsigned long long source_offset;   /* Source offset */
    int cross_slot_during_propagating;  /* If cross-slot commands are encountered during propagating */
    int stream_eof_during_streaming;    /* If STREAM-EOF is received during streaming buffer */
    replDataBuf sync_buffer;            /* Buffer for the stream */
    client *main_channel_client;        /* Client for the main channel on the source side */
    client *rdb_channel_client;         /* Client for the RDB channel on the source side */
    long long retry_count;              /* Number of retries for this task */
    mstime_t create_time;               /* Task creation time */
    mstime_t start_time;                /* Task start time (-1 until started) */
    mstime_t end_time;                  /* Task end time (-1 until ended) */
    mstime_t paused_time;               /* The time when the slot writes were paused */
    mstime_t dest_slots_snapshot_time;  /* The time when the destination starts applying the slot snapshot */
    mstime_t dest_accum_applied_time;   /* The time when the destination finishes applying the accumulated buffer */
    sds error;                          /* Error message for this task */
    redisOpArray *pre_snapshot_module_cmds; /* Module commands to be propagated at the beginning of slot migration */
} asmTask;

/* Singleton state for the ASM subsystem (see global 'asmManager' below). */
struct asmManager {
    list *tasks;                    /* List of asmTask to be processed */
    list *archived_tasks;           /* List of archived asmTask */
    list *pending_trim_jobs;        /* List of pending trim jobs (due to write pause) */
    list *active_trim_jobs;         /* List of active trim jobs */
    slotRangeArrayIter *active_trim_it; /* Iterator of the current active trim job */
    size_t sync_buffer_peak;        /* Peak size of sync buffer */
    asmTask *master_task;           /* The task that is currently active on the master */

    /* Fail point injection for debugging */
    int debug_fail_channel;         /* Channel where the task will fail */
    int debug_fail_state;           /* State where the task will fail */
    int debug_trim_method;          /* Method to trim the buffer */
    int debug_active_trim_delay;    /* Sleep before trimming each key */

    /* Active trim stats */
    unsigned long long active_trim_started;   /* Number of times active trim was started */
    unsigned long long active_trim_completed; /* Number of times active trim was completed */
    unsigned long long active_trim_cancelled; /* Number of times active trim was cancelled */
    unsigned long long active_trim_current_job_keys;    /* Total number of keys to trim in the current job */
    unsigned long long active_trim_current_job_trimmed; /* Number of keys trimmed in the current job */
};

/* Task/channel state machine. The "common" states apply to both sides;
 * import states are used on the destination, migrate states on the source. */
enum asmState {
    /* Common state */
    ASM_NONE = 0,
    ASM_CONNECTING,
    ASM_AUTH_REPLY,
    ASM_CANCELED,
    ASM_FAILED,
    ASM_COMPLETED,

    /* Import state */
    ASM_SEND_HANDSHAKE,
    ASM_HANDSHAKE_REPLY,
    ASM_SEND_SYNCSLOTS,
    ASM_SYNCSLOTS_REPLY,
    ASM_INIT_RDBCHANNEL,
    ASM_ACCUMULATE_BUF,
    ASM_READY_TO_STREAM,
    ASM_STREAMING_BUF,
    ASM_WAIT_STREAM_EOF,
    ASM_TAKEOVER,

    /* Migrate state */
    ASM_WAIT_RDBCHANNEL,
    ASM_WAIT_BGSAVE_START,
    ASM_SEND_BULK_AND_STREAM,
    ASM_SEND_STREAM,
    ASM_HANDOFF_PREP,
    ASM_HANDOFF,
    ASM_STREAM_EOF,

    /* RDB channel state */
    ASM_RDBCHANNEL_REQUEST,
    ASM_RDBCHANNEL_REPLY,
    ASM_RDBCHANNEL_TRANSFER,
};

enum asmChannel {
    ASM_IMPORT_MAIN_CHANNEL = 1, /* Main channel for the import task */
    ASM_IMPORT_RDB_CHANNEL,      /* RDB channel for the import task */
    ASM_MIGRATE_MAIN_CHANNEL,    /* Main channel for the migrate task */
    ASM_MIGRATE_RDB_CHANNEL      /* RDB channel for the migrate task */
};

/* Global ASM manager, allocated by asmInit(). */
struct asmManager *asmManager = NULL;

/* replication.c */
char *sendCommand(connection *conn, ...);
char *sendCommandArgv(connection *conn, int argc, char **argv, size_t *argv_lens);
char *receiveSynchronousResponse(connection *conn);
ConnectionType *connTypeOfReplication(void);
int startBgsaveForReplication(int mincapa, int req);
void createReplicationBacklogIfNeeded(void);
/* cluster.c */
void createDumpPayload(rio *payload, robj *o, robj *key, int dbid, int skip_checksum);
/* cluster_asm.c */
static void asmStartImportTask(asmTask *task);
static void asmTaskCancel(asmTask *task, const char *reason);
static void asmSyncBufferReadFromConn(connection *conn);
static void propagateTrimSlots(slotRangeArray *slots);
void asmTrimJobSchedule(slotRangeArray *slots);
void asmTrimJobProcessPending(void);
void asmCancelPendingTrimJobs(void);
void
asmTriggerActiveTrim(slotRangeArray *slots); +void asmActiveTrimEnd(void); +int asmIsAnyTrimJobOverlaps(slotRangeArray *slots); +void asmTrimSlotsIfNotOwned(slotRangeArray *slots); +void asmNotifyStateChange(asmTask *task, int event); + +void asmInit(void) { + asmManager = zcalloc(sizeof(*asmManager)); + asmManager->tasks = listCreate(); + asmManager->archived_tasks = listCreate(); + asmManager->pending_trim_jobs = listCreate(); + asmManager->sync_buffer_peak = 0; + asmManager->master_task = NULL; + asmManager->debug_fail_channel = -1; + asmManager->debug_fail_state = -1; + asmManager->debug_trim_method = ASM_DEBUG_TRIM_DEFAULT; + asmManager->debug_active_trim_delay = 0; + asmManager->active_trim_jobs = listCreate(); + asmManager->active_trim_started = 0; + asmManager->active_trim_completed = 0; + asmManager->active_trim_cancelled = 0; + listSetFreeMethod(asmManager->active_trim_jobs, slotRangeArrayFreeGeneric); +} + +char *asmTaskStateToString(int state) { + switch (state) { + case ASM_NONE: return "none"; + case ASM_CONNECTING: return "connecting"; + case ASM_AUTH_REPLY: return "auth-reply"; + case ASM_CANCELED: return "canceled"; + case ASM_FAILED: return "failed"; + case ASM_COMPLETED: return "completed"; + + /* Import state */ + case ASM_SEND_HANDSHAKE: return "send-handshake"; + case ASM_HANDSHAKE_REPLY: return "handshake-reply"; + case ASM_SEND_SYNCSLOTS: return "send-syncslots"; + case ASM_SYNCSLOTS_REPLY: return "syncslots-reply"; + case ASM_INIT_RDBCHANNEL: return "init-rdbchannel"; + case ASM_ACCUMULATE_BUF: return "accumulate-buffer"; + case ASM_READY_TO_STREAM: return "ready-to-stream"; + case ASM_STREAMING_BUF: return "streaming-buffer"; + case ASM_WAIT_STREAM_EOF: return "wait-stream-eof"; + case ASM_TAKEOVER: return "takeover"; + + /* Migrate state */ + case ASM_WAIT_RDBCHANNEL: return "wait-rdbchannel"; + case ASM_WAIT_BGSAVE_START: return "wait-bgsave-start"; + case ASM_SEND_BULK_AND_STREAM: return "send-bulk-and-stream"; + case ASM_SEND_STREAM: 
return "send-stream"; + case ASM_HANDOFF_PREP: return "handoff-prep"; + case ASM_HANDOFF: return "handoff"; + case ASM_STREAM_EOF: return "stream-eof"; + + /* RDB channel state */ + case ASM_RDBCHANNEL_REQUEST: return "rdbchannel-request"; + case ASM_RDBCHANNEL_REPLY: return "rdbchannel-reply"; + case ASM_RDBCHANNEL_TRANSFER: return "rdbchannel-transfer"; + + default: return "unknown"; + } + serverAssert(0); /* Unreachable */ +} + +const char *asmChannelToString(int channel) { + switch (channel) { + case ASM_IMPORT_MAIN_CHANNEL: return "import-main-channel"; + case ASM_IMPORT_RDB_CHANNEL: return "import-rdb-channel"; + case ASM_MIGRATE_MAIN_CHANNEL: return "migrate-main-channel"; + case ASM_MIGRATE_RDB_CHANNEL: return "migrate-rdb-channel"; + default: return "unknown"; + } +} + +int asmDebugSetFailPoint(char *channel, char *state) { + if (!asmManager) { + serverLog(LL_WARNING, "ASM manager is not initialized"); + return C_ERR; + } + asmManager->debug_fail_channel = -1; + asmManager->debug_fail_state = -1; + if (!channel && !state) return C_ERR; + if (sdslen(channel) == 0 && sdslen(state) == 0) { + serverLog(LL_WARNING, "ASM fail point is cleared"); + return C_OK; + } + + for (int i = ASM_IMPORT_MAIN_CHANNEL; i <= ASM_MIGRATE_RDB_CHANNEL; i++) { + if (!strcasecmp(channel, asmChannelToString(i))) { + asmManager->debug_fail_channel = i; + break; + } + } + if (asmManager->debug_fail_channel == -1) return C_ERR; + + for (int i = ASM_NONE; i <= ASM_RDBCHANNEL_TRANSFER; i++) { + if (!strcasecmp(state, asmTaskStateToString(i))) { + asmManager->debug_fail_state = i; + break; + } + } + if (asmManager->debug_fail_state == -1) return C_ERR; + + serverLog(LL_NOTICE, "ASM fail point set: channel=%s, state=%s", channel, state); + return C_OK; +} + +int asmDebugSetTrimMethod(const char *method, int active_trim_delay) { + if (!asmManager) { + serverLog(LL_WARNING, "ASM manager is not initialized"); + return C_ERR; + } + int prev = asmManager->debug_trim_method; + if 
(!strcasecmp(method, "default")) asmManager->debug_trim_method = ASM_DEBUG_TRIM_DEFAULT; + else if (!strcasecmp(method, "none")) asmManager->debug_trim_method = ASM_DEBUG_TRIM_NONE; + else if (!strcasecmp(method, "bg")) asmManager->debug_trim_method = ASM_DEBUG_TRIM_BG; + else if (!strcasecmp(method, "active")) asmManager->debug_trim_method = ASM_DEBUG_TRIM_ACTIVE; + else return C_ERR; + + /* If we are switching from none to default, delete all the keys in the + * slots we don't own */ + if (prev == ASM_DEBUG_TRIM_NONE && asmManager->debug_trim_method != ASM_DEBUG_TRIM_NONE) { + for (int i = 0; i < CLUSTER_SLOTS; i++) + if (!clusterIsMySlot(i)) + clusterDelKeysInSlot(i, 0); + } + asmManager->debug_active_trim_delay = active_trim_delay; + serverLog(LL_NOTICE, "ASM trim method was set=%s, active_trim_delay=%d", method, active_trim_delay); + return C_OK; +} + +int asmDebugIsFailPointActive(int channel, int state) { + if (!asmManager) return 0; /* ASM manager not initialized */ + if (asmManager->debug_fail_channel == channel && asmManager->debug_fail_state == state) { + serverLog(LL_NOTICE, "ASM fail point active: channel=%s, state=%s", + asmChannelToString(channel), asmTaskStateToString(state)); + return 1; + } + return 0; +} + +sds asmCatInfoString(sds info) { + int active_tasks = 0; + + listIter li; + listNode *ln; + listRewind(asmManager->tasks, &li); + while ((ln = listNext(&li)) != NULL) { + asmTask *task = listNodeValue(ln); + if (task->operation == ASM_IMPORT || + (task->operation == ASM_MIGRATE && task->state != ASM_FAILED)) + { + active_tasks++; + } + } + + return sdscatprintf(info ? 
info : sdsempty(), + "cluster_slot_migration_active_tasks:%d\r\n" + "cluster_slot_migration_active_trim_running:%lu\r\n" + "cluster_slot_migration_active_trim_current_job_keys:%llu\r\n" + "cluster_slot_migration_active_trim_current_job_trimmed:%llu\r\n" + "cluster_slot_migration_stats_active_trim_started:%llu\r\n" + "cluster_slot_migration_stats_active_trim_completed:%llu\r\n" + "cluster_slot_migration_stats_active_trim_cancelled:%llu\r\n", + active_tasks, + listLength(asmManager->active_trim_jobs), + asmManager->active_trim_current_job_keys, + asmManager->active_trim_current_job_trimmed, + asmManager->active_trim_started, + asmManager->active_trim_completed, + asmManager->active_trim_cancelled); +} + +void asmTaskReset(asmTask *task) { + task->state = ASM_NONE; + task->dest_state = ASM_NONE; + task->rdb_channel_state = ASM_NONE; + task->main_channel_conn = NULL; + task->rdb_channel_conn = NULL; + task->dest_offset = 0; + task->source_offset = 0; + task->stream_eof_during_streaming = 0; + task->cross_slot_during_propagating = 0; + replDataBufInit(&task->sync_buffer); + task->main_channel_client = NULL; + task->rdb_channel_client = NULL; + task->paused_time = 0; + task->dest_slots_snapshot_time = 0; + task->dest_accum_applied_time = 0; + task->pre_snapshot_module_cmds = NULL; +} + +asmTask *asmTaskCreate(const char *task_id) { + asmTask *task = zcalloc(sizeof(*task)); + task->error = sdsempty(); + asmTaskReset(task); + task->slots = NULL; + task->source_node = NULL; + task->retry_count = 0; + task->create_time = server.mstime; + task->start_time = -1; + task->end_time = -1; + if (task_id) { + task->id = sdsnew(task_id); + } else { + task->id = sdsnewlen(NULL, CLUSTER_NAMELEN); + getRandomHexChars(task->id, CLUSTER_NAMELEN); + } + + return task; +} + +void asmTaskFree(asmTask *task) { + replDataBufClear(&task->sync_buffer); + sdsfree(task->id); + slotRangeArrayFree(task->slots); + sdsfree(task->error); + zfree(task); +} + +/* Convert the task state to the 
corresponding event. */ +int asmTaskStateToEvent(asmTask *task) { + if (task->operation == ASM_IMPORT) { + if (task->state == ASM_COMPLETED) return ASM_EVENT_IMPORT_COMPLETED; + else if (task->state == ASM_FAILED) return ASM_EVENT_IMPORT_FAILED; + else return ASM_EVENT_IMPORT_STARTED; + } else { + if (task->state == ASM_COMPLETED) return ASM_EVENT_MIGRATE_COMPLETED; + else if (task->state == ASM_FAILED) return ASM_EVENT_MIGRATE_FAILED; + else return ASM_EVENT_MIGRATE_STARTED; + } +} + +/* Serialize ASM task information into a string for transmission to replicas. + * Format: "task_id:source_node:dest_node:operation:state:slot_ranges" + * Where slot_ranges is in the format "1000-2000 3000-4000 ..." */ +sds asmTaskSerialize(asmTask *task) { + sds serialized = sdsempty(); + + /* Add task ID */ + serialized = sdscatprintf(serialized, "%s:", task->id); + + /* Add source node ID (40 chars) */ + serialized = sdscatlen(serialized, task->source, CLUSTER_NAMELEN); + serialized = sdscat(serialized, ":"); + + /* Add destination node ID (40 chars) */ + serialized = sdscatlen(serialized, task->dest, CLUSTER_NAMELEN); + serialized = sdscat(serialized, ":"); + + /* Add operation type */ + serialized = sdscatprintf(serialized, "%s:", task->operation == ASM_IMPORT ? + "import" : "migrate"); + + /* Add current state */ + serialized = sdscatprintf(serialized, "%s:", asmTaskStateToString(task->state)); + + /* Add slot ranges sds */ + sds slots_str = slotRangeArrayToString(task->slots); + serialized = sdscatprintf(serialized, "%s", slots_str); + sdsfree(slots_str); + + return serialized; +} + +/* Deserialize ASM task information from a string and create a complete asmTask. + * Format: "task_id:source_node:dest_node:operation:state:slot_ranges" + * Returns a new asmTask on success, NULL on failure. 
*/ +asmTask *asmTaskDeserialize(sds data) { + int count, idx = 0; + asmTask *task = NULL; + if (!data || sdslen(data) == 0) return NULL; + + sds *parts = sdssplitlen(data, sdslen(data), ":", 1, &count); + if (count < 6) goto err; + + /* Parse task ID */ + if (sdslen(parts[idx]) == 0) goto err; + task = asmTaskCreate(parts[idx]); + if (!task) goto err; + idx++; + + /* Parse source node ID */ + if (sdslen(parts[idx]) != CLUSTER_NAMELEN) goto err; + memcpy(task->source, parts[idx], CLUSTER_NAMELEN); + idx++; + + /* Parse destination node ID */ + if (sdslen(parts[idx]) != CLUSTER_NAMELEN) goto err; + memcpy(task->dest, parts[idx], CLUSTER_NAMELEN); + idx++; + + /* Parse operation type */ + if (!strcasecmp(parts[idx], "import")) { + task->operation = ASM_IMPORT; + } else if (!strcasecmp(parts[idx], "migrate")) { + task->operation = ASM_MIGRATE; + } else { + goto err; + } + idx++; + + /* Parse state */ + task->state = ASM_NONE; /* Default state */ + for (int state = ASM_NONE; state <= ASM_RDBCHANNEL_TRANSFER; state++) { + if (!strcasecmp(parts[idx], asmTaskStateToString(state))) { + task->state = state; + break; + } + } + idx++; + + /* Parse slot ranges */ + task->slots = slotRangeArrayFromString(parts[idx]); + if (!task->slots) goto err; + idx++; + + /* Ignore any extra fields for future compatibility */ + + sdsfreesplitres(parts, count); + return task; + +err: + if (task) asmTaskFree(task); + sdsfreesplitres(parts, count); + return NULL; +} + +/* Notify replicas about ASM task information to maintain consistency during + * slot migration. This function sends a CLUSTER SYNCSLOTS CONF ASM-TASK command + * to all connected replicas with the serialized task information. 
*/ +void asmNotifyReplicasStateChange(struct asmTask *task) { + if (!server.cluster_enabled || !clusterNodeIsMaster(getMyClusterNode())) return; + + /* Create command arguments for CLUSTER SYNCSLOTS CONF ASM-TASK */ + robj *argv[5]; + argv[0] = createStringObject("CLUSTER", 7); + argv[1] = createStringObject("SYNCSLOTS", 9); + argv[2] = createStringObject("CONF", 4); + argv[3] = createStringObject("ASM-TASK", 8); + argv[4] = createObject(OBJ_STRING, asmTaskSerialize(task)); + + /* Send the command to all replicas */ + replicationFeedSlaves(server.slaves, -1, argv, 5); + + /* Clean up command objects */ + for (int i = 0; i < 5; i++) { + decrRefCount(argv[i]); + } +} + +/* Dump the active import ASM task information. */ +sds asmDumpActiveImportTask(void) { + if (!server.cluster_enabled) return NULL; + + /* For replica, dump the master active task. */ + if (clusterNodeIsSlave(getMyClusterNode()) && + asmManager->master_task && + asmManager->master_task->state != ASM_FAILED && + asmManager->master_task->state != ASM_COMPLETED) + { + return asmTaskSerialize(asmManager->master_task); + } + + /* For master, dump the first active task. */ + if (!asmManager || listLength(asmManager->tasks) == 0) return NULL; + asmTask *task = listNodeValue(listFirst(asmManager->tasks)); + if (task->state == ASM_NONE || task->state == ASM_FAILED || + task->state == ASM_COMPLETED) return NULL; + + return asmTaskSerialize(task); +} + +size_t asmGetPeakSyncBufferSize(void) { + if (!asmManager) return 0; + /* Compute peak sync buffer usage. The current task's peak may not + * reflect in asmManager->sync_buffer_peak immediately. */ + size_t peak = asmManager->sync_buffer_peak; + asmTask *task = listFirst(asmManager->tasks) ? 
+ listNodeValue(listFirst(asmManager->tasks)) : NULL; + if (task && task->operation == ASM_IMPORT) + peak = max(task->sync_buffer.peak, asmManager->sync_buffer_peak); + + return peak; +} + +size_t asmGetImportInputBufferSize(void) { + if (!asmManager || listLength(asmManager->tasks) == 0) return 0; + + asmTask *task = listNodeValue(listFirst(asmManager->tasks)); + if (task->operation == ASM_IMPORT) + return task->sync_buffer.mem_used; + + return 0; +} + +size_t asmGetMigrateOutputBufferSize(void) { + if (!asmManager || listLength(asmManager->tasks) == 0) return 0; + + asmTask *task = listNodeValue(listFirst(asmManager->tasks)); + if (task->operation == ASM_MIGRATE && task->main_channel_client) + return getClientOutputBufferMemoryUsage(task->main_channel_client); + + return 0; +} + +/* Returns the ASM task with the given ID, or NULL if no such task exists. */ +static asmTask *asmLookupTaskAt(list *tasks, const char *id) { + listIter li; + listNode *ln; + + listRewind(tasks, &li); + while ((ln = listNext(&li)) != NULL) { + asmTask *task = listNodeValue(ln); + if (!strcmp(task->id, id)) return task; + } + return NULL; +} + +/* Returns the ASM task with the given ID, or NULL if no such task exists. */ +asmTask *asmLookupTaskById(const char *id) { + return asmLookupTaskAt(asmManager->tasks, id); +} + +/* Returns the ASM task that is identical to the given slot range array, or NULL + * if no such task exists. 
*/
asmTask *asmLookupTaskBySlotRangeArray(slotRangeArray *slots) {
    listIter li;
    listNode *ln;

    /* Equality (not overlap) match against every known task. */
    listRewind(asmManager->tasks, &li);
    while ((ln = listNext(&li)) != NULL) {
        asmTask *task = listNodeValue(ln);
        if (slotRangeArrayIsEqual(task->slots, slots))
            return task;
    }
    return NULL;
}

/* Returns the slot range array for the given task ID, or NULL when the ID is
 * NULL or unknown. The returned array is owned by the task — do not free. */
slotRangeArray *asmTaskGetSlotRanges(const char *task_id) {
    asmTask *task = NULL;
    if (!task_id || (task = asmLookupTaskById(task_id)) == NULL) return NULL;

    return task->slots;
}

/* Returns 1 if the slot range array overlaps with the given slot range.
 * Ranges are treated as inclusive on both ends. */
static int slotRangeArrayOverlaps(slotRangeArray *slots, slotRange *req) {
    for (int i = 0; i < slots->num_ranges; i++) {
        slotRange *sr = &slots->ranges[i];
        /* Two inclusive intervals overlap iff each starts before the
         * other one ends. */
        if (sr->start <= req->end && sr->end >= req->start)
            return 1;
    }
    return 0;
}

/* Returns 1 if the two slot range arrays overlap, 0 otherwise. */
static int slotRangeArraysOverlap(slotRangeArray *slots1, slotRangeArray *slots2) {
    for (int i = 0; i < slots1->num_ranges; i++) {
        slotRange *sr1 = &slots1->ranges[i];
        if (slotRangeArrayOverlaps(slots2, sr1)) return 1;
    }
    return 0;
}

/* Returns the first ASM task that overlaps with the given slot range, or NULL
 * if no such task exists. */
static asmTask *lookupAsmTaskBySlotRange(slotRange *req) {
    listIter li;
    listNode *ln;

    listRewind(asmManager->tasks, &li);
    while ((ln = listNext(&li)) != NULL) {
        asmTask *task = listNodeValue(ln);
        if (slotRangeArrayOverlaps(task->slots, req))
            return task;
    }
    return NULL;
}

/* Validates the given slot ranges for a migration task:
 * - Ensures the current node is a master.
 * - Verifies all slots are in a STABLE state (no legacy MIGRATING/IMPORTING).
 * - Confirms all slots belong to a single source node.
 * - Confirms no ongoing import task overlaps with the slot ranges
 *   ('current', if non-NULL, is exempted from this check so an existing
 *   task can re-validate its own ranges).
 *
 * Returns the source node if validation succeeds.
 * Otherwise, returns NULL and sets the caller-owned '*err' message. */
static clusterNode *validateImportSlotRanges(slotRangeArray *slots, sds *err, asmTask *current) {
    clusterNode *source = NULL;

    *err = NULL;

    /* Ensure this is a master node */
    if (!clusterNodeIsMaster(getMyClusterNode())) {
        *err = sdsnew("slot migration not allowed on replica.");
        goto out;
    }

    /* Ensure no manual migration is in progress. */
    for (int i = 0; i < CLUSTER_SLOTS; i++) {
        if (getImportingSlotSource(i) != NULL ||
            getMigratingSlotDest(i) != NULL)
        {
            *err = sdsnew("all slot states must be STABLE to start a slot migration task.");
            goto out;
        }
    }

    for (int i = 0; i < slots->num_ranges; i++) {
        slotRange *sr = &slots->ranges[i];

        /* Ensure no import task overlaps with this slot range.
         * Skip check current task that is running for this slot range. */
        asmTask *task = lookupAsmTaskBySlotRange(sr);
        if (task && task != current && task->operation == ASM_IMPORT) {
            *err = sdscatprintf(sdsempty(),
                "overlapping import exists for slot range: %d-%d",
                sr->start, sr->end);
            goto out;
        }

        /* Validate if we can start migration task for this slot range:
         * every slot must have an owner, and all owners must be the same
         * node (slots can only be pulled from a single source). */
        for (int j = sr->start; j <= sr->end; j++) {
            clusterNode *node = getNodeBySlot(j);
            if (node == NULL) {
                *err = sdscatprintf(sdsempty(), "slot has no owner: %d", j);
                goto out;
            }

            if (!source) {
                source = node;
            } else if (source != node) {
                *err = sdsnew("slots belong to different source nodes");
                goto out;
            }
        }
    }

out:
    return *err ? NULL : source;
}

/* Returns 1 if a task with the specified operation (ASM_IMPORT or
 * ASM_MIGRATE) is in progress, 0 otherwise. */
static int asmTaskInProgress(int operation) {
    listIter li;
    listNode *ln;

    if (!asmManager || listLength(asmManager->tasks) == 0) return 0;

    listRewind(asmManager->tasks, &li);
    while ((ln = listNext(&li)) != NULL) {
        asmTask *task = listNodeValue(ln);
        if (task->operation == operation) return 1;
    }
    return 0;
}

/* Returns 1 if a migrate task is in progress, 0 otherwise.
*/
int asmMigrateInProgress(void) {
    return asmTaskInProgress(ASM_MIGRATE);
}

/* Returns 1 if an import task is in progress, 0 otherwise. */
int asmImportInProgress(void) {
    return asmTaskInProgress(ASM_IMPORT);
}

/* Returns 1 if the task is in a state where it can receive replication stream
 * for the slot range, 0 otherwise. A task poisoned by a cross-slot command
 * (cross_slot_during_propagating) is never fed again. */
inline static int asmCanFeedMigrationClient(asmTask *task) {
    return task->operation == ASM_MIGRATE &&
        !task->cross_slot_during_propagating &&
        (task->state == ASM_SEND_BULK_AND_STREAM ||
         task->state == ASM_SEND_STREAM ||
         task->state == ASM_HANDOFF_PREP);
}

/* Feed the migration client with the replication stream for the slot range.
 * Called from the propagation path; assumes asmInit() has run when cluster
 * mode is enabled (asmManager is dereferenced without a NULL check here —
 * NOTE(review): confirm this invariant holds for all callers). */
void asmFeedMigrationClient(robj **argv, int argc) {
    asmTask *task = NULL;

    if (server.cluster_enabled == 0 || listLength(asmManager->tasks) == 0)
        return;

    /* Check if there is a migrate task that can receive replication stream. */
    task = listNodeValue(listFirst(asmManager->tasks));
    if (!asmCanFeedMigrationClient(task)) return;

    /* Ensure all arguments are converted to string encoding if necessary,
     * since getSlotFromCommand expects them to be string-encoded.
     * Generally the arguments are string-encoded, but we may rewrite
     * the command arguments to integer encoding. */
    for (int i = 0; i < argc; i++) {
        if (!sdsEncodedObject(argv[i])) {
            serverAssert(argv[i]->encoding == OBJ_ENCODING_INT);
            robj *old = argv[i];
            argv[i] = createStringObjectFromLongLongWithSds((long)old->ptr);
            decrRefCount(old);
        }
    }

    /* Check if the command belongs to the slot range. */
    struct redisCommand *cmd = lookupCommand(argv, argc);
    serverAssert(cmd);

    int slot = getSlotFromCommand(cmd, argv, argc);

    /* If the command does not have keys, skip it now.
     * SELECT is not propagated, since we only support a single db in cluster mode.
     * MULTI/EXEC is not needed, since transaction semantics are unnecessary
     * before the slot handoff.
     * FUNCTION subcommands should be executed on all nodes, so here we skip it,
     * and even propagating them may cause an error when executing.
     *
     * NOTICE: if some keyless commands should be propagated to the destination,
     * we should identify them here and send. */
    if (slot == INVALID_CLUSTER_SLOT) return;

    /* Generally we reject cross-slot commands before executing, but module may
     * replicate this kind of command, so we check again. To guarantee data
     * consistency, we cancel the task if we encounter a cross-slot command. */
    if (slot == CLUSTER_CROSSSLOT) {
        /* We cannot cancel the task directly here, since it may lead to a recursive
         * call: asmTaskCancel() --> moduleFireServerEvent() --> moduleFreeContext()
         * --> postExecutionUnitOperations() --> propagateNow(). Even worse, this
         * could result in propagating pending commands to the replication stream twice.
         * To avoid this, we simply set a flag here, cancel the task in beforeSleep. */
        task->cross_slot_during_propagating = 1;
        return;
    }

    /* Check if the slot belongs to the task's slot range. */
    slotRange sr = {slot, slot};
    if (!slotRangeArrayOverlaps(task->slots, &sr)) return;

    /* Debug fail point: kill the main channel instead of feeding it. */
    if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, task->state)))
        freeClientAsync(task->main_channel_client);

    /* Feed main channel with the command. */
    client *c = task->main_channel_client;
    size_t prev_bytes = getNormalClientPendingReplyBytes(c);

    addReplyArrayLen(c, argc);
    for (int i = 0; i < argc; i++)
        addReplyBulk(c, argv[i]);

    /* Update the task's source offset to reflect the bytes sent. */
    task->source_offset += (getNormalClientPendingReplyBytes(c) - prev_bytes);
}

/* Create an import task for 'slots' after validating the ranges. Takes
 * ownership of 'slots' (freed on failure, attached to the task on success).
 * On failure returns NULL and sets the caller-owned '*err' message. */
asmTask *asmCreateImportTask(const char *task_id, slotRangeArray *slots, sds *err) {
    clusterNode *source;

    *err = NULL;
    /* Validate that the slot ranges are valid and that migration can be
     * initiated for them. */
    source = validateImportSlotRanges(slots, err, NULL);
    if (!source)
        goto err;

    if (source == getMyClusterNode()) {
        *err = sdsnew("this node is already the owner of the slot range");
        goto err;
    }

    /* Only support a single task at a time now. */
    if (listLength(asmManager->tasks) != 0) {
        asmTask *current = listNodeValue(listFirst(asmManager->tasks));
        if (current->state == ASM_FAILED) {
            /* We can create a new import task only if the current one is failed,
             * cancel the failed task to create a new one. */
            asmTaskCancel(current, "new import requested");
        } else {
            *err = sdsnew("another ASM task is already in progress");
            goto err;
        }
    }
    /* There should be no task in progress. */
    serverAssert(listLength(asmManager->tasks) == 0);

    /* Create a slot migration task */
    asmTask *task = asmTaskCreate(task_id);
    task->slots = slots;
    task->state = ASM_NONE;
    task->operation = ASM_IMPORT;
    task->source_node = source;
    memcpy(task->source, clusterNodeGetName(source), CLUSTER_NAMELEN);
    memcpy(task->dest, getMyClusterId(), CLUSTER_NAMELEN);

    listAddNodeTail(asmManager->tasks, task);
    sds slots_str = slotRangeArrayToString(slots);
    serverLog(LL_NOTICE, "Import task %s created: src=%.40s, dest=%.40s, slots=%s",
              task->id, task->source, task->dest, slots_str);
    sdsfree(slots_str);

    return task;

err:
    slotRangeArrayFree(slots);
    return NULL;
}

/* CLUSTER MIGRATION IMPORT <start-slot end-slot [start-slot end-slot ...]>
 *
 * Sent by operator to the destination node to start the migration.
*/
static void clusterMigrationCommandImport(client *c) {
    /* Validate slot range arg count: slots come in start/end pairs after
     * "CLUSTER MIGRATION IMPORT" (3 leading args). */
    int remaining = c->argc - 3;
    if (remaining == 0 || remaining % 2 != 0) {
        addReplyErrorArity(c);
        return;
    }

    slotRangeArray *slots = parseSlotRangesOrReply(c, c->argc, 3);
    if (!slots) return;

    sds err = NULL;
    /* asmCreateImportTask takes ownership of 'slots'; 'err' ownership is
     * transferred to addReplyErrorSds on failure. */
    asmTask *task = asmCreateImportTask(NULL, slots, &err);
    if (!task) {
        addReplyErrorSds(c, err);
        return;
    }

    /* Reply with the generated task ID. */
    addReplyBulkCString(c, task->id);
}

/* CLUSTER MIGRATION CANCEL [ID <task-id> | ALL]
 * - Reply: Number of cancelled tasks
 *
 * Cancels import tasks that overlap with the specified slot ranges.
 * Multiple tasks may be cancelled. */
static void clusterMigrationCommandCancel(client *c) {
    sds task_id = NULL; /* NULL means "cancel all" for clusterAsmCancel(). */
    int num_cancelled = 0;

    /* Validate slot range arg count */
    if (c->argc != 4 && c->argc != 5) {
        addReplyErrorArity(c);
        return;
    }

    if (!strcasecmp(c->argv[3]->ptr, "id")) {
        if (c->argc != 5) {
            addReplyErrorArity(c);
            return;
        }
        task_id = c->argv[4]->ptr;
    } else if (!strcasecmp(c->argv[3]->ptr, "all")) {
        if (c->argc != 4) {
            addReplyErrorArity(c);
            return;
        }
    } else {
        addReplyError(c, "unknown argument");
        return;
    }

    num_cancelled = clusterAsmCancel(task_id, "user request");
    addReplyLongLong(c, num_cancelled);
}

/* Reply with the status of the task as a 12-field RESP map. The map length
 * below must match the number of key/value pairs emitted. */
static void replyTaskStatus(client *c, asmTask *task) {
    mstime_t p = 0;

    addReplyMapLen(c, 12);
    addReplyBulkCString(c, "id");
    addReplyBulkCString(c, task->id);
    addReplyBulkCString(c, "slots");
    addReplyBulkSds(c, slotRangeArrayToString(task->slots));
    addReplyBulkCString(c, "source");
    addReplyBulkCBuffer(c, task->source, CLUSTER_NAMELEN);
    addReplyBulkCString(c, "dest");
    addReplyBulkCBuffer(c, task->dest, CLUSTER_NAMELEN);
    addReplyBulkCString(c, "operation");
    addReplyBulkCString(c, task->operation == ASM_IMPORT ? "import" : "migrate");
    addReplyBulkCString(c, "state");
    addReplyBulkCString(c, asmTaskStateToString(task->state));
    addReplyBulkCString(c, "last_error");
    addReplyBulkCBuffer(c, task->error, sdslen(task->error));
    addReplyBulkCString(c, "retries");
    addReplyLongLong(c, task->retry_count);
    addReplyBulkCString(c, "create_time");
    addReplyLongLong(c, task->create_time);
    addReplyBulkCString(c, "start_time");
    addReplyLongLong(c, task->start_time);
    addReplyBulkCString(c, "end_time");
    addReplyLongLong(c, task->end_time);

    /* write_pause_ms is only meaningful for a completed migrate task;
     * otherwise report 0.
     * NOTE(review): if paused_time was never set (still 0), this reports
     * end_time as the pause duration — confirm paused_time is always set
     * before a migrate task completes. */
    if (task->operation == ASM_MIGRATE && task->state == ASM_COMPLETED)
        p = task->end_time - task->paused_time;
    addReplyBulkCString(c, "write_pause_ms");
    addReplyLongLong(c, p);
}

/* CLUSTER MIGRATION STATUS [ID <task-id> | ALL]
 * - Reply: Array of atomic slot migration tasks (active + archived) */
static void clusterMigrationCommandStatus(client *c) {
    listIter li;
    listNode *ln;

    if (c->argc != 4 && c->argc != 5) {
        addReplyErrorArity(c);
        return;
    }

    if (!strcasecmp(c->argv[3]->ptr, "id")) {
        if (c->argc != 5) {
            addReplyErrorArity(c);
            return;
        }
        /* Look up the ID among active tasks first, then archived ones. */
        sds id = c->argv[4]->ptr;
        asmTask *task = asmLookupTaskAt(asmManager->tasks, id);
        if (!task) task = asmLookupTaskAt(asmManager->archived_tasks, id);
        if (!task) {
            addReplyArrayLen(c, 0);
            return;
        }

        addReplyArrayLen(c, 1);
        replyTaskStatus(c, task);
    } else if (!strcasecmp(c->argv[3]->ptr, "all")) {
        if (c->argc != 4) {
            addReplyErrorArity(c);
            return;
        }
        addReplyArrayLen(c, listLength(asmManager->tasks) +
                            listLength(asmManager->archived_tasks));
        listRewind(asmManager->tasks, &li);
        while ((ln = listNext(&li)) != NULL)
            replyTaskStatus(c, listNodeValue(ln));

        listRewind(asmManager->archived_tasks, &li);
        while ((ln = listNext(&li)) != NULL)
            replyTaskStatus(c, listNodeValue(ln));
    } else {
        addReplyError(c, "unknown argument");
        return;
    }
}

/* CLUSTER MIGRATION
 *   <IMPORT <start-slot end-slot [start-slot end-slot ...]> |
 *    STATUS [ID
<task-id> | ALL] |
 *     CANCEL [ID <task-id> | ALL]>
*/
void clusterMigrationCommand(client *c) {
    if (c->argc < 4) {
        addReplyErrorArity(c);
        return;
    }

    /* Dispatch on the subcommand (argv[2]); each handler validates its own
     * remaining arguments. */
    if (strcasecmp(c->argv[2]->ptr, "import") == 0) {
        clusterMigrationCommandImport(c);
    } else if (strcasecmp(c->argv[2]->ptr, "status") == 0) {
        clusterMigrationCommandStatus(c);
    } else if (strcasecmp(c->argv[2]->ptr, "cancel") == 0) {
        clusterMigrationCommandCancel(c);
    } else {
        addReplyError(c, "unknown argument");
    }
}

/* Return the number of keys in the specified slot ranges.
 * Counts only DB 0 (cluster mode uses a single logical database). */
unsigned long long asmCountKeysInSlots(slotRangeArray *slots) {
    if (!slots) return 0;

    unsigned long long key_count = 0;
    for (int i = 0; i < slots->num_ranges; i++) {
        for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) {
            key_count += kvstoreDictSize(server.db[0].keys, j);
        }
    }
    return key_count;
}

/* Log a human-readable message for ASM task lifecycle events.
 * Unknown events are silently ignored. */
void asmLogTaskEvent(asmTask *task, int event) {
    sds str = slotRangeArrayToString(task->slots);

    switch (event) {
    case ASM_EVENT_IMPORT_STARTED:
        serverLog(LL_NOTICE, "Import task %s started for slots: %s", task->id, str);
        break;
    case ASM_EVENT_IMPORT_FAILED:
        serverLog(LL_NOTICE, "Import task %s failed for slots: %s", task->id, str);
        break;
    case ASM_EVENT_TAKEOVER:
        serverLog(LL_NOTICE, "Import task %s is ready to takeover slots: %s", task->id, str);
        break;
    case ASM_EVENT_IMPORT_COMPLETED:
        serverLog(LL_NOTICE, "Import task %s completed for slots: %s (imported %llu keys)",
                  task->id, str, asmCountKeysInSlots(task->slots));
        break;
    case ASM_EVENT_MIGRATE_STARTED:
        serverLog(LL_NOTICE, "Migrate task %s started for slots: %s (keys at start: %llu)",
                  task->id, str, asmCountKeysInSlots(task->slots));
        break;
    case ASM_EVENT_MIGRATE_FAILED:
        serverLog(LL_NOTICE, "Migrate task %s failed for slots: %s", task->id, str);
        break;
    case ASM_EVENT_HANDOFF_PREP:
        serverLog(LL_NOTICE, "Migrate task %s preparing to handoff for slots: %s", task->id, str);
        break;
    case ASM_EVENT_MIGRATE_COMPLETED:
        serverLog(LL_NOTICE, "Migrate task %s completed for slots: %s (migrated %llu keys)",
                  task->id, str, asmCountKeysInSlots(task->slots));
        break;
    default:
        break;
    }

    sdsfree(str);
}

/* Notify the state change to the module and the cluster implementation.
 * 'event' must map to one of the module subevents below (asserted). */
void asmNotifyStateChange(asmTask *task, int event) {
    RedisModuleClusterSlotMigrationInfo info = {
        .version = REDISMODULE_CLUSTER_SLOT_MIGRATION_INFO_VERSION,
        .task_id = task->id,
        .slots = (RedisModuleSlotRangeArray *) task->slots
    };
    memcpy(info.source_node_id, task->source, CLUSTER_NAMELEN);
    memcpy(info.destination_node_id, task->dest, CLUSTER_NAMELEN);

    /* Translate the internal lifecycle event into the module subevent. */
    int module_event = -1;
    if (event == ASM_EVENT_IMPORT_STARTED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_STARTED;
    else if (event == ASM_EVENT_IMPORT_COMPLETED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_COMPLETED;
    else if (event == ASM_EVENT_IMPORT_FAILED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_FAILED;
    else if (event == ASM_EVENT_MIGRATE_STARTED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_STARTED;
    else if (event == ASM_EVENT_MIGRATE_COMPLETED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_COMPLETED;
    else if (event == ASM_EVENT_MIGRATE_FAILED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_FAILED;
    serverAssert(module_event != -1);

    moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION, module_event, &info);
    serverLog(LL_DEBUG, "Fire cluster asm module event, task %s: state=%s",
              task->id, asmTaskStateToString(task->state));

    if (clusterNodeIsMaster(getMyClusterNode())) {
        /* Notify the cluster impl only if it is a real active import task. */
        if (task != asmManager->master_task) {
            asmLogTaskEvent(task, event);
            clusterAsmOnEvent(task->id, event, task->slots);
        }
        asmNotifyReplicasStateChange(task); /* Propagate state change to replicas */
    }
}

/* Transition an import task into the failed state: tear down any channel
 * clients/connections, release the sync buffer, and notify the cluster.
 * Idempotent: a no-op if the task is already ASM_FAILED. */
void asmImportSetFailed(asmTask *task) {
    serverAssert(task->operation == ASM_IMPORT);
    if (task->state == ASM_FAILED) return;

    /* If we are in the RDB channel transfer state, we need to
     * close the client that was created for the RDB channel. */
    if (task->rdb_channel_conn && task->rdb_channel_state == ASM_RDBCHANNEL_TRANSFER) {
        client *c = connGetPrivateData(task->rdb_channel_conn);
        serverAssert(c->task == task);
        task->rdb_channel_conn = NULL;
        c->task = NULL;
        c->flags &= ~CLIENT_MASTER;
        freeClientAsync(c);
    }

    /* If in the wait stream EOF or streaming buffer state, we need to close the
     * client that was created for the main channel. */
    if (task->main_channel_conn &&
        (task->state == ASM_STREAMING_BUF || task->state == ASM_WAIT_STREAM_EOF))
    {
        client *c = connGetPrivateData(task->main_channel_conn);
        serverAssert(c->task == task);
        task->main_channel_conn = NULL;
        c->task = NULL;
        c->flags &= ~CLIENT_MASTER;
        freeClientAsync(c);
    }

    /* Close the connections (those not yet wrapped in a client object). */
    if (task->rdb_channel_conn) connClose(task->rdb_channel_conn);
    if (task->main_channel_conn) connClose(task->main_channel_conn);
    task->rdb_channel_conn = NULL;
    task->main_channel_conn = NULL;

    /* Clear the replication data buffer, keeping track of the peak usage
     * for stats before releasing it. */
    asmManager->sync_buffer_peak = max(asmManager->sync_buffer_peak, task->sync_buffer.peak);
    replDataBufClear(&task->sync_buffer);

    /* Mark the task as failed and notify the cluster */
    task->state = ASM_FAILED;
    asmNotifyStateChange(task, ASM_EVENT_IMPORT_FAILED);
    /* This node may become replica, only master can setup new slot trimming jobs. */
    if (clusterNodeIsMaster(getMyClusterNode()))
        asmTrimJobSchedule(task->slots);
}

/* Transition a migrate task into the failed state: detach and free both
 * channel clients, clear the sync buffer, and notify the cluster.
 * Idempotent: a no-op if the task is already ASM_FAILED. */
void asmMigrateSetFailed(asmTask *task) {
    serverAssert(task->operation == ASM_MIGRATE);
    if (task->state == ASM_FAILED) return;

    /* Close the RDB and main channel clients */
    if (task->rdb_channel_client) {
        task->rdb_channel_client->task = NULL;
        freeClientAsync(task->rdb_channel_client);
        task->rdb_channel_client = NULL;
    }
    if (task->main_channel_client) {
        task->main_channel_client->task = NULL;
        freeClientAsync(task->main_channel_client);
        task->main_channel_client = NULL;
    }

    /* Actually it is not necessary to clear the sync buffer here,
     * to make asmTaskReset work properly after migrate task failed.
     * NOTE(review): the original comment reads ambiguously — it appears to
     * mean the clear is done so asmTaskReset() behaves correctly on retry;
     * confirm against asmTaskReset()'s expectations. */
    replDataBufClear(&task->sync_buffer);

    /* Mark the task as failed and notify the cluster */
    task->state = ASM_FAILED;
    asmNotifyStateChange(task, ASM_EVENT_MIGRATE_FAILED);
}

/* Record a printf-style error message on the task, log it, and delegate to
 * the operation-specific failure handler. The current task and RDB-channel
 * states are appended to the stored error for diagnostics. */
void asmTaskSetFailed(asmTask *task, const char *fmt, ...) {
    va_list ap;
    sds error = sdsempty();

    /* Set the error message */
    va_start(ap, fmt);
    error = sdscatvprintf(error, fmt, ap);
    va_end(ap);
    error = sdscatprintf(error, " (state: %s, rdb_channel_state: %s)",
                         asmTaskStateToString(task->state),
                         asmTaskStateToString(task->rdb_channel_state));
    sdsfree(task->error);
    task->error = error;

    /* Log the error */
    sds slots_str = slotRangeArrayToString(task->slots);
    serverLog(LL_WARNING, "%s task %s failed: slots=%s, err=%s",
              task->operation == ASM_IMPORT ? "Import" : "Migrate",
              task->id, slots_str, task->error);
    sdsfree(slots_str);

    if (task->operation == ASM_IMPORT)
        asmImportSetFailed(task);
    else
        asmMigrateSetFailed(task);
}

/* The task is completed or canceled. Update stats and move it to
 * the archived list.
 */
void asmTaskFinalize(asmTask *task) {
    /* The finalized task must be the single active task at the list head. */
    listNode *ln = listFirst(asmManager->tasks);
    serverAssert(ln->value == task);

    task->source_node = NULL; /* Should never access it */
    task->end_time = server.mstime;

    if (task->operation == ASM_IMPORT) {
        /* Fold the task's peak buffer usage into the manager-wide stat. */
        asmManager->sync_buffer_peak = max(asmManager->sync_buffer_peak,
                                           task->sync_buffer.peak);
        replDataBufClear(&task->sync_buffer); /* Not used, so save memory */
    }

    /* Move the task to the archived list */
    listUnlinkNode(asmManager->tasks, ln);
    listLinkNodeHead(asmManager->archived_tasks, ln);
}

/* Cancel a task: mark it failed with the given reason, set the state to
 * ASM_CANCELED and archive it. Idempotent for already-canceled tasks. */
static void asmTaskCancel(asmTask *task, const char *reason) {
    if (task->state == ASM_CANCELED) return;

    asmTaskSetFailed(task, "Cancelled due to %s", reason);
    task->state = ASM_CANCELED;
    asmTaskFinalize(task);
}

/* Take over the imported slots: release the main channel client (no longer
 * needed), move to ASM_TAKEOVER and inform the cluster implementation. */
void asmImportTakeover(asmTask *task) {
    serverAssert(task->state == ASM_WAIT_STREAM_EOF ||
                 task->state == ASM_STREAMING_BUF);

    /* Free the main channel connection since it is no longer needed. */
    serverAssert(task->main_channel_conn != NULL);
    client *c = connGetPrivateData(task->main_channel_conn);
    c->task = NULL;
    c->flags &= ~CLIENT_MASTER;
    freeClientAsync(c);
    task->main_channel_conn = NULL;

    task->state = ASM_TAKEOVER;
    asmLogTaskEvent(task, ASM_EVENT_TAKEOVER);
    clusterAsmOnEvent(task->id, ASM_EVENT_TAKEOVER, task->slots);
}

/* Hook invoked when any client attached to an ASM task is freed. Decides,
 * based on which channel the client belonged to, whether the task must be
 * failed or merely have its channel reference reset. */
void asmCallbackOnFreeClient(client *c) {
    asmTask *task = c->task;
    if (!task) return;

    /* If the RDB channel connection is closed, mark the task as failed. */
    if (c->conn && task->rdb_channel_conn == c->conn) {
        /* We create the client only when transferring data on the RDB channel */
        serverAssert(task->rdb_channel_state == ASM_RDBCHANNEL_TRANSFER);
        task->rdb_channel_conn = NULL; /* Will be freed by freeClient */
        c->flags &= ~CLIENT_MASTER;
        asmTaskSetFailed(task, "RDB channel - Connection is closed");
        return;
    }

    if (c->conn && task->main_channel_conn == c->conn) {
        /* After or in the process of streaming buffer to DB, a client will be
         * created based on the main channel connection. */
        serverAssert(task->state == ASM_STREAMING_BUF ||
                     task->state == ASM_WAIT_STREAM_EOF);
        task->main_channel_conn = NULL; /* Will be freed by freeClient */
        c->flags &= ~CLIENT_MASTER;
        asmTaskSetFailed(task, "Main channel - Connection is closed");
        return;
    }

    if (c == task->rdb_channel_client) {
        /* TODO: Detect whether the bgsave is completed successfully and
         * update the state properly. */
        task->rdb_channel_state = ASM_COMPLETED;
        /* We may not have detected whether the child process has exited yet,
         * so we can't determine whether the client has completed the slots
         * snapshot transfer. If the RDB channel is interrupted unexpectedly,
         * the destination side will also close the main channel.
         * So here we just reset the RDB channel client of task. */
        task->rdb_channel_client = NULL;
        return;
    }

    /* If the main channel client is closed, we need to mark the task as failed
     * and clean up the RDB channel client if it exists. */
    if (c == task->main_channel_client) {
        task->main_channel_client = NULL;
        /* The rdb channel client will be cleaned up */
        asmTaskSetFailed(task, "Main and RDB channel clients are disconnected.");
        return;
    }
}

/* Sends an AUTH command to the source node using the internal secret.
 * Returns an error string if the command fails, or NULL on success.
 */
char *asmSendInternalAuth(connection *conn) {
    size_t len = 0;
    const char *internal_secret = clusterGetSecret(&len);
    serverAssert(internal_secret != NULL);

    sds secret = sdsnewlen(internal_secret, len);
    char *err = sendCommand(conn, "AUTH", "internal connection", secret, NULL);
    sdsfree(secret);
    return err;
}

/* Handles the RDB channel sync with the source node.
 * This function is called when the RDB channel is established
 * and ready to sync with the source node.
 *
 * State machine (rdb_channel_state): ASM_CONNECTING -> ASM_AUTH_REPLY ->
 * ASM_RDBCHANNEL_REQUEST -> ASM_RDBCHANNEL_REPLY -> ASM_RDBCHANNEL_TRANSFER.
 * Each invocation advances at most one handshake step per read event. */
void asmRdbChannelSyncWithSource(connection *conn) {
    asmTask *task = connGetPrivateData(conn);
    char *err = NULL;
    sds task_error_msg = NULL;

    /* Check for errors in the socket: after a non blocking connect() we
     * may find that the socket is in error state. */
    if (connGetState(conn) != CONN_STATE_CONNECTED)
        goto error;

    /* Check if the task is in a fail point state */
    if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_RDB_CHANNEL, task->rdb_channel_state))) {
        char buf[1];
        /* Simulate a failure by shutting down the connection. On some operating
         * systems (e.g. Linux), the socket's receive buffer is not flushed
         * immediately, so we issue a dummy read to drain any pending data and
         * surface the error condition.
         * using shutdown() instead of connShutdown() because connTLSShutdown()
         * will free the connection directly, which is not what we want. */
        shutdown(conn->fd, SHUT_RDWR);
        connRead(conn, buf, 1);
    }

    if (task->rdb_channel_state == ASM_CONNECTING) {
        connSetReadHandler(conn, asmRdbChannelSyncWithSource);
        connSetWriteHandler(conn, NULL);

        /* Send AUTH command to source node using internal auth */
        err = asmSendInternalAuth(conn);
        if (err) goto write_error;
        task->rdb_channel_state = ASM_AUTH_REPLY;
        return;
    }

    if (task->rdb_channel_state == ASM_AUTH_REPLY) {
        err = receiveSynchronousResponse(conn);
        /* The source node did not reply */
        if (err == NULL) goto no_response_error;

        /* Check `+OK` reply */
        if (!strcmp(err, "+OK")) {
            sdsfree(err);
            err = NULL;
            task->rdb_channel_state = ASM_RDBCHANNEL_REQUEST;
            serverLog(LL_NOTICE, "Source node replied to AUTH command, syncslots rdb channel operation can continue...");
        } else {
            task_error_msg = sdscatprintf(sdsempty(),
                "Error reply to AUTH from source: %s", err);
            sdsfree(err);
            goto error;
        }
    }

    if (task->rdb_channel_state == ASM_RDBCHANNEL_REQUEST) {
        err = sendCommand(conn, "CLUSTER", "SYNCSLOTS", "RDBCHANNEL", task->id, NULL);
        if (err) goto write_error;
        task->rdb_channel_state = ASM_RDBCHANNEL_REPLY;
        return;
    }

    if (task->rdb_channel_state == ASM_RDBCHANNEL_REPLY) {
        err = receiveSynchronousResponse(conn);
        /* The source node did not reply */
        if (err == NULL) goto no_response_error;

        /* Ignore '\n' sent from the source node to keep the connection alive. */
        if (sdslen(err) == 0) {
            serverLog(LL_DEBUG, "Received an empty line in RDBCHANNEL reply, slots snapshot delivery will start later");
            sdsfree(err);
            return;
        }

        /* Check `+SLOTSSNAPSHOT` reply */
        if (!strncmp(err, "+SLOTSSNAPSHOT", strlen("+SLOTSSNAPSHOT"))) {
            sdsfree(err);
            err = NULL;
            task->state = ASM_ACCUMULATE_BUF;
            /* The main channel buffers pending commands. */
            connSetReadHandler(task->main_channel_conn, asmSyncBufferReadFromConn);

            task->rdb_channel_state = ASM_RDBCHANNEL_TRANSFER;
            /* Wrap the connection in a client so the snapshot stream is
             * applied through the normal command execution path. */
            client *c = createClient(conn);
            c->flags |= (CLIENT_MASTER | CLIENT_INTERNAL | CLIENT_ASM_IMPORTING);
            c->querybuf = sdsempty();
            c->authenticated = 1;
            c->user = NULL;
            c->task = task;
            serverLog(LL_NOTICE,
                "Source node replied to SLOTSSNAPSHOT, syncing slots snapshot can continue...");
        } else {
            task_error_msg = sdscatprintf(sdsempty(),
                "Error reply to CLUSTER SYNCSLOTS RDBCHANNEL from the source: %s", err);
            sdsfree(err);
            goto error;
        }
        return;
    }
    return;

no_response_error:
    task_error_msg = sdsnew("Source node did not respond to command during RDBCHANNELSYNCSLOTS handshake");
    /* Fall through to regular error handling */

error:
    asmTaskSetFailed(task, "RDB channel - Failed to sync with the source node: %s",
                     task_error_msg ? task_error_msg : connGetLastError(conn));
    sdsfree(task_error_msg);
    return;

write_error: /* Handle sendCommand() errors. */
    task_error_msg = sdscatprintf(sdsempty(), "Failed to send command to the source node: %s", err);
    sdsfree(err);
    goto error;
}

/* Build and send "CLUSTER SYNCSLOTS SYNC <id> <start> <end> ..." to the
 * source over 'conn'. Returns sendCommandArgv()'s error string (caller
 * frees) or NULL on success. */
char *asmSendSlotRangesSync(connection *conn, asmTask *task) {
    /* Prepare CLUSTER SYNCSLOTS SYNC command */
    serverAssert(task->slots->num_ranges <= CLUSTER_SLOTS);
    int argc = task->slots->num_ranges * 2 + 4;
    char **args = zcalloc(sizeof(char*) * argc);
    size_t *lens = zcalloc(sizeof(size_t) * argc);

    args[0] = "CLUSTER";
    args[1] = "SYNCSLOTS";
    args[2] = "SYNC";
    args[3] = task->id;
    lens[0] = strlen("CLUSTER");
    lens[1] = strlen("SYNCSLOTS");
    lens[2] = strlen("SYNC");
    lens[3] = sdslen(task->id);

    /* Append each slot range as a <start> <end> argument pair. */
    int i = 4;
    for (int j = 0; j < task->slots->num_ranges; j++) {
        slotRange *sr = &task->slots->ranges[j];
        args[i] = sdscatprintf(sdsempty(), "%d", sr->start);
        lens[i] = sdslen(args[i]);
        args[i+1] = sdscatprintf(sdsempty(), "%d", sr->end);
        lens[i+1] = sdslen(args[i+1]);
        i += 2;
    }
    serverAssert(i == argc);

    /* Send command to source node */
    char *err = sendCommandArgv(conn, argc, args, lens);

    /* Free allocated memory (only the sds range args are owned here;
     * args[0..2] are literals and args[3] is the task's id). */
    for (int j = 4; j < argc; j++) {
        sdsfree(args[j]);
    }
    zfree(args);
    zfree(lens);

    return err;
}

/* Main channel handshake with the source node, driven by read events.
 *
 * State machine (task->state): ASM_CONNECTING -> ASM_AUTH_REPLY ->
 * ASM_SEND_HANDSHAKE -> ASM_HANDSHAKE_REPLY -> ASM_SEND_SYNCSLOTS ->
 * ASM_SYNCSLOTS_REPLY -> ASM_INIT_RDBCHANNEL. On -NOTREADY the state is
 * rewound to ASM_SEND_SYNCSLOTS for a later retry from asmCron(). */
void asmSyncWithSource(connection *conn) {
    asmTask *task = connGetPrivateData(conn);
    char *err = NULL;

    /* Some task errors are not network issues, we record them explicitly. */
    sds task_error_msg = NULL;

    /* Check for errors in the socket: after a non blocking connect() we
     * may find that the socket is in error state. */
    if (connGetState(conn) != CONN_STATE_CONNECTED)
        goto error;

    /* Check if the fail point is active for this channel and state */
    if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_MAIN_CHANNEL, task->state))) {
        char buf[1];
        shutdown(conn->fd, SHUT_RDWR);
        connRead(conn, buf, 1);
    }

    if (task->state == ASM_CONNECTING) {
        connSetReadHandler(conn, asmSyncWithSource);
        connSetWriteHandler(conn, NULL);
        /* Send AUTH command to source node using internal auth */
        err = asmSendInternalAuth(conn);
        if (err) goto write_error;
        task->state = ASM_AUTH_REPLY;
        return;
    }

    if (task->state == ASM_AUTH_REPLY) {
        err = receiveSynchronousResponse(conn);
        /* The source node did not reply */
        if (err == NULL) goto no_response_error;

        /* Check `+OK` reply */
        if (!strcmp(err, "+OK")) {
            sdsfree(err);
            err = NULL;
            task->state = ASM_SEND_HANDSHAKE;
            serverLog(LL_NOTICE, "Source node replied to AUTH command, syncslots can continue...");
        } else {
            task_error_msg = sdscatprintf(sdsempty(),
                "Error reply to AUTH from the source: %s", err);
            sdsfree(err);
            goto error;
        }
    }

    if (task->state == ASM_SEND_HANDSHAKE) {
        /* Identify ourselves to the source with our cluster node ID. */
        sds node_id = sdsnewlen(clusterNodeGetName(getMyClusterNode()), CLUSTER_NAMELEN);
        err = sendCommand(conn, "CLUSTER", "SYNCSLOTS", "CONF", "NODE-ID", node_id, NULL);
        sdsfree(node_id);
        if (err) goto write_error;
        task->state = ASM_HANDSHAKE_REPLY;
        return;
    }

    if (task->state == ASM_HANDSHAKE_REPLY) {
        err = receiveSynchronousResponse(conn);
        /* The source node did not reply */
        if (err == NULL) goto no_response_error;

        /* Check `+OK` reply */
        if (!strcmp(err, "+OK")) {
            sdsfree(err);
            err = NULL;
            task->state = ASM_SEND_SYNCSLOTS;
            serverLog(LL_NOTICE, "Source node replied to SYNCSLOTS CONF command, syncslots can continue...");
        } else {
            task_error_msg = sdscatprintf(sdsempty(),
                "Error reply to CLUSTER SYNCSLOTS CONF from the source: %s", err);
            sdsfree(err);
            goto error;
        }
    }

    if (task->state == ASM_SEND_SYNCSLOTS) {
        err = asmSendSlotRangesSync(conn, task);
        if (err) goto write_error;

        task->state = ASM_SYNCSLOTS_REPLY;
        return;
    }

    if (task->state == ASM_SYNCSLOTS_REPLY) {
        err = receiveSynchronousResponse(conn);
        /* The source node did not reply */
        if (err == NULL) goto no_response_error;

        /* Check `+RDBCHANNELSYNCSLOTS` reply */
        if (!strncmp(err, "+RDBCHANNELSYNCSLOTS", strlen("+RDBCHANNELSYNCSLOTS"))) {
            sdsfree(err);
            err = NULL;
            task->state = ASM_INIT_RDBCHANNEL;
            serverLog(LL_NOTICE,
                "Source node replied to SYNCSLOTS SYNC, syncslots can continue...");
        } else if (!strncmp(err, "-NOTREADY", strlen("-NOTREADY"))) {
            /* The source-side cluster is temporarily not ready to start a
             * migration and replied -NOTREADY. We could fail this attempt and
             * let the import task start another attempt later but that could
             * trigger unnecessary cleanup in the cluster implementation.
             * Instead, we'll retry sending SYNCSLOTS later in asmCron(). */
            sdsfree(err);
            task->state = ASM_SEND_SYNCSLOTS;
            serverLog(LL_NOTICE,
                "Source node replied to SYNCSLOTS SYNC with -NOTREADY, will retry later...");
            return;
        } else {
            task_error_msg = sdscatprintf(sdsempty(),
                "Error reply to CLUSTER SYNCSLOTS SYNC from the source: %s", err);
            sdsfree(err);
            goto error;
        }
    }

    if (task->state == ASM_INIT_RDBCHANNEL) {
        /* Create RDB channel connection */
        char *ip = clusterNodeIp(task->source_node);
        int port = server.tls_replication ? clusterNodeTlsPort(task->source_node) :
                                            clusterNodeTcpPort(task->source_node);
        task->rdb_channel_conn = connCreate(server.el, connTypeOfReplication());
        if (connConnect(task->rdb_channel_conn, ip, port,
                        server.bind_source_addr, asmRdbChannelSyncWithSource) == C_ERR)
        {
            serverLog(LL_WARNING, "Unable to connect to the source node: %s",
                      connGetLastError(task->rdb_channel_conn));
            goto error;
        }
        task->rdb_channel_state = ASM_CONNECTING;
        connSetPrivateData(task->rdb_channel_conn, task);
        serverLog(LL_NOTICE,
            "RDB channel connection to source node %.40s established, waiting for AUTH reply...",
            task->source);

        /* Main channel waits for the new event */
        connSetReadHandler(conn, NULL);
        return;
    }
    return;

no_response_error:
    serverLog(LL_WARNING, "Source node did not respond to command during SYNCSLOTS handshake");
    /* Fall through to regular error handling */

error:
    asmTaskSetFailed(task, "Main channel - Failed to sync with source node: %s",
                     task_error_msg ? task_error_msg : connGetLastError(conn));
    sdsfree(task_error_msg);
    return;

write_error: /* Handle sendCommand() errors. */
    serverLog(LL_WARNING, "Failed to send command to source node: %s", err);
    sdsfree(err);
    goto error;
}

/* Send "CLUSTER SYNCSLOTS ACK <state> <offset>" with the destination's
 * applied offset over the main channel. Returns C_OK, or C_ERR after
 * failing the task on a send error. */
int asmImportSendACK(asmTask *task) {
    serverAssert(task->operation == ASM_IMPORT && task->state == ASM_WAIT_STREAM_EOF);
    /* NOTE(review): dest_offset is unsigned long long; "%lld" should
     * arguably be "%llu" to match the type. */
    serverLog(LL_DEBUG, "Destination node applied offset is %lld", task->dest_offset);

    char offset[64];
    ull2string(offset, sizeof(offset), task->dest_offset);

    char *err = sendCommand(task->main_channel_conn, "CLUSTER", "SYNCSLOTS", "ACK",
                            asmTaskStateToString(task->state), offset, NULL);
    if (err) {
        asmTaskSetFailed(task, "Main channel - Failed to send ACK: %s", err);
        sdsfree(err);
        return C_ERR;
    }
    return C_OK;
}

/* Called when the RDB channel begins sending the snapshot.
 * From this point on, the main channel also starts sending incremental streams.
 */
void asmSlotSnapshotAndStreamStart(struct asmTask *task) {
    if (task == NULL || task->state != ASM_WAIT_BGSAVE_START) return;

    if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_RDB_CHANNEL, task->state))) {
        shutdown(task->rdb_channel_client->conn->fd, SHUT_RDWR);
        return;
    }
    task->main_channel_client->replstate = SLAVE_STATE_SEND_BULK_AND_STREAM;

    task->state = ASM_SEND_BULK_AND_STREAM;
    task->rdb_channel_state = ASM_RDBCHANNEL_TRANSFER;

    /* From the source node's perspective, the destination node begins to accumulate
     * the buffer while the RDB channel starts applying the slot snapshot data. */
    task->dest_state = ASM_ACCUMULATE_BUF;
    task->dest_slots_snapshot_time = server.mstime;
}

/* Called when the RDB channel has succeeded in sending the snapshot. */
void asmSlotSnapshotSucceed(struct asmTask *task) {
    if (task == NULL || task->state != ASM_SEND_BULK_AND_STREAM) return;

    /* The destination starts sending ACKs to keep the main channel alive after
     * receiving the snapshot, so here we need to update the last interaction
     * time to avoid false timeout. */
    task->main_channel_client->lastinteraction = server.unixtime;

    task->state = ASM_SEND_STREAM;
    task->rdb_channel_state = ASM_COMPLETED;
}

/* Called when the RDB channel fails to send the snapshot. */
void asmSlotSnapshotFailed(struct asmTask *task) {
    if (task == NULL || task->state != ASM_SEND_BULK_AND_STREAM) return;

    asmTaskSetFailed(task, "RDB channel - Failed to send slots snapshot");
}

/* CLUSTER SYNCSLOTS SNAPSHOT-EOF
 *
 * This command is sent by the source node to the destination node to indicate
 * that the slots snapshot has ended. */
void clusterSyncSlotsSnapshotEOF(client *c) {
    /* This client is RDB channel connection. */
    asmTask *task = c->task;
    if (!task || task->rdb_channel_state != ASM_RDBCHANNEL_TRANSFER ||
        c->conn != task->rdb_channel_conn)
    {
        /* Unexpected SNAPSHOT-EOF command */
        serverLog(LL_WARNING, "Unexpected CLUSTER SYNCSLOTS SNAPSHOT-EOF command: "
                  "rdb_channel_state=%s",
                  asmTaskStateToString(task ? task->rdb_channel_state : ASM_NONE));
        freeClientAsync(c);
        return;
    }

    /* RDB channel state: ASM_RDBCHANNEL_TRANSFER */
    if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_RDB_CHANNEL, task->rdb_channel_state))) {
        freeClientAsync(c); /* Simulate a failure */
        return;
    }

    /* Clear the RDB channel connection */
    task->rdb_channel_conn = NULL;
    task->rdb_channel_state = ASM_COMPLETED;
    serverLog(LL_NOTICE, "RDB channel snapshot transfer completed for the import task.");

    /* Free the RDB channel connection. */
    c->task = NULL;
    c->flags &= ~CLIENT_MASTER;
    freeClientAsync(c);

    /* Will start streaming the buffer to DB, don't start here since now
     * we are in the context of executing command, otherwise, redis will
     * generate a big MULTI-EXEC including all the commands in the buffer.
     * just update the state here, and do it in beforeSleep(). */
    task->state = ASM_READY_TO_STREAM;
    connSetReadHandler(task->main_channel_conn, NULL);
}

/* CLUSTER SYNCSLOTS STREAM-EOF
 *
 * This command is sent by the source node to the destination node to indicate
 * that the slot sync stream has ended and the slots can be handed off. */
void clusterSyncSlotsStreamEOF(client *c) {
    asmTask *task = c->task;

    if (!task || task->operation != ASM_IMPORT) {
        serverLog(LL_WARNING, "Unexpected CLUSTER SYNCSLOTS STREAM-EOF command");
        freeClientAsync(c);
        return;
    }

    if (task->state == ASM_STREAMING_BUF) {
        /* We are still streaming the buffer to DB, mark the EOF received, and we
         * can take over after streaming is EOF. Since we may release the context
         * in asmImportTakeover, this breaks the context for streaming buffer. */
        task->stream_eof_during_streaming = 1;
        serverLog(LL_NOTICE, "CLUSTER SYNCSLOTS STREAM-EOF received during streaming buffer");
        return;
    }

    if (task->state != ASM_WAIT_STREAM_EOF) {
        serverLog(LL_WARNING, "Unexpected CLUSTER SYNCSLOTS STREAM-EOF state: %s",
                  asmTaskStateToString(task->state));
        freeClientAsync(c);
        return;
    }
    serverLog(LL_NOTICE, "CLUSTER SYNCSLOTS STREAM-EOF received when waiting for STREAM-EOF");

    /* STREAM-EOF received, the source is ready to handoff, takeover now. */
    asmImportTakeover(task);
}

/* Start the import task. */
static void asmStartImportTask(asmTask *task) {
    if (task->operation != ASM_IMPORT || task->state != ASM_NONE) return;
    sds slots_str = slotRangeArrayToString(task->slots);

    /* Sanity check: Clean up any keys that exist in slots not owned by this node.
     * This handles cases where users previously migrated slots using legacy method
     * but left behind orphaned keys, or maybe cluster missed cleaning up during
     * previous operations, which could interfere with the ASM import process. */
    asmTrimSlotsIfNotOwned(task->slots);

    /* Check if there is any trim job in progress for the slot ranges.
     * We can't start the import task since the trim job will modify the data.*/
    int trim_in_progress = asmIsAnyTrimJobOverlaps(task->slots);

    /* Notify the cluster implementation to prepare for the import task. */
    int impl_ret = clusterAsmOnEvent(task->id, ASM_EVENT_IMPORT_PREP, task->slots);

    /* We do not start the import task if trim is disabled by module. */
    int disabled_by_module = server.cluster_module_trim_disablers > 0;

    /* Remember whether the "blocked" condition was already logged so we only
     * log it once per blocked period (this is retried from cron). */
    static int start_blocked_logged = 0;
    /* Cannot start import task since pause action is performed. Otherwise, we
     * will break the promise that no writes are performed during the pause. */
    if (isPausedActions(PAUSE_ACTION_CLIENT_ALL) ||
        isPausedActions(PAUSE_ACTION_CLIENT_WRITE) ||
        trim_in_progress ||
        impl_ret != C_OK ||
        disabled_by_module)
    {
        const char *reason = disabled_by_module ? "trim is disabled by module" :
                             impl_ret != C_OK ? "cluster is not ready" :
                             trim_in_progress ? "trim in progress for some of the slots" :
                             "server paused";
        if (start_blocked_logged == 0) {
            serverLog(LL_WARNING, "Can not start import task %s for slots: %s due to %s",
                      task->id, slots_str, reason);
            start_blocked_logged = 1;
        }
        sdsfree(slots_str);
        return;
    }
    start_blocked_logged = 0; /* Reset the log flag */

    /* Detect if the cluster topology is changed. We should cancel the task if
     * we can not schedule it, and update the source node if needed. */
    sds err = NULL;
    clusterNode *source = validateImportSlotRanges(task->slots, &err, task);
    if (!source) {
        asmTaskCancel(task, err);
        sdsfree(slots_str);
        sdsfree(err);
        return;
    }
    /* Now I'm the owner of the slot range, cancel the import task. */
    if (source == getMyClusterNode()) {
        asmTaskCancel(task, "slots owned by myself now");
        sdsfree(slots_str);
        return;
    }
    /* Change the source node if needed. */
    if (source != task->source_node) {
        task->source_node = source;
        memcpy(task->source, clusterNodeGetName(source), CLUSTER_NAMELEN);
        serverLog(LL_NOTICE, "Import task %s source node changed: slots=%s, "
                  "new_source=%.40s", task->id, slots_str, clusterNodeGetName(source));
    }
    sdsfree(slots_str);

    task->state = ASM_CONNECTING;
    task->start_time = server.mstime;
    asmNotifyStateChange(task, ASM_EVENT_IMPORT_STARTED);

    /* Open the main channel to the source; the handshake continues in
     * asmSyncWithSource() as connection events fire. */
    task->main_channel_conn = connCreate(server.el, connTypeOfReplication());
    char *ip = clusterNodeIp(task->source_node);
    int port = server.tls_replication ? clusterNodeTlsPort(task->source_node) :
                                        clusterNodeTcpPort(task->source_node);
    if (connConnect(task->main_channel_conn, ip, port, server.bind_source_addr,
                    asmSyncWithSource) == C_ERR)
    {
        asmTaskSetFailed(task, "Main channel - Failed to connect to source node: %s",
                         connGetLastError(task->main_channel_conn));
        return;
    }
    connSetPrivateData(task->main_channel_conn, task);
}

void clusterSyncSlotsCommand(client *c) {
    /* Only internal clients are allowed to execute this command to avoid
     * potential attack, since some state changes are not well protected,
     * external clients may damage the slot migration state. */
    if (!(c->flags & (CLIENT_INTERNAL | CLIENT_MASTER))) {
        addReplyError(c, "CLUSTER SYNCSLOTS subcommands are only allowed for internal clients");
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return;
    }

    /* On replica, only allow master client to execute CONF subcommand. */
    if (!clusterNodeIsMaster(getMyClusterNode())) {
        if (!(c->flags & CLIENT_MASTER)) {
            /* Not master client, reject all subcommands and close the connection. */
            addReplyError(c, "CLUSTER SYNCSLOTS subcommands are only allowed for master");
            c->flags |= CLIENT_CLOSE_AFTER_REPLY;
            return;
        } else {
            /* Only allow CONF subcommand on replica. */
            if (strcasecmp(c->argv[2]->ptr, "conf")) return;
        }
    }

    if (!strcasecmp(c->argv[2]->ptr, "sync") && c->argc >= 6) {
        /* CLUSTER SYNCSLOTS SYNC <ID> <start-slot> <end-slot> [<start-slot> <end-slot>] */
        if (c->argc % 2 == 1) {
            addReplyErrorArity(c);
            return;
        }

        slotRangeArray *slots = parseSlotRangesOrReply(c, c->argc, 4);
        if (!slots) return;

        /* Validate that the slot ranges are valid and that migration can be
         * initiated for them. */
        sds err = NULL;
        clusterNode *source = validateImportSlotRanges(slots, &err, NULL);
        if (!source) {
            addReplyErrorSds(c, err);
            slotRangeArrayFree(slots);
            return;
        }

        /* Check if the source node is the same as the current node. */
        if (source != getMyClusterNode()) {
            addReplyError(c, "This node is not the owner of the slots");
            slotRangeArrayFree(slots);
            return;
        }

        /* Verify the destination node is known and is a master. */
        if (c->node_id) {
            clusterNode *dest = clusterLookupNode(c->node_id, CLUSTER_NAMELEN);
            if (dest == NULL || !clusterNodeIsMaster(dest)) {
                addReplyErrorFormat(c, "Destination node %.40s is not a master", c->node_id);
                slotRangeArrayFree(slots);
                return;
            }
        }

        sds task_id = c->argv[3]->ptr;
        /* Notify the cluster implementation to prepare for the migrate task. */
        if (clusterAsmOnEvent(task_id, ASM_EVENT_MIGRATE_PREP, slots) != C_OK ||
            asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, ASM_NONE))
        {
            addReplyError(c, "-NOTREADY Cluster is not ready to migrate slots");
            slotRangeArrayFree(slots);
            return;
        }

        /* We do not start the migrate task if trim is disabled by module. */
        int disabled_by_module = server.cluster_module_trim_disablers > 0;
        if (disabled_by_module) {
            addReplyError(c, "Trim is disabled by module");
            slotRangeArrayFree(slots);
            return;
        }

        asmTask *task = listLength(asmManager->tasks) == 0 ? NULL :
                        listNodeValue(listFirst(asmManager->tasks));
        /* NOTE(review): c->node_id may be NULL here (it is guarded with
         * "if (c->node_id)" above and below) — confirm memcmp() cannot be
         * reached with a NULL node_id. */
        if (task && !strcmp(task->id, task_id) &&
            task->operation == ASM_MIGRATE && task->state == ASM_FAILED &&
            slotRangeArrayIsEqual(slots, task->slots) &&
            memcmp(task->dest, c->node_id, CLUSTER_NAMELEN) == 0)
        {
            /* Reuse the failed task */
            asmTaskReset(task);
            slotRangeArrayFree(task->slots); /* Will be set again later */
            task->retry_count++;
        } else if (task) {
            if (task->state == ASM_FAILED) {
                /* We can create a new migrate task only if the current one is
                 * failed, cancel the failed task to create a new one. */
                asmTaskCancel(task, "new migration requested");
                task = NULL;
            } else {
                addReplyError(c, "Another ASM task is already in progress");
                slotRangeArrayFree(slots);
                return;
            }
        }

        /* Create the migrate slots task and add it to the list,
         * otherwise reuse the existing one */
        if (task == NULL) {
            task = asmTaskCreate(task_id);
            task->start_time = server.mstime; /* Start immediately */
            serverAssert(listLength(asmManager->tasks) == 0);
            listAddNodeTail(asmManager->tasks, task);
        }

        task->slots = slots;
        task->operation = ASM_MIGRATE;
        memcpy(task->source, clusterNodeGetName(getMyClusterNode()), CLUSTER_NAMELEN);
        if (c->node_id) memcpy(task->dest, c->node_id, CLUSTER_NAMELEN);

        task->main_channel_client = c;
        c->task = task;

        /* We mark the main channel client as a replica, so this client is limited
         * by the client output buffer settings for replicas. The replstate has
         * no real significance, just to prevent it from going online. */
        c->flags |= (CLIENT_SLAVE | CLIENT_ASM_MIGRATING);
        c->replstate = SLAVE_STATE_WAIT_RDB_CHANNEL;
        if (server.repl_disable_tcp_nodelay)
            connDisableTcpNoDelay(c->conn); /* Non-critical if it fails. */
        listAddNodeTail(server.slaves, c);
        createReplicationBacklogIfNeeded();

        /* Wait for RDB channel to be ready */
        task->state = ASM_WAIT_RDBCHANNEL;

        sds slots_str = slotRangeArrayToString(slots);
        serverLog(LL_NOTICE, "Migrate task %s created: src=%.40s, dest=%.40s, slots=%s",
                  task->id, task->source, task->dest, slots_str);
        sdsfree(slots_str);

        asmNotifyStateChange(task, ASM_EVENT_MIGRATE_STARTED);

        /* Keep the client in the main thread to avoid data races between the
         * connWrite call below and the client's event handler in IO threads. */
        if (c->tid != IOTHREAD_MAIN_THREAD_ID) keepClientInMainThread(c);

        /* addReply*() is not suitable for clients in SLAVE_STATE_WAIT_RDB_CHANNEL state. */
        if (connWrite(c->conn, "+RDBCHANNELSYNCSLOTS\r\n", 22) != 22)
            freeClientAsync(c);
    } else if (!strcasecmp(c->argv[2]->ptr, "rdbchannel") && c->argc == 4) {
        /* CLUSTER SYNCSLOTS RDBCHANNEL <task-id> */
        sds task_id = c->argv[3]->ptr;
        if (sdslen(task_id) != CLUSTER_NAMELEN) {
            addReplyError(c, "Invalid task id");
            return;
        }

        if (listLength(asmManager->tasks) == 0) {
            addReplyError(c, "No slot migration task in progress");
            return;
        }

        asmTask *task = listNodeValue(listFirst(asmManager->tasks));
        if (task->operation != ASM_MIGRATE || task->state != ASM_WAIT_RDBCHANNEL ||
            strcmp(task->id, task_id) != 0)
        {
            addReplyError(c, "Another migration task is already in progress");
            return;
        }

        if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, task->state))) {
            /* Close the main channel client before rdb channel client connects */
            if (task->main_channel_client)
                freeClient(task->main_channel_client);
        }

        /* The main channel client must be present when setting RDB channel client */
        if (task->main_channel_client == NULL) {
            /* Maybe the main channel connection is closed. */
            addReplyError(c, "Main channel connection is not established");
            return;
        }

        /* Mark the client as a slave to generate slots snapshot */
        c->flags |= (CLIENT_SLAVE | CLIENT_REPL_RDB_CHANNEL | CLIENT_REPL_RDBONLY | CLIENT_ASM_MIGRATING);
        c->slave_capa |= SLAVE_CAPA_EOF;
        c->slave_req |= (SLAVE_REQ_SLOTS_SNAPSHOT | SLAVE_REQ_RDB_CHANNEL);
        c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
        c->repldbfd = -1;
        if (server.repl_disable_tcp_nodelay)
            connDisableTcpNoDelay(c->conn); /* Non-critical if it fails.
*/ + listAddNodeTail(server.slaves, c); + + /* Wait for bgsave to start for slots sync */ + task->state = ASM_WAIT_BGSAVE_START; + task->rdb_channel_state = ASM_WAIT_BGSAVE_START; + task->rdb_channel_client = c; + c->task = task; + + /* Keep the client in the main thread to avoid data races between the + * connWrite call in startBgsaveForReplication and the client's event + * handler in IO threads. */ + if (c->tid != IOTHREAD_MAIN_THREAD_ID) keepClientInMainThread(c); + + if (!hasActiveChildProcess()) { + startBgsaveForReplication(c->slave_capa, c->slave_req); + } else { + serverLog(LL_NOTICE, "BGSAVE for slots snapshot sync delayed"); + } + } else if (!strcasecmp(c->argv[2]->ptr, "snapshot-eof") && c->argc == 3) { + /* CLUSTER SYNCSLOTS SNAPSHOT-EOF */ + clusterSyncSlotsSnapshotEOF(c); + } else if (!strcasecmp(c->argv[2]->ptr, "stream-eof") && c->argc == 3) { + /* CLUSTER SYNCSLOTS STREAM-EOF */ + clusterSyncSlotsStreamEOF(c); + } else if (!strcasecmp(c->argv[2]->ptr, "ack") && c->argc == 5) { + /* CLUSTER SYNCSLOTS ACK <state> <offset> */ + long long offset; + int dest_state; + + if (!strcasecmp(c->argv[3]->ptr, asmTaskStateToString(ASM_STREAMING_BUF))) { + dest_state = ASM_STREAMING_BUF; + } else if (!strcasecmp(c->argv[3]->ptr, asmTaskStateToString(ASM_WAIT_STREAM_EOF))) { + dest_state = ASM_WAIT_STREAM_EOF; + } else { + return; /* Not support now. */ + } + + if ((getLongLongFromObject(c->argv[4], &offset) != C_OK)) + return; + + if (c->task && c->task->operation == ASM_MIGRATE) { + /* Update the state and ACKed offset from destination. 
*/ + asmTask *task = c->task; + task->dest_state = dest_state; + if (task->dest_offset > (unsigned long long) offset) { + serverLog(LL_WARNING, "CLUSTER SYNCSLOTS ACK received, dest state: %s, " + "but offset %lld is less than the current dest offset %lld", + asmTaskStateToString(dest_state), offset, task->dest_offset); + return; + } + task->dest_offset = offset; + serverLog(LL_DEBUG, "CLUSTER SYNCSLOTS ACK received, dest state: %s, " + "updated dest offset to %lld, source offset: %lld", + asmTaskStateToString(dest_state), task->dest_offset, task->source_offset); + + /* Record the time when the destination finishes applying the accumulated buffer */ + if (task->dest_state == ASM_WAIT_STREAM_EOF && task->dest_accum_applied_time == 0) + task->dest_accum_applied_time = server.mstime; + + /* Pause write if needed */ + if (task->state == ASM_SEND_BULK_AND_STREAM || task->state == ASM_SEND_STREAM) { + /* Pause writes on the main channel if the lag is less than the threshold. */ + if (task->dest_offset + server.asm_handoff_max_lag_bytes >= task->source_offset) { + if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, ASM_HANDOFF_PREP))) + return; /* Do not enter handoff prep state for testing buffer drain timeout. */ + + serverLog(LL_NOTICE, "The applied offset lag %lld is less than the threshold %lld, " + "pausing writes for slot handoff", + task->source_offset - task->dest_offset, + server.asm_handoff_max_lag_bytes); + task->state = ASM_HANDOFF_PREP; + asmLogTaskEvent(task, ASM_EVENT_HANDOFF_PREP); + clusterAsmOnEvent(task->id, ASM_EVENT_HANDOFF_PREP, task->slots); + } + } + } + } else if (!strcasecmp(c->argv[2]->ptr, "fail") && c->argc == 4) { + /* CLUSTER SYNCSLOTS FAIL <err> */ + return; /* This is a no-op, just to handle the command syntax. 
*/ + } else if (!strcasecmp(c->argv[2]->ptr, "conf") && c->argc >= 5) { + /* CLUSTER SYNCSLOTS CONF <option> <value> [<option> <value>] */ + for (int j = 3; j < c->argc; j += 2) { + if (j + 1 >= c->argc) { + addReplyErrorArity(c); + return; + } + /* Handle each option here */ + if (!strcasecmp(c->argv[j]->ptr, "node-id")) { + /* node-id <node-id> */ + sds node_id = c->argv[j + 1]->ptr; + int node_id_len = (int) sdslen(node_id); + if (node_id_len != CLUSTER_NAMELEN) { + addReplyErrorFormat(c, "Invalid node id length %d", node_id_len); + return; + } + + /* Lookup the node in the cluster. */ + clusterNode *node = clusterLookupNode(node_id, node_id_len); + if (node == NULL) { + addReplyErrorFormat(c, "Node %s not found in cluster", node_id); + return; + } + + if (c->node_id) sdsfree(c->node_id); + c->node_id = sdsdup(node_id); + } else if (!strcasecmp(c->argv[j]->ptr, "slot-info")) { + /* slot-info slot:key_size:expire_size */ + int count; + long long slot, key_size, expire_size; + sds slot_info = c->argv[j + 1]->ptr; + sds *parts = sdssplitlen(slot_info, sdslen(slot_info), ":", 1, &count); + + /* Validate the slot info format, parse slot, key_size, expire_size */ + if (parts == NULL || count != 3 || + (string2ll(parts[0], sdslen(parts[0]), &slot) == 0 || slot < 0 || slot >= CLUSTER_SLOTS) || + (string2ll(parts[1], sdslen(parts[1]), &key_size) == 0 || key_size < 0) || + (string2ll(parts[2], sdslen(parts[2]), &expire_size) == 0 || expire_size < 0)) + { + addReplyErrorFormat(c, "Invalid slot info: %s", slot_info); + sdsfreesplitres(parts, count); + return; + } + + /* We resize individual slot specific dictionaries. */ + redisDb *db = c->db; + serverAssert(db->id == 0); /* Only support DB 0 for cluster mode. 
*/ + kvstoreDictExpand(db->keys, slot, key_size); + kvstoreDictExpand(db->expires, slot, expire_size); + + sdsfreesplitres(parts, count); + } else if (!strcasecmp(c->argv[j]->ptr, "asm-task")) { + /* asm-task task_id:source_node:dest_node:operation:state:slot_ranges */ + if (clusterNodeIsMaster(getMyClusterNode())) { + addReplyError(c, "CLUSTER SYNCSLOTS CONF ASM-TASK only allowed on replica"); + return; + } + if (asmReplicaHandleMasterTask(c->argv[j + 1]->ptr) != C_OK) { + addReplyErrorFormat(c, "Failed to handle master task: %s", + (char *)c->argv[j + 1]->ptr); + } + } else if (!strcasecmp(c->argv[j]->ptr, "capa")) { + /* Ignore unrecognized capabilities. This is for future extensions. */ + } else { + addReplyErrorFormat(c, "Unknown option %s", (char *)c->argv[j]->ptr); + } + } + addReply(c, shared.ok); + } else { + addReplyErrorObject(c, shared.syntaxerr); + } +} + +/* Save a key-value pair to stream I/O using either RESTORE or AOF format. */ +static int slotSnapshotSaveKeyValuePair(rio *rdb, kvobj *o, int dbid) { + /* Get the expire time */ + long long expiretime = kvobjGetExpire(o); + + /* Set on stack string object for key */ + robj key; + initStaticStringObject(key, kvobjGetKey(o)); + + /* If module object or non-string object that is not too big, + * use RESTORE command (RDB format) to migrate data. + * Generally RDB binary format is more efficient, but it may cause + * block in the destination if the object is too large, so fall back + * to AOF format if necessary. */ + if ((o->type == OBJ_MODULE) || + (o->type != OBJ_STRING && getObjectLength(o) <= ASM_AOF_MIN_ITEMS_PER_KEY)) + { + if (rioWriteBulkCount(rdb, '*', 5) == 0) return C_ERR; + if (rioWriteBulkString(rdb, "RESTORE", 7) == 0) return C_ERR; + if (rioWriteBulkObject(rdb, &key) == 0) return C_ERR; + if (rioWriteBulkLongLong(rdb, expiretime == -1 ? 0 : expiretime) == 0) return C_ERR; + + /* Create the DUMP encoded representation. 
*/ + rio payload; + createDumpPayload(&payload, o, &key, dbid, 1); + sds buf = payload.io.buffer.ptr; + if (rioWriteBulkString(rdb, buf, sdslen(buf)) == 0) { + sdsfree(payload.io.buffer.ptr); + return C_ERR; + } + sdsfree(payload.io.buffer.ptr); + + /* Write ABSTTL */ + if (rioWriteBulkString(rdb, "ABSTTL", 6) == 0) return C_ERR; + } else { + /* Use AOF format to migrate data */ + if (rewriteObject(rdb, &key, o, dbid, expiretime) == C_ERR) return C_ERR; + } + + return C_OK; +} + +/* Modules can use RM_ClusterPropagateForSlotMigration() during the + * CLUSTER_SLOT_MIGRATION_MIGRATE_MODULE_PROPAGATE event to propagate commands + * that should be delivered just before the slot snapshot delivery starts. This + * function triggers the event, collects the commands and writes them to the rio. */ +static int propagateModuleCommands(asmTask *task, rio *rdb) { + RedisModuleClusterSlotMigrationInfo info = { + .version = REDISMODULE_CLUSTER_SLOT_MIGRATION_INFO_VERSION, + .task_id = task->id, + .slots = (RedisModuleSlotRangeArray *) task->slots + }; + memcpy(info.source_node_id, task->source, CLUSTER_NAMELEN); + memcpy(info.destination_node_id, task->dest, CLUSTER_NAMELEN); + + task->pre_snapshot_module_cmds = zcalloc(sizeof(*task->pre_snapshot_module_cmds)); + moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION, + REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_MODULE_PROPAGATE, + &info + ); + + int ret = C_OK; + /* Write the module commands to the rio */ + for (int i = 0; i < task->pre_snapshot_module_cmds->numops; i++) { + redisOp *op = &task->pre_snapshot_module_cmds->ops[i]; + if (rioWriteBulkCount(rdb, '*', op->argc) == 0) { + ret = C_ERR; + break; + } + for (int j = 0; j < op->argc; j++) + if (rioWriteBulkObject(rdb, op->argv[j]) == 0) { + ret = C_ERR; + break; + } + } + redisOpArrayFree(task->pre_snapshot_module_cmds); + zfree(task->pre_snapshot_module_cmds); + task->pre_snapshot_module_cmds = NULL; + return ret; +} + +/* Save the slot ranges snapshot to 
the file. It generates the DUMP encoded + * representation of each key in the slot ranges and writes it to the file. + * + * Returns C_OK on success, or C_ERR on error. */ +int slotSnapshotSaveRio(int req, rio *rdb, int *error) { + serverAssert(req & SLAVE_REQ_SLOTS_SNAPSHOT); + + dictEntry *de; + kvstoreDictIterator kvs_di; + + if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_RDB_CHANNEL, ASM_SEND_BULK_AND_STREAM))) + rioAbort(rdb); /* Simulate a failure */ + + /* Disable RDB compression for slots snapshot since compression is too + * expensive both in source and destination. */ + server.rdb_compression = 0; + + /* Only support a single migrate task */ + serverAssert(listLength(asmManager->tasks) == 1); + asmTask *task = listNodeValue(listFirst(asmManager->tasks)); + serverAssert(task->operation == ASM_MIGRATE); + + if (propagateModuleCommands(task, rdb) == C_ERR) goto werr; + + /* Dump functions and send to destination side. */ + rio payload; + createFunctionDumpPayload(&payload); + sds functions = payload.io.buffer.ptr; + if (rioWriteBulkCount(rdb, '*', 4) == 0) goto werr; + if (rioWriteBulkString(rdb, "FUNCTION", 8) == 0) goto werr; + if (rioWriteBulkString(rdb, "RESTORE", 7) == 0) goto werr; + if (rioWriteBulkString(rdb, functions, sdslen(functions)) == 0) { + sdsfree(payload.io.buffer.ptr); + goto werr; + } + sdsfree(payload.io.buffer.ptr); + /* Add the REPLACE option to the RESTORE command, to avoid error + * when migrating to a node with existing libraries. */ + if (rioWriteBulkString(rdb, "REPLACE", 7) == 0) goto werr; + + for (int i = 0; i < server.dbnum; i++) { + char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; + redisDb *db = server.db + i; + if (kvstoreSize(db->keys) == 0) continue; + + /* SELECT the new DB */ + if (rioWrite(rdb,selectcmd,sizeof(selectcmd)-1) == 0) goto werr; + if (rioWriteBulkLongLong(rdb, i) == 0) goto werr; + + /* Iterate all slot ranges, and generate the DUMP encoded + * representation of each key in the DB. 
*/ + for (int j = 0; j < task->slots->num_ranges; j++) { + slotRange *sr = &task->slots->ranges[j]; + /* Iterate all keys in the slot range */ + for (int k = sr->start; k <= sr->end; k++) { + int send_slot_info = 0; + + kvstoreInitDictIterator(&kvs_di, server.db->keys, k); + while ((de = kvstoreDictIteratorNext(&kvs_di)) != NULL) { + /* Send slot info before the first key in the slot */ + if (!send_slot_info) { + /* Format slot info */ + char buf[128]; + int len = snprintf(buf, sizeof(buf), "%d:%lu:%lu", + k, kvstoreDictSize(db->keys, k), + kvstoreDictSize(db->expires, k)); + serverAssert(len > 0 && len < (int)sizeof(buf)); + + /* Send slot info */ + if (rioWriteBulkCount(rdb, '*', 5) == 0) goto werr2; + if (rioWriteBulkString(rdb, "CLUSTER", 7) == 0) goto werr2; + if (rioWriteBulkString(rdb, "SYNCSLOTS", 9) == 0) goto werr2; + if (rioWriteBulkString(rdb, "CONF", 4) == 0) goto werr2; + if (rioWriteBulkString(rdb, "SLOT-INFO", 9) == 0) goto werr2; + if (rioWriteBulkString(rdb, buf, len) == 0) goto werr2; + send_slot_info = 1; + } + + /* Save a key-value pair */ + kvobj *o = dictGetKV(de); + if (slotSnapshotSaveKeyValuePair(rdb, o, db->id) == C_ERR) goto werr2; + + /* Delay return if required (for testing) */ + if (unlikely(server.rdb_key_save_delay)) { + /* Send buffer to the destination ASAP. 
*/ + if (rioFlush(rdb) == 0) goto werr2; + debugDelay(server.rdb_key_save_delay); + } + } + kvstoreResetDictIterator(&kvs_di); + } + } + } + + /* Write the end of the snapshot file command */ + if (rioWriteBulkCount(rdb, '*', 3) == 0) goto werr; + if (rioWriteBulkString(rdb, "CLUSTER", 7) == 0) goto werr; + if (rioWriteBulkString(rdb, "SYNCSLOTS", 9) == 0) goto werr; + if (rioWriteBulkString(rdb, "SNAPSHOT-EOF", 12) == 0) goto werr; + return C_OK; + +werr2: + kvstoreResetDictIterator(&kvs_di); +werr: + if (error) *error = errno; + return C_ERR; +} + +/* Read error handler for sync buffer */ +static void asmReadSyncBufferErrorHandler(connection *conn) { + if (listLength(asmManager->tasks) == 0) return; + asmTask *task = listNodeValue(listFirst(asmManager->tasks)); + if (task->state != ASM_ACCUMULATE_BUF && task->state != ASM_STREAMING_BUF) return; + + if (task->state == ASM_STREAMING_BUF) { + freeClient(connGetPrivateData(conn)); + } else { + asmTaskSetFailed(task, "Main channel - Read error: %s", connGetLastError(conn)); + } +} + +/* Read data from connection into sync buffer. */ +static void asmSyncBufferReadFromConn(connection *conn) { + /* The task may be canceled (move to finished list) or failed during streaming buffer. */ + if (listLength(asmManager->tasks) == 0) return; + asmTask *task = listNodeValue(listFirst(asmManager->tasks)); + if (task->state != ASM_ACCUMULATE_BUF && task->state != ASM_STREAMING_BUF) return; + + /* ASM_ACCUMULATE_BUF and ASM_STREAMING_BUF fail points are handled here */ + if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_MAIN_CHANNEL, task->state))) + shutdown(conn->fd, SHUT_RDWR); + + replDataBuf *buf = &task->sync_buffer; + if (task->state == ASM_STREAMING_BUF) { + /* While streaming accumulated buffers, we continue reading from the + * source to prevent accumulation on source side as much as possible. + * However, we aim to drain buffer eventually. 
To ensure we consume more + * than we read, we'll read at most one block after two blocks of + * buffers are consumed. */ + if (listLength(buf->blocks) + 1 >= buf->last_num_blocks) + return; + buf->last_num_blocks = listLength(buf->blocks); + } + + replDataBufReadFromConn(conn, buf, asmReadSyncBufferErrorHandler); +} + +static void asmSyncBufferStreamYieldCallback(void *ctx) { + replDataBufToDbCtx *context = ctx; + asmTask *task = context->privdata; + client *c = context->client; + + char offset[64]; + ull2string(offset, sizeof(offset), context->applied_offset); + + char *err = sendCommand(c->conn, "CLUSTER", "SYNCSLOTS", "ACK", + asmTaskStateToString(task->state), offset, NULL); + if (err) { + serverLog(LL_WARNING, "Error sending CLUSTER SYNCSLOTS ACK: %s", err); + sdsfree(err); + freeClient(c); + } + serverLog(LL_DEBUG, "Yielding sending ACK during streaming buffer, applied offset: %zu", + context->applied_offset); +} + +static int asmSyncBufferStreamShouldContinue(void *ctx) { + replDataBufToDbCtx *context = ctx; + + /* If the task is failed or canceled, we should stop streaming immediately. */ + asmTask *task = context->privdata; + if (task->state == ASM_FAILED || task->state == ASM_CANCELED) return 0; + + /* Check the client-close flag only if the task has not failed or been canceled, + * otherwise the client may have already been freed. */ + if (context->client->flags & CLIENT_CLOSE_ASAP) return 0; + + return 1; +} + +/* Stream the sync buffer to the database. */ +void asmSyncBufferStreamToDb(asmTask *task) { + task->state = ASM_STREAMING_BUF; + serverLog(LL_NOTICE, "Starting to stream accumulated buffer for the import task (%zu bytes)", + task->sync_buffer.used); + + /* The buffered stream from the main channel connection into + * the database is processed by a fake client. 
*/ + client *c = createClient(task->main_channel_conn); + c->flags |= (CLIENT_MASTER | CLIENT_INTERNAL | CLIENT_ASM_IMPORTING); + c->querybuf = sdsempty(); + c->authenticated = 1; + c->user = NULL; + c->task = task; + + /* Mark the peek buffer block count. We'll use it to verify we consume + * faster than we read from the source side. */ + task->sync_buffer.last_num_blocks = listLength(task->sync_buffer.blocks); + + /* Continue accumulating during streaming to prevent accumulation on source side. */ + connSetReadHandler(c->conn, asmSyncBufferReadFromConn); + + replDataBufToDbCtx ctx = { + .privdata = task, + .client = c, + .applied_offset = 0, + .should_continue = asmSyncBufferStreamShouldContinue, + .yield_callback = asmSyncBufferStreamYieldCallback, + }; + + /* Start streaming the buffer to the DB. This task may fail due to network + * errors or cancellations. We never release the task immediately; instead, + * it may be moved to the finished list. The actual free happens in serverCron, + * which ensures there is no use-after-free issue. */ + int ret = replDataBufStreamToDb(&task->sync_buffer, &ctx); + + if (ret == C_OK) { + if (task->stream_eof_during_streaming) { + /* STREAM-EOF received during streaming, we can take over now. */ + asmImportTakeover(task); + return; + } + + /* Update the dest offset according to applied bytes. */ + task->dest_offset = ctx.applied_offset; + /* Wait STREAM-EOF from the source node. */ + task->state = ASM_WAIT_STREAM_EOF; + connSetReadHandler(task->main_channel_conn, readQueryFromClient); + serverLog(LL_NOTICE, "Successfully streamed accumulated buffer for the import task, applied offset: %lld", + task->dest_offset); + + if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_MAIN_CHANNEL, task->state))) + shutdown(task->main_channel_conn->fd, SHUT_RDWR); /* Simulate a failure */ + + /* ACK offset after streaming buffer is done. 
*/ + asmImportSendACK(task); + } else { + /* If the task is already canceled or failed, we don't need to do anything here. */ + if (task->state == ASM_FAILED || task->state == ASM_CANCELED) return; + + asmTaskSetFailed(task, "Main channel - Failed to stream into the DB"); + } +} + +void asmImportIncrAppliedBytes(struct asmTask *task, size_t bytes) { + if (!task || task->state != ASM_WAIT_STREAM_EOF) return; + task->dest_offset += bytes; +} + +/* Send STREAM-EOF if the sync buffer stream is drained. */ +void asmSendStreamEofIfDrained(asmTask *task) { + client *c = task->main_channel_client; + + /* The command streams for slot ranges have been drained. */ + if (!clientHasPendingReplies(c)) { + serverLog(LL_NOTICE, "Slot migration command stream drained, sending STREAM-EOF to the destination"); + + if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, task->state))) + shutdown(c->conn->fd, SHUT_RDWR); + + /* Send STREAM-EOF to indicate the end of the stream. */ + char *err = sendCommand(c->conn, "CLUSTER", "SYNCSLOTS", "STREAM-EOF", NULL); + if (err) { + asmTaskSetFailed(task, "Main channel - Failed to send STREAM-EOF: %s", err); + sdsfree(err); + return; + } + + /* Even though the main channel client is no longer needed, we + * can't close it directly because the destination may still be + * sending ACKs over this connection. Instead, we leave it to the + * destination to close it. We just clear the task and client + * references */ + task->main_channel_client->task = NULL; + task->main_channel_client = NULL; + + /* There may be a delay to handle the disconnection of RDB channel, + * so we clear the task and client references here. 
         */
        if (task->rdb_channel_client != NULL) {
            task->rdb_channel_state = ASM_COMPLETED;
            task->rdb_channel_client->task = NULL;
            freeClientAsync(task->rdb_channel_client);
            task->rdb_channel_client = NULL;
        }

        task->state = ASM_STREAM_EOF;
    }
}

/* Called from beforeSleep(): drives per-event-loop ASM state transitions for
 * the single active import or migrate task (start/stream on import; handoff
 * pause-timeout checks and STREAM-EOF delivery on migrate). */
void asmBeforeSleep(void) {
    asmTrimJobProcessPending();

    if (listLength(asmManager->tasks) == 0) return;
    asmTask *task = listNodeValue(listFirst(asmManager->tasks));

    if (task->operation == ASM_IMPORT) {
        if (task->state == ASM_NONE)
            asmStartImportTask(task);
        else if (task->state == ASM_READY_TO_STREAM)
            asmSyncBufferStreamToDb(task);
    }

    if (task->operation == ASM_MIGRATE) {
        if (task->cross_slot_during_propagating) {
            asmTaskCancel(task, "propagating cross slot command");
            return;
        }

        if (task->state == ASM_HANDOFF) {
            /* To avoid long pause, we fail the task if the pause takes too long. */
            if (server.mstime - task->paused_time >= server.asm_write_pause_timeout) {
                asmTaskSetFailed(task, "Server paused timeout");
                return;
            }
            asmSendStreamEofIfDrained(task);
        } else if (task->state == ASM_STREAM_EOF) {
            /* In state ASM_STREAM_EOF (server is still paused), we are waiting
             * for the destination node to broadcast the slot ownership change.
             * But maybe the destination node is failed or network is not available,
             * the source node may be paused forever. So we fail the task if it
             * takes too long.
             *
             * NOTE: There is a tricky case where the destination node may advertise
             * ownership of the slot, causing a temporary configuration conflict.
             * However, the configuration will eventually converge. In most cases,
             * the destination node becomes the winner, since it bumps its config
             * epoch before taking over slot ownership. */
            if (server.mstime - task->paused_time >= server.asm_write_pause_timeout)
                asmTaskSetFailed(task, "Server paused timeout");
        }
    }
}

/* Periodic ASM housekeeping: retries failed imports, checks channel and
 * buffer-drain timeouts, and bounds the archived task list.
 * NOTE(review): the "% 10 == 0" retry cadence assumes this cron runs at
 * ~10 Hz — confirm against the caller's hz. */
void asmCron(void) {
    static unsigned long long asm_cron_runs = 0;
    asm_cron_runs++;

    if (listLength(asmManager->tasks) == 0) return;
    asmTask *task = listNodeValue(listFirst(asmManager->tasks));

    if (task->operation == ASM_IMPORT) {
        if (task->state == ASM_FAILED) {
            /* Retry every 1 second */
            if (asm_cron_runs % 10 == 0) {
                asmTaskReset(task);
                task->retry_count++;
                serverAssert(task->state == ASM_NONE);
                asmStartImportTask(task);
            }
        } else if (task->state == ASM_WAIT_STREAM_EOF) {
            if (asmImportSendACK(task) == C_ERR) return;

            /* Check if the main channel is timed out */
            client *c = connGetPrivateData(task->main_channel_conn);
            serverAssert(c->task == task);
            if (server.unixtime - c->lastinteraction > server.repl_timeout)
                asmTaskSetFailed(task, "Main channel - Connection timeout");
        } else if (task->state == ASM_ACCUMULATE_BUF &&
                   task->rdb_channel_state == ASM_RDBCHANNEL_TRANSFER)
        {
            /* Check if the RDB channel is timed out */
            client *c = connGetPrivateData(task->rdb_channel_conn);
            serverAssert(c->task == task);
            if (server.unixtime - c->lastinteraction > server.repl_timeout)
                asmTaskSetFailed(task, "RDB channel - Connection timeout");
        } else if (task->state == ASM_SEND_SYNCSLOTS) {
            /* Rare case: the source node replied to SYNCSLOTS with -NOTREADY
             * because it wasn't ready to start a migration. We'll retry
             * SYNCSLOTS every second instead of failing the attempt which could
             * trigger unnecessary cleanup in the cluster implementation. */
            if (asm_cron_runs % 10 == 0)
                asmSyncWithSource(task->main_channel_conn);
        }
    } else if (task->operation == ASM_MIGRATE) {
        if (task->state == ASM_SEND_STREAM) {
            /* Currently, we only need to check the main channel timeout when sending streams.
             * For RDB channel connections, the timeout is handled by the socket itself
             * during writes in slotSnapshotSaveRio. */
            if (server.unixtime - task->main_channel_client->lastinteraction > server.repl_timeout)
                asmTaskSetFailed(task, "Main channel - Connection timeout");

            /* After the destination applies the accumulated buffer, the source continues
             * sending commands for migrating slots. The destination keeps applying them,
             * but the gap remains above the acceptable limit, which may cause endless
             * synchronization. A timeout check is required to handle this case.
             *
             * The timeout is calculated as the maximum of two values:
             * - A configurable timeout (cluster-slot-migration-sync-buffer-drain-timeout) to
             *   avoid false positives.
             * - A dynamic timeout based on the time that the destination took to apply the
             *   slot snapshot and the accumulated buffer during slot snapshot delivery.
             *   The destination should be able to drain the remaining sync buffer in less
             *   time than this. We multiply it by 2 to be more conservative. */
            if (task->dest_state == ASM_WAIT_STREAM_EOF && task->dest_accum_applied_time &&
                server.mstime - task->dest_accum_applied_time >
                max(server.asm_sync_buffer_drain_timeout,
                    (task->dest_accum_applied_time - task->dest_slots_snapshot_time) * 2))
            {
                asmTaskSetFailed(task, "Sync buffer drain timeout");
            }
        }
    }

    /* Trim the archived tasks list if it grows too large */
    while (listLength(asmManager->archived_tasks) > (unsigned long)server.asm_max_archived_tasks) {
        asmTask *oldest = listNodeValue(listLast(asmManager->archived_tasks));
        asmTaskFree(oldest);
        listDelNode(asmManager->archived_tasks, listLast(asmManager->archived_tasks));
    }
}

/* Cancel a specific task if ID is provided, otherwise cancel all tasks.
 * Returns the number of tasks cancelled (0 if none / manager not initialized). */
int clusterAsmCancel(const char *task_id, const char *reason) {
    if (asmManager == NULL) return 0;

    if (task_id) {
        asmTask *task = asmLookupTaskById(task_id);
        if (!task) return 0; /* Not found */

        asmTaskCancel(task, reason);
        return 1;
    } else {
        int num_cancelled = 0;
        listIter li;
        listNode *ln;

        listRewind(asmManager->tasks, &li);
        while ((ln = listNext(&li)) != NULL) {
            asmTask *task = listNodeValue(ln);
            asmTaskCancel(task, reason);
            num_cancelled++;
        }
        return num_cancelled;
    }
}

/* Cancel all tasks that overlap with the given slot ranges.
 * If slots is NULL, cancel all tasks. Returns the number cancelled. */
int clusterAsmCancelBySlotRangeArray(struct slotRangeArray *slots, const char *reason) {
    if (asmManager == NULL) return 0;

    int num_cancelled = 0;
    listIter li;
    listNode *ln;
    listRewind(asmManager->tasks, &li);
    while ((ln = listNext(&li)) != NULL) {
        asmTask *task = listNodeValue(ln);
        if (!slots || slotRangeArraysOverlap(task->slots, slots)) {
            asmTaskCancel(task, reason);
            num_cancelled++;
        }
    }
    return num_cancelled;
}

/* Cancel the task that overlap with the given slot.
 * Returns 1 if a task was found and cancelled, 0 otherwise. */
int clusterAsmCancelBySlot(int slot, const char *reason) {
    slotRange req = {slot, slot};
    if (asmManager == NULL) return 0;

    /* Cancel it if found. */
    asmTask *task = lookupAsmTaskBySlotRange(&req);
    if (task) asmTaskCancel(task, reason);

    return task ? 1 : 0;
}

/* Cancel all tasks that involve the given node (as source or destination).
 * Returns the number of tasks cancelled. */
int clusterAsmCancelByNode(void *node, const char *reason) {
    if (asmManager == NULL || node == NULL) return 0;

    /* If the node to be deleted is myself, cancel all tasks. */
    clusterNode *n = node;
    if (n == getMyClusterNode()) return clusterAsmCancel(NULL, reason);

    int num_cancelled = 0;
    listIter li;
    listNode *ln;
    listRewind(asmManager->tasks, &li);
    while ((ln = listNext(&li)) != NULL) {
        asmTask *task = listNodeValue(ln);
        /* Cancel the task if the source node is the one to be deleted, or
         * the dest node is the one to be deleted. */
        if (task->source_node == n ||
            !memcmp(task->dest, clusterNodeGetName(n), CLUSTER_NAMELEN) ||
            !memcmp(task->source, clusterNodeGetName(n), CLUSTER_NAMELEN))
        {
            asmTaskCancel(task, reason);
            num_cancelled++;
        }
    }
    return num_cancelled;
}

/* Check if the slot is in an active ASM task. */
int isSlotInAsmTask(int slot) {
    slotRange req = {slot, slot};
    if (!asmManager) return 0;

    listIter li;
    listNode *ln;
    listRewind(asmManager->tasks, &li);
    while ((ln = listNext(&li)) != NULL) {
        asmTask *task = listNodeValue(ln);
        if (slotRangeArrayOverlaps(task->slots, &req))
            return 1;
    }
    return 0;
}

/* Check if the slot is in a pending trim job. It may happen if we can't trim
 * the slots immediately due to a write pause or when active trim is in progress. */
int isSlotInTrimJob(int slot) {
    slotRange req = {slot, slot};

    if (!asmManager || !asmIsTrimInProgress()) return 0;

    /* Check if the slot is in any pending trim job. */
    listIter li;
    listNode *ln;
    listRewind(asmManager->pending_trim_jobs, &li);
    while ((ln = listNext(&li)) != NULL) {
        slotRangeArray *slots = listNodeValue(ln);
        if (slotRangeArrayOverlaps(slots, &req))
            return 1;
    }

    /* Check if the slot is in any active trim job. */
    listRewind(asmManager->active_trim_jobs, &li);
    while ((ln = listNext(&li)) != NULL) {
        slotRangeArray *slots = listNodeValue(ln);
        if (slotRangeArrayOverlaps(slots, &req))
            return 1;
    }
    return 0;
}

/* Move a migrate task from HANDOFF_PREP into HANDOFF (writes paused).
 * On failure sets *err (caller owns the sds) and returns C_ERR. */
int clusterAsmHandoff(const char *task_id, sds *err) {
    serverAssert(task_id);

    asmTask *task = asmLookupTaskById(task_id);
    if (!task || task->state != ASM_HANDOFF_PREP) {
        *err = sdscatprintf(sdsempty(), "No suitable ASM task found for id: %s, task_state: %s",
                            task_id, task ? asmTaskStateToString(task->state) : "null");
        return C_ERR;
    }

    task->state = ASM_HANDOFF;
    task->paused_time = server.mstime;

    return C_OK;
}

/* Notify Redis that the config is updated for the task. */
int asmNotifyConfigUpdated(asmTask *task, sds *err) {
    int event = -1;

    if (task->operation == ASM_IMPORT && task->state == ASM_TAKEOVER) {
        event = ASM_EVENT_IMPORT_COMPLETED;
    } else if (task->operation == ASM_MIGRATE && task->state == ASM_STREAM_EOF) {
        event = ASM_EVENT_MIGRATE_COMPLETED;
    } else {
        *err = sdscatprintf(sdsempty(),
                            "ASM task is not in the correct state for config update: %s",
                            asmTaskStateToString(task->state));
        asmTaskCancel(task, "slots configuration updated");
        return C_ERR;
    }

    /* Reset per-slot statistics for the migrated/imported ranges.
     * Note: cluster_legacy.c also cleans up, so this may run twice, but
     * required if an alternative cluster impl is in use. */
    for (int i = 0; i < task->slots->num_ranges; i++) {
        slotRange *sr = &task->slots->ranges[i];
        for (int j = sr->start; j <= sr->end; j++)
            clusterSlotStatReset(j);
    }

    /* Clear error message if successful. */
    sdsfree(task->error);
    task->error = sdsempty();
    task->state = ASM_COMPLETED;

    asmNotifyStateChange(task, event);
    asmTaskFinalize(task);

    /* Trim the slots after the migrate task is completed.
*/
    if (event == ASM_EVENT_MIGRATE_COMPLETED)
        asmTrimJobSchedule(task->slots);

    return C_OK;
}

/* Import/Migrate task is done, config is updated. */
int clusterAsmDone(const char *task_id, sds *err) {
    serverAssert(task_id);

    asmTask *task = asmLookupTaskById(task_id);
    if (!task) {
        *err = sdscatprintf(sdsempty(), "No ASM task found for id: %s", task_id);
        return C_ERR;
    }
    return asmNotifyConfigUpdated(task, err);
}

/* Dispatch an ASM event (import start, cancel, handoff, done) to its handler.
 * 'arg' is event specific: the slot ranges for IMPORT_START, an optional out
 * parameter receiving the number of cancelled tasks for CANCEL.
 * Returns C_OK/C_ERR. On error, if 'err' is non-NULL it is pointed at a
 * static buffer holding the message — valid only until the next call, so the
 * caller must copy it if it needs to outlive that (not re-entrant). */
int clusterAsmProcess(const char *task_id, int event, void *arg, char **err) {
    int ret, num_cancelled;
    sds errsds = NULL;
    static char buf[256];

    if (err) *err = NULL;

    switch (event) {
    case ASM_EVENT_IMPORT_START: {
        /* Validate the slot ranges. */
        slotRangeArray *slots = slotRangeArrayDup(arg);
        if (slotRangeArrayNormalizeAndValidate(slots, &errsds) != C_OK) {
            slotRangeArrayFree(slots);
            ret = C_ERR;
            break;
        }
        ret = asmCreateImportTask(task_id, slots, &errsds) ? C_OK : C_ERR;
        break;
    }
    case ASM_EVENT_CANCEL: {
        num_cancelled = clusterAsmCancel(task_id, "user request");
        if (arg) *((int *)arg) = num_cancelled;
        ret = C_OK;
        break;
    }
    case ASM_EVENT_HANDOFF: {
        ret = clusterAsmHandoff(task_id, &errsds);
        break;
    }
    case ASM_EVENT_DONE: {
        ret = clusterAsmDone(task_id, &errsds);
        break;
    }
    default: {
        ret = C_ERR;
        errsds = sdscatprintf(sdsempty(), "Unknown operation: %d", event);
        break;
    }
    }

    /* Copy the sds error into the static buffer exposed to the caller. */
    if (ret != C_OK && errsds && err) {
        snprintf(buf, sizeof(buf), "%s", errsds);
        *err = buf;
    }
    sdsfree(errsds);

    return ret;
}

/* Propagate TRIMSLOTS command to AOF and replicas.
*/
static void propagateTrimSlots(slotRangeArray *slots) {
    /* Build: TRIMSLOTS RANGES <numranges> <start> <end> ... */
    int argc = slots->num_ranges * 2 + 3;
    robj **argv = zmalloc(sizeof(robj*) * argc);
    argv[0] = createStringObject("TRIMSLOTS", 9);
    argv[1] = createStringObject("RANGES", 6);
    argv[2] = createStringObjectFromLongLong(slots->num_ranges);
    for (int i = 0; i < slots->num_ranges; i++) {
        argv[i*2+3] = createStringObjectFromLongLong(slots->ranges[i].start);
        argv[i*2+4] = createStringObjectFromLongLong(slots->ranges[i].end);
    }

    enterExecutionUnit(1, 0);

    /* Temporarily force replication on so the command is propagated even if
     * the current context disabled it; restore the previous setting after. */
    int prev_replication_allowed = server.replication_allowed;
    server.replication_allowed = 1;
    alsoPropagate(-1, argv, argc, PROPAGATE_AOF | PROPAGATE_REPL);
    server.replication_allowed = prev_replication_allowed;

    exitExecutionUnit();
    postExecutionUnitOperations();

    /* alsoPropagate() retains its own references; release ours. */
    for (int i = 0; i < argc; i++)
        decrRefCount(argv[i]);
    zfree(argv);
}

/* If this node is a replica and there is an active trim or a pending trim
 * job (due to write pause), we cannot process commands from the master for the
 * slots that are waiting to be trimmed. Otherwise, the trim cycle could
 * mistakenly delete newly added keys. In this case, the master will be blocked
 * until the trim job finishes. This is supposed to be a rare event as it needs
 * to migrate slots and import them back before the trim job is done. */
void asmUnblockMasterAfterTrim(void) {
    if (server.master &&
        server.master->flags & CLIENT_BLOCKED &&
        server.master->bstate.btype == BLOCKED_POSTPONE_TRIM)
    {
        unblockClient(server.master, 1);
        serverLog(LL_NOTICE, "Unblocking master client after active trim is completed");
    }
}

/* Trim the slots asynchronously in the BIO thread.
*/
void asmTriggerBackgroundTrim(slotRangeArray *slots) {
    /* Let modules know a background (non per-key) trim is about to happen. */
    RedisModuleClusterSlotMigrationTrimInfoV1 fsi = {
        REDISMODULE_CLUSTER_SLOT_MIGRATION_TRIMINFO_VERSION,
        (RedisModuleSlotRangeArray *) slots
    };

    moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION_TRIM,
                          REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_BACKGROUND,
                          &fsi);

    signalFlushedDb(0, 1, slots);

    /* Create temp kvstores and estore, move relevant slot dicts/ebuckets into them,
     * and delete them in BIO thread asynchronously. */
    kvstore *keys = kvstoreCreate(&kvstoreBaseType, &dbDictType,
                                  CLUSTER_SLOT_MASK_BITS,
                                  KVSTORE_ALLOCATE_DICTS_ON_DEMAND);
    kvstore *expires = kvstoreCreate(&kvstoreBaseType, &dbExpiresDictType,
                                     CLUSTER_SLOT_MASK_BITS,
                                     KVSTORE_ALLOCATE_DICTS_ON_DEMAND);
    estore *subexpires = estoreCreate(&subexpiresBucketsType, CLUSTER_SLOT_MASK_BITS);

    size_t total_keys = 0;

    for (int i = 0; i < slots->num_ranges; i++) {
        for (int slot = slots->ranges[i].start; slot <= slots->ranges[i].end; slot++) {
            /* Count before moving: after kvstoreMoveDict() the slot dict is
             * gone from the live keyspace. */
            total_keys += kvstoreDictSize(server.db[0].keys, slot);
            kvstoreMoveDict(server.db[0].keys, keys, slot);
            kvstoreMoveDict(server.db[0].expires, expires, slot);
            estoreMoveEbuckets(server.db[0].subexpires, subexpires, slot);
        }
    }

    emptyDbDataAsync(keys, expires, subexpires);

    sds str = slotRangeArrayToString(slots);
    serverLog(LL_NOTICE, "Background trim started for slots: %s to trim %zu keys.", str, total_keys);
    sdsfree(str);

    /* Unblock master if blocked. This can only happen in a very unlikely case,
     * trim job will be in pending list due to write pause and master will send
     * commands for the slots that are waiting to be trimmed. Just keeping this
     * call here for being defensive as it is harmless. */
    asmUnblockMasterAfterTrim();
}

/* Trim the slots.
*/
void asmTrimSlots(slotRangeArray *slots) {
    /* Debug override: trimming disabled entirely. */
    if (asmManager->debug_trim_method == ASM_DEBUG_TRIM_NONE)
        return;

    /* Trigger active trim for the following cases:
     * 1. Debug override: trim method is set to 'active'.
     * 2. There are clients using client side caching (client tracking is enabled):
     *    There is no way to invalidate specific slots in the client tracking
     *    protocol. For now, we just use active trim to trim the slots.
     * 3. Module subscribers: If any module is subscribed to TRIMMED event, we
     *    assume module needs per key notification and cannot use background trim.
     */
    int activetrim = server.tracking_clients != 0 ||
                     (asmManager->debug_trim_method == ASM_DEBUG_TRIM_ACTIVE) ||
                     (asmManager->debug_trim_method == ASM_DEBUG_TRIM_DEFAULT &&
                      moduleHasSubscribersForKeyspaceEvent(NOTIFY_KEY_TRIMMED));
    if (activetrim)
        asmTriggerActiveTrim(slots);
    else
        asmTriggerBackgroundTrim(slots);
}

/* Schedule a trim job for the specified slot ranges. The job will be
 * deferred and handled later in asmBeforeSleep(). We delay the trim jobs to
 * asmBeforeSleep() to ensure it only runs when there is no write pause.
 * Takes a private copy of 'slots'; the caller keeps ownership of its array. */
void asmTrimJobSchedule(slotRangeArray *slots) {
    listAddNodeTail(asmManager->pending_trim_jobs, slotRangeArrayDup(slots));
}

/* Process any pending trim jobs. */
void asmTrimJobProcessPending(void) {
    /* Check if there is any pending trim job and we can propagate it. */
    if (listLength(asmManager->pending_trim_jobs) == 0 ||
        asmManager->debug_trim_method == ASM_DEBUG_TRIM_NONE)
    {
        return;
    }

    /* If this node is a replica, it should not initiate slot trimming actively.
     * Cancel the trim job and unblock the master if it is blocked.
*/
    if (clusterNodeIsSlave(getMyClusterNode())) {
        asmCancelPendingTrimJobs();
        asmUnblockMasterAfterTrim();
        return;
    }

    /* Determine if we can start the trim job:
     * - require client writes not paused (so key deletions are allowed)
     * - require replicas not paused (so TRIMSLOTS can be propagated).
     * - require trim is not disabled via RedisModule_ClusterDisableTrim().
     */
    static int logged = 0; /* Log the deferral reason only once per episode. */
    int disabled_by_module = server.cluster_module_trim_disablers > 0;

    if (isPausedActions(PAUSE_ACTION_CLIENT_WRITE) ||
        isPausedActions(PAUSE_ACTION_CLIENT_ALL) ||
        isPausedActions(PAUSE_ACTION_REPLICA) ||
        disabled_by_module)
    {
        if (logged == 0) {
            logged = 1;
            const char *reason = disabled_by_module ? "trim is disabled by module" :
                                                      "pause action is in effect";
            serverLog(LL_NOTICE, "Trim job is deferred since %s.", reason);
        }
        return;
    }
    logged = 0;

    listIter li;
    listNode *ln;
    listRewind(asmManager->pending_trim_jobs, &li);
    while ((ln = listNext(&li)) != NULL) {
        slotRangeArray *slots = listNodeValue(ln);
        asmTrimSlots(slots);
        propagateTrimSlots(slots);
        /* Deleting the node returned by listNext() is safe: the iterator has
         * already advanced past it. */
        listDelNode(asmManager->pending_trim_jobs, ln);
        slotRangeArrayFree(slots);
    }
}

/* Trim keys in slots not owned by this node (if any). */
void asmTrimSlotsIfNotOwned(slotRangeArray *slots) {
    if (!server.cluster_enabled || !clusterNodeIsMaster(getMyClusterNode())) return;

    size_t num_keys = 0;
    slotRangeArray *trim_slots = NULL;
    for (int i = 0; i < slots->num_ranges; i++) {
        for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) {
            /* Skip slots we serve, empty slots, and slots already queued. */
            if (clusterIsMySlot(j) ||
                kvstoreDictSize(server.db[0].keys, j) == 0 ||
                isSlotInTrimJob(j))
            {
                continue;
            }

            trim_slots = slotRangeArrayAppend(trim_slots, j);
            num_keys += kvstoreDictSize(server.db[0].keys, j);
        }
    }
    if (!trim_slots) return;

    sds str = slotRangeArrayToString(trim_slots);
    serverLog(LL_NOTICE,
              "Detected keys in slots that do not belong to this node. "
              "Scheduling trim for %zu keys in slots: %s", num_keys, str);
    sdsfree(str);

    asmTrimJobSchedule(trim_slots);
    slotRangeArrayFree(trim_slots);
}

/* Handle the master task when it is no longer used, trim unowned slots if necessary.
 * This function is called when the replica is just promoted to master. */
void asmFinalizeMasterTask(void) {
    if (!server.cluster_enabled) return;

    asmTask *task = asmManager->master_task;
    if (task == NULL) return;

    if (task->operation == ASM_IMPORT) {
        /* Check if there is an ASM task that master did not finish. */
        if (task->state != ASM_COMPLETED && task->state != ASM_FAILED) {
            sds slots_str = slotRangeArrayToString(task->slots);
            serverLog(LL_WARNING, "Import task %s from old master failed: slots=%s",
                      task->id, slots_str);
            sdsfree(slots_str);
            /* Mark the task as failed and notify the replicas. */
            task->state = ASM_FAILED;
            asmNotifyStateChange(task, ASM_EVENT_IMPORT_FAILED);
        }

        /* Trim the slots if the import task is failed. */
        if (clusterNodeIsMaster(getMyClusterNode()) && task->state == ASM_FAILED) {
            asmTrimSlotsIfNotOwned(task->slots);
        }
    } else if (task->operation == ASM_MIGRATE) {
        /* For migrate tasks, attempt to trim slots if necessary. After ASM completed,
         * the previous master may not have initiated slot trimming before the failover
         * occurred. In that case, we need to initiate slot trimming here.
         * However, if ASM failed, slot ownership did not change, so no slot trimming
         * is needed. */
        if (clusterNodeIsMaster(getMyClusterNode()) && task->state != ASM_FAILED) {
            asmTrimSlotsIfNotOwned(task->slots);
        }
    }

    /* Clear the master task since it is not a replica anymore. */
    asmTaskFree(asmManager->master_task);
    asmManager->master_task = NULL;
}

/* The replicas handle the master import ASM task information.
*/
int asmReplicaHandleMasterTask(sds task_info) {
    if (!server.cluster_enabled || !clusterNodeIsSlave(getMyClusterNode())) return C_ERR;

    /* If the master task is migrating, just clear it when receiving a new task info,
     * even the task info is empty since it means the master finished the task. */
    if (asmManager->master_task && asmManager->master_task->operation == ASM_MIGRATE) {
        asmTaskFree(asmManager->master_task);
        asmManager->master_task = NULL;
    }

    /* If the master task is empty, it means the master finished the task, the
     * replica should check the slot ownership to decide to raise completed or
     * failed event. */
    if (!task_info || sdslen(task_info) == 0) {
        asmTask *task = asmManager->master_task;
        if (task && task->state != ASM_COMPLETED && task->state != ASM_FAILED) {
            /* Check if the slots are owned by the master. */
            int owned_by_master = 1;
            for (int i = 0; i < task->slots->num_ranges; i++) {
                slotRange *sr = &task->slots->ranges[i];
                for (int j = sr->start; j <= sr->end; j++) {
                    clusterNode *master = clusterNodeGetMaster(getMyClusterNode());
                    if (!master || !clusterNodeCoversSlot(master, j)) {
                        owned_by_master = 0;
                        /* NOTE(review): this 'break' only exits the inner
                         * slot loop; the outer range loop keeps scanning even
                         * though the answer is already known. Harmless, but
                         * redundant work. */
                        break;
                    }
                }
            }
            if (owned_by_master) {
                task->state = ASM_COMPLETED;
                asmNotifyStateChange(task, ASM_EVENT_IMPORT_COMPLETED);
            } else {
                task->state = ASM_FAILED;
                asmNotifyStateChange(task, ASM_EVENT_IMPORT_FAILED);
            }
        }
        return C_OK;
    }

    asmTask *task = asmTaskDeserialize(task_info);
    if (!task) return C_ERR;

    /* For migrate task, replica just keeps the task info, doesn't notify any event. */
    if (task->operation == ASM_MIGRATE) {
        if (asmManager->master_task) asmTaskFree(asmManager->master_task);
        asmManager->master_task = task;
        return C_OK;
    }

    int notify_event = 0;
    int event = asmTaskStateToEvent(task);
    if (asmManager->master_task) {
        /* Notify when the task or event is changed, to avoid duplicated notification.
*/
        if (strcmp(task->id, asmManager->master_task->id) != 0 ||
            event != asmTaskStateToEvent(asmManager->master_task))
        {
            notify_event = 1;
        }
        asmTaskFree(asmManager->master_task);
    } else {
        /* Ignore completed or failed task when there is no active master task. */
        if (task->state != ASM_FAILED && task->state != ASM_COMPLETED)
            notify_event = 1;
    }

    asmManager->master_task = task;
    if (notify_event) asmNotifyStateChange(task, event);
    return C_OK;
}

/* Cancel all pending trim jobs. */
void asmCancelPendingTrimJobs(void) {
    if (!asmManager) return;

    listIter li;
    listNode *ln;
    listRewind(asmManager->pending_trim_jobs, &li);
    while ((ln = listNext(&li)) != NULL) {
        slotRangeArray *slots = listNodeValue(ln);
        /* Safe: the iterator already advanced past 'ln'. */
        listDelNode(asmManager->pending_trim_jobs, ln);
        sds str = slotRangeArrayToString(slots);
        serverLog(LL_NOTICE, "Cancelling the pending trim job for slots: %s", str);
        sdsfree(str);
        slotRangeArrayFree(slots);
    }
}

/* Cancel all pending and active trim jobs. */
void asmCancelTrimJobs(void) {
    if (!asmManager) return;

    /* Unblock master if blocked */
    asmUnblockMasterAfterTrim();

    /* Cancel pending trim jobs */
    asmCancelPendingTrimJobs();

    /* Cancel active trim jobs */
    if (listLength(asmManager->active_trim_jobs) == 0)
        return;

    serverLog(LL_NOTICE, "Cancelling all active trim jobs");
    asmManager->active_trim_cancelled += listLength(asmManager->active_trim_jobs);
    /* End the currently running job (frees its iterator and fires the
     * completed event), then drop any queued jobs without starting them. */
    asmActiveTrimEnd();
    listEmpty(asmManager->active_trim_jobs);
}

/* It's used to trim slots after the migration is completed or import is failed.
 * TRIMSLOTS RANGES <numranges> <start-slot> <end-slot> ...
*/
void trimslotsCommand(client *c) {
    long numranges = 0;

    if (server.cluster_enabled == 0) {
        addReplyError(c,"This instance has cluster support disabled");
        return;
    }

    /* Minimum form: TRIMSLOTS RANGES 1 <start> <end> => 5 args. */
    if (c->argc < 5) {
        addReplyErrorArity(c);
        return;
    }

    /* Validate the ranges argument */
    if (strcasecmp(c->argv[1]->ptr, "ranges") != 0) {
        addReplyError(c, "missing ranges argument");
        return;
    }

    /* Get the number of ranges */
    if (getLongFromObjectOrReply(c, c->argv[2], &numranges, NULL) != C_OK)
        return;

    /* Validate the number of ranges and argument count */
    if (numranges < 1 || numranges > CLUSTER_SLOTS || c->argc != 3 + numranges * 2) {
        addReplyError(c, "invalid number of ranges");
        return;
    }

    /* Parse the slot ranges and start trimming */
    slotRangeArray *slots = parseSlotRangesOrReply(c, c->argc, 3);
    if (!slots) return;

    if (c->id == CLIENT_ID_AOF) {
        serverAssert(server.loading);
        /* If we are loading the AOF, we can't trigger active trim because next
         * command may have an update for the same key that is supposed to be
         * trimmed. We have to trim the keys synchronously. */
        clusterDelKeysInSlotRangeArray(slots, 1);
    } else {
        /* We cannot trim any slot served by this node. */
        if (clusterNodeIsMaster(getMyClusterNode())) {
            for (int i = 0; i < slots->num_ranges; i++) {
                for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) {
                    if (clusterCanAccessKeysInSlot(j)) {
                        addReplyErrorFormat(c, "the slot %d is served by this node", j);
                        slotRangeArrayFree(slots);
                        return;
                    }
                }
            }
        }
        asmTrimSlots(slots);
    }

    /* Command will not be propagated automatically since it does not modify
     * the dataset. */
    forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF);

    slotRangeArrayFree(slots);
    addReply(c, shared.ok);
}

/* Start the active trim job.
*/
void asmActiveTrimStart(void) {
    /* Operates on the job at the head of the active trim queue. */
    slotRangeArray *slots = listNodeValue(listFirst(asmManager->active_trim_jobs));

    serverAssert(asmManager->active_trim_it == NULL);
    asmManager->active_trim_it = slotRangeArrayGetIterator(slots);
    asmManager->active_trim_started++;
    asmManager->active_trim_current_job_keys = 0;
    asmManager->active_trim_current_job_trimmed = 0;

    /* Count the number of keys to trim */
    asmManager->active_trim_current_job_keys += asmCountKeysInSlots(slots);

    RedisModuleClusterSlotMigrationTrimInfoV1 fsi = {
        REDISMODULE_CLUSTER_SLOT_MIGRATION_TRIMINFO_VERSION,
        (RedisModuleSlotRangeArray *) slots
    };

    moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION_TRIM,
                          REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_STARTED,
                          &fsi);

    sds str = slotRangeArrayToString(slots);
    serverLog(LL_NOTICE, "Active trim initiated for slots: %s, to trim %llu keys.",
              str, asmManager->active_trim_current_job_keys);
    sdsfree(str);
}

/* Schedule an active trim job. Takes a private copy of 'slots'; starts the
 * job immediately if no other active trim job is running. */
void asmTriggerActiveTrim(slotRangeArray *slots) {
    listAddNodeTail(asmManager->active_trim_jobs, slotRangeArrayDup(slots));
    sds str = slotRangeArrayToString(slots);
    serverLog(LL_NOTICE, "Active trim scheduled for slots: %s", str);
    sdsfree(str);

    /* Start an active trim job if no active trim job is running. */
    if (asmManager->active_trim_it == NULL) {
        serverAssert(listLength(asmManager->active_trim_jobs) > 0);
        asmActiveTrimStart();
    }
}

/* End the active trim job.
*/
void asmActiveTrimEnd(void) {
    slotRangeArray *slots = listNodeValue(listFirst(asmManager->active_trim_jobs));

    if (asmManager->active_trim_it) {
        slotRangeArrayIteratorFree(asmManager->active_trim_it);
        asmManager->active_trim_it = NULL;
    }

    /* Unblock the master if it is blocked */
    asmUnblockMasterAfterTrim();

    RedisModuleClusterSlotMigrationTrimInfoV1 fsi = {
        REDISMODULE_CLUSTER_SLOT_MIGRATION_TRIMINFO_VERSION,
        (RedisModuleSlotRangeArray *) slots
    };

    moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION_TRIM,
                          REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_COMPLETED,
                          &fsi);

    sds str = slotRangeArrayToString(slots);
    serverLog(LL_NOTICE, "Active trim completed for slots: %s, %llu keys trimmed.",
              str, asmManager->active_trim_current_job_trimmed);
    sdsfree(str);
    /* Pop the finished job off the head of the queue. */
    listDelNode(asmManager->active_trim_jobs, listFirst(asmManager->active_trim_jobs));
    asmManager->active_trim_completed++;
}

/* Check if the slot range array overlaps with any trim job.
 * Returns 1 on the first overlapping slot found, 0 otherwise. */
int asmIsAnyTrimJobOverlaps(slotRangeArray *slots) {
    if (!asmIsTrimInProgress()) return 0;
    for (int i = 0; i < slots->num_ranges; i++) {
        for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) {
            if (isSlotInTrimJob(j)) return 1;
        }
    }
    return 0;
}

/* Check if there is any trim job in progress (active or pending). */
int asmIsTrimInProgress(void) {
    if (!server.cluster_enabled) return 0;
    return (listLength(asmManager->active_trim_jobs) != 0 ||
            listLength(asmManager->pending_trim_jobs) != 0);
}


/* Check if the command is accessing keys in a slot being trimmed.
 * Return the slot if found, otherwise return -1.
*/
int asmGetTrimmingSlotForCommand(struct redisCommand *cmd, robj **argv, int argc) {
    if (!asmIsTrimInProgress()) return -1;

    /* Get the keys from the command */
    getKeysResult result = GETKEYS_RESULT_INIT;
    int numkeys = getKeysFromCommand(cmd, argv, argc, &result);

    /* Cache only the immediately preceding key's slot to skip the common
     * case of consecutive keys hashing to the same slot. */
    int last_checked_slot = -1;
    for (int j = 0; j < numkeys; j++) {
        robj *key = argv[result.keys[j].pos];
        int slot = keyHashSlot((char*) key->ptr, sdslen(key->ptr));
        if (slot == last_checked_slot) continue;
        if (isSlotInTrimJob(slot)) {
            getKeysFreeResult(&result);
            return slot;
        }
        last_checked_slot = slot;
    }
    getKeysFreeResult(&result);
    return -1;
}

/* Delete the key and notify the modules. */
void asmActiveTrimDeleteKey(redisDb *db, robj *keyobj) {
    /* Debug hook: optional artificial delay per deletion. */
    if (asmManager->debug_active_trim_delay > 0)
        debugDelay(asmManager->debug_active_trim_delay);

    /* The key needs to be converted from static to heap before deletion. */
    int static_key = keyobj->refcount == OBJ_STATIC_REFCOUNT;
    if (static_key) keyobj = createStringObject(keyobj->ptr, sdslen(keyobj->ptr));

    dbDelete(db, keyobj);
    keyModified(NULL, db, keyobj, NULL, 1);
    /* The keys are not actually logically deleted from the database, just moved
     * to another node. The modules need to know that these keys are no longer
     * available locally, so just send the keyspace notification to the modules,
     * but not to clients. */
    moduleNotifyKeyspaceEvent(NOTIFY_KEY_TRIMMED, "key_trimmed", keyobj, db->id);
    asmManager->active_trim_current_job_trimmed++;

    if (static_key) decrRefCount(keyobj);
}

/* Trim keys in the active trim job. */
void asmActiveTrimCycle(void) {
    /* A negative debug delay disables the cycle entirely. */
    if (asmManager->debug_active_trim_delay < 0 ||
        listLength(asmManager->active_trim_jobs) == 0)
    {
        return;
    }

    /* Verify client pause is not in effect and trim is not disabled by module,
     * so we can delete keys.
*/
    static int blocked = 0; /* Log the blocked/unblocked transition only once. */
    int disabled_by_module = server.cluster_module_trim_disablers > 0;
    if (isPausedActions(PAUSE_ACTION_CLIENT_ALL) ||
        isPausedActions(PAUSE_ACTION_CLIENT_WRITE) ||
        disabled_by_module)
    {
        if (blocked == 0) {
            blocked = 1;
            const char *reason = disabled_by_module ? "trim is disabled by module" :
                                                      "pause action is in effect";
            serverLog(LL_NOTICE, "Active trim cycle is blocked since %s.", reason);
        }
        return;
    }
    if (blocked) serverLog(LL_NOTICE, "Active trim cycle is unblocked.");
    blocked = 0;

    /* This works in a similar way to activeExpireCycle, in the sense that
     * we do incremental work across calls. */
    const int trim_cycle_time_perc = 25;
    int time_exceeded = 0;
    long long start = ustime(), timelimit;
    unsigned long long num_deleted = 0;

    /* Calculate the time limit in microseconds for this cycle:
     * trim_cycle_time_perc% of one server.hz tick. */
    timelimit = 1000000 * trim_cycle_time_perc / server.hz / 100;
    if (timelimit <= 0) timelimit = 1;

    serverAssert(asmManager->active_trim_it);
    int slot = slotRangeArrayGetCurrentSlot(asmManager->active_trim_it);

    /* Walk slot by slot, deleting every key, until done or out of time. */
    while (!time_exceeded && slot != -1) {
        dictEntry *de;
        kvstoreDictIterator kvs_di;
        kvstoreInitDictSafeIterator(&kvs_di, server.db[0].keys, slot);
        while ((de = kvstoreDictIteratorNext(&kvs_di)) != NULL) {
            kvobj *kv = dictGetKV(de);
            sds sdskey = kvobjGetKey(kv);

            enterExecutionUnit(1, 0);
            robj *keyobj = createStringObject(sdskey, sdslen(sdskey));
            asmActiveTrimDeleteKey(&server.db[0], keyobj);
            decrRefCount(keyobj);
            exitExecutionUnit();
            postExecutionUnitOperations();
            num_deleted++;

            /* Once in 32 deletions check if we reached the time limit.
*/
            if (num_deleted % 32 == 0 && (ustime() - start) > timelimit) {
                time_exceeded = 1;
                break;
            }
        }
        kvstoreResetDictIterator(&kvs_di);
        /* Only advance to the next slot if the current one was fully drained;
         * otherwise resume from the same slot on the next cycle. */
        if (!time_exceeded) slot = slotRangeArrayNext(asmManager->active_trim_it);
    }

    if (slot == -1) {
#if defined(USE_JEMALLOC)
        /* Return freed memory to the OS after a bulk deletion. */
        jemalloc_purge();
#endif
        asmActiveTrimEnd();

        /* Immediately start the next trim job upon completion of the current
         * one. Eliminates gaps in notifications so modules are informed about
         * trimming unowned keys, which is important for modules that
         * continuously filter unowned keys from their replies. */
        if (listLength(asmManager->active_trim_jobs) != 0)
            asmActiveTrimStart();
    }
}

/* Check if the key in a trim job. Returns 1 if the key's slot is covered by
 * a pending or active trim job, 0 otherwise. */
int asmIsKeyInTrimJob(sds keyname) {
    if (!asmIsTrimInProgress() || !isSlotInTrimJob(getKeySlot(keyname)))
        return 0;
    return 1;
}

/* Modules can use RM_ClusterPropagateForSlotMigration() during the
 * CLUSTER_SLOT_MIGRATION_MIGRATE_MODULE_PROPAGATE event to propagate commands
 * that should be delivered just before the slot snapshot delivery starts. */
int asmModulePropagateBeforeSlotSnapshot(struct redisCommand *cmd, robj **argv, int argc) {
    /* This API is only called in the fork child. */
    if (server.cluster_enabled == 0 ||
        server.in_fork_child != CHILD_TYPE_RDB ||
        listLength(asmManager->tasks) == 0)
    {
        errno = EBADF;
        return C_ERR;
    }

    /* Check if the task state is right. */
    asmTask *task = listNodeValue(listFirst(asmManager->tasks));
    if (task->operation != ASM_MIGRATE ||
        task->state != ASM_SEND_BULK_AND_STREAM ||
        task->pre_snapshot_module_cmds == NULL)
    {
        errno = EBADF;
        return C_ERR;
    }

    /* Ensure all arguments are converted to string encoding if necessary,
     * since getSlotFromCommand expects them to be string-encoded.
*/
    for (int i = 0; i < argc; i++) {
        if (!sdsEncodedObject(argv[i])) {
            serverAssert(argv[i]->encoding == OBJ_ENCODING_INT);
            /* Rewrites the caller's argv entry in place with a string-encoded
             * replacement; the old INT-encoded object is released. */
            robj *old = argv[i];
            argv[i] = createStringObjectFromLongLongWithSds((long)old->ptr);
            decrRefCount(old);
        }
    }

    /* Crossslot commands are not allowed */
    int slot = getSlotFromCommand(cmd, argv, argc);
    if (slot == CLUSTER_CROSSSLOT) {
        errno = ENOTSUP;
        return C_ERR;
    }

    /* Allow no-keys commands or if keys are in the slot range. */
    slotRange sr = {slot, slot};
    if (slot != INVALID_CLUSTER_SLOT && !slotRangeArrayOverlaps(task->slots, &sr)) {
        errno = ERANGE;
        return C_ERR;
    }

    /* Take our own references on the arguments; ownership of 'argvcopy'
     * transfers to the pre-snapshot command array. */
    robj **argvcopy = zmalloc(sizeof(robj*) * argc);
    for (int i = 0; i < argc; i++) {
        argvcopy[i] = argv[i];
        incrRefCount(argv[i]);
    }

    redisOpArrayAppend(task->pre_snapshot_module_cmds, 0, argvcopy, argc, 0);
    return C_OK;
}
