| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249 |
- /************************************************************************
- * AUTHOR: NiuJiuRu
- * FILENAME: swthrdpool.c
- * CONTENT: 线程池
- * NOTE:
- * HISTORY:
- * 1, [2010-09-20] created by NiuJiuRu
- * 2, [2018-03-23] add "sw_thrdpool_create2()" function and redefine
- * "sw_thrdpool_create()" function
- ***********************************************************************/
#include <limits.h>
#include <stdio.h>

#include "swapi.h"
#include "swmutex.h"
#include "swmem.h"
#include "swthrd.h"
#include "swthrdpool.h"
- typedef struct // Thread-pool control block; the slot array is carved out of the same allocation, right after this struct
- {
- // Per-slot thread handles (maxNum entries); a NULL entry is an empty slot
- void **hThrds;
- // Mutex guarding access to the pool
- void *hMutex;
- // Capacity of the pool (number of slots in hThrds)
- int maxNum;
- // Priority handed to sw_thrd_create() for every thread of this pool
- int priority;
- // Stack size handed to sw_thrd_create() for every thread of this pool
- int stackSize;
- // Number of threads that were destroyed while still busy (forcibly killed)
- int killedNum;
- } SThrdPool;
- /* 创建线程池 */
- void *sw_thrdpool_create2(int max, int priority, int stack_size)
- {
- SThrdPool *tp = NULL;
- int size = 0;
-
- if(max <= 0) goto ErrorP;
-
- size = sizeof(SThrdPool) + max*sizeof(void *);
- tp = (SThrdPool *)sw_heap_malloc(size); // request memory
- if(!tp) goto ErrorP;
-
- memset(tp, 0, size);
- tp->hThrds = (void **)(tp+1);
- tp->hMutex = sw_mutex_create();
- if(!tp->hMutex) goto ErrorP;
- tp->maxNum = max;
- tp->priority = priority;
- tp->stackSize = stack_size;
- tp->killedNum = 0;
-
- return tp;
-
- ErrorP:
- sw_thrdpool_destroy(tp, 0);
- return NULL;
- }
- /* 销毁线程池, 超时前尝试安全的销毁, 超时后则强制销毁 */
- void sw_thrdpool_destroy(void *hPool, int timeout)
- {
- SThrdPool *tp = (SThrdPool *)hPool;
- int i = 0;
-
- if(!tp) return;
-
- for(i = 0; i < tp->maxNum; i++)
- {
- if(tp->hThrds[i]) { sw_thrd_destroy(tp->hThrds[i], timeout); tp->hThrds[i] = NULL; } // destroy thread
- }
-
- if(tp->hMutex) { sw_mutex_destroy(tp->hMutex); tp->hMutex = NULL; }
-
- sw_heap_free(tp);
- }
- /* 从线程池中分配一个线程, 成功后得到的线程默认处于暂停状态 */
- void *sw_thrdpool_alloc(void *hPool, PThrdProc proc, unsigned long wParam, unsigned long lParam, int timeout)
- {
- SThrdPool *tp = (SThrdPool *)hPool;
- void *hThrd = NULL; PThrdProc oldProc = NULL;
- char name[32] = { 0 }; int i = 0;
-
- if(!tp) return NULL;
-
- sw_mutex_lock(tp->hMutex, WAIT_FOREVER); // lock
-
- // 1, 寻找一个没有被分配出去并且已经退出(用户)回调函数的线程
- for(i = 0; i < tp->maxNum; i++)
- {
- if(!tp->hThrds[i]) continue;
- oldProc = NULL;
- sw_thrd_getProc(tp->hThrds[i], &oldProc, NULL, NULL);
- if(oldProc == NULL && sw_thrd_isBusy(tp->hThrds[i]) == false)
- {
- hThrd = tp->hThrds[i];
- if(!sw_thrd_isAlive(hThrd))
- {
- sw_thrd_destroy(hThrd, timeout); // destroy thread
- hThrd = tp->hThrds[i] = NULL;
-
- sprintf(name, "thread%d", i);
- hThrd = sw_thrd_create(name, tp->priority, tp->stackSize, proc, wParam, lParam);
- tp->hThrds[i] = hThrd;
- }
- else
- sw_thrd_setProc(hThrd, proc, wParam, lParam);
- break;
- }
- }
-
- // 2, 如果没有找到可用的线程, 并且该线程池未满, 则创建一个
- if(hThrd == NULL)
- {
- for(i = 0; i < tp->maxNum; i++)
- {
- if(tp->hThrds[i]) continue;
- sprintf(name, "thread%d", i);
- hThrd = sw_thrd_create(name, tp->priority, tp->stackSize, proc, wParam, lParam);
- tp->hThrds[i] = hThrd;
- break;
- }
- }
-
- // 3, 线程池满, 销毁一个还没有从(用户)回调函数中退出的可能已经异常的线程, 空出名额以创建新线程
- if(hThrd == NULL)
- {
- for(i = 0; i < tp->maxNum; i++)
- {
- if(!tp->hThrds[i]) continue;
- oldProc = NULL;
- sw_thrd_getProc(tp->hThrds[i], &oldProc, NULL, NULL);
- if(oldProc == NULL && sw_thrd_isBusy(tp->hThrds[i]) == true)
- {
- sw_thrd_destroy(tp->hThrds[i], timeout); // destroy thread
- tp->hThrds[i] = NULL;
- tp->killedNum += 1;
-
- sprintf(name, "thread%d", i);
- hThrd = sw_thrd_create(name, tp->priority, tp->stackSize, proc, wParam, lParam);
- tp->hThrds[i] = hThrd;
- break;
- }
- }
- }
-
- sw_mutex_unlock(tp->hMutex); // unlock
-
- //4, 没有分配到
- if(hThrd == NULL)
- {
- sw_log_fatal("sorry, alloc thread fail, because %p thread pool is full!!!", tp);
- sw_thrdpool_printf(tp);
- }
-
- return hThrd;
- }
- /* 从线程池中释放一个线程, 超时前尝试安全的关闭线程, 超时后则强制关闭线程 */
- void sw_thrdpool_free(void *hPool, void *hThrd, int timeout)
- {
- SThrdPool *tp = (SThrdPool *)hPool;
- int cnt = 0, i = 0;
-
- if(!tp || !hThrd) return;
-
- sw_mutex_lock(tp->hMutex, WAIT_FOREVER); // lock
-
- for(i = 0; i < tp->maxNum; i++)
- {
- if(tp->hThrds[i] != hThrd) continue;
-
- sw_thrd_pause(hThrd);
- sw_thrd_setProc(hThrd, NULL, 0, 0);
- while(cnt < timeout || timeout < 0)
- {
- if(sw_thrd_isBusy(hThrd))
- { // 等待线程的(用户)回调函数退出
- sw_mutex_unlock(tp->hMutex); // unlock
- sw_thrd_delay(1);
- sw_mutex_lock(tp->hMutex, WAIT_FOREVER); // lock
-
- cnt += 1;
- }
- else break;
- }
- if(sw_thrd_isBusy(hThrd))
- {
- sw_mutex_unlock(tp->hMutex); // unlock
- sw_thrd_destroy(hThrd, timeout); // destroy thread
- sw_mutex_lock(tp->hMutex, WAIT_FOREVER); // lock
-
- tp->hThrds[i] = NULL;
- tp->killedNum += 1;
- }
-
- break;
- }
-
- sw_mutex_unlock(tp->hMutex); // unlock
- }
- /* 获取线程池当前的线程分配情况 */
- void sw_thrdpool_get_status(void *hPool, int *max_num, int *alloc_num, int *idle_num, int *sick_num, int *killed_num)
- {
- SThrdPool *tp = (SThrdPool *)hPool;
- PThrdProc proc = NULL;
- int allocNum = 0, idleNum = 0, sickNum = 0, i = 0;
-
- if(!tp) return;
-
- sw_mutex_lock(tp->hMutex, WAIT_FOREVER); // lock
-
- for(i = 0; i < tp->maxNum; i++)
- {
- if(!tp->hThrds[i]) { idleNum += 1; continue; }
-
- proc = NULL;
- sw_thrd_getProc(tp->hThrds[i], &proc, NULL, NULL); // 得到线程的(用户)回调函数
-
- if(proc) allocNum += 1;
- else
- {
- if(sw_thrd_isBusy(tp->hThrds[i]) == true) sickNum += 1;
- else idleNum += 1;
- }
- }
-
- sw_mutex_unlock(tp->hMutex); // unlock
-
- if(max_num) *max_num = tp->maxNum;
- if(alloc_num) *alloc_num = allocNum;
- if(idle_num) *idle_num = idleNum;
- if(sick_num) *sick_num = sickNum;
- if(killed_num) *killed_num = tp->killedNum;
- }
- /* 打印线程池当前的线程分配情况 */
- void sw_thrdpool_printf(void *hPool)
- {
- SThrdPool *tp = (SThrdPool *)hPool;
- int maxNum = 0, allocNum = 0, idleNum = 0, sickNum = 0, killedNum= 0;
- if(!tp) return;
- sw_thrdpool_get_status(tp, &maxNum, &allocNum, &idleNum, &sickNum, &killedNum);
- sw_log_info("%p thread pool(%d): allocated=%d, idle=%d, sick=%d, killed=%d", \
- tp, maxNum, allocNum, idleNum, sickNum, killedNum);
- }
|