Add the acoll component #12484
Conversation
How does this compare with #10470?
We can discuss it at the meeting. Part of the goal of filing the PR was to give people the ability to have a look at it ahead of the meeting if they want/can.
ompi/mca/coll/acoll/Makefile.am
Do you have plans to add alltoall(v) to acoll?
Yes, we are planning to add alltoall to acoll next.
 * chosen, further decides if [ring|lin] allgather is to be used.
 *
 */
static inline void coll_allgather_decision_fixed(int size, size_t total_dsize, int sg_size,
Can you shed some light on how to choose which method to use for other Intel/AMD architectures? You might also want a utility to let the user adjust the decisions for other systems.
Our testing has mostly focused on Zen architectures; we will test on other architectures soon. We do not yet have a utility/config option to override the decisions; we plan to add one.
ompi/mca/coll/acoll/README
@@ -0,0 +1,15 @@
Copyright (c) 2023-2024 Advanced Micro Devices, Inc. All rights |
Have you thought about what needs to be done to extend this to multiple nodes?
Some of the APIs (like bcast, barrier, allgather) support the multi-node case. However, they are not extensively tested for multi-node; we will test them and extend the other APIs to multi-node as well.
/*
 * rd_allgather_sub
 *
 * Function: Uses recursive doubling based allgather for the group.
Have you compared the performance of other methods besides recursive doubling?
Yes, acoll/allgather chooses among recursive doubling, ring and linear based on process count and message sizes.
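For illustration, a size-based dispatch along those lines might look like the sketch below; the thresholds, enum, and function name are assumptions for this example, not the actual acoll code.

```c
#include <stddef.h>

/* Illustrative sketch only: thresholds and names are hypothetical. */
typedef enum { ALLGATHER_RD, ALLGATHER_RING, ALLGATHER_LIN } allgather_alg_t;

static inline allgather_alg_t choose_allgather_alg(int comm_size, size_t total_dsize)
{
    /* Small messages on power-of-two group sizes favor recursive doubling. */
    if (total_dsize <= 8192 && 0 == (comm_size & (comm_size - 1))) {
        return ALLGATHER_RD;
    }
    /* Large messages amortize the extra steps of a ring. */
    if (total_dsize > 65536) {
        return ALLGATHER_RING;
    }
    /* Everything else falls back to the linear algorithm. */
    return ALLGATHER_LIN;
}
```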
}

/* This barrier is needed to prevent random hangs */
err = ompi_coll_base_barrier_intra_tree(comm, module);
Why is the barrier needed here? It will also add cost to small-message allgather.
It is removed now.
    if (sbuf != MPI_IN_PLACE)
        memcpy(tmp_rbuf, sbuf, my_count_size * dsize);
} else {
    ompi_3buff_op_reduce(op, (char *) data->xpmem_saddr[0] + chunk * rank * dsize,
Is the 3-buffer op reduce function used here to maintain the reduction order?
I think this was a bit faster than copying the chunks first and then reducing later in the following "for" loop.
Please rebase to current main to get rid of the
I tested the PR in AWS CI. I'm seeing assertion errors with
You can try
@amd-nithyavs could you rebase this PR to see if that clears up the mpi4py CI failure?
@hppritcha we did have issues after the rebase. We have fixed them and will update the PR soon. Thanks.
The updated PR (yet to be pushed) will fix this issue. Thanks.
The issue is fixed in the updated PR.
We have updated the PR; it passes the mpi4py tests.
Running AWS CI
@amd-nithyavs I noticed that the PR is currently split into 3 commits. Please squash them before merging.
Passed AWS CI. Note that we don't test with xpmem.
Hello! The Git Commit Checker CI bot found a few problems with this PR: 2f7c5e2: Merge latest of local ompiv5
Please fix these problems and, if necessary, force-push new commits back up to the PR branch. Thanks!
@wenduwan We have rebased to the latest and squashed the commits.
Reviewed a couple of files. Haven't looked into the algorithms themselves yet. Will pick up later.
ompi/mca/coll/acoll/LICENSE.md
@@ -0,0 +1,11 @@
Copyright (C) 2024, Advanced Micro Devices, Inc. All rights reserved. |
In general, the copyright notice format in ompi is
Copyright (c) YYYY[-YYYY] Entity etc.
Will change this in the updated patch.
ompi/mca/coll/acoll/configure.m4
AC_DEFUN([MCA_ompi_coll_acoll_CONFIG],[
    AC_CONFIG_FILES([ompi/mca/coll/acoll/Makefile])

    # ToDo: Check for a proper way to pass args 1 and 2
Could you please explain what this TODO means? Positional arguments can be accessed in shell style, e.g. $1 and $2.
We can remove the TODO. We want acoll to build with or without xpmem; this comment was to check whether there is a better way than "OPAL_CHECK_XPMEM([coll_acoll], [should_build=1], [should_build=1])". Passing $1 and $2 wouldn't build acoll if xpmem is not present.
    return temp_ptr;
}

static inline void coll_acoll_free(coll_acoll_reserve_mem_t *reserve_mem_ptr, void *ptr)
IMO the function name could be more specific. Also, I don't quite understand coll_acoll_reserve_mem_t. It looks to me like each instance tracks a single memory allocation. Since coll_acoll_reserve_mem_t.reserve_mem already points to the allocation, why does the caller have to pass in the ptr again?
We use coll_acoll_reserve_mem_t to track the use of a pre-allocated buffer. However, when the size requested in coll_acoll_malloc() is greater than that of the pre-allocated buffer, we allocate new memory, which then needs to be freed in coll_acoll_free(). Hence the need to pass ptr.
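As a rough sketch of the pattern described here (struct fields and function names are assumptions, not the PR's code):

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

/* Hypothetical reserve-memory tracker, for illustration only. */
typedef struct {
    void    *reserve_mem;        /* pre-allocated scratch buffer */
    uint64_t reserve_mem_size;   /* size of the scratch buffer   */
    bool     reserve_mem_in_use; /* scratch currently handed out */
} reserve_mem_sketch_t;

static inline void *sketch_malloc(reserve_mem_sketch_t *r, uint64_t size)
{
    /* Reuse the pre-allocated scratch buffer when it is free and large enough. */
    if (!r->reserve_mem_in_use && size <= r->reserve_mem_size) {
        r->reserve_mem_in_use = true;
        return r->reserve_mem;
    }
    /* Otherwise fall back to a fresh allocation that must be freed later. */
    return malloc(size);
}

static inline void sketch_free(reserve_mem_sketch_t *r, void *ptr)
{
    if (ptr == r->reserve_mem) {
        r->reserve_mem_in_use = false; /* scratch returned; nothing to free */
    } else {
        free(ptr);                     /* fallback allocation */
    }
}
```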
    }
}

static inline int log_sg_bcast_intra(void *buff, int count, struct ompi_datatype_t *datatype,
IIUC the *sg_bcast_intra functions are only used in allgather, so should they be moved to the allgather file? I don't have a strong opinion, but it caught my eye that we are doing actual collectives in utils; that's unusual.
Agree, will move to allgather.
if ((false == reserve_mem_ptr->reserve_mem_allocate)
    || (false == reserve_mem_ptr->reserve_mem_in_use)) {
    if (NULL != ptr) {
        free(ptr);
I think the NULL check should be moved to the top of the function. But please see my comment about coll_acoll_reserve_mem_t; maybe we don't need ptr at all.
Please see the reply to the previous comment on coll_acoll_reserve_mem_t. Since ptr will be the same as reserve_mem (the pre-allocated memory) whenever the requested size is less than or equal to that of the pre-allocated memory, the NULL check needs to stay inside to ensure we only free in the case of a newly allocated buffer.
    ret = -1;
    goto error_hndl;
}
sprintf(rc_name, "acoll_%d_%d_%d", cid, rank, i);
nit: If we care about the name length then we should consider snprintf instead of sprintf.
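A bounded version of the call quoted above might look like this (assuming rc_name is a fixed-size buffer; the helper itself is purely illustrative):

```c
#include <stdio.h>

/* Illustrative helper: formats the rcache name without risking overflow. */
static void make_rcache_name(char *rc_name, size_t rc_name_len, int cid, int rank, int i)
{
    snprintf(rc_name, rc_name_len, "acoll_%d_%d_%d", cid, rank, i);
}
```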
data->rcache[i] = mca_rcache_base_module_create("grdma", NULL, &rcache_element);
if (data->rcache[i] == NULL) {
    printf("Error in rcache create\n");
Reminder to clean up the printf calls in this PR.
Will do.
ompi/mca/coll/acoll/coll_acoll.h
# include "opal/mca/rcache/base/base.h"
# include <xpmem.h>
nit: I don't think we need to indent the includes.
Agree, will fix it.
END_C_DECLS

#define MCA_COLL_ACOLL_MAX_CID 100
See my other comment about CID. I'm not sure if it is intended to be used in collectives.
}
#endif

void mca_coll_acoll_barrier(coll_acoll_data_t *data, int offset, int *group, int gp_size, int rank,
This function seems to be in the wrong file.
Will rename the function; this is not a generic MPI barrier function, it is used internally for small-message allreduce only.
@wenduwan we have addressed the comments and incorporated some big-count-related changes as well.
Looked at allgather. Left a few comments.
for (i = msb_pos + 1, mask = 1 << i; i <= dim; ++i, mask <<= 1) {
    peer = sub_rank | mask;
    if (peer >= sg_size) {
        continue;
Peer is monotonically increasing in the loop. At this point you should be able to break instead of continue.
Also, peer is signed; wouldn't sub_rank | mask somehow change the sign bit and make it negative?
Overall there is a lot of bit magic here, which I'm not good at. Need to get second opinions.
Yes, we can change the continue to break. Since size is int, sub_rank, mask and hence peer will be ensured to be within the positive int range. No magic 🙂, the logic is similar to the one in mca_coll_basic_bcast_log_intra().
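A self-contained sketch of the agreed change (continue replaced with break); the surrounding send logic is elided and the function wrapper exists only for illustration:

```c
/* Illustrative sketch, not the PR's code: forward to peers whose rank sets
 * one additional higher bit, stopping once peers exceed the subgroup size. */
static void forward_to_higher_peers(int sub_rank, int msb_pos, int dim, int sg_size)
{
    for (int i = msb_pos + 1, mask = 1 << i; i <= dim; ++i, mask <<= 1) {
        int peer = sub_rank | mask;
        if (peer >= sg_size) {
            break; /* peer only grows with the mask, so no later peer can be valid */
        }
        /* ... send the buffer to peer ... */
    }
}
```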
for (peer = sg_start; peer <= sg_end; peer++) {
    if (peer == cur_base) {
        continue;
    }
In general we prefer the trick of iterating over peers starting from rank+1 and wrapping around to rank-1.
Ack, will modify.
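The wrap-around iteration suggested above, as a generic sketch (not the PR's code):

```c
/* Illustrative sketch: visit every other peer starting at rank+1 and wrapping
 * around, so different ranks do not all target the same peer first. */
static void visit_peers(int rank, int comm_size)
{
    for (int i = 1; i < comm_size; ++i) {
        int peer = (rank + i) % comm_size;
        /* ... send/recv with peer ... */
        (void) peer;
    }
}
```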
int send_peer = ((adj_rank - i + subgrp_size) % subgrp_size) + sg_start;

tmprecv = (char *) rbuf + (ptrdiff_t) recv_peer * (ptrdiff_t) rcount * rext;
tmpsend = (char *) rbuf + (ptrdiff_t) send_peer * (ptrdiff_t) rcount * rext;
Could you explain why tmpsend is using rbuf?
Here we are using the ring algorithm, where at each step the rank sends the data it received in the previous step. Since rbuf contains the data received by the rank, it is used to derive tmpsend.
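For reference, a generic ring-allgather step looks roughly like the sketch below (variable names follow the quoted code, but the helper itself is illustrative, not code from this PR):

```c
#include <stddef.h>

/* Illustrative ring sketch: at each step a rank forwards the block it received
 * in the previous step, so both send and receive pointers index into rbuf. */
static void ring_allgather_steps(char *rbuf, ptrdiff_t rcount, ptrdiff_t rext,
                                 int rank, int comm_size)
{
    for (int step = 1; step < comm_size; ++step) {
        int send_block = (rank - step + 1 + comm_size) % comm_size;
        int recv_block = (rank - step + comm_size) % comm_size;
        char *tmpsend = rbuf + (ptrdiff_t) send_block * rcount * rext;
        char *tmprecv = rbuf + (ptrdiff_t) recv_block * rcount * rext;
        /* ... sendrecv: tmpsend to the right neighbor, tmprecv from the left ... */
        (void) tmpsend;
        (void) tmprecv;
    }
}
```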
 * ompi_coll_base_allgather_intra_recursivedoubling().
 *
 */
static inline int rd_allgather_sub(void *rbuf, struct ompi_datatype_t *rdtype,
Why doesn't this function take sbuf?
The relevant data from sbuf is copied to rbuf at the beginning of mca_coll_acoll_allgather_intra(), so it does not need sbuf.
int mca_coll_acoll_allgather(const void *sbuf, size_t scount, struct ompi_datatype_t *sdtype,
                             void *rbuf, size_t rcount, struct ompi_datatype_t *rdtype,
                             struct ompi_communicator_t *comm, mca_coll_base_module_t *module)
Overall I'm confused about the use of sbuf vs rbuf and MPI_IN_PLACE handling. Please see my other comments.
I have reviewed this again. It is a lot of changes so I didn't dive particularly deep, but nothing stood out to me as needing to be addressed immediately. LGTM.
if (rank == group[0]) {
    __atomic_store_n((int *) ((char *) data->allshmmmap_sbuf[group[0]] + offset
                              + 64 * group[0]),
                     val, __ATOMIC_RELAXED);
}

while (tmp0 != val) {
    tmp0 = __atomic_load_n((int *) ((char *) data->allshmmmap_sbuf[group[0]] + offset
                                    + 64 * group[0]),
                           __ATOMIC_RELAXED);
}

if (rank != group[0]) {
    val++;
    __atomic_store_n(tmp, val, __ATOMIC_RELAXED);
}
Reading this I see we don't have __atomic_store_n or __atomic_load_n in opal. XHC runs into this too (ompi/mca/coll/xhc/coll_xhc_atomic.h, line 47 in ff12b69: #if OPAL_USE_GCC_BUILTIN_ATOMICS || OPAL_USE_C11_ATOMICS). Maybe we need an opal_atomic_load or something.
Ack
#include "opal/include/opal/align.h"

/* Function to allocate scratch buffer */
static inline void *coll_acoll_buf_alloc(coll_acoll_reserve_mem_t *reserve_mem_ptr, uint64_t size)
Do you find these functions improve performance much? I found that malloc will behave much like this if mallopt is used to raise the MALLOC_MMAP_MAX threshold and MALLOC_TRIM_THRESHOLD to the same size you have here (4 MB).
(The mmap threshold defaults to 128 KB, and if malloc returns memory obtained via mmap, that memory is immediately released back on free.)
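For reference, the glibc tuning described here would look something like the following (the 4 MB value mirrors the discussion; this is a sketch, not code from the PR):

```c
#include <malloc.h>

int main(void)
{
    /* glibc-specific: serve ~4 MB requests from the heap instead of mmap,
     * and keep that much memory around instead of trimming it on free. */
    mallopt(M_MMAP_THRESHOLD, 4 * 1024 * 1024);
    mallopt(M_TRIM_THRESHOLD, 4 * 1024 * 1024);
    return 0;
}
```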
For gather, we found this to improve performance over malloc.
Please add the owner.txt file.
int cid = ompi_comm_get_local_cid(comm);

/* Fallback to linear if cid is beyond supported limit */
if (cid >= MCA_COLL_ACOLL_MAX_CID) {
I noticed @wenduwan's comment above, and overall he is correct. But I have a more fundamental question here: what exactly are you trying to achieve with this test? Only provide acoll support for the first MCA_COLL_ACOLL_MAX_CID communicators? How do you define those globally so that this makes sense in distributed, non-symmetric applications?
Yes, currently support is provided for the first 100 communicators. We will have a follow-up patch where we remove this dependency.
Didn't quite understand "How do you define those globally so that this makes sense in distributed, non-symmetric applications?" Could you please elaborate?
 * Memory: The base rank of each subgroup may create temporary buffer.
 *
 */
int mca_coll_acoll_gather_intra(const void *sbuf, size_t scount, struct ompi_datatype_t *sdtype,
I don't understand how this is better than the gather in HAN. HAN will split the communicator in two, node-level and inter-node, and will then do a local gather and then an inter-node gather with data reshuffling. This algorithm seems to assume a map-by core distribution across the entire communicator. How is that applicable to sub-communicators?
Yes, the gather algorithm is optimal for the -map-by core option. For subcommunicators, the same algorithm is used, which may not be optimal.
Ack
bot:retest
data->offset[0] = 16 * 1024;
data->offset[1] = data->offset[0] + size * 64;
data->offset[2] = data->offset[1] + size * 64;
data->offset[3] = data->offset[2] + rank * 8 * 1024;
Where do these magic numbers come from? What offsets do they encode?
Updated in the latest push.
int offset = 16 * 1024;
memset(((char *) data->allshmmmap_sbuf[data->l1_gp[0]]) + offset + 64 * rank, 0, 64);
if (data->l1_gp[0] == rank) {
    memset(((char *) data->allshmmmap_sbuf[data->l2_gp[0]]) + (offset + 64 * size) + 64 * rank,
           0, 64);
}
Same here. I'd prefer to have names for numbers instead of magic numbers strewn across the code.
Done
ompi_datatype_type_size(datatype, &dsize);
total_dsize = dsize * count;

if (total_dsize <= 8192) {
It seems like 8192 comes up often here. Why was that chosen? Is it related to cache sizes? Should this be a #define constant?
These numbers / conditions are empirically derived. I don't think we should #define these.
volatile int tmp1 = __atomic_load_n(
    (int *) ((char *) data->allshmmmap_sbuf[group[0]] + offset + CACHE_LINE_SIZE * group[i]),
    __ATOMIC_RELAXED);
while (tmp1 == val) {
    tmp1 = __atomic_load_n((int *) ((char *) data->allshmmmap_sbuf[group[0]] + offset
                                    + CACHE_LINE_SIZE * group[i]),
                           __ATOMIC_RELAXED);
}
No need for volatile (I believe), and maybe we can use the opal_atomic API instead?
We are planning to refactor some of the code in the next iteration and will keep your comment in mind. Just curious, what would be the benefit of opal_atomic?
opal_atomic selects whatever atomic API is available. Strictly speaking, __atomic* is a GCC extension (that is meant to resemble the C11 standard _Atomic API), and I'm worried that we hit a compiler that doesn't support the __atomic API.
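For comparison, a portable C11 version of the relaxed spin-wait above could look like this sketch (illustrative only; OMPI itself would presumably go through opal_atomic wrappers instead):

```c
#include <stdatomic.h>

/* Illustrative C11 sketch: spin until the leader publishes the expected value. */
static void wait_for_flag(_Atomic int *flag, int expected)
{
    while (atomic_load_explicit(flag, memory_order_relaxed) != expected) {
        /* busy-wait; a real implementation might insert a CPU pause here */
    }
}
```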
const int leader_shm_size = 16 * 1024;
const int cache_line_size = 64;
const int per_rank_shm_size = 8 * 1024;
Thanks! Are those numbers used anywhere else? I tried to find similar values. I'm a bit concerned that changing the values here would break code elsewhere. Maybe they should be #define'd in this header and used wherever the sizes of the buffers are relevant?
Good point, done.
The numbers are primarily used within *utils.h, but as you mentioned, they could be used elsewhere as well.
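A shared-header version of those constants might look like the following (macro names are assumptions, not the PR's):

```c
/* Illustrative names only. */
#define ACOLL_LEADER_SHM_SIZE   (16 * 1024)
#define ACOLL_CACHE_LINE_SIZE   64
#define ACOLL_PER_RANK_SHM_SIZE (8 * 1024)
```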
acoll is a collective component optimized for AMD "Zen"-based processors. It supports Bcast, Allreduce, Reduce, Barrier, Gather and Allgather APIs. Signed-off-by: Nithya V S <[email protected]>
I'm OK with merging this, but I hope we get the replacement of __atomic with opal_atomic soon.
This PR introduces "acoll", a high-performance collective component optimized for communication within a single node of AMD EPYC CPUs. It mainly uses subcommunicators based on L3 cache or NUMA domains to reduce cross-cache or cross-NUMA accesses. The supported collectives include Bcast, Allreduce, Gather, Reduce, Barrier, and Allgather.
OSU micro-benchmarks were run on a 2-socket AMD EPYC 9654 96-core processor with 4 NUMA domains per socket, for a total of 192 cores per node, on top of commit bb7ecde.
Average percentage latency reduction over "tuned" across 32, 64, 96, 128, 192 ranks over message sizes of 8 bytes to 8 MB (varied in powers of 2):
Sample graphs: Allreduce, Bcast, Gather.