diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 25c8010c1f71..4d967cf509ed 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -44,7 +44,10 @@ use reth_trie::HashedPostState; use std::{ collections::{BTreeMap, HashMap, HashSet}, ops::Bound, - sync::{mpsc::Receiver, Arc}, + sync::{ + mpsc::{Receiver, RecvError, RecvTimeoutError}, + Arc, + }, time::Instant, }; use tokio::sync::{ @@ -539,15 +542,54 @@ where /// /// This will block the current thread and process incoming messages. pub fn run(mut self) { - while let Ok(msg) = self.incoming.recv() { - self.run_once(msg); + loop { + match self.try_recv_engine_message() { + Ok(Some(msg)) => { + self.on_engine_message(msg); + } + Ok(None) => { + debug!(target: "engine", "received no engine message for some time, while waiting for persistence task to complete"); + } + Err(err) => { + error!(target: "engine", "Engine channel disconnected"); + return + } + } + + self.advance_persistence(); } } - /// Run the engine API handler once. - fn run_once(&mut self, msg: FromEngine>) { - self.on_engine_message(msg); + /// Attempts to receive the next engine request. + /// + /// If there's currently no persistence action in progress, this will block until a new request + /// is received. If there's a persistence action in progress, this will try to receive the + /// next request with a timeout to not block indefinitely and return `Ok(None)` if no request is + /// received in time. + /// + /// Returns an error if the engine channel is disconnected. + fn try_recv_engine_message( + &self, + ) -> Result>>, RecvError> { + if self.persistence_state.in_progress() { + // try to receive the next request with a timeout to not block indefinitely + match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) { + Ok(msg) => Ok(Some(msg)), + Err(err) => match err { + RecvTimeoutError::Timeout => Ok(None), + RecvTimeoutError::Disconnected => Err(RecvError), + }, + } + } else { + self.incoming.recv().map(Some) + } + } + /// Attempts to advance the persistence state. + /// + /// If we're currently awaiting a response this will try to receive the response (non-blocking) + /// or send a new persistence action if necessary. + fn advance_persistence(&mut self) { if self.should_persist() && !self.persistence_state.in_progress() { let blocks_to_persist = self.get_canonical_blocks_to_persist(); if !blocks_to_persist.is_empty() {