Skip to content
Snippets Groups Projects
Commit c93c5791 authored by Kevin Harwell's avatar Kevin Harwell
Browse files

app_voicemail: Remove dependency on the stasis cache

app_voicemail utilized the stasis cache when polling mailboxes for MWI. This
caused a memory leak (items were not being appropriately removed from the
cache), and subsequent slowdown in system processing. This patch removes the
stasis cache dependency, thus alleviating the memory leak. It does this by
utilizing the new MWI API that better manages state lifetime.

ASTERISK-28443
ASTERISK-27121

Change-Id: Ie89fedaca81ea1fd03d150d9d3a1ef3d53740e46
parent 9637e1df
No related branches found
No related tags found
No related merge requests found
...@@ -1016,43 +1016,8 @@ static ast_cond_t poll_cond = PTHREAD_COND_INITIALIZER; ...@@ -1016,43 +1016,8 @@ static ast_cond_t poll_cond = PTHREAD_COND_INITIALIZER;
static pthread_t poll_thread = AST_PTHREADT_NULL; static pthread_t poll_thread = AST_PTHREADT_NULL;
static unsigned char poll_thread_run; static unsigned char poll_thread_run;
   
/*! Subscription to MWI event subscription changes */
static struct stasis_subscription *mwi_sub_sub;
/*!
* \brief An MWI subscription
*
* This is so we can keep track of which mailboxes are subscribed to.
* This way, we know which mailboxes to poll when the pollmailboxes
* option is being used.
*/
struct mwi_sub {
AST_RWLIST_ENTRY(mwi_sub) entry;
int old_urgent;
int old_new;
int old_old;
char *uniqueid;
char mailbox[0];
};
struct mwi_sub_task {
const char *mailbox;
const char *context;
const char *uniqueid;
};
static void mwi_sub_task_dtor(struct mwi_sub_task *mwist)
{
ast_free((void *) mwist->mailbox);
ast_free((void *) mwist->context);
ast_free((void *) mwist->uniqueid);
ast_free(mwist);
}
static struct ast_taskprocessor *mwi_subscription_tps; static struct ast_taskprocessor *mwi_subscription_tps;
   
static AST_RWLIST_HEAD_STATIC(mwi_subs, mwi_sub);
struct alias_mailbox_mapping { struct alias_mailbox_mapping {
char *alias; char *alias;
char *mailbox; char *mailbox;
...@@ -6316,7 +6281,7 @@ static int inboxcount(const char *mailbox, int *newmsgs, int *oldmsgs) ...@@ -6316,7 +6281,7 @@ static int inboxcount(const char *mailbox, int *newmsgs, int *oldmsgs)
return res; return res;
} }
   
