Skip to content

Commit

Permalink
fix: use recv timeout if persistence task is active (#10087)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse authored Aug 5, 2024
1 parent cf9b6bd commit 2d0c10e
Showing 1 changed file with 48 additions and 6 deletions.
54 changes: 48 additions & 6 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ use reth_trie::HashedPostState;
use std::{
collections::{BTreeMap, HashMap, HashSet},
ops::Bound,
sync::{mpsc::Receiver, Arc},
sync::{
mpsc::{Receiver, RecvError, RecvTimeoutError},
Arc,
},
time::Instant,
};
use tokio::sync::{
Expand Down Expand Up @@ -539,15 +542,54 @@ where
///
/// This will block the current thread and process incoming messages.
pub fn run(mut self) {
while let Ok(msg) = self.incoming.recv() {
self.run_once(msg);
loop {
match self.try_recv_engine_message() {
Ok(Some(msg)) => {
self.on_engine_message(msg);
}
Ok(None) => {
debug!(target: "engine", "received no engine message for some time, while waiting for persistence task to complete");
}
Err(err) => {
error!(target: "engine", "Engine channel disconnected");
return
}
}

self.advance_persistence();
}
}

/// Run the engine API handler once.
fn run_once(&mut self, msg: FromEngine<BeaconEngineMessage<T>>) {
self.on_engine_message(msg);
/// Attempts to receive the next engine request.
///
/// If there's currently no persistence action in progress, this will block until a new request
/// is received. If there's a persistence action in progress, this will try to receive the
/// next request with a timeout to not block indefinitely and return `Ok(None)` if no request is
/// received in time.
///
/// Returns an error if the engine channel is disconnected.
fn try_recv_engine_message(
&self,
) -> Result<Option<FromEngine<BeaconEngineMessage<T>>>, RecvError> {
if self.persistence_state.in_progress() {
// try to receive the next request with a timeout to not block indefinitely
match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
Ok(msg) => Ok(Some(msg)),
Err(err) => match err {
RecvTimeoutError::Timeout => Ok(None),
RecvTimeoutError::Disconnected => Err(RecvError),
},
}
} else {
self.incoming.recv().map(Some)
}
}

/// Attempts to advance the persistence state.
///
/// If we're currently awaiting a response this will try to receive the response (non-blocking)
/// or send a new persistence action if necessary.
fn advance_persistence(&mut self) {
if self.should_persist() && !self.persistence_state.in_progress() {
let blocks_to_persist = self.get_canonical_blocks_to_persist();
if !blocks_to_persist.is_empty() {
Expand Down

0 comments on commit 2d0c10e

Please sign in to comment.