diff --git a/apps/app_queue.c b/apps/app_queue.c index 3455cc748b744ea42b3bc7c88d6bf8f73a63b927..c63cd071ed0191c55ed99d5d95aa19d4766438c9 100644 --- a/apps/app_queue.c +++ b/apps/app_queue.c @@ -9866,9 +9866,7 @@ static int unload_module(void) res |= ast_data_unregister(NULL); - if (device_state_sub) { - device_state_sub = stasis_unsubscribe(device_state_sub); - } + device_state_sub = stasis_unsubscribe_and_join(device_state_sub); ast_extension_state_del(0, extension_state_cb); diff --git a/apps/app_voicemail.c b/apps/app_voicemail.c index 13bcd3ea57dbe881e93e26bc66d63359fd0f224e..b3ceeebc9c9c74bd311a8dd044b85cb14772214e 100644 --- a/apps/app_voicemail.c +++ b/apps/app_voicemail.c @@ -12689,9 +12689,7 @@ static void stop_poll_thread(void) { poll_thread_run = 0; - if (mwi_sub_sub) { - mwi_sub_sub = stasis_unsubscribe(mwi_sub_sub); - } + mwi_sub_sub = stasis_unsubscribe_and_join(mwi_sub_sub); ast_mutex_lock(&poll_lock); ast_cond_signal(&poll_cond); diff --git a/channels/chan_iax2.c b/channels/chan_iax2.c index eeffb6696eb2a4e4bcf3571bf45b9eb76d464264..112a99375366b9e2bdf954437a610d3c0a5a46ce 100644 --- a/channels/chan_iax2.c +++ b/channels/chan_iax2.c @@ -1334,9 +1334,7 @@ static void network_change_stasis_subscribe(void) static void network_change_stasis_unsubscribe(void) { - if (network_change_sub) { - network_change_sub = stasis_unsubscribe(network_change_sub); - } + network_change_sub = stasis_unsubscribe_and_join(network_change_sub); } static void acl_change_stasis_subscribe(void) @@ -1349,9 +1347,7 @@ static void acl_change_stasis_subscribe(void) static void acl_change_stasis_unsubscribe(void) { - if (acl_change_sub) { - acl_change_sub = stasis_unsubscribe(acl_change_sub); - } + acl_change_sub = stasis_unsubscribe_and_join(acl_change_sub); } static int network_change_sched_cb(const void *data) @@ -12424,9 +12420,7 @@ static void peer_destructor(void *obj) if (peer->dnsmgr) ast_dnsmgr_release(peer->dnsmgr); - if (peer->mwi_event_sub) { - peer->mwi_event_sub = stasis_unsubscribe(peer->mwi_event_sub); - } + peer->mwi_event_sub = stasis_unsubscribe(peer->mwi_event_sub); ast_string_field_free_memory(peer); } diff --git a/channels/chan_sip.c b/channels/chan_sip.c index 937acb94ebcf55cbe3badc5635422222f74b5bd0..88965fc73a94f01de15f60c534d55ce3023973f4 100644 --- a/channels/chan_sip.c +++ b/channels/chan_sip.c @@ -16742,25 +16742,21 @@ static void network_change_stasis_subscribe(void) static void network_change_stasis_unsubscribe(void) { - if (network_change_sub) { - network_change_sub = stasis_unsubscribe(network_change_sub); - } + network_change_sub = stasis_unsubscribe_and_join(network_change_sub); } static void acl_change_stasis_subscribe(void) { if (!acl_change_sub) { acl_change_sub = stasis_subscribe(ast_security_topic(), - acl_change_stasis_cb, NULL); + acl_change_stasis_cb, NULL); } } static void acl_change_event_stasis_unsubscribe(void) { - if (acl_change_sub) { - acl_change_sub = stasis_unsubscribe(acl_change_sub); - } + acl_change_sub = stasis_unsubscribe_and_join(acl_change_sub); } static int network_change_sched_cb(const void *data) diff --git a/funcs/func_presencestate.c b/funcs/func_presencestate.c index 01e6d09c2c03f06c59524178b6412b453f5c2798..3bf4a81b31370b38f845b3ef36443b5db24ca9f2 100644 --- a/funcs/func_presencestate.c +++ b/funcs/func_presencestate.c @@ -706,7 +706,7 @@ AST_TEST_DEFINE(test_presence_state_change) return AST_TEST_FAIL; } - test_sub = stasis_unsubscribe(test_sub); + test_sub = stasis_unsubscribe_and_join(test_sub); ao2_cleanup(cb_data->presence_state); ast_free((char *)cb_data); diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h index 7e46dbf412436025ae793e34d81953092aaa5564..e6ea6fa1306ce87bc4c3dddb78ec561d62a63378 100644 --- a/include/asterisk/stasis.h +++ b/include/asterisk/stasis.h @@ -124,6 +124,34 @@ * stasis_subscription. Due to cyclic references, the \ref * stasis_subscription will not be freed until after it has been unsubscribed, * and all other ao2_ref()'s have been cleaned up. + * + * \par Shutdown + * + * Subscriptions have two options for unsubscribing, depending upon the context + * in which you need to unsubscribe. + * + * If your subscription is owned by a module, and you must unsubscribe from the + * module_unload() function, then you'll want to use the + * stasis_unsubscribe_and_join() function. This will block until the final + * message has been received on the subscription. Otherwise, there's the danger + * of invoking the callback function after it has been unloaded. + * + * If your subscription is owned by an object, then your object should have an + * explicit shutdown() function, which calls stasis_unsubscribe(). In your + * subscription handler, when the stasis_subscription_final_message() has been + * received, decrement the refcount on your object. In your object's destructor, + * you may assert that stasis_subscription_is_done() to validate that the + * subscription's callback will no longer be invoked. + * + * \b Note: You may be tempted to simply call stasis_unsubscribe_and_join() from + * an object's destructor. While code that does this may work most of the time, + * it's got one big downside. There's a general assumption that object + * destruction is non-blocking. If you block the destruction waiting for the + * subscription to complete, there's the danger that the subscription may + * process a message which will bump the refcount up by one. Then it does + * whatever it does, decrements the refcount, which then proceeds to re-destroy + * the object. Now you've got hard to reproduce bugs that only show up under + * certain loads. */ #include "asterisk/utils.h" @@ -292,8 +320,7 @@ typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *s * \since 12 */ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, - stasis_subscription_cb callback, - void *data); + stasis_subscription_cb callback, void *data); /*! * \brief Cancel a subscription. @@ -304,10 +331,52 @@ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, * delivery of the final message. * * \param subscription Subscription to cancel. - * \retval NULL for convenience + * \return \c NULL for convenience + * \since 12 + */ +struct stasis_subscription *stasis_unsubscribe( + struct stasis_subscription *subscription); + +/*! + * \brief Block until the last message is processed on a subscription. + * + * This function will not return until the \a subscription's callback for the + * stasis_subscription_final_message() completes. This allows cleanup routines + * to run before unblocking the joining thread. + * + * \param subscription Subscription to block on. + * \since 12 + */ +void stasis_subscription_join(struct stasis_subscription *subscription); + +/*! + * \brief Returns whether \a subscription has received its final message. + * + * Note that a subscription is considered done even while the + * stasis_subscription_final_message() is being processed. This allows cleanup + * routines to check the status of the subscription. + * + * \param subscription Subscription. + * \return True (non-zero) if stasis_subscription_final_message() has been + * received. + * \return False (zero) if waiting for the end. + */ +int stasis_subscription_is_done(struct stasis_subscription *subscription); + +/*! + * \brief Cancel a subscription, blocking until the last message is processed. + * + * While normally it's recommended to stasis_unsubscribe() and wait for + * stasis_subscription_final_message(), there are times (like during a module + * unload) where you have to wait for the final message (otherwise you'll call + * a function in a shared module that no longer exists). + * + * \param subscription Subscription to cancel. + * \return \c NULL for convenience * \since 12 */ -struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *subscription); +struct stasis_subscription *stasis_unsubscribe_and_join( + struct stasis_subscription *subscription); /*! * \brief Create a subscription which forwards all messages from one topic to @@ -322,7 +391,8 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *subsc * \return \c NULL on error. * \since 12 */ -struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic); +struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, + struct stasis_topic *to_topic); /*! * \brief Get the unique ID for the subscription. @@ -389,7 +459,8 @@ struct stasis_topic_pool; /*! * \brief Create a topic pool that routes messages from dynamically generated topics to the given topic * \param pooled_topic Topic to which messages will be routed - * \retval the new stasis_topic_pool or NULL on failure + * \return the new stasis_topic_pool + * \return \c NULL on failure */ struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic); @@ -397,8 +468,8 @@ struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_t * \brief Find or create a topic in the pool * \param pool Pool for which to get the topic * \param topic_name Name of the topic to get - * \retval The already stored or newly allocated topic - * \retval NULL if the topic was not found and could not be allocated + * \return The already stored or newly allocated topic + * \return \c NULL if the topic was not found and could not be allocated */ struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name); @@ -496,12 +567,31 @@ typedef const char *(*snapshot_get_id)(struct stasis_message *message); struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn); /*! - * Unsubscribes a caching topic from its upstream topic. + * \brief Unsubscribes a caching topic from its upstream topic. + * + * This function returns immediately, so be sure to cleanup when + * stasis_subscription_final_message() is received. + * + * \param caching_topic Caching topic to unsubscribe + * \return \c NULL for convenience + * \since 12 + */ +struct stasis_caching_topic *stasis_caching_unsubscribe( + struct stasis_caching_topic *caching_topic); + +/*! + * \brief Unsubscribes a caching topic from its upstream topic, blocking until + * all messages have been forwarded. + * + * See stasis_unsubscriben_and_join() for more info on when to use this as + * opposed to stasis_caching_unsubscribe(). + * * \param caching_topic Caching topic to unsubscribe - * \retval NULL for convenience + * \return \c NULL for convenience * \since 12 */ -struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic); +struct stasis_caching_topic *stasis_caching_unsubscribe_and_join( + struct stasis_caching_topic *caching_topic); /*! * \brief Returns the topic of cached events from a caching topics. @@ -530,9 +620,9 @@ struct stasis_message *stasis_cache_get(struct stasis_caching_topic *caching_top /*! * \brief Dump cached items to a subscription * \param caching_topic The topic returned from stasis_caching_topic_create(). - * \param type Type of message to dump (any type if NULL). + * \param type Type of message to dump (any type if \c NULL). * \return ao2_container containing all matches (must be unreffed by caller) - * \return NULL on allocation error + * \return \c NULL on allocation error * \since 12 */ struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic, diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h index 42770d293eaf7dbf77221cbaf7e47a08017986d5..e7d5a4cc65f252a3cb5788d8e6fd7ff2f4a405e7 100644 --- a/include/asterisk/stasis_message_router.h +++ b/include/asterisk/stasis_message_router.h @@ -57,11 +57,35 @@ struct stasis_message_router *stasis_message_router_create( /*! * \brief Unsubscribe the router from the upstream topic. + * * \param router Router to unsubscribe. * \since 12 */ void stasis_message_router_unsubscribe(struct stasis_message_router *router); +/*! + * \brief Unsubscribe the router from the upstream topic, blocking until the + * final message has been processed. + * + * See stasis_unsubscribe_and_join() for info on when to use this + * vs. stasis_message_router_unsubscribe(). + * + * \param router Router to unsubscribe. + * \since 12 + */ +void stasis_message_router_unsubscribe_and_join( + struct stasis_message_router *router); + +/*! + * \brief Returns whether \a router has received its final message. + * + * \param router Router. + * \return True (non-zero) if stasis_subscription_final_message() has been + * received. + * \return False (zero) if waiting for the end. + */ +int stasis_message_router_is_done(struct stasis_message_router *router); + /*! * \brief Add a route to a message router. * \param router Router to add the route to. diff --git a/main/app.c b/main/app.c index 0cdd9d31de8e2f79dac12146ae769a5be76bd502..0e8a68f7baa012e93376554c7d8e8f5104dd92e1 100644 --- a/main/app.c +++ b/main/app.c @@ -2727,7 +2727,7 @@ static void app_exit(void) { ao2_cleanup(mwi_topic_all); mwi_topic_all = NULL; - mwi_topic_cached = stasis_caching_unsubscribe(mwi_topic_cached); + mwi_topic_cached = stasis_caching_unsubscribe_and_join(mwi_topic_cached); STASIS_MESSAGE_TYPE_CLEANUP(stasis_mwi_state_type); ao2_cleanup(mwi_topic_pool); mwi_topic_pool = NULL; diff --git a/main/devicestate.c b/main/devicestate.c index aa31dbfd6f80ac260296a57b9f02d909431dd3a7..afa9621d31e8048882d4e8939f79e60c9b3b6266 100644 --- a/main/devicestate.c +++ b/main/devicestate.c @@ -776,7 +776,7 @@ static void devstate_exit(void) { ao2_cleanup(device_state_topic_all); device_state_topic_all = NULL; - device_state_topic_cached = stasis_caching_unsubscribe(device_state_topic_cached); + device_state_topic_cached = stasis_caching_unsubscribe_and_join(device_state_topic_cached); STASIS_MESSAGE_TYPE_CLEANUP(ast_device_state_message_type); ao2_cleanup(device_state_topic_pool); device_state_topic_pool = NULL; diff --git a/main/endpoints.c b/main/endpoints.c index 95397f9602ae4b6ba9c4ed215c75dfb135bf52ba..c2d0577f977613bf21313cd64e561a5220b5c7ac 100644 --- a/main/endpoints.c +++ b/main/endpoints.c @@ -106,7 +106,9 @@ static void endpoint_dtor(void *obj) struct ast_endpoint *endpoint = obj; /* The router should be shut down already */ - ast_assert(endpoint->router == NULL); + ast_assert(stasis_message_router_is_done(endpoint->router)); + ao2_cleanup(endpoint->router); + endpoint->router = NULL; stasis_unsubscribe(endpoint->forward); endpoint->forward = NULL; @@ -258,8 +260,9 @@ void ast_endpoint_shutdown(struct ast_endpoint *endpoint) stasis_publish(endpoint->topic, message); } + /* Bump refcount to hold on to the router */ + ao2_ref(endpoint->router, +1); stasis_message_router_unsubscribe(endpoint->router); - endpoint->router = NULL; } const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint) diff --git a/main/manager.c b/main/manager.c index 4d2923eb5dc78a692c36126afafe8cea40e56a44..c9b2fbe1e1a74c9a2190f591d7e741586a5e9bf8 100644 --- a/main/manager.c +++ b/main/manager.c @@ -1077,9 +1077,7 @@ static void acl_change_stasis_subscribe(void) static void acl_change_stasis_unsubscribe(void) { - if (acl_change_sub) { - acl_change_sub = stasis_unsubscribe(acl_change_sub); - } + acl_change_sub = stasis_unsubscribe_and_join(acl_change_sub); } /* In order to understand what the heck is going on with the diff --git a/main/manager_channels.c b/main/manager_channels.c index 63380a762c1a8e746240956d12d185d9db26423f..e1f918868913b4636994009228b520e04c91c361 100644 --- a/main/manager_channels.c +++ b/main/manager_channels.c @@ -805,7 +805,7 @@ static void channel_dial_cb(void *data, struct stasis_subscription *sub, static void manager_channels_shutdown(void) { - stasis_message_router_unsubscribe(channel_state_router); + stasis_message_router_unsubscribe_and_join(channel_state_router); channel_state_router = NULL; } diff --git a/main/pbx.c b/main/pbx.c index 74aeee928aaf05475b752bf250be66d9a61222f9..9492d955947c8b5713a0ed07d28457efad065c45 100644 --- a/main/pbx.c +++ b/main/pbx.c @@ -11700,12 +11700,8 @@ static void unload_pbx(void) { int x; - if (presence_state_sub) { - presence_state_sub = stasis_unsubscribe(presence_state_sub); - } - if (device_state_sub) { - device_state_sub = stasis_unsubscribe(device_state_sub); - } + presence_state_sub = stasis_unsubscribe_and_join(presence_state_sub); + device_state_sub = stasis_unsubscribe_and_join(device_state_sub); /* Unregister builtin applications */ for (x = 0; x < ARRAY_LEN(builtins); x++) { diff --git a/main/stasis.c b/main/stasis.c index 98dff95a6b35a42937e285adcd830f46a9a61ffb..d0ded401c6619755c3f8173218d5369f982e3926 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -114,16 +114,30 @@ struct stasis_subscription { stasis_subscription_cb callback; /*! Data pointer to be handed to the callback. */ void *data; + + /*! Lock for joining with subscription. */ + ast_mutex_t join_lock; + /*! Condition for joining with subscription. */ + ast_cond_t join_cond; + /*! Flag set when final message for sub has been received. + * Be sure join_lock is held before reading/setting. */ + int final_message_rxed; + /*! Flag set when final message for sub has been processed. + * Be sure join_lock is held before reading/setting. */ + int final_message_processed; }; static void subscription_dtor(void *obj) { struct stasis_subscription *sub = obj; ast_assert(!stasis_subscription_is_subscribed(sub)); + ast_assert(stasis_subscription_is_done(sub)); ao2_cleanup(sub->topic); sub->topic = NULL; ast_taskprocessor_unreference(sub->mailbox); sub->mailbox = NULL; + ast_mutex_destroy(&sub->join_lock); + ast_cond_destroy(&sub->join_cond); } /*! @@ -136,11 +150,22 @@ static void subscription_invoke(struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message) { - /* Since sub->topic doesn't change, no need to lock sub */ - sub->callback(sub->data, - sub, - topic, - message); + /* Notify that the final message has been received */ + if (stasis_subscription_final_message(sub, message)) { + SCOPED_MUTEX(lock, &sub->join_lock); + sub->final_message_rxed = 1; + ast_cond_signal(&sub->join_cond); + } + + /* Since sub is mostly immutable, no need to lock sub */ + sub->callback(sub->data, sub, topic, message); + + /* Notify that the final message has been processed */ + if (stasis_subscription_final_message(sub, message)) { + SCOPED_MUTEX(lock, &sub->join_lock); + sub->final_message_processed = 1; + ast_cond_signal(&sub->join_cond); + } } static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description); @@ -171,6 +196,8 @@ static struct stasis_subscription *__stasis_subscribe( sub->topic = topic; sub->callback = callback; sub->data = data; + ast_mutex_init(&sub->join_lock); + ast_cond_init(&sub->join_cond, NULL); if (topic_add_subscription(topic, sub) != 0) { return NULL; @@ -212,6 +239,50 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub) return NULL; } +/*! + * \brief Block until the final message has been received on a subscription. + * + * \param subscription Subscription to wait on. + */ +void stasis_subscription_join(struct stasis_subscription *subscription) +{ + if (subscription) { + SCOPED_MUTEX(lock, &subscription->join_lock); + /* Wait until the processed flag has been set */ + while (!subscription->final_message_processed) { + ast_cond_wait(&subscription->join_cond, + &subscription->join_lock); + } + } +} + +int stasis_subscription_is_done(struct stasis_subscription *subscription) +{ + if (subscription) { + SCOPED_MUTEX(lock, &subscription->join_lock); + return subscription->final_message_rxed; + } + + /* Null subscription is about as done as you can get */ + return 1; +} + +struct stasis_subscription *stasis_unsubscribe_and_join( + struct stasis_subscription *subscription) +{ + if (!subscription) { + return NULL; + } + + /* Bump refcount to hold it past the unsubscribe */ + ao2_ref(subscription, +1); + stasis_unsubscribe(subscription); + stasis_subscription_join(subscription); + /* Now decrement the refcount back */ + ao2_cleanup(subscription); + return NULL; +} + int stasis_subscription_is_subscribed(const struct stasis_subscription *sub) { if (sub) { diff --git a/main/stasis_cache.c b/main/stasis_cache.c index 75e6b5f956e53841ccf37c453f94885e7e19f6f0..154b4f020a93f04d556fd8f9fecdfa5fb7350492 100644 --- a/main/stasis_cache.c +++ b/main/stasis_cache.c @@ -53,6 +53,8 @@ struct stasis_caching_topic { static void stasis_caching_topic_dtor(void *obj) { struct stasis_caching_topic *caching_topic = obj; ast_assert(!stasis_subscription_is_subscribed(caching_topic->sub)); + ast_assert(stasis_subscription_is_done(caching_topic->sub)); + ao2_cleanup(caching_topic->sub); caching_topic->sub = NULL; ao2_cleanup(caching_topic->cache); caching_topic->cache = NULL; @@ -69,6 +71,9 @@ struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_to { if (caching_topic) { if (stasis_subscription_is_subscribed(caching_topic->sub)) { + /* Increment the reference to hold on to it past the + * unsubscribe */ + ao2_ref(caching_topic->sub, +1); stasis_unsubscribe(caching_topic->sub); } else { ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n"); @@ -77,6 +82,20 @@ struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_to return NULL; } +struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(struct stasis_caching_topic *caching_topic) +{ + if (!caching_topic) { + return NULL; + } + + /* Hold a ref past the unsubscribe */ + ao2_ref(caching_topic, +1); + stasis_caching_unsubscribe(caching_topic); + stasis_subscription_join(caching_topic->sub); + ao2_cleanup(caching_topic); + return NULL; +} + struct cache_entry { struct stasis_message_type *type; char *id; diff --git a/main/stasis_channels.c b/main/stasis_channels.c index 3df52d0b3702235a634939731bf434a0fe1d234b..95f5f9d0e95ad432bc8877b37c15c98437216741 100644 --- a/main/stasis_channels.c +++ b/main/stasis_channels.c @@ -505,6 +505,9 @@ int ast_channel_snapshot_caller_id_equal( void ast_stasis_channels_shutdown(void) { + channel_topic_all_cached = stasis_caching_unsubscribe_and_join(channel_topic_all_cached); + ao2_cleanup(channel_topic_all); + channel_topic_all = NULL; STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_snapshot_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dial_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_varset_type); @@ -512,9 +515,6 @@ void ast_stasis_channels_shutdown(void) STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_hangup_request_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_begin_type); STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_end_type); - ao2_cleanup(channel_topic_all); - channel_topic_all = NULL; - channel_topic_all_cached = stasis_caching_unsubscribe(channel_topic_all_cached); } void ast_stasis_channels_init(void) diff --git a/main/stasis_endpoints.c b/main/stasis_endpoints.c index 7428e2cf1f01c9c9a9e9266e79fb02d9ddafa2a2..252614f629aadf7afb9d68d64202f8481c94f5bb 100644 --- a/main/stasis_endpoints.c +++ b/main/stasis_endpoints.c @@ -101,15 +101,6 @@ static const char *endpoint_snapshot_get_id(struct stasis_message *message) } -static void endpoints_stasis_shutdown(void) -{ - ao2_cleanup(endpoint_topic_all); - endpoint_topic_all = NULL; - - stasis_caching_unsubscribe(endpoint_topic_all_cached); - endpoint_topic_all_cached = NULL; -} - struct ast_json *ast_endpoint_snapshot_to_json( const struct ast_endpoint_snapshot *snapshot) { @@ -149,6 +140,15 @@ struct ast_json *ast_endpoint_snapshot_to_json( return ast_json_ref(json); } +static void endpoints_stasis_shutdown(void) +{ + stasis_caching_unsubscribe_and_join(endpoint_topic_all_cached); + endpoint_topic_all_cached = NULL; + + ao2_cleanup(endpoint_topic_all); + endpoint_topic_all = NULL; +} + int ast_endpoint_stasis_init(void) { ast_register_atexit(endpoints_stasis_shutdown); diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c index 97ed7ad92c343bf37c27707958f7ca3b7f2b3621..c7acca1ffd795f0f58e7b85a3738ddea112c0321 100644 --- a/main/stasis_message_router.c +++ b/main/stasis_message_router.c @@ -74,6 +74,7 @@ static void router_dtor(void *obj) size_t i; ast_assert(!stasis_subscription_is_subscribed(router->subscription)); + ast_assert(stasis_subscription_is_done(router->subscription)); router->subscription = NULL; for (i = 0; i < router->num_routes_current; ++i) { ao2_cleanup(router->routes[i]); @@ -165,6 +166,26 @@ void stasis_message_router_unsubscribe(struct stasis_message_router *router) stasis_unsubscribe(router->subscription); } +void stasis_message_router_unsubscribe_and_join( + struct stasis_message_router *router) +{ + if (!router) { + return; + } + stasis_unsubscribe_and_join(router->subscription); +} + +int stasis_message_router_is_done(struct stasis_message_router *router) +{ + if (!router) { + /* Null router is about as done as you can get */ + return 1; + } + + return stasis_subscription_is_done(router->subscription); +} + + static struct stasis_message_route *route_create( struct stasis_message_type *message_type, stasis_subscription_cb callback, diff --git a/res/res_chan_stats.c b/res/res_chan_stats.c index f5f2267aa52e38c3c1f610a8a5568fa3c67224ce..d54ba2f0b3bf002f771b8cd73c9127eab4d692ce 100644 --- a/res/res_chan_stats.c +++ b/res/res_chan_stats.c @@ -172,9 +172,9 @@ static int load_module(void) static int unload_module(void) { - stasis_unsubscribe(sub); + stasis_unsubscribe_and_join(sub); sub = NULL; - stasis_message_router_unsubscribe(router); + stasis_message_router_unsubscribe_and_join(router); router = NULL; return 0; } diff --git a/res/res_jabber.c b/res/res_jabber.c index 69f3665228418103df23519b57c88c1933fa2a33..2070c8024c8f2aafe6015e3120a560da914dc3e0 100644 --- a/res/res_jabber.c +++ b/res/res_jabber.c @@ -4770,12 +4770,8 @@ static int unload_module(void) ast_unregister_application(app_ajileave); ast_manager_unregister("JabberSend"); ast_custom_function_unregister(&jabberstatus_function); - if (mwi_sub) { - mwi_sub = stasis_unsubscribe(mwi_sub); - } - if (device_state_sub) { - device_state_sub = stasis_unsubscribe(device_state_sub); - } + mwi_sub = stasis_unsubscribe_and_join(mwi_sub); + device_state_sub = stasis_unsubscribe_and_join(device_state_sub); ast_custom_function_unregister(&jabberreceive_function); ASTOBJ_CONTAINER_TRAVERSE(&clients, 1, { diff --git a/res/res_stasis.c b/res/res_stasis.c index 5327ca4acf65c4e5228460c9b53acd3cc1d5c79f..e9931a09a71110bebae761b184387e5d739b08a8 100644 --- a/res/res_stasis.c +++ b/res/res_stasis.c @@ -722,7 +722,7 @@ static int unload_module(void) { int r = 0; - stasis_message_router_unsubscribe(channel_router); + stasis_message_router_unsubscribe_and_join(channel_router); channel_router = NULL; ao2_cleanup(apps_registry);