Skip to content

Commit

Permalink
Added IReduce! and IAllreduce!
Browse files Browse the repository at this point in the history
  • Loading branch information
Keluaa authored and vchuravy committed Jun 23, 2024
1 parent 71acbb7 commit d026536
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 0 deletions.
90 changes: 90 additions & 0 deletions src/collective.jl
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,57 @@ function Reduce(object::T, op, root::Integer, comm::Comm) where {T}
end
end

## IReduce

"""
    IReduce!(sendbuf, recvbuf, op, comm::Comm[, req::AbstractRequest = Request()]; root::Integer=0)
    IReduce!(sendrecvbuf, op, comm::Comm[, req::AbstractRequest = Request()]; root::Integer=0)

Start a nonblocking elementwise reduction of the buffer `sendbuf` using the operator `op`,
storing the result in `recvbuf` on the process of rank `root`.

On non-root processes `recvbuf` is ignored, and can be `nothing`.

To perform the reduction in place, provide a single buffer `sendrecvbuf`.

Return the [`AbstractRequest`](@ref) object for the nonblocking reduction.

# See also
- [`Reduce!`](@ref) the equivalent blocking operation.
- [`IAllreduce!`](@ref) to send reduction to all ranks.
- [`Op`](@ref) for details on reduction operators.

# External links
$(_doc_external("MPI_Ireduce"))
"""
function IReduce!(sendrecvbuf, op, comm::Comm, req::AbstractRequest=Request();
                  root::Integer=Cint(0))
    # Keyword-`root` convenience form: forward to the positional-`root` in-place method.
    return IReduce!(sendrecvbuf, op, root, comm, req)
end
# Two-buffer keyword-`root` form: forward to the positional-`root` method.
function IReduce!(sendbuf, recvbuf, op, comm::Comm, req::AbstractRequest=Request();
                  root::Integer=Cint(0))
    return IReduce!(sendbuf, recvbuf, op, root, comm, req)
end

# Core method: issue `MPI_Ireduce` for an already-built `RBuffer` and an MPI-level operator,
# and return the request tracking the operation.
function IReduce!(rbuf::RBuffer, op::Union{Op,MPI_Op}, root::Integer, comm::Comm,
                  req::AbstractRequest=Request())
    # Require a null request handle, matching the guard in `IAllreduce!`; presumably this
    # prevents overwriting a request that still tracks an unfinished operation.
    @assert isnull(req)
    # int MPI_Ireduce(const void* sendbuf, void* recvbuf, int count,
    #                 MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm,
    #                 MPI_Request* req)
    API.MPI_Ireduce(rbuf.senddata, rbuf.recvdata, rbuf.count, rbuf.datatype, op, root, comm,
                    req)
    # Associate the buffers with the request (presumably to keep them alive until the
    # operation completes -- confirm against `setbuffer!`).
    setbuffer!(req, rbuf)
    return req
end

# Generic-operator form: wrap `op` in an `Op` for the buffer's element type, then dispatch
# to the core method.
function IReduce!(rbuf::RBuffer, op, root::Integer, comm::Comm,
                  req::AbstractRequest=Request())
    mpi_op = Op(op, eltype(rbuf))
    return IReduce!(rbuf, mpi_op, root, comm, req)
end
# Two-buffer positional-`root` form: bundle the buffers into an `RBuffer` and dispatch on it.
function IReduce!(sendbuf, recvbuf, op, root::Integer, comm::Comm,
                  req::AbstractRequest=Request())
    buffers = RBuffer(sendbuf, recvbuf)
    return IReduce!(buffers, op, root, comm, req)
end

# In-place form: the root rank reduces into `buf` using `IN_PLACE` as the send buffer,
# while every other rank only contributes `buf` and passes `nothing` as receive buffer.
function IReduce!(buf, op, root::Integer, comm::Comm, req::AbstractRequest=Request())
    Comm_rank(comm) == root || return IReduce!(buf, nothing, op, root, comm, req)
    return IReduce!(IN_PLACE, buf, op, root, comm, req)
end

## Allreduce

# mutating
Expand Down Expand Up @@ -775,6 +826,45 @@ Allreduce(sendbuf::AbstractArray, op, comm::Comm) =
# Single-object form: reduce `obj` through `Ref` send/receive buffers and unwrap the result.
function Allreduce(obj::T, op, comm::Comm) where {T}
    result = Allreduce!(Ref(obj), Ref{T}(), op, comm)
    return result[]
end

## IAllreduce

