/************************************************************************
* AUTHOR: NiuJiuRu
* FILENAME: swthrdpool.c
* CONTENT: thread pool
* NOTE:
* HISTORY:
* 1, [2010-09-20] created by NiuJiuRu
* 2, [2018-03-23] add "sw_thrdpool_create2()" function and redefine
*                 "sw_thrdpool_create()" function
***********************************************************************/
#include "swapi.h"
#include "swmutex.h"
#include "swmem.h"
#include "swthrd.h"
#include "swthrdpool.h"

typedef struct
{
	// thread handles owned by the pool (array of maxNum slots, NULL = empty slot)
	void **hThrds;
	// mutex guarding access to the pool
	void *hMutex;
	// capacity of the pool (number of slots)
	int maxNum;
	// priority given to every thread created by the pool
	int priority;
	// stack size given to every thread created by the pool
	int stackSize;
	// number of threads that had to be forcibly destroyed
	int killedNum;
} SThrdPool;

/*
 * Create a thread pool.
 *
 * max        - number of thread slots; must be > 0
 * priority   - priority passed to sw_thrd_create() for every pool thread
 * stack_size - stack size passed to sw_thrd_create() for every pool thread
 *
 * Returns the pool handle, or NULL when max <= 0, the allocation fails or
 * the mutex cannot be created. No thread is created here; threads are
 * created lazily by sw_thrdpool_alloc().
 */
void *sw_thrdpool_create2(int max, int priority, int stack_size)
{
	SThrdPool *tp = NULL;
	int size = 0;

	if(max <= 0) goto ErrorP;

	// the slot array lives in the same allocation, right behind the header
	size = sizeof(SThrdPool) + max*sizeof(void *);
	tp = (SThrdPool *)sw_heap_malloc(size); // request memory
	if(!tp) goto ErrorP;
	memset(tp, 0, size);

	tp->hThrds = (void **)(tp+1);
	tp->hMutex = sw_mutex_create();
	if(!tp->hMutex) goto ErrorP;
	tp->maxNum = max;
	tp->priority = priority;
	tp->stackSize = stack_size;
	tp->killedNum = 0;

	return tp;

ErrorP:
	// safe for tp == NULL and for a partially initialised pool (maxNum is still 0)
	sw_thrdpool_destroy(tp, 0);
	return NULL;
}

/*
 * Destroy a thread pool.
 *
 * Every thread still held by the pool is handed to sw_thrd_destroy() with
 * the given timeout (per the original note: a safe shutdown is attempted
 * before the timeout, a forced one after it), then the mutex and the pool
 * memory are released. A NULL handle is ignored.
 *
 * The pool mutex is not taken here, so the caller must make sure no other
 * thread is still using the pool.
 */
void sw_thrdpool_destroy(void *hPool, int timeout)
{
	SThrdPool *tp = (SThrdPool *)hPool;
	int i = 0;

	if(!tp) return;

	for(i = 0; i < tp->maxNum; i++)
	{
		if(tp->hThrds[i])
		{
			sw_thrd_destroy(tp->hThrds[i], timeout); // destroy thread
			tp->hThrds[i] = NULL;
		}
	}

	if(tp->hMutex)
	{
		sw_mutex_destroy(tp->hMutex);
		tp->hMutex = NULL;
	}

	sw_heap_free(tp);
}

/*
 * Create a thread named "thread<idx>" with the pool's priority and stack
 * size and store it in slot idx. Returns the new handle (NULL on failure,
 * in which case the slot is left NULL). Caller must hold the pool mutex.
 */
static void *thrdpool_spawn(SThrdPool *tp, int idx, PThrdProc proc, unsigned long wParam, unsigned long lParam)
{
	char name[32] = { 0 };

	sprintf(name, "thread%d", idx);
	tp->hThrds[idx] = sw_thrd_create(name, tp->priority, tp->stackSize, proc, wParam, lParam);
	return tp->hThrds[idx];
}

