summary refs log tree commit diff
path: root/llama.cpp/tools/server/server-queue.h
diff options
context:
space:
mode:
Diffstat (limited to 'llama.cpp/tools/server/server-queue.h')
-rw-r--r--  llama.cpp/tools/server/server-queue.h | 197
1 file changed, 197 insertions, 0 deletions
diff --git a/llama.cpp/tools/server/server-queue.h b/llama.cpp/tools/server/server-queue.h
new file mode 100644
index 0000000..164f09b
--- /dev/null
+++ b/llama.cpp/tools/server/server-queue.h
@@ -0,0 +1,197 @@
+#pragma once
+
+#include "server-task.h"
+
+#include <condition_variable>
+#include <cstdint>
+#include <deque>
+#include <functional>
+#include <mutex>
+#include <unordered_set>
+#include <vector>
+
+// struct for managing server tasks
+// in most cases, use server_response_reader to post new tasks and retrieve results
+// thread-safety: the thread-safe accessors below lock mutex_tasks; the on_* setters
+// are explicitly NOT thread-safe and must be called before start_loop() (see note below)
+struct server_queue {
+private:
+ // counter for handing out unique task ids (see get_new_id())
+ int id = 0;
+ // loop state: running is driven by start_loop()/terminate();
+ // sleeping and req_stop_sleeping implement the sleeping procedure documented at start_loop()
+ bool running = false;
+ bool sleeping = false;
+ bool req_stop_sleeping = false;
+ // time of the last task activity, used for the idle_sleep_ms timeout in start_loop()
+ // NOTE(review): time unit is not visible in this header (probably ms or us) — confirm in the implementation
+ int64_t time_last_task = 0;
+
+ // queues
+ // main FIFO of tasks to process, plus tasks deferred until a slot becomes available (see defer())
+ std::deque<server_task> queue_tasks;
+ std::deque<server_task> queue_tasks_deferred;
+
+ // mutex_tasks guards the state above; condition_tasks wakes the loop / waiters
+ std::mutex mutex_tasks;
+ std::condition_variable condition_tasks;
+
+ // callback functions
+ // registered via the on_* setters below; invoked by the main loop
+ std::function<void(server_task &&)> callback_new_task;
+ std::function<void(void)> callback_update_slots;
+ std::function<void(bool)> callback_sleeping_state;
+
+public:
+ // Add a new task to the end of the queue
+ // if front = true, the task is posted to the front of the queue (high priority)
+ int post(server_task && task, bool front = false);
+
+ // multi-task version of post()
+ int post(std::vector<server_task> && tasks, bool front = false);
+
+ // Add a new task, but defer until one slot is available
+ void defer(server_task && task);
+
+ // Get the next id for creating a new task
+ int get_new_id();
+
+ // Call when the state of one slot is changed, it will move one task from deferred to main queue
+ // prioritize tasks that use the specified slot (otherwise, pop the first deferred task)
+ void pop_deferred_task(int id_slot);
+
+ // if sleeping, request exiting sleep state and wait until it is done
+ // returns immediately if not sleeping
+ void wait_until_no_sleep();
+
+ // thread-safe check of the sleeping flag
+ bool is_sleeping() {
+ std::unique_lock<std::mutex> lock(mutex_tasks);
+ return sleeping;
+ }
+
+ // end the start_loop routine
+ void terminate();
+
+ /**
+ * Main loop consists of these steps:
+ * - Wait until a new task arrives
+ * - Process the task (i.e. maybe copy data into slot)
+ * - Check if multitask is finished
+ * - Update all slots
+ *
+ * Sleeping procedure (disabled if idle_sleep_ms < 0):
+ * - If there is no task after idle_sleep_ms, enter sleeping state
+ * - Call callback_sleeping_state(true)
+ * - Wait until req_stop_sleeping is set to true
+ * - Call callback_sleeping_state(false)
+ * - Exit sleeping state
+ */
+ void start_loop(int64_t idle_sleep_ms = -1);
+
+ // for metrics
+ // thread-safe: locks mutex_tasks to read the deferred-queue size
+ size_t queue_tasks_deferred_size() {
+ std::unique_lock<std::mutex> lock(mutex_tasks);
+ return queue_tasks_deferred.size();
+ }
+
+ //
+ // Functions below are not thread-safe, must only be used before start_loop() is called
+ //
+
+ // Register function to process a new task
+ void on_new_task(std::function<void(server_task &&)> callback) {
+ callback_new_task = std::move(callback);
+ }
+
+ // Register the function to be called when all slots data is ready to be processed
+ void on_update_slots(std::function<void(void)> callback) {
+ callback_update_slots = std::move(callback);
+ }
+
+ // Register callback for sleeping state change
+ // note: when entering sleeping state, the callback is called AFTER sleeping is set to true
+ // when leaving sleeping state, the callback is called BEFORE sleeping is set to false
+ void on_sleeping_state(std::function<void(bool)> callback) {
+ callback_sleeping_state = std::move(callback);
+ }
+
+private:
+ // internal helper; presumably removes queued/deferred tasks belonging to id_target — impl not visible here
+ void cleanup_pending_task(int id_target);
+};
+
+// struct for managing server responses
+// in most cases, use server_response_reader to retrieve results
+struct server_response {
+private:
+ // presumably cleared by terminate() to end the waiting loop and unblock recv() callers — confirm in implementation
+ bool running = true;
+
+ // for keeping track of all tasks waiting for the result
+ std::unordered_set<int> waiting_task_ids;
+
+ // the main result queue (using ptr for polymorphism)
+ std::vector<server_task_result_ptr> queue_results;
+
+ // mutex_results guards the state above; condition_results signals newly arrived results
+ std::mutex mutex_results;
+ std::condition_variable condition_results;
+
+public:
+ // add the id_task to the list of tasks waiting for response
+ void add_waiting_task_id(int id_task);
+
+ // multi-task version of add_waiting_task_id()
+ void add_waiting_task_ids(const std::unordered_set<int> & id_tasks);
+
+ // when the request is finished, we can remove task associated with it
+ void remove_waiting_task_id(int id_task);
+
+ // remove multiple tasks from waiting list
+ void remove_waiting_task_ids(const std::unordered_set<int> & id_tasks);
+
+ // This function blocks the thread until there is a response for one of the id_tasks
+ server_task_result_ptr recv(const std::unordered_set<int> & id_tasks);
+
+ // same as recv(), but have timeout in seconds
+ // if timeout is reached, nullptr is returned
+ server_task_result_ptr recv_with_timeout(const std::unordered_set<int> & id_tasks, int timeout);
+
+ // single-task version of recv()
+ server_task_result_ptr recv(int id_task);
+
+ // Send a new result to a waiting id_task
+ void send(server_task_result_ptr && result);
+
+ // terminate the waiting loop
+ void terminate();
+};
+
+// utility class to make working with server_queue and server_response easier
+// it provides a generator-like API for server responses
+// support pooling connection state and aggregating multiple results
+struct server_response_reader {
+ // ids of the tasks posted through this reader
+ std::unordered_set<int> id_tasks;
+ // non-owning references: the reader must not outlive these two queues
+ server_queue & queue_tasks;
+ server_response & queue_results;
+ // progress/cancellation bookkeeping used by has_next()/next()/stop() — exact semantics live in the implementation
+ size_t received_count = 0;
+ bool cancelled = false;
+ // how often (in seconds) should_stop is polled while waiting for results
+ int polling_interval_seconds;
+
+ // tracking generation state and partial tool calls
+ // only used by streaming completions
+ std::vector<task_result_state> states;
+
+ // should_stop function will be called each polling_interval_seconds
+ server_response_reader(server_queue & queue_tasks, server_response & queue_results, int polling_interval_seconds)
+ : queue_tasks(queue_tasks), queue_results(queue_results), polling_interval_seconds(polling_interval_seconds) {}
+ // ensure stop() runs even if the caller forgets to call it
+ ~server_response_reader() {
+ stop();
+ }
+
+ // forward to server_queue::get_new_id() for creating a new task id
+ int get_new_id() {
+ return queue_tasks.get_new_id();
+ }
+
+ // if front = true, the task will be posted to the front of the queue (high priority)
+ void post_task(server_task && task, bool front = false);
+ void post_tasks(std::vector<server_task> && tasks, bool front = false);
+ bool has_next() const;
+
+ // return nullptr if should_stop() is true before receiving a result
+ // note: if one error is received, it will stop further processing and return error result
+ server_task_result_ptr next(const std::function<bool()> & should_stop);
+
+ struct batch_response {
+ bool is_terminated = false; // if true, indicates that processing was stopped before all results were received
+ std::vector<server_task_result_ptr> results;
+ server_task_result_ptr error; // nullptr if no error
+ };
+ // aggregate multiple results
+ batch_response wait_for_all(const std::function<bool()> & should_stop);
+
+ // cancel/clean up pending tasks for this reader; also called from the destructor
+ void stop();
+};