Skip to content

Commit

Permalink
[Parallel Executor] Rolling Commit (#6079)
Browse files Browse the repository at this point in the history
* [WIP] Rolling commit

* fix liveness

* debug unit test

* committed is executed, sometimes

* fix unit tests and cleanup

* lint

* add comments

* add thread yielding

* replace mutex with parkinglot::RwLock for txn_status

* cleanup

* lint

* addressing comments, next step adding more tests

* add unit test

* address comments and add wave explanations

* clean up

* address comments

---------

Co-authored-by: Rati Gelashvili <[email protected]>
Co-authored-by: Rati Gelashvili <[email protected]>
  • Loading branch information
3 people authored Jan 28, 2023
1 parent 802a7e4 commit f8283fb
Show file tree
Hide file tree
Showing 5 changed files with 575 additions and 295 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions aptos-move/block-executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ dashmap = { workspace = true }
move-binary-format = { workspace = true }
num_cpus = { workspace = true }
once_cell = { workspace = true }
parking_lot = { workspace = true }
proptest = { workspace = true, optional = true }
proptest-derive = { workspace = true, optional = true }
rayon = { workspace = true }
Expand Down
51 changes: 33 additions & 18 deletions aptos-move/block-executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
counters::{TASK_EXECUTE_SECONDS, TASK_VALIDATE_SECONDS, VM_INIT_SECONDS},
errors::*,
output_delta_resolver::OutputDeltaResolver,
scheduler::{Scheduler, SchedulerTask, TaskGuard, Version},
scheduler::{Scheduler, SchedulerTask, Version, Wave},
task::{ExecutionStatus, ExecutorTask, Transaction, TransactionOutput},
txn_last_input_output::TxnLastInputOutput,
view::{LatestView, MVHashMapView},
Expand All @@ -17,7 +17,11 @@ use aptos_state_view::TStateView;
use aptos_types::write_set::WriteOp;
use num_cpus;
use once_cell::sync::Lazy;
use std::{collections::btree_map::BTreeMap, marker::PhantomData};
use std::{
collections::btree_map::BTreeMap,
marker::PhantomData,
sync::atomic::{AtomicBool, Ordering},
};

pub static RAYON_EXEC_POOL: Lazy<rayon::ThreadPool> = Lazy::new(|| {
rayon::ThreadPoolBuilder::new()
Expand Down Expand Up @@ -54,17 +58,16 @@ where
}
}

fn execute<'a>(
fn execute(
&self,
version: Version,
guard: TaskGuard<'a>,
signature_verified_block: &[T],
last_input_output: &TxnLastInputOutput<T::Key, E::Output, E::Error>,
versioned_data_cache: &MVHashMap<T::Key, T::Value>,
scheduler: &'a Scheduler,
scheduler: &Scheduler,
executor: &E,
base_view: &S,
) -> SchedulerTask<'a> {
) -> SchedulerTask {
let _timer = TASK_EXECUTE_SECONDS.start_timer();
let (idx_to_execute, incarnation) = version;
let txn = &signature_verified_block[idx_to_execute];
Expand Down Expand Up @@ -128,17 +131,17 @@ where
}

last_input_output.record(idx_to_execute, speculative_view.take_reads(), result);
scheduler.finish_execution(idx_to_execute, incarnation, updates_outside, guard)
scheduler.finish_execution(idx_to_execute, incarnation, updates_outside)
}

fn validate<'a>(
fn validate(
&self,
version_to_validate: Version,
guard: TaskGuard<'a>,
validation_wave: Wave,
last_input_output: &TxnLastInputOutput<T::Key, E::Output, E::Error>,
versioned_data_cache: &MVHashMap<T::Key, T::Value>,
scheduler: &'a Scheduler,
) -> SchedulerTask<'a> {
scheduler: &Scheduler,
) -> SchedulerTask {
use MVHashMapError::*;
use MVHashMapOutput::*;

Expand Down Expand Up @@ -175,8 +178,9 @@ where
versioned_data_cache.mark_estimate(&k, idx_to_validate);
}

scheduler.finish_abort(idx_to_validate, incarnation, guard)
scheduler.finish_abort(idx_to_validate, incarnation)
} else {
scheduler.finish_validation(idx_to_validate, validation_wave);
SchedulerTask::NoTask
}
}
Expand All @@ -189,6 +193,7 @@ where
versioned_data_cache: &MVHashMap<T::Key, T::Value>,
scheduler: &Scheduler,
base_view: &S,
committing: bool,
) {
// Make executor for each task. TODO: fast concurrent executor.
let init_timer = VM_INIT_SECONDS.start_timer();
Expand All @@ -197,25 +202,33 @@ where

let mut scheduler_task = SchedulerTask::NoTask;
loop {
// Only one thread try_commit to avoid contention.
if committing {
// Keep committing txns until there is no more that can be committed now.
loop {
if scheduler.try_commit().is_none() {
break;
}
}
}
scheduler_task = match scheduler_task {
SchedulerTask::ValidationTask(version_to_validate, guard) => self.validate(
SchedulerTask::ValidationTask(version_to_validate, wave) => self.validate(
version_to_validate,
guard,
wave,
last_input_output,
versioned_data_cache,
scheduler,
),
SchedulerTask::ExecutionTask(version_to_execute, None, guard) => self.execute(
SchedulerTask::ExecutionTask(version_to_execute, None) => self.execute(
version_to_execute,
guard,
block,
last_input_output,
versioned_data_cache,
scheduler,
&executor,
base_view,
),
SchedulerTask::ExecutionTask(_, Some(condvar), _guard) => {
SchedulerTask::ExecutionTask(_, Some(condvar)) => {
let (lock, cvar) = &*condvar;
// Mark dependency resolved.
*lock.lock() = true;
Expand All @@ -224,7 +237,7 @@ where

SchedulerTask::NoTask
},
SchedulerTask::NoTask => scheduler.next_task(),
SchedulerTask::NoTask => scheduler.next_task(committing),
SchedulerTask::Done => {
break;
},
Expand All @@ -248,6 +261,7 @@ where

let num_txns = signature_verified_block.len();
let last_input_output = TxnLastInputOutput::new(num_txns);
let committing = AtomicBool::new(true);
let scheduler = Scheduler::new(num_txns);

RAYON_EXEC_POOL.scope(|s| {
Expand All @@ -260,6 +274,7 @@ where
&versioned_data_cache,
&scheduler,
base_view,
committing.swap(false, Ordering::SeqCst),
);
});
}
Expand Down
Loading

0 comments on commit f8283fb

Please sign in to comment.