import { Observable } from 'rxjs'
import { BASE_API_PATH } from '@/utils/variable';
import { notification } from 'ant-design-vue';
import { getToken } from '@/utils/comm';

/** Callbacks that forward server frames to one Observable subscriber. */
interface SubHandler {
    next(val: Record<string, any>): void
    complete(): void
}

let ws: WebSocket | null = null
let count = 0 // reconnect attempt counter
// Heartbeat interval handle. Kept separate from the reconnect timeout so that
// stopping the heartbeat can never cancel a pending reconnect.
let heartbeatTimer: ReturnType<typeof setInterval> | null = null
let lockReconnect = false // prevents scheduling more than one reconnect at a time
const total = 100 // maximum number of consecutive connection attempts
const subs: Record<string, SubHandler[]> = {} // request id -> registered handlers
const timeout = 5000 // base reconnect delay in ms (multiplied by `count`)
const tempQueue: string[] = [] // messages buffered while the socket is not open yet

/**
 * Returns the shared WebSocket, creating it when none exists.
 *
 * On open, the attempt counter is reset, a ping is scheduled every 2000 ms and
 * all buffered messages are sent in the order they were queued. Incoming frames
 * are parsed as JSON and dispatched to the handlers registered under
 * `requestId`. On close or error the socket is dropped and a reconnect is
 * scheduled.
 *
 * @returns the socket, or `undefined` once `total` consecutive attempts have
 *          been made without a successful open.
 */
// Return type stays `any` so existing callers keep type-checking unchanged.
// eslint-disable-next-line @typescript-eslint/no-explicit-any
export const initWebSocket = (): any => {
    if (ws) {
        return ws
    }
    const token = getToken()
    // NOTE(review): the endpoint is hard-coded; the page-relative variant below is
    // what BASE_API_PATH is imported for. Confirm which one is intended.
    //const url = `${document.location.protocol.replace('http', 'ws')}//${document.location.host}${BASE_API_PATH}/messaging/${token}?:X_Access_Token=${token}`;
    const url = `ws://119.45.226.246:8848/messaging/${token}?:X_Access_Token=${token}`;
    if (count >= total) {
        return undefined
    }

    count += 1
    const socket = new WebSocket(url)
    ws = socket

    // Shared close/error path. Both events normally fire for one failure, so
    // this runs twice per socket; `reconnect` itself ignores the second call.
    const handleDisconnect = (label: string) => {
        console.log(label, count)
        if (ws !== null && ws !== socket) {
            // A newer socket already replaced this one; leave it alone.
            return
        }
        ws = null
        reconnect()
    }

    socket.onopen = () => {
        count = 0
        stopHeartbeat()
        heartbeatTimer = setInterval(heartCheck, 2000)
        // splice(0) empties the queue and yields the messages oldest-first.
        for (const queued of tempQueue.splice(0)) {
            socket.send(queued)
        }
    }

    socket.onclose = () => handleDisconnect('onclose')
    socket.onerror = () => handleDisconnect('onerror')

    socket.onmessage = (msg: MessageEvent) => {
        let data: Record<string, any>
        try {
            data = JSON.parse(msg.data)
        } catch (err) {
            console.warn('websocket: ignoring frame that is not valid JSON', err)
            return
        }
        if (!data || typeof data !== 'object') {
            return
        }
        if (data.type === 'error') {
            notification.error({ key: 'wserr', message: data.message })
        }
        const handlers = subs[data.requestId]
        if (!handlers) {
            return
        }
        // Handler lists are replaced, never mutated in place, so a teardown
        // triggered by complete() cannot disturb this iteration.
        if (data.type === 'complete') {
            handlers.forEach((item) => item.complete())
        } else if (data.type === 'result') {
            handlers.forEach((item) => item.next(data))
        }
    }

    return socket
}

/**
 * Creates an Observable bound to a server-side subscription.
 *
 * Subscribing registers a handler under `id` and sends a `sub` message (or
 * buffers it until the socket opens). `result` frames whose `requestId` equals
 * `id` are emitted; a `complete` frame completes the Observable.
 *
 * Unsubscribing removes only this subscriber's handler. When it was the last
 * handler for `id`, an `unsub` message is sent, provided the `sub` message was
 * actually sent and the socket is still open.
 *
 * @param id        request id used to route frames back to this subscriber
 * @param topic     topic to subscribe to
 * @param parameter extra parameters sent along with the subscription
 */
export const getWebSocket = (id: string, topic: string, parameter: Record<string, any>) =>
    new Observable(subscriber => {
        const handler: SubHandler = {
            next(val) {
                subscriber.next(val)
            },
            complete() {
                subscriber.complete()
            },
        }
        subs[id] = [...(subs[id] || []), handler]

        const msg = JSON.stringify({ id, topic, parameter, type: 'sub' })
        const thisWs = initWebSocket()
        if (thisWs) {
            if (thisWs.readyState === WebSocket.OPEN) {
                thisWs.send(msg)
            } else {
                tempQueue.push(msg)
            }
        }

        return () => {
            // Drop a `sub` request that is still buffered so it is not sent
            // after the subscriber has already gone away.
            const pendingIndex = tempQueue.indexOf(msg)
            const neverSent = pendingIndex !== -1
            if (neverSent) {
                tempQueue.splice(pendingIndex, 1)
            }

            const remaining = (subs[id] || []).filter((item) => item !== handler)
            if (remaining.length > 0) {
                subs[id] = remaining
                return
            }
            delete subs[id]

            // send() throws while a socket is still CONNECTING, so only talk to
            // an open socket.
            if (!neverSent && thisWs && thisWs.readyState === WebSocket.OPEN) {
                thisWs.send(JSON.stringify({ id, type: 'unsub' }))
            }
        }
    })

/**
 * Stops the heartbeat interval, if one is running.
 */
function stopHeartbeat() {
    if (heartbeatTimer !== null) {
        clearInterval(heartbeatTimer)
        heartbeatTimer = null
    }
}

/**
 * Schedules a single reconnect attempt after `timeout * count` ms.
 *
 * Calls made while an attempt is already scheduled are ignored. The pending
 * timeout is deliberately never cleared here: close and error usually both
 * fire for the same failure, and cancelling on the second call would leave the
 * lock set with nothing left to release it.
 */
function reconnect() {
    stopHeartbeat()
    if (lockReconnect) {
        return
    }
    lockReconnect = true
    setTimeout(() => {
        // Release the lock first so a synchronous failure while creating the
        // socket cannot leave it set.
        lockReconnect = false
        initWebSocket()
    }, timeout * count)
}

/**
 * Heartbeat: sends a `ping` frame when the shared socket is open.
 */
function heartCheck() {
    if (ws && ws.readyState === WebSocket.OPEN) {
        ws.send(JSON.stringify({ type: 'ping' }))
    }
}