diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h index fefbea0a0e81c5514929307c144f7727bb7d6157..e9e2c62888cba957861e35840dd4b1baaeedf687 100644 --- a/include/asterisk/stasis_message_router.h +++ b/include/asterisk/stasis_message_router.h @@ -238,7 +238,7 @@ void stasis_message_router_remove_cache_update( * \brief Sets the default route of a router. * * \param router Router to set the default route of. - * \param callback Callback to forard messages which otherwise have no home. + * \param callback Callback to forward messages which otherwise have no home. * \param data Data pointer to pass to \a callback. * * \retval 0 on success @@ -254,6 +254,27 @@ int stasis_message_router_set_default(struct stasis_message_router *router, stasis_subscription_cb callback, void *data); +/*! + * \brief Sets the default route of a router with formatters. + * + * \param router Router to set the default route of. + * \param callback Callback to forward messages which otherwise have no home. + * \param data Data pointer to pass to \a callback. + * \param formatters A bitmap of \ref stasis_subscription_message_formatters we wish to receive. + * + * \since 13.26.0 + * \since 16.3.0 + * + * \note If formatters are specified then the message router will remain in a selective + * filtering state. Any explicit routes will receive messages of their message type and + * the default callback will only receive messages that have one of the given formatters. + * Explicit routes will not be filtered according to the given formatters. + */ +void stasis_message_router_set_formatters_default(struct stasis_message_router *router, + stasis_subscription_cb callback, + void *data, + enum stasis_subscription_message_formatters formatters); + /*! * \brief Indicate to a message router that we are interested in messages with one or more formatters. * diff --git a/main/manager.c b/main/manager.c index 47b31bbde6d7ef9a6b66b8a814a381a002dae151..789e779bbe9bf1cdb42185ee3f4efca93380a898 100644 --- a/main/manager.c +++ b/main/manager.c @@ -8887,8 +8887,8 @@ static int manager_subscriptions_init(void) stasis_message_router_set_congestion_limits(stasis_router, -1, 6 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL); - res |= stasis_message_router_set_default(stasis_router, - manager_default_msg_cb, NULL); + stasis_message_router_set_formatters_default(stasis_router, + manager_default_msg_cb, NULL, STASIS_SUBSCRIPTION_FORMATTER_AMI); res |= stasis_message_router_add(stasis_router, ast_manager_get_generic_type(), manager_generic_msg_cb, NULL); diff --git a/main/manager_channels.c b/main/manager_channels.c index 6938c6ee4ca8ac6060e1914be915e0cc95e7f8e9..f66b60bab24a5cc70a8670c652ff2c1830be925d 100644 --- a/main/manager_channels.c +++ b/main/manager_channels.c @@ -1240,6 +1240,10 @@ int manager_channels_init(void) ast_register_cleanup(manager_channels_shutdown); + /* The snapshot type has a special handler as it can result in multiple + * manager events being queued due to aspects of the snapshot itself + * changing. + */ ret |= stasis_message_router_add_cache_update(message_router, ast_channel_snapshot_type(), channel_snapshot_update, NULL); diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c index 8ada0dddf34b6addb7f8fb725aa86cf1cfd7bf21..1c7972f9c04ffde77cb4fe67591cd5ec5ff58c18 100644 --- a/main/stasis_message_router.c +++ b/main/stasis_message_router.c @@ -389,19 +389,34 @@ void stasis_message_router_remove_cache_update( int stasis_message_router_set_default(struct stasis_message_router *router, stasis_subscription_cb callback, void *data) +{ + stasis_message_router_set_formatters_default(router, callback, data, STASIS_SUBSCRIPTION_FORMATTER_NONE); + + /* While this implementation can never fail, it used to be able to */ + return 0; +} + +void stasis_message_router_set_formatters_default(struct stasis_message_router *router, + stasis_subscription_cb callback, + void *data, + enum stasis_subscription_message_formatters formatters) { ast_assert(router != NULL); ast_assert(callback != NULL); + stasis_subscription_accept_formatters(router->subscription, formatters); + ao2_lock(router); router->default_route.callback = callback; router->default_route.data = data; ao2_unlock(router); - stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_FORCED_NONE); - - /* While this implementation can never fail, it used to be able to */ - return 0; + if (formatters == STASIS_SUBSCRIPTION_FORMATTER_NONE) { + /* Formatters govern what messages the default callback get, so it is only if none is + * specified that we accept all messages regardless. + */ + stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_FORCED_NONE); + } } void stasis_message_router_accept_formatters(struct stasis_message_router *router, diff --git a/res/stasis/app.c b/res/stasis/app.c index ccb93bc4dd29b0235e8d5270801edd95b46c1418..fe05d36815fd0d05949dab21cd0101ab9a49f093 100644 --- a/res/stasis/app.c +++ b/res/stasis/app.c @@ -333,16 +333,25 @@ static void call_forwarded_handler(struct stasis_app *app, struct stasis_message ast_channel_unref(chan); } -static void sub_default_handler(void *data, struct stasis_subscription *sub, +static void sub_subscription_change_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message) { struct stasis_app *app = data; - struct ast_json *json; if (stasis_subscription_final_message(sub, message)) { ao2_cleanup(app); } +} +static void sub_default_handler(void *data, struct stasis_subscription *sub, + struct stasis_message *message) +{ + struct stasis_app *app = data; + struct ast_json *json; + + /* The dial type can be converted to JSON so it will always be passed + * here. + */ if (stasis_message_type(message) == ast_channel_dial_type()) { call_forwarded_handler(app, message); } @@ -846,7 +855,7 @@ static void bridge_attended_transfer_handler(void *data, struct stasis_subscript } } -static void bridge_default_handler(void *data, struct stasis_subscription *sub, +static void bridge_subscription_change_handler(void *data, struct stasis_subscription *sub, struct stasis_message *message) { struct stasis_app *app = data; @@ -973,8 +982,8 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat res |= stasis_message_router_add(app->bridge_router, ast_attended_transfer_type(), bridge_attended_transfer_handler, app); - res |= stasis_message_router_set_default(app->bridge_router, - bridge_default_handler, app); + res |= stasis_message_router_add(app->bridge_router, + stasis_subscription_change_type(), bridge_subscription_change_handler, app); if (res != 0) { return NULL; @@ -996,8 +1005,11 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat res |= stasis_message_router_add_cache_update(app->router, ast_endpoint_snapshot_type(), sub_endpoint_update_handler, app); - res |= stasis_message_router_set_default(app->router, - sub_default_handler, app); + res |= stasis_message_router_add(app->router, + stasis_subscription_change_type(), sub_subscription_change_handler, app); + + stasis_message_router_set_formatters_default(app->router, + sub_default_handler, app, STASIS_SUBSCRIPTION_FORMATTER_JSON); if (res != 0) { return NULL;