// build.ts
import {
  decode,
  decodeWithOptions,
  makeEncode,
  makeRecipes,
  readMsg,
} from "../commands";
import {
  CallFn,
  OnFn,
  Options,
  USP,
  Command,
  Proto,
  BuildConnectionFn,
} from "../types";
import { makeCallbackRouter, makeRouter } from "../util";

// Endpoint that encoded commands are published to when
// `options.publishEndpoint` is not provided.
const defaultPublishEndpoint = "/usp/endpoint";
// Endpoint subscribed to for incoming messages when
// `options.subscribeEndpoint` is not provided.
const defaultSubscribeEndpoint = "/usp/controller";
// Endpoint subscribed to while resolving the target id when
// `options.idEndpoint` is not provided.
const defaultIdEndpoint = "obuspa/EndpointID";
// Sender id handed to the encoder when `options.fromId` is not provided.
const defaultFromId = "proto::interop-usp-controller";
// NOTE(review): not referenced anywhere in the visible part of this file —
// presumably milliseconds; confirm whether it is still needed.
const defaultConnectionTimeout = 5000;
// Time (in ms, passed to setTimeout) to wait for the target id to arrive
// before the id lookup rejects.
const idResolveTimeout = 5000;

/** Replaces every "+" in the given id with "%2B"; all other characters are left untouched. */
const fixId = (s: string) => s.replace(/\+/g, "%2B");

/**
 * Arms a timer that rejects after `ms` milliseconds.
 *
 * @param ms delay in milliseconds before the returned promise rejects
 * @returns a tuple of the timer handle (pass it to `clearTimeout` to cancel)
 *          and a promise that never fulfils and rejects with the string
 *          `timeout of ${ms}ms reached` once the timer fires
 */
const wait = (
  ms: number
): [ReturnType<typeof setTimeout>, Promise<never>] => {
  // The Promise executor runs synchronously, so `rejectWait` already holds
  // the real reject function by the time the timer is armed below.
  let rejectWait: (reason: string) => void = () => undefined;
  const waitPromise = new Promise<never>((_, reject) => {
    rejectWait = reject;
  });
  // The rejection reason is deliberately kept as a plain string so the value
  // seen by existing consumers does not change.
  const waitId = setTimeout(
    () => rejectWait(`timeout of ${ms}ms reached`),
    ms
  );
  return [waitId, waitPromise];
};

/**
 * Invokes `promise` and, when `ms` is a non-negative number, races the result
 * against a timeout of `ms` milliseconds.
 *
 * The timer is cancelled as soon as the wrapped promise settles (fulfilled or
 * rejected) and also when `promise` throws synchronously, so no stray timer
 * keeps the event loop alive and no orphaned timeout rejection is produced.
 *
 * @param promise factory that starts the operation
 * @param ms      optional timeout in milliseconds; any non-number, negative or
 *                NaN value disables the timeout
 * @returns the operation's result, or a rejection with
 *          `timeout of ${ms}ms reached` if the timer wins the race
 */
const timeoutWrapper = (promise: () => Promise<any>, ms?: number) => {
  if (typeof ms === "number" && ms >= 0) {
    const [waitId, waitPromise] = wait(ms);
    let pending: Promise<any>;
    try {
      pending = promise();
    } catch (err) {
      // The factory failed before producing a promise: disarm the timer so
      // its rejection does not surface later as an unhandled rejection.
      clearTimeout(waitId);
      throw err;
    }
    return Promise.race([
      pending.then(
        (res) => {
          clearTimeout(waitId);
          return res;
        },
        (err) => {
          // Clear on failure too; otherwise the timer would stay armed for
          // the full `ms` after the operation has already rejected.
          clearTimeout(waitId);
          throw err;
        }
      ),
      waitPromise,
    ]);
  } else return promise();
};

/**
 * Decorates a command with the hooks and timeout configured in `opts`.
 *
 * On every invocation of the returned function:
 *   1. `opts.preCall(cmdName, args)` is called, if defined;
 *   2. `cmd(...args)` is run through `timeoutWrapper` with `opts.timeout`;
 *   3. `opts.postCall(cmdName, args, resultOrError)` is called exactly once,
 *      if defined, with the result on success or the error on failure;
 *   4. the result is returned, or the error is rethrown.
 *
 * @param opts    hooks (`preCall`, `postCall`) and `timeout` to apply
 * @param cmdName name reported to the hooks
 * @param cmd     command to decorate; may return a plain value or a promise
 * @returns a function with the same parameters as `cmd` that always returns a
 *          promise
 */
