Skip to content

Commit

Permalink
Reduce maptask transitions between WaitingForResources and CheckingSu…
Browse files Browse the repository at this point in the history
…btaskExecutions (#4790)

* treating PhaseWaitingForResources and PhaseCheckingSubTskExecutions as the same

Signed-off-by: Daniel Rammer <[email protected]>

* using PhaseCheckingSubTaskExecutions only

Signed-off-by: Daniel Rammer <[email protected]>

* fixed unit test

Signed-off-by: Daniel Rammer <[email protected]>

---------

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Jan 30, 2024
1 parent ffd3ab2 commit f8a1317
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 6 deletions.
6 changes: 3 additions & 3 deletions flyteplugins/go/tasks/plugins/array/k8s/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,16 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c
case arrayCore.PhasePreLaunch:
nextState = pluginState.SetPhase(arrayCore.PhaseLaunch, version+1).SetReason("Nothing to do in PreLaunch phase.")

case arrayCore.PhaseWaitingForResources:
fallthrough

case arrayCore.PhaseLaunch:
// In order to maintain backwards compatibility with the state transitions
// in the aws batch plugin. Forward to PhaseCheckingSubTasksExecutions where the launching
// is actually occurring.
nextState = pluginState.SetPhase(arrayCore.PhaseCheckingSubTaskExecutions, version+1).SetReason("Nothing to do in Launch phase.")
err = nil

case arrayCore.PhaseWaitingForResources:
fallthrough

case arrayCore.PhaseCheckingSubTaskExecutions:
nextState, externalResources, err = LaunchAndCheckSubTasksState(ctx, tCtx, e.kubeClient, pluginConfig,
tCtx.DataStore(), tCtx.OutputWriter().GetOutputPrefixPath(), tCtx.OutputWriter().GetRawOutputPrefix(), pluginState)
Expand Down
4 changes: 2 additions & 2 deletions flyteplugins/go/tasks/plugins/array/k8s/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon
}

_, version := currentState.GetPhase()
if phase == arrayCore.PhaseCheckingSubTaskExecutions {
if phase == arrayCore.PhaseCheckingSubTaskExecutions || phase == arrayCore.PhaseWaitingForResources {
newSubTaskPhaseHash, err := newState.GetArrayStatus().HashCode()
if err != nil {
return currentState, externalResources, err
Expand All @@ -316,7 +316,7 @@ func LaunchAndCheckSubTasksState(ctx context.Context, tCtx core.TaskExecutionCon
version++
}

newState = newState.SetPhase(phase, version).SetReason("Task is still running")
newState = newState.SetPhase(arrayCore.PhaseCheckingSubTaskExecutions, version).SetReason("Task is still running")
} else {
newState = newState.SetPhase(phase, version+1)
}
Expand Down
2 changes: 1 addition & 1 deletion flyteplugins/go/tasks/plugins/array/k8s/management_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ func TestCheckSubTasksState(t *testing.T) {
// validate results
assert.Nil(t, err)
p, _ := newState.GetPhase()
assert.Equal(t, arrayCore.PhaseWaitingForResources.String(), p.String())
assert.Equal(t, arrayCore.PhaseCheckingSubTaskExecutions.String(), p.String())
resourceManager.AssertNumberOfCalls(t, "AllocateResource", subtaskCount)
for _, subtaskPhaseIndex := range newState.GetArrayStatus().Detailed.GetItems() {
assert.Equal(t, core.PhaseWaitingForResources, core.Phases[subtaskPhaseIndex])
Expand Down

0 comments on commit f8a1317

Please sign in to comment.