diff --git a/main/stasis_state.c b/main/stasis_state.c index aa00f9a4d4a5d94c86e106f85ac24f0325127223..b85462fe15c9c6b217d45ac7d831385b929e1bbd 100644 --- a/main/stasis_state.c +++ b/main/stasis_state.c @@ -24,6 +24,18 @@ #include "asterisk/stasis_state.h" +/*! + * \internal + * \brief Used to link a stasis_state to it's manager + */ +struct stasis_state_proxy { + AO2_WEAKPROXY(); + /*! The manager that owns and handles this state */ + struct stasis_state_manager *manager; + /*! A unique id for this state object. */ + char id[0]; +}; + /*! * \internal * \brief Associates a stasis topic to its last known published message @@ -38,7 +50,10 @@ struct stasis_state { /*! The number of state subscribers */ unsigned int num_subscribers; - /*! The manager that owns and handles this state */ + /*! + * \brief The manager that owns and handles this state + * \note This reference is owned by stasis_state_proxy + */ struct stasis_state_manager *manager; /*! Forwarding information, i.e. this topic to manager's topic */ struct stasis_forward *forward; @@ -52,11 +67,11 @@ struct stasis_state { */ AST_VECTOR(, struct ast_eid) eids; /*! A unique id for this state object. */ - char id[0]; + char *id; }; -AO2_STRING_FIELD_HASH_FN(stasis_state, id); -AO2_STRING_FIELD_CMP_FN(stasis_state, id); +AO2_STRING_FIELD_HASH_FN(stasis_state_proxy, id); +AO2_STRING_FIELD_CMP_FN(stasis_state_proxy, id); /*! The number of buckets to use for managed states */ #define STATE_BUCKETS 57 @@ -112,17 +127,28 @@ static void state_dtor(void *obj) state->topic = NULL; ao2_cleanup(state->msg); state->msg = NULL; - ao2_cleanup(state->manager); - state->manager = NULL; /* All eids should have been removed */ ast_assert(AST_VECTOR_SIZE(&state->eids) == 0); AST_VECTOR_FREE(&state->eids); } +static void state_proxy_dtor(void *obj) { + struct stasis_state_proxy *proxy = obj; + + ao2_cleanup(proxy->manager); +} + +static void state_proxy_sub_cb(void *obj, void *data) +{ + struct stasis_state_proxy *proxy = obj; + + ao2_unlink(proxy->manager->states, proxy); +} + /*! * \internal - * \brief Allocate a stasis state object. + * \brief Allocate a stasis state object and add it to the manager. * * Create and initialize a state structure. It's required that either a state * topic, or an id is specified. If a state topic is not given then one will be @@ -134,37 +160,48 @@ static void state_dtor(void *obj) * * \return A stasis_state object or NULL * \return NULL on error + * + * \pre manager->states must be locked. + * \pre manager->states does not contain an object matching key \a id. */ static struct stasis_state *state_alloc(struct stasis_state_manager *manager, - struct stasis_topic *state_topic, const char *id) + struct stasis_topic *state_topic, const char *id, + const char *file, int line, const char *func) { - struct stasis_state *state; + struct stasis_state_proxy *proxy = NULL; + struct stasis_state *state = NULL; + + if (!id) { + /* If not given an id, then a state topic is required */ + ast_assert(state_topic != NULL); + + /* Get the id we'll key off of from the state topic */ + id = state_id_by_topic(manager->all_topic, state_topic); + } + + state = __ao2_alloc(sizeof(*state), state_dtor, AO2_ALLOC_OPT_LOCK_MUTEX, id, file, line, func); + if (!state) { + goto error_return; + } if (!state_topic) { char *name; - /* If not given a state topic, then an id is required */ - ast_assert(id != NULL); - /* * To provide further detail and to ensure that the topic is unique within the * scope of the system we prefix it with the manager's topic name, which should * itself already be unique. */ if (ast_asprintf(&name, "%s/%s", stasis_topic_name(manager->all_topic), id) < 0) { - ast_log(LOG_ERROR, "Unable to create state topic name '%s/%s'\n", - stasis_topic_name(manager->all_topic), id); - return NULL; + goto error_return; } - state_topic = stasis_topic_create(name); + state->topic = stasis_topic_create(name); - if (!state_topic) { - ast_log(LOG_ERROR, "Unable to create state topic '%s'\n", name); - ast_free(name); - return NULL; - } ast_free(name); + if (!state->topic) { + goto error_return; + } } else { /* * Since the state topic was passed in, go ahead and bump its reference. @@ -172,87 +209,57 @@ static struct stasis_state *state_alloc(struct stasis_state_manager *manager, * state allocation error. */ ao2_ref(state_topic, +1); + state->topic = state_topic; } - if (!id) { - /* If not given an id, then a state topic is required */ - ast_assert(state_topic != NULL); - - /* Get the id we'll key off of from the state topic */ - id = state_id_by_topic(manager->all_topic, state_topic); + proxy = ao2_t_weakproxy_alloc(sizeof(*proxy) + strlen(id) + 1, state_proxy_dtor, id); + if (!proxy) { + goto error_return; } - state = ao2_alloc(sizeof(*state) + strlen(id) + 1, state_dtor); - if (!state) { - ast_log(LOG_ERROR, "Unable to allocate state '%s' in manager '%s'\n", - id, stasis_topic_name(manager->all_topic)); - ao2_ref(state_topic, -1); - return NULL; - } + strcpy(proxy->id, id); /* Safe */ - strcpy(state->id, id); /* Safe */ - state->topic = state_topic; /* ref already bumped above */ + state->id = proxy->id; + proxy->manager = ao2_bump(manager); + state->manager = proxy->manager; /* state->manager is owned by the proxy */ state->forward = stasis_forward_all(state->topic, manager->all_topic); if (!state->forward) { - ast_log(LOG_ERROR, "Unable to add state '%s' forward in manager '%s'\n", - id, stasis_topic_name(manager->all_topic)); - ao2_ref(state, -1); - return NULL; + goto error_return; } if (AST_VECTOR_INIT(&state->eids, 2)) { - ast_log(LOG_ERROR, "Unable to initialize eids for state '%s' in manager '%s'\n", - id, stasis_topic_name(manager->all_topic)); - ao2_ref(state, -1); - return NULL; + goto error_return; } - state->manager = ao2_bump(manager); - - return state; -} - -/*! - * \internal - * \brief Create a state object, and add it to the manager. - * - * \note Locking on the states container is specifically not done here, thus - * appropriate locks should be applied prior to this function being called. - * - * \param manager The manager to be added to - * \param state_topic A state topic to be managed (if NULL id is required) - * \param id The unique id for the state (if NULL state_topic is required) - * - * \return The added state object - * \return NULL on error - */ -static struct stasis_state *state_add(struct stasis_state_manager *manager, - struct stasis_topic *state_topic, const char *id) -{ - struct stasis_state *state = state_alloc(manager, state_topic, id); + if (ao2_t_weakproxy_set_object(proxy, state, OBJ_NOLOCK, "weakproxy link")) { + goto error_return; + } - if (!state) { - return NULL; + if (ao2_weakproxy_subscribe(proxy, state_proxy_sub_cb, NULL, OBJ_NOLOCK)) { + goto error_return; } - if (!ao2_link_flags(manager->states, state, OBJ_NOLOCK)) { - ast_log(LOG_ERROR, "Unable to add state '%s' to manager '%s'\n", - state->id ? state->id : "", stasis_topic_name(manager->all_topic)); - ao2_ref(state, -1); - return NULL; + if (!ao2_link_flags(manager->states, proxy, OBJ_NOLOCK)) { + goto error_return; } + ao2_ref(proxy, -1); + return state; + +error_return: + ast_log(LOG_ERROR, "Unable to allocate state '%s' in manager '%s'\n", + id, stasis_topic_name(manager->all_topic)); + ao2_cleanup(state); + ao2_cleanup(proxy); + return NULL; } /*! * \internal * \brief Find a state by id, or create one if not found and add it to the manager. * - * \note Locking on the states container is specifically not done here, thus - * appropriate locks should be applied prior to this function being called. - * * \param manager The manager to be added to * \param state_topic A state topic to be managed (if NULL id is required) * \param id The unique id for the state (if NULL state_topic is required) @@ -260,18 +267,26 @@ static struct stasis_state *state_add(struct stasis_state_manager *manager, * \return The added state object * \return NULL on error */ -static struct stasis_state *state_find_or_add(struct stasis_state_manager *manager, - struct stasis_topic *state_topic, const char *id) +#define state_find_or_add(mgr, top, id) __state_find_or_add(mgr, top, id, __FILE__, __LINE__, __PRETTY_FUNCTION__) +static struct stasis_state *__state_find_or_add(struct stasis_state_manager *manager, + struct stasis_topic *state_topic, const char *id, + const char *file, int line, const char *func) { struct stasis_state *state; + ao2_lock(manager->states); if (ast_strlen_zero(id)) { id = state_id_by_topic(manager->all_topic, state_topic); } - state = ao2_find(manager->states, id, OBJ_SEARCH_KEY | OBJ_NOLOCK); + state = ao2_weakproxy_find(manager->states, id, OBJ_SEARCH_KEY | OBJ_NOLOCK, ""); + if (!state) { + state = state_alloc(manager, state_topic, id, file, line, func); + } + + ao2_unlock(manager->states); - return state ? state : state_add(manager, state_topic, id); + return state; } static void state_manager_dtor(void *obj) @@ -317,7 +332,7 @@ struct stasis_state_manager *stasis_state_manager_create(const char *topic_name) } manager->states = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, - STATE_BUCKETS, stasis_state_hash_fn, NULL, stasis_state_cmp_fn); + STATE_BUCKETS, stasis_state_proxy_hash_fn, NULL, stasis_state_proxy_cmp_fn); if (!manager->states) { ao2_ref(manager, -1); return NULL; @@ -356,10 +371,7 @@ struct stasis_topic *stasis_state_topic(struct stasis_state_manager *manager, co struct stasis_topic *topic; struct stasis_state *state; - ao2_lock(manager->states); state = state_find_or_add(manager, NULL, id); - ao2_unlock(manager->states); - if (!state) { return NULL; } @@ -369,53 +381,6 @@ struct stasis_topic *stasis_state_topic(struct stasis_state_manager *manager, co return topic; } -/*! - * \internal - * \brief Remove a state from the stasis manager - * - * State should only be removed from the manager under the following conditions: - * - * There are no more subscribers to it - * There are no more explicit publishers publishing to it - * There are no more implicit publishers publishing to it - * - * Subscribers and explicit publishers hold a reference to the state object itself, so - * once a state's reference count drops to 2 (1 for the manager, 1 passed in) then we - * know there are no more subscribers or explicit publishers. Implicit publishers are - * tracked by eids, so once that container is empty no more implicit publishers exist - * for the state either. Only then can a state be removed. - * - * \param state The state to remove - */ -static void state_remove(struct stasis_state *state) -{ - ao2_lock(state); - - /* - * The manager's state container also needs to be locked here prior to checking - * the state's reference count, and potentially removing since we don't want its - * count to possibly increase between the check and unlinking. - */ - ao2_lock(state->manager->states); - - /* - * If there are only 2 references left then it's the one owned by the manager, - * and the one passed in to this function. However, before removing it from the - * manager we need to also check that no eid is associated with the given state. - * If an eid still remains then this means that an implicit publisher is still - * publishing to this state. - */ - if (ao2_ref(state, 0) == 2 && AST_VECTOR_SIZE(&state->eids) == 0) { - ao2_unlink_flags(state->manager->states, state, 0); - } - - ao2_unlock(state->manager->states); - ao2_unlock(state); - - /* Now it's safe to remove the reference that is held on the given state */ - ao2_ref(state, -1); -} - struct stasis_state_subscriber { /*! The stasis state subscribed to */ struct stasis_state *state; @@ -441,7 +406,7 @@ static void subscriber_dtor(void *obj) --sub->state->num_subscribers; ao2_unlock(sub->state); - state_remove(sub->state); + ao2_ref(sub->state, -1); } struct stasis_state_subscriber *stasis_state_add_subscriber( @@ -457,14 +422,11 @@ struct stasis_state_subscriber *stasis_state_add_subscriber( return NULL; } - ao2_lock(manager->states); sub->state = state_find_or_add(manager, NULL, id); if (!sub->state) { - ao2_unlock(manager->states); ao2_ref(sub, -1); return NULL; } - ao2_unlock(manager->states); ao2_lock(sub->state); ++sub->state->num_subscribers; @@ -563,7 +525,7 @@ static void publisher_dtor(void *obj) { struct stasis_state_publisher *pub = obj; - state_remove(pub->state); + ao2_ref(pub->state, -1); } struct stasis_state_publisher *stasis_state_add_publisher( @@ -578,14 +540,11 @@ struct stasis_state_publisher *stasis_state_add_publisher( return NULL; } - ao2_lock(manager->states); pub->state = state_find_or_add(manager, NULL, id); if (!pub->state) { - ao2_unlock(manager->states); ao2_ref(pub, -1); return NULL; } - ao2_unlock(manager->states); return pub; } @@ -639,7 +598,10 @@ static void state_find_or_add_eid(struct stasis_state *state, const struct ast_e } if (i == AST_VECTOR_SIZE(&state->eids)) { - AST_VECTOR_APPEND(&state->eids, *eid); + if (!AST_VECTOR_APPEND(&state->eids, *eid)) { + /* This ensures state cannot be freed if it has any eids */ + ao2_ref(state, +1); + } } } @@ -666,6 +628,8 @@ static void state_find_and_remove_eid(struct stasis_state *state, const struct a for (i = 0; i < AST_VECTOR_SIZE(&state->eids); ++i) { if (!ast_eid_cmp(AST_VECTOR_GET_ADDR(&state->eids, i), eid)) { AST_VECTOR_REMOVE_UNORDERED(&state->eids, i); + /* Balance the reference from state_find_or_add_eid */ + ao2_ref(state, -1); return; } } @@ -676,10 +640,7 @@ void stasis_state_publish_by_id(struct stasis_state_manager *manager, const char { struct stasis_state *state; - ao2_lock(manager->states); state = state_find_or_add(manager, NULL, id); - ao2_unlock(manager->states); - if (!state) { return; } @@ -697,7 +658,7 @@ void stasis_state_publish_by_id(struct stasis_state_manager *manager, const char void stasis_state_remove_publish_by_id(struct stasis_state_manager *manager, const char *id, const struct ast_eid *eid, struct stasis_message *msg) { - struct stasis_state *state = ao2_find(manager->states, id, OBJ_SEARCH_KEY); + struct stasis_state *state = ao2_weakproxy_find(manager->states, id, OBJ_SEARCH_KEY, ""); if (!state) { /* @@ -721,7 +682,7 @@ void stasis_state_remove_publish_by_id(struct stasis_state_manager *manager, state_find_and_remove_eid(state, eid); ao2_unlock(state); - state_remove(state); + ao2_ref(state, -1); } int stasis_state_add_observer(struct stasis_state_manager *manager, @@ -744,10 +705,8 @@ void stasis_state_remove_observer(struct stasis_state_manager *manager, AST_VECTOR_RW_UNLOCK(&manager->observers); } -static int handle_stasis_state(void *obj, void *arg, void *data, int flags) +static int handle_stasis_state(struct stasis_state *state, on_stasis_state handler, void *data) { - struct stasis_state *state = obj; - on_stasis_state handler = arg; struct stasis_message *msg; int res; @@ -764,24 +723,41 @@ static int handle_stasis_state(void *obj, void *arg, void *data, int flags) return res; } +static int handle_stasis_state_proxy(void *obj, void *arg, void *data, int flags) +{ + struct stasis_state *state = ao2_weakproxy_get_object(obj, 0); + + if (state) { + int res; + res = handle_stasis_state(state, arg, data); + ao2_ref(state, -1); + return res; + } + + return 0; +} + void stasis_state_callback_all(struct stasis_state_manager *manager, on_stasis_state handler, void *data) { ast_assert(handler != NULL); ao2_callback_data(manager->states, OBJ_MULTIPLE | OBJ_NODATA, - handle_stasis_state, handler, data); + handle_stasis_state_proxy, handler, data); } static int handle_stasis_state_subscribed(void *obj, void *arg, void *data, int flags) { - struct stasis_state *state = obj; + struct stasis_state *state = ao2_weakproxy_get_object(obj, 0); + int res = 0; - if (state->num_subscribers) { - return handle_stasis_state(obj, arg, data, flags); + if (state && state->num_subscribers) { + res = handle_stasis_state(state, arg, data); } - return 0; + ao2_cleanup(state); + + return res; } void stasis_state_callback_subscribed(struct stasis_state_manager *manager, on_stasis_state handler,