diff --git a/internal/internal_nexus_task_handler.go b/internal/internal_nexus_task_handler.go index 02ed585dc..55164b120 100644 --- a/internal/internal_nexus_task_handler.go +++ b/internal/internal_nexus_task_handler.go @@ -45,6 +45,11 @@ import ( "go.temporal.io/sdk/log" ) +// errNexusTaskTimeout is returned when the Nexus task handler times out. +// It is used instead of context.DeadlineExceeded to allow the poller to differentiate between Nexus task handler +// timeout and other errors. +var errNexusTaskTimeout = errors.New("nexus task timeout") + func nexusHandlerError(t nexus.HandlerErrorType, message string) *nexuspb.HandlerError { return &nexuspb.HandlerError{ ErrorType: string(t), @@ -211,7 +216,7 @@ func (h *nexusTaskHandler) handleStartOperation( opres, err = h.nexusHandler.StartOperation(ctx, req.GetService(), req.GetOperation(), input, startOptions) }() if ctx.Err() != nil { - return nil, nil, ctx.Err() + return nil, nil, errNexusTaskTimeout } if err != nil { var unsuccessfulOperationErr *nexus.UnsuccessfulOperationError @@ -302,7 +307,7 @@ func (h *nexusTaskHandler) handleCancelOperation(ctx context.Context, nctx *Nexu err = h.nexusHandler.CancelOperation(ctx, req.GetService(), req.GetOperation(), req.GetOperationId(), cancelOptions) }() if ctx.Err() != nil { - return nil, nil, ctx.Err() + return nil, nil, errNexusTaskTimeout } if err != nil { err = convertKnownErrors(err) diff --git a/internal/internal_nexus_task_poller.go b/internal/internal_nexus_task_poller.go index 2fcf15c3c..39e515c71 100644 --- a/internal/internal_nexus_task_poller.go +++ b/internal/internal_nexus_task_poller.go @@ -158,8 +158,14 @@ func (ntp *nexusTaskPoller) ProcessTask(task interface{}) error { // Failure from user handler. // Special case for the start response with operation error. if err != nil { + var failureTag string + if err == errNexusTaskTimeout { + failureTag = "timeout" + } else { + failureTag = "internal_sdk_error" + } metricsHandler. - WithTags(metrics.NexusTaskFailureTags("internal_sdk_error")). + WithTags(metrics.NexusTaskFailureTags(failureTag)). Counter(metrics.NexusTaskExecutionFailedCounter). Inc(1) } else if failure != nil { diff --git a/test/nexus_test.go b/test/nexus_test.go index 041a44bfd..bbe7c0bd1 100644 --- a/test/nexus_test.go +++ b/test/nexus_test.go @@ -194,6 +194,9 @@ var syncOp = temporalnexus.NewSyncOperation("sync-op", func(ctx context.Context, return "", temporal.NewApplicationErrorWithOptions("fake app error for test", "FakeTestError", temporal.ApplicationErrorOptions{ NonRetryable: true, }) + case "timeout": + <-ctx.Done() + return "", ctx.Err() case "panic": panic("panic requested") } @@ -351,6 +354,23 @@ func TestNexusSyncOperation(t *testing.T) { tc.requireFailureCounter(t, service.Name, syncOp.Name(), "handler_error_INTERNAL") }, time.Second*3, time.Millisecond*100) }) + + t.Run("timeout", func(t *testing.T) { + _, err := nexus.ExecuteOperation(ctx, nc, syncOp, "timeout", nexus.ExecuteOperationOptions{ + // Force shorter timeout to speed up the test and get a response back. + Header: nexus.Header{nexus.HeaderRequestTimeout: "300ms"}, + }) + var handlerErr *nexus.HandlerError + require.ErrorAs(t, err, &handlerErr) + require.Equal(t, nexus.HandlerErrorTypeUpstreamTimeout, handlerErr.Type) + + require.EventuallyWithT(t, func(t *assert.CollectT) { + // NOTE metrics.NexusTaskEndToEndLatency isn't recorded on timeouts. + tc.requireTimer(t, metrics.NexusTaskScheduleToStartLatency, service.Name, syncOp.Name()) + tc.requireTimer(t, metrics.NexusTaskExecutionLatency, service.Name, syncOp.Name()) + tc.requireFailureCounter(t, service.Name, syncOp.Name(), "timeout") + }, time.Second*3, time.Millisecond*100) + }) } func TestNexusWorkflowRunOperation(t *testing.T) {