diff --git a/include/asterisk/threadpool.h b/include/asterisk/threadpool.h new file mode 100644 index 0000000000000000000000000000000000000000..7a20abdcbea76cb4ffce450cf00dfb64c2046e3c --- /dev/null +++ b/include/asterisk/threadpool.h @@ -0,0 +1,96 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2012, Digium, Inc. + * + * Mark Michelson <mmmichelson@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + + +#ifndef _ASTERISK_THREADPOOL_H +#define _ASTERISK_THREADPOOL_H + +struct ast_threadpool; +struct ast_taskprocessor; +struct ast_threadpool_listener; + +struct ast_threadpool_listener_callbacks { + /*! + * \brief Indicates that the state of threads in the pool has changed + * + * \param listener The threadpool listener + * \param active_threads The number of active threads in the pool + * \param idle_threads The number of idle threads in the pool + * \param zombie_threads The number of zombie threads in the pool + */ + void (*state_changed)(struct ast_threadpool_listener *listener, + int active_threads, + int idle_threads, + int zombie_threads); + /*! + * \brief Indicates that a task was pushed to the threadpool's taskprocessor + * + * \param listener The threadpool listener + * \param was_empty Indicates whether the taskprocessor was empty prior to adding the task + */ + void (*tps_task_pushed)(struct ast_threadpool_listener *listener, + int was_empty); + /*! + * \brief Indicates the threadpoo's taskprocessor has become empty + * + * \param listener The threadpool's listener + */ + void (*emptied)(struct ast_threadpool_listener *listener); +}; + +/*! + * \brief listener for a threadpool + * + * The listener is notified of changes in a threadpool. It can + * react by doing things like increasing the number of threads + * in the pool + */ +struct ast_threadpool_listener { + /*! Callbacks called by the threadpool */ + struct ast_threadpool_listener_callbacks *callbacks; + /*! Handle to the threadpool */ + struct ast_threadpool *threadpool; + /*! User data for the listener */ + void *private_data; +}; + +/*! + * \brief Create a new threadpool + * + * This function creates a threadpool and returns a taskprocessor. Tasks pushed + * to this taskprocessor will be handled by the threadpool and will be reported + * on the threadpool's listener. + * + * \param listener The listener the threadpool will notify of changes + * \retval NULL Failed to create the threadpool + * \retval non-NULL The associated taskprocessor + */ +struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener); + +/*! + * \brief Set the number of threads for the thread pool + * + * This number may be more or less than the current number of + * threads in the threadpool. + * + * \param threadpool The threadpool to adjust + * \param size The new desired size of the threadpool + */ +void ast_threadpool_set_size(struct ast_threadpool *threadpool, unsigned int size); + +#endif /* ASTERISK_THREADPOOL_H */ diff --git a/main/threadpool.c b/main/threadpool.c new file mode 100644 index 0000000000000000000000000000000000000000..362c765cff0c0273aa4dd733e4dde083fa093ce7 --- /dev/null +++ b/main/threadpool.c @@ -0,0 +1,161 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2012, Digium, Inc. + * + * Mark Michelson <mmmichelson@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + + +#include "asterisk.h" + +#include "asterisk/threadpool.h" +#include "asterisk/taskprocessor.h" + +struct ast_threadpool; + +enum worker_state { + ALIVE, + ZOMBIE, + DEAD, +}; + +struct worker_thread { + ast_cond_t cond; + ast_mutex_t lock; + pthread_t thread; + struct ast_threadpool *pool; + AST_LIST_ENTRY(struct worker_thread) next; + int wake_up; + enum worker_state state; +}; + +static int worker_idle(struct worker_thread *worker) +{ + SCOPED_MUTEX(lock, &worker->lock); + if (worker->state != ALIVE) { + return false; + } + threadpool_active_thread_idle(worker->pool, worker); + while (!worker->wake_up) { + ast_cond_wait(&worker->cond, lock); + } + worker->wake_up = false; + return worker->state == ALIVE; +} + +static int worker_active(struct worker_thread *worker) +{ + int alive = 1; + while (alive) { + if (threadpool_execute(worker->pool)) { + alive = worker_idle(worker); + } + } + + /* Reaching this portion means the thread is + * on death's door. It may have been killed while + * it was idle, in which case it can just die + * peacefully. If it's a zombie, though, then + * it needs to let the pool know so + * that the thread can be removed from the + * list of zombie threads. + */ + if (worker->state == ZOMBIE) { + threadpool_zombie_thread_dead(worker->pool, worker); + } + + return 0; +} + +struct ast_threadpool { + struct ast_threadpool_listener *threadpool_listener; + int active_threads; + int idle_threads; + int zombie_threads; +} + +static void *threadpool_tps_listener_alloc(struct ast_taskprocessor_listener *listener) +{ + RAII_VAR(ast_threadpool *, threadpool, + ao2_alloc(sizeof(*threadpool), threadpool_destroy), ao2_cleanup); + + return threadpool; +} + +static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener) +{ + /* XXX stub */ +} + +static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener) +{ + /* XXX stub */ +} + +static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener) +{ + /* XXX stub */ +} + +static void threadpool_tps_listener_destroy(struct ast_taskprocessor_listener *listener) +{ + /* XXX stub */ +} + +static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = { + .alloc = threadpool_tps_listener_alloc, + .task_pushed = threadpool_tps_task_pushed, + .emptied = threadpool_tps_emptied, + .shutdown = threadpool_tps_shutdown, + .destroy = threadpool_tps_listener_destroy, +}; + +/*! + * \brief Allocate the taskprocessor to be used for the threadpool + * + * We use a custom taskprocessor listener. We allocate our custom + * listener and then create a taskprocessor. + */ +static struct ast_taskprocessor_listener *threadpool_tps_alloc(void) +{ + RAII_VAR(struct threadpool_tps_listener *, tps_listener, + ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks), + ao2_cleanup); + + if (!tps_listener) { + return NULL; + } + + return ast_taskprocessor_create_with_listener(tps_listener); +} + +void ast_threadpool_set_size(struct ast_threadpool *pool, int size) +{ +} + +struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size) +{ + struct ast_threadpool *pool; + RAII_VAR(ast_taskprocessor *, tps, threadpool_tps_alloc(), ast_taskprocessor_unreference); + + if (!tps) { + return NULL; + } + + pool = tps->listener->private_data; + pool->tps = tps; + ast_threadpool_set_size(pool, initial_size); + + return pool; +}