static void run_externnotify(char *context, char *extension, const char *flag) static void run_externnotify(const char *context, const char *extension, const char *flag)
{ {
char arguments[255]; char arguments[255];
char ext_context[256] = ""; char ext_context[256] = "";
...@@ -13197,38 +13162,29 @@ static struct ast_cli_entry cli_voicemail[] = { ...@@ -13197,38 +13162,29 @@ static struct ast_cli_entry cli_voicemail[] = {
AST_CLI_DEFINE(handle_voicemail_reload, "Reload voicemail configuration"), AST_CLI_DEFINE(handle_voicemail_reload, "Reload voicemail configuration"),
}; };
   
static void poll_subscribed_mailbox(struct mwi_sub *mwi_sub) static int poll_subscribed_mailbox(struct ast_mwi_state *mwi_state, void *data)
{ {
int new = 0, old = 0, urgent = 0; int new = 0, old = 0, urgent = 0;
   
inboxcount2(mwi_sub->mailbox, &urgent, &new, &old); if (!mwi_state) {
/* This should only occur due to allocation failure of a default mwi state object */
return 0;
}
inboxcount2(mwi_state->uniqueid, &urgent, &new, &old);
   
#ifdef IMAP_STORAGE #ifdef IMAP_STORAGE
if (imap_poll_logout) { if (imap_poll_logout) {
imap_logout(mwi_sub->mailbox); imap_logout(mwi_state->uniqueid);
} }
#endif #endif
   
if (urgent != mwi_sub->old_urgent || new != mwi_sub->old_new || old != mwi_sub->old_old) { if (urgent != mwi_state->urgent_msgs || new != mwi_state->new_msgs || old != mwi_state->old_msgs) {
mwi_sub->old_urgent = urgent; queue_mwi_event(NULL, mwi_state->uniqueid, urgent, new, old);
mwi_sub->old_new = new; run_externnotify(NULL, mwi_state->uniqueid, NULL);
mwi_sub->old_old = old;
queue_mwi_event(NULL, mwi_sub->mailbox, urgent, new, old);
run_externnotify(NULL, mwi_sub->mailbox, NULL);
} }
}
   
static void poll_subscribed_mailboxes(void) return 0;
{
struct mwi_sub *mwi_sub;
AST_RWLIST_RDLOCK(&mwi_subs);
AST_RWLIST_TRAVERSE(&mwi_subs, mwi_sub, entry) {
if (!ast_strlen_zero(mwi_sub->mailbox)) {
poll_subscribed_mailbox(mwi_sub);
}
}
AST_RWLIST_UNLOCK(&mwi_subs);
} }
   
static void *mb_poll_thread(void *data) static void *mb_poll_thread(void *data)
...@@ -13237,6 +13193,12 @@ static void *mb_poll_thread(void *data) ...@@ -13237,6 +13193,12 @@ static void *mb_poll_thread(void *data)
struct timespec ts = { 0, }; struct timespec ts = { 0, };
struct timeval wait; struct timeval wait;
   
ast_mwi_state_callback_subscribed(poll_subscribed_mailbox, NULL);
if (!poll_thread_run) {
break;
}
wait = ast_tvadd(ast_tvnow(), ast_samp2tv(poll_freq, 1)); wait = ast_tvadd(ast_tvnow(), ast_samp2tv(poll_freq, 1));
ts.tv_sec = wait.tv_sec; ts.tv_sec = wait.tv_sec;
ts.tv_nsec = wait.tv_usec * 1000; ts.tv_nsec = wait.tv_usec * 1000;
...@@ -13244,22 +13206,11 @@ static void *mb_poll_thread(void *data) ...@@ -13244,22 +13206,11 @@ static void *mb_poll_thread(void *data)
ast_mutex_lock(&poll_lock); ast_mutex_lock(&poll_lock);
ast_cond_timedwait(&poll_cond, &poll_lock, &ts); ast_cond_timedwait(&poll_cond, &poll_lock, &ts);
ast_mutex_unlock(&poll_lock); ast_mutex_unlock(&poll_lock);
if (!poll_thread_run)
break;
poll_subscribed_mailboxes();
} }
   
return NULL; return NULL;
} }
   
static void mwi_sub_destroy(struct mwi_sub *mwi_sub)
{
ast_free(mwi_sub->uniqueid);
ast_free(mwi_sub);
}
#ifdef IMAP_STORAGE #ifdef IMAP_STORAGE
static void imap_logout(const char *mailbox_id) static void imap_logout(const char *mailbox_id)
{ {
...@@ -13295,157 +13246,74 @@ static void imap_logout(const char *mailbox_id) ...@@ -13295,157 +13246,74 @@ static void imap_logout(const char *mailbox_id)
vmstate_delete(vms); vmstate_delete(vms);
} }
   
