diff --git a/main/threadpool.c b/main/threadpool.c
index 1ff76014afb96de83e51c25835ef24f0b1b7ec09..6cb241bcf43c684a44837275d6b01eafbb6404fa 100644
--- a/main/threadpool.c
+++ b/main/threadpool.c
@@ -510,7 +510,7 @@ static void grow(struct ast_threadpool *pool, int delta)
 		if (!worker) {
 			return;
 		}
-		if (ao2_link(pool->active_threads, worker)) {
+		if (ao2_link(pool->idle_threads, worker)) {
 			if (worker_thread_start(worker)) {
 				ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id);
 				ao2_unlink(pool->active_threads, worker);
@@ -536,24 +536,21 @@ static int queued_task_pushed(void *data)
 	struct task_pushed_data *tpd = data;
 	struct ast_threadpool *pool = tpd->pool;
 	int was_empty = tpd->was_empty;
-	int state_changed;
 
 	if (pool->listener && pool->listener->callbacks->task_pushed) {
 		pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
 	}
 	if (ao2_container_count(pool->idle_threads) == 0) {
-		if (pool->options.auto_increment > 0) {
-			grow(pool, pool->options.auto_increment);
-			state_changed = 1;
+		if (!pool->options.auto_increment) {
+			return 0;
 		}
-	} else {
-		ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
-				activate_thread, pool);
-		state_changed = 1;
-	}
-	if (state_changed) {
-		threadpool_send_state_changed(pool);
+		grow(pool, pool->options.auto_increment);
 	}
+
+	ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
+			activate_thread, pool);
+
+	threadpool_send_state_changed(pool);
 	ao2_ref(tpd, -1);
 	return 0;
 }
@@ -808,6 +805,8 @@ static int queued_set_size(void *data)
 
 	if (current_size < num_threads) {
 		grow(pool, num_threads - current_size);
+		ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
+				activate_thread, pool);
 	} else {
 		shrink(pool, current_size - num_threads);
 	}
@@ -986,7 +985,31 @@ static void *worker_start(void *arg)
 	if (worker->options.thread_start) {
 		worker->options.thread_start();
 	}
-	worker_active(worker);
+
+	ast_mutex_lock(&worker->lock);
+	while (worker_idle(worker)) {
+		ast_mutex_unlock(&worker->lock);
+		worker_active(worker);
+		ast_mutex_lock(&worker->lock);
+		if (worker->state != ALIVE) {
+			break;
+		}
+		threadpool_active_thread_idle(worker->pool, worker);
+	}
+	ast_mutex_unlock(&worker->lock);
+
+	/* Reaching this portion means the thread is
+	 * on death's door. It may have been killed while
+	 * it was idle, in which case it can just die
+	 * peacefully. If it's a zombie, though, then
+	 * it needs to let the pool know so
+	 * that the thread can be removed from the
+	 * list of zombie threads.
+	 */
+	if (worker->state == ZOMBIE) {
+		threadpool_zombie_thread_dead(worker->pool, worker);
+	}
+
 	if (worker->options.thread_end) {
 		worker->options.thread_end();
 	}
@@ -1035,24 +1058,19 @@ static int worker_thread_start(struct worker_thread *worker)
  */
 static void worker_active(struct worker_thread *worker)
 {
-	int alive = 1;
-	while (alive) {
-		if (!threadpool_execute(worker->pool)) {
-			alive = worker_idle(worker);
-		}
-	}
+	int alive;
 
-	/* Reaching this portion means the thread is
-	 * on death's door. It may have been killed while
-	 * it was idle, in which case it can just die
-	 * peacefully. If it's a zombie, though, then
-	 * it needs to let the pool know so
-	 * that the thread can be removed from the
-	 * list of zombie threads.
+	/* The following is equivalent to
+	 *
+	 * while (threadpool_execute(worker->pool));
+	 *
+	 * However, reviewers have suggested in the past
+	 * doing that can cause optimizers to (wrongly)
+	 * optimize the code away.
 	 */
-	if (worker->state == ZOMBIE) {
-		threadpool_zombie_thread_dead(worker->pool, worker);
-	}
+	do {
+		alive = threadpool_execute(worker->pool);
+	} while (alive);
 }
 
 /*!
@@ -1061,6 +1079,8 @@ static void worker_active(struct worker_thread *worker)
  * The worker waits here until it gets told by the threadpool
  * to wake up.
  *
+ * worker is locked before entering this function.
+ *
  * \param worker The idle worker
  * \retval 0 The thread is being woken up so that it can conclude.
  * \retval non-zero The thread is being woken up to do more work.
@@ -1072,15 +1092,10 @@ static int worker_idle(struct worker_thread *worker)
 {
 	struct timeval start = ast_tvnow();
 	struct timespec end = {
 		.tv_sec = start.tv_sec + worker->options.idle_timeout,
 		.tv_nsec = start.tv_usec * 1000,
 	};
-	SCOPED_MUTEX(lock, &worker->lock);
-	if (worker->state != ALIVE) {
-		return 0;
-	}
-	threadpool_active_thread_idle(worker->pool, worker);
 	while (!worker->wake_up) {
 		if (worker->options.idle_timeout <= 0) {
-			ast_cond_wait(&worker->cond, lock);
-		} else if (ast_cond_timedwait(&worker->cond, lock, &end) == ETIMEDOUT) {
+			ast_cond_wait(&worker->cond, &worker->lock);
+		} else if (ast_cond_timedwait(&worker->cond, &worker->lock, &end) == ETIMEDOUT) {
 			break;
 		}
 	}
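The hunks above move the idle/active loop out of worker_active() and into worker_start(): worker->lock is now held while the thread waits in worker_idle() and is released only around worker_active(). Below is a minimal standalone sketch of that lock handoff; the names (sketch_worker, sketch_idle, sketch_active, sketch_worker_start) and the bare pthread calls are simplified stand-ins for illustration, not the ast_threadpool API.

/*
 * Illustrative sketch only: simplified stand-ins, not threadpool.c code.
 * Build with: cc sketch.c -lpthread
 */
#include <pthread.h>

enum sketch_state { ALIVE, ZOMBIE, DEAD };

struct sketch_worker {
	pthread_mutex_t lock;
	pthread_cond_t cond;
	enum sketch_state state;
	int wake_up;
};

/* Caller holds worker->lock.  Waits until woken; returns non-zero if the
 * worker should go active again, zero if it should conclude. */
static int sketch_idle(struct sketch_worker *worker)
{
	while (!worker->wake_up) {
		pthread_cond_wait(&worker->cond, &worker->lock);
	}
	worker->wake_up = 0;
	return worker->state == ALIVE;
}

/* Runs with worker->lock released; a real pool would drain its task queue here. */
static void sketch_active(struct sketch_worker *worker)
{
	(void) worker;
}

static void *sketch_worker_start(void *arg)
{
	struct sketch_worker *worker = arg;

	pthread_mutex_lock(&worker->lock);
	while (sketch_idle(worker)) {
		/* Drop the lock while executing tasks... */
		pthread_mutex_unlock(&worker->lock);
		sketch_active(worker);
		/* ...and re-take it before checking state and idling again. */
		pthread_mutex_lock(&worker->lock);
		if (worker->state != ALIVE) {
			break;
		}
	}
	pthread_mutex_unlock(&worker->lock);
	return NULL;
}

int main(void)
{
	struct sketch_worker worker = {
		.lock = PTHREAD_MUTEX_INITIALIZER,
		.cond = PTHREAD_COND_INITIALIZER,
		.state = ALIVE,
	};
	pthread_t thread;

	pthread_create(&thread, NULL, sketch_worker_start, &worker);

	/* Shut the worker down: mark it dead and wake it while holding the lock. */
	pthread_mutex_lock(&worker.lock);
	worker.state = DEAD;
	worker.wake_up = 1;
	pthread_cond_signal(&worker.cond);
	pthread_mutex_unlock(&worker.lock);

	pthread_join(thread, NULL);
	return 0;
}

In the actual patch, the locked section of the loop also calls threadpool_active_thread_idle() before the next wait, so the pool's active/idle containers stay in step with the worker's own state transitions.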