diff options
Diffstat (limited to 'examples/redis-unstable/src/iothread.c')
| -rw-r--r-- | examples/redis-unstable/src/iothread.c | 955 |
1 files changed, 0 insertions, 955 deletions
diff --git a/examples/redis-unstable/src/iothread.c b/examples/redis-unstable/src/iothread.c deleted file mode 100644 index 54a9cae..0000000 --- a/examples/redis-unstable/src/iothread.c +++ /dev/null @@ -1,955 +0,0 @@ -/* iothread.c -- The threaded io implementation. - * - * Copyright (c) 2024-Present, Redis Ltd. - * All rights reserved. - * - * Licensed under your choice of (a) the Redis Source Available License 2.0 - * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the - * GNU Affero General Public License v3 (AGPLv3). - */ - -#include "server.h" - -/* IO threads. */ -static IOThread IOThreads[IO_THREADS_MAX_NUM]; - -/* For main thread */ -static list *mainThreadPendingClientsToIOThreads[IO_THREADS_MAX_NUM]; /* Clients to IO threads */ -static list *mainThreadProcessingClients[IO_THREADS_MAX_NUM]; /* Clients in processing */ -static list *mainThreadPendingClients[IO_THREADS_MAX_NUM]; /* Pending clients from IO threads */ -static pthread_mutex_t mainThreadPendingClientsMutexes[IO_THREADS_MAX_NUM]; /* Mutex for pending clients */ -static eventNotifier* mainThreadPendingClientsNotifiers[IO_THREADS_MAX_NUM]; /* Notifier for pending clients */ - -/* Send the clients to the main thread for processing when the number of clients - * in pending list reaches IO_THREAD_MAX_PENDING_CLIENTS, or check_size is 0. 
*/ -static inline void sendPendingClientsToMainThreadIfNeeded(IOThread *t, int check_size) { - size_t len = listLength(t->pending_clients_to_main_thread); - if (len == 0 || (check_size && len < IO_THREAD_MAX_PENDING_CLIENTS)) return; - - int running = 0, pending = 0; - pthread_mutex_lock(&mainThreadPendingClientsMutexes[t->id]); - pending = listLength(mainThreadPendingClients[t->id]); - listJoin(mainThreadPendingClients[t->id], t->pending_clients_to_main_thread); - pthread_mutex_unlock(&mainThreadPendingClientsMutexes[t->id]); - if (!pending) atomicGetWithSync(server.running, running); - - /* Only notify main thread if it is not running and no pending clients to process, - * to avoid unnecessary notify/wakeup. If the main thread is running, it will - * process the clients in beforeSleep. If there are pending clients, we may - * already notify the main thread if needed. */ - if (!running && !pending) { - triggerEventNotifier(mainThreadPendingClientsNotifiers[t->id]); - } -} - -/* When moving a client from IO thread to main thread we may need to update - * some of its variables as they are duplicated to avoid contention with main - * thread. - * For now this is valid only for master or slave clients. */ -void updateClientDataFromIOThread(client *c) { - if (!(c->flags & CLIENT_MASTER) && !(c->flags & CLIENT_SLAVE)) return; - - serverAssert(c->tid != IOTHREAD_MAIN_THREAD_ID && - c->running_tid == IOTHREAD_MAIN_THREAD_ID); - - if (c->io_repl_ack_time > c->repl_ack_time) { - serverAssert(c->flags & CLIENT_SLAVE); - c->repl_ack_time = c->io_repl_ack_time; - } - if (c->io_lastinteraction > c->lastinteraction) { - serverAssert(c->flags & CLIENT_MASTER); - c->lastinteraction = c->io_lastinteraction; - } - if (c->io_read_reploff > c->read_reploff) { - serverAssert(c->flags & CLIENT_MASTER); - c->read_reploff = c->io_read_reploff; - } - - /* Update replication buffer referenced node if IO thread has sent some data. 
*/ - if (c->flags & CLIENT_SLAVE && c->ref_repl_buf_node != NULL && - (c->io_curr_repl_node != c->ref_repl_buf_node || - c->io_curr_block_pos != c->ref_block_pos)) - { - ((replBufBlock*)listNodeValue(c->ref_repl_buf_node))->refcount--; - ((replBufBlock*)listNodeValue(c->io_curr_repl_node))->refcount++; - c->ref_block_pos = c->io_curr_block_pos; - c->ref_repl_buf_node = c->io_curr_repl_node; - incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL); - } -} - -/* Check to see if the client needs any cron jobs run for them. Return 1 if the - * client should be terminated */ -int runClientCronFromIOThread(client *c) { - if (c->flags & CLIENT_MASTER && - c->io_last_repl_cron + 1000 <= server.mstime) - { - c->io_last_repl_cron = server.mstime; - if (replicationCronRunMasterClient()) return 1; - } - - /* Run client cron task for the client per second or it is marked as pending cron. */ - if (c->io_last_client_cron + 1000 <= server.mstime || - c->io_flags & CLIENT_IO_PENDING_CRON) - { - c->io_last_client_cron = server.mstime; - if (clientsCronRunClient(c)) return 1; - } else { - /* Update the client in the mem usage if clientsCronRunClient is not - * being called, since that function already performs the update. */ - updateClientMemUsageAndBucket(c); - } - - return 0; -} - -/* When IO threads read a complete query of clients or want to free clients, it - * should remove it from its clients list and put the client in the list to main - * thread, we will send these clients to main thread in IOThreadBeforeSleep. */ -void enqueuePendingClientsToMainThread(client *c, int unbind) { - /* If the IO thread may no longer manage it, such as closing client, we should - * unbind client from event loop, so main thread doesn't need to do it costly. */ - if (unbind) connUnbindEventLoop(c->conn); - /* Just skip if it already is transferred. 
*/ - if (c->io_thread_client_list_node) { - IOThread *t = &IOThreads[c->tid]; - /* If there are several clients to process, let the main thread handle them ASAP. - * Since the client being added to the queue may still need to be processed by - * the IO thread, we must call this before adding it to the queue to avoid - * races with the main thread. */ - sendPendingClientsToMainThreadIfNeeded(t, 1); - /* Disable read and write to avoid race when main thread processes. */ - c->io_flags &= ~(CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED); - /* Remove the client from IO thread, add it to main thread's pending list. */ - listUnlinkNode(t->clients, c->io_thread_client_list_node); - listLinkNodeTail(t->pending_clients_to_main_thread, c->io_thread_client_list_node); - c->io_thread_client_list_node = NULL; - } -} - -void enqueuePendingClienstToIOThreads(client *c) { - serverAssert(c->tid != IOTHREAD_MAIN_THREAD_ID && - c->running_tid == IOTHREAD_MAIN_THREAD_ID); - - if (c->flags & CLIENT_PENDING_WRITE) { - c->flags &= ~CLIENT_PENDING_WRITE; - listUnlinkNode(server.clients_pending_write, &c->clients_pending_write_node); - } - if (c->flags & CLIENT_SLAVE) { - serverAssert(c->ref_repl_buf_node != NULL); - - c->io_repl_ack_time = c->repl_ack_time; - c->io_curr_repl_node = c->ref_repl_buf_node; - c->io_curr_block_pos = c->ref_block_pos; - c->io_bound_repl_node = listLast(server.repl_buffer_blocks); - c->io_bound_block_pos = ((replBufBlock*)listNodeValue(c->io_bound_repl_node))->used; - } - if (c->flags & CLIENT_MASTER) { - c->io_read_reploff = c->read_reploff; - c->io_lastinteraction = c->lastinteraction; - } - - c->running_tid = c->tid; - listAddNodeHead(mainThreadPendingClientsToIOThreads[c->tid], c); -} - -/* Unbind connection of client from io thread event loop, write and read handlers - * also be removed, ensures that we can operate the client safely. 
*/ -void unbindClientFromIOThreadEventLoop(client *c) { - serverAssert(c->tid != IOTHREAD_MAIN_THREAD_ID && - c->running_tid == IOTHREAD_MAIN_THREAD_ID); - if (!connHasEventLoop(c->conn)) return; - /* As calling in main thread, we should pause the io thread to make it safe. */ - pauseIOThread(c->tid); - connUnbindEventLoop(c->conn); - resumeIOThread(c->tid); -} - -/* When main thread is processing a client from IO thread, and wants to keep it, - * we should unbind connection of client from io thread event loop first, - * and then bind the client connection into server's event loop. */ -void keepClientInMainThread(client *c) { - if (c->tid == IOTHREAD_MAIN_THREAD_ID) return; - serverAssert(c->running_tid == IOTHREAD_MAIN_THREAD_ID); - /* IO thread no longer manage it. */ - server.io_threads_clients_num[c->tid]--; - /* Unbind connection of client from io thread event loop. */ - unbindClientFromIOThreadEventLoop(c); - /* Update the client's data in case it was just fetched from IO thread */ - updateClientDataFromIOThread(c); - /* Let main thread to run it, rebind event loop and read handler */ - connRebindEventLoop(c->conn, server.el); - connSetReadHandler(c->conn, readQueryFromClient); - c->io_flags |= CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED; - c->tid = IOTHREAD_MAIN_THREAD_ID; - freeClientDeferredObjects(c, 1); /* Free deferred objects. */ - freeClientIODeferredObjects(c, 1); /* Free IO deferred objects. */ - tryUnlinkClientFromPendingRefReply(c, 0); - /* Main thread starts to manage it. */ - server.io_threads_clients_num[c->tid]++; -} - -/* If the client is managed by IO thread, we should fetch it from IO thread - * and then main thread will can process it. Just like IO Thread transfers - * the client to the main thread for processing. 
*/ -void fetchClientFromIOThread(client *c) { - serverAssert(c->tid != IOTHREAD_MAIN_THREAD_ID && - c->running_tid != IOTHREAD_MAIN_THREAD_ID); - pauseIOThread(c->tid); - /* Remove the client from clients list of IO thread or main thread. */ - if (c->io_thread_client_list_node) { - listDelNode(IOThreads[c->tid].clients, c->io_thread_client_list_node); - c->io_thread_client_list_node = NULL; - } else { - list *clients[5] = { - IOThreads[c->tid].pending_clients, - IOThreads[c->tid].pending_clients_to_main_thread, - mainThreadPendingClients[c->tid], - mainThreadProcessingClients[c->tid], - mainThreadPendingClientsToIOThreads[c->tid] - }; - for (int i = 0; i < 5; i++) { - listNode *ln = listSearchKey(clients[i], c); - if (ln) { - listDelNode(clients[i], ln); - /* Client only can be in one client list. */ - break; - } - } - } - /* Unbind connection of client from io thread event loop. */ - connUnbindEventLoop(c->conn); - /* Now main thread can process it. */ - resumeIOThread(c->tid); - - /* Keep the client in main thread. */ - c->running_tid = IOTHREAD_MAIN_THREAD_ID; - keepClientInMainThread(c); -} - -/* For some clients, we must handle them in the main thread, since there is - * data race to be processed in IO threads. - * - * - Close ASAP, we must free the client in main thread. - * - Pubsub, monitor, blocked, tracking clients, main thread may - * directly write them a reply when conditions are met. - * - Script command with debug may operate connection directly. - * - Master/Replica are only handled by IO thread when RDB replication is - * completed. Note we need to check them after checking for other flags - * that may overlap with CLIENT_MASTER/SLAVE - CLOSE_ASAP, MONITOR, - * (UN)BLOCKED, TRACKING. 
*/ -int isClientMustHandledByMainThread(client *c) { - if (c->flags & (CLIENT_CLOSE_ASAP | - CLIENT_PUBSUB | CLIENT_MONITOR | CLIENT_BLOCKED | - CLIENT_UNBLOCKED | CLIENT_TRACKING | CLIENT_LUA_DEBUG | - CLIENT_LUA_DEBUG_SYNC | CLIENT_ASM_MIGRATING | - CLIENT_ASM_IMPORTING)) - { - return 1; - } - - /* If RDB replication is done it's safe to move the master client to an IO thread. - * Note that we keep the master client in main thread during failover so as - * not to slow down the failover process by waiting the master replication - * cron in IO thread. */ - if (c->flags & CLIENT_MASTER && - server.repl_state == REPL_STATE_CONNECTED && - server.repl_rdb_ch_state == REPL_RDB_CH_STATE_NONE && - server.failover_state == NO_FAILOVER) - { - return 0; - } - - /* If RDB replication is done for this slave it's safe to move it to an IO thread - * Note that we also check if the ref_repl_buf_node is initialized in order - * to prevent race conditions with main thread when it feeds the replication - * buffer. */ - if (c->flags & CLIENT_SLAVE && - (c->replstate == SLAVE_STATE_ONLINE || - c->replstate == SLAVE_STATE_SEND_BULK_AND_STREAM) && - c->repl_start_cmd_stream_on_ack == 0 && - c->ref_repl_buf_node != NULL) - { - return 0; - } - - if (c->flags & (CLIENT_MASTER | CLIENT_SLAVE)) return 1; - - return 0; -} - -/* When the main thread accepts a new client or transfers clients to IO threads, - * it assigns the client to the IO thread with the fewest clients. */ -void assignClientToIOThread(client *c) { - serverAssert(c->tid == IOTHREAD_MAIN_THREAD_ID); - /* Find the IO thread with the fewest clients. */ - int min_id = 0; - int min = INT_MAX; - for (int i = 1; i < server.io_threads_num; i++) { - if (server.io_threads_clients_num[i] < min) { - min = server.io_threads_clients_num[i]; - min_id = i; - } - } - - /* Assign the client to the IO thread. 
*/ - server.io_threads_clients_num[c->tid]--; - c->tid = min_id; - server.io_threads_clients_num[min_id]++; - - /* The client running in IO thread needs to have deferred objects array. */ - c->deferred_objects = zmalloc(sizeof(deferredObject) * CLIENT_MAX_DEFERRED_OBJECTS); - - /* Unbind connection of client from main thread event loop, disable read and - * write, and then put it in the list, main thread will send these clients - * to IO thread in beforeSleep. */ - connUnbindEventLoop(c->conn); - c->io_flags &= ~(CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED); - - enqueuePendingClienstToIOThreads(c); -} - -/* If updating maxclients config, we not only resize the event loop of main thread - * but also resize the event loop of all io threads, and if one thread is failed, - * it is failed totally, since a fd can be distributed into any IO thread. */ -int resizeAllIOThreadsEventLoops(size_t newsize) { - int result = AE_OK; - if (server.io_threads_num <= 1) return result; - - /* To make context safe. */ - pauseAllIOThreads(); - for (int i = 1; i < server.io_threads_num; i++) { - IOThread *t = &IOThreads[i]; - if (aeResizeSetSize(t->el, newsize) == AE_ERR) - result = AE_ERR; - } - resumeAllIOThreads(); - return result; -} - -/* In the main thread, we may want to operate data of io threads, maybe uninstall - * event handler, access query/output buffer or resize event loop, we need a clean - * and safe context to do that. We pause io thread in IOThreadBeforeSleep, do some - * jobs and then resume it. To avoid thread suspended, we use busy waiting to confirm - * the target status. Besides we use atomic variable to make sure memory visibility - * and ordering. - * - * Make sure that only the main thread can call these function, - * - pauseIOThread, resumeIOThread - * - pauseAllIOThreads, resumeAllIOThreads - * - pauseIOThreadsRange, resumeIOThreadsRange - * - * The main thread will pause the io thread, and then wait for the io thread to - * be paused. 
The io thread will check the paused status in IOThreadBeforeSleep, - * and then pause itself. - * - * The main thread will resume the io thread, and then wait for the io thread to - * be resumed. The io thread will check the paused status in IOThreadBeforeSleep, - * and then resume itself. - */ - -/* We may pause the same io thread nestedly, so we need to record the times of - * pausing, and only when the times of pausing is 0, we can pause the io thread, - * and only when the times of pausing is 1, we can resume the io thread. */ -static int PausedIOThreads[IO_THREADS_MAX_NUM] = {0}; - -/* Pause the specific range of io threads, and wait for them to be paused. */ -void pauseIOThreadsRange(int start, int end) { - if (!server.io_threads_active) return; - serverAssert(start >= 1 && end < server.io_threads_num && start <= end); - serverAssert(pthread_equal(pthread_self(), server.main_thread_id)); - - /* Try to make all io threads paused in parallel */ - for (int i = start; i <= end; i++) { - PausedIOThreads[i]++; - /* Skip if already paused */ - if (PausedIOThreads[i] > 1) continue; - - int paused; - atomicGetWithSync(IOThreads[i].paused, paused); - /* Don't support to call reentrant */ - serverAssert(paused == IO_THREAD_UNPAUSED); - atomicSetWithSync(IOThreads[i].paused, IO_THREAD_PAUSING); - /* Just notify io thread, no actual job, since io threads check paused - * status in IOThreadBeforeSleep, so just wake it up if polling wait. */ - triggerEventNotifier(IOThreads[i].pending_clients_notifier); - } - - /* Wait for all io threads paused */ - for (int i = start; i <= end; i++) { - if (PausedIOThreads[i] > 1) continue; - int paused = IO_THREAD_PAUSING; - while (paused != IO_THREAD_PAUSED) { - atomicGetWithSync(IOThreads[i].paused, paused); - } - } -} - -/* Resume the specific range of io threads, and wait for them to be resumed. 
*/ -void resumeIOThreadsRange(int start, int end) { - if (!server.io_threads_active) return; - serverAssert(start >= 1 && end < server.io_threads_num && start <= end); - serverAssert(pthread_equal(pthread_self(), server.main_thread_id)); - - for (int i = start; i <= end; i++) { - serverAssert(PausedIOThreads[i] > 0); - PausedIOThreads[i]--; - if (PausedIOThreads[i] > 0) continue; - - int paused; - /* Check if it is paused, since we must call 'pause' and - * 'resume' in pairs */ - atomicGetWithSync(IOThreads[i].paused, paused); - serverAssert(paused == IO_THREAD_PAUSED); - /* Resume */ - atomicSetWithSync(IOThreads[i].paused, IO_THREAD_RESUMING); - while (paused != IO_THREAD_UNPAUSED) { - atomicGetWithSync(IOThreads[i].paused, paused); - } - } -} - -/* The IO thread checks whether it is being paused, and if so, it pauses itself - * and waits for resuming, corresponding to the pause/resumeIOThread* functions. - * Currently, this is only called in IOThreadBeforeSleep, as there are no pending - * I/O events at this point, with a clean context. */ -void handlePauseAndResume(IOThread *t) { - int paused; - /* Check if i am being paused. */ - atomicGetWithSync(t->paused, paused); - if (paused == IO_THREAD_PAUSING) { - atomicSetWithSync(t->paused, IO_THREAD_PAUSED); - /* Wait for resuming */ - while (paused != IO_THREAD_RESUMING) { - atomicGetWithSync(t->paused, paused); - } - atomicSetWithSync(t->paused, IO_THREAD_UNPAUSED); - } -} - -/* Pause the specific io thread, and wait for it to be paused. */ -void pauseIOThread(int id) { - pauseIOThreadsRange(id, id); -} - -/* Resume the specific io thread, and wait for it to be resumed. */ -void resumeIOThread(int id) { - resumeIOThreadsRange(id, id); -} - -/* Pause all io threads, and wait for them to be paused. */ -void pauseAllIOThreads(void) { - pauseIOThreadsRange(1, server.io_threads_num-1); -} - -/* Resume all io threads, and wait for them to be resumed. 
*/ -void resumeAllIOThreads(void) { - resumeIOThreadsRange(1, server.io_threads_num-1); -} - -/* Add the pending clients to the list of IO threads, and trigger an event to - * notify io threads to handle. */ -int sendPendingClientsToIOThreads(void) { - int processed = 0; - for (int i = 1; i < server.io_threads_num; i++) { - int len = listLength(mainThreadPendingClientsToIOThreads[i]); - if (len > 0) { - IOThread *t = &IOThreads[i]; - pthread_mutex_lock(&t->pending_clients_mutex); - listJoin(t->pending_clients, mainThreadPendingClientsToIOThreads[i]); - pthread_mutex_unlock(&t->pending_clients_mutex); - /* Trigger an event, maybe an error is returned when buffer is full - * if using pipe, but no worry, io thread will handle all clients - * in list when receiving a notification. */ - triggerEventNotifier(t->pending_clients_notifier); - } - processed += len; - } - return processed; -} - -/* Prefetch the commands from the IO thread. The return value is the number - * of clients that have been prefetched. */ -int prefetchIOThreadCommands(IOThread *t) { - int len = listLength(mainThreadProcessingClients[t->id]); - int to_prefetch = determinePrefetchCount(len); - if (to_prefetch == 0) return 0; - - /* Two-phase approach to optimize cache utilization: - * Phase 1: Issue prefetch hints for client structures - * Phase 2: Access the now-cached client data and add commands to batch */ - /* Since we double the configured size for better performance, - * see also `determinePrefetchCount` */ - static client *c[PREFETCH_BATCH_MAX_SIZE*2]; - serverAssert(PREFETCH_BATCH_MAX_SIZE*2 >= to_prefetch ); - int clients = 0; - listIter li; - listNode *ln; - listRewind(mainThreadProcessingClients[t->id], &li); - /* Phase 1: Issue prefetch instructions for client struct and pending_cmds. - * These prefetches will bring data into cache asynchronously. 
*/ - for (int i = 0; i < to_prefetch && (ln = listNext(&li)); i++) { - c[i] = listNodeValue(ln); - redis_prefetch_read(c[i]); - redis_prefetch_read(&c[i]->pending_cmds); - } - /* Phase 2: Access client data (now likely in cache) and add to batch. - * Also prefetch additional fields (reply, mem_usage_bucket) that will be - * needed later during command execution. */ - for (int i = 0; i < to_prefetch; i++) { - if (addCommandToBatch(c[i]) == C_ERR) break; - if (c[i]->reply) redis_prefetch_read(c[i]->reply); - redis_prefetch_read(&c[i]->mem_usage_bucket); - clients++; - } - /* Prefetch the commands in the batch. */ - prefetchCommands(); - return clients; -} - -extern int ProcessingEventsWhileBlocked; - -/* Send the pending clients to the IO thread if the number of pending clients - * is greater than IO_THREAD_MAX_PENDING_CLIENTS, or if size_check is 0. */ -static inline void sendPendingClientsToIOThreadIfNeeded(IOThread *t, int size_check) { - size_t len = listLength(mainThreadPendingClientsToIOThreads[t->id]); - if (len == 0 || (size_check && len < IO_THREAD_MAX_PENDING_CLIENTS)) return; - - /* If AOF fsync policy is always, we should not let io thread handle these - * clients now since we don't flush AOF buffer to file and sync yet. - * So these clients will be delayed to send io threads in beforeSleep after - * flushAppendOnlyFile. - * - * If we are in processEventsWhileBlocked, we don't send clients to io threads - * now, we want to update server.events_processed_while_blocked accurately. 
*/ - if (server.aof_fsync != AOF_FSYNC_ALWAYS && !ProcessingEventsWhileBlocked) { - int running = 0, pending = 0; - pthread_mutex_lock(&(t->pending_clients_mutex)); - pending = listLength(t->pending_clients); - listJoin(t->pending_clients, mainThreadPendingClientsToIOThreads[t->id]); - pthread_mutex_unlock(&(t->pending_clients_mutex)); - if (!pending) atomicGetWithSync(t->running, running); - - /* Only notify io thread if it is not running and no pending clients to - * process, to avoid unnecessary notify/wakeup. If the io thread is running, - * it will process the clients in beforeSleep. If there are pending clients, - * we may already notify the io thread if needed. */ - if(!running && !pending) triggerEventNotifier(t->pending_clients_notifier); - } -} - -/* The main thread processes the clients from IO threads, these clients may have - * a complete command to execute or need to be freed. Note that IO threads never - * free client since this operation access much server data. - * - * Please notice that this function may be called reentrantly, i,e, the same goes - * for handleClientsFromIOThread and processClientsOfAllIOThreads. For example, - * when processing script command, it may call processEventsWhileBlocked to - * process new events, if the clients with fired events from the same io thread, - * it may call this function reentrantly. */ -int processClientsFromIOThread(IOThread *t) { - /* Get the list of clients to process. 
*/ - pthread_mutex_lock(&mainThreadPendingClientsMutexes[t->id]); - listJoin(mainThreadProcessingClients[t->id], mainThreadPendingClients[t->id]); - pthread_mutex_unlock(&mainThreadPendingClientsMutexes[t->id]); - size_t processed = listLength(mainThreadProcessingClients[t->id]); - if (processed == 0) return 0; - - int prefetch_clients = 0; - /* We may call processClientsFromIOThread reentrantly, so we need to - * reset the prefetching batch, besides, users may change the config - * of prefetch batch size, so we need to reset the prefetching batch. */ - resetCommandsBatch(); - - listNode *node = NULL; - while (listLength(mainThreadProcessingClients[t->id])) { - if (prefetch_clients <= 0) { - /* Reset the prefetching batch if we have processed all clients. */ - resetCommandsBatch(); - /* Prefetch the commands if no clients in the batch. */ - prefetch_clients = prefetchIOThreadCommands(t); - } - prefetch_clients--; - - /* Each time we pop up only the first client to process to guarantee - * reentrancy safety. */ - if (node) zfree(node); - node = listFirst(mainThreadProcessingClients[t->id]); - listUnlinkNode(mainThreadProcessingClients[t->id], node); - client *c = listNodeValue(node); - - /* Make sure the client is neither readable nor writable in io thread to - * avoid data race. */ - serverAssert(!(c->io_flags & (CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED))); - serverAssert(!(c->flags & CLIENT_CLOSE_ASAP)); - - /* Let main thread to run it, set running thread id first. */ - c->running_tid = IOTHREAD_MAIN_THREAD_ID; - - /* Free objects queued by IO thread for deferred freeing. */ - freeClientIODeferredObjects(c, 0); - tryUnlinkClientFromPendingRefReply(c, 0); - - /* If a read error occurs, handle it in the main thread first, since we - * want to print logs about client information before freeing. */ - if (isClientReadErrorFatal(c)) handleClientReadError(c); - - /* The client is asked to close in IO thread. 
*/ - if (c->io_flags & CLIENT_IO_CLOSE_ASAP) { - freeClient(c); - continue; - } - - /* Update some client's members while we are in main thread so we avoid - * data races. */ - updateClientDataFromIOThread(c); - - /* Check if we need to run a cron job for the client */ - if (runClientCronFromIOThread(c)) continue; - - /* Process the pending command and input buffer. */ - if (!isClientReadErrorFatal(c) && c->io_flags & CLIENT_IO_PENDING_COMMAND) { - c->flags |= CLIENT_PENDING_COMMAND; - if (processPendingCommandAndInputBuffer(c) == C_ERR) { - /* If the client is no longer valid, it must be freed safely. */ - continue; - } - } - - /* We may have pending replies if io thread may not finish writing - * reply to client, so we did not put the client in pending write - * queue. And we should do that first since we may keep the client - * in main thread instead of returning to io threads. */ - if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c)) - putClientInPendingWriteQueue(c); - - /* The client only can be processed in the main thread, otherwise data - * race will happen, since we may touch client's data in main thread. */ - if (isClientMustHandledByMainThread(c)) { - keepClientInMainThread(c); - continue; - } - - /* Handle replica clients in putReplicasInPendingClientsToIOThreads in - * beforeSleep */ - if (c->flags & CLIENT_SLAVE) continue; - - /* Remove this client from pending write clients queue of main thread, - * And some clients may do not have reply if CLIENT REPLY OFF/SKIP. */ - if (c->flags & CLIENT_PENDING_WRITE) { - c->flags &= ~CLIENT_PENDING_WRITE; - listUnlinkNode(server.clients_pending_write, &c->clients_pending_write_node); - } - c->running_tid = c->tid; - listLinkNodeHead(mainThreadPendingClientsToIOThreads[c->tid], node); - node = NULL; - - /* If there are several clients to process, let io thread handle them ASAP. 
*/ - sendPendingClientsToIOThreadIfNeeded(t, 1); - } - if (node) zfree(node); - - /* Send the clients to io thread without pending size check, since main thread - * may process clients from other io threads, so we need to send them to the - * io thread to process in prallel. */ - sendPendingClientsToIOThreadIfNeeded(t, 0); - - return processed; -} - -/* When the io thread finishes processing the client with the read event, it will - * notify the main thread through event triggering in IOThreadBeforeSleep. The main - * thread handles the event through this function. */ -void handleClientsFromIOThread(struct aeEventLoop *el, int fd, void *ptr, int mask) { - UNUSED(el); - UNUSED(mask); - - IOThread *t = ptr; - - /* Handle fd event first. */ - serverAssert(fd == getReadEventFd(mainThreadPendingClientsNotifiers[t->id])); - handleEventNotifier(mainThreadPendingClientsNotifiers[t->id]); - - /* Process the clients from IO threads. */ - processClientsFromIOThread(t); -} - -/* In the new threaded io design, one thread may process multiple clients, so when - * an io thread notifies the main thread of an event, there may be multiple clients - * with commands that need to be processed. But in the event handler function - * handleClientsFromIOThread may be blocked when processing the specific command, - * the previous clients can not get a reply, and the subsequent clients can not be - * processed, so we need to handle this scenario in beforeSleep. The function is to - * process the commands of subsequent clients from io threads. And another function - * sendPendingClientsToIOThreads make sure clients from io thread can get replies. - * See also beforeSleep. - * - * In beforeSleep, we also call this function to handle the clients that are - * transferred from io threads without notification. 
*/

