Skip to content
Snippets Groups Projects
taskprocessor.c 27.1 KiB
Newer Older
/*
 * Asterisk -- An open source telephony toolkit.
 *
 * Copyright (C) 2007-2013, Digium, Inc.
 *
 * Dwayne M. Hubbard <dhubbard@digium.com>
 *
 * See http://www.asterisk.org for more information about
 * the Asterisk project. Please do not directly contact
 * any of the maintainers of this project for assistance;
 * the project provides a web site, mailing lists and IRC
 * channels for your use.
 *
 * This program is free software, distributed under the terms of
 * the GNU General Public License Version 2. See the LICENSE file
 * at the top of the source tree.
 */
/*! \file
 *
 * \brief Maintain a container of uniquely-named taskprocessor threads that can be shared across modules.
 *
 * \author Dwayne Hubbard <dhubbard@digium.com>
 */

/*** MODULEINFO
	<support_level>core</support_level>
 ***/

#include "asterisk/_private.h"
#include "asterisk/module.h"
#include "asterisk/time.h"
#include "asterisk/astobj2.h"
#include "asterisk/cli.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/sem.h"
/*!
 * \brief tps_task structure is queued to a taskprocessor
 *
 * tps_tasks are processed in FIFO order and freed by the taskprocessing
 * thread after the task handler returns.  The callback function that is assigned
 * to the execute() function pointer is responsible for releasing datap resources if necessary.
 */
struct tps_task {
	/*! \brief The execute() task callback function pointer */
	union {
		/*! Callback form that is passed only a data pointer */
		int (*execute)(void *datap);
		/*! Callback form that is passed a struct ast_taskprocessor_local */
		int (*execute_local)(struct ast_taskprocessor_local *local);
	} callback;
	/*! \brief The data pointer for the task execute() function */
	void *datap;
	/*! \brief AST_LIST_ENTRY overhead */
	AST_LIST_ENTRY(tps_task) list;
	/*!
	 * \brief One-bit flag.
	 *
	 * NOTE(review): presumably selects callback.execute_local over
	 * callback.execute when set -- confirm where the task is executed.
	 */
	unsigned int wants_local:1;
};

/*! \brief tps_taskprocessor_stats maintains statistics for a taskprocessor. */
struct tps_taskprocessor_stats {
	/*! \brief The maximum number of tasks queued at any one time */
	unsigned long max_qsize;
	/*! \brief The number of tasks processed so far */
	unsigned long _tasks_processed_count;
};

/*! \brief An ast_taskprocessor structure is a singleton by name */
struct ast_taskprocessor {
	/*! \brief Taskprocessor statistics */
	struct tps_taskprocessor_stats stats;
	/*
	 * NOTE(review): a "Taskprocessor current queue size" comment stood here
	 * with no member after it. The counter it described appears to have been
	 * lost from this copy of the file -- restore it from upstream.
	 */
	/*! \brief Taskprocessor low water clear alert level */
	long tps_queue_low;
	/*! \brief Taskprocessor high water alert trigger level */
	long tps_queue_high;
	/*! \brief List of queued tps_task entries (head declared without its own lock) */
	AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
	/*! \brief Listener notified of this taskprocessor's state changes */
	struct ast_taskprocessor_listener *listener;
	/*! Current thread executing the tasks */
	pthread_t thread;
	/*! Indicates if the taskprocessor is currently executing a task */
	unsigned int executing:1;
	/*! Indicates that a high water warning has been issued on this task processor */
	unsigned int high_water_warned:1;
	/*! Indicates that a high water alert is active on this taskprocessor */
	unsigned int high_water_alert:1;
	/*! Indicates if the taskprocessor is currently suspended */
	unsigned int suspended:1;
	/*!
	 * \brief Friendly name of the taskprocessor
	 *
	 * Zero-length trailing array; must remain the last member.
	 * NOTE(review): storage for the name is presumably allocated past the
	 * end of the structure -- confirm at the allocation site.
	 */
	char name[0];
};

/*!
 * \brief A listener for taskprocessors
 *
 * \since 12.0.0
 *
 * When a taskprocessor's state changes, the listener
 * is notified of the change. This allows for tasks
 * to be addressed in whatever way is appropriate for
 * the module using the taskprocessor.
 */
struct ast_taskprocessor_listener {
	/*! The callbacks the taskprocessor calls into to notify of state changes */
	const struct ast_taskprocessor_listener_callbacks *callbacks;
	/*! The taskprocessor that the listener is listening to */
	struct ast_taskprocessor *tps;
	/*!
	 * Data private to the listener.
	 *
	 * The default listener in this file stores a
	 * struct default_taskprocessor_listener_pvt here.
	 */
	void *user_data;
};

#ifdef LOW_MEMORY
/*! \brief Number of buckets in the tps_singletons container (smaller value used when LOW_MEMORY is defined). */
#define TPS_MAX_BUCKETS 61
#else
/*! \brief Number of buckets in the tps_singletons container. */
#define TPS_MAX_BUCKETS 1567
#endif

/*!
 * \brief tps_singletons is the astobj2 container for taskprocessor singletons
 *
 * File-local; TPS_MAX_BUCKETS is documented as its bucket count.
 */
static struct ao2_container *tps_singletons;

/*!
 * \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition
 *
 * NOTE(review): no declaration follows this comment in this copy of the
 * file; the condition it describes appears to be missing -- confirm
 * against upstream.
 */
/*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition lock */
AST_MUTEX_DEFINE_STATIC(cli_ping_cond_lock);

/*! \brief The astobj2 hash callback for taskprocessors */
static int tps_hash_cb(const void *obj, const int flags);
/*! \brief The astobj2 compare callback for taskprocessors */
static int tps_cmp_cb(void *obj, void *arg, int flags);
/*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> handler function */
static int tps_ping_handler(void *datap);

static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);

/*!
 * \brief CLI entries provided by this file.
 *
 * Each entry pairs a handler function with its one-line summary text.
 */
static struct ast_cli_entry taskprocessor_clis[] = {
	AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
	AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
};

struct default_taskprocessor_listener_pvt {
	pthread_t poll_thread;
	int dead;
static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
	ast_assert(pvt->dead);
	ast_sem_destroy(&pvt->sem);
	ast_free(pvt);
static void default_listener_pvt_dtor(struct ast_taskprocessor_listener *listener)
	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;

	default_listener_pvt_destroy(pvt);

	listener->user_data = NULL;
/*!
 * \brief Function that processes tasks in the taskprocessor
 * \internal
 */
static void *default_tps_processing_function(void *data)
{
	struct ast_taskprocessor_listener *listener = data;
	struct ast_taskprocessor *tps = listener->tps;
	struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
	int sem_value;
	int res;

	while (!pvt->dead) {
		res = ast_sem_wait(&pvt->sem);
		if (res != 0 && errno != EINTR) {
			ast_log(LOG_ERROR, "ast_sem_wait(): %s\n",
				strerror(errno));
			/* Just give up */
			break;
		ast_taskprocessor_execute(tps);

	/* No posting to a dead taskprocessor! */
	res = ast_sem_getvalue(&pvt->sem, &sem_value);
	ast_assert(res == 0 && sem_value == 0);

	/* Free the shutdown reference (see default_listener_shutdown) */
	ao2_t_ref(listener->tps, -1, "tps-shutdown");

Loading
Loading full blame...