diff --git a/main/stasis_cache.c b/main/stasis_cache.c index 8c3c7f15b6648265488d941a1f1efa02229ecd93..9907c6c22661ba39e6a08f51f72a05bfe3d2ac39 100644 --- a/main/stasis_cache.c +++ b/main/stasis_cache.c @@ -48,6 +48,7 @@ struct stasis_cache { snapshot_get_id id_fn; cache_aggregate_calc_fn aggregate_calc_fn; cache_aggregate_publish_fn aggregate_publish_fn; + int registered; }; /*! \internal */ @@ -69,6 +70,8 @@ static void stasis_caching_topic_dtor(void *obj) * be bad. */ ast_assert(stasis_subscription_is_done(caching_topic->sub)); + ao2_container_unregister(stasis_topic_name(caching_topic->topic)); + ao2_cleanup(caching_topic->sub); caching_topic->sub = NULL; ao2_cleanup(caching_topic->cache); @@ -813,7 +816,31 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, } msg_type = stasis_message_type(message); - if (stasis_cache_clear_type() == msg_type) { + + if (stasis_subscription_change_type() == msg_type) { + struct stasis_subscription_change *change = stasis_message_data(message); + + /* + * If this change type is an unsubscribe, we need to find the original + * subscribe and remove it from the cache otherwise the cache will + * continue to grow unabated. + */ + if (strcmp(change->description, "Unsubscribe") == 0) { + struct stasis_cache_entry *sub; + + ao2_wrlock(caching_topic->cache->entries); + sub = cache_find(caching_topic->cache->entries, stasis_subscription_change_type(), change->uniqueid); + if (sub) { + cache_remove(caching_topic->cache->entries, sub, stasis_message_eid(message)); + ao2_cleanup(sub); + } + ao2_unlock(caching_topic->cache->entries); + ao2_cleanup(caching_topic_needs_unref); + return; + } + msg_put = message; + msg = message; + } else if (stasis_cache_clear_type() == msg_type) { /* Cache clear event. */ msg_put = NULL; msg = stasis_message_data(message); @@ -866,6 +893,17 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, ao2_cleanup(caching_topic_needs_unref); } +static void print_cache_entry(void *v_obj, void *where, ao2_prnt_fn *prnt) +{ + struct stasis_cache_entry *entry = v_obj; + + if (!entry) { + return; + } + prnt(where, "Type: %s ID: %s Hash: %u", stasis_message_type_name(entry->key.type), + entry->key.id, entry->key.hash); +} + struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache) { struct stasis_caching_topic *caching_topic; @@ -886,15 +924,24 @@ struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *or } caching_topic->topic = stasis_topic_create(new_name); - ast_free(new_name); if (caching_topic->topic == NULL) { ao2_ref(caching_topic, -1); + ast_free(new_name); return NULL; } ao2_ref(cache, +1); caching_topic->cache = cache; + if (!cache->registered) { + if (ao2_container_register(new_name, cache->entries, print_cache_entry)) { + ast_log(LOG_ERROR, "Stasis cache container '%p' for '%s' did not register\n", + cache->entries, new_name); + } else { + cache->registered = 1; + } + } + ast_free(new_name); caching_topic->sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0); if (caching_topic->sub == NULL) {