production-taskbar-client / src / renderer / apis / backendWs.js
backendWs.js
Raw
import { ipcRenderer } from "electron";
import log from "electron-log/renderer";

import taskbarApi from "./backend";
import { isNotificationMessage } from "./schemaValidators";

// True when NODE_ENV is "development"; switches reconnect delays to a fixed value.
const isDev = process.env.NODE_ENV === "development";
// Path appended to the websocket base URL for the informing socket.
const WS_INFORMING_PATH = "ws/informing";
// IPC channel name used with ipcRenderer.invoke to report connection state changes.
const kInformingChannel = "informing";

// The informing WebSocket currently in use; reset to null when it closes.
let informingWs;
// Intended close reason, tracked out-of-band. Per the original author: RTK Query
// closes the websocket connection itself, so the code cannot be set via ws.close(code).
let wsCloseCode = 1000;
// Value of wsCloseCode that marks a deliberate close; no reconnect is scheduled for it.
const kCloseCode = 3333;
// Reconnect attempts made since the last successful open (reset to 0 on open).
let informinWsRetries = 0;
// Payload sent as the periodic ping.
const pingData = Buffer.from("ping");
// Interval between pings in milliseconds (60 seconds).
const pingDelay = 60 * 1000;
// Backend base URL; `new URL` throws at import time if BACKEND_URL is missing or invalid.
const baseUrl = new URL(process.env.BACKEND_URL);
// "http:" maps to "ws:"; every other protocol maps to "wss:".
const protocol = baseUrl.protocol === "http:" ? "ws:" : "wss:";
// NOTE(review): this is concatenated directly with WS_INFORMING_PATH, so the
// BACKEND_URL path presumably has to end with "/" — confirm against deployment config.
const baseUrlWs = `${protocol}//${baseUrl.host}${baseUrl.pathname}`;

/**
 * Picks the delay to wait before a reconnect attempt.
 *
 * In development the delay is fixed at 5 seconds. Otherwise a whole number of
 * seconds is drawn at random, starting at `min` and staying below `max` (when
 * `max` is greater than `min`), and converted to milliseconds.
 *
 * @param {number} [min=0] - Lower bound in seconds.
 * @param {number} [max=120] - Upper bound in seconds; values above 120 are treated as 120.
 * @returns {number} Delay in milliseconds.
 */
const generateRandomDelay = (min = 0, max = 120) => {
  if (isDev) {
    return 5000;
  }

  // Cap the upper bound at two minutes without touching the parameter itself.
  let upperBound = max;
  if (upperBound > 120) {
    upperBound = 120;
  }

  const span = upperBound - min;
  const seconds = Math.floor(Math.random() * span) + min;
  return seconds * 1000;
};

/**
 * Endpoints injected into `taskbarApi` that talk to the backend over a
 * WebSocket instead of HTTP.
 *
 * - `sendInforming` pushes a JSON message through the currently open socket.
 * - `getInforming` opens a socket for the given channel when its cache entry is
 *   added, merges every valid notification into the cached data, reconnects
 *   after unexpected closes and closes the socket when the entry is removed.
 */
export const taskbarWsApi = taskbarApi.injectEndpoints({
  endpoints: (builder) => ({
    sendInforming: builder.mutation({
      // Result is `null` when there is nothing to send, `true` when the message
      // was handed to the socket and `false` when sending threw (for example
      // because no socket is currently available).
      queryFn: (message) => {
        if (!message) return { data: null };
        try {
          informingWs.send(JSON.stringify(message));
        } catch (e) {
          return { data: false };
        }
        return { data: true };
      },
    }),
    getInforming: builder.query({
      // The entry starts as an empty object; data only arrives via the socket.
      queryFn: (/* channel */) => {
        return { data: {} };
      },
      keepUnusedDataFor: 5, // close ws connection after 5 sec if no component subscribers
      async onCacheEntryAdded(
        channel,
        { updateCachedData, cacheDataLoaded, cacheEntryRemoved }
      ) {
        if (!channel) return;

        // State owned by this cache entry. `wsCloseCode` is shared module state
        // that a newer connection resets, so on its own it cannot tell a late
        // close event of a removed entry apart from an abnormal close.
        let isEntryRemoved = false;
        let reconnectTimer = null;

        try {
          await cacheDataLoaded;

          const openWebsocket = () => {
            let pingInterval;
            wsCloseCode = 1000;

            const _ws = new WebSocket(
              `${baseUrlWs}${WS_INFORMING_PATH}/${channel}/`
            );

            // Merge valid notification messages into the cached data; anything
            // that is not a string, not JSON or not a notification is dropped.
            const listener = (event) => {
              if (typeof event.data !== "string") return;
              let data;
              try {
                data = JSON.parse(event.data);
              } catch (e) {
                log.warn(`websocket message is not valid JSON: ${e.message}`);
                return;
              }
              if (!isNotificationMessage(data)) return;
              updateCachedData((draft) => Object.assign(draft, data));
            };

            const onclose = ({ code }) => {
              clearInterval(pingInterval);
              pingInterval = null;
              // Only drop the shared reference if it still points at this
              // socket; a newer connection may already have replaced it.
              if (informingWs === _ws) informingWs = null;

              const isIntentionalClose =
                isEntryRemoved || wsCloseCode === kCloseCode;
              // Abnormal Closure or Internal Error or Service Restart
              if (!isIntentionalClose) {
                ipcRenderer.invoke(kInformingChannel, {
                  event: "connection-changed",
                  data: false,
                });
                const delay = generateRandomDelay(10, 60 + informinWsRetries);
                log.info(
                  `websocket closed(${code}/${wsCloseCode}), reconnect in ${
                    delay / 1000
                  } sec, ${informinWsRetries} retries`
                );
                reconnectTimer = setTimeout(() => {
                  reconnectTimer = null;
                  // The entry may have been removed while the timer was pending.
                  if (isEntryRemoved) return;
                  informingWs = openWebsocket();
                  informinWsRetries += 1;
                }, delay);
              } else {
                log.info(`websocket closed with code ${code}/${wsCloseCode}`);
              }
              wsCloseCode = 1000;
            };

            const onopen = () => {
              informinWsRetries = 0;
              ipcRenderer.invoke(kInformingChannel, {
                event: "connection-changed",
                data: true,
              });

              // ping without connection force ws close, no pong needed
              pingInterval = setInterval(() => _ws.send(pingData), pingDelay);
              log.info(`websocket opened with channel ${channel}`);
            };

            _ws.addEventListener("open", onopen);
            _ws.addEventListener("message", listener);
            _ws.addEventListener("close", onclose);
            return _ws;
          };
          informingWs = openWebsocket();
        } catch (e) {
          // no-op in case `cacheEntryRemoved` resolves before `cacheDataLoaded`,
          // in which case `cacheDataLoaded` will throw
        }
        await cacheEntryRemoved;
        // Stop any pending reconnect first, otherwise a socket would be opened
        // after the last subscriber is gone and never closed again.
        isEntryRemoved = true;
        clearTimeout(reconnectTimer);
        reconnectTimer = null;
        wsCloseCode = kCloseCode;
        informingWs?.close();
      },
    }),
  }),
});

// Re-export the two members of taskbarWsApi that consumers import directly;
// presumably the hooks generated for the getInforming and sendInforming endpoints.
export const { useGetInformingQuery, useSendInformingMutation } = taskbarWsApi;