Skip to content

Commit

Permalink
ompi/osc/ucx: move osc ucx internal req to common ucx directory.
Browse files Browse the repository at this point in the history
Signed-off-by: Xin Zhao <[email protected]>
  • Loading branch information
xinzhao3 committed Dec 1, 2018
1 parent 05d06a6 commit ac21207
Show file tree
Hide file tree
Showing 12 changed files with 83 additions and 65 deletions.
3 changes: 0 additions & 3 deletions ompi/mca/osc/ucx/osc_ucx.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,4 @@ int ompi_osc_find_attached_region_position(ompi_osc_dynamic_win_info_t *dynamic_
int min_index, int max_index,
uint64_t base, size_t len, int *insert);

void req_completion(void *request, ucs_status_t status);
void internal_req_init(void *request);

#endif /* OMPI_OSC_UCX_H */
30 changes: 8 additions & 22 deletions ompi/mca/osc/ucx/osc_ucx_comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,6 @@ int ompi_osc_ucx_rput(const void *origin_addr, int origin_count,
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
uint64_t remote_addr = (module->addrs[target]) + target_disp * OSC_UCX_GET_DISP(module, target);
ompi_osc_ucx_request_t *ucx_req = NULL;
ompi_osc_ucx_internal_request_t *internal_req = NULL;
int ret = OMPI_SUCCESS;

ret = check_sync_state(module, target, true);
Expand Down Expand Up @@ -848,21 +847,15 @@ int ompi_osc_ucx_rput(const void *origin_addr, int origin_count,
return OMPI_ERROR;
}

mca_osc_ucx_component.num_incomplete_req_ops++;
ret = opal_common_ucx_wpmem_fetch_nb(module->mem, UCP_ATOMIC_FETCH_OP_FADD,
0, target, &(module->req_result),
sizeof(uint64_t), remote_addr,
(ucs_status_ptr_t *)&internal_req);
0, target, &(module->req_result),
sizeof(uint64_t), remote_addr,
req_completion, ucx_req);
if (ret != OMPI_SUCCESS) {
return ret;
}

if (UCS_PTR_IS_PTR(internal_req)) {
internal_req->external_req = ucx_req;
mca_osc_ucx_component.num_incomplete_req_ops++;
} else {
ompi_request_complete(&ucx_req->super, true);
}

*request = &ucx_req->super;

return incr_and_check_ops_num(module, target);
Expand All @@ -876,7 +869,6 @@ int ompi_osc_ucx_rget(void *origin_addr, int origin_count,
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
uint64_t remote_addr = (module->addrs[target]) + target_disp * OSC_UCX_GET_DISP(module, target);
ompi_osc_ucx_request_t *ucx_req = NULL;
ompi_osc_ucx_internal_request_t *internal_req = NULL;
int ret = OMPI_SUCCESS;

ret = check_sync_state(module, target, true);
Expand Down Expand Up @@ -906,21 +898,15 @@ int ompi_osc_ucx_rget(void *origin_addr, int origin_count,
return OMPI_ERROR;
}

mca_osc_ucx_component.num_incomplete_req_ops++;
ret = opal_common_ucx_wpmem_fetch_nb(module->mem, UCP_ATOMIC_FETCH_OP_FADD,
0, target, &(module->req_result),
sizeof(uint64_t), remote_addr,
(ucs_status_ptr_t *)&internal_req);
0, target, &(module->req_result),
sizeof(uint64_t), remote_addr,
req_completion, ucx_req);
if (ret != OMPI_SUCCESS) {
return ret;
}

if (UCS_PTR_IS_PTR(internal_req)) {
internal_req->external_req = ucx_req;
mca_osc_ucx_component.num_incomplete_req_ops++;
} else {
ompi_request_complete(&ucx_req->super, true);
}

*request = &ucx_req->super;

return incr_and_check_ops_num(module, target);
Expand Down
2 changes: 0 additions & 2 deletions ompi/mca/osc/ucx/osc_ucx_component.c
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,6 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in

ret = opal_common_ucx_wpool_init(mca_osc_ucx_component.wpool,
ompi_proc_world_size(),
internal_req_init,
sizeof(ompi_osc_ucx_internal_request_t),
mca_osc_ucx_component.enable_mpi_threads);
if (OMPI_SUCCESS != ret) {
OSC_UCX_VERBOSE(1, "opal_common_ucx_wpool_init failed: %d", ret);
Expand Down
19 changes: 5 additions & 14 deletions ompi/mca/osc/ucx/osc_ucx_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,11 @@ static void request_construct(ompi_osc_ucx_request_t *request)
request->super.req_cancel = request_cancel;
}

