diff options
| author | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-02-12 20:57:17 +0100 |
|---|---|---|
| committer | Mitja Felicijan <mitja.felicijan@gmail.com> | 2026-02-12 20:57:17 +0100 |
| commit | b333b06772c89d96aacb5490d6a219fba7c09cc6 (patch) | |
| tree | 211df60083a5946baa2ed61d33d8121b7e251b06 /llama.cpp/ggml/src/ggml-hexagon/htp/worker-pool.c | |
| download | llmnpc-b333b06772c89d96aacb5490d6a219fba7c09cc6.tar.gz | |
Engage!
Diffstat (limited to 'llama.cpp/ggml/src/ggml-hexagon/htp/worker-pool.c')
| -rw-r--r-- | llama.cpp/ggml/src/ggml-hexagon/htp/worker-pool.c | 293 |
1 files changed, 293 insertions, 0 deletions
diff --git a/llama.cpp/ggml/src/ggml-hexagon/htp/worker-pool.c b/llama.cpp/ggml/src/ggml-hexagon/htp/worker-pool.c new file mode 100644 index 0000000..894815f --- /dev/null +++ b/llama.cpp/ggml/src/ggml-hexagon/htp/worker-pool.c @@ -0,0 +1,293 @@ +#include "worker-pool.h" + +#include <qurt.h> +#include <stdatomic.h> +#include <stdint.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include "HAP_farf.h" + +#define WORKER_THREAD_STACK_SZ (2 * 16384) +#define LOWEST_USABLE_QURT_PRIO (254) + +struct worker_pool_s; + +// internal structure kept in thread-local storage per instance of worker pool +typedef struct { + struct worker_pool_s * pool; + unsigned int id; +} worker_context_t; + +// internal structure kept in thread-local storage per instance of worker pool +typedef struct worker_pool_s { + worker_pool_job_t job[MAX_NUM_WORKERS]; // list of job descriptors + qurt_thread_t thread[MAX_NUM_WORKERS]; // thread ID's of the workers + worker_context_t context[MAX_NUM_WORKERS]; // worker contexts + void * stack[MAX_NUM_WORKERS]; // thread stack pointers + unsigned int n_threads; // number of workers in this pool + + atomic_uint seqn; // seqno used to detect new jobs + atomic_uint next_job; // next job index + atomic_uint n_pending; // number of pending jobs + atomic_uint n_jobs; // number of current jobs + atomic_bool killed; // threads need to exit +} worker_pool_t; + +static void worker_pool_main(void * context) { + worker_context_t * me = (worker_context_t *) context; + worker_pool_t * pool = me->pool; + + FARF(HIGH, "worker-pool: thread %u started", me->id); + + unsigned int prev_seqn = 0; + while (!atomic_load(&pool->killed)) { + unsigned int seqn = atomic_load(&pool->seqn); + if (seqn == prev_seqn) { + // Nothing to do + qurt_futex_wait(&pool->seqn, prev_seqn); + continue; + } + + // New job + prev_seqn = seqn; + + unsigned int n = atomic_load(&pool->n_jobs); + unsigned int i = atomic_fetch_add(&pool->next_job, 1); + if (i >= n) { + // Spurios wakeup + continue; + } + + pool->job[i].func(n, i, pool->job[i].data); + + atomic_fetch_sub(&pool->n_pending, 1); + } + + FARF(HIGH, "worker-pool: thread %u stopped", me->id); +} + +AEEResult worker_pool_init_with_stack_size(worker_pool_context_t * context, uint32_t n_threads, uint32_t stack_size) { + int err = 0; + + if (NULL == context) { + FARF(ERROR, "NULL context passed to worker_pool_init()."); + return AEE_EBADPARM; + } + + // Allocations + int size = (stack_size * n_threads) + (sizeof(worker_pool_t)); + + unsigned char * mem_blob = (unsigned char *) malloc(size); + if (!mem_blob) { + FARF(ERROR, "Could not allocate memory for worker pool!!"); + return AEE_ENOMEMORY; + } + + worker_pool_t * me = (worker_pool_t *) (mem_blob + stack_size * n_threads); + + // name for the first worker, useful in debugging threads + char name[19]; + snprintf(name, 12, "0x%8x:", (int) me); + strcat(name, "worker0"); + me->n_threads = n_threads; + + // initializations + for (unsigned int i = 0; i < me->n_threads; i++) { + me->stack[i] = NULL; + me->thread[i] = 0; + + me->context[i].id = i; + me->context[i].pool = me; + } + + // initialize job queue + me->n_pending = 0; + me->n_jobs = 0; + me->next_job = 0; + me->seqn = 0; + me->killed = 0; + + // launch the workers + qurt_thread_attr_t attr; + qurt_thread_attr_init(&attr); + + for (unsigned int i = 0; i < me->n_threads; i++) { + // set up stack + me->stack[i] = mem_blob; + mem_blob += stack_size; + qurt_thread_attr_set_stack_addr(&attr, me->stack[i]); + qurt_thread_attr_set_stack_size(&attr, stack_size); + + // set up name + qurt_thread_attr_set_name(&attr, name); + name[17] = (name[17] + 1); + // name threads context:worker0, context:worker1, .. (recycle at 9, but num threads should be less than that anyway) + if (name[17] > '9') { + name[17] = '0'; + } + + // set up priority - by default, match the creating thread's prio + int prio = qurt_thread_get_priority(qurt_thread_get_id()); + + if (prio < 1) { + prio = 1; + } + if (prio > LOWEST_USABLE_QURT_PRIO) { + prio = LOWEST_USABLE_QURT_PRIO; + } + + qurt_thread_attr_set_priority(&attr, prio); + + // launch + err = qurt_thread_create(&me->thread[i], &attr, worker_pool_main, (void *) &me->context[i]); + if (err) { + FARF(ERROR, "Could not launch worker threads!"); + worker_pool_release((worker_pool_context_t *) &me); + return AEE_EQURTTHREADCREATE; + } + } + *context = (worker_pool_context_t *) me; + return AEE_SUCCESS; +} + +AEEResult worker_pool_init(worker_pool_context_t * context, uint32_t n_threads) { + return worker_pool_init_with_stack_size(context, n_threads, WORKER_THREAD_STACK_SZ); +} + +// clean up worker pool +void worker_pool_release(worker_pool_context_t * context) { + worker_pool_t * me = (worker_pool_t *) *context; + + // if no worker pool exists, return error. + if (NULL == me) { + return; + } + + atomic_store(&me->killed, 1); + atomic_fetch_add(&me->seqn, 1); + qurt_futex_wake(&me->seqn, me->n_threads); + + // de-initializations + for (unsigned int i = 0; i < me->n_threads; i++) { + if (me->thread[i]) { + int status; + (void) qurt_thread_join(me->thread[i], &status); + } + } + + // free allocated memory (were allocated as a single buffer starting at stack[0]) + if (me->stack[0]) { + free(me->stack[0]); + } + + *context = NULL; +} + +// run jobs +AEEResult worker_pool_run_jobs(worker_pool_context_t context, worker_pool_job_t * job, unsigned int n) { + worker_pool_t * me = (worker_pool_t *) context; + if (NULL == me) { + FARF(ERROR, "worker-pool: invalid context"); + return AEE_EBADPARM; + } + + if (n > me->n_threads) { + FARF(ERROR, "worker-pool: invalid number of jobs %u for n-threads %u", n, me->n_threads); + return AEE_EBADPARM; + } + + memcpy(me->job, job, sizeof(worker_pool_job_t) * n); + + if (n > 1) { + atomic_store(&me->next_job, 1); + atomic_store(&me->n_jobs, n); + atomic_store(&me->n_pending, n - 1); + + // wake up workers + atomic_fetch_add(&me->seqn, 1); + qurt_futex_wake(&me->seqn, n - 1); + } + + // main thread runs job #0 + me->job[0].func(n, 0, me->job[0].data); + + if (n > 1) { + while (atomic_load(&me->n_pending)) + ; + } + + return 0; +} + +// run func +AEEResult worker_pool_run_func(worker_pool_context_t context, worker_callback_t func, void * data, unsigned int n) { + worker_pool_job_t job[n]; + + for (unsigned int i = 0; i < n; i++) { + job[i].func = func; + job[i].data = data; + } + + return worker_pool_run_jobs(context, job, n); +} + +AEEResult worker_pool_set_thread_priority(worker_pool_context_t context, unsigned int prio) { + worker_pool_t * me = (worker_pool_t *) context; + + // if no worker pool exists, return error. + if (!me) { + return AEE_ENOMORE; + } + + int result = AEE_SUCCESS; + if (prio < 1) { + prio = 1; + } + if (prio > LOWEST_USABLE_QURT_PRIO) { + prio = LOWEST_USABLE_QURT_PRIO; + } + + for (unsigned int i = 0; i < me->n_threads; i++) { + int res = qurt_thread_set_priority(me->thread[i], (unsigned short) prio); + if (0 != res) { + result = AEE_EBADPARM; + FARF(ERROR, "QURT failed to set priority of thread %d, ERROR = %d", me->thread[i], res); + } + } + + return result; +} + +AEEResult worker_pool_retrieve_thread_id(worker_pool_context_t context, unsigned int * tids) { + worker_pool_t * me = (worker_pool_t *) context; + if (!me) { + FARF(ERROR, "worker-pool: invalid context"); + return AEE_EBADPARM; + ; + } + + for (int i = 0; i < me->n_threads; i++) { + tids[i] = me->thread[i]; + } + + return AEE_SUCCESS; +} + +AEEResult worker_pool_get_thread_priority(worker_pool_context_t context, unsigned int * prio) { + worker_pool_t * me = (worker_pool_t *) context; + if (!me) { + FARF(ERROR, "worker-pool: invalid context"); + return AEE_EBADPARM; + } + + int priority = qurt_thread_get_priority(me->thread[0]); + if (priority > 0) { + *prio = priority; + return 0; + } else { + *prio = 0; + return AEE_EBADSTATE; + } +} |
