diff --git a/channels/chan_iax2.c b/channels/chan_iax2.c index c7ff9fd125a4ee57b213e9648f77f39055cd7652..f263dcc2b48bac913b596446bd82a66054800c30 100644 --- a/channels/chan_iax2.c +++ b/channels/chan_iax2.c @@ -305,7 +305,7 @@ int (*iax2_regfunk)(const char *username, int onoff) = NULL; } while(0) static struct io_context *io; -static struct sched_context *sched; +static struct ast_sched_thread *sched; static int iax2_capability = IAX_CAPABILITY_FULLBANDWIDTH; @@ -332,9 +332,6 @@ static int iax2_encryption = 0; static struct ast_flags globalflags = { 0 }; static pthread_t netthreadid = AST_PTHREADT_NULL; -static pthread_t schedthreadid = AST_PTHREADT_NULL; -AST_MUTEX_DEFINE_STATIC(sched_lock); -static ast_cond_t sched_cond; enum iax2_state { IAX_STATE_STARTED = (1 << 0), @@ -1261,22 +1258,18 @@ static int __schedule_action(void (*func)(const void *data), const void *data, c #define schedule_action(func, data) __schedule_action(func, data, __PRETTY_FUNCTION__) #endif -static int iax2_sched_replace(int id, struct sched_context *con, int when, ast_sched_cb callback, const void *data) +static int iax2_sched_replace(int id, struct ast_sched_thread *st, int when, + ast_sched_cb callback, const void *data) { - AST_SCHED_REPLACE(id, con, when, callback, data); - signal_condition(&sched_lock, &sched_cond); + ast_sched_thread_del(st, id); - return id; + return ast_sched_thread_add(st, when, callback, data); } -static int iax2_sched_add(struct sched_context *con, int when, ast_sched_cb callback, const void *data) +static int iax2_sched_add(struct ast_sched_thread *st, int when, + ast_sched_cb callback, const void *data) { - int res; - - res = ast_sched_add(con, when, callback, data); - signal_condition(&sched_lock, &sched_cond); - - return res; + return ast_sched_thread_add(st, when, callback, data); } static int send_ping(const void *data); @@ -1525,18 +1518,18 @@ static void iax2_destroy_helper(struct chan_iax2_pvt *pvt) ast_clear_flag(pvt, IAX_MAXAUTHREQ); } /* No more pings or lagrq's */ - AST_SCHED_DEL_SPINLOCK(sched, pvt->pingid, &iaxsl[pvt->callno]); - AST_SCHED_DEL_SPINLOCK(sched, pvt->lagid, &iaxsl[pvt->callno]); - AST_SCHED_DEL(sched, pvt->autoid); - AST_SCHED_DEL(sched, pvt->authid); - AST_SCHED_DEL(sched, pvt->initid); - AST_SCHED_DEL(sched, pvt->jbid); - AST_SCHED_DEL(sched, pvt->keyrotateid); + AST_SCHED_DEL_SPINLOCK(ast_sched_thread_get_context(sched), pvt->pingid, &iaxsl[pvt->callno]); + AST_SCHED_DEL_SPINLOCK(ast_sched_thread_get_context(sched), pvt->lagid, &iaxsl[pvt->callno]); + ast_sched_thread_del(sched, pvt->autoid); + ast_sched_thread_del(sched, pvt->authid); + ast_sched_thread_del(sched, pvt->initid); + ast_sched_thread_del(sched, pvt->jbid); + ast_sched_thread_del(sched, pvt->keyrotateid); } static void iax2_frame_free(struct iax_frame *fr) { - AST_SCHED_DEL(sched, fr->retrans); + ast_sched_thread_del(sched, fr->retrans); iax_frame_free(fr); } @@ -1725,8 +1718,8 @@ static int make_trunk(unsigned short callno, int locked) * \note We delete these before switching the slot, because if * they fire in the meantime, they will generate a warning. */ - AST_SCHED_DEL(sched, iaxs[callno]->pingid); - AST_SCHED_DEL(sched, iaxs[callno]->lagid); + ast_sched_thread_del(sched, iaxs[callno]->pingid); + ast_sched_thread_del(sched, iaxs[callno]->lagid); iaxs[x] = iaxs[callno]; iaxs[x]->callno = x; iaxs[callno] = NULL; @@ -3214,7 +3207,7 @@ static int schedule_delivery(struct iax_frame *fr, int updatehistory, int fromtr jb_reset(iaxs[fr->callno]->jb); - AST_SCHED_DEL(sched, iaxs[fr->callno]->jbid); + ast_sched_thread_del(sched, iaxs[fr->callno]->jbid); /* deliver this frame now */ if (tsout) @@ -3254,7 +3247,7 @@ static int iax2_transmit(struct iax_frame *fr) /* Wake up the network and scheduler thread */ if (netthreadid != AST_PTHREADT_NULL) pthread_kill(netthreadid, SIGURG); - signal_condition(&sched_lock, &sched_cond); + ast_sched_thread_poke(sched); return 0; } @@ -3390,7 +3383,7 @@ static struct iax2_peer *realtime_peer(const char *peername, struct sockaddr_in ast_copy_flags(peer, &globalflags, IAX_RTAUTOCLEAR|IAX_RTCACHEFRIENDS); if (ast_test_flag(peer, IAX_RTAUTOCLEAR)) { if (peer->expire > -1) { - if (!ast_sched_del(sched, peer->expire)) { + if (!ast_sched_thread_del(sched, peer->expire)) { peer->expire = -1; peer_unref(peer); } @@ -3948,7 +3941,7 @@ static int iax2_hangup(struct ast_channel *c) ast_debug(1, "Really destroying %s now...\n", c->name); iax2_destroy(callno); } else if (iaxs[callno]) { - if (ast_sched_add(sched, 10000, scheduled_destroy, CALLNO_TO_PTR(callno)) < 0) { + if (ast_sched_thread_add(sched, 10000, scheduled_destroy, CALLNO_TO_PTR(callno)) < 0) { ast_log(LOG_ERROR, "Unable to schedule iax2 callno %d destruction?!! Destroying immediately.\n", callno); iax2_destroy(callno); } @@ -4051,7 +4044,7 @@ static int iax2_key_rotate(const void *vpvt) ast_mutex_lock(&iaxsl[pvt->callno]); pvt->keyrotateid = - ast_sched_add(sched, 120000 + (ast_random() % 180001), iax2_key_rotate, vpvt); + ast_sched_thread_add(sched, 120000 + (ast_random() % 180001), iax2_key_rotate, vpvt); snprintf(key, sizeof(key), "%lX", ast_random()); @@ -7072,14 +7065,14 @@ static void prune_peers(void); static void unlink_peer(struct iax2_peer *peer) { if (peer->expire > -1) { - if (!ast_sched_del(sched, peer->expire)) { + if (!ast_sched_thread_del(sched, peer->expire)) { peer->expire = -1; peer_unref(peer); } } if (peer->pokeexpire > -1) { - if (!ast_sched_del(sched, peer->pokeexpire)) { + if (!ast_sched_thread_del(sched, peer->pokeexpire)) { peer->pokeexpire = -1; peer_unref(peer); } @@ -7153,7 +7146,7 @@ static void reg_source_db(struct iax2_peer *p) p->addr.sin_addr = in; p->addr.sin_port = htons(atoi(c)); if (p->expire > -1) { - if (!ast_sched_del(sched, p->expire)) { + if (!ast_sched_thread_del(sched, p->expire)) { p->expire = -1; peer_unref(p); } @@ -7249,7 +7242,7 @@ static int update_registry(struct sockaddr_in *sin, int callno, char *devtype, i p->sockfd = fd; /* Setup the expiry */ if (p->expire > -1) { - if (!ast_sched_del(sched, p->expire)) { + if (!ast_sched_thread_del(sched, p->expire)) { p->expire = -1; peer_unref(p); } @@ -8733,7 +8726,7 @@ retryowner: } } if (f.frametype == AST_FRAME_IAX) { - AST_SCHED_DEL(sched, iaxs[fr->callno]->initid); + ast_sched_thread_del(sched, iaxs[fr->callno]->initid); /* Handle the IAX pseudo frame itself */ if (iaxdebug) ast_debug(1, "IAX subclass %d received\n", f.subclass); @@ -9215,7 +9208,7 @@ retryowner2: /* Remove scheduled iax2_poke_noanswer */ if (peer->pokeexpire > -1) { - if (!ast_sched_del(sched, peer->pokeexpire)) { + if (!ast_sched_thread_del(sched, peer->pokeexpire)) { peer_unref(peer); peer->pokeexpire = -1; } @@ -10268,7 +10261,7 @@ static int iax2_poke_peer(struct iax2_peer *peer, int heldcall) iaxs[peer->callno]->peerpoke = peer; if (peer->pokeexpire > -1) { - if (!ast_sched_del(sched, peer->pokeexpire)) { + if (!ast_sched_thread_del(sched, peer->pokeexpire)) { peer->pokeexpire = -1; peer_unref(peer); } @@ -10383,34 +10376,6 @@ static struct ast_channel *iax2_request(const char *type, int format, void *data return c; } -static void *sched_thread(void *ignore) -{ - int count; - int res; - struct timeval wait; - struct timespec ts; - - for (;;) { - pthread_testcancel(); - ast_mutex_lock(&sched_lock); - res = ast_sched_wait(sched); - if ((res > 1000) || (res < 0)) - res = 1000; - wait = ast_tvadd(ast_tvnow(), ast_samp2tv(res, 1000)); - ts.tv_sec = wait.tv_sec; - ts.tv_nsec = wait.tv_usec * 1000; - ast_cond_timedwait(&sched_cond, &sched_lock, &ts); - ast_mutex_unlock(&sched_lock); - pthread_testcancel(); - - count = ast_sched_runq(sched); - if (count >= 20) - ast_debug(1, "chan_iax2: ast_sched_runq ran %d scheduled tasks all at once\n", count); - } - - return NULL; -} - static void *network_thread(void *ignore) { /* Our job is simple: Send queued messages, retrying if necessary. Read frames @@ -10498,7 +10463,6 @@ static int start_network_thread(void) AST_LIST_UNLOCK(&idle_list); } } - ast_pthread_create_background(&schedthreadid, NULL, sched_thread, NULL); ast_pthread_create_background(&netthreadid, NULL, network_thread, NULL); ast_verb(2, "%d helper threads started\n", threadcount); return 0; @@ -10771,7 +10735,7 @@ static struct iax2_peer *build_peer(const char *name, struct ast_variable *v, st } } else { /* Non-dynamic. Make sure we become that way if we're not */ - AST_SCHED_DEL(sched, peer->expire); + ast_sched_thread_del(sched, peer->expire); ast_clear_flag(peer, IAX_DYNAMIC); if (ast_dnsmgr_lookup(v->value, &peer->addr, &peer->dnsmgr, srvlookup ? "_iax._udp" : NULL)) return peer_unref(peer); @@ -11144,7 +11108,7 @@ static void delete_users(void) AST_LIST_LOCK(®istrations); while ((reg = AST_LIST_REMOVE_HEAD(®istrations, entry))) { - AST_SCHED_DEL(sched, reg->expire); + ast_sched_thread_del(sched, reg->expire); if (reg->callno) { int callno = reg->callno; ast_mutex_lock(&iaxsl[callno]); @@ -12327,21 +12291,13 @@ static int __unload_module(void) /* Cancel the network thread, close the net socket */ if (netthreadid != AST_PTHREADT_NULL) { AST_LIST_LOCK(&frame_queue); - ast_mutex_lock(&sched_lock); pthread_cancel(netthreadid); - ast_cond_signal(&sched_cond); - ast_mutex_unlock(&sched_lock); /* Release the schedule lock resource */ AST_LIST_UNLOCK(&frame_queue); pthread_join(netthreadid, NULL); } - if (schedthreadid != AST_PTHREADT_NULL) { - ast_mutex_lock(&sched_lock); - pthread_cancel(schedthreadid); - ast_cond_signal(&sched_cond); - ast_mutex_unlock(&sched_lock); - pthread_join(schedthreadid, NULL); - } + sched = ast_sched_thread_destroy(sched); + /* Call for all threads to halt */ AST_LIST_LOCK(&idle_list); while ((thread = AST_LIST_REMOVE_HEAD(&idle_list, list))) @@ -12379,7 +12335,6 @@ static int __unload_module(void) ast_channel_unregister(&iax2_tech); delete_users(); iax_provision_unload(); - sched_context_destroy(sched); reload_firmware(1); for (x = 0; x < ARRAY_LEN(iaxsl); x++) { @@ -12494,23 +12449,21 @@ static int load_module(void) ast_mutex_init(&iaxsl[x]); } - ast_cond_init(&sched_cond, NULL); - - if (!(sched = sched_context_create())) { - ast_log(LOG_ERROR, "Failed to create scheduler context\n"); + if (!(sched = ast_sched_thread_create())) { + ast_log(LOG_ERROR, "Failed to create scheduler thread\n"); return AST_MODULE_LOAD_FAILURE; } if (!(io = io_context_create())) { ast_log(LOG_ERROR, "Failed to create I/O context\n"); - sched_context_destroy(sched); + sched = ast_sched_thread_destroy(sched); return AST_MODULE_LOAD_FAILURE; } if (!(netsock = ast_netsock_list_alloc())) { ast_log(LOG_ERROR, "Failed to create netsock list\n"); io_context_destroy(io); - sched_context_destroy(sched); + sched = ast_sched_thread_destroy(sched); return AST_MODULE_LOAD_FAILURE; } ast_netsock_init(netsock); @@ -12519,7 +12472,7 @@ static int load_module(void) if (!outsock) { ast_log(LOG_ERROR, "Could not allocate outsock list.\n"); io_context_destroy(io); - sched_context_destroy(sched); + sched = ast_sched_thread_destroy(sched); return AST_MODULE_LOAD_FAILURE; } ast_netsock_init(outsock); diff --git a/include/asterisk/sched.h b/include/asterisk/sched.h index ab328af959b72c90b448ad28178933266c4873e7..f9e1ca5d2df840c6811b7cec16c561740e3ebbef 100644 --- a/include/asterisk/sched.h +++ b/include/asterisk/sched.h @@ -49,15 +49,17 @@ extern "C" { * and not a copy of the value of the id. */ #define AST_SCHED_DEL(sched, id) \ - do { \ + ({ \ int _count = 0; \ - while (id > -1 && ast_sched_del(sched, id) && ++_count < 10) { \ + int _sched_res = -1; \ + while (id > -1 && (_sched_res = ast_sched_del(sched, id)) && ++_count < 10) \ usleep(1); \ + if (_count == 10 && option_debug > 2) { \ + ast_log(LOG_DEBUG, "Unable to cancel schedule ID %d.\n", id); \ } \ - if (_count == 10) \ - ast_debug(3, "Unable to cancel schedule ID %d.\n", id); \ id = -1; \ - } while (0); + (_sched_res); \ + }) #define AST_SCHED_DEL_UNREF(sched, id, refcall) \ do { \ @@ -282,6 +284,114 @@ long ast_sched_when(struct sched_context *con,int id); } \ } while(0) +/*! + * \brief An opaque type representing a scheduler thread + * + * The purpose of the ast_sched_thread API is to provide a common implementation + * of the case where a module wants to have a dedicated thread for handling the + * scheduler. + */ +struct ast_sched_thread; + +/*! + * \brief Create a scheduler with a dedicated thread + * + * This function should be used to allocate a scheduler context and a dedicated + * thread for processing scheduler entries. The thread is started immediately. + * + * \retval NULL error + * \retval non-NULL a handle to the scheduler and its dedicated thread. + */ +struct ast_sched_thread *ast_sched_thread_create(void); + +/*! + * \brief Destroy a scheduler and its thread + * + * This function is used to destroy a scheduler context and the dedicated thread + * that was created for handling scheduler entries. Any entries in the scheduler + * that have not yet been processed will be thrown away. Once this function is + * called, the handle must not be used again. + * + * \param st the handle to the scheduler and thread + * + * \return NULL for convenience + */ +struct ast_sched_thread *ast_sched_thread_destroy(struct ast_sched_thread *st); + +/*! + * \brief Add a scheduler entry + * + * \param st the handle to the scheduler and thread + * \param when the number of ms in the future to run the task. A value <= 0 + * is treated as "run now". + * \param cb the function to call when the scheduled time arrives + * \param data the parameter to pass to the scheduler callback + * + * \retval 0 success + * \retval non-zero failure + */ +int ast_sched_thread_add(struct ast_sched_thread *st, int when, ast_sched_cb cb, + const void *data); + +/*! + * \brief Add a variable reschedule time scheduler entry + * + * \param st the handle to the scheduler and thread + * \param when the number of ms in the future to run the task. A value <= 0 + * is treated as "run now". + * \param cb the function to call when the scheduled time arrives + * \param data the parameter to pass to the scheduler callback + * \param variable If this value is non-zero, then the scheduler will use the return + * value of the scheduler as the amount of time in the future to run the + * task again. Normally, a return value of 0 means do not re-schedule, and + * non-zero means re-schedule using the time provided when the scheduler + * entry was first created. + * + * \retval 0 success + * \retval non-zero failure + */ +int ast_sched_thread_add_variable(struct ast_sched_thread *st, int when, ast_sched_cb cb, + const void *data, int variable); + +/*! + * \brief Get the scheduler context for a given ast_sched_thread + * + * This function should be used only when direct access to the scheduler context + * is required. Its use is discouraged unless necessary. The cases where + * this is currently required is when you want to take advantage of one of the + * AST_SCHED macros. + * + * \param st the handle to the scheduler and thread + * + * \return the sched_context associated with an ast_sched_thread + */ +struct sched_context *ast_sched_thread_get_context(struct ast_sched_thread *st); + +/*! + * \brief Delete a scheduler entry + * + * This uses the AST_SCHED_DEL macro internally. + * + * \param st the handle to the scheduler and thread + * \param id scheduler entry id to delete + * + * \retval 0 success + * \retval non-zero failure + */ +#define ast_sched_thread_del(st, id) ({ \ + struct sched_context *__tmp_context = ast_sched_thread_get_context(st); \ + AST_SCHED_DEL(__tmp_context, id); \ +}) + +/*! + * \brief Force re-processing of the scheduler context + * + * \param st the handle to the scheduler and thread + * + * \return nothing + */ +void ast_sched_thread_poke(struct ast_sched_thread *st); + #if defined(__cplusplus) || defined(c_plusplus) } #endif diff --git a/main/sched.c b/main/sched.c index d62ca115e4ecf1c53d71492f20bce513ecd04ae9..8b69814a3f0b9880e58fbabfd3f3549c36da8798 100644 --- a/main/sched.c +++ b/main/sched.c @@ -1,7 +1,7 @@ /* * Asterisk -- An open source telephony toolkit. * - * Copyright (C) 1999 - 2005, Digium, Inc. + * Copyright (C) 1999 - 2008, Digium, Inc. * * Mark Spencer <markster@digium.com> * @@ -70,6 +70,148 @@ struct sched_context { #endif }; +struct ast_sched_thread { + pthread_t thread; + ast_mutex_t lock; + ast_cond_t cond; + struct sched_context *context; + unsigned int stop:1; +}; + +static void *sched_run(void *data) +{ + struct ast_sched_thread *st = data; + + while (!st->stop) { + int ms; + struct timespec ts = { + .tv_sec = 0, + }; + + ast_mutex_lock(&st->lock); + + if (st->stop) { + ast_mutex_unlock(&st->lock); + return NULL; + } + + ms = ast_sched_wait(st->context); + + if (ms == -1) { + ast_cond_wait(&st->cond, &st->lock); + } else { + struct timeval tv; + tv = ast_tvadd(ast_tvnow(), ast_samp2tv(ms, 1000)); + ts.tv_sec = tv.tv_sec; + ts.tv_nsec = tv.tv_usec * 1000; + ast_cond_timedwait(&st->cond, &st->lock, &ts); + } + + ast_mutex_unlock(&st->lock); + + if (st->stop) { + return NULL; + } + + ast_sched_runq(st->context); + } + + return NULL; +} + +void ast_sched_thread_poke(struct ast_sched_thread *st) +{ + ast_mutex_lock(&st->lock); + ast_cond_signal(&st->cond); + ast_mutex_unlock(&st->lock); +} + +struct sched_context *ast_sched_thread_get_context(struct ast_sched_thread *st) +{ + return st->context; +} + +struct ast_sched_thread *ast_sched_thread_destroy(struct ast_sched_thread *st) +{ + if (st->thread != AST_PTHREADT_NULL) { + ast_mutex_lock(&st->lock); + st->stop = 1; + ast_cond_signal(&st->cond); + ast_mutex_unlock(&st->lock); + pthread_join(st->thread, NULL); + st->thread = AST_PTHREADT_NULL; + } + + ast_mutex_destroy(&st->lock); + ast_cond_destroy(&st->cond); + + if (st->context) { + sched_context_destroy(st->context); + st->context = NULL; + } + + ast_free(st); + + return NULL; +} + +struct ast_sched_thread *ast_sched_thread_create(void) +{ + struct ast_sched_thread *st; + + if (!(st = ast_calloc(1, sizeof(*st)))) { + return NULL; + } + + ast_mutex_init(&st->lock); + ast_cond_init(&st->cond, NULL); + + st->thread = AST_PTHREADT_NULL; + + if (!(st->context = sched_context_create())) { + ast_log(LOG_ERROR, "Failed to create scheduler\n"); + ast_sched_thread_destroy(st); + return NULL; + } + + if (ast_pthread_create_background(&st->thread, NULL, sched_run, st)) { + ast_log(LOG_ERROR, "Failed to create scheduler thread\n"); + ast_sched_thread_destroy(st); + return NULL; + } + + return st; +} + +int ast_sched_thread_add_variable(struct ast_sched_thread *st, int when, ast_sched_cb cb, + const void *data, int variable) +{ + int res; + + ast_mutex_lock(&st->lock); + res = ast_sched_add_variable(st->context, when, cb, data, variable); + if (res != -1) { + ast_cond_signal(&st->cond); + } + ast_mutex_unlock(&st->lock); + + return res; +} + +int ast_sched_thread_add(struct ast_sched_thread *st, int when, ast_sched_cb cb, + const void *data) +{ + int res; + + ast_mutex_lock(&st->lock); + res = ast_sched_add(st->context, when, cb, data); + if (res != -1) { + ast_cond_signal(&st->cond); + } + ast_mutex_unlock(&st->lock); + + return res; +} /* hash routines for sched */