/* Drain the clients handed back to the main thread by every spawned IO thread
 * (IO thread ids start at 1; id 0 is the main thread itself).
 * Returns the total number of clients processed. */
int processClientsOfAllIOThreads(void) {
    int processed = 0;
    for (int i = 1; i < server.io_threads_num; i++) {
        processed += processClientsFromIOThread(&IOThreads[i]);
    }
    return processed;
}

/* After the main thread processes the clients, it will send the clients back to
 * IO threads to handle, and fire an event; the IO thread handles that event with
 * this function (registered on the read end of the thread's event notifier). */
void handleClientsFromMainThread(struct aeEventLoop *ae, int fd, void *ptr, int mask) {
    UNUSED(ae);
    UNUSED(mask);

    IOThread *t = ptr;

    /* Handle the fd event first: consume the notification so the event does
     * not fire again for data already observed. */
    serverAssert(fd == getReadEventFd(t->pending_clients_notifier));
    handleEventNotifier(t->pending_clients_notifier);

    /* Process the clients delivered from the main thread. */
    processClientsFromMainThread(t);
}

/* Process clients that have finished executing commands in the main thread.
 * If the client is not yet bound to this thread's event loop, bind it first and
 * install the read handler. If the client has pending replies, write them now,
 * and install the write handler only if output remains.
 * Returns the number of clients taken from the pending list (0 if none). */
