useSendWebsocketMessage.ts 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. import { useMemo, useRef } from 'react';
  2. import useWebSocket from './useWebSocket';
  3. import { Observable } from 'rxjs';
  4. import Token from '@/utils/token';
  5. import type { WebsocketPayload } from '@/hooks/websocket/typings';
  6. import { notification } from 'antd';
  7. const url = `${document.location.protocol.replace('http', 'ws')}//${
  8. document.location.host
  9. }/jetlinks/messaging/${Token.get()}?:X_Access_Token=${Token.get()}`;
  10. enum MsgType {
  11. sub = 'sub',
  12. unsub = 'unsub',
  13. }
  14. const subscribeList: Record<string, { next: any; complete: any }[]> = {};
  15. export const useSendWebsocketMessage = () => {
  16. const messageHistory = useRef<any>([]);
  17. /**
  18. * 分发消息
  19. * @param message
  20. */
  21. const dispenseMessage = (message: MessageEvent) => {
  22. const data = JSON.parse(message.data) as WebsocketPayload;
  23. if (data.type === 'error') {
  24. notification.error({ key: 'websocket-error', message: data.message });
  25. }
  26. if (subscribeList[data.requestId]) {
  27. if (data.type === 'complete') {
  28. subscribeList[data.requestId].forEach((element: any) => {
  29. element.complete();
  30. });
  31. } else if (data.type === 'result') {
  32. subscribeList[data.requestId].forEach((element: any) => {
  33. element.next(data);
  34. });
  35. }
  36. }
  37. };
  38. const { sendMessage, latestMessage } = useWebSocket(url, {
  39. reconnectInterval: 1000,
  40. reconnectLimit: 1,
  41. onClose: () => notification.error({ key: 'websocket-error', message: '网络错误,请刷新重试' }),
  42. onOpen: (event) => console.log('打开链接', event),
  43. onError: (event) => console.log('报错了', event),
  44. onMessage: dispenseMessage,
  45. });
  46. messageHistory.current = useMemo(
  47. () => messageHistory.current.concat(latestMessage),
  48. [latestMessage],
  49. );
  50. const subscribeTopic = (
  51. id: string,
  52. topic: string,
  53. parameter: Record<string, any>,
  54. ): Observable<any> => {
  55. return new Observable((subscriber) => {
  56. if (!subscribeList[id]) {
  57. subscribeList[id] = [];
  58. }
  59. subscribeList[id].push({
  60. next: (value: any) => subscriber.next(value),
  61. complete: () => subscriber.complete(),
  62. });
  63. const message = JSON.stringify({ id, topic, parameter, type: MsgType.sub });
  64. sendMessage?.(message);
  65. return () => {
  66. const unsub = JSON.stringify({ id, type: MsgType.unsub });
  67. delete subscribeList[id];
  68. sendMessage?.(unsub);
  69. };
  70. });
  71. };
  72. return [subscribeTopic, sendMessage];
  73. };
  74. export default useSendWebsocketMessage;