Skip to content

Commit

Permalink
Fix flaky oximeter reregistration test (#5661)
Browse files Browse the repository at this point in the history
- Fixes #5645
- Reworks the registration task test to use `wait_for_condition`. This
extends the duration of the test, but makes it much less flaky when the
system is loaded.
  • Loading branch information
bnaecker authored Apr 29, 2024
1 parent cfa6bd9 commit 161f5ea
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 39 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion oximeter/producer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ internal-dns.workspace = true
[dev-dependencies]
anyhow.workspace = true
clap.workspace = true
httptest.workspace = true
omicron-test-utils.workspace = true
serde_json.workspace = true
slog-term.workspace = true
115 changes: 78 additions & 37 deletions oximeter/producer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,16 +460,23 @@ mod tests {
use super::LogConfig;
use super::ProducerEndpoint;
use super::Server;
use httptest::matchers::request;
use httptest::responders::status_code;
use httptest::Expectation;
use httptest::Server as TestServer;
use dropshot::endpoint;
use dropshot::ApiDescription;
use dropshot::ConfigDropshot;
use dropshot::HttpError;
use dropshot::HttpResponseCreated;
use dropshot::HttpServer;
use dropshot::HttpServerStarter;
use dropshot::RequestContext;
use omicron_common::api::internal::nexus::ProducerKind;
use omicron_common::api::internal::nexus::ProducerRegistrationResponse;
use omicron_test_utils::dev::poll::{wait_for_condition, CondCheckError};
use slog::Drain;
use slog::Logger;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::Instant;
use uuid::Uuid;

fn test_logger() -> Logger {
Expand All @@ -481,60 +488,94 @@ mod tests {
log
}

// Re-registration interval for tests.
//
// The mock registration endpoint returns this value as the lease duration.
const INTERVAL: Duration = Duration::from_secs(1);

// Shared server context for the mock Nexus server: a counter that the
// registration endpoint increments once per request it receives.
type Context = Arc<AtomicU32>;

// Stand-in for the Nexus producer registration endpoint.
//
// Every request increments the shared counter stored in the server context,
// and is answered with a lease whose duration is `INTERVAL`.
#[endpoint {
    method = POST,
    path = "/metrics/producers",
}]
async fn register_producer(
    rqctx: RequestContext<Context>,
) -> Result<HttpResponseCreated<ProducerRegistrationResponse>, HttpError> {
    // Record that one more registration request arrived.
    let n_requests = rqctx.context();
    n_requests.fetch_add(1, Ordering::SeqCst);

    let lease = ProducerRegistrationResponse { lease_duration: INTERVAL };
    Ok(HttpResponseCreated(lease))
}

// Launch a Dropshot server that mocks Nexus, exposing only the producer
// registration endpoint.
//
// The server binds to an OS-assigned port on the IPv6 loopback address, and
// its context is a fresh request counter starting at zero.
fn spawn_fake_nexus_server(log: &Logger) -> HttpServer<Context> {
    let mut endpoints = ApiDescription::new();
    endpoints
        .register(register_producer)
        .expect("Expected to register endpoint");

    let dropshot_config = ConfigDropshot {
        bind_address: "[::1]:0".parse().unwrap(),
        request_body_max_bytes: 2048,
        ..Default::default()
    };
    let request_counter: Context = Arc::new(AtomicU32::new(0));

    let starter =
        HttpServerStarter::new(&dropshot_config, endpoints, request_counter, log)
            .expect("Expected to start Dropshot server");
    starter.start()
}

// Checks that a started producer `Server` keeps registering with a fake
// Nexus server.
//
// NOTE(review): this listing appears to be a rendered diff without +/-
// markers, so the body interleaves the earlier httptest-based lines with the
// Dropshot-based replacements (e.g. two `let fake_nexus = ...` bindings and
// duplicated struct fields). It is not compilable exactly as shown.
#[tokio::test]
async fn test_producer_registration_task() {
let log = test_logger();
let fake_nexus = TestServer::run();
let fake_nexus = spawn_fake_nexus_server(&log);
slog::info!(
log,
"fake nexus test server listening";
"address" => ?fake_nexus.addr(),
"address" => ?fake_nexus.local_addr(),
);

const INTERVAL: Duration = Duration::from_secs(1);
let body = ProducerRegistrationResponse { lease_duration: INTERVAL };
fake_nexus.expect(
Expectation::matching(request::method_path(
"POST",
"/metrics/producers",
))
.times(2..)
.respond_with(
status_code(201).body(serde_json::to_string(&body).unwrap()),
),
);
let address = "[::1]:0".parse().unwrap();
let config = Config {
server_info: ProducerEndpoint {
id: Uuid::new_v4(),
kind: ProducerKind::Service,
address,
base_route: String::from("/collect"),
base_route: String::new(),
interval: Duration::from_secs(10),
},
registration_address: Some(fake_nexus.addr()),
registration_address: Some(fake_nexus.local_addr()),
request_body_max_bytes: 1024,
log: LogConfig::Logger(log),
};

// Ideally, we would check pretty carefully that there are exactly N
// registrations after N renewal periods. That's brittle, especially on
// a loaded system. Instead, we'll run for a number of intervals, and
// just check that we have more than one.
// a loaded system. Instead, we'll wait until we've received the
// expected number of registration requests.
let _server = Server::start(&config).unwrap();
let now = Instant::now();
tokio::time::pause();
let max = INTERVAL * 10;
while now.elapsed() < max {
tokio::time::advance(Duration::from_millis(10)).await;
}
tokio::time::resume();

// Drop the server to check its requests, rather than calling
// `verify_and_clear()`. It's possible for requests to be in-flight
// between when we called `verify_and_clear()` and when we exit this
// test, at which point the drop impl of the `TestServer` checks the
// expectations _again_. Let's just do the check by dropping here.
drop(fake_nexus);
const N_REQUESTS: u32 = 10;
const POLL_INTERVAL: Duration = Duration::from_millis(100);

// The lease duration is 1s (see `INTERVAL`), and the producer attempts
// to register every 1/4 of that, so this should be quite sufficient
// for even heavily-loaded tests.
const POLL_DURATION: Duration = Duration::from_secs(30);
// Check the fake Nexus request counter until it reaches `N_REQUESTS`;
// `POLL_INTERVAL` and `POLL_DURATION` are passed as the polling period and
// overall timeout (presumably in that order -- confirm against
// `wait_for_condition`).
wait_for_condition(
|| async {
if fake_nexus.app_private().load(Ordering::SeqCst) >= N_REQUESTS
{
Ok(())
} else {
Err(CondCheckError::<()>::NotYet)
}
},
&POLL_INTERVAL,
&POLL_DURATION,
)
.await
.expect("Expected all registration requests to be made within timeout");
fake_nexus.close().await.expect("Expected to close server");
}
}

0 comments on commit 161f5ea

Please sign in to comment.