diff options
| author | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:40:55 +0100 |
|---|---|---|
| committer | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-01-21 22:40:55 +0100 |
| commit | 5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda (patch) | |
| tree | 1acdfa5220cd13b7be43a2a01368e80d306473ca /examples/redis-unstable/src/threads_mngr.c | |
| parent | c7ab12bba64d9c20ccd79b132dac475f7bc3923e (diff) | |
| download | crep-5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda.tar.gz | |
Add Redis source code for testing
Diffstat (limited to 'examples/redis-unstable/src/threads_mngr.c')
| -rw-r--r-- | examples/redis-unstable/src/threads_mngr.c | 171 |
1 files changed, 171 insertions, 0 deletions
diff --git a/examples/redis-unstable/src/threads_mngr.c b/examples/redis-unstable/src/threads_mngr.c new file mode 100644 index 0000000..dec83e3 --- /dev/null +++ b/examples/redis-unstable/src/threads_mngr.c @@ -0,0 +1,171 @@ +/* + * Copyright (c) 2021-Present, Redis Ltd. + * All rights reserved. + * + * Licensed under your choice of (a) the Redis Source Available License 2.0 + * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the + * GNU Affero General Public License v3 (AGPLv3). + */ + +#include "threads_mngr.h" +/* Anti-warning macro... */ +#define UNUSED(V) ((void) V) + +#ifdef __linux__ +#include "atomicvar.h" +#include "server.h" + +#include <signal.h> +#include <time.h> +#include <sys/syscall.h> + +#define IN_PROGRESS 1 +static const clock_t RUN_ON_THREADS_TIMEOUT = 2; + +/*================================= Globals ================================= */ + +static redisAtomic run_on_thread_cb g_callback = NULL; +static redisAtomic size_t g_tids_len = 0; +static redisAtomic size_t g_num_threads_done = 0; + +/* This flag is set while ThreadsManager_runOnThreads is running */ +static redisAtomic int g_in_progress = 0; + +/*============================ Internal prototypes ========================== */ + +static void invoke_callback(int sig); +/* returns 0 if it is safe to start, IN_PROGRESS otherwise. */ +static int test_and_start(void); +static void wait_threads(void); +/* Clean up global variable. +Assuming we are under the g_in_progress protection, this is not a thread-safe function */ +static void ThreadsManager_cleanups(void); + +/*============================ API functions implementations ========================== */ + +void ThreadsManager_init(void) { + /* Register signal handler */ + struct sigaction act; + sigemptyset(&act.sa_mask); + /* Not setting SA_RESTART flag means that If a signal handler is invoked while a + system call or library function call is blocked, use the default behavior + i.e., the call fails with the error EINTR */ + act.sa_flags = 0; + act.sa_handler = invoke_callback; + sigaction(SIGUSR2, &act, NULL); +} + +__attribute__ ((noinline)) +int ThreadsManager_runOnThreads(pid_t *tids, size_t tids_len, run_on_thread_cb callback) { + /* Check if it is safe to start running. If not - return */ + if(test_and_start() == IN_PROGRESS) { + return 0; + } + + /* Update g_callback */ + atomicSet(g_callback, callback); + + /* Set g_tids_len */ + atomicSet(g_tids_len, tids_len); + + /* set g_num_threads_done to 0 To handler the case where in the previous run we reached the timeout + and called ThreadsManager_cleanups before one or more threads were done and increased + (the already set to 0) g_num_threads_done */ + atomicSet(g_num_threads_done, 0); + + /* Send signal to all the threads in tids */ + pid_t pid = getpid(); + for (size_t i = 0; i < tids_len ; ++i) { + syscall(SYS_tgkill, pid, tids[i], THREADS_SIGNAL); + } + + /* Wait for all the threads to write to the output array, or until timeout is reached */ + wait_threads(); + + /* Cleanups to allow next execution */ + ThreadsManager_cleanups(); + + return 1; +} + +/*============================ Internal functions implementations ========================== */ + + +static int test_and_start(void) { + /* atomicFlagGetSet sets the variable to 1 and returns the previous value */ + int prev_state; + atomicFlagGetSet(g_in_progress, prev_state); + + /* If prev_state is 1, g_in_progress was on. */ + return prev_state; +} + +__attribute__ ((noinline)) +static void invoke_callback(int sig) { + UNUSED(sig); + + run_on_thread_cb callback; + atomicGet(g_callback, callback); + if (callback) { + callback(); + atomicIncr(g_num_threads_done, 1); + } else { + serverLogFromHandler(LL_WARNING, "tid %ld: ThreadsManager g_callback is NULL", syscall(SYS_gettid)); + } +} + +static void wait_threads(void) { + struct timespec timeout_time; + clock_gettime(CLOCK_REALTIME, &timeout_time); + + /* calculate relative time until timeout */ + timeout_time.tv_sec += RUN_ON_THREADS_TIMEOUT; + + /* Wait until all threads are done to invoke the callback or until we reached the timeout */ + size_t curr_done_count; + struct timespec curr_time; + size_t tids_len; + + do { + struct timeval tv = { + .tv_sec = 0, + .tv_usec = 10}; + /* Sleep a bit to yield to other threads. */ + /* usleep isn't listed as signal safe, so we use select instead */ + select(0, NULL, NULL, NULL, &tv); + atomicGet(g_num_threads_done, curr_done_count); + clock_gettime(CLOCK_REALTIME, &curr_time); + atomicGet(g_tids_len, tids_len); + } while (curr_done_count < tids_len && + curr_time.tv_sec <= timeout_time.tv_sec); + + if (curr_time.tv_sec > timeout_time.tv_sec) { + serverLogRawFromHandler(LL_WARNING, "wait_threads(): waiting threads timed out"); + } + +} + +static void ThreadsManager_cleanups(void) { + atomicSet(g_callback, NULL); + atomicSet(g_tids_len, 0); + atomicSet(g_num_threads_done, 0); + + /* Lastly, turn off g_in_progress */ + atomicSet(g_in_progress, 0); + +} +#else + +void ThreadsManager_init(void) { + /* DO NOTHING */ +} + +int ThreadsManager_runOnThreads(pid_t *tids, size_t tids_len, run_on_thread_cb callback) { + /* DO NOTHING */ + UNUSED(tids); + UNUSED(tids_len); + UNUSED(callback); + return 1; +} + +#endif /* __linux__ */ |
