Implement interface for set! and use it to set! distributed fields better #3817

Merged
11 commits merged on Oct 5, 2024
2 changes: 1 addition & 1 deletion src/Architectures.jl
@@ -59,7 +59,7 @@ architecture(a::OffsetArray) = architecture(parent(a))
Return `arch`itecture of child processes.
On single-process, non-distributed systems, return `arch`.
"""
child_architecture(arch) = arch
child_architecture(arch::AbstractSerialArchitecture) = arch

array_type(::CPU) = Array
array_type(::GPU) = CuArray
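
Narrowing the fallback means `child_architecture` still returns serial architectures unchanged, while any other architecture must supply its own method (as `Distributed` does) rather than being passed through silently. A self-contained mock of the dispatch pattern, using stand-in types rather than the real Oceananigans ones; the `child_architecture` field on `Distributed` is an assumption:

# Mock types standing in for Oceananigans.Architectures / DistributedComputations.
abstract type AbstractSerialArchitecture end
struct CPU <: AbstractSerialArchitecture end
struct GPU <: AbstractSerialArchitecture end

struct Distributed{C}
    child_architecture :: C   # per-rank architecture; field name is an assumption
end

# Serial architectures are their own child architecture...
child_architecture(arch::AbstractSerialArchitecture) = arch

# ...while a Distributed architecture reports the architecture of each rank.
child_architecture(arch::Distributed) = arch.child_architecture

child_architecture(CPU())               # CPU()
child_architecture(Distributed(GPU()))  # GPU()
# child_architecture("not an arch")     # now a MethodError rather than a silent pass-through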
2 changes: 1 addition & 1 deletion src/DistributedComputations/DistributedComputations.jl
@@ -2,7 +2,7 @@ module DistributedComputations

export
Distributed, Partition, Equal, Fractional,
child_architecture, reconstruct_global_grid,
child_architecture, reconstruct_global_grid, partition,
inject_halo_communication_boundary_conditions,
DistributedFFTBasedPoissonSolver

66 changes: 32 additions & 34 deletions src/DistributedComputations/distributed_fields.jl
@@ -1,16 +1,20 @@
import Oceananigans.Fields: Field, FieldBoundaryBuffers, location, set!
import Oceananigans.BoundaryConditions: fill_halo_regions!

using CUDA: CuArray
using OffsetArrays: OffsetArray
using Oceananigans.Grids: topology
using Oceananigans.Fields: validate_field_data, indices, validate_boundary_conditions, validate_indices, recv_from_buffers!
using Oceananigans.Fields: validate_field_data, indices, validate_boundary_conditions
using Oceananigans.Fields: validate_indices, recv_from_buffers!, set_to_array!, set_to_field!

import Oceananigans.Fields: Field, FieldBoundaryBuffers, location, set!
import Oceananigans.BoundaryConditions: fill_halo_regions!

function Field((LX, LY, LZ)::Tuple, grid::DistributedGrid, data, old_bcs, indices::Tuple, op, status)
arch = architecture(grid)
indices = validate_indices(indices, (LX, LY, LZ), grid)
validate_field_data((LX, LY, LZ), data, grid, indices)
validate_boundary_conditions((LX, LY, LZ), grid, old_bcs)
new_bcs = inject_halo_communication_boundary_conditions(old_bcs, arch.local_rank, arch.connectivity, topology(grid))

arch = architecture(grid)
rank = arch.local_rank
new_bcs = inject_halo_communication_boundary_conditions(old_bcs, rank, arch.connectivity, topology(grid))
buffers = FieldBoundaryBuffers(grid, data, new_bcs)

return Field{LX, LY, LZ}(grid, data, new_bcs, indices, op, status, buffers)
@@ -19,42 +19,36 @@ end
const DistributedField = Field{<:Any, <:Any, <:Any, <:Any, <:DistributedGrid}
const DistributedFieldTuple = NamedTuple{S, <:NTuple{N, DistributedField}} where {S, N}

function set!(u::DistributedField, f::Function)
arch = architecture(u)
if child_architecture(arch) isa GPU
cpu_grid = on_architecture(cpu_architecture(arch), u.grid)
u_cpu = Field(location(u), cpu_grid, eltype(u); indices = indices(u))
f_field = field(location(u), f, cpu_grid)
set!(u_cpu, f_field)
set!(u, u_cpu)
elseif child_architecture(arch) isa CPU
f_field = field(location(u), f, u.grid)
set!(u, f_field)
global_size(f::DistributedField) = global_size(architecture(f), size(f))

# Automatically partition under the hood if sizes are compatible
function set!(u::DistributedField, V::Union{Array, CuArray, OffsetArray})
NV = size(V)
Nu = global_size(u)

# Suppress singleton indices
NV′ = filter(n -> n > 1, NV)
Nu′ = filter(n -> n > 1, Nu)

if NV′ == Nu′
v = partition(V, u)
else
v = V
end

return u
return set_to_array!(u, v)
end

# Automatically partition under the hood if sizes are compatible
function set!(u::DistributedField, v::Union{Array, CuArray})
gsize = global_size(architecture(u), size(u))

if size(v) == gsize
f = partition_global_array(architecture(u), v, size(u))
u .= f
return u
function set!(u::DistributedField, V::Field)
if size(V) == global_size(u)
v = partition(V, u)
return set_to_array!(u, v)
else
try
f = on_architecture(architecture(u), v)
u .= f
return u

catch
throw(ArgumentError("ERROR: DimensionMismatch: array could not be set to match destination field"))
end
return set_to_field!(u, V)
end
end
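
Together, the two methods above let a script `set!` a distributed field from either globally sized or locally sized data, with partitioning handled under the hood. A hedged usage sketch: it assumes an MPI launch (for example `mpiexec -n 4 julia script.jl`), a 4-rank slab decomposition in x, and constructor keywords written from memory:

using MPI, Oceananigans
using Oceananigans.DistributedComputations

MPI.Init()

arch = Distributed(CPU(); partition = Partition(4))   # 4 ranks along x (assumed layout)
grid = RectilinearGrid(arch; size = (64, 64, 8), extent = (1, 1, 1))
c = CenterField(grid)                                  # each rank holds a (16, 64, 8) slab

# Globally sized data is recognized by comparing its size against global_size(c),
# with singleton dimensions suppressed, and is partitioned before being copied in.
set!(c, rand(64, 64, 8))

# Locally sized data is copied straight into the rank-local field.
set!(c, rand(16, 64, 8))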


"""
synchronize_communication!(field)

12 changes: 6 additions & 6 deletions src/DistributedComputations/distributed_grids.jl
@@ -176,7 +176,7 @@ function reconstruct_global_grid(grid::DistributedRectilinearGrid)

nx, ny, nz = n = size(grid)
Hx, Hy, Hz = H = halo_size(grid)
Nx, Ny, Nz = map(sum, concatenate_local_sizes(n, arch))
Nx, Ny, Nz = global_size(arch, n)

TX, TY, TZ = topology(grid)

@@ -219,7 +219,7 @@ function reconstruct_global_grid(grid::DistributedLatitudeLongitudeGrid)

nλ, nφ, nz = n = size(grid)
Hλ, Hφ, Hz = H = halo_size(grid)
Nλ, Nφ, Nz = map(sum, concatenate_local_sizes(n, arch))
Nλ, Nφ, Nz = global_size(arch, n)

TX, TY, TZ = topology(grid)
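
The `global_size(arch, n)` call used in both reconstructions is just the sum of the per-rank sizes gathered by `concatenate_local_sizes`. A serial mock of that bookkeeping for an assumed 4-rank decomposition in x with a local size of (16, 64, 8) on every rank:

# Per-rank sizes along each dimension, as concatenate_local_sizes would gather them;
# y and z are not partitioned in this assumed layout, so their vectors have one entry.
nxs, nys, nzs = [16, 16, 16, 16], [64], [8]

map(sum, (nxs, nys, nzs))   # (64, 64, 8), the Nx, Ny, Nz used to rebuild the global grid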

@@ -266,12 +266,12 @@ end
# take precedence on `DistributedGrid`
function with_halo(new_halo, grid::DistributedRectilinearGrid)
new_grid = with_halo(new_halo, reconstruct_global_grid(grid))
return scatter_local_grids(architecture(grid), new_grid, size(grid))
return scatter_local_grids(new_grid, architecture(grid), size(grid))
end

function with_halo(new_halo, grid::DistributedLatitudeLongitudeGrid)
new_grid = with_halo(new_halo, reconstruct_global_grid(grid))
return scatter_local_grids(architecture(grid), new_grid, size(grid))
return scatter_local_grids(new_grid, architecture(grid), size(grid))
end

# Extending child_architecture for grids
@@ -295,13 +295,13 @@ function scatter_grid_properties(global_grid)
return x, y, z, topo, halo
end

function scatter_local_grids(arch::Distributed, global_grid::RectilinearGrid, local_size)
function scatter_local_grids(global_grid::RectilinearGrid, arch::Distributed, local_size)
x, y, z, topo, halo = scatter_grid_properties(global_grid)
global_sz = global_size(arch, local_size)
return RectilinearGrid(arch, eltype(global_grid); size=global_sz, x=x, y=y, z=z, halo=halo, topology=topo)
end

function scatter_local_grids(arch::Distributed, global_grid::LatitudeLongitudeGrid, local_size)
function scatter_local_grids(global_grid::LatitudeLongitudeGrid, arch::Distributed, local_size)
x, y, z, topo, halo = scatter_grid_properties(global_grid)
global_sz = global_size(arch, local_size)
return LatitudeLongitudeGrid(arch, eltype(global_grid); size=global_sz, longitude=x,
149 changes: 87 additions & 62 deletions src/DistributedComputations/partition_assemble.jl
@@ -1,61 +1,66 @@
import Oceananigans.Architectures: on_architecture
using Oceananigans.Fields: Field

all_reduce(op, val, arch::Distributed) =
MPI.Allreduce(val, op, arch.communicator)
import Oceananigans.Architectures: on_architecture

all_reduce(op, val, arch::Distributed) = MPI.Allreduce(val, op, arch.communicator)
all_reduce(op, val, arch) = val

# MPI Barrier
barrier!(arch) = nothing
barrier!(arch::Distributed) = MPI.Barrier(arch.communicator)

"""
concatenate_local_sizes(n, arch::Distributed)
concatenate_local_sizes(local_size, arch::Distributed)

Return a 3-Tuple containing a vector of `size(grid, idx)` for each rank in
Return a 3-Tuple containing a vector of `size(grid, dim)` for each rank in
all 3 directions.
"""
concatenate_local_sizes(n, arch::Distributed) =
Tuple(concatenate_local_sizes(n, arch, i) for i in 1:length(n))
concatenate_local_sizes(local_size, arch::Distributed) =
Tuple(concatenate_local_sizes(local_size, arch, d) for d in 1:length(local_size))

concatenate_local_sizes(sz, arch, dim) = concatenate_local_sizes(sz[dim], arch, dim)

function concatenate_local_sizes(n, arch::Distributed, idx)
R = arch.ranks[idx]
r = arch.local_index[idx]
n = n isa Number ? n : n[idx]
l = zeros(Int, R)
function concatenate_local_sizes(n::Number, arch::Distributed, dim)
R = arch.ranks[dim]
r = arch.local_index[dim]
N = zeros(Int, R)

r1, r2 = arch.local_index[[1, 2, 3] .!= idx]
r1, r2 = arch.local_index[[1, 2, 3] .!= dim]

if r1 == 1 && r2 == 1
l[r] = n
N[r] = n
end

MPI.Allreduce!(l, +, arch.communicator)
MPI.Allreduce!(N, +, arch.communicator)

return l
return N
end
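
The Allreduce pattern deserves a note: every rank allocates a zero vector of length `R`, only the ranks sitting at index 1 in the two other directions write their local size into their own slot, and the sum-reduction then reproduces the full vector on every rank. A serial mock with 2 ranks along the chosen dimension (local sizes 16 and 48), standing in for the `MPI.Allreduce!` call:

R = 2                              # ranks along this dimension
local_n = [16, 48]                 # each rank's local size along it

# What each contributing rank holds just before the reduction:
contributions = [begin
                     N = zeros(Int, R)
                     N[r] = local_n[r]
                     N
                 end for r in 1:R]

N = reduce(+, contributions)       # stands in for MPI.Allreduce!(N, +, communicator)
# N == [16, 48] on every rank, and sum(N) == 64 is the global size along this dimension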

# Partitioning (localization of global objects) and assembly (global assembly of local objects)
# Used for grid constructors (cpu_face_constructor_x, cpu_face_constructor_y, cpu_face_constructor_z)
# We need to repeat the value at the right boundary
function partition_coordinate(c::AbstractVector, n, arch, idx)
nl = concatenate_local_sizes(n, arch, idx)
r = arch.local_index[idx]
"""
partition_coordinate(coordinate, n, arch, dim)

Return the local component of the global `coordinate`, which has
local length `n` and is distributed on `arch`itecture
in the x-, y-, or z- `dim`ension.
"""
function partition_coordinate(c::AbstractVector, n, arch, dim)
nl = concatenate_local_sizes(n, arch, dim)
r = arch.local_index[dim]

start_idx = sum(nl[1:r-1]) + 1 # sum of all previous rank's dimension + 1
end_idx = if r == ranks(arch)[idx]
end_idx = if r == ranks(arch)[dim]
length(c)
else
sum(nl[1:r]) + 1
end

return c[start_idx : end_idx]
return c[start_idx:end_idx]
end

function partition_coordinate(c::Tuple, n, arch, idx)
nl = concatenate_local_sizes(n, arch, idx)
function partition_coordinate(c::Tuple, n, arch, dim)
nl = concatenate_local_sizes(n, arch, dim)
N = sum(nl)
R = arch.ranks[idx]
R = arch.ranks[dim]
Δl = (c[2] - c[1]) / N

l = Tuple{Float64, Float64}[(c[1], c[1] + Δl * nl[1])]
@@ -64,7 +69,7 @@ function partition_coordinate(c::Tuple, n, arch, idx)
push!(l, (lp, lp + Δl * nl[i]))
end

return l[arch.local_index[idx]]
return l[arch.local_index[dim]]
end
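
For tuple extents the partitioning is pure arithmetic: the interval is cut into pieces proportional to each rank's local size. A serial mock for an extent of (0, 1) split between two ranks that own 16 and 48 points along this dimension:

c  = (0.0, 1.0)
nl = [16, 48]                      # local sizes along the partitioned dimension
N  = sum(nl)
Δl = (c[2] - c[1]) / N

l = [(c[1], c[1] + Δl * nl[1])]
for i in 2:length(nl)
    lp = l[i-1][2]                 # each piece starts where the previous one ends
    push!(l, (lp, lp + Δl * nl[i]))
end

l   # [(0.0, 0.25), (0.25, 1.0)]; rank r keeps l[r]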

"""
@@ -75,11 +80,11 @@ a local number of elements `Nc`, number of ranks `Nr`, rank `r`,
and `arch`itecture. Since we use a global reduction, only ranks at positions
1 in the other two directions `r1 == 1` and `r2 == 1` fill the 1D array.
"""
function assemble_coordinate(c_local::AbstractVector, n, arch, idx)
nl = concatenate_local_sizes(n, arch, idx)
R = arch.ranks[idx]
r = arch.local_index[idx]
r2 = [arch.local_index[i] for i in filter(x -> x != idx, (1, 2, 3))]
function assemble_coordinate(c_local::AbstractVector, n, arch, dim)
nl = concatenate_local_sizes(n, arch, dim)
R = arch.ranks[dim]
r = arch.local_index[dim]
r2 = [arch.local_index[i] for i in filter(x -> x != dim, (1, 2, 3))]

c_global = zeros(eltype(c_local), sum(nl)+1)

@@ -94,13 +99,13 @@ function assemble_coordinate(c_local::AbstractVector, n, arch, idx)
end

# Simple case, just take the first and the last core
function assemble_coordinate(c_local::Tuple, n, arch, idx)
function assemble_coordinate(c_local::Tuple, n, arch, dim)
c_global = zeros(Float64, 2)

rank = arch.local_index
R = arch.ranks[idx]
r = rank[idx]
r2 = [rank[i] for i in filter(x -> x != idx, (1, 2, 3))]
R = arch.ranks[dim]
r = rank[dim]
r2 = [rank[i] for i in filter(x -> x != dim, (1, 2, 3))]

if rank[1] == 1 && rank[2] == 1 && rank[3] == 1
c_global[1] = c_local[1]
@@ -113,42 +118,62 @@ function assemble_coordinate(c_local::Tuple, n, arch, idx)
return tuple(c_global...)
end
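
The tuple branch works the same way in reverse: the first rank contributes the left endpoint, the last rank (in the branch elided above) contributes the right one, and an Allreduce with `+` hands every rank the assembled global extent. A serial mock with 2 ranks owning (-1.0, 0.0) and (0.0, 1.0):

local_extents = [(-1.0, 0.0), (0.0, 1.0)]
R = length(local_extents)

# Each rank's contribution before the reduction: zeros except for the endpoint it owns.
contributions = [Float64[r == 1 ? local_extents[r][1] : 0.0,
                         r == R ? local_extents[r][2] : 0.0] for r in 1:R]

c_global = reduce(+, contributions)   # stands in for MPI.Allreduce!(c_global, +, communicator)
tuple(c_global...)                    # (-1.0, 1.0)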

# TODO: partition_global_array and construct_global_array
# do not currently work for 3D parallelizations
# (They are not used anywhere in the code at the moment except for immersed boundaries)
# TODO: make partition and construct_global_array work for 3D distribution.

"""
partition_global_array(arch, c_global, (nx, ny, nz))
partition(A, b)

Partition the globally-sized `A` into local arrays with the same size as `b`.
"""
partition(A, b::Field) = partition(A, architecture(b), size(b))
partition(F::Field, b::Field) = partition(interior(F), b)
partition(f::Function, arch, n) = f
partition(A::AbstractArray, arch::AbstractSerialArchitecture, local_size) = A

Partition a global array into local arrays of size `(nx, ny)` if 2D or `(nx, ny, nz)` if 3D.
Useful for boundary arrays, forcings, and initial conditions.
"""
partition_global_array(arch, c_global::AbstractArray, n) = c_global
partition_global_array(arch, c_global::Function, n) = c_global
partition(A, arch, local_size)

# Here we assume that we cannot partition in z (we should remove support for that)
function partition_global_array(arch::Distributed, c_global::AbstractArray, n)
c_global = on_architecture(CPU(), c_global)
Partition the globally-sized `A` into local arrays with `local_size` on `arch`itecture.
"""
function partition(A::AbstractArray, arch::Distributed, local_size)
A = on_architecture(CPU(), A)

ri, rj, rk = arch.local_index
dims = length(size(A))

dims = length(size(c_global))
nx, ny, nz = concatenate_local_sizes(n, arch)
# Vectors with the local size for every rank
nxs, nys, nzs = concatenate_local_sizes(local_size, arch)

nz = nz[1]
# The local size
nx = nxs[ri]
ny = nys[rj]
nz = nzs[1]
# @assert (nx, ny, nz) == local_size

if dims == 2
c_local = zeros(eltype(c_global), nx[ri], ny[rj])
up_to = nxs[1:ri-1]
including = nxs[1:ri]
i₁ = sum(up_to) + 1
i₂ = sum(including)

c_local .= c_global[1 + sum(nx[1:ri-1]) : sum(nx[1:ri]),
1 + sum(ny[1:rj-1]) : sum(ny[1:rj])]
else
c_local = zeros(eltype(c_global), nx[ri], ny[rj], nz)
up_to = nys[1:rj-1]
including = nys[1:rj]
j₁ = sum(up_to) + 1
j₂ = sum(including)

c_local .= c_global[1 + sum(nx[1:ri-1]) : sum(nx[1:ri]),
1 + sum(ny[1:rj-1]) : sum(ny[1:rj]),
1:nz]
ii = UnitRange(i₁, i₂)
jj = UnitRange(j₁, j₂)
kk = 1:nz # no partitioning in z

# TODO: undo this toxic assumption that all 2D arrays span x, y.
if dims == 2
a = zeros(eltype(A), nx, ny)
a .= A[ii, jj]
else
a = zeros(eltype(A), nx, ny, nz)
a .= A[ii, jj, 1:nz]
end
return on_architecture(child_architecture(arch), c_local)

return on_architecture(child_architecture(arch), a)
end
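
A serial mock of the index bookkeeping above, for the rank at position (ri, rj) = (2, 1) of an assumed 2 × 2 rank layout in which every rank owns a 32 × 32 block of a 64 × 64 surface array (2D arrays are assumed to span x, y, as the TODO above notes):

A = rand(64, 64)                       # globally sized 2D array
ri, rj = 2, 1                          # this rank's position in the (x, y) rank grid
nxs, nys = [32, 32], [32, 32]          # per-rank sizes, as concatenate_local_sizes returns them

i₁, i₂ = sum(nxs[1:ri-1]) + 1, sum(nxs[1:ri])   # 33, 64
j₁, j₂ = sum(nys[1:rj-1]) + 1, sum(nys[1:rj])   # 1, 32

a = A[i₁:i₂, j₁:j₂]                    # the (32, 32) block owned by this rank
size(a)                                # (32, 32)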

"""
@@ -160,7 +185,7 @@ Useful for boundary arrays, forcings, and initial conditions.
construct_global_array(arch, c_local::AbstractArray, n) = c_local
construct_global_array(arch, c_local::Function, N) = c_local

# TODO: This does not work for 3D parallelizations!!!
# TODO: This does not work for 3D parallelizations
function construct_global_array(arch::Distributed, c_local::AbstractArray, n)
c_local = on_architecture(CPU(), c_local)
