diff options
| author | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:52:54 +0100 |
|---|---|---|
| committer | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:52:54 +0100 |
| commit | dcacc00e3750300617ba6e16eb346713f91a783a (patch) | |
| tree | 38e2d4fb5ed9d119711d4295c6eda4b014af73fd /examples/redis-unstable/src/cluster_asm.c | |
| parent | 58dac10aeb8f5a041c46bddbeaf4c7966a99b998 (diff) | |
| download | crep-dcacc00e3750300617ba6e16eb346713f91a783a.tar.gz | |
Remove testing data
Diffstat (limited to 'examples/redis-unstable/src/cluster_asm.c')
| -rw-r--r-- | examples/redis-unstable/src/cluster_asm.c | 3602 |
1 files changed, 0 insertions, 3602 deletions
diff --git a/examples/redis-unstable/src/cluster_asm.c b/examples/redis-unstable/src/cluster_asm.c deleted file mode 100644 index a090453..0000000 --- a/examples/redis-unstable/src/cluster_asm.c +++ /dev/null @@ -1,3602 +0,0 @@ -/* cluster_asm.c -- Atomic slot migration implementation for cluster - * - * Copyright (c) 2025-Present, Redis Ltd. - * All rights reserved. - * - * Licensed under your choice of (a) the Redis Source Available License 2.0 - * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the - * GNU Affero General Public License v3 (AGPLv3). - */ - -#include "server.h" -#include "cluster.h" -#include "functions.h" -#include "cluster_asm.h" -#include "cluster_slot_stats.h" - -#define ASM_IMPORT (1 << 1) -#define ASM_MIGRATE (1 << 2) - -#define ASM_DEBUG_TRIM_DEFAULT 0 -#define ASM_DEBUG_TRIM_NONE 1 -#define ASM_DEBUG_TRIM_BG 2 -#define ASM_DEBUG_TRIM_ACTIVE 3 - -#define ASM_AOF_MIN_ITEMS_PER_KEY 512 /* Minimum number of items per key to use AOF format encoding */ - -typedef struct asmTask { - sds id; /* Task ID */ - int operation; /* Either ASM_IMPORT or ASM_MIGRATE */ - slotRangeArray *slots; /* List of slot ranges for this migration task */ - int state; /* Current state of the task */ - int dest_state; /* Destination node's main state (approximate) */ - char source[CLUSTER_NAMELEN]; /* Source node name */ - char dest[CLUSTER_NAMELEN]; /* Destination node name */ - clusterNode *source_node; /* Source node */ - connection *main_channel_conn; /* Main channel connection */ - connection *rdb_channel_conn; /* RDB channel connection */ - int rdb_channel_state; /* State of the RDB channel */ - unsigned long long dest_offset; /* Destination offset */ - unsigned long long source_offset; /* Source offset */ - int cross_slot_during_propagating; /* If cross-slot commands are encountered during propagating */ - int stream_eof_during_streaming; /* If STREAM-EOF is received during streaming buffer */ - replDataBuf sync_buffer; /* Buffer for the 
stream */ - client *main_channel_client; /* Client for the main channel on the source side */ - client *rdb_channel_client; /* Client for the RDB channel on the source side */ - long long retry_count; /* Number of retries for this task */ - mstime_t create_time; /* Task creation time */ - mstime_t start_time; /* Task start time */ - mstime_t end_time; /* Task end time */ - mstime_t paused_time; /* The time when the slot writes were paused */ - mstime_t dest_slots_snapshot_time; /* The time when the destination starts applying the slot snapshot */ - mstime_t dest_accum_applied_time; /* The time when the destination finishes applying the accumulated buffer */ - sds error; /* Error message for this task */ - redisOpArray *pre_snapshot_module_cmds; /* Module commands to be propagated at the beginning of slot migration */ -} asmTask; - -struct asmManager { - list *tasks; /* List of asmTask to be processed */ - list *archived_tasks; /* List of archived asmTask */ - list *pending_trim_jobs; /* List of pending trim jobs (due to write pause) */ - list *active_trim_jobs; /* List of active trim jobs */ - slotRangeArrayIter *active_trim_it; /* Iterator of the current active trim job */ - size_t sync_buffer_peak; /* Peak size of sync buffer */ - asmTask *master_task; /* The task that is currently active on the master */ - - /* Fail point injection for debugging */ - int debug_fail_channel; /* Channel where the task will fail */ - int debug_fail_state; /* State where the task will fail */ - int debug_trim_method; /* Method to trim the buffer */ - int debug_active_trim_delay; /* Sleep before trimming each key */ - - /* Active trim stats */ - unsigned long long active_trim_started; /* Number of times active trim was started */ - unsigned long long active_trim_completed; /* Number of times active trim was completed */ - unsigned long long active_trim_cancelled; /* Number of times active trim was cancelled */ - unsigned long long active_trim_current_job_keys; /* Total number of keys 
to trim in the current job */ - unsigned long long active_trim_current_job_trimmed; /* Number of keys trimmed in the current job */ -}; - -enum asmState { - /* Common state */ - ASM_NONE = 0, - ASM_CONNECTING, - ASM_AUTH_REPLY, - ASM_CANCELED, - ASM_FAILED, - ASM_COMPLETED, - - /* Import state */ - ASM_SEND_HANDSHAKE, - ASM_HANDSHAKE_REPLY, - ASM_SEND_SYNCSLOTS, - ASM_SYNCSLOTS_REPLY, - ASM_INIT_RDBCHANNEL, - ASM_ACCUMULATE_BUF, - ASM_READY_TO_STREAM, - ASM_STREAMING_BUF, - ASM_WAIT_STREAM_EOF, - ASM_TAKEOVER, - - /* Migrate state */ - ASM_WAIT_RDBCHANNEL, - ASM_WAIT_BGSAVE_START, - ASM_SEND_BULK_AND_STREAM, - ASM_SEND_STREAM, - ASM_HANDOFF_PREP, - ASM_HANDOFF, - ASM_STREAM_EOF, - - /* RDB channel state */ - ASM_RDBCHANNEL_REQUEST, - ASM_RDBCHANNEL_REPLY, - ASM_RDBCHANNEL_TRANSFER, -}; - -enum asmChannel { - ASM_IMPORT_MAIN_CHANNEL = 1, /* Main channel for the import task */ - ASM_IMPORT_RDB_CHANNEL, /* RDB channel for the import task */ - ASM_MIGRATE_MAIN_CHANNEL, /* Main channel for the migrate task */ - ASM_MIGRATE_RDB_CHANNEL /* RDB channel for the migrate task */ -}; - -/* Global ASM manager */ -struct asmManager *asmManager = NULL; - -/* replication.c */ -char *sendCommand(connection *conn, ...); -char *sendCommandArgv(connection *conn, int argc, char **argv, size_t *argv_lens); -char *receiveSynchronousResponse(connection *conn); -ConnectionType *connTypeOfReplication(void); -int startBgsaveForReplication(int mincapa, int req); -void createReplicationBacklogIfNeeded(void); -/* cluster.c */ -void createDumpPayload(rio *payload, robj *o, robj *key, int dbid, int skip_checksum); -/* cluster_asm.c */ -static void asmStartImportTask(asmTask *task); -static void asmTaskCancel(asmTask *task, const char *reason); -static void asmSyncBufferReadFromConn(connection *conn); -static void propagateTrimSlots(slotRangeArray *slots); -void asmTrimJobSchedule(slotRangeArray *slots); -void asmTrimJobProcessPending(void); -void asmCancelPendingTrimJobs(void); -void 
asmTriggerActiveTrim(slotRangeArray *slots); -void asmActiveTrimEnd(void); -int asmIsAnyTrimJobOverlaps(slotRangeArray *slots); -void asmTrimSlotsIfNotOwned(slotRangeArray *slots); -void asmNotifyStateChange(asmTask *task, int event); - -void asmInit(void) { - asmManager = zcalloc(sizeof(*asmManager)); - asmManager->tasks = listCreate(); - asmManager->archived_tasks = listCreate(); - asmManager->pending_trim_jobs = listCreate(); - asmManager->sync_buffer_peak = 0; - asmManager->master_task = NULL; - asmManager->debug_fail_channel = -1; - asmManager->debug_fail_state = -1; - asmManager->debug_trim_method = ASM_DEBUG_TRIM_DEFAULT; - asmManager->debug_active_trim_delay = 0; - asmManager->active_trim_jobs = listCreate(); - asmManager->active_trim_started = 0; - asmManager->active_trim_completed = 0; - asmManager->active_trim_cancelled = 0; - listSetFreeMethod(asmManager->active_trim_jobs, slotRangeArrayFreeGeneric); -} - -char *asmTaskStateToString(int state) { - switch (state) { - case ASM_NONE: return "none"; - case ASM_CONNECTING: return "connecting"; - case ASM_AUTH_REPLY: return "auth-reply"; - case ASM_CANCELED: return "canceled"; - case ASM_FAILED: return "failed"; - case ASM_COMPLETED: return "completed"; - - /* Import state */ - case ASM_SEND_HANDSHAKE: return "send-handshake"; - case ASM_HANDSHAKE_REPLY: return "handshake-reply"; - case ASM_SEND_SYNCSLOTS: return "send-syncslots"; - case ASM_SYNCSLOTS_REPLY: return "syncslots-reply"; - case ASM_INIT_RDBCHANNEL: return "init-rdbchannel"; - case ASM_ACCUMULATE_BUF: return "accumulate-buffer"; - case ASM_READY_TO_STREAM: return "ready-to-stream"; - case ASM_STREAMING_BUF: return "streaming-buffer"; - case ASM_WAIT_STREAM_EOF: return "wait-stream-eof"; - case ASM_TAKEOVER: return "takeover"; - - /* Migrate state */ - case ASM_WAIT_RDBCHANNEL: return "wait-rdbchannel"; - case ASM_WAIT_BGSAVE_START: return "wait-bgsave-start"; - case ASM_SEND_BULK_AND_STREAM: return "send-bulk-and-stream"; - case ASM_SEND_STREAM: 
return "send-stream"; - case ASM_HANDOFF_PREP: return "handoff-prep"; - case ASM_HANDOFF: return "handoff"; - case ASM_STREAM_EOF: return "stream-eof"; - - /* RDB channel state */ - case ASM_RDBCHANNEL_REQUEST: return "rdbchannel-request"; - case ASM_RDBCHANNEL_REPLY: return "rdbchannel-reply"; - case ASM_RDBCHANNEL_TRANSFER: return "rdbchannel-transfer"; - - default: return "unknown"; - } - serverAssert(0); /* Unreachable */ -} - -const char *asmChannelToString(int channel) { - switch (channel) { - case ASM_IMPORT_MAIN_CHANNEL: return "import-main-channel"; - case ASM_IMPORT_RDB_CHANNEL: return "import-rdb-channel"; - case ASM_MIGRATE_MAIN_CHANNEL: return "migrate-main-channel"; - case ASM_MIGRATE_RDB_CHANNEL: return "migrate-rdb-channel"; - default: return "unknown"; - } -} - -int asmDebugSetFailPoint(char *channel, char *state) { - if (!asmManager) { - serverLog(LL_WARNING, "ASM manager is not initialized"); - return C_ERR; - } - asmManager->debug_fail_channel = -1; - asmManager->debug_fail_state = -1; - if (!channel && !state) return C_ERR; - if (sdslen(channel) == 0 && sdslen(state) == 0) { - serverLog(LL_WARNING, "ASM fail point is cleared"); - return C_OK; - } - - for (int i = ASM_IMPORT_MAIN_CHANNEL; i <= ASM_MIGRATE_RDB_CHANNEL; i++) { - if (!strcasecmp(channel, asmChannelToString(i))) { - asmManager->debug_fail_channel = i; - break; - } - } - if (asmManager->debug_fail_channel == -1) return C_ERR; - - for (int i = ASM_NONE; i <= ASM_RDBCHANNEL_TRANSFER; i++) { - if (!strcasecmp(state, asmTaskStateToString(i))) { - asmManager->debug_fail_state = i; - break; - } - } - if (asmManager->debug_fail_state == -1) return C_ERR; - - serverLog(LL_NOTICE, "ASM fail point set: channel=%s, state=%s", channel, state); - return C_OK; -} - -int asmDebugSetTrimMethod(const char *method, int active_trim_delay) { - if (!asmManager) { - serverLog(LL_WARNING, "ASM manager is not initialized"); - return C_ERR; - } - int prev = asmManager->debug_trim_method; - if 
(!strcasecmp(method, "default")) asmManager->debug_trim_method = ASM_DEBUG_TRIM_DEFAULT; - else if (!strcasecmp(method, "none")) asmManager->debug_trim_method = ASM_DEBUG_TRIM_NONE; - else if (!strcasecmp(method, "bg")) asmManager->debug_trim_method = ASM_DEBUG_TRIM_BG; - else if (!strcasecmp(method, "active")) asmManager->debug_trim_method = ASM_DEBUG_TRIM_ACTIVE; - else return C_ERR; - - /* If we are switching from none to default, delete all the keys in the - * slots we don't own */ - if (prev == ASM_DEBUG_TRIM_NONE && asmManager->debug_trim_method != ASM_DEBUG_TRIM_NONE) { - for (int i = 0; i < CLUSTER_SLOTS; i++) - if (!clusterIsMySlot(i)) - clusterDelKeysInSlot(i, 0); - } - asmManager->debug_active_trim_delay = active_trim_delay; - serverLog(LL_NOTICE, "ASM trim method was set=%s, active_trim_delay=%d", method, active_trim_delay); - return C_OK; -} - -int asmDebugIsFailPointActive(int channel, int state) { - if (!asmManager) return 0; /* ASM manager not initialized */ - if (asmManager->debug_fail_channel == channel && asmManager->debug_fail_state == state) { - serverLog(LL_NOTICE, "ASM fail point active: channel=%s, state=%s", - asmChannelToString(channel), asmTaskStateToString(state)); - return 1; - } - return 0; -} - -sds asmCatInfoString(sds info) { - int active_tasks = 0; - - listIter li; - listNode *ln; - listRewind(asmManager->tasks, &li); - while ((ln = listNext(&li)) != NULL) { - asmTask *task = listNodeValue(ln); - if (task->operation == ASM_IMPORT || - (task->operation == ASM_MIGRATE && task->state != ASM_FAILED)) - { - active_tasks++; - } - } - - return sdscatprintf(info ? 
info : sdsempty(), - "cluster_slot_migration_active_tasks:%d\r\n" - "cluster_slot_migration_active_trim_running:%lu\r\n" - "cluster_slot_migration_active_trim_current_job_keys:%llu\r\n" - "cluster_slot_migration_active_trim_current_job_trimmed:%llu\r\n" - "cluster_slot_migration_stats_active_trim_started:%llu\r\n" - "cluster_slot_migration_stats_active_trim_completed:%llu\r\n" - "cluster_slot_migration_stats_active_trim_cancelled:%llu\r\n", - active_tasks, - listLength(asmManager->active_trim_jobs), - asmManager->active_trim_current_job_keys, - asmManager->active_trim_current_job_trimmed, - asmManager->active_trim_started, - asmManager->active_trim_completed, - asmManager->active_trim_cancelled); -} - -void asmTaskReset(asmTask *task) { - task->state = ASM_NONE; - task->dest_state = ASM_NONE; - task->rdb_channel_state = ASM_NONE; - task->main_channel_conn = NULL; - task->rdb_channel_conn = NULL; - task->dest_offset = 0; - task->source_offset = 0; - task->stream_eof_during_streaming = 0; - task->cross_slot_during_propagating = 0; - replDataBufInit(&task->sync_buffer); - task->main_channel_client = NULL; - task->rdb_channel_client = NULL; - task->paused_time = 0; - task->dest_slots_snapshot_time = 0; - task->dest_accum_applied_time = 0; - task->pre_snapshot_module_cmds = NULL; -} - -asmTask *asmTaskCreate(const char *task_id) { - asmTask *task = zcalloc(sizeof(*task)); - task->error = sdsempty(); - asmTaskReset(task); - task->slots = NULL; - task->source_node = NULL; - task->retry_count = 0; - task->create_time = server.mstime; - task->start_time = -1; - task->end_time = -1; - if (task_id) { - task->id = sdsnew(task_id); - } else { - task->id = sdsnewlen(NULL, CLUSTER_NAMELEN); - getRandomHexChars(task->id, CLUSTER_NAMELEN); - } - - return task; -} - -void asmTaskFree(asmTask *task) { - replDataBufClear(&task->sync_buffer); - sdsfree(task->id); - slotRangeArrayFree(task->slots); - sdsfree(task->error); - zfree(task); -} - -/* Convert the task state to the 
corresponding event. */ -int asmTaskStateToEvent(asmTask *task) { - if (task->operation == ASM_IMPORT) { - if (task->state == ASM_COMPLETED) return ASM_EVENT_IMPORT_COMPLETED; - else if (task->state == ASM_FAILED) return ASM_EVENT_IMPORT_FAILED; - else return ASM_EVENT_IMPORT_STARTED; - } else { - if (task->state == ASM_COMPLETED) return ASM_EVENT_MIGRATE_COMPLETED; - else if (task->state == ASM_FAILED) return ASM_EVENT_MIGRATE_FAILED; - else return ASM_EVENT_MIGRATE_STARTED; - } -} - -/* Serialize ASM task information into a string for transmission to replicas. - * Format: "task_id:source_node:dest_node:operation:state:slot_ranges" - * Where slot_ranges is in the format "1000-2000 3000-4000 ..." */ -sds asmTaskSerialize(asmTask *task) { - sds serialized = sdsempty(); - - /* Add task ID */ - serialized = sdscatprintf(serialized, "%s:", task->id); - - /* Add source node ID (40 chars) */ - serialized = sdscatlen(serialized, task->source, CLUSTER_NAMELEN); - serialized = sdscat(serialized, ":"); - - /* Add destination node ID (40 chars) */ - serialized = sdscatlen(serialized, task->dest, CLUSTER_NAMELEN); - serialized = sdscat(serialized, ":"); - - /* Add operation type */ - serialized = sdscatprintf(serialized, "%s:", task->operation == ASM_IMPORT ? - "import" : "migrate"); - - /* Add current state */ - serialized = sdscatprintf(serialized, "%s:", asmTaskStateToString(task->state)); - - /* Add slot ranges sds */ - sds slots_str = slotRangeArrayToString(task->slots); - serialized = sdscatprintf(serialized, "%s", slots_str); - sdsfree(slots_str); - - return serialized; -} - -/* Deserialize ASM task information from a string and create a complete asmTask. - * Format: "task_id:source_node:dest_node:operation:state:slot_ranges" - * Returns a new asmTask on success, NULL on failure. 
*/ -asmTask *asmTaskDeserialize(sds data) { - int count, idx = 0; - asmTask *task = NULL; - if (!data || sdslen(data) == 0) return NULL; - - sds *parts = sdssplitlen(data, sdslen(data), ":", 1, &count); - if (count < 6) goto err; - - /* Parse task ID */ - if (sdslen(parts[idx]) == 0) goto err; - task = asmTaskCreate(parts[idx]); - if (!task) goto err; - idx++; - - /* Parse source node ID */ - if (sdslen(parts[idx]) != CLUSTER_NAMELEN) goto err; - memcpy(task->source, parts[idx], CLUSTER_NAMELEN); - idx++; - - /* Parse destination node ID */ - if (sdslen(parts[idx]) != CLUSTER_NAMELEN) goto err; - memcpy(task->dest, parts[idx], CLUSTER_NAMELEN); - idx++; - - /* Parse operation type */ - if (!strcasecmp(parts[idx], "import")) { - task->operation = ASM_IMPORT; - } else if (!strcasecmp(parts[idx], "migrate")) { - task->operation = ASM_MIGRATE; - } else { - goto err; - } - idx++; - - /* Parse state */ - task->state = ASM_NONE; /* Default state */ - for (int state = ASM_NONE; state <= ASM_RDBCHANNEL_TRANSFER; state++) { - if (!strcasecmp(parts[idx], asmTaskStateToString(state))) { - task->state = state; - break; - } - } - idx++; - - /* Parse slot ranges */ - task->slots = slotRangeArrayFromString(parts[idx]); - if (!task->slots) goto err; - idx++; - - /* Ignore any extra fields for future compatibility */ - - sdsfreesplitres(parts, count); - return task; - -err: - if (task) asmTaskFree(task); - sdsfreesplitres(parts, count); - return NULL; -} - -/* Notify replicas about ASM task information to maintain consistency during - * slot migration. This function sends a CLUSTER SYNCSLOTS CONF ASM-TASK command - * to all connected replicas with the serialized task information. 
*/ -void asmNotifyReplicasStateChange(struct asmTask *task) { - if (!server.cluster_enabled || !clusterNodeIsMaster(getMyClusterNode())) return; - - /* Create command arguments for CLUSTER SYNCSLOTS CONF ASM-TASK */ - robj *argv[5]; - argv[0] = createStringObject("CLUSTER", 7); - argv[1] = createStringObject("SYNCSLOTS", 9); - argv[2] = createStringObject("CONF", 4); - argv[3] = createStringObject("ASM-TASK", 8); - argv[4] = createObject(OBJ_STRING, asmTaskSerialize(task)); - - /* Send the command to all replicas */ - replicationFeedSlaves(server.slaves, -1, argv, 5); - - /* Clean up command objects */ - for (int i = 0; i < 5; i++) { - decrRefCount(argv[i]); - } -} - -/* Dump the active import ASM task information. */ -sds asmDumpActiveImportTask(void) { - if (!server.cluster_enabled) return NULL; - - /* For replica, dump the master active task. */ - if (clusterNodeIsSlave(getMyClusterNode()) && - asmManager->master_task && - asmManager->master_task->state != ASM_FAILED && - asmManager->master_task->state != ASM_COMPLETED) - { - return asmTaskSerialize(asmManager->master_task); - } - - /* For master, dump the first active task. */ - if (!asmManager || listLength(asmManager->tasks) == 0) return NULL; - asmTask *task = listNodeValue(listFirst(asmManager->tasks)); - if (task->state == ASM_NONE || task->state == ASM_FAILED || - task->state == ASM_COMPLETED) return NULL; - - return asmTaskSerialize(task); -} - -size_t asmGetPeakSyncBufferSize(void) { - if (!asmManager) return 0; - /* Compute peak sync buffer usage. The current task's peak may not - * reflect in asmManager->sync_buffer_peak immediately. */ - size_t peak = asmManager->sync_buffer_peak; - asmTask *task = listFirst(asmManager->tasks) ? 
- listNodeValue(listFirst(asmManager->tasks)) : NULL; - if (task && task->operation == ASM_IMPORT) - peak = max(task->sync_buffer.peak, asmManager->sync_buffer_peak); - - return peak; -} - -size_t asmGetImportInputBufferSize(void) { - if (!asmManager || listLength(asmManager->tasks) == 0) return 0; - - asmTask *task = listNodeValue(listFirst(asmManager->tasks)); - if (task->operation == ASM_IMPORT) - return task->sync_buffer.mem_used; - - return 0; -} - -size_t asmGetMigrateOutputBufferSize(void) { - if (!asmManager || listLength(asmManager->tasks) == 0) return 0; - - asmTask *task = listNodeValue(listFirst(asmManager->tasks)); - if (task->operation == ASM_MIGRATE && task->main_channel_client) - return getClientOutputBufferMemoryUsage(task->main_channel_client); - - return 0; -} - -/* Returns the ASM task with the given ID, or NULL if no such task exists. */ -static asmTask *asmLookupTaskAt(list *tasks, const char *id) { - listIter li; - listNode *ln; - - listRewind(tasks, &li); - while ((ln = listNext(&li)) != NULL) { - asmTask *task = listNodeValue(ln); - if (!strcmp(task->id, id)) return task; - } - return NULL; -} - -/* Returns the ASM task with the given ID, or NULL if no such task exists. */ -asmTask *asmLookupTaskById(const char *id) { - return asmLookupTaskAt(asmManager->tasks, id); -} - -/* Returns the ASM task that is identical to the given slot range array, or NULL - * if no such task exists. 
*/ -asmTask *asmLookupTaskBySlotRangeArray(slotRangeArray *slots) { - listIter li; - listNode *ln; - - listRewind(asmManager->tasks, &li); - while ((ln = listNext(&li)) != NULL) { - asmTask *task = listNodeValue(ln); - if (slotRangeArrayIsEqual(task->slots, slots)) - return task; - } - return NULL; -} - -/* Returns the slot range array for the given task ID */ -slotRangeArray *asmTaskGetSlotRanges(const char *task_id) { - asmTask *task = NULL; - if (!task_id || (task = asmLookupTaskById(task_id)) == NULL) return NULL; - - return task->slots; -} - -/* Returns 1 if the slot range array overlaps with the given slot range. */ -static int slotRangeArrayOverlaps(slotRangeArray *slots, slotRange *req) { - for (int i = 0; i < slots->num_ranges; i++) { - slotRange *sr = &slots->ranges[i]; - if (sr->start <= req->end && sr->end >= req->start) - return 1; - } - return 0; -} - -/* Returns 1 if the two slot range arrays overlap, 0 otherwise. */ -static int slotRangeArraysOverlap(slotRangeArray *slots1, slotRangeArray *slots2) { - for (int i = 0; i < slots1->num_ranges; i++) { - slotRange *sr1 = &slots1->ranges[i]; - if (slotRangeArrayOverlaps(slots2, sr1)) return 1; - } - return 0; -} - -/* Returns the ASM task that overlaps with the given slot range, or NULL if - * no such task exists. */ -static asmTask *lookupAsmTaskBySlotRange(slotRange *req) { - listIter li; - listNode *ln; - - listRewind(asmManager->tasks, &li); - while ((ln = listNext(&li)) != NULL) { - asmTask *task = listNodeValue(ln); - if (slotRangeArrayOverlaps(task->slots, req)) - return task; - } - return NULL; -} - -/* Validates the given slot ranges for a migration task: - * - Ensures the current node is a master. - * - Verifies all slots are in a STABLE state. - * - Confirms all slots belong to a single source node. - * - Confirms no ongoing import task that overlaps with the slot ranges. - * - * Returns the source node if validation succeeds. - * Otherwise, returns NULL and sets 'err' variable. 
*/ -static clusterNode *validateImportSlotRanges(slotRangeArray *slots, sds *err, asmTask *current) { - clusterNode *source = NULL; - - *err = NULL; - - /* Ensure this is a master node */ - if (!clusterNodeIsMaster(getMyClusterNode())) { - *err = sdsnew("slot migration not allowed on replica."); - goto out; - } - - /* Ensure no manual migration is in progress. */ - for (int i = 0; i < CLUSTER_SLOTS; i++) { - if (getImportingSlotSource(i) != NULL || - getMigratingSlotDest(i) != NULL) - { - *err = sdsnew("all slot states must be STABLE to start a slot migration task."); - goto out; - } - } - - for (int i = 0; i < slots->num_ranges; i++) { - slotRange *sr = &slots->ranges[i]; - - /* Ensure no import task overlaps with this slot range. - * Skip check current task that is running for this slot range. */ - asmTask *task = lookupAsmTaskBySlotRange(sr); - if (task && task != current && task->operation == ASM_IMPORT) { - *err = sdscatprintf(sdsempty(), - "overlapping import exists for slot range: %d-%d", - sr->start, sr->end); - goto out; - } - - /* Validate if we can start migration task for this slot range. */ - for (int j = sr->start; j <= sr->end; j++) { - clusterNode *node = getNodeBySlot(j); - if (node == NULL) { - *err = sdscatprintf(sdsempty(), "slot has no owner: %d", j); - goto out; - } - - if (!source) { - source = node; - } else if (source != node) { - *err = sdsnew("slots belong to different source nodes"); - goto out; - } - } - } - -out: - return *err ? NULL : source; -} - -/* Returns 1 if a task with the specified operation is in progress, 0 otherwise. */ -static int asmTaskInProgress(int operation) { - listIter li; - listNode *ln; - - if (!asmManager || listLength(asmManager->tasks) == 0) return 0; - - listRewind(asmManager->tasks, &li); - while ((ln = listNext(&li)) != NULL) { - asmTask *task = listNodeValue(ln); - if (task->operation == operation) return 1; - } - return 0; -} - -/* Returns 1 if a migrate task is in progress, 0 otherwise. 
*/ -int asmMigrateInProgress(void) { - return asmTaskInProgress(ASM_MIGRATE); -} - -/* Returns 1 if an import task is in progress, 0 otherwise. */ -int asmImportInProgress(void) { - return asmTaskInProgress(ASM_IMPORT); -} - -/* Returns 1 if the task is in a state where it can receive replication stream -* for the slot range, 0 otherwise. */ -inline static int asmCanFeedMigrationClient(asmTask *task) { - return task->operation == ASM_MIGRATE && - !task->cross_slot_during_propagating && - (task->state == ASM_SEND_BULK_AND_STREAM || - task->state == ASM_SEND_STREAM || - task->state == ASM_HANDOFF_PREP); -} - -/* Feed the migration client with the replication stream for the slot range. */ -void asmFeedMigrationClient(robj **argv, int argc) { - asmTask *task = NULL; - - if (server.cluster_enabled == 0 || listLength(asmManager->tasks) == 0) - return; - - /* Check if there is a migrate task that can receive replication stream. */ - task = listNodeValue(listFirst(asmManager->tasks)); - if (!asmCanFeedMigrationClient(task)) return; - - /* Ensure all arguments are converted to string encoding if necessary, - * since getSlotFromCommand expects them to be string-encoded. - * Generally the arguments are string-encoded, but we may rewrite - * the command arguments to integer encoding. */ - for (int i = 0; i < argc; i++) { - if (!sdsEncodedObject(argv[i])) { - serverAssert(argv[i]->encoding == OBJ_ENCODING_INT); - robj *old = argv[i]; - argv[i] = createStringObjectFromLongLongWithSds((long)old->ptr); - decrRefCount(old); - } - } - - /* Check if the command belongs to the slot range. */ - struct redisCommand *cmd = lookupCommand(argv, argc); - serverAssert(cmd); - - int slot = getSlotFromCommand(cmd, argv, argc); - - /* If the command does not have keys, skip it now. - * SELECT is not propagated, since we only support a single db in cluster mode. - * MULTI/EXEC is not needed, since transaction semantics are unnecessary - * before the slot handoff. 
- * FUNCTION subcommands should be executed on all nodes, so here we skip it, - * and even propagating them may cause an error when executing. - * - * NOTICE: if some keyless commands should be propagated to the destination, - * we should identify them here and send. */ - if (slot == INVALID_CLUSTER_SLOT) return; - - /* Generally we reject cross-slot commands before executing, but module may - * replicate this kind of command, so we check again. To guarantee data - * consistency, we cancel the task if we encounter a cross-slot command. */ - if (slot == CLUSTER_CROSSSLOT) { - /* We cannot cancel the task directly here, since it may lead to a recursive - * call: asmTaskCancel() --> moduleFireServerEvent() --> moduleFreeContext() - * --> postExecutionUnitOperations() --> propagateNow(). Even worse, this - * could result in propagating pending commands to the replication stream twice. - * To avoid this, we simply set a flag here, cancel the task in beforeSleep. */ - task->cross_slot_during_propagating = 1; - return; - } - - /* Check if the slot belongs to the task's slot range. */ - slotRange sr = {slot, slot}; - if (!slotRangeArrayOverlaps(task->slots, &sr)) return; - - if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, task->state))) - freeClientAsync(task->main_channel_client); - - /* Feed main channel with the command. */ - client *c = task->main_channel_client; - size_t prev_bytes = getNormalClientPendingReplyBytes(c); - - addReplyArrayLen(c, argc); - for (int i = 0; i < argc; i++) - addReplyBulk(c, argv[i]); - - /* Update the task's source offset to reflect the bytes sent. */ - task->source_offset += (getNormalClientPendingReplyBytes(c) - prev_bytes); -} - -asmTask *asmCreateImportTask(const char *task_id, slotRangeArray *slots, sds *err) { - clusterNode *source; - - *err = NULL; - /* Validate that the slot ranges are valid and that migration can be - * initiated for them. 
*/ - source = validateImportSlotRanges(slots, err, NULL); - if (!source) - goto err; - - if (source == getMyClusterNode()) { - *err = sdsnew("this node is already the owner of the slot range"); - goto err; - } - - /* Only support a single task at a time now. */ - if (listLength(asmManager->tasks) != 0) { - asmTask *current = listNodeValue(listFirst(asmManager->tasks)); - if (current->state == ASM_FAILED) { - /* We can create a new import task only if the current one is failed, - * cancel the failed task to create a new one. */ - asmTaskCancel(current, "new import requested"); - } else { - *err = sdsnew("another ASM task is already in progress"); - goto err; - } - } - /* There should be no task in progress. */ - serverAssert(listLength(asmManager->tasks) == 0); - - /* Create a slot migration task */ - asmTask *task = asmTaskCreate(task_id); - task->slots = slots; - task->state = ASM_NONE; - task->operation = ASM_IMPORT; - task->source_node = source; - memcpy(task->source, clusterNodeGetName(source), CLUSTER_NAMELEN); - memcpy(task->dest, getMyClusterId(), CLUSTER_NAMELEN); - - listAddNodeTail(asmManager->tasks, task); - sds slots_str = slotRangeArrayToString(slots); - serverLog(LL_NOTICE, "Import task %s created: src=%.40s, dest=%.40s, slots=%s", - task->id, task->source, task->dest, slots_str); - sdsfree(slots_str); - - return task; - -err: - slotRangeArrayFree(slots); - return NULL; -} - -/* CLUSTER MIGRATION IMPORT <start-slot end-slot [start-slot end-slot ...]> - * - * Sent by operator to the destination node to start the migration. 
*/ -static void clusterMigrationCommandImport(client *c) { - /* Validate slot range arg count */ - int remaining = c->argc - 3; - if (remaining == 0 || remaining % 2 != 0) { - addReplyErrorArity(c); - return; - } - - slotRangeArray *slots = parseSlotRangesOrReply(c, c->argc, 3); - if (!slots) return; - - sds err = NULL; - asmTask *task = asmCreateImportTask(NULL, slots, &err); - if (!task) { - addReplyErrorSds(c, err); - return; - } - - addReplyBulkCString(c, task->id); -} - -/* CLUSTER MIGRATION CANCEL [ID <task-id> | ALL] - * - Reply: Number of cancelled tasks - * - * Cancels import tasks that overlap with the specified slot ranges. - * Multiple tasks may be cancelled. */ -static void clusterMigrationCommandCancel(client *c) { - sds task_id = NULL; - int num_cancelled = 0; - - /* Validate slot range arg count */ - if (c->argc != 4 && c->argc != 5) { - addReplyErrorArity(c); - return; - } - - if (!strcasecmp(c->argv[3]->ptr, "id")) { - if (c->argc != 5) { - addReplyErrorArity(c); - return; - } - task_id = c->argv[4]->ptr; - } else if (!strcasecmp(c->argv[3]->ptr, "all")) { - if (c->argc != 4) { - addReplyErrorArity(c); - return; - } - } else { - addReplyError(c, "unknown argument"); - return; - } - - num_cancelled = clusterAsmCancel(task_id, "user request"); - addReplyLongLong(c, num_cancelled); -} - -/* Reply with the status of the task. */ -static void replyTaskStatus(client *c, asmTask *task) { - mstime_t p = 0; - - addReplyMapLen(c, 12); - addReplyBulkCString(c, "id"); - addReplyBulkCString(c, task->id); - addReplyBulkCString(c, "slots"); - addReplyBulkSds(c, slotRangeArrayToString(task->slots)); - addReplyBulkCString(c, "source"); - addReplyBulkCBuffer(c, task->source, CLUSTER_NAMELEN); - addReplyBulkCString(c, "dest"); - addReplyBulkCBuffer(c, task->dest, CLUSTER_NAMELEN); - addReplyBulkCString(c, "operation"); - addReplyBulkCString(c, task->operation == ASM_IMPORT ? 
"import" : "migrate"); - addReplyBulkCString(c, "state"); - addReplyBulkCString(c, asmTaskStateToString(task->state)); - addReplyBulkCString(c, "last_error"); - addReplyBulkCBuffer(c, task->error, sdslen(task->error)); - addReplyBulkCString(c, "retries"); - addReplyLongLong(c, task->retry_count); - addReplyBulkCString(c, "create_time"); - addReplyLongLong(c, task->create_time); - addReplyBulkCString(c, "start_time"); - addReplyLongLong(c, task->start_time); - addReplyBulkCString(c, "end_time"); - addReplyLongLong(c, task->end_time); - - if (task->operation == ASM_MIGRATE && task->state == ASM_COMPLETED) - p = task->end_time - task->paused_time; - addReplyBulkCString(c, "write_pause_ms"); - addReplyLongLong(c, p); -} - -/* CLUSTER MIGRATION STATUS [ID <task-id> | ALL] - * - Reply: Array of atomic slot migration tasks */ -static void clusterMigrationCommandStatus(client *c) { - listIter li; - listNode *ln; - - if (c->argc != 4 && c->argc != 5) { - addReplyErrorArity(c); - return; - } - - if (!strcasecmp(c->argv[3]->ptr, "id")) { - if (c->argc != 5) { - addReplyErrorArity(c); - return; - } - sds id = c->argv[4]->ptr; - asmTask *task = asmLookupTaskAt(asmManager->tasks, id); - if (!task) task = asmLookupTaskAt(asmManager->archived_tasks, id); - if (!task) { - addReplyArrayLen(c, 0); - return; - } - - addReplyArrayLen(c, 1); - replyTaskStatus(c, task); - } else if (!strcasecmp(c->argv[3]->ptr, "all")) { - if (c->argc != 4) { - addReplyErrorArity(c); - return; - } - addReplyArrayLen(c, listLength(asmManager->tasks) + - listLength(asmManager->archived_tasks)); - listRewind(asmManager->tasks, &li); - while ((ln = listNext(&li)) != NULL) - replyTaskStatus(c, listNodeValue(ln)); - - listRewind(asmManager->archived_tasks, &li); - while ((ln = listNext(&li)) != NULL) - replyTaskStatus(c, listNodeValue(ln)); - } else { - addReplyError(c, "unknown argument"); - return; - } -} - -/* CLUSTER MIGRATION - * <IMPORT <start-slot end-slot [start-slot end-slot ...]> | - * STATUS [ID 
<task-id> | ALL] | - * CANCEL [ID <task-id> | ALL]> -*/ -void clusterMigrationCommand(client *c) { - if (c->argc < 4) { - addReplyErrorArity(c); - return; - } - - if (strcasecmp(c->argv[2]->ptr, "import") == 0) { - clusterMigrationCommandImport(c); - } else if (strcasecmp(c->argv[2]->ptr, "status") == 0) { - clusterMigrationCommandStatus(c); - } else if (strcasecmp(c->argv[2]->ptr, "cancel") == 0) { - clusterMigrationCommandCancel(c); - } else { - addReplyError(c, "unknown argument"); - } -} - -/* Return the number of keys in the specified slot ranges. */ -unsigned long long asmCountKeysInSlots(slotRangeArray *slots) { - if (!slots) return 0; - - unsigned long long key_count = 0; - for (int i = 0; i < slots->num_ranges; i++) { - for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) { - key_count += kvstoreDictSize(server.db[0].keys, j); - } - } - return key_count; -} - -/* Log a human-readable message for ASM task lifecycle events. */ -void asmLogTaskEvent(asmTask *task, int event) { - sds str = slotRangeArrayToString(task->slots); - - switch (event) { - case ASM_EVENT_IMPORT_STARTED: - serverLog(LL_NOTICE, "Import task %s started for slots: %s", task->id, str); - break; - case ASM_EVENT_IMPORT_FAILED: - serverLog(LL_NOTICE, "Import task %s failed for slots: %s", task->id, str); - break; - case ASM_EVENT_TAKEOVER: - serverLog(LL_NOTICE, "Import task %s is ready to takeover slots: %s", task->id, str); - break; - case ASM_EVENT_IMPORT_COMPLETED: - serverLog(LL_NOTICE, "Import task %s completed for slots: %s (imported %llu keys)", - task->id, str, asmCountKeysInSlots(task->slots)); - break; - case ASM_EVENT_MIGRATE_STARTED: - serverLog(LL_NOTICE, "Migrate task %s started for slots: %s (keys at start: %llu)", - task->id, str, asmCountKeysInSlots(task->slots)); - break; - case ASM_EVENT_MIGRATE_FAILED: - serverLog(LL_NOTICE, "Migrate task %s failed for slots: %s", task->id, str); - break; - case ASM_EVENT_HANDOFF_PREP: - serverLog(LL_NOTICE, "Migrate task 
%s preparing to handoff for slots: %s", task->id, str); - break; - case ASM_EVENT_MIGRATE_COMPLETED: - serverLog(LL_NOTICE, "Migrate task %s completed for slots: %s (migrated %llu keys)", - task->id, str, asmCountKeysInSlots(task->slots)); - break; - default: - break; - } - - sdsfree(str); -} - -/* Notify the state change to the module and the cluster implementation. */ -void asmNotifyStateChange(asmTask *task, int event) { - RedisModuleClusterSlotMigrationInfo info = { - .version = REDISMODULE_CLUSTER_SLOT_MIGRATION_INFO_VERSION, - .task_id = task->id, - .slots = (RedisModuleSlotRangeArray *) task->slots - }; - memcpy(info.source_node_id, task->source, CLUSTER_NAMELEN); - memcpy(info.destination_node_id, task->dest, CLUSTER_NAMELEN); - - int module_event = -1; - if (event == ASM_EVENT_IMPORT_STARTED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_STARTED; - else if (event == ASM_EVENT_IMPORT_COMPLETED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_COMPLETED; - else if (event == ASM_EVENT_IMPORT_FAILED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_FAILED; - else if (event == ASM_EVENT_MIGRATE_STARTED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_STARTED; - else if (event == ASM_EVENT_MIGRATE_COMPLETED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_COMPLETED; - else if (event == ASM_EVENT_MIGRATE_FAILED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_FAILED; - serverAssert(module_event != -1); - - moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION, module_event, &info); - serverLog(LL_DEBUG, "Fire cluster asm module event, task %s: state=%s", - task->id, asmTaskStateToString(task->state)); - - if (clusterNodeIsMaster(getMyClusterNode())) { - /* Notify the cluster impl only if it is a real active import task. 
*/ - if (task != asmManager->master_task) { - asmLogTaskEvent(task, event); - clusterAsmOnEvent(task->id, event, task->slots); - } - asmNotifyReplicasStateChange(task); /* Propagate state change to replicas */ - } -} - -void asmImportSetFailed(asmTask *task) { - serverAssert(task->operation == ASM_IMPORT); - if (task->state == ASM_FAILED) return; - - /* If we are in the RDB channel transfer state, we need to - * close the client that was created for the RDB channel. */ - if (task->rdb_channel_conn && task->rdb_channel_state == ASM_RDBCHANNEL_TRANSFER) { - client *c = connGetPrivateData(task->rdb_channel_conn); - serverAssert(c->task == task); - task->rdb_channel_conn = NULL; - c->task = NULL; - c->flags &= ~CLIENT_MASTER; - freeClientAsync(c); - } - - /* If in the wait stream EOF or streaming buffer state, we need to close the - * client that was created for the main channel. */ - if (task->main_channel_conn && - (task->state == ASM_STREAMING_BUF || task->state == ASM_WAIT_STREAM_EOF)) - { - client *c = connGetPrivateData(task->main_channel_conn); - serverAssert(c->task == task); - task->main_channel_conn = NULL; - c->task = NULL; - c->flags &= ~CLIENT_MASTER; - freeClientAsync(c); - } - - /* Close the connections */ - if (task->rdb_channel_conn) connClose(task->rdb_channel_conn); - if (task->main_channel_conn) connClose(task->main_channel_conn); - task->rdb_channel_conn = NULL; - task->main_channel_conn = NULL; - - /* Clear the replication data buffer */ - asmManager->sync_buffer_peak = max(asmManager->sync_buffer_peak, task->sync_buffer.peak); - replDataBufClear(&task->sync_buffer); - - /* Mark the task as failed and notify the cluster */ - task->state = ASM_FAILED; - asmNotifyStateChange(task, ASM_EVENT_IMPORT_FAILED); - /* This node may become replica, only master can setup new slot trimming jobs. 
*/ - if (clusterNodeIsMaster(getMyClusterNode())) - asmTrimJobSchedule(task->slots); -} - -void asmMigrateSetFailed(asmTask *task) { - serverAssert(task->operation == ASM_MIGRATE); - if (task->state == ASM_FAILED) return; - - /* Close the RDB and main channel clients*/ - if (task->rdb_channel_client) { - task->rdb_channel_client->task = NULL; - freeClientAsync(task->rdb_channel_client); - task->rdb_channel_client = NULL; - } - if (task->main_channel_client) { - task->main_channel_client->task = NULL; - freeClientAsync(task->main_channel_client); - task->main_channel_client = NULL; - } - - /* Actually it is not necessary to clear the sync buffer here, - * to make asmTaskReset work properly after migrate task failed */ - replDataBufClear(&task->sync_buffer); - - /* Mark the task as failed and notify the cluster */ - task->state = ASM_FAILED; - asmNotifyStateChange(task, ASM_EVENT_MIGRATE_FAILED); -} - -void asmTaskSetFailed(asmTask *task, const char *fmt, ...) { - va_list ap; - sds error = sdsempty(); - - /* Set the error message */ - va_start(ap, fmt); - error = sdscatvprintf(error, fmt, ap); - va_end(ap); - error = sdscatprintf(error, " (state: %s, rdb_channel_state: %s)", - asmTaskStateToString(task->state), - asmTaskStateToString(task->rdb_channel_state)); - sdsfree(task->error); - task->error = error; - - /* Log the error */ - sds slots_str = slotRangeArrayToString(task->slots); - serverLog(LL_WARNING, "%s task %s failed: slots=%s, err=%s", - task->operation == ASM_IMPORT ? "Import" : "Migrate", - task->id, slots_str, task->error); - sdsfree(slots_str); - - if (task->operation == ASM_IMPORT) - asmImportSetFailed(task); - else - asmMigrateSetFailed(task); -} - -/* The task is completed or canceled. Update stats and move it to - * the archived list. 
*/ -void asmTaskFinalize(asmTask *task) { - listNode *ln = listFirst(asmManager->tasks); - serverAssert(ln->value == task); - - task->source_node = NULL; /* Should never access it */ - task->end_time = server.mstime; - - if (task->operation == ASM_IMPORT) { - asmManager->sync_buffer_peak = max(asmManager->sync_buffer_peak, - task->sync_buffer.peak); - replDataBufClear(&task->sync_buffer); /* Not used, so save memory */ - } - - /* Move the task to the archived list */ - listUnlinkNode(asmManager->tasks, ln); - listLinkNodeHead(asmManager->archived_tasks, ln); -} - -static void asmTaskCancel(asmTask *task, const char *reason) { - if (task->state == ASM_CANCELED) return; - - asmTaskSetFailed(task, "Cancelled due to %s", reason); - task->state = ASM_CANCELED; - asmTaskFinalize(task); -} - -void asmImportTakeover(asmTask *task) { - serverAssert(task->state == ASM_WAIT_STREAM_EOF || - task->state == ASM_STREAMING_BUF); - - /* Free the main channel connection since it is no longer needed. */ - serverAssert(task->main_channel_conn != NULL); - client *c = connGetPrivateData(task->main_channel_conn); - c->task = NULL; - c->flags &= ~CLIENT_MASTER; - freeClientAsync(c); - task->main_channel_conn = NULL; - - task->state = ASM_TAKEOVER; - asmLogTaskEvent(task, ASM_EVENT_TAKEOVER); - clusterAsmOnEvent(task->id, ASM_EVENT_TAKEOVER, task->slots); -} - -void asmCallbackOnFreeClient(client *c) { - asmTask *task = c->task; - if (!task) return; - - /* If the RDB channel connection is closed, mark the task as failed. 
*/ - if (c->conn && task->rdb_channel_conn == c->conn) { - /* We create the client only when transferring data on the RDB channel */ - serverAssert(task->rdb_channel_state == ASM_RDBCHANNEL_TRANSFER); - task->rdb_channel_conn = NULL; /* Will be freed by freeClient */ - c->flags &= ~CLIENT_MASTER; - asmTaskSetFailed(task, "RDB channel - Connection is closed"); - return; - } - - if (c->conn && task->main_channel_conn == c->conn) { - /* After or in the process of streaming buffer to DB, a client will be - * created based on the main channel connection. */ - serverAssert(task->state == ASM_STREAMING_BUF || - task->state == ASM_WAIT_STREAM_EOF); - task->main_channel_conn = NULL; /* Will be freed by freeClient */ - c->flags &= ~CLIENT_MASTER; - asmTaskSetFailed(task, "Main channel - Connection is closed"); - return; - } - - if (c == task->rdb_channel_client) { - /* TODO: Detect whether the bgsave is completed successfully and - * update the state properly. */ - task->rdb_channel_state = ASM_COMPLETED; - /* We may not have detected whether the child process has exited yet, - * so we can't determine whether the client has completed the slots - * snapshot transfer. If the RDB channel is interrupted unexpectedly, - * the destination side will also close the main channel. - * So here we just reset the RDB channel client of task. */ - task->rdb_channel_client = NULL; - return; - } - - /* If the main channel client is closed, we need to mark the task as failed - * and clean up the RDB channel client if it exists. */ - if (c == task->main_channel_client) { - task->main_channel_client = NULL; - /* The rdb channel client will be cleaned up */ - asmTaskSetFailed(task, "Main and RDB channel clients are disconnected."); - return; - } -} - -/* Sends an AUTH command to the source node using the internal secret. - * Returns an error string if the command fails, or NULL on success. 
*/ -char *asmSendInternalAuth(connection *conn) { - size_t len = 0; - const char *internal_secret = clusterGetSecret(&len); - serverAssert(internal_secret != NULL); - - sds secret = sdsnewlen(internal_secret, len); - char *err = sendCommand(conn, "AUTH", "internal connection", secret, NULL); - sdsfree(secret); - return err; -} - -/* Handles the RDB channel sync with the source node. - * This function is called when the RDB channel is established - * and ready to sync with the source node. */ -void asmRdbChannelSyncWithSource(connection *conn) { - asmTask *task = connGetPrivateData(conn); - char *err = NULL; - sds task_error_msg = NULL; - - /* Check for errors in the socket: after a non blocking connect() we - * may find that the socket is in error state. */ - if (connGetState(conn) != CONN_STATE_CONNECTED) - goto error; - - /* Check if the task is in a fail point state */ - if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_RDB_CHANNEL, task->rdb_channel_state))) { - char buf[1]; - /* Simulate a failure by shutting down the connection. On some operating - * systems (e.g. Linux), the socket's receive buffer is not flushed - * immediately, so we issue a dummy read to drain any pending data and - * surface the error condition. - * using shutdown() instead of connShutdown() because connTLSShutdown() - * will free the connection directly, which is not what we want. 
*/ - shutdown(conn->fd, SHUT_RDWR); - connRead(conn, buf, 1); - } - - if (task->rdb_channel_state == ASM_CONNECTING) { - connSetReadHandler(conn, asmRdbChannelSyncWithSource); - connSetWriteHandler(conn, NULL); - - /* Send AUTH command to source node using internal auth */ - err = asmSendInternalAuth(conn); - if (err) goto write_error; - task->rdb_channel_state = ASM_AUTH_REPLY; - return; - } - - if (task->rdb_channel_state == ASM_AUTH_REPLY) { - err = receiveSynchronousResponse(conn); - /* The source node did not reply */ - if (err == NULL) goto no_response_error; - - /* Check `+OK` reply */ - if (!strcmp(err, "+OK")) { - sdsfree(err); - err = NULL; - task->rdb_channel_state = ASM_RDBCHANNEL_REQUEST; - serverLog(LL_NOTICE, "Source node replied to AUTH command, syncslots rdb channel operation can continue..."); - } else { - task_error_msg = sdscatprintf(sdsempty(), - "Error reply to AUTH from source: %s", err); - sdsfree(err); - goto error; - } - } - - if (task->rdb_channel_state == ASM_RDBCHANNEL_REQUEST) { - err = sendCommand(conn, "CLUSTER", "SYNCSLOTS", "RDBCHANNEL", task->id, NULL); - if (err) goto write_error; - task->rdb_channel_state = ASM_RDBCHANNEL_REPLY; - return; - } - - if (task->rdb_channel_state == ASM_RDBCHANNEL_REPLY) { - err = receiveSynchronousResponse(conn); - /* The source node did not reply */ - if (err == NULL) goto no_response_error; - - /* Ignore ‘\n' sent from the source node to keep the connection alive. */ - if (sdslen(err) == 0) { - serverLog(LL_DEBUG, "Received an empty line in RDBCHANNEL reply, slots snapshot delivery will start later"); - sdsfree(err); - return; - } - - /* Check `+SLOTSSNAPSHOT` reply */ - if (!strncmp(err, "+SLOTSSNAPSHOT", strlen("+SLOTSSNAPSHOT"))) { - sdsfree(err); - err = NULL; - task->state = ASM_ACCUMULATE_BUF; - /* The main channel buffers pending commands. 
*/ - connSetReadHandler(task->main_channel_conn, asmSyncBufferReadFromConn); - - task->rdb_channel_state = ASM_RDBCHANNEL_TRANSFER; - client *c = createClient(conn); - c->flags |= (CLIENT_MASTER | CLIENT_INTERNAL | CLIENT_ASM_IMPORTING); - c->querybuf = sdsempty(); - c->authenticated = 1; - c->user = NULL; - c->task = task; - serverLog(LL_NOTICE, - "Source node replied to SLOTSSNAPSHOT, syncing slots snapshot can continue..."); - } else { - task_error_msg = sdscatprintf(sdsempty(), - "Error reply to CLUSTER SYNCSLOTS RDBCHANNEL from the source: %s", err); - sdsfree(err); - goto error; - } - return; - } - return; - -no_response_error: - task_error_msg = sdsnew("Source node did not respond to command during RDBCHANNELSYNCSLOTS handshake"); - /* Fall through to regular error handling */ - -error: - asmTaskSetFailed(task, "RDB channel - Failed to sync with the source node: %s", - task_error_msg ? task_error_msg : connGetLastError(conn)); - sdsfree(task_error_msg); - return; - -write_error: /* Handle sendCommand() errors. 
*/ - task_error_msg = sdscatprintf(sdsempty(), "Failed to send command to the source node: %s", err); - sdsfree(err); - goto error; -} - -char *asmSendSlotRangesSync(connection *conn, asmTask *task) { - /* Prepare CLUSTER SYNCSLOTS SYNC command */ - serverAssert(task->slots->num_ranges <= CLUSTER_SLOTS); - int argc = task->slots->num_ranges * 2 + 4; - char **args = zcalloc(sizeof(char*) * argc); - size_t *lens = zcalloc(sizeof(size_t) * argc); - - args[0] = "CLUSTER"; - args[1] = "SYNCSLOTS"; - args[2] = "SYNC"; - args[3] = task->id; - lens[0] = strlen("CLUSTER"); - lens[1] = strlen("SYNCSLOTS"); - lens[2] = strlen("SYNC"); - lens[3] = sdslen(task->id); - - int i = 4; - for (int j = 0; j < task->slots->num_ranges; j++) { - slotRange *sr = &task->slots->ranges[j]; - args[i] = sdscatprintf(sdsempty(), "%d", sr->start); - lens[i] = sdslen(args[i]); - args[i+1] = sdscatprintf(sdsempty(), "%d", sr->end); - lens[i+1] = sdslen(args[i+1]); - i += 2; - } - serverAssert(i == argc); - - /* Send command to source node */ - char *err = sendCommandArgv(conn, argc, args, lens); - - /* Free allocated memory */ - for (int j = 4; j < argc; j++) { - sdsfree(args[j]); - } - zfree(args); - zfree(lens); - - return err; -} - -void asmSyncWithSource(connection *conn) { - asmTask *task = connGetPrivateData(conn); - char *err = NULL; - - /* Some task errors are not network issues, we record them explicitly. */ - sds task_error_msg = NULL; - - /* Check for errors in the socket: after a non blocking connect() we - * may find that the socket is in error state. 
*/ - if (connGetState(conn) != CONN_STATE_CONNECTED) - goto error; - - /* Check if the fail point is active for this channel and state */ - if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_MAIN_CHANNEL, task->state))) { - char buf[1]; - shutdown(conn->fd, SHUT_RDWR); - connRead(conn, buf, 1); - } - - if (task->state == ASM_CONNECTING) { - connSetReadHandler(conn, asmSyncWithSource); - connSetWriteHandler(conn, NULL); - /* Send AUTH command to source node using internal auth */ - err = asmSendInternalAuth(conn); - if (err) goto write_error; - task->state = ASM_AUTH_REPLY; - return; - } - - if (task->state == ASM_AUTH_REPLY) { - err = receiveSynchronousResponse(conn); - /* The source node did not reply */ - if (err == NULL) goto no_response_error; - - /* Check `+OK` reply */ - if (!strcmp(err, "+OK")) { - sdsfree(err); - err = NULL; - task->state = ASM_SEND_HANDSHAKE; - serverLog(LL_NOTICE, "Source node replied to AUTH command, syncslots can continue..."); - } else { - task_error_msg = sdscatprintf(sdsempty(), - "Error reply to AUTH from the source: %s", err); - sdsfree(err); - goto error; - } - } - - if (task->state == ASM_SEND_HANDSHAKE) { - sds node_id = sdsnewlen(clusterNodeGetName(getMyClusterNode()), CLUSTER_NAMELEN); - err = sendCommand(conn, "CLUSTER", "SYNCSLOTS", "CONF", "NODE-ID", node_id, NULL); - sdsfree(node_id); - if (err) goto write_error; - task->state = ASM_HANDSHAKE_REPLY; - return; - } - - if (task->state == ASM_HANDSHAKE_REPLY) { - err = receiveSynchronousResponse(conn); - /* The source node did not reply */ - if (err == NULL) goto no_response_error; - - /* Check `+OK` reply */ - if (!strcmp(err, "+OK")) { - sdsfree(err); - err = NULL; - task->state = ASM_SEND_SYNCSLOTS; - serverLog(LL_NOTICE, "Source node replied to SYNCSLOTS CONF command, syncslots can continue..."); - } else { - task_error_msg = sdscatprintf(sdsempty(), - "Error reply to CLUSTER SYNCSLOTS CONF from the source: %s", err); - sdsfree(err); - goto error; - } - } - - if 
(task->state == ASM_SEND_SYNCSLOTS) { - err = asmSendSlotRangesSync(conn, task); - if (err) goto write_error; - - task->state = ASM_SYNCSLOTS_REPLY; - return; - } - - if (task->state == ASM_SYNCSLOTS_REPLY) { - err = receiveSynchronousResponse(conn); - /* The source node did not reply */ - if (err == NULL) goto no_response_error; - - /* Check `+RDBCHANNELSYNCSLOTS` reply */ - if (!strncmp(err, "+RDBCHANNELSYNCSLOTS", strlen("+RDBCHANNELSYNCSLOTS"))) { - sdsfree(err); - err = NULL; - task->state = ASM_INIT_RDBCHANNEL; - serverLog(LL_NOTICE, - "Source node replied to SYNCSLOTS SYNC, syncslots can continue..."); - } else if (!strncmp(err, "-NOTREADY", strlen("-NOTREADY"))) { - /* The source-side cluster is temporarily not ready to start a - * migration and replied -NOTREADY. We could fail this attempt and - * let the import task start another attempt later but that could - * trigger unnecessary cleanup in the cluster implementation. - * Instead, we'll retry sending SYNCSLOTS later in asmCron(). */ - sdsfree(err); - task->state = ASM_SEND_SYNCSLOTS; - serverLog(LL_NOTICE, - "Source node replied to SYNCSLOTS SYNC with -NOTREADY, will retry later..."); - return; - } else { - task_error_msg = sdscatprintf(sdsempty(), - "Error reply to CLUSTER SYNCSLOTS SYNC from the source: %s", err); - sdsfree(err); - goto error; - } - } - - if (task->state == ASM_INIT_RDBCHANNEL) { - /* Create RDB channel connection */ - char *ip = clusterNodeIp(task->source_node); - int port = server.tls_replication ? 
clusterNodeTlsPort(task->source_node) : - clusterNodeTcpPort(task->source_node); - task->rdb_channel_conn = connCreate(server.el, connTypeOfReplication()); - if (connConnect(task->rdb_channel_conn, ip, port, - server.bind_source_addr, asmRdbChannelSyncWithSource) == C_ERR) - { - serverLog(LL_WARNING, "Unable to connect to the source node: %s", - connGetLastError(task->rdb_channel_conn)); - goto error; - } - task->rdb_channel_state = ASM_CONNECTING; - connSetPrivateData(task->rdb_channel_conn, task); - serverLog(LL_NOTICE, - "RDB channel connection to source node %.40s established, waiting for AUTH reply...", - task->source); - - /* Main channel waits for the new event */ - connSetReadHandler(conn, NULL); - return; - } - return; - -no_response_error: - serverLog(LL_WARNING, "Source node did not respond to command during SYNCSLOTS handshake"); - /* Fall through to regular error handling */ - -error: - asmTaskSetFailed(task, "Main channel - Failed to sync with source node: %s", - task_error_msg ? task_error_msg : connGetLastError(conn)); - sdsfree(task_error_msg); - return; - -write_error: /* Handle sendCommand() errors. */ - serverLog(LL_WARNING, "Failed to send command to source node: %s", err); - sdsfree(err); - goto error; -} - -int asmImportSendACK(asmTask *task) { - serverAssert(task->operation == ASM_IMPORT && task->state == ASM_WAIT_STREAM_EOF); - serverLog(LL_DEBUG, "Destination node applied offset is %lld", task->dest_offset); - - char offset[64]; - ull2string(offset, sizeof(offset), task->dest_offset); - - char *err = sendCommand(task->main_channel_conn, "CLUSTER", "SYNCSLOTS", "ACK", - asmTaskStateToString(task->state), offset, NULL); - if (err) { - asmTaskSetFailed(task, "Main channel - Failed to send ACK: %s", err); - sdsfree(err); - return C_ERR; - } - return C_OK; -} - -/* Called when the RDB channel begins sending the snapshot. - * From this point on, the main channel also starts sending incremental streams. 
*/ -void asmSlotSnapshotAndStreamStart(struct asmTask *task) { - if (task == NULL || task->state != ASM_WAIT_BGSAVE_START) return; - - if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_RDB_CHANNEL, task->state))) { - shutdown(task->rdb_channel_client->conn->fd, SHUT_RDWR); - return; - } - task->main_channel_client->replstate = SLAVE_STATE_SEND_BULK_AND_STREAM; - - task->state = ASM_SEND_BULK_AND_STREAM; - task->rdb_channel_state = ASM_RDBCHANNEL_TRANSFER; - - /* From the source node's perspective, the destination node begins to accumulate - * the buffer while the RDB channel starts applying the slot snapshot data. */ - task->dest_state = ASM_ACCUMULATE_BUF; - task->dest_slots_snapshot_time = server.mstime; -} - -/* Called when the RDB channel has succeeded in sending the snapshot. */ -void asmSlotSnapshotSucceed(struct asmTask *task) { - if (task == NULL || task->state != ASM_SEND_BULK_AND_STREAM) return; - - /* The destination starts sending ACKs to keep the main channel alive after - * receiving the snapshot, so here we need to update the last interaction - * time to avoid false timeout. */ - task->main_channel_client->lastinteraction = server.unixtime; - - task->state = ASM_SEND_STREAM; - task->rdb_channel_state = ASM_COMPLETED; -} - -/* Called when the RDB channel fails to send the snapshot. */ -void asmSlotSnapshotFailed(struct asmTask *task) { - if (task == NULL || task->state != ASM_SEND_BULK_AND_STREAM) return; - - asmTaskSetFailed(task, "RDB channel - Failed to send slots snapshot"); -} - -/* CLUSTER SYNCSLOTS SNAPSHOT-EOF - * - * This command is sent by the source node to the destination node to indicate - * that the slots snapshot has ended. */ -void clusterSyncSlotsSnapshotEOF(client *c) { - /* This client is RDB channel connection. 
*/ - asmTask *task = c->task; - if (!task || task->rdb_channel_state != ASM_RDBCHANNEL_TRANSFER || - c->conn != task->rdb_channel_conn) - { - /* Unexpected SNAPSHOT-EOF command */ - serverLog(LL_WARNING, "Unexpected CLUSTER SYNCSLOTS SNAPSHOT-EOF command: " - "rdb_channel_state=%s", - asmTaskStateToString(task ? task->rdb_channel_state : ASM_NONE)); - freeClientAsync(c); - return; - } - - /* RDB channel state: ASM_RDBCHANNEL_TRANSFER */ - if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_RDB_CHANNEL, task->rdb_channel_state))) { - freeClientAsync(c); /* Simulate a failure */ - return; - } - - /* Clear the RDB channel connection */ - task->rdb_channel_conn = NULL; - task->rdb_channel_state = ASM_COMPLETED; - serverLog(LL_NOTICE, "RDB channel snapshot transfer completed for the import task."); - - /* Free the RDB channel connection. */ - c->task = NULL; - c->flags &= ~CLIENT_MASTER; - freeClientAsync(c); - - /* Will start streaming the buffer to DB, don't start here since now - * we are in the context of executing command, otherwise, redis will - * generate a big MULTI-EXEC including all the commands in the buffer. - * just update the state here, and do it in beforeSleep(). */ - task->state = ASM_READY_TO_STREAM; - connSetReadHandler(task->main_channel_conn, NULL); -} - -/* CLUSTER SYNCSLOTS STREAM-EOF - * - * This command is sent by the source node to the destination node to indicate - * that the slot sync stream has ended and the slots can be handed off. */ -void clusterSyncSlotsStreamEOF(client *c) { - asmTask *task = c->task; - - if (!task || task->operation != ASM_IMPORT) { - serverLog(LL_WARNING, "Unexpected CLUSTER SYNCSLOTS STREAM-EOF command"); - freeClientAsync(c); - return; - } - - if (task->state == ASM_STREAMING_BUF) { - /* We are still streaming the buffer to DB, mark the EOF received, and we - * can take over after streaming is EOF. Since we may release the context - * in asmImportTakeover, this breaks the context for streaming buffer. 
*/ - task->stream_eof_during_streaming = 1; - serverLog(LL_NOTICE, "CLUSTER SYNCSLOTS STREAM-EOF received during streaming buffer"); - return; - } - - if (task->state != ASM_WAIT_STREAM_EOF) { - serverLog(LL_WARNING, "Unexpected CLUSTER SYNCSLOTS STREAM-EOF state: %s", - asmTaskStateToString(task->state)); - freeClientAsync(c); - return; - } - serverLog(LL_NOTICE, "CLUSTER SYNCSLOTS STREAM-EOF received when waiting for STREAM-EOF"); - - /* STREAM-EOF received, the source is ready to handoff, takeover now. */ - asmImportTakeover(task); -} - -/* Start the import task. */ -static void asmStartImportTask(asmTask *task) { - if (task->operation != ASM_IMPORT || task->state != ASM_NONE) return; - sds slots_str = slotRangeArrayToString(task->slots); - - /* Sanity check: Clean up any keys that exist in slots not owned by this node. - * This handles cases where users previously migrated slots using legacy method - * but left behind orphaned keys, or maybe cluster missed cleaning up during - * previous operations, which could interfere with the ASM import process. */ - asmTrimSlotsIfNotOwned(task->slots); - - /* Check if there is any trim job in progress for the slot ranges. - * We can't start the import task since the trim job will modify the data.*/ - int trim_in_progress = asmIsAnyTrimJobOverlaps(task->slots); - - /* Notify the cluster implementation to prepare for the import task. */ - int impl_ret = clusterAsmOnEvent(task->id, ASM_EVENT_IMPORT_PREP, task->slots); - - /* We do not start the import task if trim is disabled by module. */ - int disabled_by_module = server.cluster_module_trim_disablers > 0; - - static int start_blocked_logged = 0; - /* Cannot start import task since pause action is performed. Otherwise, we - * will break the promise that no writes are performed during the pause. 
*/ - if (isPausedActions(PAUSE_ACTION_CLIENT_ALL) || - isPausedActions(PAUSE_ACTION_CLIENT_WRITE) || - trim_in_progress || - impl_ret != C_OK || - disabled_by_module) - { - const char *reason = disabled_by_module ? "trim is disabled by module" : - impl_ret != C_OK ? "cluster is not ready" : - trim_in_progress ? "trim in progress for some of the slots" : - "server paused"; - if (start_blocked_logged == 0) { - serverLog(LL_WARNING, "Can not start import task %s for slots: %s due to %s", - task->id, slots_str, reason); - start_blocked_logged = 1; - } - sdsfree(slots_str); - return; - } - start_blocked_logged = 0; /* Reset the log flag */ - - /* Detect if the cluster topology is changed. We should cancel the task if - * we can not schedule it, and update the source node if needed. */ - sds err = NULL; - clusterNode *source = validateImportSlotRanges(task->slots, &err, task); - if (!source) { - asmTaskCancel(task, err); - sdsfree(slots_str); - sdsfree(err); - return; - } - /* Now I'm the owner of the slot range, cancel the import task. */ - if (source == getMyClusterNode()) { - asmTaskCancel(task, "slots owned by myself now"); - sdsfree(slots_str); - return; - } - /* Change the source node if needed. */ - if (source != task->source_node) { - task->source_node = source; - memcpy(task->source, clusterNodeGetName(source), CLUSTER_NAMELEN); - serverLog(LL_NOTICE, "Import task %s source node changed: slots=%s, " - "new_source=%.40s", task->id, slots_str, clusterNodeGetName(source)); - } - sdsfree(slots_str); - - task->state = ASM_CONNECTING; - task->start_time = server.mstime; - asmNotifyStateChange(task, ASM_EVENT_IMPORT_STARTED); - - task->main_channel_conn = connCreate(server.el, connTypeOfReplication()); - char *ip = clusterNodeIp(task->source_node); - int port = server.tls_replication ? 
clusterNodeTlsPort(task->source_node) : - clusterNodeTcpPort(task->source_node); - if (connConnect(task->main_channel_conn, ip, port, server.bind_source_addr, - asmSyncWithSource) == C_ERR) - { - asmTaskSetFailed(task, "Main channel - Failed to connect to source node: %s", - connGetLastError(task->main_channel_conn)); - return; - } - connSetPrivateData(task->main_channel_conn, task); -} - -void clusterSyncSlotsCommand(client *c) { - /* Only internal clients are allowed to execute this command to avoid - * potential attack, since some state changes are not well protected, - * external clients may damage the slot migration state. */ - if (!(c->flags & (CLIENT_INTERNAL | CLIENT_MASTER))) { - addReplyError(c, "CLUSTER SYNCSLOTS subcommands are only allowed for internal clients"); - c->flags |= CLIENT_CLOSE_AFTER_REPLY; - return; - } - - /* On replica, only allow master client to execute CONF subcommand. */ - if (!clusterNodeIsMaster(getMyClusterNode())) { - if (!(c->flags & CLIENT_MASTER)) { - /* Not master client, reject all subcommands and close the connection. */ - addReplyError(c, "CLUSTER SYNCSLOTS subcommands are only allowed for master"); - c->flags |= CLIENT_CLOSE_AFTER_REPLY; - return; - } else { - /* Only allow CONF subcommand on replica. */ - if (strcasecmp(c->argv[2]->ptr, "conf")) return; - } - } - - if (!strcasecmp(c->argv[2]->ptr, "sync") && c->argc >= 6) { - /* CLUSTER SYNCSLOTS SYNC <ID> <start-slot> <end-slot> [<start-slot> <end-slot>] */ - if (c->argc % 2 == 1) { - addReplyErrorArity(c); - return; - } - - slotRangeArray *slots = parseSlotRangesOrReply(c, c->argc, 4); - if (!slots) return; - - /* Validate that the slot ranges are valid and that migration can be - * initiated for them. */ - sds err = NULL; - clusterNode *source = validateImportSlotRanges(slots, &err, NULL); - if (!source) { - addReplyErrorSds(c, err); - slotRangeArrayFree(slots); - return; - } - - /* Check if the source node is the same as the current node. 
*/ - if (source != getMyClusterNode()) { - addReplyError(c, "This node is not the owner of the slots"); - slotRangeArrayFree(slots); - return; - } - - /* Verify the destination node is known and is a master. */ - if (c->node_id) { - clusterNode *dest = clusterLookupNode(c->node_id, CLUSTER_NAMELEN); - if (dest == NULL || !clusterNodeIsMaster(dest)) { - addReplyErrorFormat(c, "Destination node %.40s is not a master", c->node_id); - slotRangeArrayFree(slots); - return; - } - } - - sds task_id = c->argv[3]->ptr; - /* Notify the cluster implementation to prepare for the migrate task. */ - if (clusterAsmOnEvent(task_id, ASM_EVENT_MIGRATE_PREP, slots) != C_OK || - asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, ASM_NONE)) - { - addReplyError(c, "-NOTREADY Cluster is not ready to migrate slots"); - slotRangeArrayFree(slots); - return; - } - - /* We do not start the migrate task if trim is disabled by module. */ - int disabled_by_module = server.cluster_module_trim_disablers > 0; - if (disabled_by_module) { - addReplyError(c, "Trim is disabled by module"); - slotRangeArrayFree(slots); - return; - } - - /* Only the first task in the list is considered. A failed migrate task - * with the same id, slots and destination is retried in place. - * c->node_id is NULL when the client never sent CONF node-id, so it is - * checked before memcmp() reads it; without a node id the task is not - * treated as a match and falls through to the branch below. */ - asmTask *task = listLength(asmManager->tasks) == 0 ? NULL : - listNodeValue(listFirst(asmManager->tasks)); - if (task && !strcmp(task->id, task_id) && - task->operation == ASM_MIGRATE && task->state == ASM_FAILED && - slotRangeArrayIsEqual(slots, task->slots) && - c->node_id && memcmp(task->dest, c->node_id, CLUSTER_NAMELEN) == 0) - { - /* Reuse the failed task */ - asmTaskReset(task); - slotRangeArrayFree(task->slots); /* Will be set again later */ - task->retry_count++; - } else if (task) { - if (task->state == ASM_FAILED) { - /* We can create a new migrate task only if the current one is - * failed, cancel the failed task to create a new one. 
*/ - asmTaskCancel(task, "new migration requested"); - task = NULL; - } else { - addReplyError(c, "Another ASM task is already in progress"); - slotRangeArrayFree(slots); - return; - } - } - - /* Create the migrate slots task and add it to the list, - * otherwise reuse the existing one. The assert checks that the task - * list is empty before the new task is appended. */ - if (task == NULL) { - task = asmTaskCreate(task_id); - task->start_time = server.mstime; /* Start immediately */ - serverAssert(listLength(asmManager->tasks) == 0); - listAddNodeTail(asmManager->tasks, task); - } - /* The task keeps the parsed slot ranges from here on (they are not - * freed on this path). The dest name is only filled in when the client - * has a node id set. */ - task->slots = slots; - task->operation = ASM_MIGRATE; - memcpy(task->source, clusterNodeGetName(getMyClusterNode()), CLUSTER_NAMELEN); - if (c->node_id) memcpy(task->dest, c->node_id, CLUSTER_NAMELEN); - - task->main_channel_client = c; - c->task = task; - - /* We mark the main channel client as a replica, so this client is limited - * by the client output buffer settings for replicas. The replstate has - * no real significance, just to prevent it from going online. */ - c->flags |= (CLIENT_SLAVE | CLIENT_ASM_MIGRATING); - c->replstate = SLAVE_STATE_WAIT_RDB_CHANNEL; - if (server.repl_disable_tcp_nodelay) - connDisableTcpNoDelay(c->conn); /* Non-critical if it fails. */ - listAddNodeTail(server.slaves, c); - createReplicationBacklogIfNeeded(); - - /* Wait for the RDB channel client (CLUSTER SYNCSLOTS RDBCHANNEL) to attach */ - task->state = ASM_WAIT_RDBCHANNEL; - - sds slots_str = slotRangeArrayToString(slots); - serverLog(LL_NOTICE, "Migrate task %s created: src=%.40s, dest=%.40s, slots=%s", - task->id, task->source, task->dest, slots_str); - sdsfree(slots_str); - - asmNotifyStateChange(task, ASM_EVENT_MIGRATE_STARTED); - - /* Keep the client in the main thread to avoid data races between the - * connWrite call below and the client's event handler in IO threads. */ - if (c->tid != IOTHREAD_MAIN_THREAD_ID) keepClientInMainThread(c); - - /* addReply*() is not suitable for clients in SLAVE_STATE_WAIT_RDB_CHANNEL state. 
*/ - /* 22 is the byte length of the literal reply written below; on a - * short write the client is freed asynchronously. */ if (connWrite(c->conn, "+RDBCHANNELSYNCSLOTS\r\n", 22) != 22) - freeClientAsync(c); - } else if (!strcasecmp(c->argv[2]->ptr, "rdbchannel") && c->argc == 4) { - /* CLUSTER SYNCSLOTS RDBCHANNEL <task-id> - * Attaches this client as the RDB channel of the pending migrate task. */ - sds task_id = c->argv[3]->ptr; - if (sdslen(task_id) != CLUSTER_NAMELEN) { - addReplyError(c, "Invalid task id"); - return; - } - - if (listLength(asmManager->tasks) == 0) { - addReplyError(c, "No slot migration task in progress"); - return; - } - /* Only the first task is considered: it must be a migrate task that - * is waiting for its RDB channel and carries the same id. */ - asmTask *task = listNodeValue(listFirst(asmManager->tasks)); - if (task->operation != ASM_MIGRATE || task->state != ASM_WAIT_RDBCHANNEL || - strcmp(task->id, task_id) != 0) - { - addReplyError(c, "Another migration task is already in progress"); - return; - } - - if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, task->state))) { - /* Fail point for tests: close the main channel client before the - * rdb channel client connects */ - if (task->main_channel_client) - freeClient(task->main_channel_client); - } - - /* The main channel client must be present when setting RDB channel client */ - if (task->main_channel_client == NULL) { - /* Maybe the main channel connection is closed. */ - addReplyError(c, "Main channel connection is not established"); - return; - } - - /* Mark the client as a slave to generate slots snapshot */ - c->flags |= (CLIENT_SLAVE | CLIENT_REPL_RDB_CHANNEL | CLIENT_REPL_RDBONLY | CLIENT_ASM_MIGRATING); - c->slave_capa |= SLAVE_CAPA_EOF; - c->slave_req |= (SLAVE_REQ_SLOTS_SNAPSHOT | SLAVE_REQ_RDB_CHANNEL); - c->replstate = SLAVE_STATE_WAIT_BGSAVE_START; - c->repldbfd = -1; - if (server.repl_disable_tcp_nodelay) - connDisableTcpNoDelay(c->conn); /* Non-critical if it fails. 
*/ - listAddNodeTail(server.slaves, c); - - /* Wait for bgsave to start for slots sync */ - task->state = ASM_WAIT_BGSAVE_START; - task->rdb_channel_state = ASM_WAIT_BGSAVE_START; - task->rdb_channel_client = c; - c->task = task; - - /* Keep the client in the main thread to avoid data races between the - * connWrite call in startBgsaveForReplication and the client's event - * handler in IO threads. */ - if (c->tid != IOTHREAD_MAIN_THREAD_ID) keepClientInMainThread(c); - /* Start the snapshot right away unless another child process is - * active, in which case this path only logs that it is delayed. */ - if (!hasActiveChildProcess()) { - startBgsaveForReplication(c->slave_capa, c->slave_req); - } else { - serverLog(LL_NOTICE, "BGSAVE for slots snapshot sync delayed"); - } - } else if (!strcasecmp(c->argv[2]->ptr, "snapshot-eof") && c->argc == 3) { - /* CLUSTER SYNCSLOTS SNAPSHOT-EOF */ - clusterSyncSlotsSnapshotEOF(c); - } else if (!strcasecmp(c->argv[2]->ptr, "stream-eof") && c->argc == 3) { - /* CLUSTER SYNCSLOTS STREAM-EOF */ - clusterSyncSlotsStreamEOF(c); - } else if (!strcasecmp(c->argv[2]->ptr, "ack") && c->argc == 5) { - /* CLUSTER SYNCSLOTS ACK <state> <offset> - * <state> must match the name that asmTaskStateToString() gives for - * ASM_STREAMING_BUF or ASM_WAIT_STREAM_EOF (case-insensitive). */ - long long offset; - int dest_state; - - if (!strcasecmp(c->argv[3]->ptr, asmTaskStateToString(ASM_STREAMING_BUF))) { - dest_state = ASM_STREAMING_BUF; - } else if (!strcasecmp(c->argv[3]->ptr, asmTaskStateToString(ASM_WAIT_STREAM_EOF))) { - dest_state = ASM_WAIT_STREAM_EOF; - } else { - return; /* Other states are not supported yet: ignored without a reply. */ - } - /* An offset that does not parse as an integer is ignored silently. */ - if ((getLongLongFromObject(c->argv[4], &offset) != C_OK)) - return; - - if (c->task && c->task->operation == ASM_MIGRATE) { - /* Update the state and ACKed offset from destination. 
*/ - asmTask *task = c->task; - task->dest_state = dest_state; /* Updated even if the offset below turns out to be stale. */ - if (task->dest_offset > (unsigned long long) offset) { - serverLog(LL_WARNING, "CLUSTER SYNCSLOTS ACK received, dest state: %s, " - "but offset %lld is less than the current dest offset %lld", - asmTaskStateToString(dest_state), offset, task->dest_offset); - return; - } - task->dest_offset = offset; - serverLog(LL_DEBUG, "CLUSTER SYNCSLOTS ACK received, dest state: %s, " - "updated dest offset to %lld, source offset: %lld", - asmTaskStateToString(dest_state), task->dest_offset, task->source_offset); - - /* Record the time when the destination finishes applying the accumulated - * buffer. Only the first ACK in ASM_WAIT_STREAM_EOF state sets it. */ - if (task->dest_state == ASM_WAIT_STREAM_EOF && task->dest_accum_applied_time == 0) - task->dest_accum_applied_time = server.mstime; - - /* Pause write if needed. Only evaluated while the task is in the - * ASM_SEND_BULK_AND_STREAM or ASM_SEND_STREAM state. */ - if (task->state == ASM_SEND_BULK_AND_STREAM || task->state == ASM_SEND_STREAM) { - /* Prepare the handoff once the lag (source_offset - dest_offset) - * is at most asm_handoff_max_lag_bytes. */ - if (task->dest_offset + server.asm_handoff_max_lag_bytes >= task->source_offset) { - if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, ASM_HANDOFF_PREP))) - return; /* Do not enter handoff prep state for testing buffer drain timeout. */ - - serverLog(LL_NOTICE, "The applied offset lag %lld is less than the threshold %lld, " - "pausing writes for slot handoff", - task->source_offset - task->dest_offset, - server.asm_handoff_max_lag_bytes); - task->state = ASM_HANDOFF_PREP; - asmLogTaskEvent(task, ASM_EVENT_HANDOFF_PREP); - clusterAsmOnEvent(task->id, ASM_EVENT_HANDOFF_PREP, task->slots); - } - } - } - } else if (!strcasecmp(c->argv[2]->ptr, "fail") && c->argc == 4) { - /* CLUSTER SYNCSLOTS FAIL <err> */ - return; /* This is a no-op, just to handle the command syntax. 
*/ - } else if (!strcasecmp(c->argv[2]->ptr, "conf") && c->argc >= 5) { - /* CLUSTER SYNCSLOTS CONF <option> <value> [<option> <value>] - * Options are consumed as pairs starting at argv[3]; an option that - * has no value is answered with an arity error. */ - for (int j = 3; j < c->argc; j += 2) { - if (j + 1 >= c->argc) { - addReplyErrorArity(c); - return; - } - /* Dispatch on the option name (case-insensitive). */ - if (!strcasecmp(c->argv[j]->ptr, "node-id")) { - /* node-id <node-id>: store the given node id on this client, - * replacing any previously stored value. The id must be exactly - * CLUSTER_NAMELEN bytes long and known to the cluster. */ - sds node_id = c->argv[j + 1]->ptr; - int node_id_len = (int) sdslen(node_id); - if (node_id_len != CLUSTER_NAMELEN) { - addReplyErrorFormat(c, "Invalid node id length %d", node_id_len); - return; - } - - /* Lookup the node in the cluster. */ - clusterNode *node = clusterLookupNode(node_id, node_id_len); - if (node == NULL) { - addReplyErrorFormat(c, "Node %s not found in cluster", node_id); - return; - } - - if (c->node_id) sdsfree(c->node_id); - c->node_id = sdsdup(node_id); - } else if (!strcasecmp(c->argv[j]->ptr, "slot-info")) { - /* slot-info slot:key_size:expire_size */ - int count; - long long slot, key_size, expire_size; - sds slot_info = c->argv[j + 1]->ptr; - sds *parts = sdssplitlen(slot_info, sdslen(slot_info), ":", 1, &count); - - /* Validate the slot info format, parse slot, key_size, expire_size. - * The slot must be in the range 0 .. CLUSTER_SLOTS-1 and both - * sizes must be non-negative. */ - if (parts == NULL || count != 3 || - (string2ll(parts[0], sdslen(parts[0]), &slot) == 0 || slot < 0 || slot >= CLUSTER_SLOTS) || - (string2ll(parts[1], sdslen(parts[1]), &key_size) == 0 || key_size < 0) || - (string2ll(parts[2], sdslen(parts[2]), &expire_size) == 0 || expire_size < 0)) - { - addReplyErrorFormat(c, "Invalid slot info: %s", slot_info); - sdsfreesplitres(parts, count); - return; - } - - /* We resize individual slot specific dictionaries. */ - redisDb *db = c->db; - serverAssert(db->id == 0); /* Only support DB 0 for cluster mode. 
*/ - kvstoreDictExpand(db->keys, slot, key_size); - kvstoreDictExpand(db->expires, slot, expire_size); - - sdsfreesplitres(parts, count); - } else if (!strcasecmp(c->argv[j]->ptr, "asm-task")) { - /* asm-task task_id:source_node:dest_node:operation:state:slot_ranges */ - if (clusterNodeIsMaster(getMyClusterNode())) { - addReplyError(c, "CLUSTER SYNCSLOTS CONF ASM-TASK only allowed on replica"); - return; - } - if (asmReplicaHandleMasterTask(c->argv[j + 1]->ptr) != C_OK) { - addReplyErrorFormat(c, "Failed to handle master task: %s", - (char *)c->argv[j + 1]->ptr); - } - } else if (!strcasecmp(c->argv[j]->ptr, "capa")) { - /* Ignore unrecognized capabilities. This is for future extensions. */ - } else { - addReplyErrorFormat(c, "Unknown option %s", (char *)c->argv[j]->ptr); - } - } - addReply(c, shared.ok); - } else { - addReplyErrorObject(c, shared.syntaxerr); - } -} - -/* Save a key-value pair to stream I/O using either RESTORE or AOF format. */ -static int slotSnapshotSaveKeyValuePair(rio *rdb, kvobj *o, int dbid) { - /* Get the expire time */ - long long expiretime = kvobjGetExpire(o); - - /* Set on stack string object for key */ - robj key; - initStaticStringObject(key, kvobjGetKey(o)); - - /* If module object or non-string object that is not too big, - * use RESTORE command (RDB format) to migrate data. - * Generally RDB binary format is more efficient, but it may cause - * block in the destination if the object is too large, so fall back - * to AOF format if necessary. */ - if ((o->type == OBJ_MODULE) || - (o->type != OBJ_STRING && getObjectLength(o) <= ASM_AOF_MIN_ITEMS_PER_KEY)) - { - if (rioWriteBulkCount(rdb, '*', 5) == 0) return C_ERR; - if (rioWriteBulkString(rdb, "RESTORE", 7) == 0) return C_ERR; - if (rioWriteBulkObject(rdb, &key) == 0) return C_ERR; - if (rioWriteBulkLongLong(rdb, expiretime == -1 ? 0 : expiretime) == 0) return C_ERR; - - /* Create the DUMP encoded representation. 
*/ - rio payload; - createDumpPayload(&payload, o, &key, dbid, 1); - sds buf = payload.io.buffer.ptr; - if (rioWriteBulkString(rdb, buf, sdslen(buf)) == 0) { - sdsfree(payload.io.buffer.ptr); - return C_ERR; - } - sdsfree(payload.io.buffer.ptr); - - /* Write ABSTTL */ - if (rioWriteBulkString(rdb, "ABSTTL", 6) == 0) return C_ERR; - } else { - /* Use AOF format to migrate data */ - if (rewriteObject(rdb, &key, o, dbid, expiretime) == C_ERR) return C_ERR; - } - - return C_OK; -} - -/* Modules can use RM_ClusterPropagateForSlotMigration() during the - * CLUSTER_SLOT_MIGRATION_MIGRATE_MODULE_PROPAGATE event to propagate commands - * that should be delivered just before the slot snapshot delivery starts. This - * function triggers the event, collects the commands and writes them to the rio. - * - * Returns C_OK when every collected command was written, C_ERR as soon as a - * write fails. The temporary command array on the task is always released - * and reset to NULL before returning. */ -static int propagateModuleCommands(asmTask *task, rio *rdb) { - RedisModuleClusterSlotMigrationInfo info = { - .version = REDISMODULE_CLUSTER_SLOT_MIGRATION_INFO_VERSION, - .task_id = task->id, - .slots = (RedisModuleSlotRangeArray *) task->slots - }; - memcpy(info.source_node_id, task->source, CLUSTER_NAMELEN); - memcpy(info.destination_node_id, task->dest, CLUSTER_NAMELEN); - - task->pre_snapshot_module_cmds = zcalloc(sizeof(*task->pre_snapshot_module_cmds)); - moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION, - REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_MODULE_PROPAGATE, - &info - ); - - int ret = C_OK; - /* Write the module commands to the rio. The outer loop also tests ret, - * because the break in the argument loop only leaves that inner loop; - * without the test the next command would still be written after a - * failed write. */ - for (int i = 0; ret == C_OK && i < task->pre_snapshot_module_cmds->numops; i++) { - redisOp *op = &task->pre_snapshot_module_cmds->ops[i]; - if (rioWriteBulkCount(rdb, '*', op->argc) == 0) { - ret = C_ERR; - break; - } - for (int j = 0; j < op->argc; j++) - if (rioWriteBulkObject(rdb, op->argv[j]) == 0) { - ret = C_ERR; - break; - } - } - redisOpArrayFree(task->pre_snapshot_module_cmds); - zfree(task->pre_snapshot_module_cmds); - task->pre_snapshot_module_cmds = NULL; - return ret; -} - -/* Save the slot ranges snapshot to 
the file. It generates the DUMP encoded - * representation of each key in the slot ranges and writes it to the file. - * - * Returns C_OK on success, or C_ERR on error. */ -int slotSnapshotSaveRio(int req, rio *rdb, int *error) { - serverAssert(req & SLAVE_REQ_SLOTS_SNAPSHOT); - - dictEntry *de; - kvstoreDictIterator kvs_di; - - if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_RDB_CHANNEL, ASM_SEND_BULK_AND_STREAM))) - rioAbort(rdb); /* Simulate a failure */ - - /* Disable RDB compression for slots snapshot since compression is too - * expensive both in source and destination. */ - server.rdb_compression = 0; - - /* Only support a single migrate task */ - serverAssert(listLength(asmManager->tasks) == 1); - asmTask *task = listNodeValue(listFirst(asmManager->tasks)); - serverAssert(task->operation == ASM_MIGRATE); - - if (propagateModuleCommands(task, rdb) == C_ERR) goto werr; - - /* Dump functions and send to destination side. */ - rio payload; - createFunctionDumpPayload(&payload); - sds functions = payload.io.buffer.ptr; - if (rioWriteBulkCount(rdb, '*', 4) == 0) goto werr; - if (rioWriteBulkString(rdb, "FUNCTION", 8) == 0) goto werr; - if (rioWriteBulkString(rdb, "RESTORE", 7) == 0) goto werr; - if (rioWriteBulkString(rdb, functions, sdslen(functions)) == 0) { - sdsfree(payload.io.buffer.ptr); - goto werr; - } - sdsfree(payload.io.buffer.ptr); - /* Add the REPLACE option to the RESTORE command, to avoid error - * when migrating to a node with existing libraries. */ - if (rioWriteBulkString(rdb, "REPLACE", 7) == 0) goto werr; - - for (int i = 0; i < server.dbnum; i++) { - char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; - redisDb *db = server.db + i; - if (kvstoreSize(db->keys) == 0) continue; - - /* SELECT the new DB */ - if (rioWrite(rdb,selectcmd,sizeof(selectcmd)-1) == 0) goto werr; - if (rioWriteBulkLongLong(rdb, i) == 0) goto werr; - - /* Iterate all slot ranges, and generate the DUMP encoded - * representation of each key in the DB. 
*/ - for (int j = 0; j < task->slots->num_ranges; j++) { - slotRange *sr = &task->slots->ranges[j]; - /* Iterate all keys in the slot range */ - for (int k = sr->start; k <= sr->end; k++) { - int send_slot_info = 0; - - /* Iterate the keys of the DB selected by the enclosing loop, - * the same DB the slot sizes and db id below are taken from. - * For the first DB this is identical to server.db->keys. */ - kvstoreInitDictIterator(&kvs_di, db->keys, k); - while ((de = kvstoreDictIteratorNext(&kvs_di)) != NULL) { - /* Send slot info before the first key in the slot, so that - * nothing is sent for slots without keys. */ - if (!send_slot_info) { - /* Format slot info as slot:key_count:expire_count */ - char buf[128]; - int len = snprintf(buf, sizeof(buf), "%d:%lu:%lu", - k, kvstoreDictSize(db->keys, k), - kvstoreDictSize(db->expires, k)); - serverAssert(len > 0 && len < (int)sizeof(buf)); - - /* Send slot info */ - if (rioWriteBulkCount(rdb, '*', 5) == 0) goto werr2; - if (rioWriteBulkString(rdb, "CLUSTER", 7) == 0) goto werr2; - if (rioWriteBulkString(rdb, "SYNCSLOTS", 9) == 0) goto werr2; - if (rioWriteBulkString(rdb, "CONF", 4) == 0) goto werr2; - if (rioWriteBulkString(rdb, "SLOT-INFO", 9) == 0) goto werr2; - if (rioWriteBulkString(rdb, buf, len) == 0) goto werr2; - send_slot_info = 1; - } - - /* Save a key-value pair */ - kvobj *o = dictGetKV(de); - if (slotSnapshotSaveKeyValuePair(rdb, o, db->id) == C_ERR) goto werr2; - - /* Delay return if required (for testing) */ - if (unlikely(server.rdb_key_save_delay)) { - /* Send buffer to the destination ASAP. 
*/ - if (rioFlush(rdb) == 0) goto werr2; - debugDelay(server.rdb_key_save_delay); - } - } - kvstoreResetDictIterator(&kvs_di); - } - } - } - - /* Terminate the snapshot with the CLUSTER SYNCSLOTS SNAPSHOT-EOF command */ - if (rioWriteBulkCount(rdb, '*', 3) == 0) goto werr; - if (rioWriteBulkString(rdb, "CLUSTER", 7) == 0) goto werr; - if (rioWriteBulkString(rdb, "SYNCSLOTS", 9) == 0) goto werr; - if (rioWriteBulkString(rdb, "SNAPSHOT-EOF", 12) == 0) goto werr; - return C_OK; - -werr2: /* Failure while a slot iterator is open: reset it first. */ - kvstoreResetDictIterator(&kvs_di); -werr: /* Report errno through the optional error out-parameter. */ - if (error) *error = errno; - return C_ERR; -} - -/* Read error handler for sync buffer. It does nothing unless the first task - * is accumulating or streaming the buffer. While streaming, the client stored - * as the private data of the connection is freed; while accumulating, the - * task is marked as failed with the last connection error. */ -static void asmReadSyncBufferErrorHandler(connection *conn) { - if (listLength(asmManager->tasks) == 0) return; - asmTask *task = listNodeValue(listFirst(asmManager->tasks)); - if (task->state != ASM_ACCUMULATE_BUF && task->state != ASM_STREAMING_BUF) return; - - if (task->state == ASM_STREAMING_BUF) { - freeClient(connGetPrivateData(conn)); - } else { - asmTaskSetFailed(task, "Main channel - Read error: %s", connGetLastError(conn)); - } -} - -/* Read data from connection into sync buffer. */ -static void asmSyncBufferReadFromConn(connection *conn) { - /* The task may be canceled (move to finished list) or failed during streaming buffer. */ - if (listLength(asmManager->tasks) == 0) return; - asmTask *task = listNodeValue(listFirst(asmManager->tasks)); - if (task->state != ASM_ACCUMULATE_BUF && task->state != ASM_STREAMING_BUF) return; - - /* ASM_ACCUMULATE_BUF and ASM_STREAMING_BUF fail points are handled here */ - if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_MAIN_CHANNEL, task->state))) - shutdown(conn->fd, SHUT_RDWR); - - replDataBuf *buf = &task->sync_buffer; - if (task->state == ASM_STREAMING_BUF) { - /* While streaming accumulated buffers, we continue reading from the - * source to prevent accumulation on source side as much as possible. - * However, we aim to drain buffer eventually. 
To ensure we consume more - * than we read, we'll read at most one block after two blocks of - * buffers are consumed. */ - if (listLength(buf->blocks) + 1 >= buf->last_num_blocks) - return; - buf->last_num_blocks = listLength(buf->blocks); - } - - replDataBufReadFromConn(conn, buf, asmReadSyncBufferErrorHandler); -} -/* Yield callback for streaming the sync buffer (ctx is a replDataBufToDbCtx). - * Sends CLUSTER SYNCSLOTS ACK <task state> <applied offset> on the connection - * of the context client. If sending fails, the error is logged and the client - * is freed. */ -static void asmSyncBufferStreamYieldCallback(void *ctx) { - replDataBufToDbCtx *context = ctx; - asmTask *task = context->privdata; - client *c = context->client; - - char offset[64]; - ull2string(offset, sizeof(offset), context->applied_offset); - - char *err = sendCommand(c->conn, "CLUSTER", "SYNCSLOTS", "ACK", - asmTaskStateToString(task->state), offset, NULL); - if (err) { - serverLog(LL_WARNING, "Error sending CLUSTER SYNCSLOTS ACK: %s", err); - sdsfree(err); - freeClient(c); - } - serverLog(LL_DEBUG, "Yielding sending ACK during streaming buffer, applied offset: %zu", - context->applied_offset); -} -/* Continue predicate for streaming the sync buffer (ctx is a - * replDataBufToDbCtx). Returns 0 to stop streaming and 1 to keep going. */ -static int asmSyncBufferStreamShouldContinue(void *ctx) { - replDataBufToDbCtx *context = ctx; - - /* If the task is failed or canceled, we should stop streaming immediately. */ - asmTask *task = context->privdata; - if (task->state == ASM_FAILED || task->state == ASM_CANCELED) return 0; - - /* Check the client-close flag only if the task has not failed or been canceled, - * otherwise the client may have already been freed. */ - if (context->client->flags & CLIENT_CLOSE_ASAP) return 0; - - return 1; -} - -/* Stream the sync buffer to the database. */ -void asmSyncBufferStreamToDb(asmTask *task) { - task->state = ASM_STREAMING_BUF; - serverLog(LL_NOTICE, "Starting to stream accumulated buffer for the import task (%zu bytes)", - task->sync_buffer.used); - - /* The buffered stream from the main channel connection into - * the database is processed by a fake client. 
*/ - client *c = createClient(task->main_channel_conn); - c->flags |= (CLIENT_MASTER | CLIENT_INTERNAL | CLIENT_ASM_IMPORTING); - c->querybuf = sdsempty(); - c->authenticated = 1; - c->user = NULL; - c->task = task; - - /* Record the current number of buffer blocks as the baseline that - * asmSyncBufferReadFromConn compares against, so that we consume - * faster than we read from the source side. */ - task->sync_buffer.last_num_blocks = listLength(task->sync_buffer.blocks); - - /* Continue accumulating during streaming to prevent accumulation on source side. */ - connSetReadHandler(c->conn, asmSyncBufferReadFromConn); - - replDataBufToDbCtx ctx = { - .privdata = task, - .client = c, - .applied_offset = 0, - .should_continue = asmSyncBufferStreamShouldContinue, - .yield_callback = asmSyncBufferStreamYieldCallback, - }; - - /* Start streaming the buffer to the DB. This task may fail due to network - * errors or cancellations. We never release the task immediately; instead, - * it may be moved to the finished list. The actual free happens in serverCron, - * which ensures there is no use-after-free issue. */ - int ret = replDataBufStreamToDb(&task->sync_buffer, &ctx); - - if (ret == C_OK) { - if (task->stream_eof_during_streaming) { - /* STREAM-EOF received during streaming, we can take over now. */ - asmImportTakeover(task); - return; - } - - /* Update the dest offset according to applied bytes. */ - task->dest_offset = ctx.applied_offset; - /* Wait for STREAM-EOF from the source node. The connection goes back - * to the regular query reader instead of the sync buffer reader. */ - task->state = ASM_WAIT_STREAM_EOF; - connSetReadHandler(task->main_channel_conn, readQueryFromClient); - serverLog(LL_NOTICE, "Successfully streamed accumulated buffer for the import task, applied offset: %lld", - task->dest_offset); - - if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_MAIN_CHANNEL, task->state))) - shutdown(task->main_channel_conn->fd, SHUT_RDWR); /* Simulate a failure */ - - /* ACK offset after streaming buffer is done. 
*/ - asmImportSendACK(task); - } else { - /* If the task is already canceled or failed, we don't need to do anything here. */ - if (task->state == ASM_FAILED || task->state == ASM_CANCELED) return; - - asmTaskSetFailed(task, "Main channel - Failed to stream into the DB"); - } -} -/* Advance the dest_offset of the task by the given number of bytes. Ignored - * when task is NULL or the task is not in ASM_WAIT_STREAM_EOF state. */ -void asmImportIncrAppliedBytes(struct asmTask *task, size_t bytes) { - if (!task || task->state != ASM_WAIT_STREAM_EOF) return; - task->dest_offset += bytes; -} - -/* Send STREAM-EOF if the sync buffer stream is drained, i.e. the main - * channel client has no pending replies. On success the task moves to - * ASM_STREAM_EOF; if sending fails the task is marked as failed. */ -void asmSendStreamEofIfDrained(asmTask *task) { - client *c = task->main_channel_client; - - /* The command streams for slot ranges have been drained. */ - if (!clientHasPendingReplies(c)) { - serverLog(LL_NOTICE, "Slot migration command stream drained, sending STREAM-EOF to the destination"); - - if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, task->state))) - shutdown(c->conn->fd, SHUT_RDWR); - - /* Send STREAM-EOF to indicate the end of the stream. */ - char *err = sendCommand(c->conn, "CLUSTER", "SYNCSLOTS", "STREAM-EOF", NULL); - if (err) { - asmTaskSetFailed(task, "Main channel - Failed to send STREAM-EOF: %s", err); - sdsfree(err); - return; - } - - /* Even though the main channel client is no longer needed, we - * can't close it directly because the destination may still be - * sending ACKs over this connection. Instead, we leave it to the - * destination to close it. We just clear the task and client - * references */ - task->main_channel_client->task = NULL; - task->main_channel_client = NULL; - - /* There may be a delay to handle the disconnection of RDB channel, - * so we clear the task and client references here. 
*/ - if (task->rdb_channel_client != NULL) { - task->rdb_channel_state = ASM_COMPLETED; - task->rdb_channel_client->task = NULL; - freeClientAsync(task->rdb_channel_client); - task->rdb_channel_client = NULL; - } - - task->state = ASM_STREAM_EOF; - } -} -/* Drives the first ASM task forward. Judging by the name it is meant to run - * before the event loop sleeps (the call site is not shown here, confirm). - * It first processes pending trim jobs, then either starts or streams an - * import task, or, for a migrate task, cancels on cross-slot propagation, - * checks the write pause timeout and sends STREAM-EOF once drained. */ -void asmBeforeSleep(void) { - asmTrimJobProcessPending(); - - if (listLength(asmManager->tasks) == 0) return; - asmTask *task = listNodeValue(listFirst(asmManager->tasks)); - - if (task->operation == ASM_IMPORT) { - if (task->state == ASM_NONE) - asmStartImportTask(task); - else if (task->state == ASM_READY_TO_STREAM) - asmSyncBufferStreamToDb(task); - } - - if (task->operation == ASM_MIGRATE) { - if (task->cross_slot_during_propagating) { - asmTaskCancel(task, "propagating cross slot command"); - return; - } - - if (task->state == ASM_HANDOFF) { - /* To avoid long pause, we fail the task if the pause takes too long. - * paused_time is the time recorded when the handoff started. */ - if (server.mstime - task->paused_time >= server.asm_write_pause_timeout) { - asmTaskSetFailed(task, "Server paused timeout"); - return; - } - asmSendStreamEofIfDrained(task); - } else if (task->state == ASM_STREAM_EOF) { - /* In state ASM_STREAM_EOF (server is still paused), we are waiting - * for the destination node to broadcast the slot ownership change. - * But maybe the destination node is failed or network is not available, - * the source node may be paused forever. So we fail the task if it - * takes too long. - * - * NOTE: There is a tricky case where the destination node may advertise - * ownership of the slot, causing a temporary configuration conflict. - * However, the configuration will eventually converge. In most cases, - * the destination node becomes the winner, since it bumps its config - * epoch before taking over slot ownership. 
*/ - if (server.mstime - task->paused_time >= server.asm_write_pause_timeout) - asmTaskSetFailed(task, "Server paused timeout"); - } - } -} -/* Periodic housekeeping for the first ASM task: retries failed imports, - * sends ACKs, checks channel timeouts and the sync buffer drain timeout, and - * finally trims the archived task list. asm_cron_runs counts invocations and - * is used to run some actions only on every 10th call. */ -void asmCron(void) { - static unsigned long long asm_cron_runs = 0; - asm_cron_runs++; - - if (listLength(asmManager->tasks) == 0) return; /* NOTE(review): this also skips the archived-task trimming at the end of this function while no task is active; confirm that is intended. */ - asmTask *task = listNodeValue(listFirst(asmManager->tasks)); - - if (task->operation == ASM_IMPORT) { - if (task->state == ASM_FAILED) { - /* Retry every 1 second (every 10th run; this assumes the cron - * runs 10 times per second, TODO confirm) */ - if (asm_cron_runs % 10 == 0) { - asmTaskReset(task); - task->retry_count++; - serverAssert(task->state == ASM_NONE); - asmStartImportTask(task); - } - } else if (task->state == ASM_WAIT_STREAM_EOF) { - if (asmImportSendACK(task) == C_ERR) return; - - /* Check if the main channel is timed out */ - client *c = connGetPrivateData(task->main_channel_conn); - serverAssert(c->task == task); - if (server.unixtime - c->lastinteraction > server.repl_timeout) - asmTaskSetFailed(task, "Main channel - Connection timeout"); - } else if (task->state == ASM_ACCUMULATE_BUF && - task->rdb_channel_state == ASM_RDBCHANNEL_TRANSFER) - { - /* Check if the RDB channel is timed out */ - client *c = connGetPrivateData(task->rdb_channel_conn); - serverAssert(c->task == task); - if (server.unixtime - c->lastinteraction > server.repl_timeout) - asmTaskSetFailed(task, "RDB channel - Connection timeout"); - } else if (task->state == ASM_SEND_SYNCSLOTS) { - /* Rare case: the source node replied to SYNCSLOTS with -NOTREADY - * because it wasn't ready to start a migration. We'll retry - * SYNCSLOTS every second instead of failing the attempt which could - * trigger unnecessary cleanup in the cluster implementation. */ - if (asm_cron_runs % 10 == 0) - asmSyncWithSource(task->main_channel_conn); - } - } else if (task->operation == ASM_MIGRATE) { - if (task->state == ASM_SEND_STREAM) { - /* Currently, we only need to check the main channel timeout when sending streams. 
- * For RDB channel connections, the timeout is handled by the socket itself - * during writes in slotSnapshotSaveRio. */ - if (server.unixtime - task->main_channel_client->lastinteraction > server.repl_timeout) - asmTaskSetFailed(task, "Main channel - Connection timeout"); - - /* After the destination applies the accumulated buffer, the source continues - * sending commands for migrating slots. The destination keeps applying them, - * but the gap remains above the acceptable limit, which may cause endless - * synchronization. A timeout check is required to handle this case. - * - * The timeout is calculated as the maximum of two values: - * - A configurable timeout (cluster-slot-migration-sync-buffer-drain-timeout) to - * avoid false positives. - * - A dynamic timeout based on the time that the destination took to apply the - * slot snapshot and the accumulated buffer during slot snapshot delivery. - * The destination should be able to drain the remaining sync buffer in less - * time than this. We multiply it by 2 to be more conservative. - * - * NOTE(review): this check still runs after the connection timeout check - * above has failed the task, so asmTaskSetFailed may be called twice in - * one run; confirm that it tolerates this. */ - if (task->dest_state == ASM_WAIT_STREAM_EOF && task->dest_accum_applied_time && - server.mstime - task->dest_accum_applied_time > - max(server.asm_sync_buffer_drain_timeout, - (task->dest_accum_applied_time - task->dest_slots_snapshot_time) * 2)) - { - asmTaskSetFailed(task, "Sync buffer drain timeout"); - } - } - } - - /* Trim the archived tasks list if it grows beyond asm_max_archived_tasks. - * Entries are released from the tail of the list, which is treated as - * the oldest one. */ - while (listLength(asmManager->archived_tasks) > (unsigned long)server.asm_max_archived_tasks) { - asmTask *oldest = listNodeValue(listLast(asmManager->archived_tasks)); - asmTaskFree(oldest); - listDelNode(asmManager->archived_tasks, listLast(asmManager->archived_tasks)); - } -} - -/* Cancel a specific task if ID is provided, otherwise cancel all tasks. 
*/ -int clusterAsmCancel(const char *task_id, const char *reason) { - if (asmManager == NULL) return 0; - - if (task_id) { - asmTask *task = asmLookupTaskById(task_id); - if (!task) return 0; /* Not found */ - - asmTaskCancel(task, reason); - return 1; - } else { /* No ID given: cancel every task and return how many there were. */ - int num_cancelled = 0; - listIter li; - listNode *ln; - - listRewind(asmManager->tasks, &li); - while ((ln = listNext(&li)) != NULL) { - asmTask *task = listNodeValue(ln); - asmTaskCancel(task, reason); - num_cancelled++; - } - return num_cancelled; - } -} - -/* Cancel all tasks that overlap with the given slot ranges. - * If slots is NULL, cancel all tasks. - * Returns the number of cancelled tasks (0 if asmManager is NULL). */ -int clusterAsmCancelBySlotRangeArray(struct slotRangeArray *slots, const char *reason) { - if (asmManager == NULL) return 0; - - int num_cancelled = 0; - listIter li; - listNode *ln; - listRewind(asmManager->tasks, &li); - while ((ln = listNext(&li)) != NULL) { - asmTask *task = listNodeValue(ln); - if (!slots || slotRangeArraysOverlap(task->slots, slots)) { - asmTaskCancel(task, reason); - num_cancelled++; - } - } - return num_cancelled; -} - -/* Cancel the task that overlaps with the given slot. - * Returns 1 if such a task was found and cancelled, 0 otherwise. */ -int clusterAsmCancelBySlot(int slot, const char *reason) { - slotRange req = {slot, slot}; - if (asmManager == NULL) return 0; - - /* Cancel it if found. */ - asmTask *task = lookupAsmTaskBySlotRange(&req); - if (task) asmTaskCancel(task, reason); - - return task ? 1 : 0; -} - -/* Cancel all tasks that involve the given node. The node is passed as an - * untyped pointer and used as a clusterNode. Returns the number of - * cancelled tasks (0 if asmManager or node is NULL). */ -int clusterAsmCancelByNode(void *node, const char *reason) { - if (asmManager == NULL || node == NULL) return 0; - - /* If the node to be deleted is myself, cancel all tasks. 
*/ - clusterNode *n = node; - if (n == getMyClusterNode()) return clusterAsmCancel(NULL, reason); - - int num_cancelled = 0; - listIter li; - listNode *ln; - listRewind(asmManager->tasks, &li); - while ((ln = listNext(&li)) != NULL) { - asmTask *task = listNodeValue(ln); - /* Cancel the task if the source node is the one to be deleted, or - * the dest node is the one to be deleted. */ - if (task->source_node == n || - !memcmp(task->dest, clusterNodeGetName(n), CLUSTER_NAMELEN) || - !memcmp(task->source, clusterNodeGetName(n), CLUSTER_NAMELEN)) - { - asmTaskCancel(task, reason); - num_cancelled++; - } - } - return num_cancelled; -} - -/* Check if the slot is in an active ASM task. */ -int isSlotInAsmTask(int slot) { - slotRange req = {slot, slot}; - if (!asmManager) return 0; - - listIter li; - listNode *ln; - listRewind(asmManager->tasks, &li); - while ((ln = listNext(&li)) != NULL) { - asmTask *task = listNodeValue(ln); - if (slotRangeArrayOverlaps(task->slots, &req)) - return 1; - } - return 0; -} - -/* Check if the slot is in a pending trim job. It may happen if we can't trim - * the slots immediately due to a write pause or when active trim is in progress. */ -int isSlotInTrimJob(int slot) { - slotRange req = {slot, slot}; - - if (!asmManager || !asmIsTrimInProgress()) return 0; - - /* Check if the slot is in any pending trim job. */ - listIter li; - listNode *ln; - listRewind(asmManager->pending_trim_jobs, &li); - while ((ln = listNext(&li)) != NULL) { - slotRangeArray *slots = listNodeValue(ln); - if (slotRangeArrayOverlaps(slots, &req)) - return 1; - } - - /* Check if the slot is in any active trim job. 
*/ - listRewind(asmManager->active_trim_jobs, &li); - while ((ln = listNext(&li)) != NULL) { - slotRangeArray *slots = listNodeValue(ln); - if (slotRangeArrayOverlaps(slots, &req)) - return 1; - } - return 0; -} - -int clusterAsmHandoff(const char *task_id, sds *err) { - serverAssert(task_id); - - asmTask *task = asmLookupTaskById(task_id); - if (!task || task->state != ASM_HANDOFF_PREP) { - *err = sdscatprintf(sdsempty(), "No suitable ASM task found for id: %s, task_state: %s", - task_id, task ? asmTaskStateToString(task->state) : "null"); - return C_ERR; - } - - task->state = ASM_HANDOFF; - task->paused_time = server.mstime; - - return C_OK; -} - -/* Notify Redis that the config is updated for the task. */ -int asmNotifyConfigUpdated(asmTask *task, sds *err) { - int event = -1; - - if (task->operation == ASM_IMPORT && task->state == ASM_TAKEOVER) { - event = ASM_EVENT_IMPORT_COMPLETED; - } else if (task->operation == ASM_MIGRATE && task->state == ASM_STREAM_EOF) { - event = ASM_EVENT_MIGRATE_COMPLETED; - } else { - *err = sdscatprintf(sdsempty(), - "ASM task is not in the correct state for config update: %s", - asmTaskStateToString(task->state)); - asmTaskCancel(task, "slots configuration updated"); - return C_ERR; - } - - /* Reset per-slot statistics for the migrated/imported ranges. - * Note: cluster_legacy.c also cleans up, so this may run twice, but - * required if an alternative cluster impl is in use. */ - for (int i = 0; i < task->slots->num_ranges; i++) { - slotRange *sr = &task->slots->ranges[i]; - for (int j = sr->start; j <= sr->end; j++) - clusterSlotStatReset(j); - } - - /* Clear error message if successful. */ - sdsfree(task->error); - task->error = sdsempty(); - task->state = ASM_COMPLETED; - - asmNotifyStateChange(task, event); - asmTaskFinalize(task); - - /* Trim the slots after the migrate task is completed. 
*/ - if (event == ASM_EVENT_MIGRATE_COMPLETED) - asmTrimJobSchedule(task->slots); - - return C_OK; -} - -/* Import/Migrate task is done, config is updated. */ -int clusterAsmDone(const char *task_id, sds *err) { - serverAssert(task_id); - - asmTask *task = asmLookupTaskById(task_id); - if (!task) { - *err = sdscatprintf(sdsempty(), "No ASM task found for id: %s", task_id); - return C_ERR; - } - return asmNotifyConfigUpdated(task, err); -} - -int clusterAsmProcess(const char *task_id, int event, void *arg, char **err) { - int ret, num_cancelled; - sds errsds = NULL; - static char buf[256]; - - if (err) *err = NULL; - - switch (event) { - case ASM_EVENT_IMPORT_START: { - /* Validate the slot ranges. */ - slotRangeArray *slots = slotRangeArrayDup(arg); - if (slotRangeArrayNormalizeAndValidate(slots, &errsds) != C_OK) { - slotRangeArrayFree(slots); - ret = C_ERR; - break; - } - ret = asmCreateImportTask(task_id, slots, &errsds) ? C_OK : C_ERR; - break; - } - case ASM_EVENT_CANCEL: { - num_cancelled = clusterAsmCancel(task_id, "user request"); - if (arg) *((int *)arg) = num_cancelled; - ret = C_OK; - break; - } - case ASM_EVENT_HANDOFF: { - ret = clusterAsmHandoff(task_id, &errsds); - break; - } - case ASM_EVENT_DONE: { - ret = clusterAsmDone(task_id, &errsds); - break; - } - default: { - ret = C_ERR; - errsds = sdscatprintf(sdsempty(), "Unknown operation: %d", event); - break; - } - } - - if (ret != C_OK && errsds && err) { - snprintf(buf, sizeof(buf), "%s", errsds); - *err = buf; - } - sdsfree(errsds); - - return ret; -} - -/* Propagate TRIMSLOTS command to AOF and replicas. 
*/ -static void propagateTrimSlots(slotRangeArray *slots) { - int argc = slots->num_ranges * 2 + 3; - robj **argv = zmalloc(sizeof(robj*) * argc); - argv[0] = createStringObject("TRIMSLOTS", 9); - argv[1] = createStringObject("RANGES", 6); - argv[2] = createStringObjectFromLongLong(slots->num_ranges); - for (int i = 0; i < slots->num_ranges; i++) { - argv[i*2+3] = createStringObjectFromLongLong(slots->ranges[i].start); - argv[i*2+4] = createStringObjectFromLongLong(slots->ranges[i].end); - } - - enterExecutionUnit(1, 0); - - int prev_replication_allowed = server.replication_allowed; - server.replication_allowed = 1; - alsoPropagate(-1, argv, argc, PROPAGATE_AOF | PROPAGATE_REPL); - server.replication_allowed = prev_replication_allowed; - - exitExecutionUnit(); - postExecutionUnitOperations(); - - for (int i = 0; i < argc; i++) - decrRefCount(argv[i]); - zfree(argv); -} - -/* If this node is a replica and there is an active trim or a pending trim - * job (due to write pause), we cannot process commands from the master for the - * slots that are waiting to be trimmed. Otherwise, the trim cycle could - * mistakenly delete newly added keys. In this case, the master will be blocked - * until the trim job finishes. This is supposed to be a rare event as it needs - * to migrate slots and import them back before the trim job is done. */ -void asmUnblockMasterAfterTrim(void) { - if (server.master && - server.master->flags & CLIENT_BLOCKED && - server.master->bstate.btype == BLOCKED_POSTPONE_TRIM) - { - unblockClient(server.master, 1); - serverLog(LL_NOTICE, "Unblocking master client after active trim is completed"); - } -} - -/* Trim the slots asynchronously in the BIO thread. 
*/ -void asmTriggerBackgroundTrim(slotRangeArray *slots) { - RedisModuleClusterSlotMigrationTrimInfoV1 fsi = { - REDISMODULE_CLUSTER_SLOT_MIGRATION_TRIMINFO_VERSION, - (RedisModuleSlotRangeArray *) slots - }; - - moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION_TRIM, - REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_BACKGROUND, - &fsi); - - signalFlushedDb(0, 1, slots); - - /* Create temp kvstores and estore, move relevant slot dicts/ebuckets into them, - * and delete them in BIO thread asynchronously. */ - kvstore *keys = kvstoreCreate(&kvstoreBaseType, &dbDictType, - CLUSTER_SLOT_MASK_BITS, - KVSTORE_ALLOCATE_DICTS_ON_DEMAND); - kvstore *expires = kvstoreCreate(&kvstoreBaseType, &dbExpiresDictType, - CLUSTER_SLOT_MASK_BITS, - KVSTORE_ALLOCATE_DICTS_ON_DEMAND); - estore *subexpires = estoreCreate(&subexpiresBucketsType, CLUSTER_SLOT_MASK_BITS); - - size_t total_keys = 0; - - for (int i = 0; i < slots->num_ranges; i++) { - for (int slot = slots->ranges[i].start; slot <= slots->ranges[i].end; slot++) { - total_keys += kvstoreDictSize(server.db[0].keys, slot); - kvstoreMoveDict(server.db[0].keys, keys, slot); - kvstoreMoveDict(server.db[0].expires, expires, slot); - estoreMoveEbuckets(server.db[0].subexpires, subexpires, slot); - } - } - - emptyDbDataAsync(keys, expires, subexpires); - - sds str = slotRangeArrayToString(slots); - serverLog(LL_NOTICE, "Background trim started for slots: %s to trim %zu keys.", str, total_keys); - sdsfree(str); - - /* Unblock master if blocked. This can only happen in a very unlikely case, - * trim job will be in pending list due to write pause and master will send - * commands for the slots that are waiting to be trimmed. Just keeping this - * call here for being defensive as it is harmless. */ - asmUnblockMasterAfterTrim(); -} - -/* Trim the slots. 
*/ -void asmTrimSlots(slotRangeArray *slots) { - if (asmManager->debug_trim_method == ASM_DEBUG_TRIM_NONE) - return; - - /* Trigger active trim for the following cases: - * 1. Debug override: trim method is set to 'active'. - * 2. There are clients using client side caching (client tracking is enabled): - * There is no way to invalidate specific slots in the client tracking - * protocol. For now, we just use active trim to trim the slots. - * 3. Module subscribers: If any module is subscribed to TRIMMED event, we - * assume module needs per key notification and cannot use background trim. - */ - int activetrim = server.tracking_clients != 0 || - (asmManager->debug_trim_method == ASM_DEBUG_TRIM_ACTIVE) || - (asmManager->debug_trim_method == ASM_DEBUG_TRIM_DEFAULT && - moduleHasSubscribersForKeyspaceEvent(NOTIFY_KEY_TRIMMED)); - if (activetrim) - asmTriggerActiveTrim(slots); - else - asmTriggerBackgroundTrim(slots); -} - -/* Schedule a trim job for the specified slot ranges. The job will be - * deferred and handled later in asmBeforeSleep(). We delay the trim jobs to - * asmBeforeSleep() to ensure it only runs when there is no write pause. */ -void asmTrimJobSchedule(slotRangeArray *slots) { - listAddNodeTail(asmManager->pending_trim_jobs, slotRangeArrayDup(slots)); -} - -/* Process any pending trim jobs. */ -void asmTrimJobProcessPending(void) { - /* Check if there is any pending trim job and we can propagate it. */ - if (listLength(asmManager->pending_trim_jobs) == 0 || - asmManager->debug_trim_method == ASM_DEBUG_TRIM_NONE) - { - return; - } - - /* If this node is a replica, it should not initiate slot trimming actively. - * Cancel the trim job and unblock the master if it is blocked. 
*/ - if (clusterNodeIsSlave(getMyClusterNode())) { - asmCancelPendingTrimJobs(); - asmUnblockMasterAfterTrim(); - return; - } - - /* Determine if we can start the trim job: - * - require client writes not paused (so key deletions are allowed) - * - require replicas not paused (so TRIMSLOTS can be propagated). - * - require trim is not disabled via RedisModule_ClusterDisableTrim(). - */ - static int logged = 0; - int disabled_by_module = server.cluster_module_trim_disablers > 0; - - if (isPausedActions(PAUSE_ACTION_CLIENT_WRITE) || - isPausedActions(PAUSE_ACTION_CLIENT_ALL) || - isPausedActions(PAUSE_ACTION_REPLICA) || - disabled_by_module) - { - if (logged == 0) { - logged = 1; - const char *reason = disabled_by_module ? "trim is disabled by module" : - "pause action is in effect"; - serverLog(LL_NOTICE, "Trim job is deferred since %s.", reason); - } - return; - } - logged = 0; - - listIter li; - listNode *ln; - listRewind(asmManager->pending_trim_jobs, &li); - while ((ln = listNext(&li)) != NULL) { - slotRangeArray *slots = listNodeValue(ln); - asmTrimSlots(slots); - propagateTrimSlots(slots); - listDelNode(asmManager->pending_trim_jobs, ln); - slotRangeArrayFree(slots); - } -} - -/* Trim keys in slots not owned by this node (if any). */ -void asmTrimSlotsIfNotOwned(slotRangeArray *slots) { - if (!server.cluster_enabled || !clusterNodeIsMaster(getMyClusterNode())) return; - - size_t num_keys = 0; - slotRangeArray *trim_slots = NULL; - for (int i = 0; i < slots->num_ranges; i++) { - for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) { - if (clusterIsMySlot(j) || - kvstoreDictSize(server.db[0].keys, j) == 0 || - isSlotInTrimJob(j)) - { - continue; - } - - trim_slots = slotRangeArrayAppend(trim_slots, j); - num_keys += kvstoreDictSize(server.db[0].keys, j); - } - } - if (!trim_slots) return; - - sds str = slotRangeArrayToString(trim_slots); - serverLog(LL_NOTICE, - "Detected keys in slots that do not belong to this node. 
" - "Scheduling trim for %zu keys in slots: %s", num_keys, str); - sdsfree(str); - - asmTrimJobSchedule(trim_slots); - slotRangeArrayFree(trim_slots); -} - -/* Handle the master task when it is no longer used, trim unowned slots if necessary. - * This function is called when the replica is just promoted to master. */ -void asmFinalizeMasterTask(void) { - if (!server.cluster_enabled) return; - - asmTask *task = asmManager->master_task; - if (task == NULL) return; - - if (task->operation == ASM_IMPORT) { - /* Check if there is an ASM task that master did not finish. */ - if (task->state != ASM_COMPLETED && task->state != ASM_FAILED) { - sds slots_str = slotRangeArrayToString(task->slots); - serverLog(LL_WARNING, "Import task %s from old master failed: slots=%s", - task->id, slots_str); - sdsfree(slots_str); - /* Mark the task as failed and notify the replicas. */ - task->state = ASM_FAILED; - asmNotifyStateChange(task, ASM_EVENT_IMPORT_FAILED); - } - - /* Trim the slots if the import task is failed. */ - if (clusterNodeIsMaster(getMyClusterNode()) && task->state == ASM_FAILED) { - asmTrimSlotsIfNotOwned(task->slots); - } - } else if (task->operation == ASM_MIGRATE) { - /* For migrate tasks, attempt to trim slots if necessary. After ASM completed, - * the previous master may not have initiated slot trimming before the failover - * occurred. In that case, we need to initiate slot trimming here. - * However, if ASM failed, slot ownership did not change, so no slot trimming - * is needed. */ - if (clusterNodeIsMaster(getMyClusterNode()) && task->state != ASM_FAILED) { - asmTrimSlotsIfNotOwned(task->slots); - } - } - - /* Clear the master task since it is not a replica anymore. */ - asmTaskFree(asmManager->master_task); - asmManager->master_task = NULL; -} - -/* The replicas handle the master import ASM task information. 
*/ -int asmReplicaHandleMasterTask(sds task_info) { - if (!server.cluster_enabled || !clusterNodeIsSlave(getMyClusterNode())) return C_ERR; - - /* If the master task is migrating, just clear it when receiving a new task info, - * even the task info is empty since it means the master finished the task. */ - if (asmManager->master_task && asmManager->master_task->operation == ASM_MIGRATE) { - asmTaskFree(asmManager->master_task); - asmManager->master_task = NULL; - } - - /* If the master task is empty, it means the master finished the task, the - * replica should check the slot ownership to decide to raise completed or - * failed event. */ - if (!task_info || sdslen(task_info) == 0) { - asmTask *task = asmManager->master_task; - if (task && task->state != ASM_COMPLETED && task->state != ASM_FAILED) { - /* Check if the slots are owned by the master. */ - int owned_by_master = 1; - for (int i = 0; i < task->slots->num_ranges; i++) { - slotRange *sr = &task->slots->ranges[i]; - for (int j = sr->start; j <= sr->end; j++) { - clusterNode *master = clusterNodeGetMaster(getMyClusterNode()); - if (!master || !clusterNodeCoversSlot(master, j)) { - owned_by_master = 0; - break; - } - } - } - if (owned_by_master) { - task->state = ASM_COMPLETED; - asmNotifyStateChange(task, ASM_EVENT_IMPORT_COMPLETED); - } else { - task->state = ASM_FAILED; - asmNotifyStateChange(task, ASM_EVENT_IMPORT_FAILED); - } - } - return C_OK; - } - - asmTask *task = asmTaskDeserialize(task_info); - if (!task) return C_ERR; - - /* For migrate task, replica just keeps the task info, doesn't notify any event. */ - if (task->operation == ASM_MIGRATE) { - if (asmManager->master_task) asmTaskFree(asmManager->master_task); - asmManager->master_task = task; - return C_OK; - } - - int notify_event = 0; - int event = asmTaskStateToEvent(task); - if (asmManager->master_task) { - /* Notify when the task or event is changed, to avoid duplicated notification. 
*/ - if (strcmp(task->id, asmManager->master_task->id) != 0 || - event != asmTaskStateToEvent(asmManager->master_task)) - { - notify_event = 1; - } - asmTaskFree(asmManager->master_task); - } else { - /* Ignore completed or failed task when there is no active master task. */ - if (task->state != ASM_FAILED && task->state != ASM_COMPLETED) - notify_event = 1; - } - - asmManager->master_task = task; - if (notify_event) asmNotifyStateChange(task, event); - return C_OK; -} - -/* Cancel all pending trim jobs. */ -void asmCancelPendingTrimJobs(void) { - if (!asmManager) return; - - listIter li; - listNode *ln; - listRewind(asmManager->pending_trim_jobs, &li); - while ((ln = listNext(&li)) != NULL) { - slotRangeArray *slots = listNodeValue(ln); - listDelNode(asmManager->pending_trim_jobs, ln); - sds str = slotRangeArrayToString(slots); - serverLog(LL_NOTICE, "Cancelling the pending trim job for slots: %s", str); - sdsfree(str); - slotRangeArrayFree(slots); - } -} - -/* Cancel all pending and active trim jobs. */ -void asmCancelTrimJobs(void) { - if (!asmManager) return; - - /* Unblock master if blocked */ - asmUnblockMasterAfterTrim(); - - /* Cancel pending trim jobs */ - asmCancelPendingTrimJobs(); - - /* Cancel active trim jobs */ - if (listLength(asmManager->active_trim_jobs) == 0) - return; - - serverLog(LL_NOTICE, "Cancelling all active trim jobs"); - asmManager->active_trim_cancelled += listLength(asmManager->active_trim_jobs); - asmActiveTrimEnd(); - listEmpty(asmManager->active_trim_jobs); -} - -/* It's used to trim slots after the migration is completed or import is failed. - * TRIMSLOTS RANGES <numranges> <start-slot> <end-slot> ... 
*/ -void trimslotsCommand(client *c) { - long numranges = 0; - - if (server.cluster_enabled == 0) { - addReplyError(c,"This instance has cluster support disabled"); - return; - } - - if (c->argc < 5) { - addReplyErrorArity(c); - return; - } - - /* Validate the ranges argument */ - if (strcasecmp(c->argv[1]->ptr, "ranges") != 0) { - addReplyError(c, "missing ranges argument"); - return; - } - - /* Get the number of ranges */ - if (getLongFromObjectOrReply(c, c->argv[2], &numranges, NULL) != C_OK) - return; - - /* Validate the number of ranges and argument count */ - if (numranges < 1 || numranges > CLUSTER_SLOTS || c->argc != 3 + numranges * 2) { - addReplyError(c, "invalid number of ranges"); - return; - } - - /* Parse the slot ranges and start trimming */ - slotRangeArray *slots = parseSlotRangesOrReply(c, c->argc, 3); - if (!slots) return; - - if (c->id == CLIENT_ID_AOF) { - serverAssert(server.loading); - /* If we are loading the AOF, we can't trigger active trim because next - * command may have an update for the same key that is supposed to be - * trimmed. We have to trim the keys synchronously. */ - clusterDelKeysInSlotRangeArray(slots, 1); - } else { - /* We cannot trim any slot served by this node. */ - if (clusterNodeIsMaster(getMyClusterNode())) { - for (int i = 0; i < slots->num_ranges; i++) { - for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) { - if (clusterCanAccessKeysInSlot(j)) { - addReplyErrorFormat(c, "the slot %d is served by this node", j); - slotRangeArrayFree(slots); - return; - } - } - } - } - asmTrimSlots(slots); - } - - /* Command will not be propagated automatically since it does not modify - * the dataset. */ - forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF); - - slotRangeArrayFree(slots); - addReply(c, shared.ok); -} - -/* Start the active trim job. 
*/ -void asmActiveTrimStart(void) { - slotRangeArray *slots = listNodeValue(listFirst(asmManager->active_trim_jobs)); - - serverAssert(asmManager->active_trim_it == NULL); - asmManager->active_trim_it = slotRangeArrayGetIterator(slots); - asmManager->active_trim_started++; - asmManager->active_trim_current_job_keys = 0; - asmManager->active_trim_current_job_trimmed = 0; - - /* Count the number of keys to trim */ - asmManager->active_trim_current_job_keys += asmCountKeysInSlots(slots); - - RedisModuleClusterSlotMigrationTrimInfoV1 fsi = { - REDISMODULE_CLUSTER_SLOT_MIGRATION_TRIMINFO_VERSION, - (RedisModuleSlotRangeArray *) slots - }; - - moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION_TRIM, - REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_STARTED, - &fsi); - - sds str = slotRangeArrayToString(slots); - serverLog(LL_NOTICE, "Active trim initiated for slots: %s, to trim %llu keys.", - str, asmManager->active_trim_current_job_keys); - sdsfree(str); -} - -/* Schedule an active trim job. */ -void asmTriggerActiveTrim(slotRangeArray *slots) { - listAddNodeTail(asmManager->active_trim_jobs, slotRangeArrayDup(slots)); - sds str = slotRangeArrayToString(slots); - serverLog(LL_NOTICE, "Active trim scheduled for slots: %s", str); - sdsfree(str); - - /* Start an active trim job if no active trim job is running. */ - if (asmManager->active_trim_it == NULL) { - serverAssert(listLength(asmManager->active_trim_jobs) > 0); - asmActiveTrimStart(); - } -} - -/* End the active trim job. 
*/ -void asmActiveTrimEnd(void) { - slotRangeArray *slots = listNodeValue(listFirst(asmManager->active_trim_jobs)); - - if (asmManager->active_trim_it) { - slotRangeArrayIteratorFree(asmManager->active_trim_it); - asmManager->active_trim_it = NULL; - } - - /* Unblock the master if it is blocked */ - asmUnblockMasterAfterTrim(); - - RedisModuleClusterSlotMigrationTrimInfoV1 fsi = { - REDISMODULE_CLUSTER_SLOT_MIGRATION_TRIMINFO_VERSION, - (RedisModuleSlotRangeArray *) slots - }; - - moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION_TRIM, - REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_COMPLETED, - &fsi); - - sds str = slotRangeArrayToString(slots); - serverLog(LL_NOTICE, "Active trim completed for slots: %s, %llu keys trimmed.", - str, asmManager->active_trim_current_job_trimmed); - sdsfree(str); - listDelNode(asmManager->active_trim_jobs, listFirst(asmManager->active_trim_jobs)); - asmManager->active_trim_completed++; -} - -/* Check if the slot range array overlaps with any trim job. */ -int asmIsAnyTrimJobOverlaps(slotRangeArray *slots) { - if (!asmIsTrimInProgress()) return 0; - for (int i = 0; i < slots->num_ranges; i++) { - for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) { - if (isSlotInTrimJob(j)) return 1; - } - } - return 0; -} - -/* Check if there is any trim job in progress. */ -int asmIsTrimInProgress(void) { - if (!server.cluster_enabled) return 0; - return (listLength(asmManager->active_trim_jobs) != 0 || - listLength(asmManager->pending_trim_jobs) != 0); -} - - -/* Check if the command is accessing keys in a slot being trimmed. - * Return the slot if found, otherwise return -1. 
*/ -int asmGetTrimmingSlotForCommand(struct redisCommand *cmd, robj **argv, int argc) { - if (!asmIsTrimInProgress()) return -1; - - /* Get the keys from the command */ - getKeysResult result = GETKEYS_RESULT_INIT; - int numkeys = getKeysFromCommand(cmd, argv, argc, &result); - - int last_checked_slot = -1; - for (int j = 0; j < numkeys; j++) { - robj *key = argv[result.keys[j].pos]; - int slot = keyHashSlot((char*) key->ptr, sdslen(key->ptr)); - if (slot == last_checked_slot) continue; - if (isSlotInTrimJob(slot)) { - getKeysFreeResult(&result); - return slot; - } - last_checked_slot = slot; - } - getKeysFreeResult(&result); - return -1; -} - -/* Delete the key and notify the modules. */ -void asmActiveTrimDeleteKey(redisDb *db, robj *keyobj) { - if (asmManager->debug_active_trim_delay > 0) - debugDelay(asmManager->debug_active_trim_delay); - - /* The key needs to be converted from static to heap before deletion. */ - int static_key = keyobj->refcount == OBJ_STATIC_REFCOUNT; - if (static_key) keyobj = createStringObject(keyobj->ptr, sdslen(keyobj->ptr)); - - dbDelete(db, keyobj); - keyModified(NULL, db, keyobj, NULL, 1); - /* The keys are not actually logically deleted from the database, just moved - * to another node. The modules need to know that these keys are no longer - * available locally, so just send the keyspace notification to the modules, - * but not to clients. */ - moduleNotifyKeyspaceEvent(NOTIFY_KEY_TRIMMED, "key_trimmed", keyobj, db->id); - asmManager->active_trim_current_job_trimmed++; - - if (static_key) decrRefCount(keyobj); -} - -/* Trim keys in the active trim job. */ -void asmActiveTrimCycle(void) { - if (asmManager->debug_active_trim_delay < 0 || - listLength(asmManager->active_trim_jobs) == 0) - { - return; - } - - /* Verify client pause is not in effect and trim is not disabled by module, - * so we can delete keys. 
*/ - static int blocked = 0; - int disabled_by_module = server.cluster_module_trim_disablers > 0; - if (isPausedActions(PAUSE_ACTION_CLIENT_ALL) || - isPausedActions(PAUSE_ACTION_CLIENT_WRITE) || - disabled_by_module) - { - if (blocked == 0) { - blocked = 1; - const char *reason = disabled_by_module ? "trim is disabled by module" : - "pause action is in effect"; - serverLog(LL_NOTICE, "Active trim cycle is blocked since %s.", reason); - } - return; - } - if (blocked) serverLog(LL_NOTICE, "Active trim cycle is unblocked."); - blocked = 0; - - /* This works in a similar way to activeExpireCycle, in the sense that - * we do incremental work across calls. */ - const int trim_cycle_time_perc = 25; - int time_exceeded = 0; - long long start = ustime(), timelimit; - unsigned long long num_deleted = 0; - - /* Calculate the time limit in microseconds for this cycle. */ - timelimit = 1000000 * trim_cycle_time_perc / server.hz / 100; - if (timelimit <= 0) timelimit = 1; - - serverAssert(asmManager->active_trim_it); - int slot = slotRangeArrayGetCurrentSlot(asmManager->active_trim_it); - - while (!time_exceeded && slot != -1) { - dictEntry *de; - kvstoreDictIterator kvs_di; - kvstoreInitDictSafeIterator(&kvs_di, server.db[0].keys, slot); - while ((de = kvstoreDictIteratorNext(&kvs_di)) != NULL) { - kvobj *kv = dictGetKV(de); - sds sdskey = kvobjGetKey(kv); - - enterExecutionUnit(1, 0); - robj *keyobj = createStringObject(sdskey, sdslen(sdskey)); - asmActiveTrimDeleteKey(&server.db[0], keyobj); - decrRefCount(keyobj); - exitExecutionUnit(); - postExecutionUnitOperations(); - num_deleted++; - - /* Once in 32 deletions check if we reached the time limit. 
*/ - if (num_deleted % 32 == 0 && (ustime() - start) > timelimit) { - time_exceeded = 1; - break; - } - } - kvstoreResetDictIterator(&kvs_di); - if (!time_exceeded) slot = slotRangeArrayNext(asmManager->active_trim_it); - } - - if (slot == -1) { -#if defined(USE_JEMALLOC) - jemalloc_purge(); -#endif - asmActiveTrimEnd(); - - /* Immediately start the next trim job upon completion of the current - * one. Eliminates gaps in notifications so modules are informed about - * trimming unowned keys, which is important for modules that - * continuously filter unowned keys from their replies. */ - if (listLength(asmManager->active_trim_jobs) != 0) - asmActiveTrimStart(); - } -} - -/* Check if the key in a trim job. */ -int asmIsKeyInTrimJob(sds keyname) { - if (!asmIsTrimInProgress() || !isSlotInTrimJob(getKeySlot(keyname))) - return 0; - return 1; -} - -/* Modules can use RM_ClusterPropagateForSlotMigration() during the - * CLUSTER_SLOT_MIGRATION_MIGRATE_MODULE_PROPAGATE event to propagate commands - * that should be delivered just before the slot snapshot delivery starts. */ -int asmModulePropagateBeforeSlotSnapshot(struct redisCommand *cmd, robj **argv, int argc) { - /* This API is only called in the fork child. */ - if (server.cluster_enabled == 0 || - server.in_fork_child != CHILD_TYPE_RDB || - listLength(asmManager->tasks) == 0) - { - errno = EBADF; - return C_ERR; - } - - /* Check if the task state is right. */ - asmTask *task = listNodeValue(listFirst(asmManager->tasks)); - if (task->operation != ASM_MIGRATE || - task->state != ASM_SEND_BULK_AND_STREAM || - task->pre_snapshot_module_cmds == NULL) - { - errno = EBADF; - return C_ERR; - } - - /* Ensure all arguments are converted to string encoding if necessary, - * since getSlotFromCommand expects them to be string-encoded. 
*/ - for (int i = 0; i < argc; i++) { - if (!sdsEncodedObject(argv[i])) { - serverAssert(argv[i]->encoding == OBJ_ENCODING_INT); - robj *old = argv[i]; - argv[i] = createStringObjectFromLongLongWithSds((long)old->ptr); - decrRefCount(old); - } - } - - /* Crossslot commands are not allowed */ - int slot = getSlotFromCommand(cmd, argv, argc); - if (slot == CLUSTER_CROSSSLOT) { - errno = ENOTSUP; - return C_ERR; - } - - /* Allow no-keys commands or if keys are in the slot range. */ - slotRange sr = {slot, slot}; - if (slot != INVALID_CLUSTER_SLOT && !slotRangeArrayOverlaps(task->slots, &sr)) { - errno = ERANGE; - return C_ERR; - } - - robj **argvcopy = zmalloc(sizeof(robj*) * argc); - for (int i = 0; i < argc; i++) { - argvcopy[i] = argv[i]; - incrRefCount(argv[i]); - } - - redisOpArrayAppend(task->pre_snapshot_module_cmds, 0, argvcopy, argc, 0); - return C_OK; -} |
