diff --git a/main/stasis_bridges.c b/main/stasis_bridges.c index 2a79056d11bda7fd512f5df7a8d6b205d71405aa..be1294ad0a9b2f4cc2db08830426d1a49c9574c4 100644 --- a/main/stasis_bridges.c +++ b/main/stasis_bridges.c @@ -132,6 +132,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") static struct ast_manager_event_blob *attended_transfer_to_ami(struct stasis_message *message); static struct ast_manager_event_blob *blind_transfer_to_ami(struct stasis_message *message); +static struct ast_json *ast_channel_entered_bridge_to_json(struct stasis_message *msg); +static struct ast_json *ast_channel_left_bridge_to_json(struct stasis_message *msg); +static struct ast_json *ast_bridge_merge_message_to_json(struct stasis_message *msg); static struct stasis_cp_all *bridge_cache_all; @@ -139,9 +142,12 @@ static struct stasis_cp_all *bridge_cache_all; * @{ \brief Define bridge message types. */ STASIS_MESSAGE_TYPE_DEFN(ast_bridge_snapshot_type); -STASIS_MESSAGE_TYPE_DEFN(ast_bridge_merge_message_type); -STASIS_MESSAGE_TYPE_DEFN(ast_channel_entered_bridge_type); -STASIS_MESSAGE_TYPE_DEFN(ast_channel_left_bridge_type); +STASIS_MESSAGE_TYPE_DEFN(ast_bridge_merge_message_type, + .to_json = ast_bridge_merge_message_to_json); +STASIS_MESSAGE_TYPE_DEFN(ast_channel_entered_bridge_type, + .to_json = ast_channel_entered_bridge_to_json); +STASIS_MESSAGE_TYPE_DEFN(ast_channel_left_bridge_type, + .to_json = ast_channel_left_bridge_to_json); STASIS_MESSAGE_TYPE_DEFN(ast_blind_transfer_type, .to_ami = blind_transfer_to_ami); STASIS_MESSAGE_TYPE_DEFN(ast_attended_transfer_type, .to_ami = attended_transfer_to_ami); /*! @} */ @@ -307,6 +313,19 @@ static struct ast_bridge_merge_message *bridge_merge_message_create(struct ast_b return msg; } +static struct ast_json *ast_bridge_merge_message_to_json(struct stasis_message *msg) +{ + struct ast_bridge_merge_message *merge; + + merge = stasis_message_data(msg); + + return ast_json_pack("{s: s, s: o, s: o, s: o}", + "type", "BridgeMerged", + "timestamp", ast_json_timeval(*stasis_message_timestamp(msg), NULL), + "bridge", ast_bridge_snapshot_to_json(merge->to), + "bridge_from", ast_bridge_snapshot_to_json(merge->from)); +} + void ast_bridge_publish_merge(struct ast_bridge *to, struct ast_bridge *from) { RAII_VAR(struct ast_bridge_merge_message *, merge_msg, NULL, ao2_cleanup); @@ -417,6 +436,35 @@ void ast_bridge_publish_leave(struct ast_bridge *bridge, struct ast_channel *cha stasis_publish(ast_bridge_topic(bridge), msg); } +static struct ast_json *simple_bridge_channel_event( + const char *type, + struct ast_bridge_snapshot *bridge_snapshot, + struct ast_channel_snapshot *channel_snapshot, + const struct timeval *tv) +{ + return ast_json_pack("{s: s, s: o, s: o, s: o}", + "type", type, + "timestamp", ast_json_timeval(*tv, NULL), + "bridge", ast_bridge_snapshot_to_json(bridge_snapshot), + "channel", ast_channel_snapshot_to_json(channel_snapshot)); +} + +struct ast_json *ast_channel_entered_bridge_to_json(struct stasis_message *msg) +{ + struct ast_bridge_blob *obj = stasis_message_data(msg); + + return simple_bridge_channel_event("ChannelEnteredBridge", obj->bridge, + obj->channel, stasis_message_timestamp(msg)); +} + +struct ast_json *ast_channel_left_bridge_to_json(struct stasis_message *msg) +{ + struct ast_bridge_blob *obj = stasis_message_data(msg); + + return simple_bridge_channel_event("ChannelLeftBridge", obj->bridge, + obj->channel, stasis_message_timestamp(msg)); +} + typedef struct ast_json *(*json_item_serializer_cb)(void *obj); static struct ast_json *container_to_json_array(struct ao2_container *items, json_item_serializer_cb item_cb) diff --git a/res/res_ari_asterisk.c b/res/res_ari_asterisk.c index 3f34c7ab64112393544fc1e5acc12ba0c142f1fe..3f0c285ad475bec4ee64daf88513d29a2dbb072d 100644 --- a/res/res_ari_asterisk.c +++ b/res/res_ari_asterisk.c @@ -81,8 +81,16 @@ static void ast_ari_get_asterisk_info_cb( goto fin; } - args.only_count = ast_app_separate_args( - args.only_parse, ',', vals, ARRAY_LEN(vals)); + if (strlen(args.only_parse) == 0) { + /* ast_app_separate_args can't handle "" */ + args.only_count = 1; + vals[0] = args.only_parse; + } else { + args.only_count = ast_app_separate_args( + args.only_parse, ',', vals, + ARRAY_LEN(vals)); + } + if (args.only_count == 0) { ast_ari_response_alloc_failed(response); goto fin; diff --git a/res/res_ari_bridges.c b/res/res_ari_bridges.c index bc8e20041e4c3ea8a19df3b4a154b3a0b1b6127f..d3b3a649d20469717b1c973d74758bf8a0cb4280 100644 --- a/res/res_ari_bridges.c +++ b/res/res_ari_bridges.c @@ -300,8 +300,16 @@ static void ast_ari_add_channel_to_bridge_cb( goto fin; } - args.channel_count = ast_app_separate_args( - args.channel_parse, ',', vals, ARRAY_LEN(vals)); + if (strlen(args.channel_parse) == 0) { + /* ast_app_separate_args can't handle "" */ + args.channel_count = 1; + vals[0] = args.channel_parse; + } else { + args.channel_count = ast_app_separate_args( + args.channel_parse, ',', vals, + ARRAY_LEN(vals)); + } + if (args.channel_count == 0) { ast_ari_response_alloc_failed(response); goto fin; @@ -403,8 +411,16 @@ static void ast_ari_remove_channel_from_bridge_cb( goto fin; } - args.channel_count = ast_app_separate_args( - args.channel_parse, ',', vals, ARRAY_LEN(vals)); + if (strlen(args.channel_parse) == 0) { + /* ast_app_separate_args can't handle "" */ + args.channel_count = 1; + vals[0] = args.channel_parse; + } else { + args.channel_count = ast_app_separate_args( + args.channel_parse, ',', vals, + ARRAY_LEN(vals)); + } + if (args.channel_count == 0) { ast_ari_response_alloc_failed(response); goto fin; diff --git a/res/res_ari_events.c b/res/res_ari_events.c index 5cea06f0ed019bf2de1a29943ddffce5cca452e0..567167f14c4f44abe036d3c0fd7de3447972b24f 100644 --- a/res/res_ari_events.c +++ b/res/res_ari_events.c @@ -89,8 +89,16 @@ static void ast_ari_event_websocket_ws_cb(struct ast_websocket *ws_session, goto fin; } - args.app_count = ast_app_separate_args( - args.app_parse, ',', vals, ARRAY_LEN(vals)); + if (strlen(args.app_parse) == 0) { + /* ast_app_separate_args can't handle "" */ + args.app_count = 1; + vals[0] = args.app_parse; + } else { + args.app_count = ast_app_separate_args( + args.app_parse, ',', vals, + ARRAY_LEN(vals)); + } + if (args.app_count == 0) { ast_ari_response_alloc_failed(response); goto fin; @@ -126,14 +134,16 @@ fin: __attribute__((unused)) * negotiation. Param parsing should happen earlier, but we * need a way to pass it through the WebSocket code to the * callback */ - RAII_VAR(char *, msg, NULL, ast_free); + RAII_VAR(char *, msg, NULL, ast_json_free); if (response->message) { msg = ast_json_dump_string(response->message); } else { - msg = ast_strdup("?"); + ast_log(LOG_ERROR, "Missing response message\n"); + } + if (msg) { + ast_websocket_write(ws_session, + AST_WEBSOCKET_OPCODE_TEXT, msg, strlen(msg)); } - ast_websocket_write(ws_session, AST_WEBSOCKET_OPCODE_TEXT, msg, - strlen(msg)); } ast_free(args.app_parse); ast_free(args.app); diff --git a/res/res_stasis.c b/res/res_stasis.c index 35c1847bdefc0f51eb8c75177beb9693a78508b0..ab2bf5c8690714a1e21fe9299017d8812f12be89 100644 --- a/res/res_stasis.c +++ b/res/res_stasis.c @@ -86,6 +86,12 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") */ #define CONTROLS_NUM_BUCKETS 127 +/*! + * \brief Number of buckets for the Stasis bridges hash table. Remember to + * keep it a prime number! + */ +#define BRIDGES_NUM_BUCKETS 127 + /*! * \brief Stasis application container. */ @@ -97,12 +103,6 @@ struct ao2_container *app_bridges; struct ao2_container *app_bridges_moh; -/*! \brief Message router for the channel caching topic */ -struct stasis_message_router *channel_router; - -/*! \brief Message router for the bridge caching topic */ -struct stasis_message_router *bridge_router; - /*! AO2 hash function for \ref app */ static int app_hash(const void *obj, const int flags) { @@ -153,6 +153,30 @@ static int control_compare(void *lhs, void *rhs, int flags) } } +static int cleanup_cb(void *obj, void *arg, int flags) +{ + struct app *app = obj; + + if (!app_is_finished(app)) { + return 0; + } + + ast_verb(1, "Shutting down application '%s'\n", app_name(app)); + app_shutdown(app); + + return CMP_MATCH; + +} + +/*! + * \brief Clean up any old apps that we don't need any more. + */ +static void cleanup(void) +{ + ao2_callback(apps_registry, OBJ_MULTIPLE | OBJ_NODATA | OBJ_UNLINK, + cleanup_cb, NULL); +} + struct stasis_app_control *stasis_app_control_create(struct ast_channel *chan) { return control_create(chan); @@ -435,229 +459,6 @@ struct ast_bridge *stasis_app_bridge_find_by_id( return ao2_find(app_bridges, bridge_id, OBJ_KEY); } -/*! \brief Typedef for blob handler callbacks */ -typedef struct ast_json *(*channel_blob_handler_cb)(struct ast_channel_blob *); - -/*! \brief Callback to check whether an app is watching a given channel */ -static int app_watching_channel_cb(void *obj, void *arg, int flags) -{ - struct app *app = obj; - char *uniqueid = arg; - - return app_is_watching_channel(app, uniqueid) ? CMP_MATCH : 0; -} - -/*! \brief Get a container full of apps that are interested in the specified channel */ -static struct ao2_container *get_apps_watching_channel(const char *uniqueid) -{ - struct ao2_container *watching_apps; - char *uniqueid_dup; - RAII_VAR(struct ao2_iterator *,watching_apps_iter, NULL, ao2_iterator_destroy); - ast_assert(uniqueid != NULL); - - uniqueid_dup = ast_strdupa(uniqueid); - - watching_apps_iter = ao2_callback(apps_registry, OBJ_MULTIPLE, app_watching_channel_cb, uniqueid_dup); - watching_apps = watching_apps_iter->c; - - if (!ao2_container_count(watching_apps)) { - return NULL; - } - - ao2_ref(watching_apps, +1); - return watching_apps_iter->c; -} - -/*! \brief Typedef for callbacks that get called on channel snapshot updates */ -typedef struct ast_json *(*channel_snapshot_monitor)( - struct ast_channel_snapshot *old_snapshot, - struct ast_channel_snapshot *new_snapshot, - const struct timeval *tv); - -static struct ast_json *simple_channel_event( - const char *type, - struct ast_channel_snapshot *snapshot, - const struct timeval *tv) -{ - return ast_json_pack("{s: s, s: o, s: o}", - "type", type, - "timestamp", ast_json_timeval(*tv, NULL), - "channel", ast_channel_snapshot_to_json(snapshot)); -} - -static struct ast_json *channel_created_event( - struct ast_channel_snapshot *snapshot, - const struct timeval *tv) -{ - return simple_channel_event("ChannelCreated", snapshot, tv); -} - -static struct ast_json *channel_destroyed_event( - struct ast_channel_snapshot *snapshot, - const struct timeval *tv) -{ - return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}", - "type", "ChannelDestroyed", - "timestamp", ast_json_timeval(*tv, NULL), - "cause", snapshot->hangupcause, - "cause_txt", ast_cause2str(snapshot->hangupcause), - "channel", ast_channel_snapshot_to_json(snapshot)); -} - -static struct ast_json *channel_state_change_event( - struct ast_channel_snapshot *snapshot, - const struct timeval *tv) -{ - return simple_channel_event("ChannelStateChange", snapshot, tv); -} - -/*! \brief Handle channel state changes */ -static struct ast_json *channel_state( - struct ast_channel_snapshot *old_snapshot, - struct ast_channel_snapshot *new_snapshot, - const struct timeval *tv) -{ - struct ast_channel_snapshot *snapshot = new_snapshot ? new_snapshot : old_snapshot; - - if (!old_snapshot) { - return channel_created_event(snapshot, tv); - } else if (!new_snapshot) { - return channel_destroyed_event(snapshot, tv); - } else if (old_snapshot->state != new_snapshot->state) { - return channel_state_change_event(snapshot, tv); - } - - return NULL; -} - -static struct ast_json *channel_dialplan( - struct ast_channel_snapshot *old_snapshot, - struct ast_channel_snapshot *new_snapshot, - const struct timeval *tv) -{ - RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); - - /* No Newexten event on cache clear */ - if (!new_snapshot) { - return NULL; - } - - /* Empty application is not valid for a Newexten event */ - if (ast_strlen_zero(new_snapshot->appl)) { - return NULL; - } - - if (old_snapshot && ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) { - return NULL; - } - - return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}", - "type", "ChannelDialplan", - "timestamp", ast_json_timeval(*tv, NULL), - "dialplan_app", new_snapshot->appl, - "dialplan_app_data", new_snapshot->data, - "channel", ast_channel_snapshot_to_json(new_snapshot)); -} - -static struct ast_json *channel_callerid( - struct ast_channel_snapshot *old_snapshot, - struct ast_channel_snapshot *new_snapshot, - const struct timeval *tv) -{ - RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); - - /* No NewCallerid event on cache clear or first event */ - if (!old_snapshot || !new_snapshot) { - return NULL; - } - - if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) { - return NULL; - } - - return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}", - "type", "ChannelCallerId", - "timestamp", ast_json_timeval(*tv, NULL), - "caller_presentation", new_snapshot->caller_pres, - "caller_presentation_txt", ast_describe_caller_presentation( - new_snapshot->caller_pres), - "channel", ast_channel_snapshot_to_json(new_snapshot)); -} - -channel_snapshot_monitor channel_monitors[] = { - channel_state, - channel_dialplan, - channel_callerid -}; - -static int app_send_cb(void *obj, void *arg, int flags) -{ - struct app *app = obj; - struct ast_json *msg = arg; - - app_send(app, msg); - return 0; -} - -static void sub_channel_snapshot_handler(void *data, - struct stasis_subscription *sub, - struct stasis_topic *topic, - struct stasis_message *message) -{ - RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup); - struct stasis_cache_update *update = stasis_message_data(message); - struct ast_channel_snapshot *new_snapshot = stasis_message_data(update->new_snapshot); - struct ast_channel_snapshot *old_snapshot = stasis_message_data(update->old_snapshot); - /* Pull timestamp from the new snapshot, or from the update message - * when there isn't one. */ - const struct timeval *tv = update->new_snapshot ? stasis_message_timestamp(update->new_snapshot) : stasis_message_timestamp(message); - int i; - - watching_apps = get_apps_watching_channel(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid); - if (!watching_apps) { - return; - } - - for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) { - RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); - - msg = channel_monitors[i](old_snapshot, new_snapshot, tv); - if (msg) { - ao2_callback(watching_apps, OBJ_NODATA, app_send_cb, msg); - } - } -} - -static void distribute_message(struct ao2_container *apps, struct ast_json *msg) -{ - ao2_callback(apps, OBJ_NODATA, app_send_cb, msg); -} - -static void sub_channel_blob_handler(void *data, - struct stasis_subscription *sub, - struct stasis_topic *topic, - struct stasis_message *message) -{ - RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); - RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup); - struct ast_channel_blob *obj = stasis_message_data(message); - - if (!obj->snapshot) { - return; - } - - msg = stasis_message_to_json(message); - if (!msg) { - return; - } - - watching_apps = get_apps_watching_channel(obj->snapshot->uniqueid); - if (!watching_apps) { - return; - } - - distribute_message(watching_apps, msg); -} /*! * \brief In addition to running ao2_cleanup(), this function also removes the @@ -709,7 +510,7 @@ void stasis_app_bridge_destroy(const char *bridge_id) ast_bridge_destroy(bridge, 0); } -int app_send_start_msg(struct app *app, struct ast_channel *chan, +static int send_start_msg(struct app *app, struct ast_channel *chan, int argc, char *argv[]) { RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); @@ -726,8 +527,9 @@ int app_send_start_msg(struct app *app, struct ast_channel *chan, return -1; } - msg = ast_json_pack("{s: s, s: [], s: o}", + msg = ast_json_pack("{s: s, s: o, s: [], s: o}", "type", "StasisStart", + "timestamp", ast_json_timeval(ast_tvnow(), NULL), "args", "channel", ast_channel_snapshot_to_json(snapshot)); if (!msg) { @@ -750,7 +552,7 @@ int app_send_start_msg(struct app *app, struct ast_channel *chan, return 0; } -int app_send_end_msg(struct app *app, struct ast_channel *chan) +static int send_end_msg(struct app *app, struct ast_channel *chan) { RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup); @@ -763,8 +565,9 @@ int app_send_end_msg(struct app *app, struct ast_channel *chan) return -1; } - msg = ast_json_pack("{s: s, s: o}", + msg = ast_json_pack("{s: s, s: o, s: o}", "type", "StasisEnd", + "timestamp", ast_json_timeval(ast_tvnow(), NULL), "channel", ast_channel_snapshot_to_json(snapshot)); if (!msg) { return -1; @@ -815,15 +618,17 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc, } ao2_link(app_controls, control); - res = app_send_start_msg(app, chan, argc, argv); + res = send_start_msg(app, chan, argc, argv); if (res != 0) { ast_log(LOG_ERROR, "Error sending start message to '%s'\n", app_name); - return res; + return -1; } - if (app_add_channel(app, chan)) { - ast_log(LOG_ERROR, "Error adding listener for channel %s to app %s\n", ast_channel_name(chan), app_name); + res = app_subscribe_channel(app, chan); + if (res != 0) { + ast_log(LOG_ERROR, "Error subscribing app '%s' to channel '%s'\n", + app_name, ast_channel_name(chan)); return -1; } @@ -831,13 +636,23 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc, RAII_VAR(struct ast_frame *, f, NULL, ast_frame_dtor); int r; int command_count; + struct ast_bridge *last_bridge = NULL; + struct ast_bridge *bridge = NULL; /* Check to see if a bridge absorbed our hangup frame */ if (ast_check_hangup_locked(chan)) { break; } - if (stasis_app_get_bridge(control)) { + last_bridge = bridge; + bridge = stasis_app_get_bridge(control); + + if (bridge != last_bridge) { + app_unsubscribe_bridge(app, last_bridge); + app_subscribe_bridge(app, bridge); + } + + if (bridge) { /* Bridge is handling channel frames */ control_wait(control); control_dispatch_all(control, chan); @@ -882,14 +697,21 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc, } } - app_remove_channel(app, chan); - res = app_send_end_msg(app, chan); + app_unsubscribe_bridge(app, stasis_app_get_bridge(control)); + app_unsubscribe_channel(app, chan); + + res = send_end_msg(app, chan); if (res != 0) { ast_log(LOG_ERROR, "Error sending end message to %s\n", app_name); return res; } + /* There's an off chance that app is ready for cleanup. Go ahead + * and clean up, just in case + */ + cleanup(); + return res; } @@ -912,29 +734,6 @@ int stasis_app_send(const char *app_name, struct ast_json *message) return 0; } -static int cleanup_cb(void *obj, void *arg, int flags) -{ - struct app *app = obj; - - if (!app_is_finished(app)) { - return 0; - } - - ast_verb(1, "Cleaning up application '%s'\n", app_name(app)); - - return CMP_MATCH; - -} - -/*! - * \brief Clean up any old apps that we don't need any more. - */ -static void cleanup(void) -{ - ao2_callback(apps_registry, OBJ_MULTIPLE | OBJ_NODATA | OBJ_UNLINK, - cleanup_cb, NULL); -} - int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data) { RAII_VAR(struct app *, app, NULL, ao2_cleanup); @@ -994,249 +793,22 @@ void stasis_app_unref(void) ast_module_unref(ast_module_info->self); } -/*! \brief Callback to check whether an app is watching a given bridge */ -static int app_watching_bridge_cb(void *obj, void *arg, int flags) -{ - struct app *app = obj; - char *uniqueid = arg; - - return app_is_watching_bridge(app, uniqueid) ? CMP_MATCH : 0; -} - -/*! \brief Get a container full of apps that are interested in the specified bridge */ -static struct ao2_container *get_apps_watching_bridge(const char *uniqueid) -{ - struct ao2_container *watching_apps; - char *uniqueid_dup; - RAII_VAR(struct ao2_iterator *,watching_apps_iter, NULL, ao2_iterator_destroy); - ast_assert(uniqueid != NULL); - - uniqueid_dup = ast_strdupa(uniqueid); - - watching_apps_iter = ao2_callback(apps_registry, OBJ_MULTIPLE, app_watching_bridge_cb, uniqueid_dup); - watching_apps = watching_apps_iter->c; - - if (!ao2_container_count(watching_apps)) { - return NULL; - } - - ao2_ref(watching_apps, +1); - return watching_apps_iter->c; -} - -/*! Callback used to remove an app's interest in a bridge */ -static int remove_bridge_cb(void *obj, void *arg, int flags) -{ - app_remove_bridge(obj, arg); - return 0; -} - -static struct ast_json *simple_bridge_event( - const char *type, - struct ast_bridge_snapshot *snapshot, - const struct timeval *tv) -{ - return ast_json_pack("{s: s, s: o, s: o}", - "type", type, - "timestamp", ast_json_timeval(*tv, NULL), - "bridge", ast_bridge_snapshot_to_json(snapshot)); -} - -static struct ast_json *simple_bridge_channel_event( - const char *type, - struct ast_bridge_snapshot *bridge_snapshot, - struct ast_channel_snapshot *channel_snapshot, - const struct timeval *tv) -{ - return ast_json_pack("{s: s, s: o, s: o, s: o}", - "type", type, - "timestamp", ast_json_timeval(*tv, NULL), - "bridge", ast_bridge_snapshot_to_json(bridge_snapshot), - "channel", ast_channel_snapshot_to_json(channel_snapshot)); -} - -static void sub_bridge_snapshot_handler(void *data, - struct stasis_subscription *sub, - struct stasis_topic *topic, - struct stasis_message *message) -{ - RAII_VAR(struct ao2_container *, watching_apps, NULL, ao2_cleanup); - struct stasis_cache_update *update = stasis_message_data(message); - struct ast_bridge_snapshot *new_snapshot = stasis_message_data(update->new_snapshot); - struct ast_bridge_snapshot *old_snapshot = stasis_message_data(update->old_snapshot); - const struct timeval *tv = update->new_snapshot ? stasis_message_timestamp(update->new_snapshot) : stasis_message_timestamp(message); - - RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); - - watching_apps = get_apps_watching_bridge(new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid); - if (!watching_apps || !ao2_container_count(watching_apps)) { - return; - } - - if (!new_snapshot) { - RAII_VAR(char *, bridge_id, ast_strdup(old_snapshot->uniqueid), ast_free); - - /* The bridge has gone away. Create the message, make sure no apps are - * watching this bridge anymore, and destroy the bridge's control - * structure */ - msg = simple_bridge_event("BridgeDestroyed", old_snapshot, tv); - ao2_callback(watching_apps, OBJ_NODATA, remove_bridge_cb, bridge_id); - stasis_app_bridge_destroy(old_snapshot->uniqueid); - } else if (!old_snapshot) { - msg = simple_bridge_event("BridgeCreated", old_snapshot, tv); - } - - if (!msg) { - return; - } - - distribute_message(watching_apps, msg); -} - -/*! \brief Callback used to merge two containers of applications */ -static int list_merge_cb(void *obj, void *arg, int flags) -{ - /* remove any current entries for this app */ - ao2_find(arg, obj, OBJ_POINTER | OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE); - /* relink as the only entry */ - ao2_link(arg, obj); - return 0; -} - -/*! \brief Merge container src into container dst without modifying src */ -static void update_apps_list(struct ao2_container *dst, struct ao2_container *src) -{ - ao2_callback(src, OBJ_NODATA, list_merge_cb, dst); -} - -/*! \brief Callback for adding to an app's bridges of interest */ -static int app_add_bridge_cb(void *obj, void *arg, int flags) -{ - app_add_bridge(obj, arg); - return 0; -} - -/*! \brief Add interest in the given bridge to all apps in the container */ -static void update_bridge_interest(struct ao2_container *apps, const char *bridge_id) -{ - RAII_VAR(char *, bridge_id_dup, ast_strdup(bridge_id), ast_free); - ao2_callback(apps, OBJ_NODATA, app_add_bridge_cb, bridge_id_dup); -} - -static void sub_bridge_merge_handler(void *data, - struct stasis_subscription *sub, - struct stasis_topic *topic, - struct stasis_message *message) -{ - RAII_VAR(struct ao2_container *, watching_apps_to, NULL, ao2_cleanup); - RAII_VAR(struct ao2_container *, watching_apps_from, NULL, ao2_cleanup); - RAII_VAR(struct ao2_container *, watching_apps_all, ao2_container_alloc(1, NULL, NULL), ao2_cleanup); - struct ast_bridge_merge_message *merge = stasis_message_data(message); - RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); - RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref); - const struct timeval *tv = stasis_message_timestamp(message); - - watching_apps_to = get_apps_watching_bridge(merge->to->uniqueid); - if (watching_apps_to) { - update_apps_list(watching_apps_all, watching_apps_to); - } - - watching_apps_from = get_apps_watching_bridge(merge->from->uniqueid); - if (watching_apps_from) { - update_bridge_interest(watching_apps_from, merge->to->uniqueid); - update_apps_list(watching_apps_all, watching_apps_from); - } - - if (!ao2_container_count(watching_apps_all)) { - return; - } - - msg = ast_json_pack("{s: s, s: o, s: o, s: o}", - "type", "BridgeMerged", - "timestamp", ast_json_timeval(*tv, NULL), - "bridge", ast_bridge_snapshot_to_json(merge->to), - "bridge_from", ast_bridge_snapshot_to_json(merge->from)); - - if (!msg) { - return; - } - - distribute_message(watching_apps_all, msg); -} - -static void sub_bridge_enter_handler(void *data, - struct stasis_subscription *sub, - struct stasis_topic *topic, - struct stasis_message *message) -{ - RAII_VAR(struct ao2_container *, watching_apps_channel, NULL, ao2_cleanup); - RAII_VAR(struct ao2_container *, watching_apps_bridge, NULL, ao2_cleanup); - RAII_VAR(struct ao2_container *, watching_apps_all, ao2_container_alloc(1, NULL, NULL), ao2_cleanup); - struct ast_bridge_blob *obj = stasis_message_data(message); - RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); - - watching_apps_bridge = get_apps_watching_bridge(obj->bridge->uniqueid); - if (watching_apps_bridge) { - update_apps_list(watching_apps_all, watching_apps_bridge); - } - - watching_apps_channel = get_apps_watching_channel(obj->channel->uniqueid); - if (watching_apps_channel) { - update_bridge_interest(watching_apps_channel, obj->bridge->uniqueid); - update_apps_list(watching_apps_all, watching_apps_channel); - } - - if (!ao2_container_count(watching_apps_all)) { - return; - } - - msg = simple_bridge_channel_event("ChannelEnteredBridge", obj->bridge, - obj->channel, stasis_message_timestamp(message)); - - distribute_message(watching_apps_all, msg); -} - -static void sub_bridge_leave_handler(void *data, - struct stasis_subscription *sub, - struct stasis_topic *topic, - struct stasis_message *message) -{ - RAII_VAR(struct ao2_container *, watching_apps_bridge, NULL, ao2_cleanup); - struct ast_bridge_blob *obj = stasis_message_data(message); - RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); - - watching_apps_bridge = get_apps_watching_bridge(obj->bridge->uniqueid); - if (!watching_apps_bridge) { - return; - } - - msg = simple_bridge_channel_event("ChannelLeftBridge", obj->bridge, - obj->channel, stasis_message_timestamp(message)); - - distribute_message(watching_apps_bridge, msg); -} - static int load_module(void) { - int r = 0; - - apps_registry = - ao2_container_alloc(APPS_NUM_BUCKETS, app_hash, app_compare); + apps_registry = ao2_container_alloc(APPS_NUM_BUCKETS, app_hash, + app_compare); if (apps_registry == NULL) { return AST_MODULE_LOAD_FAILURE; } - app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS, - control_hash, control_compare); + app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS, control_hash, + control_compare); if (app_controls == NULL) { return AST_MODULE_LOAD_FAILURE; } - app_bridges = ao2_container_alloc(CONTROLS_NUM_BUCKETS, - bridges_hash, bridges_compare); - if (app_bridges == NULL) { - return AST_MODULE_LOAD_FAILURE; - } + app_bridges = ao2_container_alloc(BRIDGES_NUM_BUCKETS, bridges_hash, + bridges_compare); app_bridges_moh = ao2_container_alloc_hash( AO2_ALLOC_OPT_LOCK_MUTEX, AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT, @@ -1246,52 +818,11 @@ static int load_module(void) return AST_MODULE_LOAD_FAILURE; } - channel_router = stasis_message_router_create(ast_channel_topic_all_cached()); - if (!channel_router) { - return AST_MODULE_LOAD_FAILURE; - } - - r |= stasis_message_router_add(channel_router, stasis_cache_update_type(), sub_channel_snapshot_handler, NULL); - /* TODO: This could be handled a lot better. Instead of subscribing to - * the one caching topic and filtering out messages by channel id, we - * should have individual caching topics per-channel, with a shared - * back-end cache. That would simplify a lot of what's going on right - * here. - */ - r |= stasis_message_router_add(channel_router, ast_channel_user_event_type(), sub_channel_blob_handler, NULL); - r |= stasis_message_router_add(channel_router, ast_channel_varset_type(), sub_channel_blob_handler, NULL); - r |= stasis_message_router_add(channel_router, ast_channel_dtmf_end_type(), sub_channel_blob_handler, NULL); - r |= stasis_message_router_add(channel_router, ast_channel_hangup_request_type(), sub_channel_blob_handler, NULL); - if (r) { - return AST_MODULE_LOAD_FAILURE; - } - - bridge_router = stasis_message_router_create(ast_bridge_topic_all_cached()); - if (!bridge_router) { - return AST_MODULE_LOAD_FAILURE; - } - - r |= stasis_message_router_add(bridge_router, stasis_cache_update_type(), sub_bridge_snapshot_handler, NULL); - r |= stasis_message_router_add(bridge_router, ast_bridge_merge_message_type(), sub_bridge_merge_handler, NULL); - r |= stasis_message_router_add(bridge_router, ast_channel_entered_bridge_type(), sub_bridge_enter_handler, NULL); - r |= stasis_message_router_add(bridge_router, ast_channel_left_bridge_type(), sub_bridge_leave_handler, NULL); - if (r) { - return AST_MODULE_LOAD_FAILURE; - } - return AST_MODULE_LOAD_SUCCESS; } static int unload_module(void) { - int r = 0; - - stasis_message_router_unsubscribe_and_join(channel_router); - channel_router = NULL; - - stasis_message_router_unsubscribe_and_join(bridge_router); - bridge_router = NULL; - ao2_cleanup(apps_registry); apps_registry = NULL; @@ -1304,7 +835,7 @@ static int unload_module(void) ao2_cleanup(app_bridges_moh); app_bridges_moh = NULL; - return r; + return 0; } AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application support", diff --git a/res/stasis/app.c b/res/stasis/app.c index 6f80ed64ab6c7250662ce532aaa259085fe1582a..8abe0c19c23be80f829fd0d234084ba8de089417 100644 --- a/res/stasis/app.c +++ b/res/stasis/app.c @@ -29,132 +29,519 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "app.h" +#include "asterisk/callerid.h" #include "asterisk/stasis_app.h" +#include "asterisk/stasis_bridges.h" #include "asterisk/stasis_channels.h" - -/*! - * \brief Number of buckets for the channels container for app instances. Remember - * to keep it a prime number! - */ -#define APP_CHANNELS_BUCKETS 7 - -/*! - * \brief Number of buckets for the bridges container for app instances. Remember - * to keep it a prime number! - */ -#define APP_BRIDGES_BUCKETS 7 +#include "asterisk/stasis_message_router.h" struct app { + /*! Aggregation topic for this application. */ + struct stasis_topic *topic; + /*! Router for handling messages forwarded to \a topic. */ + struct stasis_message_router *router; + /*! Subscription to watch for bridge merge messages */ + struct stasis_subscription *bridge_merge_sub; + /*! Container of the channel forwards to this app's topic. */ + struct ao2_container *forwards; /*! Callback function for this application. */ stasis_app_cb handler; /*! Opaque data to hand to callback function. */ void *data; - /*! List of channel identifiers this app instance is interested in */ - struct ao2_container *channels; - /*! List of bridge identifiers this app instance owns */ - struct ao2_container *bridges; /*! Name of the Stasis application */ char name[]; }; +/*! Subscription info for a particular channel/bridge. */ +struct app_forwards { + /*! Count of number of times this channel/bridge has been subscribed */ + int interested; + + /*! Forward for the regular topic */ + struct stasis_subscription *topic_forward; + /*! Forward for the caching topic */ + struct stasis_subscription *topic_cached_forward; + + /*! Unique id of the object being forwarded */ + char id[]; +}; + +static void forwards_dtor(void *obj) +{ + struct app_forwards *forwards = obj; + + ast_assert(forwards->topic_forward == NULL); + ast_assert(forwards->topic_cached_forward == NULL); +} + +static void forwards_unsubscribe(struct app_forwards *forwards) +{ + stasis_unsubscribe(forwards->topic_forward); + forwards->topic_forward = NULL; + stasis_unsubscribe(forwards->topic_cached_forward); + forwards->topic_cached_forward = NULL; +} + +static struct app_forwards *forwards_create(struct app *app, + const char *id) +{ + RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + + if (!app || ast_strlen_zero(id)) { + return NULL; + } + + forwards = ao2_alloc(sizeof(*forwards) + strlen(id) + 1, forwards_dtor); + if (!forwards) { + return NULL; + } + + strcpy(forwards->id, id); + + ao2_ref(forwards, +1); + return forwards; +} + +/*! Forward a channel's topics to an app */ +static struct app_forwards *forwards_create_channel(struct app *app, + struct ast_channel *chan) +{ + RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + + if (!app || !chan) { + return NULL; + } + + forwards = forwards_create(app, ast_channel_uniqueid(chan)); + if (!forwards) { + return NULL; + } + + forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan), + app->topic); + if (!forwards->topic_forward) { + return NULL; + } + + forwards->topic_cached_forward = stasis_forward_all( + ast_channel_topic_cached(chan), app->topic); + if (!forwards->topic_cached_forward) { + /* Half-subscribed is a bad thing */ + stasis_unsubscribe(forwards->topic_forward); + forwards->topic_forward = NULL; + return NULL; + } + + ao2_ref(forwards, +1); + return forwards; +} + +/*! Forward a bridge's topics to an app */ +static struct app_forwards *forwards_create_bridge(struct app *app, + struct ast_bridge *bridge) +{ + RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + + if (!app || !bridge) { + return NULL; + } + + forwards = forwards_create(app, bridge->uniqueid); + if (!forwards) { + return NULL; + } + + forwards->topic_forward = stasis_forward_all(ast_bridge_topic(bridge), + app->topic); + if (!forwards->topic_forward) { + return NULL; + } + + forwards->topic_cached_forward = stasis_forward_all( + ast_bridge_topic_cached(bridge), app->topic); + if (!forwards->topic_cached_forward) { + /* Half-subscribed is a bad thing */ + stasis_unsubscribe(forwards->topic_forward); + forwards->topic_forward = NULL; + return NULL; + } + + ao2_ref(forwards, +1); + return forwards; +} + +static int forwards_sort(const void *obj_left, const void *obj_right, int flags) +{ + const struct app_forwards *object_left = obj_left; + const struct app_forwards *object_right = obj_right; + const char *right_key = obj_right; + int cmp; + + switch (flags & (OBJ_POINTER | OBJ_KEY | OBJ_PARTIAL_KEY)) { + case OBJ_POINTER: + right_key = object_right->id; + /* Fall through */ + case OBJ_KEY: + cmp = strcmp(object_left->id, right_key); + break; + case OBJ_PARTIAL_KEY: + /* + * We could also use a partial key struct containing a length + * so strlen() does not get called for every comparison instead. + */ + cmp = strncmp(object_left->id, right_key, strlen(right_key)); + break; + default: + /* Sort can only work on something with a full or partial key. */ + ast_assert(0); + cmp = 0; + break; + } + return cmp; +} + static void app_dtor(void *obj) { struct app *app = obj; + ast_verb(1, "Destroying Stasis app %s\n", app->name); + + ast_assert(app->router == NULL); + ast_assert(app->bridge_merge_sub == NULL); + + ao2_cleanup(app->topic); + app->topic = NULL; + ao2_cleanup(app->forwards); + app->forwards = NULL; ao2_cleanup(app->data); app->data = NULL; - ao2_cleanup(app->channels); - app->channels = NULL; - ao2_cleanup(app->bridges); - app->bridges = NULL; } -struct app *app_create(const char *name, stasis_app_cb handler, void *data) +static void sub_default_handler(void *data, struct stasis_subscription *sub, + struct stasis_topic *topic, struct stasis_message *message) { - RAII_VAR(struct app *, app, NULL, ao2_cleanup); - size_t size; + struct app *app = data; + RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); - ast_assert(name != NULL); - ast_assert(handler != NULL); + if (stasis_subscription_final_message(sub, message)) { + ao2_cleanup(app); + } - ast_verb(1, "Creating Stasis app '%s'\n", name); + /* By default, send any message that has a JSON representation */ + json = stasis_message_to_json(message); + if (!json) { + return; + } - size = sizeof(*app) + strlen(name) + 1; - app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX); + app_send(app, json); +} - if (!app) { - return NULL; +/*! \brief Typedef for callbacks that get called on channel snapshot updates */ +typedef struct ast_json *(*channel_snapshot_monitor)( + struct ast_channel_snapshot *old_snapshot, + struct ast_channel_snapshot *new_snapshot, + const struct timeval *tv); + +static struct ast_json *simple_channel_event( + const char *type, + struct ast_channel_snapshot *snapshot, + const struct timeval *tv) +{ + return ast_json_pack("{s: s, s: o, s: o}", + "type", type, + "timestamp", ast_json_timeval(*tv, NULL), + "channel", ast_channel_snapshot_to_json(snapshot)); +} + +static struct ast_json *channel_created_event( + struct ast_channel_snapshot *snapshot, + const struct timeval *tv) +{ + return simple_channel_event("ChannelCreated", snapshot, tv); +} + +static struct ast_json *channel_destroyed_event( + struct ast_channel_snapshot *snapshot, + const struct timeval *tv) +{ + return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}", + "type", "ChannelDestroyed", + "timestamp", ast_json_timeval(*tv, NULL), + "cause", snapshot->hangupcause, + "cause_txt", ast_cause2str(snapshot->hangupcause), + "channel", ast_channel_snapshot_to_json(snapshot)); +} + +static struct ast_json *channel_state_change_event( + struct ast_channel_snapshot *snapshot, + const struct timeval *tv) +{ + return simple_channel_event("ChannelStateChange", snapshot, tv); +} + +/*! \brief Handle channel state changes */ +static struct ast_json *channel_state( + struct ast_channel_snapshot *old_snapshot, + struct ast_channel_snapshot *new_snapshot, + const struct timeval *tv) +{ + struct ast_channel_snapshot *snapshot = new_snapshot ? + new_snapshot : old_snapshot; + + if (!old_snapshot) { + return channel_created_event(snapshot, tv); + } else if (!new_snapshot) { + return channel_destroyed_event(snapshot, tv); + } else if (old_snapshot->state != new_snapshot->state) { + return channel_state_change_event(snapshot, tv); } - strncpy(app->name, name, size - sizeof(*app)); - app->handler = handler; - ao2_ref(data, +1); - app->data = data; + return NULL; +} - app->channels = ast_str_container_alloc(APP_CHANNELS_BUCKETS); - if (!app->channels) { +static struct ast_json *channel_dialplan( + struct ast_channel_snapshot *old_snapshot, + struct ast_channel_snapshot *new_snapshot, + const struct timeval *tv) +{ + RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); + + /* No Newexten event on cache clear or first event */ + if (!old_snapshot || !new_snapshot) { return NULL; } - app->bridges = ast_str_container_alloc(APP_BRIDGES_BUCKETS); - if (!app->bridges) { + /* Empty application is not valid for a Newexten event */ + if (ast_strlen_zero(new_snapshot->appl)) { return NULL; } - ao2_ref(app, +1); - return app; + if (ast_channel_snapshot_cep_equal(old_snapshot, new_snapshot)) { + return NULL; + } + + return ast_json_pack("{s: s, s: o, s: s, s: s, s: o}", + "type", "ChannelDialplan", + "timestamp", ast_json_timeval(*tv, NULL), + "dialplan_app", new_snapshot->appl, + "dialplan_app_data", new_snapshot->data, + "channel", ast_channel_snapshot_to_json(new_snapshot)); } -int app_add_channel(struct app *app, const struct ast_channel *chan) +static struct ast_json *channel_callerid( + struct ast_channel_snapshot *old_snapshot, + struct ast_channel_snapshot *new_snapshot, + const struct timeval *tv) { - SCOPED_AO2LOCK(lock, app); - const char *uniqueid; + RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); - ast_assert(app != NULL); - ast_assert(chan != NULL); + /* No NewCallerid event on cache clear or first event */ + if (!old_snapshot || !new_snapshot) { + return NULL; + } - /* Don't accept new channels in an inactive application */ - if (!app->handler) { - return -1; + if (ast_channel_snapshot_caller_id_equal(old_snapshot, new_snapshot)) { + return NULL; } - uniqueid = ast_channel_uniqueid(chan); - return ast_str_container_add(app->channels, uniqueid) ? -1 : 0; + return ast_json_pack("{s: s, s: o, s: i, s: s, s: o}", + "type", "ChannelCallerId", + "timestamp", ast_json_timeval(*tv, NULL), + "caller_presentation", new_snapshot->caller_pres, + "caller_presentation_txt", ast_describe_caller_presentation( + new_snapshot->caller_pres), + "channel", ast_channel_snapshot_to_json(new_snapshot)); } -void app_remove_channel(struct app* app, const struct ast_channel *chan) +static channel_snapshot_monitor channel_monitors[] = { + channel_state, + channel_dialplan, + channel_callerid +}; + +static void sub_channel_update_handler(void *data, + struct stasis_subscription *sub, + struct stasis_topic *topic, + struct stasis_message *message) { - SCOPED_AO2LOCK(lock, app); + struct app *app = data; + struct stasis_cache_update *update; + struct ast_channel_snapshot *new_snapshot; + struct ast_channel_snapshot *old_snapshot; + const struct timeval *tv; + int i; + + ast_assert(stasis_message_type(message) == stasis_cache_update_type()); + + update = stasis_message_data(message); + + ast_assert(update->type == ast_channel_snapshot_type()); + + new_snapshot = stasis_message_data(update->new_snapshot); + old_snapshot = stasis_message_data(update->old_snapshot); + + /* Pull timestamp from the new snapshot, or from the update message + * when there isn't one. */ + tv = update->new_snapshot ? + stasis_message_timestamp(update->new_snapshot) : + stasis_message_timestamp(message); - ast_assert(app != NULL); - ast_assert(chan != NULL); + for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) { + RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref); - ao2_find(app->channels, ast_channel_uniqueid(chan), OBJ_KEY | OBJ_NODATA | OBJ_UNLINK); + msg = channel_monitors[i](old_snapshot, new_snapshot, tv); + if (msg) { + app_send(app, msg); + } + } } -int app_add_bridge(struct app *app, const char *uniqueid) +static struct ast_json *simple_bridge_event( + const char *type, + struct ast_bridge_snapshot *snapshot, + const struct timeval *tv) { - SCOPED_AO2LOCK(lock, app); + return ast_json_pack("{s: s, s: o, s: o}", + "type", type, + "timestamp", ast_json_timeval(*tv, NULL), + "bridge", ast_bridge_snapshot_to_json(snapshot)); +} - ast_assert(app != NULL); - ast_assert(uniqueid != NULL); +static void sub_bridge_update_handler(void *data, + struct stasis_subscription *sub, + struct stasis_topic *topic, + struct stasis_message *message) +{ + RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); + struct app *app = data; + struct stasis_cache_update *update; + struct ast_bridge_snapshot *new_snapshot; + struct ast_bridge_snapshot *old_snapshot; + const struct timeval *tv; - /* Don't accept new bridges in an inactive application */ - if (!app->handler) { - return -1; + ast_assert(stasis_message_type(message) == stasis_cache_update_type()); + + update = stasis_message_data(message); + + ast_assert(update->type == ast_bridge_snapshot_type()); + + new_snapshot = stasis_message_data(update->new_snapshot); + old_snapshot = stasis_message_data(update->old_snapshot); + tv = update->new_snapshot ? + stasis_message_timestamp(update->new_snapshot) : + stasis_message_timestamp(message); + + if (!new_snapshot) { + json = simple_bridge_event("BridgeDestroyed", old_snapshot, tv); + } else if (!old_snapshot) { + json = simple_bridge_event("BridgeCreated", old_snapshot, tv); + } + + if (!json) { + return; + } + + app_send(app, json); +} + +static void bridge_merge_handler(void *data, struct stasis_subscription *sub, + struct stasis_topic *topic, struct stasis_message *message) +{ + struct app *app = data; + struct ast_bridge_merge_message *merge; + RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + + if (stasis_subscription_final_message(sub, message)) { + ao2_cleanup(app); } - return ast_str_container_add(app->bridges, uniqueid) ? -1 : 0; + if (stasis_message_type(message) != ast_bridge_merge_message_type()) { + return; + } + + merge = stasis_message_data(message); + + /* Find out if we're subscribed to either bridge */ + forwards = ao2_find(app->forwards, merge->from->uniqueid, + OBJ_SEARCH_KEY); + if (!forwards) { + forwards = ao2_find(app->forwards, merge->to->uniqueid, + OBJ_SEARCH_KEY); + } + + if (!forwards) { + return; + } + + /* Forward the message to the app */ + stasis_forward_message(app->topic, topic, message); } -void app_remove_bridge(struct app* app, const char *uniqueid) +struct app *app_create(const char *name, stasis_app_cb handler, void *data) { - SCOPED_AO2LOCK(lock, app); + RAII_VAR(struct app *, app, NULL, ao2_cleanup); + size_t size; + int res = 0; + + ast_assert(name != NULL); + ast_assert(handler != NULL); + + ast_verb(1, "Creating Stasis app '%s'\n", name); + + size = sizeof(*app) + strlen(name) + 1; + app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX); + + if (!app) { + return NULL; + } + + app->forwards = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_MUTEX, + AO2_CONTAINER_ALLOC_OPT_DUPS_OBJ_REJECT, + forwards_sort, NULL); + if (!app->forwards) { + return NULL; + } + + app->topic = stasis_topic_create(name); + if (!app->topic) { + return NULL; + } + + app->bridge_merge_sub = stasis_subscribe(ast_bridge_topic_all(), + bridge_merge_handler, app); + if (!app->bridge_merge_sub) { + return NULL; + } + /* Subscription holds a reference */ + ao2_ref(app, +1); + + app->router = stasis_message_router_create(app->topic); + if (!app->router) { + return NULL; + } + + res |= stasis_message_router_add_cache_update(app->router, + ast_bridge_snapshot_type(), sub_bridge_update_handler, app); + + res |= stasis_message_router_add_cache_update(app->router, + ast_channel_snapshot_type(), sub_channel_update_handler, app); + + res |= stasis_message_router_set_default(app->router, + sub_default_handler, app); - ast_assert(app != NULL); - ast_assert(uniqueid != NULL); + if (res != 0) { + return NULL; + } + /* Router holds a reference */ + ao2_ref(app, +1); + + strncpy(app->name, name, size - sizeof(*app)); + app->handler = handler; + ao2_ref(data, +1); + app->data = data; - ao2_find(app->bridges, uniqueid, OBJ_KEY | OBJ_NODATA | OBJ_UNLINK | OBJ_MULTIPLE); + ao2_ref(app, +1); + return app; } /*! @@ -196,6 +583,18 @@ void app_deactivate(struct app *app) app->data = NULL; } +void app_shutdown(struct app *app) +{ + SCOPED_AO2LOCK(lock, app); + + ast_assert(app_is_finished(app)); + + stasis_message_router_unsubscribe(app->router); + app->router = NULL; + stasis_unsubscribe(app->bridge_merge_sub); + app->bridge_merge_sub = NULL; +} + int app_is_active(struct app *app) { SCOPED_AO2LOCK(lock, app); @@ -206,8 +605,7 @@ int app_is_finished(struct app *app) { SCOPED_AO2LOCK(lock, app); - return app->handler == NULL && - ao2_container_count(app->channels) == 0; + return app->handler == NULL && ao2_container_count(app->forwards) == 0; } void app_update(struct app *app, stasis_app_cb handler, void *data) @@ -229,7 +627,6 @@ void app_update(struct app *app, stasis_app_cb handler, void *data) ast_verb(1, "Activating Stasis app '%s'\n", app->name); } - app->handler = handler; ao2_cleanup(app->data); if (data) { @@ -243,16 +640,100 @@ const char *app_name(const struct app *app) return app->name; } -int app_is_watching_channel(struct app *app, const char *uniqueid) +int app_subscribe_channel(struct app *app, struct ast_channel *chan) { - RAII_VAR(char *, found, NULL, ao2_cleanup); - found = ao2_find(app->channels, uniqueid, OBJ_KEY); - return found != NULL; + int res; + + if (!app || !chan) { + return -1; + } else { + RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + SCOPED_AO2LOCK(lock, app->forwards); + + forwards = ao2_find(app->forwards, ast_channel_uniqueid(chan), + OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (!forwards) { + /* Forwards not found, create one */ + forwards = forwards_create_channel(app, chan); + if (!forwards) { + return -1; + } + + res = ao2_link_flags(app->forwards, forwards, + OBJ_NOLOCK); + if (!res) { + return -1; + } + } + + ++forwards->interested; + return 0; + } +} + +static int unsubscribe(struct app *app, const char *kind, const char *id) +{ + RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + SCOPED_AO2LOCK(lock, app->forwards); + + forwards = ao2_find(app->forwards, id, OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (!forwards) { + ast_log(LOG_ERROR, + "App '%s' not subscribed to %s '%s'", + app->name, kind, id); + return -1; + } + + if (--forwards->interested == 0) { + /* No one is interested any more; unsubscribe */ + forwards_unsubscribe(forwards); + ao2_find(app->forwards, forwards, + OBJ_POINTER | OBJ_NOLOCK | OBJ_UNLINK | + OBJ_NODATA); + } + + return 0; } -int app_is_watching_bridge(struct app *app, const char *uniqueid) +int app_unsubscribe_channel(struct app *app, struct ast_channel *chan) { - RAII_VAR(char *, found, NULL, ao2_cleanup); - found = ao2_find(app->bridges, uniqueid, OBJ_KEY); - return found != NULL; + if (!app || !chan) { + return -1; + } + + return unsubscribe(app, "channel", ast_channel_uniqueid(chan)); +} + +int app_subscribe_bridge(struct app *app, struct ast_bridge *bridge) +{ + if (!app || !bridge) { + return -1; + } else { + RAII_VAR(struct app_forwards *, forwards, NULL, ao2_cleanup); + SCOPED_AO2LOCK(lock, app->forwards); + + forwards = ao2_find(app->forwards, bridge->uniqueid, + OBJ_SEARCH_KEY | OBJ_NOLOCK); + + if (!forwards) { + /* Forwards not found, create one */ + forwards = forwards_create_bridge(app, bridge); + if (!forwards) { + return -1; + } + ao2_link_flags(app->forwards, forwards, OBJ_NOLOCK); + } + + ++forwards->interested; + return 0; + } +} + +int app_unsubscribe_bridge(struct app *app, struct ast_bridge *bridge) +{ + if (!app || !bridge) { + return -1; + } + + return unsubscribe(app, "bridge", bridge->uniqueid); } diff --git a/res/stasis/app.h b/res/stasis/app.h index 0cf92217fb90e0099a8169ac2e964cf3290b4f59..5f9f1d7e7ecce080fae8967420f11bb06d659916 100644 --- a/res/stasis/app.h +++ b/res/stasis/app.h @@ -47,6 +47,15 @@ struct app; */ struct app *app_create(const char *name, stasis_app_cb handler, void *data); +/*! + * \brief Tears down an application. + * + * It should be finished before calling this. + * + * \param app Application to unsubscribe. + */ +void app_shutdown(struct app *app); + /*! * \brief Deactivates an application. * @@ -95,17 +104,6 @@ void app_update(struct app *app, stasis_app_cb handler, void *data); */ const char *app_name(const struct app *app); -/*! - * \brief Subscribe an application to a topic. - * - * \param app Application. - * \param topic Topic to subscribe to. - * \return New subscription. - * \return \c NULL on error. - */ -struct stasis_subscription *app_subscribe(struct app *app, - struct stasis_topic *topic); - /*! * \brief Send a message to an application. * @@ -114,83 +112,44 @@ struct stasis_subscription *app_subscribe(struct app *app, */ void app_send(struct app *app, struct ast_json *message); -/*! - * \brief Send the start message to an application. - * - * \param app Application. - * \param chan The channel entering the application. - * \param argc The number of arguments for the application. - * \param argv The arguments for the application. - * \return 0 on success. - * \return Non-zero on error. - */ -int app_send_start_msg(struct app *app, struct ast_channel *chan, int argc, - char *argv[]); +struct app_forwards; /*! - * \brief Send the end message to an application. + * \brief Subscribes an application to a channel. * * \param app Application. - * \param chan The channel leaving the application. + * \param chan Channel to subscribe to. * \return 0 on success. * \return Non-zero on error. */ -int app_send_end_msg(struct app *app, struct ast_channel *chan); +int app_subscribe_channel(struct app *app, struct ast_channel *chan); /*! - * \brief Checks if an application is watching a given channel. + * \brief Cancel the subscription an app has for a channel. * - * \param app Application. - * \param uniqueid Uniqueid of the channel to check about. - * \return True (non-zero) if \a app is watching channel with given \a uniqueid - * \return False (zero) if \a app isn't. + * \param app Subscribing application. + * \param forwards Returned object from app_subscribe_channel(). */ -int app_is_watching_channel(struct app *app, const char *uniqueid); +int app_unsubscribe_channel(struct app *app, struct ast_channel *chan); /*! - * \brief Add a channel to an application's watch list. + * \brief Add a bridge subscription to an existing channel subscription. * * \param app Application. - * \param chan Channel to watch. + * \param bridge Bridge to subscribe to. * \return 0 on success. * \return Non-zero on error. */ -int app_add_channel(struct app *app, const struct ast_channel *chan); +int app_subscribe_bridge(struct app *app, struct ast_bridge *bridge); /*! - * \brief Remove a channel from an application's watch list. + * \brief Cancel the bridge subscription for an application. * - * \param app Application. - * \param chan Channel to watch. - */ -void app_remove_channel(struct app *app, const struct ast_channel *chan); - -/*! - * \brief Add a bridge to an application's watch list by uniqueid. - * - * \param app Application. - * \param bridge Bridge to watch. + * \param forwards Return from app_subscribe_channel(). + * \param bridge Bridge to subscribe to. * \return 0 on success. * \return Non-zero on error. */ -int app_add_bridge(struct app *app, const char *uniqueid); - -/*! - * \brief Remove a bridge from an application's watch list by uniqueid. - * - * \param app Application. - * \param bridge Bridge to remove. - */ -void app_remove_bridge(struct app* app, const char *uniqueid); - -/*! - * \brief Checks if an application is watching a given bridge. - * - * \param app Application. - * \param uniqueid Uniqueid of the bridge to check. - * \return True (non-zero) if \a app is watching bridge with given \a uniqueid - * \return False (zero) if \a app isn't. - */ -int app_is_watching_bridge(struct app *app, const char *uniqueid); +int app_unsubscribe_bridge(struct app *app, struct ast_bridge *bridge); #endif /* _ASTERISK_RES_STASIS_APP_H */ diff --git a/rest-api-templates/param_parsing.mustache b/rest-api-templates/param_parsing.mustache index 59c59e958e932abdb59d9d84e010f26022ca2330..aabd728fd4c0fd04ed12c7b173b3abcd67e0d4a4 100644 --- a/rest-api-templates/param_parsing.mustache +++ b/rest-api-templates/param_parsing.mustache @@ -36,8 +36,16 @@ goto fin; } - args.{{c_name}}_count = ast_app_separate_args( - args.{{c_name}}_parse, ',', vals, ARRAY_LEN(vals)); + if (strlen(args.{{c_name}}_parse) == 0) { + /* ast_app_separate_args can't handle "" */ + args.{{c_name}}_count = 1; + vals[0] = args.{{c_name}}_parse; + } else { + args.{{c_name}}_count = ast_app_separate_args( + args.{{c_name}}_parse, ',', vals, + ARRAY_LEN(vals)); + } + if (args.{{c_name}}_count == 0) { ast_ari_response_alloc_failed(response); goto fin; diff --git a/rest-api-templates/res_ari_resource.c.mustache b/rest-api-templates/res_ari_resource.c.mustache index 906d55f0dc15669338090216ce7df6da551307fc..e6b2a88f4203e15d8b9bd72a0983fd326354c1b3 100644 --- a/rest-api-templates/res_ari_resource.c.mustache +++ b/rest-api-templates/res_ari_resource.c.mustache @@ -174,14 +174,16 @@ fin: __attribute__((unused)) * negotiation. Param parsing should happen earlier, but we * need a way to pass it through the WebSocket code to the * callback */ - RAII_VAR(char *, msg, NULL, ast_free); + RAII_VAR(char *, msg, NULL, ast_json_free); if (response->message) { msg = ast_json_dump_string(response->message); } else { - msg = ast_strdup("?"); + ast_log(LOG_ERROR, "Missing response message\n"); + } + if (msg) { + ast_websocket_write(ws_session, + AST_WEBSOCKET_OPCODE_TEXT, msg, strlen(msg)); } - ast_websocket_write(ws_session, AST_WEBSOCKET_OPCODE_TEXT, msg, - strlen(msg)); } {{> param_cleanup}} }