diff --git a/include/asterisk/serializer.h b/include/asterisk/serializer.h new file mode 100644 index 0000000000000000000000000000000000000000..1a1eb83bb3eaf76bf4216fd14cfb3dae06840c6f --- /dev/null +++ b/include/asterisk/serializer.h @@ -0,0 +1,85 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2019, Sangoma Technologies Corporation + * + * Kevin Harwell <kharwell@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +#ifndef _AST_SERIALIZER_H +#define _AST_SERIALIZER_H + +struct ast_threadpool; + +/*! + * Maintains a named pool of thread pooled taskprocessors. Also if configured + * a shutdown group can be enabled that will ensure all serializers have + * completed any assigned task before destruction. + */ +struct ast_serializer_pool; + +/*! + * \brief Destroy the serializer pool. + * + * Attempt to destroy the serializer pool. If a shutdown group has been enabled, + * and times out waiting for threads to complete, then this function will return + * the number of remaining threads, and the pool will not be destroyed. + * + * \param pool The pool to destroy + */ +int ast_serializer_pool_destroy(struct ast_serializer_pool *pool); + +/*! + * \brief Create a serializer pool. + * + * Create a serializer pool with an optional shutdown group. If a timeout greater + * than -1 is specified then a shutdown group is enabled on the pool. + * + * \param name The base name for the pool, and used when building taskprocessor(s) + * \param size The size of the pool + * \param threadpool The backing threadpool to use + * \param timeout The timeout used if using a shutdown group (-1 = disabled) + * + * \retval A newly allocated serializer pool object, or NULL on error + */ +struct ast_serializer_pool *ast_serializer_pool_create(const char *name, + unsigned int size, struct ast_threadpool *threadpool, int timeout); + +/*! + * \brief Retrieve the base name of the serializer pool. + * + * \param pool The pool object + * + * \retval The base name given to the pool + */ +const char *ast_serializer_pool_name(const struct ast_serializer_pool *pool); + +/*! + * \brief Retrieve a serializer from the pool. + * + * \param pool The pool object + * + * \retval A serializer/taskprocessor + */ +struct ast_taskprocessor *ast_serializer_pool_get(struct ast_serializer_pool *pool); + +/*! + * \brief Set taskprocessor alert levels for the serializers in the pool. + * + * \param pool The pool to destroy + * + * \retval 0 on success, or -1 on error. + */ +int ast_serializer_pool_set_alerts(struct ast_serializer_pool *pool, long high, long low); + +#endif /* _AST_SERIALIZER_H */ diff --git a/include/asterisk/taskprocessor.h b/include/asterisk/taskprocessor.h index 2f49e472797cd7aff82a0d2c88e4eeecdb3bb300..5145565c6d7031a065a08253095b51a853393408 100644 --- a/include/asterisk/taskprocessor.h +++ b/include/asterisk/taskprocessor.h @@ -304,6 +304,15 @@ int ast_taskprocessor_is_task(struct ast_taskprocessor *tps); */ unsigned int ast_taskprocessor_seq_num(void); +/*! + * \brief Append the next sequence number to the given string, and copy into the buffer. + * + * \param buf Where to copy the appended taskprocessor name. + * \param size How large is buf including null terminator. + * \param name A name to append the sequence number to. + */ +void ast_taskprocessor_name_append(char *buf, unsigned int size, const char *name); + /*! * \brief Build a taskprocessor name with a sequence number on the end. * \since 13.8.0 diff --git a/main/serializer.c b/main/serializer.c new file mode 100644 index 0000000000000000000000000000000000000000..280ada03b8dc98da50cdd760d22df072bc53779d --- /dev/null +++ b/main/serializer.c @@ -0,0 +1,189 @@ +/* + * Asterisk -- An open source telephony toolkit. + * + * Copyright (C) 2019, Sangoma Technologies Corporation + * + * Kevin Harwell <kharwell@digium.com> + * + * See http://www.asterisk.org for more information about + * the Asterisk project. Please do not directly contact + * any of the maintainers of this project for assistance; + * the project provides a web site, mailing lists and IRC + * channels for your use. + * + * This program is free software, distributed under the terms of + * the GNU General Public License Version 2. See the LICENSE file + * at the top of the source tree. + */ + +#include "asterisk.h" + +#include "asterisk/astobj2.h" +#include "asterisk/serializer.h" +#include "asterisk/taskprocessor.h" +#include "asterisk/threadpool.h" +#include "asterisk/utils.h" +#include "asterisk/vector.h" + +struct ast_serializer_pool { + /*! Shutdown group to monitor serializers. */ + struct ast_serializer_shutdown_group *shutdown_group; + /*! Time to wait if using a shutdown group. */ + int shutdown_group_timeout; + /*! A pool of taskprocessor(s) */ + AST_VECTOR_RW(, struct ast_taskprocessor *) serializers; + /*! Base name for the pool */ + char name[]; +}; + +int ast_serializer_pool_destroy(struct ast_serializer_pool *pool) +{ + if (!pool) { + return 0; + } + + /* Clear out the serializers */ + AST_VECTOR_RW_WRLOCK(&pool->serializers); + AST_VECTOR_RESET(&pool->serializers, ast_taskprocessor_unreference); + AST_VECTOR_RW_UNLOCK(&pool->serializers); + + /* If using a shutdown group then wait for all threads to complete */ + if (pool->shutdown_group) { + int remaining; + + ast_debug(3, "Waiting on serializers before destroying pool '%s'\n", pool->name); + + remaining = ast_serializer_shutdown_group_join( + pool->shutdown_group, pool->shutdown_group_timeout); + + if (remaining) { + /* If we've timed out don't fully cleanup yet */ + ast_log(LOG_WARNING, "'%s' serializer pool destruction timeout. " + "'%d' dependencies still processing.\n", pool->name, remaining); + return remaining; + } + + ao2_ref(pool->shutdown_group, -1); + pool->shutdown_group = NULL; + } + + AST_VECTOR_RW_FREE(&pool->serializers); + ast_free(pool); + + return 0; +} + +struct ast_serializer_pool *ast_serializer_pool_create(const char *name, + unsigned int size, struct ast_threadpool *threadpool, int timeout) +{ + struct ast_serializer_pool *pool; + char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1]; + size_t idx; + + ast_assert(size > 0); + + pool = ast_malloc(sizeof(*pool) + strlen(name) + 1); + if (!pool) { + return NULL; + } + + strcpy(pool->name, name); /* safe */ + + pool->shutdown_group_timeout = timeout; + pool->shutdown_group = timeout > -1 ? ast_serializer_shutdown_group_alloc() : NULL; + + AST_VECTOR_RW_INIT(&pool->serializers, size); + + for (idx = 0; idx < size; ++idx) { + struct ast_taskprocessor *tps; + + /* Create name with seq number appended. */ + ast_taskprocessor_name_append(tps_name, sizeof(tps_name), name); + + tps = ast_threadpool_serializer_group(tps_name, threadpool, pool->shutdown_group); + if (!tps) { + ast_serializer_pool_destroy(pool); + ast_log(LOG_ERROR, "Pool create: unable to create named serializer '%s'\n", + tps_name); + return NULL; + } + + if (AST_VECTOR_APPEND(&pool->serializers, tps)) { + ast_serializer_pool_destroy(pool); + ast_log(LOG_ERROR, "Pool create: unable to append named serializer '%s'\n", + tps_name); + return NULL; + } + } + + return pool; +} + +const char *ast_serializer_pool_name(const struct ast_serializer_pool *pool) +{ + return pool->name; +} + +struct ast_taskprocessor *ast_serializer_pool_get(struct ast_serializer_pool *pool) +{ + struct ast_taskprocessor *res; + size_t idx; + + if (!pool) { + return NULL; + } + + AST_VECTOR_RW_RDLOCK(&pool->serializers); + if (AST_VECTOR_SIZE(&pool->serializers) == 0) { + AST_VECTOR_RW_UNLOCK(&pool->serializers); + return NULL; + } + + res = AST_VECTOR_GET(&pool->serializers, 0); + + /* Choose the taskprocessor with the smallest queue */ + for (idx = 1; idx < AST_VECTOR_SIZE(&pool->serializers); ++idx) { + struct ast_taskprocessor *cur = AST_VECTOR_GET(&pool->serializers, idx); + if (ast_taskprocessor_size(cur) < ast_taskprocessor_size(res)) { + res = cur; + } + } + + AST_VECTOR_RW_UNLOCK(&pool->serializers); + return res; +} + +int ast_serializer_pool_set_alerts(struct ast_serializer_pool *pool, long high, long low) +{ + size_t idx; + long tps_queue_high; + long tps_queue_low; + + if (!pool) { + return 0; + } + + tps_queue_high = high; + if (tps_queue_high <= 0) { + ast_log(AST_LOG_WARNING, "Invalid '%s-*' taskprocessor high water alert " + "trigger level '%ld'\n", pool->name, tps_queue_high); + tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL; + } + + tps_queue_low = low; + if (tps_queue_low < -1 || tps_queue_high < tps_queue_low) { + ast_log(AST_LOG_WARNING, "Invalid '%s-*' taskprocessor low water clear alert " + "level '%ld'\n", pool->name, tps_queue_low); + tps_queue_low = -1; + } + + for (idx = 1; idx < AST_VECTOR_SIZE(&pool->serializers); ++idx) { + struct ast_taskprocessor *cur = AST_VECTOR_GET(&pool->serializers, idx); + if (ast_taskprocessor_alert_set_levels(cur, tps_queue_low, tps_queue_high)) { + ast_log(AST_LOG_WARNING, "Failed to set alert levels for serializer '%s'.\n", + ast_taskprocessor_name(cur)); + } + } + + return 0; +} diff --git a/main/taskprocessor.c b/main/taskprocessor.c index 47d75d34fa04e2bd68629cf06e875e40b0c587b2..52cc5e0e0cf4380c66cf169cc9b202233848bb69 100644 --- a/main/taskprocessor.c +++ b/main/taskprocessor.c @@ -1280,11 +1280,22 @@ unsigned int ast_taskprocessor_seq_num(void) return (unsigned int) ast_atomic_fetchadd_int(&seq_num, +1); } +#define SEQ_STR_SIZE (1 + 8 + 1) /* Dash plus 8 hex digits plus null terminator */ + +void ast_taskprocessor_name_append(char *buf, unsigned int size, const char *name) +{ + int final_size = strlen(name) + SEQ_STR_SIZE; + + ast_assert(buf != NULL && name != NULL); + ast_assert(final_size <= size); + + snprintf(buf, final_size, "%s-%08x", name, ast_taskprocessor_seq_num()); +} + void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format, ...) { va_list ap; int user_size; -#define SEQ_STR_SIZE (1 + 8 + 1) /* Dash plus 8 hex digits plus null terminator */ ast_assert(buf != NULL); ast_assert(SEQ_STR_SIZE <= size); diff --git a/tests/test_taskprocessor.c b/tests/test_taskprocessor.c index 70cb5563848911e999331d8c4c6e85f8fffda400..031151cf22478de9c03a9ac51ca5aa56ccbb6698 100644 --- a/tests/test_taskprocessor.c +++ b/tests/test_taskprocessor.c @@ -35,6 +35,8 @@ #include "asterisk/taskprocessor.h" #include "asterisk/module.h" #include "asterisk/astobj2.h" +#include "asterisk/serializer.h" +#include "asterisk/threadpool.h" /*! * \brief userdata associated with baseline taskprocessor test @@ -889,6 +891,78 @@ AST_TEST_DEFINE(taskprocessor_push_local) return AST_TEST_PASS; } +/*! + * \brief Baseline test for a serializer pool + * + * This test ensures that when a task is added to a taskprocessor that + * has been allocated with a default listener that the task gets executed + * as expected + */ +AST_TEST_DEFINE(serializer_pool) +{ + RAII_VAR(struct ast_threadpool *, threadpool, NULL, ast_threadpool_shutdown); + RAII_VAR(struct ast_serializer_pool *, serializer_pool, NULL, ast_serializer_pool_destroy); + RAII_VAR(struct task_data *, task_data, NULL, ao2_cleanup); + struct ast_threadpool_options options = { + .version = AST_THREADPOOL_OPTIONS_VERSION, + .idle_timeout = 0, + .auto_increment = 0, + .initial_size = 1, + .max_size = 0, + }; + /* struct ast_taskprocessor *tps; */ + + switch (cmd) { + case TEST_INIT: + info->name = "serializer_pool"; + info->category = "/main/taskprocessor/"; + info->summary = "Test using a serializer pool"; + info->description = + "Ensures that a queued task gets executed."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } + + ast_test_validate(test, threadpool = ast_threadpool_create("test", NULL, &options)); + ast_test_validate(test, serializer_pool = ast_serializer_pool_create( + "test/test", 5, threadpool, 2)); /* 2 second shutdown group time out */ + ast_test_validate(test, !strcmp(ast_serializer_pool_name(serializer_pool), "test/test")); + ast_test_validate(test, !ast_serializer_pool_set_alerts(serializer_pool, 5, 0)); + ast_test_validate(test, task_data = task_data_create()); + + task_data->wait_time = 4000; /* task takes 4 seconds */ + ast_test_validate(test, !ast_taskprocessor_push( + ast_serializer_pool_get(serializer_pool), task, task_data)); + + if (!ast_serializer_pool_destroy(serializer_pool)) { + ast_test_status_update(test, "Unexpected pool destruction!\n"); + /* + * The pool should have timed out, so if it destruction reports success + * we need to fail. + */ + serializer_pool = NULL; + return AST_TEST_FAIL; + } + + ast_test_validate(test, !task_wait(task_data)); + + /* The first attempt should have failed. Second try should destroy successfully */ + if (ast_serializer_pool_destroy(serializer_pool)) { + ast_test_status_update(test, "Unable to destroy serializer pool in allotted time!\n"); + /* + * If this fails we'll try again on return to hopefully avoid a memory leak. + * If it again times out a third time, well not much we can do. + */ + return AST_TEST_FAIL; + } + + /* Test passed, so set pool to NULL to avoid "re-running" destroy */ + serializer_pool = NULL; + + return AST_TEST_PASS; +} + static int unload_module(void) { ast_test_unregister(default_taskprocessor); @@ -897,6 +971,7 @@ static int unload_module(void) ast_test_unregister(taskprocessor_listener); ast_test_unregister(taskprocessor_shutdown); ast_test_unregister(taskprocessor_push_local); + ast_test_unregister(serializer_pool); return 0; } @@ -908,6 +983,7 @@ static int load_module(void) ast_test_register(taskprocessor_listener); ast_test_register(taskprocessor_shutdown); ast_test_register(taskprocessor_push_local); + ast_test_register(serializer_pool); return AST_MODULE_LOAD_SUCCESS; }