Skip to content

Commit

Permalink
Periodically refresh a collector's list of assigned producers
Browse files Browse the repository at this point in the history
- Add an endpoint in Nexus's internal API for listing the assigned
  producers for a collector.
- Spawn a task in the `oximeter-collector` which will periodically fetch
  the list; remove any producers not in that list; and ensure any that
  are.
- Adds the time of this last refresh to the `oximeter-collector`
  server's API for fetching info about the collector
- Remove old use of `reqwest` directly to register collector, opt-in to
  the generated Nexus client.
- Adds some type conversions in the nexus client crate to simplify these
  new interfaces
  • Loading branch information
bnaecker committed Mar 25, 2024
1 parent 0ad5b9a commit b8c720a
Show file tree
Hide file tree
Showing 13 changed files with 439 additions and 121 deletions.
92 changes: 0 additions & 92 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,6 @@ backoff = { version = "0.4.0", features = [ "tokio" ] }
base64 = "0.22.0"
bb8 = "0.8.3"
bcs = "0.1.6"
bincode = "1.3.3"
bootstore = { path = "bootstore" }
bootstrap-agent-client = { path = "clients/bootstrap-agent-client" }
buf-list = { version = "1.0.3", features = ["tokio1"] }
Expand All @@ -190,13 +189,11 @@ clap = { version = "4.5", features = ["cargo", "derive", "env", "wrap_help"] }
colored = "2.1"
cookie = "0.18"
criterion = { version = "0.5.1", features = [ "async_tokio" ] }
crossbeam = "0.8"
crossterm = { version = "0.27.0", features = ["event-stream"] }
crucible-agent-client = { git = "https://github.com/oxidecomputer/crucible", rev = "952c7d60d22be5198b892bec8d92f4291b9160c2" }
crucible-pantry-client = { git = "https://github.com/oxidecomputer/crucible", rev = "952c7d60d22be5198b892bec8d92f4291b9160c2" }
crucible-smf = { git = "https://github.com/oxidecomputer/crucible", rev = "952c7d60d22be5198b892bec8d92f4291b9160c2" }
csv = "1.3.0"
curve25519-dalek = "4"
datatest-stable = "0.2.3"
display-error-chain = "0.2.0"
omicron-ddm-admin-client = { path = "clients/ddm-admin-client" }
Expand Down Expand Up @@ -252,7 +249,6 @@ ipcc = { path = "ipcc" }
ipnet = "2.9"
itertools = "0.12.1"
internet-checksum = "0.2"
ipcc-key-value = { path = "ipcc-key-value" }
ipnetwork = { version = "0.20", features = ["schemars"] }
ispf = { git = "https://github.com/oxidecomputer/ispf" }
key-manager = { path = "key-manager" }
Expand Down Expand Up @@ -292,7 +288,6 @@ num = { version = "0.4.1", default-features = false, features = [ "libm" ] }
omicron-common = { path = "common" }
omicron-gateway = { path = "gateway" }
omicron-nexus = { path = "nexus" }
omicron-package = { path = "package" }
omicron-rpaths = { path = "rpaths" }
omicron-sled-agent = { path = "sled-agent" }
omicron-test-utils = { path = "test-utils" }
Expand All @@ -305,7 +300,6 @@ openapiv3 = "2.0.0"
# must match samael's crate!
openssl = "0.10"
openssl-sys = "0.9"
openssl-probe = "0.1.5"
opte-ioctl = { git = "https://github.com/oxidecomputer/opte", rev = "7ee353a470ea59529ee1b34729681da887aa88ce" }
oso = "0.27"
owo-colors = "4.0.0"
Expand All @@ -316,15 +310,12 @@ oximeter-collector = { path = "oximeter/collector" }
oximeter-instruments = { path = "oximeter/instruments" }
oximeter-macro-impl = { path = "oximeter/oximeter-macro-impl" }
oximeter-producer = { path = "oximeter/producer" }
p256 = "0.13"
parse-display = "0.9.0"
partial-io = { version = "0.5.4", features = ["proptest1", "tokio1"] }
parse-size = "1.0.0"
paste = "1.0.14"
percent-encoding = "2.3.1"
pem = "3.0"
petgraph = "0.6.4"
postgres-protocol = "0.6.6"
predicates = "3.1.0"
pretty_assertions = "1.4.0"
pretty-hex = "0.4.1"
Expand Down Expand Up @@ -359,7 +350,6 @@ schemars = "0.8.16"
secrecy = "0.8.0"
semver = { version = "1.0.22", features = ["std", "serde"] }
serde = { version = "1.0", default-features = false, features = [ "derive", "rc" ] }
serde_derive = "1.0"
serde_human_bytes = { git = "http://github.com/oxidecomputer/serde_human_bytes", branch = "main" }
serde_json = "1.0.114"
serde_path_to_error = "0.1.16"
Expand All @@ -385,11 +375,8 @@ slog-envlogger = "2.2"
slog-error-chain = { git = "https://github.com/oxidecomputer/slog-error-chain", branch = "main", features = ["derive"] }
slog-term = "2.9"
smf = "0.2"
snafu = "0.7"
socket2 = { version = "0.5", features = ["all"] }
sp-sim = { path = "sp-sim" }
sprockets-common = { git = "http://github.com/oxidecomputer/sprockets", rev = "77df31efa5619d0767ffc837ef7468101608aee9" }
sprockets-host = { git = "http://github.com/oxidecomputer/sprockets", rev = "77df31efa5619d0767ffc837ef7468101608aee9" }
sprockets-rot = { git = "http://github.com/oxidecomputer/sprockets", rev = "77df31efa5619d0767ffc837ef7468101608aee9" }
sqlparser = { version = "0.43.1", features = [ "visitor" ] }
static_assertions = "1.1.0"
Expand Down Expand Up @@ -471,23 +458,6 @@ debug = "line-tables-only"
# times, because it allows target and host dependencies to be unified.
debug = "line-tables-only"

