diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h index 0f1876e7af96ad6236b3012702ff7c8e814d3500..219166305749dee17224689c8c337c24a776b150 100644 --- a/include/asterisk/taskprocessor.h +++ b/include/asterisk/taskprocessor.h @@ -1,7 +1,7 @@ /* * Asterisk -- An open source telephony toolkit. * - * Copyright (C) 2007-2008, Digium, Inc. + * Copyright (C) 2007-2013, Digium, Inc. * * Dwayne M. Hubbard <dhubbard@digium.com> * @@ -22,22 +22,33 @@ * * \author Dwayne M. Hubbard <dhubbard@digium.com> * - * \note A taskprocessor is a named singleton containing a processing thread and - * a task queue that serializes tasks pushed into it by [a] module(s) that reference the taskprocessor. - * A taskprocessor is created the first time its name is requested via the ast_taskprocessor_get() - * function and destroyed when the taskprocessor reference count reaches zero. + * \note A taskprocessor is a named object containing a task queue that + * serializes tasks pushed into it by [a] module(s) that reference the taskprocessor. + * A taskprocessor is created the first time its name is requested via the + * ast_taskprocessor_get() function or the ast_taskprocessor_create_with_listener() + * function and destroyed when the taskprocessor reference count reaches zero. A + * taskprocessor also contains an accompanying listener that is notified when changes + * in the task queue occur. * - * Modules that obtain a reference to a taskprocessor can queue tasks into the taskprocessor - * to be processed by the singleton processing thread when the task is popped off the front - * of the queue. A task is a wrapper around a task-handling function pointer and a data - * pointer. It is the responsibility of the task handling function to free memory allocated for - * the task data pointer. A task is pushed into a taskprocessor queue using the + * A task is a wrapper around a task-handling function pointer and a data + * pointer. A task is pushed into a taskprocessor queue using the * ast_taskprocessor_push(taskprocessor, taskhandler, taskdata) function and freed by the - * taskprocessor after the task handling function returns. A module releases its reference to a - * taskprocessor using the ast_taskprocessor_unreference() function which may result in the - * destruction of the taskprocessor if the taskprocessor's reference count reaches zero. Tasks waiting - * to be processed in the taskprocessor queue when the taskprocessor reference count reaches zero - * will be purged and released from the taskprocessor queue without being processed. + * taskprocessor after the task handling function returns. A module releases its + * reference to a taskprocessor using the ast_taskprocessor_unreference() function which + * may result in the destruction of the taskprocessor if the taskprocessor's reference + * count reaches zero. When the taskprocessor's reference count reaches zero, its + * listener's shutdown() callback will be called. Any further attempts to execute tasks + * will be denied. + * + * The taskprocessor listener has the flexibility of doling out tasks to best fit the + * module's needs. For instance, a taskprocessor listener may have a single dispatch + * thread that handles all tasks, or it may dispatch tasks to a thread pool. + * + * There is a default taskprocessor listener that will be used if a taskprocessor is + * created without any explicit listener. This default listener runs tasks sequentially + * in a single thread. The listener will execute tasks as long as there are tasks to be + * processed. When the taskprocessor is shut down, the default listener will stop + * processing tasks and join its execution thread. */ #ifndef __AST_TASKPROCESSOR_H__ @@ -48,9 +59,9 @@ struct ast_taskprocessor; /*! * \brief ast_tps_options for specification of taskprocessor options * - * Specify whether a taskprocessor should be created via ast_taskprocessor_get() if the taskprocessor - * does not already exist. The default behavior is to create a taskprocessor if it does not already exist - * and provide its reference to the calling function. To only return a reference to a taskprocessor if + * Specify whether a taskprocessor should be created via ast_taskprocessor_get() if the taskprocessor + * does not already exist. The default behavior is to create a taskprocessor if it does not already exist + * and provide its reference to the calling function. To only return a reference to a taskprocessor if * and only if it exists, use the TPS_REF_IF_EXISTS option in ast_taskprocessor_get(). */ enum ast_tps_options { @@ -60,13 +71,86 @@ enum ast_tps_options { TPS_REF_IF_EXISTS = (1 << 0), }; +struct ast_taskprocessor_listener; + +struct ast_taskprocessor_listener_callbacks { + /*! + * \brief The taskprocessor has started completely + * + * This indicates that the taskprocessor is fully set up and the listener + * can now start interacting with it. + * + * \param listener The listener to start + */ + int (*start)(struct ast_taskprocessor_listener *listener); + /*! + * \brief Indicates a task was pushed to the processor + * + * \param listener The listener + * \param was_empty If non-zero, the taskprocessor was empty prior to the task being pushed + */ + void (*task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty); + /*! + * \brief Indicates the task processor has become empty + * + * \param listener The listener + */ + void (*emptied)(struct ast_taskprocessor_listener *listener); + /*! + * \brief Indicates the taskprocessor wishes to die. + * + * All operations on the task processor must to be stopped in + * this callback. This is an opportune time to free the listener's + * user data if it is not going to be used anywhere else. + * + * After this callback returns, it is NOT safe to operate on the + * listener's reference to the taskprocessor. + * + * \param listener The listener + */ + void (*shutdown)(struct ast_taskprocessor_listener *listener); +}; + +/*! + * \brief Get a reference to the listener's taskprocessor + * + * This will return the taskprocessor with its reference count increased. Release + * the reference to this object by using ast_taskprocessor_unreference() + * + * \param listener The listener that has the taskprocessor + * \return The taskprocessor + */ +struct ast_taskprocessor *ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener); + +/*! + * \brief Get the user data from the listener + * \param listener The taskprocessor listener + * \return The listener's user data + */ +void *ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener); + +/*! + * \brief Allocate a taskprocessor listener + * + * \since 12.0.0 + * + * This will result in the listener being allocated with the specified + * callbacks. + * + * \param callbacks The callbacks to assign to the listener + * \param user_data The user data for the listener + * \retval NULL Failure + * \retval non-NULL The newly allocated taskprocessor listener + */ +struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data); + /*! * \brief Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary * * The default behavior of instantiating a taskprocessor if one does not already exist can be * disabled by specifying the TPS_REF_IF_EXISTS ast_tps_options as the second argument to ast_taskprocessor_get(). * \param name The name of the taskprocessor - * \param create Use 0 by default or specify TPS_REF_IF_EXISTS to return NULL if the taskprocessor does + * \param create Use 0 by default or specify TPS_REF_IF_EXISTS to return NULL if the taskprocessor does * not already exist * return A pointer to a reference counted taskprocessor under normal conditions, or NULL if the * TPS_REF_IF_EXISTS reference type is specified and the taskprocessor does not exist @@ -74,6 +158,22 @@ enum ast_tps_options { */ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_options create); +/*! + * \brief Create a taskprocessor with a custom listener + * + * \since 12.0.0 + * + * Note that when a taskprocessor is created in this way, it does not create + * any threads to execute the tasks. This job is left up to the listener. + * The listener's start() callback will be called during this function. + * + * \param name The name of the taskprocessor to create + * \param listener The listener for operations on this taskprocessor + * \retval NULL Failure + * \reval non-NULL success + */ +struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener); + /*! * \brief Unreference the specified taskprocessor and its reference count will decrement. * @@ -96,6 +196,17 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps); */ int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap); +/*! + * \brief Pop a task off the taskprocessor and execute it. + * + * \since 12.0.0 + * + * \param tps The taskprocessor from which to execute. + * \retval 0 There is no further work to be done. + * \retval 1 Tasks still remain in the taskprocessor queue. + */ +int ast_taskprocessor_execute(struct ast_taskprocessor *tps); + /*! * \brief Return the name of the taskprocessor singleton * \since 1.6.1 diff --git a/include/asterisk/threadpool.h b/include/asterisk/threadpool.h new file mode 100644 index 0000000000000000000000000000000000000000..e18ba6c6969c74bb109708f27edb7907ab6bd24f --- /dev/null +++ b/include/asterisk/threadpool.h @@ -0,0 +1,180 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2012-2013, Digium, Inc. + * + * Mark Michelson <mmmichelson@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + + +#ifndef _ASTERISK_THREADPOOL_H +#define _ASTERISK_THREADPOOL_H + +struct ast_threadpool; +struct ast_taskprocessor; +struct ast_threadpool_listener; + +struct ast_threadpool_listener_callbacks { + /*! + * \brief Indicates that the state of threads in the pool has changed + * + * \param pool The pool whose state has changed + * \param listener The threadpool listener + * \param active_threads The number of active threads in the pool + * \param idle_threads The number of idle threads in the pool + */ + void (*state_changed)(struct ast_threadpool *pool, + struct ast_threadpool_listener *listener, + int active_threads, + int idle_threads); + /*! + * \brief Indicates that a task was pushed to the threadpool + * + * \param pool The pool that had a task pushed + * \param listener The threadpool listener + * \param was_empty Indicates whether there were any tasks prior to adding the new one. + */ + void (*task_pushed)(struct ast_threadpool *pool, + struct ast_threadpool_listener *listener, + int was_empty); + /*! + * \brief Indicates the threadpool's taskprocessor has become empty + * + * \param pool The pool that has become empty + * \param listener The threadpool's listener + */ + void (*emptied)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener); + + /*! + * \brief The threadpool is shutting down + * + * This would be an opportune time to free the listener's user data + * if one wishes. However, it is acceptable to not do so if the user data + * should persist beyond the lifetime of the pool. + * + * \param listener The threadpool's listener + */ + void (*shutdown)(struct ast_threadpool_listener *listener); +}; + +struct ast_threadpool_options { +#define AST_THREADPOOL_OPTIONS_VERSION 1 + /*! Version of threadpool options in use */ + int version; + /*! + * \brief Time limit in seconds for idle threads + * + * A time of 0 or less will mean no timeout. + */ + int idle_timeout; + /*! + * \brief Number of threads to increment pool by + * + * If a task is added into a pool and no idle thread is + * available to activate, then the pool can automatically + * grow by the given amount. + * + * Zero is a perfectly valid value to give here if you want + * to control threadpool growth yourself via your listener. + */ + int auto_increment; + /*! + * \brief Number of threads the pool will start with + * + * When the threadpool is allocated, it will immediately size + * itself to have this number of threads in it. + * + * Zero is a valid value if the threadpool should start + * without any threads allocated. + */ + int initial_size; + /*! + * \brief Maximum number of threads a pool may have + * + * When the threadpool's size increases, it can never increase + * beyond this number of threads. + */ + int max_size; +}; + +/*! + * \brief Allocate a threadpool listener + * + * This function will call back into the alloc callback for the + * listener. + * + * \param callbacks Listener callbacks to assign to the listener + * \param user_data User data to be stored in the threadpool listener + * \retval NULL Failed to allocate the listener + * \retval non-NULL The newly-created threadpool listener + */ +struct ast_threadpool_listener *ast_threadpool_listener_alloc( + const struct ast_threadpool_listener_callbacks *callbacks, void *user_data); + +/*! + * \brief Get the threadpool listener's user data + * \param listener The threadpool listener + * \return The user data + */ +void *ast_threadpool_listener_get_user_data(const struct ast_threadpool_listener *listener); + +/*! + * \brief Create a new threadpool + * + * This function creates a threadpool. Tasks may be pushed onto this thread pool + * in and will be automatically acted upon by threads within the pool. + * + * Only a single threadpool with a given name may exist. This function will fail + * if a threadpool with the given name already exists. + * + * \param name The unique name for the threadpool + * \param listener The listener the threadpool will notify of changes. Can be NULL. + * \param options The behavioral options for this threadpool + * \retval NULL Failed to create the threadpool + * \retval non-NULL The newly-created threadpool + */ +struct ast_threadpool *ast_threadpool_create(const char *name, + struct ast_threadpool_listener *listener, + const struct ast_threadpool_options *options); + +/*! + * \brief Set the number of threads for the thread pool + * + * This number may be more or less than the current number of + * threads in the threadpool. + * + * \param threadpool The threadpool to adjust + * \param size The new desired size of the threadpool + */ +void ast_threadpool_set_size(struct ast_threadpool *threadpool, unsigned int size); + +/*! + * \brief Push a task to the threadpool + * + * Tasks pushed into the threadpool will be automatically taken by + * one of the threads within + * \param pool The threadpool to add the task to + * \param task The task to add + * \param data The parameter for the task + * \retval 0 success + * \retval -1 failure + */ +int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data); + +/*! + * \brief Shut down a threadpool and destroy it + * + * \param pool The pool to shut down + */ +void ast_threadpool_shutdown(struct ast_threadpool *pool); +#endif /* ASTERISK_THREADPOOL_H */ diff --git a/main/taskprocessor.c b/main/taskprocessor.c index d8f1af3e84182e8bbc63b5977e638bd1b94d425d..b603e5738418a02b3b4eb1d8440e266456454a0b 100644 --- a/main/taskprocessor.c +++ b/main/taskprocessor.c @@ -1,7 +1,7 @@ /* * Asterisk -- An open source telephony toolkit. * - * Copyright (C) 2007-2008, Digium, Inc. + * Copyright (C) 2007-2013, Digium, Inc. * * Dwayne M. Hubbard <dhubbard@digium.com> * @@ -38,7 +38,6 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$") #include "asterisk/cli.h" #include "asterisk/taskprocessor.h" - /*! * \brief tps_task structure is queued to a taskprocessor * @@ -67,14 +66,6 @@ struct tps_taskprocessor_stats { struct ast_taskprocessor { /*! \brief Friendly name of the taskprocessor */ const char *name; - /*! \brief Thread poll condition */ - ast_cond_t poll_cond; - /*! \brief Taskprocessor thread */ - pthread_t poll_thread; - /*! \brief Taskprocessor lock */ - ast_mutex_t taskprocessor_lock; - /*! \brief Taskprocesor thread run flag */ - unsigned char poll_thread_run; /*! \brief Taskprocessor statistics */ struct tps_taskprocessor_stats *stats; /*! \brief Taskprocessor current queue size */ @@ -83,7 +74,30 @@ struct ast_taskprocessor { AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue; /*! \brief Taskprocessor singleton list entry */ AST_LIST_ENTRY(ast_taskprocessor) list; + struct ast_taskprocessor_listener *listener; + /*! Indicates if the taskprocessor is in the process of shuting down */ + unsigned int shutting_down:1; }; + +/*! + * \brief A listener for taskprocessors + * + * \since 12.0.0 + * + * When a taskprocessor's state changes, the listener + * is notified of the change. This allows for tasks + * to be addressed in whatever way is appropriate for + * the module using the taskprocessor. + */ +struct ast_taskprocessor_listener { + /*! The callbacks the taskprocessor calls into to notify of state changes */ + const struct ast_taskprocessor_listener_callbacks *callbacks; + /*! The taskprocessor that the listener is listening to */ + struct ast_taskprocessor *tps; + /*! Data private to the listener */ + void *user_data; +}; + #define TPS_MAX_BUCKETS 7 /*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */ static struct ao2_container *tps_singletons; @@ -122,6 +136,94 @@ static struct ast_cli_entry taskprocessor_clis[] = { AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"), }; +struct default_taskprocessor_listener_pvt { + pthread_t poll_thread; + ast_mutex_t lock; + ast_cond_t cond; + int wake_up; + int dead; +}; + + +static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt, int should_die) +{ + SCOPED_MUTEX(lock, &pvt->lock); + pvt->wake_up = 1; + pvt->dead = should_die; + ast_cond_signal(&pvt->cond); +} + +static int default_tps_idle(struct default_taskprocessor_listener_pvt *pvt) +{ + SCOPED_MUTEX(lock, &pvt->lock); + while (!pvt->wake_up) { + ast_cond_wait(&pvt->cond, lock); + } + pvt->wake_up = 0; + return pvt->dead; +} + +/*! + * \brief Function that processes tasks in the taskprocessor + * \internal + */ +static void *tps_processing_function(void *data) +{ + struct ast_taskprocessor_listener *listener = data; + struct ast_taskprocessor *tps = listener->tps; + struct default_taskprocessor_listener_pvt *pvt = listener->user_data; + int dead = 0; + + while (!dead) { + if (!ast_taskprocessor_execute(tps)) { + dead = default_tps_idle(pvt); + } + } + return NULL; +} + +static int default_listener_start(struct ast_taskprocessor_listener *listener) +{ + struct default_taskprocessor_listener_pvt *pvt = listener->user_data; + + if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener)) { + return -1; + } + + return 0; +} + +static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty) +{ + struct default_taskprocessor_listener_pvt *pvt = listener->user_data; + + if (was_empty) { + default_tps_wake_up(pvt, 0); + } +} + +static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt) +{ + ast_mutex_destroy(&pvt->lock); + ast_cond_destroy(&pvt->cond); + ast_free(pvt); +} + +static void default_listener_shutdown(struct ast_taskprocessor_listener *listener) +{ + struct default_taskprocessor_listener_pvt *pvt = listener->user_data; + default_tps_wake_up(pvt, 1); + pthread_join(pvt->poll_thread, NULL); + pvt->poll_thread = AST_PTHREADT_NULL; + default_listener_pvt_destroy(pvt); +} + +static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = { + .start = default_listener_start, + .task_pushed = default_task_pushed, + .shutdown = default_listener_shutdown, +}; + /*! * \internal * \brief Clean up resources on Asterisk shutdown @@ -291,118 +393,67 @@ static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_arg return CLI_SUCCESS; } -/* this is the task processing worker function */ -static void *tps_processing_function(void *data) -{ - struct ast_taskprocessor *i = data; - struct tps_task *t; - int size; - - if (!i) { - ast_log(LOG_ERROR, "cannot start thread_function loop without a ast_taskprocessor structure.\n"); - return NULL; - } - - while (i->poll_thread_run) { - ast_mutex_lock(&i->taskprocessor_lock); - if (!i->poll_thread_run) { - ast_mutex_unlock(&i->taskprocessor_lock); - break; - } - if (!(size = tps_taskprocessor_depth(i))) { - ast_cond_wait(&i->poll_cond, &i->taskprocessor_lock); - if (!i->poll_thread_run) { - ast_mutex_unlock(&i->taskprocessor_lock); - break; - } - } - ast_mutex_unlock(&i->taskprocessor_lock); - /* stuff is in the queue */ - if (!(t = tps_taskprocessor_pop(i))) { - ast_log(LOG_ERROR, "Wtf?? %d tasks in the queue, but we're popping blanks!\n", size); - continue; - } - if (!t->execute) { - ast_log(LOG_WARNING, "Task is missing a function to execute!\n"); - tps_task_free(t); - continue; - } - t->execute(t->datap); - - ast_mutex_lock(&i->taskprocessor_lock); - if (i->stats) { - i->stats->_tasks_processed_count++; - if (size > i->stats->max_qsize) { - i->stats->max_qsize = size; - } - } - ast_mutex_unlock(&i->taskprocessor_lock); - - tps_task_free(t); - } - while ((t = tps_taskprocessor_pop(i))) { - tps_task_free(t); - } - return NULL; -} - /* hash callback for astobj2 */ static int tps_hash_cb(const void *obj, const int flags) { const struct ast_taskprocessor *tps = obj; + const char *name = flags & OBJ_KEY ? obj : tps->name; - return ast_str_case_hash(tps->name); + return ast_str_case_hash(name); } /* compare callback for astobj2 */ static int tps_cmp_cb(void *obj, void *arg, int flags) { struct ast_taskprocessor *lhs = obj, *rhs = arg; + const char *rhsname = flags & OBJ_KEY ? arg : rhs->name; - return !strcasecmp(lhs->name, rhs->name) ? CMP_MATCH | CMP_STOP : 0; + return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0; } /* destroy the taskprocessor */ static void tps_taskprocessor_destroy(void *tps) { struct ast_taskprocessor *t = tps; + struct tps_task *task; if (!tps) { ast_log(LOG_ERROR, "missing taskprocessor\n"); return; } ast_debug(1, "destroying taskprocessor '%s'\n", t->name); - /* kill it */ - ast_mutex_lock(&t->taskprocessor_lock); - t->poll_thread_run = 0; - ast_cond_signal(&t->poll_cond); - ast_mutex_unlock(&t->taskprocessor_lock); - pthread_join(t->poll_thread, NULL); - t->poll_thread = AST_PTHREADT_NULL; - ast_mutex_destroy(&t->taskprocessor_lock); - ast_cond_destroy(&t->poll_cond); /* free it */ if (t->stats) { ast_free(t->stats); t->stats = NULL; } ast_free((char *) t->name); + if (t->listener) { + /* This code should not be reached since the listener + * should have been destroyed before the taskprocessor could + * be destroyed + */ + ao2_ref(t->listener, -1); + t->listener = NULL; + } + while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) { + tps_task_free(task); + } } /* pop the front task and return it */ static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps) { struct tps_task *task; + SCOPED_AO2LOCK(lock, tps); - if (!tps) { - ast_log(LOG_ERROR, "missing taskprocessor\n"); + if (tps->shutting_down) { return NULL; } - ast_mutex_lock(&tps->taskprocessor_lock); + if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) { tps->tps_queue_size--; } - ast_mutex_unlock(&tps->taskprocessor_lock); return task; } @@ -421,80 +472,169 @@ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps) return tps->name; } +static void listener_shutdown(struct ast_taskprocessor_listener *listener) +{ + listener->callbacks->shutdown(listener); + ao2_ref(listener->tps, -1); +} + +struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data) +{ + RAII_VAR(struct ast_taskprocessor_listener *, listener, + ao2_alloc(sizeof(*listener), NULL), ao2_cleanup); + + if (!listener) { + return NULL; + } + listener->callbacks = callbacks; + listener->user_data = user_data; + + ao2_ref(listener, +1); + return listener; +} + +struct ast_taskprocessor *ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener) +{ + ao2_ref(listener->tps, +1); + return listener->tps; +} + +void *ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener) +{ + return listener->user_data; +} + +static void *default_listener_pvt_alloc(void) +{ + struct default_taskprocessor_listener_pvt *pvt; + + pvt = ast_calloc(1, sizeof(*pvt)); + if (!pvt) { + return NULL; + } + ast_cond_init(&pvt->cond, NULL); + ast_mutex_init(&pvt->lock); + pvt->poll_thread = AST_PTHREADT_NULL; + return pvt; +} + +static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener) +{ + RAII_VAR(struct ast_taskprocessor *, p, + ao2_alloc(sizeof(*p), tps_taskprocessor_destroy), ao2_cleanup); + + if (!p) { + ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name); + return NULL; + } + + if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) { + ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name); + return NULL; + } + if (!(p->name = ast_strdup(name))) { + ao2_ref(p, -1); + return NULL; + } + + ao2_ref(listener, +1); + p->listener = listener; + + ao2_ref(p, +1); + listener->tps = p; + + if (!(ao2_link(tps_singletons, p))) { + ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name); + return NULL; + } + + if (p->listener->callbacks->start(p->listener)) { + ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n", p->name); + ast_taskprocessor_unreference(p); + return NULL; + } + + /* RAII_VAR will decrement the refcount at the end of the function. + * Since we want to pass back a reference to p, we bump the refcount + */ + ao2_ref(p, +1); + return p; + +} + /* Provide a reference to a taskprocessor. Create the taskprocessor if necessary, but don't * create the taskprocessor if we were told via ast_tps_options to return a reference only * if it already exists */ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_options create) { - struct ast_taskprocessor *p, tmp_tps = { - .name = name, - }; + struct ast_taskprocessor *p; + struct ast_taskprocessor_listener *listener; + struct default_taskprocessor_listener_pvt *pvt; if (ast_strlen_zero(name)) { ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n"); return NULL; } - ao2_lock(tps_singletons); - p = ao2_find(tps_singletons, &tmp_tps, OBJ_POINTER); + p = ao2_find(tps_singletons, name, OBJ_KEY); if (p) { - ao2_unlock(tps_singletons); return p; } if (create & TPS_REF_IF_EXISTS) { /* calling function does not want a new taskprocessor to be created if it doesn't already exist */ - ao2_unlock(tps_singletons); return NULL; } - /* create a new taskprocessor */ - if (!(p = ao2_alloc(sizeof(*p), tps_taskprocessor_destroy))) { - ao2_unlock(tps_singletons); - ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name); + /* Create a new taskprocessor. Start by creating a default listener */ + pvt = default_listener_pvt_alloc(); + if (!pvt) { return NULL; } - - ast_cond_init(&p->poll_cond, NULL); - ast_mutex_init(&p->taskprocessor_lock); - - if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) { - ao2_unlock(tps_singletons); - ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name); - ao2_ref(p, -1); + listener = ast_taskprocessor_listener_alloc(&default_listener_callbacks, pvt); + if (!listener) { + default_listener_pvt_destroy(pvt); return NULL; } - if (!(p->name = ast_strdup(name))) { - ao2_unlock(tps_singletons); - ao2_ref(p, -1); - return NULL; - } - p->poll_thread_run = 1; - p->poll_thread = AST_PTHREADT_NULL; - if (ast_pthread_create(&p->poll_thread, NULL, tps_processing_function, p) < 0) { - ao2_unlock(tps_singletons); - ast_log(LOG_ERROR, "Taskprocessor '%s' failed to create the processing thread.\n", p->name); - ao2_ref(p, -1); + + p = __allocate_taskprocessor(name, listener); + if (!p) { + default_listener_pvt_destroy(pvt); + ao2_ref(listener, -1); return NULL; } - if (!(ao2_link(tps_singletons, p))) { - ao2_unlock(tps_singletons); - ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name); - ao2_ref(p, -1); + + /* Unref listener here since the taskprocessor has gained a reference to the listener */ + ao2_ref(listener, -1); + return p; + +} + +struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener) +{ + struct ast_taskprocessor *p = ao2_find(tps_singletons, name, OBJ_KEY); + + if (p) { + ast_taskprocessor_unreference(p); return NULL; } - ao2_unlock(tps_singletons); - return p; + return __allocate_taskprocessor(name, listener); } /* decrement the taskprocessor reference count and unlink from the container if necessary */ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps) { - if (tps) { - ao2_lock(tps_singletons); - ao2_unlink(tps_singletons, tps); - if (ao2_ref(tps, -1) > 1) { - ao2_link(tps_singletons, tps); - } - ao2_unlock(tps_singletons); + if (!tps) { + return NULL; } + + if (ao2_ref(tps, -1) > 3) { + return NULL; + } + /* If we're down to 3 references, then those must be: + * 1. The reference we just got rid of + * 2. The container + * 3. The listener + */ + ao2_unlink(tps_singletons, tps); + listener_shutdown(tps->listener); return NULL; } @@ -502,6 +642,7 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps) int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap) { struct tps_task *t; + int previous_size; if (!tps || !task_exe) { ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor"); @@ -511,11 +652,40 @@ int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void * ast_log(LOG_ERROR, "failed to allocate task! Can't push to '%s'\n", tps->name); return -1; } - ast_mutex_lock(&tps->taskprocessor_lock); + ao2_lock(tps); AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list); - tps->tps_queue_size++; - ast_cond_signal(&tps->poll_cond); - ast_mutex_unlock(&tps->taskprocessor_lock); + previous_size = tps->tps_queue_size++; + ao2_unlock(tps); + tps->listener->callbacks->task_pushed(tps->listener, previous_size ? 0 : 1); return 0; } +int ast_taskprocessor_execute(struct ast_taskprocessor *tps) +{ + struct tps_task *t; + int size; + + if (!(t = tps_taskprocessor_pop(tps))) { + return 0; + } + + t->execute(t->datap); + + tps_task_free(t); + + ao2_lock(tps); + size = tps_taskprocessor_depth(tps); + if (tps->stats) { + tps->stats->_tasks_processed_count++; + if (size > tps->stats->max_qsize) { + tps->stats->max_qsize = size; + } + } + ao2_unlock(tps); + + if (size == 0 && tps->listener->callbacks->emptied) { + tps->listener->callbacks->emptied(tps->listener); + return 0; + } + return 1; +} diff --git a/main/threadpool.c b/main/threadpool.c new file mode 100644 index 0000000000000000000000000000000000000000..adaf8a554306e0dc48ab9258aa18208d7f93d1e3 --- /dev/null +++ b/main/threadpool.c @@ -0,0 +1,1105 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2012-2013, Digium, Inc. + * + * Mark Michelson <mmmichelson@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + + +#include "asterisk.h" + +#include "asterisk/threadpool.h" +#include "asterisk/taskprocessor.h" +#include "asterisk/astobj2.h" +#include "asterisk/utils.h" + +/* Needs to stay prime if increased */ +#define THREAD_BUCKETS 89 + +/*! + * \brief An opaque threadpool structure + * + * A threadpool is a collection of threads that execute + * tasks from a common queue. + */ +struct ast_threadpool { + /*! Threadpool listener */ + struct ast_threadpool_listener *listener; + /*! + * \brief The container of active threads. + * Active threads are those that are currently running tasks + */ + struct ao2_container *active_threads; + /*! + * \brief The container of idle threads. + * Idle threads are those that are currenly waiting to run tasks + */ + struct ao2_container *idle_threads; + /*! + * \brief The container of zombie threads. + * Zombie threads may be running tasks, but they are scheduled to die soon + */ + struct ao2_container *zombie_threads; + /*! + * \brief The main taskprocessor + * + * Tasks that are queued in this taskprocessor are + * doled out to the worker threads. Worker threads that + * execute tasks from the threadpool are executing tasks + * in this taskprocessor. + * + * The threadpool itself is actually the private data for + * this taskprocessor's listener. This way, as taskprocessor + * changes occur, the threadpool can alert its listeners + * appropriately. + */ + struct ast_taskprocessor *tps; + /*! + * \brief The control taskprocessor + * + * This is a standard taskprocessor that uses the default + * taskprocessor listener. In other words, all tasks queued to + * this taskprocessor have a single thread that executes the + * tasks. + * + * All tasks that modify the state of the threadpool and all tasks + * that call out to threadpool listeners are pushed to this + * taskprocessor. + * + * For instance, when the threadpool changes sizes, a task is put + * into this taskprocessor to do so. When it comes time to tell the + * threadpool listener that worker threads have changed state, + * the task is placed in this taskprocessor. + * + * This is done for three main reasons + * 1) It ensures that listeners are given an accurate portrayal + * of the threadpool's current state. In other words, when a listener + * gets told a count of active, idle and zombie threads, it does not + * need to worry that internal state of the threadpool might be different + * from what it has been told. + * 2) It minimizes the locking required in both the threadpool and in + * threadpool listener's callbacks. + * 3) It ensures that listener callbacks are called in the same order + * that the threadpool had its state change. + */ + struct ast_taskprocessor *control_tps; + /*! True if the threadpool is in the processof shutting down */ + int shutting_down; + /*! Threadpool-specific options */ + struct ast_threadpool_options options; +}; + +/*! + * \brief listener for a threadpool + * + * The listener is notified of changes in a threadpool. It can + * react by doing things like increasing the number of threads + * in the pool + */ +struct ast_threadpool_listener { + /*! Callbacks called by the threadpool */ + const struct ast_threadpool_listener_callbacks *callbacks; + /*! User data for the listener */ + void *user_data; +}; + +/*! + * \brief states for worker threads + */ +enum worker_state { + /*! The worker is either active or idle */ + ALIVE, + /*! + * The worker has been asked to shut down but + * may still be in the process of executing tasks. + * This transition happens when the threadpool needs + * to shrink and needs to kill active threads in order + * to do so. + */ + ZOMBIE, + /*! + * The worker has been asked to shut down. Typically + * only idle threads go to this state directly, but + * active threads may go straight to this state when + * the threadpool is shut down. + */ + DEAD, +}; + +/*! + * A thread that executes threadpool tasks + */ +struct worker_thread { + /*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */ + int id; + /*! Condition used in conjunction with state changes */ + ast_cond_t cond; + /*! Lock used alongside the condition for state changes */ + ast_mutex_t lock; + /*! The actual thread that is executing tasks */ + pthread_t thread; + /*! A pointer to the threadpool. Needed to be able to execute tasks */ + struct ast_threadpool *pool; + /*! The current state of the worker thread */ + enum worker_state state; + /*! A boolean used to determine if an idle thread should become active */ + int wake_up; + /*! Options for this threadpool */ + struct ast_threadpool_options options; +}; + +/* Worker thread forward declarations. See definitions for documentation */ +static int worker_thread_hash(const void *obj, int flags); +static int worker_thread_cmp(void *obj, void *arg, int flags); +static void worker_thread_destroy(void *obj); +static void worker_active(struct worker_thread *worker); +static void *worker_start(void *arg); +static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool); +static int worker_thread_start(struct worker_thread *worker); +static int worker_idle(struct worker_thread *worker); +static void worker_set_state(struct worker_thread *worker, enum worker_state state); +static void worker_shutdown(struct worker_thread *worker); + +/*! + * \brief Notify the threadpool listener that the state has changed. + * + * This notifies the threadpool listener via its state_changed callback. + * \param pool The threadpool whose state has changed + */ +static void threadpool_send_state_changed(struct ast_threadpool *pool) +{ + int active_size = ao2_container_count(pool->active_threads); + int idle_size = ao2_container_count(pool->idle_threads); + + if (pool->listener && pool->listener->callbacks->state_changed) { + pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size); + } +} + +/*! + * \brief Struct used for queued operations involving worker state changes + */ +struct thread_worker_pair { + /*! Threadpool that contains the worker whose state has changed */ + struct ast_threadpool *pool; + /*! Worker whose state has changed */ + struct worker_thread *worker; +}; + +/*! + * \brief Destructor for thread_worker_pair + */ +static void thread_worker_pair_destructor(void *obj) +{ + struct thread_worker_pair *pair = obj; + ao2_ref(pair->worker, -1); +} + +/*! + * \brief Allocate and initialize a thread_worker_pair + * \param pool Threadpool to assign to the thread_worker_pair + * \param worker Worker thread to assign to the thread_worker_pair + */ +static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool *pool, + struct worker_thread *worker) +{ + struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor); + if (!pair) { + return NULL; + } + pair->pool = pool; + ao2_ref(worker, +1); + pair->worker = worker; + return pair; +} + +/*! + * \brief Move a worker thread from the active container to the idle container. + * + * This function is called from the threadpool's control taskprocessor thread. + * \param data A thread_worker_pair containing the threadpool and the worker to move. + * \return 0 + */ +static int queued_active_thread_idle(void *data) +{ + struct thread_worker_pair *pair = data; + + ao2_link(pair->pool->idle_threads, pair->worker); + ao2_unlink(pair->pool->active_threads, pair->worker); + + threadpool_send_state_changed(pair->pool); + + ao2_ref(pair, -1); + return 0; +} + +/*! + * \brief Queue a task to move a thread from the active list to the idle list + * + * This is called by a worker thread when it runs out of tasks to perform and + * goes idle. + * \param pool The threadpool to which the worker belongs + * \param worker The worker thread that has gone idle + */ +static void threadpool_active_thread_idle(struct ast_threadpool *pool, + struct worker_thread *worker) +{ + struct thread_worker_pair *pair; + SCOPED_AO2LOCK(lock, pool); + if (pool->shutting_down) { + return; + } + pair = thread_worker_pair_alloc(pool, worker); + if (!pair) { + return; + } + ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair); +} + +/*! + * \brief Kill a zombie thread + * + * This runs from the threadpool's control taskprocessor thread. + * + * \param data A thread_worker_pair containing the threadpool and the zombie thread + * \return 0 + */ +static int queued_zombie_thread_dead(void *data) +{ + struct thread_worker_pair *pair = data; + + ao2_unlink(pair->pool->zombie_threads, pair->worker); + threadpool_send_state_changed(pair->pool); + + ao2_ref(pair, -1); + return 0; +} + +/*! + * \brief Queue a task to kill a zombie thread + * + * This is called by a worker thread when it acknowledges that it is time for + * it to die. + */ +static void threadpool_zombie_thread_dead(struct ast_threadpool *pool, + struct worker_thread *worker) +{ + struct thread_worker_pair *pair; + SCOPED_AO2LOCK(lock, pool); + if (pool->shutting_down) { + return; + } + pair = thread_worker_pair_alloc(pool, worker); + if (!pair) { + return; + } + ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair); +} + +static int queued_idle_thread_dead(void *data) +{ + struct thread_worker_pair *pair = data; + + ao2_unlink(pair->pool->idle_threads, pair->worker); + threadpool_send_state_changed(pair->pool); + + ao2_ref(pair, -1); + return 0; +} + +static void threadpool_idle_thread_dead(struct ast_threadpool *pool, + struct worker_thread *worker) +{ + struct thread_worker_pair *pair; + SCOPED_AO2LOCK(lock, pool); + if (pool->shutting_down) { + return; + } + pair = thread_worker_pair_alloc(pool, worker); + if (!pair) { + return; + } + ast_taskprocessor_push(pool->control_tps, queued_idle_thread_dead, pair); +} + +/*! + * \brief Execute a task in the threadpool + * + * This is the function that worker threads call in order to execute tasks + * in the threadpool + * + * \param pool The pool to which the tasks belong. + * \retval 0 Either the pool has been shut down or there are no tasks. + * \retval 1 There are still tasks remaining in the pool. + */ +static int threadpool_execute(struct ast_threadpool *pool) +{ + ao2_lock(pool); + if (!pool->shutting_down) { + ao2_unlock(pool); + return ast_taskprocessor_execute(pool->tps); + } + ao2_unlock(pool); + return 0; +} + +/*! + * \brief Destroy a threadpool's components. + * + * This is the destructor called automatically when the threadpool's + * reference count reaches zero. This is not to be confused with + * threadpool_destroy. + * + * By the time this actually gets called, most of the cleanup has already + * been done in the pool. The only thing left to do is to release the + * final reference to the threadpool listener. + * + * \param obj The pool to destroy + */ +static void threadpool_destructor(void *obj) +{ + struct ast_threadpool *pool = obj; + ao2_cleanup(pool->listener); +} + +/* + * \brief Allocate a threadpool + * + * This is implemented as a taskprocessor listener's alloc callback. This + * is because the threadpool exists as the private data on a taskprocessor + * listener. + * + * \param name The name of the threadpool. + * \param options The options the threadpool uses. + * \retval NULL Could not initialize threadpool properly + * \retval non-NULL The newly-allocated threadpool + */ +static void *threadpool_alloc(const char *name, const struct ast_threadpool_options *options) +{ + RAII_VAR(struct ast_threadpool *, pool, + ao2_alloc(sizeof(*pool), threadpool_destructor), ao2_cleanup); + struct ast_str *control_tps_name = ast_str_create(64); + + if (!control_tps_name) { + return NULL; + } + + ast_str_set(&control_tps_name, 0, "%s-control", name); + + pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT); + ast_free(control_tps_name); + if (!pool->control_tps) { + return NULL; + } + pool->active_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp); + if (!pool->active_threads) { + return NULL; + } + pool->idle_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp); + if (!pool->idle_threads) { + return NULL; + } + pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp); + if (!pool->zombie_threads) { + return NULL; + } + pool->options = *options; + + ao2_ref(pool, +1); + return pool; +} + +static int threadpool_tps_start(struct ast_taskprocessor_listener *listener) +{ + return 0; +} + +/*! + * \brief helper used for queued task when tasks are pushed + */ +struct task_pushed_data { + /*! Pool into which a task was pushed */ + struct ast_threadpool *pool; + /*! Indicator of whether the pool had no tasks prior to the new task being added */ + int was_empty; +}; + +/*! + * \brief Allocate and initialize a task_pushed_data + * \param pool The threadpool to set in the task_pushed_data + * \param was_empty The was_empty value to set in the task_pushed_data + * \retval NULL Unable to allocate task_pushed_data + * \retval non-NULL The newly-allocated task_pushed_data + */ +static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool, + int was_empty) +{ + struct task_pushed_data *tpd = ao2_alloc(sizeof(*tpd), NULL); + + if (!tpd) { + return NULL; + } + tpd->pool = pool; + tpd->was_empty = was_empty; + return tpd; +} + +/*! + * \brief Activate idle threads + * + * This function always returns CMP_MATCH because all workers that this + * function acts on need to be seen as matches so they are unlinked from the + * list of idle threads. + * + * Called as an ao2_callback in the threadpool's control taskprocessor thread. + * \param obj The worker to activate + * \param arg The pool where the worker belongs + * \retval CMP_MATCH + */ +static int activate_thread(void *obj, void *arg, int flags) +{ + struct worker_thread *worker = obj; + struct ast_threadpool *pool = arg; + + if (!ao2_link(pool->active_threads, worker)) { + /* If we can't link the idle thread into the active container, then + * we'll just leave the thread idle and not wake it up. + */ + ast_log(LOG_WARNING, "Failed to activate thread %d. Remaining idle\n", + worker->id); + return 0; + } + worker_set_state(worker, ALIVE); + return CMP_MATCH; +} + +/*! + * \brief Add threads to the threadpool + * + * This function is called from the threadpool's control taskprocessor thread. + * \param pool The pool that is expanding + * \delta The number of threads to add to the pool + */ +static void grow(struct ast_threadpool *pool, int delta) +{ + int i; + + int current_size = ao2_container_count(pool->active_threads) + + ao2_container_count(pool->idle_threads); + + if (pool->options.max_size && current_size + delta > pool->options.max_size) { + delta = pool->options.max_size - current_size; + } + + ast_debug(3, "Increasing threadpool %s's size by %d\n", + ast_taskprocessor_name(pool->tps), delta); + + for (i = 0; i < delta; ++i) { + struct worker_thread *worker = worker_thread_alloc(pool); + if (!worker) { + return; + } + if (ao2_link(pool->active_threads, worker)) { + if (worker_thread_start(worker)) { + ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id); + ao2_unlink(pool->active_threads, worker); + } + } else { + ast_log(LOG_WARNING, "Failed to activate worker thread %d. Destroying.\n", worker->id); + } + ao2_ref(worker, -1); + } +} + +/*! + * \brief Queued task called when tasks are pushed into the threadpool + * + * This function first calls into the threadpool's listener to let it know + * that a task has been pushed. It then wakes up all idle threads and moves + * them into the active thread container. + * \param data A task_pushed_data + * \return 0 + */ +static int queued_task_pushed(void *data) +{ + struct task_pushed_data *tpd = data; + struct ast_threadpool *pool = tpd->pool; + int was_empty = tpd->was_empty; + int state_changed; + + if (pool->listener && pool->listener->callbacks->task_pushed) { + pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty); + } + if (ao2_container_count(pool->idle_threads) == 0) { + if (pool->options.auto_increment > 0) { + grow(pool, pool->options.auto_increment); + state_changed = 1; + } + } else { + ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA, + activate_thread, pool); + state_changed = 1; + } + if (state_changed) { + threadpool_send_state_changed(pool); + } + ao2_ref(tpd, -1); + return 0; +} + +/*! + * \brief Taskprocessor listener callback called when a task is added + * + * The threadpool uses this opportunity to queue a task on its control taskprocessor + * in order to activate idle threads and notify the threadpool listener that the + * task has been pushed. + * \param listener The taskprocessor listener. The threadpool is the listener's private data + * \param was_empty True if the taskprocessor was empty prior to the task being pushed + */ +static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener, + int was_empty) +{ + struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener); + struct task_pushed_data *tpd; + SCOPED_AO2LOCK(lock, pool); + + if (pool->shutting_down) { + return; + } + tpd = task_pushed_data_alloc(pool, was_empty); + if (!tpd) { + return; + } + + ast_taskprocessor_push(pool->control_tps, queued_task_pushed, tpd); +} + +/*! + * \brief Queued task that handles the case where the threadpool's taskprocessor is emptied + * + * This simply lets the threadpool's listener know that the threadpool is devoid of tasks + * \param data The pool that has become empty + * \return 0 + */ +static int queued_emptied(void *data) +{ + struct ast_threadpool *pool = data; + + /* We already checked for existence of this callback when this was queued */ + pool->listener->callbacks->emptied(pool, pool->listener); + return 0; +} + +/*! + * \brief Taskprocessor listener emptied callback + * + * The threadpool queues a task to let the threadpool listener know that + * the threadpool no longer contains any tasks. + * \param listener The taskprocessor listener. The threadpool is the listener's private data. + */ +static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener) +{ + struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener); + SCOPED_AO2LOCK(lock, pool); + + if (pool->shutting_down) { + return; + } + + if (pool->listener && pool->listener->callbacks->emptied) { + ast_taskprocessor_push(pool->control_tps, queued_emptied, pool); + } +} + +/*! + * \brief Taskprocessor listener shutdown callback + * + * The threadpool will shut down and destroy all of its worker threads when + * this is called back. By the time this gets called, the taskprocessor's + * control taskprocessor has already been destroyed. Therefore there is no risk + * in outright destroying the worker threads here. + * \param listener The taskprocessor listener. The threadpool is the listener's private data. + */ +static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener) +{ + struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener); + + if (pool->listener && pool->listener->callbacks->shutdown) { + pool->listener->callbacks->shutdown(pool->listener); + } + ao2_cleanup(pool->active_threads); + ao2_cleanup(pool->idle_threads); + ao2_cleanup(pool->zombie_threads); + ao2_cleanup(pool); +} + +/*! + * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor + */ +static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = { + .start = threadpool_tps_start, + .task_pushed = threadpool_tps_task_pushed, + .emptied = threadpool_tps_emptied, + .shutdown = threadpool_tps_shutdown, +}; + +/*! + * \brief ao2 callback to kill a set number of threads. + * + * Threads will be unlinked from the container as long as the + * counter has not reached zero. The counter is decremented with + * each thread that is removed. + * \param obj The worker thread up for possible destruction + * \param arg The counter + * \param flags Unused + * \retval CMP_MATCH The counter has not reached zero, so this flag should be removed. + * \retval CMP_STOP The counter has reached zero so no more threads should be removed. + */ +static int kill_threads(void *obj, void *arg, int flags) +{ + int *num_to_kill = arg; + + if (*num_to_kill > 0) { + --(*num_to_kill); + return CMP_MATCH; + } else { + return CMP_STOP; + } +} + +/*! + * \brief ao2 callback to zombify a set number of threads. + * + * Threads will be zombified as long as as the counter has not reached + * zero. The counter is decremented with each thread that is zombified. + * + * Zombifying a thread involves removing it from its current container, + * adding it to the zombie container, and changing the state of the + * worker to a zombie + * + * This callback is called from the threadpool control taskprocessor thread. + * + * \param obj The worker thread that may be zombified + * \param arg The pool to which the worker belongs + * \param data The counter + * \param flags Unused + * \retval CMP_MATCH The zombified thread should be removed from its current container + * \retval CMP_STOP Stop attempting to zombify threads + */ +static int zombify_threads(void *obj, void *arg, void *data, int flags) +{ + struct worker_thread *worker = obj; + struct ast_threadpool *pool = arg; + int *num_to_zombify = data; + + if ((*num_to_zombify)-- > 0) { + if (!ao2_link(pool->zombie_threads, worker)) { + ast_log(LOG_WARNING, "Failed to zombify active thread %d. Thread will remain active\n", worker->id); + return 0; + } + worker_set_state(worker, ZOMBIE); + return CMP_MATCH; + } else { + return CMP_STOP; + } +} + +/*! + * \brief Remove threads from the threadpool + * + * The preference is to kill idle threads. However, if there are + * more threads to remove than there are idle threads, then active + * threads will be zombified instead. + * + * This function is called from the threadpool control taskprocessor thread. + * + * \param pool The threadpool to remove threads from + * \param delta The number of threads to remove + */ +static void shrink(struct ast_threadpool *pool, int delta) +{ + /* + * Preference is to kill idle threads, but + * we'll move on to deactivating active threads + * if we have to + */ + int idle_threads = ao2_container_count(pool->idle_threads); + int idle_threads_to_kill = MIN(delta, idle_threads); + int active_threads_to_zombify = delta - idle_threads_to_kill; + + ast_debug(3, "Destroying %d idle threads in threadpool %s\n", idle_threads_to_kill, + ast_taskprocessor_name(pool->tps)); + + ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE, + kill_threads, &idle_threads_to_kill); + + ast_debug(3, "Destroying %d active threads in threadpool %s\n", active_threads_to_zombify, + ast_taskprocessor_name(pool->tps)); + + ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE, + zombify_threads, pool, &active_threads_to_zombify); +} + +/*! + * \brief Helper struct used for queued operations that change the size of the threadpool + */ +struct set_size_data { + /*! The pool whose size is to change */ + struct ast_threadpool *pool; + /*! The requested new size of the pool */ + unsigned int size; +}; + +/*! + * \brief Allocate and initialize a set_size_data + * \param pool The pool for the set_size_data + * \param size The size to store in the set_size_data + */ +static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool, + unsigned int size) +{ + struct set_size_data *ssd = ao2_alloc(sizeof(*ssd), NULL); + if (!ssd) { + return NULL; + } + + ssd->pool = pool; + ssd->size = size; + return ssd; +} + +/*! + * \brief Change the size of the threadpool + * + * This can either result in shrinking or growing the threadpool depending + * on the new desired size and the current size. + * + * This function is run from the threadpool control taskprocessor thread + * + * \param data A set_size_data used for determining how to act + * \return 0 + */ +static int queued_set_size(void *data) +{ + struct set_size_data *ssd = data; + struct ast_threadpool *pool = ssd->pool; + unsigned int num_threads = ssd->size; + + /* We don't count zombie threads as being "live" when potentially resizing */ + unsigned int current_size = ao2_container_count(pool->active_threads) + + ao2_container_count(pool->idle_threads); + + if (current_size == num_threads) { + ast_log(LOG_NOTICE, "Not changing threadpool size since new size %u is the same as current %u\n", + num_threads, current_size); + return 0; + } + + if (current_size < num_threads) { + grow(pool, num_threads - current_size); + } else { + shrink(pool, current_size - num_threads); + } + + threadpool_send_state_changed(pool); + ao2_ref(ssd, -1); + return 0; +} + +void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size) +{ + struct set_size_data *ssd; + SCOPED_AO2LOCK(lock, pool); + if (pool->shutting_down) { + return; + } + + ssd = set_size_data_alloc(pool, size); + if (!ssd) { + return; + } + + ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd); +} + +struct ast_threadpool_listener *ast_threadpool_listener_alloc( + const struct ast_threadpool_listener_callbacks *callbacks, void *user_data) +{ + struct ast_threadpool_listener *listener = ao2_alloc(sizeof(*listener), NULL); + if (!listener) { + return NULL; + } + listener->callbacks = callbacks; + listener->user_data = user_data; + return listener; +} + +void *ast_threadpool_listener_get_user_data(const struct ast_threadpool_listener *listener) +{ + return listener->user_data; +} + +struct pool_options_pair { + struct ast_threadpool *pool; + struct ast_threadpool_options options; +}; + +struct ast_threadpool *ast_threadpool_create(const char *name, + struct ast_threadpool_listener *listener, + const struct ast_threadpool_options *options) +{ + struct ast_taskprocessor *tps; + RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup); + RAII_VAR(struct ast_threadpool *, pool, threadpool_alloc(name, options), ao2_cleanup); + + if (!pool) { + return NULL; + } + + tps_listener = ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks, pool); + if (!tps_listener) { + return NULL; + } + + if (options->version != AST_THREADPOOL_OPTIONS_VERSION) { + ast_log(LOG_WARNING, "Incompatible version of threadpool options in use.\n"); + return NULL; + } + + tps = ast_taskprocessor_create_with_listener(name, tps_listener); + if (!tps) { + return NULL; + } + + pool->tps = tps; + if (listener) { + ao2_ref(listener, +1); + pool->listener = listener; + } + ast_threadpool_set_size(pool, pool->options.initial_size); + ao2_ref(pool, +1); + return pool; +} + +int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data) +{ + SCOPED_AO2LOCK(lock, pool); + if (!pool->shutting_down) { + return ast_taskprocessor_push(pool->tps, task, data); + } + return 0; +} + +void ast_threadpool_shutdown(struct ast_threadpool *pool) +{ + if (!pool) { + return; + } + /* Shut down the taskprocessors and everything else just + * takes care of itself via the taskprocessor callbacks + */ + ao2_lock(pool); + pool->shutting_down = 1; + ao2_unlock(pool); + ast_taskprocessor_unreference(pool->control_tps); + ast_taskprocessor_unreference(pool->tps); +} + +/*! + * A monotonically increasing integer used for worker + * thread identification. + */ +static int worker_id_counter; + +static int worker_thread_hash(const void *obj, int flags) +{ + const struct worker_thread *worker = obj; + + return worker->id; +} + +static int worker_thread_cmp(void *obj, void *arg, int flags) +{ + struct worker_thread *worker1 = obj; + struct worker_thread *worker2 = arg; + + return worker1->id == worker2->id ? CMP_MATCH : 0; +} + +/*! + * \brief shut a worker thread down + * + * Set the worker dead and then wait for its thread + * to finish executing. + * + * \param worker The worker thread to shut down + */ +static void worker_shutdown(struct worker_thread *worker) +{ + worker_set_state(worker, DEAD); + if (worker->thread != AST_PTHREADT_NULL) { + pthread_join(worker->thread, NULL); + worker->thread = AST_PTHREADT_NULL; + } +} + +/*! + * \brief Worker thread destructor + * + * Called automatically when refcount reaches 0. Shuts + * down the worker thread and destroys its component + * parts + */ +static void worker_thread_destroy(void *obj) +{ + struct worker_thread *worker = obj; + ast_debug(3, "Destroying worker thread %d\n", worker->id); + worker_shutdown(worker); + ast_mutex_destroy(&worker->lock); + ast_cond_destroy(&worker->cond); +} + +/*! + * \brief start point for worker threads + * + * Worker threads start in the active state but may + * immediately go idle if there is no work to be + * done + * + * \param arg The worker thread + * \retval NULL + */ +static void *worker_start(void *arg) +{ + struct worker_thread *worker = arg; + + worker_active(worker); + return NULL; +} + +/*! + * \brief Allocate and initialize a new worker thread + * + * This will create, initialize, and start the thread. + * + * \param pool The threadpool to which the worker will be added + * \retval NULL Failed to allocate or start the worker thread + * \retval non-NULL The newly-created worker thread + */ +static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool) +{ + struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy); + if (!worker) { + return NULL; + } + worker->id = ast_atomic_fetchadd_int(&worker_id_counter, 1); + ast_mutex_init(&worker->lock); + ast_cond_init(&worker->cond, NULL); + worker->pool = pool; + worker->thread = AST_PTHREADT_NULL; + worker->state = ALIVE; + worker->options = pool->options; + return worker; +} + +static int worker_thread_start(struct worker_thread *worker) +{ + return ast_pthread_create(&worker->thread, NULL, worker_start, worker); +} + +/*! + * \brief Active loop for worker threads + * + * The worker will stay in this loop for its lifetime, + * executing tasks as they become available. If there + * are no tasks currently available, then the thread + * will go idle. + * + * \param worker The worker thread executing tasks. + */ +static void worker_active(struct worker_thread *worker) +{ + int alive = 1; + while (alive) { + if (!threadpool_execute(worker->pool)) { + alive = worker_idle(worker); + } + } + + /* Reaching this portion means the thread is + * on death's door. It may have been killed while + * it was idle, in which case it can just die + * peacefully. If it's a zombie, though, then + * it needs to let the pool know so + * that the thread can be removed from the + * list of zombie threads. + */ + if (worker->state == ZOMBIE) { + threadpool_zombie_thread_dead(worker->pool, worker); + } +} + +/*! + * \brief Idle function for worker threads + * + * The worker waits here until it gets told by the threadpool + * to wake up. + * + * \param worker The idle worker + * \retval 0 The thread is being woken up so that it can conclude. + * \retval non-zero The thread is being woken up to do more work. + */ +static int worker_idle(struct worker_thread *worker) +{ + struct timeval start = ast_tvnow(); + struct timespec end = { + .tv_sec = start.tv_sec + worker->options.idle_timeout, + .tv_nsec = start.tv_usec * 1000, + }; + SCOPED_MUTEX(lock, &worker->lock); + if (worker->state != ALIVE) { + return 0; + } + threadpool_active_thread_idle(worker->pool, worker); + while (!worker->wake_up) { + if (worker->options.idle_timeout <= 0) { + ast_cond_wait(&worker->cond, lock); + } else if (ast_cond_timedwait(&worker->cond, lock, &end) == ETIMEDOUT) { + break; + } + } + + if (!worker->wake_up) { + ast_debug(1, "Worker thread idle timeout reached. Dying.\n"); + threadpool_idle_thread_dead(worker->pool, worker); + worker->state = DEAD; + } + worker->wake_up = 0; + return worker->state == ALIVE; +} + +/*! + * \brief Change a worker's state + * + * The threadpool calls into this function in order to let a worker know + * how it should proceed. + */ +static void worker_set_state(struct worker_thread *worker, enum worker_state state) +{ + SCOPED_MUTEX(lock, &worker->lock); + worker->state = state; + worker->wake_up = 1; + ast_cond_signal(&worker->cond); +} + diff --git a/tests/test_taskprocessor.c b/tests/test_taskprocessor.c new file mode 100644 index 0000000000000000000000000000000000000000..e370dd78f356918bd77ae1e5fee63aca5f38d806 --- /dev/null +++ b/tests/test_taskprocessor.c @@ -0,0 +1,469 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2012-2013, Digium, Inc. + * + * Mark Michelson <mmichelson@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! + * \file + * \brief taskprocessor unit tests + * + * \author Mark Michelson <mmichelson@digium.com> + * + */ + +/*** MODULEINFO + <depend>TEST_FRAMEWORK</depend> + <support_level>core</support_level> + ***/ + +#include "asterisk.h" + +#include "asterisk/test.h" +#include "asterisk/taskprocessor.h" +#include "asterisk/module.h" +#include "asterisk/astobj2.h" + +/*! + * \brief userdata associated with baseline taskprocessor test + */ +struct task_data { + /* Condition used to signal to queuing thread that task was executed */ + ast_cond_t cond; + /* Lock protecting the condition */ + ast_mutex_t lock; + /*! Boolean indicating that the task was run */ + int task_complete; +}; + +/*! + * \brief Queued task for baseline test. + * + * The task simply sets a boolean to indicate the + * task has been run and then signals a condition + * saying it's complete + */ +static int task(void *data) +{ + struct task_data *task_data = data; + SCOPED_MUTEX(lock, &task_data->lock); + task_data->task_complete = 1; + ast_cond_signal(&task_data->cond); + return 0; +} + +/*! + * \brief Baseline test for default taskprocessor + * + * This test ensures that when a task is added to a taskprocessor that + * has been allocated with a default listener that the task gets executed + * as expected + */ +AST_TEST_DEFINE(default_taskprocessor) +{ + struct ast_taskprocessor *tps; + struct task_data task_data; + struct timeval start; + struct timespec ts; + enum ast_test_result_state res = AST_TEST_PASS; + int timedwait_res; + + switch (cmd) { + case TEST_INIT: + info->name = "default_taskprocessor"; + info->category = "/main/taskprocessor/"; + info->summary = "Test of default taskproccesor"; + info->description = + "Ensures that a queued task gets executed."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT); + + if (!tps) { + ast_test_status_update(test, "Unable to create test taskprocessor\n"); + return AST_TEST_FAIL; + } + + start = ast_tvnow(); + + ts.tv_sec = start.tv_sec + 30; + ts.tv_nsec = start.tv_usec * 1000; + + ast_cond_init(&task_data.cond, NULL); + ast_mutex_init(&task_data.lock); + task_data.task_complete = 0; + + ast_taskprocessor_push(tps, task, &task_data); + ast_mutex_lock(&task_data.lock); + while (!task_data.task_complete) { + timedwait_res = ast_cond_timedwait(&task_data.cond, &task_data.lock, &ts); + if (timedwait_res == ETIMEDOUT) { + break; + } + } + ast_mutex_unlock(&task_data.lock); + + if (!task_data.task_complete) { + ast_test_status_update(test, "Queued task did not execute!\n"); + res = AST_TEST_FAIL; + goto test_end; + } + +test_end: + tps = ast_taskprocessor_unreference(tps); + ast_mutex_destroy(&task_data.lock); + ast_cond_destroy(&task_data.cond); + return res; +} + +#define NUM_TASKS 20000 + +/*! + * \brief Relevant data associated with taskprocessor load test + */ +static struct load_task_data { + /*! Condition used to indicate a task has completed executing */ + ast_cond_t cond; + /*! Lock used to protect the condition */ + ast_mutex_t lock; + /*! Counter of the number of completed tasks */ + int tasks_completed; + /*! Storage for task-specific data */ + int task_rand[NUM_TASKS]; +} load_task_results; + +/*! + * \brief a queued task to be used in the taskprocessor load test + * + * The task increments the number of tasks executed and puts the passed-in + * data into the next slot in the array of random data. + */ +static int load_task(void *data) +{ + int *randdata = data; + SCOPED_MUTEX(lock, &load_task_results.lock); + load_task_results.task_rand[load_task_results.tasks_completed++] = *randdata; + ast_cond_signal(&load_task_results.cond); + return 0; +} + +/*! + * \brief Load test for taskprocessor with default listener + * + * This test queues a large number of tasks, each with random data associated. + * The test ensures that all of the tasks are run and that the tasks are executed + * in the same order that they were queued + */ +AST_TEST_DEFINE(default_taskprocessor_load) +{ + struct ast_taskprocessor *tps; + struct timeval start; + struct timespec ts; + enum ast_test_result_state res = AST_TEST_PASS; + int timedwait_res; + int i; + int rand_data[NUM_TASKS]; + + switch (cmd) { + case TEST_INIT: + info->name = "default_taskprocessor_load"; + info->category = "/main/taskprocessor/"; + info->summary = "Load test of default taskproccesor"; + info->description = + "Ensure that a large number of queued tasks are executed in the proper order."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT); + + if (!tps) { + ast_test_status_update(test, "Unable to create test taskprocessor\n"); + return AST_TEST_FAIL; + } + + start = ast_tvnow(); + + ts.tv_sec = start.tv_sec + 60; + ts.tv_nsec = start.tv_usec * 1000; + + ast_cond_init(&load_task_results.cond, NULL); + ast_mutex_init(&load_task_results.lock); + load_task_results.tasks_completed = 0; + + for (i = 0; i < NUM_TASKS; ++i) { + rand_data[i] = ast_random(); + ast_taskprocessor_push(tps, load_task, &rand_data[i]); + } + + ast_mutex_lock(&load_task_results.lock); + while (load_task_results.tasks_completed < NUM_TASKS) { + timedwait_res = ast_cond_timedwait(&load_task_results.cond, &load_task_results.lock, &ts); + if (timedwait_res == ETIMEDOUT) { + break; + } + } + ast_mutex_unlock(&load_task_results.lock); + + if (load_task_results.tasks_completed != NUM_TASKS) { + ast_test_status_update(test, "Unexpected number of tasks executed. Expected %d but got %d\n", + NUM_TASKS, load_task_results.tasks_completed); + res = AST_TEST_FAIL; + goto test_end; + } + + for (i = 0; i < NUM_TASKS; ++i) { + if (rand_data[i] != load_task_results.task_rand[i]) { + ast_test_status_update(test, "Queued tasks did not execute in order\n"); + res = AST_TEST_FAIL; + goto test_end; + } + } + +test_end: + tps = ast_taskprocessor_unreference(tps); + ast_mutex_destroy(&load_task_results.lock); + ast_cond_destroy(&load_task_results.cond); + return res; +} + +/*! + * \brief Private data for the test taskprocessor listener + */ +struct test_listener_pvt { + /* Counter of number of tasks pushed to the queue */ + int num_pushed; + /* Counter of number of times the queue was emptied */ + int num_emptied; + /* Counter of number of times that a pushed task occurred on an empty queue */ + int num_was_empty; + /* Boolean indicating whether the shutdown callback was called */ + int shutdown; +}; + +/*! + * \brief test taskprocessor listener's alloc callback + */ +static void *test_listener_pvt_alloc(void) +{ + struct test_listener_pvt *pvt; + + pvt = ast_calloc(1, sizeof(*pvt)); + return pvt; +} + +/*! + * \brief test taskprocessor listener's start callback + */ +static int test_start(struct ast_taskprocessor_listener *listener) +{ + return 0; +} + +/*! + * \brief test taskprocessor listener's task_pushed callback + * + * Adjusts private data's stats as indicated by the parameters. + */ +static void test_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty) +{ + struct test_listener_pvt *pvt = ast_taskprocessor_listener_get_user_data(listener); + ++pvt->num_pushed; + if (was_empty) { + ++pvt->num_was_empty; + } +} + +/*! + * \brief test taskprocessor listener's emptied callback. + */ +static void test_emptied(struct ast_taskprocessor_listener *listener) +{ + struct test_listener_pvt *pvt = ast_taskprocessor_listener_get_user_data(listener); + ++pvt->num_emptied; +} + +/*! + * \brief test taskprocessor listener's shutdown callback. + */ +static void test_shutdown(struct ast_taskprocessor_listener *listener) +{ + struct test_listener_pvt *pvt = ast_taskprocessor_listener_get_user_data(listener); + pvt->shutdown = 1; +} + +static const struct ast_taskprocessor_listener_callbacks test_callbacks = { + .start = test_start, + .task_pushed = test_task_pushed, + .emptied = test_emptied, + .shutdown = test_shutdown, +}; + +/*! + * \brief Queued task for taskprocessor listener test. + * + * Does nothing. + */ +static int listener_test_task(void *ignore) +{ + return 0; +} + +/*! + * \brief helper to ensure that statistics the listener is keeping are what we expect + * + * \param test The currently-running test + * \param pvt The private data for the taskprocessor listener + * \param num_pushed The expected current number of tasks pushed to the processor + * \param num_emptied The expected current number of times the taskprocessor has become empty + * \param num_was_empty The expected current number of times that tasks were pushed to an empty taskprocessor + * \retval -1 Stats were not as expected + * \retval 0 Stats were as expected + */ +static int check_stats(struct ast_test *test, const struct test_listener_pvt *pvt, int num_pushed, int num_emptied, int num_was_empty) +{ + if (pvt->num_pushed != num_pushed) { + ast_test_status_update(test, "Unexpected number of tasks pushed. Expected %d but got %d\n", + num_pushed, pvt->num_pushed); + return -1; + } + + if (pvt->num_emptied != num_emptied) { + ast_test_status_update(test, "Unexpected number of empties. Expected %d but got %d\n", + num_emptied, pvt->num_emptied); + return -1; + } + + if (pvt->num_was_empty != num_was_empty) { + ast_test_status_update(test, "Unexpected number of empties. Expected %d but got %d\n", + num_was_empty, pvt->num_emptied); + return -1; + } + + return 0; +} + +/*! + * \brief Test for a taskprocessor with custom listener. + * + * This test pushes tasks to a taskprocessor with a custom listener, executes the taskss, + * and destroys the taskprocessor. + * + * The test ensures that the listener's callbacks are called when expected and that the data + * being passed in is accurate. + */ +AST_TEST_DEFINE(taskprocessor_listener) +{ + struct ast_taskprocessor *tps = NULL; + struct ast_taskprocessor_listener *listener = NULL; + struct test_listener_pvt *pvt = NULL; + enum ast_test_result_state res = AST_TEST_PASS; + + switch (cmd) { + case TEST_INIT: + info->name = "taskprocessor_listener"; + info->category = "/main/taskprocessor/"; + info->summary = "Test of taskproccesor listeners"; + info->description = + "Ensures that listener callbacks are called when expected."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + pvt = test_listener_pvt_alloc(); + if (!pvt) { + ast_test_status_update(test, "Unable to allocate test taskprocessor listener user data\n"); + return AST_TEST_FAIL; + } + + listener = ast_taskprocessor_listener_alloc(&test_callbacks, pvt); + if (!listener) { + ast_test_status_update(test, "Unable to allocate test taskprocessor listener\n"); + res = AST_TEST_FAIL; + goto test_exit; + } + + tps = ast_taskprocessor_create_with_listener("test_listener", listener); + if (!tps) { + ast_test_status_update(test, "Unable to allocate test taskprocessor\n"); + res = AST_TEST_FAIL; + goto test_exit; + } + + ast_taskprocessor_push(tps, listener_test_task, NULL); + + if (check_stats(test, pvt, 1, 0, 1) < 0) { + res = AST_TEST_FAIL; + goto test_exit; + } + + ast_taskprocessor_push(tps, listener_test_task, NULL); + + if (check_stats(test, pvt, 2, 0, 1) < 0) { + res = AST_TEST_FAIL; + goto test_exit; + } + + ast_taskprocessor_execute(tps); + + if (check_stats(test, pvt, 2, 0, 1) < 0) { + res = AST_TEST_FAIL; + goto test_exit; + } + + ast_taskprocessor_execute(tps); + + if (check_stats(test, pvt, 2, 1, 1) < 0) { + res = AST_TEST_FAIL; + goto test_exit; + } + + tps = ast_taskprocessor_unreference(tps); + + if (!pvt->shutdown) { + res = AST_TEST_FAIL; + goto test_exit; + } + +test_exit: + ao2_cleanup(listener); + /* This is safe even if tps is NULL */ + ast_taskprocessor_unreference(tps); + ast_free(pvt); + return res; +} + +static int unload_module(void) +{ + ast_test_unregister(default_taskprocessor); + ast_test_unregister(default_taskprocessor_load); + ast_test_unregister(taskprocessor_listener); + return 0; +} + +static int load_module(void) +{ + ast_test_register(default_taskprocessor); + ast_test_register(default_taskprocessor_load); + ast_test_register(taskprocessor_listener); + return AST_MODULE_LOAD_SUCCESS; +} + +AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "taskprocessor test module"); diff --git a/tests/test_threadpool.c b/tests/test_threadpool.c new file mode 100644 index 0000000000000000000000000000000000000000..712b8581b59e5204f66137de8f2dca95e2a0468e --- /dev/null +++ b/tests/test_threadpool.c @@ -0,0 +1,1430 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2012-2013, Digium, Inc. + * + * Mark Michelson <mmichelson@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! + * \file + * \brief threadpool unit tests + * + * \author Mark Michelson <mmichelson@digium.com> + * + */ + +/*** MODULEINFO + <depend>TEST_FRAMEWORK</depend> + <support_level>core</support_level> + ***/ + +#include "asterisk.h" + +#include "asterisk/test.h" +#include "asterisk/threadpool.h" +#include "asterisk/module.h" +#include "asterisk/lock.h" +#include "asterisk/astobj2.h" +#include "asterisk/logger.h" + +struct test_listener_data { + int num_active; + int num_idle; + int task_pushed; + int num_tasks; + int empty_notice; + int was_empty; + ast_mutex_t lock; + ast_cond_t cond; +}; + +static struct test_listener_data *test_alloc(void) +{ + struct test_listener_data *tld = ast_calloc(1, sizeof(*tld)); + if (!tld) { + return NULL; + } + ast_mutex_init(&tld->lock); + ast_cond_init(&tld->cond, NULL); + return tld; +} + +static void test_state_changed(struct ast_threadpool *pool, + struct ast_threadpool_listener *listener, + int active_threads, + int idle_threads) +{ + struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener); + SCOPED_MUTEX(lock, &tld->lock); + tld->num_active = active_threads; + tld->num_idle = idle_threads; + ast_log(LOG_NOTICE, "Thread state: %d active, %d idle\n", tld->num_active, tld->num_idle); + ast_cond_signal(&tld->cond); +} + +static void test_task_pushed(struct ast_threadpool *pool, + struct ast_threadpool_listener *listener, + int was_empty) +{ + struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener); + SCOPED_MUTEX(lock, &tld->lock); + tld->task_pushed = 1; + ++tld->num_tasks; + tld->was_empty = was_empty; + ast_cond_signal(&tld->cond); +} + +static void test_emptied(struct ast_threadpool *pool, + struct ast_threadpool_listener *listener) +{ + struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener); + SCOPED_MUTEX(lock, &tld->lock); + tld->empty_notice = 1; + ast_cond_signal(&tld->cond); +} + +static void test_shutdown(struct ast_threadpool_listener *listener) +{ + struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener); + ast_cond_destroy(&tld->cond); + ast_mutex_destroy(&tld->lock); +} + +static const struct ast_threadpool_listener_callbacks test_callbacks = { + .state_changed = test_state_changed, + .task_pushed = test_task_pushed, + .emptied = test_emptied, + .shutdown = test_shutdown, +}; + +struct simple_task_data { + int task_executed; + ast_mutex_t lock; + ast_cond_t cond; +}; + +static struct simple_task_data *simple_task_data_alloc(void) +{ + struct simple_task_data *std = ast_calloc(1, sizeof(*std)); + + if (!std) { + return NULL; + } + ast_mutex_init(&std->lock); + ast_cond_init(&std->cond, NULL); + return std; +} + +static int simple_task(void *data) +{ + struct simple_task_data *std = data; + SCOPED_MUTEX(lock, &std->lock); + std->task_executed = 1; + ast_cond_signal(&std->cond); + return 0; +} + +static enum ast_test_result_state wait_until_thread_state(struct ast_test *test, struct test_listener_data *tld, int num_active, int num_idle) +{ + struct timeval start = ast_tvnow(); + struct timespec end = { + .tv_sec = start.tv_sec + 5, + .tv_nsec = start.tv_usec * 1000 + }; + enum ast_test_result_state res = AST_TEST_PASS; + SCOPED_MUTEX(lock, &tld->lock); + + while (!(tld->num_active == num_active && tld->num_idle == num_idle)) { + if (ast_cond_timedwait(&tld->cond, &tld->lock, &end) == ETIMEDOUT) { + break; + } + } + + if (tld->num_active != num_active && tld->num_idle != num_idle) { + ast_test_status_update(test, "Number of active threads and idle threads not what was expected.\n"); + ast_test_status_update(test, "Expected %d active threads but got %d\n", num_active, tld->num_active); + ast_test_status_update(test, "Expected %d idle threads but got %d\n", num_idle, tld->num_idle); + res = AST_TEST_FAIL; + } + + return res; +} + +static void wait_for_task_pushed(struct ast_threadpool_listener *listener) +{ + struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener); + struct timeval start = ast_tvnow(); + struct timespec end = { + .tv_sec = start.tv_sec + 5, + .tv_nsec = start.tv_usec * 1000 + }; + SCOPED_MUTEX(lock, &tld->lock); + + while (!tld->task_pushed) { + if (ast_cond_timedwait(&tld->cond, lock, &end) == ETIMEDOUT) { + break; + } + } +} + +static enum ast_test_result_state wait_for_completion(struct ast_test *test, struct simple_task_data *std) +{ + struct timeval start = ast_tvnow(); + struct timespec end = { + .tv_sec = start.tv_sec + 5, + .tv_nsec = start.tv_usec * 1000 + }; + enum ast_test_result_state res = AST_TEST_PASS; + SCOPED_MUTEX(lock, &std->lock); + + while (!std->task_executed) { + if (ast_cond_timedwait(&std->cond, lock, &end) == ETIMEDOUT) { + break; + } + } + + if (!std->task_executed) { + ast_test_status_update(test, "Task execution did not occur\n"); + res = AST_TEST_FAIL; + } + return res; +} + +static enum ast_test_result_state wait_for_empty_notice(struct ast_test *test, struct test_listener_data *tld) +{ + struct timeval start = ast_tvnow(); + struct timespec end = { + .tv_sec = start.tv_sec + 5, + .tv_nsec = start.tv_usec * 1000 + }; + enum ast_test_result_state res = AST_TEST_PASS; + SCOPED_MUTEX(lock, &tld->lock); + + while (!tld->empty_notice) { + if (ast_cond_timedwait(&tld->cond, lock, &end) == ETIMEDOUT) { + break; + } + } + + if (!tld->empty_notice) { + ast_test_status_update(test, "Test listener not notified that threadpool is empty\n"); + res = AST_TEST_FAIL; + } + + return res; +} + +static enum ast_test_result_state listener_check( + struct ast_test *test, + struct ast_threadpool_listener *listener, + int task_pushed, + int was_empty, + int num_tasks, + int num_active, + int num_idle, + int empty_notice) +{ + struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener); + enum ast_test_result_state res = AST_TEST_PASS; + + if (tld->task_pushed != task_pushed) { + ast_test_status_update(test, "Expected task %sto be pushed, but it was%s\n", + task_pushed ? "" : "not ", tld->task_pushed ? "" : " not"); + res = AST_TEST_FAIL; + } + if (tld->was_empty != was_empty) { + ast_test_status_update(test, "Expected %sto be empty, but it was%s\n", + was_empty ? "" : "not ", tld->was_empty ? "" : " not"); + res = AST_TEST_FAIL; + } + if (tld->num_tasks!= num_tasks) { + ast_test_status_update(test, "Expected %d tasks to be pushed, but got %d\n", + num_tasks, tld->num_tasks); + res = AST_TEST_FAIL; + } + if (tld->num_active != num_active) { + ast_test_status_update(test, "Expected %d active threads, but got %d\n", + num_active, tld->num_active); + res = AST_TEST_FAIL; + } + if (tld->num_idle != num_idle) { + ast_test_status_update(test, "Expected %d idle threads, but got %d\n", + num_idle, tld->num_idle); + res = AST_TEST_FAIL; + } + if (tld->empty_notice != empty_notice) { + ast_test_status_update(test, "Expected %s empty notice, but got %s\n", + was_empty ? "an" : "no", tld->task_pushed ? "one" : "none"); + res = AST_TEST_FAIL; + } + + return res; +} + +AST_TEST_DEFINE(threadpool_push) +{ + struct ast_threadpool *pool = NULL; + struct ast_threadpool_listener *listener = NULL; + struct simple_task_data *std = NULL; + struct test_listener_data *tld = NULL; + enum ast_test_result_state res = AST_TEST_FAIL; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .initial_size = 0, + .max_size = 0, + }; + + switch (cmd) { + case TEST_INIT: + info->name = "push"; + info->category = "/main/threadpool/"; + info->summary = "Test task"; + info->description = + "Basic threadpool test"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + tld = test_alloc(); + if (!tld) { + return AST_TEST_FAIL; + } + + listener = ast_threadpool_listener_alloc(&test_callbacks, tld); + if (!listener) { + goto end; + } + + pool = ast_threadpool_create(info->name, listener, &options); + if (!pool) { + goto end; + } + + std = simple_task_data_alloc(); + if (!std) { + goto end; + } + + ast_threadpool_push(pool, simple_task, std); + + wait_for_task_pushed(listener); + + res = listener_check(test, listener, 1, 1, 1, 0, 0, 0); + +end: + ast_threadpool_shutdown(pool); + ao2_cleanup(listener); + ast_free(std); + ast_free(tld); + return res; +} + +AST_TEST_DEFINE(threadpool_initial_threads) +{ + struct ast_threadpool *pool = NULL; + struct ast_threadpool_listener *listener = NULL; + enum ast_test_result_state res = AST_TEST_FAIL; + struct test_listener_data *tld = NULL; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .initial_size = 3, + .max_size = 0, + }; + + switch (cmd) { + case TEST_INIT: + info->name = "initial_threads"; + info->category = "/main/threadpool/"; + info->summary = "Test threadpool initialization state"; + info->description = + "Ensure that a threadpool created with a specific size contains the\n" + "proper number of idle threads."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + tld = test_alloc(); + if (!tld) { + return AST_TEST_FAIL; + } + + listener = ast_threadpool_listener_alloc(&test_callbacks, tld); + if (!listener) { + goto end; + } + + pool = ast_threadpool_create(info->name, listener, &options); + if (!pool) { + goto end; + } + + res = wait_until_thread_state(test, tld, 0, 3); + +end: + ast_threadpool_shutdown(pool); + ao2_cleanup(listener); + ast_free(tld); + return res; +} + + +AST_TEST_DEFINE(threadpool_thread_creation) +{ + struct ast_threadpool *pool = NULL; + struct ast_threadpool_listener *listener = NULL; + enum ast_test_result_state res = AST_TEST_FAIL; + struct test_listener_data *tld = NULL; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .initial_size = 0, + .max_size = 0, + }; + + switch (cmd) { + case TEST_INIT: + info->name = "thread_creation"; + info->category = "/main/threadpool/"; + info->summary = "Test threadpool thread creation"; + info->description = + "Ensure that threads can be added to a threadpool"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + tld = test_alloc(); + if (!tld) { + return AST_TEST_FAIL; + } + + listener = ast_threadpool_listener_alloc(&test_callbacks, tld); + if (!listener) { + goto end; + } + + pool = ast_threadpool_create(info->name, listener, &options); + if (!pool) { + goto end; + } + + /* Now let's create a thread. It should start active, then go + * idle immediately + */ + ast_threadpool_set_size(pool, 1); + + res = wait_until_thread_state(test, tld, 0, 1); + +end: + ast_threadpool_shutdown(pool); + ao2_cleanup(listener); + ast_free(tld); + return res; +} + +AST_TEST_DEFINE(threadpool_thread_destruction) +{ + struct ast_threadpool *pool = NULL; + struct ast_threadpool_listener *listener = NULL; + enum ast_test_result_state res = AST_TEST_FAIL; + struct test_listener_data *tld = NULL; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .initial_size = 0, + .max_size = 0, + }; + + switch (cmd) { + case TEST_INIT: + info->name = "thread_destruction"; + info->category = "/main/threadpool/"; + info->summary = "Test threadpool thread destruction"; + info->description = + "Ensure that threads are properly destroyed in a threadpool"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + tld = test_alloc(); + if (!tld) { + return AST_TEST_FAIL; + } + + listener = ast_threadpool_listener_alloc(&test_callbacks, tld); + if (!listener) { + goto end; + } + + pool = ast_threadpool_create(info->name, listener, &options); + if (!pool) { + goto end; + } + + ast_threadpool_set_size(pool, 3); + + res = wait_until_thread_state(test, tld, 0, 3); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = listener_check(test, listener, 0, 0, 0, 0, 3, 0); + if (res == AST_TEST_FAIL) { + goto end; + } + + ast_threadpool_set_size(pool, 2); + + res = wait_until_thread_state(test, tld, 0, 2); + +end: + ast_threadpool_shutdown(pool); + ao2_cleanup(listener); + ast_free(tld); + return res; +} + +AST_TEST_DEFINE(threadpool_thread_timeout) +{ + struct ast_threadpool *pool = NULL; + struct ast_threadpool_listener *listener = NULL; + enum ast_test_result_state res = AST_TEST_FAIL; + struct test_listener_data *tld = NULL; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 2, + .auto_increment = 0, + .initial_size = 0, + .max_size = 0, + }; + + switch (cmd) { + case TEST_INIT: + info->name = "thread_timeout"; + info->category = "/main/threadpool/"; + info->summary = "Test threadpool thread timeout"; + info->description = + "Ensure that a thread with a two second timeout dies as expected."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + tld = test_alloc(); + if (!tld) { + return AST_TEST_FAIL; + } + + listener = ast_threadpool_listener_alloc(&test_callbacks, tld); + if (!listener) { + goto end; + } + + pool = ast_threadpool_create(info->name, listener, &options); + if (!pool) { + goto end; + } + + ast_threadpool_set_size(pool, 1); + + res = wait_until_thread_state(test, tld, 0, 1); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = listener_check(test, listener, 0, 0, 0, 0, 1, 0); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = wait_until_thread_state(test, tld, 0, 0); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = listener_check(test, listener, 0, 0, 0, 0, 0, 0); + +end: + ast_threadpool_shutdown(pool); + ao2_cleanup(listener); + ast_free(tld); + return res; +} + +AST_TEST_DEFINE(threadpool_one_task_one_thread) +{ + struct ast_threadpool *pool = NULL; + struct ast_threadpool_listener *listener = NULL; + struct simple_task_data *std = NULL; + enum ast_test_result_state res = AST_TEST_FAIL; + struct test_listener_data *tld = NULL; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .initial_size = 0, + .max_size = 0, + }; + + switch (cmd) { + case TEST_INIT: + info->name = "one_task_one_thread"; + info->category = "/main/threadpool/"; + info->summary = "Test a single task with a single thread"; + info->description = + "Push a task into an empty threadpool, then add a thread to the pool."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + tld = test_alloc(); + if (!tld) { + return AST_TEST_FAIL; + } + + listener = ast_threadpool_listener_alloc(&test_callbacks, tld); + if (!listener) { + goto end; + } + + pool = ast_threadpool_create(info->name, listener, &options); + if (!pool) { + goto end; + } + + std = simple_task_data_alloc(); + if (!std) { + goto end; + } + + ast_threadpool_push(pool, simple_task, std); + + ast_threadpool_set_size(pool, 1); + + /* Threads added to the pool are active when they start, + * so the newly-created thread should immediately execute + * the waiting task. + */ + res = wait_for_completion(test, std); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = wait_for_empty_notice(test, tld); + if (res == AST_TEST_FAIL) { + goto end; + } + + /* After completing the task, the thread should go idle */ + res = wait_until_thread_state(test, tld, 0, 1); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = listener_check(test, listener, 1, 1, 1, 0, 1, 1); + +end: + ast_threadpool_shutdown(pool); + ao2_cleanup(listener); + ast_free(std); + ast_free(tld); + return res; + +} + +AST_TEST_DEFINE(threadpool_one_thread_one_task) +{ + struct ast_threadpool *pool = NULL; + struct ast_threadpool_listener *listener = NULL; + struct simple_task_data *std = NULL; + enum ast_test_result_state res = AST_TEST_FAIL; + struct test_listener_data *tld = NULL; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .initial_size = 0, + .max_size = 0, + }; + + switch (cmd) { + case TEST_INIT: + info->name = "one_thread_one_task"; + info->category = "/main/threadpool/"; + info->summary = "Test a single thread with a single task"; + info->description = + "Add a thread to the pool and then push a task to it."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + tld = test_alloc(); + if (!tld) { + return AST_TEST_FAIL; + } + + listener = ast_threadpool_listener_alloc(&test_callbacks, tld); + if (!listener) { + goto end; + } + + pool = ast_threadpool_create(info->name, listener, &options); + if (!pool) { + goto end; + } + + std = simple_task_data_alloc(); + if (!std) { + goto end; + } + + ast_threadpool_set_size(pool, 1); + + res = wait_until_thread_state(test, tld, 0, 1); + if (res == AST_TEST_FAIL) { + goto end; + } + + ast_threadpool_push(pool, simple_task, std); + + res = wait_for_completion(test, std); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = wait_for_empty_notice(test, tld); + if (res == AST_TEST_FAIL) { + goto end; + } + + /* After completing the task, the thread should go idle */ + res = wait_until_thread_state(test, tld, 0, 1); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = listener_check(test, listener, 1, 1, 1, 0, 1, 1); + +end: + ast_threadpool_shutdown(pool); + ao2_cleanup(listener); + ast_free(std); + ast_free(tld); + return res; +} + +AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks) +{ + struct ast_threadpool *pool = NULL; + struct ast_threadpool_listener *listener = NULL; + struct simple_task_data *std1 = NULL; + struct simple_task_data *std2 = NULL; + struct simple_task_data *std3 = NULL; + enum ast_test_result_state res = AST_TEST_FAIL; + struct test_listener_data *tld = NULL; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .initial_size = 0, + .max_size = 0, + }; + + switch (cmd) { + case TEST_INIT: + info->name = "one_thread_multiple_tasks"; + info->category = "/main/threadpool/"; + info->summary = "Test a single thread with multiple tasks"; + info->description = + "Add a thread to the pool and then push three tasks to it."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + tld = test_alloc(); + if (!tld) { + return AST_TEST_FAIL; + } + + listener = ast_threadpool_listener_alloc(&test_callbacks, tld); + if (!listener) { + goto end; + } + + pool = ast_threadpool_create(info->name, listener, &options); + if (!pool) { + goto end; + } + + std1 = simple_task_data_alloc(); + std2 = simple_task_data_alloc(); + std3 = simple_task_data_alloc(); + if (!std1 || !std2 || !std3) { + goto end; + } + + ast_threadpool_set_size(pool, 1); + + res = wait_until_thread_state(test, tld, 0, 1); + if (res == AST_TEST_FAIL) { + goto end; + } + + ast_threadpool_push(pool, simple_task, std1); + ast_threadpool_push(pool, simple_task, std2); + ast_threadpool_push(pool, simple_task, std3); + + res = wait_for_completion(test, std1); + if (res == AST_TEST_FAIL) { + goto end; + } + res = wait_for_completion(test, std2); + if (res == AST_TEST_FAIL) { + goto end; + } + res = wait_for_completion(test, std3); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = wait_for_empty_notice(test, tld); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = wait_until_thread_state(test, tld, 0, 1); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = listener_check(test, listener, 1, 0, 3, 0, 1, 1); + +end: + ast_threadpool_shutdown(pool); + ao2_cleanup(listener); + ast_free(std1); + ast_free(std2); + ast_free(std3); + ast_free(tld); + return res; +} + +AST_TEST_DEFINE(threadpool_auto_increment) +{ + struct ast_threadpool *pool = NULL; + struct ast_threadpool_listener *listener = NULL; + struct simple_task_data *std1 = NULL; + struct simple_task_data *std2 = NULL; + struct simple_task_data *std3 = NULL; + struct simple_task_data *std4 = NULL; + enum ast_test_result_state res = AST_TEST_FAIL; + struct test_listener_data *tld = NULL; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 3, + .initial_size = 0, + .max_size = 0, + }; + + switch (cmd) { + case TEST_INIT: + info->name = "auto_increment"; + info->category = "/main/threadpool/"; + info->summary = "Test that the threadpool grows as tasks are added"; + info->description = + "Create an empty threadpool and push a task to it. Once the task is\n" + "pushed, the threadpool should add three threads and be able to\n" + "handle the task. The threads should then go idle\n"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + tld = test_alloc(); + if (!tld) { + return AST_TEST_FAIL; + } + + listener = ast_threadpool_listener_alloc(&test_callbacks, tld); + if (!listener) { + goto end; + } + + pool = ast_threadpool_create(info->name, listener, &options); + if (!pool) { + goto end; + } + + std1 = simple_task_data_alloc(); + std2 = simple_task_data_alloc(); + std3 = simple_task_data_alloc(); + std4 = simple_task_data_alloc(); + if (!std1 || !std2 || !std3 || !std4) { + goto end; + } + + ast_threadpool_push(pool, simple_task, std1); + + /* Pushing the task should result in the threadpool growing + * by three threads. This will allow the task to actually execute + */ + res = wait_for_completion(test, std1); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = wait_for_empty_notice(test, tld); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = wait_until_thread_state(test, tld, 0, 3); + if (res == AST_TEST_FAIL) { + goto end; + } + + /* Now push three tasks into the pool and ensure the pool does not + * grow. + */ + ast_threadpool_push(pool, simple_task, std2); + ast_threadpool_push(pool, simple_task, std3); + ast_threadpool_push(pool, simple_task, std4); + + res = wait_for_completion(test, std2); + if (res == AST_TEST_FAIL) { + goto end; + } + res = wait_for_completion(test, std3); + if (res == AST_TEST_FAIL) { + goto end; + } + res = wait_for_completion(test, std4); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = wait_for_empty_notice(test, tld); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = wait_until_thread_state(test, tld, 0, 3); + if (res == AST_TEST_FAIL) { + goto end; + } + res = listener_check(test, listener, 1, 0, 4, 0, 3, 1); + +end: + ast_threadpool_shutdown(pool); + ao2_cleanup(listener); + ast_free(std1); + ast_free(std2); + ast_free(std3); + ast_free(std4); + ast_free(tld); + return res; +} + +AST_TEST_DEFINE(threadpool_max_size) +{ + struct ast_threadpool *pool = NULL; + struct ast_threadpool_listener *listener = NULL; + struct simple_task_data *std = NULL; + enum ast_test_result_state res = AST_TEST_FAIL; + struct test_listener_data *tld = NULL; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 3, + .initial_size = 0, + .max_size = 2, + }; + + switch (cmd) { + case TEST_INIT: + info->name = "max_size"; + info->category = "/main/threadpool/"; + info->summary = "Test that the threadpool does not exceed its maximum size restriction"; + info->description = + "Create an empty threadpool and push a task to it. Once the task is\n" + "pushed, the threadpool should attempt to grow by three threads, but the\n" + "pool's restrictions should only allow two threads to be added.\n"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + tld = test_alloc(); + if (!tld) { + return AST_TEST_FAIL; + } + + listener = ast_threadpool_listener_alloc(&test_callbacks, tld); + if (!listener) { + goto end; + } + + pool = ast_threadpool_create(info->name, listener, &options); + if (!pool) { + goto end; + } + + std = simple_task_data_alloc(); + if (!std) { + goto end; + } + + ast_threadpool_push(pool, simple_task, std); + + res = wait_for_completion(test, std); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = wait_until_thread_state(test, tld, 0, 2); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = listener_check(test, listener, 1, 1, 1, 0, 2, 1); +end: + ast_threadpool_shutdown(pool); + ao2_cleanup(listener); + ast_free(std); + ast_free(tld); + return res; +} + +AST_TEST_DEFINE(threadpool_reactivation) +{ + struct ast_threadpool *pool = NULL; + struct ast_threadpool_listener *listener = NULL; + struct simple_task_data *std1 = NULL; + struct simple_task_data *std2 = NULL; + enum ast_test_result_state res = AST_TEST_FAIL; + struct test_listener_data *tld = NULL; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .initial_size = 0, + .max_size = 0, + }; + + switch (cmd) { + case TEST_INIT: + info->name = "reactivation"; + info->category = "/main/threadpool/"; + info->summary = "Test that a threadpool reactivates when work is added"; + info->description = + "Push a task into a threadpool. Make sure the task executes and the\n" + "thread goes idle. Then push a second task and ensure that the thread\n" + "awakens and executes the second task.\n"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + tld = test_alloc(); + if (!tld) { + return AST_TEST_FAIL; + } + + listener = ast_threadpool_listener_alloc(&test_callbacks, tld); + if (!listener) { + goto end; + } + + pool = ast_threadpool_create(info->name, listener, &options); + if (!pool) { + goto end; + } + + std1 = simple_task_data_alloc(); + std2 = simple_task_data_alloc(); + if (!std1 || !std2) { + goto end; + } + + ast_threadpool_push(pool, simple_task, std1); + + ast_threadpool_set_size(pool, 1); + + res = wait_for_completion(test, std1); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = wait_for_empty_notice(test, tld); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = wait_until_thread_state(test, tld, 0, 1); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = listener_check(test, listener, 1, 1, 1, 0, 1, 1); + if (res == AST_TEST_FAIL) { + goto end; + } + + /* Now make sure the threadpool reactivates when we add a second task */ + ast_threadpool_push(pool, simple_task, std2); + + res = wait_for_completion(test, std2); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = wait_for_empty_notice(test, tld); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = wait_until_thread_state(test, tld, 0, 1); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = listener_check(test, listener, 1, 1, 2, 0, 1, 1); + +end: + ast_threadpool_shutdown(pool); + ao2_cleanup(listener); + ast_free(std1); + ast_free(std2); + ast_free(tld); + return res; + +} + +struct complex_task_data { + int task_executed; + int continue_task; + ast_mutex_t lock; + ast_cond_t stall_cond; + ast_cond_t done_cond; +}; + +static struct complex_task_data *complex_task_data_alloc(void) +{ + struct complex_task_data *ctd = ast_calloc(1, sizeof(*ctd)); + + if (!ctd) { + return NULL; + } + ast_mutex_init(&ctd->lock); + ast_cond_init(&ctd->stall_cond, NULL); + ast_cond_init(&ctd->done_cond, NULL); + return ctd; +} + +static int complex_task(void *data) +{ + struct complex_task_data *ctd = data; + SCOPED_MUTEX(lock, &ctd->lock); + while (!ctd->continue_task) { + ast_cond_wait(&ctd->stall_cond, lock); + } + /* We got poked. Finish up */ + ctd->task_executed = 1; + ast_cond_signal(&ctd->done_cond); + return 0; +} + +static void poke_worker(struct complex_task_data *ctd) +{ + SCOPED_MUTEX(lock, &ctd->lock); + ctd->continue_task = 1; + ast_cond_signal(&ctd->stall_cond); +} + +static enum ast_test_result_state wait_for_complex_completion(struct complex_task_data *ctd) +{ + struct timeval start = ast_tvnow(); + struct timespec end = { + .tv_sec = start.tv_sec + 5, + .tv_nsec = start.tv_usec * 1000 + }; + enum ast_test_result_state res = AST_TEST_PASS; + SCOPED_MUTEX(lock, &ctd->lock); + + while (!ctd->task_executed) { + if (ast_cond_timedwait(&ctd->done_cond, lock, &end) == ETIMEDOUT) { + break; + } + } + + if (!ctd->task_executed) { + res = AST_TEST_FAIL; + } + return res; +} + +AST_TEST_DEFINE(threadpool_task_distribution) +{ + struct ast_threadpool *pool = NULL; + struct ast_threadpool_listener *listener = NULL; + struct complex_task_data *ctd1 = NULL; + struct complex_task_data *ctd2 = NULL; + enum ast_test_result_state res = AST_TEST_FAIL; + struct test_listener_data *tld = NULL; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .initial_size = 0, + .max_size = 0, + }; + + switch (cmd) { + case TEST_INIT: + info->name = "task_distribution"; + info->category = "/main/threadpool/"; + info->summary = "Test that tasks are evenly distributed to threads"; + info->description = + "Push two tasks into a threadpool. Ensure that each is handled by\n" + "a separate thread\n"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + tld = test_alloc(); + if (!tld) { + return AST_TEST_FAIL; + } + + listener = ast_threadpool_listener_alloc(&test_callbacks, tld); + if (!listener) { + goto end; + } + + pool = ast_threadpool_create(info->name, listener, &options); + if (!pool) { + goto end; + } + + ctd1 = complex_task_data_alloc(); + ctd2 = complex_task_data_alloc(); + if (!ctd1 || !ctd2) { + goto end; + } + + ast_threadpool_push(pool, complex_task, ctd1); + ast_threadpool_push(pool, complex_task, ctd2); + + ast_threadpool_set_size(pool, 2); + + res = wait_until_thread_state(test, tld, 2, 0); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = listener_check(test, listener, 1, 0, 2, 2, 0, 0); + if (res == AST_TEST_FAIL) { + goto end; + } + + /* The tasks are stalled until we poke them */ + poke_worker(ctd1); + poke_worker(ctd2); + + res = wait_for_complex_completion(ctd1); + if (res == AST_TEST_FAIL) { + goto end; + } + res = wait_for_complex_completion(ctd2); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = wait_until_thread_state(test, tld, 0, 2); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = listener_check(test, listener, 1, 0, 2, 0, 2, 1); + +end: + ast_threadpool_shutdown(pool); + ao2_cleanup(listener); + ast_free(ctd1); + ast_free(ctd2); + ast_free(tld); + return res; +} + +AST_TEST_DEFINE(threadpool_more_destruction) +{ + struct ast_threadpool *pool = NULL; + struct ast_threadpool_listener *listener = NULL; + struct complex_task_data *ctd1 = NULL; + struct complex_task_data *ctd2 = NULL; + enum ast_test_result_state res = AST_TEST_FAIL; + struct test_listener_data *tld = NULL; + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .initial_size = 0, + .max_size = 0, + }; + + switch (cmd) { + case TEST_INIT: + info->name = "more_destruction"; + info->category = "/main/threadpool/"; + info->summary = "Test that threads are destroyed as expected"; + info->description = + "Push two tasks into a threadpool. Set the threadpool size to 4\n" + "Ensure that there are 2 active and 2 idle threads. Then shrink the\n" + "threadpool down to 1 thread. Ensure that the thread leftover is active\n" + "and ensure that both tasks complete.\n"; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + tld = test_alloc(); + if (!tld) { + return AST_TEST_FAIL; + } + + listener = ast_threadpool_listener_alloc(&test_callbacks, tld); + if (!listener) { + goto end; + } + + pool = ast_threadpool_create(info->name, listener, &options); + if (!pool) { + goto end; + } + + ctd1 = complex_task_data_alloc(); + ctd2 = complex_task_data_alloc(); + if (!ctd1 || !ctd2) { + goto end; + } + + ast_threadpool_push(pool, complex_task, ctd1); + ast_threadpool_push(pool, complex_task, ctd2); + + ast_threadpool_set_size(pool, 4); + + res = wait_until_thread_state(test, tld, 2, 2); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = listener_check(test, listener, 1, 0, 2, 2, 2, 0); + if (res == AST_TEST_FAIL) { + goto end; + } + + ast_threadpool_set_size(pool, 1); + + /* Shrinking the threadpool should kill off the two idle threads + * and one of the active threads. + */ + res = wait_until_thread_state(test, tld, 1, 0); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = listener_check(test, listener, 1, 0, 2, 1, 0, 0); + if (res == AST_TEST_FAIL) { + goto end; + } + + /* The tasks are stalled until we poke them */ + poke_worker(ctd1); + poke_worker(ctd2); + + res = wait_for_complex_completion(ctd1); + if (res == AST_TEST_FAIL) { + goto end; + } + res = wait_for_complex_completion(ctd2); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = wait_until_thread_state(test, tld, 0, 1); + if (res == AST_TEST_FAIL) { + goto end; + } + + res = listener_check(test, listener, 1, 0, 2, 0, 1, 1); + +end: + ast_threadpool_shutdown(pool); + ao2_cleanup(listener); + ast_free(ctd1); + ast_free(ctd2); + ast_free(tld); + return res; +} + +static int unload_module(void) +{ + ast_test_unregister(threadpool_push); + ast_test_unregister(threadpool_initial_threads); + ast_test_unregister(threadpool_thread_creation); + ast_test_unregister(threadpool_thread_destruction); + ast_test_unregister(threadpool_thread_timeout); + ast_test_unregister(threadpool_one_task_one_thread); + ast_test_unregister(threadpool_one_thread_one_task); + ast_test_unregister(threadpool_one_thread_multiple_tasks); + ast_test_unregister(threadpool_auto_increment); + ast_test_unregister(threadpool_max_size); + ast_test_unregister(threadpool_reactivation); + ast_test_unregister(threadpool_task_distribution); + ast_test_unregister(threadpool_more_destruction); + return 0; +} + +static int load_module(void) +{ + ast_test_register(threadpool_push); + ast_test_register(threadpool_initial_threads); + ast_test_register(threadpool_thread_creation); + ast_test_register(threadpool_thread_destruction); + ast_test_register(threadpool_thread_timeout); + ast_test_register(threadpool_one_task_one_thread); + ast_test_register(threadpool_one_thread_one_task); + ast_test_register(threadpool_one_thread_multiple_tasks); + ast_test_register(threadpool_auto_increment); + ast_test_register(threadpool_max_size); + ast_test_register(threadpool_reactivation); + ast_test_register(threadpool_task_distribution); + ast_test_register(threadpool_more_destruction); + return AST_MODULE_LOAD_SUCCESS; +} + +AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "threadpool test module");