Skip to content

Commit

Permalink
Add retry loop to deleting timeseries by name (#6504)
Browse files Browse the repository at this point in the history
We currently support deleting timeseries by name in a schema upgrade.
This deletion is implemented as a mutation in ClickHouse, which walks
all affected data parts and deletes the relevant records in a merge
operation. That's asynchronous by default, and runs in a pool of
background tasks. Despite that, with large tables, it can take a while
for each mutation to complete, which blocks the server from queueing new
deletion requests. This can lead to timeouts, like those seen in #6501.

This should fix #6501, but I'm not certain of that because I don't have
a good way to reproduce the bug. It seems likely that this is only seen
when the database is already heavily loaded, as it might be when doing
these mutations on large tables.
  • Loading branch information
bnaecker authored Sep 9, 2024
1 parent a3917ea commit 63ec6f3
Showing 1 changed file with 58 additions and 11 deletions.
69 changes: 58 additions & 11 deletions oximeter/db/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use dropshot::EmptyScanParams;
use dropshot::PaginationOrder;
use dropshot::ResultsPage;
use dropshot::WhichPage;
use omicron_common::backoff;
use oximeter::schema::TimeseriesKey;
use oximeter::types::Sample;
use oximeter::TimeseriesName;
Expand Down Expand Up @@ -627,12 +628,20 @@ impl Client {
upgrade_files
.insert(name.to_string(), (path.to_owned(), contents));
} else {
warn!(
log,
"skipping non-SQL schema dir entry";
"dir" => version_schema_dir.display(),
"path" => path.display(),
);
// Warn on unexpected files, _except_ the
// timeseries-to-delete.txt files we use to expunge timeseries.
//
// The predicate must be `!=`: we want the warning for every entry that
// is NOT the expected timeseries-to-delete file. A path with no final
// component (`file_name()` returns `None`) is also unexpected, so it
// falls through to the warning as well.
if path
    .file_name()
    .map(|name| name != crate::TIMESERIES_TO_DELETE_FILE)
    .unwrap_or(true)
{
    warn!(
        log,
        "skipping non-SQL schema dir entry";
        "dir" => version_schema_dir.display(),
        "path" => path.display(),
    );
}
continue;
}
}
Expand Down Expand Up @@ -1008,10 +1017,46 @@ impl Client {

/// Given a list of timeseries by name, delete their schema and any
/// associated data records from all tables.
///
/// If the database isn't available or the request times out, this method
/// will continue to retry the operation until it succeeds.
async fn expunge_timeseries_by_name(
&self,
replicated: bool,
to_delete: &[TimeseriesName],
) -> Result<(), Error> {
let op = || async {
self.expunge_timeseries_by_name_once(replicated, to_delete)
.await
.map_err(|err| match err {
Error::DatabaseUnavailable(_) => {
backoff::BackoffError::transient(err)
}
_ => backoff::BackoffError::permanent(err),
})
};
let notify = |error, count, delay| {
warn!(
self.log,
"failed to delete some timeseries";
"error" => ?error,
"call_count" => count,
"retry_after" => ?delay,
);
};
backoff::retry_notify_ext(
backoff::retry_policy_internal_service(),
op,
notify,
)
.await
}

/// Attempt to delete the named timeseries once.
async fn expunge_timeseries_by_name_once(
&self,
replicated: bool,
to_delete: &[TimeseriesName],
) -> Result<(), Error> {
// The version table should not have any matching data, but let's avoid
// it entirely anyway.
Expand Down Expand Up @@ -4685,7 +4730,7 @@ mod tests {
// timeseries share some field types and have others that are distinct
// between them, so that we can test that we don't touch tables we
// shouldn't, and only delete the parts we should.
let samples = generate_expunge_timeseries_samples();
let samples = generate_expunge_timeseries_samples(4);
client
.insert_samples(&samples)
.await
Expand Down Expand Up @@ -4805,7 +4850,9 @@ mod tests {
}
}

fn generate_expunge_timeseries_samples() -> Vec<Sample> {
fn generate_expunge_timeseries_samples(
n_samples_per_timeseries: u64,
) -> Vec<Sample> {
#[derive(oximeter::Target)]
struct FirstTarget {
first_field: String,
Expand Down Expand Up @@ -4833,12 +4880,12 @@ mod tests {
};
let mut m = SharedMetric { datum: 0 };

let mut out = Vec::with_capacity(8);
for i in 0..4 {
let mut out = Vec::with_capacity(2 * n_samples_per_timeseries as usize);
for i in 0..n_samples_per_timeseries {
m.datum = i;
out.push(Sample::new(&ft, &m).unwrap());
}
for i in 4..8 {
for i in n_samples_per_timeseries..(2 * n_samples_per_timeseries) {
m.datum = i;
out.push(Sample::new(&st, &m).unwrap());
}
Expand Down

0 comments on commit 63ec6f3

Please sign in to comment.