Skip to content
Snippets Groups Projects
sched.c 15.9 KiB
Newer Older
  • Learn to ignore specific revisions
  • Mark Spencer's avatar
    Mark Spencer committed
    /*
    
     * Asterisk -- An open source telephony toolkit.
     *
    
     * Copyright (C) 1999 - 2010, Digium, Inc.
    
    Russell Bryant's avatar
    Russell Bryant committed
     * Mark Spencer <markster@digium.com>
    
     * Russell Bryant <russell@digium.com>
    
    Mark Spencer's avatar
    Mark Spencer committed
     *
    
     * See http://www.asterisk.org for more information about
     * the Asterisk project. Please do not directly contact
     * any of the maintainers of this project for assistance;
     * the project provides a web site, mailing lists and IRC
     * channels for your use.
     *
     * This program is free software, distributed under the terms of
     * the GNU General Public License Version 2. See the LICENSE file
     * at the top of the source tree.
     */
    
    
    Mark Spencer's avatar
    Mark Spencer committed
     *
    
     * \brief Scheduler Routines (from cheops-NG)
    
    Mark Spencer's avatar
    Mark Spencer committed
     *
    
     * \author Mark Spencer <markster@digium.com>
    
    /*** MODULEINFO
    	<support_level>core</support_level>
     ***/
    
    
    ASTERISK_REGISTER_FILE()
    
    Mark Spencer's avatar
    Mark Spencer committed
    #ifdef DEBUG_SCHEDULER
    
    /*
     * DEBUG(a): scheduler trace helper.  In this DEBUG_SCHEDULER build the
     * argument is handed to DEBUG_M() only when option_debug is non-zero.
     * NOTE(review): no ';' follows DEBUG_M(a) inside the do/while body, so
     * DEBUG_M presumably supplies its own statement terminator -- confirm.
     */
    #define DEBUG(a) do { \
    	if (option_debug) \
    		DEBUG_M(a) \
    	} while (0)
    
    Mark Spencer's avatar
    Mark Spencer committed
    #else
    
    #define DEBUG(a)
    
    Mark Spencer's avatar
    Mark Spencer committed
    #endif
    
    #include <sys/time.h>
    
    #include "asterisk/sched.h"
    #include "asterisk/channel.h"
    #include "asterisk/lock.h"
    
    /*!
     * \brief Maximum number of cached schedule structs
     *
     * \note This is the maximum number of unused schedule structs kept
     * around for reuse.  Undefine it to disable schedule structure
     * caching.  (Only disable this on very low memory machines.)
     */
    #define SCHED_MAX_CACHE 128
    
    
    Mark Spencer's avatar
    Mark Spencer committed
    
    struct sched {
    
    	AST_LIST_ENTRY(sched) list;
    
    	int id;                       /*!< ID number of event */
    	struct timeval when;          /*!< Absolute time event should take place */
    	int resched;                  /*!< When to reschedule */
    	int variable;                 /*!< Use return value from callback to reschedule */
    
    	const void *data;             /*!< Data */
    
    	ast_sched_cb callback;        /*!< Callback */
    
    	/*!
    	 * Used to synchronize between thread running a task and thread
    	 * attempting to delete a task
    	 */
    	ast_cond_t cond;
    	/*! Indication that a running task was deleted. */
    	unsigned int deleted:1;
    
    /*! \brief State of the optional thread that services a scheduler context */
    struct sched_thread {
    	/*! Thread handle; AST_PTHREADT_NULL while no thread is running */
    	pthread_t thread;
    	/*!
    	 * Signaled, with the context lock held, to wake the thread: when a
    	 * task is added or deleted and when a stop is requested
    	 */
    	ast_cond_t cond;
    	/*! Set to 1 to ask the thread to exit */
    	unsigned int stop:1;
    };
    
    struct ast_sched_context {
    
    Mark Spencer's avatar
    Mark Spencer committed
    	ast_mutex_t lock;
    
    	unsigned int eventcnt;                  /*!< Number of events processed */
    
    	unsigned int highwater;					/*!< highest count so far */
    
    	struct ast_heap *sched_heap;
    
    	struct sched_thread *sched_thread;
    
    	/*! The scheduled task that is currently executing */
    	struct sched *currently_executing;
    
    Mark Spencer's avatar
    Mark Spencer committed
    
    #ifdef SCHED_MAX_CACHE
    
    	AST_LIST_HEAD_NOLOCK(, sched) schedc;   /*!< Cache of unused schedule structures and how many */
    
    Mark Spencer's avatar
    Mark Spencer committed
    #endif
    };
    
    
    	struct ast_sched_context *con = data;
    
    	while (!con->sched_thread->stop) {
    
    		ast_mutex_lock(&con->lock);
    
    		if (con->sched_thread->stop) {
    			ast_mutex_unlock(&con->lock);
    
    		ms = ast_sched_wait(con);
    
    			ast_cond_wait(&con->sched_thread->cond, &con->lock);
    		} else {
    
    			struct timeval tv;
    			tv = ast_tvadd(ast_tvnow(), ast_samp2tv(ms, 1000));
    			ts.tv_sec = tv.tv_sec;
    			ts.tv_nsec = tv.tv_usec * 1000;
    
    			ast_cond_timedwait(&con->sched_thread->cond, &con->lock, &ts);
    
    		ast_mutex_unlock(&con->lock);
    
    		if (con->sched_thread->stop) {
    
    		ast_sched_runq(con);
    
    static void sched_thread_destroy(struct ast_sched_context *con)
    
    	if (!con->sched_thread) {
    		return;
    
    	if (con->sched_thread->thread != AST_PTHREADT_NULL) {
    		ast_mutex_lock(&con->lock);
    		con->sched_thread->stop = 1;
    		ast_cond_signal(&con->sched_thread->cond);
    		ast_mutex_unlock(&con->lock);
    		pthread_join(con->sched_thread->thread, NULL);
    		con->sched_thread->thread = AST_PTHREADT_NULL;
    
    	ast_cond_destroy(&con->sched_thread->cond);
    
    	ast_free(con->sched_thread);
    
    	con->sched_thread = NULL;
    
    int ast_sched_start_thread(struct ast_sched_context *con)
    
    	struct sched_thread *st;
    
    	if (con->sched_thread) {
    		ast_log(LOG_ERROR, "Thread already started on this scheduler context\n");
    		return -1;
    	}
    
    	}
    
    	ast_cond_init(&st->cond, NULL);
    
    	st->thread = AST_PTHREADT_NULL;
    
    
    	con->sched_thread = st;
    
    	if (ast_pthread_create_background(&st->thread, NULL, sched_run, con)) {
    		ast_log(LOG_ERROR, "Failed to create scheduler thread\n");
    		sched_thread_destroy(con);
    		return -1;
    
    static int sched_time_cmp(void *a, void *b)
    {
    
    	return ast_tvcmp(((struct sched *) b)->when, ((struct sched *) a)->when);
    
    struct ast_sched_context *ast_sched_context_create(void)
    
    Mark Spencer's avatar
    Mark Spencer committed
    {
    
    	struct ast_sched_context *tmp;
    
    	if (!(tmp = ast_calloc(1, sizeof(*tmp)))) {
    
    
    	if (!(tmp->sched_heap = ast_heap_create(8, sched_time_cmp,
    			offsetof(struct sched, __heap_index)))) {
    
    		ast_sched_context_destroy(tmp);
    
    Mark Spencer's avatar
    Mark Spencer committed
    	return tmp;
    }
    
    
    /*!
     * \brief Destroy a task's condition variable and free the task itself.
     *
     * \param task Task to dispose of.  It is dereferenced unconditionally, so
     *             it must not be NULL.
     */
    static void sched_free(struct sched *task)
    {
    	/* The condition variable is embedded in the task: tear it down first. */
    	ast_cond_destroy(&task->cond);
    	ast_free(task);
    }
    
    
    void ast_sched_context_destroy(struct ast_sched_context *con)
    
    Mark Spencer's avatar
    Mark Spencer committed
    {
    
    	sched_thread_destroy(con);
    	con->sched_thread = NULL;
    
    
    Mark Spencer's avatar
    Mark Spencer committed
    	ast_mutex_lock(&con->lock);
    
    Mark Spencer's avatar
    Mark Spencer committed
    #ifdef SCHED_MAX_CACHE
    
    	while ((s = AST_LIST_REMOVE_HEAD(&con->schedc, list))) {
    
    Mark Spencer's avatar
    Mark Spencer committed
    #endif
    
    	if (con->sched_heap) {
    		while ((s = ast_heap_pop(con->sched_heap))) {
    
    		}
    		ast_heap_destroy(con->sched_heap);
    		con->sched_heap = NULL;
    	}
    
    Mark Spencer's avatar
    Mark Spencer committed
    	ast_mutex_unlock(&con->lock);
    
    static struct sched *sched_alloc(struct ast_sched_context *con)
    
    Mark Spencer's avatar
    Mark Spencer committed
    {
    
    Mark Spencer's avatar
    Mark Spencer committed
    	/*
    	 * We keep a small cache of schedule entries
    	 * to minimize the number of necessary malloc()'s
    	 */
    #ifdef SCHED_MAX_CACHE
    
    	if ((tmp = AST_LIST_REMOVE_HEAD(&con->schedc, list))) {
    
    Mark Spencer's avatar
    Mark Spencer committed
    		con->schedccnt--;
    
    Mark Spencer's avatar
    Mark Spencer committed
    #endif
    
    		tmp = ast_calloc(1, sizeof(*tmp));
    
    Mark Spencer's avatar
    Mark Spencer committed
    	return tmp;
    }
    
    
    static void sched_release(struct ast_sched_context *con, struct sched *tmp)
    
    Mark Spencer's avatar
    Mark Spencer committed
    {
    	/*
    	 * Add to the cache, or just free() if we
    	 * already have too many cache entries
    	 */
    
    
    #ifdef SCHED_MAX_CACHE
    
    Mark Spencer's avatar
    Mark Spencer committed
    	if (con->schedccnt < SCHED_MAX_CACHE) {
    
    		AST_LIST_INSERT_HEAD(&con->schedc, tmp, list);
    
    Mark Spencer's avatar
    Mark Spencer committed
    		con->schedccnt++;
    	} else
    #endif
    
    /*!
     * \brief Remove every scheduled task whose callback is \a match.
     *
     * \param con        Scheduler context to clean.
     * \param match      Callback identifying the tasks to remove.
     * \param cleanup_cb Called with each removed task's data pointer before
     *                   the task is released.  It runs with the context lock
     *                   held.
     *
     * The heap is walked by index.  Removing an entry can reorder the heap, so
     * an entry that has not been examined yet could land on an index the walk
     * has already passed.  To be sure no matching task is missed, the walk
     * restarts from the first index after every removal.  It terminates because
     * each restart follows a removal that shrinks the heap.
     */
    void ast_sched_clean_by_callback(struct ast_sched_context *con, ast_sched_cb match, ast_sched_cb cleanup_cb)
    {
    	int i = 1;
    	struct sched *current;

    	ast_mutex_lock(&con->lock);
    	while ((current = ast_heap_peek(con->sched_heap, i))) {
    		if (current->callback != match) {
    			i++;
    			continue;
    		}

    		ast_heap_remove(con->sched_heap, current);

    		cleanup_cb(current->data);
    		sched_release(con, current);

    		/* Heap order may have changed; rescan from the top. */
    		i = 1;
    	}
    	ast_mutex_unlock(&con->lock);
    }
    
    
    Olle Johansson's avatar
    Olle Johansson committed
    /*! \brief
    
     * Return the number of milliseconds
    
    Olle Johansson's avatar
    Olle Johansson committed
     * until the next scheduled event
     */
    
    int ast_sched_wait(struct ast_sched_context *con)
    
    Mark Spencer's avatar
    Mark Spencer committed
    {
    	int ms;
    
    	DEBUG(ast_debug(1, "ast_sched_wait()\n"));
    
    Mark Spencer's avatar
    Mark Spencer committed
    	ast_mutex_lock(&con->lock);
    
    	if ((s = ast_heap_peek(con->sched_heap, 1))) {
    		ms = ast_tvdiff_ms(s->when, ast_tvnow());
    		if (ms < 0) {
    
    Mark Spencer's avatar
    Mark Spencer committed
    			ms = 0;
    
    Mark Spencer's avatar
    Mark Spencer committed
    	}
    	ast_mutex_unlock(&con->lock);
    
    Olle Johansson's avatar
    Olle Johansson committed
    /*! \brief
     * Take a sched structure and put it in the
     * queue, such that the soonest event is
    
     * first in the list.
    
    Olle Johansson's avatar
    Olle Johansson committed
     */
    
    static void schedule(struct ast_sched_context *con, struct sched *s)
    
    Mark Spencer's avatar
    Mark Spencer committed
    {
    
    	ast_heap_push(con->sched_heap, s);
    
    	if (ast_heap_size(con->sched_heap) > con->highwater) {
    		con->highwater = ast_heap_size(con->sched_heap);
    
    Olle Johansson's avatar
    Olle Johansson committed
    /*! \brief
    
     * Given the time of the last event *t and the offset in milliseconds
     * 'when', compute the time at which the event should next run.
     */
    
    static int sched_settime(struct timeval *t, int when)
    
    Mark Spencer's avatar
    Mark Spencer committed
    {
    
    	/*ast_debug(1, "TV -> %lu,%lu\n", tv->tv_sec, tv->tv_usec);*/
    
    	if (ast_tvzero(*t))	/* not supplied, default to now */
    		*t = now;
    	*t = ast_tvadd(*t, ast_samp2tv(when, 1000));
    	if (ast_tvcmp(*t, now) < 0) {
    		*t = now;
    
    Mark Spencer's avatar
    Mark Spencer committed
    	}
    	return 0;
    }
    
    
    int ast_sched_replace_variable(int old_id, struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
    
    	/* 0 means the schedule item is new; do not delete */
    
    	return ast_sched_add_variable(con, when, callback, data, variable);
    }
    
    Olle Johansson's avatar
    Olle Johansson committed
    /*! \brief
     * Schedule callback(data) to happen when ms into the future
     */
    
    int ast_sched_add_variable(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
    
    Mark Spencer's avatar
    Mark Spencer committed
    {
    	struct sched *tmp;
    
    Mark Spencer's avatar
    Mark Spencer committed
    	int res = -1;
    
    	DEBUG(ast_debug(1, "ast_sched_add()\n"));
    
    Mark Spencer's avatar
    Mark Spencer committed
    	ast_mutex_lock(&con->lock);
    
    Mark Spencer's avatar
    Mark Spencer committed
    	if ((tmp = sched_alloc(con))) {
    		tmp->id = con->eventcnt++;
    		tmp->callback = callback;
    		tmp->data = data;
    		tmp->resched = when;
    
    		tmp->variable = variable;
    
    Mark Spencer's avatar
    Mark Spencer committed
    		if (sched_settime(&tmp->when, when)) {
    			sched_release(con, tmp);
    
    Mark Spencer's avatar
    Mark Spencer committed
    		} else {
    
    Mark Spencer's avatar
    Mark Spencer committed
    			schedule(con, tmp);
    
    Mark Spencer's avatar
    Mark Spencer committed
    			res = tmp->id;
    		}
    	}
    
    #ifdef DUMP_SCHEDULER
    	/* Dump contents of the context while we have the lock so nothing gets screwed up by accident. */
    
    	if (con->sched_thread) {
    		ast_cond_signal(&con->sched_thread->cond);
    	}
    
    Mark Spencer's avatar
    Mark Spencer committed
    	ast_mutex_unlock(&con->lock);
    
    Mark Spencer's avatar
    Mark Spencer committed
    	return res;
    
    int ast_sched_replace(int old_id, struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data)
    
    	return ast_sched_add(con, when, callback, data);
    }
    
    
    /*!
     * \brief Schedule callback(data) to run \a when milliseconds from now.
     *
     * Convenience wrapper around ast_sched_add_variable() with the "variable"
     * flag cleared, i.e. the callback's return value is not used as the next
     * interval when the task is rescheduled.
     *
     * \return whatever ast_sched_add_variable() returns (the new task's id, or
     *         -1 on failure).
     */
    int ast_sched_add(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data)
    {
    	/* Fixed-interval task: reschedule with 'when', not the callback result. */
    	const int variable = 0;

    	return ast_sched_add_variable(con, when, callback, data, variable);
    }
    
    
    static struct sched *sched_find(struct ast_sched_context *con, int id)
    
    	int x;
    	size_t heap_size;
    
    	heap_size = ast_heap_size(con->sched_heap);
    	for (x = 1; x <= heap_size; x++) {
    		struct sched *cur = ast_heap_peek(con->sched_heap, x);
    
    		if (cur->id == id) {
    			return cur;
    		}
    	}
    
    
    const void *ast_sched_find_data(struct ast_sched_context *con, int id)
    {
    	struct sched *s;
    	const void *data = NULL;
    
    	ast_mutex_lock(&con->lock);
    
    	s = sched_find(con, id);
    	if (s) {
    		data = s->data;
    	}
    
    	ast_mutex_unlock(&con->lock);
    
    	return data;
    }
    
    
    Olle Johansson's avatar
    Olle Johansson committed
    /*! \brief
     * Delete the schedule entry with number
     * "id".  It's nearly impossible that there
     * would be two or more in the list with that
     * id.
     */
    
    Tilghman Lesher's avatar
    Tilghman Lesher committed
    #ifndef AST_DEVMODE
    
    int ast_sched_del(struct ast_sched_context *con, int id)
    
    int _ast_sched_del(struct ast_sched_context *con, int id, const char *file, int line, const char *function)
    
    Mark Spencer's avatar
    Mark Spencer committed
    {
    
    	struct sched *s = NULL;
    
    	int *last_id = ast_threadstorage_get(&last_del_id, sizeof(int));
    
    	DEBUG(ast_debug(1, "ast_sched_del(%d)\n", id));
    
    Mark Spencer's avatar
    Mark Spencer committed
    	ast_mutex_lock(&con->lock);
    
    
    	s = sched_find(con, id);
    
    		if (!ast_heap_remove(con->sched_heap, s)) {
    			ast_log(LOG_WARNING,"sched entry %d not in the sched heap?\n", s->id);
    		}
    
    	} else if (con->currently_executing && (id == con->currently_executing->id)) {
    		s = con->currently_executing;
    		s->deleted = 1;
    		/* Wait for executing task to complete so that caller of ast_sched_del() does not
    		 * free memory out from under the task.
    		 */
    		ast_cond_wait(&s->cond, &con->lock);
    		/* Do not sched_release() here because ast_sched_runq() will do it */
    
    #ifdef DUMP_SCHEDULER
    	/* Dump contents of the context while we have the lock so nothing gets screwed up by accident. */
    
    	if (con->sched_thread) {
    		ast_cond_signal(&con->sched_thread->cond);
    	}
    
    Mark Spencer's avatar
    Mark Spencer committed
    	ast_mutex_unlock(&con->lock);
    
    		ast_debug(1, "Attempted to delete nonexistent schedule entry %d!\n", id);
    
    Tilghman Lesher's avatar
    Tilghman Lesher committed
    #ifndef AST_DEVMODE
    
    		ast_assert(s != NULL);
    #else
    
    			char buf[100];
    
    			snprintf(buf, sizeof(buf), "s != NULL, id=%d", id);
    			_ast_assert(0, buf, file, line, function);
    
    Mark Spencer's avatar
    Mark Spencer committed
    		return -1;
    
    void ast_sched_report(struct ast_sched_context *con, struct ast_str **buf, struct ast_cb_names *cbnames)
    
    	int countlist[cbnames->numassocs + 1];
    
    	memset(countlist, 0, sizeof(countlist));
    
    	ast_str_set(buf, 0, " Highwater = %u\n schedcnt = %zu\n", con->highwater, ast_heap_size(con->sched_heap));
    
    	ast_mutex_lock(&con->lock);
    
    
    	heap_size = ast_heap_size(con->sched_heap);
    	for (x = 1; x <= heap_size; x++) {
    		cur = ast_heap_peek(con->sched_heap, x);
    
    		/* match the callback to the cblist */
    
    		for (i = 0; i < cbnames->numassocs; i++) {
    			if (cur->callback == cbnames->cblist[i]) {
    
    		if (i < cbnames->numassocs) {
    
    	ast_mutex_unlock(&con->lock);
    
    	for (i = 0; i < cbnames->numassocs; i++) {
    		ast_str_append(buf, 0, "    %s : %d\n", cbnames->list[i], countlist[i]);
    	}
    
    	ast_str_append(buf, 0, "   <unknown> : %d\n", countlist[cbnames->numassocs]);
    }
    
    /*! \brief Dump the contents of the scheduler to LOG_DEBUG */
    
    void ast_sched_dump(struct ast_sched_context *con)
    
    Mark Spencer's avatar
    Mark Spencer committed
    {
    	struct sched *q;
    
    	struct timeval when = ast_tvnow();
    
    Mark Spencer's avatar
    Mark Spencer committed
    #ifdef SCHED_MAX_CACHE
    
    	ast_debug(1, "Asterisk Schedule Dump (%zu in Q, %u Total, %u Cache, %u high-water)\n", ast_heap_size(con->sched_heap), con->eventcnt - 1, con->schedccnt, con->highwater);
    
    Mark Spencer's avatar
    Mark Spencer committed
    #else
    
    	ast_debug(1, "Asterisk Schedule Dump (%zu in Q, %u Total, %u high-water)\n", ast_heap_size(con->sched_heap), con->eventcnt - 1, con->highwater);
    
    Mark Spencer's avatar
    Mark Spencer committed
    #endif
    
    
    	ast_debug(1, "=============================================================\n");
    	ast_debug(1, "|ID    Callback          Data              Time  (sec:ms)   |\n");
    	ast_debug(1, "+-----+-----------------+-----------------+-----------------+\n");
    
    	ast_mutex_lock(&con->lock);
    
    	heap_size = ast_heap_size(con->sched_heap);
    	for (x = 1; x <= heap_size; x++) {
    		struct timeval delta;
    		q = ast_heap_peek(con->sched_heap, x);
    		delta = ast_tvsub(q->when, when);
    
    		ast_debug(1, "|%.4d | %-15p | %-15p | %.6ld : %.6ld |\n",
    
    Mark Spencer's avatar
    Mark Spencer committed
    	}
    
    	ast_mutex_unlock(&con->lock);
    
    	ast_debug(1, "=============================================================\n");
    
    Olle Johansson's avatar
    Olle Johansson committed
    /*! \brief
     * Launch all events which need to be run at this time.
     */
    
    int ast_sched_runq(struct ast_sched_context *con)
    
    Mark Spencer's avatar
    Mark Spencer committed
    {
    	struct sched *current;
    
    	DEBUG(ast_debug(1, "ast_sched_runq()\n"));
    
    Mark Spencer's avatar
    Mark Spencer committed
    	ast_mutex_lock(&con->lock);
    
    	when = ast_tvadd(ast_tvnow(), ast_tv(0, 1000));
    
    	for (numevents = 0; (current = ast_heap_peek(con->sched_heap, 1)); numevents++) {
    
    		/* schedule all events which are going to expire within 1ms.
    		 * We only care about millisecond accuracy anyway, so this will
    		 * help us get more than one event at one time if they are very
    		 * close together.
    		 */
    
    		if (ast_tvcmp(current->when, when) != -1) {
    
    		current = ast_heap_pop(con->sched_heap);
    
    
    		/*
    		 * At this point, the schedule queue is still intact.  We
    		 * have removed the first event and the rest is still there,
    		 * so it's permissible for the callback to add new events, but
    		 * trying to delete itself won't work because it isn't in
    
    		 * the schedule queue.  If that's what it wants to do, it
    
    		con->currently_executing = current;
    
    		ast_mutex_unlock(&con->lock);
    		res = current->callback(current->data);
    		ast_mutex_lock(&con->lock);
    
    		con->currently_executing = NULL;
    		ast_cond_signal(&current->cond);
    
    		if (res && !current->deleted) {
    
    			 * If they return non-zero, we should schedule them to be
    			 * run again.
    			 */
    			if (sched_settime(&current->when, current->variable? res : current->resched)) {
    				sched_release(con, current);
    
    		} else {
    			/* No longer needed, so release it */
    
    			sched_release(con, current);
    
    Mark Spencer's avatar
    Mark Spencer committed
    	}
    
    Mark Spencer's avatar
    Mark Spencer committed
    	ast_mutex_unlock(&con->lock);
    
    Mark Spencer's avatar
    Mark Spencer committed
    }
    
    long ast_sched_when(struct ast_sched_context *con,int id)
    
    	struct sched *s;
    
    	DEBUG(ast_debug(1, "ast_sched_when()\n"));
    
    
    	ast_mutex_lock(&con->lock);
    
    	s = sched_find(con, id);
    
    Luigi Rizzo's avatar
    Luigi Rizzo committed
    		secs = s->when.tv_sec - now.tv_sec;
    
    	ast_mutex_unlock(&con->lock);
    
    	return secs;
    }