This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Enable Resource Manager for k8s array plugin #71

Closed
wants to merge 29 commits

29 commits
d4efa26
add resource manager
migueltol22 Mar 11, 2020
74523da
launchandmonitor func
migueltol22 Mar 20, 2020
e2c3aae
check allocation status
migueltol22 Mar 20, 2020
e12e287
fmt
migueltol22 Mar 20, 2020
e7c226d
fix build
migueltol22 Mar 20, 2020
8ed48d4
errors2
migueltol22 Mar 20, 2020
6090a7b
update state machine
migueltol22 Mar 23, 2020
3bd1ef4
upd state machine
migueltol22 Mar 24, 2020
b913806
remove phaseLaunchAndMonitor state
migueltol22 Mar 24, 2020
61f4751
generate
migueltol22 Mar 26, 2020
d39b457
initialize array if empty
migueltol22 Mar 27, 2020
78ee66d
merge conflicts
migueltol22 Mar 27, 2020
e371f27
create resource constraint spec
migueltol22 Mar 30, 2020
f2a16bf
add json doc for pflag
migueltol22 Mar 30, 2020
ae4f146
use primary label
migueltol22 Apr 1, 2020
286425c
upd config and task launch
migueltol22 Apr 3, 2020
9cd4e4e
switch to enum return status
migueltol22 Apr 6, 2020
1a5c62c
impl Monitor and Finalize in task
migueltol22 Apr 6, 2020
a72846d
use task in finalize
migueltol22 Apr 6, 2020
94f695d
upd comment
migueltol22 Apr 6, 2020
98409ba
upd Skip to Waiting
migueltol22 Apr 6, 2020
08b75e7
lint and err
migueltol22 Apr 6, 2020
ba2a74d
upd based on pr feedback
migueltol22 Apr 14, 2020
dd5db90
upd allocate to return status, throw error on no containers
migueltol22 Apr 15, 2020
438b0fd
merge with master and remove launchSubTasks
migueltol22 Apr 15, 2020
edd233e
lint
migueltol22 Apr 15, 2020
5f5d1a9
upd pflag, launch and monitor status
migueltol22 Apr 16, 2020
ad4ce20
abort then finalize
migueltol22 Apr 24, 2020
050e8b2
rm unnecessary code & update state transition
migueltol22 Apr 27, 2020
2 changes: 1 addition & 1 deletion boilerplate/lyft/golang_support_tools/tools.go
@@ -3,8 +3,8 @@
package tools

import (
_ "github.com/alvaroloes/enumer"
_ "github.com/golangci/golangci-lint/cmd/golangci-lint"
_ "github.com/lyft/flytestdlib/cli/pflags"
_ "github.com/vektra/mockery/cmd/mockery"
_ "github.com/alvaroloes/enumer"
)
9 changes: 6 additions & 3 deletions go/tasks/pluginmachinery/core/mocks/resource_negotiator.go

Generated file; diff not rendered.

4 changes: 4 additions & 0 deletions go/tasks/pluginmachinery/core/phase.go
@@ -60,6 +60,10 @@ func (p Phase) IsSuccess() bool {
return p == PhaseSuccess
}

func (p Phase) IsWaitingForResources() bool {
return p == PhaseWaitingForResources
}

type TaskInfo struct {
// log information for the task execution
Logs []*core.TaskLog
8 changes: 8 additions & 0 deletions go/tasks/plugins/array/core/state.go
@@ -226,6 +226,7 @@ func SummaryToPhase(ctx context.Context, minSuccesses int64, summary arraystatus
totalSuccesses := int64(0)
totalFailures := int64(0)
totalRunning := int64(0)
totalWaitingForResources := int64(0)
for phase, count := range summary {
totalCount += count
if phase.IsTerminal() {
@@ -238,11 +239,18 @@ func SummaryToPhase(ctx context.Context, minSuccesses int64, summary arraystatus
// TODO: preferable to auto-combine to array tasks for now.
totalFailures += count
}
} else if phase.IsWaitingForResources() {
totalWaitingForResources += count
} else {
totalRunning += count
}
}

if totalWaitingForResources > 0 {
logger.Infof(ctx, "Array is still running and waiting for resources totalWaitingForResources[%v]", totalWaitingForResources)
return PhaseWaitingForResources
}

if totalCount < minSuccesses {
logger.Infof(ctx, "Array failed because totalCount[%v] < minSuccesses[%v]", totalCount, minSuccesses)
return PhaseWriteToDiscoveryThenFail
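For readers skimming the hunk above, a self-contained sketch of the new counting rule: a single subtask in the waiting-for-resources phase keeps the whole array in PhaseWaitingForResources, ahead of the existing success/failure checks. The phase enum and summary map below are simplified stand-ins, not the plugin's own types.

package main

import "fmt"

// Simplified stand-ins for the plugin's phase enum and arraystatus summary map.
type subTaskPhase int

const (
	subTaskRunning subTaskPhase = iota
	subTaskWaitingForResources
	subTaskSuccess
)

// summaryToPhase mirrors the shape of the hunk above: if any subtask is still
// waiting for resources, the whole array reports "waiting" before the
// success/failure accounting runs. (Terminal-phase handling is elided here.)
func summaryToPhase(summary map[subTaskPhase]int64) string {
	var totalWaitingForResources int64
	for phase, count := range summary {
		if phase == subTaskWaitingForResources {
			totalWaitingForResources += count
		}
	}
	if totalWaitingForResources > 0 {
		return "PhaseWaitingForResources"
	}
	return "PhaseCheckingSubTaskExecutions"
}

func main() {
	// One subtask blocked on quota is enough to keep the array waiting.
	fmt.Println(summaryToPhase(map[subTaskPhase]int64{subTaskSuccess: 9, subTaskWaitingForResources: 1}))
}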
10 changes: 10 additions & 0 deletions go/tasks/plugins/array/k8s/config.go
@@ -34,11 +34,17 @@ var (
configSection = config.MustRegisterSection(configSectionKey, defaultConfig)
)

type ResourceConfig struct {
PrimaryLabel string `json:"primaryLabel" pflag:",PrimaryLabel of a given service cluster"`
Limit int `json:"limit" pflag:",Resource quota (in the number of outstanding requests) for the cluster"`
}

// Defines custom config for K8s Array plugin
type Config struct {
DefaultScheduler string `json:"scheduler" pflag:",Decides the scheduler to use when launching array-pods."`
MaxErrorStringLength int `json:"maxErrLength" pflag:",Determines the maximum length of the error string returned for the array."`
MaxArrayJobSize int64 `json:"maxArrayJobSize" pflag:",Maximum size of array job."`
ResourceConfig ResourceConfig `json:"resourceConfig" pflag:",ResourceConfiguration to limit number of resources used by k8s-array."`
NodeSelector map[string]string `json:"node-selector" pflag:"-,Defines a set of node selector labels to add to the pod."`
Tolerations []v1.Toleration `json:"tolerations" pflag:"-,Tolerations to be applied for k8s-array pods"`
OutputAssembler workqueue.Config
@@ -48,3 +54,7 @@ type Config struct {
func GetConfig() *Config {
return configSection.GetConfig().(*Config)
}

func IsResourceConfigSet() bool {
return GetConfig().ResourceConfig != (ResourceConfig{})
}
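A quick illustration of how IsResourceConfigSet behaves, relying on the struct being comparable: quota registration only turns on when the resourceConfig section is non-empty. This is a standalone sketch; only the field names come from the diff above.

package main

import "fmt"

// ResourceConfig mirrors the fields added in config.go; the standalone
// program around it is illustrative only.
type ResourceConfig struct {
	PrimaryLabel string
	Limit        int
}

// isResourceConfigSet uses the same zero-value comparison as
// IsResourceConfigSet above: an empty config disables quota registration.
func isResourceConfigSet(rc ResourceConfig) bool {
	return rc != (ResourceConfig{})
}

func main() {
	fmt.Println(isResourceConfigSet(ResourceConfig{}))                                  // false: skip RegisterResourceQuota
	fmt.Println(isResourceConfigSet(ResourceConfig{PrimaryLabel: "token", Limit: 100})) // true: register the quota
}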
2 changes: 2 additions & 0 deletions go/tasks/plugins/array/k8s/config_flags.go

Generated file; diff not rendered.

44 changes: 44 additions & 0 deletions go/tasks/plugins/array/k8s/config_flags_test.go

Generated file; diff not rendered.

23 changes: 18 additions & 5 deletions go/tasks/plugins/array/k8s/executor.go
@@ -7,6 +7,7 @@ import (

"github.com/lyft/flyteplugins/go/tasks/plugins/array"
arrayCore "github.com/lyft/flyteplugins/go/tasks/plugins/array/core"
"github.com/lyft/flytestdlib/logger"
"github.com/lyft/flytestdlib/promutils"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery"
@@ -77,11 +78,15 @@ func (e Executor) Handle(ctx context.Context, tCtx core.TaskExecutionContext) (c
fallthrough

case arrayCore.PhaseLaunch:
nextState, err = LaunchSubTasks(ctx, tCtx, e.kubeClient, pluginConfig, pluginState)
// To maintain backwards compatibility with the state transitions in the AWS Batch plugin,
// forward to PhaseCheckingSubTaskExecutions, where the launching actually occurs.
nextState = pluginState.SetPhase(arrayCore.PhaseCheckingSubTaskExecutions, core.DefaultPhaseVersion).SetReason("Nothing to do in Launch phase.")
err = nil

case arrayCore.PhaseCheckingSubTaskExecutions:
nextState, logLinks, err = CheckSubTasksState(ctx, tCtx, e.kubeClient, tCtx.DataStore(),
tCtx.OutputWriter().GetOutputPrefixPath(), tCtx.OutputWriter().GetRawOutputPrefix(), pluginState)
nextState, logLinks, err = LaunchAndCheckSubTasksState(ctx, tCtx, e.kubeClient, pluginConfig,
tCtx.DataStore(), tCtx.OutputWriter().GetOutputPrefixPath(), tCtx.OutputWriter().GetRawOutputPrefix(), pluginState)

case arrayCore.PhaseAssembleFinalOutput:
nextState, err = array.AssembleFinalOutputs(ctx, e.outputsAssembler, tCtx, arrayCore.PhaseSuccess, pluginState)
@@ -128,8 +133,7 @@ func (e Executor) Finalize(ctx context.Context, tCtx core.TaskExecutionContext)
return errors.Wrapf(errors.CorruptedPluginState, err, "Failed to read unmarshal custom state")
}

return TerminateSubTasks(ctx, tCtx.TaskExecutionMetadata(), e.kubeClient, pluginConfig.MaxErrorStringLength,
pluginState)
return TerminateSubTasks(ctx, tCtx, e.kubeClient, pluginConfig, pluginState)
}

func (e Executor) Start(ctx context.Context) error {
@@ -164,5 +168,14 @@ func GetNewExecutorPlugin(ctx context.Context, iCtx core.SetupContext) (core.Plu
return nil, err
}

if IsResourceConfigSet() {
primaryLabel := GetConfig().ResourceConfig.PrimaryLabel
limit := GetConfig().ResourceConfig.Limit
if err := iCtx.ResourceRegistrar().RegisterResourceQuota(ctx, core.ResourceNamespace(primaryLabel), limit); err != nil {
Review comments:

Contributor: @EngHabu regarding the resource name: we discussed earlier that we might need to add Flyte cluster information to the resource name as a preparation step for moving toward a centralized Redis. Do you think the change should be done in this PR or a separate one? cc @migueltol22, just FYI.

Contributor: I might have been mistaken. Maybe there is no way to get the cluster name; we might have to do something in our deployment for that, and it might not be so trivial. Let me ask on the channel.

Contributor: Alright, found out we already pass the cluster name to the Spark deployment, so we can do the same for flytepropeller to make it aware of its cluster name.

Contributor: This is great! We should then be able to move to ElastiCache without a problem.

logger.Errorf(ctx, "Token Resource registration for [%v] failed due to error [%v]", primaryLabel, err)
return nil, err
}
}

return exec, nil
}
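The review thread above proposes adding the Flyte cluster name to the resource name before moving to a centralized Redis/ElastiCache. Nothing in this PR does that yet; the sketch below only illustrates one way the namespace string could be scoped, with clusterName as a hypothetical new input rather than an existing config field.

package main

import "fmt"

// Hypothetical helper: scope the resource namespace by cluster so that two
// clusters sharing one Redis do not count against the same "token" quota.
// Neither clusterName nor this function exists in the PR.
func clusterScopedNamespace(clusterName, primaryLabel string) string {
	if clusterName == "" {
		// Current behavior in the diff: the primary label alone.
		return primaryLabel
	}
	return clusterName + "/" + primaryLabel
}

func main() {
	fmt.Println(clusterScopedNamespace("", "token"))          // token
	fmt.Println(clusterScopedNamespace("cluster-a", "token")) // cluster-a/token
}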
122 changes: 15 additions & 107 deletions go/tasks/plugins/array/k8s/launcher.go
@@ -3,24 +3,14 @@ package k8s
import (
"context"
"fmt"
"strconv"
"strings"

"github.com/lyft/flyteplugins/go/tasks/plugins/array/errorcollector"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

arrayCore "github.com/lyft/flyteplugins/go/tasks/plugins/array/core"

arraystatus2 "github.com/lyft/flyteplugins/go/tasks/plugins/array/arraystatus"
errors2 "github.com/lyft/flytestdlib/errors"

"github.com/lyft/flytestdlib/logger"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/utils"

corev1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
)
@@ -31,6 +21,7 @@ const (
ErrSubmitJob errors2.ErrorCode = "SUBMIT_JOB_FAILED"
JobIndexVarName string = "BATCH_JOB_ARRAY_INDEX_VAR_NAME"
FlyteK8sArrayIndexVarName string = "FLYTE_K8S_ARRAY_INDEX"
ResourcesPrimaryLabel string = "token"
)

var arrayJobEnvVars = []corev1.EnvVar{
@@ -68,113 +59,30 @@ func applyPodTolerations(_ context.Context, cfg *Config, pod *corev1.Pod) *corev
return pod
}

// Launches subtasks
func LaunchSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, kubeClient core.KubeClient,
config *Config, currentState *arrayCore.State) (newState *arrayCore.State, err error) {

if int64(currentState.GetExecutionArraySize()) > config.MaxArrayJobSize {
ee := fmt.Errorf("array size > max allowed. Requested [%v]. Allowed [%v]", currentState.GetExecutionArraySize(), config.MaxArrayJobSize)
logger.Info(ctx, ee)
currentState = currentState.SetPhase(arrayCore.PhasePermanentFailure, 0).SetReason(ee.Error())
return currentState, nil
}

podTemplate, _, err := FlyteArrayJobToK8sPodTemplate(ctx, tCtx)
if err != nil {
return currentState, errors2.Wrapf(ErrBuildPodTemplate, err, "Failed to convert task template to a pod template for task")
}

var args []string
if len(podTemplate.Spec.Containers) > 0 {
args = append(podTemplate.Spec.Containers[0].Command, podTemplate.Spec.Containers[0].Args...)
podTemplate.Spec.Containers[0].Command = []string{}
}

size := currentState.GetExecutionArraySize()
// TODO: Respect parallelism param
for i := 0; i < size; i++ {
pod := podTemplate.DeepCopy()
indexStr := strconv.Itoa(i)
pod.Name = formatSubTaskName(ctx, tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), indexStr)
pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env, corev1.EnvVar{
Name: FlyteK8sArrayIndexVarName,
Value: indexStr,
})

pod.Spec.Containers[0].Env = append(pod.Spec.Containers[0].Env, arrayJobEnvVars...)

pod.Spec.Containers[0].Args, err = utils.ReplaceTemplateCommandArgs(ctx, args, arrayJobInputReader{tCtx.InputReader()}, tCtx.OutputWriter())
if err != nil {
return currentState, errors2.Wrapf(ErrReplaceCmdTemplate, err, "Failed to replace cmd args")
}

pod = ApplyPodPolicies(ctx, config, pod)
pod = applyNodeSelectorLabels(ctx, config, pod)
pod = applyPodTolerations(ctx, config, pod)

err = kubeClient.GetClient().Create(ctx, pod)
if err != nil && !k8serrors.IsAlreadyExists(err) {
if k8serrors.IsForbidden(err) {
if strings.Contains(err.Error(), "exceeded quota") {
// TODO: Quota errors are retried forever, it would be good to have support for backoff strategy.
logger.Infof(ctx, "Failed to launch job, resource quota exceeded. Err: %v", err)
currentState = currentState.SetPhase(arrayCore.PhaseWaitingForResources, 0).SetReason("Not enough resources to launch job.")
} else {
currentState = currentState.SetPhase(arrayCore.PhaseRetryableFailure, 0).SetReason("Failed to launch job.")
}

currentState = currentState.SetReason(err.Error())
return currentState, nil
}

return currentState, errors2.Wrapf(ErrSubmitJob, err, "Failed to submit job")
}
}

logger.Infof(ctx, "Successfully submitted Job(s) with Prefix:[%v], Count:[%v]", tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), size)

arrayStatus := arraystatus2.ArrayStatus{
Summary: arraystatus2.ArraySummary{},
Detailed: arrayCore.NewPhasesCompactArray(uint(size)),
}

currentState.SetPhase(arrayCore.PhaseCheckingSubTaskExecutions, 0).SetReason("Job launched.")
currentState.SetArrayStatus(arrayStatus)

return currentState, nil
}

func TerminateSubTasks(ctx context.Context, tMeta core.TaskExecutionMetadata, kubeClient core.KubeClient,
errsMaxLength int, currentState *arrayCore.State) error {
func TerminateSubTasks(ctx context.Context, tCtx core.TaskExecutionContext, kubeClient core.KubeClient, config *Config,
currentState *arrayCore.State) error {

size := currentState.GetExecutionArraySize()
errs := errorcollector.NewErrorMessageCollector()
for i := 0; i < size; i++ {
indexStr := strconv.Itoa(i)
podName := formatSubTaskName(ctx, tMeta.GetTaskExecutionID().GetGeneratedName(), indexStr)
pod := &corev1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: PodKind,
APIVersion: metav1.SchemeGroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: tMeta.GetNamespace(),
},
for childIdx := 0; childIdx < size; childIdx++ {
task := Task{
ChildIdx: childIdx,
Config: config,
State: currentState,
}

err := kubeClient.GetClient().Delete(ctx, pod)
err := task.Abort(ctx, tCtx, kubeClient)
if err != nil {
if k8serrors.IsNotFound(err) {
continue
}

errs.Collect(i, err.Error())
errs.Collect(childIdx, err.Error())
}
err = task.Finalize(ctx, tCtx, kubeClient)
if err != nil {
errs.Collect(childIdx, err.Error())
}
}

if errs.Length() > 0 {
return fmt.Errorf(errs.Summary(errsMaxLength))
return fmt.Errorf(errs.Summary(config.MaxErrorStringLength))
}

return nil
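To make the new TerminateSubTasks flow easier to follow: each child index is aborted and then finalized, and errors are collected per index rather than short-circuiting, so every subtask gets both calls before the summary is returned. The task type and error collector below are simplified stand-ins for the plugin's own.

package main

import (
	"errors"
	"fmt"
	"strings"
)

// subTask is a simplified stand-in for the plugin's Task struct.
type subTask struct{ childIdx int }

func (t subTask) Abort() error { return nil }

func (t subTask) Finalize() error {
	if t.childIdx == 1 {
		return errors.New("finalize failed")
	}
	return nil
}

// terminateSubTasks mirrors the loop structure in the diff: abort, then
// finalize, collecting errors by child index and summarizing at the end.
func terminateSubTasks(size int) error {
	var collected []string
	for childIdx := 0; childIdx < size; childIdx++ {
		task := subTask{childIdx: childIdx}
		if err := task.Abort(); err != nil {
			collected = append(collected, fmt.Sprintf("[%d] %v", childIdx, err))
		}
		if err := task.Finalize(); err != nil {
			collected = append(collected, fmt.Sprintf("[%d] %v", childIdx, err))
		}
	}
	if len(collected) > 0 {
		return errors.New(strings.Join(collected, "; "))
	}
	return nil
}

func main() {
	fmt.Println(terminateSubTasks(3)) // [1] finalize failed
}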