diff --git a/apps/app_minivm.c b/apps/app_minivm.c index 498c6ea2f9253495d8bfb45de6fd541fa796212f..53c5f0937b0efd5acbe3b573d962712826931066 100644 --- a/apps/app_minivm.c +++ b/apps/app_minivm.c @@ -2013,7 +2013,6 @@ static int leave_voicemail(struct ast_channel *chan, char *username, struct leav * \brief Queue a message waiting event */ static void queue_mwi_event(const char *mbx, const char *ctx, int urgent, int new, int old) { - struct ast_event *event; char *mailbox, *context; mailbox = ast_strdupa(mbx); @@ -2022,16 +2021,7 @@ static void queue_mwi_event(const char *mbx, const char *ctx, int urgent, int ne context = "default"; } - if (!(event = ast_event_new(AST_EVENT_MWI, - AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox, - AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, context, - AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_UINT, (new+urgent), - AST_EVENT_IE_OLDMSGS, AST_EVENT_IE_PLTYPE_UINT, old, - AST_EVENT_IE_END))) { - return; - } - - ast_event_queue_and_cache(event); + stasis_publish_mwi_state(mailbox, context, new + urgent, old); } /*!\internal diff --git a/apps/app_voicemail.c b/apps/app_voicemail.c index d0a8a782066be08c08ec083b2ca80230e978bc03..f1778691cb230a8402a6fbd7396158d04b3eb865 100644 --- a/apps/app_voicemail.c +++ b/apps/app_voicemail.c @@ -974,10 +974,8 @@ static ast_cond_t poll_cond = PTHREAD_COND_INITIALIZER; static pthread_t poll_thread = AST_PTHREADT_NULL; static unsigned char poll_thread_run; -/*! Subscription to ... MWI event subscriptions */ -static struct ast_event_sub *mwi_sub_sub; -/*! Subscription to ... MWI event un-subscriptions */ -static struct ast_event_sub *mwi_unsub_sub; +/*! Subscription to MWI event subscription changes */ +static struct stasis_subscription *mwi_sub_sub; /*! * \brief An MWI subscription @@ -991,16 +989,24 @@ struct mwi_sub { int old_urgent; int old_new; int old_old; - uint32_t uniqueid; + char *uniqueid; char mailbox[1]; }; struct mwi_sub_task { const char *mailbox; const char *context; - uint32_t uniqueid; + const char *uniqueid; }; +static void mwi_sub_task_dtor(struct mwi_sub_task *mwist) +{ + ast_free((void *) mwist->mailbox); + ast_free((void *) mwist->context); + ast_free((void *) mwist->uniqueid); + ast_free(mwist); +} + static struct ast_taskprocessor *mwi_subscription_tps; static AST_RWLIST_HEAD_STATIC(mwi_subs, mwi_sub); @@ -7721,25 +7727,16 @@ static int vm_forwardoptions(struct ast_channel *chan, struct ast_vm_user *vmu, static void queue_mwi_event(const char *box, int urgent, int new, int old) { - struct ast_event *event; char *mailbox, *context; /* Strip off @default */ context = mailbox = ast_strdupa(box); strsep(&context, "@"); - if (ast_strlen_zero(context)) + if (ast_strlen_zero(context)) { context = "default"; - - if (!(event = ast_event_new(AST_EVENT_MWI, - AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox, - AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, context, - AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_UINT, (new+urgent), - AST_EVENT_IE_OLDMSGS, AST_EVENT_IE_PLTYPE_UINT, old, - AST_EVENT_IE_END))) { - return; } - ast_event_queue_and_cache(event); + stasis_publish_mwi_state(mailbox, context, new + urgent, old); } /*! @@ -12533,28 +12530,28 @@ static void *mb_poll_thread(void *data) static void mwi_sub_destroy(struct mwi_sub *mwi_sub) { + ast_free(mwi_sub->uniqueid); ast_free(mwi_sub); } static int handle_unsubscribe(void *datap) { struct mwi_sub *mwi_sub; - uint32_t *uniqueid = datap; - + char *uniqueid = datap; + AST_RWLIST_WRLOCK(&mwi_subs); AST_RWLIST_TRAVERSE_SAFE_BEGIN(&mwi_subs, mwi_sub, entry) { - if (mwi_sub->uniqueid == *uniqueid) { + if (!strcmp(mwi_sub->uniqueid, uniqueid)) { AST_LIST_REMOVE_CURRENT(entry); - break; + /* Don't break here since a duplicate uniqueid + * may have been added as a result of a cache dump. */ + mwi_sub_destroy(mwi_sub); } } AST_RWLIST_TRAVERSE_SAFE_END AST_RWLIST_UNLOCK(&mwi_subs); - if (mwi_sub) - mwi_sub_destroy(mwi_sub); - - ast_free(uniqueid); + ast_free(uniqueid); return 0; } @@ -12574,7 +12571,7 @@ static int handle_subscribe(void *datap) if (!(mwi_sub = ast_calloc(1, len))) return -1; - mwi_sub->uniqueid = p->uniqueid; + mwi_sub->uniqueid = ast_strdup(p->uniqueid); if (!ast_strlen_zero(p->mailbox)) strcpy(mwi_sub->mailbox, p->mailbox); @@ -12586,75 +12583,85 @@ static int handle_subscribe(void *datap) AST_RWLIST_WRLOCK(&mwi_subs); AST_RWLIST_INSERT_TAIL(&mwi_subs, mwi_sub, entry); AST_RWLIST_UNLOCK(&mwi_subs); - ast_free((void *) p->mailbox); - ast_free((void *) p->context); - ast_free(p); + mwi_sub_task_dtor(p); poll_subscribed_mailbox(mwi_sub); return 0; } -static void mwi_unsub_event_cb(const struct ast_event *event, void *userdata) +static void mwi_unsub_event_cb(struct stasis_subscription_change *change) { - uint32_t u, *uniqueid = ast_calloc(1, sizeof(*uniqueid)); + char *uniqueid = ast_strdup(change->uniqueid); if (!uniqueid) { ast_log(LOG_ERROR, "Unable to allocate memory for uniqueid\n"); return; } - if (ast_event_get_type(event) != AST_EVENT_UNSUB) { + if (ast_taskprocessor_push(mwi_subscription_tps, handle_unsubscribe, uniqueid) < 0) { ast_free(uniqueid); - return; } +} - if (ast_event_get_ie_uint(event, AST_EVENT_IE_EVENTTYPE) != AST_EVENT_MWI) { - ast_free(uniqueid); +static void mwi_sub_event_cb(struct stasis_subscription_change *change) +{ + struct mwi_sub_task *mwist; + char *context = ast_strdupa(stasis_topic_name(change->topic)); + char *mailbox; + + if ((mwist = ast_calloc(1, (sizeof(*mwist)))) == NULL) { return; } - u = ast_event_get_ie_uint(event, AST_EVENT_IE_UNIQUEID); - *uniqueid = u; - if (ast_taskprocessor_push(mwi_subscription_tps, handle_unsubscribe, uniqueid) < 0) { - ast_free(uniqueid); + mailbox = strsep(&context, "@"); + + mwist->mailbox = ast_strdup(mailbox); + mwist->context = ast_strdup(context); + mwist->uniqueid = ast_strdup(change->uniqueid); + + if (ast_taskprocessor_push(mwi_subscription_tps, handle_subscribe, mwist) < 0) { + mwi_sub_task_dtor(mwist); } } -static void mwi_sub_event_cb(const struct ast_event *event, void *userdata) +static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) { - struct mwi_sub_task *mwist; - - if (ast_event_get_type(event) != AST_EVENT_SUB) - return; - - if (ast_event_get_ie_uint(event, AST_EVENT_IE_EVENTTYPE) != AST_EVENT_MWI) + struct stasis_subscription_change *change; + /* Only looking for subscription change notices here */ + if (stasis_message_type(msg) != stasis_subscription_change()) { return; + } - if ((mwist = ast_calloc(1, (sizeof(*mwist)))) == NULL) { - ast_log(LOG_ERROR, "could not allocate a mwi_sub_task\n"); + change = stasis_message_data(msg); + if (change->topic == stasis_mwi_topic_all()) { return; } - mwist->mailbox = ast_strdup(ast_event_get_ie_str(event, AST_EVENT_IE_MAILBOX)); - mwist->context = ast_strdup(ast_event_get_ie_str(event, AST_EVENT_IE_CONTEXT)); - mwist->uniqueid = ast_event_get_ie_uint(event, AST_EVENT_IE_UNIQUEID); - - if (ast_taskprocessor_push(mwi_subscription_tps, handle_subscribe, mwist) < 0) { - ast_free(mwist); + + if (!strcmp(change->description, "Subscribe")) { + mwi_sub_event_cb(change); + } else if (!strcmp(change->description, "Unsubscribe")) { + mwi_unsub_event_cb(change); } } +static int dump_cache(void *obj, void *arg, int flags) +{ + struct stasis_message *msg = obj; + mwi_event_cb(NULL, NULL, NULL, msg); + return 0; +} + static void start_poll_thread(void) { int errcode; - mwi_sub_sub = ast_event_subscribe(AST_EVENT_SUB, mwi_sub_event_cb, "Voicemail MWI subscription", NULL, - AST_EVENT_IE_EVENTTYPE, AST_EVENT_IE_PLTYPE_UINT, AST_EVENT_MWI, - AST_EVENT_IE_END); - - mwi_unsub_sub = ast_event_subscribe(AST_EVENT_UNSUB, mwi_unsub_event_cb, "Voicemail MWI subscription", NULL, - AST_EVENT_IE_EVENTTYPE, AST_EVENT_IE_PLTYPE_UINT, AST_EVENT_MWI, - AST_EVENT_IE_END); + mwi_sub_sub = stasis_subscribe(stasis_mwi_topic_all(), mwi_event_cb, NULL); - if (mwi_sub_sub) - ast_event_report_subs(mwi_sub_sub); + if (mwi_sub_sub) { + struct ao2_container *cached = stasis_cache_dump(stasis_mwi_topic_cached(), stasis_subscription_change()); + if (cached) { + ao2_callback(cached, OBJ_MULTIPLE | OBJ_NODATA, dump_cache, NULL); + } + ao2_cleanup(cached); + } poll_thread_run = 1; @@ -12668,13 +12675,7 @@ static void stop_poll_thread(void) poll_thread_run = 0; if (mwi_sub_sub) { - ast_event_unsubscribe(mwi_sub_sub); - mwi_sub_sub = NULL; - } - - if (mwi_unsub_sub) { - ast_event_unsubscribe(mwi_unsub_sub); - mwi_unsub_sub = NULL; + mwi_sub_sub = stasis_unsubscribe(mwi_sub_sub); } ast_mutex_lock(&poll_lock); diff --git a/channels/chan_dahdi.c b/channels/chan_dahdi.c index e077e842266698d23d9dd1dade74d30300868005..15075ebf2b7b7e832595ae65cf2d1682e8c78698 100644 --- a/channels/chan_dahdi.c +++ b/channels/chan_dahdi.c @@ -502,7 +502,7 @@ static enum ast_bridge_result dahdi_bridge(struct ast_channel *c0, struct ast_ch static int dahdi_sendtext(struct ast_channel *c, const char *text); -static void mwi_event_cb(const struct ast_event *event, void *userdata) +static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) { /* This module does not handle MWI in an event-based manner. However, it * subscribes to MWI for each mailbox that is configured so that the core @@ -1215,7 +1215,7 @@ struct dahdi_pvt { */ char mailbox[AST_MAX_EXTENSION]; /*! \brief Opaque event subscription parameters for message waiting indication support. */ - struct ast_event_sub *mwi_event_sub; + struct stasis_subscription *mwi_event_sub; /*! \brief Delayed dialing for E911. Overlap digits for ISDN. */ char dialdest[256]; #ifdef HAVE_DAHDI_LINEREVERSE_VMWI @@ -3753,7 +3753,6 @@ struct sig_ss7_callback sig_ss7_callbacks = static void notify_message(char *mailbox_full, int thereornot) { char s[sizeof(mwimonitornotify) + 80]; - struct ast_event *event; char *mailbox, *context; /* Strip off @default */ @@ -3762,16 +3761,7 @@ static void notify_message(char *mailbox_full, int thereornot) if (ast_strlen_zero(context)) context = "default"; - if (!(event = ast_event_new(AST_EVENT_MWI, - AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox, - AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, context, - AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_UINT, thereornot, - AST_EVENT_IE_OLDMSGS, AST_EVENT_IE_PLTYPE_UINT, thereornot, - AST_EVENT_IE_END))) { - return; - } - - ast_event_queue_and_cache(event); + stasis_publish_mwi_state(mailbox, context, thereornot, thereornot); if (!ast_strlen_zero(mailbox) && !ast_strlen_zero(mwimonitornotify)) { snprintf(s, sizeof(s), "%s %s %d", mwimonitornotify, mailbox, thereornot); @@ -5413,24 +5403,25 @@ static int send_cwcidspill(struct dahdi_pvt *p) static int has_voicemail(struct dahdi_pvt *p) { int new_msgs; - struct ast_event *event; char *mailbox, *context; + RAII_VAR(struct stasis_message *, mwi_message, NULL, ao2_cleanup); + struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID); mailbox = context = ast_strdupa(p->mailbox); strsep(&context, "@"); - if (ast_strlen_zero(context)) + if (ast_strlen_zero(context)) { context = "default"; + } - event = ast_event_get_cached(AST_EVENT_MWI, - AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox, - AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, context, - AST_EVENT_IE_END); + ast_str_set(&uniqueid, 0, "%s@%s", mailbox, context); + mwi_message = stasis_cache_get(stasis_mwi_topic_cached(), stasis_mwi_state_message(), ast_str_buffer(uniqueid)); - if (event) { - new_msgs = ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS); - ast_event_destroy(event); - } else + if (mwi_message) { + struct stasis_mwi_state *mwi_state = stasis_message_data(mwi_message); + new_msgs = mwi_state->new_msgs; + } else { new_msgs = ast_app_has_voicemail(p->mailbox, NULL); + } return new_msgs; } @@ -5965,10 +5956,12 @@ static void destroy_dahdi_pvt(struct dahdi_pvt *pvt) } } ast_free(p->cidspill); - if (p->use_smdi) + if (p->use_smdi) { ast_smdi_interface_unref(p->smdi_iface); - if (p->mwi_event_sub) - ast_event_unsubscribe(p->mwi_event_sub); + } + if (p->mwi_event_sub) { + p->mwi_event_sub = stasis_unsubscribe(p->mwi_event_sub); + } if (p->vars) { ast_variables_destroy(p->vars); } @@ -5981,8 +5974,9 @@ static void destroy_dahdi_pvt(struct dahdi_pvt *pvt) ast_mutex_destroy(&p->lock); dahdi_close_sub(p, SUB_REAL); - if (p->owner) + if (p->owner) { ast_channel_tech_pvt_set(p->owner, NULL); + } ast_free(p); } @@ -13226,15 +13220,20 @@ static struct dahdi_pvt *mkintf(int channel, const struct dahdi_chan_conf *conf, ast_copy_string(tmp->mailbox, conf->chan.mailbox, sizeof(tmp->mailbox)); if (channel != CHAN_PSEUDO && !ast_strlen_zero(tmp->mailbox)) { char *mailbox, *context; + struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID); + struct stasis_topic *mailbox_specific_topic; + mailbox = context = ast_strdupa(tmp->mailbox); strsep(&context, "@"); if (ast_strlen_zero(context)) context = "default"; - tmp->mwi_event_sub = ast_event_subscribe(AST_EVENT_MWI, mwi_event_cb, "Dahdi MWI subscription", NULL, - AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox, - AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, context, - AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_EXISTS, - AST_EVENT_IE_END); + + ast_str_set(&uniqueid, 0, "%s@%s", mailbox, context); + + mailbox_specific_topic = stasis_mwi_topic(ast_str_buffer(uniqueid)); + if (mailbox_specific_topic) { + tmp->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, NULL); + } } #ifdef HAVE_DAHDI_LINEREVERSE_VMWI tmp->mwisend_setting = conf->chan.mwisend_setting; diff --git a/channels/chan_iax2.c b/channels/chan_iax2.c index 5a307bf0c6201e7de63dcfb73c0245f9bb8e4867..7280c480e6aaf770dd2e61a1763bdbcbfb94ac16 100644 --- a/channels/chan_iax2.c +++ b/channels/chan_iax2.c @@ -533,7 +533,7 @@ struct iax2_peer { int expire; /*!< Schedule entry for expiry */ int expiry; /*!< How soon to expire */ - iax2_format capability; /*!< Capability */ + iax2_format capability; /*!< Capability */ /* Qualification */ int callno; /*!< Call number of POKE request */ @@ -545,12 +545,12 @@ struct iax2_peer { int pokefreqnotok; /*!< How often to check when the host has been determined to be down */ int historicms; /*!< How long recent average responses took */ int smoothing; /*!< Sample over how many units to determine historic ms */ - uint16_t maxcallno; /*!< Max call number limit for this peer. Set on registration */ + uint16_t maxcallno; /*!< Max call number limit for this peer. Set on registration */ - struct ast_event_sub *mwi_event_sub; + struct stasis_subscription *mwi_event_sub; /*!< This subscription lets pollmailboxes know which mailboxes need to be polled */ struct ast_acl_list *acl; - enum calltoken_peer_enum calltoken_required; /*!< Is calltoken validation required or not, can be YES, NO, or AUTO */ + enum calltoken_peer_enum calltoken_required; /*!< Is calltoken validation required or not, can be YES, NO, or AUTO */ }; #define IAX2_TRUNK_PREFACE (sizeof(struct iax_frame) + sizeof(struct ast_iax2_meta_hdr) + sizeof(struct ast_iax2_meta_trunk_hdr)) @@ -1316,7 +1316,7 @@ static void iax2_lock_owner(int callno) } } -static void mwi_event_cb(const struct ast_event *event, void *userdata) +static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) { /* The MWI subscriptions exist just so the core knows we care about those * mailboxes. However, we just grab the events out of the cache when it @@ -8743,23 +8743,24 @@ static int update_registry(struct sockaddr_in *sin, int callno, char *devtype, i iax_ie_append_short(&ied, IAX_IE_REFRESH, p->expiry); iax_ie_append_addr(&ied, IAX_IE_APPARENT_ADDR, &peer_addr); if (!ast_strlen_zero(p->mailbox)) { - struct ast_event *event; int new, old; char *mailbox, *context; + RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID); context = mailbox = ast_strdupa(p->mailbox); strsep(&context, "@"); - if (ast_strlen_zero(context)) + if (ast_strlen_zero(context)) { context = "default"; + } + + ast_str_set(&uniqueid, 0, "%s@%s", mailbox, context); + msg = stasis_cache_get(stasis_mwi_topic_cached(), stasis_mwi_state_message(), ast_str_buffer(uniqueid)); - event = ast_event_get_cached(AST_EVENT_MWI, - AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox, - AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, context, - AST_EVENT_IE_END); - if (event) { - new = ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS); - old = ast_event_get_ie_uint(event, AST_EVENT_IE_OLDMSGS); - ast_event_destroy(event); + if (msg) { + struct stasis_mwi_state *mwi_state = stasis_message_data(msg); + new = mwi_state->new_msgs; + old = mwi_state->old_msgs; } else { /* Fall back on checking the mailbox directly */ ast_app_inboxcount(p->mailbox, &new, &old); } @@ -12392,8 +12393,9 @@ static void peer_destructor(void *obj) if (peer->dnsmgr) ast_dnsmgr_release(peer->dnsmgr); - if (peer->mwi_event_sub) - ast_event_unsubscribe(peer->mwi_event_sub); + if (peer->mwi_event_sub) { + peer->mwi_event_sub = stasis_unsubscribe(peer->mwi_event_sub); + } ast_string_field_free_memory(peer); } @@ -12667,14 +12669,21 @@ static struct iax2_peer *build_peer(const char *name, struct ast_variable *v, st if (!ast_strlen_zero(peer->mailbox)) { char *mailbox, *context; + struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID); + struct stasis_topic *mailbox_specific_topic; + context = mailbox = ast_strdupa(peer->mailbox); strsep(&context, "@"); - if (ast_strlen_zero(context)) + if (ast_strlen_zero(context)) { context = "default"; - peer->mwi_event_sub = ast_event_subscribe(AST_EVENT_MWI, mwi_event_cb, "IAX MWI subscription", NULL, - AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox, - AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, context, - AST_EVENT_IE_END); + } + + ast_str_set(&uniqueid, 0, "%s@%s", mailbox, context); + + mailbox_specific_topic = stasis_mwi_topic(ast_str_buffer(uniqueid)); + if (mailbox_specific_topic) { + peer->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, NULL); + } } if (subscribe_acl_change) { diff --git a/channels/chan_mgcp.c b/channels/chan_mgcp.c index 0c3dac6074138b1ac554c20532e0a46be2325662..65a53900ee8bcf36acdf955b8b8983f3d5a8a8a3 100644 --- a/channels/chan_mgcp.c +++ b/channels/chan_mgcp.c @@ -82,6 +82,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/event.h" #include "asterisk/chanvars.h" #include "asterisk/pktccops.h" +#include "asterisk/stasis.h" /* * Define to work around buggy dlink MGCP phone firmware which @@ -342,7 +343,7 @@ struct mgcp_endpoint { char curtone[80]; /*!< Current tone */ char mailbox[AST_MAX_EXTENSION]; char parkinglot[AST_MAX_CONTEXT]; /*!< Parkinglot */ - struct ast_event_sub *mwi_event_sub; + struct stasis_subscription *mwi_event_sub; ast_group_t callgroup; ast_group_t pickupgroup; int callwaiting; @@ -483,7 +484,7 @@ static struct ast_channel_tech mgcp_tech = { .func_channel_read = acf_channel_read, }; -static void mwi_event_cb(const struct ast_event *event, void *userdata) +static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) { /* This module does not handle MWI in an event-based manner. However, it * subscribes to MWI for each mailbox that is configured so that the core @@ -494,24 +495,26 @@ static void mwi_event_cb(const struct ast_event *event, void *userdata) static int has_voicemail(struct mgcp_endpoint *p) { int new_msgs; - struct ast_event *event; + RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID); char *mbox, *cntx; cntx = mbox = ast_strdupa(p->mailbox); strsep(&cntx, "@"); - if (ast_strlen_zero(cntx)) + if (ast_strlen_zero(cntx)) { cntx = "default"; + } - event = ast_event_get_cached(AST_EVENT_MWI, - AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mbox, - AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, cntx, - AST_EVENT_IE_END); + ast_str_set(&uniqueid, 0, "%s@%s", mbox, cntx); - if (event) { - new_msgs = ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS); - ast_event_destroy(event); - } else + msg = stasis_cache_get(stasis_mwi_topic_cached(), stasis_mwi_state(), ast_str_buffer(uniqueid)); + + if (msg) { + struct stasis_mwi_state *mwi_state = stasis_message_data(msg); + new_msgs = mwi_state->new_msgs; + } else { new_msgs = ast_app_has_voicemail(p->mailbox, NULL); + } return new_msgs; } @@ -3972,6 +3975,7 @@ static struct mgcp_gateway *build_gateway(char *cat, struct ast_variable *v) struct mgcp_endpoint *e; struct mgcp_subchannel *sub; struct ast_variable *chanvars = NULL; + struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID); /*char txident[80];*/ int i=0, y=0; @@ -4168,16 +4172,20 @@ static struct mgcp_gateway *build_gateway(char *cat, struct ast_variable *v) ast_copy_string(e->parkinglot, parkinglot, sizeof(e->parkinglot)); if (!ast_strlen_zero(e->mailbox)) { char *mbox, *cntx; + struct stasis_topic *mailbox_specific_topic; + cntx = mbox = ast_strdupa(e->mailbox); strsep(&cntx, "@"); if (ast_strlen_zero(cntx)) { cntx = "default"; } - e->mwi_event_sub = ast_event_subscribe(AST_EVENT_MWI, mwi_event_cb, "MGCP MWI subscription", NULL, - AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mbox, - AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, cntx, - AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_EXISTS, - AST_EVENT_IE_END); + ast_str_reset(uniqueid); + ast_str_set(&uniqueid, 0, "%s@%s", mbox, cntx); + + maibox_specific_topic = stasis_mwi_topic(ast_str_buffer(uniqueid)); + if (mailbox_specific_topic) { + e->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, NULL); + } } snprintf(e->rqnt_ident, sizeof(e->rqnt_ident), "%08lx", ast_random()); e->msgstate = -1; @@ -4516,8 +4524,9 @@ static void destroy_endpoint(struct mgcp_endpoint *e) ast_free(s); } - if (e->mwi_event_sub) - ast_event_unsubscribe(e->mwi_event_sub); + if (e->mwi_event_sub) { + e->mwi_event_sub = stasis_unsubscribe(e->mwi_event_sub); + } if (e->chanvars) { ast_variables_destroy(e->chanvars); diff --git a/channels/chan_sip.c b/channels/chan_sip.c index 631c1db5d4db7f9508ff9ecc03ad132166462d1c..28e8d4d354df027a8cedaa070bc472aabc5b3646 100644 --- a/channels/chan_sip.c +++ b/channels/chan_sip.c @@ -294,6 +294,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "sip/include/dialplan_functions.h" #include "sip/include/security_events.h" #include "asterisk/sip_api.h" +#include "asterisk/app.h" /*** DOCUMENTATION <application name="SIPDtmfMode" language="en_US"> @@ -1275,7 +1276,7 @@ static int sip_poke_noanswer(const void *data); static int sip_poke_peer(struct sip_peer *peer, int force); static void sip_poke_all_peers(void); static void sip_peer_hold(struct sip_pvt *p, int hold); -static void mwi_event_cb(const struct ast_event *, void *); +static void mwi_event_cb(void *, struct stasis_subscription *, struct stasis_topic *, struct stasis_message *); static void network_change_event_cb(const struct ast_event *, void *); static void acl_change_event_cb(const struct ast_event *event, void *userdata); static void sip_keepalive_all_peers(void); @@ -5225,8 +5226,9 @@ static void register_peer_exten(struct sip_peer *peer, int onoff) /*! Destroy mailbox subscriptions */ static void destroy_mailbox(struct sip_mailbox *mailbox) { - if (mailbox->event_sub) - ast_event_unsubscribe(mailbox->event_sub); + if (mailbox->event_sub) { + mailbox->event_sub = stasis_unsubscribe(mailbox->event_sub); + } ast_free(mailbox); } @@ -16644,11 +16646,16 @@ static void sip_peer_hold(struct sip_pvt *p, int hold) } /*! \brief Receive MWI events that we have subscribed to */ -static void mwi_event_cb(const struct ast_event *event, void *userdata) +static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) { struct sip_peer *peer = userdata; - - sip_send_mwi_to_peer(peer, 0); + if (stasis_subscription_final_message(sub, msg)) { + ao2_cleanup(peer); + return; + } + if (stasis_mwi_state_message() == stasis_message_type(msg)) { + sip_send_mwi_to_peer(peer, 0); + } } static void network_change_event_subscribe(void) @@ -24787,16 +24794,9 @@ static int handle_request_notify(struct sip_pvt *p, struct sip_request *req, str if (!ast_strlen_zero(mailbox) && !ast_strlen_zero(c)) { char *old = strsep(&c, " "); char *new = strsep(&old, "/"); - struct ast_event *event; - if ((event = ast_event_new(AST_EVENT_MWI, - AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox, - AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, "SIP_Remote", - AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_UINT, atoi(new), - AST_EVENT_IE_OLDMSGS, AST_EVENT_IE_PLTYPE_UINT, atoi(old), - AST_EVENT_IE_END))) { - ast_event_queue_and_cache(event); - } + stasis_publish_mwi_state(mailbox, "SIP_Remote", atoi(new), atoi(old)); + transmit_response(p, "200 OK", req); } else { transmit_response(p, "489 Bad event", req); @@ -27617,16 +27617,20 @@ static int handle_request_publish(struct sip_pvt *p, struct sip_request *req, st static void add_peer_mwi_subs(struct sip_peer *peer) { struct sip_mailbox *mailbox; + struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID); AST_LIST_TRAVERSE(&peer->mailboxes, mailbox, entry) { - if (mailbox->event_sub) { - ast_event_unsubscribe(mailbox->event_sub); - } + struct stasis_topic *mailbox_specific_topic; + mailbox->event_sub = stasis_unsubscribe(mailbox->event_sub); + + ast_str_reset(uniqueid); + ast_str_set(&uniqueid, 0, "%s@%s", mailbox->mailbox, S_OR(mailbox->context, "default")); - mailbox->event_sub = ast_event_subscribe(AST_EVENT_MWI, mwi_event_cb, "SIP mbox event", peer, - AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox->mailbox, - AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, S_OR(mailbox->context, "default"), - AST_EVENT_IE_END); + mailbox_specific_topic = stasis_mwi_topic(ast_str_buffer(uniqueid)); + if (mailbox_specific_topic) { + ao2_ref(peer, +1); + mailbox->event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, peer); + } } } @@ -28832,19 +28836,24 @@ static int get_cached_mwi(struct sip_peer *peer, int *new, int *old) { struct sip_mailbox *mailbox; int in_cache; + struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID); in_cache = 0; AST_LIST_TRAVERSE(&peer->mailboxes, mailbox, entry) { - struct ast_event *event; - event = ast_event_get_cached(AST_EVENT_MWI, - AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox->mailbox, - AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, S_OR(mailbox->context, "default"), - AST_EVENT_IE_END); - if (!event) + RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + struct stasis_mwi_state *mwi_state; + + ast_str_reset(uniqueid); + ast_str_set(&uniqueid, 0, "%s@%s", mailbox->mailbox, S_OR(mailbox->context, "default")); + + msg = stasis_cache_get(stasis_mwi_topic_cached(), stasis_mwi_state_message(), ast_str_buffer(uniqueid)); + if (!msg) { continue; - *new += ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS); - *old += ast_event_get_ie_uint(event, AST_EVENT_IE_OLDMSGS); - ast_event_destroy(event); + } + + mwi_state = stasis_message_data(msg); + *new += mwi_state->new_msgs; + *old += mwi_state->old_msgs; in_cache = 1; } diff --git a/channels/chan_skinny.c b/channels/chan_skinny.c index c974caee80ad0c67bb2d6d15c0ec624ea82ff166..6045a09f07185a9231eb2537e9357b024ab02efa 100644 --- a/channels/chan_skinny.c +++ b/channels/chan_skinny.c @@ -1442,7 +1442,7 @@ struct skinny_line { SKINNY_LINE_OPTIONS ast_mutex_t lock; struct skinny_container *container; - struct ast_event_sub *mwi_event_sub; /* Event based MWI */ + struct stasis_subscription *mwi_event_sub; /* Event based MWI */ struct skinny_subchannel *activesub; AST_LIST_HEAD(, skinny_subchannel) sub; AST_LIST_HEAD(, skinny_subline) sublines; @@ -1611,7 +1611,7 @@ static int skinny_indicate(struct ast_channel *ast, int ind, const void *data, s static int skinny_fixup(struct ast_channel *oldchan, struct ast_channel *newchan); static int skinny_senddigit_begin(struct ast_channel *ast, char digit); static int skinny_senddigit_end(struct ast_channel *ast, char digit, unsigned int duration); -static void mwi_event_cb(const struct ast_event *event, void *userdata); +static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg); static int skinny_dialer_cb(const void *data); static int skinny_reload(void); @@ -2261,7 +2261,7 @@ static int skinny_register(struct skinny_req *req, struct skinnysession *s) manager_event(EVENT_FLAG_SYSTEM, "PeerStatus", "ChannelType: Skinny\r\nPeer: Skinny/%s@%s\r\nPeerStatus: Registered\r\n", l->name, d->name); register_exten(l); /* initialize MWI on line and device */ - mwi_event_cb(0, l); + mwi_event_cb(l, NULL, NULL, NULL); AST_LIST_TRAVERSE(&l->sublines, subline, list) { ast_extension_state_add(subline->context, subline->exten, skinny_extensionstate_cb, subline->container); } @@ -3507,7 +3507,7 @@ static void update_connectedline(struct skinny_subchannel *sub, const void *data send_callinfo(sub); } -static void mwi_event_cb(const struct ast_event *event, void *userdata) +static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) { struct skinny_line *l = userdata; struct skinny_device *d = l->device; @@ -3518,8 +3518,9 @@ static void mwi_event_cb(const struct ast_event *event, void *userdata) return; } - if (event) { - l->newmsgs = ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS); + if (msg && stasis_mwi_state_message() == stasis_message_type(msg)) { + struct stasis_mwi_state *mwi_state = stasis_message_data(msg); + l->newmsgs = mwi_state->new_msgs; } if (l->newmsgs) { @@ -8250,16 +8251,22 @@ static struct skinny_line *config_line(const char *lname, struct ast_variable *v if (!ast_strlen_zero(l->mailbox)) { char *cfg_mailbox, *cfg_context; + struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID); + struct stasis_topic *mailbox_specific_topic; + cfg_context = cfg_mailbox = ast_strdupa(l->mailbox); ast_verb(3, "Setting mailbox '%s' on line %s\n", cfg_mailbox, l->name); strsep(&cfg_context, "@"); - if (ast_strlen_zero(cfg_context)) - cfg_context = "default"; - l->mwi_event_sub = ast_event_subscribe(AST_EVENT_MWI, mwi_event_cb, "skinny MWI subsciption", l, - AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, cfg_mailbox, - AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, cfg_context, - AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_EXISTS, - AST_EVENT_IE_END); + if (ast_strlen_zero(cfg_context)) { + cfg_context = "default"; + } + + ast_str_set(&uniqueid, 0, "%s@%s", cfg_mailbox, cfg_context); + + mailbox_specific_topic = stasis_mwi_topic(ast_str_buffer(uniqueid)); + if (mailbox_specific_topic) { + l->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, l); + } } if (!ast_strlen_zero(vmexten) && ast_strlen_zero(l->vmexten)) { @@ -8694,8 +8701,9 @@ static int unload_module(void) } ast_mutex_unlock(&sub->lock); } - if (l->mwi_event_sub) - ast_event_unsubscribe(l->mwi_event_sub); + if (l->mwi_event_sub) { + l->mwi_event_sub = stasis_unsubscribe(l->mwi_event_sub); + } ast_mutex_unlock(&l->lock); manager_event(EVENT_FLAG_SYSTEM, "PeerStatus", "ChannelType: Skinny\r\nPeer: Skinny/%s@%s\r\nPeerStatus: Unregistered\r\n", l->name, d->name); unregister_exten(l); diff --git a/channels/chan_unistim.c b/channels/chan_unistim.c index b4ec9e455f925ed1f8d3cf517b9fa0005fb6547a..130549c0173852f6a460561e2f14d43c7ec9e607 100644 --- a/channels/chan_unistim.c +++ b/channels/chan_unistim.c @@ -5500,23 +5500,24 @@ static int unistim_sendtext(struct ast_channel *ast, const char *text) /*--- unistim_send_mwi_to_peer: Send message waiting indication ---*/ static int unistim_send_mwi_to_peer(struct unistim_line *peer, unsigned int tick) { - struct ast_event *event; int new; char *mailbox, *context; + RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID); context = mailbox = ast_strdupa(peer->mailbox); strsep(&context, "@"); if (ast_strlen_zero(context)) { context = "default"; } - event = ast_event_get_cached(AST_EVENT_MWI, - AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox, - AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, context, - AST_EVENT_IE_END); - if (event) { - new = ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS); - ast_event_destroy(event); + ast_str_set(&uniqueid, 0, "%s@%s", mailbox, context); + + msg = stasis_cache_get(stasis_mwi_topic_cached(), stasis_mwi_state_message(), ast_str_buffer(uniqueid)); + + if (msg) { + struct stasis_mwi_state *mwi_state = stasis_message_data(msg); + new = mwi_state->new_msgs; } else { /* Fall back on checking the mailbox directly */ new = ast_app_has_voicemail(peer->mailbox, "INBOX"); } diff --git a/channels/sig_pri.c b/channels/sig_pri.c index e01ceadd39095ac880aac41b7d47fc993581f330..e3b9b3fb6fdf294b5bf69de23b43b92b8478e63f 100644 --- a/channels/sig_pri.c +++ b/channels/sig_pri.c @@ -8752,23 +8752,30 @@ static void sig_pri_send_mwi_indication(struct sig_pri_span *pri, const char *vm * * \return Nothing */ -static void sig_pri_mwi_event_cb(const struct ast_event *event, void *userdata) +static void sig_pri_mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) { struct sig_pri_span *pri = userdata; const char *mbox_context; const char *mbox_number; int num_messages; int idx; + struct stasis_mwi_state *mwi_state; - mbox_number = ast_event_get_ie_str(event, AST_EVENT_IE_MAILBOX); + if (stasis_mwi_state_message() != stasis_message_type(msg)) { + return; + } + + mwi_state = stasis_message_data(msg); + + mbox_number = mwi_state->mailbox; if (ast_strlen_zero(mbox_number)) { return; } - mbox_context = ast_event_get_ie_str(event, AST_EVENT_IE_CONTEXT); + mbox_context = mwi_state->context; if (ast_strlen_zero(mbox_context)) { return; } - num_messages = ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS); + num_messages = mwi_state->new_msgs; for (idx = 0; idx < ARRAY_LEN(pri->mbox); ++idx) { if (!pri->mbox[idx].sub) { @@ -8799,27 +8806,28 @@ static void sig_pri_mwi_event_cb(const struct ast_event *event, void *userdata) static void sig_pri_mwi_cache_update(struct sig_pri_span *pri) { int idx; - int num_messages; - struct ast_event *event; + struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID); + struct stasis_mwi_state *mwi_state; for (idx = 0; idx < ARRAY_LEN(pri->mbox); ++idx) { + RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); if (!pri->mbox[idx].sub) { /* Mailbox slot is empty */ continue; } - event = ast_event_get_cached(AST_EVENT_MWI, - AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, pri->mbox[idx].number, - AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, pri->mbox[idx].context, - AST_EVENT_IE_END); - if (!event) { + ast_str_reset(uniqueid); + ast_str_set(&uniqueid, 0, "%s@%s", pri->mbox[idx].number, pri->mbox[idx].context); + + msg = stasis_cache_get(stasis_mwi_topic_cached(), stasis_mwi_state_message(), ast_str_buffer(uniqueid)); + if (!msg) { /* No cached event for this mailbox. */ continue; } - num_messages = ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS); + + mwi_state = stasis_message_data(msg); sig_pri_send_mwi_indication(pri, pri->mbox[idx].vm_number, pri->mbox[idx].number, - pri->mbox[idx].context, num_messages); - ast_event_destroy(event); + pri->mbox[idx].context, mwi_state->new_msgs); } } #endif /* defined(HAVE_PRI_MWI) */ @@ -8841,7 +8849,7 @@ void sig_pri_stop_pri(struct sig_pri_span *pri) #if defined(HAVE_PRI_MWI) for (idx = 0; idx < ARRAY_LEN(pri->mbox); ++idx) { if (pri->mbox[idx].sub) { - pri->mbox[idx].sub = ast_event_unsubscribe(pri->mbox[idx].sub); + pri->mbox[idx].sub = stasis_unsubscribe(pri->mbox[idx].sub); } } #endif /* defined(HAVE_PRI_MWI) */ @@ -8905,13 +8913,14 @@ int sig_pri_start_pri(struct sig_pri_span *pri) char *saveptr; char *prev_vm_number; struct ast_str *mwi_description = ast_str_alloca(64); + struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID); #endif /* defined(HAVE_PRI_MWI) */ #if defined(HAVE_PRI_MWI) /* Prepare the mbox[] for use. */ for (i = 0; i < ARRAY_LEN(pri->mbox); ++i) { if (pri->mbox[i].sub) { - pri->mbox[i].sub = ast_event_unsubscribe(pri->mbox[i].sub); + pri->mbox[i].sub = stasis_unsubscribe(pri->mbox[i].sub); } } #endif /* defined(HAVE_PRI_MWI) */ @@ -8951,6 +8960,7 @@ int sig_pri_start_pri(struct sig_pri_span *pri) for (i = 0; i < ARRAY_LEN(pri->mbox); ++i) { char *mbox_number; char *mbox_context; + struct stasis_topic *mailbox_specific_topic; mbox_number = strsep(&saveptr, ","); if (!mbox_number) { @@ -8976,13 +8986,17 @@ int sig_pri_start_pri(struct sig_pri_span *pri) /* Fill the mbox[] element. */ pri->mbox[i].number = mbox_number; pri->mbox[i].context = mbox_context; + + ast_str_reset(uniqueid); + ast_str_set(&uniqueid, 0, "%s@%s", mbox_number, mbox_context); + ast_str_set(&mwi_description, -1, "%s span %d[%d] MWI mailbox %s@%s", sig_pri_cc_type_name, pri->span, i, mbox_number, mbox_context); - pri->mbox[i].sub = ast_event_subscribe(AST_EVENT_MWI, sig_pri_mwi_event_cb, - ast_str_buffer(mwi_description), pri, - AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mbox_number, - AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, mbox_context, - AST_EVENT_IE_END); + + mailbox_specific_topic = stasis_mwi_topic(ast_str_buffer(uniqueid)); + if (mailbox_specific_topic) { + pri->mbox[i].sub = stasis_subscribe(mailbox_specific_topic, sig_pri_mwi_event_cb, pri); + } if (!pri->mbox[i].sub) { ast_log(LOG_ERROR, "%s span %d could not subscribe to MWI events for %s@%s.", sig_pri_cc_type_name, pri->span, mbox_number, mbox_context); diff --git a/channels/sig_pri.h b/channels/sig_pri.h index 4de9077e7d2a0e83147c1cf4b3d776f26425500b..db052862de2d153416c295475135e5eedcfa6dae 100644 --- a/channels/sig_pri.h +++ b/channels/sig_pri.h @@ -405,7 +405,7 @@ struct sig_pri_mbox { * \brief MWI mailbox event subscription. * \note NULL if mailbox not configured. */ - struct ast_event_sub *sub; + struct stasis_subscription *sub; /*! \brief Mailbox number */ const char *number; /*! \brief Mailbox context. */ diff --git a/channels/sip/include/sip.h b/channels/sip/include/sip.h index e5177bd6a8b9de80b32cbafba92ec5a02afb4e94..6eb2f29a530832f95da92161e52db8b128b6cf56 100644 --- a/channels/sip/include/sip.h +++ b/channels/sip/include/sip.h @@ -1262,7 +1262,7 @@ struct sip_pkt { */ struct sip_mailbox { /*! Associated MWI subscription */ - struct ast_event_sub *event_sub; + struct stasis_subscription *event_sub; AST_LIST_ENTRY(sip_mailbox) entry; unsigned int delme:1; char *context; diff --git a/include/asterisk/app.h b/include/asterisk/app.h index cdc40e7d0a0c683db1bab1568b9667faa7a2b4dd..0505786e6a047db9b352b5577badcae910b95dc4 100644 --- a/include/asterisk/app.h +++ b/include/asterisk/app.h @@ -28,6 +28,7 @@ #include "asterisk/threadstorage.h" #include "asterisk/file.h" #include "asterisk/linkedlists.h" +#include "asterisk/utils.h" struct ast_flags64; @@ -1086,6 +1087,96 @@ void ast_safe_fork_cleanup(void); */ int ast_app_parse_timelen(const char *timestr, int *result, enum ast_timelen defunit); +/*! + * \brief Publish a MWI state update via stasis + * \param[in] uniqueid A unique identifier for this mailbox (usually mailbox@context) + * \param[in] mailbox The number identifying this mailbox + * \param[in] context The context this mailbox resides in + * \param[in] new_msgs The number of new messages in this mailbox + * \param[in] old_msgs The number of old messages in this mailbox + * \retval 0 Success + * \retval -1 Failure + * \since 12 + */ +#define stasis_publish_mwi_state(mailbox, context, new_msgs, old_msgs) \ + stasis_publish_mwi_state_full(mailbox, context, new_msgs, old_msgs, NULL) + +/*! + * \brief Publish a MWI state update via stasis with EID + * \param[in] uniqueid A unique identifier for this mailbox (usually mailbox@context) + * \param[in] mailbox The number identifying this mailbox + * \param[in] context The context this mailbox resides in + * \param[in] new_msgs The number of new messages in this mailbox + * \param[in] old_msgs The number of old messages in this mailbox + * \param[in] eid The EID of the server that originally published the message + * \retval 0 Success + * \retval -1 Failure + * \since 12 + */ +int stasis_publish_mwi_state_full( + const char *mailbox, + const char *context, + int new_msgs, + int old_msgs, + struct ast_eid *eid); + +/*! + * \brief The structure that contains MWI state + * \since 12 + */ +struct stasis_mwi_state { + AST_DECLARE_STRING_FIELDS( + AST_STRING_FIELD(uniqueid); /*!< Unique identifier for this mailbox/context */ + AST_STRING_FIELD(mailbox); /*!< Mailbox for this event */ + AST_STRING_FIELD(context); /*!< Context that this mailbox belongs to */ + ); + int new_msgs; /*!< The current number of new messages for this mailbox */ + int old_msgs; /*!< The current number of old messages for this mailbox */ + struct ast_eid eid; /*!< The EID of the server where this message originated */ +}; + +/*! + * \brief Get the Stasis topic for MWI messages + * \retval The topic structure for MWI messages + * \retval NULL if it has not been allocated + * \since 12 + */ +struct stasis_topic *stasis_mwi_topic_all(void); + +/*! + * \brief Get the Stasis topic for MWI messages on a unique ID + * \param uniqueid The unique id for which to get the topic + * \retval The topic structure for MWI messages for a given uniqueid + * \retval NULL if it failed to be found or allocated + * \since 12 + */ +struct stasis_topic *stasis_mwi_topic(const char *uniqueid); + +/*! + * \brief Get the Stasis caching topic for MWI messages + * \retval The caching topic structure for MWI messages + * \retval NULL if it has not been allocated + * \since 12 + */ +struct stasis_caching_topic *stasis_mwi_topic_cached(void); + +/*! + * \brief Get the Stasis message type for MWI messages + * \retval The message type structure for MWI messages + * \retval NULL if it has not been allocated + * \since 12 + */ +struct stasis_message_type *stasis_mwi_state_message(void); + +/*! + * \brief Initialize the application core + * \retval 0 Success + * \retval -1 Failure + * \since 12 + */ +int app_init(void); + +#define AST_MAX_MAILBOX_UNIQUEID (AST_MAX_EXTENSION + AST_MAX_CONTEXT + 2) #if defined(__cplusplus) || defined(c_plusplus) } #endif diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h index 9a5f7537c5d7c63c4bd937f245e861be3c78a625..f0d73fd7227860d60657ca4c62e5667c93824a3f 100644 --- a/include/asterisk/stasis.h +++ b/include/asterisk/stasis.h @@ -377,6 +377,27 @@ struct stasis_subscription_change { */ struct stasis_message_type *stasis_subscription_change(void); +/*! + * \brief Pool for topic aggregation + */ +struct stasis_topic_pool; + +/*! + * \brief Create a topic pool that routes messages from dynamically generated topics to the given topic + * \param pooled_topic Topic to which messages will be routed + * \retval the new stasis_topic_pool or NULL on failure + */ +struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic); + +/*! + * \brief Find or create a topic in the pool + * \param pool Pool for which to get the topic + * \param topic_name Name of the topic to get + * \retval The already stored or newly allocated topic + * \retval NULL if the topic was not found and could not be allocated + */ +struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name); + /*! @} */ /*! @{ */ diff --git a/include/asterisk/xmpp.h b/include/asterisk/xmpp.h index 07abb6e67bb182baded24c81746671d0e31e6cf8..6833b6cce644d710d66e8d97a685f275d502cc78 100644 --- a/include/asterisk/xmpp.h +++ b/include/asterisk/xmpp.h @@ -134,7 +134,7 @@ struct ast_xmpp_client { pthread_t thread; int timeout; unsigned int reconnect:1; /*!< Reconnect this client */ - struct ast_event_sub *mwi_sub; /*!< If distributing event information the MWI subscription */ + struct stasis_subscription *mwi_sub; /*!< If distributing event information the MWI subscription */ struct ast_event_sub *device_state_sub; /*!< If distributing event information the device state subscription */ }; diff --git a/main/app.c b/main/app.c index 6db65f37117fb84ca99cad36d14cf59c51f85459..dca74849fdd1b34c33b4760db9531873715031d0 100644 --- a/main/app.c +++ b/main/app.c @@ -66,6 +66,10 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/threadstorage.h" #include "asterisk/test.h" #include "asterisk/module.h" +#include "asterisk/astobj2.h" +#include "asterisk/stasis.h" + +#define MWI_TOPIC_BUCKETS 57 AST_THREADSTORAGE_PUBLIC(ast_str_thread_global_buf); @@ -78,6 +82,11 @@ struct zombie { static AST_LIST_HEAD_STATIC(zombies, zombie); +static struct stasis_topic *mwi_topic_all; +static struct stasis_caching_topic *mwi_topic_cached; +static struct stasis_message_type *mwi_message_type; +static struct stasis_topic_pool *mwi_topic_pool; + static void *shaun_of_the_dead(void *data) { struct zombie *cur; @@ -2632,3 +2641,123 @@ int ast_app_parse_timelen(const char *timestr, int *result, enum ast_timelen uni return 0; } + + +static void mwi_state_dtor(void *obj) +{ + struct stasis_mwi_state *mwi_state = obj; + ast_string_field_free_memory(mwi_state); +} + +struct stasis_topic *stasis_mwi_topic_all(void) +{ + return mwi_topic_all; +} + +struct stasis_caching_topic *stasis_mwi_topic_cached(void) +{ + return mwi_topic_cached; +} + +struct stasis_message_type *stasis_mwi_state_message(void) +{ + return mwi_message_type; +} + +struct stasis_topic *stasis_mwi_topic(const char *uniqueid) +{ + return stasis_topic_pool_get_topic(mwi_topic_pool, uniqueid); +} + +int stasis_publish_mwi_state_full( + const char *mailbox, + const char *context, + int new_msgs, + int old_msgs, + struct ast_eid *eid) +{ + RAII_VAR(struct stasis_mwi_state *, mwi_state, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup); + struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID); + struct stasis_topic *mailbox_specific_topic; + + ast_assert(!ast_strlen_zero(mailbox)); + ast_assert(!ast_strlen_zero(context)); + + ast_str_set(&uniqueid, 0, "%s@%s", mailbox, context); + + mwi_state = ao2_alloc(sizeof(*mwi_state), mwi_state_dtor); + if (ast_string_field_init(mwi_state, 256)) { + return -1; + } + + ast_string_field_set(mwi_state, uniqueid, ast_str_buffer(uniqueid)); + ast_string_field_set(mwi_state, mailbox, mailbox); + ast_string_field_set(mwi_state, context, context); + mwi_state->new_msgs = new_msgs; + mwi_state->old_msgs = old_msgs; + if (eid) { + mwi_state->eid = *eid; + } else { + ast_set_default_eid(&mwi_state->eid); + } + + message = stasis_message_create(stasis_mwi_state_message(), mwi_state); + + mailbox_specific_topic = stasis_mwi_topic(ast_str_buffer(uniqueid)); + if (!mailbox_specific_topic) { + return -1; + } + + stasis_publish(mailbox_specific_topic, message); + + return 0; +} + +static const char *mwi_state_get_id(struct stasis_message *message) +{ + if (stasis_mwi_state_message() == stasis_message_type(message)) { + struct stasis_mwi_state *mwi_state = stasis_message_data(message); + return mwi_state->uniqueid; + } else if (stasis_subscription_change() == stasis_message_type(message)) { + struct stasis_subscription_change *change = stasis_message_data(message); + return change->uniqueid; + } + + return NULL; +} + +static void app_exit(void) +{ + ao2_cleanup(mwi_topic_all); + mwi_topic_all = NULL; + mwi_topic_cached = stasis_caching_unsubscribe(mwi_topic_cached); + ao2_cleanup(mwi_message_type); + mwi_message_type = NULL; + ao2_cleanup(mwi_topic_pool); + mwi_topic_pool = NULL; +} + +int app_init(void) +{ + mwi_topic_all = stasis_topic_create("stasis_mwi_topic"); + if (!mwi_topic_all) { + return -1; + } + mwi_topic_cached = stasis_caching_topic_create(mwi_topic_all, mwi_state_get_id); + if (!mwi_topic_cached) { + return -1; + } + mwi_message_type = stasis_message_type_create("stasis_mwi_state"); + if (!mwi_message_type) { + return -1; + } + mwi_topic_pool = stasis_topic_pool_create(mwi_topic_all); + if (!mwi_topic_pool) { + return -1; + } + + ast_register_atexit(app_exit); + return 0; +} + diff --git a/main/asterisk.c b/main/asterisk.c index 4e5e58c056a8cd620ce79b9334f29d91d869a381..3a0e87c4125e202215d88fa4d1fd1d5d7dc9b971 100644 --- a/main/asterisk.c +++ b/main/asterisk.c @@ -4178,6 +4178,11 @@ int main(int argc, char *argv[]) aco_init(); + if (app_init()) { + printf("App core initialization failed.\n%s", term_quit()); + exit(1); + } + if (astdb_init()) { printf("%s", term_quit()); exit(1); diff --git a/main/channel.c b/main/channel.c index 3289edaa43eff4b8c7cf692d8c58cb786f01d394..3f8319b341b95064fb661e3e2496df57f6270aad 100644 --- a/main/channel.c +++ b/main/channel.c @@ -8637,8 +8637,7 @@ static void channels_shutdown(void) __channel_varset = NULL; ao2_cleanup(__channel_topic_all); __channel_topic_all = NULL; - stasis_caching_unsubscribe(__channel_topic_all_cached); - __channel_topic_all_cached = NULL; + __channel_topic_all_cached = stasis_caching_unsubscribe(__channel_topic_all_cached); ast_data_unregister(NULL); ast_cli_unregister_multiple(cli_channel, ARRAY_LEN(cli_channel)); if (channels) { diff --git a/main/stasis.c b/main/stasis.c index a4d44b8192263a304ed2a847db7eb7b23ff6077c..2ad0caf93b4699ce714a9ae6d1028042a9bbde45 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -41,6 +41,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") /*! Initial size of the subscribers list. */ #define INITIAL_SUBSCRIBERS_MAX 4 +/*! The number of buckets to use for topic pools */ +#define TOPIC_POOL_BUCKETS 57 + /*! Threadpool for dispatching notifications to subscribers */ static struct ast_threadpool *pool; @@ -470,6 +473,96 @@ static void send_subscription_change_message(struct stasis_topic *topic, char *u stasis_publish(topic, msg); } +struct topic_pool_entry { + struct stasis_subscription *forward; + struct stasis_topic *topic; +}; + +static void topic_pool_entry_dtor(void *obj) +{ + struct topic_pool_entry *entry = obj; + entry->forward = stasis_unsubscribe(entry->forward); + ao2_cleanup(entry->topic); + entry->topic = NULL; +} + +static struct topic_pool_entry *topic_pool_entry_alloc(void) +{ + return ao2_alloc(sizeof(struct topic_pool_entry), topic_pool_entry_dtor); +} + +struct stasis_topic_pool { + struct ao2_container *pool_container; + struct stasis_topic *pool_topic; +}; + +static void topic_pool_dtor(void *obj) +{ + struct stasis_topic_pool *pool = obj; + ao2_cleanup(pool->pool_container); + pool->pool_container = NULL; + ao2_cleanup(pool->pool_topic); + pool->pool_topic = NULL; +} + +static int topic_pool_entry_hash(const void *obj, const int flags) +{ + const char *topic_name= (flags & OBJ_KEY) ? obj : stasis_topic_name(((struct topic_pool_entry*) obj)->topic); + return ast_str_case_hash(topic_name); +} + +static int topic_pool_entry_cmp(void *obj, void *arg, int flags) +{ + struct topic_pool_entry *opt1 = obj, *opt2 = arg; + const char *topic_name = (flags & OBJ_KEY) ? arg : stasis_topic_name(opt2->topic); + return strcasecmp(stasis_topic_name(opt1->topic), topic_name) ? 0 : CMP_MATCH | CMP_STOP; +} + +struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic) +{ + RAII_VAR(struct stasis_topic_pool *, pool, ao2_alloc(sizeof(*pool), topic_pool_dtor), ao2_cleanup); + if (!pool) { + return NULL; + } + pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS, topic_pool_entry_hash, topic_pool_entry_cmp); + ao2_ref(pooled_topic, +1); + pool->pool_topic = pooled_topic; + + ao2_ref(pool, +1); + return pool; +} + +struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name) +{ + RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup); + SCOPED_AO2LOCK(topic_container_lock, pool->pool_container); + topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_KEY | OBJ_NOLOCK); + + if (topic_pool_entry) { + return topic_pool_entry->topic; + } + + topic_pool_entry = topic_pool_entry_alloc(); + + if (!topic_pool_entry) { + return NULL; + } + + topic_pool_entry->topic = stasis_topic_create(topic_name); + if (!topic_pool_entry->topic) { + return NULL; + } + + topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic); + if (!topic_pool_entry->forward) { + return NULL; + } + + ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK); + + return topic_pool_entry->topic; +} + /*! \brief Cleanup function */ static void stasis_exit(void) { diff --git a/res/res_jabber.c b/res/res_jabber.c index e8e79051e4b4f3d00046102ade7aa6bedcaa6278..1f9ddbaa393a63851e8753e5931010f815aa1f0a 100644 --- a/res/res_jabber.c +++ b/res/res_jabber.c @@ -373,7 +373,7 @@ static void aji_pubsub_purge_nodes(struct aji_client *client, static void aji_publish_mwi(struct aji_client *client, const char *mailbox, const char *context, const char *oldmsgs, const char *newmsgs); static void aji_devstate_cb(const struct ast_event *ast_event, void *data); -static void aji_mwi_cb(const struct ast_event *ast_event, void *data); +static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg); static iks* aji_build_publish_skeleton(struct aji_client *client, const char *node, const char *event_type, unsigned int cachable); /* No transports in this version */ @@ -410,7 +410,7 @@ static char *app_ajileave = "JabberLeave"; static struct aji_client_container clients; static struct aji_capabilities *capabilities = NULL; -static struct ast_event_sub *mwi_sub = NULL; +static struct stasis_subscription *mwi_sub = NULL; static struct ast_event_sub *device_state_sub = NULL; static ast_cond_t message_received_condition; static ast_mutex_t messagelock; @@ -3240,30 +3240,33 @@ int ast_aji_disconnect(struct aji_client *client) * \param data void pointer to ast_client structure * \return void */ -static void aji_mwi_cb(const struct ast_event *ast_event, void *data) +static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) { const char *mailbox; const char *context; char oldmsgs[10]; char newmsgs[10]; - struct aji_client *client; - if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(ast_event, AST_EVENT_IE_EID))) - { + struct aji_client *client = data; + struct stasis_mwi_state *mwi_state; + + if (!stasis_subscription_is_subscribed(sub) || stasis_mwi_state() != stasis_message_type(msg)) { + return; + } + + mwi_state = stasis_message_data(msg); + + if (ast_eid_cmp(&ast_eid_default, &mwi_state->eid)) { /* If the event didn't originate from this server, don't send it back out. */ - ast_debug(1, "Returning here\n"); return; } - client = ASTOBJ_REF((struct aji_client *) data); - mailbox = ast_event_get_ie_str(ast_event, AST_EVENT_IE_MAILBOX); - context = ast_event_get_ie_str(ast_event, AST_EVENT_IE_CONTEXT); + mailbox = mwi_state->mailbox; + context = mwi_state->context; snprintf(oldmsgs, sizeof(oldmsgs), "%d", - ast_event_get_ie_uint(ast_event, AST_EVENT_IE_OLDMSGS)); + mwi_state->old_msgs); snprintf(newmsgs, sizeof(newmsgs), "%d", - ast_event_get_ie_uint(ast_event, AST_EVENT_IE_NEWMSGS)); + mwi_state->new_msgs); aji_publish_mwi(client, mailbox, context, oldmsgs, newmsgs); - ASTOBJ_UNREF(client, ast_aji_client_destroy); - } /*! * \brief Callback function for device state events @@ -3300,8 +3303,7 @@ static void aji_devstate_cb(const struct ast_event *ast_event, void *data) static void aji_init_event_distribution(struct aji_client *client) { if (!mwi_sub) { - mwi_sub = ast_event_subscribe(AST_EVENT_MWI, aji_mwi_cb, "aji_mwi_subscription", - client, AST_EVENT_IE_END); + mwi_sub = stasis_subscribe(stasis_mwi_topic_all(), aji_mwi_cb, client); } if (!device_state_sub) { if (ast_enable_distributed_devstate()) { @@ -3364,14 +3366,10 @@ static int aji_handle_pubsub_event(void *data, ikspak *pak) context = strsep(&item_id, "@"); sscanf(iks_find_cdata(item_content, "OLDMSGS"), "%10d", &oldmsgs); sscanf(iks_find_cdata(item_content, "NEWMSGS"), "%10d", &newmsgs); - if (!(event = ast_event_new(AST_EVENT_MWI, AST_EVENT_IE_MAILBOX, - AST_EVENT_IE_PLTYPE_STR, item_id, AST_EVENT_IE_CONTEXT, - AST_EVENT_IE_PLTYPE_STR, context, AST_EVENT_IE_OLDMSGS, - AST_EVENT_IE_PLTYPE_UINT, oldmsgs, AST_EVENT_IE_NEWMSGS, - AST_EVENT_IE_PLTYPE_UINT, newmsgs, AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, - &pubsub_eid, sizeof(pubsub_eid), AST_EVENT_IE_END))) { - return IKS_FILTER_EAT; - } + + stasis_publish_mwi_state_full(item_id, context, newmsgs, oldmsgs, &pubsub_eid); + + return IKS_FILTER_EAT; } else { ast_debug(1, "Don't know how to handle PubSub event of type %s\n", iks_name(item_content)); @@ -4771,7 +4769,7 @@ static int unload_module(void) ast_manager_unregister("JabberSend"); ast_custom_function_unregister(&jabberstatus_function); if (mwi_sub) { - ast_event_unsubscribe(mwi_sub); + mwi_sub = stasis_unsubscribe(mwi_sub); } if (device_state_sub) { ast_event_unsubscribe(device_state_sub); diff --git a/res/res_xmpp.c b/res/res_xmpp.c index f2f200c9a9b7d9c2df0d72ad71f56e7f2495e4ff..1901aa25b5d103d8a295cf479ba98d8d7f435223 100644 --- a/res/res_xmpp.c +++ b/res/res_xmpp.c @@ -1319,24 +1319,30 @@ static void xmpp_pubsub_publish_device_state(struct ast_xmpp_client *client, con * \param data void pointer to ast_client structure * \return void */ -static void xmpp_pubsub_mwi_cb(const struct ast_event *ast_event, void *data) +static void xmpp_pubsub_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg) { struct ast_xmpp_client *client = data; const char *mailbox, *context; char oldmsgs[10], newmsgs[10]; + struct stasis_mwi_state *mwi_state; - if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(ast_event, AST_EVENT_IE_EID))) { + if (!stasis_subscription_is_subscribed(sub) || stasis_mwi_state_message() != stasis_message_type(msg)) { + return; + } + + mwi_state = stasis_message_data(msg); + + if (ast_eid_cmp(&ast_eid_default, &mwi_state->eid)) { /* If the event didn't originate from this server, don't send it back out. */ - ast_debug(1, "Returning here\n"); return; } - mailbox = ast_event_get_ie_str(ast_event, AST_EVENT_IE_MAILBOX); - context = ast_event_get_ie_str(ast_event, AST_EVENT_IE_CONTEXT); + mailbox = mwi_state->mailbox; + context = mwi_state->context; snprintf(oldmsgs, sizeof(oldmsgs), "%d", - ast_event_get_ie_uint(ast_event, AST_EVENT_IE_OLDMSGS)); + mwi_state->old_msgs); snprintf(newmsgs, sizeof(newmsgs), "%d", - ast_event_get_ie_uint(ast_event, AST_EVENT_IE_NEWMSGS)); + mwi_state->new_msgs); xmpp_pubsub_publish_mwi(client, mailbox, context, oldmsgs, newmsgs); } @@ -1479,14 +1485,10 @@ static int xmpp_pubsub_handle_event(void *data, ikspak *pak) context = strsep(&item_id, "@"); sscanf(iks_find_cdata(item_content, "OLDMSGS"), "%10d", &oldmsgs); sscanf(iks_find_cdata(item_content, "NEWMSGS"), "%10d", &newmsgs); - if (!(event = ast_event_new(AST_EVENT_MWI, AST_EVENT_IE_MAILBOX, - AST_EVENT_IE_PLTYPE_STR, item_id, AST_EVENT_IE_CONTEXT, - AST_EVENT_IE_PLTYPE_STR, context, AST_EVENT_IE_OLDMSGS, - AST_EVENT_IE_PLTYPE_UINT, oldmsgs, AST_EVENT_IE_NEWMSGS, - AST_EVENT_IE_PLTYPE_UINT, newmsgs, AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, - &pubsub_eid, sizeof(pubsub_eid), AST_EVENT_IE_END))) { - return IKS_FILTER_EAT; - } + + stasis_publish_mwi_state_full(item_id, context, newmsgs, oldmsgs, &pubsub_eid); + + return IKS_FILTER_EAT; } else { ast_debug(1, "Don't know how to handle PubSub event of type %s\n", iks_name(item_content)); @@ -1587,20 +1589,17 @@ static void xmpp_init_event_distribution(struct ast_xmpp_client *client) xmpp_pubsub_unsubscribe(client, "device_state"); xmpp_pubsub_unsubscribe(client, "message_waiting"); - if (!(client->mwi_sub = ast_event_subscribe(AST_EVENT_MWI, xmpp_pubsub_mwi_cb, "xmpp_pubsub_mwi_subscription", - client, AST_EVENT_IE_END))) { + if (!(client->mwi_sub = stasis_subscribe(stasis_mwi_topic_all(), xmpp_pubsub_mwi_cb, client))) { return; } if (ast_enable_distributed_devstate()) { return; } - if (!(client->device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE_CHANGE, xmpp_pubsub_devstate_cb, "xmpp_pubsub_devstate_subscription", client, AST_EVENT_IE_END))) { - ast_event_unsubscribe(client->mwi_sub); - client->mwi_sub = NULL; + client->mwi_sub = stasis_unsubscribe(client->mwi_sub); return; } @@ -3524,8 +3523,7 @@ int ast_xmpp_client_disconnect(struct ast_xmpp_client *client) } if (client->mwi_sub) { - ast_event_unsubscribe(client->mwi_sub); - client->mwi_sub = NULL; + client->mwi_sub = stasis_unsubscribe(client->mwi_sub); xmpp_pubsub_unsubscribe(client, "message_waiting"); }