diff --git a/nexus/db-model/src/oximeter_info.rs b/nexus/db-model/src/oximeter_info.rs index ac30384c59..39bde98ea8 100644 --- a/nexus/db-model/src/oximeter_info.rs +++ b/nexus/db-model/src/oximeter_info.rs @@ -8,7 +8,7 @@ use chrono::{DateTime, Utc}; use nexus_types::internal_api; use uuid::Uuid; -/// Message used to notify Nexus that this oximeter instance is up and running. +/// A record representing a registered `oximeter` collector. #[derive(Queryable, Insertable, Debug, Clone, Copy)] #[diesel(table_name = oximeter)] pub struct OximeterInfo { @@ -18,8 +18,9 @@ pub struct OximeterInfo { pub time_created: DateTime, /// When this resource was last modified. pub time_modified: DateTime, - /// The address on which this oximeter instance listens for requests + /// The address on which this `oximeter` instance listens for requests. pub ip: ipnetwork::IpNetwork, + /// The port on which this `oximeter` instance listens for requests. pub port: SqlU16, } diff --git a/nexus/db-queries/src/db/datastore/oximeter.rs b/nexus/db-queries/src/db/datastore/oximeter.rs index c9b3a59b05..55b650ea53 100644 --- a/nexus/db-queries/src/db/datastore/oximeter.rs +++ b/nexus/db-queries/src/db/datastore/oximeter.rs @@ -21,7 +21,20 @@ use omicron_common::api::external::ResourceType; use uuid::Uuid; impl DataStore { - // Create a record for a new Oximeter instance + /// Lookup an oximeter instance by its ID. + pub async fn oximeter_lookup( + &self, + id: &Uuid, + ) -> Result { + use db::schema::oximeter::dsl; + dsl::oximeter + .find(*id) + .first_async(&*self.pool_connection_unauthorized().await?) + .await + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } + + /// Create a record for a new Oximeter instance pub async fn oximeter_create( &self, info: &OximeterInfo, @@ -55,7 +68,7 @@ impl DataStore { Ok(()) } - // List the oximeter collector instances + /// List the oximeter collector instances pub async fn oximeter_list( &self, page_params: &DataPageParams<'_, Uuid>, @@ -69,7 +82,7 @@ impl DataStore { .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) } - // Create a record for a new producer endpoint + /// Create a record for a new producer endpoint pub async fn producer_endpoint_create( &self, producer: &ProducerEndpoint, @@ -102,7 +115,27 @@ impl DataStore { Ok(()) } - // List the producer endpoint records by the oximeter instance to which they're assigned. + /// Delete a record for a producer endpoint, by its ID. + /// + /// This is idempotent, and deleting a record that is already removed is a + /// no-op. If the record existed, then the ID of the `oximeter` collector is + /// returned. If there was no record, `None` is returned. + pub async fn producer_endpoint_delete( + &self, + id: &Uuid, + ) -> Result, Error> { + use db::schema::metric_producer::dsl; + diesel::delete(dsl::metric_producer.find(*id)) + .returning(dsl::oximeter_id) + .get_result_async::( + &*self.pool_connection_unauthorized().await?, + ) + .await + .optional() + .map_err(|e| public_error_from_diesel(e, ErrorHandler::Server)) + } + + /// List the producer endpoint records by the oximeter instance to which they're assigned. pub async fn producers_list_by_oximeter_id( &self, oximeter_id: Uuid, diff --git a/nexus/src/app/instance.rs b/nexus/src/app/instance.rs index 17d033c5a0..923bb1777e 100644 --- a/nexus/src/app/instance.rs +++ b/nexus/src/app/instance.rs @@ -1320,7 +1320,9 @@ impl super::Nexus { .await?; // If the supplied instance state indicates that the instance no longer - // has an active VMM, attempt to delete the virtual provisioning record + // has an active VMM, attempt to delete the virtual provisioning record, + // and the assignment of the Propolis metric producer to an oximeter + // collector. // // As with updating networking state, this must be done before // committing the new runtime state to the database: once the DB is @@ -1338,6 +1340,21 @@ impl super::Nexus { (&new_runtime_state.instance_state.gen).into(), ) .await?; + + // TODO-correctness: The `notify_instance_updated` method can run + // concurrently with itself in some situations, such as where a + // sled-agent attempts to update Nexus about a stopped instance; + // that times out; and it makes another request to a different + // Nexus. The call to `unassign_producer` is racy in those + // situations, and we may end with instances with no metrics. + // + // This unfortunate case should be handled as part of + // instance-lifecycle improvements, notably using a reliable + // persistent workflow to correctly update the oximete assignment as + // an instance's state changes. + // + // Tracked in https://github.com/oxidecomputer/omicron/issues/3742. + self.unassign_producer(instance_id).await?; } // Write the new instance and VMM states back to CRDB. This needs to be diff --git a/nexus/src/app/oximeter.rs b/nexus/src/app/oximeter.rs index bc947cf4bc..03f833b087 100644 --- a/nexus/src/app/oximeter.rs +++ b/nexus/src/app/oximeter.rs @@ -187,6 +187,58 @@ impl super::Nexus { Ok(()) } + /// Idempotently un-assign a producer from an oximeter collector. + pub(crate) async fn unassign_producer( + &self, + id: &Uuid, + ) -> Result<(), Error> { + if let Some(collector_id) = + self.db_datastore.producer_endpoint_delete(id).await? + { + debug!( + self.log, + "deleted metric producer assignment"; + "producer_id" => %id, + "collector_id" => %collector_id, + ); + let oximeter_info = + self.db_datastore.oximeter_lookup(&collector_id).await?; + let address = + SocketAddr::new(oximeter_info.ip.ip(), *oximeter_info.port); + let client = self.build_oximeter_client(&id, address); + if let Err(e) = client.producer_delete(&id).await { + error!( + self.log, + "failed to delete producer from collector"; + "producer_id" => %id, + "collector_id" => %collector_id, + "address" => %address, + "error" => ?e, + ); + return Err(Error::internal_error( + format!("failed to delete producer from collector: {e:?}") + .as_str(), + )); + } else { + debug!( + self.log, + "successfully deleted producer from collector"; + "producer_id" => %id, + "collector_id" => %collector_id, + "address" => %address, + ); + Ok(()) + } + } else { + trace!( + self.log, + "un-assigned non-existent metric producer"; + "producer_id" => %id, + ); + Ok(()) + } + } + /// Returns a results from the timeseries DB based on the provided query /// parameters. ///