Newer
Older
import {
decode,
decodeWithOptions,
makeEncode,
makeRecipes,
readMsg,
} from "../commands";
import {
CallFn,
OnFn,
Options,
USP,
Command,
Proto,
BuildConnectionFn,
} from "../types";
import { makeCallbackRouter, makeRouter } from "../util";
// Fallback values used by `buildConnect` when the caller's options omit them.
// Topic that outgoing messages are published to when `options.publishEndpoint` is unset.
const defaultPublishEndpoint = "/usp/endpoint";
// Topic subscribed to for incoming messages when `options.subscribeEndpoint` is unset.
const defaultSubscribeEndpoint = "/usp/controller";
// Topic subscribed to while resolving the target id when `options.idEndpoint` is unset.
const defaultIdEndpoint = "obuspa/EndpointID";
// Sender id handed to the encoder when `options.fromId` is unset.
const defaultFromId = "proto::interop-usp-controller";
// NOTE(review): not referenced anywhere in the visible part of this file; presumably milliseconds — confirm usage.
const defaultConnectionTimeout = 5000;
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// Delay (passed to `setTimeout`, i.e. milliseconds) that `buildConnect` waits for the
// target id to arrive on the id endpoint before rejecting.
const idResolveTimeout = 5000;
/**
 * Replaces every literal `+` in `s` with `%2B`.
 *
 * @param s - The id string to rewrite.
 * @returns `s` with all `+` characters replaced; other characters are untouched.
 */
const fixId = (s: string) => s.replace(/\+/g, "%2B");
/**
 * Starts a timer whose promise rejects once `ms` milliseconds have elapsed.
 *
 * @param ms - Delay in milliseconds before the returned promise rejects.
 * @returns A `[timerId, promise]` tuple. The promise never fulfils; it rejects
 *   with the string `timeout of <ms>ms reached` unless the timer is cleared
 *   first via `clearTimeout(timerId)`.
 */
const wait = (
  ms: number
): [ReturnType<typeof setTimeout>, Promise<never>] => {
  // The Promise executor runs synchronously, so `rejectWait` is bound to the
  // real `reject` before the timer below can possibly fire.
  let rejectWait: (reason: string) => void = () => undefined;
  const waitPromise = new Promise<never>((_, reject) => {
    rejectWait = reject;
  });
  const waitId = setTimeout(() => {
    rejectWait(`timeout of ${ms}ms reached`);
  }, ms);
  return [waitId, waitPromise];
};

/**
 * Runs `promise` and, when `ms` is a non-negative number, races it against a
 * timeout.
 *
 * @param promise - Factory that starts the work and returns its promise.
 * @param ms - Optional timeout in milliseconds. When it is not a number, or is
 *   negative, no timeout is applied and the factory's promise is returned as is.
 * @returns A promise that settles like the work does, or rejects with the
 *   timeout message from `wait` if the work takes longer than `ms`.
 */
const timeoutWrapper = (
  promise: () => Promise<any>,
  ms?: number
): Promise<any> => {
  if (typeof ms === "number" && ms >= 0) {
    // Start the work before arming the timer: if the factory throws
    // synchronously there is then no timer left behind to reject unobserved.
    const pending = promise();
    const [waitId, waitPromise] = wait(ms);
    // `finally` clears the timer on fulfilment AND rejection, so a failed
    // command does not keep a live timer around until `ms` elapses.
    return Promise.race([pending, waitPromise]).finally(() => {
      clearTimeout(waitId);
    });
  }
  return promise();
};
/**
 * Decorates one command with the hooks and timeout carried by `opts`.
 *
 * For every invocation of the returned function:
 *  1. `opts.preCall(cmdName, args)` is called, if defined;
 *  2. `cmd` is invoked and raced against `opts.timeout` via `timeoutWrapper`;
 *  3. `opts.postCall(cmdName, args, outcome)` is called exactly once, if
 *     defined, with either the result or the error.
 *
 * @param opts - Options providing the optional `preCall`, `postCall` and `timeout`.
 * @param cmdName - Name reported to the hooks.
 * @param cmd - The command to decorate.
 * @returns A function with the same call signature as `cmd`; it always returns
 *   a promise that settles with the command's outcome (or a timeout rejection).
 */
