From 42922af7b81b87bf588db8871e978455a19149d3 Mon Sep 17 00:00:00 2001 From: "Joshua C. Colp" <jcolp@digium.com> Date: Thu, 10 Jan 2019 15:34:32 -0400 Subject: [PATCH] stasis / manager / ari: Better filter messages. Previously both AMI and ARI used a default route on their stasis message router to handle some of the messages for publishing out their respective connection. This caused messages to be given to their subscription that could not be formatted into AMI or JSON. This change adds an API call to the stasis message router which allows a default route to be set as well as formatters that the default route is expecting. This allows both AMI and ARI to specify that their default route only wants messages of their given formatter. By doing so stasis can more intelligently filter at publishing time so that they do not receive messages which will not be turned into AMI or JSON. ASTERISK-28244 Change-Id: I65272819a53ce99f869181d1d370da559a7d1703 --- include/asterisk/stasis_message_router.h | 23 ++++++++++++++++++++- main/manager.c | 4 ++-- main/manager_channels.c | 4 ++++ main/stasis_message_router.c | 23 +++++++++++++++++---- res/stasis/app.c | 26 +++++++++++++++++------- 5 files changed, 66 insertions(+), 14 deletions(-) diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h index fefbea0a0e..e9e2c62888 100644 --- a/include/asterisk/stasis_message_router.h +++ b/include/asterisk/stasis_message_router.h @@ -238,7 +238,7 @@ void stasis_message_router_remove_cache_update( * \brief Sets the default route of a router. * * \param router Router to set the default route of. - * \param callback Callback to forard messages which otherwise have no home. + * \param callback Callback to forward messages which otherwise have no home. * \param data Data pointer to pass to \a callback. * * \retval 0 on success @@ -254,6 +254,27 @@ int stasis_message_router_set_default(struct stasis_message_router *router, stasis_subscription_cb callback, void *data); +/*! + * \brief Sets the default route of a router with formatters. + * + * \param router Router to set the default route of. + * \param callback Callback to forward messages which otherwise have no home. + * \param data Data pointer to pass to \a callback. + * \param formatters A bitmap of \ref stasis_subscription_message_formatters we wish to receive. + * + * \since 13.26.0 + * \since 16.3.0 + * + * \note If formatters are specified then the message router will remain in a selective + * filtering state. Any explicit routes will receive messages of their message type and + * the default callback will only receive messages that have one of the given formatters. + * Explicit routes will not be filtered according to the given formatters. + */ +void stasis_message_router_set_formatters_default(struct stasis_message_router *router, + stasis_subscription_cb callback, + void *data, + enum stasis_subscription_message_formatters formatters); + /*! * \brief Indicate to a message router that we are interested in messages with one or more formatters. * diff --git a/main/manager.c b/main/manager.c index 47b31bbde6..789e779bbe 100644 --- a/main/manager.c +++ b/main/manager.c @@ -8887,8 +8887,8 @@ static int manager_subscriptions_init(void) stasis_message_router_set_congestion_limits(stasis_router, -1, 6 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL); - res |= stasis_message_router_set_default(stasis_router, - manager_default_msg_cb, NULL); + stasis_message_router_set_formatters_default(stasis_router, + manager_default_msg_cb, NULL, STASIS_SUBSCRIPTION_FORMATTER_AMI); res |= stasis_message_router_add(stasis_router, ast_manager_get_generic_type(), manager_generic_msg_cb, NULL); diff --git a/main/manager_channels.c b/main/manager_channels.c index 6938c6ee4c..f66b60bab2 100644 --- a/main/manager_channels.c +++ b/main/manager_channels.c @@ -1240,6 +1240,10 @@ int manager_channels_init(void) ast_register_cleanup(manager_channels_shutdown); + /* The snapshot type has a special handler as it can result in multiple + * manager events being queued due to aspects of the snapshot itself + * changing. + */ ret |= stasis_message_router_add_cache_update(message_router, ast_channel_snapshot_type(), channel_snapshot_update, NULL); diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c index 8ada0dddf3..1c7972f9c0 100644 --- a/main/stasis_message_router.c +++ b/main/stasis_message_router.c @@ -389,19 +389,34 @@ void stasis_message_router_remove_cache_update( int stasis_message_router_set_default(struct stasis_message_router *router, stasis_subscription_cb callback, void *data) +{ + stasis_message_router_set_formatters_default(router, callback, data, STASIS_SUBSCRIPTION_FORMATTER_NONE); + + /* While this implementation can never fail, it used to be able to */ + return 0; +} + +void stasis_message_router_set_formatters_default(struct stasis_message_router *router, + stasis_subscription_cb callback, + void *data, + enum stasis_subscription_message_formatters formatters) { ast_assert(router != NULL); ast_assert(callback != NULL); + stasis_subscription_accept_formatters(router->subscription, formatters); + ao2_lock(router); router->default_route.callback = callback; router->default_route.data = data; ao2_unlock(router); - stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_FORCED_NONE); - - /* While this implementation can never fail, it used to be able to */ - return 0; + if (formatters == STASIS_SUBSCRIPTION_FORMATTER_NONE) { + /* Formatters govern what messages the default callback get, so it is only if none is + * specified that we accept all messages regardless. + */ + stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_FORCED_NONE); + } } void stasis_message_router_accept_formatters(struct stasis_message_router *router, diff --git a/res/stasis/app.c b/res/stasis/app.c index ccb93bc4dd..fe05d36815 100644 --- a/res/stasis/app.c +++ b/res/stasis/app.c @@ -333,16 +333,25 @@ static void call_forwarded_handler(struct stasis_app *app, struct stasis_message ast_channel_unref(chan); } -static void sub_default_handler(void *data, struct stasis_subscription *sub, +static void sub_subscription_change_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message) { struct stasis_app *app = data; - struct ast_json *json; if (stasis_subscription_final_message(sub, message)) { ao2_cleanup(app); } +} +static void sub_default_handler(void *data, struct stasis_subscription *sub, + struct stasis_message *message) +{ + struct stasis_app *app = data; + struct ast_json *json; + + /* The dial type can be converted to JSON so it will always be passed + * here. + */ if (stasis_message_type(message) == ast_channel_dial_type()) { call_forwarded_handler(app, message); } @@ -846,7 +855,7 @@ static void bridge_attended_transfer_handler(void *data, struct stasis_subscript } } -static void bridge_default_handler(void *data, struct stasis_subscription *sub, +static void bridge_subscription_change_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message) { struct stasis_app *app = data; @@ -973,8 +982,8 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat res |= stasis_message_router_add(app->bridge_router, ast_attended_transfer_type(), bridge_attended_transfer_handler, app); - res |= stasis_message_router_set_default(app->bridge_router, - bridge_default_handler, app); + res |= stasis_message_router_add(app->bridge_router, + stasis_subscription_change_type(), bridge_subscription_change_handler, app); if (res != 0) { return NULL; @@ -996,8 +1005,11 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat res |= stasis_message_router_add_cache_update(app->router, ast_endpoint_snapshot_type(), sub_endpoint_update_handler, app); - res |= stasis_message_router_set_default(app->router, - sub_default_handler, app); + res |= stasis_message_router_add(app->router, + stasis_subscription_change_type(), sub_subscription_change_handler, app); + + stasis_message_router_set_formatters_default(app->router, + sub_default_handler, app, STASIS_SUBSCRIPTION_FORMATTER_JSON); if (res != 0) { return NULL; -- GitLab