int processClientsFromMainThread(IOThread *t) {
    /* Move the whole pending list into the processing list under the mutex,
     * so the critical section stays short (the main thread appends to
     * t->pending_clients under the same lock). */
    pthread_mutex_lock(&t->pending_clients_mutex);
    listJoin(t->processing_clients, t->pending_clients);
    pthread_mutex_unlock(&t->pending_clients_mutex);
    size_t processed = listLength(t->processing_clients);
    if (processed == 0) return 0;

    listIter li;
    listNode *ln;
    listRewind(t->processing_clients, &li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        /* Read/write must have been disabled while the client was owned by
         * the main thread. */
        serverAssert(!(c->io_flags & (CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED)));
        /* Main thread must handle clients with CLIENT_CLOSE_ASAP flag, since
         * we only set io_flags when clients in io thread are freed ASAP. */
        serverAssert(!(c->flags & CLIENT_CLOSE_ASAP));

        /* Link the client into this IO thread's clients list first (the node
         * is reused: unlinked from processing_clients, relinked to clients,
         * avoiding an allocation). */
        serverAssert(c->io_thread_client_list_node == NULL);
        listUnlinkNode(t->processing_clients, ln);
        listLinkNodeTail(t->clients, ln);
        c->io_thread_client_list_node = listLast(t->clients);

        /* The client now is in the IO thread, let's free deferred objects. */
        freeClientDeferredObjects(c, 0);

        /* The client is asked to close; just hand it back so the main thread
         * frees it. */
        if (c->io_flags & CLIENT_IO_CLOSE_ASAP) {
            enqueuePendingClientsToMainThread(c, 1);
            continue;
        }

        /* Re-enable read and write and clear the per-delivery flags. */
        c->io_flags |= CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED;
        c->io_flags &= ~(CLIENT_IO_PENDING_COMMAND | CLIENT_IO_PENDING_CRON);

        /* Only bind once: the read handler is never removed until the client
         * is freed. */
        if (!connHasEventLoop(c->conn)) {
            connRebindEventLoop(c->conn, t->el);
            serverAssert(!connHasReadHandler(c->conn));
            connSetReadHandler(c->conn, readQueryFromClient);
        }

        /* If the client has pending replies, write replies to the client now;
         * install the write handler only if output is still left over (and the
         * write itself didn't mark the client for closing). */
        if (clientHasPendingReplies(c)) {
            writeToClient(c, 0);
            if (!(c->io_flags & CLIENT_IO_CLOSE_ASAP) && clientHasPendingReplies(c)) {
                connSetWriteHandler(c->conn, sendReplyToClient);
            }
        }
    }
    /* All clients must have been processed. */
    serverAssert(listLength(t->processing_clients) == 0);
    return processed;
}

