diff --git a/Cargo.lock b/Cargo.lock index 8ff57cd451..55b9bafa82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5310,6 +5310,7 @@ dependencies = [ "omicron-common", "omicron-workspace-hack", "oximeter-macro-impl", + "regex", "rstest", "schemars", "serde", @@ -7528,9 +7529,9 @@ dependencies = [ [[package]] name = "similar" -version = "2.2.1" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "420acb44afdae038210c99e69aae24109f32f15500aa708e81d46c9f29d55fcf" +checksum = "2aeaf503862c419d66959f5d7ca015337d864e9c49485d771b732e2a20453597" dependencies = [ "bstr 0.2.17", "unicode-segmentation", diff --git a/openapi/nexus-internal.json b/openapi/nexus-internal.json index caf1414f53..f909710ab4 100644 --- a/openapi/nexus-internal.json +++ b/openapi/nexus-internal.json @@ -4231,6 +4231,20 @@ "content", "type" ] + }, + { + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "invalid_timeseries_name" + ] + } + }, + "required": [ + "type" + ] } ] }, diff --git a/openapi/sled-agent.json b/openapi/sled-agent.json index f809cfa57b..d71f8de644 100644 --- a/openapi/sled-agent.json +++ b/openapi/sled-agent.json @@ -5209,6 +5209,20 @@ "content", "type" ] + }, + { + "type": "object", + "properties": { + "type": { + "type": "string", + "enum": [ + "invalid_timeseries_name" + ] + } + }, + "required": [ + "type" + ] } ] }, diff --git a/oximeter/collector/src/self_stats.rs b/oximeter/collector/src/self_stats.rs index dd1701203e..8d39e6e282 100644 --- a/oximeter/collector/src/self_stats.rs +++ b/oximeter/collector/src/self_stats.rs @@ -154,8 +154,15 @@ impl CollectionTaskStats { #[cfg(test)] mod tests { + use super::Collections; + use super::Cumulative; + use super::FailedCollections; use super::FailureReason; + use super::OximeterCollector; use super::StatusCode; + use oximeter::schema::SchemaSet; + use std::net::IpAddr; + use std::net::Ipv6Addr; #[test] fn test_failure_reason_serialization() { @@ -168,4 +175,50 @@ mod tests { assert_eq!(variant.to_string(), *as_str); } } + + const fn collector() -> OximeterCollector { + OximeterCollector { + collector_id: uuid::uuid!("cfebaa5f-3ba9-4bb5-9145-648d287df78a"), + collector_ip: IpAddr::V6(Ipv6Addr::LOCALHOST), + collector_port: 12345, + } + } + + fn collections() -> Collections { + Collections { + producer_id: uuid::uuid!("718452ab-7cca-42f6-b8b1-1aaaa1b09104"), + producer_ip: IpAddr::V6(Ipv6Addr::LOCALHOST), + producer_port: 12345, + base_route: String::from("/"), + datum: Cumulative::new(0), + } + } + + fn failed_collections() -> FailedCollections { + FailedCollections { + producer_id: uuid::uuid!("718452ab-7cca-42f6-b8b1-1aaaa1b09104"), + producer_ip: IpAddr::V6(Ipv6Addr::LOCALHOST), + producer_port: 12345, + base_route: String::from("/"), + reason: FailureReason::Unreachable.to_string(), + datum: Cumulative::new(0), + } + } + + // Check that the self-stat timeseries schema have not changed. + #[test] + fn test_no_schema_changes() { + let collector = collector(); + let collections = collections(); + let failed = failed_collections(); + let mut set = SchemaSet::default(); + assert!(set.insert_checked(&collector, &collections).is_none()); + assert!(set.insert_checked(&collector, &failed).is_none()); + + const PATH: &'static str = concat!( + env!("CARGO_MANIFEST_DIR"), + "/tests/output/self-stat-schema.json" + ); + set.assert_contents(PATH); + } } diff --git a/oximeter/collector/tests/output/self-stat-schema.json b/oximeter/collector/tests/output/self-stat-schema.json new file mode 100644 index 0000000000..0caf2d27e9 --- /dev/null +++ b/oximeter/collector/tests/output/self-stat-schema.json @@ -0,0 +1,91 @@ +{ + "oximeter_collector:collections": { + "timeseries_name": "oximeter_collector:collections", + "field_schema": [ + { + "name": "base_route", + "field_type": "string", + "source": "metric" + }, + { + "name": "collector_id", + "field_type": "uuid", + "source": "target" + }, + { + "name": "collector_ip", + "field_type": "ip_addr", + "source": "target" + }, + { + "name": "collector_port", + "field_type": "u16", + "source": "target" + }, + { + "name": "producer_id", + "field_type": "uuid", + "source": "metric" + }, + { + "name": "producer_ip", + "field_type": "ip_addr", + "source": "metric" + }, + { + "name": "producer_port", + "field_type": "u16", + "source": "metric" + } + ], + "datum_type": "cumulative_u64", + "created": "2023-12-04T17:49:47.797495948Z" + }, + "oximeter_collector:failed_collections": { + "timeseries_name": "oximeter_collector:failed_collections", + "field_schema": [ + { + "name": "base_route", + "field_type": "string", + "source": "metric" + }, + { + "name": "collector_id", + "field_type": "uuid", + "source": "target" + }, + { + "name": "collector_ip", + "field_type": "ip_addr", + "source": "target" + }, + { + "name": "collector_port", + "field_type": "u16", + "source": "target" + }, + { + "name": "producer_id", + "field_type": "uuid", + "source": "metric" + }, + { + "name": "producer_ip", + "field_type": "ip_addr", + "source": "metric" + }, + { + "name": "producer_port", + "field_type": "u16", + "source": "metric" + }, + { + "name": "reason", + "field_type": "string", + "source": "metric" + } + ], + "datum_type": "cumulative_u64", + "created": "2023-12-04T17:49:47.799970009Z" + } +} \ No newline at end of file diff --git a/oximeter/db/src/client.rs b/oximeter/db/src/client.rs index c8a7db20cb..d295d0dcdf 100644 --- a/oximeter/db/src/client.rs +++ b/oximeter/db/src/client.rs @@ -710,7 +710,7 @@ impl Client { &self, sample: &Sample, ) -> Result, Error> { - let sample_schema = model::schema_for(sample); + let sample_schema = TimeseriesSchema::from(sample); let name = sample_schema.timeseries_name.clone(); let mut schema = self.schema.lock().await; @@ -1873,7 +1873,7 @@ mod tests { client.insert_samples(&[sample.clone()]).await.unwrap(); // The internal map should now contain both the new timeseries schema - let actual_schema = model::schema_for(&sample); + let actual_schema = TimeseriesSchema::from(&sample); let timeseries_name = TimeseriesName::try_from(sample.timeseries_name.as_str()).unwrap(); let expected_schema = client diff --git a/oximeter/db/src/lib.rs b/oximeter/db/src/lib.rs index 425c5189ee..9029319048 100644 --- a/oximeter/db/src/lib.rs +++ b/oximeter/db/src/lib.rs @@ -7,13 +7,23 @@ // Copyright 2023 Oxide Computer Company use crate::query::StringFieldSelector; -use chrono::{DateTime, Utc}; -use dropshot::{EmptyScanParams, PaginationParams}; -pub use oximeter::{DatumType, Field, FieldType, Measurement, Sample}; +use chrono::DateTime; +use chrono::Utc; +use dropshot::EmptyScanParams; +use dropshot::PaginationParams; +pub use oximeter::schema::FieldSchema; +pub use oximeter::schema::FieldSource; +pub use oximeter::schema::TimeseriesName; +pub use oximeter::schema::TimeseriesSchema; +pub use oximeter::DatumType; +pub use oximeter::Field; +pub use oximeter::FieldType; +pub use oximeter::Measurement; +pub use oximeter::Sample; use schemars::JsonSchema; -use serde::{Deserialize, Serialize}; +use serde::Deserialize; +use serde::Serialize; use std::collections::BTreeMap; -use std::collections::BTreeSet; use std::convert::TryFrom; use std::io; use std::num::NonZeroU32; @@ -23,7 +33,8 @@ use thiserror::Error; mod client; pub mod model; pub mod query; -pub use client::{Client, DbWrite}; +pub use client::Client; +pub use client::DbWrite; pub use model::OXIMETER_VERSION; @@ -78,9 +89,6 @@ pub enum Error { #[error("The field comparison {op} is not valid for the type {ty}")] InvalidFieldCmp { op: String, ty: FieldType }, - #[error("Invalid timeseries name")] - InvalidTimeseriesName, - #[error("Query must resolve to a single timeseries if limit is specified")] InvalidLimitQuery, @@ -117,136 +125,6 @@ pub enum Error { NonSequentialSchemaVersions, } -/// A timeseries name. -/// -/// Timeseries are named by concatenating the names of their target and metric, joined with a -/// colon. -#[derive( - Debug, Clone, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize, -)] -#[serde(try_from = "&str")] -pub struct TimeseriesName(String); - -impl JsonSchema for TimeseriesName { - fn schema_name() -> String { - "TimeseriesName".to_string() - } - - fn json_schema( - _: &mut schemars::gen::SchemaGenerator, - ) -> schemars::schema::Schema { - schemars::schema::SchemaObject { - metadata: Some(Box::new(schemars::schema::Metadata { - title: Some("The name of a timeseries".to_string()), - description: Some( - "Names are constructed by concatenating the target \ - and metric names with ':'. Target and metric \ - names must be lowercase alphanumeric characters \ - with '_' separating words." - .to_string(), - ), - ..Default::default() - })), - instance_type: Some(schemars::schema::InstanceType::String.into()), - string: Some(Box::new(schemars::schema::StringValidation { - pattern: Some(TIMESERIES_NAME_REGEX.to_string()), - ..Default::default() - })), - ..Default::default() - } - .into() - } -} - -impl std::ops::Deref for TimeseriesName { - type Target = String; - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl std::fmt::Display for TimeseriesName { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{}", self.0) - } -} - -impl std::convert::TryFrom<&str> for TimeseriesName { - type Error = Error; - fn try_from(s: &str) -> Result { - validate_timeseries_name(s).map(|s| TimeseriesName(s.to_string())) - } -} - -impl std::convert::TryFrom for TimeseriesName { - type Error = Error; - fn try_from(s: String) -> Result { - validate_timeseries_name(&s)?; - Ok(TimeseriesName(s)) - } -} - -impl std::str::FromStr for TimeseriesName { - type Err = Error; - fn from_str(s: &str) -> Result { - s.try_into() - } -} - -impl PartialEq for TimeseriesName -where - T: AsRef, -{ - fn eq(&self, other: &T) -> bool { - self.0.eq(other.as_ref()) - } -} - -fn validate_timeseries_name(s: &str) -> Result<&str, Error> { - if regex::Regex::new(TIMESERIES_NAME_REGEX).unwrap().is_match(s) { - Ok(s) - } else { - Err(Error::InvalidTimeseriesName) - } -} - -/// The schema for a timeseries. -/// -/// This includes the name of the timeseries, as well as the datum type of its metric and the -/// schema for each field. -#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)] -pub struct TimeseriesSchema { - pub timeseries_name: TimeseriesName, - pub field_schema: BTreeSet, - pub datum_type: DatumType, - pub created: DateTime, -} - -impl TimeseriesSchema { - /// Return the schema for the given field. - pub fn field_schema(&self, name: S) -> Option<&FieldSchema> - where - S: AsRef, - { - self.field_schema.iter().find(|field| field.name == name.as_ref()) - } - - /// Return the target and metric component names for this timeseries - pub fn component_names(&self) -> (&str, &str) { - self.timeseries_name - .split_once(':') - .expect("Incorrectly formatted timseries name") - } -} - -impl PartialEq for TimeseriesSchema { - fn eq(&self, other: &TimeseriesSchema) -> bool { - self.timeseries_name == other.timeseries_name - && self.datum_type == other.datum_type - && self.field_schema == other.field_schema - } -} - impl From for TimeseriesSchema { fn from(schema: model::DbTimeseriesSchema) -> TimeseriesSchema { TimeseriesSchema { @@ -285,25 +163,6 @@ pub struct Timeseries { pub measurements: Vec, } -/// The source from which a field is derived, the target or metric. -#[derive( - Clone, - Copy, - Debug, - PartialEq, - Eq, - PartialOrd, - Ord, - Deserialize, - Serialize, - JsonSchema, -)] -#[serde(rename_all = "snake_case")] -pub enum FieldSource { - Target, - Metric, -} - #[derive( Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, )] @@ -329,24 +188,6 @@ impl From for DbFieldSource { } } -/// The name and type information for a field of a timeseries schema. -#[derive( - Clone, - Debug, - PartialEq, - Eq, - PartialOrd, - Ord, - Deserialize, - Serialize, - JsonSchema, -)] -pub struct FieldSchema { - pub name: String, - pub ty: FieldType, - pub source: FieldSource, -} - /// Type used to paginate request to list timeseries schema. pub type TimeseriesSchemaPaginationParams = PaginationParams; @@ -422,19 +263,6 @@ const DATABASE_NAME: &str = "oximeter"; // See https://clickhouse.com/docs/en/interfaces/formats/#jsoneachrow for details. const DATABASE_SELECT_FORMAT: &str = "JSONEachRow"; -// Regular expression describing valid timeseries names. -// -// Names are derived from the names of the Rust structs for the target and metric, converted to -// snake case. So the names must be valid identifiers, and generally: -// -// - Start with lowercase a-z -// - Any number of alphanumerics -// - Zero or more of the above, delimited by '-'. -// -// That describes the target/metric name, and the timeseries is two of those, joined with ':'. -const TIMESERIES_NAME_REGEX: &str = - "(([a-z]+[a-z0-9]*)(_([a-z0-9]+))*):(([a-z]+[a-z0-9]*)(_([a-z0-9]+))*)"; - #[cfg(test)] mod tests { use super::*; @@ -548,71 +376,16 @@ mod tests { ); } - // Test that we correctly order field across a target and metric. - // - // In an earlier commit, we switched from storing fields in an unordered Vec - // to using a BTree{Map,Set} to ensure ordering by name. However, the - // `TimeseriesSchema` type stored all its fields by chaining the sorted - // fields from the target and metric, without then sorting _across_ them. - // - // This was exacerbated by the error reporting, where we did in fact sort - // all fields across the target and metric, making it difficult to tell how - // the derived schema was different, if at all. - // - // This test generates a sample with a schema where the target and metric - // fields are sorted within them, but not across them. We check that the - // derived schema are actually equal, which means we've imposed that - // ordering when deriving the schema. - #[test] - fn test_schema_field_ordering_across_target_metric() { - let target_field = FieldSchema { - name: String::from("later"), - ty: FieldType::U64, - source: FieldSource::Target, - }; - let metric_field = FieldSchema { - name: String::from("earlier"), - ty: FieldType::U64, - source: FieldSource::Metric, - }; - let timeseries_name: TimeseriesName = "foo:bar".parse().unwrap(); - let datum_type = DatumType::U64; - let field_schema = - [target_field.clone(), metric_field.clone()].into_iter().collect(); - let expected_schema = TimeseriesSchema { - timeseries_name, - field_schema, - datum_type, - created: Utc::now(), - }; - - #[derive(oximeter::Target)] - struct Foo { - later: u64, - } - #[derive(oximeter::Metric)] - struct Bar { - earlier: u64, - datum: u64, - } - - let target = Foo { later: 1 }; - let metric = Bar { earlier: 2, datum: 10 }; - let sample = Sample::new(&target, &metric).unwrap(); - let derived_schema = model::schema_for(&sample); - assert_eq!(derived_schema, expected_schema); - } - #[test] fn test_unsorted_db_fields_are_sorted_on_read() { let target_field = FieldSchema { name: String::from("later"), - ty: FieldType::U64, + field_type: FieldType::U64, source: FieldSource::Target, }; let metric_field = FieldSchema { name: String::from("earlier"), - ty: FieldType::U64, + field_type: FieldType::U64, source: FieldSource::Metric, }; let timeseries_name: TimeseriesName = "foo:bar".parse().unwrap(); @@ -632,7 +405,10 @@ mod tests { // the extracted model type. let db_fields = DbFieldList { names: vec![target_field.name.clone(), metric_field.name.clone()], - types: vec![target_field.ty.into(), metric_field.ty.into()], + types: vec![ + target_field.field_type.into(), + metric_field.field_type.into(), + ], sources: vec![ target_field.source.into(), metric_field.source.into(), @@ -646,23 +422,4 @@ mod tests { }; assert_eq!(expected_schema, TimeseriesSchema::from(db_schema)); } - - #[test] - fn test_field_schema_ordering() { - let mut fields = BTreeSet::new(); - fields.insert(FieldSchema { - name: String::from("second"), - ty: FieldType::U64, - source: FieldSource::Target, - }); - fields.insert(FieldSchema { - name: String::from("first"), - ty: FieldType::U64, - source: FieldSource::Target, - }); - let mut iter = fields.iter(); - assert_eq!(iter.next().unwrap().name, "first"); - assert_eq!(iter.next().unwrap().name, "second"); - assert!(iter.next().is_none()); - } } diff --git a/oximeter/db/src/model.rs b/oximeter/db/src/model.rs index d92e646e89..b1b45eabc4 100644 --- a/oximeter/db/src/model.rs +++ b/oximeter/db/src/model.rs @@ -12,7 +12,6 @@ use crate::FieldSource; use crate::Metric; use crate::Target; use crate::TimeseriesKey; -use crate::TimeseriesName; use crate::TimeseriesSchema; use bytes::Bytes; use chrono::DateTime; @@ -118,7 +117,7 @@ impl From for BTreeSet { .zip(list.sources) .map(|((name, ty), source)| FieldSchema { name, - ty: ty.into(), + field_type: ty.into(), source: source.into(), }) .collect() @@ -131,8 +130,8 @@ impl From> for DbFieldList { let mut types = Vec::with_capacity(list.len()); let mut sources = Vec::with_capacity(list.len()); for field in list.into_iter() { - names.push(field.name); - types.push(field.ty.into()); + names.push(field.name.to_string()); + types.push(field.field_type.into()); sources.push(field.source.into()); } DbFieldList { names, types, sources } @@ -1233,70 +1232,6 @@ pub(crate) fn unroll_measurement_row_impl( } } -/// Return the schema for a `Sample`. -pub(crate) fn schema_for(sample: &Sample) -> TimeseriesSchema { - // The fields are iterated through whatever order the `Target` or `Metric` - // impl chooses. We'll store in a set ordered by field name, to ignore the - // declaration order. - let created = Utc::now(); - let field_schema = sample - .target_fields() - .map(|field| FieldSchema { - name: field.name.clone(), - ty: field.value.field_type(), - source: FieldSource::Target, - }) - .chain(sample.metric_fields().map(|field| FieldSchema { - name: field.name.clone(), - ty: field.value.field_type(), - source: FieldSource::Metric, - })) - .collect(); - TimeseriesSchema { - timeseries_name: TimeseriesName::try_from( - sample.timeseries_name.as_str(), - ) - .expect("Failed to parse timeseries name"), - field_schema, - datum_type: sample.measurement.datum_type(), - created, - } -} - -/// Return the schema for a `Target` and `Metric` -pub(crate) fn schema_for_parts(target: &T, metric: &M) -> TimeseriesSchema -where - T: traits::Target, - M: traits::Metric, -{ - let make_field_schema = |name: &str, - value: FieldValue, - source: FieldSource| { - FieldSchema { name: name.to_string(), ty: value.field_type(), source } - }; - let target_field_schema = - target.field_names().iter().zip(target.field_values()); - let metric_field_schema = - metric.field_names().iter().zip(metric.field_values()); - let field_schema = target_field_schema - .map(|(name, value)| { - make_field_schema(name, value, FieldSource::Target) - }) - .chain(metric_field_schema.map(|(name, value)| { - make_field_schema(name, value, FieldSource::Metric) - })) - .collect(); - TimeseriesSchema { - timeseries_name: TimeseriesName::try_from(oximeter::timeseries_name( - target, metric, - )) - .expect("Failed to parse timeseries name"), - field_schema, - datum_type: metric.datum_type(), - created: Utc::now(), - } -} - // A scalar timestamped sample from a gauge timeseries, as extracted from a query to the database. #[derive(Debug, Clone, Deserialize)] struct DbTimeseriesScalarGaugeSample { @@ -1669,11 +1604,10 @@ pub(crate) fn parse_field_select_row( "Expected pairs of (field_name, field_value) from the field query" ); let (target_name, metric_name) = schema.component_names(); - let mut n_fields = 0; let mut target_fields = Vec::new(); let mut metric_fields = Vec::new(); let mut actual_fields = row.fields.values(); - while n_fields < schema.field_schema.len() { + for _ in 0..schema.field_schema.len() { // Extract the field name from the row and find a matching expected field. let actual_field_name = actual_fields .next() @@ -1682,7 +1616,7 @@ pub(crate) fn parse_field_select_row( .as_str() .expect("Expected a string field name") .to_string(); - let expected_field = schema.field_schema(&name).expect( + let expected_field = schema.schema_for_field(&name).expect( "Found field with name that is not part of the timeseries schema", ); @@ -1690,7 +1624,7 @@ pub(crate) fn parse_field_select_row( let actual_field_value = actual_fields .next() .expect("Missing a field value from a field select query"); - let value = match expected_field.ty { + let value = match expected_field.field_type { FieldType::Bool => { FieldValue::Bool(bool::from(DbBool::from( actual_field_value @@ -1797,7 +1731,6 @@ pub(crate) fn parse_field_select_row( FieldSource::Target => target_fields.push(field), FieldSource::Metric => metric_fields.push(field), } - n_fields += 1; } ( row.timeseries_key, @@ -1874,12 +1807,12 @@ mod tests { let list: BTreeSet<_> = [ FieldSchema { name: String::from("field0"), - ty: FieldType::I64, + field_type: FieldType::I64, source: FieldSource::Target, }, FieldSchema { name: String::from("field1"), - ty: FieldType::IpAddr, + field_type: FieldType::IpAddr, source: FieldSource::Metric, }, ] diff --git a/oximeter/db/src/query.rs b/oximeter/db/src/query.rs index 6a55d3f518..2caefb24c3 100644 --- a/oximeter/db/src/query.rs +++ b/oximeter/db/src/query.rs @@ -101,7 +101,7 @@ impl SelectQueryBuilder { let field_name = field_name.as_ref().to_string(); let field_schema = self .timeseries_schema - .field_schema(&field_name) + .schema_for_field(&field_name) .ok_or_else(|| Error::NoSuchField { timeseries_name: self .timeseries_schema @@ -110,7 +110,7 @@ impl SelectQueryBuilder { field_name: field_name.clone(), })?; let field_value: FieldValue = field_value.into(); - let expected_type = field_schema.ty; + let expected_type = field_schema.field_type; let found_type = field_value.field_type(); if expected_type != found_type { return Err(Error::IncorrectFieldType { @@ -150,7 +150,7 @@ impl SelectQueryBuilder { ) -> Result { let field_schema = self .timeseries_schema - .field_schema(&selector.name) + .schema_for_field(&selector.name) .ok_or_else(|| Error::NoSuchField { timeseries_name: self .timeseries_schema @@ -158,13 +158,14 @@ impl SelectQueryBuilder { .to_string(), field_name: selector.name.clone(), })?; - if !selector.op.valid_for_type(field_schema.ty) { + let field_type = field_schema.field_type; + if !selector.op.valid_for_type(field_type) { return Err(Error::InvalidFieldCmp { op: format!("{:?}", selector.op), - ty: field_schema.ty, + ty: field_schema.field_type, }); } - let field_value = match field_schema.ty { + let field_value = match field_type { FieldType::String => FieldValue::from(&selector.value), FieldType::I8 => parse_selector_field_value::( &field_schema, @@ -214,9 +215,9 @@ impl SelectQueryBuilder { let comparison = FieldComparison { op: selector.op, value: field_value }; let selector = FieldSelector { - name: field_schema.name.clone(), + name: field_schema.name.to_string(), comparison: Some(comparison), - ty: field_schema.ty, + ty: field_type, }; self.field_selectors.insert(field_schema.clone(), selector); Ok(self) @@ -248,7 +249,7 @@ impl SelectQueryBuilder { T: Target, M: Metric, { - let schema = crate::model::schema_for_parts(target, metric); + let schema = TimeseriesSchema::new(target, metric); let mut builder = Self::new(&schema); let target_fields = target.field_names().iter().zip(target.field_values()); @@ -279,9 +280,9 @@ impl SelectQueryBuilder { for field in timeseries_schema.field_schema.iter() { let key = field.clone(); field_selectors.entry(key).or_insert_with(|| FieldSelector { - name: field.name.clone(), + name: field.name.to_string(), comparison: None, - ty: field.ty, + ty: field.field_type, }); } SelectQuery { @@ -309,8 +310,8 @@ where { Ok(FieldValue::from(s.parse::().map_err(|_| { Error::InvalidFieldValue { - field_name: field.name.clone(), - field_type: field.ty, + field_name: field.name.to_string(), + field_type: field.field_type, value: s.to_string(), } })?)) @@ -778,12 +779,12 @@ mod tests { field_schema: [ FieldSchema { name: "f0".to_string(), - ty: FieldType::I64, + field_type: FieldType::I64, source: FieldSource::Target, }, FieldSchema { name: "f1".to_string(), - ty: FieldType::Bool, + field_type: FieldType::Bool, source: FieldSource::Target, }, ] @@ -981,6 +982,7 @@ mod tests { "Expected an exact comparison when building a query from parts", ); + println!("{builder:#?}"); assert_eq!( builder.field_selector(FieldSource::Metric, "baz").unwrap(), &FieldSelector { @@ -1002,12 +1004,12 @@ mod tests { field_schema: [ FieldSchema { name: "f0".to_string(), - ty: FieldType::I64, + field_type: FieldType::I64, source: FieldSource::Target, }, FieldSchema { name: "f1".to_string(), - ty: FieldType::Bool, + field_type: FieldType::Bool, source: FieldSource::Target, }, ] @@ -1065,12 +1067,12 @@ mod tests { field_schema: [ FieldSchema { name: "f0".to_string(), - ty: FieldType::I64, + field_type: FieldType::I64, source: FieldSource::Target, }, FieldSchema { name: "f1".to_string(), - ty: FieldType::Bool, + field_type: FieldType::Bool, source: FieldSource::Target, }, ] @@ -1116,12 +1118,12 @@ mod tests { field_schema: [ FieldSchema { name: "f0".to_string(), - ty: FieldType::I64, + field_type: FieldType::I64, source: FieldSource::Target, }, FieldSchema { name: "f1".to_string(), - ty: FieldType::Bool, + field_type: FieldType::Bool, source: FieldSource::Target, }, ] diff --git a/oximeter/oximeter/Cargo.toml b/oximeter/oximeter/Cargo.toml index 0cb2d8cace..b545c697de 100644 --- a/oximeter/oximeter/Cargo.toml +++ b/oximeter/oximeter/Cargo.toml @@ -11,8 +11,10 @@ chrono.workspace = true num.workspace = true omicron-common.workspace = true oximeter-macro-impl.workspace = true +regex.workspace = true schemars = { workspace = true, features = [ "uuid1", "bytes", "chrono" ] } serde.workspace = true +serde_json.workspace = true strum.workspace = true thiserror.workspace = true uuid.workspace = true diff --git a/oximeter/oximeter/src/lib.rs b/oximeter/oximeter/src/lib.rs index 2ced404eae..1855762abe 100644 --- a/oximeter/oximeter/src/lib.rs +++ b/oximeter/oximeter/src/lib.rs @@ -108,10 +108,14 @@ pub use oximeter_macro_impl::*; extern crate self as oximeter; pub mod histogram; +pub mod schema; pub mod test_util; pub mod traits; pub mod types; +pub use schema::FieldSchema; +pub use schema::TimeseriesName; +pub use schema::TimeseriesSchema; pub use traits::Metric; pub use traits::Producer; pub use traits::Target; diff --git a/oximeter/oximeter/src/schema.rs b/oximeter/oximeter/src/schema.rs new file mode 100644 index 0000000000..b6953fda52 --- /dev/null +++ b/oximeter/oximeter/src/schema.rs @@ -0,0 +1,640 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +// Copyright 2023 Oxide Computer Company + +//! Tools for working with schema for fields and timeseries. + +use crate::types::DatumType; +use crate::types::FieldType; +use crate::types::MetricsError; +use crate::types::Sample; +use crate::Metric; +use crate::Target; +use chrono::DateTime; +use chrono::Utc; +use schemars::JsonSchema; +use serde::Deserialize; +use serde::Serialize; +use std::collections::btree_map::Entry; +use std::collections::BTreeMap; +use std::collections::BTreeSet; +use std::fmt::Write; +use std::path::Path; + +/// The name and type information for a field of a timeseries schema. +#[derive( + Clone, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Deserialize, + Serialize, + JsonSchema, +)] +pub struct FieldSchema { + pub name: String, + pub field_type: FieldType, + pub source: FieldSource, +} + +/// The source from which a field is derived, the target or metric. +#[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Deserialize, + Serialize, + JsonSchema, +)] +#[serde(rename_all = "snake_case")] +pub enum FieldSource { + Target, + Metric, +} + +/// A timeseries name. +/// +/// Timeseries are named by concatenating the names of their target and metric, joined with a +/// colon. +#[derive( + Debug, Clone, PartialEq, PartialOrd, Ord, Eq, Hash, Serialize, Deserialize, +)] +#[serde(try_from = "&str")] +pub struct TimeseriesName(String); + +impl JsonSchema for TimeseriesName { + fn schema_name() -> String { + "TimeseriesName".to_string() + } + + fn json_schema( + _: &mut schemars::gen::SchemaGenerator, + ) -> schemars::schema::Schema { + schemars::schema::SchemaObject { + metadata: Some(Box::new(schemars::schema::Metadata { + title: Some("The name of a timeseries".to_string()), + description: Some( + "Names are constructed by concatenating the target \ + and metric names with ':'. Target and metric \ + names must be lowercase alphanumeric characters \ + with '_' separating words." + .to_string(), + ), + ..Default::default() + })), + instance_type: Some(schemars::schema::InstanceType::String.into()), + string: Some(Box::new(schemars::schema::StringValidation { + pattern: Some(TIMESERIES_NAME_REGEX.to_string()), + ..Default::default() + })), + ..Default::default() + } + .into() + } +} + +impl std::ops::Deref for TimeseriesName { + type Target = String; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl std::fmt::Display for TimeseriesName { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl std::convert::TryFrom<&str> for TimeseriesName { + type Error = MetricsError; + fn try_from(s: &str) -> Result { + validate_timeseries_name(s).map(|s| TimeseriesName(s.to_string())) + } +} + +impl std::convert::TryFrom for TimeseriesName { + type Error = MetricsError; + fn try_from(s: String) -> Result { + validate_timeseries_name(&s)?; + Ok(TimeseriesName(s)) + } +} + +impl std::str::FromStr for TimeseriesName { + type Err = MetricsError; + fn from_str(s: &str) -> Result { + s.try_into() + } +} + +impl PartialEq for TimeseriesName +where + T: AsRef, +{ + fn eq(&self, other: &T) -> bool { + self.0.eq(other.as_ref()) + } +} + +fn validate_timeseries_name(s: &str) -> Result<&str, MetricsError> { + if regex::Regex::new(TIMESERIES_NAME_REGEX).unwrap().is_match(s) { + Ok(s) + } else { + Err(MetricsError::InvalidTimeseriesName) + } +} + +/// The schema for a timeseries. +/// +/// This includes the name of the timeseries, as well as the datum type of its metric and the +/// schema for each field. +#[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)] +pub struct TimeseriesSchema { + pub timeseries_name: TimeseriesName, + pub field_schema: BTreeSet, + pub datum_type: DatumType, + pub created: DateTime, +} + +impl From<&Sample> for TimeseriesSchema { + fn from(sample: &Sample) -> Self { + let timeseries_name = sample.timeseries_name.parse().unwrap(); + let mut field_schema = BTreeSet::new(); + for field in sample.target_fields() { + let schema = FieldSchema { + name: field.name.clone(), + field_type: field.value.field_type(), + source: FieldSource::Target, + }; + field_schema.insert(schema); + } + for field in sample.metric_fields() { + let schema = FieldSchema { + name: field.name.clone(), + field_type: field.value.field_type(), + source: FieldSource::Metric, + }; + field_schema.insert(schema); + } + let datum_type = sample.measurement.datum_type(); + Self { timeseries_name, field_schema, datum_type, created: Utc::now() } + } +} + +impl TimeseriesSchema { + /// Construct a timeseries schema from a target and metric. + pub fn new(target: &T, metric: &M) -> Self + where + T: Target, + M: Metric, + { + let timeseries_name = + TimeseriesName::try_from(crate::timeseries_name(target, metric)) + .unwrap(); + let mut field_schema = BTreeSet::new(); + for field in target.fields() { + let schema = FieldSchema { + name: field.name.clone(), + field_type: field.value.field_type(), + source: FieldSource::Target, + }; + field_schema.insert(schema); + } + for field in metric.fields() { + let schema = FieldSchema { + name: field.name.clone(), + field_type: field.value.field_type(), + source: FieldSource::Metric, + }; + field_schema.insert(schema); + } + let datum_type = metric.datum_type(); + Self { timeseries_name, field_schema, datum_type, created: Utc::now() } + } + + /// Construct a timeseries schema from a sample + pub fn from_sample(sample: &Sample) -> Self { + Self::from(sample) + } + + /// Return the schema for the given field. + pub fn schema_for_field(&self, name: S) -> Option<&FieldSchema> + where + S: AsRef, + { + self.field_schema.iter().find(|field| field.name == name.as_ref()) + } + + /// Return the target and metric component names for this timeseries + pub fn component_names(&self) -> (&str, &str) { + self.timeseries_name + .split_once(':') + .expect("Incorrectly formatted timseries name") + } +} + +impl PartialEq for TimeseriesSchema { + fn eq(&self, other: &TimeseriesSchema) -> bool { + self.timeseries_name == other.timeseries_name + && self.datum_type == other.datum_type + && self.field_schema == other.field_schema + } +} + +// Regular expression describing valid timeseries names. +// +// Names are derived from the names of the Rust structs for the target and metric, converted to +// snake case. So the names must be valid identifiers, and generally: +// +// - Start with lowercase a-z +// - Any number of alphanumerics +// - Zero or more of the above, delimited by '-'. +// +// That describes the target/metric name, and the timeseries is two of those, joined with ':'. +const TIMESERIES_NAME_REGEX: &str = + "(([a-z]+[a-z0-9]*)(_([a-z0-9]+))*):(([a-z]+[a-z0-9]*)(_([a-z0-9]+))*)"; + +/// A set of timeseries schema, useful for testing changes to targets or +/// metrics. +#[derive(Debug, Default, Deserialize, PartialEq, Serialize)] +pub struct SchemaSet { + #[serde(flatten)] + inner: BTreeMap, +} + +impl SchemaSet { + /// Insert a timeseries schema, checking for conflicts. + /// + /// This inserts the schema derived from `target` and `metric`. If one + /// does _not_ already exist in `self` or a _matching_ one exists, `None` + /// is returned. + /// + /// If the derived schema _conflicts_ with one in `self`, the existing + /// schema is returned. + pub fn insert_checked( + &mut self, + target: &T, + metric: &M, + ) -> Option + where + T: Target, + M: Metric, + { + let new = TimeseriesSchema::new(target, metric); + let name = new.timeseries_name.clone(); + match self.inner.entry(name) { + Entry::Vacant(entry) => { + entry.insert(new); + None + } + Entry::Occupied(entry) => { + let existing = entry.get(); + if existing == &new { + None + } else { + Some(existing.clone()) + } + } + } + } + + /// Compare the set of schema against the contents of a file. + /// + /// This function loads a `SchemaSet` from the provided JSON file, and + /// asserts that the contained schema matches those in `self`. Note that + /// equality of `TimeseriesSchema` ignores creation timestamps, so this + /// compares the "identity" data: timeseries name, field names, field types, + /// and field sources. + /// + /// This is intentionally similar to `expectorate::assert_contents()`. If + /// the provided file doesn't exist, it's treated as empty. If it does, a + /// `SchemaSet` is deserialized from it and a comparison between that and + /// `self` is done. + /// + /// You can use `EXPECTORATE=overwrite` to overwrite the existing file, + /// rather than panicking. + pub fn assert_contents(&self, path: impl AsRef) { + let path = path.as_ref(); + let v = std::env::var_os("EXPECTORATE"); + let overwrite = + v.as_deref().and_then(std::ffi::OsStr::to_str) == Some("overwrite"); + let expected_contents = serde_json::to_string_pretty(self).unwrap(); + if overwrite { + if let Err(e) = std::fs::write(path, &expected_contents) { + panic!( + "Failed to write contents to '{}': {}", + path.display(), + e + ); + } + } else { + // If the file doesn't exist, it's just empty and we'll create an + // empty set of schema. + let contents = if !path.exists() { + String::from("{}") + } else { + match std::fs::read_to_string(path) { + Err(e) => { + panic!("Failed to read '{}': {}", path.display(), e) + } + Ok(c) => c, + } + }; + let other: Self = serde_json::from_str(&contents).unwrap(); + if self == &other { + return; + } + + let mut diffs = String::new(); + writeln!( + &mut diffs, + "Timeseries schema in \"{}\" do not match\n", + path.display() + ) + .unwrap(); + + // Print schema in self that are not in the file, or mismatched + // schema. + for (name, schema) in self.inner.iter() { + let Some(other_schema) = other.inner.get(name) else { + writeln!( + &mut diffs, + "File is missing timeseries \"{}\"", + name + ) + .unwrap(); + continue; + }; + if schema == other_schema { + continue; + } + writeln!(&mut diffs, "Timeseries \"{name}\" differs").unwrap(); + + // Print out any differences in the datum type. + if schema.datum_type != other_schema.datum_type { + writeln!( + &mut diffs, + " Expected datum type: {}", + schema.datum_type + ) + .unwrap(); + writeln!( + &mut diffs, + " Actual datum type: {}", + other_schema.datum_type + ) + .unwrap(); + } + + // Print fields in self that are not in other, or are mismatched + for field in schema.field_schema.iter() { + let Some(other_field) = + other_schema.field_schema.get(field) + else { + writeln!( + &mut diffs, + " File is missing {:?} field \"{}\"", + field.source, field.name, + ) + .unwrap(); + continue; + }; + if field == other_field { + continue; + } + + writeln!( + &mut diffs, + " File has mismatched field \"{}\"", + field.name + ) + .unwrap(); + writeln!( + &mut diffs, + " Expected type: {}", + field.field_type + ) + .unwrap(); + writeln!( + &mut diffs, + " Actual type: {}", + other_field.field_type + ) + .unwrap(); + writeln!( + &mut diffs, + " Expected source: {:?}", + field.source + ) + .unwrap(); + writeln!( + &mut diffs, + " Actual source: {:?}", + other_field.source + ) + .unwrap(); + } + + // Print fields in other that are not in self, fields that are + // in both but don't match are taken care of in the above loop. + for other_field in other_schema.field_schema.iter() { + if schema.field_schema.contains(other_field) { + continue; + } + + writeln!( + &mut diffs, + " Current set is missing {:?} field \"{}\"", + other_field.source, other_field.name, + ) + .unwrap(); + } + } + + // Print schema that are in the file, but not self. Those that don't + // match are handled in the above block. + for key in other.inner.keys() { + if !self.inner.contains_key(key) { + writeln!( + &mut diffs, + " Current set is missing timeseries \"{}\"", + key + ) + .unwrap(); + } + } + panic!("{}", diffs); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::convert::TryFrom; + use uuid::Uuid; + + #[test] + fn test_timeseries_name() { + let name = TimeseriesName::try_from("foo:bar").unwrap(); + assert_eq!(format!("{}", name), "foo:bar"); + } + + #[test] + fn test_timeseries_name_from_str() { + assert!(TimeseriesName::try_from("a:b").is_ok()); + assert!(TimeseriesName::try_from("a_a:b_b").is_ok()); + assert!(TimeseriesName::try_from("a0:b0").is_ok()); + assert!(TimeseriesName::try_from("a_0:b_0").is_ok()); + + assert!(TimeseriesName::try_from("_:b").is_err()); + assert!(TimeseriesName::try_from("a_:b").is_err()); + assert!(TimeseriesName::try_from("0:b").is_err()); + assert!(TimeseriesName::try_from(":b").is_err()); + assert!(TimeseriesName::try_from("a:").is_err()); + assert!(TimeseriesName::try_from("123").is_err()); + } + + #[derive(Target)] + struct MyTarget { + id: Uuid, + name: String, + } + + const ID: Uuid = uuid::uuid!("ca565ef4-65dc-4ab0-8622-7be43ed72105"); + + impl Default for MyTarget { + fn default() -> Self { + Self { id: ID, name: String::from("name") } + } + } + + #[derive(Metric)] + struct MyMetric { + happy: bool, + datum: u64, + } + + impl Default for MyMetric { + fn default() -> Self { + Self { happy: true, datum: 0 } + } + } + + #[test] + fn test_timeseries_schema_from_parts() { + let target = MyTarget::default(); + let metric = MyMetric::default(); + let schema = TimeseriesSchema::new(&target, &metric); + + assert_eq!(schema.timeseries_name, "my_target:my_metric"); + let f = schema.schema_for_field("id").unwrap(); + assert_eq!(f.name, "id"); + assert_eq!(f.field_type, FieldType::Uuid); + assert_eq!(f.source, FieldSource::Target); + + let f = schema.schema_for_field("name").unwrap(); + assert_eq!(f.name, "name"); + assert_eq!(f.field_type, FieldType::String); + assert_eq!(f.source, FieldSource::Target); + + let f = schema.schema_for_field("happy").unwrap(); + assert_eq!(f.name, "happy"); + assert_eq!(f.field_type, FieldType::Bool); + assert_eq!(f.source, FieldSource::Metric); + assert_eq!(schema.datum_type, DatumType::U64); + } + + #[test] + fn test_timeseries_schema_from_sample() { + let target = MyTarget::default(); + let metric = MyMetric::default(); + let sample = Sample::new(&target, &metric).unwrap(); + let schema = TimeseriesSchema::new(&target, &metric); + let schema_from_sample = TimeseriesSchema::from(&sample); + assert_eq!(schema, schema_from_sample); + } + + // Test that we correctly order field across a target and metric. + // + // In an earlier commit, we switched from storing fields in an unordered Vec + // to using a BTree{Map,Set} to ensure ordering by name. However, the + // `TimeseriesSchema` type stored all its fields by chaining the sorted + // fields from the target and metric, without then sorting _across_ them. + // + // This was exacerbated by the error reporting, where we did in fact sort + // all fields across the target and metric, making it difficult to tell how + // the derived schema was different, if at all. + // + // This test generates a sample with a schema where the target and metric + // fields are sorted within them, but not across them. We check that the + // derived schema are actually equal, which means we've imposed that + // ordering when deriving the schema. + #[test] + fn test_schema_field_ordering_across_target_metric() { + let target_field = FieldSchema { + name: String::from("later"), + field_type: FieldType::U64, + source: FieldSource::Target, + }; + let metric_field = FieldSchema { + name: String::from("earlier"), + field_type: FieldType::U64, + source: FieldSource::Metric, + }; + let timeseries_name: TimeseriesName = "foo:bar".parse().unwrap(); + let datum_type = DatumType::U64; + let field_schema = + [target_field.clone(), metric_field.clone()].into_iter().collect(); + let expected_schema = TimeseriesSchema { + timeseries_name, + field_schema, + datum_type, + created: Utc::now(), + }; + + #[derive(oximeter::Target)] + struct Foo { + later: u64, + } + #[derive(oximeter::Metric)] + struct Bar { + earlier: u64, + datum: u64, + } + + let target = Foo { later: 1 }; + let metric = Bar { earlier: 2, datum: 10 }; + let sample = Sample::new(&target, &metric).unwrap(); + let derived_schema = TimeseriesSchema::from(&sample); + assert_eq!(derived_schema, expected_schema); + } + + #[test] + fn test_field_schema_ordering() { + let mut fields = BTreeSet::new(); + fields.insert(FieldSchema { + name: String::from("second"), + field_type: FieldType::U64, + source: FieldSource::Target, + }); + fields.insert(FieldSchema { + name: String::from("first"), + field_type: FieldType::U64, + source: FieldSource::Target, + }); + let mut iter = fields.iter(); + assert_eq!(iter.next().unwrap().name, "first"); + assert_eq!(iter.next().unwrap().name, "second"); + assert!(iter.next().is_none()); + } +} diff --git a/oximeter/oximeter/src/types.rs b/oximeter/oximeter/src/types.rs index 23dbe2be6b..3d74bec72c 100644 --- a/oximeter/oximeter/src/types.rs +++ b/oximeter/oximeter/src/types.rs @@ -629,6 +629,8 @@ pub enum MetricsError { #[error("Missing datum of type {datum_type} cannot have a start time")] MissingDatumCannotHaveStartTime { datum_type: DatumType }, + #[error("Invalid timeseries name")] + InvalidTimeseriesName, } impl From for omicron_common::api::external::Error { diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index 88cadda842..e51390d0c4 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -89,7 +89,7 @@ semver = { version = "1.0.20", features = ["serde"] } serde = { version = "1.0.192", features = ["alloc", "derive", "rc"] } serde_json = { version = "1.0.108", features = ["raw_value"] } sha2 = { version = "0.10.8", features = ["oid"] } -similar = { version = "2.2.1", features = ["inline", "unicode"] } +similar = { version = "2.3.0", features = ["inline", "unicode"] } slog = { version = "2.7.0", features = ["dynamic-keys", "max_level_trace", "release_max_level_debug", "release_max_level_trace"] } snafu = { version = "0.7.5", features = ["futures"] } spin = { version = "0.9.8" } @@ -190,7 +190,7 @@ semver = { version = "1.0.20", features = ["serde"] } serde = { version = "1.0.192", features = ["alloc", "derive", "rc"] } serde_json = { version = "1.0.108", features = ["raw_value"] } sha2 = { version = "0.10.8", features = ["oid"] } -similar = { version = "2.2.1", features = ["inline", "unicode"] } +similar = { version = "2.3.0", features = ["inline", "unicode"] } slog = { version = "2.7.0", features = ["dynamic-keys", "max_level_trace", "release_max_level_debug", "release_max_level_trace"] } snafu = { version = "0.7.5", features = ["futures"] } spin = { version = "0.9.8" }