Skip to content
Snippets Groups Projects
Commit 4442fd88 authored by Marin Karamihalev's avatar Marin Karamihalev
Browse files

skip session init if session id is provided, readMsg error capture, disconnect message capture

parent 2d5526d1
Branches
Tags
No related merge requests found
Pipeline #96834 passed
...@@ -59,19 +59,31 @@ export const decodeRecord = (proto: Proto, data: any): Record<string, any> => { ...@@ -59,19 +59,31 @@ export const decodeRecord = (proto: Proto, data: any): Record<string, any> => {
return JSON.parse(JSON.stringify(decodedRecord)); return JSON.parse(JSON.stringify(decodedRecord));
}; };
export const readMsg = (proto: Proto, data: any): Record<string, any> => { export const readMsg = (
proto: Proto,
data: any
): [Record<string, any>, unknown, boolean] => {
const record = proto.rootRecord.lookupType("usp_record.Record"); const record = proto.rootRecord.lookupType("usp_record.Record");
const decodedRecord = record.decode( const decodedRecord = record.decode(
"binaryData" in data ? data.binaryData : data "binaryData" in data ? data.binaryData : data
); );
const msg = proto.rootMsg.lookupType("usp.Msg");
const decodedMsg = msg.decode(
decodedRecord.noSessionContext?.payload ||
decodedRecord.sessionContext?.payload[0]
);
// forces conversion const convertedDecodedRecord = JSON.parse(JSON.stringify(decodedRecord))
return JSON.parse(JSON.stringify(decodedMsg)); if ("disconnect" in convertedDecodedRecord)
return [{}, convertedDecodedRecord.disconnect, true];
try {
const msg = proto.rootMsg.lookupType("usp.Msg");
const decodedMsg = msg.decode(
decodedRecord.noSessionContext?.payload ||
decodedRecord.sessionContext?.payload[0]
);
// forces conversion
return [JSON.parse(JSON.stringify(decodedMsg)), null, false];
} catch (err) {
return [{}, err, false];
}
}; };
type DecodeFn = (parsedMsg: any, version: USPVersion) => DecodeResponse; type DecodeFn = (parsedMsg: any, version: USPVersion) => DecodeResponse;
...@@ -175,9 +187,9 @@ export const makeEncode = ...@@ -175,9 +187,9 @@ export const makeEncode =
}; };
export const makeSession = (sessionId: number | null): USPSession => ({ export const makeSession = (sessionId: number | null): USPSession => ({
sessionId: sessionId || 0, sessionId: sessionId || 1,
sequenceId: 0, sequenceId: 1,
expectedId: 0, expectedId: 1,
}); });
export const hasSessionDisconnectError = (msg: Record<string, any>) => export const hasSessionDisconnectError = (msg: Record<string, any>) =>
......
...@@ -132,7 +132,7 @@ const makeDeferred = () => { ...@@ -132,7 +132,7 @@ const makeDeferred = () => {
return { resolve, reject, promise }; return { resolve, reject, promise };
}; };
const initializeSession = async ({ const initializeSession = ({
client, client,
proto, proto,
bufferOptions, bufferOptions,
...@@ -143,7 +143,7 @@ const initializeSession = async ({ ...@@ -143,7 +143,7 @@ const initializeSession = async ({
}) => { }) => {
const deferred = makeDeferred(); const deferred = makeDeferred();
let sessionInitTimeoutId: any = 0; let sessionInitTimeoutId: any = 0;
let sessionId = -1; let sessionId = 1;
client.on("message", (_topic, data) => { client.on("message", (_topic, data) => {
const decodedMsg = decodeRecord(proto, data); const decodedMsg = decodeRecord(proto, data);
...@@ -163,15 +163,15 @@ const initializeSession = async ({ ...@@ -163,15 +163,15 @@ const initializeSession = async ({
true true
); );
await client.subscribe(subscribeEndpoint); client.subscribe(subscribeEndpoint);
await client.publish(publishEndpoint, sessionMessage); client.publish(publishEndpoint, sessionMessage);
sessionInitTimeoutId = setTimeout(() => { sessionInitTimeoutId = setTimeout(() => {
deferred.reject(); deferred.reject();
throw `Session initalization timed out(${defaultSessionInitTimeout}ms)`; throw `Session initalization timed out(${defaultSessionInitTimeout}ms)`;
}, defaultSessionInitTimeout); }, defaultSessionInitTimeout);
await deferred.promise; return deferred.promise;
}; };
const buildConnect: BuildConnectionFn = const buildConnect: BuildConnectionFn =
...@@ -234,20 +234,29 @@ const buildConnect: BuildConnectionFn = ...@@ -234,20 +234,29 @@ const buildConnect: BuildConnectionFn =
options.useSession || typeof options.sessionId === "number"; options.useSession || typeof options.sessionId === "number";
let sessionOptions: USPSession = makeSession(options.sessionId || null); let sessionOptions: USPSession = makeSession(options.sessionId || null);
let encode = makeEncode(proto, bufferOptions, useSession); let encode = makeEncode(proto, bufferOptions, useSession);
const sessionReqs = {
client,
proto,
bufferOptions,
sessionOptions,
version,
publishEndpoint,
subscribeEndpoint,
};
if (useSession) await initializeSession(sessionReqs); if (useSession && !options.sessionId)
await initializeSession({
client,
proto,
bufferOptions,
sessionOptions,
version,
publishEndpoint,
subscribeEndpoint,
});
client.on("message", (_topic, data: any) => { client.on("message", (_topic, data: any) => {
const parsedMsg = readMsg(proto, data || _topic); const [parsedMsg, parseErr, isDisconnect] = readMsg(
proto,
data || _topic
);
if (isDisconnect) throw parseErr;
if (parseErr) {
handleError(JSON.stringify(parseErr, null, 2));
return;
}
const [id, message, err, cmdType] = decode(parsedMsg, version); const [id, message, err, cmdType] = decode(parsedMsg, version);
if (typeof id !== "string") { if (typeof id !== "string") {
handleError( handleError(
...@@ -306,6 +315,7 @@ const buildConnect: BuildConnectionFn = ...@@ -306,6 +315,7 @@ const buildConnect: BuildConnectionFn =
// todo make looky nice // todo make looky nice
sessionOptions.sequenceId++; sessionOptions.sequenceId++;
sessionOptions.expectedId = sessionOptions.sequenceId; sessionOptions.expectedId = sessionOptions.sequenceId;
const [id, msg, err] = encode(command, version, args, sessionOptions); const [id, msg, err] = encode(command, version, args, sessionOptions);
if (err) reject(err); if (err) reject(err);
else { else {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment