useSendWebsocketMessage.ts 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. import { useMemo, useRef } from 'react';
  2. import useWebSocket from './useWebSocket';
  3. import { Observable } from 'rxjs';
  4. import Token from '@/utils/token';
  5. import type { WebsocketPayload } from '@/hooks/websocket/typings';
  6. import { notification } from 'antd';
  7. import SystemConst from '@/utils/const';
  // WebSocket origin derived from the page location: the first "http" in the
  // protocol is replaced by "ws", so "http:" becomes "ws:" and "https:" becomes "wss:".
  const wsOrigin =
    document.location.protocol.replace('http', 'ws') + '//' + document.location.host;

  // Messaging endpoint. The token returned by Token.get() is embedded both as the
  // last path segment and as the `:X_Access_Token` query parameter. This is
  // evaluated once, when the module is first loaded.
  const url = `${wsOrigin}/${SystemConst.API_BASE}/messaging/${Token.get()}?:X_Access_Token=${Token.get()}`;
  /**
   * Values placed in the `type` field of outgoing frames:
   * `sub` opens a topic subscription, `unsub` cancels it.
   */
  const MsgType = {
    sub: 'sub',
    unsub: 'unsub',
  } as const;
  /**
   * Module-level registry of active listeners, keyed by subscription/request id.
   * It lives outside the hook, so every component using the hook shares it.
   *
   * Created with a null prototype so that looking up an id such as "constructor"
   * or "toString" yields `undefined` instead of an inherited Object.prototype member.
   */
  const subscribeList: Record<
    string,
    { next: (value: unknown) => void; complete: () => void }[]
  > = Object.create(null);
  /**
   * React hook that multiplexes topic subscriptions over the messaging WebSocket.
   *
   * Incoming frames are parsed as JSON and routed by their `requestId` to the
   * listeners registered in the module-level `subscribeList`.
   *
   * @returns `[subscribeTopic, sendMessage]`:
   *  - `subscribeTopic(id, topic, parameter)` builds an Observable that, when
   *    subscribed, registers a listener under `id` and sends a `sub` frame;
   *  - `sendMessage` is the sender obtained from `useWebSocket`.
   */
  export const useSendWebsocketMessage = () => {
    // NOTE(review): this history is appended to whenever `latestMessage` changes and is
    // never read inside this hook, so it grows without bound — confirm whether it is
    // still needed before removing it.
    const messageHistory = useRef<any>([]);

    /**
     * Dispatch an incoming frame to the listeners registered for its `requestId`.
     *
     * - `error` frames raise a notification;
     * - `complete` frames complete every listener of that request;
     * - `result` frames are forwarded to every listener of that request.
     *
     * Frames that are not valid JSON objects are logged and ignored so that one bad
     * frame cannot throw out of the socket's message handler.
     *
     * @param message raw WebSocket message event
     */
    const dispenseMessage = (message: MessageEvent) => {
      let parsed: unknown;
      try {
        parsed = JSON.parse(message.data);
      } catch (error) {
        console.error('Ignoring websocket message that is not valid JSON', error);
        return;
      }
      // JSON.parse can legitimately return null or a primitive; those carry no fields.
      if (typeof parsed !== 'object' || parsed === null) {
        return;
      }
      const data = parsed as WebsocketPayload;

      if (data.type === 'error') {
        notification.error({ key: 'websocket-error', message: data.message });
      }

      // Array.isArray (rather than truthiness) so that an id colliding with an
      // inherited object member is never treated as a listener list.
      const listeners = subscribeList[data.requestId];
      if (!Array.isArray(listeners)) {
        return;
      }

      // Teardown replaces the registered array instead of mutating it, so iterating
      // `listeners` here stays safe even when a callback unsubscribes.
      if (data.type === 'complete') {
        listeners.forEach((listener) => listener.complete());
      } else if (data.type === 'result') {
        listeners.forEach((listener) => listener.next(data));
      }
    };

    const { sendMessage, latestMessage } = useWebSocket(url, {
      reconnectInterval: 1000,
      reconnectLimit: 1,
      onClose: () => notification.error({ key: 'websocket-error', message: '网络错误,请刷新重试' }),
      onOpen: (event) => console.log('打开链接', event),
      onError: (event) => console.log('报错了', event),
      onMessage: dispenseMessage,
    });

    messageHistory.current = useMemo(
      () => messageHistory.current.concat(latestMessage),
      [latestMessage],
    );

    /**
     * Create an Observable for a topic subscription.
     *
     * Nothing is sent until the Observable is subscribed. Each subscription registers
     * its own listener under `id` and sends a `sub` frame. On teardown only that
     * listener is removed; the `unsub` frame is sent once no listener is left for
     * `id`, so other subscribers sharing the same id keep receiving messages.
     *
     * @param id        request id used to correlate incoming frames (`requestId`)
     * @param topic     topic to subscribe to
     * @param parameter extra parameters sent with the `sub` frame
     * @returns Observable emitting the `result` payloads received for `id`
     */
    const subscribeTopic = (
      id: string,
      topic: string,
      parameter: Record<string, any>,
    ): Observable<any> =>
      new Observable((subscriber) => {
        const listener = {
          next: (value: unknown) => subscriber.next(value),
          complete: () => subscriber.complete(),
        };

        let listeners = subscribeList[id];
        if (!Array.isArray(listeners)) {
          listeners = [];
          subscribeList[id] = listeners;
        }
        listeners.push(listener);

        sendMessage?.(JSON.stringify({ id, topic, parameter, type: MsgType.sub }));

        return () => {
          const current = subscribeList[id];
          const remaining = Array.isArray(current)
            ? current.filter((entry) => entry !== listener)
            : [];

          if (remaining.length > 0) {
            // Other subscribers still use this id: keep them registered and
            // leave the subscription open.
            subscribeList[id] = remaining;
            return;
          }

          delete subscribeList[id];
          sendMessage?.(JSON.stringify({ id, type: MsgType.unsub }));
        };
      });

    return [subscribeTopic, sendMessage];
  };

  export default useSendWebsocketMessage;