DTS Application Library  0.2.3
Application library containing referenced objects and interfaces to common libraries
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Groups Pages
thread.c
Go to the documentation of this file.
1 /*
2 Copyright (C) 2012 Gregory Nietsky <gregory@distrotetch.co.za>
3  http://www.distrotech.co.za
4 
5 This program is free software: you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation, either version 3 of the License, or
8 (at your option) any later version.
9 
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14 
15 You should have received a copy of the GNU General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
18 
27 #include <pthread.h>
28 #include <signal.h>
29 #include <unistd.h>
30 #include <stdint.h>
31 
32 #include "include/dtsapp.h"
33 
36 enum threadopt {
38  TL_THREAD_NONE = 1 << 0,
40  TL_THREAD_RUN = 1 << 1,
42  TL_THREAD_DONE = 1 << 2,
45  TL_THREAD_JOIN = 1 << 3,
48  TL_THREAD_STOP = 1 << 4,
49 
53  TL_THREAD_JOINABLE = 1 << 17,
54  TL_THREAD_RETURN = 1 << 18
55 };
56 
58 struct thread_pvt {
60  void *data;
62  pthread_t thr;
75 };
76 
80  struct bucket_list *list;
83 };
84 
86 struct threadcontainer *threads = NULL;
87 
93 
94 static int32_t hash_thread(const void *data, int key) {
95  const struct thread_pvt *thread = data;
96  const pthread_t *th = (key) ? data : &thread->thr;
97  return jenhash(th, sizeof(pthread_t), 0);
98 }
99 
100 static void close_threads(void *data) {
101  struct threadcontainer *tc = data;
102 
103  if (tc->list) {
104  objunref(tc->list);
105  }
106 
107  if (tc->manager) {
108  objunref(tc->manager);
109  tc->manager = NULL;
110  }
111  threads = NULL;
112 }
113 
114 static void free_thread(void *data) {
115  struct thread_pvt *thread = data;
116 
117  if (thread->data) {
118  objunref(thread->data);
119  }
120 }
121 
122 static struct thread_pvt *get_thread_from_id() {
123  struct thread_pvt *thr;
124  struct threadcontainer *tc;
125  pthread_t me;
126 
127  if (!(tc = (objref(threads)) ? threads : NULL)) {
128  return NULL;
129  }
130  me = pthread_self();
131 
132  objlock(tc);
133  thr = bucket_list_find_key(tc->list, &me);
134  objunlock(tc);
135  objunref(tc);
136  return thr;
137 }
138 
139 
143 extern int framework_threadok() {
144  struct thread_pvt *thr;
145  int ret;
146 
147  thr = get_thread_from_id();
148  ret =(thr) ? testflag(thr, TL_THREAD_RUN) : 0;
149  objunref(thr);
150 
151  return ret;
152 }
153 
154 /*
155  * close all threads when we get SIGHUP
156  */
157 static int manager_sig(int sig, void *data) {
158 #ifndef __WIN32
159  struct thread_pvt *thread;
160 
161  if (!(thread = get_thread_from_id())) {
162  return 0;
163  }
164 
165 
166  switch(sig) {
167  case SIGHUP:
168  clearflag(thread, TL_THREAD_RUN);
169  break;
170  case SIGINT:
171  case SIGTERM:
172  clearflag(thread, TL_THREAD_RUN);
173  setflag(thread, TL_THREAD_STOP);
174  }
175  objunref(thread);
176  return 1;
177 #else
178  return 0;
179 #endif
180 }
181 
182 /* if im here im the last thread*/
183 static void manage_clean(void *data) {
184 
185  /*make sure im still here when turning off*/
186  objlock(threads);
187  thread_can_start = 0;
188  objunlock(threads);
189  objunref(threads);
190 }
191 
192 static void stop_threads(void *data, void *data2) {
193  struct thread_pvt *thread = data;
194  struct thread_pvt *man = data2;
195 
196  /*Dont footbullet*/
197  if (!pthread_equal(man->thr, thread->thr)) {
198  if (thread->sighandler) {
199  pthread_kill(thread->thr, SIGTERM);
200  }
201  if (testflag(thread, TL_THREAD_CAN_CANCEL) && testflag(thread, TL_THREAD_RUN)) {
202  pthread_cancel(thread->thr);
203  }
204  clearflag(thread, TL_THREAD_RUN);
205  }
206 }
207 
208 /*
209  * loop through all threads till they stoped
210  * setting stop will flag threads to stop
211  */
212 static void *managethread(void *data) {
213  struct thread_pvt *thread;
214  int last = 0;
215 
216  if (!(thread = get_thread_from_id())) {
217  return NULL;
218  }
219 
220  for(;;) {
221  /*if im the last one leave this is done locked to make sure no items are added/removed*/
222  objlock(threads);
223  if (!(bucket_list_cnt(threads->list) - last)) {
224  if (threads->manager) {
225  objunref(threads->manager);
226  threads->manager = NULL;
227  }
228  objunlock(threads);
229  objunref(thread);
230  break;
231  }
232  objunlock(threads);
233 
234  /* Ive been joined so i can leave when im alone*/
235  if (testflag(thread, TL_THREAD_JOIN)) {
236  clearflag(thread, TL_THREAD_JOIN);
237  last = 1;
238  }
239 
240  /*Cancel all running threads*/
241  if (testflag(thread, TL_THREAD_STOP)) {
242  clearflag(thread, TL_THREAD_STOP);
243  /* Stop any more threads*/
244  objlock(threads);
245  if (threads->manager) {
246  objunref(threads->manager);
247  threads->manager = NULL;
248  }
249  objunlock(threads);
250 
251  /* cancel all threads now that they stoped*/
252  bucketlist_callback(threads->list, stop_threads, thread);
253  last = 1;
254  }
255 #ifdef __WIN32__
256  Sleep(1000);
257 #else
258  sleep(1);
259 #endif
260  }
261  return NULL;
262 }
263 
268 extern int startthreads(void) {
269  struct threadcontainer *tc;
270 
271  tc = (objref(threads)) ? threads : NULL;
272 
273  if (tc) {
274  objunref(tc);
275  return 1;
276  }
277 
278  if (!(tc = objalloc(sizeof(*threads), close_threads))) {
279  return 0;
280  }
281 
282  if (!tc->list && !(tc->list = create_bucketlist(4, hash_thread))) {
283  objunref(tc);
284  return 0;
285  }
286 
287  threads = tc;
288  if (!(tc->manager = framework_mkthread(managethread, manage_clean, manager_sig, NULL, THREAD_OPTION_JOINABLE | THREAD_OPTION_RETURN))) {
289  objunref(tc);
290  return 0;
291  }
292 
293  return 1;
294 }
295 
303 extern void stopthreads(int join) {
304  struct threadcontainer *tc;
305 
306  tc = (objref(threads)) ? threads : NULL;
307  if (!tc) {
308  return;
309  }
310 
311  objlock(tc);
312  if (tc->manager) {
314  if (join) {
316  objunlock(tc);
317  pthread_join(tc->manager->thr, NULL);
318  } else {
319  objunlock(tc);
320  }
321  } else {
322  objunlock(tc);
323  }
324  objunlock(tc);
325  objunref(tc);
326 }
327 
328 static void thread_cleanup(void *data) {
329  struct thread_pvt *thread = data;
330 
331  /*remove from thread list manager unrefs threads in cleanup run 1st*/
332  remove_bucket_item(threads->list, thread);
333 
334  /*Run cleanup*/
335  clearflag(thread, TL_THREAD_RUN);
336  setflag(thread, TL_THREAD_DONE);
337  if (thread->cleanup) {
338  thread->cleanup(thread->data);
339  }
340 
341  /*remove thread reference*/
342  objunref(thread);
343 }
344 
345 static void *threadwrap(void *data) {
346  struct thread_pvt *thread = data;
347  void *ret = NULL;
348  int cnt;
349 
350  objref(thread);
351 
352  for(cnt = 0;!testflag(thread, TL_THREAD_RUN) && (cnt < 100); cnt++) {
353  usleep(1000);
354  }
355 
356  if (cnt == 100) {
357  return NULL;
358  }
359 
360  pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL);
361  if (!testflag(thread, TL_THREAD_CAN_CANCEL)) {
362  pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
363  }
364 
365  if (!testflag(thread, TL_THREAD_JOINABLE)) {
366  pthread_detach(thread->thr);
367  }
368 
369  pthread_cleanup_push(thread_cleanup, thread);
370  ret = thread->func(thread->data);
371  pthread_cleanup_pop(1);
372 
373  return (ret);
374 }
375 
387 extern struct thread_pvt *framework_mkthread(threadfunc func, threadcleanup cleanup, threadsighandler sig_handler, void *data, int flags) {
388  struct thread_pvt *thread;
389  struct threadcontainer *tc = NULL;
390 
391  /*Grab a reference for threads in this scope start up if we can*/
392  if (!(tc = (objref(threads)) ? threads : NULL)) {
393  if (!thread_can_start) {
394  return NULL;
395  } else if (!startthreads()) {
396  return NULL;
397  }
398  if (!(tc = (objref(threads)) ? threads : NULL)) {
399  return NULL;
400  }
401  }
402 
403  objlock(tc);
404  /* dont allow threads if no manager or it not started*/
405  if ((!tc->manager || !func) && (func != managethread)) {
406  /*im shuting down*/
407  objunlock(tc);
408  objunref(tc);
409  return NULL;
410  } else if (!(thread = objalloc(sizeof(*thread), free_thread))) {
411  /* could not create*/
412  objunlock(tc);
413  objunref(tc);
414  return NULL;
415  }
416 
417  thread->data = (objref(data)) ? data : NULL;
418  thread->flags = flags << 16;
419  thread->cleanup = cleanup;
420  thread->sighandler = sig_handler;
421  thread->func = func;
422  objunlock(tc);
423 
424  /* start thread and check it*/
425  if (pthread_create(&thread->thr, NULL, threadwrap, thread) || pthread_kill(thread->thr, 0)) {
426  objunref(thread);
427  objunref(tc);
428  return NULL;
429  }
430 
431  /*Activate the thread it needs to be flaged to run or it will die*/
432  objlock(tc);
433  addtobucket(tc->list, thread);
434  setflag(thread, TL_THREAD_RUN);
435  objunlock(tc);
436  objunref(tc);
437 
438  if (testflag(thread, TL_THREAD_RETURN)) {
439  return thread;
440  } else {
441  objunref(thread);
442  return NULL;
443  }
444 }
445 
450 extern void jointhreads(void) {
451  struct threadcontainer *tc;
452 
453  tc = (objref(threads)) ? threads : NULL;
454  if (!tc) {
455  return;
456  }
457 
458  objlock(tc);
459  if (tc->manager) {
461  objunlock(tc);
462  pthread_join(tc->manager->thr, NULL);
463  } else {
464  objunlock(tc);
465  }
466  objunref(tc);
467 }
468 
469 
470 #ifndef __WIN32
471 static int handle_thread_signal(struct thread_pvt *thread, int sig) {
472  int ret;
473 
474  if (thread->sighandler) {
475  thread->sighandler(sig, thread->data);
476  ret = 1;
477  } else {
478  ret = -1;
479  }
480  return ret;
481 }
482 #endif
483 
496 extern int thread_signal(int sig) {
497  int ret = 0;
498 #ifndef __WIN32
499  struct thread_pvt *thread = NULL;
500 
501  if (!(thread = get_thread_from_id())) {
502  return 0;
503  }
504 
505  switch(sig) {
506  case SIGUSR1:
507  case SIGUSR2:
508  case SIGHUP:
509  case SIGALRM:
510  ret = handle_thread_signal(thread, sig);
511  break;
512  case SIGINT:
513  case SIGTERM:
514  ret = handle_thread_signal(thread, sig);
515  }
516  objunref(thread);
517 #endif
518  return ret;
519 }
520 
threadcleanup cleanup
Thread cleanup callback.
Definition: thread.c:65
thread struct used to create threads data needs to be first element
Definition: thread.c:58
void jointhreads(void)
Join the manager thread.
Definition: thread.c:450
void * create_bucketlist(int bitmask, blisthash hash_function)
Definition: refobj.c:356
Quit when only manager is left.
Definition: thread.c:48
int objref(void *data)
Reference a object.
Definition: refobj.c:153
Create the the thread joinable only do this if you will be joining it cancelable threads are best det...
Definition: dtsapp.h:122
Global threads data.
Definition: thread.c:78
#define testflag(obj, flag)
Atomically test a flag in the flags field of a referenced object.
Definition: dtsapp.h:932
void *(* threadfunc)(void *)
Thread function.
Definition: dtsapp.h:245
void stopthreads(int join)
Signal manager to stop and cancel all running threads.
Definition: thread.c:303
int objlock(void *data)
Lock the reference.
Definition: refobj.c:269
threadsighandler sighandler
Thread signal handler.
Definition: thread.c:71
threadfunc func
Thread function.
Definition: thread.c:68
int thread_can_start
Automatically start manager thread.
Definition: thread.c:92
Quit when only manager is left.
Definition: thread.c:45
void * objalloc(int size, objdestroy)
Allocate a referenced lockable object.
Definition: refobj.c:129
No status.
Definition: thread.c:38
int startthreads(void)
Initialise the threadlist and start manager thread.
Definition: thread.c:268
int bucket_list_cnt(struct bucket_list *blist)
Return number of items in the list.
Definition: refobj.c:552
#define setflag(obj, flag)
Atomically set a flag in the flags field of a referenced object.
Definition: dtsapp.h:925
#define clearflag(obj, flag)
Atomically clear a flag in the flags field of a referenced object.
Definition: dtsapp.h:918
DTS Application library API Include file.
Flag to enable pthread_cancel calls.
Definition: thread.c:51
struct thread_pvt * framework_mkthread(threadfunc, threadcleanup, threadsighandler, void *data, int flags)
create a thread result must be unreferenced
Definition: thread.c:387
Return reference to thread this must be unreferenced.
Definition: dtsapp.h:124
struct bucket_list * list
Hashed bucket list of threads.
Definition: thread.c:80
void * data
Reference to data held on thread creation.
Definition: thread.c:60
int thread_signal(int sig)
Handle signal if its for me.
Definition: thread.c:496
enum threadopt flags
thread options
Definition: thread.c:74
pthread_t thr
Thread information.
Definition: thread.c:62
int objunlock(void *data)
Unlock a reference.
Definition: refobj.c:301
int framework_threadok(void)
let threads check there status.
Definition: thread.c:143
thread is marked as complete
Definition: thread.c:42
thread is marked as running
Definition: thread.c:40
void(* threadcleanup)(void *)
Function called after thread termination.
Definition: dtsapp.h:238
struct threadcontainer * threads
Thread control data.
Definition: thread.c:86
threadopt
Thread status a thread can be disabled by unsetting TL_THREAD_RUN.
Definition: thread.c:36
int(* threadsighandler)(int, void *)
Thread signal handler function.
Definition: dtsapp.h:252
struct thread_pvt * manager
Manager thread.
Definition: thread.c:82
void bucketlist_callback(struct bucket_list *blist, blist_cb callback, void *data2)
Run a callback function on all items in the list.
Definition: refobj.c:613
#define jenhash(key, length, initval)
Define jenhash as hashlittle on big endian it should be hashbig.
Definition: dtsapp.h:914
Flag to enable pthread_cancel calls.
Definition: thread.c:53
int addtobucket(struct bucket_list *blist, void *data)
Add a reference to the bucketlist.
Definition: refobj.c:428
void * bucket_list_find_key(struct bucket_list *list, const void *key)
Find and return a reference to a item matching supplied key.
Definition: refobj.c:572
void remove_bucket_item(struct bucket_list *blist, void *data)
Remove and unreference a item from the list.
Definition: refobj.c:517
int objunref(void *data)
Drop reference held.
Definition: refobj.c:184
Bucket list, hold hashed objects in buckets.
Definition: refobj.c:75