summaryrefslogtreecommitdiff
path: root/examples/redis-unstable/src/cluster.h
diff options
context:
space:
mode:
authorMitja Felicijan <mitja.felicijan@gmail.com>2026-01-21 22:40:55 +0100
committerMitja Felicijan <mitja.felicijan@gmail.com>2026-01-21 22:40:55 +0100
commit5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda (patch)
tree1acdfa5220cd13b7be43a2a01368e80d306473ca /examples/redis-unstable/src/cluster.h
parentc7ab12bba64d9c20ccd79b132dac475f7bc3923e (diff)
downloadcrep-5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda.tar.gz
Add Redis source code for testing
Diffstat (limited to 'examples/redis-unstable/src/cluster.h')
-rw-r--r--examples/redis-unstable/src/cluster.h354
1 files changed, 354 insertions, 0 deletions
diff --git a/examples/redis-unstable/src/cluster.h b/examples/redis-unstable/src/cluster.h
new file mode 100644
index 0000000..7daf093
--- /dev/null
+++ b/examples/redis-unstable/src/cluster.h
@@ -0,0 +1,354 @@
+/*
+ * Copyright (c) 2009-Present, Redis Ltd.
+ * All rights reserved.
+ *
+ * Copyright (c) 2024-present, Valkey contributors.
+ * All rights reserved.
+ *
+ * Licensed under your choice of (a) the Redis Source Available License 2.0
+ * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
+ * GNU Affero General Public License v3 (AGPLv3).
+ *
+ * Portions of this file are available under BSD3 terms; see REDISCONTRIBUTIONS for more information.
+ */
+
+#ifndef __CLUSTER_H
+#define __CLUSTER_H
+
+/*-----------------------------------------------------------------------------
+ * Redis cluster exported API.
+ *----------------------------------------------------------------------------*/
+
+#define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */
+#define CLUSTER_SLOTS (1<<CLUSTER_SLOT_MASK_BITS) /* Total number of slots in cluster mode, which is 16384. */
+#define CLUSTER_SLOT_MASK ((unsigned long long)(CLUSTER_SLOTS - 1)) /* Bit mask for slot id stored in LSB. */
+#define INVALID_CLUSTER_SLOT (-1) /* Invalid slot number. */
+#define CLUSTER_CROSSSLOT (-2)
+#define CLUSTER_OK 0 /* Everything looks ok */
+#define CLUSTER_FAIL 1 /* The cluster can't work */
+#define CLUSTER_NAMELEN 40 /* sha1 hex length */
+
+/* Redirection errors returned by getNodeByQuery(). */
+#define CLUSTER_REDIR_NONE 0 /* Node can serve the request. */
+#define CLUSTER_REDIR_CROSS_SLOT 1 /* -CROSSSLOT request. */
+#define CLUSTER_REDIR_UNSTABLE 2 /* -TRYAGAIN redirection required */
+#define CLUSTER_REDIR_ASK 3 /* -ASK redirection required. */
+#define CLUSTER_REDIR_MOVED 4 /* -MOVED redirection required. */
+#define CLUSTER_REDIR_DOWN_STATE 5 /* -CLUSTERDOWN, global state. */
+#define CLUSTER_REDIR_DOWN_UNBOUND 6 /* -CLUSTERDOWN, unbound slot. */
+#define CLUSTER_REDIR_DOWN_RO_STATE 7 /* -CLUSTERDOWN, allow reads. */
+
+typedef struct _clusterNode clusterNode;
+struct clusterState;
+
+/* Flags that a module can set in order to prevent certain Redis Cluster
+ * features to be enabled. Useful when implementing a different distributed
+ * system on top of Redis Cluster message bus, using modules. */
+#define CLUSTER_MODULE_FLAG_NONE 0
+#define CLUSTER_MODULE_FLAG_NO_FAILOVER (1<<1)
+#define CLUSTER_MODULE_FLAG_NO_REDIRECTION (1<<2)
+
+/* ---------------------- API exported outside cluster.c -------------------- */
+
+/* We have 16384 hash slots. The hash slot of a given key is obtained
+ * as the least significant 14 bits of the crc16 of the key.
+ *
+ * However, if the key contains the {...} pattern, only the part between
+ * { and } is hashed. This may be useful in the future to force certain
+ * keys to be in the same node (assuming no resharding is in progress). */
+static inline unsigned int keyHashSlot(const char *key, int keylen) {
+ int s, e; /* start-end indexes of { and } */
+
+ for (s = 0; s < keylen; s++)
+ if (key[s] == '{') break;
+
+ /* No '{' ? Hash the whole key. This is the base case. */
+ if (likely(s == keylen)) return crc16(key,keylen) & 0x3FFF;
+
+ /* '{' found? Check if we have the corresponding '}'. */
+ for (e = s+1; e < keylen; e++)
+ if (key[e] == '}') break;
+
+ /* No '}' or nothing between {} ? Hash the whole key. */
+ if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;
+
+ /* If we are here there is both a { and a } on its right. Hash
+ * what is in the middle between { and }. */
+ return crc16(key+s+1,e-s-1) & 0x3FFF;
+}
+
+/* functions requiring mechanism specific implementations */
+void clusterInit(void);
+void clusterInitLast(void);
+void clusterCommonInit(void);
+void clusterCron(void);
+void clusterBeforeSleep(void);
+void clusterClaimUnassignedSlots(void);
+int verifyClusterConfigWithData(void);
+
+int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uint8_t type, const char *payload, uint32_t len);
+
+void clusterUpdateMyselfFlags(void);
+void clusterUpdateMyselfIp(void);
+void clusterUpdateMyselfHostname(void);
+void clusterUpdateMyselfAnnouncedPorts(void);
+void clusterUpdateMyselfHumanNodename(void);
+
+void clusterPropagatePublish(robj *channel, robj *message, int sharded);
+
+unsigned long getClusterConnectionsCount(void);
+int isClusterHealthy(void);
+
+sds clusterGenNodesDescription(client *c, int filter, int tls_primary);
+sds genClusterInfoString(void);
+/* handle implementation specific debug cluster commands. Return 1 if handled, 0 otherwise. */
+int handleDebugClusterCommand(client *c);
+const char **clusterDebugCommandExtendedHelp(void);
+/* handle implementation specific cluster commands. Return 1 if handled, 0 otherwise. */
+int clusterCommandSpecial(client *c);
+const char** clusterCommandExtendedHelp(void);
+
+int clusterAllowFailoverCmd(client *c);
+void clusterPromoteSelfToMaster(void);
+int clusterManualFailoverTimeLimit(void);
+
+void clusterCommandSlots(client * c);
+void clusterCommandMyId(client *c);
+void clusterCommandMyShardId(client *c);
+
+sds clusterGenNodeDescription(client *c, clusterNode *node, int tls_primary);
+
+int clusterNodeCoversSlot(clusterNode *n, int slot);
+int getNodeDefaultClientPort(clusterNode *n);
+int clusterNodeIsMyself(clusterNode *n);
+clusterNode *getMyClusterNode(void);
+char *getMyClusterId(void);
+int getClusterSize(void);
+int getMyShardSlotCount(void);
+int clusterNodePending(clusterNode *node);
+char **getClusterNodesList(size_t *numnodes);
+int clusterNodeIsMaster(clusterNode *n);
+char *clusterNodeIp(clusterNode *node);
+int clusterNodeIsSlave(clusterNode *node);
+clusterNode *clusterNodeGetSlaveof(clusterNode *node);
+clusterNode *clusterNodeGetMaster(clusterNode *node);
+char *clusterNodeGetName(clusterNode *node);
+int clusterNodeTimedOut(clusterNode *node);
+int clusterNodeIsFailing(clusterNode *node);
+int clusterNodeIsNoFailover(clusterNode *node);
+char *clusterNodeGetShardId(clusterNode *node);
+int clusterNodeNumSlaves(clusterNode *node);
+clusterNode *clusterNodeGetSlave(clusterNode *node, int slave_idx);
+clusterNode *getMigratingSlotDest(int slot);
+clusterNode *getImportingSlotSource(int slot);
+clusterNode *getNodeBySlot(int slot);
+int clusterNodeClientPort(clusterNode *n, int use_tls);
+char *clusterNodeHostname(clusterNode *node);
+const char *clusterNodePreferredEndpoint(clusterNode *n);
+long long clusterNodeReplOffset(clusterNode *node);
+clusterNode *clusterLookupNode(const char *name, int length);
+const char *clusterGetSecret(size_t *len);
+unsigned int countKeysInSlot(unsigned int slot);
+int getSlotOrReply(client *c, robj *o);
+int clusterIsMySlot(int slot);
+int clusterCanAccessKeysInSlot(int slot);
+struct slotRangeArray *clusterGetLocalSlotRanges(void);
+
+/* functions with shared implementations */
+clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot,
+ getKeysResult *result, uint8_t read_error, uint64_t cmd_flags, int *error_code);
+int extractSlotFromKeysResult(robj **argv, getKeysResult *keys_result);
+int clusterRedirectBlockedClientIfNeeded(client *c);
+void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code);
+void migrateCloseTimedoutSockets(void);
+int patternHashSlot(char *pattern, int length);
+int isValidAuxString(char *s, unsigned int length);
+void migrateCommand(client *c);
+void clusterCommand(client *c);
+ConnectionType *connTypeOfCluster(void);
+
+typedef struct slotRange {
+ unsigned short start, end;
+} slotRange;
+typedef struct slotRangeArray {
+ int num_ranges;
+ slotRange ranges[];
+} slotRangeArray;
+typedef struct slotRangeArrayIter {
+ slotRangeArray *slots; /* the array we’re iterating */
+ int range_index; /* current range index */
+ int cur_slot; /* current slot within the range */
+} slotRangeArrayIter;
+slotRangeArray *slotRangeArrayCreate(int num_ranges);
+slotRangeArray *slotRangeArrayDup(slotRangeArray *slots);
+void slotRangeArraySet(slotRangeArray *slots, int idx, int start, int end);
+sds slotRangeArrayToString(slotRangeArray *slots);
+slotRangeArray *slotRangeArrayFromString(sds data);
+void slotRangeArraySortAndMerge(slotRangeArray *slots);
+int slotRangeArrayIsEqual(slotRangeArray *slots1, slotRangeArray *slots2);
+slotRangeArray *slotRangeArrayAppend(slotRangeArray *slots, int slot);
+int slotRangeArrayContains(slotRangeArray *slots, unsigned int slot);
+void slotRangeArrayFree(slotRangeArray *slots);
+void slotRangeArrayFreeGeneric(void *slots);
+slotRangeArrayIter *slotRangeArrayGetIterator(slotRangeArray *slots);
+int slotRangeArrayNext(slotRangeArrayIter *it);
+int slotRangeArrayGetCurrentSlot(slotRangeArrayIter *it);
+void slotRangeArrayIteratorFree(slotRangeArrayIter *it);
+int slotRangeArrayNormalizeAndValidate(slotRangeArray *slots, sds *err);
+slotRangeArray *parseSlotRangesOrReply(client *c, int argc, int pos);
+
+unsigned int clusterDelKeysInSlot(unsigned int hashslot, int by_command);
+unsigned int clusterDelKeysInSlotRangeArray(slotRangeArray *slots, int by_command);
+
+void clusterGenNodesSlotsInfo(int filter);
+void clusterFreeNodesSlotsInfo(clusterNode *n);
+int clusterNodeSlotInfoCount(clusterNode *n);
+uint16_t clusterNodeSlotInfoEntry(clusterNode *n, int idx);
+int clusterNodeHasSlotInfo(clusterNode *n);
+void resetClusterStats(void);
+
+int clusterGetShardCount(void);
+void *clusterGetShardIterator(void);
+void *clusterNextShardHandle(void *shard_iterator);
+void clusterFreeShardIterator(void *shard_iterator);
+int clusterGetShardNodeCount(void *shard);
+void *clusterShardHandleGetNodeIterator(void *shard);
+clusterNode *clusterShardNodeIteratorNext(void *node_iterator);
+void clusterShardNodeIteratorFree(void *node_iterator);
+clusterNode *clusterShardNodeFirst(void *shard);
+
+int clusterNodeTcpPort(clusterNode *node);
+int clusterNodeTlsPort(clusterNode *node);
+
+/* API for alternative cluster implementations to start and coordinate
+ * Atomic Slot Migration (ASM).
+ *
+ * These two functions drive ASM for alternative cluster implementations.
+ * - clusterAsmProcess(...) impl -> redis: initiates/advances/cancels ASM operations
+ * - clusterAsmOnEvent(...) redis -> impl: notifies state changes
+ *
+ * Generic steps for an alternative implementation:
+ * - On destination side, implementation calls clusterAsmProcess(ASM_EVENT_IMPORT_START)
+ * to start an import operation.
+ * - Redis calls clusterAsmOnEvent() when an ASM event occurs.
+ * - On the source side, Redis will call clusterAsmOnEvent(ASM_EVENT_HANDOFF_PREP)
+ * when slots are ready to be handed off and the write pause is needed.
+ * - Implementation stops the traffic to the slots and calls clusterAsmProcess(ASM_EVENT_HANDOFF)
+ * - On the destination side, Redis calls clusterAsmOnEvent(ASM_EVENT_TAKEOVER)
+ * when destination node is ready to take over the slot, waiting for ownership change.
+ * - Cluster implementation updates the config and calls clusterAsmProcess(ASM_EVENT_DONE)
+ * to notify Redis that the slots ownership has changed.
+ *
+ * Sequence diagram for import:
+ * - Note: shows only the events that cluster implementation needs to react.
+ *
+ * ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐
+ * │ Destination │ │ Destination │ │ Source │ │ Source │
+ * │ Cluster impl │ │ Master │ │ Master │ │ Cluster impl │
+ * └───────┬───────┘ └───────┬───────┘ └───────┬───────┘ └───────┬───────┘
+ * │ │ │ │
+ * │ ASM_EVENT_IMPORT_START │ │ │
+ * ├─────────────────────────────►│ │ │
+ * │ │ CLUSTER SYNCSLOTS <arg> │ │
+ * │ ├────────────────────────►│ │
+ * │ │ │ │
+ * │ │ SNAPSHOT(restore cmds) │ │
+ * │ │◄────────────────────────┤ │
+ * │ │ Repl stream │ │
+ * │ │◄────────────────────────┤ │
+ * │ │ │ ASM_EVENT_HANDOFF_PREP │
+ * │ │ ├────────────────────────────►│
+ * │ │ │ ASM_EVENT_HANDOFF │
+ * │ │ │◄────────────────────────────┤
+ * │ │ Drain repl stream │ │
+ * │ │◄────────────────────────┤ │
+ * │ ASM_EVENT_TAKEOVER │ │ │
+ * │◄─────────────────────────────┤ │ │
+ * │ │ │ │
+ * │ ASM_EVENT_DONE │ │ │
+ * ├─────────────────────────────►│ │ ASM_EVENT_DONE │
+ * │ │ │◄────────────────────────────┤
+ * │ │ │ │
+ */
+
+#define ASM_EVENT_IMPORT_START 1 /* Start a new import operation (destination side) */
+#define ASM_EVENT_CANCEL 2 /* Cancel an ongoing import/migrate operation (source and destination side) */
+#define ASM_EVENT_HANDOFF_PREP 3 /* Slot is ready to be handed off to the destination shard (source side) */
+#define ASM_EVENT_HANDOFF 4 /* Notify that the slot can be handed off (source side) */
+#define ASM_EVENT_TAKEOVER 5 /* Ready to take over the slot, waiting for config change (destination side) */
+#define ASM_EVENT_DONE 6 /* Notify that import/migrate is completed, config is updated (source and destination side) */
+
+#define ASM_EVENT_IMPORT_PREP 7 /* Import is about to start, the implementation may reject by returning C_ERR */
+#define ASM_EVENT_IMPORT_STARTED 8 /* Import started */
+#define ASM_EVENT_IMPORT_FAILED 9 /* Import failed */
+#define ASM_EVENT_IMPORT_COMPLETED 10 /* Import completed (config updated) */
+#define ASM_EVENT_MIGRATE_PREP 11 /* Migrate is about to start, the implementation may reject by returning C_ERR */
+#define ASM_EVENT_MIGRATE_STARTED 12 /* Migrate started */
+#define ASM_EVENT_MIGRATE_FAILED 13 /* Migrate failed */
+#define ASM_EVENT_MIGRATE_COMPLETED 14 /* Migrate completed (config updated) */
+
+
+/* Called by cluster implementation to request an ASM operation. (cluster impl --> redis)
+ * Valid values for 'event':
+ * ASM_EVENT_IMPORT_START
+ * ASM_EVENT_CANCEL
+ * ASM_EVENT_HANDOFF
+ * ASM_EVENT_DONE
+ *
+ * For ASM_EVENT_IMPORT_START, 'task_id' should be a unique string.
+ * For other events (ASM_EVENT_CANCEL, ASM_EVENT_HANDOFF, ASM_EVENT_DONE),
+ * 'task_id' should match the ID from the corresponding import operation.
+ * Usage:
+ * char *task_id = malloc(CLUSTER_NAMELEN + 1);
+ * getRandomHexChars(task_id, CLUSTER_NAMELEN);
+ * task_id[CLUSTER_NAMELEN] = '\0';
+ *
+ * slotRangeArray *slots = slotRangeArrayCreate(1);
+ * slotRangeArraySet(slots, 0, 0, 1000);
+ *
+ * const char *err = NULL;
+ * int ret = clusterAsmProcess(task_id, ASM_EVENT_IMPORT_START, slots, &err);
+ * zfree(task_id);
+ * slotRangeArrayFree(slots);
+ *
+ * if (ret != C_OK) {
+ * perror(err);
+ * return;
+ * }
+ *
+ * For ASM_EVENT_CANCEL, if `task_id` is NULL, all tasks will be cancelled.
+ * If `arg` parameter is provided, it should be a pointer to an int. It will be
+ * set to the number of tasks cancelled.
+ *
+ * Return value:
+ * - Returns C_OK on success, C_ERR on failure and 'err' will be set to the
+ * error message.
+ *
+ * Memory management:
+ * - There is no ownership transfer of 'task_id', 'err' or `slotRangeArray`.
+ * - `task_id` and `slotRangeArray` should be allocated and be freed by the
+ * caller. Redis internally will make a copy of these.
+ * - `err` is allocated by Redis and should NOT be freed by the caller.
+ **/
+int clusterAsmProcess(const char *task_id, int event, void *arg, char **err);
+
+/* Called when an ASM event occurs to notify the cluster implementation. (redis --> cluster impl)
+ *
+ * `arg` will point to a `slotRangeArray` for the following events:
+ * ASM_EVENT_IMPORT_PREP
+ * ASM_EVENT_IMPORT_STARTED
+ * ASM_EVENT_MIGRATE_PREP
+ * ASM_EVENT_MIGRATE_STARTED
+ * ASM_EVENT_HANDOFF_PREP
+ *
+ * Memory management:
+ * - Redis owns the `task_id` and `slotRangeArray`.
+ *
+ * Returns C_OK on success.
+ *
+ * If the cluster implementation returns C_ERR for ASM_EVENT_IMPORT_PREP or
+ * ASM_EVENT_MIGRATE_PREP, operation will not start.
+ **/
+int clusterAsmOnEvent(const char *task_id, int event, void *arg);
+
+#endif /* __CLUSTER_H */