/* Before-sleep hook of the IO thread's event loop: flush pending work in both
 * directions and decide whether the loop may block in poll(). */
void IOThreadBeforeSleep(struct aeEventLoop *el) {
    IOThread *t = el->privdata[0];

    /* Handle pending data (typically TLS). */
    connTypeProcessPendingData(el);

    /* If any connection type (typically TLS) still has pending unread data,
     * don't sleep at all. */
    int dont_sleep = connTypeHasPendingData(el);

    /* Process clients from the main thread, since the main thread may deliver
     * clients without notification while this IO thread is processing events. */
    if (processClientsFromMainThread(t) > 0) {
        /* If some clients were processed, don't sleep: the main thread may
         * want to keep delivering clients without notification, so this IO
         * thread can process them ASAP and the main thread avoids the costly
         * notification (write fd + wake up). */
        dont_sleep = 1;
    }
    if (!dont_sleep) {
        atomicSetWithSync(t->running, 0); /* Not running if going to sleep. */
        /* Try to process clients from the main thread again: between the last
         * drain and setting running to 0, the main thread may have delivered
         * clients to this IO thread (ordering here closes that race). */
        processClientsFromMainThread(t);
    }
    aeSetDontWait(t->el, dont_sleep);

    /* Check if I am being paused; if so, pause myself and resume later. */
    handlePauseAndResume(t);

    /* Send clients to the main thread to process. We don't check the batch
     * size here since we want to flush all clients to the main thread before
     * going to sleep. */
    sendPendingClientsToMainThreadIfNeeded(t, 0);
}

