Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

ethcore/VerificationQueue don't spawn up extra worker-threads when explictly specified not to #9620

Merged
merged 3 commits into from
Sep 26, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 70 additions & 13 deletions ethcore/src/verification/queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ pub mod kind;
const MIN_MEM_LIMIT: usize = 16384;
const MIN_QUEUE_LIMIT: usize = 512;

// maximum possible number of verification threads.
const MAX_VERIFIERS: usize = 8;
Copy link
Collaborator Author

@niklasad1 niklasad1 Sep 25, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is removed because it assumes max 8 CPUs now instead use on num_cpus::get()


/// Type alias for block queue convenience.
pub type BlockQueue = VerificationQueue<self::kind::Blocks>;

Expand Down Expand Up @@ -85,7 +82,7 @@ impl Default for VerifierSettings {
fn default() -> Self {
VerifierSettings {
scale_verifiers: false,
num_verifiers: MAX_VERIFIERS,
num_verifiers: ::num_cpus::get(),
}
}
}
Expand Down Expand Up @@ -231,16 +228,24 @@ impl<K: Kind> VerificationQueue<K> {
let empty = Arc::new(Condvar::new());
let scale_verifiers = config.verifier_settings.scale_verifiers;

let num_cpus = ::num_cpus::get();
let max_verifiers = cmp::min(num_cpus, MAX_VERIFIERS);
let max_verifiers = ::num_cpus::get();
let default_amount = cmp::max(1, cmp::min(max_verifiers, config.verifier_settings.num_verifiers));

// if `auto-scaling` is enabled spawn up extra threads as they might be needed
// otherwise just spawn the number of threads specified by the config
let number_of_threads = if scale_verifiers {
max_verifiers
} else {
cmp::min(default_amount, max_verifiers)
};

let state = Arc::new((Mutex::new(State::Work(default_amount)), Condvar::new()));
let mut verifier_handles = Vec::with_capacity(max_verifiers);
let mut verifier_handles = Vec::with_capacity(number_of_threads);

debug!(target: "verification", "Allocating {} verifiers, {} initially active", max_verifiers, default_amount);
debug!(target: "verification", "Allocating {} verifiers, {} initially active", number_of_threads, default_amount);
debug!(target: "verification", "Verifier auto-scaling {}", if scale_verifiers { "enabled" } else { "disabled" });

for i in 0..max_verifiers {
for i in 0..number_of_threads {
debug!(target: "verification", "Adding verification thread #{}", i);

let verification = verification.clone();
Expand Down Expand Up @@ -743,6 +748,13 @@ mod tests {
BlockQueue::new(config, engine, IoChannel::disconnected(), true)
}

fn get_test_config(num_verifiers: usize, is_auto_scale: bool) -> Config {
let mut config = Config::default();
config.verifier_settings.num_verifiers = num_verifiers;
config.verifier_settings.scale_verifiers = is_auto_scale;
config
}

fn new_unverified(bytes: Bytes) -> Unverified {
Unverified::from_rlp(bytes).expect("Should be valid rlp")
}
Expand Down Expand Up @@ -843,12 +855,11 @@ mod tests {

#[test]
fn scaling_limits() {
use super::MAX_VERIFIERS;

let max_verifiers = ::num_cpus::get();
let queue = get_test_queue(true);
queue.scale_verifiers(MAX_VERIFIERS + 1);
queue.scale_verifiers(max_verifiers + 1);

assert!(queue.num_verifiers() < MAX_VERIFIERS + 1);
assert!(queue.num_verifiers() < max_verifiers + 1);

queue.scale_verifiers(0);

Expand Down Expand Up @@ -877,4 +888,50 @@ mod tests {
queue.collect_garbage();
assert_eq!(queue.num_verifiers(), 1);
}

#[test]
fn worker_threads_honor_specified_number_without_scaling() {
let spec = Spec::new_test();
let engine = spec.engine;
let config = get_test_config(1, false);
let queue = BlockQueue::new(config, engine, IoChannel::disconnected(), true);

assert_eq!(queue.num_verifiers(), 1);
}

#[test]
fn worker_threads_specified_to_zero_should_set_to_one() {
let spec = Spec::new_test();
let engine = spec.engine;
let config = get_test_config(0, false);
let queue = BlockQueue::new(config, engine, IoChannel::disconnected(), true);

assert_eq!(queue.num_verifiers(), 1);
}

#[test]
fn worker_threads_should_only_accept_max_number_cpus() {
let spec = Spec::new_test();
let engine = spec.engine;
let config = get_test_config(10_000, false);
let queue = BlockQueue::new(config, engine, IoChannel::disconnected(), true);
let num_cpus = ::num_cpus::get();

assert_eq!(queue.num_verifiers(), num_cpus);
}

#[test]
fn worker_threads_scaling_with_specifed_num_of_workers() {
let num_cpus = ::num_cpus::get();
// only run the test with at least 2 CPUs
if num_cpus > 1 {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is only useful on multicore CPUs

let spec = Spec::new_test();
let engine = spec.engine;
let config = get_test_config(num_cpus - 1, true);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can't overflow because num_cpus is at least 2

let queue = BlockQueue::new(config, engine, IoChannel::disconnected(), true);
queue.scale_verifiers(num_cpus);

assert_eq!(queue.num_verifiers(), num_cpus);
}
}
}