diff --git a/include/asterisk/threadpool.h b/include/asterisk/threadpool.h index d4c10b8d59617586a2e42b325a6a0b0d5ae815e5..8ff21849229294efdaea44c98e63a040881f3f7c 100644 --- a/include/asterisk/threadpool.h +++ b/include/asterisk/threadpool.h @@ -93,6 +93,19 @@ struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *lis */ void ast_threadpool_set_size(struct ast_threadpool *threadpool, unsigned int size); +/*! + * \brief Push a task to the threadpool + * + * Tasks pushed into the threadpool will be automatically taken by + * one of the threads within + * \param pool The threadpool to add the task to + * \param task The task to add + * \param data The parameter for the task + * \retval 0 success + * \retval -1 failure + */ +int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data); + /*! * \brief Shut down a threadpool and destroy it * diff --git a/main/threadpool.c b/main/threadpool.c index aa38ee5dbfd556322ffe95ac11556c7025f413b6..1c2e9634e5754a15b0bb3876905ff791e8468963 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -26,85 +26,116 @@ #define THREAD_BUCKETS 89 -static int id_counter; - +/*! + * \brief An opaque threadpool structure + * + * A threadpool is a collection of threads that execute + * tasks from a common queue. + */ struct ast_threadpool { + /*! Threadpool listener */ struct ast_threadpool_listener *listener; + /*! + * \brief The container of active threads. + * Active threads are those that are currently running tasks + */ struct ao2_container *active_threads; + /*! + * \brief The container of idle threads. + * Idle threads are those that are currenly waiting to run tasks + */ struct ao2_container *idle_threads; + /*! + * \brief The container of zombie threads. + * Zombie threads may be running tasks, but they are scheduled to die soon + */ struct ao2_container *zombie_threads; + /*! + * \brief The main taskprocessor + * + * Tasks that are queued in this taskprocessor are + * doled out to the worker threads. Worker threads that + * execute tasks from the threadpool are executing tasks + * in this taskprocessor. + * + * The threadpool itself is actually the private data for + * this taskprocessor's listener. This way, as taskprocessor + * changes occur, the threadpool can alert its listeners + * appropriately. + */ struct ast_taskprocessor *tps; + /*! + * \brief The control taskprocessor + * + * This is a standard taskprocessor that uses the default + * taskprocessor listener. In other words, all tasks queued to + * this taskprocessor have a single thread that executes the + * tasks. + * + * All tasks that modify the state of the threadpool and all tasks + * that call out to threadpool listeners are pushed to this + * taskprocessor. + * + * For instance, when the threadpool changes sizes, a task is put + * into this taskprocessor to do so. When it comes time to tell the + * threadpool listener that worker threads have changed state, + * the task is placed in this taskprocessor. + * + * This is done for three main reasons + * 1) It ensures that listeners are given an accurate portrayal + * of the threadpool's current state. In other words, when a listener + * gets told a count of active, idle and zombie threads, it does not + * need to worry that internal state of the threadpool might be different + * from what it has been told. + * 2) It minimizes the locking required in both the threadpool and in + * threadpool listener's callbacks. + * 3) It ensures that listener callbacks are called in the same order + * that the threadpool had its state change. + */ struct ast_taskprocessor *control_tps; }; +/*! + * \brief states for worker threads + */ enum worker_state { + /*! The worker is either active or idle */ ALIVE, + /*! + * The worker has been asked to shut down but + * may still be in the process of executing tasks. + * This transition happens when the threadpool needs + * to shrink and needs to kill active threads in order + * to do so. + */ ZOMBIE, + /*! + * The worker has been asked to shut down. Typically + * only idle threads go to this state directly, but + * active threads may go straight to this state when + * the threadpool is shut down. + */ DEAD, }; -struct worker_thread { - int id; - ast_cond_t cond; - ast_mutex_t lock; - pthread_t thread; - struct ast_threadpool *pool; - enum worker_state state; - int wake_up; -}; - -static int worker_thread_hash(const void *obj, int flags) -{ - const struct worker_thread *worker = obj; - - return worker->id; -} - -static int worker_thread_cmp(void *obj, void *arg, int flags) -{ - struct worker_thread *worker1 = obj; - struct worker_thread *worker2 = arg; - - return worker1->id == worker2->id ? CMP_MATCH : 0; -} - -static void worker_thread_destroy(void *obj) -{ - struct worker_thread *worker = obj; - ast_mutex_destroy(&worker->lock); - ast_cond_destroy(&worker->cond); -} - -static int worker_active(struct worker_thread *worker); - -static void *worker_start(void *arg) -{ - struct worker_thread *worker = arg; - - worker_active(worker); - return NULL; -} - -static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool) -{ - struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy); - if (!worker) { - return NULL; - } - worker->id = ast_atomic_fetchadd_int(&id_counter, 1); - ast_mutex_init(&worker->lock); - ast_cond_init(&worker->cond, NULL); - worker->pool = pool; - worker->thread = AST_PTHREADT_NULL; - worker->state = ALIVE; - if (ast_pthread_create(&worker->thread, NULL, worker_start, worker) < 0) { - ast_log(LOG_ERROR, "Unable to start worker thread!\n"); - ao2_ref(worker, -1); - return NULL; - } - return worker; -} - +/* Worker thread forward declarations. See definitions for documentation */ +struct worker_thread; +static int worker_thread_hash(const void *obj, int flags); +static int worker_thread_cmp(void *obj, void *arg, int flags); +static void worker_thread_destroy(void *obj); +static void worker_active(struct worker_thread *worker); +static void *worker_start(void *arg); +static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool); +static int worker_idle(struct worker_thread *worker); +static void worker_set_state(struct worker_thread *worker, enum worker_state state); +static int worker_shutdown(void *obj, void *arg, int flags); + +/*! + * \brief Notify the threadpool listener that the state has changed. + * + * This notifies the threadpool listener via its state_changed callback. + * \param pool The threadpool whose state has changed + */ static void threadpool_send_state_changed(struct ast_threadpool *pool) { int active_size = ao2_container_count(pool->active_threads); @@ -114,11 +145,19 @@ static void threadpool_send_state_changed(struct ast_threadpool *pool) pool->listener->callbacks->state_changed(pool->listener, active_size, idle_size, zombie_size); } +/*! + * \brief Struct used for queued operations involving worker state changes + */ struct thread_worker_pair { + /*! Threadpool that contains the worker whose state has changed */ struct ast_threadpool *pool; + /*! Worker whose state has changed */ struct worker_thread *worker; }; +/*! + * \brief Destructor for thread_worker_pair + */ static void thread_worker_pair_destructor(void *obj) { struct thread_worker_pair *pair = obj; @@ -126,6 +165,11 @@ static void thread_worker_pair_destructor(void *obj) ao2_ref(pair->worker, -1); } +/*! + * \brief Allocate and initialize a thread_worker_pair + * \param pool Threadpool to assign to the thread_worker_pair + * \param worker Worker thread to assign to the thread_worker_pair + */ static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool *pool, struct worker_thread *worker) { @@ -140,6 +184,13 @@ static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool return pair; } +/*! + * \brief Move a worker thread from the active container to the idle container. + * + * This function is called from the threadpool's control taskprocessor thread. + * \param data A thread_worker_pair containing the threadpool and the worker to move. + * \return 0 + */ static int queued_active_thread_idle(void *data) { struct thread_worker_pair *pair = data; @@ -153,6 +204,14 @@ static int queued_active_thread_idle(void *data) return 0; } +/*! + * \brief Queue a task to move a thread from the active list to the idle list + * + * This is called by a worker thread when it runs out of tasks to perform and + * goes idle. + * \param pool The threadpool to which the worker belongs + * \param worker The worker thread that has gone idle + */ static void threadpool_active_thread_idle(struct ast_threadpool *pool, struct worker_thread *worker) { @@ -163,17 +222,31 @@ static void threadpool_active_thread_idle(struct ast_threadpool *pool, ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair); } +/*! + * \brief Kill a zombie thread + * + * This runs from the threadpool's control taskprocessor thread. + * + * \param data A thread_worker_pair containing the threadpool and the zombie thread + * \return 0 + */ static int queued_zombie_thread_dead(void *data) { struct thread_worker_pair *pair = data; - ao2_unlink(pair->pool->zombie_threads, pair->worker); + ao2_unlink(pair->pool, pair->worker); threadpool_send_state_changed(pair->pool); ao2_ref(pair, -1); return 0; } +/*! + * \brief Queue a task to kill a zombie thread + * + * This is called by a worker thread when it acknowledges that it is time for + * it to die. + */ static void threadpool_zombie_thread_dead(struct ast_threadpool *pool, struct worker_thread *worker) { @@ -184,83 +257,56 @@ static void threadpool_zombie_thread_dead(struct ast_threadpool *pool, ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair); } -static int worker_idle(struct worker_thread *worker) -{ - SCOPED_MUTEX(lock, &worker->lock); - if (worker->state != ALIVE) { - return 0; - } - threadpool_active_thread_idle(worker->pool, worker); - while (!worker->wake_up) { - ast_cond_wait(&worker->cond, lock); - } - worker->wake_up = 0; - return worker->state == ALIVE; -} - +/*! + * \brief Execute a task in the threadpool + * + * This is the function that worker threads call in order to execute tasks + * in the threadpool + * + * \param pool The pool to which the tasks belong. + * \retval 0 Either the pool has been shut down or there are no tasks. + * \retval 1 There are still tasks remaining in the pool. + */ static int threadpool_execute(struct ast_threadpool *pool) { return ast_taskprocessor_execute(pool->tps); } -static int worker_active(struct worker_thread *worker) -{ - int alive = 1; - while (alive) { - if (threadpool_execute(worker->pool)) { - alive = worker_idle(worker); - } - } - - /* Reaching this portion means the thread is - * on death's door. It may have been killed while - * it was idle, in which case it can just die - * peacefully. If it's a zombie, though, then - * it needs to let the pool know so - * that the thread can be removed from the - * list of zombie threads. - */ - if (worker->state == ZOMBIE) { - threadpool_zombie_thread_dead(worker->pool, worker); - } - - return 0; -} - -static void worker_set_state(struct worker_thread *worker, enum worker_state state) -{ - SCOPED_MUTEX(lock, &worker->lock); - worker->state = state; - worker->wake_up = 1; - ast_cond_signal(&worker->cond); -} - -static int worker_shutdown(void *obj, void *arg, int flags) -{ - struct worker_thread *worker = obj; - - worker_set_state(worker, DEAD); - if (worker->thread != AST_PTHREADT_NULL) { - pthread_join(worker->thread, NULL); - worker->thread = AST_PTHREADT_NULL; - } - return 0; -} - -static void threadpool_destructor(void *private_data) +/*! + * \brief Destroy a threadpool's components. + * + * This is the destructor called automatically when the threadpool's + * reference count reaches zero. This is not to be confused with + * threadpool_destroy. + * + * By the time this actually gets called, most of the cleanup has already + * been done in the pool. The only thing left to do is to release the + * final reference to the threadpool listener. + * + * \param obj The pool to destroy + */ +static void threadpool_destructor(void *obj) { - struct ast_threadpool *pool = private_data; + struct ast_threadpool *pool = obj; /* XXX Probably should let the listener know we're being destroyed? */ /* Threads should all be shut down by now, so this should be a painless * operation */ - ao2_cleanup(pool->active_threads); - ao2_cleanup(pool->idle_threads); - ao2_cleanup(pool->zombie_threads); ao2_cleanup(pool->listener); } +/* + * \brief Allocate a threadpool + * + * This is implemented as a taskprocessor listener's alloc callback. This + * is because the threadpool exists as the private data on a taskprocessor + * listener. + * + * \param listener The taskprocessor listener where the threadpool will live. + * \retval NULL Could not initialize threadpool properly + * \retval non-NULL The newly-allocated threadpool + */ static void *threadpool_alloc(struct ast_taskprocessor_listener *listener) { RAII_VAR(struct ast_threadpool *, pool, @@ -289,17 +335,32 @@ static void *threadpool_alloc(struct ast_taskprocessor_listener *listener) return pool; } +/*! + * \brief helper used for queued task when tasks are pushed + */ struct task_pushed_data { + /*! Pool into which a task was pushed */ struct ast_threadpool *pool; + /*! Indicator of whether the pool had no tasks prior to the new task being added */ int was_empty; }; +/*! + * \brief Destructor for task_pushed_data + */ static void task_pushed_data_destroy(void *obj) { struct task_pushed_data *tpd = obj; ao2_ref(tpd->pool, -1); } +/*! + * \brief Allocate and initialize a task_pushed_data + * \param pool The threadpool to set in the task_pushed_data + * \param was_empty The was_empty value to set in the task_pushed_data + * \retval NULL Unable to allocate task_pushed_data + * \retval non-NULL The newly-allocated task_pushed_data + */ static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool, int was_empty) { @@ -315,6 +376,18 @@ static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *po return tpd; } +/*! + * \brief Activate idle threads + * + * This function always returns CMP_MATCH because all threads that this + * function acts on need to be seen as matches so they are unlinked from the + * list of idle threads. + * + * Called as an ao2_callback in the threadpool's control taskprocessor thread. + * \param obj The worker to activate + * \param arg The pool where the worker belongs + * \retval CMP_MATCH + */ static int activate_threads(void *obj, void *arg, int flags) { struct worker_thread *worker = obj; @@ -322,9 +395,18 @@ static int activate_threads(void *obj, void *arg, int flags) ao2_link(pool->active_threads, worker); worker_set_state(worker, ALIVE); - return 0; + return CMP_MATCH; } +/*! + * \brief Queue task called when tasks are pushed into the threadpool + * + * This function first calls into the threadpool's listener to let it know + * that a task has been pushed. It then wakes up all idle threads and moves + * them into the active thread container. + * \param data A task_pushed_data + * \return 0 + */ static int handle_task_pushed(void *data) { struct task_pushed_data *tpd = data; @@ -332,11 +414,21 @@ static int handle_task_pushed(void *data) int was_empty = tpd->was_empty; pool->listener->callbacks->tps_task_pushed(pool->listener, was_empty); - ao2_callback(pool->idle_threads, OBJ_UNLINK, activate_threads, pool); + ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE, + activate_threads, pool); ao2_ref(tpd, -1); return 0; } +/*! + * \brief Taskprocessor listener callback called when a task is added + * + * The threadpool uses this opportunity to queue a task on its control taskprocessor + * in order to activate idle threads and notify the threadpool listener that the + * task has been pushed. + * \param listener The taskprocessor listener. The threadpool is the listener's private data + * \param was_empty True if the taskprocessor was empty prior to the task being pushed + */ static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty) { @@ -350,6 +442,13 @@ static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listen ast_taskprocessor_push(pool->control_tps, handle_task_pushed, tpd); } +/*! + * \brief Queued task that handles the case where the threadpool's taskprocessor is emptied + * + * This simply lets the threadpool's listener know that the threadpool is devoid of tasks + * \param data The pool that has become empty + * \return 0 + */ static int handle_emptied(void *data) { struct ast_threadpool *pool = data; @@ -359,6 +458,13 @@ static int handle_emptied(void *data) return 0; } +/*! + * \brief Taskprocessor listener emptied callback + * + * The threadpool queues a task to let the threadpool listener know that + * the threadpool no longer contains any tasks. + * \param listener The taskprocessor listener. The threadpool is the listener's private data. + */ static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener) { struct ast_threadpool *pool = listener->private_data; @@ -367,34 +473,44 @@ static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener) ast_taskprocessor_push(pool->control_tps, handle_emptied, pool); } +/*! + * \brief Taskprocessor listener shutdown callback + * + * The threadpool will shut down and destroy all of its worker threads when + * this is called back. By the time this gets called, the taskprocessor's + * control taskprocessor has already been destroyed. Therefore there is no risk + * in outright destroying the worker threads here. + * \param listener The taskprocessor listener. The threadpool is the listener's private data. + */ static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener) { - /* - * The threadpool triggers the taskprocessor to shut down. As a result, - * we have the freedom of shutting things down in three stages: - * - * 1) Before the tasprocessor is shut down - * 2) During taskprocessor shutdown (here) - * 3) After taskprocessor shutdown - * - * In the spirit of the taskprocessor shutdown, this would be - * where we make sure that all the worker threads are no longer - * executing. We could just do this before we even shut down - * the taskprocessor, but this feels more "right". - */ - struct ast_threadpool *pool = listener->private_data; - ao2_callback(pool->active_threads, 0, worker_shutdown, NULL); - ao2_callback(pool->idle_threads, 0, worker_shutdown, NULL); - ao2_callback(pool->zombie_threads, 0, worker_shutdown, NULL); + int flags = (OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE); + + ao2_cleanup(pool->active_threads); + ao2_cleanup(pool->idle_threads); + ao2_cleanup(pool->zombie_threads); } +/*! + * \brief Taskprocessor listener destroy callback + * + * Since the threadpool is an ao2 object, all that is necessary is to + * decrease the refcount. Since the control taskprocessor should already + * be destroyed by this point, this should be the final reference to the + * threadpool. + * + * \param private_data The threadpool to destroy + */ static void threadpool_destroy(void *private_data) { struct ast_threadpool *pool = private_data; ao2_cleanup(pool); } +/*! + * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor + */ static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = { .alloc = threadpool_alloc, .task_pushed = threadpool_tps_task_pushed, @@ -403,6 +519,13 @@ static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callb .destroy = threadpool_destroy, }; +/*! + * \brief Add threads to the threadpool + * + * This function is called from the threadpool's control taskprocessor thread. + * \param pool The pool that is expanding + * \delta The number of threads to add to the pool + */ static void grow(struct ast_threadpool *pool, int delta) { int i; @@ -415,19 +538,49 @@ static void grow(struct ast_threadpool *pool, int delta) } } +/*! + * \brief ao2 callback to kill a set number of threads. + * + * Threads will be unlinked from the container as long as the + * counter has not reached zero. The counter is decremented with + * each thread that is removed. + * \param obj The worker thread up for possible destruction + * \param arg The counter + * \param flags Unused + * \retval CMP_MATCH The counter has not reached zero, so this flag should be removed. + * \retval CMP_STOP The counter has reached zero so no more threads should be removed. + */ static int kill_threads(void *obj, void *arg, int flags) { struct worker_thread *worker = obj; int *num_to_kill = arg; if ((*num_to_kill)-- > 0) { - worker_shutdown(worker, arg, flags); return CMP_MATCH; } else { return CMP_STOP; } } +/*! + * \brief ao2 callback to zombify a set number of threads. + * + * Threads will be zombified as long as as the counter has not reached + * zero. The counter is decremented with each thread that is zombified. + * + * Zombifying a thread involves removing it from its current container, + * adding it to the zombie container, and changing the state of the + * worker to a zombie + * + * This callback is called from the threadpool control taskprocessor thread. + * + * \param obj The worker thread that may be zombified + * \param arg The pool to which the worker belongs + * \param data The counter + * \param flags Unused + * \retval CMP_MATCH The zombified thread should be removed from its current container + * \retval CMP_STOP Stop attempting to zombify threads + */ static int zombify_threads(void *obj, void *arg, void *data, int flags) { struct worker_thread *worker = obj; @@ -443,6 +596,18 @@ static int zombify_threads(void *obj, void *arg, void *data, int flags) } } +/*! + * \brief Remove threads from the threadpool + * + * The preference is to kill idle threads. However, if there are + * more threads to remove than there are idle threads, then active + * threads will be zombified instead. + * + * This function is called from the threadpool control taskprocessor thread. + * + * \param pool The threadpool to remove threads from + * \param delta The number of threads to remove + */ static void shrink(struct ast_threadpool *pool, int delta) { /* @@ -454,24 +619,38 @@ static void shrink(struct ast_threadpool *pool, int delta) int idle_threads_to_kill = MIN(delta, idle_threads); int active_threads_to_zombify = delta - idle_threads_to_kill; - ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE | OBJ_NOLOCK, + ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK, kill_threads, &idle_threads_to_kill); - ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE | OBJ_NOLOCK, + ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK, zombify_threads, pool, &active_threads_to_zombify); } +/*! + * \brief Helper struct used for queued operations that change the size of the threadpool + */ struct set_size_data { + /*! The pool whose size is to change */ struct ast_threadpool *pool; + /*! The requested new size of the pool */ unsigned int size; }; +/*! + * \brief Destructor for set_size_data + * \param obj The set_size_data to destroy + */ static void set_size_data_destroy(void *obj) { struct set_size_data *ssd = obj; ao2_ref(ssd->pool, -1); } +/*! + * \brief Allocate and initialize a set_size_data + * \param pool The pool for the set_size_data + * \param size The size to store in the set_size_data + */ static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool, unsigned int size) { @@ -486,6 +665,17 @@ static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool, return ssd; } +/*! + * \brief Change the size of the threadpool + * + * This can either result in shrinking or growing the threadpool depending + * on the new desired size and the current size. + * + * This function is run from the threadpool control taskprocessor thread + * + * \param data A set_size_data used for determining how to act + * \return 0 + */ static int queued_set_size(void *data) { struct set_size_data *ssd = data; @@ -548,12 +738,209 @@ struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *lis return pool; } +int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data) +{ + return ast_taskprocessor_push(pool->tps, task, data); +} + void ast_threadpool_shutdown(struct ast_threadpool *pool) { - /* Pretty simple really. We just shut down the - * taskprocessors and everything else just + /* Shut down the taskprocessors and everything else just * takes care of itself via the taskprocessor callbacks */ ast_taskprocessor_unreference(pool->control_tps); ast_taskprocessor_unreference(pool->tps); } + +/*! + * A thread that executes threadpool tasks + */ +struct worker_thread { + /*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */ + int id; + /*! Condition used in conjunction with state changes */ + ast_cond_t cond; + /*! Lock used alongside the condition for state changes */ + ast_mutex_t lock; + /*! The actual thread that is executing tasks */ + pthread_t thread; + /*! A pointer to the threadpool. Needed to be able to execute tasks */ + struct ast_threadpool *pool; + /*! The current state of the worker thread */ + enum worker_state state; + /*! A boolean used to determine if an idle thread should become active */ + int wake_up; +}; + +/*! + * A monotonically increasing integer used for worker + * thread identification. + */ +static int worker_id_counter; + +static int worker_thread_hash(const void *obj, int flags) +{ + const struct worker_thread *worker = obj; + + return worker->id; +} + +static int worker_thread_cmp(void *obj, void *arg, int flags) +{ + struct worker_thread *worker1 = obj; + struct worker_thread *worker2 = arg; + + return worker1->id == worker2->id ? CMP_MATCH : 0; +} + +/*! + * \brief shut a worker thread down + * + * Set the worker dead and then wait for its thread + * to finish executing. + * + * \param worker The worker thread to shut down + */ +static void worker_shutdown(struct worker_thread *worker) +{ + struct worker_thread *worker = obj; + + worker_set_state(worker, DEAD); + if (worker->thread != AST_PTHREADT_NULL) { + pthread_join(worker->thread, NULL); + worker->thread = AST_PTHREADT_NULL; + } +} + +/*! + * \brief Worker thread destructor + * + * Called automatically when refcount reaches 0. Shuts + * down the worker thread and destroys its component + * parts + */ +static void worker_thread_destroy(void *obj) +{ + struct worker_thread *worker = obj; + worker_shutdown(worker); + ast_mutex_destroy(&worker->lock); + ast_cond_destroy(&worker->cond); +} + +/*! + * \brief start point for worker threads + * + * Worker threads start in the active state but may + * immediately go idle if there is no work to be + * done + * + * \param arg The worker thread + * \retval NULL + */ +static void *worker_start(void *arg) +{ + struct worker_thread *worker = arg; + + worker_active(worker); + return NULL; +} + +/*! + * \brief Allocate and initialize a new worker thread + * + * This will create, initialize, and start the thread. + * + * \param pool The threadpool to which the worker will be added + * \retval NULL Failed to allocate or start the worker thread + * \retval non-NULL The newly-created worker thread + */ +static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool) +{ + struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy); + if (!worker) { + return NULL; + } + worker->id = ast_atomic_fetchadd_int(&worker_id_counter, 1); + ast_mutex_init(&worker->lock); + ast_cond_init(&worker->cond, NULL); + worker->pool = pool; + worker->thread = AST_PTHREADT_NULL; + worker->state = ALIVE; + if (ast_pthread_create(&worker->thread, NULL, worker_start, worker) < 0) { + ast_log(LOG_ERROR, "Unable to start worker thread!\n"); + ao2_ref(worker, -1); + return NULL; + } + return worker; +} + +/*! + * \brief Active loop for worker threads + * + * The worker will stay in this loop for its lifetime, + * executing tasks as they become available. If there + * are no tasks currently available, then the thread + * will go idle. + * + * \param worker The worker thread executing tasks. + */ +static void worker_active(struct worker_thread *worker) +{ + int alive = 1; + while (alive) { + if (threadpool_execute(worker->pool)) { + alive = worker_idle(worker); + } + } + + /* Reaching this portion means the thread is + * on death's door. It may have been killed while + * it was idle, in which case it can just die + * peacefully. If it's a zombie, though, then + * it needs to let the pool know so + * that the thread can be removed from the + * list of zombie threads. + */ + if (worker->state == ZOMBIE) { + threadpool_zombie_thread_dead(worker->pool, worker); + } +} + +/*! + * \brief Idle function for worker threads + * + * The worker waits here until it gets told by the threadpool + * to wake up. + * + * \param worker The idle worker + * \retval 0 The thread is being woken up so that it can conclude. + * \retval non-zero The thread is being woken up to do more work. + */ +static int worker_idle(struct worker_thread *worker) +{ + SCOPED_MUTEX(lock, &worker->lock); + if (worker->state != ALIVE) { + return 0; + } + threadpool_active_thread_idle(worker->pool, worker); + while (!worker->wake_up) { + ast_cond_wait(&worker->cond, lock); + } + worker->wake_up = 0; + return worker->state == ALIVE; +} + +/*! + * \brief Change a worker's state + * + * The threadpool calls into this function in order to let a worker know + * how it should proceed. + */ +static void worker_set_state(struct worker_thread *worker, enum worker_state state) +{ + SCOPED_MUTEX(lock, &worker->lock); + worker->state = state; + worker->wake_up = 1; + ast_cond_signal(&worker->cond); +} +