From b8c720ae53023a209768a97b0dc510471a854656 Mon Sep 17 00:00:00 2001 From: Benjamin Naecker Date: Sun, 24 Mar 2024 20:11:46 +0000 Subject: [PATCH] Periodically refresh a collector's list of assigned producers - Add an endpoint in Nexus's internal API for listing the assigned producers for a collector. - Spawn a task in the `oximeter-collector` which will periodically fetch the list; remove any producers not in that list; and ensure any that are. - Adds the time of this last refresh to the `oximeter-collector` server's API for fetching info about the collector - Remove old use of `reqwest` directly to register collector, opt-in to the generated Nexus client. - Adds some type conversions in the nexus client crate to simplify these new interfaces --- Cargo.toml | 92 --------- clients/nexus-client/src/lib.rs | 27 +++ dev-tools/omdb/src/bin/omdb/oximeter.rs | 5 + nexus/db-model/src/producer_endpoint.rs | 17 +- nexus/src/app/oximeter.rs | 16 +- nexus/src/internal_api/http_entrypoints.rs | 44 +++++ nexus/test-utils/src/lib.rs | 1 + openapi/nexus-internal.json | 87 +++++++++ openapi/oximeter.json | 6 + oximeter/collector/Cargo.toml | 2 +- oximeter/collector/src/agent.rs | 206 +++++++++++++++++++-- oximeter/collector/src/http_entrypoints.rs | 8 +- oximeter/collector/src/lib.rs | 49 +++-- 13 files changed, 439 insertions(+), 121 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a8fa57d5d6..5f1748e735 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -173,7 +173,6 @@ backoff = { version = "0.4.0", features = [ "tokio" ] } base64 = "0.22.0" bb8 = "0.8.3" bcs = "0.1.6" -bincode = "1.3.3" bootstore = { path = "bootstore" } bootstrap-agent-client = { path = "clients/bootstrap-agent-client" } buf-list = { version = "1.0.3", features = ["tokio1"] } @@ -190,13 +189,11 @@ clap = { version = "4.5", features = ["cargo", "derive", "env", "wrap_help"] } colored = "2.1" cookie = "0.18" criterion = { version = "0.5.1", features = [ "async_tokio" ] } -crossbeam = "0.8" crossterm = { 
version = "0.27.0", features = ["event-stream"] } crucible-agent-client = { git = "https://github.com/oxidecomputer/crucible", rev = "952c7d60d22be5198b892bec8d92f4291b9160c2" } crucible-pantry-client = { git = "https://github.com/oxidecomputer/crucible", rev = "952c7d60d22be5198b892bec8d92f4291b9160c2" } crucible-smf = { git = "https://github.com/oxidecomputer/crucible", rev = "952c7d60d22be5198b892bec8d92f4291b9160c2" } csv = "1.3.0" -curve25519-dalek = "4" datatest-stable = "0.2.3" display-error-chain = "0.2.0" omicron-ddm-admin-client = { path = "clients/ddm-admin-client" } @@ -252,7 +249,6 @@ ipcc = { path = "ipcc" } ipnet = "2.9" itertools = "0.12.1" internet-checksum = "0.2" -ipcc-key-value = { path = "ipcc-key-value" } ipnetwork = { version = "0.20", features = ["schemars"] } ispf = { git = "https://github.com/oxidecomputer/ispf" } key-manager = { path = "key-manager" } @@ -292,7 +288,6 @@ num = { version = "0.4.1", default-features = false, features = [ "libm" ] } omicron-common = { path = "common" } omicron-gateway = { path = "gateway" } omicron-nexus = { path = "nexus" } -omicron-package = { path = "package" } omicron-rpaths = { path = "rpaths" } omicron-sled-agent = { path = "sled-agent" } omicron-test-utils = { path = "test-utils" } @@ -305,7 +300,6 @@ openapiv3 = "2.0.0" # must match samael's crate! 
openssl = "0.10" openssl-sys = "0.9" -openssl-probe = "0.1.5" opte-ioctl = { git = "https://github.com/oxidecomputer/opte", rev = "7ee353a470ea59529ee1b34729681da887aa88ce" } oso = "0.27" owo-colors = "4.0.0" @@ -316,15 +310,12 @@ oximeter-collector = { path = "oximeter/collector" } oximeter-instruments = { path = "oximeter/instruments" } oximeter-macro-impl = { path = "oximeter/oximeter-macro-impl" } oximeter-producer = { path = "oximeter/producer" } -p256 = "0.13" parse-display = "0.9.0" partial-io = { version = "0.5.4", features = ["proptest1", "tokio1"] } parse-size = "1.0.0" paste = "1.0.14" -percent-encoding = "2.3.1" pem = "3.0" petgraph = "0.6.4" -postgres-protocol = "0.6.6" predicates = "3.1.0" pretty_assertions = "1.4.0" pretty-hex = "0.4.1" @@ -359,7 +350,6 @@ schemars = "0.8.16" secrecy = "0.8.0" semver = { version = "1.0.22", features = ["std", "serde"] } serde = { version = "1.0", default-features = false, features = [ "derive", "rc" ] } -serde_derive = "1.0" serde_human_bytes = { git = "http://github.com/oxidecomputer/serde_human_bytes", branch = "main" } serde_json = "1.0.114" serde_path_to_error = "0.1.16" @@ -385,11 +375,8 @@ slog-envlogger = "2.2" slog-error-chain = { git = "https://github.com/oxidecomputer/slog-error-chain", branch = "main", features = ["derive"] } slog-term = "2.9" smf = "0.2" -snafu = "0.7" socket2 = { version = "0.5", features = ["all"] } sp-sim = { path = "sp-sim" } -sprockets-common = { git = "http://github.com/oxidecomputer/sprockets", rev = "77df31efa5619d0767ffc837ef7468101608aee9" } -sprockets-host = { git = "http://github.com/oxidecomputer/sprockets", rev = "77df31efa5619d0767ffc837ef7468101608aee9" } sprockets-rot = { git = "http://github.com/oxidecomputer/sprockets", rev = "77df31efa5619d0767ffc837ef7468101608aee9" } sqlparser = { version = "0.43.1", features = [ "visitor" ] } static_assertions = "1.1.0" @@ -471,23 +458,6 @@ debug = "line-tables-only" # times, because it allows target and host dependencies to be 
unified. debug = "line-tables-only" -# `bindgen` is used by `samael`'s build script; building it with optimizations -# makes that build script run ~5x faster, more than offsetting the additional -# build time added to `bindgen` itself. -[profile.dev.package.bindgen] -opt-level = 3 - -# `lalrpop` is used by `polar-core`'s build script; building it with -# optimizations makes that build script run ~20x faster, more than offsetting -# the additional build time added to `lalrpop` itself. -[profile.dev.package.lalrpop] -opt-level = 3 - -# `polar-core` is exercised heavily during the test suite and it's worthwhile to -# have it built with optimizations. -[profile.dev.package.polar-core] -opt-level = 3 - # Password hashing is expensive by construction. Build the hashing libraries # with optimizations to significantly speed up tests. [profile.dev.package.argon2] @@ -514,46 +484,12 @@ opt-level = 3 opt-level = 3 [profile.dev.package.chacha20poly1305] opt-level = 3 -[profile.dev.package.chacha20] -opt-level = 3 [profile.dev.package.vsss-rs] opt-level = 3 -[profile.dev.package.curve25519-dalek] -opt-level = 3 -[profile.dev.package.aead] -opt-level = 3 -[profile.dev.package.aes] -opt-level = 3 -[profile.dev.package.aes-gcm] -opt-level = 3 -[profile.dev.package.bcrypt-pbkdf] -opt-level = 3 -[profile.dev.package.blake2] -opt-level = 3 -[profile.dev.package.blake2b_simd] -opt-level = 3 -[profile.dev.package.block-buffer] -opt-level = 3 -[profile.dev.package.block-padding] -opt-level = 3 -[profile.dev.package.blowfish] -opt-level = 3 -[profile.dev.package.constant_time_eq] -opt-level = 3 -[profile.dev.package.crypto-bigint] -opt-level = 3 [profile.dev.package.crypto-common] opt-level = 3 -[profile.dev.package.ctr] -opt-level = 3 -[profile.dev.package.cbc] -opt-level = 3 [profile.dev.package.digest] opt-level = 3 -[profile.dev.package.ed25519] -opt-level = 3 -[profile.dev.package.ed25519-dalek] -opt-level = 3 [profile.dev.package.elliptic-curve] opt-level = 3 
[profile.dev.package.generic-array] @@ -562,48 +498,20 @@ opt-level = 3 opt-level = 3 [profile.dev.package.hmac] opt-level = 3 -[profile.dev.package.lpc55_sign] -opt-level = 3 -[profile.dev.package.md5] -opt-level = 3 -[profile.dev.package.md-5] -opt-level = 3 [profile.dev.package.num-bigint] opt-level = 3 -[profile.dev.package.num-bigint-dig] -opt-level = 3 [profile.dev.package.rand] opt-level = 3 [profile.dev.package.rand_chacha] opt-level = 3 -[profile.dev.package.rand_core] -opt-level = 3 -[profile.dev.package.rand_hc] -opt-level = 3 -[profile.dev.package.rand_xorshift] -opt-level = 3 -[profile.dev.package.rsa] -opt-level = 3 -[profile.dev.package.salty] -opt-level = 3 -[profile.dev.package.signature] -opt-level = 3 [profile.dev.package.subtle] opt-level = 3 -[profile.dev.package.tiny-keccak] -opt-level = 3 [profile.dev.package.uuid] opt-level = 3 [profile.dev.package.cipher] opt-level = 3 -[profile.dev.package.cpufeatures] -opt-level = 3 -[profile.dev.package.poly1305] -opt-level = 3 [profile.dev.package.inout] opt-level = 3 -[profile.dev.package.keccak] -opt-level = 3 # diff --git a/clients/nexus-client/src/lib.rs b/clients/nexus-client/src/lib.rs index ad8269e675..a6d3259259 100644 --- a/clients/nexus-client/src/lib.rs +++ b/clients/nexus-client/src/lib.rs @@ -382,3 +382,30 @@ impl From } } } + +impl From + for omicron_common::api::internal::nexus::ProducerKind +{ + fn from(kind: types::ProducerKind) -> Self { + use omicron_common::api::internal::nexus::ProducerKind; + match kind { + types::ProducerKind::SledAgent => ProducerKind::SledAgent, + types::ProducerKind::Instance => ProducerKind::Instance, + types::ProducerKind::Service => ProducerKind::Service, + } + } +} + +impl From + for omicron_common::api::internal::nexus::ProducerEndpoint +{ + fn from(ep: types::ProducerEndpoint) -> Self { + Self { + id: ep.id, + kind: ep.kind.into(), + address: ep.address.parse().unwrap(), + base_route: ep.base_route, + interval: ep.interval.into(), + } + } +} diff --git 
a/dev-tools/omdb/src/bin/omdb/oximeter.rs b/dev-tools/omdb/src/bin/omdb/oximeter.rs index e0f20556a2..29491bb083 100644 --- a/dev-tools/omdb/src/bin/omdb/oximeter.rs +++ b/dev-tools/omdb/src/bin/omdb/oximeter.rs @@ -67,6 +67,11 @@ impl OximeterArgs { .with(tabled::settings::Padding::new(0, 1, 0, 0)) .to_string(); println!("Collector ID: {}\n", info.id); + let last_refresh = info + .last_refresh + .map(|r| r.to_string()) + .unwrap_or(String::from("Never")); + println!("Last refresh: {}\n", last_refresh); println!("{table}"); Ok(()) } diff --git a/nexus/db-model/src/producer_endpoint.rs b/nexus/db-model/src/producer_endpoint.rs index 55533690f1..d3fa8ae8c8 100644 --- a/nexus/db-model/src/producer_endpoint.rs +++ b/nexus/db-model/src/producer_endpoint.rs @@ -2,6 +2,9 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. +use std::net::SocketAddr; +use std::time::Duration; + use super::SqlU16; use crate::impl_enum_type; use crate::schema::metric_producer; @@ -44,7 +47,19 @@ impl From for internal::nexus::ProducerKind { } } -/// Information announced by a metric server, used so that clients can contact it and collect +impl From for internal::nexus::ProducerEndpoint { + fn from(ep: ProducerEndpoint) -> Self { + internal::nexus::ProducerEndpoint { + id: ep.id(), + kind: ep.kind.into(), + address: SocketAddr::new(ep.ip.ip(), *ep.port), + base_route: ep.base_route.clone(), + interval: Duration::from_secs_f64(ep.interval), + } + } +} + +/// Information announced by a metric server, used so that clients can contact it and collect /// available metric data from it. 
#[derive(Queryable, Insertable, Debug, Clone, Selectable, Asset)] #[diesel(table_name = metric_producer)] diff --git a/nexus/src/app/oximeter.rs b/nexus/src/app/oximeter.rs index a168b35293..b0c79abefc 100644 --- a/nexus/src/app/oximeter.rs +++ b/nexus/src/app/oximeter.rs @@ -12,10 +12,10 @@ use internal_dns::ServiceName; use nexus_db_queries::db; use nexus_db_queries::db::identity::Asset; use omicron_common::address::CLICKHOUSE_PORT; -use omicron_common::api::external::DataPageParams; use omicron_common::api::external::Error; use omicron_common::api::external::PaginationOrder; -use omicron_common::api::internal::nexus; +use omicron_common::api::external::{DataPageParams, ListResultVec}; +use omicron_common::api::internal::nexus::{self, ProducerEndpoint}; use omicron_common::backoff; use oximeter_client::Client as OximeterClient; use oximeter_db::query::Timestamp; @@ -146,6 +146,18 @@ impl super::Nexus { } } + /// List the producers assigned to an oximeter collector. + pub(crate) async fn list_assigned_producers( + &self, + collector_id: Uuid, + pagparams: &DataPageParams<'_, Uuid>, + ) -> ListResultVec { + self.db_datastore + .producers_list_by_oximeter_id(collector_id, pagparams) + .await + .map(|list| list.into_iter().map(ProducerEndpoint::from).collect()) + } + /// Register as a metric producer with the oximeter metric collection server. 
pub(crate) async fn register_as_producer(&self, address: SocketAddr) { let producer_endpoint = nexus::ProducerEndpoint { diff --git a/nexus/src/internal_api/http_entrypoints.rs b/nexus/src/internal_api/http_entrypoints.rs index 0676ace70c..c2c77a6fde 100644 --- a/nexus/src/internal_api/http_entrypoints.rs +++ b/nexus/src/internal_api/http_entrypoints.rs @@ -83,6 +83,7 @@ pub(crate) fn internal_api() -> NexusApiDescription { api.register(cpapi_volume_remove_read_only_parent)?; api.register(cpapi_disk_remove_read_only_parent)?; api.register(cpapi_producers_post)?; + api.register(cpapi_assigned_producers_list)?; api.register(cpapi_collectors_post)?; api.register(cpapi_metrics_collect)?; api.register(cpapi_artifact_download)?; @@ -466,6 +467,49 @@ async fn cpapi_producers_post( .await } +#[derive( + Clone, + Copy, + Debug, + serde::Deserialize, + schemars::JsonSchema, + serde::Serialize, +)] +pub struct CollectorIdPathParams { + /// The ID of the oximeter collector. + pub collector_id: Uuid, +} + +/// List all metric producers assigned to an oximeter collector. +#[endpoint { + method = GET, + path = "/metrics/collectors/{collector_id}/producers", + }] +async fn cpapi_assigned_producers_list( + request_context: RequestContext>, + path_params: Path, + query_params: Query, +) -> Result>, HttpError> { + let context = request_context.context(); + let handler = async { + let nexus = &context.nexus; + let collector_id = path_params.into_inner().collector_id; + let query = query_params.into_inner(); + let pagparams = data_page_params_for(&request_context, &query)?; + let producers = + nexus.list_assigned_producers(collector_id, &pagparams).await?; + Ok(HttpResponseOk(ScanById::results_page( + &query, + producers, + &|_, producer: &ProducerEndpoint| producer.id, + )?)) + }; + context + .internal_latencies + .instrument_dropshot_handler(&request_context, handler) + .await +} + /// Accept a notification of a new oximeter collection server. 
#[endpoint { method = POST, diff --git a/nexus/test-utils/src/lib.rs b/nexus/test-utils/src/lib.rs index cc9c8c43df..e5616a4641 100644 --- a/nexus/test-utils/src/lib.rs +++ b/nexus/test-utils/src/lib.rs @@ -1411,6 +1411,7 @@ pub async fn start_oximeter( let config = oximeter_collector::Config { nexus_address: Some(nexus_address), db, + refresh_interval: oximeter_collector::default_refresh_interval(), log: ConfigLogging::StderrTerminal { level: ConfigLoggingLevel::Error }, }; let args = oximeter_collector::OximeterArguments { diff --git a/openapi/nexus-internal.json b/openapi/nexus-internal.json index cba8063b7e..db3199833e 100644 --- a/openapi/nexus-internal.json +++ b/openapi/nexus-internal.json @@ -775,6 +775,72 @@ } } }, + "/metrics/collectors/{collector_id}/producers": { + "get": { + "summary": "List all metric producers assigned to an oximeter collector.", + "operationId": "cpapi_assigned_producers_list", + "parameters": [ + { + "in": "path", + "name": "collector_id", + "description": "The ID of the oximeter collector.", + "required": true, + "schema": { + "type": "string", + "format": "uuid" + } + }, + { + "in": "query", + "name": "limit", + "description": "Maximum number of items returned by a single call", + "schema": { + "nullable": true, + "type": "integer", + "format": "uint32", + "minimum": 1 + } + }, + { + "in": "query", + "name": "page_token", + "description": "Token returned by previous call to retrieve the subsequent page", + "schema": { + "nullable": true, + "type": "string" + } + }, + { + "in": "query", + "name": "sort_by", + "schema": { + "$ref": "#/components/schemas/IdSortMode" + } + } + ], + "responses": { + "200": { + "description": "successful operation", + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/ProducerEndpointResultsPage" + } + } + } + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + }, + "x-dropshot-pagination": { + "required": 
[] + } + } + }, "/metrics/producers": { "post": { "summary": "Accept a registration from a new metric producer", @@ -6171,6 +6237,27 @@ "kind" ] }, + "ProducerEndpointResultsPage": { + "description": "A single page of results", + "type": "object", + "properties": { + "items": { + "description": "list of items on this page of results", + "type": "array", + "items": { + "$ref": "#/components/schemas/ProducerEndpoint" + } + }, + "next_page": { + "nullable": true, + "description": "token used to fetch the next page of results (if any)", + "type": "string" + } + }, + "required": [ + "items" + ] + }, "ProducerKind": { "description": "The kind of metric producer this is.", "oneOf": [ diff --git a/openapi/oximeter.json b/openapi/oximeter.json index f5c78d53cd..4c609474ca 100644 --- a/openapi/oximeter.json +++ b/openapi/oximeter.json @@ -142,6 +142,12 @@ "description": "The collector's UUID.", "type": "string", "format": "uuid" + }, + "last_refresh": { + "nullable": true, + "description": "Last time we refreshed our producer list with Nexus.", + "type": "string", + "format": "date-time" } }, "required": [ diff --git a/oximeter/collector/Cargo.toml b/oximeter/collector/Cargo.toml index 92c91ca101..b7dac716c6 100644 --- a/oximeter/collector/Cargo.toml +++ b/oximeter/collector/Cargo.toml @@ -13,7 +13,6 @@ clap.workspace = true dropshot.workspace = true futures.workspace = true internal-dns.workspace = true -nexus-client.workspace = true nexus-types.workspace = true omicron-common.workspace = true oximeter.workspace = true @@ -33,6 +32,7 @@ tokio.workspace = true toml.workspace = true uuid.workspace = true omicron-workspace-hack.workspace = true +nexus-client.workspace = true [dev-dependencies] expectorate.workspace = true diff --git a/oximeter/collector/src/agent.rs b/oximeter/collector/src/agent.rs index 8fff44bb2d..85f3aa44cc 100644 --- a/oximeter/collector/src/agent.rs +++ b/oximeter/collector/src/agent.rs @@ -11,9 +11,16 @@ use crate::DbConfig; use crate::Error; use 
crate::ProducerEndpoint; use anyhow::anyhow; +use chrono::DateTime; +use chrono::Utc; +use futures::TryStreamExt; use internal_dns::resolver::Resolver; use internal_dns::ServiceName; +use nexus_client::types::IdSortMode; use omicron_common::address::CLICKHOUSE_PORT; +use omicron_common::address::NEXUS_INTERNAL_PORT; +use omicron_common::backoff; +use omicron_common::backoff::BackoffError; use oximeter::types::ProducerResults; use oximeter::types::ProducerResultsItem; use oximeter_db::Client; @@ -29,12 +36,15 @@ use std::collections::btree_map::Entry; use std::collections::BTreeMap; use std::net::SocketAddr; use std::net::SocketAddrV6; +use std::num::NonZeroU32; use std::ops::Bound; use std::sync::Arc; +use std::sync::Mutex as StdMutex; use std::time::Duration; use tokio::sync::mpsc; use tokio::sync::oneshot; use tokio::sync::Mutex; +use tokio::sync::MutexGuard; use tokio::task::JoinHandle; use tokio::time::interval; use uuid::Uuid; @@ -343,7 +353,7 @@ async fn results_sink( } /// The internal agent the oximeter server uses to collect metrics from producers. -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct OximeterAgent { /// The collector ID for this agent pub id: Uuid, @@ -355,6 +365,10 @@ pub struct OximeterAgent { // The actual tokio tasks running the collection on a timer. collection_tasks: Arc>>, + // The interval on which we refresh our list of producers from Nexus + refresh_interval: Duration, + /// The last time we've refreshed our list of producers from Nexus. 
+ pub last_refresh_time: Arc>>>, } impl OximeterAgent { @@ -362,6 +376,7 @@ impl OximeterAgent { pub async fn with_id( id: Uuid, address: SocketAddrV6, + refresh_interval: Duration, db_config: DbConfig, resolver: &Resolver, log: &Logger, @@ -435,13 +450,29 @@ impl OximeterAgent { ) .await }); - Ok(Self { + + let self_ = Self { id, log, collection_target, result_sender, collection_tasks: Arc::new(Mutex::new(BTreeMap::new())), - }) + refresh_interval, + last_refresh_time: Arc::new(StdMutex::new(None)), + }; + + // And spawn our task for periodically updating our list of producers + // from Nexus. + // + // This is part of a coordination mechanism between Nexus, the + // producers, and us, to ensure that we have a reasonably up-to-date + // list of producers. Producers are required to re-register periodically + // with Nexus as a deadman -- if they fail to do so, Nexus will remove + // them. We fetch the list every so often to make sure that gets to us + // too. + tokio::spawn(refresh_producer_list(self_.clone(), resolver.clone())); + + Ok(self_) } /// Construct a new standalone `oximeter` collector. @@ -455,6 +486,7 @@ impl OximeterAgent { pub async fn new_standalone( id: Uuid, address: SocketAddrV6, + refresh_interval: Duration, db_config: Option, log: &Logger, ) -> Result { @@ -503,12 +535,20 @@ impl OximeterAgent { collector_ip: (*address.ip()).into(), collector_port: address.port(), }; + + // We don't spawn the task to periodically refresh producers when run + // in standalone mode. We can just pretend we registered once, and + // that's it. 
+ let last_refresh_time = Arc::new(StdMutex::new(Some(Utc::now()))); + Ok(Self { id, log, collection_target, result_sender, collection_tasks: Arc::new(Mutex::new(BTreeMap::new())), + refresh_interval, + last_refresh_time, }) } @@ -517,8 +557,23 @@ impl OximeterAgent { &self, info: ProducerEndpoint, ) -> Result<(), Error> { + let mut tasks = self.collection_tasks.lock().await; + self.register_producer_locked(&mut tasks, info).await; + Ok(()) + } + + // Internal implementation that registers a producer, assuming the lock on + // the map is held. + async fn register_producer_locked( + &self, + tasks: &mut MutexGuard< + '_, + BTreeMap, + >, + info: ProducerEndpoint, + ) { let id = info.id; - match self.collection_tasks.lock().await.entry(id) { + match tasks.entry(id) { Entry::Vacant(value) => { debug!( self.log, @@ -557,7 +612,6 @@ impl OximeterAgent { .unwrap(); } } - Ok(()) } /// Forces a collection from all producers. @@ -607,12 +661,22 @@ impl OximeterAgent { /// Delete a producer by ID, stopping its collection task. pub async fn delete_producer(&self, id: Uuid) -> Result<(), Error> { - let (_info, task) = self - .collection_tasks - .lock() - .await - .remove(&id) - .ok_or_else(|| Error::NoSuchProducer(id))?; + let mut tasks = self.collection_tasks.lock().await; + self.delete_producer_locked(&mut tasks, id).await + } + + // Internal implementation that deletes a producer, assuming the lock on + // the map is held. + async fn delete_producer_locked( + &self, + tasks: &mut MutexGuard< + '_, + BTreeMap, + >, + id: Uuid, + ) -> Result<(), Error> { + let (_info, task) = + tasks.remove(&id).ok_or_else(|| Error::NoSuchProducer(id))?; debug!( self.log, "removed collection task from set"; @@ -633,6 +697,123 @@ impl OximeterAgent { } Ok(()) } + + // Ensure that exactly the set of producers is registered with `self`. + // + // Errors logged, but not returned, and an attempt to register all producers + // is made, even if an error is encountered part-way through. 
+ // + // This returns the number of pruned tasks. + async fn ensure_producers( + &self, + expected_producers: BTreeMap, + ) -> usize { + let mut tasks = self.collection_tasks.lock().await; + + // First prune unwanted collection tasks. + // + // This is the set of all producers that we currently have, which are not in + // the new list from Nexus. + let ids_to_prune: Vec<_> = tasks + .keys() + .filter(|id| !expected_producers.contains_key(id)) + .copied() + .collect(); + let n_pruned = ids_to_prune.len(); + for id in ids_to_prune.into_iter() { + // This method only returns an error if the provided ID does not + // exist in the current tasks. That is impossible, because we hold + // the lock, and we've just computed this as the set that _is_ in + // the map, and not in the new set from Nexus. + self.delete_producer_locked(&mut tasks, id).await.unwrap(); + } + + // And then ensure everything in the list. + // + // This will insert new tasks, and update any that we already know + // about. + for info in expected_producers.into_values() { + self.register_producer_locked(&mut tasks, info).await; + } + n_pruned + } +} + +// A task which periodically updates our list of producers from Nexus. +async fn refresh_producer_list(agent: OximeterAgent, resolver: Resolver) { + let mut interval = tokio::time::interval(agent.refresh_interval); + interval.tick().await; // Completes immediately. 
+ let page_size = Some(NonZeroU32::new(100).unwrap()); + loop { + interval.tick().await; + info!(agent.log, "refreshing list of producers from Nexus"); + let nexus_addr = + resolve_nexus_with_backoff(&agent.log, &resolver).await; + let url = format!("http://{}", nexus_addr); + let client = nexus_client::Client::new(&url, agent.log.clone()); + let mut stream = client.cpapi_assigned_producers_list_stream( + &agent.id, + page_size, + Some(IdSortMode::IdAscending), + ); + let mut expected_producers = BTreeMap::new(); + loop { + match stream.try_next().await { + Err(e) => { + error!( + agent.log, + "error fetching next assigned producer"; + "err" => ?e, + ); + } + Ok(Some(p)) => { + let endpoint = ProducerEndpoint::from(p); + let old = expected_producers.insert(endpoint.id, endpoint); + assert!(old.is_none()); + } + Ok(None) => break, + } + } + let n_current_tasks = expected_producers.len(); + let n_pruned_tasks = agent.ensure_producers(expected_producers).await; + let _ = agent.last_refresh_time.lock().unwrap().insert(Utc::now()); + info!( + agent.log, + "refreshed list of producers from Nexus"; + "n_pruned_tasks" => n_pruned_tasks, + "n_current_tasks" => n_current_tasks, + ); + } +} + +async fn resolve_nexus_with_backoff( + log: &Logger, + resolver: &Resolver, +) -> SocketAddr { + let log_failure = |error, delay| { + warn!( + log, + "failed to lookup Nexus IP, will retry"; + "delay" => ?delay, + "error" => ?error, + ); + }; + let do_lookup = || async { + resolver + .lookup_ipv6(ServiceName::Nexus) + .await + .map_err(|e| BackoffError::transient(e.to_string())) + .map(|ip| { + SocketAddr::V6(SocketAddrV6::new(ip, NEXUS_INTERNAL_PORT, 0, 0)) + }) + }; + backoff::retry_notify( + backoff::retry_policy_internal_service(), + do_lookup, + log_failure, + ) + .await + .expect("Expected infinite retry loop resolving Nexus address") } #[cfg(test)] @@ -696,6 +877,7 @@ mod tests { let collector = OximeterAgent::new_standalone( Uuid::new_v4(), SocketAddrV6::new(Ipv6Addr::LOCALHOST, 
0, 0, 0), + crate::default_refresh_interval(), None, log, ) @@ -772,6 +954,7 @@ mod tests { let collector = OximeterAgent::new_standalone( Uuid::new_v4(), SocketAddrV6::new(Ipv6Addr::LOCALHOST, 0, 0, 0), + crate::default_refresh_interval(), None, log, ) @@ -842,6 +1025,7 @@ mod tests { let collector = OximeterAgent::new_standalone( Uuid::new_v4(), SocketAddrV6::new(Ipv6Addr::LOCALHOST, 0, 0, 0), + crate::default_refresh_interval(), None, log, ) diff --git a/oximeter/collector/src/http_entrypoints.rs b/oximeter/collector/src/http_entrypoints.rs index 493083a40d..e876ed047d 100644 --- a/oximeter/collector/src/http_entrypoints.rs +++ b/oximeter/collector/src/http_entrypoints.rs @@ -7,6 +7,8 @@ // Copyright 2023 Oxide Computer Company use crate::OximeterAgent; +use chrono::DateTime; +use chrono::Utc; use dropshot::endpoint; use dropshot::ApiDescription; use dropshot::EmptyScanParams; @@ -117,6 +119,8 @@ async fn producer_delete( pub struct CollectorInfo { /// The collector's UUID. pub id: Uuid, + /// Last time we refreshed our producer list with Nexus. + pub last_refresh: Option>, } // Return identifying information about this collector @@ -128,6 +132,8 @@ async fn collector_info( request_context: RequestContext>, ) -> Result, HttpError> { let agent = request_context.context(); - let info = CollectorInfo { id: agent.id }; + let id = agent.id; + let last_refresh = *agent.last_refresh_time.lock().unwrap(); + let info = CollectorInfo { id, last_refresh }; Ok(HttpResponseOk(info)) } diff --git a/oximeter/collector/src/lib.rs b/oximeter/collector/src/lib.rs index f3c793d5c2..e1926afd31 100644 --- a/oximeter/collector/src/lib.rs +++ b/oximeter/collector/src/lib.rs @@ -31,6 +31,7 @@ use std::net::SocketAddr; use std::net::SocketAddrV6; use std::path::Path; use std::sync::Arc; +use std::time::Duration; use thiserror::Error; use uuid::Uuid; @@ -114,6 +115,11 @@ impl DbConfig { } } +/// Default interval on which we refresh our list of producers from Nexus. 
+pub const fn default_refresh_interval() -> Duration { + Duration::from_secs(60 * 10) +} + /// Configuration used to initialize an oximeter server #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Config { @@ -123,6 +129,11 @@ pub struct Config { #[serde(default, skip_serializing_if = "Option::is_none")] pub nexus_address: Option, + /// The interval on which we periodically refresh our list of producers from + /// Nexus. + #[serde(default = "default_refresh_interval")] + pub refresh_interval: Duration, + /// Configuration for working with ClickHouse pub db: DbConfig, @@ -202,6 +213,7 @@ impl Oximeter { OximeterAgent::with_id( args.id, args.address, + config.refresh_interval, config.db, &resolver, &log, @@ -239,7 +251,10 @@ impl Oximeter { .start(); // Notify Nexus that this oximeter instance is available. - let client = reqwest::Client::new(); + let our_info = nexus_client::types::OximeterInfo { + address: server.local_addr().to_string(), + collector_id: agent.id, + }; let notify_nexus = || async { debug!(log, "contacting nexus"); let nexus_address = if let Some(address) = config.nexus_address { @@ -254,18 +269,25 @@ impl Oximeter { 0, )) }; - - client - .post(format!("http://{}/metrics/collectors", nexus_address,)) - .json(&nexus_client::types::OximeterInfo { - address: server.local_addr().to_string(), - collector_id: agent.id, - }) - .send() - .await - .map_err(|e| backoff::BackoffError::transient(e.to_string()))? - .error_for_status() - .map_err(|e| backoff::BackoffError::transient(e.to_string())) + let client = nexus_client::Client::new( + &format!("http://{nexus_address}"), + log.clone(), + ); + client.cpapi_collectors_post(&our_info).await.map_err(|e| { + match &e { + // Failures to reach nexus, or server errors on its side + // are retryable. Everything else is permanent. 
+ nexus_client::Error::CommunicationError(_) => { + backoff::BackoffError::transient(e.to_string()) + } + nexus_client::Error::ErrorResponse(inner) + if inner.status().is_server_error() => + { + backoff::BackoffError::transient(e.to_string()) + } + _ => backoff::BackoffError::permanent(e.to_string()), + } + }) }; let log_notification_failure = |error, delay| { warn!( @@ -298,6 +320,7 @@ impl Oximeter { OximeterAgent::new_standalone( args.id, args.address, + crate::default_refresh_interval(), db_config, &log, )