diff --git a/include/asterisk/ari.h b/include/asterisk/ari.h index c3df46a2b421cedab0adf1fc01f0432712ec2fde..c9f47a6e5876997be8d6a2859027cb01ad0907c7 100644 --- a/include/asterisk/ari.h +++ b/include/asterisk/ari.h @@ -187,6 +187,16 @@ struct ast_json *ast_ari_websocket_session_read( int ast_ari_websocket_session_write(struct ast_ari_websocket_session *session, struct ast_json *message); +/*! + * \brief Get the Session ID for an ARI WebSocket. + * + * \param session Session to query. + * \return Session ID. + * \return \c NULL on error. + */ +const char *ast_ari_websocket_session_id( + const struct ast_ari_websocket_session *session); + /*! * \brief The stock message to return when out of memory. * diff --git a/include/asterisk/astobj2.h b/include/asterisk/astobj2.h index 4a7aeeedd2a73fe5be2002b58cddb315049780be..c28dd2387324b72018e022cf6333be7df28307ff 100644 --- a/include/asterisk/astobj2.h +++ b/include/asterisk/astobj2.h @@ -1642,7 +1642,7 @@ void *__ao2_unlink(struct ao2_container *c, void *obj, int flags, * The use of flags argument is the follow: * * OBJ_UNLINK unlinks the object found - * OBJ_NODATA on match, do return an object + * OBJ_NODATA on match, do not return an object * Callbacks use OBJ_NODATA as a default * functions such as find() do * OBJ_MULTIPLE return multiple matches diff --git a/include/asterisk/http_websocket.h b/include/asterisk/http_websocket.h index 5adc089255cf66dc5c7db829259bc9a90ba576bb..23492ff95563ee25e1625b3334e87316c28bfcb4 100644 --- a/include/asterisk/http_websocket.h +++ b/include/asterisk/http_websocket.h @@ -77,13 +77,14 @@ struct ast_websocket; * \param ser The TCP/TLS session * \param parameters Parameters extracted from the request URI * \param headers Headers included in the request + * \param session_id The id of the current session. * * \retval 0 The session should be accepted * \retval -1 The session should be rejected. Note that the caller must send an error * response using \ref ast_http_error. * \since 13.5.0 */ -typedef int (*ast_websocket_pre_callback)(struct ast_tcptls_session_instance *ser, struct ast_variable *parameters, struct ast_variable *headers); +typedef int (*ast_websocket_pre_callback)(struct ast_tcptls_session_instance *ser, struct ast_variable *parameters, struct ast_variable *headers, const char *session_id); /*! * \brief Callback for when a new connection for a sub-protocol is established @@ -359,6 +360,13 @@ AST_OPTIONAL_API(int, ast_websocket_is_secure, (struct ast_websocket *session), */ AST_OPTIONAL_API(int, ast_websocket_set_nonblock, (struct ast_websocket *session), { errno = ENOSYS; return -1;}); +/*! + * \brief Get the session ID for a WebSocket session. + * + * \retval session id + */ +AST_OPTIONAL_API(const char *, ast_websocket_session_id, (struct ast_websocket *session), { errno = ENOSYS; return NULL;}); + /*! * \brief Result code for a websocket client. */ diff --git a/include/asterisk/vector.h b/include/asterisk/vector.h index 0a13c560badc3b40f72f5e54e32f6170c552029d..be909127235b8a6bdcf26a40557c7d3aed3c7c64 100644 --- a/include/asterisk/vector.h +++ b/include/asterisk/vector.h @@ -110,7 +110,7 @@ /*! * \brief Deallocates this vector. * - * If any code to free the elements of this vector need to be run, that should + * If any code to free the elements of this vector needs to be run, that should * be done prior to this call. * * \param vec Vector to deallocate. diff --git a/res/ari/ari_websockets.c b/res/ari/ari_websockets.c index ac5b5788dc3583237e7313c59778881755876a5a..9a4f2c9fe6b8cd375a27007509268856b82bdf77 100644 --- a/res/ari/ari_websockets.c +++ b/res/ari/ari_websockets.c @@ -153,7 +153,7 @@ struct ast_json *ast_ari_websocket_session_read( "{" \ " \"error\": \"InvalidMessage\"," \ " \"message\": \"Message validation failed\"" \ - "}" + "}" int ast_ari_websocket_session_write(struct ast_ari_websocket_session *session, struct ast_json *message) @@ -196,3 +196,9 @@ void ari_handle_websocket(struct ast_websocket_server *ws_server, ast_websocket_uri_cb(ser, &fake_urih, uri, method, get_params, headers); } + +const char *ast_ari_websocket_session_id( + const struct ast_ari_websocket_session *session) +{ + return ast_websocket_session_id(session->ws_session); +} diff --git a/res/ari/resource_events.c b/res/ari/resource_events.c index e666f2e05866b889b6d648f52940bbad64fe06db..f1342b7fae3cfd9dab65ba7261d264f8d925d8e2 100644 --- a/res/ari/resource_events.c +++ b/res/ari/resource_events.c @@ -27,226 +27,455 @@ ASTERISK_REGISTER_FILE() +#include "resource_events.h" #include "asterisk/astobj2.h" #include "asterisk/stasis_app.h" -#include "resource_events.h" +#include "asterisk/vector.h" -/*! Number of buckets for the Stasis application hash table. Remember to keep it - * a prime number! - */ +/*! Number of buckets for the event session registry. Remember to keep it a prime number! */ +#define EVENT_SESSION_NUM_BUCKETS 23 + +/*! Number of buckets for a websocket apps container. Remember to keep it a prime number! */ #define APPS_NUM_BUCKETS 7 -/*! \brief A connection to the event WebSocket */ +/*! Initial size of a message queue. */ +#define MESSAGES_INIT_SIZE 23 + + +/*! \brief A wrapper for the /ref ast_ari_websocket_session. */ struct event_session { - struct ast_ari_websocket_session *ws_session; - struct ao2_container *websocket_apps; + struct ast_ari_websocket_session *ws_session; /*!< Handle to the websocket session. */ + struct ao2_container *websocket_apps; /*!< List of Stasis apps registered to + the websocket session. */ + AST_VECTOR(, struct ast_json *) message_queue; /*!< Container for holding delayed messages. */ + char session_id[]; /*!< The id for the websocket session. */ }; +/*! \brief \ref event_session error types. */ +enum event_session_error_type { + ERROR_TYPE_STASIS_REGISTRATION = 1, /*!< Stasis failed to register the application. */ + ERROR_TYPE_OOM = 2, /*!< Insufficient memory to create the event + session. */ + ERROR_TYPE_MISSING_APP_PARAM = 3, /*!< HTTP request was missing an [app] parameter. */ + ERROR_TYPE_INVALID_APP_PARAM = 4, /*!< HTTP request contained an invalid [app] + parameter. */ +}; + +/*! \brief Local registry for created \ref event_session objects. */ +static struct ao2_container *event_session_registry; + /*! - * \brief Explicitly shutdown a session. + * \brief Callback handler for Stasis application messages. * - * An explicit shutdown is necessary, since stasis-app has a reference to this - * session. We also need to be sure to null out the \c ws_session field, since - * the websocket is about to go away. + * \internal * - * \param session Session info struct. + * \param data Void pointer to the event session (\ref event_session). + * \param app_name Name of the Stasis application that dispatched the message. + * \param message The dispatched message. */ -static void session_shutdown(struct event_session *session) +static void stasis_app_message_handler( + void *data, const char *app_name, struct ast_json *message) { - struct ao2_iterator i; - char *app; - SCOPED_AO2LOCK(lock, session); + struct event_session *session = data; + const char *msg_type, *msg_application; + + ast_assert(session != NULL); + + ao2_lock(session); + + msg_type = S_OR(ast_json_string_get(ast_json_object_get(message, "type")), ""); + msg_application = S_OR( + ast_json_string_get(ast_json_object_get(message, "application")), ""); - i = ao2_iterator_init(session->websocket_apps, 0); - while ((app = ao2_iterator_next(&i))) { - stasis_app_unregister(app); - ao2_cleanup(app); + /* If we've been replaced, remove the application from our local + websocket_apps container */ + if (strcmp(msg_type, "ApplicationReplaced") == 0 && + strcmp(msg_application, app_name) == 0) { + ao2_find(session->websocket_apps, msg_application, + OBJ_UNLINK | OBJ_NODATA); } - ao2_iterator_destroy(&i); - ao2_cleanup(session->websocket_apps); - session->websocket_apps = NULL; - session->ws_session = NULL; + /* Now, we need to determine our state to see how we will handle the message */ + if (ast_json_object_set(message, "application", ast_json_string_create(app_name))) { + /* We failed to add an application element to our json message */ + ast_log(LOG_WARNING, + "Failed to dispatch '%s' message from Stasis app '%s'; could not update message\n", + msg_type, + msg_application); + } else if (!session->ws_session) { + /* If the websocket is NULL, the message goes to the queue */ + AST_VECTOR_APPEND(&session->message_queue, message); + ast_log(LOG_WARNING, + "Queued '%s' message for Stasis app '%s'; websocket is not ready\n", + msg_type, + msg_application); + } else { + /* We are ready to publish the message */ + ast_ari_websocket_session_write(session->ws_session, message); + } + + ao2_unlock(session); } -static void session_dtor(void *obj) +/*! + * \brief AO2 comparison function for \ref event_session objects. + * + * \internal + * + * \param obj Void pointer to the \ref event_session container. + * \param arg Void pointer to the \ref event_session object. + * \param flags The \ref search_flags to use when creating the hash key. + * + * \retval 0 The objects are not equal. + * \retval CMP_MATCH The objects are equal. + */ +static int event_session_compare(void *obj, void *arg, int flags) { -#ifdef AST_DEVMODE /* Avoid unused variable warning */ - struct event_session *session = obj; -#endif + const struct event_session *object_left = obj; + const struct event_session *object_right = arg; + const char *right_key = arg; + int cmp = 0; + + switch (flags & OBJ_SEARCH_MASK) { + case OBJ_SEARCH_OBJECT: + right_key = object_right->session_id; + /* Fall through */ + case OBJ_SEARCH_KEY: + cmp = strcmp(object_left->session_id, right_key); + break; + case OBJ_SEARCH_PARTIAL_KEY: + cmp = strncmp(object_left->session_id, right_key, strlen(right_key)); + break; + default: + break; + } - /* session_shutdown should have been called before */ - ast_assert(session->ws_session == NULL); - ast_assert(session->websocket_apps == NULL); + return cmp ? 0 : CMP_MATCH; +} + +/*! + * \brief AO2 hash function for \ref event_session objects. + * + * \details Computes hash value for the given \ref event_session, with respect to the + * provided search flags. + * + * \internal + * + * \param obj Void pointer to the \ref event_session object. + * \param flags The \ref search_flags to use when creating the hash key. + * + * \retval > 0 on success + * \retval 0 on failure + */ +static int event_session_hash(const void *obj, const int flags) +{ + const struct event_session *session; + const char *key; + + switch (flags & OBJ_SEARCH_MASK) { + case OBJ_SEARCH_KEY: + key = obj; + break; + case OBJ_SEARCH_OBJECT: + session = obj; + key = session->session_id; + break; + default: + /* Hash can only work on something with a full key. */ + ast_assert(0); + return 0; + } + return ast_str_hash(key); } -static void session_cleanup(struct event_session *session) +/*! + * \brief Explicitly shutdown a session. + * + * \details An explicit shutdown is necessary, since the \ref stasis_app has a reference + * to this session. We also need to be sure to null out the \c ws_session field, + * since the websocket is about to go away. + * + * \internal + * + * \param session Event session object (\ref event_session). + */ +static void event_session_shutdown(struct event_session *session) { - session_shutdown(session); - ao2_cleanup(session); + struct ao2_iterator i; + char *app; + int j; + SCOPED_AO2LOCK(lock, session); + + /* Clean up the websocket_apps container */ + if (session->websocket_apps) { + i = ao2_iterator_init(session->websocket_apps, 0); + while ((app = ao2_iterator_next(&i))) { + stasis_app_unregister(app); + ao2_cleanup(app); + } + ao2_iterator_destroy(&i); + ao2_cleanup(session->websocket_apps); + session->websocket_apps = NULL; + } + + /* Clean up the message_queue container */ + for (j = 0; j < AST_VECTOR_SIZE(&session->message_queue); j++) { + struct ast_json *msg = AST_VECTOR_GET(&session->message_queue, j); + ast_json_unref(msg); + } + AST_VECTOR_FREE(&session->message_queue); + + /* Remove the handle to the underlying websocket session */ + session->ws_session = NULL; } -static struct event_session *session_create( - struct ast_ari_websocket_session *ws_session) +/*! + * \brief Updates the websocket session for an \ref event_session. + * + * \details The websocket for the given \ref event_session will be updated to the value + * of the \c ws_session argument. + * + * If the value of the \c ws_session is not \c NULL and there are messages in the + * event session's \c message_queue, the messages are dispatched and removed from + * the queue. + * + * \internal + * + * \param session The event session object to update (\ref event_session). + * \param ws_session Handle to the underlying websocket session + * (\ref ast_ari_websocket_session). + */ +static void event_session_update_websocket( + struct event_session *session, struct ast_ari_websocket_session *ws_session) { - RAII_VAR(struct event_session *, session, NULL, ao2_cleanup); + int i; + + ast_assert(session != NULL); - session = ao2_alloc(sizeof(*session), session_dtor); + ao2_lock(session); session->ws_session = ws_session; - session->websocket_apps = - ast_str_container_alloc(APPS_NUM_BUCKETS); - if (!session->websocket_apps) { - return NULL; + for (i = 0; i < AST_VECTOR_SIZE(&session->message_queue); i++) { + struct ast_json *msg = AST_VECTOR_GET(&session->message_queue, i); + ast_ari_websocket_session_write(session->ws_session, msg); + ast_json_unref(msg); } - ao2_ref(session, +1); - return session; + AST_VECTOR_RESET(&session->message_queue, AST_VECTOR_ELEM_CLEANUP_NOOP); + ao2_unlock(session); } /*! - * \brief Callback handler for Stasis application messages. + * \brief Processes cleanup actions for a \ref event_session object. + * + * \internal + * + * \param session The event session object to cleanup (\ref event_session). */ -static void app_handler(void *data, const char *app_name, - struct ast_json *message) +static void event_session_cleanup(struct event_session *session) { - struct event_session *session = data; - int res; - const char *msg_type = S_OR( - ast_json_string_get(ast_json_object_get(message, "type")), - ""); - const char *msg_application = S_OR( - ast_json_string_get(ast_json_object_get(message, "application")), - ""); - if (!session) { return; } - - /* Determine if we've been replaced */ - if (strcmp(msg_type, "ApplicationReplaced") == 0 && - strcmp(msg_application, app_name) == 0) { - ao2_find(session->websocket_apps, msg_application, - OBJ_UNLINK | OBJ_NODATA); - } - res = ast_json_object_set(message, "application", - ast_json_string_create(app_name)); - if(res != 0) { - return; - } + event_session_shutdown(session); + ao2_unlink(event_session_registry, session); +} - ao2_lock(session); - if (session->ws_session) { - ast_ari_websocket_session_write(session->ws_session, message); - } - ao2_unlock(session); +/*! + * \brief Event session object destructor (\ref event_session). + * + * \internal + * + * \param obj Void pointer to the \ref event_session object. + */ +static void event_session_dtor(void *obj) +{ +#ifdef AST_DEVMODE /* Avoid unused variable warning */ + struct event_session *session = obj; + +#endif + + /* event_session_shutdown should have been called before now */ + ast_assert(session->ws_session == NULL); + ast_assert(session->websocket_apps == NULL); + ast_assert(AST_VECTOR_SIZE(&session->message_queue) == 0); } /*! - * \brief Register for all of the apps given. - * \param session Session info struct. - * \param app_name Name of application to register. + * \brief Handles \ref event_session error processing. + * + * \internal + * + * \param session The \ref event_session object. + * \param error The \ref event_session_error_type to handle. + * \param ser HTTP TCP/TLS Server Session (\ref ast_tcptls_session_instance). + * + * \retval -1 Always returns -1. */ -static int session_register_app(struct event_session *session, - const char *app_name) +static int event_session_allocation_error_handler( + struct event_session *session, enum event_session_error_type error, + struct ast_tcptls_session_instance *ser) { - SCOPED_AO2LOCK(lock, session); + /* Notify the client */ + switch (error) { + case ERROR_TYPE_STASIS_REGISTRATION: + ast_http_error(ser, 500, "Internal Server Error", + "Stasis registration failed"); + break; - ast_assert(session->ws_session != NULL); - ast_assert(session->websocket_apps != NULL); + case ERROR_TYPE_OOM: + ast_http_error(ser, 500, "Internal Server Error", + "Allocation failed"); + break; - if (ast_strlen_zero(app_name)) { - return -1; - } + case ERROR_TYPE_MISSING_APP_PARAM: + ast_http_error(ser, 400, "Bad Request", + "HTTP request is missing param: [app]"); + break; - if (ast_str_container_add(session->websocket_apps, app_name)) { - ast_ari_websocket_session_write(session->ws_session, - ast_ari_oom_json()); - return -1; - } + case ERROR_TYPE_INVALID_APP_PARAM: + ast_http_error(ser, 400, "Bad Request", + "Invalid application provided in param [app]."); + break; - stasis_app_register(app_name, app_handler, session); + default: + break; + } - return 0; + /* Cleanup the session */ + event_session_cleanup(session); + return -1; } -int ast_ari_websocket_events_event_websocket_attempted(struct ast_tcptls_session_instance *ser, - struct ast_variable *headers, - struct ast_ari_events_event_websocket_args *args) +/*! + * \brief Creates an \ref event_session object and registers its apps with Stasis. + * + * \internal + * + * \param ser HTTP TCP/TLS Server Session (\ref ast_tcptls_session_instance). + * \param args The Stasis [app] parameters as parsed from the HTTP request + * (\ref ast_ari_events_event_websocket_args). + * \param session_id The id for the websocket session that will be created for this + * event session. + * + * \retval 0 on success + * \retval -1 on failure + */ +static int event_session_alloc(struct ast_tcptls_session_instance *ser, + struct ast_ari_events_event_websocket_args *args, const char *session_id) { - int res = 0; - size_t i, j; - - ast_debug(3, "/events WebSocket attempted\n"); + RAII_VAR(struct event_session *, session, NULL, ao2_cleanup); + size_t size, i; + /* The request must have at least one [app] parameter */ if (args->app_count == 0) { - ast_http_error(ser, 400, "Bad Request", "Missing param 'app'"); - return -1; + return event_session_allocation_error_handler( + session, ERROR_TYPE_MISSING_APP_PARAM, ser); + } + + size = sizeof(*session) + strlen(session_id) + 1; + + /* Instantiate the event session */ + session = ao2_alloc(size, event_session_dtor); + if (!session) { + return event_session_allocation_error_handler(session, ERROR_TYPE_OOM, ser); } + strncpy(session->session_id, session_id, size - sizeof(*session)); + + /* Instantiate the hash table for Stasis apps */ + session->websocket_apps = + ast_str_container_alloc(APPS_NUM_BUCKETS); + + if (!session->websocket_apps) { + return event_session_allocation_error_handler(session, ERROR_TYPE_OOM, ser); + } + + /* Instantiate the message queue */ + if (AST_VECTOR_INIT(&session->message_queue, MESSAGES_INIT_SIZE)) { + return event_session_allocation_error_handler(session, ERROR_TYPE_OOM, ser); + } + + /* Register the apps with Stasis */ for (i = 0; i < args->app_count; ++i) { - if (ast_strlen_zero(args->app[i])) { - res = -1; - break; + const char *app = args->app[i]; + + if (ast_strlen_zero(app)) { + return event_session_allocation_error_handler( + session, ERROR_TYPE_INVALID_APP_PARAM, ser); } - res |= stasis_app_register(args->app[i], app_handler, NULL); - } + if (ast_str_container_add(session->websocket_apps, app)) { + return event_session_allocation_error_handler(session, ERROR_TYPE_OOM, ser); + } - if (res) { - for (j = 0; j < i; ++j) { - stasis_app_unregister(args->app[j]); + if (stasis_app_register(app, stasis_app_message_handler, session)) { + ast_log(LOG_WARNING, "Stasis registration failed for application: '%s'\n", app); + return event_session_allocation_error_handler( + session, ERROR_TYPE_STASIS_REGISTRATION, ser); } - ast_http_error(ser, 400, "Bad Request", "Invalid application provided in param 'app'."); } - return res; + /* Add the event session to the local registry */ + if (!ao2_link(event_session_registry, session)) { + return event_session_allocation_error_handler(session, ERROR_TYPE_OOM, ser); + } + + return 0; } -void ast_ari_websocket_events_event_websocket_established(struct ast_ari_websocket_session *ws_session, - struct ast_variable *headers, - struct ast_ari_events_event_websocket_args *args) +int ast_ari_websocket_events_event_websocket_init(void) { - RAII_VAR(struct event_session *, session, NULL, session_cleanup); - struct ast_json *msg; - int res; - size_t i; + /* Try to instantiate the registry */ + event_session_registry = ao2_container_alloc(EVENT_SESSION_NUM_BUCKETS, + event_session_hash, + event_session_compare); + + if (!event_session_registry) { + /* This is bad, bad. */ + ast_log(LOG_WARNING, + "Failed to allocate the local registry for websocket applications\n"); + return -1; + } - ast_debug(3, "/events WebSocket connection\n"); + return 0; +} - session = session_create(ws_session); - if (!session) { - ast_ari_websocket_session_write(ws_session, ast_ari_oom_json()); - return; - } +int ast_ari_websocket_events_event_websocket_attempted( + struct ast_tcptls_session_instance *ser, struct ast_variable *headers, + struct ast_ari_events_event_websocket_args *args, const char *session_id) +{ + ast_debug(3, "/events WebSocket attempted\n"); - res = 0; - for (i = 0; i < args->app_count; ++i) { - if (ast_strlen_zero(args->app[i])) { - continue; - } - res |= session_register_app(session, args->app[i]); - } + /* Create the event session */ + return event_session_alloc(ser, args, session_id); +} - if (ao2_container_count(session->websocket_apps) == 0) { - RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); +void ast_ari_websocket_events_event_websocket_established( + struct ast_ari_websocket_session *ws_session, struct ast_variable *headers, + struct ast_ari_events_event_websocket_args *args) +{ + RAII_VAR(struct event_session *, session, NULL, event_session_cleanup); + struct ast_json *msg; + const char *session_id; - msg = ast_json_pack("{s: s, s: [s]}", - "type", "MissingParams", - "params", "app"); - if (!msg) { - msg = ast_json_ref(ast_ari_oom_json()); - } + ast_debug(3, "/events WebSocket established\n"); - ast_ari_websocket_session_write(session->ws_session, msg); - return; - } + ast_assert(ws_session != NULL); - if (res != 0) { - ast_ari_websocket_session_write(ws_session, ast_ari_oom_json()); - return; + session_id = ast_ari_websocket_session_id(ws_session); + + /* Find the event_session and update its websocket */ + session = ao2_find(event_session_registry, session_id, OBJ_SEARCH_KEY); + + if (session) { + ao2_unlink(event_session_registry, session); + event_session_update_websocket(session, ws_session); + } else { + ast_log(LOG_WARNING, + "Failed to locate an event session for the provided websocket session\n"); } /* We don't process any input, but we'll consume it waiting for EOF */ @@ -309,4 +538,3 @@ void ast_ari_events_user_event(struct ast_variable *headers, "Error processing request"); } } - diff --git a/res/ari/resource_events.h b/res/ari/resource_events.h index 2b631819b266ac93fa9cb23564ed58d778f03db3..bc763ebd3874f0a1b71a81696cddfd2c18500585 100644 --- a/res/ari/resource_events.h +++ b/res/ari/resource_events.h @@ -49,17 +49,27 @@ struct ast_ari_events_event_websocket_args { char *app_parse; }; +/*! + * \brief WebSocket connection for events. + * + * \retval 0 success + * \retval -1 error + */ +int ast_ari_websocket_events_event_websocket_init(void); + /*! * \brief WebSocket connection for events. * * \param ser HTTP TCP/TLS Server Session * \param headers HTTP headers * \param args Swagger parameters + * \param session_id The id of the current session. * * \retval 0 success * \retval non-zero error */ -int ast_ari_websocket_events_event_websocket_attempted(struct ast_tcptls_session_instance *ser, struct ast_variable *headers, struct ast_ari_events_event_websocket_args *args); +int ast_ari_websocket_events_event_websocket_attempted(struct ast_tcptls_session_instance *ser, + struct ast_variable *headers, struct ast_ari_events_event_websocket_args *args, const char *session_id); /*! * \brief WebSocket connection for events. @@ -67,8 +77,10 @@ int ast_ari_websocket_events_event_websocket_attempted(struct ast_tcptls_session * \param session ARI WebSocket. * \param headers HTTP headers. * \param args Swagger parameters. + * \param session_id The id of the current session. */ -void ast_ari_websocket_events_event_websocket_established(struct ast_ari_websocket_session *session, struct ast_variable *headers, struct ast_ari_events_event_websocket_args *args); +void ast_ari_websocket_events_event_websocket_established(struct ast_ari_websocket_session *session, + struct ast_variable *headers, struct ast_ari_events_event_websocket_args *args); /*! Argument struct for ast_ari_events_user_event() */ struct ast_ari_events_user_event_args { /*! Event name */ diff --git a/res/res_ari_events.c b/res/res_ari_events.c index 454233945e7f234e208667b0e83df69868bef5d9..bf33aeaff27a8f24a1900260a3d4f703b87ce438 100644 --- a/res/res_ari_events.c +++ b/res/res_ari_events.c @@ -53,7 +53,8 @@ ASTERISK_REGISTER_FILE() #define MAX_VALS 128 -static int ast_ari_events_event_websocket_ws_attempted_cb(struct ast_tcptls_session_instance *ser, struct ast_variable *get_params, struct ast_variable *headers) +static int ast_ari_events_event_websocket_ws_attempted_cb(struct ast_tcptls_session_instance *ser, + struct ast_variable *get_params, struct ast_variable *headers, const char *session_id) { struct ast_ari_events_event_websocket_args args = {}; int res = 0; @@ -113,7 +114,7 @@ static int ast_ari_events_event_websocket_ws_attempted_cb(struct ast_tcptls_sess {} } - res = ast_ari_websocket_events_event_websocket_attempted(ser, headers, &args); + res = ast_ari_websocket_events_event_websocket_attempted(ser, headers, &args, session_id); fin: __attribute__((unused)) if (!response) { @@ -433,6 +434,10 @@ static int load_module(void) int res = 0; struct ast_websocket_protocol *protocol; + if (ast_ari_websocket_events_event_websocket_init() == -1) { + return AST_MODULE_LOAD_FAILURE; + } + events.ws_server = ast_websocket_server_create(); if (!events.ws_server) { return AST_MODULE_LOAD_FAILURE; diff --git a/res/res_http_websocket.c b/res/res_http_websocket.c index ecae039192bb73e47195575de845bcea967b74d5..3fe774a0be048b393ac8afe300216240e127a4a6 100644 --- a/res/res_http_websocket.c +++ b/res/res_http_websocket.c @@ -38,6 +38,7 @@ ASTERISK_REGISTER_FILE() #include "asterisk/file.h" #include "asterisk/unaligned.h" #include "asterisk/uri.h" +#include "asterisk/uuid.h" #define AST_API_MODULE #include "asterisk/http_websocket.h" @@ -74,18 +75,19 @@ ASTERISK_REGISTER_FILE() /*! \brief Structure definition for session */ struct ast_websocket { - FILE *f; /*!< Pointer to the file instance used for writing and reading */ - int fd; /*!< File descriptor for the session, only used for polling */ - struct ast_sockaddr address; /*!< Address of the remote client */ - enum ast_websocket_opcode opcode; /*!< Cached opcode for multi-frame messages */ - size_t payload_len; /*!< Length of the payload */ - char *payload; /*!< Pointer to the payload */ - size_t reconstruct; /*!< Number of bytes before a reconstructed payload will be returned and a new one started */ - int timeout; /*!< The timeout for operations on the socket */ - unsigned int secure:1; /*!< Bit to indicate that the transport is secure */ - unsigned int closing:1; /*!< Bit to indicate that the session is in the process of being closed */ - unsigned int close_sent:1; /*!< Bit to indicate that the session close opcode has been sent and no further data will be sent */ - struct websocket_client *client; /*!< Client object when connected as a client websocket */ + FILE *f; /*!< Pointer to the file instance used for writing and reading */ + int fd; /*!< File descriptor for the session, only used for polling */ + struct ast_sockaddr address; /*!< Address of the remote client */ + enum ast_websocket_opcode opcode; /*!< Cached opcode for multi-frame messages */ + size_t payload_len; /*!< Length of the payload */ + char *payload; /*!< Pointer to the payload */ + size_t reconstruct; /*!< Number of bytes before a reconstructed payload will be returned and a new one started */ + int timeout; /*!< The timeout for operations on the socket */ + unsigned int secure:1; /*!< Bit to indicate that the transport is secure */ + unsigned int closing:1; /*!< Bit to indicate that the session is in the process of being closed */ + unsigned int close_sent:1; /*!< Bit to indicate that the session close opcode has been sent and no further data will be sent */ + struct websocket_client *client; /*!< Client object when connected as a client websocket */ + char session_id[AST_UUID_STR_LEN]; /*!< The identifier for the websocket session */ }; /*! \brief Hashing function for protocols */ @@ -414,6 +416,12 @@ int AST_OPTIONAL_API_NAME(ast_websocket_set_timeout)(struct ast_websocket *sessi return 0; } +const char * AST_OPTIONAL_API_NAME(ast_websocket_session_id)(struct ast_websocket *session) +{ + return session->session_id; +} + + /* MAINTENANCE WARNING on ast_websocket_read()! * * We have to keep in mind during this function that the fact that session->fd seems ready @@ -764,7 +772,7 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan return 0; } - if (!(session = ao2_alloc(sizeof(*session), session_destroy_fn))) { + if (!(session = ao2_alloc(sizeof(*session) + AST_UUID_STR_LEN + 1, session_destroy_fn))) { ast_log(LOG_WARNING, "WebSocket connection from '%s' could not be accepted\n", ast_sockaddr_stringify(&ser->remote_address)); websocket_bad_request(ser); @@ -773,8 +781,16 @@ int AST_OPTIONAL_API_NAME(ast_websocket_uri_cb)(struct ast_tcptls_session_instan } session->timeout = AST_DEFAULT_WEBSOCKET_WRITE_TIMEOUT; + /* Generate the session id */ + if (!ast_uuid_generate_str(session->session_id, sizeof(session->session_id))) { + ast_log(LOG_WARNING, "WebSocket connection from '%s' could not be accepted - failed to generate a session id\n", + ast_sockaddr_stringify(&ser->remote_address)); + ast_http_error(ser, 500, "Internal Server Error", "Allocation failed"); + return 0; + } + if (protocol_handler->session_attempted - && protocol_handler->session_attempted(ser, get_vars, headers)) { + && protocol_handler->session_attempted(ser, get_vars, headers, session->session_id)) { ast_debug(3, "WebSocket connection from '%s' rejected by protocol handler '%s'\n", ast_sockaddr_stringify(&ser->remote_address), protocol_handler->name); ao2_ref(protocol_handler, -1); diff --git a/rest-api-templates/ari_resource.h.mustache b/rest-api-templates/ari_resource.h.mustache index d3f40b6bd65e2e21b9a0edaa7092940777242726..f28e832d189dd374890c0b6e3764d2a62b5d4d4c 100644 --- a/rest-api-templates/ari_resource.h.mustache +++ b/rest-api-templates/ari_resource.h.mustache @@ -90,6 +90,18 @@ void ast_ari_{{c_name}}_{{c_nickname}}(struct ast_variable *headers, struct ast_ {{/is_req}} {{#is_websocket}} +/*! + * \brief {{summary}} +{{#notes}} + * + * {{{notes}}} +{{/notes}} + * + * \retval 0 success + * \retval -1 error + */ +int ast_ari_websocket_{{c_name}}_{{c_nickname}}_init(void); + /*! * \brief {{summary}} {{#notes}} @@ -100,11 +112,13 @@ void ast_ari_{{c_name}}_{{c_nickname}}(struct ast_variable *headers, struct ast_ * \param ser HTTP TCP/TLS Server Session * \param headers HTTP headers * \param args Swagger parameters + * \param session_id The id of the current session. * * \retval 0 success * \retval non-zero error */ -int ast_ari_websocket_{{c_name}}_{{c_nickname}}_attempted(struct ast_tcptls_session_instance *ser, struct ast_variable *headers, struct ast_ari_{{c_name}}_{{c_nickname}}_args *args); +int ast_ari_websocket_{{c_name}}_{{c_nickname}}_attempted(struct ast_tcptls_session_instance *ser, + struct ast_variable *headers, struct ast_ari_{{c_name}}_{{c_nickname}}_args *args, const char *session_id); /*! * \brief {{summary}} @@ -116,8 +130,10 @@ int ast_ari_websocket_{{c_name}}_{{c_nickname}}_attempted(struct ast_tcptls_sess * \param session ARI WebSocket. * \param headers HTTP headers. * \param args Swagger parameters. + * \param session_id The id of the current session. */ -void ast_ari_websocket_{{c_name}}_{{c_nickname}}_established(struct ast_ari_websocket_session *session, struct ast_variable *headers, struct ast_ari_{{c_name}}_{{c_nickname}}_args *args); +void ast_ari_websocket_{{c_name}}_{{c_nickname}}_established(struct ast_ari_websocket_session *session, + struct ast_variable *headers, struct ast_ari_{{c_name}}_{{c_nickname}}_args *args); {{/is_websocket}} {{/operations}} {{/apis}} diff --git a/rest-api-templates/res_ari_resource.c.mustache b/rest-api-templates/res_ari_resource.c.mustache index 7fe360e896c75983299e8d4bc2714fa3165948ea..36ca035ee519a59cea1a76955180fa4a2aa17bfc 100644 --- a/rest-api-templates/res_ari_resource.c.mustache +++ b/rest-api-templates/res_ari_resource.c.mustache @@ -137,7 +137,8 @@ fin: __attribute__((unused)) } {{/is_req}} {{#is_websocket}} -static int ast_ari_{{c_name}}_{{c_nickname}}_ws_attempted_cb(struct ast_tcptls_session_instance *ser, struct ast_variable *get_params, struct ast_variable *headers) +static int ast_ari_{{c_name}}_{{c_nickname}}_ws_attempted_cb(struct ast_tcptls_session_instance *ser, + struct ast_variable *get_params, struct ast_variable *headers, const char *session_id) { struct ast_ari_{{c_name}}_{{c_nickname}}_args args = {}; {{#has_parameters}} @@ -156,7 +157,7 @@ static int ast_ari_{{c_name}}_{{c_nickname}}_ws_attempted_cb(struct ast_tcptls_s {{> param_parsing}} - res = ast_ari_websocket_{{c_name}}_{{c_nickname}}_attempted(ser, headers, &args); + res = ast_ari_websocket_{{c_name}}_{{c_nickname}}_attempted(ser, headers, &args, session_id); fin: __attribute__((unused)) if (!response) { @@ -255,6 +256,10 @@ static int load_module(void) {{#has_websocket}} struct ast_websocket_protocol *protocol; + if (ast_ari_websocket_{{c_name}}_{{c_nickname}}_init() == -1) { + return AST_MODULE_LOAD_FAILURE; + } + {{full_name}}.ws_server = ast_websocket_server_create(); if (!{{full_name}}.ws_server) { return AST_MODULE_LOAD_FAILURE;