summaryrefslogtreecommitdiff
path: root/examples/redis-unstable/tests/modules/stream.c
diff options
context:
space:
mode:
authorMitja Felicijan <mitja.felicijan@gmail.com>2026-01-21 22:52:54 +0100
committerMitja Felicijan <mitja.felicijan@gmail.com>2026-01-21 22:52:54 +0100
commitdcacc00e3750300617ba6e16eb346713f91a783a (patch)
tree38e2d4fb5ed9d119711d4295c6eda4b014af73fd /examples/redis-unstable/tests/modules/stream.c
parent58dac10aeb8f5a041c46bddbeaf4c7966a99b998 (diff)
downloadcrep-dcacc00e3750300617ba6e16eb346713f91a783a.tar.gz
Remove testing data
Diffstat (limited to 'examples/redis-unstable/tests/modules/stream.c')
-rw-r--r--examples/redis-unstable/tests/modules/stream.c258
1 files changed, 0 insertions, 258 deletions
diff --git a/examples/redis-unstable/tests/modules/stream.c b/examples/redis-unstable/tests/modules/stream.c
deleted file mode 100644
index 65762a3..0000000
--- a/examples/redis-unstable/tests/modules/stream.c
+++ /dev/null
@@ -1,258 +0,0 @@
-#include "redismodule.h"
-
-#include <string.h>
-#include <strings.h>
-#include <assert.h>
-#include <unistd.h>
-#include <errno.h>
-
-/* Command which adds a stream entry with automatic ID, like XADD *.
- *
- * Syntax: STREAM.ADD key field1 value1 [ field2 value2 ... ]
- *
- * The response is the ID of the added stream entry or an error message.
- */
-int stream_add(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- if (argc < 2 || argc % 2 != 0) {
- RedisModule_WrongArity(ctx);
- return REDISMODULE_OK;
- }
-
- RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
- RedisModuleStreamID id;
- if (RedisModule_StreamAdd(key, REDISMODULE_STREAM_ADD_AUTOID, &id,
- &argv[2], (argc-2)/2) == REDISMODULE_OK) {
- RedisModuleString *id_str = RedisModule_CreateStringFromStreamID(ctx, &id);
- RedisModule_ReplyWithString(ctx, id_str);
- RedisModule_FreeString(ctx, id_str);
- } else {
- RedisModule_ReplyWithError(ctx, "ERR StreamAdd failed");
- }
- RedisModule_CloseKey(key);
- return REDISMODULE_OK;
-}
-
-/* Command which adds a stream entry N times.
- *
- * Syntax: STREAM.ADD key N field1 value1 [ field2 value2 ... ]
- *
- * Returns the number of successfully added entries.
- */
-int stream_addn(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- if (argc < 3 || argc % 2 == 0) {
- RedisModule_WrongArity(ctx);
- return REDISMODULE_OK;
- }
-
- long long n, i;
- if (RedisModule_StringToLongLong(argv[2], &n) == REDISMODULE_ERR) {
- RedisModule_ReplyWithError(ctx, "N must be a number");
- return REDISMODULE_OK;
- }
-
- RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
- for (i = 0; i < n; i++) {
- if (RedisModule_StreamAdd(key, REDISMODULE_STREAM_ADD_AUTOID, NULL,
- &argv[3], (argc-3)/2) == REDISMODULE_ERR)
- break;
- }
- RedisModule_ReplyWithLongLong(ctx, i);
- RedisModule_CloseKey(key);
- return REDISMODULE_OK;
-}
-
-/* STREAM.DELETE key stream-id */
-int stream_delete(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- if (argc != 3) return RedisModule_WrongArity(ctx);
- RedisModuleStreamID id;
- if (RedisModule_StringToStreamID(argv[2], &id) != REDISMODULE_OK) {
- return RedisModule_ReplyWithError(ctx, "Invalid stream ID");
- }
- RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
- if (RedisModule_StreamDelete(key, &id) == REDISMODULE_OK) {
- RedisModule_ReplyWithSimpleString(ctx, "OK");
- } else {
- RedisModule_ReplyWithError(ctx, "ERR StreamDelete failed");
- }
- RedisModule_CloseKey(key);
- return REDISMODULE_OK;
-}
-
-/* STREAM.RANGE key start-id end-id
- *
- * Returns an array of stream items. Each item is an array on the form
- * [stream-id, [field1, value1, field2, value2, ...]].
- *
- * A funny side-effect used for testing RM_StreamIteratorDelete() is that if any
- * entry has a field named "selfdestruct", the stream entry is deleted. It is
- * however included in the results of this command.
- */
-int stream_range(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- if (argc != 4) {
- RedisModule_WrongArity(ctx);
- return REDISMODULE_OK;
- }
-
- RedisModuleStreamID startid, endid;
- if (RedisModule_StringToStreamID(argv[2], &startid) != REDISMODULE_OK ||
- RedisModule_StringToStreamID(argv[3], &endid) != REDISMODULE_OK) {
- RedisModule_ReplyWithError(ctx, "Invalid stream ID");
- return REDISMODULE_OK;
- }
-
- /* If startid > endid, we swap and set the reverse flag. */
- int flags = 0;
- if (startid.ms > endid.ms ||
- (startid.ms == endid.ms && startid.seq > endid.seq)) {
- RedisModuleStreamID tmp = startid;
- startid = endid;
- endid = tmp;
- flags |= REDISMODULE_STREAM_ITERATOR_REVERSE;
- }
-
- /* Open key and start iterator. */
- int openflags = REDISMODULE_READ | REDISMODULE_WRITE;
- RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], openflags);
- if (RedisModule_StreamIteratorStart(key, flags,
- &startid, &endid) != REDISMODULE_OK) {
- /* Key is not a stream, etc. */
- RedisModule_ReplyWithError(ctx, "ERR StreamIteratorStart failed");
- RedisModule_CloseKey(key);
- return REDISMODULE_OK;
- }
-
- /* Check error handling: Delete current entry when no current entry. */
- assert(RedisModule_StreamIteratorDelete(key) ==
- REDISMODULE_ERR);
- assert(errno == ENOENT);
-
- /* Check error handling: Fetch fields when no current entry. */
- assert(RedisModule_StreamIteratorNextField(key, NULL, NULL) ==
- REDISMODULE_ERR);
- assert(errno == ENOENT);
-
- /* Return array. */
- RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_LEN);
- RedisModule_AutoMemory(ctx);
- RedisModuleStreamID id;
- long numfields;
- long len = 0;
- while (RedisModule_StreamIteratorNextID(key, &id,
- &numfields) == REDISMODULE_OK) {
- RedisModule_ReplyWithArray(ctx, 2);
- RedisModuleString *id_str = RedisModule_CreateStringFromStreamID(ctx, &id);
- RedisModule_ReplyWithString(ctx, id_str);
- RedisModule_ReplyWithArray(ctx, numfields * 2);
- int delete = 0;
- RedisModuleString *field, *value;
- for (long i = 0; i < numfields; i++) {
- assert(RedisModule_StreamIteratorNextField(key, &field, &value) ==
- REDISMODULE_OK);
- RedisModule_ReplyWithString(ctx, field);
- RedisModule_ReplyWithString(ctx, value);
- /* check if this is a "selfdestruct" field */
- size_t field_len;
- const char *field_str = RedisModule_StringPtrLen(field, &field_len);
- if (!strncmp(field_str, "selfdestruct", field_len)) delete = 1;
- }
- if (delete) {
- assert(RedisModule_StreamIteratorDelete(key) == REDISMODULE_OK);
- }
- /* check error handling: no more fields to fetch */
- assert(RedisModule_StreamIteratorNextField(key, &field, &value) ==
- REDISMODULE_ERR);
- assert(errno == ENOENT);
- len++;
- }
- RedisModule_ReplySetArrayLength(ctx, len);
- RedisModule_StreamIteratorStop(key);
- RedisModule_CloseKey(key);
- return REDISMODULE_OK;
-}
-
-/*
- * STREAM.TRIM key (MAXLEN (=|~) length | MINID (=|~) id)
- */
-int stream_trim(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- if (argc != 5) {
- RedisModule_WrongArity(ctx);
- return REDISMODULE_OK;
- }
-
- /* Parse args */
- int trim_by_id = 0; /* 0 = maxlen, 1 = minid */
- long long maxlen;
- RedisModuleStreamID minid;
- size_t arg_len;
- const char *arg = RedisModule_StringPtrLen(argv[2], &arg_len);
- if (!strcasecmp(arg, "minid")) {
- trim_by_id = 1;
- if (RedisModule_StringToStreamID(argv[4], &minid) != REDISMODULE_OK) {
- RedisModule_ReplyWithError(ctx, "ERR Invalid stream ID");
- return REDISMODULE_OK;
- }
- } else if (!strcasecmp(arg, "maxlen")) {
- if (RedisModule_StringToLongLong(argv[4], &maxlen) == REDISMODULE_ERR) {
- RedisModule_ReplyWithError(ctx, "ERR Maxlen must be a number");
- return REDISMODULE_OK;
- }
- } else {
- RedisModule_ReplyWithError(ctx, "ERR Invalid arguments");
- return REDISMODULE_OK;
- }
-
- /* Approx or exact */
- int flags;
- arg = RedisModule_StringPtrLen(argv[3], &arg_len);
- if (arg_len == 1 && arg[0] == '~') {
- flags = REDISMODULE_STREAM_TRIM_APPROX;
- } else if (arg_len == 1 && arg[0] == '=') {
- flags = 0;
- } else {
- RedisModule_ReplyWithError(ctx, "ERR Invalid approx-or-exact mark");
- return REDISMODULE_OK;
- }
-
- /* Trim */
- RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
- long long trimmed;
- if (trim_by_id) {
- trimmed = RedisModule_StreamTrimByID(key, flags, &minid);
- } else {
- trimmed = RedisModule_StreamTrimByLength(key, flags, maxlen);
- }
-
- /* Return result */
- if (trimmed < 0) {
- RedisModule_ReplyWithError(ctx, "ERR Trimming failed");
- } else {
- RedisModule_ReplyWithLongLong(ctx, trimmed);
- }
- RedisModule_CloseKey(key);
- return REDISMODULE_OK;
-}
-
-int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- REDISMODULE_NOT_USED(argv);
- REDISMODULE_NOT_USED(argc);
- if (RedisModule_Init(ctx, "stream", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
-
- if (RedisModule_CreateCommand(ctx, "stream.add", stream_add, "write",
- 1, 1, 1) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
- if (RedisModule_CreateCommand(ctx, "stream.addn", stream_addn, "write",
- 1, 1, 1) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
- if (RedisModule_CreateCommand(ctx, "stream.delete", stream_delete, "write",
- 1, 1, 1) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
- if (RedisModule_CreateCommand(ctx, "stream.range", stream_range, "write",
- 1, 1, 1) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
- if (RedisModule_CreateCommand(ctx, "stream.trim", stream_trim, "write",
- 1, 1, 1) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
-
- return REDISMODULE_OK;
-}