Skip to content

Commit

Permalink
Merge pull request #3309 from didier-wenzek/refactor/simplify_command…
Browse files Browse the repository at this point in the history
…_state_updates

refactor: Simplify command state update methods
  • Loading branch information
didier-wenzek authored Dec 19, 2024
2 parents b48c19c + b9fea1b commit b843d72
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 52 deletions.
2 changes: 1 addition & 1 deletion crates/core/tedge_agent/src/operation_workflows/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ impl WorkflowActor {
Ok(Some(new_state)) => {
self.persist_command_board().await?;
if new_state.is_init() {
self.process_command_update(new_state.set_log_path(&log_file.path))
self.process_command_update(new_state.with_log_path(&log_file.path))
.await?;
}
}
Expand Down
26 changes: 9 additions & 17 deletions crates/core/tedge_agent/src/operation_workflows/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,11 +261,7 @@ impl WorkflowRepository {

/// Copy the workflow definition file to the persisted state directory,
/// unless this has already been done.
async fn persist_workflow_definition(
&mut self,
operation: &OperationName,
version: &WorkflowVersion,
) {
async fn persist_workflow_definition(&mut self, operation: &str, version: &str) {
if version_is_builtin(version) {
return;
}
Expand All @@ -279,16 +275,12 @@ impl WorkflowRepository {
if let Err(err) = tokio::fs::copy(source.clone(), target.clone()).await {
error!("Fail to persist a copy of {source} as {target}: {err}");
} else {
self.in_use_copies.insert(version.clone(), 1);
self.in_use_copies.insert(version.to_owned(), 1);
}
}
}

fn workflow_copy_path(
&self,
operation: &OperationName,
version: &WorkflowVersion,
) -> Utf8PathBuf {
fn workflow_copy_path(&self, operation: &str, version: &str) -> Utf8PathBuf {
let filename = format!("{operation}-{version}");
self.state_dir.join(filename).with_extension("toml")
}
Expand All @@ -305,7 +297,7 @@ impl WorkflowRepository {
}
}

async fn release_in_use_copy(&mut self, operation: &OperationName, version: &WorkflowVersion) {
async fn release_in_use_copy(&mut self, operation: &str, version: &str) {
if version_is_builtin(version) {
return;
}
Expand Down Expand Up @@ -359,7 +351,7 @@ impl WorkflowRepository {
if let Some(current_version) = self.workflows.use_current_version(&operation) {
self.persist_workflow_definition(&operation, &current_version)
.await;
*command = command.clone().set_workflow_version(&current_version);
command.set_workflow_version(&current_version);
}
}
}
Expand Down Expand Up @@ -410,14 +402,14 @@ impl WorkflowRepository {
operation: &OperationType,
command_state: GenericCommandState,
) -> Result<Option<GenericCommandState>, WorkflowExecutionError> {
let operation_name = operation.name();
if command_state.is_init() {
// A new command instance must use the latest on-disk version of the operation workflow
self.load_latest_version(&operation.to_string()).await;
self.load_latest_version(&operation_name).await;
} else if command_state.is_finished() {
// Clear the cache if this happens to be the latest instance using that version of the workflow
if let Some(version) = command_state.workflow_version() {
self.release_in_use_copy(&operation.to_string(), &version)
.await;
self.release_in_use_copy(&operation_name, version).await;
}
}

Expand All @@ -429,7 +421,7 @@ impl WorkflowRepository {

Some(new_state) if new_state.is_init() => {
if let Some(version) = new_state.workflow_version() {
self.persist_workflow_definition(&operation.to_string(), &version)
self.persist_workflow_definition(&operation_name, version)
.await;
}
Ok(Some(new_state))
Expand Down
12 changes: 6 additions & 6 deletions crates/core/tedge_api/src/mqtt_topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -737,12 +737,6 @@ impl<'a> From<&'a str> for OperationType {
}
}

impl From<&OperationType> for String {
fn from(value: &OperationType) -> Self {
format!("{value}")
}
}

impl Display for OperationType {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Expand All @@ -760,6 +754,12 @@ impl Display for OperationType {
}
}

impl OperationType {
pub fn name(&self) -> String {
format!("{self}")
}
}

#[derive(Debug, thiserror::Error, PartialEq, Eq, Clone)]
pub enum ChannelError {
#[error("Channel needs to have at least 2 segments")]
Expand Down
4 changes: 2 additions & 2 deletions crates/core/tedge_api/src/workflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub type WorkflowVersion = String;

const BUILT_IN: &str = "builtin";

pub fn version_is_builtin(version: &WorkflowVersion) -> bool {
pub fn version_is_builtin(version: &str) -> bool {
version == BUILT_IN
}

Expand Down Expand Up @@ -342,7 +342,7 @@ impl OperationWorkflow {
self.states
.get(&command_state.status)
.ok_or_else(|| WorkflowExecutionError::UnknownStep {
operation: (&self.operation).into(),
operation: self.operation.name(),
step: command_state.status.clone(),
})
.map(|action| action.inject_state(command_state))
Expand Down
35 changes: 27 additions & 8 deletions crates/core/tedge_api/src/workflow/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,18 @@ impl GenericCommandState {
}
}

pub fn update_with_key_value(self, key: &str, val: &str) -> Self {
self.update_with_json(json!({ key: val }))
pub fn with_key_value(mut self, key: &str, val: &str) -> Self {
self.set_key_value(key, val);
self
}

fn set_key_value(&mut self, key: &str, val: &str) {
if let Some(o) = self.payload.as_object_mut() {
o.insert(key.into(), val.into());
}
if key == STATUS {
self.status = val.to_owned();
}
}

pub fn get_log_path(&self) -> Option<Utf8PathBuf> {
Expand All @@ -197,19 +207,28 @@ impl GenericCommandState {
.map(Utf8PathBuf::from)
}

pub fn set_log_path<P: AsRef<Utf8Path>>(self, path: P) -> Self {
self.update_with_key_value(OP_LOG_PATH_KEY, path.as_ref().as_str())
pub fn with_log_path<P: AsRef<Utf8Path>>(mut self, path: P) -> Self {
self.set_log_path(path);
self
}

pub fn workflow_version(&self) -> Option<String> {
pub fn set_log_path<P: AsRef<Utf8Path>>(&mut self, path: P) {
self.set_key_value(OP_LOG_PATH_KEY, path.as_ref().as_str())
}

pub fn workflow_version(&self) -> Option<&str> {
self.payload
.get(OP_WORKFLOW_VERSION_KEY)
.and_then(|val| val.as_str())
.map(|str| str.to_string())
}

pub fn set_workflow_version(self, version: &str) -> Self {
self.update_with_key_value(OP_WORKFLOW_VERSION_KEY, version)
pub fn with_workflow_version(mut self, version: &str) -> Self {
self.set_workflow_version(version);
self
}

pub fn set_workflow_version(&mut self, version: &str) {
self.set_key_value(OP_WORKFLOW_VERSION_KEY, version)
}

/// Update the command state with the outcome of a script
Expand Down
32 changes: 14 additions & 18 deletions crates/core/tedge_api/src/workflow/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl WorkflowSupervisor {
self.commands = commands;
self.commands
.iter()
.filter_map(|(t, s)| self.resume_command(t, s))
.filter_map(|(t, s)| self.resume_command(t, s.clone()))
.collect()
}

Expand Down Expand Up @@ -136,10 +136,10 @@ impl WorkflowSupervisor {
///
/// Return the current version if any.
pub fn use_current_version(&mut self, operation: &OperationName) -> Option<WorkflowVersion> {
match self.workflows.get_mut(&operation.as_str().into()) {
Some(versions) => versions.use_current_version().cloned(),
None => None,
}
self.workflows
.get_mut(&operation.as_str().into())?
.use_current_version()
.cloned()
}

/// Update the state of the command board on reception of a message sent by a peer over MQTT
Expand All @@ -162,9 +162,9 @@ impl WorkflowSupervisor {
} else if command_state.is_init() {
// This is a new command request
if let Some(current_version) = workflow_versions.use_current_version() {
let command_state = command_state.set_workflow_version(current_version);
self.commands.insert(command_state.clone())?;
Ok(Some(command_state))
let updated_state = command_state.with_workflow_version(current_version);
self.commands.insert(updated_state.clone())?;
Ok(Some(updated_state))
} else {
return Err(WorkflowExecutionError::DeprecatedOperation {
operation: operation.to_string(),
Expand Down Expand Up @@ -192,7 +192,7 @@ impl WorkflowSupervisor {
});
};

let Some(version) = &command_state.workflow_version() else {
let Some(version) = command_state.workflow_version() else {
return Err(WorkflowExecutionError::MissingVersion);
};

Expand Down Expand Up @@ -287,21 +287,17 @@ impl WorkflowSupervisor {
fn resume_command(
&self,
timestamp: &Timestamp,
command: &GenericCommandState,
command: GenericCommandState,
) -> Option<GenericCommandState> {
let action = match self.get_action(command) {
let action = match self.get_action(&command) {
Ok(action) => action,
Err(err) => {
return Some(
command
.clone()
.fail_with(format!("Fail to resume on start: {err:?}")),
);
return Some(command.fail_with(format!("Fail to resume on start: {err:?}")));
}
};

let epoch = format!("{}.{}", timestamp.unix_timestamp(), timestamp.millisecond());
let command = command.clone().update_with_key_value("resumed_at", &epoch);
let command = command.with_key_value("resumed_at", &epoch);
match action {
OperationAction::AwaitingAgentRestart(handlers) => {
Some(command.update(handlers.on_success))
Expand Down Expand Up @@ -425,7 +421,7 @@ impl WorkflowVersions {
self.in_use.contains_key(BUILT_IN)
}

fn get(&self, version: &WorkflowVersion) -> Result<&OperationWorkflow, WorkflowExecutionError> {
fn get(&self, version: &str) -> Result<&OperationWorkflow, WorkflowExecutionError> {
self.in_use
.get(version)
.ok_or(WorkflowExecutionError::UnknownVersion {
Expand Down

0 comments on commit b843d72

Please sign in to comment.