diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h index 459082901f82308a150b7f88c4836b74eda276d8..37b7662119b372e8f5a8c6b19bc42c036f23d661 100644 --- a/include/asterisk/res_pjsip.h +++ b/include/asterisk/res_pjsip.h @@ -2116,4 +2116,10 @@ int ast_sip_get_host_ip(int af, pj_sockaddr *addr); */ const char *ast_sip_get_host_ip_string(int af); +/*! + * \brief Return the size of the SIP threadpool's task queue + * \since 13.7.0 + */ +long ast_sip_threadpool_queue_size(void); + #endif /* _RES_PJSIP_H */ diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h index f16f144cb558ef9fcdc130a4ce91da89d8987dfc..06368867a8f309d24666b1a7941d6fa3380a4108 100644 --- a/include/asterisk/taskprocessor.h +++ b/include/asterisk/taskprocessor.h @@ -262,4 +262,10 @@ int ast_taskprocessor_is_task(struct ast_taskprocessor *tps); */ const char *ast_taskprocessor_name(struct ast_taskprocessor *tps); +/*! + * \brief Return the current size of the taskprocessor queue + * \since 13.7.0 + */ +long ast_taskprocessor_size(struct ast_taskprocessor *tps); + #endif /* __AST_TASKPROCESSOR_H__ */ diff --git a/include/asterisk/threadpool.h b/include/asterisk/threadpool.h index 75ce0e4e4ef114901e48ef3062015601c9a79653..0f360c7a4a49539d567fd19b04544c470e2d7360 100644 --- a/include/asterisk/threadpool.h +++ b/include/asterisk/threadpool.h @@ -292,4 +292,10 @@ struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast struct ast_taskprocessor *ast_threadpool_serializer_group(const char *name, struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group); +/*! + * \brief Return the size of the threadpool's task queue + * \since 13.7.0 + */ +long ast_threadpool_queue_size(struct ast_threadpool *pool); + #endif /* ASTERISK_THREADPOOL_H */ diff --git a/main/taskprocessor.c b/main/taskprocessor.c index f382814af9796f4beb13321f98ea2814c9a793ab..eefa85f6162cb2165d33758f7d6fd366b6975c96 100644 --- a/main/taskprocessor.c +++ b/main/taskprocessor.c @@ -130,9 +130,6 @@ static int tps_ping_handler(void *datap); /*! \brief Remove the front task off the taskprocessor queue */ static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps); -/*! \brief Return the size of the taskprocessor queue */ -static int tps_taskprocessor_depth(struct ast_taskprocessor *tps); - static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a); static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a); @@ -508,7 +505,7 @@ static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps) return task; } -static int tps_taskprocessor_depth(struct ast_taskprocessor *tps) +long ast_taskprocessor_size(struct ast_taskprocessor *tps) { return (tps) ? tps->tps_queue_size : -1; } @@ -765,7 +762,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps) { struct ast_taskprocessor_local local; struct tps_task *t; - int size; + long size; ao2_lock(tps); t = tps_taskprocessor_pop(tps); @@ -797,7 +794,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps) * after we pop an empty stack. */ tps->executing = 0; - size = tps_taskprocessor_depth(tps); + size = ast_taskprocessor_size(tps); /* If we executed a task, bump the stats */ if (tps->stats) { tps->stats->_tasks_processed_count++; diff --git a/main/threadpool.c b/main/threadpool.c index 46de9b7f805266f4c8bdb186ee4f7214c1c9301b..60e1e9a3b474cf8d1471f7d800d1196f67ba445e 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -1397,3 +1397,8 @@ struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast { return ast_threadpool_serializer_group(name, pool, NULL); } + +long ast_threadpool_queue_size(struct ast_threadpool *pool) +{ + return ast_taskprocessor_size(pool->tps); +} diff --git a/res/res_pjsip.c b/res/res_pjsip.c index d2b393fccf930633e91362d24d377438676fd666..babbe7aaaa943bbb9f7018571baaa310fce218d6 100644 --- a/res/res_pjsip.c +++ b/res/res_pjsip.c @@ -3755,6 +3755,11 @@ static void remove_request_headers(pjsip_endpoint *endpt) } } +long ast_sip_threadpool_queue_size(void) +{ + return ast_threadpool_queue_size(sip_threadpool); +} + AST_TEST_DEFINE(xml_sanitization_end_null) { char sanitized[8]; diff --git a/res/res_pjsip/pjsip_distributor.c b/res/res_pjsip/pjsip_distributor.c index 9b052603a954d756fa04ff86dedd8864f809e689..1d39e0fd2802f230014e79c07322392872ce5e66 100644 --- a/res/res_pjsip/pjsip_distributor.c +++ b/res/res_pjsip/pjsip_distributor.c @@ -246,6 +246,8 @@ static pjsip_module endpoint_mod = { .on_rx_request = endpoint_lookup, }; +#define SIP_MAX_QUEUE 500L + static pj_bool_t distributor(pjsip_rx_data *rdata) { pjsip_dialog *dlg = find_dialog(rdata); @@ -280,7 +282,17 @@ static pj_bool_t distributor(pjsip_rx_data *rdata) clone->endpt_info.mod_data[endpoint_mod.id] = ao2_bump(dist->endpoint); } - ast_sip_push_task(serializer, distribute, clone); + if (ast_sip_threadpool_queue_size() > SIP_MAX_QUEUE) { + /* When the threadpool is backed up this much, there is a good chance that we have encountered + * some sort of terrible condition and don't need to be adding more work to the threadpool. + * It's in our best interest to send back a 503 response and be done with it. + */ + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL); + ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]); + pjsip_rx_data_free_cloned(clone); + } else { + ast_sip_push_task(serializer, distribute, clone); + } end: if (dlg) {