diff --git a/host/client_integration_test.go b/host/client_integration_test.go index 6991ed0bbe6..d76f92decc1 100644 --- a/host/client_integration_test.go +++ b/host/client_integration_test.go @@ -585,18 +585,16 @@ func (s *clientIntegrationSuite) Test_ActivityTimeouts() { //s.printHistory(id, workflowRun.GetRunID()) } +// This test simulates workflow try to complete itself while there is buffered event. +// Event sequence: +// 1st WorkflowTask runs a local activity. +// While local activity is running, a signal is received by server. +// After signal is received, local activity completed, and workflow drains signal chan (no signal yet) and complete workflow. +// Server failed the complete request because there is unhandled signal. +// Server rescheduled a new workflow task. +// Workflow runs the local activity again and drain the signal chan (with one signal) and complete workflow. +// Server complete workflow as requested. func (s *clientIntegrationSuite) Test_UnhandledCommandAndNewTask() { - /* - Event sequence: - 1st WorkflowTask runs a local activity. - While local activity is running, a signal is received by server. - After signal is received, local activity completed, and workflow drains signal chan (no signal yet) and complete workflow. - Server failed the complete request because there is unhandled signal. - Server rescheduled a new workflow task. - Workflow runs the local activity again and drain the signal chan (with one signal) and complete workflow. - Server complete workflow as requested. - */ - sigReadyToSendChan := make(chan struct{}, 1) sigSendDoneChan := make(chan struct{}) localActivityFn := func(ctx context.Context) error { @@ -694,6 +692,70 @@ func (s *clientIntegrationSuite) Test_UnhandledCommandAndNewTask() { s.assertHistory(id, workflowRun.GetRunID(), expectedHistory) } +// This test simulates workflow generate command with invalid attributes. +// Server is expected to fail the workflow task and schedule a retry immediately for first attempt, +// but if workflow task keeps failing, server will drop the task and wait for timeout to schedule additional retries. +// This is the same behavior as the SDK used to do, but now we would do on server. +func (s *clientIntegrationSuite) Test_InvalidCommandAttribute() { + activityFn := func(ctx context.Context) error { + return nil + } + + var calledTime []time.Time + workflowFn := func(ctx workflow.Context) error { + calledTime = append(calledTime, time.Now().UTC()) + ao := workflow.ActivityOptions{} // invalid activity option without StartToClose timeout + ctx = workflow.WithActivityOptions(ctx, ao) + + err := workflow.ExecuteActivity(ctx, activityFn).Get(ctx, nil) + return err + } + + s.worker.RegisterWorkflow(workflowFn) + s.worker.RegisterActivity(activityFn) + + id := "integration-test-invalid-command-attributes" + workflowOptions := sdkclient.StartWorkflowOptions{ + ID: id, + TaskQueue: s.taskQueue, + // With 3s TaskTimeout and 5s RunTimeout, we expect to see total of 3 attempts. + // First attempt follow by immediate retry follow by timeout and 3rd attempt after WorkflowTaskTimeout. + WorkflowTaskTimeout: 3 * time.Second, + WorkflowRunTimeout: 5 * time.Second, + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + workflowRun, err := s.sdkClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn) + if err != nil { + s.Logger.Fatal("Start workflow failed with err", tag.Error(err)) + } + + s.NotNil(workflowRun) + s.True(workflowRun.GetRunID() != "") + + // wait until workflow close (it will be timeout) + err = workflowRun.Get(ctx, nil) + s.Error(err) + s.Contains(err.Error(), "timeout") + + // verify event sequence + expectedHistory := []enumspb.EventType{ + enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_STARTED, + enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED, + enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED, + enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED, + enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT, + } + s.assertHistory(id, workflowRun.GetRunID(), expectedHistory) + + // assert workflow task retried 3 times + s.Equal(3, len(calledTime)) + + s.True(calledTime[1].Sub(calledTime[0]) < time.Second) // retry immediately + s.True(calledTime[2].Sub(calledTime[1]) > time.Second*3) // retry after WorkflowTaskTimeout +} + func (s *clientIntegrationSuite) Test_BufferedQuery() { localActivityFn := func(ctx context.Context) error { time.Sleep(5 * time.Second) // use local activity sleep to block workflow task to force query to be buffered diff --git a/service/history/workflowTaskHandlerCallbacks.go b/service/history/workflowTaskHandlerCallbacks.go index 049c3a91eaa..5a42224f081 100644 --- a/service/history/workflowTaskHandlerCallbacks.go +++ b/service/history/workflowTaskHandlerCallbacks.go @@ -490,6 +490,10 @@ func (handler *workflowTaskHandlerCallbacksImpl) handleWorkflowTaskCompleted( tag.WorkflowID(token.GetWorkflowId()), tag.WorkflowRunID(token.GetRunId()), tag.WorkflowNamespaceID(namespaceID.String())) + if currentWorkflowTask.Attempt > 1 { + // drop this workflow task if it keeps failing. This will cause the workflow task to timeout and get retried after timeout. + return nil, serviceerror.NewInvalidArgument(wtFailedCause.Message()) + } msBuilder, err = handler.historyEngine.failWorkflowTask(weContext, scheduleID, startedID, wtFailedCause, request) if err != nil { return nil, err