diff --git a/apps/app_queue.c b/apps/app_queue.c index ddbd1bdfdd170c7a0079c0880aaad2031a7b4ba6..ce00b5e67630c7981d12afab55de247f0fc1331b 100644 --- a/apps/app_queue.c +++ b/apps/app_queue.c @@ -11007,6 +11007,8 @@ static int load_module(void) if (!device_state_sub) { err = -1; } + stasis_subscription_accept_message_type(device_state_sub, ast_device_state_message_type()); + stasis_subscription_set_filter(device_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); manager_topic = ast_manager_get_topic(); queue_topic = ast_queue_topic_all(); diff --git a/channels/chan_dahdi.c b/channels/chan_dahdi.c index 3b05d721709679e454e14901b9159952f0543aa9..c40e084c91a0e254b99a7dd1e6b53ecce7b233cb 100644 --- a/channels/chan_dahdi.c +++ b/channels/chan_dahdi.c @@ -12690,6 +12690,8 @@ static struct dahdi_pvt *mkintf(int channel, const struct dahdi_chan_conf *conf, * knows that we care about it. Then, chan_dahdi will get the MWI from the * event cache instead of checking the mailbox directly. */ tmp->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL); + stasis_subscription_accept_message_type(tmp->mwi_event_sub, ast_mwi_state_type()); + stasis_subscription_set_filter(tmp->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } } #ifdef HAVE_DAHDI_LINEREVERSE_VMWI diff --git a/channels/chan_iax2.c b/channels/chan_iax2.c index 7a369e1a749dd19ef157700c14d43a2ca60bbadb..d534c0003ffe45a3d2488ddf548acd6c688b073a 100644 --- a/channels/chan_iax2.c +++ b/channels/chan_iax2.c @@ -1463,6 +1463,8 @@ static void network_change_stasis_subscribe(void) if (!network_change_sub) { network_change_sub = stasis_subscribe(ast_system_topic(), network_change_stasis_cb, NULL); + stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type()); + stasis_subscription_set_filter(network_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } } @@ -1476,6 +1478,8 @@ static void acl_change_stasis_subscribe(void) if (!acl_change_sub) { acl_change_sub = stasis_subscribe(ast_security_topic(), acl_change_stasis_cb, NULL); + stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type()); + stasis_subscription_set_filter(acl_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } } @@ -13100,6 +13104,8 @@ static struct iax2_peer *build_peer(const char *name, struct ast_variable *v, st * mailboxes. However, we just grab the events out of the cache when it * is time to send MWI, since it is only sent with a REGACK. */ peer->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL); + stasis_subscription_accept_message_type(peer->mwi_event_sub, ast_mwi_state_type()); + stasis_subscription_set_filter(peer->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } } diff --git a/channels/chan_mgcp.c b/channels/chan_mgcp.c index c9ed1e2f5981bcaa58303e484a817ba172ea9185..5b3089b75fb994cff91c9d7636dab9a390185ecb 100644 --- a/channels/chan_mgcp.c +++ b/channels/chan_mgcp.c @@ -4234,6 +4234,8 @@ static struct mgcp_gateway *build_gateway(char *cat, struct ast_variable *v) * knows that we care about it. Then, chan_mgcp will get the MWI from the * event cache instead of checking the mailbox directly. */ e->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, stasis_subscription_cb_noop, NULL); + stasis_subscription_accept_message_type(e->mwi_event_sub, ast_mwi_state_type()); + stasis_subscription_set_filter(e->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } } snprintf(e->rqnt_ident, sizeof(e->rqnt_ident), "%08lx", (unsigned long)ast_random()); diff --git a/channels/chan_sip.c b/channels/chan_sip.c index 97ce93c554425196b7ced17d151bd291da3603e2..49d5f64a5b261f85465a553ad8454a06a3866eb2 100644 --- a/channels/chan_sip.c +++ b/channels/chan_sip.c @@ -17384,6 +17384,8 @@ static void network_change_stasis_subscribe(void) if (!network_change_sub) { network_change_sub = stasis_subscribe(ast_system_topic(), network_change_stasis_cb, NULL); + stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type()); + stasis_subscription_set_filter(network_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } } @@ -17397,6 +17399,8 @@ static void acl_change_stasis_subscribe(void) if (!acl_change_sub) { acl_change_sub = stasis_subscribe(ast_security_topic(), acl_change_stasis_cb, NULL); + stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type()); + stasis_subscription_set_filter(acl_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } } @@ -28163,6 +28167,9 @@ static void add_peer_mwi_subs(struct sip_peer *peer) mailbox_specific_topic = ast_mwi_topic(mailbox->id); if (mailbox_specific_topic) { mailbox->event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, peer); + stasis_subscription_accept_message_type(mailbox->event_sub, ast_mwi_state_type()); + stasis_subscription_accept_message_type(mailbox->event_sub, stasis_subscription_change_type()); + stasis_subscription_set_filter(mailbox->event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } } } diff --git a/channels/chan_skinny.c b/channels/chan_skinny.c index 43cc32a2be4916a4857b08797ad3e9e8c0249b26..9c63da9027a27e564913875c6d36a5d523c9b4e7 100644 --- a/channels/chan_skinny.c +++ b/channels/chan_skinny.c @@ -8330,6 +8330,8 @@ static struct skinny_line *config_line(const char *lname, struct ast_variable *v mailbox_specific_topic = ast_mwi_topic(l->mailbox); if (mailbox_specific_topic) { l->mwi_event_sub = stasis_subscribe_pool(mailbox_specific_topic, mwi_event_cb, l); + stasis_subscription_accept_message_type(l->mwi_event_sub, ast_mwi_state_type()); + stasis_subscription_set_filter(l->mwi_event_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } } diff --git a/channels/sig_pri.c b/channels/sig_pri.c index f371fbf3678d7fd66bf1206da27676cd616050cb..682abf37a4bb406d4f4b72227bd03f856dbc8bfe 100644 --- a/channels/sig_pri.c +++ b/channels/sig_pri.c @@ -9143,6 +9143,9 @@ int sig_pri_start_pri(struct sig_pri_span *pri) if (!pri->mbox[i].sub) { ast_log(LOG_ERROR, "%s span %d could not subscribe to MWI events for %s(%s).\n", sig_pri_cc_type_name, pri->span, pri->mbox[i].vm_box, mbox_id); + } else { + stasis_subscription_accept_message_type(pri->mbox[i].sub, ast_mwi_state_type()); + stasis_subscription_set_filter(pri->mbox[i].sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } #if defined(HAVE_PRI_MWI_V2) if (ast_strlen_zero(pri->mbox[i].vm_number)) { diff --git a/include/asterisk/stasis.h b/include/asterisk/stasis.h index 25faa467b17d685dda5cc33af6d5506af8db94bc..a9d5a74c8cb9f6ba05bab1af5023a782a6e9ba5f 100644 --- a/include/asterisk/stasis.h +++ b/include/asterisk/stasis.h @@ -293,6 +293,15 @@ enum stasis_message_type_result { STASIS_MESSAGE_TYPE_DECLINED, /*!< Message type was not created due to configuration */ }; +/*! + * \brief Stasis subscription message filters + */ +enum stasis_subscription_message_filter { + STASIS_SUBSCRIPTION_FILTER_NONE = 0, /*!< No filter is in place, all messages are raised */ + STASIS_SUBSCRIPTION_FILTER_FORCED_NONE, /*!< No filter is in place or can be set, all messages are raised */ + STASIS_SUBSCRIPTION_FILTER_SELECTIVE, /*!< Only messages of allowed message types are raised */ +}; + /*! * \brief Create a new message type. * @@ -328,6 +337,14 @@ const char *stasis_message_type_name(const struct stasis_message_type *type); */ unsigned int stasis_message_type_hash(const struct stasis_message_type *type); +/*! + * \brief Gets the id of a given message type + * \param type The type to get the id of. + * \return The id + * \since 17.0.0 + */ +int stasis_message_type_id(const struct stasis_message_type *type); + /*! * \brief Check whether a message type is declined * @@ -500,6 +517,14 @@ struct stasis_topic *stasis_topic_create(const char *name); */ const char *stasis_topic_name(const struct stasis_topic *topic); +/*! + * \brief Return the number of subscribers of a topic. + * \param topic Topic. + * \return Number of subscribers of the topic. + * \since 17.0.0 + */ +size_t stasis_topic_subscribers(const struct stasis_topic *topic); + /*! * \brief Publish a message to a topic's subscribers. * \param topic Topic. @@ -569,6 +594,10 @@ void stasis_subscription_cb_noop(void *data, struct stasis_subscription *sub, st * \return New \ref stasis_subscription object. * \return \c NULL on error. * \since 12 + * + * \note This callback will receive a callback with a message indicating it + * has been subscribed. This occurs immediately before accepted message + * types can be set and the callback must expect to receive it. */ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data); @@ -594,10 +623,68 @@ struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, * \return New \ref stasis_subscription object. * \return \c NULL on error. * \since 12.8.0 + * + * \note This callback will receive a callback with a message indicating it + * has been subscribed. This occurs immediately before accepted message + * types can be set and the callback must expect to receive it. */ struct stasis_subscription *stasis_subscribe_pool(struct stasis_topic *topic, stasis_subscription_cb callback, void *data); +/*! + * \brief Indicate to a subscription that we are interested in a message type. + * + * This will cause the subscription to allow the given message type to be + * raised to our subscription callback. This enables internal filtering in + * the stasis message bus to reduce messages. + * + * \param subscription Subscription to add message type to. + * \param type The message type we wish to receive. + * \retval 0 on success + * \retval -1 failure + * + * \since 17.0.0 + * + * \note If you are wanting to use stasis_final_message you will need to accept + * \ref stasis_subscription_change_type as a message type. + * + * \note Until the subscription is set to selective filtering it is possible for it + * to receive messages of message types that would not normally be accepted. + */ +int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, + const struct stasis_message_type *type); + +/*! + * \brief Indicate to a subscription that we are not interested in a message type. + * + * \param subscription Subscription to remove message type from. + * \param type The message type we don't wish to receive. + * \retval 0 on success + * \retval -1 failure + * + * \since 17.0.0 + */ +int stasis_subscription_decline_message_type(struct stasis_subscription *subscription, + const struct stasis_message_type *type); + +/*! + * \brief Set the message type filtering level on a subscription + * + * This will cause the subscription to filter messages according to the + * provided filter level. For example if selective is used then only + * messages matching those provided to \ref stasis_subscription_accept_message_type + * will be raised to the subscription callback. + * + * \param subscription Subscription that should receive all messages. + * \param filter What filter to use + * \retval 0 on success + * \retval -1 failure + * + * \since 17.0.0 + */ +int stasis_subscription_set_filter(struct stasis_subscription *subscription, + enum stasis_subscription_message_filter filter); + /*! * \brief Cancel a subscription. * @@ -1052,6 +1139,41 @@ struct stasis_caching_topic *stasis_caching_unsubscribe_and_join( struct stasis_topic *stasis_caching_get_topic( struct stasis_caching_topic *caching_topic); +/*! + * \brief Indicate to a caching topic that we are interested in a message type. + * + * This will cause the caching topic to receive messages of the given message + * type. This enables internal filtering in the stasis message bus to reduce + * messages. + * + * \param caching_topic The caching topic. + * \param type The message type we wish to receive. + * \retval 0 on success + * \retval -1 failure + * + * \since 17.0.0 + */ +int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic, + struct stasis_message_type *type); + +/*! + * \brief Set the message type filtering level on a cache + * + * This will cause the underlying subscription to filter messages according to the + * provided filter level. For example if selective is used then only + * messages matching those provided to \ref stasis_subscription_accept_message_type + * will be raised to the subscription callback. + * + * \param caching_topic The caching topic. + * \param filter What filter to use + * \retval 0 on success + * \retval -1 failure + * + * \since 17.0.0 + */ +int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic, + enum stasis_subscription_message_filter filter); + /*! * \brief A message which instructs the caching topic to remove an entry from * its cache. diff --git a/include/asterisk/stasis_cache_pattern.h b/include/asterisk/stasis_cache_pattern.h index e61d3e931cfdc52fcebbfdf63f8f233869e000bc..514d62e695a509a92e9984770c463761e2f603f7 100644 --- a/include/asterisk/stasis_cache_pattern.h +++ b/include/asterisk/stasis_cache_pattern.h @@ -169,4 +169,39 @@ struct stasis_topic *stasis_cp_single_topic(struct stasis_cp_single *one); struct stasis_topic *stasis_cp_single_topic_cached( struct stasis_cp_single *one); +/*! + * \brief Indicate to an instance that we are interested in a message type. + * + * This will cause the caching topic to receive messages of the given message + * type. This enables internal filtering in the stasis message bus to reduce + * messages. + * + * \param one One side of the cache pattern. + * \param type The message type we wish to receive. + * \retval 0 on success + * \retval -1 failure + * + * \since 17.0.0 + */ +int stasis_cp_single_accept_message_type(struct stasis_cp_single *one, + struct stasis_message_type *type); + +/*! + * \brief Set the message type filtering level on a cache + * + * This will cause the underlying subscription to filter messages according to the + * provided filter level. For example if selective is used then only + * messages matching those provided to \ref stasis_subscription_accept_message_type + * will be raised to the subscription callback. + * + * \param one One side of the cache pattern. + * \param filter What filter to use + * \retval 0 on success + * \retval -1 failure + * + * \since 17.0.0 + */ +int stasis_cp_single_set_filter(struct stasis_cp_single *one, + enum stasis_subscription_message_filter filter); + #endif /* _ASTERISK_STASIS_CACHE_PATTERN_H */ diff --git a/include/asterisk/stasis_message_router.h b/include/asterisk/stasis_message_router.h index 50270a788b4c89537d5ce9ad3e26f863ff1408d5..8dcdfcc913ec6b9e98898cd9e4566a8e57239123 100644 --- a/include/asterisk/stasis_message_router.h +++ b/include/asterisk/stasis_message_router.h @@ -233,6 +233,10 @@ void stasis_message_router_remove_cache_update( * \retval -1 on failure * * \since 12 + * + * \note Setting a default callback will automatically cause the underlying + * subscription to receive all messages and not be filtered. If filtering is + * desired then a specific route for each message type should be provided. */ int stasis_message_router_set_default(struct stasis_message_router *router, stasis_subscription_cb callback, diff --git a/main/ccss.c b/main/ccss.c index 7ff77fdab255b3f8d051e474b835f42aaf046d32..205dc1b063854a69a2e992f4ef59b3da294e5409 100644 --- a/main/ccss.c +++ b/main/ccss.c @@ -1439,6 +1439,8 @@ static struct generic_monitor_instance_list *create_new_generic_list(struct ast_ cc_unref(generic_list, "Failed to subscribe to device state"); return NULL; } + stasis_subscription_accept_message_type(generic_list->sub, ast_device_state_message_type()); + stasis_subscription_set_filter(generic_list->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); generic_list->current_state = ast_device_state(monitor->interface->device_name); ao2_t_link(generic_monitors, generic_list, "linking new generic monitor instance list"); return generic_list; @@ -2810,6 +2812,9 @@ static int cc_generic_agent_start_monitoring(struct ast_cc_agent *agent) if (!(generic_pvt->sub = stasis_subscribe(device_specific_topic, generic_agent_devstate_cb, agent))) { return -1; } + stasis_subscription_accept_message_type(generic_pvt->sub, ast_device_state_message_type()); + stasis_subscription_accept_message_type(generic_pvt->sub, stasis_subscription_change_type()); + stasis_subscription_set_filter(generic_pvt->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); cc_ref(agent, "Ref agent for subscription"); return 0; } diff --git a/main/devicestate.c b/main/devicestate.c index b6650457a8f05ed90a7794a667e97fb07fc1a23b..6706725e5e377fcfa183521a554a61c6071d73a4 100644 --- a/main/devicestate.c +++ b/main/devicestate.c @@ -945,6 +945,8 @@ int devstate_init(void) if (!device_state_topic_cached) { return -1; } + stasis_caching_accept_message_type(device_state_topic_cached, ast_device_state_message_type()); + stasis_caching_set_filter(device_state_topic_cached, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); devstate_message_sub = stasis_subscribe(ast_device_state_topic_all(), devstate_change_cb, NULL); @@ -952,6 +954,8 @@ int devstate_init(void) ast_log(LOG_ERROR, "Failed to create subscription creating uncached device state aggregate events.\n"); return -1; } + stasis_subscription_accept_message_type(devstate_message_sub, ast_device_state_message_type()); + stasis_subscription_set_filter(devstate_message_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); return 0; } diff --git a/main/endpoints.c b/main/endpoints.c index 88506a4c84af15dc9df698ed0acce41e4a5a85a8..69d022fb0a68e23e6711104e08350413c5d995cc 100644 --- a/main/endpoints.c +++ b/main/endpoints.c @@ -204,7 +204,7 @@ static void endpoint_cache_clear(void *data, endpoint_publish_snapshot(endpoint); } -static void endpoint_default(void *data, +static void endpoint_subscription_change(void *data, struct stasis_subscription *sub, struct stasis_message *message) { @@ -265,6 +265,8 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha if (!endpoint->topics) { return NULL; } + stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type()); + stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint)); if (!endpoint->router) { @@ -273,8 +275,9 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha r |= stasis_message_router_add(endpoint->router, stasis_cache_clear_type(), endpoint_cache_clear, endpoint); - r |= stasis_message_router_set_default(endpoint->router, - endpoint_default, endpoint); + r |= stasis_message_router_add(endpoint->router, + stasis_subscription_change_type(), endpoint_subscription_change, + endpoint); if (r) { return NULL; } @@ -290,6 +293,8 @@ static struct ast_endpoint *endpoint_internal_create(const char *tech, const cha if (!endpoint->topics) { return NULL; } + stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type()); + stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); ao2_link(tech_endpoints, endpoint); } diff --git a/main/manager.c b/main/manager.c index e5ca57122ba42dc49bf9ac7caa286418d7e099d3..5b4cc3af34bbd9b5d2fcb9a010faa6ef7ebaf444 100644 --- a/main/manager.c +++ b/main/manager.c @@ -1521,6 +1521,8 @@ static void acl_change_stasis_subscribe(void) if (!acl_change_sub) { acl_change_sub = stasis_subscribe(ast_security_topic(), acl_change_stasis_cb, NULL); + stasis_subscription_accept_message_type(acl_change_sub, ast_named_acl_change_type()); + stasis_subscription_set_filter(acl_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } } diff --git a/main/pbx.c b/main/pbx.c index c87496b7307b761590085c65054edac4419f7f1c..434173d678bf1695bef170e320673eb8b04ccbae 100644 --- a/main/pbx.c +++ b/main/pbx.c @@ -8339,10 +8339,15 @@ int load_pbx(void) if (!(device_state_sub = stasis_subscribe(ast_device_state_topic_all(), device_state_cb, NULL))) { return -1; } + stasis_subscription_accept_message_type(device_state_sub, ast_device_state_message_type()); + stasis_subscription_accept_message_type(device_state_sub, hint_change_message_type()); + stasis_subscription_set_filter(device_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); if (!(presence_state_sub = stasis_subscribe(ast_presence_state_topic_all(), presence_state_cb, NULL))) { return -1; } + stasis_subscription_accept_message_type(presence_state_sub, ast_presence_state_message_type()); + stasis_subscription_set_filter(presence_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); return 0; } diff --git a/main/presencestate.c b/main/presencestate.c index 56c903cf288e675867376b324a6702a51e257850..ff4934ade132d0c6f9e59b38ef50ac5fa8ca4aab 100644 --- a/main/presencestate.c +++ b/main/presencestate.c @@ -389,6 +389,8 @@ int ast_presence_state_engine_init(void) if (!presence_state_topic_cached) { return -1; } + stasis_caching_accept_message_type(presence_state_topic_cached, ast_presence_state_message_type()); + stasis_caching_set_filter(presence_state_topic_cached, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); return 0; } diff --git a/main/stasis.c b/main/stasis.c index 26e404c836517d1e016791037af898d3e50a951f..d054897a9c1e1ed4d23a7925ccd5edadc9f3d19a 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -372,6 +372,11 @@ const char *stasis_topic_name(const struct stasis_topic *topic) return topic->name; } +size_t stasis_topic_subscribers(const struct stasis_topic *topic) +{ + return AST_VECTOR_SIZE(&topic->subscribers); +} + /*! \internal */ struct stasis_subscription { /*! Unique ID for this subscription */ @@ -393,6 +398,11 @@ struct stasis_subscription { /*! Flag set when final message for sub has been processed. * Be sure join_lock is held before reading/setting. */ int final_message_processed; + + /*! The message types this subscription is accepting */ + AST_VECTOR(, char) accepted_message_types; + /*! The message filter currently in use */ + enum stasis_subscription_message_filter filter; }; static void subscription_dtor(void *obj) @@ -411,6 +421,8 @@ static void subscription_dtor(void *obj) ast_taskprocessor_unreference(sub->mailbox); sub->mailbox = NULL; ast_cond_destroy(&sub->join_cond); + + AST_VECTOR_FREE(&sub->accepted_message_types); } /*! @@ -422,19 +434,25 @@ static void subscription_dtor(void *obj) static void subscription_invoke(struct stasis_subscription *sub, struct stasis_message *message) { + unsigned int final = stasis_subscription_final_message(sub, message); + int message_type_id = stasis_message_type_id(stasis_subscription_change_type()); + /* Notify that the final message has been received */ - if (stasis_subscription_final_message(sub, message)) { + if (final) { ao2_lock(sub); sub->final_message_rxed = 1; ast_cond_signal(&sub->join_cond); ao2_unlock(sub); } - /* Since sub is mostly immutable, no need to lock sub */ - sub->callback(sub->data, sub, message); + if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE || + (message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) { + /* Since sub is mostly immutable, no need to lock sub */ + sub->callback(sub->data, sub, message); + } /* Notify that the final message has been processed */ - if (stasis_subscription_final_message(sub, message)) { + if (final) { ao2_lock(sub); sub->final_message_processed = 1; ast_cond_signal(&sub->join_cond); @@ -502,6 +520,8 @@ struct stasis_subscription *internal_stasis_subscribe( sub->callback = callback; sub->data = data; ast_cond_init(&sub->join_cond, NULL); + sub->filter = STASIS_SUBSCRIPTION_FILTER_NONE; + AST_VECTOR_INIT(&sub->accepted_message_types, 0); if (topic_add_subscription(topic, sub) != 0) { ao2_ref(sub, -1); @@ -588,6 +608,76 @@ int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscr return res; } +int stasis_subscription_accept_message_type(struct stasis_subscription *subscription, + const struct stasis_message_type *type) +{ + if (!subscription) { + return -1; + } + + ast_assert(type != NULL); + ast_assert(stasis_message_type_name(type) != NULL); + + if (!type || !stasis_message_type_name(type)) { + /* Filtering is unreliable as this message type is not yet initialized + * so force all messages through. + */ + subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE; + return 0; + } + + ao2_lock(subscription->topic); + if (AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 1)) { + /* We do this for the same reason as above. The subscription can still operate, so allow + * it to do so by forcing all messages through. + */ + subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE; + } + ao2_unlock(subscription->topic); + + return 0; +} + +int stasis_subscription_decline_message_type(struct stasis_subscription *subscription, + const struct stasis_message_type *type) +{ + if (!subscription) { + return -1; + } + + ast_assert(type != NULL); + ast_assert(stasis_message_type_name(type) != NULL); + + if (!type || !stasis_message_type_name(type)) { + return 0; + } + + ao2_lock(subscription->topic); + if (stasis_message_type_id(type) < AST_VECTOR_SIZE(&subscription->accepted_message_types)) { + /* The memory is already allocated so this can't fail */ + AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 0); + } + ao2_unlock(subscription->topic); + + return 0; +} + +int stasis_subscription_set_filter(struct stasis_subscription *subscription, + enum stasis_subscription_message_filter filter) +{ + if (!subscription) { + return -1; + } + + ao2_lock(subscription->topic); + if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) { + subscription->filter = filter; + } + ao2_unlock(subscription->topic); + + return 0; +} + void stasis_subscription_join(struct stasis_subscription *subscription) { if (subscription) { @@ -783,6 +873,18 @@ static void dispatch_message(struct stasis_subscription *sub, struct stasis_message *message, int synchronous) { + /* Determine if this subscription is interested in this message. Note that final + * messages are special and are always invoked on the subscription. + */ + if (sub->filter == STASIS_SUBSCRIPTION_FILTER_SELECTIVE) { + int message_type_id = stasis_message_type_id(stasis_message_type(message)); + if ((message_type_id >= AST_VECTOR_SIZE(&sub->accepted_message_types) || + !AST_VECTOR_GET(&sub->accepted_message_types, message_type_id)) && + !stasis_subscription_final_message(sub, message)) { + return; + } + } + if (!sub->mailbox) { /* Dispatch directly */ subscription_invoke(sub, message); @@ -842,6 +944,11 @@ static void publish_msg(struct stasis_topic *topic, ast_assert(topic != NULL); ast_assert(message != NULL); + /* If there are no subscribers don't bother */ + if (!stasis_topic_subscribers(topic)) { + return; + } + /* * The topic may be unref'ed by the subscription invocation. * Make sure we hold onto a reference while dispatching. diff --git a/main/stasis_cache.c b/main/stasis_cache.c index 1adfc0ee5f9624d3173bfb39fe2c2068e80f8e0f..fd560b00f48f55d0fa4fce598a8c1e585207b6f0 100644 --- a/main/stasis_cache.c +++ b/main/stasis_cache.c @@ -89,6 +89,35 @@ struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *cachi return caching_topic->topic; } +int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic, + struct stasis_message_type *type) +{ + int res; + + if (!caching_topic) { + return -1; + } + + /* We wait to accept the stasis specific message types until now so that by default everything + * will flow to us. + */ + res = stasis_subscription_accept_message_type(caching_topic->sub, stasis_cache_clear_type()); + res |= stasis_subscription_accept_message_type(caching_topic->sub, stasis_subscription_change_type()); + res |= stasis_subscription_accept_message_type(caching_topic->sub, type); + + return res; +} + +int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic, + enum stasis_subscription_message_filter filter) +{ + if (!caching_topic) { + return -1; + } + return stasis_subscription_set_filter(caching_topic->sub, filter); +} + + struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic) { if (!caching_topic) { @@ -858,11 +887,13 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, /* Update the cache */ snapshots = cache_put(caching_topic->cache, msg_type, msg_id, msg_eid, msg_put); if (snapshots.old || msg_put) { - update = update_create(snapshots.old, msg_put); - if (update) { - stasis_publish(caching_topic->topic, update); + if (stasis_topic_subscribers(caching_topic->topic)) { + update = update_create(snapshots.old, msg_put); + if (update) { + stasis_publish(caching_topic->topic, update); + ao2_ref(update, -1); + } } - ao2_cleanup(update); } else { ast_debug(1, "Attempting to remove an item from the %s cache that isn't there: %s %s\n", @@ -875,11 +906,13 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub, caching_topic->cache->aggregate_publish_fn(caching_topic->original_topic, snapshots.aggregate_new); } - update = update_create(snapshots.aggregate_old, snapshots.aggregate_new); - if (update) { - stasis_publish(caching_topic->topic, update); + if (stasis_topic_subscribers(caching_topic->topic)) { + update = update_create(snapshots.aggregate_old, snapshots.aggregate_new); + if (update) { + stasis_publish(caching_topic->topic, update); + ao2_ref(update, -1); + } } - ao2_cleanup(update); } ao2_cleanup(snapshots.old); diff --git a/main/stasis_cache_pattern.c b/main/stasis_cache_pattern.c index 2a2ea44e5efda3cc9b3b4e12a13fa871800b3ffc..c2ea76622063512948e9fe8dca944f26b3e80590 100644 --- a/main/stasis_cache_pattern.c +++ b/main/stasis_cache_pattern.c @@ -219,3 +219,21 @@ struct stasis_topic *stasis_cp_single_topic_cached( } return stasis_caching_get_topic(one->topic_cached); } + +int stasis_cp_single_accept_message_type(struct stasis_cp_single *one, + struct stasis_message_type *type) +{ + if (!one) { + return -1; + } + return stasis_caching_accept_message_type(one->topic_cached, type); +} + +int stasis_cp_single_set_filter(struct stasis_cp_single *one, + enum stasis_subscription_message_filter filter) +{ + if (!one) { + return -1; + } + return stasis_caching_set_filter(one->topic_cached, filter); +} diff --git a/main/stasis_message.c b/main/stasis_message.c index ef03d1389a6117cc4287e1f0bf78ec103e2f2166..2685a43f4430da511ebc0ab3729c54b234b24171 100644 --- a/main/stasis_message.c +++ b/main/stasis_message.c @@ -41,9 +41,11 @@ struct stasis_message_type { struct stasis_message_vtable *vtable; char *name; unsigned int hash; + int id; }; static struct stasis_message_vtable null_vtable = {}; +static int message_type_id; static void message_type_dtor(void *obj) { @@ -80,6 +82,7 @@ int stasis_message_type_create(const char *name, } type->hash = ast_hashtab_hash_string(name); type->vtable = vtable; + type->id = ast_atomic_fetchadd_int(&message_type_id, +1); *result = type; return STASIS_MESSAGE_TYPE_SUCCESS; @@ -95,6 +98,11 @@ unsigned int stasis_message_type_hash(const struct stasis_message_type *type) return type->hash; } +int stasis_message_type_id(const struct stasis_message_type *type) +{ + return type->id; +} + /*! \internal */ struct stasis_message { /*! Time the message was created */ diff --git a/main/stasis_message_router.c b/main/stasis_message_router.c index 498ddd6c2f9f2ea51ff1c096efdd0b20076ad8c2..e9aebe8a6027b5911daea7b3d7b51e7acd4ad96d 100644 --- a/main/stasis_message_router.c +++ b/main/stasis_message_router.c @@ -237,6 +237,9 @@ static struct stasis_message_router *stasis_message_router_create_internal( return NULL; } + /* We need to receive subscription change messages so we know when our subscription goes away */ + stasis_subscription_accept_message_type(router->subscription, stasis_subscription_change_type()); + return router; } @@ -318,6 +321,14 @@ int stasis_message_router_add(struct stasis_message_router *router, } ao2_lock(router); res = route_table_add(&router->routes, message_type, callback, data); + if (!res) { + stasis_subscription_accept_message_type(router->subscription, message_type); + /* Until a specific message type was added we would already drop the message, so being + * selective now doesn't harm us. If we have a default route then we are already forced + * to filter nothing and messages will come in regardless. + */ + stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); + } ao2_unlock(router); return res; } @@ -336,6 +347,10 @@ int stasis_message_router_add_cache_update(struct stasis_message_router *router, } ao2_lock(router); res = route_table_add(&router->cache_routes, message_type, callback, data); + if (!res) { + stasis_subscription_accept_message_type(router->subscription, stasis_cache_update_type()); + stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); + } ao2_unlock(router); return res; } @@ -380,6 +395,9 @@ int stasis_message_router_set_default(struct stasis_message_router *router, router->default_route.callback = callback; router->default_route.data = data; ao2_unlock(router); + + stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_FORCED_NONE); + /* While this implementation can never fail, it used to be able to */ return 0; } diff --git a/res/parking/parking_applications.c b/res/parking/parking_applications.c index 99999c9e2d675d15f8c7020b69cf85917896e141..fea598cba93b1f06856e94d6e3450ae0e36d6440 100644 --- a/res/parking/parking_applications.c +++ b/res/parking/parking_applications.c @@ -870,6 +870,10 @@ static void park_announce_update_cb(void *data, struct stasis_subscription *sub, return; } + if (ast_parked_call_type() != stasis_message_type(message)) { + return; + } + if (payload->event_type != PARKED_CALL) { /* We are only concerned with calls parked */ return; @@ -956,6 +960,10 @@ static int park_and_announce_app_exec(struct ast_channel *chan, const char *data return -1; } + stasis_subscription_accept_message_type(parking_subscription, ast_parked_call_type()); + stasis_subscription_accept_message_type(parking_subscription, stasis_subscription_change_type()); + stasis_subscription_set_filter(parking_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); + /* Now for the fun part... park it! */ ast_bridge_join(parking_bridge, chan, NULL, &chan_features, NULL, 0); diff --git a/res/parking/parking_bridge_features.c b/res/parking/parking_bridge_features.c index b4884dbfb637b016993d6610eae5bec6f541e759..cf5cc721db0470ea776907906b27f9575496cdb2 100644 --- a/res/parking/parking_bridge_features.c +++ b/res/parking/parking_bridge_features.c @@ -195,6 +195,9 @@ static int create_parked_subscription_full(struct ast_channel *chan, const char if (!(parked_datastore->parked_subscription = stasis_subscribe_pool(ast_parking_topic(), parker_update_cb, subscription_data))) { return -1; } + stasis_subscription_accept_message_type(parked_datastore->parked_subscription, ast_parked_call_type()); + stasis_subscription_accept_message_type(parked_datastore->parked_subscription, stasis_subscription_change_type()); + stasis_subscription_set_filter(parked_datastore->parked_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); datastore->data = parked_datastore; diff --git a/res/parking/parking_manager.c b/res/parking/parking_manager.c index ed28164feb652c4a8787a5fae7b29bc4a59341cd..5919b995d9465526c2977e4ec77ab742cbb1d31c 100644 --- a/res/parking/parking_manager.c +++ b/res/parking/parking_manager.c @@ -688,6 +688,8 @@ static void parking_manager_enable_stasis(void) { if (!parking_sub) { parking_sub = stasis_subscribe(ast_parking_topic(), parking_event_cb, NULL); + stasis_subscription_accept_message_type(parking_sub, ast_parked_call_type()); + stasis_subscription_set_filter(parking_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } } diff --git a/res/res_hep_rtcp.c b/res/res_hep_rtcp.c index d799b46201b1ef3c6f0fdb2a648c2c582f870bd6..418b63e00b3081003822c9e3603bc260af709d3c 100644 --- a/res/res_hep_rtcp.c +++ b/res/res_hep_rtcp.c @@ -169,6 +169,9 @@ static int load_module(void) if (!stasis_rtp_subscription) { return AST_MODULE_LOAD_DECLINE; } + stasis_subscription_accept_message_type(stasis_rtp_subscription, ast_rtp_rtcp_sent_type()); + stasis_subscription_accept_message_type(stasis_rtp_subscription, ast_rtp_rtcp_received_type()); + stasis_subscription_set_filter(stasis_rtp_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); return AST_MODULE_LOAD_SUCCESS; } diff --git a/res/res_pjsip_mwi.c b/res/res_pjsip_mwi.c index f2ddf576e261c19b381d0b1c8d46767ab6de1134..f8c2392ec384efbed6a2f2ce087fa0e03a370d80 100644 --- a/res/res_pjsip_mwi.c +++ b/res/res_pjsip_mwi.c @@ -269,6 +269,9 @@ static struct mwi_stasis_subscription *mwi_stasis_subscription_alloc(const char ao2_ref(mwi_sub, -1); mwi_stasis_sub = NULL; } + stasis_subscription_accept_message_type(mwi_stasis_sub->stasis_sub, ast_mwi_state_type()); + stasis_subscription_accept_message_type(mwi_stasis_sub->stasis_sub, stasis_subscription_change_type()); + stasis_subscription_set_filter(mwi_stasis_sub->stasis_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); return mwi_stasis_sub; } @@ -1366,7 +1369,11 @@ static int load_module(void) if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) { ast_sip_push_task(NULL, send_initial_notify_all, NULL); } else { - stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL); + struct stasis_subscription *sub; + + sub = stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL); + stasis_subscription_accept_message_type(sub, ast_manager_get_generic_type()); + stasis_subscription_set_filter(sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } } diff --git a/res/res_pjsip_outbound_registration.c b/res/res_pjsip_outbound_registration.c index 4d80437aa6803751e1d43d15b0988b6f4c5d075b..c42f59e50da30f33aed231c270e2a2f75218ef84 100644 --- a/res/res_pjsip_outbound_registration.c +++ b/res/res_pjsip_outbound_registration.c @@ -2285,6 +2285,8 @@ static int load_module(void) network_change_sub = stasis_subscribe(ast_system_topic(), network_change_stasis_cb, NULL); + stasis_subscription_accept_message_type(network_change_sub, ast_network_change_type()); + stasis_subscription_set_filter(network_change_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); return AST_MODULE_LOAD_SUCCESS; } diff --git a/res/res_pjsip_publish_asterisk.c b/res/res_pjsip_publish_asterisk.c index 53ee60fe4a3135a44d84d382269d6b36e5546845..2271d8bddc3ae666b436143469491b3c6b01789b 100644 --- a/res/res_pjsip_publish_asterisk.c +++ b/res/res_pjsip_publish_asterisk.c @@ -360,6 +360,9 @@ static int asterisk_start_devicestate_publishing(struct ast_sip_outbound_publish ao2_ref(datastore, -1); return -1; } + stasis_subscription_accept_message_type(publisher_state->device_state_subscription, ast_device_state_message_type()); + stasis_subscription_accept_message_type(publisher_state->device_state_subscription, stasis_subscription_change_type()); + stasis_subscription_set_filter(publisher_state->device_state_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); cached = stasis_cache_dump(ast_device_state_cache(), NULL); ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, datastore); @@ -435,6 +438,9 @@ static int asterisk_start_mwi_publishing(struct ast_sip_outbound_publish *config ao2_ref(datastore, -1); return -1; } + stasis_subscription_accept_message_type(publisher_state->mailbox_state_subscription, ast_mwi_state_type()); + stasis_subscription_accept_message_type(publisher_state->mailbox_state_subscription, stasis_subscription_change_type()); + stasis_subscription_set_filter(publisher_state->mailbox_state_subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); cached = stasis_cache_dump(ast_mwi_state_cache(), NULL); ao2_callback(cached, OBJ_NODATA, cached_mwistate_cb, datastore); diff --git a/res/res_pjsip_pubsub.c b/res/res_pjsip_pubsub.c index 1c0145cff943678b75517971915f05d0e62db6aa..eb4545b1133bf51d1099f6b6df936a5b1a082a54 100644 --- a/res/res_pjsip_pubsub.c +++ b/res/res_pjsip_pubsub.c @@ -5634,7 +5634,11 @@ static int load_module(void) if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) { ast_sip_push_task(NULL, subscription_persistence_load, NULL); } else { - stasis_subscribe_pool(ast_manager_get_topic(), subscription_persistence_event_cb, NULL); + struct stasis_subscription *sub; + + sub = stasis_subscribe_pool(ast_manager_get_topic(), subscription_persistence_event_cb, NULL); + stasis_subscription_accept_message_type(sub, ast_manager_get_generic_type()); + stasis_subscription_set_filter(sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_INBOUND, EVENT_FLAG_SYSTEM, diff --git a/res/res_pjsip_refer.c b/res/res_pjsip_refer.c index b45b5a5883ae417c40009020e2dff5868dd20696..9b35c6aabab3f92f830f38698676e9b38786483d 100644 --- a/res/res_pjsip_refer.c +++ b/res/res_pjsip_refer.c @@ -686,6 +686,10 @@ static void refer_blind_callback(struct ast_channel *chan, struct transfer_chann ast_channel_unlock(chan); ao2_cleanup(refer->progress); + } else { + stasis_subscription_accept_message_type(refer->progress->bridge_sub, ast_channel_entered_bridge_type()); + stasis_subscription_accept_message_type(refer->progress->bridge_sub, stasis_subscription_change_type()); + stasis_subscription_set_filter(refer->progress->bridge_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); } } diff --git a/res/res_security_log.c b/res/res_security_log.c index c3fb3cfd2de38640786a17d1df48bf3dcc7c7e15..760d1558329214c150273813b876f577922fd891 100644 --- a/res/res_security_log.c +++ b/res/res_security_log.c @@ -143,6 +143,8 @@ static int load_module(void) LOG_SECURITY = -1; return AST_MODULE_LOAD_DECLINE; } + stasis_subscription_accept_message_type(security_stasis_sub, ast_security_event_type()); + stasis_subscription_set_filter(security_stasis_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); ast_verb(3, "Security Logging Enabled\n"); diff --git a/res/res_stasis_device_state.c b/res/res_stasis_device_state.c index 34c77d9f095401d451527e21abb07f33a98198f0..fbdfb3d52505a16c668684684975ca4c8327d3c3 100644 --- a/res/res_stasis_device_state.c +++ b/res/res_stasis_device_state.c @@ -396,6 +396,9 @@ static int subscribe_device_state(struct stasis_app *app, void *obj) ao2_ref(sub, -1); return -1; } + stasis_subscription_accept_message_type(sub->sub, ast_device_state_message_type()); + stasis_subscription_accept_message_type(sub->sub, stasis_subscription_change_type()); + stasis_subscription_set_filter(sub->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); ao2_link_flags(device_state_subscriptions, sub, OBJ_NOLOCK); ao2_unlock(device_state_subscriptions); diff --git a/res/res_xmpp.c b/res/res_xmpp.c index 41f89961ce63dd581cdee4ae69971aa77b9156b4..a85892fc850ccc7ada6b4180f6d74c35fba0a80c 100644 --- a/res/res_xmpp.c +++ b/res/res_xmpp.c @@ -1628,11 +1628,15 @@ static void xmpp_init_event_distribution(struct ast_xmpp_client *client) if (!(client->mwi_sub = stasis_subscribe_pool(ast_mwi_topic_all(), xmpp_pubsub_mwi_cb, client))) { return; } + stasis_subscription_accept_message_type(client->mwi_sub, ast_mwi_state_type()); + stasis_subscription_set_filter(client->mwi_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); if (!(client->device_state_sub = stasis_subscribe(ast_device_state_topic_all(), xmpp_pubsub_devstate_cb, client))) { client->mwi_sub = stasis_unsubscribe(client->mwi_sub); return; } + stasis_subscription_accept_message_type(client->device_state_sub, ast_device_state_message_type()); + stasis_subscription_set_filter(client->device_state_sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE); cached = stasis_cache_dump(ast_device_state_cache(), NULL); ao2_callback(cached, OBJ_NODATA, cached_devstate_cb, client);