diff --git a/main/stasis.c b/main/stasis.c index 2ad0caf93b4699ce714a9ae6d1028042a9bbde45..d7769790cf908c2e1e77f96c662a0d1ab392b35c 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -131,7 +131,7 @@ static void subscription_dtor(void *obj) static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description); -static struct stasis_subscription *__stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox) +struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data) { RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup); RAII_VAR(struct ast_uuid *, id, NULL, ast_free); @@ -148,11 +148,10 @@ static struct stasis_subscription *__stasis_subscribe(struct stasis_topic *topic return NULL; } ast_uuid_to_str(id, uniqueid, sizeof(uniqueid)); - if (needs_mailbox) { - sub->mailbox = ast_threadpool_serializer(uniqueid, pool); - if (!sub->mailbox) { - return NULL; - } + + sub->mailbox = ast_threadpool_serializer(uniqueid, pool); + if (!sub->mailbox) { + return NULL; } sub->uniqueid = ast_strdup(uniqueid); @@ -170,11 +169,6 @@ static struct stasis_subscription *__stasis_subscribe(struct stasis_topic *topic return sub; } -struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data) -{ - return __stasis_subscribe(topic, callback, data, 1); -} - struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub) { if (sub) { @@ -338,58 +332,29 @@ static int dispatch_exec(void *data) void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message) { - struct stasis_subscription **subscribers = NULL; - size_t num_subscribers, i; + size_t i; + SCOPED_AO2LOCK(lock, topic); ast_assert(topic != NULL); ast_assert(publisher_topic != NULL); ast_assert(message != NULL); - /* Copy the subscribers, so we don't have to hold the mutex for long */ - { - SCOPED_AO2LOCK(lock, topic); - num_subscribers = topic->num_subscribers_current; - subscribers = ast_malloc(num_subscribers * sizeof(*subscribers)); - if (subscribers) { - for (i = 0; i < num_subscribers; ++i) { - ao2_ref(topic->subscribers[i], +1); - subscribers[i] = topic->subscribers[i]; - } - } - } - - if (!subscribers) { - ast_log(LOG_ERROR, "Dropping message\n"); - return; - } - - for (i = 0; i < num_subscribers; ++i) { - struct stasis_subscription *sub = subscribers[i]; + for (i = 0; i < topic->num_subscribers_current; ++i) { + struct stasis_subscription *sub = topic->subscribers[i]; + RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup); ast_assert(sub != NULL); - if (sub->mailbox) { - RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup); - - dispatch = dispatch_create(publisher_topic, message, sub); - if (!dispatch) { - ast_log(LOG_DEBUG, "Dropping dispatch\n"); - break; - } - - if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) { - dispatch = NULL; /* Ownership transferred to mailbox */ - } - } else { - /* No mailbox; dispatch directly */ - sub->callback(sub->data, sub, sub->topic, message); + dispatch = dispatch_create(publisher_topic, message, sub); + if (!dispatch) { + ast_log(LOG_DEBUG, "Dropping dispatch\n"); + break; } - } - for (i = 0; i < num_subscribers; ++i) { - ao2_cleanup(subscribers[i]); + if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) { + dispatch = NULL; /* Ownership transferred to mailbox */ + } } - ast_free(subscribers); } void stasis_publish(struct stasis_topic *topic, struct stasis_message *message) @@ -414,8 +379,8 @@ struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, if (!from_topic || !to_topic) { return NULL; } - /* Subscribe without a mailbox, since we're just forwarding messages */ - sub = __stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0); + + sub = stasis_subscribe(from_topic, stasis_forward_cb, to_topic); if (sub) { /* hold a ref to to_topic for this forwarding subscription */ ao2_ref(to_topic, +1);