diff --git a/Cargo.lock b/Cargo.lock index 4e81a981fa4..809e4b303fa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4702,7 +4702,6 @@ dependencies = [ "tower-test", "tracing", "tracing-futures", - "zebra-consensus", "zebra-test", ] diff --git a/tower-batch-control/Cargo.toml b/tower-batch-control/Cargo.toml index 494bc0829b9..08400e29f96 100644 --- a/tower-batch-control/Cargo.toml +++ b/tower-batch-control/Cargo.toml @@ -1,7 +1,10 @@ [package] name = "tower-batch-control" version = "0.2.41-beta.10" -authors = ["Zcash Foundation ", "Tower Maintainers "] +authors = [ + "Zcash Foundation ", + "Tower Maintainers ", +] description = "Tower middleware for batch request processing" # # Legal # @@ -43,8 +46,7 @@ rand = "0.8.5" tokio = { version = "1.36.0", features = ["full", "tracing", "test-util"] } tokio-test = "0.4.3" -tower-fallback = { path = "../tower-fallback/" } +tower-fallback = { path = "../tower-fallback/", version = "0.2.41-beta.10" } tower-test = "0.4.0" -zebra-consensus = { path = "../zebra-consensus/" } -zebra-test = { path = "../zebra-test/" } +zebra-test = { path = "../zebra-test/", version = "1.0.0-beta.34" } diff --git a/tower-batch-control/tests/ed25519.rs b/tower-batch-control/tests/ed25519.rs index 773b1e3e017..5bbee2f72fb 100644 --- a/tower-batch-control/tests/ed25519.rs +++ b/tower-batch-control/tests/ed25519.rs @@ -1,18 +1,172 @@ //! Test batching using ed25519 verification. -use std::time::Duration; +use std::{ + mem, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; use color_eyre::{eyre::eyre, Report}; -use ed25519_zebra::*; +use ed25519_zebra::{batch, Error, SigningKey, VerificationKeyBytes}; use futures::stream::{FuturesOrdered, StreamExt}; +use futures::FutureExt; +use futures_core::Future; use rand::thread_rng; +use tokio::sync::{oneshot::error::RecvError, watch}; use tower::{Service, ServiceExt}; -use tower_batch_control::Batch; +use tower_batch_control::{Batch, BatchControl}; use tower_fallback::Fallback; // ============ service impl ============ -use zebra_consensus::ed25519::{Item as Ed25519Item, Verifier as Ed25519Verifier}; +/// A boxed [`std::error::Error`]. +type BoxError = Box; + +/// The type of the batch verifier. +type BatchVerifier = batch::Verifier; + +/// The type of verification results. +type VerifyResult = Result<(), Error>; + +/// The type of the batch sender channel. +type Sender = watch::Sender>; + +/// The type of the batch item. +/// This is an `Ed25519Item`. +type Item = batch::Item; + +/// Ed25519 signature verifier service +struct Verifier { + /// A batch verifier for ed25519 signatures. + batch: BatchVerifier, + + /// A channel for broadcasting the result of a batch to the futures for each batch item. + /// + /// Each batch gets a newly created channel, so there is only ever one result sent per channel. + /// Tokio doesn't have a oneshot multi-consumer channel, so we use a watch channel. + tx: Sender, +} + +impl Default for Verifier { + fn default() -> Self { + let batch = BatchVerifier::default(); + let (tx, _) = watch::channel(None); + Self { batch, tx } + } +} + +impl Verifier { + /// Returns the batch verifier and channel sender from `self`, + /// replacing them with a new empty batch. + fn take(&mut self) -> (BatchVerifier, Sender) { + // Use a new verifier and channel for each batch. + let batch = mem::take(&mut self.batch); + + let (tx, _) = watch::channel(None); + let tx = mem::replace(&mut self.tx, tx); + + (batch, tx) + } + + /// Synchronously process the batch, and send the result using the channel sender. + /// This function blocks until the batch is completed. + fn verify(batch: BatchVerifier, tx: Sender) { + let result = batch.verify(thread_rng()); + let _ = tx.send(Some(result)); + } + + /// Flush the batch using a thread pool, and return the result via the channel. + /// This returns immediately, usually before the batch is completed. + fn flush_blocking(&mut self) { + let (batch, tx) = self.take(); + + // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. + // + // We don't care about execution order here, because this method is only called on drop. + tokio::task::block_in_place(|| rayon::spawn_fifo(|| Self::verify(batch, tx))); + } + + /// Flush the batch using a thread pool, and return the result via the channel. + /// This function returns a future that becomes ready when the batch is completed. + async fn flush_spawning(batch: BatchVerifier, tx: Sender) { + // Correctness: Do CPU-intensive work on a dedicated thread, to avoid blocking other futures. + let _ = tx.send(spawn_fifo(move || batch.verify(thread_rng())).await.ok()); + } +} + +impl Service> for Verifier { + type Response = (); + type Error = BoxError; + type Future = Pin> + Send + 'static>>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: BatchControl) -> Self::Future { + match req { + BatchControl::Item(item) => { + tracing::trace!("got ed25519 item"); + self.batch.queue(item); + let mut rx = self.tx.subscribe(); + + Box::pin(async move { + match rx.changed().await { + Ok(()) => { + // We use a new channel for each batch, + // so we always get the correct batch result here. + let result = rx.borrow() + .ok_or("threadpool unexpectedly dropped response channel sender. Is Zebra shutting down?")?; + + if result.is_ok() { + tracing::trace!(?result, "validated ed25519 signature"); + } else { + tracing::trace!(?result, "invalid ed25519 signature"); + } + result.map_err(BoxError::from) + } + Err(_recv_error) => panic!("ed25519 verifier was dropped without flushing"), + } + }) + } + + BatchControl::Flush => { + tracing::trace!("got ed25519 flush command"); + + let (batch, tx) = self.take(); + + Box::pin(Self::flush_spawning(batch, tx).map(Ok)) + } + } + } +} + +impl Drop for Verifier { + fn drop(&mut self) { + // We need to flush the current batch in case there are still any pending futures. + // This returns immediately, usually before the batch is completed. + self.flush_blocking(); + } +} + +/// Fires off a task into the Rayon threadpool and awaits the result through a oneshot channel. +async fn spawn_fifo< + E: 'static + std::error::Error + Sync + Send, + F: 'static + FnOnce() -> Result<(), E> + Send, +>( + f: F, +) -> Result, RecvError> { + // Rayon doesn't have a spawn function that returns a value, + // so we use a oneshot channel instead. + let (rsp_tx, rsp_rx) = tokio::sync::oneshot::channel(); + + rayon::spawn_fifo(move || { + let _ = rsp_tx.send(f()); + }); + + rsp_rx.await +} // =============== testing code ======== @@ -22,7 +176,7 @@ async fn sign_and_verify( bad_index: Option, ) -> Result<(), V::Error> where - V: Service, + V: Service, { let mut results = FuturesOrdered::new(); for i in 0..n { @@ -61,7 +215,7 @@ async fn batch_flushes_on_max_items() -> Result<(), Report> { // flushing is happening based on hitting max_items. // // Create our own verifier, so we don't shut down a shared verifier used by other tests. - let verifier = Batch::new(Ed25519Verifier::default(), 10, 5, Duration::from_secs(1000)); + let verifier = Batch::new(Verifier::default(), 10, 5, Duration::from_secs(1000)); timeout(Duration::from_secs(1), sign_and_verify(verifier, 100, None)) .await .map_err(|e| eyre!(e))? @@ -79,12 +233,7 @@ async fn batch_flushes_on_max_latency() -> Result<(), Report> { // flushing is happening based on hitting max_latency. // // Create our own verifier, so we don't shut down a shared verifier used by other tests. - let verifier = Batch::new( - Ed25519Verifier::default(), - 100, - 10, - Duration::from_millis(500), - ); + let verifier = Batch::new(Verifier::default(), 100, 10, Duration::from_millis(500)); timeout(Duration::from_secs(1), sign_and_verify(verifier, 10, None)) .await .map_err(|e| eyre!(e))? @@ -99,13 +248,8 @@ async fn fallback_verification() -> Result<(), Report> { // Create our own verifier, so we don't shut down a shared verifier used by other tests. let verifier = Fallback::new( - Batch::new( - Ed25519Verifier::default(), - 10, - 1, - Duration::from_millis(100), - ), - tower::service_fn(|item: Ed25519Item| async move { item.verify_single() }), + Batch::new(Verifier::default(), 10, 1, Duration::from_millis(100)), + tower::service_fn(|item: Item| async move { item.verify_single() }), ); sign_and_verify(verifier, 100, Some(39)) diff --git a/tower-fallback/Cargo.toml b/tower-fallback/Cargo.toml index 7a90391eed6..1954c75d48a 100644 --- a/tower-fallback/Cargo.toml +++ b/tower-fallback/Cargo.toml @@ -24,4 +24,4 @@ tracing = "0.1.39" [dev-dependencies] tokio = { version = "1.36.0", features = ["full", "tracing", "test-util"] } -zebra-test = { path = "../zebra-test/" } +zebra-test = { path = "../zebra-test/", version = "1.0.0-beta.34" } diff --git a/zebra-chain/Cargo.toml b/zebra-chain/Cargo.toml index 3a626c052d2..9284c064e91 100644 --- a/zebra-chain/Cargo.toml +++ b/zebra-chain/Cargo.toml @@ -168,7 +168,7 @@ rand_chacha = "0.3.1" tokio = { version = "1.36.0", features = ["full", "tracing", "test-util"] } -zebra-test = { path = "../zebra-test/" } +zebra-test = { path = "../zebra-test/", version = "1.0.0-beta.34" } [[bench]] name = "block" diff --git a/zebra-consensus/Cargo.toml b/zebra-consensus/Cargo.toml index 69769d0b12b..f1dbf95b20b 100644 --- a/zebra-consensus/Cargo.toml +++ b/zebra-consensus/Cargo.toml @@ -94,6 +94,6 @@ tokio = { version = "1.36.0", features = ["full", "tracing", "test-util"] } tracing-error = "0.2.0" tracing-subscriber = "0.3.18" -zebra-state = { path = "../zebra-state", features = ["proptest-impl"] } -zebra-chain = { path = "../zebra-chain", features = ["proptest-impl"] } -zebra-test = { path = "../zebra-test/" } +zebra-state = { path = "../zebra-state", version = "1.0.0-beta.34", features = ["proptest-impl"] } +zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.34", features = ["proptest-impl"] } +zebra-test = { path = "../zebra-test/", version = "1.0.0-beta.34" } diff --git a/zebra-consensus/src/primitives/ed25519/tests.rs b/zebra-consensus/src/primitives/ed25519/tests.rs index 0847ed08202..cc75e71fa36 100644 --- a/zebra-consensus/src/primitives/ed25519/tests.rs +++ b/zebra-consensus/src/primitives/ed25519/tests.rs @@ -2,70 +2,97 @@ use std::time::Duration; -use color_eyre::eyre::{eyre, Result}; -use futures::stream::{FuturesUnordered, StreamExt}; +use color_eyre::eyre::{eyre, Report, Result}; +use futures::stream::{FuturesOrdered, StreamExt}; use tower::ServiceExt; use tower_batch_control::Batch; use crate::primitives::ed25519::*; -async fn sign_and_verify(mut verifier: V, n: usize) -> Result<(), V::Error> +async fn sign_and_verify( + mut verifier: V, + n: usize, + bad_index: Option, +) -> Result<(), V::Error> where V: Service, { - let mut rng = thread_rng(); - let mut results = FuturesUnordered::new(); + let mut results = FuturesOrdered::new(); for i in 0..n { let span = tracing::trace_span!("sig", i); + let sk = SigningKey::new(thread_rng()); + let vk_bytes = VerificationKeyBytes::from(&sk); let msg = b"BatchVerifyTest"; + let sig = if Some(i) == bad_index { + sk.sign(b"badmsg") + } else { + sk.sign(&msg[..]) + }; - let sk = SigningKey::new(&mut rng); - let vk = VerificationKey::from(&sk); - let sig = sk.sign(&msg[..]); verifier.ready().await?; - results.push(span.in_scope(|| verifier.call((vk.into(), sig, msg).into()))) + results.push_back(span.in_scope(|| verifier.call((vk_bytes, sig, msg).into()))) } - while let Some(result) = results.next().await { - result?; + let mut numbered_results = results.enumerate(); + while let Some((i, result)) = numbered_results.next().await { + if Some(i) == bad_index { + assert!(result.is_err()); + } else { + result?; + } } Ok(()) } #[tokio::test(flavor = "multi_thread")] -async fn batch_flushes_on_max_items_test() -> Result<()> { - batch_flushes_on_max_items().await -} - -#[spandoc::spandoc] -async fn batch_flushes_on_max_items() -> Result<()> { +async fn batch_flushes_on_max_items() -> Result<(), Report> { use tokio::time::timeout; + let _init_guard = zebra_test::init(); // Use a very long max_latency and a short timeout to check that // flushing is happening based on hitting max_items. + // + // Create our own verifier, so we don't shut down a shared verifier used by other tests. let verifier = Batch::new(Verifier::default(), 10, 5, Duration::from_secs(1000)); - timeout(Duration::from_secs(5), sign_and_verify(verifier, 100)) - .await? + timeout(Duration::from_secs(5), sign_and_verify(verifier, 100, None)) + .await + .map_err(|e| eyre!(e))? .map_err(|e| eyre!(e))?; Ok(()) } #[tokio::test(flavor = "multi_thread")] -async fn batch_flushes_on_max_latency_test() -> Result<()> { - batch_flushes_on_max_latency().await -} - -#[spandoc::spandoc] -async fn batch_flushes_on_max_latency() -> Result<()> { +async fn batch_flushes_on_max_latency() -> Result<(), Report> { use tokio::time::timeout; + let _init_guard = zebra_test::init(); // Use a very high max_items and a short timeout to check that // flushing is happening based on hitting max_latency. + // + // Create our own verifier, so we don't shut down a shared verifier used by other tests. let verifier = Batch::new(Verifier::default(), 100, 10, Duration::from_millis(500)); - timeout(Duration::from_secs(5), sign_and_verify(verifier, 10)) - .await? + timeout(Duration::from_secs(5), sign_and_verify(verifier, 10, None)) + .await + .map_err(|e| eyre!(e))? + .map_err(|e| eyre!(e))?; + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread")] +async fn fallback_verification() -> Result<(), Report> { + let _init_guard = zebra_test::init(); + + // Create our own verifier, so we don't shut down a shared verifier used by other tests. + let verifier = Fallback::new( + Batch::new(Verifier::default(), 10, 1, Duration::from_millis(100)), + tower::service_fn(|item: Item| async move { item.verify_single() }), + ); + + sign_and_verify(verifier, 100, Some(39)) + .await .map_err(|e| eyre!(e))?; Ok(()) diff --git a/zebra-rpc/Cargo.toml b/zebra-rpc/Cargo.toml index dcd8aacd138..756ad79b958 100644 --- a/zebra-rpc/Cargo.toml +++ b/zebra-rpc/Cargo.toml @@ -87,9 +87,9 @@ proptest = "1.4.0" thiserror = "1.0.57" tokio = { version = "1.36.0", features = ["full", "tracing", "test-util"] } -zebra-chain = { path = "../zebra-chain", features = ["proptest-impl"] } -zebra-consensus = { path = "../zebra-consensus", features = ["proptest-impl"] } -zebra-network = { path = "../zebra-network", features = ["proptest-impl"] } -zebra-state = { path = "../zebra-state", features = ["proptest-impl"] } +zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.34", features = ["proptest-impl"] } +zebra-consensus = { path = "../zebra-consensus", version = "1.0.0-beta.34", features = ["proptest-impl"] } +zebra-network = { path = "../zebra-network", version = "1.0.0-beta.34", features = ["proptest-impl"] } +zebra-state = { path = "../zebra-state", version = "1.0.0-beta.34", features = ["proptest-impl"] } -zebra-test = { path = "../zebra-test" } +zebra-test = { path = "../zebra-test", version = "1.0.0-beta.34" } diff --git a/zebra-scan/Cargo.toml b/zebra-scan/Cargo.toml index a282e1812d2..143f04b76fd 100644 --- a/zebra-scan/Cargo.toml +++ b/zebra-scan/Cargo.toml @@ -26,17 +26,17 @@ required-features = ["proptest-impl"] # Test features proptest-impl = [ - "proptest", - "proptest-derive", - "zebra-state/proptest-impl", - "zebra-chain/proptest-impl", - "zebra-test", - "bls12_381", - "ff", - "group", - "jubjub", - "rand", - "zcash_note_encryption", + "proptest", + "proptest-derive", + "zebra-state/proptest-impl", + "zebra-chain/proptest-impl", + "zebra-test", + "bls12_381", + "ff", + "group", + "jubjub", + "rand", + "zcash_note_encryption", ] [dependencies] @@ -87,5 +87,7 @@ jubjub = "0.10.0" rand = "0.8.5" zcash_note_encryption = "0.4.0" -zebra-state = { path = "../zebra-state", version = "1.0.0-beta.34", features = ["proptest-impl"] } +zebra-state = { path = "../zebra-state", version = "1.0.0-beta.34", features = [ + "proptest-impl", +] } zebra-test = { path = "../zebra-test", version = "1.0.0-beta.34" } diff --git a/zebra-script/Cargo.toml b/zebra-script/Cargo.toml index b5deaea219a..ba3487e1f2d 100644 --- a/zebra-script/Cargo.toml +++ b/zebra-script/Cargo.toml @@ -25,4 +25,4 @@ displaydoc = "0.2.4" [dev-dependencies] hex = "0.4.3" lazy_static = "1.4.0" -zebra-test = { path = "../zebra-test" } +zebra-test = { path = "../zebra-test", version = "1.0.0-beta.34" } diff --git a/zebra-state/Cargo.toml b/zebra-state/Cargo.toml index f085a5c5d88..34519690219 100644 --- a/zebra-state/Cargo.toml +++ b/zebra-state/Cargo.toml @@ -107,5 +107,5 @@ jubjub = "0.10.0" tokio = { version = "1.36.0", features = ["full", "tracing", "test-util"] } -zebra-chain = { path = "../zebra-chain", features = ["proptest-impl"] } -zebra-test = { path = "../zebra-test/" } +zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.34", features = ["proptest-impl"] } +zebra-test = { path = "../zebra-test/", version = "1.0.0-beta.34" } diff --git a/zebrad/Cargo.toml b/zebrad/Cargo.toml index 5efbe59d53e..5add19da2ca 100644 --- a/zebrad/Cargo.toml +++ b/zebrad/Cargo.toml @@ -280,16 +280,16 @@ proptest-derive = "0.4.0" # enable span traces and track caller in tests color-eyre = { version = "0.6.2" } -zebra-chain = { path = "../zebra-chain", features = ["proptest-impl"] } -zebra-consensus = { path = "../zebra-consensus", features = ["proptest-impl"] } -zebra-network = { path = "../zebra-network", features = ["proptest-impl"] } -zebra-state = { path = "../zebra-state", features = ["proptest-impl"] } -zebra-scan = { path = "../zebra-scan", features = ["proptest-impl"] } +zebra-chain = { path = "../zebra-chain", version = "1.0.0-beta.34", features = ["proptest-impl"] } +zebra-consensus = { path = "../zebra-consensus", version = "1.0.0-beta.34", features = ["proptest-impl"] } +zebra-network = { path = "../zebra-network", version = "1.0.0-beta.34", features = ["proptest-impl"] } +zebra-scan = { path = "../zebra-scan", version = "0.1.0-alpha.3", features = ["proptest-impl"] } +zebra-state = { path = "../zebra-state", version = "1.0.0-beta.34", features = ["proptest-impl"] } -zebra-node-services = { path = "../zebra-node-services", features = ["rpc-client"] } +zebra-node-services = { path = "../zebra-node-services", version = "1.0.0-beta.34", features = ["rpc-client"] } -zebra-test = { path = "../zebra-test" } -zebra-grpc = { path = "../zebra-grpc" } +zebra-test = { path = "../zebra-test", version = "1.0.0-beta.34" } +zebra-grpc = { path = "../zebra-grpc", version = "0.1.0-alpha.1" } # Used by the checkpoint generation tests via the zebra-checkpoints feature # (the binaries in this crate won't be built unless their features are enabled). @@ -300,4 +300,4 @@ zebra-grpc = { path = "../zebra-grpc" } # When `-Z bindeps` is stabilised, enable this binary dependency instead: # https://github.com/rust-lang/cargo/issues/9096 # zebra-utils { path = "../zebra-utils", artifact = "bin:zebra-checkpoints" } -zebra-utils = { path = "../zebra-utils" } +zebra-utils = { path = "../zebra-utils", version = "1.0.0-beta.34" }