Skip to content
Snippets Groups Projects
test_threadpool.c 41.3 KiB
Newer Older
  • Learn to ignore specific revisions
  • 	pool = ast_threadpool_create(info->name, listener, &options);
    
    	std1 = simple_task_data_alloc();
    	std2 = simple_task_data_alloc();
    	std3 = simple_task_data_alloc();
    	std4 = simple_task_data_alloc();
    	if (!std1 || !std2 || !std3 || !std4) {
    
    	ast_threadpool_push(pool, simple_task, std1);
    
    
    	/* Pushing the task should result in the threadpool growing
    	 * by three threads. This will allow the task to actually execute
    	 */
    
    	res = wait_for_completion(test, std1);
    
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    	res = wait_for_empty_notice(test, tld);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    	res = wait_until_thread_state(test, tld, 0, 3);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    	/* Now push three tasks into the pool and ensure the pool does not
    	 * grow.
    	 */
    	ast_threadpool_push(pool, simple_task, std2);
    	ast_threadpool_push(pool, simple_task, std3);
    	ast_threadpool_push(pool, simple_task, std4);
    
    	res = wait_for_completion(test, std2);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    	res = wait_for_completion(test, std3);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    	res = wait_for_completion(test, std4);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    	res = wait_for_empty_notice(test, tld);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    
    	res = wait_until_thread_state_task_pushed(test, tld, 0, 3, 4);
    
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    	ast_threadpool_shutdown(pool);
    
    	ao2_cleanup(listener);
    
    	ast_free(std1);
    	ast_free(std2);
    	ast_free(std3);
    	ast_free(std4);
    
    /*
     * Test: growth triggered by a pushed task must be limited by max_size.
     *
     * The pool is created empty with auto_increment = 3 and max_size = 2. One
     * task is pushed; after it completes the test waits for the listener to
     * report the thread state (0, 2) -- presumably 0 active / 2 idle -- and then
     * compares the listener's recorded state against the expected values.
     *
     * Returns AST_TEST_NOT_RUN for TEST_INIT, otherwise the test verdict.
     */
    AST_TEST_DEFINE(threadpool_max_size)
    {
    	struct ast_threadpool *pool = NULL;
    	struct ast_threadpool_listener *listener = NULL;
    	struct simple_task_data *std = NULL;
    	enum ast_test_result_state res = AST_TEST_FAIL;
    	struct test_listener_data *tld = NULL;
    	struct ast_threadpool_options options = {
    		.version = AST_THREADPOOL_OPTIONS_VERSION,
    		.idle_timeout = 0,
    		.auto_increment = 3,
    		.initial_size = 0,
    		.max_size = 2,
    	};

    	switch (cmd) {
    	case TEST_INIT:
    		info->name = "max_size";
    		info->category = "/main/threadpool/";
    		info->summary = "Test that the threadpool does not exceed its maximum size restriction";
    		info->description =
    			"Create an empty threadpool and push a task to it. Once the task is\n"
    			"pushed, the threadpool should attempt to grow by three threads, but the\n"
    			"pool's restrictions should only allow two threads to be added.";
    		return AST_TEST_NOT_RUN;
    	case TEST_EXECUTE:
    		break;
    	}

    	tld = test_alloc();
    	if (!tld) {
    		/* Nothing else has been allocated yet, so no cleanup is needed. */
    		return AST_TEST_FAIL;
    	}

    	listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
    	if (!listener) {
    		goto end;
    	}

    	pool = ast_threadpool_create(info->name, listener, &options);
    	if (!pool) {
    		goto end;
    	}

    	std = simple_task_data_alloc();
    	if (!std) {
    		goto end;
    	}

    	/* A rejected push would otherwise only surface as a timeout below;
    	 * fail immediately instead (res is still AST_TEST_FAIL here).
    	 */
    	if (ast_threadpool_push(pool, simple_task, std)) {
    		ast_test_status_update(test, "Failed to push task into threadpool\n");
    		goto end;
    	}

    	res = wait_for_completion(test, std);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}

    	/* Only two threads may exist even though auto_increment asks for three. */
    	res = wait_until_thread_state(test, tld, 0, 2);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}

    	res = listener_check(test, listener, 1, 1, 1, 0, 2, 1);
    end:
    	ast_threadpool_shutdown(pool);
    	ao2_cleanup(listener);
    	ast_free(std);
    	ast_free(tld);
    	return res;
    }
    
    
    AST_TEST_DEFINE(threadpool_reactivation)
    {
    	struct ast_threadpool *pool = NULL;
    	struct ast_threadpool_listener *listener = NULL;
    	struct simple_task_data *std1 = NULL;
    	struct simple_task_data *std2 = NULL;
    	enum ast_test_result_state res = AST_TEST_FAIL;
    
    	struct test_listener_data *tld = NULL;
    
    	struct ast_threadpool_options options = {
    		.version = AST_THREADPOOL_OPTIONS_VERSION,
    		.idle_timeout = 0,
    
    		.auto_increment = 0,
    
    
    	switch (cmd) {
    	case TEST_INIT:
    
    		info->category = "/main/threadpool/";
    		info->summary = "Test that a threadpool reactivates when work is added";
    		info->description =
    			"Push a task into a threadpool. Make sure the task executes and the\n"
    			"thread goes idle. Then push a second task and ensure that the thread\n"
    
    			"awakens and executes the second task.";
    
    		return AST_TEST_NOT_RUN;
    	case TEST_EXECUTE:
    		break;
    	}
    
    
    	tld = test_alloc();
    	if (!tld) {
    
    		return AST_TEST_FAIL;
    	}
    
    
    	listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
    	if (!listener) {
    		goto end;
    	}
    
    	pool = ast_threadpool_create(info->name, listener, &options);
    
    	if (!pool) {
    		goto end;
    	}
    
    	std1 = simple_task_data_alloc();
    	std2 = simple_task_data_alloc();
    	if (!std1 || !std2) {
    		goto end;
    	}
    
    	ast_threadpool_push(pool, simple_task, std1);
    
    	ast_threadpool_set_size(pool, 1);
    
    
    	res = wait_for_completion(test, std1);
    
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    
    	res = wait_for_empty_notice(test, tld);
    
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    	res = wait_until_thread_state(test, tld, 0, 1);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    
    	res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
    
    
    	/* Now make sure the threadpool reactivates when we add a second task */
    	ast_threadpool_push(pool, simple_task, std2);
    
    
    	res = wait_for_completion(test, std2);
    
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    
    	res = wait_for_empty_notice(test, tld);
    
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    	res = wait_until_thread_state(test, tld, 0, 1);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    
    	res = listener_check(test, listener, 1, 1, 2, 0, 1, 1);
    
    end:
    
    	ast_threadpool_shutdown(pool);
    
    	ao2_cleanup(listener);
    	ast_free(std1);
    	ast_free(std2);
    
    /*
     * Shared state between a stallable test task (complex_task) and the test
     * code that drives it. All flags are read and written with 'lock' held.
     */
    struct complex_task_data {
    	/* Set to 1 by complex_task() as soon as it begins running. */
    	int task_started;

    	/* Set to 1 by complex_task() once it has been released and finishes. */
    	int task_executed;
    	/* Set to 1 by poke_worker() to let the stalled task continue. */
    	int continue_task;
    	/* Guards every field in this structure. */
    	ast_mutex_t lock;
    	/* The task waits on this until continue_task is set. */
    	ast_cond_t stall_cond;

    	/* Signalled by the task on start and on completion. */
    	ast_cond_t notify_cond;

    };
    
    /*
     * Allocate a zeroed complex_task_data and initialize its mutex and both
     * condition variables.
     *
     * Returns the new object, or NULL if the allocation failed.
     */
    static struct complex_task_data *complex_task_data_alloc(void)
    {
    	struct complex_task_data *data;

    	data = ast_calloc(1, sizeof(*data));
    	if (data) {
    		ast_mutex_init(&data->lock);
    		ast_cond_init(&data->stall_cond, NULL);
    		ast_cond_init(&data->notify_cond, NULL);
    	}

    	return data;
    }
    
    static int complex_task(void *data)
    {
    	struct complex_task_data *ctd = data;
    	SCOPED_MUTEX(lock, &ctd->lock);
    
    	/* Notify that we started */
    	ctd->task_started = 1;
    	ast_cond_signal(&ctd->notify_cond);
    
    	while (!ctd->continue_task) {
    		ast_cond_wait(&ctd->stall_cond, lock);
    	}
    	/* We got poked. Finish up */
    	ctd->task_executed = 1;
    
    	ast_cond_signal(&ctd->notify_cond);
    
    	return 0;
    }
    
    static void poke_worker(struct complex_task_data *ctd)
    {
    	SCOPED_MUTEX(lock, &ctd->lock);
    	ctd->continue_task = 1;
    	ast_cond_signal(&ctd->stall_cond);
    }
    
    
    static int wait_for_complex_start(struct complex_task_data *ctd)
    {
    	struct timeval start = ast_tvnow();
    	struct timespec end = {
    		.tv_sec = start.tv_sec + 5,
    		.tv_nsec = start.tv_usec * 1000
    	};
    	SCOPED_MUTEX(lock, &ctd->lock);
    
    	while (!ctd->task_started) {
    		if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
    			break;
    		}
    	}
    
    	return ctd->task_started;
    }
    
    static int has_complex_started(struct complex_task_data *ctd)
    {
    	struct timeval start = ast_tvnow();
    	struct timespec end = {
    		.tv_sec = start.tv_sec + 1,
    		.tv_nsec = start.tv_usec * 1000
    	};
    	SCOPED_MUTEX(lock, &ctd->lock);
    
    	while (!ctd->task_started) {
    		if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
    			break;
    		}
    	}
    
    	return ctd->task_started;
    }
    
    
    /*
     * Block for up to five seconds until the task behind ctd reports that it
     * has finished executing.
     *
     * Returns AST_TEST_PASS if task_executed was set, AST_TEST_FAIL if the wait
     * timed out first.
     */
    static enum ast_test_result_state wait_for_complex_completion(struct complex_task_data *ctd)
    {
    	struct timeval start = ast_tvnow();
    	struct timespec end = {
    		.tv_sec = start.tv_sec + 5,
    		.tv_nsec = start.tv_usec * 1000
    	};
    	enum ast_test_result_state res = AST_TEST_PASS;
    	SCOPED_MUTEX(lock, &ctd->lock);

    	while (!ctd->task_executed) {
    		if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
    			/* Deadline reached: stop waiting, as the sibling
    			 * wait_for_complex_start() does.
    			 */
    			break;
    		}
    	}

    	if (!ctd->task_executed) {
    		res = AST_TEST_FAIL;
    	}
    	return res;
    }
    
    AST_TEST_DEFINE(threadpool_task_distribution)
    {
    	struct ast_threadpool *pool = NULL;
    	struct ast_threadpool_listener *listener = NULL;
    	struct complex_task_data *ctd1 = NULL;
    	struct complex_task_data *ctd2 = NULL;
    	enum ast_test_result_state res = AST_TEST_FAIL;
    
    	struct test_listener_data *tld = NULL;
    
    	struct ast_threadpool_options options = {
    		.version = AST_THREADPOOL_OPTIONS_VERSION,
    		.idle_timeout = 0,
    
    		.auto_increment = 0,
    
    
    	switch (cmd) {
    	case TEST_INIT:
    
    		info->category = "/main/threadpool/";
    		info->summary = "Test that tasks are evenly distributed to threads";
    		info->description =
    			"Push two tasks into a threadpool. Ensure that each is handled by\n"
    
    			"a separate thread";
    
    		return AST_TEST_NOT_RUN;
    	case TEST_EXECUTE:
    		break;
    	}
    
    
    	tld = test_alloc();
    	if (!tld) {
    
    		return AST_TEST_FAIL;
    	}
    
    
    	listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
    	if (!listener) {
    		goto end;
    	}
    
    	pool = ast_threadpool_create(info->name, listener, &options);
    
    	if (!pool) {
    		goto end;
    	}
    
    	ctd1 = complex_task_data_alloc();
    	ctd2 = complex_task_data_alloc();
    	if (!ctd1 || !ctd2) {
    		goto end;
    	}
    
    	ast_threadpool_push(pool, complex_task, ctd1);
    	ast_threadpool_push(pool, complex_task, ctd2);
    
    	ast_threadpool_set_size(pool, 2);
    
    
    	res = wait_until_thread_state(test, tld, 2, 0);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    
    	res = listener_check(test, listener, 1, 0, 2, 2, 0, 0);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    	/* The tasks are stalled until we poke them */
    	poke_worker(ctd1);
    	poke_worker(ctd2);
    
    	res = wait_for_complex_completion(ctd1);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    	res = wait_for_complex_completion(ctd2);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    
    	res = wait_until_thread_state(test, tld, 0, 2);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    
    	res = listener_check(test, listener, 1, 0, 2, 0, 2, 1);
    
    end:
    
    	ast_threadpool_shutdown(pool);
    
    	ao2_cleanup(listener);
    	ast_free(ctd1);
    	ast_free(ctd2);
    
    AST_TEST_DEFINE(threadpool_more_destruction)
    {
    	struct ast_threadpool *pool = NULL;
    	struct ast_threadpool_listener *listener = NULL;
    	struct complex_task_data *ctd1 = NULL;
    	struct complex_task_data *ctd2 = NULL;
    	enum ast_test_result_state res = AST_TEST_FAIL;
    
    	struct test_listener_data *tld = NULL;
    
    	struct ast_threadpool_options options = {
    		.version = AST_THREADPOOL_OPTIONS_VERSION,
    		.idle_timeout = 0,
    
    		.auto_increment = 0,
    
    
    	switch (cmd) {
    	case TEST_INIT:
    
    		info->category = "/main/threadpool/";
    		info->summary = "Test that threads are destroyed as expected";
    		info->description =
    			"Push two tasks into a threadpool. Set the threadpool size to 4\n"
    			"Ensure that there are 2 active and 2 idle threads. Then shrink the\n"
    
    			"threadpool down to 1 thread. Ensure that the thread leftover is active\n"
    
    			"and ensure that both tasks complete.";
    
    		return AST_TEST_NOT_RUN;
    	case TEST_EXECUTE:
    		break;
    	}
    
    
    	tld = test_alloc();
    	if (!tld) {
    
    		return AST_TEST_FAIL;
    	}
    
    
    	listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
    	if (!listener) {
    		goto end;
    	}
    
    	pool = ast_threadpool_create(info->name, listener, &options);
    
    	if (!pool) {
    		goto end;
    	}
    
    	ctd1 = complex_task_data_alloc();
    	ctd2 = complex_task_data_alloc();
    	if (!ctd1 || !ctd2) {
    		goto end;
    	}
    
    	ast_threadpool_push(pool, complex_task, ctd1);
    	ast_threadpool_push(pool, complex_task, ctd2);
    
    	ast_threadpool_set_size(pool, 4);
    
    
    	res = wait_until_thread_state(test, tld, 2, 2);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    
    	res = listener_check(test, listener, 1, 0, 2, 2, 2, 0);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    	ast_threadpool_set_size(pool, 1);
    
    	/* Shrinking the threadpool should kill off the two idle threads
    	 * and one of the active threads.
    	 */
    
    	res = wait_until_thread_state(test, tld, 1, 0);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    
    	res = listener_check(test, listener, 1, 0, 2, 1, 0, 0);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    	/* The tasks are stalled until we poke them */
    	poke_worker(ctd1);
    	poke_worker(ctd2);
    
    	res = wait_for_complex_completion(ctd1);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    	res = wait_for_complex_completion(ctd2);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    
    	res = wait_until_thread_state(test, tld, 0, 1);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    
    	res = listener_check(test, listener, 1, 0, 2, 0, 1, 1);
    
    end:
    
    	ast_threadpool_shutdown(pool);
    
    	ao2_cleanup(listener);
    	ast_free(ctd1);
    	ast_free(ctd2);
    
    /*
     * Test: tasks pushed to a serializer run one at a time and in order, while
     * the pool's other thread remains free for unrelated work.
     *
     * data1 and data2 go through the serializer "ser1"; data3 is pushed straight
     * into the pool. data2 must not start until data1 has been released and has
     * finished, whereas data3 is expected to start while data1 is still stalled.
     *
     * Returns AST_TEST_NOT_RUN for TEST_INIT, otherwise the test verdict.
     */
    AST_TEST_DEFINE(threadpool_serializer)
    {
    	int started = 0;
    	int finished = 0;
    	enum ast_test_result_state res = AST_TEST_FAIL;
    	struct ast_threadpool *pool = NULL;
    	struct ast_taskprocessor *uut = NULL;
    	struct complex_task_data *data1 = NULL;
    	struct complex_task_data *data2 = NULL;
    	struct complex_task_data *data3 = NULL;
    	struct ast_threadpool_options options = {
    		.version = AST_THREADPOOL_OPTIONS_VERSION,
    		.idle_timeout = 0,
    		.auto_increment = 0,
    		.initial_size = 2,
    		.max_size = 0,
    	};

    	switch (cmd) {
    	case TEST_INIT:
    		info->name = "threadpool_serializer";
    		info->category = "/main/threadpool/";
    		info->summary = "Test that serializers";
    		info->description =
    			"Ensures that tasks enqueued to a serialize execute in sequence.";
    		return AST_TEST_NOT_RUN;
    	case TEST_EXECUTE:
    		break;
    	}

    	pool = ast_threadpool_create("threadpool_serializer", NULL, &options);
    	if (!pool) {
    		ast_test_status_update(test, "Could not create threadpool\n");
    		goto end;
    	}
    	uut = ast_threadpool_serializer("ser1", pool);
    	data1 = complex_task_data_alloc();
    	data2 = complex_task_data_alloc();
    	data3 = complex_task_data_alloc();
    	if (!uut || !data1 || !data2 || !data3) {
    		ast_test_status_update(test, "Allocation failed\n");
    		goto end;
    	}

    	/* This should start right away */
    	if (ast_taskprocessor_push(uut, complex_task, data1)) {
    		ast_test_status_update(test, "Failed to enqueue data1\n");
    		goto end;
    	}
    	started = wait_for_complex_start(data1);
    	if (!started) {
    		ast_test_status_update(test, "Failed to start data1\n");
    		goto end;
    	}

    	/* This should not start until data 1 is complete */
    	if (ast_taskprocessor_push(uut, complex_task, data2)) {
    		ast_test_status_update(test, "Failed to enqueue data2\n");
    		goto end;
    	}
    	started = has_complex_started(data2);
    	if (started) {
    		ast_test_status_update(test, "data2 started out of order\n");
    		goto end;
    	}

    	/* But the free thread in the pool can still run */
    	if (ast_threadpool_push(pool, complex_task, data3)) {
    		ast_test_status_update(test, "Failed to enqueue data3\n");
    		/* Bail out like the data1/data2 paths instead of waiting for a
    		 * task that was never queued.
    		 */
    		goto end;
    	}
    	started = wait_for_complex_start(data3);
    	if (!started) {
    		ast_test_status_update(test, "Failed to start data3\n");
    		goto end;
    	}

    	/* Finishing data1 should allow data2 to start */
    	poke_worker(data1);
    	finished = wait_for_complex_completion(data1) == AST_TEST_PASS;
    	if (!finished) {
    		ast_test_status_update(test, "data1 couldn't finish\n");
    		goto end;
    	}
    	started = wait_for_complex_start(data2);
    	if (!started) {
    		ast_test_status_update(test, "Failed to start data2\n");
    		goto end;
    	}

    	/* Finish up */
    	poke_worker(data2);
    	finished = wait_for_complex_completion(data2) == AST_TEST_PASS;
    	if (!finished) {
    		ast_test_status_update(test, "data2 couldn't finish\n");
    		goto end;
    	}
    	poke_worker(data3);
    	finished = wait_for_complex_completion(data3) == AST_TEST_PASS;
    	if (!finished) {
    		ast_test_status_update(test, "data3 couldn't finish\n");
    		goto end;
    	}

    	res = AST_TEST_PASS;

    end:
    	/* Release any task that may still be stalled. This label is reached
    	 * on allocation failure too, and poke_worker() dereferences its
    	 * argument unconditionally, so NULL entries must be skipped.
    	 */
    	if (data1) {
    		poke_worker(data1);
    	}
    	if (data2) {
    		poke_worker(data2);
    	}
    	if (data3) {
    		poke_worker(data3);
    	}
    	ast_taskprocessor_unreference(uut);
    	ast_threadpool_shutdown(pool);
    	ast_free(data1);
    	ast_free(data2);
    	ast_free(data3);
    	return res;
    }
    
    /*
     * Test: a second serializer cannot be created under a name already in use.
     *
     * Creates the serializer "highlander" and then requests another one with
     * the same name; the test passes only if the second request yields NULL.
     *
     * Returns AST_TEST_NOT_RUN for TEST_INIT, otherwise the test verdict.
     */
    AST_TEST_DEFINE(threadpool_serializer_dupe)
    {
    	struct ast_threadpool_options options = {
    		.version = AST_THREADPOOL_OPTIONS_VERSION,
    		.idle_timeout = 0,
    		.auto_increment = 0,
    		.initial_size = 2,
    		.max_size = 0,
    	};
    	struct ast_threadpool *pool = NULL;
    	struct ast_taskprocessor *uut = NULL;
    	struct ast_taskprocessor *dupe = NULL;
    	enum ast_test_result_state res = AST_TEST_FAIL;

    	switch (cmd) {
    	case TEST_INIT:
    		info->name = "threadpool_serializer_dupe";
    		info->category = "/main/threadpool/";
    		info->summary = "Test that serializers are uniquely named";
    		info->description =
    			"Creating two serializers with the same name should\n"
    			"result in error.";
    		return AST_TEST_NOT_RUN;
    	case TEST_EXECUTE:
    		break;
    	}

    	pool = ast_threadpool_create("threadpool_serializer", NULL, &options);
    	if (!pool) {
    		ast_test_status_update(test, "Could not create threadpool\n");
    		goto end;
    	}

    	uut = ast_threadpool_serializer("highlander", pool);
    	if (!uut) {
    		ast_test_status_update(test, "Allocation failed\n");
    		goto end;
    	}

    	/* The name is taken, so this second request is expected to fail. */
    	dupe = ast_threadpool_serializer("highlander", pool);
    	if (!dupe) {
    		res = AST_TEST_PASS;
    	} else {
    		/* Unexpected success: drop the extra reference and fail. */
    		ast_taskprocessor_unreference(dupe);
    		ast_test_status_update(test, "Duplicate name error\n");
    	}

    end:
    	ast_taskprocessor_unreference(uut);
    	ast_threadpool_shutdown(pool);
    	return res;
    }
    
    
    /*
     * Module unload hook: unregister every threadpool test, in the same order
     * load_module() registers them.
     *
     * Always returns 0.
     */
    static int unload_module(void)
    {
    	ast_test_unregister(threadpool_push);
    	ast_test_unregister(threadpool_initial_threads);
    	ast_test_unregister(threadpool_thread_creation);
    	ast_test_unregister(threadpool_thread_destruction);
    	ast_test_unregister(threadpool_thread_timeout);
    	ast_test_unregister(threadpool_thread_timeout_thrash);
    	ast_test_unregister(threadpool_one_task_one_thread);
    	ast_test_unregister(threadpool_one_thread_one_task);
    	ast_test_unregister(threadpool_one_thread_multiple_tasks);
    	ast_test_unregister(threadpool_auto_increment);
    	ast_test_unregister(threadpool_max_size);
    	ast_test_unregister(threadpool_reactivation);
    	ast_test_unregister(threadpool_task_distribution);
    	ast_test_unregister(threadpool_more_destruction);
    	ast_test_unregister(threadpool_serializer);
    	ast_test_unregister(threadpool_serializer_dupe);

    	return 0;
    }
    
    /*
     * Module load hook: register every threadpool test defined in this file.
     *
     * Always returns AST_MODULE_LOAD_SUCCESS.
     */
    static int load_module(void)
    {
    	ast_test_register(threadpool_push);
    	ast_test_register(threadpool_initial_threads);
    	ast_test_register(threadpool_thread_creation);
    	ast_test_register(threadpool_thread_destruction);
    	ast_test_register(threadpool_thread_timeout);
    	ast_test_register(threadpool_thread_timeout_thrash);
    	ast_test_register(threadpool_one_task_one_thread);
    	ast_test_register(threadpool_one_thread_one_task);
    	ast_test_register(threadpool_one_thread_multiple_tasks);
    	ast_test_register(threadpool_auto_increment);
    	ast_test_register(threadpool_max_size);
    	ast_test_register(threadpool_reactivation);
    	ast_test_register(threadpool_task_distribution);
    	ast_test_register(threadpool_more_destruction);
    	ast_test_register(threadpool_serializer);
    	ast_test_register(threadpool_serializer_dupe);

    	return AST_MODULE_LOAD_SUCCESS;
    }
    
    /* Module declaration with its license key and description string.
     * NOTE(review): this macro presumably wires up load_module() and
     * unload_module() by name -- confirm against the module header.
     */
    AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "threadpool test module");