Skip to content
Snippets Groups Projects
test_stasis.c 73.1 KiB
Newer Older
  • Learn to ignore specific revisions
  • 	ast_test_validate(test, stasis_message_type_create("NonCacheable", NULL, &non_cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
    
    	ast_test_validate(test, NULL != non_cache_type);
    	topic = stasis_topic_create("SomeTopic");
    	ast_test_validate(test, NULL != topic);
    
    	cache = stasis_cache_create(cache_test_data_id);
    	ast_test_validate(test, NULL != cache);
    	caching_topic = stasis_caching_topic_create(topic, cache);
    
    	ast_test_validate(test, NULL != caching_topic);
    	consumer = consumer_create(1);
    	ast_test_validate(test, NULL != consumer);
    	sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
    	ast_test_validate(test, NULL != sub);
    	ao2_ref(consumer, +1);
    
    	test_message = cache_test_message_create(non_cache_type, "1", "1");
    	ast_test_validate(test, NULL != test_message);
    
    	stasis_publish(topic, test_message);
    
    
    	actual_len = consumer_should_stay(consumer, 0);
    	ast_test_validate(test, 0 == actual_len);
    
    
    	return AST_TEST_PASS;
    }
    
    AST_TEST_DEFINE(cache)
    {
    	RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
    
    	RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
    
    	RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
    	RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
    	RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
    	int actual_len;
    	struct stasis_cache_update *actual_update;
    
    	switch (cmd) {
    	case TEST_INIT:
    		info->name = __func__;
    		info->category = test_category;
    		info->summary = "Test passing messages through cache topic unscathed.";
    		info->description = "Test passing messages through cache topic unscathed.";
    		return AST_TEST_NOT_RUN;
    	case TEST_EXECUTE:
    		break;
    	}
    
    
    	ast_test_validate(test, stasis_message_type_create("Cacheable", NULL, &cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
    
    	ast_test_validate(test, NULL != cache_type);
    	topic = stasis_topic_create("SomeTopic");
    	ast_test_validate(test, NULL != topic);
    
    	cache = stasis_cache_create(cache_test_data_id);
    	ast_test_validate(test, NULL != cache);
    	caching_topic = stasis_caching_topic_create(topic, cache);
    
    	ast_test_validate(test, NULL != caching_topic);
    	consumer = consumer_create(1);
    	ast_test_validate(test, NULL != consumer);
    	sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
    	ast_test_validate(test, NULL != sub);
    	ao2_ref(consumer, +1);
    
    	test_message1_1 = cache_test_message_create(cache_type, "1", "1");
    	ast_test_validate(test, NULL != test_message1_1);
    	test_message2_1 = cache_test_message_create(cache_type, "2", "1");
    	ast_test_validate(test, NULL != test_message2_1);
    
    	/* Post a couple of snapshots */
    	stasis_publish(topic, test_message1_1);
    	stasis_publish(topic, test_message2_1);
    	actual_len = consumer_wait_for(consumer, 2);
    	ast_test_validate(test, 2 == actual_len);
    
    	/* Check for new snapshot messages */
    
    	ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[0]));
    
    	actual_update = stasis_message_data(consumer->messages_rxed[0]);
    	ast_test_validate(test, NULL == actual_update->old_snapshot);
    	ast_test_validate(test, test_message1_1 == actual_update->new_snapshot);
    
    	ast_test_validate(test, test_message1_1 == stasis_cache_get(cache, cache_type, "1"));
    
    	/* stasis_cache_get returned a ref, so unref test_message1_1 */
    	ao2_ref(test_message1_1, -1);
    
    
    	ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[1]));
    
    	actual_update = stasis_message_data(consumer->messages_rxed[1]);
    	ast_test_validate(test, NULL == actual_update->old_snapshot);
    	ast_test_validate(test, test_message2_1 == actual_update->new_snapshot);
    
    	ast_test_validate(test, test_message2_1 == stasis_cache_get(cache, cache_type, "2"));
    
    	/* stasis_cache_get returned a ref, so unref test_message2_1 */
    	ao2_ref(test_message2_1, -1);
    
    	/* Update snapshot 2 */
    	test_message2_2 = cache_test_message_create(cache_type, "2", "2");
    	ast_test_validate(test, NULL != test_message2_2);
    	stasis_publish(topic, test_message2_2);
    
    	actual_len = consumer_wait_for(consumer, 3);
    	ast_test_validate(test, 3 == actual_len);
    
    	actual_update = stasis_message_data(consumer->messages_rxed[2]);
    	ast_test_validate(test, test_message2_1 == actual_update->old_snapshot);
    	ast_test_validate(test, test_message2_2 == actual_update->new_snapshot);
    
    	ast_test_validate(test, test_message2_2 == stasis_cache_get(cache, cache_type, "2"));
    
    	/* stasis_cache_get returned a ref, so unref test_message2_2 */
    	ao2_ref(test_message2_2, -1);
    
    	/* Clear snapshot 1 */
    
    	test_message1_clear = stasis_cache_clear_create(test_message1_1);
    
    	ast_test_validate(test, NULL != test_message1_clear);
    	stasis_publish(topic, test_message1_clear);
    
    	actual_len = consumer_wait_for(consumer, 4);
    	ast_test_validate(test, 4 == actual_len);
    
    	actual_update = stasis_message_data(consumer->messages_rxed[3]);
    	ast_test_validate(test, test_message1_1 == actual_update->old_snapshot);
    	ast_test_validate(test, NULL == actual_update->new_snapshot);
    
    	ast_test_validate(test, NULL == stasis_cache_get(cache, cache_type, "1"));
    
    	return AST_TEST_PASS;
    }
    
    AST_TEST_DEFINE(cache_dump)
    {
    	RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
    
    	RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
    
    	RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
    	RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
    	RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
    	RAII_VAR(struct ao2_container *, cache_dump, NULL, ao2_cleanup);
    	int actual_len;
    	struct ao2_iterator i;
    	void *obj;
    
    	switch (cmd) {
    	case TEST_INIT:
    		info->name = __func__;
    		info->category = test_category;
    
    		info->summary = "Test cache dump routines.";
    		info->description = "Test cache dump routines.";
    
    		return AST_TEST_NOT_RUN;
    	case TEST_EXECUTE:
    		break;
    	}
    
    
    	ast_test_validate(test, stasis_message_type_create("Cacheable", NULL, &cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
    
    	ast_test_validate(test, NULL != cache_type);
    	topic = stasis_topic_create("SomeTopic");
    	ast_test_validate(test, NULL != topic);
    
    	cache = stasis_cache_create(cache_test_data_id);
    	ast_test_validate(test, NULL != cache);
    	caching_topic = stasis_caching_topic_create(topic, cache);
    
    	ast_test_validate(test, NULL != caching_topic);
    	consumer = consumer_create(1);
    	ast_test_validate(test, NULL != consumer);
    	sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
    	ast_test_validate(test, NULL != sub);
    	ao2_ref(consumer, +1);
    
    	test_message1_1 = cache_test_message_create(cache_type, "1", "1");
    	ast_test_validate(test, NULL != test_message1_1);
    	test_message2_1 = cache_test_message_create(cache_type, "2", "1");
    	ast_test_validate(test, NULL != test_message2_1);
    
    	/* Post a couple of snapshots */
    	stasis_publish(topic, test_message1_1);
    	stasis_publish(topic, test_message2_1);
    	actual_len = consumer_wait_for(consumer, 2);
    	ast_test_validate(test, 2 == actual_len);
    
    	/* Check the cache */
    
    	ao2_cleanup(cache_dump);
    
    	cache_dump = stasis_cache_dump(cache, NULL);
    
    	ast_test_validate(test, NULL != cache_dump);
    	ast_test_validate(test, 2 == ao2_container_count(cache_dump));
    	i = ao2_iterator_init(cache_dump, 0);
    	while ((obj = ao2_iterator_next(&i))) {
    		RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
    		ast_test_validate(test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_1);
    	}
    
    
    	/* Update snapshot 2 */
    	test_message2_2 = cache_test_message_create(cache_type, "2", "2");
    	ast_test_validate(test, NULL != test_message2_2);
    	stasis_publish(topic, test_message2_2);
    
    	actual_len = consumer_wait_for(consumer, 3);
    	ast_test_validate(test, 3 == actual_len);
    
    	/* Check the cache */
    
    	ao2_cleanup(cache_dump);
    
    	cache_dump = stasis_cache_dump(cache, NULL);
    
    	ast_test_validate(test, NULL != cache_dump);
    	ast_test_validate(test, 2 == ao2_container_count(cache_dump));
    	i = ao2_iterator_init(cache_dump, 0);
    	while ((obj = ao2_iterator_next(&i))) {
    		RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
    		ast_test_validate(test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_2);
    	}
    
    	test_message1_clear = stasis_cache_clear_create(test_message1_1);
    
    	ast_test_validate(test, NULL != test_message1_clear);
    	stasis_publish(topic, test_message1_clear);
    
    	actual_len = consumer_wait_for(consumer, 4);
    	ast_test_validate(test, 4 == actual_len);
    
    	/* Check the cache */
    
    	ao2_cleanup(cache_dump);
    
    	cache_dump = stasis_cache_dump(cache, NULL);
    
    	ast_test_validate(test, NULL != cache_dump);
    
    	ast_test_validate(test, 1 == ao2_container_count(cache_dump));
    
    	i = ao2_iterator_init(cache_dump, 0);
    	while ((obj = ao2_iterator_next(&i))) {
    		RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
    		ast_test_validate(test, actual_cache_entry == test_message2_2);
    	}
    
    
    	/* Dump the cache to ensure that it has no subscription change items in it since those aren't cached */
    
    	ao2_cleanup(cache_dump);
    
    	cache_dump = stasis_cache_dump(cache, stasis_subscription_change_type());
    
    	ast_test_validate(test, 0 == ao2_container_count(cache_dump));
    
    
    AST_TEST_DEFINE(cache_eid_aggregate)
    {
    	RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
    	RAII_VAR(struct consumer *, cache_consumer, NULL, ao2_cleanup);
    	RAII_VAR(struct consumer *, topic_consumer, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_subscription *, topic_sub, NULL, stasis_unsubscribe);
    	RAII_VAR(struct stasis_subscription *, cache_sub, NULL, stasis_unsubscribe);
    	RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message *, test_message2_3, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message *, test_message2_4, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message *, test_message2_clear, NULL, ao2_cleanup);
    	RAII_VAR(struct ao2_container *, cache_dump, NULL, ao2_cleanup);
    	int actual_len;
    	struct ao2_iterator i;
    	void *obj;
    	struct ast_eid foreign_eid1;
    	struct ast_eid foreign_eid2;
    
    	switch (cmd) {
    	case TEST_INIT:
    		info->name = __func__;
    		info->category = test_category;
    		info->summary = "Test cache eid and aggregate support.";
    		info->description = "Test cache eid and aggregate support.";
    		return AST_TEST_NOT_RUN;
    	case TEST_EXECUTE:
    		break;
    	}
    
    	memset(&foreign_eid1, 0xAA, sizeof(foreign_eid1));
    	memset(&foreign_eid2, 0xBB, sizeof(foreign_eid2));
    
    
    	ast_test_validate(test, stasis_message_type_create("Cacheable", NULL, &cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
    
    	ast_test_validate(test, NULL != cache_type);
    
    	topic = stasis_topic_create("SomeTopic");
    	ast_test_validate(test, NULL != topic);
    
    	/* To consume events published to the topic. */
    	topic_consumer = consumer_create(1);
    	ast_test_validate(test, NULL != topic_consumer);
    
    	topic_sub = stasis_subscribe(topic, consumer_exec, topic_consumer);
    	ast_test_validate(test, NULL != topic_sub);
    	ao2_ref(topic_consumer, +1);
    
    	cache = stasis_cache_create_full(cache_test_data_id,
    		cache_test_aggregate_calc_fn, cache_test_aggregate_publish_fn);
    	ast_test_validate(test, NULL != cache);
    
    	caching_topic = stasis_caching_topic_create(topic, cache);
    	ast_test_validate(test, NULL != caching_topic);
    
    	/* To consume update events published to the caching_topic. */
    	cache_consumer = consumer_create(1);
    	ast_test_validate(test, NULL != cache_consumer);
    
    	cache_sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, cache_consumer);
    	ast_test_validate(test, NULL != cache_sub);
    	ao2_ref(cache_consumer, +1);
    
    	/* Create test messages. */
    	test_message1_1 = cache_test_message_create_full(cache_type, "1", "1", &ast_eid_default);
    	ast_test_validate(test, NULL != test_message1_1);
    	test_message2_1 = cache_test_message_create_full(cache_type, "2", "1", &ast_eid_default);
    	ast_test_validate(test, NULL != test_message2_1);
    	test_message2_2 = cache_test_message_create_full(cache_type, "2", "2", &foreign_eid1);
    	ast_test_validate(test, NULL != test_message2_2);
    	test_message2_3 = cache_test_message_create_full(cache_type, "2", "3", &foreign_eid2);
    	ast_test_validate(test, NULL != test_message2_3);
    	test_message2_4 = cache_test_message_create_full(cache_type, "2", "4", &foreign_eid2);
    	ast_test_validate(test, NULL != test_message2_4);
    
    	/* Post some snapshots */
    	stasis_publish(topic, test_message1_1);
    	ast_test_validate(test, check_cache_aggregate(cache, cache_type, "1", "1"));
    	stasis_publish(topic, test_message2_1);
    	ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "1"));
    	stasis_publish(topic, test_message2_2);
    	ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "3"));
    
    	actual_len = consumer_wait_for(cache_consumer, 6);
    	ast_test_validate(test, 6 == actual_len);
    	actual_len = consumer_wait_for(topic_consumer, 6);
    	ast_test_validate(test, 6 == actual_len);
    
    	/* Check the cache */
    	ao2_cleanup(cache_dump);
    	cache_dump = stasis_cache_dump_all(cache, NULL);
    	ast_test_validate(test, NULL != cache_dump);
    	ast_test_validate(test, 3 == ao2_container_count(cache_dump));
    	i = ao2_iterator_init(cache_dump, 0);
    	while ((obj = ao2_iterator_next(&i))) {
    		RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
    
    		ast_test_validate(test,
    			actual_cache_entry == test_message1_1
    			|| actual_cache_entry == test_message2_1
    			|| actual_cache_entry == test_message2_2);
    	}
    	ao2_iterator_destroy(&i);
    
    	/* Check the local cached items */
    	ao2_cleanup(cache_dump);
    	cache_dump = stasis_cache_dump_by_eid(cache, NULL, &ast_eid_default);
    	ast_test_validate(test, NULL != cache_dump);
    	ast_test_validate(test, 2 == ao2_container_count(cache_dump));
    	i = ao2_iterator_init(cache_dump, 0);
    	while ((obj = ao2_iterator_next(&i))) {
    		RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
    
    		ast_test_validate(test,
    			actual_cache_entry == test_message1_1
    			|| actual_cache_entry == test_message2_1);
    	}
    	ao2_iterator_destroy(&i);
    
    	/* Post snapshot 2 from another eid. */
    	stasis_publish(topic, test_message2_3);
    	ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "6"));
    
    	actual_len = consumer_wait_for(cache_consumer, 8);
    	ast_test_validate(test, 8 == actual_len);
    	actual_len = consumer_wait_for(topic_consumer, 8);
    	ast_test_validate(test, 8 == actual_len);
    
    	/* Check the cache */
    	ao2_cleanup(cache_dump);
    	cache_dump = stasis_cache_dump_all(cache, NULL);
    	ast_test_validate(test, NULL != cache_dump);
    	ast_test_validate(test, 4 == ao2_container_count(cache_dump));
    	i = ao2_iterator_init(cache_dump, 0);
    	while ((obj = ao2_iterator_next(&i))) {
    		RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
    
    		ast_test_validate(test,
    			actual_cache_entry == test_message1_1
    			|| actual_cache_entry == test_message2_1
    			|| actual_cache_entry == test_message2_2
    			|| actual_cache_entry == test_message2_3);
    	}
    	ao2_iterator_destroy(&i);
    
    	/* Check the remote cached items */
    	ao2_cleanup(cache_dump);
    	cache_dump = stasis_cache_dump_by_eid(cache, NULL, &foreign_eid1);
    	ast_test_validate(test, NULL != cache_dump);
    	ast_test_validate(test, 1 == ao2_container_count(cache_dump));
    	i = ao2_iterator_init(cache_dump, 0);
    	while ((obj = ao2_iterator_next(&i))) {
    		RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
    
    		ast_test_validate(test, actual_cache_entry == test_message2_2);
    	}
    	ao2_iterator_destroy(&i);
    
    	/* Post snapshot 2 from a repeated eid. */
    	stasis_publish(topic, test_message2_4);
    	ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "7"));
    
    	actual_len = consumer_wait_for(cache_consumer, 10);
    	ast_test_validate(test, 10 == actual_len);
    	actual_len = consumer_wait_for(topic_consumer, 10);
    	ast_test_validate(test, 10 == actual_len);
    
    	/* Check the cache */
    	ao2_cleanup(cache_dump);
    	cache_dump = stasis_cache_dump_all(cache, NULL);
    	ast_test_validate(test, NULL != cache_dump);
    	ast_test_validate(test, 4 == ao2_container_count(cache_dump));
    	i = ao2_iterator_init(cache_dump, 0);
    	while ((obj = ao2_iterator_next(&i))) {
    		RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
    
    		ast_test_validate(test,
    			actual_cache_entry == test_message1_1
    			|| actual_cache_entry == test_message2_1
    			|| actual_cache_entry == test_message2_2
    			|| actual_cache_entry == test_message2_4);
    	}
    	ao2_iterator_destroy(&i);
    
    	/* Check all snapshot 2 cache entries. */
    	ao2_cleanup(cache_dump);
    	cache_dump = stasis_cache_get_all(cache, cache_type, "2");
    	ast_test_validate(test, NULL != cache_dump);
    	ast_test_validate(test, 3 == ao2_container_count(cache_dump));
    	i = ao2_iterator_init(cache_dump, 0);
    	while ((obj = ao2_iterator_next(&i))) {
    		RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
    
    		ast_test_validate(test,
    			actual_cache_entry == test_message2_1
    			|| actual_cache_entry == test_message2_2
    			|| actual_cache_entry == test_message2_4);
    	}
    	ao2_iterator_destroy(&i);
    
    	/* Clear snapshot 1 */
    	test_message1_clear = stasis_cache_clear_create(test_message1_1);
    	ast_test_validate(test, NULL != test_message1_clear);
    	stasis_publish(topic, test_message1_clear);
    	ast_test_validate(test, check_cache_aggregate(cache, cache_type, "1", NULL));
    
    	actual_len = consumer_wait_for(cache_consumer, 12);
    	ast_test_validate(test, 12 == actual_len);
    	actual_len = consumer_wait_for(topic_consumer, 11);
    	ast_test_validate(test, 11 == actual_len);
    
    	/* Check the cache */
    	ao2_cleanup(cache_dump);
    	cache_dump = stasis_cache_dump_all(cache, NULL);
    	ast_test_validate(test, NULL != cache_dump);
    	ast_test_validate(test, 3 == ao2_container_count(cache_dump));
    	i = ao2_iterator_init(cache_dump, 0);
    	while ((obj = ao2_iterator_next(&i))) {
    		RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
    
    		ast_test_validate(test,
    			actual_cache_entry == test_message2_1
    			|| actual_cache_entry == test_message2_2
    			|| actual_cache_entry == test_message2_4);
    	}
    	ao2_iterator_destroy(&i);
    
    	/* Clear snapshot 2 from a remote eid */
    	test_message2_clear = stasis_cache_clear_create(test_message2_2);
    	ast_test_validate(test, NULL != test_message2_clear);
    	stasis_publish(topic, test_message2_clear);
    	ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "5"));
    
    	actual_len = consumer_wait_for(cache_consumer, 14);
    	ast_test_validate(test, 14 == actual_len);
    	actual_len = consumer_wait_for(topic_consumer, 13);
    	ast_test_validate(test, 13 == actual_len);
    
    	/* Check the cache */
    	ao2_cleanup(cache_dump);
    	cache_dump = stasis_cache_dump_all(cache, NULL);
    	ast_test_validate(test, NULL != cache_dump);
    	ast_test_validate(test, 2 == ao2_container_count(cache_dump));
    	i = ao2_iterator_init(cache_dump, 0);
    	while ((obj = ao2_iterator_next(&i))) {
    		RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
    
    		ast_test_validate(test,
    			actual_cache_entry == test_message2_1
    			|| actual_cache_entry == test_message2_4);
    	}
    	ao2_iterator_destroy(&i);
    
    	return AST_TEST_PASS;
    }
    
    
    AST_TEST_DEFINE(router)
    {
    	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
    
    	RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
    
    	RAII_VAR(char *, test_data, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
    	RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
    	RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
    	RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
    	int actual_len, ret;
    	struct stasis_message *actual;
    
    	switch (cmd) {
    	case TEST_INIT:
    		info->name = __func__;
    		info->category = test_category;
    		info->summary = "Test simple message routing";
    		info->description = "Test simple message routing";
    		return AST_TEST_NOT_RUN;
    	case TEST_EXECUTE:
    		break;
    	}
    
    	topic = stasis_topic_create("TestTopic");
    	ast_test_validate(test, NULL != topic);
    
    	consumer1 = consumer_create(1);
    	ast_test_validate(test, NULL != consumer1);
    	consumer2 = consumer_create(1);
    	ast_test_validate(test, NULL != consumer2);
    	consumer3 = consumer_create(1);
    	ast_test_validate(test, NULL != consumer3);
    
    
    	ast_test_validate(test, stasis_message_type_create("TestMessage1", NULL, &test_message_type1) == STASIS_MESSAGE_TYPE_SUCCESS);
    
    	ast_test_validate(test, NULL != test_message_type1);
    
    	ast_test_validate(test, stasis_message_type_create("TestMessage2", NULL, &test_message_type2) == STASIS_MESSAGE_TYPE_SUCCESS);
    
    	ast_test_validate(test, NULL != test_message_type2);
    
    	ast_test_validate(test, stasis_message_type_create("TestMessage3", NULL, &test_message_type3) == STASIS_MESSAGE_TYPE_SUCCESS);
    
    	ast_test_validate(test, NULL != test_message_type3);
    
    	uut = stasis_message_router_create(topic);
    	ast_test_validate(test, NULL != uut);
    
    	ret = stasis_message_router_add(
    		uut, test_message_type1, consumer_exec, consumer1);
    	ast_test_validate(test, 0 == ret);
    	ao2_ref(consumer1, +1);
    	ret = stasis_message_router_add(
    		uut, test_message_type2, consumer_exec, consumer2);
    	ast_test_validate(test, 0 == ret);
    	ao2_ref(consumer2, +1);
    	ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
    	ast_test_validate(test, 0 == ret);
    	ao2_ref(consumer3, +1);
    
    	test_data = ao2_alloc(1, NULL);
    	ast_test_validate(test, NULL != test_data);
    	test_message1 = stasis_message_create(test_message_type1, test_data);
    	ast_test_validate(test, NULL != test_message1);
    	test_message2 = stasis_message_create(test_message_type2, test_data);
    	ast_test_validate(test, NULL != test_message2);
    	test_message3 = stasis_message_create(test_message_type3, test_data);
    	ast_test_validate(test, NULL != test_message3);
    
    	stasis_publish(topic, test_message1);
    	stasis_publish(topic, test_message2);
    	stasis_publish(topic, test_message3);
    
    	actual_len = consumer_wait_for(consumer1, 1);
    	ast_test_validate(test, 1 == actual_len);
    	actual_len = consumer_wait_for(consumer2, 1);
    	ast_test_validate(test, 1 == actual_len);
    	actual_len = consumer_wait_for(consumer3, 1);
    	ast_test_validate(test, 1 == actual_len);
    
    	actual = consumer1->messages_rxed[0];
    	ast_test_validate(test, test_message1 == actual);
    
    	actual = consumer2->messages_rxed[0];
    	ast_test_validate(test, test_message2 == actual);
    
    	actual = consumer3->messages_rxed[0];
    	ast_test_validate(test, test_message3 == actual);
    
    	/* consumer1 and consumer2 do not get the final message. */
    	ao2_cleanup(consumer1);
    	ao2_cleanup(consumer2);
    
    	return AST_TEST_PASS;
    }
    
    
    AST_TEST_DEFINE(router_pool)
    {
    	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
    	RAII_VAR(char *, test_data, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
    	RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
    	RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
    	RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
    	int actual_len, ret;
    	struct stasis_message *actual;
    
    	switch (cmd) {
    	case TEST_INIT:
    		info->name = __func__;
    		info->category = test_category;
    		info->summary = "Test message routing via threadpool";
    		info->description = "Test simple message routing when\n"
    			"the subscriptions dictate usage of the Stasis\n"
    
    		return AST_TEST_NOT_RUN;
    	case TEST_EXECUTE:
    		break;
    	}
    
    	topic = stasis_topic_create("TestTopic");
    	ast_test_validate(test, NULL != topic);
    
    	consumer1 = consumer_create(1);
    	ast_test_validate(test, NULL != consumer1);
    	consumer2 = consumer_create(1);
    	ast_test_validate(test, NULL != consumer2);
    	consumer3 = consumer_create(1);
    	ast_test_validate(test, NULL != consumer3);
    
    
    	ast_test_validate(test, stasis_message_type_create("TestMessage1", NULL, &test_message_type1) == STASIS_MESSAGE_TYPE_SUCCESS);
    
    	ast_test_validate(test, NULL != test_message_type1);
    
    	ast_test_validate(test, stasis_message_type_create("TestMessage2", NULL, &test_message_type2) == STASIS_MESSAGE_TYPE_SUCCESS);
    
    	ast_test_validate(test, NULL != test_message_type2);
    
    	ast_test_validate(test, stasis_message_type_create("TestMessage3", NULL, &test_message_type3) == STASIS_MESSAGE_TYPE_SUCCESS);
    
    	ast_test_validate(test, NULL != test_message_type3);
    
    	uut = stasis_message_router_create_pool(topic);
    	ast_test_validate(test, NULL != uut);
    
    	ret = stasis_message_router_add(
    		uut, test_message_type1, consumer_exec, consumer1);
    	ast_test_validate(test, 0 == ret);
    	ao2_ref(consumer1, +1);
    	ret = stasis_message_router_add(
    		uut, test_message_type2, consumer_exec, consumer2);
    	ast_test_validate(test, 0 == ret);
    	ao2_ref(consumer2, +1);
    	ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
    	ast_test_validate(test, 0 == ret);
    	ao2_ref(consumer3, +1);
    
    	test_data = ao2_alloc(1, NULL);
    	ast_test_validate(test, NULL != test_data);
    	test_message1 = stasis_message_create(test_message_type1, test_data);
    	ast_test_validate(test, NULL != test_message1);
    	test_message2 = stasis_message_create(test_message_type2, test_data);
    	ast_test_validate(test, NULL != test_message2);
    	test_message3 = stasis_message_create(test_message_type3, test_data);
    	ast_test_validate(test, NULL != test_message3);
    
    	stasis_publish(topic, test_message1);
    	stasis_publish(topic, test_message2);
    	stasis_publish(topic, test_message3);
    
    	actual_len = consumer_wait_for(consumer1, 1);
    	ast_test_validate(test, 1 == actual_len);
    	actual_len = consumer_wait_for(consumer2, 1);
    	ast_test_validate(test, 1 == actual_len);
    	actual_len = consumer_wait_for(consumer3, 1);
    	ast_test_validate(test, 1 == actual_len);
    
    	actual = consumer1->messages_rxed[0];
    	ast_test_validate(test, test_message1 == actual);
    
    	actual = consumer2->messages_rxed[0];
    	ast_test_validate(test, test_message2 == actual);
    
    	actual = consumer3->messages_rxed[0];
    	ast_test_validate(test, test_message3 == actual);
    
    	/* consumer1 and consumer2 do not get the final message. */
    	ao2_cleanup(consumer1);
    	ao2_cleanup(consumer2);
    
    	return AST_TEST_PASS;
    }
    
    
    static const char *cache_simple(struct stasis_message *message)
    {
    
    	const char *type_name =
    		stasis_message_type_name(stasis_message_type(message));
    	if (!ast_begins_with(type_name, "Cache")) {
    		return NULL;
    	}
    
    	return "cached";
    }
    
    AST_TEST_DEFINE(router_cache_updates)
    {
    	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
    
    	RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe_and_join);
    
    	RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
    	RAII_VAR(char *, test_data, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
    	RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
    	RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
    	RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message *, message1, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message *, message2, NULL, ao2_cleanup);
    	struct stasis_cache_update *update;
    	int actual_len, ret;
    	struct stasis_message *actual;
    
    	switch (cmd) {
    	case TEST_INIT:
    		info->name = __func__;
    		info->category = test_category;
    		info->summary = "Test special handling cache_update messages";
    		info->description = "Test special handling cache_update messages";
    		return AST_TEST_NOT_RUN;
    	case TEST_EXECUTE:
    		break;
    	}
    
    	topic = stasis_topic_create("TestTopic");
    	ast_test_validate(test, NULL != topic);
    
    
    	cache = stasis_cache_create(cache_simple);
    	ast_test_validate(test, NULL != cache);
    	caching_topic = stasis_caching_topic_create(topic, cache);
    
    	ast_test_validate(test, NULL != caching_topic);
    
    	consumer1 = consumer_create(1);
    	ast_test_validate(test, NULL != consumer1);
    	consumer2 = consumer_create(1);
    	ast_test_validate(test, NULL != consumer2);
    	consumer3 = consumer_create(1);
    	ast_test_validate(test, NULL != consumer3);
    
    
    	ast_test_validate(test, stasis_message_type_create("Cache1", NULL, &test_message_type1) == STASIS_MESSAGE_TYPE_SUCCESS);
    
    	ast_test_validate(test, NULL != test_message_type1);
    
    	ast_test_validate(test, stasis_message_type_create("Cache2", NULL, &test_message_type2) == STASIS_MESSAGE_TYPE_SUCCESS);
    
    	ast_test_validate(test, NULL != test_message_type2);
    
    	ast_test_validate(test, stasis_message_type_create("NonCache", NULL, &test_message_type3) == STASIS_MESSAGE_TYPE_SUCCESS);
    
    	ast_test_validate(test, NULL != test_message_type3);
    
    	uut = stasis_message_router_create(
    		stasis_caching_get_topic(caching_topic));
    	ast_test_validate(test, NULL != uut);
    
    	ret = stasis_message_router_add_cache_update(
    		uut, test_message_type1, consumer_exec, consumer1);
    	ast_test_validate(test, 0 == ret);
    	ao2_ref(consumer1, +1);
    	ret = stasis_message_router_add(
    		uut, stasis_cache_update_type(), consumer_exec, consumer2);
    	ast_test_validate(test, 0 == ret);
    	ao2_ref(consumer2, +1);
    	ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
    	ast_test_validate(test, 0 == ret);
    	ao2_ref(consumer3, +1);
    
    	test_data = ao2_alloc(1, NULL);
    	ast_test_validate(test, NULL != test_data);
    	test_message1 = stasis_message_create(test_message_type1, test_data);
    	ast_test_validate(test, NULL != test_message1);
    	test_message2 = stasis_message_create(test_message_type2, test_data);
    	ast_test_validate(test, NULL != test_message2);
    	test_message3 = stasis_message_create(test_message_type3, test_data);
    	ast_test_validate(test, NULL != test_message3);
    
    	stasis_publish(topic, test_message1);
    	stasis_publish(topic, test_message2);
    	stasis_publish(topic, test_message3);
    
    	actual_len = consumer_wait_for(consumer1, 1);
    	ast_test_validate(test, 1 == actual_len);
    	actual_len = consumer_wait_for(consumer2, 1);
    	ast_test_validate(test, 1 == actual_len);
    
    	/* Uncacheable message should not be passed through */
    	actual_len = consumer_should_stay(consumer3, 0);
    	ast_test_validate(test, 0 == actual_len);
    
    
    	actual = consumer1->messages_rxed[0];
    	ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
    	update = stasis_message_data(actual);
    	ast_test_validate(test, test_message_type1 == update->type);
    	ast_test_validate(test, test_message1 == update->new_snapshot);
    
    	actual = consumer2->messages_rxed[0];
    	ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
    	update = stasis_message_data(actual);
    	ast_test_validate(test, test_message_type2 == update->type);
    	ast_test_validate(test, test_message2 == update->new_snapshot);
    
    	/* consumer1 and consumer2 do not get the final message. */
    	ao2_cleanup(consumer1);
    	ao2_cleanup(consumer2);
    
    	return AST_TEST_PASS;
    }
    
    
    AST_TEST_DEFINE(no_to_json)
    {
    	RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
    	RAII_VAR(char *, data, NULL, ao2_cleanup);
    	RAII_VAR(struct ast_json *, actual, NULL, ast_json_unref);
    	char *expected = "SomeData";
    
    	switch (cmd) {
    	case TEST_INIT:
    		info->name = __func__;
    		info->category = test_category;
    		info->summary = "Test message to_json function";
    		info->description = "Test message to_json function";
    		return AST_TEST_NOT_RUN;
    	case TEST_EXECUTE:
    		break;
    	}
    
    	/* Test NULL */
    
    	actual = stasis_message_to_json(NULL, NULL);
    
    	ast_test_validate(test, NULL == actual);
    
    	/* Test message with NULL to_json function */
    
    	ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
    
    
    	data = ao2_alloc(strlen(expected) + 1, NULL);
    	strcpy(data, expected);
    	uut = stasis_message_create(type, data);
    	ast_test_validate(test, NULL != uut);
    
    
    	actual = stasis_message_to_json(uut, NULL);
    
    	ast_test_validate(test, NULL == actual);
    
    	return AST_TEST_PASS;
    }
    
    AST_TEST_DEFINE(to_json)
    {
    	RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
    	RAII_VAR(char *, data, NULL, ao2_cleanup);
    	RAII_VAR(struct ast_json *, actual, NULL, ast_json_unref);
    	const char *expected_text = "SomeData";
    	RAII_VAR(struct ast_json *, expected, NULL, ast_json_unref);
    
    	switch (cmd) {
    	case TEST_INIT:
    		info->name = __func__;
    		info->category = test_category;
    		info->summary = "Test message to_json function when NULL";
    		info->description = "Test message to_json function when NULL";
    		return AST_TEST_NOT_RUN;
    	case TEST_EXECUTE:
    		break;
    	}
    
    
    	ast_test_validate(test, stasis_message_type_create("SomeMessage", &fake_vtable, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
    
    
    	data = ao2_alloc(strlen(expected_text) + 1, NULL);
    	strcpy(data, expected_text);
    	uut = stasis_message_create(type, data);
    	ast_test_validate(test, NULL != uut);
    
    	expected = ast_json_string_create(expected_text);
    
    	actual = stasis_message_to_json(uut, NULL);
    
    	ast_test_validate(test, ast_json_equal(expected, actual));
    
    	return AST_TEST_PASS;
    }
    
    AST_TEST_DEFINE(no_to_ami)
    {
    	RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
    	RAII_VAR(char *, data, NULL, ao2_cleanup);
    	RAII_VAR(struct ast_manager_event_blob *, actual, NULL, ao2_cleanup);
    	char *expected = "SomeData";
    
    	switch (cmd) {
    	case TEST_INIT:
    		info->name = __func__;
    		info->category = test_category;
    		info->summary = "Test message to_ami function when NULL";
    		info->description = "Test message to_ami function when NULL";
    		return AST_TEST_NOT_RUN;
    	case TEST_EXECUTE:
    		break;
    	}
    
    	/* Test NULL */
    	actual = stasis_message_to_ami(NULL);
    	ast_test_validate(test, NULL == actual);
    
    	/* Test message with NULL to_ami function */
    
    	ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
    
    
    	data = ao2_alloc(strlen(expected) + 1, NULL);
    	strcpy(data, expected);
    	uut = stasis_message_create(type, data);
    	ast_test_validate(test, NULL != uut);
    
    	actual = stasis_message_to_ami(uut);
    	ast_test_validate(test, NULL == actual);
    
    	return AST_TEST_PASS;
    }
    
    AST_TEST_DEFINE(to_ami)
    {
    	RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
    	RAII_VAR(char *, data, NULL, ao2_cleanup);
    	RAII_VAR(struct ast_manager_event_blob *, actual, NULL, ao2_cleanup);
    	const char *expected_text = "SomeData";
    
    	const char *expected = "Message: SomeData\r\n";
    
    
    	switch (cmd) {
    	case TEST_INIT:
    		info->name = __func__;
    		info->category = test_category;
    		info->summary = "Test message to_ami function";
    		info->description = "Test message to_ami function";
    		return AST_TEST_NOT_RUN;
    	case TEST_EXECUTE:
    		break;
    	}
    
    
    	ast_test_validate(test, stasis_message_type_create("SomeMessage", &fake_vtable, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
    
    
    	data = ao2_alloc(strlen(expected_text) + 1, NULL);
    	strcpy(data, expected_text);
    	uut = stasis_message_create(type, data);
    	ast_test_validate(test, NULL != uut);
    
    	actual = stasis_message_to_ami(uut);
    	ast_test_validate(test, strcmp(expected, actual->extra_fields) == 0);
    
    	return AST_TEST_PASS;
    }
    
    
    static void noop(void *data, struct stasis_subscription *sub,
    
    	struct stasis_message *message)
    
    {
    	/* no-op */
    }
    
    AST_TEST_DEFINE(dtor_order)
    {
    	RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
    	RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
    
    	switch (cmd) {
    	case TEST_INIT:
    		info->name = __func__;
    		info->category = test_category;
    		info->summary = "Test that destruction order doesn't bomb stuff";
    		info->description = "Test that destruction order doesn't bomb stuff";
    		return AST_TEST_NOT_RUN;
    	case TEST_EXECUTE:
    		break;
    	}
    
    	topic = stasis_topic_create("test-topic");
    	ast_test_validate(test, NULL != topic);
    
    	sub = stasis_subscribe(topic, noop, NULL);
    	ast_test_validate(test, NULL != sub);
    
    	/* With any luck, this won't completely blow everything up */
    	ao2_cleanup(topic);
    	stasis_unsubscribe(sub);
    
    	/* These refs were cleaned up manually */
    	topic = NULL;
    	sub = NULL;
    
    	return AST_TEST_PASS;
    }
    
    static const char *noop_get_id(struct stasis_message *message)
    {
    	return NULL;
    }
    
    AST_TEST_DEFINE(caching_dtor_order)