diff --git a/changelog b/changelog index 07b6acee74bf3147a226b8f362ab7cc050260139..8b79f121a958bb98e0d7b6c277b5d4f9f8ad5c24 100644 --- a/changelog +++ b/changelog @@ -20,6 +20,15 @@ User api additions after the service call... if it's still nonzero, the descriptor belongs to you and you need to take care of it. + - libwebsocket_rx_flow_allow_all_protocol(protocol) will unthrottle all + connections with the established protocol. It's designed to be + called from user server code when it sees it can accept more input + and may have throttled connections using the server rx flow apis + while it was unable to accept any other input The user server code + then does not have to try to track while connections it choked, this + will free up all of them in one call. + + User api changes ---------------- diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c index c96db244702c29c597728ef8aa16789b2d933930..acbf4624988123eee77e4c7ccbf108a43f3cb425 100644 --- a/lib/libwebsockets.c +++ b/lib/libwebsockets.c @@ -1579,6 +1579,33 @@ libwebsocket_rx_flow_control(struct libwebsocket *wsi, int enable) return 0; } +/** + * libwebsocket_rx_flow_allow_all_protocol() - Allow all connections with this protocol to receive + * + * When the user server code realizes it can accept more input, it can + * call this to have the RX flow restriction removed from all connections using + * the given protocol. + * + * @protocol: all connections using this protocol will be allowed to receive + */ + +void +libwebsocket_rx_flow_allow_all_protocol( + const struct libwebsocket_protocols *protocol) +{ + struct libwebsocket_context *context = protocol->owning_server; + int n; + struct libwebsocket *wsi; + + for (n = 0; n < context->fds_count; n++) { + wsi = context->lws_lookup[context->fds[n].fd]; + if (!wsi) + continue; + if (wsi->protocol == protocol) + libwebsocket_rx_flow_control(wsi, LWS_RXFLOW_ALLOW); + } +} + /** * libwebsocket_canonical_hostname() - returns this host's hostname diff --git a/lib/libwebsockets.h b/lib/libwebsockets.h index 5ad749dd717721899d1ffe113d19e7773a315bf0..b7eaa4126a60ad4994972254923200bb4eed0a52 100644 --- a/lib/libwebsockets.h +++ b/lib/libwebsockets.h @@ -888,6 +888,10 @@ libwebsocket_get_reserved_bits(struct libwebsocket *wsi); LWS_EXTERN int libwebsocket_rx_flow_control(struct libwebsocket *wsi, int enable); +LWS_EXTERN void +libwebsocket_rx_flow_allow_all_protocol( + const struct libwebsocket_protocols *protocol); + LWS_EXTERN size_t libwebsockets_remaining_packet_payload(struct libwebsocket *wsi); diff --git a/libwebsockets-api-doc.html b/libwebsockets-api-doc.html index 7c8e1c1834bbfbb96795205c8d08d8649d490980..7a150456c8d6970913f7ef1a03a65d933cedbb60 100644 --- a/libwebsockets-api-doc.html +++ b/libwebsockets-api-doc.html @@ -273,6 +273,23 @@ If the output side of a server process becomes choked, this allows flow control for the input side. </blockquote> <hr> +<h2>libwebsocket_rx_flow_allow_all_protocol - Allow all connections with this protocol to receive</h2> +<i>void</i> +<b>libwebsocket_rx_flow_allow_all_protocol</b> +(<i>const struct libwebsocket_protocols *</i> <b>protocol</b>) +<h3>Arguments</h3> +<dl> +<dt><b>protocol</b> +<dd>all connections using this protocol will be allowed to receive +</dl> +<h3>Description</h3> +<blockquote> +<p> +When the user server code realizes it can accept more input, it can +call this to have the RX flow restriction removed from all connections using +the given protocol. +</blockquote> +<hr> <h2>libwebsocket_canonical_hostname - returns this host's hostname</h2> <i>const char *</i> <b>libwebsocket_canonical_hostname</b> diff --git a/test-server/test-server.c b/test-server/test-server.c index cfe34ee362ce47c13e0cb553c174ce897cd07578..7ee43750f1f3d434dffe8d21635f7df382ab26cf 100644 --- a/test-server/test-server.c +++ b/test-server/test-server.c @@ -446,9 +446,6 @@ struct a_message { static struct a_message ringbuffer[MAX_MESSAGE_QUEUE]; static int ringbuffer_head; -static struct libwebsocket *wsi_choked[20]; -static int num_wsi_choked; - static int callback_lws_mirror(struct libwebsocket_context *context, struct libwebsocket *wsi, @@ -497,11 +494,10 @@ callback_lws_mirror(struct libwebsocket_context *context, pss->ringbuffer_tail++; if (((ringbuffer_head - pss->ringbuffer_tail) & - (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 15)) { - for (n = 0; n < num_wsi_choked; n++) - libwebsocket_rx_flow_control(wsi_choked[n], 1); - num_wsi_choked = 0; - } + (MAX_MESSAGE_QUEUE - 1)) == (MAX_MESSAGE_QUEUE - 15)) + libwebsocket_rx_flow_allow_all_protocol( + libwebsockets_get_protocol(wsi)); + // lwsl_debug("tx fifo %d\n", (ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)); if (lws_send_pipe_choked(wsi)) { @@ -543,11 +539,8 @@ callback_lws_mirror(struct libwebsocket_context *context, goto done; choke: - if (num_wsi_choked < sizeof wsi_choked / sizeof wsi_choked[0]) { - lwsl_debug("LWS_CALLBACK_RECEIVE: throttling %p\n", wsi); - libwebsocket_rx_flow_control(wsi, 0); - wsi_choked[num_wsi_choked++] = wsi; - } + lwsl_debug("LWS_CALLBACK_RECEIVE: throttling %p\n", wsi); + libwebsocket_rx_flow_control(wsi, 0); // lwsl_debug("rx fifo %d\n", (ringbuffer_head - pss->ringbuffer_tail) & (MAX_MESSAGE_QUEUE - 1)); done: