import { ipcRenderer } from "electron"; import log from "electron-log/renderer"; import taskbarApi from "./backend"; import { isNotificationMessage } from "./schemaValidators"; const isDev = process.env.NODE_ENV === "development"; const WS_INFORMING_PATH = "ws/informing"; const kInformingChannel = "informing"; let informingWs; let wsCloseCode = 1000; // due rtk query close websocket connection itself its impossible set close code via ws.close(code) const kCloseCode = 3333; let informinWsRetries = 0; const pingData = Buffer.from("ping"); const pingDelay = 60 * 1000; const baseUrl = new URL(process.env.BACKEND_URL); const protocol = baseUrl.protocol === "http:" ? "ws:" : "wss:"; const baseUrlWs = `${protocol}//${baseUrl.host}${baseUrl.pathname}`; const generateRandomDelay = (min = 0, max = 120) => { // delay in prod 5 sec if (isDev) return 5000; // min-max in seconds in prod max = max > 120 ? 120 : max; return (Math.floor(Math.random() * (max - min)) + min) * 1000; }; export const taskbarWsApi = taskbarApi.injectEndpoints({ endpoints: (builder) => ({ sendInforming: builder.mutation({ queryFn: (message) => { if (!message) return { data: null }; try { informingWs.send(JSON.stringify(message)); } catch (e) { return { data: false }; } return { data: true }; }, }), getInforming: builder.query({ queryFn: (/* channel */) => { return { data: {} }; }, keepUnusedDataFor: 5, // close ws connection after 5 sec if no component subscribers async onCacheEntryAdded( channel, { updateCachedData, cacheDataLoaded, cacheEntryRemoved } ) { if (!channel) return; try { await cacheDataLoaded; const openWebsocket = () => { let pingInterval; wsCloseCode = 1000; const _ws = new WebSocket( `${baseUrlWs}${WS_INFORMING_PATH}/${channel}/` ); const listener = async (event) => { if (typeof event.data === "string") { const data = JSON.parse(event.data); if (!isNotificationMessage(data)) return; updateCachedData((draft) => Object.assign(draft, data)); } }; const onclose = ({ code }) => { clearInterval(pingInterval); informingWs = null; pingInterval = null; // Abnormal Closure or Internal Error or Service Restart if (wsCloseCode !== kCloseCode) { ipcRenderer.invoke(kInformingChannel, { event: "connection-changed", data: false, }); const delay = generateRandomDelay(10, 60 + informinWsRetries); log.info( `websocket closed(${code}/${wsCloseCode}), reconnect in ${ delay / 1000 } sec, ${informinWsRetries} retries` ); setTimeout(() => { informingWs = openWebsocket(); informinWsRetries += 1; }, delay); } else { log.info(`websocket closed with code ${code}/${wsCloseCode}`); } wsCloseCode = 1000; }; const onopen = () => { informinWsRetries = 0; ipcRenderer.invoke(kInformingChannel, { event: "connection-changed", data: true, }); // ping without connection force ws close, no pong needed pingInterval = setInterval(() => _ws.send(pingData), pingDelay); log.info(`websocket opened with channel ${channel}`); }; _ws.addEventListener("open", onopen); _ws.addEventListener("message", listener); _ws.addEventListener("close", onclose); return _ws; }; informingWs = openWebsocket(); } catch (e) { // no-op in case `cacheEntryRemoved` resolves before `cacheDataLoaded`, // in which case `cacheDataLoaded` will throw } await cacheEntryRemoved; wsCloseCode = 3333; informingWs?.close(); }, }), }), }); export const { useGetInformingQuery, useSendInformingMutation } = taskbarWsApi;