/* After-sleep hook of the IO thread's event loop. */
void IOThreadAfterSleep(struct aeEventLoop *el) {
    IOThread *t = el->privdata[0];

    /* Set the IO thread to running state, so the main thread can deliver
     * clients to it without extra notifications. */
    atomicSetWithSync(t->running, 1);
}

/* Periodically transfer part of the clients to the main thread for processing
 * (the main thread runs the actual per-client cron work). */
void IOThreadClientsCron(IOThread *t) {
    /* Process at least a few clients while we are at it, even if we need
     * to process less than CLIENTS_CRON_MIN_ITERATIONS to meet our contract
     * of processing each client once per second. */
    int iterations = listLength(t->clients) / CONFIG_DEFAULT_HZ;
    if (iterations < CLIENTS_CRON_MIN_ITERATIONS) {
        iterations = CLIENTS_CRON_MIN_ITERATIONS;
    }

    listIter li;
    listNode *ln;
    listRewind(t->clients, &li);
    while ((ln = listNext(&li)) && iterations--) {
        client *c = listNodeValue(ln);
        /* Mark the client as pending cron; the main thread will process it. */
        c->io_flags |= CLIENT_IO_PENDING_CRON;
        enqueuePendingClientsToMainThread(c, 0);
    }
}

/* This is the IO thread timer interrupt, CONFIG_DEFAULT_HZ times per second.
 * The current responsibility is to detect clients that have been stuck in the
 * IO thread for too long and hand them over to the main thread for handling.
 * Returns the next firing period in milliseconds, per the ae timer contract. */
int IOThreadCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    UNUSED(eventLoop);
    UNUSED(id);
    IOThread *t = clientData;

    /* Run cron tasks for the clients in the IO thread. */
    IOThreadClientsCron(t);

    return 1000/CONFIG_DEFAULT_HZ;
}

/* The main function of the IO thread: it runs an event loop. The main thread
 * and IO thread communicate through event notifiers. */
void *IOThreadMain(void *ptr) {
    IOThread *t = ptr;
    char thdname[16];
    snprintf(thdname, sizeof(thdname), "io_thd_%d", t->id);
    redis_set_thread_title(thdname);
    redisSetCpuAffinity(server.server_cpulist);
    makeThreadKillable();
    aeSetBeforeSleepProc(t->el, IOThreadBeforeSleep);
    aeSetAfterSleepProc(t->el, IOThreadAfterSleep);
    aeMain(t->el);
    return NULL;
}

/* Initialize the data structures needed for threaded I/O and spawn the IO
 * threads. Exits the process on any unrecoverable setup failure. */
void initThreadedIO(void) {
    /* io_threads_num counts the main thread too; <= 1 means threaded IO is
     * disabled. */
    if (server.io_threads_num <= 1) return;

    server.io_threads_active = 1;

    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                  "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }

    /* Spawn and initialize the I/O threads. */
    for (int i = 1; i < server.io_threads_num; i++) {
        IOThread *t = &IOThreads[i];
        t->id = i;
        t->el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
        t->el->privdata[0] = t;
        t->pending_clients = listCreate();
        t->processing_clients = listCreate();
        t->pending_clients_to_main_thread = listCreate();
        t->clients = listCreate();
        atomicSetWithSync(t->paused, IO_THREAD_UNPAUSED);
        atomicSetWithSync(t->running, 0);

        /* On glibc/Linux use an adaptive mutex (spins briefly before blocking)
         * since the critical sections guarded by these mutexes are short.
         * NOTE(review): attr is zfree'd below without pthread_mutexattr_destroy;
         * harmless on glibc but formally required by POSIX — confirm. */
        pthread_mutexattr_t *attr = NULL;
        #if defined(__linux__) && defined(__GLIBC__)
        attr = zmalloc(sizeof(pthread_mutexattr_t));
        pthread_mutexattr_init(attr);
        pthread_mutexattr_settype(attr, PTHREAD_MUTEX_ADAPTIVE_NP);
        #endif
        pthread_mutex_init(&t->pending_clients_mutex, attr);

        /* Notifier the main thread uses to wake this IO thread up when it
         * delivers clients. */
        t->pending_clients_notifier = createEventNotifier();
        if (aeCreateFileEvent(t->el, getReadEventFd(t->pending_clients_notifier),
                              AE_READABLE, handleClientsFromMainThread, t) != AE_OK)
        {
            serverLog(LL_WARNING, "Fatal: Can't register file event for IO thread notifications.");
            exit(1);
        }

        /* This is the timer callback of the IO thread, used to gradually handle
         * some background operations, such as clients cron. */
        if (aeCreateTimeEvent(t->el, 1, IOThreadCron, t, NULL) == AE_ERR) {
            serverLog(LL_WARNING, "Fatal: Can't create event loop timers in IO thread.");
            exit(1);
        }

        /* Create IO thread */
        if (pthread_create(&t->tid, NULL, IOThreadMain, (void*)t) != 0) {
            serverLog(LL_WARNING, "Fatal: Can't initialize IO thread.");
            exit(1);
        }

        /* For main thread: the mirror-side structures used to exchange clients
         * with this IO thread, registered on the main event loop. */
        mainThreadPendingClientsToIOThreads[i] = listCreate();
        mainThreadPendingClients[i] = listCreate();
        mainThreadProcessingClients[i] = listCreate();
        pthread_mutex_init(&mainThreadPendingClientsMutexes[i], attr);
        mainThreadPendingClientsNotifiers[i] = createEventNotifier();
        if (aeCreateFileEvent(server.el, getReadEventFd(mainThreadPendingClientsNotifiers[i]),
                              AE_READABLE, handleClientsFromIOThread, t) != AE_OK)
        {
            serverLog(LL_WARNING, "Fatal: Can't register file event for main thread notifications.");
            exit(1);
        }
        if (attr) zfree(attr);
    }
}

/* Kill the IO threads. TODO: release the applied resources (lists, event
 * loops, notifiers and mutexes created in initThreadedIO are not freed). */
void killIOThreads(void) {
    if (server.io_threads_num <= 1) return;

    int err, j;
    for (j = 1; j < server.io_threads_num; j++) {
        /* Never cancel/join ourselves. */
        if (IOThreads[j].tid == pthread_self()) continue;
        if (IOThreads[j].tid && pthread_cancel(IOThreads[j].tid) == 0) {
            if ((err = pthread_join(IOThreads[j].tid,NULL)) != 0) {
                serverLog(LL_WARNING,
                    "IO thread(tid:%lu) can not be joined: %s",
                        (unsigned long)IOThreads[j].tid, strerror(err));
            } else {
                serverLog(LL_WARNING,
                    "IO thread(tid:%lu) terminated",(unsigned long)IOThreads[j].tid);
            }
        }
    }
}
