Skip to content

Commit

Permalink
state_updates_until_last_checkpoint: lazy clone
Browse files Browse the repository at this point in the history
  • Loading branch information
msmouse committed Oct 16, 2024
1 parent 130aa35 commit fa4d47c
Show file tree
Hide file tree
Showing 16 changed files with 23 additions and 36 deletions.
3 changes: 1 addition & 2 deletions execution/executor/src/block_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,7 @@ where
parent_block.output.state().base_version,
false,
result_in_memory_state,
// TODO(grao): Avoid this clone.
ledger_update.state_updates_until_last_checkpoint.clone(),
ledger_update.state_updates_until_last_checkpoint.as_ref(),
Some(&ledger_update.sharded_state_cache),
)?;
TRANSACTIONS_SAVED.observe(ledger_update.num_txns() as f64);
Expand Down
3 changes: 1 addition & 2 deletions execution/executor/src/chunk_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,11 +264,10 @@ impl<V: VMExecutor> ChunkExecutorInner<V> {
chunk.ledger_info.as_ref(),
false, // sync_commit
chunk.result_state.clone(),
// TODO(aldenhu): avoid cloning
chunk
.ledger_update_output
.state_updates_until_last_checkpoint
.clone(),
.as_ref(),
Some(&chunk.ledger_update_output.sharded_state_cache),
)?;
}
Expand Down
2 changes: 1 addition & 1 deletion execution/executor/src/db_bootstrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl GenesisCommitter {
self.output
.ledger_update_output
.state_updates_until_last_checkpoint
.clone(),
.as_ref(),
Some(&self.output.ledger_update_output.sharded_state_cache),
)?;
info!("Genesis commited.");
Expand Down
2 changes: 1 addition & 1 deletion execution/executor/src/fuzzing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl DbWriter for FakeDb {
_base_state_version: Option<Version>,
_sync_commit: bool,
_latest_in_memory_state: StateDelta,
_state_updates_until_last_checkpoint: Option<ShardedStateUpdates>,
_state_updates_until_last_checkpoint: Option<&ShardedStateUpdates>,
_sharded_state_cache: Option<&ShardedStateCache>,
) -> aptos_storage_interface::Result<()> {
Ok(())
Expand Down
6 changes: 2 additions & 4 deletions execution/executor/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,10 +508,9 @@ fn apply_transaction_by_writeset(
ledger_info.as_ref(),
true, /* sync_commit */
result_state,
// TODO(aldenhu): avoid clone
ledger_update_output
.state_updates_until_last_checkpoint
.clone(),
.as_ref(),
Some(&ledger_update_output.sharded_state_cache),
)
.unwrap();
Expand Down Expand Up @@ -720,10 +719,9 @@ fn run_transactions_naive(
ledger_info.as_ref(),
true, /* sync_commit */
result_state,
// TODO(aldenhu): avoid clone
ledger_update_output
.state_updates_until_last_checkpoint
.clone(),
.as_ref(),
Some(&ledger_update_output.sharded_state_cache),
)
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion state-sync/state-sync-driver/src/tests/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ mock! {
ledger_info_with_sigs: Option<&'a LedgerInfoWithSignatures>,
sync_commit: bool,
in_memory_state: StateDelta,
state_updates_until_last_checkpoint: Option<ShardedStateUpdates>,
state_updates_until_last_checkpoint: Option<&'b ShardedStateUpdates>,
sharded_state_cache: Option<&'b ShardedStateCache>,
) -> Result<()>;
}
Expand Down
2 changes: 1 addition & 1 deletion storage/aptosdb/src/db/fake_aptosdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl FakeBufferedState {

pub fn update(
&mut self,
updates_until_next_checkpoint_since_current_option: Option<ShardedStateUpdates>,
updates_until_next_checkpoint_since_current_option: Option<&ShardedStateUpdates>,
new_state_after_checkpoint: StateDelta,
) -> Result<()> {
ensure!(
Expand Down
2 changes: 1 addition & 1 deletion storage/aptosdb/src/db/include/aptosdb_testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl AptosDB {
ledger_info_with_sigs,
sync_commit,
latest_in_memory_state,
state_updates_until_last_checkpoint,
state_updates_until_last_checkpoint.as_ref(),
None,
)
}
Expand Down
2 changes: 1 addition & 1 deletion storage/aptosdb/src/db/include/aptosdb_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ impl DbWriter for AptosDB {
base_state_version: Option<Version>,
sync_commit: bool,
latest_in_memory_state: StateDelta,
state_updates_until_last_checkpoint: Option<ShardedStateUpdates>,
state_updates_until_last_checkpoint: Option<&ShardedStateUpdates>,
sharded_state_cache: Option<&ShardedStateCache>,
) -> Result<()> {
gauged_api("pre_commit_ledger", || {
Expand Down
6 changes: 4 additions & 2 deletions storage/aptosdb/src/db/test_helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1010,7 +1010,8 @@ pub fn test_sync_transactions_impl(
cur_ver,
&in_memory_state,
txns_to_commit_batch,
),
)
.as_ref(),
None,
)
.unwrap();
Expand All @@ -1025,7 +1026,8 @@ pub fn test_sync_transactions_impl(
Some(ledger_info_with_sigs),
false, /* sync_commit */
in_memory_state.clone(),
gather_state_updates_until_last_checkpoint(ver, &in_memory_state, txns_to_commit_batch),
gather_state_updates_until_last_checkpoint(ver, &in_memory_state, txns_to_commit_batch)
.as_ref(),
None,
)
.unwrap();
Expand Down
2 changes: 1 addition & 1 deletion storage/aptosdb/src/fast_sync_storage_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl DbWriter for FastSyncStorageWrapper {
base_state_version: Option<Version>,
sync_commit: bool,
latest_in_memory_state: StateDelta,
state_updates_until_last_checkpoint: Option<ShardedStateUpdates>,
state_updates_until_last_checkpoint: Option<&ShardedStateUpdates>,
sharded_state_cache: Option<&ShardedStateCache>,
) -> Result<()> {
self.get_aptos_db_write_ref().pre_commit_ledger(
Expand Down
2 changes: 1 addition & 1 deletion storage/aptosdb/src/state_store/buffered_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl BufferedState {
/// This method updates the buffered state with new data.
pub fn update(
&mut self,
updates_until_next_checkpoint_since_current_option: Option<ShardedStateUpdates>,
updates_until_next_checkpoint_since_current_option: Option<&ShardedStateUpdates>,
new_state_after_checkpoint: StateDelta,
sync_commit: bool,
) -> Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion storage/aptosdb/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ impl StateStore {

// synchronously commit the snapshot at the last checkpoint here if not committed to disk yet.
buffered_state.update(
updates_until_last_checkpoint,
updates_until_last_checkpoint.as_ref(),
state_after_last_checkpoint,
true, /* sync_commit */
)?;
Expand Down
4 changes: 2 additions & 2 deletions storage/storage-interface/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ pub trait DbWriter: Send + Sync {
ledger_info_with_sigs: Option<&LedgerInfoWithSignatures>,
sync_commit: bool,
latest_in_memory_state: StateDelta,
state_updates_until_last_checkpoint: Option<ShardedStateUpdates>,
state_updates_until_last_checkpoint: Option<&ShardedStateUpdates>,
sharded_state_cache: Option<&ShardedStateCache>,
) -> Result<()> {
// For reconfig suffix.
Expand Down Expand Up @@ -596,7 +596,7 @@ pub trait DbWriter: Send + Sync {
base_state_version: Option<Version>,
sync_commit: bool,
latest_in_memory_state: StateDelta,
state_updates_until_last_checkpoint: Option<ShardedStateUpdates>,
state_updates_until_last_checkpoint: Option<&ShardedStateUpdates>,
sharded_state_cache: Option<&ShardedStateCache>,
) -> Result<()> {
unimplemented!()
Expand Down
2 changes: 1 addition & 1 deletion storage/storage-interface/src/state_delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl StateDelta {

pub fn merge(&mut self, other: StateDelta) {
assert!(other.follow(self));
combine_sharded_state_updates(&mut self.updates_since_base, other.updates_since_base);
combine_sharded_state_updates(&mut self.updates_since_base, &other.updates_since_base);

self.current = other.current;
self.current_version = other.current_version;
Expand Down
17 changes: 3 additions & 14 deletions types/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,25 +102,14 @@ pub fn create_empty_sharded_state_updates() -> ShardedStateUpdates {
arr![HashMap::new(); 16]
}

pub fn combine_or_add_sharded_state_updates(
lhs: &mut Option<ShardedStateUpdates>,
rhs: ShardedStateUpdates,
) {
if let Some(lhs) = lhs {
combine_sharded_state_updates(lhs, rhs);
} else {
*lhs = Some(rhs);
}
}

pub fn combine_sharded_state_updates(lhs: &mut ShardedStateUpdates, rhs: ShardedStateUpdates) {
pub fn combine_sharded_state_updates(lhs: &mut ShardedStateUpdates, rhs: &ShardedStateUpdates) {
use rayon::prelude::*;

THREAD_MANAGER.get_exe_cpu_pool().install(|| {
lhs.par_iter_mut()
.zip_eq(rhs.into_par_iter())
.zip_eq(rhs.par_iter())
.for_each(|(l, r)| {
l.extend(r);
l.extend(r.clone());
})
})
}
Expand Down

0 comments on commit fa4d47c

Please sign in to comment.