diff --git a/Cargo.lock b/Cargo.lock index 26b6de0e8c..e42110f831 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6246,10 +6246,10 @@ dependencies = [ "chrono", "clap", "dropshot", - "httptest", "internal-dns", "nexus-client", "omicron-common", + "omicron-test-utils", "omicron-workspace-hack", "oximeter", "schemars", diff --git a/oximeter/producer/Cargo.toml b/oximeter/producer/Cargo.toml index ba6fd13a6b..b0df929c3b 100644 --- a/oximeter/producer/Cargo.toml +++ b/oximeter/producer/Cargo.toml @@ -24,6 +24,6 @@ internal-dns.workspace = true [dev-dependencies] anyhow.workspace = true clap.workspace = true -httptest.workspace = true +omicron-test-utils.workspace = true serde_json.workspace = true slog-term.workspace = true diff --git a/oximeter/producer/src/lib.rs b/oximeter/producer/src/lib.rs index 89f9a70fbb..5e9e576b12 100644 --- a/oximeter/producer/src/lib.rs +++ b/oximeter/producer/src/lib.rs @@ -460,16 +460,23 @@ mod tests { use super::LogConfig; use super::ProducerEndpoint; use super::Server; - use httptest::matchers::request; - use httptest::responders::status_code; - use httptest::Expectation; - use httptest::Server as TestServer; + use dropshot::endpoint; + use dropshot::ApiDescription; + use dropshot::ConfigDropshot; + use dropshot::HttpError; + use dropshot::HttpResponseCreated; + use dropshot::HttpServer; + use dropshot::HttpServerStarter; + use dropshot::RequestContext; use omicron_common::api::internal::nexus::ProducerKind; use omicron_common::api::internal::nexus::ProducerRegistrationResponse; + use omicron_test_utils::dev::poll::{wait_for_condition, CondCheckError}; use slog::Drain; use slog::Logger; + use std::sync::atomic::AtomicU32; + use std::sync::atomic::Ordering; + use std::sync::Arc; use std::time::Duration; - use tokio::time::Instant; use uuid::Uuid; fn test_logger() -> Logger { @@ -481,60 +488,94 @@ mod tests { log } + // Re-registration interval for tests. 
+ const INTERVAL: Duration = Duration::from_secs(1); + + type Context = Arc<AtomicU32>; + + // Mock endpoint for the test Nexus server. + #[endpoint { + method = POST, + path = "/metrics/producers", + }] + async fn register_producer( + rqctx: RequestContext<Context>, + ) -> Result<HttpResponseCreated<ProducerRegistrationResponse>, HttpError> + { + rqctx.context().fetch_add(1, Ordering::SeqCst); + Ok(HttpResponseCreated(ProducerRegistrationResponse { + lease_duration: INTERVAL, + })) + } + + // Start a Dropshot server mocking the Nexus registration endpoint. + fn spawn_fake_nexus_server(log: &Logger) -> HttpServer<Context> { + let mut api = ApiDescription::new(); + api.register(register_producer).expect("Expected to register endpoint"); + HttpServerStarter::new( + &ConfigDropshot { + bind_address: "[::1]:0".parse().unwrap(), + request_body_max_bytes: 2048, + ..Default::default() + }, + api, + Arc::new(AtomicU32::new(0)), + log, + ) + .expect("Expected to start Dropshot server") + .start() + } + #[tokio::test] async fn test_producer_registration_task() { let log = test_logger(); - let fake_nexus = TestServer::run(); + let fake_nexus = spawn_fake_nexus_server(&log); slog::info!( log, "fake nexus test server listening"; - "address" => ?fake_nexus.addr(), + "address" => ?fake_nexus.local_addr(), ); - const INTERVAL: Duration = Duration::from_secs(1); - let body = ProducerRegistrationResponse { lease_duration: INTERVAL }; - fake_nexus.expect( - Expectation::matching(request::method_path( - "POST", - "/metrics/producers", - )) - .times(2..) 
- .respond_with( - status_code(201).body(serde_json::to_string(&body).unwrap()), - ), - ); let address = "[::1]:0".parse().unwrap(); let config = Config { server_info: ProducerEndpoint { id: Uuid::new_v4(), kind: ProducerKind::Service, address, - base_route: String::from("/collect"), + base_route: String::new(), interval: Duration::from_secs(10), }, - registration_address: Some(fake_nexus.addr()), + registration_address: Some(fake_nexus.local_addr()), request_body_max_bytes: 1024, log: LogConfig::Logger(log), }; // Ideally, we would check pretty carefully that there are exactly N // registrations after N renewal periods. That's brittle, especially on - // a loaded system. Instead, we'll run for a number of intervals, and - // just check that we have more than one. + // a loaded system. Instead, we'll wait until we've received the + // expected number of registration requests. let _server = Server::start(&config).unwrap(); - let now = Instant::now(); - tokio::time::pause(); - let max = INTERVAL * 10; - while now.elapsed() < max { - tokio::time::advance(Duration::from_millis(10)).await; - } - tokio::time::resume(); - - // Drop the server to check its requests, rather than calling - // `verify_and_clear()`. It's possible for requests to be in-flight - // between when we called `verify_and_clear()` and when we exit this - // test, at which point the drop impl of the `TestServer` checks the - // expectations _again_. Let's just do the check by dropping here. - drop(fake_nexus); + const N_REQUESTS: u32 = 10; + const POLL_INTERVAL: Duration = Duration::from_millis(100); + + // The lease duration is 1s (see `INTERVAL`), and the producer attempts + // to register every 1/4 of the lease, so this should be quite sufficient + // for even heavily-loaded tests. 
+ const POLL_DURATION: Duration = Duration::from_secs(30); + wait_for_condition( + || async { + if fake_nexus.app_private().load(Ordering::SeqCst) >= N_REQUESTS + { + Ok(()) + } else { + Err(CondCheckError::<()>::NotYet) + } + }, + &POLL_INTERVAL, + &POLL_DURATION, + ) + .await + .expect("Expected all registration requests to be made within timeout"); + fake_nexus.close().await.expect("Expected to close server"); } }