# `bindgen` is used by `samael`'s build script; building it with optimizations
# makes that build script run ~5x faster, more than offsetting the additional
# build time added to `bindgen` itself.
[profile.dev.package.bindgen]
opt-level = 3

# `lalrpop` is used by `polar-core`'s build script; building it with
# optimizations makes that build script run ~20x faster, more than offsetting
# the additional build time added to `lalrpop` itself.
[profile.dev.package.lalrpop]
opt-level = 3

# `polar-core` is exercised heavily during the test suite and it's worthwhile to
# have it built with optimizations.
[profile.dev.package.polar-core]
opt-level = 3

# Password hashing is expensive by construction. Build the hashing libraries
# with optimizations to significantly speed up tests.
[profile.dev.package.argon2]
Expand All @@ -514,46 +484,12 @@ opt-level = 3
opt-level = 3
[profile.dev.package.chacha20poly1305]
opt-level = 3
[profile.dev.package.chacha20]
opt-level = 3
[profile.dev.package.vsss-rs]
opt-level = 3
[profile.dev.package.curve25519-dalek]
opt-level = 3
[profile.dev.package.aead]
opt-level = 3
[profile.dev.package.aes]
opt-level = 3
[profile.dev.package.aes-gcm]
opt-level = 3
[profile.dev.package.bcrypt-pbkdf]
opt-level = 3
[profile.dev.package.blake2]
opt-level = 3
[profile.dev.package.blake2b_simd]
opt-level = 3
[profile.dev.package.block-buffer]
opt-level = 3
[profile.dev.package.block-padding]
opt-level = 3
[profile.dev.package.blowfish]
opt-level = 3
[profile.dev.package.constant_time_eq]
opt-level = 3
[profile.dev.package.crypto-bigint]
opt-level = 3
[profile.dev.package.crypto-common]
opt-level = 3
[profile.dev.package.ctr]
opt-level = 3
[profile.dev.package.cbc]
opt-level = 3
[profile.dev.package.digest]
opt-level = 3
[profile.dev.package.ed25519]
opt-level = 3
[profile.dev.package.ed25519-dalek]
opt-level = 3
[profile.dev.package.elliptic-curve]
opt-level = 3
[profile.dev.package.generic-array]
Expand All @@ -562,48 +498,20 @@ opt-level = 3
opt-level = 3
[profile.dev.package.hmac]
opt-level = 3
[profile.dev.package.lpc55_sign]
opt-level = 3
[profile.dev.package.md5]
opt-level = 3
[profile.dev.package.md-5]
opt-level = 3
[profile.dev.package.num-bigint]
opt-level = 3
[profile.dev.package.num-bigint-dig]
opt-level = 3
[profile.dev.package.rand]
opt-level = 3
[profile.dev.package.rand_chacha]
opt-level = 3
[profile.dev.package.rand_core]
opt-level = 3
[profile.dev.package.rand_hc]
opt-level = 3
[profile.dev.package.rand_xorshift]
opt-level = 3
[profile.dev.package.rsa]
opt-level = 3
[profile.dev.package.salty]
opt-level = 3
[profile.dev.package.signature]
opt-level = 3
[profile.dev.package.subtle]
opt-level = 3
[profile.dev.package.tiny-keccak]
opt-level = 3
[profile.dev.package.uuid]
opt-level = 3
[profile.dev.package.cipher]
opt-level = 3
[profile.dev.package.cpufeatures]
opt-level = 3
[profile.dev.package.poly1305]
opt-level = 3
[profile.dev.package.inout]
opt-level = 3
[profile.dev.package.keccak]
opt-level = 3


