From b23e8e19507af448986554d4b360ca3ceefc4c18 Mon Sep 17 00:00:00 2001 From: "David M. Lee" <dlee@digium.com> Date: Mon, 1 Apr 2013 13:37:51 +0000 Subject: [PATCH] stasis: Fixed message ordering issues when forwarding This patch fixes an issue of message ordering that occurs when multiple topics are forwarded to an aggregator topic (such as ast_channel_topic_all()). It is (very reasonably) expected that the rules governing message dispatch order still apply, so long as the messages start from the same thread, and are received by the same subscription. Because the existing code had an additional layer of dispatching via the Stasis thread pool for forwards, those promises couldn't be kept. Forwarding subscriptions no longer have their own mailbox, and now dispatch directly from the forwarding topic's stasis_publish() call. This means that the topic's lock is held for the duration of not only a message's dispatch, but the dispatch of all the forwards. This shouldn't be a problem right now, but if an aggregator topic had many subscribers, it could become a problem. But I figure we can write more clever code when the time comes, if necessary. Review: https://reviewboard.asterisk.org/r/2419/ git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@384413 65c4cc65-6c06-0410-ace0-fbb531ad65f3 --- main/stasis.c | 83 ++++++++++++++++++++++++++++++------------- tests/test_stasis.c | 86 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 144 insertions(+), 25 deletions(-) diff --git a/main/stasis.c b/main/stasis.c index 2bd432a0c6..3388a8042a 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -61,7 +61,6 @@ struct stasis_topic { }; /* Forward declarations for the tightly-coupled subscription object */ -struct stasis_subscription; static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub); static void topic_dtor(void *obj) @@ -127,9 +126,30 @@ static void subscription_dtor(void *obj) sub->mailbox = NULL; } +/*! + * \brief Invoke the subscription's callback. + * \param sub Subscription to invoke. + * \param topic Topic message was published to. + * \param message Message to send. + */ +static void subscription_invoke(struct stasis_subscription *sub, + struct stasis_topic *topic, + struct stasis_message *message) +{ + /* Since sub->topic doesn't change, no need to lock sub */ + sub->callback(sub->data, + sub, + topic, + message); +} + static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description); -struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data) +static struct stasis_subscription *__stasis_subscribe( + struct stasis_topic *topic, + stasis_subscription_cb callback, + void *data, + int needs_mailbox) { RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup); @@ -140,9 +160,11 @@ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_ ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid)); - sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool); - if (!sub->mailbox) { - return NULL; + if (needs_mailbox) { + sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool); + if (!sub->mailbox) { + return NULL; + } } ao2_ref(topic, +1); @@ -159,6 +181,14 @@ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_ return sub; } +struct stasis_subscription *stasis_subscribe( + struct stasis_topic *topic, + stasis_subscription_cb callback, + void *data) +{ + return __stasis_subscribe(topic, callback, data, 1); +} + struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub) { if (sub) { @@ -305,17 +335,8 @@ static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasi static int dispatch_exec(void *data) { RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup); - RAII_VAR(struct stasis_topic *, sub_topic, NULL, ao2_cleanup); - - /* Since sub->topic doesn't change, no need to lock sub */ - ast_assert(dispatch->sub->topic != NULL); - ao2_ref(dispatch->sub->topic, +1); - sub_topic = dispatch->sub->topic; - dispatch->sub->callback(dispatch->sub->data, - dispatch->sub, - sub_topic, - dispatch->message); + subscription_invoke(dispatch->sub, dispatch->topic, dispatch->message); return 0; } @@ -331,18 +352,28 @@ void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *pub for (i = 0; i < topic->num_subscribers_current; ++i) { struct stasis_subscription *sub = topic->subscribers[i]; - RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup); ast_assert(sub != NULL); - dispatch = dispatch_create(publisher_topic, message, sub); - if (!dispatch) { - ast_log(LOG_DEBUG, "Dropping dispatch\n"); - break; - } + if (sub->mailbox) { + RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup); + + dispatch = dispatch_create(publisher_topic, message, sub); + if (!dispatch) { + ast_log(LOG_DEBUG, "Dropping dispatch\n"); + break; + } - if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) { - dispatch = NULL; /* Ownership transferred to mailbox */ + if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) { + /* Ownership transferred to mailbox. + * Don't increment ref, b/c the task processor + * may have already gotten rid of the object. + */ + dispatch = NULL; + } + } else { + /* Dispatch directly */ + subscription_invoke(sub, publisher_topic, message); } } } @@ -370,7 +401,11 @@ struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, return NULL; } - sub = stasis_subscribe(from_topic, stasis_forward_cb, to_topic); + /* Forwarding subscriptions should dispatch directly instead of having a + * mailbox. Otherwise, messages forwarded to the same topic from + * different topics may get reordered. Which is bad. + */ + sub = __stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0); if (sub) { /* hold a ref to to_topic for this forwarding subscription */ ao2_ref(to_topic, +1); diff --git a/tests/test_stasis.c b/tests/test_stasis.c index 52d510f144..3a7d52c07f 100644 --- a/tests/test_stasis.c +++ b/tests/test_stasis.c @@ -391,7 +391,6 @@ AST_TEST_DEFINE(unsubscribe_stops_messages) return AST_TEST_PASS; } - AST_TEST_DEFINE(forward) { RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup); @@ -458,6 +457,89 @@ AST_TEST_DEFINE(forward) return AST_TEST_PASS; } +AST_TEST_DEFINE(interleaving) +{ + RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup); + + RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup); + + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + + RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup); + + RAII_VAR(struct stasis_subscription *, forward_sub1, NULL, stasis_unsubscribe); + RAII_VAR(struct stasis_subscription *, forward_sub2, NULL, stasis_unsubscribe); + RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe); + + RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); + + int actual_len; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test sending interleaved events to a parent topic"; + info->description = "Test sending events to a parent topic.\n" + "This test creates three topics (one parent, two children)\n" + "and publishes messages alternately between the children.\n" + "It verifies that the messages are received in the expected\n" + "order."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + test_message_type = stasis_message_type_create("test"); + ast_test_validate(test, NULL != test_message_type); + + test_data = ao2_alloc(1, NULL); + ast_test_validate(test, NULL != test_data); + + test_message1 = stasis_message_create(test_message_type, test_data); + ast_test_validate(test, NULL != test_message1); + test_message2 = stasis_message_create(test_message_type, test_data); + ast_test_validate(test, NULL != test_message2); + test_message3 = stasis_message_create(test_message_type, test_data); + ast_test_validate(test, NULL != test_message3); + + parent_topic = stasis_topic_create("ParentTestTopic"); + ast_test_validate(test, NULL != parent_topic); + topic1 = stasis_topic_create("Topic1"); + ast_test_validate(test, NULL != topic1); + topic2 = stasis_topic_create("Topic2"); + ast_test_validate(test, NULL != topic2); + + forward_sub1 = stasis_forward_all(topic1, parent_topic); + ast_test_validate(test, NULL != forward_sub1); + forward_sub2 = stasis_forward_all(topic2, parent_topic); + ast_test_validate(test, NULL != forward_sub2); + + consumer = consumer_create(1); + ast_test_validate(test, NULL != consumer); + + sub = stasis_subscribe(parent_topic, consumer_exec, consumer); + ast_test_validate(test, NULL != sub); + ao2_ref(consumer, +1); + + stasis_publish(topic1, test_message1); + stasis_publish(topic2, test_message2); + stasis_publish(topic1, test_message3); + + actual_len = consumer_wait_for(consumer, 3); + ast_test_validate(test, 3 == actual_len); + + ast_test_validate(test, test_message1 == consumer->messages_rxed[0]); + ast_test_validate(test, test_message2 == consumer->messages_rxed[1]); + ast_test_validate(test, test_message3 == consumer->messages_rxed[2]); + + return AST_TEST_PASS; +} + struct cache_test_data { char *id; char *value; @@ -829,6 +911,7 @@ static int unload_module(void) AST_TEST_UNREGISTER(cache); AST_TEST_UNREGISTER(route_conflicts); AST_TEST_UNREGISTER(router); + AST_TEST_UNREGISTER(interleaving); return 0; } @@ -844,6 +927,7 @@ static int load_module(void) AST_TEST_REGISTER(cache); AST_TEST_REGISTER(route_conflicts); AST_TEST_REGISTER(router); + AST_TEST_REGISTER(interleaving); return AST_MODULE_LOAD_SUCCESS; } -- GitLab