const wrap = <T extends (...args: any[]) => any>(
  opts: Options,
  cmdName: keyof USP,
  cmd: T
): T => {
  return ((...args: any[]) => {
    // Promise.resolve added to handle non-promise commands
    const cmdCall = () => Promise.resolve(cmd(...args));
    const notify = (outcome: any) => {
      if (opts.postCall) opts.postCall(cmdName, args, outcome);
    };
    if (opts.preCall) opts.preCall(cmdName, args);
    // Two-argument `then` rather than `.then().catch()`: with the chained form,
    // a `postCall` that throws while reporting success would be invoked a second
    // time with its own error as the "result".
    return timeoutWrapper(cmdCall, opts.timeout).then(
      (res) => {
        notify(res);
        return res;
      },
      (err) => {
        notify(err);
        throw err;
      }
    );
  }) as T;
};
/**
 * Produces a copy of `usp` in which every entry is passed through `wrap` with
 * the given `opts`, and whose `options` member re-applies `addOptions` to that
 * copy.
 *
 * @param usp - The commands to decorate, keyed by name.
 * @param opts - Options forwarded to `wrap` for each entry.
 * @returns The decorated object, including a fresh `options` function.
 */
const addOptions = (usp: Partial<USP>, opts: Options): USP => {
  const wrapped: any = {};
  for (const [name, fn] of Object.entries(usp)) {
    wrapped[name] = wrap(opts, name as keyof USP, fn as Command);
  }
  const mainUSP = wrapped as USP;
  // Assigned after the loop so it replaces any wrapped `options` entry copied from `usp`.
  mainUSP.options = (nextOpts) => addOptions(mainUSP, nextOpts);
  return mainUSP;
};
/**
 * Creates the connect function from three injected dependencies.
 *
 * `connectClient` opens the underlying client, `decodeID` extracts the target
 * id from an incoming payload, and `loadProtobuf` loads the proto definitions
 * handed to `readMsg` / `makeEncode`.
 *
 * The returned async function takes connection `options` and optional `events`
 * callbacks (`onError`, `onOffline`, `onReconnect`, `onClose`) and resolves to
 * an object exposing the USP commands, `on`, the recipes from `makeRecipes`,
 * `disconnect`, and `options`.
 */
