From d5716ecae2238c9e96576870c2146606ab46bc6c Mon Sep 17 00:00:00 2001
From: Mark Michelson <mmichelson@digium.com>
Date: Fri, 9 Nov 2012 22:28:10 +0000
Subject: [PATCH] Genericize the allocation and destruction of taskprocessor
 listeners.

The goal of this is to take the responsibility away from individual
listeners to be sure to properly unref the taskprocessor.



git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@376121 65c4cc65-6c06-0410-ace0-fbb531ad65f3
---
 include/asterisk/taskprocessor.h |  7 +++
 main/astobj2.c                   |  1 +
 main/taskprocessor.c             | 79 +++++++++++++++++++++-----------
 3 files changed, 61 insertions(+), 26 deletions(-)

diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h
index df66f59f07..a92e1f31c2 100644
--- a/include/asterisk/taskprocessor.h
+++ b/include/asterisk/taskprocessor.h
@@ -63,10 +63,14 @@ enum ast_tps_options {
 struct ast_taskprocessor_listener;
 
 struct ast_taskprocessor_listener_callbacks {
+	/*! Allocate the listener's private data */
+	void *(*alloc)(struct ast_taskprocessor_listener *listener);
 	/*! Indicates a task was pushed to the processor */
 	void (*task_pushed)(struct ast_taskprocessor_listener *listener, int was_empty);
 	/*! Indicates the task processor has become empty */
 	void (*emptied)(struct ast_taskprocessor_listener *listener);
+	/*! Destroy the listener's private data */
+	void (*destroy)(void *private_data);
 };
 
 struct ast_taskprocessor_listener {
@@ -75,6 +79,9 @@ struct ast_taskprocessor_listener {
 	void *private_data;
 };
 
+struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(struct ast_taskprocessor *tps,
+		struct ast_taskprocessor_listener_callbacks *callbacks);
+
 /*!
  * \brief Get a reference to a taskprocessor with the specified name and create the taskprocessor if necessary
  *
diff --git a/main/astobj2.c b/main/astobj2.c
index 082dfc0384..b36cee837e 100644
--- a/main/astobj2.c
+++ b/main/astobj2.c
@@ -431,6 +431,7 @@ static int internal_ao2_ref(void *user_data, int delta, const char *file, int li
 	int ret;
 
 	if (obj == NULL) {
+		ast_backtrace();
 		ast_assert(0);
 		return -1;
 	}
diff --git a/main/taskprocessor.c b/main/taskprocessor.c
index bd94103d25..4ca01f9ca2 100644
--- a/main/taskprocessor.c
+++ b/main/taskprocessor.c
@@ -131,17 +131,11 @@ static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt,
 	ast_cond_signal(&pvt->cond);
 }
 
-static void default_listener_destroy(void *obj)
+static void listener_destroy(void *obj)
 {
 	struct ast_taskprocessor_listener *listener = obj;
-	struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
 
-	default_tps_wake_up(pvt, 1);
-	pthread_join(pvt->poll_thread, NULL);
-	pvt->poll_thread = AST_PTHREADT_NULL;
-	ast_mutex_destroy(&pvt->lock);
-	ast_cond_destroy(&pvt->cond);
-	ast_free(pvt);
+	listener->callbacks->destroy(listener->private_data);
 
 	ao2_ref(listener->tps, -1);
 	listener->tps = NULL;
@@ -173,6 +167,35 @@ static void *tps_processing_function(void *data)
 	return NULL;
 }
 
+static void *default_listener_alloc(struct ast_taskprocessor_listener *listener)
+{
+	struct default_taskprocessor_listener_pvt *pvt;
+
+	pvt = ast_calloc(1, sizeof(*pvt));
+	if (!pvt) {
+		return NULL;
+	}
+	ast_cond_init(&pvt->cond, NULL);
+	ast_mutex_init(&pvt->lock);
+	pvt->poll_thread = AST_PTHREADT_NULL;
+	if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener) < 0) {
+		return NULL;
+	}
+	return pvt;
+}
+
+static void default_listener_destroy(void *obj)
+{
+	struct default_taskprocessor_listener_pvt *pvt = obj;
+
+	default_tps_wake_up(pvt, 1);
+	pthread_join(pvt->poll_thread, NULL);
+	pvt->poll_thread = AST_PTHREADT_NULL;
+	ast_mutex_destroy(&pvt->lock);
+	ast_cond_destroy(&pvt->cond);
+	ast_free(pvt);
+}
+
 static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
 {
 	struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
@@ -188,8 +211,10 @@ static void default_emptied(struct ast_taskprocessor_listener *listener)
 }
 
 static struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
+	.alloc = default_listener_alloc,
 	.task_pushed = default_task_pushed,
 	.emptied = default_emptied,
+	.destroy = default_listener_destroy,
 };
 
 /*! \internal \brief Clean up resources on Asterisk shutdown */
@@ -432,29 +457,22 @@ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
 	return tps->name;
 }
 
-static struct ast_taskprocessor_listener *default_listener_alloc(void)
+struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(struct ast_taskprocessor *tps,
+		struct ast_taskprocessor_listener_callbacks *callbacks)
 {
-	struct ast_taskprocessor_listener *listener;
-	struct default_taskprocessor_listener_pvt *pvt;
-
-	listener = ao2_alloc(sizeof(*listener), default_listener_destroy);
+	RAII_VAR(struct ast_taskprocessor_listener *, listener,
+			ao2_alloc(sizeof(*listener), listener_destroy), ao2_cleanup);
+	
 	if (!listener) {
 		return NULL;
 	}
-	pvt = ast_calloc(1, sizeof(*pvt));
-	if (!pvt) {
-		ao2_ref(listener, -1);
-		return NULL;
-	}
-	listener->callbacks = &default_listener_callbacks;
-	listener->private_data = pvt;
-	ast_cond_init(&pvt->cond, NULL);
-	ast_mutex_init(&pvt->lock);
-	pvt->poll_thread = AST_PTHREADT_NULL;
-	if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener) < 0) {
-		ao2_ref(listener, -1);
+	listener->callbacks = callbacks;
+	listener->private_data = listener->callbacks->alloc(listener);
+	if (!listener->private_data) {
 		return NULL;
 	}
+
+	ao2_ref(listener, +1);
 	return listener;
 }
 
@@ -480,9 +498,18 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o
 		return NULL;
 	}
 	/* Create a new taskprocessor. Start by creating a default listener */
-	listener = default_listener_alloc();
+	listener = ast_taskprocessor_listener_alloc(p, &default_listener_callbacks);
+	if (!listener) {
+		return NULL;
+	}
 
 	p = ast_taskprocessor_create_with_listener(name, listener);
+	if (!p) {
+		ao2_ref(listener, -1);
+		return NULL;
+	}
+
+	/* Unref listener here since the taskprocessor has gained a reference to the listener */
 	ao2_ref(listener, -1);
 	return p;
 
-- 
GitLab