Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix on osc/mt_v4 #6

Merged
merged 3 commits into from
Dec 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions ompi/mca/osc/ucx/osc_ucx.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,4 @@ int ompi_osc_find_attached_region_position(ompi_osc_dynamic_win_info_t *dynamic_
int min_index, int max_index,
uint64_t base, size_t len, int *insert);

void req_completion(void *request, ucs_status_t status);
void internal_req_init(void *request);

#endif /* OMPI_OSC_UCX_H */
30 changes: 8 additions & 22 deletions ompi/mca/osc/ucx/osc_ucx_comm.c
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,6 @@ int ompi_osc_ucx_rput(const void *origin_addr, int origin_count,
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
uint64_t remote_addr = (module->addrs[target]) + target_disp * OSC_UCX_GET_DISP(module, target);
ompi_osc_ucx_request_t *ucx_req = NULL;
ompi_osc_ucx_internal_request_t *internal_req = NULL;
int ret = OMPI_SUCCESS;

ret = check_sync_state(module, target, true);
Expand Down Expand Up @@ -848,21 +847,15 @@ int ompi_osc_ucx_rput(const void *origin_addr, int origin_count,
return OMPI_ERROR;
}

mca_osc_ucx_component.num_incomplete_req_ops++;
ret = opal_common_ucx_wpmem_fetch_nb(module->mem, UCP_ATOMIC_FETCH_OP_FADD,
0, target, &(module->req_result),
sizeof(uint64_t), remote_addr,
(ucs_status_ptr_t *)&internal_req);
0, target, &(module->req_result),
sizeof(uint64_t), remote_addr,
req_completion, ucx_req);
if (ret != OMPI_SUCCESS) {
return ret;
}

if (UCS_PTR_IS_PTR(internal_req)) {
internal_req->external_req = ucx_req;
mca_osc_ucx_component.num_incomplete_req_ops++;
} else {
ompi_request_complete(&ucx_req->super, true);
}

*request = &ucx_req->super;

return incr_and_check_ops_num(module, target);
Expand All @@ -876,7 +869,6 @@ int ompi_osc_ucx_rget(void *origin_addr, int origin_count,
ompi_osc_ucx_module_t *module = (ompi_osc_ucx_module_t*) win->w_osc_module;
uint64_t remote_addr = (module->addrs[target]) + target_disp * OSC_UCX_GET_DISP(module, target);
ompi_osc_ucx_request_t *ucx_req = NULL;
ompi_osc_ucx_internal_request_t *internal_req = NULL;
int ret = OMPI_SUCCESS;

ret = check_sync_state(module, target, true);
Expand Down Expand Up @@ -906,21 +898,15 @@ int ompi_osc_ucx_rget(void *origin_addr, int origin_count,
return OMPI_ERROR;
}

mca_osc_ucx_component.num_incomplete_req_ops++;
ret = opal_common_ucx_wpmem_fetch_nb(module->mem, UCP_ATOMIC_FETCH_OP_FADD,
0, target, &(module->req_result),
sizeof(uint64_t), remote_addr,
(ucs_status_ptr_t *)&internal_req);
0, target, &(module->req_result),
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

increment the counters before calling this. Otherwise may go negative.

sizeof(uint64_t), remote_addr,
req_completion, ucx_req);
if (ret != OMPI_SUCCESS) {
return ret;
}

if (UCS_PTR_IS_PTR(internal_req)) {
internal_req->external_req = ucx_req;
mca_osc_ucx_component.num_incomplete_req_ops++;
} else {
ompi_request_complete(&ucx_req->super, true);
}

*request = &ucx_req->super;

