1#include "worker-pool.h"
2
3#include <qurt.h>
4#include <stdatomic.h>
5#include <stdint.h>
6#include <stdio.h>
7#include <stdlib.h>
8#include <string.h>
9
10#include "HAP_farf.h"
11
12#define WORKER_THREAD_STACK_SZ (2 * 16384)
13#define LOWEST_USABLE_QURT_PRIO (254)
14
// Forward declaration so worker_context_t can point back at its owning pool.
struct worker_pool_s;

// Per-worker context. One instance per worker lives in worker_pool_t::context[]
// and its address is passed as the thread argument to worker_pool_main().
typedef struct {
    struct worker_pool_s * pool; // pool this worker belongs to
    unsigned int id;             // worker index within the pool (0 .. n_threads-1)
} worker_context_t;
22
// Shared state of one worker pool instance. worker_pool_init_with_stack_size()
// places it at the end of the heap buffer that also holds the worker stacks.
typedef struct worker_pool_s {
    worker_pool_job_t job[MAX_NUM_WORKERS];     // job descriptors of the current batch
    qurt_thread_t thread[MAX_NUM_WORKERS];      // QuRT thread IDs of the workers (0 = not started)
    worker_context_t context[MAX_NUM_WORKERS];  // per-worker contexts passed to worker_pool_main()
    void * stack[MAX_NUM_WORKERS];              // per-worker stack base addresses
    unsigned int n_threads;                     // number of workers in this pool

    atomic_uint seqn;       // bumped to announce a new batch; futex word the workers wait on
    atomic_uint next_job;   // next job index a worker may claim
    atomic_uint n_pending;  // jobs handed to workers that have not completed yet
    atomic_uint n_jobs;     // total number of jobs in the current batch
    atomic_bool killed;     // set by worker_pool_release() to make the workers exit
} worker_pool_t;
37
// Worker thread entry point.
//
// Each worker sleeps on the pool's seqn futex. Whenever seqn changes it tries
// to claim one job index through next_job. The thread calling
// worker_pool_run_jobs() runs job #0 itself, so claimed indices start at 1.
// A worker claims at most one job per observed seqn change. The loop exits
// once pool->killed is set.
//
// NOTE(review): a worker that observed a new seqn but is delayed before its
// fetch_add can end up reading n_jobs/next_job of the *following* batch.
// Confirm this interleaving cannot occur for the callers, or tighten the
// handshake.
static void worker_pool_main(void * context) {
    worker_context_t * me = (worker_context_t *) context;
    worker_pool_t * pool = me->pool;

    FARF(HIGH, "worker-pool: thread %u started", me->id);

    unsigned int prev_seqn = 0;
    while (!atomic_load(&pool->killed)) {
        unsigned int seqn = atomic_load(&pool->seqn);
        if (seqn == prev_seqn) {
            // Nothing to do: wait on the seqn futex (expected value prev_seqn)
            qurt_futex_wait(&pool->seqn, prev_seqn);
            continue;
        }

        // New job batch: remember its seqn so this worker claims at most one job from it
        prev_seqn = seqn;

        unsigned int n = atomic_load(&pool->n_jobs);
        // Claim the next job index; an index >= n means all jobs are taken
        unsigned int i = atomic_fetch_add(&pool->next_job, 1);
        if (i >= n) {
            // Spurious wakeup (or more workers than jobs in this batch)
            continue;
        }

        pool->job[i].func(n, i, pool->job[i].data);

        // Report completion to the thread waiting in worker_pool_run_jobs()
        atomic_fetch_sub(&pool->n_pending, 1);
    }

    FARF(HIGH, "worker-pool: thread %u stopped", me->id);
}
70
71AEEResult worker_pool_init_with_stack_size(worker_pool_context_t * context, uint32_t n_threads, uint32_t stack_size) {
72 int err = 0;
73
74 if (NULL == context) {
75 FARF(ERROR, "NULL context passed to worker_pool_init().");
76 return AEE_EBADPARM;
77 }
78
79 // Allocations
80 int size = (stack_size * n_threads) + (sizeof(worker_pool_t));
81
82 unsigned char * mem_blob = (unsigned char *) malloc(size);
83 if (!mem_blob) {
84 FARF(ERROR, "Could not allocate memory for worker pool!!");
85 return AEE_ENOMEMORY;
86 }
87
88 worker_pool_t * me = (worker_pool_t *) (mem_blob + stack_size * n_threads);
89
90 // name for the first worker, useful in debugging threads
91 char name[19];
92 snprintf(name, 12, "0x%8x:", (int) me);
93 strcat(name, "worker0");
94 me->n_threads = n_threads;
95
96 // initializations
97 for (unsigned int i = 0; i < me->n_threads; i++) {
98 me->stack[i] = NULL;
99 me->thread[i] = 0;
100
101 me->context[i].id = i;
102 me->context[i].pool = me;
103 }
104
105 // initialize job queue
106 me->n_pending = 0;
107 me->n_jobs = 0;
108 me->next_job = 0;
109 me->seqn = 0;
110 me->killed = 0;
111
112 // launch the workers
113 qurt_thread_attr_t attr;
114 qurt_thread_attr_init(&attr);
115
116 for (unsigned int i = 0; i < me->n_threads; i++) {
117 // set up stack
118 me->stack[i] = mem_blob;
119 mem_blob += stack_size;
120 qurt_thread_attr_set_stack_addr(&attr, me->stack[i]);
121 qurt_thread_attr_set_stack_size(&attr, stack_size);
122
123 // set up name
124 qurt_thread_attr_set_name(&attr, name);
125 name[17] = (name[17] + 1);
126 // name threads context:worker0, context:worker1, .. (recycle at 9, but num threads should be less than that anyway)
127 if (name[17] > '9') {
128 name[17] = '0';
129 }
130
131 // set up priority - by default, match the creating thread's prio
132 int prio = qurt_thread_get_priority(qurt_thread_get_id());
133
134 if (prio < 1) {
135 prio = 1;
136 }
137 if (prio > LOWEST_USABLE_QURT_PRIO) {
138 prio = LOWEST_USABLE_QURT_PRIO;
139 }
140
141 qurt_thread_attr_set_priority(&attr, prio);
142
143 // launch
144 err = qurt_thread_create(&me->thread[i], &attr, worker_pool_main, (void *) &me->context[i]);
145 if (err) {
146 FARF(ERROR, "Could not launch worker threads!");
147 worker_pool_release((worker_pool_context_t *) &me);
148 return AEE_EQURTTHREADCREATE;
149 }
150 }
151 *context = (worker_pool_context_t *) me;
152 return AEE_SUCCESS;
153}
154
155AEEResult worker_pool_init(worker_pool_context_t * context, uint32_t n_threads) {
156 return worker_pool_init_with_stack_size(context, n_threads, WORKER_THREAD_STACK_SZ);
157}
158
159// clean up worker pool
160void worker_pool_release(worker_pool_context_t * context) {
161 worker_pool_t * me = (worker_pool_t *) *context;
162
163 // if no worker pool exists, return error.
164 if (NULL == me) {
165 return;
166 }
167
168 atomic_store(&me->killed, 1);
169 atomic_fetch_add(&me->seqn, 1);
170 qurt_futex_wake(&me->seqn, me->n_threads);
171
172 // de-initializations
173 for (unsigned int i = 0; i < me->n_threads; i++) {
174 if (me->thread[i]) {
175 int status;
176 (void) qurt_thread_join(me->thread[i], &status);
177 }
178 }
179
180 // free allocated memory (were allocated as a single buffer starting at stack[0])
181 if (me->stack[0]) {
182 free(me->stack[0]);
183 }
184
185 *context = NULL;
186}
187
188// run jobs
189AEEResult worker_pool_run_jobs(worker_pool_context_t context, worker_pool_job_t * job, unsigned int n) {
190 worker_pool_t * me = (worker_pool_t *) context;
191 if (NULL == me) {
192 FARF(ERROR, "worker-pool: invalid context");
193 return AEE_EBADPARM;
194 }
195
196 if (n > me->n_threads) {
197 FARF(ERROR, "worker-pool: invalid number of jobs %u for n-threads %u", n, me->n_threads);
198 return AEE_EBADPARM;
199 }
200
201 memcpy(me->job, job, sizeof(worker_pool_job_t) * n);
202
203 if (n > 1) {
204 atomic_store(&me->next_job, 1);
205 atomic_store(&me->n_jobs, n);
206 atomic_store(&me->n_pending, n - 1);
207
208 // wake up workers
209 atomic_fetch_add(&me->seqn, 1);
210 qurt_futex_wake(&me->seqn, n - 1);
211 }
212
213 // main thread runs job #0
214 me->job[0].func(n, 0, me->job[0].data);
215
216 if (n > 1) {
217 while (atomic_load(&me->n_pending))
218 ;
219 }
220
221 return 0;
222}
223
224// run func
225AEEResult worker_pool_run_func(worker_pool_context_t context, worker_callback_t func, void * data, unsigned int n) {
226 worker_pool_job_t job[n];
227
228 for (unsigned int i = 0; i < n; i++) {
229 job[i].func = func;
230 job[i].data = data;
231 }
232
233 return worker_pool_run_jobs(context, job, n);
234}
235
236AEEResult worker_pool_set_thread_priority(worker_pool_context_t context, unsigned int prio) {
237 worker_pool_t * me = (worker_pool_t *) context;
238
239 // if no worker pool exists, return error.
240 if (!me) {
241 return AEE_ENOMORE;
242 }
243
244 int result = AEE_SUCCESS;
245 if (prio < 1) {
246 prio = 1;
247 }
248 if (prio > LOWEST_USABLE_QURT_PRIO) {
249 prio = LOWEST_USABLE_QURT_PRIO;
250 }
251
252 for (unsigned int i = 0; i < me->n_threads; i++) {
253 int res = qurt_thread_set_priority(me->thread[i], (unsigned short) prio);
254 if (0 != res) {
255 result = AEE_EBADPARM;
256 FARF(ERROR, "QURT failed to set priority of thread %d, ERROR = %d", me->thread[i], res);
257 }
258 }
259
260 return result;
261}
262
263AEEResult worker_pool_retrieve_thread_id(worker_pool_context_t context, unsigned int * tids) {
264 worker_pool_t * me = (worker_pool_t *) context;
265 if (!me) {
266 FARF(ERROR, "worker-pool: invalid context");
267 return AEE_EBADPARM;
268 ;
269 }
270
271 for (int i = 0; i < me->n_threads; i++) {
272 tids[i] = me->thread[i];
273 }
274
275 return AEE_SUCCESS;
276}
277
278AEEResult worker_pool_get_thread_priority(worker_pool_context_t context, unsigned int * prio) {
279 worker_pool_t * me = (worker_pool_t *) context;
280 if (!me) {
281 FARF(ERROR, "worker-pool: invalid context");
282 return AEE_EBADPARM;
283 }
284
285 int priority = qurt_thread_get_priority(me->thread[0]);
286 if (priority > 0) {
287 *prio = priority;
288 return 0;
289 } else {
290 *prio = 0;
291 return AEE_EBADSTATE;
292 }
293}