Newer
Older
Dwayne M. Hubbard
committed
ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
return NULL;
}
ao2_lock(tps_singletons);
p = ao2_find(tps_singletons, name, OBJ_KEY | OBJ_NOLOCK);
if (p || (create & TPS_REF_IF_EXISTS)) {
Dwayne M. Hubbard
committed
/* calling function does not want a new taskprocessor to be created if it doesn't already exist */
ao2_unlock(tps_singletons);
return p;
Dwayne M. Hubbard
committed
}
/* Create a new taskprocessor. Start by creating a default listener */
pvt = default_listener_pvt_alloc();
if (!pvt) {
ao2_unlock(tps_singletons);
return NULL;
}
listener = ast_taskprocessor_listener_alloc(&default_listener_callbacks, pvt);
if (!listener) {
ao2_unlock(tps_singletons);
default_listener_pvt_destroy(pvt);
return NULL;
}
p = __allocate_taskprocessor(name, listener);
ao2_unlock(tps_singletons);
p = __start_taskprocessor(p);
return p;
}
/*
 * Create a taskprocessor that uses a caller-supplied listener.
 *
 * name     - key searched for in the tps_singletons container and passed on
 *            to __allocate_taskprocessor().
 * listener - listener passed on to __allocate_taskprocessor().
 *
 * Returns whatever __start_taskprocessor() returns for the newly allocated
 * taskprocessor, or NULL if a taskprocessor with this name already exists.
 */
struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
{
	struct ast_taskprocessor *p;

	/* Hold the container lock across find + allocate so the name cannot be
	 * claimed by another thread in between.
	 */
	ao2_lock(tps_singletons);
	p = ao2_find(tps_singletons, name, OBJ_KEY | OBJ_NOLOCK);
	if (p) {
		/* The name is already taken: give back the object we found and fail.
		 * The container lock is released first because
		 * ast_taskprocessor_unreference() takes it itself.
		 */
		ao2_unlock(tps_singletons);
		ast_taskprocessor_unreference(p);
		return NULL;
	}

	p = __allocate_taskprocessor(name, listener);
	ao2_unlock(tps_singletons);

	return __start_taskprocessor(p);
}
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps,
void *local_data)
{
SCOPED_AO2LOCK(lock, tps);
tps->local_data = local_data;
}
Dwayne M. Hubbard
committed
/*
 * Decrement the taskprocessor reference count and unlink it from the
 * container if necessary.
 *
 * tps - taskprocessor to release; NULL is accepted and ignored.
 *
 * Always returns NULL.
 */
void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
{
	if (!tps) {
		return NULL;
	}

	/* To prevent another thread from finding and getting a reference to this
	 * taskprocessor we hold the singletons lock. If we didn't do this then
	 * they may acquire it and find that the listener has been shut down.
	 */
	ao2_lock(tps_singletons);

	if (ao2_ref(tps, -1) > 3) {
		/* Other holders remain; nothing to tear down yet. */
		ao2_unlock(tps_singletons);
		return NULL;
	}

	/* If we're down to 3 references, then those must be:
	 * 1. The reference we just got rid of
	 * 2. The container
	 * 3. The listener
	 */
	ao2_unlink_flags(tps_singletons, tps, OBJ_NOLOCK);
	ao2_unlock(tps_singletons);

	/* Shut the listener down only after the container lock is released. */
	listener_shutdown(tps->listener);
	return NULL;
}
/* push the task into the taskprocessor queue */
static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
Dwayne M. Hubbard
committed
{
Dwayne M. Hubbard
committed
if (!tps) {
ast_log(LOG_ERROR, "tps is NULL!\n");
Dwayne M. Hubbard
committed
return -1;
}
if (!t) {
ast_log(LOG_ERROR, "t is NULL!\n");
Dwayne M. Hubbard
committed
return -1;
}
AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
previous_size = tps->tps_queue_size++;
if (tps->tps_queue_high <= tps->tps_queue_size) {
if (!tps->high_water_alert) {
ast_log(LOG_WARNING, "The '%s' task processor queue reached %ld scheduled tasks%s.\n",
tps->name, tps->tps_queue_size, tps->high_water_warned ? " again" : "");
tps->high_water_warned = 1;
tps->high_water_alert = 1;
tps_alert_add(tps, +1);
}
/* The currently executing task counts as still in queue */
was_empty = tps->executing ? 0 : previous_size == 0;
tps->listener->callbacks->task_pushed(tps->listener, was_empty);
Dwayne M. Hubbard
committed
return 0;
}
/*
 * Wrap a callback and its data in a task and queue it on the taskprocessor.
 *
 * Returns the result of taskprocessor_push().
 */
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
{
	struct tps_task *new_task = tps_task_alloc(task_exe, datap);

	return taskprocessor_push(tps, new_task);
}
/*
 * Wrap a callback that takes a struct ast_taskprocessor_local, together with
 * its data, in a task and queue it on the taskprocessor.
 *
 * Returns the result of taskprocessor_push().
 */
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap)
{
	struct tps_task *new_task = tps_task_alloc_local(task_exe, datap);

	return taskprocessor_push(tps, new_task);
}
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
/*
 * Set the taskprocessor's suspended flag under its lock.
 *
 * Returns 0 on success, -1 if tps is NULL.
 */
int ast_taskprocessor_suspend(struct ast_taskprocessor *tps)
{
	if (!tps) {
		return -1;
	}

	ao2_lock(tps);
	tps->suspended = 1;
	ao2_unlock(tps);

	return 0;
}
/*
 * Clear the taskprocessor's suspended flag under its lock.
 *
 * Returns 0 on success, -1 if tps is NULL.
 */
