Skip to content

Commit

Permalink
ipcc: Add an async connect API
Browse files Browse the repository at this point in the history
  • Loading branch information
chrissie-c committed Nov 26, 2021
1 parent a2691b9 commit 255b1c4
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 6 deletions.
29 changes: 29 additions & 0 deletions include/qb/qbipcc.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,35 @@ typedef struct qb_ipcc_connection qb_ipcc_connection_t;
qb_ipcc_connection_t*
qb_ipcc_connect(const char *name, size_t max_msg_size);

/**
* Asynchronously connect to an IPC service
* @param name name of the service.
* @param max_msg_size biggest msg size.
* @return NULL (error: see errno) or a connection object.
*
* qb_ipcc_connect_async() also returns a connection FD which
* should be used added to the application's mainloop and when
* active, qb_ipcc_connect_continue() should be called for the
* connection to be finalised.
* NOTE: This is NOT be the FD that is used for normal applicaion
* polling. qb_ipcc_fd_get() must still be called once the connection
* is established.
*/
qb_ipcc_connection_t *
qb_ipcc_connect_async(const char *name, size_t max_msg_size, int *connect_fd);

/**
* Finish up an asynchonous IPC connection
* @param c connection handle as returned from qb_ipcc_connect_async()
* @return 0 or -errno.
*
* Finishes up a connection that was initiated by qb_ipcc_connect_async(),
* this should only be called then the fd returned by qb_ipcc_connect_async()
* becomes active, usually as a callback in the application's main loop.
*/
int
qb_ipcc_connect_continue(struct qb_ipcc_connection * c);

/**
* Test kernel dgram socket buffers to verify the largest size up
* to the max_msg_size value a single msg can be. Rounds down to the
Expand Down
3 changes: 2 additions & 1 deletion lib/ipc_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ struct qb_ipcc_connection {
};

int32_t qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c,
struct qb_ipc_connection_response *r);
struct qb_ipc_connection_response *r);
int qb_ipcc_setup_connect_continue(struct qb_ipcc_connection *c, struct qb_ipc_connection_response *response);
ssize_t qb_ipc_us_send(struct qb_ipc_one_way *one_way, const void *msg, size_t len);
ssize_t qb_ipc_us_recv(struct qb_ipc_one_way *one_way, void *msg, size_t len, int32_t timeout);
int32_t qb_ipc_us_ready(struct qb_ipc_one_way *ow_data, struct qb_ipc_one_way *ow_conn,
Expand Down
16 changes: 13 additions & 3 deletions lib/ipc_setup.c
Original file line number Diff line number Diff line change
Expand Up @@ -446,9 +446,7 @@ qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c,
{
int32_t res;
struct qb_ipc_connection_request request;
struct ipc_auth_data *data;
#ifdef QB_LINUX
int off = 0;
int on = 1;
#endif

Expand All @@ -471,13 +469,24 @@ qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c,
return res;
}

/* ... To be continued ... */
return 0;
}

/* Called from ipcc_connect_continue() when async connect socket is active */
int qb_ipcc_setup_connect_continue(struct qb_ipcc_connection *c, struct qb_ipc_connection_response *r)
{
struct ipc_auth_data *data;
int32_t res;
#ifdef QB_LINUX
int off = 0;
#endif
data = init_ipc_auth_data(c->setup.u.us.sock, sizeof(struct qb_ipc_connection_response));
if (data == NULL) {
qb_ipcc_us_sock_close(c->setup.u.us.sock);
return -ENOMEM;
}

qb_ipc_us_ready(&c->setup, NULL, -1, POLLIN);
res = qb_ipc_us_recv_msghdr(data);

#ifdef QB_LINUX
Expand All @@ -498,6 +507,7 @@ qb_ipcc_us_setup_connect(struct qb_ipcc_connection *c,
c->server_pid = data->ugp.pid;

destroy_ipc_auth_data(data);

return r->hdr.error;
}

Expand Down
69 changes: 67 additions & 2 deletions lib/ipcc.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,70 @@ qb_ipcc_connect(const char *name, size_t max_msg_size)
if (res < 0) {
goto disconnect_and_cleanup;
}
qb_ipc_us_ready(&c->setup, NULL, -1, POLLIN);
res = qb_ipcc_connect_continue(c);
if (res < 0) {
/* qb_ipcc_connect_continue() has cleaned up for us */
errno = -res;
return NULL;
}

return c;

disconnect_and_cleanup:
if (c->setup.u.us.sock >= 0) {
qb_ipcc_us_sock_close(c->setup.u.us.sock);
}
free(c->receive_buf);
free(c);
errno = -res;
return NULL;
}

