diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h index 890ce59fba26ae7a19949769cd4cd3aac17f2e47..31db3676628de9bffd1947437f3c45dc5bec7a17 100644 --- a/include/asterisk/res_pjsip.h +++ b/include/asterisk/res_pjsip.h @@ -2926,4 +2926,91 @@ int ast_sip_dtmf_to_str(const enum ast_sip_dtmf_mode dtmf, */ int ast_sip_str_to_dtmf(const char *dtmf_mode); +/*! + * \brief Transport shutdown monitor callback. + * \since 13.18.0 + * + * \param data User data to know what to do when transport shuts down. + * + * \note The callback does not need to care that data is an ao2 object. + * + * \return Nothing + */ +typedef void (*ast_transport_monitor_shutdown_cb)(void *data); + +enum ast_transport_monitor_reg { + /*! \brief Successfully registered the transport monitor */ + AST_TRANSPORT_MONITOR_REG_SUCCESS, + /*! \brief Replaced the already existing transport monitor with new one. */ + AST_TRANSPORT_MONITOR_REG_REPLACED, + /*! + * \brief Transport not found to monitor. + * \note Transport is either already shutdown or is not reliable. + */ + AST_TRANSPORT_MONITOR_REG_NOT_FOUND, + /*! \brief Error while registering transport monitor. */ + AST_TRANSPORT_MONITOR_REG_FAILED, +}; + +/*! + * \brief Register a reliable transport shutdown monitor callback. + * \since 13.18.0 + * + * \param transport Transport to monitor for shutdown. + * \param cb Who to call when transport is shutdown. + * \param ao2_data Data to pass with the callback. + * + * \return enum ast_transport_monitor_reg + */ +enum ast_transport_monitor_reg ast_sip_transport_monitor_register(pjsip_transport *transport, + ast_transport_monitor_shutdown_cb cb, void *ao2_data); + +/*! + * \brief Unregister a reliable transport shutdown monitor callback. + * \since 13.18.0 + * + * \param transport Transport to monitor for shutdown. + * \param cb Who to call when transport is shutdown. + * + * \return Nothing + */ +void ast_sip_transport_monitor_unregister(pjsip_transport *transport, ast_transport_monitor_shutdown_cb cb); + +/*! + * \brief Unregister monitor callback from all reliable transports. + * \since 13.18.0 + * + * \param cb Who to call when a transport is shutdown. + * + * \return Nothing + */ +void ast_sip_transport_monitor_unregister_all(ast_transport_monitor_shutdown_cb cb); + +/*! Transport state notification registration element. */ +struct ast_sip_tpmgr_state_callback { + /*! PJPROJECT transport state notification callback */ + pjsip_tp_state_callback cb; + AST_LIST_ENTRY(ast_sip_tpmgr_state_callback) node; +}; + +/*! + * \brief Register a transport state notification callback element. + * \since 13.18.0 + * + * \param element What we are registering. + * + * \return Nothing + */ +void ast_sip_transport_state_register(struct ast_sip_tpmgr_state_callback *element); + +/*! + * \brief Unregister a transport state notification callback element. + * \since 13.18.0 + * + * \param element What we are unregistering. + * + * \return Nothing + */ +void ast_sip_transport_state_unregister(struct ast_sip_tpmgr_state_callback *element); + #endif /* _RES_PJSIP_H */ diff --git a/res/res_pjsip.c b/res/res_pjsip.c index f3648acdbff9e3104ff16f5d047aee82f10136a9..2917df3c597738332ef24417d8f52012f132a035 100644 --- a/res/res_pjsip.c +++ b/res/res_pjsip.c @@ -4662,6 +4662,7 @@ static int unload_pjsip(void *data) ast_sip_destroy_system(); ast_sip_destroy_global_headers(); internal_sip_unregister_service(&supplement_module); + ast_sip_destroy_transport_events(); } if (monitor_thread) { @@ -4740,7 +4741,6 @@ static int load_pjsip(void) return AST_MODULE_LOAD_SUCCESS; error: - unload_pjsip(NULL); return AST_MODULE_LOAD_DECLINE; } @@ -4806,6 +4806,11 @@ static int load_module(void) goto error; } + if (ast_sip_initialize_transport_events()) { + ast_log(LOG_ERROR, "Failed to initialize SIP transport monitor. Aborting load\n"); + goto error; + } + ast_sip_initialize_dns(); ast_sip_initialize_global_headers(); diff --git a/res/res_pjsip/include/res_pjsip_private.h b/res/res_pjsip/include/res_pjsip_private.h index 0bdb63325af1f280bb1f57c2ffb736bf7e509f01..2969f0e40c8f67bbbfb8e85baad611e4cca7bbcf 100644 --- a/res/res_pjsip/include/res_pjsip_private.h +++ b/res/res_pjsip/include/res_pjsip_private.h @@ -133,6 +133,29 @@ int ast_sip_initialize_distributor(void); */ void ast_sip_destroy_distributor(void); +/*! + * \internal + * \brief Initialize the transport events notify module + * \since 13.18.0 + * + * The transport events notify module is responsible for monitoring + * when transports die and calling any registered callbacks when that + * happens. It also manages any PJPROJECT transport state callbacks + * registered to it so the callbacks be more dynamic allowing module + * loading/unloading. + * + * \retval -1 Failure + * \retval 0 Success + */ +int ast_sip_initialize_transport_events(void); + +/*! + * \internal + * \brief Destruct the transport events notify module. + * \since 13.18.0 + */ +void ast_sip_destroy_transport_events(void); + /*! * \internal * \brief Initialize global type on a sorcery instance diff --git a/res/res_pjsip/pjsip_transport_events.c b/res/res_pjsip/pjsip_transport_events.c new file mode 100644 index 0000000000000000000000000000000000000000..0f57303ba22fc385d65ded6a8d214fee2dec81d8 --- /dev/null +++ b/res/res_pjsip/pjsip_transport_events.c @@ -0,0 +1,366 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2017, Digium Inc. + * + * Richard Mudgett <rmudgett@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +/*! + * \file + * \brief Manages the global transport event notification callbacks. + * + * \author Richard Mudgett <rmudgett@digium.com> + * See Also: + * + * \arg \ref AstCREDITS + */ + + +#include "asterisk.h" + +#include "asterisk/res_pjsip.h" +#include "include/res_pjsip_private.h" +#include "asterisk/linkedlists.h" +#include "asterisk/vector.h" + +/* ------------------------------------------------------------------- */ + +/*! \brief Number of buckets for monitored active transports */ +#define ACTIVE_TRANSPORTS_BUCKETS 127 + +/*! Who to notify when transport shuts down. */ +struct transport_monitor_notifier { + /*! Who to call when transport shuts down. */ + ast_transport_monitor_shutdown_cb cb; + /*! ao2 data object to pass to callback. */ + void *data; +}; + +/*! \brief Structure for transport to be monitored */ +struct transport_monitor { + /*! \brief The underlying PJSIP transport */ + pjsip_transport *transport; + /*! Who is interested in when this transport shuts down. */ + AST_VECTOR(, struct transport_monitor_notifier) monitors; +}; + +/*! \brief Global container of active reliable transports */ +static AO2_GLOBAL_OBJ_STATIC(active_transports); + +/*! \brief Existing transport events callback that we need to invoke */ +static pjsip_tp_state_callback tpmgr_state_callback; + +/*! List of registered transport state callbacks. */ +static AST_RWLIST_HEAD(, ast_sip_tpmgr_state_callback) transport_state_list; + + +/*! \brief Hashing function for struct transport_monitor */ +AO2_STRING_FIELD_HASH_FN(transport_monitor, transport->obj_name); + +/*! \brief Comparison function for struct transport_monitor */ +AO2_STRING_FIELD_CMP_FN(transport_monitor, transport->obj_name); + +static const char *transport_state2str(pjsip_transport_state state) +{ + const char *name; + + switch (state) { + case PJSIP_TP_STATE_CONNECTED: + name = "CONNECTED"; + break; + case PJSIP_TP_STATE_DISCONNECTED: + name = "DISCONNECTED"; + break; + case PJSIP_TP_STATE_SHUTDOWN: + name = "SHUTDOWN"; + break; + case PJSIP_TP_STATE_DESTROY: + name = "DESTROY"; + break; + default: + /* + * We have to have a default case because the enum is + * defined by a third-party library. + */ + ast_assert(0); + name = "<unknown>"; + break; + } + return name; +} + +static void transport_monitor_dtor(void *vdoomed) +{ + struct transport_monitor *monitored = vdoomed; + int idx; + + for (idx = AST_VECTOR_SIZE(&monitored->monitors); idx--;) { + struct transport_monitor_notifier *notifier; + + notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx); + ao2_cleanup(notifier->data); + } + AST_VECTOR_FREE(&monitored->monitors); +} + +/*! \brief Callback invoked when transport state changes occur */ +static void transport_state_callback(pjsip_transport *transport, + pjsip_transport_state state, const pjsip_transport_state_info *info) +{ + struct ao2_container *transports; + + /* We only care about monitoring reliable transports */ + if (PJSIP_TRANSPORT_IS_RELIABLE(transport) + && (transports = ao2_global_obj_ref(active_transports))) { + struct transport_monitor *monitored; + + ast_debug(3, "Reliable transport '%s' state:%s\n", + transport->obj_name, transport_state2str(state)); + switch (state) { + case PJSIP_TP_STATE_CONNECTED: + monitored = ao2_alloc_options(sizeof(*monitored), + transport_monitor_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK); + if (!monitored) { + break; + } + monitored->transport = transport; + if (AST_VECTOR_INIT(&monitored->monitors, 2)) { + ao2_ref(monitored, -1); + break; + } + + ao2_link(transports, monitored); + ao2_ref(monitored, -1); + break; + case PJSIP_TP_STATE_DISCONNECTED: + if (!transport->is_shutdown) { + pjsip_transport_shutdown(transport); + } + break; + case PJSIP_TP_STATE_SHUTDOWN: + /* + * Set shutdown flag early so we can force a new transport to be + * created if a monitor callback needs to reestablish a link. + * PJPROJECT sets the flag after this routine returns even though + * it has already called the transport's shutdown routine. + */ + transport->is_shutdown = PJ_TRUE; + + monitored = ao2_find(transports, transport->obj_name, + OBJ_SEARCH_KEY | OBJ_UNLINK); + if (monitored) { + int idx; + + for (idx = AST_VECTOR_SIZE(&monitored->monitors); idx--;) { + struct transport_monitor_notifier *notifier; + + notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx); + notifier->cb(notifier->data); + } + ao2_ref(monitored, -1); + } + break; + default: + break; + } + + ao2_ref(transports, -1); + } + + /* Loop over other transport state callbacks registered with us. */ + if (!AST_LIST_EMPTY(&transport_state_list)) { + struct ast_sip_tpmgr_state_callback *tpmgr_notifier; + + AST_RWLIST_RDLOCK(&transport_state_list); + AST_LIST_TRAVERSE(&transport_state_list, tpmgr_notifier, node) { + tpmgr_notifier->cb(transport, state, info); + } + AST_RWLIST_UNLOCK(&transport_state_list); + } + + /* Forward to the old state callback if present */ + if (tpmgr_state_callback) { + tpmgr_state_callback(transport, state, info); + } +} + +static int transport_monitor_unregister_all(void *obj, void *arg, int flags) +{ + struct transport_monitor *monitored = obj; + ast_transport_monitor_shutdown_cb cb = arg; + int idx; + + for (idx = AST_VECTOR_SIZE(&monitored->monitors); idx--;) { + struct transport_monitor_notifier *notifier; + + notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx); + if (notifier->cb == cb) { + ao2_cleanup(notifier->data); + AST_VECTOR_REMOVE_UNORDERED(&monitored->monitors, idx); + break; + } + } + return 0; +} + +void ast_sip_transport_monitor_unregister_all(ast_transport_monitor_shutdown_cb cb) +{ + struct ao2_container *transports; + + transports = ao2_global_obj_ref(active_transports); + if (!transports) { + return; + } + ao2_callback(transports, OBJ_MULTIPLE | OBJ_NODATA, transport_monitor_unregister_all, + cb); + ao2_ref(transports, -1); +} + +void ast_sip_transport_monitor_unregister(pjsip_transport *transport, ast_transport_monitor_shutdown_cb cb) +{ + struct ao2_container *transports; + struct transport_monitor *monitored; + + transports = ao2_global_obj_ref(active_transports); + if (!transports) { + return; + } + + ao2_lock(transports); + monitored = ao2_find(transports, transport->obj_name, OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (monitored) { + int idx; + + for (idx = AST_VECTOR_SIZE(&monitored->monitors); idx--;) { + struct transport_monitor_notifier *notifier; + + notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx); + if (notifier->cb == cb) { + ao2_cleanup(notifier->data); + AST_VECTOR_REMOVE_UNORDERED(&monitored->monitors, idx); + break; + } + } + ao2_ref(monitored, -1); + } + ao2_unlock(transports); + ao2_ref(transports, -1); +} + +enum ast_transport_monitor_reg ast_sip_transport_monitor_register(pjsip_transport *transport, + ast_transport_monitor_shutdown_cb cb, void *ao2_data) +{ + struct ao2_container *transports; + struct transport_monitor *monitored; + enum ast_transport_monitor_reg res = AST_TRANSPORT_MONITOR_REG_NOT_FOUND; + + transports = ao2_global_obj_ref(active_transports); + if (!transports) { + return res; + } + + ao2_lock(transports); + monitored = ao2_find(transports, transport->obj_name, OBJ_SEARCH_KEY | OBJ_NOLOCK); + if (monitored) { + int idx; + struct transport_monitor_notifier new_monitor; + + /* Check if the callback monitor already exists */ + for (idx = AST_VECTOR_SIZE(&monitored->monitors); idx--;) { + struct transport_monitor_notifier *notifier; + + notifier = AST_VECTOR_GET_ADDR(&monitored->monitors, idx); + if (notifier->cb == cb) { + /* The monitor is already in the vector replace with new ao2_data. */ + ao2_replace(notifier->data, ao2_data); + res = AST_TRANSPORT_MONITOR_REG_REPLACED; + goto register_done; + } + } + + /* Add new monitor to vector */ + new_monitor.cb = cb; + new_monitor.data = ao2_bump(ao2_data); + if (AST_VECTOR_APPEND(&monitored->monitors, new_monitor)) { + ao2_cleanup(ao2_data); + res = AST_TRANSPORT_MONITOR_REG_FAILED; + } + +register_done: + ao2_ref(monitored, -1); + } + ao2_unlock(transports); + ao2_ref(transports, -1); + return res; +} + +void ast_sip_transport_state_unregister(struct ast_sip_tpmgr_state_callback *element) +{ + AST_RWLIST_WRLOCK(&transport_state_list); + AST_LIST_REMOVE(&transport_state_list, element, node); + AST_RWLIST_UNLOCK(&transport_state_list); +} + +void ast_sip_transport_state_register(struct ast_sip_tpmgr_state_callback *element) +{ + struct ast_sip_tpmgr_state_callback *tpmgr_notifier; + + AST_RWLIST_WRLOCK(&transport_state_list); + AST_LIST_TRAVERSE(&transport_state_list, tpmgr_notifier, node) { + if (element == tpmgr_notifier) { + /* Already registered. */ + AST_RWLIST_UNLOCK(&transport_state_list); + return; + } + } + AST_LIST_INSERT_HEAD(&transport_state_list, element, node); + AST_RWLIST_UNLOCK(&transport_state_list); +} + +void ast_sip_destroy_transport_events(void) +{ + pjsip_tpmgr *tpmgr; + + tpmgr = pjsip_endpt_get_tpmgr(ast_sip_get_pjsip_endpoint()); + if (tpmgr) { + pjsip_tpmgr_set_state_cb(tpmgr, tpmgr_state_callback); + } + + ao2_global_obj_release(active_transports); +} + +int ast_sip_initialize_transport_events(void) +{ + pjsip_tpmgr *tpmgr; + struct ao2_container *transports; + + tpmgr = pjsip_endpt_get_tpmgr(ast_sip_get_pjsip_endpoint()); + if (!tpmgr) { + return -1; + } + + transports = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, + ACTIVE_TRANSPORTS_BUCKETS, transport_monitor_hash_fn, NULL, + transport_monitor_cmp_fn); + if (!transports) { + return -1; + } + ao2_global_obj_replace_unref(active_transports, transports); + ao2_ref(transports, -1); + + tpmgr_state_callback = pjsip_tpmgr_get_state_cb(tpmgr); + pjsip_tpmgr_set_state_cb(tpmgr, &transport_state_callback); + + return 0; +} diff --git a/res/res_pjsip_transport_management.c b/res/res_pjsip_transport_management.c index 86c53ca9e27e33a15fed7dc7591e3b5714fc38e2..eb92eb7a519ca8dd233cd41f7b3e30157c7e401f 100644 --- a/res/res_pjsip_transport_management.c +++ b/res/res_pjsip_transport_management.c @@ -34,7 +34,7 @@ #include "asterisk/astobj2.h" /*! \brief Number of buckets for monitored transports */ -#define TRANSPORTS_BUCKETS 53 +#define TRANSPORTS_BUCKETS 127 #define IDLE_TIMEOUT (pjsip_cfg()->tsx.td) @@ -53,9 +53,6 @@ static pthread_t keepalive_thread = AST_PTHREADT_NULL; /*! \brief The global interval at which to send keepalives */ static unsigned int keepalive_interval; -/*! \brief Existing transport manager callback that we need to invoke */ -static pjsip_tp_state_callback tpmgr_state_callback; - /*! \brief Structure for transport to be monitored */ struct monitored_transport { /*! \brief The underlying PJSIP transport */ @@ -178,14 +175,13 @@ static void monitored_transport_state_callback(pjsip_transport *transport, pjsip /* Let the scheduler inherit the reference from allocation */ if (ast_sched_add_variable(sched, IDLE_TIMEOUT, idle_sched_cb, monitored, 1) < 0) { /* Uh Oh. Could not schedule the idle check. Kill the transport. */ - ao2_unlink(transports, monitored); - ao2_ref(monitored, -1); pjsip_transport_shutdown(transport); + } else { + /* monitored ref successfully passed to idle_sched_cb() */ + break; } - } else { - /* No scheduled task, so get rid of the allocation reference */ - ao2_ref(monitored, -1); } + ao2_ref(monitored, -1); break; case PJSIP_TP_STATE_SHUTDOWN: case PJSIP_TP_STATE_DISCONNECTED: @@ -197,13 +193,12 @@ static void monitored_transport_state_callback(pjsip_transport *transport, pjsip ao2_ref(transports, -1); } - - /* Forward to the old state callback if present */ - if (tpmgr_state_callback) { - tpmgr_state_callback(transport, state, info); - } } +struct ast_sip_tpmgr_state_callback monitored_transport_reg = { + monitored_transport_state_callback, +}; + /*! \brief Hashing function for monitored transport */ static int monitored_transport_hash_fn(const void *obj, int flags) { @@ -327,16 +322,9 @@ static pjsip_module idle_monitor_module = { static int load_module(void) { struct ao2_container *transports; - pjsip_tpmgr *tpmgr; CHECK_PJSIP_MODULE_LOADED(); - tpmgr = pjsip_endpt_get_tpmgr(ast_sip_get_pjsip_endpoint()); - if (!tpmgr) { - ast_log(LOG_ERROR, "No transport manager to attach keepalive functionality to.\n"); - return AST_MODULE_LOAD_DECLINE; - } - transports = ao2_container_alloc(TRANSPORTS_BUCKETS, monitored_transport_hash_fn, monitored_transport_cmp_fn); if (!transports) { @@ -363,8 +351,7 @@ static int load_module(void) ast_sip_register_service(&idle_monitor_module); - tpmgr_state_callback = pjsip_tpmgr_get_state_cb(tpmgr); - pjsip_tpmgr_set_state_cb(tpmgr, &monitored_transport_state_callback); + ast_sip_transport_state_register(&monitored_transport_reg); ast_sorcery_observer_add(ast_sip_get_sorcery(), "global", &keepalive_global_observer); ast_sorcery_reload_object(ast_sip_get_sorcery(), "global"); @@ -375,8 +362,6 @@ static int load_module(void) static int unload_module(void) { - pjsip_tpmgr *tpmgr; - if (keepalive_interval) { keepalive_interval = 0; if (keepalive_thread != AST_PTHREADT_NULL) { @@ -388,10 +373,7 @@ static int unload_module(void) ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &keepalive_global_observer); - tpmgr = pjsip_endpt_get_tpmgr(ast_sip_get_pjsip_endpoint()); - if (tpmgr) { - pjsip_tpmgr_set_state_cb(tpmgr, tpmgr_state_callback); - } + ast_sip_transport_state_unregister(&monitored_transport_reg); ast_sip_unregister_service(&idle_monitor_module);