diff --git a/README.md b/README.md
index 9adcc1e84e99de20306038bfbbb702372305130a..558b0a0f4b5b0e9bbfee1b812ed772bb60ee2f3d 100644
--- a/README.md
+++ b/README.md
@@ -98,7 +98,9 @@ const usp = await connect(options);
 
   - operate with arguments (for required args check USP Reference)
   ```javascript
-  await usp.operate("Device.IP.Diagnostics.IPPing()", { Host: "iopsys.eu" });
+  const [ping, cleanPing] = await usp.operate("Device.IP.Diagnostics.IPPing()");
+  const results = await ping({ Host: "iopsys.eu" })
+  await cleanPing() // clears ping subscription (optional)
   ```
 
 
diff --git a/src/index.ts b/src/index.ts
index ae8ca6370e7c9ac36626842435629106fb0cf313..5b398c964b853ced93bda51829155c1852e895aa 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -84,7 +84,7 @@ const isPromiseResult = (res: any): res is PromiseResult =>
 /**
  * Set of commands which have a return value
  */
-const commandsWithReturnValue = ["get", "operate", "add"];
+const commandsWithReturnValue = ["get", "notify", "add"];
 
 /**
  * Transform USPMessage to expected response type
@@ -135,19 +135,41 @@ const makeDel = (send: SendFn) => (path: string | string[]) =>
     path
   ) as Promise<void>;
 
+const operateSubscriptionPath = "Device.LocalAgent.Subscription.";
+type AddFn = (path: string, input?: JSObject) => Promise<string>;
+type DelFn = (path: string | string[]) => Promise<void>;
+
 /**
  * Make operate function
  */
-const makeOperate = (send: SendFn) => (path: string, input?: JSObject) => {
+const makeOperate = (send: SendFn, add: AddFn, del: DelFn) => async (
+  path: string,
+  opts?: OperateOptions
+): Promise<[OperateFunction, OperateCleanupFunction]> => {
+  const Persistent = opts?.Persistent === undefined ? false : opts.Persistent;
+  const ID = opts?.ID || `${path}_${Date.now()}`;
   const action = path.split(".").pop() || "";
-  return handleSend(
-    send("operate", {
-      path: path.replace(action, ""),
-      action,
-      input,
-    }),
-    path
-  ) as Promise<JSType>;
+  const operateInput = {
+    Enable: true,
+    ID,
+    NotifType: "OperationComplete",
+    ReferenceList: path,
+    Persistent,
+  };
+  const newSubPath = await add(operateSubscriptionPath, operateInput);
+  const command = (input?: JSObject) =>
+    handleSend(
+      send("operate", {
+        path: path.replace(action, ""),
+        action,
+        input,
+        operateID: ID,
+      }),
+      path
+    ) as Promise<JSType>;
+
+  const cleanup = () => del(newSubPath)
+  return [command, cleanup];
 };
 
 /**
@@ -175,11 +197,7 @@ const connect = async (
       if (isPromiseResult(handle)) {
         if (isError(resp)) handle.reject(resp);
         else handle.resolve(resp);
-      } else
-        console.error(
-          "Unknown message received",
-          JSON.stringify(resp, null, 2)
-        );
+      }
     },
   });
 
@@ -213,14 +231,16 @@ const connect = async (
     .catch(throwError);
 
   const get = makeGet(send);
+  const add = makeAdd(send);
+  const del = makeDel(send);
   return {
     id: () => sessionID,
     roles: () => roles,
     get,
     set: makeSet(send),
-    operate: makeOperate(send),
-    add: makeAdd(send),
-    del: makeDel(send),
+    add,
+    del,
+    operate: makeOperate(send, add, del),
     disconnect: protocol.close,
     resolve: async (data: JSType, level = 1) =>
       await resolveReferences(data, get, level),
diff --git a/src/protocol/index.ts b/src/protocol/index.ts
index 180b026f029450ab8ea94fdabdcdfcce5b805d13..5283a56bea452b29b46c300ebfacb69ddb5393b1 100644
--- a/src/protocol/index.ts
+++ b/src/protocol/index.ts
@@ -1,4 +1,5 @@
 import mqttAsync from "async-mqtt";
+import { parse } from "protobufjs";
 import * as messages from "./js-usp-protobuf/protoMessage";
 import { search, searchAll, unflatten, unwrapArray, unwrapObject } from "./lib";
 
@@ -36,11 +37,12 @@ const encodeSet = (path: string, value: any): MQTTRequest => {
 const encodeOperate = (
   path: string,
   action: string,
+  operateID: string,
   input?: object
 ): MQTTRequest => {
-  const [data, id] = messages.makeOperateMessage(path + action, input);
+  const [data] = messages.makeOperateMessage(path + action, input);
   return {
-    id,
+    id: operateID,
     data,
   };
 };
@@ -48,6 +50,7 @@ const encodeOperate = (
 /** Extract message type */
 const extractType = (msg: any): MessageType => {
   const msgType: string = search(msg, "msgType");
+  if (!msgType) return "get";
   const command = msgType.split("_")[0];
   return command ? (command.toLowerCase() as MessageType) : "get";
 };
@@ -68,9 +71,10 @@ const decodeGet = (msg: any, id: string): GetMessage => {
   };
 };
 
