Skip to content

Commit

Permalink
UCT/CM/RDMACM: add UCX_RDMA_CM_SOURCE_ADDRESS config
Browse files Browse the repository at this point in the history
  • Loading branch information
evgeny-leksikov committed Dec 16, 2020
1 parent d0c7539 commit 9788cb3
Show file tree
Hide file tree
Showing 14 changed files with 262 additions and 73 deletions.
27 changes: 27 additions & 0 deletions src/ucs/sys/sock.c
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,33 @@ const char* ucs_sockaddr_str(const struct sockaddr *sock_addr,
return str;
}

/**
 * Parse a textual IP address into a socket address.
 *
 * The string is first interpreted as an IPv4 address and, if that fails, as
 * an IPv6 address. On success only the address family and the IP address
 * fields of @a sa_storage are written; the port and all other fields are left
 * exactly as the caller provided them. On failure @a sa_storage is not
 * modified, so the caller never observes a half-initialized address.
 *
 * @param [in]  ip_str      String holding the IP address to parse.
 * @param [out] sa_storage  Filled with the address family and IP address.
 *
 * @return UCS_OK if @a ip_str is a valid IPv4 or IPv6 address,
 *         UCS_ERR_INVALID_ADDR (after logging an error) otherwise.
 */
ucs_status_t ucs_sock_ipstr_to_sockaddr(const char *ip_str,
                                        struct sockaddr_storage *sa_storage)
{
    struct sockaddr_in *sa_in;
    struct sockaddr_in6 *sa_in6;
    struct in_addr addr4;
    struct in6_addr addr6;

    /* Parse into temporaries and commit to the output only after a successful
     * conversion, so an invalid string cannot leave a stale address family
     * behind in the caller's storage */

    /* try IPv4 */
    if (inet_pton(AF_INET, ip_str, &addr4) == 1) {
        sa_in             = (struct sockaddr_in*)sa_storage;
        sa_in->sin_family = AF_INET;
        sa_in->sin_addr   = addr4;
        return UCS_OK;
    }

    /* try IPv6 */
    if (inet_pton(AF_INET6, ip_str, &addr6) == 1) {
        sa_in6              = (struct sockaddr_in6*)sa_storage;
        sa_in6->sin6_family = AF_INET6;
        sa_in6->sin6_addr   = addr6;
        return UCS_OK;
    }

    ucs_error("invalid address %s", ip_str);
    return UCS_ERR_INVALID_ADDR;
}

int ucs_sockaddr_cmp(const struct sockaddr *sa1,
const struct sockaddr *sa2,
ucs_status_t *status_p)
Expand Down
14 changes: 14 additions & 0 deletions src/ucs/sys/sock.h
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,20 @@ const char* ucs_sockaddr_str(const struct sockaddr *sock_addr,
char *str, size_t max_size);


/**
 * Convert a textual IP address into a sockaddr storage.
 *
 * The string is tried as an IPv4 address first and as an IPv6 address second.
 * Only the address family and the IP address are written to @a sa_storage;
 * the port is not set by this function.
 *
 * @param [in]  ip_str      A string to take IP address from.
 * @param [out] sa_storage  sockaddr storage filled with the IP address and
 *                          address family.
 *
 * @return UCS_OK if @a ip_str has a valid IP address or UCS_ERR_INVALID_ADDR
 *         otherwise (an error is logged in that case).
 */
ucs_status_t ucs_sock_ipstr_to_sockaddr(const char *ip_str,
struct sockaddr_storage *sa_storage);


