Newer
Older
Mark Michelson
committed
info->name = "task_distribution";
info->category = "/main/threadpool/";
info->summary = "Test that tasks are evenly distributed to threads";
info->description =
"Push two tasks into a threadpool. Ensure that each is handled by\n"
"a separate thread\n";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
listener = ast_threadpool_listener_alloc(&test_callbacks);
if (!listener) {
return AST_TEST_FAIL;
}
tld = listener->private_data;
Mark Michelson
committed
pool = ast_threadpool_create(info->name, listener, 0, &options);
if (!pool) {
goto end;
}
ctd1 = complex_task_data_alloc();
ctd2 = complex_task_data_alloc();
if (!ctd1 || !ctd2) {
goto end;
}
ast_threadpool_push(pool, complex_task, ctd1);
ast_threadpool_push(pool, complex_task, ctd2);
ast_threadpool_set_size(pool, 2);
res = wait_until_thread_state(test, tld, 2, 0);
if (res == AST_TEST_FAIL) {
goto end;
}
res = listener_check(test, listener, 1, 0, 2, 2, 0, 0);
if (res == AST_TEST_FAIL) {
goto end;
}
/* The tasks are stalled until we poke them */
poke_worker(ctd1);
poke_worker(ctd2);
res = wait_for_complex_completion(ctd1);
if (res == AST_TEST_FAIL) {
goto end;
}
res = wait_for_complex_completion(ctd2);
if (res == AST_TEST_FAIL) {
goto end;
}
res = wait_until_thread_state(test, tld, 0, 2);
if (res == AST_TEST_FAIL) {
goto end;
}
res = listener_check(test, listener, 1, 0, 2, 0, 2, 1);
end:
if (pool) {
ast_threadpool_shutdown(pool);
}
ao2_cleanup(listener);
ast_free(ctd1);
ast_free(ctd2);
return res;
}
AST_TEST_DEFINE(threadpool_more_destruction)
{
struct ast_threadpool *pool = NULL;
struct ast_threadpool_listener *listener = NULL;
struct complex_task_data *ctd1 = NULL;
struct complex_task_data *ctd2 = NULL;
enum ast_test_result_state res = AST_TEST_FAIL;
struct test_listener_data *tld;
struct ast_threadpool_options options = {
.version = AST_THREADPOOL_OPTIONS_VERSION,
.idle_timeout = 0,
switch (cmd) {
case TEST_INIT:
Mark Michelson
committed
info->name = "more_destruction";
info->category = "/main/threadpool/";
info->summary = "Test that threads are destroyed as expected";
info->description =
"Push two tasks into a threadpool. Set the threadpool size to 4\n"
"Ensure that there are 2 active and 2 idle threads. Then shrink the\n"
"threadpool down to 1 thread. Ensure that the thread leftove is active\n"
"and ensure that both tasks complete.\n";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
listener = ast_threadpool_listener_alloc(&test_callbacks);
if (!listener) {
return AST_TEST_FAIL;
}
tld = listener->private_data;
Mark Michelson
committed
pool = ast_threadpool_create(info->name, listener, 0, &options);
if (!pool) {
goto end;
}
ctd1 = complex_task_data_alloc();
ctd2 = complex_task_data_alloc();
if (!ctd1 || !ctd2) {
goto end;
}
ast_threadpool_push(pool, complex_task, ctd1);
ast_threadpool_push(pool, complex_task, ctd2);
ast_threadpool_set_size(pool, 4);
res = wait_until_thread_state(test, tld, 2, 2);
if (res == AST_TEST_FAIL) {
goto end;
}
res = listener_check(test, listener, 1, 0, 2, 2, 2, 0);
if (res == AST_TEST_FAIL) {
goto end;
}
ast_threadpool_set_size(pool, 1);
/* Shrinking the threadpool should kill off the two idle threads
* and one of the active threads.
*/
res = wait_until_thread_state(test, tld, 1, 0);
if (res == AST_TEST_FAIL) {
goto end;
}
res = listener_check(test, listener, 1, 0, 2, 1, 0, 0);
if (res == AST_TEST_FAIL) {
goto end;
}
/* The tasks are stalled until we poke them */
poke_worker(ctd1);
poke_worker(ctd2);
res = wait_for_complex_completion(ctd1);
if (res == AST_TEST_FAIL) {
goto end;
}
res = wait_for_complex_completion(ctd2);
if (res == AST_TEST_FAIL) {
goto end;
}
res = wait_until_thread_state(test, tld, 0, 1);
if (res == AST_TEST_FAIL) {
goto end;
}
res = listener_check(test, listener, 1, 0, 2, 0, 1, 1);
end:
if (pool) {
ast_threadpool_shutdown(pool);
}
ao2_cleanup(listener);
ast_free(ctd1);
ast_free(ctd2);
return res;
}
/*
 * Module unload hook: unregisters each threadpool test callback, in the
 * same order load_module() registers them.
 *
 * Always returns 0.
 */
static int unload_module(void)
{
	ast_test_unregister(threadpool_push);

	ast_test_unregister(threadpool_thread_creation);
	ast_test_unregister(threadpool_thread_destruction);
	ast_test_unregister(threadpool_thread_timeout);

	ast_test_unregister(threadpool_one_task_one_thread);
	ast_test_unregister(threadpool_one_thread_one_task);
	ast_test_unregister(threadpool_one_thread_multiple_tasks);

	ast_test_unregister(threadpool_auto_increment);
	ast_test_unregister(threadpool_reactivation);
	ast_test_unregister(threadpool_task_distribution);
	ast_test_unregister(threadpool_more_destruction);

	return 0;
}
/*
 * Module load hook: registers each threadpool test callback with the test
 * framework.
 *
 * Always returns AST_MODULE_LOAD_SUCCESS; the individual registration
 * results are not inspected.
 */
static int load_module(void)
{
	ast_test_register(threadpool_push);

	ast_test_register(threadpool_thread_creation);
	ast_test_register(threadpool_thread_destruction);
	ast_test_register(threadpool_thread_timeout);

	ast_test_register(threadpool_one_task_one_thread);
	ast_test_register(threadpool_one_thread_one_task);
	ast_test_register(threadpool_one_thread_multiple_tasks);

	ast_test_register(threadpool_auto_increment);
	ast_test_register(threadpool_reactivation);
	ast_test_register(threadpool_task_distribution);
	ast_test_register(threadpool_more_destruction);

	return AST_MODULE_LOAD_SUCCESS;
}
/*
 * Module declaration: passes the GPL license key and the human-readable
 * description "threadpool test module" to the module-info macro.
 * NOTE(review): presumably this macro wires load_module() and
 * unload_module() above in as the module's entry points -- confirm against
 * the macro definition in the module header.
 */
AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "threadpool test module");