From ba3c476dbeb417c191574d9aa680781d4a1d3180 Mon Sep 17 00:00:00 2001 From: Janito Vaqueiro Ferreira Filho Date: Wed, 20 Oct 2021 14:19:21 +0000 Subject: [PATCH] Replace usage of `ServiceExt::ready_and` It was deprecated in favor of `ServiceExt::ready`. --- tower-batch/src/worker.rs | 4 +- tower-batch/tests/ed25519.rs | 2 +- tower-batch/tests/worker.rs | 12 ++--- tower-fallback/tests/fallback.rs | 6 +-- zebra-consensus/src/block.rs | 6 +-- zebra-consensus/src/chain.rs | 2 +- zebra-consensus/src/checkpoint/tests.rs | 48 ++++--------------- .../src/primitives/ed25519/tests.rs | 2 +- .../src/primitives/groth16/tests.rs | 6 +-- .../src/primitives/redjubjub/tests.rs | 4 +- .../src/primitives/redpallas/tests.rs | 4 +- zebra-network/src/peer/connection.rs | 2 +- zebra-network/src/peer/connector.rs | 2 +- zebra-network/src/peer_set/candidate_set.rs | 2 +- zebra-network/src/peer_set/initialize.rs | 4 +- zebra-state/src/service/tests.rs | 2 +- zebra-test/src/transcript.rs | 2 +- zebra-test/tests/transcript.rs | 8 ++-- zebrad/src/components/inbound/tests.rs | 2 +- zebrad/src/components/mempool/crawler.rs | 4 +- zebrad/src/components/mempool/downloads.rs | 2 +- zebrad/src/components/mempool/gossip.rs | 2 +- .../src/components/mempool/queue_checker.rs | 2 +- zebrad/src/components/mempool/tests/vector.rs | 44 ++++++++--------- zebrad/src/components/sync.rs | 36 ++++++-------- zebrad/src/components/sync/downloads.rs | 4 +- zebrad/src/components/sync/gossip.rs | 2 +- 27 files changed, 90 insertions(+), 126 deletions(-) diff --git a/tower-batch/src/worker.rs b/tower-batch/src/worker.rs index 1c96240f88f..75c39215dbd 100644 --- a/tower-batch/src/worker.rs +++ b/tower-batch/src/worker.rs @@ -83,7 +83,7 @@ where tracing::trace!("notifying caller about worker failure"); let _ = tx.send(Err(failed.clone())); } else { - match self.service.ready_and().await { + match self.service.ready().await { Ok(svc) => { let rsp = svc.call(req.into()); let _ = tx.send(Ok(rsp)); @@ -109,7 +109,7 @@ where async fn flush_service(&mut self) { if let Err(e) = self .service - .ready_and() + .ready() .and_then(|svc| svc.call(BatchControl::Flush)) .await { diff --git a/tower-batch/tests/ed25519.rs b/tower-batch/tests/ed25519.rs index a687381e7de..e96a4bec5a8 100644 --- a/tower-batch/tests/ed25519.rs +++ b/tower-batch/tests/ed25519.rs @@ -112,7 +112,7 @@ where sk.sign(&msg[..]) }; - verifier.ready_and().await?; + verifier.ready().await?; results.push(span.in_scope(|| verifier.call((vk_bytes, sig, msg).into()))) } diff --git a/tower-batch/tests/worker.rs b/tower-batch/tests/worker.rs index c11dc4a2254..bc21209b633 100644 --- a/tower-batch/tests/worker.rs +++ b/tower-batch/tests/worker.rs @@ -21,18 +21,18 @@ async fn wakes_pending_waiters_on_close() { // // keep the request in the worker handle.allow(0); - let service1 = service.ready_and().await.unwrap(); + let service1 = service.ready().await.unwrap(); let poll = worker.poll(); assert_pending!(poll); let mut response = task::spawn(service1.call(())); let mut service1 = service.clone(); - let mut ready1 = task::spawn(service1.ready_and()); + let mut ready1 = task::spawn(service1.ready()); assert_pending!(worker.poll()); assert_pending!(ready1.poll(), "no capacity"); let mut service1 = service.clone(); - let mut ready2 = task::spawn(service1.ready_and()); + let mut ready2 = task::spawn(service1.ready()); assert_pending!(worker.poll()); assert_pending!(ready2.poll(), "no capacity"); @@ -80,17 +80,17 @@ async fn wakes_pending_waiters_on_failure() { // keep the request in the worker handle.allow(0); - let service1 = service.ready_and().await.unwrap(); + let service1 = service.ready().await.unwrap(); assert_pending!(worker.poll()); let mut response = task::spawn(service1.call("hello")); let mut service1 = service.clone(); - let mut ready1 = task::spawn(service1.ready_and()); + let mut ready1 = task::spawn(service1.ready()); assert_pending!(worker.poll()); assert_pending!(ready1.poll(), "no capacity"); let mut service1 = service.clone(); - let mut ready2 = task::spawn(service1.ready_and()); + let mut ready2 = task::spawn(service1.ready()); assert_pending!(worker.poll()); assert_pending!(ready2.poll(), "no capacity"); diff --git a/tower-fallback/tests/fallback.rs b/tower-fallback/tests/fallback.rs index c610d178430..5ca1d3400a2 100644 --- a/tower-fallback/tests/fallback.rs +++ b/tower-fallback/tests/fallback.rs @@ -30,7 +30,7 @@ async fn fallback() { let mut svc = Fallback::new(svc1, svc2); - assert_eq!(svc.ready_and().await.unwrap().call(1).await.unwrap(), 1); - assert_eq!(svc.ready_and().await.unwrap().call(11).await.unwrap(), 111); - assert!(svc.ready_and().await.unwrap().call(21).await.is_err()); + assert_eq!(svc.ready().await.unwrap().call(1).await.unwrap(), 1); + assert_eq!(svc.ready().await.unwrap().call(11).await.unwrap(), 111); + assert!(svc.ready().await.unwrap().call(21).await.is_err()); } diff --git a/zebra-consensus/src/block.rs b/zebra-consensus/src/block.rs index dfda62165ba..469c86637b9 100644 --- a/zebra-consensus/src/block.rs +++ b/zebra-consensus/src/block.rs @@ -125,7 +125,7 @@ where // Check that this block is actually a new block. tracing::trace!("checking that block is not already in state"); match state_service - .ready_and() + .ready() .await .map_err(|source| VerifyBlockError::Depth { source, hash })? .call(zs::Request::Depth(hash)) @@ -179,7 +179,7 @@ where )); for transaction in &block.transactions { let rsp = transaction_verifier - .ready_and() + .ready() .await .expect("transaction verifier is always ready") .call(tx::Request::Block { @@ -211,7 +211,7 @@ where transaction_hashes, }; match state_service - .ready_and() + .ready() .await .map_err(VerifyBlockError::Commit)? .call(zs::Request::CommitBlock(prepared_block)) diff --git a/zebra-consensus/src/chain.rs b/zebra-consensus/src/chain.rs index 5e0d946a4d3..48bbacb5be7 100644 --- a/zebra-consensus/src/chain.rs +++ b/zebra-consensus/src/chain.rs @@ -203,7 +203,7 @@ where }; let tip = match state_service - .ready_and() + .ready() .await .unwrap() .call(zs::Request::Tip) diff --git a/zebra-consensus/src/checkpoint/tests.rs b/zebra-consensus/src/checkpoint/tests.rs index b364fade1c6..6a2cec14158 100644 --- a/zebra-consensus/src/checkpoint/tests.rs +++ b/zebra-consensus/src/checkpoint/tests.rs @@ -64,10 +64,7 @@ async fn single_item_checkpoint_list() -> Result<(), Report> { ); /// SPANDOC: Make sure the verifier service is ready - let ready_verifier_service = checkpoint_verifier - .ready_and() - .map_err(|e| eyre!(e)) - .await?; + let ready_verifier_service = checkpoint_verifier.ready().map_err(|e| eyre!(e)).await?; /// SPANDOC: Set up the future for block 0 let verify_future = timeout( Duration::from_secs(VERIFY_TIMEOUT_SECONDS), @@ -148,10 +145,7 @@ async fn multi_item_checkpoint_list() -> Result<(), Report> { // Now verify each block for (block, height, hash) in checkpoint_data { /// SPANDOC: Make sure the verifier service is ready - let ready_verifier_service = checkpoint_verifier - .ready_and() - .map_err(|e| eyre!(e)) - .await?; + let ready_verifier_service = checkpoint_verifier.ready().map_err(|e| eyre!(e)).await?; /// SPANDOC: Set up the future for block {?height} let verify_future = timeout( @@ -325,8 +319,7 @@ async fn continuous_blockchain( if height <= restart_height { let mut state_service = state_service.clone(); /// SPANDOC: Make sure the state service is ready for block {?height} - let ready_state_service = - state_service.ready_and().map_err(|e| eyre!(e)).await?; + let ready_state_service = state_service.ready().map_err(|e| eyre!(e)).await?; /// SPANDOC: Add block directly to the state {?height} ready_state_service @@ -342,10 +335,7 @@ async fn continuous_blockchain( } /// SPANDOC: Make sure the verifier service is ready for block {?height} - let ready_verifier_service = checkpoint_verifier - .ready_and() - .map_err(|e| eyre!(e)) - .await?; + let ready_verifier_service = checkpoint_verifier.ready().map_err(|e| eyre!(e)).await?; /// SPANDOC: Set up the future for block {?height} let verify_future = timeout( @@ -470,10 +460,7 @@ async fn block_higher_than_max_checkpoint_fail() -> Result<(), Report> { ); /// SPANDOC: Make sure the verifier service is ready - let ready_verifier_service = checkpoint_verifier - .ready_and() - .map_err(|e| eyre!(e)) - .await?; + let ready_verifier_service = checkpoint_verifier.ready().map_err(|e| eyre!(e)).await?; /// SPANDOC: Set up the future for block 415000 let verify_future = timeout( Duration::from_secs(VERIFY_TIMEOUT_SECONDS), @@ -547,10 +534,7 @@ async fn wrong_checkpoint_hash_fail() -> Result<(), Report> { ); /// SPANDOC: Make sure the verifier service is ready (1/3) - let ready_verifier_service = checkpoint_verifier - .ready_and() - .map_err(|e| eyre!(e)) - .await?; + let ready_verifier_service = checkpoint_verifier.ready().map_err(|e| eyre!(e)).await?; /// SPANDOC: Set up the future for bad block 0 (1/3) // TODO(teor || jlusby): check error kind let bad_verify_future_1 = timeout( @@ -574,10 +558,7 @@ async fn wrong_checkpoint_hash_fail() -> Result<(), Report> { ); /// SPANDOC: Make sure the verifier service is ready (2/3) - let ready_verifier_service = checkpoint_verifier - .ready_and() - .map_err(|e| eyre!(e)) - .await?; + let ready_verifier_service = checkpoint_verifier.ready().map_err(|e| eyre!(e)).await?; /// SPANDOC: Set up the future for bad block 0 again (2/3) // TODO(teor || jlusby): check error kind let bad_verify_future_2 = timeout( @@ -601,10 +582,7 @@ async fn wrong_checkpoint_hash_fail() -> Result<(), Report> { ); /// SPANDOC: Make sure the verifier service is ready (3/3) - let ready_verifier_service = checkpoint_verifier - .ready_and() - .map_err(|e| eyre!(e)) - .await?; + let ready_verifier_service = checkpoint_verifier.ready().map_err(|e| eyre!(e)).await?; /// SPANDOC: Set up the future for good block 0 (3/3) let good_verify_future = timeout( Duration::from_secs(VERIFY_TIMEOUT_SECONDS), @@ -732,10 +710,7 @@ async fn checkpoint_drop_cancel() -> Result<(), Report> { // Now collect verify futures for each block for (block, height, hash) in checkpoint_data { /// SPANDOC: Make sure the verifier service is ready - let ready_verifier_service = checkpoint_verifier - .ready_and() - .map_err(|e| eyre!(e)) - .await?; + let ready_verifier_service = checkpoint_verifier.ready().map_err(|e| eyre!(e)).await?; /// SPANDOC: Set up the future for block {?height} let verify_future = timeout( @@ -811,10 +786,7 @@ async fn hard_coded_mainnet() -> Result<(), Report> { assert!(checkpoint_verifier.checkpoint_list.max_height() > block::Height(0)); /// SPANDOC: Make sure the verifier service is ready - let ready_verifier_service = checkpoint_verifier - .ready_and() - .map_err(|e| eyre!(e)) - .await?; + let ready_verifier_service = checkpoint_verifier.ready().map_err(|e| eyre!(e)).await?; /// SPANDOC: Set up the future for block 0 let verify_future = timeout( Duration::from_secs(VERIFY_TIMEOUT_SECONDS), diff --git a/zebra-consensus/src/primitives/ed25519/tests.rs b/zebra-consensus/src/primitives/ed25519/tests.rs index 471b875622f..a95fd3bcaf0 100644 --- a/zebra-consensus/src/primitives/ed25519/tests.rs +++ b/zebra-consensus/src/primitives/ed25519/tests.rs @@ -22,7 +22,7 @@ where let sk = SigningKey::new(&mut rng); let vk = VerificationKey::from(&sk); let sig = sk.sign(&msg[..]); - verifier.ready_and().await?; + verifier.ready().await?; results.push(span.in_scope(|| verifier.call((vk.into(), sig, msg).into()))) } diff --git a/zebra-consensus/src/primitives/groth16/tests.rs b/zebra-consensus/src/primitives/groth16/tests.rs index 929e2c8b891..e22b4c642b4 100644 --- a/zebra-consensus/src/primitives/groth16/tests.rs +++ b/zebra-consensus/src/primitives/groth16/tests.rs @@ -32,7 +32,7 @@ where tracing::trace!(?spend); let spend_rsp = spend_verifier - .ready_and() + .ready() .await? .call(groth16::ItemWrapper::from(&spend).into()); @@ -43,7 +43,7 @@ where tracing::trace!(?output); let output_rsp = output_verifier - .ready_and() + .ready() .await? .call(groth16::ItemWrapper::from(output).into()); @@ -131,7 +131,7 @@ where tracing::trace!(?modified_output); let output_rsp = output_verifier - .ready_and() + .ready() .await? .call(groth16::ItemWrapper::from(&modified_output).into()); diff --git a/zebra-consensus/src/primitives/redjubjub/tests.rs b/zebra-consensus/src/primitives/redjubjub/tests.rs index 0822e0dfbfa..0f0700094cf 100644 --- a/zebra-consensus/src/primitives/redjubjub/tests.rs +++ b/zebra-consensus/src/primitives/redjubjub/tests.rs @@ -24,14 +24,14 @@ where let sk = SigningKey::::new(&mut rng); let vk = VerificationKey::from(&sk); let sig = sk.sign(&mut rng, &msg[..]); - verifier.ready_and().await?; + verifier.ready().await?; results.push(span.in_scope(|| verifier.call((vk.into(), sig, msg).into()))) } 1 => { let sk = SigningKey::::new(&mut rng); let vk = VerificationKey::from(&sk); let sig = sk.sign(&mut rng, &msg[..]); - verifier.ready_and().await?; + verifier.ready().await?; results.push(span.in_scope(|| verifier.call((vk.into(), sig, msg).into()))) } _ => panic!(), diff --git a/zebra-consensus/src/primitives/redpallas/tests.rs b/zebra-consensus/src/primitives/redpallas/tests.rs index b2537e8a9e8..a5ed4e2a32a 100644 --- a/zebra-consensus/src/primitives/redpallas/tests.rs +++ b/zebra-consensus/src/primitives/redpallas/tests.rs @@ -24,14 +24,14 @@ where let sk = SigningKey::::new(&mut rng); let vk = VerificationKey::from(&sk); let sig = sk.sign(&mut rng, &msg[..]); - verifier.ready_and().await?; + verifier.ready().await?; results.push(span.in_scope(|| verifier.call((vk.into(), sig, msg).into()))) } 1 => { let sk = SigningKey::::new(&mut rng); let vk = VerificationKey::from(&sk); let sig = sk.sign(&mut rng, &msg[..]); - verifier.ready_and().await?; + verifier.ready().await?; results.push(span.in_scope(|| verifier.call((vk.into(), sig, msg).into()))) } _ => panic!(), diff --git a/zebra-network/src/peer/connection.rs b/zebra-network/src/peer/connection.rs index 8e59e8b80db..3cd36fd58df 100644 --- a/zebra-network/src/peer/connection.rs +++ b/zebra-network/src/peer/connection.rs @@ -924,7 +924,7 @@ where trace!(?req); use tower::{load_shed::error::Overloaded, ServiceExt}; - if self.svc.ready_and().await.is_err() { + if self.svc.ready().await.is_err() { // Treat all service readiness errors as Overloaded // TODO: treat `TryRecvError::Closed` in `Inbound::poll_ready` as a fatal error (#1655) self.fail_with(PeerError::Overloaded); diff --git a/zebra-network/src/peer/connector.rs b/zebra-network/src/peer/connector.rs index 7cf9a585dfb..8f44c152c65 100644 --- a/zebra-network/src/peer/connector.rs +++ b/zebra-network/src/peer/connector.rs @@ -78,7 +78,7 @@ where async move { let stream = TcpStream::connect(addr).await?; - hs.ready_and().await?; + hs.ready().await?; let client = hs .call(HandshakeRequest { tcp_stream: stream, diff --git a/zebra-network/src/peer_set/candidate_set.rs b/zebra-network/src/peer_set/candidate_set.rs index 9383f6775e0..98ef1791d21 100644 --- a/zebra-network/src/peer_set/candidate_set.rs +++ b/zebra-network/src/peer_set/candidate_set.rs @@ -239,7 +239,7 @@ where debug!(?fanout_limit, "sending GetPeers requests"); // TODO: launch each fanout in its own task (might require tokio 1.6) for _ in 0..fanout_limit { - let peer_service = self.peer_service.ready_and().await?; + let peer_service = self.peer_service.ready().await?; responses.push(peer_service.call(Request::Peers)); } while let Some(rsp) = responses.next().await { diff --git a/zebra-network/src/peer_set/initialize.rs b/zebra-network/src/peer_set/initialize.rs index ece01b72bfb..4b52640051f 100644 --- a/zebra-network/src/peer_set/initialize.rs +++ b/zebra-network/src/peer_set/initialize.rs @@ -477,7 +477,7 @@ where let _guard = accept_span.enter(); debug!("got incoming connection"); - handshaker.ready_and().await?; + handshaker.ready().await?; // TODO: distinguish between proxied listeners and direct listeners let handshaker_span = info_span!("listen_handshaker", peer = ?connected_addr); @@ -750,7 +750,7 @@ where // the connector is always ready, so this can't hang let outbound_connector = outbound_connector - .ready_and() + .ready() .await .expect("outbound connector never errors"); diff --git a/zebra-state/src/service/tests.rs b/zebra-state/src/service/tests.rs index 61dc712d301..e2369f21bb8 100644 --- a/zebra-state/src/service/tests.rs +++ b/zebra-state/src/service/tests.rs @@ -37,7 +37,7 @@ async fn populated_state( let mut responses = FuturesUnordered::new(); for request in requests { - let rsp = state.ready_and().await.unwrap().call(request); + let rsp = state.ready().await.unwrap().call(request); responses.push(rsp); } diff --git a/zebra-test/src/transcript.rs b/zebra-test/src/transcript.rs index 89f921c613b..04ecd7e12df 100644 --- a/zebra-test/src/transcript.rs +++ b/zebra-test/src/transcript.rs @@ -98,7 +98,7 @@ where // These unwraps could propagate errors with the correct // bound on C::Error let fut = to_check - .ready_and() + .ready() .await .map_err(Into::into) .map_err(|e| eyre!(e)) diff --git a/zebra-test/tests/transcript.rs b/zebra-test/tests/transcript.rs index 90b1b8bd5b7..b8d3ffbb039 100644 --- a/zebra-test/tests/transcript.rs +++ b/zebra-test/tests/transcript.rs @@ -23,11 +23,11 @@ async fn transcript_returns_responses_and_ends() { for (req, rsp) in TRANSCRIPT_DATA.iter() { assert_eq!( - svc.ready_and().await.unwrap().call(req).await.unwrap(), + svc.ready().await.unwrap().call(req).await.unwrap(), *rsp.as_ref().unwrap() ); } - assert!(svc.ready_and().await.unwrap().call("end").await.is_err()); + assert!(svc.ready().await.unwrap().call("end").await.is_err()); } #[tokio::test] @@ -37,10 +37,10 @@ async fn transcript_errors_wrong_request() { let mut svc = Transcript::from(TRANSCRIPT_DATA.iter().cloned()); assert_eq!( - svc.ready_and().await.unwrap().call("req1").await.unwrap(), + svc.ready().await.unwrap().call("req1").await.unwrap(), "rsp1", ); - assert!(svc.ready_and().await.unwrap().call("bad").await.is_err()); + assert!(svc.ready().await.unwrap().call("bad").await.is_err()); } #[tokio::test] diff --git a/zebrad/src/components/inbound/tests.rs b/zebrad/src/components/inbound/tests.rs index 904c30462ea..e66f7dc1598 100644 --- a/zebrad/src/components/inbound/tests.rs +++ b/zebrad/src/components/inbound/tests.rs @@ -609,7 +609,7 @@ async fn setup( .zcash_deserialize_into() .unwrap(); state_service - .ready_and() + .ready() .await .unwrap() .call(zebra_state::Request::CommitFinalizedBlock( diff --git a/zebrad/src/components/mempool/crawler.rs b/zebrad/src/components/mempool/crawler.rs index d88fddcadb3..275e784de6c 100644 --- a/zebrad/src/components/mempool/crawler.rs +++ b/zebrad/src/components/mempool/crawler.rs @@ -197,7 +197,7 @@ where for _ in 0..FANOUT { let mut peer_set = peer_set.clone(); // end the task on permanent peer set errors - let peer_set = peer_set.ready_and().await?; + let peer_set = peer_set.ready().await?; requests.push(peer_set.call(zn::Request::MempoolTransactionIds)); } @@ -242,7 +242,7 @@ where let call_result = self .mempool - .ready_and() + .ready() .await? .call(mempool::Request::Queue(transaction_ids)) .await; diff --git a/zebrad/src/components/mempool/downloads.rs b/zebrad/src/components/mempool/downloads.rs index 7d893d4fba5..4a05c908033 100644 --- a/zebrad/src/components/mempool/downloads.rs +++ b/zebrad/src/components/mempool/downloads.rs @@ -443,7 +443,7 @@ where ) -> Result<(), TransactionDownloadVerifyError> { // Check if the transaction is already in the state. match state - .ready_and() + .ready() .await .map_err(|e| TransactionDownloadVerifyError::StateError(e))? .call(zs::Request::Transaction(txid.mined_id())) diff --git a/zebrad/src/components/mempool/gossip.rs b/zebrad/src/components/mempool/gossip.rs index d02e41b75a7..ac6071bb91a 100644 --- a/zebrad/src/components/mempool/gossip.rs +++ b/zebrad/src/components/mempool/gossip.rs @@ -44,7 +44,7 @@ where info!(?request, "sending mempool transaction broadcast"); // broadcast requests don't return errors, and we'd just want to ignore them anyway - let _ = broadcast_network.ready_and().await?.call(request).await; + let _ = broadcast_network.ready().await?.call(request).await; metrics::counter!("mempool.gossiped.transactions.total", txs_len as _); } diff --git a/zebrad/src/components/mempool/queue_checker.rs b/zebrad/src/components/mempool/queue_checker.rs index 4d828dc5055..b02a2563645 100644 --- a/zebrad/src/components/mempool/queue_checker.rs +++ b/zebrad/src/components/mempool/queue_checker.rs @@ -69,7 +69,7 @@ where // So we propagate any unexpected errors to the task that spawned us. let response = self .mempool - .ready_and() + .ready() .await? .call(mempool::Request::CheckForVerifiedTransactions) .await?; diff --git a/zebrad/src/components/mempool/tests/vector.rs b/zebrad/src/components/mempool/tests/vector.rs index 5fd76886fc3..1c63d2246fb 100644 --- a/zebrad/src/components/mempool/tests/vector.rs +++ b/zebrad/src/components/mempool/tests/vector.rs @@ -63,7 +63,7 @@ async fn mempool_service_basic_single() -> Result<(), Report> { // Test `Request::TransactionIds` let response = service - .ready_and() + .ready() .await .unwrap() .call(Request::TransactionIds) @@ -80,7 +80,7 @@ async fn mempool_service_basic_single() -> Result<(), Report> { .copied() .collect::>(); let response = service - .ready_and() + .ready() .await .unwrap() .call(Request::TransactionsById( @@ -109,7 +109,7 @@ async fn mempool_service_basic_single() -> Result<(), Report> { // Test `Request::RejectedTransactionIds` let response = service - .ready_and() + .ready() .await .unwrap() .call(Request::RejectedTransactionIds( @@ -127,7 +127,7 @@ async fn mempool_service_basic_single() -> Result<(), Report> { // Test `Request::Queue` // Use the ID of the last transaction in the list let response = service - .ready_and() + .ready() .await .unwrap() .call(Request::Queue(vec![last_transaction.transaction.id.into()])) @@ -190,7 +190,7 @@ async fn mempool_queue_single() -> Result<(), Report> { // Test `Request::Queue` for a new transaction let response = service - .ready_and() + .ready() .await .unwrap() .call(Request::Queue(vec![new_tx.transaction.id.into()])) @@ -207,7 +207,7 @@ async fn mempool_queue_single() -> Result<(), Report> { // They should all be rejected; either because they are already in the mempool, // or because they are in the recently evicted list. let response = service - .ready_and() + .ready() .await .unwrap() .call(Request::Queue( @@ -273,7 +273,7 @@ async fn mempool_service_disabled() -> Result<(), Report> { // Test if the mempool answers correctly (i.e. is enabled) let response = service - .ready_and() + .ready() .await .unwrap() .call(Request::TransactionIds) @@ -288,7 +288,7 @@ async fn mempool_service_disabled() -> Result<(), Report> { // Use the ID of the last transaction in the list let txid = more_transactions.last().unwrap().transaction.id; let response = service - .ready_and() + .ready() .await .unwrap() .call(Request::Queue(vec![txid.into()])) @@ -310,7 +310,7 @@ async fn mempool_service_disabled() -> Result<(), Report> { // Test if the mempool returns no transactions when disabled let response = service - .ready_and() + .ready() .await .unwrap() .call(Request::TransactionIds) @@ -329,7 +329,7 @@ async fn mempool_service_disabled() -> Result<(), Report> { // Test if the mempool returns to Queue requests correctly when disabled let response = service - .ready_and() + .ready() .await .unwrap() .call(Request::Queue(vec![txid.into()])) @@ -371,7 +371,7 @@ async fn mempool_cancel_mined() -> Result<(), Report> { .zcash_deserialize_into() .unwrap(); state_service - .ready_and() + .ready() .await .unwrap() .call(zebra_state::Request::CommitFinalizedBlock( @@ -385,7 +385,7 @@ async fn mempool_cancel_mined() -> Result<(), Report> { // Push block 1 to the state state_service - .ready_and() + .ready() .await .unwrap() .call(zebra_state::Request::CommitFinalizedBlock( @@ -402,7 +402,7 @@ async fn mempool_cancel_mined() -> Result<(), Report> { // which cancels all downloads. let txid = block2.transactions[0].unmined_id(); let response = mempool - .ready_and() + .ready() .await .unwrap() .call(Request::Queue(vec![txid.into()])) @@ -464,7 +464,7 @@ async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report> .zcash_deserialize_into() .unwrap(); state_service - .ready_and() + .ready() .await .unwrap() .call(zebra_state::Request::CommitFinalizedBlock( @@ -476,7 +476,7 @@ async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report> // Queue transaction from block 2 for download let txid = block2.transactions[0].unmined_id(); let response = mempool - .ready_and() + .ready() .await .unwrap() .call(Request::Queue(vec![txid.into()])) @@ -496,7 +496,7 @@ async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report> // Push block 1 to the state. This is considered a network upgrade, // and thus must cancel all pending transaction downloads. state_service - .ready_and() + .ready() .await .unwrap() .call(zebra_state::Request::CommitFinalizedBlock( @@ -537,7 +537,7 @@ async fn mempool_failed_verification_is_rejected() -> Result<(), Report> { .zcash_deserialize_into() .unwrap(); state_service - .ready_and() + .ready() .await .unwrap() .call(zebra_state::Request::CommitFinalizedBlock( @@ -549,7 +549,7 @@ async fn mempool_failed_verification_is_rejected() -> Result<(), Report> { // Queue first transaction for verification // (queue the transaction itself to avoid a download). let request = mempool - .ready_and() + .ready() .await .unwrap() .call(Request::Queue(vec![rejected_tx.transaction.clone().into()])); @@ -576,7 +576,7 @@ async fn mempool_failed_verification_is_rejected() -> Result<(), Report> { // Try to queue the same transaction by its ID and check if it's correctly // rejected. let response = mempool - .ready_and() + .ready() .await .unwrap() .call(Request::Queue(vec![rejected_tx.transaction.id.into()])) @@ -620,7 +620,7 @@ async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> { .zcash_deserialize_into() .unwrap(); state_service - .ready_and() + .ready() .await .unwrap() .call(zebra_state::Request::CommitFinalizedBlock( @@ -631,7 +631,7 @@ async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> { // Queue second transaction for download and verification. let request = mempool - .ready_and() + .ready() .await .unwrap() .call(Request::Queue(vec![rejected_valid_tx @@ -663,7 +663,7 @@ async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> { // Try to queue the same transaction by its ID and check if it's not being // rejected. let response = mempool - .ready_and() + .ready() .await .unwrap() .call(Request::Queue(vec![rejected_valid_tx diff --git a/zebrad/src/components/sync.rs b/zebrad/src/components/sync.rs index 0385269bbdf..232cdbecce1 100644 --- a/zebrad/src/components/sync.rs +++ b/zebrad/src/components/sync.rs @@ -385,7 +385,7 @@ where async fn obtain_tips(&mut self) -> Result<(), Report> { let block_locator = self .state - .ready_and() + .ready() .await .map_err(|e| eyre!(e))? .call(zebra_state::Request::BlockLocator) @@ -403,16 +403,12 @@ where let mut requests = FuturesUnordered::new(); for _ in 0..FANOUT { - requests.push( - self.tip_network - .ready_and() - .await - .map_err(|e| eyre!(e))? - .call(zn::Request::FindBlocks { - known_blocks: block_locator.clone(), - stop: None, - }), - ); + requests.push(self.tip_network.ready().await.map_err(|e| eyre!(e))?.call( + zn::Request::FindBlocks { + known_blocks: block_locator.clone(), + stop: None, + }, + )); } let mut download_set = HashSet::new(); @@ -526,16 +522,12 @@ where tracing::debug!(?tip, "asking peers to extend chain tip"); let mut responses = FuturesUnordered::new(); for _ in 0..FANOUT { - responses.push( - self.tip_network - .ready_and() - .await - .map_err(|e| eyre!(e))? - .call(zn::Request::FindBlocks { - known_blocks: vec![tip.tip], - stop: None, - }), - ); + responses.push(self.tip_network.ready().await.map_err(|e| eyre!(e))?.call( + zn::Request::FindBlocks { + known_blocks: vec![tip.tip], + stop: None, + }, + )); } while let Some(res) = responses.next().await { match res.map_err::(|e| eyre!(e)) { @@ -689,7 +681,7 @@ where async fn state_contains(&mut self, hash: block::Hash) -> Result { match self .state - .ready_and() + .ready() .await .map_err(|e| eyre!(e))? .call(zebra_state::Request::Depth(hash)) diff --git a/zebrad/src/components/sync/downloads.rs b/zebrad/src/components/sync/downloads.rs index 63484204542..d6e6b9b59e6 100644 --- a/zebrad/src/components/sync/downloads.rs +++ b/zebrad/src/components/sync/downloads.rs @@ -167,7 +167,7 @@ where tracing::debug!("waiting to request block"); let block_req = self .network - .ready_and() + .ready() .await .map_err(|e| eyre!(e))? .call(zn::Request::BlocksByHash(std::iter::once(hash).collect())); @@ -201,7 +201,7 @@ where metrics::counter!("sync.downloaded.block.count", 1); let rsp = verifier - .ready_and() + .ready() .await .map_err(BlockDownloadVerifyError::VerifierError)? .call(block); diff --git a/zebrad/src/components/sync/gossip.rs b/zebrad/src/components/sync/gossip.rs index c394504a1ae..59765e90e45 100644 --- a/zebrad/src/components/sync/gossip.rs +++ b/zebrad/src/components/sync/gossip.rs @@ -76,7 +76,7 @@ where // broadcast requests don't return errors, and we'd just want to ignore them anyway let _ = broadcast_network - .ready_and() + .ready() .await .map_err(PeerSetReadiness)? .call(request)