Skip to content
Snippets Groups Projects
Commit a95a7639 authored by Andy Green's avatar Andy Green
Browse files

threadpool: add return flag for outlive wsi

parent 64ea98f1
No related branches found
No related tags found
No related merge requests found
...@@ -58,7 +58,10 @@ enum lws_threadpool_task_return { ...@@ -58,7 +58,10 @@ enum lws_threadpool_task_return {
/** No more work to do... */ /** No more work to do... */
LWS_TP_RETURN_FINISHED, LWS_TP_RETURN_FINISHED,
/** Responding to request to stop */ /** Responding to request to stop */
LWS_TP_RETURN_STOPPED LWS_TP_RETURN_STOPPED,
/* OR on to indicate this task wishes to outlive its wsi */
LWS_TP_RETURN_FLAG_OUTLIVE = 64
}; };
struct lws_threadpool_create_args { struct lws_threadpool_create_args {
...@@ -70,6 +73,9 @@ struct lws_threadpool_task_args { ...@@ -70,6 +73,9 @@ struct lws_threadpool_task_args {
struct lws *wsi; /**< user must set to wsi task is bound to */ struct lws *wsi; /**< user must set to wsi task is bound to */
void *user; /**< user may set (user-private pointer) */ void *user; /**< user may set (user-private pointer) */
const char *name; /**< user may set to describe task */ const char *name; /**< user may set to describe task */
char async_task; /**< set to allow the task to shrug off the loss
of the associated wsi and continue to
completion */
enum lws_threadpool_task_return (*task)(void *user, enum lws_threadpool_task_return (*task)(void *user,
enum lws_threadpool_task_status s); enum lws_threadpool_task_status s);
/**< user must set to actual task function */ /**< user must set to actual task function */
......
...@@ -126,6 +126,13 @@ LWS_TP_RETURN_SYNC|Task wants to trigger a WRITABLE callback and block until lws ...@@ -126,6 +126,13 @@ LWS_TP_RETURN_SYNC|Task wants to trigger a WRITABLE callback and block until lws
LWS_TP_RETURN_FINISHED|Task has finished, successfully as far as it goes LWS_TP_RETURN_FINISHED|Task has finished, successfully as far as it goes
LWS_TP_RETURN_STOPPED|Task has finished, aborting in response to a request to stop LWS_TP_RETURN_STOPPED|Task has finished, aborting in response to a request to stop
The SYNC or CHECKING_IN return may also have a flag `LWS_TP_RETURN_FLAG_OUTLIVE`
applied to it, which indicates to threadpool that this task wishes to remain
unstopped after the wsi closes. This is useful in the case where the task
understands it will take a long time to complete, and wants to return a
complete status and maybe close the connection, perhaps with a token identifying
the task. The task can then be monitored separately by using the token.
#### Synchronizing #### Synchronizing
The task can choose to "SYNC" with the lws service thread, in other words The task can choose to "SYNC" with the lws service thread, in other words
......
...@@ -49,6 +49,7 @@ struct lws_threadpool_task { ...@@ -49,6 +49,7 @@ struct lws_threadpool_task {
int late_sync_retries; int late_sync_retries;
char wanted_writeable_cb; char wanted_writeable_cb;
char outlive;
}; };
struct lws_pool { struct lws_pool {
...@@ -538,17 +539,29 @@ lws_threadpool_worker(void *d) ...@@ -538,17 +539,29 @@ lws_threadpool_worker(void *d)
lws_usec_t then; lws_usec_t then;
int n; int n;
if (tp->destroying || !task->args.wsi) if (tp->destroying || !task->args.wsi) {
lwsl_info("%s: stopping on wsi gone\n", __func__);
state_transition(task, LWS_TP_STATUS_STOPPING); state_transition(task, LWS_TP_STATUS_STOPPING);
}
then = lws_now_usecs(); then = lws_now_usecs();
n = task->args.task(task->args.user, task->status); n = task->args.task(task->args.user, task->status);
lwsl_debug(" %d, status %d\n", n, task->status);
us_accrue(&task->acc_running, then); us_accrue(&task->acc_running, then);
switch (n) { if (n & LWS_TP_RETURN_FLAG_OUTLIVE)
task->outlive = 1;
switch (n & 7) {
case LWS_TP_RETURN_CHECKING_IN: case LWS_TP_RETURN_CHECKING_IN:
/* if not destroying the tp, continue */ /* if not destroying the tp, continue */
break; break;
case LWS_TP_RETURN_SYNC: case LWS_TP_RETURN_SYNC:
if (!task->args.wsi) {
lwsl_debug("%s: task that wants to "
"outlive lost wsi asked "
"to sync: bypassed\n",
__func__);
break;
}
/* block until writable acknowledges */ /* block until writable acknowledges */
then = lws_now_usecs(); then = lws_now_usecs();
lws_threadpool_worker_sync(pool, task); lws_threadpool_worker_sync(pool, task);
...@@ -777,6 +790,17 @@ lws_threadpool_dequeue(struct lws *wsi) ...@@ -777,6 +790,17 @@ lws_threadpool_dequeue(struct lws *wsi)
tp = task->tp; tp = task->tp;
pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */ pthread_mutex_lock(&tp->lock); /* ======================== tpool lock */
if (task->outlive && !tp->destroying) {
/* disconnect from wsi, and wsi from task */
wsi->tp_task = NULL;
task->args.wsi = NULL;
goto bail;
}
c = &tp->task_queue_head; c = &tp->task_queue_head;
/* is he queued waiting for a chance to run? Mark him as stopped and /* is he queued waiting for a chance to run? Mark him as stopped and
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment