From 5d8dfe892a2ea89f706ee140c3bdcfd89fe03fda Mon Sep 17 00:00:00 2001
From: Mitja Felicijan
Date: Wed, 21 Jan 2026 22:40:55 +0100
Subject: Add Redis source code for testing

---
 examples/redis-unstable/src/iothread.c | 955 +++++++++++++++++++++++++++++++++
 1 file changed, 955 insertions(+)
 create mode 100644 examples/redis-unstable/src/iothread.c

diff --git a/examples/redis-unstable/src/iothread.c b/examples/redis-unstable/src/iothread.c
new file mode 100644
index 0000000..54a9cae
--- /dev/null
+++ b/examples/redis-unstable/src/iothread.c
@@ -0,0 +1,955 @@
+/* iothread.c -- The threaded io implementation.
+ *
+ * Copyright (c) 2024-Present, Redis Ltd.
+ * All rights reserved.
+ *
+ * Licensed under your choice of (a) the Redis Source Available License 2.0
+ * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
+ * GNU Affero General Public License v3 (AGPLv3).
+ */
+
+#include "server.h"
+
+/* IO threads. */
+static IOThread IOThreads[IO_THREADS_MAX_NUM];
+
+/* For main thread */
+static list *mainThreadPendingClientsToIOThreads[IO_THREADS_MAX_NUM]; /* Clients to IO threads */
+static list *mainThreadProcessingClients[IO_THREADS_MAX_NUM]; /* Clients in processing */
+static list *mainThreadPendingClients[IO_THREADS_MAX_NUM]; /* Pending clients from IO threads */
+static pthread_mutex_t mainThreadPendingClientsMutexes[IO_THREADS_MAX_NUM]; /* Mutex for pending clients */
+static eventNotifier* mainThreadPendingClientsNotifiers[IO_THREADS_MAX_NUM]; /* Notifier for pending clients */
+
+/* Send the clients to the main thread for processing when the number of clients
+ * in the pending list reaches IO_THREAD_MAX_PENDING_CLIENTS, or when check_size
+ * is 0. */
+static inline void sendPendingClientsToMainThreadIfNeeded(IOThread *t, int check_size) {
+    size_t len = listLength(t->pending_clients_to_main_thread);
+    if (len == 0 || (check_size && len < IO_THREAD_MAX_PENDING_CLIENTS)) return;
+
+    int running = 0, pending = 0;
+    pthread_mutex_lock(&mainThreadPendingClientsMutexes[t->id]);
+    pending = listLength(mainThreadPendingClients[t->id]);
+    listJoin(mainThreadPendingClients[t->id], t->pending_clients_to_main_thread);
+    pthread_mutex_unlock(&mainThreadPendingClientsMutexes[t->id]);
+    if (!pending) atomicGetWithSync(server.running, running);
+
+    /* Only notify the main thread if it is not running and has no pending clients
+     * to process, to avoid an unnecessary notify/wakeup. If the main thread is
+     * running, it will process the clients in beforeSleep. If there are pending
+     * clients, we may already have notified the main thread if needed. */
+    if (!running && !pending) {
+        triggerEventNotifier(mainThreadPendingClientsNotifiers[t->id]);
+    }
+}
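+
+/* A minimal sketch of the wakeup-elision pattern used above, with a
+ * hypothetical producer/consumer pair (the names q, my_batch, peer are
+ * illustrative only, not project API):
+ *
+ *   pthread_mutex_lock(&q->mutex);
+ *   pending = listLength(q->items);        // was work already queued?
+ *   listJoin(q->items, my_batch);          // hand off our whole batch
+ *   pthread_mutex_unlock(&q->mutex);
+ *   if (!pending) atomicGetWithSync(peer->running, running);
+ *
+ *   // Wake the peer only if it is idle AND saw an empty queue; in any other
+ *   // case it will drain the queue in its own beforeSleep anyway.
+ *   if (!running && !pending) triggerEventNotifier(peer->notifier);
+ */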
+
+/* When moving a client from an IO thread to the main thread, we may need to
+ * update some of its variables, since they are duplicated to avoid contention
+ * with the main thread.
+ * For now this is only relevant for master or slave clients. */
+void updateClientDataFromIOThread(client *c) {
+    if (!(c->flags & CLIENT_MASTER) && !(c->flags & CLIENT_SLAVE)) return;
+
+    serverAssert(c->tid != IOTHREAD_MAIN_THREAD_ID &&
+                 c->running_tid == IOTHREAD_MAIN_THREAD_ID);
+
+    if (c->io_repl_ack_time > c->repl_ack_time) {
+        serverAssert(c->flags & CLIENT_SLAVE);
+        c->repl_ack_time = c->io_repl_ack_time;
+    }
+    if (c->io_lastinteraction > c->lastinteraction) {
+        serverAssert(c->flags & CLIENT_MASTER);
+        c->lastinteraction = c->io_lastinteraction;
+    }
+    if (c->io_read_reploff > c->read_reploff) {
+        serverAssert(c->flags & CLIENT_MASTER);
+        c->read_reploff = c->io_read_reploff;
+    }
+
+    /* Update the replication buffer's referenced node if the IO thread has
+     * sent some data. */
+    if (c->flags & CLIENT_SLAVE && c->ref_repl_buf_node != NULL &&
+        (c->io_curr_repl_node != c->ref_repl_buf_node ||
+         c->io_curr_block_pos != c->ref_block_pos))
+    {
+        ((replBufBlock*)listNodeValue(c->ref_repl_buf_node))->refcount--;
+        ((replBufBlock*)listNodeValue(c->io_curr_repl_node))->refcount++;
+        c->ref_block_pos = c->io_curr_block_pos;
+        c->ref_repl_buf_node = c->io_curr_repl_node;
+        incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
+    }
+}
+
+/* Check whether the client needs any cron jobs run for it. Return 1 if the
+ * client should be terminated. */
+int runClientCronFromIOThread(client *c) {
+    if (c->flags & CLIENT_MASTER &&
+        c->io_last_repl_cron + 1000 <= server.mstime)
+    {
+        c->io_last_repl_cron = server.mstime;
+        if (replicationCronRunMasterClient()) return 1;
+    }
+
+    /* Run the client cron task once per second, or when the client is marked
+     * as pending cron. */
+    if (c->io_last_client_cron + 1000 <= server.mstime ||
+        c->io_flags & CLIENT_IO_PENDING_CRON)
+    {
+        c->io_last_client_cron = server.mstime;
+        if (clientsCronRunClient(c)) return 1;
+    } else {
+        /* Update the client's memory usage bucket only when clientsCronRunClient
+         * is not called, since that function already performs the update. */
+        updateClientMemUsageAndBucket(c);
+    }
+
+    return 0;
+}
+
+/* When an IO thread has read a complete query from a client, or wants to free
+ * a client, it should remove the client from its clients list and put it in the
+ * list destined for the main thread; we will send these clients to the main
+ * thread in IOThreadBeforeSleep. */
+void enqueuePendingClientsToMainThread(client *c, int unbind) {
+    /* If the IO thread may no longer manage it, e.g. when closing the client,
+     * we should unbind the client from the event loop, so the main thread
+     * doesn't have to do this costly operation. */
+    if (unbind) connUnbindEventLoop(c->conn);
+    /* Just skip if it has already been transferred. */
+    if (c->io_thread_client_list_node) {
+        IOThread *t = &IOThreads[c->tid];
+        /* If there are several clients to process, let the main thread handle them ASAP.
+         * Since the client being added to the queue may still need to be processed by
+         * the IO thread, we must call this before adding it to the queue to avoid
+         * races with the main thread. */
+        sendPendingClientsToMainThreadIfNeeded(t, 1);
+        /* Disable read and write to avoid races when the main thread processes it. */
+        c->io_flags &= ~(CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED);
+        /* Remove the client from the IO thread and add it to the main thread's
+         * pending list. */
+        listUnlinkNode(t->clients, c->io_thread_client_list_node);
+        listLinkNodeTail(t->pending_clients_to_main_thread, c->io_thread_client_list_node);
+        c->io_thread_client_list_node = NULL;
+    }
+}
+
+void enqueuePendingClienstToIOThreads(client *c) {
+    serverAssert(c->tid != IOTHREAD_MAIN_THREAD_ID &&
+                 c->running_tid == IOTHREAD_MAIN_THREAD_ID);
+
+    if (c->flags & CLIENT_PENDING_WRITE) {
+        c->flags &= ~CLIENT_PENDING_WRITE;
+        listUnlinkNode(server.clients_pending_write, &c->clients_pending_write_node);
+    }
+    if (c->flags & CLIENT_SLAVE) {
+        serverAssert(c->ref_repl_buf_node != NULL);
+
+        c->io_repl_ack_time = c->repl_ack_time;
+        c->io_curr_repl_node = c->ref_repl_buf_node;
+        c->io_curr_block_pos = c->ref_block_pos;
+        c->io_bound_repl_node = listLast(server.repl_buffer_blocks);
+        c->io_bound_block_pos = ((replBufBlock*)listNodeValue(c->io_bound_repl_node))->used;
+    }
+    if (c->flags & CLIENT_MASTER) {
+        c->io_read_reploff = c->read_reploff;
+        c->io_lastinteraction = c->lastinteraction;
+    }
+
+    c->running_tid = c->tid;
+    listAddNodeHead(mainThreadPendingClientsToIOThreads[c->tid], c);
+}
+
+/* Unbind the client's connection from the io thread event loop; the write and
+ * read handlers are also removed. This ensures that we can operate on the
+ * client safely. */
+void unbindClientFromIOThreadEventLoop(client *c) {
+    serverAssert(c->tid != IOTHREAD_MAIN_THREAD_ID &&
+                 c->running_tid == IOTHREAD_MAIN_THREAD_ID);
+    if (!connHasEventLoop(c->conn)) return;
+    /* Since this is called from the main thread, we should pause the io thread
+     * to make it safe. */
+    pauseIOThread(c->tid);
+    connUnbindEventLoop(c->conn);
+    resumeIOThread(c->tid);
+}
+
+/* When the main thread is processing a client from an IO thread and wants to
+ * keep it, we should unbind the client's connection from the io thread event
+ * loop first, and then bind it into the server's event loop. */
+void keepClientInMainThread(client *c) {
+    if (c->tid == IOTHREAD_MAIN_THREAD_ID) return;
+    serverAssert(c->running_tid == IOTHREAD_MAIN_THREAD_ID);
+    /* The IO thread no longer manages it. */
+    server.io_threads_clients_num[c->tid]--;
+    /* Unbind the client's connection from the io thread event loop. */
+    unbindClientFromIOThreadEventLoop(c);
+    /* Update the client's data in case it was just fetched from the IO thread. */
+    updateClientDataFromIOThread(c);
+    /* Let the main thread run it: rebind the event loop and read handler. */
+    connRebindEventLoop(c->conn, server.el);
+    connSetReadHandler(c->conn, readQueryFromClient);
+    c->io_flags |= CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED;
+    c->tid = IOTHREAD_MAIN_THREAD_ID;
+    freeClientDeferredObjects(c, 1); /* Free deferred objects. */
+    freeClientIODeferredObjects(c, 1); /* Free IO deferred objects. */
+    tryUnlinkClientFromPendingRefReply(c, 0);
+    /* The main thread starts to manage it. */
+    server.io_threads_clients_num[c->tid]++;
+}
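+
+/* Ownership at a glance (a sketch inferred from the fields used above): c->tid
+ * is the thread that owns the client long-term, while c->running_tid is the
+ * thread currently allowed to touch it.
+ *
+ *   situation                             c->tid    c->running_tid
+ *   ---------                             ------    --------------
+ *   client kept in the main thread        MAIN      MAIN
+ *   client parked in io thread i          i         i
+ *   io thread i's client fetched for
+ *   processing by the main thread         i         MAIN
+ */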
+
+/* If the client is managed by an IO thread, we should fetch it from that IO
+ * thread so the main thread can process it, just like an IO thread transfers
+ * a client to the main thread for processing. */
+void fetchClientFromIOThread(client *c) {
+    serverAssert(c->tid != IOTHREAD_MAIN_THREAD_ID &&
+                 c->running_tid != IOTHREAD_MAIN_THREAD_ID);
+    pauseIOThread(c->tid);
+    /* Remove the client from the clients list of the IO thread or main thread. */
+    if (c->io_thread_client_list_node) {
+        listDelNode(IOThreads[c->tid].clients, c->io_thread_client_list_node);
+        c->io_thread_client_list_node = NULL;
+    } else {
+        list *clients[5] = {
+            IOThreads[c->tid].pending_clients,
+            IOThreads[c->tid].pending_clients_to_main_thread,
+            mainThreadPendingClients[c->tid],
+            mainThreadProcessingClients[c->tid],
+            mainThreadPendingClientsToIOThreads[c->tid]
+        };
+        for (int i = 0; i < 5; i++) {
+            listNode *ln = listSearchKey(clients[i], c);
+            if (ln) {
+                listDelNode(clients[i], ln);
+                /* A client can only be in one client list. */
+                break;
+            }
+        }
+    }
+    /* Unbind the client's connection from the io thread event loop. */
+    connUnbindEventLoop(c->conn);
+    /* Now the main thread can process it. */
+    resumeIOThread(c->tid);
+
+    /* Keep the client in the main thread. */
+    c->running_tid = IOTHREAD_MAIN_THREAD_ID;
+    keepClientInMainThread(c);
+}
+
+/* Some clients must be handled in the main thread, since processing them in IO
+ * threads would cause data races.
+ *
+ * - Close ASAP: we must free the client in the main thread.
+ * - Pubsub, monitor, blocked, tracking clients: the main thread may
+ *   directly write them a reply when conditions are met.
+ * - A script command with debug may operate on the connection directly.
+ * - Master/Replica clients are only handled by an IO thread once RDB
+ *   replication is completed. Note we need to check them after checking for
+ *   other flags that may overlap with CLIENT_MASTER/SLAVE - CLOSE_ASAP,
+ *   MONITOR, (UN)BLOCKED, TRACKING. */
+int isClientMustHandledByMainThread(client *c) {
+    if (c->flags & (CLIENT_CLOSE_ASAP |
+                    CLIENT_PUBSUB | CLIENT_MONITOR | CLIENT_BLOCKED |
+                    CLIENT_UNBLOCKED | CLIENT_TRACKING | CLIENT_LUA_DEBUG |
+                    CLIENT_LUA_DEBUG_SYNC | CLIENT_ASM_MIGRATING |
+                    CLIENT_ASM_IMPORTING))
+    {
+        return 1;
+    }
+
+    /* If RDB replication is done, it's safe to move the master client to an IO
+     * thread. Note that we keep the master client in the main thread during
+     * failover, so as not to slow down the failover process by waiting for the
+     * master replication cron in an IO thread. */
+    if (c->flags & CLIENT_MASTER &&
+        server.repl_state == REPL_STATE_CONNECTED &&
+        server.repl_rdb_ch_state == REPL_RDB_CH_STATE_NONE &&
+        server.failover_state == NO_FAILOVER)
+    {
+        return 0;
+    }
+
+    /* If RDB replication is done for this slave, it's safe to move it to an IO
+     * thread. Note that we also check that ref_repl_buf_node is initialized,
+     * in order to prevent race conditions with the main thread when it feeds
+     * the replication buffer. */
+    if (c->flags & CLIENT_SLAVE &&
+        (c->replstate == SLAVE_STATE_ONLINE ||
+         c->replstate == SLAVE_STATE_SEND_BULK_AND_STREAM) &&
+        c->repl_start_cmd_stream_on_ack == 0 &&
+        c->ref_repl_buf_node != NULL)
+    {
+        return 0;
+    }
+
+    if (c->flags & (CLIENT_MASTER | CLIENT_SLAVE)) return 1;
+
+    return 0;
+}
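+
+/* A sketch of how a caller is expected to combine the helpers above and below
+ * (inferred from the assertions in each function; this is not a verbatim call
+ * site):
+ *
+ *   if (isClientMustHandledByMainThread(c)) {
+ *       keepClientInMainThread(c);                // stays in the main thread
+ *   } else if (c->tid == IOTHREAD_MAIN_THREAD_ID) {
+ *       assignClientToIOThread(c);                // first hand-off
+ *   } else {
+ *       enqueuePendingClienstToIOThreads(c);      // return to its io thread
+ *   }
+ */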
+
+/* When the main thread accepts a new client or transfers clients to IO threads,
+ * it assigns the client to the IO thread with the fewest clients. */
+void assignClientToIOThread(client *c) {
+    serverAssert(c->tid == IOTHREAD_MAIN_THREAD_ID);
+    /* Find the IO thread with the fewest clients. */
+    int min_id = 0;
+    int min = INT_MAX;
+    for (int i = 1; i < server.io_threads_num; i++) {
+        if (server.io_threads_clients_num[i] < min) {
+            min = server.io_threads_clients_num[i];
+            min_id = i;
+        }
+    }
+
+    /* Assign the client to the IO thread. */
+    server.io_threads_clients_num[c->tid]--;
+    c->tid = min_id;
+    server.io_threads_clients_num[min_id]++;
+
+    /* A client running in an IO thread needs a deferred objects array. */
+    c->deferred_objects = zmalloc(sizeof(deferredObject) * CLIENT_MAX_DEFERRED_OBJECTS);
+
+    /* Unbind the client's connection from the main thread event loop, disable
+     * read and write, and then put it in the list; the main thread will send
+     * these clients to the IO thread in beforeSleep. */
+    connUnbindEventLoop(c->conn);
+    c->io_flags &= ~(CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED);
+
+    enqueuePendingClienstToIOThreads(c);
+}
+
+/* When updating the maxclients config, we not only resize the event loop of the
+ * main thread but also the event loops of all io threads. If one thread fails,
+ * the whole operation fails, since an fd can be distributed to any IO thread. */
+int resizeAllIOThreadsEventLoops(size_t newsize) {
+    int result = AE_OK;
+    if (server.io_threads_num <= 1) return result;
+
+    /* To make the context safe. */
+    pauseAllIOThreads();
+    for (int i = 1; i < server.io_threads_num; i++) {
+        IOThread *t = &IOThreads[i];
+        if (aeResizeSetSize(t->el, newsize) == AE_ERR)
+            result = AE_ERR;
+    }
+    resumeAllIOThreads();
+    return result;
+}
+
+/* In the main thread, we may want to operate on data of the io threads, e.g.
+ * uninstall an event handler, access the query/output buffer, or resize the
+ * event loop; we need a clean and safe context to do that. We pause the io
+ * thread in IOThreadBeforeSleep, do some jobs and then resume it. To avoid the
+ * thread being suspended, we use busy waiting to confirm the target status.
+ * Besides, we use atomic variables to ensure memory visibility and ordering.
+ *
+ * Make sure that only the main thread calls these functions:
+ * - pauseIOThread, resumeIOThread
+ * - pauseAllIOThreads, resumeAllIOThreads
+ * - pauseIOThreadsRange, resumeIOThreadsRange
+ *
+ * The main thread will pause the io thread, and then wait for the io thread to
+ * be paused. The io thread will check the paused status in IOThreadBeforeSleep,
+ * and then pause itself.
+ *
+ * The main thread will resume the io thread, and then wait for the io thread to
+ * be resumed. The io thread will check the paused status in IOThreadBeforeSleep,
+ * and then resume itself.
+ */
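+
+/* Usage sketch of the pause/resume protocol (main-thread side), assuming we
+ * want a safe context to touch a connection owned by io thread c->tid, as
+ * unbindClientFromIOThreadEventLoop does above:
+ *
+ *   pauseIOThread(c->tid);          // busy-waits until the io thread parks
+ *   connUnbindEventLoop(c->conn);   // safe: the io thread is not polling now
+ *   resumeIOThread(c->tid);         // the io thread re-enters its event loop
+ *
+ * Calls may nest; only the outermost pair actually pauses and resumes. */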
+
+/* We may pause the same io thread in a nested fashion, so we need to record the
+ * pause count; only when the count is 0 do we actually pause the io thread, and
+ * only when the count is 1 do we actually resume it. */
+static int PausedIOThreads[IO_THREADS_MAX_NUM] = {0};
+
+/* Pause the specified range of io threads, and wait for them to be paused. */
+void pauseIOThreadsRange(int start, int end) {
+    if (!server.io_threads_active) return;
+    serverAssert(start >= 1 && end < server.io_threads_num && start <= end);
+    serverAssert(pthread_equal(pthread_self(), server.main_thread_id));
+
+    /* Try to make all io threads paused in parallel. */
+    for (int i = start; i <= end; i++) {
+        PausedIOThreads[i]++;
+        /* Skip if already paused. */
+        if (PausedIOThreads[i] > 1) continue;
+
+        int paused;
+        atomicGetWithSync(IOThreads[i].paused, paused);
+        /* Reentrant calls are not supported. */
+        serverAssert(paused == IO_THREAD_UNPAUSED);
+        atomicSetWithSync(IOThreads[i].paused, IO_THREAD_PAUSING);
+        /* Just notify the io thread; there is no actual job. IO threads check
+         * the paused status in IOThreadBeforeSleep, so we only need to wake one
+         * up if it is waiting in poll. */
+        triggerEventNotifier(IOThreads[i].pending_clients_notifier);
+    }
+
+    /* Wait for all io threads to be paused. */
+    for (int i = start; i <= end; i++) {
+        if (PausedIOThreads[i] > 1) continue;
+        int paused = IO_THREAD_PAUSING;
+        while (paused != IO_THREAD_PAUSED) {
+            atomicGetWithSync(IOThreads[i].paused, paused);
+        }
+    }
+}
+
+/* Resume the specified range of io threads, and wait for them to be resumed. */
+void resumeIOThreadsRange(int start, int end) {
+    if (!server.io_threads_active) return;
+    serverAssert(start >= 1 && end < server.io_threads_num && start <= end);
+    serverAssert(pthread_equal(pthread_self(), server.main_thread_id));
+
+    for (int i = start; i <= end; i++) {
+        serverAssert(PausedIOThreads[i] > 0);
+        PausedIOThreads[i]--;
+        if (PausedIOThreads[i] > 0) continue;
+
+        int paused;
+        /* Check that it is paused, since we must call 'pause' and
+         * 'resume' in pairs. */
+        atomicGetWithSync(IOThreads[i].paused, paused);
+        serverAssert(paused == IO_THREAD_PAUSED);
+        /* Resume */
+        atomicSetWithSync(IOThreads[i].paused, IO_THREAD_RESUMING);
+        while (paused != IO_THREAD_UNPAUSED) {
+            atomicGetWithSync(IOThreads[i].paused, paused);
+        }
+    }
+}
+
+/* The IO thread checks whether it is being paused, and if so, it pauses itself
+ * and waits for resuming, corresponding to the pause/resumeIOThread* functions.
+ * Currently, this is only called in IOThreadBeforeSleep, as there are no pending
+ * I/O events at that point, so the context is clean. */
+void handlePauseAndResume(IOThread *t) {
+    int paused;
+    /* Check if I am being paused. */
+    atomicGetWithSync(t->paused, paused);
+    if (paused == IO_THREAD_PAUSING) {
+        atomicSetWithSync(t->paused, IO_THREAD_PAUSED);
+        /* Wait for resuming. */
+        while (paused != IO_THREAD_RESUMING) {
+            atomicGetWithSync(t->paused, paused);
+        }
+        atomicSetWithSync(t->paused, IO_THREAD_UNPAUSED);
+    }
+}
+
+/* Pause the specified io thread, and wait for it to be paused. */
+void pauseIOThread(int id) {
+    pauseIOThreadsRange(id, id);
+}
+
+/* Resume the specified io thread, and wait for it to be resumed. */
+void resumeIOThread(int id) {
+    resumeIOThreadsRange(id, id);
+}
+
+/* Pause all io threads, and wait for them to be paused. */
+void pauseAllIOThreads(void) {
+    pauseIOThreadsRange(1, server.io_threads_num-1);
+}
+
+/* Resume all io threads, and wait for them to be resumed. */
+void resumeAllIOThreads(void) {
+    resumeIOThreadsRange(1, server.io_threads_num-1);
+}
+
+/* Add the pending clients to the lists of the IO threads, and trigger an event
+ * to notify the io threads to handle them. */
+int sendPendingClientsToIOThreads(void) {
+    int processed = 0;
+    for (int i = 1; i < server.io_threads_num; i++) {
+        int len = listLength(mainThreadPendingClientsToIOThreads[i]);
+        if (len > 0) {
+            IOThread *t = &IOThreads[i];
+            pthread_mutex_lock(&t->pending_clients_mutex);
+            listJoin(t->pending_clients, mainThreadPendingClientsToIOThreads[i]);
+            pthread_mutex_unlock(&t->pending_clients_mutex);
+            /* Trigger an event. An error may be returned when the buffer is
+             * full if we are using a pipe, but that is fine: the io thread
+             * will handle all clients in the list when it receives a
+             * notification. */
+            triggerEventNotifier(t->pending_clients_notifier);
+        }
+        processed += len;
+    }
+    return processed;
+}
+
+/* Prefetch the commands from the IO thread. The return value is the number
+ * of clients that have been prefetched. */
+int prefetchIOThreadCommands(IOThread *t) {
+    int len = listLength(mainThreadProcessingClients[t->id]);
+    int to_prefetch = determinePrefetchCount(len);
+    if (to_prefetch == 0) return 0;
+
+    /* Two-phase approach to optimize cache utilization:
+     * Phase 1: Issue prefetch hints for the client structures.
+     * Phase 2: Access the now-cached client data and add commands to the batch.
+     *
+     * We double the configured size for better performance,
+     * see also `determinePrefetchCount`. */
+    static client *c[PREFETCH_BATCH_MAX_SIZE*2];
+    serverAssert(PREFETCH_BATCH_MAX_SIZE*2 >= to_prefetch);
+    int clients = 0;
+    listIter li;
+    listNode *ln;
+    listRewind(mainThreadProcessingClients[t->id], &li);
+    /* Phase 1: Issue prefetch instructions for the client struct and pending_cmds.
+     * These prefetches will bring the data into cache asynchronously. */
+    for (int i = 0; i < to_prefetch && (ln = listNext(&li)); i++) {
+        c[i] = listNodeValue(ln);
+        redis_prefetch_read(c[i]);
+        redis_prefetch_read(&c[i]->pending_cmds);
+    }
+    /* Phase 2: Access the client data (now likely in cache) and add it to the
+     * batch. Also prefetch additional fields (reply, mem_usage_bucket) that
+     * will be needed later during command execution. */
+    for (int i = 0; i < to_prefetch; i++) {
+        if (addCommandToBatch(c[i]) == C_ERR) break;
+        if (c[i]->reply) redis_prefetch_read(c[i]->reply);
+        redis_prefetch_read(&c[i]->mem_usage_bucket);
+        clients++;
+    }
+    /* Prefetch the commands in the batch. */
+    prefetchCommands();
+    return clients;
+}
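+
+/* The two-phase loop above boils down to this generic software-prefetch
+ * pattern (a sketch; ptrs and consume are illustrative names, not project API):
+ *
+ *   for (int i = 0; i < n; i++) redis_prefetch_read(ptrs[i]);  // phase 1: hint
+ *   for (int i = 0; i < n; i++) consume(ptrs[i]);              // phase 2: hit
+ *
+ * Issuing all hints first lets the memory loads overlap, instead of paying one
+ * full cache miss per iteration. */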
+
+extern int ProcessingEventsWhileBlocked;
+
+/* Send the pending clients to the IO thread if the number of pending clients
+ * is greater than IO_THREAD_MAX_PENDING_CLIENTS, or if size_check is 0. */
+static inline void sendPendingClientsToIOThreadIfNeeded(IOThread *t, int size_check) {
+    size_t len = listLength(mainThreadPendingClientsToIOThreads[t->id]);
+    if (len == 0 || (size_check && len < IO_THREAD_MAX_PENDING_CLIENTS)) return;
+
+    /* If the AOF fsync policy is always, we should not let the io thread handle
+     * these clients now, since we haven't flushed the AOF buffer to the file
+     * and synced yet. Sending these clients to the io threads is delayed until
+     * beforeSleep, after flushAppendOnlyFile.
+     *
+     * If we are in processEventsWhileBlocked, we don't send clients to io
+     * threads now, because we want to update
+     * server.events_processed_while_blocked accurately. */
+    if (server.aof_fsync != AOF_FSYNC_ALWAYS && !ProcessingEventsWhileBlocked) {
+        int running = 0, pending = 0;
+        pthread_mutex_lock(&(t->pending_clients_mutex));
+        pending = listLength(t->pending_clients);
+        listJoin(t->pending_clients, mainThreadPendingClientsToIOThreads[t->id]);
+        pthread_mutex_unlock(&(t->pending_clients_mutex));
+        if (!pending) atomicGetWithSync(t->running, running);
+
+        /* Only notify the io thread if it is not running and has no pending
+         * clients to process, to avoid an unnecessary notify/wakeup. If the io
+         * thread is running, it will process the clients in beforeSleep. If
+         * there are pending clients, we may already have notified the io
+         * thread if needed. */
+        if (!running && !pending) triggerEventNotifier(t->pending_clients_notifier);
+    }
+}
+
+/* The main thread processes the clients from IO threads; these clients may have
+ * a complete command to execute or need to be freed. Note that IO threads never
+ * free a client, since that operation accesses a lot of server data.
+ *
+ * Please note that this function may be called reentrantly; the same goes
+ * for handleClientsFromIOThread and processClientsOfAllIOThreads. For example,
+ * when processing a script command, it may call processEventsWhileBlocked to
+ * process new events; if clients with fired events come from the same io
+ * thread, this function may be called reentrantly. */
+int processClientsFromIOThread(IOThread *t) {
+    /* Get the list of clients to process. */
+    pthread_mutex_lock(&mainThreadPendingClientsMutexes[t->id]);
+    listJoin(mainThreadProcessingClients[t->id], mainThreadPendingClients[t->id]);
+    pthread_mutex_unlock(&mainThreadPendingClientsMutexes[t->id]);
+    size_t processed = listLength(mainThreadProcessingClients[t->id]);
+    if (processed == 0) return 0;
+
+    int prefetch_clients = 0;
+    /* We may call processClientsFromIOThread reentrantly, so we need to reset
+     * the prefetching batch; besides, users may change the configured prefetch
+     * batch size, which also requires resetting the prefetching batch. */
+    resetCommandsBatch();
+
+    listNode *node = NULL;
+    while (listLength(mainThreadProcessingClients[t->id])) {
+        if (prefetch_clients <= 0) {
+            /* Reset the prefetching batch if we have processed all clients. */
+            resetCommandsBatch();
+            /* Prefetch the commands if there are no clients in the batch. */
+            prefetch_clients = prefetchIOThreadCommands(t);
+        }
+        prefetch_clients--;
+
+        /* Each time we pop only the first client to process, to guarantee
+         * reentrancy safety. */
+        if (node) zfree(node);
+        node = listFirst(mainThreadProcessingClients[t->id]);
+        listUnlinkNode(mainThreadProcessingClients[t->id], node);
+        client *c = listNodeValue(node);
+
+        /* Make sure the client is neither readable nor writable in the io
+         * thread, to avoid data races. */
+        serverAssert(!(c->io_flags & (CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED)));
+        serverAssert(!(c->flags & CLIENT_CLOSE_ASAP));
+
+        /* Let the main thread run it; set the running thread id first. */
+        c->running_tid = IOTHREAD_MAIN_THREAD_ID;
+
+        /* Free objects queued by the IO thread for deferred freeing. */
+        freeClientIODeferredObjects(c, 0);
+        tryUnlinkClientFromPendingRefReply(c, 0);
+
+        /* If a read error occurs, handle it in the main thread first, since we
+         * want to print logs with the client's information before freeing it. */
+        if (isClientReadErrorFatal(c)) handleClientReadError(c);
+
+        /* The client was asked to close in the io thread. */
+        if (c->io_flags & CLIENT_IO_CLOSE_ASAP) {
+            freeClient(c);
+            continue;
+        }
+
+        /* Update some of the client's members while we are in the main thread,
+         * so we avoid data races. */
+        updateClientDataFromIOThread(c);
+
+        /* Check if we need to run a cron job for the client. */
+        if (runClientCronFromIOThread(c)) continue;
+
+        /* Process the pending command and input buffer. */
+        if (!isClientReadErrorFatal(c) && c->io_flags & CLIENT_IO_PENDING_COMMAND) {
+            c->flags |= CLIENT_PENDING_COMMAND;
+            if (processPendingCommandAndInputBuffer(c) == C_ERR) {
+                /* If the client is no longer valid, it must have been freed
+                 * safely; just skip it. */
+                continue;
+            }
+        }
+
+        /* We may have pending replies if the io thread did not finish writing
+         * the reply to the client, in which case the client was not put in the
+         * pending write queue. We should do that first, since we may keep the
+         * client in the main thread instead of returning it to the io threads. */
+        if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
+            putClientInPendingWriteQueue(c);
+
+        /* Some clients can only be processed in the main thread; otherwise
+         * data races will happen, since we may touch the client's data in the
+         * main thread. */
+        if (isClientMustHandledByMainThread(c)) {
+            keepClientInMainThread(c);
+            continue;
+        }
+
+        /* Replica clients are handled in putReplicasInPendingClientsToIOThreads
+         * in beforeSleep. */
+        if (c->flags & CLIENT_SLAVE) continue;
+
+        /* Remove this client from the main thread's pending write clients
+         * queue; some clients may not have a reply if CLIENT REPLY OFF/SKIP
+         * is used. */
+        if (c->flags & CLIENT_PENDING_WRITE) {
+            c->flags &= ~CLIENT_PENDING_WRITE;
+            listUnlinkNode(server.clients_pending_write, &c->clients_pending_write_node);
+        }
+        c->running_tid = c->tid;
+        listLinkNodeHead(mainThreadPendingClientsToIOThreads[c->tid], node);
+        node = NULL;
+
+        /* If there are several clients to process, let the io thread handle them ASAP. */
+        sendPendingClientsToIOThreadIfNeeded(t, 1);
+    }
+    if (node) zfree(node);
+
+    /* Send the clients to the io thread without the pending size check, since
+     * the main thread may process clients from other io threads as well, so we
+     * need to send them to this io thread to process in parallel. */
+    sendPendingClientsToIOThreadIfNeeded(t, 0);
+
+    return processed;
+}
+
+/* When an io thread finishes processing a client with a read event, it notifies
+ * the main thread through event triggering in IOThreadBeforeSleep. The main
+ * thread handles the event through this function. */
+void handleClientsFromIOThread(struct aeEventLoop *el, int fd, void *ptr, int mask) {
+    UNUSED(el);
+    UNUSED(mask);
+
+    IOThread *t = ptr;
+
+    /* Handle the fd event first. */
+    serverAssert(fd == getReadEventFd(mainThreadPendingClientsNotifiers[t->id]));
+    handleEventNotifier(mainThreadPendingClientsNotifiers[t->id]);
+
+    /* Process the clients from the IO thread. */
+    processClientsFromIOThread(t);
+}
+
+/* In the new threaded io design, one thread may process multiple clients, so
+ * when an io thread notifies the main thread of an event, there may be multiple
+ * clients with commands that need to be processed. But the event handler
+ * function handleClientsFromIOThread may block while processing a specific
+ * command; then the previous clients cannot get a reply and the subsequent
+ * clients cannot be processed, so we need to handle this scenario in
+ * beforeSleep. This function processes the commands of the subsequent clients
+ * from io threads, while the companion function sendPendingClientsToIOThreads
+ * makes sure clients from io threads can get their replies. See also
+ * beforeSleep.
+ *
+ * In beforeSleep, we also call this function to handle the clients that are
+ * transferred from io threads without notification. */
+int processClientsOfAllIOThreads(void) {
+    int processed = 0;
+    for (int i = 1; i < server.io_threads_num; i++) {
+        processed += processClientsFromIOThread(&IOThreads[i]);
+    }
+    return processed;
+}
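+
+/* For orientation, the main-thread side is expected to drive this module from
+ * beforeSleep roughly as follows (a paraphrased sketch, not the verbatim
+ * server.c code):
+ *
+ *   processClientsOfAllIOThreads();    // run queued commands from io threads
+ *   // ... flushAppendOnlyFile() runs here when aof_fsync is always ...
+ *   sendPendingClientsToIOThreads();   // hand clients back and notify
+ */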
+
+/* After the main thread has processed the clients, it sends them back to the
+ * io threads to handle and fires an event; the io thread handles the event
+ * with this function. */
+void handleClientsFromMainThread(struct aeEventLoop *ae, int fd, void *ptr, int mask) {
+    UNUSED(ae);
+    UNUSED(mask);
+
+    IOThread *t = ptr;
+
+    /* Handle the fd event first. */
+    serverAssert(fd == getReadEventFd(t->pending_clients_notifier));
+    handleEventNotifier(t->pending_clients_notifier);
+
+    /* Process the clients from the main thread. */
+    processClientsFromMainThread(t);
+}
+
+/* Process clients that have finished executing commands in the main thread.
+ * If the client is not bound to the event loop, we should bind it first and
+ * install the read handler. If the client still has a query buffer, we should
+ * process the input buffer. If the client has pending replies, we just reply
+ * to the client, and then install the write handler if needed. */
+int processClientsFromMainThread(IOThread *t) {
+    pthread_mutex_lock(&t->pending_clients_mutex);
+    listJoin(t->processing_clients, t->pending_clients);
+    pthread_mutex_unlock(&t->pending_clients_mutex);
+    size_t processed = listLength(t->processing_clients);
+    if (processed == 0) return 0;
+
+    listIter li;
+    listNode *ln;
+    listRewind(t->processing_clients, &li);
+    while((ln = listNext(&li))) {
+        client *c = listNodeValue(ln);
+        serverAssert(!(c->io_flags & (CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED)));
+        /* The main thread must handle clients with the CLIENT_CLOSE_ASAP flag;
+         * we only set io_flags when clients in an io thread are freed ASAP. */
+        serverAssert(!(c->flags & CLIENT_CLOSE_ASAP));
+
+        /* Link the client into the IO thread's clients list first. */
+        serverAssert(c->io_thread_client_list_node == NULL);
+        listUnlinkNode(t->processing_clients, ln);
+        listLinkNodeTail(t->clients, ln);
+        c->io_thread_client_list_node = listLast(t->clients);
+
+        /* The client is now in the IO thread, so let's free deferred objects. */
+        freeClientDeferredObjects(c, 0);
+
+        /* The client was asked to close; we just let the main thread free it. */
+        if (c->io_flags & CLIENT_IO_CLOSE_ASAP) {
+            enqueuePendingClientsToMainThread(c, 1);
+            continue;
+        }
+
+        /* Enable read and write, and reset some flags. */
+        c->io_flags |= CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED;
+        c->io_flags &= ~(CLIENT_IO_PENDING_COMMAND | CLIENT_IO_PENDING_CRON);
+
+        /* Only bind once; we never remove the read handler unless freeing the
+         * client. */
+        if (!connHasEventLoop(c->conn)) {
+            connRebindEventLoop(c->conn, t->el);
+            serverAssert(!connHasReadHandler(c->conn));
+            connSetReadHandler(c->conn, readQueryFromClient);
+        }
+
+        /* If the client has pending replies, write the replies to the client. */
+        if (clientHasPendingReplies(c)) {
+            writeToClient(c, 0);
+            if (!(c->io_flags & CLIENT_IO_CLOSE_ASAP) && clientHasPendingReplies(c)) {
+                connSetWriteHandler(c->conn, sendReplyToClient);
+            }
+        }
+    }
+    /* All clients must have been processed. */
+    serverAssert(listLength(t->processing_clients) == 0);
+    return processed;
+}
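+
+/* The lost-wakeup avoidance used in IOThreadBeforeSleep below, reduced to its
+ * core (a sketch): the io thread publishes running=0 *before* its final queue
+ * check, and the main thread checks running *after* publishing new work, so at
+ * least one side always observes the other's update.
+ *
+ *   io thread:    running = 0;   if (queue not empty) drain(queue);
+ *   main thread:  enqueue(c);    if (!running) notify(io thread);
+ */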
+
+void IOThreadBeforeSleep(struct aeEventLoop *el) {
+    IOThread *t = el->privdata[0];
+
+    /* Handle pending data (typically TLS). */
+    connTypeProcessPendingData(el);
+
+    /* If any connection type (typically TLS) still has pending unread data,
+     * don't sleep at all. */
+    int dont_sleep = connTypeHasPendingData(el);
+
+    /* Process clients from the main thread, since the main thread may deliver
+     * clients without notification while the IO thread is processing events. */
+    if (processClientsFromMainThread(t) > 0) {
+        /* If some clients were processed, we should not sleep, since the main
+         * thread may want to keep delivering clients without notification, so
+         * that the IO thread can process them ASAP and the main thread can
+         * avoid the costly notification (writing an fd and waking us up). */
+        dont_sleep = 1;
+    }
+    if (!dont_sleep) {
+        atomicSetWithSync(t->running, 0); /* Not running if going to sleep. */
+        /* Try to process clients from the main thread again, since before we
+         * set running to 0, the main thread may have delivered clients to this
+         * io thread. */
+        processClientsFromMainThread(t);
+    }
+    aeSetDontWait(t->el, dont_sleep);
+
+    /* Check if I am being paused; if so, pause myself and wait to be resumed. */
+    handlePauseAndResume(t);
+
+    /* Send clients to the main thread to process. We don't check the size here,
+     * since we want to send all clients to the main thread before going to
+     * sleep. */
+    sendPendingClientsToMainThreadIfNeeded(t, 0);
+}
+
+void IOThreadAfterSleep(struct aeEventLoop *el) {
+    IOThread *t = el->privdata[0];
+
+    /* Set the IO thread to the running state, so the main thread can deliver
+     * clients to it without extra notifications. */
+    atomicSetWithSync(t->running, 1);
+}
+
+/* Periodically transfer some of the clients to the main thread for processing. */
+void IOThreadClientsCron(IOThread *t) {
+    /* Process at least a few clients while we are at it, even if we need to
+     * process fewer than CLIENTS_CRON_MIN_ITERATIONS to meet our contract of
+     * processing each client once per second. */
+    int iterations = listLength(t->clients) / CONFIG_DEFAULT_HZ;
+    if (iterations < CLIENTS_CRON_MIN_ITERATIONS) {
+        iterations = CLIENTS_CRON_MIN_ITERATIONS;
+    }
+
+    listIter li;
+    listNode *ln;
+    listRewind(t->clients, &li);
+    while ((ln = listNext(&li)) && iterations--) {
+        client *c = listNodeValue(ln);
+        /* Mark the client as pending cron; the main thread will process it. */
+        c->io_flags |= CLIENT_IO_PENDING_CRON;
+        enqueuePendingClientsToMainThread(c, 0);
+    }
+}
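+
+/* A worked example of the iteration budget above, assuming the default
+ * CONFIG_DEFAULT_HZ of 10: with 5000 clients in this io thread, each cron call
+ * visits 5000/10 = 500 clients, and at 10 calls per second the whole list is
+ * covered roughly once per second, matching the stated contract. */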
+
+/* This is the IO thread timer interrupt, called CONFIG_DEFAULT_HZ times per
+ * second. Its current responsibility is to detect clients that have been stuck
+ * in the IO thread for too long and hand them over to the main thread for
+ * handling. */
+int IOThreadCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
+    UNUSED(eventLoop);
+    UNUSED(id);
+    IOThread *t = clientData;
+
+    /* Run cron tasks for the clients in the IO thread. */
+    IOThreadClientsCron(t);
+
+    return 1000/CONFIG_DEFAULT_HZ;
+}
+
+/* The main function of an IO thread; it runs an event loop. The main thread
+ * and the IO thread communicate through an event notifier. */
+void *IOThreadMain(void *ptr) {
+    IOThread *t = ptr;
+    char thdname[16];
+    snprintf(thdname, sizeof(thdname), "io_thd_%d", t->id);
+    redis_set_thread_title(thdname);
+    redisSetCpuAffinity(server.server_cpulist);
+    makeThreadKillable();
+    aeSetBeforeSleepProc(t->el, IOThreadBeforeSleep);
+    aeSetAfterSleepProc(t->el, IOThreadAfterSleep);
+    aeMain(t->el);
+    return NULL;
+}
+
+/* Initialize the data structures needed for threaded I/O. */
+void initThreadedIO(void) {
+    if (server.io_threads_num <= 1) return;
+
+    server.io_threads_active = 1;
+
+    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
+        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
+                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
+        exit(1);
+    }
+
+    /* Spawn and initialize the I/O threads. */
+    for (int i = 1; i < server.io_threads_num; i++) {
+        IOThread *t = &IOThreads[i];
+        t->id = i;
+        t->el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
+        t->el->privdata[0] = t;
+        t->pending_clients = listCreate();
+        t->processing_clients = listCreate();
+        t->pending_clients_to_main_thread = listCreate();
+        t->clients = listCreate();
+        atomicSetWithSync(t->paused, IO_THREAD_UNPAUSED);
+        atomicSetWithSync(t->running, 0);
+
+        pthread_mutexattr_t *attr = NULL;
+        #if defined(__linux__) && defined(__GLIBC__)
+        attr = zmalloc(sizeof(pthread_mutexattr_t));
+        pthread_mutexattr_init(attr);
+        pthread_mutexattr_settype(attr, PTHREAD_MUTEX_ADAPTIVE_NP);
+        #endif
+        pthread_mutex_init(&t->pending_clients_mutex, attr);
+
+        t->pending_clients_notifier = createEventNotifier();
+        if (aeCreateFileEvent(t->el, getReadEventFd(t->pending_clients_notifier),
+                              AE_READABLE, handleClientsFromMainThread, t) != AE_OK)
+        {
+            serverLog(LL_WARNING, "Fatal: Can't register file event for IO thread notifications.");
+            exit(1);
+        }
+
+        /* This is the timer callback of the IO thread, used to gradually handle
+         * some background operations, such as the clients cron. */
+        if (aeCreateTimeEvent(t->el, 1, IOThreadCron, t, NULL) == AE_ERR) {
+            serverLog(LL_WARNING, "Fatal: Can't create event loop timers in IO thread.");
+            exit(1);
+        }
+
+        /* Create the IO thread. */
+        if (pthread_create(&t->tid, NULL, IOThreadMain, (void*)t) != 0) {
+            serverLog(LL_WARNING, "Fatal: Can't initialize IO thread.");
+            exit(1);
+        }
+
+        /* For the main thread. */
+        mainThreadPendingClientsToIOThreads[i] = listCreate();
+        mainThreadPendingClients[i] = listCreate();
+        mainThreadProcessingClients[i] = listCreate();
+        pthread_mutex_init(&mainThreadPendingClientsMutexes[i], attr);
+        mainThreadPendingClientsNotifiers[i] = createEventNotifier();
+        if (aeCreateFileEvent(server.el, getReadEventFd(mainThreadPendingClientsNotifiers[i]),
+                              AE_READABLE, handleClientsFromIOThread, t) != AE_OK)
+        {
+            serverLog(LL_WARNING, "Fatal: Can't register file event for main thread notifications.");
+            exit(1);
+        }
+        if (attr) zfree(attr);
+    }
+}
+
+/* Kill the IO threads. TODO: release the allocated resources. */
+void killIOThreads(void) {
+    if (server.io_threads_num <= 1) return;
+
+    int err, j;
+    for (j = 1; j < server.io_threads_num; j++) {
+        if (IOThreads[j].tid == pthread_self()) continue;
+        if (IOThreads[j].tid && pthread_cancel(IOThreads[j].tid) == 0) {
+            if ((err = pthread_join(IOThreads[j].tid,NULL)) != 0) {
+                serverLog(LL_WARNING,
+                    "IO thread(tid:%lu) can not be joined: %s",
+                    (unsigned long)IOThreads[j].tid, strerror(err));
+            } else {
+                serverLog(LL_WARNING,
+                    "IO thread(tid:%lu) terminated",(unsigned long)IOThreads[j].tid);
+            }
+        }
+    }
+}
-- 
cgit v1.2.3