diff options
| author | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:52:54 +0100 |
|---|---|---|
| committer | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:52:54 +0100 |
| commit | dcacc00e3750300617ba6e16eb346713f91a783a (patch) | |
| tree | 38e2d4fb5ed9d119711d4295c6eda4b014af73fd /examples/redis-unstable/tests/modules/atomicslotmigration.c | |
| parent | 58dac10aeb8f5a041c46bddbeaf4c7966a99b998 (diff) | |
| download | crep-dcacc00e3750300617ba6e16eb346713f91a783a.tar.gz | |
Remove testing data
Diffstat (limited to 'examples/redis-unstable/tests/modules/atomicslotmigration.c')
| -rw-r--r-- | examples/redis-unstable/tests/modules/atomicslotmigration.c | 594 |
1 files changed, 0 insertions, 594 deletions
diff --git a/examples/redis-unstable/tests/modules/atomicslotmigration.c b/examples/redis-unstable/tests/modules/atomicslotmigration.c deleted file mode 100644 index 83393cd..0000000 --- a/examples/redis-unstable/tests/modules/atomicslotmigration.c +++ /dev/null | |||
| @@ -1,594 +0,0 @@ | |||
| 1 | #include "redismodule.h" | ||
| 2 | |||
| 3 | #include <stdlib.h> | ||
| 4 | #include <memory.h> | ||
| 5 | #include <errno.h> | ||
| 6 | |||
| 7 | #define MAX_EVENTS 1024 | ||
| 8 | |||
| 9 | /* Log of cluster events. */ | ||
| 10 | const char *clusterEventLog[MAX_EVENTS]; | ||
| 11 | int numClusterEvents = 0; | ||
| 12 | |||
| 13 | /* Log of cluster trim events. */ | ||
| 14 | const char *clusterTrimEventLog[MAX_EVENTS]; | ||
| 15 | int numClusterTrimEvents = 0; | ||
| 16 | |||
| 17 | /* Log of last deleted key event. */ | ||
| 18 | const char *lastDeletedKeyLog = NULL; | ||
| 19 | |||
| 20 | /* Flag to disable trim. */ | ||
| 21 | int disableTrimFlag = 0; | ||
| 22 | |||
| 23 | int replicateModuleCommand = 0; /* Enable or disable module command replication. */ | ||
| 24 | RedisModuleString *moduleCommandKeyName = NULL; /* Key name to replicate. */ | ||
| 25 | RedisModuleString *moduleCommandKeyVal = NULL; /* Key value to replicate. */ | ||
| 26 | |||
| 27 | /* Enable or disable module command replication. */ | ||
| 28 | int replicate_module_command(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { | ||
| 29 | if (argc != 4) { | ||
| 30 | RedisModule_ReplyWithError(ctx, "ERR wrong number of arguments"); | ||
| 31 | return REDISMODULE_OK; | ||
| 32 | } | ||
| 33 | |||
| 34 | long long enable = 0; | ||
| 35 | if (RedisModule_StringToLongLong(argv[1], &enable) != REDISMODULE_OK) { | ||
| 36 | RedisModule_ReplyWithError(ctx, "ERR enable value"); | ||
| 37 | return REDISMODULE_OK; | ||
| 38 | } | ||
| 39 | replicateModuleCommand = (enable != 0); | ||
| 40 | |||
| 41 | /* Set the key name and value to replicate. */ | ||
| 42 | if (moduleCommandKeyName) RedisModule_FreeString(ctx, moduleCommandKeyName); | ||
| 43 | if (moduleCommandKeyVal) RedisModule_FreeString(ctx, moduleCommandKeyVal); | ||
| 44 | moduleCommandKeyName = RedisModule_CreateStringFromString(ctx, argv[2]); | ||
| 45 | moduleCommandKeyVal = RedisModule_CreateStringFromString(ctx, argv[3]); | ||
| 46 | |||
| 47 | RedisModule_ReplyWithSimpleString(ctx, "OK"); | ||
| 48 | return REDISMODULE_OK; | ||
| 49 | } | ||
| 50 | |||
| 51 | int lpush_and_replicate_crossslot_command(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { | ||
| 52 | if (argc != 3) return RedisModule_WrongArity(ctx); | ||
| 53 | |||
| 54 | /* LPUSH */ | ||
| 55 | RedisModuleCallReply *rep = RedisModule_Call(ctx, "LPUSH", "!ss", argv[1], argv[2]); | ||
| 56 | RedisModule_Assert(RedisModule_CallReplyType(rep) != REDISMODULE_REPLY_ERROR); | ||
| 57 | RedisModule_FreeCallReply(rep); | ||
| 58 | |||
| 59 | /* Replicate cross slot command */ | ||
| 60 | int ret = RedisModule_Replicate(ctx, "MSET", "cccccc", "key1", "val1", "key2", "val2", "key3", "val3"); | ||
| 61 | RedisModule_Assert(ret == REDISMODULE_OK); | ||
| 62 | |||
| 63 | RedisModule_ReplyWithSimpleString(ctx, "OK"); | ||
| 64 | return REDISMODULE_OK; | ||
| 65 | } | ||
| 66 | |||
| 67 | int testClusterGetLocalSlotRanges(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { | ||
| 68 | REDISMODULE_NOT_USED(argv); | ||
| 69 | REDISMODULE_NOT_USED(argc); | ||
| 70 | |||
| 71 | static int use_auto_memory = 0; | ||
| 72 | use_auto_memory = !use_auto_memory; | ||
| 73 | |||
| 74 | RedisModuleSlotRangeArray *slots; | ||
| 75 | if (use_auto_memory) { | ||
| 76 | RedisModule_AutoMemory(ctx); | ||
| 77 | slots = RedisModule_ClusterGetLocalSlotRanges(ctx); | ||
| 78 | } else { | ||
| 79 | slots = RedisModule_ClusterGetLocalSlotRanges(NULL); | ||
| 80 | } | ||
| 81 | |||
| 82 | RedisModule_ReplyWithArray(ctx, slots->num_ranges); | ||
| 83 | for (int i = 0; i < slots->num_ranges; i++) { | ||
| 84 | RedisModule_ReplyWithArray(ctx, 2); | ||
| 85 | RedisModule_ReplyWithLongLong(ctx, slots->ranges[i].start); | ||
| 86 | RedisModule_ReplyWithLongLong(ctx, slots->ranges[i].end); | ||
| 87 | } | ||
| 88 | if (!use_auto_memory) | ||
| 89 | RedisModule_ClusterFreeSlotRanges(NULL, slots); | ||
| 90 | return REDISMODULE_OK; | ||
| 91 | } | ||
| 92 | |||
| 93 | /* Helper function to check if a slot range array contains a given slot. */ | ||
| 94 | int slotRangeArrayContains(RedisModuleSlotRangeArray *sra, unsigned int slot) { | ||
| 95 | for (int i = 0; i < sra->num_ranges; i++) | ||
| 96 | if (sra->ranges[i].start <= slot && sra->ranges[i].end >= slot) | ||
| 97 | return 1; | ||
| 98 | return 0; | ||
| 99 | } | ||
| 100 | |||
| 101 | /* Sanity check. */ | ||
| 102 | int sanity(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { | ||
| 103 | REDISMODULE_NOT_USED(argv); | ||
| 104 | REDISMODULE_NOT_USED(argc); | ||
| 105 | |||
| 106 | RedisModule_Assert(RedisModule_ClusterCanAccessKeysInSlot(-1) == 0); | ||
| 107 | RedisModule_Assert(RedisModule_ClusterCanAccessKeysInSlot(16384) == 0); | ||
| 108 | RedisModule_Assert(RedisModule_ClusterCanAccessKeysInSlot(100000) == 0); | ||
| 109 | |||
| 110 | /* Call with invalid args. */ | ||
| 111 | errno = 0; | ||
| 112 | RedisModule_Assert(RedisModule_ClusterPropagateForSlotMigration(NULL, NULL, NULL) == REDISMODULE_ERR); | ||
| 113 | RedisModule_Assert(errno == EINVAL); | ||
| 114 | |||
| 115 | /* Call with invalid args. */ | ||
| 116 | errno = 0; | ||
| 117 | RedisModule_Assert(RedisModule_ClusterPropagateForSlotMigration(ctx, NULL, NULL) == REDISMODULE_ERR); | ||
| 118 | RedisModule_Assert(errno == EINVAL); | ||
| 119 | |||
| 120 | /* Call with invalid args. */ | ||
| 121 | errno = 0; | ||
| 122 | RedisModule_Assert(RedisModule_ClusterPropagateForSlotMigration(NULL, "asm.keyless_cmd", "") == REDISMODULE_ERR); | ||
| 123 | RedisModule_Assert(errno == EINVAL); | ||
| 124 | |||
| 125 | /* Call outside of slot migration. */ | ||
| 126 | errno = 0; | ||
| 127 | RedisModule_Assert(RedisModule_ClusterPropagateForSlotMigration(ctx, "asm.keyless_cmd", "") == REDISMODULE_ERR); | ||
| 128 | RedisModule_Assert(errno == EBADF); | ||
| 129 | |||
| 130 | RedisModule_ReplyWithSimpleString(ctx, "OK"); | ||
| 131 | return REDISMODULE_OK; | ||
| 132 | } | ||
| 133 | |||
| 134 | /* Command to test RM_ClusterCanAccessKeysInSlot(). */ | ||
| 135 | int testClusterCanAccessKeysInSlot(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { | ||
| 136 | REDISMODULE_NOT_USED(argc); | ||
| 137 | long long slot = 0; | ||
| 138 | |||
| 139 | if (RedisModule_StringToLongLong(argv[1],&slot) != REDISMODULE_OK) { | ||
| 140 | return RedisModule_ReplyWithError(ctx,"ERR invalid slot"); | ||
| 141 | } | ||
| 142 | RedisModule_ReplyWithLongLong(ctx, RedisModule_ClusterCanAccessKeysInSlot(slot)); | ||
| 143 | return REDISMODULE_OK; | ||
| 144 | } | ||
| 145 | |||
| 146 | /* Generate a string representation of the info struct and subevent. | ||
| 147 | e.g. 'sub: cluster-slot-migration-import-started, task_id: aeBd..., slots: 0-100,200-300' */ | ||
| 148 | const char *clusterAsmInfoToString(RedisModuleClusterSlotMigrationInfo *info, uint64_t sub) { | ||
| 149 | char buf[1024] = {0}; | ||
| 150 | |||
| 151 | if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_STARTED) | ||
| 152 | snprintf(buf, sizeof(buf), "sub: cluster-slot-migration-import-started, "); | ||
| 153 | else if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_FAILED) | ||
| 154 | snprintf(buf, sizeof(buf), "sub: cluster-slot-migration-import-failed, "); | ||
| 155 | else if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_IMPORT_COMPLETED) | ||
| 156 | snprintf(buf, sizeof(buf), "sub: cluster-slot-migration-import-completed, "); | ||
| 157 | else if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_STARTED) | ||
| 158 | snprintf(buf, sizeof(buf), "sub: cluster-slot-migration-migrate-started, "); | ||
| 159 | else if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_FAILED) | ||
| 160 | snprintf(buf, sizeof(buf), "sub: cluster-slot-migration-migrate-failed, "); | ||
| 161 | else if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_COMPLETED) | ||
| 162 | snprintf(buf, sizeof(buf), "sub: cluster-slot-migration-migrate-completed, "); | ||
| 163 | else { | ||
| 164 | RedisModule_Assert(0); | ||
| 165 | } | ||
| 166 | snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), "source_node_id:%.40s, destination_node_id:%.40s, ", | ||
| 167 | info->source_node_id, info->destination_node_id); | ||
| 168 | snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), "task_id:%s, slots:", info->task_id); | ||
| 169 | for (int i = 0; i < info->slots->num_ranges; i++) { | ||
| 170 | RedisModuleSlotRange *sr = &info->slots->ranges[i]; | ||
| 171 | snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), "%d-%d", sr->start, sr->end); | ||
| 172 | if (i != info->slots->num_ranges - 1) | ||
| 173 | snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), ","); | ||
| 174 | } | ||
| 175 | return RedisModule_Strdup(buf); | ||
| 176 | } | ||
| 177 | |||
| 178 | /* Generate a string representation of the info struct and subevent. | ||
| 179 | e.g. 'sub: cluster-slot-migration-trim-started, task_id: aeBd..., slots:0-100,200-300' */ | ||
| 180 | const char *clusterTrimInfoToString(RedisModuleClusterSlotMigrationTrimInfo *info, uint64_t sub) { | ||
| 181 | RedisModule_Assert(info); | ||
| 182 | char buf[1024] = {0}; | ||
| 183 | |||
| 184 | if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_BACKGROUND) | ||
| 185 | snprintf(buf, sizeof(buf), "sub: cluster-slot-migration-trim-background, "); | ||
| 186 | else if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_STARTED) | ||
| 187 | snprintf(buf, sizeof(buf), "sub: cluster-slot-migration-trim-started, "); | ||
| 188 | else if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_TRIM_COMPLETED) | ||
| 189 | snprintf(buf, sizeof(buf), "sub: cluster-slot-migration-trim-completed, "); | ||
| 190 | else { | ||
| 191 | RedisModule_Assert(0); | ||
| 192 | } | ||
| 193 | snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), "slots:"); | ||
| 194 | for (int i = 0; i < info->slots->num_ranges; i++) { | ||
| 195 | RedisModuleSlotRange *sr = &info->slots->ranges[i]; | ||
| 196 | snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), "%d-%d", sr->start, sr->end); | ||
| 197 | if (i != info->slots->num_ranges - 1) | ||
| 198 | snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), ","); | ||
| 199 | } | ||
| 200 | return RedisModule_Strdup(buf); | ||
| 201 | } | ||
| 202 | |||
| 203 | static void testReplicatingOutsideSlotRange(RedisModuleCtx *ctx, RedisModuleClusterSlotMigrationInfo *info) { | ||
| 204 | int slot = 0; | ||
| 205 | while (slot >= 0 && slot <= 16383) { | ||
| 206 | if (!slotRangeArrayContains(info->slots, slot)) { | ||
| 207 | break; | ||
| 208 | } | ||
| 209 | slot++; | ||
| 210 | } | ||
| 211 | char buf[128] = {0}; | ||
| 212 | const char *prefix = RedisModule_ClusterCanonicalKeyNameInSlot(slot); | ||
| 213 | snprintf(buf, sizeof(buf), "{%s}%s", prefix, "modulekey"); | ||
| 214 | errno = 0; | ||
| 215 | int ret = RedisModule_ClusterPropagateForSlotMigration(ctx, "SET", "cc", buf, "value"); | ||
| 216 | RedisModule_Assert(ret == REDISMODULE_ERR); | ||
| 217 | RedisModule_Assert(errno == ERANGE); | ||
| 218 | } | ||
| 219 | |||
| 220 | static void testReplicatingCrossslotCommand(RedisModuleCtx *ctx) { | ||
| 221 | errno = 0; | ||
| 222 | int ret = RedisModule_ClusterPropagateForSlotMigration(ctx, "MSET", "cccccc", "key1", "val1", "key2", "val2", "key3", "val3"); | ||
| 223 | RedisModule_Assert(ret == REDISMODULE_ERR); | ||
| 224 | RedisModule_Assert(errno == ENOTSUP); | ||
| 225 | } | ||
| 226 | |||
| 227 | static void testReplicatingUnknownCommand(RedisModuleCtx *ctx) { | ||
| 228 | errno = 0; | ||
| 229 | int ret = RedisModule_ClusterPropagateForSlotMigration(ctx, "unknowncommand", ""); | ||
| 230 | RedisModule_Assert(ret == REDISMODULE_ERR); | ||
| 231 | RedisModule_Assert(errno == ENOENT); | ||
| 232 | } | ||
| 233 | |||
| 234 | static void testNonFatalScenarios(RedisModuleCtx *ctx, RedisModuleClusterSlotMigrationInfo *info) { | ||
| 235 | testReplicatingOutsideSlotRange(ctx, info); | ||
| 236 | testReplicatingCrossslotCommand(ctx); | ||
| 237 | testReplicatingUnknownCommand(ctx); | ||
| 238 | } | ||
| 239 | |||
| 240 | int disableTrimCmd(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { | ||
| 241 | REDISMODULE_NOT_USED(argv); | ||
| 242 | REDISMODULE_NOT_USED(argc); | ||
| 243 | disableTrimFlag = 1; | ||
| 244 | /* Only disable when MIGRATE_COMPLETED for simulating recommended usage. */ | ||
| 245 | // RedisModule_ClusterDisableTrim(ctx) | ||
| 246 | RedisModule_ReplyWithSimpleString(ctx, "OK"); | ||
| 247 | return REDISMODULE_OK; | ||
| 248 | } | ||
| 249 | |||
| 250 | int enableTrimCmd(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { | ||
| 251 | REDISMODULE_NOT_USED(argv); | ||
| 252 | REDISMODULE_NOT_USED(argc); | ||
| 253 | disableTrimFlag = 0; | ||
| 254 | RedisModule_Assert(RedisModule_ClusterEnableTrim(ctx) == REDISMODULE_OK); | ||
| 255 | RedisModule_ReplyWithSimpleString(ctx, "OK"); | ||
| 256 | return REDISMODULE_OK; | ||
| 257 | } | ||
| 258 | |||
| 259 | int trimInProgressCmd(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { | ||
| 260 | REDISMODULE_NOT_USED(argv); | ||
| 261 | REDISMODULE_NOT_USED(argc); | ||
| 262 | uint64_t flags = RedisModule_GetContextFlags(ctx); | ||
| 263 | RedisModule_ReplyWithLongLong(ctx, !!(flags & REDISMODULE_CTX_FLAGS_TRIM_IN_PROGRESS)); | ||
| 264 | return REDISMODULE_OK; | ||
| 265 | } | ||
| 266 | |||
| 267 | void clusterEventCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) { | ||
| 268 | REDISMODULE_NOT_USED(ctx); | ||
| 269 | int ret; | ||
| 270 | |||
| 271 | RedisModule_Assert(RedisModule_IsSubEventSupported(e, sub)); | ||
| 272 | |||
| 273 | if (e.id == REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION) { | ||
| 274 | RedisModuleClusterSlotMigrationInfo *info = data; | ||
| 275 | |||
| 276 | if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_MODULE_PROPAGATE) { | ||
| 277 | /* Test some non-fatal scenarios. */ | ||
| 278 | testNonFatalScenarios(ctx, info); | ||
| 279 | |||
| 280 | if (replicateModuleCommand == 0) return; | ||
| 281 | |||
| 282 | /* Replicate a keyless command. */ | ||
| 283 | ret = RedisModule_ClusterPropagateForSlotMigration(ctx, "asm.keyless_cmd", ""); | ||
| 284 | RedisModule_Assert(ret == REDISMODULE_OK); | ||
| 285 | |||
| 286 | /* Propagate configured key and value. */ | ||
| 287 | ret = RedisModule_ClusterPropagateForSlotMigration(ctx, "SET", "ss", moduleCommandKeyName, moduleCommandKeyVal); | ||
| 288 | RedisModule_Assert(ret == REDISMODULE_OK); | ||
| 289 | } else { | ||
| 290 | /* Log the event. */ | ||
| 291 | if (numClusterEvents >= MAX_EVENTS) return; | ||
| 292 | clusterEventLog[numClusterEvents++] = clusterAsmInfoToString(info, sub); | ||
| 293 | |||
| 294 | if (sub == REDISMODULE_SUBEVENT_CLUSTER_SLOT_MIGRATION_MIGRATE_COMPLETED) { | ||
| 295 | /* If users ask to disable trim, we disable trim. */ | ||
| 296 | if (disableTrimFlag) { | ||
| 297 | RedisModule_Assert(RedisModule_ClusterDisableTrim(ctx) == REDISMODULE_OK); | ||
| 298 | } | ||
| 299 | } | ||
| 300 | } | ||
| 301 | } | ||
| 302 | } | ||
| 303 | |||
| 304 | int getPendingTrimKeyCmd(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { | ||
| 305 | if (argc != 2) { | ||
| 306 | RedisModule_ReplyWithError(ctx, "ERR wrong number of arguments"); | ||
| 307 | return REDISMODULE_ERR; | ||
| 308 | } | ||
| 309 | RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], | ||
| 310 | REDISMODULE_READ | REDISMODULE_OPEN_KEY_ACCESS_TRIMMED); | ||
| 311 | if (!key) { | ||
| 312 | RedisModule_ReplyWithNull(ctx); | ||
| 313 | return REDISMODULE_OK; | ||
| 314 | } | ||
| 315 | if (RedisModule_KeyType(key) != REDISMODULE_KEYTYPE_STRING) { | ||
| 316 | RedisModule_ReplyWithError(ctx, "key is not a string"); | ||
| 317 | return REDISMODULE_ERR; | ||
| 318 | } | ||
| 319 | size_t len; | ||
| 320 | const char *value = RedisModule_StringDMA(key, &len, 0); | ||
| 321 | RedisModule_ReplyWithStringBuffer(ctx, value, len); | ||
| 322 | RedisModule_CloseKey(key); | ||
| 323 | return REDISMODULE_OK; | ||
| 324 | } | ||
| 325 | |||
| 326 | void clusterTrimEventCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) { | ||
| 327 | REDISMODULE_NOT_USED(ctx); | ||
| 328 | |||
| 329 | RedisModule_Assert(RedisModule_IsSubEventSupported(e, sub)); | ||
| 330 | |||
| 331 | if (e.id == REDISMODULE_EVENT_CLUSTER_SLOT_MIGRATION_TRIM) { | ||
| 332 | /* Log the event. */ | ||
| 333 | if (numClusterTrimEvents >= MAX_EVENTS) return; | ||
| 334 | RedisModuleClusterSlotMigrationTrimInfo *info = data; | ||
| 335 | clusterTrimEventLog[numClusterTrimEvents++] = clusterTrimInfoToString(info, sub); | ||
| 336 | } | ||
| 337 | } | ||
| 338 | |||
| 339 | static int keyspaceNotificationTrimmedCallback(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) { | ||
| 340 | REDISMODULE_NOT_USED(ctx); | ||
| 341 | |||
| 342 | RedisModule_Assert(type == REDISMODULE_NOTIFY_KEY_TRIMMED); | ||
| 343 | RedisModule_Assert(strcmp(event, "key_trimmed") == 0); | ||
| 344 | |||
| 345 | if (numClusterTrimEvents >= MAX_EVENTS) return REDISMODULE_OK; | ||
| 346 | |||
| 347 | /* Log the trimmed key event. */ | ||
| 348 | size_t len; | ||
| 349 | const char *key_str = RedisModule_StringPtrLen(key, &len); | ||
| 350 | |||
| 351 | char buf[1024] = {0}; | ||
| 352 | snprintf(buf, sizeof(buf), "keyspace: key_trimmed, key: %s", key_str); | ||
| 353 | |||
| 354 | clusterTrimEventLog[numClusterTrimEvents++] = RedisModule_Strdup(buf); | ||
| 355 | return REDISMODULE_OK; | ||
| 356 | } | ||
| 357 | |||
| 358 | /* ASM.PARENT SET key value (just proxy to Redis SET) */ | ||
| 359 | static int asmParentSet(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { | ||
| 360 | if (argc != 4) return RedisModule_WrongArity(ctx); | ||
| 361 | RedisModuleCallReply *reply = RedisModule_Call(ctx, "SET", "ss", argv[2], argv[3]); | ||
| 362 | if (!reply) return RedisModule_ReplyWithError(ctx, "ERR internal"); | ||
| 363 | RedisModule_ReplyWithCallReply(ctx, reply); | ||
| 364 | RedisModule_FreeCallReply(reply); | ||
| 365 | RedisModule_ReplicateVerbatim(ctx); | ||
| 366 | return REDISMODULE_OK; | ||
| 367 | } | ||
| 368 | |||
| 369 | /* Clear both the cluster and trim event logs. */ | ||
| 370 | int clearEventLog(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { | ||
| 371 | REDISMODULE_NOT_USED(argv); | ||
| 372 | REDISMODULE_NOT_USED(argc); | ||
| 373 | |||
| 374 | for (int i = 0; i < numClusterEvents; i++) | ||
| 375 | RedisModule_Free((void *)clusterEventLog[i]); | ||
| 376 | numClusterEvents = 0; | ||
| 377 | |||
| 378 | for (int i = 0; i < numClusterTrimEvents; i++) | ||
| 379 | RedisModule_Free((void *)clusterTrimEventLog[i]); | ||
| 380 | numClusterTrimEvents = 0; | ||
| 381 | |||
| 382 | RedisModule_ReplyWithSimpleString(ctx, "OK"); | ||
| 383 | return REDISMODULE_OK; | ||
| 384 | } | ||
| 385 | |||
| 386 | /* Reply with the cluster event log. */ | ||
| 387 | int getClusterEventLog(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { | ||
| 388 | REDISMODULE_NOT_USED(ctx); | ||
| 389 | REDISMODULE_NOT_USED(argv); | ||
| 390 | REDISMODULE_NOT_USED(argc); | ||
| 391 | |||
| 392 | RedisModule_ReplyWithArray(ctx, numClusterEvents); | ||
| 393 | for (int i = 0; i < numClusterEvents; i++) | ||
| 394 | RedisModule_ReplyWithStringBuffer(ctx, clusterEventLog[i], strlen(clusterEventLog[i])); | ||
| 395 | return REDISMODULE_OK; | ||
| 396 | } | ||
| 397 | |||
| 398 | /* Reply with the cluster trim event log. */ | ||
| 399 | int getClusterTrimEventLog(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { | ||
| 400 | REDISMODULE_NOT_USED(ctx); | ||
| 401 | REDISMODULE_NOT_USED(argv); | ||
| 402 | REDISMODULE_NOT_USED(argc); | ||
| 403 | |||
| 404 | RedisModule_ReplyWithArray(ctx, numClusterTrimEvents); | ||
| 405 | for (int i = 0; i < numClusterTrimEvents; i++) | ||
| 406 | RedisModule_ReplyWithStringBuffer(ctx, clusterTrimEventLog[i], strlen(clusterTrimEventLog[i])); | ||
| 407 | return REDISMODULE_OK; | ||
| 408 | } | ||
| 409 | |||
| 410 | /* A keyless command to test module command replication. */ | ||
| 411 | int moduledata = 0; | ||
| 412 | int keylessCmd(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { | ||
| 413 | REDISMODULE_NOT_USED(ctx); | ||
| 414 | REDISMODULE_NOT_USED(argv); | ||
| 415 | REDISMODULE_NOT_USED(argc); | ||
| 416 | moduledata++; | ||
| 417 | RedisModule_ReplyWithLongLong(ctx, moduledata); | ||
| 418 | return REDISMODULE_OK; | ||
| 419 | } | ||
| 420 | int readkeylessCmdVal(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { | ||
| 421 | REDISMODULE_NOT_USED(ctx); | ||
| 422 | REDISMODULE_NOT_USED(argv); | ||
| 423 | REDISMODULE_NOT_USED(argc); | ||
| 424 | RedisModule_ReplyWithLongLong(ctx, moduledata); | ||
| 425 | return REDISMODULE_OK; | ||
| 426 | } | ||
| 427 | |||
| 428 | int subscribeTrimmedEvent(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { | ||
| 429 | REDISMODULE_NOT_USED(ctx); | ||
| 430 | if (argc != 2) | ||
| 431 | return RedisModule_WrongArity(ctx); | ||
| 432 | |||
| 433 | long long subscribe = 0; | ||
| 434 | if (RedisModule_StringToLongLong(argv[1], &subscribe) != REDISMODULE_OK) { | ||
| 435 | RedisModule_ReplyWithError(ctx, "ERR subscribe value"); | ||
| 436 | return REDISMODULE_OK; | ||
| 437 | } | ||
| 438 | |||
| 439 | if (subscribe) { | ||
| 440 | /* Unsubscribe first to avoid duplicate subscription. */ | ||
| 441 | RedisModule_UnsubscribeFromKeyspaceEvents(ctx, REDISMODULE_NOTIFY_KEY_TRIMMED, keyspaceNotificationTrimmedCallback); | ||
| 442 | int ret = RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_KEY_TRIMMED, keyspaceNotificationTrimmedCallback); | ||
| 443 | RedisModule_Assert(ret == REDISMODULE_OK); | ||
| 444 | } else { | ||
| 445 | int ret = RedisModule_UnsubscribeFromKeyspaceEvents(ctx, REDISMODULE_NOTIFY_KEY_TRIMMED, keyspaceNotificationTrimmedCallback); | ||
| 446 | RedisModule_Assert(ret == REDISMODULE_OK); | ||
| 447 | } | ||
| 448 | RedisModule_ReplyWithSimpleString(ctx, "OK"); | ||
| 449 | return REDISMODULE_OK; | ||
| 450 | } | ||
| 451 | |||
| 452 | void keyEventCallback(RedisModuleCtx *ctx, RedisModuleEvent e, uint64_t sub, void *data) { | ||
| 453 | REDISMODULE_NOT_USED(ctx); | ||
| 454 | REDISMODULE_NOT_USED(e); | ||
| 455 | |||
| 456 | if (sub == REDISMODULE_SUBEVENT_KEY_DELETED) { | ||
| 457 | RedisModuleKeyInfoV1 *ei = data; | ||
| 458 | RedisModuleKey *kp = ei->key; | ||
| 459 | RedisModuleString *key = (RedisModuleString *) RedisModule_GetKeyNameFromModuleKey(kp); | ||
| 460 | size_t keylen; | ||
| 461 | const char *keyname = RedisModule_StringPtrLen(key, &keylen); | ||
| 462 | |||
| 463 | /* Verify value can be read. It will be used to verify key's value can | ||
| 464 | * be read in a trim callback. */ | ||
| 465 | size_t valuelen = 0; | ||
| 466 | const char *value = ""; | ||
| 467 | RedisModuleKey *mk = RedisModule_OpenKey(ctx, key, REDISMODULE_READ); | ||
| 468 | if (RedisModule_KeyType(mk) == REDISMODULE_KEYTYPE_STRING) { | ||
| 469 | value = RedisModule_StringDMA(mk, &valuelen, 0); | ||
| 470 | } | ||
| 471 | RedisModule_CloseKey(mk); | ||
| 472 | |||
| 473 | char buf[1024] = {0}; | ||
| 474 | snprintf(buf, sizeof(buf), "keyevent: key: %.*s, value: %.*s", (int) keylen, keyname, (int)valuelen, value); | ||
| 475 | |||
| 476 | if (lastDeletedKeyLog) RedisModule_Free((void *)lastDeletedKeyLog); | ||
| 477 | lastDeletedKeyLog = RedisModule_Strdup(buf); | ||
| 478 | } | ||
| 479 | } | ||
| 480 | |||
| 481 | int getLastDeletedKey(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { | ||
| 482 | REDISMODULE_NOT_USED(ctx); | ||
| 483 | REDISMODULE_NOT_USED(argv); | ||
| 484 | REDISMODULE_NOT_USED(argc); | ||
| 485 | |||
| 486 | if (lastDeletedKeyLog) { | ||
| 487 | RedisModule_ReplyWithStringBuffer(ctx, lastDeletedKeyLog, strlen(lastDeletedKeyLog)); | ||
| 488 | } else { | ||
| 489 | RedisModule_ReplyWithNull(ctx); | ||
| 490 | } | ||
| 491 | return REDISMODULE_OK; | ||
| 492 | } | ||
| 493 | |||
| 494 | int asmGetCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { | ||
| 495 | REDISMODULE_NOT_USED(ctx); | ||
| 496 | |||
| 497 | if (argc != 2) return RedisModule_WrongArity(ctx); | ||
| 498 | RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ); | ||
| 499 | if (key == NULL) { | ||
| 500 | RedisModule_ReplyWithNull(ctx); | ||
| 501 | return REDISMODULE_OK; | ||
| 502 | } | ||
| 503 | |||
| 504 | RedisModule_Assert(RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_STRING); | ||
| 505 | size_t len; | ||
| 506 | const char *value = RedisModule_StringDMA(key, &len, 0); | ||
| 507 | RedisModule_ReplyWithStringBuffer(ctx, value, len); | ||
| 508 | RedisModule_CloseKey(key); | ||
| 509 | |||
| 510 | return REDISMODULE_OK; | ||
| 511 | } | ||
| 512 | |||
| 513 | int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { | ||
| 514 | REDISMODULE_NOT_USED(argv); | ||
| 515 | REDISMODULE_NOT_USED(argc); | ||
| 516 | |||
| 517 | if (RedisModule_Init(ctx, "asm", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR) | ||
| 518 | return REDISMODULE_ERR; | ||
| 519 | |||
| 520 | if (RedisModule_CreateCommand(ctx, "asm.cluster_can_access_keys_in_slot", testClusterCanAccessKeysInSlot, "", 0, 0, 0) == REDISMODULE_ERR) | ||
| 521 | return REDISMODULE_ERR; | ||
| 522 | |||
| 523 | if (RedisModule_CreateCommand(ctx, "asm.clear_event_log", clearEventLog, "", 0, 0, 0) == REDISMODULE_ERR) | ||
| 524 | return REDISMODULE_ERR; | ||
| 525 | |||
| 526 | if (RedisModule_CreateCommand(ctx, "asm.get_cluster_event_log", getClusterEventLog, "", 0, 0, 0) == REDISMODULE_ERR) | ||
| 527 | return REDISMODULE_ERR; | ||
| 528 | |||
| 529 | if (RedisModule_CreateCommand(ctx, "asm.get_cluster_trim_event_log", getClusterTrimEventLog, "", 0, 0, 0) == REDISMODULE_ERR) | ||
| 530 | return REDISMODULE_ERR; | ||
| 531 | |||
| 532 | if (RedisModule_CreateCommand(ctx, "asm.keyless_cmd", keylessCmd, "write", 0, 0, 0) == REDISMODULE_ERR) | ||
| 533 | return REDISMODULE_ERR; | ||
| 534 | |||
| 535 | if (RedisModule_CreateCommand(ctx, "asm.disable_trim", disableTrimCmd, "", 0, 0, 0) == REDISMODULE_ERR) | ||
| 536 | return REDISMODULE_ERR; | ||
| 537 | |||
| 538 | if (RedisModule_CreateCommand(ctx, "asm.enable_trim", enableTrimCmd, "", 0, 0, 0) == REDISMODULE_ERR) | ||
| 539 | return REDISMODULE_ERR; | ||
| 540 | |||
| 541 | if (RedisModule_CreateCommand(ctx, "asm.read_pending_trim_key", getPendingTrimKeyCmd, "readonly", 0, 0, 0) == REDISMODULE_ERR) | ||
| 542 | return REDISMODULE_ERR; | ||
| 543 | |||
| 544 | if (RedisModule_CreateCommand(ctx, "asm.trim_in_progress", trimInProgressCmd, "", 0, 0, 0) == REDISMODULE_ERR) | ||
| 545 | return REDISMODULE_ERR; | ||
| 546 | |||
| 547 | if (RedisModule_CreateCommand(ctx, "asm.read_keyless_cmd_val", readkeylessCmdVal, "", 0, 0, 0) == REDISMODULE_ERR) | ||
| 548 | return REDISMODULE_ERR; | ||
| 549 | |||
| 550 | if (RedisModule_CreateCommand(ctx, "asm.sanity", sanity, "", 0, 0, 0) == REDISMODULE_ERR) | ||
| 551 | return REDISMODULE_ERR; | ||
| 552 | |||
| 553 | if (RedisModule_CreateCommand(ctx, "asm.subscribe_trimmed_event", subscribeTrimmedEvent, "", 0, 0, 0) == REDISMODULE_ERR) | ||
| 554 | return REDISMODULE_ERR; | ||
| 555 | |||
| 556 | if (RedisModule_CreateCommand(ctx, "asm.replicate_module_command", replicate_module_command, "", 0, 0, 0) == REDISMODULE_ERR) | ||
| 557 | return REDISMODULE_ERR; | ||
| 558 | |||
| 559 | if (RedisModule_CreateCommand(ctx, "asm.lpush_replicate_crossslot_command", lpush_and_replicate_crossslot_command, "write", 0, 0, 0) == REDISMODULE_ERR) | ||
| 560 | return REDISMODULE_ERR; | ||
| 561 | |||
| 562 | if (RedisModule_CreateCommand(ctx, "asm.cluster_get_local_slot_ranges", testClusterGetLocalSlotRanges, "", 0, 0, 0) == REDISMODULE_ERR) | ||
| 563 | return REDISMODULE_ERR; | ||
| 564 | |||
| 565 | if (RedisModule_CreateCommand(ctx, "asm.get_last_deleted_key", getLastDeletedKey, "", 0, 0, 0) == REDISMODULE_ERR) | ||
| 566 | return REDISMODULE_ERR; | ||
| 567 | |||
| 568 | if (RedisModule_CreateCommand(ctx, "asm.get", asmGetCommand, "", 0, 0, 0) == REDISMODULE_ERR) | ||
| 569 | return REDISMODULE_ERR; | ||
| 570 | |||
| 571 | if (RedisModule_CreateCommand(ctx, "asm.parent", NULL, "", 0, 0, 0) == REDISMODULE_ERR) | ||
| 572 | return REDISMODULE_ERR; | ||
| 573 | |||
| 574 | RedisModuleCommand *parent = RedisModule_GetCommand(ctx, "asm.parent"); | ||
| 575 | if (!parent) return REDISMODULE_ERR; | ||
| 576 | |||
| 577 | /* Subcommand: ASM.PARENT SET (write) */ | ||
| 578 | if (RedisModule_CreateSubcommand(parent, "set", asmParentSet, "write fast", 2, 2, 1) == REDISMODULE_ERR) | ||
| 579 | return REDISMODULE_ERR; | ||
| 580 | |||
| 581 | if (RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_ClusterSlotMigration, clusterEventCallback) == REDISMODULE_ERR) | ||
| 582 | return REDISMODULE_ERR; | ||
| 583 | |||
| 584 | if (RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_ClusterSlotMigrationTrim, clusterTrimEventCallback) == REDISMODULE_ERR) | ||
| 585 | return REDISMODULE_ERR; | ||
| 586 | |||
| 587 | if (RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_KEY_TRIMMED, keyspaceNotificationTrimmedCallback) == REDISMODULE_ERR) | ||
| 588 | return REDISMODULE_ERR; | ||
| 589 | |||
| 590 | if (RedisModule_SubscribeToServerEvent(ctx, RedisModuleEvent_Key, keyEventCallback) == REDISMODULE_ERR) | ||
| 591 | return REDISMODULE_ERR; | ||
| 592 | |||
| 593 | return REDISMODULE_OK; | ||
| 594 | } | ||