void internal_req_init(void *request) {
ompi_osc_ucx_internal_request_t *req = (ompi_osc_ucx_internal_request_t *)request;
req->external_req = NULL;
}

void req_completion(void *request, ucs_status_t status) {
ompi_osc_ucx_internal_request_t *req = (ompi_osc_ucx_internal_request_t *)request;

if(req->external_req != NULL) {
ompi_request_complete(&(req->external_req->super), true);
ucp_request_release(req);
mca_osc_ucx_component.num_incomplete_req_ops--;
assert(mca_osc_ucx_component.num_incomplete_req_ops >= 0);
}
void req_completion(void *request) {
ompi_osc_ucx_request_t *req = (ompi_osc_ucx_request_t *)request;
ompi_request_complete(&(req->super), true);
mca_osc_ucx_component.num_incomplete_req_ops--;
assert(mca_osc_ucx_component.num_incomplete_req_ops >= 0);
}

OBJ_CLASS_INSTANCE(ompi_osc_ucx_request_t, ompi_request_t,
Expand Down
6 changes: 2 additions & 4 deletions ompi/mca/osc/ucx/osc_ucx_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ typedef struct ompi_osc_ucx_request {

OBJ_CLASS_DECLARATION(ompi_osc_ucx_request_t);

typedef struct ompi_osc_ucx_internal_request {
ompi_osc_ucx_request_t *external_req;
} ompi_osc_ucx_internal_request_t;

#define OMPI_OSC_UCX_REQUEST_ALLOC(win, req) \
do { \
opal_free_list_item_t *item; \
Expand All @@ -52,4 +48,6 @@ typedef struct ompi_osc_ucx_internal_request {
(opal_free_list_item_t*) req); \
} while (0)

void req_completion(void *request);

#endif /* OMPI_OSC_UCX_REQUEST_H */
6 changes: 4 additions & 2 deletions opal/mca/common/ucx/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ headers = \
common_ucx.h \
common_ucx_int.h \
common_ucx_wpool.h \
common_ucx_wpool_int.h
common_ucx_wpool_int.h \
common_ucx_request.h

# Source files

sources = \
common_ucx.c \
common_ucx_wpool.c
common_ucx_wpool.c \
common_ucx_request.c

# Help file

Expand Down
1 change: 1 addition & 0 deletions opal/mca/common/ucx/common_ucx.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@

#include "common_ucx_int.h"
#include "common_ucx_wpool.h"
#include "common_ucx_request.h"

#endif
4 changes: 3 additions & 1 deletion opal/mca/common/ucx/common_ucx_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define COMMON_UCX_INT_H

#include "opal_config.h"
#include "common_ucx_request.h"

#include <stdint.h>

Expand Down Expand Up @@ -170,10 +171,11 @@ static inline
ucs_status_ptr_t opal_common_ucx_atomic_fetch_nb(ucp_ep_h ep, ucp_atomic_fetch_op_t opcode,
uint64_t value, void *result, size_t op_size,
uint64_t remote_addr, ucp_rkey_h rkey,
ucp_send_callback_t req_handler,
ucp_worker_h worker)
{
return ucp_atomic_fetch_nb(ep, opcode, value, result, op_size,
remote_addr, rkey, opal_common_ucx_empty_complete_cb);
remote_addr, rkey, req_handler);
}

static inline
Expand Down
17 changes: 17 additions & 0 deletions opal/mca/common/ucx/common_ucx_request.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#include "common_ucx_request.h"

OPAL_DECLSPEC void
opal_common_ucx_req_init(void *request) {
opal_common_ucx_request_t *req = (opal_common_ucx_request_t *)request;
req->ext_req = NULL;
req->ext_cb = NULL;
}

OPAL_DECLSPEC void
opal_common_ucx_req_completion(void *request, ucs_status_t status) {
opal_common_ucx_request_t *req = (opal_common_ucx_request_t *)request;
if (req->ext_cb != NULL) {
(*req->ext_cb)(req->ext_req);
}
ucp_request_release(req);
}
17 changes: 17 additions & 0 deletions opal/mca/common/ucx/common_ucx_request.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#ifndef COMMON_UCX_REQUEST_H
#define COMMON_UCX_REQUEST_H

#include "opal_config.h"
#include <ucp/api/ucp.h>

typedef void (*opal_common_ucx_user_req_handler_t)(void *request);

typedef struct {
void *ext_req;
opal_common_ucx_user_req_handler_t ext_cb;
} opal_common_ucx_request_t;

OPAL_DECLSPEC void opal_common_ucx_req_init(void *request);
OPAL_DECLSPEC void opal_common_ucx_req_completion(void *request, ucs_status_t status);

#endif // COMMON_UCX_REQUEST_H
9 changes: 3 additions & 6 deletions opal/mca/common/ucx/common_ucx_wpool.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,7 @@ opal_common_ucx_wpool_free(opal_common_ucx_wpool_t *wpool)

OPAL_DECLSPEC int
opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
int proc_world_size,
ucp_request_init_callback_t req_init_ptr,
size_t req_size, bool enable_mt)
int proc_world_size, bool enable_mt)
{
ucp_config_t *config = NULL;
ucp_params_t context_params;
Expand Down Expand Up @@ -164,8 +162,8 @@ opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
UCP_FEATURE_AMO64;
context_params.mt_workers_shared = (enable_mt ? 1 : 0);
context_params.estimated_num_eps = proc_world_size;
context_params.request_init = req_init_ptr;
context_params.request_size = req_size;
context_params.request_init = opal_common_ucx_req_init;
context_params.request_size = sizeof(opal_common_ucx_request_t);

status = ucp_init(&context_params, config, &wpool->ucp_ctx);
ucp_config_release(config);
Expand Down Expand Up @@ -1272,4 +1270,3 @@ opal_common_ucx_wpmem_fence(opal_common_ucx_wpmem_t *mem) {
/* TODO */
return OPAL_SUCCESS;
}

34 changes: 23 additions & 11 deletions opal/mca/common/ucx/common_ucx_wpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include "opal_config.h"

#include "common_ucx_int.h"
#include "common_ucx_request.h"
#include <stdint.h>

#include <ucp/api/ucp.h>
Expand Down Expand Up @@ -176,9 +177,7 @@ static inline void opal_common_ucx_wpool_dbg_init(void)
OPAL_DECLSPEC opal_common_ucx_wpool_t * opal_common_ucx_wpool_allocate(void);
OPAL_DECLSPEC void opal_common_ucx_wpool_free(opal_common_ucx_wpool_t *wpool);
OPAL_DECLSPEC int opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
int proc_world_size,
ucp_request_init_callback_t req_init_ptr,
size_t req_size, bool enable_mt);
int proc_world_size, bool enable_mt);
OPAL_DECLSPEC void opal_common_ucx_wpool_finalize(opal_common_ucx_wpool_t *wpool);
OPAL_DECLSPEC void opal_common_ucx_wpool_progress(opal_common_ucx_wpool_t *wpool);

