diff --git a/include/asterisk/stasis_app.h b/include/asterisk/stasis_app.h index 567670b69293056dda10df8c969c42ce91ff3637..f2b07e0bfebb93eba2150023dc92cb49fd2780bc 100644 --- a/include/asterisk/stasis_app.h +++ b/include/asterisk/stasis_app.h @@ -91,6 +91,21 @@ struct ao2_container *stasis_app_get_all(void); */ int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data); +/*! + * \brief Register a new Stasis application that receives all Asterisk events. + * + * If an application is already registered with the given name, the old + * application is sent a 'replaced' message and unregistered. + * + * \param app_name Name of this application. + * \param handler Callback for application messages. + * \param data Data blob to pass to the callback. Must be AO2 managed. + * + * \return 0 for success + * \return -1 for error. + */ +int stasis_app_register_all(const char *app_name, stasis_app_cb handler, void *data); + /*! * \brief Unregister a Stasis application. * \param app_name Name of the application to unregister. diff --git a/res/ari/resource_events.c b/res/ari/resource_events.c index deb7f9cc05774db3861a60db18d24ff15b3c65ff..8fa15f5aa069d7eec2cd28186c0294023e3e33af 100644 --- a/res/ari/resource_events.c +++ b/res/ari/resource_events.c @@ -280,7 +280,9 @@ static void event_session_cleanup(struct event_session *session) } event_session_shutdown(session); - ao2_unlink(event_session_registry, session); + if (event_session_registry) { + ao2_unlink(event_session_registry, session); + } } /*! @@ -367,6 +369,7 @@ static int event_session_alloc(struct ast_tcptls_session_instance *ser, struct ast_ari_events_event_websocket_args *args, const char *session_id) { RAII_VAR(struct event_session *, session, NULL, ao2_cleanup); + int (* register_handler)(const char *, stasis_app_cb handler, void *data); size_t size, i; /* The request must have at least one [app] parameter */ @@ -399,6 +402,12 @@ static int event_session_alloc(struct ast_tcptls_session_instance *ser, } /* Register the apps with Stasis */ + if (args->subscribe_all) { + register_handler = &stasis_app_register_all; + } else { + register_handler = &stasis_app_register; + } + for (i = 0; i < args->app_count; ++i) { const char *app = args->app[i]; @@ -411,10 +420,10 @@ static int event_session_alloc(struct ast_tcptls_session_instance *ser, return event_session_allocation_error_handler(session, ERROR_TYPE_OOM, ser); } - if (stasis_app_register(app, stasis_app_message_handler, session)) { + if (register_handler(app, stasis_app_message_handler, session)) { ast_log(LOG_WARNING, "Stasis registration failed for application: '%s'\n", app); return event_session_allocation_error_handler( - session, ERROR_TYPE_STASIS_REGISTRATION, ser); + session, ERROR_TYPE_STASIS_REGISTRATION, ser); } } @@ -426,8 +435,17 @@ static int event_session_alloc(struct ast_tcptls_session_instance *ser, return 0; } +static int event_session_shutdown_cb(void *session, void *arg, int flags) +{ + event_session_cleanup(session); + + return 0; +} + void ast_ari_websocket_events_event_websocket_dtor(void) { + ao2_callback(event_session_registry, OBJ_MULTIPLE | OBJ_NODATA, event_session_shutdown_cb, NULL); + ao2_cleanup(event_session_registry); event_session_registry = NULL; } @@ -462,7 +480,8 @@ void ast_ari_websocket_events_event_websocket_established( struct ast_ari_websocket_session *ws_session, struct ast_variable *headers, struct ast_ari_events_event_websocket_args *args) { - RAII_VAR(struct event_session *, session, NULL, event_session_cleanup); + struct event_session *session; + struct ast_json *msg; const char *session_id; @@ -474,7 +493,6 @@ void ast_ari_websocket_events_event_websocket_established( /* Find the event_session and update its websocket */ session = ao2_find(event_session_registry, session_id, OBJ_SEARCH_KEY); - if (session) { ao2_unlink(event_session_registry, session); event_session_update_websocket(session, ws_session); @@ -487,6 +505,9 @@ void ast_ari_websocket_events_event_websocket_established( while ((msg = ast_ari_websocket_session_read(ws_session))) { ast_json_unref(msg); } + + event_session_cleanup(session); + ao2_ref(session, -1); } void ast_ari_events_user_event(struct ast_variable *headers, diff --git a/res/ari/resource_events.h b/res/ari/resource_events.h index aa1e3dfd645f2c829419477e8d4c95bf2b1f5bc3..8c03af4b0e2df7cf45b53bf6a910198354cbbff7 100644 --- a/res/ari/resource_events.h +++ b/res/ari/resource_events.h @@ -47,6 +47,8 @@ struct ast_ari_events_event_websocket_args { size_t app_count; /*! Parsing context for app. */ char *app_parse; + /*! Subscribe to all Asterisk events. If provided, the applications listed will be subscribed to all events, effectively disabling the application specific subscriptions. Default is 'false'. */ + int subscribe_all; }; /*! diff --git a/res/res_ari_events.c b/res/res_ari_events.c index 4b2b151aa0ad0b339d1e51550fc9449d24da306e..e4fda0a54b3b1aebf6697f71336f8e3d1395081e 100644 --- a/res/res_ari_events.c +++ b/res/res_ari_events.c @@ -111,6 +111,9 @@ static int ast_ari_events_event_websocket_ws_attempted_cb(struct ast_tcptls_sess args.app[j] = (vals[j]); } } else + if (strcmp(i->name, "subscribeAll") == 0) { + args.subscribe_all = ast_true(i->value); + } else {} } @@ -209,6 +212,9 @@ static void ast_ari_events_event_websocket_ws_established_cb(struct ast_websocke args.app[j] = (vals[j]); } } else + if (strcmp(i->name, "subscribeAll") == 0) { + args.subscribe_all = ast_true(i->value); + } else {} } diff --git a/res/res_stasis.c b/res/res_stasis.c index f7d8299f44547dbd7144f9749b95f145e872624d..69e9b935d042d98ddb0a4c89519918b3b7aa7d44 100644 --- a/res/res_stasis.c +++ b/res/res_stasis.c @@ -109,6 +109,11 @@ struct ao2_container *app_bridges_moh; struct ao2_container *app_bridges_playback; +/*! + * \internal \brief List of registered event sources. + */ +AST_RWLIST_HEAD_STATIC(event_sources, stasis_app_event_source); + static struct ast_json *stasis_end_to_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize) { @@ -1469,7 +1474,7 @@ struct ao2_container *stasis_app_get_all(void) return ao2_bump(apps); } -int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data) +static int __stasis_app_register(const char *app_name, stasis_app_cb handler, void *data, int all_events) { RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup); @@ -1482,8 +1487,20 @@ int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data) if (app) { app_update(app, handler, data); } else { - app = app_create(app_name, handler, data); + app = app_create(app_name, handler, data, all_events ? STASIS_APP_SUBSCRIBE_ALL : STASIS_APP_SUBSCRIBE_MANUAL); if (app) { + if (all_events) { + struct stasis_app_event_source *source; + SCOPED_LOCK(lock, &event_sources, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK); + + AST_LIST_TRAVERSE(&event_sources, source, next) { + if (!source->subscribe) { + continue; + } + + source->subscribe(app, NULL); + } + } ao2_link_flags(apps_registry, app, OBJ_NOLOCK); } else { ao2_unlock(apps_registry); @@ -1499,6 +1516,16 @@ int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data) return 0; } +int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data) +{ + return __stasis_app_register(app_name, handler, data, 0); +} + +int stasis_app_register_all(const char *app_name, stasis_app_cb handler, void *data) +{ + return __stasis_app_register(app_name, handler, data, 1); +} + void stasis_app_unregister(const char *app_name) { RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup); @@ -1526,11 +1553,6 @@ void stasis_app_unregister(const char *app_name) cleanup(); } -/*! - * \internal \brief List of registered event sources. - */ -AST_RWLIST_HEAD_STATIC(event_sources, stasis_app_event_source); - void stasis_app_register_event_source(struct stasis_app_event_source *obj) { SCOPED_LOCK(lock, &event_sources, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK); @@ -1727,8 +1749,8 @@ static enum stasis_app_subscribe_res app_subscribe( ast_debug(3, "%s: Checking %s\n", app_name, uri); - if (!event_source->find || - (!(obj = event_source->find(app, uri + strlen(event_source->scheme))))) { + if (!ast_strlen_zero(uri + strlen(event_source->scheme)) && + (!event_source->find || (!(obj = event_source->find(app, uri + strlen(event_source->scheme)))))) { ast_log(LOG_WARNING, "Event source not found: %s\n", uri); return STASIS_ASR_EVENT_SOURCE_NOT_FOUND; } @@ -2062,6 +2084,7 @@ static int load_module(void) } AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application support", + .load_pri = AST_MODPRI_APP_DEPEND, .support_level = AST_MODULE_SUPPORT_CORE, .load = load_module, .unload = unload_module, diff --git a/res/stasis/app.c b/res/stasis/app.c index b99e23205e560fef439f0854bdb1a5048b245329..5002a0ba88aff20ba391cdbb7f797f0ac11b877b 100644 --- a/res/stasis/app.c +++ b/res/stasis/app.c @@ -38,6 +38,10 @@ ASTERISK_REGISTER_FILE() #include "asterisk/stasis_endpoints.h" #include "asterisk/stasis_message_router.h" +#define BRIDGE_ALL "__AST_BRIDGE_ALL_TOPIC" +#define CHANNEL_ALL "__AST_CHANNEL_ALL_TOPIC" +#define ENDPOINT_ALL "__AST_ENDPOINT_ALL_TOPIC" + static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int terminate); struct stasis_app { @@ -47,12 +51,16 @@ struct stasis_app { struct stasis_message_router *router; /*! Router for handling messages to the bridge all \a topic. */ struct stasis_message_router *bridge_router; + /*! Optional router for handling endpoint messages in 'all' subscriptions */ + struct stasis_message_router *endpoint_router; /*! Container of the channel forwards to this app's topic. */ struct ao2_container *forwards; /*! Callback function for this application. */ stasis_app_cb handler; /*! Opaque data to hand to callback function. */ void *data; + /*! Subscription model for the application */ + enum stasis_app_subscription_model subscription_model; /*! Name of the Stasis application */ char name[]; }; @@ -121,34 +129,33 @@ static struct app_forwards *forwards_create(struct stasis_app *app, static struct app_forwards *forwards_create_channel(struct stasis_app *app, struct ast_channel *chan) { - RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + struct app_forwards *forwards; - if (!app || !chan) { + if (!app) { return NULL; } - forwards = forwards_create(app, ast_channel_uniqueid(chan)); + forwards = forwards_create(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL); if (!forwards) { return NULL; } forwards->forward_type = FORWARD_CHANNEL; - forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan), - app->topic); - if (!forwards->topic_forward) { - return NULL; + if (chan) { + forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan), + app->topic); } - forwards->topic_cached_forward = stasis_forward_all( - ast_channel_topic_cached(chan), app->topic); - if (!forwards->topic_cached_forward) { + chan ? ast_channel_topic_cached(chan) : ast_channel_topic_all_cached(), + app->topic); + + if ((!forwards->topic_forward && chan) || !forwards->topic_cached_forward) { /* Half-subscribed is a bad thing */ - stasis_forward_cancel(forwards->topic_forward); - forwards->topic_forward = NULL; + forwards_unsubscribe(forwards); + ao2_ref(forwards, -1); return NULL; } - ao2_ref(forwards, +1); return forwards; } @@ -156,69 +163,100 @@ static struct app_forwards *forwards_create_channel(struct stasis_app *app, static struct app_forwards *forwards_create_bridge(struct stasis_app *app, struct ast_bridge *bridge) { - RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + struct app_forwards *forwards; - if (!app || !bridge) { + if (!app) { return NULL; } - forwards = forwards_create(app, bridge->uniqueid); + forwards = forwards_create(app, bridge ? bridge->uniqueid : BRIDGE_ALL); if (!forwards) { return NULL; } forwards->forward_type = FORWARD_BRIDGE; - forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge), - app->topic); - if (!forwards->topic_forward) { - return NULL; + if (bridge) { + forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge), + app->topic); } - forwards->topic_cached_forward = stasis_forward_all( - ast_bridge_topic_cached(bridge), app->topic); - if (!forwards->topic_cached_forward) { + bridge ? ast_bridge_topic_cached(bridge) : ast_bridge_topic_all_cached(), + app->topic); + + if ((!forwards->topic_forward && bridge) || !forwards->topic_cached_forward) { /* Half-subscribed is a bad thing */ - stasis_forward_cancel(forwards->topic_forward); - forwards->topic_forward = NULL; + forwards_unsubscribe(forwards); + ao2_ref(forwards, -1); return NULL; } - ao2_ref(forwards, +1); return forwards; } +static void endpoint_state_cb(void *data, struct stasis_subscription *sub, + struct stasis_message *message) +{ + struct stasis_app *app = data; + + stasis_publish(app->topic, message); +} + /*! Forward a endpoint's topics to an app */ static struct app_forwards *forwards_create_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint) { - RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + struct app_forwards *forwards; + int ret = 0; - if (!app || !endpoint) { + if (!app) { return NULL; } - forwards = forwards_create(app, ast_endpoint_get_id(endpoint)); + forwards = forwards_create(app, endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL); if (!forwards) { return NULL; } forwards->forward_type = FORWARD_ENDPOINT; - forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint), - app->topic); - if (!forwards->topic_forward) { - return NULL; - } + if (endpoint) { + forwards->topic_forward = stasis_forward_all(ast_endpoint_topic(endpoint), + app->topic); + forwards->topic_cached_forward = stasis_forward_all( + ast_endpoint_topic_cached(endpoint), app->topic); + + if (!forwards->topic_forward || !forwards->topic_cached_forward) { + /* Half-subscribed is a bad thing */ + forwards_unsubscribe(forwards); + ao2_ref(forwards, -1); + return NULL; + } + } else { + /* Since endpoint subscriptions also subscribe to channels, in the case + * of all endpoint subscriptions, we only want messages for the endpoints. + * As such, we route those particular messages and then re-publish them + * on the app's topic. + */ + ast_assert(app->endpoint_router == NULL); + app->endpoint_router = stasis_message_router_create(ast_endpoint_topic_all_cached()); + if (!app->endpoint_router) { + forwards_unsubscribe(forwards); + ao2_ref(forwards, -1); + return NULL; + } - forwards->topic_cached_forward = stasis_forward_all( - ast_endpoint_topic_cached(endpoint), app->topic); - if (!forwards->topic_cached_forward) { - /* Half-subscribed is a bad thing */ - stasis_forward_cancel(forwards->topic_forward); - forwards->topic_forward = NULL; - return NULL; + ret |= stasis_message_router_add(app->endpoint_router, + ast_endpoint_state_type(), endpoint_state_cb, app); + ret |= stasis_message_router_add(app->endpoint_router, + ast_endpoint_contact_state_type(), endpoint_state_cb, app); + + if (ret) { + ao2_ref(app->endpoint_router, -1); + app->endpoint_router = NULL; + ao2_ref(forwards, -1); + return NULL; + } } - ao2_ref(forwards, +1); return forwards; } @@ -260,6 +298,7 @@ static void app_dtor(void *obj) ast_assert(app->router == NULL); ast_assert(app->bridge_router == NULL); + ast_assert(app->endpoint_router == NULL); ao2_cleanup(app->topic); app->topic = NULL; @@ -793,7 +832,7 @@ static void bridge_default_handler(void *data, struct stasis_subscription *sub, } } -struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data) +struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model) { RAII_VAR(struct stasis_app *, app, NULL, ao2_cleanup); size_t size; @@ -806,10 +845,10 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat size = sizeof(*app) + strlen(name) + 1; app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX); - if (!app) { return NULL; } + app->subscription_model = subscription_model; app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX, AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT, @@ -877,7 +916,8 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat return app; } -struct stasis_topic *ast_app_get_topic(struct stasis_app *app) { +struct stasis_topic *ast_app_get_topic(struct stasis_app *app) +{ return app->topic; } @@ -930,6 +970,8 @@ void app_shutdown(struct stasis_app *app) app->router = NULL; stasis_message_router_unsubscribe(app->bridge_router); app->bridge_router = NULL; + stasis_message_router_unsubscribe(app->endpoint_router); + app->endpoint_router = NULL; } int app_is_active(struct stasis_app *app) @@ -1029,34 +1071,47 @@ struct ast_json *app_to_json(const struct stasis_app *app) int app_subscribe_channel(struct stasis_app *app, struct ast_channel *chan) { + struct app_forwards *forwards; + SCOPED_AO2LOCK(lock, app->forwards); int res; - if (!app || !chan) { + if (!app) { return -1; - } else { - RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); - SCOPED_AO2LOCK(lock, app->forwards); + } - forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan), - OBJ_SEARCH_KEY | OBJ_NOLOCK); - if (!forwards) { - /* Forwards not found, create one */ - forwards = forwards_create_channel(app, chan); - if (!forwards) { - return -1; - } + /* If subscribed to all, don't subscribe again */ + forwards = ao2_find(app->forwards, CHANNEL_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (forwards) { + ao2_ref(forwards, -1); + return 0; + } - res = ao2_link_flags(app->forwards, forwards, - OBJ_NOLOCK); - if (!res) { - return -1; - } + forwards = ao2_find(app->forwards, + chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL, + OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (!forwards) { + /* Forwards not found, create one */ + forwards = forwards_create_channel(app, chan); + if (!forwards) { + return -1; } - ++forwards->interested; - ast_debug(3, "Channel '%s' is %d interested in %s\n", ast_channel_uniqueid(chan), forwards->interested, app->name); - return 0; + res = ao2_link_flags(app->forwards, forwards, + OBJ_NOLOCK); + if (!res) { + ao2_ref(forwards, -1); + return -1; + } } + + ++forwards->interested; + ast_debug(3, "Channel '%s' is %d interested in %s\n", + chan ? ast_channel_uniqueid(chan) : "ALL", + forwards->interested, + app->name); + + ao2_ref(forwards, -1); + return 0; } static int subscribe_channel(struct stasis_app *app, void *obj) @@ -1069,6 +1124,19 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); SCOPED_AO2LOCK(lock, app->forwards); + if (!id) { + if (!strcmp(kind, "bridge")) { + id = BRIDGE_ALL; + } else if (!strcmp(kind, "channel")) { + id = CHANNEL_ALL; + } else if (!strcmp(kind, "endpoint")) { + id = ENDPOINT_ALL; + } else { + ast_log(LOG_WARNING, "Unknown subscription kind '%s'\n", kind); + return -1; + } + } + forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK); if (!forwards) { ast_debug(3, "App '%s' not subscribed to %s '%s'\n", app->name, kind, id); @@ -1095,16 +1163,16 @@ static int unsubscribe(struct stasis_app *app, const char *kind, const char *id, int app_unsubscribe_channel(struct stasis_app *app, struct ast_channel *chan) { - if (!app || !chan) { + if (!app) { return -1; } - return app_unsubscribe_channel_id(app, ast_channel_uniqueid(chan)); + return app_unsubscribe_channel_id(app, chan ? ast_channel_uniqueid(chan) : CHANNEL_ALL); } int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id) { - if (!app || !channel_id) { + if (!app) { return -1; } @@ -1114,6 +1182,10 @@ int app_unsubscribe_channel_id(struct stasis_app *app, const char *channel_id) int app_is_subscribed_channel_id(struct stasis_app *app, const char *channel_id) { RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + + if (ast_strlen_zero(channel_id)) { + channel_id = CHANNEL_ALL; + } forwards = ao2_find(app->forwards, channel_id, OBJ_SEARCH_KEY); return forwards != NULL; } @@ -1133,28 +1205,39 @@ struct stasis_app_event_source channel_event_source = { int app_subscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge) { - if (!app || !bridge) { + struct app_forwards *forwards; + SCOPED_AO2LOCK(lock, app->forwards); + + if (!app) { return -1; - } else { - RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); - SCOPED_AO2LOCK(lock, app->forwards); + } - forwards = ao2_find(app->forwards, bridge->uniqueid, - OBJ_SEARCH_KEY | OBJ_NOLOCK); + /* If subscribed to all, don't subscribe again */ + forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (forwards) { + ao2_ref(forwards, -1); + return 0; + } + forwards = ao2_find(app->forwards, bridge ? bridge->uniqueid : BRIDGE_ALL, + OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (!forwards) { + /* Forwards not found, create one */ + forwards = forwards_create_bridge(app, bridge); if (!forwards) { - /* Forwards not found, create one */ - forwards = forwards_create_bridge(app, bridge); - if (!forwards) { - return -1; - } - ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK); + return -1; } - - ++forwards->interested; - ast_debug(3, "Bridge '%s' is %d interested in %s\n", bridge->uniqueid, forwards->interested, app->name); - return 0; + ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK); } + + ++forwards->interested; + ast_debug(3, "Bridge '%s' is %d interested in %s\n", + bridge ? bridge->uniqueid : "ALL", + forwards->interested, + app->name); + + ao2_ref(forwards, -1); + return 0; } static int subscribe_bridge(struct stasis_app *app, void *obj) @@ -1164,16 +1247,16 @@ static int subscribe_bridge(struct stasis_app *app, void *obj) int app_unsubscribe_bridge(struct stasis_app *app, struct ast_bridge *bridge) { - if (!app || !bridge) { + if (!app) { return -1; } - return app_unsubscribe_bridge_id(app, bridge->uniqueid); + return app_unsubscribe_bridge_id(app, bridge ? bridge->uniqueid : BRIDGE_ALL); } int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id) { - if (!app || !bridge_id) { + if (!app) { return -1; } @@ -1182,9 +1265,26 @@ int app_unsubscribe_bridge_id(struct stasis_app *app, const char *bridge_id) int app_is_subscribed_bridge_id(struct stasis_app *app, const char *bridge_id) { - RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); - forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY); - return forwards != NULL; + struct app_forwards *forwards; + SCOPED_AO2LOCK(lock, app->forwards); + + forwards = ao2_find(app->forwards, BRIDGE_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (forwards) { + ao2_ref(forwards, -1); + return 1; + } + + if (ast_strlen_zero(bridge_id)) { + bridge_id = BRIDGE_ALL; + } + + forwards = ao2_find(app->forwards, bridge_id, OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (forwards) { + ao2_ref(forwards, -1); + return 1; + } + + return 0; } static void *bridge_find(const struct stasis_app *app, const char *id) @@ -1202,31 +1302,43 @@ struct stasis_app_event_source bridge_event_source = { int app_subscribe_endpoint(struct stasis_app *app, struct ast_endpoint *endpoint) { - if (!app || !endpoint) { + struct app_forwards *forwards; + SCOPED_AO2LOCK(lock, app->forwards); + + if (!app) { return -1; - } else { - RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); - SCOPED_AO2LOCK(lock, app->forwards); + } - forwards = ao2_find(app->forwards, ast_endpoint_get_id(endpoint), - OBJ_SEARCH_KEY | OBJ_NOLOCK); + /* If subscribed to all, don't subscribe again */ + forwards = ao2_find(app->forwards, ENDPOINT_ALL, OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (forwards) { + ao2_ref(forwards, -1); + return 0; + } + forwards = ao2_find(app->forwards, + endpoint ? ast_endpoint_get_id(endpoint) : ENDPOINT_ALL, + OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (!forwards) { + /* Forwards not found, create one */ + forwards = forwards_create_endpoint(app, endpoint); if (!forwards) { - /* Forwards not found, create one */ - forwards = forwards_create_endpoint(app, endpoint); - if (!forwards) { - return -1; - } - ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK); - - /* Subscribe for messages */ - messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app); + return -1; } + ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK); - ++forwards->interested; - ast_debug(3, "Endpoint '%s' is %d interested in %s\n", ast_endpoint_get_id(endpoint), forwards->interested, app->name); - return 0; + /* Subscribe for messages */ + messaging_app_subscribe_endpoint(app->name, endpoint, &message_received_handler, app); } + + ++forwards->interested; + ast_debug(3, "Endpoint '%s' is %d interested in %s\n", + endpoint ? ast_endpoint_get_id(endpoint) : "ALL", + forwards->interested, + app->name); + + ao2_ref(forwards, -1); + return 0; } static int subscribe_endpoint(struct stasis_app *app, void *obj) @@ -1236,7 +1348,7 @@ static int subscribe_endpoint(struct stasis_app *app, void *obj) int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id) { - if (!app || !endpoint_id) { + if (!app) { return -1; } @@ -1246,6 +1358,10 @@ int app_unsubscribe_endpoint_id(struct stasis_app *app, const char *endpoint_id) int app_is_subscribed_endpoint_id(struct stasis_app *app, const char *endpoint_id) { RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + + if (ast_strlen_zero(endpoint_id)) { + endpoint_id = ENDPOINT_ALL; + } forwards = ao2_find(app->forwards, endpoint_id, OBJ_SEARCH_KEY); return forwards != NULL; } diff --git a/res/stasis/app.h b/res/stasis/app.h index 59574f5849ad648c44c6b3d1c0f2718123b22df6..2c8db1ccd3b3a2ce2e1e9319cb8603dee67de9ca 100644 --- a/res/stasis/app.h +++ b/res/stasis/app.h @@ -36,6 +36,19 @@ */ struct stasis_app; +enum stasis_app_subscription_model { + /* + * \brief An application must manually subscribe to each + * resource that it cares about. This is the default approach. + */ + STASIS_APP_SUBSCRIBE_MANUAL, + /* + * \brief An application is automatically subscribed to all + * resources in Asterisk, even if it does not control them. + */ + STASIS_APP_SUBSCRIBE_ALL +}; + /*! * \brief Create a res_stasis application. * @@ -45,7 +58,7 @@ struct stasis_app; * \return New \c res_stasis application. * \return \c NULL on error. */ -struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data); +struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *data, enum stasis_app_subscription_model subscription_model); /*! * \brief Tears down an application. diff --git a/res/stasis/messaging.c b/res/stasis/messaging.c index bf8aebb969236a07b4b641aad1c85121a94fd72e..16e167e8f439ecb9a9c1fd1a99fa868a75c96427 100644 --- a/res/stasis/messaging.c +++ b/res/stasis/messaging.c @@ -37,6 +37,11 @@ ASTERISK_REGISTER_FILE() #include "asterisk/test.h" #include "messaging.h" +/*! + * \brief Subscription to all technologies + */ +#define TECH_WILDCARD "__AST_ALL_TECH" + /*! * \brief Number of buckets for the \ref endpoint_subscriptions container */ @@ -219,10 +224,14 @@ static int has_destination_cb(const struct ast_msg *msg) for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) { sub = AST_VECTOR_GET(&tech_subscriptions, i); - if (sub && (!strncasecmp(sub->token, buf, strlen(sub->token)) - || !strncasecmp(sub->token, buf, strlen(sub->token)))) { + if (!sub) { + continue; + } + + if (!strcmp(sub->token, TECH_WILDCARD) + || !strncasecmp(sub->token, buf, strlen(sub->token)) + || !strncasecmp(sub->token, buf, strlen(sub->token))) { ast_rwlock_unlock(&tech_subscriptions_lock); - sub = NULL; /* No ref bump! */ goto match; } @@ -231,6 +240,7 @@ static int has_destination_cb(const struct ast_msg *msg) sub = ao2_find(endpoint_subscriptions, buf, OBJ_SEARCH_KEY); if (sub) { + ao2_ref(sub, -1); goto match; } @@ -238,7 +248,6 @@ static int has_destination_cb(const struct ast_msg *msg) return 0; match: - ao2_cleanup(sub); return 1; } @@ -301,7 +310,8 @@ static int handle_msg_cb(struct ast_msg *msg) continue; } - if (!strncasecmp(sub->token, buf, strlen(sub->token))) { + if (!strcmp(sub->token, TECH_WILDCARD) + || !strncasecmp(sub->token, buf, strlen(sub->token))) { ast_rwlock_unlock(&tech_subscriptions_lock); ao2_bump(sub); endpoint_name = buf; @@ -374,7 +384,7 @@ static struct message_subscription *get_subscription(struct ast_endpoint *endpoi { struct message_subscription *sub = NULL; - if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) { + if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) { sub = ao2_find(endpoint_subscriptions, endpoint, OBJ_SEARCH_KEY); } else { int i; @@ -383,7 +393,7 @@ static struct message_subscription *get_subscription(struct ast_endpoint *endpoi for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) { sub = AST_VECTOR_GET(&tech_subscriptions, i); - if (sub && !strcmp(sub->token, ast_endpoint_get_tech(endpoint))) { + if (sub && !strcmp(sub->token, endpoint ? ast_endpoint_get_tech(endpoint) : TECH_WILDCARD)) { ao2_bump(sub); break; } @@ -400,10 +410,6 @@ void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoi RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup); endpoint = ast_endpoint_find_by_id(endpoint_id); - if (!endpoint) { - return; - } - sub = get_subscription(endpoint); if (!sub) { return; @@ -417,11 +423,11 @@ void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoi AST_VECTOR_REMOVE_CMP_UNORDERED(&sub->applications, app_name, application_tuple_cmp, ao2_cleanup); if (AST_VECTOR_SIZE(&sub->applications) == 0) { - if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) { + if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) { ao2_unlink(endpoint_subscriptions, sub); } else { ast_rwlock_wrlock(&tech_subscriptions_lock); - AST_VECTOR_REMOVE_CMP_UNORDERED(&tech_subscriptions, ast_endpoint_get_id(endpoint), + AST_VECTOR_REMOVE_CMP_UNORDERED(&tech_subscriptions, endpoint ? ast_endpoint_get_id(endpoint) : TECH_WILDCARD, messaging_subscription_cmp, AST_VECTOR_ELEM_CLEANUP_NOOP); ast_rwlock_unlock(&tech_subscriptions_lock); } @@ -429,9 +435,9 @@ void messaging_app_unsubscribe_endpoint(const char *app_name, const char *endpoi ao2_unlock(sub); ao2_ref(sub, -1); - ast_debug(3, "App '%s' unsubscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint)); + ast_debug(3, "App '%s' unsubscribed to messages from endpoint '%s'\n", app_name, endpoint ? ast_endpoint_get_id(endpoint) : "-- ALL --"); ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Unsubscribed\r\nAppName: %s\r\nToken: %s\r\n", - app_name, ast_endpoint_get_id(endpoint)); + app_name, endpoint ? ast_endpoint_get_id(endpoint) : "ALL"); } static struct message_subscription *get_or_create_subscription(struct ast_endpoint *endpoint) @@ -442,12 +448,12 @@ static struct message_subscription *get_or_create_subscription(struct ast_endpoi return sub; } - sub = message_subscription_alloc(ast_endpoint_get_id(endpoint)); + sub = message_subscription_alloc(endpoint ? ast_endpoint_get_id(endpoint) : TECH_WILDCARD); if (!sub) { return NULL; } - if (!ast_strlen_zero(ast_endpoint_get_resource(endpoint))) { + if (endpoint && !ast_strlen_zero(ast_endpoint_get_resource(endpoint))) { ao2_link(endpoint_subscriptions, sub); } else { ast_rwlock_wrlock(&tech_subscriptions_lock); @@ -482,9 +488,9 @@ int messaging_app_subscribe_endpoint(const char *app_name, struct ast_endpoint * AST_VECTOR_APPEND(&sub->applications, tuple); ao2_unlock(sub); - ast_debug(3, "App '%s' subscribed to messages from endpoint '%s'\n", app_name, ast_endpoint_get_id(endpoint)); + ast_debug(3, "App '%s' subscribed to messages from endpoint '%s'\n", app_name, endpoint ? ast_endpoint_get_id(endpoint) : "-- ALL --"); ast_test_suite_event_notify("StasisMessagingSubscription", "SubState: Subscribed\r\nAppName: %s\r\nToken: %s\r\n", - app_name, ast_endpoint_get_id(endpoint)); + app_name, endpoint ? ast_endpoint_get_id(endpoint) : "ALL"); return 0; } diff --git a/rest-api/api-docs/events.json b/rest-api/api-docs/events.json index 54269a407bfc92dc212f45308186cd576e0f545e..dee7c2db934c6fe0fbd6afea41b9da517bc1377d 100644 --- a/rest-api/api-docs/events.json +++ b/rest-api/api-docs/events.json @@ -26,6 +26,14 @@ "required": true, "allowMultiple": true, "dataType": "string" + }, + { + "name": "subscribeAll", + "description": "Subscribe to all Asterisk events. If provided, the applications listed will be subscribed to all events, effectively disabling the application specific subscriptions. Default is 'false'.", + "paramType": "query", + "required": false, + "allowMultiple": false, + "dataType": "boolean" } ] }