summaryrefslogtreecommitdiff
path: root/examples/redis-unstable/src/cluster_asm.c
diff options
context:
space:
mode:
authorMitja Felicijan <mitja.felicijan@gmail.com>2026-01-21 22:52:54 +0100
committerMitja Felicijan <mitja.felicijan@gmail.com>2026-01-21 22:52:54 +0100
commitdcacc00e3750300617ba6e16eb346713f91a783a (patch)
tree38e2d4fb5ed9d119711d4295c6eda4b014af73fd /examples/redis-unstable/src/cluster_asm.c
parent58dac10aeb8f5a041c46bddbeaf4c7966a99b998 (diff)
downloadcrep-dcacc00e3750300617ba6e16eb346713f91a783a.tar.gz
Remove testing data
Diffstat (limited to 'examples/redis-unstable/src/cluster_asm.c')
-rw-r--r--examples/redis-unstable/src/cluster_asm.c3602
1 files changed, 0 insertions, 3602 deletions
diff --git a/examples/redis-unstable/src/cluster_asm.c b/examples/redis-unstable/src/cluster_asm.c
deleted file mode 100644
index a090453..0000000
--- a/examples/redis-unstable/src/cluster_asm.c
+++ /dev/null
@@ -1,3602 +0,0 @@
-/* cluster_asm.c -- Atomic slot migration implementation for cluster
- *
- * Copyright (c) 2025-Present, Redis Ltd.
- * All rights reserved.
- *
- * Licensed under your choice of (a) the Redis Source Available License 2.0
- * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
- * GNU Affero General Public License v3 (AGPLv3).
- */
-
-#include "server.h"
-#include "cluster.h"
-#include "functions.h"
-#include "cluster_asm.h"
-#include "cluster_slot_stats.h"
-
-#define ASM_IMPORT (1 << 1)
-#define ASM_MIGRATE (1 << 2)
-
-#define ASM_DEBUG_TRIM_DEFAULT 0
-#define ASM_DEBUG_TRIM_NONE 1
-#define ASM_DEBUG_TRIM_BG 2
-#define ASM_DEBUG_TRIM_ACTIVE 3
-
-#define ASM_AOF_MIN_ITEMS_PER_KEY 512 /* Minimum number of items per key to use AOF format encoding */
-
-typedef struct asmTask {
- sds id; /* Task ID */
- int operation; /* Either ASM_IMPORT or ASM_MIGRATE */
- slotRangeArray *slots; /* List of slot ranges for this migration task */
- int state; /* Current state of the task */
- int dest_state; /* Destination node's main state (approximate) */
- char source[CLUSTER_NAMELEN]; /* Source node name */
- char dest[CLUSTER_NAMELEN]; /* Destination node name */
- clusterNode *source_node; /* Source node */
- connection *main_channel_conn; /* Main channel connection */
- connection *rdb_channel_conn; /* RDB channel connection */
- int rdb_channel_state; /* State of the RDB channel */
- unsigned long long dest_offset; /* Destination offset */
- unsigned long long source_offset; /* Source offset */
- int cross_slot_during_propagating; /* If cross-slot commands are encountered during propagating */
- int stream_eof_during_streaming; /* If STREAM-EOF is received during streaming buffer */
- replDataBuf sync_buffer; /* Buffer for the stream */
- client *main_channel_client; /* Client for the main channel on the source side */
- client *rdb_channel_client; /* Client for the RDB channel on the source side */
- long long retry_count; /* Number of retries for this task */
- mstime_t create_time; /* Task creation time */
- mstime_t start_time; /* Task start time */
- mstime_t end_time; /* Task end time */
- mstime_t paused_time; /* The time when the slot writes were paused */
- mstime_t dest_slots_snapshot_time; /* The time when the destination starts applying the slot snapshot */
- mstime_t dest_accum_applied_time; /* The time when the destination finishes applying the accumulated buffer */
- sds error; /* Error message for this task */
- redisOpArray *pre_snapshot_module_cmds; /* Module commands to be propagated at the beginning of slot migration */
-} asmTask;
-
-struct asmManager {
- list *tasks; /* List of asmTask to be processed */
- list *archived_tasks; /* List of archived asmTask */
- list *pending_trim_jobs; /* List of pending trim jobs (due to write pause) */
- list *active_trim_jobs; /* List of active trim jobs */
- slotRangeArrayIter *active_trim_it; /* Iterator of the current active trim job */
- size_t sync_buffer_peak; /* Peak size of sync buffer */
- asmTask *master_task; /* The task that is currently active on the master */
-
- /* Fail point injection for debugging */
- int debug_fail_channel; /* Channel where the task will fail */
- int debug_fail_state; /* State where the task will fail */
- int debug_trim_method; /* Method to trim the buffer */
- int debug_active_trim_delay; /* Sleep before trimming each key */
-
- /* Active trim stats */
- unsigned long long active_trim_started; /* Number of times active trim was started */
- unsigned long long active_trim_completed; /* Number of times active trim was completed */
- unsigned long long active_trim_cancelled; /* Number of times active trim was cancelled */
- unsigned long long active_trim_current_job_keys; /* Total number of keys to trim in the current job */
- unsigned long long active_trim_current_job_trimmed; /* Number of keys trimmed in the current job */
-};
-
-enum asmState {
- /* Common state */
- ASM_NONE = 0,
- ASM_CONNECTING,
- ASM_AUTH_REPLY,
- ASM_CANCELED,
- ASM_FAILED,
- ASM_COMPLETED,
-
- /* Import state */
- ASM_SEND_HANDSHAKE,
- ASM_HANDSHAKE_REPLY,
- ASM_SEND_SYNCSLOTS,
- ASM_SYNCSLOTS_REPLY,
- ASM_INIT_RDBCHANNEL,
- ASM_ACCUMULATE_BUF,
- ASM_READY_TO_STREAM,
- ASM_STREAMING_BUF,
- ASM_WAIT_STREAM_EOF,
- ASM_TAKEOVER,
-
- /* Migrate state */
- ASM_WAIT_RDBCHANNEL,
- ASM_WAIT_BGSAVE_START,
- ASM_SEND_BULK_AND_STREAM,
- ASM_SEND_STREAM,
- ASM_HANDOFF_PREP,
- ASM_HANDOFF,
- ASM_STREAM_EOF,
-
- /* RDB channel state */
- ASM_RDBCHANNEL_REQUEST,
- ASM_RDBCHANNEL_REPLY,
- ASM_RDBCHANNEL_TRANSFER,
-};
-
-enum asmChannel {
- ASM_IMPORT_MAIN_CHANNEL = 1, /* Main channel for the import task */
- ASM_IMPORT_RDB_CHANNEL, /* RDB channel for the import task */
- ASM_MIGRATE_MAIN_CHANNEL, /* Main channel for the migrate task */
- ASM_MIGRATE_RDB_CHANNEL /* RDB channel for the migrate task */
-};
-
-/* Global ASM manager */
-struct asmManager *asmManager = NULL;
-
-/* replication.c */
-char *sendCommand(connection *conn, ...);
-char *sendCommandArgv(connection *conn, int argc, char **argv, size_t *argv_lens);
-char *receiveSynchronousResponse(connection *conn);
-ConnectionType *connTypeOfReplication(void);
-int startBgsaveForReplication(int mincapa, int req);
-void createReplicationBacklogIfNeeded(void);
-/* cluster.c */
-void createDumpPayload(rio *payload, robj *o, robj *key, int dbid, int skip_checksum);
-/* cluster_asm.c */
-static void asmStartImportTask(asmTask *task);
-static void asmTaskCancel(asmTask *task, const char *reason);
-static void asmSyncBufferReadFromConn(connection *conn);
-static void propagateTrimSlots(slotRangeArray *slots);
-void asmTrimJobSchedule(slotRangeArray *slots);
-void asmTrimJobProcessPending(void);
-void asmCancelPendingTrimJobs(void);
-void asmTriggerActiveTrim(slotRangeArray *slots);
-void asmActiveTrimEnd(void);
-int asmIsAnyTrimJobOverlaps(slotRangeArray *slots);
-void asmTrimSlotsIfNotOwned(slotRangeArray *slots);
-void asmNotifyStateChange(asmTask *task, int event);
-
-void asmInit(void) {
- asmManager = zcalloc(sizeof(*asmManager));
- asmManager->tasks = listCreate();
- asmManager->archived_tasks = listCreate();
- asmManager->pending_trim_jobs = listCreate();
- asmManager->sync_buffer_peak = 0;
- asmManager->master_task = NULL;
- asmManager->debug_fail_channel = -1;
- asmManager->debug_fail_state = -1;
- asmManager->debug_trim_method = ASM_DEBUG_TRIM_DEFAULT;
- asmManager->debug_active_trim_delay = 0;
- asmManager->active_trim_jobs = listCreate();
- asmManager->active_trim_started = 0;
- asmManager->active_trim_completed = 0;
- asmManager->active_trim_cancelled = 0;
- listSetFreeMethod(asmManager->active_trim_jobs, slotRangeArrayFreeGeneric);
-}
-
-char *asmTaskStateToString(int state) {
- switch (state) {
- case ASM_NONE: return "none";
- case ASM_CONNECTING: return "connecting";
- case ASM_AUTH_REPLY: return "auth-reply";
- case ASM_CANCELED: return "canceled";
- case ASM_FAILED: return "failed";
- case ASM_COMPLETED: return "completed";
-
- /* Import state */
- case ASM_SEND_HANDSHAKE: return "send-handshake";
- case ASM_HANDSHAKE_REPLY: return "handshake-reply";
- case ASM_SEND_SYNCSLOTS: return "send-syncslots";
- case ASM_SYNCSLOTS_REPLY: return "syncslots-reply";
- case ASM_INIT_RDBCHANNEL: return "init-rdbchannel";
- case ASM_ACCUMULATE_BUF: return "accumulate-buffer";
- case ASM_READY_TO_STREAM: return "ready-to-stream";
- case ASM_STREAMING_BUF: return "streaming-buffer";
- case ASM_WAIT_STREAM_EOF: return "wait-stream-eof";
- case ASM_TAKEOVER: return "takeover";
-
- /* Migrate state */
- case ASM_WAIT_RDBCHANNEL: return "wait-rdbchannel";
- case ASM_WAIT_BGSAVE_START: return "wait-bgsave-start";
- case ASM_SEND_BULK_AND_STREAM: return "send-bulk-and-stream";
- case ASM_SEND_STREAM: return "send-stream";
- case ASM_HANDOFF_PREP: return "handoff-prep";
- case ASM_HANDOFF: return "handoff";
- case ASM_STREAM_EOF: return "stream-eof";
-
- /* RDB channel state */
- case ASM_RDBCHANNEL_REQUEST: return "rdbchannel-request";
- case ASM_RDBCHANNEL_REPLY: return "rdbchannel-reply";
- case ASM_RDBCHANNEL_TRANSFER: return "rdbchannel-transfer";
-
- default: return "unknown";
- }
- serverAssert(0); /* Unreachable */
-}
-
-const char *asmChannelToString(int channel) {
- switch (channel) {
- case ASM_IMPORT_MAIN_CHANNEL: return "import-main-channel";
- case ASM_IMPORT_RDB_CHANNEL: return "import-rdb-channel";
- case ASM_MIGRATE_MAIN_CHANNEL: return "migrate-main-channel";
- case ASM_MIGRATE_RDB_CHANNEL: return "migrate-rdb-channel";
- default: return "unknown";
- }
-}
-
-int asmDebugSetFailPoint(char *channel, char *state) {
- if (!asmManager) {
- serverLog(LL_WARNING, "ASM manager is not initialized");
- return C_ERR;
- }
- asmManager->debug_fail_channel = -1;
- asmManager->debug_fail_state = -1;
- if (!channel && !state) return C_ERR;
- if (sdslen(channel) == 0 && sdslen(state) == 0) {
- serverLog(LL_WARNING, "ASM fail point is cleared");
- return C_OK;
- }
-
- for (int i = ASM_IMPORT_MAIN_CHANNEL; i <= ASM_MIGRATE_RDB_CHANNEL; i++) {
- if (!strcasecmp(channel, asmChannelToString(i))) {
- asmManager->debug_fail_channel = i;
- break;
- }
- }
- if (asmManager->debug_fail_channel == -1) return C_ERR;
-
- for (int i = ASM_NONE; i <= ASM_RDBCHANNEL_TRANSFER; i++) {
- if (!strcasecmp(state, asmTaskStateToString(i))) {
- asmManager->debug_fail_state = i;
- break;
- }
- }
- if (asmManager->debug_fail_state == -1) return C_ERR;
-
- serverLog(LL_NOTICE, "ASM fail point set: channel=%s, state=%s", channel, state);
- return C_OK;
-}
-
-int asmDebugSetTrimMethod(const char *method, int active_trim_delay) {
- if (!asmManager) {
- serverLog(LL_WARNING, "ASM manager is not initialized");
- return C_ERR;
- }
- int prev = asmManager->debug_trim_method;
- if (!strcasecmp(method, "default")) asmManager->debug_trim_method = ASM_DEBUG_TRIM_DEFAULT;
- else if (!strcasecmp(method, "none")) asmManager->debug_trim_method = ASM_DEBUG_TRIM_NONE;
- else if (!strcasecmp(method, "bg")) asmManager->debug_trim_method = ASM_DEBUG_TRIM_BG;
- else if (!strcasecmp(method, "active")) asmManager->debug_trim_method = ASM_DEBUG_TRIM_ACTIVE;
- else return C_ERR;
-
- /* If we are switching from none to default, delete all the keys in the
- * slots we don't own */
- if (prev == ASM_DEBUG_TRIM_NONE && asmManager->debug_trim_method != ASM_DEBUG_TRIM_NONE) {
- for (int i = 0; i < CLUSTER_SLOTS; i++)
- if (!clusterIsMySlot(i))
- clusterDelKeysInSlot(i, 0);
- }
- asmManager->debug_active_trim_delay = active_trim_delay;
- serverLog(LL_NOTICE, "ASM trim method was set=%s, active_trim_delay=%d", method, active_trim_delay);
- return C_OK;
-}
-
-int asmDebugIsFailPointActive(int channel, int state) {
- if (!asmManager) return 0; /* ASM manager not initialized */
- if (asmManager->debug_fail_channel == channel && asmManager->debug_fail_state == state) {
- serverLog(LL_NOTICE, "ASM fail point active: channel=%s, state=%s",
- asmChannelToString(channel), asmTaskStateToString(state));
- return 1;
- }
- return 0;
-}
-
-sds asmCatInfoString(sds info) {
- int active_tasks = 0;
-
- listIter li;
- listNode *ln;
- listRewind(asmManager->tasks, &li);
- while ((ln = listNext(&li)) != NULL) {
- asmTask *task = listNodeValue(ln);
- if (task->operation == ASM_IMPORT ||
- (task->operation == ASM_MIGRATE && task->state != ASM_FAILED))
- {
- active_tasks++;
- }
- }
-
- return sdscatprintf(info ? info : sdsempty(),
- "cluster_slot_migration_active_tasks:%d\r\n"
- "cluster_slot_migration_active_trim_running:%lu\r\n"
- "cluster_slot_migration_active_trim_current_job_keys:%llu\r\n"
- "cluster_slot_migration_active_trim_current_job_trimmed:%llu\r\n"
- "cluster_slot_migration_stats_active_trim_started:%llu\r\n"
- "cluster_slot_migration_stats_active_trim_completed:%llu\r\n"
- "cluster_slot_migration_stats_active_trim_cancelled:%llu\r\n",
- active_tasks,
- listLength(asmManager->active_trim_jobs),
- asmManager->active_trim_current_job_keys,
- asmManager->active_trim_current_job_trimmed,
- asmManager->active_trim_started,
- asmManager->active_trim_completed,
- asmManager->active_trim_cancelled);
-}
-
-void asmTaskReset(asmTask *task) {
- task->state = ASM_NONE;
- task->dest_state = ASM_NONE;
- task->rdb_channel_state = ASM_NONE;
- task->main_channel_conn = NULL;
- task->rdb_channel_conn = NULL;
- task->dest_offset = 0;
- task->source_offset = 0;
- task->stream_eof_during_streaming = 0;
- task->cross_slot_during_propagating = 0;
- replDataBufInit(&task->sync_buffer);
- task->main_channel_client = NULL;
- task->rdb_channel_client = NULL;
- task->paused_time = 0;
- task->dest_slots_snapshot_time = 0;
- task->dest_accum_applied_time = 0;
- task->pre_snapshot_module_cmds = NULL;
-}
-
-asmTask *asmTaskCreate(const char *task_id) {
- asmTask *task = zcalloc(sizeof(*task));
- task->error = sdsempty();
- asmTaskReset(task);
- task->slots = NULL;
- task->source_node = NULL;
- task->retry_count = 0;
- task->create_time = server.mstime;
- task->start_time = -1;
- task->end_time = -1;
- if (task_id) {
- task->id = sdsnew(task_id);
- } else {
- task->id = sdsnewlen(NULL, CLUSTER_NAMELEN);
- getRandomHexChars(task->id, CLUSTER_NAMELEN);
- }
-
- return task;
-}
-
-void asmTaskFree(asmTask *task) {
- replDataBufClear(&task->sync_buffer);
- sdsfree(task->id);
- slotRangeArrayFree(task->slots);
- sdsfree(task->error);
- zfree(task);
-}
-
-/* Convert the task state to the corresponding event. */
-int asmTaskStateToEvent(asmTask *task) {
- if (task->operation == ASM_IMPORT) {
- if (task->state == ASM_COMPLETED) return ASM_EVENT_IMPORT_COMPLETED;
- else if (task->state == ASM_FAILED) return ASM_EVENT_IMPORT_FAILED;
- else return ASM_EVENT_IMPORT_STARTED;
- } else {
- if (task->state == ASM_COMPLETED) return ASM_EVENT_MIGRATE_COMPLETED;
- else if (task->state == ASM_FAILED) return ASM_EVENT_MIGRATE_FAILED;
- else return ASM_EVENT_MIGRATE_STARTED;
- }
-}
-
-/* Serialize ASM task information into a string for transmission to replicas.
- * Format: "task_id:source_node:dest_node:operation:state:slot_ranges"
- * Where slot_ranges is in the format "1000-2000 3000-4000 ..." */
-sds asmTaskSerialize(asmTask *task) {
- sds serialized = sdsempty();
-
- /* Add task ID */
- serialized = sdscatprintf(serialized, "%s:", task->id);
-
- /* Add source node ID (40 chars) */
- serialized = sdscatlen(serialized, task->source, CLUSTER_NAMELEN);
- serialized = sdscat(serialized, ":");
-
- /* Add destination node ID (40 chars) */
- serialized = sdscatlen(serialized, task->dest, CLUSTER_NAMELEN);
- serialized = sdscat(serialized, ":");
-
- /* Add operation type */
- serialized = sdscatprintf(serialized, "%s:", task->operation == ASM_IMPORT ?
- "import" : "migrate");
-
- /* Add current state */
- serialized = sdscatprintf(serialized, "%s:", asmTaskStateToString(task->state));
-
- /* Add slot ranges sds */
- sds slots_str = slotRangeArrayToString(task->slots);
- serialized = sdscatprintf(serialized, "%s", slots_str);
- sdsfree(slots_str);
-
- return serialized;
-}
-
-/* Deserialize ASM task information from a string and create a complete asmTask.
- * Format: "task_id:source_node:dest_node:operation:state:slot_ranges"
- * Returns a new asmTask on success, NULL on failure. */
-asmTask *asmTaskDeserialize(sds data) {
- int count, idx = 0;
- asmTask *task = NULL;
- if (!data || sdslen(data) == 0) return NULL;
-
- sds *parts = sdssplitlen(data, sdslen(data), ":", 1, &count);
- if (count < 6) goto err;
-
- /* Parse task ID */
- if (sdslen(parts[idx]) == 0) goto err;
- task = asmTaskCreate(parts[idx]);
- if (!task) goto err;
- idx++;
-
- /* Parse source node ID */
- if (sdslen(parts[idx]) != CLUSTER_NAMELEN) goto err;
- memcpy(task->source, parts[idx], CLUSTER_NAMELEN);
- idx++;
-
- /* Parse destination node ID */
- if (sdslen(parts[idx]) != CLUSTER_NAMELEN) goto err;
- memcpy(task->dest, parts[idx], CLUSTER_NAMELEN);
- idx++;
-
- /* Parse operation type */
- if (!strcasecmp(parts[idx], "import")) {
- task->operation = ASM_IMPORT;
- } else if (!strcasecmp(parts[idx], "migrate")) {
- task->operation = ASM_MIGRATE;
- } else {
- goto err;
- }
- idx++;
-
- /* Parse state */
- task->state = ASM_NONE; /* Default state */
- for (int state = ASM_NONE; state <= ASM_RDBCHANNEL_TRANSFER; state++) {
- if (!strcasecmp(parts[idx], asmTaskStateToString(state))) {
- task->state = state;
- break;
- }
- }
- idx++;
-
- /* Parse slot ranges */
- task->slots = slotRangeArrayFromString(parts[idx]);
- if (!task->slots) goto err;
- idx++;
-
- /* Ignore any extra fields for future compatibility */
-
- sdsfreesplitres(parts, count);
- return task;
-
-err:
- if (task) asmTaskFree(task);
- sdsfreesplitres(parts, count);
- return NULL;
-}
-
-/* Notify replicas about ASM task information to maintain consistency during
- * slot migration. This function sends a CLUSTER SYNCSLOTS CONF ASM-TASK command
- * to all connected replicas with the serialized task information. */
-void asmNotifyReplicasStateChange(struct asmTask *task) {
- if (!server.cluster_enabled || !clusterNodeIsMaster(getMyClusterNode())) return;
-
- /* Create command arguments for CLUSTER SYNCSLOTS CONF ASM-TASK */
- robj *argv[5];
- argv[0] = createStringObject("CLUSTER", 7);
- argv[1] = createStringObject("SYNCSLOTS", 9);
- argv[2] = createStringObject("CONF", 4);
- argv[3] = createStringObject("ASM-TASK", 8);
- argv[4] = createObject(OBJ_STRING, asmTaskSerialize(task));
-
- /* Send the command to all replicas */
- replicationFeedSlaves(server.slaves, -1, argv, 5);
-
- /* Clean up command objects */
- for (int i = 0; i < 5; i++) {
- decrRefCount(argv[i]);
- }
-}
-
-/* Dump the active import ASM task information. */
-sds asmDumpActiveImportTask(void) {
- if (!server.cluster_enabled) return NULL;
-
- /* For replica, dump the master active task. */
- if (clusterNodeIsSlave(getMyClusterNode()) &&
- asmManager->master_task &&
- asmManager->master_task->state != ASM_FAILED &&
- asmManager->master_task->state != ASM_COMPLETED)
- {
- return asmTaskSerialize(asmManager->master_task);
- }
-
- /* For master, dump the first active task. */
- if (!asmManager || listLength(asmManager->tasks) == 0) return NULL;
- asmTask *task = listNodeValue(listFirst(asmManager->tasks));
- if (task->state == ASM_NONE || task->state == ASM_FAILED ||
- task->state == ASM_COMPLETED) return NULL;
-
- return asmTaskSerialize(task);
-}
-
-size_t asmGetPeakSyncBufferSize(void) {
- if (!asmManager) return 0;
- /* Compute peak sync buffer usage. The current task's peak may not
- * reflect in asmManager->sync_buffer_peak immediately. */
- size_t peak = asmManager->sync_buffer_peak;
- asmTask *task = listFirst(asmManager->tasks) ?
- listNodeValue(listFirst(asmManager->tasks)) : NULL;
- if (task && task->operation == ASM_IMPORT)
- peak = max(task->sync_buffer.peak, asmManager->sync_buffer_peak);
-
- return peak;
-}
-
-size_t asmGetImportInputBufferSize(void) {
- if (!asmManager || listLength(asmManager->tasks) == 0) return 0;
-
- asmTask *task = listNodeValue(listFirst(asmManager->tasks));
- if (task->operation == ASM_IMPORT)
- return task->sync_buffer.mem_used;
-
- return 0;
-}
-
-size_t asmGetMigrateOutputBufferSize(void) {
- if (!asmManager || listLength(asmManager->tasks) == 0) return 0;
-
- asmTask *task = listNodeValue(listFirst(asmManager->tasks));
- if (task->operation == ASM_MIGRATE && task->main_channel_client)
- return getClientOutputBufferMemoryUsage(task->main_channel_client);
-
- return 0;
-}
-
-/* Returns the ASM task with the given ID, or NULL if no such task exists. */
-static asmTask *asmLookupTaskAt(list *tasks, const char *id) {
- listIter li;
- listNode *ln;
-
- listRewind(tasks, &li);
- while ((ln = listNext(&li)) != NULL) {
- asmTask *task = listNodeValue(ln);
- if (!strcmp(task->id, id)) return task;
- }
- return NULL;
-}
-
-/* Returns the ASM task with the given ID, or NULL if no such task exists. */
-asmTask *asmLookupTaskById(const char *id) {
- return asmLookupTaskAt(asmManager->tasks, id);
-}
-
-/* Returns the ASM task that is identical to the given slot range array, or NULL
- * if no such task exists. */
-asmTask *asmLookupTaskBySlotRangeArray(slotRangeArray *slots) {
- listIter li;
- listNode *ln;
-
- listRewind(asmManager->tasks, &li);
- while ((ln = listNext(&li)) != NULL) {
- asmTask *task = listNodeValue(ln);
- if (slotRangeArrayIsEqual(task->slots, slots))
- return task;
- }
- return NULL;
-}
-
-/* Returns the slot range array for the given task ID */
-slotRangeArray *asmTaskGetSlotRanges(const char *task_id) {
- asmTask *task = NULL;
- if (!task_id || (task = asmLookupTaskById(task_id)) == NULL) return NULL;
-
- return task->slots;
-}
-
-/* Returns 1 if the slot range array overlaps with the given slot range. */
-static int slotRangeArrayOverlaps(slotRangeArray *slots, slotRange *req) {
- for (int i = 0; i < slots->num_ranges; i++) {
- slotRange *sr = &slots->ranges[i];
- if (sr->start <= req->end && sr->end >= req->start)
- return 1;
- }
- return 0;
-}
-
-/* Returns 1 if the two slot range arrays overlap, 0 otherwise. */
-static int slotRangeArraysOverlap(slotRangeArray *slots1, slotRangeArray *slots2) {
- for (int i = 0; i < slots1->num_ranges; i++) {
- slotRange *sr1 = &slots1->ranges[i];
- if (slotRangeArrayOverlaps(slots2, sr1)) return 1;
- }
- return 0;
-}
-
-/* Returns the ASM task that overlaps with the given slot range, or NULL if
- * no such task exists. */
-static asmTask *lookupAsmTaskBySlotRange(slotRange *req) {
- listIter li;
- listNode *ln;
-
- listRewind(asmManager->tasks, &li);
- while ((ln = listNext(&li)) != NULL) {
- asmTask *task = listNodeValue(ln);
- if (slotRangeArrayOverlaps(task->slots, req))
- return task;
- }
- return NULL;
-}
-
-/* Validates the given slot ranges for a migration task:
- * - Ensures the current node is a master.
- * - Verifies all slots are in a STABLE state.
- * - Confirms all slots belong to a single source node.
- * - Confirms no ongoing import task that overlaps with the slot ranges.
- *
- * Returns the source node if validation succeeds.
- * Otherwise, returns NULL and sets 'err' variable. */
-static clusterNode *validateImportSlotRanges(slotRangeArray *slots, sds *err, asmTask *current) {
- clusterNode *source = NULL;
-
- *err = NULL;
-
- /* Ensure this is a master node */
- if (!clusterNodeIsMaster(getMyClusterNode())) {
- *err = sdsnew("slot migration not allowed on replica.");
- goto out;
- }
-
- /* Ensure no manual migration is in progress. */
- for (int i = 0; i < CLUSTER_SLOTS; i++) {
- if (getImportingSlotSource(i) != NULL ||
- getMigratingSlotDest(i) != NULL)
- {
- *err = sdsnew("all slot states must be STABLE to start a slot migration task.");
- goto out;
- }
- }
-
- for (int i = 0; i < slots->num_ranges; i++) {
- slotRange *sr = &slots->ranges[i];
-
- /* Ensure no import task overlaps with this slot range.
- * Skip check current task that is running for this slot range. */
- asmTask *task = lookupAsmTaskBySlotRange(sr);
- if (task && task != current && task->operation == ASM_IMPORT) {
- *err = sdscatprintf(sdsempty(),
- "overlapping import exists for slot range: %d-%d",
- sr->start, sr->end);
- goto out;
- }
-
- /* Validate if we can start migration task for this slot range. */
- for (int j = sr->start; j <= sr->end; j++) {
- clusterNode *node = getNodeBySlot(j);
- if (node == NULL) {
- *err = sdscatprintf(sdsempty(), "slot has no owner: %d", j);
- goto out;
- }
-
- if (!source) {
- source = node;
- } else if (source != node) {
- *err = sdsnew("slots belong to different source nodes");
- goto out;
- }
- }
- }
-
-out:
- return *err ? NULL : source;
-}
-
-/* Returns 1 if a task with the specified operation is in progress, 0 otherwise. */
-static int asmTaskInProgress(int operation) {
- listIter li;
- listNode *ln;
-
- if (!asmManager || listLength(asmManager->tasks) == 0) return 0;
-
- listRewind(asmManager->tasks, &li);
- while ((ln = listNext(&li)) != NULL) {
- asmTask *task = listNodeValue(ln);
- if (task->operation == operation) return 1;
- }
- return 0;
-}
-
-/* Returns 1 if a migrate task is in progress, 0 otherwise. */
-int asmMigrateInProgress(void) {
- return asmTaskInProgress(ASM_MIGRATE);
-}
-
-/* Returns 1 if an import task is in progress, 0 otherwise. */
-int asmImportInProgress(void) {
- return asmTaskInProgress(ASM_IMPORT);
-}
-
-/* Returns 1 if the task is in a state where it can receive replication stream
-* for the slot range, 0 otherwise. */
-inline static int asmCanFeedMigrationClient(asmTask *task) {
- return task->operation == ASM_MIGRATE &&
- !task->cross_slot_during_propagating &&
- (task->state == ASM_SEND_BULK_AND_STREAM ||
- task->state == ASM_SEND_STREAM ||
- task->state == ASM_HANDOFF_PREP);
-}
-
-/* Feed the migration client with the replication stream for the slot range. */
-void asmFeedMigrationClient(robj **argv, int argc) {
- asmTask *task = NULL;
-
- if (server.cluster_enabled == 0 || listLength(asmManager->tasks) == 0)
- return;
-
- /* Check if there is a migrate task that can receive replication stream. */
- task = listNodeValue(listFirst(asmManager->tasks));
- if (!asmCanFeedMigrationClient(task)) return;
-
- /* Ensure all arguments are converted to string encoding if necessary,
- * since getSlotFromCommand expects them to be string-encoded.
- * Generally the arguments are string-encoded, but we may rewrite
- * the command arguments to integer encoding. */
- for (int i = 0; i < argc; i++) {
- if (!sdsEncodedObject(argv[i])) {
- serverAssert(argv[i]->encoding == OBJ_ENCODING_INT);
- robj *old = argv[i];
- argv[i] = createStringObjectFromLongLongWithSds((long)old->ptr);
- decrRefCount(old);
- }
- }
-
- /* Check if the command belongs to the slot range. */
- struct redisCommand *cmd = lookupCommand(argv, argc);
- serverAssert(cmd);
-
- int slot = getSlotFromCommand(cmd, argv, argc);
-
- /* If the command does not have keys, skip it now.
- * SELECT is not propagated, since we only support a single db in cluster mode.
- * MULTI/EXEC is not needed, since transaction semantics are unnecessary
- * before the slot handoff.
- * FUNCTION subcommands should be executed on all nodes, so here we skip it,
- * and even propagating them may cause an error when executing.
- *
- * NOTICE: if some keyless commands should be propagated to the destination,
- * we should identify them here and send. */
- if (slot == INVALID_CLUSTER_SLOT) return;
-
- /* Generally we reject cross-slot commands before executing, but module may
- * replicate this kind of command, so we check again. To guarantee data
- * consistency, we cancel the task if we encounter a cross-slot command. */
- if (slot == CLUSTER_CROSSSLOT) {
- /* We cannot cancel the task directly here, since it may lead to a recursive
- * call: asmTaskCancel() --> moduleFireServerEvent() --> moduleFreeContext()
- * --> postExecutionUnitOperations() --> propagateNow(). Even worse, this
- * could result in propagating pending commands to the replication stream twice.
- * To avoid this, we simply set a flag here, cancel the task in beforeSleep. */
- task->cross_slot_during_propagating = 1;
- return;
- }
-
- /* Check if the slot belongs to the task's slot range. */
- slotRange sr = {slot, slot};
- if (!slotRangeArrayOverlaps(task->slots, &sr)) return;
-
- if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, task->state)))
- freeClientAsync(task->main_channel_client);
-
- /* Feed main channel with the command. */
- client *c = task->main_channel_client;
- size_t prev_bytes = getNormalClientPendingReplyBytes(c);
-
- addReplyArrayLen(c, argc);
- for (int i = 0; i < argc; i++)
- addReplyBulk(c, argv[i]);
-
- /* Update the task's source offset to reflect the bytes sent. */
- task->source_offset += (getNormalClientPendingReplyBytes(c) - prev_bytes);
-}
-
-asmTask *asmCreateImportTask(const char *task_id, slotRangeArray *slots, sds *err) {
- clusterNode *source;
-
- *err = NULL;
- /* Validate that the slot ranges are valid and that migration can be
- * initiated for them. */
- source = validateImportSlotRanges(slots, err, NULL);
- if (!source)
- goto err;
-
- if (source == getMyClusterNode()) {
- *err = sdsnew("this node is already the owner of the slot range");
- goto err;
- }
-
- /* Only support a single task at a time now. */
- if (listLength(asmManager->tasks) != 0) {
- asmTask *current = listNodeValue(listFirst(asmManager->tasks));
- if (current->state == ASM_FAILED) {
- /* We can create a new import task only if the current one is failed,
- * cancel the failed task to create a new one. */
- asmTaskCancel(current, "new import requested");
- } else {
- *err = sdsnew("another ASM task is already in progress");
- goto err;
- }
- }
- /* There should be no task in progress. */
- serverAssert(listLength(asmManager->tasks) == 0);
-
- /* Create a slot migration task */
- asmTask *task = asmTaskCreate(task_id);
- task->slots = slots;
- task->state = ASM_NONE;
- task->operation = ASM_IMPORT;
- task->source_node = source;
- memcpy(task->source, clusterNodeGetName(source), CLUSTER_NAMELEN);
- memcpy(task->dest, getMyClusterId(), CLUSTER_NAMELEN);
-
- listAddNodeTail(asmManager->tasks, task);
- sds slots_str = slotRangeArrayToString(slots);
- serverLog(LL_NOTICE, "Import task %s created: src=%.40s, dest=%.40s, slots=%s",
- task->id, task->source, task->dest, slots_str);
- sdsfree(slots_str);
-
- return task;
-
-err:
- slotRangeArrayFree(slots);
- return NULL;
-}
-
-/* CLUSTER MIGRATION IMPORT <start-slot end-slot [start-slot end-slot ...]>
- *
- * Sent by operator to the destination node to start the migration. */
-static void clusterMigrationCommandImport(client *c) {
- /* Validate slot range arg count */
- int remaining = c->argc - 3;
- if (remaining == 0 || remaining % 2 != 0) {
- addReplyErrorArity(c);
- return;
- }
-
- slotRangeArray *slots = parseSlotRangesOrReply(c, c->argc, 3);
- if (!slots) return;
-
- sds err = NULL;
- asmTask *task = asmCreateImportTask(NULL, slots, &err);
- if (!task) {
- addReplyErrorSds(c, err);
- return;
- }
-
- addReplyBulkCString(c, task->id);
-}
-
-/* CLUSTER MIGRATION CANCEL [ID <task-id> | ALL]
- * - Reply: Number of cancelled tasks
- *
- * Cancels import tasks that overlap with the specified slot ranges.
- * Multiple tasks may be cancelled. */
-static void clusterMigrationCommandCancel(client *c) {
- sds task_id = NULL;
- int num_cancelled = 0;
-
- /* Validate slot range arg count */
- if (c->argc != 4 && c->argc != 5) {
- addReplyErrorArity(c);
- return;
- }
-
- if (!strcasecmp(c->argv[3]->ptr, "id")) {
- if (c->argc != 5) {
- addReplyErrorArity(c);
- return;
- }
- task_id = c->argv[4]->ptr;
- } else if (!strcasecmp(c->argv[3]->ptr, "all")) {
- if (c->argc != 4) {
- addReplyErrorArity(c);
- return;
- }
- } else {
- addReplyError(c, "unknown argument");
- return;
- }
-
- num_cancelled = clusterAsmCancel(task_id, "user request");
- addReplyLongLong(c, num_cancelled);
-}
-
-/* Reply with the status of the task. */
-static void replyTaskStatus(client *c, asmTask *task) {
- mstime_t p = 0;
-
- addReplyMapLen(c, 12);
- addReplyBulkCString(c, "id");
- addReplyBulkCString(c, task->id);
- addReplyBulkCString(c, "slots");
- addReplyBulkSds(c, slotRangeArrayToString(task->slots));
- addReplyBulkCString(c, "source");
- addReplyBulkCBuffer(c, task->source, CLUSTER_NAMELEN);
- addReplyBulkCString(c, "dest");
- addReplyBulkCBuffer(c, task->dest, CLUSTER_NAMELEN);
- addReplyBulkCString(c, "operation");
- addReplyBulkCString(c, task->operation == ASM_IMPORT ? "import" : "migrate");
- addReplyBulkCString(c, "state");
- addReplyBulkCString(c, asmTaskStateToString(task->state));
- addReplyBulkCString(c, "last_error");
- addReplyBulkCBuffer(c, task->error, sdslen(task->error));
- addReplyBulkCString(c, "retries");
- addReplyLongLong(c, task->retry_count);
- addReplyBulkCString(c, "create_time");
- addReplyLongLong(c, task->create_time);
- addReplyBulkCString(c, "start_time");
- addReplyLongLong(c, task->start_time);
- addReplyBulkCString(c, "end_time");
- addReplyLongLong(c, task->end_time);
-
- if (task->operation == ASM_MIGRATE && task->state == ASM_COMPLETED)
- p = task->end_time - task->paused_time;
- addReplyBulkCString(c, "write_pause_ms");
- addReplyLongLong(c, p);
-}
-
-/* CLUSTER MIGRATION STATUS [ID <task-id> | ALL]
- * - Reply: Array of atomic slot migration tasks */
-static void clusterMigrationCommandStatus(client *c) {
- listIter li;
- listNode *ln;
-
- if (c->argc != 4 && c->argc != 5) {
- addReplyErrorArity(c);
- return;
- }
-
- if (!strcasecmp(c->argv[3]->ptr, "id")) {
- if (c->argc != 5) {
- addReplyErrorArity(c);
- return;
- }
- sds id = c->argv[4]->ptr;
- asmTask *task = asmLookupTaskAt(asmManager->tasks, id);
- if (!task) task = asmLookupTaskAt(asmManager->archived_tasks, id);
- if (!task) {
- addReplyArrayLen(c, 0);
- return;
- }
-
- addReplyArrayLen(c, 1);
- replyTaskStatus(c, task);
- } else if (!strcasecmp(c->argv[3]->ptr, "all")) {
- if (c->argc != 4) {
- addReplyErrorArity(c);
- return;
- }
- addReplyArrayLen(c, listLength(asmManager->tasks) +
- listLength(asmManager->archived_tasks));
- listRewind(asmManager->tasks, &li);
- while ((ln = listNext(&li)) != NULL)
- replyTaskStatus(c, listNodeValue(ln));
-
- listRewind(asmManager->archived_tasks, &li);
- while ((ln = listNext(&li)) != NULL)
- replyTaskStatus(c, listNodeValue(ln));
- } else {
- addReplyError(c, "unknown argument");
- return;
- }
-}
-
-/* CLUSTER MIGRATION
- * <IMPORT <start-slot end-slot [start-slot end-slot ...]> |
- * STATUS [ID <task-id> | ALL] |
- * CANCEL [ID <task-id> | ALL]>
-*/
-void clusterMigrationCommand(client *c) {
- if (c->argc < 4) {
- addReplyErrorArity(c);
- return;
- }
-
- if (strcasecmp(c->argv[2]->ptr, "import") == 0) {
- clusterMigrationCommandImport(c);
- } else if (strcasecmp(c->argv[2]->ptr, "status") == 0) {
- clusterMigrationCommandStatus(c);
- } else if (strcasecmp(c->argv[2]->ptr, "cancel") == 0) {
- clusterMigrationCommandCancel(c);
- } else {
- addReplyError(c, "unknown argument");
- }
-}
-
-/* Return the number of keys in the specified slot ranges. */
-unsigned long long asmCountKeysInSlots(slotRangeArray *slots) {
- if (!slots) return 0;
-
- unsigned long long key_count = 0;
- for (int i = 0; i < slots->num_ranges; i++) {
- for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) {
- key_count += kvstoreDictSize(server.db[0].keys, j);
- }
- }
- return key_count;
-}
-
-/* Log a human-readable message for ASM task lifecycle events. */
-void asmLogTaskEvent(asmTask *task, int event) {
- sds str = slotRangeArrayToString(task->slots);
-
- switch (event) {
- case ASM_EVENT_IMPORT_STARTED:
- serverLog(LL_NOTICE, "Import task %s started for slots: %s", task->id, str);
- break;
- case ASM_EVENT_IMPORT_FAILED:
- serverLog(LL_NOTICE, "Import task %s failed for slots: %s", task->id, str);
- break;
- case ASM_EVENT_TAKEOVER:
- serverLog(LL_NOTICE, "Import task %s is ready to takeover slots: %s", task->id, str);
- break;
- case ASM_EVENT_IMPORT_COMPLETED:
- serverLog(LL_NOTICE, "Import task %s completed for slots: %s (imported %llu keys)",
- task->id, str, asmCountKeysInSlots(task->slots));
- break;
- case ASM_EVENT_MIGRATE_STARTED:
- serverLog(LL_NOTICE, "Migrate task %s started for slots: %s (keys at start: %llu)",
- task->id, str, asmCountKeysInSlots(task->slots));
- break;
- case ASM_EVENT_MIGRATE_FAILED:
- serverLog(LL_NOTICE, "Migrate task %s failed for slots: %s", task->id, str);
- break;
- case ASM_EVENT_HANDOFF_PREP:
- serverLog(LL_NOTICE, "Migrate task %s preparing to handoff for slots: %s", task->id, str);
- break;
- case ASM_EVENT_MIGRATE_COMPLETED:
- serverLog(LL_NOTICE, "Migrate task %s completed for slots: %s (migrated %llu keys)",
- task->id, str, asmCountKeysInSlots(task->slots));
- break;
- default:
- break;
- }
-
- sdsfree(str);
-}
-
-/* Notify the state change to the module and the cluster implementation. */
-void asmNotifyStateChange(asmTask *task, int event) {
- RedisModuleClusterSlotMigrationInfo info = {
- .version = REDISMODULE_CLUSTER_SLOT_MIGRATION_INFO_VERSION,
- .task_id = task->id,
- .slots = (RedisModuleSlotRangeArray *) task->slots
- };
- memcpy(info.source_node_id, task->source, CLUSTER_NAMELEN);
- memcpy(info.destination_node_id, task->dest, CLUSTER_NAMELEN);
-
- int module_event = -1;
- if (event == ASM_EVENT_IMPORT_STARTED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_STARTED;
- else if (event == ASM_EVENT_IMPORT_COMPLETED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_COMPLETED;
- else if (event == ASM_EVENT_IMPORT_FAILED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_FAILED;
- else if (event == ASM_EVENT_MIGRATE_STARTED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_STARTED;
- else if (event == ASM_EVENT_MIGRATE_COMPLETED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_COMPLETED;
- else if (event == ASM_EVENT_MIGRATE_FAILED) module_event = REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_FAILED;
- serverAssert(module_event != -1);
-
- moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION, module_event, &info);
- serverLog(LL_DEBUG, "Fire cluster asm module event, task %s: state=%s",
- task->id, asmTaskStateToString(task->state));
-
- if (clusterNodeIsMaster(getMyClusterNode())) {
- /* Notify the cluster impl only if it is a real active import task. */
- if (task != asmManager->master_task) {
- asmLogTaskEvent(task, event);
- clusterAsmOnEvent(task->id, event, task->slots);
- }
- asmNotifyReplicasStateChange(task); /* Propagate state change to replicas */
- }
-}
-
-void asmImportSetFailed(asmTask *task) {
- serverAssert(task->operation == ASM_IMPORT);
- if (task->state == ASM_FAILED) return;
-
- /* If we are in the RDB channel transfer state, we need to
- * close the client that was created for the RDB channel. */
- if (task->rdb_channel_conn && task->rdb_channel_state == ASM_RDBCHANNEL_TRANSFER) {
- client *c = connGetPrivateData(task->rdb_channel_conn);
- serverAssert(c->task == task);
- task->rdb_channel_conn = NULL;
- c->task = NULL;
- c->flags &= ~CLIENT_MASTER;
- freeClientAsync(c);
- }
-
- /* If in the wait stream EOF or streaming buffer state, we need to close the
- * client that was created for the main channel. */
- if (task->main_channel_conn &&
- (task->state == ASM_STREAMING_BUF || task->state == ASM_WAIT_STREAM_EOF))
- {
- client *c = connGetPrivateData(task->main_channel_conn);
- serverAssert(c->task == task);
- task->main_channel_conn = NULL;
- c->task = NULL;
- c->flags &= ~CLIENT_MASTER;
- freeClientAsync(c);
- }
-
- /* Close the connections */
- if (task->rdb_channel_conn) connClose(task->rdb_channel_conn);
- if (task->main_channel_conn) connClose(task->main_channel_conn);
- task->rdb_channel_conn = NULL;
- task->main_channel_conn = NULL;
-
- /* Clear the replication data buffer */
- asmManager->sync_buffer_peak = max(asmManager->sync_buffer_peak, task->sync_buffer.peak);
- replDataBufClear(&task->sync_buffer);
-
- /* Mark the task as failed and notify the cluster */
- task->state = ASM_FAILED;
- asmNotifyStateChange(task, ASM_EVENT_IMPORT_FAILED);
- /* This node may become replica, only master can setup new slot trimming jobs. */
- if (clusterNodeIsMaster(getMyClusterNode()))
- asmTrimJobSchedule(task->slots);
-}
-
-void asmMigrateSetFailed(asmTask *task) {
- serverAssert(task->operation == ASM_MIGRATE);
- if (task->state == ASM_FAILED) return;
-
- /* Close the RDB and main channel clients*/
- if (task->rdb_channel_client) {
- task->rdb_channel_client->task = NULL;
- freeClientAsync(task->rdb_channel_client);
- task->rdb_channel_client = NULL;
- }
- if (task->main_channel_client) {
- task->main_channel_client->task = NULL;
- freeClientAsync(task->main_channel_client);
- task->main_channel_client = NULL;
- }
-
- /* Actually it is not necessary to clear the sync buffer here,
- * to make asmTaskReset work properly after migrate task failed */
- replDataBufClear(&task->sync_buffer);
-
- /* Mark the task as failed and notify the cluster */
- task->state = ASM_FAILED;
- asmNotifyStateChange(task, ASM_EVENT_MIGRATE_FAILED);
-}
-
-void asmTaskSetFailed(asmTask *task, const char *fmt, ...) {
- va_list ap;
- sds error = sdsempty();
-
- /* Set the error message */
- va_start(ap, fmt);
- error = sdscatvprintf(error, fmt, ap);
- va_end(ap);
- error = sdscatprintf(error, " (state: %s, rdb_channel_state: %s)",
- asmTaskStateToString(task->state),
- asmTaskStateToString(task->rdb_channel_state));
- sdsfree(task->error);
- task->error = error;
-
- /* Log the error */
- sds slots_str = slotRangeArrayToString(task->slots);
- serverLog(LL_WARNING, "%s task %s failed: slots=%s, err=%s",
- task->operation == ASM_IMPORT ? "Import" : "Migrate",
- task->id, slots_str, task->error);
- sdsfree(slots_str);
-
- if (task->operation == ASM_IMPORT)
- asmImportSetFailed(task);
- else
- asmMigrateSetFailed(task);
-}
-
-/* The task is completed or canceled. Update stats and move it to
- * the archived list. */
-void asmTaskFinalize(asmTask *task) {
- listNode *ln = listFirst(asmManager->tasks);
- serverAssert(ln->value == task);
-
- task->source_node = NULL; /* Should never access it */
- task->end_time = server.mstime;
-
- if (task->operation == ASM_IMPORT) {
- asmManager->sync_buffer_peak = max(asmManager->sync_buffer_peak,
- task->sync_buffer.peak);
- replDataBufClear(&task->sync_buffer); /* Not used, so save memory */
- }
-
- /* Move the task to the archived list */
- listUnlinkNode(asmManager->tasks, ln);
- listLinkNodeHead(asmManager->archived_tasks, ln);
-}
-
-static void asmTaskCancel(asmTask *task, const char *reason) {
- if (task->state == ASM_CANCELED) return;
-
- asmTaskSetFailed(task, "Cancelled due to %s", reason);
- task->state = ASM_CANCELED;
- asmTaskFinalize(task);
-}
-
-void asmImportTakeover(asmTask *task) {
- serverAssert(task->state == ASM_WAIT_STREAM_EOF ||
- task->state == ASM_STREAMING_BUF);
-
- /* Free the main channel connection since it is no longer needed. */
- serverAssert(task->main_channel_conn != NULL);
- client *c = connGetPrivateData(task->main_channel_conn);
- c->task = NULL;
- c->flags &= ~CLIENT_MASTER;
- freeClientAsync(c);
- task->main_channel_conn = NULL;
-
- task->state = ASM_TAKEOVER;
- asmLogTaskEvent(task, ASM_EVENT_TAKEOVER);
- clusterAsmOnEvent(task->id, ASM_EVENT_TAKEOVER, task->slots);
-}
-
-void asmCallbackOnFreeClient(client *c) {
- asmTask *task = c->task;
- if (!task) return;
-
- /* If the RDB channel connection is closed, mark the task as failed. */
- if (c->conn && task->rdb_channel_conn == c->conn) {
- /* We create the client only when transferring data on the RDB channel */
- serverAssert(task->rdb_channel_state == ASM_RDBCHANNEL_TRANSFER);
- task->rdb_channel_conn = NULL; /* Will be freed by freeClient */
- c->flags &= ~CLIENT_MASTER;
- asmTaskSetFailed(task, "RDB channel - Connection is closed");
- return;
- }
-
- if (c->conn && task->main_channel_conn == c->conn) {
- /* After or in the process of streaming buffer to DB, a client will be
- * created based on the main channel connection. */
- serverAssert(task->state == ASM_STREAMING_BUF ||
- task->state == ASM_WAIT_STREAM_EOF);
- task->main_channel_conn = NULL; /* Will be freed by freeClient */
- c->flags &= ~CLIENT_MASTER;
- asmTaskSetFailed(task, "Main channel - Connection is closed");
- return;
- }
-
- if (c == task->rdb_channel_client) {
- /* TODO: Detect whether the bgsave is completed successfully and
- * update the state properly. */
- task->rdb_channel_state = ASM_COMPLETED;
- /* We may not have detected whether the child process has exited yet,
- * so we can't determine whether the client has completed the slots
- * snapshot transfer. If the RDB channel is interrupted unexpectedly,
- * the destination side will also close the main channel.
- * So here we just reset the RDB channel client of task. */
- task->rdb_channel_client = NULL;
- return;
- }
-
- /* If the main channel client is closed, we need to mark the task as failed
- * and clean up the RDB channel client if it exists. */
- if (c == task->main_channel_client) {
- task->main_channel_client = NULL;
- /* The rdb channel client will be cleaned up */
- asmTaskSetFailed(task, "Main and RDB channel clients are disconnected.");
- return;
- }
-}
-
-/* Sends an AUTH command to the source node using the internal secret.
- * Returns an error string if the command fails, or NULL on success. */
-char *asmSendInternalAuth(connection *conn) {
- size_t len = 0;
- const char *internal_secret = clusterGetSecret(&len);
- serverAssert(internal_secret != NULL);
-
- sds secret = sdsnewlen(internal_secret, len);
- char *err = sendCommand(conn, "AUTH", "internal connection", secret, NULL);
- sdsfree(secret);
- return err;
-}
-
-/* Handles the RDB channel sync with the source node.
- * This function is called when the RDB channel is established
- * and ready to sync with the source node. */
-void asmRdbChannelSyncWithSource(connection *conn) {
- asmTask *task = connGetPrivateData(conn);
- char *err = NULL;
- sds task_error_msg = NULL;
-
- /* Check for errors in the socket: after a non blocking connect() we
- * may find that the socket is in error state. */
- if (connGetState(conn) != CONN_STATE_CONNECTED)
- goto error;
-
- /* Check if the task is in a fail point state */
- if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_RDB_CHANNEL, task->rdb_channel_state))) {
- char buf[1];
- /* Simulate a failure by shutting down the connection. On some operating
- * systems (e.g. Linux), the socket's receive buffer is not flushed
- * immediately, so we issue a dummy read to drain any pending data and
- * surface the error condition.
- * using shutdown() instead of connShutdown() because connTLSShutdown()
- * will free the connection directly, which is not what we want. */
- shutdown(conn->fd, SHUT_RDWR);
- connRead(conn, buf, 1);
- }
-
- if (task->rdb_channel_state == ASM_CONNECTING) {
- connSetReadHandler(conn, asmRdbChannelSyncWithSource);
- connSetWriteHandler(conn, NULL);
-
- /* Send AUTH command to source node using internal auth */
- err = asmSendInternalAuth(conn);
- if (err) goto write_error;
- task->rdb_channel_state = ASM_AUTH_REPLY;
- return;
- }
-
- if (task->rdb_channel_state == ASM_AUTH_REPLY) {
- err = receiveSynchronousResponse(conn);
- /* The source node did not reply */
- if (err == NULL) goto no_response_error;
-
- /* Check `+OK` reply */
- if (!strcmp(err, "+OK")) {
- sdsfree(err);
- err = NULL;
- task->rdb_channel_state = ASM_RDBCHANNEL_REQUEST;
- serverLog(LL_NOTICE, "Source node replied to AUTH command, syncslots rdb channel operation can continue...");
- } else {
- task_error_msg = sdscatprintf(sdsempty(),
- "Error reply to AUTH from source: %s", err);
- sdsfree(err);
- goto error;
- }
- }
-
- if (task->rdb_channel_state == ASM_RDBCHANNEL_REQUEST) {
- err = sendCommand(conn, "CLUSTER", "SYNCSLOTS", "RDBCHANNEL", task->id, NULL);
- if (err) goto write_error;
- task->rdb_channel_state = ASM_RDBCHANNEL_REPLY;
- return;
- }
-
- if (task->rdb_channel_state == ASM_RDBCHANNEL_REPLY) {
- err = receiveSynchronousResponse(conn);
- /* The source node did not reply */
- if (err == NULL) goto no_response_error;
-
- /* Ignore ‘\n' sent from the source node to keep the connection alive. */
- if (sdslen(err) == 0) {
- serverLog(LL_DEBUG, "Received an empty line in RDBCHANNEL reply, slots snapshot delivery will start later");
- sdsfree(err);
- return;
- }
-
- /* Check `+SLOTSSNAPSHOT` reply */
- if (!strncmp(err, "+SLOTSSNAPSHOT", strlen("+SLOTSSNAPSHOT"))) {
- sdsfree(err);
- err = NULL;
- task->state = ASM_ACCUMULATE_BUF;
- /* The main channel buffers pending commands. */
- connSetReadHandler(task->main_channel_conn, asmSyncBufferReadFromConn);
-
- task->rdb_channel_state = ASM_RDBCHANNEL_TRANSFER;
- client *c = createClient(conn);
- c->flags |= (CLIENT_MASTER | CLIENT_INTERNAL | CLIENT_ASM_IMPORTING);
- c->querybuf = sdsempty();
- c->authenticated = 1;
- c->user = NULL;
- c->task = task;
- serverLog(LL_NOTICE,
- "Source node replied to SLOTSSNAPSHOT, syncing slots snapshot can continue...");
- } else {
- task_error_msg = sdscatprintf(sdsempty(),
- "Error reply to CLUSTER SYNCSLOTS RDBCHANNEL from the source: %s", err);
- sdsfree(err);
- goto error;
- }
- return;
- }
- return;
-
-no_response_error:
- task_error_msg = sdsnew("Source node did not respond to command during RDBCHANNELSYNCSLOTS handshake");
- /* Fall through to regular error handling */
-
-error:
- asmTaskSetFailed(task, "RDB channel - Failed to sync with the source node: %s",
- task_error_msg ? task_error_msg : connGetLastError(conn));
- sdsfree(task_error_msg);
- return;
-
-write_error: /* Handle sendCommand() errors. */
- task_error_msg = sdscatprintf(sdsempty(), "Failed to send command to the source node: %s", err);
- sdsfree(err);
- goto error;
-}
-
-char *asmSendSlotRangesSync(connection *conn, asmTask *task) {
- /* Prepare CLUSTER SYNCSLOTS SYNC command */
- serverAssert(task->slots->num_ranges <= CLUSTER_SLOTS);
- int argc = task->slots->num_ranges * 2 + 4;
- char **args = zcalloc(sizeof(char*) * argc);
- size_t *lens = zcalloc(sizeof(size_t) * argc);
-
- args[0] = "CLUSTER";
- args[1] = "SYNCSLOTS";
- args[2] = "SYNC";
- args[3] = task->id;
- lens[0] = strlen("CLUSTER");
- lens[1] = strlen("SYNCSLOTS");
- lens[2] = strlen("SYNC");
- lens[3] = sdslen(task->id);
-
- int i = 4;
- for (int j = 0; j < task->slots->num_ranges; j++) {
- slotRange *sr = &task->slots->ranges[j];
- args[i] = sdscatprintf(sdsempty(), "%d", sr->start);
- lens[i] = sdslen(args[i]);
- args[i+1] = sdscatprintf(sdsempty(), "%d", sr->end);
- lens[i+1] = sdslen(args[i+1]);
- i += 2;
- }
- serverAssert(i == argc);
-
- /* Send command to source node */
- char *err = sendCommandArgv(conn, argc, args, lens);
-
- /* Free allocated memory */
- for (int j = 4; j < argc; j++) {
- sdsfree(args[j]);
- }
- zfree(args);
- zfree(lens);
-
- return err;
-}
-
-void asmSyncWithSource(connection *conn) {
- asmTask *task = connGetPrivateData(conn);
- char *err = NULL;
-
- /* Some task errors are not network issues, we record them explicitly. */
- sds task_error_msg = NULL;
-
- /* Check for errors in the socket: after a non blocking connect() we
- * may find that the socket is in error state. */
- if (connGetState(conn) != CONN_STATE_CONNECTED)
- goto error;
-
- /* Check if the fail point is active for this channel and state */
- if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_MAIN_CHANNEL, task->state))) {
- char buf[1];
- shutdown(conn->fd, SHUT_RDWR);
- connRead(conn, buf, 1);
- }
-
- if (task->state == ASM_CONNECTING) {
- connSetReadHandler(conn, asmSyncWithSource);
- connSetWriteHandler(conn, NULL);
- /* Send AUTH command to source node using internal auth */
- err = asmSendInternalAuth(conn);
- if (err) goto write_error;
- task->state = ASM_AUTH_REPLY;
- return;
- }
-
- if (task->state == ASM_AUTH_REPLY) {
- err = receiveSynchronousResponse(conn);
- /* The source node did not reply */
- if (err == NULL) goto no_response_error;
-
- /* Check `+OK` reply */
- if (!strcmp(err, "+OK")) {
- sdsfree(err);
- err = NULL;
- task->state = ASM_SEND_HANDSHAKE;
- serverLog(LL_NOTICE, "Source node replied to AUTH command, syncslots can continue...");
- } else {
- task_error_msg = sdscatprintf(sdsempty(),
- "Error reply to AUTH from the source: %s", err);
- sdsfree(err);
- goto error;
- }
- }
-
- if (task->state == ASM_SEND_HANDSHAKE) {
- sds node_id = sdsnewlen(clusterNodeGetName(getMyClusterNode()), CLUSTER_NAMELEN);
- err = sendCommand(conn, "CLUSTER", "SYNCSLOTS", "CONF", "NODE-ID", node_id, NULL);
- sdsfree(node_id);
- if (err) goto write_error;
- task->state = ASM_HANDSHAKE_REPLY;
- return;
- }
-
- if (task->state == ASM_HANDSHAKE_REPLY) {
- err = receiveSynchronousResponse(conn);
- /* The source node did not reply */
- if (err == NULL) goto no_response_error;
-
- /* Check `+OK` reply */
- if (!strcmp(err, "+OK")) {
- sdsfree(err);
- err = NULL;
- task->state = ASM_SEND_SYNCSLOTS;
- serverLog(LL_NOTICE, "Source node replied to SYNCSLOTS CONF command, syncslots can continue...");
- } else {
- task_error_msg = sdscatprintf(sdsempty(),
- "Error reply to CLUSTER SYNCSLOTS CONF from the source: %s", err);
- sdsfree(err);
- goto error;
- }
- }
-
- if (task->state == ASM_SEND_SYNCSLOTS) {
- err = asmSendSlotRangesSync(conn, task);
- if (err) goto write_error;
-
- task->state = ASM_SYNCSLOTS_REPLY;
- return;
- }
-
- if (task->state == ASM_SYNCSLOTS_REPLY) {
- err = receiveSynchronousResponse(conn);
- /* The source node did not reply */
- if (err == NULL) goto no_response_error;
-
- /* Check `+RDBCHANNELSYNCSLOTS` reply */
- if (!strncmp(err, "+RDBCHANNELSYNCSLOTS", strlen("+RDBCHANNELSYNCSLOTS"))) {
- sdsfree(err);
- err = NULL;
- task->state = ASM_INIT_RDBCHANNEL;
- serverLog(LL_NOTICE,
- "Source node replied to SYNCSLOTS SYNC, syncslots can continue...");
- } else if (!strncmp(err, "-NOTREADY", strlen("-NOTREADY"))) {
- /* The source-side cluster is temporarily not ready to start a
- * migration and replied -NOTREADY. We could fail this attempt and
- * let the import task start another attempt later but that could
- * trigger unnecessary cleanup in the cluster implementation.
- * Instead, we'll retry sending SYNCSLOTS later in asmCron(). */
- sdsfree(err);
- task->state = ASM_SEND_SYNCSLOTS;
- serverLog(LL_NOTICE,
- "Source node replied to SYNCSLOTS SYNC with -NOTREADY, will retry later...");
- return;
- } else {
- task_error_msg = sdscatprintf(sdsempty(),
- "Error reply to CLUSTER SYNCSLOTS SYNC from the source: %s", err);
- sdsfree(err);
- goto error;
- }
- }
-
- if (task->state == ASM_INIT_RDBCHANNEL) {
- /* Create RDB channel connection */
- char *ip = clusterNodeIp(task->source_node);
- int port = server.tls_replication ? clusterNodeTlsPort(task->source_node) :
- clusterNodeTcpPort(task->source_node);
- task->rdb_channel_conn = connCreate(server.el, connTypeOfReplication());
- if (connConnect(task->rdb_channel_conn, ip, port,
- server.bind_source_addr, asmRdbChannelSyncWithSource) == C_ERR)
- {
- serverLog(LL_WARNING, "Unable to connect to the source node: %s",
- connGetLastError(task->rdb_channel_conn));
- goto error;
- }
- task->rdb_channel_state = ASM_CONNECTING;
- connSetPrivateData(task->rdb_channel_conn, task);
- serverLog(LL_NOTICE,
- "RDB channel connection to source node %.40s established, waiting for AUTH reply...",
- task->source);
-
- /* Main channel waits for the new event */
- connSetReadHandler(conn, NULL);
- return;
- }
- return;
-
-no_response_error:
- serverLog(LL_WARNING, "Source node did not respond to command during SYNCSLOTS handshake");
- /* Fall through to regular error handling */
-
-error:
- asmTaskSetFailed(task, "Main channel - Failed to sync with source node: %s",
- task_error_msg ? task_error_msg : connGetLastError(conn));
- sdsfree(task_error_msg);
- return;
-
-write_error: /* Handle sendCommand() errors. */
- serverLog(LL_WARNING, "Failed to send command to source node: %s", err);
- sdsfree(err);
- goto error;
-}
-
-int asmImportSendACK(asmTask *task) {
- serverAssert(task->operation == ASM_IMPORT && task->state == ASM_WAIT_STREAM_EOF);
- serverLog(LL_DEBUG, "Destination node applied offset is %lld", task->dest_offset);
-
- char offset[64];
- ull2string(offset, sizeof(offset), task->dest_offset);
-
- char *err = sendCommand(task->main_channel_conn, "CLUSTER", "SYNCSLOTS", "ACK",
- asmTaskStateToString(task->state), offset, NULL);
- if (err) {
- asmTaskSetFailed(task, "Main channel - Failed to send ACK: %s", err);
- sdsfree(err);
- return C_ERR;
- }
- return C_OK;
-}
-
-/* Called when the RDB channel begins sending the snapshot.
- * From this point on, the main channel also starts sending incremental streams. */
-void asmSlotSnapshotAndStreamStart(struct asmTask *task) {
- if (task == NULL || task->state != ASM_WAIT_BGSAVE_START) return;
-
- if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_RDB_CHANNEL, task->state))) {
- shutdown(task->rdb_channel_client->conn->fd, SHUT_RDWR);
- return;
- }
- task->main_channel_client->replstate = SLAVE_STATE_SEND_BULK_AND_STREAM;
-
- task->state = ASM_SEND_BULK_AND_STREAM;
- task->rdb_channel_state = ASM_RDBCHANNEL_TRANSFER;
-
- /* From the source node's perspective, the destination node begins to accumulate
- * the buffer while the RDB channel starts applying the slot snapshot data. */
- task->dest_state = ASM_ACCUMULATE_BUF;
- task->dest_slots_snapshot_time = server.mstime;
-}
-
-/* Called when the RDB channel has succeeded in sending the snapshot. */
-void asmSlotSnapshotSucceed(struct asmTask *task) {
- if (task == NULL || task->state != ASM_SEND_BULK_AND_STREAM) return;
-
- /* The destination starts sending ACKs to keep the main channel alive after
- * receiving the snapshot, so here we need to update the last interaction
- * time to avoid false timeout. */
- task->main_channel_client->lastinteraction = server.unixtime;
-
- task->state = ASM_SEND_STREAM;
- task->rdb_channel_state = ASM_COMPLETED;
-}
-
-/* Called when the RDB channel fails to send the snapshot. */
-void asmSlotSnapshotFailed(struct asmTask *task) {
- if (task == NULL || task->state != ASM_SEND_BULK_AND_STREAM) return;
-
- asmTaskSetFailed(task, "RDB channel - Failed to send slots snapshot");
-}
-
-/* CLUSTER SYNCSLOTS SNAPSHOT-EOF
- *
- * This command is sent by the source node to the destination node to indicate
- * that the slots snapshot has ended. */
-void clusterSyncSlotsSnapshotEOF(client *c) {
- /* This client is RDB channel connection. */
- asmTask *task = c->task;
- if (!task || task->rdb_channel_state != ASM_RDBCHANNEL_TRANSFER ||
- c->conn != task->rdb_channel_conn)
- {
- /* Unexpected SNAPSHOT-EOF command */
- serverLog(LL_WARNING, "Unexpected CLUSTER SYNCSLOTS SNAPSHOT-EOF command: "
- "rdb_channel_state=%s",
- asmTaskStateToString(task ? task->rdb_channel_state : ASM_NONE));
- freeClientAsync(c);
- return;
- }
-
- /* RDB channel state: ASM_RDBCHANNEL_TRANSFER */
- if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_RDB_CHANNEL, task->rdb_channel_state))) {
- freeClientAsync(c); /* Simulate a failure */
- return;
- }
-
- /* Clear the RDB channel connection */
- task->rdb_channel_conn = NULL;
- task->rdb_channel_state = ASM_COMPLETED;
- serverLog(LL_NOTICE, "RDB channel snapshot transfer completed for the import task.");
-
- /* Free the RDB channel connection. */
- c->task = NULL;
- c->flags &= ~CLIENT_MASTER;
- freeClientAsync(c);
-
- /* Will start streaming the buffer to DB, don't start here since now
- * we are in the context of executing command, otherwise, redis will
- * generate a big MULTI-EXEC including all the commands in the buffer.
- * just update the state here, and do it in beforeSleep(). */
- task->state = ASM_READY_TO_STREAM;
- connSetReadHandler(task->main_channel_conn, NULL);
-}
-
-/* CLUSTER SYNCSLOTS STREAM-EOF
- *
- * This command is sent by the source node to the destination node to indicate
- * that the slot sync stream has ended and the slots can be handed off. */
-void clusterSyncSlotsStreamEOF(client *c) {
- asmTask *task = c->task;
-
- if (!task || task->operation != ASM_IMPORT) {
- serverLog(LL_WARNING, "Unexpected CLUSTER SYNCSLOTS STREAM-EOF command");
- freeClientAsync(c);
- return;
- }
-
- if (task->state == ASM_STREAMING_BUF) {
- /* We are still streaming the buffer to DB, mark the EOF received, and we
- * can take over after streaming is EOF. Since we may release the context
- * in asmImportTakeover, this breaks the context for streaming buffer. */
- task->stream_eof_during_streaming = 1;
- serverLog(LL_NOTICE, "CLUSTER SYNCSLOTS STREAM-EOF received during streaming buffer");
- return;
- }
-
- if (task->state != ASM_WAIT_STREAM_EOF) {
- serverLog(LL_WARNING, "Unexpected CLUSTER SYNCSLOTS STREAM-EOF state: %s",
- asmTaskStateToString(task->state));
- freeClientAsync(c);
- return;
- }
- serverLog(LL_NOTICE, "CLUSTER SYNCSLOTS STREAM-EOF received when waiting for STREAM-EOF");
-
- /* STREAM-EOF received, the source is ready to handoff, takeover now. */
- asmImportTakeover(task);
-}
-
-/* Start the import task. */
-static void asmStartImportTask(asmTask *task) {
- if (task->operation != ASM_IMPORT || task->state != ASM_NONE) return;
- sds slots_str = slotRangeArrayToString(task->slots);
-
- /* Sanity check: Clean up any keys that exist in slots not owned by this node.
- * This handles cases where users previously migrated slots using legacy method
- * but left behind orphaned keys, or maybe cluster missed cleaning up during
- * previous operations, which could interfere with the ASM import process. */
- asmTrimSlotsIfNotOwned(task->slots);
-
- /* Check if there is any trim job in progress for the slot ranges.
- * We can't start the import task since the trim job will modify the data.*/
- int trim_in_progress = asmIsAnyTrimJobOverlaps(task->slots);
-
- /* Notify the cluster implementation to prepare for the import task. */
- int impl_ret = clusterAsmOnEvent(task->id, ASM_EVENT_IMPORT_PREP, task->slots);
-
- /* We do not start the import task if trim is disabled by module. */
- int disabled_by_module = server.cluster_module_trim_disablers > 0;
-
- static int start_blocked_logged = 0;
- /* Cannot start import task since pause action is performed. Otherwise, we
- * will break the promise that no writes are performed during the pause. */
- if (isPausedActions(PAUSE_ACTION_CLIENT_ALL) ||
- isPausedActions(PAUSE_ACTION_CLIENT_WRITE) ||
- trim_in_progress ||
- impl_ret != C_OK ||
- disabled_by_module)
- {
- const char *reason = disabled_by_module ? "trim is disabled by module" :
- impl_ret != C_OK ? "cluster is not ready" :
- trim_in_progress ? "trim in progress for some of the slots" :
- "server paused";
- if (start_blocked_logged == 0) {
- serverLog(LL_WARNING, "Can not start import task %s for slots: %s due to %s",
- task->id, slots_str, reason);
- start_blocked_logged = 1;
- }
- sdsfree(slots_str);
- return;
- }
- start_blocked_logged = 0; /* Reset the log flag */
-
- /* Detect if the cluster topology is changed. We should cancel the task if
- * we can not schedule it, and update the source node if needed. */
- sds err = NULL;
- clusterNode *source = validateImportSlotRanges(task->slots, &err, task);
- if (!source) {
- asmTaskCancel(task, err);
- sdsfree(slots_str);
- sdsfree(err);
- return;
- }
- /* Now I'm the owner of the slot range, cancel the import task. */
- if (source == getMyClusterNode()) {
- asmTaskCancel(task, "slots owned by myself now");
- sdsfree(slots_str);
- return;
- }
- /* Change the source node if needed. */
- if (source != task->source_node) {
- task->source_node = source;
- memcpy(task->source, clusterNodeGetName(source), CLUSTER_NAMELEN);
- serverLog(LL_NOTICE, "Import task %s source node changed: slots=%s, "
- "new_source=%.40s", task->id, slots_str, clusterNodeGetName(source));
- }
- sdsfree(slots_str);
-
- task->state = ASM_CONNECTING;
- task->start_time = server.mstime;
- asmNotifyStateChange(task, ASM_EVENT_IMPORT_STARTED);
-
- task->main_channel_conn = connCreate(server.el, connTypeOfReplication());
- char *ip = clusterNodeIp(task->source_node);
- int port = server.tls_replication ? clusterNodeTlsPort(task->source_node) :
- clusterNodeTcpPort(task->source_node);
- if (connConnect(task->main_channel_conn, ip, port, server.bind_source_addr,
- asmSyncWithSource) == C_ERR)
- {
- asmTaskSetFailed(task, "Main channel - Failed to connect to source node: %s",
- connGetLastError(task->main_channel_conn));
- return;
- }
- connSetPrivateData(task->main_channel_conn, task);
-}
-
-void clusterSyncSlotsCommand(client *c) {
- /* Only internal clients are allowed to execute this command to avoid
- * potential attack, since some state changes are not well protected,
- * external clients may damage the slot migration state. */
- if (!(c->flags & (CLIENT_INTERNAL | CLIENT_MASTER))) {
- addReplyError(c, "CLUSTER SYNCSLOTS subcommands are only allowed for internal clients");
- c->flags |= CLIENT_CLOSE_AFTER_REPLY;
- return;
- }
-
- /* On replica, only allow master client to execute CONF subcommand. */
- if (!clusterNodeIsMaster(getMyClusterNode())) {
- if (!(c->flags & CLIENT_MASTER)) {
- /* Not master client, reject all subcommands and close the connection. */
- addReplyError(c, "CLUSTER SYNCSLOTS subcommands are only allowed for master");
- c->flags |= CLIENT_CLOSE_AFTER_REPLY;
- return;
- } else {
- /* Only allow CONF subcommand on replica. */
- if (strcasecmp(c->argv[2]->ptr, "conf")) return;
- }
- }
-
- if (!strcasecmp(c->argv[2]->ptr, "sync") && c->argc >= 6) {
- /* CLUSTER SYNCSLOTS SYNC <ID> <start-slot> <end-slot> [<start-slot> <end-slot>] */
- if (c->argc % 2 == 1) {
- addReplyErrorArity(c);
- return;
- }
-
- slotRangeArray *slots = parseSlotRangesOrReply(c, c->argc, 4);
- if (!slots) return;
-
- /* Validate that the slot ranges are valid and that migration can be
- * initiated for them. */
- sds err = NULL;
- clusterNode *source = validateImportSlotRanges(slots, &err, NULL);
- if (!source) {
- addReplyErrorSds(c, err);
- slotRangeArrayFree(slots);
- return;
- }
-
- /* Check if the source node is the same as the current node. */
- if (source != getMyClusterNode()) {
- addReplyError(c, "This node is not the owner of the slots");
- slotRangeArrayFree(slots);
- return;
- }
-
- /* Verify the destination node is known and is a master. */
- if (c->node_id) {
- clusterNode *dest = clusterLookupNode(c->node_id, CLUSTER_NAMELEN);
- if (dest == NULL || !clusterNodeIsMaster(dest)) {
- addReplyErrorFormat(c, "Destination node %.40s is not a master", c->node_id);
- slotRangeArrayFree(slots);
- return;
- }
- }
-
- sds task_id = c->argv[3]->ptr;
- /* Notify the cluster implementation to prepare for the migrate task. */
- if (clusterAsmOnEvent(task_id, ASM_EVENT_MIGRATE_PREP, slots) != C_OK ||
- asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, ASM_NONE))
- {
- addReplyError(c, "-NOTREADY Cluster is not ready to migrate slots");
- slotRangeArrayFree(slots);
- return;
- }
-
- /* We do not start the migrate task if trim is disabled by module. */
- int disabled_by_module = server.cluster_module_trim_disablers > 0;
- if (disabled_by_module) {
- addReplyError(c, "Trim is disabled by module");
- slotRangeArrayFree(slots);
- return;
- }
-
- asmTask *task = listLength(asmManager->tasks) == 0 ? NULL :
- listNodeValue(listFirst(asmManager->tasks));
- if (task && !strcmp(task->id, task_id) &&
- task->operation == ASM_MIGRATE && task->state == ASM_FAILED &&
- slotRangeArrayIsEqual(slots, task->slots) &&
- memcmp(task->dest, c->node_id, CLUSTER_NAMELEN) == 0)
- {
- /* Reuse the failed task */
- asmTaskReset(task);
- slotRangeArrayFree(task->slots); /* Will be set again later */
- task->retry_count++;
- } else if (task) {
- if (task->state == ASM_FAILED) {
- /* We can create a new migrate task only if the current one is
- * failed, cancel the failed task to create a new one. */
- asmTaskCancel(task, "new migration requested");
- task = NULL;
- } else {
- addReplyError(c, "Another ASM task is already in progress");
- slotRangeArrayFree(slots);
- return;
- }
- }
-
- /* Create the migrate slots task and add it to the list,
- * otherwise reuse the existing one */
- if (task == NULL) {
- task = asmTaskCreate(task_id);
- task->start_time = server.mstime; /* Start immediately */
- serverAssert(listLength(asmManager->tasks) == 0);
- listAddNodeTail(asmManager->tasks, task);
- }
-
- task->slots = slots;
- task->operation = ASM_MIGRATE;
- memcpy(task->source, clusterNodeGetName(getMyClusterNode()), CLUSTER_NAMELEN);
- if (c->node_id) memcpy(task->dest, c->node_id, CLUSTER_NAMELEN);
-
- task->main_channel_client = c;
- c->task = task;
-
- /* We mark the main channel client as a replica, so this client is limited
- * by the client output buffer settings for replicas. The replstate has
- * no real significance, just to prevent it from going online. */
- c->flags |= (CLIENT_SLAVE | CLIENT_ASM_MIGRATING);
- c->replstate = SLAVE_STATE_WAIT_RDB_CHANNEL;
- if (server.repl_disable_tcp_nodelay)
- connDisableTcpNoDelay(c->conn); /* Non-critical if it fails. */
- listAddNodeTail(server.slaves, c);
- createReplicationBacklogIfNeeded();
-
- /* Wait for RDB channel to be ready */
- task->state = ASM_WAIT_RDBCHANNEL;
-
- sds slots_str = slotRangeArrayToString(slots);
- serverLog(LL_NOTICE, "Migrate task %s created: src=%.40s, dest=%.40s, slots=%s",
- task->id, task->source, task->dest, slots_str);
- sdsfree(slots_str);
-
- asmNotifyStateChange(task, ASM_EVENT_MIGRATE_STARTED);
-
- /* Keep the client in the main thread to avoid data races between the
- * connWrite call below and the client's event handler in IO threads. */
- if (c->tid != IOTHREAD_MAIN_THREAD_ID) keepClientInMainThread(c);
-
- /* addReply*() is not suitable for clients in SLAVE_STATE_WAIT_RDB_CHANNEL state. */
- if (connWrite(c->conn, "+RDBCHANNELSYNCSLOTS\r\n", 22) != 22)
- freeClientAsync(c);
- } else if (!strcasecmp(c->argv[2]->ptr, "rdbchannel") && c->argc == 4) {
- /* CLUSTER SYNCSLOTS RDBCHANNEL <task-id> */
- sds task_id = c->argv[3]->ptr;
- if (sdslen(task_id) != CLUSTER_NAMELEN) {
- addReplyError(c, "Invalid task id");
- return;
- }
-
- if (listLength(asmManager->tasks) == 0) {
- addReplyError(c, "No slot migration task in progress");
- return;
- }
-
- asmTask *task = listNodeValue(listFirst(asmManager->tasks));
- if (task->operation != ASM_MIGRATE || task->state != ASM_WAIT_RDBCHANNEL ||
- strcmp(task->id, task_id) != 0)
- {
- addReplyError(c, "Another migration task is already in progress");
- return;
- }
-
- if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, task->state))) {
- /* Close the main channel client before rdb channel client connects */
- if (task->main_channel_client)
- freeClient(task->main_channel_client);
- }
-
- /* The main channel client must be present when setting RDB channel client */
- if (task->main_channel_client == NULL) {
- /* Maybe the main channel connection is closed. */
- addReplyError(c, "Main channel connection is not established");
- return;
- }
-
- /* Mark the client as a slave to generate slots snapshot */
- c->flags |= (CLIENT_SLAVE | CLIENT_REPL_RDB_CHANNEL | CLIENT_REPL_RDBONLY | CLIENT_ASM_MIGRATING);
- c->slave_capa |= SLAVE_CAPA_EOF;
- c->slave_req |= (SLAVE_REQ_SLOTS_SNAPSHOT | SLAVE_REQ_RDB_CHANNEL);
- c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
- c->repldbfd = -1;
- if (server.repl_disable_tcp_nodelay)
- connDisableTcpNoDelay(c->conn); /* Non-critical if it fails. */
- listAddNodeTail(server.slaves, c);
-
- /* Wait for bgsave to start for slots sync */
- task->state = ASM_WAIT_BGSAVE_START;
- task->rdb_channel_state = ASM_WAIT_BGSAVE_START;
- task->rdb_channel_client = c;
- c->task = task;
-
- /* Keep the client in the main thread to avoid data races between the
- * connWrite call in startBgsaveForReplication and the client's event
- * handler in IO threads. */
- if (c->tid != IOTHREAD_MAIN_THREAD_ID) keepClientInMainThread(c);
-
- if (!hasActiveChildProcess()) {
- startBgsaveForReplication(c->slave_capa, c->slave_req);
- } else {
- serverLog(LL_NOTICE, "BGSAVE for slots snapshot sync delayed");
- }
- } else if (!strcasecmp(c->argv[2]->ptr, "snapshot-eof") && c->argc == 3) {
- /* CLUSTER SYNCSLOTS SNAPSHOT-EOF */
- clusterSyncSlotsSnapshotEOF(c);
- } else if (!strcasecmp(c->argv[2]->ptr, "stream-eof") && c->argc == 3) {
- /* CLUSTER SYNCSLOTS STREAM-EOF */
- clusterSyncSlotsStreamEOF(c);
- } else if (!strcasecmp(c->argv[2]->ptr, "ack") && c->argc == 5) {
- /* CLUSTER SYNCSLOTS ACK <state> <offset> */
- long long offset;
- int dest_state;
-
- if (!strcasecmp(c->argv[3]->ptr, asmTaskStateToString(ASM_STREAMING_BUF))) {
- dest_state = ASM_STREAMING_BUF;
- } else if (!strcasecmp(c->argv[3]->ptr, asmTaskStateToString(ASM_WAIT_STREAM_EOF))) {
- dest_state = ASM_WAIT_STREAM_EOF;
- } else {
- return; /* Not support now. */
- }
-
- if ((getLongLongFromObject(c->argv[4], &offset) != C_OK))
- return;
-
- if (c->task && c->task->operation == ASM_MIGRATE) {
- /* Update the state and ACKed offset from destination. */
- asmTask *task = c->task;
- task->dest_state = dest_state;
- if (task->dest_offset > (unsigned long long) offset) {
- serverLog(LL_WARNING, "CLUSTER SYNCSLOTS ACK received, dest state: %s, "
- "but offset %lld is less than the current dest offset %lld",
- asmTaskStateToString(dest_state), offset, task->dest_offset);
- return;
- }
- task->dest_offset = offset;
- serverLog(LL_DEBUG, "CLUSTER SYNCSLOTS ACK received, dest state: %s, "
- "updated dest offset to %lld, source offset: %lld",
- asmTaskStateToString(dest_state), task->dest_offset, task->source_offset);
-
- /* Record the time when the destination finishes applying the accumulated buffer */
- if (task->dest_state == ASM_WAIT_STREAM_EOF && task->dest_accum_applied_time == 0)
- task->dest_accum_applied_time = server.mstime;
-
- /* Pause write if needed */
- if (task->state == ASM_SEND_BULK_AND_STREAM || task->state == ASM_SEND_STREAM) {
- /* Pause writes on the main channel if the lag is less than the threshold. */
- if (task->dest_offset + server.asm_handoff_max_lag_bytes >= task->source_offset) {
- if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, ASM_HANDOFF_PREP)))
- return; /* Do not enter handoff prep state for testing buffer drain timeout. */
-
- serverLog(LL_NOTICE, "The applied offset lag %lld is less than the threshold %lld, "
- "pausing writes for slot handoff",
- task->source_offset - task->dest_offset,
- server.asm_handoff_max_lag_bytes);
- task->state = ASM_HANDOFF_PREP;
- asmLogTaskEvent(task, ASM_EVENT_HANDOFF_PREP);
- clusterAsmOnEvent(task->id, ASM_EVENT_HANDOFF_PREP, task->slots);
- }
- }
- }
- } else if (!strcasecmp(c->argv[2]->ptr, "fail") && c->argc == 4) {
- /* CLUSTER SYNCSLOTS FAIL <err> */
- return; /* This is a no-op, just to handle the command syntax. */
- } else if (!strcasecmp(c->argv[2]->ptr, "conf") && c->argc >= 5) {
- /* CLUSTER SYNCSLOTS CONF <option> <value> [<option> <value>] */
- for (int j = 3; j < c->argc; j += 2) {
- if (j + 1 >= c->argc) {
- addReplyErrorArity(c);
- return;
- }
- /* Handle each option here */
- if (!strcasecmp(c->argv[j]->ptr, "node-id")) {
- /* node-id <node-id> */
- sds node_id = c->argv[j + 1]->ptr;
- int node_id_len = (int) sdslen(node_id);
- if (node_id_len != CLUSTER_NAMELEN) {
- addReplyErrorFormat(c, "Invalid node id length %d", node_id_len);
- return;
- }
-
- /* Lookup the node in the cluster. */
- clusterNode *node = clusterLookupNode(node_id, node_id_len);
- if (node == NULL) {
- addReplyErrorFormat(c, "Node %s not found in cluster", node_id);
- return;
- }
-
- if (c->node_id) sdsfree(c->node_id);
- c->node_id = sdsdup(node_id);
- } else if (!strcasecmp(c->argv[j]->ptr, "slot-info")) {
- /* slot-info slot:key_size:expire_size */
- int count;
- long long slot, key_size, expire_size;
- sds slot_info = c->argv[j + 1]->ptr;
- sds *parts = sdssplitlen(slot_info, sdslen(slot_info), ":", 1, &count);
-
- /* Validate the slot info format, parse slot, key_size, expire_size */
- if (parts == NULL || count != 3 ||
- (string2ll(parts[0], sdslen(parts[0]), &slot) == 0 || slot < 0 || slot >= CLUSTER_SLOTS) ||
- (string2ll(parts[1], sdslen(parts[1]), &key_size) == 0 || key_size < 0) ||
- (string2ll(parts[2], sdslen(parts[2]), &expire_size) == 0 || expire_size < 0))
- {
- addReplyErrorFormat(c, "Invalid slot info: %s", slot_info);
- sdsfreesplitres(parts, count);
- return;
- }
-
- /* We resize individual slot specific dictionaries. */
- redisDb *db = c->db;
- serverAssert(db->id == 0); /* Only support DB 0 for cluster mode. */
- kvstoreDictExpand(db->keys, slot, key_size);
- kvstoreDictExpand(db->expires, slot, expire_size);
-
- sdsfreesplitres(parts, count);
- } else if (!strcasecmp(c->argv[j]->ptr, "asm-task")) {
- /* asm-task task_id:source_node:dest_node:operation:state:slot_ranges */
- if (clusterNodeIsMaster(getMyClusterNode())) {
- addReplyError(c, "CLUSTER SYNCSLOTS CONF ASM-TASK only allowed on replica");
- return;
- }
- if (asmReplicaHandleMasterTask(c->argv[j + 1]->ptr) != C_OK) {
- addReplyErrorFormat(c, "Failed to handle master task: %s",
- (char *)c->argv[j + 1]->ptr);
- }
- } else if (!strcasecmp(c->argv[j]->ptr, "capa")) {
- /* Ignore unrecognized capabilities. This is for future extensions. */
- } else {
- addReplyErrorFormat(c, "Unknown option %s", (char *)c->argv[j]->ptr);
- }
- }
- addReply(c, shared.ok);
- } else {
- addReplyErrorObject(c, shared.syntaxerr);
- }
-}
-
-/* Save a key-value pair to stream I/O using either RESTORE or AOF format. */
-static int slotSnapshotSaveKeyValuePair(rio *rdb, kvobj *o, int dbid) {
- /* Get the expire time */
- long long expiretime = kvobjGetExpire(o);
-
- /* Set on stack string object for key */
- robj key;
- initStaticStringObject(key, kvobjGetKey(o));
-
- /* If module object or non-string object that is not too big,
- * use RESTORE command (RDB format) to migrate data.
- * Generally RDB binary format is more efficient, but it may cause
- * block in the destination if the object is too large, so fall back
- * to AOF format if necessary. */
- if ((o->type == OBJ_MODULE) ||
- (o->type != OBJ_STRING && getObjectLength(o) <= ASM_AOF_MIN_ITEMS_PER_KEY))
- {
- if (rioWriteBulkCount(rdb, '*', 5) == 0) return C_ERR;
- if (rioWriteBulkString(rdb, "RESTORE", 7) == 0) return C_ERR;
- if (rioWriteBulkObject(rdb, &key) == 0) return C_ERR;
- if (rioWriteBulkLongLong(rdb, expiretime == -1 ? 0 : expiretime) == 0) return C_ERR;
-
- /* Create the DUMP encoded representation. */
- rio payload;
- createDumpPayload(&payload, o, &key, dbid, 1);
- sds buf = payload.io.buffer.ptr;
- if (rioWriteBulkString(rdb, buf, sdslen(buf)) == 0) {
- sdsfree(payload.io.buffer.ptr);
- return C_ERR;
- }
- sdsfree(payload.io.buffer.ptr);
-
- /* Write ABSTTL */
- if (rioWriteBulkString(rdb, "ABSTTL", 6) == 0) return C_ERR;
- } else {
- /* Use AOF format to migrate data */
- if (rewriteObject(rdb, &key, o, dbid, expiretime) == C_ERR) return C_ERR;
- }
-
- return C_OK;
-}
-
-/* Modules can use RM_ClusterPropagateForSlotMigration() during the
- * CLUSTER_SLOT_MIGRATION_MIGRATE_MODULE_PROPAGATE event to propagate commands
- * that should be delivered just before the slot snapshot delivery starts. This
- * function triggers the event, collects the commands and writes them to the rio. */
-static int propagateModuleCommands(asmTask *task, rio *rdb) {
- RedisModuleClusterSlotMigrationInfo info = {
- .version = REDISMODULE_CLUSTER_SLOT_MIGRATION_INFO_VERSION,
- .task_id = task->id,
- .slots = (RedisModuleSlotRangeArray *) task->slots
- };
- memcpy(info.source_node_id, task->source, CLUSTER_NAMELEN);
- memcpy(info.destination_node_id, task->dest, CLUSTER_NAMELEN);
-
- task->pre_snapshot_module_cmds = zcalloc(sizeof(*task->pre_snapshot_module_cmds));
- moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION,
- REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_MODULE_PROPAGATE,
- &info
- );
-
- int ret = C_OK;
- /* Write the module commands to the rio */
- for (int i = 0; i < task->pre_snapshot_module_cmds->numops; i++) {
- redisOp *op = &task->pre_snapshot_module_cmds->ops[i];
- if (rioWriteBulkCount(rdb, '*', op->argc) == 0) {
- ret = C_ERR;
- break;
- }
- for (int j = 0; j < op->argc; j++)
- if (rioWriteBulkObject(rdb, op->argv[j]) == 0) {
- ret = C_ERR;
- break;
- }
- }
- redisOpArrayFree(task->pre_snapshot_module_cmds);
- zfree(task->pre_snapshot_module_cmds);
- task->pre_snapshot_module_cmds = NULL;
- return ret;
-}
-
-/* Save the slot ranges snapshot to the file. It generates the DUMP encoded
- * representation of each key in the slot ranges and writes it to the file.
- *
- * Returns C_OK on success, or C_ERR on error. */
-int slotSnapshotSaveRio(int req, rio *rdb, int *error) {
- serverAssert(req & SLAVE_REQ_SLOTS_SNAPSHOT);
-
- dictEntry *de;
- kvstoreDictIterator kvs_di;
-
- if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_RDB_CHANNEL, ASM_SEND_BULK_AND_STREAM)))
- rioAbort(rdb); /* Simulate a failure */
-
- /* Disable RDB compression for slots snapshot since compression is too
- * expensive both in source and destination. */
- server.rdb_compression = 0;
-
- /* Only support a single migrate task */
- serverAssert(listLength(asmManager->tasks) == 1);
- asmTask *task = listNodeValue(listFirst(asmManager->tasks));
- serverAssert(task->operation == ASM_MIGRATE);
-
- if (propagateModuleCommands(task, rdb) == C_ERR) goto werr;
-
- /* Dump functions and send to destination side. */
- rio payload;
- createFunctionDumpPayload(&payload);
- sds functions = payload.io.buffer.ptr;
- if (rioWriteBulkCount(rdb, '*', 4) == 0) goto werr;
- if (rioWriteBulkString(rdb, "FUNCTION", 8) == 0) goto werr;
- if (rioWriteBulkString(rdb, "RESTORE", 7) == 0) goto werr;
- if (rioWriteBulkString(rdb, functions, sdslen(functions)) == 0) {
- sdsfree(payload.io.buffer.ptr);
- goto werr;
- }
- sdsfree(payload.io.buffer.ptr);
- /* Add the REPLACE option to the RESTORE command, to avoid error
- * when migrating to a node with existing libraries. */
- if (rioWriteBulkString(rdb, "REPLACE", 7) == 0) goto werr;
-
- for (int i = 0; i < server.dbnum; i++) {
- char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
- redisDb *db = server.db + i;
- if (kvstoreSize(db->keys) == 0) continue;
-
- /* SELECT the new DB */
- if (rioWrite(rdb,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
- if (rioWriteBulkLongLong(rdb, i) == 0) goto werr;
-
- /* Iterate all slot ranges, and generate the DUMP encoded
- * representation of each key in the DB. */
- for (int j = 0; j < task->slots->num_ranges; j++) {
- slotRange *sr = &task->slots->ranges[j];
- /* Iterate all keys in the slot range */
- for (int k = sr->start; k <= sr->end; k++) {
- int send_slot_info = 0;
-
- kvstoreInitDictIterator(&kvs_di, server.db->keys, k);
- while ((de = kvstoreDictIteratorNext(&kvs_di)) != NULL) {
- /* Send slot info before the first key in the slot */
- if (!send_slot_info) {
- /* Format slot info */
- char buf[128];
- int len = snprintf(buf, sizeof(buf), "%d:%lu:%lu",
- k, kvstoreDictSize(db->keys, k),
- kvstoreDictSize(db->expires, k));
- serverAssert(len > 0 && len < (int)sizeof(buf));
-
- /* Send slot info */
- if (rioWriteBulkCount(rdb, '*', 5) == 0) goto werr2;
- if (rioWriteBulkString(rdb, "CLUSTER", 7) == 0) goto werr2;
- if (rioWriteBulkString(rdb, "SYNCSLOTS", 9) == 0) goto werr2;
- if (rioWriteBulkString(rdb, "CONF", 4) == 0) goto werr2;
- if (rioWriteBulkString(rdb, "SLOT-INFO", 9) == 0) goto werr2;
- if (rioWriteBulkString(rdb, buf, len) == 0) goto werr2;
- send_slot_info = 1;
- }
-
- /* Save a key-value pair */
- kvobj *o = dictGetKV(de);
- if (slotSnapshotSaveKeyValuePair(rdb, o, db->id) == C_ERR) goto werr2;
-
- /* Delay return if required (for testing) */
- if (unlikely(server.rdb_key_save_delay)) {
- /* Send buffer to the destination ASAP. */
- if (rioFlush(rdb) == 0) goto werr2;
- debugDelay(server.rdb_key_save_delay);
- }
- }
- kvstoreResetDictIterator(&kvs_di);
- }
- }
- }
-
- /* Write the end of the snapshot file command */
- if (rioWriteBulkCount(rdb, '*', 3) == 0) goto werr;
- if (rioWriteBulkString(rdb, "CLUSTER", 7) == 0) goto werr;
- if (rioWriteBulkString(rdb, "SYNCSLOTS", 9) == 0) goto werr;
- if (rioWriteBulkString(rdb, "SNAPSHOT-EOF", 12) == 0) goto werr;
- return C_OK;
-
-werr2:
- kvstoreResetDictIterator(&kvs_di);
-werr:
- if (error) *error = errno;
- return C_ERR;
-}
-
-/* Read error handler for sync buffer */
-static void asmReadSyncBufferErrorHandler(connection *conn) {
- if (listLength(asmManager->tasks) == 0) return;
- asmTask *task = listNodeValue(listFirst(asmManager->tasks));
- if (task->state != ASM_ACCUMULATE_BUF && task->state != ASM_STREAMING_BUF) return;
-
- if (task->state == ASM_STREAMING_BUF) {
- freeClient(connGetPrivateData(conn));
- } else {
- asmTaskSetFailed(task, "Main channel - Read error: %s", connGetLastError(conn));
- }
-}
-
-/* Read data from connection into sync buffer. */
-static void asmSyncBufferReadFromConn(connection *conn) {
- /* The task may be canceled (move to finished list) or failed during streaming buffer. */
- if (listLength(asmManager->tasks) == 0) return;
- asmTask *task = listNodeValue(listFirst(asmManager->tasks));
- if (task->state != ASM_ACCUMULATE_BUF && task->state != ASM_STREAMING_BUF) return;
-
- /* ASM_ACCUMULATE_BUF and ASM_STREAMING_BUF fail points are handled here */
- if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_MAIN_CHANNEL, task->state)))
- shutdown(conn->fd, SHUT_RDWR);
-
- replDataBuf *buf = &task->sync_buffer;
- if (task->state == ASM_STREAMING_BUF) {
- /* While streaming accumulated buffers, we continue reading from the
- * source to prevent accumulation on source side as much as possible.
- * However, we aim to drain buffer eventually. To ensure we consume more
- * than we read, we'll read at most one block after two blocks of
- * buffers are consumed. */
- if (listLength(buf->blocks) + 1 >= buf->last_num_blocks)
- return;
- buf->last_num_blocks = listLength(buf->blocks);
- }
-
- replDataBufReadFromConn(conn, buf, asmReadSyncBufferErrorHandler);
-}
-
-static void asmSyncBufferStreamYieldCallback(void *ctx) {
- replDataBufToDbCtx *context = ctx;
- asmTask *task = context->privdata;
- client *c = context->client;
-
- char offset[64];
- ull2string(offset, sizeof(offset), context->applied_offset);
-
- char *err = sendCommand(c->conn, "CLUSTER", "SYNCSLOTS", "ACK",
- asmTaskStateToString(task->state), offset, NULL);
- if (err) {
- serverLog(LL_WARNING, "Error sending CLUSTER SYNCSLOTS ACK: %s", err);
- sdsfree(err);
- freeClient(c);
- }
- serverLog(LL_DEBUG, "Yielding sending ACK during streaming buffer, applied offset: %zu",
- context->applied_offset);
-}
-
-static int asmSyncBufferStreamShouldContinue(void *ctx) {
- replDataBufToDbCtx *context = ctx;
-
- /* If the task is failed or canceled, we should stop streaming immediately. */
- asmTask *task = context->privdata;
- if (task->state == ASM_FAILED || task->state == ASM_CANCELED) return 0;
-
- /* Check the client-close flag only if the task has not failed or been canceled,
- * otherwise the client may have already been freed. */
- if (context->client->flags & CLIENT_CLOSE_ASAP) return 0;
-
- return 1;
-}
-
-/* Stream the sync buffer to the database. */
-void asmSyncBufferStreamToDb(asmTask *task) {
- task->state = ASM_STREAMING_BUF;
- serverLog(LL_NOTICE, "Starting to stream accumulated buffer for the import task (%zu bytes)",
- task->sync_buffer.used);
-
- /* The buffered stream from the main channel connection into
- * the database is processed by a fake client. */
- client *c = createClient(task->main_channel_conn);
- c->flags |= (CLIENT_MASTER | CLIENT_INTERNAL | CLIENT_ASM_IMPORTING);
- c->querybuf = sdsempty();
- c->authenticated = 1;
- c->user = NULL;
- c->task = task;
-
- /* Mark the peek buffer block count. We'll use it to verify we consume
- * faster than we read from the source side. */
- task->sync_buffer.last_num_blocks = listLength(task->sync_buffer.blocks);
-
- /* Continue accumulating during streaming to prevent accumulation on source side. */
- connSetReadHandler(c->conn, asmSyncBufferReadFromConn);
-
- replDataBufToDbCtx ctx = {
- .privdata = task,
- .client = c,
- .applied_offset = 0,
- .should_continue = asmSyncBufferStreamShouldContinue,
- .yield_callback = asmSyncBufferStreamYieldCallback,
- };
-
- /* Start streaming the buffer to the DB. This task may fail due to network
- * errors or cancellations. We never release the task immediately; instead,
- * it may be moved to the finished list. The actual free happens in serverCron,
- * which ensures there is no use-after-free issue. */
- int ret = replDataBufStreamToDb(&task->sync_buffer, &ctx);
-
- if (ret == C_OK) {
- if (task->stream_eof_during_streaming) {
- /* STREAM-EOF received during streaming, we can take over now. */
- asmImportTakeover(task);
- return;
- }
-
- /* Update the dest offset according to applied bytes. */
- task->dest_offset = ctx.applied_offset;
- /* Wait STREAM-EOF from the source node. */
- task->state = ASM_WAIT_STREAM_EOF;
- connSetReadHandler(task->main_channel_conn, readQueryFromClient);
- serverLog(LL_NOTICE, "Successfully streamed accumulated buffer for the import task, applied offset: %lld",
- task->dest_offset);
-
- if (unlikely(asmDebugIsFailPointActive(ASM_IMPORT_MAIN_CHANNEL, task->state)))
- shutdown(task->main_channel_conn->fd, SHUT_RDWR); /* Simulate a failure */
-
- /* ACK offset after streaming buffer is done. */
- asmImportSendACK(task);
- } else {
- /* If the task is already canceled or failed, we don't need to do anything here. */
- if (task->state == ASM_FAILED || task->state == ASM_CANCELED) return;
-
- asmTaskSetFailed(task, "Main channel - Failed to stream into the DB");
- }
-}
-
-void asmImportIncrAppliedBytes(struct asmTask *task, size_t bytes) {
- if (!task || task->state != ASM_WAIT_STREAM_EOF) return;
- task->dest_offset += bytes;
-}
-
-/* Send STREAM-EOF if the sync buffer stream is drained. */
-void asmSendStreamEofIfDrained(asmTask *task) {
- client *c = task->main_channel_client;
-
- /* The command streams for slot ranges have been drained. */
- if (!clientHasPendingReplies(c)) {
- serverLog(LL_NOTICE, "Slot migration command stream drained, sending STREAM-EOF to the destination");
-
- if (unlikely(asmDebugIsFailPointActive(ASM_MIGRATE_MAIN_CHANNEL, task->state)))
- shutdown(c->conn->fd, SHUT_RDWR);
-
- /* Send STREAM-EOF to indicate the end of the stream. */
- char *err = sendCommand(c->conn, "CLUSTER", "SYNCSLOTS", "STREAM-EOF", NULL);
- if (err) {
- asmTaskSetFailed(task, "Main channel - Failed to send STREAM-EOF: %s", err);
- sdsfree(err);
- return;
- }
-
- /* Even though the main channel client is no longer needed, we
- * can't close it directly because the destination may still be
- * sending ACKs over this connection. Instead, we leave it to the
- * destination to close it. We just clear the task and client
- * references */
- task->main_channel_client->task = NULL;
- task->main_channel_client = NULL;
-
- /* There may be a delay to handle the disconnection of RDB channel,
- * so we clear the task and client references here. */
- if (task->rdb_channel_client != NULL) {
- task->rdb_channel_state = ASM_COMPLETED;
- task->rdb_channel_client->task = NULL;
- freeClientAsync(task->rdb_channel_client);
- task->rdb_channel_client = NULL;
- }
-
- task->state = ASM_STREAM_EOF;
- }
-}
-
-void asmBeforeSleep(void) {
- asmTrimJobProcessPending();
-
- if (listLength(asmManager->tasks) == 0) return;
- asmTask *task = listNodeValue(listFirst(asmManager->tasks));
-
- if (task->operation == ASM_IMPORT) {
- if (task->state == ASM_NONE)
- asmStartImportTask(task);
- else if (task->state == ASM_READY_TO_STREAM)
- asmSyncBufferStreamToDb(task);
- }
-
- if (task->operation == ASM_MIGRATE) {
- if (task->cross_slot_during_propagating) {
- asmTaskCancel(task, "propagating cross slot command");
- return;
- }
-
- if (task->state == ASM_HANDOFF) {
- /* To avoid long pause, we fail the task if the pause takes too long. */
- if (server.mstime - task->paused_time >= server.asm_write_pause_timeout) {
- asmTaskSetFailed(task, "Server paused timeout");
- return;
- }
- asmSendStreamEofIfDrained(task);
- } else if (task->state == ASM_STREAM_EOF) {
- /* In state ASM_STREAM_EOF (server is still paused), we are waiting
- * for the destination node to broadcast the slot ownership change.
- * But maybe the destination node is failed or network is not available,
- * the source node may be paused forever. So we fail the task if it
- * takes too long.
- *
- * NOTE: There is a tricky case where the destination node may advertise
- * ownership of the slot, causing a temporary configuration conflict.
- * However, the configuration will eventually converge. In most cases,
- * the destination node becomes the winner, since it bumps its config
- * epoch before taking over slot ownership. */
- if (server.mstime - task->paused_time >= server.asm_write_pause_timeout)
- asmTaskSetFailed(task, "Server paused timeout");
- }
- }
-}
-
-void asmCron(void) {
- static unsigned long long asm_cron_runs = 0;
- asm_cron_runs++;
-
- if (listLength(asmManager->tasks) == 0) return;
- asmTask *task = listNodeValue(listFirst(asmManager->tasks));
-
- if (task->operation == ASM_IMPORT) {
- if (task->state == ASM_FAILED) {
- /* Retry every 1 second */
- if (asm_cron_runs % 10 == 0) {
- asmTaskReset(task);
- task->retry_count++;
- serverAssert(task->state == ASM_NONE);
- asmStartImportTask(task);
- }
- } else if (task->state == ASM_WAIT_STREAM_EOF) {
- if (asmImportSendACK(task) == C_ERR) return;
-
- /* Check if the main channel is timed out */
- client *c = connGetPrivateData(task->main_channel_conn);
- serverAssert(c->task == task);
- if (server.unixtime - c->lastinteraction > server.repl_timeout)
- asmTaskSetFailed(task, "Main channel - Connection timeout");
- } else if (task->state == ASM_ACCUMULATE_BUF &&
- task->rdb_channel_state == ASM_RDBCHANNEL_TRANSFER)
- {
- /* Check if the RDB channel is timed out */
- client *c = connGetPrivateData(task->rdb_channel_conn);
- serverAssert(c->task == task);
- if (server.unixtime - c->lastinteraction > server.repl_timeout)
- asmTaskSetFailed(task, "RDB channel - Connection timeout");
- } else if (task->state == ASM_SEND_SYNCSLOTS) {
- /* Rare case: the source node replied to SYNCSLOTS with -NOTREADY
- * because it wasn't ready to start a migration. We'll retry
- * SYNCSLOTS every second instead of failing the attempt which could
- * trigger unnecessary cleanup in the cluster implementation. */
- if (asm_cron_runs % 10 == 0)
- asmSyncWithSource(task->main_channel_conn);
- }
- } else if (task->operation == ASM_MIGRATE) {
- if (task->state == ASM_SEND_STREAM) {
- /* Currently, we only need to check the main channel timeout when sending streams.
- * For RDB channel connections, the timeout is handled by the socket itself
- * during writes in slotSnapshotSaveRio. */
- if (server.unixtime - task->main_channel_client->lastinteraction > server.repl_timeout)
- asmTaskSetFailed(task, "Main channel - Connection timeout");
-
- /* After the destination applies the accumulated buffer, the source continues
- * sending commands for migrating slots. The destination keeps applying them,
- * but the gap remains above the acceptable limit, which may cause endless
- * synchronization. A timeout check is required to handle this case.
- *
- * The timeout is calculated as the maximum of two values:
- * - A configurable timeout (cluster-slot-migration-sync-buffer-drain-timeout) to
- * avoid false positives.
- * - A dynamic timeout based on the time that the destination took to apply the
- * slot snapshot and the accumulated buffer during slot snapshot delivery.
- * The destination should be able to drain the remaining sync buffer in less
- * time than this. We multiply it by 2 to be more conservative. */
- if (task->dest_state == ASM_WAIT_STREAM_EOF && task->dest_accum_applied_time &&
- server.mstime - task->dest_accum_applied_time >
- max(server.asm_sync_buffer_drain_timeout,
- (task->dest_accum_applied_time - task->dest_slots_snapshot_time) * 2))
- {
- asmTaskSetFailed(task, "Sync buffer drain timeout");
- }
- }
- }
-
- /* Trim the archived tasks list if it grows too large */
- while (listLength(asmManager->archived_tasks) > (unsigned long)server.asm_max_archived_tasks) {
- asmTask *oldest = listNodeValue(listLast(asmManager->archived_tasks));
- asmTaskFree(oldest);
- listDelNode(asmManager->archived_tasks, listLast(asmManager->archived_tasks));
- }
-}
-
-/* Cancel a specific task if ID is provided, otherwise cancel all tasks. */
-int clusterAsmCancel(const char *task_id, const char *reason) {
- if (asmManager == NULL) return 0;
-
- if (task_id) {
- asmTask *task = asmLookupTaskById(task_id);
- if (!task) return 0; /* Not found */
-
- asmTaskCancel(task, reason);
- return 1;
- } else {
- int num_cancelled = 0;
- listIter li;
- listNode *ln;
-
- listRewind(asmManager->tasks, &li);
- while ((ln = listNext(&li)) != NULL) {
- asmTask *task = listNodeValue(ln);
- asmTaskCancel(task, reason);
- num_cancelled++;
- }
- return num_cancelled;
- }
-}
-
-/* Cancel all tasks that overlap with the given slot ranges.
- * If slots is NULL, cancel all tasks. */
-int clusterAsmCancelBySlotRangeArray(struct slotRangeArray *slots, const char *reason) {
- if (asmManager == NULL) return 0;
-
- int num_cancelled = 0;
- listIter li;
- listNode *ln;
- listRewind(asmManager->tasks, &li);
- while ((ln = listNext(&li)) != NULL) {
- asmTask *task = listNodeValue(ln);
- if (!slots || slotRangeArraysOverlap(task->slots, slots)) {
- asmTaskCancel(task, reason);
- num_cancelled++;
- }
- }
- return num_cancelled;
-}
-
-/* Cancel the task that overlap with the given slot. */
-int clusterAsmCancelBySlot(int slot, const char *reason) {
- slotRange req = {slot, slot};
- if (asmManager == NULL) return 0;
-
- /* Cancel it if found. */
- asmTask *task = lookupAsmTaskBySlotRange(&req);
- if (task) asmTaskCancel(task, reason);
-
- return task ? 1 : 0;
-}
-
-/* Cancel all tasks that involve the given node. */
-int clusterAsmCancelByNode(void *node, const char *reason) {
- if (asmManager == NULL || node == NULL) return 0;
-
- /* If the node to be deleted is myself, cancel all tasks. */
- clusterNode *n = node;
- if (n == getMyClusterNode()) return clusterAsmCancel(NULL, reason);
-
- int num_cancelled = 0;
- listIter li;
- listNode *ln;
- listRewind(asmManager->tasks, &li);
- while ((ln = listNext(&li)) != NULL) {
- asmTask *task = listNodeValue(ln);
- /* Cancel the task if the source node is the one to be deleted, or
- * the dest node is the one to be deleted. */
- if (task->source_node == n ||
- !memcmp(task->dest, clusterNodeGetName(n), CLUSTER_NAMELEN) ||
- !memcmp(task->source, clusterNodeGetName(n), CLUSTER_NAMELEN))
- {
- asmTaskCancel(task, reason);
- num_cancelled++;
- }
- }
- return num_cancelled;
-}
-
-/* Check if the slot is in an active ASM task. */
-int isSlotInAsmTask(int slot) {
- slotRange req = {slot, slot};
- if (!asmManager) return 0;
-
- listIter li;
- listNode *ln;
- listRewind(asmManager->tasks, &li);
- while ((ln = listNext(&li)) != NULL) {
- asmTask *task = listNodeValue(ln);
- if (slotRangeArrayOverlaps(task->slots, &req))
- return 1;
- }
- return 0;
-}
-
-/* Check if the slot is in a pending trim job. It may happen if we can't trim
- * the slots immediately due to a write pause or when active trim is in progress. */
-int isSlotInTrimJob(int slot) {
- slotRange req = {slot, slot};
-
- if (!asmManager || !asmIsTrimInProgress()) return 0;
-
- /* Check if the slot is in any pending trim job. */
- listIter li;
- listNode *ln;
- listRewind(asmManager->pending_trim_jobs, &li);
- while ((ln = listNext(&li)) != NULL) {
- slotRangeArray *slots = listNodeValue(ln);
- if (slotRangeArrayOverlaps(slots, &req))
- return 1;
- }
-
- /* Check if the slot is in any active trim job. */
- listRewind(asmManager->active_trim_jobs, &li);
- while ((ln = listNext(&li)) != NULL) {
- slotRangeArray *slots = listNodeValue(ln);
- if (slotRangeArrayOverlaps(slots, &req))
- return 1;
- }
- return 0;
-}
-
-int clusterAsmHandoff(const char *task_id, sds *err) {
- serverAssert(task_id);
-
- asmTask *task = asmLookupTaskById(task_id);
- if (!task || task->state != ASM_HANDOFF_PREP) {
- *err = sdscatprintf(sdsempty(), "No suitable ASM task found for id: %s, task_state: %s",
- task_id, task ? asmTaskStateToString(task->state) : "null");
- return C_ERR;
- }
-
- task->state = ASM_HANDOFF;
- task->paused_time = server.mstime;
-
- return C_OK;
-}
-
-/* Notify Redis that the config is updated for the task. */
-int asmNotifyConfigUpdated(asmTask *task, sds *err) {
- int event = -1;
-
- if (task->operation == ASM_IMPORT && task->state == ASM_TAKEOVER) {
- event = ASM_EVENT_IMPORT_COMPLETED;
- } else if (task->operation == ASM_MIGRATE && task->state == ASM_STREAM_EOF) {
- event = ASM_EVENT_MIGRATE_COMPLETED;
- } else {
- *err = sdscatprintf(sdsempty(),
- "ASM task is not in the correct state for config update: %s",
- asmTaskStateToString(task->state));
- asmTaskCancel(task, "slots configuration updated");
- return C_ERR;
- }
-
- /* Reset per-slot statistics for the migrated/imported ranges.
- * Note: cluster_legacy.c also cleans up, so this may run twice, but
- * required if an alternative cluster impl is in use. */
- for (int i = 0; i < task->slots->num_ranges; i++) {
- slotRange *sr = &task->slots->ranges[i];
- for (int j = sr->start; j <= sr->end; j++)
- clusterSlotStatReset(j);
- }
-
- /* Clear error message if successful. */
- sdsfree(task->error);
- task->error = sdsempty();
- task->state = ASM_COMPLETED;
-
- asmNotifyStateChange(task, event);
- asmTaskFinalize(task);
-
- /* Trim the slots after the migrate task is completed. */
- if (event == ASM_EVENT_MIGRATE_COMPLETED)
- asmTrimJobSchedule(task->slots);
-
- return C_OK;
-}
-
-/* Import/Migrate task is done, config is updated. */
-int clusterAsmDone(const char *task_id, sds *err) {
- serverAssert(task_id);
-
- asmTask *task = asmLookupTaskById(task_id);
- if (!task) {
- *err = sdscatprintf(sdsempty(), "No ASM task found for id: %s", task_id);
- return C_ERR;
- }
- return asmNotifyConfigUpdated(task, err);
-}
-
-int clusterAsmProcess(const char *task_id, int event, void *arg, char **err) {
- int ret, num_cancelled;
- sds errsds = NULL;
- static char buf[256];
-
- if (err) *err = NULL;
-
- switch (event) {
- case ASM_EVENT_IMPORT_START: {
- /* Validate the slot ranges. */
- slotRangeArray *slots = slotRangeArrayDup(arg);
- if (slotRangeArrayNormalizeAndValidate(slots, &errsds) != C_OK) {
- slotRangeArrayFree(slots);
- ret = C_ERR;
- break;
- }
- ret = asmCreateImportTask(task_id, slots, &errsds) ? C_OK : C_ERR;
- break;
- }
- case ASM_EVENT_CANCEL: {
- num_cancelled = clusterAsmCancel(task_id, "user request");
- if (arg) *((int *)arg) = num_cancelled;
- ret = C_OK;
- break;
- }
- case ASM_EVENT_HANDOFF: {
- ret = clusterAsmHandoff(task_id, &errsds);
- break;
- }
- case ASM_EVENT_DONE: {
- ret = clusterAsmDone(task_id, &errsds);
- break;
- }
- default: {
- ret = C_ERR;
- errsds = sdscatprintf(sdsempty(), "Unknown operation: %d", event);
- break;
- }
- }
-
- if (ret != C_OK && errsds && err) {
- snprintf(buf, sizeof(buf), "%s", errsds);
- *err = buf;
- }
- sdsfree(errsds);
-
- return ret;
-}
-
-/* Propagate TRIMSLOTS command to AOF and replicas. */
-static void propagateTrimSlots(slotRangeArray *slots) {
- int argc = slots->num_ranges * 2 + 3;
- robj **argv = zmalloc(sizeof(robj*) * argc);
- argv[0] = createStringObject("TRIMSLOTS", 9);
- argv[1] = createStringObject("RANGES", 6);
- argv[2] = createStringObjectFromLongLong(slots->num_ranges);
- for (int i = 0; i < slots->num_ranges; i++) {
- argv[i*2+3] = createStringObjectFromLongLong(slots->ranges[i].start);
- argv[i*2+4] = createStringObjectFromLongLong(slots->ranges[i].end);
- }
-
- enterExecutionUnit(1, 0);
-
- int prev_replication_allowed = server.replication_allowed;
- server.replication_allowed = 1;
- alsoPropagate(-1, argv, argc, PROPAGATE_AOF | PROPAGATE_REPL);
- server.replication_allowed = prev_replication_allowed;
-
- exitExecutionUnit();
- postExecutionUnitOperations();
-
- for (int i = 0; i < argc; i++)
- decrRefCount(argv[i]);
- zfree(argv);
-}
-
-/* If this node is a replica and there is an active trim or a pending trim
- * job (due to write pause), we cannot process commands from the master for the
- * slots that are waiting to be trimmed. Otherwise, the trim cycle could
- * mistakenly delete newly added keys. In this case, the master will be blocked
- * until the trim job finishes. This is supposed to be a rare event as it needs
- * to migrate slots and import them back before the trim job is done. */
-void asmUnblockMasterAfterTrim(void) {
- if (server.master &&
- server.master->flags & CLIENT_BLOCKED &&
- server.master->bstate.btype == BLOCKED_POSTPONE_TRIM)
- {
- unblockClient(server.master, 1);
- serverLog(LL_NOTICE, "Unblocking master client after active trim is completed");
- }
-}
-
-/* Trim the slots asynchronously in the BIO thread. */
-void asmTriggerBackgroundTrim(slotRangeArray *slots) {
- RedisModuleClusterSlotMigrationTrimInfoV1 fsi = {
- REDISMODULE_CLUSTER_SLOT_MIGRATION_TRIMINFO_VERSION,
- (RedisModuleSlotRangeArray *) slots
- };
-
- moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION_TRIM,
- REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_BACKGROUND,
- &fsi);
-
- signalFlushedDb(0, 1, slots);
-
- /* Create temp kvstores and estore, move relevant slot dicts/ebuckets into them,
- * and delete them in BIO thread asynchronously. */
- kvstore *keys = kvstoreCreate(&kvstoreBaseType, &dbDictType,
- CLUSTER_SLOT_MASK_BITS,
- KVSTORE_ALLOCATE_DICTS_ON_DEMAND);
- kvstore *expires = kvstoreCreate(&kvstoreBaseType, &dbExpiresDictType,
- CLUSTER_SLOT_MASK_BITS,
- KVSTORE_ALLOCATE_DICTS_ON_DEMAND);
- estore *subexpires = estoreCreate(&subexpiresBucketsType, CLUSTER_SLOT_MASK_BITS);
-
- size_t total_keys = 0;
-
- for (int i = 0; i < slots->num_ranges; i++) {
- for (int slot = slots->ranges[i].start; slot <= slots->ranges[i].end; slot++) {
- total_keys += kvstoreDictSize(server.db[0].keys, slot);
- kvstoreMoveDict(server.db[0].keys, keys, slot);
- kvstoreMoveDict(server.db[0].expires, expires, slot);
- estoreMoveEbuckets(server.db[0].subexpires, subexpires, slot);
- }
- }
-
- emptyDbDataAsync(keys, expires, subexpires);
-
- sds str = slotRangeArrayToString(slots);
- serverLog(LL_NOTICE, "Background trim started for slots: %s to trim %zu keys.", str, total_keys);
- sdsfree(str);
-
- /* Unblock master if blocked. This can only happen in a very unlikely case,
- * trim job will be in pending list due to write pause and master will send
- * commands for the slots that are waiting to be trimmed. Just keeping this
- * call here for being defensive as it is harmless. */
- asmUnblockMasterAfterTrim();
-}
-
-/* Trim the slots. */
-void asmTrimSlots(slotRangeArray *slots) {
- if (asmManager->debug_trim_method == ASM_DEBUG_TRIM_NONE)
- return;
-
- /* Trigger active trim for the following cases:
- * 1. Debug override: trim method is set to 'active'.
- * 2. There are clients using client side caching (client tracking is enabled):
- * There is no way to invalidate specific slots in the client tracking
- * protocol. For now, we just use active trim to trim the slots.
- * 3. Module subscribers: If any module is subscribed to TRIMMED event, we
- * assume module needs per key notification and cannot use background trim.
- */
- int activetrim = server.tracking_clients != 0 ||
- (asmManager->debug_trim_method == ASM_DEBUG_TRIM_ACTIVE) ||
- (asmManager->debug_trim_method == ASM_DEBUG_TRIM_DEFAULT &&
- moduleHasSubscribersForKeyspaceEvent(NOTIFY_KEY_TRIMMED));
- if (activetrim)
- asmTriggerActiveTrim(slots);
- else
- asmTriggerBackgroundTrim(slots);
-}
-
-/* Schedule a trim job for the specified slot ranges. The job will be
- * deferred and handled later in asmBeforeSleep(). We delay the trim jobs to
- * asmBeforeSleep() to ensure it only runs when there is no write pause. */
-void asmTrimJobSchedule(slotRangeArray *slots) {
- listAddNodeTail(asmManager->pending_trim_jobs, slotRangeArrayDup(slots));
-}
-
-/* Process any pending trim jobs. */
-void asmTrimJobProcessPending(void) {
- /* Check if there is any pending trim job and we can propagate it. */
- if (listLength(asmManager->pending_trim_jobs) == 0 ||
- asmManager->debug_trim_method == ASM_DEBUG_TRIM_NONE)
- {
- return;
- }
-
- /* If this node is a replica, it should not initiate slot trimming actively.
- * Cancel the trim job and unblock the master if it is blocked. */
- if (clusterNodeIsSlave(getMyClusterNode())) {
- asmCancelPendingTrimJobs();
- asmUnblockMasterAfterTrim();
- return;
- }
-
- /* Determine if we can start the trim job:
- * - require client writes not paused (so key deletions are allowed)
- * - require replicas not paused (so TRIMSLOTS can be propagated).
- * - require trim is not disabled via RedisModule_ClusterDisableTrim().
- */
- static int logged = 0;
- int disabled_by_module = server.cluster_module_trim_disablers > 0;
-
- if (isPausedActions(PAUSE_ACTION_CLIENT_WRITE) ||
- isPausedActions(PAUSE_ACTION_CLIENT_ALL) ||
- isPausedActions(PAUSE_ACTION_REPLICA) ||
- disabled_by_module)
- {
- if (logged == 0) {
- logged = 1;
- const char *reason = disabled_by_module ? "trim is disabled by module" :
- "pause action is in effect";
- serverLog(LL_NOTICE, "Trim job is deferred since %s.", reason);
- }
- return;
- }
- logged = 0;
-
- listIter li;
- listNode *ln;
- listRewind(asmManager->pending_trim_jobs, &li);
- while ((ln = listNext(&li)) != NULL) {
- slotRangeArray *slots = listNodeValue(ln);
- asmTrimSlots(slots);
- propagateTrimSlots(slots);
- listDelNode(asmManager->pending_trim_jobs, ln);
- slotRangeArrayFree(slots);
- }
-}
-
-/* Trim keys in slots not owned by this node (if any). */
-void asmTrimSlotsIfNotOwned(slotRangeArray *slots) {
- if (!server.cluster_enabled || !clusterNodeIsMaster(getMyClusterNode())) return;
-
- size_t num_keys = 0;
- slotRangeArray *trim_slots = NULL;
- for (int i = 0; i < slots->num_ranges; i++) {
- for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) {
- if (clusterIsMySlot(j) ||
- kvstoreDictSize(server.db[0].keys, j) == 0 ||
- isSlotInTrimJob(j))
- {
- continue;
- }
-
- trim_slots = slotRangeArrayAppend(trim_slots, j);
- num_keys += kvstoreDictSize(server.db[0].keys, j);
- }
- }
- if (!trim_slots) return;
-
- sds str = slotRangeArrayToString(trim_slots);
- serverLog(LL_NOTICE,
- "Detected keys in slots that do not belong to this node. "
- "Scheduling trim for %zu keys in slots: %s", num_keys, str);
- sdsfree(str);
-
- asmTrimJobSchedule(trim_slots);
- slotRangeArrayFree(trim_slots);
-}
-
-/* Handle the master task when it is no longer used, trim unowned slots if necessary.
- * This function is called when the replica is just promoted to master. */
-void asmFinalizeMasterTask(void) {
- if (!server.cluster_enabled) return;
-
- asmTask *task = asmManager->master_task;
- if (task == NULL) return;
-
- if (task->operation == ASM_IMPORT) {
- /* Check if there is an ASM task that master did not finish. */
- if (task->state != ASM_COMPLETED && task->state != ASM_FAILED) {
- sds slots_str = slotRangeArrayToString(task->slots);
- serverLog(LL_WARNING, "Import task %s from old master failed: slots=%s",
- task->id, slots_str);
- sdsfree(slots_str);
- /* Mark the task as failed and notify the replicas. */
- task->state = ASM_FAILED;
- asmNotifyStateChange(task, ASM_EVENT_IMPORT_FAILED);
- }
-
- /* Trim the slots if the import task is failed. */
- if (clusterNodeIsMaster(getMyClusterNode()) && task->state == ASM_FAILED) {
- asmTrimSlotsIfNotOwned(task->slots);
- }
- } else if (task->operation == ASM_MIGRATE) {
- /* For migrate tasks, attempt to trim slots if necessary. After ASM completed,
- * the previous master may not have initiated slot trimming before the failover
- * occurred. In that case, we need to initiate slot trimming here.
- * However, if ASM failed, slot ownership did not change, so no slot trimming
- * is needed. */
- if (clusterNodeIsMaster(getMyClusterNode()) && task->state != ASM_FAILED) {
- asmTrimSlotsIfNotOwned(task->slots);
- }
- }
-
- /* Clear the master task since it is not a replica anymore. */
- asmTaskFree(asmManager->master_task);
- asmManager->master_task = NULL;
-}
-
-/* The replicas handle the master import ASM task information. */
-int asmReplicaHandleMasterTask(sds task_info) {
- if (!server.cluster_enabled || !clusterNodeIsSlave(getMyClusterNode())) return C_ERR;
-
- /* If the master task is migrating, just clear it when receiving a new task info,
- * even the task info is empty since it means the master finished the task. */
- if (asmManager->master_task && asmManager->master_task->operation == ASM_MIGRATE) {
- asmTaskFree(asmManager->master_task);
- asmManager->master_task = NULL;
- }
-
- /* If the master task is empty, it means the master finished the task, the
- * replica should check the slot ownership to decide to raise completed or
- * failed event. */
- if (!task_info || sdslen(task_info) == 0) {
- asmTask *task = asmManager->master_task;
- if (task && task->state != ASM_COMPLETED && task->state != ASM_FAILED) {
- /* Check if the slots are owned by the master. */
- int owned_by_master = 1;
- for (int i = 0; i < task->slots->num_ranges; i++) {
- slotRange *sr = &task->slots->ranges[i];
- for (int j = sr->start; j <= sr->end; j++) {
- clusterNode *master = clusterNodeGetMaster(getMyClusterNode());
- if (!master || !clusterNodeCoversSlot(master, j)) {
- owned_by_master = 0;
- break;
- }
- }
- }
- if (owned_by_master) {
- task->state = ASM_COMPLETED;
- asmNotifyStateChange(task, ASM_EVENT_IMPORT_COMPLETED);
- } else {
- task->state = ASM_FAILED;
- asmNotifyStateChange(task, ASM_EVENT_IMPORT_FAILED);
- }
- }
- return C_OK;
- }
-
- asmTask *task = asmTaskDeserialize(task_info);
- if (!task) return C_ERR;
-
- /* For migrate task, replica just keeps the task info, doesn't notify any event. */
- if (task->operation == ASM_MIGRATE) {
- if (asmManager->master_task) asmTaskFree(asmManager->master_task);
- asmManager->master_task = task;
- return C_OK;
- }
-
- int notify_event = 0;
- int event = asmTaskStateToEvent(task);
- if (asmManager->master_task) {
- /* Notify when the task or event is changed, to avoid duplicated notification. */
- if (strcmp(task->id, asmManager->master_task->id) != 0 ||
- event != asmTaskStateToEvent(asmManager->master_task))
- {
- notify_event = 1;
- }
- asmTaskFree(asmManager->master_task);
- } else {
- /* Ignore completed or failed task when there is no active master task. */
- if (task->state != ASM_FAILED && task->state != ASM_COMPLETED)
- notify_event = 1;
- }
-
- asmManager->master_task = task;
- if (notify_event) asmNotifyStateChange(task, event);
- return C_OK;
-}
-
-/* Cancel all pending trim jobs. */
-void asmCancelPendingTrimJobs(void) {
- if (!asmManager) return;
-
- listIter li;
- listNode *ln;
- listRewind(asmManager->pending_trim_jobs, &li);
- while ((ln = listNext(&li)) != NULL) {
- slotRangeArray *slots = listNodeValue(ln);
- listDelNode(asmManager->pending_trim_jobs, ln);
- sds str = slotRangeArrayToString(slots);
- serverLog(LL_NOTICE, "Cancelling the pending trim job for slots: %s", str);
- sdsfree(str);
- slotRangeArrayFree(slots);
- }
-}
-
-/* Cancel all pending and active trim jobs. */
-void asmCancelTrimJobs(void) {
- if (!asmManager) return;
-
- /* Unblock master if blocked */
- asmUnblockMasterAfterTrim();
-
- /* Cancel pending trim jobs */
- asmCancelPendingTrimJobs();
-
- /* Cancel active trim jobs */
- if (listLength(asmManager->active_trim_jobs) == 0)
- return;
-
- serverLog(LL_NOTICE, "Cancelling all active trim jobs");
- asmManager->active_trim_cancelled += listLength(asmManager->active_trim_jobs);
- asmActiveTrimEnd();
- listEmpty(asmManager->active_trim_jobs);
-}
-
-/* It's used to trim slots after the migration is completed or import is failed.
- * TRIMSLOTS RANGES <numranges> <start-slot> <end-slot> ... */
-void trimslotsCommand(client *c) {
- long numranges = 0;
-
- if (server.cluster_enabled == 0) {
- addReplyError(c,"This instance has cluster support disabled");
- return;
- }
-
- if (c->argc < 5) {
- addReplyErrorArity(c);
- return;
- }
-
- /* Validate the ranges argument */
- if (strcasecmp(c->argv[1]->ptr, "ranges") != 0) {
- addReplyError(c, "missing ranges argument");
- return;
- }
-
- /* Get the number of ranges */
- if (getLongFromObjectOrReply(c, c->argv[2], &numranges, NULL) != C_OK)
- return;
-
- /* Validate the number of ranges and argument count */
- if (numranges < 1 || numranges > CLUSTER_SLOTS || c->argc != 3 + numranges * 2) {
- addReplyError(c, "invalid number of ranges");
- return;
- }
-
- /* Parse the slot ranges and start trimming */
- slotRangeArray *slots = parseSlotRangesOrReply(c, c->argc, 3);
- if (!slots) return;
-
- if (c->id == CLIENT_ID_AOF) {
- serverAssert(server.loading);
- /* If we are loading the AOF, we can't trigger active trim because next
- * command may have an update for the same key that is supposed to be
- * trimmed. We have to trim the keys synchronously. */
- clusterDelKeysInSlotRangeArray(slots, 1);
- } else {
- /* We cannot trim any slot served by this node. */
- if (clusterNodeIsMaster(getMyClusterNode())) {
- for (int i = 0; i < slots->num_ranges; i++) {
- for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) {
- if (clusterCanAccessKeysInSlot(j)) {
- addReplyErrorFormat(c, "the slot %d is served by this node", j);
- slotRangeArrayFree(slots);
- return;
- }
- }
- }
- }
- asmTrimSlots(slots);
- }
-
- /* Command will not be propagated automatically since it does not modify
- * the dataset. */
- forceCommandPropagation(c, PROPAGATE_REPL | PROPAGATE_AOF);
-
- slotRangeArrayFree(slots);
- addReply(c, shared.ok);
-}
-
-/* Start the active trim job. */
-void asmActiveTrimStart(void) {
- slotRangeArray *slots = listNodeValue(listFirst(asmManager->active_trim_jobs));
-
- serverAssert(asmManager->active_trim_it == NULL);
- asmManager->active_trim_it = slotRangeArrayGetIterator(slots);
- asmManager->active_trim_started++;
- asmManager->active_trim_current_job_keys = 0;
- asmManager->active_trim_current_job_trimmed = 0;
-
- /* Count the number of keys to trim */
- asmManager->active_trim_current_job_keys += asmCountKeysInSlots(slots);
-
- RedisModuleClusterSlotMigrationTrimInfoV1 fsi = {
- REDISMODULE_CLUSTER_SLOT_MIGRATION_TRIMINFO_VERSION,
- (RedisModuleSlotRangeArray *) slots
- };
-
- moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION_TRIM,
- REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_STARTED,
- &fsi);
-
- sds str = slotRangeArrayToString(slots);
- serverLog(LL_NOTICE, "Active trim initiated for slots: %s, to trim %llu keys.",
- str, asmManager->active_trim_current_job_keys);
- sdsfree(str);
-}
-
-/* Schedule an active trim job. */
-void asmTriggerActiveTrim(slotRangeArray *slots) {
- listAddNodeTail(asmManager->active_trim_jobs, slotRangeArrayDup(slots));
- sds str = slotRangeArrayToString(slots);
- serverLog(LL_NOTICE, "Active trim scheduled for slots: %s", str);
- sdsfree(str);
-
- /* Start an active trim job if no active trim job is running. */
- if (asmManager->active_trim_it == NULL) {
- serverAssert(listLength(asmManager->active_trim_jobs) > 0);
- asmActiveTrimStart();
- }
-}
-
-/* End the active trim job. */
-void asmActiveTrimEnd(void) {
- slotRangeArray *slots = listNodeValue(listFirst(asmManager->active_trim_jobs));
-
- if (asmManager->active_trim_it) {
- slotRangeArrayIteratorFree(asmManager->active_trim_it);
- asmManager->active_trim_it = NULL;
- }
-
- /* Unblock the master if it is blocked */
- asmUnblockMasterAfterTrim();
-
- RedisModuleClusterSlotMigrationTrimInfoV1 fsi = {
- REDISMODULE_CLUSTER_SLOT_MIGRATION_TRIMINFO_VERSION,
- (RedisModuleSlotRangeArray *) slots
- };
-
- moduleFireServerEvent(REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION_TRIM,
- REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_COMPLETED,
- &fsi);
-
- sds str = slotRangeArrayToString(slots);
- serverLog(LL_NOTICE, "Active trim completed for slots: %s, %llu keys trimmed.",
- str, asmManager->active_trim_current_job_trimmed);
- sdsfree(str);
- listDelNode(asmManager->active_trim_jobs, listFirst(asmManager->active_trim_jobs));
- asmManager->active_trim_completed++;
-}
-
-/* Check if the slot range array overlaps with any trim job. */
-int asmIsAnyTrimJobOverlaps(slotRangeArray *slots) {
- if (!asmIsTrimInProgress()) return 0;
- for (int i = 0; i < slots->num_ranges; i++) {
- for (int j = slots->ranges[i].start; j <= slots->ranges[i].end; j++) {
- if (isSlotInTrimJob(j)) return 1;
- }
- }
- return 0;
-}
-
-/* Check if there is any trim job in progress. */
-int asmIsTrimInProgress(void) {
- if (!server.cluster_enabled) return 0;
- return (listLength(asmManager->active_trim_jobs) != 0 ||
- listLength(asmManager->pending_trim_jobs) != 0);
-}
-
-
-/* Check if the command is accessing keys in a slot being trimmed.
- * Return the slot if found, otherwise return -1. */
-int asmGetTrimmingSlotForCommand(struct redisCommand *cmd, robj **argv, int argc) {
- if (!asmIsTrimInProgress()) return -1;
-
- /* Get the keys from the command */
- getKeysResult result = GETKEYS_RESULT_INIT;
- int numkeys = getKeysFromCommand(cmd, argv, argc, &result);
-
- int last_checked_slot = -1;
- for (int j = 0; j < numkeys; j++) {
- robj *key = argv[result.keys[j].pos];
- int slot = keyHashSlot((char*) key->ptr, sdslen(key->ptr));
- if (slot == last_checked_slot) continue;
- if (isSlotInTrimJob(slot)) {
- getKeysFreeResult(&result);
- return slot;
- }
- last_checked_slot = slot;
- }
- getKeysFreeResult(&result);
- return -1;
-}
-
-/* Delete the key and notify the modules. */
-void asmActiveTrimDeleteKey(redisDb *db, robj *keyobj) {
- if (asmManager->debug_active_trim_delay > 0)
- debugDelay(asmManager->debug_active_trim_delay);
-
- /* The key needs to be converted from static to heap before deletion. */
- int static_key = keyobj->refcount == OBJ_STATIC_REFCOUNT;
- if (static_key) keyobj = createStringObject(keyobj->ptr, sdslen(keyobj->ptr));
-
- dbDelete(db, keyobj);
- keyModified(NULL, db, keyobj, NULL, 1);
- /* The keys are not actually logically deleted from the database, just moved
- * to another node. The modules need to know that these keys are no longer
- * available locally, so just send the keyspace notification to the modules,
- * but not to clients. */
- moduleNotifyKeyspaceEvent(NOTIFY_KEY_TRIMMED, "key_trimmed", keyobj, db->id);
- asmManager->active_trim_current_job_trimmed++;
-
- if (static_key) decrRefCount(keyobj);
-}
-
-/* Trim keys in the active trim job. */
-void asmActiveTrimCycle(void) {
- if (asmManager->debug_active_trim_delay < 0 ||
- listLength(asmManager->active_trim_jobs) == 0)
- {
- return;
- }
-
- /* Verify client pause is not in effect and trim is not disabled by module,
- * so we can delete keys. */
- static int blocked = 0;
- int disabled_by_module = server.cluster_module_trim_disablers > 0;
- if (isPausedActions(PAUSE_ACTION_CLIENT_ALL) ||
- isPausedActions(PAUSE_ACTION_CLIENT_WRITE) ||
- disabled_by_module)
- {
- if (blocked == 0) {
- blocked = 1;
- const char *reason = disabled_by_module ? "trim is disabled by module" :
- "pause action is in effect";
- serverLog(LL_NOTICE, "Active trim cycle is blocked since %s.", reason);
- }
- return;
- }
- if (blocked) serverLog(LL_NOTICE, "Active trim cycle is unblocked.");
- blocked = 0;
-
- /* This works in a similar way to activeExpireCycle, in the sense that
- * we do incremental work across calls. */
- const int trim_cycle_time_perc = 25;
- int time_exceeded = 0;
- long long start = ustime(), timelimit;
- unsigned long long num_deleted = 0;
-
- /* Calculate the time limit in microseconds for this cycle. */
- timelimit = 1000000 * trim_cycle_time_perc / server.hz / 100;
- if (timelimit <= 0) timelimit = 1;
-
- serverAssert(asmManager->active_trim_it);
- int slot = slotRangeArrayGetCurrentSlot(asmManager->active_trim_it);
-
- while (!time_exceeded && slot != -1) {
- dictEntry *de;
- kvstoreDictIterator kvs_di;
- kvstoreInitDictSafeIterator(&kvs_di, server.db[0].keys, slot);
- while ((de = kvstoreDictIteratorNext(&kvs_di)) != NULL) {
- kvobj *kv = dictGetKV(de);
- sds sdskey = kvobjGetKey(kv);
-
- enterExecutionUnit(1, 0);
- robj *keyobj = createStringObject(sdskey, sdslen(sdskey));
- asmActiveTrimDeleteKey(&server.db[0], keyobj);
- decrRefCount(keyobj);
- exitExecutionUnit();
- postExecutionUnitOperations();
- num_deleted++;
-
- /* Once in 32 deletions check if we reached the time limit. */
- if (num_deleted % 32 == 0 && (ustime() - start) > timelimit) {
- time_exceeded = 1;
- break;
- }
- }
- kvstoreResetDictIterator(&kvs_di);
- if (!time_exceeded) slot = slotRangeArrayNext(asmManager->active_trim_it);
- }
-
- if (slot == -1) {
-#if defined(USE_JEMALLOC)
- jemalloc_purge();
-#endif
- asmActiveTrimEnd();
-
- /* Immediately start the next trim job upon completion of the current
- * one. Eliminates gaps in notifications so modules are informed about
- * trimming unowned keys, which is important for modules that
- * continuously filter unowned keys from their replies. */
- if (listLength(asmManager->active_trim_jobs) != 0)
- asmActiveTrimStart();
- }
-}
-
-/* Check if the key in a trim job. */
-int asmIsKeyInTrimJob(sds keyname) {
- if (!asmIsTrimInProgress() || !isSlotInTrimJob(getKeySlot(keyname)))
- return 0;
- return 1;
-}
-
-/* Modules can use RM_ClusterPropagateForSlotMigration() during the
- * CLUSTER_SLOT_MIGRATION_MIGRATE_MODULE_PROPAGATE event to propagate commands
- * that should be delivered just before the slot snapshot delivery starts. */
-int asmModulePropagateBeforeSlotSnapshot(struct redisCommand *cmd, robj **argv, int argc) {
- /* This API is only called in the fork child. */
- if (server.cluster_enabled == 0 ||
- server.in_fork_child != CHILD_TYPE_RDB ||
- listLength(asmManager->tasks) == 0)
- {
- errno = EBADF;
- return C_ERR;
- }
-
- /* Check if the task state is right. */
- asmTask *task = listNodeValue(listFirst(asmManager->tasks));
- if (task->operation != ASM_MIGRATE ||
- task->state != ASM_SEND_BULK_AND_STREAM ||
- task->pre_snapshot_module_cmds == NULL)
- {
- errno = EBADF;
- return C_ERR;
- }
-
- /* Ensure all arguments are converted to string encoding if necessary,
- * since getSlotFromCommand expects them to be string-encoded. */
- for (int i = 0; i < argc; i++) {
- if (!sdsEncodedObject(argv[i])) {
- serverAssert(argv[i]->encoding == OBJ_ENCODING_INT);
- robj *old = argv[i];
- argv[i] = createStringObjectFromLongLongWithSds((long)old->ptr);
- decrRefCount(old);
- }
- }
-
- /* Crossslot commands are not allowed */
- int slot = getSlotFromCommand(cmd, argv, argc);
- if (slot == CLUSTER_CROSSSLOT) {
- errno = ENOTSUP;
- return C_ERR;
- }
-
- /* Allow no-keys commands or if keys are in the slot range. */
- slotRange sr = {slot, slot};
- if (slot != INVALID_CLUSTER_SLOT && !slotRangeArrayOverlaps(task->slots, &sr)) {
- errno = ERANGE;
- return C_ERR;
- }
-
- robj **argvcopy = zmalloc(sizeof(robj*) * argc);
- for (int i = 0; i < argc; i++) {
- argvcopy[i] = argv[i];
- incrRefCount(argv[i]);
- }
-
- redisOpArrayAppend(task->pre_snapshot_module_cmds, 0, argvcopy, argc, 0);
- return C_OK;
-}