Skip to content

Commit

Permalink
walk: Limit batch sizes in --exec mode
Browse files Browse the repository at this point in the history
  • Loading branch information
tavianator committed Nov 13, 2023
1 parent 815b3b1 commit e7e16a4
Showing 1 changed file with 16 additions and 6 deletions.
22 changes: 16 additions & 6 deletions src/walk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,23 @@ impl IntoIterator for Batch {
struct BatchSender {
batch: Batch,
tx: Sender<Batch>,
limit: usize,
}

impl BatchSender {
fn new(tx: Sender<Batch>) -> Self {
fn new(tx: Sender<Batch>, limit: usize) -> Self {
Self {
batch: Batch::new(),
tx,
limit,
}
}

/// Check if we need to flush a batch.
fn needs_flush(batch: Option<&Vec<WorkerResult>>) -> bool {
fn needs_flush(&self, batch: Option<&Vec<WorkerResult>>) -> bool {
match batch {
// Limit the batch size to provide some backpressure
Some(vec) => vec.len() >= 0x400,
Some(vec) => vec.len() >= self.limit,
// Batch was already taken by the receiver, so make a new one
None => true,
}
Expand All @@ -99,7 +101,7 @@ impl BatchSender {
fn send(&mut self, item: WorkerResult) -> Result<(), SendError<()>> {
let mut batch = self.batch.lock();

if Self::needs_flush(batch.as_ref()) {
if self.needs_flush(batch.as_ref()) {
drop(batch);
self.batch = Batch::new();
batch = self.batch.lock();
Expand Down Expand Up @@ -443,7 +445,15 @@ impl WorkerState {
let patterns = &self.patterns;
let config = &self.config;
let quit_flag = self.quit_flag.as_ref();
let mut tx = BatchSender::new(tx.clone());

let mut limit = 0x100;
if let Some(cmd) = &config.command {
if !cmd.in_batch_mode() && config.threads > 1 {
// Evenly distribute work between multiple receivers
limit = 1;
}
}
let mut tx = BatchSender::new(tx.clone(), limit);

Box::new(move |entry| {
if quit_flag.load(Ordering::Relaxed) {
Expand Down Expand Up @@ -628,7 +638,7 @@ impl WorkerState {
.unwrap();
}

let (tx, rx) = bounded(config.threads);
let (tx, rx) = bounded(2 * config.threads);

let exit_code = thread::scope(|scope| {
// Spawn the receiver thread(s)
Expand Down

0 comments on commit e7e16a4

Please sign in to comment.