static void imap_close_subscribed_mailboxes(void) static int imap_close_subscribed_mailbox(struct ast_mwi_state *mwi_state, void *data)
{
struct mwi_sub *mwi_sub;
AST_RWLIST_RDLOCK(&mwi_subs);
AST_RWLIST_TRAVERSE(&mwi_subs, mwi_sub, entry) {
if (!ast_strlen_zero(mwi_sub->mailbox)) {
imap_logout(mwi_sub->mailbox);
}
}
AST_RWLIST_UNLOCK(&mwi_subs);
}
#endif
static int handle_unsubscribe(void *datap)
{ {
struct mwi_sub *mwi_sub; if (mwi_state && !ast_strlen_zero(mwi_state->uniqueid)) {
char *uniqueid = datap; imap_logout(mwi_state->uniqueid);
AST_RWLIST_WRLOCK(&mwi_subs);
AST_RWLIST_TRAVERSE_SAFE_BEGIN(&mwi_subs, mwi_sub, entry) {
if (!strcmp(mwi_sub->uniqueid, uniqueid)) {
AST_LIST_REMOVE_CURRENT(entry);
/* Don't break here since a duplicate uniqueid
* may have been added as a result of a cache dump. */
#ifdef IMAP_STORAGE
imap_logout(mwi_sub->mailbox);
#endif
mwi_sub_destroy(mwi_sub);
}
} }
AST_RWLIST_TRAVERSE_SAFE_END
AST_RWLIST_UNLOCK(&mwi_subs);
   
ast_free(uniqueid);
return 0; return 0;
} }
   
static int handle_subscribe(void *datap) #endif
{
unsigned int len;
struct mwi_sub *mwi_sub;
struct mwi_sub_task *p = datap;
len = sizeof(*mwi_sub) + 1;
if (!ast_strlen_zero(p->mailbox))
len += strlen(p->mailbox);
if (!ast_strlen_zero(p->context))
len += strlen(p->context) + 1; /* Allow for seperator */
   
if (!(mwi_sub = ast_calloc(1, len))) static int mwi_handle_unsubscribe2(void *data)
return -1; {
struct ast_mwi_state *mwi_state = data;
   
mwi_sub->uniqueid = ast_strdup(p->uniqueid); /*
if (!ast_strlen_zero(p->mailbox)) * Go ahead and clear the implicit MWI publisher here to avoid a leak. If a backing
strcpy(mwi_sub->mailbox, p->mailbox); * configuration is available it'll re-initialize (reset the cached state) on its
* next publish.
*/
ast_delete_mwi_state_full(mwi_state->uniqueid, NULL, NULL);
   
if (!ast_strlen_zero(p->context)) { #ifdef IMAP_STORAGE
strcat(mwi_sub->mailbox, "@"); imap_close_subscribed_mailbox(mwi_state, NULL);
strcat(mwi_sub->mailbox, p->context); #endif
}
   
AST_RWLIST_WRLOCK(&mwi_subs); ao2_ref(mwi_state, -1);
AST_RWLIST_INSERT_TAIL(&mwi_subs, mwi_sub, entry);
AST_RWLIST_UNLOCK(&mwi_subs);
mwi_sub_task_dtor(p);
poll_subscribed_mailbox(mwi_sub);
return 0; return 0;
} }
   
static void mwi_unsub_event_cb(struct stasis_subscription_change *change) static void mwi_handle_unsubscribe(const char *id, struct ast_mwi_subscriber *sub)
{ {
char *uniqueid = ast_strdup(change->uniqueid); void *data = ast_mwi_subscriber_data(sub);
if (!uniqueid) {
ast_log(LOG_ERROR, "Unable to allocate memory for uniqueid\n");
return;
}
   
if (ast_taskprocessor_push(mwi_subscription_tps, handle_unsubscribe, uniqueid) < 0) { /* Don't bump data's reference. We'll just use the one returned above */
ast_free(uniqueid); if (ast_taskprocessor_push(mwi_subscription_tps, mwi_handle_unsubscribe2, data) < 0) {
/* A reference was returned for data when retrieving, so remove it on error */
ao2_ref(data, -1);
} }
} }
   
