diff --git a/include/asterisk/threadpool.h b/include/asterisk/threadpool.h index e1e7727f5f86bc7649a16df4a760f8d1b41d2935..1c6705856b1b7a03f1a2c6e40eb225303838c8d1 100644 --- a/include/asterisk/threadpool.h +++ b/include/asterisk/threadpool.h @@ -195,6 +195,22 @@ int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), vo */ void ast_threadpool_shutdown(struct ast_threadpool *pool); +/*! + * \brief Get the threadpool serializer currently associated with this thread. + * \since 14.0.0 + * + * \note The returned pointer is valid while the serializer + * thread is running. + * + * \note Use ao2_ref() on serializer if you are going to keep it + * for another thread. To unref it you must then use + * ast_taskprocessor_unreference(). + * + * \retval serializer on success. + * \retval NULL on error or no serializer associated with the thread. + */ +struct ast_taskprocessor *ast_threadpool_serializer_get_current(void); + /*! * \brief Serialized execution of tasks within a \ref ast_threadpool. * diff --git a/main/threadpool.c b/main/threadpool.c index 597e83e10616af033007ec2be8df5a0f13ddf399..6b412d27f9fa0e7054b1445e933423d029783dd8 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -1150,13 +1150,17 @@ static struct serializer *serializer_create(struct ast_threadpool *pool) return ser; } +AST_THREADSTORAGE_RAW(current_serializer); + static int execute_tasks(void *data) { struct ast_taskprocessor *tps = data; + ast_threadstorage_set_ptr(¤t_serializer, tps); while (ast_taskprocessor_execute(tps)) { /* No-op */ } + ast_threadstorage_set_ptr(¤t_serializer, NULL); ast_taskprocessor_unreference(tps); return 0; @@ -1192,6 +1196,11 @@ static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callb .shutdown = serializer_shutdown, }; +struct ast_taskprocessor *ast_threadpool_serializer_get_current(void) +{ + return ast_threadstorage_get_ptr(¤t_serializer); +} + struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool) { RAII_VAR(struct serializer *, ser, NULL, ao2_cleanup); diff --git a/res/res_pjsip.c b/res/res_pjsip.c index 6e7bd68c080d25a27437920739fc7fd382275387..f90b47552fe486feb9d0ca97179f630611667194 100644 --- a/res/res_pjsip.c +++ b/res/res_pjsip.c @@ -1864,6 +1864,15 @@ #define MOD_DATA_CONTACT "contact" +/*! Number of serializers in pool if one not supplied. */ +#define SERIALIZER_POOL_SIZE 8 + +/*! Next serializer pool index to use. */ +static int serializer_pool_pos; + +/*! Pool of serializers to use if not supplied. */ +static struct ast_taskprocessor *serializer_pool[SERIALIZER_POOL_SIZE]; + static pjsip_endpoint *ast_pjsip_endpoint; static struct ast_threadpool *sip_threadpool; @@ -3323,8 +3332,62 @@ struct ast_taskprocessor *ast_sip_create_serializer(void) return serializer; } +/*! + * \internal + * \brief Shutdown the serializers in the default pool. + * \since 14.0.0 + * + * \return Nothing + */ +static void serializer_pool_shutdown(void) +{ + int idx; + + for (idx = 0; idx < SERIALIZER_POOL_SIZE; ++idx) { + ast_taskprocessor_unreference(serializer_pool[idx]); + serializer_pool[idx] = NULL; + } +} + +/*! + * \internal + * \brief Setup the serializers in the default pool. + * \since 14.0.0 + * + * \retval 0 on success. + * \retval -1 on error. + */ +static int serializer_pool_setup(void) +{ + int idx; + + for (idx = 0; idx < SERIALIZER_POOL_SIZE; ++idx) { + serializer_pool[idx] = ast_sip_create_serializer(); + if (!serializer_pool[idx]) { + serializer_pool_shutdown(); + return -1; + } + } + return 0; +} + int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) { + if (!serializer) { + unsigned int pos; + + /* + * Pick a serializer to use from the pool. + * + * Note: We don't care about any reentrancy behavior + * when incrementing serializer_pool_pos. If it gets + * incorrectly incremented it doesn't matter. + */ + pos = serializer_pool_pos++; + pos %= SERIALIZER_POOL_SIZE; + serializer = serializer_pool[pos]; + } + if (serializer) { return ast_taskprocessor_push(serializer, sip_task, task_data); } else { @@ -3377,18 +3440,10 @@ int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*si std.task = sip_task; std.task_data = task_data; - if (serializer) { - if (ast_taskprocessor_push(serializer, sync_task, &std)) { - ast_mutex_destroy(&std.lock); - ast_cond_destroy(&std.cond); - return -1; - } - } else { - if (ast_threadpool_push(sip_threadpool, sync_task, &std)) { - ast_mutex_destroy(&std.lock); - ast_cond_destroy(&std.cond); - return -1; - } + if (ast_sip_push_task(serializer, sync_task, &std)) { + ast_mutex_destroy(&std.lock); + ast_cond_destroy(&std.cond); + return -1; } ast_mutex_lock(&std.lock); @@ -3679,6 +3734,18 @@ static int load_module(void) return AST_MODULE_LOAD_DECLINE; } + if (serializer_pool_setup()) { + ast_log(LOG_ERROR, "Failed to create SIP serializer pool. Aborting load\n"); + ast_threadpool_shutdown(sip_threadpool); + ast_sip_destroy_system(); + pj_pool_release(memory_pool); + memory_pool = NULL; + pjsip_endpt_destroy(ast_pjsip_endpoint); + ast_pjsip_endpoint = NULL; + pj_caching_pool_destroy(&caching_pool); + return AST_MODULE_LOAD_DECLINE; + } + pjsip_tsx_layer_init_module(ast_pjsip_endpoint); pjsip_ua_init_module(ast_pjsip_endpoint, NULL); @@ -3792,6 +3859,7 @@ static int unload_module(void) */ ast_sip_push_task_synchronous(NULL, unload_pjsip, NULL); + serializer_pool_shutdown(); ast_threadpool_shutdown(sip_threadpool); ast_sip_destroy_cli(); diff --git a/res/res_pjsip/pjsip_distributor.c b/res/res_pjsip/pjsip_distributor.c index e32f02833f7dc94561c556e58fa1430a07ad6d31..9b052603a954d756fa04ff86dedd8864f809e689 100644 --- a/res/res_pjsip/pjsip_distributor.c +++ b/res/res_pjsip/pjsip_distributor.c @@ -22,22 +22,106 @@ #include "asterisk/res_pjsip.h" #include "include/res_pjsip_private.h" +#include "asterisk/taskprocessor.h" +#include "asterisk/threadpool.h" static int distribute(void *data); static pj_bool_t distributor(pjsip_rx_data *rdata); +static pj_status_t record_serializer(pjsip_tx_data *tdata); static pjsip_module distributor_mod = { .name = {"Request Distributor", 19}, .priority = PJSIP_MOD_PRIORITY_TSX_LAYER - 6, + .on_tx_request = record_serializer, .on_rx_request = distributor, .on_rx_response = distributor, }; +/*! + * \internal + * \brief Record the task's serializer name on the tdata structure. + * \since 14.0.0 + * + * \param tdata The outgoing message. + * + * \retval PJ_SUCCESS. + */ +static pj_status_t record_serializer(pjsip_tx_data *tdata) +{ + struct ast_taskprocessor *serializer; + + serializer = ast_threadpool_serializer_get_current(); + if (serializer) { + const char *name; + + name = ast_taskprocessor_name(serializer); + if (!ast_strlen_zero(name) + && (!tdata->mod_data[distributor_mod.id] + || strcmp(tdata->mod_data[distributor_mod.id], name))) { + char *tdata_name; + + /* The serializer in use changed. */ + tdata_name = pj_pool_alloc(tdata->pool, strlen(name) + 1); + strcpy(tdata_name, name);/* Safe */ + + tdata->mod_data[distributor_mod.id] = tdata_name; + } + } + + return PJ_SUCCESS; +} + +/*! + * \internal + * \brief Find the request tdata to get the serializer it used. + * \since 14.0.0 + * + * \param rdata The incoming message. + * + * \retval serializer on success. + * \retval NULL on error or could not find the serializer. + */ +static struct ast_taskprocessor *find_request_serializer(pjsip_rx_data *rdata) +{ + struct ast_taskprocessor *serializer = NULL; + pj_str_t tsx_key; + pjsip_transaction *tsx; + + pjsip_tsx_create_key(rdata->tp_info.pool, &tsx_key, PJSIP_ROLE_UAC, + &rdata->msg_info.cseq->method, rdata); + + tsx = pjsip_tsx_layer_find_tsx(&tsx_key, PJ_TRUE); + if (!tsx) { + ast_debug(1, "Could not find %.*s transaction for %d response.\n", + (int) pj_strlen(&rdata->msg_info.cseq->method.name), + pj_strbuf(&rdata->msg_info.cseq->method.name), + rdata->msg_info.msg->line.status.code); + return NULL; + } + + if (tsx->last_tx) { + const char *serializer_name; + + serializer_name = tsx->last_tx->mod_data[distributor_mod.id]; + if (!ast_strlen_zero(serializer_name)) { + serializer = ast_taskprocessor_get(serializer_name, TPS_REF_IF_EXISTS); + } + } + +#ifdef HAVE_PJ_TRANSACTION_GRP_LOCK + pj_grp_lock_release(tsx->grp_lock); +#else + pj_mutex_unlock(tsx->mutex); +#endif + + return serializer; +} + /*! Dialog-specific information the distributor uses */ struct distributor_dialog_data { - /* Serializer to distribute tasks to for this dialog */ + /*! Serializer to distribute tasks to for this dialog */ struct ast_taskprocessor *serializer; - /* Endpoint associated with this dialog */ + /*! Endpoint associated with this dialog */ struct ast_sip_endpoint *endpoint; }; @@ -167,6 +251,7 @@ static pj_bool_t distributor(pjsip_rx_data *rdata) pjsip_dialog *dlg = find_dialog(rdata); struct distributor_dialog_data *dist = NULL; struct ast_taskprocessor *serializer = NULL; + struct ast_taskprocessor *req_serializer = NULL; pjsip_rx_data *clone; if (dlg) { @@ -176,11 +261,16 @@ static pj_bool_t distributor(pjsip_rx_data *rdata) } } - if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG && ( - !pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_cancel_method) || - !pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_bye_method)) && - !serializer) { - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 481, NULL, NULL, NULL); + if (serializer) { + /* We have a serializer so we know where to send the message. */ + } else if (rdata->msg_info.msg->type == PJSIP_RESPONSE_MSG) { + req_serializer = find_request_serializer(rdata); + serializer = req_serializer; + } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_cancel_method) + || !pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_bye_method)) { + /* We have a BYE or CANCEL request without a serializer. */ + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, + PJSIP_SC_CALL_TSX_DOES_NOT_EXIST, NULL, NULL, NULL); goto end; } @@ -196,6 +286,7 @@ end: if (dlg) { pjsip_dlg_dec_lock(dlg); } + ast_taskprocessor_unreference(req_serializer); return PJ_TRUE; } diff --git a/res/res_pjsip/pjsip_resolver.c b/res/res_pjsip/pjsip_resolver.c index 915d1d90a53672b02bf1f1a79ede6c31114cf7bb..4d4d36e27508736bf17a32ebfec6dac6cb11de39 100644 --- a/res/res_pjsip/pjsip_resolver.c +++ b/res/res_pjsip/pjsip_resolver.c @@ -30,6 +30,8 @@ #include "asterisk/dns_naptr.h" #include "asterisk/res_pjsip.h" #include "include/res_pjsip_private.h" +#include "asterisk/taskprocessor.h" +#include "asterisk/threadpool.h" #ifdef HAVE_PJSIP_EXTERNAL_RESOLVER @@ -52,6 +54,8 @@ struct sip_resolve { struct ast_dns_query_set *queries; /*! \brief Current viable server addresses */ pjsip_server_addresses addresses; + /*! \brief Serializer to run async callback into pjlib. */ + struct ast_taskprocessor *serializer; /*! \brief Callback to invoke upon completion */ pjsip_resolver_callback *callback; /*! \brief User provided data */ @@ -97,6 +101,7 @@ static void sip_resolve_destroy(void *data) AST_VECTOR_FREE(&resolve->resolving); ao2_cleanup(resolve->queries); + ast_taskprocessor_unreference(resolve->serializer); } /*! @@ -398,7 +403,7 @@ static void sip_resolve_callback(const struct ast_dns_query_set *query_set) /* Push a task to invoke the callback, we do this so it is guaranteed to run in a PJSIP thread */ ao2_ref(resolve, +1); - if (ast_sip_push_task(NULL, sip_resolve_invoke_user_callback, resolve)) { + if (ast_sip_push_task(resolve->serializer, sip_resolve_invoke_user_callback, resolve)) { ao2_ref(resolve, -1); } @@ -572,6 +577,8 @@ static void sip_resolve(pjsip_resolver_t *resolver, pj_pool_t *pool, const pjsip return; } + resolve->serializer = ao2_bump(ast_threadpool_serializer_get_current()); + ast_debug(2, "[%p] Starting initial resolution using parallel queries for target '%s'\n", resolve, host); ast_dns_query_set_resolve_async(resolve->queries, sip_resolve_callback, resolve);