diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h index 3b9cec34f2138f7ee37c8a5d2c88fefa2bc406a3..1a3dae00f769f35620fe61872d1f2eee33109727 100644 --- a/include/asterisk/stasis.h +++ b/include/asterisk/stasis.h @@ -110,12 +110,12 @@ * It's a thread safe container, so freely use the stasis_cache_get() and * stasis_cache_dump() to query the cache. * - * The \ref stasis_caching_topic provides a topic that forwards non-cacheable - * messages unchanged. A cacheable message is wrapped in a \ref - * stasis_cache_update message which provides the old snapshot (or \c NULL if - * this is a new cache entry), and the new snapshot (or \c NULL if the entry was - * removed from the cache). A stasis_cache_clear_create() message must be sent - * to the topic in order to remove entries from the cache. + * The \ref stasis_caching_topic discards non-cacheable messages. A cacheable + * message is wrapped in a \ref stasis_cache_update message which provides the + * old snapshot (or \c NULL if this is a new cache entry), and the new snapshot + * (or \c NULL if the entry was removed from the cache). A + * stasis_cache_clear_create() message must be sent to the topic in order to + * remove entries from the cache. * * In order to unsubscribe a \ref stasis_caching_topic from the upstream topic, * call stasis_caching_unsubscribe(). Due to cyclic references, the \ref diff --git a/main/stasis_cache.c b/main/stasis_cache.c index 3d50656657a8004e642840978f7cf71669d39d20..2ca4083df41ce6457e789045161f7524e418aacf 100644 --- a/main/stasis_cache.c +++ b/main/stasis_cache.c @@ -426,8 +426,7 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, id = caching_topic->cache->id_fn(message); if (id == NULL) { - /* Object isn't cached; forward */ - stasis_forward_message(caching_topic->topic, topic, message); + /* Object isn't cached; discard */ } else { /* Update the cache */ RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup); diff --git a/main/stasis_cache_pattern.c b/main/stasis_cache_pattern.c index 18ae8617ed10bcf780008259bedda801322a6e29..0ee57ba242fc42f2bfb65141e75dbb961bd98057 100644 --- a/main/stasis_cache_pattern.c +++ b/main/stasis_cache_pattern.c @@ -38,14 +38,16 @@ struct stasis_cp_all { struct stasis_topic *topic; struct stasis_topic *topic_cached; struct stasis_cache *cache; + + struct stasis_subscription *forward_all_to_cached; }; struct stasis_cp_single { struct stasis_topic *topic; struct stasis_caching_topic *topic_cached; - struct stasis_subscription *forward; - struct stasis_subscription *forward_cached; + struct stasis_subscription *forward_topic_to_all; + struct stasis_subscription *forward_cached_to_all; }; static void all_dtor(void *obj) @@ -53,8 +55,13 @@ static void all_dtor(void *obj) struct stasis_cp_all *all = obj; ao2_cleanup(all->topic); + all->topic = NULL; ao2_cleanup(all->topic_cached); + all->topic_cached = NULL; ao2_cleanup(all->cache); + all->cache = NULL; + stasis_unsubscribe_and_join(all->forward_all_to_cached); + all->forward_all_to_cached = NULL; } struct stasis_cp_all *stasis_cp_all_create(const char *name, @@ -76,8 +83,11 @@ struct stasis_cp_all *stasis_cp_all_create(const char *name, all->topic = stasis_topic_create(name); all->topic_cached = stasis_topic_create(cached_name); all->cache = stasis_cache_create(id_fn); + all->forward_all_to_cached = + stasis_forward_all(all->topic, all->topic_cached); - if (!all->topic || !all->topic_cached || !all->cache) { + if (!all->topic || !all->topic_cached || !all->cache || + !all->forward_all_to_cached) { return NULL; } @@ -116,8 +126,8 @@ static void one_dtor(void *obj) /* Should already be unsubscribed */ ast_assert(one->topic_cached == NULL); - ast_assert(one->forward == NULL); - ast_assert(one->forward_cached == NULL); + ast_assert(one->forward_topic_to_all == NULL); + ast_assert(one->forward_cached_to_all == NULL); ao2_cleanup(one->topic); one->topic = NULL; @@ -142,13 +152,13 @@ struct stasis_cp_single *stasis_cp_single_create(struct stasis_cp_all *all, return NULL; } - one->forward = stasis_forward_all(one->topic, all->topic); - if (!one->forward) { + one->forward_topic_to_all = stasis_forward_all(one->topic, all->topic); + if (!one->forward_topic_to_all) { return NULL; } - one->forward_cached = stasis_forward_all( + one->forward_cached_to_all = stasis_forward_all( stasis_caching_get_topic(one->topic_cached), all->topic_cached); - if (!one->forward_cached) { + if (!one->forward_cached_to_all) { return NULL; } @@ -162,12 +172,11 @@ void stasis_cp_single_unsubscribe(struct stasis_cp_single *one) return; } + stasis_unsubscribe(one->forward_topic_to_all); + one->forward_topic_to_all = NULL; + stasis_unsubscribe(one->forward_cached_to_all); + one->forward_cached_to_all = NULL; stasis_caching_unsubscribe(one->topic_cached); - one->topic_cached = NULL; - stasis_unsubscribe(one->forward); - one->forward = NULL; - stasis_unsubscribe(one->forward_cached); - one->forward_cached = NULL; } struct stasis_topic *stasis_cp_single_topic(struct stasis_cp_single *one) diff --git a/tests/test_stasis.c b/tests/test_stasis.c index 0b63da42e9068adf84794085f519fd548893bddd..12e052988e8a612acdd0eb202b53ed4a82d25e78 100644 --- a/tests/test_stasis.c +++ b/tests/test_stasis.c @@ -610,7 +610,7 @@ static const char *cache_test_data_id(struct stasis_message *message) { return cachable->id; } -AST_TEST_DEFINE(cache_passthrough) +AST_TEST_DEFINE(cache_filter) { RAII_VAR(struct stasis_message_type *, non_cache_type, NULL, ao2_cleanup); RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); @@ -620,14 +620,13 @@ AST_TEST_DEFINE(cache_passthrough) RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe); RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup); int actual_len; - struct stasis_message_type *actual_type; switch (cmd) { case TEST_INIT: info->name = __func__; info->category = test_category; - info->summary = "Test passing messages through cache topic unscathed."; - info->description = "Test passing messages through cache topic unscathed."; + info->summary = "Test caching topics only forward cache_update messages."; + info->description = "Test caching topics only forward cache_update messages."; return AST_TEST_NOT_RUN; case TEST_EXECUTE: break; @@ -652,13 +651,8 @@ AST_TEST_DEFINE(cache_passthrough) stasis_publish(topic, test_message); - actual_len = consumer_wait_for(consumer, 1); - ast_test_validate(test, 1 == actual_len); - - actual_type = stasis_message_type(consumer->messages_rxed[0]); - ast_test_validate(test, non_cache_type == actual_type); - - ast_test_validate(test, test_message == consumer->messages_rxed[0]); + actual_len = consumer_should_stay(consumer, 0); + ast_test_validate(test, 0 == actual_len); return AST_TEST_PASS; } @@ -1113,8 +1107,9 @@ AST_TEST_DEFINE(router_cache_updates) ast_test_validate(test, 1 == actual_len); actual_len = consumer_wait_for(consumer2, 1); ast_test_validate(test, 1 == actual_len); - actual_len = consumer_wait_for(consumer3, 1); - ast_test_validate(test, 1 == actual_len); + /* Uncacheable message should not be passed through */ + actual_len = consumer_should_stay(consumer3, 0); + ast_test_validate(test, 0 == actual_len); actual = consumer1->messages_rxed[0]; ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual)); @@ -1128,9 +1123,6 @@ AST_TEST_DEFINE(router_cache_updates) ast_test_validate(test, test_message_type2 == update->type); ast_test_validate(test, test_message2 == update->new_snapshot); - actual = consumer3->messages_rxed[0]; - ast_test_validate(test, test_message3 == actual); - /* consumer1 and consumer2 do not get the final message. */ ao2_cleanup(consumer1); ao2_cleanup(consumer2); @@ -1287,7 +1279,7 @@ static int unload_module(void) AST_TEST_UNREGISTER(publish); AST_TEST_UNREGISTER(unsubscribe_stops_messages); AST_TEST_UNREGISTER(forward); - AST_TEST_UNREGISTER(cache_passthrough); + AST_TEST_UNREGISTER(cache_filter); AST_TEST_UNREGISTER(cache); AST_TEST_UNREGISTER(cache_dump); AST_TEST_UNREGISTER(route_conflicts); @@ -1309,7 +1301,7 @@ static int load_module(void) AST_TEST_REGISTER(publish); AST_TEST_REGISTER(unsubscribe_stops_messages); AST_TEST_REGISTER(forward); - AST_TEST_REGISTER(cache_passthrough); + AST_TEST_REGISTER(cache_filter); AST_TEST_REGISTER(cache); AST_TEST_REGISTER(cache_dump); AST_TEST_REGISTER(route_conflicts);