Skip to content
Snippets Groups Projects
test_threadpool.c 32.9 KiB
Newer Older
  • Learn to ignore specific revisions
  • 
    	res = wait_for_completion(test, std);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    	res = wait_until_thread_state(test, tld, 0, 2);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    	res = listener_check(test, listener, 1, 1, 1, 0, 2, 1);
    end:
    	ast_threadpool_shutdown(pool);
    	ao2_cleanup(listener);
    	ast_free(std);
    	ast_free(tld);
    	return res;
    }
    
    
    AST_TEST_DEFINE(threadpool_reactivation)
    {
    	struct ast_threadpool *pool = NULL;
    	struct ast_threadpool_listener *listener = NULL;
    	struct simple_task_data *std1 = NULL;
    	struct simple_task_data *std2 = NULL;
    	enum ast_test_result_state res = AST_TEST_FAIL;
    
    	struct test_listener_data *tld = NULL;
    
    	struct ast_threadpool_options options = {
    		.version = AST_THREADPOOL_OPTIONS_VERSION,
    		.idle_timeout = 0,
    
    		.auto_increment = 0,
    
    
    	switch (cmd) {
    	case TEST_INIT:
    
    		info->category = "/main/threadpool/";
    		info->summary = "Test that a threadpool reactivates when work is added";
    		info->description =
    			"Push a task into a threadpool. Make sure the task executes and the\n"
    			"thread goes idle. Then push a second task and ensure that the thread\n"
    			"awakens and executes the second task.\n";
    		return AST_TEST_NOT_RUN;
    	case TEST_EXECUTE:
    		break;
    	}
    
    
    	tld = test_alloc();
    	if (!tld) {
    
    		return AST_TEST_FAIL;
    	}
    
    
    	listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
    	if (!listener) {
    		goto end;
    	}
    
    	pool = ast_threadpool_create(info->name, listener, &options);
    
    	if (!pool) {
    		goto end;
    	}
    
    	std1 = simple_task_data_alloc();
    	std2 = simple_task_data_alloc();
    	if (!std1 || !std2) {
    		goto end;
    	}
    
    	ast_threadpool_push(pool, simple_task, std1);
    
    	ast_threadpool_set_size(pool, 1);
    
    
    	res = wait_for_completion(test, std1);
    
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    
    	res = wait_for_empty_notice(test, tld);
    
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    	res = wait_until_thread_state(test, tld, 0, 1);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    
    	res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
    
    
    	/* Now make sure the threadpool reactivates when we add a second task */
    	ast_threadpool_push(pool, simple_task, std2);
    
    
    	res = wait_for_completion(test, std2);
    
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    
    	res = wait_for_empty_notice(test, tld);
    
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    	res = wait_until_thread_state(test, tld, 0, 1);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    
    	res = listener_check(test, listener, 1, 1, 2, 0, 1, 1);
    
    end:
    
    	ast_threadpool_shutdown(pool);
    
    	ao2_cleanup(listener);
    	ast_free(std1);
    	ast_free(std2);
    
    /*
     * Shared state between a stalled threadpool task (complex_task) and the
     * test code that releases it and waits for it to finish.
     */
    struct complex_task_data {
    	/* Set to 1 by complex_task() once it has been released and finished */
    	int task_executed;
    	/* Set to 1 by poke_worker() to let complex_task() stop waiting */
    	int continue_task;
    	/* Held while reading or writing the flags above and while waiting on the conditions */
    	ast_mutex_t lock;
    	/* complex_task() waits on this until continue_task is set */
    	ast_cond_t stall_cond;
    	/* Signaled by complex_task() after it sets task_executed */
    	ast_cond_t done_cond;
    };
    
    /*
     * Allocate a zeroed complex_task_data and initialize its mutex and both
     * condition variables.
     *
     * Returns the new structure, or NULL if the allocation failed.
     */
    static struct complex_task_data *complex_task_data_alloc(void)
    {
    	struct complex_task_data *task_data;

    	task_data = ast_calloc(1, sizeof(*task_data));
    	if (task_data) {
    		/* Only touch the synchronization members when we actually got memory */
    		ast_mutex_init(&task_data->lock);
    		ast_cond_init(&task_data->stall_cond, NULL);
    		ast_cond_init(&task_data->done_cond, NULL);
    	}

    	return task_data;
    }
    
    static int complex_task(void *data)
    {
    	struct complex_task_data *ctd = data;
    	SCOPED_MUTEX(lock, &ctd->lock);
    	while (!ctd->continue_task) {
    		ast_cond_wait(&ctd->stall_cond, lock);
    	}
    	/* We got poked. Finish up */
    	ctd->task_executed = 1;
    	ast_cond_signal(&ctd->done_cond);
    	return 0;
    }
    
    static void poke_worker(struct complex_task_data *ctd)
    {
    	SCOPED_MUTEX(lock, &ctd->lock);
    	ctd->continue_task = 1;
    	ast_cond_signal(&ctd->stall_cond);
    }
    
    /*
     * Wait for a complex task to report that it has executed.
     *
     * Waits on ctd->done_cond, with ctd->lock held, until ctd->task_executed is
     * set or a deadline five seconds after entry passes.
     *
     * Returns AST_TEST_PASS if task_executed is set when the wait ends,
     * AST_TEST_FAIL otherwise.
     */
    static enum ast_test_result_state wait_for_complex_completion(struct complex_task_data *ctd)
    {
    	struct timeval start = ast_tvnow();
    	/* Absolute deadline: five seconds from now, converted from usec to nsec */
    	struct timespec end = {
    		.tv_sec = start.tv_sec + 5,
    		.tv_nsec = start.tv_usec * 1000
    	};
    	SCOPED_MUTEX(lock, &ctd->lock);

    	while (!ctd->task_executed) {
    		/*
    		 * A zero return is a wakeup (possibly spurious), so loop and recheck
    		 * the flag. Any non-zero return ends the wait: a timeout means the
    		 * deadline passed, and any other error would be returned again
    		 * immediately on retry, turning this loop into a busy spin.
    		 */
    		if (ast_cond_timedwait(&ctd->done_cond, lock, &end)) {
    			break;
    		}
    	}

    	/* The flag, not the wait result, decides the outcome */
    	return ctd->task_executed ? AST_TEST_PASS : AST_TEST_FAIL;
    }
    
    AST_TEST_DEFINE(threadpool_task_distribution)
    {
    	struct ast_threadpool *pool = NULL;
    	struct ast_threadpool_listener *listener = NULL;
    	struct complex_task_data *ctd1 = NULL;
    	struct complex_task_data *ctd2 = NULL;
    	enum ast_test_result_state res = AST_TEST_FAIL;
    
    	struct test_listener_data *tld = NULL;
    
    	struct ast_threadpool_options options = {
    		.version = AST_THREADPOOL_OPTIONS_VERSION,
    		.idle_timeout = 0,
    
    		.auto_increment = 0,
    
    
    	switch (cmd) {
    	case TEST_INIT:
    
    		info->category = "/main/threadpool/";
    		info->summary = "Test that tasks are evenly distributed to threads";
    		info->description =
    			"Push two tasks into a threadpool. Ensure that each is handled by\n"
    			"a separate thread\n";
    		return AST_TEST_NOT_RUN;
    	case TEST_EXECUTE:
    		break;
    	}
    
    
    	tld = test_alloc();
    	if (!tld) {
    
    		return AST_TEST_FAIL;
    	}
    
    
    	listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
    	if (!listener) {
    		goto end;
    	}
    
    	pool = ast_threadpool_create(info->name, listener, &options);
    
    	if (!pool) {
    		goto end;
    	}
    
    	ctd1 = complex_task_data_alloc();
    	ctd2 = complex_task_data_alloc();
    	if (!ctd1 || !ctd2) {
    		goto end;
    	}
    
    	ast_threadpool_push(pool, complex_task, ctd1);
    	ast_threadpool_push(pool, complex_task, ctd2);
    
    	ast_threadpool_set_size(pool, 2);
    
    
    	res = wait_until_thread_state(test, tld, 2, 0);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    
    	res = listener_check(test, listener, 1, 0, 2, 2, 0, 0);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    	/* The tasks are stalled until we poke them */
    	poke_worker(ctd1);
    	poke_worker(ctd2);
    
    	res = wait_for_complex_completion(ctd1);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    	res = wait_for_complex_completion(ctd2);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    
    	res = wait_until_thread_state(test, tld, 0, 2);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    
    	res = listener_check(test, listener, 1, 0, 2, 0, 2, 1);
    
    end:
    
    	ast_threadpool_shutdown(pool);
    
    	ao2_cleanup(listener);
    	ast_free(ctd1);
    	ast_free(ctd2);
    
    AST_TEST_DEFINE(threadpool_more_destruction)
    {
    	struct ast_threadpool *pool = NULL;
    	struct ast_threadpool_listener *listener = NULL;
    	struct complex_task_data *ctd1 = NULL;
    	struct complex_task_data *ctd2 = NULL;
    	enum ast_test_result_state res = AST_TEST_FAIL;
    
    	struct test_listener_data *tld = NULL;
    
    	struct ast_threadpool_options options = {
    		.version = AST_THREADPOOL_OPTIONS_VERSION,
    		.idle_timeout = 0,
    
    		.auto_increment = 0,
    
    
    	switch (cmd) {
    	case TEST_INIT:
    
    		info->category = "/main/threadpool/";
    		info->summary = "Test that threads are destroyed as expected";
    		info->description =
    			"Push two tasks into a threadpool. Set the threadpool size to 4\n"
    			"Ensure that there are 2 active and 2 idle threads. Then shrink the\n"
    
    			"threadpool down to 1 thread. Ensure that the thread leftover is active\n"
    
    			"and ensure that both tasks complete.\n";
    		return AST_TEST_NOT_RUN;
    	case TEST_EXECUTE:
    		break;
    	}
    
    
    	tld = test_alloc();
    	if (!tld) {
    
    		return AST_TEST_FAIL;
    	}
    
    
    	listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
    	if (!listener) {
    		goto end;
    	}
    
    	pool = ast_threadpool_create(info->name, listener, &options);
    
    	if (!pool) {
    		goto end;
    	}
    
    	ctd1 = complex_task_data_alloc();
    	ctd2 = complex_task_data_alloc();
    	if (!ctd1 || !ctd2) {
    		goto end;
    	}
    
    	ast_threadpool_push(pool, complex_task, ctd1);
    	ast_threadpool_push(pool, complex_task, ctd2);
    
    	ast_threadpool_set_size(pool, 4);
    
    
    	res = wait_until_thread_state(test, tld, 2, 2);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    
    	res = listener_check(test, listener, 1, 0, 2, 2, 2, 0);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    	ast_threadpool_set_size(pool, 1);
    
    	/* Shrinking the threadpool should kill off the two idle threads
    	 * and one of the active threads.
    	 */
    
    	res = wait_until_thread_state(test, tld, 1, 0);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    
    	res = listener_check(test, listener, 1, 0, 2, 1, 0, 0);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    	/* The tasks are stalled until we poke them */
    	poke_worker(ctd1);
    	poke_worker(ctd2);
    
    	res = wait_for_complex_completion(ctd1);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    	res = wait_for_complex_completion(ctd2);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    
    	res = wait_until_thread_state(test, tld, 0, 1);
    	if (res == AST_TEST_FAIL) {
    		goto end;
    	}
    
    
    	res = listener_check(test, listener, 1, 0, 2, 0, 1, 1);
    
    end:
    
    	ast_threadpool_shutdown(pool);
    
    	ao2_cleanup(listener);
    	ast_free(ctd1);
    	ast_free(ctd2);
    
    /*
     * Module unload hook: unregister every threadpool test defined in this
     * file. Always returns 0.
     */
    static int unload_module(void)
    {
    	/* Same set and order as the registrations in load_module() */
    	ast_test_unregister(threadpool_push);
    	ast_test_unregister(threadpool_initial_threads);
    	ast_test_unregister(threadpool_thread_creation);
    	ast_test_unregister(threadpool_thread_destruction);
    	ast_test_unregister(threadpool_thread_timeout);
    	ast_test_unregister(threadpool_one_task_one_thread);
    	ast_test_unregister(threadpool_one_thread_one_task);
    	ast_test_unregister(threadpool_one_thread_multiple_tasks);
    	ast_test_unregister(threadpool_auto_increment);
    	ast_test_unregister(threadpool_max_size);
    	ast_test_unregister(threadpool_reactivation);
    	ast_test_unregister(threadpool_task_distribution);
    	ast_test_unregister(threadpool_more_destruction);

    	return 0;
    }
    
    /*
     * Module load hook: register every threadpool test defined in this file.
     * Always returns AST_MODULE_LOAD_SUCCESS.
     */
    static int load_module(void)
    {
    	/* Keep this list in step with unload_module() */
    	ast_test_register(threadpool_push);
    	ast_test_register(threadpool_initial_threads);
    	ast_test_register(threadpool_thread_creation);
    	ast_test_register(threadpool_thread_destruction);
    	ast_test_register(threadpool_thread_timeout);
    	ast_test_register(threadpool_one_task_one_thread);
    	ast_test_register(threadpool_one_thread_one_task);
    	ast_test_register(threadpool_one_thread_multiple_tasks);
    	ast_test_register(threadpool_auto_increment);
    	ast_test_register(threadpool_max_size);
    	ast_test_register(threadpool_reactivation);
    	ast_test_register(threadpool_task_distribution);
    	ast_test_register(threadpool_more_destruction);

    	return AST_MODULE_LOAD_SUCCESS;
    }
    
    /*
     * Declare this file as an Asterisk module with the GPL key and a short
     * description.
     * NOTE(review): presumably this macro is what wires up load_module() and
     * unload_module() above -- confirm against the module header.
     */
    AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "threadpool test module");