Skip to content
Snippets Groups Projects
threadpool.c 39.1 KiB
Newer Older
/*
 * Asterisk -- An open source telephony toolkit.
 *
 * Copyright (C) 2012-2013, Digium, Inc.
 *
 * Mark Michelson <mmmichelson@digium.com>
 *
 * See http://www.asterisk.org for more information about
 * the Asterisk project. Please do not directly contact
 * any of the maintainers of this project for assistance;
 * the project provides a web site, mailing lists and IRC
 * channels for your use.
 *
 * This program is free software, distributed under the terms of
 * the GNU General Public License Version 2. See the LICENSE file
 * at the top of the source tree.
 */


#include "asterisk.h"

#include "asterisk/threadpool.h"
#include "asterisk/taskprocessor.h"
Mark Michelson's avatar
Mark Michelson committed
#include "asterisk/astobj2.h"
#include "asterisk/utils.h"
/*
 * Number of hash buckets used for each of the worker-thread containers
 * (active, idle and zombie). Needs to stay prime if increased.
 */
#define THREAD_BUCKETS 89

/*!
 * \brief An opaque threadpool structure
 *
 * A threadpool is a collection of threads that execute
 * tasks from a common queue.
 */
Mark Michelson's avatar
Mark Michelson committed
struct ast_threadpool {
	/*! Threadpool listener */
Mark Michelson's avatar
Mark Michelson committed
	struct ast_threadpool_listener *listener;
	 * \brief The container of active threads.
	 * Active threads are those that are currently running tasks
	 */
Mark Michelson's avatar
Mark Michelson committed
	struct ao2_container *active_threads;
	 * \brief The container of idle threads.
	 * Idle threads are those that are currenly waiting to run tasks
	 */
Mark Michelson's avatar
Mark Michelson committed
	struct ao2_container *idle_threads;
	 * \brief The container of zombie threads.
	 * Zombie threads may be running tasks, but they are scheduled to die soon
	 */
	struct ao2_container *zombie_threads;
	 * \brief The main taskprocessor
	 * Tasks that are queued in this taskprocessor are
	 * doled out to the worker threads. Worker threads that
	 * execute tasks from the threadpool are executing tasks
	 * in this taskprocessor.
	 *
	 * The threadpool itself is actually the private data for
	 * this taskprocessor's listener. This way, as taskprocessor
	 * changes occur, the threadpool can alert its listeners
	 * appropriately.
	 */
Mark Michelson's avatar
Mark Michelson committed
	struct ast_taskprocessor *tps;
	/*!
	 * \brief The control taskprocessor
	 *
	 * This is a standard taskprocessor that uses the default
	 * taskprocessor listener. In other words, all tasks queued to
	 * this taskprocessor have a single thread that executes the
	 * tasks.
	 *
	 * All tasks that modify the state of the threadpool and all tasks
	 * that call out to threadpool listeners are pushed to this
	 * taskprocessor.
	 *
	 * For instance, when the threadpool changes sizes, a task is put
	 * into this taskprocessor to do so. When it comes time to tell the
	 * threadpool listener that worker threads have changed state,
	 * the task is placed in this taskprocessor.
	 *
	 * This is done for three main reasons
	 * 1) It ensures that listeners are given an accurate portrayal
	 * of the threadpool's current state. In other words, when a listener
	 * gets told a count of active, idle and zombie threads, it does not
	 * need to worry that internal state of the threadpool might be different
	 * from what it has been told.
	 * 2) It minimizes the locking required in both the threadpool and in
	 * threadpool listener's callbacks.
	 * 3) It ensures that listener callbacks are called in the same order
	 * that the threadpool had its state change.
	 */
Mark Michelson's avatar
Mark Michelson committed
	struct ast_taskprocessor *control_tps;
	/*! True if the threadpool is in the process of shutting down */
	/*! Threadpool-specific options */
	struct ast_threadpool_options options;
Mark Michelson's avatar
Mark Michelson committed
};
/*!
 * \brief Listener for a threadpool
 *
 * The listener is notified of changes in a threadpool. It can
 * react by doing things like increasing the number of threads
 * in the pool.
 *
 * Instances are created by ast_threadpool_listener_alloc() as ao2
 * objects, and the threadpool takes its own reference on the listener
 * it is created with.
 */
struct ast_threadpool_listener {
	/*!
	 * Callbacks called by the threadpool. This file invokes the
	 * state_changed, task_pushed, emptied and shutdown members, each
	 * only when the corresponding pointer is non-NULL.
	 */
	const struct ast_threadpool_listener_callbacks *callbacks;
	/*! User data for the listener, returned by ast_threadpool_listener_get_user_data() */
	void *user_data;
};

/*!
 * \brief states for worker threads
 */
enum worker_state {
	/*! The worker is either active or idle */
	ALIVE,
	/*!
	 * The worker has been asked to shut down but
	 * may still be in the process of executing tasks.
	 * This transition happens when the threadpool needs
	 * to shrink and needs to kill active threads in order
	 * to do so.
	 */
	ZOMBIE,
	/*!
	 * The worker has been asked to shut down. Typically
	 * only idle threads go to this state directly, but
	 * active threads may go straight to this state when
	 * the threadpool is shut down.
	 */
	DEAD,
};

/*!
 * \brief A thread that executes threadpool tasks
 *
 * Workers are stored in the pool's active, idle and zombie containers,
 * where they are hashed and compared by their id.
 */
struct worker_thread {
	/*! A unique (within a run of Asterisk) ID for the thread. Used for hashing and searching */
	int id;
	/*! Condition used in conjunction with state changes */
	ast_cond_t cond;
	/*! Lock used alongside the condition for state changes */
	ast_mutex_t lock;
	/*! The actual thread that is executing tasks */
	pthread_t thread;
	/*! A pointer to the threadpool. Needed to be able to execute tasks */
	struct ast_threadpool *pool;
	/*! The current state of the worker thread */
	enum worker_state state;
	/*! A boolean used to determine if an idle thread should become active */
	int wake_up;
	/*!
	 * Options for this threadpool.
	 * NOTE(review): presumably a per-worker copy of the pool's options - confirm in worker_thread_alloc().
	 */
	struct ast_threadpool_options options;
};

/*
 * Worker thread forward declarations. See definitions for documentation.
 *
 * These are declared up front because the threadpool code that follows
 * uses them before they are defined: worker_thread_hash() and
 * worker_thread_cmp() are the hash/compare callbacks handed to the
 * thread containers, while worker_thread_alloc(), worker_thread_start()
 * and worker_set_state() are called while growing, activating and
 * zombifying workers.
 */
static int worker_thread_hash(const void *obj, int flags);
static int worker_thread_cmp(void *obj, void *arg, int flags);
static void worker_thread_destroy(void *obj);
static void worker_active(struct worker_thread *worker);
static void *worker_start(void *arg);
static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool);
static int worker_thread_start(struct worker_thread *worker);
static int worker_idle(struct worker_thread *worker);
static int worker_set_state(struct worker_thread *worker, enum worker_state state);
static void worker_shutdown(struct worker_thread *worker);

/*!
 * \brief Notify the threadpool listener that the state has changed.
 *
 * This notifies the threadpool listener via its state_changed callback.
 * \param pool The threadpool whose state has changed
 */
