From 658662273db393a92a8cd36e048dfd48bb04d8cb Mon Sep 17 00:00:00 2001 From: Federico Gimenez Date: Wed, 6 Nov 2024 11:17:40 +0100 Subject: [PATCH] use thread instead of tokio task --- crates/engine/tree/src/tree/root.rs | 36 ++++++++++++++++++----------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index f3d51089d3be0..eac9dea1ce979 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -1,6 +1,5 @@ //! State root task related functionality. -use futures::StreamExt; use reth_provider::providers::ConsistentDbView; use reth_trie::{updates::TrieUpdates, TrieInput}; use reth_trie_parallel::parallel_root::ParallelStateRootError; @@ -21,7 +20,6 @@ pub(crate) struct StateRootHandle { impl StateRootHandle { /// Waits for the state root calculation to complete. - #[allow(dead_code)] pub(crate) fn wait_result(self) -> StateRootResult { self.rx.recv().expect("state root task was dropped without sending result") } @@ -35,7 +33,6 @@ impl StateRootHandle { /// fetches the proofs for relevant accounts from the database and reveal them /// to the tree. /// Then it updates relevant leaves according to the result of the transaction. -#[allow(dead_code)] pub(crate) struct StateRootTask { /// View over the state in the database. consistent_view: ConsistentDbView, @@ -60,27 +57,38 @@ where } /// Spawns the state root task and returns a handle to await its result. - pub(crate) fn spawn(mut self) -> StateRootHandle { + pub(crate) fn spawn(self) -> StateRootHandle { let (tx, rx) = mpsc::channel(); // Spawn the task that will process state updates and calculate the root - tokio::spawn(async move { - debug!(target: "engine::tree", "Starting state root task"); - let result = self.run().await; - let _ = tx.send(result); - }); - + std::thread::Builder::new() + .name("State Root Task".to_string()) + .spawn(move || { + debug!(target: "engine::tree", "Starting state root task"); + let result = self.run(); + let _ = tx.send(result); + }) + .expect("failed to spawn state root thread"); StateRootHandle { rx } } /// Handles state updates. - fn on_state_update(&self, _update: EvmState) { + fn on_state_update( + _view: &ConsistentDbView, + _input: &Arc, + _state: EvmState, + ) { // TODO: calculate hashed state update and dispatch proof gathering for it. } - async fn run(&mut self) -> StateRootResult { - while let Some(state) = self.state_stream.next().await { - self.on_state_update(state); + fn run(self) -> StateRootResult { + let Self { state_stream, consistent_view, input } = self; + + let mut receiver = state_stream.into_inner(); + + // Process all items until the channel is closed + while let Ok(state) = receiver.try_recv() { + Self::on_state_update(&consistent_view, &input, state); } // TODO: