Skip to content

Commit

Permalink
Fetch whether task is interruptible as part of taskMetaData (flyteorg#68
Browse files Browse the repository at this point in the history
)

* wip

* generate mockery

* upd to IsInterruptible

* handling interruptible

* node selector and label

* tests

* .

* .

* Addressing CR comments

* .

Co-authored-by: Surinder Singh <[email protected]>
  • Loading branch information
migueltol22 and Surinder Singh authored Mar 19, 2020
1 parent e79dce8 commit 798212c
Show file tree
Hide file tree
Showing 19 changed files with 169 additions and 17 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72 // indirect
golang.org/x/net v0.0.0-20200202094626-16171245cfb2
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5 // indirect
golang.org/x/tools v0.0.0-20200124170513-3f4d10fc73b4
google.golang.org/api v0.16.0 // indirect
google.golang.org/genproto v0.0.0-20200205142000-a86caf926a67 // indirect
google.golang.org/grpc v1.27.1
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,7 @@ golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200117161641-43d50277825c/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200124170513-3f4d10fc73b4 h1:BPUNhs1Rsd9Ly0hbjDwBxaNBrAyo/CKpkMcA3pkTwgg=
golang.org/x/tools v0.0.0-20200124170513-3f4d10fc73b4/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898 h1:/atklqdjdhuosWIl6AIbOeHJjicWYPqR9bpxqxYG2pA=
Expand Down
4 changes: 3 additions & 1 deletion go/tasks/config_load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ func TestLoadConfig(t *testing.T) {
assert.Equal(t, []v1.Toleration{tolStorage}, k8sConfig.ResourceTolerations[v1.ResourceStorage])
assert.Equal(t, "1000m", k8sConfig.DefaultCPURequest)
assert.Equal(t, "1024Mi", k8sConfig.DefaultMemoryRequest)
assert.Equal(t, map[string]string{"x/interruptible": "true"}, k8sConfig.InterruptibleNodeSelector)
assert.Equal(t, "x/flyte", k8sConfig.InterruptibleTolerations[0].Key)
assert.Equal(t, "interruptible", k8sConfig.InterruptibleTolerations[0].Value)
})

t.Run("logs-config-test", func(t *testing.T) {
Expand All @@ -80,5 +83,4 @@ func TestLoadConfig(t *testing.T) {
assert.NotNil(t, spark.GetSparkConfig())
assert.NotNil(t, spark.GetSparkConfig().DefaultSparkConfig)
})

}
1 change: 1 addition & 0 deletions go/tasks/pluginmachinery/core/exec_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,5 @@ type TaskExecutionMetadata interface {
GetLabels() map[string]string
GetAnnotations() map[string]string
GetK8sServiceAccount() string
IsInterruptible() bool
}
5 changes: 0 additions & 5 deletions go/tasks/pluginmachinery/core/mocks/resource_registrar.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 32 additions & 0 deletions go/tasks/pluginmachinery/core/mocks/task_execution_metadata.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions go/tasks/pluginmachinery/flytek8s/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ type K8sPluginConfig struct {
DefaultCPURequest string `json:"default-cpus" pflag:",Defines a default value for cpu for containers if not specified."`
// default memory requests for a container
DefaultMemoryRequest string `json:"default-memory" pflag:",Defines a default value for memory for containers if not specified."`
// Tolerations for interruptible k8s pods: These tolerations are added to the pods that can tolerate getting evicted from a node. We
// can leverage this for better bin-packing and using low-reliability cheaper machines.
InterruptibleTolerations []v1.Toleration `json:"interruptible-tolerations" pflag:"-,Tolerations to be applied for interruptible pods"`
// Node Selector Labels for interruptible pods: Similar to InterruptibleTolerations, these node selector labels are added for pods that can tolerate
// eviction.
InterruptibleNodeSelector map[string]string `json:"interruptible-node-selector" pflag:"-,Defines a set of node selector labels to add to the interruptible pods."`
}

// Retrieves the current k8s plugin config or default.
Expand Down
9 changes: 8 additions & 1 deletion go/tasks/pluginmachinery/flytek8s/k8s_resource_adds.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ func DecorateEnvVars(ctx context.Context, envVars []v1.EnvVar, id pluginsCore.Ta
return envVars
}

func GetTolerationsForResources(resourceRequirements ...v1.ResourceRequirements) []v1.Toleration {
func GetPodTolerations(interruptible bool, resourceRequirements ...v1.ResourceRequirements) []v1.Toleration {
// 1. Get the tolerations for the resources requested
var tolerations []v1.Toleration
resourceNames := sets.NewString()
for _, resources := range resourceRequirements {
Expand All @@ -141,5 +142,11 @@ func GetTolerationsForResources(resourceRequirements ...v1.ResourceRequirements)
tolerations = append(tolerations, v...)
}
}

// 2. Get the tolerations for interruptible pods
if interruptible && len(config.GetK8sPluginConfig().InterruptibleTolerations) > 0 {
tolerations = append(tolerations, config.GetK8sPluginConfig().InterruptibleTolerations...)
}

return tolerations
}
4 changes: 2 additions & 2 deletions go/tasks/pluginmachinery/flytek8s/k8s_resource_adds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ func TestGetTolerationsForResources(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{ResourceTolerations: tt.setVal}))
if got := GetTolerationsForResources(tt.args.resources); len(got) != len(tt.want) {
t.Errorf("GetTolerationsForResources() = %v, want %v", got, tt.want)
if got := GetPodTolerations(true, tt.args.resources); len(got) != len(tt.want) {
t.Errorf("GetPodTolerations() = %v, want %v", got, tt.want)
} else {
for _, tol := range tt.want {
assert.Contains(t, got, tol)
Expand Down
27 changes: 23 additions & 4 deletions go/tasks/pluginmachinery/flytek8s/pod_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@ import (
v12 "k8s.io/apimachinery/pkg/apis/meta/v1"

pluginsCore "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io"
)

const PodKind = "pod"
const OOMKilled = "OOMKilled"
const Interrupted = "Interrupted"
const SIGKILL = 137

func ToK8sPodSpec(ctx context.Context, taskExecutionMetadata pluginsCore.TaskExecutionMetadata, taskReader pluginsCore.TaskReader,
inputs io.InputReader, outputPaths io.OutputFilePaths) (*v1.PodSpec, error) {
Expand All @@ -32,13 +35,24 @@ func ToK8sPodSpec(ctx context.Context, taskExecutionMetadata pluginsCore.TaskExe
containers := []v1.Container{
*c,
}
if taskExecutionMetadata.IsInterruptible() && len(config.GetK8sPluginConfig().InterruptibleNodeSelector) > 0 {
return &v1.PodSpec{
// We could specify Scheduler, Affinity, nodename etc
RestartPolicy: v1.RestartPolicyNever,
Containers: containers,
Tolerations: GetPodTolerations(taskExecutionMetadata.IsInterruptible(), c.Resources),
ServiceAccountName: taskExecutionMetadata.GetK8sServiceAccount(),
NodeSelector: config.GetK8sPluginConfig().InterruptibleNodeSelector,
}, nil
}
return &v1.PodSpec{
// We could specify Scheduler, Affinity, nodename etc
RestartPolicy: v1.RestartPolicyNever,
Containers: containers,
Tolerations: GetTolerationsForResources(c.Resources),
Tolerations: GetPodTolerations(taskExecutionMetadata.IsInterruptible(), c.Resources),
ServiceAccountName: taskExecutionMetadata.GetK8sServiceAccount(),
}, nil

}

func BuildPodWithSpec(podSpec *v1.PodSpec) *v1.Pod {
Expand Down Expand Up @@ -180,9 +194,9 @@ func DemystifySuccess(status v1.PodStatus, info pluginsCore.TaskInfo) (pluginsCo
return pluginsCore.PhaseInfoSuccess(&info), nil
}

func ConvertPodFailureToError(status v1.PodStatus) (string, string) {
code := "UnknownError"
message := "Container/Pod failed. No message received from kubernetes."
func ConvertPodFailureToError(status v1.PodStatus) (code, message string) {
code = "UnknownError"
message = "Container/Pod failed. No message received from kubernetes."
if len(status.Reason) > 0 {
code = status.Reason
}
Expand All @@ -202,7 +216,12 @@ func ConvertPodFailureToError(status v1.PodStatus) (string, string) {
if containerState.Terminated != nil {
if strings.Contains(c.State.Terminated.Reason, OOMKilled) {
code = OOMKilled
} else if containerState.Terminated.ExitCode == SIGKILL {
// in some setups, node termination sends SIGKILL to all the containers running on that node. Capturing and
// tagging that correctly.
code = Interrupted
}

message += fmt.Sprintf("\r\nContainer [%v] terminated with exit code (%v). Reason [%v]. Message: [%v].",
c.Name,
containerState.Terminated.ExitCode,
Expand Down
38 changes: 37 additions & 1 deletion go/tasks/pluginmachinery/flytek8s/pod_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"context"
"testing"

config1 "github.com/lyft/flytestdlib/config"
"github.com/lyft/flytestdlib/config/viper"

"github.com/lyft/flytestdlib/storage"
"github.com/stretchr/testify/mock"

Expand Down Expand Up @@ -47,7 +50,7 @@ func dummyTaskExecutionMetadata(resources *v1.ResourceRequirements) pluginsCore.
to := &pluginsCoreMock.TaskOverrides{}
to.On("GetResources").Return(resources)
taskExecutionMetadata.On("GetOverrides").Return(to)

taskExecutionMetadata.On("IsInterruptible").Return(true)
return taskExecutionMetadata
}

Expand All @@ -74,6 +77,39 @@ func dummyInputReader() io.InputReader {
return inputReader
}

func TestToK8sPodIterruptible(t *testing.T) {
ctx := context.TODO()
configAccessor := viper.NewAccessor(config1.Options{
StrictMode: true,
SearchPaths: []string{"testdata/config.yaml"},
})
err := configAccessor.UpdateConfig(context.TODO())
assert.NoError(t, err)

op := &pluginsIOMock.OutputFilePaths{}
op.On("GetOutputPrefixPath").Return(storage.DataReference(""))

x := dummyTaskExecutionMetadata(&v1.ResourceRequirements{
Limits: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceStorage: resource.MustParse("100M"),
ResourceNvidiaGPU: resource.MustParse("1"),
},
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1024m"),
v1.ResourceStorage: resource.MustParse("100M"),
},
})

p, err := ToK8sPodSpec(ctx, x, dummyTaskReader(), dummyInputReader(), op)
assert.NoError(t, err)
assert.Equal(t, 2, len(p.Tolerations))
assert.Equal(t, "x/flyte", p.Tolerations[1].Key)
assert.Equal(t, "interruptible", p.Tolerations[1].Value)
assert.Equal(t, 1, len(p.NodeSelector))
assert.Equal(t, "true", p.NodeSelector["x/interruptible"])
}

func TestToK8sPod(t *testing.T) {
ctx := context.TODO()

Expand Down
37 changes: 37 additions & 0 deletions go/tasks/pluginmachinery/flytek8s/testdata/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Sample plugins config
plugins:
# Set of enabled plugins at root level
enabled-plugins:
- container
# All k8s plugins default configuration
k8s:
default-annotations:
- annotationKey1: annotationValue1
- annotationKey2: annotationValue2
default-labels:
- label1: labelValue1
- label2: labelValue2
resource-tolerations:
nvidia.com/gpu:
key: flyte/gpu
value: dedicated
operator: Equal
effect: NoSchedule
storage:
- key: storage
value: special
operator: Equal
effect: PreferNoSchedule
interruptible-node-selector:
- x/interruptible: "true"
interruptible-tolerations:
- key: x/flyte
value: interruptible
operator: Equal
effect: NoSchedule
default-env-vars:
- AWS_METADATA_SERVICE_TIMEOUT: 5
- AWS_METADATA_SERVICE_NUM_ATTEMPTS: 20
- FLYTE_AWS_ENDPOINT: "http://minio.flyte:9000"
- FLYTE_AWS_ACCESS_KEY_ID: minio
- FLYTE_AWS_SECRET_ACCESS_KEY: miniostorage
1 change: 1 addition & 0 deletions go/tasks/plugins/hive/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func GetMockTaskExecutionMetadata() core.TaskExecutionMetadata {
Kind: "node",
Name: "blah",
})
taskMetadata.On("IsInterruptible").Return(true)
taskMetadata.On("GetK8sServiceAccount").Return("service-account")
taskMetadata.On("GetOwnerID").Return(types.NamespacedName{
Namespace: "test-namespace",
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/k8s/container/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func dummyContainerTaskMetadata(resources *v1.ResourceRequirements) pluginsCore.
to := &pluginsCoreMock.TaskOverrides{}
to.On("GetResources").Return(resources)
taskMetadata.On("GetOverrides").Return(to)

taskMetadata.On("IsInterruptible").Return(true)
return taskMetadata
}

Expand Down
7 changes: 6 additions & 1 deletion go/tasks/plugins/k8s/sidecar/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/flytek8s"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"

pluginsCore "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/core"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/k8s"
Expand Down Expand Up @@ -60,7 +61,11 @@ func validateAndFinalizeContainers(

}
pod.Spec.Containers = finalizedContainers
pod.Spec.Tolerations = flytek8s.GetTolerationsForResources(resReqs...)
pod.Spec.Tolerations = flytek8s.GetPodTolerations(taskCtx.TaskExecutionMetadata().IsInterruptible(), resReqs...)
if taskCtx.TaskExecutionMetadata().IsInterruptible() && len(config.GetK8sPluginConfig().InterruptibleNodeSelector) > 0 {
pod.Spec.NodeSelector = config.GetK8sPluginConfig().InterruptibleNodeSelector
}

return &pod, nil
}

Expand Down
1 change: 1 addition & 0 deletions go/tasks/plugins/k8s/sidecar/sidecar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func dummyContainerTaskMetadata(resources *v1.ResourceRequirements) pluginsCore.
Kind: "node",
Name: "blah",
})
taskMetadata.On("IsInterruptible").Return(true)
taskMetadata.On("GetK8sServiceAccount").Return("service-account")
taskMetadata.On("GetOwnerID").Return(types.NamespacedName{
Namespace: "test-namespace",
Expand Down
2 changes: 1 addition & 1 deletion go/tasks/plugins/k8s/spark/spark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func dummySparkTaskContext(taskTemplate *core.TaskTemplate) pluginsCore.TaskExec
Kind: "node",
Name: "blah",
})

taskExecutionMetadata.On("IsInterruptible").Return(true)
taskCtx.On("TaskExecutionMetadata").Return(taskExecutionMetadata)
return taskCtx
}
Expand Down
7 changes: 7 additions & 0 deletions go/tasks/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ plugins:
value: special
operator: Equal
effect: PreferNoSchedule
interruptible-node-selector:
- x/interruptible: "true"
interruptible-tolerations:
- key: x/flyte
value: interruptible
operator: Equal
effect: NoSchedule
default-env-vars:
- AWS_METADATA_SERVICE_TIMEOUT: 5
- AWS_METADATA_SERVICE_NUM_ATTEMPTS: 20
Expand Down
1 change: 1 addition & 0 deletions tests/end_to_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ func RunPluginEndToEndTest(t *testing.T, executor pluginCore.Plugin, template *i
tMeta.OnGetNamespace().Return("fake-development")
tMeta.OnGetLabels().Return(map[string]string{})
tMeta.OnGetAnnotations().Return(map[string]string{})
tMeta.OnIsInterruptible().Return(true)
tMeta.OnGetOwnerReference().Return(v12.OwnerReference{})
tMeta.OnGetOwnerID().Return(types.NamespacedName{
Namespace: "fake-development",
Expand Down

0 comments on commit 798212c

Please sign in to comment.