Skip to content
Snippets Groups Projects
  • Kevin Harwell's avatar
    c0efe19c
    serializer: move/add asterisk serializer pool functionality · c0efe19c
    Kevin Harwell authored
    Serializer pools have previously existed in Asterisk. However, for the most
    part the code has been duplicated across modules. This patch abstracts the
    code into an 'ast_serializer_pool' object. The code is also now centralized
    in serializer.c/h.
    
    In addition serializer pools can now optionally be monitored by a shutdown
    group. This will prevent the pool from being destroyed until all serializers
    have completed.
    
    Change-Id: Ib1e906144b90ffd4d5ed9826f0b719ca9c6d2971
    c0efe19c
    History
    serializer: move/add asterisk serializer pool functionality
    Kevin Harwell authored
    Serializer pools have previously existed in Asterisk. However, for the most
    part the code has been duplicated across modules. This patch abstracts the
    code into an 'ast_serializer_pool' object. The code is also now centralized
    in serializer.c/h.
    
    In addition serializer pools can now optionally be monitored by a shutdown
    group. This will prevent the pool from being destroyed until all serializers
    have completed.
    
    Change-Id: Ib1e906144b90ffd4d5ed9826f0b719ca9c6d2971
serializer.c 5.07 KiB
/*
 * Asterisk -- An open source telephony toolkit.
 *
 * Copyright (C) 2019, Sangoma Technologies Corporation
 *
 * Kevin Harwell <kharwell@digium.com>
 *
 * See http://www.asterisk.org for more information about
 * the Asterisk project. Please do not directly contact
 * any of the maintainers of this project for assistance;
 * the project provides a web site, mailing lists and IRC
 * channels for your use.
 *
 * This program is free software, distributed under the terms of
 * the GNU General Public License Version 2. See the LICENSE file
 * at the top of the source tree.
 */

#include "asterisk.h"

#include "asterisk/astobj2.h"
#include "asterisk/serializer.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/threadpool.h"
#include "asterisk/utils.h"
#include "asterisk/vector.h"

/*!
 * \brief A named collection of serializers (taskprocessors), optionally
 * associated with a shutdown group.
 *
 * The object is allocated as a single chunk: the structure itself followed by
 * the storage for \c name.
 */
struct ast_serializer_pool {
	/*!
	 * Shutdown group to monitor serializers. NULL when the pool was created
	 * with a timeout below zero.
	 */
	struct ast_serializer_shutdown_group *shutdown_group;
	/*!
	 * Time to wait if using a shutdown group. Handed to
	 * ast_serializer_shutdown_group_join() when the pool is destroyed.
	 */
	int shutdown_group_timeout;
	/*! A pool of taskprocessor(s), stored in a vector that carries its own read/write lock */
	AST_VECTOR_RW(, struct ast_taskprocessor *) serializers;
	/*! Base name for the pool (flexible array member, must stay last) */
	char name[];
};

/*!
 * \brief Tear down a serializer pool.
 *
 * Every serializer held by the pool is unreferenced first. When the pool has a
 * shutdown group, the call then joins that group using the pool's stored
 * timeout. If the join reports anything still outstanding, the pool object is
 * deliberately left allocated and that count is handed back to the caller.
 *
 * \param pool The pool to destroy (NULL is accepted and ignored)
 *
 * \retval 0 the pool was NULL, or it has been fully released
 * \retval non-zero the value reported by ast_serializer_shutdown_group_join();
 *         the pool itself has not been freed
 */
