diff --git a/res/stasis/messaging.c b/res/stasis/messaging.c index 78d6494a54754492718e79272ae26e3b625ce7f1..2caa8ed228db6438fbccdf5eaffa6a2ccedd60a0 100644 --- a/res/stasis/messaging.c +++ b/res/stasis/messaging.c @@ -289,18 +289,42 @@ static struct ast_json *msg_to_json(struct ast_msg *msg) return json_obj; } +static void dispatch_message(struct message_subscription *sub, const char *endpoint_name, struct ast_json *json_msg) +{ + int i; + + ast_debug(3, "Dispatching message to subscription %s for endpoint %s\n", + sub->token, + endpoint_name); + for (i = 0; i < AST_VECTOR_SIZE(&sub->applications); i++) { + struct application_tuple *tuple = AST_VECTOR_GET(&sub->applications, i); + + tuple->callback(endpoint_name, json_msg, tuple->pvt); + } +} + static int handle_msg_cb(struct ast_msg *msg) { + /* We have at most 3 subscriptions: TECH_WILDCARD, tech itself, and endpoint. */ + struct message_subscription *matching_subscriptions[3]; struct message_subscription *sub; - int i; + int i, j; + int result; char buf[256]; const char *endpoint_name; struct ast_json *json_msg; msg_to_endpoint(msg, buf, sizeof(buf)); + endpoint_name = buf; + json_msg = msg_to_json(msg); + if (!json_msg) { + return -1; + } + result = -1; + /* Find subscriptions to TECH_WILDCARD and to the endpoint's technology. */ ast_rwlock_rdlock(&tech_subscriptions_lock); - for (i = 0; i < AST_VECTOR_SIZE(&tech_subscriptions); i++) { + for (i = 0, j = 0; i < AST_VECTOR_SIZE(&tech_subscriptions) && j < 2; i++) { sub = AST_VECTOR_GET(&tech_subscriptions, i); if (!sub) { @@ -309,40 +333,30 @@ static int handle_msg_cb(struct ast_msg *msg) if (!strcmp(sub->token, TECH_WILDCARD) || !strncasecmp(sub->token, buf, strlen(sub->token))) { - ast_rwlock_unlock(&tech_subscriptions_lock); - ao2_bump(sub); - endpoint_name = buf; - goto match; + ao2_ref(sub, +1); + matching_subscriptions[j++] = sub; } } ast_rwlock_unlock(&tech_subscriptions_lock); + /* Find the subscription to this particular endpoint. */ sub = ao2_find(endpoint_subscriptions, buf, OBJ_SEARCH_KEY); if (sub) { - endpoint_name = buf; - goto match; + matching_subscriptions[j++] = sub; } - return -1; + /* Dispatch the message to all matching subscriptions. */ + for (i = 0; i < j; i++) { + sub = matching_subscriptions[i]; -match: - ast_debug(3, "Dispatching message for %s\n", endpoint_name); + dispatch_message(sub, endpoint_name, json_msg); - json_msg = msg_to_json(msg); - if (!json_msg) { ao2_ref(sub, -1); - return -1; - } - - for (i = 0; i < AST_VECTOR_SIZE(&sub->applications); i++) { - struct application_tuple *tuple = AST_VECTOR_GET(&sub->applications, i); - - tuple->callback(endpoint_name, json_msg, tuple->pvt); + result = 0; } ast_json_unref(json_msg); - ao2_ref(sub, -1); - return 0; + return result; } struct ast_msg_handler ari_msg_handler = {