Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support user metadata in child workflow, timer, and activity #846

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 57 additions & 5 deletions core/src/core_tests/activity_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use crate::{
advance_fut, job_assert, prost_dur,
test_help::{
build_fake_worker, build_mock_pollers, canned_histories, gen_assert_and_reply,
mock_manual_poller, mock_poller, mock_poller_from_resps, mock_worker, poll_and_reply,
single_hist_mock_sg, test_worker_cfg, MockPollCfg, MockWorkerInputs, MocksHolder,
QueueResponse, ResponseType, WorkerExt, WorkflowCachingPolicy, TEST_Q,
mock_manual_poller, mock_poller, mock_poller_from_resps, mock_sdk_cfg, mock_worker,
poll_and_reply, single_hist_mock_sg, test_worker_cfg, MockPollCfg, MockWorkerInputs,
MocksHolder, QueueResponse, ResponseType, WorkerExt, WorkflowCachingPolicy, TEST_Q,
},
worker::client::mocks::{mock_manual_workflow_client, mock_workflow_client},
ActivityHeartbeat, Worker,
Expand Down Expand Up @@ -45,17 +45,18 @@ use temporal_sdk_core_protos::{
},
temporal::api::{
command::v1::{command::Attributes, ScheduleActivityTaskCommandAttributes},
enums::v1::EventType,
enums::v1::{CommandType, EventType},
history::v1::{
history_event::Attributes as EventAttributes, ActivityTaskScheduledEventAttributes,
},
sdk::v1::UserMetadata,
workflowservice::v1::{
PollActivityTaskQueueResponse, RecordActivityTaskHeartbeatResponse,
RespondActivityTaskCanceledResponse, RespondActivityTaskCompletedResponse,
RespondActivityTaskFailedResponse, RespondWorkflowTaskCompletedResponse,
},
},
TestHistoryBuilder, DEFAULT_WORKFLOW_TYPE,
TestHistoryBuilder, DEFAULT_ACTIVITY_TYPE, DEFAULT_WORKFLOW_TYPE,
};
use temporal_sdk_core_test_utils::{fanout_tasks, start_timer_cmd, TestWorker};
use tokio::{join, sync::Barrier, time::sleep};
Expand Down Expand Up @@ -1187,3 +1188,54 @@ async fn activities_must_be_flushed_to_server_on_shutdown(#[values(true, false)]
};
join!(shutdown_task, complete_task);
}

