Skip to content

Commit

Permalink
Merge pull request #2374 from eqlabs/chris/pipe-each-unordered
Browse files Browse the repository at this point in the history
rayonize class hash verification and class compilation in track sync, remove pipe_each
  • Loading branch information
CHr15F0x authored Nov 15, 2024
2 parents 7395898 + 0993d5b commit c832542
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 68 deletions.
34 changes: 24 additions & 10 deletions crates/pathfinder/src/sync/class_definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,14 @@ pub struct VerifyLayout;
impl ProcessStage for VerifyLayout {
const NAME: &'static str = "Class::VerifyLayout";

type Input = P2PClassDefinition;
type Output = ClassWithLayout;
type Input = Vec<P2PClassDefinition>;
type Output = Vec<ClassWithLayout>;

fn map(&mut self, peer: &PeerId, input: Self::Input) -> Result<Self::Output, SyncError> {
verify_layout_impl(peer, input)
input
.into_par_iter()
.map(|class| verify_layout_impl(peer, class))
.collect()
}
}

Expand Down Expand Up @@ -181,11 +184,17 @@ pub struct VerifyHash;
impl ProcessStage for VerifyHash {
const NAME: &'static str = "Class::VerifyHash";

type Input = ClassWithLayout;
type Output = Class;
type Input = Vec<ClassWithLayout>;
type Output = Vec<Class>;

fn map(&mut self, peer: &PeerId, input: Self::Input) -> Result<Self::Output, SyncError> {
verify_hash_impl(peer, input)
input
.into_par_iter()
.map(|class| {
let compiled = verify_hash_impl(peer, class)?;
Ok(compiled)
})
.collect::<Result<Vec<Class>, SyncError>>()
}
}

Expand Down Expand Up @@ -418,12 +427,17 @@ impl<T> CompileSierraToCasm<T> {
impl<T: GatewayApi + Clone + Send + 'static> ProcessStage for CompileSierraToCasm<T> {
const NAME: &'static str = "Class::CompileSierraToCasm";

type Input = Class;
type Output = CompiledClass;
type Input = Vec<Class>;
type Output = Vec<CompiledClass>;

fn map(&mut self, _: &PeerId, input: Self::Input) -> Result<Self::Output, SyncError> {
let compiled = compile_or_fetch_impl(input, &self.fgw, &self.tokio_handle)?;
Ok(compiled)
input
.into_par_iter()
.map(|class| {
let compiled = compile_or_fetch_impl(class, &self.fgw, &self.tokio_handle)?;
Ok(compiled)
})
.collect::<Result<Vec<CompiledClass>, SyncError>>()
}
}

Expand Down
55 changes: 0 additions & 55 deletions crates/pathfinder/src/sync/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,61 +66,6 @@ impl<T: Send + 'static> SyncReceiver<T> {
self.pipe_impl(stage, buffer, |_| 1)
}

/// Similar to [SyncReceiver::pipe], but processes each element in the
/// input individually.
pub fn pipe_each<S, U>(mut self, mut stage: S, buffer: usize) -> SyncReceiver<Vec<S::Output>>
where
T: IntoIterator<Item = U> + Send + 'static,
S: ProcessStage<Input = U> + Send + 'static,
S::Output: Send,
{
let (tx, rx) = tokio::sync::mpsc::channel(buffer);

std::thread::spawn(move || {
let queue_capacity = self.inner.max_capacity();

while let Some(input) = self.inner.blocking_recv() {
let result = match input {
Ok(PeerData { peer, data }) => {
// Stats for tracing and metrics.
let t = std::time::Instant::now();

// Process the data.
let output: Result<Vec<_>, _> = data
.into_iter()
.map(|data| {
stage.map(&peer, data).inspect_err(|error| {
tracing::debug!(%error, "Processing item failed");
})
})
.collect();
let output = output.map(|x| PeerData::new(peer, x));

// Log trace and metrics.
let elements_per_sec = 1.0 / t.elapsed().as_secs_f32();
let queue_fullness = queue_capacity - self.inner.capacity();
let input_queue = Fullness(queue_fullness, queue_capacity);
tracing::debug!(
"Stage: {}, queue: {}, {elements_per_sec:.0} items/s",
S::NAME,
input_queue
);

output
}
Err(e) => Err(e),
};

let is_err = result.is_err();
if tx.blocking_send(result).is_err() || is_err {
return;
}
}
});

SyncReceiver::from_receiver(rx)
}

/// A private impl which hides the ugly `count_fn` used to differentiate
/// between processing a single element from [SyncReceiver] and multiple
/// elements from [ChunkSyncReceiver].
Expand Down
6 changes: 3 additions & 3 deletions crates/pathfinder/src/sync/track.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ impl<L, P> Sync<L, P> {
start: next,
}
.spawn()
.pipe_each(class_definitions::VerifyLayout, 10)
.pipe_each(class_definitions::VerifyHash, 10)
.pipe_each(
.pipe(class_definitions::VerifyLayout, 10)
.pipe(class_definitions::VerifyHash, 10)
.pipe(
class_definitions::CompileSierraToCasm::new(fgw, tokio::runtime::Handle::current()),
10,
)
Expand Down

0 comments on commit c832542

Please sign in to comment.