diff --git a/main/stasis.c b/main/stasis.c index 0a5db2f16f4073579e552611df0ed59c780a4472..4d05f18e85c9a7d1f32ec9f7d8d46052b79e8980 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -178,28 +178,22 @@ static void topic_dtor(void *obj) struct stasis_topic *stasis_topic_create(const char *name) { - RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup); + struct stasis_topic *topic; int res = 0; topic = ao2_alloc(sizeof(*topic), topic_dtor); - if (!topic) { return NULL; } topic->name = ast_strdup(name); - if (!topic->name) { - return NULL; - } - res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX); res |= AST_VECTOR_INIT(&topic->upstream_topics, 0); - - if (res != 0) { + if (!topic->name || res) { + ao2_cleanup(topic); return NULL; } - ao2_ref(topic, +1); return topic; } @@ -221,8 +215,6 @@ struct stasis_subscription { /*! Data pointer to be handed to the callback. */ void *data; - /*! Lock for completion flags \c final_message_{rxed,processed}. */ - ast_mutex_t join_lock; /*! Condition for joining with subscription. */ ast_cond_t join_cond; /*! Flag set when final message for sub has been received. @@ -248,7 +240,6 @@ static void subscription_dtor(void *obj) sub->topic = NULL; ast_taskprocessor_unreference(sub->mailbox); sub->mailbox = NULL; - ast_mutex_destroy(&sub->join_lock); ast_cond_destroy(&sub->join_cond); } @@ -263,7 +254,8 @@ static void subscription_invoke(struct stasis_subscription *sub, { /* Notify that the final message has been received */ if (stasis_subscription_final_message(sub, message)) { - SCOPED_MUTEX(lock, &sub->join_lock); + SCOPED_AO2LOCK(lock, sub); + sub->final_message_rxed = 1; ast_cond_signal(&sub->join_cond); } @@ -273,7 +265,8 @@ static void subscription_invoke(struct stasis_subscription *sub, /* Notify that the final message has been processed */ if (stasis_subscription_final_message(sub, message)) { - SCOPED_MUTEX(lock, &sub->join_lock); + SCOPED_AO2LOCK(lock, sub); + sub->final_message_processed = 1; ast_cond_signal(&sub->join_cond); } @@ -294,6 +287,7 @@ struct stasis_subscription *internal_stasis_subscribe( return NULL; } + /* The ao2 lock is used for join_cond. */ sub = ao2_alloc(sizeof(*sub), subscription_dtor); if (!sub) { return NULL; @@ -323,7 +317,6 @@ struct stasis_subscription *internal_stasis_subscribe( sub->topic = topic; sub->callback = callback; sub->data = data; - ast_mutex_init(&sub->join_lock); ast_cond_init(&sub->join_cond, NULL); if (topic_add_subscription(topic, sub) != 0) { @@ -385,11 +378,12 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub) void stasis_subscription_join(struct stasis_subscription *subscription) { if (subscription) { - SCOPED_MUTEX(lock, &subscription->join_lock); + SCOPED_AO2LOCK(lock, subscription); + /* Wait until the processed flag has been set */ while (!subscription->final_message_processed) { ast_cond_wait(&subscription->join_cond, - &subscription->join_lock); + ao2_object_get_lockaddr(subscription)); } } } @@ -397,7 +391,8 @@ void stasis_subscription_join(struct stasis_subscription *subscription) int stasis_subscription_is_done(struct stasis_subscription *subscription) { if (subscription) { - SCOPED_MUTEX(lock, &subscription->join_lock); + SCOPED_AO2LOCK(lock, subscription); + return subscription->final_message_rxed; } @@ -446,6 +441,7 @@ const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub) int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg) { struct stasis_subscription_change *change; + if (stasis_message_type(msg) != stasis_subscription_change_type()) { return 0; } @@ -575,9 +571,7 @@ static void dispatch_message(struct stasis_subscription *sub, */ ao2_bump(message); if (!synchronous) { - if (ast_taskprocessor_push_local(sub->mailbox, - dispatch_exec_async, - message) != 0) { + if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_async, message)) { /* Push failed; ugh. */ ast_log(LOG_ERROR, "Dropping async dispatch\n"); ao2_cleanup(message); @@ -590,12 +584,12 @@ static void dispatch_message(struct stasis_subscription *sub, std.complete = 0; std.task_data = message; - if (ast_taskprocessor_push_local(sub->mailbox, - dispatch_exec_sync, - &std)) { + if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_sync, &std)) { /* Push failed; ugh. */ ast_log(LOG_ERROR, "Dropping sync dispatch\n"); ao2_cleanup(message); + ast_mutex_destroy(&std.lock); + ast_cond_destroy(&std.cond); return; } @@ -718,7 +712,7 @@ struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic, return NULL; } - forward = ao2_alloc(sizeof(*forward), forward_dtor); + forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK); if (!forward) { return NULL; } @@ -746,16 +740,18 @@ struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic, static void subscription_change_dtor(void *obj) { struct stasis_subscription_change *change = obj; + ast_string_field_free_memory(change); ao2_cleanup(change->topic); } static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description) { - RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup); + struct stasis_subscription_change *change; change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor); - if (ast_string_field_init(change, 128)) { + if (!change || ast_string_field_init(change, 128)) { + ao2_cleanup(change); return NULL; } @@ -764,51 +760,50 @@ static struct stasis_subscription_change *subscription_change_alloc(struct stasi ao2_ref(topic, +1); change->topic = topic; - ao2_ref(change, +1); return change; } static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub) { - RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup); - RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + struct stasis_subscription_change *change; + struct stasis_message *msg; /* This assumes that we have already unsubscribed */ ast_assert(stasis_subscription_is_subscribed(sub)); change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe"); - if (!change) { return; } msg = stasis_message_create(stasis_subscription_change_type(), change); - if (!msg) { + ao2_cleanup(change); return; } stasis_publish(topic, msg); + ao2_cleanup(msg); + ao2_cleanup(change); } static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub) { - RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup); - RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup); + struct stasis_subscription_change *change; + struct stasis_message *msg; /* This assumes that we have already unsubscribed */ ast_assert(!stasis_subscription_is_subscribed(sub)); change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe"); - if (!change) { return; } msg = stasis_message_create(stasis_subscription_change_type(), change); - if (!msg) { + ao2_cleanup(change); return; } @@ -816,6 +811,9 @@ static void send_subscription_unsubscribe(struct stasis_topic *topic, /* Now we have to dispatch to the subscription itself */ dispatch_message(sub, msg, 0); + + ao2_cleanup(msg); + ao2_cleanup(change); } struct topic_pool_entry { @@ -826,6 +824,7 @@ struct topic_pool_entry { static void topic_pool_entry_dtor(void *obj) { struct topic_pool_entry *entry = obj; + entry->forward = stasis_forward_cancel(entry->forward); ao2_cleanup(entry->topic); entry->topic = NULL; @@ -833,7 +832,8 @@ static void topic_pool_entry_dtor(void *obj) static struct topic_pool_entry *topic_pool_entry_alloc(void) { - return ao2_alloc(sizeof(struct topic_pool_entry), topic_pool_entry_dtor); + return ao2_alloc_options(sizeof(struct topic_pool_entry), topic_pool_entry_dtor, + AO2_ALLOC_OPT_LOCK_NOLOCK); } struct stasis_topic_pool { @@ -844,6 +844,7 @@ struct stasis_topic_pool { static void topic_pool_dtor(void *obj) { struct stasis_topic_pool *pool = obj; + ao2_cleanup(pool->pool_container); pool->pool_container = NULL; ao2_cleanup(pool->pool_topic); @@ -852,28 +853,80 @@ static void topic_pool_dtor(void *obj) static int topic_pool_entry_hash(const void *obj, const int flags) { - const char *topic_name = (flags & OBJ_KEY) ? obj : stasis_topic_name(((struct topic_pool_entry*) obj)->topic); - return ast_str_case_hash(topic_name); + const struct topic_pool_entry *object; + const char *key; + + switch (flags & OBJ_SEARCH_MASK) { + case OBJ_SEARCH_KEY: + key = obj; + break; + case OBJ_SEARCH_OBJECT: + object = obj; + key = stasis_topic_name(object->topic); + break; + default: + /* Hash can only work on something with a full key. */ + ast_assert(0); + return 0; + } + return ast_str_case_hash(key); } static int topic_pool_entry_cmp(void *obj, void *arg, int flags) { - struct topic_pool_entry *opt1 = obj, *opt2 = arg; - const char *topic_name = (flags & OBJ_KEY) ? arg : stasis_topic_name(opt2->topic); - return strcasecmp(stasis_topic_name(opt1->topic), topic_name) ? 0 : CMP_MATCH | CMP_STOP; + const struct topic_pool_entry *object_left = obj; + const struct topic_pool_entry *object_right = arg; + const char *right_key = arg; + int cmp; + + switch (flags & OBJ_SEARCH_MASK) { + case OBJ_SEARCH_OBJECT: + right_key = stasis_topic_name(object_right->topic); + /* Fall through */ + case OBJ_SEARCH_KEY: + cmp = strcasecmp(stasis_topic_name(object_left->topic), right_key); + break; + case OBJ_SEARCH_PARTIAL_KEY: + /* Not supported by container */ + ast_assert(0); + cmp = -1; + break; + default: + /* + * What arg points to is specific to this traversal callback + * and has no special meaning to astobj2. + */ + cmp = 0; + break; + } + if (cmp) { + return 0; + } + /* + * At this point the traversal callback is identical to a sorted + * container. + */ + return CMP_MATCH; } struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic) { - RAII_VAR(struct stasis_topic_pool *, pool, ao2_alloc(sizeof(*pool), topic_pool_dtor), ao2_cleanup); + struct stasis_topic_pool *pool; + + pool = ao2_alloc_options(sizeof(*pool), topic_pool_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK); if (!pool) { return NULL; } - pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS, topic_pool_entry_hash, topic_pool_entry_cmp); + + pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS, + topic_pool_entry_hash, topic_pool_entry_cmp); + if (!pool->pool_container) { + ao2_cleanup(pool); + return NULL; + } ao2_ref(pooled_topic, +1); pool->pool_topic = pooled_topic; - ao2_ref(pool, +1); return pool; } @@ -881,14 +934,13 @@ struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, { RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup); SCOPED_AO2LOCK(topic_container_lock, pool->pool_container); - topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_KEY | OBJ_NOLOCK); + topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK); if (topic_pool_entry) { return topic_pool_entry->topic; } topic_pool_entry = topic_pool_entry_alloc(); - if (!topic_pool_entry) { return NULL; } @@ -903,7 +955,9 @@ struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, return NULL; } - ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK); + if (!ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK)) { + return NULL; + } return topic_pool_entry->topic; }