Skip to content

Commit

Permalink
Progress
Browse files Browse the repository at this point in the history
  • Loading branch information
skailasa committed Dec 11, 2024
1 parent f193be9 commit 9385b2c
Show file tree
Hide file tree
Showing 14 changed files with 160 additions and 39 deletions.
14 changes: 11 additions & 3 deletions kifmm/examples/mpi_test_fmm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ fn main() {
datatype::PartitionMut,
traits::{Communicator, Root},
};
use rlst::RawAccess;
use rlst::{rlst_dynamic_array2, RawAccess, RawAccessMut};

let (universe, _threading) = mpi::initialize_with_threading(mpi::Threading::Funneled).unwrap();
let world = universe.world();
Expand Down Expand Up @@ -45,6 +45,10 @@ fn main() {

// Generate some random test data local to each process
let points = points_fixture::<f32>(n_points, None, None, Some(world.rank() as u64));
let nvecs = 1;
let tmp = vec![1.0; n_points];
let mut charges = rlst_dynamic_array2!(f32, [n_points, 1]);
charges.data_mut().copy_from_slice(&tmp);

let mut multi_fmm = MultiNodeBuilder::new(false)
.tree(
Expand All @@ -57,7 +61,7 @@ fn main() {
sort_kind.clone(),

Check warning on line 61 in kifmm/examples/mpi_test_fmm.rs

View workflow job for this annotation

GitHub Actions / Rust style checks (--features "strict")

Diff in /home/runner/work/kifmm/kifmm/kifmm/examples/mpi_test_fmm.rs

Check warning on line 61 in kifmm/examples/mpi_test_fmm.rs

View workflow job for this annotation

GitHub Actions / Rust style checks

Diff in /home/runner/work/kifmm/kifmm/kifmm/examples/mpi_test_fmm.rs
)
.unwrap()
.parameters(expansion_order, kernel.clone(), source_to_target)
.parameters(charges.data(), expansion_order, kernel.clone(), source_to_target)
.unwrap()
.build()
.unwrap();
Expand Down Expand Up @@ -158,6 +162,10 @@ fn main() {

// Generate some random test data local to each process
let points = points_fixture::<f32>(n_points, None, None, Some(world.rank() as u64));
let nvecs = 1;
let tmp = vec![1.0; n_points];
let mut charges = rlst_dynamic_array2!(f32, [n_points, 1]);
charges.data_mut().copy_from_slice(&tmp);

let mut multi_fmm = MultiNodeBuilder::new(false)
.tree(
Expand All @@ -170,7 +178,7 @@ fn main() {
sort_kind,
)
.unwrap()
.parameters(expansion_order, kernel, source_to_target)
.parameters(charges.data(), expansion_order, kernel, source_to_target)
.unwrap()
.build()
.unwrap();
Expand Down
8 changes: 6 additions & 2 deletions kifmm/examples/mpi_test_layout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ fn main() {
datatype::PartitionMut,
traits::{Communicator, CommunicatorCollectives},
};
use rlst::RawAccess;
use rlst::{rlst_dynamic_array2, RawAccess, RawAccessMut};

let (universe, _threading) = mpi::initialize_with_threading(mpi::Threading::Single).unwrap();
let world = universe.world();
Expand All @@ -40,6 +40,10 @@ fn main() {

// Generate some random test data local to each process
let points = points_fixture::<f32>(n_points, None, None, None);
let nvecs = 1;
let tmp = vec![1.0; n_points];
let mut charges = rlst_dynamic_array2!(f32, [n_points, 1]);
charges.data_mut().copy_from_slice(&tmp);

ThreadPoolBuilder::new()
.num_threads(1)
Expand All @@ -58,7 +62,7 @@ fn main() {
sort_kind,
)
.unwrap()
.parameters(expansion_order, kernel, source_to_target)
.parameters(charges.data(), expansion_order, kernel, source_to_target)
.unwrap()
.build()
.unwrap();
Expand Down
7 changes: 5 additions & 2 deletions kifmm/examples/mpi_test_p2m.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn main() {
datatype::PartitionMut,
traits::{Communicator, Root},
};
use rlst::RawAccess;
use rlst::{rlst_dynamic_array2, RawAccess, RawAccessMut};

let (universe, _threading) = mpi::initialize_with_threading(mpi::Threading::Single).unwrap();
let world = universe.world();
Expand All @@ -44,6 +44,9 @@ fn main() {

// Generate some random test data local to each process
let points = points_fixture::<f32>(n_points, None, None, None);
let tmp = vec![1.0; n_points];
let mut charges = rlst_dynamic_array2!(f32, [n_points, 1]);
charges.data_mut().copy_from_slice(&tmp);

ThreadPoolBuilder::new()
.num_threads(1)
Expand All @@ -62,7 +65,7 @@ fn main() {
sort_kind,
)
.unwrap()
.parameters(expansion_order, kernel, source_to_target)
.parameters(charges.data(), expansion_order, kernel, source_to_target)
.unwrap()
.build()
.unwrap();
Expand Down
7 changes: 5 additions & 2 deletions kifmm/examples/mpi_test_trees.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fn main() {
datatype::PartitionMut,
traits::{Communicator, Root},
};
use rlst::RawAccess;
use rlst::{rlst_dynamic_array2, RawAccess, RawAccessMut};

let (universe, _threading) = mpi::initialize_with_threading(mpi::Threading::Single).unwrap();
let world = universe.world();
Expand All @@ -41,6 +41,9 @@ fn main() {

// Generate some random test data local to each process
let points = points_fixture::<f32>(n_points, None, None, Some(world.rank() as u64));
let tmp = vec![1.0; n_points];
let mut charges = rlst_dynamic_array2!(f32, [n_points, 1]);
charges.data_mut().copy_from_slice(&tmp);

ThreadPoolBuilder::new()
.num_threads(1)
Expand All @@ -58,7 +61,7 @@ fn main() {
sort_kind,
)
.unwrap()
.parameters(expansion_order, kernel, source_to_target)
.parameters(charges.data(), expansion_order, kernel, source_to_target)
.unwrap()
.build()
.unwrap();
Expand Down
7 changes: 5 additions & 2 deletions kifmm/examples/mpi_test_upward_pass.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ fn main() {
datatype::PartitionMut,
traits::{Communicator, Root},
};
use rlst::{RawAccess, RlstScalar};
use rlst::{rlst_dynamic_array2, RawAccess, RawAccessMut, RlstScalar};

let (universe, _threading) = mpi::initialize_with_threading(mpi::Threading::Funneled).unwrap();
let world = universe.world();
Expand All @@ -38,6 +38,9 @@ fn main() {

// Generate some random test data local to each process
let points = points_fixture::<f32>(n_points, None, None, None);
let tmp = vec![1.0; n_points];
let mut charges = rlst_dynamic_array2!(f32, [n_points, 1]);
charges.data_mut().copy_from_slice(&tmp);

let mut fmm = MultiNodeBuilder::new(false)
.tree(
Expand All @@ -50,7 +53,7 @@ fn main() {
sort_kind,
)
.unwrap()
.parameters(expansion_order, kernel, source_to_target)
.parameters(charges.data(), expansion_order, kernel, source_to_target)
.unwrap()
.build()
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion kifmm/src/fmm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod builder;
pub mod constants;
mod data_access;
mod eval;
mod field_translation;
pub mod field_translation;
pub mod helpers;
pub mod isa;
mod kernel;
Expand Down
70 changes: 68 additions & 2 deletions kifmm/src/fmm/builder/multi_node.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use itertools::Itertools;

use mpi::{

Check warning on line 3 in kifmm/src/fmm/builder/multi_node.rs

View workflow job for this annotation

GitHub Actions / Rust style checks (--features "strict")

Diff in /home/runner/work/kifmm/kifmm/kifmm/src/fmm/builder/multi_node.rs

Check warning on line 3 in kifmm/src/fmm/builder/multi_node.rs

View workflow job for this annotation

GitHub Actions / Rust style checks

Diff in /home/runner/work/kifmm/kifmm/kifmm/src/fmm/builder/multi_node.rs
datatype::{Partition, PartitionMut},
topology::{Communicator, SimpleCommunicator},
traits::Equivalence,
traits::{Equivalence, CommunicatorCollectives}, Count,
};
use num::Float;
use rlst::{rlst_dynamic_array2, MatrixSvd, RlstScalar};
Expand All @@ -22,6 +25,7 @@ use crate::{
fmm::{HomogenousKernel, Metadata, MetadataAccess},
general::{multi_node::GlobalFmmMetadata, single_node::Epsilon},
types::{CommunicationTime, CommunicationType, MetadataTime, MetadataType},
tree::MultiTree,

Check failure on line 28 in kifmm/src/fmm/builder/multi_node.rs

View workflow job for this annotation

GitHub Actions / Run Rust tests (stable, mpich, --features "strict")

unused import: `tree::MultiTree`

Check failure on line 28 in kifmm/src/fmm/builder/multi_node.rs

View workflow job for this annotation

GitHub Actions / Build docs

unused import: `tree::MultiTree`

Check failure on line 28 in kifmm/src/fmm/builder/multi_node.rs

View workflow job for this annotation

GitHub Actions / Run Rust tests (stable, openmpi, --features "strict")

unused import: `tree::MultiTree`
},
tree::{
types::{Domain, SortKind},
Expand Down Expand Up @@ -206,6 +210,7 @@ where
/// Parameters
pub fn parameters(
mut self,
charges: &[Scalar],
expansion_order: usize,
kernel: Kernel,
source_to_target: FieldTranslation,
Expand All @@ -216,7 +221,68 @@ where
"Must build tree before specifying FMM parameters",

Check warning on line 221 in kifmm/src/fmm/builder/multi_node.rs

View workflow job for this annotation

GitHub Actions / Rust style checks (--features "strict")

Diff in /home/runner/work/kifmm/kifmm/kifmm/src/fmm/builder/multi_node.rs

Check warning on line 221 in kifmm/src/fmm/builder/multi_node.rs

View workflow job for this annotation

GitHub Actions / Rust style checks

Diff in /home/runner/work/kifmm/kifmm/kifmm/src/fmm/builder/multi_node.rs
))
} else {
// TODO: Mapping of global indices needs to happen here eventually.

let global_indices = &self
.tree
.as_ref()
.unwrap()
.source_tree

Check warning on line 229 in kifmm/src/fmm/builder/multi_node.rs

View workflow job for this annotation

GitHub Actions / Rust style checks (--features "strict")

Diff in /home/runner/work/kifmm/kifmm/kifmm/src/fmm/builder/multi_node.rs

Check warning on line 229 in kifmm/src/fmm/builder/multi_node.rs

View workflow job for this annotation

GitHub Actions / Rust style checks

Diff in /home/runner/work/kifmm/kifmm/kifmm/src/fmm/builder/multi_node.rs
.unsorted_global_indices;

let destination_ranks = &self
.tree
.as_ref()
.unwrap()
.source_tree
.destination_ranks;

let packets= charges.iter().zip(global_indices.iter())
.map(|(c, gidx)| (c, gidx)).collect_vec();

// Communicate message sizes
let size = self.tree.as_ref().unwrap().source_tree.communicator.size();
let communicator = &self.tree.as_ref().unwrap().source_tree.communicator;

let mut counts_snd = vec![0i32; size as usize];
for &destination_rank in destination_ranks.iter() {
counts_snd[destination_rank as usize] += 1
}

let displs_snd = counts_snd
.iter()
.scan(0, |acc, &x| {
let tmp = *acc;
*acc += x;
Some(tmp)
})
.collect_vec();

let mut counts_recv = vec![0 as Count; size as usize];

communicator.all_to_all_into(&counts_snd, &mut counts_recv);

// Communicate packets
let displs_recv = counts_recv
.iter()
.scan(0, |acc, &x| {
let tmp = *acc;
*acc += x;
Some(tmp)
})
.collect_vec();

let total = counts_recv.iter().sum::<Count>();

let total = counts_recv.iter().sum::<Count>();

let mut received = vec![T::default(); total as usize];

Check failure on line 278 in kifmm/src/fmm/builder/multi_node.rs

View workflow job for this annotation

GitHub Actions / Run Rust tests (stable, mpich, --features "strict")

failed to resolve: use of undeclared type `T`

Check failure on line 278 in kifmm/src/fmm/builder/multi_node.rs

View workflow job for this annotation

GitHub Actions / Build docs

failed to resolve: use of undeclared type `T`

Check failure on line 278 in kifmm/src/fmm/builder/multi_node.rs

View workflow job for this annotation

GitHub Actions / Run Rust tests (stable, openmpi, --features "strict")

failed to resolve: use of undeclared type `T`
let mut partition_received =

Check warning on line 279 in kifmm/src/fmm/builder/multi_node.rs

View workflow job for this annotation

GitHub Actions / Rust style checks (--features "strict")

Diff in /home/runner/work/kifmm/kifmm/kifmm/src/fmm/builder/multi_node.rs

Check warning on line 279 in kifmm/src/fmm/builder/multi_node.rs

View workflow job for this annotation

GitHub Actions / Rust style checks

Diff in /home/runner/work/kifmm/kifmm/kifmm/src/fmm/builder/multi_node.rs
PartitionMut::new(&mut received[..], counts_recv, &displs_recv[..]);
let partition_snd = Partition::new(&packets[..], counts_snd, &displs_snd[..]);

Check failure on line 281 in kifmm/src/fmm/builder/multi_node.rs

View workflow job for this annotation

GitHub Actions / Run Rust tests (stable, mpich, --features "strict")

Buffer` is not satisfied

Check failure on line 281 in kifmm/src/fmm/builder/multi_node.rs

View workflow job for this annotation

GitHub Actions / Build docs

Buffer` is not satisfied

Check failure on line 281 in kifmm/src/fmm/builder/multi_node.rs

View workflow job for this annotation

GitHub Actions / Run Rust tests (stable, openmpi, --features "strict")

Buffer` is not satisfied




self.n_coeffs_equivalent_surface = Some(ncoeffs_kifmm(expansion_order));
self.n_coeffs_check_surface = Some(ncoeffs_kifmm(expansion_order));
self.kernel = Some(kernel);
Expand Down
16 changes: 9 additions & 7 deletions kifmm/src/sorting/samplesort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use mpi::{
datatype::{Partition, PartitionMut},
topology::SimpleCommunicator,
traits::{Communicator, CommunicatorCollectives, Equivalence},
Count,
Count, Rank,
};
use rand::{thread_rng, Rng};

Expand All @@ -21,7 +21,7 @@ pub fn samplesort<T>(
array: &mut Vec<T>,
communicator: &SimpleCommunicator,
n_samples: usize,
) -> Result<(), std::io::Error>
) -> Result<Option<Vec<Rank>>, std::io::Error>
where
T: Equivalence + Ord + Default + Clone + Debug,
{
Expand Down Expand Up @@ -67,7 +67,7 @@ where
.step_by(n_samples)
.collect_vec();

let mut ranks = Vec::new();
let mut destination_ranks = Vec::new();
for item in array.iter() {
let mut rank_index = -1i32;
for (i, splitter) in splitters.iter().enumerate() {
Expand All @@ -80,12 +80,12 @@ where
}
}

ranks.push(rank_index);
destination_ranks.push(rank_index);
}

let mut counts_snd = vec![0i32; size as usize];
for &rank in ranks.iter() {
counts_snd[rank as usize] += 1
for &destination_rank in destination_ranks.iter() {
counts_snd[destination_rank as usize] += 1
}

let displs_snd = counts_snd
Expand Down Expand Up @@ -120,7 +120,9 @@ where
communicator.all_to_all_varcount_into(&partition_snd, &mut partition_received);
received.sort();

Check warning on line 121 in kifmm/src/sorting/samplesort.rs

View workflow job for this annotation

GitHub Actions / Rust style checks (--features "strict")

Diff in /home/runner/work/kifmm/kifmm/kifmm/src/sorting/samplesort.rs

Check warning on line 121 in kifmm/src/sorting/samplesort.rs

View workflow job for this annotation

GitHub Actions / Rust style checks

Diff in /home/runner/work/kifmm/kifmm/kifmm/src/sorting/samplesort.rs
*array = received;

return Ok(Some(destination_ranks))
}

Ok(())
Ok(None)
}
14 changes: 8 additions & 6 deletions kifmm/src/sorting/simplesort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use mpi::{
datatype::{Partition, PartitionMut},
topology::SimpleCommunicator,
traits::{Communicator, CommunicatorCollectives, Equivalence},
Count,
Count, Rank,
};

/// Simple sort is a bucket style sort specialised for octrees. In this case, the number of 'splitters'
Expand All @@ -26,7 +26,7 @@ pub fn simplesort<T>(
array: &mut Vec<T>,
communicator: &SimpleCommunicator,
splitters: &mut [T],
) -> Result<(), std::io::Error>
) -> Result<Option<Vec<Rank>>, std::io::Error>
where
T: Equivalence + Ord + Default + Clone + Debug,
{
Expand All @@ -47,7 +47,7 @@ where

if size > 1 {
// Sort local data into buckets
let mut ranks = Vec::new();
let mut destination_ranks = Vec::new();
for item in array.iter() {
let mut rank_index = -1i32;
for (i, splitter) in splitters.iter().enumerate() {
Expand All @@ -60,11 +60,11 @@ where
}
}

ranks.push(rank_index);
destination_ranks.push(rank_index);
}

let mut counts_snd = vec![0i32; size as usize];
for &rank in ranks.iter() {
for &rank in destination_ranks.iter() {
counts_snd[rank as usize] += 1
}

Expand Down Expand Up @@ -101,7 +101,9 @@ where
received.sort();

Check warning on line 102 in kifmm/src/sorting/simplesort.rs

View workflow job for this annotation

GitHub Actions / Rust style checks (--features "strict")

Diff in /home/runner/work/kifmm/kifmm/kifmm/src/sorting/simplesort.rs

Check warning on line 102 in kifmm/src/sorting/simplesort.rs

View workflow job for this annotation

GitHub Actions / Rust style checks

Diff in /home/runner/work/kifmm/kifmm/kifmm/src/sorting/simplesort.rs
*array = received;

return Ok(Some(destination_ranks))
}

Ok(())
Ok(None)
}
5 changes: 4 additions & 1 deletion kifmm/src/traits/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ pub trait SingleTree {
/// - `leaf` - node being queried.
fn global_indices(&self, leaf: &Self::Node) -> Option<&[usize]>;

/// gets all global indices (local in mult inode setting)
/// gets all global indices
fn all_global_indices(&self) -> Option<&[usize]>;

/// Get domain defined by the points, gets global domain in multi node setting.
Expand Down Expand Up @@ -217,6 +217,9 @@ pub trait MultiTree {
/// - `idx` - Index being query.
fn node(&self, idx: usize) -> Option<&<Self::SingleTree as SingleTree>::Node>;

/// gets all global indices
fn all_global_indices(&self) -> Option<&[usize]>;

/// Get domain defined by the points across all nodes.
fn domain(&self) -> &<Self::SingleTree as SingleTree>::Domain;
}
Expand Down
Loading

0 comments on commit 9385b2c

Please sign in to comment.