summaryrefslogtreecommitdiff
path: root/examples/redis-unstable/tests/modules/blockedclient.c
diff options
context:
space:
mode:
authorMitja Felicijan <mitja.felicijan@gmail.com>2026-01-21 22:52:54 +0100
committerMitja Felicijan <mitja.felicijan@gmail.com>2026-01-21 22:52:54 +0100
commitdcacc00e3750300617ba6e16eb346713f91a783a (patch)
tree38e2d4fb5ed9d119711d4295c6eda4b014af73fd /examples/redis-unstable/tests/modules/blockedclient.c
parent58dac10aeb8f5a041c46bddbeaf4c7966a99b998 (diff)
downloadcrep-dcacc00e3750300617ba6e16eb346713f91a783a.tar.gz
Remove testing data
Diffstat (limited to 'examples/redis-unstable/tests/modules/blockedclient.c')
-rw-r--r--examples/redis-unstable/tests/modules/blockedclient.c723
1 files changed, 0 insertions, 723 deletions
diff --git a/examples/redis-unstable/tests/modules/blockedclient.c b/examples/redis-unstable/tests/modules/blockedclient.c
deleted file mode 100644
index dc226ee..0000000
--- a/examples/redis-unstable/tests/modules/blockedclient.c
+++ /dev/null
@@ -1,723 +0,0 @@
-/* define macros for having usleep */
-#define _BSD_SOURCE
-#define _DEFAULT_SOURCE
-#include <unistd.h>
-
-#include "redismodule.h"
-#include <assert.h>
-#include <stdio.h>
-#include <pthread.h>
-#include <strings.h>
-
-#define UNUSED(V) ((void) V)
-
-/* used to test processing events during slow bg operation */
-static volatile int g_slow_bg_operation = 0;
-static volatile int g_is_in_slow_bg_operation = 0;
-
-void *sub_worker(void *arg) {
- // Get Redis module context
- RedisModuleCtx *ctx = (RedisModuleCtx *)arg;
-
- // Try acquiring GIL
- int res = RedisModule_ThreadSafeContextTryLock(ctx);
-
- // GIL is already taken by the calling thread expecting to fail.
- assert(res != REDISMODULE_OK);
-
- return NULL;
-}
-
-void *worker(void *arg) {
- // Retrieve blocked client
- RedisModuleBlockedClient *bc = (RedisModuleBlockedClient *)arg;
-
- // Get Redis module context
- RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bc);
-
- // Acquire GIL
- RedisModule_ThreadSafeContextLock(ctx);
-
- // Create another thread which will try to acquire the GIL
- pthread_t tid;
- int res = pthread_create(&tid, NULL, sub_worker, ctx);
- assert(res == 0);
-
- // Wait for thread
- pthread_join(tid, NULL);
-
- // Release GIL
- RedisModule_ThreadSafeContextUnlock(ctx);
-
- // Reply to client
- RedisModule_ReplyWithSimpleString(ctx, "OK");
-
- // Unblock client
- RedisModule_UnblockClient(bc, NULL);
-
- // Free the Redis module context
- RedisModule_FreeThreadSafeContext(ctx);
-
- return NULL;
-}
-
-int acquire_gil(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
-{
- UNUSED(argv);
- UNUSED(argc);
-
- int flags = RedisModule_GetContextFlags(ctx);
- int allFlags = RedisModule_GetContextFlagsAll();
- if ((allFlags & REDISMODULE_CTX_FLAGS_MULTI) &&
- (flags & REDISMODULE_CTX_FLAGS_MULTI)) {
- RedisModule_ReplyWithSimpleString(ctx, "Blocked client is not supported inside multi");
- return REDISMODULE_OK;
- }
-
- if ((allFlags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING) &&
- (flags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING)) {
- RedisModule_ReplyWithSimpleString(ctx, "Blocked client is not allowed");
- return REDISMODULE_OK;
- }
-
- /* This command handler tries to acquire the GIL twice
- * once in the worker thread using "RedisModule_ThreadSafeContextLock"
- * second in the sub-worker thread
- * using "RedisModule_ThreadSafeContextTryLock"
- * as the GIL is already locked. */
- RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);
-
- pthread_t tid;
- int res = pthread_create(&tid, NULL, worker, bc);
- assert(res == 0);
- pthread_detach(tid);
-
- return REDISMODULE_OK;
-}
-
-typedef struct {
- RedisModuleString **argv;
- int argc;
- RedisModuleBlockedClient *bc;
-} bg_call_data;
-
-void *bg_call_worker(void *arg) {
- bg_call_data *bg = arg;
- RedisModuleBlockedClient *bc = bg->bc;
-
- // Get Redis module context
- RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(bg->bc);
-
- // Acquire GIL
- RedisModule_ThreadSafeContextLock(ctx);
-
- // Test slow operation yielding
- if (g_slow_bg_operation) {
- g_is_in_slow_bg_operation = 1;
- while (g_slow_bg_operation) {
- RedisModule_Yield(ctx, REDISMODULE_YIELD_FLAG_CLIENTS, "Slow module operation");
- usleep(1000);
- }
- g_is_in_slow_bg_operation = 0;
- }
-
- // Call the command
- const char *module_cmd = RedisModule_StringPtrLen(bg->argv[0], NULL);
- int cmd_pos = 1;
- RedisModuleString *format_redis_str = RedisModule_CreateString(NULL, "v", 1);
- if (!strcasecmp(module_cmd, "do_bg_rm_call_format")) {
- cmd_pos = 2;
- size_t format_len;
- const char *format = RedisModule_StringPtrLen(bg->argv[1], &format_len);
- RedisModule_StringAppendBuffer(NULL, format_redis_str, format, format_len);
- RedisModule_StringAppendBuffer(NULL, format_redis_str, "E", 1);
- }
- const char *format = RedisModule_StringPtrLen(format_redis_str, NULL);
- const char *cmd = RedisModule_StringPtrLen(bg->argv[cmd_pos], NULL);
- RedisModuleCallReply *rep = RedisModule_Call(ctx, cmd, format, bg->argv + cmd_pos + 1, (size_t)bg->argc - cmd_pos - 1);
- RedisModule_FreeString(NULL, format_redis_str);
-
- /* Free the arguments within GIL to prevent simultaneous freeing in main thread. */
- for (int i=0; i<bg->argc; i++)
- RedisModule_FreeString(ctx, bg->argv[i]);
- RedisModule_Free(bg->argv);
- RedisModule_Free(bg);
-
- // Release GIL
- RedisModule_ThreadSafeContextUnlock(ctx);
-
- // Reply to client
- if (!rep) {
- RedisModule_ReplyWithError(ctx, "NULL reply returned");
- } else {
- RedisModule_ReplyWithCallReply(ctx, rep);
- RedisModule_FreeCallReply(rep);
- }
-
- // Unblock client
- RedisModule_UnblockClient(bc, NULL);
-
- // Free the Redis module context
- RedisModule_FreeThreadSafeContext(ctx);
-
- return NULL;
-}
-
-int do_bg_rm_call(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
-{
- UNUSED(argv);
- UNUSED(argc);
-
- /* Make sure we're not trying to block a client when we shouldn't */
- int flags = RedisModule_GetContextFlags(ctx);
- int allFlags = RedisModule_GetContextFlagsAll();
- if ((allFlags & REDISMODULE_CTX_FLAGS_MULTI) &&
- (flags & REDISMODULE_CTX_FLAGS_MULTI)) {
- RedisModule_ReplyWithSimpleString(ctx, "Blocked client is not supported inside multi");
- return REDISMODULE_OK;
- }
- if ((allFlags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING) &&
- (flags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING)) {
- RedisModule_ReplyWithSimpleString(ctx, "Blocked client is not allowed");
- return REDISMODULE_OK;
- }
-
- /* Make a copy of the arguments and pass them to the thread. */
- bg_call_data *bg = RedisModule_Alloc(sizeof(bg_call_data));
- bg->argv = RedisModule_Alloc(sizeof(RedisModuleString*)*argc);
- bg->argc = argc;
- for (int i=0; i<argc; i++)
- bg->argv[i] = RedisModule_HoldString(ctx, argv[i]);
-
- /* Block the client */
- bg->bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);
-
- /* Start a thread to handle the request */
- pthread_t tid;
- int res = pthread_create(&tid, NULL, bg_call_worker, bg);
- assert(res == 0);
- pthread_detach(tid);
-
- return REDISMODULE_OK;
-}
-
-int do_rm_call(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){
- UNUSED(argv);
- UNUSED(argc);
-
- if(argc < 2){
- return RedisModule_WrongArity(ctx);
- }
-
- const char* cmd = RedisModule_StringPtrLen(argv[1], NULL);
-
- RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "Ev", argv + 2, (size_t)argc - 2);
- if(!rep){
- RedisModule_ReplyWithError(ctx, "NULL reply returned");
- }else{
- RedisModule_ReplyWithCallReply(ctx, rep);
- RedisModule_FreeCallReply(rep);
- }
-
- return REDISMODULE_OK;
-}
-
-static void rm_call_async_send_reply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) {
- RedisModule_ReplyWithCallReply(ctx, reply);
- RedisModule_FreeCallReply(reply);
-}
-
-/* Called when the command that was blocked on 'RM_Call' gets unblocked
- * and send the reply to the blocked client. */
-static void rm_call_async_on_unblocked(RedisModuleCtx *ctx, RedisModuleCallReply *reply, void *private_data) {
- UNUSED(ctx);
- RedisModuleBlockedClient *bc = private_data;
- RedisModuleCtx *bctx = RedisModule_GetThreadSafeContext(bc);
- rm_call_async_send_reply(bctx, reply);
- RedisModule_FreeThreadSafeContext(bctx);
- RedisModule_UnblockClient(bc, RedisModule_BlockClientGetPrivateData(bc));
-}
-
-int do_rm_call_async_fire_and_forget(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){
- UNUSED(argv);
- UNUSED(argc);
-
- if(argc < 2){
- return RedisModule_WrongArity(ctx);
- }
- const char* cmd = RedisModule_StringPtrLen(argv[1], NULL);
-
- RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "!KEv", argv + 2, (size_t)argc - 2);
-
- if(RedisModule_CallReplyType(rep) != REDISMODULE_REPLY_PROMISE) {
- RedisModule_ReplyWithCallReply(ctx, rep);
- } else {
- RedisModule_ReplyWithSimpleString(ctx, "Blocked");
- }
- RedisModule_FreeCallReply(rep);
-
- return REDISMODULE_OK;
-}
-
-static void do_rm_call_async_free_pd(RedisModuleCtx * ctx, void *pd) {
- UNUSED(ctx);
- RedisModule_FreeCallReply(pd);
-}
-
-static void do_rm_call_async_disconnect(RedisModuleCtx *ctx, struct RedisModuleBlockedClient *bc) {
- UNUSED(ctx);
- RedisModuleCallReply* rep = RedisModule_BlockClientGetPrivateData(bc);
- RedisModule_CallReplyPromiseAbort(rep, NULL);
- RedisModule_FreeCallReply(rep);
- RedisModule_AbortBlock(bc);
-}
-
-/*
- * Callback for do_rm_call_async / do_rm_call_async_script_mode
- * Gets the command to invoke as the first argument to the command and runs it,
- * passing the rest of the arguments to the command invocation.
- * If the command got blocked, blocks the client and unblock it when the command gets unblocked,
- * this allows check the K (allow blocking) argument to RM_Call.
- */
-int do_rm_call_async(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){
- UNUSED(argv);
- UNUSED(argc);
-
- if(argc < 2){
- return RedisModule_WrongArity(ctx);
- }
-
- size_t format_len = 0;
- char format[6] = {0};
-
- if (!(RedisModule_GetContextFlags(ctx) & REDISMODULE_CTX_FLAGS_DENY_BLOCKING)) {
- /* We are allowed to block the client so we can allow RM_Call to also block us */
- format[format_len++] = 'K';
- }
-
- const char* invoked_cmd = RedisModule_StringPtrLen(argv[0], NULL);
- if (strcasecmp(invoked_cmd, "do_rm_call_async_script_mode") == 0) {
- format[format_len++] = 'S';
- }
-
- format[format_len++] = 'E';
- format[format_len++] = 'v';
- if (strcasecmp(invoked_cmd, "do_rm_call_async_no_replicate") != 0) {
- /* Notice, without the '!' flag we will have inconsistency between master and replica.
- * This is used only to check '!' flag correctness on blocked commands. */
- format[format_len++] = '!';
- }
-
- const char* cmd = RedisModule_StringPtrLen(argv[1], NULL);
-
- RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, format, argv + 2, (size_t)argc - 2);
-
- if(RedisModule_CallReplyType(rep) != REDISMODULE_REPLY_PROMISE) {
- rm_call_async_send_reply(ctx, rep);
- } else {
- RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx, NULL, NULL, do_rm_call_async_free_pd, 0);
- RedisModule_SetDisconnectCallback(bc, do_rm_call_async_disconnect);
- RedisModule_BlockClientSetPrivateData(bc, rep);
- RedisModule_CallReplyPromiseSetUnblockHandler(rep, rm_call_async_on_unblocked, bc);
- }
-
- return REDISMODULE_OK;
-}
-
-typedef struct ThreadedAsyncRMCallCtx{
- RedisModuleBlockedClient *bc;
- RedisModuleCallReply *reply;
-} ThreadedAsyncRMCallCtx;
-
-void *send_async_reply(void *arg) {
- ThreadedAsyncRMCallCtx *ta_rm_call_ctx = arg;
- rm_call_async_on_unblocked(NULL, ta_rm_call_ctx->reply, ta_rm_call_ctx->bc);
- RedisModule_Free(ta_rm_call_ctx);
- return NULL;
-}
-
-/* Called when the command that was blocked on 'RM_Call' gets unblocked
- * and schedule a thread to send the reply to the blocked client. */
-static void rm_call_async_reply_on_thread(RedisModuleCtx *ctx, RedisModuleCallReply *reply, void *private_data) {
- UNUSED(ctx);
- ThreadedAsyncRMCallCtx *ta_rm_call_ctx = RedisModule_Alloc(sizeof(*ta_rm_call_ctx));
- ta_rm_call_ctx->bc = private_data;
- ta_rm_call_ctx->reply = reply;
- pthread_t tid;
- int res = pthread_create(&tid, NULL, send_async_reply, ta_rm_call_ctx);
- assert(res == 0);
- pthread_detach(tid);
-}
-
-/*
- * Callback for do_rm_call_async_on_thread.
- * Gets the command to invoke as the first argument to the command and runs it,
- * passing the rest of the arguments to the command invocation.
- * If the command got blocked, blocks the client and unblock on a background thread.
- * this allows check the K (allow blocking) argument to RM_Call, and make sure that the reply
- * that passes to unblock handler is owned by the handler and are not attached to any
- * context that might be freed after the callback ends.
- */
-int do_rm_call_async_on_thread(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){
- UNUSED(argv);
- UNUSED(argc);
-
- if(argc < 2){
- return RedisModule_WrongArity(ctx);
- }
-
- const char* cmd = RedisModule_StringPtrLen(argv[1], NULL);
-
- RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "KEv", argv + 2, (size_t)argc - 2);
-
- if(RedisModule_CallReplyType(rep) != REDISMODULE_REPLY_PROMISE) {
- rm_call_async_send_reply(ctx, rep);
- } else {
- RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);
- RedisModule_CallReplyPromiseSetUnblockHandler(rep, rm_call_async_reply_on_thread, bc);
- RedisModule_FreeCallReply(rep);
- }
-
- return REDISMODULE_OK;
-}
-
-/* Private data for wait_and_do_rm_call_async that holds information about:
- * 1. the block client, to unblock when done.
- * 2. the arguments, contains the command to run using RM_Call */
-typedef struct WaitAndDoRMCallCtx {
- RedisModuleBlockedClient *bc;
- RedisModuleString **argv;
- int argc;
-} WaitAndDoRMCallCtx;
-
-/*
- * This callback will be called when the 'wait' command invoke on 'wait_and_do_rm_call_async' will finish.
- * This callback will continue the execution flow just like 'do_rm_call_async' command.
- */
-static void wait_and_do_rm_call_async_on_unblocked(RedisModuleCtx *ctx, RedisModuleCallReply *reply, void *private_data) {
- WaitAndDoRMCallCtx *wctx = private_data;
- if (RedisModule_CallReplyType(reply) != REDISMODULE_REPLY_INTEGER) {
- goto done;
- }
-
- if (RedisModule_CallReplyInteger(reply) != 1) {
- goto done;
- }
-
- RedisModule_FreeCallReply(reply);
- reply = NULL;
-
- const char* cmd = RedisModule_StringPtrLen(wctx->argv[0], NULL);
- reply = RedisModule_Call(ctx, cmd, "!EKv", wctx->argv + 1, (size_t)wctx->argc - 1);
-
-done:
- if(RedisModule_CallReplyType(reply) != REDISMODULE_REPLY_PROMISE) {
- RedisModuleCtx *bctx = RedisModule_GetThreadSafeContext(wctx->bc);
- rm_call_async_send_reply(bctx, reply);
- RedisModule_FreeThreadSafeContext(bctx);
- RedisModule_UnblockClient(wctx->bc, NULL);
- } else {
- RedisModule_CallReplyPromiseSetUnblockHandler(reply, rm_call_async_on_unblocked, wctx->bc);
- RedisModule_FreeCallReply(reply);
- }
- for (int i = 0 ; i < wctx->argc ; ++i) {
- RedisModule_FreeString(NULL, wctx->argv[i]);
- }
- RedisModule_Free(wctx->argv);
- RedisModule_Free(wctx);
-}
-
-/*
- * Callback for wait_and_do_rm_call
- * Gets the command to invoke as the first argument, runs 'wait'
- * command (using the K flag to RM_Call). Once the wait finished, runs the
- * command that was given (just like 'do_rm_call_async').
- */
-int wait_and_do_rm_call_async(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- UNUSED(argv);
- UNUSED(argc);
-
- if(argc < 2){
- return RedisModule_WrongArity(ctx);
- }
-
- int flags = RedisModule_GetContextFlags(ctx);
- if (flags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING) {
- return RedisModule_ReplyWithError(ctx, "Err can not run wait, blocking is not allowed.");
- }
-
- RedisModuleCallReply* rep = RedisModule_Call(ctx, "wait", "!EKcc", "1", "0");
- if(RedisModule_CallReplyType(rep) != REDISMODULE_REPLY_PROMISE) {
- rm_call_async_send_reply(ctx, rep);
- } else {
- RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);
- WaitAndDoRMCallCtx *wctx = RedisModule_Alloc(sizeof(*wctx));
- *wctx = (WaitAndDoRMCallCtx){
- .bc = bc,
- .argv = RedisModule_Alloc((argc - 1) * sizeof(RedisModuleString*)),
- .argc = argc - 1,
- };
-
- for (int i = 1 ; i < argc ; ++i) {
- wctx->argv[i - 1] = RedisModule_HoldString(NULL, argv[i]);
- }
- RedisModule_CallReplyPromiseSetUnblockHandler(rep, wait_and_do_rm_call_async_on_unblocked, wctx);
- RedisModule_FreeCallReply(rep);
- }
-
- return REDISMODULE_OK;
-}
-
-static void blpop_and_set_multiple_keys_on_unblocked(RedisModuleCtx *ctx, RedisModuleCallReply *reply, void *private_data) {
- /* ignore the reply */
- RedisModule_FreeCallReply(reply);
- WaitAndDoRMCallCtx *wctx = private_data;
- for (int i = 0 ; i < wctx->argc ; i += 2) {
- RedisModuleCallReply* rep = RedisModule_Call(ctx, "set", "!ss", wctx->argv[i], wctx->argv[i + 1]);
- RedisModule_FreeCallReply(rep);
- }
-
- RedisModuleCtx *bctx = RedisModule_GetThreadSafeContext(wctx->bc);
- RedisModule_ReplyWithSimpleString(bctx, "OK");
- RedisModule_FreeThreadSafeContext(bctx);
- RedisModule_UnblockClient(wctx->bc, NULL);
-
- for (int i = 0 ; i < wctx->argc ; ++i) {
- RedisModule_FreeString(NULL, wctx->argv[i]);
- }
- RedisModule_Free(wctx->argv);
- RedisModule_Free(wctx);
-
-}
-
-/*
- * Performs a blpop command on a given list and when unblocked set multiple string keys.
- * This command allows checking that the unblock callback is performed as a unit
- * and its effect are replicated to the replica and AOF wrapped with multi exec.
- */
-int blpop_and_set_multiple_keys(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- UNUSED(argv);
- UNUSED(argc);
-
- if(argc < 2 || argc % 2 != 0){
- return RedisModule_WrongArity(ctx);
- }
-
- int flags = RedisModule_GetContextFlags(ctx);
- if (flags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING) {
- return RedisModule_ReplyWithError(ctx, "Err can not run wait, blocking is not allowed.");
- }
-
- RedisModuleCallReply* rep = RedisModule_Call(ctx, "blpop", "!EKsc", argv[1], "0");
- if(RedisModule_CallReplyType(rep) != REDISMODULE_REPLY_PROMISE) {
- rm_call_async_send_reply(ctx, rep);
- } else {
- RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);
- WaitAndDoRMCallCtx *wctx = RedisModule_Alloc(sizeof(*wctx));
- *wctx = (WaitAndDoRMCallCtx){
- .bc = bc,
- .argv = RedisModule_Alloc((argc - 2) * sizeof(RedisModuleString*)),
- .argc = argc - 2,
- };
-
- for (int i = 0 ; i < argc - 2 ; ++i) {
- wctx->argv[i] = RedisModule_HoldString(NULL, argv[i + 2]);
- }
- RedisModule_CallReplyPromiseSetUnblockHandler(rep, blpop_and_set_multiple_keys_on_unblocked, wctx);
- RedisModule_FreeCallReply(rep);
- }
-
- return REDISMODULE_OK;
-}
-
-/* simulate a blocked client replying to a thread safe context without creating a thread */
-int do_fake_bg_true(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- UNUSED(argv);
- UNUSED(argc);
-
- RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);
- RedisModuleCtx *bctx = RedisModule_GetThreadSafeContext(bc);
-
- RedisModule_ReplyWithBool(bctx, 1);
-
- RedisModule_FreeThreadSafeContext(bctx);
- RedisModule_UnblockClient(bc, NULL);
-
- return REDISMODULE_OK;
-}
-
-
-/* this flag is used to work with busy commands, that might take a while
- * and ability to stop the busy work with a different command*/
-static volatile int abort_flag = 0;
-
-int slow_fg_command(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- if (argc != 2) {
- RedisModule_WrongArity(ctx);
- return REDISMODULE_OK;
- }
- long long block_time = 0;
- if (RedisModule_StringToLongLong(argv[1], &block_time) != REDISMODULE_OK) {
- RedisModule_ReplyWithError(ctx, "Invalid integer value");
- return REDISMODULE_OK;
- }
-
- uint64_t start_time = RedisModule_MonotonicMicroseconds();
- /* when not blocking indefinitely, we don't process client commands in this test. */
- int yield_flags = block_time? REDISMODULE_YIELD_FLAG_NONE: REDISMODULE_YIELD_FLAG_CLIENTS;
- while (!abort_flag) {
- RedisModule_Yield(ctx, yield_flags, "Slow module operation");
- usleep(1000);
- if (block_time && RedisModule_MonotonicMicroseconds() - start_time > (uint64_t)block_time)
- break;
- }
-
- abort_flag = 0;
- RedisModule_ReplyWithLongLong(ctx, 1);
- return REDISMODULE_OK;
-}
-
-int stop_slow_fg_command(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- REDISMODULE_NOT_USED(argv);
- REDISMODULE_NOT_USED(argc);
- abort_flag = 1;
- RedisModule_ReplyWithLongLong(ctx, 1);
- return REDISMODULE_OK;
-}
-
-/* used to enable or disable slow operation in do_bg_rm_call */
-static int set_slow_bg_operation(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- if (argc != 2) {
- RedisModule_WrongArity(ctx);
- return REDISMODULE_OK;
- }
- long long ll;
- if (RedisModule_StringToLongLong(argv[1], &ll) != REDISMODULE_OK) {
- RedisModule_ReplyWithError(ctx, "Invalid integer value");
- return REDISMODULE_OK;
- }
- g_slow_bg_operation = ll;
- RedisModule_ReplyWithSimpleString(ctx, "OK");
- return REDISMODULE_OK;
-}
-
-/* used to test if we reached the slow operation in do_bg_rm_call */
-static int is_in_slow_bg_operation(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- UNUSED(argv);
- if (argc != 1) {
- RedisModule_WrongArity(ctx);
- return REDISMODULE_OK;
- }
-
- RedisModule_ReplyWithLongLong(ctx, g_is_in_slow_bg_operation);
- return REDISMODULE_OK;
-}
-
-static void timer_callback(RedisModuleCtx *ctx, void *data)
-{
- UNUSED(ctx);
-
- RedisModuleBlockedClient *bc = data;
-
- // Get Redis module context
- RedisModuleCtx *reply_ctx = RedisModule_GetThreadSafeContext(bc);
-
- // Reply to client
- RedisModule_ReplyWithSimpleString(reply_ctx, "OK");
-
- // Unblock client
- RedisModule_UnblockClient(bc, NULL);
-
- // Free the Redis module context
- RedisModule_FreeThreadSafeContext(reply_ctx);
-}
-
-/* unblock_by_timer <period_ms> <timeout_ms>
- * period_ms is the period of the timer.
- * timeout_ms is the blocking timeout. */
-int unblock_by_timer(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
-{
- if (argc != 3)
- return RedisModule_WrongArity(ctx);
-
- long long period;
- long long timeout;
- if (RedisModule_StringToLongLong(argv[1],&period) != REDISMODULE_OK)
- return RedisModule_ReplyWithError(ctx,"ERR invalid period");
- if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK) {
- return RedisModule_ReplyWithError(ctx,"ERR invalid timeout");
- }
-
- RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, timeout);
- RedisModule_CreateTimer(ctx, period, timer_callback, bc);
- return REDISMODULE_OK;
-}
-
-int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
- REDISMODULE_NOT_USED(argv);
- REDISMODULE_NOT_USED(argc);
-
- if (RedisModule_Init(ctx, "blockedclient", 1, REDISMODULE_APIVER_1)== REDISMODULE_ERR)
- return REDISMODULE_ERR;
-
- if (RedisModule_CreateCommand(ctx, "acquire_gil", acquire_gil, "", 0, 0, 0) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
-
- if (RedisModule_CreateCommand(ctx, "do_rm_call", do_rm_call,
- "write", 0, 0, 0) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
-
- if (RedisModule_CreateCommand(ctx, "do_rm_call_async", do_rm_call_async,
- "write", 0, 0, 0) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
-
- if (RedisModule_CreateCommand(ctx, "do_rm_call_async_on_thread", do_rm_call_async_on_thread,
- "write", 0, 0, 0) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
-
- if (RedisModule_CreateCommand(ctx, "do_rm_call_async_script_mode", do_rm_call_async,
- "write", 0, 0, 0) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
-
- if (RedisModule_CreateCommand(ctx, "do_rm_call_async_no_replicate", do_rm_call_async,
- "write", 0, 0, 0) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
-
- if (RedisModule_CreateCommand(ctx, "do_rm_call_fire_and_forget", do_rm_call_async_fire_and_forget,
- "write", 0, 0, 0) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
-
- if (RedisModule_CreateCommand(ctx, "wait_and_do_rm_call", wait_and_do_rm_call_async,
- "write", 0, 0, 0) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
-
- if (RedisModule_CreateCommand(ctx, "blpop_and_set_multiple_keys", blpop_and_set_multiple_keys,
- "write", 0, 0, 0) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
-
- if (RedisModule_CreateCommand(ctx, "do_bg_rm_call", do_bg_rm_call, "", 0, 0, 0) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
-
- if (RedisModule_CreateCommand(ctx, "do_bg_rm_call_format", do_bg_rm_call, "", 0, 0, 0) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
-
- if (RedisModule_CreateCommand(ctx, "do_fake_bg_true", do_fake_bg_true, "", 0, 0, 0) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
-
- if (RedisModule_CreateCommand(ctx, "slow_fg_command", slow_fg_command,"", 0, 0, 0) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
-
- if (RedisModule_CreateCommand(ctx, "stop_slow_fg_command", stop_slow_fg_command,"allow-busy", 0, 0, 0) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
-
- if (RedisModule_CreateCommand(ctx, "set_slow_bg_operation", set_slow_bg_operation, "allow-busy", 0, 0, 0) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
-
- if (RedisModule_CreateCommand(ctx, "is_in_slow_bg_operation", is_in_slow_bg_operation, "allow-busy", 0, 0, 0) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
-
- if (RedisModule_CreateCommand(ctx, "unblock_by_timer", unblock_by_timer, "", 0, 0, 0) == REDISMODULE_ERR)
- return REDISMODULE_ERR;
-
- return REDISMODULE_OK;
-}