Skip to content
Snippets Groups Projects
dm_exec.c 46.9 KiB
Newer Older
  • Learn to ignore specific revisions
  • William Lupton's avatar
    William Lupton committed
    /*
     *
    
     * Copyright (C) 2019-2021, Broadband Forum
     * Copyright (C) 2016-2021  CommScope, Inc
     * Copyright (C) 2020,  BT PLC
     *
    
    William Lupton's avatar
    William Lupton committed
     * Redistribution and use in source and binary forms, with or without
     * modification, are permitted provided that the following conditions
     * are met:
    
    William Lupton's avatar
    William Lupton committed
     * 1. Redistributions of source code must retain the above copyright
     *    notice, this list of conditions and the following disclaimer.
    
    William Lupton's avatar
    William Lupton committed
     * 2. Redistributions in binary form must reproduce the above copyright
     *    notice, this list of conditions and the following disclaimer in the
     *    documentation and/or other materials provided with the distribution.
    
    William Lupton's avatar
    William Lupton committed
     * 3. Neither the name of the copyright holder nor the names of its
     *    contributors may be used to endorse or promote products derived from
     *    this software without specific prior written permission.
    
    William Lupton's avatar
    William Lupton committed
     * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
     * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
     * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
     * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
    
     * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
     * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
    
    William Lupton's avatar
    William Lupton committed
     * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
     * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
     * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
    
     * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
    
    William Lupton's avatar
    William Lupton committed
     * THE POSSIBILITY OF SUCH DAMAGE.
     *
     */
    
    /**
     * \file dm_exec.c
     *
     * Main loop for data model thread
     *
     */
    
    #include <string.h>
    #include <sys/socket.h>
    #include <errno.h>
    
    #include "common_defs.h"
    #include "mtp_exec.h"
    
    Richard Holme's avatar
    Richard Holme committed
    #include "dm_exec.h"
    
    William Lupton's avatar
    William Lupton committed
    #include "data_model.h"
    #include "sync_timer.h"
    #include "cli.h"
    #include "data_model.h"
    #include "dm_access.h"
    #include "device.h"
    #include "msg_handler.h"
    #include "os_utils.h"
    #include "database.h"
    #include "dm_trans.h"
    #include "nu_ipaddr.h"
    
    Richard Holme's avatar
    Richard Holme committed
    #include "stomp.h"
    
    #include "proto_trace.h"
    #include "usp-record.pb-c.h"
    
    William Lupton's avatar
    William Lupton committed
    
    #ifdef ENABLE_COAP
    #include "usp_coap.h"
    #endif
    
    
    #ifdef ENABLE_WEBSOCKETS
    #include "wsclient.h"
    #endif
    
    
    William Lupton's avatar
    William Lupton committed
    //------------------------------------------------------------------------------
    // Unix domain socket pair used to implement a message queue
    // One socket is always used for sending, and the other always used for receiving
    static int dm_mq_sockets[2] = {-1, -1};
    
    #define mq_rx_socket  dm_mq_sockets[0]
    #define mq_tx_socket  dm_mq_sockets[1]
    
    //-------------------------------------------------------------------------
    // Type of message on data model's message queue
    typedef enum
    {
        kDmExecMsg_OperComplete,       // Sent from a thread performing an operation to signal that the operation has completed
        kDmExecMsg_OperStatus,         // Sent from a thread performing an operation to signal the new value for the status of the operation (the data model thread performs the actual update of this value)
        kDmExecMsg_EventComplete,      // Sent from a thread to signal that an event has occurred
        kDmExecMsg_ObjAdded,           // Sent from a thread to signal that an object has been added by the vendor
        kDmExecMsg_ObjDeleted,         // Sent from a thread to signal that an object has been deleted by the vendor
        kDmExecMsg_ProcessUspRecord,   // Sent from the MTP thread with a USP Record to process
        kDmExecMsg_StompHandshakeComplete, // Sent from the MTP thread to notify the controller trust role to use for all controllers connected to the specified stomp connection
        kDmExecMsg_MtpThreadExited,    // Sent to signal that the MTP thread has exited as requested by a scheduled exit
        kDmExecMsg_BdcTransferResult,  // Sent to signal that the BDC thread has sent (or failed to send) a report
    
        kDmExecMsg_MqttHandshakeComplete, // Sent from the MTP thread to notify the controller trust role to use for all controllers connected to the specified mqtt client
    
    William Lupton's avatar
    William Lupton committed
    } dm_exec_msg_type_t;
    
    
    // Operation complete parameters in data model handler message
    typedef struct
    {
        int instance;
        int err_code;
        char *err_msg;
        kv_vector_t *output_args;
    } oper_complete_msg_t;
    
    // Event complete parameters in data model message
    typedef struct
    {
        char *event_name;
        kv_vector_t *output_args;
    } event_complete_msg_t;
    
    // Operation status parameters in data model message
    typedef struct
    {
        int instance;
        char *status;
    } oper_status_msg_t;
    
    
    // Process USP Record parameters in data model message
    typedef struct
    {
        unsigned char *pbuf;
        int pbuf_len;
        ctrust_role_t role;
    
    Richard Holme's avatar
    Richard Holme committed
        mtp_reply_to_t mtp_reply_to;    // destination to send the USP message response to
    
    William Lupton's avatar
    William Lupton committed
    } process_usp_record_msg_t;
    
    // Notify controller trust role for all controllers connected to the specified STOMP connection
    typedef struct
    {
        int stomp_instance;
        ctrust_role_t role;
    } stomp_complete_msg_t;
    
    
    // Notify controller trust role for all controllers connected to the specified MQTT client
    typedef struct
    {
        int mqtt_instance;
        ctrust_role_t role;
    } mqtt_complete_msg_t;
    
    
    
    William Lupton's avatar
    William Lupton committed
    // Object added parameters in data model message
    typedef struct
    {
        char *path;
    } obj_added_msg_t;
    
    // Object deleted parameters in data model message
    typedef struct
    {
        char *path;
    } obj_deleted_msg_t;
    
    // Management IP address changed parameters in data model message
    typedef struct
    {
        char ip_addr[NU_IPADDRSTRLEN];
    } mgmt_ip_addr_msg_t;
    
    
    Richard Holme's avatar
    Richard Holme committed
    // MTP Thread exited
    typedef struct
    {
        unsigned flags;         // Bitmask indicating which thread exited
    } mtp_thread_exited_msg_t;
    
    
    William Lupton's avatar
    William Lupton committed
    // Bulk Data Collection thread sent (or failed to send) a report
    typedef struct
    {
        int profile_id;         // Instance number of profile in Device.Bulkdata.Profile.{i}
    
    Richard Holme's avatar
    Richard Holme committed
        bdc_transfer_result_t transfer_result;   // Result code of sending the report
    
    William Lupton's avatar
    William Lupton committed
    } bdc_transfer_result_msg_t;
    
    
    // Structure of data model message
    typedef struct
    {
        dm_exec_msg_type_t type;
        union
        {
            oper_complete_msg_t oper_complete;
            event_complete_msg_t event_complete;
            oper_status_msg_t oper_status;
            obj_added_msg_t obj_added;
            obj_deleted_msg_t obj_deleted;
            process_usp_record_msg_t usp_record;
            stomp_complete_msg_t stomp_complete;
    
            mqtt_complete_msg_t mqtt_complete;
    
    William Lupton's avatar
    William Lupton committed
            mgmt_ip_addr_msg_t mgmt_ip_addr;
    
    Richard Holme's avatar
    Richard Holme committed
            mtp_thread_exited_msg_t mtp_thread_exited;
    
    William Lupton's avatar
    William Lupton committed
            bdc_transfer_result_msg_t bdc_transfer_result;
        } params;
    
    William Lupton's avatar
    William Lupton committed
    } dm_exec_msg_t;
    
    //------------------------------------------------------------------------------------
    // Mutex used to protect access to this component
    // This mutex is only really necessary for an orderly shutdown, to ensure the thread isn't doing anything when we free it's memory
    static pthread_mutex_t dm_access_mutex;
    
    
    Richard Holme's avatar
    Richard Holme committed
    //------------------------------------------------------------------------------
    // Bitmask of MTP threads that have exited. Used to only shutdown the datamodel when all MTP threads have exited
    unsigned cumulative_mtp_threads_exited = 0;
    
    
    //------------------------------------------------------------------------------
    // Boolean which is set once the MTP has been connected to successfully
    // The purpose of this flag is to avoid USP notifications getting enqueued before the MTP has been connected to successfully
    // for the first time after bootup
    static bool is_notifications_enabled = false;
    
    
    William Lupton's avatar
    William Lupton committed
    //------------------------------------------------------------------------------
    // Forward declarations. Note these are not static, because we need them in the symbol table for USP_LOG_Callstack() to show them
    void UpdateSockSet(socket_set_t *set);
    void ProcessSocketActivity(socket_set_t *set);
    void ProcessMessageQueueSocketActivity(socket_set_t *set);
    
    void ProcessBinaryUspRecord(unsigned char *pbuf, int pbuf_len, ctrust_role_t role, mtp_reply_to_t *mrt);
    
    William Lupton's avatar
    William Lupton committed
    
    /*********************************************************************//**
    **
    ** DM_EXEC_Init
    **
    ** Initialises the functionality in this module
    **
    ** \param   None
    **
    ** \return  USP_ERR_OK if successful
    **
    **************************************************************************/
    int DM_EXEC_Init(void)
    {
        int err;
    
        // Exit if unable to initialize the unix domain socket pair used to implement a message queue
        err = socketpair(AF_UNIX, SOCK_DGRAM, 0, dm_mq_sockets);
        if (err != 0)
        {
    
    Richard Holme's avatar
    Richard Holme committed
            USP_ERR_ERRNO("socketpair", errno);
    
    William Lupton's avatar
    William Lupton committed
            return USP_ERR_INTERNAL_ERROR;
        }
    
        // Exit if unable to create mutex protecting access to this subsystem
        err = OS_UTILS_InitMutex(&dm_access_mutex);
        if (err != USP_ERR_OK)
        {
            return err;
        }
    
        return USP_ERR_OK;
    }
    
    /*********************************************************************//**
    **
    ** DM_EXEC_Destroy
    **
    ** Frees all memory used by the data model thread
    **
    ** \param   None
    **
    ** \return  USP_ERR_OK if successful
    **
    **************************************************************************/
    void DM_EXEC_Destroy(void)
    {
        DATA_MODEL_Stop();
        DATABASE_Destroy();
        SYNC_TIMER_Destroy();
    }
    
    /*********************************************************************//**
    **
    ** USP_SIGNAL_OperationComplete
    **
    ** Posts an operation complete message on the data model's message queue
    ** NOTE: Ownership of the (dynamically allocated by caller) output_args passes to data model
    **       But err_msg is copied by this function (ie ownership of err_msg does not pass to this function)
    ** NOTE: Error messages in this function are only logged rather than writing in the error message buffer (USP_ERR_SetMessage())
    **       because this function is normally called from a non core thread and if they did write, this might cause corruption of
    **       the core agent error message buffer
    **
    ** \param   instance - instance number of operation in Device.LocalAgent.Request table
    ** \param   err_code - error code of the operation (USP_ERR_OK indicates success)
    ** \param   err_msg - error message if the operation failed, or NULL if operation was successful
    ** \param   output_args - results of the completed operation (if successful)
    **
    ** \return  USP_ERR_OK if successful
    **
    **************************************************************************/
    int USP_SIGNAL_OperationComplete(int instance, int err_code, char *err_msg, kv_vector_t *output_args)
    {
        dm_exec_msg_t  msg;
        oper_complete_msg_t *ocm;
        int bytes_sent;
    
        // Exit if this function has been called with a mismatch between err_code and err_msg
    
        if ( ((err_code == USP_ERR_OK) && (err_msg != NULL)) ||
    
    William Lupton's avatar
    William Lupton committed
             ((err_code != USP_ERR_OK) && (err_msg == NULL)) )
        {
            USP_LOG_Error("%s: Mismatch in calling arguments err_code=%d, but err_msg='%s'", __FUNCTION__, err_code, err_msg);
            return USP_ERR_INTERNAL_ERROR;
        }
    
        // Exit if message queue is not setup yet
        if (mq_tx_socket == -1)
        {
            USP_LOG_Error("%s is being called before data model has been initialised", __FUNCTION__);
            return USP_ERR_INTERNAL_ERROR;
        }
    
        // Form message
        memset(&msg, 0, sizeof(msg));
        msg.type = kDmExecMsg_OperComplete;
        ocm = &msg.params.oper_complete;
        ocm->instance = instance;
        ocm->err_code = err_code;
        ocm->err_msg = (err_msg==NULL) ? NULL : USP_STRDUP(err_msg);
        ocm->output_args = output_args;
    
        // Send the message
        bytes_sent = send(mq_tx_socket, &msg, sizeof(msg), 0);
        if (bytes_sent != sizeof(msg))
        {
            char buf[USP_ERR_MAXLEN];
    
    Richard Holme's avatar
    Richard Holme committed
            USP_LOG_Error("%s(%d): send failed : (err=%d) %s", __FUNCTION__, __LINE__, errno, USP_ERR_ToString(errno, buf, sizeof(buf)) );
    
    William Lupton's avatar
    William Lupton committed
            return USP_ERR_INTERNAL_ERROR;
        }
    
        return USP_ERR_OK;
    }
    
    /*********************************************************************//**
    **
    ** USP_SIGNAL_DataModelEvent
    **
    ** Posts an event message on the data model's message queue
    ** NOTE: Ownership of the (dynamically allocated by caller) output_args passes to data model
    **       But event_name is copied by this function (ie ownership of event_name does not pass to this function)
    ** NOTE: Error messages in this function are only logged rather than writing in the error message buffer (USP_ERR_SetMessage())
    **       because this function is normally called from a non core thread and if they did write, this might cause corruption of
    **       the core agent error message buffer
    **
    ** \param   event_name - name of the event
    ** \param   output_args - arguments for the event
    **
    ** \return  USP_ERR_OK if successful
    **
    **************************************************************************/
    int USP_SIGNAL_DataModelEvent(char *event_name, kv_vector_t *output_args)
    {
        dm_exec_msg_t  msg;
        event_complete_msg_t *ecm;
        int bytes_sent;
    
        // Exit if message queue is not setup yet
        if (mq_tx_socket == -1)
        {
            USP_LOG_Error("%s is being called before data model has been initialised", __FUNCTION__);
            return USP_ERR_INTERNAL_ERROR;
        }
    
        // Form message
        memset(&msg, 0, sizeof(msg));
        msg.type = kDmExecMsg_EventComplete;
        ecm = &msg.params.event_complete;
        ecm->event_name = USP_STRDUP(event_name);
        ecm->output_args = output_args;
    
        // Send the message
        bytes_sent = send(mq_tx_socket, &msg, sizeof(msg), 0);
        if (bytes_sent != sizeof(msg))
        {
            char buf[USP_ERR_MAXLEN];
    
    Richard Holme's avatar
    Richard Holme committed
            USP_LOG_Error("%s(%d): send failed : (err=%d) %s", __FUNCTION__, __LINE__, errno, USP_ERR_ToString(errno, buf, sizeof(buf)) );
    
    William Lupton's avatar
    William Lupton committed
            return USP_ERR_INTERNAL_ERROR;
        }
    
        return USP_ERR_OK;
    }
    
    /*********************************************************************//**
    **
    ** USP_SIGNAL_OperationStatus
    **
    ** Posts an operation status message on the data model's message queue
    ** This function can be used by an operation thread to set the value of Device.LocalAgent.Request.{i}.Status in a thread-safe way
    ** NOTE: Error messages in this function are only logged rather than writing in the error message buffer (USP_ERR_SetMessage())
    **       because this function is normally called from a non core thread and if they did write, this might cause corruption of
    **       the core agent error message buffer
    **
    ** \param   instance - instance number of operation in Device.LocalAgent.Request table
    ** \param   status - status string to set
    **
    ** \return  USP_ERR_OK if successful
    **
    **************************************************************************/
    int USP_SIGNAL_OperationStatus(int instance, char *status)
    {
        dm_exec_msg_t  msg;
        oper_status_msg_t *osm;
        int bytes_sent;
    
        // Exit if this function has been called with invalid parameters
        if (status == NULL)
        {
            USP_LOG_Error("%s: status input argument must point to a string", __FUNCTION__);
            return USP_ERR_INTERNAL_ERROR;
        }
    
        // Exit if message queue is not setup yet
        if (mq_tx_socket == -1)
        {
            USP_LOG_Error("%s is being called before data model has been initialised", __FUNCTION__);
            return USP_ERR_INTERNAL_ERROR;
        }
    
        // Form message
        memset(&msg, 0, sizeof(msg));
        msg.type = kDmExecMsg_OperStatus;
        osm = &msg.params.oper_status;
        osm->instance = instance;
        osm->status = USP_STRDUP(status);
    
        // Send the message
        bytes_sent = send(mq_tx_socket, &msg, sizeof(msg), 0);
        if (bytes_sent != sizeof(msg))
        {
            char buf[USP_ERR_MAXLEN];
    
    Richard Holme's avatar
    Richard Holme committed
            USP_LOG_Error("%s(%d): send failed : (err=%d) %s", __FUNCTION__, __LINE__, errno, USP_ERR_ToString(errno, buf, sizeof(buf)) );
    
    William Lupton's avatar
    William Lupton committed
            return USP_ERR_INTERNAL_ERROR;
        }
    
        return USP_ERR_OK;
    }
    
    /*********************************************************************//**
    **
    ** USP_SIGNAL_ObjectAdded
    **
    ** Signals to USP core that the vendor has added an object instance to the data model
    ** This function may be called from any vendor thread
    **
    ** \param   path - path of object that has been added
    **
    ** \return  USP_ERR_OK if successful
    **
    **************************************************************************/
    int USP_SIGNAL_ObjectAdded(char *path)
    {
        dm_exec_msg_t  msg;
        obj_added_msg_t *oam;
        int bytes_sent;
    
        // Exit if this function has been called with invalid parameters
        if (path == NULL)
        {
            USP_LOG_Error("%s: path input argument must point to a string", __FUNCTION__);
            return USP_ERR_INTERNAL_ERROR;
        }
    
        // Exit if message queue is not setup yet
        if (mq_tx_socket == -1)
        {
            USP_LOG_Error("%s is being called before data model has been initialised", __FUNCTION__);
            return USP_ERR_INTERNAL_ERROR;
        }
    
        // Form message
        memset(&msg, 0, sizeof(msg));
        msg.type = kDmExecMsg_ObjAdded;
        oam = &msg.params.obj_added;
        oam->path = USP_STRDUP(path);
    
        // Send the message
        bytes_sent = send(mq_tx_socket, &msg, sizeof(msg), 0);
        if (bytes_sent != sizeof(msg))
        {
            char buf[USP_ERR_MAXLEN];
    
    Richard Holme's avatar
    Richard Holme committed
            USP_LOG_Error("%s(%d): send failed : (err=%d) %s", __FUNCTION__, __LINE__, errno, USP_ERR_ToString(errno, buf, sizeof(buf)) );
    
    William Lupton's avatar
    William Lupton committed
            return USP_ERR_INTERNAL_ERROR;
        }
    
        return USP_ERR_OK;
    }
    
    /*********************************************************************//**
    **
    ** USP_SIGNAL_ObjectDeleted
    **
    ** Signals to USP core that the vendor has deleted an object instance from the data model
    ** This function may be called from any vendor thread
    **
    ** \param   path - path of object that has been deleted
    **
    ** \return  USP_ERR_OK if successful
    **
    **************************************************************************/
    int USP_SIGNAL_ObjectDeleted(char *path)
    {
        dm_exec_msg_t  msg;
        obj_deleted_msg_t *odm;
        int bytes_sent;
    
        // Exit if this function has been called with invalid parameters
        if (path == NULL)
        {
            USP_LOG_Error("%s: path input argument must point to a string", __FUNCTION__);
            return USP_ERR_INTERNAL_ERROR;
        }
    
        // Exit if message queue is not setup yet
        if (mq_tx_socket == -1)
        {
            USP_LOG_Error("%s is being called before data model has been initialised", __FUNCTION__);
            return USP_ERR_INTERNAL_ERROR;
        }
    
        // Form message
        memset(&msg, 0, sizeof(msg));
        msg.type = kDmExecMsg_ObjDeleted;
        odm = &msg.params.obj_deleted;
        odm->path = USP_STRDUP(path);
    
        // Send the message
        bytes_sent = send(mq_tx_socket, &msg, sizeof(msg), 0);
        if (bytes_sent != sizeof(msg))
        {
            char buf[USP_ERR_MAXLEN];
    
    Richard Holme's avatar
    Richard Holme committed
            USP_LOG_Error("%s(%d): send failed : (err=%d) %s", __FUNCTION__, __LINE__, errno, USP_ERR_ToString(errno, buf, sizeof(buf)) );
    
    William Lupton's avatar
    William Lupton committed
            return USP_ERR_INTERNAL_ERROR;
        }
    
        return USP_ERR_OK;
    }
    
    /*********************************************************************//**
    **
    ** DM_EXEC_PostUspRecord
    **
    ** Posts a USP record to be processed by the data model thread
    **
    ** \param   pbuf - pointer to buffer containing protobuf encoded USP record
    
    Richard Holme's avatar
    Richard Holme committed
    **                 NOTE: This is part of larger buffer (with STOMP), so must be copied before sending to the data model thread)
    
    William Lupton's avatar
    William Lupton committed
    ** \param   pbuf_len - length of protobuf encoded message
    ** \param   role - Controller Trust Role allowed for this message
    
    Richard Holme's avatar
    Richard Holme committed
    ** \param   mrt - details of where response to this USP message should be sent
    
    William Lupton's avatar
    William Lupton committed
    **
    ** \return  None
    **
    **************************************************************************/
    
    void DM_EXEC_PostUspRecord(unsigned char *pbuf, int pbuf_len, ctrust_role_t role, mtp_reply_to_t *mrt)
    
    William Lupton's avatar
    William Lupton committed
    {
    
    #if 1
        // NOTE: This function differs in the USP Test Controller than in the OBUSPA agent codebase
        //       It cannot post to the data model thread, because the test controller runs instead of the data model
        //       Instead, it logs the received USP record and message
    
        UspRecord__Record *rec;
        Usp__Msg *usp;
    
        // Exit if unable to unpack the USP record
        rec = usp_record__record__unpack(pbuf_allocator, pbuf_len, pbuf);
        if (rec == NULL)
        {
            USP_ERR_SetMessage("%s: usp_record__session_record__unpack failed. Ignoring USP Record", __FUNCTION__);
            return;
        }
    
        // Print USP record in human readable form
        PROTO_TRACE_ProtobufMessage(&rec->base);
    
        // Exit if no USP message contained in USP Record
        if ((rec->record_type_case != USP_RECORD__RECORD__RECORD_TYPE_NO_SESSION_CONTEXT) ||
            (rec->no_session_context == NULL) || (rec->no_session_context->payload.len == 0) || 
            (rec->no_session_context->payload.data==NULL))
        {
            USP_ERR_SetMessage("%s: USP Record contained no USP message (or message was in a E2E session context). Ignoring USP Record", __FUNCTION__);
            usp_record__record__free_unpacked(rec, pbuf_allocator);
            return;
        }
    
        // Exit if unable to unpack the USP message
        usp = usp__msg__unpack(pbuf_allocator, rec->no_session_context->payload.len, rec->no_session_context->payload.data);
        if (usp == NULL)
        {
            USP_ERR_SetMessage("%s: usp__msg__unpack failed", __FUNCTION__);
            usp_record__record__free_unpacked(rec, pbuf_allocator);
            return;
        }
    
        // Print USP message in human readable form
        PROTO_TRACE_ProtobufMessage(&usp->base);
    
        // Free unpacked protobuf structures
        usp__msg__free_unpacked(usp, pbuf_allocator);
        usp_record__record__free_unpacked(rec, pbuf_allocator);
        return;
    
    #else
    
    William Lupton's avatar
    William Lupton committed
        dm_exec_msg_t  msg;
        process_usp_record_msg_t *pur;
        int bytes_sent;
    
        // Exit if message queue is not setup yet
        if (mq_tx_socket == -1)
        {
            USP_LOG_Error("%s is being called before data model has been initialised", __FUNCTION__);
            return;
        }
    
        // Form message
        memset(&msg, 0, sizeof(msg));
        msg.type = kDmExecMsg_ProcessUspRecord;
        pur = &msg.params.usp_record;
        pur->pbuf = USP_MALLOC(pbuf_len);
        memcpy(pur->pbuf, pbuf, pbuf_len);
        pur->pbuf_len = pbuf_len;
        pur->role = role;
    
    Richard Holme's avatar
    Richard Holme committed
        pur->mtp_reply_to.protocol = mrt->protocol;
        pur->mtp_reply_to.is_reply_to_specified = mrt->is_reply_to_specified;
        pur->mtp_reply_to.stomp_dest = USP_STRDUP(mrt->stomp_dest);
        pur->mtp_reply_to.stomp_instance = mrt->stomp_instance;
        pur->mtp_reply_to.stomp_err_id = USP_STRDUP(mrt->stomp_err_id);
        pur->mtp_reply_to.coap_host = USP_STRDUP(mrt->coap_host);
        pur->mtp_reply_to.coap_port = mrt->coap_port;
        pur->mtp_reply_to.coap_resource = USP_STRDUP(mrt->coap_resource);
        pur->mtp_reply_to.coap_encryption = mrt->coap_encryption;
        pur->mtp_reply_to.coap_reset_session_hint = mrt->coap_reset_session_hint;
    
        pur->mtp_reply_to.mqtt_topic = USP_STRDUP(mrt->mqtt_topic);
        pur->mtp_reply_to.mqtt_instance = mrt->mqtt_instance;
        pur->mtp_reply_to.wsclient_cont_instance = mrt->wsclient_cont_instance;
        pur->mtp_reply_to.wsclient_mtp_instance = mrt->wsclient_mtp_instance;
    
    William Lupton's avatar
    William Lupton committed
    
        // Send the message
        bytes_sent = send(mq_tx_socket, &msg, sizeof(msg), 0);
        if (bytes_sent != sizeof(msg))
        {
            char buf[USP_ERR_MAXLEN];
    
    Richard Holme's avatar
    Richard Holme committed
            USP_LOG_Error("%s(%d): send failed : (err=%d) %s", __FUNCTION__, __LINE__, errno, USP_ERR_ToString(errno, buf, sizeof(buf)) );
    
    William Lupton's avatar
    William Lupton committed
            return;
        }
    
    William Lupton's avatar
    William Lupton committed
    }
    
    /*********************************************************************//**
    **
    ** DM_EXEC_PostStompHandshakeComplete
    **
    ** Posts the role associated with a Stomp connection, after the STOMP initial TLS handshake has completed
    
    ** This notifies the DataModel of the role to use for each controller connected to a STOMP broker
    
    William Lupton's avatar
    William Lupton committed
    ** This message will unblock processing of Boot! event and subscriptions, which are held up until the controller
    ** trust role associated with each controller is known (otherwise they would use the wrong role when getting data)
    ** Note: Restarting of async operations are also held up, because we want them to occur after the Boot! event
    **
    ** \param   stomp_instance - instance number of STOMP connection in Device.STOMP.Connection.{i}
    ** \param   role - Role to use for controllers connected to the specified STOMP connection
    **
    ** \return  None
    **
    **************************************************************************/
    
    void DM_EXEC_PostStompHandshakeComplete(int stomp_instance, ctrust_role_t role)
    
    William Lupton's avatar
    William Lupton committed
    {
        dm_exec_msg_t  msg;
        stomp_complete_msg_t *scm;
        int bytes_sent;
    
        // Exit if message queue is not setup yet
        if (mq_tx_socket == -1)
        {
            USP_LOG_Error("%s is being called before data model has been initialised", __FUNCTION__);
            return;
        }
    
        // Form message
        memset(&msg, 0, sizeof(msg));
        msg.type = kDmExecMsg_StompHandshakeComplete;
        scm = &msg.params.stomp_complete;
        scm->stomp_instance = stomp_instance;
        scm->role = role;
    
        // Send the message
        bytes_sent = send(mq_tx_socket, &msg, sizeof(msg), 0);
        if (bytes_sent != sizeof(msg))
        {
            char buf[USP_ERR_MAXLEN];
    
    Richard Holme's avatar
    Richard Holme committed
            USP_LOG_Error("%s(%d): send failed : (err=%d) %s", __FUNCTION__, __LINE__, errno, USP_ERR_ToString(errno, buf, sizeof(buf)) );
    
    William Lupton's avatar
    William Lupton committed
            return;
        }
    }
    
    
    /*********************************************************************//**
    **
    ** DM_EXEC_PostMqttHandshakeComplete
    **
    ** Posts the role associated with an MQTT connection, after the TLS handshake has completed
    ** This notifies the DataModel of the role to use for each controller connected to an MQTT broker
    ** This message will unblock processing of Boot! event and subscriptions, which are held up until the controller
    ** trust role associated with each controller is known (otherwise they would use the wrong role when getting data)
    ** Note: Restarting of async operations are also held up, because we want them to occur after the Boot! event
    **
    ** \param   stomp_instance - instance number of STOMP connection in Device.STOMP.Connection.{i}
    ** \param   role - Role to use for controllers connected to the specified STOMP connection
    **
    ** \return  None
    **
    **************************************************************************/
    void DM_EXEC_PostMqttHandshakeComplete(int mqtt_instance, ctrust_role_t role)
    {
        dm_exec_msg_t  msg;
        mqtt_complete_msg_t *mcm;
        int bytes_sent;
    
        // Exit if message queue is not setup yet
        if (mq_tx_socket == -1)
        {
            USP_LOG_Error("%s is being called before data model has been initialised", __FUNCTION__);
            return;
        }
    
        // Form message
        memset(&msg, 0, sizeof(msg));
        msg.type = kDmExecMsg_MqttHandshakeComplete;
        mcm = &msg.params.mqtt_complete;
        mcm->mqtt_instance = mqtt_instance;
        mcm->role = role;
    
        // Send the message
        bytes_sent = send(mq_tx_socket, &msg, sizeof(msg), 0);
        if (bytes_sent != sizeof(msg))
        {
            char buf[USP_ERR_MAXLEN];
            USP_LOG_Error("%s(%d): send failed : (err=%d) %s", __FUNCTION__, __LINE__, errno, USP_ERR_ToString(errno, buf, sizeof(buf)) );
            return;
        }
    }
    
    
    
    William Lupton's avatar
    William Lupton committed
    /*********************************************************************//**
    **
    ** DM_EXEC_PostMtpThreadExited
    **
    ** Signals that the MTP thread has exited, this will be because an exit was scheduled
    
    ** (either due to the controller requesting a reboot, or factory reset, or a stop CLI command being sent)
    
    William Lupton's avatar
    William Lupton committed
    **
    
    Richard Holme's avatar
    Richard Holme committed
    ** \param   flags - flags determining which stomp thread exited
    
    William Lupton's avatar
    William Lupton committed
    **
    ** \return  None
    **
    **************************************************************************/
    
    Richard Holme's avatar
    Richard Holme committed
    void DM_EXEC_PostMtpThreadExited(unsigned flags)
    
    William Lupton's avatar
    William Lupton committed
    {
        dm_exec_msg_t  msg;
        int bytes_sent;
    
    Richard Holme's avatar
    Richard Holme committed
        mtp_thread_exited_msg_t *tem;
    
    William Lupton's avatar
    William Lupton committed
    
        // Exit if message queue is not setup yet
        if (mq_tx_socket == -1)
        {
            USP_LOG_Error("%s is being called before data model has been initialised", __FUNCTION__);
            return;
        }
    
        // Form message
        memset(&msg, 0, sizeof(msg));
        msg.type = kDmExecMsg_MtpThreadExited;
    
    Richard Holme's avatar
    Richard Holme committed
        tem = &msg.params.mtp_thread_exited;
        tem->flags = flags;
    
    William Lupton's avatar
    William Lupton committed
        // Send the message
        bytes_sent = send(mq_tx_socket, &msg, sizeof(msg), 0);
        if (bytes_sent != sizeof(msg))
        {
            char buf[USP_ERR_MAXLEN];
    
    Richard Holme's avatar
    Richard Holme committed
            USP_LOG_Error("%s(%d): send failed : (err=%d) %s", __FUNCTION__, __LINE__, errno, USP_ERR_ToString(errno, buf, sizeof(buf)) );
    
    William Lupton's avatar
    William Lupton committed
            return;
        }
    }
    
    
    
    /*********************************************************************//**
    **
    ** DM_EXEC_NotifyBdcTransferResult
    **
    ** Posts a message that signals to data model thread that the specified Bulk Data Collection
    ** report has been sent, or failed to send
    **
    ** \param   profile_id - Instance number of profile in Device.Bulkdata.Profile.{i}
    
    Richard Holme's avatar
    Richard Holme committed
    ** \param   transfer_result - result code of the transfer
    
    William Lupton's avatar
    William Lupton committed
    **
    ** \return  USP_ERR_OK if successful
    **
    **************************************************************************/
    
    Richard Holme's avatar
    Richard Holme committed
    int DM_EXEC_NotifyBdcTransferResult(int profile_id, bdc_transfer_result_t transfer_result)
    
    William Lupton's avatar
    William Lupton committed
    {
        dm_exec_msg_t  msg;
        bdc_transfer_result_msg_t *btr;
        int bytes_sent;
    
        // Exit if message queue is not setup yet
        if (mq_tx_socket == -1)
        {
            USP_LOG_Error("%s is being called before data model has been initialised", __FUNCTION__);
            return USP_ERR_INTERNAL_ERROR;
        }
    
        // Form message
        memset(&msg, 0, sizeof(msg));
        msg.type = kDmExecMsg_BdcTransferResult;
        btr = &msg.params.bdc_transfer_result;
        btr->profile_id = profile_id;
        btr->transfer_result = transfer_result;
    
        // Send the message
        bytes_sent = send(mq_tx_socket, &msg, sizeof(msg), 0);
        if (bytes_sent != sizeof(msg))
        {
            char buf[USP_ERR_MAXLEN];
    
    Richard Holme's avatar
    Richard Holme committed
            USP_LOG_Error("%s(%d): send failed : (err=%d) %s", __FUNCTION__, __LINE__, errno, USP_ERR_ToString(errno, buf, sizeof(buf)) );
    
    William Lupton's avatar
    William Lupton committed
            return USP_ERR_INTERNAL_ERROR;
        }
    
    
    William Lupton's avatar
    William Lupton committed
        return USP_ERR_OK;
    }
    
    /*********************************************************************//**
    **
    ** DM_EXEC_EnableNotifications
    **
    ** Unblocks processing of Boot! event and subscriptions, which are held up until the controller
    ** trust role associated with each controller is known (otherwise they would use the wrong role when getting data)
    ** Note: Restarting of async operations are also held up, because we want them to occur after the Boot! event
    **
    ** \param   None
    **
    ** \return  None
    **
    **************************************************************************/
    void DM_EXEC_EnableNotifications(void)
    {
        // Exit if the notifications are already enabled
        if (is_notifications_enabled)
        {
            return;
        }
    
        // Then start the parts of the data model which were held up, waiting for the controller's role to be known
    
    
        // Queue object creation/deletion notifications
        DEVICE_SUBSCRIPTION_ProcessAllObjectLifeEventSubscriptions();
    
        // Send out initial Boot NotifyReq, and determine the initial values of all value change parameters
        // This also starts the sync timer to poll for value change notifications
        DEVICE_SUBSCRIPTION_Update(0);
    
        // Restart all asynchronous operations that did not complete (and that are meant to be restarted after a reboot)
        // NOTE: This also sends out events for all operations that required a reboot to complete them
        DEVICE_REQUEST_RestartAsyncOperations();
    
        // Set flag, so that subsequent calls to this function (eg stomp reconnections) do not start the data model again
        is_notifications_enabled = true;
    }
    
    
    /*********************************************************************//**
    **
    ** DM_EXEC_IsNotificationsEnabled
    **
    ** Returns whether USP notifications may be generated.
    ** NOTE: USP notifications are not generated before the Boot! event has been sent
    **       This function is called to avoid a large queue of USP notifications in the case of MTP connection failure
    **
    ** \param   None
    **
    ** \return  true if USP notifications can be sent
    **
    **************************************************************************/
    bool DM_EXEC_IsNotificationsEnabled(void)
    {
        return is_notifications_enabled;
    }
    
    
    William Lupton's avatar
    William Lupton committed
    /*********************************************************************//**
    **
    ** DM_EXEC_Main
    **
    ** Main loop of the data model thread
    **
    ** \param   args - arguments (currently unused)
    **
    ** \return  None
    **
    **************************************************************************/
    void *DM_EXEC_Main(void *args)
    {
        int err;
        int num_sockets;
        socket_set_t set;
    
        int enabled_connections = 0;
    
    William Lupton's avatar
    William Lupton committed
    
        // Exit if unable to connect to the unix domain socket used to implement the CLI server
        err = CLI_SERVER_Init();
        if (err != USP_ERR_OK)
        {
            USP_LOG_Error("%s: CLI_SERVER_Init() failed. Aborting Data Model thread", __FUNCTION__);
            return NULL;
        }
    
    
        // Determine whether we have to wait for a STOMP or MQTT connection, before enabling notifications
        // This is necessary because the contents of the Boot! event may be dependant on the permissions that the USP Controller has, and we'll only know this after connecting to it
    #ifndef DISABLE_STOMP
        enabled_connections += DEVICE_STOMP_CountEnabledConnections();
    #endif
    
    #ifdef ENABLE_MQTT
        enabled_connections += DEVICE_MQTT_CountEnabledConnections();
    #endif
    
        // Enable notifications now, if we don't have to wait for a STOMP or MQTT connection before generating a Boot! notification
        if (enabled_connections == 0)
    
    William Lupton's avatar
    William Lupton committed
        {
            DM_EXEC_EnableNotifications();
        }
    
        OS_UTILS_LockMutex(&dm_access_mutex);
    
        while(FOREVER)
        {
            // Create the socket set to receive/transmit on (with timeout)
            UpdateSockSet(&set);
    
            // Unlock the mutex around the select()
            OS_UTILS_UnlockMutex(&dm_access_mutex);
    
            // Wait for read/write activity on sockets or timeout
            num_sockets = SOCKET_SET_Select(&set);
    
            OS_UTILS_LockMutex(&dm_access_mutex);
    
            // Execute all timers which are ready to fire
            SYNC_TIMER_Execute();
    
            // Process socket activity
            switch(num_sockets)
            {
                case -1:
                    // An unrecoverable error has occurred
                    USP_LOG_Error("%s: Unrecoverable socket select() error. Aborting Data Model thread", __FUNCTION__);
                    return NULL;
                    break;
    
                case 0:
                    // No controllers with any activity, but we still may need to process a timeout, so fall-through
                default:
                    // Controllers with activity
                    ProcessSocketActivity(&set);
                    break;
            }
    
            // Queue any object creation/deletion events which have been generated by the message or timer callbacks
            DEVICE_SUBSCRIPTION_ProcessAllObjectLifeEventSubscriptions();
    
            // Print out any memory allocations that got added for this time around the loop
            //USP_MEM_Print();
        }
    }
    
    /*********************************************************************//**
    **
    ** UpdateSockSet
    **
    ** Adds all sockets to wait for activity on, into the socket set
    ** Also updates the associated timeout for activity
    ** This function must be called every time before the call to select(), as select alters the socket set
    **
    ** \param   set - pointer to socket set structure to update with sockets to wait for activity on
    **
    ** \return  None
    **
    **************************************************************************/
    void UpdateSockSet(socket_set_t *set)
    {
        int delay_ms;
    
    William Lupton's avatar
    William Lupton committed
        SOCKET_SET_Clear(set);
    
        // Add the CLI server socket to the socket set
        CLI_SERVER_UpdateSocketSet(set);
    
        // Add the message queue receiving socket to the socket set
    
    Richard Holme's avatar
    Richard Holme committed
        SOCKET_SET_AddSocketToReceiveFrom(mq_rx_socket, MAX_SOCKET_TIMEOUT, set);
    
    William Lupton's avatar
    William Lupton committed
    
        // Update socket timeout time with the time to the next timer
        delay_ms = SYNC_TIMER_TimeToNext();
        SOCKET_SET_UpdateTimeout(delay_ms, set);
    }
    
    /*********************************************************************//**