diff --git a/tests/test_threadpool.c b/tests/test_threadpool.c index 4fb7bab2d75b39717cf3303bf570712e2c376adb..3435b2877e5a933da19d9e79fab12ae066e6be37 100644 --- a/tests/test_threadpool.c +++ b/tests/test_threadpool.c @@ -703,6 +703,145 @@ end: } +struct complex_task_data { + int task_executed; + int continue_task; + ast_mutex_t lock; + ast_cond_t stall_cond; + ast_cond_t done_cond; +}; + +static struct complex_task_data *complex_task_data_alloc(void) +{ + struct complex_task_data *ctd = ast_calloc(1, sizeof(*ctd)); + + if (!ctd) { + return NULL; + } + ast_mutex_init(&ctd->lock); + ast_cond_init(&ctd->stall_cond, NULL); + ast_cond_init(&ctd->done_cond, NULL); + return ctd; +} + +static int complex_task(void *data) +{ + struct complex_task_data *ctd = data; + SCOPED_MUTEX(lock, &ctd->lock); + while (!ctd->continue_task) { + ast_cond_wait(&ctd->stall_cond, lock); + } + /* We got poked. Finish up */ + ctd->task_executed = 1; + ast_cond_signal(&ctd->done_cond); + return 0; +} + +static void poke_worker(struct complex_task_data *ctd) +{ + SCOPED_MUTEX(lock, &ctd->lock); + ctd->continue_task = 1; + ast_cond_signal(&ctd->stall_cond); +} + +static enum ast_test_result_state wait_for_complex_completion(struct complex_task_data *ctd) +{ + struct timeval start = ast_tvnow(); + struct timespec end = { + .tv_sec = start.tv_sec + 5, + .tv_nsec = start.tv_usec * 1000 + }; + enum ast_test_result_state res = AST_TEST_PASS; + SCOPED_MUTEX(lock, &ctd->lock); + + while (!ctd->task_executed) { + ast_cond_timedwait(&ctd->done_cond, lock, &end); + } + + if (!ctd->task_executed) { + res = AST_TEST_FAIL; + } + return res; +} + +AST_TEST_DEFINE(threadpool_task_distribution) +{ + struct ast_threadpool *pool = NULL; + struct ast_threadpool_listener *listener = NULL; + struct complex_task_data *ctd1 = NULL; + struct complex_task_data *ctd2 = NULL; + enum ast_test_result_state res = AST_TEST_FAIL; + struct test_listener_data *tld; + + switch (cmd) { + case TEST_INIT: + info->name = "threadpool_task_distribution"; + info->category = "/main/threadpool/"; + info->summary = "Test that tasks are evenly distributed to threads"; + info->description = + "Push two tasks into a threadpool. Ensure that each is handled by\n" + "a separate thread\n"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + listener = ast_threadpool_listener_alloc(&test_callbacks); + if (!listener) { + return AST_TEST_FAIL; + } + tld = listener->private_data; + + pool = ast_threadpool_create(listener, 0); + if (!pool) { + goto end; + } + + ctd1 = complex_task_data_alloc(); + ctd2 = complex_task_data_alloc(); + if (!ctd1 || !ctd2) { + goto end; + } + + ast_threadpool_push(pool, complex_task, ctd1); + ast_threadpool_push(pool, complex_task, ctd2); + + ast_threadpool_set_size(pool, 2); + + WAIT_WHILE(tld, tld->num_active < 2); + + res = listener_check(test, listener, 1, 0, 2, 2, 0, 0); + if (res == AST_TEST_FAIL) { + goto end; + } + + /* The tasks are stalled until we poke them */ + poke_worker(ctd1); + poke_worker(ctd2); + + res = wait_for_complex_completion(ctd1); + if (res == AST_TEST_FAIL) { + goto end; + } + res = wait_for_complex_completion(ctd2); + if (res == AST_TEST_FAIL) { + goto end; + } + + WAIT_WHILE(tld, tld->num_idle < 2); + + res = listener_check(test, listener, 1, 0, 2, 0, 2, 1); + +end: + if (pool) { + ast_threadpool_shutdown(pool); + } + ao2_cleanup(listener); + ast_free(ctd1); + ast_free(ctd2); + return res; +} + static int unload_module(void) { ast_test_unregister(threadpool_push); @@ -712,6 +851,7 @@ static int unload_module(void) ast_test_unregister(threadpool_one_thread_one_task); ast_test_unregister(threadpool_one_thread_multiple_tasks); ast_test_unregister(threadpool_reactivation); + ast_test_unregister(threadpool_task_distribution); return 0; } @@ -724,6 +864,7 @@ static int load_module(void) ast_test_register(threadpool_one_thread_one_task); ast_test_register(threadpool_one_thread_multiple_tasks); ast_test_register(threadpool_reactivation); + ast_test_register(threadpool_task_distribution); return AST_MODULE_LOAD_SUCCESS; }