diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h index 6bc5171e00f5efff8e2b7837da3c770967ce2f59..1a420da2fc7d47d008be55037cd1bab2cc5cd34e 100644 --- a/include/asterisk/stasis.h +++ b/include/asterisk/stasis.h @@ -186,6 +186,12 @@ struct stasis_message_type; */ struct stasis_message; +/*! + * \brief Opaque type for a Stasis subscription. + * \since 12 + */ +struct stasis_subscription; + /*! * \brief Structure containing callbacks for Stasis message sanitization * @@ -377,20 +383,35 @@ const char *stasis_topic_name(const struct stasis_topic *topic); * \brief Publish a message to a topic's subscribers. * \param topic Topic. * \param message Message to publish. + * + * This call is asynchronous and will return immediately upon queueing + * the message for delivery to the topic's subscribers. + * * \since 12 */ void stasis_publish(struct stasis_topic *topic, struct stasis_message *message); +/*! + * \brief Publish a message to a topic's subscribers, synchronizing + * on the specified subscriber + * \param sub Subscription to synchronize on. + * \param message Message to publish. + * + * The caller of stasis_publish_sync will block until the specified + * subscriber completes handling of the message. + * + * All other subscribers to the topic the \ref stasis_subpscription + * is subscribed to are also delivered the message; this delivery however + * happens asynchronously. + * + * \since 12.1.0 + */ +void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message); + /*! @} */ /*! @{ */ -/*! - * \brief Opaque type for a Stasis subscription. - * \since 12 - */ -struct stasis_subscription; - /*! * \brief Callback function type for Stasis subscriptions. * \param data Data field provided with subscription. diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h index 3209adb16778de3768a9c4082d226192b964ed44..613a2bd7f0469a2a98f77084898ea3a2362eb514 100644 --- a/include/asterisk/stasis_message_router.h +++ b/include/asterisk/stasis_message_router.h @@ -92,6 +92,24 @@ void stasis_message_router_unsubscribe_and_join( */ int stasis_message_router_is_done(struct stasis_message_router *router); +/*! + * \brief Publish a message to a message router's subscription synchronously + * + * \param router Router + * \param message The \ref stasis message + * + * This should be used when a message needs to be published synchronously to + * the underlying subscription created by a message router. This is analagous + * to \ref stasis_publish_sync. + * + * Note that the caller will be blocked until the thread servicing the message + * on the message router's subscription completes handling of the message. + * + * \since 12.1.0 + */ +void stasis_message_router_publish_sync(struct stasis_message_router *router, + struct stasis_message *message); + /*! * \brief Add a route to a message router. * diff --git a/main/stasis.c b/main/stasis.c index 587a6a85e33ba04dd63eded6baabddd16ddf43a9..0a5db2f16f4073579e552611df0ed59c780a4472 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -505,11 +505,11 @@ static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_s } /*! - * \brief Dispatch a message to a subscriber - * \param data \ref dispatch object + * \internal \brief Dispatch a message to a subscriber asynchronously + * \param local \ref ast_taskprocessor_local object * \return 0 */ -static int dispatch_exec(struct ast_taskprocessor_local *local) +static int dispatch_exec_async(struct ast_taskprocessor_local *local) { struct stasis_subscription *sub = local->local_data; struct stasis_message *message = local->data; @@ -520,23 +520,105 @@ static int dispatch_exec(struct ast_taskprocessor_local *local) return 0; } +/*! + * \internal \brief Data passed to \ref dispatch_exec_sync to synchronize + * a published message to a subscriber + */ +struct sync_task_data { + ast_mutex_t lock; + ast_cond_t cond; + int complete; + void *task_data; +}; + +/*! + * \internal \brief Dispatch a message to a subscriber synchronously + * \param local \ref ast_taskprocessor_local object + * \return 0 + */ +static int dispatch_exec_sync(struct ast_taskprocessor_local *local) +{ + struct stasis_subscription *sub = local->local_data; + struct sync_task_data *std = local->data; + struct stasis_message *message = std->task_data; + + subscription_invoke(sub, message); + ao2_cleanup(message); + + ast_mutex_lock(&std->lock); + std->complete = 1; + ast_cond_signal(&std->cond); + ast_mutex_unlock(&std->lock); + + return 0; +} + +/*! + * \internal \brief Dispatch a message to a subscriber + * \param sub The subscriber to dispatch to + * \param message The message to send + * \param synchronous If non-zero, synchronize on the subscriber receiving + * the message + */ static void dispatch_message(struct stasis_subscription *sub, - struct stasis_message *message) + struct stasis_message *message, + int synchronous) { - if (sub->mailbox) { - ao2_bump(message); - if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec, message) != 0) { + if (!sub->mailbox) { + /* Dispatch directly */ + subscription_invoke(sub, message); + return; + } + + /* Bump the message for the taskprocessor push. This will get de-ref'd + * by the task processor callback. + */ + ao2_bump(message); + if (!synchronous) { + if (ast_taskprocessor_push_local(sub->mailbox, + dispatch_exec_async, + message) != 0) { /* Push failed; ugh. */ - ast_log(LOG_ERROR, "Dropping dispatch\n"); + ast_log(LOG_ERROR, "Dropping async dispatch\n"); ao2_cleanup(message); } } else { - /* Dispatch directly */ - subscription_invoke(sub, message); + struct sync_task_data std; + + ast_mutex_init(&std.lock); + ast_cond_init(&std.cond, NULL); + std.complete = 0; + std.task_data = message; + + if (ast_taskprocessor_push_local(sub->mailbox, + dispatch_exec_sync, + &std)) { + /* Push failed; ugh. */ + ast_log(LOG_ERROR, "Dropping sync dispatch\n"); + ao2_cleanup(message); + return; + } + + ast_mutex_lock(&std.lock); + while (!std.complete) { + ast_cond_wait(&std.cond, &std.lock); + } + ast_mutex_unlock(&std.lock); + + ast_mutex_destroy(&std.lock); + ast_cond_destroy(&std.cond); } } -void stasis_publish(struct stasis_topic *topic, struct stasis_message *message) +/*! + * \internal \brief Publish a message to a topic's subscribers + * \brief topic The topic to publish to + * \brief message The message to publish + * \brief sync_sub An optional subscriber of the topic to publish synchronously + * to + */ +static void publish_msg(struct stasis_topic *topic, + struct stasis_message *message, struct stasis_subscription *sync_sub) { size_t i; @@ -554,12 +636,24 @@ void stasis_publish(struct stasis_topic *topic, struct stasis_message *message) ast_assert(sub != NULL); - dispatch_message(sub, message); + dispatch_message(sub, message, (sub == sync_sub)); } ao2_unlock(topic); ao2_ref(topic, -1); } +void stasis_publish(struct stasis_topic *topic, struct stasis_message *message) +{ + publish_msg(topic, message, NULL); +} + +void stasis_publish_sync(struct stasis_subscription *sub, struct stasis_message *message) +{ + ast_assert(sub != NULL); + + publish_msg(sub->topic, message, sub); +} + /*! * \brief Forwarding information * @@ -721,7 +815,7 @@ static void send_subscription_unsubscribe(struct stasis_topic *topic, stasis_publish(topic, msg); /* Now we have to dispatch to the subscription itself */ - dispatch_message(sub, msg); + dispatch_message(sub, msg, 0); } struct topic_pool_entry { diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c index ec0448befb4772a6e3bf94318766c8488c51b566..41495db11f66fab2b18395293ec50077c796691c 100644 --- a/main/stasis_message_router.c +++ b/main/stasis_message_router.c @@ -261,6 +261,16 @@ int stasis_message_router_is_done(struct stasis_message_router *router) return stasis_subscription_is_done(router->subscription); } +void stasis_message_router_publish_sync(struct stasis_message_router *router, + struct stasis_message *message) +{ + ast_assert(router != NULL); + + ao2_bump(router); + stasis_publish_sync(router->subscription, message); + ao2_cleanup(router); +} + int stasis_message_router_add(struct stasis_message_router *router, struct stasis_message_type *message_type, stasis_subscription_cb callback, void *data) diff --git a/tests/test_stasis.c b/tests/test_stasis.c index 48d5a6c2a25fa1b813b2b5ed6f8d8697ceb8f637..0cfce2c3f99f64c26976e48d1133b626a79afd35 100644 --- a/tests/test_stasis.c +++ b/tests/test_stasis.c @@ -206,6 +206,27 @@ static void consumer_exec(void *data, struct stasis_subscription *sub, struct st ast_cond_signal(&consumer->out); } +static void consumer_exec_sync(void *data, struct stasis_subscription *sub, struct stasis_message *message) +{ + struct consumer *consumer = data; + RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup); + SCOPED_MUTEX(lock, &consumer->lock); + + if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change_type()) { + + ++consumer->messages_rxed_len; + consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len); + ast_assert(consumer->messages_rxed != NULL); + consumer->messages_rxed[consumer->messages_rxed_len - 1] = message; + ao2_ref(message, +1); + } + + if (stasis_subscription_final_message(sub, message)) { + consumer->complete = 1; + consumer_needs_cleanup = consumer; + } +} + static int consumer_wait_for(struct consumer *consumer, size_t expected_len) { struct timeval start = ast_tvnow(); @@ -341,8 +362,8 @@ AST_TEST_DEFINE(publish) case TEST_INIT: info->name = __func__; info->category = test_category; - info->summary = "Test simple subscriptions"; - info->description = "Test simple subscriptions"; + info->summary = "Test publishing"; + info->description = "Test publishing"; return AST_TEST_NOT_RUN; case TEST_EXECUTE: break; @@ -373,6 +394,53 @@ AST_TEST_DEFINE(publish) return AST_TEST_PASS; } +AST_TEST_DEFINE(publish_sync) +{ + RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe); + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup); + RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); + int actual_len; + const char *actual; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test synchronous publishing"; + info->description = "Test synchronous publishing"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + topic = stasis_topic_create("TestTopic"); + ast_test_validate(test, NULL != topic); + + consumer = consumer_create(1); + ast_test_validate(test, NULL != consumer); + + uut = stasis_subscribe(topic, consumer_exec_sync, consumer); + ast_test_validate(test, NULL != uut); + ao2_ref(consumer, +1); + + test_data = ao2_alloc(1, NULL); + ast_test_validate(test, NULL != test_data); + test_message_type = stasis_message_type_create("TestMessage", NULL); + test_message = stasis_message_create(test_message_type, test_data); + + stasis_publish_sync(uut, test_message); + + actual_len = consumer->messages_rxed_len; + ast_test_validate(test, 1 == actual_len); + actual = stasis_message_data(consumer->messages_rxed[0]); + ast_test_validate(test, test_data == actual); + + return AST_TEST_PASS; +} + AST_TEST_DEFINE(unsubscribe_stops_messages) { RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); @@ -1324,6 +1392,7 @@ static int unload_module(void) AST_TEST_UNREGISTER(message); AST_TEST_UNREGISTER(subscription_messages); AST_TEST_UNREGISTER(publish); + AST_TEST_UNREGISTER(publish_sync); AST_TEST_UNREGISTER(unsubscribe_stops_messages); AST_TEST_UNREGISTER(forward); AST_TEST_UNREGISTER(cache_filter); @@ -1347,6 +1416,7 @@ static int load_module(void) AST_TEST_REGISTER(message); AST_TEST_REGISTER(subscription_messages); AST_TEST_REGISTER(publish); + AST_TEST_REGISTER(publish_sync); AST_TEST_REGISTER(unsubscribe_stops_messages); AST_TEST_REGISTER(forward); AST_TEST_REGISTER(cache_filter);