/************************************************************************ * AUTHOR: NiuJiuRu * FILENAME: swmsgq.c * DESCRIPTION: 消息队列 * NOTE: * HISTORY: * 1, [2010-09-06] created by NiuJiuRu * 2, [2016-08-08] 优化代码, 保障线程安全 ***********************************************************************/ #include "swapi.h" #include "swsignal.h" #include "swmutex.h" #include "swmem.h" #include "swmsgq.h" #define MAX_QUEUE_DEPTH 64 // 消息队列的最大深度 typedef struct { int msg; unsigned long wParam; unsigned long lParam; } SMsg; typedef struct { // 消息队列中的消息数据 SMsg msg[MAX_QUEUE_DEPTH]; // 消息队列的实际深度 int depth; // 消息队列中的读位置 int read; // 消息队列中的写位置 int write; // 指示消息队列中是否有未读消息的信号量 void *hSignal; // 控制消息队列进和出的互斥锁 void *hMutex; } SMsgQueue; /* 创建一个消息队列 */ void *sw_msgq_create(int max) { SMsgQueue *q = NULL; if(max <= 0) goto ErrorP; q = (SMsgQueue *)sw_heap_malloc(sizeof(SMsgQueue)); if(!q) goto ErrorP; memset(q, 0, sizeof(SMsgQueue)); q->depth = (max+1) < MAX_QUEUE_DEPTH ? (max+1) : MAX_QUEUE_DEPTH; q->hSignal = sw_signal_create(); if(!q->hSignal) goto ErrorP; q->hMutex = sw_mutex_create(); if(!q->hMutex) goto ErrorP; q->write = q->read = 0; return q; ErrorP: sw_msgq_destroy(q); return NULL; } /* 销毁一个消息队列 */ void sw_msgq_destroy(void *hQueue) { SMsgQueue *q = (SMsgQueue *)hQueue; if(!q) return; if(q->hMutex) { sw_mutex_destroy(q->hMutex); q->hMutex = NULL; } if(q->hSignal) { sw_signal_destroy(q->hSignal); q->hSignal = NULL; } sw_heap_free(q); } /* 向一个消息队列添加消息 */ int sw_msgq_post(void *hQueue, int msg, unsigned long wParam, unsigned long lParam) { SMsgQueue *q = (SMsgQueue *)hQueue; if(!q) return -1; // lock sw_mutex_lock(q->hMutex, WAIT_FOREVER); // 添加消息到"消息队列" q->msg[q->write].msg = msg; q->msg[q->write].wParam = wParam; q->msg[q->write].lParam = lParam; q->write = (q->write < q->depth-1 ? q->write+1 : 0); // 如果队列已满, 则丢弃最老的消息 if(q->write == q->read) { q->read = (q->read < q->depth-1 ? q->read+1 : 0); sw_log_warn("a message queue drop a oldest message!"); } // 公告"消息队列"中有新的消息加入 sw_signal_give(q->hSignal); // unlock sw_mutex_unlock(q->hMutex); return 0; } /* 从一个消息队列提取消息(超时设置的时间单位为: 毫秒, 并且当timeout = -1时表示无限等待) */ int sw_msgq_read(void *hQueue, int *msg, unsigned long *wParam, unsigned long *lParam, int timeout) { SMsgQueue *q = (SMsgQueue *)hQueue; if(!q) return -1; // 检查"消息队列"中是否有消息, 如没有则等待, 等待超时则退出 if(sw_signal_wait(q->hSignal, timeout) != 0) return -1; // lock sw_mutex_lock(q->hMutex, WAIT_FOREVER); // 判断"消息队列"中是否还有尚未读取的消息 if(q->write == q->read) { sw_mutex_unlock(q->hMutex); // unlock return -1; } // 提取消息 if(msg) *msg = q->msg[q->read].msg; if(wParam) *wParam = q->msg[q->read].wParam; if(lParam) *lParam = q->msg[q->read].lParam; q->read = (q->read < q->depth-1 ? q->read+1 : 0); // unlock sw_mutex_unlock(q->hMutex); return 0; } /* 删除一个消息队列里的所有消息 */ void sw_msgq_clear(void *hQueue) { SMsgQueue *q = (SMsgQueue *)hQueue; if(!q) return; sw_mutex_lock(q->hMutex, WAIT_FOREVER); // lock while(sw_signal_wait(q->hSignal, 0) == 0) ; q->write = q->read = 0; sw_mutex_unlock(q->hMutex); // unlock } /* 取得一个消息队列中现有的消息数目 */ int sw_msgq_get_num(void *hQueue) { SMsgQueue *q = (SMsgQueue *)hQueue; int num = 0; if(!q) return -1; sw_mutex_lock(q->hMutex, WAIT_FOREVER); // lock num = (q->write-q->read+q->depth) % q->depth; sw_mutex_unlock(q->hMutex); // unlock return num; } /* 得到一个消息队列所允许的最大消息数目 */ int sw_msgq_get_max(void *hQueue) { SMsgQueue *q = (SMsgQueue *)hQueue; if(!q) return -1; // 因为在任一时刻"消息队列"中必须有一个"空闲"位置来存放新的消息, 因此 // 无论何时, "消息队列"中有效地消息个数最多为: "消息队列"的深度-1 return (q->depth-1); }