diff --git a/Cargo.lock b/Cargo.lock index e5130b6b33..f457d10ddf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9896,6 +9896,7 @@ dependencies = [ "installinator-artifact-client", "installinator-artifactd", "installinator-common", + "itertools 0.11.0", "omicron-certificates", "omicron-common 0.1.0", "omicron-passwords 0.1.0", diff --git a/openapi/wicketd.json b/openapi/wicketd.json index 40d798da00..d67fc79f7a 100644 --- a/openapi/wicketd.json +++ b/openapi/wicketd.json @@ -598,6 +598,33 @@ } } }, + "/update": { + "post": { + "summary": "An endpoint to start updating one or more sleds, switches and PSCs.", + "operationId": "post_start_update", + "requestBody": { + "content": { + "application/json": { + "schema": { + "$ref": "#/components/schemas/StartUpdateParams" + } + } + }, + "required": true + }, + "responses": { + "204": { + "description": "resource updated" + }, + "4XX": { + "$ref": "#/components/responses/Error" + }, + "5XX": { + "$ref": "#/components/responses/Error" + } + } + } + }, "/update/{type}/{slot}": { "get": { "summary": "An endpoint to get the status of any update being performed or recently", @@ -641,51 +668,6 @@ "$ref": "#/components/responses/Error" } } - }, - "post": { - "summary": "An endpoint to start updating a sled.", - "operationId": "post_start_update", - "parameters": [ - { - "in": "path", - "name": "slot", - "required": true, - "schema": { - "type": "integer", - "format": "uint32", - "minimum": 0 - } - }, - { - "in": "path", - "name": "type", - "required": true, - "schema": { - "$ref": "#/components/schemas/SpType" - } - } - ], - "requestBody": { - "content": { - "application/json": { - "schema": { - "$ref": "#/components/schemas/StartUpdateOptions" - } - } - }, - "required": true - }, - "responses": { - "204": { - "description": "resource updated" - }, - "4XX": { - "$ref": "#/components/responses/Error" - }, - "5XX": { - "$ref": "#/components/responses/Error" - } - } } } }, @@ -2761,6 +2743,31 @@ "skip_sp_version_check" ] }, + "StartUpdateParams": { + "type": "object", + "properties": { + "options": { + "description": "Options for the update.", + "allOf": [ + { + "$ref": "#/components/schemas/StartUpdateOptions" + } + ] + }, + "targets": { + "description": "The SP identifiers to start the update with. Must be non-empty.", + "type": "array", + "items": { + "$ref": "#/components/schemas/SpIdentifier" + }, + "uniqueItems": true + } + }, + "required": [ + "options", + "targets" + ] + }, "StepComponentSummaryForGenericSpec": { "type": "object", "properties": { diff --git a/update-engine/src/buffer.rs b/update-engine/src/buffer.rs index 3de0e45f24..6e12604d88 100644 --- a/update-engine/src/buffer.rs +++ b/update-engine/src/buffer.rs @@ -6,6 +6,7 @@ use std::{ collections::{HashMap, VecDeque}, + fmt, time::Duration, }; @@ -96,7 +97,7 @@ impl EventBuffer { /// /// This might cause older low-priority events to fall off the list. 
pub fn add_step_event(&mut self, event: StepEvent) { - self.event_store.handle_step_event(event, self.max_low_priority); + self.event_store.handle_root_step_event(event, self.max_low_priority); } /// Returns the root execution ID, if this event buffer is aware of any @@ -132,7 +133,8 @@ impl EventBuffer { let mut step_events = Vec::new(); let mut progress_events = Vec::new(); for (_, step_data) in self.steps().as_slice() { - step_events.extend(step_data.step_events_since(last_seen).cloned()); + step_events + .extend(step_data.step_events_since_impl(last_seen).cloned()); progress_events .extend(step_data.step_status.progress_event().cloned()); } @@ -161,7 +163,7 @@ impl EventBuffer { /// have been reported before a sender shuts down. pub fn has_pending_events_since(&self, last_seen: Option) -> bool { for (_, step_data) in self.steps().as_slice() { - if step_data.step_events_since(last_seen).next().is_some() { + if step_data.step_events_since_impl(last_seen).next().is_some() { return true; } } @@ -223,8 +225,8 @@ impl EventStore { }) } - /// Handles a step event. - fn handle_step_event( + /// Handles a non-nested step event. + fn handle_root_step_event( &mut self, event: StepEvent, max_low_priority: usize, @@ -234,12 +236,17 @@ impl EventStore { return; } + // This is a non-nested step event so the event index is a root event + // index. + let root_event_index = RootEventIndex(event.event_index); + let actions = - self.recurse_for_step_event(&event, 0, None, event.event_index); + self.recurse_for_step_event(&event, 0, None, root_event_index); if let Some(new_execution) = actions.new_execution { if new_execution.nest_level == 0 { self.root_execution_id = Some(new_execution.execution_id); } + let total_steps = new_execution.steps_to_add.len(); for (new_step_key, new_step, sort_key) in new_execution.steps_to_add { // These are brand new steps so their keys shouldn't exist in the @@ -249,6 +256,8 @@ impl EventStore { new_step, sort_key, new_execution.nest_level, + total_steps, + root_event_index, ) }); } @@ -302,7 +311,7 @@ impl EventStore { event: &StepEvent, nest_level: usize, parent_sort_key: Option<&StepSortKey>, - root_event_index: usize, + root_event_index: RootEventIndex, ) -> RecurseActions { let mut new_execution = None; let (step_key, progress_key) = match &event.kind { @@ -318,7 +327,7 @@ impl EventStore { }; let sort_key = StepSortKey::new( parent_sort_key, - root_event_index, + root_event_index.0, step.index, ); let step_node = self.add_step_node(step_key); @@ -360,7 +369,7 @@ impl EventStore { attempt_elapsed: *attempt_elapsed, }; // Mark this key and all child keys completed. - self.mark_step_key_completed(key, info); + self.mark_step_key_completed(key, info, root_event_index); // Register the next step in the event map. let next_key = StepKey { @@ -400,7 +409,7 @@ impl EventStore { attempt_elapsed: *attempt_elapsed, }; // Mark this key and all child keys completed. 
- self.mark_execution_id_completed(key, info); + self.mark_execution_id_completed(key, info, root_event_index); (Some(key), Some(key)) } @@ -426,7 +435,7 @@ impl EventStore { step_elapsed: *step_elapsed, attempt_elapsed: *attempt_elapsed, }; - self.mark_step_failed(key, info); + self.mark_step_failed(key, info, root_event_index); (Some(key), Some(key)) } @@ -450,7 +459,7 @@ impl EventStore { step_elapsed: *step_elapsed, attempt_elapsed: *attempt_elapsed, }; - self.mark_step_aborted(key, info); + self.mark_step_aborted(key, info, root_event_index); (Some(key), Some(key)) } @@ -524,11 +533,12 @@ impl EventStore { &mut self, root_key: StepKey, info: CompletionInfo, + root_event_index: RootEventIndex, ) { if let Some(value) = self.map.get_mut(&root_key) { // Completion status only applies to the root key. Nodes reachable // from this node are still marked as complete, but without status. - value.mark_completed(Some(info)); + value.mark_completed(Some(info), root_event_index); } // Mark anything reachable from this node as completed. @@ -538,7 +548,7 @@ impl EventStore { if let EventTreeNode::Step(key) = key { if key != root_key { if let Some(value) = self.map.get_mut(&key) { - value.mark_completed(None); + value.mark_completed(None, root_event_index); } } } @@ -549,10 +559,11 @@ impl EventStore { &mut self, root_key: StepKey, info: CompletionInfo, + root_event_index: RootEventIndex, ) { if let Some(value) = self.map.get_mut(&root_key) { // Completion status only applies to the root key. - value.mark_completed(Some(info)); + value.mark_completed(Some(info), root_event_index); } let mut dfs = DfsPostOrder::new( @@ -563,58 +574,87 @@ impl EventStore { if let EventTreeNode::Step(key) = key { if key != root_key { if let Some(value) = self.map.get_mut(&key) { - value.mark_completed(None); + value.mark_completed(None, root_event_index); } } } } } - fn mark_step_failed(&mut self, root_key: StepKey, info: FailureInfo) { - self.mark_step_failed_impl(root_key, |value, kind| { + fn mark_step_failed( + &mut self, + root_key: StepKey, + info: FailureInfo, + root_event_index: RootEventIndex, + ) { + self.mark_step_failed_impl(root_key, root_event_index, |value, kind| { match kind { MarkStepFailedImplKind::Root => { - value.mark_failed(Some(info.clone())); + value.mark_failed( + FailureReason::StepFailed(info.clone()), + root_event_index, + ); } MarkStepFailedImplKind::Descendant => { - value.mark_failed(None); + value.mark_failed( + FailureReason::ParentFailed { parent_step: root_key }, + root_event_index, + ); } MarkStepFailedImplKind::Future => { value.mark_will_not_be_run( WillNotBeRunReason::PreviousStepFailed { step: root_key, }, + root_event_index, ); } }; }) } - fn mark_step_aborted(&mut self, root_key: StepKey, info: AbortInfo) { - self.mark_step_failed_impl(root_key, |value, kind| { - match kind { - MarkStepFailedImplKind::Root => { - value.mark_aborted(AbortReason::StepAborted(info.clone())); - } - MarkStepFailedImplKind::Descendant => { - value.mark_aborted(AbortReason::ParentAborted { - parent_step: root_key, - }); - } - MarkStepFailedImplKind::Future => { - value.mark_will_not_be_run( - WillNotBeRunReason::PreviousStepAborted { - step: root_key, - }, - ); - } - }; - }); + fn mark_step_aborted( + &mut self, + root_key: StepKey, + info: AbortInfo, + root_event_index: RootEventIndex, + ) { + self.mark_step_failed_impl( + root_key, + root_event_index, + |value, kind| { + match kind { + MarkStepFailedImplKind::Root => { + value.mark_aborted( + AbortReason::StepAborted(info.clone()), + 
root_event_index, + ); + } + MarkStepFailedImplKind::Descendant => { + value.mark_aborted( + AbortReason::ParentAborted { + parent_step: root_key, + }, + root_event_index, + ); + } + MarkStepFailedImplKind::Future => { + value.mark_will_not_be_run( + WillNotBeRunReason::PreviousStepAborted { + step: root_key, + }, + root_event_index, + ); + } + }; + }, + ); } fn mark_step_failed_impl( &mut self, root_key: StepKey, + root_event_index: RootEventIndex, mut cb: impl FnMut(&mut EventBufferStepData<S>, MarkStepFailedImplKind), ) { if let Some(value) = self.map.get_mut(&root_key) { @@ -627,7 +667,7 @@ impl<S: StepSpec> EventStore<S> { for index in 0..root_key.index { let key = StepKey { execution_id: root_key.execution_id, index }; if let Some(value) = self.map.get_mut(&key) { - value.mark_completed(None); + value.mark_completed(None, root_event_index); } } @@ -744,10 +784,17 @@ impl<'buf, S: StepSpec> EventBufferSteps<'buf, S> { pub struct EventBufferStepData<S: StepSpec> { step_info: StepInfo<NestedSpec>, sort_key: StepSortKey, + // XXX: nest_level and total_steps are common to each execution, but are + // stored separately here. Should we store them in a separate map + // indexed by execution ID? nest_level: usize, - // Invariant: stored in order sorted by event_index. + total_steps: usize, + // Invariant: stored in order sorted by leaf event index. high_priority: Vec<StepEvent<S>>, step_status: StepStatus<S>, + // The last root event index that caused the data within this step to be + // updated. + last_root_event_index: RootEventIndex, } impl<S: StepSpec> EventBufferStepData<S> { @@ -755,36 +802,65 @@ impl<S: StepSpec> EventBufferStepData<S> { step_info: StepInfo<NestedSpec>, sort_key: StepSortKey, nest_level: usize, + total_steps: usize, + root_event_index: RootEventIndex, ) -> Self { Self { step_info, sort_key, nest_level, + total_steps, high_priority: Vec::new(), step_status: StepStatus::NotStarted, + last_root_event_index: root_event_index, } } + #[inline] pub fn step_info(&self) -> &StepInfo<NestedSpec> { &self.step_info } + #[inline] pub fn nest_level(&self) -> usize { self.nest_level } + #[inline] + pub fn total_steps(&self) -> usize { + self.total_steps + } + + #[inline] pub fn step_status(&self) -> &StepStatus<S> { &self.step_status } + #[inline] + pub fn last_root_event_index(&self) -> RootEventIndex { + self.last_root_event_index + } + + #[inline] fn sort_key(&self) -> &StepSortKey { &self.sort_key } + /// Returns step events since the provided event index. + pub fn step_events_since( + &self, + last_seen: Option<usize>, + ) -> Vec<&StepEvent<S>> { + let mut events: Vec<_> = + self.step_events_since_impl(last_seen).collect(); + events.sort_unstable_by_key(|event| event.event_index); + events + } + // Returns step events since the provided event index. // // Does not necessarily return results in sorted order. - fn step_events_since( + fn step_events_since_impl( &self, last_seen: Option<usize>, ) -> impl Iterator<Item = &StepEvent<S>> { @@ -799,11 +875,12 @@ impl<S: StepSpec> EventBufferStepData<S> { iter.chain(iter2) } - fn add_high_priority_step_event(&mut self, event: StepEvent<S>) { + fn add_high_priority_step_event(&mut self, root_event: StepEvent<S>) { + let root_event_index = RootEventIndex(root_event.event_index); // Dedup by the *leaf index* in case nested reports aren't deduped // coming in. match self.high_priority.binary_search_by(|probe| { - probe.leaf_event_index().cmp(&event.leaf_event_index()) + probe.leaf_event_index().cmp(&root_event.leaf_event_index()) }) { Ok(_) => { // This is a duplicate. } Err(index) => { // index is typically the last element, so this should be quite // efficient. 
- self.high_priority.insert(index, event); + self.update_root_event_index(root_event_index); + self.high_priority.insert(index, root_event); } } } fn add_low_priority_step_event( &mut self, - event: StepEvent, + root_event: StepEvent, max_low_priority: usize, ) { + let root_event_index = RootEventIndex(root_event.event_index); + let mut updated = false; match &mut self.step_status { StepStatus::NotStarted => { unreachable!( @@ -831,7 +911,7 @@ impl EventBufferStepData { // Dedup by the *leaf index* in case nested reports aren't // deduped coming in. match low_priority.binary_search_by(|probe| { - probe.leaf_event_index().cmp(&event.leaf_event_index()) + probe.leaf_event_index().cmp(&root_event.leaf_event_index()) }) { Ok(_) => { // This is a duplicate. @@ -839,7 +919,8 @@ impl EventBufferStepData { Err(index) => { // The index is almost always at the end, so this is // efficient enough. - low_priority.insert(index, event); + low_priority.insert(index, root_event); + updated = true; } } @@ -857,12 +938,21 @@ impl EventBufferStepData { // likely duplicate events. } } + + if updated { + self.update_root_event_index(root_event_index); + } } - fn mark_completed(&mut self, status: Option) { + fn mark_completed( + &mut self, + status: Option, + root_event_index: RootEventIndex, + ) { match self.step_status { StepStatus::NotStarted | StepStatus::Running { .. } => { self.step_status = StepStatus::Completed { info: status }; + self.update_root_event_index(root_event_index); } StepStatus::Completed { .. } | StepStatus::Failed { .. } @@ -874,10 +964,15 @@ impl EventBufferStepData { } } - fn mark_failed(&mut self, info: Option) { + fn mark_failed( + &mut self, + reason: FailureReason, + root_event_index: RootEventIndex, + ) { match self.step_status { StepStatus::NotStarted | StepStatus::Running { .. } => { - self.step_status = StepStatus::Failed { info }; + self.step_status = StepStatus::Failed { reason }; + self.update_root_event_index(root_event_index); } StepStatus::Completed { .. } | StepStatus::Failed { .. } @@ -889,7 +984,11 @@ impl EventBufferStepData { } } - fn mark_aborted(&mut self, reason: AbortReason) { + fn mark_aborted( + &mut self, + reason: AbortReason, + root_event_index: RootEventIndex, + ) { match &mut self.step_status { StepStatus::NotStarted => { match reason { @@ -909,12 +1008,14 @@ impl EventBufferStepData { }; } } + self.update_root_event_index(root_event_index); } StepStatus::Running { progress_event, .. } => { self.step_status = StepStatus::Aborted { reason, last_progress: Some(progress_event.clone()), }; + self.update_root_event_index(root_event_index); } StepStatus::Completed { .. } | StepStatus::Failed { .. } @@ -926,10 +1027,15 @@ impl EventBufferStepData { } } - fn mark_will_not_be_run(&mut self, reason: WillNotBeRunReason) { + fn mark_will_not_be_run( + &mut self, + reason: WillNotBeRunReason, + root_event_index: RootEventIndex, + ) { match self.step_status { StepStatus::NotStarted => { self.step_status = StepStatus::WillNotBeRun { reason }; + self.update_root_event_index(root_event_index); } StepStatus::Running { .. } => { // This is a weird situation. We should never encounter it in @@ -966,6 +1072,15 @@ impl EventBufferStepData { } } } + + fn update_root_event_index(&mut self, root_event_index: RootEventIndex) { + debug_assert!( + root_event_index >= self.last_root_event_index, + "event index must be monotonically increasing" + ); + self.last_root_event_index = + self.last_root_event_index.max(root_event_index); + } } /// The step status as last seen by events. 
@@ -990,8 +1105,8 @@ pub enum StepStatus<S: StepSpec> { /// The step has failed. Failed { - /// Failure information. - info: Option<FailureInfo>, + /// The reason for the failure. + reason: FailureReason, }, /// Execution was aborted while this step was running. @@ -1053,6 +1168,17 @@ pub struct CompletionInfo { pub attempt_elapsed: Duration, } +#[derive(Clone, Debug)] +pub enum FailureReason { + /// This step failed. + StepFailed(FailureInfo), + /// A parent step failed. + ParentFailed { + /// The parent step that failed. + parent_step: StepKey, + }, +} + #[derive(Clone, Debug)] pub struct FailureInfo { pub total_attempts: usize, @@ -1230,12 +1356,24 @@ pub struct StepKey { pub index: usize, } +/// A newtype to track root event indexes within [`EventBuffer`]s, to ensure +/// that we aren't mixing them with leaf event indexes in this code. +#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Hash)] +pub struct RootEventIndex(pub usize); + +impl fmt::Display for RootEventIndex { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + #[cfg(test)] mod tests { use std::collections::HashSet; use anyhow::{bail, ensure, Context}; use futures::StreamExt; + use indexmap::IndexSet; use omicron_test_utils::dev::test_setup_log; use serde::{de::IntoDeserializer, Deserialize}; use tokio::sync::mpsc; @@ -1389,7 +1527,10 @@ mod tests { test_cx .run_filtered_test( "all events passed in", - |buffer, event| buffer.add_event(event.clone()), + |buffer, event| { + buffer.add_event(event.clone()); + true + }, WithDeltas::No, ) .unwrap(); @@ -1397,10 +1538,12 @@ test_cx .run_filtered_test( "progress events skipped", - |buffer, event| { - if let Event::Step(event) = event { + |buffer, event| match event { + Event::Step(event) => { buffer.add_step_event(event.clone()); + true } + Event::Progress(_) => false, }, WithDeltas::Both, ) @@ -1410,13 +1553,16 @@ .run_filtered_test( "low-priority events skipped", |buffer, event| match event { - Event::Step(event) => { - if event.kind.priority() == StepEventPriority::Low { + Event::Step(event) => match event.kind.priority() { + StepEventPriority::High => { buffer.add_step_event(event.clone()); + true } - } + StepEventPriority::Low => false, + }, Event::Progress(event) => { buffer.add_progress_event(event.clone()); + true } }, WithDeltas::Both, ) @@ -1427,13 +1573,16 @@ .run_filtered_test( "low-priority and progress events skipped", |buffer, event| match event { - Event::Step(event) => { - if event.kind.priority() == StepEventPriority::Low { + Event::Step(event) => match event.kind.priority() { + StepEventPriority::High => { buffer.add_step_event(event.clone()); + true } - } + StepEventPriority::Low => false, + }, Event::Progress(_) => { - // Don't add progress events either. + // Don't add progress events. + false } }, WithDeltas::Both, ) @@ -1535,6 +1684,18 @@ reported_step_events.extend(report.step_events); last_seen = report.last_seen; + // Ensure that the last root index was updated for this + // event's corresponding steps, but not for any others. + if let Event::Step(event) = event { + check_last_root_event_index(event, &buffer) + .with_context(|| { + format!( + "{description}, at index {i} (time {time}): \ error with last root event index" + ) + })?; + } + // Call last_seen without feeding a new event in to ensure that // a report with no step events is produced. 
let report = buffer.generate_report_since(last_seen); @@ -1565,7 +1726,10 @@ mod tests { fn run_filtered_test( &self, event_fn_description: &str, - mut event_fn: impl FnMut(&mut EventBuffer<TestSpec>, &Event<TestSpec>), + mut event_fn: impl FnMut( + &mut EventBuffer<TestSpec>, + &Event<TestSpec>, + ) -> bool, with_deltas: WithDeltas, ) -> anyhow::Result<()> { match with_deltas { @@ -1590,7 +1754,10 @@ fn run_filtered_test_inner( &self, - mut event_fn: impl FnMut(&mut EventBuffer<TestSpec>, &Event<TestSpec>), + mut event_fn: impl FnMut( + &mut EventBuffer<TestSpec>, + &Event<TestSpec>, + ) -> bool, with_deltas: bool, ) -> anyhow::Result<()> { let description = format!("with deltas = {with_deltas}"); @@ -1608,8 +1775,8 @@ let mut last_seen_opt = with_deltas.then_some(None); for (i, event) in self.generated_events.iter().enumerate() { - (event_fn)(&mut buffer, event); - buffer.add_event(event.clone()); + let event_added = (event_fn)(&mut buffer, event); + let report = match last_seen_opt { Some(last_seen) => buffer.generate_report_since(last_seen), None => buffer.generate_report(), }; @@ -1628,6 +1795,18 @@ }) .unwrap(); + if let Event::Step(event) = event { + if event_added { + check_last_root_event_index(event, &buffer) + .with_context(|| { + format!( + "{description}, at index {i}: \ error with last root event index" + ) + })?; + } + } + receive_buffer.add_event_report(report.clone()); let this_step_events = receive_buffer.generate_report().step_events; @@ -1814,6 +1993,78 @@ } } + fn check_last_root_event_index( + event: &StepEvent<TestSpec>, + buffer: &EventBuffer<TestSpec>, + ) -> anyhow::Result<()> { + let root_event_index = RootEventIndex(event.event_index); + let event_step_keys = step_keys(event); + let steps = buffer.steps(); + for (step_key, data) in steps.as_slice() { + let data_index = data.last_root_event_index(); + if event_step_keys.contains(step_key) { + ensure!( + data_index == root_event_index, + "last_root_event_index should have been updated \ but wasn't (actual: {data_index}, expected: {root_event_index}) \ for step {step_key:?} (event: {event:?})", + ); + } else { + ensure!( + data_index < root_event_index, + "last_root_event_index should *not* have been updated \ but was (current: {data_index}, new: {root_event_index}) \ for step {step_key:?} (event: {event:?})", + ); + } + } + + Ok(()) + } + + /// Returns the step keys that this step event would cause updates against, + /// in order from root to leaf. + fn step_keys(event: &StepEvent<TestSpec>) -> IndexSet<StepKey> { + let mut out = IndexSet::new(); + step_keys_impl(event, &mut out); + out + } + + fn step_keys_impl( + event: &StepEvent<TestSpec>, + out: &mut IndexSet<StepKey>, + ) { + match &event.kind { + StepEventKind::NoStepsDefined | StepEventKind::Unknown => {} + StepEventKind::ExecutionStarted { steps, .. } => { + for step in steps { + out.insert(StepKey { + execution_id: event.execution_id, + index: step.index, + }); + } + } + StepEventKind::ProgressReset { step, .. } + | StepEventKind::AttemptRetry { step, .. } + | StepEventKind::StepCompleted { step, .. } + | StepEventKind::ExecutionCompleted { last_step: step, .. } + | StepEventKind::ExecutionFailed { failed_step: step, .. } + | StepEventKind::ExecutionAborted { aborted_step: step, .. } => { + out.insert(StepKey { + execution_id: event.execution_id, + index: step.info.index, + }); + } + StepEventKind::Nested { step, event, .. 
} => { + out.insert(StepKey { + execution_id: event.execution_id, + index: step.info.index, + }); + step_keys_impl(event, out); + } + } + } + #[derive(Copy, Clone, Debug)] #[allow(unused)] enum WithDeltas { diff --git a/wicket/src/ui/panes/update.rs b/wicket/src/ui/panes/update.rs index ea68cb4a16..da6f10cf88 100644 --- a/wicket/src/ui/panes/update.rs +++ b/wicket/src/ui/panes/update.rs @@ -29,7 +29,7 @@ use ratatui::widgets::{ use slog::{info, o, Logger}; use tui_tree_widget::{Tree, TreeItem, TreeState}; use update_engine::{ - AbortReason, ExecutionStatus, StepKey, WillNotBeRunReason, + AbortReason, ExecutionStatus, FailureReason, StepKey, WillNotBeRunReason, }; use wicket_common::update_events::{ EventBuffer, EventReport, ProgressEvent, StepOutcome, StepStatus, @@ -340,7 +340,7 @@ impl UpdatePane { Span::styled("Completed", style::successful_update_bold()), ])); } - StepStatus::Failed { info: Some(info) } => { + StepStatus::Failed { reason: FailureReason::StepFailed(info) } => { let mut spans = vec![ Span::styled("Status: ", style::selected()), Span::styled("Failed", style::failed_update_bold()), @@ -381,13 +381,23 @@ impl UpdatePane { } } } - StepStatus::Failed { info: None } => { - // No information is available, so all we can do is say that - // this step failed. - let spans = vec![ + StepStatus::Failed { + reason: FailureReason::ParentFailed { parent_step }, + } => { + let mut spans = vec![ Span::styled("Status: ", style::selected()), Span::styled("Failed", style::failed_update_bold()), ]; + if let Some(value) = id_state.event_buffer.get(parent_step) { + spans.push(Span::styled( + " at parent step ", + style::plain_text(), + )); + spans.push(Span::styled( + value.step_info().description.as_ref(), + style::selected(), + )); + } body.lines.push(Line::from(spans)); } StepStatus::Aborted { diff --git a/wicket/src/wicketd.rs b/wicket/src/wicketd.rs index 160bcb1c6a..2411542429 100644 --- a/wicket/src/wicketd.rs +++ b/wicket/src/wicketd.rs @@ -12,7 +12,7 @@ use tokio::time::{interval, Duration, MissedTickBehavior}; use wicketd_client::types::{ AbortUpdateOptions, ClearUpdateStateOptions, GetInventoryParams, GetInventoryResponse, GetLocationResponse, IgnitionCommand, SpIdentifier, - SpType, StartUpdateOptions, + SpType, StartUpdateOptions, StartUpdateParams, }; use crate::events::EventReportMap; @@ -164,10 +164,11 @@ impl WicketdManager { tokio::spawn(async move { let update_client = create_wicketd_client(&log, addr, WICKETD_TIMEOUT); - let sp: SpIdentifier = component_id.into(); - let response = match update_client - .post_start_update(sp.type_, sp.slot, &options) - .await + let params = StartUpdateParams { + targets: vec![component_id.into()], + options, + }; + let response = match update_client.post_start_update(¶ms).await { Ok(_) => Ok(()), Err(error) => Err(error.to_string()), diff --git a/wicketd/Cargo.toml b/wicketd/Cargo.toml index 8f4faf6c40..4894e370ba 100644 --- a/wicketd/Cargo.toml +++ b/wicketd/Cargo.toml @@ -24,6 +24,7 @@ hubtools.workspace = true http.workspace = true hyper.workspace = true illumos-utils.workspace = true +itertools.workspace = true reqwest.workspace = true schemars.workspace = true serde.workspace = true diff --git a/wicketd/src/helpers.rs b/wicketd/src/helpers.rs new file mode 100644 index 0000000000..a8b47d4f12 --- /dev/null +++ b/wicketd/src/helpers.rs @@ -0,0 +1,41 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. 
If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Helpers and utility functions for wicketd. + +use std::fmt; + +use gateway_client::types::{SpIdentifier, SpType}; +use itertools::Itertools; + +#[derive(Clone, Debug, Eq, PartialEq, PartialOrd, Ord, Hash)] +pub(crate) struct SpIdentifierDisplay(pub(crate) SpIdentifier); + +impl From for SpIdentifierDisplay { + fn from(id: SpIdentifier) -> Self { + SpIdentifierDisplay(id) + } +} + +impl<'a> From<&'a SpIdentifier> for SpIdentifierDisplay { + fn from(id: &'a SpIdentifier) -> Self { + SpIdentifierDisplay(*id) + } +} + +impl fmt::Display for SpIdentifierDisplay { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self.0.type_ { + SpType::Sled => write!(f, "sled {}", self.0.slot), + SpType::Switch => write!(f, "switch {}", self.0.slot), + SpType::Power => write!(f, "PSC {}", self.0.slot), + } + } +} + +pub(crate) fn sps_to_string>( + sps: impl IntoIterator, +) -> String { + sps.into_iter().map_into().join(", ") +} diff --git a/wicketd/src/http_entrypoints.rs b/wicketd/src/http_entrypoints.rs index 98cac8dc5d..db21d72777 100644 --- a/wicketd/src/http_entrypoints.rs +++ b/wicketd/src/http_entrypoints.rs @@ -4,6 +4,8 @@ //! HTTP entrypoint functions for wicketd +use crate::helpers::sps_to_string; +use crate::helpers::SpIdentifierDisplay; use crate::mgs::GetInventoryError; use crate::mgs::GetInventoryResponse; use crate::mgs::MgsHandle; @@ -44,7 +46,6 @@ use std::net::IpAddr; use std::net::Ipv6Addr; use std::time::Duration; use tokio::io::AsyncWriteExt; -use uuid::Uuid; use wicket_common::rack_setup::PutRssUserConfigInsensitive; use wicket_common::update_events::EventReport; @@ -652,6 +653,15 @@ async fn get_artifacts_and_event_reports( Ok(HttpResponseOk(response)) } +#[derive(Clone, Debug, JsonSchema, Deserialize)] +pub(crate) struct StartUpdateParams { + /// The SP identifiers to start the update with. Must be non-empty. + pub(crate) targets: BTreeSet, + + /// Options for the update. + pub(crate) options: StartUpdateOptions, +} + #[derive(Clone, Debug, JsonSchema, Deserialize)] pub(crate) struct StartUpdateOptions { /// If passed in, fails the update with a simulated error. @@ -730,19 +740,24 @@ impl UpdateTestError { log: &slog::Logger, reason: &str, ) -> HttpError { + let message = self.into_error_string(log, reason).await; + HttpError::for_bad_request(None, message) + } + + pub(crate) async fn into_error_string( + self, + log: &slog::Logger, + reason: &str, + ) -> String { match self { - UpdateTestError::Fail => HttpError::for_bad_request( - None, - format!("Simulated failure while {reason}"), - ), + UpdateTestError::Fail => { + format!("Simulated failure while {reason}") + } UpdateTestError::Timeout { secs } => { slog::info!(log, "Simulating timeout while {reason}"); // 15 seconds should be enough to cause a timeout. tokio::time::sleep(Duration::from_secs(secs)).await; - HttpError::for_bad_request( - None, - "XXX request should time out before this is hit".into(), - ) + "XXX request should time out before this is hit".into() } } } @@ -834,21 +849,27 @@ async fn get_location( })) } -/// An endpoint to start updating a sled. +/// An endpoint to start updating one or more sleds, switches and PSCs. 
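+// Note: every requested target is validated before any update is started; if
+// any target fails a pre-flight check, the whole request is rejected and no
+// update begins.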
#[endpoint { method = POST, - path = "/update/{type}/{slot}", + path = "/update", }] async fn post_start_update( rqctx: RequestContext, - target: Path, - opts: TypedBody, + params: TypedBody, ) -> Result { let log = &rqctx.log; let rqctx = rqctx.context(); - let target = target.into_inner(); + let params = params.into_inner(); + + if params.targets.is_empty() { + return Err(HttpError::for_bad_request( + None, + "No update targets specified".into(), + )); + } - // Can we update the target SP? We refuse to update if: + // Can we update the target SPs? We refuse to update if: // // 1. We haven't pulled its state in our inventory (most likely cause: the // cubby is empty; less likely cause: the SP is misbehaving, which will @@ -870,70 +891,136 @@ async fn post_start_update( } }; - // Next, do we have the state of the target SP? - let sp_state = match inventory { + // Error cases. + let mut inventory_absent = BTreeSet::new(); + let mut self_update = None; + let mut maybe_self_update = BTreeSet::new(); + + // Next, do we have the states of the target SP? + let sp_states = match inventory { GetInventoryResponse::Response { inventory, .. } => inventory .sps .into_iter() - .filter_map(|sp| if sp.id == target { sp.state } else { None }) - .next(), - GetInventoryResponse::Unavailable => None, - }; - let Some(sp_state) = sp_state else { - return Err(HttpError::for_bad_request( - None, - "cannot update target sled (no inventory state present)".into(), - )); + .filter_map(|sp| { + if params.targets.contains(&sp.id) { + if let Some(sp_state) = sp.state { + Some((sp.id, sp_state)) + } else { + None + } + } else { + None + } + }) + .collect(), + GetInventoryResponse::Unavailable => BTreeMap::new(), }; - // If we have the state of the SP, are we allowed to update it? We - // refuse to try to update our own sled. - match &rqctx.baseboard { - Some(baseboard) => { - if baseboard.identifier() == sp_state.serial_number - && baseboard.model() == sp_state.model - && baseboard.revision() == i64::from(sp_state.revision) - { - return Err(HttpError::for_bad_request( - None, - "cannot update sled where wicketd is running".into(), - )); + for target in ¶ms.targets { + let sp_state = match sp_states.get(target) { + Some(sp_state) => sp_state, + None => { + // The state isn't present, so add to inventory_absent. + inventory_absent.insert(*target); + continue; } - } - None => { - // We don't know our own baseboard, which is a very - // questionable state to be in! For now, we will hard-code - // the possibly locations where we could be running: - // scrimlets can only be in cubbies 14 or 16, so we refuse - // to update either of those. - let target_is_scrimlet = - matches!((target.type_, target.slot), (SpType::Sled, 14 | 16)); - if target_is_scrimlet { - return Err(HttpError::for_bad_request( - None, - "wicketd does not know its own baseboard details: \ - refusing to update either scrimlet" - .into(), - )); + }; + + // If we have the state of the SP, are we allowed to update it? We + // refuse to try to update our own sled. + match &rqctx.baseboard { + Some(baseboard) => { + if baseboard.identifier() == sp_state.serial_number + && baseboard.model() == sp_state.model + && baseboard.revision() == i64::from(sp_state.revision) + { + self_update = Some(*target); + continue; + } + } + None => { + // We don't know our own baseboard, which is a very questionable + // state to be in! 
For now, we will hard-code the possible + // locations where we could be running: scrimlets can only be in + // cubbies 14 or 16, so we refuse to update either of those. + let target_is_scrimlet = matches!( + (target.type_, target.slot), + (SpType::Sled, 14 | 16) + ); + if target_is_scrimlet { + maybe_self_update.insert(*target); + continue; + } + } + } + } - let opts = opts.into_inner(); - if let Some(test_error) = opts.test_error { - return Err(test_error.into_http_error(log, "starting update").await); + // Do we have any errors? + let mut errors = Vec::new(); + if !inventory_absent.is_empty() { + errors.push(format!( + "cannot update sleds (no inventory state present for {})", + sps_to_string(&inventory_absent) + )); + } + if let Some(self_update) = self_update { + errors.push(format!( + "cannot update sled where wicketd is running ({})", + SpIdentifierDisplay(self_update) + )); + } + if !maybe_self_update.is_empty() { + errors.push(format!( + "wicketd does not know its own baseboard details: \ + refusing to update either scrimlet ({})", + sps_to_string(&maybe_self_update) + )); } - // All pre-flight update checks look OK: start the update. - // - // Generate an ID for this update; the update tracker will send it to the - // sled as part of the InstallinatorImageId, and installinator will send it - // back to our artifact server with its progress reports. - let update_id = Uuid::new_v4(); + if let Some(test_error) = &params.options.test_error { + errors.push(test_error.into_error_string(log, "starting update").await); + } - match rqctx.update_tracker.start(target, update_id, opts).await { - Ok(()) => Ok(HttpResponseUpdatedNoContent {}), - Err(err) => Err(err.to_http_error()), + let start_update_errors = if errors.is_empty() { + // No errors: we can try and proceed with this update. + match rqctx.update_tracker.start(params.targets, params.options).await { + Ok(()) => return Ok(HttpResponseUpdatedNoContent {}), + Err(errors) => errors, + } + } else { + // We've already found errors, so all we want to do is to check whether + // the update tracker thinks there are any errors as well. + match rqctx.update_tracker.update_pre_checks(params.targets).await { + Ok(()) => Vec::new(), + Err(errors) => errors, + } + }; + + errors.extend(start_update_errors.iter().map(|error| error.to_string())); + + // If we get here, we have errors to report. 
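+    // A single error is returned as-is; multiple errors are combined into one
+    // bad-request message below.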
+ + match errors.len() { + 0 => { + unreachable!( + "we already returned Ok(_) above if there were no errors" + ) + } + 1 => { + return Err(HttpError::for_bad_request( + None, + errors.pop().unwrap(), + )); + } + _ => { + return Err(HttpError::for_bad_request( + None, + format!( + "multiple errors encountered:\n - {}", + itertools::join(errors, "\n - ") + ), + )); + } } } diff --git a/wicketd/src/lib.rs b/wicketd/src/lib.rs index 78209ea04a..e17c15642c 100644 --- a/wicketd/src/lib.rs +++ b/wicketd/src/lib.rs @@ -6,6 +6,7 @@ mod artifacts; mod bootstrap_addrs; mod config; mod context; +mod helpers; mod http_entrypoints; mod installinator_progress; mod inventory; diff --git a/wicketd/src/update_tracker.rs b/wicketd/src/update_tracker.rs index a95a98bd72..6e36a935f5 100644 --- a/wicketd/src/update_tracker.rs +++ b/wicketd/src/update_tracker.rs @@ -7,6 +7,7 @@ use crate::artifacts::ArtifactIdData; use crate::artifacts::UpdatePlan; use crate::artifacts::WicketdArtifactStore; +use crate::helpers::sps_to_string; use crate::http_entrypoints::GetArtifactsAndEventReportsResponse; use crate::http_entrypoints::StartUpdateOptions; use crate::http_entrypoints::UpdateSimulatedResult; @@ -19,7 +20,6 @@ use anyhow::ensure; use anyhow::Context; use display_error_chain::DisplayErrorChain; use dropshot::HttpError; -use futures::Future; use gateway_client::types::HostPhase2Progress; use gateway_client::types::HostPhase2RecoveryImageId; use gateway_client::types::HostStartupOptions; @@ -156,146 +156,23 @@ impl UpdateTracker { pub(crate) async fn start( &self, - sp: SpIdentifier, - update_id: Uuid, + sps: BTreeSet, opts: StartUpdateOptions, - ) -> Result<(), StartUpdateError> { - self.start_impl(sp, |plan| async { - // Do we need to upload this plan's trampoline phase 2 to MGS? - let upload_trampoline_phase_2_to_mgs = { - let mut upload_trampoline_phase_2_to_mgs = - self.upload_trampoline_phase_2_to_mgs.lock().await; - - match upload_trampoline_phase_2_to_mgs.as_mut() { - Some(prev) => { - // We've previously started an upload - does it match - // this artifact? If not, cancel the old task (which - // might still be trying to upload) and start a new one - // with our current image. - if prev.status.borrow().hash - != plan.trampoline_phase_2.data.hash() - { - // It does _not_ match - we have a new plan with a - // different trampoline image. If the old task is - // still running, cancel it, and start a new one. - prev.task.abort(); - *prev = self - .spawn_upload_trampoline_phase_2_to_mgs(&plan); - } - } - None => { - *upload_trampoline_phase_2_to_mgs = Some( - self.spawn_upload_trampoline_phase_2_to_mgs(&plan), - ); - } - } - - // Both branches above leave `upload_trampoline_phase_2_to_mgs` - // with data, so we can unwrap here to clone the `watch` - // channel. - upload_trampoline_phase_2_to_mgs - .as_ref() - .unwrap() - .status - .clone() - }; - - let event_buffer = Arc::new(StdMutex::new(EventBuffer::new(16))); - let ipr_start_receiver = - self.ipr_update_tracker.register(update_id); - - let update_cx = UpdateContext { - update_id, - sp, - mgs_client: self.mgs_client.clone(), - upload_trampoline_phase_2_to_mgs, - log: self.log.new(o!( - "sp" => format!("{sp:?}"), - "update_id" => update_id.to_string(), - )), - }; - // TODO do we need `UpdateDriver` as a distinct type? - let update_driver = UpdateDriver {}; - - // Using a oneshot channel to communicate the abort handle isn't - // ideal, but it works and is the easiest way to send it without - // restructuring this code. 
- let (abort_handle_sender, abort_handle_receiver) = - oneshot::channel(); - let task = tokio::spawn(update_driver.run( - plan, - update_cx, - event_buffer.clone(), - ipr_start_receiver, - opts, - abort_handle_sender, - )); - - let abort_handle = abort_handle_receiver - .await - .expect("abort handle is sent immediately"); - - SpUpdateData { task, abort_handle, event_buffer } - }) - .await + ) -> Result<(), Vec> { + let imp = RealSpawnUpdateDriver { update_tracker: self, opts }; + self.start_impl(sps, Some(imp)).await } /// Starts a fake update that doesn't perform any steps, but simply waits - /// for a oneshot receiver to resolve. + /// for a watch receiver to resolve. #[doc(hidden)] pub async fn start_fake_update( &self, - sp: SpIdentifier, - oneshot_receiver: oneshot::Receiver<()>, - ) -> Result<(), StartUpdateError> { - self.start_impl(sp, |_plan| async move { - let (sender, mut receiver) = mpsc::channel(128); - let event_buffer = Arc::new(StdMutex::new(EventBuffer::new(16))); - let event_buffer_2 = event_buffer.clone(); - let log = self.log.clone(); - - let engine = UpdateEngine::new(&log, sender); - let abort_handle = engine.abort_handle(); - - let task = tokio::spawn(async move { - // The step component and ID have been chosen arbitrarily here -- - // they aren't important. - engine - .new_step( - UpdateComponent::Host, - UpdateStepId::RunningInstallinator, - "Fake step that waits for receiver to resolve", - move |_cx| async move { - _ = oneshot_receiver.await; - StepSuccess::new(()).into() - }, - ) - .register(); - - // Spawn a task to accept all events from the executing engine. - let event_receiving_task = tokio::spawn(async move { - while let Some(event) = receiver.recv().await { - event_buffer_2.lock().unwrap().add_event(event); - } - }); - - match engine.execute().await { - Ok(_cx) => (), - Err(err) => { - error!(log, "update failed"; "err" => %err); - } - } - - // Wait for all events to be received and written to the event - // buffer. - event_receiving_task - .await - .expect("event receiving task panicked"); - }); - - SpUpdateData { task, abort_handle, event_buffer } - }) - .await + sps: BTreeSet, + watch_receiver: watch::Receiver<()>, + ) -> Result<(), Vec> { + let imp = FakeUpdateDriver { watch_receiver, log: self.log.clone() }; + self.start_impl(sps, Some(imp)).await } pub(crate) async fn clear_update_state( @@ -315,40 +192,105 @@ impl UpdateTracker { update_data.abort_update(sp, message).await } - async fn start_impl( + /// Checks whether an update can be started for the given SPs, without + /// actually starting it. + /// + /// This should only be used in situations where starting the update is not + /// desired (for example, if we've already encountered errors earlier in the + /// process and we're just adding to the list of errors). In cases where the + /// start method *is* desired, prefer the [`Self::start`] method, which also + /// performs the same checks. 
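+    /// Internally this is [`Self::start_impl`] called without a driver, so it
+    /// reports the same errors as [`Self::start`] but never spawns any update
+    /// tasks.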
+ pub(crate) async fn update_pre_checks( &self, - sp: SpIdentifier, - spawn_update_driver: F, - ) -> Result<(), StartUpdateError> + sps: BTreeSet<SpIdentifier>, + ) -> Result<(), Vec<StartUpdateError>> { + self.start_impl::<NeverUpdateDriver>(sps, None).await + } + + async fn start_impl<Spawn>( + &self, + sps: BTreeSet<SpIdentifier>, + spawn_update_driver: Option<Spawn>, + ) -> Result<(), Vec<StartUpdateError>> where - F: FnOnce(UpdatePlan) -> Fut, - Fut: Future<Output = SpUpdateData> + Send, + Spawn: SpawnUpdateDriver, { let mut update_data = self.sp_update_data.lock().await; - let plan = update_data - .artifact_store - .current_plan() - .ok_or(StartUpdateError::TufRepositoryUnavailable)?; + let mut errors = Vec::new(); - match update_data.sp_update_data.entry(sp) { - // Vacant: this is the first time we've started an update to this - // sp. - Entry::Vacant(slot) => { - slot.insert(spawn_update_driver(plan).await); - Ok(()) - } - // Occupied: we've previously started an update to this sp; only - // allow this one if that update is no longer running. - Entry::Occupied(mut slot) => { - if slot.get().task.is_finished() { - slot.insert(spawn_update_driver(plan).await); - Ok(()) - } else { - Err(StartUpdateError::UpdateInProgress(sp)) + let plan = update_data.artifact_store.current_plan(); + if plan.is_none() { + errors.push(StartUpdateError::TufRepositoryUnavailable); + } + + // Check that we're not already updating any of these SPs. + let update_in_progress: Vec<_> = sps + .iter() + .filter(|sp| { + // If we don't have any update data for this SP, it's not in + // progress. + // + // If we do, it's in progress if the task is not finished. + update_data + .sp_update_data + .get(sp) + .map_or(false, |data| !data.task.is_finished()) + }) + .copied() + .collect(); + + if !update_in_progress.is_empty() { + errors.push(StartUpdateError::UpdateInProgress(update_in_progress)); + } + + // If there are any errors, return now. + if !errors.is_empty() { + return Err(errors); + } + + let plan = plan.expect("we'd have returned an error if plan was None"); + + // Call the setup method now. + if let Some(mut spawn_update_driver) = spawn_update_driver { + let setup_data = spawn_update_driver.setup(&plan).await; + + for sp in sps { + match update_data.sp_update_data.entry(sp) { + // Vacant: this is the first time we've started an update to this + // sp. + Entry::Vacant(slot) => { + slot.insert( + spawn_update_driver + .spawn_update_driver( + sp, + plan.clone(), + &setup_data, + ) + .await, + ); + } + // Occupied: we've previously started an update to this sp. + Entry::Occupied(mut slot) => { + assert!( + slot.get().task.is_finished(), + "we just checked that the task was finished" + ); + slot.insert( + spawn_update_driver + .spawn_update_driver( + sp, + plan.clone(), + &setup_data, + ) + .await, + ); + } + } + } + } + + Ok(()) } fn spawn_upload_trampoline_phase_2_to_mgs( @@ -425,6 +367,226 @@ impl UpdateTracker { } } +/// A trait that represents a backend implementation for spawning the update +/// driver. +#[async_trait::async_trait] +trait SpawnUpdateDriver { + /// The type returned by the [`Self::setup`] method. This is passed in by + /// reference to [`Self::spawn_update_driver`]. + type Setup; + + /// Perform setup required to spawn the update driver. + /// + /// This is called *once*, before any calls to + /// [`Self::spawn_update_driver`]. + async fn setup(&mut self, plan: &UpdatePlan) -> Self::Setup; + + /// Spawn the update driver for the given SP. + /// + /// This is called once per SP. 
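+    /// The returned [`SpUpdateData`] (driver task, abort handle, and event
+    /// buffer) is what the update tracker stores for that SP.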
+ async fn spawn_update_driver( + &mut self, + sp: SpIdentifier, + plan: UpdatePlan, + setup_data: &Self::Setup, + ) -> SpUpdateData; +} + +/// The production implementation of [`SpawnUpdateDriver`]. +/// +/// This implementation spawns real update drivers. +#[derive(Debug)] +struct RealSpawnUpdateDriver<'tr> { + update_tracker: &'tr UpdateTracker, + opts: StartUpdateOptions, +} + +#[async_trait::async_trait] +impl<'tr> SpawnUpdateDriver for RealSpawnUpdateDriver<'tr> { + type Setup = watch::Receiver; + + async fn setup(&mut self, plan: &UpdatePlan) -> Self::Setup { + // Do we need to upload this plan's trampoline phase 2 to MGS? + + let mut upload_trampoline_phase_2_to_mgs = + self.update_tracker.upload_trampoline_phase_2_to_mgs.lock().await; + + match upload_trampoline_phase_2_to_mgs.as_mut() { + Some(prev) => { + // We've previously started an upload - does it match + // this artifact? If not, cancel the old task (which + // might still be trying to upload) and start a new one + // with our current image. + if prev.status.borrow().hash + != plan.trampoline_phase_2.data.hash() + { + // It does _not_ match - we have a new plan with a + // different trampoline image. If the old task is + // still running, cancel it, and start a new one. + prev.task.abort(); + *prev = self + .update_tracker + .spawn_upload_trampoline_phase_2_to_mgs(&plan); + } + } + None => { + *upload_trampoline_phase_2_to_mgs = Some( + self.update_tracker + .spawn_upload_trampoline_phase_2_to_mgs(&plan), + ); + } + } + + // Both branches above leave `upload_trampoline_phase_2_to_mgs` + // with data, so we can unwrap here to clone the `watch` + // channel. + upload_trampoline_phase_2_to_mgs.as_ref().unwrap().status.clone() + } + + async fn spawn_update_driver( + &mut self, + sp: SpIdentifier, + plan: UpdatePlan, + setup_data: &Self::Setup, + ) -> SpUpdateData { + // Generate an ID for this update; the update tracker will send it to the + // sled as part of the InstallinatorImageId, and installinator will send it + // back to our artifact server with its progress reports. + let update_id = Uuid::new_v4(); + + let event_buffer = Arc::new(StdMutex::new(EventBuffer::new(16))); + let ipr_start_receiver = + self.update_tracker.ipr_update_tracker.register(update_id); + + let update_cx = UpdateContext { + update_id, + sp, + mgs_client: self.update_tracker.mgs_client.clone(), + upload_trampoline_phase_2_to_mgs: setup_data.clone(), + log: self.update_tracker.log.new(o!( + "sp" => format!("{sp:?}"), + "update_id" => update_id.to_string(), + )), + }; + // TODO do we need `UpdateDriver` as a distinct type? + let update_driver = UpdateDriver {}; + + // Using a oneshot channel to communicate the abort handle isn't + // ideal, but it works and is the easiest way to send it without + // restructuring this code. + let (abort_handle_sender, abort_handle_receiver) = oneshot::channel(); + let task = tokio::spawn(update_driver.run( + plan, + update_cx, + event_buffer.clone(), + ipr_start_receiver, + self.opts.clone(), + abort_handle_sender, + )); + + let abort_handle = abort_handle_receiver + .await + .expect("abort handle is sent immediately"); + + SpUpdateData { task, abort_handle, event_buffer } + } +} + +/// A fake implementation of [`SpawnUpdateDriver`]. +/// +/// This implementation is only used by tests. It contains a single step that +/// waits for a [`watch::Receiver`] to resolve. 
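+/// A watch channel is used rather than a oneshot so that the receiver can be
+/// cloned for each SP the fake update is started against.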
+#[derive(Debug)] +struct FakeUpdateDriver { + watch_receiver: watch::Receiver<()>, + log: Logger, +} + +#[async_trait::async_trait] +impl SpawnUpdateDriver for FakeUpdateDriver { + type Setup = (); + + async fn setup(&mut self, _plan: &UpdatePlan) -> Self::Setup {} + + async fn spawn_update_driver( + &mut self, + _sp: SpIdentifier, + _plan: UpdatePlan, + _setup_data: &Self::Setup, + ) -> SpUpdateData { + let (sender, mut receiver) = mpsc::channel(128); + let event_buffer = Arc::new(StdMutex::new(EventBuffer::new(16))); + let event_buffer_2 = event_buffer.clone(); + let log = self.log.clone(); + + let engine = UpdateEngine::new(&log, sender); + let abort_handle = engine.abort_handle(); + + let mut watch_receiver = self.watch_receiver.clone(); + + let task = tokio::spawn(async move { + // The step component and ID have been chosen arbitrarily here -- + // they aren't important. + engine + .new_step( + UpdateComponent::Host, + UpdateStepId::RunningInstallinator, + "Fake step that waits for receiver to resolve", + move |_cx| async move { + // This will resolve as soon as the watch sender + // (typically a test) sends a value over the watch + // channel. + _ = watch_receiver.changed().await; + StepSuccess::new(()).into() + }, + ) + .register(); + + // Spawn a task to accept all events from the executing engine. + let event_receiving_task = tokio::spawn(async move { + while let Some(event) = receiver.recv().await { + event_buffer_2.lock().unwrap().add_event(event); + } + }); + + match engine.execute().await { + Ok(_cx) => (), + Err(err) => { + error!(log, "update failed"; "err" => %err); + } + } + + // Wait for all events to be received and written to the event + // buffer. + event_receiving_task.await.expect("event receiving task panicked"); + }); + + SpUpdateData { task, abort_handle, event_buffer } + } +} + +/// An implementation of [`SpawnUpdateDriver`] that cannot be constructed. +/// +/// This is an uninhabited type (an empty enum), and is only used to provide a +/// type parameter for the [`UpdateTracker::update_pre_checks`] method. 
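+/// Because the enum has no variants it can never be constructed, which
+/// guarantees that the `unreachable!` in its `spawn_update_driver`
+/// implementation can never fire.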
+enum NeverUpdateDriver {} + +#[async_trait::async_trait] +impl SpawnUpdateDriver for NeverUpdateDriver { + type Setup = (); + + async fn setup(&mut self, _plan: &UpdatePlan) -> Self::Setup {} + + async fn spawn_update_driver( + &mut self, + _sp: SpIdentifier, + _plan: UpdatePlan, + _setup_data: &Self::Setup, + ) -> SpUpdateData { + unreachable!("this update driver cannot be constructed") + } +} + #[derive(Debug)] struct UpdateTrackerData { artifact_store: WicketdArtifactStore, @@ -518,21 +680,8 @@ impl UpdateTrackerData { pub enum StartUpdateError { #[error("no TUF repository available")] TufRepositoryUnavailable, - #[error("target is already being updated: {0:?}")] - UpdateInProgress(SpIdentifier), -} - -impl StartUpdateError { - pub(crate) fn to_http_error(&self) -> HttpError { - let message = DisplayErrorChain::new(self).to_string(); - - match self { - StartUpdateError::TufRepositoryUnavailable - | StartUpdateError::UpdateInProgress(_) => { - HttpError::for_bad_request(None, message) - } - } - } + #[error("targets are already being updated: {}", sps_to_string(.0))] + UpdateInProgress(Vec), } #[derive(Debug, Clone, Error, Eq, PartialEq)] diff --git a/wicketd/tests/integration_tests/updates.rs b/wicketd/tests/integration_tests/updates.rs index a4b330930a..a198068ef3 100644 --- a/wicketd/tests/integration_tests/updates.rs +++ b/wicketd/tests/integration_tests/updates.rs @@ -16,13 +16,13 @@ use omicron_common::{ api::internal::nexus::KnownArtifactKind, update::{ArtifactHashId, ArtifactKind}, }; -use tokio::sync::oneshot; +use tokio::sync::watch; use uuid::Uuid; use wicket_common::update_events::{StepEventKind, UpdateComponent}; use wicketd::{RunningUpdateState, StartUpdateError}; use wicketd_client::types::{ GetInventoryParams, GetInventoryResponse, SpIdentifier, SpType, - StartUpdateOptions, + StartUpdateOptions, StartUpdateParams, }; #[tokio::test] @@ -138,13 +138,11 @@ async fn test_updates() { } // Now, try starting the update on SP 0. + let options = StartUpdateOptions::default(); + let params = StartUpdateParams { targets: vec![target_sp], options }; wicketd_testctx .wicketd_client - .post_start_update( - target_sp.type_, - target_sp.slot, - &StartUpdateOptions::default(), - ) + .post_start_update(¶ms) .await .expect("update started successfully"); @@ -352,12 +350,13 @@ async fn test_update_races() { slot: 0, type_: gateway_client::types::SpType::Sled, }; + let sps: BTreeSet<_> = vec![sp].into_iter().collect(); - let (sender, receiver) = oneshot::channel(); + let (sender, receiver) = watch::channel(()); wicketd_testctx .server .update_tracker - .start_fake_update(sp, receiver) + .start_fake_update(sps.clone(), receiver) .await .expect("start_fake_update successful"); @@ -372,14 +371,18 @@ async fn test_update_races() { // Also try starting another fake update, which should fail -- we don't let // updates be started in the middle of other updates. { - let (_, receiver) = oneshot::channel(); + let (_, receiver) = watch::channel(()); let err = wicketd_testctx .server .update_tracker - .start_fake_update(sp, receiver) + .start_fake_update(sps, receiver) .await .expect_err("start_fake_update failed while update is running"); - assert_eq!(err, StartUpdateError::UpdateInProgress(sp)); + assert_eq!(err.len(), 1, "one error returned: {err:?}"); + assert_eq!( + err.first().unwrap(), + &StartUpdateError::UpdateInProgress(vec![sp]) + ); } // Unblock the update, letting it run to completion.