qb_ipcc_connection_t *
qb_ipcc_connect_async(const char *name, size_t max_msg_size, int *connect_fd)
{
int32_t res;
qb_ipcc_connection_t *c = NULL;
struct qb_ipc_connection_response response;

c = calloc(1, sizeof(struct qb_ipcc_connection));
if (c == NULL) {
return NULL;
}

c->setup.max_msg_size = QB_MAX(max_msg_size,
sizeof(struct qb_ipc_connection_response));
(void)strlcpy(c->name, name, NAME_MAX);
res = qb_ipcc_us_setup_connect(c, &response);
if (res < 0) {
goto disconnect_and_cleanup;
}

*connect_fd = c->setup.u.us.sock;
return c;

disconnect_and_cleanup:
if (c->setup.u.us.sock >= 0) {
qb_ipcc_us_sock_close(c->setup.u.us.sock);
}
free(c->receive_buf);
free(c);
errno = -res;
return NULL;
}

int qb_ipcc_connect_continue(struct qb_ipcc_connection * c)
{
struct qb_ipc_connection_response response;
int32_t res;

/* Finish up the authentication part */
res = qb_ipcc_setup_connect_continue(c, &response);
if (res != 0) {
goto disconnect_and_cleanup;
}

c->response.type = response.connection_type;
c->request.type = response.connection_type;
c->event.type = response.connection_type;
Expand Down Expand Up @@ -79,7 +143,7 @@ qb_ipcc_connect(const char *name, size_t max_msg_size)
goto disconnect_and_cleanup;
}
c->is_connected = QB_TRUE;
return c;
return 0;

disconnect_and_cleanup:
if (c->setup.u.us.sock >= 0) {
Expand All @@ -88,7 +152,8 @@ qb_ipcc_connect(const char *name, size_t max_msg_size)
free(c->receive_buf);
free(c);
errno = -res;
return NULL;
return -res;

}

static int32_t
Expand Down
80 changes: 80 additions & 0 deletions tests/check_ipc.c
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,62 @@ send_and_check(int32_t req_id, uint32_t size,
return res;
}


static int32_t
process_async_connect(int32_t fd, int32_t revents, void *data)
{
qb_loop_t *cl = (qb_loop_t *)data;
int res;

res = qb_ipcc_connect_continue(conn);
ck_assert_int_eq(res, 0);
qb_loop_stop(cl);
return 0;
}
static void test_ipc_connect_async(void)
{
struct qb_ipc_request_header req_header;
struct qb_ipc_response_header res_header;
int32_t res;
pid_t pid;
uint32_t max_size = MAX_MSG_SIZE;
int connect_fd;
struct iovec iov[1];
static qb_loop_t *cl;

pid = run_function_in_new_process("server", run_ipc_server, NULL);
ck_assert(pid != -1);

conn = qb_ipcc_connect_async(ipc_name, max_size, &connect_fd);
ck_assert(conn != NULL);

cl = qb_loop_create();
res = qb_loop_poll_add(cl, QB_LOOP_MED,
connect_fd, POLLIN,
cl, process_async_connect);
ck_assert_int_eq(res, 0);
qb_loop_run(cl);

/* Send some data */
req_header.id = IPC_MSG_REQ_TX_RX;
req_header.size = sizeof(struct qb_ipc_request_header);

iov[0].iov_len = req_header.size;
iov[0].iov_base = &req_header;

res = qb_ipcc_sendv_recv(conn, iov, 1,
&res_header,
sizeof(struct qb_ipc_response_header), 5000);

ck_assert_int_ge(res, 0);

request_server_exit();
verify_graceful_stop(pid);


qb_ipcc_disconnect(conn);
}

static void
test_ipc_txrx_timeout(void)
{
Expand Down Expand Up @@ -1226,6 +1282,7 @@ START_TEST(test_ipc_txrx_shm_timeout)
}
END_TEST


START_TEST(test_ipc_txrx_us_timeout)
{
qb_enter();
Expand All @@ -1236,6 +1293,25 @@ START_TEST(test_ipc_txrx_us_timeout)
}
END_TEST

START_TEST(test_ipc_shm_connect_async)
{
qb_enter();
ipc_type = QB_IPC_SHM;
set_ipc_name(__func__);
test_ipc_connect_async();
qb_leave();
}
END_TEST

START_TEST(test_ipc_us_connect_async)
{
qb_enter();
ipc_type = QB_IPC_SHM;
set_ipc_name(__func__);
test_ipc_connect_async();
qb_leave();
}
END_TEST

START_TEST(test_ipc_txrx_shm_getauth)
{
Expand Down Expand Up @@ -2277,6 +2353,8 @@ make_shm_suite(void)
TCase *tc;
Suite *s = suite_create("shm");

add_tcase(s, tc, test_ipc_shm_connect_async, 7);

add_tcase(s, tc, test_ipc_txrx_shm_getauth, 7);
add_tcase(s, tc, test_ipc_txrx_shm_timeout, 28);
add_tcase(s, tc, test_ipc_server_fail_shm, 7);
Expand Down Expand Up @@ -2308,6 +2386,8 @@ make_soc_suite(void)
Suite *s = suite_create("socket");
TCase *tc;

add_tcase(s, tc, test_ipc_us_connect_async, 7);

add_tcase(s, tc, test_ipc_txrx_us_getauth, 7);
add_tcase(s, tc, test_ipc_txrx_us_timeout, 28);
/* Commented out for the moment as space in /dev/shm on the CI machines
Expand Down

0 comments on commit 255b1c4

Please sign in to comment.