Skip to content

Commit

Permalink
use thread instead of tokio task
Browse files Browse the repository at this point in the history
  • Loading branch information
fgimenez committed Nov 6, 2024
1 parent 0f1a456 commit 6586622
Showing 1 changed file with 22 additions and 14 deletions.
36 changes: 22 additions & 14 deletions crates/engine/tree/src/tree/root.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
//! State root task related functionality.

use futures::StreamExt;
use reth_provider::providers::ConsistentDbView;
use reth_trie::{updates::TrieUpdates, TrieInput};
use reth_trie_parallel::parallel_root::ParallelStateRootError;
Expand All @@ -21,7 +20,6 @@ pub(crate) struct StateRootHandle {

impl StateRootHandle {
/// Waits for the state root calculation to complete.
#[allow(dead_code)]
pub(crate) fn wait_result(self) -> StateRootResult {
self.rx.recv().expect("state root task was dropped without sending result")
}
Expand All @@ -35,7 +33,6 @@ impl StateRootHandle {
/// fetches the proofs for relevant accounts from the database and reveal them
/// to the tree.
/// Then it updates relevant leaves according to the result of the transaction.
#[allow(dead_code)]
pub(crate) struct StateRootTask<Factory> {
/// View over the state in the database.
consistent_view: ConsistentDbView<Factory>,
Expand All @@ -60,27 +57,38 @@ where
}

/// Spawns the state root task and returns a handle to await its result.
pub(crate) fn spawn(mut self) -> StateRootHandle {
pub(crate) fn spawn(self) -> StateRootHandle {
let (tx, rx) = mpsc::channel();

// Spawn the task that will process state updates and calculate the root
tokio::spawn(async move {
debug!(target: "engine::tree", "Starting state root task");
let result = self.run().await;
let _ = tx.send(result);
});

std::thread::Builder::new()
.name("State Root Task".to_string())
.spawn(move || {
debug!(target: "engine::tree", "Starting state root task");
let result = self.run();
let _ = tx.send(result);
})
.expect("failed to spawn state root thread");
StateRootHandle { rx }
}

/// Handles state updates.
fn on_state_update(&self, _update: EvmState) {
fn on_state_update(
_view: &ConsistentDbView<Factory>,
_input: &Arc<TrieInput>,
_state: EvmState,
) {
// TODO: calculate hashed state update and dispatch proof gathering for it.
}

async fn run(&mut self) -> StateRootResult {
while let Some(state) = self.state_stream.next().await {
self.on_state_update(state);
fn run(self) -> StateRootResult {
let Self { state_stream, consistent_view, input } = self;

let mut receiver = state_stream.into_inner();

// Process all items until the channel is closed
while let Ok(state) = receiver.try_recv() {
Self::on_state_update(&consistent_view, &input, state);
}

// TODO:
Expand Down

0 comments on commit 6586622

Please sign in to comment.