diff --git a/crates/core/tedge_agent/src/operation_workflows/actor.rs b/crates/core/tedge_agent/src/operation_workflows/actor.rs index 5623d0820e..9dbbe50a1f 100644 --- a/crates/core/tedge_agent/src/operation_workflows/actor.rs +++ b/crates/core/tedge_agent/src/operation_workflows/actor.rs @@ -159,7 +159,7 @@ impl WorkflowActor { Ok(Some(new_state)) => { self.persist_command_board().await?; if new_state.is_init() { - self.process_command_update(new_state.set_log_path(&log_file.path)) + self.process_command_update(new_state.with_log_path(&log_file.path)) .await?; } } diff --git a/crates/core/tedge_agent/src/operation_workflows/persist.rs b/crates/core/tedge_agent/src/operation_workflows/persist.rs index 96eb153214..a537650d24 100644 --- a/crates/core/tedge_agent/src/operation_workflows/persist.rs +++ b/crates/core/tedge_agent/src/operation_workflows/persist.rs @@ -261,11 +261,7 @@ impl WorkflowRepository { /// Copy the workflow definition file to the persisted state directory, /// unless this has already been done. - async fn persist_workflow_definition( - &mut self, - operation: &OperationName, - version: &WorkflowVersion, - ) { + async fn persist_workflow_definition(&mut self, operation: &str, version: &str) { if version_is_builtin(version) { return; } @@ -279,16 +275,12 @@ impl WorkflowRepository { if let Err(err) = tokio::fs::copy(source.clone(), target.clone()).await { error!("Fail to persist a copy of {source} as {target}: {err}"); } else { - self.in_use_copies.insert(version.clone(), 1); + self.in_use_copies.insert(version.to_owned(), 1); } } } - fn workflow_copy_path( - &self, - operation: &OperationName, - version: &WorkflowVersion, - ) -> Utf8PathBuf { + fn workflow_copy_path(&self, operation: &str, version: &str) -> Utf8PathBuf { let filename = format!("{operation}-{version}"); self.state_dir.join(filename).with_extension("toml") } @@ -305,7 +297,7 @@ impl WorkflowRepository { } } - async fn release_in_use_copy(&mut self, operation: &OperationName, version: &WorkflowVersion) { + async fn release_in_use_copy(&mut self, operation: &str, version: &str) { if version_is_builtin(version) { return; } @@ -359,7 +351,7 @@ impl WorkflowRepository { if let Some(current_version) = self.workflows.use_current_version(&operation) { self.persist_workflow_definition(&operation, ¤t_version) .await; - *command = command.clone().set_workflow_version(¤t_version); + command.set_workflow_version(¤t_version); } } } @@ -410,14 +402,14 @@ impl WorkflowRepository { operation: &OperationType, command_state: GenericCommandState, ) -> Result, WorkflowExecutionError> { + let operation_name = operation.name(); if command_state.is_init() { // A new command instance must use the latest on-disk version of the operation workflow - self.load_latest_version(&operation.to_string()).await; + self.load_latest_version(&operation_name).await; } else if command_state.is_finished() { // Clear the cache if this happens to be the latest instance using that version of the workflow if let Some(version) = command_state.workflow_version() { - self.release_in_use_copy(&operation.to_string(), &version) - .await; + self.release_in_use_copy(&operation_name, version).await; } } @@ -429,7 +421,7 @@ impl WorkflowRepository { Some(new_state) if new_state.is_init() => { if let Some(version) = new_state.workflow_version() { - self.persist_workflow_definition(&operation.to_string(), &version) + self.persist_workflow_definition(&operation_name, version) .await; } Ok(Some(new_state)) diff --git a/crates/core/tedge_api/src/mqtt_topics.rs b/crates/core/tedge_api/src/mqtt_topics.rs index c33ed54485..7f98e74ac3 100644 --- a/crates/core/tedge_api/src/mqtt_topics.rs +++ b/crates/core/tedge_api/src/mqtt_topics.rs @@ -737,12 +737,6 @@ impl<'a> From<&'a str> for OperationType { } } -impl From<&OperationType> for String { - fn from(value: &OperationType) -> Self { - format!("{value}") - } -} - impl Display for OperationType { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { @@ -760,6 +754,12 @@ impl Display for OperationType { } } +impl OperationType { + pub fn name(&self) -> String { + format!("{self}") + } +} + #[derive(Debug, thiserror::Error, PartialEq, Eq, Clone)] pub enum ChannelError { #[error("Channel needs to have at least 2 segments")] diff --git a/crates/core/tedge_api/src/workflow/mod.rs b/crates/core/tedge_api/src/workflow/mod.rs index f1f57c6d65..4065670a4b 100644 --- a/crates/core/tedge_api/src/workflow/mod.rs +++ b/crates/core/tedge_api/src/workflow/mod.rs @@ -32,7 +32,7 @@ pub type WorkflowVersion = String; const BUILT_IN: &str = "builtin"; -pub fn version_is_builtin(version: &WorkflowVersion) -> bool { +pub fn version_is_builtin(version: &str) -> bool { version == BUILT_IN } @@ -342,7 +342,7 @@ impl OperationWorkflow { self.states .get(&command_state.status) .ok_or_else(|| WorkflowExecutionError::UnknownStep { - operation: (&self.operation).into(), + operation: self.operation.name(), step: command_state.status.clone(), }) .map(|action| action.inject_state(command_state)) diff --git a/crates/core/tedge_api/src/workflow/state.rs b/crates/core/tedge_api/src/workflow/state.rs index 7aef332d23..5ecfba3d54 100644 --- a/crates/core/tedge_api/src/workflow/state.rs +++ b/crates/core/tedge_api/src/workflow/state.rs @@ -186,8 +186,18 @@ impl GenericCommandState { } } - pub fn update_with_key_value(self, key: &str, val: &str) -> Self { - self.update_with_json(json!({ key: val })) + pub fn with_key_value(mut self, key: &str, val: &str) -> Self { + self.set_key_value(key, val); + self + } + + fn set_key_value(&mut self, key: &str, val: &str) { + if let Some(o) = self.payload.as_object_mut() { + o.insert(key.into(), val.into()); + } + if key == STATUS { + self.status = val.to_owned(); + } } pub fn get_log_path(&self) -> Option { @@ -197,19 +207,28 @@ impl GenericCommandState { .map(Utf8PathBuf::from) } - pub fn set_log_path>(self, path: P) -> Self { - self.update_with_key_value(OP_LOG_PATH_KEY, path.as_ref().as_str()) + pub fn with_log_path>(mut self, path: P) -> Self { + self.set_log_path(path); + self } - pub fn workflow_version(&self) -> Option { + pub fn set_log_path>(&mut self, path: P) { + self.set_key_value(OP_LOG_PATH_KEY, path.as_ref().as_str()) + } + + pub fn workflow_version(&self) -> Option<&str> { self.payload .get(OP_WORKFLOW_VERSION_KEY) .and_then(|val| val.as_str()) - .map(|str| str.to_string()) } - pub fn set_workflow_version(self, version: &str) -> Self { - self.update_with_key_value(OP_WORKFLOW_VERSION_KEY, version) + pub fn with_workflow_version(mut self, version: &str) -> Self { + self.set_workflow_version(version); + self + } + + pub fn set_workflow_version(&mut self, version: &str) { + self.set_key_value(OP_WORKFLOW_VERSION_KEY, version) } /// Update the command state with the outcome of a script diff --git a/crates/core/tedge_api/src/workflow/supervisor.rs b/crates/core/tedge_api/src/workflow/supervisor.rs index 29200b0b0c..3fc7265a6b 100644 --- a/crates/core/tedge_api/src/workflow/supervisor.rs +++ b/crates/core/tedge_api/src/workflow/supervisor.rs @@ -80,7 +80,7 @@ impl WorkflowSupervisor { self.commands = commands; self.commands .iter() - .filter_map(|(t, s)| self.resume_command(t, s)) + .filter_map(|(t, s)| self.resume_command(t, s.clone())) .collect() } @@ -136,10 +136,10 @@ impl WorkflowSupervisor { /// /// Return the current version if any. pub fn use_current_version(&mut self, operation: &OperationName) -> Option { - match self.workflows.get_mut(&operation.as_str().into()) { - Some(versions) => versions.use_current_version().cloned(), - None => None, - } + self.workflows + .get_mut(&operation.as_str().into())? + .use_current_version() + .cloned() } /// Update the state of the command board on reception of a message sent by a peer over MQTT @@ -162,9 +162,9 @@ impl WorkflowSupervisor { } else if command_state.is_init() { // This is a new command request if let Some(current_version) = workflow_versions.use_current_version() { - let command_state = command_state.set_workflow_version(current_version); - self.commands.insert(command_state.clone())?; - Ok(Some(command_state)) + let updated_state = command_state.with_workflow_version(current_version); + self.commands.insert(updated_state.clone())?; + Ok(Some(updated_state)) } else { return Err(WorkflowExecutionError::DeprecatedOperation { operation: operation.to_string(), @@ -192,7 +192,7 @@ impl WorkflowSupervisor { }); }; - let Some(version) = &command_state.workflow_version() else { + let Some(version) = command_state.workflow_version() else { return Err(WorkflowExecutionError::MissingVersion); }; @@ -287,21 +287,17 @@ impl WorkflowSupervisor { fn resume_command( &self, timestamp: &Timestamp, - command: &GenericCommandState, + command: GenericCommandState, ) -> Option { - let action = match self.get_action(command) { + let action = match self.get_action(&command) { Ok(action) => action, Err(err) => { - return Some( - command - .clone() - .fail_with(format!("Fail to resume on start: {err:?}")), - ); + return Some(command.fail_with(format!("Fail to resume on start: {err:?}"))); } }; let epoch = format!("{}.{}", timestamp.unix_timestamp(), timestamp.millisecond()); - let command = command.clone().update_with_key_value("resumed_at", &epoch); + let command = command.with_key_value("resumed_at", &epoch); match action { OperationAction::AwaitingAgentRestart(handlers) => { Some(command.update(handlers.on_success)) @@ -425,7 +421,7 @@ impl WorkflowVersions { self.in_use.contains_key(BUILT_IN) } - fn get(&self, version: &WorkflowVersion) -> Result<&OperationWorkflow, WorkflowExecutionError> { + fn get(&self, version: &str) -> Result<&OperationWorkflow, WorkflowExecutionError> { self.in_use .get(version) .ok_or(WorkflowExecutionError::UnknownVersion {