Skip to content
Snippets Groups Projects
taskprocessor.c 33.3 KiB
Newer Older
  • Learn to ignore specific revisions
  • 		ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
    		return NULL;
    	}
    
    	ao2_lock(tps_singletons);
    	p = ao2_find(tps_singletons, name, OBJ_KEY | OBJ_NOLOCK);
    	if (p || (create & TPS_REF_IF_EXISTS)) {
    
    		/* calling function does not want a new taskprocessor to be created if it doesn't already exist */
    
    		ao2_unlock(tps_singletons);
    		return p;
    
    	/* Create a new taskprocessor. Start by creating a default listener */
    
    	pvt = default_listener_pvt_alloc();
    	if (!pvt) {
    
    		ao2_unlock(tps_singletons);
    
    		return NULL;
    	}
    	listener = ast_taskprocessor_listener_alloc(&default_listener_callbacks, pvt);
    
    		ao2_unlock(tps_singletons);
    
    		default_listener_pvt_destroy(pvt);
    
    	p = __allocate_taskprocessor(name, listener);
    
    	ao2_unlock(tps_singletons);
    	p = __start_taskprocessor(p);
    
    	ao2_ref(listener, -1);
    
    	return p;
    }
    
    struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
    {
    
    	struct ast_taskprocessor *p;
    
    	ao2_lock(tps_singletons);
    	p = ao2_find(tps_singletons, name, OBJ_KEY | OBJ_NOLOCK);
    
    		ao2_unlock(tps_singletons);
    
    		ast_taskprocessor_unreference(p);
    		return NULL;
    	}
    
    
    	p = __allocate_taskprocessor(name, listener);
    	ao2_unlock(tps_singletons);
    
    	return __start_taskprocessor(p);
    
    void ast_taskprocessor_set_local(struct ast_taskprocessor *tps,
    	void *local_data)
    {
    	SCOPED_AO2LOCK(lock, tps);
    	tps->local_data = local_data;
    }
    
    
    /* decrement the taskprocessor reference count and unlink from the container if necessary */
    void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
    {
    
    	if (!tps) {
    		return NULL;
    
    	/* To prevent another thread from finding and getting a reference to this
    	 * taskprocessor we hold the singletons lock. If we didn't do this then
    	 * they may acquire it and find that the listener has been shut down.
    	 */
    	ao2_lock(tps_singletons);
    
    
    	if (ao2_ref(tps, -1) > 3) {
    
    	/* If we're down to 3 references, then those must be:
    	 * 1. The reference we just got rid of
    	 * 2. The container
    	 * 3. The listener
    	 */
    
    	ao2_unlink_flags(tps_singletons, tps, OBJ_NOLOCK);
    	ao2_unlock(tps_singletons);
    
    
    	listener_shutdown(tps->listener);
    
    /* push the task into the taskprocessor queue */
    
    static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
    
    	int previous_size;
    
    	if (!tps) {
    		ast_log(LOG_ERROR, "tps is NULL!\n");
    
    
    	if (!t) {
    		ast_log(LOG_ERROR, "t is NULL!\n");
    
    	AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
    
    	previous_size = tps->tps_queue_size++;
    
    	if (tps->tps_queue_high <= tps->tps_queue_size) {
    
    		if (!tps->high_water_alert) {
    
    			ast_log(LOG_WARNING, "The '%s' task processor queue reached %ld scheduled tasks%s.\n",
    				tps->name, tps->tps_queue_size, tps->high_water_warned ? " again" : "");
    			tps->high_water_warned = 1;
    
    			tps->high_water_alert = 1;
    			tps_alert_add(tps, +1);
    		}
    
    	/* The currently executing task counts as still in queue */
    	was_empty = tps->executing ? 0 : previous_size == 0;
    
    	tps->listener->callbacks->task_pushed(tps->listener, was_empty);
    
    int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
    {
    	return taskprocessor_push(tps, tps_task_alloc(task_exe, datap));
    }
    
    int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap)
    {
    	return taskprocessor_push(tps, tps_task_alloc_local(task_exe, datap));
    }
    
    
    int ast_taskprocessor_suspend(struct ast_taskprocessor *tps)
    {
    	if (tps) {
    		ao2_lock(tps);
    		tps->suspended = 1;
    		ao2_unlock(tps);
    		return 0;
    	}
    	return -1;
    }
    
    int ast_taskprocessor_unsuspend(struct ast_taskprocessor *tps)
    {
    	if (tps) {
    		ao2_lock(tps);
    		tps->suspended = 0;
    		ao2_unlock(tps);
    		return 0;
    	}
    	return -1;
    }
    
    int ast_taskprocessor_is_suspended(struct ast_taskprocessor *tps)
    {
    	return tps ? tps->suspended : -1;
    }
    
    
    int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
    {
    
    	struct ast_taskprocessor_local local;
    
    	struct tps_task *t;
    
    	t = tps_taskprocessor_pop(tps);
    	if (!t) {
    		ao2_unlock(tps);
    		return 0;
    	}
    
    
    	tps->thread = pthread_self();
    
    	tps->executing = 1;
    
    	if (t->wants_local) {
    		local.local_data = tps->local_data;
    		local.data = t->datap;
    	}
    	ao2_unlock(tps);
    
    	if (t->wants_local) {
    		t->callback.execute_local(&local);
    	} else {
    		t->callback.execute(t->datap);
    
    	tps->thread = AST_PTHREADT_NULL;
    
    	/* We need to check size in the same critical section where we reset the
    	 * executing bit. Avoids a race condition where a task is pushed right
    	 * after we pop an empty stack.
    	 */
    	tps->executing = 0;
    
    	size = ast_taskprocessor_size(tps);
    
    	++tps->stats._tasks_processed_count;
    
    	/* Include the task we just executed as part of the queue size. */
    	if (size >= tps->stats.max_qsize) {
    		tps->stats.max_qsize = size + 1;
    
    	/* If we executed a task, check for the transition to empty */
    
    	if (size == 0 && tps->listener->callbacks->emptied) {
    
    		tps->listener->callbacks->emptied(tps->listener);
    	}
    
    	return size > 0;
    
    
    int ast_taskprocessor_is_task(struct ast_taskprocessor *tps)
    {
    	int is_task;
    
    	ao2_lock(tps);
    	is_task = pthread_equal(tps->thread, pthread_self());
    	ao2_unlock(tps);
    	return is_task;
    }
    
    
    unsigned int ast_taskprocessor_seq_num(void)
    {
    	static int seq_num;
    
    	return (unsigned int) ast_atomic_fetchadd_int(&seq_num, +1);
    }
    
    void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format, ...)
    {
    	va_list ap;
    	int user_size;
    #define SEQ_STR_SIZE (1 + 8 + 1)	/* Dash plus 8 hex digits plus null terminator */
    
    	ast_assert(buf != NULL);
    	ast_assert(SEQ_STR_SIZE <= size);
    
    	va_start(ap, format);
    	user_size = vsnprintf(buf, size - (SEQ_STR_SIZE - 1), format, ap);
    	va_end(ap);
    	if (user_size < 0) {
    		/*
    		 * Wow!  We got an output error to a memory buffer.
    		 * Assume no user part of name written.
    		 */
    		user_size = 0;
    	} else if (size < user_size + SEQ_STR_SIZE) {
    		/* Truncate user part of name to make sequence number fit. */
    		user_size = size - SEQ_STR_SIZE;
    	}
    
    	/* Append sequence number to end of user name. */
    	snprintf(buf + user_size, SEQ_STR_SIZE, "-%08x", ast_taskprocessor_seq_num());
    }