Skip to content

Commit

Permalink
Remove the accumulator thread for now
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun committed Jan 18, 2024
1 parent cc5a1f0 commit 2f592f5
Showing 1 changed file with 30 additions and 71 deletions.
101 changes: 30 additions & 71 deletions unified-scheduler-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ enum SubchanneledPayload<P1, P2> {
}

type NewTaskPayload = SubchanneledPayload<Task, SchedulingContext>;
type ExecutedTaskPayload = SubchanneledPayload<Box<ExecutedTask>, ()>;

// A tiny generic message type to synchronize multiple threads everytime some contextual data needs
// to be switched (ie. SchedulingContext), just using a single communication channel.
Expand Down Expand Up @@ -381,7 +380,6 @@ struct ThreadManager<S: SpawnableScheduler<TH>, TH: TaskHandler> {
session_result_with_timings: Option<ResultWithTimings>,
scheduler_thread: Option<JoinHandle<()>>,
handler_threads: Vec<JoinHandle<()>>,
accumulator_thread: Option<JoinHandle<()>>,
}

impl<TH: TaskHandler> PooledScheduler<TH> {
Expand Down Expand Up @@ -415,7 +413,6 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
session_result_with_timings: None,
scheduler_thread: None,
handler_threads: Vec::with_capacity(handler_count),
accumulator_thread: None,
}
}

Expand All @@ -435,6 +432,24 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
);
}

fn accumulate_result_with_timings(
(result, timings): &mut ResultWithTimings,
executed_task: Box<ExecutedTask>,
) {
match executed_task.result_with_timings.0 {
Ok(()) => {}
Err(error) => {
error!("error is detected while accumulating....: {error:?}");
// Override errors intentionally for simplicity, not retaining the
// first error unlike the block verification in the
// blockstore_processor. This will be addressed with more
// full-fledged impl later.
*result = Err(error);
}
}
timings.accumulate(&executed_task.result_with_timings.1);
}

fn take_session_result_with_timings(&mut self) -> ResultWithTimings {
self.session_result_with_timings.take().unwrap()
}
Expand All @@ -450,10 +465,7 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
fn start_threads(&mut self, context: &SchedulingContext) {
let (mut runnable_task_sender, runnable_task_receiver) =
chained_channel::unbounded::<Task, SchedulingContext>(context.clone());
let (executed_task_sender, executed_task_receiver) = unbounded::<ExecutedTaskPayload>();
let (finished_task_sender, finished_task_receiver) = unbounded::<Box<ExecutedTask>>();
let (accumulated_result_sender, accumulated_result_receiver) =
unbounded::<Option<ResultWithTimings>>();

let mut result_with_timings = self.session_result_with_timings.take();

Expand All @@ -464,9 +476,6 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
// 4. the handler thread processes the dispatched task.
// 5. the handler thread reply back to the scheduler thread as an executed task.
// 6. the scheduler thread post-processes the executed task.
// 7. the scheduler thread send the executed task to the accumulator thread.
// 8. the accumulator thread examines the executed task's result and accumulate its timing,
// finally dropping the transaction inside the executed task.
let scheduler_main_loop = || {
let handler_count = self.handler_count;
let session_result_sender = self.session_result_sender.clone();
Expand Down Expand Up @@ -521,35 +530,33 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
let executed_task = executed_task.unwrap();

active_task_count = active_task_count.checked_sub(1).unwrap();
executed_task_sender
.send(ExecutedTaskPayload::Payload(executed_task))
.unwrap();
let result_with_timings = result_with_timings.as_mut().unwrap();
Self::accumulate_result_with_timings(result_with_timings, executed_task);
},
recv(new_task_receiver) -> message => {
assert!(!session_ending);

match message.unwrap() {
NewTaskPayload::Payload(task) => {
assert!(!session_ending);

// so, we're NOT scheduling at all here; rather, just execute
// tx straight off. the inter-tx locking deps aren't needed to
// be resolved in the case of single-threaded FIFO like this.
active_task_count = active_task_count.checked_add(1).unwrap();
runnable_task_sender
.send_payload(task)
.unwrap();
active_task_count = active_task_count.checked_add(1).unwrap();
}
NewTaskPayload::OpenSubchannel(context) => {
// signal about new SchedulingContext to both handler and
// accumulator threads
// signal about new SchedulingContext to handler threads
runnable_task_sender
.send_chained_channel(context, handler_count)
.unwrap();
executed_task_sender
.send(ExecutedTaskPayload::OpenSubchannel(()))
.unwrap();
assert_matches!(
result_with_timings.replace(initialized_result_with_timings()),
None
);
}
NewTaskPayload::CloseSubchannel => {
assert!(!session_ending);
session_ending = true;
}
}
Expand All @@ -562,14 +569,10 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
}

if session_ending {
executed_task_sender
.send(ExecutedTaskPayload::CloseSubchannel)
.unwrap();
session_result_sender
.send(Some(
accumulated_result_receiver
.recv()
.unwrap()
result_with_timings
.take()
.unwrap_or_else(initialized_result_with_timings),
))
.unwrap();
Expand Down Expand Up @@ -603,57 +606,13 @@ impl<S: SpawnableScheduler<TH>, TH: TaskHandler> ThreadManager<S, TH> {
}
};

// This thread is needed because .accumualte() and drop::<SanitizedTransaction>() both are
// relatively heavy operations to be put inside the scheduler thread.
let accumulator_main_loop = || {
move || loop {
match executed_task_receiver.recv().unwrap() {
ExecutedTaskPayload::Payload(executed_task) => {
let result_with_timings = result_with_timings.as_mut().unwrap();
match executed_task.result_with_timings.0 {
Ok(()) => {}
Err(error) => {
error!("error is detected while accumulating....: {error:?}");
// Override errors intentionally for simplicity, not retaining the
// first error unlike the block verification in the
// blockstore_processor. This will be addressed with more
// full-fledged impl later.
result_with_timings.0 = Err(error);
}
}
result_with_timings
.1
.accumulate(&executed_task.result_with_timings.1);
}
ExecutedTaskPayload::OpenSubchannel(()) => {
assert_matches!(
result_with_timings.replace(initialized_result_with_timings()),
None
);
}
ExecutedTaskPayload::CloseSubchannel => {
accumulated_result_sender
.send(result_with_timings.take())
.unwrap();
}
}
}
};

self.scheduler_thread = Some(
thread::Builder::new()
.name("solScheduler".to_owned())
.spawn(scheduler_main_loop())
.unwrap(),
);

self.accumulator_thread = Some(
thread::Builder::new()
.name("solScAccmltr".to_owned())
.spawn(accumulator_main_loop())
.unwrap(),
);

self.handler_threads = (0..self.handler_count)
.map({
|thx| {
Expand Down

0 comments on commit 2f592f5

Please sign in to comment.