blob: 382a9c744ea5c12927ca806ea0f4a4c077d86c15 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
|
#include "tpool.h"
#include <stdio.h>
#include <stdlib.h>
// One entry of the pool's pending-job queue, kept as a singly linked list.
// Nodes are allocated in tp_add_job and freed by the worker that ran the job.
typedef struct ThreadPoolJobNode {
ThreadPoolJob job; // The work item: a function pointer and its argument
struct ThreadPoolJobNode *next; // Next queued node, or NULL for the last one
} ThreadPoolJobNode;
// Thread pool state. The functions in this file read and modify the queue,
// the counters and the stop flag only while holding `lock`.
struct ThreadPool {
pthread_mutex_t lock; // Guards the queue, the job counters and `stop`
pthread_cond_t notify; // Signalled when a job is queued; broadcast on shutdown
pthread_cond_t working_cond; // Signalled by a worker when nothing is running or queued
pthread_t *threads; // Array of worker thread handles
int num_threads; // Number of entries in `threads`
ThreadPoolJobNode *queue_head; // Oldest pending job (next to be run), NULL if empty
ThreadPoolJobNode *queue_tail; // Newest pending job, NULL if the queue is empty
int active_jobs; // Jobs currently being executed by workers
int queued_jobs; // Jobs waiting in the queue
bool stop; // Set by tp_destroy; workers exit once the queue is empty
};
/**
 * Worker thread entry point.
 *
 * Repeatedly removes the job at the head of the pool's queue and runs it.
 * The thread exits once the pool's stop flag is set and the queue is empty,
 * so jobs that were queued before shutdown are still executed.
 *
 * @param arg  The ThreadPool this worker serves (as passed to pthread_create).
 * @return     Always NULL.
 */
static void *tp_worker(void *arg) {
    ThreadPool *pool = arg;

    for (;;) {
        pthread_mutex_lock(&pool->lock);

        // Sleep until there is work or shutdown is requested. The predicate
        // is re-checked in a loop to tolerate spurious wakeups.
        while (pool->queue_head == NULL && !pool->stop) {
            pthread_cond_wait(&pool->notify, &pool->lock);
        }

        if (pool->stop && pool->queue_head == NULL) {
            pthread_mutex_unlock(&pool->lock);
            break;
        }

        // Detach the head node while still holding the lock.
        ThreadPoolJobNode *node = pool->queue_head;
        pool->queue_head = node->next;
        if (pool->queue_head == NULL) {
            pool->queue_tail = NULL;
        }
        pool->queued_jobs--;
        pool->active_jobs++;
        pthread_mutex_unlock(&pool->lock);

        // Run the job without holding the lock so other workers can proceed.
        if (node->job.function) {
            node->job.function(node->job.arg);
        }
        free(node);

        pthread_mutex_lock(&pool->lock);
        pool->active_jobs--;
        if (pool->active_jobs == 0 && pool->queue_head == NULL) {
            // Broadcast rather than signal: more than one thread may be
            // blocked in tp_wait, and every one of them must be released.
            pthread_cond_broadcast(&pool->working_cond);
        }
        pthread_mutex_unlock(&pool->lock);
    }

    return NULL;
}
/**
 * Create a thread pool and start its worker threads.
 *
 * @param num_threads  Number of worker threads to start; must be positive.
 * @return A new pool, or NULL if num_threads is not positive, an allocation
 *         fails, or a synchronization object or worker thread cannot be
 *         created. On failure every resource acquired so far is released.
 *         A pool returned here is released with tp_destroy.
 */
