From 039bd76882984ebada5f3e3801a5d6e51d74f172 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Pokorn=C3=BD?= Date: Tue, 11 Jun 2019 15:02:59 +0200 Subject: [PATCH 1/9] tests: ipc: avoid problems when UNIX_PATH_MAX (108) limits is hit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit There's some slight reserve for when bigger PID ranges are in use. The method to yield the limit on prefix string was derived from practical experience (rather than based on exact calculations). Signed-off-by: Jan Pokorný --- tests/check_ipc.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/check_ipc.c b/tests/check_ipc.c index 71b3a7fa5..2231efe1f 100644 --- a/tests/check_ipc.c +++ b/tests/check_ipc.c @@ -140,7 +140,7 @@ set_ipc_name(const char *prefix) t_sec[sizeof(t_sec) - 1] = '\0'; } - snprintf(ipc_name, sizeof(ipc_name), "%s%s%lX%.4x", prefix, t_sec, + snprintf(ipc_name, sizeof(ipc_name), "%.44s%s%lX%.4x", prefix, t_sec, (unsigned long)getpid(), (unsigned) ((long) time(NULL) % (0x10000))); } From 1dc71575dad1c8f62540eaaf4b38ecb2e3ab9065 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Pokorn=C3=BD?= Date: Tue, 11 Jun 2019 15:14:16 +0200 Subject: [PATCH 2/9] tests: ipc: speed the suite up with avoiding expendable sleep(3)s MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Using i7-6820HQ CPU yields these results: Before: ~2:54 After: ~2:26 Speedup: ~16% The main optimization lies in how run_function_in_new_process helper is constructed, since now, there's an actual synchronization between the parent and its child (that needs to be prioritized here, which is furthermore help with making the parent immediately give up it's processor possession) after the fork, so that a subsequent sleep is completely omitted -- at worst (unlikely), additional sleep round(s) will need to be undertaken as already arranged for (and now, just 400 ms is waited rather than excessive 1 second). Another slight optimization is likewise in omission of sleep where the control gets returned to once the waited for process has been suceesfully examined post-mortem, without worries it's previous life is still resounding. Signed-off-by: Jan Pokorný --- tests/check_ipc.c | 161 ++++++++++++++++++++++++++++------------------ 1 file changed, 99 insertions(+), 62 deletions(-) diff --git a/tests/check_ipc.c b/tests/check_ipc.c index 2231efe1f..37ef74dee 100644 --- a/tests/check_ipc.c +++ b/tests/check_ipc.c @@ -395,8 +395,30 @@ s1_connection_created(qb_ipcs_connection_t *c) } -static void -run_ipc_server(void) +static volatile sig_atomic_t usr1_bit; + +static void usr1_bit_setter(int signal) { + if (signal == SIGUSR1) { + usr1_bit = 1; + } +} + +#define READY_SIGNALLER(name, data_arg) void (name)(void *data_arg) +typedef READY_SIGNALLER(ready_signaller_fn, ); + +static +READY_SIGNALLER(usr1_signaller, parent_target) +{ + kill(*((pid_t *) parent_target), SIGUSR1); +} + +#define NEW_PROCESS_RUNNER(name, ready_signaller_arg, signaller_data_arg) \ + void (name)(ready_signaller_fn ready_signaller_arg, \ + void *signaller_data_arg) +typedef NEW_PROCESS_RUNNER(new_process_runner_fn, , ); + +static +NEW_PROCESS_RUNNER(run_ipc_server, ready_signaller, signaller_data) { int32_t res; qb_loop_signal_handle handle; @@ -419,7 +441,7 @@ run_ipc_server(void) my_loop = qb_loop_create(); qb_loop_signal_add(my_loop, QB_LOOP_HIGH, SIGTERM, - NULL, exit_handler, &handle); + NULL, exit_handler, &handle); s1 = qb_ipcs_create(ipc_name, 4, ipc_type, &sh); @@ -433,14 +455,35 @@ run_ipc_server(void) res = qb_ipcs_run(s1); ck_assert_int_eq(res, 0); + if (ready_signaller != NULL) { + ready_signaller(signaller_data); + } + qb_loop_run(my_loop); qb_log(LOG_DEBUG, "loop finished - done ..."); } static pid_t -run_function_in_new_process(void (*run_ipc_server_fn)(void)) +run_function_in_new_process(new_process_runner_fn new_process_runner) { - pid_t pid = fork (); + pid_t parent_target, pid; + + struct sigaction orig_sa, purpose_sa; + sigset_t orig_mask, purpose_mask, purpose_clear_mask; + + sigemptyset(&purpose_mask); + sigaddset(&purpose_mask, SIGUSR1); + + sigprocmask(SIG_BLOCK, &purpose_mask, &orig_mask); + purpose_clear_mask = orig_mask; + sigdelset(&purpose_clear_mask, SIGUSR1); + + purpose_sa.sa_handler = usr1_bit_setter; + purpose_sa.sa_mask = purpose_mask; + purpose_sa.sa_flags = SA_RESTART; + + parent_target = getpid(); + pid = fork(); if (pid == -1) { fprintf (stderr, "Can't fork\n"); @@ -448,9 +491,24 @@ run_function_in_new_process(void (*run_ipc_server_fn)(void)) } if (pid == 0) { - run_ipc_server_fn(); + sigprocmask(SIG_SETMASK, &orig_mask, NULL); + new_process_runner(usr1_signaller, &parent_target); exit(0); } + + usr1_bit = 0; + /* XXX assume never fails */ + sigaction(SIGUSR1, &purpose_sa, &orig_sa); + + do { + /* XXX assume never fails with EFAULT */ + sigsuspend(&purpose_clear_mask); + } while (usr1_bit != 1); + usr1_bit = 0; + sigprocmask(SIG_SETMASK, &orig_mask, NULL); + /* give children a slight/non-strict scheduling advantage */ + sched_yield(); + return pid; } @@ -600,14 +658,13 @@ test_ipc_txrx_timeout(void) pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); - sleep(1); do { conn = qb_ipcc_connect(ipc_name, max_size); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); - sleep(1); + poll(NULL, 0, 400); c++; } } while (conn == NULL && c < 5); @@ -631,11 +688,6 @@ test_ipc_txrx_timeout(void) request_server_exit(); verify_graceful_stop(pid); - /* - * wait a bit for the server to die. - */ - sleep(1); - /* * this needs to free up the shared mem */ @@ -654,14 +706,13 @@ test_ipc_txrx(void) pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); - sleep(1); do { conn = qb_ipcc_connect(ipc_name, max_size); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); - sleep(1); + poll(NULL, 0, 400); c++; } } while (conn == NULL && c < 5); @@ -705,14 +756,13 @@ test_ipc_exit(void) pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); - sleep(1); do { conn = qb_ipcc_connect(ipc_name, max_size); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); - sleep(1); + poll(NULL, 0, 400); c++; } } while (conn == NULL && c < 5); @@ -732,11 +782,6 @@ test_ipc_exit(void) request_server_exit(); verify_graceful_stop(pid); - /* - * wait a bit for the server to die. - */ - sleep(1); - /* * this needs to free up the shared mem */ @@ -873,14 +918,13 @@ test_ipc_dispatch(void) pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); - sleep(1); do { conn = qb_ipcc_connect(ipc_name, max_size); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); - sleep(1); + poll(NULL, 0, 400); c++; } } while (conn == NULL && c < 5); @@ -1001,7 +1045,6 @@ test_ipc_stress_connections(void) pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); - sleep(1); for (connections = 1; connections < 70000; connections++) { if (conn) { @@ -1049,14 +1092,13 @@ test_ipc_bulk_events(void) pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); - sleep(1); do { conn = qb_ipcc_connect(ipc_name, max_size); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); - sleep(1); + poll(NULL, 0, 400); c++; } } while (conn == NULL && c < 5); @@ -1115,14 +1157,13 @@ test_ipc_stress_test(void) pid = run_function_in_new_process(run_ipc_server); enforce_server_buffer = 0; fail_if(pid == -1); - sleep(1); do { conn = qb_ipcc_connect(ipc_name, client_buf_size); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); - sleep(1); + poll(NULL, 0, 400); c++; } } while (conn == NULL && c < 5); @@ -1217,14 +1258,13 @@ test_ipc_event_on_created(void) pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); - sleep(1); do { conn = qb_ipcc_connect(ipc_name, max_size); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); - sleep(1); + poll(NULL, 0, 400); c++; } } while (conn == NULL && c < 5); @@ -1272,14 +1312,13 @@ test_ipc_disconnect_after_created(void) pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); - sleep(1); do { conn = qb_ipcc_connect(ipc_name, max_size); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); - sleep(1); + poll(NULL, 0, 400); c++; } } while (conn == NULL && c < 5); @@ -1330,14 +1369,13 @@ test_ipc_server_fail(void) pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); - sleep(1); do { conn = qb_ipcc_connect(ipc_name, max_size); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); - sleep(1); + poll(NULL, 0, 400); c++; } } while (conn == NULL && c < 5); @@ -1460,14 +1498,13 @@ test_ipc_service_ref_count(void) pid = run_function_in_new_process(run_ipc_server); fail_if(pid == -1); - sleep(1); do { conn = qb_ipcc_connect(ipc_name, max_size); if (conn == NULL) { j = waitpid(pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); - sleep(1); + poll(NULL, 0, 400); c++; } } while (conn == NULL && c < 5); @@ -1551,18 +1588,18 @@ make_shm_suite(void) TCase *tc; Suite *s = suite_create("shm"); - add_tcase(s, tc, test_ipc_txrx_shm_timeout, 30); - add_tcase(s, tc, test_ipc_server_fail_shm, 8); - add_tcase(s, tc, test_ipc_txrx_shm_block, 8); - add_tcase(s, tc, test_ipc_txrx_shm_tmo, 8); - add_tcase(s, tc, test_ipc_fc_shm, 8); - add_tcase(s, tc, test_ipc_dispatch_shm, 16); - add_tcase(s, tc, test_ipc_stress_test_shm, 16); - add_tcase(s, tc, test_ipc_bulk_events_shm, 16); - add_tcase(s, tc, test_ipc_exit_shm, 8); - add_tcase(s, tc, test_ipc_event_on_created_shm, 10); - add_tcase(s, tc, test_ipc_service_ref_count_shm, 10); - add_tcase(s, tc, test_ipc_stress_connections_shm, 3600); + add_tcase(s, tc, test_ipc_txrx_shm_timeout, 28); + add_tcase(s, tc, test_ipc_server_fail_shm, 7); + add_tcase(s, tc, test_ipc_txrx_shm_block, 7); + add_tcase(s, tc, test_ipc_txrx_shm_tmo, 7); + add_tcase(s, tc, test_ipc_fc_shm, 7); + add_tcase(s, tc, test_ipc_dispatch_shm, 15); + add_tcase(s, tc, test_ipc_stress_test_shm, 15); + add_tcase(s, tc, test_ipc_bulk_events_shm, 15); + add_tcase(s, tc, test_ipc_exit_shm, 6); + add_tcase(s, tc, test_ipc_event_on_created_shm, 9); + add_tcase(s, tc, test_ipc_service_ref_count_shm, 9); + add_tcase(s, tc, test_ipc_stress_connections_shm, 3600 /* ? */); #ifdef HAVE_FAILURE_INJECTION add_tcase(s, tc, test_ipcc_truncate_when_unlink_fails_shm, 8); @@ -1577,24 +1614,24 @@ make_soc_suite(void) Suite *s = suite_create("socket"); TCase *tc; - add_tcase(s, tc, test_ipc_txrx_us_timeout, 30); + add_tcase(s, tc, test_ipc_txrx_us_timeout, 28); /* Commented out for the moment as space in /dev/shm on the CI machines causes random failures */ /* add_tcase(s, tc, test_ipc_max_dgram_size, 30); */ - add_tcase(s, tc, test_ipc_server_fail_soc, 8); - add_tcase(s, tc, test_ipc_txrx_us_block, 8); - add_tcase(s, tc, test_ipc_txrx_us_tmo, 8); - add_tcase(s, tc, test_ipc_fc_us, 8); - add_tcase(s, tc, test_ipc_exit_us, 8); - add_tcase(s, tc, test_ipc_dispatch_us, 16); + add_tcase(s, tc, test_ipc_server_fail_soc, 7); + add_tcase(s, tc, test_ipc_txrx_us_block, 7); + add_tcase(s, tc, test_ipc_txrx_us_tmo, 7); + add_tcase(s, tc, test_ipc_fc_us, 7); + add_tcase(s, tc, test_ipc_exit_us, 6); + add_tcase(s, tc, test_ipc_dispatch_us, 15); #ifndef __clang__ /* see variable length array in structure' at the top */ - add_tcase(s, tc, test_ipc_stress_test_us, 60); + add_tcase(s, tc, test_ipc_stress_test_us, 58); #endif - add_tcase(s, tc, test_ipc_bulk_events_us, 16); - add_tcase(s, tc, test_ipc_event_on_created_us, 10); - add_tcase(s, tc, test_ipc_disconnect_after_created_us, 10); - add_tcase(s, tc, test_ipc_service_ref_count_us, 10); - add_tcase(s, tc, test_ipc_stress_connections_us, 3600); + add_tcase(s, tc, test_ipc_bulk_events_us, 15); + add_tcase(s, tc, test_ipc_event_on_created_us, 9); + add_tcase(s, tc, test_ipc_disconnect_after_created_us, 9); + add_tcase(s, tc, test_ipc_service_ref_count_us, 9); + add_tcase(s, tc, test_ipc_stress_connections_us, 3600 /* ? */); return s; } From 248010aa7164305a3abbd6ee9e09a503e4d4034f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Pokorn=C3=BD?= Date: Tue, 11 Jun 2019 15:41:50 +0200 Subject: [PATCH 3/9] tests: ipc: allow for easier tests debugging by discerning PIDs/roles MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Roles specifications are currently not applied and are rather a preparation for the actual meaningful use to come. Signed-off-by: Jan Pokorný --- tests/check_ipc.c | 35 +++++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/tests/check_ipc.c b/tests/check_ipc.c index 37ef74dee..852b8ca30 100644 --- a/tests/check_ipc.c +++ b/tests/check_ipc.c @@ -464,8 +464,10 @@ NEW_PROCESS_RUNNER(run_ipc_server, ready_signaller, signaller_data) } static pid_t -run_function_in_new_process(new_process_runner_fn new_process_runner) +run_function_in_new_process(const char *role, + new_process_runner_fn new_process_runner) { + char formatbuf[1024]; pid_t parent_target, pid; struct sigaction orig_sa, purpose_sa; @@ -492,6 +494,15 @@ run_function_in_new_process(new_process_runner_fn new_process_runner) if (pid == 0) { sigprocmask(SIG_SETMASK, &orig_mask, NULL); + + if (role == NULL) { + qb_log_format_set(QB_LOG_STDERR, "lib/%f|%l[%P] %b"); + } else { + snprintf(formatbuf, sizeof(formatbuf), + "lib/%%f|%%l|%s[%%P] %%b", role); + qb_log_format_set(QB_LOG_STDERR, formatbuf); + } + new_process_runner(usr1_signaller, &parent_target); exit(0); } @@ -656,7 +667,7 @@ test_ipc_txrx_timeout(void) pid_t pid; uint32_t max_size = MAX_MSG_SIZE; - pid = run_function_in_new_process(run_ipc_server); + pid = run_function_in_new_process("server", run_ipc_server); fail_if(pid == -1); do { @@ -704,7 +715,7 @@ test_ipc_txrx(void) pid_t pid; uint32_t max_size = MAX_MSG_SIZE; - pid = run_function_in_new_process(run_ipc_server); + pid = run_function_in_new_process("server", run_ipc_server); fail_if(pid == -1); do { @@ -754,7 +765,7 @@ test_ipc_exit(void) pid_t pid; uint32_t max_size = MAX_MSG_SIZE; - pid = run_function_in_new_process(run_ipc_server); + pid = run_function_in_new_process("server", run_ipc_server); fail_if(pid == -1); do { @@ -916,7 +927,7 @@ test_ipc_dispatch(void) int32_t size; uint32_t max_size = MAX_MSG_SIZE; - pid = run_function_in_new_process(run_ipc_server); + pid = run_function_in_new_process("server", run_ipc_server); fail_if(pid == -1); do { @@ -1043,7 +1054,7 @@ test_ipc_stress_connections(void) QB_LOG_FILTER_FILE, "*", LOG_INFO); qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE); - pid = run_function_in_new_process(run_ipc_server); + pid = run_function_in_new_process("server", run_ipc_server); fail_if(pid == -1); for (connections = 1; connections < 70000; connections++) { @@ -1090,7 +1101,7 @@ test_ipc_bulk_events(void) int32_t fd; uint32_t max_size = MAX_MSG_SIZE; - pid = run_function_in_new_process(run_ipc_server); + pid = run_function_in_new_process("server", run_ipc_server); fail_if(pid == -1); do { @@ -1154,7 +1165,7 @@ test_ipc_stress_test(void) int32_t real_buf_size; enforce_server_buffer = 1; - pid = run_function_in_new_process(run_ipc_server); + pid = run_function_in_new_process("server", run_ipc_server); enforce_server_buffer = 0; fail_if(pid == -1); @@ -1256,7 +1267,7 @@ test_ipc_event_on_created(void) num_bulk_events = 1; - pid = run_function_in_new_process(run_ipc_server); + pid = run_function_in_new_process("server", run_ipc_server); fail_if(pid == -1); do { @@ -1310,7 +1321,7 @@ test_ipc_disconnect_after_created(void) int32_t res; uint32_t max_size = MAX_MSG_SIZE; - pid = run_function_in_new_process(run_ipc_server); + pid = run_function_in_new_process("server", run_ipc_server); fail_if(pid == -1); do { @@ -1367,7 +1378,7 @@ test_ipc_server_fail(void) pid_t pid; uint32_t max_size = MAX_MSG_SIZE; - pid = run_function_in_new_process(run_ipc_server); + pid = run_function_in_new_process("server", run_ipc_server); fail_if(pid == -1); do { @@ -1496,7 +1507,7 @@ test_ipc_service_ref_count(void) reference_count_test = QB_TRUE; - pid = run_function_in_new_process(run_ipc_server); + pid = run_function_in_new_process("server", run_ipc_server); fail_if(pid == -1); do { From 8c838db56650ca4b653e3060deeedea9e6c7329c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Pokorn=C3=BD?= Date: Thu, 23 May 2019 15:33:20 +0200 Subject: [PATCH 4/9] tests: ipc: refactor/split test_ipc_dispatch part into client_dispatch MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This way, this core part can be easily reused where needed. Note that "ready_signaller" similarity with run_ipc_server is not accidental, following commit will justify it. Signed-off-by: Jan Pokorný --- tests/check_ipc.c | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/tests/check_ipc.c b/tests/check_ipc.c index 852b8ca30..d75352f62 100644 --- a/tests/check_ipc.c +++ b/tests/check_ipc.c @@ -918,22 +918,19 @@ struct my_res { char message[1024 * 1024]; }; -static void -test_ipc_dispatch(void) +static inline +NEW_PROCESS_RUNNER(client_dispatch, ready_signaller, signaller_data) { - int32_t j; - int32_t c = 0; - pid_t pid; - int32_t size; uint32_t max_size = MAX_MSG_SIZE; - - pid = run_function_in_new_process("server", run_ipc_server); - fail_if(pid == -1); + int32_t size; + int32_t c = 0; + int32_t j; + pid_t server_pid = *((pid_t *) signaller_data); do { conn = qb_ipcc_connect(ipc_name, max_size); if (conn == NULL) { - j = waitpid(pid, NULL, WNOHANG); + j = waitpid(server_pid, NULL, WNOHANG); ck_assert_int_eq(j, 0); poll(NULL, 0, 400); c++; @@ -941,16 +938,31 @@ test_ipc_dispatch(void) } while (conn == NULL && c < 5); fail_if(conn == NULL); + if (ready_signaller != NULL) { + ready_signaller(signaller_data); + } + size = QB_MIN(sizeof(struct qb_ipc_request_header), 64); for (j = 1; j < 19; j++) { size *= 2; if (size >= max_size) break; if (send_and_check(IPC_MSG_REQ_DISPATCH, size, - recv_timeout, QB_TRUE) < 0) { + recv_timeout, QB_TRUE) < 0) { break; } } +} + +static void +test_ipc_dispatch(void) +{ + pid_t pid; + + pid = run_function_in_new_process(NULL, run_ipc_server); + fail_if(pid == -1); + + client_dispatch(NULL, (void *) &pid); request_server_exit(); qb_ipcc_disconnect(conn); From 2aefe5318849394d4a49db41938d070cb5d198d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Pokorn=C3=BD?= Date: Tue, 11 Jun 2019 16:09:28 +0200 Subject: [PATCH 5/9] tests: ipc: check deadlock-like situation due to mixing priorities MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Compared to the outer world, libqb brings rather unintuitive approach to priorities within a native event loop (qbloop.h) -- it doesn't do an exhaustive high-to-low priorities in a batched (clean-the-level) manner, but rather linearly adds a possibility to pick the handling task from the higher priority level as opposed to lower priority ones. This has the advantage of limiting the chances of starvation and deadlock opportunities in the incorrectly constructed SW, on the other hand, it means that libqb is not fulfilling the architected intentions regarding what deserves a priority truthfully, so these priorities are worth just a hint rather than urgency-based separation. And consequently, a discovery of these deadlocks etc. is deferred to the (as Murphy's laws have it) least convenient moment, e.g., when said native event loop is exchanged for other (this time priority trully abiding, like GLib) implementation, while retaining the same basic notion and high-level handling of priorities on libqb side, in IPC server (service handling) context. Hence, demonstration of such a degenerate blocking is not trivial, and we must defer such other event loop implementation. After this hassle, we are rewarded with a practical proof said "high-level handling [...] in IPC server (service handling) context" contains a bug (which we are going to subsequently fix) -- this is contrasted with libqb's native loop implementation that works just fine even prior that fix. Signed-off-by: Jan Pokorný --- tests/Makefile.am | 5 + tests/check_ipc.c | 584 +++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 553 insertions(+), 36 deletions(-) diff --git a/tests/Makefile.am b/tests/Makefile.am index df1af81a7..da8f3a5b0 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -147,6 +147,11 @@ ipc_test_LDADD = $(top_builddir)/lib/libqb.la @CHECK_LIBS@ if HAVE_FAILURE_INJECTION ipc_test_LDADD += _failure_injection.la +if HAVE_GLIB +ipc_test_CFLAGS += $(GLIB_CFLAGS) +ipc_test_LDADD += $(GLIB_LIBS) +endif + check_LTLIBRARIES += _failure_injection.la _failure_injection_la_SOURCES = _failure_injection.c _failure_injection.h _failure_injection_la_LDFLAGS = -module diff --git a/tests/check_ipc.c b/tests/check_ipc.c index d75352f62..5ded7db54 100644 --- a/tests/check_ipc.c +++ b/tests/check_ipc.c @@ -24,6 +24,12 @@ #include "os_base.h" #include #include +#include +#include + +#ifdef HAVE_GLIB +#include +#endif #include "check_common.h" @@ -62,9 +68,12 @@ static const int MAX_MSG_SIZE = DEFAULT_MAX_MSG_SIZE; * this the largests msg we can successfully send. */ #define GIANT_MSG_DATA_SIZE MAX_MSG_SIZE - sizeof(struct qb_ipc_response_header) - 8 -static int enforce_server_buffer=0; +static int enforce_server_buffer; static qb_ipcc_connection_t *conn; static enum qb_ipc_type ipc_type; +static enum qb_loop_priority global_loop_prio = QB_LOOP_MED; +static bool global_use_glib; +static int global_pipefd[2]; enum my_msg_ids { IPC_MSG_REQ_TX_RX, @@ -75,12 +84,92 @@ enum my_msg_ids { IPC_MSG_RES_BULK_EVENTS, IPC_MSG_REQ_STRESS_EVENT, IPC_MSG_RES_STRESS_EVENT, + IPC_MSG_REQ_SELF_FEED, + IPC_MSG_RES_SELF_FEED, IPC_MSG_REQ_SERVER_FAIL, IPC_MSG_RES_SERVER_FAIL, IPC_MSG_REQ_SERVER_DISCONNECT, IPC_MSG_RES_SERVER_DISCONNECT, }; + +/* these 2 functions from pacemaker code */ +static enum qb_ipcs_rate_limit +conv_libqb_prio2ratelimit(enum qb_loop_priority prio) +{ + /* this is an inversion of what libqb's qb_ipcs_request_rate_limit does */ + enum qb_ipcs_rate_limit ret = QB_IPCS_RATE_NORMAL; + switch (prio) { + case QB_LOOP_LOW: + ret = QB_IPCS_RATE_SLOW; + break; + case QB_LOOP_HIGH: + ret = QB_IPCS_RATE_FAST; + break; + default: + qb_log(LOG_DEBUG, "Invalid libqb's loop priority %d," + " assuming QB_LOOP_MED", prio); + /* fall-through */ + case QB_LOOP_MED: + break; + } + return ret; +} +#ifdef HAVE_GLIB +static gint +conv_prio_libqb2glib(enum qb_loop_priority prio) +{ + gint ret = G_PRIORITY_DEFAULT; + switch (prio) { + case QB_LOOP_LOW: + ret = G_PRIORITY_LOW; + break; + case QB_LOOP_HIGH: + ret = G_PRIORITY_HIGH; + break; + default: + qb_log(LOG_DEBUG, "Invalid libqb's loop priority %d," + " assuming QB_LOOP_MED", prio); + /* fall-through */ + case QB_LOOP_MED: + break; + } + return ret; +} + +/* these 3 glue functions inspired from pacemaker, too */ +static gboolean +gio_source_prepare(GSource *source, gint *timeout) +{ + qb_enter(); + *timeout = 500; + return FALSE; +} +static gboolean +gio_source_check(GSource *source) +{ + qb_enter(); + return TRUE; +} +static gboolean +gio_source_dispatch(GSource *source, GSourceFunc callback, gpointer user_data) +{ + gboolean ret = G_SOURCE_CONTINUE; + qb_enter(); + if (callback) { + ret = callback(user_data); + } + return ret; +} +static GSourceFuncs gio_source_funcs = { + .prepare = gio_source_prepare, + .check = gio_source_check, + .dispatch = gio_source_dispatch, +}; + +#endif + + /* Test Cases * * 1) basic send & recv different message sizes @@ -144,6 +233,61 @@ set_ipc_name(const char *prefix) (unsigned long)getpid(), (unsigned) ((long) time(NULL) % (0x10000))); } +static int +pipe_writer(int fd, int revents, void *data) { + qb_enter(); + static const char buf[8] = { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h' }; + + ssize_t wbytes = 0, wbytes_sum = 0; + + //for (size_t i = 0; i < SIZE_MAX; i++) { + for (size_t i = 0; i < 4096; i++) { + wbytes_sum += wbytes; + if ((wbytes = write(fd, buf, sizeof(buf))) == -1) { + if (errno != EAGAIN) { + perror("write"); + exit(-1); + } + break; + } + } + if (wbytes_sum > 0) { + qb_log(LOG_DEBUG, "written %zd bytes", wbytes_sum); + } + qb_leave(); + return 1; +} + +static int +pipe_reader(int fd, int revents, void *data) { + qb_enter(); + ssize_t rbytes, rbytes_sum = 0; + size_t cnt = SIZE_MAX; + char buf[4096] = { '\0' }; + while ((rbytes = read(fd, buf, sizeof(buf))) > 0 && rbytes < cnt) { + cnt -= rbytes; + rbytes_sum += rbytes; + } + if (rbytes_sum > 0) { + fail_if(buf[0] == '\0'); /* avoid dead store elimination */ + qb_log(LOG_DEBUG, "read %zd bytes", rbytes_sum); + sleep(1); + } + qb_leave(); + return 1; +} + +#if HAVE_GLIB +static gboolean +gio_pipe_reader(void *data) { + return (pipe_reader(*((int *) data), 0, NULL) > 0); +} +static gboolean +gio_pipe_writer(void *data) { + return (pipe_writer(*((int *) data), 0, NULL) > 0); +} +#endif + static int32_t s1_msg_process_fn(qb_ipcs_connection_t *c, void *data, size_t size) @@ -264,6 +408,39 @@ s1_msg_process_fn(qb_ipcs_connection_t *c, giant_event_send.hdr.id++; } + } else if (req_pt->id == IPC_MSG_REQ_SELF_FEED) { + if (pipe(global_pipefd) != 0) { + perror("pipefd"); + fail_if(1); + } + fcntl(global_pipefd[0], F_SETFL, O_NONBLOCK); + fcntl(global_pipefd[1], F_SETFL, O_NONBLOCK); + if (global_use_glib) { +#ifdef HAVE_GLIB + GSource *source_r, *source_w; + source_r = g_source_new(&gio_source_funcs, sizeof(GSource)); + source_w = g_source_new(&gio_source_funcs, sizeof(GSource)); + fail_if(source_r == NULL || source_w == NULL); + g_source_set_priority(source_r, conv_prio_libqb2glib(QB_LOOP_HIGH)); + g_source_set_priority(source_w, conv_prio_libqb2glib(QB_LOOP_HIGH)); + g_source_set_can_recurse(source_r, FALSE); + g_source_set_can_recurse(source_w, FALSE); + g_source_set_callback(source_r, gio_pipe_reader, &global_pipefd[0], NULL); + g_source_set_callback(source_w, gio_pipe_writer, &global_pipefd[1], NULL); + g_source_add_unix_fd(source_r, global_pipefd[0], G_IO_IN); + g_source_add_unix_fd(source_w, global_pipefd[1], G_IO_OUT); + g_source_attach(source_r, NULL); + g_source_attach(source_w, NULL); +#else + fail_if(1); +#endif + } else { + qb_loop_poll_add(my_loop, QB_LOOP_HIGH, global_pipefd[1], + POLLOUT|POLLERR, NULL, pipe_writer); + qb_loop_poll_add(my_loop, QB_LOOP_HIGH, global_pipefd[0], + POLLIN|POLLERR, NULL, pipe_reader); + } + } else if (req_pt->id == IPC_MSG_REQ_SERVER_FAIL) { exit(0); } else if (req_pt->id == IPC_MSG_REQ_SERVER_DISCONNECT) { @@ -301,6 +478,122 @@ my_dispatch_del(int32_t fd) return qb_loop_poll_del(my_loop, fd); } + +/* taken from examples/ipcserver.c, with s/my_g/gio/ */ +#ifdef HAVE_GLIB + +#include + +static qb_array_t *gio_map; +static GMainLoop *glib_loop; + +struct gio_to_qb_poll { + int32_t is_used; + int32_t events; + int32_t source; + int32_t fd; + void *data; + qb_ipcs_dispatch_fn_t fn; + enum qb_loop_priority p; +}; + +static gboolean +gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer data) +{ + struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data; + gint fd = g_io_channel_unix_get_fd(gio); + + qb_enter(); + + return (adaptor->fn(fd, condition, adaptor->data) == 0); +} + +static void +gio_poll_destroy(gpointer data) +{ + struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data; + + adaptor->is_used--; + if (adaptor->is_used == 0) { + qb_log(LOG_DEBUG, "fd %d adaptor destroyed\n", adaptor->fd); + adaptor->fd = 0; + adaptor->source = 0; + } +} + +static int32_t +gio_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts, + void *data, qb_ipcs_dispatch_fn_t fn, gboolean is_new) +{ + struct gio_to_qb_poll *adaptor; + GIOChannel *channel; + int32_t res = 0; + + qb_enter(); + + res = qb_array_index(gio_map, fd, (void **)&adaptor); + if (res < 0) { + return res; + } + if (adaptor->is_used && adaptor->source) { + if (is_new) { + return -EEXIST; + } + g_source_remove(adaptor->source); + adaptor->source = 0; + } + + channel = g_io_channel_unix_new(fd); + if (!channel) { + return -ENOMEM; + } + + adaptor->fn = fn; + adaptor->events = evts; + adaptor->data = data; + adaptor->p = p; + adaptor->is_used++; + adaptor->fd = fd; + + adaptor->source = g_io_add_watch_full(channel, conv_prio_libqb2glib(p), + evts, gio_read_socket, adaptor, + gio_poll_destroy); + + /* we are handing the channel off to be managed by mainloop now. + * remove our reference. */ + g_io_channel_unref(channel); + + return 0; +} + +static int32_t +gio_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts, + void *data, qb_ipcs_dispatch_fn_t fn) +{ + return gio_dispatch_update(p, fd, evts, data, fn, TRUE); +} + +static int32_t +gio_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts, + void *data, qb_ipcs_dispatch_fn_t fn) +{ + return gio_dispatch_update(p, fd, evts, data, fn, FALSE); +} + +static int32_t +gio_dispatch_del(int32_t fd) +{ + struct gio_to_qb_poll *adaptor; + if (qb_array_index(gio_map, fd, (void **)&adaptor) == 0) { + g_source_remove(adaptor->source); + adaptor->source = 0; + } + return 0; +} + +#endif /* HAVE_GLIB */ + + static int32_t s1_connection_closed(qb_ipcs_connection_t *c) { @@ -412,13 +705,13 @@ READY_SIGNALLER(usr1_signaller, parent_target) kill(*((pid_t *) parent_target), SIGUSR1); } -#define NEW_PROCESS_RUNNER(name, ready_signaller_arg, signaller_data_arg) \ +#define NEW_PROCESS_RUNNER(name, ready_signaller_arg, signaller_data_arg, data_arg) \ void (name)(ready_signaller_fn ready_signaller_arg, \ - void *signaller_data_arg) -typedef NEW_PROCESS_RUNNER(new_process_runner_fn, , ); + void *signaller_data_arg, void *data_arg) +typedef NEW_PROCESS_RUNNER(new_process_runner_fn, , , ); static -NEW_PROCESS_RUNNER(run_ipc_server, ready_signaller, signaller_data) +NEW_PROCESS_RUNNER(run_ipc_server, ready_signaller, signaller_data, data) { int32_t res; qb_loop_signal_handle handle; @@ -431,12 +724,7 @@ NEW_PROCESS_RUNNER(run_ipc_server, ready_signaller, signaller_data) .connection_closed = s1_connection_closed, }; - struct qb_ipcs_poll_handlers ph = { - .job_add = my_job_add, - .dispatch_add = my_dispatch_add, - .dispatch_mod = my_dispatch_mod, - .dispatch_del = my_dispatch_del, - }; + struct qb_ipcs_poll_handlers ph; uint32_t max_size = MAX_MSG_SIZE; my_loop = qb_loop_create(); @@ -447,6 +735,33 @@ NEW_PROCESS_RUNNER(run_ipc_server, ready_signaller, signaller_data) s1 = qb_ipcs_create(ipc_name, 4, ipc_type, &sh); fail_if(s1 == 0); + if (global_loop_prio != QB_LOOP_MED) { + qb_ipcs_request_rate_limit(s1, + conv_libqb_prio2ratelimit(global_loop_prio)); + } + if (global_use_glib) { +#ifdef HAVE_GLIB + ph = (struct qb_ipcs_poll_handlers) { + .job_add = NULL, + .dispatch_add = gio_dispatch_add, + .dispatch_mod = gio_dispatch_mod, + .dispatch_del = gio_dispatch_del, + }; + glib_loop = g_main_loop_new(NULL, FALSE); + gio_map = qb_array_create_2(16, sizeof(struct gio_to_qb_poll), 1); + fail_if (gio_map == NULL); +#else + fail_if(1); +#endif + } else { + ph = (struct qb_ipcs_poll_handlers) { + .job_add = my_job_add, + .dispatch_add = my_dispatch_add, + .dispatch_mod = my_dispatch_mod, + .dispatch_del = my_dispatch_del, + }; + } + if (enforce_server_buffer) { qb_ipcs_enforce_buffer_size(s1, max_size); } @@ -459,13 +774,20 @@ NEW_PROCESS_RUNNER(run_ipc_server, ready_signaller, signaller_data) ready_signaller(signaller_data); } - qb_loop_run(my_loop); + if (global_use_glib) { +#ifdef HAVE_GLIB + g_main_loop_run(glib_loop); +#endif + } else { + qb_loop_run(my_loop); + } qb_log(LOG_DEBUG, "loop finished - done ..."); } static pid_t run_function_in_new_process(const char *role, - new_process_runner_fn new_process_runner) + new_process_runner_fn new_process_runner, + void *data) { char formatbuf[1024]; pid_t parent_target, pid; @@ -503,7 +825,7 @@ run_function_in_new_process(const char *role, qb_log_format_set(QB_LOG_STDERR, formatbuf); } - new_process_runner(usr1_signaller, &parent_target); + new_process_runner(usr1_signaller, &parent_target, data); exit(0); } @@ -667,7 +989,7 @@ test_ipc_txrx_timeout(void) pid_t pid; uint32_t max_size = MAX_MSG_SIZE; - pid = run_function_in_new_process("server", run_ipc_server); + pid = run_function_in_new_process("server", run_ipc_server, NULL); fail_if(pid == -1); do { @@ -715,7 +1037,7 @@ test_ipc_txrx(void) pid_t pid; uint32_t max_size = MAX_MSG_SIZE; - pid = run_function_in_new_process("server", run_ipc_server); + pid = run_function_in_new_process("server", run_ipc_server, NULL); fail_if(pid == -1); do { @@ -765,7 +1087,7 @@ test_ipc_exit(void) pid_t pid; uint32_t max_size = MAX_MSG_SIZE; - pid = run_function_in_new_process("server", run_ipc_server); + pid = run_function_in_new_process("server", run_ipc_server, NULL); fail_if(pid == -1); do { @@ -918,14 +1240,21 @@ struct my_res { char message[1024 * 1024]; }; +struct dispatch_data { + pid_t server_pid; + enum my_msg_ids msg_type; + uint32_t repetitions; +}; + static inline -NEW_PROCESS_RUNNER(client_dispatch, ready_signaller, signaller_data) +NEW_PROCESS_RUNNER(client_dispatch, ready_signaller, signaller_data, data) { uint32_t max_size = MAX_MSG_SIZE; int32_t size; int32_t c = 0; int32_t j; - pid_t server_pid = *((pid_t *) signaller_data); + pid_t server_pid = ((struct dispatch_data *) data)->server_pid; + enum my_msg_ids msg_type = ((struct dispatch_data *) data)->msg_type; do { conn = qb_ipcc_connect(ipc_name, max_size); @@ -943,13 +1272,17 @@ NEW_PROCESS_RUNNER(client_dispatch, ready_signaller, signaller_data) } size = QB_MIN(sizeof(struct qb_ipc_request_header), 64); - for (j = 1; j < 19; j++) { - size *= 2; - if (size >= max_size) - break; - if (send_and_check(IPC_MSG_REQ_DISPATCH, size, - recv_timeout, QB_TRUE) < 0) { - break; + + for (uint32_t r = ((struct dispatch_data *) data)->repetitions; + r > 0; r--) { + for (j = 1; j < 19; j++) { + size *= 2; + if (size >= max_size) + break; + if (send_and_check(msg_type, size, + recv_timeout, QB_TRUE) < 0) { + break; + } } } } @@ -958,11 +1291,15 @@ static void test_ipc_dispatch(void) { pid_t pid; + struct dispatch_data data; - pid = run_function_in_new_process(NULL, run_ipc_server); + pid = run_function_in_new_process(NULL, run_ipc_server, NULL); fail_if(pid == -1); + data = (struct dispatch_data){.server_pid = pid, + .msg_type = IPC_MSG_REQ_DISPATCH, + .repetitions = 1}; - client_dispatch(NULL, (void *) &pid); + client_dispatch(NULL, NULL, (void *) &data); request_server_exit(); qb_ipcc_disconnect(conn); @@ -1066,7 +1403,7 @@ test_ipc_stress_connections(void) QB_LOG_FILTER_FILE, "*", LOG_INFO); qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE); - pid = run_function_in_new_process("server", run_ipc_server); + pid = run_function_in_new_process("server", run_ipc_server, NULL); fail_if(pid == -1); for (connections = 1; connections < 70000; connections++) { @@ -1113,7 +1450,7 @@ test_ipc_bulk_events(void) int32_t fd; uint32_t max_size = MAX_MSG_SIZE; - pid = run_function_in_new_process("server", run_ipc_server); + pid = run_function_in_new_process("server", run_ipc_server, NULL); fail_if(pid == -1); do { @@ -1177,7 +1514,7 @@ test_ipc_stress_test(void) int32_t real_buf_size; enforce_server_buffer = 1; - pid = run_function_in_new_process("server", run_ipc_server); + pid = run_function_in_new_process("server", run_ipc_server, NULL); enforce_server_buffer = 0; fail_if(pid == -1); @@ -1266,6 +1603,93 @@ START_TEST(test_ipc_bulk_events_us) } END_TEST +static +READY_SIGNALLER(connected_signaller, _) +{ + request_server_exit(); +} + +START_TEST(test_ipc_dispatch_us_native_prio_deadlock_provoke) +{ + pid_t server_pid, alphaclient_pid; + struct dispatch_data data; + + qb_enter(); + ipc_type = QB_IPC_SOCKET; + set_ipc_name(__func__); + + /* this is to demonstrate that native event loop can deal even + with "extreme" priority disproportions */ + global_loop_prio = QB_LOOP_LOW; + multiple_connections = QB_TRUE; + recv_timeout = -1; + + server_pid = run_function_in_new_process("server", run_ipc_server, + NULL); + fail_if(server_pid == -1); + data = (struct dispatch_data){.server_pid = server_pid, + .msg_type = IPC_MSG_REQ_SELF_FEED, + .repetitions = 1}; + alphaclient_pid = run_function_in_new_process("alphaclient", + client_dispatch, + (void *) &data); + fail_if(alphaclient_pid == -1); + + //sleep(1); + sched_yield(); + + data.repetitions = 0; + client_dispatch(connected_signaller, NULL, (void *) &data); + verify_graceful_stop(server_pid); + + multiple_connections = QB_FALSE; + qb_leave(); +} +END_TEST + +#if HAVE_GLIB +START_TEST(test_ipc_dispatch_us_glib_prio_deadlock_provoke) +{ + pid_t server_pid, alphaclient_pid; + struct dispatch_data data; + + qb_enter(); + ipc_type = QB_IPC_SOCKET; + set_ipc_name(__func__); + + global_use_glib = QB_TRUE; + /* this is to make the test pass at all, since GLib is strict + on priorities -- QB_LOOP_MED or lower would fail for sure */ + global_loop_prio = QB_LOOP_HIGH; + multiple_connections = QB_TRUE; + recv_timeout = -1; + + server_pid = run_function_in_new_process("server", run_ipc_server, + NULL); + fail_if(server_pid == -1); + data = (struct dispatch_data){.server_pid = server_pid, + .msg_type = IPC_MSG_REQ_SELF_FEED, + .repetitions = 1}; + alphaclient_pid = run_function_in_new_process("alphaclient", + client_dispatch, + (void *) &data); + fail_if(alphaclient_pid == -1); + + //sleep(1); + sched_yield(); + + data.repetitions = 0; + client_dispatch(connected_signaller, NULL, (void *) &data); + verify_graceful_stop(server_pid); + + multiple_connections = QB_FALSE; + global_loop_prio = QB_LOOP_MED; + global_use_glib = QB_FALSE; + qb_leave(); +} +END_TEST +#endif + static void test_ipc_event_on_created(void) { @@ -1279,7 +1703,7 @@ test_ipc_event_on_created(void) num_bulk_events = 1; - pid = run_function_in_new_process("server", run_ipc_server); + pid = run_function_in_new_process("server", run_ipc_server, NULL); fail_if(pid == -1); do { @@ -1333,7 +1757,7 @@ test_ipc_disconnect_after_created(void) int32_t res; uint32_t max_size = MAX_MSG_SIZE; - pid = run_function_in_new_process("server", run_ipc_server); + pid = run_function_in_new_process("server", run_ipc_server, NULL); fail_if(pid == -1); do { @@ -1390,7 +1814,7 @@ test_ipc_server_fail(void) pid_t pid; uint32_t max_size = MAX_MSG_SIZE; - pid = run_function_in_new_process("server", run_ipc_server); + pid = run_function_in_new_process("server", run_ipc_server, NULL); fail_if(pid == -1); do { @@ -1457,6 +1881,87 @@ START_TEST(test_ipc_stress_connections_shm) } END_TEST +START_TEST(test_ipc_dispatch_shm_native_prio_deadlock_provoke) +{ + pid_t server_pid, alphaclient_pid; + struct dispatch_data data; + + qb_enter(); + ipc_type = QB_IPC_SHM; + set_ipc_name(__func__); + + /* this is to demonstrate that native event loop can deal even + with "extreme" priority disproportions */ + global_loop_prio = QB_LOOP_LOW; + multiple_connections = QB_TRUE; + recv_timeout = -1; + + server_pid = run_function_in_new_process("server", run_ipc_server, + NULL); + fail_if(server_pid == -1); + data = (struct dispatch_data){.server_pid = server_pid, + .msg_type = IPC_MSG_REQ_SELF_FEED, + .repetitions = 1}; + alphaclient_pid = run_function_in_new_process("alphaclient", + client_dispatch, + (void *) &data); + fail_if(alphaclient_pid == -1); + + //sleep(1); + sched_yield(); + + data.repetitions = 0; + client_dispatch(connected_signaller, NULL, (void *) &data); + verify_graceful_stop(server_pid); + + multiple_connections = QB_FALSE; + qb_leave(); +} +END_TEST + +#if HAVE_GLIB +START_TEST(test_ipc_dispatch_shm_glib_prio_deadlock_provoke) +{ + pid_t server_pid, alphaclient_pid; + struct dispatch_data data; + + qb_enter(); + ipc_type = QB_IPC_SOCKET; + set_ipc_name(__func__); + + global_use_glib = QB_TRUE; + /* this is to make the test pass at all, since GLib is strict + on priorities -- QB_LOOP_MED or lower would fail for sure */ + global_loop_prio = QB_LOOP_HIGH; + multiple_connections = QB_TRUE; + recv_timeout = -1; + + server_pid = run_function_in_new_process("server", run_ipc_server, + NULL); + fail_if(server_pid == -1); + data = (struct dispatch_data){.server_pid = server_pid, + .msg_type = IPC_MSG_REQ_SELF_FEED, + .repetitions = 1}; + alphaclient_pid = run_function_in_new_process("alphaclient", + client_dispatch, + (void *) &data); + fail_if(alphaclient_pid == -1); + + //sleep(1); + sched_yield(); + + data.repetitions = 0; + client_dispatch(connected_signaller, NULL, (void *) &data); + verify_graceful_stop(server_pid); + + multiple_connections = QB_FALSE; + global_loop_prio = QB_LOOP_MED; + global_use_glib = QB_FALSE; + qb_leave(); +} +END_TEST +#endif + START_TEST(test_ipc_bulk_events_shm) { qb_enter(); @@ -1519,7 +2024,7 @@ test_ipc_service_ref_count(void) reference_count_test = QB_TRUE; - pid = run_function_in_new_process("server", run_ipc_server); + pid = run_function_in_new_process("server", run_ipc_server, NULL); fail_if(pid == -1); do { @@ -1623,7 +2128,10 @@ make_shm_suite(void) add_tcase(s, tc, test_ipc_event_on_created_shm, 9); add_tcase(s, tc, test_ipc_service_ref_count_shm, 9); add_tcase(s, tc, test_ipc_stress_connections_shm, 3600 /* ? */); - + add_tcase(s, tc, test_ipc_dispatch_shm_native_prio_deadlock_provoke, 15); +#if HAVE_GLIB + add_tcase(s, tc, test_ipc_dispatch_shm_glib_prio_deadlock_provoke, 15); +#endif #ifdef HAVE_FAILURE_INJECTION add_tcase(s, tc, test_ipcc_truncate_when_unlink_fails_shm, 8); #endif @@ -1655,6 +2163,10 @@ make_soc_suite(void) add_tcase(s, tc, test_ipc_disconnect_after_created_us, 9); add_tcase(s, tc, test_ipc_service_ref_count_us, 9); add_tcase(s, tc, test_ipc_stress_connections_us, 3600 /* ? */); + add_tcase(s, tc, test_ipc_dispatch_us_native_prio_deadlock_provoke, 15); +#if HAVE_GLIB + add_tcase(s, tc, test_ipc_dispatch_us_glib_prio_deadlock_provoke, 15); +#endif return s; } From 6f6be63800b395506b3d3c0f1f2fc1093b4f863d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Pokorn=C3=BD?= Date: Fri, 24 May 2019 22:18:55 +0200 Subject: [PATCH 6/9] IPC: server: avoid temporary channel priority loss, up to deadlock-worth MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit It turns out that while 7f56f58 allowed for less blocking (thus throughput increasing) initial handling of connections from clients within the abstract (out-of-libqb managed) event loop, it unfortunately subscribes itself back to such polling mechanism for UNIX-socket-check with a default priority, which can be lower than desired (via explicit qb_ipcs_request_rate_limit() configuration) for particular channel (amongst attention-competing siblings in the pool, the term here refers to associated communication, that is, both server and on-server abstraction for particular clients). And priority-based discrepancies are not forgiven in true priority abiding systems (that is, unlikele with libqb's native event loop harness as detailed in the previous commit, for which this would be soft-torelated hence the problem would not be spotted in the first place -- but that's expliicitly excluded from further discussion). On top of that, it violates the natural assumption that once (single threaded, which is imposed by libqb, at least between initial accept() and after-said-UNIX-socket-check) server accepts the connection, it shall rather take care of serving it (at least within stated initial scope of client connection life cycle) rather than be rushing to accept new ones -- which is exactly what used to happen previously once the library user set the effectively priority in the abstract poll above the default one. It's conceivable, just as with the former case of attention-competing siblings with higher priority whereby they could _infinitely_ live on at the expense of starving the client in the initial handling phase (authentication) despite the library user's as-high-as-siblings intention (for using the default priority for that unconditionally instead, which we address here), the dead lock is imminent also in this latter accept-to-client-authentication-handling case as well if there's an _unlimited_ fast-paced arrival queue (well, limited by with number of allowable open descriptors within the system, but for the Linux built-in maximum of 1M, there may be no practical difference, at least for time-sensitive applications). The only hope then is that such dead-locks are rather theoretical, since a "spontaneous" constant stream of either communication on unrelated, higher-prio sibling channels, or of new connection arrivals can as well testify the poor design of the libqb's IPC application. That being said, unconditional default priority in the isolated context of initial server-side client authentication is clearly a bug, but such application shall apply appropriate rate-limiting measures (exactly on priority basis) to handle unexpected flux nonetheless. The fix makes test_ipc_dispatch_*_glib_prio_deadlock_provoke tests pass. Signed-off-by: Jan Pokorný --- lib/ipc_setup.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/ipc_setup.c b/lib/ipc_setup.c index 3f53c4b06..9aea41163 100644 --- a/lib/ipc_setup.c +++ b/lib/ipc_setup.c @@ -843,10 +843,10 @@ qb_ipcs_uc_recv_and_auth(int32_t sock, struct qb_ipcs_service *s) setsockopt(sock, SOL_SOCKET, SO_PASSCRED, &on, sizeof(on)); #endif - res = s->poll_fns.dispatch_add(QB_LOOP_MED, - data->sock, - POLLIN | POLLPRI | POLLNVAL, - data, process_auth); + res = s->poll_fns.dispatch_add(s->poll_priority, + data->sock, + POLLIN | POLLPRI | POLLNVAL, + data, process_auth); if (res < 0) { qb_util_log(LOG_DEBUG, "Failed to process AUTH for fd (%d)", data->sock); close(sock); From 87c90ee8a969472c638c83cddd4685d02a284265 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Pokorn=C3=BD?= Date: Thu, 16 May 2019 18:59:54 +0200 Subject: [PATCH 7/9] IPC: server: fix debug message wrt. what actually went wrong MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit It's misleading towards a random code observer, at least, hiding the fact that what failed is actually the queing up of some handling to perform asynchronously in the future, rather than invoking it synchronously right away. Signed-off-by: Jan Pokorný --- lib/ipc_setup.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/ipc_setup.c b/lib/ipc_setup.c index 9aea41163..7cd1fd91e 100644 --- a/lib/ipc_setup.c +++ b/lib/ipc_setup.c @@ -848,7 +848,8 @@ qb_ipcs_uc_recv_and_auth(int32_t sock, struct qb_ipcs_service *s) POLLIN | POLLPRI | POLLNVAL, data, process_auth); if (res < 0) { - qb_util_log(LOG_DEBUG, "Failed to process AUTH for fd (%d)", data->sock); + qb_util_log(LOG_DEBUG, "Failed to arrange for AUTH for fd (%d)", + data->sock); close(sock); destroy_ipc_auth_data(data); } From e2d5be4d1e881fc118d00fe109cf8aff669c00c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Pokorn=C3=BD?= Date: Fri, 24 May 2019 14:52:09 +0200 Subject: [PATCH 8/9] doc: qbloop.h: document pros/cons of using built-in event loop impl MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Make the qbipcs.h module interdependence clear (also shedding light to some semantic dependencies) as well. Signed-off-by: Jan Pokorný --- include/qb/qbipcs.h | 22 ++++++++++++++++++++++ include/qb/qbloop.h | 20 ++++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/include/qb/qbipcs.h b/include/qb/qbipcs.h index 55c0f8151..7cb858662 100644 --- a/include/qb/qbipcs.h +++ b/include/qb/qbipcs.h @@ -43,6 +43,12 @@ extern "C" { * @example ipcserver.c */ +/** + * Rates to be passed to #qb_ipcs_request_rate_limit. The exact interpretation + * depends on how the event loop implementation understands the concept of + * priorities, see the discussion at #qb_ipcs_poll_handlers structure -- an + * integration point between IPC server instance and the underlying event loop. + */ enum qb_ipcs_rate_limit { QB_IPCS_RATE_FAST, QB_IPCS_RATE_NORMAL, @@ -104,6 +110,22 @@ typedef int32_t (*qb_ipcs_job_add_fn)(enum qb_loop_priority p, void *data, qb_loop_job_dispatch_fn dispatch_fn); +/** + * A set of callbacks that need to be provided (only #job_add can be #NULL) + * whenever the IPC server is to be run (by the means of #qb_ipcs_run). + * It is possible to use accordingly named functions defined in qbloop.h module + * or integrate with other existing (like GLib's event loop) or entirely new + * code -- see the subtle distinction amongst the possible event loops pointed + * out in the introductory comment at qbloop.h. + * + * At that occasion, please note the correlation of #QB_IPCS_RATE_FAST etc. + * symbolic names with said advisory effect of the priorities in the native + * implementation. This correspondence will not be this intuitively seemless + * if some other event loop implementation is hooked in given that it abids + * them strictly as mentioned (e.g. GLib's event loop over poll'able sources). + * Differences between the two paradigms should also be accounted for when + * the requirement to swap the event loop implementations arises. + */ struct qb_ipcs_poll_handlers { qb_ipcs_job_add_fn job_add; qb_ipcs_dispatch_add_fn dispatch_add; diff --git a/include/qb/qbloop.h b/include/qb/qbloop.h index 6bded75bb..db0c480d4 100644 --- a/include/qb/qbloop.h +++ b/include/qb/qbloop.h @@ -36,6 +36,26 @@ extern "C" { * * Main loop manages timers, jobs and polling sockets. * + * Only a weaker sense of priorities is implemented, alluding to distinct + * set of pros and cons compared to the stronger, strict approach to them + * as widely applied in this problem space (since the latter gives the + * application more control as the effect of the former can still be + * achieved with some reductions, whereas it is not straightforward the + * other way around; cf. static priority task scheduling vs. relative + * fine-tuning within a single priority domain with nice(2)): + * + * + implicit mitigation for deadlock-prone priority arrangements + * + * - less predictable (proportional probability based, we can talk + * about an advisory effect of the priorities) responses to the arrival + * of the high-ranked events (i.e. in the process of the picking the next + * event to handle from the priority queue when at least two different + * priorities are eligible at the moment) + * + * One practical application for this module of libqb is in combination with + * IPC servers based on qbipcs.h published one (the #qb_ipcs_poll_handlers + * structure maps fittingly to the control functions published here). + * * @example tcpserver.c */ From c5cb0db0f29b5473b87b454de852e09fbde8f91e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Pokorn=C3=BD?= Date: Tue, 28 May 2019 20:37:09 +0200 Subject: [PATCH 9/9] CI: travis: add (redundant for now, but...) libglib2.0-dev prerequisite MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We want to run every and each test we can, without reliance on transitive deoendencies and environment "invariants". Signed-off-by: Jan Pokorný --- .travis.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.travis.yml b/.travis.yml index 5ee9a1052..2c5fbdca2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,6 +8,8 @@ addons: packages: - check - splint + # for ipc.test:test_ipc_dispatch_*_deadlock_provoke + - libglib2.0-dev # natively present, but doesn't hurt # for "make rpm" - doxygen - rpm