From c93c579190c50a648c652fdcf0b3592fb7d64e9d Mon Sep 17 00:00:00 2001 From: Kevin Harwell <kharwell@digium.com> Date: Wed, 12 Jun 2019 13:49:30 -0500 Subject: [PATCH] app_voicemail: Remove dependency on the stasis cache app_voicemail utilized the stasis cache when polling mailboxes for MWI. This caused a memory leak (items were not being appropriately removed from the cache), and subsequent slowdown in system processing. This patch removes the stasis cache dependency, thus alleviating the memory leak. It does this by utilizing the new MWI API that better manages state lifetime. ASTERISK-28443 ASTERISK-27121 Change-Id: Ie89fedaca81ea1fd03d150d9d3a1ef3d53740e46 --- apps/app_voicemail.c | 309 +++++++++++++------------------------------ 1 file changed, 89 insertions(+), 220 deletions(-) diff --git a/apps/app_voicemail.c b/apps/app_voicemail.c index c0edd447b5..5603bb5b5f 100644 --- a/apps/app_voicemail.c +++ b/apps/app_voicemail.c @@ -1016,43 +1016,8 @@ static ast_cond_t poll_cond = PTHREAD_COND_INITIALIZER; static pthread_t poll_thread = AST_PTHREADT_NULL; static unsigned char poll_thread_run; -/*! Subscription to MWI event subscription changes */ -static struct stasis_subscription *mwi_sub_sub; - -/*! - * \brief An MWI subscription - * - * This is so we can keep track of which mailboxes are subscribed to. - * This way, we know which mailboxes to poll when the pollmailboxes - * option is being used. - */ -struct mwi_sub { - AST_RWLIST_ENTRY(mwi_sub) entry; - int old_urgent; - int old_new; - int old_old; - char *uniqueid; - char mailbox[0]; -}; - -struct mwi_sub_task { - const char *mailbox; - const char *context; - const char *uniqueid; -}; - -static void mwi_sub_task_dtor(struct mwi_sub_task *mwist) -{ - ast_free((void *) mwist->mailbox); - ast_free((void *) mwist->context); - ast_free((void *) mwist->uniqueid); - ast_free(mwist); -} - static struct ast_taskprocessor *mwi_subscription_tps; -static AST_RWLIST_HEAD_STATIC(mwi_subs, mwi_sub); - struct alias_mailbox_mapping { char *alias; char *mailbox; @@ -6316,7 +6281,7 @@ static int inboxcount(const char *mailbox, int *newmsgs, int *oldmsgs) return res; } -static void run_externnotify(char *context, char *extension, const char *flag) +static void run_externnotify(const char *context, const char *extension, const char *flag) { char arguments[255]; char ext_context[256] = ""; @@ -13197,38 +13162,29 @@ static struct ast_cli_entry cli_voicemail[] = { AST_CLI_DEFINE(handle_voicemail_reload, "Reload voicemail configuration"), }; -static void poll_subscribed_mailbox(struct mwi_sub *mwi_sub) +static int poll_subscribed_mailbox(struct ast_mwi_state *mwi_state, void *data) { int new = 0, old = 0, urgent = 0; - inboxcount2(mwi_sub->mailbox, &urgent, &new, &old); + if (!mwi_state) { + /* This should only occur due to allocation failure of a default mwi state object */ + return 0; + } + + inboxcount2(mwi_state->uniqueid, &urgent, &new, &old); #ifdef IMAP_STORAGE if (imap_poll_logout) { - imap_logout(mwi_sub->mailbox); + imap_logout(mwi_state->uniqueid); } #endif - if (urgent != mwi_sub->old_urgent || new != mwi_sub->old_new || old != mwi_sub->old_old) { - mwi_sub->old_urgent = urgent; - mwi_sub->old_new = new; - mwi_sub->old_old = old; - queue_mwi_event(NULL, mwi_sub->mailbox, urgent, new, old); - run_externnotify(NULL, mwi_sub->mailbox, NULL); + if (urgent != mwi_state->urgent_msgs || new != mwi_state->new_msgs || old != mwi_state->old_msgs) { + queue_mwi_event(NULL, mwi_state->uniqueid, urgent, new, old); + run_externnotify(NULL, mwi_state->uniqueid, NULL); } -} -static void poll_subscribed_mailboxes(void) -{ - struct mwi_sub *mwi_sub; - - AST_RWLIST_RDLOCK(&mwi_subs); - AST_RWLIST_TRAVERSE(&mwi_subs, mwi_sub, entry) { - if (!ast_strlen_zero(mwi_sub->mailbox)) { - poll_subscribed_mailbox(mwi_sub); - } - } - AST_RWLIST_UNLOCK(&mwi_subs); + return 0; } static void *mb_poll_thread(void *data) @@ -13237,6 +13193,12 @@ static void *mb_poll_thread(void *data) struct timespec ts = { 0, }; struct timeval wait; + ast_mwi_state_callback_subscribed(poll_subscribed_mailbox, NULL); + + if (!poll_thread_run) { + break; + } + wait = ast_tvadd(ast_tvnow(), ast_samp2tv(poll_freq, 1)); ts.tv_sec = wait.tv_sec; ts.tv_nsec = wait.tv_usec * 1000; @@ -13244,22 +13206,11 @@ static void *mb_poll_thread(void *data) ast_mutex_lock(&poll_lock); ast_cond_timedwait(&poll_cond, &poll_lock, &ts); ast_mutex_unlock(&poll_lock); - - if (!poll_thread_run) - break; - - poll_subscribed_mailboxes(); } return NULL; } -static void mwi_sub_destroy(struct mwi_sub *mwi_sub) -{ - ast_free(mwi_sub->uniqueid); - ast_free(mwi_sub); -} - #ifdef IMAP_STORAGE static void imap_logout(const char *mailbox_id) { @@ -13295,157 +13246,74 @@ static void imap_logout(const char *mailbox_id) vmstate_delete(vms); } -static void imap_close_subscribed_mailboxes(void) -{ - struct mwi_sub *mwi_sub; - - AST_RWLIST_RDLOCK(&mwi_subs); - AST_RWLIST_TRAVERSE(&mwi_subs, mwi_sub, entry) { - if (!ast_strlen_zero(mwi_sub->mailbox)) { - imap_logout(mwi_sub->mailbox); - } - } - AST_RWLIST_UNLOCK(&mwi_subs); -} -#endif - -static int handle_unsubscribe(void *datap) +static int imap_close_subscribed_mailbox(struct ast_mwi_state *mwi_state, void *data) { - struct mwi_sub *mwi_sub; - char *uniqueid = datap; - - AST_RWLIST_WRLOCK(&mwi_subs); - AST_RWLIST_TRAVERSE_SAFE_BEGIN(&mwi_subs, mwi_sub, entry) { - if (!strcmp(mwi_sub->uniqueid, uniqueid)) { - AST_LIST_REMOVE_CURRENT(entry); - /* Don't break here since a duplicate uniqueid - * may have been added as a result of a cache dump. */ -#ifdef IMAP_STORAGE - imap_logout(mwi_sub->mailbox); -#endif - mwi_sub_destroy(mwi_sub); - } + if (mwi_state && !ast_strlen_zero(mwi_state->uniqueid)) { + imap_logout(mwi_state->uniqueid); } - AST_RWLIST_TRAVERSE_SAFE_END - AST_RWLIST_UNLOCK(&mwi_subs); - ast_free(uniqueid); return 0; } -static int handle_subscribe(void *datap) -{ - unsigned int len; - struct mwi_sub *mwi_sub; - struct mwi_sub_task *p = datap; - - len = sizeof(*mwi_sub) + 1; - if (!ast_strlen_zero(p->mailbox)) - len += strlen(p->mailbox); - - if (!ast_strlen_zero(p->context)) - len += strlen(p->context) + 1; /* Allow for seperator */ +#endif - if (!(mwi_sub = ast_calloc(1, len))) - return -1; +static int mwi_handle_unsubscribe2(void *data) +{ + struct ast_mwi_state *mwi_state = data; - mwi_sub->uniqueid = ast_strdup(p->uniqueid); - if (!ast_strlen_zero(p->mailbox)) - strcpy(mwi_sub->mailbox, p->mailbox); + /* + * Go ahead and clear the implicit MWI publisher here to avoid a leak. If a backing + * configuration is available it'll re-initialize (reset the cached state) on its + * next publish. + */ + ast_delete_mwi_state_full(mwi_state->uniqueid, NULL, NULL); - if (!ast_strlen_zero(p->context)) { - strcat(mwi_sub->mailbox, "@"); - strcat(mwi_sub->mailbox, p->context); - } +#ifdef IMAP_STORAGE + imap_close_subscribed_mailbox(mwi_state, NULL); +#endif - AST_RWLIST_WRLOCK(&mwi_subs); - AST_RWLIST_INSERT_TAIL(&mwi_subs, mwi_sub, entry); - AST_RWLIST_UNLOCK(&mwi_subs); - mwi_sub_task_dtor(p); - poll_subscribed_mailbox(mwi_sub); + ao2_ref(mwi_state, -1); return 0; } -static void mwi_unsub_event_cb(struct stasis_subscription_change *change) +static void mwi_handle_unsubscribe(const char *id, struct ast_mwi_subscriber *sub) { - char *uniqueid = ast_strdup(change->uniqueid); - - if (!uniqueid) { - ast_log(LOG_ERROR, "Unable to allocate memory for uniqueid\n"); - return; - } + void *data = ast_mwi_subscriber_data(sub); - if (ast_taskprocessor_push(mwi_subscription_tps, handle_unsubscribe, uniqueid) < 0) { - ast_free(uniqueid); + /* Don't bump data's reference. We'll just use the one returned above */ + if (ast_taskprocessor_push(mwi_subscription_tps, mwi_handle_unsubscribe2, data) < 0) { + /* A reference was returned for data when retrieving, so remove it on error */ + ao2_ref(data, -1); } } -static void mwi_sub_event_cb(struct stasis_subscription_change *change) +static int mwi_handle_subscribe2(void *data) { - struct mwi_sub_task *mwist; - const char *topic; - char *context; - char *mailbox; - - mwist = ast_calloc(1, (sizeof(*mwist))); - if (!mwist) { - return; - } - - /* The topic name is prefixed with "mwi:all/" as this is a pool topic */ - topic = stasis_topic_name(change->topic) + 8; - if (separate_mailbox(ast_strdupa(topic), &mailbox, &context)) { - ast_free(mwist); - return; - } - - mwist->mailbox = ast_strdup(mailbox); - mwist->context = ast_strdup(context); - mwist->uniqueid = ast_strdup(change->uniqueid); - - if (ast_taskprocessor_push(mwi_subscription_tps, handle_subscribe, mwist) < 0) { - mwi_sub_task_dtor(mwist); - } + poll_subscribed_mailbox(data, NULL); + ao2_ref(data, -1); + return 0; } -static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg) +static void mwi_handle_subscribe(const char *id, struct ast_mwi_subscriber *sub) { - struct stasis_subscription_change *change; - /* Only looking for subscription change notices here */ - if (stasis_message_type(msg) != stasis_subscription_change_type()) { - return; - } - change = stasis_message_data(msg); - if (change->topic == ast_mwi_topic_all()) { - return; - } + void *data = ast_mwi_subscriber_data(sub); - if (!strcmp(change->description, "Subscribe")) { - mwi_sub_event_cb(change); - } else if (!strcmp(change->description, "Unsubscribe")) { - mwi_unsub_event_cb(change); + /* Don't bump data's reference. We'll just use the one returned above */ + if (ast_taskprocessor_push(mwi_subscription_tps, mwi_handle_subscribe2, data) < 0) { + /* A reference was returned for data when retrieving, so remove it on error */ + ao2_ref(data, -1); } } -static int dump_cache(void *obj, void *arg, int flags) -{ - struct stasis_message *msg = obj; - mwi_event_cb(NULL, NULL, msg); - return 0; -} +struct ast_mwi_observer mwi_observer = { + .on_subscribe = mwi_handle_subscribe, + .on_unsubscribe = mwi_handle_unsubscribe, +}; static void start_poll_thread(void) { int errcode; - mwi_sub_sub = stasis_subscribe(ast_mwi_topic_all(), mwi_event_cb, NULL); - - if (mwi_sub_sub) { - struct ao2_container *cached = stasis_cache_dump(ast_mwi_state_cache(), stasis_subscription_change_type()); - if (cached) { - ao2_callback(cached, OBJ_MULTIPLE | OBJ_NODATA, dump_cache, NULL); - } - ao2_cleanup(cached); - } + ast_mwi_add_observer(&mwi_observer); poll_thread_run = 1; @@ -13458,15 +13326,14 @@ static void stop_poll_thread(void) { poll_thread_run = 0; - mwi_sub_sub = stasis_unsubscribe_and_join(mwi_sub_sub); - ast_mutex_lock(&poll_lock); ast_cond_signal(&poll_cond); ast_mutex_unlock(&poll_lock); pthread_join(poll_thread, NULL); - poll_thread = AST_PTHREADT_NULL; + + ast_mwi_remove_observer(&mwi_observer); } /*! @@ -13590,38 +13457,40 @@ static int append_vmu_info_astman( } -static int manager_voicemail_refresh(struct mansession *s, const struct message *m) +static int manager_match_mailbox(struct ast_mwi_state *mwi_state, void *data) { - const char *context = astman_get_header(m, "Context"); - const char *mailbox = astman_get_header(m, "Mailbox"); - struct mwi_sub *mwi_sub; + const char *context = astman_get_header(data, "Context"); + const char *mailbox = astman_get_header(data, "Mailbox"); const char *at; - AST_RWLIST_RDLOCK(&mwi_subs); - AST_RWLIST_TRAVERSE(&mwi_subs, mwi_sub, entry) { - if (!ast_strlen_zero(mwi_sub->mailbox)) { - if ( - /* First case: everything matches */ - (ast_strlen_zero(context) && ast_strlen_zero(mailbox)) || - /* Second case: match the mailbox only */ - (ast_strlen_zero(context) && !ast_strlen_zero(mailbox) && - (at = strchr(mwi_sub->mailbox, '@')) && - strncmp(mailbox, mwi_sub->mailbox, at - mwi_sub->mailbox) == 0) || - /* Third case: match the context only */ - (!ast_strlen_zero(context) && ast_strlen_zero(mailbox) && - (at = strchr(mwi_sub->mailbox, '@')) && - strcmp(context, at + 1) == 0) || - /* Final case: match an exact specified mailbox */ - (!ast_strlen_zero(context) && !ast_strlen_zero(mailbox) && - (at = strchr(mwi_sub->mailbox, '@')) && - strncmp(mailbox, mwi_sub->mailbox, at - mwi_sub->mailbox) == 0 && - strcmp(context, at + 1) == 0) + if (!ast_strlen_zero(mwi_state->uniqueid)) { + if ( + /* First case: everything matches */ + (ast_strlen_zero(context) && ast_strlen_zero(mailbox)) || + /* Second case: match the mailbox only */ + (ast_strlen_zero(context) && !ast_strlen_zero(mailbox) && + (at = strchr(mwi_state->uniqueid, '@')) && + strncmp(mailbox, mwi_state->uniqueid, at - mwi_state->uniqueid) == 0) || + /* Third case: match the context only */ + (!ast_strlen_zero(context) && ast_strlen_zero(mailbox) && + (at = strchr(mwi_state->uniqueid, '@')) && + strcmp(context, at + 1) == 0) || + /* Final case: match an exact specified mailbox */ + (!ast_strlen_zero(context) && !ast_strlen_zero(mailbox) && + (at = strchr(mwi_state->uniqueid, '@')) && + strncmp(mailbox, mwi_state->uniqueid, at - mwi_state->uniqueid) == 0 && + strcmp(context, at + 1) == 0) ) { - poll_subscribed_mailbox(mwi_sub); - } + poll_subscribed_mailbox(mwi_state, NULL); } } - AST_RWLIST_UNLOCK(&mwi_subs); + + return 0; +} + +static int manager_voicemail_refresh(struct mansession *s, const struct message *m) +{ + ast_mwi_state_callback_all(manager_match_mailbox, (void *)m); astman_send_ack(s, m, "Refresh sent"); return RESULT_SUCCESS; } @@ -13943,7 +13812,7 @@ static int actual_load_config(int reload, struct ast_config *cfg, struct ast_con strcpy(listen_control_stop_key, DEFAULT_LISTEN_CONTROL_STOP_KEY); #ifdef IMAP_STORAGE - imap_close_subscribed_mailboxes(); + ast_mwi_state_callback_all(imap_close_subscribed_mailbox, NULL); #endif /* Free all the users structure */ @@ -15326,7 +15195,7 @@ static int unload_module(void) ast_unload_realtime("voicemail_data"); #ifdef IMAP_STORAGE - imap_close_subscribed_mailboxes(); + ast_mwi_state_callback_all(imap_close_subscribed_mailbox, NULL); #endif free_vm_users(); free_vm_zones(); -- GitLab