Timo/distributed (#20)
* Added MPI dependencies

* Removed MPI as default dependency

* Small improvements

* Fixed MPI dependency declaration for RLST
tbetcke authored Dec 16, 2024
1 parent 6c3f7c3 commit 7708ae0
Showing 3 changed files with 220 additions and 2 deletions.
11 changes: 9 additions & 2 deletions Cargo.toml
@@ -4,6 +4,8 @@ nightly = ["pulp/nightly"]
strict = []
sleef = ["rlst/sleef"]
default = ["sleef"]
mpi = ["dep:mpi", "dep:bempp-distributed-tools", "rlst/mpi"]


[package]
name = "green-kernels"
@@ -29,13 +31,16 @@ approx = { version = "0.5", features = ["num-complex"] }
rayon = "1.9"
num = "0.4"
num_cpus = "1"
rlst = { git = "https://github.com/linalg-rs/rlst.git", default-features = false }
rlst = { git = "https://github.com/linalg-rs/rlst.git" }
# rlst = { path = "../rlst", features = ["mpi"] }
rand = "0.8.5"
itertools = { version = "0.13.0", default-features = false }
coe-rs = "0.1.2"
pulp = { version = "0.21" }
bytemuck = "1.16.0"
hexf = "0.2.1"
mpi = { version = "0.8.*", optional = true }
bempp-distributed-tools = { git = "https://github.com/bempp/distributed_tools.git", optional = true }

[dev-dependencies]
criterion = { version = "0.5.1", features = ["html_reports"] }
@@ -72,5 +77,7 @@ harness = false
name = "helmholtz_c64"
harness = false


[[example]]
name = "evaluate_distributed"
required-features = ["mpi"]

105 changes: 105 additions & 0 deletions examples/evaluate_distributed.rs
@@ -0,0 +1,105 @@
//! Distributed evaluation of sources and targets.
use bempp_distributed_tools::IndexLayoutFromLocalCounts;
use green_kernels::traits::*;
use green_kernels::{laplace_3d::Laplace3dKernel, types::GreenKernelEvalType};
use mpi::traits::Communicator;
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;
use rlst::prelude::*;
use rlst::{
    assert_array_relative_eq, rlst_dynamic_array1, DistributedVector, RawAccess, RawAccessMut,
};

fn main() {
    // Create the MPI communicator.
    let universe = mpi::initialize().unwrap();
    let world = universe.world();

    // Number of sources on each process.
    let n_sources = 10000;
    // Number of targets on each process.
    let n_targets = 1000;

    // Init the random number generator.
    let mut rng = ChaCha8Rng::seed_from_u64(0);

    // Create a Laplace kernel.
    let kernel = Laplace3dKernel::<f64>::default();

    // We create index layouts for sources, targets, charges, and results.
    let source_layout = IndexLayoutFromLocalCounts::new(3 * n_sources, &world);
    let target_layout = IndexLayoutFromLocalCounts::new(3 * n_targets, &world);
    let charge_layout = IndexLayoutFromLocalCounts::new(n_sources, &world);
    let result_layout = IndexLayoutFromLocalCounts::new(n_targets, &world);

    // Create the sources and targets.
    let sources = DistributedVector::<_, f64>::new(&source_layout);
    let targets = DistributedVector::<_, f64>::new(&target_layout);

    sources.local_mut().fill_from_equally_distributed(&mut rng);
    targets.local_mut().fill_from_equally_distributed(&mut rng);

    // Create the charges.
    let charges = DistributedVector::<_, f64>::new(&charge_layout);
    charges.local_mut().fill_from_equally_distributed(&mut rng);

    // Create the result vector.
    let mut result = DistributedVector::<_, f64>::new(&result_layout);

    // Evaluate the kernel.
    kernel.evaluate_distributed(
        GreenKernelEvalType::Value,
        &sources,
        &targets,
        &charges,
        &mut result,
        false,
    );

    // We now check the result against an evaluation done only on the first rank.
    if world.rank() != 0 {
        sources.gather_to_rank(0);
        targets.gather_to_rank(0);
        charges.gather_to_rank(0);
        result.gather_to_rank(0);
    } else {
        let sources = {
            let mut tmp = rlst_dynamic_array1!(f64, [3 * n_sources * world.size() as usize]);
            sources.gather_to_rank_root(tmp.r_mut());
            tmp
        };

        let targets = {
            let mut tmp = rlst_dynamic_array1!(f64, [3 * n_targets * world.size() as usize]);
            targets.gather_to_rank_root(tmp.r_mut());
            tmp
        };

        let charges = {
            let mut tmp = rlst_dynamic_array1!(f64, [n_sources * world.size() as usize]);
            charges.gather_to_rank_root(tmp.r_mut());
            tmp
        };

        let result = {
            let mut tmp = rlst_dynamic_array1!(f64, [n_targets * world.size() as usize]);
            result.gather_to_rank_root(tmp.r_mut());
            tmp
        };

        let mut expected = rlst_dynamic_array1!(f64, [n_targets * world.size() as usize]);

        kernel.evaluate_mt(
            GreenKernelEvalType::Value,
            sources.data(),
            targets.data(),
            charges.data(),
            expected.data_mut(),
        );

        assert_array_relative_eq!(result, expected, 1e-13);
    }
}
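
To try this example, one would build it with the `mpi` feature enabled (for instance `cargo build --example evaluate_distributed --features mpi`) and launch the resulting binary through the local MPI launcher, e.g. `mpiexec -n 2 target/debug/examples/evaluate_distributed`; the exact launch command depends on the MPI installation.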
106 changes: 106 additions & 0 deletions src/traits.rs
@@ -1,6 +1,11 @@
//! Trait for Green's function kernels
use crate::types::GreenKernelEvalType;
#[cfg(feature = "mpi")]
use mpi::traits::{Communicator, Equivalence, Root};
use rlst::RlstScalar;
#[cfg(feature = "mpi")]
use rlst::{rlst_dynamic_array1, DistributedVector, IndexLayout, RawAccess, RawAccessMut};

/// Interface to evaluating Green's functions for given sources and targets.
pub trait Kernel: Sync {
@@ -109,3 +114,104 @@ pub trait Kernel: Sync {
    /// given, and `4` if [EvalType::ValueDeriv] is given.
    fn range_component_count(&self, eval_type: GreenKernelEvalType) -> usize;
}

// Note that we cannot simply add the `evaluate_distributed` method to the `Kernel` trait,
// since the C interface is currently implemented by making `Kernel` a trait object.
// This requires that methods do not introduce additional template parameters. We can
// change this again once we move to the better C interface in `c-api-tools`.

/// Distributed evaluation of a Green's function kernel.
///
/// If `use_multithreaded` is set to true, the evaluation uses Rayon multi-threading on each rank.
/// Otherwise, the evaluation on each rank is single-threaded.
#[cfg(feature = "mpi")]
pub trait DistributedKernelEvaluator: Kernel {
    fn evaluate_distributed<
        SourceLayout: IndexLayout,
        TargetLayout: IndexLayout,
        ChargeLayout: IndexLayout,
        ResultLayout: IndexLayout,
    >(
        &self,
        eval_type: GreenKernelEvalType,
        sources: &DistributedVector<'_, SourceLayout, <Self::T as RlstScalar>::Real>,
        targets: &DistributedVector<'_, TargetLayout, <Self::T as RlstScalar>::Real>,
        charges: &DistributedVector<'_, ChargeLayout, Self::T>,
        result: &mut DistributedVector<'_, ResultLayout, Self::T>,
        use_multithreaded: bool,
    ) where
        Self::T: Equivalence,
        <Self::T as RlstScalar>::Real: Equivalence,
    {
        // We require that all vectors use the same communicator.
        assert!(std::ptr::addr_eq(
            sources.index_layout().comm(),
            charges.index_layout().comm()
        ));
        assert!(std::ptr::addr_eq(
            sources.index_layout().comm(),
            targets.index_layout().comm()
        ));
        assert!(std::ptr::addr_eq(
            sources.index_layout().comm(),
            result.index_layout().comm()
        ));

        // Check that the output vector has the correct size.
        assert_eq!(
            self.range_component_count(eval_type)
                * targets.index_layout().number_of_local_indices(),
            3 * result.index_layout().number_of_local_indices()
        );

        let size = sources.index_layout().comm().size();

        // We iterate through each rank associated with the sources and communicate
        // that rank's sources to all target ranks.
        for rank in 0..size as usize {
            // Communicate the sources and charges from `rank` to all ranks.
            let root_process = sources.index_layout().comm().process_at_rank(rank as i32);
            let source_range = sources.index_layout().index_range(rank).unwrap();
            let charge_range = charges.index_layout().index_range(rank).unwrap();
            let nsources = source_range.1 - source_range.0;
            let ncharges = charge_range.1 - charge_range.0;
            // Make sure that the numbers of sources and charges are compatible.
            assert_eq!(nsources, 3 * ncharges);
            let mut root_sources =
                rlst_dynamic_array1!(<Self::T as RlstScalar>::Real, [nsources]);
            let mut root_charges = rlst_dynamic_array1!(Self::T, [ncharges]);
            // If we are on `rank`, fill the sources and charges.
            if sources.index_layout().comm().rank() == rank as i32 {
                root_sources.fill_from(sources.local().r());
                root_charges.fill_from(charges.local().r());
            }

            root_process.broadcast_into(&mut root_sources.data_mut()[..]);
            root_process.broadcast_into(&mut root_charges.data_mut()[..]);

            // We now have the sources and charges on all ranks and can simply evaluate.
            if use_multithreaded {
                self.evaluate_mt(
                    eval_type,
                    &root_sources.data()[..],
                    targets.local().data(),
                    &root_charges.data()[..],
                    result.local_mut().data_mut(),
                );
            } else {
                self.evaluate_st(
                    eval_type,
                    &root_sources.data()[..],
                    targets.local().data(),
                    &root_charges.data()[..],
                    result.local_mut().data_mut(),
                );
            }
        }
    }
}

#[cfg(feature = "mpi")]
impl<K: Kernel> DistributedKernelEvaluator for K {}
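
The comment above about trait objects refers to Rust's dyn-compatibility rules: a trait method that introduces its own generic parameters cannot be called through `dyn Trait`, which is why `evaluate_distributed` lives in a separate extension trait with a blanket implementation instead of on `Kernel` itself. A minimal, self-contained sketch of the issue follows; the trait, struct, and function names are illustrative and not taken from the crate.

// A trait with only non-generic methods can be used as a trait object.
trait ObjectSafe {
    fn eval(&self, x: f64) -> f64;
}

// A trait whose method introduces its own type parameter cannot.
#[allow(dead_code)]
trait NotObjectSafe {
    fn eval_with<L: std::fmt::Debug>(&self, layout: &L) -> f64;
}

struct Twice;

impl ObjectSafe for Twice {
    fn eval(&self, x: f64) -> f64 {
        2.0 * x
    }
}

// Fine: `ObjectSafe` is dyn-compatible, so it can back a trait-object based interface.
fn through_object(k: &dyn ObjectSafe) -> f64 {
    k.eval(1.0)
}

// fn through_object_generic(k: &dyn NotObjectSafe) -> f64 { ... }
// ^ would not compile: `NotObjectSafe` cannot be made into an object because
//   `eval_with` has a generic type parameter. Hence the extension-trait pattern
//   with the blanket `impl<K: Kernel> DistributedKernelEvaluator for K {}` above.

fn main() {
    println!("{}", through_object(&Twice));
}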