static void mwi_sub_event_cb(struct stasis_subscription_change *change) static int mwi_handle_subscribe2(void *data)
{ {
struct mwi_sub_task *mwist; poll_subscribed_mailbox(data, NULL);
const char *topic; ao2_ref(data, -1);
char *context; return 0;
char *mailbox;
mwist = ast_calloc(1, (sizeof(*mwist)));
if (!mwist) {
return;
}
/* The topic name is prefixed with "mwi:all/" as this is a pool topic */
topic = stasis_topic_name(change->topic) + 8;
if (separate_mailbox(ast_strdupa(topic), &mailbox, &context)) {
ast_free(mwist);
return;
}
mwist->mailbox = ast_strdup(mailbox);
mwist->context = ast_strdup(context);
mwist->uniqueid = ast_strdup(change->uniqueid);
if (ast_taskprocessor_push(mwi_subscription_tps, handle_subscribe, mwist) < 0) {
mwi_sub_task_dtor(mwist);
}
} }
   
static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg) static void mwi_handle_subscribe(const char *id, struct ast_mwi_subscriber *sub)
{ {
struct stasis_subscription_change *change; void *data = ast_mwi_subscriber_data(sub);
/* Only looking for subscription change notices here */
if (stasis_message_type(msg) != stasis_subscription_change_type()) {
return;
}
change = stasis_message_data(msg);
if (change->topic == ast_mwi_topic_all()) {
return;
}
   
if (!strcmp(change->description, "Subscribe")) { /* Don't bump data's reference. We'll just use the one returned above */
mwi_sub_event_cb(change); if (ast_taskprocessor_push(mwi_subscription_tps, mwi_handle_subscribe2, data) < 0) {
} else if (!strcmp(change->description, "Unsubscribe")) { /* A reference was returned for data when retrieving, so remove it on error */
mwi_unsub_event_cb(change); ao2_ref(data, -1);
} }
} }
   
static int dump_cache(void *obj, void *arg, int flags) struct ast_mwi_observer mwi_observer = {
{ .on_subscribe = mwi_handle_subscribe,
struct stasis_message *msg = obj; .on_unsubscribe = mwi_handle_unsubscribe,
mwi_event_cb(NULL, NULL, msg); };
return 0;
}
   