Mark Michelson's avatar
Mark Michelson committed
static void threadpool_send_state_changed(struct ast_threadpool *pool)
{
	int active_size = ao2_container_count(pool->active_threads);
	int idle_size = ao2_container_count(pool->idle_threads);

	if (pool->listener && pool->listener->callbacks->state_changed) {
		pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
	}
/*!
 * \brief Struct used for queued operations involving worker state changes
 *
 * A pair holds a reference on its worker for as long as it exists; see
 * thread_worker_pair_alloc() and thread_worker_pair_free().
 */
struct thread_worker_pair {
	/*! Threadpool that contains the worker whose state has changed */
	struct ast_threadpool *pool;
	/*! Worker whose state has changed */
	struct worker_thread *worker;
};

/*!
 * \brief Destructor for thread_worker_pair
 */
static void thread_worker_pair_free(struct thread_worker_pair *pair)
Mark Michelson's avatar
Mark Michelson committed
{
	ao2_ref(pair->worker, -1);
/*!
 * \brief Allocate and initialize a thread_worker_pair
 * \param pool Threadpool to assign to the thread_worker_pair
 * \param worker Worker thread to assign to the thread_worker_pair
 */
static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool *pool,
Mark Michelson's avatar
Mark Michelson committed
		struct worker_thread *worker)
{
	struct thread_worker_pair *pair = ast_malloc(sizeof(*pair));
Mark Michelson's avatar
Mark Michelson committed
	if (!pair) {
		return NULL;
	}
Mark Michelson's avatar
Mark Michelson committed
	pair->pool = pool;
	ao2_ref(worker, +1);
	pair->worker = worker;
Mark Michelson's avatar
Mark Michelson committed
	return pair;
}

/*!
 * \brief Move a worker thread from the active container to the idle container.
 *
 * This function is called from the threadpool's control taskprocessor thread.
 * \param data A thread_worker_pair containing the threadpool and the worker to move.
 * \return 0
 */
Mark Michelson's avatar
Mark Michelson committed
static int queued_active_thread_idle(void *data)
{
	struct thread_worker_pair *pair = data;

	ao2_link(pair->pool->idle_threads, pair->worker);
	ao2_unlink(pair->pool->active_threads, pair->worker);

	threadpool_send_state_changed(pair->pool);

	thread_worker_pair_free(pair);
Mark Michelson's avatar
Mark Michelson committed
	return 0;
}

/*!
 * \brief Queue a task to move a thread from the active list to the idle list
 *
 * This is called by a worker thread when it runs out of tasks to perform and
 * goes idle.
 * \param pool The threadpool to which the worker belongs
 * \param worker The worker thread that has gone idle
 */
Mark Michelson's avatar
Mark Michelson committed
static void threadpool_active_thread_idle(struct ast_threadpool *pool,
		struct worker_thread *worker)
{
	struct thread_worker_pair *pair;
	SCOPED_AO2LOCK(lock, pool);
	if (pool->shutting_down) {
		return;
	}
	pair = thread_worker_pair_alloc(pool, worker);
Mark Michelson's avatar
Mark Michelson committed
	if (!pair) {
		return;
	}

	if (ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair)) {
		thread_worker_pair_free(pair);
	}
/*!
 * \brief Kill a zombie thread
 *
 * This runs from the threadpool's control taskprocessor thread.
 *
 * \param data A thread_worker_pair containing the threadpool and the zombie thread
 * \return 0
 */
static int queued_zombie_thread_dead(void *data)
{
	struct thread_worker_pair *pair = data;

	ao2_unlink(pair->pool->zombie_threads, pair->worker);
	threadpool_send_state_changed(pair->pool);

	thread_worker_pair_free(pair);
	return 0;
}

/*!
 * \brief Queue a task to kill a zombie thread
 *
 * This is called by a worker thread when it acknowledges that it is time for
 * it to die. Nothing is queued once the pool is shutting down. The pool is
 * locked for the duration of the call.
 *
 * \param pool The threadpool to which the worker belongs
 * \param worker The zombie worker that is dying
 */
static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
		struct worker_thread *worker)
{
	struct thread_worker_pair *pair;
	SCOPED_AO2LOCK(lock, pool);

	if (pool->shutting_down) {
		return;
	}

	pair = thread_worker_pair_alloc(pool, worker);
	if (!pair) {
		return;
	}

	/* On a failed push the task never runs, so the pair must be released here */
	if (ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair)) {
		thread_worker_pair_free(pair);
	}
}

static int queued_idle_thread_dead(void *data)
{
	struct thread_worker_pair *pair = data;

	ao2_unlink(pair->pool->idle_threads, pair->worker);
	threadpool_send_state_changed(pair->pool);

	thread_worker_pair_free(pair);
	return 0;
}

/*!
 * \brief Queue a task to remove a dead idle thread from the pool
 *
 * Nothing is queued once the pool is shutting down. The pool is locked
 * for the duration of the call.
 *
 * \param pool The threadpool to which the worker belongs
 * \param worker The idle worker that has died
 */
static void threadpool_idle_thread_dead(struct ast_threadpool *pool,
		struct worker_thread *worker)
{
	struct thread_worker_pair *pair;
	SCOPED_AO2LOCK(lock, pool);

	if (pool->shutting_down) {
		return;
	}

	pair = thread_worker_pair_alloc(pool, worker);
	if (!pair) {
		return;
	}

	/* On a failed push the task never runs, so the pair must be released here */
	if (ast_taskprocessor_push(pool->control_tps, queued_idle_thread_dead, pair)) {
		thread_worker_pair_free(pair);
	}
}

/*!
 * \brief Execute a task in the threadpool
 * This is the function that worker threads call in order to execute tasks
 * in the threadpool
 *
 * \param pool The pool to which the tasks belong.
 * \retval 0 Either the pool has been shut down or there are no tasks.
 * \retval 1 There are still tasks remaining in the pool.
 */