#
Expand Down
27 changes: 27 additions & 0 deletions clients/nexus-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,3 +382,30 @@ impl From<omicron_common::api::internal::shared::ExternalPortDiscovery>
}
}
}

impl From<types::ProducerKind>
for omicron_common::api::internal::nexus::ProducerKind
{
fn from(kind: types::ProducerKind) -> Self {
use omicron_common::api::internal::nexus::ProducerKind;
match kind {
types::ProducerKind::SledAgent => ProducerKind::SledAgent,
types::ProducerKind::Instance => ProducerKind::Instance,
types::ProducerKind::Service => ProducerKind::Service,
}
}
}

impl From<types::ProducerEndpoint>
for omicron_common::api::internal::nexus::ProducerEndpoint
{
fn from(ep: types::ProducerEndpoint) -> Self {
Self {
id: ep.id,
kind: ep.kind.into(),
address: ep.address.parse().unwrap(),
base_route: ep.base_route,
interval: ep.interval.into(),
}
}
}
5 changes: 5 additions & 0 deletions dev-tools/omdb/src/bin/omdb/oximeter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ impl OximeterArgs {
.with(tabled::settings::Padding::new(0, 1, 0, 0))
.to_string();
println!("Collector ID: {}\n", info.id);
let last_refresh = info
.last_refresh
.map(|r| r.to_string())
.unwrap_or(String::from("Never"));
println!("Last refresh: {}\n", last_refresh);
println!("{table}");
Ok(())
}
Expand Down
17 changes: 16 additions & 1 deletion nexus/db-model/src/producer_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

use std::net::SocketAddr;
use std::time::Duration;

use super::SqlU16;
use crate::impl_enum_type;
use crate::schema::metric_producer;
Expand Down Expand Up @@ -44,7 +47,19 @@ impl From<ProducerKind> for internal::nexus::ProducerKind {
}
}

/// Information announced by a metric server, used so that clients can contact it and collect
impl From<ProducerEndpoint> for internal::nexus::ProducerEndpoint {
fn from(ep: ProducerEndpoint) -> Self {
internal::nexus::ProducerEndpoint {
id: ep.id(),
kind: ep.kind.into(),
address: SocketAddr::new(ep.ip.ip(), *ep.port),
base_route: ep.base_route.clone(),
interval: Duration::from_secs_f64(ep.interval),
}
}
}

