diff --git a/channels/chan_pjsip.c b/channels/chan_pjsip.c index 6b266481925ff13ec52e6f016e2c8307c3b57d36..dde7416c3180f4a0ed6671b5a91d9410f8c88e0a 100644 --- a/channels/chan_pjsip.c +++ b/channels/chan_pjsip.c @@ -718,7 +718,7 @@ static int chan_pjsip_answer(struct ast_channel *ast) can occur between this thread and bridging (specifically when native bridging attempts to do direct media) */ ast_channel_unlock(ast); - res = ast_sip_push_task_synchronous(session->serializer, answer, session); + res = ast_sip_push_task_wait_serializer(session->serializer, answer, session); if (res) { if (res == -1) { ast_log(LOG_ERROR,"Cannot answer '%s': Unable to push answer task to the threadpool.\n", @@ -2502,10 +2502,10 @@ static struct ast_channel *chan_pjsip_request_with_stream_topology(const char *t req_data.topology = topology; req_data.dest = data; - /* Default failure value in case ast_sip_push_task_synchronous() itself fails. */ + /* Default failure value in case ast_sip_push_task_wait_servant() itself fails. */ req_data.cause = AST_CAUSE_FAILURE; - if (ast_sip_push_task_synchronous(NULL, request, &req_data)) { + if (ast_sip_push_task_wait_servant(NULL, request, &req_data)) { *cause = req_data.cause; return NULL; } diff --git a/channels/pjsip/dialplan_functions.c b/channels/pjsip/dialplan_functions.c index aa376f89218b07c8f49890a478f3d3c37feec2f9..ce347dcd98b3c8c79487593dc3125b25c47ab264 100644 --- a/channels/pjsip/dialplan_functions.c +++ b/channels/pjsip/dialplan_functions.c @@ -897,7 +897,7 @@ int pjsip_acf_channel_read(struct ast_channel *chan, const char *cmd, char *data func_args.field = args.field; func_args.buf = buf; func_args.len = len; - if (ast_sip_push_task_synchronous(func_args.session->serializer, read_pjsip, &func_args)) { + if (ast_sip_push_task_wait_serializer(func_args.session->serializer, read_pjsip, &func_args)) { ast_log(LOG_WARNING, "Unable to read properties of channel %s: failed to push task\n", ast_channel_name(chan)); ao2_ref(func_args.session, -1); return -1; @@ -1219,7 +1219,7 @@ int pjsip_acf_media_offer_write(struct ast_channel *chan, const char *cmd, char mdata.media_type = AST_MEDIA_TYPE_VIDEO; } - return ast_sip_push_task_synchronous(channel->session->serializer, media_offer_write_av, &mdata); + return ast_sip_push_task_wait_serializer(channel->session->serializer, media_offer_write_av, &mdata); } int pjsip_acf_dtmf_mode_read(struct ast_channel *chan, const char *cmd, char *data, char *buf, size_t len) @@ -1390,7 +1390,7 @@ int pjsip_acf_dtmf_mode_write(struct ast_channel *chan, const char *cmd, char *d ast_channel_unlock(chan); - return ast_sip_push_task_synchronous(channel->session->serializer, dtmf_mode_refresh_cb, &rdata); + return ast_sip_push_task_wait_serializer(channel->session->serializer, dtmf_mode_refresh_cb, &rdata); } static int refresh_write_cb(void *obj) @@ -1438,5 +1438,5 @@ int pjsip_acf_session_refresh_write(struct ast_channel *chan, const char *cmd, c rdata.method = AST_SIP_SESSION_REFRESH_METHOD_UPDATE; } - return ast_sip_push_task_synchronous(channel->session->serializer, refresh_write_cb, &rdata); + return ast_sip_push_task_wait_serializer(channel->session->serializer, refresh_write_cb, &rdata); } diff --git a/include/asterisk/res_pjsip.h b/include/asterisk/res_pjsip.h index b01d6f5d09375b312c0e1fecc7a1d0988898ce81..e937018f29d33bd2a2d003d9eb10b6d5284cee98 100644 --- a/include/asterisk/res_pjsip.h +++ b/include/asterisk/res_pjsip.h @@ -1543,26 +1543,90 @@ struct ast_sip_endpoint *ast_sip_dialog_get_endpoint(pjsip_dialog *dlg); int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data); /*! - * \brief Push a task to SIP servants and wait for it to complete + * \brief Push a task to SIP servants and wait for it to complete. + * + * Like \ref ast_sip_push_task except that it blocks until the task + * completes. If the current thread is a SIP servant thread then the + * task executes immediately. Otherwise, the specified serializer + * executes the task and the current thread waits for it to complete. + * + * \note PJPROJECT callbacks tend to have locks already held when + * called. + * + * \warning \b Never hold locks that may be acquired by a SIP servant + * thread when calling this function. Doing so may cause a deadlock + * if all SIP servant threads are blocked waiting to acquire the lock + * while the thread holding the lock is waiting for a free SIP servant + * thread. + * + * \warning \b Use of this function in an ao2 destructor callback is a + * bad idea. You don't have control over which thread executes the + * destructor. Attempting to shift execution to another thread with + * this function is likely to cause deadlock. + * + * \param serializer The SIP serializer to execute the task if the + * current thread is not a SIP servant. NULL if any of the default + * serializers can be used. + * \param sip_task The task to execute + * \param task_data The parameter to pass to the task when it executes + * + * \note The sip_task() return value may need to be distinguished from + * the failure to push the task. + * + * \return sip_task() return value on success. + * \retval -1 Failure to push the task. + */ +int ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data); + +/*! + * \brief Push a task to SIP servants and wait for it to complete. + * \deprecated Replaced with ast_sip_push_task_wait_servant(). + */ +int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data); + +/*! + * \brief Push a task to the serializer and wait for it to complete. + * + * Like \ref ast_sip_push_task except that it blocks until the task is + * completed by the specified serializer. If the specified serializer + * is the current thread then the task executes immediately. + * + * \note PJPROJECT callbacks tend to have locks already held when + * called. * - * Like \ref ast_sip_push_task except that it blocks until the task completes. + * \warning \b Never hold locks that may be acquired by a SIP servant + * thread when calling this function. Doing so may cause a deadlock + * if all SIP servant threads are blocked waiting to acquire the lock + * while the thread holding the lock is waiting for a free SIP servant + * thread for the serializer to execute in. * - * \warning \b Never use this function in a SIP servant thread. This can potentially - * cause a deadlock. If you are in a SIP servant thread, just call your function - * in-line. + * \warning \b Never hold locks that may be acquired by the serializer + * when calling this function. Doing so will cause a deadlock. * - * \warning \b Never hold locks that may be acquired by a SIP servant thread when - * calling this function. Doing so may cause a deadlock if all SIP servant threads - * are blocked waiting to acquire the lock while the thread holding the lock is - * waiting for a free SIP servant thread. + * \warning \b Never use this function in the pjsip monitor thread (It + * is a SIP servant thread). This is likely to cause a deadlock. * - * \param serializer The SIP serializer to which the task belongs. May be NULL. + * \warning \b Use of this function in an ao2 destructor callback is a + * bad idea. You don't have control over which thread executes the + * destructor. Attempting to shift execution to another thread with + * this function is likely to cause deadlock. + * + * \param serializer The SIP serializer to execute the task. NULL if + * any of the default serializers can be used. * \param sip_task The task to execute * \param task_data The parameter to pass to the task when it executes - * \retval 0 Success - * \retval -1 Failure + * + * \note It is generally better to call + * ast_sip_push_task_wait_servant() if you pass NULL for the + * serializer parameter. + * + * \note The sip_task() return value may need to be distinguished from + * the failure to push the task. + * + * \return sip_task() return value on success. + * \retval -1 Failure to push the task. */ -int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data); +int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data); /*! * \brief Determine if the current thread is a SIP servant thread diff --git a/res/res_pjsip.c b/res/res_pjsip.c index 3241d777d8cbcba2b7f3eeccd1d7d79359b8cb32..19e6e1d132ca1644fa441c749296c08c0ddb447f 100644 --- a/res/res_pjsip.c +++ b/res/res_pjsip.c @@ -2743,7 +2743,7 @@ static int register_service(void *data) int ast_sip_register_service(pjsip_module *module) { - return ast_sip_push_task_synchronous(NULL, register_service, &module); + return ast_sip_push_task_wait_servant(NULL, register_service, &module); } static int unregister_service(void *data) @@ -2759,7 +2759,7 @@ static int unregister_service(void *data) void ast_sip_unregister_service(pjsip_module *module) { - ast_sip_push_task_synchronous(NULL, unregister_service, &module); + ast_sip_push_task_wait_servant(NULL, unregister_service, &module); } static struct ast_sip_authenticator *registered_authenticator; @@ -3009,7 +3009,7 @@ static char *cli_dump_endpt(struct ast_cli_entry *e, int cmd, struct ast_cli_arg return CLI_SHOWUSAGE; } - ast_sip_push_task_synchronous(NULL, do_cli_dump_endpt, a); + ast_sip_push_task_wait_servant(NULL, do_cli_dump_endpt, a); return CLI_SUCCESS; } @@ -4484,21 +4484,30 @@ static int serializer_pool_setup(void) return 0; } +static struct ast_taskprocessor *serializer_pool_pick(void) +{ + struct ast_taskprocessor *serializer; + + unsigned int pos; + + /* + * Pick a serializer to use from the pool. + * + * Note: We don't care about any reentrancy behavior + * when incrementing serializer_pool_pos. If it gets + * incorrectly incremented it doesn't matter. + */ + pos = serializer_pool_pos++; + pos %= SERIALIZER_POOL_SIZE; + serializer = serializer_pool[pos]; + + return serializer; +} + int ast_sip_push_task(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) { if (!serializer) { - unsigned int pos; - - /* - * Pick a serializer to use from the pool. - * - * Note: We don't care about any reentrancy behavior - * when incrementing serializer_pool_pos. If it gets - * incorrectly incremented it doesn't matter. - */ - pos = serializer_pool_pos++; - pos %= SERIALIZER_POOL_SIZE; - serializer = serializer_pool[pos]; + serializer = serializer_pool_pick(); } return ast_taskprocessor_push(serializer, sip_task, task_data); @@ -4522,9 +4531,8 @@ static int sync_task(void *data) /* * Once we unlock std->lock after signaling, we cannot access - * std again. The thread waiting within - * ast_sip_push_task_synchronous() is free to continue and - * release its local variable (std). + * std again. The thread waiting within ast_sip_push_task_wait() + * is free to continue and release its local variable (std). */ ast_mutex_lock(&std->lock); std->complete = 1; @@ -4534,15 +4542,11 @@ static int sync_task(void *data) return ret; } -int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) +static int ast_sip_push_task_wait(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) { /* This method is an onion */ struct sync_task_data std; - if (ast_sip_thread_is_servant()) { - return sip_task(task_data); - } - memset(&std, 0, sizeof(std)); ast_mutex_init(&std.lock); ast_cond_init(&std.cond, NULL); @@ -4566,6 +4570,42 @@ int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*si return std.fail; } +int ast_sip_push_task_wait_servant(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) +{ + if (ast_sip_thread_is_servant()) { + return sip_task(task_data); + } + + return ast_sip_push_task_wait(serializer, sip_task, task_data); +} + +int ast_sip_push_task_synchronous(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) +{ + return ast_sip_push_task_wait_servant(serializer, sip_task, task_data); +} + +int ast_sip_push_task_wait_serializer(struct ast_taskprocessor *serializer, int (*sip_task)(void *), void *task_data) +{ + if (!serializer) { + /* Caller doesn't care which PJSIP serializer the task executes under. */ + serializer = serializer_pool_pick(); + if (!serializer) { + /* No serializer picked to execute the task */ + return -1; + } + } + if (ast_taskprocessor_is_task(serializer)) { + /* + * We are the requested serializer so we must execute + * the task now or deadlock waiting on ourself to + * execute it. + */ + return sip_task(task_data); + } + + return ast_sip_push_task_wait(serializer, sip_task, task_data); +} + void ast_copy_pj_str(char *dest, const pj_str_t *src, size_t size) { size_t chars_to_copy = MIN(size - 1, pj_strlen(src)); @@ -5191,7 +5231,7 @@ static int reload_module(void) * We must wait for the reload to complete so multiple * reloads cannot happen at the same time. */ - if (ast_sip_push_task_synchronous(NULL, reload_configuration_task, NULL)) { + if (ast_sip_push_task_wait_servant(NULL, reload_configuration_task, NULL)) { ast_log(LOG_WARNING, "Failed to reload PJSIP\n"); return -1; } @@ -5208,7 +5248,7 @@ static int unload_module(void) /* The thread this is called from cannot call PJSIP/PJLIB functions, * so we have to push the work to the threadpool to handle */ - ast_sip_push_task_synchronous(NULL, unload_pjsip, NULL); + ast_sip_push_task_wait_servant(NULL, unload_pjsip, NULL); ast_sip_destroy_scheduler(); serializer_pool_shutdown(); ast_threadpool_shutdown(sip_threadpool); diff --git a/res/res_pjsip/config_system.c b/res/res_pjsip/config_system.c index dfd92404bfd85ed374c70cf04b55d2806e93a25f..ed2b5d232b27f6f431c77a9873ad97df19a80a85 100644 --- a/res/res_pjsip/config_system.c +++ b/res/res_pjsip/config_system.c @@ -282,5 +282,5 @@ static int system_create_resolver_and_set_nameservers(void *data) void ast_sip_initialize_dns(void) { - ast_sip_push_task_synchronous(NULL, system_create_resolver_and_set_nameservers, NULL); + ast_sip_push_task_wait_servant(NULL, system_create_resolver_and_set_nameservers, NULL); } diff --git a/res/res_pjsip/config_transport.c b/res/res_pjsip/config_transport.c index 15c03769b818366506cc434b49982075701c2b8e..dd7c7049dcb26df73b6889346fc0822bf0aad3fd 100644 --- a/res/res_pjsip/config_transport.c +++ b/res/res_pjsip/config_transport.c @@ -267,7 +267,7 @@ static void sip_transport_state_destroy(void *obj) { struct ast_sip_transport_state *state = obj; - ast_sip_push_task_synchronous(NULL, destroy_sip_transport_state, state); + ast_sip_push_task_wait_servant(NULL, destroy_sip_transport_state, state); } /*! \brief Destructor for ast_sip_transport state information */ diff --git a/res/res_pjsip_header_funcs.c b/res/res_pjsip_header_funcs.c index 6c0f9151db36893e4a80aadcc69f19eb40a81b5a..798a1cde6dcaaf09e7c35e43adb7e42bacf19b9c 100644 --- a/res/res_pjsip_header_funcs.c +++ b/res/res_pjsip_header_funcs.c @@ -153,7 +153,7 @@ static const struct ast_datastore_info header_datastore = { .type = "header_datastore", }; -/*! \brief Data structure used for ast_sip_push_task_synchronous */ +/*! \brief Data structure used for ast_sip_push_task_wait_serializer */ struct header_data { struct ast_sip_channel_pvt *channel; char *header_name; @@ -480,11 +480,11 @@ static int func_read_header(struct ast_channel *chan, const char *function, char header_data.len = len; if (!strcasecmp(args.action, "read")) { - return ast_sip_push_task_synchronous(channel->session->serializer, read_header, - &header_data); + return ast_sip_push_task_wait_serializer(channel->session->serializer, + read_header, &header_data); } else if (!strcasecmp(args.action, "remove")) { - return ast_sip_push_task_synchronous(channel->session->serializer, remove_header, - &header_data); + return ast_sip_push_task_wait_serializer(channel->session->serializer, + remove_header, &header_data); } else { ast_log(AST_LOG_ERROR, "Unknown action '%s' is not valid, must be 'read' or 'remove'.\n", @@ -539,14 +539,14 @@ static int func_write_header(struct ast_channel *chan, const char *cmd, char *da header_data.len = 0; if (!strcasecmp(args.action, "add")) { - return ast_sip_push_task_synchronous(channel->session->serializer, add_header, - &header_data); + return ast_sip_push_task_wait_serializer(channel->session->serializer, + add_header, &header_data); } else if (!strcasecmp(args.action, "update")) { - return ast_sip_push_task_synchronous(channel->session->serializer, update_header, - &header_data); + return ast_sip_push_task_wait_serializer(channel->session->serializer, + update_header, &header_data); } else if (!strcasecmp(args.action, "remove")) { - return ast_sip_push_task_synchronous(channel->session->serializer, remove_header, - &header_data); + return ast_sip_push_task_wait_serializer(channel->session->serializer, + remove_header, &header_data); } else { ast_log(AST_LOG_ERROR, "Unknown action '%s' is not valid, must be 'add', 'update', or 'remove'.\n", diff --git a/res/res_pjsip_history.c b/res/res_pjsip_history.c index ab035a2966a25d1e614c0e22bb2be45b00718ece..eed06eed8238f6fb3d3fb6fe414a496a1cd54bf5 100644 --- a/res/res_pjsip_history.c +++ b/res/res_pjsip_history.c @@ -1385,7 +1385,7 @@ static int unload_module(void) ast_cli_unregister_multiple(cli_pjsip, ARRAY_LEN(cli_pjsip)); ast_sip_unregister_service(&logging_module); - ast_sip_push_task_synchronous(NULL, clear_history_entries, NULL); + ast_sip_push_task_wait_servant(NULL, clear_history_entries, NULL); AST_VECTOR_FREE(&vector_history); ast_pjproject_caching_pool_destroy(&cachingpool); diff --git a/res/res_pjsip_outbound_publish.c b/res/res_pjsip_outbound_publish.c index 8befbc1e85a9a79a9c93c69fcfa50c036a9bff76..4894e55d1a40068b4c357cf7e00747112432daad 100644 --- a/res/res_pjsip_outbound_publish.c +++ b/res/res_pjsip_outbound_publish.c @@ -1070,7 +1070,7 @@ static struct sip_outbound_publisher *sip_outbound_publisher_alloc( return NULL; } - if (ast_sip_push_task_synchronous(NULL, sip_outbound_publisher_init, publisher)) { + if (ast_sip_push_task_wait_servant(NULL, sip_outbound_publisher_init, publisher)) { ast_log(LOG_ERROR, "Unable to create publisher for outbound publish '%s'\n", ast_sorcery_object_get_id(client->publish)); ao2_ref(publisher, -1); @@ -1514,8 +1514,8 @@ static int current_state_reusable(struct ast_sip_outbound_publish *publish, */ old_publish = current_state->client->publish; current_state->client->publish = publish; - if (ast_sip_push_task_synchronous( - NULL, sip_outbound_publisher_reinit_all, current_state->client->publishers)) { + if (ast_sip_push_task_wait_servant(NULL, sip_outbound_publisher_reinit_all, + current_state->client->publishers)) { /* * If the state object fails to re-initialize then swap * the old publish info back in. diff --git a/res/res_pjsip_outbound_registration.c b/res/res_pjsip_outbound_registration.c index 2839ecbab0d750a9c384ca5ca4f9cbc127b5aa91..8a90849c0ab2457cec39d18e3164fd8b5852831b 100644 --- a/res/res_pjsip_outbound_registration.c +++ b/res/res_pjsip_outbound_registration.c @@ -1480,7 +1480,7 @@ static int sip_outbound_registration_apply(const struct ast_sorcery *sorcery, vo return -1; } - if (ast_sip_push_task_synchronous(new_state->client_state->serializer, + if (ast_sip_push_task_wait_serializer(new_state->client_state->serializer, sip_outbound_registration_regc_alloc, new_state)) { return -1; } @@ -1850,8 +1850,7 @@ static int ami_outbound_registration_detail(void *obj, void *arg, int flags) struct sip_ami_outbound *ami = arg; ami->registration = obj; - return ast_sip_push_task_synchronous( - NULL, ami_outbound_registration_task, ami); + return ast_sip_push_task_wait_servant(NULL, ami_outbound_registration_task, ami); } static int ami_show_outbound_registrations(struct mansession *s, diff --git a/res/res_pjsip_pubsub.c b/res/res_pjsip_pubsub.c index 69c256dab3cc39fda91af7b741898fc1c2bc7119..9e0718f515143289eaa8a71de790f5b812295864 100644 --- a/res/res_pjsip_pubsub.c +++ b/res/res_pjsip_pubsub.c @@ -1318,7 +1318,8 @@ static void subscription_tree_destructor(void *obj) destroy_subscriptions(sub_tree->root); if (sub_tree->dlg) { - ast_sip_push_task_synchronous(sub_tree->serializer, subscription_unreference_dialog, sub_tree); + ast_sip_push_task_wait_servant(sub_tree->serializer, + subscription_unreference_dialog, sub_tree); } ao2_cleanup(sub_tree->endpoint); @@ -1665,7 +1666,8 @@ static int subscription_persistence_recreate(void *obj, void *arg, int flags) } recreate_data.persistence = persistence; recreate_data.rdata = &rdata; - if (ast_sip_push_task_synchronous(serializer, sub_persistence_recreate, &recreate_data)) { + if (ast_sip_push_task_wait_serializer(serializer, sub_persistence_recreate, + &recreate_data)) { ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not continue under distributor serializer.\n", persistence->endpoint); ast_sorcery_delete(ast_sip_get_sorcery(), persistence); diff --git a/res/res_pjsip_refer.c b/res/res_pjsip_refer.c index 120203c95e1b7bef55d3c8c9bfe82a6408216d76..1e6ca7f46eaaeb9323dfb101fe100b110417d90f 100644 --- a/res/res_pjsip_refer.c +++ b/res/res_pjsip_refer.c @@ -316,7 +316,15 @@ static void refer_progress_on_evsub_state(pjsip_evsub *sub, pjsip_event *event) /* It's possible that a task is waiting to remove us already, so bump the refcount of progress so it doesn't get destroyed */ ao2_ref(progress, +1); pjsip_dlg_dec_lock(progress->dlg); - ast_sip_push_task_synchronous(progress->serializer, refer_progress_terminate, progress); + /* + * XXX We are always going to execute this inline rather than + * in the serializer because this function is a PJPROJECT + * callback and thus has to be a SIP servant thread. + * + * The likely remedy is to push most of this function into + * refer_progress_terminate() with ast_sip_push_task(). + */ + ast_sip_push_task_wait_servant(progress->serializer, refer_progress_terminate, progress); pjsip_dlg_inc_lock(progress->dlg); ao2_ref(progress, -1); @@ -960,7 +968,8 @@ static int refer_incoming_invite_request(struct ast_sip_session *session, struct invite.session = other_session; - if (ast_sip_push_task_synchronous(other_session->serializer, invite_replaces, &invite)) { + if (ast_sip_push_task_wait_serializer(other_session->serializer, invite_replaces, + &invite)) { response = 481; goto inv_replace_failed; } diff --git a/res/res_pjsip_transport_websocket.c b/res/res_pjsip_transport_websocket.c index 974b150876f0cefd96f130b8adb5fbc0fb36b7f3..633594359f331dfbe19ff5802c280f2f17d123a8 100644 --- a/res/res_pjsip_transport_websocket.c +++ b/res/res_pjsip_transport_websocket.c @@ -377,7 +377,7 @@ static void websocket_cb(struct ast_websocket *session, struct ast_variable *par create_data.ws_session = session; - if (ast_sip_push_task_synchronous(serializer, transport_create, &create_data)) { + if (ast_sip_push_task_wait_serializer(serializer, transport_create, &create_data)) { ast_log(LOG_ERROR, "Could not create WebSocket transport.\n"); ast_taskprocessor_unreference(serializer); ast_websocket_unref(session); @@ -396,13 +396,13 @@ static void websocket_cb(struct ast_websocket *session, struct ast_variable *par } if (opcode == AST_WEBSOCKET_OPCODE_TEXT || opcode == AST_WEBSOCKET_OPCODE_BINARY) { - ast_sip_push_task_synchronous(serializer, transport_read, &read_data); + ast_sip_push_task_wait_serializer(serializer, transport_read, &read_data); } else if (opcode == AST_WEBSOCKET_OPCODE_CLOSE) { break; } } - ast_sip_push_task_synchronous(serializer, transport_shutdown, transport); + ast_sip_push_task_wait_serializer(serializer, transport_shutdown, transport); ast_taskprocessor_unreference(serializer); ast_websocket_unref(session);