From 245e2cc364b93d63128713e00299dd6d1f420a8c Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Wed, 20 Jul 2022 19:16:04 -0300 Subject: [PATCH 1/5] try to do deserialization of transaction in a rayon thread --- Cargo.lock | 1 + zebra-network/Cargo.toml | 1 + zebra-network/src/protocol/external/codec.rs | 35 ++++++++++++++++++-- 3 files changed, 35 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 978dc716d9d..b320b67e14e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6408,6 +6408,7 @@ dependencies = [ "proptest", "proptest-derive", "rand 0.8.5", + "rayon", "regex", "serde", "static_assertions", diff --git a/zebra-network/Cargo.toml b/zebra-network/Cargo.toml index b1bba7858c0..cb3206c8b3d 100644 --- a/zebra-network/Cargo.toml +++ b/zebra-network/Cargo.toml @@ -24,6 +24,7 @@ lazy_static = "1.4.0" ordered-map = "0.4.2" pin-project = "1.0.10" rand = { version = "0.8.5", package = "rand" } +rayon = "1.5.3" regex = "1.5.6" serde = { version = "1.0.137", features = ["serde_derive"] } thiserror = "1.0.31" diff --git a/zebra-network/src/protocol/external/codec.rs b/zebra-network/src/protocol/external/codec.rs index d249eca09d3..c96fb4860b8 100644 --- a/zebra-network/src/protocol/external/codec.rs +++ b/zebra-network/src/protocol/external/codec.rs @@ -4,12 +4,15 @@ use std::{ cmp::min, convert::TryInto, fmt, + future::Future, io::{Cursor, Read, Write}, }; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use bytes::{BufMut, BytesMut}; use chrono::{TimeZone, Utc}; +use futures::FutureExt; +use tokio::sync::oneshot; use tokio_util::codec::{Decoder, Encoder}; use zebra_chain::{ @@ -625,8 +628,15 @@ impl Codec { Ok(Message::NotFound(Vec::zcash_deserialize(reader)?)) } - fn read_tx(&self, reader: R) -> Result { - Ok(Message::Tx(Transaction::zcash_deserialize(reader)?.into())) + fn read_tx(&self, reader: R) -> Result { + let (tx, rx) = oneshot::channel::>(); + + let _ = Self::deserialize_transaction_spawning(reader, tx); + let result = rx + .blocking_recv() + .expect("we just sent so we should have results"); + + Ok(Message::Tx(result?.into())) } fn read_mempool(&self, mut _reader: R) -> Result { @@ -674,6 +684,27 @@ impl Codec { fn read_filterclear(&self, mut _reader: R) -> Result { Ok(Message::FilterClear) } + + /// TBA + fn deserialize_transaction_spawning( + reader: R, + tx: oneshot::Sender>, + ) -> impl Future { + // Correctness: TBA + tokio::task::spawn_blocking(|| { + rayon::scope_fifo(|s| s.spawn_fifo(|_s| Self::deserialize_transaction(reader, tx))) + }) + .map(|join_result| join_result.expect("panic in the transaction deserialization")) + } + + /// TBA + fn deserialize_transaction( + reader: R, + tx: oneshot::Sender>, + ) { + let result = Transaction::zcash_deserialize(reader); + let _ = tx.send(result); + } } // XXX replace these interior unit tests with exterior integration tests + proptest From 47156967f855a57d54c6e778607de1379e0d9da8 Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 21 Jul 2022 10:36:07 +1000 Subject: [PATCH 2/5] Try tokio::task::block_in_place instead --- zebra-network/src/protocol/external/codec.rs | 45 +++++++++----------- 1 file changed, 20 insertions(+), 25 deletions(-) diff --git a/zebra-network/src/protocol/external/codec.rs b/zebra-network/src/protocol/external/codec.rs index c96fb4860b8..bc208501dd0 100644 --- a/zebra-network/src/protocol/external/codec.rs +++ b/zebra-network/src/protocol/external/codec.rs @@ -4,15 +4,12 @@ use std::{ cmp::min, convert::TryInto, fmt, - future::Future, io::{Cursor, Read, Write}, }; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; use bytes::{BufMut, BytesMut}; use chrono::{TimeZone, Utc}; -use futures::FutureExt; -use tokio::sync::oneshot; use tokio_util::codec::{Decoder, Encoder}; use zebra_chain::{ @@ -628,13 +625,8 @@ impl Codec { Ok(Message::NotFound(Vec::zcash_deserialize(reader)?)) } - fn read_tx(&self, reader: R) -> Result { - let (tx, rx) = oneshot::channel::>(); - - let _ = Self::deserialize_transaction_spawning(reader, tx); - let result = rx - .blocking_recv() - .expect("we just sent so we should have results"); + fn read_tx(&self, reader: R) -> Result { + let result = Self::deserialize_transaction_spawning(reader); Ok(Message::Tx(result?.into())) } @@ -686,24 +678,27 @@ impl Codec { } /// TBA - fn deserialize_transaction_spawning( + #[allow(clippy::unwrap_in_result)] + fn deserialize_transaction_spawning( reader: R, - tx: oneshot::Sender>, - ) -> impl Future { + ) -> Result { + let mut result = None; + // Correctness: TBA - tokio::task::spawn_blocking(|| { - rayon::scope_fifo(|s| s.spawn_fifo(|_s| Self::deserialize_transaction(reader, tx))) - }) - .map(|join_result| join_result.expect("panic in the transaction deserialization")) - } + // + // Since we use `block_in_place()`, other futures running on the connection task will be blocked: + // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html + // + // We can't use `spawn_blocking()` because: + // - The `reader` has a lifetime (but we could replace it with a `Vec` of message data) + // - There is no way to check the blocking task's future for panics + tokio::task::block_in_place(|| { + rayon::in_place_scope_fifo(|s| { + s.spawn_fifo(|_s| result = Some(Transaction::zcash_deserialize(reader))) + }) + }); - /// TBA - fn deserialize_transaction( - reader: R, - tx: oneshot::Sender>, - ) { - let result = Transaction::zcash_deserialize(reader); - let _ = tx.send(result); + result.expect("scope has already finished") } } From 7866ba372ead4e914b884b33ce99e69da19bcec0 Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Thu, 21 Jul 2022 11:22:04 -0300 Subject: [PATCH 3/5] fix tests --- zebra-network/src/protocol/external/codec.rs | 11 ++++++----- zebrad/src/components/inbound/tests/real_peer_set.rs | 4 ++-- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/zebra-network/src/protocol/external/codec.rs b/zebra-network/src/protocol/external/codec.rs index bc208501dd0..d0527f8c6aa 100644 --- a/zebra-network/src/protocol/external/codec.rs +++ b/zebra-network/src/protocol/external/codec.rs @@ -969,7 +969,8 @@ mod tests { fn max_msg_size_round_trip() { use zebra_chain::serialization::ZcashDeserializeInto; - let rt = zebra_test::init_async(); + //let rt = zebra_test::init_async(); + zebra_test::init(); // make tests with a Tx message let tx: Transaction = zebra_test::vectors::DUMMY_TX1 @@ -983,7 +984,7 @@ mod tests { let size = 85; // reducing the max size to body size - 1 - rt.block_on(async { + zebra_test::MULTI_THREADED_RUNTIME.block_on(async { let mut bytes = Vec::new(); { let mut fw = FramedWrite::new( @@ -997,7 +998,7 @@ mod tests { }); // send again with the msg body size as max size - let msg_bytes = rt.block_on(async { + let msg_bytes = zebra_test::MULTI_THREADED_RUNTIME.block_on(async { let mut bytes = Vec::new(); { let mut fw = FramedWrite::new( @@ -1012,7 +1013,7 @@ mod tests { }); // receive with a reduced max size - rt.block_on(async { + zebra_test::MULTI_THREADED_RUNTIME.block_on(async { let mut fr = FramedRead::new( Cursor::new(&msg_bytes), Codec::builder().with_max_body_len(size - 1).finish(), @@ -1024,7 +1025,7 @@ mod tests { }); // receive again with the tx size as max size - rt.block_on(async { + zebra_test::MULTI_THREADED_RUNTIME.block_on(async { let mut fr = FramedRead::new( Cursor::new(&msg_bytes), Codec::builder().with_max_body_len(size).finish(), diff --git a/zebrad/src/components/inbound/tests/real_peer_set.rs b/zebrad/src/components/inbound/tests/real_peer_set.rs index dc5ef009669..cef025d09b5 100644 --- a/zebrad/src/components/inbound/tests/real_peer_set.rs +++ b/zebrad/src/components/inbound/tests/real_peer_set.rs @@ -336,7 +336,7 @@ async fn inbound_tx_empty_state_notfound() -> Result<(), crate::BoxError> { /// /// Uses a Zebra network stack's peer set to query an isolated Zebra TCP connection, /// with an unrelated transaction test responder. -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn outbound_tx_unrelated_response_notfound() -> Result<(), crate::BoxError> { // We respond with an unrelated transaction, so the peer gives up on the request. let unrelated_response: Transaction = @@ -486,7 +486,7 @@ async fn outbound_tx_unrelated_response_notfound() -> Result<(), crate::BoxError /// - returns a `NotFoundRegistry` error for repeated requests to a non-responsive peer. /// /// The requests are coming from the full stack to the isolated peer. -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn outbound_tx_partial_response_notfound() -> Result<(), crate::BoxError> { // We repeatedly respond with the same transaction, so the peer gives up on the second response. let repeated_tx: Transaction = zebra_test::vectors::DUMMY_TX1.zcash_deserialize_into()?; From 4bad83ace7142366424e0e7b83672fae33b08c7b Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Thu, 21 Jul 2022 16:26:02 -0300 Subject: [PATCH 4/5] add deserialize block into rayon pool --- zebra-network/src/protocol/external/codec.rs | 28 ++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/zebra-network/src/protocol/external/codec.rs b/zebra-network/src/protocol/external/codec.rs index d0527f8c6aa..2aee641982f 100644 --- a/zebra-network/src/protocol/external/codec.rs +++ b/zebra-network/src/protocol/external/codec.rs @@ -570,8 +570,10 @@ impl Codec { Ok(Message::GetAddr) } - fn read_block(&self, reader: R) -> Result { - Ok(Message::Block(Block::zcash_deserialize(reader)?.into())) + fn read_block(&self, reader: R) -> Result { + let result = Self::deserialize_block_spawning(reader); + + Ok(Message::Block(result?.into())) } fn read_getblocks(&self, mut reader: R) -> Result { @@ -700,6 +702,28 @@ impl Codec { result.expect("scope has already finished") } + + /// TBA + #[allow(clippy::unwrap_in_result)] + fn deserialize_block_spawning(reader: R) -> Result { + let mut result = None; + + // Correctness: TBA + // + // Since we use `block_in_place()`, other futures running on the connection task will be blocked: + // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html + // + // We can't use `spawn_blocking()` because: + // - The `reader` has a lifetime (but we could replace it with a `Vec` of message data) + // - There is no way to check the blocking task's future for panics + tokio::task::block_in_place(|| { + rayon::in_place_scope_fifo(|s| { + s.spawn_fifo(|_s| result = Some(Block::zcash_deserialize(reader))) + }) + }); + + result.expect("scope has already finished") + } } // XXX replace these interior unit tests with exterior integration tests + proptest From 06ed353ad32827e0f1891afb5dac18f33d032af6 Mon Sep 17 00:00:00 2001 From: Alfredo Garcia Date: Thu, 21 Jul 2022 16:41:26 -0300 Subject: [PATCH 5/5] fill some docs --- zebra-network/src/protocol/external/codec.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/zebra-network/src/protocol/external/codec.rs b/zebra-network/src/protocol/external/codec.rs index 2aee641982f..89fc68fffb3 100644 --- a/zebra-network/src/protocol/external/codec.rs +++ b/zebra-network/src/protocol/external/codec.rs @@ -572,7 +572,6 @@ impl Codec { fn read_block(&self, reader: R) -> Result { let result = Self::deserialize_block_spawning(reader); - Ok(Message::Block(result?.into())) } @@ -629,7 +628,6 @@ impl Codec { fn read_tx(&self, reader: R) -> Result { let result = Self::deserialize_transaction_spawning(reader); - Ok(Message::Tx(result?.into())) } @@ -679,14 +677,14 @@ impl Codec { Ok(Message::FilterClear) } - /// TBA + /// Given the reader, deserialize the transaction in the rayon thread pool. #[allow(clippy::unwrap_in_result)] fn deserialize_transaction_spawning( reader: R, ) -> Result { let mut result = None; - // Correctness: TBA + // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. // // Since we use `block_in_place()`, other futures running on the connection task will be blocked: // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html @@ -703,12 +701,12 @@ impl Codec { result.expect("scope has already finished") } - /// TBA + /// Given the reader, deserialize the block in the rayon thread pool. #[allow(clippy::unwrap_in_result)] fn deserialize_block_spawning(reader: R) -> Result { let mut result = None; - // Correctness: TBA + // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. // // Since we use `block_in_place()`, other futures running on the connection task will be blocked: // https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html