int ast_serializer_pool_destroy(struct ast_serializer_pool *pool)
{
	int pending;

	if (pool == NULL) {
		return 0;
	}

	/* Release the pool's reference on each serializer under the write lock */
	AST_VECTOR_RW_WRLOCK(&pool->serializers);
	AST_VECTOR_RESET(&pool->serializers, ast_taskprocessor_unreference);
	AST_VECTOR_RW_UNLOCK(&pool->serializers);

	if (pool->shutdown_group != NULL) {
		/* A shutdown group is in use, so give the serializers time to finish */
		ast_debug(3, "Waiting on serializers before destroying pool '%s'\n", pool->name);

		pending = ast_serializer_shutdown_group_join(
			pool->shutdown_group, pool->shutdown_group_timeout);
		if (pending != 0) {
			/* Timed out: keep the pool around rather than free it under running work */
			ast_log(LOG_WARNING, "'%s' serializer pool destruction timeout. "
				"'%d' dependencies still processing.\n", pool->name, pending);
			return pending;
		}

		ao2_ref(pool->shutdown_group, -1);
		pool->shutdown_group = NULL;
	}

	/* Nothing is outstanding any more; release the vector and the pool itself */
	AST_VECTOR_RW_FREE(&pool->serializers);
	ast_free(pool);

	return 0;
}

/*!
 * \brief Create a pool of serializers that share a base name.
 *
 * Allocates the pool, optionally a shutdown group, and then \a size
 * serializers. Each serializer name is built from \a name via
 * ast_taskprocessor_name_append().
 *
 * \param name Base name for the pool and its serializers (must not be NULL)
 * \param size Number of serializers to create (asserted to be greater than 0)
 * \param threadpool Thread pool handed to ast_threadpool_serializer_group()
 * \param timeout When greater than -1 a shutdown group is allocated for the
 *        pool, and this value is stored for use as the join timeout when the
 *        pool is destroyed. Otherwise no shutdown group is used.
 *
 * \retval NULL on any allocation or serializer creation failure
 * \retval non-NULL the newly created pool; release it with
 *         ast_serializer_pool_destroy()
 */
struct ast_serializer_pool *ast_serializer_pool_create(const char *name,
	unsigned int size, struct ast_threadpool *threadpool, int timeout)
{
	struct ast_serializer_pool *pool;
	char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
	size_t idx;

	ast_assert(size > 0);

	/* The name is stored in the flexible array member at the end of the pool */
	pool = ast_malloc(sizeof(*pool) + strlen(name) + 1);
	if (!pool) {
		return NULL;
	}

	strcpy(pool->name, name); /* safe */

	pool->shutdown_group_timeout = timeout;
	pool->shutdown_group = NULL;

	if (timeout > -1) {
		pool->shutdown_group = ast_serializer_shutdown_group_alloc();
		if (!pool->shutdown_group) {
			/*
			 * The caller asked for monitored shutdown. Continuing without
			 * the group would silently make destroy skip the wait.
			 */
			ast_log(LOG_ERROR, "Pool create: unable to allocate shutdown group for '%s'\n",
					name);
			ast_free(pool);
			return NULL;
		}
	}

	if (AST_VECTOR_RW_INIT(&pool->serializers, size)) {
		/*
		 * The vector's lock is not usable at this point, so the regular
		 * destroy path (which takes that lock) must not be used here.
		 */
		ast_log(LOG_ERROR, "Pool create: unable to initialize serializer vector for '%s'\n",
				name);
		AST_VECTOR_FREE(&pool->serializers);
		ao2_cleanup(pool->shutdown_group);
		ast_free(pool);
		return NULL;
	}

	for (idx = 0; idx < size; ++idx) {
		struct ast_taskprocessor *tps;

		/* Create name with seq number appended. */
		ast_taskprocessor_name_append(tps_name, sizeof(tps_name), name);

		tps = ast_threadpool_serializer_group(tps_name, threadpool, pool->shutdown_group);
		if (!tps) {
			ast_serializer_pool_destroy(pool);
			ast_log(LOG_ERROR, "Pool create: unable to create named serializer '%s'\n",
					tps_name);
			return NULL;
		}

		if (AST_VECTOR_APPEND(&pool->serializers, tps)) {
			/*
			 * The serializer never made it into the vector, so destroy
			 * will not release it. Drop our reference first; otherwise
			 * it leaks and stays outstanding for the shutdown group join.
			 */
			ast_taskprocessor_unreference(tps);
			ast_serializer_pool_destroy(pool);
			ast_log(LOG_ERROR, "Pool create: unable to append named serializer '%s'\n",
					tps_name);
			return NULL;
		}
	}

	return pool;
}

