Skip to content

Commit

Permalink
Merge pull request #3307 from didier-wenzek/fix/agent-self-update-wit…
Browse files Browse the repository at this point in the history
…h-unversionned-pending-commands

fix: Persist workflow definition of pending commands on start
  • Loading branch information
reubenmiller authored Dec 19, 2024
2 parents d710940 + b36ce65 commit 99b38fa
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 22 deletions.
1 change: 1 addition & 0 deletions crates/core/tedge_agent/src/operation_workflows/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,7 @@ impl WorkflowActor {
for command in self
.workflow_repository
.load_pending_commands(pending_commands)
.await
{
// Make sure the latest state is visible over MQTT
self.mqtt_publisher
Expand Down
20 changes: 19 additions & 1 deletion crates/core/tedge_agent/src/operation_workflows/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,25 @@ impl WorkflowRepository {
None
}

pub fn load_pending_commands(&mut self, commands: CommandBoard) -> Vec<GenericCommandState> {
pub async fn load_pending_commands(
&mut self,
mut commands: CommandBoard,
) -> Vec<GenericCommandState> {
// If the resumed commands have been triggered by an agent without workflow version management
// then these commands are assigned the current version of the operation workflow.
// These currents versions have also to be marked as in use and persisted.
for (_, ref mut command) in commands.iter_mut() {
if command.workflow_version().is_none() {
if let Some(operation) = command.operation() {
if let Some(current_version) = self.workflows.use_current_version(&operation) {
self.persist_workflow_definition(&operation, &current_version)
.await;
*command = command.clone().set_workflow_version(&current_version);
}
}
}
}

self.workflows.load_pending_commands(commands)
}

Expand Down
34 changes: 13 additions & 21 deletions crates/core/tedge_api/src/workflow/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,25 +76,7 @@ impl WorkflowSupervisor {
}

/// Update on start the set of pending commands
pub fn load_pending_commands(
&mut self,
mut commands: CommandBoard,
) -> Vec<GenericCommandState> {
// If the resumed commands have been triggered by an agent without workflow version management
// then these commands are assigned the current version for the operation.
// These currents versions have also to be marked as in use.
for (_, ref mut command) in commands.iter_mut() {
if command.workflow_version().is_none() {
if let Some(versions) = command
.operation()
.and_then(|operation| self.workflows.get_mut(&operation.as_str().into()))
{
if let Some(current_version) = versions.use_current_version() {
*command = command.clone().set_workflow_version(current_version);
}
}
}
}
pub fn load_pending_commands(&mut self, commands: CommandBoard) -> Vec<GenericCommandState> {
self.commands = commands;
self.commands
.iter()
Expand Down Expand Up @@ -150,6 +132,16 @@ impl WorkflowSupervisor {
}
}

/// Mark the current version of an operation workflow as being in use.
///
/// Return the current version if any.
pub fn use_current_version(&mut self, operation: &OperationName) -> Option<WorkflowVersion> {
match self.workflows.get_mut(&operation.as_str().into()) {
Some(versions) => versions.use_current_version().cloned(),
None => None,
}
}

/// Update the state of the command board on reception of a message sent by a peer over MQTT
///
/// Return the new CommandRequest state if any.
Expand Down Expand Up @@ -399,7 +391,7 @@ impl WorkflowVersions {
}
}

// Mark the current version as being in-use.
/// Mark the current version as being in-use.
fn use_current_version(&mut self) -> Option<&WorkflowVersion> {
match self.current.as_ref() {
Some((version, workflow)) => {
Expand All @@ -416,7 +408,7 @@ impl WorkflowVersions {
}
}

// Remove the current version from this list of versions, restoring the built-in version if any
/// Remove the current version from this list of versions, restoring the built-in version if any
fn remove(&mut self, version: &WorkflowVersion) {
if self.current.as_ref().map(|(v, _)| v == version) == Some(true) {
self.current = None;
Expand Down

0 comments on commit 99b38fa

Please sign in to comment.