From aa81e57e67a1d3d67a267fbff12514aad93a5813 Mon Sep 17 00:00:00 2001 From: Benjamin Naecker Date: Sat, 21 Oct 2023 22:26:17 +0000 Subject: [PATCH 1/3] Sort fields when extracting timeseries schema - Fields are reported in a sample sorted within a target or metric, but not necessarily between them. When we derive a schema from a sample, this commit collects fields into a set to impose a total order. - Convert between a `DbFieldList` and `BTreeSet` when inserting / reading fields from the nested tables in ClickHouse. - Add sanity test that we're sorting field schema correctly, including on read from the database. - Errors for schema mismatches report entire schema, not just fields. - Logic around new schema was pretty complicated, and it was difficult to reason about its correctness. Make the lock async, and check both the internal cache and database when looking for a schema for a new sample. - Add test that we don't add a schema to the DB again when it exists in the DB, but not the internal cache. Instead, we update the cache. --- oximeter/db/src/client.rs | 165 ++++++++++++++++++++++++-------------- oximeter/db/src/lib.rs | 140 ++++++++++++++++++++++++++++++-- oximeter/db/src/model.rs | 18 +++-- oximeter/db/src/query.rs | 29 ++++--- 4 files changed, 268 insertions(+), 84 deletions(-) diff --git a/oximeter/db/src/client.rs b/oximeter/db/src/client.rs index ffa5d97d52..df8bfe1575 100644 --- a/oximeter/db/src/client.rs +++ b/oximeter/db/src/client.rs @@ -35,7 +35,7 @@ use std::collections::BTreeSet; use std::convert::TryFrom; use std::net::SocketAddr; use std::num::NonZeroU32; -use std::sync::Mutex; +use tokio::sync::Mutex; use uuid::Uuid; #[usdt::provider(provider = "clickhouse__client")] @@ -208,16 +208,12 @@ impl Client { &self, name: &TimeseriesName, ) -> Result, Error> { - { - let map = self.schema.lock().unwrap(); - if let Some(s) = map.get(name) { - return Ok(Some(s.clone())); - } + let mut schema = self.schema.lock().await; + if let Some(s) = schema.get(name) { + return Ok(Some(s.clone())); } - // `get_schema` acquires the lock internally, so the above scope is required to avoid - // deadlock. - self.get_schema().await?; - Ok(self.schema.lock().unwrap().get(name).map(Clone::clone)) + self.get_schema_locked(&mut schema).await?; + Ok(schema.get(name).map(Clone::clone)) } /// List timeseries schema, paginated. @@ -384,30 +380,47 @@ impl Client { &self, sample: &Sample, ) -> Result, Error> { - let schema = model::schema_for(sample); - let name = schema.timeseries_name.clone(); - let maybe_new_schema = match self.schema.lock().unwrap().entry(name) { - Entry::Vacant(entry) => Ok(Some(entry.insert(schema).clone())), + let sample_schema = model::schema_for(sample); + let name = sample_schema.timeseries_name.clone(); + let mut schema = self.schema.lock().await; + + // We need to possibly check that this schema is in the local cache, or + // in the database, all while we hold the lock to ensure there's no + // concurrent additions. This containment check is needed so that we + // check both the local cache and the database, to avoid adding a schema + // a second time. + if !schema.contains_key(&name) { + self.get_schema_locked(&mut schema).await?; + } + match schema.entry(name) { Entry::Occupied(entry) => { let existing_schema = entry.get(); - if existing_schema == &schema { + if existing_schema == &sample_schema { Ok(None) } else { - let err = - error_for_schema_mismatch(&schema, &existing_schema); error!( self.log, - "timeseries schema mismatch, sample will be skipped: {}", - err + "timeseries schema mismatch, sample will be skipped"; + "expected" => ?existing_schema, + "actual" => ?sample_schema, + "sample" => ?sample, ); - Err(err) + Err(Error::SchemaMismatch { + expected: existing_schema.clone(), + actual: sample_schema, + }) } } - }?; - Ok(maybe_new_schema.map(|schema| { - serde_json::to_string(&model::DbTimeseriesSchema::from(schema)) - .expect("Failed to convert schema to DB model") - })) + Entry::Vacant(entry) => { + entry.insert(sample_schema.clone()); + Ok(Some( + serde_json::to_string(&model::DbTimeseriesSchema::from( + sample_schema, + )) + .expect("Failed to convert schema to DB model"), + )) + } + } } // Select the timeseries, including keys and field values, that match the given field-selection @@ -503,10 +516,12 @@ impl Client { response } - async fn get_schema(&self) -> Result<(), Error> { + async fn get_schema_locked( + &self, + schema: &mut BTreeMap, + ) -> Result<(), Error> { debug!(self.log, "retrieving timeseries schema from database"); let sql = { - let schema = self.schema.lock().unwrap(); if schema.is_empty() { format!( "SELECT * FROM {db_name}.timeseries_schema FORMAT JSONEachRow;", @@ -545,7 +560,7 @@ impl Client { ); (schema.timeseries_name.clone(), schema) }); - self.schema.lock().unwrap().extend(new); + schema.extend(new); } Ok(()) } @@ -593,7 +608,7 @@ impl DbWrite for Client { } Ok(schema) => { if let Some(schema) = schema { - debug!(self.log, "new timeseries schema: {:?}", schema); + debug!(self.log, "new timeseries schema"; "schema" => ?schema); new_schema.push(schema); } } @@ -730,28 +745,6 @@ async fn handle_db_response( } } -// Generate an error describing a schema mismatch -fn error_for_schema_mismatch( - schema: &TimeseriesSchema, - existing_schema: &TimeseriesSchema, -) -> Error { - let expected = existing_schema - .field_schema - .iter() - .map(|field| (field.name.clone(), field.ty)) - .collect(); - let actual = schema - .field_schema - .iter() - .map(|field| (field.name.clone(), field.ty)) - .collect(); - Error::SchemaMismatch { - name: schema.timeseries_name.to_string(), - expected, - actual, - } -} - #[cfg(test)] mod tests { use super::*; @@ -1599,7 +1592,7 @@ mod tests { ); // Clear the internal caches of seen schema - client.schema.lock().unwrap().clear(); + client.schema.lock().await.clear(); // Insert the new sample client.insert_samples(&[sample.clone()]).await.unwrap(); @@ -1611,7 +1604,7 @@ mod tests { let expected_schema = client .schema .lock() - .unwrap() + .await .get(×eries_name) .expect( "After inserting a new sample, its schema should be included", @@ -2484,13 +2477,13 @@ mod tests { #[tokio::test] async fn test_get_schema_no_new_values() { let (mut db, client, _) = setup_filter_testcase().await; - let schema = &client.schema.lock().unwrap().clone(); - client.get_schema().await.expect("Failed to get timeseries schema"); - assert_eq!( - schema, - &*client.schema.lock().unwrap(), - "Schema shouldn't change" - ); + let original_schema = client.schema.lock().await.clone(); + let mut schema = client.schema.lock().await; + client + .get_schema_locked(&mut schema) + .await + .expect("Failed to get timeseries schema"); + assert_eq!(&original_schema, &*schema, "Schema shouldn't change"); db.cleanup().await.expect("Failed to cleanup database"); } @@ -2585,4 +2578,56 @@ mod tests { ); db.cleanup().await.expect("Failed to cleanup database"); } + + #[tokio::test] + async fn test_update_schema_cache_on_new_sample() { + usdt::register_probes().unwrap(); + let logctx = test_setup_log("test_update_schema_cache_on_new_sample"); + let log = &logctx.log; + + // Let the OS assign a port and discover it after ClickHouse starts + let mut db = ClickHouseInstance::new_single_node(0) + .await + .expect("Failed to start ClickHouse"); + let address = SocketAddr::new("::1".parse().unwrap(), db.port()); + + let client = Client::new(address, &log); + client + .init_single_node_db() + .await + .expect("Failed to initialize timeseries database"); + let samples = [test_util::make_sample()]; + client.insert_samples(&samples).await.unwrap(); + + // Get the count of schema directly from the DB, which should have just + // one. + let response = client.execute_with_body( + "SELECT COUNT() FROM oximeter.timeseries_schema FORMAT JSONEachRow; + ").await.unwrap(); + assert_eq!(response.lines().count(), 1, "Expected exactly 1 schema"); + assert_eq!(client.schema.lock().await.len(), 1); + + // Clear the internal cache, and insert the sample again. + // + // This should cause us to look up the schema in the DB again, but _not_ + // insert a new one. + client.schema.lock().await.clear(); + assert!(client.schema.lock().await.is_empty()); + + client.insert_samples(&samples).await.unwrap(); + + // Get the count of schema directly from the DB, which should still have + // only the one schema. + let response = client.execute_with_body( + "SELECT COUNT() FROM oximeter.timeseries_schema FORMAT JSONEachRow; + ").await.unwrap(); + assert_eq!( + response.lines().count(), + 1, + "Expected exactly 1 schema again" + ); + assert_eq!(client.schema.lock().await.len(), 1); + db.cleanup().await.expect("Failed to cleanup ClickHouse server"); + logctx.cleanup_successful(); + } } diff --git a/oximeter/db/src/lib.rs b/oximeter/db/src/lib.rs index c878b8ff2a..11ecbeddc8 100644 --- a/oximeter/db/src/lib.rs +++ b/oximeter/db/src/lib.rs @@ -4,7 +4,7 @@ //! Tools for interacting with the control plane telemetry database. -// Copyright 2021 Oxide Computer Company +// Copyright 2023 Oxide Computer Company use crate::query::StringFieldSelector; use chrono::{DateTime, Utc}; @@ -13,6 +13,7 @@ pub use oximeter::{DatumType, Field, FieldType, Measurement, Sample}; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; +use std::collections::BTreeSet; use std::convert::TryFrom; use std::num::NonZeroU32; use thiserror::Error; @@ -36,12 +37,8 @@ pub enum Error { Database(String), /// A schema provided when collecting samples did not match the expected schema - #[error("Schema mismatch for timeseries '{name}', expected fields {expected:?} found fields {actual:?}")] - SchemaMismatch { - name: String, - expected: BTreeMap, - actual: BTreeMap, - }, + #[error("Schema mismatch for timeseries '{0}'", expected.timeseries_name)] + SchemaMismatch { expected: TimeseriesSchema, actual: TimeseriesSchema }, #[error("Timeseries not found for: {0}")] TimeseriesNotFound(String), @@ -153,6 +150,13 @@ impl std::convert::TryFrom for TimeseriesName { } } +impl std::str::FromStr for TimeseriesName { + type Err = Error; + fn from_str(s: &str) -> Result { + s.try_into() + } +} + impl PartialEq for TimeseriesName where T: AsRef, @@ -177,7 +181,7 @@ fn validate_timeseries_name(s: &str) -> Result<&str, Error> { #[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)] pub struct TimeseriesSchema { pub timeseries_name: TimeseriesName, - pub field_schema: Vec, + pub field_schema: BTreeSet, pub datum_type: DatumType, pub created: DateTime, } @@ -398,6 +402,8 @@ const TIMESERIES_NAME_REGEX: &str = #[cfg(test)] mod tests { use super::*; + use crate::model::DbFieldList; + use crate::model::DbTimeseriesSchema; use std::convert::TryFrom; use uuid::Uuid; @@ -505,4 +511,122 @@ mod tests { &output.join("\n"), ); } + + // Test that we correctly order field across a target and metric. + // + // In an earlier commit, we switched from storing fields in an unordered Vec + // to using a BTree{Map,Set} to ensure ordering by name. However, the + // `TimeseriesSchema` type stored all its fields by chaining the sorted + // fields from the target and metric, without then sorting _across_ them. + // + // This was exacerbated by the error reporting, where we did in fact sort + // all fields across the target and metric, making it difficult to tell how + // the derived schema was different, if at all. + // + // This test generates a sample with a schema where the target and metric + // fields are sorted within them, but not across them. We check that the + // derived schema are actually equal, which means we've imposed that + // ordering when deriving the schema. + #[test] + fn test_schema_field_ordering_across_target_metric() { + let target_field = FieldSchema { + name: String::from("later"), + ty: FieldType::U64, + source: FieldSource::Target, + }; + let metric_field = FieldSchema { + name: String::from("earlier"), + ty: FieldType::U64, + source: FieldSource::Metric, + }; + let timeseries_name: TimeseriesName = "foo:bar".parse().unwrap(); + let datum_type = DatumType::U64; + let field_schema = + [target_field.clone(), metric_field.clone()].into_iter().collect(); + let expected_schema = TimeseriesSchema { + timeseries_name, + field_schema, + datum_type, + created: Utc::now(), + }; + + #[derive(oximeter::Target)] + struct Foo { + later: u64, + } + #[derive(oximeter::Metric)] + struct Bar { + earlier: u64, + datum: u64, + } + + let target = Foo { later: 1 }; + let metric = Bar { earlier: 2, datum: 10 }; + let sample = Sample::new(&target, &metric).unwrap(); + let derived_schema = model::schema_for(&sample); + assert_eq!(derived_schema, expected_schema); + } + + #[test] + fn test_unsorted_db_fields_are_sorted_on_read() { + let target_field = FieldSchema { + name: String::from("later"), + ty: FieldType::U64, + source: FieldSource::Target, + }; + let metric_field = FieldSchema { + name: String::from("earlier"), + ty: FieldType::U64, + source: FieldSource::Metric, + }; + let timeseries_name: TimeseriesName = "foo:bar".parse().unwrap(); + let datum_type = DatumType::U64; + let field_schema = + [target_field.clone(), metric_field.clone()].into_iter().collect(); + let expected_schema = TimeseriesSchema { + timeseries_name: timeseries_name.clone(), + field_schema, + datum_type, + created: Utc::now(), + }; + + // The fields here are sorted by target and then metric, which is how we + // used to insert them into the DB. We're checking that they are totally + // sorted when we read them out of the DB, even though they are not in + // the extracted model type. + let db_fields = DbFieldList { + names: vec![target_field.name.clone(), metric_field.name.clone()], + types: vec![target_field.ty.into(), metric_field.ty.into()], + sources: vec![ + target_field.source.into(), + metric_field.source.into(), + ], + }; + let db_schema = DbTimeseriesSchema { + timeseries_name: timeseries_name.to_string(), + field_schema: db_fields, + datum_type: datum_type.into(), + created: expected_schema.created, + }; + assert_eq!(expected_schema, TimeseriesSchema::from(db_schema)); + } + + #[test] + fn test_field_schema_ordering() { + let mut fields = BTreeSet::new(); + fields.insert(FieldSchema { + name: String::from("second"), + ty: FieldType::U64, + source: FieldSource::Target, + }); + fields.insert(FieldSchema { + name: String::from("first"), + ty: FieldType::U64, + source: FieldSource::Target, + }); + let mut iter = fields.iter(); + assert_eq!(iter.next().unwrap().name, "first"); + assert_eq!(iter.next().unwrap().name, "second"); + assert!(iter.next().is_none()); + } } diff --git a/oximeter/db/src/model.rs b/oximeter/db/src/model.rs index 1314c5c649..7f5b150b46 100644 --- a/oximeter/db/src/model.rs +++ b/oximeter/db/src/model.rs @@ -30,6 +30,7 @@ use oximeter::types::Sample; use serde::Deserialize; use serde::Serialize; use std::collections::BTreeMap; +use std::collections::BTreeSet; use std::convert::TryFrom; use std::net::IpAddr; use std::net::Ipv6Addr; @@ -107,7 +108,7 @@ pub(crate) struct DbFieldList { pub sources: Vec, } -impl From for Vec { +impl From for BTreeSet { fn from(list: DbFieldList) -> Self { list.names .into_iter() @@ -122,8 +123,8 @@ impl From for Vec { } } -impl From> for DbFieldList { - fn from(list: Vec) -> Self { +impl From> for DbFieldList { + fn from(list: BTreeSet) -> Self { let mut names = Vec::with_capacity(list.len()); let mut types = Vec::with_capacity(list.len()); let mut sources = Vec::with_capacity(list.len()); @@ -914,6 +915,9 @@ pub(crate) fn unroll_measurement_row(sample: &Sample) -> (String, String) { /// Return the schema for a `Sample`. pub(crate) fn schema_for(sample: &Sample) -> TimeseriesSchema { + // The fields are iterated through whatever order the `Target` or `Metric` + // impl chooses. We'll store in a set ordered by field name, to ignore the + // declaration order. let created = Utc::now(); let field_schema = sample .target_fields() @@ -1403,7 +1407,7 @@ mod tests { sources: vec![DbFieldSource::Target, DbFieldSource::Metric], }; - let list = vec![ + let list: BTreeSet<_> = [ FieldSchema { name: String::from("field0"), ty: FieldType::I64, @@ -1414,11 +1418,13 @@ mod tests { ty: FieldType::IpAddr, source: FieldSource::Metric, }, - ]; + ] + .into_iter() + .collect(); assert_eq!(DbFieldList::from(list.clone()), db_list); assert_eq!(db_list, list.clone().into()); - let round_trip: Vec = + let round_trip: BTreeSet = DbFieldList::from(list.clone()).into(); assert_eq!(round_trip, list); } diff --git a/oximeter/db/src/query.rs b/oximeter/db/src/query.rs index e9e1600739..6a55d3f518 100644 --- a/oximeter/db/src/query.rs +++ b/oximeter/db/src/query.rs @@ -721,6 +721,7 @@ mod tests { use crate::FieldSource; use crate::TimeseriesName; use chrono::NaiveDateTime; + use std::collections::BTreeSet; use std::convert::TryFrom; #[test] @@ -774,7 +775,7 @@ mod tests { fn test_select_query_builder_filter_raw() { let schema = TimeseriesSchema { timeseries_name: TimeseriesName::try_from("foo:bar").unwrap(), - field_schema: vec![ + field_schema: [ FieldSchema { name: "f0".to_string(), ty: FieldType::I64, @@ -785,7 +786,9 @@ mod tests { ty: FieldType::Bool, source: FieldSource::Target, }, - ], + ] + .into_iter() + .collect(), datum_type: DatumType::I64, created: Utc::now(), }; @@ -905,7 +908,7 @@ mod tests { fn test_select_query_builder_no_fields() { let schema = TimeseriesSchema { timeseries_name: TimeseriesName::try_from("foo:bar").unwrap(), - field_schema: vec![], + field_schema: BTreeSet::new(), datum_type: DatumType::I64, created: Utc::now(), }; @@ -927,7 +930,7 @@ mod tests { fn test_select_query_builder_limit_offset() { let schema = TimeseriesSchema { timeseries_name: TimeseriesName::try_from("foo:bar").unwrap(), - field_schema: vec![], + field_schema: BTreeSet::new(), datum_type: DatumType::I64, created: Utc::now(), }; @@ -996,7 +999,7 @@ mod tests { fn test_select_query_builder_no_selectors() { let schema = TimeseriesSchema { timeseries_name: TimeseriesName::try_from("foo:bar").unwrap(), - field_schema: vec![ + field_schema: [ FieldSchema { name: "f0".to_string(), ty: FieldType::I64, @@ -1007,7 +1010,9 @@ mod tests { ty: FieldType::Bool, source: FieldSource::Target, }, - ], + ] + .into_iter() + .collect(), datum_type: DatumType::I64, created: Utc::now(), }; @@ -1057,7 +1062,7 @@ mod tests { fn test_select_query_builder_field_selectors() { let schema = TimeseriesSchema { timeseries_name: TimeseriesName::try_from("foo:bar").unwrap(), - field_schema: vec![ + field_schema: [ FieldSchema { name: "f0".to_string(), ty: FieldType::I64, @@ -1068,7 +1073,9 @@ mod tests { ty: FieldType::Bool, source: FieldSource::Target, }, - ], + ] + .into_iter() + .collect(), datum_type: DatumType::I64, created: Utc::now(), }; @@ -1106,7 +1113,7 @@ mod tests { fn test_select_query_builder_full() { let schema = TimeseriesSchema { timeseries_name: TimeseriesName::try_from("foo:bar").unwrap(), - field_schema: vec![ + field_schema: [ FieldSchema { name: "f0".to_string(), ty: FieldType::I64, @@ -1117,7 +1124,9 @@ mod tests { ty: FieldType::Bool, source: FieldSource::Target, }, - ], + ] + .into_iter() + .collect(), datum_type: DatumType::I64, created: Utc::now(), }; From a67f226179b2bc68e313399d6cacfab86960368f Mon Sep 17 00:00:00 2001 From: Benjamin Naecker Date: Tue, 24 Oct 2023 20:11:16 +0000 Subject: [PATCH 2/3] Review feedback --- oximeter/db/src/client.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/oximeter/db/src/client.rs b/oximeter/db/src/client.rs index df8bfe1575..da68552e6e 100644 --- a/oximeter/db/src/client.rs +++ b/oximeter/db/src/client.rs @@ -516,6 +516,9 @@ impl Client { response } + // Get timeseries schema from the database. + // + // Can only be called after acquiring the lock around `self.schema`. async fn get_schema_locked( &self, schema: &mut BTreeMap, From c1b28752080cab696126f1d104ce379ec0a0cf78 Mon Sep 17 00:00:00 2001 From: Benjamin Naecker Date: Tue, 24 Oct 2023 20:35:10 +0000 Subject: [PATCH 3/3] Improve comment around locking schema map --- oximeter/db/src/client.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/oximeter/db/src/client.rs b/oximeter/db/src/client.rs index da68552e6e..c2b7c820a8 100644 --- a/oximeter/db/src/client.rs +++ b/oximeter/db/src/client.rs @@ -384,11 +384,12 @@ impl Client { let name = sample_schema.timeseries_name.clone(); let mut schema = self.schema.lock().await; - // We need to possibly check that this schema is in the local cache, or - // in the database, all while we hold the lock to ensure there's no - // concurrent additions. This containment check is needed so that we - // check both the local cache and the database, to avoid adding a schema - // a second time. + // We've taken the lock before we do any checks for schema. First, we + // check if we've already got one in the cache. If not, we update all + // the schema from the database, and then check the map again. If we + // find a schema (which now either came from the cache or the latest + // read of the DB), then we check that the derived schema matches. If + // not, we can insert it in the cache and the DB. if !schema.contains_key(&name) { self.get_schema_locked(&mut schema).await?; }