Newer
Older
import mqttAsync from "async-mqtt";
Marin Karamihalev
committed
import { decode, decodeId, makeEncode, makeRecipes, readMsg } from "./commands";
import {
ConnectionOptions,
CallFn,
URLConnectionOptions,
OnFn,
} from "./types";
import { makeCallbackRouter, makeRouter } from "./util";
Marin Karamihalev
committed
const defaultPublishEndpoint = "/usp/endpoint";
const defaultSubscribeEndpoint = "/usp/controller";
Marin Karamihalev
committed
const defaultIdEndpoint = "obuspa/EndpointID";
const defaultFromId = "proto::interop-usp-controller";
const idResolveTimeout = 5000;
const isURL = (opts: ConnectionOptions): opts is URLConnectionOptions =>
"url" in opts;
const _connect = (opts: ConnectionOptions) => {
if (isURL(opts)) return mqttAsync.connectAsync(opts.url, opts as any);
else
return opts.protocol?.startsWith("ws")
? mqttAsync.connectAsync(
`${opts.protocol}://${opts.host}:${opts.port}`,
opts as any
)
: mqttAsync.connectAsync(opts);
* @param opts - Connection options
* @param events - Optional event handlers
* @returns A set of functions for interacting with the device
const connect: Connect = async (options, events) => {
const subscribeEndpoint =
options.subscribeEndpoint || defaultSubscribeEndpoint;
const publishEndpoint = options.publishEndpoint || defaultPublishEndpoint;
Marin Karamihalev
committed
const idEndpoint = options.idEndpoint || defaultIdEndpoint;
const router = makeRouter();
const callbackRouter = makeCallbackRouter();
Marin Karamihalev
committed
const handleError = (err: any) =>
events && events.onError && events.onError(err);
callbackRouter.add("error", handleError);
Marin Karamihalev
committed
const client = await _connect(options);
const handleInit = () =>
new Promise<string>((resolve, reject) => {
const id = setTimeout(() => reject({ errMsg: `toId was not received within timeout(${idResolveTimeout})` }), idResolveTimeout)
client.on("message", (_topic, data: any) => {
clearTimeout(id);
client.unsubscribe(idEndpoint);
Marin Karamihalev
committed
resolve(decodeId(data));
});
client.subscribe(idEndpoint);
});
const toId = options.toId || await handleInit();
const fromId = options.fromId || defaultFromId;
client.on("message", (_topic, data: any) => {
const parsedMsg = readMsg(data);
const [id, message, err] = decode(parsedMsg);
const call = router.get(id);
if (call && call.resolve && call.resolve) {
if (err) call.reject(err);
else call.resolve(message);
}
const cbs = callbackRouter.get(id);
cbs.forEach((cb) => {
if (message) cb(message, parsedMsg);
else if (err) cb(err, parsedMsg);
});
client.on("error", (err) => {
callbackRouter.get("error").forEach((cb) => cb(err));
handleError(JSON.stringify(err, null, 2));
});
const on: OnFn = (ident, callback) => {
callbackRouter.add(ident, callback);
return () => {
callbackRouter.del(ident);
};
};
Marin Karamihalev
committed
const encode = makeEncode({ fromId, toId });
const call: CallFn = (command, args): any =>
new Promise((resolve, reject) => {
const [id, msg, err] = encode(command, args);
if (err) reject(err);
else {
router.add(id, { resolve, reject });
client.publish(publishEndpoint, msg);
}
});
Marin Karamihalev
committed
await client.subscribe(subscribeEndpoint);
return {
get: (paths) => call("GET", { paths }),
set: (path, value) => call("SET", { path, value }),
add: (path, value) => call("ADD", { path, value }),
del: (paths, allowPartial) => call("DELETE", { paths, allowPartial }),
instances: (paths, opts) => call("GET_INSTANCES", { paths, opts }),
supportedDM: (paths, opts) => call("GET_SUPPORTED_DM", { paths, opts }),
supportedProto: (versions) => call("GET_SUPPORTED_PROTO", { versions }),
on,
...makeRecipes(call, on),
disconnect: () => client.end(),
};
Marin Karamihalev
committed
};
Marin Karamihalev
committed
export default connect;