Mark Michelson's avatar
Mark Michelson committed
static int threadpool_execute(struct ast_threadpool *pool)
{
	ao2_lock(pool);
		ao2_unlock(pool);
		return ast_taskprocessor_execute(pool->tps);
	}
	ao2_unlock(pool);
/*!
 * \brief Destroy a threadpool's components.
 *
 * This is the destructor called automatically when the threadpool's
 * reference count reaches zero. This is not to be confused with
 * threadpool_destroy.
 *
 * By the time this actually gets called, most of the cleanup has already
 * been done in the pool. The only thing left to do is to release the
 * final reference to the threadpool listener.
 *
 * \param obj The pool to destroy
 */
static void threadpool_destructor(void *obj)
Mark Michelson's avatar
Mark Michelson committed
{
	struct ast_threadpool *pool = obj;
	ao2_cleanup(pool->listener);
Mark Michelson's avatar
Mark Michelson committed
}
/*
 * \brief Allocate a threadpool
 *
 * This is implemented as a taskprocessor listener's alloc callback. This
 * is because the threadpool exists as the private data on a taskprocessor
 * listener.
 *
 * \param name The name of the threadpool.
 * \param options The options the threadpool uses.
 * \retval NULL Could not initialize threadpool properly
 * \retval non-NULL The newly-allocated threadpool
 */
static struct ast_threadpool *threadpool_alloc(const char *name, const struct ast_threadpool_options *options)
	RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
	struct ast_str *control_tps_name;
	pool = ao2_alloc(sizeof(*pool), threadpool_destructor);
	control_tps_name = ast_str_create(64);
	if (!pool || !control_tps_name) {
		ast_free(control_tps_name);
	ast_str_set(&control_tps_name, 0, "%s-control", name);
	pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT);
	ast_free(control_tps_name);
Mark Michelson's avatar
Mark Michelson committed
	if (!pool->control_tps) {
		return NULL;
	}
	pool->active_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
	if (!pool->active_threads) {
		return NULL;
	}
	pool->idle_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
	if (!pool->idle_threads) {
		return NULL;
	}
	pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
	if (!pool->zombie_threads) {
		return NULL;
	}
Mark Michelson's avatar
Mark Michelson committed
	ao2_ref(pool, +1);
	return pool;
/*!
 * \brief Taskprocessor listener start callback
 *
 * The threadpool has nothing to do at this point, so this simply
 * reports success.
 *
 * \param listener The taskprocessor listener. Unused.
 * \return 0
 */
static int threadpool_tps_start(struct ast_taskprocessor_listener *listener)
{
	return 0;
}

/*!
 * \brief helper used for queued task when tasks are pushed
 */
struct task_pushed_data {
	/*! Pool into which a task was pushed */
	struct ast_threadpool *pool;
	/*! Indicator of whether the pool had no tasks prior to the new task being added */
	int was_empty;
};

/*!
 * \brief Allocate and initialize a task_pushed_data
 * \param pool The threadpool to set in the task_pushed_data
 * \param was_empty The was_empty value to set in the task_pushed_data
 * \retval NULL Unable to allocate task_pushed_data
 * \retval non-NULL The newly-allocated task_pushed_data
 */
Mark Michelson's avatar
Mark Michelson committed
static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
		int was_empty)
	struct task_pushed_data *tpd = ast_malloc(sizeof(*tpd));
Mark Michelson's avatar
Mark Michelson committed

	if (!tpd) {
		return NULL;
	}
	tpd->pool = pool;
	tpd->was_empty = was_empty;
	return tpd;
/*!
 * \brief Activate idle threads
 *
 * This function always returns CMP_MATCH because all workers that this
 * function acts on need to be seen as matches so they are unlinked from the
 * list of idle threads.
 *
 * Called as an ao2_callback in the threadpool's control taskprocessor thread.
 * \param obj The worker to activate
 * \param arg The pool where the worker belongs
 * \retval CMP_MATCH
 */