/*
 * Allocate a thread from the pool and bind it to proc(wParam, lParam).
 * Per the original note, a successfully allocated thread is in the paused
 * state by default.
 *
 * timeout is only used when an existing thread has to be destroyed to make
 * room; it is forwarded to sw_thrd_destroy().
 *
 * Returns the thread handle, or NULL when hPool is NULL or no thread could
 * be obtained (a fatal log line and the pool status are emitted then).
 */
void *sw_thrdpool_alloc(void *hPool, PThrdProc proc, unsigned long wParam, unsigned long lParam, int timeout)
{
	SThrdPool *tp = (SThrdPool *)hPool;
	void *hThrd = NULL;
	PThrdProc oldProc = NULL;
	int i = 0;

	if(!tp) return NULL;

	sw_mutex_lock(tp->hMutex, WAIT_FOREVER); // lock

	// 1, look for a thread that is not handed out (no user callback bound)
	//    and has already returned from its previous user callback
	for(i = 0; i < tp->maxNum; i++)
	{
		if(!tp->hThrds[i]) continue;
		oldProc = NULL;
		sw_thrd_getProc(tp->hThrds[i], &oldProc, NULL, NULL);
		if(oldProc == NULL && sw_thrd_isBusy(tp->hThrds[i]) == false)
		{
			hThrd = tp->hThrds[i];
			if(!sw_thrd_isAlive(hThrd))
			{
				// the thread is gone: release the handle and recreate it in place
				sw_thrd_destroy(hThrd, timeout); // destroy thread
				tp->hThrds[i] = NULL;
				hThrd = thrdpool_spawn(tp, i, proc, wParam, lParam);
			}
			else sw_thrd_setProc(hThrd, proc, wParam, lParam);
			break;
		}
	}

	// 2, nothing reusable: if the pool still has an empty slot, create a thread there
	if(hThrd == NULL)
	{
		for(i = 0; i < tp->maxNum; i++)
		{
			if(tp->hThrds[i]) continue;
			hThrd = thrdpool_spawn(tp, i, proc, wParam, lParam);
			break;
		}
	}

	// 3, pool is full: destroy one released thread that still has not returned
	//    from its user callback (possibly a misbehaving one) and reuse its slot
	if(hThrd == NULL)
	{
		for(i = 0; i < tp->maxNum; i++)
		{
			if(!tp->hThrds[i]) continue;
			oldProc = NULL;
			sw_thrd_getProc(tp->hThrds[i], &oldProc, NULL, NULL);
			if(oldProc == NULL && sw_thrd_isBusy(tp->hThrds[i]) == true)
			{
				sw_thrd_destroy(tp->hThrds[i], timeout); // destroy thread
				tp->hThrds[i] = NULL;
				tp->killedNum += 1;
				hThrd = thrdpool_spawn(tp, i, proc, wParam, lParam);
				break;
			}
		}
	}

	sw_mutex_unlock(tp->hMutex); // unlock

	// 4, nothing could be allocated
	if(hThrd == NULL)
	{
		sw_log_fatal("sorry, alloc thread fail, because %p thread pool is full!!!", tp);
		sw_thrdpool_printf(tp);
	}

	return hThrd;
}

/*
 * Give a thread back to the pool.
 *
 * The thread is paused and its user callback is unbound, then we wait for
 * the callback to return: up to `timeout` polling rounds of sw_thrd_delay(1)
 * (presumably milliseconds - confirm against swthrd), or without limit when
 * timeout < 0. If the callback has returned, the thread stays in the pool
 * for reuse. If it is still busy afterwards, the thread is removed from the
 * pool, counted as killed and destroyed with sw_thrd_destroy().
 *
 * NULL hPool / hThrd and handles that do not belong to the pool are ignored.
 */
