From a9003b9ec1ecf36432fe2206e574789f805fe3de Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Wed, 13 Nov 2024 16:44:20 +0100 Subject: [PATCH 1/2] refactor(sync/track): rayonize class hash verification and class compilation --- .../pathfinder/src/sync/class_definitions.rs | 25 +++++++++++++------ crates/pathfinder/src/sync/track.rs | 4 +-- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/crates/pathfinder/src/sync/class_definitions.rs b/crates/pathfinder/src/sync/class_definitions.rs index 47df093fd9..4a40b64c18 100644 --- a/crates/pathfinder/src/sync/class_definitions.rs +++ b/crates/pathfinder/src/sync/class_definitions.rs @@ -181,11 +181,17 @@ pub struct VerifyHash; impl ProcessStage for VerifyHash { const NAME: &'static str = "Class::VerifyHash"; - type Input = ClassWithLayout; - type Output = Class; + type Input = Vec; + type Output = Vec; fn map(&mut self, peer: &PeerId, input: Self::Input) -> Result { - verify_hash_impl(peer, input) + input + .into_par_iter() + .map(|class| { + let compiled = verify_hash_impl(peer, class)?; + Ok(compiled) + }) + .collect::, SyncError>>() } } @@ -526,12 +532,17 @@ impl CompileSierraToCasm { impl ProcessStage for CompileSierraToCasm { const NAME: &'static str = "Class::CompileSierraToCasm"; - type Input = Class; - type Output = CompiledClass; + type Input = Vec; + type Output = Vec; fn map(&mut self, _: &PeerId, input: Self::Input) -> Result { - let compiled = compile_or_fetch_impl(input, &self.fgw, &self.tokio_handle)?; - Ok(compiled) + input + .into_par_iter() + .map(|class| { + let compiled = compile_or_fetch_impl(class, &self.fgw, &self.tokio_handle)?; + Ok(compiled) + }) + .collect::, SyncError>>() } } diff --git a/crates/pathfinder/src/sync/track.rs b/crates/pathfinder/src/sync/track.rs index 440caaf4ff..9238f69a70 100644 --- a/crates/pathfinder/src/sync/track.rs +++ b/crates/pathfinder/src/sync/track.rs @@ -144,8 +144,8 @@ impl Sync { } .spawn() .pipe_each(class_definitions::VerifyLayout, 10) - .pipe_each(class_definitions::VerifyHash, 10) - .pipe_each( + .pipe(class_definitions::VerifyHash, 10) + .pipe( class_definitions::CompileSierraToCasm::new(fgw, tokio::runtime::Handle::current()), 10, ) From 0993d5b5643fecf0d7c8fcea4a09c60302d23eae Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Wed, 13 Nov 2024 16:44:43 +0100 Subject: [PATCH 2/2] refactor(sync/track): remove pipe_each, rayonize layout verification --- .../pathfinder/src/sync/class_definitions.rs | 9 ++- crates/pathfinder/src/sync/stream.rs | 55 ------------------- crates/pathfinder/src/sync/track.rs | 2 +- 3 files changed, 7 insertions(+), 59 deletions(-) diff --git a/crates/pathfinder/src/sync/class_definitions.rs b/crates/pathfinder/src/sync/class_definitions.rs index 4a40b64c18..131f72e958 100644 --- a/crates/pathfinder/src/sync/class_definitions.rs +++ b/crates/pathfinder/src/sync/class_definitions.rs @@ -124,11 +124,14 @@ pub struct VerifyLayout; impl ProcessStage for VerifyLayout { const NAME: &'static str = "Class::VerifyLayout"; - type Input = P2PClassDefinition; - type Output = ClassWithLayout; + type Input = Vec; + type Output = Vec; fn map(&mut self, peer: &PeerId, input: Self::Input) -> Result { - verify_layout_impl(peer, input) + input + .into_par_iter() + .map(|class| verify_layout_impl(peer, class)) + .collect() } } diff --git a/crates/pathfinder/src/sync/stream.rs b/crates/pathfinder/src/sync/stream.rs index ad68ec1e1c..6d6404340c 100644 --- a/crates/pathfinder/src/sync/stream.rs +++ b/crates/pathfinder/src/sync/stream.rs @@ -66,61 +66,6 @@ impl SyncReceiver { self.pipe_impl(stage, buffer, |_| 1) } - /// Similar to [SyncReceiver::pipe], but processes each element in the - /// input individually. - pub fn pipe_each(mut self, mut stage: S, buffer: usize) -> SyncReceiver> - where - T: IntoIterator + Send + 'static, - S: ProcessStage + Send + 'static, - S::Output: Send, - { - let (tx, rx) = tokio::sync::mpsc::channel(buffer); - - std::thread::spawn(move || { - let queue_capacity = self.inner.max_capacity(); - - while let Some(input) = self.inner.blocking_recv() { - let result = match input { - Ok(PeerData { peer, data }) => { - // Stats for tracing and metrics. - let t = std::time::Instant::now(); - - // Process the data. - let output: Result, _> = data - .into_iter() - .map(|data| { - stage.map(&peer, data).inspect_err(|error| { - tracing::debug!(%error, "Processing item failed"); - }) - }) - .collect(); - let output = output.map(|x| PeerData::new(peer, x)); - - // Log trace and metrics. - let elements_per_sec = 1.0 / t.elapsed().as_secs_f32(); - let queue_fullness = queue_capacity - self.inner.capacity(); - let input_queue = Fullness(queue_fullness, queue_capacity); - tracing::debug!( - "Stage: {}, queue: {}, {elements_per_sec:.0} items/s", - S::NAME, - input_queue - ); - - output - } - Err(e) => Err(e), - }; - - let is_err = result.is_err(); - if tx.blocking_send(result).is_err() || is_err { - return; - } - } - }); - - SyncReceiver::from_receiver(rx) - } - /// A private impl which hides the ugly `count_fn` used to differentiate /// between processing a single element from [SyncReceiver] and multiple /// elements from [ChunkSyncReceiver]. diff --git a/crates/pathfinder/src/sync/track.rs b/crates/pathfinder/src/sync/track.rs index 9238f69a70..070110629a 100644 --- a/crates/pathfinder/src/sync/track.rs +++ b/crates/pathfinder/src/sync/track.rs @@ -143,7 +143,7 @@ impl Sync { start: next, } .spawn() - .pipe_each(class_definitions::VerifyLayout, 10) + .pipe(class_definitions::VerifyLayout, 10) .pipe(class_definitions::VerifyHash, 10) .pipe( class_definitions::CompileSierraToCasm::new(fgw, tokio::runtime::Handle::current()),