static void start_poll_thread(void) static void start_poll_thread(void)
{ {
int errcode; int errcode;
mwi_sub_sub = stasis_subscribe(ast_mwi_topic_all(), mwi_event_cb, NULL); ast_mwi_add_observer(&mwi_observer);
if (mwi_sub_sub) {
struct ao2_container *cached = stasis_cache_dump(ast_mwi_state_cache(), stasis_subscription_change_type());
if (cached) {
ao2_callback(cached, OBJ_MULTIPLE | OBJ_NODATA, dump_cache, NULL);
}
ao2_cleanup(cached);
}
   
poll_thread_run = 1; poll_thread_run = 1;
   
...@@ -13458,15 +13326,14 @@ static void stop_poll_thread(void) ...@@ -13458,15 +13326,14 @@ static void stop_poll_thread(void)
{ {
poll_thread_run = 0; poll_thread_run = 0;
   
mwi_sub_sub = stasis_unsubscribe_and_join(mwi_sub_sub);
ast_mutex_lock(&poll_lock); ast_mutex_lock(&poll_lock);
ast_cond_signal(&poll_cond); ast_cond_signal(&poll_cond);
ast_mutex_unlock(&poll_lock); ast_mutex_unlock(&poll_lock);
   
pthread_join(poll_thread, NULL); pthread_join(poll_thread, NULL);
poll_thread = AST_PTHREADT_NULL; poll_thread = AST_PTHREADT_NULL;
ast_mwi_remove_observer(&mwi_observer);
} }
   
/*! /*!
...@@ -13590,38 +13457,40 @@ static int append_vmu_info_astman( ...@@ -13590,38 +13457,40 @@ static int append_vmu_info_astman(
   
} }
   
static int manager_voicemail_refresh(struct mansession *s, const struct message *m) static int manager_match_mailbox(struct ast_mwi_state *mwi_state, void *data)
{ {
const char *context = astman_get_header(m, "Context"); const char *context = astman_get_header(data, "Context");
const char *mailbox = astman_get_header(m, "Mailbox"); const char *mailbox = astman_get_header(data, "Mailbox");
struct mwi_sub *mwi_sub;
const char *at; const char *at;
   
AST_RWLIST_RDLOCK(&mwi_subs); if (!ast_strlen_zero(mwi_state->uniqueid)) {
AST_RWLIST_TRAVERSE(&mwi_subs, mwi_sub, entry) { if (
if (!ast_strlen_zero(mwi_sub->mailbox)) { /* First case: everything matches */
if ( (ast_strlen_zero(context) && ast_strlen_zero(mailbox)) ||
/* First case: everything matches */ /* Second case: match the mailbox only */
(ast_strlen_zero(context) && ast_strlen_zero(mailbox)) || (ast_strlen_zero(context) && !ast_strlen_zero(mailbox) &&
/* Second case: match the mailbox only */ (at = strchr(mwi_state->uniqueid, '@')) &&
(ast_strlen_zero(context) && !ast_strlen_zero(mailbox) && strncmp(mailbox, mwi_state->uniqueid, at - mwi_state->uniqueid) == 0) ||
(at = strchr(mwi_sub->mailbox, '@')) && /* Third case: match the context only */
strncmp(mailbox, mwi_sub->mailbox, at - mwi_sub->mailbox) == 0) || (!ast_strlen_zero(context) && ast_strlen_zero(mailbox) &&
/* Third case: match the context only */ (at = strchr(mwi_state->uniqueid, '@')) &&
(!ast_strlen_zero(context) && ast_strlen_zero(mailbox) && strcmp(context, at + 1) == 0) ||
(at = strchr(mwi_sub->mailbox, '@')) && /* Final case: match an exact specified mailbox */
strcmp(context, at + 1) == 0) || (!ast_strlen_zero(context) && !ast_strlen_zero(mailbox) &&
/* Final case: match an exact specified mailbox */ (at = strchr(mwi_state->uniqueid, '@')) &&
(!ast_strlen_zero(context) && !ast_strlen_zero(mailbox) && strncmp(mailbox, mwi_state->uniqueid, at - mwi_state->uniqueid) == 0 &&
(at = strchr(mwi_sub->mailbox, '@')) && strcmp(context, at + 1) == 0)
strncmp(mailbox, mwi_sub->mailbox, at - mwi_sub->mailbox) == 0 &&
strcmp(context, at + 1) == 0)
) { ) {
poll_subscribed_mailbox(mwi_sub); poll_subscribed_mailbox(mwi_state, NULL);
}
} }
} }
AST_RWLIST_UNLOCK(&mwi_subs);
return 0;
}
static int manager_voicemail_refresh(struct mansession *s, const struct message *m)
{
ast_mwi_state_callback_all(manager_match_mailbox, (void *)m);
astman_send_ack(s, m, "Refresh sent"); astman_send_ack(s, m, "Refresh sent");
return RESULT_SUCCESS; return RESULT_SUCCESS;
} }
...@@ -13943,7 +13812,7 @@ static int actual_load_config(int reload, struct ast_config *cfg, struct ast_con ...@@ -13943,7 +13812,7 @@ static int actual_load_config(int reload, struct ast_config *cfg, struct ast_con
strcpy(listen_control_stop_key, DEFAULT_LISTEN_CONTROL_STOP_KEY); strcpy(listen_control_stop_key, DEFAULT_LISTEN_CONTROL_STOP_KEY);
   
#ifdef IMAP_STORAGE #ifdef IMAP_STORAGE
imap_close_subscribed_mailboxes(); ast_mwi_state_callback_all(imap_close_subscribed_mailbox, NULL);
#endif #endif
   
/* Free all the users structure */ /* Free all the users structure */
...@@ -15326,7 +15195,7 @@ static int unload_module(void) ...@@ -15326,7 +15195,7 @@ static int unload_module(void)
ast_unload_realtime("voicemail_data"); ast_unload_realtime("voicemail_data");
   
#ifdef IMAP_STORAGE #ifdef IMAP_STORAGE
imap_close_subscribed_mailboxes(); ast_mwi_state_callback_all(imap_close_subscribed_mailbox, NULL);
#endif #endif
free_vm_users(); free_vm_users();
free_vm_zones(); free_vm_zones();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment