Skip to content
Snippets Groups Projects
Commit cc63d2c3 authored by Mark Michelson's avatar Mark Michelson
Browse files

Add better listener support.

Add some parameters to listener callbacks.
Add alloc and destroy callbacks for listeners.
Add public function for allocating a listener.



git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@377226 65c4cc65-6c06-0410-ace0-fbb531ad65f3
parent 4acb2070
No related branches found
No related tags found
No related merge requests found
...@@ -25,6 +25,15 @@ struct ast_taskprocessor; ...@@ -25,6 +25,15 @@ struct ast_taskprocessor;
struct ast_threadpool_listener; struct ast_threadpool_listener;
struct ast_threadpool_listener_callbacks { struct ast_threadpool_listener_callbacks {
/*!
* \brief Allocate the listener's private data
*
* It is not necessary to assign the private data to the listener.
* \param listener The listener the private data will belong to
* \retval NULL Failure to allocate private data
* \retval non-NULL The newly allocated private data
*/
void *(*alloc)(struct ast_threadpool_listener *listener);
/*! /*!
* \brief Indicates that the state of threads in the pool has changed * \brief Indicates that the state of threads in the pool has changed
* *
...@@ -32,23 +41,31 @@ struct ast_threadpool_listener_callbacks { ...@@ -32,23 +41,31 @@ struct ast_threadpool_listener_callbacks {
* \param active_threads The number of active threads in the pool * \param active_threads The number of active threads in the pool
* \param idle_threads The number of idle threads in the pool * \param idle_threads The number of idle threads in the pool
*/ */
void (*state_changed)(struct ast_threadpool_listener *listener, void (*state_changed)(struct ast_threadpool *pool,
struct ast_threadpool_listener *listener,
int active_threads, int active_threads,
int idle_threads); int idle_threads);
/*! /*!
* \brief Indicates that a task was pushed to the threadpool's taskprocessor * \brief Indicates that a task was pushed to the threadpool
* *
* \param listener The threadpool listener * \param listener The threadpool listener
* \param was_empty Indicates whether the taskprocessor was empty prior to adding the task * \param was_empty Indicates whether there were any tasks prior to adding the new one.
*/ */
void (*tps_task_pushed)(struct ast_threadpool_listener *listener, void (*task_pushed)(struct ast_threadpool *pool,
struct ast_threadpool_listener *listener,
int was_empty); int was_empty);
/*! /*!
* \brief Indicates the threadpoo's taskprocessor has become empty * \brief Indicates the threadpoo's taskprocessor has become empty
* *
* \param listener The threadpool's listener * \param listener The threadpool's listener
*/ */
void (*emptied)(struct ast_threadpool_listener *listener); void (*emptied)(struct ast_threadpool *pool, struct ast_threadpool_listener *listener);
/*!
* \brief Free the listener's private data
* \param private_data The private data to destroy
*/
void (*destroy)(void *private_data);
}; };
/*! /*!
...@@ -60,13 +77,24 @@ struct ast_threadpool_listener_callbacks { ...@@ -60,13 +77,24 @@ struct ast_threadpool_listener_callbacks {
*/ */
struct ast_threadpool_listener { struct ast_threadpool_listener {
/*! Callbacks called by the threadpool */ /*! Callbacks called by the threadpool */
struct ast_threadpool_listener_callbacks *callbacks; const struct ast_threadpool_listener_callbacks *callbacks;
/*! Handle to the threadpool */
struct ast_threadpool *threadpool;
/*! User data for the listener */ /*! User data for the listener */
void *private_data; void *private_data;
}; };
/*!
* \brief Allocate a threadpool listener
*
* This function will call back into the alloc callback for the
* listener.
*
* \param callbacks Listener callbacks to assign to the listener
* \retval NULL Failed to allocate the listener
* \retval non-NULL The newly-created threadpool listener
*/
struct ast_threadpool_listener *ast_threadpool_listener_alloc(
const struct ast_threadpool_listener_callbacks *callbacks);
/*! /*!
* \brief Create a new threadpool * \brief Create a new threadpool
* *
......
...@@ -123,7 +123,7 @@ static void threadpool_send_state_changed(struct ast_threadpool *pool) ...@@ -123,7 +123,7 @@ static void threadpool_send_state_changed(struct ast_threadpool *pool)
int active_size = ao2_container_count(pool->active_threads); int active_size = ao2_container_count(pool->active_threads);
int idle_size = ao2_container_count(pool->idle_threads); int idle_size = ao2_container_count(pool->idle_threads);
pool->listener->callbacks->state_changed(pool->listener, active_size, idle_size); pool->listener->callbacks->state_changed(pool, pool->listener, active_size, idle_size);
} }
/*! /*!
...@@ -232,11 +232,6 @@ static int threadpool_execute(struct ast_threadpool *pool) ...@@ -232,11 +232,6 @@ static int threadpool_execute(struct ast_threadpool *pool)
static void threadpool_destructor(void *obj) static void threadpool_destructor(void *obj)
{ {
struct ast_threadpool *pool = obj; struct ast_threadpool *pool = obj;
/* XXX Probably should let the listener know we're being destroyed? */
/* Threads should all be shut down by now, so this should be a painless
* operation
*/
ao2_cleanup(pool->listener); ao2_cleanup(pool->listener);
} }
...@@ -342,7 +337,7 @@ static int handle_task_pushed(void *data) ...@@ -342,7 +337,7 @@ static int handle_task_pushed(void *data)
struct ast_threadpool *pool = tpd->pool; struct ast_threadpool *pool = tpd->pool;
int was_empty = tpd->was_empty; int was_empty = tpd->was_empty;
pool->listener->callbacks->tps_task_pushed(pool->listener, was_empty); pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE, ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
activate_threads, pool); activate_threads, pool);
ao2_ref(tpd, -1); ao2_ref(tpd, -1);
...@@ -382,7 +377,7 @@ static int handle_emptied(void *data) ...@@ -382,7 +377,7 @@ static int handle_emptied(void *data)
{ {
struct ast_threadpool *pool = data; struct ast_threadpool *pool = data;
pool->listener->callbacks->emptied(pool->listener); pool->listener->callbacks->emptied(pool, pool->listener);
return 0; return 0;
} }
...@@ -587,6 +582,29 @@ void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size) ...@@ -587,6 +582,29 @@ void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd); ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd);
} }
static void listener_destructor(void *obj)
{
struct ast_threadpool_listener *listener = obj;
listener->callbacks->destroy(listener->private_data);
}
struct ast_threadpool_listener *ast_threadpool_listener_alloc(
const struct ast_threadpool_listener_callbacks *callbacks)
{
struct ast_threadpool_listener *listener = ao2_alloc(sizeof(*listener), listener_destructor);
if (!listener) {
return NULL;
}
listener->callbacks = callbacks;
listener->private_data = listener->callbacks->alloc(listener);
if (!listener->private_data) {
ao2_ref(listener, -1);
return NULL;
}
return listener;
}
struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size) struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size)
{ {
struct ast_threadpool *pool; struct ast_threadpool *pool;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment