WIP: pruning (seems to require table scan, not sure that it should, not worth debugging)
davepacheco committed Oct 13, 2023
1 parent 5372a10 commit 49312b7
Showing 9 changed files with 326 additions and 5 deletions.
12 changes: 12 additions & 0 deletions common/src/nexus_config.rs
@@ -351,8 +351,18 @@ pub struct ExternalEndpointsConfig {
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct InventoryConfig {
/// period (in seconds) for periodic activations of this background task
///
/// Each activation fetches information about all hardware and software in
/// the system and inserts it into the database. This generates a moderate
/// amount of data.
#[serde_as(as = "DurationSeconds<u64>")]
pub period_secs: Duration,

/// maximum number of past collections to keep in the database
///
/// This is a very coarse mechanism to keep the system from overwhelming
/// itself with inventory data.
pub nkeep: u32,
}

/// Configuration for a nexus server
@@ -690,6 +700,7 @@ mod test {
},
inventory: InventoryConfig {
period_secs: Duration::from_secs(10),
nkeep: 3,
}
},
default_region_allocation_strategy:
@@ -744,6 +755,7 @@
dns_external.max_concurrent_server_updates = 8
external_endpoints.period_secs = 9
inventory.period_secs = 10
inventory.nkeep = 3
[default_region_allocation_strategy]
type = "random"
"##,
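For reference, here is a minimal sketch of how the two `InventoryConfig` fields above look in a Nexus config file. The values mirror `nexus/examples/config.toml` later in this change; the enclosing table header is omitted because it is not shown in this excerpt.

```toml
# How frequently (in seconds) to collect hardware/software inventory from the
# whole system, even if nothing is believed to have changed.
inventory.period_secs = 600
# Maximum number of past collections to keep in the database (a coarse guard
# against unbounded growth of inventory data).
inventory.nkeep = 5
```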
1 change: 1 addition & 0 deletions nexus/db-model/src/schema.rs
@@ -1225,6 +1225,7 @@ joinable!(system_update_component_update -> component_update (component_update_i
allow_tables_to_appear_in_same_query!(ip_pool_range, ip_pool);
joinable!(ip_pool_range -> ip_pool (ip_pool_id));

allow_tables_to_appear_in_same_query!(inv_collection, inv_collection_error);
allow_tables_to_appear_in_same_query!(sw_caboose, inv_caboose);

allow_tables_to_appear_in_same_query!(
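The new `allow_tables_to_appear_in_same_query!` line is what lets the pruning code reference both tables in a single statement. A condensed sketch of the pattern it enables, taken from `inventory_find_pruneable` in the next file (it assumes the same `db::schema` imports used there):

```rust
use db::schema::inv_collection::dsl as cdsl;
use db::schema::inv_collection_error::dsl as edsl;

// Correlated subquery: for a given inv_collection row, find any error rows
// recorded against that collection.
let errors_subquery = edsl::inv_collection_error
    .select(edsl::inv_collection_id)
    .filter(edsl::inv_collection_id.eq(cdsl::id));

// Outer query: completed collections with no associated errors. Referencing
// both tables in one query is what the macro above permits.
let complete_collections = cdsl::inv_collection
    .select(cdsl::id)
    .filter(cdsl::time_done.is_not_null())
    .filter(diesel::dsl::not(diesel::dsl::exists(errors_subquery)));
```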
287 changes: 286 additions & 1 deletion nexus/db-queries/src/db/datastore/inventory.rs
@@ -11,6 +11,7 @@ use crate::db::error::ErrorHandler;
use crate::db::TransactionError;
use async_bb8_diesel::AsyncConnection;
use async_bb8_diesel::AsyncRunQueryDsl;
use async_bb8_diesel::OptionalExtension;
use chrono::DateTime;
use chrono::Utc;
use diesel::sql_types;
@@ -36,6 +37,7 @@ use nexus_types::inventory::BaseboardId;
use nexus_types::inventory::CabooseFound;
use nexus_types::inventory::Collection;
use omicron_common::api::external::Error;
use omicron_common::api::external::InternalContext;
use uuid::Uuid;

impl DataStore {
@@ -350,7 +352,290 @@ impl DataStore {
TransactionError::Connection(e) => {
public_error_from_diesel(e, ErrorHandler::Server)
}
})
})?;

info!(
&opctx.log,
"inserted inventory collection";
"collection_id" => collection.id.to_string(),
);

Ok(())
}

// XXX-dap TODO-doc
// This seems like a high-level function, but we want to push some of the
// logic into the SQL.
pub async fn inventory_prune_collections(
&self,
opctx: &OpContext,
nkeep: u32,
// XXX-dap can we not just update the log inside the opctx?
log: &slog::Logger,
) -> Result<(), Error> {
// There could be any number of collections in the database: 0, 1, ...,
// nkeep, nkeep + 1, ..., up to a very large number. Some of these are
// potentially incomplete; we use a non-zero error count as a proxy for
// that. We never want to remove the last successful collection, even if
// there have been many more recent (incomplete) collections.
//
// Here's how we'll do it:
//
// - From the database, select the latest-started collection that has no
// errors associated with it. We're going to hang onto this one.
// - Select the oldest `nkeep + 1` collections that *aren't* the one we
// want to hang onto.
// - If more than `nkeep` collections come back, remove the oldest one in
// the set.
// - Repeat until no more than `nkeep` collections come back.
//
// This admittedly feels overcomplicated. If we were willing to start
// by pruning the *latest* collection eligible for removal, we could
// simply select the most recent collections and use `OFFSET` to skip
// the last "nkeep". But if for some reason we're falling behind, it
// seems much better to prune from the oldest first.
//
// Is this going to work if multiple Nexuses are doing it concurrently?
// This cannot remove the last complete collection because a given Nexus
// will only remove a complete collection if it has seen a newer
// complete one. This cannot result in keeping fewer than "nkeep"
// collections because any Nexus will only remove a collection if there
// are "nkeep" newer ones. In both of these cases, another Nexus might
// remove one of the ones that the first Nexus was counting on keeping,
// but only if there was a newer one to replace it.

opctx.authorize(authz::Action::Modify, &authz::INVENTORY).await?;

loop {
match self.inventory_find_pruneable(opctx, nkeep, log).await? {
None => break,
Some(collection_id) => {
self.inventory_delete_collection(opctx, log, collection_id)
.await?
}
}
}

Ok(())
}

async fn inventory_find_pruneable(
&self,
opctx: &OpContext,
nkeep: u32,
log: &slog::Logger,
) -> Result<Option<Uuid>, Error> {
// The caller of this (non-pub) function is responsible for authz.

use db::schema::inv_collection::dsl as cdsl;
use db::schema::inv_collection_error::dsl as edsl;

let conn = self.pool_connection_authorized(opctx).await?;
let errors_subquery = edsl::inv_collection_error
.select(edsl::inv_collection_id)
.filter(edsl::inv_collection_id.eq(cdsl::id));

let latest_complete: Option<Uuid> = cdsl::inv_collection
.select(cdsl::id)
.filter(cdsl::time_done.is_not_null())
.filter(diesel::dsl::not(diesel::dsl::exists(errors_subquery)))
.order_by(cdsl::time_started.desc())
.limit(1)
.first_async(&*conn)
.await
.optional()
.map_err(|e| {
public_error_from_diesel(e.into(), ErrorHandler::Server)
})
.internal_context("finding latest successful collection")?;

debug!(
log,
"inventory_prune_one: latest";
"latest_complete" => ?latest_complete
);

// If we ever start truly supporting inserting collections that are not
// yet finished, we'll have to figure out how to tell if somebody's
// still trying to update them when we go to clean them.
// XXX-dap should we just make that illegal now?
let mut candidates_query = cdsl::inv_collection
.select(cdsl::id)
.filter(cdsl::time_done.is_not_null())
.into_boxed();

if let Some(latest) = latest_complete {
candidates_query = candidates_query.filter(cdsl::id.ne(latest));
}

let candidates: Vec<Uuid> = candidates_query
.order_by(cdsl::time_started.asc())
.limit(i64::from(nkeep) + 1)
.load_async(&*conn)
.await
.map_err(|e| {
public_error_from_diesel(e.into(), ErrorHandler::Server)
})
.internal_context("finding oldest collections")?;

if u32::try_from(candidates.len()).unwrap() <= nkeep {
debug!(log, "inventory_prune_one: found nothing to prune");
return Ok(None);
}

let oldest = candidates[0];
debug!(
log,
"inventory_prune_one: eligible for removal";
"collection_id" => oldest.to_string()
);

Ok(Some(oldest))
}

async fn inventory_delete_collection(
&self,
opctx: &OpContext,
log: &slog::Logger,
collection_id: Uuid,
) -> Result<(), Error> {
// The caller of this (non-pub) function is responsible for authz.

// We do this in a transaction for simplicity. If this transaction got
// too big, we could break it up, but we'd need a way to update the
// collection to say it's being deleted. (We can't delete it first
// because then we'd have no efficient way to identify records in the
// other tables that need to be deleted.)
let conn = self.pool_connection_authorized(opctx).await?;
let (ncollections, nsps, nrots, ncabooses, nerrors) = conn
.transaction_async(|conn| async move {
// Remove the record describing the collection itself.
let ncollections = {
use db::schema::inv_collection::dsl;
diesel::delete(
dsl::inv_collection.filter(dsl::id.eq(collection_id)),
)
.execute_async(&conn)
.await
.map_err(|e| {
TransactionError::CustomError(
public_error_from_diesel(
e.into(),
ErrorHandler::Server,
)
.internal_context(
"removing inventory collection record",
),
)
})?
};

// Remove rows for service processors.
let nsps = {
use db::schema::inv_service_processor::dsl;
diesel::delete(
dsl::inv_service_processor
.filter(dsl::inv_collection_id.eq(collection_id)),
)
.execute_async(&conn)
.await
.map_err(|e| {
TransactionError::CustomError(
public_error_from_diesel(
e.into(),
ErrorHandler::Server,
)
.internal_context(
"removing service processor records",
),
)
})?
};

// Remove rows for roots of trust.
let nrots = {
use db::schema::inv_root_of_trust::dsl;
diesel::delete(
dsl::inv_root_of_trust
.filter(dsl::inv_collection_id.eq(collection_id)),
)
.execute_async(&conn)
.await
.map_err(|e| {
TransactionError::CustomError(
public_error_from_diesel(
e.into(),
ErrorHandler::Server,
)
.internal_context("removing root of trust records"),
)
})?
};

// Remove rows for cabooses found.
let ncabooses = {
use db::schema::inv_caboose::dsl;
diesel::delete(
dsl::inv_caboose
.filter(dsl::inv_collection_id.eq(collection_id)),
)
.execute_async(&conn)
.await
.map_err(|e| {
TransactionError::CustomError(
public_error_from_diesel(
e.into(),
ErrorHandler::Server,
)
.internal_context(
"removing cabooses-found records",
),
)
})?
};

// Remove rows for errors encountered.
let nerrors = {
use db::schema::inv_collection_error::dsl;
diesel::delete(
dsl::inv_collection_error
.filter(dsl::inv_collection_id.eq(collection_id)),
)
.execute_async(&conn)
.await
.map_err(|e| {
// XXX-dap do we really have to do this mapping in every
// one of these?
TransactionError::CustomError(
public_error_from_diesel(
e.into(),
ErrorHandler::Server,
)
.internal_context("removing error records"),
)
})?
};

Ok((ncollections, nsps, nrots, ncabooses, nerrors))
})
.await
.map_err(|error| match error {
TransactionError::CustomError(e) => e,
TransactionError::Connection(e) => {
public_error_from_diesel(e, ErrorHandler::Server)
}
})?;

info!(log, "removed inventory collection";
"collection_id" => collection_id.to_string(),
"ncollections" => ncollections,
"nsps" => nsps,
"nrots" => nrots,
"ncabooses" => ncabooses,
"nerrors" => nerrors,
);

Ok(())
}
}

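The public entry point added here is `inventory_prune_collections`. A minimal, hypothetical caller sketch (the names `datastore`, `opctx`, and `nkeep` are assumed to be in scope; the real caller would be the inventory background task wired up in `init.rs` below):

```rust
// Prune completed collections down to at most `nkeep`, always retaining the
// latest error-free collection. Assumes an async context whose error type is
// omicron_common::api::external::Error.
datastore
    .inventory_prune_collections(&opctx, nkeep, &opctx.log)
    .await?;
```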
2 changes: 2 additions & 0 deletions nexus/examples/config.toml
@@ -95,6 +95,8 @@ external_endpoints.period_secs = 60
# How frequently to collect hardware/software inventory from the whole system
# (even if we don't have reason to believe anything has changed).
inventory.period_secs = 600
# Maximum number of past collections to keep in the database
inventory.nkeep = 5

[default_region_allocation_strategy]
# allocate region on 3 random distinct zpools, on 3 random distinct sleds.
1 change: 1 addition & 0 deletions nexus/src/app/background/init.rs
@@ -102,6 +102,7 @@ impl BackgroundTasks {
datastore,
resolver,
&nexus_id.to_string(),
config.inventory.nkeep,
);
let task = driver.register(
String::from("inventory_collection"),
