diff options
| author | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:40:55 +0100 |
|---|---|---|
| committer | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:40:55 +0100 |
| commit | 5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda (patch) | |
| tree | 1acdfa5220cd13b7be43a2a01368e80d306473ca /examples/redis-unstable/tests/modules/atomicslotmigration.c | |
| parent | c7ab12bba64d9c20ccd79b132dac475f7bc3923e (diff) | |
| download | crep-5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda.tar.gz | |
Add Redis source code for testing
Diffstat (limited to 'examples/redis-unstable/tests/modules/atomicslotmigration.c')
| -rw-r--r-- | examples/redis-unstable/tests/modules/atomicslotmigration.c | 594 |
1 files changed, 594 insertions, 0 deletions
diff --git a/examples/redis-unstable/tests/modules/atomicslotmigration.c b/examples/redis-unstable/tests/modules/atomicslotmigration.c new file mode 100644 index 0000000..83393cd --- /dev/null +++ b/examples/redis-unstable/tests/modules/atomicslotmigration.c @@ -0,0 +1,594 @@ +#include "redismodule.h" + +#include <stdlib.h> +#include <memory.h> +#include <errno.h> + +#define MAX_EVENTS 1024 + +/* Log of cluster events. */ +const char *clusterEventLog[MAX_EVENTS]; +int numClusterEvents = 0; + +/* Log of cluster trim events. */ +const char *clusterTrimEventLog[MAX_EVENTS]; +int numClusterTrimEvents = 0; + +/* Log of last deleted key event. */ +const char *lastDeletedKeyLog = NULL; + +/* Flag to disable trim. */ +int disableTrimFlag = 0; + +int replicateModuleCommand = 0; /* Enable or disable module command replication. */ +RedisModuleString *moduleCommandKeyName = NULL; /* Key name to replicate. */ +RedisModuleString *moduleCommandKeyVal = NULL; /* Key value to replicate. */ + +/* Enable or disable module command replication. */ +int replicate_module_command(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 4) { + RedisModule_ReplyWithError(ctx, "ERR wrong number of arguments"); + return REDISMODULE_OK; + } + + long long enable = 0; + if (RedisModule_StringToLongLong(argv[1], &enable) != REDISMODULE_OK) { + RedisModule_ReplyWithError(ctx, "ERR enable value"); + return REDISMODULE_OK; + } + replicateModuleCommand = (enable != 0); + + /* Set the key name and value to replicate. */ + if (moduleCommandKeyName) RedisModule_FreeString(ctx, moduleCommandKeyName); + if (moduleCommandKeyVal) RedisModule_FreeString(ctx, moduleCommandKeyVal); + moduleCommandKeyName = RedisModule_CreateStringFromString(ctx, argv[2]); + moduleCommandKeyVal = RedisModule_CreateStringFromString(ctx, argv[3]); + + RedisModule_ReplyWithSimpleString(ctx, "OK"); + return REDISMODULE_OK; +} + +int lpush_and_replicate_crossslot_command(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 3) return RedisModule_WrongArity(ctx); + + /* LPUSH */ + RedisModuleCallReply *rep = RedisModule_Call(ctx, "LPUSH", "!ss", argv[1], argv[2]); + RedisModule_Assert(RedisModule_CallReplyType(rep) != REDISMODULE_REPLY_ERROR); + RedisModule_FreeCallReply(rep); + + /* Replicate cross slot command */ + int ret = RedisModule_Replicate(ctx, "MSET", "cccccc", "key1", "val1", "key2", "val2", "key3", "val3"); + RedisModule_Assert(ret == REDISMODULE_OK); + + RedisModule_ReplyWithSimpleString(ctx, "OK"); + return REDISMODULE_OK; +} + +int testClusterGetLocalSlotRanges(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + static int use_auto_memory = 0; + use_auto_memory = !use_auto_memory; + + RedisModuleSlotRangeArray *slots; + if (use_auto_memory) { + RedisModule_AutoMemory(ctx); + slots = RedisModule_ClusterGetLocalSlotRanges(ctx); + } else { + slots = RedisModule_ClusterGetLocalSlotRanges(NULL); + } + + RedisModule_ReplyWithArray(ctx, slots->num_ranges); + for (int i = 0; i < slots->num_ranges; i++) { + RedisModule_ReplyWithArray(ctx, 2); + RedisModule_ReplyWithLongLong(ctx, slots->ranges[i].start); + RedisModule_ReplyWithLongLong(ctx, slots->ranges[i].end); + } + if (!use_auto_memory) + RedisModule_ClusterFreeSlotRanges(NULL, slots); + return REDISMODULE_OK; +} + +/* Helper function to check if a slot range array contains a given slot. */ +int slotRangeArrayContains(RedisModuleSlotRangeArray *sra, unsigned int slot) { + for (int i = 0; i < sra->num_ranges; i++) + if (sra->ranges[i].start <= slot && sra->ranges[i].end >= slot) + return 1; + return 0; +} + +/* Sanity check. */ +int sanity(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + RedisModule_Assert(RedisModule_ClusterCanAccessKeysInSlot(-1) == 0); + RedisModule_Assert(RedisModule_ClusterCanAccessKeysInSlot(16384) == 0); + RedisModule_Assert(RedisModule_ClusterCanAccessKeysInSlot(100000) == 0); + + /* Call with invalid args. */ + errno = 0; + RedisModule_Assert(RedisModule_ClusterPropagateForSlotMigration(NULL, NULL, NULL) == REDISMODULE_ERR); + RedisModule_Assert(errno == EINVAL); + + /* Call with invalid args. */ + errno = 0; + RedisModule_Assert(RedisModule_ClusterPropagateForSlotMigration(ctx, NULL, NULL) == REDISMODULE_ERR); + RedisModule_Assert(errno == EINVAL); + + /* Call with invalid args. */ + errno = 0; + RedisModule_Assert(RedisModule_ClusterPropagateForSlotMigration(NULL, "asm.keyless_cmd", "") == REDISMODULE_ERR); + RedisModule_Assert(errno == EINVAL); + + /* Call outside of slot migration. */ + errno = 0; + RedisModule_Assert(RedisModule_ClusterPropagateForSlotMigration(ctx, "asm.keyless_cmd", "") == REDISMODULE_ERR); + RedisModule_Assert(errno == EBADF); + + RedisModule_ReplyWithSimpleString(ctx, "OK"); + return REDISMODULE_OK; +} + +/* Command to test RM_ClusterCanAccessKeysInSlot(). */ +int testClusterCanAccessKeysInSlot(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argc); + long long slot = 0; + + if (RedisModule_StringToLongLong(argv[1],&slot) != REDISMODULE_OK) { + return RedisModule_ReplyWithError(ctx,"ERR invalid slot"); + } + RedisModule_ReplyWithLongLong(ctx, RedisModule_ClusterCanAccessKeysInSlot(slot)); + return REDISMODULE_OK; +} + +/* Generate a string representation of the info struct and subevent. + e.g. 'sub: cluster-slot-migration-import-started, task_id: aeBd..., slots: 0-100,200-300' */ +const char *clusterAsmInfoToString(RedisModuleClusterSlotMigrationInfo *info, uint64_t sub) { + char buf[1024] = {0}; + + if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_STARTED) + snprintf(buf, sizeof(buf), "sub: cluster-slot-migration-import-started, "); + else if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_FAILED) + snprintf(buf, sizeof(buf), "sub: cluster-slot-migration-import-failed, "); + else if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_COMPLETED) + snprintf(buf, sizeof(buf), "sub: cluster-slot-migration-import-completed, "); + else if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_STARTED) + snprintf(buf, sizeof(buf), "sub: cluster-slot-migration-migrate-started, "); + else if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_FAILED) + snprintf(buf, sizeof(buf), "sub: cluster-slot-migration-migrate-failed, "); + else if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_COMPLETED) + snprintf(buf, sizeof(buf), "sub: cluster-slot-migration-migrate-completed, "); + else { + RedisModule_Assert(0); + } + snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), "source_node_id:%.40s, destination_node_id:%.40s, ", + info->source_node_id, info->destination_node_id); + snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), "task_id:%s, slots:", info->task_id); + for (int i = 0; i < info->slots->num_ranges; i++) { + RedisModuleSlotRange *sr = &info->slots->ranges[i]; + snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), "%d-%d", sr->start, sr->end); + if (i != info->slots->num_ranges - 1) + snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), ","); + } + return RedisModule_Strdup(buf); +} + +/* Generate a string representation of the info struct and subevent. + e.g. 'sub: cluster-slot-migration-trim-started, task_id: aeBd..., slots:0-100,200-300' */ +const char *clusterTrimInfoToString(RedisModuleClusterSlotMigrationTrimInfo *info, uint64_t sub) { + RedisModule_Assert(info); + char buf[1024] = {0}; + + if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_BACKGROUND) + snprintf(buf, sizeof(buf), "sub: cluster-slot-migration-trim-background, "); + else if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_STARTED) + snprintf(buf, sizeof(buf), "sub: cluster-slot-migration-trim-started, "); + else if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_COMPLETED) + snprintf(buf, sizeof(buf), "sub: cluster-slot-migration-trim-completed, "); + else { + RedisModule_Assert(0); + } + snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), "slots:"); + for (int i = 0; i < info->slots->num_ranges; i++) { + RedisModuleSlotRange *sr = &info->slots->ranges[i]; + snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), "%d-%d", sr->start, sr->end); + if (i != info->slots->num_ranges - 1) + snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), ","); + } + return RedisModule_Strdup(buf); +} + +static void testReplicatingOutsideSlotRange(RedisModuleCtx *ctx, RedisModuleClusterSlotMigrationInfo *info) { + int slot = 0; + while (slot >= 0 && slot <= 16383) { + if (!slotRangeArrayContains(info->slots, slot)) { + break; + } + slot++; + } + char buf[128] = {0}; + const char *prefix = RedisModule_ClusterCanonicalKeyNameInSlot(slot); + snprintf(buf, sizeof(buf), "{%s}%s", prefix, "modulekey"); + errno = 0; + int ret = RedisModule_ClusterPropagateForSlotMigration(ctx, "SET", "cc", buf, "value"); + RedisModule_Assert(ret == REDISMODULE_ERR); + RedisModule_Assert(errno == ERANGE); +} + +static void testReplicatingCrossslotCommand(RedisModuleCtx *ctx) { + errno = 0; + int ret = RedisModule_ClusterPropagateForSlotMigration(ctx, "MSET", "cccccc", "key1", "val1", "key2", "val2", "key3", "val3"); + RedisModule_Assert(ret == REDISMODULE_ERR); + RedisModule_Assert(errno == ENOTSUP); +} + +static void testReplicatingUnknownCommand(RedisModuleCtx *ctx) { + errno = 0; + int ret = RedisModule_ClusterPropagateForSlotMigration(ctx, "unknowncommand", ""); + RedisModule_Assert(ret == REDISMODULE_ERR); + RedisModule_Assert(errno == ENOENT); +} + +static void testNonFatalScenarios(RedisModuleCtx *ctx, RedisModuleClusterSlotMigrationInfo *info) { + testReplicatingOutsideSlotRange(ctx, info); + testReplicatingCrossslotCommand(ctx); + testReplicatingUnknownCommand(ctx); +} + +int disableTrimCmd(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + disableTrimFlag = 1; + /* Only disable when MIGRATE_COMPLETED for simulating recommended usage. */ + // RedisModule_ClusterDisableTrim(ctx) + RedisModule_ReplyWithSimpleString(ctx, "OK"); + return REDISMODULE_OK; +} + +int enableTrimCmd(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + disableTrimFlag = 0; + RedisModule_Assert(RedisModule_ClusterEnableTrim(ctx) == REDISMODULE_OK); + RedisModule_ReplyWithSimpleString(ctx, "OK"); + return REDISMODULE_OK; +} + +int trimInProgressCmd(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + uint64_t flags = RedisModule_GetContextFlags(ctx); + RedisModule_ReplyWithLongLong(ctx, !!(flags & REDISMODULE_CTX_FLAGS_TRIM_IN_PROGRESS)); + return REDISMODULE_OK; +} + +void clusterEventCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) { + REDISMODULE_NOT_USED(ctx); + int ret; + + RedisModule_Assert(RedisModule_IsSubEventSupported(e, sub)); + + if (e.id == REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION) { + RedisModuleClusterSlotMigrationInfo *info = data; + + if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_MODULE_PROPAGATE) { + /* Test some non-fatal scenarios. */ + testNonFatalScenarios(ctx, info); + + if (replicateModuleCommand == 0) return; + + /* Replicate a keyless command. */ + ret = RedisModule_ClusterPropagateForSlotMigration(ctx, "asm.keyless_cmd", ""); + RedisModule_Assert(ret == REDISMODULE_OK); + + /* Propagate configured key and value. */ + ret = RedisModule_ClusterPropagateForSlotMigration(ctx, "SET", "ss", moduleCommandKeyName, moduleCommandKeyVal); + RedisModule_Assert(ret == REDISMODULE_OK); + } else { + /* Log the event. */ + if (numClusterEvents >= MAX_EVENTS) return; + clusterEventLog[numClusterEvents++] = clusterAsmInfoToString(info, sub); + + if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_COMPLETED) { + /* If users ask to disable trim, we disable trim. */ + if (disableTrimFlag) { + RedisModule_Assert(RedisModule_ClusterDisableTrim(ctx) == REDISMODULE_OK); + } + } + } + } +} + +int getPendingTrimKeyCmd(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 2) { + RedisModule_ReplyWithError(ctx, "ERR wrong number of arguments"); + return REDISMODULE_ERR; + } + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], + REDISMODULE_READ | REDISMODULE_OPEN_KEY_ACCESS_TRIMMED); + if (!key) { + RedisModule_ReplyWithNull(ctx); + return REDISMODULE_OK; + } + if (RedisModule_KeyType(key) != REDISMODULE_KEYTYPE_STRING) { + RedisModule_ReplyWithError(ctx, "key is not a string"); + return REDISMODULE_ERR; + } + size_t len; + const char *value = RedisModule_StringDMA(key, &len, 0); + RedisModule_ReplyWithStringBuffer(ctx, value, len); + RedisModule_CloseKey(key); + return REDISMODULE_OK; +} + +void clusterTrimEventCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) { + REDISMODULE_NOT_USED(ctx); + + RedisModule_Assert(RedisModule_IsSubEventSupported(e, sub)); + + if (e.id == REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION_TRIM) { + /* Log the event. */ + if (numClusterTrimEvents >= MAX_EVENTS) return; + RedisModuleClusterSlotMigrationTrimInfo *info = data; + clusterTrimEventLog[numClusterTrimEvents++] = clusterTrimInfoToString(info, sub); + } +} + +static int keyspaceNotificationTrimmedCallback(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) { + REDISMODULE_NOT_USED(ctx); + + RedisModule_Assert(type == REDISMODULE_NOTIFY_KEY_TRIMMED); + RedisModule_Assert(strcmp(event, "key_trimmed") == 0); + + if (numClusterTrimEvents >= MAX_EVENTS) return REDISMODULE_OK; + + /* Log the trimmed key event. */ + size_t len; + const char *key_str = RedisModule_StringPtrLen(key, &len); + + char buf[1024] = {0}; + snprintf(buf, sizeof(buf), "keyspace: key_trimmed, key: %s", key_str); + + clusterTrimEventLog[numClusterTrimEvents++] = RedisModule_Strdup(buf); + return REDISMODULE_OK; +} + +/* ASM.PARENT SET key value (just proxy to Redis SET) */ +static int asmParentSet(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 4) return RedisModule_WrongArity(ctx); + RedisModuleCallReply *reply = RedisModule_Call(ctx, "SET", "ss", argv[2], argv[3]); + if (!reply) return RedisModule_ReplyWithError(ctx, "ERR internal"); + RedisModule_ReplyWithCallReply(ctx, reply); + RedisModule_FreeCallReply(reply); + RedisModule_ReplicateVerbatim(ctx); + return REDISMODULE_OK; +} + +/* Clear both the cluster and trim event logs. */ +int clearEventLog(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + for (int i = 0; i < numClusterEvents; i++) + RedisModule_Free((void *)clusterEventLog[i]); + numClusterEvents = 0; + + for (int i = 0; i < numClusterTrimEvents; i++) + RedisModule_Free((void *)clusterTrimEventLog[i]); + numClusterTrimEvents = 0; + + RedisModule_ReplyWithSimpleString(ctx, "OK"); + return REDISMODULE_OK; +} + +/* Reply with the cluster event log. */ +int getClusterEventLog(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(ctx); + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + RedisModule_ReplyWithArray(ctx, numClusterEvents); + for (int i = 0; i < numClusterEvents; i++) + RedisModule_ReplyWithStringBuffer(ctx, clusterEventLog[i], strlen(clusterEventLog[i])); + return REDISMODULE_OK; +} + +/* Reply with the cluster trim event log. */ +int getClusterTrimEventLog(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(ctx); + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + RedisModule_ReplyWithArray(ctx, numClusterTrimEvents); + for (int i = 0; i < numClusterTrimEvents; i++) + RedisModule_ReplyWithStringBuffer(ctx, clusterTrimEventLog[i], strlen(clusterTrimEventLog[i])); + return REDISMODULE_OK; +} + +/* A keyless command to test module command replication. */ +int moduledata = 0; +int keylessCmd(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(ctx); + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + moduledata++; + RedisModule_ReplyWithLongLong(ctx, moduledata); + return REDISMODULE_OK; +} +int readkeylessCmdVal(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(ctx); + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + RedisModule_ReplyWithLongLong(ctx, moduledata); + return REDISMODULE_OK; +} + +int subscribeTrimmedEvent(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(ctx); + if (argc != 2) + return RedisModule_WrongArity(ctx); + + long long subscribe = 0; + if (RedisModule_StringToLongLong(argv[1], &subscribe) != REDISMODULE_OK) { + RedisModule_ReplyWithError(ctx, "ERR subscribe value"); + return REDISMODULE_OK; + } + + if (subscribe) { + /* Unsubscribe first to avoid duplicate subscription. */ + RedisModule_UnsubscribeFromKeyspaceEvents(ctx, REDISMODULE_NOTIFY_KEY_TRIMMED, keyspaceNotificationTrimmedCallback); + int ret = RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_KEY_TRIMMED, keyspaceNotificationTrimmedCallback); + RedisModule_Assert(ret == REDISMODULE_OK); + } else { + int ret = RedisModule_UnsubscribeFromKeyspaceEvents(ctx, REDISMODULE_NOTIFY_KEY_TRIMMED, keyspaceNotificationTrimmedCallback); + RedisModule_Assert(ret == REDISMODULE_OK); + } + RedisModule_ReplyWithSimpleString(ctx, "OK"); + return REDISMODULE_OK; +} + +void keyEventCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) { + REDISMODULE_NOT_USED(ctx); + REDISMODULE_NOT_USED(e); + + if (sub == REDISMODULE_SUBEVENT_KEY_DELETED) { + RedisModuleKeyInfoV1 *ei = data; + RedisModuleKey *kp = ei->key; + RedisModuleString *key = (RedisModuleString *) RedisModule_GetKeyNameFromModuleKey(kp); + size_t keylen; + const char *keyname = RedisModule_StringPtrLen(key, &keylen); + + /* Verify value can be read. It will be used to verify key's value can + * be read in a trim callback. */ + size_t valuelen = 0; + const char *value = ""; + RedisModuleKey *mk = RedisModule_OpenKey(ctx, key, REDISMODULE_READ); + if (RedisModule_KeyType(mk) == REDISMODULE_KEYTYPE_STRING) { + value = RedisModule_StringDMA(mk, &valuelen, 0); + } + RedisModule_CloseKey(mk); + + char buf[1024] = {0}; + snprintf(buf, sizeof(buf), "keyevent: key: %.*s, value: %.*s", (int) keylen, keyname, (int)valuelen, value); + + if (lastDeletedKeyLog) RedisModule_Free((void *)lastDeletedKeyLog); + lastDeletedKeyLog = RedisModule_Strdup(buf); + } +} + +int getLastDeletedKey(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(ctx); + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + if (lastDeletedKeyLog) { + RedisModule_ReplyWithStringBuffer(ctx, lastDeletedKeyLog, strlen(lastDeletedKeyLog)); + } else { + RedisModule_ReplyWithNull(ctx); + } + return REDISMODULE_OK; +} + +int asmGetCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(ctx); + + if (argc != 2) return RedisModule_WrongArity(ctx); + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ); + if (key == NULL) { + RedisModule_ReplyWithNull(ctx); + return REDISMODULE_OK; + } + + RedisModule_Assert(RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_STRING); + size_t len; + const char *value = RedisModule_StringDMA(key, &len, 0); + RedisModule_ReplyWithStringBuffer(ctx, value, len); + RedisModule_CloseKey(key); + + return REDISMODULE_OK; +} + +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + if (RedisModule_Init(ctx, "asm", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "asm.cluster_can_access_keys_in_slot", testClusterCanAccessKeysInSlot, "", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "asm.clear_event_log", clearEventLog, "", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "asm.get_cluster_event_log", getClusterEventLog, "", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "asm.get_cluster_trim_event_log", getClusterTrimEventLog, "", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "asm.keyless_cmd", keylessCmd, "write", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "asm.disable_trim", disableTrimCmd, "", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "asm.enable_trim", enableTrimCmd, "", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "asm.read_pending_trim_key", getPendingTrimKeyCmd, "readonly", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "asm.trim_in_progress", trimInProgressCmd, "", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "asm.read_keyless_cmd_val", readkeylessCmdVal, "", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "asm.sanity", sanity, "", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "asm.subscribe_trimmed_event", subscribeTrimmedEvent, "", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "asm.replicate_module_command", replicate_module_command, "", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "asm.lpush_replicate_crossslot_command", lpush_and_replicate_crossslot_command, "write", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "asm.cluster_get_local_slot_ranges", testClusterGetLocalSlotRanges, "", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "asm.get_last_deleted_key", getLastDeletedKey, "", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "asm.get", asmGetCommand, "", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "asm.parent", NULL, "", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + RedisModuleCommand *parent = RedisModule_GetCommand(ctx, "asm.parent"); + if (!parent) return REDISMODULE_ERR; + + /* Subcommand: ASM.PARENT SET (write) */ + if (RedisModule_CreateSubcommand(parent, "set", asmParentSet, "write fast", 2, 2, 1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_ClusterSlotMigration, clusterEventCallback) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_ClusterSlotMigrationTrim, clusterTrimEventCallback) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_KEY_TRIMMED, keyspaceNotificationTrimmedCallback) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_Key, keyEventCallback) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + return REDISMODULE_OK; +} |
