Skip to content

Commit

Permalink
Create a helper MockService type to help with writing tests that us…
Browse files Browse the repository at this point in the history
…e mock `tower::Service`s (#2748)

* Implement initial service mocking helpers

Adds a [`MockService`] type, which can be configured and built for usage
in unit tests or proptests. The mocked service can then be used to
intercept requests and respond indivdiually to them.

* Use `MockService in the `mempool::Crawler` test

Refactor it to remove the helper mock function, and use the new
`MockService` helper type.

* Use `MockService` in `CandidateSet` test vectors

Refactor to remove the manual mocking of the peer set service.

* Panic if a response is not sent by `MockService`

Change the current semantics to require all `MockService` usages to
respond to every intercepted request.

A `must_use` attribute was added to the `ResponseSender` so that the
compiler can warn when this doesn't happen.

* Allow generic error types in `MockService`

Replace the hard-coded `BoxError` as the `Service`'s error type with a
generic type parameter. This allows mocking services in locations that
require specific error types.

* Add a `ResponseSender::request` getter

Allow inspecting the request again before responding, and using
information from the request in the response.

Co-authored-by: Conrado Gouvea <[email protected]>
  • Loading branch information
jvff and conradoplg authored Sep 21, 2021
1 parent 061ad55 commit b714b2b
Show file tree
Hide file tree
Showing 4 changed files with 798 additions and 83 deletions.
98 changes: 37 additions & 61 deletions zebra-network/src/peer_set/candidate_set/tests/vectors.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,20 @@
use std::{
collections::VecDeque,
convert::TryInto,
iter,
net::{IpAddr, SocketAddr},
str::FromStr,
sync::Arc,
time::Duration as StdDuration,
};

use chrono::{DateTime, Duration, Utc};
use futures::future;
use tokio::{
runtime::Runtime,
sync::watch,
time::{self, Instant},
};
use tower::Service;
use tracing::Span;

use zebra_chain::serialization::DateTime32;
use zebra_test::mock_service::{MockService, PanicAssertion};

use super::super::{validate_addrs, CandidateSet};
use crate::{
Expand Down Expand Up @@ -146,30 +142,36 @@ fn candidate_set_updates_are_rate_limited() {
let _guard = runtime.enter();

let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none());
let (peer_service, call_count) = mock_peer_service();
let mut candidate_set =
CandidateSet::new(Arc::new(std::sync::Mutex::new(address_book)), peer_service);
let mut peer_service = MockService::build().for_unit_tests();
let mut candidate_set = CandidateSet::new(
Arc::new(std::sync::Mutex::new(address_book)),
peer_service.clone(),
);

runtime.block_on(async move {
time::pause();

let time_limit = Instant::now()
+ INTERVALS_TO_RUN * MIN_PEER_GET_ADDR_INTERVAL
+ StdDuration::from_secs(1);
let mut next_allowed_request_time = Instant::now();

while Instant::now() <= time_limit {
candidate_set
.update()
.await
.expect("Call to CandidateSet::update should not fail");

if Instant::now() >= next_allowed_request_time {
verify_fanned_out_requests(&mut peer_service).await;

next_allowed_request_time = Instant::now() + MIN_PEER_GET_ADDR_INTERVAL;
} else {
peer_service.expect_no_requests().await;
}

time::advance(MIN_PEER_GET_ADDR_INTERVAL / POLL_FREQUENCY_FACTOR).await;
}

assert_eq!(
*call_count.borrow(),
INTERVALS_TO_RUN as usize * GET_ADDR_FANOUT
);
});
}

Expand All @@ -181,9 +183,11 @@ fn candidate_set_update_after_update_initial_is_rate_limited() {
let _guard = runtime.enter();

let address_book = AddressBook::new(SocketAddr::from_str("0.0.0.0:0").unwrap(), Span::none());
let (peer_service, call_count) = mock_peer_service();
let mut candidate_set =
CandidateSet::new(Arc::new(std::sync::Mutex::new(address_book)), peer_service);
let mut peer_service = MockService::build().for_unit_tests();
let mut candidate_set = CandidateSet::new(
Arc::new(std::sync::Mutex::new(address_book)),
peer_service.clone(),
);

runtime.block_on(async move {
time::pause();
Expand All @@ -194,7 +198,7 @@ fn candidate_set_update_after_update_initial_is_rate_limited() {
.await
.expect("Call to CandidateSet::update should not fail");

assert_eq!(*call_count.borrow(), GET_ADDR_FANOUT);
verify_fanned_out_requests(&mut peer_service).await;

// The following two calls to `update` should be skipped
candidate_set
Expand All @@ -207,7 +211,7 @@ fn candidate_set_update_after_update_initial_is_rate_limited() {
.await
.expect("Call to CandidateSet::update should not fail");

assert_eq!(*call_count.borrow(), GET_ADDR_FANOUT);
peer_service.expect_no_requests().await;

// After waiting for at least the minimum interval the call to `update` should succeed
time::advance(MIN_PEER_GET_ADDR_INTERVAL).await;
Expand All @@ -216,7 +220,7 @@ fn candidate_set_update_after_update_initial_is_rate_limited() {
.await
.expect("Call to CandidateSet::update should not fail");

assert_eq!(*call_count.borrow(), 2 * GET_ADDR_FANOUT);
verify_fanned_out_requests(&mut peer_service).await;
});
}

Expand All @@ -243,49 +247,21 @@ fn mock_gossiped_peers(last_seen_times: impl IntoIterator<Item = DateTime<Utc>>)
.collect()
}

/// Create a mock `PeerSet` service that checks that requests to it are rate limited.
/// Verify that a batch of fanned out requests are sent by the candidate set.
///
/// # Panics
///
/// The function also returns a call count watcher, that can be used for checking how many times the
/// service was called.
fn mock_peer_service<E>() -> (
impl Service<
Request,
Response = Response,
Future = future::Ready<Result<Response, E>>,
Error = E,
> + 'static,
watch::Receiver<usize>,
/// This will panic (causing the test to fail) if more or less requests are received than the
/// expected [`GET_ADDR_FANOUT`] amount.
async fn verify_fanned_out_requests(
peer_service: &mut MockService<Request, Response, PanicAssertion>,
) {
let rate_limit_interval = MIN_PEER_GET_ADDR_INTERVAL;

let mut call_counter = 0;
let (call_count_sender, call_count_receiver) = watch::channel(call_counter);

let mut peer_request_tracker: VecDeque<_> =
iter::repeat(Instant::now()).take(GET_ADDR_FANOUT).collect();

let service = tower::service_fn(move |request| {
match request {
Request::Peers => {
// Get time from queue that the request is authorized to be sent
let authorized_request_time = peer_request_tracker
.pop_front()
.expect("peer_request_tracker should always have GET_ADDR_FANOUT elements");
// Check that the request was rate limited
assert!(Instant::now() >= authorized_request_time);
// Push a new authorization, updated by the rate limit interval
peer_request_tracker.push_back(Instant::now() + rate_limit_interval);

// Increment count of calls
call_counter += 1;
let _ = call_count_sender.send(call_counter);

// Return an empty list of peer addresses
future::ok(Response::Peers(vec![]))
}
_ => unreachable!("Received an unexpected internal message: {:?}", request),
}
});
for _ in 0..GET_ADDR_FANOUT {
peer_service
.expect_request_that(|request| matches!(request, Request::Peers))
.await
.respond(Response::Peers(vec![]));
}

(service, call_count_receiver)
peer_service.expect_no_requests().await;
}
1 change: 1 addition & 0 deletions zebra-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::Once;

#[allow(missing_docs)]
pub mod command;
pub mod mock_service;
pub mod net;
pub mod prelude;
pub mod transcript;
Expand Down
Loading

0 comments on commit b714b2b

Please sign in to comment.