Skip to content

Commit

Permalink
Fix LA start to close timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
Quinn-With-Two-Ns committed Jul 27, 2023
1 parent 746bcf2 commit e305c8d
Show file tree
Hide file tree
Showing 9 changed files with 143 additions and 51 deletions.
54 changes: 38 additions & 16 deletions internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,13 @@ type (
// LocalActivityOptions stores local activity specific parameters that will be stored inside of a context.
LocalActivityOptions struct {
// ScheduleToCloseTimeout - The end to end timeout for the local activity including retries.
// This field is required.
// At least on of ScheduleToCloseTimeout or StartToCloseTimeout is required.
// defaults to StartToCloseTimeout if not set.
ScheduleToCloseTimeout time.Duration

// StartToCloseTimeout - The timeout for a single execution of the local activity.
// Optional: defaults to ScheduleToClose
// At least on of ScheduleToCloseTimeout or StartToCloseTimeout is required.
// defaults to ScheduleToCloseTimeout if not set.
StartToCloseTimeout time.Duration

// RetryPolicy specify how to retry activity if error happens.
Expand Down Expand Up @@ -254,25 +256,12 @@ func WithActivityTask(
contextPropagators []ContextPropagator,
interceptors []WorkerInterceptor,
) (context.Context, error) {
var deadline time.Time
scheduled := common.TimeValue(task.GetScheduledTime())
started := common.TimeValue(task.GetStartedTime())
scheduleToCloseTimeout := common.DurationValue(task.GetScheduleToCloseTimeout())
startToCloseTimeout := common.DurationValue(task.GetStartToCloseTimeout())
heartbeatTimeout := common.DurationValue(task.GetHeartbeatTimeout())

startToCloseDeadline := started.Add(startToCloseTimeout)
if scheduleToCloseTimeout > 0 {
scheduleToCloseDeadline := scheduled.Add(scheduleToCloseTimeout)
// Minimum of the two deadlines.
if scheduleToCloseDeadline.Before(startToCloseDeadline) {
deadline = scheduleToCloseDeadline
} else {
deadline = startToCloseDeadline
}
} else {
deadline = startToCloseDeadline
}
deadline := calculateActivityDeadline(scheduled, started, scheduleToCloseTimeout, startToCloseTimeout)

logger = log.With(logger,
tagActivityID, task.ActivityId,
Expand Down Expand Up @@ -333,6 +322,21 @@ func WithLocalActivityTask(
tagWorkflowID, task.params.WorkflowInfo.WorkflowExecution.ID,
tagRunID, task.params.WorkflowInfo.WorkflowExecution.RunID,
)
startedTime := time.Now()
scheduleToCloseTimeout := task.params.ScheduleToCloseTimeout
startToCloseTimeout := task.params.StartToCloseTimeout

if startToCloseTimeout == 0 {
startToCloseTimeout = scheduleToCloseTimeout
}
if scheduleToCloseTimeout == 0 {
scheduleToCloseTimeout = startToCloseTimeout
}
deadline := calculateActivityDeadline(task.scheduledTime, startedTime, scheduleToCloseTimeout, startToCloseTimeout)
if task.attempt > 1 && !task.expireTime.IsZero() && task.expireTime.Before(deadline) {
// this is attempt and expire time is before SCHEDULE_TO_CLOSE timeout
deadline = task.expireTime
}
return newActivityContext(ctx, interceptors, &activityEnvironment{
workflowType: &workflowTypeLocal,
workflowNamespace: task.params.WorkflowInfo.Namespace,
Expand All @@ -343,6 +347,9 @@ func WithLocalActivityTask(
logger: logger,
metricsHandler: metricsHandler,
isLocalActivity: true,
deadline: deadline,
scheduledTime: task.scheduledTime,
startedTime: startedTime,
dataConverter: dataConverter,
attempt: task.attempt,
})
Expand Down Expand Up @@ -375,3 +382,18 @@ func newActivityContext(

return ctx, nil
}

func calculateActivityDeadline(scheduled, started time.Time, scheduleToCloseTimeout, startToCloseTimeout time.Duration) time.Time {
startToCloseDeadline := started.Add(startToCloseTimeout)
if scheduleToCloseTimeout > 0 {
scheduleToCloseDeadline := scheduled.Add(scheduleToCloseTimeout)
// Minimum of the two deadlines.
if scheduleToCloseDeadline.Before(startToCloseDeadline) {
return scheduleToCloseDeadline
} else {
return startToCloseDeadline
}
} else {
return startToCloseDeadline
}
}
3 changes: 2 additions & 1 deletion internal/internal_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,8 @@ func getValidatedLocalActivityOptions(ctx Context) (*ExecuteLocalActivityOptions
}
if p.ScheduleToCloseTimeout == 0 {
p.ScheduleToCloseTimeout = p.StartToCloseTimeout
} else {
}
if p.StartToCloseTimeout == 0 {
p.StartToCloseTimeout = p.ScheduleToCloseTimeout
}
return p, nil
Expand Down
42 changes: 23 additions & 19 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,19 +172,21 @@ type (

localActivityTask struct {
sync.Mutex
workflowTask *workflowTask
activityID string
params *ExecuteLocalActivityParams
callback LocalActivityResultHandler
wc *workflowExecutionContextImpl
canceled bool
cancelFunc func()
attempt int32 // attempt starting from 1
attemptsThisWFT uint32 // Number of attempts started during this workflow task
pastFirstWFT bool // Set true once this LA has lived for more than one workflow task
retryPolicy *RetryPolicy
expireTime time.Time
header *commonpb.Header
workflowTask *workflowTask
activityID string
params *ExecuteLocalActivityParams
callback LocalActivityResultHandler
wc *workflowExecutionContextImpl
canceled bool
cancelFunc func()
attempt int32 // attempt starting from 1
attemptsThisWFT uint32 // Number of attempts started during this workflow task
pastFirstWFT bool // Set true once this LA has lived for more than one workflow task
retryPolicy *RetryPolicy
expireTime time.Time
scheduledTime time.Time // Time the activity was scheduled initially.
currentAttemptScheduledTime time.Time // Time this attempt of the activity was scheduled.
header *commonpb.Header
}

localActivityMarkerData struct {
Expand Down Expand Up @@ -683,12 +685,14 @@ func (wc *workflowEnvironmentImpl) ExecuteLocalActivity(params ExecuteLocalActiv

func newLocalActivityTask(params ExecuteLocalActivityParams, callback LocalActivityResultHandler, activityID string) *localActivityTask {
task := &localActivityTask{
activityID: activityID,
params: &params,
callback: callback,
retryPolicy: params.RetryPolicy,
attempt: params.Attempt,
header: params.Header,
activityID: activityID,
params: &params,
callback: callback,
retryPolicy: params.RetryPolicy,
attempt: params.Attempt,
header: params.Header,
scheduledTime: time.Now(),
currentAttemptScheduledTime: time.Now(),
}

if params.ScheduleToCloseTimeout > 0 {
Expand Down
5 changes: 5 additions & 0 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,7 @@ processWorkflowLoop:
}

laRetry.attempt++
laRetry.currentAttemptScheduledTime = time.Now()

if !wth.laTunnel.sendTask(laRetry) {
laRetry.attempt--
Expand Down Expand Up @@ -1194,6 +1195,10 @@ func (w *workflowExecutionContextImpl) CompleteWorkflowTask(workflowTask *workfl
task := eventHandler.pendingLaTasks[activityID]
task.wc = w
task.workflowTask = workflowTask

task.scheduledTime = time.Now()
task.currentAttemptScheduledTime = task.scheduledTime

if !w.laTunnel.sendTask(task) {
unstartedLaTasks[activityID] = struct{}{}
task.wc = nil
Expand Down
23 changes: 9 additions & 14 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -564,18 +564,8 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi
return &localActivityResult{task: task, err: err}
}

timeout := task.params.ScheduleToCloseTimeout
if task.params.StartToCloseTimeout != 0 && task.params.StartToCloseTimeout < timeout {
timeout = task.params.StartToCloseTimeout
}
timeoutDuration := timeout
deadline := time.Now().Add(timeoutDuration)
if task.attempt > 1 && !task.expireTime.IsZero() && task.expireTime.Before(deadline) {
// this is attempt and expire time is before SCHEDULE_TO_CLOSE timeout
deadline = task.expireTime
}

ctx, cancel := context.WithDeadline(ctx, deadline)
info := getActivityEnv(ctx)
ctx, cancel := context.WithDeadline(ctx, info.deadline)
defer cancel()

task.Lock()
Expand Down Expand Up @@ -617,13 +607,14 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi
laResult, err = ae.ExecuteWithActualArgs(ctx, task.params.InputArgs)
executionLatency := time.Since(laStartTime)
metricsHandler.Timer(metrics.LocalActivityExecutionLatency).Record(executionLatency)
if executionLatency > timeoutDuration {
if time.Now().After(info.deadline) {
// If local activity takes longer than expected timeout, the context would already be DeadlineExceeded and
// the result would be discarded. Print a warning in this case.
lath.logger.Warn("LocalActivity takes too long to complete.",
"LocalActivityID", task.activityID,
"LocalActivityType", activityType,
"ScheduleToCloseTimeout", task.params.ScheduleToCloseTimeout,
"StartToCloseTimeout", task.params.StartToCloseTimeout,
"ActualExecutionDuration", executionLatency)
}
}(doneCh)
Expand All @@ -644,7 +635,11 @@ WaitResult:
metricsHandler.Counter(metrics.LocalActivityExecutionCanceledCounter).Inc(1)
return &localActivityResult{err: ErrCanceled, task: task}
} else if ctx.Err() == context.DeadlineExceeded {
return &localActivityResult{err: ErrDeadlineExceeded, task: task}
if task.params.ScheduleToCloseTimeout != 0 && time.Now().After(info.scheduledTime.Add(task.params.ScheduleToCloseTimeout)) {
return &localActivityResult{err: ErrDeadlineExceeded, task: task}
} else {
return &localActivityResult{err: NewTimeoutError("deadline exceeded", enumspb.TIMEOUT_TYPE_START_TO_CLOSE, nil), task: task}
}
} else {
// should not happen
return &localActivityResult{err: NewApplicationError("unexpected context done", "", true, nil), task: task}
Expand Down
4 changes: 3 additions & 1 deletion internal/internal_workflow_testsuite.go
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,9 @@ func (env *testWorkflowEnvironmentImpl) executeLocalActivity(
params: &params,
callback: func(lar *LocalActivityResultWrapper) {
},
attempt: 1,
attempt: 1,
scheduledTime: time.Now(),
currentAttemptScheduledTime: time.Now(),
}
taskHandler := localActivityTaskHandler{
userContext: env.workerOptions.BackgroundActivityContext,
Expand Down
18 changes: 18 additions & 0 deletions test/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,15 @@ func (a *Activities) Sleep(_ context.Context, delay time.Duration) error {
return nil
}

func (a *Activities) SleepN(ctx context.Context, delay time.Duration, times int) (int32, error) {
a.append("sleepN")
if activity.GetInfo(ctx).Attempt >= int32(times) {
return activity.GetInfo(ctx).Attempt, nil
}
time.Sleep(delay)
return activity.GetInfo(ctx).Attempt, nil
}

func LocalSleep(_ context.Context, delay time.Duration) error {
time.Sleep(delay)
return nil
Expand Down Expand Up @@ -169,6 +178,15 @@ func (a *Activities) InspectActivityInfo(ctx context.Context, namespace, taskQue
if info.TaskQueue != taskQueue {
return fmt.Errorf("expected taskQueue %v but got %v", taskQueue, info.TaskQueue)
}
if info.Deadline.IsZero() {
return errors.New("expected non zero deadline")
}
if info.StartedTime.IsZero() {
return errors.New("expected non zero started time")
}
if info.ScheduledTime.IsZero() {
return errors.New("expected non zero scheduled time")
}
if info.IsLocalActivity != isLocalActivity {
return fmt.Errorf("expected IsLocalActivity %v but got %v", isLocalActivity, info.IsLocalActivity)
}
Expand Down
4 changes: 4 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1142,6 +1142,10 @@ func (ts *IntegrationTestSuite) TestWorkflowWithParallelSideEffects() {
ts.NoError(ts.executeWorkflow("test-wf-parallel-side-effects", ts.workflows.WorkflowWithParallelSideEffects, nil))
}

func (ts *IntegrationTestSuite) TestWorkflowWithLocalActivityStartToClose() {
ts.NoError(ts.executeWorkflow("test-wf-la-start-to-close", ts.workflows.WorkflowWithLocalActivityStartToCloseTimeout, nil))
}

func (ts *IntegrationTestSuite) TestWorkflowWithParallelSideEffectsUsingReplay() {
replayer := worker.NewWorkflowReplayer()
replayer.RegisterWorkflowWithOptions(ts.workflows.WorkflowWithParallelSideEffects, workflow.RegisterOptions{DisableAlreadyRegisteredCheck: true})
Expand Down
41 changes: 41 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1155,6 +1155,46 @@ func (w *Workflows) WorkflowWithParallelLongLocalActivityAndHeartbeat(ctx workfl
return nil
}

func (w *Workflows) WorkflowWithLocalActivityStartToCloseTimeout(ctx workflow.Context) error {
// Validate that local activities respect StartToCloseTimeout and retry correctly
ao := w.defaultLocalActivityOptions()
ao.ScheduleToCloseTimeout = 10 * time.Second
ao.StartToCloseTimeout = 1 * time.Second
ao.RetryPolicy = &temporal.RetryPolicy{
MaximumInterval: time.Second,
MaximumAttempts: 5,
}
ctx = workflow.WithLocalActivityOptions(ctx, ao)

var activities *Activities
future := workflow.ExecuteLocalActivity(ctx, activities.SleepN, 3*time.Second, 3)
var count int32
err := future.Get(ctx, &count)
if err != nil {
return err
}
if count != 3 {
return fmt.Errorf("expected 3, got %v", count)
}
// Validate the correct timeout error is returned
ao.StartToCloseTimeout = 1 * time.Second
ao.RetryPolicy = &temporal.RetryPolicy{
MaximumInterval: time.Second,
MaximumAttempts: 1,
}
ctx = workflow.WithLocalActivityOptions(ctx, ao)
future = workflow.ExecuteLocalActivity(ctx, activities.SleepN, 3*time.Second, 3)
err = future.Get(ctx, nil)
var timeoutErr *temporal.TimeoutError
if errors.As(err, &timeoutErr) {
if timeoutErr.TimeoutType() != enumspb.TIMEOUT_TYPE_START_TO_CLOSE {
return fmt.Errorf("expected start to close timeout, got %v", timeoutErr.TimeoutType())
}
return nil
}
return errors.New("expected timeout error")
}

func (w *Workflows) WorkflowWithLocalActivityRetries(ctx workflow.Context) error {
laOpts := w.defaultLocalActivityOptions()
laOpts.RetryPolicy = &internal.RetryPolicy{
Expand Down Expand Up @@ -2285,6 +2325,7 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.WorkflowWithLocalActivityStartWhenTimerCancel)
worker.RegisterWorkflow(w.WorkflowWithParallelSideEffects)
worker.RegisterWorkflow(w.WorkflowWithParallelMutableSideEffects)
worker.RegisterWorkflow(w.WorkflowWithLocalActivityStartToCloseTimeout)
worker.RegisterWorkflow(w.LocalActivityStaleCache)
worker.RegisterWorkflow(w.UpdateInfoWorkflow)
worker.RegisterWorkflow(w.SignalWorkflow)
Expand Down

0 comments on commit e305c8d

Please sign in to comment.