diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h index 649eef13717309a6df6fb6582ca8617417cbd083..ba3f8bbaa6ab702b0c0365b3b267e74b47faa5ee 100644 --- a/include/asterisk/stasis_message_router.h +++ b/include/asterisk/stasis_message_router.h @@ -228,7 +228,7 @@ void stasis_message_router_remove_cache_update( * \brief Sets the default route of a router. * * \param router Router to set the default route of. - * \param callback Callback to forard messages which otherwise have no home. + * \param callback Callback to forward messages which otherwise have no home. * \param data Data pointer to pass to \a callback. * * \retval 0 on success @@ -244,6 +244,27 @@ int stasis_message_router_set_default(struct stasis_message_router *router, stasis_subscription_cb callback, void *data); +/*! + * \brief Sets the default route of a router with formatters. + * + * \param router Router to set the default route of. + * \param callback Callback to forward messages which otherwise have no home. + * \param data Data pointer to pass to \a callback. + * \param formatters A bitmap of \ref stasis_subscription_message_formatters we wish to receive. + * + * \since 13.26.0 + * \since 16.3.0 + * + * \note If formatters are specified then the message router will remain in a selective + * filtering state. Any explicit routes will receive messages of their message type and + * the default callback will only receive messages that have one of the given formatters. + * Explicit routes will not be filtered according to the given formatters. + */ +void stasis_message_router_set_formatters_default(struct stasis_message_router *router, + stasis_subscription_cb callback, + void *data, + enum stasis_subscription_message_formatters formatters); + /*! * \brief Indicate to a message router that we are interested in messages with one or more formatters. * diff --git a/main/manager.c b/main/manager.c index 3e41198c0f03b81c8653b5f2753f8fd2bb86213e..0c715e45f2893ebf7d5ba3bce910f52c68d250a0 100644 --- a/main/manager.c +++ b/main/manager.c @@ -8899,8 +8899,8 @@ static int manager_subscriptions_init(void) stasis_message_router_set_congestion_limits(stasis_router, -1, 6 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL); - res |= stasis_message_router_set_default(stasis_router, - manager_default_msg_cb, NULL); + stasis_message_router_set_formatters_default(stasis_router, + manager_default_msg_cb, NULL, STASIS_SUBSCRIPTION_FORMATTER_AMI); res |= stasis_message_router_add(stasis_router, ast_manager_get_generic_type(), manager_generic_msg_cb, NULL); diff --git a/main/manager_channels.c b/main/manager_channels.c index edbc770a075ae9a9a59c8dcb13f0f863874fac6e..218e0b4a6057edf5832b719a327a5a184b34a5b6 100644 --- a/main/manager_channels.c +++ b/main/manager_channels.c @@ -1255,6 +1255,10 @@ int manager_channels_init(void) ast_register_cleanup(manager_channels_shutdown); + /* The snapshot type has a special handler as it can result in multiple + * manager events being queued due to aspects of the snapshot itself + * changing. + */ ret |= stasis_message_router_add(message_router, ast_channel_snapshot_type(), channel_snapshot_update, NULL); diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c index 3d1a1a091bb53670ab145d2a38b79d9067637b59..7f2358f5b6820a1c097b791c74bf9fc2ca23233b 100644 --- a/main/stasis_message_router.c +++ b/main/stasis_message_router.c @@ -387,19 +387,34 @@ void stasis_message_router_remove_cache_update( int stasis_message_router_set_default(struct stasis_message_router *router, stasis_subscription_cb callback, void *data) +{ + stasis_message_router_set_formatters_default(router, callback, data, STASIS_SUBSCRIPTION_FORMATTER_NONE); + + /* While this implementation can never fail, it used to be able to */ + return 0; +} + +void stasis_message_router_set_formatters_default(struct stasis_message_router *router, + stasis_subscription_cb callback, + void *data, + enum stasis_subscription_message_formatters formatters) { ast_assert(router != NULL); ast_assert(callback != NULL); + stasis_subscription_accept_formatters(router->subscription, formatters); + ao2_lock(router); router->default_route.callback = callback; router->default_route.data = data; ao2_unlock(router); - stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_FORCED_NONE); - - /* While this implementation can never fail, it used to be able to */ - return 0; + if (formatters == STASIS_SUBSCRIPTION_FORMATTER_NONE) { + /* Formatters govern what messages the default callback get, so it is only if none is + * specified that we accept all messages regardless. + */ + stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_FORCED_NONE); + } } void stasis_message_router_accept_formatters(struct stasis_message_router *router, diff --git a/res/stasis/app.c b/res/stasis/app.c index eb49243c58c7686fa0c30578ada630b6b4d12ea5..585eddaf8121ac4de646244734d6fb83706c35a0 100644 --- a/res/stasis/app.c +++ b/res/stasis/app.c @@ -317,16 +317,25 @@ static void call_forwarded_handler(struct stasis_app *app, struct stasis_message ast_channel_unref(chan); } -static void sub_default_handler(void *data, struct stasis_subscription *sub, +static void sub_subscription_change_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message) { struct stasis_app *app = data; - struct ast_json *json; if (stasis_subscription_final_message(sub, message)) { ao2_cleanup(app); } +} + +static void sub_default_handler(void *data, struct stasis_subscription *sub, + struct stasis_message *message) +{ + struct stasis_app *app = data; + struct ast_json *json; + /* The dial type can be converted to JSON so it will always be passed + * here. + */ if (stasis_message_type(message) == ast_channel_dial_type()) { call_forwarded_handler(app, message); } @@ -803,7 +812,7 @@ static void bridge_attended_transfer_handler(void *data, struct stasis_subscript } } -static void bridge_default_handler(void *data, struct stasis_subscription *sub, +static void bridge_subscription_change_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message) { struct stasis_app *app = data; @@ -930,8 +939,8 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat res |= stasis_message_router_add(app->bridge_router, ast_attended_transfer_type(), bridge_attended_transfer_handler, app); - res |= stasis_message_router_set_default(app->bridge_router, - bridge_default_handler, app); + res |= stasis_message_router_add(app->bridge_router, + stasis_subscription_change_type(), bridge_subscription_change_handler, app); if (res != 0) { return NULL; @@ -953,8 +962,11 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat res |= stasis_message_router_add_cache_update(app->router, ast_endpoint_snapshot_type(), sub_endpoint_update_handler, app); - res |= stasis_message_router_set_default(app->router, - sub_default_handler, app); + res |= stasis_message_router_add(app->router, + stasis_subscription_change_type(), sub_subscription_change_handler, app); + + stasis_message_router_set_formatters_default(app->router, + sub_default_handler, app, STASIS_SUBSCRIPTION_FORMATTER_JSON); if (res != 0) { return NULL;