diff --git a/nexus/db-model/src/oximeter_info.rs b/nexus/db-model/src/oximeter_info.rs index 5579425a63..017c1bf0ba 100644 --- a/nexus/db-model/src/oximeter_info.rs +++ b/nexus/db-model/src/oximeter_info.rs @@ -9,7 +9,9 @@ use nexus_types::internal_api; use uuid::Uuid; /// A record representing a registered `oximeter` collector. -#[derive(Queryable, Insertable, Debug, Clone, Copy, PartialEq, Eq)] +#[derive( + Queryable, Insertable, Selectable, Debug, Clone, Copy, PartialEq, Eq, +)] #[diesel(table_name = oximeter)] pub struct OximeterInfo { /// The ID for this oximeter instance. diff --git a/nexus/db-queries/src/db/datastore/oximeter.rs b/nexus/db-queries/src/db/datastore/oximeter.rs index f95156a780..f43ab4a051 100644 --- a/nexus/db-queries/src/db/datastore/oximeter.rs +++ b/nexus/db-queries/src/db/datastore/oximeter.rs @@ -22,12 +22,11 @@ use chrono::Utc; use diesel::prelude::*; use diesel::result::DatabaseErrorKind; use diesel::result::Error as DieselError; -use diesel::sql_types; -use nexus_db_model::ProducerKindEnum; use omicron_common::api::external::DataPageParams; use omicron_common::api::external::Error; use omicron_common::api::external::ListResultVec; use omicron_common::api::external::ResourceType; +use omicron_common::api::internal; use uuid::Uuid; /// Type returned when reassigning producers from an Oximeter collector. @@ -168,94 +167,29 @@ impl DataStore { } } - /// Create or update a record for a new producer endpoint + /// Create or update a record for a producer endpoint /// - /// If this producer record is being updated, this method does _not_ update - /// the assigned Oximeter to match `producer.oximeter_id` if it differs from - /// the existing record in the database. We currently only expect a single - /// Oximeter instance to be running at a time: - /// - pub async fn producer_endpoint_create( + /// If the endpoint is being created, a randomly-chosen Oximeter instance + /// will be assigned. If the endpoint is being updated, it will keep its + /// existing Oximeter assignment. + /// + /// Returns the oximeter ID assigned to this producer (either the + /// randomly-chosen one, if newly inserted, or the previously-chosen, if + /// updated). + pub async fn producer_endpoint_upsert_and_assign( &self, opctx: &OpContext, - producer: &ProducerEndpoint, - ) -> Result<(), Error> { - // Our caller has already chosen an Oximeter instance for this producer, - // but we don't want to allow it to use a nonexistent or expunged - // Oximeter. This query turns into a `SELECT all_the_fields_of_producer - // WHERE producer.oximeter_id is legal` in a diesel-compatible way. I'm - // not aware of a helper method to generate "all the fields of - // `producer`", so instead we have a big tuple of its fields that must - // stay in sync with the `table!` definition and field ordering for the - // `metric_producer` table. The compiler will catch any mistakes - // _except_ incorrect orderings where the types still line up (e.g., - // swapping two Uuid columns), which is not ideal but is hopefully good - // enough. - let producer_subquery = { - use db::schema::oximeter::dsl; - - dsl::oximeter - .select(( - producer.id().into_sql::(), - producer - .time_created() - .into_sql::(), - producer - .time_modified() - .into_sql::(), - producer.kind.into_sql::(), - producer.ip.into_sql::(), - producer.port.into_sql::(), - producer.interval.into_sql::(), - producer.oximeter_id.into_sql::(), - )) - .filter( - dsl::id - .eq(producer.oximeter_id) - .and(dsl::time_expunged.is_null()), - ) - }; - - use db::schema::metric_producer::dsl; - - // TODO: see https://github.com/oxidecomputer/omicron/issues/323 - let n = diesel::insert_into(dsl::metric_producer) - .values(producer_subquery) - .on_conflict(dsl::id) - .do_update() - .set(( - dsl::time_modified.eq(Utc::now()), - dsl::kind.eq(producer.kind), - dsl::ip.eq(producer.ip), - dsl::port.eq(producer.port), - dsl::interval.eq(producer.interval), - )) - .execute_async(&*self.pool_connection_authorized(opctx).await?) + producer: &internal::nexus::ProducerEndpoint, + ) -> Result { + match queries::oximeter::upsert_producer(producer) + .get_result_async(&*self.pool_connection_authorized(opctx).await?) .await - .map_err(|e| { - public_error_from_diesel( - e, - ErrorHandler::Conflict( - ResourceType::MetricProducer, - "Producer Endpoint", - ), - ) - })?; - - // We expect `n` to basically always be 1 (1 row was inserted or - // updated). It can be 0 if `producer.oximeter_id` doesn't exist or has - // been expunged. It can never be 2 or greater because - // `producer_subquery` filters on finding an exact row for its Oximeter - // instance's ID. - match n { - 0 => Err(Error::not_found_by_id( - ResourceType::Oximeter, - &producer.oximeter_id, + { + Ok(info) => Ok(info), + Err(DieselError::NotFound) => Err(Error::unavail( + "no Oximeter instances available for assignment", )), - 1 => Ok(()), - _ => Err(Error::internal_error(&format!( - "multiple rows inserted ({n}) in `producer_endpoint_create`" - ))), + Err(e) => Err(public_error_from_diesel(e, ErrorHandler::Server)), } } @@ -361,7 +295,6 @@ mod tests { use db::datastore::pub_test_utils::datastore_test; use nexus_test_utils::db::test_setup_database; use nexus_types::internal_api::params; - use omicron_common::api::external::LookupType; use omicron_common::api::internal::nexus; use omicron_test_utils::dev; use std::time::Duration; @@ -519,10 +452,137 @@ mod tests { } #[tokio::test] - async fn test_producer_endpoint_create_rejects_expunged_oximeters() { + async fn test_producer_endpoint_reassigns_if_oximeter_expunged() { // Setup let logctx = dev::test_setup_log( - "test_producer_endpoint_create_rejects_expunged_oximeters", + "test_producer_endpoint_reassigns_if_oximeter_expunged", + ); + let mut db = test_setup_database(&logctx.log).await; + let (opctx, datastore) = + datastore_test(&logctx, &db, Uuid::new_v4()).await; + + // Insert an Oximeter collector. + let oximeter1_id = Uuid::new_v4(); + datastore + .oximeter_create( + &opctx, + &OximeterInfo::new(¶ms::OximeterInfo { + collector_id: oximeter1_id, + address: "[::1]:0".parse().unwrap(), // unused + }), + ) + .await + .expect("inserted collector"); + + // Insert a producer. + let producer = nexus::ProducerEndpoint { + id: Uuid::new_v4(), + kind: nexus::ProducerKind::Service, + address: "[::1]:0".parse().unwrap(), + interval: Duration::from_secs(0), + }; + let chosen_oximeter = datastore + .producer_endpoint_upsert_and_assign(&opctx, &producer) + .await + .expect("inserted producer"); + assert_eq!(chosen_oximeter.id, oximeter1_id); + + // Grab the inserted producer (so we have its time_modified for checks + // below). + let producer_info = datastore + .producers_list_by_oximeter_id( + &opctx, + oximeter1_id, + &DataPageParams::max_page(), + ) + .await + .expect("listed producers") + .pop() + .expect("got producer"); + assert_eq!(producer_info.id(), producer.id); + + // Expunge the oximeter. + datastore + .oximeter_expunge(&opctx, oximeter1_id) + .await + .expect("expunged oximeter"); + + // Attempting to upsert our producer again should fail; our oximeter has + // been expunged, and our time modified should be unchanged. + let err = datastore + .producer_endpoint_upsert_and_assign(&opctx, &producer) + .await + .expect_err("producer upsert failed") + .to_string(); + assert!( + err.contains("no Oximeter instances available for assignment"), + "unexpected error: {err}" + ); + { + let check_info = datastore + .producers_list_by_oximeter_id( + &opctx, + oximeter1_id, + &DataPageParams::max_page(), + ) + .await + .expect("listed producers") + .pop() + .expect("got producer"); + assert_eq!( + producer_info, check_info, + "unexpected modification in failed upsert" + ); + } + + // Add a new, non-expunged Oximeter. + let oximeter2_id = Uuid::new_v4(); + datastore + .oximeter_create( + &opctx, + &OximeterInfo::new(¶ms::OximeterInfo { + collector_id: oximeter2_id, + address: "[::1]:0".parse().unwrap(), // unused + }), + ) + .await + .expect("inserted collector"); + + // Retry updating our existing producer; it should get reassigned to a + // the new Oximeter. + let chosen_oximeter = datastore + .producer_endpoint_upsert_and_assign(&opctx, &producer) + .await + .expect("inserted producer"); + assert_eq!(chosen_oximeter.id, oximeter2_id); + { + let check_info = datastore + .producers_list_by_oximeter_id( + &opctx, + oximeter2_id, + &DataPageParams::max_page(), + ) + .await + .expect("listed producers") + .pop() + .expect("got producer"); + assert_eq!(check_info.id(), producer_info.id()); + assert!( + check_info.time_modified() > producer_info.time_modified(), + "producer time modified was not advanced" + ); + } + + // Cleanup + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + + #[tokio::test] + async fn test_producer_endpoint_upsert_rejects_expunged_oximeters() { + // Setup + let logctx = dev::test_setup_log( + "test_producer_endpoint_upsert_rejects_expunged_oximeters", ); let mut db = test_setup_database(&logctx.log).await; let (opctx, datastore) = @@ -541,77 +601,87 @@ mod tests { .expect("inserted collector"); } - // We can insert metric producers for each collector. - for &collector_id in &collector_ids { - let producer = ProducerEndpoint::new( - &nexus::ProducerEndpoint { - id: Uuid::new_v4(), - kind: nexus::ProducerKind::Service, - address: "[::1]:0".parse().unwrap(), // unused - interval: Duration::from_secs(0), // unused - }, - collector_id, - ); - datastore - .producer_endpoint_create(&opctx, &producer) + // Creating a producer randomly chooses one of our collectors. Create + // 1000 and check that we saw each collector at least once. + let mut seen_collector_counts = vec![0; collector_ids.len()]; + for _ in 0..1000 { + let producer = nexus::ProducerEndpoint { + id: Uuid::new_v4(), + kind: nexus::ProducerKind::Service, + address: "[::1]:0".parse().unwrap(), // unused + interval: Duration::from_secs(0), // unused + }; + let collector_id = datastore + .producer_endpoint_upsert_and_assign(&opctx, &producer) .await - .expect("created producer"); + .expect("inserted producer") + .id; + let i = collector_ids + .iter() + .position(|id| *id == collector_id) + .expect("found collector position"); + seen_collector_counts[i] += 1; + } + eprintln!("saw collector counts: {seen_collector_counts:?}"); + for count in seen_collector_counts { + assert_ne!(count, 0); } - // Delete the first collector. + // Expunge the first collector. datastore .oximeter_expunge(&opctx, collector_ids[0]) .await .expect("expunged collector"); - // Attempting to insert a producer assigned to the first collector - // should fail, now that it's expunged. - let err = { - let producer = ProducerEndpoint::new( - &nexus::ProducerEndpoint { - id: Uuid::new_v4(), - kind: nexus::ProducerKind::Service, - address: "[::1]:0".parse().unwrap(), // unused - interval: Duration::from_secs(0), // unused - }, - collector_ids[0], - ); - datastore - .producer_endpoint_create(&opctx, &producer) + // Repeat the test above; we should never see collector 0 chosen. + let mut seen_collector_counts = vec![0; collector_ids.len()]; + for _ in 0..1000 { + let producer = nexus::ProducerEndpoint { + id: Uuid::new_v4(), + kind: nexus::ProducerKind::Service, + address: "[::1]:0".parse().unwrap(), // unused + interval: Duration::from_secs(0), // unused + }; + let collector_id = datastore + .producer_endpoint_upsert_and_assign(&opctx, &producer) .await - .expect_err("producer creation fails") - }; - assert_eq!( - err, - Error::ObjectNotFound { - type_name: ResourceType::Oximeter, - lookup_type: LookupType::ById(collector_ids[0]) - } - ); + .expect("inserted producer") + .id; + let i = collector_ids + .iter() + .position(|id| *id == collector_id) + .expect("found collector position"); + seen_collector_counts[i] += 1; + } + eprintln!("saw collector counts: {seen_collector_counts:?}"); + assert_eq!(seen_collector_counts[0], 0); + for count in seen_collector_counts.into_iter().skip(1) { + assert_ne!(count, 0); + } - // We can still insert metric producers for the other collectors... + // Expunge the remaining collectors; trying to create a producer now + // should fail. for &collector_id in &collector_ids[1..] { - let mut producer = ProducerEndpoint::new( - &nexus::ProducerEndpoint { - id: Uuid::new_v4(), - kind: nexus::ProducerKind::Service, - address: "[::1]:0".parse().unwrap(), // unused - interval: Duration::from_secs(0), // unused - }, - collector_id, - ); datastore - .producer_endpoint_create(&opctx, &producer) - .await - .expect("created producer"); - - // ... and we can update them. - producer.port = 100.into(); - datastore - .producer_endpoint_create(&opctx, &producer) + .oximeter_expunge(&opctx, collector_id) .await - .expect("created producer"); + .expect("expunged collector"); } + let producer = nexus::ProducerEndpoint { + id: Uuid::new_v4(), + kind: nexus::ProducerKind::Service, + address: "[::1]:0".parse().unwrap(), // unused + interval: Duration::from_secs(0), // unused + }; + let err = datastore + .producer_endpoint_upsert_and_assign(&opctx, &producer) + .await + .expect_err("unexpected success - all oximeters expunged") + .to_string(); + assert!( + err.contains("no Oximeter instances available for assignment"), + "unexpected error: {err}" + ); // Cleanup db.cleanup().await.unwrap(); @@ -639,26 +709,35 @@ mod tests { .expect("inserted collector"); } - // Insert 250 metric producers assigned to each collector. - for &collector_id in &collector_ids { - for _ in 0..250 { - let producer = ProducerEndpoint::new( - &nexus::ProducerEndpoint { - id: Uuid::new_v4(), - kind: nexus::ProducerKind::Service, - address: "[::1]:0".parse().unwrap(), // unused - interval: Duration::from_secs(0), // unused - }, - collector_id, - ); - datastore - .producer_endpoint_create(&opctx, &producer) - .await - .expect("created producer"); - } + // Insert 1000 metric producers. + let mut seen_collector_counts = vec![0; collector_ids.len()]; + for _ in 0..1000 { + let producer = nexus::ProducerEndpoint { + id: Uuid::new_v4(), + kind: nexus::ProducerKind::Service, + address: "[::1]:0".parse().unwrap(), // unused + interval: Duration::from_secs(0), // unused + }; + let collector_id = datastore + .producer_endpoint_upsert_and_assign(&opctx, &producer) + .await + .expect("inserted producer") + .id; + let i = collector_ids + .iter() + .position(|id| *id == collector_id) + .expect("found collector position"); + seen_collector_counts[i] += 1; } + eprintln!("saw collector counts: {seen_collector_counts:?}"); + // Sanity check that we got at least one assignment to collector 0 (so + // our reassignment below actually does something). + assert!( + seen_collector_counts[0] > 0, + "expected more than 0 assignments to collector 0 (very unlucky?!)" + ); - // Delete one collector. + // Expunge one collector. datastore .oximeter_expunge(&opctx, collector_ids[0]) .await @@ -669,7 +748,10 @@ mod tests { .oximeter_reassign_all_producers(&opctx, collector_ids[0]) .await .expect("reassigned producers"); - assert_eq!(num_reassigned, CollectorReassignment::Complete(250)); + assert_eq!( + num_reassigned, + CollectorReassignment::Complete(seen_collector_counts[0]) + ); // Check the distribution of producers for each of the remaining // collectors. We don't know the exact count, so we'll check that: @@ -679,10 +761,10 @@ mod tests { // enough that most calculators give up and call it 0) // * All 1000 producers are assigned to one of the three collectors // - // to guard against "the reassignment query gave all 250 to exactly one - // of the remaining collectors", which is an easy failure mode for this - // kind of SQL query, where the query engine only evaluates the - // randomness once instead of once for each producer. + // to guard against "the reassignment query gave all of collector 0's + // producers to exactly one of the remaining collectors", which is an + // easy failure mode for this kind of SQL query, where the query engine + // only evaluates the randomness once instead of once for each producer. let mut producer_counts = [0; 4]; for i in 0..4 { producer_counts[i] = datastore @@ -696,9 +778,13 @@ mod tests { .len(); } assert_eq!(producer_counts[0], 0); // all reassigned - assert!(producer_counts[1] > 250); // gained at least one - assert!(producer_counts[2] > 250); // gained at least one - assert!(producer_counts[3] > 250); // gained at least one + + // each gained at least one + assert!(producer_counts[1] > seen_collector_counts[1]); + assert!(producer_counts[2] > seen_collector_counts[2]); + assert!(producer_counts[3] > seen_collector_counts[3]); + + // all producers are assigned assert_eq!(producer_counts[1..].iter().sum::(), 1000); // Cleanup @@ -729,23 +815,25 @@ mod tests { .expect("inserted collector"); } - // Insert 10 metric producers assigned to each collector. - for &collector_id in &collector_ids { - for _ in 0..10 { - let producer = ProducerEndpoint::new( - &nexus::ProducerEndpoint { - id: Uuid::new_v4(), - kind: nexus::ProducerKind::Service, - address: "[::1]:0".parse().unwrap(), // unused - interval: Duration::from_secs(0), // unused - }, - collector_id, - ); - datastore - .producer_endpoint_create(&opctx, &producer) - .await - .expect("created producer"); - } + // Insert 100 metric producers. + let mut seen_collector_counts = vec![0; collector_ids.len()]; + for _ in 0..100 { + let producer = nexus::ProducerEndpoint { + id: Uuid::new_v4(), + kind: nexus::ProducerKind::Service, + address: "[::1]:0".parse().unwrap(), // unused + interval: Duration::from_secs(0), // unused + }; + let collector_id = datastore + .producer_endpoint_upsert_and_assign(&opctx, &producer) + .await + .expect("inserted producer") + .id; + let i = collector_ids + .iter() + .position(|id| *id == collector_id) + .expect("found collector position"); + seen_collector_counts[i] += 1; } // Delete all four collectors. @@ -783,15 +871,18 @@ mod tests { .expect("inserted collector"); // Reassigning the original four collectors should now all succeed. - for &collector_id in &collector_ids { + for (i, &collector_id) in collector_ids.iter().enumerate() { let num_reassigned = datastore .oximeter_reassign_all_producers(&opctx, collector_id) .await .expect("reassigned producers"); - assert_eq!(num_reassigned, CollectorReassignment::Complete(10)); + assert_eq!( + num_reassigned, + CollectorReassignment::Complete(seen_collector_counts[i]) + ); } - // All 40 producers should be assigned to our new collector. + // All 100 producers should be assigned to our new collector. let nproducers = datastore .producers_list_by_oximeter_id( &opctx, @@ -801,7 +892,7 @@ mod tests { .await .expect("listed producers") .len(); - assert_eq!(nproducers, 40); + assert_eq!(nproducers, 100); // Cleanup db.cleanup().await.unwrap(); @@ -827,17 +918,14 @@ mod tests { .expect("failed to insert collector"); // Insert a producer - let producer = ProducerEndpoint::new( - &nexus::ProducerEndpoint { - id: Uuid::new_v4(), - kind: nexus::ProducerKind::Service, - address: "[::1]:0".parse().unwrap(), // unused - interval: Duration::from_secs(0), // unused - }, - collector_info.id, - ); + let producer = nexus::ProducerEndpoint { + id: Uuid::new_v4(), + kind: nexus::ProducerKind::Service, + address: "[::1]:0".parse().unwrap(), // unused + interval: Duration::from_secs(0), // unused + }; datastore - .producer_endpoint_create(&opctx, &producer) + .producer_endpoint_upsert_and_assign(&opctx, &producer) .await .expect("failed to insert producer"); @@ -851,7 +939,7 @@ mod tests { .await .expect("failed to list all producers"); assert_eq!(all_producers.len(), 1); - assert_eq!(all_producers[0].id(), producer.id()); + assert_eq!(all_producers[0].id(), producer.id); // Steal this producer so we have a database-precision timestamp and can // use full equality checks moving forward. diff --git a/nexus/db-queries/src/db/queries/oximeter.rs b/nexus/db-queries/src/db/queries/oximeter.rs index 40f7a2b493..ab2d194c3e 100644 --- a/nexus/db-queries/src/db/queries/oximeter.rs +++ b/nexus/db-queries/src/db/queries/oximeter.rs @@ -4,10 +4,165 @@ //! Implementation of queries for Oximeter collectors and producers. +use crate::db::column_walker::AllColumnsOf; use crate::db::raw_query_builder::{QueryBuilder, TypedSqlQuery}; +use diesel::pg::Pg; use diesel::sql_types; +use ipnetwork::IpNetwork; +use nexus_db_model::{OximeterInfo, ProducerKind, ProducerKindEnum, SqlU16}; +use omicron_common::api::internal; use uuid::Uuid; +type AllColumnsOfOximeterInfo = + AllColumnsOf; +type SelectableSql = < + >::SelectExpression as diesel::Expression +>::SqlType; + +/// Upsert a metric producer. +/// +/// If the producer is being inserted for the first time, a random Oximeter will +/// be chosen from among all non-expunged entries in the `oximeter` table. +/// +/// If the producer is being updated, it will keep its existing Oximeter as long +/// as that Oximeter has not been expunged. If its previously-chosen Oximeter +/// has been expunged, its assignment will be changed to a random non-expunged +/// Oximeter. +/// +/// If this query succeeds but returns 0 rows inserted/updated, there are no +/// non-expunged `Oximeter` instances to choose. +/// +/// Returns the oximeter ID assigned to this producer (either the +/// randomly-chosen one, if newly inserted or updated-from-an-expunged, or the +/// previously-chosen, if updated and the existing assignment is still valid). +pub fn upsert_producer( + producer: &internal::nexus::ProducerEndpoint, +) -> TypedSqlQuery> { + let builder = QueryBuilder::new(); + + // Select the existing oximeter ID for this producer, if it exists and is + // not expunged. + let builder = builder + .sql( + r#" + WITH existing_oximeter AS ( + SELECT oximeter.id + FROM metric_producer INNER JOIN oximeter + ON (metric_producer.oximeter_id = oximeter.id) + WHERE + oximeter.time_expunged IS NULL + AND metric_producer.id = "#, + ) + .param() + .bind::(producer.id) + .sql("), "); + + // Choose a random non-expunged Oximeter instance to use if the previous + // clause did not find an existing, non-expunged Oximeter. + let builder = builder.sql( + r#" + random_oximeter AS ( + SELECT id FROM oximeter + WHERE time_expunged IS NULL + ORDER BY random() + LIMIT 1 + ), + "#, + ); + + // Combine the previous two queries. The `LEFT JOIN ... ON true` ensures we + // always get a row from this clause if there is _any_ non-expunged Oximeter + // available. + let builder = builder.sql( + r#" + chosen_oximeter AS ( + SELECT COALESCE(existing_oximeter.id, random_oximeter.id) AS oximeter_id + FROM random_oximeter LEFT JOIN existing_oximeter ON true + ), + "#, + ); + + // Build the INSERT for new producers... + let builder = builder.sql( + r#" + inserted_producer AS ( + INSERT INTO metric_producer ( + id, + time_created, + time_modified, + kind, + ip, + port, + interval, + oximeter_id + ) + "#, + ); + + // ... by querying our chosen oximeter ID and the values from `producer`. + let builder = builder + .sql("SELECT ") + .param() + .bind::(producer.id) + .sql(", now()") // time_created + .sql(", now()") // time_modified + .sql(", ") + .param() + .bind::(producer.kind.into()) + .sql(", ") + .param() + .bind::(producer.address.ip().into()) + .sql(", ") + .param() + .bind::(producer.address.port().into()) + .sql(", ") + .param() + .bind::(producer.interval.as_secs_f32()) + .sql(", oximeter_id FROM chosen_oximeter"); + + // If the producer already exists, update everything except id/time_created. + // This will keep the existing `oximeter_id` if we got a non-NULL value from + // the first clause in our CTE (selecting the existing oximeter id if it's + // not expunged), or reassign to our randomly-chosen one (the second clause + // above) if our current assignment is expunged. + let builder = builder.sql( + r#" + ON CONFLICT (id) + DO UPDATE SET + time_modified = now(), + kind = excluded.kind, + ip = excluded.ip, + port = excluded.port, + interval = excluded.interval, + oximeter_id = excluded.oximeter_id + "#, + ); + + // ... and return this producer's assigned collector ID. + let builder = builder.sql( + r#" + RETURNING oximeter_id + ) + "#, + ); + + // Finally, join the oximeter ID from our inserted or updated producer with + // the `oximeter` table to get all of its information. + let builder = builder + .sql("SELECT ") + .sql(AllColumnsOfOximeterInfo::with_prefix("oximeter")) + .sql( + r#" + FROM oximeter + INNER JOIN inserted_producer + ON (oximeter.id = inserted_producer.oximeter_id) + WHERE oximeter.time_expunged IS NULL + "#, + ); + + builder.query() +} + /// For a given Oximeter instance (which is presumably no longer running), /// reassign any producers assigned to it to a different Oximeter. Each /// assignment is randomly chosen from among the non-expunged Oximeter instances @@ -70,13 +225,32 @@ mod test { use crate::db::raw_query_builder::expectorate_query_contents; use nexus_test_utils::db::test_setup_database; use omicron_test_utils::dev; + use std::time::Duration; use uuid::Uuid; - // This test is a bit of a "change detector", but it's here to help with - // debugging too. If you change this query, it can be useful to see exactly + // These tests are a bit of a "change detector", but it's here to help with + // debugging too. If you change these query, it can be useful to see exactly // how the output SQL has been altered. #[tokio::test] - async fn expectorate_query() { + async fn expectorate_query_upsert_producer() { + let producer = internal::nexus::ProducerEndpoint { + id: Uuid::nil(), + kind: ProducerKind::SledAgent.into(), + address: "[::1]:0".parse().unwrap(), + interval: Duration::from_secs(30), + }; + + let query = upsert_producer(&producer); + + expectorate_query_contents( + &query, + "tests/output/oximeter_upsert_producer.sql", + ) + .await; + } + + #[tokio::test] + async fn expectorate_query_reassign_producers() { let oximeter_id = Uuid::nil(); let query = reassign_producers_query(oximeter_id); @@ -88,10 +262,36 @@ mod test { .await; } - // Explain the SQL query to ensure that it creates a valid SQL string. + // Explain the SQL queries to ensure that they create valid SQL strings. + #[tokio::test] + async fn explainable_upsert_producer() { + let logctx = dev::test_setup_log("explainable_upsert_producer"); + let log = logctx.log.new(o!()); + let mut db = test_setup_database(&log).await; + let cfg = crate::db::Config { url: db.pg_config().clone() }; + let pool = crate::db::Pool::new_single_host(&logctx.log, &cfg); + let conn = pool.claim().await.unwrap(); + + let producer = internal::nexus::ProducerEndpoint { + id: Uuid::nil(), + kind: ProducerKind::SledAgent.into(), + address: "[::1]:0".parse().unwrap(), + interval: Duration::from_secs(30), + }; + + let query = upsert_producer(&producer); + let _ = query + .explain_async(&conn) + .await + .expect("Failed to explain query - is it valid SQL?"); + + db.cleanup().await.unwrap(); + logctx.cleanup_successful(); + } + #[tokio::test] - async fn explainable() { - let logctx = dev::test_setup_log("explainable"); + async fn explainable_reassign_producers() { + let logctx = dev::test_setup_log("explainable_reassign_producers"); let log = logctx.log.new(o!()); let mut db = test_setup_database(&log).await; let cfg = crate::db::Config { url: db.pg_config().clone() }; diff --git a/nexus/db-queries/tests/output/oximeter_upsert_producer.sql b/nexus/db-queries/tests/output/oximeter_upsert_producer.sql new file mode 100644 index 0000000000..4ef2b4082f --- /dev/null +++ b/nexus/db-queries/tests/output/oximeter_upsert_producer.sql @@ -0,0 +1,52 @@ +WITH + existing_oximeter + AS ( + SELECT + oximeter.id + FROM + metric_producer INNER JOIN oximeter ON metric_producer.oximeter_id = oximeter.id + WHERE + oximeter.time_expunged IS NULL AND metric_producer.id = $1 + ), + random_oximeter + AS (SELECT id FROM oximeter WHERE time_expunged IS NULL ORDER BY random() LIMIT 1), + chosen_oximeter + AS ( + SELECT + COALESCE(existing_oximeter.id, random_oximeter.id) AS oximeter_id + FROM + random_oximeter LEFT JOIN existing_oximeter ON true + ), + inserted_producer + AS ( + INSERT + INTO + metric_producer (id, time_created, time_modified, kind, ip, port, "interval", oximeter_id) + SELECT + $2, now(), now(), $3, $4, $5, $6, oximeter_id + FROM + chosen_oximeter + ON CONFLICT + (id) + DO + UPDATE SET + time_modified = now(), + kind = excluded.kind, + ip = excluded.ip, + port = excluded.port, + "interval" = excluded.interval, + oximeter_id = excluded.oximeter_id + RETURNING + oximeter_id + ) +SELECT + oximeter.id, + oximeter.time_created, + oximeter.time_modified, + oximeter.time_expunged, + oximeter.ip, + oximeter.port +FROM + oximeter INNER JOIN inserted_producer ON oximeter.id = inserted_producer.oximeter_id +WHERE + oximeter.time_expunged IS NULL diff --git a/nexus/metrics-producer-gc/src/lib.rs b/nexus/metrics-producer-gc/src/lib.rs index 4ed8f1bbb5..407af2fdbd 100644 --- a/nexus/metrics-producer-gc/src/lib.rs +++ b/nexus/metrics-producer-gc/src/lib.rs @@ -239,22 +239,19 @@ mod tests { assert!(pruned.failures.is_empty()); // Insert a producer. - let producer = ProducerEndpoint::new( - &nexus::ProducerEndpoint { - id: Uuid::new_v4(), - kind: nexus::ProducerKind::Service, - address: "[::1]:0".parse().unwrap(), // unused - interval: Duration::from_secs(0), // unused - }, - collector_info.id, - ); + let producer = nexus::ProducerEndpoint { + id: Uuid::new_v4(), + kind: nexus::ProducerKind::Service, + address: "[::1]:0".parse().unwrap(), // unused + interval: Duration::from_secs(0), // unused + }; datastore - .producer_endpoint_create(&opctx, &producer) + .producer_endpoint_upsert_and_assign(&opctx, &producer) .await .expect("failed to insert producer"); let producer_time_modified = - read_time_modified(&datastore, producer.id()).await; + read_time_modified(&datastore, producer.id).await; // GC'ing expired producers with an expiration time older than our // producer's `time_modified` should not prune anything. @@ -278,7 +275,7 @@ mod tests { .await .expect("failed to prune expired producers"); let expected_success = - [producer.id()].into_iter().collect::>(); + [producer.id].into_iter().collect::>(); assert_eq!(pruned.successes, expected_success); assert!(pruned.failures.is_empty()); @@ -321,22 +318,19 @@ mod tests { .expect("failed to insert collector"); // Insert a producer. - let producer = ProducerEndpoint::new( - &nexus::ProducerEndpoint { - id: Uuid::new_v4(), - kind: nexus::ProducerKind::Service, - address: "[::1]:0".parse().unwrap(), // unused - interval: Duration::from_secs(0), // unused - }, - collector_info.id, - ); + let producer = nexus::ProducerEndpoint { + id: Uuid::new_v4(), + kind: nexus::ProducerKind::Service, + address: "[::1]:0".parse().unwrap(), // unused + interval: Duration::from_secs(0), // unused + }; datastore - .producer_endpoint_create(&opctx, &producer) + .producer_endpoint_upsert_and_assign(&opctx, &producer) .await .expect("failed to insert producer"); let producer_time_modified = - read_time_modified(&datastore, producer.id()).await; + read_time_modified(&datastore, producer.id).await; // GC'ing expired producers with an expiration time _newer_ than our // producer's `time_modified` should prune our one producer and notify @@ -344,7 +338,7 @@ mod tests { collector.expect( Expectation::matching(request::method_path( "DELETE", - format!("/producers/{}", producer.id()), + format!("/producers/{}", producer.id), )) .respond_with(status_code(204)), ); @@ -357,7 +351,7 @@ mod tests { .await .expect("failed to prune expired producers"); let expected_success = - [producer.id()].into_iter().collect::>(); + [producer.id].into_iter().collect::>(); assert_eq!(pruned.successes, expected_success); assert!(pruned.failures.is_empty()); diff --git a/nexus/src/app/background/tasks/metrics_producer_gc.rs b/nexus/src/app/background/tasks/metrics_producer_gc.rs index 1df0afb7ed..7e06148041 100644 --- a/nexus/src/app/background/tasks/metrics_producer_gc.rs +++ b/nexus/src/app/background/tasks/metrics_producer_gc.rs @@ -116,10 +116,9 @@ mod tests { use httptest::Expectation; use nexus_db_model::OximeterInfo; use nexus_db_queries::context::OpContext; - use nexus_db_queries::db::model::ProducerEndpoint; use nexus_test_utils_macros::nexus_test; - use nexus_types::identity::Asset; use nexus_types::internal_api::params; + use omicron_common::api::external::DataPageParams; use omicron_common::api::internal::nexus; use omicron_common::api::internal::nexus::ProducerRegistrationResponse; use serde_json::json; @@ -158,17 +157,24 @@ mod tests { datastore.clone(), ); - let mut collector = httptest::Server::run(); - - // Insert an Oximeter collector - let collector_info = OximeterInfo::new(¶ms::OximeterInfo { - collector_id: Uuid::new_v4(), - address: collector.addr(), - }); - datastore - .oximeter_create(&opctx, &collector_info) + // Producer <-> collector assignment is random. We're going to create a + // mock collector below then insert a producer, and we want to guarantee + // the producer is assigned to the mock collector. To do so, we need to + // expunge the "real" collector set up by `nexus_test`. We'll phrase + // this as a loop to match the datastore methods and in case nexus_test + // ever starts multiple collectors. + for oximeter_info in datastore + .oximeter_list(&opctx, &DataPageParams::max_page()) .await - .expect("failed to insert collector"); + .expect("listed oximeters") + { + datastore + .oximeter_expunge(&opctx, oximeter_info.id) + .await + .expect("expunged oximeter"); + } + + let mut collector = httptest::Server::run(); // There are several producers which automatically register themselves // during tests, from Nexus and the simulated sled-agent for example. We @@ -184,18 +190,25 @@ mod tests { .respond_with(status_code(201).body(body)), ); + // Insert an Oximeter collector + let collector_info = OximeterInfo::new(¶ms::OximeterInfo { + collector_id: Uuid::new_v4(), + address: collector.addr(), + }); + datastore + .oximeter_create(&opctx, &collector_info) + .await + .expect("failed to insert collector"); + // Insert a producer. - let producer = ProducerEndpoint::new( - &nexus::ProducerEndpoint { - id: Uuid::new_v4(), - kind: nexus::ProducerKind::Service, - address: "[::1]:0".parse().unwrap(), // unused - interval: Duration::from_secs(0), // unused - }, - collector_info.id, - ); + let producer = nexus::ProducerEndpoint { + id: Uuid::new_v4(), + kind: nexus::ProducerKind::Service, + address: "[::1]:0".parse().unwrap(), // unused + interval: Duration::from_secs(0), // unused + }; datastore - .producer_endpoint_create(&opctx, &producer) + .producer_endpoint_upsert_and_assign(&opctx, &producer) .await .expect("failed to insert producer"); @@ -215,7 +228,7 @@ mod tests { // ago, which should result in it being pruned. set_time_modified( &datastore, - producer.id(), + producer.id, Utc::now() - chrono::TimeDelta::hours(2), ) .await; @@ -224,7 +237,7 @@ mod tests { collector.expect( Expectation::matching(request::method_path( "DELETE", - format!("/producers/{}", producer.id()), + format!("/producers/{}", producer.id), )) .respond_with(status_code(204)), ); @@ -235,7 +248,7 @@ mod tests { assert!(value.contains_key("expiration")); assert_eq!( *value.get("pruned").expect("missing `pruned`"), - json!([producer.id()]) + json!([producer.id]) ); collector.verify_and_clear(); diff --git a/nexus/src/app/oximeter.rs b/nexus/src/app/oximeter.rs index 10ec048654..b935c95349 100644 --- a/nexus/src/app/oximeter.rs +++ b/nexus/src/app/oximeter.rs @@ -13,8 +13,7 @@ use nexus_db_queries::context::OpContext; use nexus_db_queries::db; use nexus_db_queries::db::DataStore; use omicron_common::address::CLICKHOUSE_HTTP_PORT; -use omicron_common::api::external::{DataPageParams, ListResultVec}; -use omicron_common::api::external::{Error, LookupType, ResourceType}; +use omicron_common::api::external::{DataPageParams, Error, ListResultVec}; use omicron_common::api::internal::nexus::{self, ProducerEndpoint}; use oximeter_client::Client as OximeterClient; use oximeter_db::query::Timestamp; @@ -113,55 +112,32 @@ impl super::Nexus { opctx: &OpContext, producer_info: nexus::ProducerEndpoint, ) -> Result<(), Error> { - for attempt in 0.. { - let (collector, id) = self.next_collector(opctx).await?; - let db_info = db::model::ProducerEndpoint::new(&producer_info, id); + let collector_info = self + .db_datastore + .producer_endpoint_upsert_and_assign(opctx, &producer_info) + .await?; - // We chose the collector in `self.next_collector` above; if we get - // an "Oximeter not found" error when we try to create a producer - // assigned to that collector, we've lost an extremely rare race - // where the collector we chose was deleted in between when we chose - // it and when we tried to assign it. If we hit this, we should just - // pick another collector and try again. - // - // To safeguard against some other bug forcing us into an infinite - // loop here, we'll only retry once. Losing this particular race - // once is exceedingly unlikely; losing it twice probably means - // something else is wrong, so we'll just return the error. - match self - .db_datastore - .producer_endpoint_create(opctx, &db_info) - .await - { - Ok(()) => (), // fallthrough - Err(Error::ObjectNotFound { - type_name: ResourceType::Oximeter, - lookup_type: LookupType::ById(bad_id), - }) if id == bad_id && attempt == 0 => { - // We lost the race on our first try; try again. - continue; - } - // Any other error or we lost the race twice; fail. - Err(err) => return Err(err), - } + let address = SocketAddr::from(( + collector_info.ip.ip(), + collector_info.port.try_into().unwrap(), + )); + let collector = + build_oximeter_client(&self.log, &collector_info.id, address); - collector - .producers_post( - &oximeter_client::types::ProducerEndpoint::from( - &producer_info, - ), - ) - .await - .map_err(Error::from)?; - info!( - self.log, - "assigned collector to new producer"; - "producer_id" => ?producer_info.id, - "collector_id" => ?id, - ); - return Ok(()); - } - unreachable!("for loop always returns after at most two iterations") + collector + .producers_post(&oximeter_client::types::ProducerEndpoint::from( + &producer_info, + )) + .await + .map_err(Error::from)?; + info!( + self.log, + "assigned collector to new producer"; + "producer_id" => %producer_info.id, + "collector_id" => %collector_info.id, + ); + + Ok(()) } /// Returns a results from the timeseries DB based on the provided query @@ -272,27 +248,6 @@ impl super::Nexus { ) .unwrap()) } - - // Return an oximeter collector to assign a newly-registered producer - async fn next_collector( - &self, - opctx: &OpContext, - ) -> Result<(OximeterClient, Uuid), Error> { - // TODO-robustness Replace with a real load-balancing strategy. - let page_params = DataPageParams { - marker: None, - direction: dropshot::PaginationOrder::Ascending, - limit: std::num::NonZeroU32::new(1).unwrap(), - }; - let oxs = self.db_datastore.oximeter_list(opctx, &page_params).await?; - let info = oxs.first().ok_or_else(|| Error::ServiceUnavailable { - internal_message: String::from("no oximeter collectors available"), - })?; - let address = - SocketAddr::from((info.ip.ip(), info.port.try_into().unwrap())); - let id = info.id; - Ok((build_oximeter_client(&self.log, &id, address), id)) - } } /// Idempotently un-assign a producer from an oximeter collector.