summaryrefslogtreecommitdiff
path: root/examples/redis-unstable/src/iothread.c
diff options
context:
space:
mode:
Diffstat (limited to 'examples/redis-unstable/src/iothread.c')
-rw-r--r--examples/redis-unstable/src/iothread.c955
1 files changed, 0 insertions, 955 deletions
diff --git a/examples/redis-unstable/src/iothread.c b/examples/redis-unstable/src/iothread.c
deleted file mode 100644
index 54a9cae..0000000
--- a/examples/redis-unstable/src/iothread.c
+++ /dev/null
@@ -1,955 +0,0 @@
-/* iothread.c -- The threaded io implementation.
- *
- * Copyright (c) 2024-Present, Redis Ltd.
- * All rights reserved.
- *
- * Licensed under your choice of (a) the Redis Source Available License 2.0
- * (RSALv2); or (b) the Server Side Public License v1 (SSPLv1); or (c) the
- * GNU Affero General Public License v3 (AGPLv3).
- */
-
-#include "server.h"
-
-/* IO threads. */
-static IOThread IOThreads[IO_THREADS_MAX_NUM];
-
-/* For main thread */
-static list *mainThreadPendingClientsToIOThreads[IO_THREADS_MAX_NUM]; /* Clients to IO threads */
-static list *mainThreadProcessingClients[IO_THREADS_MAX_NUM]; /* Clients in processing */
-static list *mainThreadPendingClients[IO_THREADS_MAX_NUM]; /* Pending clients from IO threads */
-static pthread_mutex_t mainThreadPendingClientsMutexes[IO_THREADS_MAX_NUM]; /* Mutex for pending clients */
-static eventNotifier* mainThreadPendingClientsNotifiers[IO_THREADS_MAX_NUM]; /* Notifier for pending clients */
-
-/* Send the clients to the main thread for processing when the number of clients
- * in pending list reaches IO_THREAD_MAX_PENDING_CLIENTS, or check_size is 0. */
-static inline void sendPendingClientsToMainThreadIfNeeded(IOThread *t, int check_size) {
- size_t len = listLength(t->pending_clients_to_main_thread);
- if (len == 0 || (check_size && len < IO_THREAD_MAX_PENDING_CLIENTS)) return;
-
- int running = 0, pending = 0;
- pthread_mutex_lock(&mainThreadPendingClientsMutexes[t->id]);
- pending = listLength(mainThreadPendingClients[t->id]);
- listJoin(mainThreadPendingClients[t->id], t->pending_clients_to_main_thread);
- pthread_mutex_unlock(&mainThreadPendingClientsMutexes[t->id]);
- if (!pending) atomicGetWithSync(server.running, running);
-
- /* Only notify main thread if it is not running and no pending clients to process,
- * to avoid unnecessary notify/wakeup. If the main thread is running, it will
- * process the clients in beforeSleep. If there are pending clients, we may
- * already notify the main thread if needed. */
- if (!running && !pending) {
- triggerEventNotifier(mainThreadPendingClientsNotifiers[t->id]);
- }
-}
-
-/* When moving a client from IO thread to main thread we may need to update
- * some of its variables as they are duplicated to avoid contention with main
- * thread.
- * For now this is valid only for master or slave clients. */
-void updateClientDataFromIOThread(client *c) {
- if (!(c->flags & CLIENT_MASTER) && !(c->flags & CLIENT_SLAVE)) return;
-
- serverAssert(c->tid != IOTHREAD_MAIN_THREAD_ID &&
- c->running_tid == IOTHREAD_MAIN_THREAD_ID);
-
- if (c->io_repl_ack_time > c->repl_ack_time) {
- serverAssert(c->flags & CLIENT_SLAVE);
- c->repl_ack_time = c->io_repl_ack_time;
- }
- if (c->io_lastinteraction > c->lastinteraction) {
- serverAssert(c->flags & CLIENT_MASTER);
- c->lastinteraction = c->io_lastinteraction;
- }
- if (c->io_read_reploff > c->read_reploff) {
- serverAssert(c->flags & CLIENT_MASTER);
- c->read_reploff = c->io_read_reploff;
- }
-
- /* Update replication buffer referenced node if IO thread has sent some data. */
- if (c->flags & CLIENT_SLAVE && c->ref_repl_buf_node != NULL &&
- (c->io_curr_repl_node != c->ref_repl_buf_node ||
- c->io_curr_block_pos != c->ref_block_pos))
- {
- ((replBufBlock*)listNodeValue(c->ref_repl_buf_node))->refcount--;
- ((replBufBlock*)listNodeValue(c->io_curr_repl_node))->refcount++;
- c->ref_block_pos = c->io_curr_block_pos;
- c->ref_repl_buf_node = c->io_curr_repl_node;
- incrementalTrimReplicationBacklog(REPL_BACKLOG_TRIM_BLOCKS_PER_CALL);
- }
-}
-
-/* Check to see if the client needs any cron jobs run for them. Return 1 if the
- * client should be terminated */
-int runClientCronFromIOThread(client *c) {
- if (c->flags & CLIENT_MASTER &&
- c->io_last_repl_cron + 1000 <= server.mstime)
- {
- c->io_last_repl_cron = server.mstime;
- if (replicationCronRunMasterClient()) return 1;
- }
-
- /* Run client cron task for the client per second or it is marked as pending cron. */
- if (c->io_last_client_cron + 1000 <= server.mstime ||
- c->io_flags & CLIENT_IO_PENDING_CRON)
- {
- c->io_last_client_cron = server.mstime;
- if (clientsCronRunClient(c)) return 1;
- } else {
- /* Update the client in the mem usage if clientsCronRunClient is not
- * being called, since that function already performs the update. */
- updateClientMemUsageAndBucket(c);
- }
-
- return 0;
-}
-
-/* When IO threads read a complete query of clients or want to free clients, it
- * should remove it from its clients list and put the client in the list to main
- * thread, we will send these clients to main thread in IOThreadBeforeSleep. */
-void enqueuePendingClientsToMainThread(client *c, int unbind) {
- /* If the IO thread may no longer manage it, such as closing client, we should
- * unbind client from event loop, so main thread doesn't need to do it costly. */
- if (unbind) connUnbindEventLoop(c->conn);
- /* Just skip if it already is transferred. */
- if (c->io_thread_client_list_node) {
- IOThread *t = &IOThreads[c->tid];
- /* If there are several clients to process, let the main thread handle them ASAP.
- * Since the client being added to the queue may still need to be processed by
- * the IO thread, we must call this before adding it to the queue to avoid
- * races with the main thread. */
- sendPendingClientsToMainThreadIfNeeded(t, 1);
- /* Disable read and write to avoid race when main thread processes. */
- c->io_flags &= ~(CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED);
- /* Remove the client from IO thread, add it to main thread's pending list. */
- listUnlinkNode(t->clients, c->io_thread_client_list_node);
- listLinkNodeTail(t->pending_clients_to_main_thread, c->io_thread_client_list_node);
- c->io_thread_client_list_node = NULL;
- }
-}
-
-void enqueuePendingClienstToIOThreads(client *c) {
- serverAssert(c->tid != IOTHREAD_MAIN_THREAD_ID &&
- c->running_tid == IOTHREAD_MAIN_THREAD_ID);
-
- if (c->flags & CLIENT_PENDING_WRITE) {
- c->flags &= ~CLIENT_PENDING_WRITE;
- listUnlinkNode(server.clients_pending_write, &c->clients_pending_write_node);
- }
- if (c->flags & CLIENT_SLAVE) {
- serverAssert(c->ref_repl_buf_node != NULL);
-
- c->io_repl_ack_time = c->repl_ack_time;
- c->io_curr_repl_node = c->ref_repl_buf_node;
- c->io_curr_block_pos = c->ref_block_pos;
- c->io_bound_repl_node = listLast(server.repl_buffer_blocks);
- c->io_bound_block_pos = ((replBufBlock*)listNodeValue(c->io_bound_repl_node))->used;
- }
- if (c->flags & CLIENT_MASTER) {
- c->io_read_reploff = c->read_reploff;
- c->io_lastinteraction = c->lastinteraction;
- }
-
- c->running_tid = c->tid;
- listAddNodeHead(mainThreadPendingClientsToIOThreads[c->tid], c);
-}
-
-/* Unbind connection of client from io thread event loop, write and read handlers
- * also be removed, ensures that we can operate the client safely. */
-void unbindClientFromIOThreadEventLoop(client *c) {
- serverAssert(c->tid != IOTHREAD_MAIN_THREAD_ID &&
- c->running_tid == IOTHREAD_MAIN_THREAD_ID);
- if (!connHasEventLoop(c->conn)) return;
- /* As calling in main thread, we should pause the io thread to make it safe. */
- pauseIOThread(c->tid);
- connUnbindEventLoop(c->conn);
- resumeIOThread(c->tid);
-}
-
-/* When main thread is processing a client from IO thread, and wants to keep it,
- * we should unbind connection of client from io thread event loop first,
- * and then bind the client connection into server's event loop. */
-void keepClientInMainThread(client *c) {
- if (c->tid == IOTHREAD_MAIN_THREAD_ID) return;
- serverAssert(c->running_tid == IOTHREAD_MAIN_THREAD_ID);
- /* IO thread no longer manage it. */
- server.io_threads_clients_num[c->tid]--;
- /* Unbind connection of client from io thread event loop. */
- unbindClientFromIOThreadEventLoop(c);
- /* Update the client's data in case it was just fetched from IO thread */
- updateClientDataFromIOThread(c);
- /* Let main thread to run it, rebind event loop and read handler */
- connRebindEventLoop(c->conn, server.el);
- connSetReadHandler(c->conn, readQueryFromClient);
- c->io_flags |= CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED;
- c->tid = IOTHREAD_MAIN_THREAD_ID;
- freeClientDeferredObjects(c, 1); /* Free deferred objects. */
- freeClientIODeferredObjects(c, 1); /* Free IO deferred objects. */
- tryUnlinkClientFromPendingRefReply(c, 0);
- /* Main thread starts to manage it. */
- server.io_threads_clients_num[c->tid]++;
-}
-
-/* If the client is managed by IO thread, we should fetch it from IO thread
- * and then main thread will can process it. Just like IO Thread transfers
- * the client to the main thread for processing. */
-void fetchClientFromIOThread(client *c) {
- serverAssert(c->tid != IOTHREAD_MAIN_THREAD_ID &&
- c->running_tid != IOTHREAD_MAIN_THREAD_ID);
- pauseIOThread(c->tid);
- /* Remove the client from clients list of IO thread or main thread. */
- if (c->io_thread_client_list_node) {
- listDelNode(IOThreads[c->tid].clients, c->io_thread_client_list_node);
- c->io_thread_client_list_node = NULL;
- } else {
- list *clients[5] = {
- IOThreads[c->tid].pending_clients,
- IOThreads[c->tid].pending_clients_to_main_thread,
- mainThreadPendingClients[c->tid],
- mainThreadProcessingClients[c->tid],
- mainThreadPendingClientsToIOThreads[c->tid]
- };
- for (int i = 0; i < 5; i++) {
- listNode *ln = listSearchKey(clients[i], c);
- if (ln) {
- listDelNode(clients[i], ln);
- /* Client only can be in one client list. */
- break;
- }
- }
- }
- /* Unbind connection of client from io thread event loop. */
- connUnbindEventLoop(c->conn);
- /* Now main thread can process it. */
- resumeIOThread(c->tid);
-
- /* Keep the client in main thread. */
- c->running_tid = IOTHREAD_MAIN_THREAD_ID;
- keepClientInMainThread(c);
-}
-
-/* For some clients, we must handle them in the main thread, since there is
- * data race to be processed in IO threads.
- *
- * - Close ASAP, we must free the client in main thread.
- * - Pubsub, monitor, blocked, tracking clients, main thread may
- * directly write them a reply when conditions are met.
- * - Script command with debug may operate connection directly.
- * - Master/Replica are only handled by IO thread when RDB replication is
- * completed. Note we need to check them after checking for other flags
- * that may overlap with CLIENT_MASTER/SLAVE - CLOSE_ASAP, MONITOR,
- * (UN)BLOCKED, TRACKING. */
-int isClientMustHandledByMainThread(client *c) {
- if (c->flags & (CLIENT_CLOSE_ASAP |
- CLIENT_PUBSUB | CLIENT_MONITOR | CLIENT_BLOCKED |
- CLIENT_UNBLOCKED | CLIENT_TRACKING | CLIENT_LUA_DEBUG |
- CLIENT_LUA_DEBUG_SYNC | CLIENT_ASM_MIGRATING |
- CLIENT_ASM_IMPORTING))
- {
- return 1;
- }
-
- /* If RDB replication is done it's safe to move the master client to an IO thread.
- * Note that we keep the master client in main thread during failover so as
- * not to slow down the failover process by waiting the master replication
- * cron in IO thread. */
- if (c->flags & CLIENT_MASTER &&
- server.repl_state == REPL_STATE_CONNECTED &&
- server.repl_rdb_ch_state == REPL_RDB_CH_STATE_NONE &&
- server.failover_state == NO_FAILOVER)
- {
- return 0;
- }
-
- /* If RDB replication is done for this slave it's safe to move it to an IO thread
- * Note that we also check if the ref_repl_buf_node is initialized in order
- * to prevent race conditions with main thread when it feeds the replication
- * buffer. */
- if (c->flags & CLIENT_SLAVE &&
- (c->replstate == SLAVE_STATE_ONLINE ||
- c->replstate == SLAVE_STATE_SEND_BULK_AND_STREAM) &&
- c->repl_start_cmd_stream_on_ack == 0 &&
- c->ref_repl_buf_node != NULL)
- {
- return 0;
- }
-
- if (c->flags & (CLIENT_MASTER | CLIENT_SLAVE)) return 1;
-
- return 0;
-}
-
-/* When the main thread accepts a new client or transfers clients to IO threads,
- * it assigns the client to the IO thread with the fewest clients. */
-void assignClientToIOThread(client *c) {
- serverAssert(c->tid == IOTHREAD_MAIN_THREAD_ID);
- /* Find the IO thread with the fewest clients. */
- int min_id = 0;
- int min = INT_MAX;
- for (int i = 1; i < server.io_threads_num; i++) {
- if (server.io_threads_clients_num[i] < min) {
- min = server.io_threads_clients_num[i];
- min_id = i;
- }
- }
-
- /* Assign the client to the IO thread. */
- server.io_threads_clients_num[c->tid]--;
- c->tid = min_id;
- server.io_threads_clients_num[min_id]++;
-
- /* The client running in IO thread needs to have deferred objects array. */
- c->deferred_objects = zmalloc(sizeof(deferredObject) * CLIENT_MAX_DEFERRED_OBJECTS);
-
- /* Unbind connection of client from main thread event loop, disable read and
- * write, and then put it in the list, main thread will send these clients
- * to IO thread in beforeSleep. */
- connUnbindEventLoop(c->conn);
- c->io_flags &= ~(CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED);
-
- enqueuePendingClienstToIOThreads(c);
-}
-
-/* If updating maxclients config, we not only resize the event loop of main thread
- * but also resize the event loop of all io threads, and if one thread is failed,
- * it is failed totally, since a fd can be distributed into any IO thread. */
-int resizeAllIOThreadsEventLoops(size_t newsize) {
- int result = AE_OK;
- if (server.io_threads_num <= 1) return result;
-
- /* To make context safe. */
- pauseAllIOThreads();
- for (int i = 1; i < server.io_threads_num; i++) {
- IOThread *t = &IOThreads[i];
- if (aeResizeSetSize(t->el, newsize) == AE_ERR)
- result = AE_ERR;
- }
- resumeAllIOThreads();
- return result;
-}
-
-/* In the main thread, we may want to operate data of io threads, maybe uninstall
- * event handler, access query/output buffer or resize event loop, we need a clean
- * and safe context to do that. We pause io thread in IOThreadBeforeSleep, do some
- * jobs and then resume it. To avoid thread suspended, we use busy waiting to confirm
- * the target status. Besides we use atomic variable to make sure memory visibility
- * and ordering.
- *
- * Make sure that only the main thread can call these function,
- * - pauseIOThread, resumeIOThread
- * - pauseAllIOThreads, resumeAllIOThreads
- * - pauseIOThreadsRange, resumeIOThreadsRange
- *
- * The main thread will pause the io thread, and then wait for the io thread to
- * be paused. The io thread will check the paused status in IOThreadBeforeSleep,
- * and then pause itself.
- *
- * The main thread will resume the io thread, and then wait for the io thread to
- * be resumed. The io thread will check the paused status in IOThreadBeforeSleep,
- * and then resume itself.
- */
-
-/* We may pause the same io thread nestedly, so we need to record the times of
- * pausing, and only when the times of pausing is 0, we can pause the io thread,
- * and only when the times of pausing is 1, we can resume the io thread. */
-static int PausedIOThreads[IO_THREADS_MAX_NUM] = {0};
-
-/* Pause the specific range of io threads, and wait for them to be paused. */
-void pauseIOThreadsRange(int start, int end) {
- if (!server.io_threads_active) return;
- serverAssert(start >= 1 && end < server.io_threads_num && start <= end);
- serverAssert(pthread_equal(pthread_self(), server.main_thread_id));
-
- /* Try to make all io threads paused in parallel */
- for (int i = start; i <= end; i++) {
- PausedIOThreads[i]++;
- /* Skip if already paused */
- if (PausedIOThreads[i] > 1) continue;
-
- int paused;
- atomicGetWithSync(IOThreads[i].paused, paused);
- /* Don't support to call reentrant */
- serverAssert(paused == IO_THREAD_UNPAUSED);
- atomicSetWithSync(IOThreads[i].paused, IO_THREAD_PAUSING);
- /* Just notify io thread, no actual job, since io threads check paused
- * status in IOThreadBeforeSleep, so just wake it up if polling wait. */
- triggerEventNotifier(IOThreads[i].pending_clients_notifier);
- }
-
- /* Wait for all io threads paused */
- for (int i = start; i <= end; i++) {
- if (PausedIOThreads[i] > 1) continue;
- int paused = IO_THREAD_PAUSING;
- while (paused != IO_THREAD_PAUSED) {
- atomicGetWithSync(IOThreads[i].paused, paused);
- }
- }
-}
-
-/* Resume the specific range of io threads, and wait for them to be resumed. */
-void resumeIOThreadsRange(int start, int end) {
- if (!server.io_threads_active) return;
- serverAssert(start >= 1 && end < server.io_threads_num && start <= end);
- serverAssert(pthread_equal(pthread_self(), server.main_thread_id));
-
- for (int i = start; i <= end; i++) {
- serverAssert(PausedIOThreads[i] > 0);
- PausedIOThreads[i]--;
- if (PausedIOThreads[i] > 0) continue;
-
- int paused;
- /* Check if it is paused, since we must call 'pause' and
- * 'resume' in pairs */
- atomicGetWithSync(IOThreads[i].paused, paused);
- serverAssert(paused == IO_THREAD_PAUSED);
- /* Resume */
- atomicSetWithSync(IOThreads[i].paused, IO_THREAD_RESUMING);
- while (paused != IO_THREAD_UNPAUSED) {
- atomicGetWithSync(IOThreads[i].paused, paused);
- }
- }
-}
-
-/* The IO thread checks whether it is being paused, and if so, it pauses itself
- * and waits for resuming, corresponding to the pause/resumeIOThread* functions.
- * Currently, this is only called in IOThreadBeforeSleep, as there are no pending
- * I/O events at this point, with a clean context. */
-void handlePauseAndResume(IOThread *t) {
- int paused;
- /* Check if i am being paused. */
- atomicGetWithSync(t->paused, paused);
- if (paused == IO_THREAD_PAUSING) {
- atomicSetWithSync(t->paused, IO_THREAD_PAUSED);
- /* Wait for resuming */
- while (paused != IO_THREAD_RESUMING) {
- atomicGetWithSync(t->paused, paused);
- }
- atomicSetWithSync(t->paused, IO_THREAD_UNPAUSED);
- }
-}
-
-/* Pause the specific io thread, and wait for it to be paused. */
-void pauseIOThread(int id) {
- pauseIOThreadsRange(id, id);
-}
-
-/* Resume the specific io thread, and wait for it to be resumed. */
-void resumeIOThread(int id) {
- resumeIOThreadsRange(id, id);
-}
-
-/* Pause all io threads, and wait for them to be paused. */
-void pauseAllIOThreads(void) {
- pauseIOThreadsRange(1, server.io_threads_num-1);
-}
-
-/* Resume all io threads, and wait for them to be resumed. */
-void resumeAllIOThreads(void) {
- resumeIOThreadsRange(1, server.io_threads_num-1);
-}
-
-/* Add the pending clients to the list of IO threads, and trigger an event to
- * notify io threads to handle. */
-int sendPendingClientsToIOThreads(void) {
- int processed = 0;
- for (int i = 1; i < server.io_threads_num; i++) {
- int len = listLength(mainThreadPendingClientsToIOThreads[i]);
- if (len > 0) {
- IOThread *t = &IOThreads[i];
- pthread_mutex_lock(&t->pending_clients_mutex);
- listJoin(t->pending_clients, mainThreadPendingClientsToIOThreads[i]);
- pthread_mutex_unlock(&t->pending_clients_mutex);
- /* Trigger an event, maybe an error is returned when buffer is full
- * if using pipe, but no worry, io thread will handle all clients
- * in list when receiving a notification. */
- triggerEventNotifier(t->pending_clients_notifier);
- }
- processed += len;
- }
- return processed;
-}
-
-/* Prefetch the commands from the IO thread. The return value is the number
- * of clients that have been prefetched. */
-int prefetchIOThreadCommands(IOThread *t) {
- int len = listLength(mainThreadProcessingClients[t->id]);
- int to_prefetch = determinePrefetchCount(len);
- if (to_prefetch == 0) return 0;
-
- /* Two-phase approach to optimize cache utilization:
- * Phase 1: Issue prefetch hints for client structures
- * Phase 2: Access the now-cached client data and add commands to batch */
- /* Since we double the configured size for better performance,
- * see also `determinePrefetchCount` */
- static client *c[PREFETCH_BATCH_MAX_SIZE*2];
- serverAssert(PREFETCH_BATCH_MAX_SIZE*2 >= to_prefetch );
- int clients = 0;
- listIter li;
- listNode *ln;
- listRewind(mainThreadProcessingClients[t->id], &li);
- /* Phase 1: Issue prefetch instructions for client struct and pending_cmds.
- * These prefetches will bring data into cache asynchronously. */
- for (int i = 0; i < to_prefetch && (ln = listNext(&li)); i++) {
- c[i] = listNodeValue(ln);
- redis_prefetch_read(c[i]);
- redis_prefetch_read(&c[i]->pending_cmds);
- }
- /* Phase 2: Access client data (now likely in cache) and add to batch.
- * Also prefetch additional fields (reply, mem_usage_bucket) that will be
- * needed later during command execution. */
- for (int i = 0; i < to_prefetch; i++) {
- if (addCommandToBatch(c[i]) == C_ERR) break;
- if (c[i]->reply) redis_prefetch_read(c[i]->reply);
- redis_prefetch_read(&c[i]->mem_usage_bucket);
- clients++;
- }
- /* Prefetch the commands in the batch. */
- prefetchCommands();
- return clients;
-}
-
-extern int ProcessingEventsWhileBlocked;
-
-/* Send the pending clients to the IO thread if the number of pending clients
- * is greater than IO_THREAD_MAX_PENDING_CLIENTS, or if size_check is 0. */
-static inline void sendPendingClientsToIOThreadIfNeeded(IOThread *t, int size_check) {
- size_t len = listLength(mainThreadPendingClientsToIOThreads[t->id]);
- if (len == 0 || (size_check && len < IO_THREAD_MAX_PENDING_CLIENTS)) return;
-
- /* If AOF fsync policy is always, we should not let io thread handle these
- * clients now since we don't flush AOF buffer to file and sync yet.
- * So these clients will be delayed to send io threads in beforeSleep after
- * flushAppendOnlyFile.
- *
- * If we are in processEventsWhileBlocked, we don't send clients to io threads
- * now, we want to update server.events_processed_while_blocked accurately. */
- if (server.aof_fsync != AOF_FSYNC_ALWAYS && !ProcessingEventsWhileBlocked) {
- int running = 0, pending = 0;
- pthread_mutex_lock(&(t->pending_clients_mutex));
- pending = listLength(t->pending_clients);
- listJoin(t->pending_clients, mainThreadPendingClientsToIOThreads[t->id]);
- pthread_mutex_unlock(&(t->pending_clients_mutex));
- if (!pending) atomicGetWithSync(t->running, running);
-
- /* Only notify io thread if it is not running and no pending clients to
- * process, to avoid unnecessary notify/wakeup. If the io thread is running,
- * it will process the clients in beforeSleep. If there are pending clients,
- * we may already notify the io thread if needed. */
- if(!running && !pending) triggerEventNotifier(t->pending_clients_notifier);
- }
-}
-
-/* The main thread processes the clients from IO threads, these clients may have
- * a complete command to execute or need to be freed. Note that IO threads never
- * free client since this operation access much server data.
- *
- * Please notice that this function may be called reentrantly, i,e, the same goes
- * for handleClientsFromIOThread and processClientsOfAllIOThreads. For example,
- * when processing script command, it may call processEventsWhileBlocked to
- * process new events, if the clients with fired events from the same io thread,
- * it may call this function reentrantly. */
-int processClientsFromIOThread(IOThread *t) {
- /* Get the list of clients to process. */
- pthread_mutex_lock(&mainThreadPendingClientsMutexes[t->id]);
- listJoin(mainThreadProcessingClients[t->id], mainThreadPendingClients[t->id]);
- pthread_mutex_unlock(&mainThreadPendingClientsMutexes[t->id]);
- size_t processed = listLength(mainThreadProcessingClients[t->id]);
- if (processed == 0) return 0;
-
- int prefetch_clients = 0;
- /* We may call processClientsFromIOThread reentrantly, so we need to
- * reset the prefetching batch, besides, users may change the config
- * of prefetch batch size, so we need to reset the prefetching batch. */
- resetCommandsBatch();
-
- listNode *node = NULL;
- while (listLength(mainThreadProcessingClients[t->id])) {
- if (prefetch_clients <= 0) {
- /* Reset the prefetching batch if we have processed all clients. */
- resetCommandsBatch();
- /* Prefetch the commands if no clients in the batch. */
- prefetch_clients = prefetchIOThreadCommands(t);
- }
- prefetch_clients--;
-
- /* Each time we pop up only the first client to process to guarantee
- * reentrancy safety. */
- if (node) zfree(node);
- node = listFirst(mainThreadProcessingClients[t->id]);
- listUnlinkNode(mainThreadProcessingClients[t->id], node);
- client *c = listNodeValue(node);
-
- /* Make sure the client is neither readable nor writable in io thread to
- * avoid data race. */
- serverAssert(!(c->io_flags & (CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED)));
- serverAssert(!(c->flags & CLIENT_CLOSE_ASAP));
-
- /* Let main thread to run it, set running thread id first. */
- c->running_tid = IOTHREAD_MAIN_THREAD_ID;
-
- /* Free objects queued by IO thread for deferred freeing. */
- freeClientIODeferredObjects(c, 0);
- tryUnlinkClientFromPendingRefReply(c, 0);
-
- /* If a read error occurs, handle it in the main thread first, since we
- * want to print logs about client information before freeing. */
- if (isClientReadErrorFatal(c)) handleClientReadError(c);
-
- /* The client is asked to close in IO thread. */
- if (c->io_flags & CLIENT_IO_CLOSE_ASAP) {
- freeClient(c);
- continue;
- }
-
- /* Update some client's members while we are in main thread so we avoid
- * data races. */
- updateClientDataFromIOThread(c);
-
- /* Check if we need to run a cron job for the client */
- if (runClientCronFromIOThread(c)) continue;
-
- /* Process the pending command and input buffer. */
- if (!isClientReadErrorFatal(c) && c->io_flags & CLIENT_IO_PENDING_COMMAND) {
- c->flags |= CLIENT_PENDING_COMMAND;
- if (processPendingCommandAndInputBuffer(c) == C_ERR) {
- /* If the client is no longer valid, it must be freed safely. */
- continue;
- }
- }
-
- /* We may have pending replies if io thread may not finish writing
- * reply to client, so we did not put the client in pending write
- * queue. And we should do that first since we may keep the client
- * in main thread instead of returning to io threads. */
- if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
- putClientInPendingWriteQueue(c);
-
- /* The client only can be processed in the main thread, otherwise data
- * race will happen, since we may touch client's data in main thread. */
- if (isClientMustHandledByMainThread(c)) {
- keepClientInMainThread(c);
- continue;
- }
-
- /* Handle replica clients in putReplicasInPendingClientsToIOThreads in
- * beforeSleep */
- if (c->flags & CLIENT_SLAVE) continue;
-
- /* Remove this client from pending write clients queue of main thread,
- * And some clients may do not have reply if CLIENT REPLY OFF/SKIP. */
- if (c->flags & CLIENT_PENDING_WRITE) {
- c->flags &= ~CLIENT_PENDING_WRITE;
- listUnlinkNode(server.clients_pending_write, &c->clients_pending_write_node);
- }
- c->running_tid = c->tid;
- listLinkNodeHead(mainThreadPendingClientsToIOThreads[c->tid], node);
- node = NULL;
-
- /* If there are several clients to process, let io thread handle them ASAP. */
- sendPendingClientsToIOThreadIfNeeded(t, 1);
- }
- if (node) zfree(node);
-
- /* Send the clients to io thread without pending size check, since main thread
- * may process clients from other io threads, so we need to send them to the
- * io thread to process in prallel. */
- sendPendingClientsToIOThreadIfNeeded(t, 0);
-
- return processed;
-}
-
-/* When the io thread finishes processing the client with the read event, it will
- * notify the main thread through event triggering in IOThreadBeforeSleep. The main
- * thread handles the event through this function. */
-void handleClientsFromIOThread(struct aeEventLoop *el, int fd, void *ptr, int mask) {
- UNUSED(el);
- UNUSED(mask);
-
- IOThread *t = ptr;
-
- /* Handle fd event first. */
- serverAssert(fd == getReadEventFd(mainThreadPendingClientsNotifiers[t->id]));
- handleEventNotifier(mainThreadPendingClientsNotifiers[t->id]);
-
- /* Process the clients from IO threads. */
- processClientsFromIOThread(t);
-}
-
-/* In the new threaded io design, one thread may process multiple clients, so when
- * an io thread notifies the main thread of an event, there may be multiple clients
- * with commands that need to be processed. But in the event handler function
- * handleClientsFromIOThread may be blocked when processing the specific command,
- * the previous clients can not get a reply, and the subsequent clients can not be
- * processed, so we need to handle this scenario in beforeSleep. The function is to
- * process the commands of subsequent clients from io threads. And another function
- * sendPendingClientsToIOThreads make sure clients from io thread can get replies.
- * See also beforeSleep.
- *
- * In beforeSleep, we also call this function to handle the clients that are
- * transferred from io threads without notification. */
-int processClientsOfAllIOThreads(void) {
- int processed = 0;
- for (int i = 1; i < server.io_threads_num; i++) {
- processed += processClientsFromIOThread(&IOThreads[i]);
- }
- return processed;
-}
-
-/* After the main thread processes the clients, it will send the clients back to
- * io threads to handle, and fire an event, the io thread handles the event by
- * this function. */
-void handleClientsFromMainThread(struct aeEventLoop *ae, int fd, void *ptr, int mask) {
- UNUSED(ae);
- UNUSED(mask);
-
- IOThread *t = ptr;
-
- /* Handle fd event first. */
- serverAssert(fd == getReadEventFd(t->pending_clients_notifier));
- handleEventNotifier(t->pending_clients_notifier);
-
- /* Process the clients from main thread. */
- processClientsFromMainThread(t);
-}
-
-/* Processing clients that have finished executing commands from the main thread.
- * If the client is not binded to the event loop, we should bind it first and
- * install read handler. If the client still has query buffer, we should process
- * the input buffer. If the client has pending reply, we just reply to client,
- * and then install write handler if needed. */
-int processClientsFromMainThread(IOThread *t) {
- pthread_mutex_lock(&t->pending_clients_mutex);
- listJoin(t->processing_clients, t->pending_clients);
- pthread_mutex_unlock(&t->pending_clients_mutex);
- size_t processed = listLength(t->processing_clients);
- if (processed == 0) return 0;
-
- listIter li;
- listNode *ln;
- listRewind(t->processing_clients, &li);
- while((ln = listNext(&li))) {
- client *c = listNodeValue(ln);
- serverAssert(!(c->io_flags & (CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED)));
- /* Main thread must handle clients with CLIENT_CLOSE_ASAP flag, since
- * we only set io_flags when clients in io thread are freed ASAP. */
- serverAssert(!(c->flags & CLIENT_CLOSE_ASAP));
-
- /* Link client in IO thread clients list first. */
- serverAssert(c->io_thread_client_list_node == NULL);
- listUnlinkNode(t->processing_clients, ln);
- listLinkNodeTail(t->clients, ln);
- c->io_thread_client_list_node = listLast(t->clients);
-
- /* The client now is in the IO thread, let's free deferred objects. */
- freeClientDeferredObjects(c, 0);
-
- /* The client is asked to close, we just let main thread free it. */
- if (c->io_flags & CLIENT_IO_CLOSE_ASAP) {
- enqueuePendingClientsToMainThread(c, 1);
- continue;
- }
-
- /* Enable read and write and reset some flags. */
- c->io_flags |= CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED;
- c->io_flags &= ~(CLIENT_IO_PENDING_COMMAND | CLIENT_IO_PENDING_CRON);
-
- /* Only bind once, we never remove read handler unless freeing client. */
- if (!connHasEventLoop(c->conn)) {
- connRebindEventLoop(c->conn, t->el);
- serverAssert(!connHasReadHandler(c->conn));
- connSetReadHandler(c->conn, readQueryFromClient);
- }
-
- /* If the client has pending replies, write replies to client. */
- if (clientHasPendingReplies(c)) {
- writeToClient(c, 0);
- if (!(c->io_flags & CLIENT_IO_CLOSE_ASAP) && clientHasPendingReplies(c)) {
- connSetWriteHandler(c->conn, sendReplyToClient);
- }
- }
- }
- /* All clients must are processed. */
- serverAssert(listLength(t->processing_clients) == 0);
- return processed;
-}
-
-void IOThreadBeforeSleep(struct aeEventLoop *el) {
- IOThread *t = el->privdata[0];
-
- /* Handle pending data(typical TLS). */
- connTypeProcessPendingData(el);
-
- /* If any connection type(typical TLS) still has pending unread data don't sleep at all. */
- int dont_sleep = connTypeHasPendingData(el);
-
- /* Process clients from main thread, since the main thread may deliver clients
- * without notification during IO thread processing events. */
- if (processClientsFromMainThread(t) > 0) {
- /* If there are clients that are processed, we should not sleep since main
- * thread may want to continue deliverring clients without notification, so
- * IO thread can process them ASAP, and the main thread can avoid unnecessary
- * notification (write fd and wake up) is costly. */
- dont_sleep = 1;
- }
- if (!dont_sleep) {
- atomicSetWithSync(t->running, 0); /* Not running if going to sleep. */
- /* Try to process clients from main thread again, since before we set
- * running to 0, the main thread may deliver clients to this io thread. */
- processClientsFromMainThread(t);
- }
- aeSetDontWait(t->el, dont_sleep);
-
- /* Check if i am being paused, pause myself and resume. */
- handlePauseAndResume(t);
-
- /* Send clients to main thread to process, we don't check size here since
- * we want to send all clients to main thread before going to sleeping. */
- sendPendingClientsToMainThreadIfNeeded(t, 0);
-}
-
-void IOThreadAfterSleep(struct aeEventLoop *el) {
- IOThread *t = el->privdata[0];
-
- /* Set the IO thread to running state, so the main thread can deliver
- * clients to it without extra notifications. */
- atomicSetWithSync(t->running, 1);
-}
-
-/* Periodically transfer part of clients to the main thread for processing. */
-void IOThreadClientsCron(IOThread *t) {
- /* Process at least a few clients while we are at it, even if we need
- * to process less than CLIENTS_CRON_MIN_ITERATIONS to meet our contract
- * of processing each client once per second. */
- int iterations = listLength(t->clients) / CONFIG_DEFAULT_HZ;
- if (iterations < CLIENTS_CRON_MIN_ITERATIONS) {
- iterations = CLIENTS_CRON_MIN_ITERATIONS;
- }
-
- listIter li;
- listNode *ln;
- listRewind(t->clients, &li);
- while ((ln = listNext(&li)) && iterations--) {
- client *c = listNodeValue(ln);
- /* Mark the client as pending cron, main thread will process it. */
- c->io_flags |= CLIENT_IO_PENDING_CRON;
- enqueuePendingClientsToMainThread(c, 0);
- }
-}
-
-/* This is the IO thread timer interrupt, CONFIG_DEFAULT_HZ times per second.
- * The current responsibility is to detect clients that have been stuck in the
- * IO thread for too long and hand them over to the main thread for handling. */
-int IOThreadCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
- UNUSED(eventLoop);
- UNUSED(id);
- IOThread *t = clientData;
-
- /* Run cron tasks for the clients in the IO thread. */
- IOThreadClientsCron(t);
-
- return 1000/CONFIG_DEFAULT_HZ;
-}
-
-/* The main function of IO thread, it will run an event loop. The mian thread
- * and IO thread will communicate through event notifier. */
-void *IOThreadMain(void *ptr) {
- IOThread *t = ptr;
- char thdname[16];
- snprintf(thdname, sizeof(thdname), "io_thd_%d", t->id);
- redis_set_thread_title(thdname);
- redisSetCpuAffinity(server.server_cpulist);
- makeThreadKillable();
- aeSetBeforeSleepProc(t->el, IOThreadBeforeSleep);
- aeSetAfterSleepProc(t->el, IOThreadAfterSleep);
- aeMain(t->el);
- return NULL;
-}
-
-/* Initialize the data structures needed for threaded I/O. */
-void initThreadedIO(void) {
- if (server.io_threads_num <= 1) return;
-
- server.io_threads_active = 1;
-
- if (server.io_threads_num > IO_THREADS_MAX_NUM) {
- serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
- "The maximum number is %d.", IO_THREADS_MAX_NUM);
- exit(1);
- }
-
- /* Spawn and initialize the I/O threads. */
- for (int i = 1; i < server.io_threads_num; i++) {
- IOThread *t = &IOThreads[i];
- t->id = i;
- t->el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
- t->el->privdata[0] = t;
- t->pending_clients = listCreate();
- t->processing_clients = listCreate();
- t->pending_clients_to_main_thread = listCreate();
- t->clients = listCreate();
- atomicSetWithSync(t->paused, IO_THREAD_UNPAUSED);
- atomicSetWithSync(t->running, 0);
-
- pthread_mutexattr_t *attr = NULL;
- #if defined(__linux__) && defined(__GLIBC__)
- attr = zmalloc(sizeof(pthread_mutexattr_t));
- pthread_mutexattr_init(attr);
- pthread_mutexattr_settype(attr, PTHREAD_MUTEX_ADAPTIVE_NP);
- #endif
- pthread_mutex_init(&t->pending_clients_mutex, attr);
-
- t->pending_clients_notifier = createEventNotifier();
- if (aeCreateFileEvent(t->el, getReadEventFd(t->pending_clients_notifier),
- AE_READABLE, handleClientsFromMainThread, t) != AE_OK)
- {
- serverLog(LL_WARNING, "Fatal: Can't register file event for IO thread notifications.");
- exit(1);
- }
-
- /* This is the timer callback of the IO thread, used to gradually handle
- * some background operations, such as clients cron. */
- if (aeCreateTimeEvent(t->el, 1, IOThreadCron, t, NULL) == AE_ERR) {
- serverLog(LL_WARNING, "Fatal: Can't create event loop timers in IO thread.");
- exit(1);
- }
-
- /* Create IO thread */
- if (pthread_create(&t->tid, NULL, IOThreadMain, (void*)t) != 0) {
- serverLog(LL_WARNING, "Fatal: Can't initialize IO thread.");
- exit(1);
- }
-
- /* For main thread */
- mainThreadPendingClientsToIOThreads[i] = listCreate();
- mainThreadPendingClients[i] = listCreate();
- mainThreadProcessingClients[i] = listCreate();
- pthread_mutex_init(&mainThreadPendingClientsMutexes[i], attr);
- mainThreadPendingClientsNotifiers[i] = createEventNotifier();
- if (aeCreateFileEvent(server.el, getReadEventFd(mainThreadPendingClientsNotifiers[i]),
- AE_READABLE, handleClientsFromIOThread, t) != AE_OK)
- {
- serverLog(LL_WARNING, "Fatal: Can't register file event for main thread notifications.");
- exit(1);
- }
- if (attr) zfree(attr);
- }
-}
-
-/* Kill the IO threads, TODO: release the applied resources. */
-void killIOThreads(void) {
- if (server.io_threads_num <= 1) return;
-
- int err, j;
- for (j = 1; j < server.io_threads_num; j++) {
- if (IOThreads[j].tid == pthread_self()) continue;
- if (IOThreads[j].tid && pthread_cancel(IOThreads[j].tid) == 0) {
- if ((err = pthread_join(IOThreads[j].tid,NULL)) != 0) {
- serverLog(LL_WARNING,
- "IO thread(tid:%lu) can not be joined: %s",
- (unsigned long)IOThreads[j].tid, strerror(err));
- } else {
- serverLog(LL_WARNING,
- "IO thread(tid:%lu) terminated",(unsigned long)IOThreads[j].tid);
- }
- }
- }
-}