Skip to content

Commit

Permalink
fix: switch to yastl thread pool
Browse files Browse the repository at this point in the history
This avoids all deadlocks as so far observed. Also uses a more flexible channel
for sync
  • Loading branch information
dignifiedquire committed Jul 23, 2021
1 parent d535fb8 commit 3983d18
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 14 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,6 @@ members = [
"sha2raw",
"filecoin-hashers",
]

[patch.crates-io]
bellperson = { git = "https://github.com/filecoin-project/bellperson", branch = "feat-threadpool" }
1 change: 1 addition & 0 deletions storage-proofs-porep/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ hwloc = { version = "0.3.0", optional = true }
libc = "0.2"
fdlimit = "0.2.0"
fr32 = { path = "../fr32", version = "^1.0.0", default-features = false }
yastl = "0.1.2"

[target."cfg(target_arch = \"aarch64\")".dependencies]
sha2 = { version = "0.9.3", features = ["compress", "asm"] }
Expand Down
33 changes: 19 additions & 14 deletions storage-proofs-porep/src/stacked/vanilla/proof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use storage_proofs_core::{
settings::SETTINGS,
util::{default_rows_to_discard, NODE_SIZE},
};
use yastl::Pool;

use crate::{
encode::{decode, encode},
Expand Down Expand Up @@ -59,6 +60,8 @@ lazy_static! {
/// It might be possible to relax this constraint, but in that case, only one builder
/// should actually be active at any given time, so the mutex should still be used.
static ref GPU_LOCK: Mutex<()> = Mutex::new(());

static ref THREAD_POOL: Pool = Pool::new(num_cpus::get());
}

#[derive(Debug)]
Expand Down Expand Up @@ -478,8 +481,9 @@ impl<'a, Tree: 'static + MerkleTreeTrait, G: 'static + Hasher> StackedDrg<'a, Tr
ColumnArity: 'static + PoseidonArity,
TreeArity: PoseidonArity,
{
use crossbeam::channel::unbounded as channel;
use std::cmp::min;
use std::sync::{mpsc::sync_channel, Arc, RwLock};
use std::sync::{Arc, RwLock};

use bellperson::bls::Fr;
use fr32::fr_into_bytes;
Expand Down Expand Up @@ -509,14 +513,14 @@ impl<'a, Tree: 'static + MerkleTreeTrait, G: 'static + Hasher> StackedDrg<'a, Tr
let column_write_batch_size = SETTINGS.column_write_batch_size as usize;

// This channel will receive batches of columns and add them to the ColumnTreeBuilder.
let (builder_tx, builder_rx) = sync_channel(0);
let (builder_tx, builder_rx) = channel();

let config_count = configs.len(); // Don't move config into closure below.
rayon::scope(|s| {
THREAD_POOL.scoped(|s| {
// This channel will receive the finished tree data to be written to disk.
let (writer_tx, writer_rx) = sync_channel::<(Vec<Fr>, Vec<Fr>)>(0);
let (writer_tx, writer_rx) = channel::<(Vec<Fr>, Vec<Fr>)>();

s.spawn(move |_| {
s.execute(move || {
for i in 0..config_count {
let mut node_index = 0;
let builder_tx = builder_tx.clone();
Expand Down Expand Up @@ -585,7 +589,7 @@ impl<'a, Tree: 'static + MerkleTreeTrait, G: 'static + Hasher> StackedDrg<'a, Tr
}
}
});
s.spawn(move |_| {
s.execute(move || {
let _gpu_lock = GPU_LOCK.lock().expect("failed to get gpu lock");
let mut column_tree_builder = ColumnTreeBuilder::<ColumnArity, TreeArity>::new(
Some(BatcherType::OpenCL),
Expand Down Expand Up @@ -735,7 +739,7 @@ impl<'a, Tree: 'static + MerkleTreeTrait, G: 'static + Hasher> StackedDrg<'a, Tr
let mut hashes: Vec<<Tree::Hasher as Hasher>::Domain> =
vec![<Tree::Hasher as Hasher>::Domain::default(); nodes_count];

rayon::scope(|s| {
THREAD_POOL.scoped(|s| {
let n = num_cpus::get();

// only split if we have at least two elements per thread
Expand All @@ -748,7 +752,7 @@ impl<'a, Tree: 'static + MerkleTreeTrait, G: 'static + Hasher> StackedDrg<'a, Tr
for (chunk, hashes_chunk) in hashes.chunks_mut(chunk_size).enumerate() {
let labels = &labels;

s.spawn(move |_| {
s.execute(move || {
for (j, hash) in hashes_chunk.iter_mut().enumerate() {
let data: Vec<_> = (1..=layers)
.map(|layer| {
Expand Down Expand Up @@ -850,10 +854,10 @@ impl<'a, Tree: 'static + MerkleTreeTrait, G: 'static + Hasher> StackedDrg<'a, Tr
where
TreeArity: PoseidonArity,
{
use crossbeam::channel::unbounded as channel;
use std::cmp::min;
use std::fs::OpenOptions;
use std::io::Write;
use std::sync::mpsc::sync_channel;

use bellperson::bls::Fr;
use fr32::fr_into_bytes;
Expand All @@ -877,15 +881,16 @@ impl<'a, Tree: 'static + MerkleTreeTrait, G: 'static + Hasher> StackedDrg<'a, Tr
let max_gpu_tree_batch_size = SETTINGS.max_gpu_tree_batch_size as usize;

// This channel will receive batches of leaf nodes and add them to the TreeBuilder.
let (builder_tx, builder_rx) = sync_channel::<(Vec<Fr>, bool)>(0);
let (builder_tx, builder_rx) = channel::<(Vec<Fr>, bool)>();
let config_count = configs.len(); // Don't move config into closure below.
let configs = &configs;
let tree_r_last_config = &tree_r_last_config;
rayon::scope(|s| {

THREAD_POOL.scoped(|s| {
// This channel will receive the finished tree data to be written to disk.
let (writer_tx, writer_rx) = sync_channel::<Vec<Fr>>(0);
let (writer_tx, writer_rx) = channel::<Vec<Fr>>();

s.spawn(move |_| {
s.execute(move || {
for i in 0..config_count {
let mut node_index = 0;
while node_index != nodes_count {
Expand Down Expand Up @@ -959,7 +964,7 @@ impl<'a, Tree: 'static + MerkleTreeTrait, G: 'static + Hasher> StackedDrg<'a, Tr
}
}
});
s.spawn(move |_| {
s.execute(move || {
let _gpu_lock = GPU_LOCK.lock().expect("failed to get gpu lock");
let mut tree_builder = TreeBuilder::<Tree::Arity>::new(
Some(BatcherType::OpenCL),
Expand Down

0 comments on commit 3983d18

Please sign in to comment.