diff --git a/.travis.yml b/.travis.yml
index 5ee9a1052..2c5fbdca2 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -8,6 +8,8 @@ addons:
     packages:
       - check
       - splint
+      # for ipc.test:test_ipc_dispatch_*_deadlock_provoke
+      - libglib2.0-dev  # natively present, but doesn't hurt
       # for "make rpm"
      - doxygen
      - rpm
diff --git a/include/qb/qbipcs.h b/include/qb/qbipcs.h
index 55c0f8151..7cb858662 100644
--- a/include/qb/qbipcs.h
+++ b/include/qb/qbipcs.h
@@ -43,6 +43,12 @@ extern "C" {
  * @example ipcserver.c
  */
 
+/**
+ * Rates to be passed to #qb_ipcs_request_rate_limit.  Their exact
+ * interpretation depends on how the event loop implementation in use
+ * understands priorities; see the discussion at #qb_ipcs_poll_handlers,
+ * the integration point between the IPC server and the event loop.
+ */
 enum qb_ipcs_rate_limit {
 	QB_IPCS_RATE_FAST,
 	QB_IPCS_RATE_NORMAL,
@@ -104,6 +110,22 @@ typedef int32_t (*qb_ipcs_job_add_fn)(enum qb_loop_priority p,
 				      void *data,
 				      qb_loop_job_dispatch_fn dispatch_fn);
 
+/**
+ * A set of callbacks that need to be provided (only #job_add can be #NULL)
+ * whenever the IPC server is to be run (by means of #qb_ipcs_run).  One can
+ * either use the accordingly named functions from the qbloop.h module, or
+ * integrate with another existing event loop (such as GLib's) or an entirely
+ * new one -- see the subtle distinction amongst the possible event loops
+ * pointed out in the introductory comment at qbloop.h.
+ *
+ * Note also how the #QB_IPCS_RATE_FAST etc. symbolic names correlate with
+ * the advisory effect the priorities have in the native implementation.
+ * This correspondence is not as intuitively seamless when another event
+ * loop implementation is hooked in, given that it may abide by priorities
+ * strictly (e.g. GLib's event loop over poll'able sources); such paradigm
+ * differences should also be accounted for when the need to swap event
+ * loop implementations arises.
+ */
 struct qb_ipcs_poll_handlers {
 	qb_ipcs_job_add_fn job_add;
 	qb_ipcs_dispatch_add_fn dispatch_add;
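For orientation, the integration point described above can be filled in like this — a minimal sketch only, assuming the native loop from qbloop.h; the `my_*` wrapper names are illustrative (they mirror what tests/check_ipc.c does below) and are not part of the patch:

```c
/* Sketch: wiring libqb's native event loop into qb_ipcs_poll_handlers.
 * The my_* wrappers are hypothetical names, not part of the patch. */
#include <qb/qbipcs.h>
#include <qb/qbloop.h>

static qb_loop_t *loop;

static int32_t
my_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
                void *data, qb_ipcs_dispatch_fn_t fn)
{
        return qb_loop_poll_add(loop, p, fd, evts, data, fn);
}

static int32_t
my_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
                void *data, qb_ipcs_dispatch_fn_t fn)
{
        return qb_loop_poll_mod(loop, p, fd, evts, data, fn);
}

static int32_t
my_dispatch_del(int32_t fd)
{
        return qb_loop_poll_del(loop, fd);
}

static struct qb_ipcs_poll_handlers ph = {
        .job_add      = NULL,   /* the one callback allowed to be NULL */
        .dispatch_add = my_dispatch_add,
        .dispatch_mod = my_dispatch_mod,
        .dispatch_del = my_dispatch_del,
};
```

With a strictly prioritizing loop (e.g. GLib's) substituted behind the same structure, the rate-limit/priority correspondence described in the comment above no longer holds verbatim.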
diff --git a/include/qb/qbloop.h b/include/qb/qbloop.h
index 6bded75bb..db0c480d4 100644
--- a/include/qb/qbloop.h
+++ b/include/qb/qbloop.h
@@ -36,6 +36,26 @@ extern "C" {
  *
  * Main loop manages timers, jobs and polling sockets.
  *
+ * Only a weaker sense of priorities is implemented, which carries a distinct
+ * set of pros and cons compared to the stronger, strict approach widely
+ * applied in this problem space (the strict approach gives the application
+ * more control, since the effect of the weak one can still be achieved with
+ * some reductions, whereas the converse is not straightforward; cf. static
+ * priority task scheduling vs. relative fine-tuning within a single priority
+ * domain with nice(2)):
+ *
+ * + implicit mitigation of deadlock-prone priority arrangements
+ *
+ * - less predictable responses to the arrival of high-ranked events, as the
+ *   priorities have merely an advisory, proportional-probability effect on
+ *   picking the next event to handle from the priority queue whenever at
+ *   least two different priorities are eligible at that moment
+ *
+ * One practical application of this module is in combination with the
+ * IPC servers based on the API published in qbipcs.h (its
+ * #qb_ipcs_poll_handlers structure maps fittingly to the control
+ * functions published here).
+ *
  * @example tcpserver.c
  */
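The "advisory effect" can be made tangible with a self-contained sketch (an illustration under stated assumptions, not part of the patch; exact ratios vary from run to run): two self-re-arming jobs at different priorities both make progress under the native loop, whereas a strictly prioritizing loop would never run the low one.

```c
/* Illustration: both counters advance under libqb's native loop because
 * priorities are advisory; a strict-priority loop would starve job_low. */
#include <stdio.h>
#include <qb/qbloop.h>

static qb_loop_t *loop;
static unsigned long cnt_high, cnt_low;

static void job_high(void *data)
{
        if (++cnt_high >= 100000) {     /* arbitrary stop condition */
                printf("high=%lu low=%lu\n", cnt_high, cnt_low);
                qb_loop_stop(loop);
                return;
        }
        qb_loop_job_add(loop, QB_LOOP_HIGH, NULL, job_high);
}

static void job_low(void *data)
{
        cnt_low++;
        qb_loop_job_add(loop, QB_LOOP_LOW, NULL, job_low);
}

int main(void)
{
        loop = qb_loop_create();
        qb_loop_job_add(loop, QB_LOOP_HIGH, NULL, job_high);
        qb_loop_job_add(loop, QB_LOOP_LOW, NULL, job_low);
        qb_loop_run(loop);      /* prints a nonzero "low" count */
        return 0;
}
```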
diff --git a/lib/ipc_setup.c b/lib/ipc_setup.c
index 3f53c4b06..7cd1fd91e 100644
--- a/lib/ipc_setup.c
+++ b/lib/ipc_setup.c
@@ -843,12 +843,13 @@ qb_ipcs_uc_recv_and_auth(int32_t sock, struct qb_ipcs_service *s)
 	setsockopt(sock, SOL_SOCKET, SO_PASSCRED, &on, sizeof(on));
 #endif
 
-	res = s->poll_fns.dispatch_add(QB_LOOP_MED,
-				       data->sock,
-				       POLLIN | POLLPRI | POLLNVAL,
-				       data, process_auth);
+	res = s->poll_fns.dispatch_add(s->poll_priority,
+				       data->sock,
+				       POLLIN | POLLPRI | POLLNVAL,
+				       data, process_auth);
 	if (res < 0) {
-		qb_util_log(LOG_DEBUG, "Failed to process AUTH for fd (%d)", data->sock);
+		qb_util_log(LOG_DEBUG, "Failed to arrange for AUTH for fd (%d)",
+			    data->sock);
 		close(sock);
 		destroy_ipc_auth_data(data);
 	}
diff --git a/tests/Makefile.am b/tests/Makefile.am
index df1af81a7..da8f3a5b0 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -147,6 +147,11 @@ ipc_test_LDADD = $(top_builddir)/lib/libqb.la @CHECK_LIBS@
 if HAVE_FAILURE_INJECTION
 ipc_test_LDADD += _failure_injection.la
 
+if HAVE_GLIB
+ipc_test_CFLAGS += $(GLIB_CFLAGS)
+ipc_test_LDADD += $(GLIB_LIBS)
+endif
+
 check_LTLIBRARIES += _failure_injection.la
 _failure_injection_la_SOURCES = _failure_injection.c _failure_injection.h
 _failure_injection_la_LDFLAGS = -module
diff --git a/tests/check_ipc.c b/tests/check_ipc.c
index 71b3a7fa5..5ded7db54 100644
--- a/tests/check_ipc.c
+++ b/tests/check_ipc.c
@@ -24,6 +24,12 @@
 #include "os_base.h"
 #include <sys/wait.h>
 #include <signal.h>
+#include <poll.h>
+#include <sched.h>
+
+#ifdef HAVE_GLIB
+#include <glib.h>
+#endif
 
 #include "check_common.h"
 
@@ -62,9 +68,12 @@ static const int MAX_MSG_SIZE = DEFAULT_MAX_MSG_SIZE;
  * this the largests msg we can successfully send. */
 #define GIANT_MSG_DATA_SIZE MAX_MSG_SIZE - sizeof(struct qb_ipc_response_header) - 8
 
-static int enforce_server_buffer=0;
+static int enforce_server_buffer;
 static qb_ipcc_connection_t *conn;
 static enum qb_ipc_type ipc_type;
+static enum qb_loop_priority global_loop_prio = QB_LOOP_MED;
+static bool global_use_glib;
+static int global_pipefd[2];
 
 enum my_msg_ids {
 	IPC_MSG_REQ_TX_RX,
@@ -75,12 +84,92 @@ enum my_msg_ids {
 	IPC_MSG_RES_BULK_EVENTS,
 	IPC_MSG_REQ_STRESS_EVENT,
 	IPC_MSG_RES_STRESS_EVENT,
+	IPC_MSG_REQ_SELF_FEED,
+	IPC_MSG_RES_SELF_FEED,
 	IPC_MSG_REQ_SERVER_FAIL,
 	IPC_MSG_RES_SERVER_FAIL,
 	IPC_MSG_REQ_SERVER_DISCONNECT,
 	IPC_MSG_RES_SERVER_DISCONNECT,
 };
 
+
+/* these 2 functions are borrowed from pacemaker code */
+static enum qb_ipcs_rate_limit
+conv_libqb_prio2ratelimit(enum qb_loop_priority prio)
+{
+	/* this is an inversion of what libqb's qb_ipcs_request_rate_limit does */
+	enum qb_ipcs_rate_limit ret = QB_IPCS_RATE_NORMAL;
+	switch (prio) {
+	case QB_LOOP_LOW:
+		ret = QB_IPCS_RATE_SLOW;
+		break;
+	case QB_LOOP_HIGH:
+		ret = QB_IPCS_RATE_FAST;
+		break;
+	default:
+		qb_log(LOG_DEBUG, "Invalid libqb's loop priority %d,"
+		       " assuming QB_LOOP_MED", prio);
+		/* fall-through */
+	case QB_LOOP_MED:
+		break;
+	}
+	return ret;
+}
+#ifdef HAVE_GLIB
+static gint
+conv_prio_libqb2glib(enum qb_loop_priority prio)
+{
+	gint ret = G_PRIORITY_DEFAULT;
+	switch (prio) {
+	case QB_LOOP_LOW:
+		ret = G_PRIORITY_LOW;
+		break;
+	case QB_LOOP_HIGH:
+		ret = G_PRIORITY_HIGH;
+		break;
+	default:
+		qb_log(LOG_DEBUG, "Invalid libqb's loop priority %d,"
+		       " assuming QB_LOOP_MED", prio);
+		/* fall-through */
+	case QB_LOOP_MED:
+		break;
+	}
+	return ret;
+}
+
+/* these 3 glue functions are inspired by pacemaker, too */
+static gboolean
+gio_source_prepare(GSource *source, gint *timeout)
+{
+	qb_enter();
+	*timeout = 500;
+	return FALSE;
+}
+static gboolean
+gio_source_check(GSource *source)
+{
+	qb_enter();
+	return TRUE;
+}
+static gboolean
+gio_source_dispatch(GSource *source, GSourceFunc callback, gpointer user_data)
+{
+	gboolean ret = G_SOURCE_CONTINUE;
+	qb_enter();
+	if (callback) {
+		ret = callback(user_data);
+	}
+	return ret;
+}
+static GSourceFuncs gio_source_funcs = {
+	.prepare = gio_source_prepare,
+	.check = gio_source_check,
+	.dispatch = gio_source_dispatch,
+};
+
+#endif
+
+
 /* Test Cases
  *
  * 1) basic send & recv different message sizes
@@ -140,10 +229,65 @@ set_ipc_name(const char *prefix)
 		t_sec[sizeof(t_sec) - 1] = '\0';
 	}
 
-	snprintf(ipc_name, sizeof(ipc_name), "%s%s%lX%.4x", prefix, t_sec,
+	snprintf(ipc_name, sizeof(ipc_name), "%.44s%s%lX%.4x", prefix, t_sec,
 		 (unsigned long)getpid(), (unsigned) ((long) time(NULL) % (0x10000)));
 }
 
+static int
+pipe_writer(int fd, int revents, void *data) {
+	qb_enter();
+	static const char buf[8] = { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h' };
+
+	ssize_t wbytes = 0, wbytes_sum = 0;
+
+	/* bounded, rather than looping up to SIZE_MAX until EAGAIN */
+	for (size_t i = 0; i < 4096; i++) {
+		wbytes_sum += wbytes;
+		if ((wbytes = write(fd, buf, sizeof(buf))) == -1) {
+			if (errno != EAGAIN) {
+				perror("write");
+				exit(-1);
+			}
+			break;
+		}
+	}
+	if (wbytes_sum > 0) {
+		qb_log(LOG_DEBUG, "written %zd bytes", wbytes_sum);
+	}
+	qb_leave();
+	return 1;
+}
+
+static int
+pipe_reader(int fd, int revents, void *data) {
+	qb_enter();
+	ssize_t rbytes, rbytes_sum = 0;
+	size_t cnt = SIZE_MAX;
+	char buf[4096] = { '\0' };
+	while ((rbytes = read(fd, buf, sizeof(buf))) > 0 && rbytes < cnt) {
+		cnt -= rbytes;
+		rbytes_sum += rbytes;
+	}
+	if (rbytes_sum > 0) {
+		fail_if(buf[0] == '\0');  /* avoid dead store elimination */
+		qb_log(LOG_DEBUG, "read %zd bytes", rbytes_sum);
+		sleep(1);
+	}
+	qb_leave();
+	return 1;
+}
+
+#ifdef HAVE_GLIB
+static gboolean
+gio_pipe_reader(void *data) {
+	return (pipe_reader(*((int *) data), 0, NULL) > 0);
+}
+static gboolean
+gio_pipe_writer(void *data) {
+	return (pipe_writer(*((int *) data), 0, NULL) > 0);
+}
+#endif
+
 static int32_t
 s1_msg_process_fn(qb_ipcs_connection_t *c,
 		  void *data, size_t size)
@@ -264,6 +408,39 @@ s1_msg_process_fn(qb_ipcs_connection_t *c,
 			giant_event_send.hdr.id++;
 		}
 
+	} else if (req_pt->id == IPC_MSG_REQ_SELF_FEED) {
+		if (pipe(global_pipefd) != 0) {
+			perror("pipefd");
+			fail_if(1);
+		}
+		fcntl(global_pipefd[0], F_SETFL, O_NONBLOCK);
+		fcntl(global_pipefd[1], F_SETFL, O_NONBLOCK);
+		if (global_use_glib) {
+#ifdef HAVE_GLIB
+			GSource *source_r, *source_w;
+			source_r = g_source_new(&gio_source_funcs, sizeof(GSource));
+			source_w = g_source_new(&gio_source_funcs, sizeof(GSource));
+			fail_if(source_r == NULL || source_w == NULL);
+			g_source_set_priority(source_r, conv_prio_libqb2glib(QB_LOOP_HIGH));
+			g_source_set_priority(source_w, conv_prio_libqb2glib(QB_LOOP_HIGH));
+			g_source_set_can_recurse(source_r, FALSE);
+			g_source_set_can_recurse(source_w, FALSE);
+			g_source_set_callback(source_r, gio_pipe_reader, &global_pipefd[0], NULL);
+			g_source_set_callback(source_w, gio_pipe_writer, &global_pipefd[1], NULL);
+			g_source_add_unix_fd(source_r, global_pipefd[0], G_IO_IN);
+			g_source_add_unix_fd(source_w, global_pipefd[1], G_IO_OUT);
+			g_source_attach(source_r, NULL);
+			g_source_attach(source_w, NULL);
+#else
+			fail_if(1);
+#endif
+		} else {
+			qb_loop_poll_add(my_loop, QB_LOOP_HIGH, global_pipefd[1],
+					 POLLOUT|POLLERR, NULL, pipe_writer);
+			qb_loop_poll_add(my_loop, QB_LOOP_HIGH, global_pipefd[0],
					 POLLIN|POLLERR, NULL, pipe_reader);
+		}
+
 	} else if (req_pt->id == IPC_MSG_REQ_SERVER_FAIL) {
 		exit(0);
 	} else if (req_pt->id == IPC_MSG_REQ_SERVER_DISCONNECT) {
@@ -301,6 +478,122 @@ my_dispatch_del(int32_t fd)
 	return qb_loop_poll_del(my_loop, fd);
 }
 
+
+/* taken from examples/ipcserver.c, with s/my_g/gio/ */
+#ifdef HAVE_GLIB
+
+#include <qb/qbarray.h>
+
+static qb_array_t *gio_map;
+static GMainLoop *glib_loop;
+
+struct gio_to_qb_poll {
+	int32_t is_used;
+	int32_t events;
+	int32_t source;
+	int32_t fd;
+	void *data;
+	qb_ipcs_dispatch_fn_t fn;
+	enum qb_loop_priority p;
+};
+
+static gboolean
+gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer data)
+{
+	struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
+	gint fd = g_io_channel_unix_get_fd(gio);
+
+	qb_enter();
+
+	return (adaptor->fn(fd, condition, adaptor->data) == 0);
+}
+
+static void
+gio_poll_destroy(gpointer data)
+{
+	struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
+
+	adaptor->is_used--;
+	if (adaptor->is_used == 0) {
+		qb_log(LOG_DEBUG, "fd %d adaptor destroyed\n", adaptor->fd);
+		adaptor->fd = 0;
+		adaptor->source = 0;
+	}
+}
+
+static int32_t
+gio_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts,
+		    void *data, qb_ipcs_dispatch_fn_t fn, gboolean is_new)
+{
+	struct gio_to_qb_poll *adaptor;
+	GIOChannel *channel;
+	int32_t res = 0;
+
+	qb_enter();
+
+	res = qb_array_index(gio_map, fd, (void **)&adaptor);
+	if (res < 0) {
+		return res;
+	}
+	if (adaptor->is_used && adaptor->source) {
+		if (is_new) {
+			return -EEXIST;
+		}
+		g_source_remove(adaptor->source);
+		adaptor->source = 0;
+	}
+
+	channel = g_io_channel_unix_new(fd);
+	if (!channel) {
+		return -ENOMEM;
+	}
+
+	adaptor->fn = fn;
+	adaptor->events = evts;
+	adaptor->data = data;
+	adaptor->p = p;
+	adaptor->is_used++;
+	adaptor->fd = fd;
+
+	adaptor->source = g_io_add_watch_full(channel, conv_prio_libqb2glib(p),
+					      evts, gio_read_socket, adaptor,
+					      gio_poll_destroy);
+
+	/* we are handing the channel off to be managed by mainloop now.
+	 * remove our reference. */
+	g_io_channel_unref(channel);
+
+	return 0;
+}
+
+static int32_t
+gio_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
+		 void *data, qb_ipcs_dispatch_fn_t fn)
+{
+	return gio_dispatch_update(p, fd, evts, data, fn, TRUE);
+}
+
+static int32_t
+gio_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
+		 void *data, qb_ipcs_dispatch_fn_t fn)
+{
+	return gio_dispatch_update(p, fd, evts, data, fn, FALSE);
+}
+
+static int32_t
+gio_dispatch_del(int32_t fd)
+{
+	struct gio_to_qb_poll *adaptor;
+	if (qb_array_index(gio_map, fd, (void **)&adaptor) == 0) {
+		g_source_remove(adaptor->source);
+		adaptor->source = 0;
+	}
+	return 0;
+}
+
+#endif /* HAVE_GLIB */
+
+
 static int32_t
 s1_connection_closed(qb_ipcs_connection_t *c)
 {
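A condensed sketch of how this adaptor plugs in; it mirrors what the HAVE_GLIB branch of run_ipc_server does further below, with an illustrative service name and local variable names (not part of the patch):

```c
/* Usage sketch for the gio_* adaptor above; "example" and the handler
 * struct contents are illustrative. */
static struct qb_ipcs_poll_handlers gio_ph = {
        .job_add      = NULL,             /* the one optional callback */
        .dispatch_add = gio_dispatch_add,
        .dispatch_mod = gio_dispatch_mod,
        .dispatch_del = gio_dispatch_del,
};

static void
serve_under_glib(struct qb_ipcs_service_handlers *sh)
{
        qb_ipcs_service_t *srv;

        glib_loop = g_main_loop_new(NULL, FALSE);
        gio_map = qb_array_create_2(16, sizeof(struct gio_to_qb_poll), 1);

        srv = qb_ipcs_create("example", 0 /* service id */, QB_IPC_SHM, sh);
        qb_ipcs_poll_handlers_set(srv, &gio_ph);
        qb_ipcs_run(srv);            /* fds get registered via gio_dispatch_add */

        g_main_loop_run(glib_loop);  /* GLib, not qb_loop, now drives the server */
}
```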
@@ -395,8 +688,30 @@ s1_connection_created(qb_ipcs_connection_t *c)
 }
 
 
-static void
-run_ipc_server(void)
+static volatile sig_atomic_t usr1_bit;
+
+static void usr1_bit_setter(int signal) {
+	if (signal == SIGUSR1) {
+		usr1_bit = 1;
+	}
+}
+
+#define READY_SIGNALLER(name, data_arg)  void (name)(void *data_arg)
+typedef READY_SIGNALLER(ready_signaller_fn, );
+
+static
+READY_SIGNALLER(usr1_signaller, parent_target)
+{
+	kill(*((pid_t *) parent_target), SIGUSR1);
+}
+
+#define NEW_PROCESS_RUNNER(name, ready_signaller_arg, signaller_data_arg, data_arg) \
+	void (name)(ready_signaller_fn ready_signaller_arg, \
+		    void *signaller_data_arg, void *data_arg)
+typedef NEW_PROCESS_RUNNER(new_process_runner_fn, , , );
+
+static
+NEW_PROCESS_RUNNER(run_ipc_server, ready_signaller, signaller_data, data)
 {
 	int32_t res;
 	qb_loop_signal_handle handle;
@@ -409,22 +724,44 @@ run_ipc_server(void)
 		.connection_closed = s1_connection_closed,
 	};
 
-	struct qb_ipcs_poll_handlers ph = {
-		.job_add = my_job_add,
-		.dispatch_add = my_dispatch_add,
-		.dispatch_mod = my_dispatch_mod,
-		.dispatch_del = my_dispatch_del,
-	};
+	struct qb_ipcs_poll_handlers ph;
 	uint32_t max_size = MAX_MSG_SIZE;
 
 	my_loop = qb_loop_create();
 	qb_loop_signal_add(my_loop, QB_LOOP_HIGH, SIGTERM,
-		           NULL, exit_handler, &handle);
+			   NULL, exit_handler, &handle);
 
 	s1 = qb_ipcs_create(ipc_name, 4, ipc_type, &sh);
 	fail_if(s1 == 0);
 
+	if (global_loop_prio != QB_LOOP_MED) {
+		qb_ipcs_request_rate_limit(s1,
+			conv_libqb_prio2ratelimit(global_loop_prio));
+	}
+	if (global_use_glib) {
+#ifdef HAVE_GLIB
+		ph = (struct qb_ipcs_poll_handlers) {
+			.job_add = NULL,
+			.dispatch_add = gio_dispatch_add,
+			.dispatch_mod = gio_dispatch_mod,
+			.dispatch_del = gio_dispatch_del,
+		};
+		glib_loop = g_main_loop_new(NULL, FALSE);
+		gio_map = qb_array_create_2(16, sizeof(struct gio_to_qb_poll), 1);
+		fail_if(gio_map == NULL);
+#else
+		fail_if(1);
+#endif
+	} else {
+		ph = (struct qb_ipcs_poll_handlers) {
+			.job_add = my_job_add,
+			.dispatch_add = my_dispatch_add,
+			.dispatch_mod = my_dispatch_mod,
+			.dispatch_del = my_dispatch_del,
+		};
+	}
+
 	if (enforce_server_buffer) {
 		qb_ipcs_enforce_buffer_size(s1, max_size);
 	}
@@ -433,14 +770,44 @@ run_ipc_server(void)
 	res = qb_ipcs_run(s1);
 	ck_assert_int_eq(res, 0);
 
-	qb_loop_run(my_loop);
+	if (ready_signaller != NULL) {
+		ready_signaller(signaller_data);
+	}
+
+	if (global_use_glib) {
+#ifdef HAVE_GLIB
+		g_main_loop_run(glib_loop);
+#endif
+	} else {
+		qb_loop_run(my_loop);
+	}
 	qb_log(LOG_DEBUG, "loop finished - done ...");
 }
 
 static pid_t
-run_function_in_new_process(void (*run_ipc_server_fn)(void))
+run_function_in_new_process(const char *role,
+			    new_process_runner_fn new_process_runner,
+			    void *data)
 {
-	pid_t pid = fork ();
+	char formatbuf[1024];
+	pid_t parent_target, pid;
+
+	struct sigaction orig_sa, purpose_sa;
+	sigset_t orig_mask, purpose_mask, purpose_clear_mask;
+
+	sigemptyset(&purpose_mask);
+	sigaddset(&purpose_mask, SIGUSR1);
+
+	sigprocmask(SIG_BLOCK, &purpose_mask, &orig_mask);
+	purpose_clear_mask = orig_mask;
+	sigdelset(&purpose_clear_mask, SIGUSR1);
+
+	purpose_sa.sa_handler = usr1_bit_setter;
+	purpose_sa.sa_mask = purpose_mask;
+	purpose_sa.sa_flags = SA_RESTART;
+
+	parent_target = getpid();
+	pid = fork();
 
 	if (pid == -1) {
 		fprintf (stderr, "Can't fork\n");
@@ -448,9 +815,33 @@ run_function_in_new_process(void (*run_ipc_server_fn)(void))
 	}
 
 	if (pid == 0) {
-		run_ipc_server_fn();
+		sigprocmask(SIG_SETMASK, &orig_mask, NULL);
+
+		if (role == NULL) {
+			qb_log_format_set(QB_LOG_STDERR, "lib/%f|%l[%P] %b");
+		} else {
+			snprintf(formatbuf, sizeof(formatbuf),
+				 "lib/%%f|%%l|%s[%%P] %%b", role);
+			qb_log_format_set(QB_LOG_STDERR, formatbuf);
+		}
+
+		new_process_runner(usr1_signaller, &parent_target, data);
 		exit(0);
 	}
+
+	usr1_bit = 0;
+	/* XXX assume never fails */
+	sigaction(SIGUSR1, &purpose_sa, &orig_sa);
+
+	do {
+		/* XXX assume never fails with EFAULT */
+		sigsuspend(&purpose_clear_mask);
+	} while (usr1_bit != 1);
+	usr1_bit = 0;
+	sigprocmask(SIG_SETMASK, &orig_mask, NULL);
+	/* give children a slight/non-strict scheduling advantage */
+	sched_yield();
+
 	return pid;
 }
@@ -598,16 +989,15 @@ test_ipc_txrx_timeout(void)
 	pid_t pid;
 	uint32_t max_size = MAX_MSG_SIZE;
 
-	pid = run_function_in_new_process(run_ipc_server);
+	pid = run_function_in_new_process("server", run_ipc_server, NULL);
 	fail_if(pid == -1);
-	sleep(1);
 
 	do {
 		conn = qb_ipcc_connect(ipc_name, max_size);
 		if (conn == NULL) {
 			j = waitpid(pid, NULL, WNOHANG);
 			ck_assert_int_eq(j, 0);
-			sleep(1);
+			poll(NULL, 0, 400);
 			c++;
 		}
 	} while (conn == NULL && c < 5);
@@ -631,11 +1021,6 @@ test_ipc_txrx_timeout(void)
 	request_server_exit();
 	verify_graceful_stop(pid);
 
-	/*
-	 * wait a bit for the server to die.
-	 */
-	sleep(1);
-
 	/*
 	 * this needs to free up the shared mem
 	 */
@@ -652,16 +1037,15 @@ test_ipc_txrx(void)
 	pid_t pid;
 	uint32_t max_size = MAX_MSG_SIZE;
 
-	pid = run_function_in_new_process(run_ipc_server);
+	pid = run_function_in_new_process("server", run_ipc_server, NULL);
 	fail_if(pid == -1);
-	sleep(1);
 
 	do {
 		conn = qb_ipcc_connect(ipc_name, max_size);
 		if (conn == NULL) {
 			j = waitpid(pid, NULL, WNOHANG);
 			ck_assert_int_eq(j, 0);
-			sleep(1);
+			poll(NULL, 0, 400);
 			c++;
 		}
 	} while (conn == NULL && c < 5);
@@ -703,16 +1087,15 @@ test_ipc_exit(void)
 	pid_t pid;
 	uint32_t max_size = MAX_MSG_SIZE;
 
-	pid = run_function_in_new_process(run_ipc_server);
+	pid = run_function_in_new_process("server", run_ipc_server, NULL);
 	fail_if(pid == -1);
-	sleep(1);
 
 	do {
 		conn = qb_ipcc_connect(ipc_name, max_size);
 		if (conn == NULL) {
 			j = waitpid(pid, NULL, WNOHANG);
 			ck_assert_int_eq(j, 0);
-			sleep(1);
+			poll(NULL, 0, 400);
 			c++;
 		}
 	} while (conn == NULL && c < 5);
@@ -732,11 +1115,6 @@ test_ipc_exit(void)
 	request_server_exit();
 	verify_graceful_stop(pid);
 
-	/*
-	 * wait a bit for the server to die.
-	 */
-	sleep(1);
-
 	/*
 	 * this needs to free up the shared mem
 	 */
@@ -862,40 +1240,66 @@ struct my_res {
 	char message[1024 * 1024];
 };
 
-static void
-test_ipc_dispatch(void)
+struct dispatch_data {
+	pid_t server_pid;
+	enum my_msg_ids msg_type;
+	uint32_t repetitions;
+};
+
+static inline
+NEW_PROCESS_RUNNER(client_dispatch, ready_signaller, signaller_data, data)
 {
-	int32_t j;
-	int32_t c = 0;
-	pid_t pid;
-	int32_t size;
 	uint32_t max_size = MAX_MSG_SIZE;
-
-	pid = run_function_in_new_process(run_ipc_server);
-	fail_if(pid == -1);
-	sleep(1);
+	int32_t size;
+	int32_t c = 0;
+	int32_t j;
+	pid_t server_pid = ((struct dispatch_data *) data)->server_pid;
+	enum my_msg_ids msg_type = ((struct dispatch_data *) data)->msg_type;
 
 	do {
 		conn = qb_ipcc_connect(ipc_name, max_size);
 		if (conn == NULL) {
-			j = waitpid(pid, NULL, WNOHANG);
+			j = waitpid(server_pid, NULL, WNOHANG);
 			ck_assert_int_eq(j, 0);
-			sleep(1);
+			poll(NULL, 0, 400);
 			c++;
 		}
 	} while (conn == NULL && c < 5);
 	fail_if(conn == NULL);
 
+	if (ready_signaller != NULL) {
+		ready_signaller(signaller_data);
+	}
+
 	size = QB_MIN(sizeof(struct qb_ipc_request_header), 64);
-	for (j = 1; j < 19; j++) {
-		size *= 2;
-		if (size >= max_size)
-			break;
-		if (send_and_check(IPC_MSG_REQ_DISPATCH, size,
-				   recv_timeout, QB_TRUE) < 0) {
-			break;
+
+	for (uint32_t r = ((struct dispatch_data *) data)->repetitions;
+	     r > 0; r--) {
+		for (j = 1; j < 19; j++) {
+			size *= 2;
+			if (size >= max_size)
+				break;
+			if (send_and_check(msg_type, size,
+					   recv_timeout, QB_TRUE) < 0) {
+				break;
+			}
 		}
 	}
+}
+
+static void
+test_ipc_dispatch(void)
+{
+	pid_t pid;
+	struct dispatch_data data;
+
+	pid = run_function_in_new_process(NULL, run_ipc_server, NULL);
+	fail_if(pid == -1);
+	data = (struct dispatch_data){.server_pid = pid,
+				      .msg_type = IPC_MSG_REQ_DISPATCH,
+				      .repetitions = 1};
+
+	client_dispatch(NULL, NULL, (void *) &data);
 
 	request_server_exit();
 	qb_ipcc_disconnect(conn);
@@ -999,9 +1403,8 @@ test_ipc_stress_connections(void)
 			  QB_LOG_FILTER_FILE, "*", LOG_INFO);
 	qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
 
-	pid = run_function_in_new_process(run_ipc_server);
+	pid = run_function_in_new_process("server", run_ipc_server, NULL);
 	fail_if(pid == -1);
-	sleep(1);
 
 	for (connections = 1; connections < 70000; connections++) {
 		if (conn) {
@@ -1047,16 +1450,15 @@ test_ipc_bulk_events(void)
 	int32_t fd;
 	uint32_t max_size = MAX_MSG_SIZE;
 
-	pid = run_function_in_new_process(run_ipc_server);
+	pid = run_function_in_new_process("server", run_ipc_server, NULL);
 	fail_if(pid == -1);
-	sleep(1);
 
 	do {
 		conn = qb_ipcc_connect(ipc_name, max_size);
 		if (conn == NULL) {
 			j = waitpid(pid, NULL, WNOHANG);
 			ck_assert_int_eq(j, 0);
-			sleep(1);
+			poll(NULL, 0, 400);
 			c++;
 		}
 	} while (conn == NULL && c < 5);
@@ -1112,17 +1514,16 @@ test_ipc_stress_test(void)
 	int32_t real_buf_size;
 
 	enforce_server_buffer = 1;
-	pid = run_function_in_new_process(run_ipc_server);
+	pid = run_function_in_new_process("server", run_ipc_server, NULL);
 	enforce_server_buffer = 0;
 	fail_if(pid == -1);
-	sleep(1);
 
 	do {
 		conn = qb_ipcc_connect(ipc_name, client_buf_size);
 		if (conn == NULL) {
 			j = waitpid(pid, NULL, WNOHANG);
 			ck_assert_int_eq(j, 0);
-			sleep(1);
+			poll(NULL, 0, 400);
 			c++;
 		}
 	} while (conn == NULL && c < 5);
@@ -1202,6 +1603,93 @@ START_TEST(test_ipc_bulk_events_us)
 }
 END_TEST
 
+static
+READY_SIGNALLER(connected_signaller, _)
+{
+	request_server_exit();
+}
+
+START_TEST(test_ipc_dispatch_us_native_prio_deadlock_provoke)
+{
+	pid_t server_pid, alphaclient_pid;
+	struct dispatch_data data;
+
+	qb_enter();
+	ipc_type = QB_IPC_SOCKET;
+	set_ipc_name(__func__);
+
+	/* this is to demonstrate that the native event loop can deal even
+	   with "extreme" priority disproportions */
+	global_loop_prio = QB_LOOP_LOW;
+	multiple_connections = QB_TRUE;
+	recv_timeout = -1;
+
+	server_pid = run_function_in_new_process("server", run_ipc_server,
+						 NULL);
+	fail_if(server_pid == -1);
+	data = (struct dispatch_data){.server_pid = server_pid,
+				      .msg_type = IPC_MSG_REQ_SELF_FEED,
+				      .repetitions = 1};
+	alphaclient_pid = run_function_in_new_process("alphaclient",
						      client_dispatch,
+						      (void *) &data);
+	fail_if(alphaclient_pid == -1);
+
+	/* rather than sleep(1), a mere yield is enough of a head start */
+	sched_yield();
+
+	data.repetitions = 0;
+	client_dispatch(connected_signaller, NULL, (void *) &data);
+	verify_graceful_stop(server_pid);
+
+	multiple_connections = QB_FALSE;
+	qb_leave();
+}
+END_TEST
+
+#ifdef HAVE_GLIB
+START_TEST(test_ipc_dispatch_us_glib_prio_deadlock_provoke)
+{
+	pid_t server_pid, alphaclient_pid;
+	struct dispatch_data data;
+
+	qb_enter();
+	ipc_type = QB_IPC_SOCKET;
+	set_ipc_name(__func__);
+
+	global_use_glib = QB_TRUE;
+	/* this is to make the test pass at all, since GLib is strict about
+	   priorities -- QB_LOOP_MED or lower would fail for sure */
+	global_loop_prio = QB_LOOP_HIGH;
+	multiple_connections = QB_TRUE;
+	recv_timeout = -1;
+
+	server_pid = run_function_in_new_process("server", run_ipc_server,
+						 NULL);
+	fail_if(server_pid == -1);
+	data = (struct dispatch_data){.server_pid = server_pid,
+				      .msg_type = IPC_MSG_REQ_SELF_FEED,
+				      .repetitions = 1};
+	alphaclient_pid = run_function_in_new_process("alphaclient",
+						      client_dispatch,
+						      (void *) &data);
+	fail_if(alphaclient_pid == -1);
+
+	/* rather than sleep(1), a mere yield is enough of a head start */
+	sched_yield();
+
+	data.repetitions = 0;
+	client_dispatch(connected_signaller, NULL, (void *) &data);
+	verify_graceful_stop(server_pid);
+
+	multiple_connections = QB_FALSE;
+	global_loop_prio = QB_LOOP_MED;
+	global_use_glib = QB_FALSE;
+	qb_leave();
+}
+END_TEST
+#endif
+
 static void
 test_ipc_event_on_created(void)
 {
@@ -1215,16 +1703,15 @@ test_ipc_event_on_created(void)
 
 	num_bulk_events = 1;
 
-	pid = run_function_in_new_process(run_ipc_server);
+	pid = run_function_in_new_process("server", run_ipc_server, NULL);
 	fail_if(pid == -1);
-	sleep(1);
 
 	do {
 		conn = qb_ipcc_connect(ipc_name, max_size);
 		if (conn == NULL) {
 			j = waitpid(pid, NULL, WNOHANG);
 			ck_assert_int_eq(j, 0);
-			sleep(1);
+			poll(NULL, 0, 400);
 			c++;
 		}
 	} while (conn == NULL && c < 5);
@@ -1270,16 +1757,15 @@ test_ipc_disconnect_after_created(void)
 	int32_t res;
 	uint32_t max_size = MAX_MSG_SIZE;
 
-	pid = run_function_in_new_process(run_ipc_server);
+	pid = run_function_in_new_process("server", run_ipc_server, NULL);
 	fail_if(pid == -1);
-	sleep(1);
 
 	do {
 		conn = qb_ipcc_connect(ipc_name, max_size);
 		if (conn == NULL) {
 			j = waitpid(pid, NULL, WNOHANG);
 			ck_assert_int_eq(j, 0);
-			sleep(1);
+			poll(NULL, 0, 400);
 			c++;
 		}
 	} while (conn == NULL && c < 5);
@@ -1328,16 +1814,15 @@ test_ipc_server_fail(void)
 	pid_t pid;
 	uint32_t max_size = MAX_MSG_SIZE;
 
-	pid = run_function_in_new_process(run_ipc_server);
+	pid = run_function_in_new_process("server", run_ipc_server, NULL);
 	fail_if(pid == -1);
-	sleep(1);
 
 	do {
 		conn = qb_ipcc_connect(ipc_name, max_size);
 		if (conn == NULL) {
 			j = waitpid(pid, NULL, WNOHANG);
 			ck_assert_int_eq(j, 0);
-			sleep(1);
+			poll(NULL, 0, 400);
 			c++;
 		}
 	} while (conn == NULL && c < 5);
@@ -1396,6 +1881,87 @@ START_TEST(test_ipc_stress_connections_shm)
 }
 END_TEST
 
+START_TEST(test_ipc_dispatch_shm_native_prio_deadlock_provoke)
+{
+	pid_t server_pid, alphaclient_pid;
+	struct dispatch_data data;
+
+	qb_enter();
+	ipc_type = QB_IPC_SHM;
+	set_ipc_name(__func__);
+
+	/* this is to demonstrate that the native event loop can deal even
+	   with "extreme" priority disproportions */
+	global_loop_prio = QB_LOOP_LOW;
+	multiple_connections = QB_TRUE;
+	recv_timeout = -1;
+
+	server_pid = run_function_in_new_process("server", run_ipc_server,
+						 NULL);
+	fail_if(server_pid == -1);
+	data = (struct dispatch_data){.server_pid = server_pid,
+				      .msg_type = IPC_MSG_REQ_SELF_FEED,
+				      .repetitions = 1};
+	alphaclient_pid = run_function_in_new_process("alphaclient",
+						      client_dispatch,
+						      (void *) &data);
+	fail_if(alphaclient_pid == -1);
+
+	/* rather than sleep(1), a mere yield is enough of a head start */
+	sched_yield();
+
+	data.repetitions = 0;
+	client_dispatch(connected_signaller, NULL, (void *) &data);
+	verify_graceful_stop(server_pid);
+
+	multiple_connections = QB_FALSE;
+	qb_leave();
+}
+END_TEST
+
+#ifdef HAVE_GLIB
+START_TEST(test_ipc_dispatch_shm_glib_prio_deadlock_provoke)
+{
+	pid_t server_pid, alphaclient_pid;
+	struct dispatch_data data;
+
+	qb_enter();
+	ipc_type = QB_IPC_SHM;
+	set_ipc_name(__func__);
+
+	global_use_glib = QB_TRUE;
+	/* this is to make the test pass at all, since GLib is strict about
+	   priorities -- QB_LOOP_MED or lower would fail for sure */
+	global_loop_prio = QB_LOOP_HIGH;
+	multiple_connections = QB_TRUE;
+	recv_timeout = -1;
+
+	server_pid = run_function_in_new_process("server", run_ipc_server,
+						 NULL);
+	fail_if(server_pid == -1);
+	data = (struct dispatch_data){.server_pid = server_pid,
+				      .msg_type = IPC_MSG_REQ_SELF_FEED,
+				      .repetitions = 1};
+	alphaclient_pid = run_function_in_new_process("alphaclient",
+						      client_dispatch,
+						      (void *) &data);
+	fail_if(alphaclient_pid == -1);
+
+	/* rather than sleep(1), a mere yield is enough of a head start */
+	sched_yield();
+
+	data.repetitions = 0;
+	client_dispatch(connected_signaller, NULL, (void *) &data);
+	verify_graceful_stop(server_pid);
+
+	multiple_connections = QB_FALSE;
+	global_loop_prio = QB_LOOP_MED;
+	global_use_glib = QB_FALSE;
+	qb_leave();
+}
+END_TEST
+#endif
+
 START_TEST(test_ipc_bulk_events_shm)
 {
 	qb_enter();
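For contrast, why QB_LOOP_MED "would fail for sure" under GLib: GLib always dispatches the highest-priority ready source, so a constantly ready source starves everything ranked below it. A standalone sketch (plain GLib, not part of the test suite) that exhibits exactly this:

```c
/* Standalone illustration: under GLib's strict priorities, an always-ready
 * G_PRIORITY_HIGH idle source starves the default-priority timeout, so
 * "never reached" is never printed and the loop spins forever. */
#include <glib.h>

static gboolean always_ready(gpointer data)
{
        return G_SOURCE_CONTINUE;       /* stays ready; HIGH wins every cycle */
}

static gboolean lower_prio(gpointer data)
{
        g_print("never reached\n");
        return G_SOURCE_REMOVE;
}

int main(void)
{
        GMainLoop *loop = g_main_loop_new(NULL, FALSE);

        g_idle_add_full(G_PRIORITY_HIGH, always_ready, NULL, NULL);
        g_timeout_add(100 /* ms */, lower_prio, NULL);  /* G_PRIORITY_DEFAULT */

        g_main_loop_run(loop);          /* never returns */
        return 0;
}
```

This is the same shape as the self-feeding pipe pair registered at QB_LOOP_HIGH above, which is why the GLib-driven server only survives when its IPC descriptors are registered at QB_LOOP_HIGH as well.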
@@ -1458,16 +2024,15 @@ test_ipc_service_ref_count(void)
 
 	reference_count_test = QB_TRUE;
 
-	pid = run_function_in_new_process(run_ipc_server);
+	pid = run_function_in_new_process("server", run_ipc_server, NULL);
 	fail_if(pid == -1);
-	sleep(1);
 
 	do {
 		conn = qb_ipcc_connect(ipc_name, max_size);
 		if (conn == NULL) {
 			j = waitpid(pid, NULL, WNOHANG);
 			ck_assert_int_eq(j, 0);
-			sleep(1);
+			poll(NULL, 0, 400);
 			c++;
 		}
 	} while (conn == NULL && c < 5);
@@ -1551,19 +2116,22 @@ make_shm_suite(void)
 	TCase *tc;
 	Suite *s = suite_create("shm");
 
-	add_tcase(s, tc, test_ipc_txrx_shm_timeout, 30);
-	add_tcase(s, tc, test_ipc_server_fail_shm, 8);
-	add_tcase(s, tc, test_ipc_txrx_shm_block, 8);
-	add_tcase(s, tc, test_ipc_txrx_shm_tmo, 8);
-	add_tcase(s, tc, test_ipc_fc_shm, 8);
-	add_tcase(s, tc, test_ipc_dispatch_shm, 16);
-	add_tcase(s, tc, test_ipc_stress_test_shm, 16);
-	add_tcase(s, tc, test_ipc_bulk_events_shm, 16);
-	add_tcase(s, tc, test_ipc_exit_shm, 8);
-	add_tcase(s, tc, test_ipc_event_on_created_shm, 10);
-	add_tcase(s, tc, test_ipc_service_ref_count_shm, 10);
-	add_tcase(s, tc, test_ipc_stress_connections_shm, 3600);
-
+	add_tcase(s, tc, test_ipc_txrx_shm_timeout, 28);
+	add_tcase(s, tc, test_ipc_server_fail_shm, 7);
+	add_tcase(s, tc, test_ipc_txrx_shm_block, 7);
+	add_tcase(s, tc, test_ipc_txrx_shm_tmo, 7);
+	add_tcase(s, tc, test_ipc_fc_shm, 7);
+	add_tcase(s, tc, test_ipc_dispatch_shm, 15);
+	add_tcase(s, tc, test_ipc_stress_test_shm, 15);
+	add_tcase(s, tc, test_ipc_bulk_events_shm, 15);
+	add_tcase(s, tc, test_ipc_exit_shm, 6);
+	add_tcase(s, tc, test_ipc_event_on_created_shm, 9);
+	add_tcase(s, tc, test_ipc_service_ref_count_shm, 9);
+	add_tcase(s, tc, test_ipc_stress_connections_shm, 3600 /* ? */);
+	add_tcase(s, tc, test_ipc_dispatch_shm_native_prio_deadlock_provoke, 15);
+#ifdef HAVE_GLIB
+	add_tcase(s, tc, test_ipc_dispatch_shm_glib_prio_deadlock_provoke, 15);
+#endif
 
 #ifdef HAVE_FAILURE_INJECTION
 	add_tcase(s, tc, test_ipcc_truncate_when_unlink_fails_shm, 8);
 #endif
@@ -1577,24 +2145,28 @@ make_soc_suite(void)
 	Suite *s = suite_create("socket");
 	TCase *tc;
 
-	add_tcase(s, tc, test_ipc_txrx_us_timeout, 30);
+	add_tcase(s, tc, test_ipc_txrx_us_timeout, 28);
 /* Commented out for the moment as space in /dev/shm on the CI machines
    causes random failures */
 /*	add_tcase(s, tc, test_ipc_max_dgram_size, 30); */
-	add_tcase(s, tc, test_ipc_server_fail_soc, 8);
-	add_tcase(s, tc, test_ipc_txrx_us_block, 8);
-	add_tcase(s, tc, test_ipc_txrx_us_tmo, 8);
-	add_tcase(s, tc, test_ipc_fc_us, 8);
-	add_tcase(s, tc, test_ipc_exit_us, 8);
-	add_tcase(s, tc, test_ipc_dispatch_us, 16);
+	add_tcase(s, tc, test_ipc_server_fail_soc, 7);
+	add_tcase(s, tc, test_ipc_txrx_us_block, 7);
+	add_tcase(s, tc, test_ipc_txrx_us_tmo, 7);
+	add_tcase(s, tc, test_ipc_fc_us, 7);
+	add_tcase(s, tc, test_ipc_exit_us, 6);
+	add_tcase(s, tc, test_ipc_dispatch_us, 15);
 #ifndef __clang__ /* see 'variable length array in structure' at the top */
-	add_tcase(s, tc, test_ipc_stress_test_us, 60);
+	add_tcase(s, tc, test_ipc_stress_test_us, 58);
+#endif
+	add_tcase(s, tc, test_ipc_bulk_events_us, 15);
+	add_tcase(s, tc, test_ipc_event_on_created_us, 9);
+	add_tcase(s, tc, test_ipc_disconnect_after_created_us, 9);
+	add_tcase(s, tc, test_ipc_service_ref_count_us, 9);
+	add_tcase(s, tc, test_ipc_stress_connections_us, 3600 /* ? */);
+	add_tcase(s, tc, test_ipc_dispatch_us_native_prio_deadlock_provoke, 15);
+#ifdef HAVE_GLIB
+	add_tcase(s, tc, test_ipc_dispatch_us_glib_prio_deadlock_provoke, 15);
 #endif
-	add_tcase(s, tc, test_ipc_bulk_events_us, 16);
-	add_tcase(s, tc, test_ipc_event_on_created_us, 10);
-	add_tcase(s, tc, test_ipc_disconnect_after_created_us, 10);
-	add_tcase(s, tc, test_ipc_service_ref_count_us, 10);
-	add_tcase(s, tc, test_ipc_stress_connections_us, 3600);
 
 	return s;
 }