static int activate_thread(void *obj, void *arg, int flags)
Mark Michelson's avatar
Mark Michelson committed
{
	struct worker_thread *worker = obj;
	struct ast_threadpool *pool = arg;

	if (!ao2_link(pool->active_threads, worker)) {
		/* If we can't link the idle thread into the active container, then
		 * we'll just leave the thread idle and not wake it up.
		 */
		ast_log(LOG_WARNING, "Failed to activate thread %d. Remaining idle\n",
				worker->id);
		return 0;
	}

	if (worker_set_state(worker, ALIVE)) {
		ast_debug(1, "Failed to activate thread %d. It is dead\n",
				worker->id);
		/* The worker thread will no longer exist in the active threads or
		 * idle threads container after this.
		 */
		ao2_unlink(pool->active_threads, worker);
	}

	return CMP_MATCH;
/*!
 * \brief Add threads to the threadpool
 *
 * New workers are linked into the idle container and then started. The
 * requested growth is reduced if it would take the combined number of
 * active and idle threads past the pool's max_size option.
 *
 * This function is called from the threadpool's control taskprocessor thread.
 * \param pool The pool that is expanding
 * \param delta The number of threads to add to the pool
 */
static void grow(struct ast_threadpool *pool, int delta)
{
	int i;

	int current_size = ao2_container_count(pool->active_threads) +
		ao2_container_count(pool->idle_threads);

	/* A max_size of 0 leaves the pool size uncapped */
	if (pool->options.max_size && current_size + delta > pool->options.max_size) {
		delta = pool->options.max_size - current_size;
	}

	ast_debug(3, "Increasing threadpool %s's size by %d\n",
			ast_taskprocessor_name(pool->tps), delta);

	for (i = 0; i < delta; ++i) {
		struct worker_thread *worker = worker_thread_alloc(pool);
		if (!worker) {
			return;
		}
		if (ao2_link(pool->idle_threads, worker)) {
			if (worker_thread_start(worker)) {
				ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id);
				/* The worker was linked into idle_threads just above, so that
				 * is the container it has to be removed from.
				 */
				ao2_unlink(pool->idle_threads, worker);
			}
		} else {
			ast_log(LOG_WARNING, "Failed to activate worker thread %d. Destroying.\n", worker->id);
		}
		/* Drop the reference handed back by worker_thread_alloc(). A worker
		 * that was linked successfully stays alive through the container.
		 * NOTE(review): assumes ao2_link() takes its own reference - confirm against astobj2.
		 */
		ao2_ref(worker, -1);
	}
}

 * \brief Queued task called when tasks are pushed into the threadpool
 *
 * This function first calls into the threadpool's listener to let it know
 * that a task has been pushed. It then wakes up all idle threads and moves
 * them into the active thread container.
 * \param data A task_pushed_data
 * \return 0
 */
static int queued_task_pushed(void *data)
Mark Michelson's avatar
Mark Michelson committed
	struct task_pushed_data *tpd = data;
	struct ast_threadpool *pool = tpd->pool;
	int was_empty = tpd->was_empty;
Mark Michelson's avatar
Mark Michelson committed

	if (pool->listener && pool->listener->callbacks->task_pushed) {
		pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
	}

	existing_active = ao2_container_count(pool->active_threads);

	/* The first pass transitions any existing idle threads to be active, and
	 * will also remove any worker threads that have recently entered the dead
	 * state.
	 */
	ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
			activate_thread, pool);

	/* If no idle threads could be transitioned to active grow the pool as permitted. */
	if (ao2_container_count(pool->active_threads) == existing_active) {
		if (!pool->options.auto_increment) {
			return 0;
		grow(pool, pool->options.auto_increment);
		/* An optional second pass transitions any newly added threads. */
		ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
				activate_thread, pool);

	threadpool_send_state_changed(pool);
Mark Michelson's avatar
Mark Michelson committed
	return 0;
/*!
 * \brief Taskprocessor listener callback called when a task is added
 *
 * The threadpool uses this opportunity to queue a task on its control taskprocessor
 * in order to activate idle threads and notify the threadpool listener that the
 * task has been pushed.
 * \param listener The taskprocessor listener. The threadpool is the listener's private data
 * \param was_empty True if the taskprocessor was empty prior to the task being pushed
 */
Mark Michelson's avatar
Mark Michelson committed
static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
		int was_empty)
	struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
	struct task_pushed_data *tpd;
	SCOPED_AO2LOCK(lock, pool);
Mark Michelson's avatar
Mark Michelson committed

	if (pool->shutting_down) {
		return;
	}
	tpd = task_pushed_data_alloc(pool, was_empty);
