diff --git a/Cargo.lock b/Cargo.lock index c256d90af5..d39b76d616 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3227,6 +3227,7 @@ dependencies = [ "cfg-if 1.0.0", "chrono", "clap 3.2.12", + "crucible", "crucible-agent-client", "ddm-admin-client", "dropshot", @@ -3245,6 +3246,8 @@ dependencies = [ "openapiv3", "opte", "opte-ioctl", + "oximeter 0.1.0", + "oximeter-producer 0.1.0", "p256", "percent-encoding", "progenitor", diff --git a/nexus/src/app/oximeter.rs b/nexus/src/app/oximeter.rs index 7f6fb9b6ff..050e228b71 100644 --- a/nexus/src/app/oximeter.rs +++ b/nexus/src/app/oximeter.rs @@ -8,7 +8,9 @@ use crate::authz; use crate::context::OpContext; use crate::db; use crate::db::identity::Asset; +use crate::external_api::params::ResourceMetrics; use crate::internal_api::params::OximeterInfo; +use dropshot::PaginationParams; use internal_dns_client::{ multiclient::{ResolveError, Resolver}, names::{ServiceName, SRV}, @@ -21,6 +23,8 @@ use omicron_common::api::external::PaginationOrder; use omicron_common::api::internal::nexus; use omicron_common::backoff; use oximeter_client::Client as OximeterClient; +use oximeter_db::query::Timestamp; +use oximeter_db::Measurement; use oximeter_db::TimeseriesSchema; use oximeter_db::TimeseriesSchemaPaginationParams; use oximeter_producer::register; @@ -212,14 +216,112 @@ impl super::Nexus { .map_err(|e| Error::internal_error(&e.to_string()))? .timeseries_schema_list(&pag_params.page, limit) .await - .map_err(|e| match e { - oximeter_db::Error::DatabaseUnavailable(_) => { - Error::ServiceUnavailable { - internal_message: e.to_string(), - } + .map_err(map_oximeter_err) + } + + /// Returns a results from the timeseries DB based on the provided query + /// parameters. + /// + /// * `timeseries_name`: The "target:metric" name identifying the metric to + /// be queried. + /// * `criteria`: Any additional parameters to help narrow down the query + /// selection further. These parameters are passed directly to + /// [oximeter::db::Client::select_timeseries_with]. + /// * `query_params`: Pagination parameter, identifying which page of + /// results to return. + /// * `limit`: The maximum number of results to return in a paginated + /// request. + pub async fn select_timeseries( + &self, + timeseries_name: &str, + criteria: &[&str], + query_params: PaginationParams, + limit: NonZeroU32, + ) -> Result, Error> { + #[inline] + fn no_results() -> dropshot::ResultsPage { + dropshot::ResultsPage { next_page: None, items: Vec::new() } + } + + let (start_time, end_time, query) = match query_params.page { + // Generally, we want the time bounds to be inclusive for the + // start time, and exclusive for the end time... + dropshot::WhichPage::First(query) => ( + Timestamp::Inclusive(query.start_time), + Timestamp::Exclusive(query.end_time), + query, + ), + // ... but for subsequent pages, we use the "last observed" + // timestamp as the start time. If we used an inclusive bound, + // we'd duplicate the returned measurement. To return each + // measurement exactly once, we make the start time "exclusive" + // on all "next" pages. + dropshot::WhichPage::Next(query) => ( + Timestamp::Exclusive(query.start_time), + Timestamp::Exclusive(query.end_time), + query, + ), + }; + if query.start_time >= query.end_time { + return Ok(no_results()); + } + + let timeseries_list = self + .timeseries_client + .get() + .await + .map_err(|e| { + Error::internal_error(&format!( + "Cannot access timeseries DB: {}", + e + )) + })? + .select_timeseries_with( + timeseries_name, + criteria, + Some(start_time), + Some(end_time), + Some(limit), + ) + .await + .or_else(|err| { + // If the timeseries name exists in the API, but not in Clickhouse, + // it might just not have been populated yet. + match err { + oximeter_db::Error::TimeseriesNotFound(_) => Ok(vec![]), + _ => Err(err), } - _ => Error::InternalError { internal_message: e.to_string() }, }) + .map_err(map_oximeter_err)?; + + if timeseries_list.len() > 1 { + return Err(Error::internal_error(&format!( + "expected 1 timeseries but got {} ({:?} {:?})", + timeseries_list.len(), + timeseries_name, + criteria + ))); + } + + // If we received no data, exit early. + let timeseries = + if let Some(timeseries) = timeseries_list.into_iter().next() { + timeseries + } else { + return Ok(no_results()); + }; + + Ok(dropshot::ResultsPage::new( + timeseries.measurements, + &query, + |last_measurement: &Measurement, query: &ResourceMetrics| { + ResourceMetrics { + start_time: last_measurement.timestamp(), + end_time: query.end_time, + } + }, + ) + .unwrap()) } // Internal helper to build an Oximeter client from its ID and address (common data between @@ -259,3 +361,12 @@ impl super::Nexus { Ok((self.build_oximeter_client(&id, address), id)) } } + +fn map_oximeter_err(error: oximeter_db::Error) -> Error { + match error { + oximeter_db::Error::DatabaseUnavailable(_) => { + Error::ServiceUnavailable { internal_message: error.to_string() } + } + _ => Error::InternalError { internal_message: error.to_string() }, + } +} diff --git a/nexus/src/external_api/http_entrypoints.rs b/nexus/src/external_api/http_entrypoints.rs index 4a94968b0f..7b6e4fdcfb 100644 --- a/nexus/src/external_api/http_entrypoints.rs +++ b/nexus/src/external_api/http_entrypoints.rs @@ -67,6 +67,7 @@ use omicron_common::api::external::Saga; use omicron_common::api::external::VpcFirewallRuleUpdateParams; use omicron_common::api::external::VpcFirewallRules; use omicron_common::bail_unless; +use parse_display::Display; use ref_cast::RefCast; use schemars::JsonSchema; use serde::Deserialize; @@ -115,6 +116,7 @@ pub fn external_api() -> NexusApiDescription { api.register(disk_view)?; api.register(disk_view_by_id)?; api.register(disk_delete)?; + api.register(disk_metrics_list)?; api.register(instance_list)?; api.register(instance_create)?; @@ -1515,6 +1517,65 @@ async fn disk_delete( apictx.external_latencies.instrument_dropshot_handler(&rqctx, handler).await } +#[derive(Display, Deserialize, JsonSchema)] +#[display(style = "snake_case")] +#[serde(rename_all = "snake_case")] +pub enum DiskMetricName { + Activated, + Flush, + Read, + ReadBytes, + Write, + WriteBytes, +} + +/// Fetch metrics for a disk. +#[endpoint { + method = GET, + path = "/organizations/{organization_name}/projects/{project_name}/disks/{disk_name}/metrics/{metric_name}", + tags = ["disks"], +}] +async fn disk_metrics_list( + rqctx: Arc>>, + path_params: Path>, + query_params: Query< + PaginationParams, + >, +) -> Result>, HttpError> { + let apictx = rqctx.context(); + let nexus = &apictx.nexus; + + let path = path_params.into_inner(); + let organization_name = &path.inner.organization_name; + let project_name = &path.inner.project_name; + let disk_name = &path.inner.disk_name; + let metric_name = path.metric_name; + + let query = query_params.into_inner(); + let limit = rqctx.page_limit(&query)?; + + let handler = async { + let opctx = OpContext::for_external_api(&rqctx).await?; + + // This ensures the user is authorized on Action::Read for this disk + let disk = nexus + .disk_fetch(&opctx, organization_name, project_name, disk_name) + .await?; + let upstairs_uuid = disk.id(); + let result = nexus + .select_timeseries( + &format!("crucible_upstairs:{}", metric_name), + &[&format!("upstairs_uuid=={}", upstairs_uuid)], + query, + limit, + ) + .await?; + + Ok(HttpResponseOk(result)) + }; + apictx.external_latencies.instrument_dropshot_handler(&rqctx, handler).await +} + // Instances /// List instances in a project. @@ -4093,6 +4154,15 @@ async fn session_sshkey_delete( apictx.external_latencies.instrument_dropshot_handler(&rqctx, handler).await } +/// Path parameters for metrics requests where `/metrics/{metric_name}` is +/// appended to an existing path parameter type +#[derive(Deserialize, JsonSchema)] +struct MetricsPathParam { + #[serde(flatten)] + inner: T, + metric_name: M, +} + #[cfg(test)] mod test { use super::external_api; diff --git a/nexus/test-utils/src/lib.rs b/nexus/test-utils/src/lib.rs index 211e3116a8..73279124b5 100644 --- a/nexus/test-utils/src/lib.rs +++ b/nexus/test-utils/src/lib.rs @@ -159,6 +159,7 @@ pub async fn test_setup_with_config( ) .await .unwrap(); + register_test_producer(&producer).unwrap(); ControlPlaneTestContext { server, @@ -253,6 +254,10 @@ impl oximeter::Producer for IntegrationProducer { } } +/// Creates and starts a producer server. +/// +/// Actual producers can be registered with the [`register_producer`] +/// helper function. pub async fn start_producer_server( nexus_address: SocketAddr, id: Uuid, @@ -281,9 +286,22 @@ pub async fn start_producer_server( }; let server = ProducerServer::start(&config).await.map_err(|e| e.to_string())?; + Ok(server) +} +/// Registers an arbitrary producer with the test server. +pub fn register_producer( + server: &ProducerServer, + producer: impl oximeter::Producer, +) -> Result<(), String> { + server.registry().register_producer(producer).map_err(|e| e.to_string())?; + Ok(()) +} + +/// Registers a sample-generating test-specific producer. +pub fn register_test_producer(server: &ProducerServer) -> Result<(), String> { // Create and register an actual metric producer. - let producer = IntegrationProducer { + let test_producer = IntegrationProducer { target: IntegrationTarget { name: "integration-test-target".to_string(), }, @@ -292,8 +310,7 @@ pub async fn start_producer_server( datum: 0, }, }; - server.registry().register_producer(producer).map_err(|e| e.to_string())?; - Ok(server) + register_producer(server, test_producer) } /// Returns whether the two identity metadata objects are identical. diff --git a/nexus/test-utils/src/resource_helpers.rs b/nexus/test-utils/src/resource_helpers.rs index c3028f19f8..504c9cdd5c 100644 --- a/nexus/test-utils/src/resource_helpers.rs +++ b/nexus/test-utils/src/resource_helpers.rs @@ -188,28 +188,35 @@ pub async fn create_disk( .await } +/// Creates an instance with a default NIC and no disks. +/// +/// Wrapper around [`create_instance_with`]. pub async fn create_instance( client: &ClientTestContext, organization_name: &str, project_name: &str, instance_name: &str, ) -> Instance { - create_instance_with_nics( + create_instance_with( client, organization_name, project_name, instance_name, ¶ms::InstanceNetworkInterfaceAttachment::Default, + // Disks= + vec![], ) .await } -pub async fn create_instance_with_nics( +/// Creates an instance with attached resou8rces. +pub async fn create_instance_with( client: &ClientTestContext, organization_name: &str, project_name: &str, instance_name: &str, nics: ¶ms::InstanceNetworkInterfaceAttachment, + disks: Vec, ) -> Instance { let url = format!( "/organizations/{}/projects/{}/instances", @@ -231,7 +238,7 @@ pub async fn create_instance_with_nics( .to_vec(), network_interfaces: nics.clone(), external_ips: vec![], - disks: vec![], + disks, }, ) .await diff --git a/nexus/tests/integration_tests/disks.rs b/nexus/tests/integration_tests/disks.rs index 5060fb0fc8..ed88978bd4 100644 --- a/nexus/tests/integration_tests/disks.rs +++ b/nexus/tests/integration_tests/disks.rs @@ -4,20 +4,25 @@ //! Tests basic disk support in the API +use chrono::Utc; use crucible_agent_client::types::State as RegionState; use dropshot::test_util::ClientTestContext; use dropshot::HttpErrorResponseBody; +use dropshot::ResultsPage; use http::method::Method; use http::StatusCode; use nexus_test_utils::http_testing::AuthnMode; +use nexus_test_utils::http_testing::Collection; use nexus_test_utils::http_testing::NexusRequest; use nexus_test_utils::http_testing::RequestBuilder; use nexus_test_utils::identity_eq; use nexus_test_utils::resource_helpers::create_disk; use nexus_test_utils::resource_helpers::create_instance; +use nexus_test_utils::resource_helpers::create_instance_with; use nexus_test_utils::resource_helpers::create_ip_pool; use nexus_test_utils::resource_helpers::create_organization; use nexus_test_utils::resource_helpers::create_project; +use nexus_test_utils::resource_helpers::objects_list_page_authz; use nexus_test_utils::resource_helpers::DiskTest; use nexus_test_utils::ControlPlaneTestContext; use nexus_test_utils_macros::nexus_test; @@ -27,8 +32,11 @@ use omicron_common::api::external::DiskState; use omicron_common::api::external::IdentityMetadataCreateParams; use omicron_common::api::external::Instance; use omicron_common::api::external::Name; +use omicron_common::backoff; use omicron_nexus::TestInterfaces as _; use omicron_nexus::{external_api::params, Nexus}; +use oximeter::types::Datum; +use oximeter::types::Measurement; use sled_agent_client::TestInterfaces as _; use std::sync::Arc; use uuid::Uuid; @@ -822,6 +830,146 @@ async fn test_disk_reject_total_size_not_divisible_by_min_disk_size( ); } +async fn create_instance_with_disk(client: &ClientTestContext) { + create_instance_with( + &client, + ORG_NAME, + PROJECT_NAME, + INSTANCE_NAME, + ¶ms::InstanceNetworkInterfaceAttachment::Default, + vec![params::InstanceDiskAttachment::Attach( + params::InstanceDiskAttach { name: DISK_NAME.parse().unwrap() }, + )], + ) + .await; +} + +const ALL_METRICS: [&'static str; 6] = + ["activated", "read", "write", "read_bytes", "write_bytes", "flush"]; + +async fn query_for_metrics_until_they_exist( + client: &ClientTestContext, + path: &str, +) -> ResultsPage { + backoff::retry_notify( + backoff::internal_service_policy(), + || async { + let measurements: ResultsPage = + objects_list_page_authz(client, path).await; + + if measurements.items.is_empty() { + return Err(backoff::BackoffError::transient("No metrics yet")); + } + Ok(measurements) + }, + |_, _| {}, + ) + .await + .expect("Failed to query for measurements") +} + +#[nexus_test] +async fn test_disk_metrics(cptestctx: &ControlPlaneTestContext) { + let client = &cptestctx.external_client; + DiskTest::new(&cptestctx).await; + create_org_and_project(client).await; + create_disk(&client, ORG_NAME, PROJECT_NAME, DISK_NAME).await; + + // Whenever we grab this URL, get the surrounding few seconds of metrics. + let metric_url = |metric_type: &str| { + let disk_url = format!("{}/{}", get_disks_url(), DISK_NAME); + format!( + "{disk_url}/metrics/{metric_type}?start_time={:?}&end_time={:?}", + Utc::now() - chrono::Duration::seconds(2), + Utc::now() + chrono::Duration::seconds(2), + ) + }; + + // Try accessing metrics before we attach the disk to an instance. + // + // Observe that no metrics exist yet; no "upstairs" should have been + // instantiated on a sled. + let measurements: ResultsPage = + objects_list_page_authz(client, &metric_url("read")).await; + assert!(measurements.items.is_empty()); + + // Create an instance, attach the disk to it. + create_instance_with_disk(client).await; + + for metric in &ALL_METRICS { + let measurements = + query_for_metrics_until_they_exist(client, &metric_url(metric)) + .await; + + assert!(!measurements.items.is_empty()); + for item in &measurements.items { + let cumulative = match item.datum() { + Datum::CumulativeI64(c) => c, + _ => panic!("Unexpected datum type {:?}", item.datum()), + }; + assert!(cumulative.start_time() <= item.timestamp()); + } + } +} + +#[nexus_test] +async fn test_disk_metrics_paginated(cptestctx: &ControlPlaneTestContext) { + let client = &cptestctx.external_client; + DiskTest::new(&cptestctx).await; + create_org_and_project(client).await; + create_disk(&client, ORG_NAME, PROJECT_NAME, DISK_NAME).await; + create_instance_with_disk(client).await; + + for metric in &ALL_METRICS { + let collection_url = + format!("{}/{DISK_NAME}/metrics/{metric}", get_disks_url()); + let initial_params = format!( + "start_time={:?}&end_time={:?}", + Utc::now() - chrono::Duration::seconds(2), + Utc::now() + chrono::Duration::seconds(2), + ); + + query_for_metrics_until_they_exist( + client, + &format!("{collection_url}?{initial_params}"), + ) + .await; + + let measurements_paginated: Collection = + NexusRequest::iter_collection_authn( + client, + &collection_url, + &initial_params, + Some(10), + ) + .await + .expect("failed to iterate over metrics"); + assert!(!measurements_paginated.all_items.is_empty()); + + let mut last_timestamp = None; + let mut last_value = None; + for item in &measurements_paginated.all_items { + let cumulative = match item.datum() { + Datum::CumulativeI64(c) => c, + _ => panic!("Unexpected datum type {:?}", item.datum()), + }; + assert!(cumulative.start_time() <= item.timestamp()); + + // Validate that the timestamps are non-decreasing. + if let Some(last_ts) = last_timestamp { + assert!(last_ts <= item.timestamp()); + } + // Validate that the values increase. + if let Some(last_value) = last_value { + assert!(last_value < cumulative.value()); + } + + last_timestamp = Some(item.timestamp()); + last_value = Some(cumulative.value()); + } + } +} + async fn disk_get(client: &ClientTestContext, disk_url: &str) -> Disk { NexusRequest::object_get(client, disk_url) .authn_as(AuthnMode::PrivilegedUser) diff --git a/nexus/tests/integration_tests/endpoints.rs b/nexus/tests/integration_tests/endpoints.rs index aaecd3b960..2abbd6c0cf 100644 --- a/nexus/tests/integration_tests/endpoints.rs +++ b/nexus/tests/integration_tests/endpoints.rs @@ -8,6 +8,7 @@ //! THERE ARE NO TESTS IN THIS FILE. use crate::integration_tests::unauthorized::HTTP_SERVER; +use chrono::Utc; use http::method::Method; use lazy_static::lazy_static; use nexus_test_utils::RACK_UUID; @@ -177,6 +178,13 @@ lazy_static! { disk_source: params::DiskSource::Blank { block_size: params::BlockSize::try_from(4096).unwrap() }, size: ByteCount::from_gibibytes_u32(16), }; + pub static ref DEMO_DISK_METRICS_URL: String = + format!( + "{}/metrics/activated?start_time={:?}&end_time={:?}", + *DEMO_DISK_URL, + Utc::now(), + Utc::now(), + ); // Instance used for testing pub static ref DEMO_INSTANCE_NAME: Name = "demo-instance".parse().unwrap(); @@ -970,6 +978,15 @@ lazy_static! { ], }, + VerifyEndpoint { + url: &*DEMO_DISK_METRICS_URL, + visibility: Visibility::Protected, + unprivileged_access: UnprivilegedAccess::None, + allowed_methods: vec![ + AllowedMethod::Get, + ], + }, + VerifyEndpoint { url: &*DEMO_INSTANCE_DISKS_URL, visibility: Visibility::Protected, diff --git a/nexus/tests/integration_tests/oximeter.rs b/nexus/tests/integration_tests/oximeter.rs index 65046ac68d..8455277b83 100644 --- a/nexus/tests/integration_tests/oximeter.rs +++ b/nexus/tests/integration_tests/oximeter.rs @@ -111,7 +111,7 @@ async fn test_oximeter_reregistration() { let timeseries_name = "integration_target:integration_metric"; let retrieve_timeseries = || async { match client - .select_timeseries_with(timeseries_name, &[], None, None) + .select_timeseries_with(timeseries_name, &[], None, None, None) .await { Ok(maybe_series) => { @@ -121,7 +121,7 @@ async fn test_oximeter_reregistration() { Ok(maybe_series) } } - Err(oximeter_db::Error::QueryError(_)) => { + Err(oximeter_db::Error::TimeseriesNotFound(_)) => { Err(CondCheckError::NotYet) } Err(e) => Err(CondCheckError::from(e)), @@ -204,6 +204,8 @@ async fn test_oximeter_reregistration() { ) .await .expect("Failed to restart metric producer server"); + nexus_test_utils::register_test_producer(&context.producer) + .expect("Failed to register producer"); // Run the verification in a loop using wait_for_condition, waiting until there is more data, // or failing the test otherwise. diff --git a/nexus/tests/integration_tests/subnet_allocation.rs b/nexus/tests/integration_tests/subnet_allocation.rs index 149d86903a..924a4b200c 100644 --- a/nexus/tests/integration_tests/subnet_allocation.rs +++ b/nexus/tests/integration_tests/subnet_allocation.rs @@ -14,7 +14,7 @@ use nexus_defaults::NUM_INITIAL_RESERVED_IP_ADDRESSES; use nexus_test_utils::http_testing::AuthnMode; use nexus_test_utils::http_testing::NexusRequest; use nexus_test_utils::http_testing::RequestBuilder; -use nexus_test_utils::resource_helpers::create_instance_with_nics; +use nexus_test_utils::resource_helpers::create_instance_with; use nexus_test_utils::resource_helpers::create_ip_pool; use nexus_test_utils::resource_helpers::objects_list_page_authz; use nexus_test_utils::resource_helpers::{create_organization, create_project}; @@ -138,12 +138,14 @@ async fn test_subnet_allocation(cptestctx: &ControlPlaneTestContext) { NUM_INITIAL_RESERVED_IP_ADDRESSES + n_final_reserved_addresses; let subnet_size = subnet.size() as usize - n_reserved_addresses; for i in 0..subnet_size { - create_instance_with_nics( + create_instance_with( client, organization_name, project_name, &format!("i{}", i), &nic, + // Disks= + vec![], ) .await; } diff --git a/nexus/tests/output/nexus_tags.txt b/nexus/tests/output/nexus_tags.txt index 87fa250073..17002ee17d 100644 --- a/nexus/tests/output/nexus_tags.txt +++ b/nexus/tests/output/nexus_tags.txt @@ -3,6 +3,7 @@ OPERATION ID URL PATH disk_create /organizations/{organization_name}/projects/{project_name}/disks disk_delete /organizations/{organization_name}/projects/{project_name}/disks/{disk_name} disk_list /organizations/{organization_name}/projects/{project_name}/disks +disk_metrics_list /organizations/{organization_name}/projects/{project_name}/disks/{disk_name}/metrics/{metric_name} disk_view /organizations/{organization_name}/projects/{project_name}/disks/{disk_name} disk_view_by_id /by-id/disks/{id} diff --git a/nexus/types/src/external_api/params.rs b/nexus/types/src/external_api/params.rs index 8b42b0b34a..0f59257a61 100644 --- a/nexus/types/src/external_api/params.rs +++ b/nexus/types/src/external_api/params.rs @@ -5,6 +5,7 @@ //! Params define the request bodies of API endpoints for creating or updating resources. use crate::external_api::shared; +use chrono::{DateTime, Utc}; use omicron_common::api::external::{ ByteCount, IdentityMetadataCreateParams, IdentityMetadataUpdateParams, InstanceCpuCount, Ipv4Net, Ipv6Net, Name, @@ -835,6 +836,17 @@ pub struct SshKeyCreate { pub public_key: String, } +// METRICS + +/// Query parameters common to resource metrics endpoints. +#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)] +pub struct ResourceMetrics { + /// An inclusive start time of metrics. + pub start_time: DateTime, + /// An exclusive end time of metrics. + pub end_time: DateTime, +} + #[cfg(test)] mod test { use super::*; diff --git a/openapi/nexus.json b/openapi/nexus.json index 1aa3679fbe..2770ecd639 100644 --- a/openapi/nexus.json +++ b/openapi/nexus.json @@ -2229,6 +2229,114 @@ } } }, + "/organizations/{organization_name}/projects/{project_name}/disks/{disk_name}/metrics/{metric_name}": { + "get": { + "tags": [ + "disks" + ], + "summary": "Fetch metrics for a disk.", + "operationId": "disk_metrics_list", + "parameters": [ + { + "in": "path", + "name": "disk_name", + "required": true, + "schema": { + "$ref": "#/components/schemas/Name" + }, + "style": "simple" + }, + { + "in": "path", + "name": "metric_name", + "required": true, + "schema": { + "$ref": "#/components/schemas/DiskMetricName" + }, + "style": "simple" + }, + { + "in": "path", + "name": "organization_name", + "required": true, + "schema": { + "$ref": "#/components/schemas/Name" + }, + "style": "simple" + }, + { + "in": "path", + "name": "project_name", + "required": true, + "schema": { + "$ref": "#/components/schemas/Name" + }, + "style": "simple" + }, + { + "in": "query", + "name": "end_time", + "description": "An exclusive end time of metrics.", + "schema": { + "type": "string", + "format": "date-time" + }, + "style": "form" + }, + { + "in": "query", + "name": "limit", + "description": "Maximum number of items returned by a single call", + "schema": { + "nullable": true, + "type": "integer", + "format": "uint32", + "minimum": 1 + }, + "style": "form" + }, + { + "in": "query", + "name": "page_token", + "description": "Token returned by previous call to retrieve the subsequent page", + "schema": { + "nullable": true, + "type": "string" + }, + "style": "form" + }, + { + "in": "query", + "name": "start_time", + "description": "An inclusive start time of metrics.", + "schema": { + "type": "string", + "format": "date-time" + }, + "style": "form" + } + ], + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/MeasurementResultsPage" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + }, + "x-dropshot-pagination": true + } + }, "/organizations/{organization_name}/projects/{project_name}/images": { "get": { "tags": [ @@ -6708,6 +6816,194 @@ } }, "schemas": { + "BinRangedouble": { + "description": "A type storing a range over `T`.\n\nThis type supports ranges similar to the `RangeTo`, `Range` and `RangeFrom` types in the standard library. Those cover `(..end)`, `(start..end)`, and `(start..)` respectively.", + "oneOf": [ + { + "description": "A range unbounded below and exclusively above, `..end`.", + "type": "object", + "properties": { + "end": { + "type": "number", + "format": "double" + }, + "type": { + "type": "string", + "enum": [ + "range_to" + ] + } + }, + "required": [ + "end", + "type" + ] + }, + { + "description": "A range bounded inclusively below and exclusively above, `start..end`.", + "type": "object", + "properties": { + "end": { + "type": "number", + "format": "double" + }, + "start": { + "type": "number", + "format": "double" + }, + "type": { + "type": "string", + "enum": [ + "range" + ] + } + }, + "required": [ + "end", + "start", + "type" + ] + }, + { + "description": "A range bounded inclusively below and unbounded above, `start..`.", + "type": "object", + "properties": { + "start": { + "type": "number", + "format": "double" + }, + "type": { + "type": "string", + "enum": [ + "range_from" + ] + } + }, + "required": [ + "start", + "type" + ] + } + ] + }, + "BinRangeint64": { + "description": "A type storing a range over `T`.\n\nThis type supports ranges similar to the `RangeTo`, `Range` and `RangeFrom` types in the standard library. Those cover `(..end)`, `(start..end)`, and `(start..)` respectively.", + "oneOf": [ + { + "description": "A range unbounded below and exclusively above, `..end`.", + "type": "object", + "properties": { + "end": { + "type": "integer", + "format": "int64" + }, + "type": { + "type": "string", + "enum": [ + "range_to" + ] + } + }, + "required": [ + "end", + "type" + ] + }, + { + "description": "A range bounded inclusively below and exclusively above, `start..end`.", + "type": "object", + "properties": { + "end": { + "type": "integer", + "format": "int64" + }, + "start": { + "type": "integer", + "format": "int64" + }, + "type": { + "type": "string", + "enum": [ + "range" + ] + } + }, + "required": [ + "end", + "start", + "type" + ] + }, + { + "description": "A range bounded inclusively below and unbounded above, `start..`.", + "type": "object", + "properties": { + "start": { + "type": "integer", + "format": "int64" + }, + "type": { + "type": "string", + "enum": [ + "range_from" + ] + } + }, + "required": [ + "start", + "type" + ] + } + ] + }, + "Bindouble": { + "description": "Type storing bin edges and a count of samples within it.", + "type": "object", + "properties": { + "count": { + "description": "The total count of samples in this bin.", + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "range": { + "description": "The range of the support covered by this bin.", + "allOf": [ + { + "$ref": "#/components/schemas/BinRangedouble" + } + ] + } + }, + "required": [ + "count", + "range" + ] + }, + "Binint64": { + "description": "Type storing bin edges and a count of samples within it.", + "type": "object", + "properties": { + "count": { + "description": "The total count of samples in this bin.", + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "range": { + "description": "The range of the support covered by this bin.", + "allOf": [ + { + "$ref": "#/components/schemas/BinRangeint64" + } + ] + } + }, + "required": [ + "count", + "range" + ] + }, "BlockSize": { "title": "disk block size in bytes", "type": "integer", @@ -6723,6 +7019,216 @@ "format": "uint64", "minimum": 0 }, + "Cumulativedouble": { + "description": "A cumulative or counter data type.", + "type": "object", + "properties": { + "start_time": { + "type": "string", + "format": "date-time" + }, + "value": { + "type": "number", + "format": "double" + } + }, + "required": [ + "start_time", + "value" + ] + }, + "Cumulativeint64": { + "description": "A cumulative or counter data type.", + "type": "object", + "properties": { + "start_time": { + "type": "string", + "format": "date-time" + }, + "value": { + "type": "integer", + "format": "int64" + } + }, + "required": [ + "start_time", + "value" + ] + }, + "Datum": { + "description": "A `Datum` is a single sampled data point from a metric.", + "oneOf": [ + { + "type": "object", + "properties": { + "datum": { + "type": "boolean" + }, + "type": { + "type": "string", + "enum": [ + "bool" + ] + } + }, + "required": [ + "datum", + "type" + ] + }, + { + "type": "object", + "properties": { + "datum": { + "type": "integer", + "format": "int64" + }, + "type": { + "type": "string", + "enum": [ + "i64" + ] + } + }, + "required": [ + "datum", + "type" + ] + }, + { + "type": "object", + "properties": { + "datum": { + "type": "number", + "format": "double" + }, + "type": { + "type": "string", + "enum": [ + "f64" + ] + } + }, + "required": [ + "datum", + "type" + ] + }, + { + "type": "object", + "properties": { + "datum": { + "type": "string" + }, + "type": { + "type": "string", + "enum": [ + "string" + ] + } + }, + "required": [ + "datum", + "type" + ] + }, + { + "type": "object", + "properties": { + "datum": { + "type": "array", + "items": { + "type": "integer", + "format": "uint8", + "minimum": 0 + } + }, + "type": { + "type": "string", + "enum": [ + "bytes" + ] + } + }, + "required": [ + "datum", + "type" + ] + }, + { + "type": "object", + "properties": { + "datum": { + "$ref": "#/components/schemas/Cumulativeint64" + }, + "type": { + "type": "string", + "enum": [ + "cumulative_i64" + ] + } + }, + "required": [ + "datum", + "type" + ] + }, + { + "type": "object", + "properties": { + "datum": { + "$ref": "#/components/schemas/Cumulativedouble" + }, + "type": { + "type": "string", + "enum": [ + "cumulative_f64" + ] + } + }, + "required": [ + "datum", + "type" + ] + }, + { + "type": "object", + "properties": { + "datum": { + "$ref": "#/components/schemas/Histogramint64" + }, + "type": { + "type": "string", + "enum": [ + "histogram_i64" + ] + } + }, + "required": [ + "datum", + "type" + ] + }, + { + "type": "object", + "properties": { + "datum": { + "$ref": "#/components/schemas/Histogramdouble" + }, + "type": { + "type": "string", + "enum": [ + "histogram_f64" + ] + } + }, + "required": [ + "datum", + "type" + ] + } + ] + }, "DatumType": { "description": "The type of an individual datum of a metric.", "type": "string", @@ -7508,6 +8014,58 @@ "items" ] }, + "Histogramdouble": { + "description": "A simple type for managing a histogram metric.\n\nA histogram maintains the count of any number of samples, over a set of bins. Bins are specified on construction via their _left_ edges, inclusive. There can't be any \"gaps\" in the bins, and an additional bin may be added to the left, right, or both so that the bins extend to the entire range of the support.\n\nNote that any gaps, unsorted bins, or non-finite values will result in an error.\n\nExample ------- ```rust use oximeter::histogram::{BinRange, Histogram};\n\nlet edges = [0i64, 10, 20]; let mut hist = Histogram::new(&edges).unwrap(); assert_eq!(hist.n_bins(), 4); // One additional bin for the range (20..) assert_eq!(hist.n_samples(), 0); hist.sample(4); hist.sample(100); assert_eq!(hist.n_samples(), 2);\n\nlet data = hist.iter().collect::>(); assert_eq!(data[0].range, BinRange::range(i64::MIN, 0)); // An additional bin for `..0` assert_eq!(data[0].count, 0); // Nothing is in this bin\n\nassert_eq!(data[1].range, BinRange::range(0, 10)); // The range `0..10` assert_eq!(data[1].count, 1); // 4 is sampled into this bin ```\n\nNotes -----\n\nHistograms may be constructed either from their left bin edges, or from a sequence of ranges. In either case, the left-most bin may be converted upon construction. In particular, if the left-most value is not equal to the minimum of the support, a new bin will be added from the minimum to that provided value. If the left-most value _is_ the support's minimum, because the provided bin was unbounded below, such as `(..0)`, then that bin will be converted into one bounded below, `(MIN..0)` in this case.\n\nThe short of this is that, most of the time, it shouldn't matter. If one specifies the extremes of the support as their bins, be aware that the left-most may be converted from a `BinRange::RangeTo` into a `BinRange::Range`. In other words, the first bin of a histogram is _always_ a `Bin::Range` or a `Bin::RangeFrom` after construction. In fact, every bin is one of those variants, the `BinRange::RangeTo` is only provided as a convenience during construction.", + "type": "object", + "properties": { + "bins": { + "type": "array", + "items": { + "$ref": "#/components/schemas/Bindouble" + } + }, + "n_samples": { + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "start_time": { + "type": "string", + "format": "date-time" + } + }, + "required": [ + "bins", + "n_samples", + "start_time" + ] + }, + "Histogramint64": { + "description": "A simple type for managing a histogram metric.\n\nA histogram maintains the count of any number of samples, over a set of bins. Bins are specified on construction via their _left_ edges, inclusive. There can't be any \"gaps\" in the bins, and an additional bin may be added to the left, right, or both so that the bins extend to the entire range of the support.\n\nNote that any gaps, unsorted bins, or non-finite values will result in an error.\n\nExample ------- ```rust use oximeter::histogram::{BinRange, Histogram};\n\nlet edges = [0i64, 10, 20]; let mut hist = Histogram::new(&edges).unwrap(); assert_eq!(hist.n_bins(), 4); // One additional bin for the range (20..) assert_eq!(hist.n_samples(), 0); hist.sample(4); hist.sample(100); assert_eq!(hist.n_samples(), 2);\n\nlet data = hist.iter().collect::>(); assert_eq!(data[0].range, BinRange::range(i64::MIN, 0)); // An additional bin for `..0` assert_eq!(data[0].count, 0); // Nothing is in this bin\n\nassert_eq!(data[1].range, BinRange::range(0, 10)); // The range `0..10` assert_eq!(data[1].count, 1); // 4 is sampled into this bin ```\n\nNotes -----\n\nHistograms may be constructed either from their left bin edges, or from a sequence of ranges. In either case, the left-most bin may be converted upon construction. In particular, if the left-most value is not equal to the minimum of the support, a new bin will be added from the minimum to that provided value. If the left-most value _is_ the support's minimum, because the provided bin was unbounded below, such as `(..0)`, then that bin will be converted into one bounded below, `(MIN..0)` in this case.\n\nThe short of this is that, most of the time, it shouldn't matter. If one specifies the extremes of the support as their bins, be aware that the left-most may be converted from a `BinRange::RangeTo` into a `BinRange::Range`. In other words, the first bin of a histogram is _always_ a `Bin::Range` or a `Bin::RangeFrom` after construction. In fact, every bin is one of those variants, the `BinRange::RangeTo` is only provided as a convenience during construction.", + "type": "object", + "properties": { + "bins": { + "type": "array", + "items": { + "$ref": "#/components/schemas/Binint64" + } + }, + "n_samples": { + "type": "integer", + "format": "uint64", + "minimum": 0 + }, + "start_time": { + "type": "string", + "format": "date-time" + } + }, + "required": [ + "bins", + "n_samples", + "start_time" + ] + }, "IdentityProvider": { "description": "Client view of an [`IdentityProvider`]", "type": "object", @@ -8437,6 +8995,44 @@ "minLength": 17, "maxLength": 17 }, + "Measurement": { + "description": "A `Measurement` is a timestamped datum from a single metric", + "type": "object", + "properties": { + "datum": { + "$ref": "#/components/schemas/Datum" + }, + "timestamp": { + "type": "string", + "format": "date-time" + } + }, + "required": [ + "datum", + "timestamp" + ] + }, + "MeasurementResultsPage": { + "description": "A single page of results", + "type": "object", + "properties": { + "items": { + "description": "list of items on this page of results", + "type": "array", + "items": { + "$ref": "#/components/schemas/Measurement" + } + }, + "next_page": { + "nullable": true, + "description": "token used to fetch the next page of results (if any)", + "type": "string" + } + }, + "required": [ + "items" + ] + }, "Name": { "title": "A name unique within the parent collection", "description": "Names must begin with a lower case ASCII letter, be composed exclusively of lowercase ASCII, uppercase ASCII, numbers, and '-', and may not end with a '-'. Names cannot be a UUID though they may contain a UUID.", @@ -11060,6 +11656,17 @@ "name_descending", "id_ascending" ] + }, + "DiskMetricName": { + "type": "string", + "enum": [ + "activated", + "flush", + "read", + "read_bytes", + "write", + "write_bytes" + ] } } }, diff --git a/oximeter/db/src/bin/oxdb.rs b/oximeter/db/src/bin/oxdb.rs index 5410bc5a74..478aa79615 100644 --- a/oximeter/db/src/bin/oxdb.rs +++ b/oximeter/db/src/bin/oxdb.rs @@ -287,6 +287,7 @@ async fn query( filters.as_slice(), start, end, + None, ) .await?; println!("{}", serde_json::to_string(×eries).unwrap()); diff --git a/oximeter/db/src/client.rs b/oximeter/db/src/client.rs index 60e04958eb..d476b1189c 100644 --- a/oximeter/db/src/client.rs +++ b/oximeter/db/src/client.rs @@ -66,6 +66,7 @@ impl Client { criteria: &[&str], start_time: Option, end_time: Option, + limit: Option, ) -> Result, Error> { // Querying uses up to three queries to the database: // 1. Retrieve the schema @@ -79,18 +80,20 @@ impl Client { // to/from the database, as well as the cost of parsing them for each measurement, only to // promptly throw away almost all of them (except for the first). let timeseries_name = TimeseriesName::try_from(timeseries_name)?; - let schema = self - .schema_for_timeseries(×eries_name) - .await? - .ok_or_else(|| { - Error::QueryError(format!( - "No such timeseries: '{}'", - timeseries_name - )) - })?; - let mut query_builder = query::SelectQueryBuilder::new(&schema) + let schema = + self.schema_for_timeseries(×eries_name).await?.ok_or_else( + || Error::TimeseriesNotFound(format!("{timeseries_name}")), + )?; + let query_builder = query::SelectQueryBuilder::new(&schema) .start_time(start_time) .end_time(end_time); + + let mut query_builder = if let Some(limit) = limit { + query_builder.limit(limit) + } else { + query_builder + }; + for criterion in criteria.iter() { query_builder = query_builder.filter_raw(criterion)?; } @@ -123,10 +126,7 @@ impl Client { .schema_for_timeseries(¶ms.timeseries_name) .await? .ok_or_else(|| { - Error::QueryError(format!( - "No such timeseries: '{}'", - params.timeseries_name - )) + Error::TimeseriesNotFound(format!("{}", params.timeseries_name)) })?; // TODO: Handle inclusive/exclusive timestamps in general. // @@ -838,6 +838,7 @@ mod tests { &criteria.iter().map(|x| x.as_str()).collect::>(), None, None, + None, ) .await .unwrap(); @@ -1002,6 +1003,7 @@ mod tests { &["id==0"], None, None, + None, ) .await .expect("Failed to select test samples"); @@ -1090,6 +1092,7 @@ mod tests { criteria, start_time, end_time, + None, ) .await .expect("Failed to select timeseries"); @@ -1346,6 +1349,7 @@ mod tests { &[], Some(query::Timestamp::Exclusive(start_time)), None, + None, ) .await .expect("Failed to select timeseries"); @@ -1363,6 +1367,82 @@ mod tests { db.cleanup().await.expect("Failed to cleanup database"); } + #[tokio::test] + async fn test_select_timeseries_with_limit() { + let (_, _, samples) = setup_select_test(); + let mut db = ClickHouseInstance::new(0) + .await + .expect("Failed to start ClickHouse"); + let address = SocketAddr::new("::1".parse().unwrap(), db.port()); + let log = Logger::root(slog::Discard, o!()); + let client = Client::new(address, &log); + client + .init_db() + .await + .expect("Failed to initialize timeseries database"); + client + .insert_samples(&samples) + .await + .expect("Failed to insert samples"); + let timeseries_name = "service:request_latency"; + + // First, query without a limit. We should see all the results. + let all_measurements = &client + .select_timeseries_with(timeseries_name, &[], None, None, None) + .await + .expect("Failed to select timeseries")[0] + .measurements; + + // Check some constraints on the number of measurements - we + // can change these, but these assumptions make the test simpler. + // + // For now, assume we can cleanly cut the number of measurements in + // half. + assert!(all_measurements.len() >= 2); + assert!(all_measurements.len() % 2 == 0); + + // Next, let's set a limit to half the results and query again. + let limit = + NonZeroU32::new(u32::try_from(all_measurements.len() / 2).unwrap()) + .unwrap(); + let timeseries = &client + .select_timeseries_with( + timeseries_name, + &[], + None, + None, + Some(limit), + ) + .await + .expect("Failed to select timeseries")[0]; + assert_eq!(timeseries.measurements.len() as u32, limit.get()); + assert_eq!( + all_measurements[..all_measurements.len() / 2], + timeseries.measurements + ); + + // Get the other half of the results. + let timeseries = &client + .select_timeseries_with( + timeseries_name, + &[], + Some(query::Timestamp::Exclusive( + timeseries.measurements.last().unwrap().timestamp(), + )), + None, + Some(limit), + ) + .await + .expect("Failed to select timeseries")[0]; + assert_eq!(timeseries.measurements.len() as u32, limit.get()); + assert_eq!( + all_measurements[all_measurements.len() / 2..], + timeseries.measurements + ); + + db.cleanup().await.expect("Failed to cleanup database"); + } + #[tokio::test] async fn test_get_schema_no_new_values() { let (mut db, client, _) = setup_filter_testcase().await; diff --git a/oximeter/db/src/lib.rs b/oximeter/db/src/lib.rs index 27b50d6572..10890de04f 100644 --- a/oximeter/db/src/lib.rs +++ b/oximeter/db/src/lib.rs @@ -43,9 +43,8 @@ pub enum Error { actual: BTreeMap, }, - /// An error querying or filtering data - #[error("Invalid query or data filter: {0}")] - QueryError(String), + #[error("Timeseries not found for: {0}")] + TimeseriesNotFound(String), #[error("The field comparison operation '{op}' is not valid for field '{field_name}' with type {field_type}")] InvalidSelectionOp { op: String, field_name: String, field_type: FieldType }, diff --git a/sled-agent/Cargo.toml b/sled-agent/Cargo.toml index 550abdd71e..15ac146f83 100644 --- a/sled-agent/Cargo.toml +++ b/sled-agent/Cargo.toml @@ -14,6 +14,11 @@ cfg-if = "1.0" chrono = { version = "0.4", features = [ "serde" ] } clap = { version = "3.2", features = ["derive"] } # Only used by the simulated sled agent. +# TODO: This is probably overkill. We'd like to only depend on +# the "crucible-agent-client", but the VolumeConstructionRequest object +# does not exist there. +crucible = { git = "https://github.com/oxidecomputer/crucible", rev = "2add0de8489f1d4de901bfe98fc28b0a6efcc3ea" } +# Only used by the simulated sled agent. crucible-agent-client = { git = "https://github.com/oxidecomputer/crucible", rev = "2add0de8489f1d4de901bfe98fc28b0a6efcc3ea" } ddm-admin-client = { path = "../ddm-admin-client" } dropshot = { git = "https://github.com/oxidecomputer/dropshot", branch = "main", features = [ "usdt-probes" ] } @@ -24,6 +29,8 @@ libc = "0.2.126" macaddr = { version = "1.0.1", features = [ "serde_std" ] } nexus-client = { path = "../nexus-client" } omicron-common = { path = "../common" } +oximeter = { version = "0.1.0", path = "../oximeter/oximeter" } +oximeter-producer = { version = "0.1.0", path = "../oximeter/producer" } p256 = "0.9.0" percent-encoding = "2.1.0" progenitor = { git = "https://github.com/oxidecomputer/progenitor" } diff --git a/sled-agent/src/sim/collection.rs b/sled-agent/src/sim/collection.rs index 6884131107..225bf6cd0f 100644 --- a/sled-agent/src/sim/collection.rs +++ b/sled-agent/src/sim/collection.rs @@ -250,6 +250,22 @@ impl SimCollection { } } + pub async fn sim_ensure_producer( + self: &Arc, + id: &Uuid, + args: S::ProducerArgs, + ) -> Result<(), Error> { + self.objects + .lock() + .await + .get_mut(id) + .expect("Setting producer on object that does not exist") + .object + .set_producer(args) + .await?; + Ok(()) + } + /// Move the object identified by `id` from its current state to the /// requested state `target`. The object does not need to exist already; if /// not, it will be created from `current`. (This is the only case where diff --git a/sled-agent/src/sim/disk.rs b/sled-agent/src/sim/disk.rs index 5d67b8decb..4c8fc55ac3 100644 --- a/sled-agent/src/sim/disk.rs +++ b/sled-agent/src/sim/disk.rs @@ -8,32 +8,211 @@ use crate::nexus::NexusClient; use crate::params::DiskStateRequested; use crate::sim::simulatable::Simulatable; use async_trait::async_trait; +use dropshot::ConfigDropshot; +use dropshot::ConfigLogging; +use dropshot::ConfigLoggingLevel; use omicron_common::api::external::DiskState; use omicron_common::api::external::Error; use omicron_common::api::external::Generation; use omicron_common::api::internal::nexus::DiskRuntimeState; +use omicron_common::api::internal::nexus::ProducerEndpoint; +use oximeter_producer::Server as ProducerServer; use propolis_client::api::DiskAttachmentState as PropolisDiskState; +use std::net::{Ipv6Addr, SocketAddr}; use std::sync::Arc; +use std::time::Duration; use uuid::Uuid; use crate::common::disk::{Action as DiskAction, DiskStates}; +// Oximeter timeseries names are derived based on the precise names of structs, +// so we shove this in a module to more liberally use arbitrary names (like +// "Read"). +mod producers { + use super::*; + use oximeter::{ + types::{Cumulative, Sample}, + Metric, Target, + }; + + #[derive(Debug, Clone, Target)] + pub struct CrucibleUpstairs { + pub upstairs_uuid: Uuid, + } + + // TODO: It would be a lot nicer if we could just depend on the Crucible + // types here directly, rather than recreate them. However, in doing so, + // we bump into issues with the "Metric" trait - the implementation of + // oximeter::Producer claims that "Metric" is not implemented for the + // Crucible-defined structure, even though it is derived. + // I suspect this is due to version skew between Crucible vs Omicron's copy + // of Oximeter. + + #[derive(Debug, Default, Copy, Clone, Metric)] + struct Activated { + /// Count of times this upstairs has activated. + #[datum] + count: Cumulative, + } + #[derive(Debug, Default, Copy, Clone, Metric)] + struct Write { + /// Count of region writes this upstairs has completed + #[datum] + count: Cumulative, + } + #[derive(Debug, Default, Copy, Clone, Metric)] + struct WriteBytes { + /// Count of bytes written + #[datum] + count: Cumulative, + } + #[derive(Debug, Default, Copy, Clone, Metric)] + struct Read { + /// Count of region reads this upstairs has completed + #[datum] + count: Cumulative, + } + #[derive(Debug, Default, Copy, Clone, Metric)] + struct ReadBytes { + /// Count of bytes read + #[datum] + count: Cumulative, + } + #[derive(Debug, Default, Copy, Clone, Metric)] + struct Flush { + /// Count of region flushes this upstairs has completed + #[datum] + count: Cumulative, + } + + #[derive(Debug, Clone)] + pub struct DiskProducer { + target: CrucibleUpstairs, + activated_count: Activated, + write_count: Write, + write_bytes: WriteBytes, + read_count: Read, + read_bytes: ReadBytes, + flush_count: Flush, + } + + impl DiskProducer { + pub fn new(id: Uuid) -> Self { + Self { + target: CrucibleUpstairs { upstairs_uuid: id }, + activated_count: Default::default(), + write_count: Default::default(), + write_bytes: Default::default(), + read_count: Default::default(), + read_bytes: Default::default(), + flush_count: Default::default(), + } + } + } + + impl oximeter::Producer for DiskProducer { + fn produce( + &mut self, + ) -> Result< + Box<(dyn Iterator + 'static)>, + oximeter::MetricsError, + > { + let samples = vec![ + Sample::new(&self.target, &self.activated_count), + Sample::new(&self.target, &self.write_count), + Sample::new(&self.target, &self.write_bytes), + Sample::new(&self.target, &self.read_count), + Sample::new(&self.target, &self.read_bytes), + Sample::new(&self.target, &self.flush_count), + ]; + + *self.activated_count.datum_mut() += 1; + *self.write_count.datum_mut() += 1; + *self.write_bytes.datum_mut() += 1; + *self.read_count.datum_mut() += 1; + *self.read_bytes.datum_mut() += 1; + *self.flush_count.datum_mut() += 1; + + Ok(Box::new(samples.into_iter())) + } + } +} + /// Simulated Disk (network block device), as created by the external Oxide API /// /// See `Simulatable` for how this works. -#[derive(Debug)] pub struct SimDisk { state: DiskStates, + producer: Option, +} + +// "producer" doesn't implement Debug, so we can't derive it on SimDisk. +impl std::fmt::Debug for SimDisk { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SimDisk").field("state", &self.state).finish() + } +} + +impl SimDisk { + pub async fn start_producer_server( + &mut self, + nexus_address: SocketAddr, + id: Uuid, + ) -> Result<(), String> { + // Set up a producer server. + // + // This listens on any available port, and the server internally updates this to the actual + // bound port of the Dropshot HTTP server. + let producer_address = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), 0); + let server_info = ProducerEndpoint { + id, + address: producer_address, + base_route: "/collect".to_string(), + interval: Duration::from_millis(200), + }; + let config = oximeter_producer::Config { + server_info, + registration_address: nexus_address, + dropshot_config: ConfigDropshot { + bind_address: producer_address, + ..Default::default() + }, + logging_config: ConfigLogging::StderrTerminal { + level: ConfigLoggingLevel::Error, + }, + }; + let server = + ProducerServer::start(&config).await.map_err(|e| e.to_string())?; + + let producer = producers::DiskProducer::new(id); + server + .registry() + .register_producer(producer) + .map_err(|e| e.to_string())?; + self.producer.replace(server); + Ok(()) + } } #[async_trait] impl Simulatable for SimDisk { type CurrentState = DiskRuntimeState; type RequestedState = DiskStateRequested; + type ProducerArgs = (std::net::SocketAddr, Uuid); type Action = DiskAction; fn new(current: DiskRuntimeState) -> Self { - SimDisk { state: DiskStates::new(current) } + SimDisk { state: DiskStates::new(current), producer: None } + } + + async fn set_producer( + &mut self, + args: Self::ProducerArgs, + ) -> Result<(), Error> { + self.start_producer_server(args.0, args.1).await.map_err(|e| { + Error::internal_error(&format!("Setting producer server: {e}")) + })?; + Ok(()) } fn request_transition( diff --git a/sled-agent/src/sim/instance.rs b/sled-agent/src/sim/instance.rs index 428138348e..39d9bc624d 100644 --- a/sled-agent/src/sim/instance.rs +++ b/sled-agent/src/sim/instance.rs @@ -30,12 +30,21 @@ pub struct SimInstance { impl Simulatable for SimInstance { type CurrentState = InstanceRuntimeState; type RequestedState = InstanceRuntimeStateRequested; + type ProducerArgs = (); type Action = InstanceAction; fn new(current: InstanceRuntimeState) -> Self { SimInstance { state: InstanceStates::new(current) } } + async fn set_producer( + &mut self, + _args: Self::ProducerArgs, + ) -> Result<(), Error> { + // NOTE: Not implemented, yet. + Ok(()) + } + fn request_transition( &mut self, target: &InstanceRuntimeStateRequested, diff --git a/sled-agent/src/sim/server.rs b/sled-agent/src/sim/server.rs index 555fa2f56f..6413fdf6ff 100644 --- a/sled-agent/src/sim/server.rs +++ b/sled-agent/src/sim/server.rs @@ -46,6 +46,7 @@ impl Server { let sled_agent = Arc::new(SledAgent::new_simulated_with_id( &config, sa_log, + config.nexus_address, Arc::clone(&nexus_client), )); diff --git a/sled-agent/src/sim/simulatable.rs b/sled-agent/src/sim/simulatable.rs index ae94409dde..4e8f88f711 100644 --- a/sled-agent/src/sim/simulatable.rs +++ b/sled-agent/src/sim/simulatable.rs @@ -61,6 +61,9 @@ pub trait Simulatable: fmt::Debug + Send + Sync { /// transitions to intermediate states. type RequestedState: Send + Clone + fmt::Debug; + /// Arguments to start a producer on the simulated object. + type ProducerArgs: Send + Clone + fmt::Debug; + /// Represents an action that should be taken by the Sled Agent. /// Generated in response to a state change, either requested or observed. type Action: Send + Clone + fmt::Debug; @@ -68,6 +71,12 @@ pub trait Simulatable: fmt::Debug + Send + Sync { /// Creates a new Simulatable object. fn new(current: Self::CurrentState) -> Self; + /// Sets the producer based on the provided arguments. + async fn set_producer( + &mut self, + args: Self::ProducerArgs, + ) -> Result<(), Error>; + /// Requests that the simulated object transition to a new target. /// /// If successful, returns the action that must be taken by the Sled Agent diff --git a/sled-agent/src/sim/sled_agent.rs b/sled-agent/src/sim/sled_agent.rs index b11baf5665..5a7d3dd0c2 100644 --- a/sled-agent/src/sim/sled_agent.rs +++ b/sled-agent/src/sim/sled_agent.rs @@ -15,6 +15,7 @@ use omicron_common::api::external::Error; use omicron_common::api::internal::nexus::DiskRuntimeState; use omicron_common::api::internal::nexus::InstanceRuntimeState; use slog::Logger; +use std::net::SocketAddr; use std::sync::Arc; use uuid::Uuid; @@ -37,6 +38,7 @@ pub struct SledAgent { /// collection of simulated disks, indexed by disk uuid disks: Arc>, storage: Mutex, + nexus_address: SocketAddr, pub nexus_client: Arc, } @@ -47,6 +49,7 @@ impl SledAgent { pub fn new_simulated_with_id( config: &Config, log: Logger, + nexus_address: SocketAddr, nexus_client: Arc, ) -> SledAgent { let id = config.id; @@ -74,6 +77,7 @@ impl SledAgent { config.storage.ip, storage_log, )), + nexus_address, nexus_client, } } @@ -87,6 +91,25 @@ impl SledAgent { initial_hardware: InstanceHardware, target: InstanceRuntimeStateRequested, ) -> Result { + for disk in &initial_hardware.disks { + let initial_state = DiskRuntimeState { + disk_state: omicron_common::api::external::DiskState::Attached( + instance_id, + ), + gen: omicron_common::api::external::Generation::new(), + time_updated: chrono::Utc::now(), + }; + let target = DiskStateRequested::Attached(instance_id); + + let id = match disk.volume_construction_request { + crucible::VolumeConstructionRequest::Volume { id, .. } => id, + _ => panic!("Unexpected construction type"), + }; + self.disks.sim_ensure(&id, initial_state, target).await?; + self.disks + .sim_ensure_producer(&id, (self.nexus_address, id)) + .await?; + } self.instances .sim_ensure(&instance_id, initial_hardware.runtime, target) .await