From 62af7934c8bf569afdfea59c067b8ff8c6c7825f Mon Sep 17 00:00:00 2001
From: Andy Green <andy@warmcat.com>
Date: Tue, 17 Apr 2018 11:43:20 +0800
Subject: [PATCH] rxflow buflist: handle forced service
---
lib/event-libs/libuv.c | 2 +
lib/libwebsockets.c | 6 +-
lib/libwebsockets.h | 10 +++
lib/plat/lws-plat-unix.c | 4 +
lib/private-libwebsockets.h | 8 +-
lib/roles/h1/client-h1.c | 4 +-
lib/roles/h1/ops-h1.c | 2 +-
lib/roles/h2/http2.c | 43 ++++++---
lib/roles/h2/ops-h2.c | 90 +++++++++++++++----
lib/roles/http/client/client-handshake.c | 1 +
lib/roles/http/client/client.c | 14 +--
lib/roles/http/server/server.c | 1 +
lib/roles/ws/ops-ws.c | 15 ++--
lib/roles/ws/server-ws.c | 26 +++---
lib/service.c | 63 ++++++++++++-
.../minimal-http-server-tls.c | 2 +-
plugins/protocol_post_demo.c | 8 +-
17 files changed, 232 insertions(+), 67 deletions(-)
diff --git a/lib/event-libs/libuv.c b/lib/event-libs/libuv.c
index 2420d8b9..984628e3 100644
--- a/lib/event-libs/libuv.c
+++ b/lib/event-libs/libuv.c
@@ -59,6 +59,8 @@ lws_uv_idle(uv_idle_t *handle
struct lws_context_per_thread, uv_idle);
lws_usec_t us;
+ lws_service_do_ripe_rxflow(pt);
+
/*
* is there anybody with pending stuff that needs service forcing?
*/
diff --git a/lib/libwebsockets.c b/lib/libwebsockets.c
index e6ec7db8..9eb5568e 100644
--- a/lib/libwebsockets.c
+++ b/lib/libwebsockets.c
@@ -963,7 +963,7 @@ int
lws_buflist_append_segment(struct lws_buflist **head, uint8_t *buf, size_t len)
{
int first = !*head;
- void *p;
+ void *p = *head;
assert(buf);
assert(len);
@@ -972,7 +972,7 @@ lws_buflist_append_segment(struct lws_buflist **head, uint8_t *buf, size_t len)
while (*head)
head = &((*head)->next);
- lwsl_info("%s: len %u\n", __func__, (uint32_t)len);
+ lwsl_info("%s: len %u first %d %p\n", __func__, (uint32_t)len, first, p);
*head = (struct lws_buflist *)
lws_malloc(sizeof(**head) + len, __func__);
@@ -1060,7 +1060,7 @@ lws_buflist_use_segment(struct lws_buflist **head, size_t len)
if (!*head)
return 0;
- return (*head)->len;
+ return (*head)->len - (*head)->pos;
}
/* ... */
diff --git a/lib/libwebsockets.h b/lib/libwebsockets.h
index db614c06..7eb1b1ac 100644
--- a/lib/libwebsockets.h
+++ b/lib/libwebsockets.h
@@ -5724,6 +5724,16 @@ lws_dll_lws_remove(struct lws_dll_lws *_a)
} \
}
+#define lws_start_foreach_dll(___type, ___it, ___start) \
+{ \
+ ___type ___it = ___start; \
+ while (___it) {
+
+#define lws_end_foreach_dll(___it) \
+ ___it = (___it)->next; \
+ } \
+}
+
struct lws_buflist;
/**
diff --git a/lib/plat/lws-plat-unix.c b/lib/plat/lws-plat-unix.c
index a252e672..55d26d41 100644
--- a/lib/plat/lws-plat-unix.c
+++ b/lib/plat/lws-plat-unix.c
@@ -266,6 +266,8 @@ _lws_plat_service_tsi(struct lws_context *context, int timeout_ms, int tsi)
if (!pt->rx_draining_ext_list && !n) /* poll timeout */ {
#endif
lws_service_fd_tsi(context, NULL, tsi);
+ lws_service_do_ripe_rxflow(pt);
+
return 0;
}
@@ -296,6 +298,8 @@ faked_service:
n--;
}
+ lws_service_do_ripe_rxflow(pt);
+
return 0;
}
diff --git a/lib/private-libwebsockets.h b/lib/private-libwebsockets.h
index 8f578733..123ca704 100644
--- a/lib/private-libwebsockets.h
+++ b/lib/private-libwebsockets.h
@@ -959,7 +959,7 @@ struct lws_context_per_thread {
struct lws *tx_draining_ext_list;
struct lws_dll_lws dll_head_timeout;
struct lws_dll_lws dll_head_hrtimer;
- struct lws_dll_lws dll_head_rxflow;
+ struct lws_dll_lws dll_head_rxflow; /* guys with pending rxflow */
#if defined(LWS_WITH_LIBUV) || defined(LWS_WITH_LIBEVENT)
struct lws_context *context;
#endif
@@ -2066,7 +2066,8 @@ struct lws {
struct lws_dll_lws dll_timeout;
struct lws_dll_lws dll_hrtimer;
- struct lws_dll_lws dll_rxflow;
+ struct lws_dll_lws dll_rxflow; /* guys with pending rxflow */
+
#if defined(LWS_WITH_PEER_LIMITS)
struct lws_peer *peer;
#endif
@@ -2236,6 +2237,9 @@ struct lws {
#define lws_is_flowcontrolled(w) (!!(wsi->rxflow_bitmap))
+void
+lws_service_do_ripe_rxflow(struct lws_context_per_thread *pt);
+
LWS_EXTERN int log_level;
LWS_EXTERN int
diff --git a/lib/roles/h1/client-h1.c b/lib/roles/h1/client-h1.c
index e3b8607f..f84ef061 100644
--- a/lib/roles/h1/client-h1.c
+++ b/lib/roles/h1/client-h1.c
@@ -54,8 +54,10 @@ lws_handshake_client(struct lws *wsi, unsigned char **buf, size_t len)
}
/* account for what we're using in rxflow buffer */
if (lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL) &&
- !lws_buflist_use_segment(&wsi->buflist_rxflow, 1))
+ !lws_buflist_use_segment(&wsi->buflist_rxflow, 1)) {
+ lwsl_debug("%s: removed wsi %p from rxflow list\n", __func__, wsi);
lws_dll_lws_remove(&wsi->dll_rxflow);
+ }
if (lws_client_rx_sm(wsi, *(*buf)++)) {
lwsl_debug("client_rx_sm exited\n");
diff --git a/lib/roles/h1/ops-h1.c b/lib/roles/h1/ops-h1.c
index fd9c0382..bc4943e4 100644
--- a/lib/roles/h1/ops-h1.c
+++ b/lib/roles/h1/ops-h1.c
@@ -178,7 +178,7 @@ postbody_completion:
if (!wsi->cgi)
#endif
{
- lwsl_info("HTTP_BODY_COMPLETION\n");
+ lwsl_info("HTTP_BODY_COMPLETION: %p (%s)\n", wsi, wsi->protocol->name);
n = wsi->protocol->callback(wsi,
LWS_CALLBACK_HTTP_BODY_COMPLETION,
wsi->user_space, NULL, 0);
diff --git a/lib/roles/h2/http2.c b/lib/roles/h2/http2.c
index b68152c0..7ac2be67 100644
--- a/lib/roles/h2/http2.c
+++ b/lib/roles/h2/http2.c
@@ -597,6 +597,8 @@ static void lws_h2_set_bin(struct lws *wsi, int n, unsigned char *buf)
*buf = wsi->h2.h2n->set.s[n];
}
+/* we get called on the network connection */
+
int lws_h2_do_pps_send(struct lws *wsi)
{
struct lws_h2_netconn *h2n = wsi->h2.h2n;
@@ -743,8 +745,10 @@ int lws_h2_do_pps_send(struct lws *wsi)
goto bail;
}
cwsi = lws_h2_wsi_from_id(wsi, pps->u.rs.sid);
- if (cwsi)
+ if (cwsi) {
+ lwsl_debug("%s: closing cwsi %p %s %s (wsi %p)\n", __func__, cwsi, cwsi->role_ops->name, cwsi->protocol->name, wsi);
lws_close_free_wsi(cwsi, 0, "reset stream");
+ }
break;
case LWS_H2_PPS_UPDATE_WINDOW:
@@ -1399,13 +1403,13 @@ lws_h2_parse_end_of_frame(struct lws *wsi)
}
}
+ wsi->vhost->conn_stats.h2_trans++;
p = lws_hdr_simple_ptr(h2n->swsi, WSI_TOKEN_HTTP_COLON_METHOD);
if (!strcmp(p, "POST"))
h2n->swsi->ah->frag_index[WSI_TOKEN_POST_URI] =
h2n->swsi->ah->frag_index[WSI_TOKEN_HTTP_COLON_PATH];
- wsi->vhost->conn_stats.h2_trans++;
-
+ lwsl_debug("%s: setting DEF_ACT from 0x%x\n", __func__, h2n->swsi->wsistate);
lwsi_set_state(h2n->swsi, LRS_DEFERRING_ACTION);
lws_callback_on_writable(h2n->swsi);
break;
@@ -1769,6 +1773,26 @@ lws_h2_parser(struct lws *wsi, unsigned char *in, lws_filepos_t inlen,
break;
} else {
+ if (lwsi_state(h2n->swsi) == LRS_DEFERRING_ACTION) {
+ m = lws_buflist_append_segment(
+ &h2n->swsi->buflist_rxflow,
+ in - 1, n);
+ if (m < 0)
+ return -1;
+ if (m) {
+ struct lws_context_per_thread *pt = &wsi->context->pt[(int)wsi->tsi];
+ lwsl_debug("%s: added %p to rxflow list\n", __func__, wsi);
+ lws_dll_lws_add_front(&h2n->swsi->dll_rxflow, &pt->dll_head_rxflow);
+ }
+ in += n - 1;
+ h2n->inside += n;
+ h2n->count += n - 1;
+ inlen -= n - 1;
+
+ lwsl_debug("%s: deferred %d\n", __func__, n);
+ goto do_windows;
+ }
+
h2n->swsi->outer_will_close = 1;
/*
* choose the length for this go so that we end at
@@ -1782,8 +1806,9 @@ lws_h2_parser(struct lws *wsi, unsigned char *in, lws_filepos_t inlen,
* can return 0 in POST body with
* content len exhausted somehow.
*/
- if (n <= 0) {
- lwsl_debug("%s: lws_read_h1 told %d %d / %d\n",
+ if (n < 0 ||
+ (!n && !lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL))) {
+ lwsl_info("%s: lws_read_h1 told %d %d / %d\n",
__func__, n, h2n->count, h2n->length);
in += h2n->length - h2n->count;
h2n->inside = h2n->length;
@@ -1800,6 +1825,7 @@ lws_h2_parser(struct lws *wsi, unsigned char *in, lws_filepos_t inlen,
h2n->count += n - 1;
}
+do_windows:
/* account for both network and stream wsi windows */
wsi->h2.peer_tx_cr_est -= n;
@@ -2172,7 +2198,7 @@ lws_read_h2(struct lws *wsi, unsigned char *buf, lws_filepos_t len)
m = lws_h2_parser(wsi, buf, len, &body_chunk_len);
if (m && m != 2) {
- lwsl_debug("%s: http2_parser bailed\n", __func__);
+ lwsl_debug("%s: http2_parser bailed: %d\n", __func__, m);
lws_close_free_wsi(wsi, LWS_CLOSE_STATUS_NOSTATUS,
"lws_read_h2 bail");
@@ -2185,11 +2211,6 @@ lws_read_h2(struct lws *wsi, unsigned char *buf, lws_filepos_t len)
break;
}
- /* account for what we're using in rxflow buffer */
- if (lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL) &&
- !lws_buflist_use_segment(&wsi->buflist_rxflow, body_chunk_len))
- lws_dll_lws_remove(&wsi->dll_rxflow);
-
buf += body_chunk_len;
len -= body_chunk_len;
}
diff --git a/lib/roles/h2/ops-h2.c b/lib/roles/h2/ops-h2.c
index 5c9a475b..1cbf86b0 100644
--- a/lib/roles/h2/ops-h2.c
+++ b/lib/roles/h2/ops-h2.c
@@ -91,7 +91,7 @@ rops_handle_POLLIN_h2(struct lws_context_per_thread *pt, struct lws *wsi,
unsigned int pending = 0;
char draining_flow = 0;
struct lws *wsi1;
- int n;
+ int n, m;
#ifdef LWS_WITH_CGI
if (wsi->cgi && (pollfd->revents & LWS_POLLOUT)) {
@@ -155,11 +155,13 @@ rops_handle_POLLIN_h2(struct lws_context_per_thread *pt, struct lws *wsi,
wsi->ws->tx_draining_ext = 0;
}
+#if 0 /* not so for h2 */
if (lws_is_flowcontrolled(wsi))
/* We cannot deal with any kind of new RX because we are
* RX-flowcontrolled.
*/
return LWS_HPI_RET_HANDLED;
+#endif
if (wsi->http2_substream || wsi->upgraded_to_http2) {
wsi1 = lws_get_network_wsi(wsi);
@@ -313,14 +315,6 @@ drain:
/* service incoming data */
if (eff_buf.token_len) {
- /*
- * if draining from rxflow buffer, not
- * critical to track what was used since at the
- * use it bumps wsi->rxflow_pos. If we come
- * around again it will pick up from where it
- * left off.
- */
-
if (lwsi_role_h2(wsi) && lwsi_state(wsi) != LRS_BODY)
n = lws_read_h2(wsi, (unsigned char *)eff_buf.token,
eff_buf.token_len);
@@ -333,6 +327,17 @@ drain:
n = 0;
return LWS_HPI_RET_DIE;
}
+
+ if (draining_flow) {
+ m = lws_buflist_use_segment(&wsi->buflist_rxflow, n);
+ lwsl_info("%s: draining rxflow: used %d, next %d\n",
+ __func__, n, m);
+ if (!m) {
+ lwsl_notice("%s: removed wsi %p from rxflow list\n",
+ __func__, wsi);
+ lws_dll_lws_remove(&wsi->dll_rxflow);
+ }
+ }
}
eff_buf.token = NULL;
@@ -386,6 +391,10 @@ int rops_handle_POLLOUT_h2(struct lws *wsi)
#endif
) && wsi->h2.h2n->pps) {
lwsl_info("servicing pps\n");
+ /*
+ * this is called on the network connection, but may close
+ * substreams... that may affect callers
+ */
if (lws_h2_do_pps_send(wsi)) {
wsi->socket_is_permanently_unusable = 1;
return LWS_HP_RET_BAIL_DIE;
@@ -418,9 +427,13 @@ rops_service_flag_pending_h2(struct lws_context *context, int tsi)
struct lws_context_per_thread *pt = &context->pt[tsi];
struct allocated_headers *ah;
int forced = 0;
+#endif
/* POLLIN faking (the pt lock is taken by the parent) */
+
+
+#if !defined(LWS_ROLE_H1)
/*
* 3) For any wsi who have an ah with pending RX who did not
* complete their current headers, and are not flowcontrolled,
@@ -621,7 +634,8 @@ rops_close_kill_connection_h2(struct lws *wsi, enum lws_close_status reason)
wsi->h2.parent_wsi);
lws_start_foreach_llp(struct lws **, w,
wsi->h2.parent_wsi->h2.child_list) {
- lwsl_info(" \\---- child %p\n", *w);
+ lwsl_info(" \\---- child %s %p\n",
+ (*w)->role_ops ? (*w)->role_ops->name : "?", *w);
} lws_end_foreach_llp(w, h2.sibling_list);
}
@@ -632,7 +646,9 @@ rops_close_kill_connection_h2(struct lws *wsi, enum lws_close_status reason)
lwsl_info(" parent %p: closing children: list:\n", wsi);
lws_start_foreach_llp(struct lws **, w,
wsi->h2.child_list) {
- lwsl_info(" \\---- child %p\n", *w);
+ lwsl_info(" \\---- child %s %p\n",
+ (*w)->role_ops ? (*w)->role_ops->name : "?",
+ *w);
} lws_end_foreach_llp(w, h2.sibling_list);
/* trigger closing of all of our http2 children first */
lws_start_foreach_llp(struct lws **, w,
@@ -652,6 +668,7 @@ rops_close_kill_connection_h2(struct lws *wsi, enum lws_close_status reason)
if (wsi->upgraded_to_http2) {
/* remove pps */
struct lws_h2_protocol_send *w = wsi->h2.h2n->pps, *w1;
+
while (w) {
w1 = w->next;
free(w);
@@ -770,16 +787,57 @@ lws_h2_dump_waiting_children(struct lws *wsi)
wsi = wsi->h2.child_list;
while (wsi) {
- if (wsi->h2.requested_POLLOUT)
- lwsl_info(" * %p %s\n", wsi, wsi->protocol->name);
- else
- lwsl_info(" %p %s\n", wsi, wsi->protocol->name);
+ lwsl_info(" %c %p %s %s\n",
+ wsi->h2.requested_POLLOUT ? '*' : ' ',
+ wsi, wsi->role_ops->name, wsi->protocol->name);
wsi = wsi->h2.sibling_list;
}
#endif
}
+static int
+lws_h2_bind_for_post_before_action(struct lws *wsi)
+{
+ const char *p;
+
+ p = lws_hdr_simple_ptr(wsi, WSI_TOKEN_HTTP_COLON_METHOD);
+ if (!strcmp(p, "POST")) {
+ const struct lws_protocols *pp;
+ const char *name;
+ const struct lws_http_mount *hit =
+ lws_find_mount(wsi,
+ lws_hdr_simple_ptr(wsi,
+ WSI_TOKEN_HTTP_COLON_PATH),
+ lws_hdr_total_length(wsi,
+ WSI_TOKEN_HTTP_COLON_PATH));
+
+ lwsl_debug("%s: %s: hit %p: %s\n", __func__,
+ lws_hdr_simple_ptr(wsi, WSI_TOKEN_HTTP_COLON_PATH),
+ hit, hit ? hit->origin : "null");
+ if (hit) {
+ name = hit->origin;
+ if (hit->protocol)
+ name = hit->protocol;
+
+ pp = lws_vhost_name_to_protocol(wsi->vhost, name);
+ if (!pp) {
+ lwsl_err("Unable to find plugin '%s'\n", name);
+ return 1;
+ }
+
+ if (lws_bind_protocol(wsi, pp))
+ return 1;
+ }
+
+ lwsl_notice("%s: setting LRS_BODY from 0x%x (%s)\n", __func__,
+ wsi->wsistate, wsi->protocol->name);
+ lwsi_set_state(wsi, LRS_BODY);
+ }
+
+ return 0;
+}
+
/*
* we are the 'network wsi' for potentially many muxed child wsi with
* no network connection of their own, who have to use us for all their
@@ -886,6 +944,8 @@ rops_perform_user_POLLOUT_h2(struct lws *wsi)
lwsi_set_state(w, LRS_ESTABLISHED);
+ lws_h2_bind_for_post_before_action(w);
+
lwsl_info(" h2 action start...\n");
n = lws_http_action(w);
lwsl_info(" h2 action result %d "
diff --git a/lib/roles/http/client/client-handshake.c b/lib/roles/http/client/client-handshake.c
index bf24bc26..247dc480 100644
--- a/lib/roles/http/client/client-handshake.c
+++ b/lib/roles/http/client/client-handshake.c
@@ -109,6 +109,7 @@ lws_client_connect_2(struct lws *wsi)
lwsl_info("%s: just join h2 directly\n",
__func__);
+ wsi->client_h2_alpn = 1;
lws_wsi_h2_adopt(w, wsi);
lws_vhost_unlock(wsi->vhost);
diff --git a/lib/roles/http/client/client.c b/lib/roles/http/client/client.c
index 4b60bf8a..e731befd 100644
--- a/lib/roles/http/client/client.c
+++ b/lib/roles/http/client/client.c
@@ -422,9 +422,9 @@ start_ws_handshake:
case LRS_ISSUE_HTTP_BODY:
if (wsi->client_http_body_pending) {
- lws_set_timeout(wsi,
- PENDING_TIMEOUT_CLIENT_ISSUE_PAYLOAD,
- context->timeout_secs);
+ //lws_set_timeout(wsi,
+ // PENDING_TIMEOUT_CLIENT_ISSUE_PAYLOAD,
+ // context->timeout_secs);
/* user code must ask for writable callback */
break;
}
@@ -658,13 +658,17 @@ lws_client_interpret_server_handshake(struct lws *wsi)
/* we are being an http client...
*/
#if defined(LWS_ROLE_H2)
- if (wsi->client_h2_alpn)
+ if (wsi->client_h2_alpn || wsi->client_h2_substream) {
+ lwsl_debug("%s: %p: transitioning to h2 client\n", __func__, wsi);
lws_role_transition(wsi, LWSIFR_CLIENT,
LRS_ESTABLISHED, &role_ops_h2);
- else
+ } else
#endif
+ {
+ lwsl_debug("%s: %p: transitioning to h1 client\n", __func__, wsi);
lws_role_transition(wsi, LWSIFR_CLIENT,
LRS_ESTABLISHED, &role_ops_h1);
+ }
wsi->ah = ah;
ah->http_response = 0;
diff --git a/lib/roles/http/server/server.c b/lib/roles/http/server/server.c
index 1348d0eb..19e57b83 100644
--- a/lib/roles/http/server/server.c
+++ b/lib/roles/http/server/server.c
@@ -1718,6 +1718,7 @@ lws_http_transaction_completed(struct lws *wsi)
* until we can verify POLLOUT. The part of this that confirms POLLOUT
* with no partials is in lws_server_socket_service() below.
*/
+ lwsl_debug("%s: setting DEF_ACT from 0x%x\n", __func__, wsi->wsistate);
lwsi_set_state(wsi, LRS_DEFERRING_ACTION);
wsi->http.tx_content_length = 0;
wsi->http.tx_content_remain = 0;
diff --git a/lib/roles/ws/ops-ws.c b/lib/roles/ws/ops-ws.c
index 9e143607..fae43ead 100644
--- a/lib/roles/ws/ops-ws.c
+++ b/lib/roles/ws/ops-ws.c
@@ -1093,13 +1093,6 @@ drain:
/* service incoming data */
if (eff_buf.token_len) {
- /*
- * if draining from rxflow buffer, not
- * critical to track what was used since at the
- * use it bumps wsi->rxflow_pos. If we come
- * around again it will pick up from where it
- * left off.
- */
#if defined(LWS_ROLE_H2)
if (lwsi_role_h2(wsi) && lwsi_state(wsi) != LRS_BODY)
n = lws_read_h2(wsi, (unsigned char *)eff_buf.token,
@@ -1114,6 +1107,14 @@ drain:
n = 0;
return LWS_HPI_RET_DIE;
}
+ if (draining_flow) {
+ m = lws_buflist_use_segment(&wsi->buflist_rxflow, n);
+ lwsl_debug("%s: draining rxflow: used %d, next %d\n", __func__, n, m);
+ if (!m) {
+ lwsl_notice("%s: removed wsi %p from rxflow list\n", __func__, wsi);
+ lws_dll_lws_remove(&wsi->dll_rxflow);
+ }
+ }
}
eff_buf.token = NULL;
diff --git a/lib/roles/ws/server-ws.c b/lib/roles/ws/server-ws.c
index fa7ddfe8..0f09cd3a 100644
--- a/lib/roles/ws/server-ws.c
+++ b/lib/roles/ws/server-ws.c
@@ -549,7 +549,6 @@ handshake_0405(struct lws_context *context, struct lws *wsi)
return 0;
-
bail:
/* caller will free up his parsing allocations */
return -1;
@@ -561,6 +560,9 @@ lws_interpret_incoming_packet(struct lws *wsi, unsigned char **buf, size_t len)
{
int m, draining_flow = 0;
+ if (lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL))
+ draining_flow = 1;
+
lwsl_parser("%s: received %d byte packet\n", __func__, (int)len);
/* let the rx protocol state machine have as much as it needs */
@@ -583,22 +585,13 @@ lws_interpret_incoming_packet(struct lws *wsi, unsigned char **buf, size_t len)
continue;
}
- /* account for what we're using in rxflow buffer */
- if (lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL)) {
- draining_flow = 1;
- if (!lws_buflist_use_segment(&wsi->buflist_rxflow, 1))
- lws_dll_lws_remove(&wsi->dll_rxflow);
- }
-
/* consume payload bytes efficiently */
if (wsi->lws_rx_parse_state ==
LWS_RXPS_PAYLOAD_UNTIL_LENGTH_EXHAUSTED) {
m = lws_payload_until_length_exhausted(wsi, buf, &len);
- if (lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL)) {
- draining_flow = 1;
- if (!lws_buflist_use_segment(&wsi->buflist_rxflow, m))
- lws_dll_lws_remove(&wsi->dll_rxflow);
- }
+ if (draining_flow &&
+ !lws_buflist_use_segment(&wsi->buflist_rxflow, m))
+ lws_dll_lws_remove(&wsi->dll_rxflow);
}
/* process the byte */
@@ -607,8 +600,11 @@ lws_interpret_incoming_packet(struct lws *wsi, unsigned char **buf, size_t len)
return -1;
len--;
- if (draining_flow && /* were draining, now nothing left */
- !lws_buflist_next_segment_len(&wsi->buflist_rxflow, NULL)) {
+ /* account for what we're using in rxflow buffer */
+ if (draining_flow &&
+ !lws_buflist_use_segment(&wsi->buflist_rxflow, 1)) {
+ lws_dll_lws_remove(&wsi->dll_rxflow);
+
lwsl_debug("%s: %p flow buf: drained\n", __func__, wsi);
/* having drained the rxflow buffer, can rearm POLLIN */
diff --git a/lib/service.c b/lib/service.c
index f9b709f4..b83587fa 100644
--- a/lib/service.c
+++ b/lib/service.c
@@ -136,11 +136,12 @@ lws_handle_POLLOUT_event(struct lws *wsi, struct lws_pollfd *pollfd)
if (pollfd) {
int eff = vwsi->leave_pollout_active;
- if (!eff)
+ if (!eff) {
if (lws_change_pollfd(wsi, LWS_POLLOUT, 0)) {
lwsl_info("failed at set pollfd\n");
goto bail_die;
}
+ }
vwsi->handling_pollout = 0;
@@ -162,7 +163,9 @@ lws_handle_POLLOUT_event(struct lws *wsi, struct lws_pollfd *pollfd)
if (lwsi_role_client(wsi) &&
!wsi->hdr_parsing_completed &&
- lwsi_state(wsi) != LRS_H2_WAITING_TO_SEND_HEADERS)
+ lwsi_state(wsi) != LRS_H2_WAITING_TO_SEND_HEADERS &&
+ lwsi_state(wsi) != LRS_ISSUE_HTTP_BODY
+ )
goto bail_ok;
@@ -301,10 +304,13 @@ int lws_rxflow_cache(struct lws *wsi, unsigned char *buf, int n, int len)
/* a new rxflow, buffer it and warn caller */
m = lws_buflist_append_segment(&wsi->buflist_rxflow, buf + n, len - n);
+
if (m < 0)
return -1;
- if (m)
+ if (m) {
+ lwsl_debug("%s: added %p to rxflow list\n", __func__, wsi);
lws_dll_lws_add_front(&wsi->dll_rxflow, &pt->dll_head_rxflow);
+ }
return ret;
}
@@ -394,6 +400,43 @@ lws_read_or_use_preamble(struct lws_context_per_thread *pt, struct lws *wsi)
return len;
}
+void
+lws_service_do_ripe_rxflow(struct lws_context_per_thread *pt)
+{
+ struct lws_pollfd pfd;
+
+ if (!pt->dll_head_rxflow.next)
+ return;
+
+ /*
+ * service all guys with pending rxflow that reached a state they can
+ * accept the pending data
+ */
+
+ lws_pt_lock(pt, __func__);
+
+ lws_start_foreach_dll_safe(struct lws_dll_lws *, d, d1,
+ pt->dll_head_rxflow.next) {
+ struct lws *wsi = lws_container_of(d, struct lws, dll_rxflow);
+
+ pfd.events = POLLIN;
+ pfd.revents = POLLIN;
+ pfd.fd = -1;
+
+ lwsl_debug("%s: rxflow processing: %p 0x%x\n", __func__, wsi,
+ wsi->wsistate);
+
+ if (!lws_is_flowcontrolled(wsi) &&
+ lwsi_state(wsi) != LRS_DEFERRING_ACTION &&
+ (wsi->role_ops->handle_POLLIN)(pt, wsi, &pfd) ==
+ LWS_HPI_RET_CLOSE_HANDLED)
+ lws_close_free_wsi(wsi, LWS_CLOSE_STATUS_NOSTATUS,
+ "close_and_handled");
+
+ } lws_end_foreach_dll_safe(d, d1);
+
+ lws_pt_unlock(pt);
+}
/*
* guys that need POLLIN service again without waiting for network action
@@ -413,6 +456,20 @@ lws_service_flag_pending(struct lws_context *context, int tsi)
lws_pt_lock(pt, __func__);
+ /*
+ * 1) If there is any wsi with rxflow buffered and in a state to process
+ * it, we should not wait in poll
+ */
+
+ lws_start_foreach_dll(struct lws_dll_lws *, d, pt->dll_head_rxflow.next) {
+ struct lws *wsi = lws_container_of(d, struct lws, dll_rxflow);
+
+ if (lwsi_state(wsi) != LRS_DEFERRING_ACTION) {
+ forced = 1;
+ break;
+ }
+ } lws_end_foreach_dll(d);
+
#if defined(LWS_ROLE_WS)
forced |= role_ops_ws.service_flag_pending(context, tsi);
#endif
diff --git a/minimal-examples/http-server/minimal-http-server-tls/minimal-http-server-tls.c b/minimal-examples/http-server/minimal-http-server-tls/minimal-http-server-tls.c
index c6a29565..96dc19c7 100644
--- a/minimal-examples/http-server/minimal-http-server-tls/minimal-http-server-tls.c
+++ b/minimal-examples/http-server/minimal-http-server-tls/minimal-http-server-tls.c
@@ -59,7 +59,7 @@ int main(int argc, char **argv)
info.port = 7681;
info.mounts = &mount;
info.error_document_404 = "/404.html";
-
+ info.max_http_header_pool = 32;
info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
info.ssl_cert_filepath = "localhost-100y.cert";
info.ssl_private_key_filepath = "localhost-100y.key";
diff --git a/plugins/protocol_post_demo.c b/plugins/protocol_post_demo.c
index 4fab8e2d..37cb9745 100644
--- a/plugins/protocol_post_demo.c
+++ b/plugins/protocol_post_demo.c
@@ -139,7 +139,7 @@ callback_post_demo(struct lws *wsi, enum lws_callback_reasons reason,
break;
case LWS_CALLBACK_HTTP_BODY_COMPLETION:
- lwsl_debug("LWS_CALLBACK_HTTP_BODY_COMPLETION\n");
+ lwsl_debug("LWS_CALLBACK_HTTP_BODY_COMPLETION: %p\n", wsi);
/* call to inform no more payload data coming */
lws_spa_finalize(pss->spa);
@@ -194,12 +194,14 @@ callback_post_demo(struct lws *wsi, enum lws_callback_reasons reason,
break;
case LWS_CALLBACK_HTTP_WRITEABLE:
- if (!pss->result_len)
+ if (!pss->result_len) {
+ lwsl_debug("nothing in result_len\n");
break;
+ }
lwsl_debug("LWS_CALLBACK_HTTP_WRITEABLE: sending %d\n",
pss->result_len);
n = lws_write(wsi, (unsigned char *)pss->result + LWS_PRE,
- pss->result_len, LWS_WRITE_HTTP);
+ pss->result_len, LWS_WRITE_HTTP_FINAL);
if (n < 0)
return 1;
goto try_to_reuse;
--
GitLab