diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h index 85e78dcd11955d51a2da5dc8662350d0b5b3797c..6d423d901522c5e93e9e6bd8a1da7a8ac26726ae 100644 --- a/include/asterisk/stasis.h +++ b/include/asterisk/stasis.h @@ -300,6 +300,21 @@ enum stasis_subscription_message_filter { STASIS_SUBSCRIPTION_FILTER_SELECTIVE, /*!< Only messages of allowed message types are raised */ }; +/*! + * \brief Stasis subscription formatter filters + * + * There should be an entry here for each member of \ref stasis_message_vtable + * + * \since 13.25.0 + * \since 16.2.0 + */ +enum stasis_subscription_message_formatters { + STASIS_SUBSCRIPTION_FORMATTER_NONE = 0, + STASIS_SUBSCRIPTION_FORMATTER_JSON = 1 << 0, /*!< Allow messages with a to_json formatter */ + STASIS_SUBSCRIPTION_FORMATTER_AMI = 1 << 1, /*!< Allow messages with a to_ami formatter */ + STASIS_SUBSCRIPTION_FORMATTER_EVENT = 1 << 2, /*!< Allow messages with a to_event formatter */ +}; + /*! * \brief Create a new message type. * @@ -675,6 +690,30 @@ int stasis_subscription_decline_message_type(struct stasis_subscription *subscri int stasis_subscription_set_filter(struct stasis_subscription *subscription, enum stasis_subscription_message_filter filter); +/*! + * \brief Indicate to a subscription that we are interested in messages with one or more formatters. + * + * \param subscription Subscription to alter. + * \param formatters A bitmap of \ref stasis_subscription_message_formatters we wish to receive. + * + * \since 13.25.0 + * \since 16.2.0 + */ +void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, + enum stasis_subscription_message_formatters formatters); + +/*! + * \brief Get a bitmap of available formatters for a message type + * + * \param message_type Message type + * \return A bitmap of \ref stasis_subscription_message_formatters + * + * \since 13.25.0 + * \since 16.2.0 + */ +enum stasis_subscription_message_formatters stasis_message_type_available_formatters( + const struct stasis_message_type *message_type); + /*! * \brief Cancel a subscription. * diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h index 8dcdfcc913ec6b9e98898cd9e4566a8e57239123..9897d62b7dea35c8786ccdea0a5a019c95bd9582 100644 --- a/include/asterisk/stasis_message_router.h +++ b/include/asterisk/stasis_message_router.h @@ -242,4 +242,23 @@ int stasis_message_router_set_default(struct stasis_message_router *router, stasis_subscription_cb callback, void *data); +/*! + * \brief Indicate to a message router that we are interested in messages with one or more formatters. + * + * The formatters are passed on to the underlying subscription. + * + * \warning With direct subscriptions, adding a formatter filter is an OR operation + * with any message type filters. In the current implementation of message router however, + * it's an AND operation. Even when setting a default route, the callback will only get + * messages that have the formatters provides in this call. + * + * \param router Router to set the formatters of. + * \param formatters A bitmap of \ref stasis_subscription_message_formatters we wish to receive. + * + * \since 13.25.0 + * \since 16.2.0 + */ +void stasis_message_router_accept_formatters(struct stasis_message_router *router, + enum stasis_subscription_message_formatters formatters); + #endif /* _ASTERISK_STASIS_MESSAGE_ROUTER_H */ diff --git a/main/stasis.c b/main/stasis.c index 0c60b13290bdf2361854f74bca141658e81afce2..69ec1a5269662c3ce1c1041e780f5ce958458d3c 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -399,6 +399,8 @@ struct stasis_subscription { /*! The message types this subscription is accepting */ AST_VECTOR(, char) accepted_message_types; + /*! The message formatters this subscription is accepting */ + enum stasis_subscription_message_formatters accepted_formatters; /*! The message filter currently in use */ enum stasis_subscription_message_filter filter; }; @@ -443,6 +445,10 @@ static void subscription_invoke(struct stasis_subscription *sub, ao2_unlock(sub); } + /* + * If filtering is turned on and this is a 'final' message, we only invoke the callback + * if the subscriber accepts subscription_change message types. + */ if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE || (message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) { /* Since sub is mostly immutable, no need to lock sub */ @@ -520,6 +526,7 @@ struct stasis_subscription *internal_stasis_subscribe( ast_cond_init(&sub->join_cond, NULL); sub->filter = STASIS_SUBSCRIPTION_FILTER_NONE; AST_VECTOR_INIT(&sub->accepted_message_types, 0); + sub->accepted_formatters = STASIS_SUBSCRIPTION_FORMATTER_NONE; if (topic_add_subscription(topic, sub) != 0) { ao2_ref(sub, -1); @@ -676,6 +683,18 @@ int stasis_subscription_set_filter(struct stasis_subscription *subscription, return 0; } +void stasis_subscription_accept_formatters(struct stasis_subscription *subscription, + enum stasis_subscription_message_formatters formatters) +{ + ast_assert(subscription != NULL); + + ao2_lock(subscription->topic); + subscription->accepted_formatters = formatters; + ao2_unlock(subscription->topic); + + return; +} + void stasis_subscription_join(struct stasis_subscription *subscription) { if (subscription) { @@ -871,17 +890,57 @@ static void dispatch_message(struct stasis_subscription *sub, struct stasis_message *message, int synchronous) { - /* Determine if this subscription is interested in this message. Note that final - * messages are special and are always invoked on the subscription. + int is_final = stasis_subscription_final_message(sub, message); + + /* + * The 'do while' gives us an easy way to skip remaining logic once + * we determine the message should be accepted. + * The code looks more verbose than it needs to be but it optimizes + * down very nicely. It's just easier to understand and debug this way. */ - if (sub->filter == STASIS_SUBSCRIPTION_FILTER_SELECTIVE) { - int message_type_id = stasis_message_type_id(stasis_message_type(message)); - if ((message_type_id >= AST_VECTOR_SIZE(&sub->accepted_message_types) || - !AST_VECTOR_GET(&sub->accepted_message_types, message_type_id)) && - !stasis_subscription_final_message(sub, message)) { - return; + do { + struct stasis_message_type *message_type = stasis_message_type(message); + int type_id = stasis_message_type_id(message_type); + int type_filter_specified = 0; + int formatter_filter_specified = 0; + int type_filter_passed = 0; + int formatter_filter_passed = 0; + + /* We always accept final messages so only run the filter logic if not final */ + if (is_final) { + break; } - } + + type_filter_specified = sub->filter & STASIS_SUBSCRIPTION_FILTER_SELECTIVE; + formatter_filter_specified = sub->accepted_formatters != STASIS_SUBSCRIPTION_FORMATTER_NONE; + + /* Accept if no filters of either type were specified */ + if (!type_filter_specified && !formatter_filter_specified) { + break; + } + + type_filter_passed = type_filter_specified + && type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) + && AST_VECTOR_GET(&sub->accepted_message_types, type_id); + + /* + * Since the type and formatter filters are OR'd, we can skip + * the formatter check if the type check passes. + */ + if (type_filter_passed) { + break; + } + + formatter_filter_passed = formatter_filter_specified + && (sub->accepted_formatters & stasis_message_type_available_formatters(message_type)); + + if (formatter_filter_passed) { + break; + } + + return; + + } while (0); if (!sub->mailbox) { /* Dispatch directly */ diff --git a/main/stasis_message.c b/main/stasis_message.c index 1fdbe858e3be8e43dcc2aaefcfc86064775d982c..d3f304cc77a16ed70ffd6184a1229ef99358aa4a 100644 --- a/main/stasis_message.c +++ b/main/stasis_message.c @@ -40,6 +40,7 @@ struct stasis_message_type { char *name; unsigned int hash; int id; + enum stasis_subscription_message_formatters available_formatters; }; static struct stasis_message_vtable null_vtable = {}; @@ -80,6 +81,15 @@ int stasis_message_type_create(const char *name, } type->hash = ast_hashtab_hash_string(name); type->vtable = vtable; + if (vtable->to_json) { + type->available_formatters |= STASIS_SUBSCRIPTION_FORMATTER_JSON; + } + if (vtable->to_ami) { + type->available_formatters |= STASIS_SUBSCRIPTION_FORMATTER_AMI; + } + if (vtable->to_event) { + type->available_formatters |= STASIS_SUBSCRIPTION_FORMATTER_EVENT; + } type->id = ast_atomic_fetchadd_int(&message_type_id, +1); *result = type; @@ -101,6 +111,12 @@ int stasis_message_type_id(const struct stasis_message_type *type) return type->id; } +enum stasis_subscription_message_formatters stasis_message_type_available_formatters( + const struct stasis_message_type *type) +{ + return type->available_formatters; +} + /*! \internal */ struct stasis_message { /*! Time the message was created */ diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c index 41ebc7ea8a9ed99134f695c043c6ae2eb5b8975d..197f7f939f22ad89b739bf01fe42ddab1f8f6419 100644 --- a/main/stasis_message_router.c +++ b/main/stasis_message_router.c @@ -399,3 +399,13 @@ int stasis_message_router_set_default(struct stasis_message_router *router, /* While this implementation can never fail, it used to be able to */ return 0; } + +void stasis_message_router_accept_formatters(struct stasis_message_router *router, + enum stasis_subscription_message_formatters formatters) +{ + ast_assert(router != NULL); + + stasis_subscription_accept_formatters(router->subscription, formatters); + + return; +} diff --git a/tests/test_stasis.c b/tests/test_stasis.c index 5bc38c50eef9b4480389a40a0347164d5a20e84c..e620039edded240c72cced0750f8001c690fbaac 100644 --- a/tests/test_stasis.c +++ b/tests/test_stasis.c @@ -38,7 +38,13 @@ #include "asterisk/stasis_message_router.h" #include "asterisk/test.h" -static const char *test_category = "/stasis/core/"; +#define test_category "/stasis/core/" + +static struct ast_event *fake_event(struct stasis_message *message) +{ + return ast_event_new(AST_EVENT_CUSTOM, + AST_EVENT_IE_DESCRIPTION, AST_EVENT_IE_PLTYPE_STR, "Dummy", AST_EVENT_IE_END); +} static struct ast_json *fake_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize) { @@ -2044,6 +2050,389 @@ AST_TEST_DEFINE(caching_dtor_order) return AST_TEST_PASS; } +struct test_message_types { + struct stasis_message_type *none; + struct stasis_message_type *ami; + struct stasis_message_type *json; + struct stasis_message_type *event; + struct stasis_message_type *amievent; + struct stasis_message_type *type1; + struct stasis_message_type *type2; + struct stasis_message_type *type3; + struct stasis_message_type *change; +}; + +static void destroy_message_types(void *obj) +{ + struct test_message_types *types = obj; + + ao2_cleanup(types->none); + ao2_cleanup(types->ami); + ao2_cleanup(types->json); + ao2_cleanup(types->event); + ao2_cleanup(types->amievent); + ao2_cleanup(types->type1); + ao2_cleanup(types->type2); + ao2_cleanup(types->type3); + /* N.B. Don't cleanup types->change! */ +} + +static struct test_message_types *create_message_types(struct ast_test *test) +{ + struct stasis_message_vtable vtable = { 0 }; + struct test_message_types *types; + enum ast_test_result_state __attribute__ ((unused)) rc; + + types = ao2_alloc(sizeof(*types), destroy_message_types); + if (!types) { + return NULL; + } + + ast_test_validate_cleanup(test, + stasis_message_type_create("TestMessageNONE", &vtable, &types->none) == STASIS_MESSAGE_TYPE_SUCCESS, + rc, cleanup); + + vtable.to_ami = fake_ami; + ast_test_validate_cleanup(test, + stasis_message_type_create("TestMessageAMI", &vtable, &types->ami) == STASIS_MESSAGE_TYPE_SUCCESS, + rc, cleanup); + + vtable.to_ami = NULL; + vtable.to_json = fake_json; + ast_test_validate_cleanup(test, + stasis_message_type_create("TestMessageJSON", &vtable, &types->json) == STASIS_MESSAGE_TYPE_SUCCESS, + rc, cleanup); + + vtable.to_ami = NULL; + vtable.to_json = NULL; + vtable.to_event = fake_event; + ast_test_validate_cleanup(test, + stasis_message_type_create("TestMessageEVENT", &vtable, &types->event) == STASIS_MESSAGE_TYPE_SUCCESS, + rc, cleanup); + + vtable.to_ami = fake_ami; + ast_test_validate_cleanup(test, + stasis_message_type_create("TestMessageAMIEVENT", &vtable, &types->amievent) == STASIS_MESSAGE_TYPE_SUCCESS, + rc, cleanup); + + ast_test_validate_cleanup(test, + stasis_message_type_create("TestMessageType1", NULL, &types->type1) == STASIS_MESSAGE_TYPE_SUCCESS, + rc, cleanup); + + ast_test_validate_cleanup(test, + stasis_message_type_create("TestMessageType2", NULL, &types->type2) == STASIS_MESSAGE_TYPE_SUCCESS, + rc, cleanup); + + ast_test_validate_cleanup(test, + stasis_message_type_create("TestMessageType3", NULL, &types->type3) == STASIS_MESSAGE_TYPE_SUCCESS, + rc, cleanup); + + types->change = stasis_subscription_change_type(); + + return types; + +cleanup: + ao2_cleanup(types); + return NULL; +} + +struct cts { + struct consumer *consumer; + struct stasis_topic *topic; + struct stasis_subscription *sub; +}; + +static void destroy_cts(void *obj) +{ + struct cts *c = obj; + + stasis_unsubscribe(c->sub); + ao2_cleanup(c->topic); + ao2_cleanup(c->consumer); +} + +static struct cts *create_cts(struct ast_test *test) +{ + struct cts *cts = ao2_alloc(sizeof(*cts), destroy_cts); + enum ast_test_result_state __attribute__ ((unused)) rc; + + ast_test_validate_cleanup(test, cts, rc, cleanup); + + cts->topic = stasis_topic_create("TestTopic"); + ast_test_validate_cleanup(test, NULL != cts->topic, rc, cleanup); + + cts->consumer = consumer_create(0); + ast_test_validate_cleanup(test, NULL != cts->consumer, rc, cleanup); + + ao2_ref(cts->consumer, +1); + cts->sub = stasis_subscribe(cts->topic, consumer_exec, cts->consumer); + ast_test_validate_cleanup(test, NULL != cts->sub, rc, cleanup); + + return cts; + +cleanup: + ao2_cleanup(cts); + return NULL; +} + +static int is_msg(struct stasis_message *msg, struct stasis_message_type *mtype, const char *data) +{ + struct stasis_subscription_change *msg_data = stasis_message_data(msg); + + if (stasis_message_type(msg) != mtype) { + return 0; + } + + if (data) { + return (strcmp(data, msg_data->description) == 0); + } + + return 1; +} + +static void dump_consumer(struct ast_test *test, struct cts *cts) +{ + int i; + struct stasis_subscription_change *data; + + ast_test_status_update(test, "Messages received: %ld Final? %s\n", cts->consumer->messages_rxed_len, + cts->consumer->complete ? "yes" : "no"); + for (i = 0; i < cts->consumer->messages_rxed_len; i++) { + data = stasis_message_data(cts->consumer->messages_rxed[i]); + ast_test_status_update(test, "Message type received: %s %s\n", + stasis_message_type_name(stasis_message_type(cts->consumer->messages_rxed[i])), + data && data->description ? data->description : "no data"); + } +} + +static int send_msg(struct ast_test *test, struct cts *cts, struct stasis_message_type *msg_type, + const char *data) +{ + struct stasis_message *msg; + struct stasis_subscription_change *test_data = + ao2_alloc(sizeof(*test_data) + (data ? strlen(data) : strlen("no data")) + 1, NULL); + + if (!test_data) { + return 0; + } + strcpy(test_data->description, S_OR(data, "no data")); /* Safe */ + + msg = stasis_message_create(msg_type, test_data); + ao2_ref(test_data, -1); + if (!msg) { + ast_test_status_update(test, "Unable to create %s message\n", + stasis_message_type_name(msg_type)); + return 0; + } + + stasis_publish(cts->topic, msg); + ao2_ref(msg, -1); + + return 1; +} + +AST_TEST_DEFINE(type_filters) +{ + RAII_VAR(struct cts *, cts, NULL, ao2_cleanup); + RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup); + int ix = 0; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category "filtering/"; + info->summary = "Test message filtering by type"; + info->description = "Test message filtering by type"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + types = create_message_types(test); + ast_test_validate(test, NULL != types); + + cts = create_cts(test); + ast_test_validate(test, NULL != cts); + + ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type1) == 0); + ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type2) == 0); + ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->change) == 0); + ast_test_validate(test, stasis_subscription_set_filter(cts->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE) == 0); + + /* We should get these */ + ast_test_validate(test, send_msg(test, cts, types->type1, "Pass")); + ast_test_validate(test, send_msg(test, cts, types->type2, "Pass")); + /* ... but not this one */ + ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL")); + + /* Wait for change(subscribe) and "Pass" messages */ + consumer_wait_for(cts->consumer, 3); + + /* Remove type 1 */ + ast_test_validate(test, stasis_subscription_decline_message_type(cts->sub, types->type1) == 0); + + /* We should now NOT get this one */ + ast_test_validate(test, send_msg(test, cts, types->type1, "FAIL")); + /* We should get this one (again) */ + ast_test_validate(test, send_msg(test, cts, types->type2, "Pass2")); + /* We still should NOT get this one */ + ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL")); + + /* We should now have a second type2 */ + consumer_wait_for(cts->consumer, 4); + + stasis_unsubscribe(cts->sub); + cts->sub = NULL; + consumer_wait_for_completion(cts->consumer); + + dump_consumer(test, cts); + + ast_test_validate(test, 1 == cts->consumer->complete); + ast_test_validate(test, 5 == cts->consumer->messages_rxed_len); + ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe")); + ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type1, "Pass")); + ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass")); + ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass2")); + ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe")); + + return AST_TEST_PASS; +} + +AST_TEST_DEFINE(formatter_filters) +{ + RAII_VAR(struct cts *, cts, NULL, ao2_cleanup); + RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup) ; + int ix = 0; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category "filtering/"; + info->summary = "Test message filtering by formatter"; + info->description = "Test message filtering by formatter"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + types = create_message_types(test); + ast_test_validate(test, NULL != types); + + cts = create_cts(test); + ast_test_validate(test, NULL != cts); + + stasis_subscription_accept_formatters(cts->sub, + STASIS_SUBSCRIPTION_FORMATTER_AMI | STASIS_SUBSCRIPTION_FORMATTER_JSON); + + /* We should get these */ + ast_test_validate(test, send_msg(test, cts, types->ami, "Pass")); + ast_test_validate(test, send_msg(test, cts, types->json, "Pass")); + ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass")); + + /* ... but not these */ + ast_test_validate(test, send_msg(test, cts, types->none, "FAIL")); + ast_test_validate(test, send_msg(test, cts, types->event, "FAIL")); + ast_test_validate(test, send_msg(test, cts, types->type1, "FAIL")); + + /* Wait for change(subscribe) and the "Pass" messages */ + consumer_wait_for(cts->consumer, 4); + + /* Change the subscription to accept only event formatters */ + stasis_subscription_accept_formatters(cts->sub, STASIS_SUBSCRIPTION_FORMATTER_EVENT); + + /* We should NOT get these now */ + ast_test_validate(test, send_msg(test, cts, types->ami, "FAIL")); + ast_test_validate(test, send_msg(test, cts, types->json, "FAIL")); + /* ... but we should still get this one */ + ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass2")); + /* ... and this one should be new */ + ast_test_validate(test, send_msg(test, cts, types->event, "Pass")); + + /* We should now have a second amievent */ + consumer_wait_for(cts->consumer, 6); + + stasis_unsubscribe(cts->sub); + cts->sub = NULL; + consumer_wait_for_completion(cts->consumer); + + dump_consumer(test, cts); + + ast_test_validate(test, 1 == cts->consumer->complete); + ast_test_validate(test, 7 == cts->consumer->messages_rxed_len); + ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe")); + ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->ami, "Pass")); + ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->json, "Pass")); + ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass")); + ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass2")); + ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->event, "Pass")); + ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe")); + + return AST_TEST_PASS; +} + +AST_TEST_DEFINE(combo_filters) +{ + RAII_VAR(struct cts *, cts, NULL, ao2_cleanup); + RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup); + int ix = 0; + + switch (cmd) { + case TEST_INIT: + info->name = __func__; + info->category = test_category "filtering/"; + info->summary = "Test message filtering by type and formatter"; + info->description = "Test message filtering by type and formatter"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + types = create_message_types(test); + ast_test_validate(test, NULL != types); + + cts = create_cts(test); + ast_test_validate(test, NULL != cts); + + ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type1) == 0); + ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type2) == 0); + ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->change) == 0); + ast_test_validate(test, stasis_subscription_set_filter(cts->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE) == 0); + stasis_subscription_accept_formatters(cts->sub, + STASIS_SUBSCRIPTION_FORMATTER_AMI | STASIS_SUBSCRIPTION_FORMATTER_JSON); + + /* We should get these */ + ast_test_validate(test, send_msg(test, cts, types->type1, "Pass")); + ast_test_validate(test, send_msg(test, cts, types->type2, "Pass")); + ast_test_validate(test, send_msg(test, cts, types->ami, "Pass")); + ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass")); + ast_test_validate(test, send_msg(test, cts, types->json, "Pass")); + + /* ... but not these */ + ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL")); + ast_test_validate(test, send_msg(test, cts, types->event, "FAIL")); + + /* Wait for change(subscribe) and the "Pass" messages */ + consumer_wait_for(cts->consumer, 6); + + stasis_unsubscribe(cts->sub); + cts->sub = NULL; + consumer_wait_for_completion(cts->consumer); + + dump_consumer(test, cts); + + ast_test_validate(test, 1 == cts->consumer->complete); + ast_test_validate(test, 7 == cts->consumer->messages_rxed_len); + ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe")); + ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type1, "Pass")); + ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass")); + ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->ami, "Pass")); + ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass")); + ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->json, "Pass")); + ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe")); + + return AST_TEST_PASS; +} + static int unload_module(void) { AST_TEST_UNREGISTER(message_type); @@ -2070,6 +2459,9 @@ static int unload_module(void) AST_TEST_UNREGISTER(to_ami); AST_TEST_UNREGISTER(dtor_order); AST_TEST_UNREGISTER(caching_dtor_order); + AST_TEST_UNREGISTER(type_filters); + AST_TEST_UNREGISTER(formatter_filters); + AST_TEST_UNREGISTER(combo_filters); return 0; } @@ -2099,6 +2491,9 @@ static int load_module(void) AST_TEST_REGISTER(to_ami); AST_TEST_REGISTER(dtor_order); AST_TEST_REGISTER(caching_dtor_order); + AST_TEST_REGISTER(type_filters); + AST_TEST_REGISTER(formatter_filters); + AST_TEST_REGISTER(combo_filters); return AST_MODULE_LOAD_SUCCESS; }