Expand Down Expand Up @@ -394,27 +393,40 @@ opal_common_ucx_wpmem_fetch(opal_common_ucx_wpmem_t *mem,

static inline int
opal_common_ucx_wpmem_fetch_nb(opal_common_ucx_wpmem_t *mem,
ucp_atomic_fetch_op_t opcode,
uint64_t value,
int target, void *buffer, size_t len,
uint64_t rem_addr, ucs_status_ptr_t *ptr)
ucp_atomic_fetch_op_t opcode,
uint64_t value,
int target, void *buffer, size_t len,
uint64_t rem_addr,
opal_common_ucx_user_req_handler_t user_req_cb,
void *user_req_ptr)
{
ucp_ep_h ep = NULL;
ucp_rkey_h rkey = NULL;
opal_common_ucx_winfo_t *winfo = NULL;
int rc = OPAL_SUCCESS;
opal_common_ucx_request_t *req;

rc = opal_common_ucx_tlocal_fetch(mem, target, &ep, &rkey, &winfo);
if(OPAL_UNLIKELY(OPAL_SUCCESS != rc)){
MCA_COMMON_UCX_ERROR("tlocal_fetch failed: %d", rc);
return rc;
}
/* Perform the operation */
opal_mutex_lock(&winfo->mutex);
(*ptr) = opal_common_ucx_atomic_fetch_nb(ep, opcode, value,
buffer, len,
rem_addr, rkey,
winfo->worker);
req = opal_common_ucx_atomic_fetch_nb(ep, opcode, value, buffer, len,
rem_addr, rkey, opal_common_ucx_req_completion,
winfo->worker);
opal_mutex_unlock(&winfo->mutex);

if (UCS_PTR_IS_PTR(req)) {
req->ext_req = user_req_ptr;
req->ext_cb = user_req_cb;
} else {
if (user_req_cb != NULL) {
(*user_req_cb)(user_req_ptr);
}
}

return rc;
}

Expand Down

0 comments on commit ac21207

Please sign in to comment.