diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h index 6d423d901522c5e93e9e6bd8a1da7a8ac26726ae..2e274a67baf8e7cc36630ecb249e4c6e356af791 100644 --- a/include/asterisk/stasis.h +++ b/include/asterisk/stasis.h @@ -604,8 +604,14 @@ void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, st * has been subscribed. This occurs immediately before accepted message * types can be set and the callback must expect to receive it. */ +#ifdef AST_DEVMODE +struct stasis_subscription *__stasis_subscribe(struct stasis_topic *topic, + stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func); +#define stasis_subscribe(topic, callback, data) __stasis_subscribe(topic, callback, data, __FILE__, __LINE__, __PRETTY_FUNCTION__) +#else struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data); +#endif /*! * \brief Create a subscription whose callbacks occur on a thread pool @@ -633,8 +639,14 @@ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, * has been subscribed. This occurs immediately before accepted message * types can be set and the callback must expect to receive it. */ +#ifdef AST_DEVMODE +struct stasis_subscription *__stasis_subscribe_pool(struct stasis_topic *topic, + stasis_subscription_cb callback, void *data, const char *file, int lineno, const char *func); +#define stasis_subscribe_pool(topic, callback, data) __stasis_subscribe_pool(topic, callback, data, __FILE__, __LINE__, __PRETTY_FUNCTION__) +#else struct stasis_subscription *stasis_subscribe_pool(struct stasis_topic *topic, stasis_subscription_cb callback, void *data); +#endif /*! * \brief Indicate to a subscription that we are interested in a message type. diff --git a/include/asterisk/stasis_internal.h b/include/asterisk/stasis_internal.h index bc6122c2b2038ac9fd4842d5bbde1cdbfb87838f..c9df032210a22da17734095d27d516887b72f9b9 100644 --- a/include/asterisk/stasis_internal.h +++ b/include/asterisk/stasis_internal.h @@ -60,11 +60,23 @@ * \return \c NULL on error. * \since 12 */ +#ifdef AST_DEVMODE +struct stasis_subscription *internal_stasis_subscribe( + struct stasis_topic *topic, + stasis_subscription_cb callback, + void *data, + int needs_mailbox, + int use_thread_pool, + const char *file, + int lineno, + const char *func); +#else struct stasis_subscription *internal_stasis_subscribe( struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool); +#endif #endif /* STASIS_INTERNAL_H_ */ diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h index 9897d62b7dea35c8786ccdea0a5a019c95bd9582..93a2140cb46153f75d7835e16569cb65c0c29bdf 100644 --- a/include/asterisk/stasis_message_router.h +++ b/include/asterisk/stasis_message_router.h @@ -55,8 +55,14 @@ struct stasis_message_router; * * \since 12 */ +#ifdef AST_DEVMODE +struct stasis_message_router *__stasis_message_router_create( + struct stasis_topic *topic, const char *file, int lineno, const char *func); +#define stasis_message_router_create(topic) __stasis_message_router_create(topic, __FILE__, __LINE__, __PRETTY_FUNCTION__) +#else struct stasis_message_router *stasis_message_router_create( struct stasis_topic *topic); +#endif /*! * \brief Create a new message router object. @@ -71,8 +77,14 @@ struct stasis_message_router *stasis_message_router_create( * * \since 12.8.0 */ +#ifdef AST_DEVMODE +struct stasis_message_router *__stasis_message_router_create_pool( + struct stasis_topic *topic, const char *file, int lineno, const char *func); +#define stasis_message_router_create_pool(topic) __stasis_message_router_create_pool(topic, __FILE__, __LINE__, __PRETTY_FUNCTION__) +#else struct stasis_message_router *stasis_message_router_create_pool( struct stasis_topic *topic); +#endif /*! * \brief Unsubscribe the router from the upstream topic. diff --git a/main/asterisk.c b/main/asterisk.c index 7eb49df317a01d0317c5ca0437beca090cd23239..36e956fd06c005819471b040853ce1d3580704b7 100644 --- a/main/asterisk.c +++ b/main/asterisk.c @@ -4070,6 +4070,8 @@ static void asterisk_daemon(int isroot, const char *runuser, const char *rungrou check_init(ast_tps_init(), "Task Processor Core"); check_init(ast_fd_init(), "File Descriptor Debugging"); check_init(ast_pbx_init(), "ast_pbx_init"); + check_init(aco_init(), "Configuration Option Framework"); + check_init(stasis_init(), "Stasis"); #ifdef TEST_FRAMEWORK check_init(ast_test_init(), "Test Framework"); #endif @@ -4082,9 +4084,7 @@ static void asterisk_daemon(int isroot, const char *runuser, const char *rungrou check_init(ast_format_init(), "Formats"); check_init(ast_format_cache_init(), "Format Cache"); check_init(ast_codec_builtin_init(), "Built-in Codecs"); - check_init(aco_init(), "Configuration Option Framework"); check_init(ast_bucket_init(), "Bucket API"); - check_init(stasis_init(), "Stasis"); check_init(ast_stasis_system_init(), "Stasis system-level information"); check_init(ast_endpoint_stasis_init(), "Stasis Endpoint"); diff --git a/main/asterisk.exports.in b/main/asterisk.exports.in index f3549e6cacceb673bbf0d64232ec764df50de542..02328551ea25801fa424fdd71ce392b1c5992c49 100644 --- a/main/asterisk.exports.in +++ b/main/asterisk.exports.in @@ -27,6 +27,7 @@ LINKER_SYMBOL_PREFIXstrsep; LINKER_SYMBOL_PREFIXsetenv; LINKER_SYMBOL_PREFIXstasis_*; + LINKER_SYMBOL_PREFIX__stasis_*; LINKER_SYMBOL_PREFIXunsetenv; LINKER_SYMBOL_PREFIXstrcasestr; LINKER_SYMBOL_PREFIXstrnlen; diff --git a/main/stasis.c b/main/stasis.c index 69ec1a5269662c3ce1c1041e780f5ce958458d3c..3216e217399fb05457c787197e743d8438b58c6a 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -41,6 +41,9 @@ #include "asterisk/stasis_bridges.h" #include "asterisk/stasis_endpoints.h" #include "asterisk/config_options.h" +#ifdef AST_DEVMODE +#include "asterisk/cli.h" +#endif /*** DOCUMENTATION <managerEvent language="en_US" name="UserEvent"> @@ -304,14 +307,67 @@ static struct ast_threadpool *pool; STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type); +#ifdef AST_DEVMODE + +/*! The number of buckets to use for topic statistics */ +#define TOPIC_STATISTICS_BUCKETS 57 + +/*! The number of buckets to use for subscription statistics */ +#define SUBSCRIPTION_STATISTICS_BUCKETS 57 + +/*! Container which stores statistics for topics */ +static struct ao2_container *topic_statistics; + +/*! Container which stores statistics for subscriptions */ +static struct ao2_container *subscription_statistics; + +/*! \internal */ +struct stasis_message_type_statistics { + /*! \brief The number of messages of this published */ + int published; + /*! \brief The number of messages of this that did not reach a subscriber */ + int unused; + /*! \brief The stasis message type */ + struct stasis_message_type *message_type; +}; + +/*! Lock to protect the message types vector */ +AST_MUTEX_DEFINE_STATIC(message_type_statistics_lock); + +/*! Vector containing message type information */ +static AST_VECTOR(, struct stasis_message_type_statistics) message_type_statistics; + +/*! \internal */ +struct stasis_topic_statistics { + /*! \brief The number of messages that were not dispatched to any subscriber */ + int messages_not_dispatched; + /*! \brief The number of messages that were dispatched to at least 1 subscriber */ + int messages_dispatched; + /*! \brief Highest time spent dispatching messages to subscribers */ + int64_t highest_time_dispatched; + /*! \brief Lowest time spent dispatching messages to subscribers */ + int64_t lowest_time_dispatched; + /*! \brief The number of subscribers to this topic */ + int subscriber_count; + /*! \brief Name of the topic */ + char name[0]; +}; +#endif + /*! \internal */ struct stasis_topic { - char *name; /*! Variable length array of the subscribers */ AST_VECTOR(, struct stasis_subscription *) subscribers; /*! Topics forwarding into this topic */ AST_VECTOR(, struct stasis_topic *) upstream_topics; + +#ifdef AST_DEVMODE + struct stasis_topic_statistics *statistics; +#endif + + /*! Name of the topic */ + char name[0]; }; /* Forward declarations for the tightly-coupled subscription object */ @@ -337,28 +393,54 @@ static void topic_dtor(void *obj) * unsubscribed before we get here. */ ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0); - ast_free(topic->name); - topic->name = NULL; - AST_VECTOR_FREE(&topic->subscribers); AST_VECTOR_FREE(&topic->upstream_topics); + +#ifdef AST_DEVMODE + if (topic->statistics) { + ao2_unlink(topic_statistics, topic->statistics); + ao2_ref(topic->statistics, -1); + } +#endif +} + +#ifdef AST_DEVMODE +static struct stasis_topic_statistics *stasis_topic_statistics_create(const char *name) +{ + struct stasis_topic_statistics *statistics; + + statistics = ao2_alloc(sizeof(*statistics) + strlen(name) + 1, NULL); + if (!statistics) { + return NULL; + } + + strcpy(statistics->name, name); /* SAFE */ + ao2_link(topic_statistics, statistics); + + return statistics; } +#endif struct stasis_topic *stasis_topic_create(const char *name) { struct stasis_topic *topic; int res = 0; - topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name); + topic = ao2_t_alloc(sizeof(*topic) + strlen(name) + 1, topic_dtor, name); if (!topic) { return NULL; } - topic->name = ast_strdup(name); + strcpy(topic->name, name); /* SAFE */ res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX); res |= AST_VECTOR_INIT(&topic->upstream_topics, 0); +#ifdef AST_DEVMODE + topic->statistics = stasis_topic_statistics_create(name); + if (!topic->name || !topic->statistics || res) { +#else if (!topic->name || res) { - ao2_cleanup(topic); +#endif + ao2_ref(topic, -1); return NULL; } @@ -375,6 +457,35 @@ size_t stasis_topic_subscribers(const struct stasis_topic *topic) return AST_VECTOR_SIZE(&topic->subscribers); } +#ifdef AST_DEVMODE +struct stasis_subscription_statistics { + /*! \brief The filename where the subscription originates */ + const char *file; + /*! \brief The line number where the subscription originates */ + int lineno; + /*! \brief The function where the subscription originates */ + const char *func; + /*! \brief The number of messages that were filtered out */ + int messages_dropped; + /*! \brief The number of messages that passed filtering */ + int messages_passed; + /*! \brief Highest time spent invoking a message */ + int64_t highest_time_invoked; + /*! \brief The message type that currently took the longest to process */ + struct stasis_message_type *highest_time_message_type; + /*! \brief Lowest time spent invoking a message */ + int64_t lowest_time_invoked; + /*! \brief Using a mailbox to queue messages */ + int uses_mailbox; + /*! \brief Using stasis threadpool for handling messages */ + int uses_threadpool; + /*! \brief Name of the topic we subscribed to */ + char *topic; + /*! \brief Unique ID of the subscription */ + char uniqueid[0]; +}; +#endif + /*! \internal */ struct stasis_subscription { /*! Unique ID for this subscription */ @@ -403,6 +514,11 @@ struct stasis_subscription { enum stasis_subscription_message_formatters accepted_formatters; /*! The message filter currently in use */ enum stasis_subscription_message_filter filter; + +#ifdef AST_DEVMODE + /*! Statistics information */ + struct stasis_subscription_statistics *statistics; +#endif }; static void subscription_dtor(void *obj) @@ -423,6 +539,13 @@ static void subscription_dtor(void *obj) ast_cond_destroy(&sub->join_cond); AST_VECTOR_FREE(&sub->accepted_message_types); + +#ifdef AST_DEVMODE + if (sub->statistics) { + ao2_unlink(subscription_statistics, sub->statistics); + ao2_ref(sub->statistics, -1); + } +#endif } /*! @@ -436,6 +559,12 @@ static void subscription_invoke(struct stasis_subscription *sub, { unsigned int final = stasis_subscription_final_message(sub, message); int message_type_id = stasis_message_type_id(stasis_subscription_change_type()); +#ifdef AST_DEVMODE + struct timeval start; + int elapsed; + + start = ast_tvnow(); +#endif /* Notify that the final message has been received */ if (final) { @@ -462,6 +591,19 @@ static void subscription_invoke(struct stasis_subscription *sub, ast_cond_signal(&sub->join_cond); ao2_unlock(sub); } + +#ifdef AST_DEVMODE + elapsed = ast_tvdiff_ms(ast_tvnow(), start); + if (elapsed > sub->statistics->highest_time_invoked) { + sub->statistics->highest_time_invoked = elapsed; + ao2_lock(sub->statistics); + sub->statistics->highest_time_message_type = stasis_message_type(message); + ao2_unlock(sub->statistics); + } + if (elapsed < sub->statistics->lowest_time_invoked) { + sub->statistics->lowest_time_invoked = elapsed; + } +#endif } static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub); @@ -471,12 +613,51 @@ void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, st { } +#ifdef AST_DEVMODE +static struct stasis_subscription_statistics *stasis_subscription_statistics_create(const char *uniqueid, + const char *topic, int needs_mailbox, int use_thread_pool, const char *file, int lineno, + const char *func) +{ + struct stasis_subscription_statistics *statistics; + size_t uniqueid_len = strlen(uniqueid) + 1; + + statistics = ao2_alloc(sizeof(*statistics) + uniqueid_len + strlen(topic) + 1, NULL); + if (!statistics) { + return NULL; + } + + statistics->file = file; + statistics->lineno = lineno; + statistics->func = func; + statistics->uses_mailbox = needs_mailbox; + statistics->uses_threadpool = use_thread_pool; + strcpy(statistics->uniqueid, uniqueid); /* SAFE */ + statistics->topic = statistics->uniqueid + uniqueid_len; + strcpy(statistics->topic, topic); /* SAFE */ + ao2_link(subscription_statistics, statistics); + + return statistics; +} +#endif + +#ifdef AST_DEVMODE +struct stasis_subscription *internal_stasis_subscribe( + struct stasis_topic *topic, + stasis_subscription_cb callback, + void *data, + int needs_mailbox, + int use_thread_pool, + const char *file, + int lineno, + const char *func) +#else struct stasis_subscription *internal_stasis_subscribe( struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox, int use_thread_pool) +#endif { struct stasis_subscription *sub; @@ -491,6 +672,15 @@ struct stasis_subscription *internal_stasis_subscribe( } ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid)); +#ifdef AST_DEVMODE + sub->statistics = stasis_subscription_statistics_create(sub->uniqueid, topic->name, needs_mailbox, + use_thread_pool, file, lineno, func); + if (!sub->statistics) { + ao2_ref(sub, -1); + return NULL; + } +#endif + if (needs_mailbox) { char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1]; @@ -538,6 +728,18 @@ struct stasis_subscription *internal_stasis_subscribe( return sub; } +#ifdef AST_DEVMODE +struct stasis_subscription *__stasis_subscribe( + struct stasis_topic *topic, + stasis_subscription_cb callback, + void *data, + const char *file, + int lineno, + const char *func) +{ + return internal_stasis_subscribe(topic, callback, data, 1, 0, file, lineno, func); +} +#else struct stasis_subscription *stasis_subscribe( struct stasis_topic *topic, stasis_subscription_cb callback, @@ -545,7 +747,20 @@ struct stasis_subscription *stasis_subscribe( { return internal_stasis_subscribe(topic, callback, data, 1, 0); } +#endif +#ifdef AST_DEVMODE +struct stasis_subscription *__stasis_subscribe_pool( + struct stasis_topic *topic, + stasis_subscription_cb callback, + void *data, + const char *file, + int lineno, + const char *func) +{ + return internal_stasis_subscribe(topic, callback, data, 1, 1, file, lineno, func); +} +#else struct stasis_subscription *stasis_subscribe_pool( struct stasis_topic *topic, stasis_subscription_cb callback, @@ -553,6 +768,7 @@ struct stasis_subscription *stasis_subscribe_pool( { return internal_stasis_subscribe(topic, callback, data, 1, 1); } +#endif static int sub_cleanup(void *data) { @@ -808,6 +1024,11 @@ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subs topic_add_subscription( AST_VECTOR_GET(&topic->upstream_topics, idx), sub); } + +#ifdef AST_DEVMODE + topic->statistics->subscriber_count += 1; +#endif + ao2_unlock(topic); return 0; @@ -825,6 +1046,13 @@ static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_s } res = AST_VECTOR_REMOVE_ELEM_UNORDERED(&topic->subscribers, sub, AST_VECTOR_ELEM_CLEANUP_NOOP); + +#ifdef AST_DEVMODE + if (!res) { + topic->statistics->subscriber_count -= 1; + } +#endif + ao2_unlock(topic); return res; @@ -885,8 +1113,10 @@ static int dispatch_exec_sync(struct ast_taskprocessor_local *local) * \param message The message to send * \param synchronous If non-zero, synchronize on the subscriber receiving * the message + * \retval 0 if message was not dispatched + * \retval 1 if message was dispatched */ -static void dispatch_message(struct stasis_subscription *sub, +static unsigned int dispatch_message(struct stasis_subscription *sub, struct stasis_message *message, int synchronous) { @@ -938,14 +1168,22 @@ static void dispatch_message(struct stasis_subscription *sub, break; } - return; +#ifdef AST_DEVMODE + ast_atomic_fetchadd_int(&sub->statistics->messages_dropped, +1); +#endif + + return 0; } while (0); +#ifdef AST_DEVMODE + ast_atomic_fetchadd_int(&sub->statistics->messages_passed, +1); +#endif + if (!sub->mailbox) { /* Dispatch directly */ subscription_invoke(sub, message); - return; + return 1; } /* Bump the message for the taskprocessor push. This will get de-ref'd @@ -957,6 +1195,7 @@ static void dispatch_message(struct stasis_subscription *sub, /* Push failed; ugh. */ ast_log(LOG_ERROR, "Dropping async dispatch\n"); ao2_cleanup(message); + return 0; } } else { struct sync_task_data std; @@ -972,7 +1211,7 @@ static void dispatch_message(struct stasis_subscription *sub, ao2_cleanup(message); ast_mutex_destroy(&std.lock); ast_cond_destroy(&std.cond); - return; + return 0; } ast_mutex_lock(&std.lock); @@ -984,6 +1223,8 @@ static void dispatch_message(struct stasis_subscription *sub, ast_mutex_destroy(&std.lock); ast_cond_destroy(&std.cond); } + + return 1; } /*! @@ -997,12 +1238,41 @@ static void publish_msg(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sync_sub) { size_t i; + unsigned int dispatched = 0; +#ifdef AST_DEVMODE + int message_type_id = stasis_message_type_id(stasis_message_type(message)); + struct stasis_message_type_statistics *statistics; + struct timeval start; + int elapsed; +#endif ast_assert(topic != NULL); ast_assert(message != NULL); +#ifdef AST_DEVMODE + ast_mutex_lock(&message_type_statistics_lock); + if (message_type_id >= AST_VECTOR_SIZE(&message_type_statistics)) { + struct stasis_message_type_statistics new_statistics = { + .published = 0, + }; + if (AST_VECTOR_REPLACE(&message_type_statistics, message_type_id, new_statistics)) { + ast_mutex_unlock(&message_type_statistics_lock); + return; + } + } + statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, message_type_id); + statistics->message_type = stasis_message_type(message); + ast_mutex_unlock(&message_type_statistics_lock); + + ast_atomic_fetchadd_int(&statistics->published, +1); +#endif + /* If there are no subscribers don't bother */ if (!stasis_topic_subscribers(topic)) { +#ifdef AST_DEVMODE + ast_atomic_fetchadd_int(&statistics->unused, +1); + ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1); +#endif return; } @@ -1011,15 +1281,35 @@ static void publish_msg(struct stasis_topic *topic, * Make sure we hold onto a reference while dispatching. */ ao2_ref(topic, +1); +#ifdef AST_DEVMODE + start = ast_tvnow(); +#endif ao2_lock(topic); for (i = 0; i < AST_VECTOR_SIZE(&topic->subscribers); ++i) { struct stasis_subscription *sub = AST_VECTOR_GET(&topic->subscribers, i); ast_assert(sub != NULL); - dispatch_message(sub, message, (sub == sync_sub)); + dispatched += dispatch_message(sub, message, (sub == sync_sub)); } ao2_unlock(topic); + +#ifdef AST_DEVMODE + elapsed = ast_tvdiff_ms(ast_tvnow(), start); + if (elapsed > topic->statistics->highest_time_dispatched) { + topic->statistics->highest_time_dispatched = elapsed; + } + if (elapsed < topic->statistics->lowest_time_dispatched) { + topic->statistics->lowest_time_dispatched = elapsed; + } + if (dispatched) { + ast_atomic_fetchadd_int(&topic->statistics->messages_dispatched, +1); + } else { + ast_atomic_fetchadd_int(&statistics->unused, +1); + ast_atomic_fetchadd_int(&topic->statistics->messages_not_dispatched, +1); + } +#endif + ao2_ref(topic, -1); } @@ -1805,9 +2095,458 @@ STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type, /*! @} */ +#ifdef AST_DEVMODE + +/*! + * \internal + * \brief CLI command implementation for 'stasis statistics show subscriptions' + */ +static char *statistics_show_subscriptions(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) +{ + struct ao2_iterator iter; + struct stasis_subscription_statistics *statistics; + int count = 0; + int dropped = 0; + int passed = 0; +#define FMT_HEADERS "%-64s %10s %10s %16s %16s\n" +#define FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n" +#define FMT_FIELDS2 "%-64s %10d %10d\n" + + switch (cmd) { + case CLI_INIT: + e->command = "stasis statistics show subscriptions"; + e->usage = + "Usage: stasis statistics show subscriptions\n" + " Shows a list of subscriptions and their general statistics\n"; + return NULL; + case CLI_GENERATE: + return NULL; + } + + if (a->argc != e->args) { + return CLI_SHOWUSAGE; + } + + ast_cli(a->fd, "\n" FMT_HEADERS, "Subscription", "Dropped", "Passed", "Lowest Invoke", "Highest Invoke"); + + iter = ao2_iterator_init(subscription_statistics, 0); + while ((statistics = ao2_iterator_next(&iter))) { + ast_cli(a->fd, FMT_FIELDS, statistics->uniqueid, statistics->messages_dropped, statistics->messages_passed, + statistics->lowest_time_invoked, statistics->highest_time_invoked); + dropped += statistics->messages_dropped; + passed += statistics->messages_passed; + ao2_ref(statistics, -1); + ++count; + } + ao2_iterator_destroy(&iter); + + ast_cli(a->fd, FMT_FIELDS2, "Total", dropped, passed); + ast_cli(a->fd, "\n%d subscriptions\n\n", count); + +#undef FMT_HEADERS +#undef FMT_FIELDS +#undef FMT_FIELDS2 + + return CLI_SUCCESS; +} + +/*! + * \internal + * \brief CLI tab completion for subscription statistics names + */ +static char *subscription_statistics_complete_name(const char *word, int state) +{ + struct stasis_subscription_statistics *statistics; + struct ao2_iterator it_statistics; + int wordlen = strlen(word); + int which = 0; + char *result = NULL; + + it_statistics = ao2_iterator_init(subscription_statistics, 0); + while ((statistics = ao2_iterator_next(&it_statistics))) { + if (!strncasecmp(word, statistics->uniqueid, wordlen) + && ++which > state) { + result = ast_strdup(statistics->uniqueid); + } + ao2_ref(statistics, -1); + if (result) { + break; + } + } + ao2_iterator_destroy(&it_statistics); + return result; +} + +/*! + * \internal + * \brief CLI command implementation for 'stasis statistics show subscription' + */ +static char *statistics_show_subscription(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) +{ + struct stasis_subscription_statistics *statistics; + + switch (cmd) { + case CLI_INIT: + e->command = "stasis statistics show subscription"; + e->usage = + "Usage: stasis statistics show subscription <uniqueid>\n" + " Show stasis subscription statistics.\n"; + return NULL; + case CLI_GENERATE: + if (a->pos == 4) { + return subscription_statistics_complete_name(a->word, a->n); + } else { + return NULL; + } + } + + if (a->argc != 5) { + return CLI_SHOWUSAGE; + } + + statistics = ao2_find(subscription_statistics, a->argv[4], OBJ_SEARCH_KEY); + if (!statistics) { + ast_cli(a->fd, "Specified subscription '%s' does not exist\n", a->argv[4]); + return CLI_FAILURE; + } + + ast_cli(a->fd, "Subscription: %s\n", statistics->uniqueid); + ast_cli(a->fd, "Topic: %s\n", statistics->topic); + ast_cli(a->fd, "Source filename: %s\n", S_OR(statistics->file, "<unavailable>")); + ast_cli(a->fd, "Source line number: %d\n", statistics->lineno); + ast_cli(a->fd, "Source function: %s\n", S_OR(statistics->func, "<unavailable>")); + ast_cli(a->fd, "Number of messages dropped due to filtering: %d\n", statistics->messages_dropped); + ast_cli(a->fd, "Number of messages passed to subscriber callback: %d\n", statistics->messages_passed); + ast_cli(a->fd, "Using mailbox to queue messages: %s\n", statistics->uses_mailbox ? "Yes" : "No"); + ast_cli(a->fd, "Using stasis threadpool for handling messages: %s\n", statistics->uses_threadpool ? "Yes" : "No"); + ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->lowest_time_invoked); + ast_cli(a->fd, "Highest amount of time (in milliseconds) spent invoking message: %ld\n", statistics->highest_time_invoked); + + ao2_lock(statistics); + if (statistics->highest_time_message_type) { + ast_cli(a->fd, "Offender message type for highest invoking time: %s\n", stasis_message_type_name(statistics->highest_time_message_type)); + } + ao2_unlock(statistics); + + ao2_ref(statistics, -1); + + return CLI_SUCCESS; +} + +/*! + * \internal + * \brief CLI command implementation for 'stasis statistics show topics' + */ +static char *statistics_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) +{ + struct ao2_iterator iter; + struct stasis_topic_statistics *statistics; + int count = 0; + int not_dispatched = 0; + int dispatched = 0; +#define FMT_HEADERS "%-64s %10s %10s %16s %16s\n" +#define FMT_FIELDS "%-64s %10d %10d %16ld %16ld\n" +#define FMT_FIELDS2 "%-64s %10d %10d\n" + + switch (cmd) { + case CLI_INIT: + e->command = "stasis statistics show topics"; + e->usage = + "Usage: stasis statistics show topics\n" + " Shows a list of topics and their general statistics\n"; + return NULL; + case CLI_GENERATE: + return NULL; + } + + if (a->argc != e->args) { + return CLI_SHOWUSAGE; + } + + ast_cli(a->fd, "\n" FMT_HEADERS, "Topic", "Dropped", "Dispatched", "Lowest Dispatch", "Highest Dispatch"); + + iter = ao2_iterator_init(topic_statistics, 0); + while ((statistics = ao2_iterator_next(&iter))) { + ast_cli(a->fd, FMT_FIELDS, statistics->name, statistics->messages_not_dispatched, statistics->messages_dispatched, + statistics->lowest_time_dispatched, statistics->highest_time_dispatched); + not_dispatched += statistics->messages_not_dispatched; + dispatched += statistics->messages_dispatched; + ao2_ref(statistics, -1); + ++count; + } + ao2_iterator_destroy(&iter); + + ast_cli(a->fd, FMT_FIELDS2, "Total", not_dispatched, dispatched); + ast_cli(a->fd, "\n%d topics\n\n", count); + +#undef FMT_HEADERS +#undef FMT_FIELDS +#undef FMT_FIELDS2 + + return CLI_SUCCESS; +} + +/*! + * \internal + * \brief CLI tab completion for topic statistics names + */ +static char *topic_statistics_complete_name(const char *word, int state) +{ + struct stasis_topic_statistics *statistics; + struct ao2_iterator it_statistics; + int wordlen = strlen(word); + int which = 0; + char *result = NULL; + + it_statistics = ao2_iterator_init(topic_statistics, 0); + while ((statistics = ao2_iterator_next(&it_statistics))) { + if (!strncasecmp(word, statistics->name, wordlen) + && ++which > state) { + result = ast_strdup(statistics->name); + } + ao2_ref(statistics, -1); + if (result) { + break; + } + } + ao2_iterator_destroy(&it_statistics); + return result; +} + +/*! + * \internal + * \brief CLI command implementation for 'stasis statistics show topic' + */ +static char *statistics_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) +{ + struct stasis_topic_statistics *statistics; + + switch (cmd) { + case CLI_INIT: + e->command = "stasis statistics show topic"; + e->usage = + "Usage: stasis statistics show topic <name>\n" + " Show stasis topic statistics.\n"; + return NULL; + case CLI_GENERATE: + if (a->pos == 4) { + return topic_statistics_complete_name(a->word, a->n); + } else { + return NULL; + } + } + + if (a->argc != 5) { + return CLI_SHOWUSAGE; + } + + statistics = ao2_find(topic_statistics, a->argv[4], OBJ_SEARCH_KEY); + if (!statistics) { + ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[4]); + return CLI_FAILURE; + } + + ast_cli(a->fd, "Topic: %s\n", statistics->name); + ast_cli(a->fd, "Number of messages published that went to no subscriber: %d\n", statistics->messages_not_dispatched); + ast_cli(a->fd, "Number of messages that went to at least one subscriber: %d\n", statistics->messages_dispatched); + ast_cli(a->fd, "Lowest amount of time (in milliseconds) spent dispatching message: %ld\n", statistics->lowest_time_dispatched); + ast_cli(a->fd, "Highest amount of time (in milliseconds) spent dispatching messages: %ld\n", statistics->highest_time_dispatched); + ast_cli(a->fd, "Number of subscribers: %d\n", statistics->subscriber_count); + + ao2_ref(statistics, -1); + + return CLI_SUCCESS; +} + +/*! + * \internal + * \brief CLI command implementation for 'stasis statistics show messages' + */ +static char *statistics_show_messages(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) +{ + int i; + int count = 0; + int published = 0; + int unused = 0; +#define FMT_HEADERS "%-64s %10s %10s\n" +#define FMT_FIELDS "%-64s %10d %10d\n" + + switch (cmd) { + case CLI_INIT: + e->command = "stasis statistics show messages"; + e->usage = + "Usage: stasis statistics show messages\n" + " Shows a list of message types and their general statistics\n"; + return NULL; + case CLI_GENERATE: + return NULL; + } + + if (a->argc != e->args) { + return CLI_SHOWUSAGE; + } + + ast_cli(a->fd, "\n" FMT_HEADERS, "Message Type", "Published", "Unused"); + + ast_mutex_lock(&message_type_statistics_lock); + for (i = 0; i < AST_VECTOR_SIZE(&message_type_statistics); ++i) { + struct stasis_message_type_statistics *statistics = AST_VECTOR_GET_ADDR(&message_type_statistics, i); + + if (!statistics->message_type) { + continue; + } + + ast_cli(a->fd, FMT_FIELDS, stasis_message_type_name(statistics->message_type), statistics->published, + statistics->unused); + published += statistics->published; + unused += statistics->unused; + ++count; + } + ast_mutex_unlock(&message_type_statistics_lock); + + ast_cli(a->fd, FMT_FIELDS, "Total", published, unused); + ast_cli(a->fd, "\n%d seen message types\n\n", count); + +#undef FMT_HEADERS +#undef FMT_FIELDS + + return CLI_SUCCESS; +} + +static struct ast_cli_entry cli_stasis_statistics[] = { + AST_CLI_DEFINE(statistics_show_subscriptions, "Show subscriptions with general statistics"), + AST_CLI_DEFINE(statistics_show_subscription, "Show subscription statistics"), + AST_CLI_DEFINE(statistics_show_topics, "Show topics with general statistics"), + AST_CLI_DEFINE(statistics_show_topic, "Show topic statistics"), + AST_CLI_DEFINE(statistics_show_messages, "Show message types with general statistics"), +}; + +static int subscription_statistics_hash(const void *obj, const int flags) +{ + const struct stasis_subscription_statistics *object; + const char *key; + + switch (flags & OBJ_SEARCH_MASK) { + case OBJ_SEARCH_KEY: + key = obj; + break; + case OBJ_SEARCH_OBJECT: + object = obj; + key = object->uniqueid; + break; + default: + /* Hash can only work on something with a full key. */ + ast_assert(0); + return 0; + } + return ast_str_case_hash(key); +} + +static int subscription_statistics_cmp(void *obj, void *arg, int flags) +{ + const struct stasis_subscription_statistics *object_left = obj; + const struct stasis_subscription_statistics *object_right = arg; + const char *right_key = arg; + int cmp; + + switch (flags & OBJ_SEARCH_MASK) { + case OBJ_SEARCH_OBJECT: + right_key = object_right->uniqueid; + /* Fall through */ + case OBJ_SEARCH_KEY: + cmp = strcasecmp(object_left->uniqueid, right_key); + break; + case OBJ_SEARCH_PARTIAL_KEY: + /* Not supported by container */ + ast_assert(0); + cmp = -1; + break; + default: + /* + * What arg points to is specific to this traversal callback + * and has no special meaning to astobj2. + */ + cmp = 0; + break; + } + if (cmp) { + return 0; + } + /* + * At this point the traversal callback is identical to a sorted + * container. + */ + return CMP_MATCH; +} + +static int topic_statistics_hash(const void *obj, const int flags) +{ + const struct stasis_topic_statistics *object; + const char *key; + + switch (flags & OBJ_SEARCH_MASK) { + case OBJ_SEARCH_KEY: + key = obj; + break; + case OBJ_SEARCH_OBJECT: + object = obj; + key = object->name; + break; + default: + /* Hash can only work on something with a full key. */ + ast_assert(0); + return 0; + } + return ast_str_case_hash(key); +} + +static int topic_statistics_cmp(void *obj, void *arg, int flags) +{ + const struct stasis_topic_statistics *object_left = obj; + const struct stasis_topic_statistics *object_right = arg; + const char *right_key = arg; + int cmp; + + switch (flags & OBJ_SEARCH_MASK) { + case OBJ_SEARCH_OBJECT: + right_key = object_right->name; + /* Fall through */ + case OBJ_SEARCH_KEY: + cmp = strcasecmp(object_left->name, right_key); + break; + case OBJ_SEARCH_PARTIAL_KEY: + /* Not supported by container */ + ast_assert(0); + cmp = -1; + break; + default: + /* + * What arg points to is specific to this traversal callback + * and has no special meaning to astobj2. + */ + cmp = 0; + break; + } + if (cmp) { + return 0; + } + /* + * At this point the traversal callback is identical to a sorted + * container. + */ + return CMP_MATCH; +} +#endif + /*! \brief Cleanup function for graceful shutdowns */ static void stasis_cleanup(void) { +#ifdef AST_DEVMODE + ast_cli_unregister_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics)); + AST_VECTOR_FREE(&message_type_statistics); + ao2_cleanup(subscription_statistics); + ao2_cleanup(topic_statistics); +#endif ast_threadpool_shutdown(pool); pool = NULL; STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type); @@ -1902,5 +2641,28 @@ int stasis_init(void) return -1; } +#ifdef AST_DEVMODE + /* Statistics information is stored separately so that we don't alter or interrupt the lifetime of the underlying + * topic or subscripton. + */ + subscription_statistics = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, SUBSCRIPTION_STATISTICS_BUCKETS, + subscription_statistics_hash, 0, subscription_statistics_cmp); + if (!subscription_statistics) { + return -1; + } + + topic_statistics = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TOPIC_STATISTICS_BUCKETS, + topic_statistics_hash, 0, topic_statistics_cmp); + if (!topic_statistics) { + return -1; + } + + AST_VECTOR_INIT(&message_type_statistics, 0); + + if (ast_cli_register_multiple(cli_stasis_statistics, ARRAY_LEN(cli_stasis_statistics))) { + return -1; + } +#endif + return 0; } diff --git a/main/stasis_cache.c b/main/stasis_cache.c index bc975fd3da2d0471e57486002bfa599e13bf6c46..5aa04fb1a739b8cc14e509a464e77109ec161e46 100644 --- a/main/stasis_cache.c +++ b/main/stasis_cache.c @@ -971,7 +971,11 @@ struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *or } ast_free(new_name); +#ifdef AST_DEVMODE + caching_topic->sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0, __FILE__, __LINE__, __PRETTY_FUNCTION__); +#else caching_topic->sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0); +#endif if (caching_topic->sub == NULL) { ao2_ref(caching_topic, -1); diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c index 197f7f939f22ad89b739bf01fe42ddab1f8f6419..9a390ef3f3c9ced596c1974ebc2b9836d1147bae 100644 --- a/main/stasis_message_router.c +++ b/main/stasis_message_router.c @@ -204,8 +204,14 @@ static void router_dispatch(void *data, } } +#ifdef AST_DEVMODE +static struct stasis_message_router *stasis_message_router_create_internal( + struct stasis_topic *topic, int use_thread_pool, const char *file, int lineno, + const char *func) +#else static struct stasis_message_router *stasis_message_router_create_internal( struct stasis_topic *topic, int use_thread_pool) +#endif { int res; struct stasis_message_router *router; @@ -224,11 +230,20 @@ static struct stasis_message_router *stasis_message_router_create_internal( return NULL; } +#ifdef AST_DEVMODE + if (use_thread_pool) { + router->subscription = __stasis_subscribe_pool(topic, router_dispatch, router, file, lineno, func); + } else { + router->subscription = __stasis_subscribe(topic, router_dispatch, router, file, lineno, func); + } +#else if (use_thread_pool) { router->subscription = stasis_subscribe_pool(topic, router_dispatch, router); } else { router->subscription = stasis_subscribe(topic, router_dispatch, router); } +#endif + if (!router->subscription) { ao2_ref(router, -1); @@ -241,17 +256,33 @@ static struct stasis_message_router *stasis_message_router_create_internal( return router; } +#ifdef AST_DEVMODE +struct stasis_message_router *__stasis_message_router_create( + struct stasis_topic *topic, const char *file, int lineno, const char *func) +{ + return stasis_message_router_create_internal(topic, 0, file, lineno, func); +} +#else struct stasis_message_router *stasis_message_router_create( struct stasis_topic *topic) { return stasis_message_router_create_internal(topic, 0); } +#endif +#ifdef AST_DEVMODE +struct stasis_message_router *__stasis_message_router_create_pool( + struct stasis_topic *topic, const char *file, int lineno, const char *func) +{ + return stasis_message_router_create_internal(topic, 1, file, lineno, func); +} +#else struct stasis_message_router *stasis_message_router_create_pool( struct stasis_topic *topic) { return stasis_message_router_create_internal(topic, 1); } +#endif void stasis_message_router_unsubscribe(struct stasis_message_router *router) {