diff --git a/UPGRADE.txt b/UPGRADE.txt index f00983176f1f6f1e66e002ed093ceb6d870fab1c..9720bf0d0c46748eb3d11e2b0273444248d9a10e 100644 --- a/UPGRADE.txt +++ b/UPGRADE.txt @@ -23,6 +23,13 @@ From 13.0.0 to 13.1.0: +Core: + - The core of Asterisk uses a message bus called "Stasis" to distribute + information to internal components. For performance reasons, the message + distribution was modified to make use of a thread pool instead of a + dedicated thread per consumer in certain cases. The initial settings for + the thread pool can now be configured in 'stasis.conf'. + PJSIP: - Added the pjsip.conf system type disable_tcp_switch option. The option allows the user to disable switching from UDP to TCP transports described diff --git a/apps/app_queue.c b/apps/app_queue.c index 29cef378bc3ee54fff2a5a4981e79dca6ca6d7da..185b2d43ad5c46d8b4b36ba492a8f7b16ebd7fc8 100644 --- a/apps/app_queue.c +++ b/apps/app_queue.c @@ -6056,7 +6056,7 @@ static int setup_stasis_subs(struct queue_ent *qe, struct ast_channel *peer, str return -1; } - queue_data->bridge_router = stasis_message_router_create(ast_bridge_topic_all()); + queue_data->bridge_router = stasis_message_router_create_pool(ast_bridge_topic_all()); if (!queue_data->bridge_router) { ao2_ref(queue_data, -1); return -1; @@ -6071,7 +6071,7 @@ static int setup_stasis_subs(struct queue_ent *qe, struct ast_channel *peer, str stasis_message_router_set_default(queue_data->bridge_router, queue_bridge_cb, queue_data); - queue_data->channel_router = stasis_message_router_create(ast_channel_topic_all()); + queue_data->channel_router = stasis_message_router_create_pool(ast_channel_topic_all()); if (!queue_data->channel_router) { /* Unsubscribing from the bridge router will remove the only ref of queue_data, * thus beginning the destruction process diff --git a/channels/chan_dahdi.c b/channels/chan_dahdi.c index 9ccec641f64881a8fb2de205da63ece662b4f28d..8ef0cec3020777948ab06c3665a5871b7ca3cb75 100644 --- a/channels/chan_dahdi.c +++ b/channels/chan_dahdi.c @@ -12581,7 +12581,7 @@ static struct dahdi_pvt *mkintf(int channel, const struct dahdi_chan_conf *conf, mailbox_specific_topic = ast_mwi_topic(tmp->mailbox); if (mailbox_specific_topic) { - tmp->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, NULL); + tmp->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, NULL); } } #ifdef HAVE_DAHDI_LINEREVERSE_VMWI diff --git a/channels/chan_iax2.c b/channels/chan_iax2.c index d093438c8112b9dbded5718b2a393ac91ff32437..39861db0e3013a5226bfabab3b816dc6bd015a20 100644 --- a/channels/chan_iax2.c +++ b/channels/chan_iax2.c @@ -13096,7 +13096,7 @@ static struct iax2_peer *build_peer(const char *name, struct ast_variable *v, st mailbox_specific_topic = ast_mwi_topic(peer->mailbox); if (mailbox_specific_topic) { - peer->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, NULL); + peer->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, NULL); } } diff --git a/channels/chan_mgcp.c b/channels/chan_mgcp.c index 53d7b42e5ce290bffcc28b5831eb9fe48b65543c..08c4dc2e20fb0e138ffa080782f8dd91fadfbbde 100644 --- a/channels/chan_mgcp.c +++ b/channels/chan_mgcp.c @@ -4237,7 +4237,7 @@ static struct mgcp_gateway *build_gateway(char *cat, struct ast_variable *v) mailbox_specific_topic = ast_mwi_topic(e->mailbox); if (mailbox_specific_topic) { - e->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, NULL); + e->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, NULL); } } snprintf(e->rqnt_ident, sizeof(e->rqnt_ident), "%08lx", (unsigned long)ast_random()); diff --git a/channels/chan_sip.c b/channels/chan_sip.c index ea20f8d6d27a355e4fa1673e587f819abc42bb4d..ab03580ad969de1036301b55ca7971c12e66e24c 100644 --- a/channels/chan_sip.c +++ b/channels/chan_sip.c @@ -27245,7 +27245,7 @@ static void add_peer_mwi_subs(struct sip_peer *peer) if (!peer_name) { return; } - mailbox->event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, peer_name); + mailbox->event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, peer_name); } } } diff --git a/channels/chan_skinny.c b/channels/chan_skinny.c index 9c074ec6e99d23802390bbf6379d44c80d04fa34..9269fbf6e2bf734aebcb5902ac9b2f594c883fa5 100644 --- a/channels/chan_skinny.c +++ b/channels/chan_skinny.c @@ -8295,7 +8295,7 @@ static struct skinny_line *config_line(const char *lname, struct ast_variable *v mailbox_specific_topic = ast_mwi_topic(l->mailbox); if (mailbox_specific_topic) { - l->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, l); + l->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, l); } } diff --git a/channels/sig_pri.c b/channels/sig_pri.c index e9e17322f2b42f9cfa2fbe257001af9ce640ea1d..a26b5661142edd788c229b76dc29c88b2af8da5e 100644 --- a/channels/sig_pri.c +++ b/channels/sig_pri.c @@ -9174,7 +9174,7 @@ int sig_pri_start_pri(struct sig_pri_span *pri) mailbox_specific_topic = ast_mwi_topic(mbox_id); if (mailbox_specific_topic) { - pri->mbox[i].sub = stasis_subscribe(mailbox_specific_topic, sig_pri_mwi_event_cb, pri); + pri->mbox[i].sub = stasis_subscribe_pool(mailbox_specific_topic, sig_pri_mwi_event_cb, pri); } if (!pri->mbox[i].sub) { ast_log(LOG_ERROR, "%s span %d could not subscribe to MWI events for %s(%s).\n", diff --git a/configs/samples/stasis.conf.sample b/configs/samples/stasis.conf.sample index 3aac230cbb7580b5fb9441e089d25d0487935872..e591e7637fd651b0146cfa95574c3ad695288c2e 100644 --- a/configs/samples/stasis.conf.sample +++ b/configs/samples/stasis.conf.sample @@ -1,3 +1,13 @@ +[threadpool] +;initial_size = 5 ; Initial size of the threadpool. +; ; 0 means the threadpool has no threads initially +; ; until a task needs a thread. +;idle_timeout_sec = 20 ; Number of seconds a thread should be idle before +; ; dying. 0 means threads never time out. +;max_size = 50 ; Maximum number of threads in the Stasis threadpool. +; ; 0 means no limit to the number of threads in the +; ; threadpool. + [declined_message_types] ; This config section contains the names of message types that should be prevented ; from being created. By default, all message types are allowed to be created. diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h index 4189513ac4b7076eed9930cf8338b965cbcf748f..0b1b1e83f88ca01f5969af5c7993a794d0816af2 100644 --- a/include/asterisk/stasis.h +++ b/include/asterisk/stasis.h @@ -541,6 +541,31 @@ typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *s struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data); +/*! + * \brief Create a subscription whose callbacks occur on a thread pool + * + * In addition to being AO2 managed memory (requiring an ao2_cleanup() to free + * up this reference), the subscription must be explicitly unsubscribed from its + * topic using stasis_unsubscribe(). + * + * The invocations of the callback are serialized, but will almost certainly not + * always happen on the same thread. The invocation order of different subscriptions + * is unspecified. + * + * Unlike \ref stasis_subscribe, this function will explicitly use a threadpool to + * dispatch items to its \c callback. This form of subscription should be used + * when many subscriptions may be made to the specified \c topic. + * + * \param topic Topic to subscribe to. + * \param callback Callback function for subscription messages. + * \param data Data to be passed to the callback, in addition to the message. + * \return New \ref stasis_subscription object. + * \return \c NULL on error. + * \since 12.8.0 + */ +struct stasis_subscription *stasis_subscribe_pool(struct stasis_topic *topic, + stasis_subscription_cb callback, void *data); + /*! * \brief Cancel a subscription. * diff --git a/include/asterisk/stasis_internal.h b/include/asterisk/stasis_internal.h index bb7b6cc0a2afbeafeb097a22814f6417ab802ae1..bc6122c2b2038ac9fd4842d5bbde1cdbfb87838f 100644 --- a/include/asterisk/stasis_internal.h +++ b/include/asterisk/stasis_internal.h @@ -52,8 +52,10 @@ * \param callback Callback function for subscription messages. * \param data Data to be passed to the callback, in addition to the message. * \param needs_mailbox Determines whether or not the subscription requires a mailbox. - * Subscriptions with mailboxes will be delivered on a thread in the Stasis threadpool; + * Subscriptions with mailboxes will be delivered on some non-publisher thread; * subscriptions without mailboxes will be delivered on the publisher thread. + * \param use_thread_pool Use the thread pool for the subscription. This is only + * relevant if \c needs_mailbox is non-zero. * \return New \ref stasis_subscription object. * \return \c NULL on error. * \since 12 @@ -62,6 +64,7 @@ struct stasis_subscription *internal_stasis_subscribe( struct stasis_topic *topic, stasis_subscription_cb callback, void *data, - int needs_mailbox); + int needs_mailbox, + int use_thread_pool); #endif /* STASIS_INTERNAL_H_ */ diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h index 613a2bd7f0469a2a98f77084898ea3a2362eb514..89657a5ee5eddd10dfe653cd8c72162af18e1093 100644 --- a/include/asterisk/stasis_message_router.h +++ b/include/asterisk/stasis_message_router.h @@ -58,6 +58,22 @@ struct stasis_message_router; struct stasis_message_router *stasis_message_router_create( struct stasis_topic *topic); +/*! + * \brief Create a new message router object. + * + * The subscription created for this message router will dispatch + * callbacks on a thread pool. + * + * \param topic Topic to subscribe route to. + * + * \return New \ref stasis_message_router. + * \return \c NULL on error. + * + * \since 12.8.0 + */ +struct stasis_message_router *stasis_message_router_create_pool( + struct stasis_topic *topic); + /*! * \brief Unsubscribe the router from the upstream topic. * diff --git a/main/endpoints.c b/main/endpoints.c index cc2eccc705ee6b1eff3e3d8cc78b269b1d90b2dc..f8cca45b89f82a907bf83e20cef8300793b27e8e 100644 --- a/main/endpoints.c +++ b/main/endpoints.c @@ -310,7 +310,7 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha } if (!ast_strlen_zero(resource)) { - endpoint->router = stasis_message_router_create(ast_endpoint_topic(endpoint)); + endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint)); if (!endpoint->router) { return NULL; } diff --git a/main/stasis.c b/main/stasis.c index b85135a5b828e5c82cb72ae029859cd753a64990..dbb6e4c12820df77384697be54836ec0d9168776 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -35,6 +35,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$"); #include "asterisk/stasis_internal.h" #include "asterisk/stasis.h" #include "asterisk/taskprocessor.h" +#include "asterisk/threadpool.h" #include "asterisk/utils.h" #include "asterisk/uuid.h" #include "asterisk/vector.h" @@ -63,6 +64,18 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$"); </managerEvent> <configInfo name="stasis" language="en_US"> <configFile name="stasis.conf"> + <configObject name="threadpool"> + <synopsis>Settings that configure the threadpool Stasis uses to deliver some messages.</synopsis> + <configOption name="initial_size" default="5"> + <synopsis>Initial number of threads in the message bus threadpool.</synopsis> + </configOption> + <configOption name="idle_timeout_sec" default="20"> + <synopsis>Number of seconds before an idle thread is disposed of.</synopsis> + </configOption> + <configOption name="max_size" default="50"> + <synopsis>Maximum number of threads in the threadpool.</synopsis> + </configOption> + </configObject> <configObject name="declined_message_types"> <synopsis>Stasis message types for which to decline creation.</synopsis> <configOption name="decline"> @@ -287,6 +300,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$"); /*! The number of buckets to use for topic pools */ #define TOPIC_POOL_BUCKETS 57 +/*! Thread pool for topics that don't want a dedicated taskprocessor */ +static struct ast_threadpool *pool; + STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type); /*! \internal */ @@ -432,7 +448,8 @@ struct stasis_subscription *internal_stasis_subscribe( struct stasis_topic *topic, stasis_subscription_cb callback, void *data, - int needs_mailbox) + int needs_mailbox, + int use_thread_pool) { RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup); @@ -445,19 +462,19 @@ struct stasis_subscription *internal_stasis_subscribe( if (!sub) { return NULL; } - ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid)); if (needs_mailbox) { /* With a small number of subscribers, a thread-per-sub is - * acceptable. If our usage changes so that we have larger - * numbers of subscribers, we'll probably want to consider - * a threadpool. We had that originally, but with so few - * subscribers it was actually a performance loss instead of - * a gain. + * acceptable. For larger number of subscribers, a thread + * pool should be used. */ - sub->mailbox = ast_taskprocessor_get(sub->uniqueid, - TPS_REF_DEFAULT); + if (use_thread_pool) { + sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool); + } else { + sub->mailbox = ast_taskprocessor_get(sub->uniqueid, + TPS_REF_DEFAULT); + } if (!sub->mailbox) { return NULL; } @@ -486,7 +503,15 @@ struct stasis_subscription *stasis_subscribe( stasis_subscription_cb callback, void *data) { - return internal_stasis_subscribe(topic, callback, data, 1); + return internal_stasis_subscribe(topic, callback, data, 1, 0); +} + +struct stasis_subscription *stasis_subscribe_pool( + struct stasis_topic *topic, + stasis_subscription_cb callback, + void *data) +{ + return internal_stasis_subscribe(topic, callback, data, 1, 1); } static int sub_cleanup(void *data) @@ -1365,11 +1390,33 @@ struct stasis_declined_config { struct ao2_container *declined; }; +/*! \brief Threadpool configuration options */ +struct stasis_threadpool_conf { + /*! Initial size of the thread pool */ + int initial_size; + /*! Time, in seconds, before we expire a thread */ + int idle_timeout_sec; + /*! Maximum number of thread to allow */ + int max_size; +}; struct stasis_config { + /*! Thread pool configuration options */ + struct stasis_threadpool_conf *threadpool_options; + /*! Declined message types */ struct stasis_declined_config *declined_message_types; }; +static struct aco_type threadpool_option = { + .type = ACO_GLOBAL, + .name = "threadpool", + .item_offset = offsetof(struct stasis_config, threadpool_options), + .category = "^threadpool$", + .category_match = ACO_WHITELIST, +}; + +static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option); + /*! \brief An aco_type structure to link the "declined_message_types" category to the stasis_declined_config type */ static struct aco_type declined_option = { .type = ACO_GLOBAL, @@ -1383,7 +1430,7 @@ struct aco_type *declined_options[] = ACO_TYPES(&declined_option); struct aco_file stasis_conf = { .filename = "stasis.conf", - .types = ACO_TYPES(&declined_option), + .types = ACO_TYPES(&declined_option, &threadpool_option), }; /*! \brief A global object container that will contain the stasis_config that gets swapped out on reloads */ @@ -1399,13 +1446,16 @@ CONFIG_INFO_CORE("stasis", cfg_info, globals, stasis_config_alloc, static void stasis_declined_config_destructor(void *obj) { struct stasis_declined_config *declined = obj; + ao2_cleanup(declined->declined); } static void stasis_config_destructor(void *obj) { struct stasis_config *cfg = obj; + ao2_cleanup(cfg->declined_message_types); + ast_free(cfg->threadpool_options); } static void *stasis_config_alloc(void) @@ -1416,21 +1466,26 @@ static void *stasis_config_alloc(void) return NULL; } - /* Allocate/initialize memory */ - cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types), stasis_declined_config_destructor); + cfg->threadpool_options = ast_calloc(1, sizeof(*cfg->threadpool_options)); + if (!cfg->threadpool_options) { + ao2_ref(cfg, -1); + return NULL; + } + + cfg->declined_message_types = ao2_alloc(sizeof(*cfg->declined_message_types), + stasis_declined_config_destructor); if (!cfg->declined_message_types) { - goto error; + ao2_ref(cfg, -1); + return NULL; } cfg->declined_message_types->declined = ast_str_container_alloc(13); if (!cfg->declined_message_types->declined) { - goto error; + ao2_ref(cfg, -1); + return NULL; } return cfg; -error: - ao2_ref(cfg, -1); - return NULL; } int stasis_message_type_declined(const char *name) @@ -1478,6 +1533,13 @@ STASIS_MESSAGE_TYPE_DEFN(ast_multi_user_event_type, /*! @} */ +/*! \brief Shutdown function */ +static void stasis_exit(void) +{ + ast_threadpool_shutdown(pool); + pool = NULL; +} + /*! \brief Cleanup function for graceful shutdowns */ static void stasis_cleanup(void) { @@ -1489,27 +1551,71 @@ static void stasis_cleanup(void) int stasis_init(void) { + RAII_VAR(struct stasis_config *, cfg, NULL, ao2_cleanup); int cache_init; + struct ast_threadpool_options threadpool_opts = { 0, }; /* Be sure the types are cleaned up after the message bus */ ast_register_cleanup(stasis_cleanup); + ast_register_atexit(stasis_exit); if (aco_info_init(&cfg_info)) { return -1; } - aco_option_register_custom(&cfg_info, "decline", ACO_EXACT, declined_options, "", declined_handler, 0); + aco_option_register_custom(&cfg_info, "decline", ACO_EXACT, + declined_options, "", declined_handler, 0); + aco_option_register(&cfg_info, "initial_size", ACO_EXACT, + threadpool_options, "5", OPT_INT_T, PARSE_IN_RANGE, + FLDSET(struct stasis_threadpool_conf, initial_size), 0, + INT_MAX); + aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT, + threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE, + FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0, + INT_MAX); + aco_option_register(&cfg_info, "max_size", ACO_EXACT, + threadpool_options, "50", OPT_INT_T, PARSE_IN_RANGE, + FLDSET(struct stasis_threadpool_conf, max_size), 0, + INT_MAX); if (aco_process_config(&cfg_info, 0) == ACO_PROCESS_ERROR) { - RAII_VAR(struct stasis_config *, stasis_cfg, stasis_config_alloc(), ao2_cleanup); + struct stasis_config *default_cfg = stasis_config_alloc(); + + if (!default_cfg) { + return -1; + } - if (aco_set_defaults(&declined_option, "declined_message_types", stasis_cfg->declined_message_types)) { + if (aco_set_defaults(&threadpool_option, "threadpool", default_cfg->threadpool_options)) { + ast_log(LOG_ERROR, "Failed to initialize defaults on Stasis configuration object\n"); + ao2_ref(default_cfg, -1); + return -1; + } + + if (aco_set_defaults(&declined_option, "declined_message_types", default_cfg->declined_message_types)) { ast_log(LOG_ERROR, "Failed to load stasis.conf and failed to initialize defaults.\n"); return -1; } - ast_log(LOG_NOTICE, "Could not load stasis config; using defaults\n"); - ao2_global_obj_replace_unref(globals, stasis_cfg); + ast_log(LOG_NOTICE, "Could not load Stasis configuration; using defaults\n"); + ao2_global_obj_replace_unref(globals, default_cfg); + cfg = default_cfg; + } else { + cfg = ao2_global_obj_ref(globals); + if (!cfg) { + ast_log(LOG_ERROR, "Failed to obtain Stasis configuration object\n"); + return -1; + } + } + + threadpool_opts.version = AST_THREADPOOL_OPTIONS_VERSION; + threadpool_opts.initial_size = cfg->threadpool_options->initial_size; + threadpool_opts.auto_increment = 1; + threadpool_opts.max_size = cfg->threadpool_options->max_size; + threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec; + pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts); + if (!pool) { + ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n"); + return -1; } cache_init = stasis_cache_init(); diff --git a/main/stasis_cache.c b/main/stasis_cache.c index c492307d63aef631cf15ca299b31e177f5fa0e39..9129c0064c9cfb5c93e99b5319e3037c6eaa3bd0 100644 --- a/main/stasis_cache.c +++ b/main/stasis_cache.c @@ -894,7 +894,7 @@ struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *or ao2_ref(cache, +1); caching_topic->cache = cache; - sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0); + sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0); if (sub == NULL) { return NULL; } diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c index da288e864cee214513eaf48e49a2688e4b38db44..a9e458456fcd3c27119bc0ccda677d9877534e02 100644 --- a/main/stasis_message_router.c +++ b/main/stasis_message_router.c @@ -206,8 +206,8 @@ static void router_dispatch(void *data, } } -struct stasis_message_router *stasis_message_router_create( - struct stasis_topic *topic) +static struct stasis_message_router *stasis_message_router_create_internal( + struct stasis_topic *topic, int use_thread_pool) { int res; RAII_VAR(struct stasis_message_router *, router, NULL, ao2_cleanup); @@ -224,7 +224,11 @@ struct stasis_message_router *stasis_message_router_create( return NULL; } - router->subscription = stasis_subscribe(topic, router_dispatch, router); + if (use_thread_pool) { + router->subscription = stasis_subscribe_pool(topic, router_dispatch, router); + } else { + router->subscription = stasis_subscribe(topic, router_dispatch, router); + } if (!router->subscription) { return NULL; } @@ -233,6 +237,18 @@ struct stasis_message_router *stasis_message_router_create( return router; } +struct stasis_message_router *stasis_message_router_create( + struct stasis_topic *topic) +{ + return stasis_message_router_create_internal(topic, 0); +} + +struct stasis_message_router *stasis_message_router_create_pool( + struct stasis_topic *topic) +{ + return stasis_message_router_create_internal(topic, 1); +} + void stasis_message_router_unsubscribe(struct stasis_message_router *router) { if (!router) { diff --git a/res/parking/parking_applications.c b/res/parking/parking_applications.c index 8bb57b62b2b709962faf3ea748e15a24565d8aed..c5214b36a336323d5808e22b2a1ee9a0267e0aae 100644 --- a/res/parking/parking_applications.c +++ b/res/parking/parking_applications.c @@ -832,7 +832,7 @@ static int park_and_announce_app_exec(struct ast_channel *chan, const char *data return -1; } - if (!(parking_subscription = stasis_subscribe(ast_parking_topic(), park_announce_update_cb, pa_data))) { + if (!(parking_subscription = stasis_subscribe_pool(ast_parking_topic(), park_announce_update_cb, pa_data))) { /* Failed to create subscription */ park_announce_subscription_data_destroy(pa_data); return -1; diff --git a/res/parking/parking_bridge_features.c b/res/parking/parking_bridge_features.c index 61cb85f008d45baab6d243ea3f1542abed83134b..a21be90687ec565a32c600a6ba4df5db11c4a499 100644 --- a/res/parking/parking_bridge_features.c +++ b/res/parking/parking_bridge_features.c @@ -192,7 +192,7 @@ static int create_parked_subscription_full(struct ast_channel *chan, const char strcpy(subscription_data->parkee_uuid, parkee_uuid); strcpy(subscription_data->parker_uuid, parker_uuid); - if (!(parked_datastore->parked_subscription = stasis_subscribe(ast_parking_topic(), parker_update_cb, subscription_data))) { + if (!(parked_datastore->parked_subscription = stasis_subscribe_pool(ast_parking_topic(), parker_update_cb, subscription_data))) { return -1; } diff --git a/res/res_pjsip_mwi.c b/res/res_pjsip_mwi.c index eaf0f32afcc4c6d5b34a6d10d30a638cdcd9e8de..bf0925dd480e0c678bdbf60a3713474237377d83 100644 --- a/res/res_pjsip_mwi.c +++ b/res/res_pjsip_mwi.c @@ -138,7 +138,7 @@ static struct mwi_stasis_subscription *mwi_stasis_subscription_alloc(const char strcpy(mwi_stasis_sub->mailbox, mailbox); ao2_ref(mwi_sub, +1); ast_debug(3, "Creating stasis MWI subscription to mailbox %s for endpoint %s\n", mailbox, mwi_sub->id); - mwi_stasis_sub->stasis_sub = stasis_subscribe(topic, mwi_stasis_cb, mwi_sub); + mwi_stasis_sub->stasis_sub = stasis_subscribe_pool(topic, mwi_stasis_cb, mwi_sub); return mwi_stasis_sub; } diff --git a/res/res_pjsip_pubsub.c b/res/res_pjsip_pubsub.c index 344bda3cd5a68bb468f4c0a22643f3cdd4420bc2..02deeb668e43f9fe3dbb32da717dd6f390ed5388 100644 --- a/res/res_pjsip_pubsub.c +++ b/res/res_pjsip_pubsub.c @@ -4257,7 +4257,7 @@ static int load_module(void) if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) { ast_sip_push_task(NULL, subscription_persistence_load, NULL); } else { - stasis_subscribe(ast_manager_get_topic(), subscription_persistence_event_cb, NULL); + stasis_subscribe_pool(ast_manager_get_topic(), subscription_persistence_event_cb, NULL); } ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_INBOUND, EVENT_FLAG_SYSTEM, diff --git a/res/res_pjsip_refer.c b/res/res_pjsip_refer.c index 99d43fd2f0d127056311fd423a655084d52e2719..7b8c53761970f57d6395ac9ecda3cce21ffab93c 100644 --- a/res/res_pjsip_refer.c +++ b/res/res_pjsip_refer.c @@ -550,7 +550,7 @@ static void refer_blind_callback(struct ast_channel *chan, struct transfer_chann /* We also will need to detect if the transferee enters a bridge. This is currently the only reliable way to * detect if the transfer target has answered the call */ - refer->progress->bridge_sub = stasis_subscribe(ast_bridge_topic_all(), refer_progress_bridge, refer->progress); + refer->progress->bridge_sub = stasis_subscribe_pool(ast_bridge_topic_all(), refer_progress_bridge, refer->progress); if (!refer->progress->bridge_sub) { struct refer_progress_notification *notification = refer_progress_notification_alloc(refer->progress, 200, PJSIP_EVSUB_STATE_TERMINATED); diff --git a/res/res_stasis_device_state.c b/res/res_stasis_device_state.c index 40219c007cd59ee60603b751edfe069def881aeb..8a1c230491864689ef70f8c84b0965666f754cf4 100644 --- a/res/res_stasis_device_state.c +++ b/res/res_stasis_device_state.c @@ -330,7 +330,7 @@ static int subscribe_device_state(struct stasis_app *app, void *obj) return 0; } - if (!(sub->sub = stasis_subscribe( + if (!(sub->sub = stasis_subscribe_pool( ast_device_state_topic(sub->device_name), device_state_cb, sub))) { ast_log(LOG_ERROR, "Unable to subscribe to device %s\n", diff --git a/res/res_xmpp.c b/res/res_xmpp.c index 3cb6fc572cdd62dd5379f56f0df30940d09f9b40..e3eff9390b3d66fdd706fd48548e93bc1c470f95 100644 --- a/res/res_xmpp.c +++ b/res/res_xmpp.c @@ -1606,7 +1606,7 @@ static void xmpp_init_event_distribution(struct ast_xmpp_client *client) xmpp_pubsub_unsubscribe(client, "device_state"); xmpp_pubsub_unsubscribe(client, "message_waiting"); - if (!(client->mwi_sub = stasis_subscribe(ast_mwi_topic_all(), xmpp_pubsub_mwi_cb, client))) { + if (!(client->mwi_sub = stasis_subscribe_pool(ast_mwi_topic_all(), xmpp_pubsub_mwi_cb, client))) { return; } diff --git a/tests/test_stasis.c b/tests/test_stasis.c index ba82e83adb06aa625d949bbb9834071d252f8ceb..2e83e3b7096eecbd0b91d9e47b170ebdd442858a 100644 --- a/tests/test_stasis.c +++ b/tests/test_stasis.c @@ -361,6 +361,61 @@ AST_TEST_DEFINE(subscription_messages) return AST_TEST_PASS; } +AST_TEST_DEFINE(subscription_pool_messages) +{ + RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe); + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); + RAII_VAR(char *, expected_uniqueid, NULL, ast_free); + int complete; + struct stasis_subscription_change *change; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test subscribe/unsubscribe messages using a threadpool subscription"; + info->description = "Test subscribe/unsubscribe messages using a threadpool subscription"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + topic = stasis_topic_create("TestTopic"); + ast_test_validate(test, NULL != topic); + + consumer = consumer_create(0); + ast_test_validate(test, NULL != consumer); + + uut = stasis_subscribe_pool(topic, consumer_exec, consumer); + ast_test_validate(test, NULL != uut); + ao2_ref(consumer, +1); + expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut)); + + uut = stasis_unsubscribe(uut); + complete = consumer_wait_for_completion(consumer); + ast_test_validate(test, 1 == complete); + + ast_test_validate(test, 2 == consumer->messages_rxed_len); + ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[0])); + ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[1])); + + change = stasis_message_data(consumer->messages_rxed[0]); + ast_test_validate(test, topic == change->topic); + ast_test_validate(test, 0 == strcmp("Subscribe", change->description)); + ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid)); + + change = stasis_message_data(consumer->messages_rxed[1]); + ast_test_validate(test, topic == change->topic); + ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description)); + ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid)); + + return AST_TEST_PASS; +} + AST_TEST_DEFINE(publish) { RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); @@ -455,6 +510,55 @@ AST_TEST_DEFINE(publish_sync) return AST_TEST_PASS; } +AST_TEST_DEFINE(publish_pool) +{ + RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe); + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); + int actual_len; + const char *actual; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test publishing with a threadpool"; + info->description = "Test publishing to a subscriber whose\n" + "subscription dictates messages are received through a\n" + "threadpool."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + topic = stasis_topic_create("TestTopic"); + ast_test_validate(test, NULL != topic); + + consumer = consumer_create(1); + ast_test_validate(test, NULL != consumer); + + uut = stasis_subscribe_pool(topic, consumer_exec, consumer); + ast_test_validate(test, NULL != uut); + ao2_ref(consumer, +1); + + test_data = ao2_alloc(1, NULL); + ast_test_validate(test, NULL != test_data); + test_message_type = stasis_message_type_create("TestMessage", NULL); + test_message = stasis_message_create(test_message_type, test_data); + + stasis_publish(topic, test_message); + + actual_len = consumer_wait_for(consumer, 1); + ast_test_validate(test, 1 == actual_len); + actual = stasis_message_data(consumer->messages_rxed[0]); + ast_test_validate(test, test_data == actual); + + return AST_TEST_PASS; +} + AST_TEST_DEFINE(unsubscribe_stops_messages) { RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); @@ -650,6 +754,106 @@ AST_TEST_DEFINE(interleaving) return AST_TEST_PASS; } +AST_TEST_DEFINE(subscription_interleaving) +{ + RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup); + + RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup); + + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + + RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup); + + RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel); + RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel); + RAII_VAR(struct stasis_subscription *, sub1, NULL, stasis_unsubscribe); + RAII_VAR(struct stasis_subscription *, sub2, NULL, stasis_unsubscribe); + + RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup); + + int actual_len; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test sending interleaved events to a parent topic with different subscribers"; + info->description = "Test sending events to a parent topic.\n" + "This test creates three topics (one parent, two children)\n" + "and publishes messages alternately between the children.\n" + "It verifies that the messages are received in the expected\n" + "order, for different subscription types: one with a dedicated\n" + "thread, the other on the Stasis threadpool.\n"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + test_message_type = stasis_message_type_create("test", NULL); + ast_test_validate(test, NULL != test_message_type); + + test_data = ao2_alloc(1, NULL); + ast_test_validate(test, NULL != test_data); + + test_message1 = stasis_message_create(test_message_type, test_data); + ast_test_validate(test, NULL != test_message1); + test_message2 = stasis_message_create(test_message_type, test_data); + ast_test_validate(test, NULL != test_message2); + test_message3 = stasis_message_create(test_message_type, test_data); + ast_test_validate(test, NULL != test_message3); + + parent_topic = stasis_topic_create("ParentTestTopic"); + ast_test_validate(test, NULL != parent_topic); + topic1 = stasis_topic_create("Topic1"); + ast_test_validate(test, NULL != topic1); + topic2 = stasis_topic_create("Topic2"); + ast_test_validate(test, NULL != topic2); + + forward_sub1 = stasis_forward_all(topic1, parent_topic); + ast_test_validate(test, NULL != forward_sub1); + forward_sub2 = stasis_forward_all(topic2, parent_topic); + ast_test_validate(test, NULL != forward_sub2); + + consumer1 = consumer_create(1); + ast_test_validate(test, NULL != consumer1); + + consumer2 = consumer_create(1); + ast_test_validate(test, NULL != consumer2); + + sub1 = stasis_subscribe(parent_topic, consumer_exec, consumer1); + ast_test_validate(test, NULL != sub1); + ao2_ref(consumer1, +1); + + sub2 = stasis_subscribe_pool(parent_topic, consumer_exec, consumer2); + ast_test_validate(test, NULL != sub2); + ao2_ref(consumer2, +1); + + stasis_publish(topic1, test_message1); + stasis_publish(topic2, test_message2); + stasis_publish(topic1, test_message3); + + actual_len = consumer_wait_for(consumer1, 3); + ast_test_validate(test, 3 == actual_len); + + actual_len = consumer_wait_for(consumer2, 3); + ast_test_validate(test, 3 == actual_len); + + ast_test_validate(test, test_message1 == consumer1->messages_rxed[0]); + ast_test_validate(test, test_message2 == consumer1->messages_rxed[1]); + ast_test_validate(test, test_message3 == consumer1->messages_rxed[2]); + + ast_test_validate(test, test_message1 == consumer2->messages_rxed[0]); + ast_test_validate(test, test_message2 == consumer2->messages_rxed[1]); + ast_test_validate(test, test_message3 == consumer2->messages_rxed[2]); + + return AST_TEST_PASS; +} + struct cache_test_data { char *id; char *value; @@ -1389,6 +1593,104 @@ AST_TEST_DEFINE(router) return AST_TEST_PASS; } +AST_TEST_DEFINE(router_pool) +{ + RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join); + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup); + int actual_len, ret; + struct stasis_message *actual; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test message routing via threadpool"; + info->description = "Test simple message routing when\n" + "the subscriptions dictate usage of the Stasis\n" + "threadpool.\n"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + topic = stasis_topic_create("TestTopic"); + ast_test_validate(test, NULL != topic); + + consumer1 = consumer_create(1); + ast_test_validate(test, NULL != consumer1); + consumer2 = consumer_create(1); + ast_test_validate(test, NULL != consumer2); + consumer3 = consumer_create(1); + ast_test_validate(test, NULL != consumer3); + + test_message_type1 = stasis_message_type_create("TestMessage1", NULL); + ast_test_validate(test, NULL != test_message_type1); + test_message_type2 = stasis_message_type_create("TestMessage2", NULL); + ast_test_validate(test, NULL != test_message_type2); + test_message_type3 = stasis_message_type_create("TestMessage3", NULL); + ast_test_validate(test, NULL != test_message_type3); + + uut = stasis_message_router_create_pool(topic); + ast_test_validate(test, NULL != uut); + + ret = stasis_message_router_add( + uut, test_message_type1, consumer_exec, consumer1); + ast_test_validate(test, 0 == ret); + ao2_ref(consumer1, +1); + ret = stasis_message_router_add( + uut, test_message_type2, consumer_exec, consumer2); + ast_test_validate(test, 0 == ret); + ao2_ref(consumer2, +1); + ret = stasis_message_router_set_default(uut, consumer_exec, consumer3); + ast_test_validate(test, 0 == ret); + ao2_ref(consumer3, +1); + + test_data = ao2_alloc(1, NULL); + ast_test_validate(test, NULL != test_data); + test_message1 = stasis_message_create(test_message_type1, test_data); + ast_test_validate(test, NULL != test_message1); + test_message2 = stasis_message_create(test_message_type2, test_data); + ast_test_validate(test, NULL != test_message2); + test_message3 = stasis_message_create(test_message_type3, test_data); + ast_test_validate(test, NULL != test_message3); + + stasis_publish(topic, test_message1); + stasis_publish(topic, test_message2); + stasis_publish(topic, test_message3); + + actual_len = consumer_wait_for(consumer1, 1); + ast_test_validate(test, 1 == actual_len); + actual_len = consumer_wait_for(consumer2, 1); + ast_test_validate(test, 1 == actual_len); + actual_len = consumer_wait_for(consumer3, 1); + ast_test_validate(test, 1 == actual_len); + + actual = consumer1->messages_rxed[0]; + ast_test_validate(test, test_message1 == actual); + + actual = consumer2->messages_rxed[0]; + ast_test_validate(test, test_message2 == actual); + + actual = consumer3->messages_rxed[0]; + ast_test_validate(test, test_message3 == actual); + + /* consumer1 and consumer2 do not get the final message. */ + ao2_cleanup(consumer1); + ao2_cleanup(consumer2); + + return AST_TEST_PASS; +} + static const char *cache_simple(struct stasis_message *message) { const char *type_name = @@ -1748,8 +2050,10 @@ static int unload_module(void) AST_TEST_UNREGISTER(message_type); AST_TEST_UNREGISTER(message); AST_TEST_UNREGISTER(subscription_messages); + AST_TEST_UNREGISTER(subscription_pool_messages); AST_TEST_UNREGISTER(publish); AST_TEST_UNREGISTER(publish_sync); + AST_TEST_UNREGISTER(publish_pool); AST_TEST_UNREGISTER(unsubscribe_stops_messages); AST_TEST_UNREGISTER(forward); AST_TEST_UNREGISTER(cache_filter); @@ -1757,8 +2061,10 @@ static int unload_module(void) AST_TEST_UNREGISTER(cache_dump); AST_TEST_UNREGISTER(cache_eid_aggregate); AST_TEST_UNREGISTER(router); + AST_TEST_UNREGISTER(router_pool); AST_TEST_UNREGISTER(router_cache_updates); AST_TEST_UNREGISTER(interleaving); + AST_TEST_UNREGISTER(subscription_interleaving); AST_TEST_UNREGISTER(no_to_json); AST_TEST_UNREGISTER(to_json); AST_TEST_UNREGISTER(no_to_ami); @@ -1773,8 +2079,10 @@ static int load_module(void) AST_TEST_REGISTER(message_type); AST_TEST_REGISTER(message); AST_TEST_REGISTER(subscription_messages); + AST_TEST_REGISTER(subscription_pool_messages); AST_TEST_REGISTER(publish); AST_TEST_REGISTER(publish_sync); + AST_TEST_REGISTER(publish_pool); AST_TEST_REGISTER(unsubscribe_stops_messages); AST_TEST_REGISTER(forward); AST_TEST_REGISTER(cache_filter); @@ -1782,8 +2090,10 @@ static int load_module(void) AST_TEST_REGISTER(cache_dump); AST_TEST_REGISTER(cache_eid_aggregate); AST_TEST_REGISTER(router); + AST_TEST_REGISTER(router_pool); AST_TEST_REGISTER(router_cache_updates); AST_TEST_REGISTER(interleaving); + AST_TEST_REGISTER(subscription_interleaving); AST_TEST_REGISTER(no_to_json); AST_TEST_REGISTER(to_json); AST_TEST_REGISTER(no_to_ami);