    import mqttAsync from "async-mqtt";
    
    import { decode, decodeId, makeEncode, makeRecipes, readMsg } from "./commands";
    
    import {
      ConnectionOptions,
      CallFn,
      URLConnectionOptions,
      OnFn,
    
    } from "./types";
    import { makeCallbackRouter, makeRouter } from "./util";
    
    // Topic `connect` publishes encoded commands to when `options.publishEndpoint` is not set.
    const defaultPublishEndpoint = "/usp/endpoint";
    // Fallback used by `connect` when `options.subscribeEndpoint` is not set.
    const defaultSubscribeEndpoint = "/usp/controller";

    // Topic `connect` subscribes to in order to receive the `toId` when `options.idEndpoint` is not set.
    const defaultIdEndpoint = "obuspa/EndpointID";
    // Sender id handed to the encoder when `options.fromId` is not set.
    const defaultFromId = "proto::interop-usp-controller";
    // How long `connect` waits for the `toId` before rejecting; passed to setTimeout, so milliseconds.
    const idResolveTimeout = 5000;
    
    
    /**
     * Type guard that tells URL-based connection options apart from the rest.
     *
     * @param opts - Connection options to inspect
     * @returns `true` when `opts` exposes a `url` key (own or inherited)
     */
    const isURL = (opts: ConnectionOptions): opts is URLConnectionOptions => {
      const hasUrlKey = "url" in opts;
      return hasUrlKey;
    };
    
    /**
     * Open an MQTT connection for the given options.
     *
     * Three forms are handled:
     * - URL options: connect to `opts.url`, passing the options object along.
     * - Options whose `protocol` starts with "ws": connect to a URL built as
     *   `protocol://host:port`, passing the options object along.
     * - Anything else: hand the options object to `connectAsync` as-is.
     *
     * @param opts - Connection options (URL based or host/port based)
     * @returns The result of `mqttAsync.connectAsync` for the selected form
     */
    const _connect = (opts: ConnectionOptions) => {
      if (isURL(opts)) return mqttAsync.connectAsync(opts.url, opts as any);

      if (opts.protocol?.startsWith("ws")) {
        // Protocols starting with "ws" are connected through an explicit URL.
        const wsUrl = `${opts.protocol}://${opts.host}:${opts.port}`;
        return mqttAsync.connectAsync(wsUrl, opts as any);
      }

      return mqttAsync.connectAsync(opts);
    };
    
    /**
     * Connect to device
     * @param options - Connection options
     * @param events - Optional event handlers
     * @returns A set of functions for interacting with the device
     */
    
    const connect: Connect = async (options, events) => {
    
      const subscribeEndpoint =
        options.subscribeEndpoint || defaultSubscribeEndpoint;
      const publishEndpoint = options.publishEndpoint || defaultPublishEndpoint;
    
      const idEndpoint = options.idEndpoint || defaultIdEndpoint;
    
    
      const router = makeRouter();
      const callbackRouter = makeCallbackRouter();
    
      const handleError = (err: any) =>
        events && events.onError && events.onError(err);
    
      callbackRouter.add("error", handleError);
    
      const client = await _connect(options);
    
      const handleInit = () =>
        new Promise<string>((resolve, reject) => {
          const id = setTimeout(() => reject({ errMsg: `toId was not received within timeout(${idResolveTimeout})` }), idResolveTimeout)
          client.on("message", (_topic, data: any) => {
            clearTimeout(id);
    
            resolve(decodeId(data));
          });
          client.subscribe(idEndpoint);
        });
    
      const toId = options.toId || await handleInit();
      const fromId = options.fromId || defaultFromId;
    
    
      client.on("message", (_topic, data: any) => {
    
        const parsedMsg = readMsg(data);
        const [id, message, err] = decode(parsedMsg);
    
        const call = router.get(id);
        if (call && call.resolve && call.resolve) {
          if (err) call.reject(err);
          else call.resolve(message);
        }
    
        const cbs = callbackRouter.get(id);
    
        cbs.forEach((cb) => {
          if (message) cb(message, parsedMsg);
          else if (err) cb(err, parsedMsg);
        });
    
      client.on("error", (err) => {
    
        callbackRouter.get("error").forEach((cb) => cb(err));
    
        handleError(JSON.stringify(err, null, 2));
      });
    
      const on: OnFn = (ident, callback) => {
        callbackRouter.add(ident, callback);
        return () => {
          callbackRouter.del(ident);
        };
      };
    
      
      const encode = makeEncode({ fromId, toId });
    
      const call: CallFn = (command, args): any =>
        new Promise((resolve, reject) => {
          const [id, msg, err] = encode(command, args);
          if (err) reject(err);
          else {
            router.add(id, { resolve, reject });
            client.publish(publishEndpoint, msg);
          }
    
      return {
        get: (paths) => call("GET", { paths }),
        set: (path, value) => call("SET", { path, value }),
        add: (path, value) => call("ADD", { path, value }),
        del: (paths, allowPartial) => call("DELETE", { paths, allowPartial }),
        instances: (paths, opts) => call("GET_INSTANCES", { paths, opts }),
        supportedDM: (paths, opts) => call("GET_SUPPORTED_DM", { paths, opts }),
        supportedProto: (versions) => call("GET_SUPPORTED_PROTO", { versions }),
        on,
        ...makeRecipes(call, on),
        disconnect: () => client.end(),
      };