From 2f592f5a84dba4fa61f92e1c23d1923c7bfea238 Mon Sep 17 00:00:00 2001
From: Ryo Onodera
Date: Thu, 18 Jan 2024 16:33:10 +0900
Subject: [PATCH] Remove the accumulator thread for now

---
 unified-scheduler-pool/src/lib.rs | 101 +++++++++----------------------
 1 file changed, 30 insertions(+), 71 deletions(-)

diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs
index f28c6ca49d6291..4e073461eb0729 100644
--- a/unified-scheduler-pool/src/lib.rs
+++ b/unified-scheduler-pool/src/lib.rs
@@ -227,7 +227,6 @@ enum SubchanneledPayload<P1, P2> {
 }
 
 type NewTaskPayload = SubchanneledPayload<Task, SchedulingContext>;
-type ExecutedTaskPayload = SubchanneledPayload<Box<ExecutedTask>, ()>;
 
 // A tiny generic message type to synchronize multiple threads everytime some contextual data needs
 // to be switched (ie. SchedulingContext), just using a single communication channel.
@@ -381,7 +380,6 @@ struct ThreadManager<S: SpawnableScheduler<TH>, TH: TaskHandler> {
     session_result_with_timings: Option<ResultWithTimings>,
     scheduler_thread: Option<JoinHandle<()>>,
     handler_threads: Vec<JoinHandle<()>>,
-    accumulator_thread: Option<JoinHandle<()>>,
 }
 
 impl<TH: TaskHandler> PooledScheduler<TH> {
@@ -415,7 +413,6 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
             session_result_with_timings: None,
             scheduler_thread: None,
             handler_threads: Vec::with_capacity(handler_count),
-            accumulator_thread: None,
         }
     }
 
@@ -435,6 +432,24 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
         );
     }
 
+    fn accumulate_result_with_timings(
+        (result, timings): &mut ResultWithTimings,
+        executed_task: Box<ExecutedTask>,
+    ) {
+        match executed_task.result_with_timings.0 {
+            Ok(()) => {}
+            Err(error) => {
+                error!("error is detected while accumulating....: {error:?}");
+                // Override errors intentionally for simplicity, not retaining the
+                // first error unlike the block verification in the
+                // blockstore_processor. This will be addressed with more
+                // full-fledged impl later.
+                *result = Err(error);
+            }
+        }
+        timings.accumulate(&executed_task.result_with_timings.1);
+    }
+
     fn take_session_result_with_timings(&mut self) -> ResultWithTimings {
         self.session_result_with_timings.take().unwrap()
     }
@@ -450,10 +465,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
     fn start_threads(&mut self, context: &SchedulingContext) {
         let (mut runnable_task_sender, runnable_task_receiver) =
             chained_channel::unbounded::<Task, SchedulingContext>(context.clone());
-        let (executed_task_sender, executed_task_receiver) = unbounded::<ExecutedTaskPayload>();
         let (finished_task_sender, finished_task_receiver) = unbounded::<Box<ExecutedTask>>();
-        let (accumulated_result_sender, accumulated_result_receiver) =
-            unbounded::<Option<ResultWithTimings>>();
 
         let mut result_with_timings = self.session_result_with_timings.take();
 
@@ -463,9 +475,6 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
         // 4. the handler thread processes the dispatched task.
         // 5. the handler thread reply back to the scheduler thread as an executed task.
         // 6. the scheduler thread post-processes the executed task.
-        // 7. the scheduler thread send the executed task to the accumulator thread.
-        // 8. the accumulator thread examines the executed task's result and accumulate its timing,
-        //    finally dropping the transaction inside the executed task.
         let scheduler_main_loop = || {
             let handler_count = self.handler_count;
             let session_result_sender = self.session_result_sender.clone();
@@ -521,35 +530,33 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
                             let executed_task = executed_task.unwrap();
 
                             active_task_count = active_task_count.checked_sub(1).unwrap();
-                            executed_task_sender
-                                .send(ExecutedTaskPayload::Payload(executed_task))
-                                .unwrap();
+                            let result_with_timings = result_with_timings.as_mut().unwrap();
+                            Self::accumulate_result_with_timings(result_with_timings, executed_task);
                         },
                         recv(new_task_receiver) -> message => {
+                            assert!(!session_ending);
+
                             match message.unwrap() {
                                 NewTaskPayload::Payload(task) => {
-                                    assert!(!session_ending);
-
                                     // so, we're NOT scheduling at all here; rather, just execute
                                     // tx straight off. the inter-tx locking deps aren't needed to
                                     // be resolved in the case of single-threaded FIFO like this.
-                                    active_task_count = active_task_count.checked_add(1).unwrap();
                                     runnable_task_sender
                                         .send_payload(task)
                                         .unwrap();
+                                    active_task_count = active_task_count.checked_add(1).unwrap();
                                 }
                                 NewTaskPayload::OpenSubchannel(context) => {
-                                    // signal about new SchedulingContext to both handler and
-                                    // accumulator threads
+                                    // signal about new SchedulingContext to handler threads
                                     runnable_task_sender
                                         .send_chained_channel(context, handler_count)
                                         .unwrap();
-                                    executed_task_sender
-                                        .send(ExecutedTaskPayload::OpenSubchannel(()))
-                                        .unwrap();
+                                    assert_matches!(
+                                        result_with_timings.replace(initialized_result_with_timings()),
+                                        None
+                                    );
                                 }
                                 NewTaskPayload::CloseSubchannel => {
-                                    assert!(!session_ending);
                                     session_ending = true;
                                 }
                             }
@@ -562,14 +569,10 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
                 }
 
                 if session_ending {
-                    executed_task_sender
-                        .send(ExecutedTaskPayload::CloseSubchannel)
-                        .unwrap();
                     session_result_sender
                         .send(Some(
-                            accumulated_result_receiver
-                                .recv()
-                                .unwrap()
+                            result_with_timings
+                                .take()
                                 .unwrap_or_else(initialized_result_with_timings),
                         ))
                         .unwrap();
@@ -603,43 +606,6 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
             }
         };
 
-        // This thread is needed because .accumualte() and drop::<Task>() both are
-        // relatively heavy operations to be put inside the scheduler thread.
-        let accumulator_main_loop = || {
-            move || loop {
-                match executed_task_receiver.recv().unwrap() {
-                    ExecutedTaskPayload::Payload(executed_task) => {
-                        let result_with_timings = result_with_timings.as_mut().unwrap();
-                        match executed_task.result_with_timings.0 {
-                            Ok(()) => {}
-                            Err(error) => {
-                                error!("error is detected while accumulating....: {error:?}");
-                                // Override errors intentionally for simplicity, not retaining the
-                                // first error unlike the block verification in the
-                                // blockstore_processor. This will be addressed with more
-                                // full-fledged impl later.
-                                result_with_timings.0 = Err(error);
-                            }
-                        }
-                        result_with_timings
-                            .1
-                            .accumulate(&executed_task.result_with_timings.1);
-                    }
-                    ExecutedTaskPayload::OpenSubchannel(()) => {
-                        assert_matches!(
-                            result_with_timings.replace(initialized_result_with_timings()),
-                            None
-                        );
-                    }
-                    ExecutedTaskPayload::CloseSubchannel => {
-                        accumulated_result_sender
-                            .send(result_with_timings.take())
-                            .unwrap();
-                    }
-                }
-            }
-        };
-
         self.scheduler_thread = Some(
             thread::Builder::new()
                 .name("solScheduler".to_owned())
@@ -647,13 +613,6 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
                 .spawn(scheduler_main_loop())
                 .unwrap(),
         );
-        self.accumulator_thread = Some(
-            thread::Builder::new()
-                .name("solScAccmltr".to_owned())
-                .spawn(accumulator_main_loop())
-                .unwrap(),
-        );
-
         self.handler_threads = (0..self.handler_count)
             .map({
                 |thx| {
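
A note on the resulting design, with a minimal standalone sketch. This is
illustrative code only, not agave/solana source: FakeTimings, TaskResult, and
bare u64 "tasks" are invented stand-ins, and only the shape of the loop mirrors
the patch. After this change, the scheduler thread itself owns
result_with_timings: it accumulates each executed task inline in its select!
loop and hands the session result over at session end, instead of
round-tripping through the removed solScAccmltr thread and its two channels.

    // Assumes crossbeam-channel = "0.5", the same channel crate the pool uses.
    use crossbeam_channel::{select, unbounded};
    use std::thread;

    #[derive(Default, Debug)]
    struct FakeTimings(u64); // toy stand-in for the real timings aggregate

    impl FakeTimings {
        fn accumulate(&mut self, other: &FakeTimings) {
            self.0 += other.0;
        }
    }

    type TaskResult = Result<(), String>; // toy stand-in for the pool's result type

    fn main() {
        let (task_tx, task_rx) = unbounded::<u64>();
        let (done_tx, done_rx) = unbounded::<(TaskResult, FakeTimings)>();

        // "handler" thread: executes tasks and replies with (result, timings),
        // like the handler threads replying via finished_task_sender.
        let handler = thread::spawn(move || {
            for task in task_rx {
                done_tx.send((Ok(()), FakeTimings(task))).unwrap();
            }
        });

        // "scheduler" thread: dispatches tasks and accumulates results inline,
        // mirroring accumulate_result_with_timings() above; no third thread.
        let scheduler = thread::spawn(move || {
            let mut result_with_timings: (TaskResult, FakeTimings) =
                (Ok(()), FakeTimings::default());
            let mut active_task_count = 0usize;
            for task in 1..=3u64 {
                task_tx.send(task).unwrap();
                active_task_count += 1;
            }
            drop(task_tx); // session is ending: no more tasks will arrive
            while active_task_count > 0 {
                select! {
                    recv(done_rx) -> message => {
                        let (result, timings) = message.unwrap();
                        active_task_count -= 1;
                        // inline accumulation; a later error overrides an
                        // earlier one, as the patch's comment notes
                        if let Err(error) = result {
                            result_with_timings.0 = Err(error);
                        }
                        result_with_timings.1.accumulate(&timings);
                    }
                }
            }
            // the session result; the real code sends this via session_result_sender
            result_with_timings
        });

        let (result, timings) = scheduler.join().unwrap();
        handler.join().unwrap();
        println!("session result: {result:?}, total timings: {timings:?}");
    }

One trade-off worth noting: accumulation and the drop of each executed task now
run on the scheduler thread, exactly the cost the removed comment cited as the
reason the extra thread existed; the "for now" in the subject suggests this is
accepted until the more full-fledged implementation mentioned in the comments
lands.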