ThreadPool *tp_create(int num_threads) {
    // A pool without workers would never run a job, and tp_wait would block
    // forever once anything is queued.
    if (num_threads <= 0) {
        return NULL;
    }

    ThreadPool *pool = malloc(sizeof *pool);
    if (pool == NULL) {
        return NULL;
    }

    int started = 0; // Workers successfully created so far

    pool->num_threads = num_threads;
    pool->queue_head = NULL;
    pool->queue_tail = NULL;
    pool->active_jobs = 0;
    pool->queued_jobs = 0;
    pool->stop = false;

    // calloc checks the count * size multiplication for overflow.
    pool->threads = calloc((size_t)num_threads, sizeof *pool->threads);
    if (pool->threads == NULL) {
        goto fail_pool;
    }

    if (pthread_mutex_init(&pool->lock, NULL) != 0) {
        goto fail_threads;
    }
    if (pthread_cond_init(&pool->notify, NULL) != 0) {
        goto fail_lock;
    }
    if (pthread_cond_init(&pool->working_cond, NULL) != 0) {
        goto fail_notify;
    }

    for (int i = 0; i < num_threads; i++) {
        if (pthread_create(&pool->threads[i], NULL, tp_worker, pool) != 0) {
            goto fail_workers;
        }
        started++;
    }

    return pool;

fail_workers:
    // Ask the workers that did start to exit, then wait for them before
    // tearing down the objects they use.
    pthread_mutex_lock(&pool->lock);
    pool->stop = true;
    pthread_cond_broadcast(&pool->notify);
    pthread_mutex_unlock(&pool->lock);
    for (int i = 0; i < started; i++) {
        pthread_join(pool->threads[i], NULL);
    }
    pthread_cond_destroy(&pool->working_cond);
fail_notify:
    pthread_cond_destroy(&pool->notify);
fail_lock:
    pthread_mutex_destroy(&pool->lock);
fail_threads:
    free(pool->threads);
fail_pool:
    free(pool);
    return NULL;
}
/**
 * Append a job to the tail of the pool's queue and wake one waiting worker.
 *
 * If the queue node cannot be allocated, the error is reported with perror
 * and the process exits with EXIT_FAILURE.
 *
 * @param pool      Pool that receives the job.
 * @param function  Function a worker will call to run the job.
 * @param arg       Argument passed to `function`.
 */
void tp_add_job(ThreadPool *pool, thread_func_t function, void *arg) {
    ThreadPoolJobNode *entry = malloc(sizeof *entry);
    if (!entry) {
        perror("malloc");
        exit(EXIT_FAILURE);
    }

    entry->next = NULL;
    entry->job.arg = arg;
    entry->job.function = function;

    pthread_mutex_lock(&pool->lock);

    // An empty queue has no tail, so the new entry becomes the head too.
    if (pool->queue_tail == NULL) {
        pool->queue_head = entry;
    } else {
        pool->queue_tail->next = entry;
    }
    pool->queue_tail = entry;
    pool->queued_jobs += 1;

    pthread_cond_signal(&pool->notify);
    pthread_mutex_unlock(&pool->lock);
}
/**
 * Block the caller until the pool has no running jobs and an empty queue.
 *
 * @param pool  Pool to wait on.
 */
void tp_wait(ThreadPool *pool) {
    pthread_mutex_lock(&pool->lock);

    for (;;) {
        const bool idle = (pool->active_jobs <= 0) && (pool->queue_head == NULL);
        if (idle) {
            break;
        }
        // Releases the lock while sleeping; the condition is re-evaluated
        // after every wakeup.
        pthread_cond_wait(&pool->working_cond, &pool->lock);
    }

    pthread_mutex_unlock(&pool->lock);
}
/**
 * Shut the pool down and release everything it owns.
 *
 * Sets the stop flag, wakes all workers and joins them. Workers finish the
 * jobs that are already queued before they exit. The pool pointer is invalid
 * after this call.
 *
 * @param pool  Pool to destroy; NULL is accepted and ignored.
 */
void tp_destroy(ThreadPool *pool) {
    if (pool == NULL) {
        return;
    }

    pthread_mutex_lock(&pool->lock);
    pool->stop = true;
    pthread_cond_broadcast(&pool->notify);
    pthread_mutex_unlock(&pool->lock);

    for (int i = 0; i < pool->num_threads; i++) {
        pthread_join(pool->threads[i], NULL);
    }

    // Workers only exit with an empty queue, so nodes can be left here only
    // when no worker was available to run them (for example a pool with zero
    // threads). Free them so they do not leak; their jobs are not run.
    ThreadPoolJobNode *node = pool->queue_head;
    while (node != NULL) {
        ThreadPoolJobNode *next = node->next;
        free(node);
        node = next;
    }
    pool->queue_head = NULL;
    pool->queue_tail = NULL;

    free(pool->threads);
    pthread_mutex_destroy(&pool->lock);
    pthread_cond_destroy(&pool->notify);
    pthread_cond_destroy(&pool->working_cond);
    free(pool);
}
|