diff options
Diffstat (limited to 'llama.cpp/tools/server/server-queue.h')
| -rw-r--r-- | llama.cpp/tools/server/server-queue.h | 197 |
1 files changed, 197 insertions, 0 deletions
diff --git a/llama.cpp/tools/server/server-queue.h b/llama.cpp/tools/server/server-queue.h new file mode 100644 index 0000000..164f09b --- /dev/null +++ b/llama.cpp/tools/server/server-queue.h @@ -0,0 +1,197 @@ +#pragma once + +#include "server-task.h" + +#include <condition_variable> +#include <deque> +#include <mutex> +#include <vector> +#include <unordered_set> + +// struct for managing server tasks +// in most cases, use server_response_reader to post new tasks and retrieve results +struct server_queue { +private: + int id = 0; + bool running = false; + bool sleeping = false; + bool req_stop_sleeping = false; + int64_t time_last_task = 0; + + // queues + std::deque<server_task> queue_tasks; + std::deque<server_task> queue_tasks_deferred; + + std::mutex mutex_tasks; + std::condition_variable condition_tasks; + + // callback functions + std::function<void(server_task &&)> callback_new_task; + std::function<void(void)> callback_update_slots; + std::function<void(bool)> callback_sleeping_state; + +public: + // Add a new task to the end of the queue + int post(server_task && task, bool front = false); + + // multi-task version of post() + int post(std::vector<server_task> && tasks, bool front = false); + + // Add a new task, but defer until one slot is available + void defer(server_task && task); + + // Get the next id for creating a new task + int get_new_id(); + + // Call when the state of one slot is changed, it will move one task from deferred to main queue + // prioritize tasks that use the specified slot (otherwise, pop the first deferred task) + void pop_deferred_task(int id_slot); + + // if sleeping, request exiting sleep state and wait until it is done + // returns immediately if not sleeping + void wait_until_no_sleep(); + + bool is_sleeping() { + std::unique_lock<std::mutex> lock(mutex_tasks); + return sleeping; + } + + // end the start_loop routine + void terminate(); + + /** + * Main loop consists of these steps: + * - Wait until a new task arrives + * - Process the task (i.e. maybe copy data into slot) + * - Check if multitask is finished + * - Update all slots + * + * Sleeping procedure (disabled if idle_sleep_ms < 0): + * - If there is no task after idle_sleep_ms, enter sleeping state + * - Call callback_sleeping_state(true) + * - Wait until req_stop_sleeping is set to true + * - Call callback_sleeping_state(false) + * - Exit sleeping state + */ + void start_loop(int64_t idle_sleep_ms = -1); + + // for metrics + size_t queue_tasks_deferred_size() { + std::unique_lock<std::mutex> lock(mutex_tasks); + return queue_tasks_deferred.size(); + } + + // + // Functions below are not thread-safe, must only be used before start_loop() is called + // + + // Register function to process a new task + void on_new_task(std::function<void(server_task &&)> callback) { + callback_new_task = std::move(callback); + } + + // Register the function to be called when all slots data is ready to be processed + void on_update_slots(std::function<void(void)> callback) { + callback_update_slots = std::move(callback); + } + + // Register callback for sleeping state change + // note: when entering sleeping state, the callback is called AFTER sleeping is set to true + // when leaving sleeping state, the callback is called BEFORE sleeping is set to false + void on_sleeping_state(std::function<void(bool)> callback) { + callback_sleeping_state = std::move(callback); + } + +private: + void cleanup_pending_task(int id_target); +}; + +// struct for managing server responses +// in most cases, use server_response_reader to retrieve results +struct server_response { +private: + bool running = true; + + // for keeping track of all tasks waiting for the result + std::unordered_set<int> waiting_task_ids; + + // the main result queue (using ptr for polymorphism) + std::vector<server_task_result_ptr> queue_results; + + std::mutex mutex_results; + std::condition_variable condition_results; + +public: + // add the id_task to the list of tasks waiting for response + void add_waiting_task_id(int id_task); + + void add_waiting_task_ids(const std::unordered_set<int> & id_tasks); + + // when the request is finished, we can remove task associated with it + void remove_waiting_task_id(int id_task); + + // remove multiple tasks from waiting list + void remove_waiting_task_ids(const std::unordered_set<int> & id_tasks); + + // This function blocks the thread until there is a response for one of the id_tasks + server_task_result_ptr recv(const std::unordered_set<int> & id_tasks); + + // same as recv(), but have timeout in seconds + // if timeout is reached, nullptr is returned + server_task_result_ptr recv_with_timeout(const std::unordered_set<int> & id_tasks, int timeout); + + // single-task version of recv() + server_task_result_ptr recv(int id_task); + + // Send a new result to a waiting id_task + void send(server_task_result_ptr && result); + + // terminate the waiting loop + void terminate(); +}; + +// utility class to make working with server_queue and server_response easier +// it provides a generator-like API for server responses +// support pooling connection state and aggregating multiple results +struct server_response_reader { + std::unordered_set<int> id_tasks; + server_queue & queue_tasks; + server_response & queue_results; + size_t received_count = 0; + bool cancelled = false; + int polling_interval_seconds; + + // tracking generation state and partial tool calls + // only used by streaming completions + std::vector<task_result_state> states; + + // should_stop function will be called each polling_interval_seconds + server_response_reader(server_queue & queue_tasks, server_response & queue_results, int polling_interval_seconds) + : queue_tasks(queue_tasks), queue_results(queue_results), polling_interval_seconds(polling_interval_seconds) {} + ~server_response_reader() { + stop(); + } + + int get_new_id() { + return queue_tasks.get_new_id(); + } + + // if front = true, the task will be posted to the front of the queue (high priority) + void post_task(server_task && task, bool front = false); + void post_tasks(std::vector<server_task> && tasks, bool front = false); + bool has_next() const; + + // return nullptr if should_stop() is true before receiving a result + // note: if one error is received, it will stop further processing and return error result + server_task_result_ptr next(const std::function<bool()> & should_stop); + + struct batch_response { + bool is_terminated = false; // if true, indicates that processing was stopped before all results were received + std::vector<server_task_result_ptr> results; + server_task_result_ptr error; // nullptr if no error + }; + // aggregate multiple results + batch_response wait_for_all(const std::function<bool()> & should_stop); + + void stop(); +}; |
