diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h index 6359c057e7788c5c49e643f350e3c95a6e37c258..4f61939f873188ebd08c8363b10d251ff1923d39 100644 --- a/include/asterisk/taskprocessor.h +++ b/include/asterisk/taskprocessor.h @@ -84,6 +84,15 @@ struct ast_taskprocessor_listener_callbacks { * \retval non-NULL Allocated private data */ void *(*alloc)(struct ast_taskprocessor_listener *listener); + /*! + * \brief The taskprocessor has started completely + * + * This indicates that the taskprocessor is fully set up and the listener + * can now start interacting with it. + * + * \param listener The listener to start + */ + int (*start)(struct ast_taskprocessor_listener *listener); /*! * \brief Indicates a task was pushed to the processor * diff --git a/main/taskprocessor.c b/main/taskprocessor.c index d83228f33512ba1c843a2fe0f002773a08d1be9a..80875ec4af0243519391ccf62006160e73d99940 100644 --- a/main/taskprocessor.c +++ b/main/taskprocessor.c @@ -171,12 +171,20 @@ static void *default_listener_alloc(struct ast_taskprocessor_listener *listener) ast_cond_init(&pvt->cond, NULL); ast_mutex_init(&pvt->lock); pvt->poll_thread = AST_PTHREADT_NULL; - if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener) < 0) { - return NULL; - } return pvt; } +static int default_listener_start(struct ast_taskprocessor_listener *listener) +{ + struct default_taskprocessor_listener_pvt *pvt = listener->private_data; + + if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener)) { + return -1; + } + + return 0; +} + static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty) { struct default_taskprocessor_listener_pvt *pvt = listener->private_data; @@ -209,6 +217,7 @@ static void default_listener_destroy(void *obj) static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = { .alloc = default_listener_alloc, + .start = default_listener_start, .task_pushed = default_task_pushed, .emptied = default_emptied, .shutdown = default_listener_shutdown, @@ -556,6 +565,12 @@ struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *nam return NULL; } + if (p->listener->callbacks->start(p->listener)) { + ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n", p->name); + ast_taskprocessor_unreference(p); + return NULL; + } + /* RAII_VAR will decrement the refcount at the end of the function. * Since we want to pass back a reference to p, we bump the refcount */ diff --git a/main/threadpool.c b/main/threadpool.c index 1da0d0766c5e1ab85f02b1d1f060f2d40684f44e..1b047792615e917695f918b8ec61416b06f099a7 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -268,6 +268,11 @@ static void *threadpool_alloc(struct ast_taskprocessor_listener *listener) return pool; } +static int threadpool_tps_start(struct ast_taskprocessor_listener *listener) +{ + return 0; +} + /*! * \brief helper used for queued task when tasks are pushed */ @@ -431,6 +436,7 @@ static void threadpool_destroy(void *private_data) */ static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = { .alloc = threadpool_alloc, + .start = threadpool_tps_start, .task_pushed = threadpool_tps_task_pushed, .emptied = threadpool_tps_emptied, .shutdown = threadpool_tps_shutdown, @@ -623,6 +629,7 @@ struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *lis pool = tps_listener->private_data; pool->tps = tps; + ast_log(LOG_NOTICE, "The taskprocessor I've created is located at %p\n", pool->tps); ao2_ref(listener, +1); pool->listener = listener; ast_threadpool_set_size(pool, initial_size); diff --git a/tests/test_taskprocessor.c b/tests/test_taskprocessor.c index cbab754a9406cd718e318f1604eebd18672c85f6..377a2b3e3ce7a4a3b5b8e3e167056b2dd9c37cb1 100644 --- a/tests/test_taskprocessor.c +++ b/tests/test_taskprocessor.c @@ -116,6 +116,7 @@ AST_TEST_DEFINE(default_taskprocessor) break; } } + ast_mutex_unlock(&task_data.lock); if (!task_data.task_complete) { ast_test_status_update(test, "Queued task did not execute!\n"); @@ -218,6 +219,7 @@ AST_TEST_DEFINE(default_taskprocessor_load) break; } } + ast_mutex_unlock(&load_task_results.lock); if (load_task_results.tasks_completed != NUM_TASKS) { ast_test_status_update(test, "Unexpected number of tasks executed. Expected %d but got %d\n", @@ -266,6 +268,14 @@ static void *test_alloc(struct ast_taskprocessor_listener *listener) return pvt; } +/*! + * \brief test taskprocessor listener's start callback + */ +static int test_start(struct ast_taskprocessor_listener *listener) +{ + return 0; +} + /*! * \brief test taskprocessor listener's task_pushed callback * @@ -309,6 +319,7 @@ static void test_destroy(void *private_data) static const struct ast_taskprocessor_listener_callbacks test_callbacks = { .alloc = test_alloc, + .start = test_start, .task_pushed = test_task_pushed, .emptied = test_emptied, .shutdown = test_shutdown, diff --git a/tests/test_threadpool.c b/tests/test_threadpool.c index eed4dc20febb31bfdc14d354bb843a0cf398db12..fbbe670bb3210768a73a88d101b8808f8e5347ae 100644 --- a/tests/test_threadpool.c +++ b/tests/test_threadpool.c @@ -36,6 +36,7 @@ #include "asterisk/module.h" #include "asterisk/lock.h" #include "asterisk/astobj2.h" +#include "asterisk/logger.h" struct test_listener_data { int num_active; @@ -66,6 +67,7 @@ static void test_state_changed(struct ast_threadpool *pool, { struct test_listener_data *tld = listener->private_data; SCOPED_MUTEX(lock, &tld->lock); + ast_log(LOG_NOTICE, "State changed: num_active: %d, num_idle: %d\n", active_threads, idle_threads); tld->num_active = active_threads; tld->num_idle = idle_threads; ast_cond_signal(&tld->cond); @@ -95,6 +97,7 @@ static void test_emptied(struct ast_threadpool *pool, static void test_destroy(void *private_data) { struct test_listener_data *tld = private_data; + ast_debug(1, "Poop\n"); ast_cond_destroy(&tld->cond); ast_mutex_destroy(&tld->lock); ast_free(tld); @@ -135,6 +138,15 @@ static int simple_task(void *data) return 0; } +#define WAIT_WHILE(tld, condition) \ +{\ + ast_mutex_lock(&tld->lock);\ + while ((condition)) {\ + ast_cond_wait(&tld->cond, &tld->lock);\ + }\ + ast_mutex_unlock(&tld->lock);\ +}\ + static void wait_for_task_pushed(struct ast_threadpool_listener *listener) { struct test_listener_data *tld = listener->private_data; @@ -246,15 +258,64 @@ end: return res; } +AST_TEST_DEFINE(threadpool_thread_creation) +{ + struct ast_threadpool *pool = NULL; + struct ast_threadpool_listener *listener = NULL; + enum ast_test_result_state res = AST_TEST_FAIL; + struct test_listener_data *tld; + + switch (cmd) { + case TEST_INIT: + info->name = "threadpool_thread_creation"; + info->category = "/main/threadpool_thread_creation/"; + info->summary = "Test threadpool thread creation"; + info->description = + "Ensure that threads can be added to a threadpool"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + listener = ast_threadpool_listener_alloc(&test_callbacks); + if (!listener) { + return AST_TEST_FAIL; + } + tld = listener->private_data; + + pool = ast_threadpool_create(listener, 0); + if (!pool) { + goto end; + } + + /* Now let's create a thread. It should start active, then go + * idle immediately + */ + ast_threadpool_set_size(pool, 1); + + WAIT_WHILE(tld, tld->num_idle == 0); + + res = listener_check(test, listener, 0, 0, 0, 0, 1, 0); + +end: + if (pool) { + ast_threadpool_shutdown(pool); + } + ao2_cleanup(listener); + return res; +} + static int unload_module(void) { ast_test_unregister(threadpool_push); + ast_test_unregister(threadpool_thread_creation); return 0; } static int load_module(void) { ast_test_register(threadpool_push); + ast_test_register(threadpool_thread_creation); return AST_MODULE_LOAD_SUCCESS; }