From 30d568ddec3390ea00775dd82b97fa18b8c6bf44 Mon Sep 17 00:00:00 2001 From: sungtae kim <sungtae@messagebird.com> Date: Sat, 26 Jan 2019 22:51:48 +0100 Subject: [PATCH] stasis.c: Added topic_all container Added topic_all container for centralizing the topic. This makes more easier to managing the topics. Added cli commands. stasis show topics : It shows all registered topics. stasis show topic <name> : It shows speicifed topic's detail info. ASTERISK-28264 Change-Id: Ie86d125d2966f93de74ee00f47ae6fbc8c081c5f --- include/asterisk/stasis.h | 43 +++++- main/stasis.c | 317 ++++++++++++++++++++++++++++++++++++-- 2 files changed, 347 insertions(+), 13 deletions(-) diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h index 0b229bf7fd..8e9c6c7be9 100644 --- a/include/asterisk/stasis.h +++ b/include/asterisk/stasis.h @@ -519,15 +519,56 @@ struct stasis_topic; */ struct stasis_topic *stasis_topic_create(const char *name); +/*! + * \brief Create a new topic with given detail. + * \param name Name of the new topic. + * \param detail Detail description of the new topic. i.e. "Queue main topic for subscribing every queue event" + * \return New topic instance. + * \return \c NULL on error. + * + * \note There is no explicit ability to unsubscribe all subscribers + * from a topic and destroy it. As a result the topic can persist until + * the last subscriber unsubscribes itself even if there is no + * publisher. + */ +struct stasis_topic *stasis_topic_create_with_detail( + const char *name, const char *detail); + +/*! + * \brief Get a topic of the given name. + * \param name Topic's name. + * \return Name of the topic. + * \return \c NULL on error or not exist. + * + * \note This SHOULD NOT be used in normal operation for publishing messages. + */ +struct stasis_topic *stasis_topic_get(const char *name); + +/*! + * \brief Return the uniqueid of a topic. + * \param topic Topic. + * \return Uniqueid of the topic. + * \return \c NULL if topic is \c NULL. + */ +const char *stasis_topic_uniqueid(const struct stasis_topic *topic); + /*! * \brief Return the name of a topic. * \param topic Topic. * \return Name of the topic. * \return \c NULL if topic is \c NULL. - * \since 12 */ const char *stasis_topic_name(const struct stasis_topic *topic); +/*! + * \brief Return the detail of a topic. + * \param topic Topic. + * \return Detail of the topic. + * \return \c NULL if topic is \c NULL. + * \since 12 + */ +const char *stasis_topic_detail(const struct stasis_topic *topic); + /*! * \brief Return the number of subscribers of a topic. * \param topic Topic. diff --git a/main/stasis.c b/main/stasis.c index 7dd3893d08..4ce7052332 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -41,9 +41,7 @@ #include "asterisk/stasis_bridges.h" #include "asterisk/stasis_endpoints.h" #include "asterisk/config_options.h" -#ifdef AST_DEVMODE #include "asterisk/cli.h" -#endif /*** DOCUMENTATION <managerEvent language="en_US" name="UserEvent"> @@ -307,6 +305,16 @@ static struct ast_threadpool *pool; STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type); +#if defined(LOW_MEMORY) + +#define TOPIC_ALL_BUCKETS 257 + +#else + +#define TOPIC_ALL_BUCKETS 997 + +#endif + #ifdef AST_DEVMODE /*! The number of buckets to use for topic statistics */ @@ -372,9 +380,37 @@ struct stasis_topic { int subscriber_id; /*! Name of the topic */ - char name[0]; + char *name; + + /*! Detail of the topic */ + char *detail; + + /*! Creation time */ + struct timeval *creationtime; }; +struct ao2_container *topic_all; + +struct topic_proxy { + AO2_WEAKPROXY(); + + char *name; + char *detail; + + struct timeval creationtime; + + char buf[0]; +}; + +AO2_STRING_FIELD_HASH_FN(topic_proxy, name); +AO2_STRING_FIELD_CMP_FN(topic_proxy, name); +AO2_STRING_FIELD_CASE_SORT_FN(topic_proxy, name); + +static void proxy_dtor(void *weakproxy, void *data) +{ + ao2_unlink(topic_all, weakproxy); +} + /* Forward declarations for the tightly-coupled subscription object */ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub); @@ -394,6 +430,9 @@ static void topic_dtor(void *obj) { struct stasis_topic *topic = obj; + ast_debug(2, "Destroying topic. name: %s, detail: %s\n", + topic->name, topic->detail); + /* Subscribers hold a reference to topics, so they should all be * unsubscribed before we get here. */ ast_assert(AST_VECTOR_SIZE(&topic->subscribers) == 0); @@ -442,40 +481,145 @@ static struct stasis_topic_statistics *stasis_topic_statistics_create(struct sta } #endif -struct stasis_topic *stasis_topic_create(const char *name) +static int link_topic_proxy(struct stasis_topic *topic, const char *name, const char *detail) +{ + struct topic_proxy *proxy; + struct stasis_topic* topic_tmp; + + if (!topic || !name || !strlen(name) || !detail) { + return -1; + } + + ao2_wrlock(topic_all); + + topic_tmp = stasis_topic_get(name); + if (topic_tmp) { + ast_log(LOG_ERROR, "The same topic is already exist. name: %s\n", name); + ao2_ref(topic_tmp, -1); + ao2_unlock(topic_all); + + return -1; + } + + proxy = ao2_t_weakproxy_alloc( + sizeof(*proxy) + strlen(name) + 1 + strlen(detail) + 1, NULL, topic->name); + if (!proxy) { + ao2_unlock(topic_all); + + return -1; + } + + /* set the proxy info */ + proxy->name = proxy->buf; + proxy->detail = proxy->name + strlen(name) + 1; + + strcpy(proxy->name, name); /* SAFE */ + strcpy(proxy->detail, detail); /* SAFE */ + proxy->creationtime = ast_tvnow(); + + /* We have exclusive access to proxy, no need for locking here. */ + if (ao2_t_weakproxy_set_object(proxy, topic, OBJ_NOLOCK, "weakproxy link")) { + ao2_cleanup(proxy); + ao2_unlock(topic_all); + + return -1; + } + + if (ao2_weakproxy_subscribe(proxy, proxy_dtor, NULL, OBJ_NOLOCK)) { + ao2_cleanup(proxy); + ao2_unlock(topic_all); + + return -1; + } + + /* setting the topic point to the proxy */ + topic->name = proxy->name; + topic->detail = proxy->detail; + topic->creationtime = &(proxy->creationtime); + + ao2_link_flags(topic_all, proxy, OBJ_NOLOCK); + ao2_ref(proxy, -1); + + ao2_unlock(topic_all); + + return 0; +} + +struct stasis_topic *stasis_topic_create_with_detail( + const char *name, const char* detail + ) { struct stasis_topic *topic; int res = 0; - topic = ao2_t_alloc(sizeof(*topic) + strlen(name) + 1, topic_dtor, name); + if (!name|| !strlen(name) || !detail) { + return NULL; + } + ast_debug(2, "Creating topic. name: %s, detail: %s\n", name, detail); + + topic = stasis_topic_get(name); + if (topic) { + ast_debug(2, "Topic is already exist. name: %s, detail: %s\n", + name, detail); + return topic; + } + + topic = ao2_t_alloc(sizeof(*topic), topic_dtor, name); if (!topic) { return NULL; } - strcpy(topic->name, name); /* SAFE */ res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX); res |= AST_VECTOR_INIT(&topic->upstream_topics, 0); - ast_debug(1, "Topic '%s': %p created\n", topic->name, topic); + if (res) { + ao2_ref(topic, -1); + return NULL; + } + + /* link to the proxy */ + if (link_topic_proxy(topic, name, detail)) { + ao2_ref(topic, -1); + return NULL; + } #ifdef AST_DEVMODE topic->statistics = stasis_topic_statistics_create(topic); - if (!topic->name || !topic->statistics || res) -#else - if (!topic->name || res) -#endif - { + if (!topic->statistics) { ao2_ref(topic, -1); return NULL; } +#endif + ast_debug(1, "Topic '%s': %p created\n", topic->name, topic); return topic; } +struct stasis_topic *stasis_topic_create(const char *name) +{ + return stasis_topic_create_with_detail(name, ""); +} + +struct stasis_topic *stasis_topic_get(const char *name) +{ + return ao2_weakproxy_find(topic_all, name, OBJ_SEARCH_KEY, ""); +} + const char *stasis_topic_name(const struct stasis_topic *topic) { + if (!topic) { + return NULL; + } return topic->name; } +const char *stasis_topic_detail(const struct stasis_topic *topic) +{ + if (!topic) { + return NULL; + } + return topic->detail; +} + size_t stasis_topic_subscribers(const struct stasis_topic *topic) { return AST_VECTOR_SIZE(&topic->subscribers); @@ -2134,6 +2278,142 @@ STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type, /*! @} */ +/*! + * \internal + * \brief CLI command implementation for 'stasis show topics' + */ +static char *stasis_show_topics(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) +{ + struct ao2_iterator iter; + struct topic_proxy *topic; + struct ao2_container *tmp_container; + int count = 0; +#define FMT_HEADERS "%-64s %-64s\n" +#define FMT_FIELDS "%-64s %-64s\n" + + switch (cmd) { + case CLI_INIT: + e->command = "stasis show topics"; + e->usage = + "Usage: stasis show topics\n" + " Shows a list of topics\n"; + return NULL; + case CLI_GENERATE: + return NULL; + } + + if (a->argc != e->args) { + return CLI_SHOWUSAGE; + } + + ast_cli(a->fd, "\n" FMT_HEADERS, "Name", "Detail"); + + tmp_container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, + topic_proxy_sort_fn, NULL); + + if (!tmp_container || ao2_container_dup(tmp_container, topic_all, OBJ_SEARCH_OBJECT)) { + ao2_cleanup(tmp_container); + + return NULL; + } + + /* getting all topic in order */ + iter = ao2_iterator_init(tmp_container, AO2_ITERATOR_UNLINK); + while ((topic = ao2_iterator_next(&iter))) { + ast_cli(a->fd, FMT_FIELDS, topic->name, topic->detail); + ao2_ref(topic, -1); + ++count; + } + ao2_iterator_destroy(&iter); + ao2_cleanup(tmp_container); + + ast_cli(a->fd, "\n%d Total topics\n\n", count); + +#undef FMT_HEADERS +#undef FMT_FIELDS + + return CLI_SUCCESS; +} + +/*! + * \internal + * \brief CLI tab completion for topic names + */ +static char *topic_complete_name(const char *word) +{ + struct topic_proxy *topic; + struct ao2_iterator it; + int wordlen = strlen(word); + int ret; + + it = ao2_iterator_init(topic_all, 0); + while ((topic = ao2_iterator_next(&it))) { + if (!strncasecmp(word, topic->name, wordlen)) { + ret = ast_cli_completion_add(ast_strdup(topic->name)); + if (ret) { + ao2_ref(topic, -1); + break; + } + } + ao2_ref(topic, -1); + } + ao2_iterator_destroy(&it); + return NULL; +} + +/*! + * \internal + * \brief CLI command implementation for 'stasis show topic' + */ +static char *stasis_show_topic(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) +{ + struct stasis_topic *topic; + char print_time[32]; + + switch (cmd) { + case CLI_INIT: + e->command = "stasis show topic"; + e->usage = + "Usage: stasis show topic <name>\n" + " Show stasis topic detail info.\n"; + return NULL; + case CLI_GENERATE: + if (a->pos == 3) { + return topic_complete_name(a->word); + } else { + return NULL; + } + } + + if (a->argc != 4) { + return CLI_SHOWUSAGE; + } + + topic = stasis_topic_get(a->argv[3]); + if (!topic) { + ast_cli(a->fd, "Specified topic '%s' does not exist\n", a->argv[3]); + return CLI_FAILURE; + } + + ast_cli(a->fd, "Name: %s\n", topic->name); + ast_cli(a->fd, "Detail: %s\n", topic->detail); + ast_cli(a->fd, "Subscribers count: %lu\n", AST_VECTOR_SIZE(&topic->subscribers)); + ast_cli(a->fd, "Forwarding topic count: %lu\n", AST_VECTOR_SIZE(&topic->upstream_topics)); + ast_format_duration_hh_mm_ss(ast_tvnow().tv_sec - topic->creationtime->tv_sec, print_time, sizeof(print_time)); + ast_cli(a->fd, "Duration time: %s\n", print_time); + + ao2_ref(topic, -1); + + return CLI_SUCCESS; +} + + +static struct ast_cli_entry cli_stasis[] = { + AST_CLI_DEFINE(stasis_show_topics, "Show all topics"), + AST_CLI_DEFINE(stasis_show_topic, "Show topic"), +}; + + #ifdef AST_DEVMODE AO2_STRING_FIELD_SORT_FN(stasis_subscription_statistics, uniqueid); @@ -2646,6 +2926,9 @@ static void stasis_cleanup(void) ao2_cleanup(subscription_statistics); ao2_cleanup(topic_statistics); #endif + ast_cli_unregister_multiple(cli_stasis, ARRAY_LEN(cli_stasis)); + ao2_cleanup(topic_all); + topic_all = NULL; ast_threadpool_shutdown(pool); pool = NULL; STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type); @@ -2740,6 +3023,16 @@ int stasis_init(void) return -1; } + topic_all = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TOPIC_ALL_BUCKETS, + topic_proxy_hash_fn, 0, topic_proxy_cmp_fn); + if (!topic_all) { + return -1; + } + + if (ast_cli_register_multiple(cli_stasis, ARRAY_LEN(cli_stasis))) { + return -1; + } + #ifdef AST_DEVMODE /* Statistics information is stored separately so that we don't alter or interrupt the lifetime of the underlying * topic or subscripton. -- GitLab