diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h index af11ea4b6fb7c2abc510f7447b1f18a18394a205..0ce50912c038c3956a02e5afb12d9655c754a796 100644 --- a/include/asterisk/res_pjsip.h +++ b/include/asterisk/res_pjsip.h @@ -2105,4 +2105,10 @@ int ast_sip_get_host_ip(int af, pj_sockaddr *addr); */ const char *ast_sip_get_host_ip_string(int af); +/*! + * \brief Return the size of the SIP threadpool's task queue + * \since 13.7.0 + */ +long ast_sip_threadpool_queue_size(void); + #endif /* _RES_PJSIP_H */ diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h index f16f144cb558ef9fcdc130a4ce91da89d8987dfc..06368867a8f309d24666b1a7941d6fa3380a4108 100644 --- a/include/asterisk/taskprocessor.h +++ b/include/asterisk/taskprocessor.h @@ -262,4 +262,10 @@ int ast_taskprocessor_is_task(struct ast_taskprocessor *tps); */ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps); +/*! + * \brief Return the current size of the taskprocessor queue + * \since 13.7.0 + */ +long ast_taskprocessor_size(struct ast_taskprocessor *tps); + #endif /* __AST_TASKPROCESSOR_H__ */ diff --git a/include/asterisk/threadpool.h b/include/asterisk/threadpool.h index 1c6705856b1b7a03f1a2c6e40eb225303838c8d1..9a09d17d4466dc69b9122cbc92d7c10346a25dbb 100644 --- a/include/asterisk/threadpool.h +++ b/include/asterisk/threadpool.h @@ -239,4 +239,10 @@ struct ast_taskprocessor *ast_threadpool_serializer_get_current(void); */ struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool); +/*! + * \brief Return the size of the threadpool's task queue + * \since 13.7.0 + */ +long ast_threadpool_queue_size(struct ast_threadpool *pool); + #endif /* ASTERISK_THREADPOOL_H */ diff --git a/main/taskprocessor.c b/main/taskprocessor.c index 227fc3e81f14335496c9b501715406370a87d11e..cdac2c8a002fbb1e7a759993b4e258d9e6da9143 100644 --- a/main/taskprocessor.c +++ b/main/taskprocessor.c @@ -130,9 +130,6 @@ static int tps_ping_handler(void *datap); /*! \brief Remove the front task off the taskprocessor queue */ static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps); -/*! \brief Return the size of the taskprocessor queue */ -static int tps_taskprocessor_depth(struct ast_taskprocessor *tps); - static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a); static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a); @@ -508,7 +505,7 @@ static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps) return task; } -static int tps_taskprocessor_depth(struct ast_taskprocessor *tps) +long ast_taskprocessor_size(struct ast_taskprocessor *tps) { return (tps) ? tps->tps_queue_size : -1; } @@ -766,7 +763,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps) { struct ast_taskprocessor_local local; struct tps_task *t; - int size; + long size; ao2_lock(tps); t = tps_taskprocessor_pop(tps); @@ -798,7 +795,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps) * after we pop an empty stack. */ tps->executing = 0; - size = tps_taskprocessor_depth(tps); + size = ast_taskprocessor_size(tps); /* If we executed a task, bump the stats */ if (tps->stats) { tps->stats->_tasks_processed_count++; diff --git a/main/threadpool.c b/main/threadpool.c index 229528c030bfbc6bce28db6587bdec70720b9bd7..81d76c4457ecc4ff2c84b4e367bb4da2290846cf 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -1276,3 +1276,8 @@ struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast return tps; } + +long ast_threadpool_queue_size(struct ast_threadpool *pool) +{ + return ast_taskprocessor_size(pool->tps); +} diff --git a/res/res_pjsip.c b/res/res_pjsip.c index 0b541afe989fe3fd9bf7d79787ce32d6815aed0a..d5b68bccdee8de0faeb5d45f102c67a78a06e20c 100644 --- a/res/res_pjsip.c +++ b/res/res_pjsip.c @@ -3408,6 +3408,11 @@ static void remove_request_headers(pjsip_endpoint *endpt) } } +long ast_sip_threadpool_queue_size(void) +{ + return ast_threadpool_queue_size(sip_threadpool); +} + AST_TEST_DEFINE(xml_sanitization_end_null) { char sanitized[8]; diff --git a/res/res_pjsip/pjsip_distributor.c b/res/res_pjsip/pjsip_distributor.c index 1294379bcb9778dd67853d7937331294504a3780..3b6c262ecd09a94d02546fdc1a07fe905e2cb734 100644 --- a/res/res_pjsip/pjsip_distributor.c +++ b/res/res_pjsip/pjsip_distributor.c @@ -245,6 +245,8 @@ static pjsip_module endpoint_mod = { .on_rx_request = endpoint_lookup, }; +#define SIP_MAX_QUEUE 500L + static pj_bool_t distributor(pjsip_rx_data *rdata) { pjsip_dialog *dlg = find_dialog(rdata); @@ -279,7 +281,17 @@ static pj_bool_t distributor(pjsip_rx_data *rdata) clone->endpt_info.mod_data[endpoint_mod.id] = ao2_bump(dist->endpoint); } - ast_sip_push_task(serializer, distribute, clone); + if (ast_sip_threadpool_queue_size() > SIP_MAX_QUEUE) { + /* When the threadpool is backed up this much, there is a good chance that we have encountered + * some sort of terrible condition and don't need to be adding more work to the threadpool. + * It's in our best interest to send back a 503 response and be done with it. + */ + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL); + ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]); + pjsip_rx_data_free_cloned(clone); + } else { + ast_sip_push_task(serializer, distribute, clone); + } end: if (dlg) {