diff --git a/res/res_pjsip_transport_websocket.c b/res/res_pjsip_transport_websocket.c index 914c8b8ffd668db48444accab8ee3fd562586658..e98756fdf402293c30eb7fc6707c9cb0c5a5e081 100644 --- a/res/res_pjsip_transport_websocket.c +++ b/res/res_pjsip_transport_websocket.c @@ -79,6 +79,25 @@ static pj_status_t ws_send_msg(pjsip_transport *transport, static pj_status_t ws_destroy(pjsip_transport *transport) { struct ws_transport *wstransport = (struct ws_transport *)transport; + int fd = ast_websocket_fd(wstransport->ws_session); + + if (fd > 0) { + ast_websocket_close(wstransport->ws_session, 1000); + shutdown(fd, SHUT_RDWR); + } + + ao2_ref(wstransport, -1); + + return PJ_SUCCESS; +} + +static void transport_dtor(void *arg) +{ + struct ws_transport *wstransport = arg; + + if (wstransport->ws_session) { + ast_websocket_unref(wstransport->ws_session); + } if (wstransport->transport.ref_cnt) { pj_atomic_destroy(wstransport->transport.ref_cnt); @@ -88,20 +107,28 @@ static pj_status_t ws_destroy(pjsip_transport *transport) pj_lock_destroy(wstransport->transport.lock); } - pjsip_endpt_release_pool(wstransport->transport.endpt, wstransport->transport.pool); + if (wstransport->transport.endpt && wstransport->transport.pool) { + pjsip_endpt_release_pool(wstransport->transport.endpt, wstransport->transport.pool); + } if (wstransport->rdata.tp_info.pool) { pjsip_endpt_release_pool(wstransport->transport.endpt, wstransport->rdata.tp_info.pool); } - - return PJ_SUCCESS; } static int transport_shutdown(void *data) { - pjsip_transport *transport = data; + struct ws_transport *wstransport = data; + + if (!wstransport->transport.is_shutdown && !wstransport->transport.is_destroying) { + pjsip_transport_shutdown(&wstransport->transport); + } + + /* Note that the destructor calls PJSIP functions, + * therefore it must be called in a PJSIP thread. + */ + ao2_ref(wstransport, -1); - pjsip_transport_shutdown(transport); return 0; } @@ -116,32 +143,45 @@ struct transport_create_data { static int transport_create(void *data) { struct transport_create_data *create_data = data; - struct ws_transport *newtransport; + struct ws_transport *newtransport = NULL; pjsip_endpoint *endpt = ast_sip_get_pjsip_endpoint(); struct pjsip_tpmgr *tpmgr = pjsip_endpt_get_tpmgr(endpt); pj_pool_t *pool; - pj_str_t buf; + pj_status_t status; - if (!(pool = pjsip_endpt_create_pool(endpt, "ws", 512, 512))) { - ast_log(LOG_ERROR, "Failed to allocate WebSocket endpoint pool.\n"); - return -1; + newtransport = ao2_t_alloc_options(sizeof(*newtransport), transport_dtor, + AO2_ALLOC_OPT_LOCK_NOLOCK, "pjsip websocket transport"); + if (!newtransport) { + ast_log(LOG_ERROR, "Failed to allocate WebSocket transport.\n"); + goto on_error; } - if (!(newtransport = PJ_POOL_ZALLOC_T(pool, struct ws_transport))) { - ast_log(LOG_ERROR, "Failed to allocate WebSocket transport.\n"); - pjsip_endpt_release_pool(endpt, pool); - return -1; + newtransport->transport.endpt = endpt; + + if (!(pool = pjsip_endpt_create_pool(endpt, "ws", 512, 512))) { + ast_log(LOG_ERROR, "Failed to allocate WebSocket endpoint pool.\n"); + goto on_error; } + newtransport->transport.pool = pool; newtransport->ws_session = create_data->ws_session; - pj_atomic_create(pool, 0, &newtransport->transport.ref_cnt); - pj_lock_create_recursive_mutex(pool, pool->obj_name, &newtransport->transport.lock); + /* Keep the session until transport dies */ + ast_websocket_ref(newtransport->ws_session); + + status = pj_atomic_create(pool, 0, &newtransport->transport.ref_cnt); + if (status != PJ_SUCCESS) { + goto on_error; + } + + status = pj_lock_create_recursive_mutex(pool, pool->obj_name, &newtransport->transport.lock); + if (status != PJ_SUCCESS) { + goto on_error; + } - newtransport->transport.pool = pool; pj_sockaddr_parse(pj_AF_UNSPEC(), 0, pj_cstr(&buf, ast_sockaddr_stringify(ast_websocket_remote_address(newtransport->ws_session))), &newtransport->transport.key.rem_addr); newtransport->transport.key.rem_addr.addr.sa_family = pj_AF_INET(); newtransport->transport.key.type = ast_websocket_is_secure(newtransport->ws_session) ? transport_type_wss : transport_type_ws; @@ -159,24 +199,34 @@ static int transport_create(void *data) newtransport->transport.flag = pjsip_transport_get_flag_from_type((pjsip_transport_type_e)newtransport->transport.key.type); newtransport->transport.info = (char *)pj_pool_alloc(newtransport->transport.pool, 64); - newtransport->transport.endpt = endpt; newtransport->transport.tpmgr = tpmgr; newtransport->transport.send_msg = &ws_send_msg; newtransport->transport.destroy = &ws_destroy; - pjsip_transport_register(newtransport->transport.tpmgr, (pjsip_transport *)newtransport); + status = pjsip_transport_register(newtransport->transport.tpmgr, + (pjsip_transport *)newtransport); + if (status != PJ_SUCCESS) { + goto on_error; + } + + /* Add a reference for pjsip transport manager */ + ao2_ref(newtransport, +1); newtransport->rdata.tp_info.transport = &newtransport->transport; newtransport->rdata.tp_info.pool = pjsip_endpt_create_pool(endpt, "rtd%p", PJSIP_POOL_RDATA_LEN, PJSIP_POOL_RDATA_INC); if (!newtransport->rdata.tp_info.pool) { ast_log(LOG_ERROR, "Failed to allocate WebSocket rdata.\n"); - pjsip_endpt_release_pool(endpt, pool); - return -1; + pjsip_transport_destroy((pjsip_transport *)newtransport); + goto on_error; } create_data->transport = newtransport; return 0; + +on_error: + ao2_cleanup(newtransport); + return -1; } struct transport_read_data {