Skip to content
Snippets Groups Projects
Commit 7995bc63 authored by Mark Michelson's avatar Mark Michelson
Browse files

Some general cleanup, plus we now send state changes when threads activate.

This is now ready for review board, imo!



git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@377805 65c4cc65-6c06-0410-ace0-fbb531ad65f3
parent 2abda640
No related branches found
No related tags found
No related merge requests found
......@@ -89,7 +89,7 @@ struct ast_threadpool_options {
/*!
* \brief Time limit in seconds for idle threads
*
* A time of 0 or less will mean an infinite timeout.
* A time of 0 or less will mean no timeout.
*/
int idle_timeout;
/*!
......
......@@ -435,7 +435,28 @@ static int activate_thread(void *obj, void *arg, int flags)
return CMP_MATCH;
}
static void grow(struct ast_threadpool *pool, int delta);
/*!
* \brief Add threads to the threadpool
*
* This function is called from the threadpool's control taskprocessor thread.
* \param pool The pool that is expanding
* \delta The number of threads to add to the pool
*/
static void grow(struct ast_threadpool *pool, int delta)
{
int i;
ast_debug(1, "Going to increase threadpool size by %d\n", delta);
for (i = 0; i < delta; ++i) {
struct worker_thread *worker = worker_thread_alloc(pool);
if (!worker) {
return;
}
ao2_link(pool->active_threads, worker);
ao2_ref(worker, -1);
}
}
/*!
* \brief Queued task called when tasks are pushed into the threadpool
......@@ -451,15 +472,22 @@ static int queued_task_pushed(void *data)
struct task_pushed_data *tpd = data;
struct ast_threadpool *pool = tpd->pool;
int was_empty = tpd->was_empty;
int state_changed;
pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
if (ao2_container_count(pool->idle_threads) == 0 && pool->options.auto_increment > 0) {
grow(pool, pool->options.auto_increment);
if (ao2_container_count(pool->idle_threads) == 0) {
if (pool->options.auto_increment > 0) {
grow(pool, pool->options.auto_increment);
state_changed = 1;
}
} else {
ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
activate_thread, pool);
}
threadpool_send_state_changed(pool);
state_changed = 1;
}
if (state_changed) {
threadpool_send_state_changed(pool);
}
ao2_ref(tpd, -1);
return 0;
}
......@@ -571,29 +599,6 @@ static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callb
.destroy = threadpool_destroy,
};
/*!
* \brief Add threads to the threadpool
*
* This function is called from the threadpool's control taskprocessor thread.
* \param pool The pool that is expanding
* \delta The number of threads to add to the pool
*/
static void grow(struct ast_threadpool *pool, int delta)
{
int i;
ast_debug(1, "Going to increase threadpool size by %d\n", delta);
for (i = 0; i < delta; ++i) {
struct worker_thread *worker = worker_thread_alloc(pool);
if (!worker) {
return;
}
ao2_link(pool->active_threads, worker);
ao2_ref(worker, -1);
}
}
/*!
* \brief ao2 callback to kill a set number of threads.
*
......@@ -680,7 +685,7 @@ static void shrink(struct ast_threadpool *pool, int delta)
ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
kill_threads, &idle_threads_to_kill);
ast_debug(1, "Goign to kill off %d active threads\n", active_threads_to_zombify);
ast_debug(1, "Going to kill off %d active threads\n", active_threads_to_zombify);
ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
zombify_threads, pool, &active_threads_to_zombify);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment