Skip to content

Commit

Permalink
[3/n] use update engine for reconfigurator execution (#6399)
Browse files Browse the repository at this point in the history
This starts to introduce the ability for the reconfigurator execution to
be driven by the update engine. Doing so does add some complexity --
there's a new system driving updates -- but has several advantages that
will be coming in future patches:

1. More fine-grained step reporting.
2. Progress reporting (not hooked up yet).

This is really basic so far -- for example, we don't currently report
skipped steps, nor do we model nested steps within the update engine. But
it's a good place to get started, and we can keep enhancing this as time
permits.

One change is that we now return and store a single `anyhow::Error`
rather than a list of errors. If a step returns more than one error, we
use a hack to combine the cause chains into a single message.

To serialize errors, we use the `NestedError` that's currently in
`update-engine`. One benefit is that we now record the entire chain of
error sources.

Within omdb, I also switched the blueprint executor display (and nothing
else so far) to use tabled for easy alignment. This is what it looks
like now:

```
task: "blueprint_executor"
  configured period: every 1m
  currently executing: no
  last completed activation: iter 110, triggered by a periodic timer firing
    started at 2024-08-27T05:36:57.748Z (7s ago) and ran for 227ms
    target blueprint:  70004e05-fc43-4534-ba51-ab0239b13f63
    execution:         enabled
    status:            failed at: Ensure dataset records (step 6/13)
    error:             step failed: Ensure dataset records
      caused by:       failed to insert dataset record for dataset 9d128b27-c9e9-4acf-b40b-44cbfbb0e2ba
      caused by:       Object (of type ById(6549a5fa-3f2e-4132-b2a1-56caf9028064)) not found: zpool
```
  • Loading branch information
sunshowers authored Sep 7, 2024
1 parent 3d3f6d7 commit b45ec6d
Show file tree
Hide file tree
Showing 22 changed files with 1,253 additions and 151 deletions.
14 changes: 13 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ illumos-utils = { path = "illumos-utils" }
indent_write = "2.2.0"
indexmap = "2.4.0"
indicatif = { version = "0.17.8", features = ["rayon"] }
indoc = "2.0.5"
installinator = { path = "installinator" }
installinator-api = { path = "installinator-api" }
installinator-client = { path = "clients/installinator-client" }
Expand Down
1 change: 1 addition & 0 deletions dev-tools/omdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ tabled.workspace = true
textwrap.workspace = true
tokio = { workspace = true, features = [ "full" ] }
unicode-width.workspace = true
update-engine.workspace = true
url.workspace = true
uuid.workspace = true
ipnetwork.workspace = true
Expand Down
248 changes: 236 additions & 12 deletions dev-tools/omdb/src/bin/omdb/nexus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,19 @@ use slog_error_chain::InlineErrorChain;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::str::FromStr;
use tabled::settings::object::Columns;
use tabled::settings::Padding;
use tabled::Tabled;
use tokio::sync::OnceCell;
use update_engine::display::ProgressRatioDisplay;
use update_engine::events::EventReport;
use update_engine::events::StepOutcome;
use update_engine::EventBuffer;
use update_engine::ExecutionStatus;
use update_engine::ExecutionTerminalInfo;
use update_engine::NestedError;
use update_engine::NestedSpec;
use update_engine::TerminalKind;
use uuid::Uuid;

/// Arguments to the "omdb nexus" subcommand
Expand Down Expand Up @@ -1595,30 +1606,68 @@ fn print_task_details(bgtask: &BackgroundTask, details: &serde_json::Value) {
}
}
} else if name == "blueprint_executor" {
let mut value = details.clone();
// Extract and remove the event report. (If we don't do this, the
// `Debug` output can be quite large.)
//
// TODO: show more of the event buffer.
let event_buffer = extract_event_buffer(&mut value);

#[derive(Deserialize)]
struct BlueprintExecutorStatus {
target_id: Uuid,
enabled: bool,
errors: Option<Vec<String>>,
execution_error: Option<NestedError>,
}

match serde_json::from_value::<BlueprintExecutorStatus>(details.clone())
{
match serde_json::from_value::<BlueprintExecutorStatus>(value) {
Err(error) => eprintln!(
"warning: failed to interpret task details: {:?}: {:?}",
error, details
),
Ok(status) => {
println!(" target blueprint: {}", status.target_id);
println!(
" execution: {}",
if status.enabled { "enabled" } else { "disabled" }
);
let errors = status.errors.as_deref().unwrap_or(&[]);
println!(" errors: {}", errors.len());
for (i, e) in errors.iter().enumerate() {
println!(" error {}: {}", i, e);
// TODO: switch the other outputs to tabled as well.
let mut builder = tabled::builder::Builder::default();
builder.push_record([
"target blueprint:".to_string(),
status.target_id.to_string(),
]);
builder.push_record([
"execution:".to_string(),
if status.enabled {
"enabled".to_string()
} else {
"disabled".to_string()
},
]);

push_event_buffer_summary(event_buffer, &mut builder);

match status.execution_error {
Some(error) => {
builder.push_record([
"error:".to_string(),
error.to_string(),
]);

for source in error.sources() {
builder.push_record([
" caused by:".to_string(),
source.to_string(),
]);
}
}
None => {
builder.push_record([
"error:".to_string(),
"(none)".to_string(),
]);
}
}

let mut table = builder.build();
bgtask_apply_kv_style(&mut table);
println!("{}", table);
}
}
} else if name == "region_snapshot_replacement_finish" {
Expand Down Expand Up @@ -1665,6 +1714,181 @@ fn reason_str(reason: &ActivationReason) -> &'static str {
}
}

/// Applies the key/value layout used for background task details to `table`.
///
/// The table is first given the empty style, and then the first column is
/// given a padding of `(4, 0, 0, 0)` so that the keys line up with the rest of
/// the background task output.
fn bgtask_apply_kv_style(table: &mut tabled::Table) {
    // Background task tables are offset by 4 characters.
    let key_column_offset =
        tabled::settings::Modify::new(Columns::first())
            .with(Padding::new(4, 0, 0, 0));

    // Keep the original order: style first, then the per-column padding.
    table.with(tabled::settings::Style::empty());
    table.with(key_column_offset);
}

/// Removes the `event_report` entry from `value` and turns it into an
/// [`EventBuffer`].
///
/// Taking the report out of `value` keeps any later `Debug` output of the
/// remaining details from becoming very large.
///
/// Returns `Ok(None)` when `value` is an object without an `event_report`
/// key.
///
/// # Errors
///
/// Returns an error if `value` is not a JSON object, if the stored report
/// cannot be deserialized, or if the stored report is itself an `Err`.
fn extract_event_buffer(
    value: &mut serde_json::Value,
) -> anyhow::Result<Option<EventBuffer<NestedSpec>>> {
    let fields = match value.as_object_mut() {
        Some(fields) => fields,
        None => bail!("expected value to be an object"),
    };
    let raw_report = match fields.remove("event_report") {
        Some(raw_report) => raw_report,
        None => return Ok(None),
    };

    // Deserialize against the generic `NestedSpec`. A more explicit spec
    // (e.g. `ReconfiguratorExecutionSpec`) would also work, but omdb does not
    // need that level of detail.
    let stored: Result<EventReport<NestedSpec>, NestedError> =
        serde_json::from_value(raw_report)
            .context("failed to deserialize event report")?;
    let report = stored.context(
        "event report stored as Err rather than Ok (did receiver task panic?)",
    )?;

    let mut buffer = EventBuffer::default();
    buffer.add_event_report(report);
    Ok(Some(buffer))
}

/// Adds a short summary of an execution's current state to `builder`, based
/// on the result of extracting an event buffer.
///
/// * `Ok(Some(buffer))`: the rows are produced by
///   `event_buffer_summary_impl`.
/// * `Ok(None)`: a single `status:` row notes that no event report was found.
/// * `Err(error)`: an `event report error:` row shows the outermost message,
///   followed by one `caused by:` row for each underlying source.
fn push_event_buffer_summary(
    event_buffer: anyhow::Result<Option<EventBuffer<NestedSpec>>>,
    builder: &mut tabled::builder::Builder,
) {
    match event_buffer {
        Ok(Some(buffer)) => {
            event_buffer_summary_impl(buffer, builder);
        }
        Ok(None) => {
            builder.push_record(["status:", "(no event report found)"]);
        }
        Err(error) => {
            builder.push_record([
                "event report error:".to_string(),
                error.to_string(),
            ]);
            // `anyhow::Error::chain` yields the error itself first. That
            // message was already printed in the row above, so skip it and
            // list only the underlying sources.
            for source in error.chain().skip(1) {
                builder.push_record([
                    " caused by:".to_string(),
                    source.to_string(),
                ]);
            }
        }
    }
}

/// Pushes a `status:` row describing the root execution recorded in `buffer`,
/// followed by one `warning:` row for every step whose outcome was a warning.
///
/// If the buffer has no root execution summary, a single placeholder
/// `status:` row is pushed and the function returns without looking for
/// warnings.
///
/// # Panics
///
/// Panics if the summary reports a running step whose key cannot be found in
/// `buffer`.
fn event_buffer_summary_impl(
    buffer: EventBuffer<NestedSpec>,
    builder: &mut tabled::builder::Builder,
) {
    let summary = match buffer.root_execution_summary() {
        Some(summary) => summary,
        None => {
            builder.push_record(["status:", "(no information found)"]);
            return;
        }
    };

    match summary.execution_status {
        ExecutionStatus::NotStarted => {
            builder.push_record(["status:", "not started"]);
        }
        ExecutionStatus::Running { step_key, .. } => {
            let running_step = buffer.get(&step_key).expect("step exists");
            let status = format!(
                "running: {} (step {})",
                running_step.step_info().description,
                ProgressRatioDisplay::index_and_total(
                    step_key.index,
                    summary.total_steps,
                ),
            );
            builder.push_record(["status:".to_string(), status]);
        }
        ExecutionStatus::Terminal(info) => {
            push_event_buffer_terminal_info(
                &info,
                summary.total_steps,
                &buffer,
                builder,
            );
        }
    }

    // Also look for warnings.
    for (_, step_data) in buffer.iter_steps_recursive() {
        let Some(reason) = step_data.step_status().completion_reason() else {
            continue;
        };
        let Some(info) = reason.step_completed_info() else {
            continue;
        };
        let StepOutcome::Warning { message, .. } = &info.outcome else {
            continue;
        };

        // This can be a nested step, so don't print out the index.
        let warning = format!(
            "at: {}: {}",
            step_data.step_info().description,
            message
        );
        builder.push_record(["warning:".to_string(), warning]);
    }
}

/// Pushes the `status:` row (and, for aborted executions, a `reason:` row)
/// for an execution that has reached a terminal state.
///
/// * Completed: reports the total number of steps.
/// * Failed: reports the step at which execution failed. The error itself is
///   not shown here because it is already displayed in another field.
/// * Aborted: reports the step at which execution was aborted, followed by
///   the abort reason, or a placeholder if none is recorded.
///
/// # Panics
///
/// Panics if the step named by `info.step_key` cannot be found in `buffer`.
fn push_event_buffer_terminal_info(
    info: &ExecutionTerminalInfo,
    total_steps: usize,
    buffer: &EventBuffer<NestedSpec>,
    builder: &mut tabled::builder::Builder,
) {
    // Looked up unconditionally, even though the completed case does not use
    // it, so a missing step is reported the same way for every terminal kind.
    let last_step = buffer.get(&info.step_key).expect("step exists");

    match info.kind {
        TerminalKind::Completed => {
            builder.push_record([
                "status:".to_string(),
                format!("completed ({} steps)", total_steps),
            ]);
        }
        TerminalKind::Failed => {
            // Don't show the error here, because it's duplicated in another
            // field that's already shown.
            builder.push_record([
                "status:".to_string(),
                format!(
                    "failed at: {} (step {})",
                    last_step.step_info().description,
                    ProgressRatioDisplay::index_and_total(
                        info.step_key.index,
                        total_steps,
                    )
                ),
            ]);
        }
        TerminalKind::Aborted => {
            builder.push_record([
                "status:".to_string(),
                format!(
                    "aborted at: {} (step {})",
                    last_step.step_info().description,
                    ProgressRatioDisplay::index_and_total(
                        info.step_key.index,
                        total_steps,
                    )
                ),
            ]);

            let reason_text = match last_step.step_status().abort_reason() {
                Some(reason) => reason.message_display(&buffer).to_string(),
                None => "(no reason found)".to_string(),
            };
            builder.push_record([" reason:".to_string(), reason_text]);
            // TODO: show last progress event
        }
    }
}

/// Used for printing background task status as a table
#[derive(Tabled)]
struct BackgroundTaskStatusRow {
Expand Down
4 changes: 1 addition & 3 deletions installinator/src/reporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ where
update_id: Uuid,
discover_fn: F,
) -> (Self, mpsc::Sender<Event>) {
// Set a large enough buffer that it filling up isn't an actual problem
// outside of something going horribly wrong.
let (event_sender, event_receiver) = mpsc::channel(512);
let (event_sender, event_receiver) = update_engine::channel();
let ret = Self {
log: log.new(slog::o!("component" => "EventReporter")),
update_id,
Expand Down
2 changes: 1 addition & 1 deletion installinator/src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1075,7 +1075,7 @@ mod tests {
(SharedTransport(Arc::clone(&inner)), SharedTransport(inner))
};

let (event_sender, event_receiver) = tokio::sync::mpsc::channel(512);
let (event_sender, event_receiver) = update_engine::channel();

let receiver_handle = tokio::spawn(async move {
ReceiverStream::new(event_receiver).collect::<Vec<_>>().await
Expand Down
1 change: 1 addition & 0 deletions nexus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ oximeter-producer.workspace = true
rustls = { workspace = true }
rustls-pemfile = { workspace = true }
update-common.workspace = true
update-engine.workspace = true
omicron-workspace-hack.workspace = true
omicron-uuid-kinds.workspace = true

Expand Down
Loading

0 comments on commit b45ec6d

Please sign in to comment.