return incr_and_check_ops_num(module, target);
Expand Down
2 changes: 0 additions & 2 deletions ompi/mca/osc/ucx/osc_ucx_component.c
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,6 @@ static int component_select(struct ompi_win_t *win, void **base, size_t size, in

ret = opal_common_ucx_wpool_init(mca_osc_ucx_component.wpool,
ompi_proc_world_size(),
internal_req_init,
sizeof(ompi_osc_ucx_internal_request_t),
mca_osc_ucx_component.enable_mpi_threads);
if (OMPI_SUCCESS != ret) {
OSC_UCX_VERBOSE(1, "opal_common_ucx_wpool_init failed: %d", ret);
Expand Down
19 changes: 5 additions & 14 deletions ompi/mca/osc/ucx/osc_ucx_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,11 @@ static void request_construct(ompi_osc_ucx_request_t *request)
request->super.req_cancel = request_cancel;
}

void internal_req_init(void *request) {
ompi_osc_ucx_internal_request_t *req = (ompi_osc_ucx_internal_request_t *)request;
req->external_req = NULL;
}

void req_completion(void *request, ucs_status_t status) {
ompi_osc_ucx_internal_request_t *req = (ompi_osc_ucx_internal_request_t *)request;

if(req->external_req != NULL) {
ompi_request_complete(&(req->external_req->super), true);
ucp_request_release(req);
mca_osc_ucx_component.num_incomplete_req_ops--;
assert(mca_osc_ucx_component.num_incomplete_req_ops >= 0);
}
void req_completion(void *request) {
ompi_osc_ucx_request_t *req = (ompi_osc_ucx_request_t *)request;
ompi_request_complete(&(req->super), true);
mca_osc_ucx_component.num_incomplete_req_ops--;
assert(mca_osc_ucx_component.num_incomplete_req_ops >= 0);
}

OBJ_CLASS_INSTANCE(ompi_osc_ucx_request_t, ompi_request_t,
Expand Down
6 changes: 2 additions & 4 deletions ompi/mca/osc/ucx/osc_ucx_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ typedef struct ompi_osc_ucx_request {

OBJ_CLASS_DECLARATION(ompi_osc_ucx_request_t);

typedef struct ompi_osc_ucx_internal_request {
ompi_osc_ucx_request_t *external_req;
} ompi_osc_ucx_internal_request_t;

#define OMPI_OSC_UCX_REQUEST_ALLOC(win, req) \
do { \
opal_free_list_item_t *item; \
Expand All @@ -52,4 +48,6 @@ typedef struct ompi_osc_ucx_internal_request {
(opal_free_list_item_t*) req); \
} while (0)

void req_completion(void *request);

#endif /* OMPI_OSC_UCX_REQUEST_H */
6 changes: 4 additions & 2 deletions opal/mca/common/ucx/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ headers = \
common_ucx.h \
common_ucx_int.h \
common_ucx_wpool.h \
common_ucx_wpool_int.h
common_ucx_wpool_int.h \
common_ucx_request.h

# Source files

sources = \
common_ucx.c \
common_ucx_wpool.c
common_ucx_wpool.c \
common_ucx_request.c

# Help file

Expand Down
1 change: 1 addition & 0 deletions opal/mca/common/ucx/common_ucx.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@

#include "common_ucx_int.h"
#include "common_ucx_wpool.h"
#include "common_ucx_request.h"

#endif
4 changes: 3 additions & 1 deletion opal/mca/common/ucx/common_ucx_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#define COMMON_UCX_INT_H

#include "opal_config.h"
#include "common_ucx_request.h"

#include <stdint.h>

Expand Down Expand Up @@ -170,10 +171,11 @@ static inline
ucs_status_ptr_t opal_common_ucx_atomic_fetch_nb(ucp_ep_h ep, ucp_atomic_fetch_op_t opcode,
uint64_t value, void *result, size_t op_size,
uint64_t remote_addr, ucp_rkey_h rkey,
ucp_send_callback_t req_handler,
ucp_worker_h worker)
{
return ucp_atomic_fetch_nb(ep, opcode, value, result, op_size,
remote_addr, rkey, opal_common_ucx_empty_complete_cb);
remote_addr, rkey, req_handler);
}

static inline
Expand Down
17 changes: 17 additions & 0 deletions opal/mca/common/ucx/common_ucx_request.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#include "common_ucx_request.h"

OPAL_DECLSPEC void
opal_common_ucx_req_init(void *request) {
opal_common_ucx_request_t *req = (opal_common_ucx_request_t *)request;
req->ext_req = NULL;
req->ext_cb = NULL;
}

OPAL_DECLSPEC void
opal_common_ucx_req_completion(void *request, ucs_status_t status) {
opal_common_ucx_request_t *req = (opal_common_ucx_request_t *)request;
if (req->ext_cb != NULL) {
(*req->ext_cb)(req->ext_req);
}
ucp_request_release(req);
}
17 changes: 17 additions & 0 deletions opal/mca/common/ucx/common_ucx_request.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#ifndef COMMON_UCX_REQUEST_H
#define COMMON_UCX_REQUEST_H

#include "opal_config.h"
#include <ucp/api/ucp.h>

typedef void (*opal_common_ucx_user_req_handler_t)(void *request);

typedef struct {
void *ext_req;
opal_common_ucx_user_req_handler_t ext_cb;
} opal_common_ucx_request_t;

OPAL_DECLSPEC void opal_common_ucx_req_init(void *request);
OPAL_DECLSPEC void opal_common_ucx_req_completion(void *request, ucs_status_t status);

#endif // COMMON_UCX_REQUEST_H
53 changes: 24 additions & 29 deletions opal/mca/common/ucx/common_ucx_wpool.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,7 @@ opal_common_ucx_wpool_free(opal_common_ucx_wpool_t *wpool)

OPAL_DECLSPEC int
opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
int proc_world_size,
ucp_request_init_callback_t req_init_ptr,
size_t req_size, bool enable_mt)
int proc_world_size, bool enable_mt)
{
ucp_config_t *config = NULL;
ucp_params_t context_params;
Expand Down Expand Up @@ -164,8 +162,8 @@ opal_common_ucx_wpool_init(opal_common_ucx_wpool_t *wpool,
UCP_FEATURE_AMO64;
context_params.mt_workers_shared = (enable_mt ? 1 : 0);
context_params.estimated_num_eps = proc_world_size;
context_params.request_init = req_init_ptr;
context_params.request_size = req_size;
context_params.request_init = opal_common_ucx_req_init;
context_params.request_size = sizeof(opal_common_ucx_request_t);

status = ucp_init(&context_params, config, &wpool->ucp_ctx);
ucp_config_release(config);
Expand Down Expand Up @@ -817,20 +815,19 @@ static void _common_ucx_tls_cleanup(_tlocal_table_t *tls)
// Cleanup memory table
size = tls->mem_tbl_size;
for (i = 0; i < size; i++) {
if (NULL == tls->mem_tbl[i]->gmem){
continue;
if (NULL != tls->mem_tbl[i]->gmem){
_tlocal_mem_record_cleanup(tls->mem_tbl[i]);
}
_tlocal_mem_record_cleanup(tls->mem_tbl[i]);

free(tls->mem_tbl[i]);
}

// Cleanup ctx table
size = tls->ctx_tbl_size;
for (i = 0; i < size; i++) {
if (NULL == tls->ctx_tbl[i]->gctx){
continue;
if (NULL != tls->ctx_tbl[i]->gctx){
_tlocal_ctx_record_cleanup(tls->ctx_tbl[i]);
}
_tlocal_ctx_record_cleanup(tls->ctx_tbl[i]);
free(tls->ctx_tbl[i]);
}

Expand Down Expand Up @@ -918,7 +915,7 @@ static _tlocal_ctx_t *
_tlocal_add_ctx(_tlocal_table_t *tls, opal_common_ucx_ctx_t *ctx)
{
size_t i, free_idx = -1;
int rc;
int rc, found = 0;

/* Try to find available record in the TLS table
* In parallel perform deferred cleanups */
Expand All @@ -929,14 +926,15 @@ _tlocal_add_ctx(_tlocal_table_t *tls, opal_common_ucx_ctx_t *ctx)
_tlocal_ctx_record_cleanup(tls->ctx_tbl[i]);
}
}
if ((NULL != tls->ctx_tbl[i]->gctx) && (0 > free_idx)) {
if ((NULL == tls->ctx_tbl[i]->gctx) && !found) {
/* Found clean record */
free_idx = i;
found = 1;
}
}

/* if needed - extend the table */
if (0 > free_idx) {
if (!found) {
free_idx = tls->ctx_tbl_size;
rc = _tlocal_tls_ctxtbl_extend(tls, 4);
if (rc) {
Expand Down Expand Up @@ -1025,15 +1023,6 @@ _tlocal_mem_record_cleanup(_tlocal_mem_t *mem_rec)
size_t i;
WPOOL_DBG_OUT(_dbg_tls || _dbg_mem, "record=%p, is_freed = %d\n",
(void *)mem_rec, mem_rec->gmem->released);
if (mem_rec->gmem->released) {
return;
}
/* Remove myself from the memory context structure
* This may result in context release as we are using
* delayed cleanup */
_common_ucx_mem_signout(mem_rec->gmem);
WPOOL_DBG_OUT(_dbg_tls || _dbg_mem, "gmem = %p mem_rec = %p\n",
(void *)mem_rec->gmem, (void *)mem_rec);

for(i = 0; i < mem_rec->gmem->ctx->comm_size; i++) {
if (mem_rec->mem->rkeys[i]) {
Expand All @@ -1044,6 +1033,13 @@ _tlocal_mem_record_cleanup(_tlocal_mem_t *mem_rec)
}
free(mem_rec->mem->rkeys);

/* Remove myself from the memory context structure
* This may result in context release as we are using
* delayed cleanup */
_common_ucx_mem_signout(mem_rec->gmem);
WPOOL_DBG_OUT(_dbg_tls || _dbg_mem, "gmem = %p mem_rec = %p\n",
(void *)mem_rec->gmem, (void *)mem_rec);

/* Release fast-path pointers */
if (NULL != mem_rec->mem_tls_ptr) {
free(mem_rec->mem_tls_ptr);
Expand All @@ -1059,24 +1055,24 @@ static _tlocal_mem_t *_tlocal_add_mem(_tlocal_table_t *tls,
{
size_t i, free_idx = -1;
_tlocal_ctx_t *ctx_rec = NULL;
int rc = OPAL_SUCCESS;
int rc = OPAL_SUCCESS, found = 0;

/* Try to find available spot in the table */
for (i=0; i<tls->mem_tbl_size; i++) {
if (NULL == tls->mem_tbl[i]->gmem) {
if (NULL != tls->mem_tbl[i]->gmem) {
if (tls->mem_tbl[i]->gmem->released) {
/* Found a dirty record. Need to clean it first */
_tlocal_mem_record_cleanup(tls->mem_tbl[i]);
break;
}
}
if ((NULL == tls->mem_tbl[i]->gmem) && (0 > free_idx)) {
if ((NULL == tls->mem_tbl[i]->gmem) && !found) {
/* Found a clear record */
free_idx = i;
found = 1;
}
}

if (0 > free_idx){
if (!found){
free_idx = tls->mem_tbl_size;
rc = _tlocal_tls_memtbl_extend(tls, 4);
if (rc != OPAL_SUCCESS) {
Expand Down Expand Up @@ -1274,4 +1270,3 @@ opal_common_ucx_wpmem_fence(opal_common_ucx_wpmem_t *mem) {
/* TODO */
return OPAL_SUCCESS;
}

Loading