diff --git a/main/threadpool.c b/main/threadpool.c index e3d0e40fd364c0f60ecabfb8910814aefc732dee..e7abc8f8cf8f4d7a5eaf99925b56ff08999d142a 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -200,10 +200,10 @@ struct thread_worker_pair { /*! * \brief Destructor for thread_worker_pair */ -static void thread_worker_pair_destructor(void *obj) +static void thread_worker_pair_free(struct thread_worker_pair *pair) { - struct thread_worker_pair *pair = obj; ao2_ref(pair->worker, -1); + ast_free(pair); } /*! @@ -214,13 +214,14 @@ static void thread_worker_pair_destructor(void *obj) static struct thread_worker_pair *thread_worker_pair_alloc(struct ast_threadpool *pool, struct worker_thread *worker) { - struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor); + struct thread_worker_pair *pair = ast_malloc(sizeof(*pair)); if (!pair) { return NULL; } pair->pool = pool; ao2_ref(worker, +1); pair->worker = worker; + return pair; } @@ -240,7 +241,7 @@ static int queued_active_thread_idle(void *data) threadpool_send_state_changed(pair->pool); - ao2_ref(pair, -1); + thread_worker_pair_free(pair); return 0; } @@ -257,14 +258,19 @@ static void threadpool_active_thread_idle(struct ast_threadpool *pool, { struct thread_worker_pair *pair; SCOPED_AO2LOCK(lock, pool); + if (pool->shutting_down) { return; } + pair = thread_worker_pair_alloc(pool, worker); if (!pair) { return; } - ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair); + + if (ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair)) { + thread_worker_pair_free(pair); + } } /*! @@ -282,7 +288,7 @@ static int queued_zombie_thread_dead(void *data) ao2_unlink(pair->pool->zombie_threads, pair->worker); threadpool_send_state_changed(pair->pool); - ao2_ref(pair, -1); + thread_worker_pair_free(pair); return 0; } @@ -297,14 +303,19 @@ static void threadpool_zombie_thread_dead(struct ast_threadpool *pool, { struct thread_worker_pair *pair; SCOPED_AO2LOCK(lock, pool); + if (pool->shutting_down) { return; } + pair = thread_worker_pair_alloc(pool, worker); if (!pair) { return; } - ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair); + + if (ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair)) { + thread_worker_pair_free(pair); + } } static int queued_idle_thread_dead(void *data) @@ -314,7 +325,7 @@ static int queued_idle_thread_dead(void *data) ao2_unlink(pair->pool->idle_threads, pair->worker); threadpool_send_state_changed(pair->pool); - ao2_ref(pair, -1); + thread_worker_pair_free(pair); return 0; } @@ -323,14 +334,19 @@ static void threadpool_idle_thread_dead(struct ast_threadpool *pool, { struct thread_worker_pair *pair; SCOPED_AO2LOCK(lock, pool); + if (pool->shutting_down) { return; } + pair = thread_worker_pair_alloc(pool, worker); if (!pair) { return; } - ast_taskprocessor_push(pool->control_tps, queued_idle_thread_dead, pair); + + if (ast_taskprocessor_push(pool->control_tps, queued_idle_thread_dead, pair)) { + thread_worker_pair_free(pair); + } } /*! @@ -447,7 +463,7 @@ struct task_pushed_data { static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool, int was_empty) { - struct task_pushed_data *tpd = ao2_alloc(sizeof(*tpd), NULL); + struct task_pushed_data *tpd = ast_malloc(sizeof(*tpd)); if (!tpd) { return NULL; @@ -549,6 +565,8 @@ static int queued_task_pushed(void *data) int was_empty = tpd->was_empty; unsigned int existing_active; + ast_free(tpd); + if (pool->listener && pool->listener->callbacks->task_pushed) { pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty); } @@ -565,7 +583,6 @@ static int queued_task_pushed(void *data) /* If no idle threads could be transitioned to active grow the pool as permitted. */ if (ao2_container_count(pool->active_threads) == existing_active) { if (!pool->options.auto_increment) { - ao2_ref(tpd, -1); return 0; } grow(pool, pool->options.auto_increment); @@ -575,7 +592,6 @@ static int queued_task_pushed(void *data) } threadpool_send_state_changed(pool); - ao2_ref(tpd, -1); return 0; } @@ -598,12 +614,15 @@ static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listen if (pool->shutting_down) { return; } + tpd = task_pushed_data_alloc(pool, was_empty); if (!tpd) { return; } - ast_taskprocessor_push(pool->control_tps, queued_task_pushed, tpd); + if (ast_taskprocessor_push(pool->control_tps, queued_task_pushed, tpd)) { + ast_free(tpd); + } } /*! @@ -790,7 +809,7 @@ struct set_size_data { static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool, unsigned int size) { - struct set_size_data *ssd = ao2_alloc(sizeof(*ssd), NULL); + struct set_size_data *ssd = ast_malloc(sizeof(*ssd)); if (!ssd) { return NULL; } @@ -813,7 +832,7 @@ static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool, */ static int queued_set_size(void *data) { - RAII_VAR(struct set_size_data *, ssd, data, ao2_cleanup); + struct set_size_data *ssd = data; struct ast_threadpool *pool = ssd->pool; unsigned int num_threads = ssd->size; @@ -821,6 +840,8 @@ static int queued_set_size(void *data) unsigned int current_size = ao2_container_count(pool->active_threads) + ao2_container_count(pool->idle_threads); + ast_free(ssd); + if (current_size == num_threads) { ast_debug(3, "Not changing threadpool size since new size %u is the same as current %u\n", num_threads, current_size); @@ -849,6 +870,7 @@ void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size) { struct set_size_data *ssd; SCOPED_AO2LOCK(lock, pool); + if (pool->shutting_down) { return; } @@ -858,7 +880,9 @@ void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size) return; } - ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd); + if (ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd)) { + ast_free(ssd); + } } struct ast_threadpool_listener *ast_threadpool_listener_alloc(