From dcacc00e3750300617ba6e16eb346713f91a783a Mon Sep 17 00:00:00 2001 From: Mitja Felicijan Date: Wed, 21 Jan 2026 22:52:54 +0100 Subject: Remove testing data --- examples/redis-unstable/src/cluster_asm.c | 3602 ----------------------------- 1 file changed, 3602 deletions(-) delete mode 100644 examples/redis-unstable/src/cluster_asm.c (limited to 'examples/redis-unstable/src/cluster_asm.c') diff --git a/examples/redis-unstable/src/cluster_asm.c b/examples/redis-unstable/src/cluster_asm.c deleted file mode 100644 index a090453..0000000 --- a/examples/redis-unstable/src/cluster_asm.c +++ /dev/null @@ -1,3602 +0,0 @@ -/* cluster_asm.c -- Atomic slot migration implementation for cluster - * - * Copyright (c) 2025-Present, Redis Ltd. - * All rights reserved. - * - * Licensed under your choice of (a) the Redis Source Available License 2.0 - * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the - * GNU Affero General Public License v3 (AGPLv3). - */ - -#include "server.h" -#include "cluster.h" -#include "functions.h" -#include "cluster_asm.h" -#include "cluster_slot_stats.h" - -#define ASM_IMPORT (1 << 1) -#define ASM_MIGRATE (1 << 2) - -#define ASM_DEBUG_TRIM_DEFAULT 0 -#define ASM_DEBUG_TRIM_NONE 1 -#define ASM_DEBUG_TRIM_BG 2 -#define ASM_DEBUG_TRIM_ACTIVE 3 - -#define ASM_AOF_MIN_ITEMS_PER_KEY 512 /* Minimum number of items per key to use AOF format encoding */ - -typedef struct asmTask { - sds id; /* Task ID */ - int operation; /* Either ASM_IMPORT or ASM_MIGRATE */ - slotRangeArray *slots; /* List of slot ranges for this migration task */ - int state; /* Current state of the task */ - int dest_state; /* Destination node's main state (approximate) */ - char source[CLUSTER_NAMELEN]; /* Source node name */ - char dest[CLUSTER_NAMELEN]; /* Destination node name */ - clusterNode *source_node; /* Source node */ - connection *main_channel_conn; /* Main channel connection */ - connection *rdb_channel_conn; /* RDB channel connection */ - int rdb_channel_state; /* State of the RDB channel */ - unsigned long long dest_offset; /* Destination offset */ - unsigned long long source_offset; /* Source offset */ - int cross_slot_during_propagating; /* If cross-slot commands are encountered during propagating */ - int stream_eof_during_streaming; /* If STREAM-EOF is received during streaming buffer */ - replDataBuf sync_buffer; /* Buffer for the stream */ - client *main_channel_client; /* Client for the main channel on the source side */ - client *rdb_channel_client; /* Client for the RDB channel on the source side */ - long long retry_count; /* Number of retries for this task */ - mstime_t create_time; /* Task creation time */ - mstime_t start_time; /* Task start time */ - mstime_t end_time; /* Task end time */ - mstime_t paused_time; /* The time when the slot writes were paused */ - mstime_t dest_slots_snapshot_time; /* The time when the destination starts applying the slot snapshot */ - mstime_t dest_accum_applied_time; /* The time when the destination finishes applying the accumulated buffer */ - sds error; /* Error message for this task */ - redisOpArray *pre_snapshot_module_cmds; /* Module commands to be propagated at the beginning of slot migration */ -} asmTask; - -struct asmManager { - list *tasks; /* List of asmTask to be processed */ - list *archived_tasks; /* List of archived asmTask */ - list *pending_trim_jobs; /* List of pending trim jobs (due to write pause) */ - list *active_trim_jobs; /* List of active trim jobs */ - slotRangeArrayIter *active_trim_it; /* Iterator of the current active trim job */ - size_t sync_buffer_peak; /* Peak size of sync buffer */ - asmTask *master_task; /* The task that is currently active on the master */ - - /* Fail point injection for debugging */ - int debug_fail_channel; /* Channel where the task will fail */ - int debug_fail_state; /* State where the task will fail */ - int debug_trim_method; /* Method to trim the buffer */ - int debug_active_trim_delay; /* Sleep before trimming each key */ - - /* Active trim stats */ - unsigned long long active_trim_started; /* Number of times active trim was started */ - unsigned long long active_trim_completed; /* Number of times active trim was completed */ - unsigned long long active_trim_cancelled; /* Number of times active trim was cancelled */ - unsigned long long active_trim_current_job_keys; /* Total number of keys to trim in the current job */ - unsigned long long active_trim_current_job_trimmed; /* Number of keys trimmed in the current job */ -}; - -enum asmState { - /* Common state */ - ASM_NONE = 0, - ASM_CONNECTING, - ASM_AUTH_REPLY, - ASM_CANCELED, - ASM_FAILED, - ASM_COMPLETED, - - /* Import state */ - ASM_SEND_HANDSHAKE, - ASM_HANDSHAKE_REPLY, - ASM_SEND_SYNCSLOTS, - ASM_SYNCSLOTS_REPLY, - ASM_INIT_RDBCHANNEL, - ASM_ACCUMULATE_BUF, - ASM_READY_TO_STREAM, - ASM_STREAMING_BUF, - ASM_WAIT_STREAM_EOF, - ASM_TAKEOVER, - - /* Migrate state */ - ASM_WAIT_RDBCHANNEL, - ASM_WAIT_BGSAVE_START, - ASM_SEND_BULK_AND_STREAM, - ASM_SEND_STREAM, - ASM_HANDOFF_PREP, - ASM_HANDOFF, - ASM_STREAM_EOF, - - /* RDB channel state */ - ASM_RDBCHANNEL_REQUEST, - ASM_RDBCHANNEL_REPLY, - ASM_RDBCHANNEL_TRANSFER, -}; - -enum asmChannel { - ASM_IMPORT_MAIN_CHANNEL = 1, /* Main channel for the import task */ - ASM_IMPORT_RDB_CHANNEL, /* RDB channel for the import task */ - ASM_MIGRATE_MAIN_CHANNEL, /* Main channel for the migrate task */ - ASM_MIGRATE_RDB_CHANNEL /* RDB channel for the migrate task */ -}; - -/* Global ASM manager */ -struct asmManager *asmManager = NULL; - -/* replication.c */ -char *sendCommand(connection *conn, ...); -char *sendCommandArgv(connection *conn, int argc, char **argv, size_t *argv_lens); -char *receiveSynchronousResponse(connection *conn); -ConnectionType *connTypeOfReplication(void); -int startBgsaveForReplication(int mincapa, int req); -void createReplicationBacklogIfNeeded(void); -/* cluster.c */ -void createDumpPayload(rio *payload, robj *o, robj *key, int dbid, int skip_checksum); -/* cluster_asm.c */ -static void asmStartImportTask(asmTask *task); -static void asmTaskCancel(asmTask *task, const char *reason); -static void asmSyncBufferReadFromConn(connection *conn); -static void propagateTrimSlots(slotRangeArray *slots); -void asmTrimJobSchedule(slotRangeArray *slots); -void asmTrimJobProcessPending(void); -void asmCancelPendingTrimJobs(void); -void asmTriggerActiveTrim(slotRangeArray *slots); -void asmActiveTrimEnd(void); -int asmIsAnyTrimJobOverlaps(slotRangeArray *slots); -void asmTrimSlotsIfNotOwned(slotRangeArray *slots); -void asmNotifyStateChange(asmTask *task, int event); - -void asmInit(void) { - asmManager = zcalloc(sizeof(*asmManager)); - asmManager->tasks = listCreate(); - asmManager->archived_tasks = listCreate(); - asmManager->pending_trim_jobs = listCreate(); - asmManager->sync_buffer_peak = 0; - asmManager->master_task = NULL; - asmManager->debug_fail_channel = -1; - asmManager->debug_fail_state = -1; - asmManager->debug_trim_method = ASM_DEBUG_TRIM_DEFAULT; - asmManager->debug_active_trim_delay = 0; - asmManager->active_trim_jobs = listCreate(); - asmManager->active_trim_started = 0; - asmManager->active_trim_completed = 0; - asmManager->active_trim_cancelled = 0; - listSetFreeMethod(asmManager->active_trim_jobs, slotRangeArrayFreeGeneric); -} - -char *asmTaskStateToString(int state) { - switch (state) { - case ASM_NONE: return "none"; - case ASM_CONNECTING: return "connecting"; - case ASM_AUTH_REPLY: return "auth-reply"; - case ASM_CANCELED: return "canceled"; - case ASM_FAILED: return "failed"; - case ASM_COMPLETED: return "completed"; - - /* Import state */ - case ASM_SEND_HANDSHAKE: return "send-handshake"; - case ASM_HANDSHAKE_REPLY: return "handshake-reply"; - case ASM_SEND_SYNCSLOTS: return "send-syncslots"; - case ASM_SYNCSLOTS_REPLY: return "syncslots-reply"; - case ASM_INIT_RDBCHANNEL: return "init-rdbchannel"; - case ASM_ACCUMULATE_BUF: return "accumulate-buffer"; - case ASM_READY_TO_STREAM: return "ready-to-stream"; - case ASM_STREAMING_BUF: return "streaming-buffer"; - case ASM_WAIT_STREAM_EOF: return "wait-stream-eof"; - case ASM_TAKEOVER: return "takeover"; - - /* Migrate state */ - case ASM_WAIT_RDBCHANNEL: return "wait-rdbchannel"; - case ASM_WAIT_BGSAVE_START: return "wait-bgsave-start"; - case ASM_SEND_BULK_AND_STREAM: return "send-bulk-and-stream"; - case ASM_SEND_STREAM: return "send-stream"; - case ASM_HANDOFF_PREP: return "handoff-prep"; - case ASM_HANDOFF: return "handoff"; - case ASM_STREAM_EOF: return "stream-eof"; - - /* RDB channel state */ - case ASM_RDBCHANNEL_REQUEST: return "rdbchannel-request"; - case ASM_RDBCHANNEL_REPLY: return "rdbchannel-reply"; - case ASM_RDBCHANNEL_TRANSFER: return "rdbchannel-transfer"; - - default: return "unknown"; - } - serverAssert(0); /* Unreachable */ -} - -const char *asmChannelToString(int channel) { - switch (channel) { - case ASM_IMPORT_MAIN_CHANNEL: return "import-main-channel"; - case ASM_IMPORT_RDB_CHANNEL: return "import-rdb-channel"; - case ASM_MIGRATE_MAIN_CHANNEL: return "migrate-main-channel"; - case ASM_MIGRATE_RDB_CHANNEL: return "migrate-rdb-channel"; - default: return "unknown"; - } -} - -int asmDebugSetFailPoint(char *channel, char *state) { - if (!asmManager) { - serverLog(LL_WARNING, "ASM manager is not initialized"); - return C_ERR; - } - asmManager->debug_fail_channel = -1; - asmManager->debug_fail_state = -1; - if (!channel && !state) return C_ERR; - if (sdslen(channel) == 0 && sdslen(state) == 0) { - serverLog(LL_WARNING, "ASM fail point is cleared"); - return C_OK; - } - - for (int i = ASM_IMPORT_MAIN_CHANNEL; i <= ASM_MIGRATE_RDB_CHANNEL; i++) { - if (!strcasecmp(channel, asmChannelToString(i))) { - asmManager->debug_fail_channel = i; - break; - } - } - if (asmManager->debug_fail_channel == -1) return C_ERR; - - for (int i = ASM_NONE; i <= ASM_RDBCHANNEL_TRANSFER; i++) { - if (!strcasecmp(state, asmTaskStateToString(i))) { - asmManager->debug_fail_state = i; - break; - } - } - if (asmManager->debug_fail_state == -1) return C_ERR; - - serverLog(LL_NOTICE, "ASM fail point set: channel=%s, state=%s", channel, state); - return C_OK; -} - -int asmDebugSetTrimMethod(const char *method, int active_trim_delay) { - if (!asmManager) { - serverLog(LL_WARNING, "ASM manager is not initialized"); - return C_ERR; - } - int prev = asmManager->debug_trim_method; - if (!strcasecmp(method, "default")) asmManager->debug_trim_method = ASM_DEBUG_TRIM_DEFAULT; - else if (!strcasecmp(method, "none")) asmManager->debug_trim_method = ASM_DEBUG_TRIM_NONE; - else if (!strcasecmp(method, "bg")) asmManager->debug_trim_method = ASM_DEBUG_TRIM_BG; - else if (!strcasecmp(method, "active")) asmManager->debug_trim_method = ASM_DEBUG_TRIM_ACTIVE; - else return C_ERR; - - /* If we are switching from none to default, delete all the keys in the - * slots we don't own */ - if (prev == ASM_DEBUG_TRIM_NONE && asmManager->debug_trim_method != ASM_DEBUG_TRIM_NONE) { - for (int i = 0; i < CLUSTER_SLOTS; i++) - if (!clusterIsMySlot(i)) - clusterDelKeysInSlot(i, 0); - } - asmManager->debug_active_trim_delay = active_trim_delay; - serverLog(LL_NOTICE, "ASM trim method was set=%s, active_trim_delay=%d", method, active_trim_delay); - return C_OK; -} - -int asmDebugIsFailPointActive(int channel, int state) { - if (!asmManager) return 0; /* ASM manager not initialized */ - if (asmManager->debug_fail_channel == channel && asmManager->debug_fail_state == state) { - serverLog(LL_NOTICE, "ASM fail point active: channel=%s, state=%s", - asmChannelToString(channel), asmTaskStateToString(state)); - return 1; - } - return 0; -} - -sds asmCatInfoString(sds info) { - int active_tasks = 0; - - listIter li; - listNode *ln; - listRewind(asmManager->tasks, &li); - while ((ln = listNext(&li)) != NULL) { - asmTask *task = listNodeValue(ln); - if (task->operation == ASM_IMPORT || - (task->operation == ASM_MIGRATE && task->state != ASM_FAILED)) - { - active_tasks++; - } - } - - return sdscatprintf(info ? info : sdsempty(), - "cluster_slot_migration_active_tasks:%d\r\n" - "cluster_slot_migration_active_trim_running:%lu\r\n" - "cluster_slot_migration_active_trim_current_job_keys:%llu\r\n" - "cluster_slot_migration_active_trim_current_job_trimmed:%llu\r\n" - "cluster_slot_migration_stats_active_trim_started:%llu\r\n" - "cluster_slot_migration_stats_active_trim_completed:%llu\r\n" - "cluster_slot_migration_stats_active_trim_cancelled:%llu\r\n", - active_tasks, - listLength(asmManager->active_trim_jobs), - asmManager->active_trim_current_job_keys, - asmManager->active_trim_current_job_trimmed, - asmManager->active_trim_started, - asmManager->active_trim_completed, - asmManager->active_trim_cancelled); -} - -void asmTaskReset(asmTask *task) { - task->state = ASM_NONE; - task->dest_state = ASM_NONE; - task->rdb_channel_state = ASM_NONE; - task->main_channel_conn = NULL; - task->rdb_channel_conn = NULL; - task->dest_offset = 0; - task->source_offset = 0; - task->stream_eof_during_streaming = 0; - task->cross_slot_during_propagating = 0; - replDataBufInit(&task->sync_buffer); - task->main_channel_client = NULL; - task->rdb_channel_client = NULL; - task->paused_time = 0; - task->dest_slots_snapshot_time = 0; - task->dest_accum_applied_time = 0; - task->pre_snapshot_module_cmds = NULL; -} - -asmTask *asmTaskCreate(const char *task_id) { - asmTask *task = zcalloc(sizeof(*task)); - task->error = sdsempty(); - asmTaskReset(task); - task->slots = NULL; - task->source_node = NULL; - task->retry_count = 0; - task->create_time = server.mstime; - task->start_time = -1; - task->end_time = -1; - if (task_id) { - task->id = sdsnew(task_id); - } else { - task->id = sdsnewlen(NULL, CLUSTER_NAMELEN); - getRandomHexChars(task->id, CLUSTER_NAMELEN); - } - - return task; -} - -void asmTaskFree(asmTask *task) { - replDataBufClear(&task->sync_buffer); - sdsfree(task->id); - slotRangeArrayFree(task->slots); - sdsfree(task->error); - zfree(task); -} - -/* Convert the task state to the corresponding event. */ -int asmTaskStateToEvent(asmTask *task) { - if (task->operation == ASM_IMPORT) { - if (task->state == ASM_COMPLETED) return ASM_EVENT_IMPORT_COMPLETED; - else if (task->state == ASM_FAILED) return ASM_EVENT_IMPORT_FAILED; - else return ASM_EVENT_IMPORT_STARTED; - } else { - if (task->state == ASM_COMPLETED) return ASM_EVENT_MIGRATE_COMPLETED; - else if (task->state == ASM_FAILED) return ASM_EVENT_MIGRATE_FAILED; - else return ASM_EVENT_MIGRATE_STARTED; - } -} - -/* Serialize ASM task information into a string for transmission to replicas. - * Format: "task_id:source_node:dest_node:operation:state:slot_ranges" - * Where slot_ranges is in the format "1000-2000 3000-4000 ..." */ -sds asmTaskSerialize(asmTask *task) { - sds serialized = sdsempty(); - - /* Add task ID */ - serialized = sdscatprintf(serialized, "%s:", task->id); - - /* Add source node ID (40 chars) */ - serialized = sdscatlen(serialized, task->source, CLUSTER_NAMELEN); - serialized = sdscat(serialized, ":"); - - /* Add destination node ID (40 chars) */ - serialized = sdscatlen(serialized, task->dest, CLUSTER_NAMELEN); - serialized = sdscat(serialized, ":"); - - /* Add operation type */ - serialized = sdscatprintf(serialized, "%s:", task->operation == ASM_IMPORT ? - "import" : "migrate"); - - /* Add current state */ - serialized = sdscatprintf(serialized, "%s:", asmTaskStateToString(task->state)); - - /* Add slot ranges sds */ - sds slots_str = slotRangeArrayToString(task->slots); - serialized = sdscatprintf(serialized, "%s", slots_str); - sdsfree(slots_str); - - return serialized; -} - -/* Deserialize ASM task information from a string and create a complete asmTask. - * Format: "task_id:source_node:dest_node:operation:state:slot_ranges" - * Returns a new asmTask on success, NULL on failure. */ -asmTask *asmTaskDeserialize(sds data) { - int count, idx = 0; - asmTask *task = NULL; - if (!data || sdslen(data) == 0) return NULL; - - sds *parts = sdssplitlen(data, sdslen(data), ":", 1, &count); - if (count < 6) goto err; - - /* Parse task ID */ - if (sdslen(parts[idx]) == 0) goto err; - task = asmTaskCreate(parts[idx]); - if (!task) goto err; - idx++; - - /* Parse source node ID */ - if (sdslen(parts[idx]) != CLUSTER_NAMELEN) goto err; - memcpy(task->source, parts[idx], CLUSTER_NAMELEN); - idx++; - - /* Parse destination node ID */ - if (sdslen(parts[idx]) != CLUSTER_NAMELEN) goto err; - memcpy(task->dest, parts[idx], CLUSTER_NAMELEN); - idx++; - - /* Parse operation type */ - if (!strcasecmp(parts[idx], "import")) { - task->operation = ASM_IMPORT; - } else if (!strcasecmp(parts[idx], "migrate")) { - task->operation = ASM_MIGRATE; - } else { - goto err; - } - idx++; - - /* Parse state */ - task->state = ASM_NONE; /* Default state */ - for (int state = ASM_NONE; state <= ASM_RDBCHANNEL_TRANSFER; state++) { - if (!strcasecmp(parts[idx], asmTaskStateToString(state))) { - task->state = state; - break; - } - } - idx++; - - /* Parse slot ranges */ - task->slots = slotRangeArrayFromString(parts[idx]); - if (!task->slots) goto err; - idx++; - - /* Ignore any extra fields for future compatibility */ - - sdsfreesplitres(parts, count); - return task; - -err: - if (task) asmTaskFree(task); - sdsfreesplitres(parts, count); - return NULL; -} - -/* Notify replicas about ASM task information to maintain consistency during - * slot migration. This function sends a CLUSTER SYNCSLOTS CONF ASM-TASK command - * to all connected replicas with the serialized task information. */ -void asmNotifyReplicasStateChange(struct asmTask *task) { - if (!server.cluster_enabled || !clusterNodeIsMaster(getMyClusterNode())) return; - - /* Create command arguments for CLUSTER SYNCSLOTS CONF ASM-TASK */ - robj *argv[5]; - argv[0] = createStringObject("CLUSTER", 7); - argv[1] = createStringObject("SYNCSLOTS", 9); - argv[2] = createStringObject("CONF", 4); - argv[3] = createStringObject("ASM-TASK", 8); - argv[4] = createObject(OBJ_STRING, asmTaskSerialize(task)); - - /* Send the command to all replicas */ - replicationFeedSlaves(server.slaves, -1, argv, 5); - - /* Clean up command objects */ - for (int i = 0; i < 5; i++) { - decrRefCount(argv[i]); - } -} - -/* Dump the active import ASM task information. */ -sds asmDumpActiveImportTask(void) { - if (!server.cluster_enabled) return NULL; - - /* For replica, dump the master active task. */ - if (clusterNodeIsSlave(getMyClusterNode()) && - asmManager->master_task && - asmManager->master_task->state != ASM_FAILED && - asmManager->master_task->state != ASM_COMPLETED) - { - return asmTaskSerialize(asmManager->master_task); - } - - /* For master, dump the first active task. */ - if (!asmManager || listLength(asmManager->tasks) == 0) return NULL; - asmTask *task = listNodeValue(listFirst(asmManager->tasks)); - if (task->state == ASM_NONE || task->state == ASM_FAILED || - task->state == ASM_COMPLETED) return NULL; - - return asmTaskSerialize(task); -} - -size_t asmGetPeakSyncBufferSize(void) { - if (!asmManager) return 0; - /* Compute peak sync buffer usage. The current task's peak may not - * reflect in asmManager->sync_buffer_peak immediately. */ - size_t peak = asmManager->sync_buffer_peak; - asmTask *task = listFirst(asmManager->tasks) ? - listNodeValue(listFirst(asmManager->tasks)) : NULL; - if (task && task->operation == ASM_IMPORT) - peak = max(task->sync_buffer.peak, asmManager->sync_buffer_peak); - - return peak; -} - -size_t asmGetImportInputBufferSize(void) { - if (!asmManager || listLength(asmManager->tasks) == 0) return 0; - - asmTask *task = listNodeValue(listFirst(asmManager->tasks)); - if (task->operation == ASM_IMPORT) - return task->sync_buffer.mem_used; - - return 0; -} - -size_t asmGetMigrateOutputBufferSize(void) { - if (!asmManager || listLength(asmManager->tasks) == 0) return 0; - - asmTask *task = listNodeValue(listFirst(asmManager->tasks)); - if (task->operation == ASM_MIGRATE && task->main_channel_client) - return getClientOutputBufferMemoryUsage(task->main_channel_client); - - return 0; -} - -/* Returns the ASM task with the given ID, or NULL if no such task exists. */ -static asmTask *asmLookupTaskAt(list *tasks, const char *id) { - listIter li; - listNode *ln; - - listRewind(tasks, &li); - while ((ln = listNext(&li)) != NULL) { - asmTask *task = listNodeValue(ln); - if (!strcmp(task->id, id)) return task; - } - return NULL; -} - -/* Returns the ASM task with the given ID, or NULL if no such task exists. */ -asmTask *asmLookupTaskById(const char *id) { - return asmLookupTaskAt(asmManager->tasks, id); -} - -/* Returns the ASM task that is identical to the given slot range array, or NULL - * if no such task exists. */ -asmTask *asmLookupTaskBySlotRangeArray(slotRangeArray *slots) { - listIter li; - listNode *ln; - - listRewind(asmManager->tasks, &li); - while ((ln = listNext(&li)) != NULL) { - asmTask *task = listNodeValue(ln); - if (slotRangeArrayIsEqual(task->slots, slots)) - return task; - } - return NULL; -} - -/* Returns the slot range array for the given task ID */ -slotRangeArray *asmTaskGetSlotRanges(const char *task_id) { - asmTask *task = NULL; - if (!task_id || (task = asmLookupTaskById(task_id)) == NULL) return NULL; - - return task->slots; -} - -/* Returns 1 if the slot range array overlaps with the given slot range. */ -static int slotRangeArrayOverlaps(slotRangeArray *slots, slotRange *req) { - for (int i = 0; i < slots->num_ranges; i++) { - slotRange *sr = &slots->ranges[i]; - if (sr->start <= req->end && sr->end >= req->start) - return 1; - } - return 0; -} - -/* Returns 1 if the two slot range arrays overlap, 0 otherwise. */ -static int slotRangeArraysOverlap(slotRangeArray *slots1, slotRangeArray *slots2) { - for (int i = 0; i < slots1->num_ranges; i++) { - slotRange *sr1 = &slots1->ranges[i]; - if (slotRangeArrayOverlaps(slots2, sr1)) return 1; - } - return 0; -} - -/* Returns the ASM task that overlaps with the given slot range, or NULL if - * no such task exists. */ -static asmTask *lookupAsmTaskBySlotRange(slotRange *req) { - listIter li; - listNode *ln; - - listRewind(asmManager->tasks, &li); - while ((ln = listNext(&li)) != NULL) { - asmTask *task = listNodeValue(ln); - if (slotRangeArrayOverlaps(task->slots, req)) - return task; - } - return NULL; -} - -/* Validates the given slot ranges for a migration task: - * - Ensures the current node is a master. - * - Verifies all slots are in a STABLE state. - * - Confirms all slots belong to a single source node. - * - Confirms no ongoing import task that overlaps with the slot ranges. - * - * Returns the source node if validation succeeds. - * Otherwise, returns NULL and sets 'err' variable. */ -static clusterNode *validateImportSlotRanges(slotRangeArray *slots, sds *err, asmTask *current) { - clusterNode *source = NULL; - - *err = NULL; - - /* Ensure this is a master node */ - if (!clusterNodeIsMaster(getMyClusterNode())) { - *err = sdsnew("slot migration not allowed on replica."); - goto out; - } - - /* Ensure no manual migration is in progress. */ - for (int i = 0; i < CLUSTER_SLOTS; i++) { - if (getImportingSlotSource(i) != NULL || - getMigratingSlotDest(i) != NULL) - { - *err = sdsnew("all slot states must be STABLE to start a slot migration task."); - goto out; - } - } - - for (int i = 0; i < slots->num_ranges; i++) { - slotRange *sr = &slots->ranges[i]; - - /* Ensure no import task overlaps with this slot range. - * Skip check current task that is running for this slot range. */ - asmTask *task = lookupAsmTaskBySlotRange(sr); - if (task && task != current && task->operation == ASM_IMPORT) { - *err = sdscatprintf(sdsempty(), - "overlapping import exists for slot range: %d-%d", - sr->start, sr->end); - goto out; - } - - /* Validate if we can start migration task for this slot range. */ - for (int j = sr->start; j <= sr->end; j++) { - clusterNode *node = getNodeBySlot(j); - if (node == NULL) { - *err = sdscatprintf(sdsempty(), "slot has no owner: %d", j); - goto out; - } - - if (!source) { - source = node; - } else if (source != node) { - *err = sdsnew("slots belong to different source nodes"); - goto out; - } - } - } - -out: - return *err ? NULL : source; -} - -/* Returns 1 if a task with the specified operation is in progress, 0 otherwise. */ -static int asmTaskInProgress(int operation) { - listIter li; - listNode *ln; - - if (!asmManager || listLength(asmManager->tasks) == 0) return 0; - - listRewind(asmManager->tasks, &li); - while ((ln = listNext(&li)) != NULL) { - asmTask *task = listNodeValue(ln); - if (task->operation == operation) return 1; - } - return 0; -} - -/* Returns 1 if a migrate task is in progress, 0 otherwise. */ -int asmMigrateInProgress(void) { - return asmTaskInProgress(ASM_MIGRATE); -} - -/* Returns 1 if an import task is in progress, 0 otherwise. */ -int asmImportInProgress(void) { - return asmTaskInProgress(ASM_IMPORT); -} - -/* Returns 1 if the task is in a state where it can receive replication stream -* for the slot range, 0 otherwise. */ -inline static int asmCanFeedMigrationClient(asmTask *task) { - return task->operation == ASM_MIGRATE && - !task->cross_slot_during_propagating && - (task->state == ASM_SEND_BULK_AND_STREAM || - task->state == ASM_SEND_STREAM || - task->state == ASM_HANDOFF_PREP); -} - -/* Feed the migration client with the replication stream for the slot range. */ -void asmFeedMigrationClient(robj **argv, int argc) { - asmTask *task = NULL; - - if (server.cluster_enabled == 0 || listLength(asmManager->tasks) == 0) - return; - - /* Check if there is a migrate task that can receive replication stream. */ - task = listNodeValue(listFirst(asmManager->tasks)); - if (!asmCanFeedMigrationClient(task)) return; - - /* Ensure all arguments are converted to string encoding if necessary, - * since getSlotFromCommand expects them to be string-encoded. - * Generally the arguments are string-encoded, but we may rewrite - * the command arguments to integer encoding. */ - for (int i = 0; i < argc; i++) { - if (!sdsEncodedObject(argv[i])) { - serverAssert(argv[i]->encoding == OBJ_ENCODING_INT); - robj *old = argv[i]; - argv[i] = createStringObjectFromLongLongWithSds((long)old->ptr); - decrRefCount(old); - } - } - - /* Check if the command belongs to the slot range. */ - struct redisCommand *cmd = lookupCommand(argv, argc); - serverAssert(cmd); - - int slot = getSlotFromCommand(cmd, argv, argc); - - /* If the command does not have keys, skip it now. - * SELECT is not propagated, since we only support a single db in cluster mode. - * MULTI/EXEC is not needed, since transaction semantics are unnecessary - * before the slot handoff. - * FUNCTION subcommands should be executed on all nodes, so here we skip it, - * and even propagating them may cause an error when executing. - * - * NOTICE: if some keyless commands should be propagated to the destination, - * we should identify them here and send. */ - if (slot == INVALID_CLUSTER_SLOT) return; - - /* Generally we reject cross-slot commands before executing, but module may - * replicate this kind of command, so we check again. To guarantee data - * consistency, we cancel the task if we encounter a cross-slot command. */ - if (slot == CLUSTER_CROSSSLOT) { - /* We cannot cancel the task directly here, since it may lead to a recursive - * call: asmTaskCancel() --> moduleFireServerEvent() --> moduleFreeContext() - * --> postExecutionUnitOperations() --> propagateNow(). Even worse, this - * could result in propagating pending commands to the replication stream twice. - * To avoid this, we simply set a flag here, cancel the task in beforeSleep. */ - task->cross_slot_during_propagating = 1; - return; - } - - /* Check if the slot belongs to the task's slot range. */ - slotRange sr = {slot, slot}; - if (!slotRangeArrayOverlaps(task->slots, &sr)) return; - - if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, task->state))) - freeClientAsync(task->main_channel_client); - - /* Feed main channel with the command. */ - client *c = task->main_channel_client; - size_t prev_bytes = getNormalClientPendingReplyBytes(c); - - addReplyArrayLen(c, argc); - for (int i = 0; i < argc; i++) - addReplyBulk(c, argv[i]); - - /* Update the task's source offset to reflect the bytes sent. */ - task->source_offset += (getNormalClientPendingReplyBytes(c) - prev_bytes); -} - -asmTask *asmCreateImportTask(const char *task_id, slotRangeArray *slots, sds *err) { - clusterNode *source; - - *err = NULL; - /* Validate that the slot ranges are valid and that migration can be - * initiated for them. */ - source = validateImportSlotRanges(slots, err, NULL); - if (!source) - goto err; - - if (source == getMyClusterNode()) { - *err = sdsnew("this node is already the owner of the slot range"); - goto err; - } - - /* Only support a single task at a time now. */ - if (listLength(asmManager->tasks) != 0) { - asmTask *current = listNodeValue(listFirst(asmManager->tasks)); - if (current->state == ASM_FAILED) { - /* We can create a new import task only if the current one is failed, - * cancel the failed task to create a new one. */ - asmTaskCancel(current, "new import requested"); - } else { - *err = sdsnew("another ASM task is already in progress"); - goto err; - } - } - /* There should be no task in progress. */ - serverAssert(listLength(asmManager->tasks) == 0); - - /* Create a slot migration task */ - asmTask *task = asmTaskCreate(task_id); - task->slots = slots; - task->state = ASM_NONE; - task->operation = ASM_IMPORT; - task->source_node = source; - memcpy(task->source, clusterNodeGetName(source), CLUSTER_NAMELEN); - memcpy(task->dest, getMyClusterId(), CLUSTER_NAMELEN); - - listAddNodeTail(asmManager->tasks, task); - sds slots_str = slotRangeArrayToString(slots); - serverLog(LL_NOTICE, "Import task %s created: src=%.40s, dest=%.40s, slots=%s", - task->id, task->source, task->dest, slots_str); - sdsfree(slots_str); - - return task; - -err: - slotRangeArrayFree(slots); - return NULL; -} - -/* CLUSTER MIGRATION IMPORT - * - * Sent by operator to the destination node to start the migration. */ -static void clusterMigrationCommandImport(client *c) { - /* Validate slot range arg count */ - int remaining = c->argc - 3; - if (remaining == 0 || remaining % 2 != 0) { - addReplyErrorArity(c); - return; - } - - slotRangeArray *slots = parseSlotRangesOrReply(c, c->argc, 3); - if (!slots) return; - - sds err = NULL; - asmTask *task = asmCreateImportTask(NULL, slots, &err); - if (!task) { - addReplyErrorSds(c, err); - return; - } - - addReplyBulkCString(c, task->id); -} - -/* CLUSTER MIGRATION CANCEL [ID | ALL] - * - Reply: Number of cancelled tasks - * - * Cancels import tasks that overlap with the specified slot ranges. - * Multiple tasks may be cancelled. */ -static void clusterMigrationCommandCancel(client *c) { - sds task_id = NULL; - int num_cancelled = 0; - - /* Validate slot range arg count */ - if (c->argc != 4 && c->argc != 5) { - addReplyErrorArity(c); - return; - } - - if (!strcasecmp(c->argv[3]->ptr, "id")) { - if (c->argc != 5) { - addReplyErrorArity(c); - return; - } - task_id = c->argv[4]->ptr; - } else if (!strcasecmp(c->argv[3]->ptr, "all")) { - if (c->argc != 4) { - addReplyErrorArity(c); - return; - } - } else { - addReplyError(c, "unknown argument"); - return; - } - - num_cancelled = clusterAsmCancel(task_id, "user request"); - addReplyLongLong(c, num_cancelled); -} - -/* Reply with the status of the task. */ -static void replyTaskStatus(client *c, asmTask *task) { - mstime_t p = 0; - - addReplyMapLen(c, 12); - addReplyBulkCString(c, "id"); - addReplyBulkCString(c, task->id); - addReplyBulkCString(c, "slots"); - addReplyBulkSds(c, slotRangeArrayToString(task->slots)); - addReplyBulkCString(c, "source"); - addReplyBulkCBuffer(c, task->source, CLUSTER_NAMELEN); - addReplyBulkCString(c, "dest"); - addReplyBulkCBuffer(c, task->dest, CLUSTER_NAMELEN); - addReplyBulkCString(c, "operation"); - addReplyBulkCString(c, task->operation == ASM_IMPORT ? "import" : "migrate"); - addReplyBulkCString(c, "state"); - addReplyBulkCString(c, asmTaskStateToString(task->state)); - addReplyBulkCString(c, "last_error"); - addReplyBulkCBuffer(c, task->error, sdslen(task->error)); - addReplyBulkCString(c, "retries"); - addReplyLongLong(c, task->retry_count); - addReplyBulkCString(c, "create_time"); - addReplyLongLong(c, task->create_time); - addReplyBulkCString(c, "start_time"); - addReplyLongLong(c, task->start_time); - addReplyBulkCString(c, "end_time"); - addReplyLongLong(c, task->end_time); - - if (task->operation == ASM_MIGRATE && task->state == ASM_COMPLETED) - p = task->end_time - task->paused_time; - addReplyBulkCString(c, "write_pause_ms"); - addReplyLongLong(c, p); -} - -/* CLUSTER MIGRATION STATUS [ID | ALL] - * - Reply: Array of atomic slot migration tasks */ -static void clusterMigrationCommandStatus(client *c) { - listIter li; - listNode *ln; - - if (c->argc != 4 && c->argc != 5) { - addReplyErrorArity(c); - return; - } - - if (!strcasecmp(c->argv[3]->ptr, "id")) { - if (c->argc != 5) { - addReplyErrorArity(c); - return; - } - sds id = c->argv[4]->ptr; - asmTask *task = asmLookupTaskAt(asmManager->tasks, id); - if (!task) task = asmLookupTaskAt(asmManager->archived_tasks, id); - if (!task) { - addReplyArrayLen(c, 0); - return; - } - - addReplyArrayLen(c, 1); - replyTaskStatus(c, task); - } else if (!strcasecmp(c->argv[3]->ptr, "all")) { - if (c->argc != 4) { - addReplyErrorArity(c); - return; - } - addReplyArrayLen(c, listLength(asmManager->tasks) + - listLength(asmManager->archived_tasks)); - listRewind(asmManager->tasks, &li); - while ((ln = listNext(&li)) != NULL) - replyTaskStatus(c, listNodeValue(ln)); - - listRewind(asmManager->archived_tasks, &li); - while ((ln = listNext(&li)) != NULL) - replyTaskStatus(c, listNodeValue(ln)); - } else { - addReplyError(c, "unknown argument"); - return; - } -} - -/* CLUSTER MIGRATION - * | - * STATUS [ID | ALL] | - * CANCEL [ID | ALL]> -*/ -void clusterMigrationCommand(client *c) { - if (c->argc < 4) { - addReplyErrorArity(c); - return; - } - - if (strcasecmp(c->argv[2]->ptr, "import") == 0) { - clusterMigrationCommandImport(c); - } else if (strcasecmp(c->argv[2]->ptr, "status") == 0) { - clusterMigrationCommandStatus(c); - } else if (strcasecmp(c->argv[2]->ptr, "cancel") == 0) { - clusterMigrationCommandCancel(c); - } else { - addReplyError(c, "unknown argument"); - } -} - -/* Return the number of keys in the specified slot ranges. */ -unsigned long long asmCountKeysInSlots(slotRangeArray *slots) { - if (!slots) return 0; - - unsigned long long key_count = 0; - for (int i = 0; i < slots->num_ranges; i++) { - for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) { - key_count += kvstoreDictSize(server.db[0].keys, j); - } - } - return key_count; -} - -/* Log a human-readable message for ASM task lifecycle events. */ -void asmLogTaskEvent(asmTask *task, int event) { - sds str = slotRangeArrayToString(task->slots); - - switch (event) { - case ASM_EVENT_IMPORT_STARTED: - serverLog(LL_NOTICE, "Import task %s started for slots: %s", task->id, str); - break; - case ASM_EVENT_IMPORT_FAILED: - serverLog(LL_NOTICE, "Import task %s failed for slots: %s", task->id, str); - break; - case ASM_EVENT_TAKEOVER: - serverLog(LL_NOTICE, "Import task %s is ready to takeover slots: %s", task->id, str); - break; - case ASM_EVENT_IMPORT_COMPLETED: - serverLog(LL_NOTICE, "Import task %s completed for slots: %s (imported %llu keys)", - task->id, str, asmCountKeysInSlots(task->slots)); - break; - case ASM_EVENT_MIGRATE_STARTED: - serverLog(LL_NOTICE, "Migrate task %s started for slots: %s (keys at start: %llu)", - task->id, str, asmCountKeysInSlots(task->slots)); - break; - case ASM_EVENT_MIGRATE_FAILED: - serverLog(LL_NOTICE, "Migrate task %s failed for slots: %s", task->id, str); - break; - case ASM_EVENT_HANDOFF_PREP: - serverLog(LL_NOTICE, "Migrate task %s preparing to handoff for slots: %s", task->id, str); - break; - case ASM_EVENT_MIGRATE_COMPLETED: - serverLog(LL_NOTICE, "Migrate task %s completed for slots: %s (migrated %llu keys)", - task->id, str, asmCountKeysInSlots(task->slots)); - break; - default: - break; - } - - sdsfree(str); -} - -/* Notify the state change to the module and the cluster implementation. */ -void asmNotifyStateChange(asmTask *task, int event) { - RedisModuleClusterSlotMigrationInfo info = { - .version = REDISMODULE_CLUSTER_SLOT_MIGRATION_INFO_VERSION, - .task_id = task->id, - .slots = (RedisModuleSlotRangeArray *) task->slots - }; - memcpy(info.source_node_id, task->source, CLUSTER_NAMELEN); - memcpy(info.destination_node_id, task->dest, CLUSTER_NAMELEN); - - int module_event = -1; - if (event == ASM_EVENT_IMPORT_STARTED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_STARTED; - else if (event == ASM_EVENT_IMPORT_COMPLETED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_COMPLETED; - else if (event == ASM_EVENT_IMPORT_FAILED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_FAILED; - else if (event == ASM_EVENT_MIGRATE_STARTED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_STARTED; - else if (event == ASM_EVENT_MIGRATE_COMPLETED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_COMPLETED; - else if (event == ASM_EVENT_MIGRATE_FAILED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_FAILED; - serverAssert(module_event != -1); - - moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION, module_event, &info); - serverLog(LL_DEBUG, "Fire cluster asm module event, task %s: state=%s", - task->id, asmTaskStateToString(task->state)); - - if (clusterNodeIsMaster(getMyClusterNode())) { - /* Notify the cluster impl only if it is a real active import task. */ - if (task != asmManager->master_task) { - asmLogTaskEvent(task, event); - clusterAsmOnEvent(task->id, event, task->slots); - } - asmNotifyReplicasStateChange(task); /* Propagate state change to replicas */ - } -} - -void asmImportSetFailed(asmTask *task) { - serverAssert(task->operation == ASM_IMPORT); - if (task->state == ASM_FAILED) return; - - /* If we are in the RDB channel transfer state, we need to - * close the client that was created for the RDB channel. */ - if (task->rdb_channel_conn && task->rdb_channel_state == ASM_RDBCHANNEL_TRANSFER) { - client *c = connGetPrivateData(task->rdb_channel_conn); - serverAssert(c->task == task); - task->rdb_channel_conn = NULL; - c->task = NULL; - c->flags &= ~CLIENT_MASTER; - freeClientAsync(c); - } - - /* If in the wait stream EOF or streaming buffer state, we need to close the - * client that was created for the main channel. */ - if (task->main_channel_conn && - (task->state == ASM_STREAMING_BUF || task->state == ASM_WAIT_STREAM_EOF)) - { - client *c = connGetPrivateData(task->main_channel_conn); - serverAssert(c->task == task); - task->main_channel_conn = NULL; - c->task = NULL; - c->flags &= ~CLIENT_MASTER; - freeClientAsync(c); - } - - /* Close the connections */ - if (task->rdb_channel_conn) connClose(task->rdb_channel_conn); - if (task->main_channel_conn) connClose(task->main_channel_conn); - task->rdb_channel_conn = NULL; - task->main_channel_conn = NULL; - - /* Clear the replication data buffer */ - asmManager->sync_buffer_peak = max(asmManager->sync_buffer_peak, task->sync_buffer.peak); - replDataBufClear(&task->sync_buffer); - - /* Mark the task as failed and notify the cluster */ - task->state = ASM_FAILED; - asmNotifyStateChange(task, ASM_EVENT_IMPORT_FAILED); - /* This node may become replica, only master can setup new slot trimming jobs. */ - if (clusterNodeIsMaster(getMyClusterNode())) - asmTrimJobSchedule(task->slots); -} - -void asmMigrateSetFailed(asmTask *task) { - serverAssert(task->operation == ASM_MIGRATE); - if (task->state == ASM_FAILED) return; - - /* Close the RDB and main channel clients*/ - if (task->rdb_channel_client) { - task->rdb_channel_client->task = NULL; - freeClientAsync(task->rdb_channel_client); - task->rdb_channel_client = NULL; - } - if (task->main_channel_client) { - task->main_channel_client->task = NULL; - freeClientAsync(task->main_channel_client); - task->main_channel_client = NULL; - } - - /* Actually it is not necessary to clear the sync buffer here, - * to make asmTaskReset work properly after migrate task failed */ - replDataBufClear(&task->sync_buffer); - - /* Mark the task as failed and notify the cluster */ - task->state = ASM_FAILED; - asmNotifyStateChange(task, ASM_EVENT_MIGRATE_FAILED); -} - -void asmTaskSetFailed(asmTask *task, const char *fmt, ...) { - va_list ap; - sds error = sdsempty(); - - /* Set the error message */ - va_start(ap, fmt); - error = sdscatvprintf(error, fmt, ap); - va_end(ap); - error = sdscatprintf(error, " (state: %s, rdb_channel_state: %s)", - asmTaskStateToString(task->state), - asmTaskStateToString(task->rdb_channel_state)); - sdsfree(task->error); - task->error = error; - - /* Log the error */ - sds slots_str = slotRangeArrayToString(task->slots); - serverLog(LL_WARNING, "%s task %s failed: slots=%s, err=%s", - task->operation == ASM_IMPORT ? "Import" : "Migrate", - task->id, slots_str, task->error); - sdsfree(slots_str); - - if (task->operation == ASM_IMPORT) - asmImportSetFailed(task); - else - asmMigrateSetFailed(task); -} - -/* The task is completed or canceled. Update stats and move it to - * the archived list. */ -void asmTaskFinalize(asmTask *task) { - listNode *ln = listFirst(asmManager->tasks); - serverAssert(ln->value == task); - - task->source_node = NULL; /* Should never access it */ - task->end_time = server.mstime; - - if (task->operation == ASM_IMPORT) { - asmManager->sync_buffer_peak = max(asmManager->sync_buffer_peak, - task->sync_buffer.peak); - replDataBufClear(&task->sync_buffer); /* Not used, so save memory */ - } - - /* Move the task to the archived list */ - listUnlinkNode(asmManager->tasks, ln); - listLinkNodeHead(asmManager->archived_tasks, ln); -} - -static void asmTaskCancel(asmTask *task, const char *reason) { - if (task->state == ASM_CANCELED) return; - - asmTaskSetFailed(task, "Cancelled due to %s", reason); - task->state = ASM_CANCELED; - asmTaskFinalize(task); -} - -void asmImportTakeover(asmTask *task) { - serverAssert(task->state == ASM_WAIT_STREAM_EOF || - task->state == ASM_STREAMING_BUF); - - /* Free the main channel connection since it is no longer needed. */ - serverAssert(task->main_channel_conn != NULL); - client *c = connGetPrivateData(task->main_channel_conn); - c->task = NULL; - c->flags &= ~CLIENT_MASTER; - freeClientAsync(c); - task->main_channel_conn = NULL; - - task->state = ASM_TAKEOVER; - asmLogTaskEvent(task, ASM_EVENT_TAKEOVER); - clusterAsmOnEvent(task->id, ASM_EVENT_TAKEOVER, task->slots); -} - -void asmCallbackOnFreeClient(client *c) { - asmTask *task = c->task; - if (!task) return; - - /* If the RDB channel connection is closed, mark the task as failed. */ - if (c->conn && task->rdb_channel_conn == c->conn) { - /* We create the client only when transferring data on the RDB channel */ - serverAssert(task->rdb_channel_state == ASM_RDBCHANNEL_TRANSFER); - task->rdb_channel_conn = NULL; /* Will be freed by freeClient */ - c->flags &= ~CLIENT_MASTER; - asmTaskSetFailed(task, "RDB channel - Connection is closed"); - return; - } - - if (c->conn && task->main_channel_conn == c->conn) { - /* After or in the process of streaming buffer to DB, a client will be - * created based on the main channel connection. */ - serverAssert(task->state == ASM_STREAMING_BUF || - task->state == ASM_WAIT_STREAM_EOF); - task->main_channel_conn = NULL; /* Will be freed by freeClient */ - c->flags &= ~CLIENT_MASTER; - asmTaskSetFailed(task, "Main channel - Connection is closed"); - return; - } - - if (c == task->rdb_channel_client) { - /* TODO: Detect whether the bgsave is completed successfully and - * update the state properly. */ - task->rdb_channel_state = ASM_COMPLETED; - /* We may not have detected whether the child process has exited yet, - * so we can't determine whether the client has completed the slots - * snapshot transfer. If the RDB channel is interrupted unexpectedly, - * the destination side will also close the main channel. - * So here we just reset the RDB channel client of task. */ - task->rdb_channel_client = NULL; - return; - } - - /* If the main channel client is closed, we need to mark the task as failed - * and clean up the RDB channel client if it exists. */ - if (c == task->main_channel_client) { - task->main_channel_client = NULL; - /* The rdb channel client will be cleaned up */ - asmTaskSetFailed(task, "Main and RDB channel clients are disconnected."); - return; - } -} - -/* Sends an AUTH command to the source node using the internal secret. - * Returns an error string if the command fails, or NULL on success. */ -char *asmSendInternalAuth(connection *conn) { - size_t len = 0; - const char *internal_secret = clusterGetSecret(&len); - serverAssert(internal_secret != NULL); - - sds secret = sdsnewlen(internal_secret, len); - char *err = sendCommand(conn, "AUTH", "internal connection", secret, NULL); - sdsfree(secret); - return err; -} - -/* Handles the RDB channel sync with the source node. - * This function is called when the RDB channel is established - * and ready to sync with the source node. */ -void asmRdbChannelSyncWithSource(connection *conn) { - asmTask *task = connGetPrivateData(conn); - char *err = NULL; - sds task_error_msg = NULL; - - /* Check for errors in the socket: after a non blocking connect() we - * may find that the socket is in error state. */ - if (connGetState(conn) != CONN_STATE_CONNECTED) - goto error; - - /* Check if the task is in a fail point state */ - if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_RDB_CHANNEL, task->rdb_channel_state))) { - char buf[1]; - /* Simulate a failure by shutting down the connection. On some operating - * systems (e.g. Linux), the socket's receive buffer is not flushed - * immediately, so we issue a dummy read to drain any pending data and - * surface the error condition. - * using shutdown() instead of connShutdown() because connTLSShutdown() - * will free the connection directly, which is not what we want. */ - shutdown(conn->fd, SHUT_RDWR); - connRead(conn, buf, 1); - } - - if (task->rdb_channel_state == ASM_CONNECTING) { - connSetReadHandler(conn, asmRdbChannelSyncWithSource); - connSetWriteHandler(conn, NULL); - - /* Send AUTH command to source node using internal auth */ - err = asmSendInternalAuth(conn); - if (err) goto write_error; - task->rdb_channel_state = ASM_AUTH_REPLY; - return; - } - - if (task->rdb_channel_state == ASM_AUTH_REPLY) { - err = receiveSynchronousResponse(conn); - /* The source node did not reply */ - if (err == NULL) goto no_response_error; - - /* Check `+OK` reply */ - if (!strcmp(err, "+OK")) { - sdsfree(err); - err = NULL; - task->rdb_channel_state = ASM_RDBCHANNEL_REQUEST; - serverLog(LL_NOTICE, "Source node replied to AUTH command, syncslots rdb channel operation can continue..."); - } else { - task_error_msg = sdscatprintf(sdsempty(), - "Error reply to AUTH from source: %s", err); - sdsfree(err); - goto error; - } - } - - if (task->rdb_channel_state == ASM_RDBCHANNEL_REQUEST) { - err = sendCommand(conn, "CLUSTER", "SYNCSLOTS", "RDBCHANNEL", task->id, NULL); - if (err) goto write_error; - task->rdb_channel_state = ASM_RDBCHANNEL_REPLY; - return; - } - - if (task->rdb_channel_state == ASM_RDBCHANNEL_REPLY) { - err = receiveSynchronousResponse(conn); - /* The source node did not reply */ - if (err == NULL) goto no_response_error; - - /* Ignore ‘\n' sent from the source node to keep the connection alive. */ - if (sdslen(err) == 0) { - serverLog(LL_DEBUG, "Received an empty line in RDBCHANNEL reply, slots snapshot delivery will start later"); - sdsfree(err); - return; - } - - /* Check `+SLOTSSNAPSHOT` reply */ - if (!strncmp(err, "+SLOTSSNAPSHOT", strlen("+SLOTSSNAPSHOT"))) { - sdsfree(err); - err = NULL; - task->state = ASM_ACCUMULATE_BUF; - /* The main channel buffers pending commands. */ - connSetReadHandler(task->main_channel_conn, asmSyncBufferReadFromConn); - - task->rdb_channel_state = ASM_RDBCHANNEL_TRANSFER; - client *c = createClient(conn); - c->flags |= (CLIENT_MASTER | CLIENT_INTERNAL | CLIENT_ASM_IMPORTING); - c->querybuf = sdsempty(); - c->authenticated = 1; - c->user = NULL; - c->task = task; - serverLog(LL_NOTICE, - "Source node replied to SLOTSSNAPSHOT, syncing slots snapshot can continue..."); - } else { - task_error_msg = sdscatprintf(sdsempty(), - "Error reply to CLUSTER SYNCSLOTS RDBCHANNEL from the source: %s", err); - sdsfree(err); - goto error; - } - return; - } - return; - -no_response_error: - task_error_msg = sdsnew("Source node did not respond to command during RDBCHANNELSYNCSLOTS handshake"); - /* Fall through to regular error handling */ - -error: - asmTaskSetFailed(task, "RDB channel - Failed to sync with the source node: %s", - task_error_msg ? task_error_msg : connGetLastError(conn)); - sdsfree(task_error_msg); - return; - -write_error: /* Handle sendCommand() errors. */ - task_error_msg = sdscatprintf(sdsempty(), "Failed to send command to the source node: %s", err); - sdsfree(err); - goto error; -} - -char *asmSendSlotRangesSync(connection *conn, asmTask *task) { - /* Prepare CLUSTER SYNCSLOTS SYNC command */ - serverAssert(task->slots->num_ranges <= CLUSTER_SLOTS); - int argc = task->slots->num_ranges * 2 + 4; - char **args = zcalloc(sizeof(char*) * argc); - size_t *lens = zcalloc(sizeof(size_t) * argc); - - args[0] = "CLUSTER"; - args[1] = "SYNCSLOTS"; - args[2] = "SYNC"; - args[3] = task->id; - lens[0] = strlen("CLUSTER"); - lens[1] = strlen("SYNCSLOTS"); - lens[2] = strlen("SYNC"); - lens[3] = sdslen(task->id); - - int i = 4; - for (int j = 0; j < task->slots->num_ranges; j++) { - slotRange *sr = &task->slots->ranges[j]; - args[i] = sdscatprintf(sdsempty(), "%d", sr->start); - lens[i] = sdslen(args[i]); - args[i+1] = sdscatprintf(sdsempty(), "%d", sr->end); - lens[i+1] = sdslen(args[i+1]); - i += 2; - } - serverAssert(i == argc); - - /* Send command to source node */ - char *err = sendCommandArgv(conn, argc, args, lens); - - /* Free allocated memory */ - for (int j = 4; j < argc; j++) { - sdsfree(args[j]); - } - zfree(args); - zfree(lens); - - return err; -} - -void asmSyncWithSource(connection *conn) { - asmTask *task = connGetPrivateData(conn); - char *err = NULL; - - /* Some task errors are not network issues, we record them explicitly. */ - sds task_error_msg = NULL; - - /* Check for errors in the socket: after a non blocking connect() we - * may find that the socket is in error state. */ - if (connGetState(conn) != CONN_STATE_CONNECTED) - goto error; - - /* Check if the fail point is active for this channel and state */ - if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_MAIN_CHANNEL, task->state))) { - char buf[1]; - shutdown(conn->fd, SHUT_RDWR); - connRead(conn, buf, 1); - } - - if (task->state == ASM_CONNECTING) { - connSetReadHandler(conn, asmSyncWithSource); - connSetWriteHandler(conn, NULL); - /* Send AUTH command to source node using internal auth */ - err = asmSendInternalAuth(conn); - if (err) goto write_error; - task->state = ASM_AUTH_REPLY; - return; - } - - if (task->state == ASM_AUTH_REPLY) { - err = receiveSynchronousResponse(conn); - /* The source node did not reply */ - if (err == NULL) goto no_response_error; - - /* Check `+OK` reply */ - if (!strcmp(err, "+OK")) { - sdsfree(err); - err = NULL; - task->state = ASM_SEND_HANDSHAKE; - serverLog(LL_NOTICE, "Source node replied to AUTH command, syncslots can continue..."); - } else { - task_error_msg = sdscatprintf(sdsempty(), - "Error reply to AUTH from the source: %s", err); - sdsfree(err); - goto error; - } - } - - if (task->state == ASM_SEND_HANDSHAKE) { - sds node_id = sdsnewlen(clusterNodeGetName(getMyClusterNode()), CLUSTER_NAMELEN); - err = sendCommand(conn, "CLUSTER", "SYNCSLOTS", "CONF", "NODE-ID", node_id, NULL); - sdsfree(node_id); - if (err) goto write_error; - task->state = ASM_HANDSHAKE_REPLY; - return; - } - - if (task->state == ASM_HANDSHAKE_REPLY) { - err = receiveSynchronousResponse(conn); - /* The source node did not reply */ - if (err == NULL) goto no_response_error; - - /* Check `+OK` reply */ - if (!strcmp(err, "+OK")) { - sdsfree(err); - err = NULL; - task->state = ASM_SEND_SYNCSLOTS; - serverLog(LL_NOTICE, "Source node replied to SYNCSLOTS CONF command, syncslots can continue..."); - } else { - task_error_msg = sdscatprintf(sdsempty(), - "Error reply to CLUSTER SYNCSLOTS CONF from the source: %s", err); - sdsfree(err); - goto error; - } - } - - if (task->state == ASM_SEND_SYNCSLOTS) { - err = asmSendSlotRangesSync(conn, task); - if (err) goto write_error; - - task->state = ASM_SYNCSLOTS_REPLY; - return; - } - - if (task->state == ASM_SYNCSLOTS_REPLY) { - err = receiveSynchronousResponse(conn); - /* The source node did not reply */ - if (err == NULL) goto no_response_error; - - /* Check `+RDBCHANNELSYNCSLOTS` reply */ - if (!strncmp(err, "+RDBCHANNELSYNCSLOTS", strlen("+RDBCHANNELSYNCSLOTS"))) { - sdsfree(err); - err = NULL; - task->state = ASM_INIT_RDBCHANNEL; - serverLog(LL_NOTICE, - "Source node replied to SYNCSLOTS SYNC, syncslots can continue..."); - } else if (!strncmp(err, "-NOTREADY", strlen("-NOTREADY"))) { - /* The source-side cluster is temporarily not ready to start a - * migration and replied -NOTREADY. We could fail this attempt and - * let the import task start another attempt later but that could - * trigger unnecessary cleanup in the cluster implementation. - * Instead, we'll retry sending SYNCSLOTS later in asmCron(). */ - sdsfree(err); - task->state = ASM_SEND_SYNCSLOTS; - serverLog(LL_NOTICE, - "Source node replied to SYNCSLOTS SYNC with -NOTREADY, will retry later..."); - return; - } else { - task_error_msg = sdscatprintf(sdsempty(), - "Error reply to CLUSTER SYNCSLOTS SYNC from the source: %s", err); - sdsfree(err); - goto error; - } - } - - if (task->state == ASM_INIT_RDBCHANNEL) { - /* Create RDB channel connection */ - char *ip = clusterNodeIp(task->source_node); - int port = server.tls_replication ? clusterNodeTlsPort(task->source_node) : - clusterNodeTcpPort(task->source_node); - task->rdb_channel_conn = connCreate(server.el, connTypeOfReplication()); - if (connConnect(task->rdb_channel_conn, ip, port, - server.bind_source_addr, asmRdbChannelSyncWithSource) == C_ERR) - { - serverLog(LL_WARNING, "Unable to connect to the source node: %s", - connGetLastError(task->rdb_channel_conn)); - goto error; - } - task->rdb_channel_state = ASM_CONNECTING; - connSetPrivateData(task->rdb_channel_conn, task); - serverLog(LL_NOTICE, - "RDB channel connection to source node %.40s established, waiting for AUTH reply...", - task->source); - - /* Main channel waits for the new event */ - connSetReadHandler(conn, NULL); - return; - } - return; - -no_response_error: - serverLog(LL_WARNING, "Source node did not respond to command during SYNCSLOTS handshake"); - /* Fall through to regular error handling */ - -error: - asmTaskSetFailed(task, "Main channel - Failed to sync with source node: %s", - task_error_msg ? task_error_msg : connGetLastError(conn)); - sdsfree(task_error_msg); - return; - -write_error: /* Handle sendCommand() errors. */ - serverLog(LL_WARNING, "Failed to send command to source node: %s", err); - sdsfree(err); - goto error; -} - -int asmImportSendACK(asmTask *task) { - serverAssert(task->operation == ASM_IMPORT && task->state == ASM_WAIT_STREAM_EOF); - serverLog(LL_DEBUG, "Destination node applied offset is %lld", task->dest_offset); - - char offset[64]; - ull2string(offset, sizeof(offset), task->dest_offset); - - char *err = sendCommand(task->main_channel_conn, "CLUSTER", "SYNCSLOTS", "ACK", - asmTaskStateToString(task->state), offset, NULL); - if (err) { - asmTaskSetFailed(task, "Main channel - Failed to send ACK: %s", err); - sdsfree(err); - return C_ERR; - } - return C_OK; -} - -/* Called when the RDB channel begins sending the snapshot. - * From this point on, the main channel also starts sending incremental streams. */ -void asmSlotSnapshotAndStreamStart(struct asmTask *task) { - if (task == NULL || task->state != ASM_WAIT_BGSAVE_START) return; - - if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_RDB_CHANNEL, task->state))) { - shutdown(task->rdb_channel_client->conn->fd, SHUT_RDWR); - return; - } - task->main_channel_client->replstate = SLAVE_STATE_SEND_BULK_AND_STREAM; - - task->state = ASM_SEND_BULK_AND_STREAM; - task->rdb_channel_state = ASM_RDBCHANNEL_TRANSFER; - - /* From the source node's perspective, the destination node begins to accumulate - * the buffer while the RDB channel starts applying the slot snapshot data. */ - task->dest_state = ASM_ACCUMULATE_BUF; - task->dest_slots_snapshot_time = server.mstime; -} - -/* Called when the RDB channel has succeeded in sending the snapshot. */ -void asmSlotSnapshotSucceed(struct asmTask *task) { - if (task == NULL || task->state != ASM_SEND_BULK_AND_STREAM) return; - - /* The destination starts sending ACKs to keep the main channel alive after - * receiving the snapshot, so here we need to update the last interaction - * time to avoid false timeout. */ - task->main_channel_client->lastinteraction = server.unixtime; - - task->state = ASM_SEND_STREAM; - task->rdb_channel_state = ASM_COMPLETED; -} - -/* Called when the RDB channel fails to send the snapshot. */ -void asmSlotSnapshotFailed(struct asmTask *task) { - if (task == NULL || task->state != ASM_SEND_BULK_AND_STREAM) return; - - asmTaskSetFailed(task, "RDB channel - Failed to send slots snapshot"); -} - -/* CLUSTER SYNCSLOTS SNAPSHOT-EOF - * - * This command is sent by the source node to the destination node to indicate - * that the slots snapshot has ended. */ -void clusterSyncSlotsSnapshotEOF(client *c) { - /* This client is RDB channel connection. */ - asmTask *task = c->task; - if (!task || task->rdb_channel_state != ASM_RDBCHANNEL_TRANSFER || - c->conn != task->rdb_channel_conn) - { - /* Unexpected SNAPSHOT-EOF command */ - serverLog(LL_WARNING, "Unexpected CLUSTER SYNCSLOTS SNAPSHOT-EOF command: " - "rdb_channel_state=%s", - asmTaskStateToString(task ? task->rdb_channel_state : ASM_NONE)); - freeClientAsync(c); - return; - } - - /* RDB channel state: ASM_RDBCHANNEL_TRANSFER */ - if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_RDB_CHANNEL, task->rdb_channel_state))) { - freeClientAsync(c); /* Simulate a failure */ - return; - } - - /* Clear the RDB channel connection */ - task->rdb_channel_conn = NULL; - task->rdb_channel_state = ASM_COMPLETED; - serverLog(LL_NOTICE, "RDB channel snapshot transfer completed for the import task."); - - /* Free the RDB channel connection. */ - c->task = NULL; - c->flags &= ~CLIENT_MASTER; - freeClientAsync(c); - - /* Will start streaming the buffer to DB, don't start here since now - * we are in the context of executing command, otherwise, redis will - * generate a big MULTI-EXEC including all the commands in the buffer. - * just update the state here, and do it in beforeSleep(). */ - task->state = ASM_READY_TO_STREAM; - connSetReadHandler(task->main_channel_conn, NULL); -} - -/* CLUSTER SYNCSLOTS STREAM-EOF - * - * This command is sent by the source node to the destination node to indicate - * that the slot sync stream has ended and the slots can be handed off. */ -void clusterSyncSlotsStreamEOF(client *c) { - asmTask *task = c->task; - - if (!task || task->operation != ASM_IMPORT) { - serverLog(LL_WARNING, "Unexpected CLUSTER SYNCSLOTS STREAM-EOF command"); - freeClientAsync(c); - return; - } - - if (task->state == ASM_STREAMING_BUF) { - /* We are still streaming the buffer to DB, mark the EOF received, and we - * can take over after streaming is EOF. Since we may release the context - * in asmImportTakeover, this breaks the context for streaming buffer. */ - task->stream_eof_during_streaming = 1; - serverLog(LL_NOTICE, "CLUSTER SYNCSLOTS STREAM-EOF received during streaming buffer"); - return; - } - - if (task->state != ASM_WAIT_STREAM_EOF) { - serverLog(LL_WARNING, "Unexpected CLUSTER SYNCSLOTS STREAM-EOF state: %s", - asmTaskStateToString(task->state)); - freeClientAsync(c); - return; - } - serverLog(LL_NOTICE, "CLUSTER SYNCSLOTS STREAM-EOF received when waiting for STREAM-EOF"); - - /* STREAM-EOF received, the source is ready to handoff, takeover now. */ - asmImportTakeover(task); -} - -/* Start the import task. */ -static void asmStartImportTask(asmTask *task) { - if (task->operation != ASM_IMPORT || task->state != ASM_NONE) return; - sds slots_str = slotRangeArrayToString(task->slots); - - /* Sanity check: Clean up any keys that exist in slots not owned by this node. - * This handles cases where users previously migrated slots using legacy method - * but left behind orphaned keys, or maybe cluster missed cleaning up during - * previous operations, which could interfere with the ASM import process. */ - asmTrimSlotsIfNotOwned(task->slots); - - /* Check if there is any trim job in progress for the slot ranges. - * We can't start the import task since the trim job will modify the data.*/ - int trim_in_progress = asmIsAnyTrimJobOverlaps(task->slots); - - /* Notify the cluster implementation to prepare for the import task. */ - int impl_ret = clusterAsmOnEvent(task->id, ASM_EVENT_IMPORT_PREP, task->slots); - - /* We do not start the import task if trim is disabled by module. */ - int disabled_by_module = server.cluster_module_trim_disablers > 0; - - static int start_blocked_logged = 0; - /* Cannot start import task since pause action is performed. Otherwise, we - * will break the promise that no writes are performed during the pause. */ - if (isPausedActions(PAUSE_ACTION_CLIENT_ALL) || - isPausedActions(PAUSE_ACTION_CLIENT_WRITE) || - trim_in_progress || - impl_ret != C_OK || - disabled_by_module) - { - const char *reason = disabled_by_module ? "trim is disabled by module" : - impl_ret != C_OK ? "cluster is not ready" : - trim_in_progress ? "trim in progress for some of the slots" : - "server paused"; - if (start_blocked_logged == 0) { - serverLog(LL_WARNING, "Can not start import task %s for slots: %s due to %s", - task->id, slots_str, reason); - start_blocked_logged = 1; - } - sdsfree(slots_str); - return; - } - start_blocked_logged = 0; /* Reset the log flag */ - - /* Detect if the cluster topology is changed. We should cancel the task if - * we can not schedule it, and update the source node if needed. */ - sds err = NULL; - clusterNode *source = validateImportSlotRanges(task->slots, &err, task); - if (!source) { - asmTaskCancel(task, err); - sdsfree(slots_str); - sdsfree(err); - return; - } - /* Now I'm the owner of the slot range, cancel the import task. */ - if (source == getMyClusterNode()) { - asmTaskCancel(task, "slots owned by myself now"); - sdsfree(slots_str); - return; - } - /* Change the source node if needed. */ - if (source != task->source_node) { - task->source_node = source; - memcpy(task->source, clusterNodeGetName(source), CLUSTER_NAMELEN); - serverLog(LL_NOTICE, "Import task %s source node changed: slots=%s, " - "new_source=%.40s", task->id, slots_str, clusterNodeGetName(source)); - } - sdsfree(slots_str); - - task->state = ASM_CONNECTING; - task->start_time = server.mstime; - asmNotifyStateChange(task, ASM_EVENT_IMPORT_STARTED); - - task->main_channel_conn = connCreate(server.el, connTypeOfReplication()); - char *ip = clusterNodeIp(task->source_node); - int port = server.tls_replication ? clusterNodeTlsPort(task->source_node) : - clusterNodeTcpPort(task->source_node); - if (connConnect(task->main_channel_conn, ip, port, server.bind_source_addr, - asmSyncWithSource) == C_ERR) - { - asmTaskSetFailed(task, "Main channel - Failed to connect to source node: %s", - connGetLastError(task->main_channel_conn)); - return; - } - connSetPrivateData(task->main_channel_conn, task); -} - -void clusterSyncSlotsCommand(client *c) { - /* Only internal clients are allowed to execute this command to avoid - * potential attack, since some state changes are not well protected, - * external clients may damage the slot migration state. */ - if (!(c->flags & (CLIENT_INTERNAL | CLIENT_MASTER))) { - addReplyError(c, "CLUSTER SYNCSLOTS subcommands are only allowed for internal clients"); - c->flags |= CLIENT_CLOSE_AFTER_REPLY; - return; - } - - /* On replica, only allow master client to execute CONF subcommand. */ - if (!clusterNodeIsMaster(getMyClusterNode())) { - if (!(c->flags & CLIENT_MASTER)) { - /* Not master client, reject all subcommands and close the connection. */ - addReplyError(c, "CLUSTER SYNCSLOTS subcommands are only allowed for master"); - c->flags |= CLIENT_CLOSE_AFTER_REPLY; - return; - } else { - /* Only allow CONF subcommand on replica. */ - if (strcasecmp(c->argv[2]->ptr, "conf")) return; - } - } - - if (!strcasecmp(c->argv[2]->ptr, "sync") && c->argc >= 6) { - /* CLUSTER SYNCSLOTS SYNC [ ] */ - if (c->argc % 2 == 1) { - addReplyErrorArity(c); - return; - } - - slotRangeArray *slots = parseSlotRangesOrReply(c, c->argc, 4); - if (!slots) return; - - /* Validate that the slot ranges are valid and that migration can be - * initiated for them. */ - sds err = NULL; - clusterNode *source = validateImportSlotRanges(slots, &err, NULL); - if (!source) { - addReplyErrorSds(c, err); - slotRangeArrayFree(slots); - return; - } - - /* Check if the source node is the same as the current node. */ - if (source != getMyClusterNode()) { - addReplyError(c, "This node is not the owner of the slots"); - slotRangeArrayFree(slots); - return; - } - - /* Verify the destination node is known and is a master. */ - if (c->node_id) { - clusterNode *dest = clusterLookupNode(c->node_id, CLUSTER_NAMELEN); - if (dest == NULL || !clusterNodeIsMaster(dest)) { - addReplyErrorFormat(c, "Destination node %.40s is not a master", c->node_id); - slotRangeArrayFree(slots); - return; - } - } - - sds task_id = c->argv[3]->ptr; - /* Notify the cluster implementation to prepare for the migrate task. */ - if (clusterAsmOnEvent(task_id, ASM_EVENT_MIGRATE_PREP, slots) != C_OK || - asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, ASM_NONE)) - { - addReplyError(c, "-NOTREADY Cluster is not ready to migrate slots"); - slotRangeArrayFree(slots); - return; - } - - /* We do not start the migrate task if trim is disabled by module. */ - int disabled_by_module = server.cluster_module_trim_disablers > 0; - if (disabled_by_module) { - addReplyError(c, "Trim is disabled by module"); - slotRangeArrayFree(slots); - return; - } - - asmTask *task = listLength(asmManager->tasks) == 0 ? NULL : - listNodeValue(listFirst(asmManager->tasks)); - if (task && !strcmp(task->id, task_id) && - task->operation == ASM_MIGRATE && task->state == ASM_FAILED && - slotRangeArrayIsEqual(slots, task->slots) && - memcmp(task->dest, c->node_id, CLUSTER_NAMELEN) == 0) - { - /* Reuse the failed task */ - asmTaskReset(task); - slotRangeArrayFree(task->slots); /* Will be set again later */ - task->retry_count++; - } else if (task) { - if (task->state == ASM_FAILED) { - /* We can create a new migrate task only if the current one is - * failed, cancel the failed task to create a new one. */ - asmTaskCancel(task, "new migration requested"); - task = NULL; - } else { - addReplyError(c, "Another ASM task is already in progress"); - slotRangeArrayFree(slots); - return; - } - } - - /* Create the migrate slots task and add it to the list, - * otherwise reuse the existing one */ - if (task == NULL) { - task = asmTaskCreate(task_id); - task->start_time = server.mstime; /* Start immediately */ - serverAssert(listLength(asmManager->tasks) == 0); - listAddNodeTail(asmManager->tasks, task); - } - - task->slots = slots; - task->operation = ASM_MIGRATE; - memcpy(task->source, clusterNodeGetName(getMyClusterNode()), CLUSTER_NAMELEN); - if (c->node_id) memcpy(task->dest, c->node_id, CLUSTER_NAMELEN); - - task->main_channel_client = c; - c->task = task; - - /* We mark the main channel client as a replica, so this client is limited - * by the client output buffer settings for replicas. The replstate has - * no real significance, just to prevent it from going online. */ - c->flags |= (CLIENT_SLAVE | CLIENT_ASM_MIGRATING); - c->replstate = SLAVE_STATE_WAIT_RDB_CHANNEL; - if (server.repl_disable_tcp_nodelay) - connDisableTcpNoDelay(c->conn); /* Non-critical if it fails. */ - listAddNodeTail(server.slaves, c); - createReplicationBacklogIfNeeded(); - - /* Wait for RDB channel to be ready */ - task->state = ASM_WAIT_RDBCHANNEL; - - sds slots_str = slotRangeArrayToString(slots); - serverLog(LL_NOTICE, "Migrate task %s created: src=%.40s, dest=%.40s, slots=%s", - task->id, task->source, task->dest, slots_str); - sdsfree(slots_str); - - asmNotifyStateChange(task, ASM_EVENT_MIGRATE_STARTED); - - /* Keep the client in the main thread to avoid data races between the - * connWrite call below and the client's event handler in IO threads. */ - if (c->tid != IOTHREAD_MAIN_THREAD_ID) keepClientInMainThread(c); - - /* addReply*() is not suitable for clients in SLAVE_STATE_WAIT_RDB_CHANNEL state. */ - if (connWrite(c->conn, "+RDBCHANNELSYNCSLOTS\r\n", 22) != 22) - freeClientAsync(c); - } else if (!strcasecmp(c->argv[2]->ptr, "rdbchannel") && c->argc == 4) { - /* CLUSTER SYNCSLOTS RDBCHANNEL */ - sds task_id = c->argv[3]->ptr; - if (sdslen(task_id) != CLUSTER_NAMELEN) { - addReplyError(c, "Invalid task id"); - return; - } - - if (listLength(asmManager->tasks) == 0) { - addReplyError(c, "No slot migration task in progress"); - return; - } - - asmTask *task = listNodeValue(listFirst(asmManager->tasks)); - if (task->operation != ASM_MIGRATE || task->state != ASM_WAIT_RDBCHANNEL || - strcmp(task->id, task_id) != 0) - { - addReplyError(c, "Another migration task is already in progress"); - return; - } - - if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, task->state))) { - /* Close the main channel client before rdb channel client connects */ - if (task->main_channel_client) - freeClient(task->main_channel_client); - } - - /* The main channel client must be present when setting RDB channel client */ - if (task->main_channel_client == NULL) { - /* Maybe the main channel connection is closed. */ - addReplyError(c, "Main channel connection is not established"); - return; - } - - /* Mark the client as a slave to generate slots snapshot */ - c->flags |= (CLIENT_SLAVE | CLIENT_REPL_RDB_CHANNEL | CLIENT_REPL_RDBONLY | CLIENT_ASM_MIGRATING); - c->slave_capa |= SLAVE_CAPA_EOF; - c->slave_req |= (SLAVE_REQ_SLOTS_SNAPSHOT | SLAVE_REQ_RDB_CHANNEL); - c->replstate = SLAVE_STATE_WAIT_BGSAVE_START; - c->repldbfd = -1; - if (server.repl_disable_tcp_nodelay) - connDisableTcpNoDelay(c->conn); /* Non-critical if it fails. */ - listAddNodeTail(server.slaves, c); - - /* Wait for bgsave to start for slots sync */ - task->state = ASM_WAIT_BGSAVE_START; - task->rdb_channel_state = ASM_WAIT_BGSAVE_START; - task->rdb_channel_client = c; - c->task = task; - - /* Keep the client in the main thread to avoid data races between the - * connWrite call in startBgsaveForReplication and the client's event - * handler in IO threads. */ - if (c->tid != IOTHREAD_MAIN_THREAD_ID) keepClientInMainThread(c); - - if (!hasActiveChildProcess()) { - startBgsaveForReplication(c->slave_capa, c->slave_req); - } else { - serverLog(LL_NOTICE, "BGSAVE for slots snapshot sync delayed"); - } - } else if (!strcasecmp(c->argv[2]->ptr, "snapshot-eof") && c->argc == 3) { - /* CLUSTER SYNCSLOTS SNAPSHOT-EOF */ - clusterSyncSlotsSnapshotEOF(c); - } else if (!strcasecmp(c->argv[2]->ptr, "stream-eof") && c->argc == 3) { - /* CLUSTER SYNCSLOTS STREAM-EOF */ - clusterSyncSlotsStreamEOF(c); - } else if (!strcasecmp(c->argv[2]->ptr, "ack") && c->argc == 5) { - /* CLUSTER SYNCSLOTS ACK */ - long long offset; - int dest_state; - - if (!strcasecmp(c->argv[3]->ptr, asmTaskStateToString(ASM_STREAMING_BUF))) { - dest_state = ASM_STREAMING_BUF; - } else if (!strcasecmp(c->argv[3]->ptr, asmTaskStateToString(ASM_WAIT_STREAM_EOF))) { - dest_state = ASM_WAIT_STREAM_EOF; - } else { - return; /* Not support now. */ - } - - if ((getLongLongFromObject(c->argv[4], &offset) != C_OK)) - return; - - if (c->task && c->task->operation == ASM_MIGRATE) { - /* Update the state and ACKed offset from destination. */ - asmTask *task = c->task; - task->dest_state = dest_state; - if (task->dest_offset > (unsigned long long) offset) { - serverLog(LL_WARNING, "CLUSTER SYNCSLOTS ACK received, dest state: %s, " - "but offset %lld is less than the current dest offset %lld", - asmTaskStateToString(dest_state), offset, task->dest_offset); - return; - } - task->dest_offset = offset; - serverLog(LL_DEBUG, "CLUSTER SYNCSLOTS ACK received, dest state: %s, " - "updated dest offset to %lld, source offset: %lld", - asmTaskStateToString(dest_state), task->dest_offset, task->source_offset); - - /* Record the time when the destination finishes applying the accumulated buffer */ - if (task->dest_state == ASM_WAIT_STREAM_EOF && task->dest_accum_applied_time == 0) - task->dest_accum_applied_time = server.mstime; - - /* Pause write if needed */ - if (task->state == ASM_SEND_BULK_AND_STREAM || task->state == ASM_SEND_STREAM) { - /* Pause writes on the main channel if the lag is less than the threshold. */ - if (task->dest_offset + server.asm_handoff_max_lag_bytes >= task->source_offset) { - if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, ASM_HANDOFF_PREP))) - return; /* Do not enter handoff prep state for testing buffer drain timeout. */ - - serverLog(LL_NOTICE, "The applied offset lag %lld is less than the threshold %lld, " - "pausing writes for slot handoff", - task->source_offset - task->dest_offset, - server.asm_handoff_max_lag_bytes); - task->state = ASM_HANDOFF_PREP; - asmLogTaskEvent(task, ASM_EVENT_HANDOFF_PREP); - clusterAsmOnEvent(task->id, ASM_EVENT_HANDOFF_PREP, task->slots); - } - } - } - } else if (!strcasecmp(c->argv[2]->ptr, "fail") && c->argc == 4) { - /* CLUSTER SYNCSLOTS FAIL */ - return; /* This is a no-op, just to handle the command syntax. */ - } else if (!strcasecmp(c->argv[2]->ptr, "conf") && c->argc >= 5) { - /* CLUSTER SYNCSLOTS CONF