Newer
Older
Dwayne M. Hubbard
committed
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2007-2013, Digium, Inc.
Dwayne M. Hubbard
committed
*
* Dwayne M. Hubbard <dhubbard@digium.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
Dwayne M. Hubbard
committed
* \brief Maintain a container of uniquely-named taskprocessor threads that can be shared across modules.
*
* \author Dwayne Hubbard <dhubbard@digium.com>
*/
/*** MODULEINFO
<support_level>core</support_level>
***/
Dwayne M. Hubbard
committed
#include "asterisk.h"
Dwayne M. Hubbard
committed
#include "asterisk/_private.h"
#include "asterisk/module.h"
#include "asterisk/time.h"
#include "asterisk/astobj2.h"
#include "asterisk/cli.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/sem.h"
Dwayne M. Hubbard
committed
/*!
* \brief tps_task structure is queued to a taskprocessor
Dwayne M. Hubbard
committed
*
* tps_tasks are processed in FIFO order and freed by the taskprocessing
* thread after the task handler returns. The callback function that is assigned
* to the execute() function pointer is responsible for releasing datap resources if necessary.
*/
Dwayne M. Hubbard
committed
struct tps_task {
/*! \brief The execute() task callback function pointer */
union {
int (*execute)(void *datap);
int (*execute_local)(struct ast_taskprocessor_local *local);
} callback;
Dwayne M. Hubbard
committed
/*! \brief The data pointer for the task execute() function */
void *datap;
/*! \brief AST_LIST_ENTRY overhead */
AST_LIST_ENTRY(tps_task) list;
unsigned int wants_local:1;
Dwayne M. Hubbard
committed
};
/*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */
struct tps_taskprocessor_stats {
/*! \brief This is the maximum number of tasks queued at any one time */
unsigned long max_qsize;
/*! \brief This is the current number of tasks processed */
unsigned long _tasks_processed_count;
};
/*! \brief A ast_taskprocessor structure is a singleton by name */
struct ast_taskprocessor {
/*! \brief Taskprocessor statistics */
struct tps_taskprocessor_stats stats;
Dwayne M. Hubbard
committed
/*! \brief Taskprocessor current queue size */
long tps_queue_size;
/*! \brief Taskprocessor low water clear alert level */
long tps_queue_low;
/*! \brief Taskprocessor high water alert trigger level */
long tps_queue_high;
Dwayne M. Hubbard
committed
/*! \brief Taskprocessor queue */
AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
struct ast_taskprocessor_listener *listener;
/*! Current thread executing the tasks */
pthread_t thread;
/*! Indicates if the taskprocessor is currently executing a task */
unsigned int executing:1;
/*! Indicates that a high water warning has been issued on this task processor */
unsigned int high_water_warned:1;
/*! Indicates that a high water alert is active on this taskprocessor */
unsigned int high_water_alert:1;
/*! Indicates if the taskprocessor is currently suspended */
unsigned int suspended:1;
/*! \brief Friendly name of the taskprocessor */
char name[0];
Dwayne M. Hubbard
committed
};
/*!
* \brief A listener for taskprocessors
*
* \since 12.0.0
*
* When a taskprocessor's state changes, the listener
* is notified of the change. This allows for tasks
* to be addressed in whatever way is appropriate for
* the module using the taskprocessor.
*/
struct ast_taskprocessor_listener {
/*! The callbacks the taskprocessor calls into to notify of state changes */
const struct ast_taskprocessor_listener_callbacks *callbacks;
/*! The taskprocessor that the listener is listening to */
struct ast_taskprocessor *tps;
/*! Data private to the listener */
void *user_data;
};
#ifdef LOW_MEMORY
#define TPS_MAX_BUCKETS 61
#else
/*! \brief Number of buckets in the tps_singletons container. */
#define TPS_MAX_BUCKETS 1567
#endif
Dwayne M. Hubbard
committed
/*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */
static struct ao2_container *tps_singletons;
/*! \brief CLI <example>taskprocessor ping <blah></example> operation requires a ping condition */
Dwayne M. Hubbard
committed
static ast_cond_t cli_ping_cond;
/*! \brief CLI <example>taskprocessor ping <blah></example> operation requires a ping condition lock */
AST_MUTEX_DEFINE_STATIC(cli_ping_cond_lock);
Dwayne M. Hubbard
committed
/*! \brief The astobj2 hash callback for taskprocessors */
static int tps_hash_cb(const void *obj, const int flags);
/*! \brief The astobj2 compare callback for taskprocessors */
static int tps_cmp_cb(void *obj, void *arg, int flags);
Dwayne M. Hubbard
committed
/*! \brief CLI <example>taskprocessor ping <blah></example> handler function */
Dwayne M. Hubbard
committed
static int tps_ping_handler(void *datap);
static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
static struct ast_cli_entry taskprocessor_clis[] = {
AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
Dwayne M. Hubbard
committed
AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
};
struct default_taskprocessor_listener_pvt {
pthread_t poll_thread;
int dead;
struct ast_sem sem;
static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
ast_assert(pvt->dead);
ast_sem_destroy(&pvt->sem);
ast_free(pvt);
static void default_listener_pvt_dtor(struct ast_taskprocessor_listener *listener)
struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
default_listener_pvt_destroy(pvt);
listener->user_data = NULL;
/*!
* \brief Function that processes tasks in the taskprocessor
* \internal
*/
static void *default_tps_processing_function(void *data)
{
struct ast_taskprocessor_listener *listener = data;
struct ast_taskprocessor *tps = listener->tps;
struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
int sem_value;
int res;
while (!pvt->dead) {
res = ast_sem_wait(&pvt->sem);
if (res != 0 && errno != EINTR) {
ast_log(LOG_ERROR, "ast_sem_wait(): %s\n",
strerror(errno));
/* Just give up */
break;
ast_taskprocessor_execute(tps);
/* No posting to a dead taskprocessor! */
res = ast_sem_getvalue(&pvt->sem, &sem_value);
ast_assert(res == 0 && sem_value == 0);
/* Free the shutdown reference (see default_listener_shutdown) */
ao2_t_ref(listener->tps, -1, "tps-shutdown");
Loading
Loading full blame...