"""
    IAllreduce!(sendbuf, recvbuf, op, comm::Comm[, req::AbstractRequest = Request()])
    IAllreduce!(sendrecvbuf, op, comm::Comm[, req::AbstractRequest = Request()])

Start a nonblocking elementwise reduction of the buffer `sendbuf` using the operator `op`,
storing the result in the `recvbuf` of all processes in the group.

If only one `sendrecvbuf` buffer is provided, then the operation is performed in-place.

Return the [`AbstractRequest`](@ref) object for the nonblocking reduction.

# See also
- [`Allreduce!`](@ref) the equivalent blocking operation.
- [`IReduce!`](@ref) to send reduction to a single rank.
- [`Op`](@ref) for details on reduction operators.

# External links
$(_doc_external("MPI_Iallreduce"))
"""
function IAllreduce!(rbuf::RBuffer, op::Union{Op, MPI_Op}, comm::Comm,
                     req::AbstractRequest=Request())
    # The request handle must be null before it is handed to MPI.
    @assert isnull(req)
    # int MPI_Iallreduce(const void* sendbuf, void* recvbuf, int count,
    #                    MPI_Datatype datatype, MPI_Op op, MPI_Comm comm,
    #                    MPI_Request* req)
    API.MPI_Iallreduce(
        rbuf.senddata, rbuf.recvdata, rbuf.count, rbuf.datatype, op, comm, req,
    )
    # Associate the buffers with the request before returning it.
    setbuffer!(req, rbuf)
    return req
end
# Generic-operator form: wrap `op` in an `Op` for the buffer's element type, then dispatch
# to the core method.
function IAllreduce!(rbuf::RBuffer, op, comm::Comm, req::AbstractRequest=Request())
    mpi_op = Op(op, eltype(rbuf))
    return IAllreduce!(rbuf, mpi_op, comm, req)
end
# Two-buffer form: bundle the buffers into an `RBuffer` and dispatch on it.
function IAllreduce!(sendbuf, recvbuf, op, comm::Comm, req::AbstractRequest=Request())
    buffers = RBuffer(sendbuf, recvbuf)
    return IAllreduce!(buffers, op, comm, req)
end

# In-place form. Despite its name, `rbuf` here is the caller's single send/receive buffer
# (an `RBuffer` argument dispatches to the more specific methods); it is used as the
# receive buffer with `IN_PLACE` as the send buffer.
function IAllreduce!(rbuf, op, comm::Comm, req::AbstractRequest=Request())
    return IAllreduce!(IN_PLACE, rbuf, op, comm, req)
end

## Scan

# mutating
Expand Down
13 changes: 13 additions & 0 deletions test/test_allreduce.jl
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,19 @@ for T = [Int]
vals = MPI.Allreduce(send_arr, op, MPI.COMM_WORLD)
@test vals isa ArrayType{T}
@test vals == comm_size .* send_arr

# Nonblocking
# Reduce into a separate, uninitialized receive buffer; the result is only checked after
# waiting on the returned request.
recv_arr = ArrayType{T}(undef, size(send_arr))
req = MPI.IAllreduce!(send_arr, recv_arr, op, MPI.COMM_WORLD)
MPI.Wait(req)
@test recv_arr == comm_size .* send_arr

# Nonblocking (IN_PLACE)
# Single-buffer form: `recv_arr` starts as a copy of the input and is expected to hold the
# reduced values once the request has completed.
recv_arr = copy(send_arr)
# NOTE(review): `synchronize` is defined outside this excerpt -- presumably it makes sure
# the copy above has finished (e.g. for device arrays) before MPI reads it; confirm.
synchronize()
req = MPI.IAllreduce!(recv_arr, op, MPI.COMM_WORLD)
MPI.Wait(req)
@test recv_arr == comm_size .* send_arr
end
end
end
Expand Down
23 changes: 23 additions & 0 deletions test/test_reduce.jl
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,22 @@ for T = [Int]
@test recv_arr isa ArrayType{T}
@test recv_arr == sz .* view(send_arr, 2:3)
end

# Nonblocking
# Reduce into a separate, uninitialized receive buffer; only the root rank checks the
# result, and only after waiting on the returned request.
recv_arr = ArrayType{T}(undef, size(send_arr))
req = MPI.IReduce!(send_arr, recv_arr, op, MPI.COMM_WORLD; root=root)
MPI.Wait(req)
if isroot
@test recv_arr == sz .* send_arr
end

# Nonblocking (IN_PLACE)
# Single-buffer form: `recv_arr` starts as a copy of the input; on the root rank it is
# expected to hold the reduced values once the request has completed.
# NOTE(review): the IAllreduce! test calls `synchronize()` after the copy; confirm whether
# the same is needed here.
recv_arr = copy(send_arr)
req = MPI.IReduce!(recv_arr, op, MPI.COMM_WORLD; root=root)
MPI.Wait(req)
if isroot
@test recv_arr == sz .* send_arr
end
end
end
end
Expand All @@ -127,6 +143,13 @@ if can_do_closures
@test result === nothing
end

# Nonblocking reduction with the generic `+` operator. Only the root rank allocates a
# receive buffer; the other ranks pass `nothing`.
recv_arr = isroot ? zeros(eltype(send_arr), size(send_arr)) : nothing
req = MPI.IReduce!(send_arr, recv_arr, +, MPI.COMM_WORLD; root=root)
MPI.Wait(req)
if rank == root
    # Approximate comparison: the `≈` operator is required for `@test` to accept `rtol`.
    @test recv_arr ≈ [Double64(sz*i)/10 for i = 1:10] rtol=sz*eps(Double64)
end

MPI.Barrier( MPI.COMM_WORLD )
end

Expand Down

0 comments on commit d026536

Please sign in to comment.