Skip to content

Commit

Permalink
Building task execution context if not exists on fasttask CheckStatus (
Browse files Browse the repository at this point in the history
…#325)

## Overview
This PR fixes an issue that occurs after FlytePropeller restarts: the fasttask plugin checks the status of a task execution whose task execution context no longer exists. Rather than failing, the plugin now creates the task execution context (which will be cleaned up when the task completes) and returns running.

## Test Plan
This was tested locally under a variety of failure scenarios.

## Rollout Plan (if applicable)
This may be rolled out immediately.

## Upstream Changes
Should this change be upstreamed to OSS (flyteorg/flyte)? If not, please uncheck this box, which is used for auditing. Note, it is the responsibility of each developer to actually upstream their changes. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F).
- [ ] To be upstreamed to OSS

## Issue
https://linear.app/unionai/issue/COR-1128/fasttask-plugin-checks-status-of-task-execution-failure-on-restart

## Checklist
* [x] Added tests
* [ ] Ran a deploy dry run and shared the terraform plan
* [ ] Added logging and metrics
* [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list)
* [ ] Updated documentation
  • Loading branch information
hamersaw authored Jun 18, 2024
1 parent fc193fe commit db42532
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 4 deletions.
7 changes: 5 additions & 2 deletions fasttask/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ import (

const fastTaskType = "fast-task"

var statusUpdateNotFoundError = errors.New("StatusUpdateNotFound")
var (
statusUpdateNotFoundError = errors.New("StatusUpdateNotFound")
taskContextNotFoundError = errors.New("TaskContextNotFound")
)

type SubmissionPhase int

Expand Down Expand Up @@ -285,7 +288,7 @@ func (p *Plugin) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (co
phase, reason, err := p.fastTaskService.CheckStatus(ctx, taskID, fastTaskEnvironment.GetQueueId(), pluginState.WorkerID)

now := time.Now()
if err != nil && !errors.Is(err, statusUpdateNotFoundError) {
if err != nil && !errors.Is(err, statusUpdateNotFoundError) && !errors.Is(err, taskContextNotFoundError) {
return core.UnknownTransition, err
} else if errors.Is(err, statusUpdateNotFoundError) && now.Sub(pluginState.LastUpdated) > GetConfig().GracePeriodStatusNotFound.Duration {
// if task has not been updated within the grace period we should abort
Expand Down
2 changes: 1 addition & 1 deletion fasttask/plugin/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ func (f *fastTaskServiceImpl) CheckStatus(ctx context.Context, taskID, queueID,
// if this plugin restarts then TaskContexts may not exist for tasks that are still active. we can
// create a TaskContext here because we ensure it will be cleaned up when the task completes.
f.taskStatusChannels.Store(taskID, make(chan *workerTaskStatus, GetConfig().TaskStatusBufferSize))
return core.PhaseUndefined, "", fmt.Errorf("task context not found")
return core.PhaseUndefined, "", fmt.Errorf("task context not found: %w", taskContextNotFoundError)
}

taskStatusChannel := taskStatusChannelResult.(chan *workerTaskStatus)
Expand Down
2 changes: 1 addition & 1 deletion fasttask/plugin/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestCheckStatus(t *testing.T) {
workerID: "w1",
taskStatuses: nil,
expectedPhase: core.PhaseUndefined,
expectedError: fmt.Errorf("task context not found"),
expectedError: fmt.Errorf("task context not found: %w", taskContextNotFoundError),
},
{
name: "NoUpdates",
Expand Down

0 comments on commit db42532

Please sign in to comment.