Skip to content

Commit

Permalink
save progress
Browse files Browse the repository at this point in the history
  • Loading branch information
areshand committed Dec 16, 2024
1 parent 1c4f6ed commit 29f45e6
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 11 deletions.
27 changes: 25 additions & 2 deletions execution/executor-benchmark/src/transaction_committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ pub struct TransactionCommitter<V> {
executor: Arc<BlockExecutor<V>>,
start_version: Version,
block_receiver: mpsc::Receiver<CommitBlockMessage>,
batch_sender: mpsc::Sender<SchemaBatch>,
batch_receiver: mpsc::Receiver<SchemaBatch>,
}

impl<V> TransactionCommitter<V>
Expand All @@ -62,16 +64,38 @@ where
start_version: Version,
block_receiver: mpsc::Receiver<CommitBlockMessage>,
) -> Self {
// spawn a new thread in backgrond to do the actual commit
let (batch_sender, batch_receiver) = mpsc::channel();

Self {
executor,
start_version,
block_receiver,
batch_sender,
batch_receiver,
}
}

fn commit_batch(&self, batch: SchemaBatch) -> Result<()> {
Ok(())
}

fn prepare_commit(&self, block_id: u64, ledger_info_sigs: LedgerInfoWithSignatures) -> Result<()> {
self.executor.pre_commit_block(block_id)?;
self.executor.commit_ledger(ledger_info_sigs)?;
Ok(())
}

pub fn run(&mut self) {
info!("Start with version: {}", self.start_version);

// Spawn a new thread in backgrond to do the actual commit
let commit_thread = thread::spawn(move || {
while let Ok(batch) = self.batch_receiver.recv() {
self.commit_batch(batch).unwrap();
}
});

while let Ok(msg) = self.block_receiver.recv() {
let CommitBlockMessage {
block_id,
Expand All @@ -93,8 +117,7 @@ where
let version = output.expect_last_version();
let commit_start = Instant::now();
let ledger_info_with_sigs = gen_li_with_sigs(block_id, root_hash, version);
self.executor.pre_commit_block(block_id).unwrap();
self.executor.commit_ledger(ledger_info_with_sigs).unwrap();
self.prepare_commit(block_id, ledger_info_sigs).unwrap();

report_block(
self.start_version,
Expand Down
Empty file.
9 changes: 0 additions & 9 deletions storage/aptosdb/src/db/include/aptosdb_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,15 +380,6 @@ impl AptosDB {
.write_schemas(ledger_metadata_batch)
.unwrap();
});
s.spawn(|_| {
self.state_kv_db
.commit(
chunk.expect_last_version(),
state_kv_metadata_batch,
sharded_state_kv_batches,
)
.unwrap();
});
});

Ok(())
Expand Down
1 change: 1 addition & 0 deletions storage/aptosdb/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ pub struct AptosDB {
indexer: Option<Indexer>,
skip_index_and_usage: bool,
update_subscriber: Option<Sender<Version>>,
dummy_cache: HashMap<HashValue, Vec<(version, StateValue)>>,
}

// DbReader implementations and private functions used by them.
Expand Down

0 comments on commit 29f45e6

Please sign in to comment.