Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use recv timeout if persistence task is active #10087

Merged
merged 1 commit into from
Aug 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 48 additions & 6 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ use reth_trie::HashedPostState;
use std::{
collections::{BTreeMap, HashMap, HashSet},
ops::Bound,
sync::{mpsc::Receiver, Arc},
sync::{
mpsc::{Receiver, RecvError, RecvTimeoutError},
Arc,
},
time::Instant,
};
use tokio::sync::{
Expand Down Expand Up @@ -539,15 +542,54 @@ where
///
/// This will block the current thread and process incoming messages.
pub fn run(mut self) {
while let Ok(msg) = self.incoming.recv() {
self.run_once(msg);
loop {
match self.try_recv_engine_message() {
Ok(Some(msg)) => {
self.on_engine_message(msg);
}
Ok(None) => {
debug!(target: "engine", "received no engine message for some time, while waiting for persistence task to complete");
}
Err(err) => {
error!(target: "engine", "Engine channel disconnected");
return
}
}

self.advance_persistence();
}
}

/// Run the engine API handler once.
fn run_once(&mut self, msg: FromEngine<BeaconEngineMessage<T>>) {
self.on_engine_message(msg);
/// Attempts to receive the next engine request.
///
/// If there's currently no persistence action in progress, this will block until a new request
/// is received. If there's a persistence action in progress, this will try to receive the
/// next request with a timeout to not block indefinitely and return `Ok(None)` if no request is
/// received in time.
///
/// Returns an error if the engine channel is disconnected.
fn try_recv_engine_message(
&self,
) -> Result<Option<FromEngine<BeaconEngineMessage<T>>>, RecvError> {
if self.persistence_state.in_progress() {
// try to receive the next request with a timeout to not block indefinitely
match self.incoming.recv_timeout(std::time::Duration::from_millis(500)) {
Ok(msg) => Ok(Some(msg)),
Err(err) => match err {
RecvTimeoutError::Timeout => Ok(None),
RecvTimeoutError::Disconnected => Err(RecvError),
},
}
} else {
self.incoming.recv().map(Some)
}
}

/// Attempts to advance the persistence state.
///
/// If we're currently awaiting a response this will try to receive the response (non-blocking)
/// or send a new persistence action if necessary.
fn advance_persistence(&mut self) {
if self.should_persist() && !self.persistence_state.in_progress() {
let blocks_to_persist = self.get_canonical_blocks_to_persist();
if !blocks_to_persist.is_empty() {
Expand Down
Loading