summaryrefslogtreecommitdiff
path: root/examples/redis-unstable/tests/modules/atomicslotmigration.c
diff options
context:
space:
mode:
authorMitja Felicijan <mitja.felicijan@gmail.com>2026-01-21 22:40:55 +0100
committerMitja Felicijan <mitja.felicijan@gmail.com>2026-01-21 22:40:55 +0100
commit5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda (patch)
tree1acdfa5220cd13b7be43a2a01368e80d306473ca /examples/redis-unstable/tests/modules/atomicslotmigration.c
parentc7ab12bba64d9c20ccd79b132dac475f7bc3923e (diff)
downloadcrep-5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda.tar.gz
Add Redis source code for testing
Diffstat (limited to 'examples/redis-unstable/tests/modules/atomicslotmigration.c')
-rw-r--r--examples/redis-unstable/tests/modules/atomicslotmigration.c594
1 files changed, 594 insertions, 0 deletions
diff --git a/examples/redis-unstable/tests/modules/atomicslotmigration.c b/examples/redis-unstable/tests/modules/atomicslotmigration.c
new file mode 100644
index 0000000..83393cd
--- /dev/null
+++ b/examples/redis-unstable/tests/modules/atomicslotmigration.c
@@ -0,0 +1,594 @@
+#include "redismodule.h"
+
+#include <stdlib.h>
+#include <memory.h>
+#include <errno.h>
+
+#define MAX_EVENTS 1024
+
+/* Log of cluster events. */
+const char *clusterEventLog[MAX_EVENTS];
+int numClusterEvents = 0;
+
+/* Log of cluster trim events. */
+const char *clusterTrimEventLog[MAX_EVENTS];
+int numClusterTrimEvents = 0;
+
+/* Log of last deleted key event. */
+const char *lastDeletedKeyLog = NULL;
+
+/* Flag to disable trim. */
+int disableTrimFlag = 0;
+
+int replicateModuleCommand = 0; /* Enable or disable module command replication. */
+RedisModuleString *moduleCommandKeyName = NULL; /* Key name to replicate. */
+RedisModuleString *moduleCommandKeyVal = NULL; /* Key value to replicate. */
+
+/* Enable or disable module command replication. */
+int replicate_module_command(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ if (argc != 4) {
+ RedisModule_ReplyWithError(ctx, "ERR wrong number of arguments");
+ return REDISMODULE_OK;
+ }
+
+ long long enable = 0;
+ if (RedisModule_StringToLongLong(argv[1], &enable) != REDISMODULE_OK) {
+ RedisModule_ReplyWithError(ctx, "ERR enable value");
+ return REDISMODULE_OK;
+ }
+ replicateModuleCommand = (enable != 0);
+
+ /* Set the key name and value to replicate. */
+ if (moduleCommandKeyName) RedisModule_FreeString(ctx, moduleCommandKeyName);
+ if (moduleCommandKeyVal) RedisModule_FreeString(ctx, moduleCommandKeyVal);
+ moduleCommandKeyName = RedisModule_CreateStringFromString(ctx, argv[2]);
+ moduleCommandKeyVal = RedisModule_CreateStringFromString(ctx, argv[3]);
+
+ RedisModule_ReplyWithSimpleString(ctx, "OK");
+ return REDISMODULE_OK;
+}
+
+int lpush_and_replicate_crossslot_command(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ if (argc != 3) return RedisModule_WrongArity(ctx);
+
+ /* LPUSH */
+ RedisModuleCallReply *rep = RedisModule_Call(ctx, "LPUSH", "!ss", argv[1], argv[2]);
+ RedisModule_Assert(RedisModule_CallReplyType(rep) != REDISMODULE_REPLY_ERROR);
+ RedisModule_FreeCallReply(rep);
+
+ /* Replicate cross slot command */
+ int ret = RedisModule_Replicate(ctx, "MSET", "cccccc", "key1", "val1", "key2", "val2", "key3", "val3");
+ RedisModule_Assert(ret == REDISMODULE_OK);
+
+ RedisModule_ReplyWithSimpleString(ctx, "OK");
+ return REDISMODULE_OK;
+}
+
+int testClusterGetLocalSlotRanges(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
+ static int use_auto_memory = 0;
+ use_auto_memory = !use_auto_memory;
+
+ RedisModuleSlotRangeArray *slots;
+ if (use_auto_memory) {
+ RedisModule_AutoMemory(ctx);
+ slots = RedisModule_ClusterGetLocalSlotRanges(ctx);
+ } else {
+ slots = RedisModule_ClusterGetLocalSlotRanges(NULL);
+ }
+
+ RedisModule_ReplyWithArray(ctx, slots->num_ranges);
+ for (int i = 0; i < slots->num_ranges; i++) {
+ RedisModule_ReplyWithArray(ctx, 2);
+ RedisModule_ReplyWithLongLong(ctx, slots->ranges[i].start);
+ RedisModule_ReplyWithLongLong(ctx, slots->ranges[i].end);
+ }
+ if (!use_auto_memory)
+ RedisModule_ClusterFreeSlotRanges(NULL, slots);
+ return REDISMODULE_OK;
+}
+
+/* Helper function to check if a slot range array contains a given slot. */
+int slotRangeArrayContains(RedisModuleSlotRangeArray *sra, unsigned int slot) {
+ for (int i = 0; i < sra->num_ranges; i++)
+ if (sra->ranges[i].start <= slot && sra->ranges[i].end >= slot)
+ return 1;
+ return 0;
+}
+
+/* Sanity check. */
+int sanity(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
+ RedisModule_Assert(RedisModule_ClusterCanAccessKeysInSlot(-1) == 0);
+ RedisModule_Assert(RedisModule_ClusterCanAccessKeysInSlot(16384) == 0);
+ RedisModule_Assert(RedisModule_ClusterCanAccessKeysInSlot(100000) == 0);
+
+ /* Call with invalid args. */
+ errno = 0;
+ RedisModule_Assert(RedisModule_ClusterPropagateForSlotMigration(NULL, NULL, NULL) == REDISMODULE_ERR);
+ RedisModule_Assert(errno == EINVAL);
+
+ /* Call with invalid args. */
+ errno = 0;
+ RedisModule_Assert(RedisModule_ClusterPropagateForSlotMigration(ctx, NULL, NULL) == REDISMODULE_ERR);
+ RedisModule_Assert(errno == EINVAL);
+
+ /* Call with invalid args. */
+ errno = 0;
+ RedisModule_Assert(RedisModule_ClusterPropagateForSlotMigration(NULL, "asm.keyless_cmd", "") == REDISMODULE_ERR);
+ RedisModule_Assert(errno == EINVAL);
+
+ /* Call outside of slot migration. */
+ errno = 0;
+ RedisModule_Assert(RedisModule_ClusterPropagateForSlotMigration(ctx, "asm.keyless_cmd", "") == REDISMODULE_ERR);
+ RedisModule_Assert(errno == EBADF);
+
+ RedisModule_ReplyWithSimpleString(ctx, "OK");
+ return REDISMODULE_OK;
+}
+
+/* Command to test RM_ClusterCanAccessKeysInSlot(). */
+int testClusterCanAccessKeysInSlot(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argc);
+ long long slot = 0;
+
+ if (RedisModule_StringToLongLong(argv[1],&slot) != REDISMODULE_OK) {
+ return RedisModule_ReplyWithError(ctx,"ERR invalid slot");
+ }
+ RedisModule_ReplyWithLongLong(ctx, RedisModule_ClusterCanAccessKeysInSlot(slot));
+ return REDISMODULE_OK;
+}
+
+/* Generate a string representation of the info struct and subevent.
+ e.g. 'sub: cluster-slot-migration-import-started, task_id: aeBd..., slots: 0-100,200-300' */
+const char *clusterAsmInfoToString(RedisModuleClusterSlotMigrationInfo *info, uint64_t sub) {
+ char buf[1024] = {0};
+
+ if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_STARTED)
+ snprintf(buf, sizeof(buf), "sub: cluster-slot-migration-import-started, ");
+ else if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_FAILED)
+ snprintf(buf, sizeof(buf), "sub: cluster-slot-migration-import-failed, ");
+ else if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_COMPLETED)
+ snprintf(buf, sizeof(buf), "sub: cluster-slot-migration-import-completed, ");
+ else if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_STARTED)
+ snprintf(buf, sizeof(buf), "sub: cluster-slot-migration-migrate-started, ");
+ else if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_FAILED)
+ snprintf(buf, sizeof(buf), "sub: cluster-slot-migration-migrate-failed, ");
+ else if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_COMPLETED)
+ snprintf(buf, sizeof(buf), "sub: cluster-slot-migration-migrate-completed, ");
+ else {
+ RedisModule_Assert(0);
+ }
+ snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), "source_node_id:%.40s, destination_node_id:%.40s, ",
+ info->source_node_id, info->destination_node_id);
+ snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), "task_id:%s, slots:", info->task_id);
+ for (int i = 0; i < info->slots->num_ranges; i++) {
+ RedisModuleSlotRange *sr = &info->slots->ranges[i];
+ snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), "%d-%d", sr->start, sr->end);
+ if (i != info->slots->num_ranges - 1)
+ snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), ",");
+ }
+ return RedisModule_Strdup(buf);
+}
+
+/* Generate a string representation of the info struct and subevent.
+ e.g. 'sub: cluster-slot-migration-trim-started, task_id: aeBd..., slots:0-100,200-300' */
+const char *clusterTrimInfoToString(RedisModuleClusterSlotMigrationTrimInfo *info, uint64_t sub) {
+ RedisModule_Assert(info);
+ char buf[1024] = {0};
+
+ if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_BACKGROUND)
+ snprintf(buf, sizeof(buf), "sub: cluster-slot-migration-trim-background, ");
+ else if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_STARTED)
+ snprintf(buf, sizeof(buf), "sub: cluster-slot-migration-trim-started, ");
+ else if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_COMPLETED)
+ snprintf(buf, sizeof(buf), "sub: cluster-slot-migration-trim-completed, ");
+ else {
+ RedisModule_Assert(0);
+ }
+ snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), "slots:");
+ for (int i = 0; i < info->slots->num_ranges; i++) {
+ RedisModuleSlotRange *sr = &info->slots->ranges[i];
+ snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), "%d-%d", sr->start, sr->end);
+ if (i != info->slots->num_ranges - 1)
+ snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), ",");
+ }
+ return RedisModule_Strdup(buf);
+}
+
+static void testReplicatingOutsideSlotRange(RedisModuleCtx *ctx, RedisModuleClusterSlotMigrationInfo *info) {
+ int slot = 0;
+ while (slot >= 0 && slot <= 16383) {
+ if (!slotRangeArrayContains(info->slots, slot)) {
+ break;
+ }
+ slot++;
+ }
+ char buf[128] = {0};
+ const char *prefix = RedisModule_ClusterCanonicalKeyNameInSlot(slot);
+ snprintf(buf, sizeof(buf), "{%s}%s", prefix, "modulekey");
+ errno = 0;
+ int ret = RedisModule_ClusterPropagateForSlotMigration(ctx, "SET", "cc", buf, "value");
+ RedisModule_Assert(ret == REDISMODULE_ERR);
+ RedisModule_Assert(errno == ERANGE);
+}
+
+static void testReplicatingCrossslotCommand(RedisModuleCtx *ctx) {
+ errno = 0;
+ int ret = RedisModule_ClusterPropagateForSlotMigration(ctx, "MSET", "cccccc", "key1", "val1", "key2", "val2", "key3", "val3");
+ RedisModule_Assert(ret == REDISMODULE_ERR);
+ RedisModule_Assert(errno == ENOTSUP);
+}
+
+static void testReplicatingUnknownCommand(RedisModuleCtx *ctx) {
+ errno = 0;
+ int ret = RedisModule_ClusterPropagateForSlotMigration(ctx, "unknowncommand", "");
+ RedisModule_Assert(ret == REDISMODULE_ERR);
+ RedisModule_Assert(errno == ENOENT);
+}
+
+static void testNonFatalScenarios(RedisModuleCtx *ctx, RedisModuleClusterSlotMigrationInfo *info) {
+ testReplicatingOutsideSlotRange(ctx, info);
+ testReplicatingCrossslotCommand(ctx);
+ testReplicatingUnknownCommand(ctx);
+}
+
+int disableTrimCmd(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+ disableTrimFlag = 1;
+ /* Only disable when MIGRATE_COMPLETED for simulating recommended usage. */
+ // RedisModule_ClusterDisableTrim(ctx)
+ RedisModule_ReplyWithSimpleString(ctx, "OK");
+ return REDISMODULE_OK;
+}
+
+int enableTrimCmd(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+ disableTrimFlag = 0;
+ RedisModule_Assert(RedisModule_ClusterEnableTrim(ctx) == REDISMODULE_OK);
+ RedisModule_ReplyWithSimpleString(ctx, "OK");
+ return REDISMODULE_OK;
+}
+
+int trimInProgressCmd(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+ uint64_t flags = RedisModule_GetContextFlags(ctx);
+ RedisModule_ReplyWithLongLong(ctx, !!(flags & REDISMODULE_CTX_FLAGS_TRIM_IN_PROGRESS));
+ return REDISMODULE_OK;
+}
+
+void clusterEventCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) {
+ REDISMODULE_NOT_USED(ctx);
+ int ret;
+
+ RedisModule_Assert(RedisModule_IsSubEventSupported(e, sub));
+
+ if (e.id == REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION) {
+ RedisModuleClusterSlotMigrationInfo *info = data;
+
+ if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_MODULE_PROPAGATE) {
+ /* Test some non-fatal scenarios. */
+ testNonFatalScenarios(ctx, info);
+
+ if (replicateModuleCommand == 0) return;
+
+ /* Replicate a keyless command. */
+ ret = RedisModule_ClusterPropagateForSlotMigration(ctx, "asm.keyless_cmd", "");
+ RedisModule_Assert(ret == REDISMODULE_OK);
+
+ /* Propagate configured key and value. */
+ ret = RedisModule_ClusterPropagateForSlotMigration(ctx, "SET", "ss", moduleCommandKeyName, moduleCommandKeyVal);
+ RedisModule_Assert(ret == REDISMODULE_OK);
+ } else {
+ /* Log the event. */
+ if (numClusterEvents >= MAX_EVENTS) return;
+ clusterEventLog[numClusterEvents++] = clusterAsmInfoToString(info, sub);
+
+ if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_COMPLETED) {
+ /* If users ask to disable trim, we disable trim. */
+ if (disableTrimFlag) {
+ RedisModule_Assert(RedisModule_ClusterDisableTrim(ctx) == REDISMODULE_OK);
+ }
+ }
+ }
+ }
+}
+
+int getPendingTrimKeyCmd(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ if (argc != 2) {
+ RedisModule_ReplyWithError(ctx, "ERR wrong number of arguments");
+ return REDISMODULE_ERR;
+ }
+ RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1],
+ REDISMODULE_READ | REDISMODULE_OPEN_KEY_ACCESS_TRIMMED);
+ if (!key) {
+ RedisModule_ReplyWithNull(ctx);
+ return REDISMODULE_OK;
+ }
+ if (RedisModule_KeyType(key) != REDISMODULE_KEYTYPE_STRING) {
+ RedisModule_ReplyWithError(ctx, "key is not a string");
+ return REDISMODULE_ERR;
+ }
+ size_t len;
+ const char *value = RedisModule_StringDMA(key, &len, 0);
+ RedisModule_ReplyWithStringBuffer(ctx, value, len);
+ RedisModule_CloseKey(key);
+ return REDISMODULE_OK;
+}
+
+void clusterTrimEventCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) {
+ REDISMODULE_NOT_USED(ctx);
+
+ RedisModule_Assert(RedisModule_IsSubEventSupported(e, sub));
+
+ if (e.id == REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION_TRIM) {
+ /* Log the event. */
+ if (numClusterTrimEvents >= MAX_EVENTS) return;
+ RedisModuleClusterSlotMigrationTrimInfo *info = data;
+ clusterTrimEventLog[numClusterTrimEvents++] = clusterTrimInfoToString(info, sub);
+ }
+}
+
+static int keyspaceNotificationTrimmedCallback(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
+ REDISMODULE_NOT_USED(ctx);
+
+ RedisModule_Assert(type == REDISMODULE_NOTIFY_KEY_TRIMMED);
+ RedisModule_Assert(strcmp(event, "key_trimmed") == 0);
+
+ if (numClusterTrimEvents >= MAX_EVENTS) return REDISMODULE_OK;
+
+ /* Log the trimmed key event. */
+ size_t len;
+ const char *key_str = RedisModule_StringPtrLen(key, &len);
+
+ char buf[1024] = {0};
+ snprintf(buf, sizeof(buf), "keyspace: key_trimmed, key: %s", key_str);
+
+ clusterTrimEventLog[numClusterTrimEvents++] = RedisModule_Strdup(buf);
+ return REDISMODULE_OK;
+}
+
+/* ASM.PARENT SET key value (just proxy to Redis SET) */
+static int asmParentSet(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ if (argc != 4) return RedisModule_WrongArity(ctx);
+ RedisModuleCallReply *reply = RedisModule_Call(ctx, "SET", "ss", argv[2], argv[3]);
+ if (!reply) return RedisModule_ReplyWithError(ctx, "ERR internal");
+ RedisModule_ReplyWithCallReply(ctx, reply);
+ RedisModule_FreeCallReply(reply);
+ RedisModule_ReplicateVerbatim(ctx);
+ return REDISMODULE_OK;
+}
+
+/* Clear both the cluster and trim event logs. */
+int clearEventLog(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
+ for (int i = 0; i < numClusterEvents; i++)
+ RedisModule_Free((void *)clusterEventLog[i]);
+ numClusterEvents = 0;
+
+ for (int i = 0; i < numClusterTrimEvents; i++)
+ RedisModule_Free((void *)clusterTrimEventLog[i]);
+ numClusterTrimEvents = 0;
+
+ RedisModule_ReplyWithSimpleString(ctx, "OK");
+ return REDISMODULE_OK;
+}
+
+/* Reply with the cluster event log. */
+int getClusterEventLog(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(ctx);
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
+ RedisModule_ReplyWithArray(ctx, numClusterEvents);
+ for (int i = 0; i < numClusterEvents; i++)
+ RedisModule_ReplyWithStringBuffer(ctx, clusterEventLog[i], strlen(clusterEventLog[i]));
+ return REDISMODULE_OK;
+}
+
+/* Reply with the cluster trim event log. */
+int getClusterTrimEventLog(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(ctx);
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
+ RedisModule_ReplyWithArray(ctx, numClusterTrimEvents);
+ for (int i = 0; i < numClusterTrimEvents; i++)
+ RedisModule_ReplyWithStringBuffer(ctx, clusterTrimEventLog[i], strlen(clusterTrimEventLog[i]));
+ return REDISMODULE_OK;
+}
+
+/* A keyless command to test module command replication. */
+int moduledata = 0;
+int keylessCmd(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(ctx);
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+ moduledata++;
+ RedisModule_ReplyWithLongLong(ctx, moduledata);
+ return REDISMODULE_OK;
+}
+int readkeylessCmdVal(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(ctx);
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+ RedisModule_ReplyWithLongLong(ctx, moduledata);
+ return REDISMODULE_OK;
+}
+
+int subscribeTrimmedEvent(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(ctx);
+ if (argc != 2)
+ return RedisModule_WrongArity(ctx);
+
+ long long subscribe = 0;
+ if (RedisModule_StringToLongLong(argv[1], &subscribe) != REDISMODULE_OK) {
+ RedisModule_ReplyWithError(ctx, "ERR subscribe value");
+ return REDISMODULE_OK;
+ }
+
+ if (subscribe) {
+ /* Unsubscribe first to avoid duplicate subscription. */
+ RedisModule_UnsubscribeFromKeyspaceEvents(ctx, REDISMODULE_NOTIFY_KEY_TRIMMED, keyspaceNotificationTrimmedCallback);
+ int ret = RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_KEY_TRIMMED, keyspaceNotificationTrimmedCallback);
+ RedisModule_Assert(ret == REDISMODULE_OK);
+ } else {
+ int ret = RedisModule_UnsubscribeFromKeyspaceEvents(ctx, REDISMODULE_NOTIFY_KEY_TRIMMED, keyspaceNotificationTrimmedCallback);
+ RedisModule_Assert(ret == REDISMODULE_OK);
+ }
+ RedisModule_ReplyWithSimpleString(ctx, "OK");
+ return REDISMODULE_OK;
+}
+
+void keyEventCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) {
+ REDISMODULE_NOT_USED(ctx);
+ REDISMODULE_NOT_USED(e);
+
+ if (sub == REDISMODULE_SUBEVENT_KEY_DELETED) {
+ RedisModuleKeyInfoV1 *ei = data;
+ RedisModuleKey *kp = ei->key;
+ RedisModuleString *key = (RedisModuleString *) RedisModule_GetKeyNameFromModuleKey(kp);
+ size_t keylen;
+ const char *keyname = RedisModule_StringPtrLen(key, &keylen);
+
+ /* Verify value can be read. It will be used to verify key's value can
+ * be read in a trim callback. */
+ size_t valuelen = 0;
+ const char *value = "";
+ RedisModuleKey *mk = RedisModule_OpenKey(ctx, key, REDISMODULE_READ);
+ if (RedisModule_KeyType(mk) == REDISMODULE_KEYTYPE_STRING) {
+ value = RedisModule_StringDMA(mk, &valuelen, 0);
+ }
+ RedisModule_CloseKey(mk);
+
+ char buf[1024] = {0};
+ snprintf(buf, sizeof(buf), "keyevent: key: %.*s, value: %.*s", (int) keylen, keyname, (int)valuelen, value);
+
+ if (lastDeletedKeyLog) RedisModule_Free((void *)lastDeletedKeyLog);
+ lastDeletedKeyLog = RedisModule_Strdup(buf);
+ }
+}
+
+int getLastDeletedKey(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(ctx);
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
+ if (lastDeletedKeyLog) {
+ RedisModule_ReplyWithStringBuffer(ctx, lastDeletedKeyLog, strlen(lastDeletedKeyLog));
+ } else {
+ RedisModule_ReplyWithNull(ctx);
+ }
+ return REDISMODULE_OK;
+}
+
+int asmGetCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(ctx);
+
+ if (argc != 2) return RedisModule_WrongArity(ctx);
+ RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ);
+ if (key == NULL) {
+ RedisModule_ReplyWithNull(ctx);
+ return REDISMODULE_OK;
+ }
+
+ RedisModule_Assert(RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_STRING);
+ size_t len;
+ const char *value = RedisModule_StringDMA(key, &len, 0);
+ RedisModule_ReplyWithStringBuffer(ctx, value, len);
+ RedisModule_CloseKey(key);
+
+ return REDISMODULE_OK;
+}
+
+int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
+ REDISMODULE_NOT_USED(argv);
+ REDISMODULE_NOT_USED(argc);
+
+ if (RedisModule_Init(ctx, "asm", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "asm.cluster_can_access_keys_in_slot", testClusterCanAccessKeysInSlot, "", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "asm.clear_event_log", clearEventLog, "", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "asm.get_cluster_event_log", getClusterEventLog, "", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "asm.get_cluster_trim_event_log", getClusterTrimEventLog, "", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "asm.keyless_cmd", keylessCmd, "write", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "asm.disable_trim", disableTrimCmd, "", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "asm.enable_trim", enableTrimCmd, "", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "asm.read_pending_trim_key", getPendingTrimKeyCmd, "readonly", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "asm.trim_in_progress", trimInProgressCmd, "", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "asm.read_keyless_cmd_val", readkeylessCmdVal, "", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "asm.sanity", sanity, "", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "asm.subscribe_trimmed_event", subscribeTrimmedEvent, "", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "asm.replicate_module_command", replicate_module_command, "", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "asm.lpush_replicate_crossslot_command", lpush_and_replicate_crossslot_command, "write", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "asm.cluster_get_local_slot_ranges", testClusterGetLocalSlotRanges, "", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "asm.get_last_deleted_key", getLastDeletedKey, "", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "asm.get", asmGetCommand, "", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_CreateCommand(ctx, "asm.parent", NULL, "", 0, 0, 0) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ RedisModuleCommand *parent = RedisModule_GetCommand(ctx, "asm.parent");
+ if (!parent) return REDISMODULE_ERR;
+
+ /* Subcommand: ASM.PARENT SET (write) */
+ if (RedisModule_CreateSubcommand(parent, "set", asmParentSet, "write fast", 2, 2, 1) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_ClusterSlotMigration, clusterEventCallback) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_ClusterSlotMigrationTrim, clusterTrimEventCallback) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_KEY_TRIMMED, keyspaceNotificationTrimmedCallback) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ if (RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_Key, keyEventCallback) == REDISMODULE_ERR)
+ return REDISMODULE_ERR;
+
+ return REDISMODULE_OK;
+}