#[tokio::test]
async fn pass_activity_summary_to_metadata() {
let t = canned_histories::single_activity("1");
let mut mock_cfg = MockPollCfg::from_hist_builder(t);
let wf_id = mock_cfg.hists[0].wf_id.clone();
let wf_type = DEFAULT_WORKFLOW_TYPE;
let expected_user_metadata = Some(UserMetadata {
summary: Some(b"activity summary".into()),
details: None,
});
mock_cfg.completion_asserts_from_expectations(|mut asserts| {
asserts
.then(move |wft| {
assert_eq!(wft.commands.len(), 1);
assert_eq!(
wft.commands[0].command_type(),
CommandType::ScheduleActivityTask
);
assert_eq!(wft.commands[0].user_metadata, expected_user_metadata)
})
.then(move |wft| {
assert_eq!(wft.commands.len(), 1);
assert_eq!(
wft.commands[0].command_type(),
CommandType::CompleteWorkflowExecution
);
});
});

let mut worker = mock_sdk_cfg(mock_cfg, |_| {});
worker.register_wf(wf_type, |ctx: WfContext| async move {
ctx.activity(ActivityOptions {
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
summary: Some("activity summary".to_string()),
..Default::default()
})
.await;
Ok(().into())
});
worker
.submit_wf(
wf_id.to_owned(),
wf_type.to_owned(),
vec![],
WorkflowOptions::default(),
)
.await
.unwrap();
worker.run_until_done().await.unwrap();
}
73 changes: 65 additions & 8 deletions core/src/core_tests/child_workflows.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
use crate::{
replay::DEFAULT_WORKFLOW_TYPE,
test_help::{
build_fake_sdk, canned_histories, mock_sdk, mock_worker, single_hist_mock_sg, MockPollCfg,
ResponseType,
build_fake_sdk, canned_histories, mock_sdk, mock_sdk_cfg, mock_worker, single_hist_mock_sg,
MockPollCfg, ResponseType,
},
worker::client::mocks::mock_workflow_client,
};
use temporal_client::WorkflowOptions;
use temporal_sdk::{ChildWorkflowOptions, Signal, WfContext, WorkflowResult};
use temporal_sdk_core_api::Worker;
use temporal_sdk_core_protos::coresdk::{
child_workflow::{child_workflow_result, ChildWorkflowCancellationType},
workflow_activation::{workflow_activation_job, WorkflowActivationJob},
workflow_commands::{
CancelChildWorkflowExecution, CompleteWorkflowExecution, StartChildWorkflowExecution,
use temporal_sdk_core_protos::{
coresdk::{
child_workflow::{child_workflow_result, ChildWorkflowCancellationType},
workflow_activation::{workflow_activation_job, WorkflowActivationJob},
workflow_commands::{
CancelChildWorkflowExecution, CompleteWorkflowExecution, StartChildWorkflowExecution,
},
workflow_completion::WorkflowActivationCompletion,
},
workflow_completion::WorkflowActivationCompletion,
temporal::api::{enums::v1::CommandType, sdk::v1::UserMetadata},
};
use tokio::join;

Expand Down Expand Up @@ -220,3 +223,57 @@ async fn cancel_already_complete_child_ignored() {
.await
.unwrap();
}

#[tokio::test]
async fn pass_child_workflow_summary_to_metadata() {
let wf_id = "1";
let wf_type = DEFAULT_WORKFLOW_TYPE;
let t = canned_histories::single_child_workflow(wf_id);
let mut mock_cfg = MockPollCfg::from_hist_builder(t);
let expected_user_metadata = Some(UserMetadata {
summary: Some(b"child summary".into()),
details: Some(b"child details".into()),
});
mock_cfg.completion_asserts_from_expectations(|mut asserts| {
asserts
.then(move |wft| {
assert_eq!(wft.commands.len(), 1);
assert_eq!(
wft.commands[0].command_type(),
CommandType::StartChildWorkflowExecution
);
assert_eq!(wft.commands[0].user_metadata, expected_user_metadata)
})
.then(move |wft| {
assert_eq!(wft.commands.len(), 1);
assert_eq!(
wft.commands[0].command_type(),
CommandType::CompleteWorkflowExecution
);
});
});

let mut worker = mock_sdk_cfg(mock_cfg, |_| {});
worker.register_wf(wf_type, move |ctx: WfContext| async move {
ctx.child_workflow(ChildWorkflowOptions {
workflow_id: wf_id.to_string(),
workflow_type: "child".to_string(),
static_summary: Some("child summary".to_string()),
static_details: Some("child details".to_string()),
..Default::default()
})
.start(&ctx)
.await;
Ok(().into())
});
worker
.submit_wf(
wf_id.to_owned(),
wf_type.to_owned(),
vec![],
WorkflowOptions::default(),
)
.await
.unwrap();
worker.run_until_done().await.unwrap();
}
1 change: 1 addition & 0 deletions core/src/core_tests/updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ async fn replay_with_signal_and_update_same_task() {
StartTimer {
seq: 1,
start_to_fire_timeout: Some(prost_dur!(from_secs(1))),
summary: None,
}
.into(),
UpdateResponse {
Expand Down
2 changes: 2 additions & 0 deletions core/src/core_tests/workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ async fn after_shutdown_of_worker_get_shutdown_err() {
workflow_command::Variant::StartTimer(StartTimer {
seq: 1,
start_to_fire_timeout: Some(prost_dur!(from_secs(1))),
summary: None,
}),
))
.await
Expand Down Expand Up @@ -352,6 +353,7 @@ async fn worker_shutdown_api(#[case] use_cache: bool, #[case] api_success: bool)
workflow_command::Variant::StartTimer(StartTimer {
seq: 1,
start_to_fire_timeout: Some(prost_dur!(from_secs(1))),
summary: None,
}),
))
.await
Expand Down
50 changes: 49 additions & 1 deletion core/src/core_tests/workflow_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use std::{
time::Duration,
};
use temporal_client::WorkflowOptions;
use temporal_sdk::{ActivityOptions, CancellableFuture, WfContext};
use temporal_sdk::{ActivityOptions, CancellableFuture, TimerOptions, WfContext};
use temporal_sdk_core_api::{
errors::PollWfError,
worker::{
Expand Down Expand Up @@ -64,6 +64,7 @@ use temporal_sdk_core_protos::{
history_event, TimerFiredEventAttributes,
WorkflowPropertiesModifiedExternallyEventAttributes,
},
sdk::v1::UserMetadata,
workflowservice::v1::{
GetWorkflowExecutionHistoryResponse, RespondWorkflowTaskCompletedResponse,
},
Expand Down Expand Up @@ -3086,3 +3087,50 @@ async fn slot_provider_cant_hand_out_more_permits_than_cache_size() {
// polling is not exceeding the task limit
assert_eq!(popped_tasks.load(Ordering::Relaxed), 10);
}

#[tokio::test]
async fn pass_timer_summary_to_metadata() {
let t = canned_histories::single_timer("1");
let mut mock_cfg = MockPollCfg::from_hist_builder(t);
let wf_id = mock_cfg.hists[0].wf_id.clone();
let wf_type = DEFAULT_WORKFLOW_TYPE;
let expected_user_metadata = Some(UserMetadata {
summary: Some(b"timer summary".into()),
details: None,
});
mock_cfg.completion_asserts_from_expectations(|mut asserts| {
asserts
.then(move |wft| {
assert_eq!(wft.commands.len(), 1);
assert_eq!(wft.commands[0].command_type(), CommandType::StartTimer);
assert_eq!(wft.commands[0].user_metadata, expected_user_metadata)
})
.then(move |wft| {
assert_eq!(wft.commands.len(), 1);
assert_eq!(
wft.commands[0].command_type(),
CommandType::CompleteWorkflowExecution
);
});
});

let mut worker = mock_sdk_cfg(mock_cfg, |_| {});
worker.register_wf(wf_type, |ctx: WfContext| async move {
ctx.timer(TimerOptions {
duration: Duration::from_secs(1),
summary: Some("timer summary".to_string()),
})
.await;
Ok(().into())
});
worker
.submit_wf(
wf_id.to_owned(),
wf_type.to_owned(),
vec![],
WorkflowOptions::default(),
)
.await
.unwrap();
worker.run_until_done().await.unwrap();
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use temporal_sdk_core_protos::{
ActivityTaskCompletedEventAttributes, ActivityTaskFailedEventAttributes,
ActivityTaskTimedOutEventAttributes,
},
sdk::v1::UserMetadata,
},
};

Expand Down Expand Up @@ -114,6 +115,10 @@ impl ActivityMachine {
internal_flags: InternalFlagsRef,
use_compatible_version: bool,
) -> NewMachineWithCommand {
let user_metadata = attrs.summary.clone().map(|x| UserMetadata {
summary: Some(x),
details: None,
});
let mut s = Self::from_parts(
Created {}.into(),
SharedState {
Expand All @@ -134,7 +139,7 @@ impl ActivityMachine {
s.shared_state().attrs.clone(),
use_compatible_version,
)),
user_metadata: Default::default(),
user_metadata,
};
NewMachineWithCommand {
command,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use temporal_sdk_core_protos::{
ChildWorkflowExecutionStartedEventAttributes,
StartChildWorkflowExecutionFailedEventAttributes,
},
sdk::v1::UserMetadata,
},
};

Expand Down Expand Up @@ -449,6 +450,15 @@ impl ChildWorkflowMachine {
cancelled_before_sent: false,
},
);
let user_metadata = if attribs.static_summary.is_some() || attribs.static_details.is_some()
{
Some(UserMetadata {
summary: attribs.static_summary.clone(),
details: attribs.static_details.clone(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a little nitpicky, but, instead of cloning here, since these values aren't used by start_child_workflow_cmd_to_api, let's instead https://doc.rust-lang.org/std/mem/fn.take.html the string out and we avoid a clone

})
} else {
None
};
OnEventWrapper::on_event_mut(&mut s, ChildWorkflowMachineEvents::Schedule)
.expect("Scheduling child workflows doesn't fail");
let cmd = Command {
Expand All @@ -457,7 +467,7 @@ impl ChildWorkflowMachine {
attribs,
use_compatible_version,
)),
user_metadata: Default::default(),
user_metadata,
};
NewMachineWithCommand {
command: cmd,
Expand Down
9 changes: 7 additions & 2 deletions core/src/worker/workflow/machines/timer_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use temporal_sdk_core_protos::{
command::v1::Command,
enums::v1::{CommandType, EventType},
history::v1::{history_event, TimerFiredEventAttributes},
sdk::v1::UserMetadata,
},
};

Expand Down Expand Up @@ -73,13 +74,17 @@ pub(super) fn new_timer(attribs: StartTimer) -> NewMachineWithCommand {
impl TimerMachine {
/// Create a new timer and immediately schedule it
fn new_scheduled(attribs: StartTimer) -> (Self, Command) {
let user_metadata = attribs.summary.clone().map(|x| UserMetadata {
summary: Some(x),
details: None,
});
let mut s = Self::new(attribs);
OnEventWrapper::on_event_mut(&mut s, TimerMachineEvents::Schedule)
.expect("Scheduling timers doesn't fail");
let cmd = Command {
command_type: CommandType::StartTimer as i32,
attributes: Some(s.shared_state().attrs.into()),
user_metadata: Default::default(),
attributes: Some(s.shared_state().attrs.clone().into()),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This clone shouldn't be needed, and we don't want to clone the entire attributes for commands, generally speaking, as it can have a lot of stuff in it (though, for timer, that's likely not the case)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without the clone, I get the error

cannot move out of a shared reference
move occurs because value has type `temporal_sdk_core_protos::coresdk::workflow_commands::StartTimer`, which does not implement the `Copy` traitrustc[Click for full compiler diagnostic](rust-analyzer-diagnostics-view:/diagnostic message [0]?0#file:///Users/andrewyuan/temporal/core/core/src/worker/workflow/machines/timer_state_machine.rs)

Looks like we clone the attrs of Activity https://github.com/yuandrew/sdk-core/blob/dd37e2946a385090ded0fc3312180027002d4b16/core/src/worker/workflow/machines/activity_state_machine.rs#L134

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, curious why we're even getting this error in the first place.. the line I added for user_metadata doesn't involve s directly, but maybe because I'm cloning attribs.summary, it messes with the borrow checker, when s is created?let mut s = Self::new(attribs);

Is there a different workaround here, or would we have to clone (or implement Copy) here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yeah I didn't see it getting moved later. This clone is fine to keep then, but we could change the line above to also take the summary.

And yeah, in general I want to reduce how many places lots of various strings get cloned, like the place you linked, but that's a bigger project. We may not ever do it since ultimately it's not that big of a deal but, I want to avoid adding more spots where we unnecessarily clone.

user_metadata,
};
(s, cmd)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ message StartTimer {
// Lang's incremental sequence number, used as the operation identifier
uint32 seq = 1;
google.protobuf.Duration start_to_fire_timeout = 2;
// Summary that is stored as user_metadata
temporal.api.common.v1.Payload summary = 3;
}

message CancelTimer {
Expand Down Expand Up @@ -88,6 +90,8 @@ message ScheduleActivity {
bool do_not_eagerly_execute = 14;
// Whether this activity should run on a worker with a compatible build id or not.
coresdk.common.VersioningIntent versioning_intent = 15;
// Summary that is stored as user_metadata
temporal.api.common.v1.Payload summary = 16;
}

message ScheduleLocalActivity {
Expand Down Expand Up @@ -253,6 +257,10 @@ message StartChildWorkflowExecution {
child_workflow.ChildWorkflowCancellationType cancellation_type = 18;
// Whether this child should run on a worker with a compatible build id or not.
coresdk.common.VersioningIntent versioning_intent = 19;
// Static summary of the child workflow
temporal.api.common.v1.Payload static_summary = 20;
// Static details of the child workflow
temporal.api.common.v1.Payload static_details = 21;
}

// Cancel a child workflow
Expand Down
2 changes: 1 addition & 1 deletion sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use tracing::{field, Instrument, Span};
pub use workflow_context::{
ActivityOptions, CancellableFuture, ChildWorkflow, ChildWorkflowOptions, LocalActivityOptions,
PendingChildWorkflow, Signal, SignalData, SignalWorkflowOptions, StartedChildWorkflow,
WfContext,
TimerOptions, WfContext,
};

use crate::{interceptors::WorkerInterceptor, workflow_context::ChildWfCommon};
Expand Down
Loading
Loading