Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backoff failed workflow task #2548

Merged
merged 3 commits into from
Feb 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 73 additions & 11 deletions host/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,18 +585,16 @@ func (s *clientIntegrationSuite) Test_ActivityTimeouts() {
//s.printHistory(id, workflowRun.GetRunID())
}

// This test simulates a workflow trying to complete itself while there is a buffered event.
// Event sequence:
// The 1st WorkflowTask runs a local activity.
// While the local activity is running, a signal is received by the server.
// After the signal is received, the local activity completes, and the workflow drains the signal chan (no signal yet) and completes the workflow.
// The server fails the complete request because there is an unhandled signal.
// The server reschedules a new workflow task.
// The workflow runs the local activity again, drains the signal chan (with one signal), and completes the workflow.
// The server completes the workflow as requested.
func (s *clientIntegrationSuite) Test_UnhandledCommandAndNewTask() {
/*
Event sequence:
1st WorkflowTask runs a local activity.
While local activity is running, a signal is received by server.
After signal is received, local activity completed, and workflow drains signal chan (no signal yet) and complete workflow.
Server failed the complete request because there is unhandled signal.
Server rescheduled a new workflow task.
Workflow runs the local activity again and drain the signal chan (with one signal) and complete workflow.
Server complete workflow as requested.
*/

sigReadyToSendChan := make(chan struct{}, 1)
sigSendDoneChan := make(chan struct{})
localActivityFn := func(ctx context.Context) error {
Expand Down Expand Up @@ -694,6 +692,70 @@ func (s *clientIntegrationSuite) Test_UnhandledCommandAndNewTask() {
s.assertHistory(id, workflowRun.GetRunID(), expectedHistory)
}

// Test_InvalidCommandAttribute simulates a workflow that generates a command with invalid attributes
// (an activity scheduled without a StartToClose timeout).
// The server is expected to fail the workflow task and schedule a retry immediately after the first attempt;
// if the workflow task keeps failing, the server drops the task and waits for the task timeout before
// scheduling further retries. The SDK used to implement this backoff; it is now done on the server.
func (s *clientIntegrationSuite) Test_InvalidCommandAttribute() {
	activityFn := func(ctx context.Context) error {
		return nil
	}

	// attemptTimes records the moment each workflow task attempt invoked the workflow function.
	var attemptTimes []time.Time
	workflowFn := func(ctx workflow.Context) error {
		attemptTimes = append(attemptTimes, time.Now().UTC())
		// Deliberately empty options: the missing StartToClose timeout makes the activity command invalid.
		actCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{})

		return workflow.ExecuteActivity(actCtx, activityFn).Get(actCtx, nil)
	}

	s.worker.RegisterWorkflow(workflowFn)
	s.worker.RegisterActivity(activityFn)

	workflowID := "integration-test-invalid-command-attributes"
	// A 3s task timeout combined with a 5s run timeout should yield exactly 3 attempts:
	// the first attempt, an immediate retry, and a third attempt once the second one
	// has timed out after WorkflowTaskTimeout.
	startOptions := sdkclient.StartWorkflowOptions{
		ID:                  workflowID,
		TaskQueue:           s.taskQueue,
		WorkflowTaskTimeout: 3 * time.Second,
		WorkflowRunTimeout:  5 * time.Second,
	}

	runCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	run, err := s.sdkClient.ExecuteWorkflow(runCtx, startOptions, workflowFn)
	if err != nil {
		s.Logger.Fatal("Start workflow failed with err", tag.Error(err))
	}

	s.NotNil(run)
	s.True(run.GetRunID() != "")

	// Block until the workflow closes; it is expected to close by timing out.
	err = run.Get(runCtx, nil)
	s.Error(err)
	s.Contains(err.Error(), "timeout")

	// Check the recorded event sequence.
	wantEvents := []enumspb.EventType{
		enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED,
		enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED,
		enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED,
		enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED,
		enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT,
	}
	s.assertHistory(workflowID, run.GetRunID(), wantEvents)

	// The workflow function must have been invoked for exactly 3 workflow task attempts.
	s.Equal(3, len(attemptTimes))

	// The second attempt is the immediate retry.
	immediateGap := attemptTimes[1].Sub(attemptTimes[0])
	s.True(immediateGap < time.Second)

	// The third attempt only happens after WorkflowTaskTimeout has elapsed.
	delayedGap := attemptTimes[2].Sub(attemptTimes[1])
	s.True(delayedGap > 3*time.Second)
}

func (s *clientIntegrationSuite) Test_BufferedQuery() {
localActivityFn := func(ctx context.Context) error {
time.Sleep(5 * time.Second) // use local activity sleep to block workflow task to force query to be buffered
Expand Down
4 changes: 4 additions & 0 deletions service/history/workflowTaskHandlerCallbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,10 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted(
tag.WorkflowID(token.GetWorkflowId()),
tag.WorkflowRunID(token.GetRunId()),
tag.WorkflowNamespaceID(namespaceID.String()))
if currentWorkflowTask.Attempt > 1 {
// drop this workflow task if it keeps failing. This will cause the workflow task to timeout and get retried after timeout.
return nil, serviceerror.NewInvalidArgument(wtFailedCause.Message())
}
msBuilder, err = handler.historyEngine.failWorkflowTask(weContext, scheduleID, startedID, wtFailedCause, request)
if err != nil {
return nil, err
Expand Down