/*!
 * \brief Fetch the base name a serializer pool was created with.
 *
 * \param pool The pool to query; it is dereferenced without a NULL check
 *
 * \return Pointer to the name stored inside the pool object
 */
const char *ast_serializer_pool_name(const struct ast_serializer_pool *pool)
{
	const char *base_name = pool->name;

	return base_name;
}

/*!
 * \brief Pick the serializer in the pool that currently has the shortest queue.
 *
 * The pool's vector is read-locked for the duration of the scan. On a tie the
 * serializer that appears earliest in the vector is kept. This function does
 * not itself take an additional reference on the returned taskprocessor.
 *
 * \param pool The pool to choose from (may be NULL)
 *
 * \retval NULL the pool was NULL or holds no serializers
 * \retval non-NULL the serializer with the smallest reported queue size
 */
struct ast_taskprocessor *ast_serializer_pool_get(struct ast_serializer_pool *pool)
{
	struct ast_taskprocessor *least_busy = NULL;
	size_t count;
	size_t pos;

	if (pool == NULL) {
		return NULL;
	}

	AST_VECTOR_RW_RDLOCK(&pool->serializers);

	count = AST_VECTOR_SIZE(&pool->serializers);
	if (count > 0) {
		/* Start with the first entry and replace it whenever a shorter queue is found */
		least_busy = AST_VECTOR_GET(&pool->serializers, 0);

		for (pos = 1; pos < count; ++pos) {
			struct ast_taskprocessor *candidate = AST_VECTOR_GET(&pool->serializers, pos);

			if (ast_taskprocessor_size(candidate) >= ast_taskprocessor_size(least_busy)) {
				continue;
			}
			least_busy = candidate;
		}
	}

	AST_VECTOR_RW_UNLOCK(&pool->serializers);

	return least_busy;
}

/*!
 * \brief Apply high/low water alert levels to every serializer in a pool.
 *
 * Out-of-range values are replaced before being applied: a \a high of zero or
 * less falls back to AST_TASKPROCESSOR_HIGH_WATER_LEVEL, and a \a low that is
 * below -1 or above the effective high level falls back to -1. Each
 * replacement is logged as a warning. A failure to set the levels on an
 * individual serializer is logged and does not stop the remaining ones from
 * being processed.
 *
 * \param pool The pool whose serializers are updated (may be NULL)
 * \param high Requested high water alert trigger level
 * \param low Requested low water clear alert level
 *
 * \retval 0 always
 */
int ast_serializer_pool_set_alerts(struct ast_serializer_pool *pool, long high, long low)
{
	size_t idx;
	long tps_queue_high;
	long tps_queue_low;

	if (!pool) {
		return 0;
	}

	tps_queue_high = high;
	if (tps_queue_high <= 0) {
		ast_log(AST_LOG_WARNING, "Invalid '%s-*' taskprocessor high water alert "
				"trigger level '%ld'\n", pool->name, tps_queue_high);
		tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
	}

	/* Validate the low level against the (possibly corrected) high level */
	tps_queue_low = low;
	if (tps_queue_low < -1 || tps_queue_high < tps_queue_low) {
		ast_log(AST_LOG_WARNING, "Invalid '%s-*' taskprocessor low water clear alert "
				"level '%ld'\n", pool->name, tps_queue_low);
		tps_queue_low = -1;
	}

	/* Start at index 0 so the first serializer gets its levels set as well */
	for (idx = 0; idx < AST_VECTOR_SIZE(&pool->serializers); ++idx) {
		struct ast_taskprocessor *cur = AST_VECTOR_GET(&pool->serializers, idx);
		if (ast_taskprocessor_alert_set_levels(cur, tps_queue_low, tps_queue_high)) {
			ast_log(AST_LOG_WARNING, "Failed to set alert levels for serializer '%s'.\n",
					ast_taskprocessor_name(cur));
		}
	}

	return 0;
}