+const ignorableTypes = ["operate"];
+
 /** Decode message */
-const decode = (rawMsg: any): USPMessage => {
-  const msg = JSON.parse(JSON.stringify(rawMsg)); // forces conversions
+const decode = (msg: any, type: MessageType): USPMessage => {
   const id = search(msg, "msgId");
   const path = search(msg, "requestedPath") || "";
   const errMsg = search(msg, "errMsg");
@@ -84,7 +88,6 @@ const decode = (rawMsg: any): USPMessage => {
       path,
     } as ErrorMessage;
 
-  const type: MessageType = extractType(msg);
   const baseMsg: BaseMessage = {
     type,
     path,
@@ -95,16 +98,16 @@ const decode = (rawMsg: any): USPMessage => {
   if (type === "add") {
     const updatedPath = search(msg, "instantiatedPath");
     return { ...baseMsg, data: updatedPath } as AddMessage;
-  }
-  if (type === "delete")
+  } else if (type === "delete")
     return { ...baseMsg, data: { deleted: true } } as DeleteMessage;
-  if (type === "set")
+  else if (type === "set")
     return { ...baseMsg, data: { updated: true } } as SetMessage;
-  if (type === "operate") {
+  else if (type === "notify") {
     return {
       ...baseMsg,
-      data: search(msg, "operationResults"),
-    } as OperateMessage;
+      id: search(msg, "subscriptionId"),
+      data: search(msg, "outputArgs"),
+    } as NotifyMessage;
   }
 
   return decodeGet(msg, id.toString());
@@ -112,7 +115,9 @@ const decode = (rawMsg: any): USPMessage => {
 
 /** Encode add message */
 const encodeAdd = (path: string, input?: JSObject): MQTTRequest => {
-  const pairs = input ? Object.entries(input) : [];
+  const pairs = input
+    ? Object.entries(input).map(([k, v]) => [k, v.toString()])
+    : [];
   const [data, id] = messages.makeAddMessage(path, pairs);
   return {
     id,
@@ -136,7 +141,12 @@ const encode = (
 ): { data: any; id: string } => {
   switch (command) {
     case "operate":
-      return encodeOperate(args.path || "", args.action || "", args.input);
+      return encodeOperate(
+        args.path || "",
+        args.action || "",
+        args.operateID || "",
+        args.input
+      );
     case "set":
       return encodeSet(args.path || "", args.value);
     case "add":
@@ -156,7 +166,7 @@ const connect = async (opts: mqttAsync.IClientOptions) =>
         opts
       )
     : mqttAsync.connectAsync(opts);
-    
+
 /** Generate mqtt protocol */
 const mqtt = (
   opts: mqttAsync.IClientOptions,
@@ -170,7 +180,10 @@ const mqtt = (
 
     client.on("message", (_topic: string, message: any) => {
       const msg = messages.decode(message);
-      events.onMessage(decode(msg));
+      const parsedMsg = JSON.parse(JSON.stringify(msg)); // forces conversions
+      const type: MessageType = extractType(parsedMsg);
+      if (!ignorableTypes.includes(type))
+        events.onMessage(decode(parsedMsg, type));
     });
 
     return {
diff --git a/src/testy.ts b/src/testy.ts
index 15c6df1eb9de4772d11a587028a22d8a6c9163c5..f7ef3856b3151c8dcde91a649fb4a7a77cd5a42c 100644
--- a/src/testy.ts
+++ b/src/testy.ts
@@ -1,4 +1,4 @@
-import connect from "./index"
+import connect from "./index";
 
 const run = async () => {
   const device = await connect({
@@ -7,46 +7,54 @@ const run = async () => {
     password: "admin",
   });
 
-  const addResp = await device.add("Device.NAT.PortMapping.", {
-    Description: "webserver1-set",
-    ExternalPort: "80",
-    Protocol: "TCP",
-    Interface: "Device.IP.Interface.1",
-    Enable: "0",
-    InternalClient: "192.168.2.125",
-    InternalPort: "5000",
-  });
-  console.log(" >>> ADD RESPONSE <<<");
-  console.log(JSON.stringify(addResp, null, 2));
-
-  const setResp = await device.set("Device.NAT.PortMapping.1.", {
-    "Description": "webserver1-set",
-    "ExternalPort": "80",
-    "Protocol": "TCP",
-    "Interface": "Device.IP.Interface.1",
-    "Enable": "0",
-    "InternalClient": "192.168.2.125",
-    "InternalPort": "5000"
-  }).catch(console.error)
-  console.log(' >>> SET RESPONSE <<<')
-  console.log(JSON.stringify(setResp, null, 2))
+  // const addResp = await device.add("Device.NAT.PortMapping.", {
+  //   Description: "webserver1-set",
+  //   ExternalPort: "80",
+  //   Protocol: "TCP",
+  //   Interface: "Device.IP.Interface.1",
+  //   Enable: "0",
+  //   InternalClient: "192.168.2.125",
+  //   InternalPort: "5000",
+  // });
+  // console.log(" >>> ADD RESPONSE <<<");
+  // console.log(JSON.stringify(addResp, null, 2));
+
+  // const setResp = await device.set("Device.NAT.PortMapping.1.", {
+  //   "Description": "webserver1-set",
+  //   "ExternalPort": "80",
+  //   "Protocol": "TCP",
+  //   "Interface": "Device.IP.Interface.1",
+  //   "Enable": "0",
+  //   "InternalClient": "192.168.2.125",
+  //   "InternalPort": "5000"
+  // }).catch(console.error)
+  // console.log(' >>> SET RESPONSE <<<')
+  // console.log(JSON.stringify(setResp, null, 2))
 
   // const delResp = await device.del(["Device.NAT.PortMapping.2.", "Device.NAT.PortMapping.3."])
   // console.log(' >>> DEL RESPONSE <<<')
   // console.log(JSON.stringify(delResp, null, 2))
 
-  const getResp = await device.get("Device.DeviceInfo.Description");
-  console.log(" >>> GET RESPONSE <<<");
-  console.log(JSON.stringify(getResp, null, 2));
-
-  const oprResp = await device.operate("Device.IP.Diagnostics.IPPing()", { Host: "google.com" })
-  console.log(' >>> OPR RESPONSE <<<')
-  console.log(JSON.stringify(oprResp, null, 2))
+  // await device.operate("Device.IP.Diagnostics.IPPing()", { Host: "iopsys.eu" }).then(console.log);
+  // const addResp = await device.add("Device.LocalAgent.Subscription.", {
+  //   Enable: true,
+  //   ID: "ipping_sub_4",
+  //   NotifType: "OperationComplete",
+  //   ReferenceList: "Device.IP.Diagnostics.IPPing()",
+  //   Persistent: true,
+  // });
+  // console.log({ addResp });
+  // const [ping, cleanPing] = await device.operate("Device.IP.Diagnostics.IPPing()")
+  // await ping({ Host: "iopsys.eu" }).then(console.log)
+  // await cleanPing()
+  // await ping({ Host: "iopsys.eu" }).then(console.log).catch(console.error)
+  
+
+  // const oprResp = await device.operate("Device.IP.Diagnostics.IPPing()", { Host: "google.com" })
+  // console.log(' >>> OPR RESPONSE <<<')
+  // console.log(JSON.stringify(oprResp, null, 2))
 
   await device.disconnect();
 };
 
 run().catch(console.error);
-
-// Notes:
-//  - add with input arg adds new object, does not change values to input arg
diff --git a/src/types.d.ts b/src/types.d.ts
index 5e51ad984ae4d2a09e80c74997470f6e4ed287da..45493030543bfafd5c20a1e2827c86633b04cbb5 100644
--- a/src/types.d.ts
+++ b/src/types.d.ts
@@ -17,41 +17,46 @@ interface ConnectEvents {
 }
 /** Device API */
 interface Device {
-  /** 
+  /**
    * Get value at path
    * @param path Location of value (e.g. "Device.DeviceInfo.")
    * ```
    * await usp.get("Device.WiFi.Radio.1.")
-   * // or 
+   * // or
    * await usp.get(["Device.WiFi.Radio.1.", "Device.WiFi.Radio.2."])
    * ```
    */
   get: (path: string | string[]) => Promise<JSType>;
 
-   /** 
+  /**
    * Set value at path
    * @param path Location of value (e.g. "Device.DeviceInfo.")
    * @param value Value to assign
    * ```
    * await usp.set("Device.WiFi.Radio.1.", { Name: "radio-1" })
-   * // or 
+   * // or
    * await usp.set("Device.WiFi.Radio.1.Name", "radio-1")
    * ```
    */
   set: (path: string, value: JSType) => Promise<void>;
 
-  
-   /** 
-   * Execute a command
-   * @param path Full path of command (e.g. "Device.SelfTestDiagnostics()")
-   * @param input Optional input to the command
+  /**
+   * Create a command
+   * @param path Full path of command (e.g. "Device.IP.Diagnostics.IPPing()")
+   * @param opts Subscription options (not required)
+   * @returns Function that executes command
    * ```
-   * await usp.operate("Device.SelfTestDiagnostics()")
+   * const [ping, cleanPing] = await usp.operate("Device.IP.Diagnostics.IPPing()")
+   * const results = await ping({ Host: "iopsys.eu" })
+   * await cleanPing()
    * ```
    */
-  operate: (path: string, input?: JSObject) => Promise<JSType>;
+  operate: (
+    path: string,
+    opts?: OperateOptions
+  ) => Promise<[OperateFunction, OperateCleanupFunction]>;
 
-  /** 
+  /**
    * Add object to path
    * @param path Path to add to (e.g. "Device.NAT.PortMapping.")
    * @param values Optional object to add (if skipped will use default values)
@@ -62,7 +67,7 @@ interface Device {
    */
   add: (path: string, values?: JSObject) => Promise<string>;
 
-  /** 
+  /**
    * Delete object at path
    * @param path Full path to delete (e.g. "Device.NAT.PortMapping.1.")
    * ```
@@ -71,7 +76,7 @@ interface Device {
    */
   del: (path: string | string[]) => Promise<void>;
 
-  /** 
+  /**
    * Resolve references in message
    * @param msg Message with reference in it
    * @param level Optional level of nesting to resolve to (avoid using high numbers)
@@ -81,7 +86,7 @@ interface Device {
    */
   resolve: (msg: JSType, level: number = 1) => Promise<JSType>;
 
-  /** 
+  /**
    * Disconenct from device
    * ```
    * await usp.disconnect()
@@ -96,6 +101,19 @@ interface Device {
   roles: () => Role[];
 }
 
+/** Options for Operate Subscription */
+interface OperateOptions {
+  /** Subscription ID (optional) */
+  ID?: string;
+  /** Should subscription remain through sessions (optional) */
+  Persistent?: boolean;
+}
+
+/** Executes a command */
+type OperateFunction = (input?: JSObject) => Promise<JSType>;
+/** Cleans up command subscription */
+type OperateCleanupFunction = () => Promise<void>;
+
 /** Arguments for encoding commands */
 interface EncodeArgs {
   path: string;
@@ -103,6 +121,7 @@ interface EncodeArgs {
   value: JSType;
   input: JSObject;
   action: string;
+  operateID: string;
 }
 
 /** Session role */
@@ -110,7 +129,7 @@ type Role = "admin" | "user" | "none";
 /** Representation of JS object */
 type JSObject = { [key: string]: JSType };
 /** USP Commands */
-type CommandType = "get" | "set" | "operate" | "add" | "delete";
+type CommandType = "get" | "set" | "operate" | "add" | "delete" | "notify";
 /** Message types */
 type MessageType = CommandType | "error";
 /** Represents js types */
@@ -152,6 +171,11 @@ interface OperateMessage extends BaseMessage {
   type: "operate";
   data: JSValue;
 }
+/** Notify Message */
+interface NotifyMessage extends BaseMessage {
+  type: "notify";
+  data: JSValue;
+}
 /** Add Message */
 interface AddMessage extends BaseMessage {
   type: "add";
@@ -172,6 +196,7 @@ type USPMessage =
   | OperateMessage
   | AddMessage
   | DeleteMessage
+  | NotifyMessage
   | ErrorMessage;
 
 /** Protocol event handlers */
diff --git a/tests/integration/index.test.js b/tests/integration/index.test.js
index f965c8df735101270e8e51926a935b159ebfa0f8..197304ee9f0778dd098680f7bd69f0c672ba47dc 100644
--- a/tests/integration/index.test.js
+++ b/tests/integration/index.test.js
@@ -48,7 +48,13 @@ describe("Test general API", () => {
       .catch((err) => assert.strictEqual(err.type, "error"));
   });
 
-  // OPERATE - WIP
+  // OPERATE
+
+  it("operate creates a working command", async () => {
+    const [ping] = await device.operate("Device.IP.Diagnostics.IPPing()")
+    const results = await ping({ Host: "iopsys.eu" })
+    assert.strictEqual(typeof results, "object")
+  })
 
   // ADD / DELETE