const buildConnect: BuildConnectionFn =
({ connectClient, decodeID, loadProtobuf }) =>
async (options, events) => {
// Endpoints fall back to the module-level defaults when not supplied.
const subscribeEndpoint =
options.subscribeEndpoint || defaultSubscribeEndpoint;
const publishEndpoint = options.publishEndpoint || defaultPublishEndpoint;
const idEndpoint = options.idEndpoint || defaultIdEndpoint;
const proto: Proto = await loadProtobuf();
// `router` tracks pending calls; `callbackRouter` tracks callbacks registered through `on`.
const router = makeRouter();
const callbackRouter = makeCallbackRouter();
// Each handler forwards to the matching `events` callback only when it was provided.
const handleError = (err: any) =>
events && events.onError && events.onError(err);
const handleOffline = () =>
events && events.onOffline && events.onOffline();
const handleReconnect = () =>
events && events.onReconnect && events.onReconnect();
const handleClose = () => events && events.onClose && events.onClose();
callbackRouter.add("error", handleError);
const client = await connectClient(options);
// Resolves the target id by subscribing to `idEndpoint` and decoding the first
// message that arrives; rejects with `{ errMsg }` after `idResolveTimeout`.
// NOTE(review): the "message" listener registered here is never removed in the
// visible code, so it presumably also runs for every later message — confirm.
const handleInit = () =>
new Promise<string>((resolve, reject) => {
const id = setTimeout(
() =>
reject({
errMsg: `toId was not received within timeout(${idResolveTimeout})`,
}),
idResolveTimeout
);
client.on("message", (topic, data: any) => {
clearTimeout(id);
client.unsubscribe(idEndpoint);
resolve(decodeID(data || topic));
});
client.subscribe(idEndpoint);
});
// An explicit `options.toId` skips id discovery; either way `+` is escaped by `fixId`.
const toId = fixId(options.toId || (await handleInit()));
const fromId = options.fromId || defaultFromId;
// Main dispatcher: parse and decode each incoming message, then settle the
// pending call and notify the registered callbacks.
client.on("message", (_topic, data: any) => {
const parsedMsg = readMsg(proto, data || _topic);
const [id, message, err, cmdType] = decode(parsedMsg);
// NOTE(review): the `} else {` below has no visible matching `if`; the condition
// guarding this error report appears to be missing from this copy of the file —
// confirm against the original source.
handleError(
`Could not locate id for message:\n${JSON.stringify(
parsedMsg,
null,
2
)}`
);
} else {
// Pending calls are looked up by id and command type, defaulting to "NOTIFY".
const call = router.get(id, cmdType || "NOTIFY");
if (call && call.resolve && call.reject) {
if (err) call.reject(err);
else if (call.options) {
// Calls registered with options get the message re-decoded with those options.
const messageAfterOptions = decodeWithOptions(
parsedMsg,
cmdType,
call.options
);
call.resolve(messageAfterOptions);
} else call.resolve(message);
}
// Callbacks registered for this id receive the message (or the error) plus the parsed message.
const cbs = callbackRouter.get(id);
cbs.forEach((cb) => {
if (message) cb(message, parsedMsg);
else if (err) cb(err, parsedMsg);
});
// NOTE(review): the lines closing the `else` branch and the message handler are
// not visible at this point in this copy — confirm against the original source.
// Reconnects are counted only when `options.reconnectsBeforeClosing` is a number.
let reconnectCount = 0;
const isTrackingReconnects =
typeof options.reconnectsBeforeClosing === "number";
client.on("error", (err) => {
// NOTE(review): `handleError` is itself registered under "error" above, so it
// presumably runs here once with the raw error and once with the JSON string — confirm.
callbackRouter.get("error").forEach((cb) => cb(err));
handleError(JSON.stringify(err, null, 2));
});
client.on("offline", handleOffline);
client.on("reconnect", () => {
handleReconnect();
if (isTrackingReconnects) reconnectCount++;
});
client.on("close", () => {
handleClose();
// End the client if asked to close on disconnect, or once the reconnect
// count exceeds the configured limit.
if (options.closeOnDisconnect === true) client.end();
if (
isTrackingReconnects &&
reconnectCount > (options.reconnectsBeforeClosing as number)
)
client.end();
});
// Registers `callback` under `ident`; the returned function calls `callbackRouter.del(ident)`.
const on: OnFn = (ident, callback) => {
callbackRouter.add(ident, callback);
return () => {
callbackRouter.del(ident);
};
};
const encode = makeEncode(proto, { fromId, toId });
// Encodes a command, registers its resolve/reject in `router` under the expected
// response type (`callOpts.responseMsgType`, else the command itself) and publishes it.
// Encoding errors reject immediately without publishing.
const call: CallFn = (command, args, callOpts) =>
new Promise((resolve, reject) => {
const [id, msg, err] = encode(command, args);
if (err) reject(err);
else {
router.add(id, callOpts?.responseMsgType || command, {
resolve,
reject,
});
client.publish(publishEndpoint, msg);
}
});
await client.subscribe(subscribeEndpoint);
// Command surface: each entry delegates to `call` with the matching command name.
const baseUSP: Partial<USP> = {
get: (paths, options) => call("GET", { paths, options }),
set: (path, value) => call("SET", { path, value }),
add: (path, value) => call("ADD", { path, value }),
del: (paths, allowPartial) => call("DELETE", { paths, allowPartial }),
instances: (paths, opts) => call("GET_INSTANCES", { paths, opts }),
supportedDM: (paths, opts) => call("GET_SUPPORTED_DM", { paths, opts }),
supportedProto: (versions) => call("GET_SUPPORTED_PROTO", { versions }),
_operate: (path, id, resp, input) =>
call("OPERATE", { path, input, id, resp }),
on,
...makeRecipes(call, on),
disconnect: () => client.end(),
};
// `options` returns a copy of the unwrapped commands decorated via `addOptions`.
return {
...baseUSP,
options: (opts) => addOptions(baseUSP, opts),
} as USP;
};
export default buildConnect;