Newer
Older
}
res = wait_for_completion(test, std3);
if (res == AST_TEST_FAIL) {
goto end;
}
res = wait_for_completion(test, std4);
if (res == AST_TEST_FAIL) {
goto end;
}
res = wait_for_empty_notice(test, tld);
if (res == AST_TEST_FAIL) {
goto end;
}
res = wait_until_thread_state(test, tld, 0, 3);
if (res == AST_TEST_FAIL) {
goto end;
}
res = listener_check(test, listener, 1, 0, 4, 0, 3, 1);
ast_threadpool_shutdown(pool);
ao2_cleanup(listener);
ast_free(std1);
ast_free(std2);
ast_free(std3);
ast_free(std4);
ast_free(tld);
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
AST_TEST_DEFINE(threadpool_max_size)
{
struct ast_threadpool *pool = NULL;
struct ast_threadpool_listener *listener = NULL;
struct simple_task_data *std = NULL;
enum ast_test_result_state res = AST_TEST_FAIL;
struct test_listener_data *tld = NULL;
struct ast_threadpool_options options = {
.version = AST_THREADPOOL_OPTIONS_VERSION,
.idle_timeout = 0,
.auto_increment = 3,
.initial_size = 0,
.max_size = 2,
};
switch (cmd) {
case TEST_INIT:
info->name = "max_size";
info->category = "/main/threadpool/";
info->summary = "Test that the threadpool does not exceed its maximum size restriction";
info->description =
"Create an empty threadpool and push a task to it. Once the task is\n"
"pushed, the threadpool should attempt to grow by three threads, but the\n"
"pool's restrictions should only allow two threads to be added.";
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
tld = test_alloc();
if (!tld) {
return AST_TEST_FAIL;
}
listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
if (!listener) {
goto end;
}
pool = ast_threadpool_create(info->name, listener, &options);
if (!pool) {
goto end;
}
std = simple_task_data_alloc();
if (!std) {
goto end;
}
ast_threadpool_push(pool, simple_task, std);
res = wait_for_completion(test, std);
if (res == AST_TEST_FAIL) {
goto end;
}
res = wait_until_thread_state(test, tld, 0, 2);
if (res == AST_TEST_FAIL) {
goto end;
}
res = listener_check(test, listener, 1, 1, 1, 0, 2, 1);
end:
ast_threadpool_shutdown(pool);
ao2_cleanup(listener);
ast_free(std);
ast_free(tld);
return res;
}
AST_TEST_DEFINE(threadpool_reactivation)
{
struct ast_threadpool *pool = NULL;
struct ast_threadpool_listener *listener = NULL;
struct simple_task_data *std1 = NULL;
struct simple_task_data *std2 = NULL;
enum ast_test_result_state res = AST_TEST_FAIL;
struct test_listener_data *tld = NULL;
struct ast_threadpool_options options = {
.version = AST_THREADPOOL_OPTIONS_VERSION,
.idle_timeout = 0,
.initial_size = 0,
switch (cmd) {
case TEST_INIT:
Mark Michelson
committed
info->name = "reactivation";
info->category = "/main/threadpool/";
info->summary = "Test that a threadpool reactivates when work is added";
info->description =
"Push a task into a threadpool. Make sure the task executes and the\n"
"thread goes idle. Then push a second task and ensure that the thread\n"
"awakens and executes the second task.";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
tld = test_alloc();
if (!tld) {
listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
if (!listener) {
goto end;
}
pool = ast_threadpool_create(info->name, listener, &options);
if (!pool) {
goto end;
}
std1 = simple_task_data_alloc();
std2 = simple_task_data_alloc();
if (!std1 || !std2) {
goto end;
}
ast_threadpool_push(pool, simple_task, std1);
ast_threadpool_set_size(pool, 1);
res = wait_for_completion(test, std1);
if (res == AST_TEST_FAIL) {
goto end;
}
res = wait_for_empty_notice(test, tld);
if (res == AST_TEST_FAIL) {
goto end;
}
res = wait_until_thread_state(test, tld, 0, 1);
if (res == AST_TEST_FAIL) {
goto end;
}
res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
if (res == AST_TEST_FAIL) {
goto end;
}
/* Now make sure the threadpool reactivates when we add a second task */
ast_threadpool_push(pool, simple_task, std2);
res = wait_for_completion(test, std2);
if (res == AST_TEST_FAIL) {
goto end;
}
res = wait_for_empty_notice(test, tld);
if (res == AST_TEST_FAIL) {
goto end;
}
res = wait_until_thread_state(test, tld, 0, 1);
if (res == AST_TEST_FAIL) {
goto end;
}
res = listener_check(test, listener, 1, 1, 2, 0, 1, 1);
end:
ast_threadpool_shutdown(pool);
ao2_cleanup(listener);
ast_free(std1);
ast_free(std2);
ast_free(tld);
/*
 * State shared between a test and a complex_task() instance running on a
 * pool thread.  Every member is read and written with lock held.
 */
struct complex_task_data {
	/* Set by complex_task() as soon as it begins running. */
	int task_started;
	/* Set by complex_task() just before it returns. */
	int task_executed;
	/* Set by poke_worker() to allow the stalled task to finish. */
	int continue_task;
	ast_mutex_t lock;
	/* The task blocks on this until continue_task becomes non-zero. */
	ast_cond_t stall_cond;
	/* Signalled by the task whenever task_started or task_executed is set. */
	ast_cond_t notify_cond;
};
/*
 * Allocate a zeroed complex_task_data and initialise its mutex and both
 * condition variables.
 *
 * Returns the new object, or NULL if the allocation failed.
 */
static struct complex_task_data *complex_task_data_alloc(void)
{
	struct complex_task_data *task_data;

	task_data = ast_calloc(1, sizeof(*task_data));
	if (task_data) {
		ast_mutex_init(&task_data->lock);
		ast_cond_init(&task_data->stall_cond, NULL);
		ast_cond_init(&task_data->notify_cond, NULL);
	}

	return task_data;
}
static int complex_task(void *data)
{
struct complex_task_data *ctd = data;
SCOPED_MUTEX(lock, &ctd->lock);
/* Notify that we started */
ctd->task_started = 1;
ast_cond_signal(&ctd->notify_cond);
while (!ctd->continue_task) {
ast_cond_wait(&ctd->stall_cond, lock);
}
/* We got poked. Finish up */
ctd->task_executed = 1;
ast_cond_signal(&ctd->notify_cond);
return 0;
}
/*
 * Release a task stalled in complex_task().
 *
 * Sets continue_task while holding the task's lock and signals stall_cond
 * so the blocked worker can finish.  A NULL ctd is ignored: cleanup paths
 * call this for task data that may never have been allocated, and taking
 * the lock through a NULL pointer would crash.
 */
static void poke_worker(struct complex_task_data *ctd)
{
	if (!ctd) {
		return;
	}

	SCOPED_MUTEX(lock, &ctd->lock);
	ctd->continue_task = 1;
	ast_cond_signal(&ctd->stall_cond);
}
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
static int wait_for_complex_start(struct complex_task_data *ctd)
{
struct timeval start = ast_tvnow();
struct timespec end = {
.tv_sec = start.tv_sec + 5,
.tv_nsec = start.tv_usec * 1000
};
SCOPED_MUTEX(lock, &ctd->lock);
while (!ctd->task_started) {
if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
break;
}
}
return ctd->task_started;
}
static int has_complex_started(struct complex_task_data *ctd)
{
struct timeval start = ast_tvnow();
struct timespec end = {
.tv_sec = start.tv_sec + 1,
.tv_nsec = start.tv_usec * 1000
};
SCOPED_MUTEX(lock, &ctd->lock);
while (!ctd->task_started) {
if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
break;
}
}
return ctd->task_started;
}
static enum ast_test_result_state wait_for_complex_completion(struct complex_task_data *ctd)
{
struct timeval start = ast_tvnow();
struct timespec end = {
.tv_sec = start.tv_sec + 5,
.tv_nsec = start.tv_usec * 1000
};
enum ast_test_result_state res = AST_TEST_PASS;
SCOPED_MUTEX(lock, &ctd->lock);
while (!ctd->task_executed) {
if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
break;
}
}
if (!ctd->task_executed) {
res = AST_TEST_FAIL;
}
return res;
}
AST_TEST_DEFINE(threadpool_task_distribution)
{
struct ast_threadpool *pool = NULL;
struct ast_threadpool_listener *listener = NULL;
struct complex_task_data *ctd1 = NULL;
struct complex_task_data *ctd2 = NULL;
enum ast_test_result_state res = AST_TEST_FAIL;
struct test_listener_data *tld = NULL;
struct ast_threadpool_options options = {
.version = AST_THREADPOOL_OPTIONS_VERSION,
.idle_timeout = 0,
.initial_size = 0,
switch (cmd) {
case TEST_INIT:
Mark Michelson
committed
info->name = "task_distribution";
info->category = "/main/threadpool/";
info->summary = "Test that tasks are evenly distributed to threads";
info->description =
"Push two tasks into a threadpool. Ensure that each is handled by\n"
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
tld = test_alloc();
if (!tld) {
listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
if (!listener) {
goto end;
}
pool = ast_threadpool_create(info->name, listener, &options);
if (!pool) {
goto end;
}
ctd1 = complex_task_data_alloc();
ctd2 = complex_task_data_alloc();
if (!ctd1 || !ctd2) {
goto end;
}
ast_threadpool_push(pool, complex_task, ctd1);
ast_threadpool_push(pool, complex_task, ctd2);
ast_threadpool_set_size(pool, 2);
res = wait_until_thread_state(test, tld, 2, 0);
if (res == AST_TEST_FAIL) {
goto end;
}
res = listener_check(test, listener, 1, 0, 2, 2, 0, 0);
if (res == AST_TEST_FAIL) {
goto end;
}
/* The tasks are stalled until we poke them */
poke_worker(ctd1);
poke_worker(ctd2);
res = wait_for_complex_completion(ctd1);
if (res == AST_TEST_FAIL) {
goto end;
}
res = wait_for_complex_completion(ctd2);
if (res == AST_TEST_FAIL) {
goto end;
}
res = wait_until_thread_state(test, tld, 0, 2);
if (res == AST_TEST_FAIL) {
goto end;
}
res = listener_check(test, listener, 1, 0, 2, 0, 2, 1);
end:
ast_threadpool_shutdown(pool);
ao2_cleanup(listener);
ast_free(ctd1);
ast_free(ctd2);
ast_free(tld);
AST_TEST_DEFINE(threadpool_more_destruction)
{
struct ast_threadpool *pool = NULL;
struct ast_threadpool_listener *listener = NULL;
struct complex_task_data *ctd1 = NULL;
struct complex_task_data *ctd2 = NULL;
enum ast_test_result_state res = AST_TEST_FAIL;
struct test_listener_data *tld = NULL;
struct ast_threadpool_options options = {
.version = AST_THREADPOOL_OPTIONS_VERSION,
.idle_timeout = 0,
.initial_size = 0,
switch (cmd) {
case TEST_INIT:
Mark Michelson
committed
info->name = "more_destruction";
info->category = "/main/threadpool/";
info->summary = "Test that threads are destroyed as expected";
info->description =
"Push two tasks into a threadpool. Set the threadpool size to 4\n"
"Ensure that there are 2 active and 2 idle threads. Then shrink the\n"
"threadpool down to 1 thread. Ensure that the thread leftover is active\n"
"and ensure that both tasks complete.";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
tld = test_alloc();
if (!tld) {
listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
if (!listener) {
goto end;
}
pool = ast_threadpool_create(info->name, listener, &options);
if (!pool) {
goto end;
}
ctd1 = complex_task_data_alloc();
ctd2 = complex_task_data_alloc();
if (!ctd1 || !ctd2) {
goto end;
}
ast_threadpool_push(pool, complex_task, ctd1);
ast_threadpool_push(pool, complex_task, ctd2);
ast_threadpool_set_size(pool, 4);
res = wait_until_thread_state(test, tld, 2, 2);
if (res == AST_TEST_FAIL) {
goto end;
}
res = listener_check(test, listener, 1, 0, 2, 2, 2, 0);
if (res == AST_TEST_FAIL) {
goto end;
}
ast_threadpool_set_size(pool, 1);
/* Shrinking the threadpool should kill off the two idle threads
* and one of the active threads.
*/
res = wait_until_thread_state(test, tld, 1, 0);
if (res == AST_TEST_FAIL) {
goto end;
}
res = listener_check(test, listener, 1, 0, 2, 1, 0, 0);
if (res == AST_TEST_FAIL) {
goto end;
}
/* The tasks are stalled until we poke them */
poke_worker(ctd1);
poke_worker(ctd2);
res = wait_for_complex_completion(ctd1);
if (res == AST_TEST_FAIL) {
goto end;
}
res = wait_for_complex_completion(ctd2);
if (res == AST_TEST_FAIL) {
goto end;
}
res = wait_until_thread_state(test, tld, 0, 1);
if (res == AST_TEST_FAIL) {
goto end;
}
res = listener_check(test, listener, 1, 0, 2, 0, 1, 1);
end:
ast_threadpool_shutdown(pool);
ao2_cleanup(listener);
ast_free(ctd1);
ast_free(ctd2);
ast_free(tld);
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
AST_TEST_DEFINE(threadpool_serializer)
{
int started = 0;
int finished = 0;
enum ast_test_result_state res = AST_TEST_FAIL;
struct ast_threadpool *pool = NULL;
struct ast_taskprocessor *uut = NULL;
struct complex_task_data *data1 = NULL;
struct complex_task_data *data2 = NULL;
struct complex_task_data *data3 = NULL;
struct ast_threadpool_options options = {
.version = AST_THREADPOOL_OPTIONS_VERSION,
.idle_timeout = 0,
.auto_increment = 0,
.initial_size = 2,
.max_size = 0,
};
switch (cmd) {
case TEST_INIT:
info->name = "threadpool_serializer";
info->category = "/main/threadpool/";
info->summary = "Test that serializers";
info->description =
"Ensures that tasks enqueued to a serialize execute in sequence.";
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
pool = ast_threadpool_create("threadpool_serializer", NULL, &options);
if (!pool) {
ast_test_status_update(test, "Could not create threadpool\n");
goto end;
}
uut = ast_threadpool_serializer("ser1", pool);
data1 = complex_task_data_alloc();
data2 = complex_task_data_alloc();
data3 = complex_task_data_alloc();
if (!uut || !data1 || !data2 || !data3) {
ast_test_status_update(test, "Allocation failed\n");
goto end;
}
/* This should start right away */
if (ast_taskprocessor_push(uut, complex_task, data1)) {
ast_test_status_update(test, "Failed to enqueue data1\n");
goto end;
}
started = wait_for_complex_start(data1);
if (!started) {
ast_test_status_update(test, "Failed to start data1\n");
goto end;
}
/* This should not start until data 1 is complete */
if (ast_taskprocessor_push(uut, complex_task, data2)) {
ast_test_status_update(test, "Failed to enqueue data2\n");
goto end;
}
started = has_complex_started(data2);
if (started) {
ast_test_status_update(test, "data2 started out of order\n");
goto end;
}
/* But the free thread in the pool can still run */
if (ast_threadpool_push(pool, complex_task, data3)) {
ast_test_status_update(test, "Failed to enqueue data3\n");
}
started = wait_for_complex_start(data3);
if (!started) {
ast_test_status_update(test, "Failed to start data3\n");
goto end;
}
/* Finishing data1 should allow data2 to start */
poke_worker(data1);
finished = wait_for_complex_completion(data1) == AST_TEST_PASS;
if (!finished) {
ast_test_status_update(test, "data1 couldn't finish\n");
goto end;
}
started = wait_for_complex_start(data2);
if (!started) {
ast_test_status_update(test, "Failed to start data2\n");
goto end;
}
/* Finish up */
poke_worker(data2);
finished = wait_for_complex_completion(data2) == AST_TEST_PASS;
if (!finished) {
ast_test_status_update(test, "data2 couldn't finish\n");
goto end;
}
poke_worker(data3);
finished = wait_for_complex_completion(data3) == AST_TEST_PASS;
if (!finished) {
ast_test_status_update(test, "data3 couldn't finish\n");
goto end;
}
res = AST_TEST_PASS;
end:
poke_worker(data1);
poke_worker(data2);
poke_worker(data3);
ast_taskprocessor_unreference(uut);
ast_threadpool_shutdown(pool);
ast_free(data1);
ast_free(data2);
ast_free(data3);
return res;
}
AST_TEST_DEFINE(threadpool_serializer_dupe)
{
enum ast_test_result_state res = AST_TEST_FAIL;
struct ast_threadpool *pool = NULL;
struct ast_taskprocessor *uut = NULL;
struct ast_taskprocessor *there_can_be_only_one = NULL;
struct ast_threadpool_options options = {
.version = AST_THREADPOOL_OPTIONS_VERSION,
.idle_timeout = 0,
.auto_increment = 0,
.initial_size = 2,
.max_size = 0,
};
switch (cmd) {
case TEST_INIT:
info->name = "threadpool_serializer_dupe";
info->category = "/main/threadpool/";
info->summary = "Test that serializers are uniquely named";
info->description =
"Creating two serializers with the same name should\n"
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
pool = ast_threadpool_create("threadpool_serializer", NULL, &options);
if (!pool) {
ast_test_status_update(test, "Could not create threadpool\n");
goto end;
}
uut = ast_threadpool_serializer("highlander", pool);
if (!uut) {
ast_test_status_update(test, "Allocation failed\n");
goto end;
}
there_can_be_only_one = ast_threadpool_serializer("highlander", pool);
if (there_can_be_only_one) {
ast_taskprocessor_unreference(there_can_be_only_one);
ast_test_status_update(test, "Duplicate name error\n");
goto end;
}
res = AST_TEST_PASS;
end:
ast_taskprocessor_unreference(uut);
ast_threadpool_shutdown(pool);
return res;
}
/*
 * Module unload hook: unregister every threadpool test that load_module()
 * registered, in the same order.  Always returns 0.
 */
static int unload_module(void)
{
ast_test_unregister(threadpool_push);
ast_test_unregister(threadpool_initial_threads);
ast_test_unregister(threadpool_thread_creation);
ast_test_unregister(threadpool_thread_destruction);
ast_test_unregister(threadpool_thread_timeout);
ast_test_unregister(threadpool_thread_timeout_thrash);
ast_test_unregister(threadpool_one_task_one_thread);
ast_test_unregister(threadpool_one_thread_one_task);
ast_test_unregister(threadpool_one_thread_multiple_tasks);
ast_test_unregister(threadpool_auto_increment);
ast_test_unregister(threadpool_max_size);
ast_test_unregister(threadpool_reactivation);
ast_test_unregister(threadpool_task_distribution);
ast_test_unregister(threadpool_more_destruction);
ast_test_unregister(threadpool_serializer);
ast_test_unregister(threadpool_serializer_dupe);
return 0;
}
/*
 * Module load hook: register every threadpool test defined in this file.
 * The results of the individual registrations are not checked; the
 * function always returns AST_MODULE_LOAD_SUCCESS.
 */
static int load_module(void)
{
ast_test_register(threadpool_push);
ast_test_register(threadpool_initial_threads);
ast_test_register(threadpool_thread_creation);
ast_test_register(threadpool_thread_destruction);
ast_test_register(threadpool_thread_timeout);
ast_test_register(threadpool_thread_timeout_thrash);
ast_test_register(threadpool_one_task_one_thread);
ast_test_register(threadpool_one_thread_one_task);
ast_test_register(threadpool_one_thread_multiple_tasks);
ast_test_register(threadpool_auto_increment);
ast_test_register(threadpool_max_size);
ast_test_register(threadpool_reactivation);
ast_test_register(threadpool_task_distribution);
ast_test_register(threadpool_more_destruction);
ast_test_register(threadpool_serializer);
ast_test_register(threadpool_serializer_dupe);
return AST_MODULE_LOAD_SUCCESS;
}
/*
 * Module declaration: supplies the GPL license key and the human-readable
 * module description.
 * NOTE(review): presumably this macro is what ties load_module() and
 * unload_module() above to the module loader - confirm against module.h.
 */
AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "threadpool test module");