/**
* Check if the address family of the given sockaddr is IPv4 or IPv6
*
Expand Down
48 changes: 48 additions & 0 deletions src/uct/ib/rdmacm/rdmacm_cm.c
Original file line number Diff line number Diff line change
Expand Up @@ -621,9 +621,46 @@ static uct_iface_ops_t uct_rdmacm_cm_iface_ops = {
.iface_is_reachable = (uct_iface_is_reachable_func_t)ucs_empty_function_return_zero
};

/**
 * Turn an IP-address configuration string into a heap-allocated sockaddr.
 *
 * An empty string means "not configured": no memory is allocated and
 * @a *saddr_p is set to NULL. Otherwise a zero-initialized sockaddr_storage is
 * allocated, filled from @a ip_str and handed to the caller through
 * @a saddr_p; the caller releases it with ucs_free().
 *
 * @param [in]  ip_str      IP address string, possibly empty.
 * @param [out] saddr_p     Set to the new address, or to NULL for an empty
 *                          string. Left untouched on failure.
 * @param [in]  debug_name  Allocation name passed to ucs_calloc().
 *
 * @return UCS_OK on success, UCS_ERR_NO_MEMORY if the allocation failed, or
 *         the status returned by ucs_sock_ipstr_to_sockaddr() if parsing
 *         failed.
 */
static ucs_status_t
uct_rdmacm_cm_ipstr_to_sockaddr(const char *ip_str, struct sockaddr **saddr_p,
                                const char *debug_name)
{
    struct sockaddr_storage *storage = NULL;
    ucs_status_t parse_status;

    /* A non-empty parameter gets its own zeroed storage; an empty one is
     * reported to the caller as a NULL pointer */
    if (ip_str[0] != '\0') {
        storage = ucs_calloc(1, sizeof(struct sockaddr_storage), debug_name);
        if (storage == NULL) {
            ucs_error("cannot allocate memory for rdmacm source address");
            return UCS_ERR_NO_MEMORY;
        }

        parse_status = ucs_sock_ipstr_to_sockaddr(ip_str, storage);
        if (parse_status != UCS_OK) {
            /* do not leak the storage when the string is not a valid IP */
            ucs_free(storage);
            return parse_status;
        }
    }

    *saddr_p = (struct sockaddr*)storage;
    return UCS_OK;
}

