swthrdpool.c 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. /************************************************************************
  2. * AUTHOR: NiuJiuRu
  3. * FILENAME: swthrdpool.c
  4. * CONTENT: 线程池
  5. * NOTE:
  6. * HISTORY:
  7. * 1, [2010-09-20] created by NiuJiuRu
  8. * 2, [2018-03-23] add "sw_thrdpool_create2()" function and redefine
  9. * "sw_thrdpool_create()" function
  10. ***********************************************************************/
  11. #include "swapi.h"
  12. #include "swmutex.h"
  13. #include "swmem.h"
  14. #include "swthrd.h"
  15. #include "swthrdpool.h"
  16. typedef struct
  17. {
  18. // 线程池中的线程
  19. void **hThrds;
  20. // 访问线程池的互斥锁
  21. void *hMutex;
  22. // 线程池的容量
  23. int maxNum;
  24. // 线程池中线程的优先级
  25. int priority;
  26. // 线程池中线程的堆栈大小
  27. int stackSize;
  28. // 被强行杀死的线程个数
  29. int killedNum;
  30. } SThrdPool;
  31. /* 创建线程池 */
  32. void *sw_thrdpool_create2(int max, int priority, int stack_size)
  33. {
  34. SThrdPool *tp = NULL;
  35. int size = 0;
  36. if(max <= 0) goto ErrorP;
  37. size = sizeof(SThrdPool) + max*sizeof(void *);
  38. tp = (SThrdPool *)sw_heap_malloc(size); // request memory
  39. if(!tp) goto ErrorP;
  40. memset(tp, 0, size);
  41. tp->hThrds = (void **)(tp+1);
  42. tp->hMutex = sw_mutex_create();
  43. if(!tp->hMutex) goto ErrorP;
  44. tp->maxNum = max;
  45. tp->priority = priority;
  46. tp->stackSize = stack_size;
  47. tp->killedNum = 0;
  48. return tp;
  49. ErrorP:
  50. sw_thrdpool_destroy(tp, 0);
  51. return NULL;
  52. }
  53. /* 销毁线程池, 超时前尝试安全的销毁, 超时后则强制销毁 */
  54. void sw_thrdpool_destroy(void *hPool, int timeout)
  55. {
  56. SThrdPool *tp = (SThrdPool *)hPool;
  57. int i = 0;
  58. if(!tp) return;
  59. for(i = 0; i < tp->maxNum; i++)
  60. {
  61. if(tp->hThrds[i]) { sw_thrd_destroy(tp->hThrds[i], timeout); tp->hThrds[i] = NULL; } // destroy thread
  62. }
  63. if(tp->hMutex) { sw_mutex_destroy(tp->hMutex); tp->hMutex = NULL; }
  64. sw_heap_free(tp);
  65. }
  66. /* 从线程池中分配一个线程, 成功后得到的线程默认处于暂停状态 */
  67. void *sw_thrdpool_alloc(void *hPool, PThrdProc proc, unsigned long wParam, unsigned long lParam, int timeout)
  68. {
  69. SThrdPool *tp = (SThrdPool *)hPool;
  70. void *hThrd = NULL; PThrdProc oldProc = NULL;
  71. char name[32] = { 0 }; int i = 0;
  72. if(!tp) return NULL;
  73. sw_mutex_lock(tp->hMutex, WAIT_FOREVER); // lock
  74. // 1, 寻找一个没有被分配出去并且已经退出(用户)回调函数的线程
  75. for(i = 0; i < tp->maxNum; i++)
  76. {
  77. if(!tp->hThrds[i]) continue;
  78. oldProc = NULL;
  79. sw_thrd_getProc(tp->hThrds[i], &oldProc, NULL, NULL);
  80. if(oldProc == NULL && sw_thrd_isBusy(tp->hThrds[i]) == false)
  81. {
  82. hThrd = tp->hThrds[i];
  83. if(!sw_thrd_isAlive(hThrd))
  84. {
  85. sw_thrd_destroy(hThrd, timeout); // destroy thread
  86. hThrd = tp->hThrds[i] = NULL;
  87. sprintf(name, "thread%d", i);
  88. hThrd = sw_thrd_create(name, tp->priority, tp->stackSize, proc, wParam, lParam);
  89. tp->hThrds[i] = hThrd;
  90. }
  91. else
  92. sw_thrd_setProc(hThrd, proc, wParam, lParam);
  93. break;
  94. }
  95. }
  96. // 2, 如果没有找到可用的线程, 并且该线程池未满, 则创建一个
  97. if(hThrd == NULL)
  98. {
  99. for(i = 0; i < tp->maxNum; i++)
  100. {
  101. if(tp->hThrds[i]) continue;
  102. sprintf(name, "thread%d", i);
  103. hThrd = sw_thrd_create(name, tp->priority, tp->stackSize, proc, wParam, lParam);
  104. tp->hThrds[i] = hThrd;
  105. break;
  106. }
  107. }
  108. // 3, 线程池满, 销毁一个还没有从(用户)回调函数中退出的可能已经异常的线程, 空出名额以创建新线程
  109. if(hThrd == NULL)
  110. {
  111. for(i = 0; i < tp->maxNum; i++)
  112. {
  113. if(!tp->hThrds[i]) continue;
  114. oldProc = NULL;
  115. sw_thrd_getProc(tp->hThrds[i], &oldProc, NULL, NULL);
  116. if(oldProc == NULL && sw_thrd_isBusy(tp->hThrds[i]) == true)
  117. {
  118. sw_thrd_destroy(tp->hThrds[i], timeout); // destroy thread
  119. tp->hThrds[i] = NULL;
  120. tp->killedNum += 1;
  121. sprintf(name, "thread%d", i);
  122. hThrd = sw_thrd_create(name, tp->priority, tp->stackSize, proc, wParam, lParam);
  123. tp->hThrds[i] = hThrd;
  124. break;
  125. }
  126. }
  127. }
  128. sw_mutex_unlock(tp->hMutex); // unlock
  129. //4, 没有分配到
  130. if(hThrd == NULL)
  131. {
  132. sw_log_fatal("sorry, alloc thread fail, because %p thread pool is full!!!", tp);
  133. sw_thrdpool_printf(tp);
  134. }
  135. return hThrd;
  136. }
  137. /* 从线程池中释放一个线程, 超时前尝试安全的关闭线程, 超时后则强制关闭线程 */
  138. void sw_thrdpool_free(void *hPool, void *hThrd, int timeout)
  139. {
  140. SThrdPool *tp = (SThrdPool *)hPool;
  141. int cnt = 0, i = 0;
  142. if(!tp || !hThrd) return;
  143. sw_mutex_lock(tp->hMutex, WAIT_FOREVER); // lock
  144. for(i = 0; i < tp->maxNum; i++)
  145. {
  146. if(tp->hThrds[i] != hThrd) continue;
  147. sw_thrd_pause(hThrd);
  148. sw_thrd_setProc(hThrd, NULL, 0, 0);
  149. while(cnt < timeout || timeout < 0)
  150. {
  151. if(sw_thrd_isBusy(hThrd))
  152. { // 等待线程的(用户)回调函数退出
  153. sw_mutex_unlock(tp->hMutex); // unlock
  154. sw_thrd_delay(1);
  155. sw_mutex_lock(tp->hMutex, WAIT_FOREVER); // lock
  156. cnt += 1;
  157. }
  158. else break;
  159. }
  160. if(sw_thrd_isBusy(hThrd))
  161. {
  162. sw_mutex_unlock(tp->hMutex); // unlock
  163. sw_thrd_destroy(hThrd, timeout); // destroy thread
  164. sw_mutex_lock(tp->hMutex, WAIT_FOREVER); // lock
  165. tp->hThrds[i] = NULL;
  166. tp->killedNum += 1;
  167. }
  168. break;
  169. }
  170. sw_mutex_unlock(tp->hMutex); // unlock
  171. }
  172. /* 获取线程池当前的线程分配情况 */
  173. void sw_thrdpool_get_status(void *hPool, int *max_num, int *alloc_num, int *idle_num, int *sick_num, int *killed_num)
  174. {
  175. SThrdPool *tp = (SThrdPool *)hPool;
  176. PThrdProc proc = NULL;
  177. int allocNum = 0, idleNum = 0, sickNum = 0, i = 0;
  178. if(!tp) return;
  179. sw_mutex_lock(tp->hMutex, WAIT_FOREVER); // lock
  180. for(i = 0; i < tp->maxNum; i++)
  181. {
  182. if(!tp->hThrds[i]) { idleNum += 1; continue; }
  183. proc = NULL;
  184. sw_thrd_getProc(tp->hThrds[i], &proc, NULL, NULL); // 得到线程的(用户)回调函数
  185. if(proc) allocNum += 1;
  186. else
  187. {
  188. if(sw_thrd_isBusy(tp->hThrds[i]) == true) sickNum += 1;
  189. else idleNum += 1;
  190. }
  191. }
  192. sw_mutex_unlock(tp->hMutex); // unlock
  193. if(max_num) *max_num = tp->maxNum;
  194. if(alloc_num) *alloc_num = allocNum;
  195. if(idle_num) *idle_num = idleNum;
  196. if(sick_num) *sick_num = sickNum;
  197. if(killed_num) *killed_num = tp->killedNum;
  198. }
  199. /* 打印线程池当前的线程分配情况 */
  200. void sw_thrdpool_printf(void *hPool)
  201. {
  202. SThrdPool *tp = (SThrdPool *)hPool;
  203. int maxNum = 0, allocNum = 0, idleNum = 0, sickNum = 0, killedNum= 0;
  204. if(!tp) return;
  205. sw_thrdpool_get_status(tp, &maxNum, &allocNum, &idleNum, &sickNum, &killedNum);
  206. sw_log_info("%p thread pool(%d): allocated=%d, idle=%d, sick=%d, killed=%d", \
  207. tp, maxNum, allocNum, idleNum, sickNum, killedNum);
  208. }