diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h index f38e0b39a62e8263775a1f2e8e70bedc08170d1c..b26aba90d0d335f08d7a3847caca1b5b5602ca7f 100644 --- a/include/asterisk/res_pjsip.h +++ b/include/asterisk/res_pjsip.h @@ -2980,6 +2980,11 @@ const char *ast_sip_get_host_ip_string(int af); */ long ast_sip_threadpool_queue_size(void); +/*! + * \brief Retrieve the SIP threadpool object + */ +struct ast_threadpool *ast_sip_threadpool(void); + /*! * \brief Retrieve transport state * \since 13.7.1 diff --git a/res/res_pjsip.c b/res/res_pjsip.c index c6594708bb22766ef8ede59f95cd291c11b319bc..0dcbcea83b3800ea38cb99455e7d125115b8fbe8 100644 --- a/res/res_pjsip.c +++ b/res/res_pjsip.c @@ -34,6 +34,7 @@ #include "asterisk/utils.h" #include "asterisk/astobj2.h" #include "asterisk/module.h" +#include "asterisk/serializer.h" #include "asterisk/threadpool.h" #include "asterisk/taskprocessor.h" #include "asterisk/uuid.h" @@ -2840,7 +2841,7 @@ #define SERIALIZER_POOL_SIZE 8 /*! Pool of serializers to use if not supplied. */ -static struct ast_taskprocessor *serializer_pool[SERIALIZER_POOL_SIZE]; +static struct ast_serializer_pool *sip_serializer_pool; static pjsip_endpoint *ast_pjsip_endpoint; @@ -4626,71 +4627,10 @@ struct ast_taskprocessor *ast_sip_create_serializer(const char *name) return ast_sip_create_serializer_group(name, NULL); } -/*! - * \internal - * \brief Shutdown the serializers in the default pool. - * \since 14.0.0 - * - * \return Nothing - */ -static void serializer_pool_shutdown(void) -{ - int idx; - - for (idx = 0; idx < SERIALIZER_POOL_SIZE; ++idx) { - ast_taskprocessor_unreference(serializer_pool[idx]); - serializer_pool[idx] = NULL; - } -} - -/*! - * \internal - * \brief Setup the serializers in the default pool. - * \since 14.0.0 - * - * \retval 0 on success. - * \retval -1 on error. - */ -static int serializer_pool_setup(void) -{ - char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1]; - int idx; - - for (idx = 0; idx < SERIALIZER_POOL_SIZE; ++idx) { - /* Create name with seq number appended. */ - ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/default"); - - serializer_pool[idx] = ast_sip_create_serializer(tps_name); - if (!serializer_pool[idx]) { - serializer_pool_shutdown(); - return -1; - } - } - return 0; -} - -static struct ast_taskprocessor *serializer_pool_pick(void) -{ - int idx; - int pos = 0; - - if (!serializer_pool[0]) { - return NULL; - } - - for (idx = 1; idx < SERIALIZER_POOL_SIZE; ++idx) { - if (ast_taskprocessor_size(serializer_pool[idx]) < ast_taskprocessor_size(serializer_pool[pos])) { - pos = idx; - } - } - - return serializer_pool[pos]; -} - int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) { if (!serializer) { - serializer = serializer_pool_pick(); + serializer = ast_serializer_pool_get(sip_serializer_pool); } return ast_taskprocessor_push(serializer, sip_task, task_data); @@ -4771,7 +4711,7 @@ int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int { if (!serializer) { /* Caller doesn't care which PJSIP serializer the task executes under. */ - serializer = serializer_pool_pick(); + serializer = ast_serializer_pool_get(sip_serializer_pool); if (!serializer) { /* No serializer picked to execute the task */ return -1; @@ -5133,6 +5073,11 @@ long ast_sip_threadpool_queue_size(void) return ast_threadpool_queue_size(sip_threadpool); } +struct ast_threadpool *ast_sip_threadpool(void) +{ + return sip_threadpool; +} + #ifdef TEST_FRAMEWORK AST_TEST_DEFINE(xml_sanitization_end_null) { @@ -5204,7 +5149,7 @@ static int unload_pjsip(void *data) * These calls need the pjsip endpoint and serializer to clean up. * If they're not set, then there's nothing to clean up anyway. */ - if (ast_pjsip_endpoint && serializer_pool[0]) { + if (ast_pjsip_endpoint && sip_serializer_pool) { ast_res_pjsip_cleanup_options_handling(); ast_res_pjsip_cleanup_message_filter(); ast_sip_destroy_distributor(); @@ -5340,7 +5285,9 @@ static int load_module(void) goto error; } - if (serializer_pool_setup()) { + sip_serializer_pool = ast_serializer_pool_create( + "pjsip/default", SERIALIZER_POOL_SIZE, sip_threadpool, -1); + if (!sip_serializer_pool) { ast_log(LOG_ERROR, "Failed to create SIP serializer pool. Aborting load\n"); goto error; } @@ -5413,7 +5360,7 @@ error: /* These functions all check for NULLs and are safe to call at any time */ ast_sip_destroy_scheduler(); - serializer_pool_shutdown(); + ast_serializer_pool_destroy(sip_serializer_pool); ast_threadpool_shutdown(sip_threadpool); return AST_MODULE_LOAD_DECLINE; @@ -5444,7 +5391,7 @@ static int unload_module(void) */ ast_sip_push_task_wait_servant(NULL, unload_pjsip, NULL); ast_sip_destroy_scheduler(); - serializer_pool_shutdown(); + ast_serializer_pool_destroy(sip_serializer_pool); ast_threadpool_shutdown(sip_threadpool); return 0; diff --git a/res/res_pjsip_mwi.c b/res/res_pjsip_mwi.c index c89d383855a07bf32b89b0407a9e927656675fb6..d7749fab6468ce3b6cea751212abb4b1e180ddcc 100644 --- a/res/res_pjsip_mwi.c +++ b/res/res_pjsip_mwi.c @@ -36,13 +36,15 @@ #include "asterisk/logger.h" #include "asterisk/astobj2.h" #include "asterisk/taskprocessor.h" +#include "asterisk/serializer.h" #include "asterisk/sorcery.h" #include "asterisk/stasis.h" #include "asterisk/mwi.h" struct mwi_subscription; -static struct ao2_container *unsolicited_mwi; -static struct ao2_container *solicited_mwi; + +AO2_GLOBAL_OBJ_STATIC(mwi_unsolicited); +AO2_GLOBAL_OBJ_STATIC(mwi_solicited); static char *default_voicemail_extension; @@ -57,8 +59,11 @@ static char *default_voicemail_extension; /*! Number of serializers in pool if one not supplied. */ #define MWI_SERIALIZER_POOL_SIZE 8 +/*! Max timeout for all threads to join during an unload. */ +#define MAX_UNLOAD_TIMEOUT_TIME 10 /* Seconds */ + /*! Pool of serializers to use if not supplied. */ -static struct ast_taskprocessor *mwi_serializer_pool[MWI_SERIALIZER_POOL_SIZE]; +static struct ast_serializer_pool *mwi_serializer_pool; static void mwi_subscription_shutdown(struct ast_sip_subscription *sub); static void mwi_to_ami(struct ast_sip_subscription *sub, struct ast_str **buf); @@ -129,117 +134,6 @@ struct mwi_subscription { char id[1]; }; -/*! - * \internal - * \brief Shutdown the serializers in the mwi pool. - * \since 13.12.0 - * - * \return Nothing - */ -static void mwi_serializer_pool_shutdown(void) -{ - int idx; - - for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) { - ast_taskprocessor_unreference(mwi_serializer_pool[idx]); - mwi_serializer_pool[idx] = NULL; - } -} - -/*! - * \internal - * \brief Setup the serializers in the mwi pool. - * \since 13.12.0 - * - * \retval 0 on success. - * \retval -1 on error. - */ -static int mwi_serializer_pool_setup(void) -{ - char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1]; - int idx; - - for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) { - /* Create name with seq number appended. */ - ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/mwi"); - - mwi_serializer_pool[idx] = ast_sip_create_serializer(tps_name); - if (!mwi_serializer_pool[idx]) { - mwi_serializer_pool_shutdown(); - return -1; - } - } - return 0; -} - -/*! - * \internal - * \brief Pick a mwi serializer from the pool. - * \since 13.12.0 - * - * \retval least queue size task processor. - */ -static struct ast_taskprocessor *get_mwi_serializer(void) -{ - int idx; - int pos; - - if (!mwi_serializer_pool[0]) { - return NULL; - } - - for (pos = idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) { - if (ast_taskprocessor_size(mwi_serializer_pool[idx]) < ast_taskprocessor_size(mwi_serializer_pool[pos])) { - pos = idx; - } - } - - return mwi_serializer_pool[pos]; -} - -/*! - * \internal - * \brief Set taskprocessor alert levels for the serializers in the mwi pool. - * \since 13.12.0 - * - * \retval 0 on success. - * \retval -1 on error. - */ -static int mwi_serializer_set_alert_levels(void) -{ - int idx; - long tps_queue_high; - long tps_queue_low; - - if (!mwi_serializer_pool[0]) { - return -1; - } - - tps_queue_high = ast_sip_get_mwi_tps_queue_high(); - if (tps_queue_high <= 0) { - ast_log(AST_LOG_WARNING, "Invalid taskprocessor high water alert trigger level '%ld'\n", - tps_queue_high); - tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL; - } - - tps_queue_low = ast_sip_get_mwi_tps_queue_low(); - if (tps_queue_low < -1 || tps_queue_high < tps_queue_low) { - ast_log(AST_LOG_WARNING, "Invalid taskprocessor low water clear alert level '%ld'\n", - tps_queue_low); - tps_queue_low = -1; - } - - for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) { - if (ast_taskprocessor_alert_set_levels(mwi_serializer_pool[idx], tps_queue_low, tps_queue_high)) { - ast_log(AST_LOG_WARNING, "Failed to set alert levels for MWI serializer pool #%d.\n", - idx); - } - } - - return 0; -} - - static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg); @@ -688,13 +582,15 @@ static int unsubscribe_stasis(void *obj, void *arg, int flags) } static int create_unsolicited_mwi_subscriptions(struct ast_sip_endpoint *endpoint, - int recreate, int send_now); + int recreate, int send_now, struct ao2_container *unsolicited_mwi, struct ao2_container *solicited_mwi); static void mwi_subscription_shutdown(struct ast_sip_subscription *sub) { struct mwi_subscription *mwi_sub; struct ast_datastore *mwi_datastore; struct ast_sip_endpoint *endpoint = NULL; + struct ao2_container *unsolicited_mwi; + struct ao2_container *solicited_mwi; mwi_datastore = ast_sip_subscription_get_datastore(sub, MWI_DATASTORE); if (!mwi_datastore) { @@ -708,18 +604,25 @@ static void mwi_subscription_shutdown(struct ast_sip_subscription *sub) endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", mwi_sub->id); ao2_ref(mwi_datastore, -1); - ao2_unlink(solicited_mwi, mwi_sub); + + solicited_mwi = ao2_global_obj_ref(mwi_solicited); + if (solicited_mwi) { + ao2_unlink(solicited_mwi, mwi_sub); + } /* * When a solicited subscription is removed it's possible an unsolicited one * needs to be [re-]created. Attempt to establish unsolicited MWI. */ + unsolicited_mwi = ao2_global_obj_ref(mwi_unsolicited); if (unsolicited_mwi && endpoint) { ao2_lock(unsolicited_mwi); - create_unsolicited_mwi_subscriptions(endpoint, 1, 1); + create_unsolicited_mwi_subscriptions(endpoint, 1, 1, unsolicited_mwi, solicited_mwi); ao2_unlock(unsolicited_mwi); + ao2_ref(unsolicited_mwi, -1); } + ao2_cleanup(solicited_mwi); ao2_cleanup(endpoint); } @@ -785,6 +688,10 @@ static int has_mwi_subscription(struct ao2_container *container, *mwi_sub = NULL; *mwi_stasis = NULL; + if (!container) { + return 0; + } + mwi_subs = ao2_find(container, ast_sorcery_object_get_id(endpoint), OBJ_SEARCH_KEY | OBJ_MULTIPLE | OBJ_NOLOCK); if (!mwi_subs) { @@ -814,11 +721,13 @@ static int has_mwi_subscription(struct ao2_container *container, * * \param endpoint The endpoint * \param mailbox The mailbox + * \param unsolicited_mwi A container of unsolicited mwi objects * * \retval 1 if a solicited subscription is allowed for the endpoint/mailbox * 0 otherwise */ -static int allow_and_or_replace_unsolicited(struct ast_sip_endpoint *endpoint, const char *mailbox) +static int allow_and_or_replace_unsolicited(struct ast_sip_endpoint *endpoint, const char *mailbox, + struct ao2_container *unsolicited_mwi) { struct mwi_subscription *mwi_sub; struct mwi_stasis_subscription *mwi_stasis; @@ -865,11 +774,14 @@ static int send_notify(void *obj, void *arg, int flags); * * \param endpoint The endpoint * \param mailbox The mailbox + * \param unsolicited_mwi A container of unsolicited mwi objects + * \param solicited_mwi A container of solicited mwi objects * * \retval 1 if an unsolicited subscription is allowed for the endpoint/mailbox * 0 otherwise */ -static int is_unsolicited_allowed(struct ast_sip_endpoint *endpoint, const char *mailbox) +static int is_unsolicited_allowed(struct ast_sip_endpoint *endpoint, const char *mailbox, + struct ao2_container *unsolicited_mwi, struct ao2_container *solicited_mwi) { struct mwi_subscription *mwi_sub; struct mwi_stasis_subscription *mwi_stasis; @@ -935,28 +847,41 @@ static int mwi_validate_for_aor(void *obj, void *arg, int flags) struct ast_sip_endpoint *endpoint = arg; char *mailboxes; char *mailbox; + struct ao2_container *unsolicited_mwi; if (ast_strlen_zero(aor->mailboxes)) { return 0; } /* A reload could be taking place so lock while checking if allowed */ - ao2_lock(unsolicited_mwi); + unsolicited_mwi = ao2_global_obj_ref(mwi_unsolicited); + if (unsolicited_mwi) { + ao2_lock(unsolicited_mwi); + } + mailboxes = ast_strdupa(aor->mailboxes); while ((mailbox = ast_strip(strsep(&mailboxes, ",")))) { if (ast_strlen_zero(mailbox)) { continue; } - if (!allow_and_or_replace_unsolicited(endpoint, mailbox)) { + if (!allow_and_or_replace_unsolicited(endpoint, mailbox, unsolicited_mwi)) { ast_debug(1, "Endpoint '%s' already configured for unsolicited MWI for mailbox '%s'. " "Denying MWI subscription to %s\n", ast_sorcery_object_get_id(endpoint), mailbox, ast_sorcery_object_get_id(aor)); - ao2_unlock(unsolicited_mwi); + + if (unsolicited_mwi) { + ao2_unlock(unsolicited_mwi); + ao2_ref(unsolicited_mwi, -1); + } return -1; } } - ao2_unlock(unsolicited_mwi); + + if (unsolicited_mwi) { + ao2_unlock(unsolicited_mwi); + ao2_ref(unsolicited_mwi, -1); + } return 0; } @@ -1082,6 +1007,7 @@ static int mwi_subscription_established(struct ast_sip_subscription *sip_sub) const char *resource = ast_sip_subscription_get_resource_name(sip_sub); struct mwi_subscription *sub; struct ast_sip_endpoint *endpoint = ast_sip_subscription_get_endpoint(sip_sub); + struct ao2_container *solicited_mwi; /* no aor in uri? subscribe to all on endpoint */ if (ast_strlen_zero(resource)) { @@ -1102,7 +1028,12 @@ static int mwi_subscription_established(struct ast_sip_subscription *sip_sub) ast_sip_subscription_remove_datastore(sip_sub, MWI_DATASTORE); } - ao2_link(solicited_mwi, sub); + solicited_mwi = ao2_global_obj_ref(mwi_solicited); + if (solicited_mwi) { + ao2_link(solicited_mwi, sub); + ao2_ref(solicited_mwi, -1); + } + ao2_cleanup(sub); ao2_cleanup(endpoint); return 0; @@ -1213,7 +1144,7 @@ static int send_notify(void *obj, void *arg, int flags) struct mwi_subscription *mwi_sub = obj; struct ast_taskprocessor *serializer = mwi_sub->is_solicited ? ast_sip_subscription_get_serializer(mwi_sub->sip_sub) - : get_mwi_serializer(); + : ast_serializer_pool_get(mwi_serializer_pool); if (ast_sip_push_task(serializer, serialized_notify, ao2_bump(mwi_sub))) { ao2_ref(mwi_sub, -1); @@ -1228,7 +1159,8 @@ static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub, struct mwi_subscription *mwi_sub = userdata; if (stasis_subscription_final_message(sub, msg)) { - if (ast_sip_push_task(NULL, serialized_cleanup, ao2_bump(mwi_sub))) { + if (ast_sip_push_task(ast_serializer_pool_get(mwi_serializer_pool), + serialized_cleanup, ao2_bump(mwi_sub))) { ao2_ref(mwi_sub, -1); } return; @@ -1248,11 +1180,13 @@ static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub, * \param endpoint An endpoint object * \param recreate Whether or not unsolicited subscriptions are potentially being recreated * \param send_now Whether or not to send a notify once the subscription is created + * \param unsolicited_mwi A container of unsolicited mwi objects + * \param solicited_mwi A container of solicited mwi objects * * \retval 0 */ static int create_unsolicited_mwi_subscriptions(struct ast_sip_endpoint *endpoint, - int recreate, int send_now) + int recreate, int send_now, struct ao2_container *unsolicited_mwi, struct ao2_container *solicited_mwi) { RAII_VAR(struct mwi_subscription *, aggregate_sub, NULL, ao2_cleanup); char *mailboxes; @@ -1287,14 +1221,16 @@ static int create_unsolicited_mwi_subscriptions(struct ast_sip_endpoint *endpoin } /* Lock solicited so we don't potentially add to both containers */ - ao2_lock(solicited_mwi); + if (solicited_mwi) { + ao2_lock(solicited_mwi); + } mailboxes = ast_strdupa(endpoint->subscription.mwi.mailboxes); while ((mailbox = ast_strip(strsep(&mailboxes, ",")))) { struct mwi_subscription *sub; struct mwi_stasis_subscription *mwi_stasis_sub; - if (!is_unsolicited_allowed(endpoint, mailbox)) { + if (!is_unsolicited_allowed(endpoint, mailbox, unsolicited_mwi, solicited_mwi)) { continue; } @@ -1334,13 +1270,16 @@ static int create_unsolicited_mwi_subscriptions(struct ast_sip_endpoint *endpoin } } - ao2_unlock(solicited_mwi); + if (solicited_mwi) { + ao2_unlock(solicited_mwi); + } + return 0; } -static int create_mwi_subscriptions_for_endpoint(void *obj, void *arg, int flags) +static int create_mwi_subscriptions_for_endpoint(void *obj, void *arg, void *data, int flags) { - return create_unsolicited_mwi_subscriptions(obj, 0, 0); + return create_unsolicited_mwi_subscriptions(obj, 0, 0, arg, data); } static int unsubscribe(void *obj, void *arg, int flags) @@ -1354,9 +1293,16 @@ static int unsubscribe(void *obj, void *arg, int flags) static void create_mwi_subscriptions(void) { + struct ao2_container *unsolicited_mwi; + struct ao2_container *solicited_mwi; struct ao2_container *endpoints; struct ast_variable *var; + unsolicited_mwi = ao2_global_obj_ref(mwi_unsolicited); + if (!unsolicited_mwi) { + return; + } + var = ast_variable_new("mailboxes !=", "", ""); endpoints = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(), "endpoint", @@ -1364,9 +1310,12 @@ static void create_mwi_subscriptions(void) ast_variables_destroy(var); if (!endpoints) { + ao2_ref(unsolicited_mwi, -1); return; } + solicited_mwi = ao2_global_obj_ref(mwi_solicited); + /* We remove all the old stasis subscriptions first before applying the new configuration. This * prevents a situation where there might be multiple overlapping stasis subscriptions for an * endpoint for mailboxes. Though there may be mailbox changes during the gap between unsubscribing @@ -1375,10 +1324,12 @@ static void create_mwi_subscriptions(void) */ ao2_lock(unsolicited_mwi); ao2_callback(unsolicited_mwi, OBJ_NOLOCK | OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE, unsubscribe, NULL); - ao2_callback(endpoints, OBJ_NODATA, create_mwi_subscriptions_for_endpoint, NULL); + ao2_callback_data(endpoints, OBJ_NODATA, create_mwi_subscriptions_for_endpoint, unsolicited_mwi, solicited_mwi); ao2_unlock(unsolicited_mwi); ao2_ref(endpoints, -1); + ao2_cleanup(solicited_mwi); + ao2_ref(unsolicited_mwi, -1); } /*! \brief Function called to send MWI NOTIFY on any unsolicited mailboxes relating to this AOR */ @@ -1391,7 +1342,8 @@ static int send_contact_notify(void *obj, void *arg, int flags) return 0; } - if (ast_sip_push_task(get_mwi_serializer(), serialized_notify, ao2_bump(mwi_sub))) { + if (ast_sip_push_task(ast_serializer_pool_get(mwi_serializer_pool), + serialized_notify, ao2_bump(mwi_sub))) { ao2_ref(mwi_sub, -1); } @@ -1404,6 +1356,8 @@ static void mwi_contact_changed(const struct ast_sip_contact *contact) char *id = ast_strdupa(ast_sorcery_object_get_id(contact)); char *aor = NULL; struct ast_sip_endpoint *endpoint = NULL; + struct ao2_container *unsolicited_mwi; + struct ao2_container *solicited_mwi; if (contact->endpoint) { endpoint = ao2_bump(contact->endpoint); @@ -1418,10 +1372,20 @@ static void mwi_contact_changed(const struct ast_sip_contact *contact) return; } + unsolicited_mwi = ao2_global_obj_ref(mwi_unsolicited); + if (!unsolicited_mwi) { + ao2_cleanup(endpoint); + return; + } + + solicited_mwi = ao2_global_obj_ref(mwi_solicited); + ao2_lock(unsolicited_mwi); - create_mwi_subscriptions_for_endpoint(endpoint, NULL, 0); + create_unsolicited_mwi_subscriptions(endpoint, 0, 0, unsolicited_mwi, solicited_mwi); ao2_unlock(unsolicited_mwi); ao2_cleanup(endpoint); + ao2_cleanup(solicited_mwi); + ao2_ref(unsolicited_mwi, -1); aor = strsep(&id, ";@"); ao2_callback(unsolicited_mwi, OBJ_NODATA, send_contact_notify, aor); @@ -1447,6 +1411,7 @@ static void mwi_contact_deleted(const void *object) struct mwi_subscription *mwi_sub; struct ast_sip_endpoint *endpoint = NULL; struct ast_sip_contact *found_contact; + struct ao2_container *unsolicited_mwi; if (contact->endpoint) { endpoint = ao2_bump(contact->endpoint); @@ -1469,6 +1434,11 @@ static void mwi_contact_deleted(const void *object) return; } + unsolicited_mwi = ao2_global_obj_ref(mwi_unsolicited); + if (!unsolicited_mwi) { + return; + } + ao2_lock(unsolicited_mwi); mwi_subs = ao2_find(unsolicited_mwi, contact->endpoint_name, OBJ_SEARCH_KEY | OBJ_MULTIPLE | OBJ_NOLOCK | OBJ_UNLINK); @@ -1479,6 +1449,7 @@ static void mwi_contact_deleted(const void *object) ao2_iterator_destroy(mwi_subs); } ao2_unlock(unsolicited_mwi); + ao2_ref(unsolicited_mwi, -1); } /*! \brief Observer for contacts so unsolicited MWI is sent when a contact changes */ @@ -1491,7 +1462,12 @@ static const struct ast_sorcery_observer mwi_contact_observer = { /*! \brief Task invoked to send initial MWI NOTIFY for unsolicited */ static int send_initial_notify_all(void *obj) { - ao2_callback(unsolicited_mwi, OBJ_NODATA, send_notify, NULL); + struct ao2_container *unsolicited_mwi = ao2_global_obj_ref(mwi_unsolicited); + + if (unsolicited_mwi) { + ao2_callback(unsolicited_mwi, OBJ_NODATA, send_notify, NULL); + ao2_ref(unsolicited_mwi, -1); + } return 0; } @@ -1513,7 +1489,8 @@ static void mwi_startup_event_cb(void *data, struct stasis_subscription *sub, st return; } - ast_sip_push_task(NULL, send_initial_notify_all, NULL); + ast_sip_push_task(ast_serializer_pool_get(mwi_serializer_pool), + send_initial_notify_all, NULL); stasis_unsubscribe(sub); } @@ -1522,7 +1499,8 @@ static void global_loaded(const char *object_type) { ast_free(default_voicemail_extension); default_voicemail_extension = ast_sip_get_default_voicemail_extension(); - mwi_serializer_set_alert_levels(); + ast_serializer_pool_set_alerts(mwi_serializer_pool, + ast_sip_get_mwi_tps_queue_high(), ast_sip_get_mwi_tps_queue_low()); } static struct ast_sorcery_observer global_observer = { @@ -1537,32 +1515,65 @@ static int reload(void) return 0; } +static int unload_module(void) +{ + struct ao2_container *unsolicited_mwi; + + ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer); + ast_sorcery_observer_remove(ast_sip_get_sorcery(), "contact", &mwi_contact_observer); + + unsolicited_mwi = ao2_global_obj_replace(mwi_unsolicited, NULL); + if (unsolicited_mwi) { + ao2_callback(unsolicited_mwi, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE, unsubscribe, NULL); + ao2_ref(unsolicited_mwi, -1); + } + + ao2_global_obj_release(mwi_solicited); + + if (ast_serializer_pool_destroy(mwi_serializer_pool)) { + ast_log(LOG_WARNING, "Unload incomplete. Try again later\n"); + return -1; + } + mwi_serializer_pool = NULL; + + ast_sip_unregister_subscription_handler(&mwi_handler); + + ast_free(default_voicemail_extension); + default_voicemail_extension = NULL; + return 0; +} + static int load_module(void) { + struct ao2_container *mwi_container; + if (ast_sip_register_subscription_handler(&mwi_handler)) { return AST_MODULE_LOAD_DECLINE; } - if (mwi_serializer_pool_setup()) { + mwi_serializer_pool = ast_serializer_pool_create("pjsip/mwi", + MWI_SERIALIZER_POOL_SIZE, ast_sip_threadpool(), MAX_UNLOAD_TIMEOUT_TIME); + if (!mwi_serializer_pool) { ast_log(AST_LOG_WARNING, "Failed to create MWI serializer pool. The default SIP pool will be used for MWI\n"); } - solicited_mwi = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, MWI_BUCKETS, + mwi_container = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, MWI_BUCKETS, mwi_sub_hash, NULL, mwi_sub_cmp); - if (!solicited_mwi) { - mwi_serializer_pool_shutdown(); - ast_sip_unregister_subscription_handler(&mwi_handler); + if (!mwi_container) { + unload_module(); return AST_MODULE_LOAD_DECLINE; } + ao2_global_obj_replace_unref(mwi_solicited, mwi_container); + ao2_ref(mwi_container, -1); - unsolicited_mwi = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, MWI_BUCKETS, + mwi_container = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, MWI_BUCKETS, mwi_sub_hash, NULL, mwi_sub_cmp); - if (!unsolicited_mwi) { - mwi_serializer_pool_shutdown(); - ast_sip_unregister_subscription_handler(&mwi_handler); - ao2_ref(solicited_mwi, -1); + if (!mwi_container) { + unload_module(); return AST_MODULE_LOAD_DECLINE; } + ao2_global_obj_replace_unref(mwi_unsolicited, mwi_container); + ao2_ref(mwi_container, -1); ast_sorcery_observer_add(ast_sip_get_sorcery(), "contact", &mwi_contact_observer); ast_sorcery_observer_add(ast_sip_get_sorcery(), "global", &global_observer); @@ -1571,7 +1582,8 @@ static int load_module(void) if (!ast_sip_get_mwi_disable_initial_unsolicited()) { create_mwi_subscriptions(); if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) { - ast_sip_push_task(NULL, send_initial_notify_all, NULL); + ast_sip_push_task(ast_serializer_pool_get(mwi_serializer_pool), + send_initial_notify_all, NULL); } else { struct stasis_subscription *sub; @@ -1581,27 +1593,16 @@ static int load_module(void) } } - return AST_MODULE_LOAD_SUCCESS; -} - -static int unload_module(void) -{ - ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer); - ast_sorcery_observer_remove(ast_sip_get_sorcery(), "contact", &mwi_contact_observer); - - ao2_callback(unsolicited_mwi, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE, unsubscribe, NULL); - ao2_ref(unsolicited_mwi, -1); - unsolicited_mwi = NULL; - - ao2_cleanup(solicited_mwi); - - mwi_serializer_pool_shutdown(); - - ast_sip_unregister_subscription_handler(&mwi_handler); + if (!mwi_serializer_pool) { + /* + * If the mwi serializer pool was unable to be established then the module will + * use the default serializer pool. If this happens prevent manual unloading + * since there would now exist the potential for a crash on unload. + */ + ast_module_shutdown_ref(ast_module_info->self); + } - ast_free(default_voicemail_extension); - default_voicemail_extension = NULL; - return 0; + return AST_MODULE_LOAD_SUCCESS; } AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "PJSIP MWI resource",