diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h index 24706c9a563fcdc949dfe8d96a77b87bf594f07d..1f9276b4116318faa397e57c572ec12a62ecc4be 100644 --- a/include/asterisk/res_pjsip.h +++ b/include/asterisk/res_pjsip.h @@ -1112,6 +1112,23 @@ struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void); */ struct ast_taskprocessor *ast_sip_create_serializer(void); +struct ast_serializer_shutdown_group; + +/*! + * \brief Create a new serializer for SIP tasks + * \since 13.5.0 + * + * See \ref ast_threadpool_serializer for more information on serializers. + * SIP creates serializers so that tasks operating on similar data will run + * in sequence. + * + * \param shutdown_group Group shutdown controller. (NULL if no group association) + * + * \retval NULL Failure + * \retval non-NULL Newly-created serializer + */ +struct ast_taskprocessor *ast_sip_create_serializer_group(struct ast_serializer_shutdown_group *shutdown_group); + /*! * \brief Set a serializer on a SIP dialog so requests and responses are automatically serialized * diff --git a/include/asterisk/threadpool.h b/include/asterisk/threadpool.h index e1e7727f5f86bc7649a16df4a760f8d1b41d2935..942d14fc1e3bd5d0f12dee37515909cfd52220d4 100644 --- a/include/asterisk/threadpool.h +++ b/include/asterisk/threadpool.h @@ -195,6 +195,28 @@ int ast_threadpool_push(struct ast_threadpool *pool, int (*task)(void *data), vo */ void ast_threadpool_shutdown(struct ast_threadpool *pool); +struct ast_serializer_shutdown_group; + +/*! + * \brief Create a serializer group shutdown control object. + * \since 13.5.0 + * + * \return ao2 object to control shutdown of a serializer group. + */ +struct ast_serializer_shutdown_group *ast_serializer_shutdown_group_alloc(void); + +/*! + * \brief Wait for the serializers in the group to shutdown with timeout. + * \since 13.5.0 + * + * \param shutdown_group Group shutdown controller. (Returns 0 immediately if NULL) + * \param timeout Number of seconds to wait for the serializers in the group to shutdown. + * Zero if the timeout is disabled. + * + * \return Number of seriaizers that did not get shutdown within the timeout. + */ +int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout); + /*! * \brief Serialized execution of tasks within a \ref ast_threadpool. * @@ -218,9 +240,40 @@ void ast_threadpool_shutdown(struct ast_threadpool *pool); * * \param name Name of the serializer. (must be unique) * \param pool \ref ast_threadpool for execution. + * * \return \ref ast_taskprocessor for enqueuing work. * \return \c NULL on error. */ struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool); +/*! + * \brief Serialized execution of tasks within a \ref ast_threadpool. + * \since 13.5.0 + * + * A \ref ast_taskprocessor with the same contract as a default taskprocessor + * (tasks execute serially) except instead of executing out of a dedicated + * thread, execution occurs in a thread from a \ref ast_threadpool. Think of it + * as a lightweight thread. + * + * While it guarantees that each task will complete before executing the next, + * there is no guarantee as to which thread from the \c pool individual tasks + * will execute. This normally only matters if your code relys on thread + * specific information, such as thread locals. + * + * Use ast_taskprocessor_unreference() to dispose of the returned \ref + * ast_taskprocessor. + * + * Only a single taskprocessor with a given name may exist. This function will fail + * if a taskprocessor with the given name already exists. + * + * \param name Name of the serializer. (must be unique) + * \param pool \ref ast_threadpool for execution. + * \param shutdown_group Group shutdown controller. (NULL if no group association) + * + * \return \ref ast_taskprocessor for enqueuing work. + * \return \c NULL on error. + */ +struct ast_taskprocessor *ast_threadpool_serializer_group(const char *name, + struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group); + #endif /* ASTERISK_THREADPOOL_H */ diff --git a/main/threadpool.c b/main/threadpool.c index 597e83e10616af033007ec2be8df5a0f13ddf399..479938959e0399805ec7613dc0a31a4adc06e467 100644 --- a/main/threadpool.c +++ b/main/threadpool.c @@ -1126,18 +1126,126 @@ static void worker_set_state(struct worker_thread *worker, enum worker_state sta ast_cond_signal(&worker->cond); } +/*! Serializer group shutdown control object. */ +struct ast_serializer_shutdown_group { + /*! Shutdown thread waits on this conditional. */ + ast_cond_t cond; + /*! Count of serializers needing to shutdown. */ + int count; +}; + +static void serializer_shutdown_group_dtor(void *vdoomed) +{ + struct ast_serializer_shutdown_group *doomed = vdoomed; + + ast_cond_destroy(&doomed->cond); +} + +struct ast_serializer_shutdown_group *ast_serializer_shutdown_group_alloc(void) +{ + struct ast_serializer_shutdown_group *shutdown_group; + + shutdown_group = ao2_alloc(sizeof(*shutdown_group), serializer_shutdown_group_dtor); + if (!shutdown_group) { + return NULL; + } + ast_cond_init(&shutdown_group->cond, NULL); + return shutdown_group; +} + +int ast_serializer_shutdown_group_join(struct ast_serializer_shutdown_group *shutdown_group, int timeout) +{ + int remaining; + ast_mutex_t *lock; + + if (!shutdown_group) { + return 0; + } + + lock = ao2_object_get_lockaddr(shutdown_group); + ast_assert(lock != NULL); + + ao2_lock(shutdown_group); + if (timeout) { + struct timeval start; + struct timespec end; + + start = ast_tvnow(); + end.tv_sec = start.tv_sec + timeout; + end.tv_nsec = start.tv_usec * 1000; + while (shutdown_group->count) { + if (ast_cond_timedwait(&shutdown_group->cond, lock, &end)) { + /* Error or timed out waiting for the count to reach zero. */ + break; + } + } + } else { + while (shutdown_group->count) { + if (ast_cond_wait(&shutdown_group->cond, lock)) { + /* Error */ + break; + } + } + } + remaining = shutdown_group->count; + ao2_unlock(shutdown_group); + return remaining; +} + +/*! + * \internal + * \brief Increment the number of serializer members in the group. + * \since 13.5.0 + * + * \param shutdown_group Group shutdown controller. + * + * \return Nothing + */ +static void serializer_shutdown_group_inc(struct ast_serializer_shutdown_group *shutdown_group) +{ + ao2_lock(shutdown_group); + ++shutdown_group->count; + ao2_unlock(shutdown_group); +} + +/*! + * \internal + * \brief Decrement the number of serializer members in the group. + * \since 13.5.0 + * + * \param shutdown_group Group shutdown controller. + * + * \return Nothing + */ +static void serializer_shutdown_group_dec(struct ast_serializer_shutdown_group *shutdown_group) +{ + ao2_lock(shutdown_group); + --shutdown_group->count; + if (!shutdown_group->count) { + ast_cond_signal(&shutdown_group->cond); + } + ao2_unlock(shutdown_group); +} + struct serializer { + /*! Threadpool the serializer will use to process the jobs. */ struct ast_threadpool *pool; + /*! Which group will wait for this serializer to shutdown. */ + struct ast_serializer_shutdown_group *shutdown_group; }; static void serializer_dtor(void *obj) { struct serializer *ser = obj; + ao2_cleanup(ser->pool); ser->pool = NULL; + ao2_cleanup(ser->shutdown_group); + ser->shutdown_group = NULL; } -static struct serializer *serializer_create(struct ast_threadpool *pool) +static struct serializer *serializer_create(struct ast_threadpool *pool, + struct ast_serializer_shutdown_group *shutdown_group) { struct serializer *ser; @@ -1147,6 +1255,7 @@ static struct serializer *serializer_create(struct ast_threadpool *pool) } ao2_ref(pool, +1); ser->pool = pool; + ser->shutdown_group = ao2_bump(shutdown_group); return ser; } @@ -1183,6 +1292,10 @@ static int serializer_start(struct ast_taskprocessor_listener *listener) static void serializer_shutdown(struct ast_taskprocessor_listener *listener) { struct serializer *ser = ast_taskprocessor_listener_get_user_data(listener); + + if (ser->shutdown_group) { + serializer_shutdown_group_dec(ser->shutdown_group); + } ao2_cleanup(ser); } @@ -1192,27 +1305,35 @@ static struct ast_taskprocessor_listener_callbacks serializer_tps_listener_callb .shutdown = serializer_shutdown, }; -struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool) +struct ast_taskprocessor *ast_threadpool_serializer_group(const char *name, + struct ast_threadpool *pool, struct ast_serializer_shutdown_group *shutdown_group) { - RAII_VAR(struct serializer *, ser, NULL, ao2_cleanup); - RAII_VAR(struct ast_taskprocessor_listener *, listener, NULL, ao2_cleanup); - struct ast_taskprocessor *tps = NULL; + struct serializer *ser; + struct ast_taskprocessor_listener *listener; + struct ast_taskprocessor *tps; - ser = serializer_create(pool); + ser = serializer_create(pool, shutdown_group); if (!ser) { return NULL; } listener = ast_taskprocessor_listener_alloc(&serializer_tps_listener_callbacks, ser); if (!listener) { + ao2_ref(ser, -1); return NULL; } - ser = NULL; /* ownership transferred to listener */ + /* ser ref transferred to listener */ tps = ast_taskprocessor_create_with_listener(name, listener); - if (!tps) { - return NULL; + if (tps && shutdown_group) { + serializer_shutdown_group_inc(shutdown_group); } + ao2_ref(listener, -1); return tps; } + +struct ast_taskprocessor *ast_threadpool_serializer(const char *name, struct ast_threadpool *pool) +{ + return ast_threadpool_serializer_group(name, pool, NULL); +} diff --git a/res/res_pjsip.c b/res/res_pjsip.c index 5389087e01837f4a54df77d6ad7a7d4e9349d2c9..e92de51bb147d232079f557a86befb74d254dfdb 100644 --- a/res/res_pjsip.c +++ b/res/res_pjsip.c @@ -3322,20 +3322,25 @@ int ast_sip_append_body(pjsip_tx_data *tdata, const char *body_text) return 0; } -struct ast_taskprocessor *ast_sip_create_serializer(void) +struct ast_taskprocessor *ast_sip_create_serializer_group(struct ast_serializer_shutdown_group *shutdown_group) { struct ast_taskprocessor *serializer; char name[AST_UUID_STR_LEN]; ast_uuid_generate_str(name, sizeof(name)); - serializer = ast_threadpool_serializer(name, sip_threadpool); + serializer = ast_threadpool_serializer_group(name, sip_threadpool, shutdown_group); if (!serializer) { return NULL; } return serializer; } +struct ast_taskprocessor *ast_sip_create_serializer(void) +{ + return ast_sip_create_serializer_group(NULL); +} + int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) { if (serializer) {