1#include "worker-pool.h"
  2
  3#include <qurt.h>
  4#include <stdatomic.h>
  5#include <stdint.h>
  6#include <stdio.h>
  7#include <stdlib.h>
  8#include <string.h>
  9
 10#include "HAP_farf.h"
 11
// Default per-worker stack size in bytes; worker_pool_init() passes this value
// to worker_pool_init_with_stack_size().
#define WORKER_THREAD_STACK_SZ  (2 * 16384)
// Upper bound used when clamping thread priority values in this file:
// priorities are forced into the range [1, LOWEST_USABLE_QURT_PRIO].
#define LOWEST_USABLE_QURT_PRIO (254)

// Forward declaration so worker_context_t can point back at its owning pool.
struct worker_pool_s;
 16
// Per-worker argument block. One entry per worker is stored in the pool's
// context[] array and its address is passed to the worker thread entry point.
typedef struct {
    struct worker_pool_s * pool;  // pool this worker belongs to
    unsigned int           id;    // index of this worker within the pool
} worker_context_t;
 22
// Internal state of one worker pool instance. It is allocated by
// worker_pool_init_with_stack_size() in the same heap block as the worker
// stacks and handed to callers as an opaque worker_pool_context_t.
typedef struct worker_pool_s {
    worker_pool_job_t job[MAX_NUM_WORKERS];      // job descriptors of the current batch
    qurt_thread_t     thread[MAX_NUM_WORKERS];   // thread IDs of the workers
    worker_context_t  context[MAX_NUM_WORKERS];  // per-worker arguments passed to the thread entry point
    void *            stack[MAX_NUM_WORKERS];    // thread stack pointers; stack[0] is the base of the allocation
    unsigned int      n_threads;                 // number of workers in this pool

    atomic_uint seqn;                            // sequence number bumped to announce a new batch (also the futex word)
    atomic_uint next_job;                        // next job index to be claimed
    atomic_uint n_pending;                       // number of jobs handed to workers that have not finished yet
    atomic_uint n_jobs;                          // number of jobs in the current batch
    atomic_bool killed;                          // set when the workers must exit
} worker_pool_t;
 37
 38static void worker_pool_main(void * context) {
 39    worker_context_t * me   = (worker_context_t *) context;
 40    worker_pool_t *    pool = me->pool;
 41
 42    FARF(HIGH, "worker-pool: thread %u started", me->id);
 43
 44    unsigned int prev_seqn = 0;
 45    while (!atomic_load(&pool->killed)) {
 46        unsigned int seqn = atomic_load(&pool->seqn);
 47        if (seqn == prev_seqn) {
 48            // Nothing to do
 49            qurt_futex_wait(&pool->seqn, prev_seqn);
 50            continue;
 51        }
 52
 53        // New job
 54        prev_seqn = seqn;
 55
 56        unsigned int n = atomic_load(&pool->n_jobs);
 57        unsigned int i = atomic_fetch_add(&pool->next_job, 1);
 58        if (i >= n) {
 59            // Spurios wakeup
 60            continue;
 61        }
 62
 63        pool->job[i].func(n, i, pool->job[i].data);
 64
 65        atomic_fetch_sub(&pool->n_pending, 1);
 66    }
 67
 68    FARF(HIGH, "worker-pool: thread %u stopped", me->id);
 69}
 70
 71AEEResult worker_pool_init_with_stack_size(worker_pool_context_t * context, uint32_t n_threads, uint32_t stack_size) {
 72    int err = 0;
 73
 74    if (NULL == context) {
 75        FARF(ERROR, "NULL context passed to worker_pool_init().");
 76        return AEE_EBADPARM;
 77    }
 78
 79    // Allocations
 80    int size = (stack_size * n_threads) + (sizeof(worker_pool_t));
 81
 82    unsigned char * mem_blob = (unsigned char *) malloc(size);
 83    if (!mem_blob) {
 84        FARF(ERROR, "Could not allocate memory for worker pool!!");
 85        return AEE_ENOMEMORY;
 86    }
 87
 88    worker_pool_t * me = (worker_pool_t *) (mem_blob + stack_size * n_threads);
 89
 90    // name for the first worker, useful in debugging threads
 91    char name[19];
 92    snprintf(name, 12, "0x%8x:", (int) me);
 93    strcat(name, "worker0");
 94    me->n_threads = n_threads;
 95
 96    // initializations
 97    for (unsigned int i = 0; i < me->n_threads; i++) {
 98        me->stack[i]  = NULL;
 99        me->thread[i] = 0;
100
101        me->context[i].id   = i;
102        me->context[i].pool = me;
103    }
104
105    // initialize job queue
106    me->n_pending = 0;
107    me->n_jobs    = 0;
108    me->next_job  = 0;
109    me->seqn      = 0;
110    me->killed    = 0;
111
112    // launch the workers
113    qurt_thread_attr_t attr;
114    qurt_thread_attr_init(&attr);
115
116    for (unsigned int i = 0; i < me->n_threads; i++) {
117        // set up stack
118        me->stack[i] = mem_blob;
119        mem_blob += stack_size;
120        qurt_thread_attr_set_stack_addr(&attr, me->stack[i]);
121        qurt_thread_attr_set_stack_size(&attr, stack_size);
122
123        // set up name
124        qurt_thread_attr_set_name(&attr, name);
125        name[17] = (name[17] + 1);
126        // name threads context:worker0, context:worker1, .. (recycle at 9, but num threads should be less than that anyway)
127        if (name[17] > '9') {
128            name[17] = '0';
129        }
130
131        // set up priority - by default, match the creating thread's prio
132        int prio = qurt_thread_get_priority(qurt_thread_get_id());
133
134        if (prio < 1) {
135            prio = 1;
136        }
137        if (prio > LOWEST_USABLE_QURT_PRIO) {
138            prio = LOWEST_USABLE_QURT_PRIO;
139        }
140
141        qurt_thread_attr_set_priority(&attr, prio);
142
143        // launch
144        err = qurt_thread_create(&me->thread[i], &attr, worker_pool_main, (void *) &me->context[i]);
145        if (err) {
146            FARF(ERROR, "Could not launch worker threads!");
147            worker_pool_release((worker_pool_context_t *) &me);
148            return AEE_EQURTTHREADCREATE;
149        }
150    }
151    *context = (worker_pool_context_t *) me;
152    return AEE_SUCCESS;
153}
154
155AEEResult worker_pool_init(worker_pool_context_t * context, uint32_t n_threads) {
156    return worker_pool_init_with_stack_size(context, n_threads, WORKER_THREAD_STACK_SZ);
157}
158
159// clean up worker pool
160void worker_pool_release(worker_pool_context_t * context) {
161    worker_pool_t * me = (worker_pool_t *) *context;
162
163    // if no worker pool exists, return error.
164    if (NULL == me) {
165        return;
166    }
167
168    atomic_store(&me->killed, 1);
169    atomic_fetch_add(&me->seqn, 1);
170    qurt_futex_wake(&me->seqn, me->n_threads);
171
172    // de-initializations
173    for (unsigned int i = 0; i < me->n_threads; i++) {
174        if (me->thread[i]) {
175            int status;
176            (void) qurt_thread_join(me->thread[i], &status);
177        }
178    }
179
180    // free allocated memory (were allocated as a single buffer starting at stack[0])
181    if (me->stack[0]) {
182        free(me->stack[0]);
183    }
184
185    *context = NULL;
186}
187
188// run jobs
189AEEResult worker_pool_run_jobs(worker_pool_context_t context, worker_pool_job_t * job, unsigned int n) {
190    worker_pool_t * me = (worker_pool_t *) context;
191    if (NULL == me) {
192        FARF(ERROR, "worker-pool: invalid context");
193        return AEE_EBADPARM;
194    }
195
196    if (n > me->n_threads) {
197        FARF(ERROR, "worker-pool: invalid number of jobs %u for n-threads %u", n, me->n_threads);
198        return AEE_EBADPARM;
199    }
200
201    memcpy(me->job, job, sizeof(worker_pool_job_t) * n);
202
203    if (n > 1) {
204        atomic_store(&me->next_job, 1);
205        atomic_store(&me->n_jobs, n);
206        atomic_store(&me->n_pending, n - 1);
207
208        // wake up workers
209        atomic_fetch_add(&me->seqn, 1);
210        qurt_futex_wake(&me->seqn, n - 1);
211    }
212
213    // main thread runs job #0
214    me->job[0].func(n, 0, me->job[0].data);
215
216    if (n > 1) {
217        while (atomic_load(&me->n_pending))
218            ;
219    }
220
221    return 0;
222}
223
224// run func
225AEEResult worker_pool_run_func(worker_pool_context_t context, worker_callback_t func, void * data, unsigned int n) {
226    worker_pool_job_t job[n];
227
228    for (unsigned int i = 0; i < n; i++) {
229        job[i].func = func;
230        job[i].data = data;
231    }
232
233    return worker_pool_run_jobs(context, job, n);
234}
235
236AEEResult worker_pool_set_thread_priority(worker_pool_context_t context, unsigned int prio) {
237    worker_pool_t * me = (worker_pool_t *) context;
238
239    // if no worker pool exists, return error.
240    if (!me) {
241        return AEE_ENOMORE;
242    }
243
244    int result = AEE_SUCCESS;
245    if (prio < 1) {
246        prio = 1;
247    }
248    if (prio > LOWEST_USABLE_QURT_PRIO) {
249        prio = LOWEST_USABLE_QURT_PRIO;
250    }
251
252    for (unsigned int i = 0; i < me->n_threads; i++) {
253        int res = qurt_thread_set_priority(me->thread[i], (unsigned short) prio);
254        if (0 != res) {
255            result = AEE_EBADPARM;
256            FARF(ERROR, "QURT failed to set priority of thread %d, ERROR = %d", me->thread[i], res);
257        }
258    }
259
260    return result;
261}
262
263AEEResult worker_pool_retrieve_thread_id(worker_pool_context_t context, unsigned int * tids) {
264    worker_pool_t * me = (worker_pool_t *) context;
265    if (!me) {
266        FARF(ERROR, "worker-pool: invalid context");
267        return AEE_EBADPARM;
268        ;
269    }
270
271    for (int i = 0; i < me->n_threads; i++) {
272        tids[i] = me->thread[i];
273    }
274
275    return AEE_SUCCESS;
276}
277
278AEEResult worker_pool_get_thread_priority(worker_pool_context_t context, unsigned int * prio) {
279    worker_pool_t * me = (worker_pool_t *) context;
280    if (!me) {
281        FARF(ERROR, "worker-pool: invalid context");
282        return AEE_EBADPARM;
283    }
284
285    int priority = qurt_thread_get_priority(me->thread[0]);
286    if (priority > 0) {
287        *prio = priority;
288        return 0;
289    } else {
290        *prio = 0;
291        return AEE_EBADSTATE;
292    }
293}