diff --git a/apps/app_queue.c b/apps/app_queue.c index 1db2fc2730a95c646477ce6ba8ef73513960cad7..ce6f54e664a299058a878bf53b64d12fa1169476 100644 --- a/apps/app_queue.c +++ b/apps/app_queue.c @@ -120,6 +120,7 @@ static struct strategy { #define MAX_PERIODIC_ANNOUNCEMENTS 10 /* The maximum periodic announcements we can have */ #define DEFAULT_MIN_ANNOUNCE_FREQUENCY 15 /* The minimum number of seconds between position announcements The default value of 15 provides backwards compatibility */ +#define MAX_QUEUE_BUCKETS 53 #define RES_OKAY 0 /* Action completed */ #define RES_EXISTS (-1) /* Entry already exists */ @@ -350,7 +351,6 @@ static AST_LIST_HEAD_STATIC(interfaces, member_interface); #define QUEUE_EVENT_VARIABLES 3 struct call_queue { - ast_mutex_t lock; char name[80]; /*!< Name */ char moh[80]; /*!< Music On Hold class to be used */ char announce[80]; /*!< Announcement to play when call is answered */ @@ -422,7 +422,7 @@ struct call_queue { AST_LIST_ENTRY(call_queue) list; /*!< Next call queue */ }; -static AST_LIST_HEAD_STATIC(queues, call_queue); +static struct ao2_container *queues; static void update_realtime_members(struct call_queue *q); static int set_member_paused(const char *queuename, const char *interface, const char *reason, int paused); @@ -463,6 +463,30 @@ static int strat2int(const char *strategy) return -1; } +static int queue_hash_cb(const void *obj, const int flags) +{ + const struct call_queue *q = obj; + return ast_str_hash(q->name); +} + +static int queue_cmp_cb(void *obj, void *arg, int flags) +{ + struct call_queue *q = obj, *q2 = arg; + return !strcasecmp(q->name, q2->name) ? CMP_MATCH : 0; +} + +static inline struct call_queue *queue_ref(struct call_queue *q) +{ + ao2_ref(q, 1); + return q; +} + +static inline struct call_queue *queue_unref(struct call_queue *q) +{ + ao2_ref(q, -1); + return q; +} + static void set_queue_variables(struct queue_ent *qe) { @@ -516,7 +540,7 @@ static enum queue_member_status get_member_status(struct call_queue *q, int max_ struct ao2_iterator mem_iter; enum queue_member_status result = QUEUE_NO_MEMBERS; - ast_mutex_lock(&q->lock); + ao2_lock(q); mem_iter = ao2_iterator_init(q->members, 0); for (; (member = ao2_iterator_next(&mem_iter)); ao2_ref(member, -1)) { if (max_penalty && (member->penalty > max_penalty)) @@ -534,7 +558,7 @@ static enum queue_member_status get_member_status(struct call_queue *q, int max_ if (member->paused) { result = QUEUE_NO_UNPAUSED_REACHABLE_MEMBERS; } else { - ast_mutex_unlock(&q->lock); + ao2_unlock(q); ao2_ref(member, -1); return QUEUE_NORMAL; } @@ -542,7 +566,7 @@ static enum queue_member_status get_member_status(struct call_queue *q, int max_ } } - ast_mutex_unlock(&q->lock); + ao2_unlock(q); return result; } @@ -558,6 +582,7 @@ static void *handle_statechange(struct statechange *sc) struct member *cur; struct ao2_iterator mem_iter; struct member_interface *curint; + struct ao2_iterator queue_iter; char *loc; char *technology; @@ -589,9 +614,9 @@ static void *handle_statechange(struct statechange *sc) } ast_debug(1, "Device '%s/%s' changed to state '%d' (%s)\n", technology, loc, sc->state, devstate2str(sc->state)); - AST_LIST_LOCK(&queues); - AST_LIST_TRAVERSE(&queues, q, list) { - ast_mutex_lock(&q->lock); + queue_iter = ao2_iterator_init(queues, 0); + while ((q = ao2_iterator_next(&queue_iter))) { + ao2_lock(q); mem_iter = ao2_iterator_init(q->members, 0); while ((cur = ao2_iterator_next(&mem_iter))) { char *interface; @@ -628,9 +653,9 @@ static void *handle_statechange(struct statechange *sc) } ao2_ref(cur, -1); } - ast_mutex_unlock(&q->lock); + queue_unref(q); + ao2_unlock(q); } - AST_LIST_UNLOCK(&queues); return NULL; } @@ -734,16 +759,6 @@ static struct member *create_queue_member(const char *interface, const char *mem return cur; } -static struct call_queue *alloc_queue(const char *queuename) -{ - struct call_queue *q; - - if ((q = ast_calloc(1, sizeof(*q)))) { - ast_mutex_init(&q->lock); - ast_copy_string(q->name, queuename, sizeof(q->name)); - } - return q; -} static int compress_char(const char c) { @@ -857,22 +872,23 @@ static int interface_exists_global(const char *interface) { struct call_queue *q; struct member *mem, tmpmem; + struct ao2_iterator queue_iter; int ret = 0; ast_copy_string(tmpmem.interface, interface, sizeof(tmpmem.interface)); + queue_iter = ao2_iterator_init(queues, 0); + while ((q = ao2_iterator_next(&queue_iter))) { - AST_LIST_LOCK(&queues); - AST_LIST_TRAVERSE(&queues, q, list) { - ast_mutex_lock(&q->lock); + ao2_lock(q); if ((mem = ao2_find(q->members, &tmpmem, OBJ_POINTER))) { ao2_ref(mem, -1); - ret = 1; - } - ast_mutex_unlock(&q->lock); - if (ret) + ao2_unlock(q); + queue_unref(q); break; + } + ao2_unlock(q); + queue_unref(q); } - AST_LIST_UNLOCK(&queues); return ret; } @@ -1137,12 +1153,22 @@ static void free_members(struct call_queue *q, int all) } } -static void destroy_queue(struct call_queue *q) +static void destroy_queue(void *obj) { + struct call_queue *q = obj; + ast_debug(0, "Queue destructor called for queue '%s'!\n", q->name); free_members(q, 1); - ast_mutex_destroy(&q->lock); ao2_ref(q->members, -1); - free(q); +} + +static struct call_queue *alloc_queue(const char *queuename) +{ + struct call_queue *q; + + if ((q = ao2_alloc(sizeof(*q), destroy_queue))) { + ast_copy_string(q->name, queuename, sizeof(q->name)); + } + return q; } /*!\brief Reload a single queue via realtime. @@ -1151,7 +1177,7 @@ static void destroy_queue(struct call_queue *q) static struct call_queue *find_queue_by_name_rt(const char *queuename, struct ast_variable *queue_vars, struct ast_config *member_config) { struct ast_variable *v; - struct call_queue *q; + struct call_queue *q, tmpq; struct member *m; struct ao2_iterator mem_iter; char *interface = NULL; @@ -1159,24 +1185,23 @@ static struct call_queue *find_queue_by_name_rt(const char *queuename, struct as char tmpbuf[64]; /* Must be longer than the longest queue param name. */ /* Find the queue in the in-core list (we will create a new one if not found). */ - AST_LIST_TRAVERSE(&queues, q, list) { - if (!strcasecmp(q->name, queuename)) - break; - } + ast_copy_string(tmpq.name, queuename, sizeof(tmpq.name)); /* Static queues override realtime. */ - if (q) { - ast_mutex_lock(&q->lock); + if ((q = ao2_find(queues, &tmpq, OBJ_POINTER))) { + ao2_lock(q); if (!q->realtime) { if (q->dead) { - ast_mutex_unlock(&q->lock); + ao2_unlock(q); + queue_unref(q); return NULL; } else { ast_log(LOG_WARNING, "Static queue '%s' already exists. Not loading from realtime\n", q->name); - ast_mutex_unlock(&q->lock); + ao2_unlock(q); return q; } } + queue_unref(q); } else if (!member_config) /* Not found in the list, and it's not realtime ... */ return NULL; @@ -1192,13 +1217,9 @@ static struct call_queue *find_queue_by_name_rt(const char *queuename, struct as q->dead = 1; /* Delete if unused (else will be deleted when last caller leaves). */ - if (!q->count) { - /* Delete. */ - AST_LIST_REMOVE(&queues, q, list); - ast_mutex_unlock(&q->lock); - destroy_queue(q); - } else - ast_mutex_unlock(&q->lock); + ao2_unlink(queues, q); + ao2_unlock(q); + queue_unref(q); } return NULL; } @@ -1207,11 +1228,12 @@ static struct call_queue *find_queue_by_name_rt(const char *queuename, struct as if (!q) { if (!(q = alloc_queue(queuename))) return NULL; - ast_mutex_lock(&q->lock); + ao2_lock(q); clear_queue(q); q->realtime = 1; init_queue(q); /* Ensure defaults for all parameters not set explicitly. */ - AST_LIST_INSERT_HEAD(&queues, q, list); + ao2_link(queues, q); + queue_ref(q); } memset(tmpbuf, 0, sizeof(tmpbuf)); @@ -1250,15 +1272,15 @@ static struct call_queue *find_queue_by_name_rt(const char *queuename, struct as while ((m = ao2_iterator_next(&mem_iter))) { if (m->dead) { ao2_unlink(q->members, m); - ast_mutex_unlock(&q->lock); + ao2_unlock(q); remove_from_interfaces(m->interface); - ast_mutex_lock(&q->lock); + ao2_lock(q); q->membercount--; } ao2_ref(m, -1); } - ast_mutex_unlock(&q->lock); + ao2_unlock(q); return q; } @@ -1267,16 +1289,11 @@ static struct call_queue *load_realtime_queue(const char *queuename) { struct ast_variable *queue_vars; struct ast_config *member_config = NULL; - struct call_queue *q; + struct call_queue *q = NULL, tmpq; /* Find the queue in the in-core list first. */ - AST_LIST_LOCK(&queues); - AST_LIST_TRAVERSE(&queues, q, list) { - if (!strcasecmp(q->name, queuename)) { - break; - } - } - AST_LIST_UNLOCK(&queues); + ast_copy_string(tmpq.name, queuename, sizeof(tmpq.name)); + q = ao2_find(queues, &tmpq, OBJ_POINTER); if (!q || q->realtime) { /*! \note Load from realtime before taking the global qlock, to avoid blocking all @@ -1297,15 +1314,14 @@ static struct call_queue *load_realtime_queue(const char *queuename) } } - AST_LIST_LOCK(&queues); - + ao2_lock(queues); q = find_queue_by_name_rt(queuename, queue_vars, member_config); if (member_config) ast_config_destroy(member_config); if (queue_vars) ast_variables_destroy(queue_vars); + ao2_unlock(queues); - AST_LIST_UNLOCK(&queues); } else { update_realtime_members(q); } @@ -1345,7 +1361,7 @@ static void update_realtime_members(struct call_queue *q) return; } - ast_mutex_lock(&q->lock); + ao2_lock(q); /* Temporarily set realtime members dead so we can detect deleted ones.*/ mem_iter = ao2_iterator_init(q->members, 0); @@ -1367,14 +1383,14 @@ static void update_realtime_members(struct call_queue *q) while ((m = ao2_iterator_next(&mem_iter))) { if (m->dead) { ao2_unlink(q->members, m); - ast_mutex_unlock(&q->lock); + ao2_unlock(q); remove_from_interfaces(m->interface); - ast_mutex_lock(&q->lock); + ao2_lock(q); q->membercount--; } ao2_ref(m, -1); } - ast_mutex_unlock(&q->lock); + ao2_unlock(q); } static int join_queue(char *queuename, struct queue_ent *qe, enum queue_result *reason) @@ -1389,8 +1405,8 @@ static int join_queue(char *queuename, struct queue_ent *qe, enum queue_result * if (!(q = load_realtime_queue(queuename))) return res; - AST_LIST_LOCK(&queues); - ast_mutex_lock(&q->lock); + ao2_lock(queues); + ao2_lock(q); /* This is our one */ stat = get_member_status(q, qe->max_penalty); @@ -1437,8 +1453,8 @@ static int join_queue(char *queuename, struct queue_ent *qe, enum queue_result * q->name, qe->pos, q->count, qe->chan->uniqueid ); ast_debug(1, "Queue '%s' Join, Channel '%s', Position '%d'\n", q->name, qe->chan->name, qe->pos ); } - ast_mutex_unlock(&q->lock); - AST_LIST_UNLOCK(&queues); + ao2_unlock(q); + ao2_unlock(queues); return res; } @@ -1611,10 +1627,10 @@ static void recalc_holdtime(struct queue_ent *qe, int newholdtime) /* Thanks to SRT for this contribution */ /* 2^2 (4) is the filter coefficient; a higher exponent would give old entries more weight */ - ast_mutex_lock(&qe->parent->lock); + ao2_lock(qe->parent); oldvalue = qe->parent->holdtime; qe->parent->holdtime = (((oldvalue << 2) - oldvalue) + newholdtime) >> 2; - ast_mutex_unlock(&qe->parent->lock); + ao2_unlock(qe->parent); } @@ -1626,7 +1642,8 @@ static void leave_queue(struct queue_ent *qe) if (!(q = qe->parent)) return; - ast_mutex_lock(&q->lock); + queue_ref(q); + ao2_lock(q); prev = NULL; for (cur = q->head; cur; cur = cur->next) { @@ -1649,15 +1666,20 @@ static void leave_queue(struct queue_ent *qe) prev = cur; } } - ast_mutex_unlock(&q->lock); + ao2_unlock(q); + + /*If the queue is a realtime queue, check to see if it's still defined in real time*/ + if(q->realtime) { + if(!ast_load_realtime("queues", "name", q->name, NULL)) + q->dead = 1; + } - if (q->dead && !q->count) { + if (q->dead) { /* It's dead and nobody is in it, so kill it */ - AST_LIST_LOCK(&queues); - AST_LIST_REMOVE(&queues, q, list); - AST_LIST_UNLOCK(&queues); - destroy_queue(q); + ao2_unlink(queues, q); + queue_unref(q); } + queue_unref(q); } /* Hang up a list of outgoing calls */ @@ -1684,7 +1706,7 @@ static int update_status(struct call_queue *q, struct member *member, int status /* Since a reload could have taken place, we have to traverse the list to be sure it's still valid */ - ast_mutex_lock(&q->lock); + ao2_lock(q); mem_iter = ao2_iterator_init(q->members, 0); while ((cur = ao2_iterator_next(&mem_iter))) { if (member != cur) { @@ -1709,7 +1731,7 @@ static int update_status(struct call_queue *q, struct member *member, int status } ao2_ref(cur, -1); } - ast_mutex_unlock(&q->lock); + ao2_unlock(q); return 0; } @@ -1733,13 +1755,17 @@ static int compare_weight(struct call_queue *rq, struct member *member) struct call_queue *q; struct member *mem; int found = 0; + struct ao2_iterator queue_iter; /* &qlock and &rq->lock already set by try_calling() * to solve deadlock */ - AST_LIST_TRAVERSE(&queues, q, list) { - if (q == rq) /* don't check myself, could deadlock */ + queue_iter = ao2_iterator_init(queues, 0); + while ((q = ao2_iterator_next(&queue_iter))) { + if (q == rq) { /* don't check myself, could deadlock */ + queue_unref(q); continue; - ast_mutex_lock(&q->lock); + } + ao2_lock(q); if (q->count && q->members) { if ((mem = ao2_find(q->members, member, OBJ_POINTER))) { ast_debug(1, "Found matching member %s in queue '%s'\n", mem->interface, q->name); @@ -1750,9 +1776,12 @@ static int compare_weight(struct call_queue *rq, struct member *member) ao2_ref(mem, -1); } } - ast_mutex_unlock(&q->lock); - if (found) + ao2_unlock(q); + if (found) { + queue_unref(q); break; + } + queue_unref(q); } return found; } @@ -1857,9 +1886,9 @@ static int ring_entry(struct queue_ent *qe, struct callattempt *tmp, int *busies tmp->stillgoing = 0; update_dial_status(qe->parent, tmp->member, status); - ast_mutex_lock(&qe->parent->lock); + ao2_lock(qe->parent); qe->parent->rrpos++; - ast_mutex_unlock(&qe->parent->lock); + ao2_unlock(qe->parent); (*busies)++; return 0; @@ -2039,7 +2068,7 @@ static int say_periodic_announcement(struct queue_ent *qe, int ringing) static void record_abandoned(struct queue_ent *qe) { - ast_mutex_lock(&qe->parent->lock); + ao2_lock(qe->parent); set_queue_variables(qe); manager_event(EVENT_FLAG_AGENT, "QueueCallerAbandon", "Queue: %s\r\n" @@ -2050,7 +2079,7 @@ static void record_abandoned(struct queue_ent *qe) qe->parent->name, qe->chan->uniqueid, qe->pos, qe->opos, (int)(time(NULL) - qe->start)); qe->parent->callsabandoned++; - ast_mutex_unlock(&qe->parent->lock); + ao2_unlock(qe->parent); } /*! \brief RNA == Ring No Answer. Common code that is executed when we try a queue member and they don't answer. */ @@ -2280,8 +2309,9 @@ static struct callattempt *wait_for_answer(struct queue_ent *qe, struct callatte if (!f || ((f->frametype == AST_FRAME_CONTROL) && (f->subclass == AST_CONTROL_HANGUP))) { /* Got hung up */ *to = -1; - if (f) + if (f) { ast_frfree(f); + } return NULL; } if ((f->frametype == AST_FRAME_DTMF) && caller_disconnect && (f->subclass == '*')) { @@ -2335,7 +2365,7 @@ static int is_our_turn(struct queue_ent *qe) } else { /* This needs a lock. How many members are available to be served? */ - ast_mutex_lock(&qe->parent->lock); + ao2_lock(qe->parent); ch = qe->parent->head; @@ -2372,7 +2402,7 @@ static int is_our_turn(struct queue_ent *qe) res = 0; } - ast_mutex_unlock(&qe->parent->lock); + ao2_unlock(qe->parent); } return res; @@ -2443,13 +2473,13 @@ static int wait_our_turn(struct queue_ent *qe, int ringing, enum queue_result *r static int update_queue(struct call_queue *q, struct member *member, int callcompletedinsl) { - ast_mutex_lock(&q->lock); + ao2_lock(q); time(&member->lastcall); member->calls++; q->callscompleted++; if (callcompletedinsl) q->callscompletedinsl++; - ast_mutex_unlock(&q->lock); + ao2_unlock(q); return 0; } @@ -2618,8 +2648,8 @@ static int try_calling(struct queue_ent *qe, const char *options, char *announce /* Hold the lock while we setup the outgoing calls */ if (use_weight) - AST_LIST_LOCK(&queues); - ast_mutex_lock(&qe->parent->lock); + ao2_lock(queues); + ao2_lock(qe->parent); ast_debug(1, "%s is trying to call a queue member.\n", qe->chan->name); ast_copy_string(queuename, qe->parent->name, sizeof(queuename)); @@ -2634,9 +2664,9 @@ static int try_calling(struct queue_ent *qe, const char *options, char *announce if (!tmp) { ao2_ref(cur, -1); - ast_mutex_unlock(&qe->parent->lock); + ao2_unlock(qe->parent); if (use_weight) - AST_LIST_UNLOCK(&queues); + ao2_unlock(queues); goto out; } tmp->stillgoing = -1; @@ -2666,15 +2696,15 @@ static int try_calling(struct queue_ent *qe, const char *options, char *announce to = (qe->parent->timeout) ? qe->parent->timeout * 1000 : -1; orig = to; ring_one(qe, outgoing, &numbusies); - ast_mutex_unlock(&qe->parent->lock); + ao2_unlock(qe->parent); if (use_weight) - AST_LIST_UNLOCK(&queues); + ao2_unlock(queues); lpeer = wait_for_answer(qe, outgoing, &to, &digit, numbusies, ast_test_flag(&(bridge_config.features_caller), AST_FEATURE_DISCONNECT), forwardsallowed); - ast_mutex_lock(&qe->parent->lock); + ao2_lock(qe->parent); if (qe->parent->strategy == QUEUE_STRATEGY_RRMEMORY) { store_next(qe, outgoing); } - ast_mutex_unlock(&qe->parent->lock); + ao2_unlock(qe->parent); peer = lpeer ? lpeer->chan : NULL; if (!peer) { if (to) { @@ -2698,9 +2728,9 @@ static int try_calling(struct queue_ent *qe, const char *options, char *announce /* Update parameters for the queue */ time(&now); recalc_holdtime(qe, (now - qe->start)); - ast_mutex_lock(&qe->parent->lock); + ao2_lock(qe->parent); callcompletedinsl = ((now - qe->start) <= qe->parent->servicelevel); - ast_mutex_unlock(&qe->parent->lock); + ao2_unlock(qe->parent); member = lpeer->member; hangupcalls(outgoing, peer); outgoing = NULL; @@ -2782,7 +2812,7 @@ static int try_calling(struct queue_ent *qe, const char *options, char *announce ast_log(LOG_WARNING, "Announcement file '%s' is unavailable, continuing anyway...\n", qe->parent->sound_callerannounce); } - ast_mutex_lock(&qe->parent->lock); + ao2_lock(qe->parent); /* if setinterfacevar is defined, make member variables available to the channel */ /* use pbx_builtin_setvar to set a load of variables with one call */ if (qe->parent->setinterfacevar) { @@ -2801,7 +2831,7 @@ static int try_calling(struct queue_ent *qe, const char *options, char *announce /* try to set queue variables if configured to do so*/ set_queue_variables(qe); - ast_mutex_unlock(&qe->parent->lock); + ao2_unlock(qe->parent); /* Begin Monitoring */ if (qe->parent->monfmt && *qe->parent->monfmt) { @@ -3131,20 +3161,14 @@ static void dump_queue_members(struct call_queue *pm_queue) static int remove_from_queue(const char *queuename, const char *interface) { - struct call_queue *q; + struct call_queue *q, tmpq; struct member *mem, tmpmem; int res = RES_NOSUCHQUEUE; ast_copy_string(tmpmem.interface, interface, sizeof(tmpmem.interface)); - - AST_LIST_LOCK(&queues); - AST_LIST_TRAVERSE(&queues, q, list) { - ast_mutex_lock(&q->lock); - if (strcmp(q->name, queuename)) { - ast_mutex_unlock(&q->lock); - continue; - } - + ast_copy_string(tmpq.name, queuename, sizeof(tmpq.name)); + if((q = ao2_find(queues, &tmpq, OBJ_POINTER))) { + ao2_lock(q); if ((mem = ao2_find(q->members, &tmpmem, OBJ_POINTER | OBJ_UNLINK))) { q->membercount--; manager_event(EVENT_FLAG_AGENT, "QueueMemberRemoved", @@ -3161,15 +3185,13 @@ static int remove_from_queue(const char *queuename, const char *interface) } else { res = RES_EXISTS; } - ast_mutex_unlock(&q->lock); - break; + ao2_unlock(q); + queue_unref(q); } if (res == RES_OKAY) remove_from_interfaces(interface); - AST_LIST_UNLOCK(&queues); - return res; } @@ -3185,9 +3207,9 @@ static int add_to_queue(const char *queuename, const char *interface, const char if (!(q = load_realtime_queue(queuename))) return res; - AST_LIST_LOCK(&queues); + ao2_lock(queues); - ast_mutex_lock(&q->lock); + ao2_lock(q); if ((old_member = interface_exists(q, interface)) == NULL) { add_to_interfaces(interface); if ((new_member = create_queue_member(interface, membername, penalty, paused))) { @@ -3220,8 +3242,8 @@ static int add_to_queue(const char *queuename, const char *interface, const char ao2_ref(old_member, -1); res = RES_EXISTS; } - ast_mutex_unlock(&q->lock); - AST_LIST_UNLOCK(&queues); + ao2_unlock(q); + ao2_unlock(queues); return res; } @@ -3231,15 +3253,16 @@ static int set_member_paused(const char *queuename, const char *interface, const int found = 0; struct call_queue *q; struct member *mem; + struct ao2_iterator queue_iter; /* Special event for when all queues are paused - individual events still generated */ /* XXX In all other cases, we use the membername, but since this affects all queues, we cannot */ if (ast_strlen_zero(queuename)) ast_queue_log("NONE", "NONE", interface, (paused ? "PAUSEALL" : "UNPAUSEALL"), "%s", ""); - AST_LIST_LOCK(&queues); - AST_LIST_TRAVERSE(&queues, q, list) { - ast_mutex_lock(&q->lock); + queue_iter = ao2_iterator_init(queues, 0); + while((q = ao2_iterator_next(&queue_iter))) { + ao2_lock(q); if (ast_strlen_zero(queuename) || !strcasecmp(q->name, queuename)) { if ((mem = interface_exists(q, interface))) { found++; @@ -3275,9 +3298,9 @@ static int set_member_paused(const char *queuename, const char *interface, const ao2_ref(mem, -1); } } - ast_mutex_unlock(&q->lock); + ao2_unlock(q); + queue_unref(q); } - AST_LIST_UNLOCK(&queues); return found ? RESULT_SUCCESS : RESULT_FAILURE; } @@ -3296,10 +3319,10 @@ static void reload_queue_members(void) int paused = 0; struct ast_db_entry *db_tree; struct ast_db_entry *entry; - struct call_queue *cur_queue; + struct call_queue *cur_queue, tmpq; char queue_data[PM_MAX_LEN]; - AST_LIST_LOCK(&queues); + ao2_lock(queues); /* Each key in 'pm_family' is the name of a queue */ db_tree = ast_db_gettree(pm_family, NULL); @@ -3307,12 +3330,8 @@ static void reload_queue_members(void) queue_name = entry->key + strlen(pm_family) + 2; - AST_LIST_TRAVERSE(&queues, cur_queue, list) { - ast_mutex_lock(&cur_queue->lock); - if (!strcmp(queue_name, cur_queue->name)) - break; - ast_mutex_unlock(&cur_queue->lock); - } + ast_copy_string(tmpq.name, queue_name, sizeof(tmpq.name)); + cur_queue = ao2_find(queues, &tmpq, OBJ_POINTER); if (!cur_queue) cur_queue = load_realtime_queue(queue_name); @@ -3323,11 +3342,12 @@ static void reload_queue_members(void) ast_log(LOG_WARNING, "Error loading persistent queue: '%s': it does not exist\n", queue_name); ast_db_del(pm_family, queue_name); continue; - } else - ast_mutex_unlock(&cur_queue->lock); + } - if (ast_db_get(pm_family, queue_name, queue_data, PM_MAX_LEN)) + if (ast_db_get(pm_family, queue_name, queue_data, PM_MAX_LEN)) { + queue_unref(cur_queue); continue; + } cur_ptr = queue_data; while ((member = strsep(&cur_ptr, ",|"))) { @@ -3368,9 +3388,10 @@ static void reload_queue_members(void) break; } } + queue_unref(cur_queue); } - AST_LIST_UNLOCK(&queues); + ao2_unlock(queues); if (db_tree) { ast_log(LOG_NOTICE, "Queue members successfully reloaded from database.\n"); ast_db_freetree(db_tree); @@ -3693,8 +3714,9 @@ check_turns: /* This is the wait loop for callers 2 through maxlen */ res = wait_our_turn(&qe, ringing, &reason); - if (res) + if (res) { goto stop; + } makeannouncement = 0; @@ -3731,8 +3753,9 @@ check_turns: /* Try calling all queue members for 'timeout' seconds */ res = try_calling(&qe, args.options, args.announceoverride, args.url, &tries, &noption, args.agi, args.macro, args.gosub, ringing); - if (res) + if (res) { goto stop; + } stat = get_member_status(qe.parent, qe.max_penalty); @@ -3841,7 +3864,7 @@ stop: static int queue_function_var(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t len) { int res = -1; - struct call_queue *q; + struct call_queue *q, tmpq; char interfacevar[256]=""; float sl = 0; @@ -3853,16 +3876,10 @@ static int queue_function_var(struct ast_channel *chan, const char *cmd, char *d return -1; } - AST_LIST_LOCK(&queues); - AST_LIST_TRAVERSE(&queues, q, list) { - if (!strcasecmp(q->name, data)) { - ast_mutex_lock(&q->lock); - break; - } - } - AST_LIST_UNLOCK(&queues); + ast_copy_string(tmpq.name, data, sizeof(tmpq.name)); - if (q) { + if ((q = ao2_find(queues, &tmpq, OBJ_POINTER))) { + ao2_lock(q); if (q->setqueuevar) { sl = 0; res = 0; @@ -3877,7 +3894,8 @@ static int queue_function_var(struct ast_channel *chan, const char *cmd, char *d pbx_builtin_setvar(chan, interfacevar); } - ast_mutex_unlock(&q->lock); + ao2_unlock(q); + queue_unref(q); } else ast_log(LOG_WARNING, "queue %s was not found\n", data); @@ -3889,7 +3907,7 @@ static int queue_function_var(struct ast_channel *chan, const char *cmd, char *d static int queue_function_qac(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t len) { int count = 0; - struct call_queue *q; + struct call_queue *q, tmpq; struct member *m; struct ao2_iterator mem_iter; @@ -3899,17 +3917,11 @@ static int queue_function_qac(struct ast_channel *chan, const char *cmd, char *d ast_log(LOG_ERROR, "%s requires an argument: queuename\n", cmd); return -1; } - - AST_LIST_LOCK(&queues); - AST_LIST_TRAVERSE(&queues, q, list) { - if (!strcasecmp(q->name, data)) { - ast_mutex_lock(&q->lock); - break; - } - } - AST_LIST_UNLOCK(&queues); - if (q) { + ast_copy_string(tmpq.name, data, sizeof(tmpq.name)); + + if ((q = ao2_find(queues, &tmpq, OBJ_POINTER))) { + ao2_lock(q); mem_iter = ao2_iterator_init(q->members, 0); while ((m = ao2_iterator_next(&mem_iter))) { /* Count the agents who are logged in and presently answering calls */ @@ -3918,7 +3930,8 @@ static int queue_function_qac(struct ast_channel *chan, const char *cmd, char *d } ao2_ref(m, -1); } - ast_mutex_unlock(&q->lock); + ao2_unlock(q); + queue_unref(q); } else ast_log(LOG_WARNING, "queue %s was not found\n", data); @@ -3930,7 +3943,7 @@ static int queue_function_qac(struct ast_channel *chan, const char *cmd, char *d static int queue_function_queuewaitingcount(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t len) { int count = 0; - struct call_queue *q; + struct call_queue *q, tmpq; buf[0] = '\0'; @@ -3938,19 +3951,14 @@ static int queue_function_queuewaitingcount(struct ast_channel *chan, const char ast_log(LOG_ERROR, "%s requires an argument: queuename\n", cmd); return -1; } - - AST_LIST_LOCK(&queues); - AST_LIST_TRAVERSE(&queues, q, list) { - if (!strcasecmp(q->name, data)) { - ast_mutex_lock(&q->lock); - break; - } - } - AST_LIST_UNLOCK(&queues); - if (q) { + ast_copy_string(tmpq.name, data, sizeof(tmpq.name)); + + if ((q = ao2_find(queues, &tmpq, OBJ_POINTER))) { + ao2_lock(q); count = q->count; - ast_mutex_unlock(&q->lock); + ao2_unlock(q); + queue_unref(q); } else ast_log(LOG_WARNING, "queue %s was not found\n", data); @@ -3961,7 +3969,7 @@ static int queue_function_queuewaitingcount(struct ast_channel *chan, const char static int queue_function_queuememberlist(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t len) { - struct call_queue *q; + struct call_queue *q, tmpq; struct member *m; /* Ensure an otherwise empty list doesn't return garbage */ @@ -3972,19 +3980,13 @@ static int queue_function_queuememberlist(struct ast_channel *chan, const char * return -1; } - AST_LIST_LOCK(&queues); - AST_LIST_TRAVERSE(&queues, q, list) { - if (!strcasecmp(q->name, data)) { - ast_mutex_lock(&q->lock); - break; - } - } - AST_LIST_UNLOCK(&queues); + ast_copy_string(tmpq.name, data, sizeof(tmpq.name)); - if (q) { + if ((q = ao2_find(queues, &tmpq, OBJ_POINTER))) { int buflen = 0, count = 0; struct ao2_iterator mem_iter = ao2_iterator_init(q->members, 0); + ao2_lock(q); while ((m = ao2_iterator_next(&mem_iter))) { /* strcat() is always faster than printf() */ if (count++) { @@ -4001,7 +4003,8 @@ static int queue_function_queuememberlist(struct ast_channel *chan, const char * } ao2_ref(m, -1); } - ast_mutex_unlock(&q->lock); + ao2_unlock(q); + queue_unref(q); } else ast_log(LOG_WARNING, "queue %s was not found\n", data); @@ -4058,7 +4061,7 @@ static struct ast_custom_function queuememberlist_function = { static int reload_queues(int reload) { - struct call_queue *q; + struct call_queue *q, tmpq; struct ast_config *cfg; char *cat, *tmp; struct ast_variable *var; @@ -4071,6 +4074,7 @@ static int reload_queues(int reload) char *membername = NULL; int penalty; struct ast_flags config_flags = { reload ? CONFIG_FLAG_FILEUNCHANGED : 0 }; + struct ao2_iterator queue_iter; AST_DECLARE_APP_ARGS(args, AST_APP_ARG(interface); AST_APP_ARG(penalty); @@ -4082,14 +4086,16 @@ static int reload_queues(int reload) return 0; } else if (cfg == CONFIG_STATUS_FILEUNCHANGED) return 0; - AST_LIST_LOCK(&queues); + ao2_lock(queues); use_weight=0; /* Mark all queues as dead for the moment */ - AST_LIST_TRAVERSE(&queues, q, list) { + queue_iter = ao2_iterator_init(queues, F_AO2I_DONTLOCK); + while ((q = ao2_iterator_next(&queue_iter))) { if(!q->realtime) { q->dead = 1; q->found = 0; } + queue_unref(q); } /* Chug through config file */ @@ -4116,11 +4122,8 @@ static int reload_queues(int reload) update_cdr = ast_true(general_val); } else { /* Define queue */ /* Look for an existing one */ - AST_LIST_TRAVERSE(&queues, q, list) { - if (!strcmp(q->name, cat)) - break; - } - if (!q) { + ast_copy_string(tmpq.name, cat, sizeof(tmpq.name)); + if(!(q = ao2_find(queues, &tmpq, OBJ_POINTER))) { /* Make one then */ if (!(q = alloc_queue(cat))) { /* TODO: Handle memory allocation failure */ @@ -4130,12 +4133,12 @@ static int reload_queues(int reload) new = 0; if (q) { if (!new) - ast_mutex_lock(&q->lock); + ao2_lock(q); /* Check if a queue with this name already exists */ if (q->found) { ast_log(LOG_WARNING, "Queue '%s' already defined! Skipping!\n", cat); if(!new) - ast_mutex_unlock(&q->lock); + ao2_unlock(q); continue; } /* Re-initialize the queue, and clear statistics */ @@ -4208,22 +4211,21 @@ static int reload_queues(int reload) } if (new) { - AST_LIST_INSERT_HEAD(&queues, q, list); + ao2_link(queues, q); + queue_ref(q); } else - ast_mutex_unlock(&q->lock); + ao2_unlock(q); + queue_unref(q); } } } ast_config_destroy(cfg); - AST_LIST_TRAVERSE_SAFE_BEGIN(&queues, q, list) { + queue_iter = ao2_iterator_init(queues, 0); + while ((q = ao2_iterator_next(&queue_iter))) { if (q->dead) { - AST_LIST_REMOVE_CURRENT(&queues, list); - if (!q->count) - destroy_queue(q); - else - ast_debug(1, "XXX Leaking a little memory :( XXX\n"); + ao2_unlink(queues, q); } else { - ast_mutex_lock(&q->lock); + ao2_lock(q); mem_iter = ao2_iterator_init(q->members, 0); while ((cur = ao2_iterator_next(&mem_iter))) { if (cur->dynamic) @@ -4231,11 +4233,11 @@ static int reload_queues(int reload) cur->status = ast_device_state(cur->interface); ao2_ref(cur, -1); } - ast_mutex_unlock(&q->lock); + ao2_unlock(q); } + queue_unref(q); } - AST_LIST_TRAVERSE_SAFE_END; - AST_LIST_UNLOCK(&queues); + ao2_unlock(queues); return 1; } @@ -4254,6 +4256,7 @@ static char *__queues_show(struct mansession *s, int fd, int argc, char **argv) struct ast_str *out = ast_str_alloca(240); int found = 0; time_t now = time(NULL); + struct ao2_iterator queue_iter; struct ao2_iterator mem_iter; if (argc != 2 && argc != 3) @@ -4263,13 +4266,13 @@ static char *__queues_show(struct mansession *s, int fd, int argc, char **argv) if (argc == 3) /* specific queue */ load_realtime_queue(argv[2]); - AST_LIST_LOCK(&queues); - AST_LIST_TRAVERSE(&queues, q, list) { + queue_iter = ao2_iterator_init(queues, 0); + while ((q = ao2_iterator_next(&queue_iter))) { float sl; - ast_mutex_lock(&q->lock); + ao2_lock(q); if (argc == 3 && strcasecmp(q->name, argv[2])) { - ast_mutex_unlock(&q->lock); + ao2_unlock(q); continue; } found = 1; @@ -4326,11 +4329,13 @@ static char *__queues_show(struct mansession *s, int fd, int argc, char **argv) } } do_print(s, fd, ""); /* blank line between entries */ - ast_mutex_unlock(&q->lock); - if (argc == 3) /* print a specific entry */ + ao2_unlock(q); + if (argc == 3) { /* print a specific entry */ + queue_unref(q); break; + } + queue_unref(q); } - AST_LIST_UNLOCK(&queues); if (!found) { if (argc == 3) ast_str_set(&out, 0, "No such queue: %s.", argv[2]); @@ -4347,15 +4352,17 @@ static char *complete_queue(const char *line, const char *word, int pos, int sta char *ret = NULL; int which = 0; int wordlen = strlen(word); + struct ao2_iterator queue_iter; - AST_LIST_LOCK(&queues); - AST_LIST_TRAVERSE(&queues, q, list) { + queue_iter = ao2_iterator_init(queues, 0); + while((q = ao2_iterator_next(&queue_iter))) { if (!strncasecmp(word, q->name, wordlen) && ++which > state) { ret = ast_strdup(q->name); + queue_unref(q); break; } + queue_unref(q); } - AST_LIST_UNLOCK(&queues); return ret; } @@ -4409,15 +4416,16 @@ static int manager_queues_summary(struct mansession *s, const struct message *m) struct call_queue *q; struct queue_ent *qe; struct member *mem; + struct ao2_iterator queue_iter; struct ao2_iterator mem_iter; astman_send_ack(s, m, "Queue summary will follow"); time(&now); - AST_LIST_LOCK(&queues); if (!ast_strlen_zero(id)) snprintf(idText, 256, "ActionID: %s\r\n", id); - AST_LIST_TRAVERSE(&queues, q, list) { - ast_mutex_lock(&q->lock); + queue_iter = ao2_iterator_init(queues, 0); + while((q = ao2_iterator_next(&queue_iter))) { + ao2_lock(q); /* List queue properties */ if (ast_strlen_zero(queuefilter) || !strcmp(q->name, queuefilter)) { @@ -4445,15 +4453,14 @@ static int manager_queues_summary(struct mansession *s, const struct message *m) "\r\n", q->name, qmemcount, qmemavail, qchancount, q->holdtime, idText); } - ast_mutex_unlock(&q->lock); + ao2_unlock(q); + queue_unref(q); } astman_append(s, "Event: QueueSummaryComplete\r\n" "%s" "\r\n", idText); - AST_LIST_UNLOCK(&queues); - return RESULT_SUCCESS; } @@ -4470,16 +4477,17 @@ static int manager_queues_status(struct mansession *s, const struct message *m) struct queue_ent *qe; float sl = 0; struct member *mem; + struct ao2_iterator queue_iter; struct ao2_iterator mem_iter; astman_send_ack(s, m, "Queue status will follow"); time(&now); - AST_LIST_LOCK(&queues); if (!ast_strlen_zero(id)) snprintf(idText, sizeof(idText), "ActionID: %s\r\n", id); - AST_LIST_TRAVERSE(&queues, q, list) { - ast_mutex_lock(&q->lock); + queue_iter = ao2_iterator_init(queues, 0); + while ((q = ao2_iterator_next(&queue_iter))) { + ao2_lock(q); /* List queue properties */ if (ast_strlen_zero(queuefilter) || !strcmp(q->name, queuefilter)) { @@ -4538,7 +4546,8 @@ static int manager_queues_status(struct mansession *s, const struct message *m) (long) (now - qe->start), idText); } } - ast_mutex_unlock(&q->lock); + ao2_unlock(q); + queue_unref(q); } astman_append(s, @@ -4546,9 +4555,6 @@ static int manager_queues_status(struct mansession *s, const struct message *m) "%s" "\r\n",idText); - AST_LIST_UNLOCK(&queues); - - return RESULT_SUCCESS; } @@ -4778,6 +4784,7 @@ static char *complete_queue_remove_member(const char *line, const char *word, in int which = 0; struct call_queue *q; struct member *m; + struct ao2_iterator queue_iter; struct ao2_iterator mem_iter; int wordlen = strlen(word); @@ -4791,22 +4798,23 @@ static char *complete_queue_remove_member(const char *line, const char *word, in return complete_queue(line, word, pos, state); /* here is the case for 3, <member> */ - if (!AST_LIST_EMPTY(&queues)) { /* XXX unnecessary ? the traverse does that for us */ - AST_LIST_TRAVERSE(&queues, q, list) { - ast_mutex_lock(&q->lock); - mem_iter = ao2_iterator_init(q->members, 0); - while ((m = ao2_iterator_next(&mem_iter))) { - if (!strncasecmp(word, m->membername, wordlen) && ++which > state) { - char *tmp; - ast_mutex_unlock(&q->lock); - tmp = m->membername; - ao2_ref(m, -1); - return tmp; - } + queue_iter = ao2_iterator_init(queues, 0); + while ((q = ao2_iterator_next(&queue_iter))) { + ao2_lock(q); + mem_iter = ao2_iterator_init(q->members, 0); + while ((m = ao2_iterator_next(&mem_iter))) { + if (!strncasecmp(word, m->membername, wordlen) && ++which > state) { + char *tmp; + ao2_unlock(q); + tmp = m->membername; ao2_ref(m, -1); + queue_unref(q); + return tmp; } - ast_mutex_unlock(&q->lock); + ao2_ref(m, -1); } + ao2_unlock(q); + queue_unref(q); } return NULL; @@ -4978,6 +4986,8 @@ static int unload_module(void) clear_and_free_interfaces(); + ao2_ref(queues, -1); + return res; } @@ -4986,6 +4996,8 @@ static int load_module(void) int res; struct ast_context *con; + queues = ao2_container_alloc(MAX_QUEUE_BUCKETS, queue_hash_cb, queue_cmp_cb); + if (!reload_queues(0)) return AST_MODULE_LOAD_DECLINE;