diff --git a/include/qb/qbipcc.h b/include/qb/qbipcc.h index de96c7289..867ba045b 100644 --- a/include/qb/qbipcc.h +++ b/include/qb/qbipcc.h @@ -80,6 +80,36 @@ typedef struct qb_ipcc_connection qb_ipcc_connection_t; qb_ipcc_connection_t* qb_ipcc_connect(const char *name, size_t max_msg_size); +/** + * Asynchronously connect to an IPC service + * @param name name of the service. + * @param max_msg_size biggest msg size. + * @param connect_fd return FD to continue connection with + * @return NULL (error: see errno) or a connection object. + * + * qb_ipcc_connect_async() returns a connection FD which + * should be used added to the application's mainloop - when it is + * active, qb_ipcc_connect_continue() should be called for the + * connection to be finalised. + * NOTE: This is NOT the same FD that is used for normal applicaion + * polling. qb_ipcc_fd_get() must still be called once the connection + * is established. + */ +qb_ipcc_connection_t * +qb_ipcc_connect_async(const char *name, size_t max_msg_size, int *connect_fd); + +/** + * Finish up an asynchonous IPC connection + * @param c connection handle as returned from qb_ipcc_connect_async() + * @return 0 or -errno. + * + * Finishes up a connection that was initiated by qb_ipcc_connect_async(), + * this should only be called when the fd returned by qb_ipcc_connect_async() + * becomes active, usually as a callback in the application's main loop. + */ +int +qb_ipcc_connect_continue(struct qb_ipcc_connection * c); + /** * Test kernel dgram socket buffers to verify the largest size up * to the max_msg_size value a single msg can be. Rounds down to the diff --git a/lib/ipc_int.h b/lib/ipc_int.h index 03c5dabbb..87f1de1d9 100644 --- a/lib/ipc_int.h +++ b/lib/ipc_int.h @@ -106,7 +106,8 @@ struct qb_ipcc_connection { }; int32_t qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c, - struct qb_ipc_connection_response *r); + struct qb_ipc_connection_response *r); +int qb_ipcc_setup_connect_continue(struct qb_ipcc_connection *c, struct qb_ipc_connection_response *response); ssize_t qb_ipc_us_send(struct qb_ipc_one_way *one_way, const void *msg, size_t len); ssize_t qb_ipc_us_recv(struct qb_ipc_one_way *one_way, void *msg, size_t len, int32_t timeout); int32_t qb_ipc_us_ready(struct qb_ipc_one_way *ow_data, struct qb_ipc_one_way *ow_conn, diff --git a/lib/ipc_setup.c b/lib/ipc_setup.c index c144a5ef2..0ef9bb6bc 100644 --- a/lib/ipc_setup.c +++ b/lib/ipc_setup.c @@ -446,9 +446,7 @@ qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c, { int32_t res; struct qb_ipc_connection_request request; - struct ipc_auth_data *data; #ifdef QB_LINUX - int off = 0; int on = 1; #endif @@ -471,13 +469,24 @@ qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c, return res; } + /* ... To be continued ... (when the FD is active) */ + return 0; +} + +/* Called from ipcc_connect_continue() when async connect socket is active */ +int qb_ipcc_setup_connect_continue(struct qb_ipcc_connection *c, struct qb_ipc_connection_response *r) +{ + struct ipc_auth_data *data; + int32_t res; +#ifdef QB_LINUX + int off = 0; +#endif data = init_ipc_auth_data(c->setup.u.us.sock, sizeof(struct qb_ipc_connection_response)); if (data == NULL) { qb_ipcc_us_sock_close(c->setup.u.us.sock); return -ENOMEM; } - qb_ipc_us_ready(&c->setup, NULL, -1, POLLIN); res = qb_ipc_us_recv_msghdr(data); #ifdef QB_LINUX @@ -498,6 +507,7 @@ qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c, c->server_pid = data->ugp.pid; destroy_ipc_auth_data(data); + return r->hdr.error; } diff --git a/lib/ipcc.c b/lib/ipcc.c index a6cf409b6..c744ea170 100644 --- a/lib/ipcc.c +++ b/lib/ipcc.c @@ -45,6 +45,70 @@ qb_ipcc_connect(const char *name, size_t max_msg_size) if (res < 0) { goto disconnect_and_cleanup; } + qb_ipc_us_ready(&c->setup, NULL, -1, POLLIN); + res = qb_ipcc_connect_continue(c); + if (res != 0) { + /* qb_ipcc_connect_continue() has cleaned up for us */ + errno = -res; + return NULL; + } + + return c; + +disconnect_and_cleanup: + if (c->setup.u.us.sock >= 0) { + qb_ipcc_us_sock_close(c->setup.u.us.sock); + } + free(c->receive_buf); + free(c); + errno = -res; + return NULL; +} + +qb_ipcc_connection_t * +qb_ipcc_connect_async(const char *name, size_t max_msg_size, int *connect_fd) +{ + int32_t res; + qb_ipcc_connection_t *c = NULL; + struct qb_ipc_connection_response response; + + c = calloc(1, sizeof(struct qb_ipcc_connection)); + if (c == NULL) { + return NULL; + } + + c->setup.max_msg_size = QB_MAX(max_msg_size, + sizeof(struct qb_ipc_connection_response)); + (void)strlcpy(c->name, name, NAME_MAX); + res = qb_ipcc_us_setup_connect(c, &response); + if (res < 0) { + goto disconnect_and_cleanup; + } + + *connect_fd = c->setup.u.us.sock; + return c; + +disconnect_and_cleanup: + if (c->setup.u.us.sock >= 0) { + qb_ipcc_us_sock_close(c->setup.u.us.sock); + } + free(c->receive_buf); + free(c); + errno = -res; + return NULL; +} + +int qb_ipcc_connect_continue(struct qb_ipcc_connection * c) +{ + struct qb_ipc_connection_response response; + int32_t res; + + /* Finish up the authentication part */ + res = qb_ipcc_setup_connect_continue(c, &response); + if (res != 0) { + goto disconnect_and_cleanup; + } + c->response.type = response.connection_type; c->request.type = response.connection_type; c->event.type = response.connection_type; @@ -79,7 +143,7 @@ qb_ipcc_connect(const char *name, size_t max_msg_size) goto disconnect_and_cleanup; } c->is_connected = QB_TRUE; - return c; + return 0; disconnect_and_cleanup: if (c->setup.u.us.sock >= 0) { @@ -88,7 +152,8 @@ qb_ipcc_connect(const char *name, size_t max_msg_size) free(c->receive_buf); free(c); errno = -res; - return NULL; + return -res; + } static int32_t diff --git a/tests/check_ipc.c b/tests/check_ipc.c index e8f81f3ac..6090354ad 100644 --- a/tests/check_ipc.c +++ b/tests/check_ipc.c @@ -1007,6 +1007,62 @@ send_and_check(int32_t req_id, uint32_t size, return res; } + +static int32_t +process_async_connect(int32_t fd, int32_t revents, void *data) +{ + qb_loop_t *cl = (qb_loop_t *)data; + int res; + + res = qb_ipcc_connect_continue(conn); + ck_assert_int_eq(res, 0); + qb_loop_stop(cl); + return 0; +} +static void test_ipc_connect_async(void) +{ + struct qb_ipc_request_header req_header; + struct qb_ipc_response_header res_header; + int32_t res; + pid_t pid; + uint32_t max_size = MAX_MSG_SIZE; + int connect_fd; + struct iovec iov[1]; + static qb_loop_t *cl; + + pid = run_function_in_new_process("server", run_ipc_server, NULL); + ck_assert(pid != -1); + + conn = qb_ipcc_connect_async(ipc_name, max_size, &connect_fd); + ck_assert(conn != NULL); + + cl = qb_loop_create(); + res = qb_loop_poll_add(cl, QB_LOOP_MED, + connect_fd, POLLIN, + cl, process_async_connect); + ck_assert_int_eq(res, 0); + qb_loop_run(cl); + + /* Send some data */ + req_header.id = IPC_MSG_REQ_TX_RX; + req_header.size = sizeof(struct qb_ipc_request_header); + + iov[0].iov_len = req_header.size; + iov[0].iov_base = &req_header; + + res = qb_ipcc_sendv_recv(conn, iov, 1, + &res_header, + sizeof(struct qb_ipc_response_header), 5000); + + ck_assert_int_ge(res, 0); + + request_server_exit(); + verify_graceful_stop(pid); + + + qb_ipcc_disconnect(conn); +} + static void test_ipc_txrx_timeout(void) { @@ -1226,6 +1282,7 @@ START_TEST(test_ipc_txrx_shm_timeout) } END_TEST + START_TEST(test_ipc_txrx_us_timeout) { qb_enter(); @@ -1236,6 +1293,25 @@ START_TEST(test_ipc_txrx_us_timeout) } END_TEST +START_TEST(test_ipc_shm_connect_async) +{ + qb_enter(); + ipc_type = QB_IPC_SHM; + set_ipc_name(__func__); + test_ipc_connect_async(); + qb_leave(); +} +END_TEST + +START_TEST(test_ipc_us_connect_async) +{ + qb_enter(); + ipc_type = QB_IPC_SHM; + set_ipc_name(__func__); + test_ipc_connect_async(); + qb_leave(); +} +END_TEST START_TEST(test_ipc_txrx_shm_getauth) { @@ -2277,6 +2353,8 @@ make_shm_suite(void) TCase *tc; Suite *s = suite_create("shm"); + add_tcase(s, tc, test_ipc_shm_connect_async, 7); + add_tcase(s, tc, test_ipc_txrx_shm_getauth, 7); add_tcase(s, tc, test_ipc_txrx_shm_timeout, 28); add_tcase(s, tc, test_ipc_server_fail_shm, 7); @@ -2308,6 +2386,8 @@ make_soc_suite(void) Suite *s = suite_create("socket"); TCase *tc; + add_tcase(s, tc, test_ipc_us_connect_async, 7); + add_tcase(s, tc, test_ipc_txrx_us_getauth, 7); add_tcase(s, tc, test_ipc_txrx_us_timeout, 28); /* Commented out for the moment as space in /dev/shm on the CI machines