/// Informa fromtion announced by a metric server, used so that clients can contact it and collect
/// available metric data from it.
#[derive(Queryable, Insertable, Debug, Clone, Selectable, Asset)]
#[diesel(table_name = metric_producer)]
Expand Down
16 changes: 14 additions & 2 deletions nexus/src/app/oximeter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ use internal_dns::ServiceName;
use nexus_db_queries::db;
use nexus_db_queries::db::identity::Asset;
use omicron_common::address::CLICKHOUSE_PORT;
use omicron_common::api::external::DataPageParams;
use omicron_common::api::external::Error;
use omicron_common::api::external::PaginationOrder;
use omicron_common::api::internal::nexus;
use omicron_common::api::external::{DataPageParams, ListResultVec};
use omicron_common::api::internal::nexus::{self, ProducerEndpoint};
use omicron_common::backoff;
use oximeter_client::Client as OximeterClient;
use oximeter_db::query::Timestamp;
Expand Down Expand Up @@ -146,6 +146,18 @@ impl super::Nexus {
}
}

/// List the producers assigned to an oximeter collector.
pub(crate) async fn list_assigned_producers(
&self,
collector_id: Uuid,
pagparams: &DataPageParams<'_, Uuid>,
) -> ListResultVec<ProducerEndpoint> {
self.db_datastore
.producers_list_by_oximeter_id(collector_id, pagparams)
.await
.map(|list| list.into_iter().map(ProducerEndpoint::from).collect())
}

/// Register as a metric producer with the oximeter metric collection server.
pub(crate) async fn register_as_producer(&self, address: SocketAddr) {
let producer_endpoint = nexus::ProducerEndpoint {
Expand Down
44 changes: 44 additions & 0 deletions nexus/src/internal_api/http_entrypoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ pub(crate) fn internal_api() -> NexusApiDescription {
api.register(cpapi_volume_remove_read_only_parent)?;
api.register(cpapi_disk_remove_read_only_parent)?;
api.register(cpapi_producers_post)?;
api.register(cpapi_assigned_producers_list)?;
api.register(cpapi_collectors_post)?;
api.register(cpapi_metrics_collect)?;
api.register(cpapi_artifact_download)?;
Expand Down Expand Up @@ -466,6 +467,49 @@ async fn cpapi_producers_post(
.await
}

#[derive(
Clone,
Copy,
Debug,
serde::Deserialize,
schemars::JsonSchema,
serde::Serialize,
)]
pub struct CollectorIdPathParams {
/// The ID of the oximeter collector.
pub collector_id: Uuid,
}

/// List all metric producers assigned to an oximeter collector.
#[endpoint {
method = GET,
path = "/metrics/collectors/{collector_id}/producers",
}]
async fn cpapi_assigned_producers_list(
request_context: RequestContext<Arc<ServerContext>>,
path_params: Path<CollectorIdPathParams>,
query_params: Query<PaginatedById>,
) -> Result<HttpResponseOk<ResultsPage<ProducerEndpoint>>, HttpError> {
let context = request_context.context();
let handler = async {
let nexus = &context.nexus;
let collector_id = path_params.into_inner().collector_id;
let query = query_params.into_inner();
let pagparams = data_page_params_for(&request_context, &query)?;
let producers =
nexus.list_assigned_producers(collector_id, &pagparams).await?;
Ok(HttpResponseOk(ScanById::results_page(
&query,
producers,
&|_, producer: &ProducerEndpoint| producer.id,
)?))
};
context
.internal_latencies
.instrument_dropshot_handler(&request_context, handler)
.await
}

/// Accept a notification of a new oximeter collection server.
#[endpoint {
method = POST,
Expand Down
1 change: 1 addition & 0 deletions nexus/test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1411,6 +1411,7 @@ pub async fn start_oximeter(
let config = oximeter_collector::Config {
nexus_address: Some(nexus_address),
db,
refresh_interval: oximeter_collector::default_refresh_interval(),
log: ConfigLogging::StderrTerminal { level: ConfigLoggingLevel::Error },
};
let args = oximeter_collector::OximeterArguments {
Expand Down
Loading

0 comments on commit b8c720a

Please sign in to comment.