Mark Michelson's avatar
Mark Michelson committed
	if (!tpd) {
		return;
	}

	if (ast_taskprocessor_push(pool->control_tps, queued_task_pushed, tpd)) {
		ast_free(tpd);
	}
/*!
 * \brief Queued task that handles the case where the threadpool's taskprocessor is emptied
 *
 * This simply lets the threadpool's listener know that the threadpool is devoid of tasks
 * \param data The pool that has become empty
 * \return 0
 */
static int queued_emptied(void *data)
Mark Michelson's avatar
Mark Michelson committed
{
	struct ast_threadpool *pool = data;

	/* We already checked for existence of this callback when this was queued */
	pool->listener->callbacks->emptied(pool, pool->listener);
Mark Michelson's avatar
Mark Michelson committed
	return 0;
}

/*!
 * \brief Taskprocessor listener emptied callback
 *
 * The threadpool queues a task to let the threadpool listener know that
 * the threadpool no longer contains any tasks.
 * \param listener The taskprocessor listener. The threadpool is the listener's private data.
 */
Mark Michelson's avatar
Mark Michelson committed
static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
{
	struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
	SCOPED_AO2LOCK(lock, pool);

	if (pool->shutting_down) {
		return;
	}
Mark Michelson's avatar
Mark Michelson committed

	if (pool->listener && pool->listener->callbacks->emptied) {
		ast_taskprocessor_push(pool->control_tps, queued_emptied, pool);
	}
/*!
 * \brief Taskprocessor listener shutdown callback
 *
 * The threadpool will shut down and destroy all of its worker threads when
 * this is called back. By the time this gets called, the taskprocessor's
 * control taskprocessor has already been destroyed. Therefore there is no risk
 * in outright destroying the worker threads here.
 * \param listener The taskprocessor listener. The threadpool is the listener's private data.
 */
Mark Michelson's avatar
Mark Michelson committed
static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
{
	struct ast_threadpool *pool = ast_taskprocessor_listener_get_user_data(listener);
	if (pool->listener && pool->listener->callbacks->shutdown) {
		pool->listener->callbacks->shutdown(pool->listener);
	}
	ao2_cleanup(pool->active_threads);
	ao2_cleanup(pool->idle_threads);
	ao2_cleanup(pool->zombie_threads);
	ao2_cleanup(pool);
}

/*!
 * \brief Table of taskprocessor listener callbacks for threadpool's main taskprocessor
 *
 * ast_threadpool_create() hands this table to
 * ast_taskprocessor_listener_alloc() together with the pool, which is how
 * each callback is able to recover the pool from its listener argument.
 */
static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
	.start = threadpool_tps_start,
	.task_pushed = threadpool_tps_task_pushed,
	.emptied = threadpool_tps_emptied,
	.shutdown = threadpool_tps_shutdown,
};

/*!
 * \brief ao2 callback to kill a set number of threads.
 *
 * Threads will be unlinked from the container as long as the
 * counter has not reached zero. The counter is decremented with
 * each thread that is removed.
 * \param obj The worker thread up for possible destruction
 * \param arg The counter
 * \param flags Unused
 * \retval CMP_MATCH The counter has not reached zero, so this flag should be removed.
 * \retval CMP_STOP The counter has reached zero so no more threads should be removed.
 */
Mark Michelson's avatar
Mark Michelson committed
static int kill_threads(void *obj, void *arg, int flags)
{
	int *num_to_kill = arg;

	if (*num_to_kill > 0) {
		--(*num_to_kill);
		return CMP_MATCH;
	} else {
		return CMP_STOP;
	}
}

/*!
 * \brief ao2 callback to zombify a set number of threads.
 *
Corey Farrell's avatar
Corey Farrell committed
 * Threads will be zombified as long as the counter has not reached
 * zero. The counter is decremented with each thread that is zombified.
 *
 * Zombifying a thread involves removing it from its current container,
 * adding it to the zombie container, and changing the state of the
 * worker to a zombie
 *
 * This callback is called from the threadpool control taskprocessor thread.
 *
 * \param obj The worker thread that may be zombified
 * \param arg The pool to which the worker belongs
 * \param data The counter
 * \param flags Unused
 * \retval CMP_MATCH The zombified thread should be removed from its current container
 * \retval CMP_STOP Stop attempting to zombify threads
 */
