diff --git a/execution/executor-benchmark/src/transaction_committer.rs b/execution/executor-benchmark/src/transaction_committer.rs index d0311e27a457d..f25119bf04e9d 100644 --- a/execution/executor-benchmark/src/transaction_committer.rs +++ b/execution/executor-benchmark/src/transaction_committer.rs @@ -51,6 +51,8 @@ pub struct TransactionCommitter { executor: Arc>, start_version: Version, block_receiver: mpsc::Receiver, + batch_sender: mpsc::Sender, + batch_receiver: mpsc::Receiver, } impl TransactionCommitter @@ -62,16 +64,38 @@ where start_version: Version, block_receiver: mpsc::Receiver, ) -> Self { + // spawn a new thread in backgrond to do the actual commit + let (batch_sender, batch_receiver) = mpsc::channel(); + Self { executor, start_version, block_receiver, + batch_sender, + batch_receiver, } } + fn commit_batch(&self, batch: SchemaBatch) -> Result<()> { + Ok(()) + } + + fn prepare_commit(&self, block_id: u64, ledger_info_sigs: LedgerInfoWithSignatures) -> Result<()> { + self.executor.pre_commit_block(block_id)?; + self.executor.commit_ledger(ledger_info_sigs)?; + Ok(()) + } + pub fn run(&mut self) { info!("Start with version: {}", self.start_version); + // Spawn a new thread in backgrond to do the actual commit + let commit_thread = thread::spawn(move || { + while let Ok(batch) = self.batch_receiver.recv() { + self.commit_batch(batch).unwrap(); + } + }); + while let Ok(msg) = self.block_receiver.recv() { let CommitBlockMessage { block_id, @@ -93,8 +117,7 @@ where let version = output.expect_last_version(); let commit_start = Instant::now(); let ledger_info_with_sigs = gen_li_with_sigs(block_id, root_hash, version); - self.executor.pre_commit_block(block_id).unwrap(); - self.executor.commit_ledger(ledger_info_with_sigs).unwrap(); + self.prepare_commit(block_id, ledger_info_sigs).unwrap(); report_block( self.start_version, diff --git a/experimental/storage/commit_experiments/Cargo.toml b/experimental/storage/commit_experiments/Cargo.toml new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/storage/aptosdb/src/db/include/aptosdb_writer.rs b/storage/aptosdb/src/db/include/aptosdb_writer.rs index babbd5c5d1c78..4fa1bd29be701 100644 --- a/storage/aptosdb/src/db/include/aptosdb_writer.rs +++ b/storage/aptosdb/src/db/include/aptosdb_writer.rs @@ -380,15 +380,6 @@ impl AptosDB { .write_schemas(ledger_metadata_batch) .unwrap(); }); - s.spawn(|_| { - self.state_kv_db - .commit( - chunk.expect_last_version(), - state_kv_metadata_batch, - sharded_state_kv_batches, - ) - .unwrap(); - }); }); Ok(()) diff --git a/storage/aptosdb/src/db/mod.rs b/storage/aptosdb/src/db/mod.rs index 5a70281687533..bd6c3f55c8c92 100644 --- a/storage/aptosdb/src/db/mod.rs +++ b/storage/aptosdb/src/db/mod.rs @@ -104,6 +104,7 @@ pub struct AptosDB { indexer: Option, skip_index_and_usage: bool, update_subscriber: Option>, + dummy_cache: HashMap>, } // DbReader implementations and private functions used by them.