bind replication threads to specific cores #1305

Merged 1 commit on Oct 12, 2020
10 changes: 10 additions & 0 deletions .circleci/config.yml
@@ -313,6 +313,16 @@ jobs:
steps:
- configure_environment_variables
- checkout
- run:
name: Install hwloc 1.11.9
command: |
cd /tmp
curl https://www.open-mpi.org/software/hwloc/v1.11/downloads/hwloc-1.11.9.tar.gz --location --output /tmp/hwloc-1.11.9.tar.gz
tar xzvf hwloc-1.11.9.tar.gz
cd hwloc-1.11.9
./configure
make
sudo make install
- run:
name: Install Rust
command: |
17 changes: 16 additions & 1 deletion README.md
@@ -54,7 +54,10 @@ The instructions below assume you have independently installed `rust-fil-proofs`

**NOTE:** `rust-fil-proofs` can only be built for and run on 64-bit platforms; building will panic if the target architecture is not 64-bits.

Before building you will need OpenCL to be installed, on Ubuntu this can be achieved with `apt install ocl-icd-opencl-dev`. Other system dependencies such as 'gcc/clang', 'wall' and 'cmake' are also required.
Before building you will need OpenCL to be installed. On Ubuntu, this can be achieved with `apt install ocl-icd-opencl-dev`. Other system dependencies such as 'gcc/clang', 'wall' and 'cmake' are also required.

You will also need to install the hwloc library. On Ubuntu, this can be achieved with `apt install hwloc libhwloc-dev`. For other platforms, please see the [hwloc-rs Prerequisites section](https://github.com/daschl/hwloc-rs).


```
> cargo build --release --all
@@ -179,6 +182,18 @@ accomplished either by running the process as root, or by increasing the system
-l`. Two sector sizes' worth of data (for current and previous layers) must be locked -- along with 56 *
`FIL_PROOFS_PARENT_CACHE_SIZE` bytes for the parent cache.

Default parameters have been tuned to provide good performance on the AMD Ryzen Threadripper 3970x. It may be useful to
experiment with these, especially on different hardware. We have made an effort to use sensible heuristics and to ensure
reasonable behavior for a range of configurations and hardware, but the actual performance and behavior of multicore
replication are not yet well tested except on our target. The following settings may be useful, but do expect some
failures while searching for good parameters. These might take the form of failed replication (bad proofs), errors during
replication, or even crashes if the parameters prove pathological. For now, this is an experimental feature, and
only the default configuration on the default hardware (3970x) is known to work well.

`FIL_PROOFS_MULTICORE_SDR_PRODUCERS`: This is the number of worker threads loading node parents in parallel. The default is `3`, so the producers and main thread together use a full core complex (but no more).

`FIL_PROOFS_MULTICORE_SDR_PRODUCER_STRIDE`: This is the (max) number of nodes for which a producer thread will load parents in each iteration of its loop. The default is `128`.

`FIL_PROOFS_MULTICORE_SDR_LOOKAHEAD`: This is the size of the lookahead buffer into which node parents are pre-loaded by the producer threads. The default is `800`.
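
For example, the variables can be set in the environment of whatever process performs replication; the final command below is only a placeholder, not a documented invocation:

```
> FIL_PROOFS_MULTICORE_SDR_PRODUCERS=3 \
  FIL_PROOFS_MULTICORE_SDR_PRODUCER_STRIDE=128 \
  FIL_PROOFS_MULTICORE_SDR_LOOKAHEAD=800 \
  <replication command>
```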

### GPU Usage

We can now optionally build the column hashed tree 'tree_c' using the GPU with noticeable speed-up over the CPU. To activate the GPU for this, use the environment variable
6 changes: 6 additions & 0 deletions storage-proofs/core/src/settings.rs
@@ -31,6 +31,9 @@ pub struct Settings {
pub parent_cache: String,
pub use_fil_blst: bool,
pub use_multicore_sdr: bool,
pub multicore_sdr_producers: usize,
pub multicore_sdr_producer_stride: u64,
pub multicore_sdr_lookahead: usize,
}

impl Default for Settings {
@@ -54,6 +57,9 @@ impl Default for Settings {
parent_cache: cache("filecoin-parents"),
use_fil_blst: false,
use_multicore_sdr: false,
multicore_sdr_producers: 3,
multicore_sdr_producer_stride: 128,
multicore_sdr_lookahead: 800,
}
}
}
2 changes: 2 additions & 0 deletions storage-proofs/porep/Cargo.toml
@@ -37,6 +37,8 @@ bincode = "1.1.2"
byteorder = "1.3.4"
lazy_static = "1.2"
byte-slice-cast = "0.3.5"
hwloc = "0.3.0"
libc = "0.2"

[dev-dependencies]
tempfile = "3"
219 changes: 219 additions & 0 deletions storage-proofs/porep/src/stacked/vanilla/cores.rs
@@ -0,0 +1,219 @@
use log::*;
use std::sync::{Mutex, MutexGuard};

use anyhow::Result;
use hwloc::{ObjectType, Topology, TopologyObject, CPUBIND_THREAD};
use lazy_static::lazy_static;

use storage_proofs_core::settings;

type CoreGroup = Vec<CoreIndex>;
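// `CORE_GROUPS` is computed once, on first use. Each group holds one core per producer thread plus
// one for the unit's main thread, so a full replication unit can be bound to a single group.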
lazy_static! {
pub static ref TOPOLOGY: Mutex<Topology> = Mutex::new(Topology::new());
pub static ref CORE_GROUPS: Option<Vec<Mutex<CoreGroup>>> = {
let settings = settings::SETTINGS.lock().expect("settings lock failure");
let num_producers = settings.multicore_sdr_producers;
let cores_per_unit = num_producers + 1;

core_groups(cores_per_unit)
};
}

#[derive(Clone, Copy, Debug, PartialEq)]
/// `CoreIndex` is a simple wrapper type for indexes into the set of visible cores. A `CoreIndex` should only ever be
/// created with a value known to be less than the number of visible cores.
pub struct CoreIndex(usize);

pub fn checkout_core_group() -> Option<MutexGuard<'static, CoreGroup>> {
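// Try each group's mutex in turn and return the first one that is free; if every group is busy
// (or no groups could be built for this topology), the caller proceeds without a reservation.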
match &*CORE_GROUPS {
Some(groups) => {
for (i, group) in groups.iter().enumerate() {
match group.try_lock() {
Ok(guard) => {
debug!("checked out core group {}", i);
return Some(guard);
}
Err(_) => debug!("core group {} locked, could not checkout", i),
}
}
None
}
None => None,
}
}

#[cfg(not(target_os = "windows"))]
pub type ThreadId = libc::pthread_t;

#[cfg(target_os = "windows")]
pub type ThreadId = winapi::winnt::HANDLE;

/// Helper method to get the thread id through libc; with current stable Rust (1.5.0) it's not
/// otherwise possible, I think.
#[cfg(not(target_os = "windows"))]
fn get_thread_id() -> ThreadId {
unsafe { libc::pthread_self() }
}

#[cfg(target_os = "windows")]
fn get_thread_id() -> ThreadId {
unsafe { kernel32::GetCurrentThread() }
}

pub struct Cleanup {
tid: ThreadId,
prior_state: Option<hwloc::Bitmap>,
}

impl Drop for Cleanup {
fn drop(&mut self) {
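// Restore the thread's previous CPU binding, if one was captured when the core was bound.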
match self.prior_state.take() {
Some(prior) => {
let child_topo = &TOPOLOGY;
let mut locked_topo = child_topo.lock().unwrap();
let _ = locked_topo.set_cpubind_for_thread(self.tid, prior, CPUBIND_THREAD);
}
None => (),
}
}
}

pub fn bind_core(core_index: CoreIndex) -> Result<Cleanup> {
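// Bind the calling thread to the given core: narrow the core's allowed cpuset to one logical
// processor, remember the prior binding, and return a `Cleanup` guard that restores it on drop.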
let child_topo = &TOPOLOGY;
let tid = get_thread_id();
let mut locked_topo = child_topo.lock().unwrap();
let core = get_core_by_index(&locked_topo, core_index).map_err(|err| {
anyhow::format_err!("failed to get core at index {}: {:?}", core_index.0, err)
})?;

let cpuset = core.allowed_cpuset().ok_or_else(|| {
anyhow::format_err!("no allowed cpuset for core at index {}", core_index.0,)
})?;
debug!("allowed cpuset: {:?}", cpuset);
let mut bind_to = cpuset;

// Get only one logical processor (in case the core is SMT/hyper-threaded).
bind_to.singlify();

// Thread binding before explicit set.
let before = locked_topo.get_cpubind_for_thread(tid, CPUBIND_THREAD);

debug!("binding to {:?}", bind_to);
// Set the binding.
let result = locked_topo
.set_cpubind_for_thread(tid, bind_to, CPUBIND_THREAD)
.map_err(|err| anyhow::format_err!("failed to bind CPU: {:?}", err));

if result.is_err() {
warn!("error in bind_core, {:?}", result);
}

Ok(Cleanup {
tid,
prior_state: before,
})
}

fn get_core_by_index<'a>(topo: &'a Topology, index: CoreIndex) -> Result<&'a TopologyObject> {
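// Resolve the hwloc core object for this index, erroring if the index exceeds the visible cores.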
let idx = index.0;

match topo.objects_with_type(&ObjectType::Core) {
Ok(all_cores) if idx < all_cores.len() => Ok(all_cores[idx]),
Ok(all_cores) => Err(anyhow::format_err!(
"idx ({}) out of range for {} cores",
idx,
all_cores.len()
)),
_e => Err(anyhow::format_err!("failed to get core by index {}", idx,)),
}
}

fn core_groups(cores_per_unit: usize) -> Option<Vec<Mutex<Vec<CoreIndex>>>> {
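// Group cores by shared cache where the topology exposes more than one (e.g. a 3970x with 32 cores
// and 8 shared L3 caches yields 8 groups of 4); otherwise just chunk cores into groups of
// `cores_per_unit`.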
let topo = TOPOLOGY.lock().unwrap();

let core_depth = match topo.depth_or_below_for_type(&ObjectType::Core) {
Ok(depth) => depth,
Err(_) => return None,
};
let all_cores = topo.objects_with_type(&ObjectType::Core).unwrap();
let core_count = all_cores.len();

let mut cache_depth = core_depth;
let mut cache_count = 0;

while cache_depth > 0 {
let objs = topo.objects_at_depth(cache_depth);
let obj_count = objs.len();
if obj_count < core_count {
cache_count = obj_count;
break;
}

cache_depth -= 1;
}

assert_eq!(0, core_count % cache_count);
let mut group_size = core_count / cache_count;
let mut group_count = cache_count;

if cache_count <= 1 {
// If there is no more than one shared cache, there is no benefit in trying to group cores by cache.
// In that case, prefer more groups so we can still bind cores and also get some parallelism.
// Create as many full groups as possible. The last group may not be full.
group_count = core_count / cores_per_unit;
group_size = cores_per_unit;

info!(
"found only {} shared cache(s), heuristically grouping cores into {} groups",
cache_count, group_count
);
} else {
debug!(
"Cores: {}, Shared Caches: {}, cores per cache (group_size): {}",
core_count, cache_count, group_size
);
}

let core_groups = (0..group_count)
.map(|i| {
(0..group_size)
.map(|j| {
let core_index = i * group_size + j;
assert!(core_index < core_count);
CoreIndex(core_index)
})
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();

Some(
core_groups
.iter()
.map(|group| Mutex::new(group.clone()))
.collect::<Vec<_>>(),
)
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_cores() {
core_groups(2);
}

#[test]
fn test_checkout_cores() {
let checkout1 = checkout_core_group();
dbg!(&checkout1);
let checkout2 = checkout_core_group();
dbg!(&checkout2);

// This test might fail if run on a machine with fewer than four cores.
match (checkout1, checkout2) {
(Some(c1), Some(c2)) => assert!(*c1 != *c2),
_ => panic!("failed to get two checkouts"),
}
}
}
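
For context, here is a minimal sketch of how a worker might use this module, assuming `checkout_core_group` and `bind_core` are in scope; the wrapper function and its argument are illustrative only, not part of this change:

```
// Illustrative only: reserve a core group, pin the current thread to its first core, run the
// work, then release both the binding and the group reservation when the guards drop.
fn run_unit_with_binding(work: impl FnOnce()) {
    let group = checkout_core_group();

    // Bind to the first core in the group, if a group was available. `_cleanup` restores the
    // thread's previous binding when it goes out of scope; a failed bind is simply ignored here.
    let _cleanup = group
        .as_ref()
        .and_then(|g| g.first().copied())
        .map(bind_core)
        .and_then(Result::ok);

    work();
}
```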