From 4442fd886efd958c6e921497a50c5d06c75cd4a6 Mon Sep 17 00:00:00 2001
From: Marin Karamihalev <marin.karamihalev@iopsys.eu>
Date: Sun, 7 May 2023 14:57:17 +0200
Subject: [PATCH] skip session init if session id is provided, readMsg error
 capture, disconnect message capture

---
 src/commands/index.ts       | 34 ++++++++++++++++++++----------
 src/configurations/build.ts | 42 +++++++++++++++++++++++--------------
 2 files changed, 49 insertions(+), 27 deletions(-)

diff --git a/src/commands/index.ts b/src/commands/index.ts
index 72f17d6..070bcdd 100644
--- a/src/commands/index.ts
+++ b/src/commands/index.ts
@@ -59,19 +59,31 @@ export const decodeRecord = (proto: Proto, data: any): Record<string, any> => {
   return JSON.parse(JSON.stringify(decodedRecord));
 };
 
-export const readMsg = (proto: Proto, data: any): Record<string, any> => {
+export const readMsg = (
+  proto: Proto,
+  data: any
+): [Record<string, any>, unknown, boolean] => {
   const record = proto.rootRecord.lookupType("usp_record.Record");
   const decodedRecord = record.decode(
     "binaryData" in data ? data.binaryData : data
   );
-  const msg = proto.rootMsg.lookupType("usp.Msg");
-  const decodedMsg = msg.decode(
-    decodedRecord.noSessionContext?.payload ||
-      decodedRecord.sessionContext?.payload[0]
-  );
 
-  // forces conversion
-  return JSON.parse(JSON.stringify(decodedMsg));
+  const convertedDecodedRecord = JSON.parse(JSON.stringify(decodedRecord))
+  if ("disconnect" in convertedDecodedRecord)
+    return [{}, convertedDecodedRecord.disconnect, true];
+
+  try {
+    const msg = proto.rootMsg.lookupType("usp.Msg");
+    const decodedMsg = msg.decode(
+      decodedRecord.noSessionContext?.payload ||
+        decodedRecord.sessionContext?.payload[0]
+    );
+
+    // forces conversion
+    return [JSON.parse(JSON.stringify(decodedMsg)), null, false];
+  } catch (err) {
+    return [{}, err, false];
+  }
 };
 
 type DecodeFn = (parsedMsg: any, version: USPVersion) => DecodeResponse;
@@ -175,9 +187,9 @@ export const makeEncode =
   };
 
 export const makeSession = (sessionId: number | null): USPSession => ({
-  sessionId: sessionId || 0,
-  sequenceId: 0,
-  expectedId: 0,
+  sessionId: sessionId || 1,
+  sequenceId: 1,
+  expectedId: 1,
 });
 
 export const hasSessionDisconnectError = (msg: Record<string, any>) =>
diff --git a/src/configurations/build.ts b/src/configurations/build.ts
index 400f1ee..39f864d 100644
--- a/src/configurations/build.ts
+++ b/src/configurations/build.ts
@@ -132,7 +132,7 @@ const makeDeferred = () => {
   return { resolve, reject, promise };
 };
 
-const initializeSession = async ({
+const initializeSession = ({
   client,
   proto,
   bufferOptions,
@@ -143,7 +143,7 @@ const initializeSession = async ({
 }) => {
   const deferred = makeDeferred();
   let sessionInitTimeoutId: any = 0;
-  let sessionId = -1;
+  let sessionId = 1;
 
   client.on("message", (_topic, data) => {
     const decodedMsg = decodeRecord(proto, data);
@@ -163,15 +163,15 @@ const initializeSession = async ({
     true
   );
 
-  await client.subscribe(subscribeEndpoint);
-  await client.publish(publishEndpoint, sessionMessage);
+  client.subscribe(subscribeEndpoint);
+  client.publish(publishEndpoint, sessionMessage);
 
   sessionInitTimeoutId = setTimeout(() => {
     deferred.reject();
     throw `Session initalization timed out(${defaultSessionInitTimeout}ms)`;
   }, defaultSessionInitTimeout);
 
-  await deferred.promise;
+  return deferred.promise;
 };
 
 const buildConnect: BuildConnectionFn =
@@ -234,20 +234,29 @@ const buildConnect: BuildConnectionFn =
       options.useSession || typeof options.sessionId === "number";
     let sessionOptions: USPSession = makeSession(options.sessionId || null);
     let encode = makeEncode(proto, bufferOptions, useSession);
-    const sessionReqs = {
-      client,
-      proto,
-      bufferOptions,
-      sessionOptions,
-      version,
-      publishEndpoint,
-      subscribeEndpoint,
-    };
 
-    if (useSession) await initializeSession(sessionReqs);
+    if (useSession && !options.sessionId)
+      await initializeSession({
+        client,
+        proto,
+        bufferOptions,
+        sessionOptions,
+        version,
+        publishEndpoint,
+        subscribeEndpoint,
+      });
 
     client.on("message", (_topic, data: any) => {
-      const parsedMsg = readMsg(proto, data || _topic);
+      const [parsedMsg, parseErr, isDisconnect] = readMsg(
+        proto,
+        data || _topic
+      );
+      if (isDisconnect) throw parseErr;
+      if (parseErr) {
+        handleError(JSON.stringify(parseErr, null, 2));
+        return;
+      }
+
       const [id, message, err, cmdType] = decode(parsedMsg, version);
       if (typeof id !== "string") {
         handleError(
@@ -306,6 +315,7 @@ const buildConnect: BuildConnectionFn =
         // todo make looky nice
         sessionOptions.sequenceId++;
         sessionOptions.expectedId = sessionOptions.sequenceId;
+
         const [id, msg, err] = encode(command, version, args, sessionOptions);
         if (err) reject(err);
         else {
-- 
GitLab