Skip to content
Snippets Groups Projects
Commit a37fb2e8 authored by Mark Michelson's avatar Mark Michelson
Browse files

Add some doxygen and rearrange code.


git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@377209 65c4cc65-6c06-0410-ace0-fbb531ad65f3
parent 47a9abde
No related branches found
No related tags found
No related merge requests found
...@@ -93,6 +93,19 @@ struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *lis ...@@ -93,6 +93,19 @@ struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *lis
*/ */
void ast_threadpool_set_size(struct ast_threadpool *threadpool, unsigned int size); void ast_threadpool_set_size(struct ast_threadpool *threadpool, unsigned int size);
/*!
* \brief Push a task to the threadpool
*
* Tasks pushed into the threadpool will be automatically taken by
* one of the threads within
* \param pool The threadpool to add the task to
* \param task The task to add
* \param data The parameter for the task
* \retval 0 success
* \retval -1 failure
*/
int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data);
/*! /*!
* \brief Shut down a threadpool and destroy it * \brief Shut down a threadpool and destroy it
* *
......
...@@ -26,85 +26,116 @@ ...@@ -26,85 +26,116 @@
#define THREAD_BUCKETS 89 #define THREAD_BUCKETS 89
static int id_counter; /*!
* \brief An opaque threadpool structure
*
* A threadpool is a collection of threads that execute
* tasks from a common queue.
*/
struct ast_threadpool { struct ast_threadpool {
/*! Threadpool listener */
struct ast_threadpool_listener *listener; struct ast_threadpool_listener *listener;
/*!
* \brief The container of active threads.
* Active threads are those that are currently running tasks
*/
struct ao2_container *active_threads; struct ao2_container *active_threads;
/*!
* \brief The container of idle threads.
* Idle threads are those that are currenly waiting to run tasks
*/
struct ao2_container *idle_threads; struct ao2_container *idle_threads;
/*!
* \brief The container of zombie threads.
* Zombie threads may be running tasks, but they are scheduled to die soon
*/
struct ao2_container *zombie_threads; struct ao2_container *zombie_threads;
/*!
* \brief The main taskprocessor
*
* Tasks that are queued in this taskprocessor are
* doled out to the worker threads. Worker threads that
* execute tasks from the threadpool are executing tasks
* in this taskprocessor.
*
* The threadpool itself is actually the private data for
* this taskprocessor's listener. This way, as taskprocessor
* changes occur, the threadpool can alert its listeners
* appropriately.
*/
struct ast_taskprocessor *tps; struct ast_taskprocessor *tps;
/*!
* \brief The control taskprocessor
*
* This is a standard taskprocessor that uses the default
* taskprocessor listener. In other words, all tasks queued to
* this taskprocessor have a single thread that executes the
* tasks.
*
* All tasks that modify the state of the threadpool and all tasks
* that call out to threadpool listeners are pushed to this
* taskprocessor.
*
* For instance, when the threadpool changes sizes, a task is put
* into this taskprocessor to do so. When it comes time to tell the
* threadpool listener that worker threads have changed state,
* the task is placed in this taskprocessor.
*
* This is done for three main reasons
* 1) It ensures that listeners are given an accurate portrayal
* of the threadpool's current state. In other words, when a listener
* gets told a count of active, idle and zombie threads, it does not
* need to worry that internal state of the threadpool might be different
* from what it has been told.
* 2) It minimizes the locking required in both the threadpool and in
* threadpool listener's callbacks.
* 3) It ensures that listener callbacks are called in the same order
* that the threadpool had its state change.
*/
struct ast_taskprocessor *control_tps; struct ast_taskprocessor *control_tps;
}; };
/*!
* \brief states for worker threads
*/
enum worker_state { enum worker_state {
/*! The worker is either active or idle */
ALIVE, ALIVE,
/*!
* The worker has been asked to shut down but
* may still be in the process of executing tasks.
* This transition happens when the threadpool needs
* to shrink and needs to kill active threads in order
* to do so.
*/
ZOMBIE, ZOMBIE,
/*!
* The worker has been asked to shut down. Typically
* only idle threads go to this state directly, but
* active threads may go straight to this state when
* the threadpool is shut down.
*/
DEAD, DEAD,
}; };
struct worker_thread { /* Worker thread forward declarations. See definitions for documentation */
int id; struct worker_thread;
ast_cond_t cond; static int worker_thread_hash(const void *obj, int flags);
ast_mutex_t lock; static int worker_thread_cmp(void *obj, void *arg, int flags);
pthread_t thread; static void worker_thread_destroy(void *obj);
struct ast_threadpool *pool; static void worker_active(struct worker_thread *worker);
enum worker_state state; static void *worker_start(void *arg);
int wake_up; static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
}; static int worker_idle(struct worker_thread *worker);
static void worker_set_state(struct worker_thread *worker, enum worker_state state);
static int worker_thread_hash(const void *obj, int flags) static int worker_shutdown(void *obj, void *arg, int flags);
{
const struct worker_thread *worker = obj; /*!
* \brief Notify the threadpool listener that the state has changed.
return worker->id; *
} * This notifies the threadpool listener via its state_changed callback.
* \param pool The threadpool whose state has changed
static int worker_thread_cmp(void *obj, void *arg, int flags) */
{
struct worker_thread *worker1 = obj;
struct worker_thread *worker2 = arg;
return worker1->id == worker2->id ? CMP_MATCH : 0;
}
static void worker_thread_destroy(void *obj)
{
struct worker_thread *worker = obj;
ast_mutex_destroy(&worker->lock);
ast_cond_destroy(&worker->cond);
}
static int worker_active(struct worker_thread *worker);
static void *worker_start(void *arg)
{
struct worker_thread *worker = arg;
worker_active(worker);
return NULL;
}
static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
{
struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
if (!worker) {
return NULL;
}
worker->id = ast_atomic_fetchadd_int(&id_counter, 1);
ast_mutex_init(&worker->lock);
ast_cond_init(&worker->cond, NULL);
worker->pool = pool;
worker->thread = AST_PTHREADT_NULL;
worker->state = ALIVE;
if (ast_pthread_create(&worker->thread, NULL, worker_start, worker) < 0) {
ast_log(LOG_ERROR, "Unable to start worker thread!\n");
ao2_ref(worker, -1);
return NULL;
}
return worker;
}
static void threadpool_send_state_changed(struct ast_threadpool *pool) static void threadpool_send_state_changed(struct ast_threadpool *pool)
{ {
int active_size = ao2_container_count(pool->active_threads); int active_size = ao2_container_count(pool->active_threads);
...@@ -114,11 +145,19 @@ static void threadpool_send_state_changed(struct ast_threadpool *pool) ...@@ -114,11 +145,19 @@ static void threadpool_send_state_changed(struct ast_threadpool *pool)
pool->listener->callbacks->state_changed(pool->listener, active_size, idle_size, zombie_size); pool->listener->callbacks->state_changed(pool->listener, active_size, idle_size, zombie_size);
} }
/*!
* \brief Struct used for queued operations involving worker state changes
*/
struct thread_worker_pair { struct thread_worker_pair {
/*! Threadpool that contains the worker whose state has changed */
struct ast_threadpool *pool; struct ast_threadpool *pool;
/*! Worker whose state has changed */
struct worker_thread *worker; struct worker_thread *worker;
}; };
/*!
* \brief Destructor for thread_worker_pair
*/
static void thread_worker_pair_destructor(void *obj) static void thread_worker_pair_destructor(void *obj)
{ {
struct thread_worker_pair *pair = obj; struct thread_worker_pair *pair = obj;
...@@ -126,6 +165,11 @@ static void thread_worker_pair_destructor(void *obj) ...@@ -126,6 +165,11 @@ static void thread_worker_pair_destructor(void *obj)
ao2_ref(pair->worker, -1); ao2_ref(pair->worker, -1);
} }
/*!
* \brief Allocate and initialize a thread_worker_pair
* \param pool Threadpool to assign to the thread_worker_pair
* \param worker Worker thread to assign to the thread_worker_pair
*/
static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool *pool, static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool *pool,
struct worker_thread *worker) struct worker_thread *worker)
{ {
...@@ -140,6 +184,13 @@ static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool ...@@ -140,6 +184,13 @@ static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool
return pair; return pair;
} }
/*!
* \brief Move a worker thread from the active container to the idle container.
*
* This function is called from the threadpool's control taskprocessor thread.
* \param data A thread_worker_pair containing the threadpool and the worker to move.
* \return 0
*/
static int queued_active_thread_idle(void *data) static int queued_active_thread_idle(void *data)
{ {
struct thread_worker_pair *pair = data; struct thread_worker_pair *pair = data;
...@@ -153,6 +204,14 @@ static int queued_active_thread_idle(void *data) ...@@ -153,6 +204,14 @@ static int queued_active_thread_idle(void *data)
return 0; return 0;
} }
/*!
* \brief Queue a task to move a thread from the active list to the idle list
*
* This is called by a worker thread when it runs out of tasks to perform and
* goes idle.
* \param pool The threadpool to which the worker belongs
* \param worker The worker thread that has gone idle
*/
static void threadpool_active_thread_idle(struct ast_threadpool *pool, static void threadpool_active_thread_idle(struct ast_threadpool *pool,
struct worker_thread *worker) struct worker_thread *worker)
{ {
...@@ -163,17 +222,31 @@ static void threadpool_active_thread_idle(struct ast_threadpool *pool, ...@@ -163,17 +222,31 @@ static void threadpool_active_thread_idle(struct ast_threadpool *pool,
ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair); ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair);
} }
/*!
* \brief Kill a zombie thread
*
* This runs from the threadpool's control taskprocessor thread.
*
* \param data A thread_worker_pair containing the threadpool and the zombie thread
* \return 0
*/
static int queued_zombie_thread_dead(void *data) static int queued_zombie_thread_dead(void *data)
{ {
struct thread_worker_pair *pair = data; struct thread_worker_pair *pair = data;
ao2_unlink(pair->pool->zombie_threads, pair->worker); ao2_unlink(pair->pool, pair->worker);
threadpool_send_state_changed(pair->pool); threadpool_send_state_changed(pair->pool);
ao2_ref(pair, -1); ao2_ref(pair, -1);
return 0; return 0;
} }
/*!
* \brief Queue a task to kill a zombie thread
*
* This is called by a worker thread when it acknowledges that it is time for
* it to die.
*/
static void threadpool_zombie_thread_dead(struct ast_threadpool *pool, static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
struct worker_thread *worker) struct worker_thread *worker)
{ {
...@@ -184,83 +257,56 @@ static void threadpool_zombie_thread_dead(struct ast_threadpool *pool, ...@@ -184,83 +257,56 @@ static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair); ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair);
} }
static int worker_idle(struct worker_thread *worker) /*!
{ * \brief Execute a task in the threadpool
SCOPED_MUTEX(lock, &worker->lock); *
if (worker->state != ALIVE) { * This is the function that worker threads call in order to execute tasks
return 0; * in the threadpool
} *
threadpool_active_thread_idle(worker->pool, worker); * \param pool The pool to which the tasks belong.
while (!worker->wake_up) { * \retval 0 Either the pool has been shut down or there are no tasks.
ast_cond_wait(&worker->cond, lock); * \retval 1 There are still tasks remaining in the pool.
} */
worker->wake_up = 0;
return worker->state == ALIVE;
}
static int threadpool_execute(struct ast_threadpool *pool) static int threadpool_execute(struct ast_threadpool *pool)
{ {
return ast_taskprocessor_execute(pool->tps); return ast_taskprocessor_execute(pool->tps);
} }
static int worker_active(struct worker_thread *worker) /*!
{ * \brief Destroy a threadpool's components.
int alive = 1; *
while (alive) { * This is the destructor called automatically when the threadpool's
if (threadpool_execute(worker->pool)) { * reference count reaches zero. This is not to be confused with
alive = worker_idle(worker); * threadpool_destroy.
} *
} * By the time this actually gets called, most of the cleanup has already
* been done in the pool. The only thing left to do is to release the
/* Reaching this portion means the thread is * final reference to the threadpool listener.
* on death's door. It may have been killed while *
* it was idle, in which case it can just die * \param obj The pool to destroy
* peacefully. If it's a zombie, though, then */
* it needs to let the pool know so static void threadpool_destructor(void *obj)
* that the thread can be removed from the
* list of zombie threads.
*/
if (worker->state == ZOMBIE) {
threadpool_zombie_thread_dead(worker->pool, worker);
}
return 0;
}
static void worker_set_state(struct worker_thread *worker, enum worker_state state)
{
SCOPED_MUTEX(lock, &worker->lock);
worker->state = state;
worker->wake_up = 1;
ast_cond_signal(&worker->cond);
}
static int worker_shutdown(void *obj, void *arg, int flags)
{
struct worker_thread *worker = obj;
worker_set_state(worker, DEAD);
if (worker->thread != AST_PTHREADT_NULL) {
pthread_join(worker->thread, NULL);
worker->thread = AST_PTHREADT_NULL;
}
return 0;
}
static void threadpool_destructor(void *private_data)
{ {
struct ast_threadpool *pool = private_data; struct ast_threadpool *pool = obj;
/* XXX Probably should let the listener know we're being destroyed? */ /* XXX Probably should let the listener know we're being destroyed? */
/* Threads should all be shut down by now, so this should be a painless /* Threads should all be shut down by now, so this should be a painless
* operation * operation
*/ */
ao2_cleanup(pool->active_threads);
ao2_cleanup(pool->idle_threads);
ao2_cleanup(pool->zombie_threads);
ao2_cleanup(pool->listener); ao2_cleanup(pool->listener);
} }
/*
* \brief Allocate a threadpool
*
* This is implemented as a taskprocessor listener's alloc callback. This
* is because the threadpool exists as the private data on a taskprocessor
* listener.
*
* \param listener The taskprocessor listener where the threadpool will live.
* \retval NULL Could not initialize threadpool properly
* \retval non-NULL The newly-allocated threadpool
*/
static void *threadpool_alloc(struct ast_taskprocessor_listener *listener) static void *threadpool_alloc(struct ast_taskprocessor_listener *listener)
{ {
RAII_VAR(struct ast_threadpool *, pool, RAII_VAR(struct ast_threadpool *, pool,
...@@ -289,17 +335,32 @@ static void *threadpool_alloc(struct ast_taskprocessor_listener *listener) ...@@ -289,17 +335,32 @@ static void *threadpool_alloc(struct ast_taskprocessor_listener *listener)
return pool; return pool;
} }
/*!
* \brief helper used for queued task when tasks are pushed
*/
struct task_pushed_data { struct task_pushed_data {
/*! Pool into which a task was pushed */
struct ast_threadpool *pool; struct ast_threadpool *pool;
/*! Indicator of whether the pool had no tasks prior to the new task being added */
int was_empty; int was_empty;
}; };
/*!
* \brief Destructor for task_pushed_data
*/
static void task_pushed_data_destroy(void *obj) static void task_pushed_data_destroy(void *obj)
{ {
struct task_pushed_data *tpd = obj; struct task_pushed_data *tpd = obj;
ao2_ref(tpd->pool, -1); ao2_ref(tpd->pool, -1);
} }
/*!
* \brief Allocate and initialize a task_pushed_data
* \param pool The threadpool to set in the task_pushed_data
* \param was_empty The was_empty value to set in the task_pushed_data
* \retval NULL Unable to allocate task_pushed_data
* \retval non-NULL The newly-allocated task_pushed_data
*/
static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool, static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
int was_empty) int was_empty)
{ {
...@@ -315,6 +376,18 @@ static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *po ...@@ -315,6 +376,18 @@ static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *po
return tpd; return tpd;
} }
/*!
* \brief Activate idle threads
*
* This function always returns CMP_MATCH because all threads that this
* function acts on need to be seen as matches so they are unlinked from the
* list of idle threads.
*
* Called as an ao2_callback in the threadpool's control taskprocessor thread.
* \param obj The worker to activate
* \param arg The pool where the worker belongs
* \retval CMP_MATCH
*/
static int activate_threads(void *obj, void *arg, int flags) static int activate_threads(void *obj, void *arg, int flags)
{ {
struct worker_thread *worker = obj; struct worker_thread *worker = obj;
...@@ -322,9 +395,18 @@ static int activate_threads(void *obj, void *arg, int flags) ...@@ -322,9 +395,18 @@ static int activate_threads(void *obj, void *arg, int flags)
ao2_link(pool->active_threads, worker); ao2_link(pool->active_threads, worker);
worker_set_state(worker, ALIVE); worker_set_state(worker, ALIVE);
return 0; return CMP_MATCH;
} }
/*!
* \brief Queue task called when tasks are pushed into the threadpool
*
* This function first calls into the threadpool's listener to let it know
* that a task has been pushed. It then wakes up all idle threads and moves
* them into the active thread container.
* \param data A task_pushed_data
* \return 0
*/
static int handle_task_pushed(void *data) static int handle_task_pushed(void *data)
{ {
struct task_pushed_data *tpd = data; struct task_pushed_data *tpd = data;
...@@ -332,11 +414,21 @@ static int handle_task_pushed(void *data) ...@@ -332,11 +414,21 @@ static int handle_task_pushed(void *data)
int was_empty = tpd->was_empty; int was_empty = tpd->was_empty;
pool->listener->callbacks->tps_task_pushed(pool->listener, was_empty); pool->listener->callbacks->tps_task_pushed(pool->listener, was_empty);
ao2_callback(pool->idle_threads, OBJ_UNLINK, activate_threads, pool); ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
activate_threads, pool);
ao2_ref(tpd, -1); ao2_ref(tpd, -1);
return 0; return 0;
} }
/*!
* \brief Taskprocessor listener callback called when a task is added
*
* The threadpool uses this opportunity to queue a task on its control taskprocessor
* in order to activate idle threads and notify the threadpool listener that the
* task has been pushed.
* \param listener The taskprocessor listener. The threadpool is the listener's private data
* \param was_empty True if the taskprocessor was empty prior to the task being pushed
*/
static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener, static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
int was_empty) int was_empty)
{ {
...@@ -350,6 +442,13 @@ static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listen ...@@ -350,6 +442,13 @@ static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listen
ast_taskprocessor_push(pool->control_tps, handle_task_pushed, tpd); ast_taskprocessor_push(pool->control_tps, handle_task_pushed, tpd);
} }
/*!
* \brief Queued task that handles the case where the threadpool's taskprocessor is emptied
*
* This simply lets the threadpool's listener know that the threadpool is devoid of tasks
* \param data The pool that has become empty
* \return 0
*/
static int handle_emptied(void *data) static int handle_emptied(void *data)
{ {
struct ast_threadpool *pool = data; struct ast_threadpool *pool = data;
...@@ -359,6 +458,13 @@ static int handle_emptied(void *data) ...@@ -359,6 +458,13 @@ static int handle_emptied(void *data)
return 0; return 0;
} }
/*!
* \brief Taskprocessor listener emptied callback
*
* The threadpool queues a task to let the threadpool listener know that
* the threadpool no longer contains any tasks.
* \param listener The taskprocessor listener. The threadpool is the listener's private data.
*/
static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener) static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
{ {
struct ast_threadpool *pool = listener->private_data; struct ast_threadpool *pool = listener->private_data;
...@@ -367,34 +473,44 @@ static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener) ...@@ -367,34 +473,44 @@ static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
ast_taskprocessor_push(pool->control_tps, handle_emptied, pool); ast_taskprocessor_push(pool->control_tps, handle_emptied, pool);
} }
/*!
* \brief Taskprocessor listener shutdown callback
*
* The threadpool will shut down and destroy all of its worker threads when
* this is called back. By the time this gets called, the taskprocessor's
* control taskprocessor has already been destroyed. Therefore there is no risk
* in outright destroying the worker threads here.
* \param listener The taskprocessor listener. The threadpool is the listener's private data.
*/
static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener) static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
{ {
/*
* The threadpool triggers the taskprocessor to shut down. As a result,
* we have the freedom of shutting things down in three stages:
*
* 1) Before the tasprocessor is shut down
* 2) During taskprocessor shutdown (here)
* 3) After taskprocessor shutdown
*
* In the spirit of the taskprocessor shutdown, this would be
* where we make sure that all the worker threads are no longer
* executing. We could just do this before we even shut down
* the taskprocessor, but this feels more "right".
*/
struct ast_threadpool *pool = listener->private_data; struct ast_threadpool *pool = listener->private_data;
ao2_callback(pool->active_threads, 0, worker_shutdown, NULL); int flags = (OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE);
ao2_callback(pool->idle_threads, 0, worker_shutdown, NULL);
ao2_callback(pool->zombie_threads, 0, worker_shutdown, NULL); ao2_cleanup(pool->active_threads);
ao2_cleanup(pool->idle_threads);
ao2_cleanup(pool->zombie_threads);
} }
/*!
* \brief Taskprocessor listener destroy callback
*
* Since the threadpool is an ao2 object, all that is necessary is to
* decrease the refcount. Since the control taskprocessor should already
* be destroyed by this point, this should be the final reference to the
* threadpool.
*
* \param private_data The threadpool to destroy
*/
static void threadpool_destroy(void *private_data) static void threadpool_destroy(void *private_data)
{ {
struct ast_threadpool *pool = private_data; struct ast_threadpool *pool = private_data;
ao2_cleanup(pool); ao2_cleanup(pool);
} }
/*!
* \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
*/
static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = { static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
.alloc = threadpool_alloc, .alloc = threadpool_alloc,
.task_pushed = threadpool_tps_task_pushed, .task_pushed = threadpool_tps_task_pushed,
...@@ -403,6 +519,13 @@ static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callb ...@@ -403,6 +519,13 @@ static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callb
.destroy = threadpool_destroy, .destroy = threadpool_destroy,
}; };
/*!
* \brief Add threads to the threadpool
*
* This function is called from the threadpool's control taskprocessor thread.
* \param pool The pool that is expanding
* \delta The number of threads to add to the pool
*/
static void grow(struct ast_threadpool *pool, int delta) static void grow(struct ast_threadpool *pool, int delta)
{ {
int i; int i;
...@@ -415,19 +538,49 @@ static void grow(struct ast_threadpool *pool, int delta) ...@@ -415,19 +538,49 @@ static void grow(struct ast_threadpool *pool, int delta)
} }
} }
/*!
* \brief ao2 callback to kill a set number of threads.
*
* Threads will be unlinked from the container as long as the
* counter has not reached zero. The counter is decremented with
* each thread that is removed.
* \param obj The worker thread up for possible destruction
* \param arg The counter
* \param flags Unused
* \retval CMP_MATCH The counter has not reached zero, so this flag should be removed.
* \retval CMP_STOP The counter has reached zero so no more threads should be removed.
*/
static int kill_threads(void *obj, void *arg, int flags) static int kill_threads(void *obj, void *arg, int flags)
{ {
struct worker_thread *worker = obj; struct worker_thread *worker = obj;
int *num_to_kill = arg; int *num_to_kill = arg;
if ((*num_to_kill)-- > 0) { if ((*num_to_kill)-- > 0) {
worker_shutdown(worker, arg, flags);
return CMP_MATCH; return CMP_MATCH;
} else { } else {
return CMP_STOP; return CMP_STOP;
} }
} }
/*!
* \brief ao2 callback to zombify a set number of threads.
*
* Threads will be zombified as long as as the counter has not reached
* zero. The counter is decremented with each thread that is zombified.
*
* Zombifying a thread involves removing it from its current container,
* adding it to the zombie container, and changing the state of the
* worker to a zombie
*
* This callback is called from the threadpool control taskprocessor thread.
*
* \param obj The worker thread that may be zombified
* \param arg The pool to which the worker belongs
* \param data The counter
* \param flags Unused
* \retval CMP_MATCH The zombified thread should be removed from its current container
* \retval CMP_STOP Stop attempting to zombify threads
*/
static int zombify_threads(void *obj, void *arg, void *data, int flags) static int zombify_threads(void *obj, void *arg, void *data, int flags)
{ {
struct worker_thread *worker = obj; struct worker_thread *worker = obj;
...@@ -443,6 +596,18 @@ static int zombify_threads(void *obj, void *arg, void *data, int flags) ...@@ -443,6 +596,18 @@ static int zombify_threads(void *obj, void *arg, void *data, int flags)
} }
} }
/*!
* \brief Remove threads from the threadpool
*
* The preference is to kill idle threads. However, if there are
* more threads to remove than there are idle threads, then active
* threads will be zombified instead.
*
* This function is called from the threadpool control taskprocessor thread.
*
* \param pool The threadpool to remove threads from
* \param delta The number of threads to remove
*/
static void shrink(struct ast_threadpool *pool, int delta) static void shrink(struct ast_threadpool *pool, int delta)
{ {
/* /*
...@@ -454,24 +619,38 @@ static void shrink(struct ast_threadpool *pool, int delta) ...@@ -454,24 +619,38 @@ static void shrink(struct ast_threadpool *pool, int delta)
int idle_threads_to_kill = MIN(delta, idle_threads); int idle_threads_to_kill = MIN(delta, idle_threads);
int active_threads_to_zombify = delta - idle_threads_to_kill; int active_threads_to_zombify = delta - idle_threads_to_kill;
ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE | OBJ_NOLOCK, ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK,
kill_threads, &idle_threads_to_kill); kill_threads, &idle_threads_to_kill);
ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE | OBJ_NOLOCK, ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK,
zombify_threads, pool, &active_threads_to_zombify); zombify_threads, pool, &active_threads_to_zombify);
} }
/*!
* \brief Helper struct used for queued operations that change the size of the threadpool
*/
struct set_size_data { struct set_size_data {
/*! The pool whose size is to change */
struct ast_threadpool *pool; struct ast_threadpool *pool;
/*! The requested new size of the pool */
unsigned int size; unsigned int size;
}; };
/*!
* \brief Destructor for set_size_data
* \param obj The set_size_data to destroy
*/
static void set_size_data_destroy(void *obj) static void set_size_data_destroy(void *obj)
{ {
struct set_size_data *ssd = obj; struct set_size_data *ssd = obj;
ao2_ref(ssd->pool, -1); ao2_ref(ssd->pool, -1);
} }
/*!
* \brief Allocate and initialize a set_size_data
* \param pool The pool for the set_size_data
* \param size The size to store in the set_size_data
*/
static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool, static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
unsigned int size) unsigned int size)
{ {
...@@ -486,6 +665,17 @@ static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool, ...@@ -486,6 +665,17 @@ static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
return ssd; return ssd;
} }
/*!
* \brief Change the size of the threadpool
*
* This can either result in shrinking or growing the threadpool depending
* on the new desired size and the current size.
*
* This function is run from the threadpool control taskprocessor thread
*
* \param data A set_size_data used for determining how to act
* \return 0
*/
static int queued_set_size(void *data) static int queued_set_size(void *data)
{ {
struct set_size_data *ssd = data; struct set_size_data *ssd = data;
...@@ -548,12 +738,209 @@ struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *lis ...@@ -548,12 +738,209 @@ struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *lis
return pool; return pool;
} }
int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
{
return ast_taskprocessor_push(pool->tps, task, data);
}
void ast_threadpool_shutdown(struct ast_threadpool *pool) void ast_threadpool_shutdown(struct ast_threadpool *pool)
{ {
/* Pretty simple really. We just shut down the /* Shut down the taskprocessors and everything else just
* taskprocessors and everything else just
* takes care of itself via the taskprocessor callbacks * takes care of itself via the taskprocessor callbacks
*/ */
ast_taskprocessor_unreference(pool->control_tps); ast_taskprocessor_unreference(pool->control_tps);
ast_taskprocessor_unreference(pool->tps); ast_taskprocessor_unreference(pool->tps);
} }
/*!
* A thread that executes threadpool tasks
*/
struct worker_thread {
/*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */
int id;
/*! Condition used in conjunction with state changes */
ast_cond_t cond;
/*! Lock used alongside the condition for state changes */
ast_mutex_t lock;
/*! The actual thread that is executing tasks */
pthread_t thread;
/*! A pointer to the threadpool. Needed to be able to execute tasks */
struct ast_threadpool *pool;
/*! The current state of the worker thread */
enum worker_state state;
/*! A boolean used to determine if an idle thread should become active */
int wake_up;
};
/*!
* A monotonically increasing integer used for worker
* thread identification.
*/
static int worker_id_counter;
static int worker_thread_hash(const void *obj, int flags)
{
const struct worker_thread *worker = obj;
return worker->id;
}
static int worker_thread_cmp(void *obj, void *arg, int flags)
{
struct worker_thread *worker1 = obj;
struct worker_thread *worker2 = arg;
return worker1->id == worker2->id ? CMP_MATCH : 0;
}
/*!
* \brief shut a worker thread down
*
* Set the worker dead and then wait for its thread
* to finish executing.
*
* \param worker The worker thread to shut down
*/
static void worker_shutdown(struct worker_thread *worker)
{
struct worker_thread *worker = obj;
worker_set_state(worker, DEAD);
if (worker->thread != AST_PTHREADT_NULL) {
pthread_join(worker->thread, NULL);
worker->thread = AST_PTHREADT_NULL;
}
}
/*!
* \brief Worker thread destructor
*
* Called automatically when refcount reaches 0. Shuts
* down the worker thread and destroys its component
* parts
*/
static void worker_thread_destroy(void *obj)
{
struct worker_thread *worker = obj;
worker_shutdown(worker);
ast_mutex_destroy(&worker->lock);
ast_cond_destroy(&worker->cond);
}
/*!
* \brief start point for worker threads
*
* Worker threads start in the active state but may
* immediately go idle if there is no work to be
* done
*
* \param arg The worker thread
* \retval NULL
*/
static void *worker_start(void *arg)
{
struct worker_thread *worker = arg;
worker_active(worker);
return NULL;
}
/*!
* \brief Allocate and initialize a new worker thread
*
* This will create, initialize, and start the thread.
*
* \param pool The threadpool to which the worker will be added
* \retval NULL Failed to allocate or start the worker thread
* \retval non-NULL The newly-created worker thread
*/
static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
{
struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
if (!worker) {
return NULL;
}
worker->id = ast_atomic_fetchadd_int(&worker_id_counter, 1);
ast_mutex_init(&worker->lock);
ast_cond_init(&worker->cond, NULL);
worker->pool = pool;
worker->thread = AST_PTHREADT_NULL;
worker->state = ALIVE;
if (ast_pthread_create(&worker->thread, NULL, worker_start, worker) < 0) {
ast_log(LOG_ERROR, "Unable to start worker thread!\n");
ao2_ref(worker, -1);
return NULL;
}
return worker;
}
/*!
* \brief Active loop for worker threads
*
* The worker will stay in this loop for its lifetime,
* executing tasks as they become available. If there
* are no tasks currently available, then the thread
* will go idle.
*
* \param worker The worker thread executing tasks.
*/
static void worker_active(struct worker_thread *worker)
{
int alive = 1;
while (alive) {
if (threadpool_execute(worker->pool)) {
alive = worker_idle(worker);
}
}
/* Reaching this portion means the thread is
* on death's door. It may have been killed while
* it was idle, in which case it can just die
* peacefully. If it's a zombie, though, then
* it needs to let the pool know so
* that the thread can be removed from the
* list of zombie threads.
*/
if (worker->state == ZOMBIE) {
threadpool_zombie_thread_dead(worker->pool, worker);
}
}
/*!
* \brief Idle function for worker threads
*
* The worker waits here until it gets told by the threadpool
* to wake up.
*
* \param worker The idle worker
* \retval 0 The thread is being woken up so that it can conclude.
* \retval non-zero The thread is being woken up to do more work.
*/
static int worker_idle(struct worker_thread *worker)
{
SCOPED_MUTEX(lock, &worker->lock);
if (worker->state != ALIVE) {
return 0;
}
threadpool_active_thread_idle(worker->pool, worker);
while (!worker->wake_up) {
ast_cond_wait(&worker->cond, lock);
}
worker->wake_up = 0;
return worker->state == ALIVE;
}
/*!
* \brief Change a worker's state
*
* The threadpool calls into this function in order to let a worker know
* how it should proceed.
*/
static void worker_set_state(struct worker_thread *worker, enum worker_state state)
{
SCOPED_MUTEX(lock, &worker->lock);
worker->state = state;
worker->wake_up = 1;
ast_cond_signal(&worker->cond);
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment