Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

try to do deserialization of transaction in a rayon thread #4801

Merged
merged 5 commits into from
Jul 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions zebra-network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ lazy_static = "1.4.0"
ordered-map = "0.4.2"
pin-project = "1.0.10"
rand = { version = "0.8.5", package = "rand" }
rayon = "1.5.3"
regex = "1.5.6"
serde = { version = "1.0.137", features = ["serde_derive"] }
thiserror = "1.0.31"
Expand Down
67 changes: 58 additions & 9 deletions zebra-network/src/protocol/external/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,8 +570,9 @@ impl Codec {
Ok(Message::GetAddr)
}

fn read_block<R: Read>(&self, reader: R) -> Result<Message, Error> {
Ok(Message::Block(Block::zcash_deserialize(reader)?.into()))
fn read_block<R: Read + std::marker::Send>(&self, reader: R) -> Result<Message, Error> {
let result = Self::deserialize_block_spawning(reader);
Ok(Message::Block(result?.into()))
}

fn read_getblocks<R: Read>(&self, mut reader: R) -> Result<Message, Error> {
Expand Down Expand Up @@ -625,8 +626,9 @@ impl Codec {
Ok(Message::NotFound(Vec::zcash_deserialize(reader)?))
}

fn read_tx<R: Read>(&self, reader: R) -> Result<Message, Error> {
Ok(Message::Tx(Transaction::zcash_deserialize(reader)?.into()))
fn read_tx<R: Read + std::marker::Send>(&self, reader: R) -> Result<Message, Error> {
let result = Self::deserialize_transaction_spawning(reader);
Ok(Message::Tx(result?.into()))
}

fn read_mempool<R: Read>(&self, mut _reader: R) -> Result<Message, Error> {
Expand Down Expand Up @@ -674,6 +676,52 @@ impl Codec {
fn read_filterclear<R: Read>(&self, mut _reader: R) -> Result<Message, Error> {
Ok(Message::FilterClear)
}

/// Given the reader, deserialize the transaction in the rayon thread pool.
#[allow(clippy::unwrap_in_result)]
fn deserialize_transaction_spawning<R: Read + std::marker::Send>(
reader: R,
) -> Result<Transaction, Error> {
let mut result = None;

// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// Since we use `block_in_place()`, other futures running on the connection task will be blocked:
// https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
//
// We can't use `spawn_blocking()` because:
// - The `reader` has a lifetime (but we could replace it with a `Vec` of message data)
// - There is no way to check the blocking task's future for panics
oxarbitrage marked this conversation as resolved.
Show resolved Hide resolved
tokio::task::block_in_place(|| {
rayon::in_place_scope_fifo(|s| {
s.spawn_fifo(|_s| result = Some(Transaction::zcash_deserialize(reader)))
})
});

result.expect("scope has already finished")
}

/// Given the reader, deserialize the block in the rayon thread pool.
#[allow(clippy::unwrap_in_result)]
fn deserialize_block_spawning<R: Read + std::marker::Send>(reader: R) -> Result<Block, Error> {
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
let mut result = None;

// Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures.
//
// Since we use `block_in_place()`, other futures running on the connection task will be blocked:
// https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
//
// We can't use `spawn_blocking()` because:
// - The `reader` has a lifetime (but we could replace it with a `Vec` of message data)
// - There is no way to check the blocking task's future for panics
tokio::task::block_in_place(|| {
rayon::in_place_scope_fifo(|s| {
s.spawn_fifo(|_s| result = Some(Block::zcash_deserialize(reader)))
})
});

result.expect("scope has already finished")
}
}

// XXX replace these interior unit tests with exterior integration tests + proptest
Expand Down Expand Up @@ -943,7 +991,8 @@ mod tests {
fn max_msg_size_round_trip() {
use zebra_chain::serialization::ZcashDeserializeInto;

let rt = zebra_test::init_async();
//let rt = zebra_test::init_async();
zebra_test::init();

// make tests with a Tx message
let tx: Transaction = zebra_test::vectors::DUMMY_TX1
Expand All @@ -957,7 +1006,7 @@ mod tests {
let size = 85;

// reducing the max size to body size - 1
rt.block_on(async {
zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
let mut bytes = Vec::new();
{
let mut fw = FramedWrite::new(
Expand All @@ -971,7 +1020,7 @@ mod tests {
});

// send again with the msg body size as max size
let msg_bytes = rt.block_on(async {
let msg_bytes = zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
let mut bytes = Vec::new();
{
let mut fw = FramedWrite::new(
Expand All @@ -986,7 +1035,7 @@ mod tests {
});

// receive with a reduced max size
rt.block_on(async {
zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
let mut fr = FramedRead::new(
Cursor::new(&msg_bytes),
Codec::builder().with_max_body_len(size - 1).finish(),
Expand All @@ -998,7 +1047,7 @@ mod tests {
});

// receive again with the tx size as max size
rt.block_on(async {
zebra_test::MULTI_THREADED_RUNTIME.block_on(async {
let mut fr = FramedRead::new(
Cursor::new(&msg_bytes),
Codec::builder().with_max_body_len(size).finish(),
Expand Down
4 changes: 2 additions & 2 deletions zebrad/src/components/inbound/tests/real_peer_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ async fn inbound_tx_empty_state_notfound() -> Result<(), crate::BoxError> {
///
/// Uses a Zebra network stack's peer set to query an isolated Zebra TCP connection,
/// with an unrelated transaction test responder.
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn outbound_tx_unrelated_response_notfound() -> Result<(), crate::BoxError> {
// We respond with an unrelated transaction, so the peer gives up on the request.
let unrelated_response: Transaction =
Expand Down Expand Up @@ -486,7 +486,7 @@ async fn outbound_tx_unrelated_response_notfound() -> Result<(), crate::BoxError
/// - returns a `NotFoundRegistry` error for repeated requests to a non-responsive peer.
///
/// The requests are coming from the full stack to the isolated peer.
#[tokio::test]
#[tokio::test(flavor = "multi_thread")]
async fn outbound_tx_partial_response_notfound() -> Result<(), crate::BoxError> {
// We repeatedly respond with the same transaction, so the peer gives up on the second response.
let repeated_tx: Transaction = zebra_test::vectors::DUMMY_TX1.zcash_deserialize_into()?;
Expand Down