void sw_thrdpool_free(void *hPool, void *hThrd, int timeout)
{
	SThrdPool *tp = (SThrdPool *)hPool;
	int cnt = 0, i = 0;

	if(!tp || !hThrd) return;

	sw_mutex_lock(tp->hMutex, WAIT_FOREVER); // lock
	for(i = 0; i < tp->maxNum; i++)
	{
		if(tp->hThrds[i] != hThrd) continue;

		sw_thrd_pause(hThrd);
		sw_thrd_setProc(hThrd, NULL, 0, 0);

		// wait for the thread's user callback to return
		while(cnt < timeout || timeout < 0)
		{
			if(!sw_thrd_isBusy(hThrd)) break;

			// do not block other pool users while we sleep
			sw_mutex_unlock(tp->hMutex); // unlock
			sw_thrd_delay(1);
			sw_mutex_lock(tp->hMutex, WAIT_FOREVER); // lock
			cnt += 1;

			// While the mutex was released, sw_thrdpool_alloc() (step 3) may have
			// destroyed this very thread and reused the slot. The handle is then
			// stale and the slot belongs to someone else: stop touching both.
			// NOTE(review): a recreated thread that happens to get the same
			// handle value cannot be told apart by this check.
			if(tp->hThrds[i] != hThrd)
			{
				sw_mutex_unlock(tp->hMutex); // unlock
				return;
			}
		}

		if(sw_thrd_isBusy(hThrd))
		{
			// Detach the thread from the pool while still holding the mutex, so
			// that nobody else can pick it (or destroy it a second time) once the
			// mutex is released for the potentially slow destroy below.
			tp->hThrds[i] = NULL;
			tp->killedNum += 1;
			sw_mutex_unlock(tp->hMutex); // unlock
			sw_thrd_destroy(hThrd, timeout); // destroy thread
			return;
		}
		break;
	}
	sw_mutex_unlock(tp->hMutex); // unlock
}

/*
 * Report the current allocation state of the pool.
 *
 * max_num    - pool capacity
 * alloc_num  - threads with a user callback bound (handed out)
 * idle_num   - empty slots plus threads with no callback that are not busy
 * sick_num   - threads with no callback bound that are still busy
 * killed_num - threads forcibly destroyed so far
 *
 * Any output pointer may be NULL. When hPool is NULL nothing is written.
 */
void sw_thrdpool_get_status(void *hPool, int *max_num, int *alloc_num, int *idle_num, int *sick_num, int *killed_num)
{
	SThrdPool *tp = (SThrdPool *)hPool;
	PThrdProc proc = NULL;
	int maxNum = 0, killedNum = 0;
	int allocNum = 0, idleNum = 0, sickNum = 0, i = 0;

	if(!tp) return;

	sw_mutex_lock(tp->hMutex, WAIT_FOREVER); // lock
	// take every reported value under the same lock so the snapshot is consistent
	maxNum = tp->maxNum;
	killedNum = tp->killedNum;
	for(i = 0; i < maxNum; i++)
	{
		if(!tp->hThrds[i])
		{
			idleNum += 1;
			continue;
		}
		proc = NULL;
		sw_thrd_getProc(tp->hThrds[i], &proc, NULL, NULL); // fetch the thread's user callback
		if(proc) allocNum += 1;
		else if(sw_thrd_isBusy(tp->hThrds[i]) == true) sickNum += 1;
		else idleNum += 1;
	}
	sw_mutex_unlock(tp->hMutex); // unlock

	if(max_num) *max_num = maxNum;
	if(alloc_num) *alloc_num = allocNum;
	if(idle_num) *idle_num = idleNum;
	if(sick_num) *sick_num = sickNum;
	if(killed_num) *killed_num = killedNum;
}

/*
 * Log the current allocation state of the pool (see sw_thrdpool_get_status()).
 * A NULL handle is ignored.
 */
void sw_thrdpool_printf(void *hPool)
{
	SThrdPool *tp = (SThrdPool *)hPool;
	int maxNum = 0, allocNum = 0, idleNum = 0, sickNum = 0, killedNum = 0;

	if(!tp) return;

	sw_thrdpool_get_status(tp, &maxNum, &allocNum, &idleNum, &sickNum, &killedNum);
	sw_log_info("%p thread pool(%d): allocated=%d, idle=%d, sick=%d, killed=%d",
		tp, maxNum, allocNum, idleNum, sickNum, killedNum);
}