Skip to content

Commit

Permalink
Replace usage of ServiceExt::ready_and
Browse files Browse the repository at this point in the history
It was deprecated in favor of `ServiceExt::ready`.
  • Loading branch information
jvff committed Nov 2, 2021
1 parent 8e67f86 commit ba3c476
Show file tree
Hide file tree
Showing 27 changed files with 90 additions and 126 deletions.
4 changes: 2 additions & 2 deletions tower-batch/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ where
tracing::trace!("notifying caller about worker failure");
let _ = tx.send(Err(failed.clone()));
} else {
match self.service.ready_and().await {
match self.service.ready().await {
Ok(svc) => {
let rsp = svc.call(req.into());
let _ = tx.send(Ok(rsp));
Expand All @@ -109,7 +109,7 @@ where
async fn flush_service(&mut self) {
if let Err(e) = self
.service
.ready_and()
.ready()
.and_then(|svc| svc.call(BatchControl::Flush))
.await
{
Expand Down
2 changes: 1 addition & 1 deletion tower-batch/tests/ed25519.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ where
sk.sign(&msg[..])
};

verifier.ready_and().await?;
verifier.ready().await?;
results.push(span.in_scope(|| verifier.call((vk_bytes, sig, msg).into())))
}

Expand Down
12 changes: 6 additions & 6 deletions tower-batch/tests/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@ async fn wakes_pending_waiters_on_close() {

// // keep the request in the worker
handle.allow(0);
let service1 = service.ready_and().await.unwrap();
let service1 = service.ready().await.unwrap();
let poll = worker.poll();
assert_pending!(poll);
let mut response = task::spawn(service1.call(()));

let mut service1 = service.clone();
let mut ready1 = task::spawn(service1.ready_and());
let mut ready1 = task::spawn(service1.ready());
assert_pending!(worker.poll());
assert_pending!(ready1.poll(), "no capacity");

let mut service1 = service.clone();
let mut ready2 = task::spawn(service1.ready_and());
let mut ready2 = task::spawn(service1.ready());
assert_pending!(worker.poll());
assert_pending!(ready2.poll(), "no capacity");

Expand Down Expand Up @@ -80,17 +80,17 @@ async fn wakes_pending_waiters_on_failure() {

// keep the request in the worker
handle.allow(0);
let service1 = service.ready_and().await.unwrap();
let service1 = service.ready().await.unwrap();
assert_pending!(worker.poll());
let mut response = task::spawn(service1.call("hello"));

let mut service1 = service.clone();
let mut ready1 = task::spawn(service1.ready_and());
let mut ready1 = task::spawn(service1.ready());
assert_pending!(worker.poll());
assert_pending!(ready1.poll(), "no capacity");

let mut service1 = service.clone();
let mut ready2 = task::spawn(service1.ready_and());
let mut ready2 = task::spawn(service1.ready());
assert_pending!(worker.poll());
assert_pending!(ready2.poll(), "no capacity");

Expand Down
6 changes: 3 additions & 3 deletions tower-fallback/tests/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ async fn fallback() {

let mut svc = Fallback::new(svc1, svc2);

assert_eq!(svc.ready_and().await.unwrap().call(1).await.unwrap(), 1);
assert_eq!(svc.ready_and().await.unwrap().call(11).await.unwrap(), 111);
assert!(svc.ready_and().await.unwrap().call(21).await.is_err());
assert_eq!(svc.ready().await.unwrap().call(1).await.unwrap(), 1);
assert_eq!(svc.ready().await.unwrap().call(11).await.unwrap(), 111);
assert!(svc.ready().await.unwrap().call(21).await.is_err());
}
6 changes: 3 additions & 3 deletions zebra-consensus/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ where
// Check that this block is actually a new block.
tracing::trace!("checking that block is not already in state");
match state_service
.ready_and()
.ready()
.await
.map_err(|source| VerifyBlockError::Depth { source, hash })?
.call(zs::Request::Depth(hash))
Expand Down Expand Up @@ -179,7 +179,7 @@ where
));
for transaction in &block.transactions {
let rsp = transaction_verifier
.ready_and()
.ready()
.await
.expect("transaction verifier is always ready")
.call(tx::Request::Block {
Expand Down Expand Up @@ -211,7 +211,7 @@ where
transaction_hashes,
};
match state_service
.ready_and()
.ready()
.await
.map_err(VerifyBlockError::Commit)?
.call(zs::Request::CommitBlock(prepared_block))
Expand Down
2 changes: 1 addition & 1 deletion zebra-consensus/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ where
};

let tip = match state_service
.ready_and()
.ready()
.await
.unwrap()
.call(zs::Request::Tip)
Expand Down
48 changes: 10 additions & 38 deletions zebra-consensus/src/checkpoint/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,7 @@ async fn single_item_checkpoint_list() -> Result<(), Report> {
);

/// SPANDOC: Make sure the verifier service is ready
let ready_verifier_service = checkpoint_verifier
.ready_and()
.map_err(|e| eyre!(e))
.await?;
let ready_verifier_service = checkpoint_verifier.ready().map_err(|e| eyre!(e)).await?;
/// SPANDOC: Set up the future for block 0
let verify_future = timeout(
Duration::from_secs(VERIFY_TIMEOUT_SECONDS),
Expand Down Expand Up @@ -148,10 +145,7 @@ async fn multi_item_checkpoint_list() -> Result<(), Report> {
// Now verify each block
for (block, height, hash) in checkpoint_data {
/// SPANDOC: Make sure the verifier service is ready
let ready_verifier_service = checkpoint_verifier
.ready_and()
.map_err(|e| eyre!(e))
.await?;
let ready_verifier_service = checkpoint_verifier.ready().map_err(|e| eyre!(e)).await?;

/// SPANDOC: Set up the future for block {?height}
let verify_future = timeout(
Expand Down Expand Up @@ -325,8 +319,7 @@ async fn continuous_blockchain(
if height <= restart_height {
let mut state_service = state_service.clone();
/// SPANDOC: Make sure the state service is ready for block {?height}
let ready_state_service =
state_service.ready_and().map_err(|e| eyre!(e)).await?;
let ready_state_service = state_service.ready().map_err(|e| eyre!(e)).await?;

/// SPANDOC: Add block directly to the state {?height}
ready_state_service
Expand All @@ -342,10 +335,7 @@ async fn continuous_blockchain(
}

/// SPANDOC: Make sure the verifier service is ready for block {?height}
let ready_verifier_service = checkpoint_verifier
.ready_and()
.map_err(|e| eyre!(e))
.await?;
let ready_verifier_service = checkpoint_verifier.ready().map_err(|e| eyre!(e)).await?;

/// SPANDOC: Set up the future for block {?height}
let verify_future = timeout(
Expand Down Expand Up @@ -470,10 +460,7 @@ async fn block_higher_than_max_checkpoint_fail() -> Result<(), Report> {
);

/// SPANDOC: Make sure the verifier service is ready
let ready_verifier_service = checkpoint_verifier
.ready_and()
.map_err(|e| eyre!(e))
.await?;
let ready_verifier_service = checkpoint_verifier.ready().map_err(|e| eyre!(e)).await?;
/// SPANDOC: Set up the future for block 415000
let verify_future = timeout(
Duration::from_secs(VERIFY_TIMEOUT_SECONDS),
Expand Down Expand Up @@ -547,10 +534,7 @@ async fn wrong_checkpoint_hash_fail() -> Result<(), Report> {
);

/// SPANDOC: Make sure the verifier service is ready (1/3)
let ready_verifier_service = checkpoint_verifier
.ready_and()
.map_err(|e| eyre!(e))
.await?;
let ready_verifier_service = checkpoint_verifier.ready().map_err(|e| eyre!(e)).await?;
/// SPANDOC: Set up the future for bad block 0 (1/3)
// TODO(teor || jlusby): check error kind
let bad_verify_future_1 = timeout(
Expand All @@ -574,10 +558,7 @@ async fn wrong_checkpoint_hash_fail() -> Result<(), Report> {
);

/// SPANDOC: Make sure the verifier service is ready (2/3)
let ready_verifier_service = checkpoint_verifier
.ready_and()
.map_err(|e| eyre!(e))
.await?;
let ready_verifier_service = checkpoint_verifier.ready().map_err(|e| eyre!(e)).await?;
/// SPANDOC: Set up the future for bad block 0 again (2/3)
// TODO(teor || jlusby): check error kind
let bad_verify_future_2 = timeout(
Expand All @@ -601,10 +582,7 @@ async fn wrong_checkpoint_hash_fail() -> Result<(), Report> {
);

/// SPANDOC: Make sure the verifier service is ready (3/3)
let ready_verifier_service = checkpoint_verifier
.ready_and()
.map_err(|e| eyre!(e))
.await?;
let ready_verifier_service = checkpoint_verifier.ready().map_err(|e| eyre!(e)).await?;
/// SPANDOC: Set up the future for good block 0 (3/3)
let good_verify_future = timeout(
Duration::from_secs(VERIFY_TIMEOUT_SECONDS),
Expand Down Expand Up @@ -732,10 +710,7 @@ async fn checkpoint_drop_cancel() -> Result<(), Report> {
// Now collect verify futures for each block
for (block, height, hash) in checkpoint_data {
/// SPANDOC: Make sure the verifier service is ready
let ready_verifier_service = checkpoint_verifier
.ready_and()
.map_err(|e| eyre!(e))
.await?;
let ready_verifier_service = checkpoint_verifier.ready().map_err(|e| eyre!(e)).await?;

/// SPANDOC: Set up the future for block {?height}
let verify_future = timeout(
Expand Down Expand Up @@ -811,10 +786,7 @@ async fn hard_coded_mainnet() -> Result<(), Report> {
assert!(checkpoint_verifier.checkpoint_list.max_height() > block::Height(0));

/// SPANDOC: Make sure the verifier service is ready
let ready_verifier_service = checkpoint_verifier
.ready_and()
.map_err(|e| eyre!(e))
.await?;
let ready_verifier_service = checkpoint_verifier.ready().map_err(|e| eyre!(e)).await?;
/// SPANDOC: Set up the future for block 0
let verify_future = timeout(
Duration::from_secs(VERIFY_TIMEOUT_SECONDS),
Expand Down
2 changes: 1 addition & 1 deletion zebra-consensus/src/primitives/ed25519/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ where
let sk = SigningKey::new(&mut rng);
let vk = VerificationKey::from(&sk);
let sig = sk.sign(&msg[..]);
verifier.ready_and().await?;
verifier.ready().await?;
results.push(span.in_scope(|| verifier.call((vk.into(), sig, msg).into())))
}

Expand Down
6 changes: 3 additions & 3 deletions zebra-consensus/src/primitives/groth16/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ where
tracing::trace!(?spend);

let spend_rsp = spend_verifier
.ready_and()
.ready()
.await?
.call(groth16::ItemWrapper::from(&spend).into());

Expand All @@ -43,7 +43,7 @@ where
tracing::trace!(?output);

let output_rsp = output_verifier
.ready_and()
.ready()
.await?
.call(groth16::ItemWrapper::from(output).into());

Expand Down Expand Up @@ -131,7 +131,7 @@ where
tracing::trace!(?modified_output);

let output_rsp = output_verifier
.ready_and()
.ready()
.await?
.call(groth16::ItemWrapper::from(&modified_output).into());

Expand Down
4 changes: 2 additions & 2 deletions zebra-consensus/src/primitives/redjubjub/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ where
let sk = SigningKey::<SpendAuth>::new(&mut rng);
let vk = VerificationKey::from(&sk);
let sig = sk.sign(&mut rng, &msg[..]);
verifier.ready_and().await?;
verifier.ready().await?;
results.push(span.in_scope(|| verifier.call((vk.into(), sig, msg).into())))
}
1 => {
let sk = SigningKey::<Binding>::new(&mut rng);
let vk = VerificationKey::from(&sk);
let sig = sk.sign(&mut rng, &msg[..]);
verifier.ready_and().await?;
verifier.ready().await?;
results.push(span.in_scope(|| verifier.call((vk.into(), sig, msg).into())))
}
_ => panic!(),
Expand Down
4 changes: 2 additions & 2 deletions zebra-consensus/src/primitives/redpallas/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ where
let sk = SigningKey::<SpendAuth>::new(&mut rng);
let vk = VerificationKey::from(&sk);
let sig = sk.sign(&mut rng, &msg[..]);
verifier.ready_and().await?;
verifier.ready().await?;
results.push(span.in_scope(|| verifier.call((vk.into(), sig, msg).into())))
}
1 => {
let sk = SigningKey::<Binding>::new(&mut rng);
let vk = VerificationKey::from(&sk);
let sig = sk.sign(&mut rng, &msg[..]);
verifier.ready_and().await?;
verifier.ready().await?;
results.push(span.in_scope(|| verifier.call((vk.into(), sig, msg).into())))
}
_ => panic!(),
Expand Down
2 changes: 1 addition & 1 deletion zebra-network/src/peer/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -924,7 +924,7 @@ where
trace!(?req);
use tower::{load_shed::error::Overloaded, ServiceExt};

if self.svc.ready_and().await.is_err() {
if self.svc.ready().await.is_err() {
// Treat all service readiness errors as Overloaded
// TODO: treat `TryRecvError::Closed` in `Inbound::poll_ready` as a fatal error (#1655)
self.fail_with(PeerError::Overloaded);
Expand Down
2 changes: 1 addition & 1 deletion zebra-network/src/peer/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ where

async move {
let stream = TcpStream::connect(addr).await?;
hs.ready_and().await?;
hs.ready().await?;
let client = hs
.call(HandshakeRequest {
tcp_stream: stream,
Expand Down
2 changes: 1 addition & 1 deletion zebra-network/src/peer_set/candidate_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ where
debug!(?fanout_limit, "sending GetPeers requests");
// TODO: launch each fanout in its own task (might require tokio 1.6)
for _ in 0..fanout_limit {
let peer_service = self.peer_service.ready_and().await?;
let peer_service = self.peer_service.ready().await?;
responses.push(peer_service.call(Request::Peers));
}
while let Some(rsp) = responses.next().await {
Expand Down
4 changes: 2 additions & 2 deletions zebra-network/src/peer_set/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ where
let _guard = accept_span.enter();

debug!("got incoming connection");
handshaker.ready_and().await?;
handshaker.ready().await?;
// TODO: distinguish between proxied listeners and direct listeners
let handshaker_span = info_span!("listen_handshaker", peer = ?connected_addr);

Expand Down Expand Up @@ -750,7 +750,7 @@ where

// the connector is always ready, so this can't hang
let outbound_connector = outbound_connector
.ready_and()
.ready()
.await
.expect("outbound connector never errors");

Expand Down
2 changes: 1 addition & 1 deletion zebra-state/src/service/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ async fn populated_state(
let mut responses = FuturesUnordered::new();

for request in requests {
let rsp = state.ready_and().await.unwrap().call(request);
let rsp = state.ready().await.unwrap().call(request);
responses.push(rsp);
}

Expand Down
2 changes: 1 addition & 1 deletion zebra-test/src/transcript.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ where
// These unwraps could propagate errors with the correct
// bound on C::Error
let fut = to_check
.ready_and()
.ready()
.await
.map_err(Into::into)
.map_err(|e| eyre!(e))
Expand Down
8 changes: 4 additions & 4 deletions zebra-test/tests/transcript.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ async fn transcript_returns_responses_and_ends() {

for (req, rsp) in TRANSCRIPT_DATA.iter() {
assert_eq!(
svc.ready_and().await.unwrap().call(req).await.unwrap(),
svc.ready().await.unwrap().call(req).await.unwrap(),
*rsp.as_ref().unwrap()
);
}
assert!(svc.ready_and().await.unwrap().call("end").await.is_err());
assert!(svc.ready().await.unwrap().call("end").await.is_err());
}

#[tokio::test]
Expand All @@ -37,10 +37,10 @@ async fn transcript_errors_wrong_request() {
let mut svc = Transcript::from(TRANSCRIPT_DATA.iter().cloned());

assert_eq!(
svc.ready_and().await.unwrap().call("req1").await.unwrap(),
svc.ready().await.unwrap().call("req1").await.unwrap(),
"rsp1",
);
assert!(svc.ready_and().await.unwrap().call("bad").await.is_err());
assert!(svc.ready().await.unwrap().call("bad").await.is_err());
}

#[tokio::test]
Expand Down
Loading

0 comments on commit ba3c476

Please sign in to comment.