diff --git a/fasttask/plugin/plugin.go b/fasttask/plugin/plugin.go index a8689545cc..d33ec1b629 100644 --- a/fasttask/plugin/plugin.go +++ b/fasttask/plugin/plugin.go @@ -31,7 +31,10 @@ import ( const fastTaskType = "fast-task" -var statusUpdateNotFoundError = errors.New("StatusUpdateNotFound") +var ( + statusUpdateNotFoundError = errors.New("StatusUpdateNotFound") + taskContextNotFoundError = errors.New("TaskContextNotFound") +) type SubmissionPhase int @@ -285,7 +288,7 @@ func (p *Plugin) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (co phase, reason, err := p.fastTaskService.CheckStatus(ctx, taskID, fastTaskEnvironment.GetQueueId(), pluginState.WorkerID) now := time.Now() - if err != nil && !errors.Is(err, statusUpdateNotFoundError) { + if err != nil && !errors.Is(err, statusUpdateNotFoundError) && !errors.Is(err, taskContextNotFoundError) { return core.UnknownTransition, err } else if errors.Is(err, statusUpdateNotFoundError) && now.Sub(pluginState.LastUpdated) > GetConfig().GracePeriodStatusNotFound.Duration { // if task has not been updated within the grace period we should abort diff --git a/fasttask/plugin/service.go b/fasttask/plugin/service.go index 55dd46c756..2f8003f45f 100644 --- a/fasttask/plugin/service.go +++ b/fasttask/plugin/service.go @@ -348,7 +348,7 @@ func (f *fastTaskServiceImpl) CheckStatus(ctx context.Context, taskID, queueID, // if this plugin restarts then TaskContexts may not exist for tasks that are still active. we can // create a TaskContext here because we ensure it will be cleaned up when the task completes. f.taskStatusChannels.Store(taskID, make(chan *workerTaskStatus, GetConfig().TaskStatusBufferSize)) - return core.PhaseUndefined, "", fmt.Errorf("task context not found") + return core.PhaseUndefined, "", fmt.Errorf("task context not found: %w", taskContextNotFoundError) } taskStatusChannel := taskStatusChannelResult.(chan *workerTaskStatus) diff --git a/fasttask/plugin/service_test.go b/fasttask/plugin/service_test.go index 2d63f0591e..612b93e4aa 100644 --- a/fasttask/plugin/service_test.go +++ b/fasttask/plugin/service_test.go @@ -32,7 +32,7 @@ func TestCheckStatus(t *testing.T) { workerID: "w1", taskStatuses: nil, expectedPhase: core.PhaseUndefined, - expectedError: fmt.Errorf("task context not found"), + expectedError: fmt.Errorf("task context not found: %w", taskContextNotFoundError), }, { name: "NoUpdates",