[Consensus] Rename sync_to() to sync_to_target()
JoshLind committed Oct 16, 2024
1 parent 0e4f5df commit 1373b32
Showing 9 changed files with 27 additions and 21 deletions.
2 changes: 1 addition & 1 deletion consensus/src/block_storage/sync_manager.rs
@@ -432,7 +432,7 @@ impl BlockStore {
         storage.save_tree(blocks.clone(), quorum_certs.clone())?;
 
         execution_client
-            .sync_to(highest_commit_cert.ledger_info().clone())
+            .sync_to_target(highest_commit_cert.ledger_info().clone())
             .await?;
 
         // we do not need to update block_tree.highest_commit_decision_ledger_info here
@@ -881,7 +881,7 @@ fn sync_to_commit_decision(
     // Sync to the commit decision
     if let Err(error) = execution_client
         .clone()
-        .sync_to(commit_decision.commit_proof().clone())
+        .sync_to_target(commit_decision.commit_proof().clone())
         .await
     {
         warn!(
2 changes: 1 addition & 1 deletion consensus/src/dag/dag_state_sync.rs
@@ -254,7 +254,7 @@ impl DagStateSynchronizer {
             },
         }
 
-        self.execution_client.sync_to(commit_li).await?;
+        self.execution_client.sync_to_target(commit_li).await?;
 
         let inner =
             Arc::into_inner(sync_dag_store).expect("Only one strong reference should exists");
2 changes: 1 addition & 1 deletion consensus/src/epoch_manager.rs
@@ -541,7 +541,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
         // make sure storage is on this ledger_info too, it should be no-op if it's already committed
         // panic if this doesn't succeed since the current processors are already shutdown.
         self.execution_client
-            .sync_to(ledger_info.clone())
+            .sync_to_target(ledger_info.clone())
             .await
             .context(format!(
                 "[EpochManager] State sync to new epoch {}",
14 changes: 7 additions & 7 deletions consensus/src/pipeline/execution_client.rs
@@ -88,8 +88,8 @@ pub trait TExecutionClient: Send + Sync {
         commit_msg: IncomingCommitRequest,
     ) -> Result<()>;
 
-    /// Synchronize to a commit that not present locally.
-    async fn sync_to(&self, target: LedgerInfoWithSignatures) -> Result<(), StateSyncError>;
+    /// Synchronize to a commit that is not present locally.
+    async fn sync_to_target(&self, target: LedgerInfoWithSignatures) -> Result<(), StateSyncError>;
 
     /// Resets the internal state of the rand and buffer managers.
     async fn reset(&self, target: &LedgerInfoWithSignatures) -> Result<()>;
@@ -400,17 +400,17 @@ impl TExecutionClient for ExecutionProxyClient {
         }
     }
 
-    async fn sync_to(&self, target: LedgerInfoWithSignatures) -> Result<(), StateSyncError> {
-        fail_point!("consensus::sync_to", |_| {
-            Err(anyhow::anyhow!("Injected error in sync_to").into())
+    async fn sync_to_target(&self, target: LedgerInfoWithSignatures) -> Result<(), StateSyncError> {
+        fail_point!("consensus::sync_to_target", |_| {
+            Err(anyhow::anyhow!("Injected error in sync_to_target").into())
         });
 
         // Reset the rand and buffer managers to the target round
         self.reset(&target).await?;
 
         // TODO: handle the sync error, should re-push the ordered blocks to buffer manager
         // when it's reset but sync fails.
-        self.execution_proxy.sync_to(target).await?;
+        self.execution_proxy.sync_to_target(target).await?;
         Ok(())
     }
 
@@ -523,7 +523,7 @@ impl TExecutionClient for DummyExecutionClient {
         Ok(())
     }
 
-    async fn sync_to(&self, _: LedgerInfoWithSignatures) -> Result<(), StateSyncError> {
+    async fn sync_to_target(&self, _: LedgerInfoWithSignatures) -> Result<(), StateSyncError> {
         Ok(())
     }
 
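Note that the fail point names change together with the method, so any test configuration that activates the old `consensus::sync_to` fail point has to switch to the new name. Below is a minimal, illustrative sketch of such a test hook, assuming the `fail` crate's `cfg`/`remove` API and a build with fail points compiled in; the helper names are hypothetical and not part of this commit.

    // Illustrative only: make sync_to_target() hit the injected-error branch
    // by activating the renamed fail point, then clean up afterwards.
    fn inject_sync_to_target_failure() {
        fail::cfg("consensus::sync_to_target", "return").unwrap();
    }

    fn clear_sync_to_target_failure() {
        fail::remove("consensus::sync_to_target");
    }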
12 changes: 6 additions & 6 deletions consensus/src/state_computer.rs
@@ -325,7 +325,7 @@ impl StateComputer for ExecutionProxy {
     }
 
     /// Synchronize to a commit that not present locally.
-    async fn sync_to(&self, target: LedgerInfoWithSignatures) -> Result<(), StateSyncError> {
+    async fn sync_to_target(&self, target: LedgerInfoWithSignatures) -> Result<(), StateSyncError> {
         let mut latest_logical_time = self.write_mutex.lock().await;
         let logical_time =
             LogicalTime::new(target.ledger_info().epoch(), target.ledger_info().round());
@@ -353,16 +353,16 @@ impl StateComputer for ExecutionProxy {
                 .notify_commit(block_timestamp, Vec::new());
         }
 
-        fail_point!("consensus::sync_to", |_| {
-            Err(anyhow::anyhow!("Injected error in sync_to").into())
+        fail_point!("consensus::sync_to_target", |_| {
+            Err(anyhow::anyhow!("Injected error in sync_to_target").into())
         });
         // Here to start to do state synchronization where ChunkExecutor inside will
         // process chunks and commit to Storage. However, after block execution and
         // commitments, the sync state of ChunkExecutor may be not up to date so
         // it is required to reset the cache of ChunkExecutor in State Sync
         // when requested to sync.
         let res = monitor!(
-            "sync_to",
+            "sync_to_target",
             self.state_sync_notifier.sync_to_target(target).await
         );
         *latest_logical_time = logical_time;
@@ -574,8 +574,8 @@ async fn test_commit_sync_race() {
         .commit(&[], generate_li(1, 10), callback)
         .await
         .unwrap();
-    assert!(executor.sync_to(generate_li(1, 8)).await.is_ok());
+    assert!(executor.sync_to_target(generate_li(1, 8)).await.is_ok());
     assert_eq!(*recorded_commit.time.lock(), LogicalTime::new(1, 10));
-    assert!(executor.sync_to(generate_li(2, 8)).await.is_ok());
+    assert!(executor.sync_to_target(generate_li(2, 8)).await.is_ok());
     assert_eq!(*recorded_commit.time.lock(), LogicalTime::new(2, 8));
 }
2 changes: 1 addition & 1 deletion consensus/src/state_replication.rs
@@ -49,7 +49,7 @@ pub trait StateComputer: Send + Sync {
     /// In case of success (`Result::Ok`) the LI of storage is at the given target.
     /// In case of failure (`Result::Error`) the LI of storage remains unchanged, and the validator
     /// can assume there were no modifications to the storage made.
-    async fn sync_to(&self, target: LedgerInfoWithSignatures) -> Result<(), StateSyncError>;
+    async fn sync_to_target(&self, target: LedgerInfoWithSignatures) -> Result<(), StateSyncError>;
 
     // Reconfigure to execute transactions for a new epoch.
     fn new_epoch(
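For context, a hedged sketch of a caller that relies on the contract documented above: `Ok(())` means storage's ledger info has reached the target, `Err(..)` means storage was left unchanged and the call can safely be retried. The wrapper function and module paths below are assumptions for illustration, not part of this commit.

    use aptos_types::ledger_info::LedgerInfoWithSignatures;

    use crate::{error::StateSyncError, state_replication::StateComputer};

    // Illustrative only: sync local storage to a commit proof via the renamed
    // trait method; on Err the caller may retry, since storage is unchanged.
    async fn catch_up_to(
        state_computer: &dyn StateComputer,
        target: LedgerInfoWithSignatures,
    ) -> Result<(), StateSyncError> {
        state_computer.sync_to_target(target).await
    }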
2 changes: 1 addition & 1 deletion consensus/src/test_utils/mock_execution_client.rs
@@ -162,7 +162,7 @@ impl TExecutionClient for MockExecutionClient {
         Ok(())
     }
 
-    async fn sync_to(&self, commit: LedgerInfoWithSignatures) -> Result<(), StateSyncError> {
+    async fn sync_to_target(&self, commit: LedgerInfoWithSignatures) -> Result<(), StateSyncError> {
         debug!(
             "Fake sync to block id {}",
             commit.ledger_info().consensus_block_id()
10 changes: 8 additions & 2 deletions consensus/src/test_utils/mock_state_computer.rs
@@ -67,7 +67,10 @@ impl StateComputer for EmptyStateComputer {
         Ok(())
     }
 
-    async fn sync_to(&self, _commit: LedgerInfoWithSignatures) -> Result<(), StateSyncError> {
+    async fn sync_to_target(
+        &self,
+        _target: LedgerInfoWithSignatures,
+    ) -> Result<(), StateSyncError> {
         Ok(())
     }
 
@@ -141,7 +144,10 @@ impl StateComputer for RandomComputeResultStateComputer {
         Ok(())
     }
 
-    async fn sync_to(&self, _commit: LedgerInfoWithSignatures) -> Result<(), StateSyncError> {
+    async fn sync_to_target(
+        &self,
+        _target: LedgerInfoWithSignatures,
+    ) -> Result<(), StateSyncError> {
         Ok(())
     }
 
