diff --git a/main/stasis.c b/main/stasis.c index 2bd432a0c6da9328c3a25f310ed55b1ef899329b..3388a8042a541948013c7a3c0be3246d68ad4f6b 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -61,7 +61,6 @@ struct stasis_topic { }; /* Forward declarations for the tightly-coupled subscription object */ -struct stasis_subscription; static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub); static void topic_dtor(void *obj) @@ -127,9 +126,30 @@ static void subscription_dtor(void *obj) sub->mailbox = NULL; } +/*! + * \brief Invoke the subscription's callback. + * \param sub Subscription to invoke. + * \param topic Topic message was published to. + * \param message Message to send. + */ +static void subscription_invoke(struct stasis_subscription *sub, + struct stasis_topic *topic, + struct stasis_message *message) +{ + /* Since sub->topic doesn't change, no need to lock sub */ + sub->callback(sub->data, + sub, + topic, + message); +} + static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description); -struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data) +static struct stasis_subscription *__stasis_subscribe( + struct stasis_topic *topic, + stasis_subscription_cb callback, + void *data, + int needs_mailbox) { RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup); @@ -140,9 +160,11 @@ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_ ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid)); - sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool); - if (!sub->mailbox) { - return NULL; + if (needs_mailbox) { + sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool); + if (!sub->mailbox) { + return NULL; + } } ao2_ref(topic, +1); @@ -159,6 +181,14 @@ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_ return sub; } +struct stasis_subscription *stasis_subscribe( + struct stasis_topic *topic, + stasis_subscription_cb callback, + void *data) +{ + return __stasis_subscribe(topic, callback, data, 1); +} + struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub) { if (sub) { @@ -305,17 +335,8 @@ static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasi static int dispatch_exec(void *data) { RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup); - RAII_VAR(struct stasis_topic *, sub_topic, NULL, ao2_cleanup); - - /* Since sub->topic doesn't change, no need to lock sub */ - ast_assert(dispatch->sub->topic != NULL); - ao2_ref(dispatch->sub->topic, +1); - sub_topic = dispatch->sub->topic; - dispatch->sub->callback(dispatch->sub->data, - dispatch->sub, - sub_topic, - dispatch->message); + subscription_invoke(dispatch->sub, dispatch->topic, dispatch->message); return 0; } @@ -331,18 +352,28 @@ void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *pub for (i = 0; i < topic->num_subscribers_current; ++i) { struct stasis_subscription *sub = topic->subscribers[i]; - RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup); ast_assert(sub != NULL); - dispatch = dispatch_create(publisher_topic, message, sub); - if (!dispatch) { - ast_log(LOG_DEBUG, "Dropping dispatch\n"); - break; - } + if (sub->mailbox) { + RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup); + + dispatch = dispatch_create(publisher_topic, message, sub); + if (!dispatch) { + ast_log(LOG_DEBUG, "Dropping dispatch\n"); + break; + } - if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) { - dispatch = NULL; /* Ownership transferred to mailbox */ + if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) { + /* Ownership transferred to mailbox. + * Don't increment ref, b/c the task processor + * may have already gotten rid of the object. + */ + dispatch = NULL; + } + } else { + /* Dispatch directly */ + subscription_invoke(sub, publisher_topic, message); } } } @@ -370,7 +401,11 @@ struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, return NULL; } - sub = stasis_subscribe(from_topic, stasis_forward_cb, to_topic); + /* Forwarding subscriptions should dispatch directly instead of having a + * mailbox. Otherwise, messages forwarded to the same topic from + * different topics may get reordered. Which is bad. + */ + sub = __stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0); if (sub) { /* hold a ref to to_topic for this forwarding subscription */ ao2_ref(to_topic, +1); diff --git a/tests/test_stasis.c b/tests/test_stasis.c index 52d510f144e9ba54ee94517dee242f9810497485..3a7d52c07f1083edb970c85000e4208a781ff15b 100644 --- a/tests/test_stasis.c +++ b/tests/test_stasis.c @@ -391,7 +391,6 @@ AST_TEST_DEFINE(unsubscribe_stops_messages) return AST_TEST_PASS; } - AST_TEST_DEFINE(forward) { RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup); @@ -458,6 +457,89 @@ AST_TEST_DEFINE(forward) return AST_TEST_PASS; } +AST_TEST_DEFINE(interleaving) +{ + RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup); + RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup); + + RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup); + + RAII_VAR(char *, test_data, NULL, ao2_cleanup); + + RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup); + RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup); + + RAII_VAR(struct stasis_subscription *, forward_sub1, NULL, stasis_unsubscribe); + RAII_VAR(struct stasis_subscription *, forward_sub2, NULL, stasis_unsubscribe); + RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe); + + RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup); + + int actual_len; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category; + info->summary = "Test sending interleaved events to a parent topic"; + info->description = "Test sending events to a parent topic.\n" + "This test creates three topics (one parent, two children)\n" + "and publishes messages alternately between the children.\n" + "It verifies that the messages are received in the expected\n" + "order."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + test_message_type = stasis_message_type_create("test"); + ast_test_validate(test, NULL != test_message_type); + + test_data = ao2_alloc(1, NULL); + ast_test_validate(test, NULL != test_data); + + test_message1 = stasis_message_create(test_message_type, test_data); + ast_test_validate(test, NULL != test_message1); + test_message2 = stasis_message_create(test_message_type, test_data); + ast_test_validate(test, NULL != test_message2); + test_message3 = stasis_message_create(test_message_type, test_data); + ast_test_validate(test, NULL != test_message3); + + parent_topic = stasis_topic_create("ParentTestTopic"); + ast_test_validate(test, NULL != parent_topic); + topic1 = stasis_topic_create("Topic1"); + ast_test_validate(test, NULL != topic1); + topic2 = stasis_topic_create("Topic2"); + ast_test_validate(test, NULL != topic2); + + forward_sub1 = stasis_forward_all(topic1, parent_topic); + ast_test_validate(test, NULL != forward_sub1); + forward_sub2 = stasis_forward_all(topic2, parent_topic); + ast_test_validate(test, NULL != forward_sub2); + + consumer = consumer_create(1); + ast_test_validate(test, NULL != consumer); + + sub = stasis_subscribe(parent_topic, consumer_exec, consumer); + ast_test_validate(test, NULL != sub); + ao2_ref(consumer, +1); + + stasis_publish(topic1, test_message1); + stasis_publish(topic2, test_message2); + stasis_publish(topic1, test_message3); + + actual_len = consumer_wait_for(consumer, 3); + ast_test_validate(test, 3 == actual_len); + + ast_test_validate(test, test_message1 == consumer->messages_rxed[0]); + ast_test_validate(test, test_message2 == consumer->messages_rxed[1]); + ast_test_validate(test, test_message3 == consumer->messages_rxed[2]); + + return AST_TEST_PASS; +} + struct cache_test_data { char *id; char *value; @@ -829,6 +911,7 @@ static int unload_module(void) AST_TEST_UNREGISTER(cache); AST_TEST_UNREGISTER(route_conflicts); AST_TEST_UNREGISTER(router); + AST_TEST_UNREGISTER(interleaving); return 0; } @@ -844,6 +927,7 @@ static int load_module(void) AST_TEST_REGISTER(cache); AST_TEST_REGISTER(route_conflicts); AST_TEST_REGISTER(router); + AST_TEST_REGISTER(interleaving); return AST_MODULE_LOAD_SUCCESS; }