diff --git a/CHANGES b/CHANGES index 2c980628ceffca73e256f10fbaba8769b019315f..111a03099276979e91e7e02011bd08edd5f18f85 100644 --- a/CHANGES +++ b/CHANGES @@ -19,6 +19,22 @@ chan_sip https://wiki.asterisk.org/wiki/x/tAHOAQ https://wiki.asterisk.org/wiki/x/hYCLAQ +Channels +------------------ + * The core no longer uses the stasis cache for channels snapshots. + The following APIs are no longer available: + ast_channel_topic_cached() + ast_channel_topic_all_cached() + The ast_channel_cache_all() and ast_channel_cache_by_name() functions + now returns an ao2_container of ast_channel_snapshots rather than a + container of stasis_messages therefore you can't call stasis_cache + functions on it. + The ast_channel_topic_all() function now returns a normal topic, + not a cached one so you can't use stasis cache functions on it either. + The ast_channel_snapshot_type() stasis message now has the + ast_channel_snapshot_update structure as it's data. + ast_channel_snapshot_get_latest() still returns the latest snapshot. + ------------------------------------------------------------------------------ --- Functionality changes from Asterisk 16.0.0 to Asterisk 16.1.0 ------------ ------------------------------------------------------------------------------ diff --git a/UPGRADE.txt b/UPGRADE.txt index 3cf8e2784f981cc665a3254d7d93e2621620623b..c299d83d67c25c41b23bc9b0fac1078dc81b7d5c 100644 --- a/UPGRADE.txt +++ b/UPGRADE.txt @@ -43,3 +43,18 @@ res_parking: res_xmpp: - The JabberStatus application, deprecated in Asterisk 12, has been removed. + +Channels: + - The core no longer uses the stasis cache for channels snapshots. + The following APIs are no longer available: + ast_channel_topic_cached() + ast_channel_topic_all_cached() + The ast_channel_cache_all() and ast_channel_cache_by_name() functions + now returns an ao2_container of ast_channel_snapshots rather than a + container of stasis_messages therefore you can't call stasis_cache + functions on it. + The ast_channel_topic_all() function now returns a normal topic, + not a cached one so you can't use stasis cache functions on it either. + The ast_channel_snapshot_type() stasis message now has the + ast_channel_snapshot_update structure as it's data. + ast_channel_snapshot_get_latest() still returns the latest snapshot. diff --git a/apps/app_agent_pool.c b/apps/app_agent_pool.c index 805c403f5d6b342fb9328b33edabba4fc9b0bd47..5bd6a4d3469ac9a2cbefe4f0e527d8731e26cdff 100644 --- a/apps/app_agent_pool.c +++ b/apps/app_agent_pool.c @@ -1448,7 +1448,7 @@ static void send_agent_login(struct ast_channel *chan, const char *agent) return; } - ast_channel_publish_cached_blob(chan, ast_channel_agent_login_type(), blob); + ast_channel_publish_blob(chan, ast_channel_agent_login_type(), blob); } static void send_agent_logoff(struct ast_channel *chan, const char *agent, long logintime) @@ -1464,7 +1464,7 @@ static void send_agent_logoff(struct ast_channel *chan, const char *agent, long return; } - ast_channel_publish_cached_blob(chan, ast_channel_agent_logoff_type(), blob); + ast_channel_publish_blob(chan, ast_channel_agent_logoff_type(), blob); } /*! diff --git a/apps/confbridge/confbridge_manager.c b/apps/confbridge/confbridge_manager.c index e88bbc2b026221b29baee3c7b3f73663a7243589..2d8503338c69a5be86100a0fb168252a49d727d6 100644 --- a/apps/confbridge/confbridge_manager.c +++ b/apps/confbridge/confbridge_manager.c @@ -783,7 +783,7 @@ int manager_confbridge_init(void) } channel_state_router = stasis_message_router_create( - ast_channel_topic_all_cached()); + ast_channel_topic_all()); if (!channel_state_router) { manager_confbridge_shutdown(); diff --git a/channels/chan_pjsip.c b/channels/chan_pjsip.c index 9edd989982e81f5dfc0d34c57ba9bdbf4b61ef1e..d0a74cd40e638597076eeb4295a59025ae05ff00 100644 --- a/channels/chan_pjsip.c +++ b/channels/chan_pjsip.c @@ -1135,7 +1135,6 @@ static int chan_pjsip_devicestate(const char *data) RAII_VAR(struct ast_sip_endpoint *, endpoint, ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", data), ao2_cleanup); enum ast_device_state state = AST_DEVICE_UNKNOWN; RAII_VAR(struct ast_endpoint_snapshot *, endpoint_snapshot, NULL, ao2_cleanup); - RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup); struct ast_devstate_aggregate aggregate; int num, inuse = 0; @@ -1156,27 +1155,20 @@ static int chan_pjsip_devicestate(const char *data) state = AST_DEVICE_NOT_INUSE; } - if (!endpoint_snapshot->num_channels || !(cache = ast_channel_cache())) { + if (!endpoint_snapshot->num_channels) { return state; } ast_devstate_aggregate_init(&aggregate); - ao2_ref(cache, +1); - for (num = 0; num < endpoint_snapshot->num_channels; num++) { - RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); struct ast_channel_snapshot *snapshot; - msg = stasis_cache_get(cache, ast_channel_snapshot_type(), - endpoint_snapshot->channel_ids[num]); - - if (!msg) { + snapshot = ast_channel_snapshot_get_latest(endpoint_snapshot->channel_ids[num]); + if (!snapshot) { continue; } - snapshot = stasis_message_data(msg); - if (chan_pjsip_get_hold(snapshot->uniqueid)) { ast_devstate_aggregate_add(&aggregate, AST_DEVICE_ONHOLD); } else { @@ -1187,6 +1179,8 @@ static int chan_pjsip_devicestate(const char *data) (snapshot->state == AST_STATE_BUSY)) { inuse++; } + + ao2_ref(snapshot, -1); } if (endpoint->devicestate_busy_at && (inuse == endpoint->devicestate_busy_at)) { diff --git a/channels/pjsip/cli_commands.c b/channels/pjsip/cli_commands.c index 33d0e02c11ddbf836c99ee5b96f4c2b26c908caf..9a8dc2938afe01c14566a97a40cc9c041cb02faf 100644 --- a/channels/pjsip/cli_commands.c +++ b/channels/pjsip/cli_commands.c @@ -169,9 +169,8 @@ static int cli_channelstats_compare(void *obj, void *arg, int flags) static int cli_message_to_snapshot(void *obj, void *arg, int flags) { - struct stasis_message *message = obj; + struct ast_channel_snapshot *snapshot = obj; struct ao2_container *snapshots = arg; - struct ast_channel_snapshot *snapshot = stasis_message_data(message); if (!strcmp(snapshot->type, "PJSIP")) { ao2_link(snapshots, snapshot); @@ -198,8 +197,7 @@ static struct ao2_container *get_container(const char *regex, ao2_sort_fn sort_f { struct ao2_container *child_container; regex_t regexbuf; - RAII_VAR(struct ao2_container *, parent_container, - stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type()), ao2_cleanup); + RAII_VAR(struct ao2_container *, parent_container, ast_channel_cache_by_name(), ao2_cleanup); if (!parent_container) { return NULL; diff --git a/include/asterisk/channel.h b/include/asterisk/channel.h index 3f22cdd9c5ce74d6a2358061ef61e46381f69bfb..9627ae216f98d8ecd473f6b95119d24fcecebeb5 100644 --- a/include/asterisk/channel.h +++ b/include/asterisk/channel.h @@ -147,6 +147,15 @@ extern "C" { */ #define AST_MAX_PUBLIC_UNIQUEID 149 +/*! + * The number of buckets to store channels or channel information + */ +#ifdef LOW_MEMORY +#define AST_NUM_CHANNEL_BUCKETS 61 +#else +#define AST_NUM_CHANNEL_BUCKETS 1567 +#endif + /*! * Maximum size of an internal Asterisk channel unique ID. * @@ -2649,6 +2658,17 @@ void ast_channel_internal_swap_uniqueid_and_linkedid(struct ast_channel *a, stru */ void ast_channel_internal_swap_topics(struct ast_channel *a, struct ast_channel *b); +/*! + * \brief Swap snapshots beteween two channels + * \param a First channel + * \param b Second channel + * \return void + * + * \note + * This is used in masquerade to exchange snapshots + */ +void ast_channel_internal_swap_snapshots(struct ast_channel *a, struct ast_channel *b); + /*! * \brief Set uniqueid and linkedid string value only (not time) * \param chan The channel to set the uniqueid to @@ -4236,6 +4256,8 @@ enum ast_channel_adsicpe ast_channel_adsicpe(const struct ast_channel *chan); void ast_channel_adsicpe_set(struct ast_channel *chan, enum ast_channel_adsicpe value); enum ast_channel_state ast_channel_state(const struct ast_channel *chan); ast_callid ast_channel_callid(const struct ast_channel *chan); +struct ast_channel_snapshot *ast_channel_snapshot(const struct ast_channel *chan); +void ast_channel_snapshot_set(struct ast_channel *chan, struct ast_channel_snapshot *snapshot); /*! * \pre chan is locked @@ -4561,21 +4583,6 @@ struct varshead *ast_channel_get_vars(struct ast_channel *chan); */ struct stasis_topic *ast_channel_topic(struct ast_channel *chan); -/*! - * \since 12 - * \brief A topic which publishes the events for a particular channel. - * - * \ref ast_channel_snapshot messages are replaced with \ref stasis_cache_update - * - * If the given \a chan is \c NULL, ast_channel_topic_all_cached() is returned. - * - * \param chan Channel, or \c NULL. - * - * \retval Topic for channel's events. - * \retval ast_channel_topic_all() if \a chan is \c NULL. - */ -struct stasis_topic *ast_channel_topic_cached(struct ast_channel *chan); - /*! * \brief Get the bridge associated with a channel * \since 12.0.0 diff --git a/include/asterisk/stasis_channels.h b/include/asterisk/stasis_channels.h index 4843617db0c3e962af2ffa931c61ab9a9d7bffba..2aeff6f0b1c507c4ecb2d177634c4270cc91f0c6 100644 --- a/include/asterisk/stasis_channels.h +++ b/include/asterisk/stasis_channels.h @@ -75,6 +75,23 @@ struct ast_channel_snapshot { struct varshead *ari_vars; /*!< Variables to be appended to ARI events */ }; +/*! + * \since 17 + * \brief Structure representing a change of snapshot of channel state. + * + * While not enforced programmatically, this object is shared across multiple + * threads, and should be treated as an immutable object. + * + * \note This structure will not have a transition of an old snapshot with no + * new snapshot to indicate that a channel has gone away. A new snapshot will + * always exist and a channel going away can be determined by checking for the + * AST_FLAG_DEAD flag on the new snapshot. + */ +struct ast_channel_snapshot_update { + struct ast_channel_snapshot *old_snapshot; /*!< The old channel snapshot */ + struct ast_channel_snapshot *new_snapshot; /*!< The new channel snapshot */ +}; + /*! * \since 12 * \brief Blob of data associated with a channel. @@ -94,7 +111,7 @@ struct ast_channel_blob { */ struct ast_multi_channel_blob; -struct stasis_cp_all *ast_channel_cache_all(void); +struct ao2_container *ast_channel_cache_all(void); /*! * \since 12 @@ -103,36 +120,19 @@ struct stasis_cp_all *ast_channel_cache_all(void); */ struct stasis_topic *ast_channel_topic_all(void); -/*! - * \since 12 - * \brief A caching topic which caches \ref ast_channel_snapshot messages from - * ast_channel_events_all(void). - * - * \retval Topic for all channel events. - */ -struct stasis_topic *ast_channel_topic_all_cached(void); - -/*! - * \since 12 - * \brief Primary channel cache, indexed by Uniqueid. - * - * \retval Cache of \ref ast_channel_snapshot. - */ -struct stasis_cache *ast_channel_cache(void); - /*! * \since 12 * \brief Secondary channel cache, indexed by name. * * \retval Cache of \ref ast_channel_snapshot. */ -struct stasis_cache *ast_channel_cache_by_name(void); +struct ao2_container *ast_channel_cache_by_name(void); /*! * \since 12 - * \brief Message type for \ref ast_channel_snapshot. + * \brief Message type for \ref ast_channel_snapshot_update. * - * \retval Message type for \ref ast_channel_snapshot. + * \retval Message type for \ref ast_channel_snapshot_update. */ struct stasis_message_type *ast_channel_snapshot_type(void); @@ -175,6 +175,18 @@ struct ast_channel_snapshot *ast_channel_snapshot_get_latest(const char *uniquei */ struct ast_channel_snapshot *ast_channel_snapshot_get_latest_by_name(const char *name); +/*! + * \since 17 + * \brief Send the final channel snapshot for a channel, thus removing it from cache + * + * \pre chan is locked + * + * \param chan The channel to send the final channel snapshot for + * + * \note This will also remove the cached snapshot from the channel itself + */ +void ast_channel_publish_final_snapshot(struct ast_channel *chan); + /*! * \since 12 * \brief Creates a \ref ast_channel_blob message. @@ -303,6 +315,8 @@ void ast_multi_channel_blob_add_channel(struct ast_multi_channel_blob *obj, * \param type Type of stasis message. * \param blob The blob being published. (NULL if no blob) * + * \note This will use the current snapshot on the channel and will not generate a new one. + * * \return Nothing */ void ast_channel_publish_blob(struct ast_channel *chan, struct stasis_message_type *type, @@ -557,17 +571,6 @@ void ast_channel_publish_dial_forward(struct ast_channel *caller, const char *dialstatus, const char *forward); -/*! - * \since 12 - * \brief Publish in the \ref ast_channel_topic a \ref ast_channel_snapshot - * message indicating a change in channel state - * - * \pre chan is locked - * - * \param chan The channel whose state has changed - */ -void ast_publish_channel_state(struct ast_channel *chan); - /*! @} */ /*! diff --git a/main/aoc.c b/main/aoc.c index 253c7453b3d9331c7b976874e233cd445ad17903..b8cf301f04e57d41cb9a4a6a8798682f2398fe4b 100644 --- a/main/aoc.c +++ b/main/aoc.c @@ -1849,7 +1849,9 @@ static void aoc_publish_blob(struct ast_channel *chan, struct stasis_message_typ } if (chan) { - aoc_event->snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(chan)); + ast_channel_lock(chan); + aoc_event->snapshot = ao2_bump(ast_channel_snapshot(chan)); + ast_channel_unlock(chan); if (!aoc_event->snapshot) { ao2_ref(aoc_event, -1); return; diff --git a/main/app.c b/main/app.c index 953b77df0885aa95c676b901a541516d60b950c1..ec7449065642129ab9c27dad447a0ba3962de8dc 100644 --- a/main/app.c +++ b/main/app.c @@ -3244,15 +3244,7 @@ static struct stasis_message *mwi_state_create_message( mwi_state->old_msgs = old_msgs; if (!ast_strlen_zero(channel_id)) { - struct stasis_message *chan_message; - - chan_message = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(), - channel_id); - if (chan_message) { - mwi_state->snapshot = stasis_message_data(chan_message); - ao2_ref(mwi_state->snapshot, +1); - } - ao2_cleanup(chan_message); + mwi_state->snapshot = ast_channel_snapshot_get_latest(channel_id); } if (eid) { diff --git a/main/bridge.c b/main/bridge.c index d6e7a51a5adf74e97c07d1346c0faf10149c2400..024c6abfeae94a115db6bbbbc6dcea44d1d87136 100644 --- a/main/bridge.c +++ b/main/bridge.c @@ -5150,16 +5150,15 @@ static int bridge_show_specific_print_channel(void *obj, void *arg, int flags) { const char *uniqueid = obj; struct ast_cli_args *a = arg; - RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); struct ast_channel_snapshot *snapshot; - msg = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(), uniqueid); - if (!msg) { + snapshot = ast_channel_snapshot_get_latest(uniqueid); + if (!snapshot) { return 0; } - snapshot = stasis_message_data(msg); ast_cli(a->fd, "Channel: %s\n", snapshot->name); + ao2_ref(snapshot, -1); return 0; } diff --git a/main/cdr.c b/main/cdr.c index 1c47e24173574185f678c393bee234a9208f6f1f..e321c2215d80ace726b1b2f68c7fd0e9cb46d428 100644 --- a/main/cdr.c +++ b/main/cdr.c @@ -186,14 +186,6 @@ </configInfo> ***/ - -/* The prime here should be similar in size to the channel container. */ -#ifdef LOW_MEMORY -#define NUM_CDR_BUCKETS 61 -#else -#define NUM_CDR_BUCKETS 769 -#endif - #define DEFAULT_ENABLED "1" #define DEFAULT_BATCHMODE "0" #define DEFAULT_UNANSWERED "0" @@ -2056,9 +2048,9 @@ static int filter_channel_snapshot(struct ast_channel_snapshot *snapshot) /*! * \internal - * \brief Filter a channel cache update + * \brief Filter a channel snapshot update */ -static int filter_channel_cache_message(struct ast_channel_snapshot *old_snapshot, +static int filter_channel_snapshot_message(struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot) { int ret = 0; @@ -2256,52 +2248,38 @@ static int check_new_cdr_needed(struct ast_channel_snapshot *old_snapshot, } /*! - * \brief Handler for Stasis-Core channel cache update messages + * \brief Handler for channel snapshot update messages * \param data Passed on * \param sub The stasis subscription for this message callback * \param topic The topic this message was published for * \param message The message */ -static void handle_channel_cache_message(void *data, struct stasis_subscription *sub, struct stasis_message *message) +static void handle_channel_snapshot_update_message(void *data, struct stasis_subscription *sub, struct stasis_message *message) { struct cdr_object *cdr; - struct stasis_cache_update *update = stasis_message_data(message); - struct ast_channel_snapshot *old_snapshot; - struct ast_channel_snapshot *new_snapshot; + struct ast_channel_snapshot_update *update = stasis_message_data(message); struct cdr_object *it_cdr; - ast_assert(update != NULL); - ast_assert(ast_channel_snapshot_type() == update->type); - - old_snapshot = stasis_message_data(update->old_snapshot); - new_snapshot = stasis_message_data(update->new_snapshot); - - if (filter_channel_cache_message(old_snapshot, new_snapshot)) { + if (filter_channel_snapshot_message(update->old_snapshot, update->new_snapshot)) { return; } - if (new_snapshot && !old_snapshot) { - cdr = cdr_object_alloc(new_snapshot); + if (update->new_snapshot && !update->old_snapshot) { + cdr = cdr_object_alloc(update->new_snapshot); if (!cdr) { return; } cdr->is_root = 1; ao2_link(active_cdrs_master, cdr); } else { - const char *uniqueid; - - uniqueid = new_snapshot ? new_snapshot->uniqueid : old_snapshot->uniqueid; - cdr = ao2_find(active_cdrs_master, uniqueid, OBJ_SEARCH_KEY); + cdr = ao2_find(active_cdrs_master, update->new_snapshot->uniqueid, OBJ_SEARCH_KEY); } /* Handle Party A */ if (!cdr) { - const char *name; - - name = new_snapshot ? new_snapshot->name : old_snapshot->name; - ast_log(AST_LOG_WARNING, "No CDR for channel %s\n", name); + ast_log(AST_LOG_WARNING, "No CDR for channel %s\n", update->new_snapshot->name); ast_assert(0); - } else if (new_snapshot) { + } else { int all_reject = 1; ao2_lock(cdr); @@ -2309,21 +2287,23 @@ static void handle_channel_cache_message(void *data, struct stasis_subscription if (!it_cdr->fn_table->process_party_a) { continue; } - all_reject &= it_cdr->fn_table->process_party_a(it_cdr, new_snapshot); + all_reject &= it_cdr->fn_table->process_party_a(it_cdr, update->new_snapshot); } - if (all_reject && check_new_cdr_needed(old_snapshot, new_snapshot)) { + if (all_reject && check_new_cdr_needed(update->old_snapshot, update->new_snapshot)) { /* We're not hung up and we have a new snapshot - we need a new CDR */ struct cdr_object *new_cdr; new_cdr = cdr_object_create_and_append(cdr); if (new_cdr) { - new_cdr->fn_table->process_party_a(new_cdr, new_snapshot); + new_cdr->fn_table->process_party_a(new_cdr, update->new_snapshot); } } ao2_unlock(cdr); - } else { + } + + if (ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) { ao2_lock(cdr); - CDR_DEBUG("%p - Beginning finalize/dispatch for %s\n", cdr, old_snapshot->name); + CDR_DEBUG("%p - Beginning finalize/dispatch for %s\n", cdr, update->old_snapshot->name); for (it_cdr = cdr; it_cdr; it_cdr = it_cdr->next) { cdr_object_finalize(it_cdr); } @@ -2335,12 +2315,14 @@ static void handle_channel_cache_message(void *data, struct stasis_subscription } /* Handle Party B */ - if (new_snapshot) { + if (update->new_snapshot) { ao2_callback_data(active_cdrs_all, OBJ_NODATA | OBJ_MULTIPLE | OBJ_SEARCH_KEY, - cdr_object_update_party_b, (char *) new_snapshot->name, new_snapshot); - } else { + cdr_object_update_party_b, (char *) update->new_snapshot->name, update->new_snapshot); + } + + if (ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) { ao2_callback_data(active_cdrs_all, OBJ_NODATA | OBJ_MULTIPLE | OBJ_SEARCH_KEY, - cdr_object_finalize_party_b, (char *) old_snapshot->name, old_snapshot); + cdr_object_finalize_party_b, (char *) update->new_snapshot->name, update->new_snapshot); } ao2_cleanup(cdr); @@ -4302,7 +4284,7 @@ static int create_subscriptions(void) return 0; } - channel_subscription = stasis_forward_all(ast_channel_topic_all_cached(), cdr_topic); + channel_subscription = stasis_forward_all(ast_channel_topic_all(), cdr_topic); if (!channel_subscription) { return -1; } @@ -4522,7 +4504,7 @@ static int load_module(void) return AST_MODULE_LOAD_FAILURE; } - stasis_message_router_add_cache_update(stasis_router, ast_channel_snapshot_type(), handle_channel_cache_message, NULL); + stasis_message_router_add(stasis_router, ast_channel_snapshot_type(), handle_channel_snapshot_update_message, NULL); stasis_message_router_add(stasis_router, ast_channel_dial_type(), handle_dial_message, NULL); stasis_message_router_add(stasis_router, ast_channel_entered_bridge_type(), handle_bridge_enter_message, NULL); stasis_message_router_add(stasis_router, ast_channel_left_bridge_type(), handle_bridge_leave_message, NULL); @@ -4530,14 +4512,14 @@ static int load_module(void) stasis_message_router_add(stasis_router, cdr_sync_message_type(), handle_cdr_sync_message, NULL); active_cdrs_master = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, - NUM_CDR_BUCKETS, cdr_master_hash_fn, NULL, cdr_master_cmp_fn); + AST_NUM_CHANNEL_BUCKETS, cdr_master_hash_fn, NULL, cdr_master_cmp_fn); if (!active_cdrs_master) { return AST_MODULE_LOAD_FAILURE; } ao2_container_register("cdrs_master", active_cdrs_master, cdr_master_print_fn); active_cdrs_all = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, - NUM_CDR_BUCKETS, cdr_all_hash_fn, NULL, cdr_all_cmp_fn); + AST_NUM_CHANNEL_BUCKETS, cdr_all_hash_fn, NULL, cdr_all_cmp_fn); if (!active_cdrs_all) { return AST_MODULE_LOAD_FAILURE; } diff --git a/main/cel.c b/main/cel.c index feb3bee865794fb46804125067704727109ad4a9..97e35adf55abe7af094ceb74f4a40c61db80c868 100644 --- a/main/cel.c +++ b/main/cel.c @@ -888,14 +888,6 @@ static void cel_channel_state_change( { int is_hungup, was_hungup; - if (!new_snapshot) { - cel_report_event(old_snapshot, AST_CEL_CHANNEL_END, NULL, NULL, NULL); - if (ast_cel_track_event(AST_CEL_LINKEDID_END)) { - check_retire_linkedid(old_snapshot); - } - return; - } - if (!old_snapshot) { cel_report_event(new_snapshot, AST_CEL_CHANNEL_START, NULL, NULL, NULL); return; @@ -915,6 +907,11 @@ static void cel_channel_state_change( cel_report_event(new_snapshot, AST_CEL_HANGUP, NULL, extra, NULL); ast_json_unref(extra); ao2_cleanup(dialstatus); + + cel_report_event(new_snapshot, AST_CEL_CHANNEL_END, NULL, NULL, NULL); + if (ast_cel_track_event(AST_CEL_LINKEDID_END)) { + check_retire_linkedid(new_snapshot); + } return; } @@ -928,7 +925,7 @@ static void cel_channel_linkedid_change( struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot) { - if (!old_snapshot || !new_snapshot) { + if (!old_snapshot) { return; } @@ -946,8 +943,7 @@ static void cel_channel_app_change( struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot) { - if (new_snapshot && old_snapshot - && !strcmp(old_snapshot->appl, new_snapshot->appl)) { + if (old_snapshot && !strcmp(old_snapshot->appl, new_snapshot->appl)) { return; } @@ -957,7 +953,7 @@ static void cel_channel_app_change( } /* new snapshot has an application, start it */ - if (new_snapshot && !ast_strlen_zero(new_snapshot->appl)) { + if (!ast_strlen_zero(new_snapshot->appl)) { cel_report_event(new_snapshot, AST_CEL_APP_START, NULL, NULL, NULL); } } @@ -984,22 +980,15 @@ static int cel_filter_channel_snapshot(struct ast_channel_snapshot *snapshot) static void cel_snapshot_update_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message) { - struct stasis_cache_update *update = stasis_message_data(message); - if (ast_channel_snapshot_type() == update->type) { - struct ast_channel_snapshot *old_snapshot; - struct ast_channel_snapshot *new_snapshot; - size_t i; - - old_snapshot = stasis_message_data(update->old_snapshot); - new_snapshot = stasis_message_data(update->new_snapshot); + struct ast_channel_snapshot_update *update = stasis_message_data(message); + size_t i; - if (cel_filter_channel_snapshot(old_snapshot) || cel_filter_channel_snapshot(new_snapshot)) { - return; - } + if (cel_filter_channel_snapshot(update->old_snapshot) || cel_filter_channel_snapshot(update->new_snapshot)) { + return; + } - for (i = 0; i < ARRAY_LEN(cel_channel_monitors); ++i) { - cel_channel_monitors[i](old_snapshot, new_snapshot); - } + for (i = 0; i < ARRAY_LEN(cel_channel_monitors); ++i) { + cel_channel_monitors[i](update->old_snapshot, update->new_snapshot); } } @@ -1453,7 +1442,7 @@ static int create_subscriptions(void) } cel_channel_forwarder = stasis_forward_all( - ast_channel_topic_all_cached(), + ast_channel_topic_all(), cel_aggregation_topic); if (!cel_channel_forwarder) { return -1; @@ -1498,7 +1487,7 @@ static int create_routes(void) 6 * AST_TASKPROCESSOR_HIGH_WATER_LEVEL); ret |= stasis_message_router_add(cel_state_router, - stasis_cache_update_type(), + ast_channel_snapshot_type(), cel_snapshot_update_cb, NULL); diff --git a/main/channel.c b/main/channel.c index 6c6e9f758b4e12a321249b679362e4ad3ca34303..3d8e244fd8c2228fdd94833f08d93d53d1990f0a 100644 --- a/main/channel.c +++ b/main/channel.c @@ -116,12 +116,6 @@ struct chanlist { /*! \brief the list of registered channel types */ static AST_RWLIST_HEAD_STATIC(backends, chanlist); -#ifdef LOW_MEMORY -#define NUM_CHANNEL_BUCKETS 61 -#else -#define NUM_CHANNEL_BUCKETS 1567 -#endif - /*! \brief All active channels on the system */ static struct ao2_container *channels; @@ -635,38 +629,6 @@ int ast_str2cause(const char *name) return -1; } -static struct stasis_message *create_channel_snapshot_message(struct ast_channel *channel) -{ - RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup); - - if (!ast_channel_snapshot_type()) { - return NULL; - } - - ast_channel_lock(channel); - snapshot = ast_channel_snapshot_create(channel); - ast_channel_unlock(channel); - if (!snapshot) { - return NULL; - } - - return stasis_message_create(ast_channel_snapshot_type(), snapshot); -} - -static void publish_cache_clear(struct ast_channel *chan) -{ - RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); - RAII_VAR(struct stasis_message *, clear_msg, NULL, ao2_cleanup); - - clear_msg = create_channel_snapshot_message(chan); - if (!clear_msg) { - return; - } - - message = stasis_cache_clear_create(clear_msg); - stasis_publish(ast_channel_topic(chan), message); -} - /*! \brief Gives the string form of a given channel state. * * \note This function is not reentrant. @@ -1236,7 +1198,9 @@ int ast_queue_hold(struct ast_channel *chan, const char *musicclass) "musicclass", musicclass); } - ast_channel_publish_cached_blob(chan, ast_channel_hold_type(), blob); + ast_channel_lock(chan); + ast_channel_publish_blob(chan, ast_channel_hold_type(), blob); + ast_channel_unlock(chan); res = ast_queue_frame(chan, &f); @@ -1250,7 +1214,9 @@ int ast_queue_unhold(struct ast_channel *chan) struct ast_frame f = { AST_FRAME_CONTROL, .subclass.integer = AST_CONTROL_UNHOLD }; int res; - ast_channel_publish_cached_blob(chan, ast_channel_unhold_type(), NULL); + ast_channel_lock(chan); + ast_channel_publish_blob(chan, ast_channel_unhold_type(), NULL); + ast_channel_unlock(chan); res = ast_queue_frame(chan, &f); @@ -2230,9 +2196,8 @@ static void ast_channel_destructor(void *obj) ast_assert(!ast_test_flag(ast_channel_flags(chan), AST_FLAG_SNAPSHOT_STAGE)); ast_channel_lock(chan); - ast_channel_publish_snapshot(chan); + ast_channel_publish_final_snapshot(chan); ast_channel_unlock(chan); - publish_cache_clear(chan); } ast_channel_lock(chan); @@ -3344,7 +3309,7 @@ static void send_dtmf_begin_event(struct ast_channel *chan, return; } - ast_channel_publish_cached_blob(chan, ast_channel_dtmf_begin_type(), blob); + ast_channel_publish_blob(chan, ast_channel_dtmf_begin_type(), blob); } static void send_dtmf_end_event(struct ast_channel *chan, @@ -3361,7 +3326,7 @@ static void send_dtmf_end_event(struct ast_channel *chan, return; } - ast_channel_publish_cached_blob(chan, ast_channel_dtmf_end_type(), blob); + ast_channel_publish_blob(chan, ast_channel_dtmf_end_type(), blob); } static void ast_read_generator_actions(struct ast_channel *chan, struct ast_frame *f) @@ -6819,6 +6784,9 @@ static void channel_do_masquerade(struct ast_channel *original, struct ast_chann /* Make sure the Stasis topic on the channel is updated appropriately */ ast_channel_internal_swap_topics(clonechan, original); + /* The old snapshots need to follow the channels so the snapshot update is correct */ + ast_channel_internal_swap_snapshots(clonechan, original); + /* Swap channel names. This uses ast_channel_name_set directly, so we * don't get any spurious rename events. */ @@ -7246,7 +7214,7 @@ int ast_setstate(struct ast_channel *chan, enum ast_channel_state state) ast_channel_state_set(chan, state); - ast_publish_channel_state(chan); + ast_channel_publish_snapshot(chan); /* We have to pass AST_DEVICE_UNKNOWN here because it is entirely possible that the channel driver * for this channel is using the callback method for device state. If we pass in an actual state here @@ -7856,7 +7824,7 @@ static void channels_shutdown(void) int ast_channels_init(void) { - channels = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, NUM_CHANNEL_BUCKETS, + channels = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, AST_NUM_CHANNEL_BUCKETS, ast_channel_hash_cb, NULL, ast_channel_cmp_cb); if (!channels) { return -1; diff --git a/main/channel_internal_api.c b/main/channel_internal_api.c index a963a7d3ea03e13a30b7e1d2e454fdee42390468..436ba23034700b9aff2cacc277dd9eeb66031042 100644 --- a/main/channel_internal_api.c +++ b/main/channel_internal_api.c @@ -42,7 +42,6 @@ #include "asterisk/channel_internal.h" #include "asterisk/endpoints.h" #include "asterisk/indications.h" -#include "asterisk/stasis_cache_pattern.h" #include "asterisk/stasis_channels.h" #include "asterisk/stasis_endpoints.h" #include "asterisk/stringfields.h" @@ -215,12 +214,13 @@ struct ast_channel { char dtmf_digit_to_emulate; /*!< Digit being emulated */ char sending_dtmf_digit; /*!< Digit this channel is currently sending out. (zero if not sending) */ struct timeval sending_dtmf_tv; /*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */ - struct stasis_cp_single *topics; /*!< Topic for all channel's events */ + struct stasis_topic *topic; /*!< Topic for trhis channel */ + struct stasis_forward *channel_forward; /*!< Subscription for event forwarding to all channel topic */ struct stasis_forward *endpoint_forward; /*!< Subscription for event forwarding to endpoint's topic */ - struct stasis_forward *endpoint_cache_forward; /*!< Subscription for cache updates to endpoint's topic */ struct ast_stream_topology *stream_topology; /*!< Stream topology */ void *stream_topology_change_source; /*!< Source that initiated a stream topology change */ struct ast_stream *default_streams[AST_MEDIA_TYPE_END]; /*!< Default streams indexed by media type */ + struct ast_channel_snapshot *snapshot; /*!< The current up to date snapshot of the channel */ }; /*! \brief The monotonically increasing integer counter for channel uniqueids */ @@ -1381,11 +1381,25 @@ void ast_channel_internal_swap_uniqueid_and_linkedid(struct ast_channel *a, stru void ast_channel_internal_swap_topics(struct ast_channel *a, struct ast_channel *b) { - struct stasis_cp_single *temp; + struct stasis_topic *topic; + struct stasis_forward *forward; - temp = a->topics; - a->topics = b->topics; - b->topics = temp; + topic = a->topic; + a->topic = b->topic; + b->topic = topic; + + forward = a->channel_forward; + a->channel_forward = b->channel_forward; + b->channel_forward = forward; +} + +void ast_channel_internal_swap_snapshots(struct ast_channel *a, struct ast_channel *b) +{ + struct ast_channel_snapshot *snapshot; + + snapshot = a->snapshot; + a->snapshot = b->snapshot; + b->snapshot = snapshot; } void ast_channel_internal_set_fake_ids(struct ast_channel *chan, const char *uniqueid, const char *linkedid) @@ -1404,11 +1418,11 @@ void ast_channel_internal_cleanup(struct ast_channel *chan) ast_string_field_free_memory(chan); + chan->channel_forward = stasis_forward_cancel(chan->channel_forward); chan->endpoint_forward = stasis_forward_cancel(chan->endpoint_forward); - chan->endpoint_cache_forward = stasis_forward_cancel(chan->endpoint_cache_forward); - stasis_cp_single_unsubscribe(chan->topics); - chan->topics = NULL; + ao2_cleanup(chan->topic); + chan->topic = NULL; ast_channel_internal_set_stream_topology(chan, NULL); @@ -1431,16 +1445,7 @@ struct stasis_topic *ast_channel_topic(struct ast_channel *chan) return ast_channel_topic_all(); } - return stasis_cp_single_topic(chan->topics); -} - -struct stasis_topic *ast_channel_topic_cached(struct ast_channel *chan) -{ - if (!chan) { - return ast_channel_topic_all_cached(); - } - - return stasis_cp_single_topic_cached(chan->topics); + return chan->topic; } int ast_channel_forward_endpoint(struct ast_channel *chan, @@ -1456,28 +1461,28 @@ int ast_channel_forward_endpoint(struct ast_channel *chan, return -1; } - chan->endpoint_cache_forward = stasis_forward_all(ast_channel_topic_cached(chan), - ast_endpoint_topic(endpoint)); - if (!chan->endpoint_cache_forward) { - chan->endpoint_forward = stasis_forward_cancel(chan->endpoint_forward); - return -1; - } - return 0; } int ast_channel_internal_setup_topics(struct ast_channel *chan) { const char *topic_name = chan->uniqueid.unique_id; - ast_assert(chan->topics == NULL); + ast_assert(chan->topic == NULL); if (ast_strlen_zero(topic_name)) { topic_name = "<dummy-channel>"; } - chan->topics = stasis_cp_single_create( - ast_channel_cache_all(), topic_name); - if (!chan->topics) { + chan->topic = stasis_topic_create(topic_name); + if (!chan->topic) { + return -1; + } + + chan->channel_forward = stasis_forward_all(ast_channel_topic(chan), + ast_channel_topic_all()); + if (!chan->channel_forward) { + ao2_ref(chan->topic, -1); + chan->topic = NULL; return -1; } @@ -1568,3 +1573,14 @@ int ast_channel_is_multistream(struct ast_channel *chan) { return (chan->tech && chan->tech->read_stream && chan->tech->write_stream); } + +struct ast_channel_snapshot *ast_channel_snapshot(const struct ast_channel *chan) +{ + return chan->snapshot; +} + +void ast_channel_snapshot_set(struct ast_channel *chan, struct ast_channel_snapshot *snapshot) +{ + ao2_cleanup(chan->snapshot); + chan->snapshot = ao2_bump(snapshot); +} diff --git a/main/cli.c b/main/cli.c index cf51d0d9521c4a693372a9deaf56628f0d4f84b8..5484e47e7ef2986a7b1a3d600bfe428111819acd 100644 --- a/main/cli.c +++ b/main/cli.c @@ -956,7 +956,7 @@ static char *handle_chanlist(struct ast_cli_entry *e, int cmd, struct ast_cli_ar struct ao2_container *channels; struct ao2_iterator it_chans; - struct stasis_message *msg; + struct ast_channel_snapshot *cs; int numchans = 0, concise = 0, verbose = 0, count = 0; switch (cmd) { @@ -989,11 +989,7 @@ static char *handle_chanlist(struct ast_cli_entry *e, int cmd, struct ast_cli_ar } else if (a->argc != e->args - 1) return CLI_SHOWUSAGE; - - if (!(channels = stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type()))) { - ast_cli(a->fd, "Failed to retrieve cached channels\n"); - return CLI_SUCCESS; - } + channels = ast_channel_cache_by_name(); if (!count) { if (!concise && !verbose) @@ -1004,8 +1000,7 @@ static char *handle_chanlist(struct ast_cli_entry *e, int cmd, struct ast_cli_ar } it_chans = ao2_iterator_init(channels, 0); - for (; (msg = ao2_iterator_next(&it_chans)); ao2_ref(msg, -1)) { - struct ast_channel_snapshot *cs = stasis_message_data(msg); + for (; (cs = ao2_iterator_next(&it_chans)); ao2_ref(cs, -1)) { char durbuf[16] = "-"; if (!count) { @@ -1679,29 +1674,25 @@ char *ast_complete_channels(const char *line, const char *word, int pos, int sta struct ao2_container *cached_channels; char *ret = NULL; struct ao2_iterator iter; - struct stasis_message *msg; + struct ast_channel_snapshot *snapshot; if (pos != rpos) { return NULL; } - if (!(cached_channels = stasis_cache_dump(ast_channel_cache(), ast_channel_snapshot_type()))) { - return NULL; - } + cached_channels = ast_channel_cache_all(); iter = ao2_iterator_init(cached_channels, 0); - for (; (msg = ao2_iterator_next(&iter)); ao2_ref(msg, -1)) { - struct ast_channel_snapshot *snapshot = stasis_message_data(msg); - + for (; (snapshot = ao2_iterator_next(&iter)); ao2_ref(snapshot, -1)) { if (!strncasecmp(word, snapshot->name, wordlen) && (++which > state)) { if (state != -1) { ret = ast_strdup(snapshot->name); - ao2_ref(msg, -1); + ao2_ref(snapshot, -1); break; } if (ast_cli_completion_add(ast_strdup(snapshot->name))) { - ao2_ref(msg, -1); + ao2_ref(snapshot, -1); break; } } diff --git a/main/endpoints.c b/main/endpoints.c index 030e26cb5b32ac36d697401af20c46650d7a97a3..f3e337225d662dec94602ce18ccdfb950cf58897 100644 --- a/main/endpoints.c +++ b/main/endpoints.c @@ -179,25 +179,23 @@ int ast_endpoint_add_channel(struct ast_endpoint *endpoint, return 0; } -/*! \brief Handler for channel snapshot cache clears */ +/*! \brief Handler for channel snapshot update */ static void endpoint_cache_clear(void *data, struct stasis_subscription *sub, struct stasis_message *message) { struct ast_endpoint *endpoint = data; - struct stasis_message *clear_msg = stasis_message_data(message); - struct ast_channel_snapshot *clear_snapshot; + struct ast_channel_snapshot_update *update = stasis_message_data(message); - if (stasis_message_type(clear_msg) != ast_channel_snapshot_type()) { + /* Only when the channel is dead do we remove it */ + if (!ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) { return; } - clear_snapshot = stasis_message_data(clear_msg); - ast_assert(endpoint != NULL); ao2_lock(endpoint); - ast_str_container_remove(endpoint->channel_ids, clear_snapshot->uniqueid); + ast_str_container_remove(endpoint->channel_ids, update->new_snapshot->uniqueid); ao2_unlock(endpoint); endpoint_publish_snapshot(endpoint); } @@ -271,7 +269,7 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha return NULL; } r |= stasis_message_router_add(endpoint->router, - stasis_cache_clear_type(), endpoint_cache_clear, + ast_channel_snapshot_type(), endpoint_cache_clear, endpoint); r |= stasis_message_router_add(endpoint->router, stasis_subscription_change_type(), endpoint_subscription_change, diff --git a/main/manager.c b/main/manager.c index 0469a737eeee0581d8c353b2bc4a840ac6c95342..76a827c09c85578354288e5f410a7cfa6eb95dda 100644 --- a/main/manager.c +++ b/main/manager.c @@ -6250,7 +6250,7 @@ static int action_coreshowchannels(struct mansession *s, const struct message *m int numchans = 0; struct ao2_container *channels; struct ao2_iterator it_chans; - struct stasis_message *msg; + struct ast_channel_snapshot *cs; if (!ast_strlen_zero(actionid)) { snprintf(idText, sizeof(idText), "ActionID: %s\r\n", actionid); @@ -6258,17 +6258,12 @@ static int action_coreshowchannels(struct mansession *s, const struct message *m idText[0] = '\0'; } - channels = stasis_cache_dump(ast_channel_cache_by_name(), ast_channel_snapshot_type()); - if (!channels) { - astman_send_error(s, m, "Could not get cached channels"); - return 0; - } + channels = ast_channel_cache_by_name(); astman_send_listack(s, m, "Channels will follow", "start"); it_chans = ao2_iterator_init(channels, 0); - for (; (msg = ao2_iterator_next(&it_chans)); ao2_ref(msg, -1)) { - struct ast_channel_snapshot *cs = stasis_message_data(msg); + for (; (cs = ao2_iterator_next(&it_chans)); ao2_ref(cs, -1)) { struct ast_str *built = ast_manager_build_channel_state_string_prefix(cs, ""); char durbuf[16] = ""; diff --git a/main/manager_bridges.c b/main/manager_bridges.c index b7059f40c8cb19203ca47350a43d7841b6188c22..1b5704968f087582adf18bd214d4fd046288c730 100644 --- a/main/manager_bridges.c +++ b/main/manager_bridges.c @@ -528,17 +528,14 @@ static int send_bridge_info_item_cb(void *obj, void *arg, void *data, int flags) char *uniqueid = obj; struct mansession *s = arg; struct bridge_list_data *list_data = data; - RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); - struct ast_channel_snapshot *snapshot; + RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup); RAII_VAR(struct ast_str *, channel_text, NULL, ast_free); - msg = stasis_cache_get(ast_channel_cache(), - ast_channel_snapshot_type(), uniqueid); - if (!msg) { + snapshot = ast_channel_snapshot_get_latest(uniqueid); + if (!snapshot) { return 0; } - snapshot = stasis_message_data(msg); if (snapshot->tech_properties & AST_CHAN_TP_INTERNAL) { return 0; } diff --git a/main/manager_channels.c b/main/manager_channels.c index ac09d42a33d92ba2e8502fb309ce866d994a2ff7..887f77e19351244bf7ae713b860c03dc1f171761 100644 --- a/main/manager_channels.c +++ b/main/manager_channels.c @@ -576,11 +576,6 @@ static struct ast_manager_event_blob *channel_state_change( { int is_hungup, was_hungup; - if (!new_snapshot) { - /* Ignore cache clearing events; we'll see the hangup first */ - return NULL; - } - /* The Newchannel, Newstate and Hangup events are closely related, in * in that they are mutually exclusive, basically different flavors * of a new channel state event. @@ -616,11 +611,6 @@ static struct ast_manager_event_blob *channel_newexten( struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot) { - /* No Newexten event on cache clear */ - if (!new_snapshot) { - return NULL; - } - /* Empty application is not valid for a Newexten event */ if (ast_strlen_zero(new_snapshot->appl)) { return NULL; @@ -654,8 +644,8 @@ static struct ast_manager_event_blob *channel_new_callerid( struct ast_manager_event_blob *res; char *callerid; - /* No NewCallerid event on cache clear or first event */ - if (!old_snapshot || !new_snapshot) { + /* No NewCallerid event on first channel snapshot */ + if (!old_snapshot) { return NULL; } @@ -682,8 +672,8 @@ static struct ast_manager_event_blob *channel_new_connected_line( struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot) { - /* No NewConnectedLine event on cache clear or first event */ - if (!old_snapshot || !new_snapshot) { + /* No NewConnectedLine event on first channel snapshot */ + if (!old_snapshot) { return NULL; } @@ -699,7 +689,7 @@ static struct ast_manager_event_blob *channel_new_accountcode( struct ast_channel_snapshot *old_snapshot, struct ast_channel_snapshot *new_snapshot) { - if (!old_snapshot || !new_snapshot) { + if (!old_snapshot) { return NULL; } @@ -724,21 +714,14 @@ static void channel_snapshot_update(void *data, struct stasis_subscription *sub, struct stasis_message *message) { RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free); - struct stasis_cache_update *update; - struct ast_channel_snapshot *old_snapshot; - struct ast_channel_snapshot *new_snapshot; + struct ast_channel_snapshot_update *update; size_t i; update = stasis_message_data(message); - ast_assert(ast_channel_snapshot_type() == update->type); - - old_snapshot = stasis_message_data(update->old_snapshot); - new_snapshot = stasis_message_data(update->new_snapshot); - for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) { RAII_VAR(struct ast_manager_event_blob *, ev, NULL, ao2_cleanup); - ev = channel_monitors[i](old_snapshot, new_snapshot); + ev = channel_monitors[i](update->old_snapshot, update->new_snapshot); if (!ev) { continue; @@ -747,7 +730,7 @@ static void channel_snapshot_update(void *data, struct stasis_subscription *sub, /* If we haven't already, build the channel event string */ if (!channel_event_string) { channel_event_string = - ast_manager_build_channel_state_string(new_snapshot); + ast_manager_build_channel_state_string(update->new_snapshot); if (!channel_event_string) { return; } @@ -1260,7 +1243,7 @@ int manager_channels_init(void) if (!message_router) { return -1; } - channel_topic = ast_channel_topic_all_cached(); + channel_topic = ast_channel_topic_all(); if (!channel_topic) { return -1; } @@ -1272,7 +1255,7 @@ int manager_channels_init(void) ast_register_cleanup(manager_channels_shutdown); - ret |= stasis_message_router_add_cache_update(message_router, + ret |= stasis_message_router_add(message_router, ast_channel_snapshot_type(), channel_snapshot_update, NULL); ret |= stasis_message_router_add(message_router, diff --git a/main/stasis_channels.c b/main/stasis_channels.c index 0da80057d95531aa43286d198c9131e694a77458..ec8d70cd22f284a01840a5f30918b13fcd01cc8d 100644 --- a/main/stasis_channels.c +++ b/main/stasis_channels.c @@ -36,7 +36,6 @@ #include "asterisk/bridge.h" #include "asterisk/translate.h" #include "asterisk/stasis.h" -#include "asterisk/stasis_cache_pattern.h" #include "asterisk/stasis_channels.h" #include "asterisk/dial.h" #include "asterisk/linkedlists.h" @@ -117,60 +116,83 @@ #define NUM_MULTI_CHANNEL_BLOB_BUCKETS 7 -static struct stasis_cp_all *channel_cache_all; -static struct stasis_cache *channel_cache_by_name; -static struct stasis_caching_topic *channel_by_name_topic; +static struct stasis_topic *channel_topic_all; +static struct ao2_container *channel_cache; +static struct ao2_container *channel_cache_by_name; -struct stasis_cp_all *ast_channel_cache_all(void) +struct ao2_container *ast_channel_cache_all(void) { - return channel_cache_all; -} - -struct stasis_cache *ast_channel_cache(void) -{ - return stasis_cp_all_cache(channel_cache_all); + return ao2_bump(channel_cache); } struct stasis_topic *ast_channel_topic_all(void) { - return stasis_cp_all_topic(channel_cache_all); + return channel_topic_all; } -struct stasis_topic *ast_channel_topic_all_cached(void) +struct ao2_container *ast_channel_cache_by_name(void) { - return stasis_cp_all_topic_cached(channel_cache_all); + return ao2_bump(channel_cache_by_name); } -struct stasis_cache *ast_channel_cache_by_name(void) +/*! + * \internal + * \brief Hash function for \ref ast_channel_snapshot objects + */ +static int channel_snapshot_hash_cb(const void *obj, const int flags) { - return channel_cache_by_name; -} + const struct ast_channel_snapshot *object = obj; + const char *key; -static const char *channel_snapshot_get_id(struct stasis_message *message) -{ - struct ast_channel_snapshot *snapshot; - if (ast_channel_snapshot_type() != stasis_message_type(message)) { - return NULL; + switch (flags & OBJ_SEARCH_MASK) { + case OBJ_SEARCH_KEY: + key = obj; + break; + case OBJ_SEARCH_OBJECT: + key = object->name; + break; + default: + ast_assert(0); + return 0; } - snapshot = stasis_message_data(message); - return snapshot->uniqueid; + return ast_str_case_hash(key); } -static const char *channel_snapshot_get_name(struct stasis_message *message) +/*! + * \internal + * \brief Comparison function for \ref ast_channel_snapshot objects + */ +static int channel_snapshot_cmp_cb(void *obj, void *arg, int flags) { - struct ast_channel_snapshot *snapshot; - if (ast_channel_snapshot_type() != stasis_message_type(message)) { - return NULL; + const struct ast_channel_snapshot *object_left = obj; + const struct ast_channel_snapshot *object_right = arg; + const char *right_key = arg; + int cmp; + + switch (flags & OBJ_SEARCH_MASK) { + case OBJ_SEARCH_OBJECT: + right_key = object_right->name; + case OBJ_SEARCH_KEY: + cmp = strcasecmp(object_left->name, right_key); + break; + case OBJ_SEARCH_PARTIAL_KEY: + cmp = strncasecmp(object_left->name, right_key, strlen(right_key)); + break; + default: + cmp = 0; + break; } - snapshot = stasis_message_data(message); - return snapshot->name; + if (cmp) { + return 0; + } + return CMP_MATCH; } /*! * \internal - * \brief Hash function for \ref ast_channel_snapshot objects + * \brief Hash function (using uniqueid) for \ref ast_channel_snapshot objects */ -static int channel_snapshot_hash_cb(const void *obj, const int flags) +static int channel_snapshot_uniqueid_hash_cb(const void *obj, const int flags) { const struct ast_channel_snapshot *object = obj; const char *key; @@ -180,7 +202,7 @@ static int channel_snapshot_hash_cb(const void *obj, const int flags) key = obj; break; case OBJ_SEARCH_OBJECT: - key = object->name; + key = object->uniqueid; break; default: ast_assert(0); @@ -191,9 +213,9 @@ static int channel_snapshot_hash_cb(const void *obj, const int flags) /*! * \internal - * \brief Comparison function for \ref ast_channel_snapshot objects + * \brief Comparison function (using uniqueid) for \ref ast_channel_snapshot objects */ -static int channel_snapshot_cmp_cb(void *obj, void *arg, int flags) +static int channel_snapshot_uniqueid_cmp_cb(void *obj, void *arg, int flags) { const struct ast_channel_snapshot *object_left = obj; const struct ast_channel_snapshot *object_right = arg; @@ -202,12 +224,12 @@ static int channel_snapshot_cmp_cb(void *obj, void *arg, int flags) switch (flags & OBJ_SEARCH_MASK) { case OBJ_SEARCH_OBJECT: - right_key = object_right->name; + right_key = object_right->uniqueid; case OBJ_SEARCH_KEY: - cmp = strcasecmp(object_left->name, right_key); + cmp = strcasecmp(object_left->uniqueid, right_key); break; case OBJ_SEARCH_PARTIAL_KEY: - cmp = strncasecmp(object_left->name, right_key, strlen(right_key)); + cmp = strncasecmp(object_left->uniqueid, right_key, strlen(right_key)); break; default: cmp = 0; @@ -309,6 +331,34 @@ struct ast_channel_snapshot *ast_channel_snapshot_create(struct ast_channel *cha return snapshot; } +static void channel_snapshot_update_dtor(void *obj) +{ + struct ast_channel_snapshot_update *update = obj; + + ao2_cleanup(update->old_snapshot); + ao2_cleanup(update->new_snapshot); +} + +static struct ast_channel_snapshot_update *channel_snapshot_update_create(struct ast_channel *chan) +{ + struct ast_channel_snapshot_update *update; + + update = ao2_alloc_options(sizeof(*update), channel_snapshot_update_dtor, + AO2_ALLOC_OPT_LOCK_NOLOCK); + if (!update) { + return NULL; + } + + update->old_snapshot = ao2_bump(ast_channel_snapshot(chan)); + update->new_snapshot = ast_channel_snapshot_create(chan); + if (!update->new_snapshot) { + ao2_ref(update, -1); + return NULL; + } + + return update; +} + static void publish_message_for_channel_topics(struct stasis_message *message, struct ast_channel *chan) { if (chan) { @@ -521,7 +571,7 @@ struct stasis_message *ast_channel_blob_create(struct ast_channel *chan, return NULL; } - snapshot = chan ? ast_channel_snapshot_create(chan) : NULL; + snapshot = chan ? ao2_bump(ast_channel_snapshot(chan)) : NULL; msg = create_channel_blob_message(snapshot, type, blob); ao2_cleanup(snapshot); return msg; @@ -628,38 +678,48 @@ struct ast_multi_channel_blob *ast_multi_channel_blob_create(struct ast_json *bl struct ast_channel_snapshot *ast_channel_snapshot_get_latest(const char *uniqueid) { - struct stasis_message *message; - struct ast_channel_snapshot *snapshot; - ast_assert(!ast_strlen_zero(uniqueid)); - message = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(), - uniqueid); - if (!message) { - return NULL; - } - - snapshot = ao2_bump(stasis_message_data(message)); - ao2_ref(message, -1); - return snapshot; + return ao2_find(channel_cache, uniqueid, OBJ_SEARCH_KEY); } struct ast_channel_snapshot *ast_channel_snapshot_get_latest_by_name(const char *name) { + ast_assert(!ast_strlen_zero(name)); + + return ao2_find(channel_cache_by_name, name, OBJ_SEARCH_KEY); +} + +void ast_channel_publish_final_snapshot(struct ast_channel *chan) +{ + struct ast_channel_snapshot_update *update; struct stasis_message *message; - struct ast_channel_snapshot *snapshot; - ast_assert(!ast_strlen_zero(name)); + if (!ast_channel_snapshot_type()) { + return; + } - message = stasis_cache_get(ast_channel_cache_by_name(), ast_channel_snapshot_type(), - name); + update = channel_snapshot_update_create(chan); + if (!update) { + return; + } + + message = stasis_message_create(ast_channel_snapshot_type(), update); + /* In the success path message holds a reference to update so it will be valid + * for the lifetime of this function until the end. + */ + ao2_ref(update, -1); if (!message) { - return NULL; + return; } - snapshot = ao2_bump(stasis_message_data(message)); + ao2_unlink(channel_cache, update->old_snapshot); + ao2_unlink(channel_cache_by_name, update->old_snapshot); + + ast_channel_snapshot_set(chan, NULL); + + stasis_publish(ast_channel_topic(chan), message); ao2_ref(message, -1); - return snapshot; } static void channel_role_snapshot_dtor(void *obj) @@ -764,7 +824,7 @@ void ast_channel_stage_snapshot_done(struct ast_channel *chan) void ast_channel_publish_snapshot(struct ast_channel *chan) { - struct ast_channel_snapshot *snapshot; + struct ast_channel_snapshot_update *update; struct stasis_message *message; if (!ast_channel_snapshot_type()) { @@ -775,17 +835,40 @@ void ast_channel_publish_snapshot(struct ast_channel *chan) return; } - snapshot = ast_channel_snapshot_create(chan); - if (!snapshot) { + update = channel_snapshot_update_create(chan); + if (!update) { return; } - message = stasis_message_create(ast_channel_snapshot_type(), snapshot); - ao2_ref(snapshot, -1); + message = stasis_message_create(ast_channel_snapshot_type(), update); + /* In the success path message holds a reference to update so it will be valid + * for the lifetime of this function until the end. + */ + ao2_ref(update, -1); if (!message) { return; } + /* We lock these ourselves so that the update is atomic and there isn't time where a + * snapshot is not in the cache. + */ + ao2_wrlock(channel_cache); + if (update->old_snapshot) { + ao2_unlink_flags(channel_cache, update->old_snapshot, OBJ_NOLOCK); + } + ao2_link_flags(channel_cache, update->new_snapshot, OBJ_NOLOCK); + ao2_unlock(channel_cache); + + /* The same applies here. */ + ao2_wrlock(channel_cache_by_name); + if (update->old_snapshot) { + ao2_unlink_flags(channel_cache_by_name, update->old_snapshot, OBJ_NOLOCK); + } + ao2_link_flags(channel_cache_by_name, update->new_snapshot, OBJ_NOLOCK); + ao2_unlock(channel_cache_by_name); + + ast_channel_snapshot_set(chan, update->new_snapshot); + ast_assert(ast_channel_topic(chan) != NULL); stasis_publish(ast_channel_topic(chan), message); ao2_ref(message, -1); @@ -841,13 +924,8 @@ void ast_channel_publish_varset(struct ast_channel *chan, const char *name, cons ast_channel_publish_snapshot(chan); } - if (chan) { - ast_channel_publish_cached_blob(chan, ast_channel_varset_type(), blob); - } else { - /* This function is NULL safe for global variables */ - ast_channel_publish_blob(NULL, ast_channel_varset_type(), blob); - } - + /* This function is NULL safe for global variables */ + ast_channel_publish_blob(chan, ast_channel_varset_type(), blob); ast_json_unref(blob); } @@ -931,36 +1009,6 @@ static struct ast_manager_event_blob *agent_logoff_to_ami(struct stasis_message return ev; } -void ast_publish_channel_state(struct ast_channel *chan) -{ - struct ast_channel_snapshot *snapshot; - struct stasis_message *message; - - if (!ast_channel_snapshot_type()) { - return; - } - - ast_assert(chan != NULL); - if (!chan) { - return; - } - - snapshot = ast_channel_snapshot_create(chan); - if (!snapshot) { - return; - } - - message = stasis_message_create(ast_channel_snapshot_type(), snapshot); - ao2_ref(snapshot, -1); - if (!message) { - return; - } - - ast_assert(ast_channel_topic(chan) != NULL); - stasis_publish(ast_channel_topic(chan), message); - ao2_ref(message, -1); -} - struct ast_json *ast_channel_snapshot_to_json( const struct ast_channel_snapshot *snapshot, const struct stasis_message_sanitizer *sanitize) @@ -1332,12 +1380,12 @@ STASIS_MESSAGE_TYPE_DEFN(ast_channel_talking_stop, static void stasis_channels_cleanup(void) { - stasis_caching_unsubscribe_and_join(channel_by_name_topic); - channel_by_name_topic = NULL; + ao2_cleanup(channel_topic_all); + channel_topic_all = NULL; + ao2_cleanup(channel_cache); + channel_cache = NULL; ao2_cleanup(channel_cache_by_name); channel_cache_by_name = NULL; - ao2_cleanup(channel_cache_all); - channel_cache_all = NULL; STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_snapshot_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dial_type); @@ -1367,29 +1415,28 @@ int ast_stasis_channels_init(void) ast_register_cleanup(stasis_channels_cleanup); - channel_cache_all = stasis_cp_all_create("ast_channel_topic_all", - channel_snapshot_get_id); - if (!channel_cache_all) { + channel_topic_all = stasis_topic_create("ast_channel_topic_all"); + if (!channel_topic_all) { return -1; } - res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_agent_login_type); - res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_agent_logoff_type); - channel_cache_by_name = stasis_cache_create(channel_snapshot_get_name); - if (!channel_cache_by_name) { + channel_cache = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, + 0, AST_NUM_CHANNEL_BUCKETS, channel_snapshot_uniqueid_hash_cb, + NULL, channel_snapshot_uniqueid_cmp_cb); + if (!channel_cache) { return -1; } - /* This should be initialized before the caching topic */ - res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_snapshot_type); - - channel_by_name_topic = stasis_caching_topic_create( - stasis_cp_all_topic(channel_cache_all), - channel_cache_by_name); - if (!channel_by_name_topic) { + channel_cache_by_name = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, + 0, AST_NUM_CHANNEL_BUCKETS, channel_snapshot_hash_cb, + NULL, channel_snapshot_cmp_cb); + if (!channel_cache_by_name) { return -1; } + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_agent_login_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_agent_logoff_type); + res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_snapshot_type); res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_dial_type); res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_varset_type); res |= STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_request_type); diff --git a/res/ari/resource_channels.c b/res/ari/resource_channels.c index bca32f1243439c73bc8392929b0cfd1b8c281727..f96192b52fa05049eebcf48cb97c594ae3e73122 100644 --- a/res/ari/resource_channels.c +++ b/res/ari/resource_channels.c @@ -833,32 +833,19 @@ void ast_ari_channels_get(struct ast_variable *headers, struct ast_ari_channels_get_args *args, struct ast_ari_response *response) { - RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); - struct stasis_cache *cache; struct ast_channel_snapshot *snapshot; - cache = ast_channel_cache(); - if (!cache) { - ast_ari_response_error( - response, 500, "Internal Server Error", - "Message bus not initialized"); - return; - } - - msg = stasis_cache_get(cache, ast_channel_snapshot_type(), - args->channel_id); - if (!msg) { + snapshot = ast_channel_snapshot_get_latest(args->channel_id); + if (!snapshot) { ast_ari_response_error( response, 404, "Not Found", "Channel not found"); return; } - snapshot = stasis_message_data(msg); - ast_assert(snapshot != NULL); - ast_ari_response_ok(response, ast_channel_snapshot_to_json(snapshot, NULL)); + ao2_ref(snapshot, -1); } void ast_ari_channels_hangup(struct ast_variable *headers, @@ -903,27 +890,13 @@ void ast_ari_channels_list(struct ast_variable *headers, struct ast_ari_channels_list_args *args, struct ast_ari_response *response) { - RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup); RAII_VAR(struct ao2_container *, snapshots, NULL, ao2_cleanup); RAII_VAR(struct ast_json *, json, NULL, ast_json_unref); struct ao2_iterator i; void *obj; struct stasis_message_sanitizer *sanitize = stasis_app_get_sanitizer(); - cache = ast_channel_cache(); - if (!cache) { - ast_ari_response_error( - response, 500, "Internal Server Error", - "Message bus not initialized"); - return; - } - ao2_ref(cache, +1); - - snapshots = stasis_cache_dump(cache, ast_channel_snapshot_type()); - if (!snapshots) { - ast_ari_response_alloc_failed(response); - return; - } + snapshots = ast_channel_cache_all(); json = ast_json_array_create(); if (!json) { @@ -933,12 +906,12 @@ void ast_ari_channels_list(struct ast_variable *headers, i = ao2_iterator_init(snapshots, 0); while ((obj = ao2_iterator_next(&i))) { - RAII_VAR(struct stasis_message *, msg, obj, ao2_cleanup); - struct ast_channel_snapshot *snapshot = stasis_message_data(msg); + struct ast_channel_snapshot *snapshot = obj; int r; if (sanitize && sanitize->channel_snapshot && sanitize->channel_snapshot(snapshot)) { + ao2_ref(snapshot, -1); continue; } @@ -947,8 +920,10 @@ void ast_ari_channels_list(struct ast_variable *headers, if (r != 0) { ast_ari_response_alloc_failed(response); ao2_iterator_destroy(&i); + ao2_ref(snapshot, -1); return; } + ao2_ref(snapshot, -1); } ao2_iterator_destroy(&i); diff --git a/res/res_agi.c b/res/res_agi.c index 0931c1a0727a760e050715680120c610ed7ec58d..e322d7ff61f11da51bb8e04e49a8fda3f182f2cd 100644 --- a/res/res_agi.c +++ b/res/res_agi.c @@ -3182,13 +3182,13 @@ static int handle_channelstatus(struct ast_channel *chan, AGI *agi, int argc, co ast_agi_send(agi->fd, chan, "200 result=%u\n", ast_channel_state(chan)); return RESULT_SUCCESS; } else if (argc == 3) { - RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + struct ast_channel_snapshot *snapshot; /* one argument: look for info on the specified channel */ - if ((msg = stasis_cache_get(ast_channel_cache_by_name(), ast_channel_snapshot_type(), argv[2]))) { - struct ast_channel_snapshot *snapshot = stasis_message_data(msg); - + snapshot = ast_channel_snapshot_get_latest_by_name(argv[2]); + if (snapshot) { ast_agi_send(agi->fd, chan, "200 result=%u\n", snapshot->state); + ao2_ref(snapshot, -1); return RESULT_SUCCESS; } /* if we get this far no channel name matched the argument given */ diff --git a/res/res_chan_stats.c b/res/res_chan_stats.c index dbc79f03ef073afe155b5ea8f73aeed501f05b5c..bed95a03fb067d807ad6c09d3e97265a20bf70a4 100644 --- a/res/res_chan_stats.c +++ b/res/res_chan_stats.c @@ -78,7 +78,7 @@ static void statsmaker(void *data, struct stasis_subscription *sub, } /*! - * \brief Router callback for \ref stasis_cache_update messages. + * \brief Router callback for \ref ast_channel_snapshot_update messages. * \param data Data pointer given when added to router. * \param sub This subscription. * \param topic The topic the message was posted to. This is not necessarily the @@ -92,34 +92,25 @@ static void updates(void *data, struct stasis_subscription *sub, /* Since this came from a message router, we know the type of the * message. We can cast the data without checking its type. */ - struct stasis_cache_update *update = stasis_message_data(message); + struct ast_channel_snapshot_update *update = stasis_message_data(message); - /* We're only interested in channel snapshots, so check the type - * of the underlying message. - */ - if (ast_channel_snapshot_type() != update->type) { - return; - } - - /* There are three types of cache updates. - * !old && new -> Initial cache entry - * old && new -> Updated cache entry - * old && !new -> Cache entry removed. + /* There are three types of channel snapshot updates. + * !old && new -> Initial channel creation + * old && new -> Updated channel snapshot + * old && dead -> Final channel snapshot */ if (!update->old_snapshot && update->new_snapshot) { - /* Initial cache entry; count a channel creation */ + /* Initial channel snapshot; count a channel creation */ ast_statsd_log_string("channels.count", AST_STATSD_GAUGE, "+1", 1.0); - } else if (update->old_snapshot && !update->new_snapshot) { - /* Cache entry removed. Compute the age of the channel and post + } else if (update->old_snapshot && ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) { + /* Channel is gone. Compute the age of the channel and post * that, as well as decrementing the channel count. */ - struct ast_channel_snapshot *last; int64_t age; - last = stasis_message_data(update->old_snapshot); age = ast_tvdiff_ms(*stasis_message_timestamp(message), - last->creationtime); + update->new_snapshot->creationtime); ast_statsd_log("channels.calltime", AST_STATSD_TIMER, age); /* And decrement the channel count */ @@ -161,11 +152,11 @@ static int load_module(void) { /* You can create a message router to route messages by type */ router = stasis_message_router_create( - ast_channel_topic_all_cached()); + ast_channel_topic_all()); if (!router) { return AST_MODULE_LOAD_DECLINE; } - stasis_message_router_add(router, stasis_cache_update_type(), + stasis_message_router_add(router, ast_channel_snapshot_type(), updates, NULL); stasis_message_router_set_default(router, default_route, NULL); diff --git a/res/stasis/app.c b/res/stasis/app.c index 18ac7d6edb6a922f23e95be88011ce7f6a034260..b4f3bc6cfc422413ede0122fb23dcd0e16b7572c 100644 --- a/res/stasis/app.c +++ b/res/stasis/app.c @@ -144,17 +144,11 @@ static struct app_forwards *forwards_create_channel(struct stasis_app *app, } forwards->forward_type = FORWARD_CHANNEL; - if (chan) { - forwards->topic_forward = stasis_forward_all(ast_channel_topic(chan), - app->topic); - } - forwards->topic_cached_forward = stasis_forward_all( - chan ? ast_channel_topic_cached(chan) : ast_channel_topic_all_cached(), + forwards->topic_forward = stasis_forward_all( + chan ? ast_channel_topic(chan) : ast_channel_topic_all(), app->topic); - if ((!forwards->topic_forward && chan) || !forwards->topic_cached_forward) { - /* Half-subscribed is a bad thing */ - forwards_unsubscribe(forwards); + if (!forwards->topic_forward) { ao2_ref(forwards, -1); return NULL; } @@ -420,7 +414,7 @@ static struct ast_json *channel_state( if (!old_snapshot) { return channel_created_event(snapshot, tv); - } else if (!new_snapshot) { + } else if (ast_test_flag(&new_snapshot->flags, AST_FLAG_DEAD)) { return channel_destroyed_event(snapshot, tv); } else if (old_snapshot->state != new_snapshot->state) { return channel_state_change_event(snapshot, tv); @@ -436,8 +430,8 @@ static struct ast_json *channel_dialplan( { struct ast_json *json_channel; - /* No Newexten event on cache clear or first event */ - if (!old_snapshot || !new_snapshot) { + /* No Newexten event on first channel snapshot */ + if (!old_snapshot) { return NULL; } @@ -470,8 +464,8 @@ static struct ast_json *channel_callerid( { struct ast_json *json_channel; - /* No NewCallerid event on cache clear or first event */ - if (!old_snapshot || !new_snapshot) { + /* No NewCallerid event on first channel snapshot */ + if (!old_snapshot) { return NULL; } @@ -500,8 +494,8 @@ static struct ast_json *channel_connected_line( { struct ast_json *json_channel; - /* No ChannelConnectedLine event on cache clear or first event */ - if (!old_snapshot || !new_snapshot) { + /* No ChannelConnectedLine event on first channel snapshot */ + if (!old_snapshot) { return NULL; } @@ -532,39 +526,22 @@ static void sub_channel_update_handler(void *data, struct stasis_message *message) { struct stasis_app *app = data; - struct stasis_cache_update *update; - struct ast_channel_snapshot *new_snapshot; - struct ast_channel_snapshot *old_snapshot; - const struct timeval *tv; + struct ast_channel_snapshot_update *update = stasis_message_data(message); int i; - ast_assert(stasis_message_type(message) == stasis_cache_update_type()); - - update = stasis_message_data(message); - - ast_assert(update->type == ast_channel_snapshot_type()); - - new_snapshot = stasis_message_data(update->new_snapshot); - old_snapshot = stasis_message_data(update->old_snapshot); - - /* Pull timestamp from the new snapshot, or from the update message - * when there isn't one. */ - tv = update->new_snapshot ? - stasis_message_timestamp(update->new_snapshot) : - stasis_message_timestamp(message); - for (i = 0; i < ARRAY_LEN(channel_monitors); ++i) { struct ast_json *msg; - msg = channel_monitors[i](old_snapshot, new_snapshot, tv); + msg = channel_monitors[i](update->old_snapshot, update->new_snapshot, + stasis_message_timestamp(message)); if (msg) { app_send(app, msg); ast_json_unref(msg); } } - if (!new_snapshot && old_snapshot) { - unsubscribe(app, "channel", old_snapshot->uniqueid, 1); + if (ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) { + unsubscribe(app, "channel", update->new_snapshot->uniqueid, 1); } } @@ -987,7 +964,7 @@ struct stasis_app *app_create(const char *name, stasis_app_cb handler, void *dat res |= stasis_message_router_add_cache_update(app->router, ast_bridge_snapshot_type(), sub_bridge_update_handler, app); - res |= stasis_message_router_add_cache_update(app->router, + res |= stasis_message_router_add(app->router, ast_channel_snapshot_type(), sub_channel_update_handler, app); res |= stasis_message_router_add_cache_update(app->router, diff --git a/res/stasis/control.c b/res/stasis/control.c index e4d007cbe7570133cee8f1571890bf4d754c8db4..5b3b048757c052c218e843109b04069fe9d606dd 100644 --- a/res/stasis/control.c +++ b/res/stasis/control.c @@ -773,22 +773,7 @@ void stasis_app_control_silence_stop(struct stasis_app_control *control) struct ast_channel_snapshot *stasis_app_control_get_snapshot( const struct stasis_app_control *control) { - struct stasis_message *msg; - struct ast_channel_snapshot *snapshot; - - msg = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(), - stasis_app_control_get_channel_id(control)); - if (!msg) { - return NULL; - } - - snapshot = stasis_message_data(msg); - ast_assert(snapshot != NULL); - - ao2_ref(snapshot, +1); - ao2_ref(msg, -1); - - return snapshot; + return ast_channel_snapshot_get_latest(stasis_app_control_get_channel_id(control)); } static int app_send_command_on_condition(struct stasis_app_control *control, diff --git a/tests/test_cel.c b/tests/test_cel.c index 0b17d488b6956995f3f61e4372376343537f696d..c1e73403bd7d86c8e528565f5e15b77d2233f53b 100644 --- a/tests/test_cel.c +++ b/tests/test_cel.c @@ -276,8 +276,7 @@ static void do_sleep(void) ast_hangup((channel)); \ HANGUP_EVENT(channel, cause, dialstatus); \ APPEND_EVENT(channel, AST_CEL_CHANNEL_END, NULL, NULL); \ - ao2_cleanup(stasis_cache_get(ast_channel_cache(), \ - ast_channel_snapshot_type(), ast_channel_uniqueid(channel))); \ + ao2_cleanup(ast_channel_snapshot_get_latest(ast_channel_uniqueid(channel))); \ ao2_cleanup(channel); \ channel = NULL; \ } while (0) diff --git a/tests/test_stasis_endpoints.c b/tests/test_stasis_endpoints.c index 9a49dd8fb9d5177445dafba334013e6d86b60b6b..8408f364c3c748418bd5607d0ded8c0228913c92 100644 --- a/tests/test_stasis_endpoints.c +++ b/tests/test_stasis_endpoints.c @@ -255,32 +255,18 @@ AST_TEST_DEFINE(channel_messages) ast_hangup(chan); chan = NULL; - actual_count = stasis_message_sink_wait_for_count(sink, 6, + actual_count = stasis_message_sink_wait_for_count(sink, 3, STASIS_SINK_DEFAULT_WAIT); - ast_test_validate(test, 6 == actual_count); + ast_test_validate(test, 3 == actual_count); msg = sink->messages[1]; type = stasis_message_type(msg); - ast_test_validate(test, stasis_cache_update_type() == type); - - msg = sink->messages[2]; - type = stasis_message_type(msg); ast_test_validate(test, ast_channel_snapshot_type() == type); - msg = sink->messages[3]; - type = stasis_message_type(msg); - ast_test_validate(test, stasis_cache_update_type() == type); - - /* The ordering of the cache clear and endpoint snapshot are - * unspecified */ - msg = sink->messages[4]; - if (stasis_message_type(msg) == stasis_cache_clear_type()) { - /* Okay; the next message should be the endpoint snapshot */ - msg = sink->messages[5]; - } - + msg = sink->messages[2]; type = stasis_message_type(msg); ast_test_validate(test, ast_endpoint_snapshot_type() == type); + actual_snapshot = stasis_message_data(msg); ast_test_validate(test, 0 == actual_snapshot->num_channels);