swmsgq.c 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. /************************************************************************
  2. * AUTHOR: NiuJiuRu
  3. * FILENAME: swmsgq.c
  4. * DESCRIPTION: 消息队列
  5. * NOTE:
  6. * HISTORY:
  7. * 1, [2010-09-06] created by NiuJiuRu
  8. * 2, [2016-08-08] 优化代码, 保障线程安全
  9. ***********************************************************************/
  10. #include "swapi.h"
  11. #include "swsignal.h"
  12. #include "swmutex.h"
  13. #include "swmem.h"
  14. #include "swmsgq.h"
  15. #define MAX_QUEUE_DEPTH 64 // 消息队列的最大深度
  16. typedef struct
  17. {
  18. int msg;
  19. unsigned long wParam;
  20. unsigned long lParam;
  21. } SMsg;
  22. typedef struct
  23. {
  24. // 消息队列中的消息数据
  25. SMsg msg[MAX_QUEUE_DEPTH];
  26. // 消息队列的实际深度
  27. int depth;
  28. // 消息队列中的读位置
  29. int read;
  30. // 消息队列中的写位置
  31. int write;
  32. // 指示消息队列中是否有未读消息的信号量
  33. void *hSignal;
  34. // 控制消息队列进和出的互斥锁
  35. void *hMutex;
  36. } SMsgQueue;
  37. /* 创建一个消息队列 */
  38. void *sw_msgq_create(int max)
  39. {
  40. SMsgQueue *q = NULL;
  41. if(max <= 0) goto ErrorP;
  42. q = (SMsgQueue *)sw_heap_malloc(sizeof(SMsgQueue));
  43. if(!q) goto ErrorP;
  44. memset(q, 0, sizeof(SMsgQueue));
  45. q->depth = (max+1) < MAX_QUEUE_DEPTH ? (max+1) : MAX_QUEUE_DEPTH;
  46. q->hSignal = sw_signal_create(); if(!q->hSignal) goto ErrorP;
  47. q->hMutex = sw_mutex_create(); if(!q->hMutex) goto ErrorP;
  48. q->write = q->read = 0;
  49. return q;
  50. ErrorP:
  51. sw_msgq_destroy(q);
  52. return NULL;
  53. }
  54. /* 销毁一个消息队列 */
  55. void sw_msgq_destroy(void *hQueue)
  56. {
  57. SMsgQueue *q = (SMsgQueue *)hQueue;
  58. if(!q) return;
  59. if(q->hMutex)
  60. {
  61. sw_mutex_destroy(q->hMutex);
  62. q->hMutex = NULL;
  63. }
  64. if(q->hSignal)
  65. {
  66. sw_signal_destroy(q->hSignal);
  67. q->hSignal = NULL;
  68. }
  69. sw_heap_free(q);
  70. }
  71. /* 向一个消息队列添加消息 */
  72. int sw_msgq_post(void *hQueue, int msg, unsigned long wParam, unsigned long lParam)
  73. {
  74. SMsgQueue *q = (SMsgQueue *)hQueue;
  75. if(!q) return -1;
  76. // lock
  77. sw_mutex_lock(q->hMutex, WAIT_FOREVER);
  78. // 添加消息到"消息队列"
  79. q->msg[q->write].msg = msg;
  80. q->msg[q->write].wParam = wParam;
  81. q->msg[q->write].lParam = lParam;
  82. q->write = (q->write < q->depth-1 ? q->write+1 : 0);
  83. // 如果队列已满, 则丢弃最老的消息
  84. if(q->write == q->read)
  85. {
  86. q->read = (q->read < q->depth-1 ? q->read+1 : 0);
  87. sw_log_warn("a message queue drop a oldest message!");
  88. }
  89. // 公告"消息队列"中有新的消息加入
  90. sw_signal_give(q->hSignal);
  91. // unlock
  92. sw_mutex_unlock(q->hMutex);
  93. return 0;
  94. }
  95. /* 从一个消息队列提取消息(超时设置的时间单位为: 毫秒, 并且当timeout = -1时表示无限等待) */
  96. int sw_msgq_read(void *hQueue, int *msg, unsigned long *wParam, unsigned long *lParam, int timeout)
  97. {
  98. SMsgQueue *q = (SMsgQueue *)hQueue;
  99. if(!q) return -1;
  100. // 检查"消息队列"中是否有消息, 如没有则等待, 等待超时则退出
  101. if(sw_signal_wait(q->hSignal, timeout) != 0) return -1;
  102. // lock
  103. sw_mutex_lock(q->hMutex, WAIT_FOREVER);
  104. // 判断"消息队列"中是否还有尚未读取的消息
  105. if(q->write == q->read)
  106. {
  107. sw_mutex_unlock(q->hMutex); // unlock
  108. return -1;
  109. }
  110. // 提取消息
  111. if(msg) *msg = q->msg[q->read].msg;
  112. if(wParam) *wParam = q->msg[q->read].wParam;
  113. if(lParam) *lParam = q->msg[q->read].lParam;
  114. q->read = (q->read < q->depth-1 ? q->read+1 : 0);
  115. // unlock
  116. sw_mutex_unlock(q->hMutex);
  117. return 0;
  118. }
  119. /* 删除一个消息队列里的所有消息 */
  120. void sw_msgq_clear(void *hQueue)
  121. {
  122. SMsgQueue *q = (SMsgQueue *)hQueue;
  123. if(!q) return;
  124. sw_mutex_lock(q->hMutex, WAIT_FOREVER); // lock
  125. while(sw_signal_wait(q->hSignal, 0) == 0) ;
  126. q->write = q->read = 0;
  127. sw_mutex_unlock(q->hMutex); // unlock
  128. }
  129. /* 取得一个消息队列中现有的消息数目 */
  130. int sw_msgq_get_num(void *hQueue)
  131. {
  132. SMsgQueue *q = (SMsgQueue *)hQueue;
  133. int num = 0;
  134. if(!q) return -1;
  135. sw_mutex_lock(q->hMutex, WAIT_FOREVER); // lock
  136. num = (q->write-q->read+q->depth) % q->depth;
  137. sw_mutex_unlock(q->hMutex); // unlock
  138. return num;
  139. }
  140. /* 得到一个消息队列所允许的最大消息数目 */
  141. int sw_msgq_get_max(void *hQueue)
  142. {
  143. SMsgQueue *q = (SMsgQueue *)hQueue;
  144. if(!q) return -1;
  145. // 因为在任一时刻"消息队列"中必须有一个"空闲"位置来存放新的消息, 因此
  146. // 无论何时, "消息队列"中有效地消息个数最多为: "消息队列"的深度-1
  147. return (q->depth-1);
  148. }