const wrap = <T extends (...args: any[]) => any>(
  opts: Options,
  cmdName: keyof USP,
  cmd: T
): T => {
  return ((...args: any[]) => {
    // Promise.resolve added to handle non-promise commands
    const cmdCall = () => Promise.resolve(cmd(...args));
    if (opts.preCall) opts.preCall(cmdName, args);
    // Two-argument `then` (instead of `.then().catch()`) so that an exception
    // thrown by `postCall` on the success path is not caught by the failure
    // branch, which would invoke `postCall` a second time with its own error.
    return timeoutWrapper(cmdCall, opts.timeout).then(
      (res) => {
        if (opts.postCall) opts.postCall(cmdName, args, res);
        return res;
      },
      (err) => {
        if (opts.postCall) opts.postCall(cmdName, args, err);
        throw err;
      }
    );
  }) as T;
};

/**
 * Builds a new USP object in which every own enumerable entry of `usp` has
 * been passed through `wrap` with the given `opts`.
 *
 * The result also gets an `options` method that applies `addOptions` to the
 * freshly built object, i.e. to the already wrapped commands.
 *
 * @param usp  source commands, keyed by command name
 * @param opts options forwarded to `wrap` for every entry
 * @returns the wrapped USP object
 */
const addOptions = (usp: Partial<USP>, opts: Options): USP => {
  const wrappedCommands = {} as any;
  for (const [name, fn] of Object.entries(usp)) {
    wrappedCommands[name] = wrap(opts, name as keyof USP, fn as Command);
  }

  const mainUSP = wrappedCommands as USP;
  // Assigned after the loop so it replaces any wrapped `options` entry that
  // came from `usp`.
  mainUSP.options = (nextOpts) => addOptions(mainUSP, nextOpts);
  return mainUSP;
};

/**
 * Creates the `connect` function for a concrete transport.
 *
 * The factory receives the transport-specific pieces (`connectClient`,
 * `decodeID`, `loadProtobuf`) and returns an async function that:
 *   1. loads the protobuf definitions and connects the client;
 *   2. resolves the target id (`toId`), taken from `options.toId` or obtained
 *      by subscribing to the id endpoint and decoding the first message;
 *   3. dispatches every incoming message to the pending call and to the
 *      callbacks registered for the message id;
 *   4. subscribes to the subscribe endpoint and returns the USP command
 *      object.
 *
 * The returned promise rejects when loading, connecting or subscribing
 * rejects, or with `{ errMsg }` when no id arrives within `idResolveTimeout`
 * milliseconds.
 */
