Skip to content

Commit

Permalink
[Execution] Reset executor after buffer/rand manager.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind committed Oct 13, 2024
1 parent 4d9e0ae commit f231228
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl StateSyncManager {
// Log that we're starting to sync in fallback mode
info!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Started syncing in fallback mode! Syncing duration: {:?}!",
"Started syncing in fallback mode! Syncing duration: {:?} secs!",
self.consensus_observer_config
.observer_fallback_duration_secs
))
Expand Down
17 changes: 16 additions & 1 deletion consensus/src/pipeline/execution_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,9 +417,24 @@ impl TExecutionClient for ExecutionProxyClient {
// Sync for the specified duration
let result = self.execution_proxy.sync_for_duration(duration).await;

// Reset the rand and buffer managers to the new synced round
info!("sync_for_duration result in TExecutionClient: {:?}", result);

// If the sync was successful, reset any pending state!
if let Ok(latest_synced_ledger_info) = &result {
info!(
"Resetting to the latest synced round: {:?}",
latest_synced_ledger_info
);

// Reset the rand and buffer managers to the new synced round
self.reset(latest_synced_ledger_info).await?;

info!("Resetting the execution proxy executor to the new synced round");

// Reset the execution proxy executor to the new synced round
self.execution_proxy.reset_executor()?;

info!("Finished resetting the execution proxy executor to the new synced round");
}

result
Expand Down
20 changes: 14 additions & 6 deletions consensus/src/state_computer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ impl ExecutionProxy {
},
)
}

/// Resets the executor
pub fn reset_executor(&self) -> Result<()> {
self.executor.reset()
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -328,7 +333,9 @@ impl StateComputer for ExecutionProxy {
Ok(())
}

/// Best effort state synchronization for the specified duration
/// Best effort state synchronization for the specified duration.
/// If the result is successful, the caller must also reset the block
/// executor manually, after the rand and buffer managers are reset.
async fn sync_for_duration(
&self,
duration: Duration,
Expand All @@ -355,17 +362,18 @@ impl StateComputer for ExecutionProxy {
self.state_sync_notifier.sync_for_duration(duration).await
);

info!(
"State sync for duration in StateComputer (ExecutionProxy) completed with result: {:?}",
result
);

// Update the latest logical time
if let Ok(latest_synced_ledger_info) = &result {
let ledger_info = latest_synced_ledger_info.ledger_info();
let synced_logical_time = LogicalTime::new(ledger_info.epoch(), ledger_info.round());
*latest_logical_time = synced_logical_time;
}

// Similarly, after state synchronization, we have to reset the cache of
// the BlockExecutor to guarantee the latest committed state is up to date.
self.executor.reset()?;

// Return the result
result.map_err(|error| {
let anyhow_error: anyhow::Error = error.into();
Expand Down Expand Up @@ -423,7 +431,7 @@ impl StateComputer for ExecutionProxy {

// Similarly, after state synchronization, we have to reset the cache of
// the BlockExecutor to guarantee the latest committed state is up to date.
self.executor.reset()?;
self.reset_executor()?;

// Return the result
result.map_err(|error| {
Expand Down
16 changes: 11 additions & 5 deletions execution/executor/src/components/block_tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,6 @@ impl BlockTree {
Ok(Self { root, block_lookup })
}

pub fn reset(&self, db: &Arc<dyn DbReader>) -> Result<()> {
*self.root.lock() = Self::root_from_db(&self.block_lookup, db)?;
Ok(())
}

pub fn get_block(&self, id: HashValue) -> Result<Arc<Block>> {
Ok(self.get_blocks(&[id])?.pop().expect("Must exist."))
}
Expand All @@ -213,6 +208,17 @@ impl BlockTree {
let ledger_info = ledger_info_with_sigs.ledger_info();
let ledger_view = db.get_latest_executed_trees()?;

info!(
"Ledger info version: {:?}, ledger info: {:?}",
ledger_info.version(),
ledger_info
);
info!(
"Ledger view version: {:?}, ledger view: {:?}",
ledger_view.version(),
ledger_view
);

ensure!(
ledger_view.version() == Some(ledger_info.version()),
"Missing ledger info at the end of the ledger. latest version {:?}, LI version {}",
Expand Down

0 comments on commit f231228

Please sign in to comment.