static int zombify_threads(void *obj, void *arg, void *data, int flags)
{
	struct worker_thread *worker = obj;
	struct ast_threadpool *pool = arg;
	int *num_to_zombify = data;

	if ((*num_to_zombify)-- > 0) {
		if (!ao2_link(pool->zombie_threads, worker)) {
			ast_log(LOG_WARNING, "Failed to zombify active thread %d. Thread will remain active\n", worker->id);
			return 0;
		}
		worker_set_state(worker, ZOMBIE);
Mark Michelson's avatar
Mark Michelson committed
		return CMP_MATCH;
	} else {
		return CMP_STOP;
	}
}

/*!
 * \brief Remove threads from the threadpool
 *
 * The preference is to kill idle threads. However, if there are
 * more threads to remove than there are idle threads, then active
 * threads will be zombified instead.
 *
 * This function is called from the threadpool control taskprocessor thread.
 *
 * \param pool The threadpool to remove threads from
 * \param delta The number of threads to remove
 */
Mark Michelson's avatar
Mark Michelson committed
static void shrink(struct ast_threadpool *pool, int delta)
{
	 * Preference is to kill idle threads, but
	 * we'll move on to deactivating active threads
	 * if we have to
	 */
Mark Michelson's avatar
Mark Michelson committed
	int idle_threads = ao2_container_count(pool->idle_threads);
	int idle_threads_to_kill = MIN(delta, idle_threads);
	int active_threads_to_zombify = delta - idle_threads_to_kill;
	ast_debug(3, "Destroying %d idle threads in threadpool %s\n", idle_threads_to_kill,
			ast_taskprocessor_name(pool->tps));
	ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
Mark Michelson's avatar
Mark Michelson committed
			kill_threads, &idle_threads_to_kill);

	ast_debug(3, "Destroying %d active threads in threadpool %s\n", active_threads_to_zombify,
			ast_taskprocessor_name(pool->tps));
	ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
			zombify_threads, pool, &active_threads_to_zombify);
/*!
 * \brief Helper struct used for queued operations that change the size of the threadpool
 */
struct set_size_data {
	/*! The pool whose size is to change */
	struct ast_threadpool *pool;
	/*! The requested new size of the pool */
	unsigned int size;
};

/*!
 * \brief Allocate and initialize a set_size_data
 * \param pool The pool for the set_size_data
 * \param size The size to store in the set_size_data
 */
Mark Michelson's avatar
Mark Michelson committed
static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
		unsigned int size)
Mark Michelson's avatar
Mark Michelson committed
{
	struct set_size_data *ssd = ast_malloc(sizeof(*ssd));
Mark Michelson's avatar
Mark Michelson committed
	if (!ssd) {
		return NULL;
	}

	ssd->pool = pool;
	ssd->size = size;
	return ssd;
}

/*!
 * \brief Change the size of the threadpool
 *
 * This can either result in shrinking or growing the threadpool depending
 * on the new desired size and the current size.
 *
 * This function is run from the threadpool control taskprocessor thread
 *
 * \param data A set_size_data used for determining how to act
 * \return 0
 */
Mark Michelson's avatar
Mark Michelson committed
static int queued_set_size(void *data)
{
	struct set_size_data *ssd = data;
Mark Michelson's avatar
Mark Michelson committed
	struct ast_threadpool *pool = ssd->pool;
	unsigned int num_threads = ssd->size;

	/* We don't count zombie threads as being "live" when potentially resizing */
	unsigned int current_size = ao2_container_count(pool->active_threads) +
			ao2_container_count(pool->idle_threads);
	if (current_size == num_threads) {
		ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n",
			  num_threads, current_size);
Mark Michelson's avatar
Mark Michelson committed
		return 0;
	}

	if (current_size < num_threads) {
		ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
				activate_thread, pool);

		/* As the above may have altered the number of current threads update it */
		current_size = ao2_container_count(pool->active_threads) +
				ao2_container_count(pool->idle_threads);
		grow(pool, num_threads - current_size);
		ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
				activate_thread, pool);
Mark Michelson's avatar
Mark Michelson committed
	} else {
		shrink(pool, current_size - num_threads);
Mark Michelson's avatar
Mark Michelson committed
	}

	threadpool_send_state_changed(pool);
Mark Michelson's avatar
Mark Michelson committed
	return 0;
Mark Michelson's avatar
Mark Michelson committed
void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
Mark Michelson's avatar
Mark Michelson committed
	struct set_size_data *ssd;
	SCOPED_AO2LOCK(lock, pool);
	if (pool->shutting_down) {
		return;
	}