const buildConnect: BuildConnectionFn =
  ({ connectClient, decodeID, loadProtobuf }) =>
  async (options, events) => {
    const subscribeEndpoint =
      options.subscribeEndpoint || defaultSubscribeEndpoint;
    const publishEndpoint = options.publishEndpoint || defaultPublishEndpoint;
    const idEndpoint = options.idEndpoint || defaultIdEndpoint;

    const proto: Proto = await loadProtobuf();

    const router = makeRouter();
    const callbackRouter = makeCallbackRouter();

    // `events` and each of its hooks are optional, hence the guarded calls.
    const handleError = (err: any) =>
      events && events.onError && events.onError(err);
    const handleOffline = () =>
      events && events.onOffline && events.onOffline();
    const handleReconnect = () =>
      events && events.onReconnect && events.onReconnect();
    const handleClose = () => events && events.onClose && events.onClose();

    callbackRouter.add("error", handleError);

    const client = await connectClient(options);

    // Resolves the target id from the first message received after
    // subscribing to the id endpoint; rejects after `idResolveTimeout` ms.
    const handleInit = () =>
      new Promise<string>((resolve, reject) => {
        // The listener cannot be detached through the calls used in this
        // file, so it is turned into a no-op once the id has been decoded.
        // Without this guard every later message would trigger another
        // unsubscribe and run `decodeID` on an unrelated payload.
        let idReceived = false;
        const id = setTimeout(
          () =>
            reject({
              errMsg: `toId was not received within timeout(${idResolveTimeout})`,
            }),
          idResolveTimeout
        );
        client.on("message", (topic, data: any) => {
          if (idReceived) return;
          clearTimeout(id);
          client.unsubscribe(idEndpoint);
          resolve(decodeID(data || topic));
          // Set only after a successful decode so that a throwing `decodeID`
          // still lets a later message be tried.
          idReceived = true;
        });
        client.subscribe(idEndpoint);
      });

    const toId = fixId(options.toId || (await handleInit()));
    const fromId = options.fromId || defaultFromId;

    // Main dispatcher: decode each incoming message and hand it to the
    // pending call and the callbacks registered under its id.
    client.on("message", (_topic, data: any) => {
      const parsedMsg = readMsg(proto, data || _topic);
      const [id, message, err, cmdType] = decode(parsedMsg);
      if (typeof id !== "string") {
        handleError(
          `Could not locate id for message:\n${JSON.stringify(
            parsedMsg,
            null,
            2
          )}`
        );
      } else {
        // Messages without a command type are looked up as "NOTIFY".
        const pendingCall = router.get(id, cmdType || "NOTIFY");
        if (pendingCall && pendingCall.resolve && pendingCall.reject) {
          if (err) pendingCall.reject(err);
          else if (pendingCall.options) {
            // The caller supplied options: decode again with them applied.
            const messageAfterOptions = decodeWithOptions(
              parsedMsg,
              cmdType,
              pendingCall.options
            );
            pendingCall.resolve(messageAfterOptions);
          } else pendingCall.resolve(message);
        }

        const cbs = callbackRouter.get(id);
        cbs.forEach((cb) => {
          if (message) cb(message, parsedMsg);
          else if (err) cb(err, parsedMsg);
        });
      }
    });

    let reconnectCount = 0;
    const isTrackingReconnects =
      typeof options.reconnectsBeforeClosing === "number";

    client.on("error", (err) => {
      // NOTE(review): `handleError` is also registered under "error" above,
      // so it presumably runs once with the raw error and once with the JSON
      // string — confirm this is intended.
      callbackRouter.get("error").forEach((cb) => cb(err));
      handleError(JSON.stringify(err, null, 2));
    });

    client.on("offline", handleOffline);
    client.on("reconnect", () => {
      handleReconnect();
      if (isTrackingReconnects) reconnectCount++;
    });
    client.on("close", () => {
      handleClose();
      if (options.closeOnDisconnect === true) client.end();
      // End the client once the number of reconnects seen exceeds the
      // configured limit.
      if (
        isTrackingReconnects &&
        reconnectCount > (options.reconnectsBeforeClosing as number)
      )
        client.end();
    });

    // Registers `callback` under `ident` and returns a function that removes
    // the registration for `ident` through `callbackRouter.del`.
    const on: OnFn = (ident, callback) => {
      callbackRouter.add(ident, callback);
      return () => {
        callbackRouter.del(ident);
      };
    };

    const encode = makeEncode(proto, { fromId, toId });

    // Encodes and publishes a command; the promise settles when the main
    // dispatcher finds the matching entry in `router`.
    const call: CallFn = (command, args, callOpts) =>
      new Promise((resolve, reject) => {
        const [id, msg, err] = encode(command, args);
        if (err) reject(err);
        else {
          // The entry is keyed by the expected response type, which defaults
          // to the command name itself.
          router.add(id, callOpts?.responseMsgType || command, {
            resolve,
            reject,
            options: args.options,
          });
          client.publish(publishEndpoint, msg);
        }
      });

    await client.subscribe(subscribeEndpoint);

    const baseUSP: Partial<USP> = {
      get: (paths, options) => call("GET", { paths, options }),
      set: (path, value) => call("SET", { path, value }),
      add: (path, value) => call("ADD", { path, value }),
      del: (paths, allowPartial) => call("DELETE", { paths, allowPartial }),
      instances: (paths, opts) => call("GET_INSTANCES", { paths, opts }),
      supportedDM: (paths, opts) => call("GET_SUPPORTED_DM", { paths, opts }),
      supportedProto: (versions) => call("GET_SUPPORTED_PROTO", { versions }),
      _operate: (path, id, resp, input) =>
        call("OPERATE", { path, input, id, resp }),
      on,
      ...makeRecipes(call, on),
      disconnect: () => client.end(),
    };

    return {
      ...baseUSP,
      // Returns a copy of the command set with every entry wrapped by
      // `addOptions`.
      options: (opts) => addOptions(baseUSP, opts),
    } as USP;
  };

export default buildConnect;