-
Notifications
You must be signed in to change notification settings - Fork 215
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix LA start to close timeout #1180
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -155,11 +155,13 @@ type ( | |
// LocalActivityOptions stores local activity specific parameters that will be stored inside of a context. | ||
LocalActivityOptions struct { | ||
// ScheduleToCloseTimeout - The end to end timeout for the local activity including retries. | ||
// This field is required. | ||
// At least one of ScheduleToCloseTimeout or StartToCloseTimeout is required. | ||
// defaults to StartToCloseTimeout if not set. | ||
ScheduleToCloseTimeout time.Duration | ||
|
||
// StartToCloseTimeout - The timeout for a single execution of the local activity. | ||
// Optional: defaults to ScheduleToClose | ||
// At least one of ScheduleToCloseTimeout or StartToCloseTimeout is required. | ||
// defaults to ScheduleToCloseTimeout if not set. | ||
StartToCloseTimeout time.Duration | ||
|
||
// RetryPolicy specify how to retry activity if error happens. | ||
|
@@ -253,25 +255,12 @@ func WithActivityTask( | |
contextPropagators []ContextPropagator, | ||
interceptors []WorkerInterceptor, | ||
) (context.Context, error) { | ||
var deadline time.Time | ||
scheduled := common.TimeValue(task.GetScheduledTime()) | ||
started := common.TimeValue(task.GetStartedTime()) | ||
scheduleToCloseTimeout := common.DurationValue(task.GetScheduleToCloseTimeout()) | ||
startToCloseTimeout := common.DurationValue(task.GetStartToCloseTimeout()) | ||
heartbeatTimeout := common.DurationValue(task.GetHeartbeatTimeout()) | ||
|
||
startToCloseDeadline := started.Add(startToCloseTimeout) | ||
if scheduleToCloseTimeout > 0 { | ||
scheduleToCloseDeadline := scheduled.Add(scheduleToCloseTimeout) | ||
// Minimum of the two deadlines. | ||
if scheduleToCloseDeadline.Before(startToCloseDeadline) { | ||
deadline = scheduleToCloseDeadline | ||
} else { | ||
deadline = startToCloseDeadline | ||
} | ||
} else { | ||
deadline = startToCloseDeadline | ||
} | ||
deadline := calculateActivityDeadline(scheduled, started, scheduleToCloseTimeout, startToCloseTimeout) | ||
|
||
logger = log.With(logger, | ||
tagActivityID, task.ActivityId, | ||
|
@@ -332,6 +321,21 @@ func WithLocalActivityTask( | |
tagWorkflowID, task.params.WorkflowInfo.WorkflowExecution.ID, | ||
tagRunID, task.params.WorkflowInfo.WorkflowExecution.RunID, | ||
) | ||
startedTime := time.Now() | ||
scheduleToCloseTimeout := task.params.ScheduleToCloseTimeout | ||
startToCloseTimeout := task.params.StartToCloseTimeout | ||
|
||
if startToCloseTimeout == 0 { | ||
startToCloseTimeout = scheduleToCloseTimeout | ||
} | ||
if scheduleToCloseTimeout == 0 { | ||
scheduleToCloseTimeout = startToCloseTimeout | ||
} | ||
deadline := calculateActivityDeadline(task.scheduledTime, startedTime, scheduleToCloseTimeout, startToCloseTimeout) | ||
if task.attempt > 1 && !task.expireTime.IsZero() && task.expireTime.Before(deadline) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It is still an open question whether we should remove this, or whether we can remove it at all. This … There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. While it is wrong, let's leave it for now. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yeah, I agree; I just wanted to call this out, since when we put Go on Core this will be something we have to deal with. |
||
// this is attempt and expire time is before SCHEDULE_TO_CLOSE timeout | ||
deadline = task.expireTime | ||
} | ||
return newActivityContext(ctx, interceptors, &activityEnvironment{ | ||
workflowType: &workflowTypeLocal, | ||
workflowNamespace: task.params.WorkflowInfo.Namespace, | ||
|
@@ -342,6 +346,9 @@ func WithLocalActivityTask( | |
logger: logger, | ||
metricsHandler: metricsHandler, | ||
isLocalActivity: true, | ||
deadline: deadline, | ||
scheduledTime: task.scheduledTime, | ||
startedTime: startedTime, | ||
dataConverter: dataConverter, | ||
attempt: task.attempt, | ||
}) | ||
|
@@ -374,3 +381,15 @@ func newActivityContext( | |
|
||
return ctx, nil | ||
} | ||
|
||
// calculateActivityDeadline returns the instant at which an activity attempt
// must be finished.
//
// The start-to-close deadline is started + startToCloseTimeout. When
// scheduleToCloseTimeout is positive, the schedule-to-close deadline is
// scheduled + scheduleToCloseTimeout and the earlier of the two deadlines is
// returned. A non-positive scheduleToCloseTimeout is ignored, so only the
// start-to-close deadline applies.
func calculateActivityDeadline(scheduled, started time.Time, scheduleToCloseTimeout, startToCloseTimeout time.Duration) time.Time {
	deadline := started.Add(startToCloseTimeout)
	if scheduleToCloseTimeout <= 0 {
		// No schedule-to-close bound: the attempt is limited by start-to-close only.
		return deadline
	}
	// Tighten the deadline only when schedule-to-close expires strictly earlier.
	if overall := scheduled.Add(scheduleToCloseTimeout); overall.Before(deadline) {
		deadline = overall
	}
	return deadline
}
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -578,18 +578,8 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi | |
return &localActivityResult{task: task, err: err} | ||
} | ||
|
||
timeout := task.params.ScheduleToCloseTimeout | ||
if task.params.StartToCloseTimeout != 0 && task.params.StartToCloseTimeout < timeout { | ||
timeout = task.params.StartToCloseTimeout | ||
} | ||
timeoutDuration := timeout | ||
deadline := time.Now().Add(timeoutDuration) | ||
if task.attempt > 1 && !task.expireTime.IsZero() && task.expireTime.Before(deadline) { | ||
// this is attempt and expire time is before SCHEDULE_TO_CLOSE timeout | ||
deadline = task.expireTime | ||
} | ||
Comment on lines
-581
to
-590
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I don't see anything obvious, but can you think of any ways in which the behavior has changed with your new logic? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The only change here is using the scheduled time + … There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. So if today I have a schedule-to-close timeout of 5s, my second attempt would get the full 5s (even though that's improper)? But after this change, it will only get however much time remains, counted from the beginning of the first attempt? Sorry, I didn't dig into how … There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Today you would have no second attempt, because a schedule-to-close timeout is not retried. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Confirmed off-PR that a second attempt after an immediate first-attempt failure is bounded today by … |
||
|
||
ctx, cancel := context.WithDeadline(ctx, deadline) | ||
info := getActivityEnv(ctx) | ||
ctx, cancel := context.WithDeadline(ctx, info.deadline) | ||
defer cancel() | ||
|
||
task.Lock() | ||
|
@@ -631,13 +621,14 @@ func (lath *localActivityTaskHandler) executeLocalActivityTask(task *localActivi | |
laResult, err = ae.ExecuteWithActualArgs(ctx, task.params.InputArgs) | ||
executionLatency := time.Since(laStartTime) | ||
metricsHandler.Timer(metrics.LocalActivityExecutionLatency).Record(executionLatency) | ||
if executionLatency > timeoutDuration { | ||
if time.Now().After(info.deadline) { | ||
// If local activity takes longer than expected timeout, the context would already be DeadlineExceeded and | ||
// the result would be discarded. Print a warning in this case. | ||
lath.logger.Warn("LocalActivity takes too long to complete.", | ||
"LocalActivityID", task.activityID, | ||
"LocalActivityType", activityType, | ||
"ScheduleToCloseTimeout", task.params.ScheduleToCloseTimeout, | ||
"StartToCloseTimeout", task.params.StartToCloseTimeout, | ||
"ActualExecutionDuration", executionLatency) | ||
} | ||
}(doneCh) | ||
|
@@ -658,7 +649,11 @@ WaitResult: | |
metricsHandler.Counter(metrics.LocalActivityExecutionCanceledCounter).Inc(1) | ||
return &localActivityResult{err: ErrCanceled, task: task} | ||
} else if ctx.Err() == context.DeadlineExceeded { | ||
return &localActivityResult{err: ErrDeadlineExceeded, task: task} | ||
if task.params.ScheduleToCloseTimeout != 0 && time.Now().After(info.scheduledTime.Add(task.params.ScheduleToCloseTimeout)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wish there was a better way to know which timeout was hit instead of having to do math here. |
||
return &localActivityResult{err: ErrDeadlineExceeded, task: task} | ||
} else { | ||
return &localActivityResult{err: NewTimeoutError("deadline exceeded", enumspb.TIMEOUT_TYPE_START_TO_CLOSE, nil), task: task} | ||
} | ||
} else { | ||
// should not happen | ||
return &localActivityResult{err: NewApplicationError("unexpected context done", "", true, nil), task: task} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default for `ScheduleToCloseTimeout` for normal activities is infinity. We could be consistent here, but that might be a breaking change.