From c2adeb9dc2e4fec349f149c3d90693694da86027 Mon Sep 17 00:00:00 2001 From: George Joseph <gjoseph@digium.com> Date: Fri, 15 Feb 2019 11:53:50 -0700 Subject: [PATCH] taskprocessor: Enable subsystems and overload by subsystem To prevent one subsystem's taskprocessors from causing others to stall, new capabilities have been added to taskprocessors. * Any taskprocessor name that has a '/' will have the part before the '/' saved as its "subsystem". Examples: "sorcery/acl-0000006a" and "sorcery/aor-00000019" will be grouped to subsystem "sorcery". "pjsip/distributor-00000025" and "pjsip/distributor-00000026" will bn grouped to subsystem "pjsip". Taskprocessors with no '/' have an empty subsystem. * When a taskprocessor enters high-water alert status and it has a non-empty subsystem, the subsystem alert count will be incremented. * When a taskprocessor leaves high-water alert status and it has a non-empty subsystem, the subsystem alert count will be decremented. * A new api ast_taskprocessor_get_subsystem_alert() has been added that returns the number of taskprocessors in alert for the subsystem. * A new CLI command "core show taskprocessor alerted subsystems" has been added. * A new unit test was addded. REMINDER: The taskprocessor code itself doesn't take any action based on high-water alerts or overloading. It's up to taskprocessor users to check and take action themselves. Currently only the pjsip distributor does this. * A new pjsip/global option "taskprocessor_overload_trigger" has been added that allows the user to select the trigger mechanism the distributor uses to pause accepting new requests. "none": Don't pause on any overload condition. "global": Pause on ANY taskprocessor overload (the default and current behavior) "pjsip_only": Pause only on pjsip taskprocessor overloads. * The core pjsip pool was renamed from "SIP" to "pjsip" so it can be properly grouped into the "pjsip" subsystem. * stasis taskprocessor names were changed to "stasis" as the subsystem. * Sorcery core taskprocessor names were changed to "sorcery" to match the object taskprocessors. Change-Id: I8c19068bb2fc26610a9f0b8624bdf577a04fcd56 --- CHANGES | 9 + configs/samples/pjsip.conf.sample | 11 + ...b8695b66_taskprocessor_overload_trigger.py | 42 ++++ include/asterisk/taskprocessor.h | 13 ++ main/sorcery.c | 2 +- main/stasis.c | 4 +- main/taskprocessor.c | 201 +++++++++++++++++- main/threadpool.c | 7 +- res/res_pjsip.c | 22 +- res/res_pjsip/config_global.c | 58 +++++ res/res_pjsip/include/res_pjsip_private.h | 10 + res/res_pjsip/pjsip_distributor.c | 8 +- tests/test_taskprocessor.c | 146 +++++++++++++ 13 files changed, 523 insertions(+), 10 deletions(-) create mode 100644 contrib/ast-db-manage/config/versions/f3c0b8695b66_taskprocessor_overload_trigger.py diff --git a/CHANGES b/CHANGES index 78e019efd7..7ae60000a9 100644 --- a/CHANGES +++ b/CHANGES @@ -83,6 +83,15 @@ ARI types defined in the "disallowed" list are not sent to the application. Note that if a type is specified in both lists "disallowed" takes precedence. +res_pjsip +------------------ + * A new configuration parameter "taskprocessor_overload_trigger" has been + added to the pjsip.conf "globals" section. The distributor currently stops + accepting new requests when any taskprocessor overload is triggered. The + new option allows you to completely disable overload detection (NOT + RECOMMENDED), keep the current behavior, or trigger only on pjsip + taskprocessor overloads. + ------------------------------------------------------------------------------ --- Functionality changes from Asterisk 16.1.0 to Asterisk 16.2.0 ------------ ------------------------------------------------------------------------------ diff --git a/configs/samples/pjsip.conf.sample b/configs/samples/pjsip.conf.sample index 0ed01f0905..29f53a5e51 100644 --- a/configs/samples/pjsip.conf.sample +++ b/configs/samples/pjsip.conf.sample @@ -1149,6 +1149,17 @@ ; event when a device refreshes its registration ; (default: "no") +;taskprocessor_overload_trigger=global + ; Set the trigger the distributor will use to detect + ; taskprocessor overloads. When triggered, the distributor + ; will not accept any new requests until the overload has + ; cleared. + : "global": (default) Any taskprocessor overload will trigger. + ; "pjsip_only": Only pjsip taskprocessor overloads will trigger. + ; "none": No overload detection will be performed. + ; WARNING: The "none" and "pjsip_only" options should be used + ; with extreme caution and only to mitigate specific issues. + ; Under certain conditions they could make things worse. ; MODULE PROVIDING BELOW SECTION(S): res_pjsip_acl ;==========================ACL SECTION OPTIONS========================= diff --git a/contrib/ast-db-manage/config/versions/f3c0b8695b66_taskprocessor_overload_trigger.py b/contrib/ast-db-manage/config/versions/f3c0b8695b66_taskprocessor_overload_trigger.py new file mode 100644 index 0000000000..6a5b9b230b --- /dev/null +++ b/contrib/ast-db-manage/config/versions/f3c0b8695b66_taskprocessor_overload_trigger.py @@ -0,0 +1,42 @@ +"""taskprocessor_overload_trigger + +Revision ID: f3c0b8695b66 +Revises: 0838f8db6a61 +Create Date: 2019-02-15 15:03:50.106790 + +""" + +# revision identifiers, used by Alembic. +revision = 'f3c0b8695b66' +down_revision = '0838f8db6a61' + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects.postgresql import ENUM + +PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_NAME = 'pjsip_taskprocessor_overload_trigger_values' +PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_VALUES = ['none', 'global', 'pjsip_only'] + +def upgrade(): + context = op.get_context() + + if context.bind.dialect.name == 'postgresql': + enum = ENUM(*PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_VALUES, + name=PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_NAME) + enum.create(op.get_bind(), checkfirst=False) + + op.add_column('ps_globals', + sa.Column('taskprocessor_overload_trigger', + sa.Enum(*PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_VALUES, + name=PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_NAME, + create_type=False))) + +def downgrade(): + if op.get_context().bind.dialect.name == 'mssql': + op.drop_constraint('ck_ps_globals_taskprocessor_overload_trigger_pjsip_taskprocessor_overload_trigger_values', 'ps_globals') + op.drop_column('ps_globals', 'taskprocessor_overload_trigger') + + if context.bind.dialect.name == 'postgresql': + enum = ENUM(*PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_VALUES, + name=PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_NAME) + enum.drop(op.get_bind(), checkfirst=False) diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h index f74989a3c5..5278595c48 100644 --- a/include/asterisk/taskprocessor.h +++ b/include/asterisk/taskprocessor.h @@ -341,6 +341,19 @@ long ast_taskprocessor_size(struct ast_taskprocessor *tps); */ unsigned int ast_taskprocessor_alert_get(void); + +/*! + * \brief Get the current taskprocessor high water alert count by sybsystem. + * \since 13.26.0 + * \since 16.3.0 + * + * \param subsystem The subsystem name + * + * \retval 0 if no taskprocessors are in high water alert. + * \retval non-zero if some task processors are in high water alert. + */ +unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem); + /*! * \brief Set the high and low alert water marks of the given taskprocessor queue. * \since 13.10.0 diff --git a/main/sorcery.c b/main/sorcery.c index beaad21ce6..8e14881998 100644 --- a/main/sorcery.c +++ b/main/sorcery.c @@ -380,7 +380,7 @@ int ast_sorcery_init(void) }; ast_assert(wizards == NULL); - threadpool = ast_threadpool_create("Sorcery", NULL, &options); + threadpool = ast_threadpool_create("sorcery", NULL, &options); if (!threadpool) { return -1; } diff --git a/main/stasis.c b/main/stasis.c index f05f5ffa5b..204e7c86f6 100644 --- a/main/stasis.c +++ b/main/stasis.c @@ -677,7 +677,7 @@ struct stasis_subscription *internal_stasis_subscribe( char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1]; /* Create name with seq number appended. */ - ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sub%c:%s", + ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "stasis/%c:%s", use_thread_pool ? 'p' : 'm', stasis_topic_name(topic)); @@ -2593,7 +2593,7 @@ int stasis_init(void) threadpool_opts.auto_increment = 1; threadpool_opts.max_size = cfg->threadpool_options->max_size; threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec; - pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts); + pool = ast_threadpool_create("stasis", NULL, &threadpool_opts); ao2_ref(cfg, -1); if (!pool) { ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n"); diff --git a/main/taskprocessor.c b/main/taskprocessor.c index 30aeddbd82..9ebbf390e5 100644 --- a/main/taskprocessor.c +++ b/main/taskprocessor.c @@ -89,7 +89,11 @@ struct ast_taskprocessor { unsigned int high_water_alert:1; /*! Indicates if the taskprocessor is currently suspended */ unsigned int suspended:1; - /*! \brief Friendly name of the taskprocessor */ + /*! \brief Anything before the first '/' in the name (if there is one) */ + char *subsystem; + /*! \brief Friendly name of the taskprocessor. + * Subsystem is appended after the name's NULL terminator. + */ char name[0]; }; @@ -112,6 +116,16 @@ struct ast_taskprocessor_listener { void *user_data; }; +/*! + * Keep track of which subsystems are in alert + * and how many of their taskprocessors are overloaded. + */ +struct subsystem_alert { + unsigned int alert_count; + char subsystem[0]; +}; +static AST_VECTOR_RW(subsystem_alert_vector, struct subsystem_alert *) overloaded_subsystems; + #ifdef LOW_MEMORY #define TPS_MAX_BUCKETS 61 #else @@ -138,10 +152,12 @@ static int tps_ping_handler(void *datap); static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a); static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a); +static char *cli_subsystem_alert_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a); static struct ast_cli_entry taskprocessor_clis[] = { AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"), AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"), + AST_CLI_DEFINE(cli_subsystem_alert_report, "List task processor subsystems in alert"), }; struct default_taskprocessor_listener_pvt { @@ -271,6 +287,8 @@ static const struct ast_taskprocessor_listener_callbacks default_listener_callba static void tps_shutdown(void) { ast_cli_unregister_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis)); + AST_VECTOR_CALLBACK_VOID(&overloaded_subsystems, ast_free); + AST_VECTOR_RW_FREE(&overloaded_subsystems); ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown"); tps_singletons = NULL; } @@ -285,6 +303,12 @@ int ast_tps_init(void) return -1; } + if (AST_VECTOR_RW_INIT(&overloaded_subsystems, 10)) { + ao2_ref(tps_singletons, -1); + ast_log(LOG_ERROR, "taskprocessor subsystems vector failed to initialize!\n"); + return -1; + } + ast_cond_init(&cli_ping_cond, NULL); ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis)); @@ -548,6 +572,157 @@ static int tps_cmp_cb(void *obj, void *arg, int flags) return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0; } +static int subsystem_match(struct subsystem_alert *alert, const char *subsystem) +{ + return !strcmp(alert->subsystem, subsystem); +} + +static int subsystem_cmp(struct subsystem_alert *a, struct subsystem_alert *b) +{ + return strcmp(a->subsystem, b->subsystem); +} + +unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem) +{ + struct subsystem_alert *alert; + unsigned int count = 0; + int idx; + + AST_VECTOR_RW_RDLOCK(&overloaded_subsystems); + idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match); + if (idx >= 0) { + alert = AST_VECTOR_GET(&overloaded_subsystems, idx); + count = alert->alert_count; + } + AST_VECTOR_RW_UNLOCK(&overloaded_subsystems); + + return count; +} + +static void subsystem_alert_increment(const char *subsystem) +{ + struct subsystem_alert *alert; + int idx; + + if (ast_strlen_zero(subsystem)) { + return; + } + + AST_VECTOR_RW_WRLOCK(&overloaded_subsystems); + idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match); + if (idx >= 0) { + alert = AST_VECTOR_GET(&overloaded_subsystems, idx); + alert->alert_count++; + AST_VECTOR_RW_UNLOCK(&overloaded_subsystems); + return; + } + + alert = ast_malloc(sizeof(*alert) + strlen(subsystem) + 1); + if (!alert) { + AST_VECTOR_RW_UNLOCK(&overloaded_subsystems); + return; + } + alert->alert_count = 1; + strcpy(alert->subsystem, subsystem); /* Safe */ + + if (AST_VECTOR_APPEND(&overloaded_subsystems, alert)) { + ast_free(alert); + } + AST_VECTOR_RW_UNLOCK(&overloaded_subsystems); +} + +static void subsystem_alert_decrement(const char *subsystem) +{ + struct subsystem_alert *alert; + int idx; + + if (ast_strlen_zero(subsystem)) { + return; + } + + AST_VECTOR_RW_WRLOCK(&overloaded_subsystems); + idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match); + if (idx < 0) { + ast_log(LOG_ERROR, + "Can't decrement alert count for subsystem '%s' as it wasn't in alert\n", subsystem); + AST_VECTOR_RW_UNLOCK(&overloaded_subsystems); + return; + } + alert = AST_VECTOR_GET(&overloaded_subsystems, idx); + + alert->alert_count--; + if (alert->alert_count <= 0) { + AST_VECTOR_REMOVE(&overloaded_subsystems, idx, 0); + ast_free(alert); + } + + AST_VECTOR_RW_UNLOCK(&overloaded_subsystems); +} + +static void subsystem_copy(struct subsystem_alert *alert, + struct subsystem_alert_vector *vector) +{ + struct subsystem_alert *alert_copy; + alert_copy = ast_malloc(sizeof(*alert_copy) + strlen(alert->subsystem) + 1); + if (!alert_copy) { + return; + } + alert_copy->alert_count = alert->alert_count; + strcpy(alert_copy->subsystem, alert->subsystem); /* Safe */ + if (AST_VECTOR_ADD_SORTED(vector, alert_copy, subsystem_cmp)) { + ast_free(alert_copy); + } +} + +static char *cli_subsystem_alert_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) +{ + struct subsystem_alert_vector sorted_subsystems; + int i; + +#define FMT_HEADERS_SUBSYSTEM "%-32s %12s\n" +#define FMT_FIELDS_SUBSYSTEM "%-32s %12u\n" + + switch (cmd) { + case CLI_INIT: + e->command = "core show taskprocessor alerted subsystems"; + e->usage = + "Usage: core show taskprocessor alerted subsystems\n" + " Shows a list of task processor subsystems that are currently alerted\n"; + return NULL; + case CLI_GENERATE: + return NULL; + } + + if (a->argc != e->args) { + return CLI_SHOWUSAGE; + } + + if (AST_VECTOR_INIT(&sorted_subsystems, AST_VECTOR_SIZE(&overloaded_subsystems))) { + return CLI_FAILURE; + } + + AST_VECTOR_RW_RDLOCK(&overloaded_subsystems); + for (i = 0; i < AST_VECTOR_SIZE(&overloaded_subsystems); i++) { + subsystem_copy(AST_VECTOR_GET(&overloaded_subsystems, i), &sorted_subsystems); + } + AST_VECTOR_RW_UNLOCK(&overloaded_subsystems); + + ast_cli(a->fd, "\n" FMT_HEADERS_SUBSYSTEM, "Subsystem", "Alert Count"); + + for (i = 0; i < AST_VECTOR_SIZE(&sorted_subsystems); i++) { + struct subsystem_alert *alert = AST_VECTOR_GET(&sorted_subsystems, i); + ast_cli(a->fd, FMT_FIELDS_SUBSYSTEM, alert->subsystem, alert->alert_count); + } + + ast_cli(a->fd, "\n%lu subsystems\n\n", AST_VECTOR_SIZE(&sorted_subsystems)); + + AST_VECTOR_CALLBACK_VOID(&sorted_subsystems, ast_free); + AST_VECTOR_FREE(&sorted_subsystems); + + return CLI_SUCCESS; +} + + /*! Count of the number of taskprocessors in high water alert. */ static unsigned int tps_alert_count; @@ -577,6 +752,15 @@ static void tps_alert_add(struct ast_taskprocessor *tps, int delta) ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n", tps->name, tps_alert_count ? "triggered" : "cleared"); } + + if (tps->subsystem[0] != '\0') { + if (delta > 0) { + subsystem_alert_increment(tps->subsystem); + } else { + subsystem_alert_decrement(tps->subsystem); + } + } + ast_rwlock_unlock(&tps_alert_lock); } @@ -747,8 +931,17 @@ static void *default_listener_pvt_alloc(void) static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener) { struct ast_taskprocessor *p; + char *subsystem_separator; + size_t subsystem_length = 0; + size_t name_length; + + name_length = strlen(name); + subsystem_separator = strchr(name, '/'); + if (subsystem_separator) { + subsystem_length = subsystem_separator - name; + } - p = ao2_alloc(sizeof(*p) + strlen(name) + 1, tps_taskprocessor_dtor); + p = ao2_alloc(sizeof(*p) + name_length + subsystem_length + 2, tps_taskprocessor_dtor); if (!p) { ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name); return NULL; @@ -758,7 +951,9 @@ static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, stru p->tps_queue_low = (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 9) / 10; p->tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL; - strcpy(p->name, name); /*SAFE*/ + strcpy(p->name, name); /* Safe */ + p->subsystem = p->name + name_length + 1; + ast_copy_string(p->subsystem, name, subsystem_length + 1); ao2_ref(listener, +1); p->listener = listener; diff --git a/main/threadpool.c b/main/threadpool.c index 2ab093663c..56fbb2c33c 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -413,7 +413,7 @@ static struct ast_threadpool *threadpool_alloc(const char *name, const struct as return NULL; } - ast_str_set(&control_tps_name, 0, "%s-control", name); + ast_str_set(&control_tps_name, 0, "%s/pool-control", name); pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT); ast_free(control_tps_name); @@ -919,6 +919,7 @@ struct ast_threadpool *ast_threadpool_create(const char *name, struct ast_taskprocessor *tps; RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup); RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup); + char *fullname; pool = threadpool_alloc(name, options); if (!pool) { @@ -935,7 +936,9 @@ struct ast_threadpool *ast_threadpool_create(const char *name, return NULL; } - tps = ast_taskprocessor_create_with_listener(name, tps_listener); + fullname = ast_alloca(strlen(name) + strlen("/pool") + 1); + sprintf(fullname, "%s/pool", name); /* Safe */ + tps = ast_taskprocessor_create_with_listener(fullname, tps_listener); if (!tps) { return NULL; } diff --git a/res/res_pjsip.c b/res/res_pjsip.c index 4f18010a43..24796fc13a 100644 --- a/res/res_pjsip.c +++ b/res/res_pjsip.c @@ -1908,6 +1908,26 @@ <configOption name="send_contact_status_on_update_registration" default="no"> <synopsis>Enable sending AMI ContactStatus event when a device refreshes its registration.</synopsis> </configOption> + <configOption name="taskprocessor_overload_trigger"> + <synopsis>Trigger scope for taskprocessor overloads</synopsis> + <description><para> + This option specifies the trigger the distributor will use for + detecting taskprocessor overloads. When it detects an overload condition, + the distrubutor will stop accepting new requests until the overload is + cleared. + </para> + <enumlist> + <enum name="global"><para>(default) Any taskprocessor overload will trigger.</para></enum> + <enum name="pjsip_only"><para>Only pjsip taskprocessor overloads will trigger.</para></enum> + <enum name="none"><para>No overload detection will be performed.</para></enum> + </enumlist> + <warning><para> + The "none" and "pjsip_only" options should be used + with extreme caution and only to mitigate specific issues. + Under certain conditions they could make things worse. + </para></warning> + </description> + </configOption> </configObject> </configFile> </configInfo> @@ -5298,7 +5318,7 @@ static int load_module(void) /* The serializer needs threadpool and threadpool needs pjproject to be initialized so it's next */ sip_get_threadpool_options(&options); options.thread_start = sip_thread_start; - sip_threadpool = ast_threadpool_create("SIP", NULL, &options); + sip_threadpool = ast_threadpool_create("pjsip", NULL, &options); if (!sip_threadpool) { goto error; } diff --git a/res/res_pjsip/config_global.c b/res/res_pjsip/config_global.c index 38383c5c2b..8f21e50598 100644 --- a/res/res_pjsip/config_global.c +++ b/res/res_pjsip/config_global.c @@ -51,6 +51,7 @@ #define DEFAULT_IGNORE_URI_USER_OPTIONS 0 #define DEFAULT_USE_CALLERID_CONTACT 0 #define DEFAULT_SEND_CONTACT_STATUS_ON_UPDATE_REGISTRATION 0 +#define DEFAULT_TASKPROCESSOR_OVERLOAD_TRIGGER TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL /*! * \brief Cached global config object @@ -110,6 +111,8 @@ struct global_config { unsigned int use_callerid_contact; /*! Nonzero if need to send AMI ContactStatus event when a contact is updated */ unsigned int send_contact_status_on_update_registration; + /*! Trigger the distributor should use to pause accepting new dialogs */ + enum ast_sip_taskprocessor_overload_trigger overload_trigger; }; static void global_destructor(void *obj) @@ -483,6 +486,58 @@ unsigned int ast_sip_get_send_contact_status_on_update_registration(void) return send_contact_status_on_update_registration; } +enum ast_sip_taskprocessor_overload_trigger ast_sip_get_taskprocessor_overload_trigger(void) +{ + enum ast_sip_taskprocessor_overload_trigger trigger; + struct global_config *cfg; + + cfg = get_global_cfg(); + if (!cfg) { + return DEFAULT_TASKPROCESSOR_OVERLOAD_TRIGGER; + } + + trigger = cfg->overload_trigger; + ao2_ref(cfg, -1); + return trigger; +} + +static int overload_trigger_handler(const struct aco_option *opt, + struct ast_variable *var, void *obj) +{ + struct global_config *cfg = obj; + if (!strcasecmp(var->value, "none")) { + cfg->overload_trigger = TASKPROCESSOR_OVERLOAD_TRIGGER_NONE; + } else if (!strcasecmp(var->value, "global")) { + cfg->overload_trigger = TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL; + } else if (!strcasecmp(var->value, "pjsip_only")) { + cfg->overload_trigger = TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY; + } else { + ast_log(LOG_WARNING, "Unknown overload trigger '%s' specified for %s\n", + var->value, var->name); + return -1; + } + return 0; +} + +static const char *overload_trigger_map[] = { + [TASKPROCESSOR_OVERLOAD_TRIGGER_NONE] = "none", + [TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL] = "global", + [TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY] = "pjsip_only" +}; + +const char *ast_sip_overload_trigger_to_str(enum ast_sip_taskprocessor_overload_trigger trigger) +{ + return ARRAY_IN_BOUNDS(trigger, overload_trigger_map) ? + overload_trigger_map[trigger] : ""; +} + +static int overload_trigger_to_str(const void *obj, const intptr_t *args, char **buf) +{ + const struct global_config *cfg = obj; + *buf = ast_strdup(ast_sip_overload_trigger_to_str(cfg->overload_trigger)); + return 0; +} + /*! * \internal * \brief Observer to set default global object if none exist. @@ -646,6 +701,9 @@ int ast_sip_initialize_sorcery_global(void) ast_sorcery_object_field_register(sorcery, "global", "send_contact_status_on_update_registration", DEFAULT_SEND_CONTACT_STATUS_ON_UPDATE_REGISTRATION ? "yes" : "no", OPT_YESNO_T, 1, FLDSET(struct global_config, send_contact_status_on_update_registration)); + ast_sorcery_object_field_register_custom(sorcery, "global", "taskprocessor_overload_trigger", + overload_trigger_map[DEFAULT_TASKPROCESSOR_OVERLOAD_TRIGGER], + overload_trigger_handler, overload_trigger_to_str, NULL, 0, 0); if (ast_sorcery_instance_observer_add(sorcery, &observer_callbacks_global)) { return -1; diff --git a/res/res_pjsip/include/res_pjsip_private.h b/res/res_pjsip/include/res_pjsip_private.h index 7af5b27bb8..f6333bf3a0 100644 --- a/res/res_pjsip/include/res_pjsip_private.h +++ b/res/res_pjsip/include/res_pjsip_private.h @@ -408,4 +408,14 @@ void ast_sip_destroy_transport_management(void); */ int ast_sip_persistent_endpoint_add_to_regcontext(const char *regcontext); +enum ast_sip_taskprocessor_overload_trigger { + TASKPROCESSOR_OVERLOAD_TRIGGER_NONE = 0, + TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL, + TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY +}; + +enum ast_sip_taskprocessor_overload_trigger ast_sip_get_taskprocessor_overload_trigger(void); + +const char *ast_sip_overload_trigger_to_str(enum ast_sip_taskprocessor_overload_trigger trigger); + #endif /* RES_PJSIP_PRIVATE_H_ */ diff --git a/res/res_pjsip/pjsip_distributor.c b/res/res_pjsip/pjsip_distributor.c index d356e3794b..72ed35b35e 100644 --- a/res/res_pjsip/pjsip_distributor.c +++ b/res/res_pjsip/pjsip_distributor.c @@ -51,6 +51,7 @@ static unsigned int unidentified_count; static unsigned int unidentified_period; static unsigned int unidentified_prune_interval; static int using_auth_username; +static enum ast_sip_taskprocessor_overload_trigger overload_trigger; struct unidentified_request{ struct timeval first_seen; @@ -534,7 +535,10 @@ static pj_bool_t distributor(pjsip_rx_data *rdata) ao2_cleanup(dist); return PJ_TRUE; } else { - if (ast_taskprocessor_alert_get()) { + if ((overload_trigger == TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL && + ast_taskprocessor_alert_get()) + || (overload_trigger == TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY && + ast_taskprocessor_get_subsystem_alert("pjsip"))) { /* * When taskprocessors get backed up, there is a good chance that * we are being overloaded and need to defer adding new work to @@ -1196,6 +1200,8 @@ static void global_loaded(const char *object_type) ast_sip_get_unidentified_request_thresholds(&unidentified_count, &unidentified_period, &unidentified_prune_interval); + overload_trigger = ast_sip_get_taskprocessor_overload_trigger(); + /* Clean out the old task, if any */ ast_sched_clean_by_callback(prune_context, prune_task, clean_task); /* Have to do something with the return value to shut up the stupid compiler. */ diff --git a/tests/test_taskprocessor.c b/tests/test_taskprocessor.c index 642874625c..70cb556384 100644 --- a/tests/test_taskprocessor.c +++ b/tests/test_taskprocessor.c @@ -46,6 +46,8 @@ struct task_data { ast_mutex_t lock; /*! Boolean indicating that the task was run */ int task_complete; + /*! Milliseconds to wait before returning */ + unsigned long wait_time; }; static void task_data_dtor(void *obj) @@ -69,6 +71,7 @@ static struct task_data *task_data_create(void) ast_cond_init(&task_data->cond, NULL); ast_mutex_init(&task_data->lock); task_data->task_complete = 0; + task_data->wait_time = 0; return task_data; } @@ -83,7 +86,11 @@ static struct task_data *task_data_create(void) static int task(void *data) { struct task_data *task_data = data; + SCOPED_MUTEX(lock, &task_data->lock); + if (task_data->wait_time > 0) { + usleep(task_data->wait_time * 1000); + } task_data->task_complete = 1; ast_cond_signal(&task_data->cond); return 0; @@ -165,6 +172,143 @@ AST_TEST_DEFINE(default_taskprocessor) return AST_TEST_PASS; } +/*! + * \brief Baseline test for subsystem alert + */ +AST_TEST_DEFINE(subsystem_alert) +{ + RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference); +#define TEST_DATA_ARRAY_SIZE 10 +#define LOW_WATER_MARK 3 +#define HIGH_WATER_MARK 6 + struct task_data *task_data[(TEST_DATA_ARRAY_SIZE + 1)] = { 0 }; + int res; + int i; + long queue_count; + unsigned int alert_level; + unsigned int subsystem_alert_level; + + switch (cmd) { + case TEST_INIT: + info->name = "subsystem_alert"; + info->category = "/main/taskprocessor/"; + info->summary = "Test of subsystem alerts"; + info->description = + "Ensures alerts are generated properly."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + tps = ast_taskprocessor_get("test_subsystem/test", TPS_REF_DEFAULT); + + if (!tps) { + ast_test_status_update(test, "Unable to create test taskprocessor\n"); + return AST_TEST_FAIL; + } + + ast_taskprocessor_alert_set_levels(tps, LOW_WATER_MARK, HIGH_WATER_MARK); + ast_taskprocessor_suspend(tps); + + for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) { + task_data[i] = task_data_create(); + if (!task_data[i]) { + ast_test_status_update(test, "Unable to create task_data\n"); + res = -1; + goto data_cleanup; + } + task_data[i]->wait_time = 500; + + ast_test_status_update(test, "Pushing task %d\n", i); + if (ast_taskprocessor_push(tps, task, task_data[i])) { + ast_test_status_update(test, "Failed to queue task\n"); + res = -1; + goto data_cleanup; + } + + queue_count = ast_taskprocessor_size(tps); + alert_level = ast_taskprocessor_alert_get(); + subsystem_alert_level = ast_taskprocessor_get_subsystem_alert("test_subsystem"); + + if (queue_count == HIGH_WATER_MARK) { + if (subsystem_alert_level) { + ast_test_status_update(test, "Subsystem alert triggered correctly at %ld\n", queue_count); + } + if (alert_level) { + ast_test_status_update(test, "Global alert triggered correctly at %ld\n", queue_count); + } + } else if (queue_count < HIGH_WATER_MARK) { + if (subsystem_alert_level > 0) { + ast_test_status_update(test, "Subsystem alert triggered unexpectedly at %ld\n", queue_count); + res = -1; + } + if (alert_level > 0) { + ast_test_status_update(test, "Global alert triggered unexpectedly at %ld\n", queue_count); + res = -1; + } + } else { + if (subsystem_alert_level == 0) { + ast_test_status_update(test, "Subsystem alert failed to trigger at %ld\n", queue_count); + res = -1; + } + if (alert_level == 0) { + ast_test_status_update(test, "Global alert failed to trigger at %ld\n", queue_count); + res = -1; + } + } + } + + ast_taskprocessor_unsuspend(tps); + + for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) { + ast_test_status_update(test, "Waiting on task %d\n", i); + if (task_wait(task_data[i])) { + ast_test_status_update(test, "Queued task '%d' did not execute!\n", i); + res = -1; + goto data_cleanup; + } + + queue_count = ast_taskprocessor_size(tps); + alert_level = ast_taskprocessor_alert_get(); + subsystem_alert_level = ast_taskprocessor_get_subsystem_alert("test_subsystem"); + + if (queue_count == LOW_WATER_MARK) { + if (!subsystem_alert_level) { + ast_test_status_update(test, "Subsystem alert cleared correctly at %ld\n", queue_count); + } + if (!alert_level) { + ast_test_status_update(test, "Global alert cleared correctly at %ld\n", queue_count); + } + } else if (queue_count > LOW_WATER_MARK) { + if (subsystem_alert_level == 0) { + ast_test_status_update(test, "Subsystem alert cleared unexpectedly at %ld\n", queue_count); + res = -1; + } + if (alert_level == 0) { + ast_test_status_update(test, "Global alert cleared unexpectedly at %ld\n", queue_count); + res = -1; + } + } else { + if (subsystem_alert_level > 0) { + ast_test_status_update(test, "Subsystem alert failed to clear at %ld\n", queue_count); + res = -1; + } + if (alert_level > 0) { + ast_test_status_update(test, "Global alert failed to clear at %ld\n", queue_count); + res = -1; + } + } + + } + +data_cleanup: + for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) { + ao2_cleanup(task_data[i]); + } + + return res ? AST_TEST_FAIL : AST_TEST_PASS; +} + #define NUM_TASKS 20000 /*! @@ -749,6 +893,7 @@ static int unload_module(void) { ast_test_unregister(default_taskprocessor); ast_test_unregister(default_taskprocessor_load); + ast_test_unregister(subsystem_alert); ast_test_unregister(taskprocessor_listener); ast_test_unregister(taskprocessor_shutdown); ast_test_unregister(taskprocessor_push_local); @@ -759,6 +904,7 @@ static int load_module(void) { ast_test_register(default_taskprocessor); ast_test_register(default_taskprocessor_load); + ast_test_register(subsystem_alert); ast_test_register(taskprocessor_listener); ast_test_register(taskprocessor_shutdown); ast_test_register(taskprocessor_push_local); -- GitLab