Mark Michelson's avatar
Mark Michelson committed
	ssd = set_size_data_alloc(pool, size);
Mark Michelson's avatar
Mark Michelson committed
	if (!ssd) {
		return;
	}

	if (ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd)) {
		ast_free(ssd);
	}
struct ast_threadpool_listener *ast_threadpool_listener_alloc(
		const struct ast_threadpool_listener_callbacks *callbacks, void *user_data)
	struct ast_threadpool_listener *listener = ao2_alloc(sizeof(*listener), NULL);
	if (!listener) {
		return NULL;
	}
	listener->callbacks = callbacks;
	listener->user_data = user_data;
	return listener;
}

/*
 * Return the user data pointer stored in the listener. The listener
 * must not be NULL.
 */
void *ast_threadpool_listener_get_user_data(const struct ast_threadpool_listener *listener)
{
	return listener->user_data;
}

/*
 * Pairs a threadpool with a by-value copy of a set of threadpool options.
 * NOTE(review): nothing in the reviewed portion of this file references
 * this struct - confirm whether it is still needed.
 */
struct pool_options_pair {
	/* The threadpool half of the pair */
	struct ast_threadpool *pool;
	/* The options half of the pair, stored by value */
	struct ast_threadpool_options options;
};

struct ast_threadpool *ast_threadpool_create(const char *name,
		struct ast_threadpool_listener *listener,
		const struct ast_threadpool_options *options)
Mark Michelson's avatar
Mark Michelson committed
	struct ast_taskprocessor *tps;
	RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
	RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
Mark Michelson's avatar
Mark Michelson committed

	pool = threadpool_alloc(name, options);
	tps_listener = ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks, pool);
Mark Michelson's avatar
Mark Michelson committed
	if (!tps_listener) {
		return NULL;
	}

	if (options->version != AST_THREADPOOL_OPTIONS_VERSION) {
		ast_log(LOG_WARNING, "Incompatible version of threadpool options in use.\n");
		return NULL;
	}

	tps = ast_taskprocessor_create_with_listener(name, tps_listener);
	pool->tps = tps;
	if (listener) {
		ao2_ref(listener, +1);
		pool->listener = listener;
	}
	ast_threadpool_set_size(pool, pool->options.initial_size);
Mark Michelson's avatar
Mark Michelson committed

/*
 * Push a task onto the threadpool's main taskprocessor.
 *
 * Returns the result of ast_taskprocessor_push(), or -1 without queuing
 * anything when the pool is shutting down. The pool is locked for the
 * duration of the call.
 */
int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), void *data)
{
	SCOPED_AO2LOCK(lock, pool);

	if (!pool->shutting_down) {
		return ast_taskprocessor_push(pool->tps, task, data);
	}
	/* Refuse new work once shutdown has begun */
	return -1;
}

/*
 * Begin shutting down the threadpool. The shutting_down flag is set under
 * the pool lock, after which this function drops its references on the
 * control taskprocessor and then the main taskprocessor.
 */
void ast_threadpool_shutdown(struct ast_threadpool *pool)
{
	/* Shut down the taskprocessors and everything else just
	 * takes care of itself via the taskprocessor callbacks
	 */
	ao2_lock(pool);
	pool->shutting_down = 1;
	ao2_unlock(pool);

	ast_taskprocessor_unreference(pool->control_tps);
	ast_taskprocessor_unreference(pool->tps);
}

/*!
 * A monotonically increasing integer used for worker
 * thread identification.
 *
 * Starts at zero, as it is a file-scope static with no initializer.
 * NOTE(review): presumably the source of worker_thread.id - confirm in
 * worker_thread_alloc(), which is not part of the reviewed section.
 */
static int worker_id_counter;

/*!
 * \brief Hash callback for the worker thread containers
 *
 * \param obj The worker thread to hash
 * \param flags Unused
 * \return The worker's id
 */
static int worker_thread_hash(const void *obj, int flags)
{
	return ((const struct worker_thread *) obj)->id;
}

static int worker_thread_cmp(void *obj, void *arg, int flags)
{
	struct worker_thread *worker1 = obj;
	struct worker_thread *worker2 = arg;

	return worker1->id == worker2->id ? CMP_MATCH : 0;
}

/*!
 * \brief shut a worker thread down
 *
 * Set the worker dead and then wait for its thread
 * to finish executing.
 *
 * \param worker The worker thread to shut down
 */