int ast_taskprocessor_unsuspend(struct ast_taskprocessor *tps)
{
	if (!tps) {
		return -1;
	}

	ao2_lock(tps);
	tps->suspended = 0;
	ao2_unlock(tps);

	return 0;
}
/*
 * Report the taskprocessor's suspended flag.
 *
 * Returns the current value of tps->suspended, or -1 if tps is NULL.
 * The flag is read without taking the taskprocessor lock.
 */
int ast_taskprocessor_is_suspended(struct ast_taskprocessor *tps)
{
	if (!tps) {
		return -1;
	}

	return tps->suspended;
}
/*
 * Pop one task from the taskprocessor and run its callback, then update the
 * taskprocessor's bookkeeping and notify the listener if the queue is empty.
 *
 * Returns 0 when there is no task to pop.
 *
 * NOTE(review): the visible text of this function looks truncated - `t` is
 * used but never declared, ao2_unlock(tps) is called with no matching
 * ao2_lock(tps) in view, several opening braces are never closed, and no
 * return statement or closing brace follows the emptied() callback. Confirm
 * against the complete file before relying on anything below.
 */
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
{
struct ast_taskprocessor_local local;
long size;
/* NOTE(review): presumably the tps lock is taken before this pop - confirm. */
t = tps_taskprocessor_pop(tps);
if (!t) {
/* Nothing queued: release the lock and report that no task was run. */
ao2_unlock(tps);
return 0;
}
/* Record the executing thread; ast_taskprocessor_is_task() compares against it. */
tps->thread = pthread_self();
if (t->wants_local) {
/* Snapshot the local data before the unlock below, paired with the task's own data. */
local.local_data = tps->local_data;
local.data = t->datap;
}
ao2_unlock(tps);
/* The callback itself runs after the unlock above. */
if (t->wants_local) {
t->callback.execute_local(&local);
} else {
t->callback.execute(t->datap);
/* NOTE(review): the `else` brace is not closed in the visible text; the
 * statements below presumably run for both callback kinds - confirm.
 */
tps->thread = AST_PTHREADT_NULL;
/* We need to check size in the same critical section where we reset the
* executing bit. Avoids a race condition where a task is pushed right
* after we pop an empty stack.
*/
tps->executing = 0;
size = ast_taskprocessor_size(tps);
/* Update the stats */
++tps->stats._tasks_processed_count;
/* Include the task we just executed as part of the queue size. */
if (size >= tps->stats.max_qsize) {
tps->stats.max_qsize = size + 1;
/* If we executed a task, check for the transition to empty */
/* The emptied callback is optional, hence the NULL check before calling it. */
if (size == 0 && tps->listener->callbacks->emptied) {
tps->listener->callbacks->emptied(tps->listener);
}
/*
 * Tell whether the calling thread is the one recorded in tps->thread.
 *
 * Returns the result of pthread_equal() (non-zero when the threads match).
 */
int ast_taskprocessor_is_task(struct ast_taskprocessor *tps)
{
	pthread_t caller = pthread_self();
	int same_thread;

	/* tps->thread is read under the taskprocessor lock. */
	ao2_lock(tps);
	same_thread = pthread_equal(tps->thread, caller);
	ao2_unlock(tps);

	return same_thread;
}
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
/*
 * Hand out the next value of a function-local sequence counter.
 *
 * The counter is advanced by one through ast_atomic_fetchadd_int() and the
 * value that call returns is given back as an unsigned int.
 */
unsigned int ast_taskprocessor_seq_num(void)
{
	static int next_seq;
	int issued = ast_atomic_fetchadd_int(&next_seq, +1);

	return (unsigned int) issued;
}
/*
 * Build a taskprocessor name: a printf-style user part followed by
 * "-" and an 8-digit hex sequence number.
 *
 * buf    - destination buffer; must not be NULL.
 * size   - total size of buf in bytes; must be at least SEQ_STR_SIZE.
 * format - printf-style format for the user part, followed by its arguments.
 *
 * The user part is truncated as needed so the sequence suffix always fits.
 * If buf is NULL or size is too small for the suffix, nothing is formatted;
 * buf is set to the empty string when it has room for a terminator.
 */
void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format, ...)
{
	va_list ap;
	int user_size;
#define SEQ_STR_SIZE (1 + 8 + 1)	/* Dash plus 8 hex digits plus null terminator */

	ast_assert(buf != NULL);
	ast_assert(SEQ_STR_SIZE <= size);

	/* The assertions above may not be active in every build, so guard at
	 * runtime too: with size < SEQ_STR_SIZE the unsigned subtraction below
	 * would wrap around and let vsnprintf write far past the buffer.
	 */
	if (!buf || size < SEQ_STR_SIZE) {
		if (buf && size) {
			buf[0] = '\0';
		}
		return;
	}

	/* Leave room for the suffix; its terminator overwrites the user part's. */
	va_start(ap, format);
	user_size = vsnprintf(buf, size - (SEQ_STR_SIZE - 1), format, ap);
	va_end(ap);
	if (user_size < 0) {
		/*
		 * Wow!  We got an output error to a memory buffer.
		 * Assume no user part of name written.
		 */
		user_size = 0;
	} else if (size < user_size + SEQ_STR_SIZE) {
		/* Truncate user part of name to make sequence number fit. */
		user_size = size - SEQ_STR_SIZE;
	}

	/* Append sequence number to end of user name. */
	snprintf(buf + user_size, SEQ_STR_SIZE, "-%08x", ast_taskprocessor_seq_num());
}