UCS_CLASS_INIT_FUNC(uct_rdmacm_cm_t, uct_component_h component,
uct_worker_h worker, const uct_cm_config_t *config)
{
const uct_rdmacm_cm_config_t *rdmacm_config = ucs_derived_of(config,
uct_rdmacm_cm_config_t);
uct_priv_worker_t *worker_priv;
ucs_status_t status;

Expand Down Expand Up @@ -657,11 +694,20 @@ UCS_CLASS_INIT_FUNC(uct_rdmacm_cm_t, uct_component_h component,
goto err_destroy_ev_ch;
}

status = uct_rdmacm_cm_ipstr_to_sockaddr(rdmacm_config->src_addr,
&self->config.src_addr,
"rdmacm_src_addr");
if (status != UCS_OK) {
goto ucs_async_remove_handler;
}

ucs_debug("created rdmacm_cm %p with event_channel %p (fd=%d)",
self, self->ev_ch, self->ev_ch->fd);

return UCS_OK;

ucs_async_remove_handler:
ucs_async_remove_handler(self->ev_ch->fd, 1);
err_destroy_ev_ch:
rdma_destroy_event_channel(self->ev_ch);
err:
Expand All @@ -672,6 +718,8 @@ UCS_CLASS_CLEANUP_FUNC(uct_rdmacm_cm_t)
{
ucs_status_t status;

ucs_free(self->config.src_addr);

status = ucs_async_remove_handler(self->ev_ch->fd, 1);
if (status != UCS_OK) {
ucs_warn("failed to remove event handler for fd %d: %s",
Expand Down
10 changes: 10 additions & 0 deletions src/uct/ib/rdmacm/rdmacm_cm.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,19 @@ typedef struct uct_rdmacm_cm {
uct_cm_t super;
struct rdma_event_channel *ev_ch;
khash_t(uct_rdmacm_cm_cqs) cqs;

struct {
struct sockaddr *src_addr;
} config;
} uct_rdmacm_cm_t;


/**
 * RDMA-CM connection manager configuration: the common CM configuration
 * extended with the RDMA-CM specific source address setting.
 */
typedef struct uct_rdmacm_cm_config {
/* Common connection manager configuration */
uct_cm_config_t super;
/* Value of the SOURCE_ADDRESS setting: local IPv4/IPv6 address in textual
 * form to use for client connections; an empty string means it is not set */
char *src_addr;
} uct_rdmacm_cm_config_t;


UCS_CLASS_DECLARE_NEW_FUNC(uct_rdmacm_cm_t, uct_cm_t, uct_component_h,
uct_worker_h, const uct_cm_config_t *);
UCS_CLASS_DECLARE_DELETE_FUNC(uct_rdmacm_cm_t, uct_cm_t);
Expand Down
8 changes: 5 additions & 3 deletions src/uct/ib/rdmacm/rdmacm_cm_ep.c
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,8 @@ ucs_status_t uct_rdmacm_cm_ep_pack_cb(uct_rdmacm_cm_ep_t *cep,
static ucs_status_t uct_rdamcm_cm_ep_client_init(uct_rdmacm_cm_ep_t *cep,
const uct_ep_params_t *params)
{
uct_rdmacm_cm_t *rdmacm_cm = uct_rdmacm_cm_ep_get_cm(cep);
uct_cm_base_ep_t *cm_ep = &cep->super;
uct_rdmacm_cm_t *rdmacm_cm = uct_rdmacm_cm_ep_get_cm(cep);
char ip_port_str[UCS_SOCKADDR_STRING_LEN];
char ep_str[UCT_RDMACM_EP_STRING_LEN];
ucs_status_t status;
Expand Down Expand Up @@ -284,8 +284,10 @@ static ucs_status_t uct_rdamcm_cm_ep_client_init(uct_rdmacm_cm_ep_t *cep,
* thread. Therefore, all ep fields have to be initialized before this
* function is called. */
ucs_trace("%s: rdma_resolve_addr on cm_id %p",
uct_rdmacm_cm_ep_str(cep, ep_str, UCT_RDMACM_EP_STRING_LEN), cep->id);
if (rdma_resolve_addr(cep->id, NULL, (struct sockaddr*)params->sockaddr->addr,
uct_rdmacm_cm_ep_str(cep, ep_str, UCT_RDMACM_EP_STRING_LEN),
cep->id);
if (rdma_resolve_addr(cep->id, rdmacm_cm->config.src_addr,
(struct sockaddr*)params->sockaddr->addr,
1000/* TODO */)) {
ucs_error("rdma_resolve_addr() to dst addr %s failed: %m",
ucs_sockaddr_str((struct sockaddr*)params->sockaddr->addr,
Expand Down
18 changes: 15 additions & 3 deletions src/uct/ib/rdmacm/rdmacm_md.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,23 @@ static ucs_config_field_t uct_rdmacm_md_config_table[] = {

{"ADDR_RESOLVE_TIMEOUT", "500ms",
"Time to wait for address resolution to complete",
ucs_offsetof(uct_rdmacm_md_config_t, addr_resolve_timeout), UCS_CONFIG_TYPE_TIME},
ucs_offsetof(uct_rdmacm_md_config_t, addr_resolve_timeout), UCS_CONFIG_TYPE_TIME},

{NULL}
};

/*
 * Configuration table of the RDMA-CM connection manager. It embeds the common
 * CM configuration table and adds the SOURCE_ADDRESS string option, which is
 * stored in uct_rdmacm_cm_config_t::src_addr.
 */
static ucs_config_field_t uct_rdmacm_cm_config_table[] = {
/* Inherit all fields of the common CM configuration */
{"", "", NULL,
ucs_offsetof(uct_rdmacm_cm_config_t, super), UCS_CONFIG_TYPE_TABLE(uct_cm_config_table)},

/* Optional local address for client connections; empty by default */
{"SOURCE_ADDRESS", "",
"If non-empty, specify the local source address (IPv4 or IPv6) to use \n"
"when creating a client connection",
ucs_offsetof(uct_rdmacm_cm_config_t, src_addr), UCS_CONFIG_TYPE_STRING},

/* Table terminator */
{NULL}
};

static void uct_rdmacm_md_close(uct_md_h md);

static uct_md_ops_t uct_rdmacm_md_ops = {
Expand Down Expand Up @@ -253,8 +265,8 @@ uct_component_t uct_rdmacm_component = {
.cm_config = {
.name = "RDMA-CM connection manager",
.prefix = "RDMA_CM_",
.table = uct_cm_config_table,
.size = sizeof(uct_cm_config_t),
.table = uct_rdmacm_cm_config_table,
.size = sizeof(uct_rdmacm_cm_config_t),
},
.tl_list = UCT_COMPONENT_TL_LIST_INITIALIZER(&uct_rdmacm_component),
#if HAVE_RDMACM_QP_LESS
Expand Down
12 changes: 12 additions & 0 deletions test/gtest/common/test_helpers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,18 @@ const struct sockaddr* sock_addr_storage::get_sock_addr_ptr() const {
return m_is_valid ? (struct sockaddr *)(&m_storage) : NULL;
}

/**
 * Return a pointer to the raw IP address bytes held by this storage:
 * the sin_addr field for AF_INET or the sin6_addr field for AF_INET6.
 * Asserts that the storage holds a valid IPv4 or IPv6 address.
 */
const void* sock_addr_storage::get_sock_addr_in_buf() const {
    const struct sockaddr* saddr = get_sock_addr_ptr();

    ucs_assert_always(saddr != NULL);
    ucs_assert_always((saddr->sa_family == AF_INET) ||
                      (saddr->sa_family == AF_INET6));

    if (saddr->sa_family == AF_INET) {
        return &reinterpret_cast<const struct sockaddr_in*>(saddr)->sin_addr;
    }

    /* the assertion above leaves AF_INET6 as the only other option */
    return &reinterpret_cast<const struct sockaddr_in6*>(saddr)->sin6_addr;
}

std::ostream& operator<<(std::ostream& os, const sock_addr_storage& sa_storage)
{
return os << ucs::sockaddr_to_str(sa_storage.get_sock_addr_ptr());
Expand Down
2 changes: 2 additions & 0 deletions test/gtest/common/test_helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,8 @@ class sock_addr_storage {

const struct sockaddr* get_sock_addr_ptr() const;

const void* get_sock_addr_in_buf() const;

private:
struct sockaddr_storage m_storage;
size_t m_size;
Expand Down
18 changes: 12 additions & 6 deletions test/gtest/ucs/test_sock.cc
Original file line number Diff line number Diff line change
Expand Up @@ -154,13 +154,14 @@ UCS_TEST_F(test_socket, sockaddr_get_inet_addr) {
}
}

UCS_TEST_F(test_socket, sockaddr_str) {
const uint16_t port = 65534;
const char *ipv4_addr = "192.168.122.157";
const char *ipv6_addr = "fe80::218:e7ff:fe16:fb97";
UCS_TEST_F(test_socket, str_sockaddr_str) {
const uint16_t port = 65534;
const char *ipv4_addr = "192.168.122.157";
const char *ipv6_addr = "fe80::218:e7ff:fe16:fb97";
struct sockaddr_in sa_in;
struct sockaddr_in6 sa_in6;
char ipv4_addr_out[128], ipv6_addr_out[128], *str, test_str[1024];
ucs_status_t status;

sa_in.sin_family = AF_INET;
sa_in.sin_port = htons(port);
Expand All @@ -170,8 +171,13 @@ UCS_TEST_F(test_socket, sockaddr_str) {
sprintf(ipv4_addr_out, "%s:%d", ipv4_addr, port);
sprintf(ipv6_addr_out, "%s:%d", ipv6_addr, port);

inet_pton(AF_INET, ipv4_addr, &(sa_in.sin_addr));
inet_pton(AF_INET6, ipv6_addr, &(sa_in6.sin6_addr));
status = ucs_sock_ipstr_to_sockaddr(ipv4_addr,
(struct sockaddr_storage *)&sa_in);
ASSERT_EQ(UCS_OK, status);

status = ucs_sock_ipstr_to_sockaddr(ipv6_addr,
(struct sockaddr_storage *)&sa_in6);
ASSERT_EQ(UCS_OK, status);

/* Check with short `str_len` to fit IP address only */
{
Expand Down
34 changes: 30 additions & 4 deletions test/gtest/uct/ib/test_sockaddr.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,9 @@ class test_uct_sockaddr : public uct_test {
/* initiate the client's private data callback argument */
client->max_conn_priv = server->iface_attr().max_conn_priv;

UCS_TEST_MESSAGE << "Testing " << m_listen_addr
<< " Interface: " << GetParam()->dev_name;
UCS_TEST_MESSAGE << "Testing " << GetParam()->component_name
<< " on " << m_listen_addr
<< " interface: " << GetParam()->dev_name;
}

size_t iface_priv_data_do_pack(void *priv_data)
Expand Down Expand Up @@ -391,12 +392,33 @@ class test_uct_cm_sockaddr : public uct_test {
}

void init() {
struct {
bool is_set;
char cstr[UCS_SOCKADDR_STRING_LEN];
} src_addr = {
.is_set = false,
.cstr = {0}
};

uct_test::init();

/* This address is accessible, as it was tested at the resource creation */
m_listen_addr = GetParam()->listen_sock_addr;
m_connect_addr = GetParam()->connect_sock_addr;

const ucs::sock_addr_storage &src_sock_addr =
GetParam()->source_sock_addr;
if (src_sock_addr.get_sock_addr_ptr() != NULL) {
int sa_family = src_sock_addr.get_sock_addr_ptr()->sa_family;
const char *ret = inet_ntop(sa_family,
src_sock_addr.get_sock_addr_in_buf(),
src_addr.cstr, UCS_SOCKADDR_STRING_LEN);
EXPECT_EQ(src_addr.cstr, ret);
set_config((std::string("RDMA_CM_SOURCE_ADDRESS?=") +
src_addr.cstr).c_str());
src_addr.is_set = true;
}

uint16_t port = ucs::get_port();
m_listen_addr.set_port(port);
m_connect_addr.set_port(port);
Expand All @@ -418,8 +440,12 @@ class test_uct_cm_sockaddr : public uct_test {
m_long_priv_data.resize(m_long_priv_data_len);
ucs::fill_random(m_long_priv_data);

UCS_TEST_MESSAGE << "Testing " << m_listen_addr
<< " Interface: " << GetParam()->dev_name;
UCS_TEST_MESSAGE << "Testing " << GetParam()->component_name << " on "
<< m_listen_addr << " interface "
<< GetParam()->dev_name
<< (src_addr.is_set ?
(std::string(" with RDMA_CM_SOURCE_ADDRESS=") +
src_addr.cstr) : "");
}

protected:
Expand Down
4 changes: 2 additions & 2 deletions test/gtest/uct/test_mm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ class test_uct_mm : public uct_test {
std::string shm_dir;

mm_resource(const resource& res, const std::string& shm_dir = "") :
resource(res.component, res.md_name, res.local_cpus, res.tl_name,
res.dev_name, res.dev_type),
resource(res.component, res.component_name, res.md_name,
res.local_cpus, res.tl_name, res.dev_name, res.dev_type),
shm_dir(shm_dir)
{
}
Expand Down
7 changes: 4 additions & 3 deletions test/gtest/uct/uct_p2p_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ class uct_p2p_test : public uct_test {
bool loopback;

p2p_resource(const resource& res) :
resource(res.component, res.md_name, res.local_cpus,
res.tl_name, res.dev_name, res.dev_type),
loopback(false) { }
resource(res.component, res.component_name, res.md_name,
res.local_cpus, res.tl_name, res.dev_name,
res.dev_type), loopback(false) {
}
};

virtual void test_xfer(send_func_t send, size_t length, unsigned flags,
Expand Down
Loading

0 comments on commit 9788cb3

Please sign in to comment.